diff --git a/interfaces/params/message.go b/interfaces/params/message.go index 66371a7797d1c647c3a7efe96d801f4980e008fd..70ef9ad8efbb678a15afdf58bd2cbb6eb72035de 100644 --- a/interfaces/params/message.go +++ b/interfaces/params/message.go @@ -14,8 +14,8 @@ import ( type Messages struct { MessageReceptionBuffLen uint MessageReceptionWorkerPoolSize uint - MaxChecksGarbledMessage uint - GarbledMessageWait time.Duration + MaxChecksInProcessMessage uint + InProcessMessageWait time.Duration RealtimeOnly bool } @@ -23,8 +23,8 @@ func GetDefaultMessage() Messages { return Messages{ MessageReceptionBuffLen: 500, MessageReceptionWorkerPoolSize: 4, - MaxChecksGarbledMessage: 10, - GarbledMessageWait: 15 * time.Minute, + MaxChecksInProcessMessage: 10, + InProcessMessageWait: 15 * time.Minute, RealtimeOnly: false, } } diff --git a/network/message/bundle.go b/network/message/bundle.go index 81c649bd3798d693cd451dd7322d9d83e95510d9..a60d070ae2f556df96b80639cdb754603818c220 100644 --- a/network/message/bundle.go +++ b/network/message/bundle.go @@ -8,7 +8,7 @@ package message import ( - "gitlab.com/elixxir/client/storage/reception" + "gitlab.com/elixxir/client/interfaces" pb "gitlab.com/elixxir/comms/mixmessages" "gitlab.com/elixxir/primitives/format" "gitlab.com/xx_network/primitives/id" @@ -19,5 +19,5 @@ type Bundle struct { RoundInfo *pb.RoundInfo Messages []format.Message Finish func() - Identity reception.IdentityUse + Identity interfaces.Identity } diff --git a/network/message/handler.go b/network/message/handler.go index 0a051e0791cfae0512baed311647ae16ccb8851f..182188a614d76f407db7e9c8ea3e0aace9957bc7 100644 --- a/network/message/handler.go +++ b/network/message/handler.go @@ -9,74 +9,71 @@ package message import ( "fmt" - "sync" - "time" - jww "github.com/spf13/jwalterweatherman" "gitlab.com/elixxir/client/interfaces" - "gitlab.com/elixxir/client/interfaces/message" "gitlab.com/elixxir/client/interfaces/preimage" "gitlab.com/elixxir/client/stoppable" - "gitlab.com/elixxir/client/storage/edge" - "gitlab.com/elixxir/crypto/e2e" fingerprint2 "gitlab.com/elixxir/crypto/fingerprint" "gitlab.com/elixxir/primitives/format" - "gitlab.com/elixxir/primitives/states" - "gitlab.com/xx_network/primitives/id" + "gitlab.com/xx_network/primitives/netTime" + "sync" ) -func (m *Manager) handleMessages(stop *stoppable.Single) { +func (p *pickup) handleMessages(stop *stoppable.Single) { for { select { case <-stop.Quit(): stop.ToStopped() return - case bundle := <-m.messageReception: - go func(){ - wg := sync.WaitGroup{} - wg.Add(len(bundle.Messages)) - for _, msg := range bundle.Messages { - go func() { - m.handleMessage(msg, bundle) - wg.Done() - }() - } - wg.Wait() - bundle.Finish() - }() + case bundle := <-p.messageReception: + go func() { + wg := sync.WaitGroup{} + wg.Add(len(bundle.Messages)) + for i := range bundle.Messages { + msg := bundle.Messages[i] + go func() { + count, ts := p.inProcess.Add(msg, bundle.RoundInfo, bundle.Identity) + wg.Done() + success := p.handleMessage(msg, bundle) + if success { + p.inProcess.Remove(msg, bundle.RoundInfo, bundle.Identity) + } else { + // fail the message if any part of the decryption fails, + // unless it is the last attempts and has been in the buffer long + // enough, in which case remove it + if count == p.param.MaxChecksInProcessMessage && + netTime.Since(ts) > p.param.InProcessMessageWait { + p.inProcess.Remove(msg, bundle.RoundInfo, bundle.Identity) + } else { + p.inProcess.Failed(msg, bundle.RoundInfo, bundle.Identity) + } - } + } + }() + } + wg.Wait() + bundle.Finish() + }() } } } -func (m *Manager) handleMessage(ecrMsg format.Message, bundle Bundle) { +func (p *pickup) handleMessage(ecrMsg format.Message, bundle Bundle) bool { fingerprint := ecrMsg.GetKeyFP() - // msgDigest := ecrMsg.Digest() identity := bundle.Identity - round := bundle.RoundInfo - // newID := *id.ID{} // todo use new id systme from ticket - // { - // ID id.ID - // ephID ephemeral.Id - // } - - //save to garbled - m.garbledStore.Add(ecrMsg) var receptionID interfaces.Identity // If we have a fingerprint, process it. - if proc, exists := m.pop(fingerprint); exists { + if proc, exists := p.pop(identity.Source, fingerprint); exists { proc.Process(ecrMsg, receptionID, round) - m.garbledStore.Remove(ecrMsg) - return + return true } - triggers, exists := m.get(ecrMsg.GetIdentityFP(), ecrMsg.GetContents()) + triggers, exists := p.get(identity.Source, ecrMsg.GetIdentityFP(), ecrMsg.GetContents()) if exists { for _, t := range triggers { go t.Process(ecrMsg, receptionID, round) @@ -85,8 +82,7 @@ func (m *Manager) handleMessage(ecrMsg format.Message, bundle Bundle) { jww.ERROR.Printf("empty trigger list for %s", ecrMsg.GetIdentityFP()) // get preimage } - m.garbledStore.Remove(ecrMsg) - return + return true } else { // TODO: delete this else block because it should not be needed. jww.INFO.Printf("checking backup %v", preimage.MakeDefault(identity.Source)) @@ -106,7 +102,6 @@ func (m *Manager) handleMessage(ecrMsg format.Message, bundle Bundle) { im := fmt.Sprintf("Garbled/RAW Message: keyFP: %v, round: %d"+ "msgDigest: %s, not determined to be for client", ecrMsg.GetKeyFP(), bundle.Round, ecrMsg.Digest()) - m.Internal.Events.Report(1, "MessageReception", "Garbled", im) - //denote as active in garbled - m.garbledStore.Failed(ecrMsg) + p.events.Report(1, "MessageReception", "Garbled", im) + return false } diff --git a/network/message/inProgress.go b/network/message/inProgress.go index da4c8fea897149e044737923e995d453e4c99e10..b96d85d369533a372b298c88ff853423e6a443a2 100644 --- a/network/message/inProgress.go +++ b/network/message/inProgress.go @@ -23,9 +23,9 @@ import ( // CheckInProgressMessages triggers rechecking all in progress messages // if the queue is not full Exposed on the network pickup -func (m *pickup) CheckInProgressMessages() { +func (p *pickup) CheckInProgressMessages() { select { - case m.checkInProgress <- struct{}{}: + case p.checkInProgress <- struct{}{}: default: jww.WARN.Println("Failed to check garbled messages " + "due to full channel") @@ -33,23 +33,23 @@ func (m *pickup) CheckInProgressMessages() { } //long running thread which processes messages that need to be checked -func (m *pickup) recheckInProgressRunner(stop *stoppable.Single) { +func (p *pickup) recheckInProgressRunner(stop *stoppable.Single) { for { select { case <-stop.Quit(): stop.ToStopped() return - case <-m.checkInProgress: + case <-p.checkInProgress: jww.INFO.Printf("[GARBLE] Checking Garbled messages") - m.recheckInProgress() + p.recheckInProgress() } } } //handler for a single run of recheck messages -func (m *pickup) recheckInProgress() { +func (p *pickup) recheckInProgress() { //try to decrypt every garbled message, excising those who's counts are too high - for grbldMsg, ri, identity, has := m.inProcess.Next(); has; grbldMsg, ri, identity, has = m.inProcess.Next() { + for grbldMsg, ri, identity, has := p.inProcess.Next(); has; grbldMsg, ri, identity, has = p.inProcess.Next() { bundle := Bundle{ Round: id.Round(ri.ID), RoundInfo: ri, @@ -58,7 +58,7 @@ func (m *pickup) recheckInProgress() { Identity: identity, } select { - case m.messageReception <- bundle: + case p.messageReception <- bundle: default: jww.WARN.Printf("failed to send bundle, channel full") diff --git a/network/message/garbled_test.go b/network/message/inProgress_test.go similarity index 98% rename from network/message/garbled_test.go rename to network/message/inProgress_test.go index 5aeed7ce154a0f86f216eceff2c641decb34eddd..f5c76b05717169b148fbfe9b52ea625838805c94 100644 --- a/network/message/garbled_test.go +++ b/network/message/inProgress_test.go @@ -80,8 +80,8 @@ func TestManager_CheckGarbledMessages(t *testing.T) { m := NewManager(i, params.Network{Messages: params.Messages{ MessageReceptionBuffLen: 20, MessageReceptionWorkerPoolSize: 20, - MaxChecksGarbledMessage: 20, - GarbledMessageWait: time.Hour, + MaxChecksInProcessMessage: 20, + InProcessMessageWait: time.Hour, }}, nil, sender) rng := csprng.NewSystemRNG() diff --git a/network/message/manager.go b/network/message/manager.go index 1da8c8beeb79af192e174df83a083d3d80088005..8afaf5c67ca9fb771c8484829602ba3c7d1f36f6 100644 --- a/network/message/manager.go +++ b/network/message/manager.go @@ -115,24 +115,24 @@ func NewManager(param params.Network, } //Gets the channel to send received messages on -func (m *pickup) GetMessageReceptionChannel() chan<- Bundle { - return m.messageReception +func (p *pickup) GetMessageReceptionChannel() chan<- Bundle { + return p.messageReception } //Starts all worker pool -func (m *pickup) StartProcessies() stoppable.Stoppable { +func (p *pickup) StartProcessies() stoppable.Stoppable { multi := stoppable.NewMulti("MessageReception") //create the message handler workers - for i := uint(0); i < m.param.MessageReceptionWorkerPoolSize; i++ { + for i := uint(0); i < p.param.MessageReceptionWorkerPoolSize; i++ { stop := stoppable.NewSingle(fmt.Sprintf("MessageReception Worker %v", i)) - go m.handleMessages(stop) + go p.handleMessages(stop) multi.Add(stop) } //create the in progress messages thread garbledStop := stoppable.NewSingle("GarbledMessages") - go m.recheckInProgressRunner(garbledStop) + go p.recheckInProgressRunner(garbledStop) multi.Add(garbledStop) return multi