Skip to content
Snippets Groups Projects
Commit 90d9a6e7 authored by Richard T. Carback III's avatar Richard T. Carback III
Browse files

Merge branch 'hotfix/noRetry' into 'release'

activated the unchecked rounds process and removed the realtime mode becasue...

See merge request !452
parents 121dc3f3 8dcb19b3
No related branches found
No related tags found
2 merge requests!510Release,!452activated the unchecked rounds process and removed the realtime mode becasue...
......@@ -417,7 +417,7 @@ func (c *client) follow(identity receptionID.IdentityUse,
// remaining
earliestRemaining, roundsWithMessages, roundsUnknown :=
gwRoundsState.RangeUnchecked(
updatedEarliestRound, c.param.KnownRoundsThreshold, roundChecker)
updatedEarliestRound, c.param.KnownRoundsThreshold, roundChecker, 100)
jww.DEBUG.Printf("Processed RangeUnchecked for %d, Oldest: %d, "+
"firstUnchecked: %d, last Checked: %d, threshold: %d, "+
......@@ -436,16 +436,15 @@ func (c *client) follow(identity receptionID.IdentityUse,
var roundsWithMessages2 []id.Round
if !c.param.RealtimeOnly {
roundsWithMessages2 = identity.UR.Iterate(func(rid id.Round) bool {
if gwRoundsState.Checked(rid) {
return Checker(rid, filterList, identity.CR)
}
return false
}, roundsUnknown, abandon)
}
for _, rid := range roundsWithMessages {
for i := 0; i < len(roundsWithMessages); i++ {
rid := roundsWithMessages[i]
// Denote that the round has been looked at in the tracking store
if identity.CR.Check(rid) {
c.GetMessagesFromRound(rid, identity.EphemeralIdentity)
......
......@@ -35,7 +35,7 @@ type paramsDisk struct {
func GetDefaultParams() Params {
return Params{
MessageReceptionBuffLen: 500,
MessageReceptionWorkerPoolSize: 4,
MessageReceptionWorkerPoolSize: 2,
MaxChecksInProcessMessage: 10,
InProcessMessageWait: 15 * time.Minute,
RealtimeOnly: false,
......
......@@ -52,9 +52,6 @@ type Params struct {
// for debugging.
VerboseRoundTracking bool
// RealtimeOnly disables all attempts to pick up dropped or missed messages.
RealtimeOnly bool
// ReplayRequests Resends auth requests up the stack if received multiple
// times.
ReplayRequests bool
......@@ -96,7 +93,7 @@ type paramsDisk struct {
// default parameters.
func GetDefaultParams() Params {
n := Params{
TrackNetworkPeriod: 100 * time.Millisecond,
TrackNetworkPeriod: 1000 * time.Millisecond,
MaxCheckedRounds: 500,
RegNodesBufferLen: 1000,
NetworkHealthTimeout: 15 * time.Second,
......@@ -104,7 +101,6 @@ func GetDefaultParams() Params {
KnownRoundsThreshold: 1500, // 5 rounds/sec * 60 sec/min * 5 min
FastPolling: true,
VerboseRoundTracking: false,
RealtimeOnly: false,
ReplayRequests: true,
MaxParallelIdentityTracks: 5,
ClockSkewClamp: 50 * time.Millisecond,
......@@ -141,7 +137,6 @@ func (p Params) MarshalJSON() ([]byte, error) {
KnownRoundsThreshold: p.KnownRoundsThreshold,
FastPolling: p.FastPolling,
VerboseRoundTracking: p.VerboseRoundTracking,
RealtimeOnly: p.RealtimeOnly,
ReplayRequests: p.ReplayRequests,
Rounds: p.Rounds,
Pickup: p.Pickup,
......@@ -170,7 +165,6 @@ func (p *Params) UnmarshalJSON(data []byte) error {
KnownRoundsThreshold: pDisk.KnownRoundsThreshold,
FastPolling: pDisk.FastPolling,
VerboseRoundTracking: pDisk.VerboseRoundTracking,
RealtimeOnly: pDisk.RealtimeOnly,
ReplayRequests: pDisk.ReplayRequests,
Rounds: pDisk.Rounds,
Pickup: pDisk.Pickup,
......@@ -182,13 +176,6 @@ func (p *Params) UnmarshalJSON(data []byte) error {
return nil
}
func (p Params) SetRealtimeOnlyAll() Params {
p.RealtimeOnly = true
p.Pickup.RealtimeOnly = true
p.Message.RealtimeOnly = true
return p
}
const DefaultDebugTag = "External"
type CMIXParams struct {
......
......@@ -59,14 +59,12 @@ func (m *pickup) GetMessagesFromRound(
identity.Source)
// store the round as an un-retrieved round
if !m.params.RealtimeOnly {
err = m.unchecked.AddRound(roundID, ri,
identity.Source, identity.EphId)
if err != nil {
jww.FATAL.Panicf(
"Failed to denote Unchecked Round for round %d", roundID)
}
}
// If found, send to Message Retrieval Workers
m.lookupRoundMessages <- roundLookup{
......
......@@ -35,9 +35,6 @@ type Params struct {
// tried
SendTimeout time.Duration
// Disables all attempts to pick up dropped or missed messages
RealtimeOnly bool
// Toggles if historical rounds should always be used
ForceHistoricalRounds bool
}
......@@ -57,13 +54,12 @@ type paramsDisk struct {
// GetDefaultParams returns a default set of Params.
func GetDefaultParams() Params {
return Params{
NumMessageRetrievalWorkers: 4,
LookupRoundsBufferLen: 2000,
MaxHistoricalRoundsRetries: 3,
UncheckRoundPeriod: 20 * time.Second,
NumMessageRetrievalWorkers: 2,
LookupRoundsBufferLen: 100000,
MaxHistoricalRoundsRetries: 2,
UncheckRoundPeriod: 120 * time.Second,
ForceMessagePickupRetry: false,
SendTimeout: 3 * time.Second,
RealtimeOnly: false,
}
}
......@@ -89,7 +85,6 @@ func (p Params) MarshalJSON() ([]byte, error) {
UncheckRoundPeriod: p.UncheckRoundPeriod,
ForceMessagePickupRetry: p.ForceMessagePickupRetry,
SendTimeout: p.SendTimeout,
RealtimeOnly: p.RealtimeOnly,
ForceHistoricalRounds: p.ForceHistoricalRounds,
}
......@@ -112,7 +107,6 @@ func (p *Params) UnmarshalJSON(data []byte) error {
UncheckRoundPeriod: pDisk.UncheckRoundPeriod,
ForceMessagePickupRetry: pDisk.ForceMessagePickupRetry,
SendTimeout: pDisk.SendTimeout,
RealtimeOnly: pDisk.RealtimeOnly,
ForceHistoricalRounds: pDisk.ForceHistoricalRounds,
}
......
......@@ -80,12 +80,10 @@ func (m *pickup) StartProcessors() stoppable.Stoppable {
}
// Start the periodic unchecked round worker
if !m.params.RealtimeOnly {
stopper := stoppable.NewSingle("UncheckRound")
go m.processUncheckedRounds(
m.params.UncheckRoundPeriod, backOffTable, stopper)
multi.Add(stopper)
}
return multi
}
......@@ -53,7 +53,6 @@ func (m *pickup) processMessageRetrieval(comms MessageRetrievalComms,
ri := rl.Round
jww.DEBUG.Printf("Checking for messages in round %d", ri.ID)
if !m.params.RealtimeOnly {
err := m.unchecked.AddRound(id.Round(ri.ID), ri.Raw,
rl.Identity.Source, rl.Identity.EphId)
if err != nil {
......@@ -61,7 +60,6 @@ func (m *pickup) processMessageRetrieval(comms MessageRetrievalComms,
"Failed to denote Unchecked Round for round %d",
id.Round(ri.ID))
}
}
// Convert gateways in round to proper ID format
gwIds := make([]*id.ID, ri.Topology.Len())
......@@ -80,7 +78,7 @@ func (m *pickup) processMessageRetrieval(comms MessageRetrievalComms,
// first, randomize other members of the team
var rndBytes [32]byte
stream := m.rng.GetStream()
_, err := stream.Read(rndBytes[:])
_, err = stream.Read(rndBytes[:])
stream.Close()
if err != nil {
jww.FATAL.Panicf("Failed to randomize shuffle in round %d "+
......@@ -142,14 +140,12 @@ func (m *pickup) processMessageRetrieval(comms MessageRetrievalComms,
m.messageBundles <- bundle
jww.DEBUG.Printf("Removing round %d from unchecked store", ri.ID)
if !m.params.RealtimeOnly {
err = m.unchecked.Remove(
id.Round(ri.ID), rl.Identity.Source, rl.Identity.EphId)
if err != nil {
jww.ERROR.Printf("Could not remove round %d from "+
"unchecked rounds store: %v", ri.ID, err)
}
}
}
......@@ -212,11 +208,10 @@ func (m *pickup) getMessagesFromGateway(roundID id.Round,
" in round %d. This happening every once in a while is normal,"+
" but can be indicative of a problem if it is consistent",
identity.Source, roundID)
if m.params.RealtimeOnly {
err = m.unchecked.Remove(roundID, identity.Source, identity.EphId)
err = m.unchecked.EndCheck(roundID, identity.Source, identity.EphId)
if err != nil {
jww.ERROR.Printf("Failed to remove round %d: %+v", roundID, err)
}
jww.ERROR.Printf("Failed to end the check for the round round %d: %+v", roundID, err)
}
return message.Bundle{}, nil
......
......@@ -133,6 +133,9 @@ func (s *UncheckedRoundStore) IterateOverList(iterator func(rid id.Round,
defer s.mux.RUnlock()
for _, rnd := range s.list {
if rnd.beingChecked {
continue
}
jww.DEBUG.Printf("Round for lookup: %d, %+v\n", rnd.Id, rnd)
go func(localRid id.Round, localRnd UncheckedRound) {
iterator(localRid, localRnd)
......@@ -140,6 +143,23 @@ func (s *UncheckedRoundStore) IterateOverList(iterator func(rid id.Round,
}
}
// EndCheck increments the amount of checks performed on this stored
// round.
func (s *UncheckedRoundStore) EndCheck(rid id.Round, recipient *id.ID,
ephId ephemeral.Id) error {
s.mux.Lock()
defer s.mux.Unlock()
nri := newRoundIdentity(rid, recipient, ephId)
rnd, exists := s.list[nri]
if !exists {
return errors.Errorf("round %d could not be found in RAM", rid)
}
rnd.beingChecked = false
s.list[nri] = rnd
return nil
}
// IncrementCheck increments the amount of checks performed on this stored
// round.
func (s *UncheckedRoundStore) IncrementCheck(rid id.Round, recipient *id.ID,
......@@ -165,6 +185,7 @@ func (s *UncheckedRoundStore) IncrementCheck(rid id.Round, recipient *id.ID,
// Update the rounds state
rnd.LastCheck = netTime.Now()
rnd.NumChecks++
rnd.storageUpToDate = false
s.list[nri] = rnd
return s.save()
}
......
......@@ -55,17 +55,21 @@ type UncheckedRound struct {
LastCheck time.Time
// Number of times a round has been checked
NumChecks uint64
storageUpToDate bool
beingChecked bool
}
// marshal serializes UncheckedRound r into a byte slice.
func (r UncheckedRound) marshal(kv *versioned.KV) ([]byte, error) {
buf := bytes.NewBuffer(nil)
// Store teh round info
if r.Info != nil {
if r.Info != nil && !r.storageUpToDate {
if err := storeRoundInfo(kv, r.Info, r.Source, r.EpdId); err != nil {
return nil, errors.WithMessagef(err,
"failed to marshal unchecked rounds")
}
r.storageUpToDate = true
}
// Marshal the round ID
......@@ -126,6 +130,7 @@ func (r *UncheckedRound) unmarshal(kv *versioned.KV, buff *bytes.Buffer) error {
r.NumChecks = binary.LittleEndian.Uint64(buff.Next(uint64Size))
r.Info, _ = loadRoundInfo(kv, r.Id, r.Source, r.EpdId)
r.storageUpToDate = true
return nil
}
......
......@@ -46,13 +46,6 @@ func (m *pickup) processUncheckedRounds(checkInterval time.Duration,
ticker := time.NewTicker(checkInterval)
uncheckedRoundStore := m.unchecked
for {
select {
case <-stop.Quit():
ticker.Stop()
stop.ToStopped()
return
case <-ticker.C:
iterator := func(rid id.Round, rnd store.UncheckedRound) {
jww.DEBUG.Printf(
"Checking if round %d is due for a message lookup.", rid)
......@@ -81,6 +74,12 @@ func (m *pickup) processUncheckedRounds(checkInterval time.Duration,
// Pull and iterate through uncheckedRound list
m.unchecked.IterateOverList(iterator)
select {
case <-stop.Quit():
ticker.Stop()
stop.ToStopped()
return
case <-ticker.C:
}
}
}
......
......@@ -210,7 +210,8 @@ func processHistoricalRoundsResponse(response *pb.HistoricalRoundsResponse,
// The interface has missing returns returned as nil, such roundRequests
// need to be removed as processing so that the network follower will
// pick them up in the future.
if roundInfo == nil {
if roundInfo == nil || roundInfo.ID == 0 ||
roundInfo.Topology == nil || len(roundInfo.Topology) == 0 {
var errMsg string
roundRequests[i].numAttempts++
......
......@@ -17,7 +17,7 @@ require (
gitlab.com/elixxir/comms v0.0.4-0.20221108183306-7c473bca3cd6
gitlab.com/elixxir/crypto v0.0.7-0.20221108183134-3f4a29c68e97
gitlab.com/elixxir/ekv v0.2.1
gitlab.com/elixxir/primitives v0.0.3-0.20221108183049-7bdff545bf8e
gitlab.com/elixxir/primitives v0.0.3-0.20221109183528-7cbb24d14a5b
gitlab.com/xx_network/comms v0.0.4-0.20221108183032-8732b403e506
gitlab.com/xx_network/crypto v0.0.5-0.20221108182939-05998dc093e9
gitlab.com/xx_network/primitives v0.0.4-0.20221104175439-f0d440beacd9
......
......@@ -491,6 +491,8 @@ gitlab.com/elixxir/ekv v0.2.1 h1:dtwbt6KmAXG2Tik5d60iDz2fLhoFBgWwST03p7T+9Is=
gitlab.com/elixxir/ekv v0.2.1/go.mod h1:USLD7xeDnuZEavygdrgzNEwZXeLQJK/w1a+htpN+JEU=
gitlab.com/elixxir/primitives v0.0.3-0.20221108183049-7bdff545bf8e h1:U9aMSBhc2ShJEeCXHNoUPVwrxKAGRhbGZWsbqES8iGA=
gitlab.com/elixxir/primitives v0.0.3-0.20221108183049-7bdff545bf8e/go.mod h1:KeMEPwtmA0zJpetvv5SeyOePIAzsBmQSwJ2nAHYnczw=
gitlab.com/elixxir/primitives v0.0.3-0.20221109183528-7cbb24d14a5b h1:d9FdXv6CB5elYH3cYMcgnRkRGLYm7SPPneSCeQGUsqs=
gitlab.com/elixxir/primitives v0.0.3-0.20221109183528-7cbb24d14a5b/go.mod h1:KeMEPwtmA0zJpetvv5SeyOePIAzsBmQSwJ2nAHYnczw=
gitlab.com/xx_network/comms v0.0.4-0.20221108183032-8732b403e506 h1:QoFyWFGuUquPvam/rHvnUEDdTQ3DzIhxn3x8WTWxXMA=
gitlab.com/xx_network/comms v0.0.4-0.20221108183032-8732b403e506/go.mod h1:/zveWGwSqQbPhPSiGIbS3v78nga8nD5XD1oXr0vD/Zc=
gitlab.com/xx_network/crypto v0.0.5-0.20221108182939-05998dc093e9 h1:A/gYzA36bQxj6tRczs11JEUsWzaLmTRXN89tlqWEOUc=
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment