Skip to content
Snippets Groups Projects
Commit c0c0333b authored by Benjamin Wenger's avatar Benjamin Wenger
Browse files

added getroundresults to network package

parent fa75ccc5
No related branches found
No related tags found
3 merge requests!510Release,!207WIP: Client Restructure,!203Symmetric broadcast
...@@ -13,6 +13,7 @@ import ( ...@@ -13,6 +13,7 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"gitlab.com/elixxir/client/network"
"runtime/pprof" "runtime/pprof"
"strings" "strings"
"sync" "sync"
...@@ -407,7 +408,7 @@ func (c *Client) RegisterRoundEventsHandler(rid int, cb RoundEventCallback, ...@@ -407,7 +408,7 @@ func (c *Client) RegisterRoundEventsHandler(rid int, cb RoundEventCallback,
func (c *Client) WaitForRoundCompletion(roundID int, func (c *Client) WaitForRoundCompletion(roundID int,
rec RoundCompletionCallback, timeoutMS int) error { rec RoundCompletionCallback, timeoutMS int) error {
f := func(allRoundsSucceeded, timedOut bool, rounds map[id.Round]api.RoundResult) { f := func(allRoundsSucceeded, timedOut bool, rounds map[id.Round]network.RoundLookupStatus) {
rec.EventCallback(roundID, allRoundsSucceeded, timedOut) rec.EventCallback(roundID, allRoundsSucceeded, timedOut)
} }
...@@ -442,7 +443,7 @@ func (c *Client) WaitForMessageDelivery(marshaledSendReport []byte, ...@@ -442,7 +443,7 @@ func (c *Client) WaitForMessageDelivery(marshaledSendReport []byte,
"unmarshal: %s", string(marshaledSendReport))) "unmarshal: %s", string(marshaledSendReport)))
} }
f := func(allRoundsSucceeded, timedOut bool, rounds map[id.Round]api.RoundResult) { f := func(allRoundsSucceeded, timedOut bool, rounds map[id.Round]network.RoundLookupStatus) {
results := make([]byte, len(sr.rl.list)) results := make([]byte, len(sr.rl.list))
jww.INFO.Printf("Processing WaitForMessageDelivery report "+ jww.INFO.Printf("Processing WaitForMessageDelivery report "+
"for %v, success: %v, timedout: %v", sr.mid, allRoundsSucceeded, "for %v, success: %v, timedout: %v", sr.mid, allRoundsSucceeded,
......
...@@ -14,6 +14,7 @@ import ( ...@@ -14,6 +14,7 @@ import (
"encoding/hex" "encoding/hex"
"encoding/json" "encoding/json"
"fmt" "fmt"
"gitlab.com/elixxir/client/network"
"io/ioutil" "io/ioutil"
"log" "log"
"os" "os"
...@@ -377,7 +378,7 @@ var rootCmd = &cobra.Command{ ...@@ -377,7 +378,7 @@ var rootCmd = &cobra.Command{
// Construct the callback function which // Construct the callback function which
// verifies successful message send or retries // verifies successful message send or retries
f := func(allRoundsSucceeded, timedOut bool, f := func(allRoundsSucceeded, timedOut bool,
rounds map[id.Round]api.RoundResult) { rounds map[id.Round]network.RoundLookupStatus) {
printRoundResults(allRoundsSucceeded, timedOut, rounds, roundIDs, msg) printRoundResults(allRoundsSucceeded, timedOut, rounds, roundIDs, msg)
if !allRoundsSucceeded { if !allRoundsSucceeded {
retryChan <- struct{}{} retryChan <- struct{}{}
......
...@@ -4,8 +4,8 @@ import ( ...@@ -4,8 +4,8 @@ import (
"fmt" "fmt"
jww "github.com/spf13/jwalterweatherman" jww "github.com/spf13/jwalterweatherman"
"github.com/spf13/viper" "github.com/spf13/viper"
"gitlab.com/elixxir/client/api"
"gitlab.com/elixxir/client/interfaces/message" "gitlab.com/elixxir/client/interfaces/message"
"gitlab.com/elixxir/client/network"
backupCrypto "gitlab.com/elixxir/crypto/backup" backupCrypto "gitlab.com/elixxir/crypto/backup"
"gitlab.com/elixxir/crypto/contact" "gitlab.com/elixxir/crypto/contact"
"gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/id"
...@@ -47,7 +47,7 @@ func printChanRequest(requestor contact.Contact) { ...@@ -47,7 +47,7 @@ func printChanRequest(requestor contact.Contact) {
// Helper function which prints the round resuls // Helper function which prints the round resuls
func printRoundResults(allRoundsSucceeded, timedOut bool, func printRoundResults(allRoundsSucceeded, timedOut bool,
rounds map[id.Round]api.RoundResult, roundIDs []id.Round, msg message.Send) { rounds map[id.Round]network.RoundLookupStatus, roundIDs []id.Round, msg message.Send) {
// Done as string slices for easy and human readable printing // Done as string slices for easy and human readable printing
successfulRounds := make([]string, 0) successfulRounds := make([]string, 0)
...@@ -58,9 +58,9 @@ func printRoundResults(allRoundsSucceeded, timedOut bool, ...@@ -58,9 +58,9 @@ func printRoundResults(allRoundsSucceeded, timedOut bool,
// Group all round reports into a category based on their // Group all round reports into a category based on their
// result (successful, failed, or timed out) // result (successful, failed, or timed out)
if result, exists := rounds[r]; exists { if result, exists := rounds[r]; exists {
if result == api.Succeeded { if result == network.Succeeded {
successfulRounds = append(successfulRounds, strconv.Itoa(int(r))) successfulRounds = append(successfulRounds, strconv.Itoa(int(r)))
} else if result == api.Failed { } else if result == network.Failed {
failedRounds = append(failedRounds, strconv.Itoa(int(r))) failedRounds = append(failedRounds, strconv.Itoa(int(r)))
} else { } else {
timedOutRounds = append(timedOutRounds, strconv.Itoa(int(r))) timedOutRounds = append(timedOutRounds, strconv.Itoa(int(r)))
......
...@@ -96,7 +96,7 @@ type Handler interface { ...@@ -96,7 +96,7 @@ type Handler interface {
// sessions using the passed cryptographic data and per the parameters sent // sessions using the passed cryptographic data and per the parameters sent
AddPartner(partnerID *id.ID, partnerPubKey, myPrivKey *cyclic.Int, AddPartner(partnerID *id.ID, partnerPubKey, myPrivKey *cyclic.Int,
partnerSIDHPubKey *sidh.PublicKey, mySIDHPrivKey *sidh.PrivateKey, partnerSIDHPubKey *sidh.PublicKey, mySIDHPrivKey *sidh.PrivateKey,
sendParams, receiveParams session.Params) (*partner.Manager, error) sendParams, receiveParams session.Params, myID *id.ID, myPrivateKey *cyclic.Int, temporary bool) (*partner.Manager, error)
// GetPartner returns the partner per its ID, if it exists // GetPartner returns the partner per its ID, if it exists
GetPartner(partnerID *id.ID) (*partner.Manager, error) GetPartner(partnerID *id.ID) (*partner.Manager, error)
......
...@@ -13,6 +13,7 @@ import ( ...@@ -13,6 +13,7 @@ import (
"gitlab.com/elixxir/client/api" "gitlab.com/elixxir/client/api"
"gitlab.com/elixxir/client/interfaces" "gitlab.com/elixxir/client/interfaces"
"gitlab.com/elixxir/client/interfaces/message" "gitlab.com/elixxir/client/interfaces/message"
"gitlab.com/elixxir/client/network"
"gitlab.com/elixxir/client/stoppable" "gitlab.com/elixxir/client/stoppable"
"gitlab.com/elixxir/client/storage" "gitlab.com/elixxir/client/storage"
ftStorage "gitlab.com/elixxir/client/storage/fileTransfer" ftStorage "gitlab.com/elixxir/client/storage/fileTransfer"
...@@ -119,7 +120,7 @@ type Manager struct { ...@@ -119,7 +120,7 @@ type Manager struct {
// getRoundResultsFunc is a function that matches client.GetRoundResults. It is // getRoundResultsFunc is a function that matches client.GetRoundResults. It is
// used to pass in an alternative function for testing. // used to pass in an alternative function for testing.
type getRoundResultsFunc func(roundList []id.Round, timeout time.Duration, type getRoundResultsFunc func(roundList []id.Round, timeout time.Duration,
roundCallback api.RoundEventCallback) error roundCallback network.RoundEventCallback) error
// queuedPart contains the unique information identifying a file part. // queuedPart contains the unique information identifying a file part.
type queuedPart struct { type queuedPart struct {
......
...@@ -10,8 +10,8 @@ package fileTransfer ...@@ -10,8 +10,8 @@ package fileTransfer
import ( import (
"fmt" "fmt"
"github.com/pkg/errors" "github.com/pkg/errors"
"gitlab.com/elixxir/client/api"
"gitlab.com/elixxir/client/interfaces" "gitlab.com/elixxir/client/interfaces"
"gitlab.com/elixxir/client/network"
"gitlab.com/elixxir/client/storage/versioned" "gitlab.com/elixxir/client/storage/versioned"
ftCrypto "gitlab.com/elixxir/crypto/fileTransfer" ftCrypto "gitlab.com/elixxir/crypto/fileTransfer"
"gitlab.com/elixxir/ekv" "gitlab.com/elixxir/ekv"
...@@ -77,13 +77,13 @@ func TestManager_oldTransferRecovery(t *testing.T) { ...@@ -77,13 +77,13 @@ func TestManager_oldTransferRecovery(t *testing.T) {
// Returns an error on function and round failure on callback if sendErr is // Returns an error on function and round failure on callback if sendErr is
// set; otherwise, it reports round successes and returns nil // set; otherwise, it reports round successes and returns nil
rr := func(rIDs []id.Round, _ time.Duration, cb api.RoundEventCallback) error { rr := func(rIDs []id.Round, _ time.Duration, cb network.RoundEventCallback) error {
rounds := make(map[id.Round]api.RoundResult, len(rIDs)) rounds := make(map[id.Round]network.RoundLookupStatus, len(rIDs))
for _, rid := range rIDs { for _, rid := range rIDs {
if finishedRounds[rid] != nil { if finishedRounds[rid] != nil {
rounds[rid] = api.Succeeded rounds[rid] = network.Succeeded
} else { } else {
rounds[rid] = api.Failed rounds[rid] = network.Failed
} }
} }
cb(true, false, rounds) cb(true, false, rounds)
...@@ -240,13 +240,13 @@ func TestManager_updateSentRounds(t *testing.T) { ...@@ -240,13 +240,13 @@ func TestManager_updateSentRounds(t *testing.T) {
// Returns an error on function and round failure on callback if sendErr is // Returns an error on function and round failure on callback if sendErr is
// set; otherwise, it reports round successes and returns nil // set; otherwise, it reports round successes and returns nil
rr := func(rIDs []id.Round, _ time.Duration, cb api.RoundEventCallback) error { rr := func(rIDs []id.Round, _ time.Duration, cb network.RoundEventCallback) error {
rounds := make(map[id.Round]api.RoundResult, len(rIDs)) rounds := make(map[id.Round]network.RoundLookupStatus, len(rIDs))
for _, rid := range rIDs { for _, rid := range rIDs {
if finishedRounds[rid] != nil { if finishedRounds[rid] != nil {
rounds[rid] = api.Succeeded rounds[rid] = network.Succeeded
} else { } else {
rounds[rid] = api.Failed rounds[rid] = network.Failed
} }
} }
cb(true, false, rounds) cb(true, false, rounds)
...@@ -322,7 +322,7 @@ func TestManager_updateSentRounds_Error(t *testing.T) { ...@@ -322,7 +322,7 @@ func TestManager_updateSentRounds_Error(t *testing.T) {
// Returns an error on function and round failure on callback if sendErr is // Returns an error on function and round failure on callback if sendErr is
// set; otherwise, it reports round successes and returns nil // set; otherwise, it reports round successes and returns nil
m.getRoundResults = func( m.getRoundResults = func(
[]id.Round, time.Duration, api.RoundEventCallback) error { []id.Round, time.Duration, network.RoundEventCallback) error {
return errors.Errorf("GetRoundResults error") return errors.Errorf("GetRoundResults error")
} }
......
...@@ -12,7 +12,6 @@ import ( ...@@ -12,7 +12,6 @@ import (
"encoding/binary" "encoding/binary"
"github.com/pkg/errors" "github.com/pkg/errors"
jww "github.com/spf13/jwalterweatherman" jww "github.com/spf13/jwalterweatherman"
"gitlab.com/elixxir/client/api"
"gitlab.com/elixxir/client/interfaces/message" "gitlab.com/elixxir/client/interfaces/message"
"gitlab.com/elixxir/client/interfaces/params" "gitlab.com/elixxir/client/interfaces/params"
"gitlab.com/elixxir/client/network" "gitlab.com/elixxir/client/network"
...@@ -419,11 +418,11 @@ func (m *Manager) newCmixMessage(transfer *ftStorage.SentTransfer, ...@@ -419,11 +418,11 @@ func (m *Manager) newCmixMessage(transfer *ftStorage.SentTransfer,
// fails, then each part for each transfer is removed from the in-progress list, // fails, then each part for each transfer is removed from the in-progress list,
// added to the end of the sending queue, and the callback called with an error. // added to the end of the sending queue, and the callback called with an error.
func (m *Manager) makeRoundEventCallback( func (m *Manager) makeRoundEventCallback(
sentRounds map[id.Round][]ftCrypto.TransferID) api.RoundEventCallback { sentRounds map[id.Round][]ftCrypto.TransferID) network.RoundEventCallback {
return func(allSucceeded, timedOut bool, rounds map[id.Round]api.RoundResult) { return func(allSucceeded, timedOut bool, rounds map[id.Round]network.RoundLookupStatus) {
for rid, roundResult := range rounds { for rid, roundResult := range rounds {
if roundResult == api.Succeeded { if roundResult == network.Succeeded {
// If the round succeeded, then set all parts for each transfer // If the round succeeded, then set all parts for each transfer
// for this round to finished and call the progress callback // for this round to finished and call the progress callback
for _, tid := range sentRounds[rid] { for _, tid := range sentRounds[rid] {
......
...@@ -12,10 +12,10 @@ import ( ...@@ -12,10 +12,10 @@ import (
"errors" "errors"
"fmt" "fmt"
"github.com/cloudflare/circl/dh/sidh" "github.com/cloudflare/circl/dh/sidh"
"gitlab.com/elixxir/client/api"
"gitlab.com/elixxir/client/interfaces" "gitlab.com/elixxir/client/interfaces"
"gitlab.com/elixxir/client/interfaces/message" "gitlab.com/elixxir/client/interfaces/message"
"gitlab.com/elixxir/client/interfaces/params" "gitlab.com/elixxir/client/interfaces/params"
"gitlab.com/elixxir/client/network"
"gitlab.com/elixxir/client/stoppable" "gitlab.com/elixxir/client/stoppable"
ftStorage "gitlab.com/elixxir/client/storage/fileTransfer" ftStorage "gitlab.com/elixxir/client/storage/fileTransfer"
util "gitlab.com/elixxir/client/storage/utility" util "gitlab.com/elixxir/client/storage/utility"
...@@ -382,7 +382,7 @@ func TestManager_sendParts_RoundResultsError(t *testing.T) { ...@@ -382,7 +382,7 @@ func TestManager_sendParts_RoundResultsError(t *testing.T) {
grrErr := errors.New("GetRoundResultsError") grrErr := errors.New("GetRoundResultsError")
m.getRoundResults = m.getRoundResults =
func([]id.Round, time.Duration, api.RoundEventCallback) error { func([]id.Round, time.Duration, network.RoundEventCallback) error {
return grrErr return grrErr
} }
...@@ -765,7 +765,7 @@ func TestManager_makeRoundEventCallback(t *testing.T) { ...@@ -765,7 +765,7 @@ func TestManager_makeRoundEventCallback(t *testing.T) {
roundEventCB := m.makeRoundEventCallback( roundEventCB := m.makeRoundEventCallback(
map[id.Round][]ftCrypto.TransferID{rid: {tid}}) map[id.Round][]ftCrypto.TransferID{rid: {tid}})
roundEventCB(true, false, map[id.Round]api.RoundResult{rid: api.Succeeded}) roundEventCB(true, false, map[id.Round]network.RoundLookupStatus{rid: network.Succeeded})
<-done1 <-done1
...@@ -853,7 +853,7 @@ func TestManager_makeRoundEventCallback_RoundFailure(t *testing.T) { ...@@ -853,7 +853,7 @@ func TestManager_makeRoundEventCallback_RoundFailure(t *testing.T) {
roundEventCB := m.makeRoundEventCallback( roundEventCB := m.makeRoundEventCallback(
map[id.Round][]ftCrypto.TransferID{rid: {tid}}) map[id.Round][]ftCrypto.TransferID{rid: {tid}})
roundEventCB(false, false, map[id.Round]api.RoundResult{rid: api.Failed}) roundEventCB(false, false, map[id.Round]network.RoundLookupStatus{rid: network.Failed})
<-done1 <-done1
......
...@@ -12,11 +12,11 @@ import ( ...@@ -12,11 +12,11 @@ import (
"encoding/binary" "encoding/binary"
"github.com/cloudflare/circl/dh/sidh" "github.com/cloudflare/circl/dh/sidh"
"github.com/pkg/errors" "github.com/pkg/errors"
"gitlab.com/elixxir/client/api"
"gitlab.com/elixxir/client/event" "gitlab.com/elixxir/client/event"
"gitlab.com/elixxir/client/interfaces" "gitlab.com/elixxir/client/interfaces"
"gitlab.com/elixxir/client/interfaces/message" "gitlab.com/elixxir/client/interfaces/message"
"gitlab.com/elixxir/client/interfaces/params" "gitlab.com/elixxir/client/interfaces/params"
network2 "gitlab.com/elixxir/client/network"
"gitlab.com/elixxir/client/network/gateway" "gitlab.com/elixxir/client/network/gateway"
"gitlab.com/elixxir/client/stoppable" "gitlab.com/elixxir/client/stoppable"
"gitlab.com/elixxir/client/storage" "gitlab.com/elixxir/client/storage"
...@@ -170,13 +170,13 @@ func newTestManager(sendErr bool, sendChan, sendE2eChan chan message.Receive, ...@@ -170,13 +170,13 @@ func newTestManager(sendErr bool, sendChan, sendE2eChan chan message.Receive,
// Returns an error on function and round failure on callback if sendErr is // Returns an error on function and round failure on callback if sendErr is
// set; otherwise, it reports round successes and returns nil // set; otherwise, it reports round successes and returns nil
rr := func(rIDs []id.Round, _ time.Duration, cb api.RoundEventCallback) error { rr := func(rIDs []id.Round, _ time.Duration, cb network2.RoundEventCallback) error {
rounds := make(map[id.Round]api.RoundResult, len(rIDs)) rounds := make(map[id.Round]network2.RoundLookupStatus, len(rIDs))
for _, rid := range rIDs { for _, rid := range rIDs {
if sendErr { if sendErr {
rounds[rid] = api.Failed rounds[rid] = network2.Failed
} else { } else {
rounds[rid] = api.Succeeded rounds[rid] = network2.Succeeded
} }
} }
cb(!sendErr, false, rounds) cb(!sendErr, false, rounds)
......
...@@ -150,7 +150,7 @@ type Manager interface { ...@@ -150,7 +150,7 @@ type Manager interface {
// A service may have a nil response unless it is default. In general a // A service may have a nil response unless it is default. In general a
// nil service is used to detect notifications when pickup is done by // nil service is used to detect notifications when pickup is done by
// fingerprints // fingerprints
AddService(AddService *id.ID, newService message.Service, AddService(clientID *id.ID, newService message.Service,
response message.Processor) response message.Processor)
// DeleteService deletes a message service. If only a single response is // DeleteService deletes a message service. If only a single response is
...@@ -214,12 +214,17 @@ type Manager interface { ...@@ -214,12 +214,17 @@ type Manager interface {
// with a given node. // with a given node.
TriggerNodeRegistration(nid *id.ID) TriggerNodeRegistration(nid *id.ID)
/* === Historical Rounds ================================================ */ /* === Rounds =========================================================== */
/* A complete set of round info is not kept on the client, and sometimes /* A complete set of round info is not kept on the client, and sometimes
the network will need to be queried to get round info. Historical rounds the network will need to be queried to get round info. Historical rounds
is the system internal to the Network Manager to do this. It can be used is the system internal to the Network Manager to do this. It can be used
externally as well. */ externally as well. */
// GetRoundResults adjudicates on the rounds requested. Checks if they are
// older rounds or in progress rounds.
GetRoundResults(roundList []id.Round, timeout time.Duration,
roundCallback RoundEventCallback) error
// LookupHistoricalRound looks up the passed historical round on the network. // LookupHistoricalRound looks up the passed historical round on the network.
LookupHistoricalRound( LookupHistoricalRound(
rid id.Round, callback historical.RoundResultCallback) error rid id.Round, callback historical.RoundResultCallback) error
......
...@@ -5,30 +5,29 @@ ...@@ -5,30 +5,29 @@
// LICENSE file // // LICENSE file //
/////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////
package api package network
import ( import (
"fmt" "fmt"
"gitlab.com/elixxir/client/network/historical"
"time" "time"
jww "github.com/spf13/jwalterweatherman" jww "github.com/spf13/jwalterweatherman"
pb "gitlab.com/elixxir/comms/mixmessages"
ds "gitlab.com/elixxir/comms/network/dataStructures" ds "gitlab.com/elixxir/comms/network/dataStructures"
"gitlab.com/elixxir/primitives/states" "gitlab.com/elixxir/primitives/states"
"gitlab.com/xx_network/comms/connect"
"gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/id"
) )
// RoundResult is the enum of possible round results to pass back // RoundLookupStatus is the enum of possible round results to pass back
type RoundResult uint type RoundLookupStatus uint
const ( const (
TimeOut RoundResult = iota TimeOut RoundLookupStatus = iota
Failed Failed
Succeeded Succeeded
) )
func (rr RoundResult) String() string { func (rr RoundLookupStatus) String() string {
switch rr { switch rr {
case TimeOut: case TimeOut:
return "TimeOut" return "TimeOut"
...@@ -41,6 +40,16 @@ func (rr RoundResult) String() string { ...@@ -41,6 +40,16 @@ func (rr RoundResult) String() string {
} }
} }
type RoundResult struct {
Status RoundLookupStatus
Round historical.Round
}
type historicalRoundsRtn struct {
Success bool
Round historical.Round
}
// RoundEventCallback interface which reports the requested rounds. // RoundEventCallback interface which reports the requested rounds.
// Designed such that the caller may decide how much detail they need. // Designed such that the caller may decide how much detail they need.
// allRoundsSucceeded: // allRoundsSucceeded:
...@@ -53,41 +62,30 @@ func (rr RoundResult) String() string { ...@@ -53,41 +62,30 @@ func (rr RoundResult) String() string {
// their respective round results // their respective round results
type RoundEventCallback func(allRoundsSucceeded, timedOut bool, rounds map[id.Round]RoundResult) type RoundEventCallback func(allRoundsSucceeded, timedOut bool, rounds map[id.Round]RoundResult)
// Comm interface for RequestHistoricalRounds.
// Constructed for testability with getRoundResults
type historicalRoundsComm interface {
RequestHistoricalRounds(host *connect.Host,
message *pb.HistoricalRounds) (*pb.HistoricalRoundsResponse, error)
GetHost(hostId *id.ID) (*connect.Host, bool)
}
// GetRoundResults adjudicates on the rounds requested. Checks if they are // GetRoundResults adjudicates on the rounds requested. Checks if they are
// older rounds or in progress rounds. // older rounds or in progress rounds.
func (c *Client) GetRoundResults(roundList []id.Round, timeout time.Duration, func (m *manager) GetRoundResults(roundList []id.Round, timeout time.Duration,
roundCallback RoundEventCallback) error { roundCallback RoundEventCallback) error {
jww.INFO.Printf("GetRoundResults(%v, %s)", roundList, timeout) jww.INFO.Printf("GetRoundResults(%v, %s)", roundList, timeout)
sendResults := make(chan ds.EventReturn, len(roundList)) sendResults := make(chan ds.EventReturn, len(roundList))
return c.getRoundResults(roundList, timeout, roundCallback, return m.getRoundResults(roundList, timeout, roundCallback,
sendResults, c.comms) sendResults)
} }
// Helper function which does all the logic for GetRoundResults // Helper function which does all the logic for GetRoundResults
func (c *Client) getRoundResults(roundList []id.Round, timeout time.Duration, func (m *manager) getRoundResults(roundList []id.Round, timeout time.Duration,
roundCallback RoundEventCallback, sendResults chan ds.EventReturn, roundCallback RoundEventCallback, sendResults chan ds.EventReturn) error {
commsInterface historicalRoundsComm) error {
networkInstance := c.network.GetInstance() networkInstance := m.GetInstance()
// Generate a message to track all older rounds // Generate a message to track all older rounds
historicalRequest := &pb.HistoricalRounds{ historicalRequest := make([]id.Round, 0, len(roundList))
Rounds: []uint64{},
}
// Generate all tracking structures for rounds // Generate all tracking structures for rounds
roundEvents := c.GetRoundEvents() roundEvents := networkInstance.GetRoundEvents()
roundsResults := make(map[id.Round]RoundResult) roundsResults := make(map[id.Round]RoundResult)
allRoundsSucceeded := true allRoundsSucceeded := true
anyRoundTimedOut := false anyRoundTimedOut := false
...@@ -98,15 +96,23 @@ func (c *Client) getRoundResults(roundList []id.Round, timeout time.Duration, ...@@ -98,15 +96,23 @@ func (c *Client) getRoundResults(roundList []id.Round, timeout time.Duration,
// Parse and adjudicate every round // Parse and adjudicate every round
for _, rnd := range roundList { for _, rnd := range roundList {
// Every round is timed out by default, until proven to have finished // Every round is timed out by default, until proven to have finished
roundsResults[rnd] = TimeOut roundsResults[rnd] = RoundResult{
Status: TimeOut,
}
roundInfo, err := networkInstance.GetRound(rnd) roundInfo, err := networkInstance.GetRound(rnd)
// If we have the round in the buffer // If we have the round in the buffer
if err == nil { if err == nil {
// Check if the round is done (completed or failed) or in progress // Check if the round is done (completed or failed) or in progress
if states.Round(roundInfo.State) == states.COMPLETED { if states.Round(roundInfo.State) == states.COMPLETED {
roundsResults[rnd] = Succeeded roundsResults[rnd] = RoundResult{
Status: Succeeded,
Round: historical.MakeRound(roundInfo),
}
} else if states.Round(roundInfo.State) == states.FAILED { } else if states.Round(roundInfo.State) == states.FAILED {
roundsResults[rnd] = Failed roundsResults[rnd] = RoundResult{
Status: Failed,
Round: historical.MakeRound(roundInfo),
}
allRoundsSucceeded = false allRoundsSucceeded = false
} else { } else {
// If in progress, add a channel monitoring its state // If in progress, add a channel monitoring its state
...@@ -119,7 +125,7 @@ func (c *Client) getRoundResults(roundList []id.Round, timeout time.Duration, ...@@ -119,7 +125,7 @@ func (c *Client) getRoundResults(roundList []id.Round, timeout time.Duration,
if rnd < oldestRound { if rnd < oldestRound {
// If round is older that oldest round in our buffer // If round is older that oldest round in our buffer
// Add it to the historical round request (performed later) // Add it to the historical round request (performed later)
historicalRequest.Rounds = append(historicalRequest.Rounds, uint64(rnd)) historicalRequest = append(historicalRequest, rnd)
numResults++ numResults++
} else { } else {
// Otherwise, monitor its progress // Otherwise, monitor its progress
...@@ -129,10 +135,30 @@ func (c *Client) getRoundResults(roundList []id.Round, timeout time.Duration, ...@@ -129,10 +135,30 @@ func (c *Client) getRoundResults(roundList []id.Round, timeout time.Duration,
} }
} }
} }
historicalRoundsCh := make(chan RoundResult, len(historicalRequest))
// Find out what happened to old (historical) rounds if any are needed // Find out what happened to old (historical) rounds if any are needed
if len(historicalRequest.Rounds) > 0 { if len(historicalRequest) > 0 {
go c.getHistoricalRounds(historicalRequest, sendResults, commsInterface) for _, rnd := range historicalRequest {
rrc := func(round historical.Round, success bool) {
var status RoundLookupStatus
if success {
status = Succeeded
} else {
status = TimeOut
}
historicalRoundsCh <- RoundResult{
Status: status,
Round: round,
}
}
err := m.Retriever.LookupHistoricalRound(rnd, rrc)
if err != nil {
historicalRoundsCh <- RoundResult{
Status: TimeOut,
}
}
}
} }
// Determine the results of all rounds requested // Determine the results of all rounds requested
...@@ -147,11 +173,16 @@ func (c *Client) getRoundResults(roundList []id.Round, timeout time.Duration, ...@@ -147,11 +173,16 @@ func (c *Client) getRoundResults(roundList []id.Round, timeout time.Duration,
return return
} }
var result RoundResult
hasResult := false
// Wait for info about rounds or the timeout to occur // Wait for info about rounds or the timeout to occur
select { select {
case <-timer.C: case <-timer.C:
roundCallback(false, true, roundsResults) roundCallback(false, true, roundsResults)
return return
case result = <-historicalRoundsCh:
hasResult = true
case roundReport := <-sendResults: case roundReport := <-sendResults:
numResults-- numResults--
...@@ -162,11 +193,18 @@ func (c *Client) getRoundResults(roundList []id.Round, timeout time.Duration, ...@@ -162,11 +193,18 @@ func (c *Client) getRoundResults(roundList []id.Round, timeout time.Duration,
roundInfo, err := networkInstance.GetRound(roundId) roundInfo, err := networkInstance.GetRound(roundId)
// If we have the round in the buffer // If we have the round in the buffer
if err == nil { if err == nil {
hasResult = true
// Check if the round is done (completed or failed) or in progress // Check if the round is done (completed or failed) or in progress
if states.Round(roundInfo.State) == states.COMPLETED { if states.Round(roundInfo.State) == states.COMPLETED {
roundsResults[roundId] = Succeeded result = RoundResult{
Status: Succeeded,
Round: historical.MakeRound(roundInfo),
}
} else if states.Round(roundInfo.State) == states.FAILED { } else if states.Round(roundInfo.State) == states.FAILED {
roundsResults[roundId] = Failed result = RoundResult{
Status: Failed,
Round: historical.MakeRound(roundInfo),
}
allRoundsSucceeded = false allRoundsSucceeded = false
} }
continue continue
...@@ -174,62 +212,28 @@ func (c *Client) getRoundResults(roundList []id.Round, timeout time.Duration, ...@@ -174,62 +212,28 @@ func (c *Client) getRoundResults(roundList []id.Round, timeout time.Duration,
allRoundsSucceeded = false allRoundsSucceeded = false
anyRoundTimedOut = true anyRoundTimedOut = true
} else { } else {
hasResult = true
// If available, denote the result // If available, denote the result
if states.Round(roundReport.RoundInfo.State) == states.COMPLETED { if states.Round(roundReport.RoundInfo.State) == states.COMPLETED {
roundsResults[roundId] = Succeeded result = RoundResult{
} else { Status: Succeeded,
roundsResults[roundId] = Failed Round: historical.MakeRound(roundReport.RoundInfo),
allRoundsSucceeded = false
} }
} else {
result = RoundResult{
Status: Failed,
Round: historical.MakeRound(roundReport.RoundInfo),
} }
allRoundsSucceeded = false
} }
} }
}()
return nil
}
// Helper function which asynchronously pings a random gateway until
// it gets information on its requested historical rounds
func (c *Client) getHistoricalRounds(msg *pb.HistoricalRounds,
sendResults chan ds.EventReturn, comms historicalRoundsComm) {
var resp *pb.HistoricalRoundsResponse
//retry 5 times
for i := 0; i < 5; i++ {
// Find a gateway to request about the roundRequests
result, err := c.GetNetworkInterface().GetSender().SendToAny(func(host *connect.Host) (interface{}, error) {
return comms.RequestHistoricalRounds(host, msg)
}, nil)
// If an error, retry with (potentially) a different gw host.
// If no error from received gateway request, exit loop
// and process rounds
if err == nil {
resp = result.(*pb.HistoricalRoundsResponse)
break
} else {
jww.ERROR.Printf("Failed to lookup historical rounds: %s", err)
} }
if hasResult {
roundsResults[result.Round.ID] = result
} }
if resp == nil {
return
} }
}()
// Service historical rounds, sending back to the caller thread return nil
for i, ri := range resp.Rounds {
if ri == nil {
// Handle unknown by historical rounds
sendResults <- ds.EventReturn{
RoundInfo: &pb.RoundInfo{ID: msg.Rounds[i]},
TimedOut: true,
}
} else {
sendResults <- ds.EventReturn{
RoundInfo: ri,
}
}
}
} }
...@@ -4,9 +4,10 @@ ...@@ -4,9 +4,10 @@
// Use of this source code is governed by a license that can be found in the // // Use of this source code is governed by a license that can be found in the //
// LICENSE file // // LICENSE file //
/////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////
package api package network
import ( import (
"gitlab.com/elixxir/client/api"
pb "gitlab.com/elixxir/comms/mixmessages" pb "gitlab.com/elixxir/comms/mixmessages"
ds "gitlab.com/elixxir/comms/network/dataStructures" ds "gitlab.com/elixxir/comms/network/dataStructures"
"gitlab.com/elixxir/primitives/states" "gitlab.com/elixxir/primitives/states"
...@@ -38,21 +39,21 @@ func TestClient_GetRoundResults(t *testing.T) { ...@@ -38,21 +39,21 @@ func TestClient_GetRoundResults(t *testing.T) {
} }
// Create a new copy of the test client for this test // Create a new copy of the test client for this test
client, err := newTestingClient(t) client, err := api.newTestingClient(t)
if err != nil { if err != nil {
t.Fatalf("Failed in setup: %+v", err) t.Fatalf("Failed in setup: %+v", err)
} }
// Construct the round call back function signature // Construct the round call back function signature
var successfulRounds, timeout bool var successfulRounds, timeout bool
receivedRCB := func(allRoundsSucceeded, timedOut bool, rounds map[id.Round]RoundResult) { receivedRCB := func(allRoundsSucceeded, timedOut bool, rounds map[id.Round]RoundLookupStatus) {
successfulRounds = allRoundsSucceeded successfulRounds = allRoundsSucceeded
timeout = timedOut timeout = timedOut
} }
// Call the round results // Call the round results
err = client.getRoundResults(roundList, time.Duration(10)*time.Millisecond, err = client.getRoundResults(roundList, time.Duration(10)*time.Millisecond,
receivedRCB, sendResults, NewNoHistoricalRoundsComm()) receivedRCB, sendResults, api.NewNoHistoricalRoundsComm())
if err != nil { if err != nil {
t.Errorf("Error in happy path: %v", err) t.Errorf("Error in happy path: %v", err)
} }
...@@ -101,21 +102,21 @@ func TestClient_GetRoundResults_FailedRounds(t *testing.T) { ...@@ -101,21 +102,21 @@ func TestClient_GetRoundResults_FailedRounds(t *testing.T) {
} }
// Create a new copy of the test client for this test // Create a new copy of the test client for this test
client, err := newTestingClient(t) client, err := api.newTestingClient(t)
if err != nil { if err != nil {
t.Fatalf("Failed in setup: %v", err) t.Fatalf("Failed in setup: %v", err)
} }
// Construct the round call back function signature // Construct the round call back function signature
var successfulRounds, timeout bool var successfulRounds, timeout bool
receivedRCB := func(allRoundsSucceeded, timedOut bool, rounds map[id.Round]RoundResult) { receivedRCB := func(allRoundsSucceeded, timedOut bool, rounds map[id.Round]RoundLookupStatus) {
successfulRounds = allRoundsSucceeded successfulRounds = allRoundsSucceeded
timeout = timedOut timeout = timedOut
} }
// Call the round results // Call the round results
err = client.getRoundResults(roundList, time.Duration(10)*time.Millisecond, err = client.getRoundResults(roundList, time.Duration(10)*time.Millisecond,
receivedRCB, sendResults, NewNoHistoricalRoundsComm()) receivedRCB, sendResults, api.NewNoHistoricalRoundsComm())
if err != nil { if err != nil {
t.Errorf("Error in happy path: %v", err) t.Errorf("Error in happy path: %v", err)
} }
...@@ -144,8 +145,8 @@ func TestClient_GetRoundResults_HistoricalRounds(t *testing.T) { ...@@ -144,8 +145,8 @@ func TestClient_GetRoundResults_HistoricalRounds(t *testing.T) {
sendResults := make(chan ds.EventReturn, len(roundList)-2) sendResults := make(chan ds.EventReturn, len(roundList)-2)
for i := 0; i < numRounds; i++ { for i := 0; i < numRounds; i++ {
// Skip sending rounds intended for historical rounds comm // Skip sending rounds intended for historical rounds comm
if i == failedHistoricalRoundID || if i == api.failedHistoricalRoundID ||
i == completedHistoricalRoundID { i == api.completedHistoricalRoundID {
continue continue
} }
...@@ -159,15 +160,15 @@ func TestClient_GetRoundResults_HistoricalRounds(t *testing.T) { ...@@ -159,15 +160,15 @@ func TestClient_GetRoundResults_HistoricalRounds(t *testing.T) {
} }
// Create a new copy of the test client for this test // Create a new copy of the test client for this test
client, err := newTestingClient(t) client, err := api.newTestingClient(t)
if err != nil { if err != nil {
t.Fatalf("Failed in setup: %v", err) t.Fatalf("Failed in setup: %v", err)
} }
// Overpopulate the round buffer, ensuring a circle back of the ring buffer // Overpopulate the round buffer, ensuring a circle back of the ring buffer
for i := 1; i <= ds.RoundInfoBufLen+completedHistoricalRoundID+1; i++ { for i := 1; i <= ds.RoundInfoBufLen+api.completedHistoricalRoundID+1; i++ {
ri := &pb.RoundInfo{ID: uint64(i)} ri := &pb.RoundInfo{ID: uint64(i)}
if err = signRoundInfo(ri); err != nil { if err = api.signRoundInfo(ri); err != nil {
t.Errorf("Failed to sign round in set up: %v", err) t.Errorf("Failed to sign round in set up: %v", err)
} }
...@@ -180,14 +181,14 @@ func TestClient_GetRoundResults_HistoricalRounds(t *testing.T) { ...@@ -180,14 +181,14 @@ func TestClient_GetRoundResults_HistoricalRounds(t *testing.T) {
// Construct the round call back function signature // Construct the round call back function signature
var successfulRounds, timeout bool var successfulRounds, timeout bool
receivedRCB := func(allRoundsSucceeded, timedOut bool, rounds map[id.Round]RoundResult) { receivedRCB := func(allRoundsSucceeded, timedOut bool, rounds map[id.Round]RoundLookupStatus) {
successfulRounds = allRoundsSucceeded successfulRounds = allRoundsSucceeded
timeout = timedOut timeout = timedOut
} }
// Call the round results // Call the round results
err = client.getRoundResults(roundList, time.Duration(10)*time.Millisecond, err = client.getRoundResults(roundList, time.Duration(10)*time.Millisecond,
receivedRCB, sendResults, NewHistoricalRoundsComm()) receivedRCB, sendResults, api.NewHistoricalRoundsComm())
if err != nil { if err != nil {
t.Errorf("Error in happy path: %v", err) t.Errorf("Error in happy path: %v", err)
} }
...@@ -217,21 +218,21 @@ func TestClient_GetRoundResults_Timeout(t *testing.T) { ...@@ -217,21 +218,21 @@ func TestClient_GetRoundResults_Timeout(t *testing.T) {
sendResults = nil sendResults = nil
// Create a new copy of the test client for this test // Create a new copy of the test client for this test
client, err := newTestingClient(t) client, err := api.newTestingClient(t)
if err != nil { if err != nil {
t.Fatalf("Failed in setup: %v", err) t.Fatalf("Failed in setup: %v", err)
} }
// Construct the round call back function signature // Construct the round call back function signature
var successfulRounds, timeout bool var successfulRounds, timeout bool
receivedRCB := func(allRoundsSucceeded, timedOut bool, rounds map[id.Round]RoundResult) { receivedRCB := func(allRoundsSucceeded, timedOut bool, rounds map[id.Round]RoundLookupStatus) {
successfulRounds = allRoundsSucceeded successfulRounds = allRoundsSucceeded
timeout = timedOut timeout = timedOut
} }
// Call the round results // Call the round results
err = client.getRoundResults(roundList, time.Duration(10)*time.Millisecond, err = client.getRoundResults(roundList, time.Duration(10)*time.Millisecond,
receivedRCB, sendResults, NewNoHistoricalRoundsComm()) receivedRCB, sendResults, api.NewNoHistoricalRoundsComm())
if err != nil { if err != nil {
t.Errorf("Error in happy path: %v", err) t.Errorf("Error in happy path: %v", err)
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment