From d20e8577a8dadcd850981937d7abb659269ff0f8 Mon Sep 17 00:00:00 2001 From: Benjamin Wenger <ben@elixxir.ioo> Date: Wed, 17 Feb 2021 10:55:09 -0800 Subject: [PATCH] fixed getRoundResults --- api/messages.go | 128 +++++++++++++++++++++++++++++++----------------- 1 file changed, 82 insertions(+), 46 deletions(-) diff --git a/api/messages.go b/api/messages.go index 844aa44ca..c36e5ae5e 100644 --- a/api/messages.go +++ b/api/messages.go @@ -19,21 +19,31 @@ import ( ) type RoundEventCallback interface { - Report(rid, state int, timedOut bool) bool + Report(succeeded, timedout bool, rounds map[id.Round]RoundResult) +} + +type RoundResult uint +const( + TimeOut RoundResult = iota + Failed + Succeeded +) + + +func (c *Client) GetRoundResults(roundList []id.Round, + roundCallback RoundEventCallback, timeout time.Duration) error { } // Adjudicates on the rounds requested. Checks if they are older rounds or in progress rounds. // Sends updates on the rounds with callbacks -func (c *Client) RegisterMessageDelivery(roundList []id.Round, roundCallback RoundEventCallback, - timeoutMS int) error { +func (c *Client) getRoundResults(roundList []id.Round, + roundCallback RoundEventCallback, timeout time.Duration, sendResults chan ds.EventReturn, ) error { // Get the oldest round in the buffer networkInstance := c.network.GetInstance() - timeout := time.Duration(timeoutMS)*time.Millisecond /*check message delivery*/ - sendResults := make(chan ds.EventReturn, len(roundList)) roundEvents := c.GetRoundEvents() - rndEventObjs := make([]*ds.EventCallback, len(roundList)) + numResults := 0 // Generate a message historicalRequest := &pb.HistoricalRounds{ @@ -41,53 +51,79 @@ func (c *Client) RegisterMessageDelivery(roundList []id.Round, roundCallback Ro } - oldestRound := networkInstance.GetOldestRoundID() - for i, rnd := range roundList { + rounds := make(map[id.Round]RoundResult) + succeeded := true + + for _, rnd := range roundList { + rounds[rnd]=TimeOut roundInfo, err := networkInstance.GetRound(rnd) - if err != nil { + + if err==nil{ + if states.Round(roundInfo.State) == states.COMPLETED { + rounds[rnd] = Succeeded + }else if states.Round(roundInfo.State) == states.FAILED { + rounds[rnd] = Failed + succeeded = false + }else{ + roundEvents.AddRoundEventChan(rnd, sendResults, + timeout-time.Millisecond, states.COMPLETED, states.FAILED) + numResults++ + } + }else { jww.DEBUG.Printf("Failed to ger round [%d] in buffer: %v", rnd, err) // Update oldest round (buffer may have updated externally) - oldestRound = networkInstance.GetOldestRoundID() - } - - - if rnd < oldestRound { - // If round is older that oldest round in our buffer - // Add it to the historical round request (performed later) - historicalRequest.Rounds = append(historicalRequest.Rounds, uint64(rnd)) - } else { - // If the round is in the buffer, and done (completed or failed) send the results - // through the channel - if roundInfo != nil && (states.Round(roundInfo.State) == states.COMPLETED || - states.Round(roundInfo.State) == states.FAILED) { - sendResults <- ds.EventReturn{ - RoundInfo: roundInfo, - } - continue + oldestRound := networkInstance.GetOldestRoundID() + if rnd < oldestRound { + // If round is older that oldest round in our buffer + // Add it to the historical round request (performed later) + historicalRequest.Rounds = append(historicalRequest.Rounds, uint64(rnd)) + numResults++ + }else{ + roundEvents.AddRoundEventChan(rnd, sendResults, + timeout-time.Millisecond, states.COMPLETED, states.FAILED) + numResults++ } - - // If it is still in progress, create a monitoring channel - rndEventObjs[i] = roundEvents.AddRoundEventChan(rnd, sendResults, - timeout, states.COMPLETED, states.FAILED) - } } - - // Find out what happened to old (historical) rounds - historicalReport := make(chan ds.EventReturn, len(historicalRequest.Rounds)) - go c.getHistoricalRounds(historicalRequest, networkInstance, historicalReport) + //request historical rounds if any are needed + if len(historicalRequest.Rounds)>0{ + // Find out what happened to old (historical) rounds + go c.getHistoricalRounds(historicalRequest, networkInstance, sendResults) + } // Determine the success of all rounds requested go func() { - select { - case roundReport := <-sendResults: - ri := roundReport.RoundInfo - roundCallback.Report(int(ri.ID), int(ri.State), roundReport.TimedOut) - case roundReport := <- historicalReport: - ri := roundReport.RoundInfo - roundCallback.Report(int(ri.ID), int(ri.State), roundReport.TimedOut) - + //create the results timer + timer := time.NewTimer(timeout) + for { + //if we know about all rounds, return + if numResults==0{ + roundCallback.Report(succeeded, true, rounds) + return + } + + //wait for info about rounds or the timeout to occur + select { + case <- timer.C: + roundCallback.Report(false, true, rounds) + return + case roundReport := <-sendResults: + numResults-- + // skip if the round is nil (unknown from historical rounds) + // they default to timed out, so correct behavior is preserved + if roundReport.RoundInfo==nil || roundReport.TimedOut{ + succeeded = false + }else{ + //if available, denote the result + if states.Round(roundReport.RoundInfo.State) == states.COMPLETED{ + rounds[id.Round(roundReport.RoundInfo.ID)] = Succeeded + } else{ + rounds[id.Round(roundReport.RoundInfo.ID)] = Failed + succeeded = false + } + } + } } }() @@ -120,9 +156,9 @@ func (c *Client) getHistoricalRounds(msg *pb.HistoricalRounds, // Process historical rounds, sending back to the caller thread for _, ri := range resp.Rounds { - sendResults <- ds.EventReturn{ - RoundInfo: ri, - TimedOut: false, + sendResults<- ds.EventReturn{ + ri, + false, } } } \ No newline at end of file -- GitLab