From 1ac1dc9a57cba602261531b107baae6f5fef4a0e Mon Sep 17 00:00:00 2001 From: Benjamin Wenger <ben@elixxir.ioo> Date: Tue, 22 Mar 2022 13:44:23 -0700 Subject: [PATCH] finished implementations within the message package --- interfaces/params/message.go | 8 +- network/message/bundle.go | 4 +- network/message/handler.go | 81 +++++++++---------- network/message/inProgress.go | 16 ++-- .../{garbled_test.go => inProgress_test.go} | 4 +- network/message/manager.go | 12 +-- 6 files changed, 60 insertions(+), 65 deletions(-) rename network/message/{garbled_test.go => inProgress_test.go} (98%) diff --git a/interfaces/params/message.go b/interfaces/params/message.go index 66371a779..70ef9ad8e 100644 --- a/interfaces/params/message.go +++ b/interfaces/params/message.go @@ -14,8 +14,8 @@ import ( type Messages struct { MessageReceptionBuffLen uint MessageReceptionWorkerPoolSize uint - MaxChecksGarbledMessage uint - GarbledMessageWait time.Duration + MaxChecksInProcessMessage uint + InProcessMessageWait time.Duration RealtimeOnly bool } @@ -23,8 +23,8 @@ func GetDefaultMessage() Messages { return Messages{ MessageReceptionBuffLen: 500, MessageReceptionWorkerPoolSize: 4, - MaxChecksGarbledMessage: 10, - GarbledMessageWait: 15 * time.Minute, + MaxChecksInProcessMessage: 10, + InProcessMessageWait: 15 * time.Minute, RealtimeOnly: false, } } diff --git a/network/message/bundle.go b/network/message/bundle.go index 81c649bd3..a60d070ae 100644 --- a/network/message/bundle.go +++ b/network/message/bundle.go @@ -8,7 +8,7 @@ package message import ( - "gitlab.com/elixxir/client/storage/reception" + "gitlab.com/elixxir/client/interfaces" pb "gitlab.com/elixxir/comms/mixmessages" "gitlab.com/elixxir/primitives/format" "gitlab.com/xx_network/primitives/id" @@ -19,5 +19,5 @@ type Bundle struct { RoundInfo *pb.RoundInfo Messages []format.Message Finish func() - Identity reception.IdentityUse + Identity interfaces.Identity } diff --git a/network/message/handler.go b/network/message/handler.go index 0a051e079..182188a61 100644 --- a/network/message/handler.go +++ b/network/message/handler.go @@ -9,74 +9,71 @@ package message import ( "fmt" - "sync" - "time" - jww "github.com/spf13/jwalterweatherman" "gitlab.com/elixxir/client/interfaces" - "gitlab.com/elixxir/client/interfaces/message" "gitlab.com/elixxir/client/interfaces/preimage" "gitlab.com/elixxir/client/stoppable" - "gitlab.com/elixxir/client/storage/edge" - "gitlab.com/elixxir/crypto/e2e" fingerprint2 "gitlab.com/elixxir/crypto/fingerprint" "gitlab.com/elixxir/primitives/format" - "gitlab.com/elixxir/primitives/states" - "gitlab.com/xx_network/primitives/id" + "gitlab.com/xx_network/primitives/netTime" + "sync" ) -func (m *Manager) handleMessages(stop *stoppable.Single) { +func (p *pickup) handleMessages(stop *stoppable.Single) { for { select { case <-stop.Quit(): stop.ToStopped() return - case bundle := <-m.messageReception: - go func(){ - wg := sync.WaitGroup{} - wg.Add(len(bundle.Messages)) - for _, msg := range bundle.Messages { - go func() { - m.handleMessage(msg, bundle) - wg.Done() - }() - } - wg.Wait() - bundle.Finish() - }() + case bundle := <-p.messageReception: + go func() { + wg := sync.WaitGroup{} + wg.Add(len(bundle.Messages)) + for i := range bundle.Messages { + msg := bundle.Messages[i] + go func() { + count, ts := p.inProcess.Add(msg, bundle.RoundInfo, bundle.Identity) + wg.Done() + success := p.handleMessage(msg, bundle) + if success { + p.inProcess.Remove(msg, bundle.RoundInfo, bundle.Identity) + } else { + // fail the message if any part of the decryption fails, + // unless it is the last attempts and has been in the buffer long + // enough, in which case remove it + if count == p.param.MaxChecksInProcessMessage && + netTime.Since(ts) > p.param.InProcessMessageWait { + p.inProcess.Remove(msg, bundle.RoundInfo, bundle.Identity) + } else { + p.inProcess.Failed(msg, bundle.RoundInfo, bundle.Identity) + } - } + } + }() + } + wg.Wait() + bundle.Finish() + }() } } } -func (m *Manager) handleMessage(ecrMsg format.Message, bundle Bundle) { +func (p *pickup) handleMessage(ecrMsg format.Message, bundle Bundle) bool { fingerprint := ecrMsg.GetKeyFP() - // msgDigest := ecrMsg.Digest() identity := bundle.Identity - round := bundle.RoundInfo - // newID := *id.ID{} // todo use new id systme from ticket - // { - // ID id.ID - // ephID ephemeral.Id - // } - - //save to garbled - m.garbledStore.Add(ecrMsg) var receptionID interfaces.Identity // If we have a fingerprint, process it. - if proc, exists := m.pop(fingerprint); exists { + if proc, exists := p.pop(identity.Source, fingerprint); exists { proc.Process(ecrMsg, receptionID, round) - m.garbledStore.Remove(ecrMsg) - return + return true } - triggers, exists := m.get(ecrMsg.GetIdentityFP(), ecrMsg.GetContents()) + triggers, exists := p.get(identity.Source, ecrMsg.GetIdentityFP(), ecrMsg.GetContents()) if exists { for _, t := range triggers { go t.Process(ecrMsg, receptionID, round) @@ -85,8 +82,7 @@ func (m *Manager) handleMessage(ecrMsg format.Message, bundle Bundle) { jww.ERROR.Printf("empty trigger list for %s", ecrMsg.GetIdentityFP()) // get preimage } - m.garbledStore.Remove(ecrMsg) - return + return true } else { // TODO: delete this else block because it should not be needed. jww.INFO.Printf("checking backup %v", preimage.MakeDefault(identity.Source)) @@ -106,7 +102,6 @@ func (m *Manager) handleMessage(ecrMsg format.Message, bundle Bundle) { im := fmt.Sprintf("Garbled/RAW Message: keyFP: %v, round: %d"+ "msgDigest: %s, not determined to be for client", ecrMsg.GetKeyFP(), bundle.Round, ecrMsg.Digest()) - m.Internal.Events.Report(1, "MessageReception", "Garbled", im) - //denote as active in garbled - m.garbledStore.Failed(ecrMsg) + p.events.Report(1, "MessageReception", "Garbled", im) + return false } diff --git a/network/message/inProgress.go b/network/message/inProgress.go index da4c8fea8..b96d85d36 100644 --- a/network/message/inProgress.go +++ b/network/message/inProgress.go @@ -23,9 +23,9 @@ import ( // CheckInProgressMessages triggers rechecking all in progress messages // if the queue is not full Exposed on the network pickup -func (m *pickup) CheckInProgressMessages() { +func (p *pickup) CheckInProgressMessages() { select { - case m.checkInProgress <- struct{}{}: + case p.checkInProgress <- struct{}{}: default: jww.WARN.Println("Failed to check garbled messages " + "due to full channel") @@ -33,23 +33,23 @@ func (m *pickup) CheckInProgressMessages() { } //long running thread which processes messages that need to be checked -func (m *pickup) recheckInProgressRunner(stop *stoppable.Single) { +func (p *pickup) recheckInProgressRunner(stop *stoppable.Single) { for { select { case <-stop.Quit(): stop.ToStopped() return - case <-m.checkInProgress: + case <-p.checkInProgress: jww.INFO.Printf("[GARBLE] Checking Garbled messages") - m.recheckInProgress() + p.recheckInProgress() } } } //handler for a single run of recheck messages -func (m *pickup) recheckInProgress() { +func (p *pickup) recheckInProgress() { //try to decrypt every garbled message, excising those who's counts are too high - for grbldMsg, ri, identity, has := m.inProcess.Next(); has; grbldMsg, ri, identity, has = m.inProcess.Next() { + for grbldMsg, ri, identity, has := p.inProcess.Next(); has; grbldMsg, ri, identity, has = p.inProcess.Next() { bundle := Bundle{ Round: id.Round(ri.ID), RoundInfo: ri, @@ -58,7 +58,7 @@ func (m *pickup) recheckInProgress() { Identity: identity, } select { - case m.messageReception <- bundle: + case p.messageReception <- bundle: default: jww.WARN.Printf("failed to send bundle, channel full") diff --git a/network/message/garbled_test.go b/network/message/inProgress_test.go similarity index 98% rename from network/message/garbled_test.go rename to network/message/inProgress_test.go index 5aeed7ce1..f5c76b057 100644 --- a/network/message/garbled_test.go +++ b/network/message/inProgress_test.go @@ -80,8 +80,8 @@ func TestManager_CheckGarbledMessages(t *testing.T) { m := NewManager(i, params.Network{Messages: params.Messages{ MessageReceptionBuffLen: 20, MessageReceptionWorkerPoolSize: 20, - MaxChecksGarbledMessage: 20, - GarbledMessageWait: time.Hour, + MaxChecksInProcessMessage: 20, + InProcessMessageWait: time.Hour, }}, nil, sender) rng := csprng.NewSystemRNG() diff --git a/network/message/manager.go b/network/message/manager.go index 1da8c8bee..8afaf5c67 100644 --- a/network/message/manager.go +++ b/network/message/manager.go @@ -115,24 +115,24 @@ func NewManager(param params.Network, } //Gets the channel to send received messages on -func (m *pickup) GetMessageReceptionChannel() chan<- Bundle { - return m.messageReception +func (p *pickup) GetMessageReceptionChannel() chan<- Bundle { + return p.messageReception } //Starts all worker pool -func (m *pickup) StartProcessies() stoppable.Stoppable { +func (p *pickup) StartProcessies() stoppable.Stoppable { multi := stoppable.NewMulti("MessageReception") //create the message handler workers - for i := uint(0); i < m.param.MessageReceptionWorkerPoolSize; i++ { + for i := uint(0); i < p.param.MessageReceptionWorkerPoolSize; i++ { stop := stoppable.NewSingle(fmt.Sprintf("MessageReception Worker %v", i)) - go m.handleMessages(stop) + go p.handleMessages(stop) multi.Add(stop) } //create the in progress messages thread garbledStop := stoppable.NewSingle("GarbledMessages") - go m.recheckInProgressRunner(garbledStop) + go p.recheckInProgressRunner(garbledStop) multi.Add(garbledStop) return multi -- GitLab