diff --git a/network/rounds/manager.go b/network/rounds/manager.go index c488e39f3b2dc2311166b99d21b680dd3d0b8028..4c1ec87a993afcdcfcf724d7af2ca69bf2ad6b98 100644 --- a/network/rounds/manager.go +++ b/network/rounds/manager.go @@ -18,7 +18,6 @@ import ( type Manager struct { params params.Rounds - internal.Internal sender *gateway.Sender diff --git a/network/rounds/retrieve.go b/network/rounds/retrieve.go index 8b5029f07322d90ea11fa64a3ed0f4ba49ed33d0..6f37cb53d61f25d11001a758f8ea38554d1287cc 100644 --- a/network/rounds/retrieve.go +++ b/network/rounds/retrieve.go @@ -38,19 +38,18 @@ const noRoundError = "does not have round %d" // of that round for messages for the requested identity in the roundLookup func (m *Manager) processMessageRetrieval(comms messageRetrievalComms, quitCh <-chan struct{}) { - done := false for !done { select { case <-quitCh: done = true case rl := <-m.lookupRoundMessages: - // wrap this around the pickup logic ri := rl.roundInfo + jww.DEBUG.Printf("Checking for messages in round %d", ri.ID) err := m.Session.UncheckedRounds().AddRound(rl.roundInfo, rl.identity.EphId, rl.identity.Source) if err != nil { - jww.ERROR.Printf("Could not find round %d in unchecked rounds store: %v", + jww.ERROR.Printf("Could not add round %d in unchecked rounds store: %v", rl.roundInfo.ID, err) } @@ -81,10 +80,10 @@ func (m *Manager) processMessageRetrieval(comms messageRetrievalComms, }) // If ForceMessagePickupRetry, we are forcing processUncheckedRounds by - // randomly not picking up messages + // randomly not picking up messages (FOR INTEGRATION TEST). Only done if + // round has not been ignored before var bundle message.Bundle if m.params.ForceMessagePickupRetry { - jww.INFO.Printf("Forcing message pickup retry for round %d", ri.ID) bundle, err = m.forceMessagePickupRetry(ri, rl, comms, gwIds) if err != nil { jww.ERROR.Printf("Failed to get pickup round %d "+ @@ -105,6 +104,7 @@ func (m *Manager) processMessageRetrieval(comms messageRetrievalComms, } if len(bundle.Messages) != 0 { + jww.DEBUG.Printf("Removing round %d from unchecked store", ri.ID) err = m.Session.UncheckedRounds().Remove(id.Round(ri.ID)) if err != nil { jww.ERROR.Printf("Could not remove round %d "+ @@ -140,12 +140,13 @@ func (m *Manager) getMessagesFromGateway(roundID id.Round, identity reception.Id // If the gateway doesnt have the round, return an error msgResp, err := comms.RequestMessages(host, msgReq) if err == nil && !msgResp.GetHasRound() { + jww.INFO.Printf("No round error for round %d received from %s", roundID, target) return message.Bundle{}, false, errors.Errorf(noRoundError, roundID) } return msgResp, false, err }) - + jww.INFO.Printf("Received message for round %d, processing...", roundID) // Fail the round if an error occurs so it can be tried again later if err != nil { return message.Bundle{}, errors.WithMessagef(err, "Failed to "+ @@ -191,19 +192,24 @@ func (m *Manager) getMessagesFromGateway(roundID id.Round, identity reception.Id // not looking up messages func (m *Manager) forceMessagePickupRetry(ri *pb.RoundInfo, rl roundLookup, comms messageRetrievalComms, gwIds []*id.ID) (bundle message.Bundle, err error) { - // Flip a coin to determine whether to pick up message - stream := m.Rng.GetStream() - defer stream.Close() - b := make([]byte, 8) - _, err = stream.Read(b) - if err != nil { - jww.FATAL.Panic(err.Error()) - } - result := binary.BigEndian.Uint64(b) - if result%2 == 0 { - // Do not call get message, leaving the round to be picked up - // in unchecked round scheduler process - return + rnd, _ := m.Session.UncheckedRounds().GetRound(id.Round(ri.ID)) + if rnd.NumChecks == 0 { + // Flip a coin to determine whether to pick up message + stream := m.Rng.GetStream() + defer stream.Close() + b := make([]byte, 8) + _, err = stream.Read(b) + if err != nil { + jww.FATAL.Panic(err.Error()) + } + result := binary.BigEndian.Uint64(b) + if result%2 == 0 { + jww.INFO.Printf("Forcing a message pickup retry for round %d", ri.ID) + // Do not call get message, leaving the round to be picked up + // in unchecked round scheduler process + return + } + } // Attempt to request for this gateway diff --git a/network/rounds/unchecked.go b/network/rounds/unchecked.go index df2f6c85875b50bf85c1cbc1254d4d8337a4b9dd..e02f1181282d50bbd4c2d26e1bb8a4213115315d 100644 --- a/network/rounds/unchecked.go +++ b/network/rounds/unchecked.go @@ -68,6 +68,7 @@ func (m *Manager) processUncheckedRounds(checkInterval time.Duration, backoffTab select { case m.lookupRoundMessages <- rl: case <-time.After(500 * time.Second): + jww.WARN.Printf("Timing out, not retrying round %d", rl.roundInfo.ID) } // Update the state of the round for next look-up (if needed)