Something went wrong on our end
-
Jonah Husson authoredJonah Husson authored
results.go 6.64 KiB
///////////////////////////////////////////////////////////////////////////////
// Copyright © 2020 xx network SEZC //
// //
// Use of this source code is governed by a license that can be found in the //
// LICENSE file //
///////////////////////////////////////////////////////////////////////////////
package api
import (
"fmt"
"time"
jww "github.com/spf13/jwalterweatherman"
pb "gitlab.com/elixxir/comms/mixmessages"
"gitlab.com/elixxir/comms/network"
ds "gitlab.com/elixxir/comms/network/dataStructures"
"gitlab.com/elixxir/primitives/states"
"gitlab.com/xx_network/comms/connect"
"gitlab.com/xx_network/primitives/id"
)
// Enum of possible round results to pass back
type RoundResult uint
const (
TimeOut RoundResult = iota
Failed
Succeeded
)
func (rr RoundResult) String() string {
switch rr {
case TimeOut:
return "TimeOut"
case Failed:
return "Failed"
case Succeeded:
return "Succeeded"
default:
return fmt.Sprintf("UNKNOWN RESULT: %d", rr)
}
}
// Callback interface which reports the requested rounds.
// Designed such that the caller may decide how much detail they need.
// allRoundsSucceeded:
// Returns false if any rounds in the round map were unsuccessful.
// Returns true if ALL rounds were successful
// timedOut:
// Returns true if any of the rounds timed out while being monitored
// Returns false if all rounds statuses were returned
// rounds contains a mapping of all previously requested rounds to
// their respective round results
type RoundEventCallback func(allRoundsSucceeded, timedOut bool, rounds map[id.Round]RoundResult)
// Comm interface for RequestHistoricalRounds.
// Constructed for testability with getRoundResults
type historicalRoundsComm interface {
RequestHistoricalRounds(host *connect.Host,
message *pb.HistoricalRounds) (*pb.HistoricalRoundsResponse, error)
GetHost(hostId *id.ID) (*connect.Host, bool)
}
// Adjudicates on the rounds requested. Checks if they are
// older rounds or in progress rounds.
func (c *Client) GetRoundResults(roundList []id.Round, timeout time.Duration,
roundCallback RoundEventCallback) error {
jww.INFO.Printf("GetRoundResults(%v, %s)", roundList, timeout)
sendResults := make(chan ds.EventReturn, len(roundList))
return c.getRoundResults(roundList, timeout, roundCallback,
sendResults, c.comms)
}
// Helper function which does all the logic for GetRoundResults
func (c *Client) getRoundResults(roundList []id.Round, timeout time.Duration,
roundCallback RoundEventCallback, sendResults chan ds.EventReturn,
commsInterface historicalRoundsComm) error {
networkInstance := c.network.GetInstance()
// Generate a message to track all older rounds
historicalRequest := &pb.HistoricalRounds{
Rounds: []uint64{},
}
// Generate all tracking structures for rounds
roundEvents := c.GetRoundEvents()
roundsResults := make(map[id.Round]RoundResult)
allRoundsSucceeded := true
numResults := 0
oldestRound := networkInstance.GetOldestRoundID()
// Parse and adjudicate every round
for _, rnd := range roundList {
// Every round is timed out by default, until proven to have finished
roundsResults[rnd] = TimeOut
roundInfo, err := networkInstance.GetRound(rnd)
// If we have the round in the buffer
if err == nil {
// Check if the round is done (completed or failed) or in progress
if states.Round(roundInfo.State) == states.COMPLETED {
roundsResults[rnd] = Succeeded
} else if states.Round(roundInfo.State) == states.FAILED {
roundsResults[rnd] = Failed
allRoundsSucceeded = false
} else {
// If in progress, add a channel monitoring its status
roundEvents.AddRoundEventChan(rnd, sendResults,
timeout-time.Millisecond, states.COMPLETED, states.FAILED)
numResults++
}
} else {
// Update oldest round (buffer may have updated externally)
if rnd < oldestRound {
// If round is older that oldest round in our buffer
// Add it to the historical round request (performed later)
historicalRequest.Rounds = append(historicalRequest.Rounds, uint64(rnd))
numResults++
} else {
// Otherwise, monitor it's progress
roundEvents.AddRoundEventChan(rnd, sendResults,
timeout-time.Millisecond, states.COMPLETED, states.FAILED)
numResults++
}
}
}
// Find out what happened to old (historical) rounds if any are needed
if len(historicalRequest.Rounds) > 0 {
go c.getHistoricalRounds(historicalRequest, networkInstance, sendResults, commsInterface)
}
// Determine the results of all rounds requested
go func() {
// Create the results timer
timer := time.NewTimer(timeout)
for {
// If we know about all rounds, return
if numResults == 0 {
roundCallback(allRoundsSucceeded, false, roundsResults)
return
}
// Wait for info about rounds or the timeout to occur
select {
case <-timer.C:
roundCallback(false, true, roundsResults)
return
case roundReport := <-sendResults:
numResults--
// Skip if the round is nil (unknown from historical rounds)
// they default to timed out, so correct behavior is preserved
if roundReport.RoundInfo == nil || roundReport.TimedOut {
allRoundsSucceeded = false
} else {
// If available, denote the result
roundId := id.Round(roundReport.RoundInfo.ID)
if states.Round(roundReport.RoundInfo.State) == states.COMPLETED {
roundsResults[roundId] = Succeeded
} else {
roundsResults[roundId] = Failed
allRoundsSucceeded = false
}
}
}
}
}()
return nil
}
// Helper function which asynchronously pings a random gateway until
// it gets information on it's requested historical rounds
func (c *Client) getHistoricalRounds(msg *pb.HistoricalRounds,
instance *network.Instance, sendResults chan ds.EventReturn, comms historicalRoundsComm) {
var resp *pb.HistoricalRoundsResponse
//retry 5 times
for i := 0; i < 5; i++ {
// Find a gateway to request about the roundRequests
result, err := c.GetNetworkInterface().GetSender().SendToAny(func(host *connect.Host) (interface{}, error) {
return comms.RequestHistoricalRounds(host, msg)
})
// If an error, retry with (potentially) a different gw host.
// If no error from received gateway request, exit loop
// and process rounds
if err == nil {
resp = result.(*pb.HistoricalRoundsResponse)
break
} else {
jww.ERROR.Printf("Failed to lookup historical rounds: %s", err)
}
}
if resp == nil {
return
}
// Process historical rounds, sending back to the caller thread
for _, ri := range resp.Rounds {
sendResults <- ds.EventReturn{
RoundInfo: ri,
}
}
}