Skip to content
Snippets Groups Projects
Select Git revision
  • 9bd1a64e4bcca029bb2881c47286d1720f586411
  • main default protected
  • development
  • integration
  • v1.1.5
  • v1.1.4
  • v1.1.3
  • v1.1.2
  • v1.1.1
  • v1.1.0
  • v1.0.0
11 results

Bindings

Blame
  • results.go NaN GiB
    ///////////////////////////////////////////////////////////////////////////////
    // Copyright © 2020 xx network SEZC                                          //
    //                                                                           //
    // Use of this source code is governed by a license that can be found in the //
    // LICENSE file                                                              //
    ///////////////////////////////////////////////////////////////////////////////
    package api
    
    import (
    	"fmt"
    	"time"
    
    	jww "github.com/spf13/jwalterweatherman"
    	"gitlab.com/elixxir/client/network/gateway"
    	pb "gitlab.com/elixxir/comms/mixmessages"
    	"gitlab.com/elixxir/comms/network"
    	ds "gitlab.com/elixxir/comms/network/dataStructures"
    	"gitlab.com/elixxir/primitives/states"
    	"gitlab.com/xx_network/comms/connect"
    	"gitlab.com/xx_network/primitives/id"
    )
    
    // RoundResult enumerates the outcomes that can be reported back for a round.
    type RoundResult uint
    
    // The possible values of RoundResult.
    const (
    	// TimeOut is the zero value; it marks a round whose final state was not
    	// learned.
    	TimeOut RoundResult = iota
    	// Failed marks a round that ended in failure.
    	Failed
    	// Succeeded marks a round that completed.
    	Succeeded
    )
    
    // String returns the name of the result. Values outside the declared
    // constants are rendered as "UNKNOWN RESULT: <n>".
    func (rr RoundResult) String() string {
    	// Table indexed by the constant values themselves
    	names := [...]string{
    		TimeOut:   "TimeOut",
    		Failed:    "Failed",
    		Succeeded: "Succeeded",
    	}
    
    	if rr < RoundResult(len(names)) {
    		return names[rr]
    	}
    
    	return fmt.Sprintf("UNKNOWN RESULT: %d", rr)
    }
    
    // RoundEventCallback is the callback through which the results of the
    // requested rounds are reported. It is designed such that the caller may
    // decide how much detail they need.
    // allRoundsSucceeded:
    //   false if any round in the rounds map was unsuccessful,
    //   true if ALL rounds were successful.
    // timedOut:
    //   true if any of the rounds timed out while being monitored,
    //   false if all round statuses were returned.
    // rounds:
    //   maps every previously requested round to its respective round result.
    type RoundEventCallback func(allRoundsSucceeded, timedOut bool, rounds map[id.Round]RoundResult)
    
    // historicalRoundsComm is the subset of the comms layer needed to request
    // historical rounds. It is declared as an interface so that getRoundResults
    // can be tested with a substitute implementation.
    type historicalRoundsComm interface {
    	// RequestHistoricalRounds sends the historical rounds request to the
    	// given host and returns its response.
    	RequestHistoricalRounds(host *connect.Host,
    		message *pb.HistoricalRounds) (*pb.HistoricalRoundsResponse, error)
    	// GetHost returns the host registered under hostId.
    	// NOTE(review): the bool presumably reports whether the host was found;
    	// confirm against the implementation.
    	GetHost(hostId *id.ID) (*connect.Host, bool)
    }
    
    // GetRoundResults adjudicates on the requested rounds, handling both older
    // rounds and rounds still in progress. The outcome is delivered through
    // roundCallback. The work is delegated to getRoundResults using the client's
    // own comms and a freshly created results channel.
    func (c *Client) GetRoundResults(roundList []id.Round, timeout time.Duration,
    	roundCallback RoundEventCallback) error {
    	return c.getRoundResults(
    		roundList, timeout, roundCallback,
    		// Buffered with one slot per requested round
    		make(chan ds.EventReturn, len(roundList)),
    		c.comms,
    	)
    }
    
    // getRoundResults does all the logic for GetRoundResults.
    //
    // Every round in roundList starts out as TimeOut. Rounds found in the network
    // instance's buffer in a COMPLETED or FAILED state are resolved immediately.
    // Rounds older than the oldest buffered round are looked up asynchronously
    // through commsInterface. All remaining rounds are monitored through the
    // round events system. Updates for the latter two categories arrive on
    // sendResults.
    //
    // A goroutine collects the updates and calls roundCallback exactly once:
    // either when every pending round has reported, or when timeout elapses. The
    // timedOut argument is true if the overall timeout elapsed or if any
    // individual round reported a timeout (or was returned without round info,
    // leaving its result as TimeOut).
    //
    // The function itself returns immediately and always returns nil.
    func (c *Client) getRoundResults(roundList []id.Round, timeout time.Duration,
    	roundCallback RoundEventCallback, sendResults chan ds.EventReturn,
    	commsInterface historicalRoundsComm) error {
    
    	networkInstance := c.network.GetInstance()
    
    	// Generate a message to track all older rounds
    	historicalRequest := &pb.HistoricalRounds{
    		Rounds: []uint64{},
    	}
    
    	// Generate all tracking structures for rounds
    	roundEvents := c.GetRoundEvents()
    	roundsResults := make(map[id.Round]RoundResult, len(roundList))
    	allRoundsSucceeded := true
    	anyRoundTimedOut := false
    	numResults := 0 // number of reports still expected on sendResults
    
    	// Parse and adjudicate every round
    	for _, rnd := range roundList {
    		// Every round is timed out by default, until proven to have finished
    		roundsResults[rnd] = TimeOut
    		roundInfo, err := networkInstance.GetRound(rnd)
    		// If we have the round in the buffer
    		if err == nil {
    			// Check if the round is done (completed or failed) or in progress
    			switch states.Round(roundInfo.State) {
    			case states.COMPLETED:
    				roundsResults[rnd] = Succeeded
    			case states.FAILED:
    				roundsResults[rnd] = Failed
    				allRoundsSucceeded = false
    			default:
    				// If in progress, add a channel monitoring its status. The
    				// event times out slightly before the overall timer so that
    				// its report can still be collected.
    				roundEvents.AddRoundEventChan(rnd, sendResults,
    					timeout-time.Millisecond, states.COMPLETED, states.FAILED)
    				numResults++
    			}
    			continue
    		}
    
    		jww.DEBUG.Printf("Failed to get round [%d] in buffer: %v", rnd, err)
    		// Update oldest round (buffer may have updated externally)
    		oldestRound := networkInstance.GetOldestRoundID()
    		if rnd < oldestRound {
    			// If the round is older than the oldest round in our buffer,
    			// add it to the historical round request (performed later)
    			historicalRequest.Rounds = append(historicalRequest.Rounds, uint64(rnd))
    		} else {
    			// Otherwise, monitor its progress
    			roundEvents.AddRoundEventChan(rnd, sendResults,
    				timeout-time.Millisecond, states.COMPLETED, states.FAILED)
    		}
    		// Either way, exactly one report is expected for this round
    		numResults++
    	}
    
    	// Find out what happened to old (historical) rounds if any are needed
    	if len(historicalRequest.Rounds) > 0 {
    		go c.getHistoricalRounds(historicalRequest, networkInstance, sendResults, commsInterface)
    	}
    
    	// Determine the results of all rounds requested
    	go func() {
    		// Create the results timer and release it on every exit path
    		timer := time.NewTimer(timeout)
    		defer timer.Stop()
    
    		for {
    			// If we know about all rounds, return
    			if numResults == 0 {
    				roundCallback(allRoundsSucceeded, anyRoundTimedOut, roundsResults)
    				return
    			}
    
    			// Wait for info about rounds or the timeout to occur
    			select {
    			case <-timer.C:
    				roundCallback(false, true, roundsResults)
    				return
    			case roundReport := <-sendResults:
    				numResults--
    				// A nil round (unknown from historical rounds) or a timed
    				// out event leaves the round at its default TimeOut result,
    				// so report the timeout to the caller as well
    				if roundReport.RoundInfo == nil || roundReport.TimedOut {
    					allRoundsSucceeded = false
    					anyRoundTimedOut = true
    					continue
    				}
    
    				// If available, denote the result
    				roundId := id.Round(roundReport.RoundInfo.ID)
    				if states.Round(roundReport.RoundInfo.State) == states.COMPLETED {
    					roundsResults[roundId] = Succeeded
    				} else {
    					roundsResults[roundId] = Failed
    					allRoundsSucceeded = false
    				}
    			}
    		}
    	}()
    
    	return nil
    }
    
    // getHistoricalRounds repeatedly asks a gateway for the rounds listed in msg
    // until one request succeeds, then forwards every round in the response to
    // sendResults as a ds.EventReturn. It is meant to be run in its own goroutine.
    //
    // A gateway is selected anew through gateway.Get on each attempt, so a failed
    // request is retried with (potentially) a different host. Failing to obtain
    // a gateway host is treated as fatal and panics. There is no bound on the
    // number of attempts.
    func (c *Client) getHistoricalRounds(msg *pb.HistoricalRounds,
    	instance *network.Instance, sendResults chan ds.EventReturn,
    	comms historicalRoundsComm) {
    
    	var resp *pb.HistoricalRoundsResponse
    
    	for {
    		// Find a gateway to request about the roundRequests
    		// NOTE(review): a new stream is taken from c.rng on every attempt and
    		// is not released here; confirm whether it needs to be closed.
    		gwHost, err := gateway.Get(instance.GetPartialNdf().Get(), comms, c.rng.GetStream())
    		if err != nil {
    			jww.FATAL.Panicf("Failed to track network, NDF has corrupt "+
    				"data: %s", err)
    		}
    
    		// If no error from the gateway request, exit the loop and process
    		// the rounds
    		resp, err = comms.RequestHistoricalRounds(gwHost, msg)
    		if err == nil {
    			break
    		}
    
    		// Otherwise record why the attempt failed before retrying, rather
    		// than silently discarding the error
    		jww.WARN.Printf("Failed to request historical rounds, retrying: %v", err)
    	}
    
    	// Process historical rounds, sending back to the caller thread.
    	// NOTE(review): these sends block once the channel's buffer is full;
    	// verify the response cannot outnumber the buffer after the receiver exits.
    	for _, ri := range resp.Rounds {
    		sendResults <- ds.EventReturn{
    			RoundInfo: ri,
    		}
    	}
    }