Skip to content
Snippets Groups Projects
Commit 0739502e authored by Benjamin Wenger
Browse files

Merge branch 'hotfix/WaitForMessageDelivery' into 'release'

Hotfix/wait for message delivery

See merge request !593
parents cf98ef00 f0a791f5
No related branches found
No related tags found
No related merge requests found
...@@ -67,6 +67,8 @@ type historicalRoundsComm interface { ...@@ -67,6 +67,8 @@ type historicalRoundsComm interface {
func (c *Client) GetRoundResults(roundList []id.Round, timeout time.Duration, func (c *Client) GetRoundResults(roundList []id.Round, timeout time.Duration,
roundCallback RoundEventCallback) error { roundCallback RoundEventCallback) error {
jww.INFO.Printf("GetRoundResults(%v, %s)", roundList, timeout)
sendResults := make(chan ds.EventReturn, len(roundList)) sendResults := make(chan ds.EventReturn, len(roundList))
return c.getRoundResults(roundList, timeout, roundCallback, return c.getRoundResults(roundList, timeout, roundCallback,
...@@ -91,6 +93,8 @@ func (c *Client) getRoundResults(roundList []id.Round, timeout time.Duration, ...@@ -91,6 +93,8 @@ func (c *Client) getRoundResults(roundList []id.Round, timeout time.Duration,
allRoundsSucceeded := true allRoundsSucceeded := true
numResults := 0 numResults := 0
oldestRound := networkInstance.GetOldestRoundID()
// Parse and adjudicate every round // Parse and adjudicate every round
for _, rnd := range roundList { for _, rnd := range roundList {
// Every round is timed out by default, until proven to have finished // Every round is timed out by default, until proven to have finished
...@@ -111,9 +115,7 @@ func (c *Client) getRoundResults(roundList []id.Round, timeout time.Duration, ...@@ -111,9 +115,7 @@ func (c *Client) getRoundResults(roundList []id.Round, timeout time.Duration,
numResults++ numResults++
} }
} else { } else {
jww.DEBUG.Printf("Failed to ger round [%d] in buffer: %v", rnd, err)
// Update oldest round (buffer may have updated externally) // Update oldest round (buffer may have updated externally)
oldestRound := networkInstance.GetOldestRoundID()
if rnd < oldestRound { if rnd < oldestRound {
// If round is older than oldest round in our buffer // If round is older than oldest round in our buffer
// Add it to the historical round request (performed later) // Add it to the historical round request (performed later)
...@@ -151,7 +153,9 @@ func (c *Client) getRoundResults(roundList []id.Round, timeout time.Duration, ...@@ -151,7 +153,9 @@ func (c *Client) getRoundResults(roundList []id.Round, timeout time.Duration,
roundCallback(false, true, roundsResults) roundCallback(false, true, roundsResults)
return return
case roundReport := <-sendResults: case roundReport := <-sendResults:
numResults-- numResults--
// Skip if the round is nil (unknown from historical rounds) // Skip if the round is nil (unknown from historical rounds)
// they default to timed out, so correct behavior is preserved // they default to timed out, so correct behavior is preserved
if roundReport.RoundInfo == nil || roundReport.TimedOut { if roundReport.RoundInfo == nil || roundReport.TimedOut {
...@@ -182,12 +186,14 @@ func (c *Client) getHistoricalRounds(msg *pb.HistoricalRounds, ...@@ -182,12 +186,14 @@ func (c *Client) getHistoricalRounds(msg *pb.HistoricalRounds,
var resp *pb.HistoricalRoundsResponse var resp *pb.HistoricalRoundsResponse
for { //retry 5 times
for i:=0;i<5;i++{
// Find a gateway to request about the roundRequests // Find a gateway to request about the roundRequests
gwHost, err := gateway.Get(instance.GetPartialNdf().Get(), comms, c.rng.GetStream()) gwHost, err := gateway.Get(instance.GetPartialNdf().Get(), comms, c.rng.GetStream())
if err != nil { if err != nil {
jww.FATAL.Panicf("Failed to track network, NDF has corrupt "+ jww.ERROR.Printf("Failed to track network, NDF has corrupt "+
"data: %s", err) "data: %s", err)
continue
} }
// If an error, retry with (potentially) a different gw host. // If an error, retry with (potentially) a different gw host.
...@@ -195,10 +201,17 @@ func (c *Client) getHistoricalRounds(msg *pb.HistoricalRounds, ...@@ -195,10 +201,17 @@ func (c *Client) getHistoricalRounds(msg *pb.HistoricalRounds,
// and process rounds // and process rounds
resp, err = comms.RequestHistoricalRounds(gwHost, msg) resp, err = comms.RequestHistoricalRounds(gwHost, msg)
if err == nil { if err == nil {
jww.ERROR.Printf("Failed to lookup historical rounds: %s",
err)
}else{
break break
} }
} }
if resp == nil{
return
}
// Process historical rounds, sending back to the caller thread // Process historical rounds, sending back to the caller thread
for _, ri := range resp.Rounds { for _, ri := range resp.Rounds {
sendResults <- ds.EventReturn{ sendResults <- ds.EventReturn{
......
...@@ -8,7 +8,6 @@ ...@@ -8,7 +8,6 @@
package bindings package bindings
import ( import (
"encoding/json"
"errors" "errors"
"fmt" "fmt"
jww "github.com/spf13/jwalterweatherman" jww "github.com/spf13/jwalterweatherman"
...@@ -157,11 +156,7 @@ func UnmarshalContact(b []byte) (*Contact, error) { ...@@ -157,11 +156,7 @@ func UnmarshalContact(b []byte) (*Contact, error) {
//Unmarshals a marshaled send report object, returns an error if it fails //Unmarshals a marshaled send report object, returns an error if it fails
func UnmarshalSendReport(b []byte) (*SendReport, error) { func UnmarshalSendReport(b []byte) (*SendReport, error) {
sr := &SendReport{} sr := &SendReport{}
if err := json.Unmarshal(b, sr); err != nil { return sr, sr.Unmarshal(b)
return nil, errors.New(fmt.Sprintf("Failed to Unmarshal "+
"Send Report: %+v", err))
}
return sr, nil
} }
// StartNetworkFollower kicks off the tracking of the network. It starts // StartNetworkFollower kicks off the tracking of the network. It starts
...@@ -363,16 +358,25 @@ func (c *Client) WaitForRoundCompletion(roundID int, ...@@ -363,16 +358,25 @@ func (c *Client) WaitForRoundCompletion(roundID int,
// the same pointer. // the same pointer.
func (c *Client) WaitForMessageDelivery(marshaledSendReport []byte, func (c *Client) WaitForMessageDelivery(marshaledSendReport []byte,
mdc MessageDeliveryCallback, timeoutMS int) error { mdc MessageDeliveryCallback, timeoutMS int) error {
jww.INFO.Printf("WaitForMessageDelivery(%v, _, %v)",
marshaledSendReport, timeoutMS)
sr, err := UnmarshalSendReport(marshaledSendReport) sr, err := UnmarshalSendReport(marshaledSendReport)
if err != nil { if err != nil {
return errors.New(fmt.Sprintf("Failed to "+ return errors.New(fmt.Sprintf("Failed to "+
"WaitForRoundCompletion callback due to bad Send Report: %+v", err)) "WaitForMessageDelivery callback due to bad Send Report: %+v", err))
}
if sr==nil || sr.rl == nil || len(sr.rl.list) == 0{
return errors.New(fmt.Sprintf("Failed to "+
"WaitForMessageDelivery callback due to invalid Send Report " +
"unmarshal: %s", string(marshaledSendReport)))
} }
f := func(allRoundsSucceeded, timedOut bool, rounds map[id.Round]api.RoundResult) { f := func(allRoundsSucceeded, timedOut bool, rounds map[id.Round]api.RoundResult) {
results := make([]byte, len(sr.rl.list)) results := make([]byte, len(sr.rl.list))
jww.INFO.Printf("Processing WaitForMessageDelivery report " +
"for %v, success: %v, timedout: %v", sr.mid, allRoundsSucceeded,
timedOut)
for i, r := range sr.rl.list { for i, r := range sr.rl.list {
if result, exists := rounds[r]; exists { if result, exists := rounds[r]; exists {
results[i] = byte(result) results[i] = byte(result)
...@@ -384,7 +388,9 @@ func (c *Client) WaitForMessageDelivery(marshaledSendReport []byte, ...@@ -384,7 +388,9 @@ func (c *Client) WaitForMessageDelivery(marshaledSendReport []byte,
timeout := time.Duration(timeoutMS) * time.Millisecond timeout := time.Duration(timeoutMS) * time.Millisecond
return c.api.GetRoundResults(sr.rl.list, timeout, f) err = c.api.GetRoundResults(sr.rl.list, timeout, f)
return err
} }
// Returns a user object from which all information about the current user // Returns a user object from which all information about the current user
......
...@@ -135,6 +135,11 @@ type SendReport struct { ...@@ -135,6 +135,11 @@ type SendReport struct {
mid e2e.MessageID mid e2e.MessageID
} }
// SendReportDisk is the exported-field form of a SendReport that is passed to
// encoding/json: SendReport.Marshal fills it from the report and encodes it,
// and SendReport.Unmarshal decodes into it before loading the report.
type SendReportDisk struct{
// List carries the round IDs taken from the report's round list.
List []id.Round
// Mid carries the bytes of the report's message ID.
Mid []byte
}
func (sr *SendReport) GetRoundList() *RoundList { func (sr *SendReport) GetRoundList() *RoundList {
return sr.rl return sr.rl
} }
...@@ -144,5 +149,22 @@ func (sr *SendReport) GetMessageID() []byte { ...@@ -144,5 +149,22 @@ func (sr *SendReport) GetMessageID() []byte {
} }
func (sr *SendReport) Marshal() ([]byte, error) { func (sr *SendReport) Marshal() ([]byte, error) {
return json.Marshal(sr) srd := SendReportDisk{
List: sr.rl.list,
Mid: sr.mid[:],
}
return json.Marshal(&srd)
}
// Unmarshal decodes b as a JSON-encoded SendReportDisk and loads the result
// into sr. If the JSON decode fails it returns an error containing the
// decoder's message, and sr is not modified.
func (sr *SendReport) Unmarshal(b []byte) error {
srd := SendReportDisk{
}
if err := json.Unmarshal(b, &srd); err!=nil{
return errors.New(fmt.Sprintf("Failed to unmarshal send " +
"report: %s", err.Error()))
}
// copy transfers only as many bytes as the shorter of the two slices, so a
// short srd.Mid leaves the trailing bytes of sr.mid as they were.
copy(sr.mid[:],srd.Mid)
// The round list is replaced with a freshly allocated RoundList.
sr.rl = &RoundList{list:srd.List}
return nil
} }
...@@ -26,6 +26,7 @@ import ( ...@@ -26,6 +26,7 @@ import (
"bytes" "bytes"
"fmt" "fmt"
"math" "math"
"sync/atomic"
"time" "time"
jww "github.com/spf13/jwalterweatherman" jww "github.com/spf13/jwalterweatherman"
...@@ -64,8 +65,9 @@ func (m *manager) followNetwork(report interfaces.ClientErrorReport, quitCh <-ch ...@@ -64,8 +65,9 @@ func (m *manager) followNetwork(report interfaces.ClientErrorReport, quitCh <-ch
case <-ticker.C: case <-ticker.C:
m.follow(report, rng, m.Comms) m.follow(report, rng, m.Comms)
case <-TrackTicker.C: case <-TrackTicker.C:
jww.INFO.Println(m.tracker.Report()) numPolls := atomic.SwapUint64(m.tracker, 0)
m.tracker = newPollTracker() jww.INFO.Printf("Polled the network %d times in the " +
"last %s", numPolls, debugTrackPeriod)
} }
} }
} }
...@@ -80,7 +82,7 @@ func (m *manager) follow(report interfaces.ClientErrorReport, rng csprng.Source, ...@@ -80,7 +82,7 @@ func (m *manager) follow(report interfaces.ClientErrorReport, rng csprng.Source,
"impossible: %+v", err) "impossible: %+v", err)
} }
m.tracker.Track(identity.EphId, identity.Source) atomic.AddUint64(m.tracker, 1)
//randomly select a gateway to poll //randomly select a gateway to poll
//TODO: make this more intelligent //TODO: make this more intelligent
...@@ -110,11 +112,13 @@ func (m *manager) follow(report interfaces.ClientErrorReport, rng csprng.Source, ...@@ -110,11 +112,13 @@ func (m *manager) follow(report interfaces.ClientErrorReport, rng csprng.Source,
pollResp, err := comms.SendPoll(gwHost, &pollReq) pollResp, err := comms.SendPoll(gwHost, &pollReq)
if err != nil { if err != nil {
if report!=nil{
report( report(
"NetworkFollower", "NetworkFollower",
fmt.Sprintf("Failed to poll network, \"%s\", Gateway: %s", err.Error(), gwHost.String()), fmt.Sprintf("Failed to poll network, \"%s\", Gateway: %s", err.Error(), gwHost.String()),
fmt.Sprintf("%+v", err), fmt.Sprintf("%+v", err),
) )
}
jww.ERROR.Printf("Unable to poll %s for NDF: %+v", gwHost, err) jww.ERROR.Printf("Unable to poll %s for NDF: %+v", gwHost, err)
return return
} }
......
...@@ -43,8 +43,8 @@ type manager struct { ...@@ -43,8 +43,8 @@ type manager struct {
round *rounds.Manager round *rounds.Manager
message *message.Manager message *message.Manager
//map of polls for debugging //number of polls done in a period of time
tracker *pollTracker tracker *uint64
//tracks already checked rounds //tracks already checked rounds
checked *checkedRounds checked *checkedRounds
...@@ -67,10 +67,12 @@ func NewManager(session *storage.Session, switchboard *switchboard.Switchboard, ...@@ -67,10 +67,12 @@ func NewManager(session *storage.Session, switchboard *switchboard.Switchboard,
// set them here when they are needed on startup // set them here when they are needed on startup
session.E2e().SetE2ESessionParams(params.E2EParams) session.E2e().SetE2ESessionParams(params.E2EParams)
tracker := uint64(0)
//create manager object //create manager object
m := manager{ m := manager{
param: params, param: params,
tracker: newPollTracker(), tracker: &tracker,
checked: newCheckedRounds(), checked: newCheckedRounds(),
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment