diff --git a/cmd/version.go b/cmd/version.go index 238832206010e9325bb466f3576495ab7dfcd685..a53283f7672280700467c2a706b50dfb51ffb4b1 100644 --- a/cmd/version.go +++ b/cmd/version.go @@ -18,7 +18,7 @@ import ( ) // Change this value to set the version for this build -const currentVersion = "4.4.2" +const currentVersion = "4.4.3" func Version() string { out := fmt.Sprintf("Elixxir Client v%s -- %s\n\n", xxdk.SEMVER, diff --git a/cmix/client.go b/cmix/client.go index d835a119b88ad8d66ad2e71ee3885ede079a1d03..45fff4cd7038acefdae07895af09c15d28f3c497 100644 --- a/cmix/client.go +++ b/cmix/client.go @@ -186,9 +186,8 @@ func (c *client) initialize(ndfile *ndf.NetworkDefinition) error { // Enable optimized HostPool initialization poolParams.MaxPings = 50 - poolParams.ForceConnection = true sender, err := gateway.NewSender(poolParams, c.rng, ndfile, c.comms, - c.session, nodeChan) + c.session, c.comms, nodeChan) if err != nil { return err } @@ -288,6 +287,9 @@ func (c *client) Follow(report ClientErrorReport) (stoppable.Stoppable, error) { //Start the critical processing thread multi.Add(c.crit.startProcessies()) + //start the host pool thread + multi.Add(c.Sender.StartProcesses()) + return multi, nil } diff --git a/cmix/gateway/certChecker.go b/cmix/gateway/certChecker.go new file mode 100644 index 0000000000000000000000000000000000000000..9b8a2e49ccf214ea2c5988646af70bddb28e73f2 --- /dev/null +++ b/cmix/gateway/certChecker.go @@ -0,0 +1,136 @@ +//////////////////////////////////////////////////////////////////////////////// +// Copyright © 2022 xx foundation // +// // +// Use of this source code is governed by a license that can be found in the // +// LICENSE file. // +//////////////////////////////////////////////////////////////////////////////// + +package gateway + +import ( + "bytes" + "crypto" + "crypto/sha256" + "fmt" + "github.com/pkg/errors" + jww "github.com/spf13/jwalterweatherman" + "gitlab.com/elixxir/client/v4/storage/versioned" + pb "gitlab.com/elixxir/comms/mixmessages" + "gitlab.com/elixxir/crypto/hash" + "gitlab.com/xx_network/comms/connect" + "gitlab.com/xx_network/crypto/signature/rsa" + "gitlab.com/xx_network/primitives/id" + "time" +) + +const ( + certCheckerPrefix = "GwCertChecker" + keyTemplate = "GatewayCertificate-%s" + certCheckerStorageVer = uint64(1) +) + +// CertCheckerCommInterface is an interface for client comms to be used in cert checker +type CertCheckerCommInterface interface { + GetGatewayTLSCertificate(host *connect.Host, + message *pb.RequestGatewayCert) (*pb.GatewayCertificate, error) +} + +// certChecker stores verified certificates and handles verification checking +type certChecker struct { + kv *versioned.KV + comms CertCheckerCommInterface +} + +// newCertChecker initializes a certChecker object +func newCertChecker(comms CertCheckerCommInterface, kv *versioned.KV) *certChecker { + return &certChecker{ + kv: kv.Prefix(certCheckerPrefix), + comms: comms, + } +} + +// CheckRemoteCertificate attempts to verify the tls certificate for a given host +func (cc *certChecker) CheckRemoteCertificate(gwHost *connect.Host) error { + if !gwHost.IsWeb() { + jww.TRACE.Printf("remote certificate verification is only " + + "implemented for web connections") + return nil + } + // Request signed certificate from the gateway + // NOTE: the remote certificate on the host is populated using the response + // after sending, so this must occur before getting the remote + // certificate from the host + gwTlsCertResp, err := cc.comms.GetGatewayTLSCertificate(gwHost, 
&pb.RequestGatewayCert{})
+	if err != nil {
+		return err
+	}
+	remoteCertSignature := gwTlsCertResp.GetSignature()
+	declaredFingerprint := sha256.Sum256(gwTlsCertResp.GetCertificate())
+
+	// Get remote certificate used for connection from the host object
+	actualRemoteCert, err := gwHost.GetRemoteCertificate()
+	if err != nil {
+		return err
+	}
+	rawActualRemoteCert := actualRemoteCert.Raw
+	actualFingerprint := sha256.Sum256(rawActualRemoteCert)
+
+	// If the fingerprints of the used & declared certs do not match, return an error
+	if actualFingerprint != declaredFingerprint {
+		return errors.Errorf("Declared & used remote certificates "+
+			"do not match\n\tDeclared: %+v\n\tUsed: %+v\n",
+			declaredFingerprint, actualFingerprint)
+	}
+
+	// Check if we have already verified this certificate for this host
+	storedFingerprint, err := cc.loadGatewayCertificateFingerprint(gwHost.GetId())
+	if err == nil {
+		if bytes.Equal(storedFingerprint, actualFingerprint[:]) {
+			return nil
+		}
+	}
+
+	// Verify received signature
+	err = verifyRemoteCertificate(rawActualRemoteCert, remoteCertSignature, gwHost)
+	if err != nil {
+		return err
+	}
+
+	// Store checked certificate fingerprint
+	return cc.storeGatewayCertificateFingerprint(actualFingerprint[:], gwHost.GetId())
+}
+
+// verifyRemoteCertificate verifies the RSA signature of a gateway on its TLS certificate
+func verifyRemoteCertificate(cert, sig []byte, gwHost *connect.Host) error {
+	opts := rsa.NewDefaultOptions()
+	opts.Hash = crypto.SHA256
+	h := opts.Hash.New()
+	h.Write(cert)
+	return rsa.Verify(gwHost.GetPubKey(), hash.CMixHash, h.Sum(nil), sig, rsa.NewDefaultOptions())
+}
+
+// loadGatewayCertificateFingerprint retrieves the stored certificate
+// fingerprint for a given gateway, or returns an error if not found
+func (cc *certChecker) loadGatewayCertificateFingerprint(id *id.ID) ([]byte, error) {
+	key := getKey(id)
+	obj, err := cc.kv.Get(key, certCheckerStorageVer)
+	if err != nil {
+		return nil, err
+	}
+	return obj.Data, err
+}
+
+// storeGatewayCertificateFingerprint stores the certificate fingerprint for a given gateway
+func (cc *certChecker) storeGatewayCertificateFingerprint(fingerprint []byte, id *id.ID) error {
+	key := getKey(id)
+	return cc.kv.Set(key, &versioned.Object{
+		Version:   certCheckerStorageVer,
+		Timestamp: time.Now(),
+		Data:      fingerprint,
+	})
+}
+
+// getKey is a helper function to generate the key for a gateway certificate fingerprint
+func getKey(id *id.ID) string {
+	return fmt.Sprintf(keyTemplate, id.String())
+}
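// ---- Editor's sketch (annotation, not part of the patch) ----
// A minimal sketch of how the checker above is meant to be driven.
// Here `comms` (a CertCheckerCommInterface) and `gwHost` (a
// web-connected gateway host) are assumed placeholders, not code
// from this patch.
//
//	cc := newCertChecker(comms, session.GetKV())
//	if err := cc.CheckRemoteCertificate(gwHost); err != nil {
//		// Fingerprint mismatch or a bad gateway signature; the
//		// caller should treat the host as untrustworthy.
//	}
// ---- end sketch ----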
diff --git a/cmix/gateway/certChecker_test.go b/cmix/gateway/certChecker_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..e4782cafcbe4321c664a2954810b8d98f3c427cb
--- /dev/null
+++ b/cmix/gateway/certChecker_test.go
@@ -0,0 +1,51 @@
+package gateway
+
+import (
+	"bytes"
+	"gitlab.com/elixxir/client/v4/storage/versioned"
+	pb "gitlab.com/elixxir/comms/mixmessages"
+	"gitlab.com/elixxir/comms/testkeys"
+	"gitlab.com/elixxir/ekv"
+	"gitlab.com/xx_network/comms/connect"
+	"gitlab.com/xx_network/primitives/id"
+	"golang.org/x/crypto/blake2b"
+	"testing"
+)
+
+type mockCertCheckerComm struct {
+}
+
+func (mccc *mockCertCheckerComm) GetGatewayTLSCertificate(host *connect.Host,
+	message *pb.RequestGatewayCert) (*pb.GatewayCertificate, error) {
+	return &pb.GatewayCertificate{}, nil
+}
+
+// Test load & store functions for cert checker
+func Test_certChecker_loadStore(t *testing.T) {
+	kv := versioned.NewKV(ekv.MakeMemstore())
+	cc := newCertChecker(&mockCertCheckerComm{}, kv)
+
+	gwCert := testkeys.LoadFromPath(testkeys.GetGatewayCertPath())
+	gwID := id.NewIdFromString("testid01", id.Gateway, t)
+
+	expectedFp := blake2b.Sum256(gwCert)
+
+	fp, err := cc.loadGatewayCertificateFingerprint(gwID)
+	if err == nil || fp != nil {
+		t.Errorf("Should error & receive nil when nothing is in storage")
+	}
+
+	err = cc.storeGatewayCertificateFingerprint(expectedFp[:], gwID)
+	if err != nil {
+		t.Fatal("Failed to store certificate")
+	}
+
+	fp, err = cc.loadGatewayCertificateFingerprint(gwID)
+	if err != nil {
+		t.Fatalf("Failed to load certificate for gwID %s: %+v", gwID, err)
+	}
+
+	if !bytes.Equal(fp, expectedFp[:]) {
+		t.Errorf("Did not receive expected fingerprint after load\n\tExpected: %+v\n\tReceived: %+v\n", expectedFp, fp)
+	}
+}
diff --git a/cmix/gateway/delay.go b/cmix/gateway/delay.go
new file mode 100644
index 0000000000000000000000000000000000000000..4df6d140662688dbef7a70b973f3aa86a269dae0
--- /dev/null
+++ b/cmix/gateway/delay.go
@@ -0,0 +1,97 @@
+package gateway
+
+import (
+	"gitlab.com/xx_network/primitives/netTime"
+	"math"
+	"time"
+)
+
+// piecewise table mapping how full the bucket is (as a fraction of
+// the pool size) to the delay imposed on sending
+var table = map[float64]time.Duration{
+	0:   0,
+	0.1: 0,
+	0.2: 0,
+	0.3: 0,
+	0.4: 100 * time.Millisecond,
+	0.5: 500 * time.Millisecond,
+	0.6: 2 * time.Second,
+	0.7: 5 * time.Second,
+	0.8: 8 * time.Second,
+	0.9: 9 * time.Second,
+	1.0: 10 * time.Second,
+	1.1: 10 * time.Second,
+}
+
+// getDelay computes the delay through linear interpolation on the table
+func getDelay(bucket float64, poolsize uint) time.Duration {
+	ratio := bucket / float64(poolsize)
+
+	if ratio < 0 {
+		ratio = 0
+	}
+
+	if ratio > 1 {
+		ratio = 1
+	}
+
+	ratioFloor := math.Floor(ratio*10) / 10
+	ratioCeil := math.Ceil(ratio*10) / 10
+
+	upperRatio := (ratio - ratioFloor) * 10
+	lowerRatio := 1 - upperRatio
+
+	bottom := time.Duration(float64(table[ratioFloor]) * lowerRatio)
+	top := time.Duration(float64(table[ratioCeil]) * upperRatio)
+
+	return top + bottom
+}
+
+// bucket is a leaky bucket implementation.
+type bucket struct {
+	// time until the entire bucket is leaked
+	leakRate time.Duration
+	points   float64
+	lastEdit time.Time
+	poolsize uint
+}
+
+// newBucket initializes a new bucket.
+func newBucket(poolSize int) *bucket {
+	return &bucket{
+		leakRate: time.Duration(poolSize) * table[1],
+		points:   0,
+		lastEdit: netTime.Now(),
+		poolsize: uint(poolSize),
+	}
+}
+
+func (b *bucket) leak() {
+	now := netTime.Now()
+
+	delta := now.Sub(b.lastEdit)
+	if delta < 0 {
+		return
+	}
+	b.lastEdit = now
+
+	leaked := (float64(delta) / float64(b.leakRate)) * float64(b.poolsize)
+	b.points -= leaked
+	if b.points < 0 {
+		b.points = 0
+	}
+}
+
+func (b *bucket) Add() {
+	b.leak()
+	b.points += 1
+}
+
+func (b *bucket) Reset() {
+	b.points = 0
+}
+
+func (b *bucket) GetDelay() time.Duration {
+	b.leak()
+	return getDelay(b.points, b.poolsize)
+}
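// ---- Editor's sketch (annotation, not part of the patch) ----
// Worked example of the leaky bucket above, assuming a pool of 10
// gateways; the loop and sleep are hypothetical call sites. Six
// points put the bucket at ratio 0.6, which the table maps to a 2s
// delay; leakRate = 10 * table[1] = 100s, so points drain at roughly
// one per 10s of idle time.
//
//	b := newBucket(10)
//	for i := 0; i < 6; i++ {
//		b.Add() // one point per suspect event
//	}
//	time.Sleep(b.GetDelay()) // ~2s at 60% full
// ---- end sketch ----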
diff --git a/cmix/gateway/guilty.go b/cmix/gateway/guilty.go
new file mode 100644
index 0000000000000000000000000000000000000000..ef242edb93b836d7dc29ffc4d31b90071fc622d4
--- /dev/null
+++ b/cmix/gateway/guilty.go
@@ -0,0 +1,47 @@
+////////////////////////////////////////////////////////////////////////////////
+// Copyright © 2022 xx foundation                                             //
+//                                                                            //
+// Use of this source code is governed by a license that can be found in the //
+// LICENSE file.                                                              //
+////////////////////////////////////////////////////////////////////////////////
+
+package gateway
+
+import (
+	"gitlab.com/xx_network/comms/connect"
+	"gitlab.com/xx_network/primitives/ndf"
+	"golang.org/x/net/context"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/balancer"
+)
+
+// List of errors that initiate a Host replacement
+var errorsList = []string{
+	context.DeadlineExceeded.Error(),
+	"connection refused",
+	"host disconnected",
+	"transport is closing",
+	balancer.ErrTransientFailure.Error(),
+	"Last try to connect",
+	ndf.NO_NDF,
+	"Host is in cool down",
+	grpc.ErrClientConnClosing.Error(),
+	connect.TooManyProxyError,
+	"Failed to fetch",
+	"NetworkError when attempting to fetch resource.",
+}
+
+var errorMap = make(map[string]struct{})
+
+func init() {
+	for _, str := range errorsList {
+		errorMap[str] = struct{}{}
+	}
+}
+
+// IsGuilty returns true if the error means the host
+// will get kicked out of the pool
+func IsGuilty(err error) bool {
+	_, exists := errorMap[err.Error()]
+	return exists
+}
diff --git a/cmix/gateway/hostPool.go b/cmix/gateway/hostPool.go
index f44ff2892d5f7ba881011334c1042842e081ee21..013b20f0dd37928e44a95ad71d53be2034d16e4f 100644
--- a/cmix/gateway/hostPool.go
+++ b/cmix/gateway/hostPool.go
@@ -5,51 +5,61 @@
 // LICENSE file.                                                              //
 ////////////////////////////////////////////////////////////////////////////////
 
-// Package gateway Handles functionality related to providing Gateway
-// connect.Host objects for message sending to the rest of the client
-// repository.
-// Used to minimize the number of open connections on mobile clients.
-
 package gateway
 
 import (
-	"encoding/json"
-	"math"
-	"sort"
-	"strings"
-	"sync"
-	"time"
-
 	"github.com/pkg/errors"
 	jww "github.com/spf13/jwalterweatherman"
+	"gitlab.com/elixxir/client/v4/stoppable"
 	"gitlab.com/elixxir/client/v4/storage"
 	"gitlab.com/elixxir/client/v4/storage/versioned"
-	"gitlab.com/elixxir/comms/network"
+	commNetwork "gitlab.com/elixxir/comms/network"
 	"gitlab.com/elixxir/crypto/fastRNG"
-	"gitlab.com/elixxir/crypto/shuffle"
 	"gitlab.com/xx_network/comms/connect"
-	"gitlab.com/xx_network/crypto/randomness"
 	"gitlab.com/xx_network/primitives/id"
 	"gitlab.com/xx_network/primitives/ndf"
-	"golang.org/x/net/context"
-	"google.golang.org/grpc"
-	"google.golang.org/grpc/balancer"
+	"math"
+	"strconv"
+	"sync"
+	"sync/atomic"
+	"testing"
 )
 
-// List of errors that initiate a Host replacement
-var errorsList = []string{
-	context.DeadlineExceeded.Error(),
-	"connection refused",
-	"host disconnected",
-	"transport is closing",
-	balancer.ErrTransientFailure.Error(),
-	"Last try to connect",
-	ndf.NO_NDF,
-	"Host is in cool down",
-	grpc.ErrClientConnClosing.Error(),
-	connect.TooManyProxyError,
-	"Failed to fetch",
-	"NetworkError when attempting to fetch resource.",
+type hostPool struct {
+	/*internal state*/
+	writePool *pool
+	readPool  atomic.Value
+
+	ndfMap map[id.ID]int // Map gateway ID to its index in the NDF
+	ndf    *ndf.NetworkDefinition
+
+	/*Runner inputs*/
+	// a send on this channel adds a node to the host pool
+	// if a nil id is sent, a few random nodes are tested
+	// and the best is added
+	// if a specific id is sent, that id is added
+	addRequest    chan *id.ID
+	removeRequest chan *id.ID
+	newHost       chan *connect.Host
+	doneTesting   chan []*connect.Host
+	newNdf        chan *ndf.NetworkDefinition
+
+	/*worker inputs*/
+	// tests the list of nodes. 
Finds the one with the lowest ping, + // connects, and then returns over addNode + testNodes chan []*connect.Host + + /* external objects*/ + rng *fastRNG.StreamGenerator + params Params + manager HostManager + filterMux sync.Mutex + filter Filter + kv *versioned.KV + addChan chan commNetwork.NodeGateway + + /* computed parameters*/ + numNodesToTest int } // HostManager Interface allowing storage and retrieval of Host objects @@ -69,763 +79,233 @@ type HostManager interface { // accepted. type Filter func(map[id.ID]int, *ndf.NetworkDefinition) map[id.ID]int -// HostPool Handles providing hosts to the client -type HostPool struct { - hostMap map[id.ID]uint32 // Map key to its index in the slice - hostList []*connect.Host // Each index in the slice contains the value - hostMux sync.RWMutex // Mutex for the above map/list combination - - ndfMap map[id.ID]int // Map gateway ID to its index in the NDF - ndf *ndf.NetworkDefinition - ndfMux sync.RWMutex - - poolParams PoolParams - rng *fastRNG.StreamGenerator - manager HostManager - addGatewayChan chan<- network.NodeGateway - - kv *versioned.KV - - filterMux sync.Mutex - filter Filter -} - -// PoolParams allows configuration of HostPool parameters. -type PoolParams struct { - // MaxPoolSize is the maximum number of Hosts in the HostPool. - MaxPoolSize uint32 - - // PoolSize allows override of HostPool size. Set to zero for dynamic size - // calculation. - PoolSize uint32 - - // ProxyAttempts dictates how many proxies will be used in event of send - // failure. - ProxyAttempts uint32 - - // MaxPings is the number of gateways to concurrently test when initializing - // HostPool. Disabled if zero. - MaxPings uint32 - - // ForceConnection determines whether host connections are initialized when - // added to HostPool. - ForceConnection bool - - // HostParams is the parameters for the creation of new Host objects. - //fixme params: have this adhere to json.Marshaler. - // This will allow the PoolParams object to have full adherence. - HostParams connect.HostParams `json:"-"` -} - -// poolParamsDisk will be the marshal-able and umarshal-able object. -type poolParamsDisk struct { - MaxPoolSize uint32 - PoolSize uint32 - ProxyAttempts uint32 - MaxPings uint32 - ForceConnection bool -} - -// DefaultPoolParams returns a default set of PoolParams. -func DefaultPoolParams() PoolParams { - p := PoolParams{ - MaxPoolSize: MaxPoolSize, - ProxyAttempts: 5, - PoolSize: 0, - MaxPings: 0, - ForceConnection: false, - HostParams: connect.GetDefaultHostParams(), - } - p.HostParams.MaxRetries = 1 - p.HostParams.MaxSendRetries = 1 - p.HostParams.AuthEnabled = false - p.HostParams.EnableCoolOff = false - p.HostParams.NumSendsBeforeCoolOff = 1 - p.HostParams.CoolOffTimeout = 5 * time.Minute - p.HostParams.SendTimeout = 1000 * time.Millisecond - p.HostParams.PingTimeout = 1000 * time.Millisecond - return p -} - -// GetParameters returns the default PoolParams, or -// override with given parameters, if set. -func GetParameters(params string) (PoolParams, error) { - p := DefaultPoolParams() - if len(params) > 0 { - err := json.Unmarshal([]byte(params), &p) - if err != nil { - return PoolParams{}, err - } - } - return p, nil -} - -// MarshalJSON adheres to the json.Marshaler interface. 
-func (pp PoolParams) MarshalJSON() ([]byte, error) { - ppd := poolParamsDisk{ - MaxPoolSize: pp.MaxPoolSize, - PoolSize: pp.PoolSize, - ProxyAttempts: pp.ProxyAttempts, - MaxPings: pp.MaxPings, - ForceConnection: pp.ForceConnection, - } - - return json.Marshal(&ppd) -} - -// UnmarshalJSON adheres to the json.Unmarshaler interface. -func (pp *PoolParams) UnmarshalJSON(data []byte) error { - ppDisk := poolParamsDisk{} - err := json.Unmarshal(data, &ppDisk) - if err != nil { - return err - } - - *pp = PoolParams{ - MaxPoolSize: ppDisk.MaxPoolSize, - PoolSize: ppDisk.PoolSize, - ProxyAttempts: ppDisk.ProxyAttempts, - MaxPings: ppDisk.MaxPings, - ForceConnection: ppDisk.ForceConnection, - // Since this does not adhere to json.Marshaler yet, - // file it in manually assuming default values - HostParams: connect.GetDefaultHostParams(), - } - - return nil +var defaultFilter = func(m map[id.ID]int, _ *ndf.NetworkDefinition) map[id.ID]int { + return m } -// newHostPool builds and returns a new HostPool object. -func newHostPool(poolParams PoolParams, rng *fastRNG.StreamGenerator, +// newHostPool is a helper function which initializes a hostPool. This +// will not initiate the long-running threads (see hostPool.StartProcesses). +func newHostPool(params Params, rng *fastRNG.StreamGenerator, netDef *ndf.NetworkDefinition, getter HostManager, storage storage.Session, - addGateway chan<- network.NodeGateway) (*HostPool, error) { + addChan chan commNetwork.NodeGateway, comms CertCheckerCommInterface) ( + *hostPool, error) { var err error // Determine size of HostPool - if poolParams.PoolSize == 0 { - poolParams.PoolSize, err = getPoolSize( - uint32(len(netDef.Gateways)), poolParams.MaxPoolSize) + if params.PoolSize == 0 { + params.PoolSize, err = getPoolSize( + uint32(len(netDef.Gateways)), params.MaxPoolSize) if err != nil { return nil, err } } - result := &HostPool{ - manager: getter, - hostMap: make(map[id.ID]uint32), - hostList: make([]*connect.Host, poolParams.PoolSize), - poolParams: poolParams, - ndf: netDef.DeepCopy(), - rng: rng, - addGatewayChan: addGateway, - kv: storage.GetKV().Prefix(hostListPrefix), - - // Initialise the filter so it does not filter any IDs - filter: func(m map[id.ID]int, _ *ndf.NetworkDefinition) map[id.ID]int { - return m - }, - } - - // Propagate the NDF - err = result.updateConns() + // Calculate the minimum input of buffers + buffLen := 10 * len(netDef.Gateways) + if buffLen < int(params.MinBufferLength) { + buffLen = int(params.MinBufferLength) + } + + // Override rotation and tune parameters if the network is + // too small + if int(params.PoolSize*params.MaxPings) > len(netDef.Gateways) { + params.EnableRotation = false + params.MaxPings = 1 + } + + // Build the underlying pool + p := newPool(int(params.PoolSize)) + + // Build the host pool + hp := &hostPool{ + writePool: p, + readPool: atomic.Value{}, + ndf: netDef.DeepCopy(), + addRequest: make(chan *id.ID, buffLen), + removeRequest: make(chan *id.ID, buffLen), + newHost: make(chan *connect.Host, buffLen), + doneTesting: make(chan []*connect.Host, buffLen), + newNdf: make(chan *ndf.NetworkDefinition, buffLen), + testNodes: make(chan []*connect.Host, buffLen), + rng: rng, + params: params, + manager: getter, + filter: defaultFilter, + kv: storage.GetKV().Prefix(hostListPrefix), + numNodesToTest: getNumNodesToTest(int(params.MaxPings), + len(netDef.Gateways), int(params.PoolSize)), + addChan: addChan, + } + hp.readPool.Store(p.deepCopy()) + + // Process the ndf + hp.ndfMap = hp.processNdf(hp.ndf) + + // Prime the 
host pool and add its first hosts
+	hl, err := getHostPreparedList(hp.kv, int(params.PoolSize))
 	if err != nil {
-		return nil, err
-	}
-
-	// Get the last used list of hosts and use it to seed the host pool list
-	hostList, err := getHostList(result.kv)
-	numHostsAdded := 0
-	if err == nil {
-		for _, hid := range hostList {
-			err = result.replaceHostNoStore(hid, uint32(numHostsAdded))
-			if err != nil {
-				jww.WARN.Printf("Unable to add stored host %s: %s", hid, err)
-			} else {
-				numHostsAdded++
-				if numHostsAdded >= len(result.hostList) {
-					break
-				}
-			}
-		}
-	} else {
-		jww.WARN.Printf(
-			"Building new HostPool because no HostList stored: %s", err.Error())
+		jww.WARN.Printf("Starting host pool from scratch, "+
+			"cannot get old pool: %+v", err)
 	}
 
-	// Build the initial HostPool and return
-	if result.poolParams.MaxPings > 0 {
-		// If pinging enabled, select random performant Hosts
-		err = result.initialize(uint32(numHostsAdded))
-		if err != nil {
-			return nil, err
-		}
-	} else {
-		// Otherwise, select random Hosts
-		for i := numHostsAdded; i < len(result.hostList); i++ {
-			err = result.replaceHost(result.selectGateway(), uint32(i))
-			if err != nil {
-				return nil, err
-			}
-		}
+	for i := range hl {
+		hp.addRequest <- hl[i]
 	}
 
-	jww.INFO.Printf("Initialized HostPool with size: %d/%d",
-		poolParams.PoolSize, len(netDef.Gateways))
-
-	return result, nil
+	return hp, nil
 }
 
-// initialize the HostPool with a performant set of Hosts.
-func (h *HostPool) initialize(startIdx uint32) error {
-	// If HostPool is full, there is no need to initialize
-	if startIdx == h.poolParams.PoolSize {
-		return nil
+// newTestingHostPool initializes a hostPool for testing purposes only.
+func newTestingHostPool(params Params, rng *fastRNG.StreamGenerator,
+	netDef *ndf.NetworkDefinition, getter HostManager,
+	storage storage.Session, addChan chan commNetwork.NodeGateway,
+	comms CertCheckerCommInterface, t *testing.T) (*hostPool, error) {
+	if t == nil {
+		jww.FATAL.Panicf("can only be called in testing")
 	}
 
-	// Randomly shuffle gateways in NDF
-	randomGateways := make([]ndf.Gateway, 0, len(h.ndf.Gateways))
-
-	// Filter out not active gateways
-	for i := 0; i < len(h.ndf.Gateways); i++ {
-		if h.ndf.Nodes[i].Status == ndf.Active {
-			randomGateways = append(randomGateways, h.ndf.Gateways[i])
-		}
-	}
-
-	// Randomize the gateway order
-	var rndBytes [32]byte
-	stream := h.rng.GetStream()
-	_, err := stream.Read(rndBytes[:])
-	stream.Close()
+	hp, err := newHostPool(params, rng, netDef, getter, storage, addChan, comms)
 	if err != nil {
-		return errors.Errorf(
-			"Failed to randomize shuffle for HostPool initialization: %+v", err)
-	}
-	shuffle.ShuffleSwap(rndBytes[:], len(randomGateways), func(i, j int) {
-		randomGateways[i], randomGateways[j] = randomGateways[j], randomGateways[i]
-	})
-
-	// Set constants
-	type gatewayDuration struct {
-		id      *id.ID
-		latency time.Duration
-	}
-
-	numGatewaysToTry := h.poolParams.MaxPings
-	numGateways := uint32(len(randomGateways))
-	if numGatewaysToTry > numGateways {
-		numGatewaysToTry = numGateways
-	}
-
-	resultList := make([]gatewayDuration, 0, numGatewaysToTry)
-
-	// Begin trying gateways
-	c := make(chan gatewayDuration, numGatewaysToTry)
-
-	i := 0
-	for exit := false; !exit; {
-		triedHosts := uint32(0)
-		for ; triedHosts < numGateways && i < len(randomGateways); i++ {
-			// Select a gateway not yet selected
-			gwId, err := randomGateways[i].GetGatewayId()
-			if err != nil {
-				jww.WARN.Printf("ID for gateway %s could not be retrieved", gwId)
-			}
-
-			// Skip if already in HostPool
-			if _, ok := h.hostMap[*gwId]; ok {
-				continue
-			}
-			triedHosts++
-
-			go func() {
-				// Obtain that GwId's Host object
-				newHost, ok := h.manager.GetHost(gwId)
-				if !ok {
-					jww.WARN.Printf(
-						"Host for gateway %s could not be retrieved", gwId)
-					return
-				}
-
-				// Ping the Host latency and send the result
-				jww.DEBUG.Printf("Testing host %s...", gwId)
-				latency, _ := newHost.IsOnline()
-				c <- gatewayDuration{gwId, latency}
-			}()
-		}
-
-		// Collect ping results
-		pingTimeout := 2 * h.poolParams.HostParams.PingTimeout
-		timer := time.NewTimer(pingTimeout)
-
-		newAppends := uint32(0)
-	innerLoop:
-		for {
-			select {
-			case gw := <-c:
-				// Only add successful pings
-				if gw.latency > 0 {
-					newAppends++
-					resultList = append(resultList, gw)
-					jww.DEBUG.Printf("Adding HostPool result %d/%d: %s: %d",
-						len(resultList), numGatewaysToTry, gw.id, gw.latency)
-				}
-
-				// Break if we have all needed slots
-				if newAppends == triedHosts {
-					exit = true
-					timer.Stop()
-					break innerLoop
-				}
-
-			case <-timer.C:
-				jww.INFO.Printf("HostPool initialization timed out after %s.",
-					pingTimeout)
-				break innerLoop
-			}
-		}
-
-		if i >= len(randomGateways) {
-			exit = true
-		}
+		return nil, err
 	}
 
-	// Sort the resultList by lowest latency
-	sort.Slice(resultList, func(i, j int) bool {
-		return resultList[i].latency < resultList[j].latency
-	})
-	jww.DEBUG.Printf("Gateway pool results: %+v", resultList)
-
-	// Process ping results
-	for _, result := range resultList {
-		err = h.replaceHost(result.id, startIdx)
-		if err != nil {
-			jww.WARN.Printf("Unable to replaceHost: %+v", err)
-			continue
-		}
-		startIdx++
-		if startIdx >= h.poolParams.PoolSize {
-			break
-		}
-	}
+	// Overwrite isConnected so that all hosts appear connected
+	hp.writePool.isConnected = func(host *connect.Host) bool { return true }
 
-	// Ran out of Hosts to try
-	if startIdx < h.poolParams.PoolSize {
-		return errors.Errorf(
-			"Unable to initialize enough viable hosts for HostPool")
+	gwID, _ := hp.ndf.Gateways[0].GetGatewayId()
+	h, exists := hp.manager.GetHost(gwID)
+	if !exists {
+		return nil, errors.Errorf("impossible error")
 	}
-
-	return nil
+	// Add one member to the host pool
+	stream := rng.GetStream()
+	hp.writePool.addOrReplace(stream, h)
+	hp.readPool.Store(hp.writePool.deepCopy())
+	stream.Close()
+	return hp, nil
 }
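// ---- Editor's sketch (annotation, not part of the patch) ----
// How a unit test might use the constructor above; the helpers
// (newMockManager, getTestNdf, mockCertCheckerComm) are the ones
// defined in this package's test files.
//
//	func TestPoolSketch(t *testing.T) {
//		rng := fastRNG.NewStreamGenerator(1, 1, csprng.NewSystemRNG)
//		addChan := make(chan commNetwork.NodeGateway, 10)
//		hp, err := newTestingHostPool(DefaultParams(), rng,
//			getTestNdf(t), newMockManager(),
//			storage.InitTestingSession(t), addChan,
//			&mockCertCheckerComm{}, t)
//		if err != nil {
//			t.Fatal(err)
//		}
//		_ = hp.getPool() // snapshot already seeded with one host
//	}
// ---- end sketch ----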
-// UpdateNdf mutates internal NDF to the given NDF
-func (h *HostPool) UpdateNdf(ndf *ndf.NetworkDefinition) {
-	if len(ndf.Gateways) == 0 {
-		jww.WARN.Printf("Unable to UpdateNdf: no gateways available")
-		return
-	}
+// StartProcesses starts all background threads for the host pool
+func (hp *hostPool) StartProcesses() stoppable.Stoppable {
+	multi := stoppable.NewMulti("HostPool")
 
-	// Lock order is extremely important here
-	h.hostMux.Lock()
-	h.ndfMux.Lock()
-	h.ndf = ndf.DeepCopy()
-	err := h.updateConns()
-	if err != nil {
-		jww.ERROR.Printf("Unable to updateConns: %+v", err)
+	// Create the Node Tester workers
+	for i := 0; i < hp.params.NumConnectionsWorkers; i++ {
+		stop := stoppable.NewSingle(
+			"Node Tester Worker " + strconv.Itoa(i))
+		go hp.nodeTester(stop)
+		multi.Add(stop)
 	}
-	h.ndfMux.Unlock()
-	h.hostMux.Unlock()
-}
-
-// SetGatewayFilter sets the filter used to filter gateways from the ID map.
-func (h *HostPool) SetGatewayFilter(f Filter) {
-	h.filterMux.Lock()
-	defer h.filterMux.Unlock()
-
-	h.filter = f
-}
-
-// GetHostParams returns a copy of the connect.HostParams struct.
-func (h *HostPool) GetHostParams() connect.HostParams { - hp := h.poolParams.HostParams - hpCopy := connect.HostParams{ - MaxRetries: hp.MaxRetries, - AuthEnabled: hp.AuthEnabled, - EnableCoolOff: hp.EnableCoolOff, - NumSendsBeforeCoolOff: hp.NumSendsBeforeCoolOff, - CoolOffTimeout: hp.CoolOffTimeout, - SendTimeout: hp.SendTimeout, - EnableMetrics: hp.EnableMetrics, - ExcludeMetricErrors: make([]string, len(hp.ExcludeMetricErrors)), - KaClientOpts: hp.KaClientOpts, - } - for i := 0; i < len(hp.ExcludeMetricErrors); i++ { - hpCopy.ExcludeMetricErrors[i] = hp.ExcludeMetricErrors[i] + // If rotation is enabled, start the rotation thread + if hp.params.EnableRotation { + rotationStop := stoppable.NewSingle("Rotation") + go hp.Rotation(rotationStop) + multi.Add(rotationStop) } - return hpCopy -} -// getFilter returns the filter used to filter gateways from the ID map. -func (h *HostPool) getFilter() Filter { - h.filterMux.Lock() - defer h.filterMux.Unlock() + // Start the main thread + runnerStop := stoppable.NewSingle("Runner") + go hp.runner(runnerStop) + multi.Add(runnerStop) - return h.filter + return multi } -// getAny obtains a random and unique list of hosts of the given length from the -// HostPool. -func (h *HostPool) getAny(length uint32, excluded []*id.ID) []*connect.Host { - if length > h.poolParams.PoolSize { - length = h.poolParams.PoolSize - } - - // Keep track of Hosts already selected to avoid duplicates - checked := make(map[uint32]interface{}) - if excluded != nil { - // Add excluded Hosts to already-checked list - for i := range excluded { - gwId := excluded[i] - if idx, ok := h.hostMap[*gwId]; ok { - checked[idx] = nil - } - } - } - - result := make([]*connect.Host, 0, length) - rng := h.rng.GetStream() - h.hostMux.RLock() - for i := uint32(0); i < length; { - // If we've checked the entire HostPool, bail - if uint32(len(checked)) >= h.poolParams.PoolSize { - break - } - - // Check the next HostPool index - gwIdx := randomness.ReadRangeUint32(0, h.poolParams.PoolSize, - rng) - if _, ok := checked[gwIdx]; !ok { - result = append(result, h.hostList[gwIdx]) - checked[gwIdx] = nil - i++ - } +// Remove triggers the node to be removed from the host pool and disconnects, +// if the node is present +func (hp *hostPool) Remove(h *connect.Host) { + h.Disconnect() + select { + case hp.removeRequest <- h.GetId(): + default: + jww.WARN.Printf("Failed to pass instruction to remove %s", h.GetId()) } - - h.hostMux.RUnlock() - rng.Close() - - return result -} - -// getSpecific obtains a specific connect.Host from the manager, irrespective of -// the HostPool. -func (h *HostPool) getSpecific(target *id.ID) (*connect.Host, bool) { - return h.manager.GetHost(target) } -// getPreferred tries to obtain the given targets from the HostPool. If each is -// not present, then obtains a random replacement from the HostPool. 
-func (h *HostPool) getPreferred(targets []*id.ID) []*connect.Host {
-	// Keep track of Hosts already selected to avoid duplicates
-	checked := make(map[uint32]interface{})
-	length := len(targets)
-	if length > int(h.poolParams.PoolSize) {
-		length = int(h.poolParams.PoolSize)
-	}
-	result := make([]*connect.Host, length)
-
-	rng := h.rng.GetStream()
-	h.hostMux.RLock()
-	for i := 0; i < length; {
-		if hostIdx, ok := h.hostMap[*targets[i]]; ok {
-			result[i] = h.hostList[hostIdx]
-			checked[hostIdx] = nil
-			i++
-			continue
-		}
-
-		gwIdx := randomness.ReadRangeUint32(0, h.poolParams.PoolSize,
-			rng)
-		if _, ok := checked[gwIdx]; !ok {
-			result[i] = h.hostList[gwIdx]
-			checked[gwIdx] = nil
-			i++
-		}
+// Add requests that the given gateway be added to the host pool
+func (hp *hostPool) Add(gwId *id.ID) {
+	select {
+	case hp.addRequest <- gwId:
+	default:
+		jww.WARN.Printf("Failed to pass instruction to add %s", gwId)
+	}
-	h.hostMux.RUnlock()
-	rng.Close()
-
-	return result
 }
 
-// checkReplace replaces the given hostId in the HostPool if the given hostErr
-// is in errorList. Returns whether the host was replaced.
-func (h *HostPool) checkReplace(hostId *id.ID, hostErr error) (bool, error) {
-	var err error
-
-	// Check if host should be replaced
-	doReplace := false
-	if hostErr != nil {
-		for _, errString := range errorsList {
-			if strings.Contains(hostErr.Error(), errString) {
-				// Host needs to be replaced, flag and continue
-				doReplace = true
-				break
-			}
-		}
-	}
-
-	// If the Host is still in the pool
-	if doReplace {
-		// Lock order is extremely important here
-		h.hostMux.Lock()
-		if oldPoolIndex, ok := h.hostMap[*hostId]; ok {
-			// Replace it
-			h.ndfMux.RLock()
-			err = h.replaceHost(h.selectGateway(), oldPoolIndex)
-			h.ndfMux.RUnlock()
-		}
-		h.hostMux.Unlock()
+// UpdateNdf updates the NDF used by the hostpool,
+// updating hosts and removing gateways which are no longer
+// in the NDF
+func (hp *hostPool) UpdateNdf(ndf *ndf.NetworkDefinition) {
+	select {
+	case hp.newNdf <- ndf:
+	default:
+		jww.WARN.Printf("Failed to update the HostPool's NDF")
 	}
-
-	return doReplace && err == nil, err
-}
 
-// selectGateway selects a viable HostPool candidate from the NDF.
-func (h *HostPool) selectGateway() *id.ID {
-	rng := h.rng.GetStream()
-	defer rng.Close()
-
-	// Loop until a replacement Host is found
-	for {
-		// Randomly select a new Gw by index in the NDF
-		ndfIdx := randomness.ReadRangeUint32(0,
-			uint32(len(h.ndf.Gateways)), rng)
-
-		// Use the random ndfIdx to obtain a GwId from the NDF
-		gwId, err := id.Unmarshal(h.ndf.Gateways[ndfIdx].ID)
-		if err != nil {
-			jww.WARN.Printf("Unable to unmarshal gateway: %+v", err)
-			continue
-		}
-
-		// Verify the Gateway's Node is not Stale before adding to HostPool
-		nodeId := gwId.DeepCopy()
-		nodeId.SetType(id.Node)
-		nodeNdfIdx := h.ndfMap[*nodeId]
-		isNodeIsNotActive := h.ndf.Nodes[nodeNdfIdx].Status != ndf.Active
-		if isNodeIsNotActive {
-			jww.DEBUG.Printf("Ignoring stale node: %s", nodeId)
-			continue
-		}
+// SetGatewayFilter sets the filter used to filter gateways from the ID map.
+func (hp *hostPool) SetGatewayFilter(f Filter) {
+	hp.filterMux.Lock()
+	defer hp.filterMux.Unlock()
 
-		// Verify the new GwId is not already in the hostMap
-		if _, ok := h.hostMap[*gwId]; !ok {
-			return gwId
-		}
-	}
+	hp.filter = f
 }
 
-// replaceHost replaces the given slot in the HostPool with a new Gateway with
-// the specified ID. The resulting host list is saved to storage.
-func (h *HostPool) replaceHost(newId *id.ID, oldPoolIndex uint32) error {
-	err := h.replaceHostNoStore(newId, oldPoolIndex)
-	if err != nil {
-		return err
-	}
-
-	// Convert list of non-nil and non-zero hosts to ID list
-	idList := make([]*id.ID, 0, len(h.hostList))
-	for _, host := range h.hostList {
-		if host.GetId() != nil && !host.GetId().Cmp(&id.ID{}) {
-			idList = append(idList, host.GetId())
-		}
+// GetHostParams returns a copy of the connect.HostParams struct.
+func (hp *hostPool) GetHostParams() connect.HostParams {
+	param := hp.params.HostParams
+	hpCopy := connect.HostParams{
+		MaxRetries:            param.MaxRetries,
+		AuthEnabled:           param.AuthEnabled,
+		EnableCoolOff:         param.EnableCoolOff,
+		NumSendsBeforeCoolOff: param.NumSendsBeforeCoolOff,
+		CoolOffTimeout:        param.CoolOffTimeout,
+		SendTimeout:           param.SendTimeout,
+		EnableMetrics:         param.EnableMetrics,
+		ExcludeMetricErrors:   make([]string, len(param.ExcludeMetricErrors)),
+		KaClientOpts:          param.KaClientOpts,
+	}
+	for i := 0; i < len(param.ExcludeMetricErrors); i++ {
+		hpCopy.ExcludeMetricErrors[i] = param.ExcludeMetricErrors[i]
 	}
-
-	// Save the list to storage
-	return saveHostList(h.kv, idList)
+	return hpCopy
 }
 
-// replaceHostNoStore replaces the given slot in the HostPool with a new Gateway
-// with the specified ID without saving changes to storage.
-func (h *HostPool) replaceHostNoStore(newId *id.ID, oldPoolIndex uint32) error {
-	// Obtain that GwId's Host object
-	newHost, ok := h.manager.GetHost(newId)
-	if !ok {
-		return errors.Errorf("host for gateway %s could not be retrieved", newId)
-	}
-
-	// Keep track of oldHost for cleanup
-	oldHost := h.hostList[oldPoolIndex]
-
-	// Use the poolIdx to overwrite the random Host in the corresponding index
-	// in the hostList
-	h.hostList[oldPoolIndex] = newHost
-
-	// Use the GwId to keep track of the new random Host's index in the hostList
-	h.hostMap[*newId] = oldPoolIndex
-
-	// Clean up and disconnect old Host
-	oldHostIDStr := "unknown"
-	if oldHost != nil {
-		oldHostIDStr = oldHost.GetId().String()
-		delete(h.hostMap, *oldHost.GetId())
-		go oldHost.Disconnect()
-	}
-
-	// Manually connect the new Host
-	if h.poolParams.ForceConnection {
-		go func() {
-			err := newHost.Connect()
-			if err != nil {
-				jww.WARN.Printf("Unable to initialize Host connection to %s: "+
-					"%+v", newId, err)
-			}
-		}()
-	}
-
-	jww.DEBUG.Printf("Replaced Host at %d [%s] with new Host %s",
-		oldPoolIndex, oldHostIDStr, newId)
-	return nil
+// getPool returns the current read-only copy of the host pool
+func (hp *hostPool) getPool() Pool {
+	p := hp.readPool.Load()
+	return (p).(*pool)
 }
 
-// forceAdd adds the Gateways to the HostPool, each replacing a random Gateway.
-func (h *HostPool) forceAdd(gwId *id.ID) error {
-	rng := h.rng.GetStream()
-	h.hostMux.Lock()
-	defer h.hostMux.Unlock()
-	defer rng.Close()
-
-	// Verify the gateway ID is not already in the hostMap
-	if _, ok := h.hostMap[*gwId]; ok {
-		// If it is, skip
-		return nil
-	}
+// getFilter returns the filter used to filter gateways from the ID map.
+func (hp *hostPool) getFilter() Filter {
+	hp.filterMux.Lock()
+	defer hp.filterMux.Unlock()
 
-	// Randomly select another Gateway in the HostPool for replacement
-	poolIdx := randomness.ReadRangeUint32(0, h.poolParams.PoolSize, rng)
-	return h.replaceHost(gwId, poolIdx)
+	return hp.filter
 }
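// ---- Editor's sketch (annotation, not part of the patch) ----
// The pattern behind getPool above: the writer mutates writePool and
// then publishes an immutable deep copy through the atomic.Value, so
// senders read a consistent snapshot without taking any lock.
//
//	// writer side (see newTestingHostPool above)
//	hp.writePool.addOrReplace(stream, host)
//	hp.readPool.Store(hp.writePool.deepCopy())
//
//	// reader side
//	p := hp.getPool() // lock-free snapshot of type Pool
// ---- end sketch ----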
-// updateConns updates the internal HostPool with any changes to the NDF.
-func (h *HostPool) updateConns() error {
-	// Prepare NDFs for comparison
-	newMap, err := convertNdfToMap(h.ndf)
+// getHostPreparedList returns the host list from storage.
+// It will trim the list if it is too long and
+// extend it if it is too short
+func getHostPreparedList(kv *versioned.KV, poolSize int) ([]*id.ID, error) {
+	obj, err := kv.Get(hostListKey, hostListVersion)
 	if err != nil {
-		return errors.Errorf("Unable to convert new NDF to set: %+v", err)
+		return make([]*id.ID, poolSize), errors.Errorf(getStorageErr, err)
 	}
 
-	// Filter out unwanted gateway IDs
-	newMap = h.getFilter()(newMap, h.ndf)
-
-	// Keep track of the old NDF set
-	oldMap := h.ndfMap
-	// Update the internal NDF set
-	h.ndfMap = newMap
-
-	// Handle adding Gateways
-	for gwId, ndfIdx := range newMap {
-		if _, ok := oldMap[gwId]; !ok && gwId.GetType() == id.Gateway {
-			// If GwId in newMap is not in ndfMap, add the Gateway
-			h.addGateway(gwId.DeepCopy(), ndfIdx)
-		}
-	}
-
-	// Handle removing Gateways
-	for gwId := range oldMap {
-		if _, ok := newMap[gwId]; !ok && gwId.GetType() == id.Gateway {
-			// If GwId in ndfMap is not in newMap, remove the Gateway
-			h.removeGateway(gwId.DeepCopy())
-		}
-	}
-
-	return nil
-}
-
-// convertNdfToMap takes NDF.Gateways and puts their IDs into a map object.
-func convertNdfToMap(ndf *ndf.NetworkDefinition) (map[id.ID]int, error) {
-	result := make(map[id.ID]int)
-	if ndf == nil {
-		return result, nil
-	}
-
-	// Process node and gateway IDs into set
-	// NOTE: We expect len(ndf.Gateways) == len(ndf.Nodes)
-	for i := range ndf.Gateways {
-		gw := ndf.Gateways[i]
-		gwId, err := id.Unmarshal(gw.ID)
-		if err != nil {
-			return nil, err
-		}
-		result[*gwId] = i
-
-		node := ndf.Nodes[i]
-		nodeId, err := id.Unmarshal(node.ID)
-		if err != nil {
-			return nil, err
-		}
-		result[*nodeId] = i
+	rawHL, err := unmarshalHostList(obj.Data)
+	if err != nil {
+		return make([]*id.ID, poolSize), err
 	}
 
-	return result, nil
-}
-
-// removeGateway removes old gateways.
-func (h *HostPool) removeGateway(gwId *id.ID) {
-	h.manager.RemoveHost(gwId)
-	// If needed, replace the removed Gateway in the HostPool with a new one
-	if poolIndex, ok := h.hostMap[*gwId]; ok {
-		err := h.replaceHost(h.selectGateway(), poolIndex)
-		if err != nil {
-			jww.ERROR.Printf("Unable to remove gateway: %+v", err)
-		}
+	if len(rawHL) > poolSize {
+		rawHL = rawHL[:poolSize]
+	} else if len(rawHL) < poolSize {
+		rawHL = append(rawHL, make([]*id.ID, poolSize-len(rawHL))...)
 	}
-}
-
-// addGateway adds a new Gateway.
-func (h *HostPool) addGateway(gwId *id.ID, ndfIndex int) {
-	gw := h.ndf.Gateways[ndfIndex]
-
-	// Check if the host exists
-	host, ok := h.manager.GetHost(gwId)
-	if !ok {
-		// Check if gateway ID collides with an existing hard coded ID
-		if id.CollidesWithHardCodedID(gwId) {
-			jww.ERROR.Printf("Gateway ID invalid, collides with a "+
-				"hard coded ID. Invalid ID: %s", gwId)
-		}
-
-		// process the cert. Normally this is a simple passthrough, in
-		// webassembly, it replaces with the web assembly connection info
-		gwAddr, cert, err := getConnectionInfo(gwId, gw.Address, gw.TlsCertificate)
-		if err != nil {
-			jww.ERROR.Printf("Could not add gateway host, invalid port %s: %+v", gwId, err)
-		} else {
-			// Add the new gateway host
-			_, err = h.manager.AddHost(
-				gwId, gwAddr, cert, h.poolParams.HostParams)
-			if err != nil {
-				jww.ERROR.Printf("Could not add gateway host %s: %+v", gwId, err)
-			}
-		}
-
-		// Send AddGateway event if we do not already possess keys for the GW
-		// the recipient of the channel checks if it should process
-		ng := network.NodeGateway{
-			Node:    h.ndf.Nodes[ndfIndex],
-			Gateway: gw,
-		}
-
-		select {
-		case h.addGatewayChan <- ng:
-		default:
-			jww.WARN.Printf(
-				"Unable to send AddGateway event for ID %s", gwId)
-		}
-
-	} else if host.GetAddress() != gw.Address {
-		host.UpdateAddress(gw.Address)
-	}
+	return rawHL, nil
 }
 
 // getPoolSize determines the size of the HostPool based on the size of the NDF.
@@ -842,3 +322,19 @@ func getPoolSize(ndfLen, maxSize uint32) (uint32, error) {
 	}
 	return poolSize, nil
 }
+
+// getNumNodesToTest returns the number of nodes to test when
+// finding a node to send messages to in order to ensure
+// the pool of all nodes will not be exhausted
+func getNumNodesToTest(maxPings, numGateways, poolSize int) int {
+	// calculate the number of nodes to test at once
+	numNodesToTest := maxPings
+	accessRatio := numGateways / poolSize
+	if accessRatio < 1 {
+		accessRatio = 1
+	}
+	if numNodesToTest > accessRatio {
+		numNodesToTest = accessRatio
+	}
+	return numNodesToTest
+}
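// ---- Editor's sketch (annotation, not part of the patch) ----
// Worked example of getNumNodesToTest: with maxPings = 5, a network
// of 20 gateways, and a pool size of 10, accessRatio = 20/10 = 2, so
// the 5 requested pings are capped at 2 nodes per test round, keeping
// repeated testing from churning through a small network.
//
//	n := getNumNodesToTest(5, 20, 10) // n == 2
// ---- end sketch ----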
diff --git a/cmix/gateway/hostpool_test.go b/cmix/gateway/hostpool_test.go
index abccaa1d504c8e15f780d91981250df8e9f68705..9ceae7064e25bf95d3438b780874af486c7641ca 100644
--- a/cmix/gateway/hostpool_test.go
+++ b/cmix/gateway/hostpool_test.go
@@ -8,8 +8,8 @@
 package gateway
 
 import (
-	"github.com/pkg/errors"
 	jww "github.com/spf13/jwalterweatherman"
+	"gitlab.com/elixxir/client/v4/stoppable"
 	"gitlab.com/elixxir/client/v4/storage"
 	"gitlab.com/elixxir/comms/network"
 	"gitlab.com/elixxir/crypto/fastRNG"
@@ -20,6 +20,7 @@ import (
 	"os"
 	"reflect"
 	"testing"
+	"time"
 )
 
 func TestMain(m *testing.M) {
@@ -34,8 +35,7 @@ func Test_newHostPool(t *testing.T) {
 	rng := fastRNG.NewStreamGenerator(1, 1, csprng.NewSystemRNG)
 	testNdf := getTestNdf(t)
 	testStorage := storage.InitTestingSession(t)
-	addGwChan := make(chan network.NodeGateway)
-	params := DefaultPoolParams()
+	params := DefaultParams()
 	params.MaxPoolSize = uint32(len(testNdf.Gateways))
 
 	// Pull all gateways from NDF into host manager
@@ -48,14 +48,14 @@
 		// Add mock gateway to manager
 		_, err = manager.AddHost(gwId, "", nil, connect.GetDefaultHostParams())
 		if err != nil {
-			t.Fatalf("Could not add mock host to manager: %+v", err)
+			t.Fatalf("Could not Add mock host to manager: %+v", err)
 		}
 
 	}
 
 	// Call the constructor
 	_, err := newHostPool(params, rng, testNdf, manager,
-		testStorage, addGwChan)
+		testStorage, nil, nil)
 	if err != nil {
 		t.Fatalf("Failed to create mock host pool: %v", err)
 	}
@@ -67,7 +67,7 @@ func Test_newHostPool_HostListStore(t *testing.T) {
 	rng := fastRNG.NewStreamGenerator(1, 1, csprng.NewSystemRNG)
 	testNdf := getTestNdf(t)
 	testStorage := storage.InitTestingSession(t)
-	addGwChan := make(chan network.NodeGateway)
+	addGwChan := make(chan network.NodeGateway, len(testNdf.Gateways))
 	params := DefaultPoolParams()
 	params.MaxPoolSize = uint32(len(testNdf.Gateways))
 
@@ -87,7 +87,8 @@ func Test_newHostPool_HostListStore(t *testing.T) {
 	}
 
 	// Call the constructor
-	hp, err := newHostPool(params, rng, testNdf, manager, testStorage, addGwChan)
+	mccc := &mockCertCheckerComm{}
+	hp, err := newHostPool(params, rng, testNdf, manager, testStorage, addGwChan, mccc)
 	if err != nil {
 		t.Fatalf("Failed to create mock host pool: %v", err)
 	}
@@ -110,7 +111,7 @@ func TestHostPool_ManageHostPool(t *testing.T) {
 	rng := fastRNG.NewStreamGenerator(1, 1, csprng.NewSystemRNG)
 	testNdf := getTestNdf(t)
 	testStorage := storage.InitTestingSession(t)
-	addGwChan := make(chan network.NodeGateway)
+	addGwChan := make(chan network.NodeGateway, len(testNdf.Gateways))
 
 	// Construct custom params
 	params := DefaultPoolParams()
@@ -127,19 +128,20 @@
 		_, err = manager.AddHost(
 			gwId, gw.Address, nil, connect.GetDefaultHostParams())
 		if err != nil {
-			t.Fatalf("Could not add mock host to manager: %+v", err)
+			t.Fatalf("Could not Add mock host to manager: %+v", err)
 		}
 
 	}
 
 	// Call the constructor
+	mccc := &mockCertCheckerComm{}
 	testPool, err := newHostPool(
-		params, rng, testNdf, manager, testStorage, addGwChan)
+		params, rng, testNdf, manager, testStorage, addGwChan, mccc)
 	if err != nil {
 		t.Fatalf("Failed to create mock host pool: %+v", err)
 	}
 
-	// Construct a list of new gateways/nodes to add to the NDF
+	// Construct a list of new gateways/nodes to Add to the NDF
 	newGatewayLen := len(testNdf.Gateways)
 	newGateways := make([]ndf.Gateway, newGatewayLen)
 	newNodes := make([]ndf.Node, newGatewayLen)
@@ -167,335 +169,49 @@
 		if err != nil {
 			t.Fatalf("Failed to marshal gateway ID for %v", ndfGw)
 		}
-		if _, ok := testPool.hostMap[*gwId]; ok {
+		if _, ok := testPool.writePool.hostMap[*gwId]; ok {
 			t.Errorf("Expected gateway %v to be removed from pool", gwId)
 		}
 	}
 }
 
-// Full happy path test. 
-func TestHostPool_replaceHost(t *testing.T) { - manager := newMockManager() - testNdf := getTestNdf(t) - newIndex := uint32(20) - - testStorage := storage.InitTestingSession(t) - - // Construct a manager (bypass business logic in constructor) - hostPool := &HostPool{ - manager: manager, - hostList: make([]*connect.Host, newIndex+1), - hostMap: make(map[id.ID]uint32), - ndf: testNdf, - kv: testStorage.GetKV().Prefix(hostListPrefix), - } - - /* "Replace" a host with no entry */ - - // Pull a gateway ID from the NDF - gwIdOne, err := id.Unmarshal(testNdf.Gateways[0].ID) - if err != nil { - t.Errorf("Failed to unmarshal ID in mock NDF: %+v", err) - } - - // Add mock gateway to manager - _, err = manager.AddHost(gwIdOne, "", nil, connect.GetDefaultHostParams()) - if err != nil { - t.Errorf("Could not add mock host to manager: %v", err) - } - - // "Replace" (insert) the host - err = hostPool.replaceHost(gwIdOne, newIndex) - if err != nil { - t.Errorf("Could not replace host: %v", err) - } - - // Check the state of the map has been correctly updated - retrievedIndex, ok := hostPool.hostMap[*gwIdOne] - if !ok { - t.Errorf("Expected insertion of gateway ID into map") - } - if retrievedIndex != newIndex { - t.Errorf("Index pulled from map not expected value."+ - "\n\tExpected: %d"+ - "\n\tReceived: %d", newIndex, retrievedIndex) - } - - // Check that the state of the list been correctly updated - retrievedHost := hostPool.hostList[newIndex] - if !gwIdOne.Cmp(retrievedHost.GetId()) { - t.Errorf("ID pulled from list is not expected."+ - "\nexpected: %s\nreceived: %s", gwIdOne, retrievedHost.GetId()) - } - - /* Replace the initial host with a new host */ - - // Pull a different gateway ID from the NDF - gwIdTwo, err := id.Unmarshal(testNdf.Gateways[1].ID) - if err != nil { - t.Errorf("Failed to unmarshal ID in mock NDF: %+v", err) - } - - // Add second mock gateway to manager - _, err = manager.AddHost(gwIdTwo, "", nil, connect.GetDefaultHostParams()) - if err != nil { - t.Errorf("Could not add mock host to manager: %v", err) - } - - // Replace the old host - err = hostPool.replaceHost(gwIdTwo, newIndex) - if err != nil { - t.Errorf("Could not replace host: %v", err) - } - - // Check that the state of the list been correctly updated for new host - retrievedHost = hostPool.hostList[newIndex] - if !gwIdTwo.Cmp(retrievedHost.GetId()) { - t.Errorf("ID pulled from list is not expected."+ - "\nexpected: %s\nreceived: %s", gwIdTwo, retrievedHost.GetId()) - } - - // Check the state of the map has been correctly removed for the old gateway - retrievedOldIndex, ok := hostPool.hostMap[*gwIdOne] - if ok { - t.Errorf("Exoected old gateway to be cleared from map") - } - if retrievedOldIndex != 0 { - t.Errorf("Index pulled from map with old gateway as the key "+ - "was not cleared."+ - "\n\tExpected: %d"+ - "\n\tReceived: %d", 0, retrievedOldIndex) - } - - // Check the state of the map has been correctly updated for the old gateway - retrievedIndex, ok = hostPool.hostMap[*gwIdTwo] - if !ok { - t.Errorf("Expected insertion of gateway ID into map") - } - if retrievedIndex != newIndex { - t.Errorf("Index pulled from map using new gateway as the key "+ - "was not updated."+ - "\n\tExpected: %d"+ - "\n\tReceived: %d", newIndex, retrievedIndex) - } - - // Check that the host list was saved to storage - hostList, err := getHostList(hostPool.kv) - if err != nil { - t.Errorf("Failed to get host list: %+v", err) - } - - expectedList := []*id.ID{gwIdTwo} - - if !reflect.DeepEqual(expectedList, hostList) { - t.Errorf("Failed to 
save expected host list to storage."+ - "\nexpected: %+v\nreceived: %+v", expectedList, hostList) - } -} - -// Error path, could not get host -func TestHostPool_replaceHost_Error(t *testing.T) { - manager := newMockManager() - - // Construct a manager (bypass business logic in constructor) - hostPool := &HostPool{ - manager: manager, - hostList: make([]*connect.Host, 1), - hostMap: make(map[id.ID]uint32), - } - - // Construct an unknown gateway ID to the manager - gatewayId := id.NewIdFromString("BadGateway", id.Gateway, t) - - err := hostPool.replaceHost(gatewayId, 0) - if err == nil { - t.Errorf("Expected error in happy path: Should not be able to find a host") - } - -} - -// Unit test -func TestHostPool_ForceReplace(t *testing.T) { - manager := newMockManager() - rng := fastRNG.NewStreamGenerator(1, 1, csprng.NewSystemRNG) - testNdf := getTestNdf(t) - testStorage := storage.InitTestingSession(t) - addGwChan := make(chan network.NodeGateway) - - // Construct custom params - params := DefaultPoolParams() - params.PoolSize = uint32(len(testNdf.Gateways)) - - // Add a stale nodes - newGateway := ndf.Gateway{ - ID: id.NewIdFromUInt(27, id.Gateway, t).Bytes(), - } - newNode := ndf.Node{ - ID: id.NewIdFromUInt(27, id.Node, t).Bytes(), - Status: ndf.Stale, - } - testNdf.Gateways = append(testNdf.Gateways, newGateway) - testNdf.Nodes = append(testNdf.Nodes, newNode) - - // Pull all gateways from NDF into host manager - for _, gw := range testNdf.Gateways { - - gwId, err := id.Unmarshal(gw.ID) - if err != nil { - t.Errorf("Failed to unmarshal ID in mock NDF: %+v", err) - } - // Add mock gateway to manager - _, err = manager.AddHost( - gwId, gw.Address, nil, connect.GetDefaultHostParams()) - if err != nil { - t.Fatalf("Could not add mock host to manager: %+v", err) - } - - } - - // Call the constructor - testPool, err := newHostPool(params, rng, testNdf, manager, - testStorage, addGwChan) - if err != nil { - t.Fatalf("Failed to create mock host pool: %v", err) - } - - // Add all gateways to hostPool's map - for i := uint32(0); i < params.PoolSize; i++ { - gw := testNdf.Gateways[i] - gwId, err := id.Unmarshal(gw.ID) - if err != nil { - t.Fatalf("Failed to unmarshal ID in mock NDF: %+v", err) - } - - err = testPool.replaceHost(gwId, i) - if err != nil { - t.Fatalf("Failed to replace host in set-up: %+v", err) - } - } - - oldGatewayIndex := 0 - oldHost := testPool.hostList[oldGatewayIndex] - - // Force the replacement of the gateway at a given index - err = testPool.replaceHost(testPool.selectGateway(), uint32(oldGatewayIndex)) - if err != nil { - t.Errorf("Failed to force replace: %v", err) - } - - // Ensure that old gateway has been removed from the map - if _, ok := testPool.hostMap[*oldHost.GetId()]; ok { - t.Errorf("Expected old host to be removed from map") - } - - // Ensure we are disconnected from the old host - if isConnected, _ := oldHost.Connected(); isConnected { - t.Errorf("Failed to disconnect from old host %s", oldHost) - } - -} - // Unit test. 
-func TestHostPool_checkReplace(t *testing.T) { +func TestHostPool_UpdateNdf(t *testing.T) { manager := newMockManager() rng := fastRNG.NewStreamGenerator(1, 1, csprng.NewSystemRNG) testNdf := getTestNdf(t) testStorage := storage.InitTestingSession(t) - addGwChan := make(chan network.NodeGateway) - - // Construct custom params + addGwChan := make(chan network.NodeGateway, len(testNdf.Gateways)) params := DefaultPoolParams() - params.MaxPoolSize = uint32(len(testNdf.Gateways)) - 5 - - // Pull all gateways from NDF into host manager - for _, gw := range testNdf.Gateways { - - gwId, err := id.Unmarshal(gw.ID) - if err != nil { - t.Errorf("Failed to unmarshal ID in mock NDF: %+v", err) - } - - // Add mock gateway to manager - _, err = manager.AddHost( - gwId, gw.Address, nil, connect.GetDefaultHostParams()) - if err != nil { - t.Fatalf("Could not add mock host to manager: %+v", err) - } - } + params.MaxPoolSize = uint32(len(testNdf.Gateways)) - // Call the constructor - testPool, err := newHostPool( - params, rng, testNdf, manager, testStorage, addGwChan) - if err != nil { - t.Fatalf("Failed to create mock host pool: %+v", err) + addedIDs := []*id.ID{ + id.NewIdFromString("testID0", id.Gateway, t), + id.NewIdFromString("testID1", id.Gateway, t), + id.NewIdFromString("testID2", id.Gateway, t), + id.NewIdFromString("testID3", id.Gateway, t), } - - // Call check replace - oldGatewayIndex := 0 - oldHost := testPool.hostList[oldGatewayIndex] - expectedError := errors.Errorf(errorsList[0]) - wasReplaced, err := testPool.checkReplace(oldHost.GetId(), expectedError) + err := saveHostList(testStorage.GetKV().Prefix(hostListPrefix), addedIDs) if err != nil { - t.Errorf("Failed to check replace: %+v", err) - } - if !wasReplaced { - t.Error("Expected to replace.") - } - - // Ensure that old gateway has been removed from the map - if _, ok := testPool.hostMap[*oldHost.GetId()]; ok { - t.Error("Expected old host to be removed from map.") + t.Fatalf("Failed to store host list: %+v", err) } - // Ensure we are disconnected from the old host - if isConnected, _ := oldHost.Connected(); isConnected { - t.Errorf("Failed to disconnect from old host %s.", oldHost) + for i, hid := range addedIDs { + testNdf.Gateways[i].ID = hid.Marshal() } - // Check that an error not in the global list results in a no-op - goodGatewayIndex := 0 - goodGateway := testPool.hostList[goodGatewayIndex] - unexpectedErr := errors.Errorf("not in global error list") - wasReplaced, err = testPool.checkReplace(oldHost.GetId(), unexpectedErr) + // Call the constructor + mccc := &mockCertCheckerComm{} + testPool, err := newHostPool(params, rng, testNdf, manager, testStorage, addGwChan, mccc) if err != nil { - t.Errorf("Failed to check replace: %+v", err) - } - if wasReplaced { - t.Errorf("Expected not to replace") - } - - // Ensure that gateway with an unexpected error was not modified - if _, ok := testPool.hostMap[*goodGateway.GetId()]; !ok { - t.Errorf("Expected gateway with non-expected error to not be modified") - } - - // Ensure gateway host has not been disconnected - if isConnected, _ := oldHost.Connected(); isConnected { - t.Errorf("Should not disconnect from %s", oldHost) + t.Fatalf("Failed to create mock host pool: %v", err) } -} - -// Unit test. 
-func TestHostPool_UpdateNdf(t *testing.T) { - manager := newMockManager() - testNdf := getTestNdf(t) - newIndex := uint32(20) - - testStorage := storage.InitTestingSession(t) - - // Construct a manager (bypass business logic in constructor) - hostPool := &HostPool{ - manager: manager, - hostList: make([]*connect.Host, newIndex+1), - hostMap: make(map[id.ID]uint32), - ndf: testNdf, - kv: testStorage.GetKV().Prefix(hostListPrefix), - poolParams: DefaultPoolParams(), - filter: func(m map[id.ID]int, _ *ndf.NetworkDefinition) map[id.ID]int { - return m - }, - } + stop := stoppable.NewSingle("tester") + go testPool.runner(stop) + defer func() { + stop.Close() + }() // Construct a new Ndf different from original one above newNdf := getTestNdf(t) @@ -509,419 +225,13 @@ func TestHostPool_UpdateNdf(t *testing.T) { newNdf.Nodes = append(newNdf.Nodes, newNode) // Update pool with the new Ndf - hostPool.UpdateNdf(newNdf) - - // Check that the host pool's NDF has been modified properly - if len(newNdf.Nodes) != len(hostPool.ndf.Nodes) || - len(newNdf.Gateways) != len(hostPool.ndf.Gateways) { - t.Errorf("Host pool NDF not updated to new NDF.") - } -} - -// Full test -func TestHostPool_getPreferred(t *testing.T) { - manager := newMockManager() - rng := fastRNG.NewStreamGenerator(1, 1, csprng.NewSystemRNG) - testNdf := getTestNdf(t) - testStorage := storage.InitTestingSession(t) - addGwChan := make(chan network.NodeGateway) - params := DefaultPoolParams() - params.PoolSize = uint32(len(testNdf.Gateways)) - - // Pull all gateways from NDF into host manager - hostMap := make(map[id.ID]bool, 0) - targets := make([]*id.ID, 0) - for _, gw := range testNdf.Gateways { - - gwId, err := id.Unmarshal(gw.ID) - if err != nil { - t.Fatalf("Failed to unmarshal ID in mock NDF: %+v", err) - } - // Add mock gateway to manager - _, err = manager.AddHost( - gwId, gw.Address, nil, connect.GetDefaultHostParams()) - if err != nil { - t.Fatalf("Could not add mock host to manager: %+v", err) - } - - hostMap[*gwId] = true - targets = append(targets, gwId) - - } - - // Call the constructor - testPool, err := newHostPool(params, rng, testNdf, manager, - testStorage, addGwChan) - if err != nil { - t.Fatalf("Failed to create mock host pool: %v", err) - } - - retrievedList := testPool.getPreferred(targets) - if len(retrievedList) != len(targets) { - t.Errorf("Requested list did not output requested length."+ - "\n\tExpected: %d"+ - "\n\tReceived: %v", len(targets), len(retrievedList)) - } - - // In case where all requested gateways are present - // ensure requested hosts were returned - for _, h := range retrievedList { - if !hostMap[*h.GetId()] { - t.Errorf("A target gateways which should have been returned was not."+ - "\n\tExpected: %v", h.GetId()) - } - } - - // Replace a request with a gateway not in pool - targets[3] = id.NewIdFromUInt(74, id.Gateway, t) - retrievedList = testPool.getPreferred(targets) - if len(retrievedList) != len(targets) { - t.Errorf("Requested list did not output requested length."+ - "\n\tExpected: %d"+ - "\n\tReceived: %v", len(targets), len(retrievedList)) - } - - // In case where a requested gateway is not present - for _, h := range retrievedList { - if h.GetId().Cmp(targets[3]) { - t.Errorf("Should not have returned ID not in pool") - } - } - -} - -// Unit test. 
-func TestHostPool_getAny(t *testing.T) { - manager := newMockManager() - rng := fastRNG.NewStreamGenerator(1, 1, csprng.NewSystemRNG) - testNdf := getTestNdf(t) - testStorage := storage.InitTestingSession(t) - addGwChan := make(chan network.NodeGateway) - params := DefaultPoolParams() - params.MaxPoolSize = uint32(len(testNdf.Gateways)) - - // Pull all gateways from NDF into host manager - for _, gw := range testNdf.Gateways { - - gwId, err := id.Unmarshal(gw.ID) - if err != nil { - t.Fatalf("Failed to unmarshal ID in mock NDF: %+v", err) - } - // Add mock gateway to manager - _, err = manager.AddHost( - gwId, gw.Address, nil, connect.GetDefaultHostParams()) - if err != nil { - t.Fatalf("Could not add mock host to manager: %+v", err) - } - - } - - // Call the constructor - testPool, err := newHostPool(params, rng, testNdf, manager, - testStorage, addGwChan) - if err != nil { - t.Fatalf("Failed to create mock host pool: %v", err) - } - - requested := 3 - anyList := testPool.getAny(uint32(requested), nil) - if len(anyList) != requested { - t.Errorf("GetAnyList did not get requested length."+ - "\n\tExpected: %v"+ - "\n\tReceived: %v", requested, len(anyList)) - } - - for _, h := range anyList { - _, ok := manager.GetHost(h.GetId()) - if !ok { - t.Errorf("Host %s in retrieved list not in manager", h) - } - } - - // Request more than are in host list - largeRequest := uint32(requested * 1000) - largeRetrieved := testPool.getAny(largeRequest, nil) - if len(largeRetrieved) != len(testPool.hostList) { - t.Errorf("Large request should result in a list of all in host list") - } - -} - -// Unit test -func TestHostPool_forceAdd(t *testing.T) { - manager := newMockManager() - rng := fastRNG.NewStreamGenerator(1, 1, csprng.NewSystemRNG) - testNdf := getTestNdf(t) - testStorage := storage.InitTestingSession(t) - addGwChan := make(chan network.NodeGateway) - params := DefaultPoolParams() - params.PoolSize = uint32(len(testNdf.Gateways)) - - // Pull all gateways from NDF into host manager - for _, gw := range testNdf.Gateways { - - gwId, err := id.Unmarshal(gw.ID) - if err != nil { - t.Fatalf("Failed to unmarshal ID in mock NDF: %+v", err) - } - // Add mock gateway to manager - _, err = manager.AddHost( - gwId, gw.Address, nil, connect.GetDefaultHostParams()) - if err != nil { - t.Fatalf("Could not add mock host to manager: %+v", err) - } - - } - - // Call the constructor - testPool, err := newHostPool(params, rng, testNdf, manager, - testStorage, addGwChan) - if err != nil { - t.Fatalf("Failed to create mock host pool: %v", err) - } - - // Construct a new gateway to add - gwId := id.NewIdFromUInt(uint64(100), id.Gateway, t) - // Add mock gateway to manager - _, err = manager.AddHost(gwId, "", nil, connect.GetDefaultHostParams()) - if err != nil { - t.Fatalf("Could not add mock host to manager: %v", err) - } - - // forceAdd gateway - err = testPool.forceAdd(gwId) - if err != nil { - t.Errorf("Could not add gateways: %v", err) - } - - // check that gateways have been added to the map - if _, ok := testPool.hostMap[*gwId]; !ok { - t.Errorf("Failed to forcefully add new gateway ID: %v", gwId) - } -} - -// Unit test which only adds information to NDF. 
-func TestHostPool_updateConns_AddGateways(t *testing.T) { - manager := newMockManager() - rng := fastRNG.NewStreamGenerator(1, 1, csprng.NewSystemRNG) - testNdf := getTestNdf(t) - testStorage := storage.InitTestingSession(t) - addGwChan := make(chan network.NodeGateway) - params := DefaultPoolParams() - params.MaxPoolSize = uint32(len(testNdf.Gateways)) - - // Pull all gateways from NDF into host manager - for _, gw := range testNdf.Gateways { - - gwId, err := id.Unmarshal(gw.ID) - if err != nil { - t.Fatalf("Failed to unmarshal ID in mock NDF: %+v", err) - } - // Add mock gateway to manager - _, err = manager.AddHost( - gwId, gw.Address, nil, connect.GetDefaultHostParams()) - if err != nil { - t.Fatalf("Could not add mock host to manager: %v", err) - } - - } - - // Call the constructor - testPool, err := newHostPool(params, rng, testNdf, manager, - testStorage, addGwChan) - if err != nil { - t.Fatalf("Failed to create mock host pool: %v", err) - } - - // Construct a list of new gateways/nodes to add to NDF - newGatewayLen := 10 - newGateways := make([]ndf.Gateway, newGatewayLen) - newNodes := make([]ndf.Node, newGatewayLen) - for i := 0; i < newGatewayLen; i++ { - // Construct gateways - gwId := id.NewIdFromUInt(uint64(100+i), id.Gateway, t) - newGateways[i] = ndf.Gateway{ID: gwId.Bytes()} - // Construct nodes - nodeId := gwId.DeepCopy() - nodeId.SetType(id.Node) - newNodes[i] = ndf.Node{ID: nodeId.Bytes()} - - } - - // Update the NDF - newNdf := getTestNdf(t) - newNdf.Gateways = append(newNdf.Gateways, newGateways...) - newNdf.Nodes = append(newNdf.Nodes, newNodes...) - testPool.UpdateNdf(newNdf) - // Update the connections - err = testPool.updateConns() - if err != nil { - t.Errorf("Failed to update connections: %v", err) - } - - // Check that new gateways are in manager - for _, ndfGw := range newGateways { - gwId, err := id.Unmarshal(ndfGw.ID) - if err != nil { - t.Errorf("Failed to marshal gateway ID for %v", ndfGw) - } - _, ok := testPool.getSpecific(gwId) - if !ok { - t.Errorf("Failed to find gateway %v in manager", gwId) - } - } - -} - -// Unit test which only adds information to NDF. 
-func TestHostPool_updateConns_RemoveGateways(t *testing.T) { - manager := newMockManager() - rng := fastRNG.NewStreamGenerator(1, 1, csprng.NewSystemRNG) - testNdf := getTestNdf(t) - testStorage := storage.InitTestingSession(t) - addGwChan := make(chan network.NodeGateway) - params := DefaultPoolParams() - params.MaxPoolSize = uint32(len(testNdf.Gateways)) - - // Pull all gateways from NDF into host manager - for _, gw := range testNdf.Gateways { - - gwId, err := id.Unmarshal(gw.ID) - if err != nil { - t.Errorf("Failed to unmarshal ID in mock NDF: %+v", err) - } - // Add mock gateway to manager - _, err = manager.AddHost( - gwId, gw.Address, nil, connect.GetDefaultHostParams()) - if err != nil { - t.Fatalf("Could not add mock host to manager: %+v", err) - } - - } - - // Call the constructor - testPool, err := newHostPool(params, rng, testNdf, manager, - testStorage, addGwChan) - if err != nil { - t.Fatalf("Failed to create mock host pool: %v", err) - } + time.Sleep(1 * time.Second) - // Construct a list of new gateways/nodes to add to NDF - newGatewayLen := len(testNdf.Gateways) - newGateways := make([]ndf.Gateway, newGatewayLen) - newNodes := make([]ndf.Node, newGatewayLen) - for i := 0; i < newGatewayLen; i++ { - // Construct gateways - gwId := id.NewIdFromUInt(uint64(100+i), id.Gateway, t) - newGateways[i] = ndf.Gateway{ID: gwId.Bytes()} - // Construct nodes - nodeId := gwId.DeepCopy() - nodeId.SetType(id.Node) - newNodes[i] = ndf.Node{ID: nodeId.Bytes(), Status: ndf.Active} - - } - - // Update the NDF, replacing old data entirely - newNdf := getTestNdf(t) - newNdf.Gateways = newGateways - newNdf.Nodes = newNodes - - testPool.UpdateNdf(newNdf) - - // Update the connections - err = testPool.updateConns() - if err != nil { - t.Errorf("Failed to update connections: %v", err) - } - - // Check that old gateways are not in pool - for _, ndfGw := range testNdf.Gateways { - gwId, err := id.Unmarshal(ndfGw.ID) - if err != nil { - t.Fatalf("Failed to marshal gateway ID for %v", ndfGw) - } - if _, ok := testPool.hostMap[*gwId]; ok { - t.Errorf("Expected gateway %v to be removed from pool", gwId) - } - } -} - -// Unit test. 
-func TestHostPool_addGateway(t *testing.T) { - manager := newMockManager() - testNdf := getTestNdf(t) - newIndex := uint32(20) - params := DefaultPoolParams() - params.MaxPoolSize = uint32(len(testNdf.Gateways)) - - testStorage := storage.InitTestingSession(t) - - // Construct a manager (bypass business logic in constructor) - hostPool := &HostPool{ - manager: manager, - hostList: make([]*connect.Host, newIndex+1), - hostMap: make(map[id.ID]uint32), - ndf: testNdf, - poolParams: params, - addGatewayChan: make(chan network.NodeGateway), - kv: testStorage.GetKV().Prefix(hostListPrefix), - } - - ndfIndex := 0 - - gwId, err := id.Unmarshal(testNdf.Gateways[ndfIndex].ID) - if err != nil { - t.Errorf("Failed to unmarshal ID in mock NDF: %+v", err) - } - - hostPool.addGateway(gwId, ndfIndex) - - _, ok := manager.GetHost(gwId) - if !ok { - t.Errorf("Unsuccessfully added host to manager") - } -} - -// Unit test -func TestHostPool_removeGateway(t *testing.T) { - manager := newMockManager() - testNdf := getTestNdf(t) - newIndex := uint32(20) - params := DefaultPoolParams() - params.MaxPoolSize = uint32(len(testNdf.Gateways)) - - // Construct a manager (bypass business logic in constructor) - hostPool := &HostPool{ - manager: manager, - hostList: make([]*connect.Host, newIndex+1), - hostMap: make(map[id.ID]uint32), - ndf: testNdf, - poolParams: params, - addGatewayChan: make(chan network.NodeGateway), - rng: fastRNG.NewStreamGenerator(1, 1, csprng.NewSystemRNG), - } - - ndfIndex := 0 - - gwId, err := id.Unmarshal(testNdf.Gateways[ndfIndex].ID) - if err != nil { - t.Errorf("Failed to unmarshal ID in mock NDF: %+v", err) - } - - // Manually add host information - hostPool.addGateway(gwId, ndfIndex) - - // Call the removal - hostPool.removeGateway(gwId) - - // Check that the map and list have been updated - if hostPool.hostList[ndfIndex] != nil { - t.Errorf("Host list index was not set to nil after removal") - } - - if _, ok := hostPool.hostMap[*gwId]; ok { - t.Errorf("Host map did not delete host entry") + // Check that the host pool's NDF has been modified properly + if len(newNdf.Nodes) != len(testPool.ndf.Nodes) || + len(newNdf.Gateways) != len(testPool.ndf.Gateways) { + t.Errorf("Host pool NDF not updated to new NDF.") } } diff --git a/cmix/gateway/nodeTester.go b/cmix/gateway/nodeTester.go new file mode 100644 index 0000000000000000000000000000000000000000..1e601c6bf8790bc807fcfbbcf30631e13b63d0c0 --- /dev/null +++ b/cmix/gateway/nodeTester.go @@ -0,0 +1,107 @@ +//////////////////////////////////////////////////////////////////////////////// +// Copyright © 2022 xx foundation // +// // +// Use of this source code is governed by a license that can be found in the // +// LICENSE file. // +//////////////////////////////////////////////////////////////////////////////// + +package gateway + +import ( + jww "github.com/spf13/jwalterweatherman" + "gitlab.com/elixxir/client/v4/stoppable" + "gitlab.com/xx_network/comms/connect" + "sync" + "time" +) + +// connectivityFailure is a constant value indicating that the node being +// processed in hostPool.nodeTester has failed its connectivity test. +const connectivityFailure = 0 + +// nodeTester is a long-running thread that tests the connectivity of nodes +// that may be added to the hostPool. 
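Editorial aside, not part of the patch: the heart of nodeTester below is a fan-out/fan-in connectivity probe. The same pattern in isolation, as a minimal sketch; pickFastest is a hypothetical helper, connect.Host.IsOnline and the connectivityFailure sentinel are taken from this file, and the sync and time imports are assumed.

// pickFastest probes all candidates concurrently and keeps the lowest
// non-zero latency; zero marks a failed probe (connectivityFailure above).
func pickFastest(hosts []*connect.Host) *connect.Host {
	results := make([]time.Duration, len(hosts))
	var wg sync.WaitGroup
	for i := range hosts {
		wg.Add(1)
		go func(idx int) {
			defer wg.Done()
			if latency, ok := hosts[idx].IsOnline(); ok {
				results[idx] = latency
			}
		}(i)
	}
	wg.Wait()

	var best *connect.Host
	lowest := time.Hour // sentinel upper bound, as in the real loop below
	for i, d := range results {
		if d != connectivityFailure && d < lowest {
			lowest, best = d, hosts[i]
		}
	}
	return best
}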
+func (hp *hostPool) nodeTester(stop *stoppable.Single) { + + for { + select { + case <-stop.Quit(): + stop.ToStopped() + return + case queryList := <-hp.testNodes: + jww.DEBUG.Printf("[NodeTester] Received queryList of nodes to test: %v", queryList) + // Test all nodes, find the best + resultList := make([]time.Duration, len(queryList)) + + wg := sync.WaitGroup{} + for i := 0; i < len(queryList); i++ { + wg.Add(1) + go func(hostToQuery *connect.Host, index int) { + latency, pinged := hostToQuery.IsOnline() + if !pinged { + latency = connectivityFailure + } + resultList[index] = latency + wg.Done() + }(queryList[i], i) + } + + // Wait until all tests complete + wg.Wait() + + // Find the fastest one which is not 0 (designated as failure) + lowestLatency := time.Hour + var bestHost *connect.Host + for i := 0; i < len(queryList); i++ { + if resultList[i] != connectivityFailure && resultList[i] < lowestLatency { + lowestLatency = resultList[i] + bestHost = queryList[i] + } + } + + if bestHost != nil { + // Connect to the host then send it over to be added to the + // host pool + err := bestHost.Connect() + if err == nil { + select { + case hp.newHost <- bestHost: + default: + jww.ERROR.Printf("failed to send best host to main thread, " + + "will be dropped, new addRequest to be sent") + bestHost = nil + } + } else { + jww.WARN.Printf("Failed to connect to bestHost %s with error %+v, will be dropped", bestHost.GetId(), err) + bestHost = nil + } + + } + + // Send the tested nodes back to be labeled as available again + select { + case hp.doneTesting <- queryList: + jww.DEBUG.Printf("[NodeTester] Completed testing query list %s", queryList) + default: + jww.ERROR.Printf("Failed to send queryList to main thread, " + + "nodes are stuck in testing, this should never happen") + bestHost = nil + } + + if bestHost == nil { + jww.WARN.Printf("No host selected, restarting the request process") + // If none of the hosts could be contacted, send a signal + // to add a new node to the pool + select { + case hp.addRequest <- nil: + default: + jww.WARN.Printf("Failed to send a signal to add hosts after " + + "testing failure") + } + } + + } + + } + +} diff --git a/cmix/gateway/params.go b/cmix/gateway/params.go new file mode 100644 index 0000000000000000000000000000000000000000..b5d4fed177791f56bd0cef11de74050a63b3ac40 --- /dev/null +++ b/cmix/gateway/params.go @@ -0,0 +1,120 @@ +//////////////////////////////////////////////////////////////////////////////// +// Copyright © 2022 xx foundation // +// // +// Use of this source code is governed by a license that can be found in the // +// LICENSE file. // +//////////////////////////////////////////////////////////////////////////////// + +package gateway + +import ( + "encoding/json" + "gitlab.com/xx_network/comms/connect" + "time" +) + +// Params allows configuration of HostPool parameters. +type Params struct { + // MaxPoolSize is the maximum number of Hosts in the HostPool. + MaxPoolSize uint32 + + // PoolSize allows override of HostPool size. Set to zero for dynamic size + // calculation. + PoolSize uint32 + + // ProxyAttempts dictates how many proxies will be used in event of send + // failure. + ProxyAttempts uint32 + + // MaxPings is the number of gateways to concurrently test when selecting + // a new member of HostPool. Must be at least 1. 
+	MaxPings uint32
+
+	// NumConnectionsWorkers is the number of workers connecting to gateways.
+	NumConnectionsWorkers int
+
+	// MinBufferLength is the minimum length of input buffers
+	// to the hostPool runner.
+	MinBufferLength uint32
+
+	// EnableRotation enables the system that automatically rotates
+	// gateways on a regular basis. This system disables itself
+	// if the network size is less than 20.
+	EnableRotation bool
+
+	// RotationPeriod is how long to wait before a single
+	// host is rotated.
+	RotationPeriod time.Duration
+
+	// RotationPeriodVariability is the maximum amount by which the
+	// rotation period can randomly deviate from the stated period.
+	RotationPeriodVariability time.Duration
+
+	// HostParams is the parameters for the creation of new Host objects.
+	HostParams connect.HostParams
+}
+
+// DefaultParams returns a default set of Params.
+func DefaultParams() Params {
+	p := Params{
+		MaxPoolSize:               MaxPoolSize,
+		ProxyAttempts:             5,
+		PoolSize:                  0,
+		MaxPings:                  5,
+		NumConnectionsWorkers:     5,
+		MinBufferLength:           100,
+		EnableRotation:            true,
+		RotationPeriod:            7 * time.Minute,
+		RotationPeriodVariability: 4 * time.Minute,
+
+		HostParams: GetDefaultHostPoolHostParams(),
+	}
+
+	return p
+}
+
+// DefaultPoolParams is a deprecated alias of DefaultParams;
+// it does the same thing, just under a different function name.
+// Use DefaultParams instead.
+func DefaultPoolParams() Params {
+	return DefaultParams()
+}
+
+// GetParameters returns the default Params, overridden with the given
+// JSON-encoded parameters, if any are set.
+func GetParameters(params string) (Params, error) {
+	p := DefaultParams()
+	if len(params) > 0 {
+		err := json.Unmarshal([]byte(params), &p)
+		if err != nil {
+			return Params{}, err
+		}
+	}
+	return p, nil
+}
+
+// MarshalJSON adheres to the json.Marshaler interface. A local alias type is
+// used so that json.Marshal does not recurse back into this method.
+func (pp *Params) MarshalJSON() ([]byte, error) {
+	type paramsAlias Params
+	return json.Marshal((*paramsAlias)(pp))
+}
+
+// UnmarshalJSON adheres to the json.Unmarshaler interface. The same alias
+// trick prevents infinite recursion through json.Unmarshal.
+func (pp *Params) UnmarshalJSON(data []byte) error {
+	type paramsAlias Params
+	return json.Unmarshal(data, (*paramsAlias)(pp))
+}
+
+// GetDefaultHostPoolHostParams returns the default parameters used for
+// hosts in the host pool.
+func GetDefaultHostPoolHostParams() connect.HostParams {
+	hp := connect.GetDefaultHostParams()
+	hp.MaxRetries = 1
+	hp.MaxSendRetries = 1
+	hp.AuthEnabled = false
+	hp.EnableCoolOff = false
+	hp.NumSendsBeforeCoolOff = 1
+	hp.CoolOffTimeout = 5 * time.Minute
+	hp.SendTimeout = 1000 * time.Millisecond
+	hp.PingTimeout = 1000 * time.Millisecond
+	hp.DisableAutoConnect = true
+	return hp
+}
diff --git a/cmix/gateway/pool.go b/cmix/gateway/pool.go
new file mode 100644
index 0000000000000000000000000000000000000000..72f4bc55b8dee0717f5a1e4ca36ca3862fda25e2
--- /dev/null
+++ b/cmix/gateway/pool.go
@@ -0,0 +1,319 @@
+////////////////////////////////////////////////////////////////////////////////
+// Copyright © 2022 xx foundation                                             //
+//                                                                            //
+// Use of this source code is governed by a license that can be found in the //
+// LICENSE file.                                                              //
+////////////////////////////////////////////////////////////////////////////////
+
+package gateway
+
+import (
+	"github.com/pkg/errors"
+	jww "github.com/spf13/jwalterweatherman"
+	"gitlab.com/xx_network/comms/connect"
+	"gitlab.com/xx_network/crypto/csprng"
+	"gitlab.com/xx_network/crypto/randomness"
+	"gitlab.com/xx_network/primitives/id"
+	"io"
+	"strings"
+)
+
+var errHostPoolNotReady = "Host pool is not ready, wait a " +
+	"little then try again. If this persists, you may have connectivity issues"
+
+func IsHostPoolNotReadyError(err error) bool {
+	return strings.Contains(err.Error(), errHostPoolNotReady)
+}
+
+type Pool interface {
+	Get(id *id.ID) (*connect.Host, bool)
+	Has(id *id.ID) bool
+	Size() int
+	IsReady() error
+	GetAny(length uint32, excluded []*id.ID, rng io.Reader) []*connect.Host
+	GetSpecific(target *id.ID) (*connect.Host, bool)
+	GetPreferred(targets []*id.ID, rng io.Reader) []*connect.Host
+}
+
+type pool struct {
+	hostMap     map[id.ID]uint  // Map key to its index in the slice
+	hostList    []*connect.Host // Each index in the slice contains the value
+	isConnected func(host *connect.Host) bool
+}
+
+// newPool creates a pool of size "size"
+func newPool(size int) *pool {
+	return &pool{
+		hostMap:  make(map[id.ID]uint, size),
+		hostList: make([]*connect.Host, 0, size),
+		isConnected: func(host *connect.Host) bool {
+			c, _ := host.Connected()
+			return c
+		},
+	}
+}
+
+// Get returns a specific member of the host pool if it exists.
+func (p *pool) Get(id *id.ID) (*connect.Host, bool) {
+	idx, exists := p.hostMap[*id]
+	if !exists {
+		return nil, false
+	}
+	return p.hostList[idx], true
+}
+
+// Has returns true if the id is a member of the host pool.
+func (p *pool) Has(id *id.ID) bool {
+	_, exists := p.hostMap[*id]
+	return exists
+}
+
+// Size returns the number of currently connected and usable members
+// of the host pool.
+func (p *pool) Size() int {
+	size := 0
+	for i := 0; i < len(p.hostList); i++ {
+		if p.isConnected(p.hostList[i]) {
+			size++
+		}
+	}
+	return size
+}
+
+// IsReady returns nil if there is at least one connected member of the
+// host pool, and an error otherwise.
+func (p *pool) IsReady() error {
+	jww.TRACE.Printf("[IsReady] Length of Host List %d", len(p.hostList))
+	for i := 0; i < len(p.hostList); i++ {
+		if p.isConnected(p.hostList[i]) {
+			return nil
+		}
+	}
+	return errors.New(errHostPoolNotReady)
+}
+
+// GetAny returns up to length members of the host pool at random, excluding
+// any that are on the excluded list. If fewer members than requested are
+// available, it returns the smaller set. It will not return any
+// disconnected hosts.
+func (p *pool) GetAny(length uint32, excluded []*id.ID, rng io.Reader) []*connect.Host {
+
+	poolLen := uint32(len(p.hostList))
+	if length > poolLen {
+		length = poolLen
+	}
+
+	// Keep track of Hosts already selected to avoid duplicates
+	checked := make(map[uint32]interface{}, len(p.hostList))
+	if excluded != nil {
+		// Add excluded Hosts to already-checked list
+		for i := range excluded {
+			gwId := excluded[i]
+			if idx, ok := p.hostMap[*gwId]; ok {
+				checked[uint32(idx)] = nil
+			}
+		}
+	}
+
+	result := make([]*connect.Host, 0, length)
+	for i := uint32(0); i < length; {
+		// If we've checked the entire HostPool, bail
+		if uint32(len(checked)) >= poolLen {
+			break
+		}
+
+		// Check the next HostPool index
+		gwIdx := randomness.ReadRangeUint32(0, poolLen,
+			rng)
+
+		if _, ok := checked[gwIdx]; !ok {
+			h := p.hostList[gwIdx]
+
+			checked[gwIdx] = nil
+			if !p.isConnected(h) {
+				continue
+			}
+
+			result = append(result, p.hostList[gwIdx])
+			i++
+		}
+	}
+
+	return result
+}
+
+// GetSpecific obtains a specific connect.Host from the pool if it exists;
+// otherwise it returns nil and false.
+// It will not return the host if it is in the pool but disconnected +func (p *pool) GetSpecific(target *id.ID) (*connect.Host, bool) { + if idx, exists := p.hostMap[*target]; exists { + h := p.hostList[idx] + if !p.isConnected(h) { + return nil, false + } + return h, true + } + return nil, false +} + +// GetPreferred tries to obtain the given targets from the HostPool. If each is +// not present, then obtains a random replacement from the HostPool which will +// be proxied. +func (p *pool) GetPreferred(targets []*id.ID, rng io.Reader) []*connect.Host { + // Keep track of Hosts already selected to avoid duplicates + checked := make(map[id.ID]struct{}) + + //edge checks + numToReturn := len(targets) + poolLen := len(p.hostList) + if numToReturn > poolLen { + numToReturn = poolLen + } + result := make([]*connect.Host, 0, numToReturn) + + //check if any targets are in the pool + numSelected := 0 + for _, target := range targets { + if targeted, inPool := p.GetSpecific(target); inPool { + result = append(result, targeted) + checked[*target] = struct{}{} + numSelected++ + } + } + + //fill the rest of the list with random proxies until full + for numSelected < numToReturn && len(checked) < len(p.hostList) { + + gwIdx := randomness.ReadRangeUint32(0, uint32(len(p.hostList)), + rng) + selected := p.hostList[gwIdx] + //check if it is already in the list, if not Add it + gwID := selected.GetId() + if _, ok := checked[*gwID]; !ok { + checked[*gwID] = struct{}{} + if !p.isConnected(selected) { + continue + } + result = append(result, selected) + numSelected++ + } + } + + return result +} + +// addOrReplace adds the given host if the pool is not full, or replaces a +// random one if the pool is full. If a host was replaced, it returns it, so +// it can be cleaned up. +func (p *pool) addOrReplace(rng io.Reader, host *connect.Host) *connect.Host { + // if the pool is not full, append to the end + if len(p.hostList) < cap(p.hostList) { + jww.TRACE.Printf("[AddOrReplace] Adding host %s to host list", host.GetId()) + p.hostList = append(p.hostList, host) + p.hostMap[*host.GetId()] = uint(len(p.hostList) - 1) + return nil + } else { + jww.TRACE.Printf("[AddOrReplace] Internally replacing...") + selectedIndex := uint(randomness.ReadRangeUint32(0, uint32(len(p.hostList)), rng)) + return p.internalReplace(selectedIndex, host) + } +} + +// replaceSpecific will replace a specific gateway with the given ID with +// the given host. +func (p *pool) replaceSpecific(toReplace *id.ID, + host *connect.Host) (*connect.Host, error) { + selectedIndex, exists := p.hostMap[*toReplace] + if !exists { + return nil, errors.Errorf("Cannot replace %s, host does not "+ + "exist in pool", toReplace) + } + return p.internalReplace(selectedIndex, host), nil +} + +// internalReplace places the given host into the hostList and the hostMap. +// This will replace the data from the given index. +func (p *pool) internalReplace(selectedIndex uint, host *connect.Host) *connect.Host { + toRemove := p.hostList[selectedIndex] + p.hostList[selectedIndex] = host + delete(p.hostMap, *toRemove.GetId()) + p.hostMap[*host.GetId()] = selectedIndex + return toRemove +} + +// deepCopy returns a deep copy of the internal state of pool. 
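Editorial aside on why deepCopy exists: runner.go below mutates writePool on a single thread and then publishes an immutable snapshot for readers, so readers never need a lock. A minimal sketch of that handoff, under the assumption (inferred from the hp.readPool.Store and getPool usage in this patch) that readPool behaves like an atomic value holding a *pool; both method names here are hypothetical.

// publishSnapshot is the write side: copy, then atomically swap in.
func (hp *hostPool) publishSnapshot() {
	snapshot := hp.writePool.deepCopy() // only the runner thread mutates writePool
	hp.readPool.Store(snapshot)         // readers now see the new immutable copy
}

// loadSnapshot is the read side, roughly what sender's getPool amounts to.
func (hp *hostPool) loadSnapshot() Pool {
	return hp.readPool.Load().(*pool)
}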
+func (p *pool) deepCopy() *pool { + pCopy := &pool{ + hostMap: make(map[id.ID]uint, len(p.hostMap)), + hostList: make([]*connect.Host, len(p.hostList)), + isConnected: p.isConnected, + } + + copy(pCopy.hostList, p.hostList) + + for key, data := range p.hostMap { + pCopy.hostMap[key] = data + } + + return pCopy +} + +// selectNew will pull random nodes from the pool. +func (p *pool) selectNew(rng csprng.Source, allNodes map[id.ID]int, + currentlyAddingNodes map[id.ID]struct{}, numToSelect int) ([]*id.ID, + map[id.ID]struct{}, error) { + + newList := make(map[id.ID]interface{}) + + // Copy all nodes while removing nodes from the host list and + // from the processing list + for nid := range allNodes { + _, inPool := p.hostMap[nid] + _, inAdd := currentlyAddingNodes[nid] + if !(inPool || inAdd) { + newList[nid] = struct{}{} + } + } + + // Error out if no nodes are left + if len(newList) == 0 { + return nil, nil, errors.New("no nodes available for selection") + } + + if numToSelect > len(newList) { + // Return all nodes + selections := make([]*id.ID, 0, len(newList)) + for gwID := range newList { + localGwid := gwID.DeepCopy() + selections = append(selections, localGwid) + currentlyAddingNodes[*localGwid] = struct{}{} + jww.DEBUG.Printf("[SelectNew] Adding gwId %s to inProgress", localGwid) + } + return selections, currentlyAddingNodes, nil + } + + // Randomly select numToSelect indices + toSelectMap := make(map[uint]struct{}, numToSelect) + for i := 0; i < numToSelect; i++ { + newSelection := uint(randomness.ReadRangeUint32(0, uint32(len(newList)), rng)) + if _, exists := toSelectMap[newSelection]; exists { + i-- + continue + } + toSelectMap[newSelection] = struct{}{} + } + + // Use the random indices to choose gateways + selections := make([]*id.ID, 0, numToSelect) + // Select the new ones + index := uint(0) + for gwID := range newList { + localGwid := gwID.DeepCopy() + if _, exists := toSelectMap[index]; exists { + selections = append(selections, localGwid) + currentlyAddingNodes[*localGwid] = struct{}{} + if len(selections) == cap(selections) { + break + } + } + index++ + } + + return selections, currentlyAddingNodes, nil +} diff --git a/cmix/gateway/pool_test.go b/cmix/gateway/pool_test.go new file mode 100644 index 0000000000000000000000000000000000000000..9a899bef69706796b5ac211cb123648eb3545c48 --- /dev/null +++ b/cmix/gateway/pool_test.go @@ -0,0 +1,216 @@ +//////////////////////////////////////////////////////////////////////////////// +// Copyright © 2022 xx foundation // +// // +// Use of this source code is governed by a license that can be found in the // +// LICENSE file. // +//////////////////////////////////////////////////////////////////////////////// + +package gateway + +import ( + "gitlab.com/xx_network/comms/connect" + "gitlab.com/xx_network/primitives/id" + "math/rand" + "testing" +) + +// Unit test, happy paths of GetAny. 
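One more editorial aside before the pool tests: the collision re-roll inside selectNew above is equivalent to this standalone sampler. sampleDistinct is illustrative only; randomness.ReadRangeUint32 and the io import are the ones pool.go already uses.

// sampleDistinct draws numToSelect distinct indices in [0, n), re-rolling
// collisions just as selectNew's i-- retry does. Callers must guarantee
// numToSelect <= n (selectNew checks this first), or the loop never ends.
func sampleDistinct(rng io.Reader, n, numToSelect int) map[uint]struct{} {
	picked := make(map[uint]struct{}, numToSelect)
	for len(picked) < numToSelect {
		idx := uint(randomness.ReadRangeUint32(0, uint32(n), rng))
		picked[idx] = struct{}{}
	}
	return picked
}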
+func TestHostPool_GetAny(t *testing.T) {
+	manager := newMockManager()
+	rng := rand.New(rand.NewSource(42))
+	testNdf := getTestNdf(t)
+	params := DefaultParams()
+	params.MaxPoolSize = uint32(len(testNdf.Gateways))
+
+	// Call the constructor
+	testPool := newPool(5)
+
+	// Pull all gateways from NDF into host manager
+	for _, gw := range testNdf.Gateways {
+
+		gwId, err := id.Unmarshal(gw.ID)
+		if err != nil {
+			t.Fatalf("Failed to unmarshal ID in mock NDF: %+v", err)
+		}
+		// Add mock gateway to manager
+		var h *connect.Host
+		h, err = manager.AddHost(
+			gwId, gw.Address, nil, connect.GetDefaultHostParams())
+		if err != nil {
+			t.Fatalf("Could not add mock host to manager: %+v", err)
+		}
+
+		// Add to the host pool
+		testPool.addOrReplace(rng, h)
+
+	}
+
+	testPool.isConnected = func(host *connect.Host) bool { return true }
+
+	requested := 3
+	anyList := testPool.GetAny(uint32(requested), nil, rng)
+	if len(anyList) != requested {
+		t.Errorf("GetAnyList did not get requested length."+
+			"\n\tExpected: %v"+
+			"\n\tReceived: %v", requested, len(anyList))
+	}
+
+	for _, h := range anyList {
+		_, ok := manager.GetHost(h.GetId())
+		if !ok {
+			t.Errorf("Host %s in retrieved list not in manager", h)
+		}
+	}
+
+	// Request more than are in host list
+	largeRequest := uint32(requested * 1000)
+	largeRetrieved := testPool.GetAny(largeRequest, nil, rng)
+	if len(largeRetrieved) != len(testPool.hostList) {
+		t.Errorf("Large request should result in a list of all in host list")
+	}
+
+	// Request the whole host pool with a member excluded
+	excluded := []*id.ID{testPool.hostList[2].GetId()}
+	requestedExcluded := uint32(len(testPool.hostList))
+	excludedRetrieved := testPool.GetAny(requestedExcluded, excluded, rng)
+
+	if len(excludedRetrieved) != int(requestedExcluded-1) {
+		t.Errorf("One member should not have been returned due to being excluded")
+	}
+
+	for i := 0; i < len(excludedRetrieved); i++ {
+		if excludedRetrieved[i].GetId().Cmp(excluded[0]) {
+			t.Errorf("index %d of the returned list includes the excluded id %s", i, excluded[0])
+		}
+	}
+}
+
+// Unit test, happy paths of GetSpecific.
+func TestHostPool_GetSpecific(t *testing.T) {
+	manager := newMockManager()
+	rng := rand.New(rand.NewSource(42))
+	testNdf := getTestNdf(t)
+	params := DefaultParams()
+	params.MaxPoolSize = uint32(len(testNdf.Gateways))
+
+	// Call the constructor
+	poolLen := 5
+	testPool := newPool(poolLen)
+
+	testPool.isConnected = func(host *connect.Host) bool { return true }
+
+	// Pull all gateways from NDF into host manager
+	for i, gw := range testNdf.Gateways {
+
+		gwId, err := id.Unmarshal(gw.ID)
+		if err != nil {
+			t.Fatalf("Failed to unmarshal ID in mock NDF: %+v", err)
+		}
+		// Add mock gateway to manager
+		var h *connect.Host
+		h, err = manager.AddHost(
+			gwId, gw.Address, nil, connect.GetDefaultHostParams())
+		if err != nil {
+			t.Fatalf("Could not add mock host to manager: %+v", err)
+		}
+
+		// Add to the host pool
+		if i < poolLen {
+			testPool.addOrReplace(rng, h)
+		}
+	}
+
+	// Test that GetSpecific returns a member of the host pool
+	toGet := testPool.hostList[0].GetId()
+	h, exists := testPool.GetSpecific(toGet)
+	if !exists {
+		t.Errorf("Failed to get member of host pool that should "+
+			"be there, got: %s", h)
+	}
+	if h == nil || !h.GetId().Cmp(toGet) {
+		t.Errorf("Wrong or invalid host returned")
+	}
+
+	// Test that GetSpecific returns nothing when the item is not in the
+	// host pool
+	toGet, _ = testNdf.Gateways[poolLen+1].GetGatewayId()
+	h, exists = testPool.GetSpecific(toGet)
+	if exists || h != nil {
+		t.Errorf("Got a member of host pool that should not be there")
+	}
+
+}
+
+// Full test
+func TestHostPool_GetPreferred(t *testing.T) {
+	manager := newMockManager()
+	rng := rand.New(rand.NewSource(42))
+	testNdf := getTestNdf(t)
+	params := DefaultParams()
+	params.PoolSize = uint32(len(testNdf.Gateways))
+
+	poolLen := 12
+	testPool := newPool(poolLen)
+
+	testPool.isConnected = func(host *connect.Host) bool { return true }
+
+	// Pull all gateways from NDF into host manager
+	hostMap := make(map[id.ID]bool, 0)
+	targets := make([]*id.ID, 0)
+	for i, gw := range testNdf.Gateways {
+
+		gwId, err := id.Unmarshal(gw.ID)
+		if err != nil {
+			t.Fatalf("Failed to unmarshal ID in mock NDF: %+v", err)
+		}
+		// Add mock gateway to manager
+		h, err := manager.AddHost(
+			gwId, gw.Address, nil, connect.GetDefaultHostParams())
+		if err != nil {
+			t.Fatalf("Could not add mock host to manager: %+v", err)
+		}
+
+		hostMap[*gwId] = true
+		targets = append(targets, gwId)
+
+		// Add to the host pool
+		if i < poolLen {
+			testPool.addOrReplace(rng, h)
+		}
+
+	}
+
+	retrievedList := testPool.GetPreferred(targets, rng)
+	if len(retrievedList) != len(targets) {
+		t.Errorf("Requested list did not output requested length."+
+			"\n\tExpected: %d"+
+			"\n\tReceived: %v", len(targets), len(retrievedList))
+	}
+
+	// In case where all requested gateways are present,
+	// ensure requested hosts were returned
+	for _, h := range retrievedList {
+		if !hostMap[*h.GetId()] {
+			t.Errorf("A target gateway which should have been returned was not."+
+				"\n\tExpected: %v", h.GetId())
+		}
+	}
+
+	// Replace a request with a gateway not in pool
+	targets[3] = id.NewIdFromUInt(74, id.Gateway, t)
+	retrievedList = testPool.GetPreferred(targets, rng)
+	if len(retrievedList) != len(targets) {
+		t.Errorf("Requested list did not output requested length."+
+			"\n\tExpected: %d"+
+			"\n\tReceived: %v", len(targets), len(retrievedList))
+	}
+
+	// In case where a requested gateway is not present
+	for _, h := range retrievedList {
+		if h.GetId().Cmp(targets[3]) {
+
t.Errorf("Should not have returned ID not in pool") + } + } + +} diff --git a/cmix/gateway/rotation.go b/cmix/gateway/rotation.go new file mode 100644 index 0000000000000000000000000000000000000000..ada22d7d3b7beb0a39ae8eb92449659e69e931ae --- /dev/null +++ b/cmix/gateway/rotation.go @@ -0,0 +1,58 @@ +//////////////////////////////////////////////////////////////////////////////// +// Copyright © 2022 xx foundation // +// // +// Use of this source code is governed by a license that can be found in the // +// LICENSE file. // +//////////////////////////////////////////////////////////////////////////////// + +package gateway + +import ( + jww "github.com/spf13/jwalterweatherman" + "gitlab.com/elixxir/client/v4/stoppable" + "gitlab.com/elixxir/crypto/hash" + "gitlab.com/xx_network/crypto/randomness" + "math/big" + "time" +) + +// Rotation is a long-running thread which will cause the runner thread +// to shuffle the hostPool. This will be done over a random interval, with +// the interval being randomly selected every Rotation. The Rotation +// is performed by sending a nil signal to hostPool.addRequest, which +// will force random updates to the hostPool. +func (hp *hostPool) Rotation(stop *stoppable.Single) { + for { + + delay := hp.params.RotationPeriod + if hp.params.RotationPeriodVariability != 0 { + stream := hp.rng.GetStream() + + seed := make([]byte, 32) + _, err := stream.Read(seed) + if err != nil { + jww.FATAL.Panicf("Failed to read (rng): %+v", err) + } + h, _ := hash.NewCMixHash() + r := randomness.RandInInterval(big.NewInt(int64(hp.params.RotationPeriodVariability)), seed, h).Int64() + r = r - (r / 2) + + delay = delay + time.Duration(r) + } + + t := time.NewTimer(delay) + + select { + case <-stop.Quit(): + stop.ToStopped() + t.Stop() + return + case <-t.C: + select { + case hp.addRequest <- nil: + default: + jww.WARN.Printf("Failed to send an Add request after %s delay", delay) + } + } + } +} diff --git a/cmix/gateway/runner.go b/cmix/gateway/runner.go new file mode 100644 index 0000000000000000000000000000000000000000..83fc5ec07f5ec92cf1eb0a625fae66e4017e3cc2 --- /dev/null +++ b/cmix/gateway/runner.go @@ -0,0 +1,300 @@ +//////////////////////////////////////////////////////////////////////////////// +// Copyright © 2022 xx foundation // +// // +// Use of this source code is governed by a license that can be found in the // +// LICENSE file. // +//////////////////////////////////////////////////////////////////////////////// + +package gateway + +import ( + jww "github.com/spf13/jwalterweatherman" + "gitlab.com/elixxir/client/v4/stoppable" + "gitlab.com/elixxir/comms/network" + "gitlab.com/xx_network/comms/connect" + "gitlab.com/xx_network/primitives/id" + "gitlab.com/xx_network/primitives/ndf" + "time" +) + +// runner is the primary long-running thread for handling events. It will +// handle the following signals: +// - Requests to add hosts to the hostPool. +// - Requests to remove hosts from the hostPool. +// - Indications that a host has been tested (see nodeTester). +// - Indications that a new host that is ready to be added to the hostPool. +// - Indications that a new NDF has been received. 
+func (hp *hostPool) runner(stop *stoppable.Single) {
+
+	inProgress := make(map[id.ID]struct{})
+	toRemoveList := make(map[id.ID]interface{}, 2*cap(hp.writePool.hostList))
+	online := newBucket(cap(hp.writePool.hostList))
+
+	for {
+		update := false
+	input:
+		select {
+		case <-stop.Quit():
+			stop.ToStopped()
+			return
+		// Receives a request to add a node to the host pool. If a specific
+		// node is sent, it is sent off to testing. If no specific node is
+		// sent (i.e., nil is received), random candidates are selected.
+		case toAdd := <-hp.addRequest:
+
+			var hostList []*connect.Host
+			hostList, inProgress = hp.processAddRequest(toAdd, inProgress)
+			if len(hostList) == 0 {
+				jww.ERROR.Printf("Host list for testing is empty, this " +
+					"error should never occur")
+				break input
+			}
+
+			// Send the candidate hosts off for connectivity testing
+			select {
+			case hp.testNodes <- hostList:
+			default:
+				jww.ERROR.Printf("Failed to send add message")
+			}
+		// Handle requests to remove a node from the host pool
+		case toRemove := <-hp.removeRequest:
+
+			// If the host is already slated to be removed, ignore
+			if _, exists := toRemoveList[*toRemove]; exists {
+				break input
+			}
+
+			// Do not remove if it is not present in the pool
+			if !hp.writePool.Has(toRemove) {
+				jww.DEBUG.Printf("Skipping remove request for %s,"+
+					" not in the host pool", toRemove)
+				break input
+			}
+
+			// Add to the leaky bucket used to detect whether we are offline
+			online.Add()
+
+			// Add to the "to remove" list. This will replace that
+			// node on the next addition to the pool
+			toRemoveList[*toRemove] = struct{}{}
+
+			// Send a signal back to this thread to add a node to the pool
+			go func() {
+				hp.addRequest <- nil
+			}()
+
+		// Internal signal on reception of vetted node to add to pool
+		case newHost := <-hp.newHost:
+			// Verify the new host is still in the NDF; because testing is
+			// asynchronous, it can have been removed in the meantime
+			if _, exists := hp.ndfMap[*newHost.GetId()]; !exists {
+				jww.WARN.Printf("New vetted host (%s) is not in NDF, "+
+					"this is theoretically possible but extremely unlikely. "+
+					"If this is seen more than once, it is likely something "+
+					"is wrong", newHost.GetId())
+				// Send a signal back to this thread to add a node to the pool
+				go func() {
+					hp.addRequest <- nil
+				}()
+				break input
+			}
+
+			// A host was successfully vetted, so reset the offline-detection
+			// bucket
+			online.Reset()
+
+			// Replace a node slated for replacement if required; pop it from
+			// the to-remove list
+			toRemove := pop(toRemoveList)
+			if toRemove != nil {
+				// If this fails, handle the new host without removing a node
+				if oldHost, err := hp.writePool.replaceSpecific(toRemove, newHost); err == nil {
+					update = true
+					if oldHost != nil {
+						go func() {
+							oldHost.Disconnect()
+						}()
+					}
+				} else {
+					jww.WARN.Printf("Failed to replace %s due to %s, skipping "+
+						"addition to host pool", toRemove, err)
+				}
+			} else {
+				stream := hp.rng.GetStream()
+				hp.writePool.addOrReplace(stream, newHost)
+				stream.Close()
+
+				update = true
+			}
+		// Tested gateways get passed back, so they can be
+		// removed from the list of gateways which are being
+		// tested
+		case tested := <-hp.doneTesting:
+			for _, h := range tested {
+				delete(inProgress, *h.GetId())
+				jww.DEBUG.Printf("[Runner] Deleted %s from inProgress", h.GetId())
+			}
+		// New NDF updates come in over this channel
+		case newNDF := <-hp.newNdf:
+			hp.ndf = newNDF.DeepCopy()
+
+			// Process the new NDF map
+			newNDFMap := hp.processNdf(hp.ndf)
+
+			// Remove any gateways that are missing from the new NDF
+			// but are still in the host pool
+			for gwID := range hp.ndfMap {
+				if hp.writePool.Has(&gwID) {
+					hp.removeRequest <- gwID.DeepCopy()
+				}
+			}
+
+			// Replace the ndfMap
+			hp.ndfMap = newNDFMap
+
+		}
+
+		// Handle updates by writing host pool into storage
+		if update {
+			poolCopy := hp.writePool.deepCopy()
+			hp.readPool.Store(poolCopy)
+
+			saveList := make([]*id.ID, 0, len(poolCopy.hostList))
+			for i := 0; i < len(poolCopy.hostList); i++ {
+				saveList = append(saveList, poolCopy.hostList[i].GetId())
+			}
+
+			err := saveHostList(hp.kv, saveList)
+			if err != nil {
+				jww.WARN.Printf("Host list could not be stored, updates will "+
+					"not be available on load: %s", err)
+			}
+		}
+
+		// Wait the delay until next iteration.
+		delay := online.GetDelay()
+		select {
+		case <-time.After(delay):
+		case <-stop.Quit():
+			stop.ToStopped()
+			return
+		}
+
+	}
+
+}
+
+// processAddRequest returns the host of the passed-in node if one is
+// specified (i.e., it is not nil). If it is nil, it selects random nodes
+// for testing.
+func (hp *hostPool) processAddRequest(toAdd *id.ID,
+	inProgress map[id.ID]struct{}) ([]*connect.Host, map[id.ID]struct{}) {
+	// Get the nodes to add
+	var toTest []*id.ID
+
+	// Add the given ID if it is in the NDF
+	if toAdd != nil {
+		// Check if it is in the NDF
+		if _, exist := hp.ndfMap[*toAdd]; exist {
+			toTest = []*id.ID{toAdd}
+		}
+	}
+
+	// If there are no nodes to add, randomly select some
+	if len(toTest) == 0 {
+		var err error
+		stream := hp.rng.GetStream()
+		toTest, inProgress, err = hp.writePool.selectNew(stream, hp.ndfMap, inProgress,
+			hp.numNodesToTest)
+		stream.Close()
+		if err != nil {
+			jww.DEBUG.Printf("[ProcessAddRequest] SelectNew returned error: %s", err)
+			jww.WARN.Printf("Failed to select any nodes to test for adding, " +
+				"skipping add. 
This error may be the result of being disconnected " + + "from the internet or very old network credentials") + return nil, inProgress + } + } + + // Get hosts for the selected nodes + hostList := make([]*connect.Host, 0, len(toTest)) + for i := 0; i < len(toTest); i++ { + gwID := toTest[i] + h, exists := hp.manager.GetHost(gwID) + if !exists { + jww.FATAL.Panicf("Gateway is not in host pool, this should" + + "be impossible") + } + hostList = append(hostList, h) + } + return hostList, inProgress +} + +// processNdf is a helper function which processes a new NDF, converting it to +// a map which maps the gateway's ID to the index it is in the NDF. This map is +// returned, and may be set as hostPool.ndfMap's new value. +func (hp *hostPool) processNdf(newNdf *ndf.NetworkDefinition) map[id.ID]int { + newNDFMap := make(map[id.ID]int, len(hp.ndf.Gateways)) + + // Make a list of all gateways + for i := 0; i < len(newNdf.Gateways); i++ { + gw := newNdf.Gateways[i] + + // Get the ID and bail if it cannot be retrieved + gwID, err := gw.GetGatewayId() + if err != nil { + jww.WARN.Printf("Skipped gateway %d: %x, "+ + "ID couldn't be unmarshalled, %+v", i, + newNdf.Gateways[i].ID, err) + continue + } + + // Skip adding if the node is not active + if newNdf.Nodes[i].Status != ndf.Active { + continue + } + + // Check if the ID exists, if it does not add its host + if _, exists := hp.manager.GetHost(gwID); !exists { + var gwAddr string + var cert []byte + gwAddr, cert, err = getConnectionInfo(gwID, gw.Address, gw.TlsCertificate) + if err == nil { + _, err = hp.manager.AddHost(gwID, gwAddr, + cert, hp.params.HostParams) + } + + if err != nil { + jww.WARN.Printf("Skipped gateway %d: %s, "+ + "host could not be added, %+v", i, + gwID, err) + continue + } + + hp.addChan <- network.NodeGateway{ + Node: newNdf.Nodes[i], + Gateway: gw, + } + + } + + // Add to the new map + newNDFMap[*gwID] = i + + // Delete from the old ndf map so we can track which gateways are + // missing + delete(hp.ndfMap, *gwID) + } + + return newNDFMap +} + +// pop selects an element from the map that tends to be an earlier insert, +// removes it, and returns it +func pop(m map[id.ID]interface{}) *id.ID { + for tr := range m { + delete(m, tr) + return &tr + } + return nil +} diff --git a/cmix/gateway/sender.go b/cmix/gateway/sender.go index b511151babdcd27c7ce19cd0a27578ffdb3f128f..e6c4fc3cd74d08a4ad901215f658be9b277cfba7 100644 --- a/cmix/gateway/sender.go +++ b/cmix/gateway/sender.go @@ -14,13 +14,14 @@ import ( jww "github.com/spf13/jwalterweatherman" "gitlab.com/elixxir/client/v4/stoppable" "gitlab.com/elixxir/client/v4/storage" - "gitlab.com/elixxir/comms/network" + commNetwork "gitlab.com/elixxir/comms/network" "gitlab.com/elixxir/crypto/fastRNG" "gitlab.com/xx_network/comms/connect" "gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/ndf" "gitlab.com/xx_network/primitives/netTime" "strings" + "testing" "time" ) @@ -34,60 +35,79 @@ type Sender interface { UpdateNdf(ndf *ndf.NetworkDefinition) SetGatewayFilter(f Filter) GetHostParams() connect.HostParams + StartProcesses() stoppable.Stoppable } type sender struct { - *HostPool + *hostPool } const RetryableError = "Nonfatal error occurred, please retry" -// NewSender Create a new Sender object wrapping a HostPool object -func NewSender(poolParams PoolParams, rng *fastRNG.StreamGenerator, - ndf *ndf.NetworkDefinition, getter HostManager, storage storage.Session, - addGateway chan network.NodeGateway) (Sender, error) { +// NewSender creates a new Sender object 
wrapping a HostPool object +func NewSender(poolParams Params, rng *fastRNG.StreamGenerator, + ndf *ndf.NetworkDefinition, getter HostManager, + storage storage.Session, comms CertCheckerCommInterface, + addChan chan commNetwork.NodeGateway) (Sender, error) { - hostPool, err := newHostPool( - poolParams, rng, ndf, getter, storage, addGateway) + hp, err := newHostPool(poolParams, rng, ndf, + getter, storage, addChan, comms) if err != nil { return nil, err } - return &sender{hostPool}, nil + return &sender{hp}, nil } -// SendToAny call given sendFunc to any Host in the HostPool, attempting with up -// to numProxies destinations. +// NewSender creates a new Sender object wrapping a HostPool object +func NewTestingSender(poolParams Params, rng *fastRNG.StreamGenerator, + ndf *ndf.NetworkDefinition, getter HostManager, + storage storage.Session, addChan chan commNetwork.NodeGateway, + t *testing.T) (Sender, error) { + + if t == nil { + jww.FATAL.Panicf("can only be called in testing") + } + + hp, err := newTestingHostPool(poolParams, rng, ndf, + getter, storage, addChan, nil, t) + if err != nil { + return nil, err + } + + return &sender{hp}, nil +} + +// SendToAny will call the given send function to any connect.Host in the host pool. func (s *sender) SendToAny(sendFunc func(*connect.Host) (interface{}, error), stop *stoppable.Single) (interface{}, error) { - proxies := s.getAny(s.poolParams.ProxyAttempts, nil) + p := s.getPool() + + if err := p.IsReady(); err != nil { + return nil, errors.WithMessagef(err, "Failed to SendToAny") + } + + rng := s.rng.GetStream() + + proxies := p.GetAny(s.params.ProxyAttempts, nil, rng) + rng.Close() for proxy := range proxies { - result, err := sendFunc(proxies[proxy]) + proxyHost := proxies[proxy] + result, err := sendFunc(proxyHost) if stop != nil && !stop.IsRunning() { return nil, errors.Errorf(stoppable.ErrMsg, stop.Name(), "SendToAny") } else if err == nil { return result, nil } else { - // Now we must check whether the Host should be replaced - replaced, checkReplaceErr := s.checkReplace( - proxies[proxy].GetId(), err) - if replaced { - jww.WARN.Printf("Unable to SendToAny, replaced a proxy %s "+ - "with error %s", proxies[proxy].GetId(), err.Error()) - } else { - if checkReplaceErr != nil { - jww.WARN.Printf("Unable to SendToAny via %s: %s. "+ - "Unable to replace host: %+v", - proxies[proxy].GetId(), err.Error(), checkReplaceErr) - } else { - jww.WARN.Printf("Unable to SendToAny via %s: %s. "+ - "Did not replace host.", - proxies[proxy].GetId(), err.Error()) - } + // send a signal to remove from the host pool if it is a not + // allowed error + if IsGuilty(err) { + s.Remove(proxyHost) } - // End for non-retryable errors + // If the send function denotes the error are recoverable, + // try another host if !strings.Contains(err.Error(), RetryableError) { return nil, errors.WithMessage(err, "Received error with SendToAny") @@ -102,16 +122,24 @@ func (s *sender) SendToAny(sendFunc func(*connect.Host) (interface{}, error), type SendToPreferredFunc func(host *connect.Host, target *id.ID, timeout time.Duration) (interface{}, error) -// SendToPreferred Call given sendFunc to any Host in the HostPool, attempting -// with up to numProxies destinations. Returns an error if the timeout is -// reached. +// SendToPreferred calls the given send function to any connect.Host in the +// host pool, attempting. Returns an error if the timeout is reached. 
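Editorial aside on the retry contract this function shares with SendToAny: a sendFunc marks a failure as recoverable by embedding RetryableError in the error text, while errors judged by IsGuilty evict the offending host. A usage sketch follows; doRequest stands in for any comm call and is hypothetical, while Sender, RetryableError, and the errors package are the ones this file already uses.

func sendWithRetryContract(s Sender,
	doRequest func(*connect.Host) (interface{}, error)) (interface{}, error) {
	return s.SendToAny(func(host *connect.Host) (interface{}, error) {
		resp, err := doRequest(host)
		if err != nil {
			// Embedding RetryableError tells the sender to move on to the
			// next proxy instead of aborting the whole send.
			return nil, errors.Errorf("%s: %v", RetryableError, err)
		}
		return resp, nil
	}, nil)
}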
func (s *sender) SendToPreferred(targets []*id.ID, sendFunc SendToPreferredFunc, stop *stoppable.Single, timeout time.Duration) (interface{}, error) { startTime := netTime.Now() + p := s.getPool() + + if err := p.IsReady(); err != nil { + return nil, errors.WithMessagef(err, "Failed to SendToPreferred") + } + + rng := s.rng.GetStream() + defer rng.Close() + // get the hosts and shuffle randomly - targetHosts := s.getPreferred(targets) + targetHosts := p.GetPreferred(targets, rng) // Attempt to send directly to targets if they are in the HostPool for i := range targetHosts { @@ -129,36 +157,27 @@ func (s *sender) SendToPreferred(targets []*id.ID, sendFunc SendToPreferredFunc, } else if err == nil { return result, nil } else { - // Now we must check whether the Host should be replaced - replaced, checkReplaceErr := s.checkReplace(targetHosts[i].GetId(), err) - if replaced { - jww.WARN.Printf("Unable to SendToPreferred first pass via %s, "+ - "replaced a proxy %s with error %s", - targets[i], targetHosts[i].GetId(), err.Error()) - } else { - if checkReplaceErr != nil { - jww.WARN.Printf("Unable to SendToPreferred first pass %s "+ - "via %s: %s. Unable to replace host: %+v", targets[i], - targetHosts[i].GetId(), err.Error(), checkReplaceErr) - } else { - jww.WARN.Printf("Unable to SendToPreferred first pass %s "+ - "via %s: %s. Did not replace host.", - targets[i], targetHosts[i].GetId(), err.Error()) - } + // send a signal to remove from the host pool if it is a not + // allowed error + if IsGuilty(err) { + s.Remove(targetHosts[i]) } - // End for non-retryable errors + // If the send function denotes the error are recoverable, + // try another host if !strings.Contains(err.Error(), RetryableError) { - return nil, errors.WithMessage( - err, "Received error with SendToPreferred") + return nil, + errors.WithMessage(err, "Received error with SendToAny") } } } + //re-get the pool in case it has had an update + p = s.getPool() // Build a list of proxies for every target proxies := make([][]*connect.Host, len(targets)) for i := 0; i < len(targets); i++ { - proxies[i] = s.getAny(s.poolParams.ProxyAttempts, targets) + proxies[i] = p.GetAny(s.params.ProxyAttempts, targets, rng) } // Build a map of bad proxies @@ -166,7 +185,7 @@ func (s *sender) SendToPreferred(targets []*id.ID, sendFunc SendToPreferredFunc, // Iterate between each target's list of proxies, using the next target for // each proxy - for proxyIdx := uint32(0); proxyIdx < s.poolParams.ProxyAttempts; proxyIdx++ { + for proxyIdx := uint32(0); proxyIdx < s.params.ProxyAttempts; proxyIdx++ { for targetIdx := range proxies { // Return an error if the timeout duration is reached if netTime.Since(startTime) > timeout { @@ -203,23 +222,17 @@ func (s *sender) SendToPreferred(targets []*id.ID, sendFunc SendToPreferredFunc, target, proxy, err) continue } else { - // Check whether the Host should be replaced - replaced, checkReplaceErr := s.checkReplace(proxy.GetId(), err) - badProxies[proxy.String()] = nil - if replaced { - jww.WARN.Printf("Unable to SendToPreferred second pass "+ - "via %s, replaced a proxy %s with error %s", - target, proxy.GetId(), err.Error()) - } else { - if checkReplaceErr != nil { - jww.WARN.Printf("Unable to SendToPreferred second "+ - "pass %s via %s: %s. Unable to replace host: %+v", - target, proxy.GetId(), err.Error(), checkReplaceErr) - } else { - jww.WARN.Printf("Unable to SendToPreferred second "+ - "pass %s via %s: %s. 
Did not replace host.", - target, proxy.GetId(), err.Error()) - } + // send a signal to remove from the host pool if it is a not + // allowed error + if IsGuilty(err) { + s.Remove(proxy) + } + + // If the send function denotes the error are recoverable, + // try another host + if !strings.Contains(err.Error(), RetryableError) { + return nil, + errors.WithMessage(err, "Received error with SendToAny") } // End for non-retryable errors diff --git a/cmix/gateway/sender_test.go b/cmix/gateway/sender_test.go index 5b4ad651c30d99cc348ce24a084d905935046daa..dcef08a2636ffb41f6e1d8991d0fbd1f143d7125 100644 --- a/cmix/gateway/sender_test.go +++ b/cmix/gateway/sender_test.go @@ -25,11 +25,11 @@ func TestNewSender(t *testing.T) { rng := fastRNG.NewStreamGenerator(1, 1, csprng.NewSystemRNG) testNdf := getTestNdf(t) testStorage := storage.InitTestingSession(t) - addGwChan := make(chan network.NodeGateway) - params := DefaultPoolParams() + params := DefaultParams() params.MaxPoolSize = uint32(len(testNdf.Gateways)) - - _, err := NewSender(params, rng, testNdf, manager, testStorage, addGwChan) + addChan := make(chan network.NodeGateway, len(testNdf.Gateways)) + mccc := &mockCertCheckerComm{} + _, err := NewSender(params, rng, testNdf, manager, testStorage, mccc, addChan) if err != nil { t.Fatalf("Failed to create mock sender: %v", err) } @@ -41,50 +41,47 @@ func TestSender_SendToAny(t *testing.T) { rng := fastRNG.NewStreamGenerator(1, 1, csprng.NewSystemRNG) testNdf := getTestNdf(t) testStorage := storage.InitTestingSession(t) - addGwChan := make(chan network.NodeGateway) - params := DefaultPoolParams() + params := DefaultParams() params.PoolSize = uint32(len(testNdf.Gateways)) - - // Pull all gateways from NDF into host manager - for _, gw := range testNdf.Gateways { - - gwId, err := id.Unmarshal(gw.ID) - if err != nil { - t.Fatalf("Failed to unmarshal ID in mock NDF: %+v", err) - } - // Add mock gateway to manager - _, err = manager.AddHost( - gwId, gw.Address, nil, connect.GetDefaultHostParams()) - if err != nil { - t.Fatalf("Could not add mock host to manager: %+v", err) - } - - } + addChan := make(chan network.NodeGateway, len(testNdf.Gateways)) + mccc := &mockCertCheckerComm{} senderFace, err := NewSender( - params, rng, testNdf, manager, testStorage, addGwChan) + params, rng, testNdf, manager, testStorage, mccc, addChan) s := senderFace.(*sender) if err != nil { t.Fatalf("Failed to create mock sender: %v", err) } - // Add all gateways to hostPool's map - for index, gw := range testNdf.Gateways { + stream := rng.GetStream() + defer stream.Close() + + // Put 3 gateways into the pool + for i := 0; i < cap(s.writePool.hostList); i++ { + gw := testNdf.Gateways[i] gwId, err := id.Unmarshal(gw.ID) if err != nil { t.Fatalf("Failed to unmarshal ID in mock NDF: %+v", err) } - - err = s.replaceHost(gwId, uint32(index)) + // Add mock gateway to manager + gwHost, err := manager.AddHost( + gwId, gw.Address, nil, connect.GetDefaultHostParams()) if err != nil { - t.Fatalf("Failed to replace host in set-up: %+v", err) + t.Fatalf("Could not Add mock host to manager: %+v", err) } + + s.writePool.addOrReplace(stream, gwHost) } + s.writePool.isConnected = func(host *connect.Host) bool { return true } + + //update the read pool + s.readPool.Store(s.writePool) + // Test sendToAny with test interfaces result, err := s.SendToAny(SendToAnyHappyPath, nil) if err != nil { - t.Errorf("Should not error in SendToAny happy path: %v", err) + t.Errorf("Should not error in SendToAny happy path: %+v", err) } if 
!reflect.DeepEqual(result, happyPathReturn) { @@ -111,38 +108,50 @@ func TestSender_SendToPreferred(t *testing.T) { rng := fastRNG.NewStreamGenerator(1, 1, csprng.NewSystemRNG) testNdf := getTestNdf(t) testStorage := storage.InitTestingSession(t) - addGwChan := make(chan network.NodeGateway) - params := DefaultPoolParams() + params := DefaultParams() params.PoolSize = uint32(len(testNdf.Gateways)) - 5 // Do not test proxy attempts code in this test // (self contain to code specific in sendPreferred) params.ProxyAttempts = 0 + mccc := &mockCertCheckerComm{} + addChan := make(chan network.NodeGateway, len(testNdf.Gateways)) + sFace, err := NewSender(params, rng, testNdf, manager, testStorage, mccc, addChan) + if err != nil { + t.Fatalf("Failed to create mock sender: %v", err) + } + s := sFace.(*sender) + + stream := rng.GetStream() + defer stream.Close() - // Pull all gateways from NDF into host manager - for _, gw := range testNdf.Gateways { + var preferredHost *connect.Host + // Put 3 gateways into the pool + for i := 0; i < cap(s.writePool.hostList); i++ { + gw := testNdf.Gateways[i] gwId, err := id.Unmarshal(gw.ID) if err != nil { t.Fatalf("Failed to unmarshal ID in mock NDF: %+v", err) } // Add mock gateway to manager - _, err = manager.AddHost( + gwHost, err := manager.AddHost( gwId, gw.Address, nil, connect.GetDefaultHostParams()) if err != nil { - t.Fatalf("Could not add mock host to manager: %+v", err) + t.Fatalf("Could not Add mock host to manager: %+v", err) } - } + s.writePool.addOrReplace(stream, gwHost) - sFace, err := NewSender(params, rng, testNdf, manager, testStorage, addGwChan) - if err != nil { - t.Fatalf("Failed to create mock sender: %v", err) + if i == 1 { + preferredHost = gwHost + } } - s := sFace.(*sender) - preferredIndex := 0 - preferredHost := s.hostList[preferredIndex] + s.writePool.isConnected = func(host *connect.Host) bool { return true } + + //update the read pool + s.readPool.Store(s.writePool) // Happy path result, err := s.SendToPreferred([]*id.ID{preferredHost.GetId()}, @@ -164,20 +173,21 @@ func TestSender_SendToPreferred(t *testing.T) { t.Fatalf("Expected error path did not receive error") } - // Check the host has been replaced - if _, ok := s.hostMap[*preferredHost.GetId()]; ok { - t.Errorf("Expected host %s to be removed due to error", preferredHost) - } - - // Ensure we are disconnected from the old host - if isConnected, _ := preferredHost.Connected(); isConnected { - t.Errorf("ForceReplace error: Failed to disconnect from old host %s", + // Check the host removal signal has been sent + select { + case removeSignal := <-s.removeRequest: + if !removeSignal.Cmp(preferredHost.GetId()) { + t.Errorf("Expected host %s to be removed due "+ + "to error, instead %s removed", preferredHost, removeSignal) + } + default: + t.Errorf("Expected host %s error to trigger a removal signal", preferredHost) } // get a new host to test on - preferredIndex = 4 - preferredHost = s.hostList[preferredIndex] + preferredIndex := 4 + preferredHost = s.writePool.hostList[preferredIndex] // Unknown error return will not trigger replacement _, err = s.SendToPreferred([]*id.ID{preferredHost.GetId()}, @@ -187,7 +197,7 @@ func TestSender_SendToPreferred(t *testing.T) { } // Check the host has not been replaced - if _, ok := s.hostMap[*preferredHost.GetId()]; !ok { + if _, ok := s.writePool.hostMap[*preferredHost.GetId()]; !ok { t.Errorf("Host %s should not have been removed due on an unknown error", preferredHost) } diff --git a/cmix/gateway/storeHostList.go 
diff --git a/cmix/nodes/register.go b/cmix/nodes/register.go
index daad4c163d3be77890f735d49134b64b78cb5156..f172e316ea637042915bbe38deb568b718e0ea4c 100644
--- a/cmix/nodes/register.go
+++ b/cmix/nodes/register.go
@@ -35,7 +35,6 @@ func registerNodes(r *registrar, s session, stop *stoppable.Single,
 	inProgress, attempts *sync.Map, index int) {
 	atomic.AddInt64(r.numberRunning, 1)
-
 	for {
 		select {
 		case <-r.pauser:
@@ -79,15 +78,6 @@ func registerNodes(r *registrar, s session, stop *stoppable.Single,
 			continue
 		}
 
-		// Keep track of how many times registering with this node
-		// has been attempted
-		numAttempts := uint(1)
-		if nunAttemptsInterface, hasValue := attempts.LoadOrStore(
-			nidStr, numAttempts); hasValue {
-			numAttempts = nunAttemptsInterface.(uint)
-			attempts.Store(nidStr, numAttempts+1)
-		}
-
 		// No need to register with stale nodes
 		if isStale := gw.Node.Status == ndf.Stale; isStale {
 			jww.DEBUG.Printf(
@@ -104,16 +94,45 @@ func registerNodes(r *registrar, s session, stop *stoppable.Single,
 
 		// Process the result
 		if err != nil {
-			jww.ERROR.Printf("Failed to register node: %s", err.Error())
-			// If we have not reached the attempt limit for this gateway,
-			// then send it back into the channel to retry
-			if numAttempts < maxAttempts {
+			if gateway.IsHostPoolNotReadyError(err) {
+				jww.WARN.Printf("Failed to register node due to non ready host "+
+					"pool: %s", err.Error())
+
+				// Retry registering without counting it against the node
 				go func() {
-					// Delay the send operation for a backoff
-					time.Sleep(delayTable[numAttempts-1])
 					r.c <- gw
 				}()
+
+				// Wait 5 seconds to give the host pool a chance to resolve
+				select {
+				case <-time.NewTimer(5 * time.Second).C:
+				case <-stop.Quit():
+					stop.ToStopped()
+					return
+				}
+			} else {
+				jww.ERROR.Printf("Failed to register node: %s", err.Error())
+
+				// Keep track of how many times registering with this node
+				// has been attempted
+				numAttempts := uint(1)
+				if numAttemptsInterface, hasValue := attempts.LoadOrStore(
+					nidStr, numAttempts); hasValue {
+					numAttempts = numAttemptsInterface.(uint)
+					attempts.Store(nidStr, numAttempts+1)
+				}
+
+				// If we have not reached the attempt limit for this gateway,
+				// then send it back into the channel to retry
+				if numAttempts < maxAttempts {
+					go func() {
+						// Delay the send operation for a backoff
+						time.Sleep(delayTable[numAttempts-1])
+						r.c <- gw
+					}()
+				}
 			}
+		}
 		rng.Close()
 	}
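
The reworked retry logic above distinguishes a host pool that is not ready yet (retry freely, after a pause) from a genuine registration failure (count the attempt and back off). The sync.Map bookkeeping it uses is worth spelling out, since LoadOrStore both reads and initializes in one call, and the value observed on a given pass is the pre-increment count. A self-contained sketch of just that counter:

	package main

	import (
		"fmt"
		"sync"
	)

	func main() {
		var attempts sync.Map
		key := "node-A" // Hypothetical node ID string

		for i := 0; i < 3; i++ {
			// First pass: stores 1 and reports 1. Later passes: read the
			// stored count, report it, and store count+1 for next time.
			numAttempts := uint(1)
			if stored, hasValue := attempts.LoadOrStore(key, numAttempts); hasValue {
				numAttempts = stored.(uint)
				attempts.Store(key, numAttempts+1)
			}
			fmt.Println("attempt number:", numAttempts) // Prints 1, 1, 2
		}
	}
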
diff --git a/cmix/nodes/registrar_test.go b/cmix/nodes/registrar_test.go
index 9659428d648e0d67697192a17f46ab377dc4fbfb..942cba507e335b63f136aa157b6079856a328a6a 100644
--- a/cmix/nodes/registrar_test.go
+++ b/cmix/nodes/registrar_test.go
@@ -30,8 +30,11 @@ func TestLoadRegistrar_New(t *testing.T) {
 	rngGen := fastRNG.NewStreamGenerator(1, 1, csprng.NewSystemRNG)
 	p := gateway.DefaultPoolParams()
 	p.MaxPoolSize = 1
+	addChan := make(chan commNetwork.NodeGateway, 1)
+	mccc := &mockCertCheckerComm{}
+
 	sender, err := gateway.NewSender(gateway.DefaultPoolParams(), rngGen,
-		getNDF(), newMockManager(), session, nil)
+		getNDF(), newMockManager(), session, mccc, addChan)
 	if err != nil {
 		t.Fatalf("Failed to create new sender: %+v", err)
 	}
diff --git a/cmix/nodes/utils_test.go b/cmix/nodes/utils_test.go
index cf0a6488406cc7a918aea580432adaf1dc4f126d..68fb6ccde10ad75ac6a2b2ea4e52fe0a6d5b3122 100644
--- a/cmix/nodes/utils_test.go
+++ b/cmix/nodes/utils_test.go
@@ -92,8 +92,10 @@ func makeTestRegistrar(mockComms *MockClientComms, t *testing.T) *registrar {
 	rngGen := fastRNG.NewStreamGenerator(1, 1, csprng.NewSystemRNG)
 	p := gateway.DefaultPoolParams()
 	p.MaxPoolSize = 1
+	addChan := make(chan commNetwork.NodeGateway, 1)
+	mccc := &mockCertCheckerComm{}
 	sender, err := gateway.NewSender(gateway.DefaultPoolParams(), rngGen,
-		getNDF(), newMockManager(), session, nil)
+		getNDF(), newMockManager(), session, mccc, addChan)
 	if err != nil {
 		t.Fatalf("Failed to create new sender: %+v", err)
 	}
@@ -115,6 +117,11 @@ func makeTestRegistrar(mockComms *MockClientComms, t *testing.T) *registrar {
 
 type mockSender struct{}
 
+func (m mockSender) StartProcesses() stoppable.Stoppable {
+	//TODO implement me
+	panic("implement me")
+}
+
 func (m mockSender) SendToAny(
 	sendFunc func(host *connect.Host) (interface{}, error),
 	stop *stoppable.Single) (interface{}, error) {
diff --git a/cmix/pickup/certChecker_test.go b/cmix/pickup/certChecker_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..144f3e0e3c8b03cb123d6c8386e8495043f39934
--- /dev/null
+++ b/cmix/pickup/certChecker_test.go
@@ -0,0 +1,14 @@
+package pickup
+
+import (
+	pb "gitlab.com/elixxir/comms/mixmessages"
+	"gitlab.com/xx_network/comms/connect"
+)
+
+type mockCertCheckerComm struct {
+}
+
+func (mccc *mockCertCheckerComm) GetGatewayTLSCertificate(host *connect.Host,
+	message *pb.RequestGatewayCert) (*pb.GatewayCertificate, error) {
+	return &pb.GatewayCertificate{}, nil
+}
diff --git a/cmix/pickup/retrieve_test.go b/cmix/pickup/retrieve_test.go
index 04f92627fd39dd3db78a07e81ac065ce1fbc857b..e009bf770e7519382b29f4a3f938d786e0258e1a 100644
--- a/cmix/pickup/retrieve_test.go
+++ b/cmix/pickup/retrieve_test.go
@@ -14,6 +14,7 @@ import (
 	"gitlab.com/elixxir/client/v4/cmix/message"
 	"gitlab.com/elixxir/client/v4/cmix/rounds"
 	"gitlab.com/elixxir/client/v4/stoppable"
+	"gitlab.com/elixxir/comms/network"
 	"gitlab.com/elixxir/crypto/fastRNG"
 	"gitlab.com/xx_network/comms/connect"
 	"gitlab.com/xx_network/crypto/csprng"
@@ -43,8 +44,9 @@ func Test_manager_processMessageRetrieval(t *testing.T) {
 	p := gateway.DefaultPoolParams()
 	p.MaxPoolSize = 1
 	var err error
-	testManager.sender, err = gateway.NewSender(p, testManager.rng,
-		testNdf, mockComms, testManager.session, nil)
+	addChan := make(chan network.NodeGateway, 1)
+	testManager.sender, err = gateway.NewTestingSender(p, testManager.rng,
+		testNdf, mockComms, testManager.session, addChan, t)
 	if err != nil {
 		t.Errorf(err.Error())
 	}
@@ -133,10 +135,12 @@ func Test_manager_processMessageRetrieval_NoRound(t *testing.T) {
 	gwId.SetType(id.Gateway)
 	testNdf.Gateways = []ndf.Gateway{{ID: gwId.Marshal()}}
 	testManager.rng = fastRNG.NewStreamGenerator(1, 1, csprng.NewSystemRNG)
+	addChan := make(chan network.NodeGateway, 1)
+	mccc := &mockCertCheckerComm{}
 
 	testManager.sender, _ = gateway.NewSender(p, testManager.rng,
-		testNdf, mockComms, testManager.session, nil)
+		testNdf, mockComms, testManager.session, mccc, addChan)
 
 	stop := stoppable.NewSingle("singleStoppable")
 
 	// Create a local channel so reception is possible
@@ -210,9 +214,12 @@ func Test_manager_processMessageRetrieval_FalsePositive(t *testing.T) {
 	p := gateway.DefaultPoolParams()
 	p.MaxPoolSize = 1
+	addChan := make(chan network.NodeGateway, 1)
+	mccc := &mockCertCheckerComm{}
+
 	testManager.sender, _ = gateway.NewSender(p, testManager.rng,
-		testNdf, mockComms, testManager.session, nil)
+		testNdf, mockComms, testManager.session, mccc, addChan)
 
 	// Create a local channel so reception is possible
 	// (testManager.messageBundles is sent only via newManager call above)
@@ -354,8 +361,10 @@ func Test_manager_processMessageRetrieval_MultipleGateways(t *testing.T) {
 	p := gateway.DefaultPoolParams()
 	p.MaxPoolSize = 1
 
-	testManager.sender, _ = gateway.NewSender(
-		p, testManager.rng, testNdf, mockComms, testManager.session, nil)
+	addChan := make(chan network.NodeGateway, 1)
+
+	testManager.sender, _ = gateway.NewTestingSender(
+		p, testManager.rng, testNdf, mockComms, testManager.session, addChan, t)
 
 	// Create a local channel so reception is possible
 	// (testManager.messageBundles is sent only via newManager call above)
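
One detail shared by these test changes: the gateway-add channel is now always created with a nonzero capacity. The host pool reports newly added gateways on this channel, and the tests never drain it, so buffering is what keeps the first report from blocking the test. Illustratively:

	addChan := make(chan network.NodeGateway, 1) // Capacity 1: one send completes unreceived
	addChan <- network.NodeGateway{}             // Does not block; a second send would
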
diff --git a/cmix/pickup/unchecked_test.go b/cmix/pickup/unchecked_test.go
index 2650bee31f61756c6f6d225f613f1456ab37ace5..a897836dd8ed01655bed347cdda1f72503d6f98f 100644
--- a/cmix/pickup/unchecked_test.go
+++ b/cmix/pickup/unchecked_test.go
@@ -12,6 +12,7 @@ import (
 	"gitlab.com/elixxir/client/v4/cmix/message"
 	"gitlab.com/elixxir/client/v4/stoppable"
 	pb "gitlab.com/elixxir/comms/mixmessages"
+	"gitlab.com/elixxir/comms/network"
 	"gitlab.com/elixxir/crypto/fastRNG"
 	"gitlab.com/xx_network/comms/connect"
 	"gitlab.com/xx_network/crypto/csprng"
@@ -39,8 +40,9 @@ func TestUncheckedRoundScheduler(t *testing.T) {
 	p := gateway.DefaultPoolParams()
 	p.MaxPoolSize = 1
 	rngGen := fastRNG.NewStreamGenerator(1, 1, csprng.NewSystemRNG)
-	testManager.sender, _ = gateway.NewSender(
-		p, rngGen, testNdf, mockComms, testManager.session, nil)
+	addChan := make(chan network.NodeGateway, 1)
+	testManager.sender, _ = gateway.NewTestingSender(
+		p, rngGen, testNdf, mockComms, testManager.session, addChan, t)
 
 	// Create a local channel so reception is possible
 	// (testManager.messageBundles is sent only via newManager call above)
diff --git a/cmix/rounds/historical_test.go b/cmix/rounds/historical_test.go
index 3926f6e734881bda2901b005933bdf2876a7178b..0568cedbe862a355ffa50bb12fa07a77f0fdda8a 100644
--- a/cmix/rounds/historical_test.go
+++ b/cmix/rounds/historical_test.go
@@ -152,6 +152,11 @@ type testGWSender struct {
 	sync.RWMutex
 }
 
+func (t *testGWSender) StartProcesses() stoppable.Stoppable {
+	//TODO implement me
+	panic("implement me")
+}
+
 func (t *testGWSender) getSendCnt() int {
 	t.RLock()
 	defer t.RUnlock()
diff --git a/go.mod b/go.mod
index 2dc195e41ac28df4ff077a7bba3fad3955728c69..c7eea03933739948b7698df558373ab8515ec97f 100644
--- a/go.mod
+++ b/go.mod
@@ -15,11 +15,11 @@ require (
 	github.com/spf13/viper v1.12.0
 	github.com/stretchr/testify v1.8.0
 	gitlab.com/elixxir/bloomfilter v0.0.0-20211222005329-7d931ceead6f
-	gitlab.com/elixxir/comms v0.0.4-0.20230104190642-585e834d2a73
-	gitlab.com/elixxir/crypto v0.0.7-0.20230104191842-814e756a51f2
+	gitlab.com/elixxir/comms v0.0.4-0.20230109184457-e10f20295430
+	gitlab.com/elixxir/crypto v0.0.7-0.20230109182503-bd51c95bdcb3
 	gitlab.com/elixxir/ekv v0.2.1
 	gitlab.com/elixxir/primitives v0.0.3-0.20221214192222-988b44a6958a
-	gitlab.com/xx_network/comms v0.0.4-0.20221229210111-3cf0b9df563d
+	gitlab.com/xx_network/comms v0.0.4-0.20230109184153-4cb43814fa1d
 	gitlab.com/xx_network/crypto v0.0.5-0.20221121220724-8eefdbb0eb46
 	gitlab.com/xx_network/primitives v0.0.4-0.20221219230308-4b5550a9247d
 	go.uber.org/ratelimit v0.2.0
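
The module updates above are commit-pinned Go pseudo-versions (v0.0.x-<timestamp>-<short hash>). A bump like this is normally produced by running, for example, go get gitlab.com/elixxir/comms@e10f20295430 followed by go mod tidy, which rewrites the matching go.mod and go.sum entries together; the same commits are then reflected in the generated DEPENDENCIES string in xxdk/version_vars.go further below.
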
diff --git a/go.sum b/go.sum
index ab6b2231b0630e8b8cd39b1998de3933a9884543..2572110c7e64d3bb20e182d60d4ef83d673c50c7 100644
--- a/go.sum
+++ b/go.sum
@@ -493,16 +493,16 @@ github.com/zeebo/pcg v1.0.1 h1:lyqfGeWiv4ahac6ttHs+I5hwtH/+1mrhlCtVNQM2kHo=
 github.com/zeebo/pcg v1.0.1/go.mod h1:09F0S9iiKrwn9rlI5yjLkmrug154/YRW6KnnXVDM/l4=
 gitlab.com/elixxir/bloomfilter v0.0.0-20211222005329-7d931ceead6f h1:yXGvNBqzZwAhDYlSnxPRbgor6JWoOt1Z7s3z1O9JR40=
 gitlab.com/elixxir/bloomfilter v0.0.0-20211222005329-7d931ceead6f/go.mod h1:H6jztdm0k+wEV2QGK/KYA+MY9nj9Zzatux/qIvDDv3k=
-gitlab.com/elixxir/comms v0.0.4-0.20230104190642-585e834d2a73 h1:gRQTICBcP9Dr6DTZo1bw0S5Oji/eXJw6fImTtvg3Brg=
-gitlab.com/elixxir/comms v0.0.4-0.20230104190642-585e834d2a73/go.mod h1:aFnxDpIxEEFHdAa2dEeydzo00u/IAcfrqPSEnmeffbY=
-gitlab.com/elixxir/crypto v0.0.7-0.20230104191842-814e756a51f2 h1:/+uUXuy1HcAUmGsd5z9aUtfwrJtSz3mXmCTJ6ku/dKU=
-gitlab.com/elixxir/crypto v0.0.7-0.20230104191842-814e756a51f2/go.mod h1:7whUm4bnEdEoiVfMnu3TbHgvlrz0Ywp/Tekqg2Wl7vw=
+gitlab.com/elixxir/comms v0.0.4-0.20230109184457-e10f20295430 h1:OydFdoBbLz5iFzCiYEb+m8Q2pZjdVVCne4m+MyFAzUc=
+gitlab.com/elixxir/comms v0.0.4-0.20230109184457-e10f20295430/go.mod h1:aFnxDpIxEEFHdAa2dEeydzo00u/IAcfrqPSEnmeffbY=
+gitlab.com/elixxir/crypto v0.0.7-0.20230109182503-bd51c95bdcb3 h1:5au07K9R4K4RMRTGqdB+EB1hzAezcXuxxblZx17geDc=
+gitlab.com/elixxir/crypto v0.0.7-0.20230109182503-bd51c95bdcb3/go.mod h1:7whUm4bnEdEoiVfMnu3TbHgvlrz0Ywp/Tekqg2Wl7vw=
 gitlab.com/elixxir/ekv v0.2.1 h1:dtwbt6KmAXG2Tik5d60iDz2fLhoFBgWwST03p7T+9Is=
 gitlab.com/elixxir/ekv v0.2.1/go.mod h1:USLD7xeDnuZEavygdrgzNEwZXeLQJK/w1a+htpN+JEU=
 gitlab.com/elixxir/primitives v0.0.3-0.20221214192222-988b44a6958a h1:F17FfEjS+/uDI/TTYQD21S5JvNZ9+p9bieau2nyLCzo=
 gitlab.com/elixxir/primitives v0.0.3-0.20221214192222-988b44a6958a/go.mod h1:DUnCTXYKgjpro5+6ITySKIf+qzW2vhW40IVHMimdsqw=
-gitlab.com/xx_network/comms v0.0.4-0.20221229210111-3cf0b9df563d h1:VbkvQcSt4XUS3i+9p9V9RefH58rbLupbIYJZQgy36Y8=
-gitlab.com/xx_network/comms v0.0.4-0.20221229210111-3cf0b9df563d/go.mod h1:3bLNCD4r/PFvx9dirhIE9e4UqzTwwJxp7xgo2853MIc=
+gitlab.com/xx_network/comms v0.0.4-0.20230109184153-4cb43814fa1d h1:HPLHNw1sX/eqbchxupz9/rZkLxGEKAW3ZIWqHIzDFjw=
+gitlab.com/xx_network/comms v0.0.4-0.20230109184153-4cb43814fa1d/go.mod h1:3bLNCD4r/PFvx9dirhIE9e4UqzTwwJxp7xgo2853MIc=
 gitlab.com/xx_network/crypto v0.0.5-0.20221121220724-8eefdbb0eb46 h1:6AHgUpWdJ72RVTTdJSvfThZiYTQNUnrPaTCl/EkRLpg=
 gitlab.com/xx_network/crypto v0.0.5-0.20221121220724-8eefdbb0eb46/go.mod h1:acWUBKCpae/XVaQF7J9RnLAlBT13i5r7gnON+mrIxBk=
 gitlab.com/xx_network/primitives v0.0.4-0.20221219230308-4b5550a9247d h1:D9hEtiQ7xj0yFBkDkb4X4S95RfNoeXxtB1eE4UuFHtk=
diff --git a/xxdk/certChecker_test.go b/xxdk/certChecker_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..f601b93a4da4c1a3feebeb24586756755d1b3d77
--- /dev/null
+++ b/xxdk/certChecker_test.go
@@ -0,0 +1,14 @@
+package xxdk
+
+import (
+	pb "gitlab.com/elixxir/comms/mixmessages"
+	"gitlab.com/xx_network/comms/connect"
+)
+
+type mockCertCheckerComm struct {
+}
+
+func (mccc *mockCertCheckerComm) GetGatewayTLSCertificate(host *connect.Host,
+	message *pb.RequestGatewayCert) (*pb.GatewayCertificate, error) {
+	return &pb.GatewayCertificate{}, nil
+}
diff --git a/xxdk/utils_test.go b/xxdk/utils_test.go
index 31988d86c42cfcadedc85976fcd5fd653c94276b..92421f2ed0cdfa47828d1f9b1c96cc1e725b6a27 100644
--- a/xxdk/utils_test.go
+++ b/xxdk/utils_test.go
@@ -75,8 +75,10 @@ func newTestingClient(face interface{}) (*Cmix, error) {
 	p := gateway.DefaultPoolParams()
 	p.MaxPoolSize = 1
+	addChan := make(chan network.NodeGateway, 1)
+	mccc := &mockCertCheckerComm{}
 	sender, err := gateway.NewSender(p, c.GetRng(), def, commsManager,
-		c.storage, nil)
+		c.storage, mccc, addChan)
 	if err != nil {
 		return nil, err
 	}
diff --git a/xxdk/version_vars.go b/xxdk/version_vars.go
index 181941e863826463a83af6c28767b7abfba43c32..e4e186e08e9d131d10d29ec57d9376211754da96 100644
--- a/xxdk/version_vars.go
+++ b/xxdk/version_vars.go
@@ -1,11 +1,11 @@
 // Code generated by go generate; DO NOT EDIT.
 // This file was generated by robots at
-// 2023-01-04 14:29:07.79478227 -0800 PST m=+0.290821809
+// 2023-01-09 12:47:37.384575 -0600 CST m=+0.087037408
 
 package xxdk
 
-const GITVERSION = `6774fe71 Add DmToken to ModelMessage`
-const SEMVER = "4.4.2"
+const GITVERSION = `d86dbd0b Merge branch 'release' into 'NewHostPool'`
+const SEMVER = "4.4.3"
 
 const DEPENDENCIES = `module gitlab.com/elixxir/client/v4
 
 go 1.19
 
@@ -23,11 +23,11 @@ require (
 	github.com/spf13/viper v1.12.0
 	github.com/stretchr/testify v1.8.0
 	gitlab.com/elixxir/bloomfilter v0.0.0-20211222005329-7d931ceead6f
-	gitlab.com/elixxir/comms v0.0.4-0.20230104190642-585e834d2a73
-	gitlab.com/elixxir/crypto v0.0.7-0.20230104191842-814e756a51f2
+	gitlab.com/elixxir/comms v0.0.4-0.20230109184457-e10f20295430
+	gitlab.com/elixxir/crypto v0.0.7-0.20230109182503-bd51c95bdcb3
 	gitlab.com/elixxir/ekv v0.2.1
 	gitlab.com/elixxir/primitives v0.0.3-0.20221214192222-988b44a6958a
-	gitlab.com/xx_network/comms v0.0.4-0.20221229210111-3cf0b9df563d
+	gitlab.com/xx_network/comms v0.0.4-0.20230109184153-4cb43814fa1d
 	gitlab.com/xx_network/crypto v0.0.5-0.20221121220724-8eefdbb0eb46
 	gitlab.com/xx_network/primitives v0.0.4-0.20221219230308-4b5550a9247d
 	go.uber.org/ratelimit v0.2.0