diff --git a/fileTransfer/send.go b/fileTransfer/send.go index b42c16d13cb3c589a3b8dc620878e206aa73b65e..fb593445ffbbbf0eeb03b72b9ebce2b90e369ff8 100644 --- a/fileTransfer/send.go +++ b/fileTransfer/send.go @@ -15,7 +15,7 @@ import ( "gitlab.com/elixxir/client/api" "gitlab.com/elixxir/client/interfaces/message" "gitlab.com/elixxir/client/interfaces/params" - "gitlab.com/elixxir/client/interfaces/utility" + "gitlab.com/elixxir/client/network" "gitlab.com/elixxir/client/stoppable" ftStorage "gitlab.com/elixxir/client/storage/fileTransfer" ds "gitlab.com/elixxir/comms/network/dataStructures" @@ -547,7 +547,7 @@ func (m *Manager) sendEndE2eMessage(recipient *id.ID) error { } // Wait until the result tracking responds - success, numTimeOut, numRoundFail := utility.TrackResults( + success, numTimeOut, numRoundFail := network.TrackResults( sendResults, len(rounds)) // If a single partition of the end file transfer message does not transmit, diff --git a/keyExchange/rekey.go b/keyExchange/rekey.go index 5f3380c3f07aa79227bd1c5db43a4ea8cc04f51f..bf5d77803692f09e4327ca4193a9b0e0cd7f585e 100644 --- a/keyExchange/rekey.go +++ b/keyExchange/rekey.go @@ -16,7 +16,7 @@ import ( "gitlab.com/elixxir/client/interfaces" "gitlab.com/elixxir/client/interfaces/message" "gitlab.com/elixxir/client/interfaces/params" - "gitlab.com/elixxir/client/interfaces/utility" + network2 "gitlab.com/elixxir/client/network" "gitlab.com/elixxir/client/stoppable" "gitlab.com/elixxir/client/storage" "gitlab.com/elixxir/client/storage/e2e" @@ -145,7 +145,7 @@ func negotiate(instance *network.Instance, sendE2E interfaces.SendE2E, } //Wait until the result tracking responds - success, numRoundFail, numTimeOut := utility.TrackResults(sendResults, + success, numRoundFail, numTimeOut := network2.TrackResults(sendResults, len(rounds)) // If a single partition of the Key Negotiation request does not diff --git a/keyExchange/trigger.go b/keyExchange/trigger.go index 4605f09280f6b6b55153d3f91b6316393d36de8e..984fc3306400da69c38f2005b4e1695e153c6563 100644 --- a/keyExchange/trigger.go +++ b/keyExchange/trigger.go @@ -16,7 +16,7 @@ import ( "gitlab.com/elixxir/client/interfaces" "gitlab.com/elixxir/client/interfaces/message" "gitlab.com/elixxir/client/interfaces/params" - "gitlab.com/elixxir/client/interfaces/utility" + "gitlab.com/elixxir/client/network" "gitlab.com/elixxir/client/stoppable" "gitlab.com/elixxir/client/storage" "gitlab.com/elixxir/client/storage/e2e" @@ -145,7 +145,7 @@ func handleTrigger(sess *storage.Session, net interfaces.NetworkManager, } //Wait until the result tracking responds - success, numRoundFail, numTimeOut := utility.TrackResults(sendResults, + success, numRoundFail, numTimeOut := network.TrackResults(sendResults, len(rounds)) // If a single partition of the Key Negotiation request does not // transmit, the partner will not be able to read the confirmation. If diff --git a/network/cmixMessageBuffer.go b/network/cmixMessageBuffer.go index a70733ba44e7981be4a6fae8329a52c9227a7a52..d70c22aba048412c200692d88f296f4583870373 100644 --- a/network/cmixMessageBuffer.go +++ b/network/cmixMessageBuffer.go @@ -98,13 +98,18 @@ type CmixMessageBuffer struct { mb *utility.MessageBuffer } -func NewCmixMessageBuffer(kv *versioned.KV, key string) (*CmixMessageBuffer, error) { - mb, err := utility.NewMessageBuffer(kv, &cmixMessageHandler{}, key) +func NewOrLoadCmixMessageBuffer(kv *versioned.KV, key string) (*CmixMessageBuffer, error) { + + cmb, err := LoadCmixMessageBuffer(kv, key) if err != nil { - return nil, err + mb, err := utility.NewMessageBuffer(kv, &cmixMessageHandler{}, key) + if err != nil { + return nil, err + } + return &CmixMessageBuffer{mb: mb}, nil } - return &CmixMessageBuffer{mb: mb}, nil + return cmb, nil } func LoadCmixMessageBuffer(kv *versioned.KV, key string) (*CmixMessageBuffer, error) { @@ -116,26 +121,31 @@ func LoadCmixMessageBuffer(kv *versioned.KV, key string) (*CmixMessageBuffer, er return &CmixMessageBuffer{mb: mb}, nil } -func (cmb *CmixMessageBuffer) Add(msg format.Message, recipent *id.ID) { +func (cmb *CmixMessageBuffer) Add(msg format.Message, recipent *id.ID, params CMIXParams) { + paramBytes, _ := params.MarshalJSON() sm := storedMessage{ Msg: msg.Marshal(), Recipient: recipent.Marshal(), + Params: paramBytes, } cmb.mb.Add(sm) } -func (cmb *CmixMessageBuffer) AddProcessing(msg format.Message, recipent *id.ID) { +func (cmb *CmixMessageBuffer) AddProcessing(msg format.Message, recipent *id.ID, + params CMIXParams) { + paramBytes, _ := params.MarshalJSON() sm := storedMessage{ Msg: msg.Marshal(), Recipient: recipent.Marshal(), + Params: paramBytes, } cmb.mb.AddProcessing(sm) } -func (cmb *CmixMessageBuffer) Next() (format.Message, *id.ID, bool) { +func (cmb *CmixMessageBuffer) Next() (format.Message, *id.ID, CMIXParams, bool) { m, ok := cmb.mb.Next() if !ok { - return format.Message{}, nil, false + return format.Message{}, nil, CMIXParams{}, false } sm := m.(storedMessage) @@ -146,10 +156,20 @@ func (cmb *CmixMessageBuffer) Next() (format.Message, *id.ID, bool) { } recpient, err := id.Unmarshal(sm.Recipient) if err != nil { - jww.FATAL.Panicf("Could nto get an id for stored cmix "+ + jww.FATAL.Panicf("Could not get an id for stored cmix "+ "message buffer: %+v", err) } - return msg, recpient, true + + params := CMIXParams{} + if sm.Params == nil || len(sm.Params) == 0 { + params = GetDefaultCMIXParams() + } else { + if err = params.UnmarshalJSON(sm.Params); err != nil { + jww.FATAL.Panicf("Could not parse the parms for stored CMIX "+ + "message buffer: %+v", err) + } + } + return msg, recpient, params, true } func (cmb *CmixMessageBuffer) Succeeded(msg format.Message, recipent *id.ID) { diff --git a/network/critical.go b/network/critical.go index d838135bd4fe947b2a01a74a72b947e4b589d6c6..221ae508e18d48c2af13e367025fe62663aab17c 100644 --- a/network/critical.go +++ b/network/critical.go @@ -1,13 +1,123 @@ package network import ( + jww "github.com/spf13/jwalterweatherman" "gitlab.com/elixxir/client/network/health" + "gitlab.com/elixxir/client/stoppable" + "gitlab.com/elixxir/client/storage/versioned" + ds "gitlab.com/elixxir/comms/network/dataStructures" + "gitlab.com/elixxir/primitives/format" + "gitlab.com/elixxir/primitives/states" + "gitlab.com/xx_network/primitives/id" + "gitlab.com/xx_network/primitives/id/ephemeral" + "time" ) -type Manager struct { - storage *CmixMessageBuffer - trigger chan bool - hm *health.Monitor +const criticalRawMessagesKey = "CriticalRawMessages" + +// roundEventRegistrar is an interface for the round events system to allow +// for easy testing +type roundEventRegistrar interface { + AddRoundEventChan(rid id.Round, eventChan chan ds.EventReturn, + timeout time.Duration, validStates ...states.Round) *ds.EventCallback +} + +// criticalSender is an anonymous function which takes the data critical +// knows for sending. It should call sendCmixHelper and use scope sharing in an +// anonymous function to include the structures from manager which critical is +// not aware of +type criticalSender func(msg format.Message, recipient *id.ID, + params CMIXParams) (id.Round, ephemeral.Id, error) + +//Structure which allows the auto resending of messages that must be received +type critical struct { + *CmixMessageBuffer + roundEvents roundEventRegistrar + trigger chan bool + send criticalSender +} + +func newCritical(kv *versioned.KV, hm health.Monitor, + roundEvents roundEventRegistrar, send criticalSender) *critical { + cm, err := NewOrLoadCmixMessageBuffer(kv, criticalRawMessagesKey) + if err != nil { + jww.FATAL.Panicf("Failed to load the buffer for critical "+ + "messages: %v", err) + } + c := &critical{ + CmixMessageBuffer: cm, + roundEvents: roundEvents, + trigger: make(chan bool, 100), + send: send, + } + hm.AddHealthCallback(func(healthy bool) { + c.trigger <- healthy + }) + return c +} + +func (c *critical) runCriticalMessages(stop *stoppable.Single) { + for { + select { + case <-stop.Quit(): + stop.ToStopped() + return + case isHealthy := <-c.trigger: + if isHealthy { + c.evaluate(stop) + } + } + } +} + +func (c *critical) handle(msg format.Message, recipient *id.ID, rid id.Round, rtnErr error) { + if rtnErr != nil { + c.Failed(msg, recipient) + } else { + sendResults := make(chan ds.EventReturn, 1) + + c.roundEvents.AddRoundEventChan(rid, sendResults, 1*time.Minute, + states.COMPLETED, states.FAILED) + + success, numTimeOut, _ := TrackResults(sendResults, 1) + if !success { + if numTimeOut > 0 { + jww.ERROR.Printf("critical raw message resend to %s "+ + "(msgDigest: %s) on round %d failed to transmit due to "+ + "timeout", recipient, msg.Digest(), rid) + } else { + jww.ERROR.Printf("critical raw message resend to %s "+ + "(msgDigest: %s) on round %d failed to transmit due to "+ + "send failure", recipient, msg.Digest(), rid) + } + + c.Failed(msg, recipient) + return + } + + jww.INFO.Printf("Successful resend of critical raw message "+ + "to %s (msgDigest: %s) on round %d", recipient, msg.Digest(), rid) + + c.Succeeded(msg, recipient) + } + } -func NewManager() +// evaluate tries to send every message in the critical messages and the raw critical +// messages buffer in parallel +func (c *critical) evaluate(stop *stoppable.Single) { + for msg, recipient, params, has := c.Next(); has; msg, recipient, params, has = c.Next() { + localRid := recipient.DeepCopy() + go func(msg format.Message, recipient *id.ID, params CMIXParams) { + params.Stop = stop + jww.INFO.Printf("Resending critical raw message to %s "+ + "(msgDigest: %s)", recipient, msg.Digest()) + //send the message + round, _, err := c.send(msg, recipient, params) + //pass to the handler + c.handle(msg, recipient, round, err) + //wait on the results to make sure the rounds were successful + }(msg, localRid, params) + } + +} diff --git a/network/manager.go b/network/manager.go index fd4cbc25b647a4fa287195881d72a8574a9b5e47..d7ccd6372a8d9c433c81c351ed656df89d4966e8 100644 --- a/network/manager.go +++ b/network/manager.go @@ -29,6 +29,7 @@ import ( "gitlab.com/elixxir/crypto/fastRNG" "gitlab.com/elixxir/primitives/format" "gitlab.com/xx_network/primitives/id" + "gitlab.com/xx_network/primitives/id/ephemeral" "gitlab.com/xx_network/primitives/ndf" "math" "sync/atomic" @@ -69,6 +70,7 @@ type manager struct { address.Space identity.Tracker health.Monitor + crit *critical // Earliest tracked round earliestRound *uint64 @@ -122,7 +124,6 @@ func NewManager(params Params, comms *client.Comms, session storage.Session, } /* set up modules */ - nodechan := make(chan commNetwork.NodeGateway, nodes.InputChanLen) // Set up gateway.Sender @@ -160,8 +161,20 @@ func NewManager(params Params, comms *client.Comms, session storage.Session, // Set upthe ability to register with new nodes when they appear m.instance.SetAddGatewayChan(nodechan) + // set up the health monitor m.Monitor = health.Init(instance, params.NetworkHealthTimeout) + //set up critical message tracking (sendCmix only) + critSender := func(msg format.Message, recipient *id.ID, + params CMIXParams) (id.Round, ephemeral.Id, error) { + return sendCmixHelper(m.Sender, msg, recipient, params, m.instance, + m.session.GetCmixGroup(), m.Registrar, m.rng, m.events, + m.session.GetTransmissionID(), m.comms) + } + + m.crit = newCritical(session.GetKV(), m.Monitor, + m.instance.GetRoundEvents(), critSender) + // Report health events m.Monitor.AddHealthCallback(func(isHealthy bool) { m.events.Report(5, "health", "IsHealthy", diff --git a/network/params.go b/network/params.go index 2baf33104c9ad39286006e55d4f85e20d3879a15..219633077ba18f40492fe064df2471537331acae 100644 --- a/network/params.go +++ b/network/params.go @@ -2,7 +2,6 @@ package network import ( "encoding/json" - "gitlab.com/elixxir/client/interfaces" "gitlab.com/elixxir/client/network/historical" "gitlab.com/elixxir/client/network/message" "gitlab.com/elixxir/client/network/rounds" @@ -90,29 +89,34 @@ type CMIXParams struct { RoundTries uint Timeout time.Duration RetryDelay time.Duration - ExcludedRounds excludedRounds.ExcludedRounds + ExcludedRounds excludedRounds.ExcludedRounds `json:"-"` // Duration to wait before sending on a round times out and a new round is // tried SendTimeout time.Duration - // an alternate identity preimage to use on send. If not set, the default - // for the sending identity will be used - trigger interfaces.Trigger - // Tag which prints with sending logs to help localize the source // All internal sends are tagged, so the default tag is "External" DebugTag string //Threading interface, can be used to stop the send early - Stop *stoppable.Single + Stop *stoppable.Single `json:"-"` //List of nodes to not send to, will skip a round with these //nodes in it - BlacklistedNodes map[id.ID]interface{} + //todo - do not omit this on json + BlacklistedNodes map[id.ID]bool `json:"-"` + + // Sets the message as critical. The system will track that the round it + // sends on completes and will auto resend in the event the round fails or + // completion cannot be determined. The sent data will be byte identical, + // so this has a high chance of metadata leak. This system should only be + // used in cases where repeats cannot be different + // Only used in sendCmix, not sendManyCmix + Critical bool } -func GetDefaultCMIX() CMIXParams { +func GetDefaultCMIXParams() CMIXParams { return CMIXParams{ RoundTries: 10, Timeout: 25 * time.Second, @@ -122,13 +126,17 @@ func GetDefaultCMIX() CMIXParams { } } -func (c CMIXParams) Marshal() ([]byte, error) { +func (c CMIXParams) MarshalJSON() ([]byte, error) { return json.Marshal(c) } +func (c CMIXParams) UnmarshalJSON(b []byte) error { + return json.Unmarshal(b, &c) +} + // GetCMIXParameters func obtains default CMIX parameters, or overrides with given parameters if set func GetCMIXParameters(params string) (CMIXParams, error) { - p := GetDefaultCMIX() + p := GetDefaultCMIXParams() if len(params) > 0 { err := json.Unmarshal([]byte(params), &p) if err != nil { diff --git a/network/sendCmix.go b/network/sendCmix.go index 592e24eedb527bb53dd92c9ed079719f39b54734..fa47cacac67828a6552bdaf165e7ab6adf39c2d1 100644 --- a/network/sendCmix.go +++ b/network/sendCmix.go @@ -52,8 +52,9 @@ import ( // Will return an error if the network is unhealthy or if it fails to send // (along with the reason). Blocks until successful send or err. // WARNING: Do not roll your own crypto -func (m *manager) SendCMIX(recipient *id.ID, fingerprint format.Fingerprint, service message.Service, - payload, mac []byte, cmixParams CMIXParams) (id.Round, ephemeral.Id, error) { +func (m *manager) SendCMIX(recipient *id.ID, fingerprint format.Fingerprint, + service message.Service, payload, mac []byte, cmixParams CMIXParams) ( + id.Round, ephemeral.Id, error) { if !m.Monitor.IsHealthy() { return 0, ephemeral.Id{}, errors.New("Cannot send cmix message when the " + "network is not healthy") @@ -66,9 +67,19 @@ func (m *manager) SendCMIX(recipient *id.ID, fingerprint format.Fingerprint, ser msg.SetMac(mac) msg.SetSIH(service.Hash(msg.GetContents())) - return sendCmixHelper(m.Sender, msg, recipient, cmixParams, m.instance, - m.session.GetCmixGroup(), m.Registrar, m.rng, m.events, + if cmixParams.Critical { + m.crit.AddProcessing(msg, recipient, cmixParams) + } + + rid, ephID, rtnErr := sendCmixHelper(m.Sender, msg, recipient, cmixParams, + m.instance, m.session.GetCmixGroup(), m.Registrar, m.rng, m.events, m.session.GetTransmissionID(), m.comms) + + if cmixParams.Critical { + m.crit.handle(msg, recipient, rid, rtnErr) + } + + return rid, ephID, rtnErr } // Helper function for sendCmix diff --git a/network/sendManyCmix.go b/network/sendManyCmix.go index 307062798f5b86326e1525c64b07ffe7ebe9a524..fc070733db1708285b4b76612434de954f23adc6 100644 --- a/network/sendManyCmix.go +++ b/network/sendManyCmix.go @@ -63,6 +63,7 @@ type TargetedCmixMessage struct { // (panic otherwise). If used, fill with random bits. // Will return an error if the network is unhealthy or if it fails to send // (along with the reason). Blocks until successful send or err. +// Does not support Critical Messages // WARNING: Do not roll your own crypto func (m *manager) SendManyCMIX(messages []TargetedCmixMessage, p CMIXParams) (id.Round, []ephemeral.Id, error) { diff --git a/interfaces/utility/trackResults.go b/network/trackResults.go similarity index 98% rename from interfaces/utility/trackResults.go rename to network/trackResults.go index a02f1e0e4d2442b7a9979dd50dd93c2976d02fe6..4d7bbb1d60e4c8c1cd6c7f191c64a8792602e82a 100644 --- a/interfaces/utility/trackResults.go +++ b/network/trackResults.go @@ -5,7 +5,7 @@ // LICENSE file // /////////////////////////////////////////////////////////////////////////////// -package utility +package network import ( jww "github.com/spf13/jwalterweatherman" diff --git a/single/response.go b/single/response.go index 3a6dbfa01c7d42500313346919f29eefff650701..5c3e80abf7a54ec9e174ebc5b9494c8d90f4770e 100644 --- a/single/response.go +++ b/single/response.go @@ -12,7 +12,7 @@ import ( "github.com/pkg/errors" jww "github.com/spf13/jwalterweatherman" "gitlab.com/elixxir/client/interfaces/params" - "gitlab.com/elixxir/client/interfaces/utility" + "gitlab.com/elixxir/client/network" ds "gitlab.com/elixxir/comms/network/dataStructures" cAuth "gitlab.com/elixxir/crypto/e2e/auth" "gitlab.com/elixxir/crypto/e2e/singleUse" @@ -101,7 +101,7 @@ func (m *Manager) respondSingleUse(partner Contact, payload []byte, } // Wait until the result tracking responds - success, numRoundFail, numTimeOut := utility.TrackResults(sendResults, len(roundMap)) + success, numRoundFail, numTimeOut := network.TrackResults(sendResults, len(roundMap)) if !success { return errors.Errorf("tracking results of %d rounds: %d round "+ "failures, %d round event time outs; the send cannot be retried.",