diff --git a/network/manager.go b/network/manager.go index b9e7a7864b5d4d39e8e9dcf7e71e50adee107e3f..f7ffa09716ed045496483d5d6d22ba8c5198da34 100644 --- a/network/manager.go +++ b/network/manager.go @@ -16,6 +16,7 @@ import ( "gitlab.com/elixxir/client/context/stoppable" "gitlab.com/elixxir/client/network/health" "gitlab.com/elixxir/client/network/internal" + "gitlab.com/elixxir/client/network/keyExchange" "gitlab.com/elixxir/client/network/message" "gitlab.com/elixxir/client/network/node" "gitlab.com/elixxir/client/network/permissioning" @@ -140,11 +141,14 @@ func (m *manager) startRunners() error { m.runners.Add(trackNetworkStopper) // Message reception - m.runners.Add(m.message.StartMessageReceptionWorkerPool()) + m.runners.Add(m.message.StartProcessies()) // Round processing m.runners.Add(m.round.StartProcessors()) + // Key exchange + m.runners.Add(keyExchange.Start(m.Context)) + return nil } diff --git a/network/message/critical.go b/network/message/critical.go new file mode 100644 index 0000000000000000000000000000000000000000..a46a257306ba180374e1551b801dd1ce21e83fb8 --- /dev/null +++ b/network/message/critical.go @@ -0,0 +1,62 @@ +package message + +import ( + "github.com/pkg/errors" + "gitlab.com/elixxir/client/context/message" + "gitlab.com/elixxir/client/context/params" + jww "github.com/spf13/jwalterweatherman" + "gitlab.com/elixxir/client/context/utility" + "gitlab.com/elixxir/client/storage/e2e" + ds "gitlab.com/elixxir/comms/network/dataStructures" + "gitlab.com/elixxir/primitives/states" + "time" +) + +func (m *Manager) processCriticalMessages(quitCh <-chan struct{}) { + done := false + for !done { + select { + case <-quitCh: + done = true + case isHealthy := <-m.networkIsHealthy: + if isHealthy { + m.criticalMessages() + } + } + } +} + +func (m *Manager) criticalMessages() { + critMsgs := m.Session.GetCriticalMessages() + //try to send every message in the critical messages buffer in paralell + for msg, param, has := critMsgs.Next(); has; msg, param, has = critMsgs.Next() { + go func(msg message.Send, param params.E2E) { + //send the message + rounds, err := m.sendE2E(msg, param) + //if the message fail to send, notify the buffer so it can be handled + //in the future and exit + if err != nil { + jww.ERROR.Printf("Failed to send critical message on " + + "notification of healthy network") + critMsgs.Failed(msg) + return + } + //wait on the results to make sure the rounds were sucesfull + sendResults := make(chan ds.EventReturn, len(rounds)) + roundEvents := m.Instance.GetRoundEvents() + for _, r := range rounds { + roundEvents.AddRoundEventChan(r, sendResults, 1*time.Minute, + states.COMPLETED, states.FAILED) + } + success, numTimeOut, numRoundFail := utility.TrackResults(sendResults, len(rounds)) + if !success { + jww.ERROR.Printf("critical message send failed to transmit "+ + "transmit %v/%v paritions: %v round failures, %v timeouts", + numRoundFail+numTimeOut, len(rounds), numRoundFail, numTimeOut) + critMsgs.Failed(msg) + return + } + critMsgs.Succeeded(msg) + }(msg, param) + } +} diff --git a/network/message/manager.go b/network/message/manager.go index ad9c0e2f7621c3be3c40e487a69857219b9d8c3f..6cbfe4b320b3716c0492f1b0007d84ffeaf9431f 100644 --- a/network/message/manager.go +++ b/network/message/manager.go @@ -17,6 +17,7 @@ type Manager struct { messageReception chan Bundle nodeRegistration chan network.NodeGateway + networkIsHealthy chan bool } func NewManager(internal internal.Internal, param params.Messages, @@ -26,6 +27,7 @@ func NewManager(internal internal.Internal, param params.Messages, param: param, partitioner: parse.NewPartitioner(dummyMessage.ContentsSize(), internal.Session), messageReception: make(chan Bundle, param.MessageReceptionBuffLen), + networkIsHealthy: make(chan bool, 1), nodeRegistration: nodeRegistration, } m.Internal = internal @@ -38,7 +40,7 @@ func (m *Manager) GetMessageReceptionChannel() chan<- Bundle { } //Starts all worker pool -func (m *Manager) StartMessageReceptionWorkerPool() stoppable.Stoppable { +func (m *Manager) StartProcessies() stoppable.Stoppable { multi := stoppable.NewMulti("MessageReception") for i := uint(0); i < m.param.MessageReceptionWorkerPoolSize; i++ { @@ -47,5 +49,9 @@ func (m *Manager) StartMessageReceptionWorkerPool() stoppable.Stoppable { multi.Add(stop) } + critStop := stoppable.NewSingle("Critical Messages Handler") + go m.processCriticalMessages(critStop.Quit()) + m.Health.AddChannel(m.networkIsHealthy) + return multi } diff --git a/network/message/sendCmix.go b/network/message/sendCmix.go index beada89e90134b38dfc528e0e6abc59f859da54a..669fff55928c7fc77680d8b76251c350206f042f 100644 --- a/network/message/sendCmix.go +++ b/network/message/sendCmix.go @@ -13,22 +13,10 @@ import ( "time" ) -// SendCMIX sends a "raw" CMIX message payload to the provided -// recipient. Note that both SendE2E and SendUnsafe call SendCMIX. -// Returns the round ID of the round the payload was sent or an error -// if it fails. -func (m *Manager) SendCMIX(msg format.Message, param params.CMIX) (id.Round, error) { - if !m.Health.IsRunning() { - return 0, errors.New("Cannot send cmix message when the " + - "network is not healthy") - } - - return m.sendCMIX(msg, param) -} // Internal send e2e which bypasses the network check, for use in SendE2E and // SendUnsafe which do their own network checks -func (m *Manager) sendCMIX(msg format.Message, param params.CMIX) (id.Round, error) { +func (m *Manager) SendCMIX(msg format.Message, param params.CMIX) (id.Round, error) { timeStart := time.Now() attempted := set.New() diff --git a/network/message/sendE2E.go b/network/message/sendE2E.go index 53c3353886d12cc6d6540c5d5db518ce3629c4d4..cdc3b8b210613d0057e2b38258bc5ce66c965be5 100644 --- a/network/message/sendE2E.go +++ b/network/message/sendE2E.go @@ -17,21 +17,7 @@ import ( "time" ) -// SendE2E sends an end-to-end payload to the provided recipient with -// the provided msgType. Returns the list of rounds in which parts of -// the message were sent or an error if it fails. -func (m *Manager) SendE2E(msg message.Send, e2eP params.E2E) ( - []id.Round, error) { - - if !m.Health.IsRunning() { - return nil, errors.New("Cannot send e2e message when the " + - "network is not healthy") - } - - return m.sendE2E(msg, e2eP) -} - -func (m *Manager) sendE2E(msg message.Send, param params.E2E) ([]id.Round, error) { +func (m *Manager) SendE2E(msg message.Send, param params.E2E) ([]id.Round, error) { //timestamp the message ts := time.Now() @@ -75,7 +61,7 @@ func (m *Manager) sendE2E(msg message.Send, param params.E2E) ([]id.Round, error wg.Add(1) go func(i int) { var err error - roundIds[i], err = m.sendCMIX(msgEnc, param.CMIX) + roundIds[i], err = m.SendCMIX(msgEnc, param.CMIX) if err != nil { errCh <- err } diff --git a/network/message/sendUnsafe.go b/network/message/sendUnsafe.go index 788c59c6d973a0603297a4c445f93c3539c2a889..8619e521e2ddd8c545073919461217aab5fc3e66 100644 --- a/network/message/sendUnsafe.go +++ b/network/message/sendUnsafe.go @@ -2,7 +2,6 @@ package message import ( "github.com/pkg/errors" - jww "github.com/spf13/jwalterweatherman" "gitlab.com/elixxir/client/context/message" "gitlab.com/elixxir/client/context/params" "gitlab.com/elixxir/crypto/e2e" @@ -12,25 +11,7 @@ import ( "time" ) -// SendUnsafe sends an unencrypted payload to the provided recipient -// with the provided msgType. Returns the list of rounds in which parts -// of the message were sent or an error if it fails. -// NOTE: Do not use this function unless you know what you are doing. -// This function always produces an error message in client logging. -func (m *rounds.Manager) SendUnsafe(msg message.Send, param params.Unsafe) ([]id.Round, error) { - if !m.health.IsRunning() { - return nil, errors.New("cannot send unsafe message when the " + - "network is not healthy") - } - - jww.WARN.Println("Sending unsafe message. Unsafe payloads have no end" + - " to end encryption, they have limited security and privacy " + - "preserving properties") - - return m.sendUnsafe(msg, param) -} - -func (m *rounds.Manager) sendUnsafe(msg message.Send, param params.Unsafe) ([]id.Round, error) { +func (m *Manager) SendUnsafe(msg message.Send, param params.Unsafe) ([]id.Round, error) { //timestamp the message ts := time.Now() @@ -56,7 +37,7 @@ func (m *rounds.Manager) sendUnsafe(msg message.Send, param params.Unsafe) ([]id wg.Add(1) go func(i int) { var err error - roundIds[i], err = m.sendCMIX(msgCmix, param.CMIX) + roundIds[i], err = m.SendCMIX(msgCmix, param.CMIX) if err != nil { errCh <- err } diff --git a/network/send.go b/network/send.go new file mode 100644 index 0000000000000000000000000000000000000000..acbdf18368b48dba8812fdd172147986170b5746 --- /dev/null +++ b/network/send.go @@ -0,0 +1,55 @@ +package network + +import ( + "github.com/pkg/errors" + "gitlab.com/elixxir/client/context/message" + "gitlab.com/elixxir/client/context/params" + "gitlab.com/elixxir/primitives/format" + "gitlab.com/xx_network/primitives/id" + jww "github.com/spf13/jwalterweatherman" +) + +// SendCMIX sends a "raw" CMIX message payload to the provided +// recipient. Note that both SendE2E and SendUnsafe call SendCMIX. +// Returns the round ID of the round the payload was sent or an error +// if it fails. +func (m *Manager) SendCMIX(msg format.Message, param params.CMIX) (id.Round, error) { + if !m.m.Health.IsRunning() { + return 0, errors.New("Cannot send cmix message when the " + + "network is not healthy") + } + + return m.m.message.SendCMIX(msg, param) +} + +// SendUnsafe sends an unencrypted payload to the provided recipient +// with the provided msgType. Returns the list of rounds in which parts +// of the message were sent or an error if it fails. +// NOTE: Do not use this function unless you know what you are doing. +// This function always produces an error message in client logging. +func (m *Manager) SendUnsafe(msg message.Send, param params.Unsafe) ([]id.Round, error) { + if !m.m.Health.IsRunning() { + return nil, errors.New("cannot send unsafe message when the " + + "network is not healthy") + } + + jww.WARN.Println("Sending unsafe message. Unsafe payloads have no end" + + " to end encryption, they have limited security and privacy " + + "preserving properties") + + return m.SendUnsafe(msg, param) +} + +// SendE2E sends an end-to-end payload to the provided recipient with +// the provided msgType. Returns the list of rounds in which parts of +// the message were sent or an error if it fails. +func (m *Manager) SendE2E(msg message.Send, e2eP params.E2E) ( + []id.Round, error) { + + if !m.m.Health.IsRunning() { + return nil, errors.New("Cannot send e2e message when the " + + "network is not healthy") + } + + return m.m.message.SendE2E(msg, e2eP) +}