diff --git a/network/manager.go b/network/manager.go index 1ae2e9d505591696853ae1c2f171bd6c7edcfbe1..45a2a8663b146f025a5be29176c01f226db73180 100644 --- a/network/manager.go +++ b/network/manager.go @@ -1 +1,63 @@ +//////////////////////////////////////////////////////////////////////////////// +// Copyright © 2020 Privategrity Corporation / +// / +// All rights reserved. / +//////////////////////////////////////////////////////////////////////////////// + package network + +// manager.go controls access to network resources. Interprocess communications +// and intraclient state are accessible through the context object. + +import ( + "github.com/pkg/errors" + "gitlab.com/elixxir/client/context" + "gitlab.com/elixxir/comms/client" + "gitlab.com/elixxir/primitives/format" + "gitlab.com/elixxir/primitives/switchboard" + "gitlab.com/xx_network/primitives/id" + "sync" + "time" +) + +// Manager implements the NetworkManager interface inside context. It +// controls access to network resources and implements all of the communications +// functions used by the client. +type Manager struct { + // Comms pointer to send/recv messages + Comms *client.Comms + // Context contains all of the keying info used to send messages + Context *context.Context +} + +// NewManager builds a new reception manager object using inputted key fields +func NewManager(context *context.Context, uid *id.ID, privKey, pubKey, + salt []byte) (*Manager, error) { + comms, err := client.NewClientComms(uid, pubKey, privKey, salt) + if err != nil { + return nil, err + } + + cm := &Manager{ + Comms: comms, + Context: ctx, + } + + return cm, nil +} + +// GetRemoteVersion contacts the permissioning server and returns the current +// supported client version. +func (m *Manager) GetRemoteVersion() (string, error) { + permissioningHost, ok := m.Comms.GetHost(&id.Permissioning) + if !ok { + return "", errors.Errorf("no permissioning host with id %s", + id.Permissioning) + } + registrationVersion, err := m.Comms.SendGetCurrentClientVersionMessage( + permissioningHost) + if err != nil { + return "", err + } + return registrationVersion.Version, nil +} diff --git a/network/receive.go b/network/receive.go index 068d729f93a3c20c16d6ed5d34111fddd882100e..2cdd0210c0a4385d6244343d506e04208f881f08 100644 --- a/network/receive.go +++ b/network/receive.go @@ -25,352 +25,49 @@ import ( "time" ) -const reportDuration = 30 * time.Second - -// TODO: REMOVE ME DEAR GOD -var SessionV2 *storage.Session - -var errE2ENotFound = errors.New("E2EKey for matching fingerprint not found, can't process message") - -// MessageReceiver is a polling thread for receiving messages -func (rm *ReceptionManager) MessageReceiver(session user.Session, delay time.Duration, - receptionHost *connect.Host, callback func(error)) { - // FIXME: It's not clear we should be doing decryption here. - if session == nil { - globals.Log.FATAL.Panicf("No user session available") - } - - userData, err := SessionV2.GetUserData() +// Receive is called by a MessageReceiver routine whenever a new CMIX message +// is available. +func Receive(ctx *Context, m *CMIXMessage) { + decrypted, err := decrypt(ctx, m) // Returns MessagePart if err != nil { - globals.Log.FATAL.Panicf("No user data available: %+v", err) + // Add to error/garbled messages list + jww.WARN.Errorf("Could not decode message: %+v", err) + ctx.GetGarbledMesssages().Add(m) } - pollingMessage := pb.ClientRequest{ - UserID: userData.ThisUser.User.Bytes(), - } - quit := rm.quitChan - NumChecks := 0 - NumMessages := 0 - - reportTicker := time.NewTicker(reportDuration) - - var encryptedMessages []*format.Message - - globals.Log.DEBUG.Printf("Gateway Polling for Message Reception Begun") - receptionTicker := time.NewTicker(delay) - - for { - - NumChecks++ - select { - case <-quit: - globals.Log.DEBUG.Printf("Stopped message receiver\n") - return - case <-receptionTicker.C: - - //check if a report on the polling status is due, report to logs if - //it is - select { - case <-reportTicker.C: - globals.Log.DEBUG.Printf("Over the passed %v "+ - "gateway has been checked %v time and %v messages recieved", - reportDuration, NumChecks, NumMessages) - default: - } - - NumChecks++ - - var err error - encryptedMessages, err = rm.receiveMessagesFromGateway(session, &pollingMessage, receptionHost) - - if err != nil { - - if strings.Contains(err.Error(), "Client has exceeded communications rate limit") { - globals.Log.WARN.Printf("Rate limit excceded on gateway, pausing polling for 5 seconds") - time.Sleep(5 * time.Second) - } else if !strings.Contains(err.Error(), "Could not find any message IDs for this user") { - go callback(err) - return - } - } - NumMessages += len(encryptedMessages) - case <-rm.rekeyChan: - encryptedMessages = rm.PopGarbledMessages() - } - - if len(encryptedMessages) != 0 { - - decryptedMessages, senders, garbledMessages := rm.decryptMessages(session, encryptedMessages) - - if len(garbledMessages) != 0 { - rm.AppendGarbledMessage(garbledMessages...) - } - - if decryptedMessages != nil { - for i := range decryptedMessages { - // TODO Handle messages that do not need partitioning - assembledMessage, err := rm.collator.AddMessage(decryptedMessages[i], - senders[i], time.Minute) - if err != nil { - go callback(err) - } - if assembledMessage != nil { - // we got a fully assembled message. let's broadcast it - broadcastMessageReception( - assembledMessage, - rm.switchboard) - } - } - } - } + // Reconstruct the partitioned message + completeMsg := constructMessageFromPartition(ctx, decrypted) // Returns ClientMessage + if completeMsg != nil { + ctx.GetSwitchBoard().Say(completeMsg) } } -// FIXME: put all key and external object into context var or other solution. -func handleE2EReceiving(session user.Session, switchb *switchboard.Switchboard, - message *format.Message) (*id.ID, bool, error) { - - userData, err := SessionV2.GetUserData() - if err != nil { - return nil, false, fmt.Errorf("Could not get user data: %+v", - err) - } - - keyFingerprint := message.GetKeyFP() - - // Lookup reception key - recpKey := session.GetKeyStore(). - GetRecvKey(keyFingerprint) - - rekey := false - if recpKey == nil { - // TODO Handle sending error message to SW - return nil, false, fmt.Errorf("E2EKey for matching fingerprint not found, can't process message") - } else if recpKey.GetOuterType() == parse.Rekey { - // If key type is rekey, the message is a rekey from partner - rekey = true - } - - sender := recpKey.GetManager().GetPartner() - - globals.Log.DEBUG.Printf("E2E decrypting message") - if rekey { - err = crypto.E2EDecryptUnsafe(userData.E2EGrp, recpKey.GetKey(), - message) - } else { - err = crypto.E2EDecrypt(userData.E2EGrp, recpKey.GetKey(), - message) - } - - if err != nil { - // TODO handle Garbled message to SW - } - - // Get partner from Key Manager of receiving key - // since there is no space in message for senderID - // Get decrypted partner public key from message - // Send rekey message to switchboard - if rekey { - partner := recpKey.GetManager().GetPartner() - partnerPubKey := message.Contents.Get() - rekeyMsg := &parse.Message{ - Sender: partner, - TypedBody: parse.TypedBody{ - MessageType: int32(keyExchange.Type_NO_TYPE), - Body: partnerPubKey, - }, - InferredType: parse.Rekey, - Receiver: userData.ThisUser.User, - } - go switchb.Speak(rekeyMsg) - } - return sender, rekey, err +// StartMessageReceivers starts a worker pool of message receivers, which listen +// on a channel for messages and run them through processing. +func StartMessageReceivers(ctx *context.Context) Stoppable { + // We assume receivers channel is set up elsewhere, but note that this + // would also be a reasonable place under assumption of 1 call to + // message receivers (would also make sense to .Close it instead of + // using quit channel, which somewhat simplifies for loop later. + receiverCh := ctx.GetNetwork().GetMessageReceiverCh() + for i := 0; i < ctx.GetNumReceivers(); i++ { + // quitCh created for each thread, add to multistop + quitCh := make(chan bool) + go MessageReceiver(ctx, messagesCh, quitCh) + } + + // Return multistoppable } -func (rm *ReceptionManager) receiveMessagesFromGateway(session user.Session, - pollingMessage *pb.ClientRequest, receiveGateway *connect.Host) ([]*format.Message, error) { - // Get the last message ID received - var err error - - // FIXME: Cleanup after user.Session is removed and replaced. - if SessionV2 == nil { - globals.Log.WARN.Printf("SessionV2 is nil") - return nil, errors.New("SessionV2 is nil") - } - userData, err := SessionV2.GetUserData() - if err != nil { - globals.Log.WARN.Printf("Could not get UserData: %+v", err) - return nil, err - } - - pollingMessage.LastMessageID, err = SessionV2.GetLastMessageId() - if err != nil { - globals.Log.WARN.Printf("Could not get LastMessageID: %+v", err) - return nil, err - } - // FIXME: dont do this over an over - - // Gets a list of mssages that are newer than the last one recieved - messageIDs, err := rm.Comms.SendCheckMessages(receiveGateway, pollingMessage) - - if err != nil { - return nil, err - } - - if len(messageIDs.IDs) < 0 { - globals.Log.DEBUG.Printf("Checking novelty of %v messageIDs", len(messageIDs.IDs)) - } - - messages := make([]*format.Message, len(messageIDs.IDs)) - mIDs := make([]string, len(messageIDs.IDs)) - - // fixme: this could miss messages if the client has not seen them but - // the gateway say them before a message the client has seen - - // Loops through every new message and retrieves it - bufLoc := 0 - for _, messageID := range messageIDs.IDs { - // Get the first unseen message from the list of IDs - rm.recievedMesageLock.RLock() - _, received := rm.receivedMessages[messageID] - rm.recievedMesageLock.RUnlock() - if !received { - globals.Log.INFO.Printf("Got a message waiting on the gateway: %v", - messageID) - // We haven't seen this message before. - // So, we should retrieve it from the gateway. - newMessage, err := rm.Comms.SendGetMessage(receiveGateway, - &pb.ClientRequest{ - UserID: userData.ThisUser.User.Bytes(), - LastMessageID: messageID, - }) - if err != nil { - globals.Log.WARN.Printf( - "Couldn't receive message with ID %v while"+ - " polling gateway", messageID) - } else { - if newMessage.PayloadA == nil || - newMessage.PayloadB == nil { - globals.Log.INFO.Println("Message fields not populated") - continue - } - - msg := format.NewMessage() - msg.SetPayloadA(newMessage.PayloadA) - msg.SetDecryptedPayloadB(newMessage.PayloadB) - - globals.Log.WARN.Printf( - "Loc: %d, %v", bufLoc, messageID) - messages[bufLoc] = msg - mIDs[bufLoc] = messageID - bufLoc++ - } - } - } - // record that the messages were received so they are not re-retrieved - if bufLoc > 0 { - for i := 0; i < bufLoc; i++ { - globals.Log.INFO.Printf( - "Adding message ID %v to received message IDs", mIDs[i]) - rm.recievedMesageLock.Lock() - rm.receivedMessages[mIDs[i]] = struct{}{} - rm.recievedMesageLock.Unlock() - } - err = SessionV2.SetLastMessageId(mIDs[bufLoc-1]) - if err != nil { - return nil, err - } - err = session.StoreSession() - if err != nil { - globals.Log.ERROR.Printf("Could not store session "+ - "after messages received from gateway: %+v", err) - } - } - - return messages[:bufLoc], nil -} - -func (rm *ReceptionManager) decryptMessages(session user.Session, - encryptedMessages []*format.Message) ([]*format.Message, []*id.ID, - []*format.Message) { - - messages := make([]*format.Message, len(encryptedMessages)) - senders := make([]*id.ID, len(encryptedMessages)) - messagesSendersLoc := 0 - - garbledMessages := make([]*format.Message, len(encryptedMessages)) - garbledMessagesLoc := 0 - - for _, msg := range encryptedMessages { - var err error = nil - var rekey bool - var unpadded []byte - var sender *id.ID - garbled := false - - // If message is E2E, handle decryption - if e2e.IsUnencrypted(msg) { - // If message is non E2E, need to un-pad payload - unpadded, err = e2e.Unpad(msg.Contents.Get()) - if err == nil { - msg.Contents.SetRightAligned(unpadded) - } - - keyFP := msg.AssociatedData.GetKeyFP() - sender, err = makeUserID(keyFP[:]) - } else { - sender, rekey, err = handleE2EReceiving(session, - rm.switchboard, msg) - - if err == errE2ENotFound { - garbled = true - err = nil - } - } - - if err != nil { - globals.Log.WARN.Printf( - "Message did not decrypt properly, "+ - "not adding to messages array: %v", err.Error()) - } else if rekey { - globals.Log.INFO.Printf("Correctly processed rekey message," + - " not adding to messages array") - } else if garbled { - garbledMessages[garbledMessagesLoc] = msg - garbledMessagesLoc++ - } else { - messages[messagesSendersLoc] = msg - senders[messagesSendersLoc] = sender - messagesSendersLoc++ +func MessageReceiver(ctx *context.Context, messagesCh chan ClientMessage, + quitCh chan bool) { + done := false + for !done { + select { + case <-quitCh: + done = true + case m := <-messagesCh: + ReceiveMessage(ctx, m) // defined elsewhere... } } - - return messages[:messagesSendersLoc], senders[:messagesSendersLoc], garbledMessages[:garbledMessagesLoc] -} - -func broadcastMessageReception(message *parse.Message, - listeners *switchboard.Switchboard) { - - listeners.Speak(message) -} - -// Put a sender ID in a byte slice and set its type to user -func makeUserID(senderID []byte) (*id.ID, error) { - senderIDBytes := make([]byte, id.ArrIDLen) - copy(senderIDBytes, senderID[:]) - userID, err := id.Unmarshal(senderIDBytes) - if userID != nil { - userID.SetType(id.User) - } - return userID, err -} - -// skipErrChecker checks checks if the error is fatal or should be ignored -func skipErrChecker(err error) bool { - if strings.Contains(err.Error(), "Could not find any message IDs for this user") { - return true - } - - return false - } diff --git a/network/send.go b/network/send.go index 3779da1247a4f68ad2cb05b3380c82ec9d77f442..17a22048f709277bd8c3689244821eb5ef79b880 100644 --- a/network/send.go +++ b/network/send.go @@ -1,5 +1,5 @@ //////////////////////////////////////////////////////////////////////////////// -// Copyright © 2019 Privategrity Corporation / +// Copyright © 2020 Privategrity Corporation / // / // All rights reserved. / //////////////////////////////////////////////////////////////////////////////// @@ -7,282 +7,35 @@ package network import ( - "fmt" - "github.com/pkg/errors" - "gitlab.com/elixxir/client/crypto" - "gitlab.com/elixxir/client/globals" - "gitlab.com/elixxir/client/network/keyExchange" - "gitlab.com/elixxir/client/keyStore" - "gitlab.com/elixxir/client/parse" - "gitlab.com/elixxir/client/user" - pb "gitlab.com/elixxir/comms/mixmessages" + "gitlab.com/elixxir/client/context/message" + "gitlab.com/elixxir/client/context/params" + "gitlab.com/elixxir/client/context/stoppable" "gitlab.com/elixxir/comms/network" - "gitlab.com/elixxir/crypto/cmix" - "gitlab.com/elixxir/crypto/csprng" - "gitlab.com/elixxir/crypto/e2e" - "gitlab.com/elixxir/crypto/hash" "gitlab.com/elixxir/primitives/format" - "gitlab.com/elixxir/primitives/switchboard" - "gitlab.com/xx_network/comms/connect" "gitlab.com/xx_network/primitives/id" - "time" ) -// SendMessage to the provided Recipient -// TODO: It's not clear why we wouldn't hand off a sender object (with -// the keys) here. I won't touch crypto at this time, though... -// TODO This method would be cleaner if it took a parse.Message (particularly -// w.r.t. generating message IDs for multi-part messages.) -func (rm *ReceptionManager) SendMessage(session user.Session, topology *connect.Circuit, - recipientID *id.ID, cryptoType parse.CryptoType, - message []byte, transmissionHost *connect.Host) error { - // FIXME: We should really bring the plaintext parts of the NewMessage logic - // into this module, then have an EncryptedMessage type that is sent to/from - // the cMix network. This NewMessage does way too many things: break the - // message into parts, generate mic's, etc -- the crypto library should only - // know about the crypto and advertise a max message payload size - - // TBD: Is there a really good reason why we'd ever have more than one user - // in this library? why not pass a sender object instead? - globals.Log.DEBUG.Printf("Sending message to %q: %q", *recipientID, message) - parts, err := parse.Partition([]byte(message), - rm.nextId()) - if err != nil { - return fmt.Errorf("SendMessage Partition() error: %v", err.Error()) - } - // Every part should have the same timestamp - now := time.Now() - // GO Timestamp binary serialization is 15 bytes, which - // allows the encrypted timestamp to fit in 16 bytes - // using AES encryption - // The timestamp will be encrypted later - // NOTE: This sets 15 bytes, not 16 - nowBytes, err := now.MarshalBinary() - if err != nil { - return fmt.Errorf("SendMessage MarshalBinary() error: %v", err.Error()) - } - // Add a byte for later encryption (15->16 bytes) - extendedNowBytes := append(nowBytes, 0) - for i := range parts { - message := format.NewMessage() - message.SetRecipient(recipientID) - message.SetTimestamp(extendedNowBytes) - message.Contents.SetRightAligned(parts[i]) - err = rm.send(session, topology, cryptoType, message, false, transmissionHost) - if err != nil { - return errors.Wrap(err, "SendMessage send() error:") - } - } - return nil +// SendE2E sends an end-to-end payload to the provided recipient with +// the provided msgType. Returns the list of rounds in which parts of +// the message were sent or an error if it fails. +func (m *Manager) SendE2E(m message.Send, e2eP params.E2E, cmixP params.CMIX) ( + []id.Round, error) { + return nil, nil } -// Send Message without doing partitions -// This function will be needed for example to send a Rekey -// message, where a new public key will take up the whole message -func (rm *ReceptionManager) SendMessageNoPartition(session user.Session, - topology *connect.Circuit, recipientID *id.ID, cryptoType parse.CryptoType, - message []byte, transmissionHost *connect.Host) error { - size := len(message) - if size > format.TotalLen { - return fmt.Errorf("SendMessageNoPartition() error: message to be sent is too big") - } - now := time.Now() - // GO Timestamp binary serialization is 15 bytes, which - // allows the encrypted timestamp to fit in 16 bytes - // using AES encryption - // The timestamp will be encrypted later - // NOTE: This sets 15 bytes, not 16 - nowBytes, err := now.MarshalBinary() - if err != nil { - return fmt.Errorf("SendMessageNoPartition MarshalBinary() error: %v", err.Error()) - } - msg := format.NewMessage() - msg.SetRecipient(recipientID) - // Add a byte to support later encryption (15 -> 16 bytes) - nowBytes = append(nowBytes, 0) - msg.SetTimestamp(nowBytes) - msg.Contents.Set(message) - globals.Log.DEBUG.Printf("Sending message to %v: %x", *recipientID, message) - - err = rm.send(session, topology, cryptoType, msg, true, transmissionHost) - if err != nil { - return fmt.Errorf("SendMessageNoPartition send() error: %v", err.Error()) - } - return nil +// SendUnsafe sends an unencrypted payload to the provided recipient +// with the provided msgType. Returns the list of rounds in which parts +// of the message were sent or an error if it fails. +// NOTE: Do not use this function unless you know what you are doing. +// This function always produces an error message in client logging. +func (m *Manager) SendUnsafe(m message.Send) ([]id.Round, error) { + return nil, nil } -// send actually sends the message to the server -func (rm *ReceptionManager) send(session user.Session, topology *connect.Circuit, - cryptoType parse.CryptoType, - message *format.Message, - rekey bool, transmitGateway *connect.Host) error { - - userData, err := SessionV2.GetUserData() - if err != nil { - return err - } - - // Enable transmission blocking if enabled - if rm.blockTransmissions { - rm.sendLock.Lock() - defer func() { - time.Sleep(rm.transmitDelay) - rm.sendLock.Unlock() - }() - } - - uid := userData.ThisUser.User - - // Check message type - if cryptoType == parse.E2E { - handleE2ESending(session, rm.switchboard, message, rekey) - } else { - padded, err := e2e.Pad(message.Contents.GetRightAligned(), format.ContentsLen) - if err != nil { - return err - } - message.Contents.Set(padded) - e2e.SetUnencrypted(message) - fp := format.NewFingerprint(uid.Marshal()[:32]) - message.SetKeyFP(*fp) - } - // CMIX Encryption - salt := cmix.NewSalt(csprng.Source(&csprng.SystemRNG{}), 32) - encMsg, kmacs := crypto.CMIXEncrypt(session, topology, salt, message) - - // Construct slot message - msgPacket := &pb.Slot{ - SenderID: uid.Marshal(), - PayloadA: encMsg.GetPayloadA(), - PayloadB: encMsg.GetPayloadB(), - Salt: salt, - KMACs: kmacs, - } - - // Retrieve the base key for the zeroeth node - nodeKeys, err := SessionV2.GetNodeKeysFromCircuit(topology) - if err != nil { - globals.Log.ERROR.Printf("could not get nodeKeys: %+v", err) - return err - } - nk := nodeKeys[0] - - clientGatewayKey := cmix.GenerateClientGatewayKey(nk.TransmissionKey) - // Hash the clientGatewayKey and the the slot's salt - h, _ := hash.NewCMixHash() - h.Write(clientGatewayKey) - h.Write(msgPacket.Salt) - hashed := h.Sum(nil) - h.Reset() - - // Construct the gateway message - msg := &pb.GatewaySlot{ - Message: msgPacket, - RoundID: 0, - } - - // Hash the gatewaySlotDigest and the above hashed data - gatewaySlotDigest := network.GenerateSlotDigest(msg) - h.Write(hashed) - h.Write(gatewaySlotDigest) - - // Place the hashed data as the message's MAC - msg.MAC = h.Sum(nil) - // Send the message - gwSlotResp, err := rm.Comms.SendPutMessage(transmitGateway, msg) - if err != nil { - return err - } - - if !gwSlotResp.Accepted { - return errors.Errorf("Message was refused!") - } - - return err -} - -// FIXME: hand off all keys via a context variable or other solution. -func handleE2ESending(session user.Session, switchb *switchboard.Switchboard, - message *format.Message, - rekey bool) { - recipientID, err := message.GetRecipient() - if err != nil { - globals.Log.ERROR.Panic(err) - } - - userData, err := SessionV2.GetUserData() - if err != nil { - globals.Log.FATAL.Panicf("Couldn't get userData: %+v ", err) - } - - var key *keyStore.E2EKey - var action keyStore.Action - // Get KeyManager for this partner - km := session.GetKeyStore().GetSendManager(recipientID) - if km == nil { - partners := session.GetKeyStore().GetPartners() - globals.Log.INFO.Printf("Valid Partner IDs: %+v", partners) - globals.Log.FATAL.Panicf("Couldn't get KeyManager to E2E encrypt message to"+ - " user %v", *recipientID) - } - - // FIXME: This is a hack to prevent a crash, this function should be - // able to block until this condition is true. - for end, timeout := false, time.After(60*time.Second); !end; { - if rekey { - // Get send Rekey - key, action = km.PopRekey() - } else { - // Get send key - key, action = km.PopKey() - } - if key != nil { - end = true - } - - select { - case <-timeout: - end = true - default: - } - } - - if key == nil { - globals.Log.FATAL.Panicf("Couldn't get key to E2E encrypt message to"+ - " user %v", *recipientID) - } else if action == keyStore.Purge { - // Destroy this key manager - km := key.GetManager() - km.Destroy(session.GetKeyStore()) - globals.Log.WARN.Printf("Destroying E2E Send Keys Manager for partner: %v", *recipientID) - } else if action == keyStore.Deleted { - globals.Log.FATAL.Panicf("Key Manager is deleted when trying to get E2E Send Key") - } - - if action == keyStore.Rekey { - // Send RekeyTrigger message to switchboard - rekeyMsg := &parse.Message{ - Sender: userData.ThisUser.User, - TypedBody: parse.TypedBody{ - MessageType: int32(keyExchange.Type_REKEY_TRIGGER), - Body: []byte{}, - }, - InferredType: parse.None, - Receiver: recipientID, - } - go switchb.Speak(rekeyMsg) - } - - globals.Log.DEBUG.Printf("E2E encrypting message") - if rekey { - crypto.E2EEncryptUnsafe(userData.E2EGrp, - key.GetKey(), - key.KeyFingerprint(), - message) - } else { - crypto.E2EEncrypt(userData.E2EGrp, - key.GetKey(), - key.KeyFingerprint(), - message) - } +// SendCMIX sends a "raw" CMIX message payload to the provided +// recipient. Note that both SendE2E and SendUnsafe call SendCMIX. +// Returns the round ID of the round the payload was sent or an error +// if it fails. +func (m *Manager) SendCMIX(message format.Message) (id.Round, error) { + return nil, nil } diff --git a/network/threads.go b/network/threads.go index 14406c3eccc7e4e4e1e74219a94857cd292fdc33..0891fd793a15b1387946845518ec85fa41e82215 100644 --- a/network/threads.go +++ b/network/threads.go @@ -35,64 +35,6 @@ func (c ChanStop) Close(timeout time.Duration) { } } -// StartTrackNetwork starts a single TrackNetwork thread and returns a stoppable -// structure -func StartTrackNetwork(ctx *context.Context) Stoppable { - stopper := ChanStop{ - name: "TrackNetwork", - quit: make(chan bool), - } - go TrackNetwork(ctx, stopper.quit) - return stopper -} - -// TrackNetwork polls the network to get updated on the state of nodes, the -// round status, and informs the client when messages can be retrieved. -func TrackNetwork(ctx *context.Context, quitCh chan bool) { - ticker := timer.NewTicker(ctx.GetTrackNetworkPeriod()) - done := false - for !done { - select { - case <-quitCh: - done = true - case <-ticker: - trackNetwork(ctx) - } - } -} - -func trackNetwork(ctx) { - gateway, err := ctx.Session.GetNodeKeys().GetGatewayForSending() - if err != nil { - //... - } - - network := ctx.GetNetwork() - ndf, err := network.PollNDF(ctx, gateway) - if err != nil { - // .... - } - - newNodes, removedNodes := network.UpdateNDF(ndf) - for _, n := range newNodes { - network.addNodeCh <- n - } - for _, n := range removedNodes { - network.removeNodeCh <- n - } - - rounds, err = network.UpdateRounds(ctx, ndf) - if err != nil { - // ... - } - - err = rounds.GetKnownRound().MaskedRange(gateway, - network.CheckRoundsFunction) - if err != nil { - // ... - } -} - func StartProcessHistoricalRounds(ctx *context.Context) Stoppable { stopper := ChanStop{ name: "ProcessHistoricalRounds", @@ -142,34 +84,6 @@ func processHistoricalRounds(ctx *context.Context, rids []RoundID) []*RoundInfo return ris } -func StartMessageReceivers(ctx *context.Context) Stoppable { - // We assume receivers channel is set up elsewhere, but note that this - // would also be a reasonable place under assumption of 1 call to - // message receivers (would also make sense to .Close it instead of - // using quit channel, which somewhat simplifies for loop later. - receiverCh := ctx.GetNetwork().GetMessageReceiverCh() - for i := 0; i < ctx.GetNumReceivers(); i++ { - // quitCh created for each thread, add to multistop - quitCh := make(chan bool) - go MessageReceiver(ctx, messagesCh, quitCh) - } - - // Return multistoppable -} - -func MessageReceiver(ctx *context.Context, messagesCh chan ClientMessage, - quitCh chan bool) { - done := false - for !done { - select { - case <-quitCh: - done = true - case m := <-messagesCh: - ReceiveMessage(ctx, m) // defined elsewhere... - } - } -} - func StartNodeKeyExchange(ctx *context.Context) { keyCh := ctx.GetNetwork().GetNodeKeysCh() for i := 0; i < ctx.GetNumNodeKeyExchangers(); i++ { diff --git a/network/updates.go b/network/updates.go new file mode 100644 index 0000000000000000000000000000000000000000..5760d2834c48065a0e5512ebc435eac1ebc54fdd --- /dev/null +++ b/network/updates.go @@ -0,0 +1,84 @@ +//////////////////////////////////////////////////////////////////////////////// +// Copyright © 2020 Privategrity Corporation / +// / +// All rights reserved. / +//////////////////////////////////////////////////////////////////////////////// + +package network + +// updates.go tracks the network for: +// 1. Node addition and removal +// 2. New/Active/Complete rounds and their contact gateways +// This information is tracked by polling a gateway for the network definition +// file (NDF). Once it detects an event it sends it off to the proper channel +// for a worker to update the client state (add/remove a node, check for +// messages at a gateway, etc). See: +// - nodes.go for add/remove node events +// - rounds.go for round event handling & processing +// - receive.go for message handling + +import ( + "gitlab.com/elixxir/client/context" +) + +// GetUpdates polls the network for updates. +func (m *Manager) GetUpdates() (*network.Instance, error) { + return nil, nil +} + +// StartTrackNetwork starts a single TrackNetwork thread and returns a stoppable +func StartTrackNetwork(ctx *context.Context) Stoppable { + stopper := ChanStop{ + name: "TrackNetwork", + quit: make(chan bool), + } + go TrackNetwork(ctx, stopper.quit) + return stopper +} + +// TrackNetwork polls the network to get updated on the state of nodes, the +// round status, and informs the client when messages can be retrieved. +func TrackNetwork(ctx *context.Context, quitCh chan bool) { + ticker := timer.NewTicker(ctx.GetTrackNetworkPeriod()) + done := false + for !done { + select { + case <-quitCh: + done = true + case <-ticker: + trackNetwork(ctx) + } + } +} + +func trackNetwork(ctx) { + gateway, err := ctx.Session.GetNodeKeys().GetGatewayForSending() + if err != nil { + //... + } + + network := ctx.GetNetwork() + ndf, err := network.PollNDF(ctx, gateway) + if err != nil { + // .... + } + + newNodes, removedNodes := network.UpdateNDF(ndf) + for _, n := range newNodes { + network.addNodeCh <- n + } + for _, n := range removedNodes { + network.removeNodeCh <- n + } + + rounds, err = network.UpdateRounds(ctx, ndf) + if err != nil { + // ... + } + + err = rounds.GetKnownRound().MaskedRange(gateway, + network.CheckRoundsFunction) + if err != nil { + // ... + } +}