Skip to content
Snippets Groups Projects
Commit 943173f9 authored by Richard T. Carback III's avatar Richard T. Carback III
Browse files

Merge branch 'hotfix/MessageRetrieval2' into 'release'

Hotfix/message retrieval2

See merge request !634
parents f4fb6f67 96b02f19
No related branches found
No related tags found
1 merge request!23Release
...@@ -18,7 +18,6 @@ import ( ...@@ -18,7 +18,6 @@ import (
type Manager struct { type Manager struct {
params params.Rounds params params.Rounds
internal.Internal internal.Internal
sender *gateway.Sender sender *gateway.Sender
......
...@@ -38,19 +38,18 @@ const noRoundError = "does not have round %d" ...@@ -38,19 +38,18 @@ const noRoundError = "does not have round %d"
// of that round for messages for the requested identity in the roundLookup // of that round for messages for the requested identity in the roundLookup
func (m *Manager) processMessageRetrieval(comms messageRetrievalComms, func (m *Manager) processMessageRetrieval(comms messageRetrievalComms,
quitCh <-chan struct{}) { quitCh <-chan struct{}) {
done := false done := false
for !done { for !done {
select { select {
case <-quitCh: case <-quitCh:
done = true done = true
case rl := <-m.lookupRoundMessages: case rl := <-m.lookupRoundMessages:
// wrap this around the pickup logic
ri := rl.roundInfo ri := rl.roundInfo
jww.DEBUG.Printf("Checking for messages in round %d", ri.ID)
err := m.Session.UncheckedRounds().AddRound(rl.roundInfo, err := m.Session.UncheckedRounds().AddRound(rl.roundInfo,
rl.identity.EphId, rl.identity.Source) rl.identity.EphId, rl.identity.Source)
if err != nil { if err != nil {
jww.ERROR.Printf("Could not find round %d in unchecked rounds store: %v", jww.ERROR.Printf("Could not add round %d in unchecked rounds store: %v",
rl.roundInfo.ID, err) rl.roundInfo.ID, err)
} }
...@@ -81,10 +80,10 @@ func (m *Manager) processMessageRetrieval(comms messageRetrievalComms, ...@@ -81,10 +80,10 @@ func (m *Manager) processMessageRetrieval(comms messageRetrievalComms,
}) })
// If ForceMessagePickupRetry, we are forcing processUncheckedRounds by // If ForceMessagePickupRetry, we are forcing processUncheckedRounds by
// randomly not picking up messages // randomly not picking up messages (FOR INTEGRATION TEST). Only done if
// round has not been ignored before
var bundle message.Bundle var bundle message.Bundle
if m.params.ForceMessagePickupRetry { if m.params.ForceMessagePickupRetry {
jww.INFO.Printf("Forcing message pickup retry for round %d", ri.ID)
bundle, err = m.forceMessagePickupRetry(ri, rl, comms, gwIds) bundle, err = m.forceMessagePickupRetry(ri, rl, comms, gwIds)
if err != nil { if err != nil {
jww.ERROR.Printf("Failed to get pickup round %d "+ jww.ERROR.Printf("Failed to get pickup round %d "+
...@@ -105,6 +104,7 @@ func (m *Manager) processMessageRetrieval(comms messageRetrievalComms, ...@@ -105,6 +104,7 @@ func (m *Manager) processMessageRetrieval(comms messageRetrievalComms,
} }
if len(bundle.Messages) != 0 { if len(bundle.Messages) != 0 {
jww.DEBUG.Printf("Removing round %d from unchecked store", ri.ID)
err = m.Session.UncheckedRounds().Remove(id.Round(ri.ID)) err = m.Session.UncheckedRounds().Remove(id.Round(ri.ID))
if err != nil { if err != nil {
jww.ERROR.Printf("Could not remove round %d "+ jww.ERROR.Printf("Could not remove round %d "+
...@@ -140,12 +140,13 @@ func (m *Manager) getMessagesFromGateway(roundID id.Round, identity reception.Id ...@@ -140,12 +140,13 @@ func (m *Manager) getMessagesFromGateway(roundID id.Round, identity reception.Id
// If the gateway doesnt have the round, return an error // If the gateway doesnt have the round, return an error
msgResp, err := comms.RequestMessages(host, msgReq) msgResp, err := comms.RequestMessages(host, msgReq)
if err == nil && !msgResp.GetHasRound() { if err == nil && !msgResp.GetHasRound() {
jww.INFO.Printf("No round error for round %d received from %s", roundID, target)
return message.Bundle{}, false, errors.Errorf(noRoundError, roundID) return message.Bundle{}, false, errors.Errorf(noRoundError, roundID)
} }
return msgResp, false, err return msgResp, false, err
}) })
jww.INFO.Printf("Received message for round %d, processing...", roundID)
// Fail the round if an error occurs so it can be tried again later // Fail the round if an error occurs so it can be tried again later
if err != nil { if err != nil {
return message.Bundle{}, errors.WithMessagef(err, "Failed to "+ return message.Bundle{}, errors.WithMessagef(err, "Failed to "+
...@@ -191,19 +192,24 @@ func (m *Manager) getMessagesFromGateway(roundID id.Round, identity reception.Id ...@@ -191,19 +192,24 @@ func (m *Manager) getMessagesFromGateway(roundID id.Round, identity reception.Id
// not looking up messages // not looking up messages
func (m *Manager) forceMessagePickupRetry(ri *pb.RoundInfo, rl roundLookup, func (m *Manager) forceMessagePickupRetry(ri *pb.RoundInfo, rl roundLookup,
comms messageRetrievalComms, gwIds []*id.ID) (bundle message.Bundle, err error) { comms messageRetrievalComms, gwIds []*id.ID) (bundle message.Bundle, err error) {
// Flip a coin to determine whether to pick up message rnd, _ := m.Session.UncheckedRounds().GetRound(id.Round(ri.ID))
stream := m.Rng.GetStream() if rnd.NumChecks == 0 {
defer stream.Close() // Flip a coin to determine whether to pick up message
b := make([]byte, 8) stream := m.Rng.GetStream()
_, err = stream.Read(b) defer stream.Close()
if err != nil { b := make([]byte, 8)
jww.FATAL.Panic(err.Error()) _, err = stream.Read(b)
} if err != nil {
result := binary.BigEndian.Uint64(b) jww.FATAL.Panic(err.Error())
if result%2 == 0 { }
// Do not call get message, leaving the round to be picked up result := binary.BigEndian.Uint64(b)
// in unchecked round scheduler process if result%2 == 0 {
return jww.INFO.Printf("Forcing a message pickup retry for round %d", ri.ID)
// Do not call get message, leaving the round to be picked up
// in unchecked round scheduler process
return
}
} }
// Attempt to request for this gateway // Attempt to request for this gateway
......
...@@ -68,6 +68,7 @@ func (m *Manager) processUncheckedRounds(checkInterval time.Duration, backoffTab ...@@ -68,6 +68,7 @@ func (m *Manager) processUncheckedRounds(checkInterval time.Duration, backoffTab
select { select {
case m.lookupRoundMessages <- rl: case m.lookupRoundMessages <- rl:
case <-time.After(500 * time.Second): case <-time.After(500 * time.Second):
jww.WARN.Printf("Timing out, not retrying round %d", rl.roundInfo.ID)
} }
// Update the state of the round for next look-up (if needed) // Update the state of the round for next look-up (if needed)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment