diff --git a/context/stoppable/single.go b/context/stoppable/single.go index acd8da0077427215934dae20b68ff9346396492c..c5eadaefa40e8070ebb4b85ae710ff65d0f8fab6 100644 --- a/context/stoppable/single.go +++ b/context/stoppable/single.go @@ -32,7 +32,7 @@ func (s *Single) IsRunning() bool { } // Quit returns the read only channel it will send the stop signal on. -func (s *Single) Quit() chan<- struct{} { +func (s *Single) Quit() <-chan struct{} { return s.quit } diff --git a/network/manager.go b/network/manager.go index 90e0184ff8dd521bafaffbcc4fb4f8a58801aba1..c8b5a4173710665924057b3661ea13c399c97561 100644 --- a/network/manager.go +++ b/network/manager.go @@ -14,8 +14,10 @@ import ( "gitlab.com/elixxir/client/context" "gitlab.com/elixxir/client/context/stoppable" "gitlab.com/elixxir/client/network/health" + "gitlab.com/elixxir/client/network/parse" "gitlab.com/elixxir/comms/client" "gitlab.com/elixxir/comms/network" + "gitlab.com/elixxir/primitives/format" "gitlab.com/xx_network/crypto/signature/rsa" "gitlab.com/xx_network/primitives/id" // "gitlab.com/xx_network/primitives/ndf" @@ -41,6 +43,9 @@ type Manager struct { //contains the network instance instance *network.Instance + //Partitioner + partitioner parse.Partitioner + //channels nodeRegistration chan network.NodeGateway @@ -74,13 +79,16 @@ func NewManager(ctx *context.Context) (*Manager, error) { " client network manager") } + msgSize := format.NewMessage(ctx.Session.Cmix().GetGroup().GetP().ByteLen()).ContentsSize() + cm := &Manager{ - Comms: comms, - Context: ctx, - runners: stoppable.NewMulti("network.Manager"), - health: health.Init(ctx, 5*time.Second), - instance: instance, - uid: cryptoUser.GetUserID(), + Comms: comms, + Context: ctx, + runners: stoppable.NewMulti("network.Manager"), + health: health.Init(ctx, 5*time.Second), + instance: instance, + uid: cryptoUser.GetUserID(), + partitioner: parse.NewPartitioner(msgSize, ctx), } return cm, nil diff --git a/network/parse/partition.go b/network/parse/partition.go index 8818e3628c3abb020bdaa48ee05e4eef9fa2630b..001c97e1b4ace9507f09e1b8bcf0edd2d3cfc1b9 100644 --- a/network/parse/partition.go +++ b/network/parse/partition.go @@ -17,10 +17,10 @@ type Partitioner struct { partContentsSize int deltaFirstPart int maxSize int - ctx context.Context + ctx *context.Context } -func NewPartitioner(messageSize int, ctx context.Context) Partitioner { +func NewPartitioner(messageSize int, ctx *context.Context) Partitioner { p := Partitioner{ baseMessageSize: messageSize, firstContentsSize: messageSize - firstHeaderLen, diff --git a/network/send.go b/network/send.go deleted file mode 100644 index 0b54f9a33f359eb7f50cdcd58e39e6d2a2511b28..0000000000000000000000000000000000000000 --- a/network/send.go +++ /dev/null @@ -1,38 +0,0 @@ -//////////////////////////////////////////////////////////////////////////////// -// Copyright © 2020 Privategrity Corporation / -// / -// All rights reserved. / -//////////////////////////////////////////////////////////////////////////////// - -package network - -import ( - "github.com/pkg/errors" - "gitlab.com/elixxir/client/context/message" - "gitlab.com/elixxir/client/context/params" - "gitlab.com/xx_network/primitives/id" -) - -// SendE2E sends an end-to-end payload to the provided recipient with -// the provided msgType. Returns the list of rounds in which parts of -// the message were sent or an error if it fails. -func (m *Manager) SendE2E(msg message.Send, e2eP params.E2E, cmixP params.CMIX) ( - []id.Round, error) { - - if !m.health.IsRunning() { - return nil, errors.New("Cannot send e2e message when the " + - "network is not healthy") - } - - return nil, nil -} - -// SendUnsafe sends an unencrypted payload to the provided recipient -// with the provided msgType. Returns the list of rounds in which parts -// of the message were sent or an error if it fails. -// NOTE: Do not use this function unless you know what you are doing. -// This function always produces an error message in client logging. -func (m *Manager) SendUnsafe(msg message.Send) ([]id.Round, error) { - return nil, nil -} - diff --git a/network/sendE2E.go b/network/sendE2E.go new file mode 100644 index 0000000000000000000000000000000000000000..24290abc1eadadd063ba89d5ec530257177d2211 --- /dev/null +++ b/network/sendE2E.go @@ -0,0 +1,96 @@ +//////////////////////////////////////////////////////////////////////////////// +// Copyright © 2020 Privategrity Corporation / +// / +// All rights reserved. / +//////////////////////////////////////////////////////////////////////////////// + +package network + +import ( + "github.com/pkg/errors" + "gitlab.com/elixxir/client/context/message" + "gitlab.com/elixxir/client/context/params" + "gitlab.com/elixxir/primitives/format" + "gitlab.com/xx_network/primitives/id" + "sync" + "time" +) + +// SendE2E sends an end-to-end payload to the provided recipient with +// the provided msgType. Returns the list of rounds in which parts of +// the message were sent or an error if it fails. +func (m *Manager) SendE2E(msg message.Send, e2eP params.E2E) ( + []id.Round, error) { + + if !m.health.IsRunning() { + return nil, errors.New("Cannot send e2e message when the " + + "network is not healthy") + } + + return m.sendE2E(msg, e2eP) +} + +func (m *Manager) sendE2E(msg message.Send, param params.E2E) ([]id.Round, error) { + + //timestamp the message + ts := time.Now() + + //partition the message + partitions, err := m.partitioner.Partition(msg.Recipient, msg.MessageType, ts, + msg.Payload) + if err != nil { + return nil, errors.WithMessage(err, "failed to send unsafe message") + } + + //encrypt then send the partitions over cmix + roundIds := make([]id.Round, len(partitions)) + errCh := make(chan error, len(partitions)) + + // get the key manager for the partner + partner, err := m.Context.Session.E2e().GetPartner(msg.Recipient) + if err != nil { + return nil, errors.WithMessagef(err, "Could not send End to End encrypted "+ + "message, no relationship found with %s", partner) + } + + wg := sync.WaitGroup{} + + for i, p := range partitions { + //create the cmix message + msgCmix := format.NewMessage(m.Context.Session.Cmix().GetGroup().GetP().ByteLen()) + msgCmix.SetContents(p) + + //get a key to end to end encrypt + key, err := partner.GetKeyForSending(param.Type) + if err != nil { + return nil, errors.WithMessagef(err, "Failed to get key "+ + "for end to end encryption") + } + + //end to end encrypt the cmix message + msgEnc := key.Encrypt(msgCmix) + + //send the cmix message, each partition in its own thread + wg.Add(1) + go func(i int) { + var err error + roundIds[i], err = m.sendCMIX(msgEnc, param.CMIX) + if err != nil { + errCh <- err + } + wg.Done() + }(i) + } + + wg.Wait() + + //see if any parts failed to send + numFail, errRtn := getSendErrors(errCh) + if numFail > 0 { + return nil, errors.Errorf("Failed to E2E send %v/%v sub payloads:"+ + " %s", numFail, len(partitions), errRtn) + } + + //return the rounds if everything send successfully + return roundIds, nil +} diff --git a/network/sendUnsafe.go b/network/sendUnsafe.go index 1ae2e9d505591696853ae1c2f171bd6c7edcfbe1..ecef2431ac7d2de4a88e3c15fa97040746dfadcc 100644 --- a/network/sendUnsafe.go +++ b/network/sendUnsafe.go @@ -1 +1,93 @@ package network + +import ( + "github.com/pkg/errors" + jww "github.com/spf13/jwalterweatherman" + "gitlab.com/elixxir/client/context/message" + "gitlab.com/elixxir/client/context/params" + "gitlab.com/elixxir/crypto/e2e" + "gitlab.com/elixxir/primitives/format" + "gitlab.com/xx_network/primitives/id" + "sync" + "time" +) + +// SendUnsafe sends an unencrypted payload to the provided recipient +// with the provided msgType. Returns the list of rounds in which parts +// of the message were sent or an error if it fails. +// NOTE: Do not use this function unless you know what you are doing. +// This function always produces an error message in client logging. +func (m *Manager) SendUnsafe(msg message.Send, param params.Unsafe) ([]id.Round, error) { + if !m.health.IsRunning() { + return nil, errors.New("cannot send unsafe message when the " + + "network is not healthy") + } + + jww.WARN.Println("Sending unsafe message. Unsafe payloads have no end" + + " to end encryption, they have limited security and privacy " + + "preserving properties") + + return m.sendUnsafe(msg, param) +} + +func (m *Manager) sendUnsafe(msg message.Send, param params.Unsafe) ([]id.Round, error) { + + //timestamp the message + ts := time.Now() + + //partition the message + partitions, err := m.partitioner.Partition(msg.Recipient, msg.MessageType, ts, + msg.Payload) + + if err != nil { + return nil, errors.WithMessage(err, "failed to send unsafe message") + } + + //send the partitions over cmix + roundIds := make([]id.Round, len(partitions)) + errCh := make(chan error, len(partitions)) + + wg := sync.WaitGroup{} + + for i, p := range partitions { + msgCmix := format.NewMessage(m.Context.Session.Cmix().GetGroup().GetP().ByteLen()) + msgCmix.SetContents(p) + e2e.SetUnencrypted(msgCmix, msg.Recipient) + wg.Add(1) + go func(i int) { + var err error + roundIds[i], err = m.sendCMIX(msgCmix, param.CMIX) + if err != nil { + errCh <- err + } + wg.Done() + }(i) + } + + //see if any parts failed to send + numFail, errRtn := getSendErrors(errCh) + if numFail > 0 { + return nil, errors.Errorf("Failed to send %v/%v sub payloads:"+ + " %s", numFail, len(partitions), errRtn) + } + + //return the rounds if everything send successfully + return roundIds, nil +} + +//returns any errors on the error channel +func getSendErrors(c chan error) (int, string) { + var errRtn string + numFail := 0 + done := false + for !done { + select { + case err := <-c: + errRtn += err.Error() + numFail++ + default: + done = true + } + } + return numFail, errRtn +} diff --git a/storage/e2e/manager.go b/storage/e2e/manager.go index 511f9c73746a9d850afa1f8bae7e73fef70eb721..69e4aa569f896c85c7af4f40a0bfa02cd1bc402b 100644 --- a/storage/e2e/manager.go +++ b/storage/e2e/manager.go @@ -9,12 +9,11 @@ package e2e import ( "fmt" "github.com/pkg/errors" - jww "github.com/spf13/jwalterweatherman" "gitlab.com/elixxir/client/context/params" "gitlab.com/elixxir/client/storage/versioned" "gitlab.com/elixxir/crypto/cyclic" - "gitlab.com/xx_network/primitives/id" dh "gitlab.com/elixxir/crypto/diffieHellman" + "gitlab.com/xx_network/primitives/id" ) const managerPrefix = "Manager{partner:%s}" @@ -121,7 +120,7 @@ func (m *Manager) NewReceiveSession(partnerPubKey *cyclic.Int, params SessionPar // none is passed func (m *Manager) NewSendSession(myPrivKey *cyclic.Int, params SessionParams) *Session { //find the latest public key from the other party - sourceSession := m.receive.GetNewestRekeyableSession() + sourceSession := m.receive.getNewestRekeyableSession() //create the session session := newSession(m, myPrivKey, sourceSession.partnerPubKey, nil, @@ -134,20 +133,20 @@ func (m *Manager) NewSendSession(myPrivKey *cyclic.Int, params SessionParams) *S } // gets the correct session to send with depending on the type of send -func (m *Manager) GetSessionForSending(st params.SendType) *Session { +func (m *Manager) GetKeyForSending(st params.SendType) (*Key, error) { switch st { case params.Standard: - return m.send.GetSessionForSending() + return m.send.getKeyForSending() case params.KeyExchange: - return m.send.GetNewestRekeyableSession() + return m.send.getKeyForRekey() default: - jww.ERROR.Printf("Cannot get session for invalid Send Type: %s", - st) } - return nil + return nil, errors.Errorf("Cannot get session for invalid "+ + "Send Type: %s", st) } + // gets the send session of the passed ID. Returns nil if no session is found func (m *Manager) GetSendSession(sessionID SessionID) *Session { return m.send.GetByID(sessionID) diff --git a/storage/e2e/sessionBuff.go b/storage/e2e/sessionBuff.go index 5f47d8412ffe2ec60c3a71ab26f96c5b43f4e095..47d4674044bb01a4073b0ca8651d9b5ebc517513 100644 --- a/storage/e2e/sessionBuff.go +++ b/storage/e2e/sessionBuff.go @@ -29,7 +29,8 @@ type sessionBuff struct { key string - mux sync.RWMutex + mux sync.RWMutex + sendMux sync.Mutex } func NewSessionBuff(manager *Manager, key string) *sessionBuff { @@ -151,14 +152,23 @@ func (sb *sessionBuff) GetNewest() *Session { return sb.sessions[0] } -// returns the session which is most likely to be successful for sending -func (sb *sessionBuff) GetSessionForSending() *Session { - //dont need to take the lock due to the use of a copy of the buffer - sessions := sb.getInternalBufferShallowCopy() - if len(sessions) == 0 { - return nil +// returns the key which is most likely to be successful for sending +func (sb *sessionBuff) getKeyForSending() (*Key, error) { + sb.sendMux.Lock() + defer sb.sendMux.Unlock() + s := sb.getSessionForSending() + if s == nil { + return nil, errors.New("Failed to find a session for sending") } + return s.PopKey() +} + + +// returns the session which is most likely to be successful for sending +func (sb *sessionBuff) getSessionForSending() *Session { + sessions := sb.sessions + var confirmedRekey []*Session var unconfirmedActive []*Session var unconfirmedRekey []*Session @@ -204,8 +214,20 @@ func (sb *sessionBuff) TriggerNegotiation() []*Session { return instructions } +// returns the key which is most likely to be successful for sending +func (sb *sessionBuff) getKeyForRekey() (*Key, error) { + sb.sendMux.Lock() + defer sb.sendMux.Unlock() + s := sb.getNewestRekeyableSession() + if s == nil { + return nil, errors.New("Failed to find a session for rekeying") + } + + return s.PopReKey() +} + // returns the newest session which can be used to start a key negotiation -func (sb *sessionBuff) GetNewestRekeyableSession() *Session { +func (sb *sessionBuff) getNewestRekeyableSession() *Session { //dont need to take the lock due to the use of a copy of the buffer sessions := sb.getInternalBufferShallowCopy()