diff --git a/network/gateway/sender.go b/network/gateway/sender.go index 2ae6020fee6be515d612866a949e5a00887594c5..83093dc7d86997ea5250ce49d64d060d0d37c526 100644 --- a/network/gateway/sender.go +++ b/network/gateway/sender.go @@ -37,12 +37,12 @@ func NewSender(poolParams PoolParams, rng io.Reader, ndf *ndf.NetworkDefinition, // Call given sendFunc to a specific Host in the HostPool, // attempting with up to numProxies destinations in case of failure func (m *Sender) SendToSpecific(targets []*id.ID, numProxies int, - sendFunc func(host *connect.Host) (interface{}, error)) (interface{}, error) { + sendFunc func(host *connect.Host, target *id.ID) (interface{}, error)) (interface{}, error) { for _, target := range targets { host, ok := m.GetSpecific(target) if ok { - result, err := sendFunc(host) + result, err := sendFunc(host, target) if err == nil { return result, m.ForceAdd([]*id.ID{host.GetId()}) } @@ -68,12 +68,12 @@ func (m *Sender) SendToAny(numProxies int, } // Call given sendFunc to any Host in the HostPool, attempting with up to numProxies destinations -func (m *Sender) SendToPreferred(targets []*id.ID, - sendFunc func(host *connect.Host) (interface{}, error)) (interface{}, error) { +func (m *Sender) SendToPreferred(targets []*id.ID, numProxies int, + sendFunc func(host *connect.Host, target *id.ID) (interface{}, error)) (interface{}, error) { targetHosts := m.GetPreferred(targets) - for _, host := range targetHosts { - result, err := sendFunc(host) + for i, host := range targetHosts { + result, err := sendFunc(host, targets[i]) if err == nil { return result, nil } diff --git a/network/manager.go b/network/manager.go index 68179fbee4024339697a84816bfacc30ba786977..e8e75d149a015dda43fb7c17f20e8245c81d8c7f 100644 --- a/network/manager.go +++ b/network/manager.go @@ -136,7 +136,7 @@ func (m *manager) Follow(report interfaces.ClientErrorReport) (stoppable.Stoppab multi.Add(healthStop) // Node Updates - multi.Add(node.StartRegistration(m.Instance, m.Session, m.Rng, + multi.Add(node.StartRegistration(m.GetSender(), m.Session, m.Rng, m.Comms, m.NodeRegistration, m.param.ParallelNodeRegistrations)) // Adding/Keys //TODO-remover //m.runners.Add(StartNodeRemover(m.Context)) // Removing diff --git a/network/message/critical.go b/network/message/critical.go index 0ebaed5d32390c35f92a6a96557782be6336ff1e..1dad92c821e756e3c89c6d013c5d78c9e09a1015 100644 --- a/network/message/critical.go +++ b/network/message/critical.go @@ -95,7 +95,7 @@ func (m *Manager) criticalMessages() { jww.INFO.Printf("Resending critical raw message to %s "+ "(msgDigest: %s)", rid, msg.Digest()) //send the message - round, _, err := m.SendCMIX(msg, rid, param) + round, _, err := m.SendCMIX(m.sender, msg, rid, param) //if the message fail to send, notify the buffer so it can be handled //in the future and exit if err != nil { diff --git a/network/message/sendCmix.go b/network/message/sendCmix.go index 1aced6efe44eaef52f17a08392d4c52a8d6eae7c..2bab96dddb7087e165b8d3c257b1c9c968981a75 100644 --- a/network/message/sendCmix.go +++ b/network/message/sendCmix.go @@ -12,6 +12,7 @@ import ( "github.com/pkg/errors" jww "github.com/spf13/jwalterweatherman" "gitlab.com/elixxir/client/interfaces/params" + "gitlab.com/elixxir/client/network/gateway" "gitlab.com/elixxir/client/storage" pb "gitlab.com/elixxir/comms/mixmessages" "gitlab.com/elixxir/comms/network" @@ -29,7 +30,6 @@ import ( // interface for SendCMIX comms; allows mocking this in testing type sendCmixCommsInterface interface { - GetHost(hostId *id.ID) (*connect.Host, bool) SendPutMessage(host *connect.Host, message *pb.GatewaySlot) (*pb.GatewaySlotResponse, error) } @@ -38,9 +38,9 @@ const sendTimeBuffer = 2500 * time.Millisecond // WARNING: Potentially Unsafe // Public manager function to send a message over CMIX -func (m *Manager) SendCMIX(msg format.Message, recipient *id.ID, param params.CMIX) (id.Round, ephemeral.Id, error) { +func (m *Manager) SendCMIX(sender *gateway.Sender, msg format.Message, recipient *id.ID, param params.CMIX) (id.Round, ephemeral.Id, error) { msgCopy := msg.Copy() - return sendCmixHelper(msgCopy, recipient, param, m.Instance, m.Session, m.nodeRegistration, m.Rng, m.TransmissionID, m.Comms) + return sendCmixHelper(sender, msgCopy, recipient, param, m.Instance, m.Session, m.nodeRegistration, m.Rng, m.TransmissionID, m.Comms) } // Payloads send are not End to End encrypted, MetaData is NOT protected with @@ -51,7 +51,7 @@ func (m *Manager) SendCMIX(msg format.Message, recipient *id.ID, param params.CM // If the message is successfully sent, the id of the round sent it is returned, // which can be registered with the network instance to get a callback on // its status -func sendCmixHelper(msg format.Message, recipient *id.ID, param params.CMIX, instance *network.Instance, +func sendCmixHelper(sender *gateway.Sender, msg format.Message, recipient *id.ID, param params.CMIX, instance *network.Instance, session *storage.Session, nodeRegistration chan network.NodeGateway, rng *fastRNG.StreamGenerator, senderId *id.ID, comms sendCmixCommsInterface) (id.Round, ephemeral.Id, error) { @@ -142,15 +142,6 @@ func sendCmixHelper(msg format.Message, recipient *id.ID, param params.CMIX, ins firstGateway := topology.GetNodeAtIndex(0).DeepCopy() firstGateway.SetType(id.Gateway) - transmitGateway, ok := comms.GetHost(firstGateway) - if !ok { - jww.ERROR.Printf("Failed to get host for gateway %s when "+ - "sending to %s (msgDigest: %s)", transmitGateway, recipient, - msg.Digest()) - time.Sleep(param.RetryDelay) - continue - } - //encrypt the message stream = rng.GetStream() salt := make([]byte, 32) @@ -187,9 +178,15 @@ func sendCmixHelper(msg format.Message, recipient *id.ID, param params.CMIX, ins jww.INFO.Printf("Sending to EphID %d (%s) on round %d, "+ "(msgDigest: %s, ecrMsgDigest: %s) via gateway %s", ephID.Int64(), recipient, bestRound.ID, msg.Digest(), - encMsg.Digest(), transmitGateway.GetId()) - // //Send the payload - gwSlotResp, err := comms.SendPutMessage(transmitGateway, wrappedMsg) + encMsg.Digest(), firstGateway.String()) + + // Send the payload + result, err := sender.SendToSpecific([]*id.ID{firstGateway}, 3, func(host *connect.Host, target *id.ID) (interface{}, error) { + wrappedMsg.Target = target.Marshal() + return comms.SendPutMessage(host, wrappedMsg) + }) + gwSlotResp := result.(*pb.GatewaySlotResponse) + //if the comm errors or the message fails to send, continue retrying. //return if it sends properly if err != nil { @@ -204,10 +201,10 @@ func sendCmixHelper(msg format.Message, recipient *id.ID, param params.CMIX, ins "with this node?") { jww.WARN.Printf("Failed to send to %s (msgDigest: %s) "+ "via %s due to failed authentication: %s", - recipient, msg.Digest(), transmitGateway.GetId(), err) + recipient, msg.Digest(), firstGateway.String(), err) //if we failed to send due to the gateway not recognizing our // authorization, renegotiate with the node to refresh it - nodeID := transmitGateway.GetId().DeepCopy() + nodeID := firstGateway.DeepCopy() nodeID.SetType(id.Node) //delete the keys session.Cmix().Remove(nodeID) @@ -226,7 +223,7 @@ func sendCmixHelper(msg format.Message, recipient *id.ID, param params.CMIX, ins } else { jww.FATAL.Panicf("Gateway %s returned no error, but failed "+ "to accept message when sending to EphID %d (%s) on round %d", - transmitGateway.GetId(), ephID.Int64(), recipient, bestRound.ID) + firstGateway.String(), ephID.Int64(), recipient, bestRound.ID) } } return 0, ephemeral.Id{}, errors.New("failed to send the message, " + diff --git a/network/message/sendE2E.go b/network/message/sendE2E.go index 78061232830f167adf91d21f5805fa2e00b58916..16c14701d47ead025dae9758fa4697ce6c5ea007 100644 --- a/network/message/sendE2E.go +++ b/network/message/sendE2E.go @@ -95,7 +95,7 @@ func (m *Manager) SendE2E(msg message.Send, param params.E2E) ([]id.Round, e2e.M wg.Add(1) go func(i int) { var err error - roundIds[i], _, err = m.SendCMIX(msgEnc, msg.Recipient, + roundIds[i], _, err = m.SendCMIX(m.sender, msgEnc, msg.Recipient, param.CMIX) if err != nil { errCh <- err diff --git a/network/message/sendUnsafe.go b/network/message/sendUnsafe.go index ef28cf2d789d1f5f0a097413046b5a0c50b0a230..df7f05cb8df4ba368f3a3a19368dcb13214f7d49 100644 --- a/network/message/sendUnsafe.go +++ b/network/message/sendUnsafe.go @@ -64,7 +64,7 @@ func (m *Manager) SendUnsafe(msg message.Send, param params.Unsafe) ([]id.Round, wg.Add(1) go func(i int) { var err error - roundIds[i], _, err = m.SendCMIX(msgCmix, msg.Recipient, param.CMIX) + roundIds[i], _, err = m.SendCMIX(m.sender, msgCmix, msg.Recipient, param.CMIX) if err != nil { errCh <- err } diff --git a/network/node/register.go b/network/node/register.go index a27529b6eb612e9c3ae49a4e07393dbe67d8c347..943f85cc0fa05a9ba35e65825ec20dcfad785d03 100644 --- a/network/node/register.go +++ b/network/node/register.go @@ -13,6 +13,7 @@ import ( "fmt" "github.com/pkg/errors" jww "github.com/spf13/jwalterweatherman" + "gitlab.com/elixxir/client/network/gateway" "gitlab.com/elixxir/client/stoppable" "gitlab.com/elixxir/client/storage" "gitlab.com/elixxir/client/storage/cmix" @@ -33,14 +34,13 @@ import ( ) type RegisterNodeCommsInterface interface { - GetHost(hostId *id.ID) (*connect.Host, bool) SendRequestNonceMessage(host *connect.Host, message *pb.NonceRequest) (*pb.Nonce, error) SendConfirmNonceMessage(host *connect.Host, message *pb.RequestRegistrationConfirmation) (*pb.RegistrationConfirmation, error) } -func StartRegistration(instance *network.Instance, session *storage.Session, rngGen *fastRNG.StreamGenerator, comms RegisterNodeCommsInterface, +func StartRegistration(sender *gateway.Sender, session *storage.Session, rngGen *fastRNG.StreamGenerator, comms RegisterNodeCommsInterface, c chan network.NodeGateway, numParallel uint) stoppable.Stoppable { multi := stoppable.NewMulti("NodeRegistrations") @@ -48,14 +48,14 @@ func StartRegistration(instance *network.Instance, session *storage.Session, rng for i := uint(0); i < numParallel; i++ { stop := stoppable.NewSingle(fmt.Sprintf("NodeRegistration %d", i)) - go registerNodes(session, rngGen, comms, stop, c) + go registerNodes(sender, session, rngGen, comms, stop, c) multi.Add(stop) } return multi } -func registerNodes(session *storage.Session, rngGen *fastRNG.StreamGenerator, comms RegisterNodeCommsInterface, +func registerNodes(sender *gateway.Sender, session *storage.Session, rngGen *fastRNG.StreamGenerator, comms RegisterNodeCommsInterface, stop *stoppable.Single, c chan network.NodeGateway) { u := session.User() regSignature := u.GetTransmissionRegistrationValidationSignature() @@ -71,7 +71,7 @@ func registerNodes(session *storage.Session, rngGen *fastRNG.StreamGenerator, co t.Stop() return case gw := <-c: - err := registerWithNode(comms, gw, regSignature, uci, cmix, rng) + err := registerWithNode(sender, comms, gw, regSignature, uci, cmix, rng) if err != nil { jww.ERROR.Printf("Failed to register node: %+v", err) } @@ -82,7 +82,7 @@ func registerNodes(session *storage.Session, rngGen *fastRNG.StreamGenerator, co //registerWithNode serves as a helper for RegisterWithNodes // It registers a user with a specific in the client's ndf. -func registerWithNode(comms RegisterNodeCommsInterface, ngw network.NodeGateway, regSig []byte, +func registerWithNode(sender *gateway.Sender, comms RegisterNodeCommsInterface, ngw network.NodeGateway, regSig []byte, uci *user.CryptographicIdentity, store *cmix.Store, rng csprng.Source) error { nodeID, err := ngw.Node.GetNodeId() if err != nil { @@ -109,7 +109,7 @@ func registerWithNode(comms RegisterNodeCommsInterface, ngw network.NodeGateway, userNum := int(uci.GetTransmissionID().Bytes()[7]) h := sha256.New() h.Reset() - h.Write([]byte(strconv.Itoa(int(4000 + userNum)))) + h.Write([]byte(strconv.Itoa(4000 + userNum))) transmissionKey = store.GetGroup().NewIntFromBytes(h.Sum(nil)) jww.INFO.Printf("transmissionKey: %v", transmissionKey.Bytes()) @@ -118,24 +118,32 @@ func registerWithNode(comms RegisterNodeCommsInterface, ngw network.NodeGateway, // keys transmissionHash, _ := hash.NewCMixHash() - nonce, dhPub, err := requestNonce(comms, gatewayID, regSig, uci, store, rng) - if err != nil { - return errors.Errorf("Failed to request nonce: %+v", err) - } + _, err := sender.SendToAny(1, func(host *connect.Host) (interface{}, error) { + + nonce, dhPub, err := requestNonce(comms, host, gatewayID, regSig, uci, store, rng) + if err != nil { + return nil, errors.Errorf("Failed to request nonce: %+v", err) + } - // Load server DH pubkey - serverPubDH := store.GetGroup().NewIntFromBytes(dhPub) + // Load server DH pubkey + serverPubDH := store.GetGroup().NewIntFromBytes(dhPub) - // Confirm received nonce - jww.INFO.Println("Register: Confirming received nonce") - err = confirmNonce(comms, uci.GetTransmissionID().Bytes(), - nonce, uci.GetTransmissionRSA(), gatewayID) + // Confirm received nonce + jww.INFO.Println("Register: Confirming received nonce") + err = confirmNonce(comms, uci.GetTransmissionID().Bytes(), + nonce, uci.GetTransmissionRSA(), host, gatewayID) + if err != nil { + errMsg := fmt.Sprintf("Register: Unable to confirm nonce: %v", err) + return nil, errors.New(errMsg) + } + transmissionKey = registration.GenerateBaseKey(store.GetGroup(), + serverPubDH, store.GetDHPrivateKey(), transmissionHash) + return nil, nil + }) if err != nil { - errMsg := fmt.Sprintf("Register: Unable to confirm nonce: %v", err) - return errors.New(errMsg) + jww.ERROR.Printf("registerNode failed: %+v", err) + return err } - transmissionKey = registration.GenerateBaseKey(store.GetGroup(), - serverPubDH, store.GetDHPrivateKey(), transmissionHash) } store.Add(nodeID, transmissionKey) @@ -145,7 +153,7 @@ func registerWithNode(comms RegisterNodeCommsInterface, ngw network.NodeGateway, return nil } -func requestNonce(comms RegisterNodeCommsInterface, gwId *id.ID, regHash []byte, +func requestNonce(comms RegisterNodeCommsInterface, host *connect.Host, gwId *id.ID, regHash []byte, uci *user.CryptographicIdentity, store *cmix.Store, rng csprng.Source) ([]byte, []byte, error) { dhPub := store.GetDHPublicKey().Bytes() opts := rsa.NewDefaultOptions() @@ -165,10 +173,6 @@ func requestNonce(comms RegisterNodeCommsInterface, gwId *id.ID, regHash []byte, jww.INFO.Printf("Register: Requesting nonce from gateway %v", gwId.Bytes()) - host, ok := comms.GetHost(gwId) - if !ok { - return nil, nil, errors.Errorf("Failed to find host with ID %s", gwId.String()) - } nonceResponse, err := comms.SendRequestNonceMessage(host, &pb.NonceRequest{ Salt: uci.GetTransmissionSalt(), @@ -180,6 +184,7 @@ func requestNonce(comms RegisterNodeCommsInterface, gwId *id.ID, regHash []byte, RequestSignature: &messages.RSASignature{ Signature: clientSig, }, + Target: gwId.Marshal(), }) if err != nil { @@ -198,7 +203,7 @@ func requestNonce(comms RegisterNodeCommsInterface, gwId *id.ID, regHash []byte, // It signs a nonce and sends it for confirmation // Returns nil if successful, error otherwise func confirmNonce(comms RegisterNodeCommsInterface, UID, nonce []byte, - privateKeyRSA *rsa.PrivateKey, gwID *id.ID) error { + privateKeyRSA *rsa.PrivateKey, host *connect.Host, gwID *id.ID) error { opts := rsa.NewDefaultOptions() opts.Hash = hash.CMixHash h, _ := hash.NewCMixHash() @@ -224,12 +229,9 @@ func confirmNonce(comms RegisterNodeCommsInterface, UID, nonce []byte, NonceSignedByClient: &messages.RSASignature{ Signature: sig, }, + Target: gwID.Marshal(), } - host, ok := comms.GetHost(gwID) - if !ok { - return errors.Errorf("Failed to find host with ID %s", gwID.String()) - } confirmResponse, err := comms.SendConfirmNonceMessage(host, msg) if err != nil { err := errors.New(fmt.Sprintf( diff --git a/network/rounds/retrieve.go b/network/rounds/retrieve.go index c9a9ce5bd2b710449c89ed5745b633379d8b81a1..b388dde0a09c4ab1808d55074cad1e402dd2d7ef 100644 --- a/network/rounds/retrieve.go +++ b/network/rounds/retrieve.go @@ -55,9 +55,9 @@ func (m *Manager) processMessageRetrieval(comms messageRetrievalComms, } // Send to the gateways using backup proxies - result, err := m.sender.SendToSpecific(gwIds, 5, func(host *connect.Host) (interface{}, error) { + result, err := m.sender.SendToPreferred(gwIds, 5, func(host *connect.Host, target *id.ID) (interface{}, error) { // Attempt to request for this gateway - result, err := m.getMessagesFromGateway(id.Round(ri.ID), rl.identity, comms, host) + result, err := m.getMessagesFromGateway(id.Round(ri.ID), rl.identity, comms, host, target) if err != nil { jww.WARN.Printf("Failed on gateway %s to get messages for round %v", host.GetId().String(), ri.ID) @@ -86,7 +86,7 @@ func (m *Manager) processMessageRetrieval(comms messageRetrievalComms, // getMessagesFromGateway attempts to get messages from their assigned // gateway host in the round specified. If successful func (m *Manager) getMessagesFromGateway(roundID id.Round, identity reception.IdentityUse, - comms messageRetrievalComms, gwHost *connect.Host) (message.Bundle, error) { + comms messageRetrievalComms, gwHost *connect.Host, target *id.ID) (message.Bundle, error) { jww.DEBUG.Printf("Trying to get messages for round %v for ephmeralID %d (%v) "+ "via Gateway: %s", roundID, identity.EphId.Int64(), identity.Source.String(), gwHost.GetId()) @@ -95,6 +95,7 @@ func (m *Manager) getMessagesFromGateway(roundID id.Round, identity reception.Id msgReq := &pb.GetMessages{ ClientID: identity.EphId[:], RoundID: uint64(roundID), + Target: target.Marshal(), } msgResp, err := comms.RequestMessages(gwHost, msgReq) // Fail the round if an error occurs so it can be tried again later diff --git a/network/send.go b/network/send.go index 0e3e9020aa187716e3a7dd7da9f538bff47ba3e9..d70a0c6661c2ee6f4c40f313219baa7cbb86fd52 100644 --- a/network/send.go +++ b/network/send.go @@ -28,7 +28,7 @@ func (m *manager) SendCMIX(msg format.Message, recipient *id.ID, param params.CM "network is not healthy") } - return m.message.SendCMIX(msg, recipient, param) + return m.message.SendCMIX(m.GetSender(), msg, recipient, param) } // SendUnsafe sends an unencrypted payload to the provided recipient