diff --git a/api/client.go b/api/client.go index 8f316a986d98f1e50180197f1464752f3a60e011..ca5820965983a6afab51e4e210991252d60f067f 100644 --- a/api/client.go +++ b/api/client.go @@ -213,7 +213,7 @@ func Login(storageDir string, password []byte, parameters params.Network) (*Clie //Attach the services interface c.services = newServiceProcessiesList(c.runner) - //initilize comms + // initialize comms err = c.initComms() if err != nil { return nil, err @@ -241,13 +241,7 @@ func Login(storageDir string, password []byte, parameters params.Network) (*Clie return nil, err } - //update gateway connections - err = c.network.GetInstance().UpdateGatewayConnections() - if err != nil { - return nil, err - } - - //initilize the auth tracker + // initialize the auth tracker c.auth = auth.NewManager(c.switchboard, c.storage, c.network) return c, nil @@ -304,13 +298,7 @@ func LoginWithNewBaseNDF_UNSAFE(storageDir string, password []byte, return nil, err } - //update gateway connections - err = c.network.GetInstance().UpdateGatewayConnections() - if err != nil { - return nil, err - } - - //initilize the auth tracker + // initialize the auth tracker c.auth = auth.NewManager(c.switchboard, c.storage, c.network) return c, nil @@ -533,13 +521,13 @@ func (c *Client) GetNodeRegistrationStatus() (int, int, error) { cmixStore := c.storage.Cmix() var numRegistered int - for i, n := range nodes{ + for i, n := range nodes { nid, err := id.Unmarshal(n.ID) - if err!=nil{ - return 0,0, errors.Errorf("Failed to unmarshal node ID %v " + + if err != nil { + return 0, 0, errors.Errorf("Failed to unmarshal node ID %v "+ "(#%d): %s", n.ID, i, err.Error()) } - if cmixStore.Has(nid){ + if cmixStore.Has(nid) { numRegistered++ } } diff --git a/api/results.go b/api/results.go index 675ba01c14edb9f80a5e4dda9461a9ee03ea282c..919ae478a1797e5cff8175efbe0025f15d8ec479 100644 --- a/api/results.go +++ b/api/results.go @@ -11,7 +11,6 @@ import ( "time" jww "github.com/spf13/jwalterweatherman" - "gitlab.com/elixxir/client/network/gateway" pb "gitlab.com/elixxir/comms/mixmessages" "gitlab.com/elixxir/comms/network" ds "gitlab.com/elixxir/comms/network/dataStructures" @@ -177,24 +176,21 @@ func (c *Client) getRoundResults(roundList []id.Round, timeout time.Duration, // Helper function which asynchronously pings a random gateway until // it gets information on it's requested historical rounds func (c *Client) getHistoricalRounds(msg *pb.HistoricalRounds, - instance *network.Instance, sendResults chan ds.EventReturn, - comms historicalRoundsComm) { + instance *network.Instance, sendResults chan ds.EventReturn, comms historicalRoundsComm) { var resp *pb.HistoricalRoundsResponse for { // Find a gateway to request about the roundRequests - gwHost, err := gateway.Get(instance.GetPartialNdf().Get(), c.comms, c.rng.GetStream()) - if err != nil { - jww.FATAL.Panicf("Failed to track network, NDF has corrupt "+ - "data: %s", err) - } + result, err := c.GetNetworkInterface().GetSender().SendToAny(1, func(host *connect.Host) (interface{}, error) { + return comms.RequestHistoricalRounds(host, msg) + }) // If an error, retry with (potentially) a different gw host. 
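The new retry loop above delegates gateway selection to Sender.SendToAny and then has to unwrap the interface{} result. A minimal sketch of that pattern with a defensive comma-ok assertion (the helper name is illustrative and not part of the patch; it assumes the imports already available in the api package):

```go
// Illustrative helper, not part of the patch: wraps the SendToAny pattern used
// in getHistoricalRounds and unwraps the interface{} result without risking a
// panic on an unexpected type.
func requestHistorical(sender *gateway.Sender, comms historicalRoundsComm,
	msg *pb.HistoricalRounds) (*pb.HistoricalRoundsResponse, error) {

	result, err := sender.SendToAny(1, func(host *connect.Host) (interface{}, error) {
		return comms.RequestHistoricalRounds(host, msg)
	})
	if err != nil {
		return nil, err
	}

	resp, ok := result.(*pb.HistoricalRoundsResponse)
	if !ok || resp == nil {
		return nil, errors.Errorf("unexpected response type %T from gateway", result)
	}
	return resp, nil
}
```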
// If no error from received gateway request, exit loop // and process rounds - resp, err = comms.RequestHistoricalRounds(gwHost, msg) if err == nil { + resp = result.(*pb.HistoricalRoundsResponse) break } } diff --git a/interfaces/networkManager.go b/interfaces/networkManager.go index da548c855e48a30a5fff0895842f04975e00c88b..1daa7d38104731870459b1323c8d918401a398fd 100644 --- a/interfaces/networkManager.go +++ b/interfaces/networkManager.go @@ -10,6 +10,7 @@ package interfaces import ( "gitlab.com/elixxir/client/interfaces/message" "gitlab.com/elixxir/client/interfaces/params" + "gitlab.com/elixxir/client/network/gateway" "gitlab.com/elixxir/client/stoppable" "gitlab.com/elixxir/comms/network" "gitlab.com/elixxir/crypto/e2e" @@ -24,6 +25,7 @@ type NetworkManager interface { SendCMIX(message format.Message, recipient *id.ID, p params.CMIX) (id.Round, ephemeral.Id, error) GetInstance() *network.Instance GetHealthTracker() HealthTracker + GetSender() *gateway.Sender Follow(report ClientErrorReport) (stoppable.Stoppable, error) CheckGarbledMessages() InProgressRegistrations() int diff --git a/network/follow.go b/network/follow.go index 3c05519620d7dce05140d7958f779ab8940f6ade..ba0302891fb8d145ab6d0cf8a50d88ed92d65e98 100644 --- a/network/follow.go +++ b/network/follow.go @@ -30,7 +30,6 @@ import ( jww "github.com/spf13/jwalterweatherman" "gitlab.com/elixxir/client/interfaces" - "gitlab.com/elixxir/client/network/gateway" "gitlab.com/elixxir/client/network/rounds" pb "gitlab.com/elixxir/comms/mixmessages" "gitlab.com/elixxir/primitives/knownRounds" @@ -83,14 +82,6 @@ func (m *manager) follow(report interfaces.ClientErrorReport, rng csprng.Source, m.tracker.Track(identity.EphId, identity.Source) - //randomly select a gateway to poll - //TODO: make this more intelligent - gwHost, err := gateway.Get(m.Instance.GetPartialNdf().Get(), comms, rng) - if err != nil { - jww.FATAL.Panicf("Failed to follow network, NDF has corrupt "+ - "data: %s", err) - } - // Get client version for poll version := m.Session.GetClientVersion() @@ -105,20 +96,26 @@ func (m *manager) follow(report interfaces.ClientErrorReport, rng csprng.Source, EndTimestamp: identity.EndRequest.UnixNano(), ClientVersion: []byte(version.String()), } - jww.TRACE.Printf("Executing poll for %v(%s) range: %s-%s(%s) from %s", - identity.EphId.Int64(), identity.Source, identity.StartRequest, - identity.EndRequest, identity.EndRequest.Sub(identity.StartRequest), gwHost.GetId()) - pollResp, err := comms.SendPoll(gwHost, &pollReq) + result, err := m.GetSender().SendToAny(1, func(host *connect.Host) (interface{}, error) { + jww.TRACE.Printf("Executing poll for %v(%s) range: %s-%s(%s) from %s", + identity.EphId.Int64(), identity.Source, identity.StartRequest, + identity.EndRequest, identity.EndRequest.Sub(identity.StartRequest), host.GetId()) + result, err := comms.SendPoll(host, &pollReq) + if err != nil { + report( + "NetworkFollower", + fmt.Sprintf("Failed to poll network, \"%s\", Gateway: %s", err.Error(), host.String()), + fmt.Sprintf("%+v", err), + ) + jww.ERROR.Printf("Unable to poll %s for NDF: %+v", host, err) + } + return result, err + }) if err != nil { - report( - "NetworkFollower", - fmt.Sprintf("Failed to poll network, \"%s\", Gateway: %s", err.Error(), gwHost.String()), - fmt.Sprintf("%+v", err), - ) - jww.ERROR.Printf("Unable to poll %s for NDF: %+v", gwHost, err) return } + pollResp := result.(*pb.GatewayPollResponse) // ---- Process Network State Update Data ---- gwRoundsState := &knownRounds.KnownRounds{} @@ -138,11 +135,8 @@ 
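Adding GetSender() to interfaces.NetworkManager is a breaking change for every implementation and test double of that interface. A hypothetical minimal stub (not part of the patch) that keeps an existing mock compiling might look like:

```go
// Hypothetical test double sketch: embeds the interface so only the new method
// needs a real body; calling any other method on a zero value will panic.
type stubNetworkManager struct {
	interfaces.NetworkManager
	sender *gateway.Sender
}

// GetSender satisfies the method added to NetworkManager in this change.
func (s *stubNetworkManager) GetSender() *gateway.Sender {
	return s.sender
}
```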
func (m *manager) follow(report interfaces.ClientErrorReport, rng csprng.Source, return } - err = m.Instance.UpdateGatewayConnections() - if err != nil { - jww.ERROR.Printf("Unable to update gateway connections: %+v", err) - return - } + // update gateway connections + m.GetSender().UpdateNdf(m.GetInstance().GetPartialNdf().Get()) } //check that the stored address space is correct @@ -172,6 +166,7 @@ func (m *manager) follow(report interfaces.ClientErrorReport, rng csprng.Source, if bytes.Equal(clientErr.ClientId, m.Session.GetUser().TransmissionID.Marshal()) { // Obtain relevant NodeGateway information + // TODO ??? nGw, err := m.Instance.GetNodeAndGateway(gwHost.GetId()) if err != nil { jww.ERROR.Printf("Unable to get NodeGateway: %+v", err) diff --git a/network/gateway/hostPool.go b/network/gateway/hostPool.go index e8ab9f17102eaca390a2e013fe31f7495cdc8e54..b986ec06eb4fa5bc7d3734b570926da57509b6da 100644 --- a/network/gateway/hostPool.go +++ b/network/gateway/hostPool.go @@ -17,9 +17,7 @@ import ( jww "github.com/spf13/jwalterweatherman" "gitlab.com/elixxir/client/stoppable" "gitlab.com/elixxir/client/storage" - "gitlab.com/elixxir/comms/mixmessages" "gitlab.com/elixxir/comms/network" - "gitlab.com/elixxir/crypto/shuffle" "gitlab.com/xx_network/comms/connect" "gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/ndf" @@ -56,29 +54,29 @@ type HostPool struct { // Allows configuration of HostPool parameters type PoolParams struct { - poolSize uint32 // Quantity of Hosts in the HostPool - errThreshold uint64 // How many errors will cause a Host to be ejected from the HostPool - pruneInterval time.Duration // How frequently the HostPool updates the pool - hostParams connect.HostParams // Parameters for the creation of new Host objects + PoolSize uint32 // Quantity of Hosts in the HostPool + ErrThreshold uint64 // How many errors will cause a Host to be ejected from the HostPool + PruneInterval time.Duration // How frequently the HostPool updates the pool + HostParams connect.HostParams // Parameters for the creation of new Host objects } // Returns a default set of PoolParams func DefaultPoolParams() PoolParams { return PoolParams{ - poolSize: 30, - errThreshold: 1, - pruneInterval: 10 * time.Second, - hostParams: connect.GetDefaultHostParams(), + PoolSize: 30, + ErrThreshold: 1, + PruneInterval: 10 * time.Second, + HostParams: connect.GetDefaultHostParams(), } } // Build and return new HostPool object -func NewHostPool(poolParams PoolParams, rng io.Reader, ndf *ndf.NetworkDefinition, getter HostManager, +func newHostPool(poolParams PoolParams, rng io.Reader, ndf *ndf.NetworkDefinition, getter HostManager, storage *storage.Session, addGateway chan network.NodeGateway) (*HostPool, error) { result := &HostPool{ manager: getter, hostMap: make(map[id.ID]uint32), - hostList: make([]*connect.Host, poolParams.poolSize), + hostList: make([]*connect.Host, poolParams.PoolSize), poolParams: poolParams, ndf: ndf, rng: rng, @@ -92,12 +90,11 @@ func NewHostPool(poolParams PoolParams, rng io.Reader, ndf *ndf.NetworkDefinitio return nil, err } - // Convert the NDF into a map object for future comparison - result.ndfMap, err = convertNdfToMap(ndf) - if err != nil { - return nil, err - } - return result, nil + // Convert the NDF into an empty map object in order to allow updateConns + result.ndfMap, _ = convertNdfToMap(nil) + + // Propagate the NDF and return + return result, result.updateConns() } // Mutates internal ndf to the given ndf @@ -111,13 +108,13 @@ func (h *HostPool) 
UpdateNdf(ndf *ndf.NetworkDefinition) { // Obtain a random, unique list of Hosts of the given length from the HostPool func (h *HostPool) GetAny(length int) []*connect.Host { checked := make(map[uint32]interface{}) // Keep track of Hosts already selected to avoid duplicates - if length > int(h.poolParams.poolSize) { - length = int(h.poolParams.poolSize) + if length > int(h.poolParams.PoolSize) { + length = int(h.poolParams.PoolSize) } result := make([]*connect.Host, length) h.hostMux.RLock() for i := 0; i < length; { - gwIdx := readRangeUint32(0, h.poolParams.poolSize, h.rng) + gwIdx := readRangeUint32(0, h.poolParams.PoolSize, h.rng) if _, ok := checked[gwIdx]; !ok { result[i] = h.hostList[gwIdx] checked[gwIdx] = nil @@ -139,8 +136,8 @@ func (h *HostPool) GetSpecific(target *id.ID) (*connect.Host, bool) { func (h *HostPool) GetPreferred(targets []*id.ID) []*connect.Host { checked := make(map[uint32]interface{}) // Keep track of Hosts already selected to avoid duplicates length := len(targets) - if length > int(h.poolParams.poolSize) { - length = int(h.poolParams.poolSize) + if length > int(h.poolParams.PoolSize) { + length = int(h.poolParams.PoolSize) } result := make([]*connect.Host, length) @@ -152,7 +149,7 @@ func (h *HostPool) GetPreferred(targets []*id.ID) []*connect.Host { continue } - gwIdx := readRangeUint32(0, h.poolParams.poolSize, h.rng) + gwIdx := readRangeUint32(0, h.poolParams.PoolSize, h.rng) if _, ok := checked[gwIdx]; !ok { result[i] = h.hostList[gwIdx] checked[gwIdx] = nil @@ -174,7 +171,7 @@ func (h *HostPool) StartHostPool() stoppable.Stoppable { // Long-running thread that manages the HostPool on a timer func (h *HostPool) manageHostPool(stopper *stoppable.Single) { - tick := time.Tick(h.poolParams.pruneInterval) + tick := time.Tick(h.poolParams.PruneInterval) for { select { case <-stopper.Quit(): @@ -205,14 +202,14 @@ func (h *HostPool) manageHostPool(stopper *stoppable.Single) { func (h *HostPool) pruneHostPool() error { // Verify the NDF has at least as many Gateways as needed for the HostPool ndfLen := uint32(len(h.ndf.Gateways)) - if ndfLen == 0 || ndfLen < h.poolParams.poolSize { + if ndfLen == 0 || ndfLen < h.poolParams.PoolSize { return errors.Errorf("no gateways available") } - for poolIdx := uint32(0); poolIdx < h.poolParams.poolSize; { + for poolIdx := uint32(0); poolIdx < h.poolParams.PoolSize; { host := h.hostList[poolIdx] // Check the Host for errors - if host == nil || host.GetMetrics().GetErrorCounter() >= h.poolParams.errThreshold { + if host == nil || host.GetMetrics().GetErrorCounter() >= h.poolParams.ErrThreshold { // If errors occurred, randomly select a new Gw by index in the NDF ndfIdx := readRangeUint32(0, uint32(len(h.ndf.Gateways)), h.rng) @@ -277,7 +274,7 @@ func (h *HostPool) ForceAdd(gwIds []*id.ID) error { } // Randomly select another Gateway in the HostPool for replacement - poolIdx := readRangeUint32(0, h.poolParams.poolSize, h.rng) + poolIdx := readRangeUint32(0, h.poolParams.PoolSize, h.rng) if _, ok := checked[poolIdx]; !ok { err := h.replaceHost(gwIds[i], poolIdx) if err != nil { @@ -322,6 +319,10 @@ func (h *HostPool) updateConns() error { // Takes ndf.Gateways and puts their IDs into a map object func convertNdfToMap(ndf *ndf.NetworkDefinition) (map[id.ID]int, error) { result := make(map[id.ID]int) + if ndf == nil { + return result, nil + } + // Process gateway Id's into set for i, gw := range ndf.Gateways { gwId, err := id.Unmarshal(gw.ID) @@ -359,7 +360,7 @@ func (h *HostPool) addGateway(gwId *id.ID, ndfIndex int) { } // Add the 
new gateway host - _, err := h.manager.AddHost(gwId, gw.Address, []byte(gw.TlsCertificate), h.poolParams.hostParams) + _, err := h.manager.AddHost(gwId, gw.Address, []byte(gw.TlsCertificate), h.poolParams.HostParams) if err != nil { jww.ERROR.Printf("Could not add gateway host %s: %+v", gwId, err) } @@ -383,65 +384,6 @@ func (h *HostPool) addGateway(gwId *id.ID, ndfIndex int) { } } -// Get the Host of a random gateway in the NDF -func Get(ndf *ndf.NetworkDefinition, hg HostManager, rng io.Reader) (*connect.Host, error) { - gwLen := uint32(len(ndf.Gateways)) - if gwLen == 0 { - return nil, errors.Errorf("no gateways available") - } - - gwIdx := readRangeUint32(0, gwLen, rng) - gwID, err := id.Unmarshal(ndf.Nodes[gwIdx].ID) - if err != nil { - return nil, errors.WithMessage(err, "failed to get Gateway") - } - gwID.SetType(id.Gateway) - gwHost, ok := hg.GetHost(gwID) - if !ok { - return nil, errors.Errorf("host for gateway %s could not be "+ - "retrieved", gwID) - } - return gwHost, nil -} - -// GetAllShuffled returns a shuffled list of gateway hosts from the specified round -func GetAllShuffled(hg HostManager, ri *mixmessages.RoundInfo) ([]*connect.Host, error) { - roundTop := ri.GetTopology() - hosts := make([]*connect.Host, 0) - shuffledList := make([]uint64, 0) - - // Collect all host information from the round - for index := range roundTop { - selectedId, err := id.Unmarshal(roundTop[index]) - if err != nil { - return nil, err - } - - selectedId.SetType(id.Gateway) - - gwHost, ok := hg.GetHost(selectedId) - if !ok { - return nil, errors.Errorf("Could not find host for gateway %s", selectedId) - } - hosts = append(hosts, gwHost) - shuffledList = append(shuffledList, uint64(index)) - } - - returnHosts := make([]*connect.Host, len(hosts)) - - // Shuffle a list corresponding to the valid gateway hosts - shuffle.Shuffle(&shuffledList) - - // Index through the shuffled list, building a list - // of shuffled gateways from the round - for index, shuffledIndex := range shuffledList { - returnHosts[index] = hosts[shuffledIndex] - } - - return returnHosts, nil - -} - // readUint32 reads an integer from an io.Reader (which should be a CSPRNG) func readUint32(rng io.Reader) uint32 { var rndBytes [4]byte diff --git a/network/gateway/hostpool_test.go b/network/gateway/hostpool_test.go index 83be2608fa2226140e2dbc1bb5453d1a7608c42d..06a404e66f54a27fc42c1b745d2030b967b719e4 100644 --- a/network/gateway/hostpool_test.go +++ b/network/gateway/hostpool_test.go @@ -28,7 +28,7 @@ func TestNewHostPool(t *testing.T) { testStorage := storage.InitTestingSession(t) addGwChan := make(chan network.NodeGateway) params := DefaultPoolParams() - params.poolSize = uint32(len(testNdf.Gateways)) + params.PoolSize = uint32(len(testNdf.Gateways)) // Pull all gateways from ndf into host manager for _, gw := range testNdf.Gateways { @@ -47,7 +47,7 @@ func TestNewHostPool(t *testing.T) { } // Call the constructor - _, err := NewHostPool(params, rng, testNdf, manager, + _, err := newHostPool(params, rng, testNdf, manager, testStorage, addGwChan) if err != nil { t.Errorf("Failed to create mock host pool: %v", err) @@ -64,8 +64,8 @@ func TestHostPool_ManageHostPool(t *testing.T) { // Construct custom params params := DefaultPoolParams() - params.poolSize = uint32(len(testNdf.Gateways)) - params.pruneInterval = 1 * time.Second + params.PoolSize = uint32(len(testNdf.Gateways)) + params.PruneInterval = 1 * time.Second // Pull all gateways from ndf into host manager for _, gw := range testNdf.Gateways { @@ -84,7 +84,7 @@ func 
TestHostPool_ManageHostPool(t *testing.T) { } // Call the constructor - testPool, err := NewHostPool(params, rng, testNdf, manager, + testPool, err := newHostPool(params, rng, testNdf, manager, testStorage, addGwChan) if err != nil { t.Errorf("Failed to create mock host pool: %v", err) @@ -263,7 +263,7 @@ func TestHostPool_PruneHostPool(t *testing.T) { testNdf := getTestNdf(t) newIndex := uint32(20) params := DefaultPoolParams() - params.poolSize = uint32(len(testNdf.Gateways)) + params.PoolSize = uint32(len(testNdf.Gateways)) rng := csprng.NewSystemRNG() // Construct a manager (bypass business logic in constructor) @@ -297,7 +297,7 @@ func TestHostPool_PruneHostPool(t *testing.T) { // Construct a host past the error threshold errorThresholdIndex := 0 - overThreshold := params.errThreshold + 25 + overThreshold := params.ErrThreshold + 25 hostList[errorThresholdIndex].SetMetricsTesting(connect.NewMetricTesting(overThreshold, t), t) oldHost := hostList[0] @@ -334,7 +334,7 @@ func TestHostPool_PruneHostPool_Error(t *testing.T) { params := DefaultPoolParams() // Trigger the case where the Ndf doesn't have enough gateways - params.poolSize = uint32(len(testNdf.Gateways)) + 1 + params.PoolSize = uint32(len(testNdf.Gateways)) + 1 rng := csprng.NewSystemRNG() // Construct a manager (bypass business logic in constructor) @@ -399,7 +399,7 @@ func TestHostPool_GetPreferred(t *testing.T) { testStorage := storage.InitTestingSession(t) addGwChan := make(chan network.NodeGateway) params := DefaultPoolParams() - params.poolSize = uint32(len(testNdf.Gateways)) + params.PoolSize = uint32(len(testNdf.Gateways)) // Pull all gateways from ndf into host manager hostMap := make(map[id.ID]bool, 0) @@ -423,7 +423,7 @@ func TestHostPool_GetPreferred(t *testing.T) { } // Call the constructor - testPool, err := NewHostPool(params, rng, testNdf, manager, + testPool, err := newHostPool(params, rng, testNdf, manager, testStorage, addGwChan) if err != nil { t.Errorf("Failed to create mock host pool: %v", err) @@ -471,7 +471,7 @@ func TestHostPool_GetAny(t *testing.T) { testStorage := storage.InitTestingSession(t) addGwChan := make(chan network.NodeGateway) params := DefaultPoolParams() - params.poolSize = uint32(len(testNdf.Gateways)) + params.PoolSize = uint32(len(testNdf.Gateways)) // Pull all gateways from ndf into host manager for _, gw := range testNdf.Gateways { @@ -490,7 +490,7 @@ func TestHostPool_GetAny(t *testing.T) { } // Call the constructor - testPool, err := NewHostPool(params, rng, testNdf, manager, + testPool, err := newHostPool(params, rng, testNdf, manager, testStorage, addGwChan) if err != nil { t.Errorf("Failed to create mock host pool: %v", err) @@ -528,7 +528,7 @@ func TestHostPool_ForceAdd(t *testing.T) { testStorage := storage.InitTestingSession(t) addGwChan := make(chan network.NodeGateway) params := DefaultPoolParams() - params.poolSize = uint32(len(testNdf.Gateways)) + params.PoolSize = uint32(len(testNdf.Gateways)) // Pull all gateways from ndf into host manager for _, gw := range testNdf.Gateways { @@ -547,7 +547,7 @@ func TestHostPool_ForceAdd(t *testing.T) { } // Call the constructor - testPool, err := NewHostPool(params, rng, testNdf, manager, + testPool, err := newHostPool(params, rng, testNdf, manager, testStorage, addGwChan) if err != nil { t.Errorf("Failed to create mock host pool: %v", err) @@ -589,7 +589,7 @@ func TestHostPool_UpdateConns_AddGateways(t *testing.T) { testStorage := storage.InitTestingSession(t) addGwChan := make(chan network.NodeGateway) params := DefaultPoolParams() 
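With the PoolParams fields now exported, callers outside network/gateway can tune the pool before building a Sender. A sketch against the patch's constructor signature (the override values are illustrative; in the patch NewManager only overrides HostParams.MaxRetries):

```go
// Illustrative construction of a Sender with tuned pool parameters.
// Assumes imports: io, time, gateway, storage (elixxir/client/storage),
// network (elixxir/comms/network) and ndf (xx_network/primitives/ndf).
func newTunedSender(rng io.Reader, def *ndf.NetworkDefinition, hosts gateway.HostManager,
	session *storage.Session, addGw chan network.NodeGateway) (*gateway.Sender, error) {

	p := gateway.DefaultPoolParams()
	p.PoolSize = 20                    // illustrative override of the default 30
	p.PruneInterval = 30 * time.Second // illustrative override of the default 10s
	p.HostParams.MaxRetries = 3        // mirrors the value NewManager sets

	return gateway.NewSender(p, rng, def, hosts, session, addGw)
}
```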
- params.poolSize = uint32(len(testNdf.Gateways)) + params.PoolSize = uint32(len(testNdf.Gateways)) // Pull all gateways from ndf into host manager for _, gw := range testNdf.Gateways { @@ -608,7 +608,7 @@ func TestHostPool_UpdateConns_AddGateways(t *testing.T) { } // Call the constructor - testPool, err := NewHostPool(params, rng, testNdf, manager, + testPool, err := newHostPool(params, rng, testNdf, manager, testStorage, addGwChan) if err != nil { t.Errorf("Failed to create mock host pool: %v", err) @@ -664,7 +664,7 @@ func TestHostPool_UpdateConns_RemoveGateways(t *testing.T) { testStorage := storage.InitTestingSession(t) addGwChan := make(chan network.NodeGateway) params := DefaultPoolParams() - params.poolSize = uint32(len(testNdf.Gateways)) + params.PoolSize = uint32(len(testNdf.Gateways)) // Pull all gateways from ndf into host manager for _, gw := range testNdf.Gateways { @@ -683,7 +683,7 @@ func TestHostPool_UpdateConns_RemoveGateways(t *testing.T) { } // Call the constructor - testPool, err := NewHostPool(params, rng, testNdf, manager, + testPool, err := newHostPool(params, rng, testNdf, manager, testStorage, addGwChan) if err != nil { t.Errorf("Failed to create mock host pool: %v", err) @@ -735,7 +735,7 @@ func TestHostPool_AddGateway(t *testing.T) { testNdf := getTestNdf(t) newIndex := uint32(20) params := DefaultPoolParams() - params.poolSize = uint32(len(testNdf.Gateways)) + params.PoolSize = uint32(len(testNdf.Gateways)) // Construct a manager (bypass business logic in constructor) hostPool := &HostPool{ @@ -768,7 +768,7 @@ func TestHostPool_RemoveGateway(t *testing.T) { testNdf := getTestNdf(t) newIndex := uint32(20) params := DefaultPoolParams() - params.poolSize = uint32(len(testNdf.Gateways)) + params.PoolSize = uint32(len(testNdf.Gateways)) // Construct a manager (bypass business logic in constructor) hostPool := &HostPool{ @@ -788,7 +788,7 @@ func TestHostPool_RemoveGateway(t *testing.T) { } // Add the new gateway host - h, err := hostPool.manager.AddHost(gwId, "", nil, params.hostParams) + h, err := hostPool.manager.AddHost(gwId, "", nil, params.HostParams) if err != nil { jww.ERROR.Printf("Could not add gateway host %s: %+v", gwId, err) } diff --git a/network/gateway/send.go b/network/gateway/sender.go similarity index 52% rename from network/gateway/send.go rename to network/gateway/sender.go index 9a6511c97f44600c90710c7e3b4033aa694517f0..2ae6020fee6be515d612866a949e5a00887594c5 100644 --- a/network/gateway/send.go +++ b/network/gateway/sender.go @@ -10,25 +10,42 @@ package gateway import ( "github.com/pkg/errors" + "gitlab.com/elixxir/client/storage" + "gitlab.com/elixxir/comms/network" "gitlab.com/xx_network/comms/connect" "gitlab.com/xx_network/primitives/id" + "gitlab.com/xx_network/primitives/ndf" + "io" ) // Object used for sending that wraps the HostPool for providing destinations -type Mesh struct { - HostPool +type Sender struct { + *HostPool +} + +// Create a new Sender object wrapping a HostPool object +func NewSender(poolParams PoolParams, rng io.Reader, ndf *ndf.NetworkDefinition, getter HostManager, + storage *storage.Session, addGateway chan network.NodeGateway) (*Sender, error) { + + hostPool, err := newHostPool(poolParams, rng, ndf, getter, storage, addGateway) + if err != nil { + return nil, err + } + return &Sender{hostPool}, nil } // Call given sendFunc to a specific Host in the HostPool, // attempting with up to numProxies destinations in case of failure -func (m *Mesh) SendToSpecific(target *id.ID, numProxies int, +func (m *Sender) 
SendToSpecific(targets []*id.ID, numProxies int, sendFunc func(host *connect.Host) (interface{}, error)) (interface{}, error) { - host, ok := m.GetSpecific(target) + for _, target := range targets { + host, ok := m.GetSpecific(target) - if ok { - result, err := sendFunc(host) - if err == nil { - return result, m.ForceAdd([]*id.ID{host.GetId()}) + if ok { + result, err := sendFunc(host) + if err == nil { + return result, m.ForceAdd([]*id.ID{host.GetId()}) + } } } @@ -36,7 +53,7 @@ func (m *Mesh) SendToSpecific(target *id.ID, numProxies int, } // Call given sendFunc to any Host in the HostPool, attempting with up to numProxies destinations -func (m *Mesh) SendToAny(numProxies int, +func (m *Sender) SendToAny(numProxies int, sendFunc func(host *connect.Host) (interface{}, error)) (interface{}, error) { proxies := m.GetAny(numProxies) @@ -49,3 +66,18 @@ func (m *Mesh) SendToAny(numProxies int, return nil, errors.Errorf("Unable to send to any proxies") } + +// Call given sendFunc to any Host in the HostPool, attempting with up to numProxies destinations +func (m *Sender) SendToPreferred(targets []*id.ID, + sendFunc func(host *connect.Host) (interface{}, error)) (interface{}, error) { + + targetHosts := m.GetPreferred(targets) + for _, host := range targetHosts { + result, err := sendFunc(host) + if err == nil { + return result, nil + } + } + + return nil, errors.Errorf("Unable to send to any preferred") +} diff --git a/network/manager.go b/network/manager.go index 4693b0a2cfeeb851d53ee39942e64296b09037ea..aac414669337c7e51a88726ce6281b2bbd32749c 100644 --- a/network/manager.go +++ b/network/manager.go @@ -11,6 +11,7 @@ package network // and intraclient state are accessible through the context object. import ( + "gitlab.com/elixxir/client/network/gateway" "sync/atomic" "time" @@ -39,6 +40,8 @@ import ( type manager struct { // parameters of the network param params.Network + // + sender *gateway.Sender //Shared data with all sub managers internal.Internal @@ -91,13 +94,15 @@ func NewManager(session *storage.Session, switchboard *switchboard.Switchboard, ReceptionID: session.User().GetCryptographicIdentity().GetReceptionID(), } - // register the node registration channel early so login connection updates - // get triggered for registration if necessary - instance.SetAddGatewayChan(m.NodeRegistration) + // Set up gateway.Sender + poolParams := gateway.DefaultPoolParams() + poolParams.HostParams.MaxRetries = 3 + m.sender, err = gateway.NewSender(poolParams, rng.GetStream(), + ndf, comms, session, m.NodeRegistration) //create sub managers - m.message = message.NewManager(m.Internal, m.param.Messages, m.NodeRegistration) - m.round = rounds.NewManager(m.Internal, m.param.Rounds, m.message.GetMessageReceptionChannel()) + m.message = message.NewManager(m.Internal, m.param.Messages, m.NodeRegistration, m.sender) + m.round = rounds.NewManager(m.Internal, m.param.Rounds, m.message.GetMessageReceptionChannel(), m.sender) return &m, nil } @@ -166,6 +171,11 @@ func (m *manager) GetInstance() *network.Instance { return m.Instance } +// GetSender returns the gateway.Sender object +func (m *manager) GetSender() *gateway.Sender { + return m.sender +} + // triggers a check on garbled messages to see if they can be decrypted // this should be done when a new e2e client is added in case messages were // received early or arrived out of order diff --git a/network/message/manager.go b/network/message/manager.go index 807ccc07066cf2281e557b6e07d09dd4180d3c15..7728910aa7e62186c682dcb97b297cb470dcac58 100644 --- 
a/network/message/manager.go +++ b/network/message/manager.go @@ -10,6 +10,7 @@ package message import ( "fmt" "gitlab.com/elixxir/client/interfaces/params" + "gitlab.com/elixxir/client/network/gateway" "gitlab.com/elixxir/client/network/internal" "gitlab.com/elixxir/client/network/message/parse" "gitlab.com/elixxir/client/stoppable" @@ -21,6 +22,7 @@ type Manager struct { param params.Messages partitioner parse.Partitioner internal.Internal + sender *gateway.Sender messageReception chan Bundle nodeRegistration chan network.NodeGateway @@ -29,7 +31,7 @@ type Manager struct { } func NewManager(internal internal.Internal, param params.Messages, - nodeRegistration chan network.NodeGateway) *Manager { + nodeRegistration chan network.NodeGateway, sender *gateway.Sender) *Manager { dummyMessage := format.NewMessage(internal.Session.Cmix().GetGroup().GetP().ByteLen()) m := Manager{ param: param, @@ -38,6 +40,7 @@ func NewManager(internal internal.Internal, param params.Messages, networkIsHealthy: make(chan bool, 1), triggerGarbled: make(chan struct{}, 100), nodeRegistration: nodeRegistration, + sender: sender, } m.Internal = internal return &m diff --git a/network/rounds/historical.go b/network/rounds/historical.go index 1a0f5c100697044ebdbdaf7975b5019e87be0356..1714296cfa20e9b6a4b6cb89eeffb4803d237e1d 100644 --- a/network/rounds/historical.go +++ b/network/rounds/historical.go @@ -9,7 +9,6 @@ package rounds import ( jww "github.com/spf13/jwalterweatherman" - "gitlab.com/elixxir/client/network/gateway" "gitlab.com/elixxir/client/storage/reception" pb "gitlab.com/elixxir/comms/mixmessages" "gitlab.com/xx_network/comms/connect" @@ -87,13 +86,6 @@ func (m *Manager) processHistoricalRounds(comm historicalRoundsComms, quitCh <-c continue } - //find a gateway to request about the roundRequests - gwHost, err := gateway.Get(m.Instance.GetPartialNdf().Get(), comm, rng) - if err != nil { - jww.FATAL.Panicf("Failed to track network, NDF has corrupt "+ - "data: %s", err) - } - rounds := make([]uint64, len(roundRequests)) for i, rr := range roundRequests { rounds[i] = uint64(rr.rid) @@ -104,18 +96,21 @@ func (m *Manager) processHistoricalRounds(comm historicalRoundsComms, quitCh <-c Rounds: rounds, } - jww.DEBUG.Printf("Requesting Historical rounds %v from "+ - "gateway %s", rounds, gwHost.GetId()) + result, err := m.sender.SendToAny(1, func(host *connect.Host) (interface{}, error) { + jww.DEBUG.Printf("Requesting Historical rounds %v from "+ + "gateway %s", rounds, host.GetId()) + return comm.RequestHistoricalRounds(host, hr) + }) - response, err := comm.RequestHistoricalRounds(gwHost, hr) if err != nil { jww.ERROR.Printf("Failed to request historical roundRequests "+ - "data for rounds %v: %s", rounds, response) + "data for rounds %v: %s", rounds, err) // if the check fails to resolve, break the loop and so they will be // checked again timerCh = time.NewTimer(m.params.HistoricalRoundsPeriod).C continue } + response := result.(*pb.HistoricalRoundsResponse) // process the returned historical roundRequests. 
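sender.go now offers three primitives that replace the removed gateway.Get and GetAllShuffled helpers. A rough usage sketch follows; doRequest, sender, and gwIDs are placeholders rather than names from the patch, and note that SendToSpecific accepts numProxies but does not yet use it for proxy fallback in this change:

```go
// Sketch contrasting the three Sender primitives.
func sendSketch(sender *gateway.Sender, gwIDs []*id.ID,
	doRequest func(*connect.Host) (interface{}, error)) error {

	// Any pooled gateway will do (network polling, historical round lookups).
	if _, err := sender.SendToAny(1, doRequest); err != nil {
		return err
	}

	// Try the listed gateways directly; numProxies (5) is accepted by the
	// signature but proxy fallback is not wired up in this change.
	if _, err := sender.SendToSpecific(gwIDs, 5, doRequest); err != nil {
		return err
	}

	// Prefer the listed gateways, returning the first successful response.
	_, err := sender.SendToPreferred(gwIDs, doRequest)
	return err
}
```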
for i, roundInfo := range response.Rounds { diff --git a/network/rounds/manager.go b/network/rounds/manager.go index c2a3a37b2acf611d73d7e190525bf4d25fd84620..8b6f59f7e372249dfdbb5b1ee6a61d966d6b25e8 100644 --- a/network/rounds/manager.go +++ b/network/rounds/manager.go @@ -10,6 +10,7 @@ package rounds import ( "fmt" "gitlab.com/elixxir/client/interfaces/params" + "gitlab.com/elixxir/client/network/gateway" "gitlab.com/elixxir/client/network/internal" "gitlab.com/elixxir/client/network/message" "gitlab.com/elixxir/client/stoppable" @@ -23,6 +24,7 @@ type Manager struct { p *processing internal.Internal + sender *gateway.Sender historicalRounds chan historicalRoundRequest lookupRoundMessages chan roundLookup @@ -30,7 +32,7 @@ type Manager struct { } func NewManager(internal internal.Internal, params params.Rounds, - bundles chan<- message.Bundle) *Manager { + bundles chan<- message.Bundle, sender *gateway.Sender) *Manager { m := &Manager{ params: params, p: newProcessingRounds(), @@ -38,6 +40,7 @@ func NewManager(internal internal.Internal, params params.Rounds, historicalRounds: make(chan historicalRoundRequest, params.HistoricalRoundsBufferLen), lookupRoundMessages: make(chan roundLookup, params.LookupRoundsBufferLen), messageBundles: bundles, + sender: sender, } m.Internal = internal diff --git a/network/rounds/retrieve.go b/network/rounds/retrieve.go index 5269357776e9be37984cacfbe31a2208dda912d2..9b345ef00da69c821d798caba158074bc1daa498 100644 --- a/network/rounds/retrieve.go +++ b/network/rounds/retrieve.go @@ -10,7 +10,6 @@ package rounds import ( "github.com/pkg/errors" jww "github.com/spf13/jwalterweatherman" - "gitlab.com/elixxir/client/network/gateway" "gitlab.com/elixxir/client/network/message" "gitlab.com/elixxir/client/storage/reception" pb "gitlab.com/elixxir/comms/mixmessages" @@ -44,52 +43,42 @@ func (m *Manager) processMessageRetrieval(comms messageRetrievalComms, done = true case rl := <-m.lookupRoundMessages: ri := rl.roundInfo - var bundle message.Bundle - // Get a shuffled list of gateways in the round - gwHosts, err := gateway.GetAllShuffled(comms, ri) - if err != nil { - jww.WARN.Printf("Failed to get gateway hosts from "+ - "round %v, not requesting from them", - ri.ID) - break + // Convert gateways in round to proper ID format + gwIds := make([]*id.ID, len(ri.Topology)) + for i, idBytes := range ri.Topology { + gwId, err := id.Unmarshal(idBytes) + if err != nil { + // TODO + } + gwIds[i] = gwId } - // Attempt to request messages for every gateway in the list. - // If we retrieve without error, then we exit. 
If we error, then - // we retry with the next gateway in the list until we exhaust the list - for i, gwHost := range gwHosts { + // Send to the gateways using backup proxies + result, err := m.sender.SendToSpecific(gwIds, 5, func(host *connect.Host) (interface{}, error) { // Attempt to request for this gateway - bundle, err = m.getMessagesFromGateway(id.Round(ri.ID), rl.identity, comms, gwHost) + result, err := m.getMessagesFromGateway(id.Round(ri.ID), rl.identity, comms, host) if err != nil { - - jww.WARN.Printf("Failed on gateway [%d/%d] to get messages for round %v", - i, len(gwHosts), ri.ID) - - // Retry for the next gateway in the list - continue + jww.WARN.Printf("Failed on gateway %s to get messages for round %v", + host.GetId().String(), ri.ID) } - - // If a non-error request, no longer retry - break - - } - gwIDs := make([]*id.ID, 0) - for _, gwHost := range gwHosts { - gwIDs = append(gwIDs, gwHost.GetId()) - } + return result, err + }) // After trying all gateways, if none returned we mark the round as a // failure and print out the last error if err != nil { m.p.Fail(id.Round(ri.ID), rl.identity.EphId, rl.identity.Source) jww.ERROR.Printf("Failed to get pickup round %d "+ - "from all gateways (%v): final gateway %s returned : %s", - id.Round(ri.ID), gwIDs, gwHosts[len(gwHosts)-1].GetId(), err) - } else if len(bundle.Messages) != 0 { - // If successful and there are messages, we send them to another thread - bundle.Identity = rl.identity - m.messageBundles <- bundle + "from all gateways (%v): %s", + id.Round(ri.ID), gwIds, err) + } else if result != nil { + bundle := result.(message.Bundle) + if len(bundle.Messages) != 0 { + // If successful and there are messages, we send them to another thread + bundle.Identity = rl.identity + m.messageBundles <- bundle + } } } }
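The ID-conversion loop in retrieve.go leaves its error branch as a bare TODO, and the SendToSpecific result is asserted without a type check. One possible shape for that block, sketched with the surrounding variable names, skipping malformed IDs and restoring the SetType call that the removed GetAllShuffled helper performed (if it is still required):

```go
// Sketch only: mirrors the variables in processMessageRetrieval (ri, rl, m,
// result, err) and is not a drop-in replacement for the patch.
gwIds := make([]*id.ID, 0, len(ri.Topology))
for i, idBytes := range ri.Topology {
	gwId, idErr := id.Unmarshal(idBytes)
	if idErr != nil {
		jww.WARN.Printf("Skipping malformed gateway ID at index %d in "+
			"round %d: %+v", i, ri.ID, idErr)
		continue
	}
	gwId.SetType(id.Gateway) // GetAllShuffled used to set this before lookup
	gwIds = append(gwIds, gwId)
}

// ...SendToSpecific as in the patch, then unwrap defensively:
if err == nil {
	if bundle, ok := result.(message.Bundle); ok && len(bundle.Messages) != 0 {
		bundle.Identity = rl.identity
		m.messageBundles <- bundle
	}
}
```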