diff --git a/cmix/follow.go b/cmix/follow.go index 87f45b1314eb91ce00b002c9fbb3f3fc9a0da707..ceda2004f11eb8a24a21fa8ed8233f6a73feb7cf 100644 --- a/cmix/follow.go +++ b/cmix/follow.go @@ -417,7 +417,7 @@ func (c *client) follow(identity receptionID.IdentityUse, // remaining earliestRemaining, roundsWithMessages, roundsUnknown := gwRoundsState.RangeUnchecked( - updatedEarliestRound, c.param.KnownRoundsThreshold, roundChecker) + updatedEarliestRound, c.param.KnownRoundsThreshold, roundChecker, 100) jww.DEBUG.Printf("Processed RangeUnchecked for %d, Oldest: %d, "+ "firstUnchecked: %d, last Checked: %d, threshold: %d, "+ @@ -436,16 +436,15 @@ func (c *client) follow(identity receptionID.IdentityUse, var roundsWithMessages2 []id.Round - if !c.param.RealtimeOnly { - roundsWithMessages2 = identity.UR.Iterate(func(rid id.Round) bool { - if gwRoundsState.Checked(rid) { - return Checker(rid, filterList, identity.CR) - } - return false - }, roundsUnknown, abandon) - } + roundsWithMessages2 = identity.UR.Iterate(func(rid id.Round) bool { + if gwRoundsState.Checked(rid) { + return Checker(rid, filterList, identity.CR) + } + return false + }, roundsUnknown, abandon) - for _, rid := range roundsWithMessages { + for i := 0; i < len(roundsWithMessages); i++ { + rid := roundsWithMessages[i] // Denote that the round has been looked at in the tracking store if identity.CR.Check(rid) { c.GetMessagesFromRound(rid, identity.EphemeralIdentity) diff --git a/cmix/message/params.go b/cmix/message/params.go index 61bc149c329f920e8afe523c7d0c3779b282d1fd..2b63ee6d21c8cdc0a1aab3aba13b774b2d8449a7 100644 --- a/cmix/message/params.go +++ b/cmix/message/params.go @@ -35,7 +35,7 @@ type paramsDisk struct { func GetDefaultParams() Params { return Params{ MessageReceptionBuffLen: 500, - MessageReceptionWorkerPoolSize: 4, + MessageReceptionWorkerPoolSize: 2, MaxChecksInProcessMessage: 10, InProcessMessageWait: 15 * time.Minute, RealtimeOnly: false, diff --git a/cmix/params.go b/cmix/params.go index ec4148ea782f62079898dc829f0b28a495cd9032..01232336446cf9c5a51a711b7aaa6c8c3ea6c90a 100644 --- a/cmix/params.go +++ b/cmix/params.go @@ -52,9 +52,6 @@ type Params struct { // for debugging. VerboseRoundTracking bool - // RealtimeOnly disables all attempts to pick up dropped or missed messages. - RealtimeOnly bool - // ReplayRequests Resends auth requests up the stack if received multiple // times. ReplayRequests bool @@ -96,7 +93,7 @@ type paramsDisk struct { // default parameters. func GetDefaultParams() Params { n := Params{ - TrackNetworkPeriod: 100 * time.Millisecond, + TrackNetworkPeriod: 1000 * time.Millisecond, MaxCheckedRounds: 500, RegNodesBufferLen: 1000, NetworkHealthTimeout: 15 * time.Second, @@ -104,7 +101,6 @@ func GetDefaultParams() Params { KnownRoundsThreshold: 1500, // 5 rounds/sec * 60 sec/min * 5 min FastPolling: true, VerboseRoundTracking: false, - RealtimeOnly: false, ReplayRequests: true, MaxParallelIdentityTracks: 5, ClockSkewClamp: 50 * time.Millisecond, @@ -141,7 +137,6 @@ func (p Params) MarshalJSON() ([]byte, error) { KnownRoundsThreshold: p.KnownRoundsThreshold, FastPolling: p.FastPolling, VerboseRoundTracking: p.VerboseRoundTracking, - RealtimeOnly: p.RealtimeOnly, ReplayRequests: p.ReplayRequests, Rounds: p.Rounds, Pickup: p.Pickup, @@ -170,7 +165,6 @@ func (p *Params) UnmarshalJSON(data []byte) error { KnownRoundsThreshold: pDisk.KnownRoundsThreshold, FastPolling: pDisk.FastPolling, VerboseRoundTracking: pDisk.VerboseRoundTracking, - RealtimeOnly: pDisk.RealtimeOnly, ReplayRequests: pDisk.ReplayRequests, Rounds: pDisk.Rounds, Pickup: pDisk.Pickup, @@ -182,13 +176,6 @@ func (p *Params) UnmarshalJSON(data []byte) error { return nil } -func (p Params) SetRealtimeOnlyAll() Params { - p.RealtimeOnly = true - p.Pickup.RealtimeOnly = true - p.Message.RealtimeOnly = true - return p -} - const DefaultDebugTag = "External" type CMIXParams struct { diff --git a/cmix/pickup/get.go b/cmix/pickup/get.go index a31fb36300a2b116ef17ceec9d7eb7e401aef644..af795d0ab2b55421fe3e8094f4a715f82be29f88 100644 --- a/cmix/pickup/get.go +++ b/cmix/pickup/get.go @@ -59,13 +59,11 @@ func (m *pickup) GetMessagesFromRound( identity.Source) // store the round as an un-retrieved round - if !m.params.RealtimeOnly { - err = m.unchecked.AddRound(roundID, ri, - identity.Source, identity.EphId) - if err != nil { - jww.FATAL.Panicf( - "Failed to denote Unchecked Round for round %d", roundID) - } + err = m.unchecked.AddRound(roundID, ri, + identity.Source, identity.EphId) + if err != nil { + jww.FATAL.Panicf( + "Failed to denote Unchecked Round for round %d", roundID) } // If found, send to Message Retrieval Workers diff --git a/cmix/pickup/params.go b/cmix/pickup/params.go index bdddf820e786caebc3c8912fb7a33786eb022ce0..a30a065290b34701d4f11da91594778c9fdbbfcd 100644 --- a/cmix/pickup/params.go +++ b/cmix/pickup/params.go @@ -35,9 +35,6 @@ type Params struct { // tried SendTimeout time.Duration - // Disables all attempts to pick up dropped or missed messages - RealtimeOnly bool - // Toggles if historical rounds should always be used ForceHistoricalRounds bool } @@ -57,13 +54,12 @@ type paramsDisk struct { // GetDefaultParams returns a default set of Params. func GetDefaultParams() Params { return Params{ - NumMessageRetrievalWorkers: 4, - LookupRoundsBufferLen: 2000, - MaxHistoricalRoundsRetries: 3, - UncheckRoundPeriod: 20 * time.Second, + NumMessageRetrievalWorkers: 2, + LookupRoundsBufferLen: 100000, + MaxHistoricalRoundsRetries: 2, + UncheckRoundPeriod: 120 * time.Second, ForceMessagePickupRetry: false, SendTimeout: 3 * time.Second, - RealtimeOnly: false, } } @@ -89,7 +85,6 @@ func (p Params) MarshalJSON() ([]byte, error) { UncheckRoundPeriod: p.UncheckRoundPeriod, ForceMessagePickupRetry: p.ForceMessagePickupRetry, SendTimeout: p.SendTimeout, - RealtimeOnly: p.RealtimeOnly, ForceHistoricalRounds: p.ForceHistoricalRounds, } @@ -112,7 +107,6 @@ func (p *Params) UnmarshalJSON(data []byte) error { UncheckRoundPeriod: pDisk.UncheckRoundPeriod, ForceMessagePickupRetry: pDisk.ForceMessagePickupRetry, SendTimeout: pDisk.SendTimeout, - RealtimeOnly: pDisk.RealtimeOnly, ForceHistoricalRounds: pDisk.ForceHistoricalRounds, } diff --git a/cmix/pickup/pickup.go b/cmix/pickup/pickup.go index 581aeaf285553f55586a5c6006a93dc64f941203..47a3565a4ad9c3ac0f15d8fcf18b2cd6b27d8b45 100644 --- a/cmix/pickup/pickup.go +++ b/cmix/pickup/pickup.go @@ -80,12 +80,10 @@ func (m *pickup) StartProcessors() stoppable.Stoppable { } // Start the periodic unchecked round worker - if !m.params.RealtimeOnly { - stopper := stoppable.NewSingle("UncheckRound") - go m.processUncheckedRounds( - m.params.UncheckRoundPeriod, backOffTable, stopper) - multi.Add(stopper) - } + stopper := stoppable.NewSingle("UncheckRound") + go m.processUncheckedRounds( + m.params.UncheckRoundPeriod, backOffTable, stopper) + multi.Add(stopper) return multi } diff --git a/cmix/pickup/retrieve.go b/cmix/pickup/retrieve.go index 45c8a7e3f66c2d321fbd3882acb40f4ed8e3c9ea..a556fd9f977439f3ef11f1d6c7aadf49630f315d 100644 --- a/cmix/pickup/retrieve.go +++ b/cmix/pickup/retrieve.go @@ -53,14 +53,12 @@ func (m *pickup) processMessageRetrieval(comms MessageRetrievalComms, ri := rl.Round jww.DEBUG.Printf("Checking for messages in round %d", ri.ID) - if !m.params.RealtimeOnly { - err := m.unchecked.AddRound(id.Round(ri.ID), ri.Raw, - rl.Identity.Source, rl.Identity.EphId) - if err != nil { - jww.FATAL.Panicf( - "Failed to denote Unchecked Round for round %d", - id.Round(ri.ID)) - } + err := m.unchecked.AddRound(id.Round(ri.ID), ri.Raw, + rl.Identity.Source, rl.Identity.EphId) + if err != nil { + jww.FATAL.Panicf( + "Failed to denote Unchecked Round for round %d", + id.Round(ri.ID)) } // Convert gateways in round to proper ID format @@ -80,7 +78,7 @@ func (m *pickup) processMessageRetrieval(comms MessageRetrievalComms, // first, randomize other members of the team var rndBytes [32]byte stream := m.rng.GetStream() - _, err := stream.Read(rndBytes[:]) + _, err = stream.Read(rndBytes[:]) stream.Close() if err != nil { jww.FATAL.Panicf("Failed to randomize shuffle in round %d "+ @@ -142,13 +140,11 @@ func (m *pickup) processMessageRetrieval(comms MessageRetrievalComms, m.messageBundles <- bundle jww.DEBUG.Printf("Removing round %d from unchecked store", ri.ID) - if !m.params.RealtimeOnly { - err = m.unchecked.Remove( - id.Round(ri.ID), rl.Identity.Source, rl.Identity.EphId) - if err != nil { - jww.ERROR.Printf("Could not remove round %d from "+ - "unchecked rounds store: %v", ri.ID, err) - } + err = m.unchecked.Remove( + id.Round(ri.ID), rl.Identity.Source, rl.Identity.EphId) + if err != nil { + jww.ERROR.Printf("Could not remove round %d from "+ + "unchecked rounds store: %v", ri.ID, err) } } @@ -212,11 +208,10 @@ func (m *pickup) getMessagesFromGateway(roundID id.Round, " in round %d. This happening every once in a while is normal,"+ " but can be indicative of a problem if it is consistent", identity.Source, roundID) - if m.params.RealtimeOnly { - err = m.unchecked.Remove(roundID, identity.Source, identity.EphId) - if err != nil { - jww.ERROR.Printf("Failed to remove round %d: %+v", roundID, err) - } + + err = m.unchecked.EndCheck(roundID, identity.Source, identity.EphId) + if err != nil { + jww.ERROR.Printf("Failed to end the check for the round round %d: %+v", roundID, err) } return message.Bundle{}, nil diff --git a/cmix/pickup/store/store.go b/cmix/pickup/store/store.go index 88a300494aaa2003c3181c60cec46b6c6e884821..7ff76610178045751158bd9489e52b32bebdbd56 100644 --- a/cmix/pickup/store/store.go +++ b/cmix/pickup/store/store.go @@ -133,6 +133,9 @@ func (s *UncheckedRoundStore) IterateOverList(iterator func(rid id.Round, defer s.mux.RUnlock() for _, rnd := range s.list { + if rnd.beingChecked { + continue + } jww.DEBUG.Printf("Round for lookup: %d, %+v\n", rnd.Id, rnd) go func(localRid id.Round, localRnd UncheckedRound) { iterator(localRid, localRnd) @@ -140,6 +143,23 @@ func (s *UncheckedRoundStore) IterateOverList(iterator func(rid id.Round, } } +// EndCheck increments the amount of checks performed on this stored +// round. +func (s *UncheckedRoundStore) EndCheck(rid id.Round, recipient *id.ID, + ephId ephemeral.Id) error { + s.mux.Lock() + defer s.mux.Unlock() + nri := newRoundIdentity(rid, recipient, ephId) + rnd, exists := s.list[nri] + if !exists { + return errors.Errorf("round %d could not be found in RAM", rid) + } + + rnd.beingChecked = false + s.list[nri] = rnd + return nil +} + // IncrementCheck increments the amount of checks performed on this stored // round. func (s *UncheckedRoundStore) IncrementCheck(rid id.Round, recipient *id.ID, @@ -165,6 +185,7 @@ func (s *UncheckedRoundStore) IncrementCheck(rid id.Round, recipient *id.ID, // Update the rounds state rnd.LastCheck = netTime.Now() rnd.NumChecks++ + rnd.storageUpToDate = false s.list[nri] = rnd return s.save() } diff --git a/cmix/pickup/store/uncheckedRounds.go b/cmix/pickup/store/uncheckedRounds.go index bc62d8a1d64583c44ca690af23aa733ee2e742e8..48dd1e686e7086d6afb8e4007930222fbfb7ab37 100644 --- a/cmix/pickup/store/uncheckedRounds.go +++ b/cmix/pickup/store/uncheckedRounds.go @@ -55,17 +55,21 @@ type UncheckedRound struct { LastCheck time.Time // Number of times a round has been checked NumChecks uint64 + + storageUpToDate bool + beingChecked bool } // marshal serializes UncheckedRound r into a byte slice. func (r UncheckedRound) marshal(kv *versioned.KV) ([]byte, error) { buf := bytes.NewBuffer(nil) // Store teh round info - if r.Info != nil { + if r.Info != nil && !r.storageUpToDate { if err := storeRoundInfo(kv, r.Info, r.Source, r.EpdId); err != nil { return nil, errors.WithMessagef(err, "failed to marshal unchecked rounds") } + r.storageUpToDate = true } // Marshal the round ID @@ -126,6 +130,7 @@ func (r *UncheckedRound) unmarshal(kv *versioned.KV, buff *bytes.Buffer) error { r.NumChecks = binary.LittleEndian.Uint64(buff.Next(uint64Size)) r.Info, _ = loadRoundInfo(kv, r.Id, r.Source, r.EpdId) + r.storageUpToDate = true return nil } diff --git a/cmix/pickup/unchecked.go b/cmix/pickup/unchecked.go index ca289a1ee390bfb7dd9df3c634590d8a54be7752..37a8e2ce57f414094f117bb805ca33a4050dc3c7 100644 --- a/cmix/pickup/unchecked.go +++ b/cmix/pickup/unchecked.go @@ -46,41 +46,40 @@ func (m *pickup) processUncheckedRounds(checkInterval time.Duration, ticker := time.NewTicker(checkInterval) uncheckedRoundStore := m.unchecked for { + iterator := func(rid id.Round, rnd store.UncheckedRound) { + jww.DEBUG.Printf( + "Checking if round %d is due for a message lookup.", rid) + // If this round is due for a round check, send the round over + // to the retrieval thread. If not due, then check next round. + if !isRoundCheckDue(rnd.NumChecks, rnd.LastCheck, backoffTable) { + return + } + + jww.INFO.Printf( + "Round %d due for a message lookup, retrying...", rid) + + // Check if it needs to be processed by historical Rounds + m.GetMessagesFromRound(rid, receptionID.EphemeralIdentity{ + EphId: rnd.EpdId, + Source: rnd.Source, + }) + + // Update the state of the round for next look-up (if needed) + err := uncheckedRoundStore.IncrementCheck(rid, rnd.Source, rnd.EpdId) + if err != nil { + jww.ERROR.Printf("processUncheckedRounds error: Could not "+ + "increment check attempts for round %d: %v", rid, err) + } + } + + // Pull and iterate through uncheckedRound list + m.unchecked.IterateOverList(iterator) select { case <-stop.Quit(): ticker.Stop() stop.ToStopped() return - case <-ticker.C: - iterator := func(rid id.Round, rnd store.UncheckedRound) { - jww.DEBUG.Printf( - "Checking if round %d is due for a message lookup.", rid) - // If this round is due for a round check, send the round over - // to the retrieval thread. If not due, then check next round. - if !isRoundCheckDue(rnd.NumChecks, rnd.LastCheck, backoffTable) { - return - } - - jww.INFO.Printf( - "Round %d due for a message lookup, retrying...", rid) - - // Check if it needs to be processed by historical Rounds - m.GetMessagesFromRound(rid, receptionID.EphemeralIdentity{ - EphId: rnd.EpdId, - Source: rnd.Source, - }) - - // Update the state of the round for next look-up (if needed) - err := uncheckedRoundStore.IncrementCheck(rid, rnd.Source, rnd.EpdId) - if err != nil { - jww.ERROR.Printf("processUncheckedRounds error: Could not "+ - "increment check attempts for round %d: %v", rid, err) - } - } - - // Pull and iterate through uncheckedRound list - m.unchecked.IterateOverList(iterator) } } } diff --git a/cmix/rounds/historical.go b/cmix/rounds/historical.go index 165ce22817fd2e9e01dda3de8c410ab0c174f40c..fb56ed3cc5c63392c17a15b10f9bb4f3b5b4ce87 100644 --- a/cmix/rounds/historical.go +++ b/cmix/rounds/historical.go @@ -210,7 +210,8 @@ func processHistoricalRoundsResponse(response *pb.HistoricalRoundsResponse, // The interface has missing returns returned as nil, such roundRequests // need to be removed as processing so that the network follower will // pick them up in the future. - if roundInfo == nil { + if roundInfo == nil || roundInfo.ID == 0 || + roundInfo.Topology == nil || len(roundInfo.Topology) == 0 { var errMsg string roundRequests[i].numAttempts++ diff --git a/go.mod b/go.mod index f801c6b274cd70c15ad0a93673b045959ef61096..8d315073422fe21c6fb144031b932b9233e93744 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,7 @@ require ( gitlab.com/elixxir/comms v0.0.4-0.20221108183306-7c473bca3cd6 gitlab.com/elixxir/crypto v0.0.7-0.20221108183134-3f4a29c68e97 gitlab.com/elixxir/ekv v0.2.1 - gitlab.com/elixxir/primitives v0.0.3-0.20221108183049-7bdff545bf8e + gitlab.com/elixxir/primitives v0.0.3-0.20221109183528-7cbb24d14a5b gitlab.com/xx_network/comms v0.0.4-0.20221108183032-8732b403e506 gitlab.com/xx_network/crypto v0.0.5-0.20221108182939-05998dc093e9 gitlab.com/xx_network/primitives v0.0.4-0.20221104175439-f0d440beacd9 diff --git a/go.sum b/go.sum index c01af643f75095482c030841cb7c53a8058b6132..c973b41ced0827e7ef1f81bcc15ba1c0353ff8ae 100644 --- a/go.sum +++ b/go.sum @@ -491,6 +491,8 @@ gitlab.com/elixxir/ekv v0.2.1 h1:dtwbt6KmAXG2Tik5d60iDz2fLhoFBgWwST03p7T+9Is= gitlab.com/elixxir/ekv v0.2.1/go.mod h1:USLD7xeDnuZEavygdrgzNEwZXeLQJK/w1a+htpN+JEU= gitlab.com/elixxir/primitives v0.0.3-0.20221108183049-7bdff545bf8e h1:U9aMSBhc2ShJEeCXHNoUPVwrxKAGRhbGZWsbqES8iGA= gitlab.com/elixxir/primitives v0.0.3-0.20221108183049-7bdff545bf8e/go.mod h1:KeMEPwtmA0zJpetvv5SeyOePIAzsBmQSwJ2nAHYnczw= +gitlab.com/elixxir/primitives v0.0.3-0.20221109183528-7cbb24d14a5b h1:d9FdXv6CB5elYH3cYMcgnRkRGLYm7SPPneSCeQGUsqs= +gitlab.com/elixxir/primitives v0.0.3-0.20221109183528-7cbb24d14a5b/go.mod h1:KeMEPwtmA0zJpetvv5SeyOePIAzsBmQSwJ2nAHYnczw= gitlab.com/xx_network/comms v0.0.4-0.20221108183032-8732b403e506 h1:QoFyWFGuUquPvam/rHvnUEDdTQ3DzIhxn3x8WTWxXMA= gitlab.com/xx_network/comms v0.0.4-0.20221108183032-8732b403e506/go.mod h1:/zveWGwSqQbPhPSiGIbS3v78nga8nD5XD1oXr0vD/Zc= gitlab.com/xx_network/crypto v0.0.5-0.20221108182939-05998dc093e9 h1:A/gYzA36bQxj6tRczs11JEUsWzaLmTRXN89tlqWEOUc=