Skip to content
Snippets Groups Projects
Commit afe0eb47 authored by Richard T. Carback III's avatar Richard T. Carback III
Browse files

Initial reception outline

parent 041ea468
No related branches found
No related tags found
No related merge requests found
////////////////////////////////////////////////////////////////////////////////
// Copyright © 2020 Privategrity Corporation /
// /
// All rights reserved. /
////////////////////////////////////////////////////////////////////////////////
package network package network
// manager.go controls access to network resources. Interprocess communications
// and intraclient state are accessible through the context object.
import (
"github.com/pkg/errors"
"gitlab.com/elixxir/client/context"
"gitlab.com/elixxir/comms/client"
"gitlab.com/elixxir/primitives/format"
"gitlab.com/elixxir/primitives/switchboard"
"gitlab.com/xx_network/primitives/id"
"sync"
"time"
)
// Manager implements the NetworkManager interface inside context. It
// controls access to network resources and implements all of the communications
// functions used by the client.
type Manager struct {
// Comms pointer to send/recv messages
Comms *client.Comms
// Context contains all of the keying info used to send messages
Context *context.Context
}
// NewManager builds a new reception manager object using inputted key fields
func NewManager(context *context.Context, uid *id.ID, privKey, pubKey,
salt []byte) (*Manager, error) {
comms, err := client.NewClientComms(uid, pubKey, privKey, salt)
if err != nil {
return nil, err
}
cm := &Manager{
Comms: comms,
Context: ctx,
}
return cm, nil
}
// GetRemoteVersion contacts the permissioning server and returns the current
// supported client version.
func (m *Manager) GetRemoteVersion() (string, error) {
permissioningHost, ok := m.Comms.GetHost(&id.Permissioning)
if !ok {
return "", errors.Errorf("no permissioning host with id %s",
id.Permissioning)
}
registrationVersion, err := m.Comms.SendGetCurrentClientVersionMessage(
permissioningHost)
if err != nil {
return "", err
}
return registrationVersion.Version, nil
}
...@@ -25,352 +25,49 @@ import ( ...@@ -25,352 +25,49 @@ import (
"time" "time"
) )
const reportDuration = 30 * time.Second // Receive is called by a MessageReceiver routine whenever a new CMIX message
// is available.
// TODO: REMOVE ME DEAR GOD func Receive(ctx *Context, m *CMIXMessage) {
var SessionV2 *storage.Session decrypted, err := decrypt(ctx, m) // Returns MessagePart
var errE2ENotFound = errors.New("E2EKey for matching fingerprint not found, can't process message")
// MessageReceiver is a polling thread for receiving messages
func (rm *ReceptionManager) MessageReceiver(session user.Session, delay time.Duration,
receptionHost *connect.Host, callback func(error)) {
// FIXME: It's not clear we should be doing decryption here.
if session == nil {
globals.Log.FATAL.Panicf("No user session available")
}
userData, err := SessionV2.GetUserData()
if err != nil {
globals.Log.FATAL.Panicf("No user data available: %+v", err)
}
pollingMessage := pb.ClientRequest{
UserID: userData.ThisUser.User.Bytes(),
}
quit := rm.quitChan
NumChecks := 0
NumMessages := 0
reportTicker := time.NewTicker(reportDuration)
var encryptedMessages []*format.Message
globals.Log.DEBUG.Printf("Gateway Polling for Message Reception Begun")
receptionTicker := time.NewTicker(delay)
for {
NumChecks++
select {
case <-quit:
globals.Log.DEBUG.Printf("Stopped message receiver\n")
return
case <-receptionTicker.C:
//check if a report on the polling status is due, report to logs if
//it is
select {
case <-reportTicker.C:
globals.Log.DEBUG.Printf("Over the passed %v "+
"gateway has been checked %v time and %v messages recieved",
reportDuration, NumChecks, NumMessages)
default:
}
NumChecks++
var err error
encryptedMessages, err = rm.receiveMessagesFromGateway(session, &pollingMessage, receptionHost)
if err != nil {
if strings.Contains(err.Error(), "Client has exceeded communications rate limit") {
globals.Log.WARN.Printf("Rate limit excceded on gateway, pausing polling for 5 seconds")
time.Sleep(5 * time.Second)
} else if !strings.Contains(err.Error(), "Could not find any message IDs for this user") {
go callback(err)
return
}
}
NumMessages += len(encryptedMessages)
case <-rm.rekeyChan:
encryptedMessages = rm.PopGarbledMessages()
}
if len(encryptedMessages) != 0 {
decryptedMessages, senders, garbledMessages := rm.decryptMessages(session, encryptedMessages)
if len(garbledMessages) != 0 {
rm.AppendGarbledMessage(garbledMessages...)
}
if decryptedMessages != nil {
for i := range decryptedMessages {
// TODO Handle messages that do not need partitioning
assembledMessage, err := rm.collator.AddMessage(decryptedMessages[i],
senders[i], time.Minute)
if err != nil {
go callback(err)
}
if assembledMessage != nil {
// we got a fully assembled message. let's broadcast it
broadcastMessageReception(
assembledMessage,
rm.switchboard)
}
}
}
}
}
}
// FIXME: put all key and external object into context var or other solution.
func handleE2EReceiving(session user.Session, switchb *switchboard.Switchboard,
message *format.Message) (*id.ID, bool, error) {
userData, err := SessionV2.GetUserData()
if err != nil {
return nil, false, fmt.Errorf("Could not get user data: %+v",
err)
}
keyFingerprint := message.GetKeyFP()
// Lookup reception key
recpKey := session.GetKeyStore().
GetRecvKey(keyFingerprint)
rekey := false
if recpKey == nil {
// TODO Handle sending error message to SW
return nil, false, fmt.Errorf("E2EKey for matching fingerprint not found, can't process message")
} else if recpKey.GetOuterType() == parse.Rekey {
// If key type is rekey, the message is a rekey from partner
rekey = true
}
sender := recpKey.GetManager().GetPartner()
globals.Log.DEBUG.Printf("E2E decrypting message")
if rekey {
err = crypto.E2EDecryptUnsafe(userData.E2EGrp, recpKey.GetKey(),
message)
} else {
err = crypto.E2EDecrypt(userData.E2EGrp, recpKey.GetKey(),
message)
}
if err != nil {
// TODO handle Garbled message to SW
}
// Get partner from Key Manager of receiving key
// since there is no space in message for senderID
// Get decrypted partner public key from message
// Send rekey message to switchboard
if rekey {
partner := recpKey.GetManager().GetPartner()
partnerPubKey := message.Contents.Get()
rekeyMsg := &parse.Message{
Sender: partner,
TypedBody: parse.TypedBody{
MessageType: int32(keyExchange.Type_NO_TYPE),
Body: partnerPubKey,
},
InferredType: parse.Rekey,
Receiver: userData.ThisUser.User,
}
go switchb.Speak(rekeyMsg)
}
return sender, rekey, err
}
func (rm *ReceptionManager) receiveMessagesFromGateway(session user.Session,
pollingMessage *pb.ClientRequest, receiveGateway *connect.Host) ([]*format.Message, error) {
// Get the last message ID received
var err error
// FIXME: Cleanup after user.Session is removed and replaced.
if SessionV2 == nil {
globals.Log.WARN.Printf("SessionV2 is nil")
return nil, errors.New("SessionV2 is nil")
}
userData, err := SessionV2.GetUserData()
if err != nil {
globals.Log.WARN.Printf("Could not get UserData: %+v", err)
return nil, err
}
pollingMessage.LastMessageID, err = SessionV2.GetLastMessageId()
if err != nil {
globals.Log.WARN.Printf("Could not get LastMessageID: %+v", err)
return nil, err
}
// FIXME: dont do this over an over
// Gets a list of mssages that are newer than the last one recieved
messageIDs, err := rm.Comms.SendCheckMessages(receiveGateway, pollingMessage)
if err != nil {
return nil, err
}
if len(messageIDs.IDs) < 0 {
globals.Log.DEBUG.Printf("Checking novelty of %v messageIDs", len(messageIDs.IDs))
}
messages := make([]*format.Message, len(messageIDs.IDs))
mIDs := make([]string, len(messageIDs.IDs))
// fixme: this could miss messages if the client has not seen them but
// the gateway say them before a message the client has seen
// Loops through every new message and retrieves it
bufLoc := 0
for _, messageID := range messageIDs.IDs {
// Get the first unseen message from the list of IDs
rm.recievedMesageLock.RLock()
_, received := rm.receivedMessages[messageID]
rm.recievedMesageLock.RUnlock()
if !received {
globals.Log.INFO.Printf("Got a message waiting on the gateway: %v",
messageID)
// We haven't seen this message before.
// So, we should retrieve it from the gateway.
newMessage, err := rm.Comms.SendGetMessage(receiveGateway,
&pb.ClientRequest{
UserID: userData.ThisUser.User.Bytes(),
LastMessageID: messageID,
})
if err != nil {
globals.Log.WARN.Printf(
"Couldn't receive message with ID %v while"+
" polling gateway", messageID)
} else {
if newMessage.PayloadA == nil ||
newMessage.PayloadB == nil {
globals.Log.INFO.Println("Message fields not populated")
continue
}
msg := format.NewMessage()
msg.SetPayloadA(newMessage.PayloadA)
msg.SetDecryptedPayloadB(newMessage.PayloadB)
globals.Log.WARN.Printf(
"Loc: %d, %v", bufLoc, messageID)
messages[bufLoc] = msg
mIDs[bufLoc] = messageID
bufLoc++
}
}
}
// record that the messages were received so they are not re-retrieved
if bufLoc > 0 {
for i := 0; i < bufLoc; i++ {
globals.Log.INFO.Printf(
"Adding message ID %v to received message IDs", mIDs[i])
rm.recievedMesageLock.Lock()
rm.receivedMessages[mIDs[i]] = struct{}{}
rm.recievedMesageLock.Unlock()
}
err = SessionV2.SetLastMessageId(mIDs[bufLoc-1])
if err != nil { if err != nil {
return nil, err // Add to error/garbled messages list
} jww.WARN.Errorf("Could not decode message: %+v", err)
err = session.StoreSession() ctx.GetGarbledMesssages().Add(m)
if err != nil {
globals.Log.ERROR.Printf("Could not store session "+
"after messages received from gateway: %+v", err)
}
}
return messages[:bufLoc], nil
}
func (rm *ReceptionManager) decryptMessages(session user.Session,
encryptedMessages []*format.Message) ([]*format.Message, []*id.ID,
[]*format.Message) {
messages := make([]*format.Message, len(encryptedMessages))
senders := make([]*id.ID, len(encryptedMessages))
messagesSendersLoc := 0
garbledMessages := make([]*format.Message, len(encryptedMessages))
garbledMessagesLoc := 0
for _, msg := range encryptedMessages {
var err error = nil
var rekey bool
var unpadded []byte
var sender *id.ID
garbled := false
// If message is E2E, handle decryption
if e2e.IsUnencrypted(msg) {
// If message is non E2E, need to un-pad payload
unpadded, err = e2e.Unpad(msg.Contents.Get())
if err == nil {
msg.Contents.SetRightAligned(unpadded)
} }
keyFP := msg.AssociatedData.GetKeyFP() // Reconstruct the partitioned message
sender, err = makeUserID(keyFP[:]) completeMsg := constructMessageFromPartition(ctx, decrypted) // Returns ClientMessage
} else { if completeMsg != nil {
sender, rekey, err = handleE2EReceiving(session, ctx.GetSwitchBoard().Say(completeMsg)
rm.switchboard, msg)
if err == errE2ENotFound {
garbled = true
err = nil
} }
} }
if err != nil { // StartMessageReceivers starts a worker pool of message receivers, which listen
globals.Log.WARN.Printf( // on a channel for messages and run them through processing.
"Message did not decrypt properly, "+ func StartMessageReceivers(ctx *context.Context) Stoppable {
"not adding to messages array: %v", err.Error()) // We assume receivers channel is set up elsewhere, but note that this
} else if rekey { // would also be a reasonable place under assumption of 1 call to
globals.Log.INFO.Printf("Correctly processed rekey message," + // message receivers (would also make sense to .Close it instead of
" not adding to messages array") // using quit channel, which somewhat simplifies for loop later.
} else if garbled { receiverCh := ctx.GetNetwork().GetMessageReceiverCh()
garbledMessages[garbledMessagesLoc] = msg for i := 0; i < ctx.GetNumReceivers(); i++ {
garbledMessagesLoc++ // quitCh created for each thread, add to multistop
} else { quitCh := make(chan bool)
messages[messagesSendersLoc] = msg go MessageReceiver(ctx, messagesCh, quitCh)
senders[messagesSendersLoc] = sender
messagesSendersLoc++
}
} }
return messages[:messagesSendersLoc], senders[:messagesSendersLoc], garbledMessages[:garbledMessagesLoc] // Return multistoppable
} }
func broadcastMessageReception(message *parse.Message, func MessageReceiver(ctx *context.Context, messagesCh chan ClientMessage,
listeners *switchboard.Switchboard) { quitCh chan bool) {
done := false
listeners.Speak(message) for !done {
} select {
case <-quitCh:
// Put a sender ID in a byte slice and set its type to user done = true
func makeUserID(senderID []byte) (*id.ID, error) { case m := <-messagesCh:
senderIDBytes := make([]byte, id.ArrIDLen) ReceiveMessage(ctx, m) // defined elsewhere...
copy(senderIDBytes, senderID[:])
userID, err := id.Unmarshal(senderIDBytes)
if userID != nil {
userID.SetType(id.User)
}
return userID, err
} }
// skipErrChecker checks checks if the error is fatal or should be ignored
func skipErrChecker(err error) bool {
if strings.Contains(err.Error(), "Could not find any message IDs for this user") {
return true
} }
return false
} }
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
// Copyright © 2019 Privategrity Corporation / // Copyright © 2020 Privategrity Corporation /
// / // /
// All rights reserved. / // All rights reserved. /
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
...@@ -7,282 +7,35 @@ ...@@ -7,282 +7,35 @@
package network package network
import ( import (
"fmt" "gitlab.com/elixxir/client/context/message"
"github.com/pkg/errors" "gitlab.com/elixxir/client/context/params"
"gitlab.com/elixxir/client/crypto" "gitlab.com/elixxir/client/context/stoppable"
"gitlab.com/elixxir/client/globals"
"gitlab.com/elixxir/client/network/keyExchange"
"gitlab.com/elixxir/client/keyStore"
"gitlab.com/elixxir/client/parse"
"gitlab.com/elixxir/client/user"
pb "gitlab.com/elixxir/comms/mixmessages"
"gitlab.com/elixxir/comms/network" "gitlab.com/elixxir/comms/network"
"gitlab.com/elixxir/crypto/cmix"
"gitlab.com/elixxir/crypto/csprng"
"gitlab.com/elixxir/crypto/e2e"
"gitlab.com/elixxir/crypto/hash"
"gitlab.com/elixxir/primitives/format" "gitlab.com/elixxir/primitives/format"
"gitlab.com/elixxir/primitives/switchboard"
"gitlab.com/xx_network/comms/connect"
"gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/id"
"time"
) )
// SendMessage to the provided Recipient // SendE2E sends an end-to-end payload to the provided recipient with
// TODO: It's not clear why we wouldn't hand off a sender object (with // the provided msgType. Returns the list of rounds in which parts of
// the keys) here. I won't touch crypto at this time, though... // the message were sent or an error if it fails.
// TODO This method would be cleaner if it took a parse.Message (particularly func (m *Manager) SendE2E(m message.Send, e2eP params.E2E, cmixP params.CMIX) (
// w.r.t. generating message IDs for multi-part messages.) []id.Round, error) {
func (rm *ReceptionManager) SendMessage(session user.Session, topology *connect.Circuit, return nil, nil
recipientID *id.ID, cryptoType parse.CryptoType, }
message []byte, transmissionHost *connect.Host) error {
// FIXME: We should really bring the plaintext parts of the NewMessage logic // SendUnsafe sends an unencrypted payload to the provided recipient
// into this module, then have an EncryptedMessage type that is sent to/from // with the provided msgType. Returns the list of rounds in which parts
// the cMix network. This NewMessage does way too many things: break the // of the message were sent or an error if it fails.
// message into parts, generate mic's, etc -- the crypto library should only // NOTE: Do not use this function unless you know what you are doing.
// know about the crypto and advertise a max message payload size // This function always produces an error message in client logging.
func (m *Manager) SendUnsafe(m message.Send) ([]id.Round, error) {
// TBD: Is there a really good reason why we'd ever have more than one user return nil, nil
// in this library? why not pass a sender object instead? }
globals.Log.DEBUG.Printf("Sending message to %q: %q", *recipientID, message)
parts, err := parse.Partition([]byte(message), // SendCMIX sends a "raw" CMIX message payload to the provided
rm.nextId()) // recipient. Note that both SendE2E and SendUnsafe call SendCMIX.
if err != nil { // Returns the round ID of the round the payload was sent or an error
return fmt.Errorf("SendMessage Partition() error: %v", err.Error()) // if it fails.
} func (m *Manager) SendCMIX(message format.Message) (id.Round, error) {
// Every part should have the same timestamp return nil, nil
now := time.Now()
// GO Timestamp binary serialization is 15 bytes, which
// allows the encrypted timestamp to fit in 16 bytes
// using AES encryption
// The timestamp will be encrypted later
// NOTE: This sets 15 bytes, not 16
nowBytes, err := now.MarshalBinary()
if err != nil {
return fmt.Errorf("SendMessage MarshalBinary() error: %v", err.Error())
}
// Add a byte for later encryption (15->16 bytes)
extendedNowBytes := append(nowBytes, 0)
for i := range parts {
message := format.NewMessage()
message.SetRecipient(recipientID)
message.SetTimestamp(extendedNowBytes)
message.Contents.SetRightAligned(parts[i])
err = rm.send(session, topology, cryptoType, message, false, transmissionHost)
if err != nil {
return errors.Wrap(err, "SendMessage send() error:")
}
}
return nil
}
// Send Message without doing partitions
// This function will be needed for example to send a Rekey
// message, where a new public key will take up the whole message
func (rm *ReceptionManager) SendMessageNoPartition(session user.Session,
topology *connect.Circuit, recipientID *id.ID, cryptoType parse.CryptoType,
message []byte, transmissionHost *connect.Host) error {
size := len(message)
if size > format.TotalLen {
return fmt.Errorf("SendMessageNoPartition() error: message to be sent is too big")
}
now := time.Now()
// GO Timestamp binary serialization is 15 bytes, which
// allows the encrypted timestamp to fit in 16 bytes
// using AES encryption
// The timestamp will be encrypted later
// NOTE: This sets 15 bytes, not 16
nowBytes, err := now.MarshalBinary()
if err != nil {
return fmt.Errorf("SendMessageNoPartition MarshalBinary() error: %v", err.Error())
}
msg := format.NewMessage()
msg.SetRecipient(recipientID)
// Add a byte to support later encryption (15 -> 16 bytes)
nowBytes = append(nowBytes, 0)
msg.SetTimestamp(nowBytes)
msg.Contents.Set(message)
globals.Log.DEBUG.Printf("Sending message to %v: %x", *recipientID, message)
err = rm.send(session, topology, cryptoType, msg, true, transmissionHost)
if err != nil {
return fmt.Errorf("SendMessageNoPartition send() error: %v", err.Error())
}
return nil
}
// send actually sends the message to the server
func (rm *ReceptionManager) send(session user.Session, topology *connect.Circuit,
cryptoType parse.CryptoType,
message *format.Message,
rekey bool, transmitGateway *connect.Host) error {
userData, err := SessionV2.GetUserData()
if err != nil {
return err
}
// Enable transmission blocking if enabled
if rm.blockTransmissions {
rm.sendLock.Lock()
defer func() {
time.Sleep(rm.transmitDelay)
rm.sendLock.Unlock()
}()
}
uid := userData.ThisUser.User
// Check message type
if cryptoType == parse.E2E {
handleE2ESending(session, rm.switchboard, message, rekey)
} else {
padded, err := e2e.Pad(message.Contents.GetRightAligned(), format.ContentsLen)
if err != nil {
return err
}
message.Contents.Set(padded)
e2e.SetUnencrypted(message)
fp := format.NewFingerprint(uid.Marshal()[:32])
message.SetKeyFP(*fp)
}
// CMIX Encryption
salt := cmix.NewSalt(csprng.Source(&csprng.SystemRNG{}), 32)
encMsg, kmacs := crypto.CMIXEncrypt(session, topology, salt, message)
// Construct slot message
msgPacket := &pb.Slot{
SenderID: uid.Marshal(),
PayloadA: encMsg.GetPayloadA(),
PayloadB: encMsg.GetPayloadB(),
Salt: salt,
KMACs: kmacs,
}
// Retrieve the base key for the zeroeth node
nodeKeys, err := SessionV2.GetNodeKeysFromCircuit(topology)
if err != nil {
globals.Log.ERROR.Printf("could not get nodeKeys: %+v", err)
return err
}
nk := nodeKeys[0]
clientGatewayKey := cmix.GenerateClientGatewayKey(nk.TransmissionKey)
// Hash the clientGatewayKey and the the slot's salt
h, _ := hash.NewCMixHash()
h.Write(clientGatewayKey)
h.Write(msgPacket.Salt)
hashed := h.Sum(nil)
h.Reset()
// Construct the gateway message
msg := &pb.GatewaySlot{
Message: msgPacket,
RoundID: 0,
}
// Hash the gatewaySlotDigest and the above hashed data
gatewaySlotDigest := network.GenerateSlotDigest(msg)
h.Write(hashed)
h.Write(gatewaySlotDigest)
// Place the hashed data as the message's MAC
msg.MAC = h.Sum(nil)
// Send the message
gwSlotResp, err := rm.Comms.SendPutMessage(transmitGateway, msg)
if err != nil {
return err
}
if !gwSlotResp.Accepted {
return errors.Errorf("Message was refused!")
}
return err
}
// FIXME: hand off all keys via a context variable or other solution.
func handleE2ESending(session user.Session, switchb *switchboard.Switchboard,
message *format.Message,
rekey bool) {
recipientID, err := message.GetRecipient()
if err != nil {
globals.Log.ERROR.Panic(err)
}
userData, err := SessionV2.GetUserData()
if err != nil {
globals.Log.FATAL.Panicf("Couldn't get userData: %+v ", err)
}
var key *keyStore.E2EKey
var action keyStore.Action
// Get KeyManager for this partner
km := session.GetKeyStore().GetSendManager(recipientID)
if km == nil {
partners := session.GetKeyStore().GetPartners()
globals.Log.INFO.Printf("Valid Partner IDs: %+v", partners)
globals.Log.FATAL.Panicf("Couldn't get KeyManager to E2E encrypt message to"+
" user %v", *recipientID)
}
// FIXME: This is a hack to prevent a crash, this function should be
// able to block until this condition is true.
for end, timeout := false, time.After(60*time.Second); !end; {
if rekey {
// Get send Rekey
key, action = km.PopRekey()
} else {
// Get send key
key, action = km.PopKey()
}
if key != nil {
end = true
}
select {
case <-timeout:
end = true
default:
}
}
if key == nil {
globals.Log.FATAL.Panicf("Couldn't get key to E2E encrypt message to"+
" user %v", *recipientID)
} else if action == keyStore.Purge {
// Destroy this key manager
km := key.GetManager()
km.Destroy(session.GetKeyStore())
globals.Log.WARN.Printf("Destroying E2E Send Keys Manager for partner: %v", *recipientID)
} else if action == keyStore.Deleted {
globals.Log.FATAL.Panicf("Key Manager is deleted when trying to get E2E Send Key")
}
if action == keyStore.Rekey {
// Send RekeyTrigger message to switchboard
rekeyMsg := &parse.Message{
Sender: userData.ThisUser.User,
TypedBody: parse.TypedBody{
MessageType: int32(keyExchange.Type_REKEY_TRIGGER),
Body: []byte{},
},
InferredType: parse.None,
Receiver: recipientID,
}
go switchb.Speak(rekeyMsg)
}
globals.Log.DEBUG.Printf("E2E encrypting message")
if rekey {
crypto.E2EEncryptUnsafe(userData.E2EGrp,
key.GetKey(),
key.KeyFingerprint(),
message)
} else {
crypto.E2EEncrypt(userData.E2EGrp,
key.GetKey(),
key.KeyFingerprint(),
message)
}
} }
...@@ -35,64 +35,6 @@ func (c ChanStop) Close(timeout time.Duration) { ...@@ -35,64 +35,6 @@ func (c ChanStop) Close(timeout time.Duration) {
} }
} }
// StartTrackNetwork starts a single TrackNetwork thread and returns a stoppable
// structure
func StartTrackNetwork(ctx *context.Context) Stoppable {
stopper := ChanStop{
name: "TrackNetwork",
quit: make(chan bool),
}
go TrackNetwork(ctx, stopper.quit)
return stopper
}
// TrackNetwork polls the network to get updated on the state of nodes, the
// round status, and informs the client when messages can be retrieved.
func TrackNetwork(ctx *context.Context, quitCh chan bool) {
ticker := timer.NewTicker(ctx.GetTrackNetworkPeriod())
done := false
for !done {
select {
case <-quitCh:
done = true
case <-ticker:
trackNetwork(ctx)
}
}
}
func trackNetwork(ctx) {
gateway, err := ctx.Session.GetNodeKeys().GetGatewayForSending()
if err != nil {
//...
}
network := ctx.GetNetwork()
ndf, err := network.PollNDF(ctx, gateway)
if err != nil {
// ....
}
newNodes, removedNodes := network.UpdateNDF(ndf)
for _, n := range newNodes {
network.addNodeCh <- n
}
for _, n := range removedNodes {
network.removeNodeCh <- n
}
rounds, err = network.UpdateRounds(ctx, ndf)
if err != nil {
// ...
}
err = rounds.GetKnownRound().MaskedRange(gateway,
network.CheckRoundsFunction)
if err != nil {
// ...
}
}
func StartProcessHistoricalRounds(ctx *context.Context) Stoppable { func StartProcessHistoricalRounds(ctx *context.Context) Stoppable {
stopper := ChanStop{ stopper := ChanStop{
name: "ProcessHistoricalRounds", name: "ProcessHistoricalRounds",
...@@ -142,34 +84,6 @@ func processHistoricalRounds(ctx *context.Context, rids []RoundID) []*RoundInfo ...@@ -142,34 +84,6 @@ func processHistoricalRounds(ctx *context.Context, rids []RoundID) []*RoundInfo
return ris return ris
} }
func StartMessageReceivers(ctx *context.Context) Stoppable {
// We assume receivers channel is set up elsewhere, but note that this
// would also be a reasonable place under assumption of 1 call to
// message receivers (would also make sense to .Close it instead of
// using quit channel, which somewhat simplifies for loop later.
receiverCh := ctx.GetNetwork().GetMessageReceiverCh()
for i := 0; i < ctx.GetNumReceivers(); i++ {
// quitCh created for each thread, add to multistop
quitCh := make(chan bool)
go MessageReceiver(ctx, messagesCh, quitCh)
}
// Return multistoppable
}
func MessageReceiver(ctx *context.Context, messagesCh chan ClientMessage,
quitCh chan bool) {
done := false
for !done {
select {
case <-quitCh:
done = true
case m := <-messagesCh:
ReceiveMessage(ctx, m) // defined elsewhere...
}
}
}
func StartNodeKeyExchange(ctx *context.Context) { func StartNodeKeyExchange(ctx *context.Context) {
keyCh := ctx.GetNetwork().GetNodeKeysCh() keyCh := ctx.GetNetwork().GetNodeKeysCh()
for i := 0; i < ctx.GetNumNodeKeyExchangers(); i++ { for i := 0; i < ctx.GetNumNodeKeyExchangers(); i++ {
......
////////////////////////////////////////////////////////////////////////////////
// Copyright © 2020 Privategrity Corporation /
// /
// All rights reserved. /
////////////////////////////////////////////////////////////////////////////////
package network
// updates.go tracks the network for:
// 1. Node addition and removal
// 2. New/Active/Complete rounds and their contact gateways
// This information is tracked by polling a gateway for the network definition
// file (NDF). Once it detects an event it sends it off to the proper channel
// for a worker to update the client state (add/remove a node, check for
// messages at a gateway, etc). See:
// - nodes.go for add/remove node events
// - rounds.go for round event handling & processing
// - receive.go for message handling
import (
"gitlab.com/elixxir/client/context"
)
// GetUpdates polls the network for updates.
func (m *Manager) GetUpdates() (*network.Instance, error) {
return nil, nil
}
// StartTrackNetwork starts a single TrackNetwork thread and returns a stoppable
func StartTrackNetwork(ctx *context.Context) Stoppable {
stopper := ChanStop{
name: "TrackNetwork",
quit: make(chan bool),
}
go TrackNetwork(ctx, stopper.quit)
return stopper
}
// TrackNetwork polls the network to get updated on the state of nodes, the
// round status, and informs the client when messages can be retrieved.
func TrackNetwork(ctx *context.Context, quitCh chan bool) {
ticker := timer.NewTicker(ctx.GetTrackNetworkPeriod())
done := false
for !done {
select {
case <-quitCh:
done = true
case <-ticker:
trackNetwork(ctx)
}
}
}
func trackNetwork(ctx) {
gateway, err := ctx.Session.GetNodeKeys().GetGatewayForSending()
if err != nil {
//...
}
network := ctx.GetNetwork()
ndf, err := network.PollNDF(ctx, gateway)
if err != nil {
// ....
}
newNodes, removedNodes := network.UpdateNDF(ndf)
for _, n := range newNodes {
network.addNodeCh <- n
}
for _, n := range removedNodes {
network.removeNodeCh <- n
}
rounds, err = network.UpdateRounds(ctx, ndf)
if err != nil {
// ...
}
err = rounds.GetKnownRound().MaskedRange(gateway,
network.CheckRoundsFunction)
if err != nil {
// ...
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment