From 23a5a42ec072a4b81426b642aaaeb7e872758427 Mon Sep 17 00:00:00 2001 From: Benjamin Wenger <ben@elixxir.ioo> Date: Thu, 1 Apr 2021 16:01:18 -0700 Subject: [PATCH] removed processign round and updated historical rounds to handle failures and retries --- interfaces/params/rounds.go | 8 +- network/rounds/check.go | 1 + network/rounds/historical.go | 22 +++- network/rounds/manager.go | 12 +- network/rounds/processingRounds.go | 155 ------------------------ network/rounds/processingRounds_test.go | 124 ------------------- network/rounds/retrieve.go | 4 +- network/rounds/utils_test.go | 1 - 8 files changed, 25 insertions(+), 302 deletions(-) delete mode 100644 network/rounds/processingRounds.go delete mode 100644 network/rounds/processingRounds_test.go diff --git a/interfaces/params/rounds.go b/interfaces/params/rounds.go index dbf0d5a00..87fa53fc0 100644 --- a/interfaces/params/rounds.go +++ b/interfaces/params/rounds.go @@ -12,9 +12,6 @@ import ( ) type Rounds struct { - // Maximum number of times to attempt to retrieve a round from a gateway - // before giving up on it - MaxAttemptsCheckingARound uint // Number of historical rounds required to automatically send a historical // rounds query MaxHistoricalRounds uint @@ -31,11 +28,13 @@ type Rounds struct { // Toggles if historical rounds should always be used ForceHistoricalRounds bool + + // Maximum number of times a historical round lookup will be attempted + MaxHistoricalRoundsRetries uint } func GetDefaultRounds() Rounds { return Rounds{ - MaxAttemptsCheckingARound: 5, MaxHistoricalRounds: 100, HistoricalRoundsPeriod: 100 * time.Millisecond, NumMessageRetrievalWorkers: 8, @@ -43,5 +42,6 @@ func GetDefaultRounds() Rounds { HistoricalRoundsBufferLen: 1000, LookupRoundsBufferLen: 2000, ForceHistoricalRounds: false, + MaxHistoricalRoundsRetries: 3, } } diff --git a/network/rounds/check.go b/network/rounds/check.go index c4ffdede2..9deb3ec6c 100644 --- a/network/rounds/check.go +++ b/network/rounds/check.go @@ -61,6 +61,7 @@ func (m *Manager) GetMessagesFromRound(roundID id.Round, identity reception.Iden m.historicalRounds <- historicalRoundRequest{ rid: roundID, identity: identity, + numAttempts: 0, } } else { jww.INFO.Printf("Messages found in round %d for %d (%s), looking "+ diff --git a/network/rounds/historical.go b/network/rounds/historical.go index 1a0f5c100..d5451989c 100644 --- a/network/rounds/historical.go +++ b/network/rounds/historical.go @@ -36,6 +36,7 @@ type historicalRoundsComms interface { type historicalRoundRequest struct { rid id.Round identity reception.IdentityUse + numAttempts uint } // Long running thread which process historical rounds @@ -62,7 +63,6 @@ func (m *Manager) processHistoricalRounds(comm historicalRoundsComms, quitCh <-c select { case m.historicalRounds <- r: default: - m.p.NotProcessing(r.rid, r.identity.EphId, r.identity.Source) } } done = true @@ -123,9 +123,23 @@ func (m *Manager) processHistoricalRounds(comm historicalRoundsComms, quitCh <-c // need be be removes as processing so the network follower will // pick them up in the future. if roundInfo == nil { - jww.ERROR.Printf("Failed to retreive "+ - "historical round %d", roundRequests[i].rid) - m.p.Fail(roundRequests[i].rid, roundRequests[i].identity.EphId, roundRequests[i].identity.Source) + roundRequests[i].numAttempts++ + if roundRequests[i].numAttempts==m.params.MaxHistoricalRoundsRetries{ + jww.ERROR.Printf("Failed to retreive historical " + + "round %d on last attempt, will not try again", + roundRequests[i].rid) + }else{ + select { + case m.historicalRounds <-roundRequests[i]: + jww.WARN.Printf("Failed to retreive historical " + + "round %d, will try up to %d more times", + roundRequests[i].rid, m.params.MaxHistoricalRoundsRetries-roundRequests[i].numAttempts) + default: + jww.WARN.Printf("Failed to retreive historical " + + "round %d, failed to try again, round will not be " + + "retreived", roundRequests[i].rid) + } + } continue } // Successfully retrieved roundRequests are sent to the Message diff --git a/network/rounds/manager.go b/network/rounds/manager.go index c2a3a37b2..177c1c318 100644 --- a/network/rounds/manager.go +++ b/network/rounds/manager.go @@ -13,15 +13,11 @@ import ( "gitlab.com/elixxir/client/network/internal" "gitlab.com/elixxir/client/network/message" "gitlab.com/elixxir/client/stoppable" - "gitlab.com/xx_network/primitives/id" - "gitlab.com/xx_network/primitives/id/ephemeral" ) type Manager struct { params params.Rounds - p *processing - internal.Internal historicalRounds chan historicalRoundRequest @@ -33,7 +29,6 @@ func NewManager(internal internal.Internal, params params.Rounds, bundles chan<- message.Bundle) *Manager { m := &Manager{ params: params, - p: newProcessingRounds(), historicalRounds: make(chan historicalRoundRequest, params.HistoricalRoundsBufferLen), lookupRoundMessages: make(chan roundLookup, params.LookupRoundsBufferLen), @@ -60,9 +55,4 @@ func (m *Manager) StartProcessors() stoppable.Stoppable { multi.Add(stopper) } return multi -} - -func (m *Manager) DeleteProcessingRoundDelete(round id.Round, eph ephemeral.Id, source *id.ID) { - - m.p.Delete(round, eph, source) -} +} \ No newline at end of file diff --git a/network/rounds/processingRounds.go b/network/rounds/processingRounds.go deleted file mode 100644 index 482aeef26..000000000 --- a/network/rounds/processingRounds.go +++ /dev/null @@ -1,155 +0,0 @@ -/////////////////////////////////////////////////////////////////////////////// -// Copyright © 2020 xx network SEZC // -// // -// Use of this source code is governed by a license that can be found in the // -// LICENSE file // -/////////////////////////////////////////////////////////////////////////////// - -package rounds - -// File for storing info about which rounds are processing - -import ( - "crypto/md5" - "encoding/binary" - "fmt" - "gitlab.com/xx_network/primitives/id" - "gitlab.com/xx_network/primitives/id/ephemeral" - "sync" -) - -type Status uint8 - -const ( - NotProcessing Status = iota - Processing - Done -) - -func (s Status) String() string { - switch s { - case NotProcessing: - return "NotProcessing" - case Processing: - return "Processing" - case Done: - return "Done" - default: - return fmt.Sprintf("Unknown Status: %d", s) - } -} - -type status struct { - failCount uint - Status -} - -// processing struct with a lock so it can be managed with concurrent threads. -type processing struct { - rounds map[hashID]*status - sync.RWMutex -} - -type hashID [16]byte - -func makeHashID(round id.Round, eph ephemeral.Id, source *id.ID) hashID { - h := md5.New() - ridbytes := make([]byte, 8) - binary.BigEndian.PutUint64(ridbytes, uint64(round)) - h.Write(ridbytes) - h.Write(eph[:]) - h.Write(source.Bytes()) - - hBytes := h.Sum(nil) - hid := hashID{} - copy(hid[:], hBytes) - return hid -} - -// newProcessingRounds returns a new processing rounds object. -func newProcessingRounds() *processing { - return &processing{ - rounds: make(map[hashID]*status), - } -} - -// Process adds a round to the list of processing rounds. The returned boolean -// is true when the round changes from "not processing" to "processing". The -// returned count is the number of times the round has been processed. -func (pr *processing) Process(round id.Round, eph ephemeral.Id, source *id.ID) (Status, uint) { - hid := makeHashID(round, eph, source) - - pr.Lock() - defer pr.Unlock() - - var rs *status - var ok bool - - if rs, ok = pr.rounds[hid]; ok && rs.Status == NotProcessing { - rs.Status = Processing - return NotProcessing, rs.failCount - } else if !ok { - rs = &status{ - failCount: 0, - Status: Processing, - } - pr.rounds[hid] = rs - return NotProcessing, rs.failCount - } - - return rs.Status, rs.failCount -} - -// IsProcessing determines if a round ID is marked as processing. -func (pr *processing) IsProcessing(round id.Round, eph ephemeral.Id, source *id.ID) bool { - hid := makeHashID(round, eph, source) - pr.RLock() - defer pr.RUnlock() - - if rs, ok := pr.rounds[hid]; ok { - return rs.Status == Processing - } - - return false -} - -// Fail sets a round's processing status to failed and increments its fail -// counter so that it can be retried. -func (pr *processing) Fail(round id.Round, eph ephemeral.Id, source *id.ID) { - hid := makeHashID(round, eph, source) - pr.Lock() - defer pr.Unlock() - if rs, ok := pr.rounds[hid]; ok { - rs.Status = NotProcessing - rs.failCount++ - } -} - -// Done deletes a round from the processing list. -func (pr *processing) Done(round id.Round, eph ephemeral.Id, source *id.ID) { - hid := makeHashID(round, eph, source) - pr.Lock() - defer pr.Unlock() - if rs, ok := pr.rounds[hid]; ok { - rs.Status = Done - } -} - -// NotProcessing sets a round's processing status to failed so that it can be -// retried but does not increment its fail counter. -func (pr *processing) NotProcessing(round id.Round, eph ephemeral.Id, source *id.ID) { - hid := makeHashID(round, eph, source) - pr.Lock() - defer pr.Unlock() - if rs, ok := pr.rounds[hid]; ok { - rs.Status = NotProcessing - } -} - -// Done deletes a round from the processing list. -func (pr *processing) Delete(round id.Round, eph ephemeral.Id, source *id.ID) { - hid := makeHashID(round, eph, source) - pr.Lock() - defer pr.Unlock() - delete(pr.rounds, hid) -} diff --git a/network/rounds/processingRounds_test.go b/network/rounds/processingRounds_test.go deleted file mode 100644 index ce186dfbe..000000000 --- a/network/rounds/processingRounds_test.go +++ /dev/null @@ -1,124 +0,0 @@ -/////////////////////////////////////////////////////////////////////////////// -// Copyright © 2020 xx network SEZC // -// // -// Use of this source code is governed by a license that can be found in the // -// LICENSE file // -/////////////////////////////////////////////////////////////////////////////// - -package rounds - -import ( - "gitlab.com/xx_network/primitives/id" - "gitlab.com/xx_network/primitives/id/ephemeral" - "reflect" - "testing" -) - -// Testing functions for Processing Round structure - -// Tests happy path of newProcessingRounds. -func Test_newProcessingRounds(t *testing.T) { - expectedPr := &processing{ - rounds: make(map[hashID]*status), - } - - pr := newProcessingRounds() - - if !reflect.DeepEqual(expectedPr, pr) { - t.Errorf("Did not get expected processing."+ - "\n\texpected: %v\n\trecieved: %v", expectedPr, pr) - } -} - -// Tests happy path of Process. -func TestProcessing_Process(t *testing.T) { - pr := newProcessingRounds() - - ephID := ephemeral.Id{} - source := &id.ID{} - - testData := []struct { - rid id.Round - status Status - count uint - }{ - {10, NotProcessing, 0}, - {10, NotProcessing, 0}, - {10, Processing, 0}, - {100, NotProcessing, 0}, - {100, NotProcessing, 0}, - {100, Processing, 0}, - } - - for i, d := range testData { - hid := makeHashID(d.rid, ephID, source) - if _, exists := pr.rounds[hid]; exists { - pr.rounds[hid].Status = d.status - } - status, count := pr.Process(d.rid, ephID, source) - if status != d.status { - t.Errorf("Process() did not return the correct boolean for round "+ - "ID %d (%d).\nexpected: %s\nrecieved: %s", - d.rid, i, d.status, status) - } - if count != d.count { - t.Errorf("Process did not return the expected count for round ID "+ - "%d (%d).\n\texpected: %d\n\trecieved: %d", - d.rid, i, d.count, count) - } - - if _, ok := pr.rounds[hid]; !ok { - t.Errorf("Process() did not add round ID %d to the map (%d).", - d.rid, i) - } - } - -} - -// Tests happy path of IsProcessing. -func TestProcessing_IsProcessing(t *testing.T) { - pr := newProcessingRounds() - ephID := ephemeral.Id{} - source := &id.ID{} - rid := id.Round(10) - hid := makeHashID(rid, ephID, source) - pr.rounds[hid] = &status{0, Processing} - if !pr.IsProcessing(rid, ephID, source) { - t.Errorf("IsProcessing() should have returned %s for round ID %d.", Processing, rid) - } - pr.rounds[hid].Status = NotProcessing - if pr.IsProcessing(rid, ephID, source) { - t.Errorf("IsProcessing() should have returned %s for round ID %d.", NotProcessing, rid) - } -} - -// Tests happy path of Fail. -func TestProcessing_Fail(t *testing.T) { - pr := newProcessingRounds() - rid := id.Round(10) - ephID := ephemeral.Id{} - source := &id.ID{} - hid := makeHashID(rid, ephID, source) - pr.rounds[hid] = &status{0, Processing} - pr.Fail(rid, ephID, source) - if pr.rounds[hid].Status == Processing { - t.Errorf("Fail() did not mark processing as false for round id %d.", rid) - } - if pr.rounds[hid].failCount != 1 { - t.Errorf("Fail() did not increment the fail count of round id %d.", rid) - } -} - -// Tests happy path of Done. -func TestProcessing_Done(t *testing.T) { - pr := newProcessingRounds() - rid := id.Round(10) - ephID := ephemeral.Id{} - source := &id.ID{} - hid := makeHashID(rid, ephID, source) - pr.rounds[hid] = &status{0, Processing} - pr.Done(rid, ephID, source) - if s, _ := pr.rounds[hid]; s.Status != Done { - t.Errorf("Done() failed to flag round ID %d.", rid) - } -} diff --git a/network/rounds/retrieve.go b/network/rounds/retrieve.go index 526935777..effd66b7f 100644 --- a/network/rounds/retrieve.go +++ b/network/rounds/retrieve.go @@ -82,7 +82,6 @@ func (m *Manager) processMessageRetrieval(comms messageRetrievalComms, // After trying all gateways, if none returned we mark the round as a // failure and print out the last error if err != nil { - m.p.Fail(id.Round(ri.ID), rl.identity.EphId, rl.identity.Source) jww.ERROR.Printf("Failed to get pickup round %d "+ "from all gateways (%v): final gateway %s returned : %s", id.Round(ri.ID), gwIDs, gwHosts[len(gwHosts)-1].GetId(), err) @@ -116,7 +115,6 @@ func (m *Manager) getMessagesFromGateway(roundID id.Round, identity reception.Id } // if the gateway doesnt have the round, return an error if !msgResp.GetHasRound() { - m.p.Done(roundID, identity.EphId, identity.Source) return message.Bundle{}, errors.Errorf(noRoundError) } @@ -139,7 +137,7 @@ func (m *Manager) getMessagesFromGateway(roundID id.Round, identity reception.Id Round: roundID, Messages: make([]format.Message, len(msgs)), Finish: func() { - m.p.Done(roundID, identity.EphId, identity.Source) + return }, } diff --git a/network/rounds/utils_test.go b/network/rounds/utils_test.go index f8f14e327..69c5925ed 100644 --- a/network/rounds/utils_test.go +++ b/network/rounds/utils_test.go @@ -23,7 +23,6 @@ func newManager(face interface{}) *Manager { testManager := &Manager{ lookupRoundMessages: make(chan roundLookup), messageBundles: make(chan message.Bundle), - p: newProcessingRounds(), Internal: internal.Internal{ Session: sess1, TransmissionID: sess1.GetUser().TransmissionID, -- GitLab