Skip to content
Snippets Groups Projects
Commit 75615ac2 authored by Josh Brooks's avatar Josh Brooks
Browse files

Use storage to make sure forced retry is not done

parent 8d4e1c09
Branches
Tags
1 merge request!23Release
...@@ -14,7 +14,6 @@ import ( ...@@ -14,7 +14,6 @@ import (
"gitlab.com/elixxir/client/network/internal" "gitlab.com/elixxir/client/network/internal"
"gitlab.com/elixxir/client/network/message" "gitlab.com/elixxir/client/network/message"
"gitlab.com/elixxir/client/stoppable" "gitlab.com/elixxir/client/stoppable"
"sync"
) )
type Manager struct { type Manager struct {
...@@ -24,7 +23,6 @@ type Manager struct { ...@@ -24,7 +23,6 @@ type Manager struct {
historicalRounds chan historicalRoundRequest historicalRounds chan historicalRoundRequest
lookupRoundMessages chan roundLookup lookupRoundMessages chan roundLookup
lookupRetryTracker sync.Map
messageBundles chan<- message.Bundle messageBundles chan<- message.Bundle
} }
......
...@@ -83,16 +83,13 @@ func (m *Manager) processMessageRetrieval(comms messageRetrievalComms, ...@@ -83,16 +83,13 @@ func (m *Manager) processMessageRetrieval(comms messageRetrievalComms,
// randomly not picking up messages (FOR INTEGRATION TEST). Only done if // randomly not picking up messages (FOR INTEGRATION TEST). Only done if
// round has not been ignored before // round has not been ignored before
var bundle message.Bundle var bundle message.Bundle
_, ok := m.lookupRetryTracker.Load(ri.ID) if m.params.ForceMessagePickupRetry {
jww.INFO.Printf("After adding round %d to tracker entry is %v", ri.ID, ok)
if !ok && m.params.ForceMessagePickupRetry {
bundle, err = m.forceMessagePickupRetry(ri, rl, comms, gwIds) bundle, err = m.forceMessagePickupRetry(ri, rl, comms, gwIds)
if err != nil { if err != nil {
jww.ERROR.Printf("Failed to get pickup round %d "+ jww.ERROR.Printf("Failed to get pickup round %d "+
"from all gateways (%v): %s", "from all gateways (%v): %s",
id.Round(ri.ID), gwIds, err) id.Round(ri.ID), gwIds, err)
} }
m.lookupRetryTracker.Store(ri.ID, struct{}{})
} else { } else {
// Attempt to request for this gateway // Attempt to request for this gateway
bundle, err = m.getMessagesFromGateway(id.Round(ri.ID), rl.identity, comms, gwIds) bundle, err = m.getMessagesFromGateway(id.Round(ri.ID), rl.identity, comms, gwIds)
...@@ -197,6 +194,8 @@ func (m *Manager) getMessagesFromGateway(roundID id.Round, identity reception.Id ...@@ -197,6 +194,8 @@ func (m *Manager) getMessagesFromGateway(roundID id.Round, identity reception.Id
// not looking up messages // not looking up messages
func (m *Manager) forceMessagePickupRetry(ri *pb.RoundInfo, rl roundLookup, func (m *Manager) forceMessagePickupRetry(ri *pb.RoundInfo, rl roundLookup,
comms messageRetrievalComms, gwIds []*id.ID) (bundle message.Bundle, err error) { comms messageRetrievalComms, gwIds []*id.ID) (bundle message.Bundle, err error) {
_, ok := m.Session.UncheckedRounds().GetRound(id.Round(ri.ID))
if !ok {
// Flip a coin to determine whether to pick up message // Flip a coin to determine whether to pick up message
stream := m.Rng.GetStream() stream := m.Rng.GetStream()
defer stream.Close() defer stream.Close()
...@@ -214,7 +213,7 @@ func (m *Manager) forceMessagePickupRetry(ri *pb.RoundInfo, rl roundLookup, ...@@ -214,7 +213,7 @@ func (m *Manager) forceMessagePickupRetry(ri *pb.RoundInfo, rl roundLookup,
return return
} }
}
// Attempt to request for this gateway // Attempt to request for this gateway
return m.getMessagesFromGateway(id.Round(ri.ID), rl.identity, comms, gwIds) return m.getMessagesFromGateway(id.Round(ri.ID), rl.identity, comms, gwIds)
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment