diff --git a/README.md b/README.md index b7b0c4585e5bbc5c6d533c7db63579fde1928d32..e494e57d83614256744b08ea2adf0af371103f4d 100644 --- a/README.md +++ b/README.md @@ -166,6 +166,7 @@ Flags: --e2eMaxKeys uint Max keys used before blocking until a rekey completes (default 800) --e2eMinKeys uint Minimum number of keys used before requesting rekey (default 500) --e2eNumReKeys uint Number of rekeys reserved for rekey operations (default 16) + --e2eRekeyThreshold float64 Number between 0 an 1. Percent of keys used before a rekey is started --forceHistoricalRounds Force all rounds to be sent to historical round retrieval --forceMessagePickupRetry Enable a mechanism which forces a 50% chance of no message pickup, instead triggering the message pickup retry mechanism -h, --help help for client diff --git a/api/client.go b/api/client.go index 1e53bd0fadde17f9070806ce22d52398bc5ece0c..c705187068226c528ac1ac24f1dfd28c49f1eb8f 100644 --- a/api/client.go +++ b/api/client.go @@ -378,7 +378,7 @@ func LoginWithNewBaseNDF_UNSAFE(storageDir string, password []byte, //// procedures and is generally unsafe. func LoginWithProtoClient(storageDir string, password []byte, protoClientJSON []byte, newBaseNdf string, parameters params.Network) (*Client, error) { - jww.INFO.Printf("LoginWithNewBaseNDF_UNSAFE()") + jww.INFO.Printf("LoginWithProtoClient()") // Parse the NDF def, err := parseNDF(newBaseNdf) @@ -407,6 +407,11 @@ func LoginWithProtoClient(storageDir string, password []byte, protoClientJSON [] //store the updated base NDF c.storage.SetNDF(def) + err = c.initPermissioning(def) + if err != nil { + return nil, err + } + // Initialize network and link it to context c.network, err = network.NewManager(c.storage, c.switchboard, c.rng, c.events, c.comms, parameters, def) diff --git a/auth/callback.go b/auth/callback.go index 5925c7a4d21f705e861b55c51740744562071b62..3151fdb43de00b988d3356ea1833c9979c83cfba 100644 --- a/auth/callback.go +++ b/auth/callback.go @@ -50,7 +50,13 @@ func (m *Manager) StartProcesses() (stoppable.Stoppable, error) { func (m *Manager) processAuthMessage(msg message.Receive) { authStore := m.storage.Auth() //lookup the message, check if it is an auth request - cmixMsg := format.Unmarshal(msg.Payload) + cmixMsg, err := format.Unmarshal(msg.Payload) + if err != nil { + jww.WARN.Printf("Invalid message when unmarshalling: %s", + err.Error()) + // Ignore this message + return + } fp := cmixMsg.GetKeyFP() jww.INFO.Printf("RAW AUTH FP: %v", fp) // this takes the request lock if it is a specific fp, all diff --git a/bindings/notifications.go b/bindings/notifications.go index 7c7bc07754d662cfa3b5a3993dcbde27152712f9..71db73728f59e1fcd4d382fe79de310eddf860d4 100644 --- a/bindings/notifications.go +++ b/bindings/notifications.go @@ -11,8 +11,8 @@ import ( "encoding/json" "github.com/pkg/errors" "gitlab.com/elixxir/client/storage/edge" - pb "gitlab.com/elixxir/comms/mixmessages" "gitlab.com/elixxir/crypto/fingerprint" + "gitlab.com/elixxir/primitives/notifications" ) type NotificationForMeReport struct { @@ -67,7 +67,8 @@ func NotificationsForMe(notifCSV, preimages string) (*ManyNotificationForMeRepor "cannot check if notification is for me") } - list, err := pb.DecodeNotificationsCSV(notifCSV) + list, err := notifications.DecodeNotificationsCSV(notifCSV) + if err != nil { return nil, err } diff --git a/cmd/root.go b/cmd/root.go index d94817ec6da64b13d0e99f3a042125286ef1d6fd..28dd11f80212f4c8e1e7749020c54df2d042707b 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -573,6 +573,7 @@ func createClient() *api.Client { netParams.E2EParams.MaxKeys = uint16(viper.GetUint("e2eMaxKeys")) netParams.E2EParams.NumRekeys = uint16( viper.GetUint("e2eNumReKeys")) + netParams.E2EParams.RekeyThreshold = viper.GetFloat64("e2eRekeyThreshold") netParams.ForceHistoricalRounds = viper.GetBool("forceHistoricalRounds") netParams.FastPolling = !viper.GetBool("slowPolling") netParams.ForceMessagePickupRetry = viper.GetBool("forceMessagePickupRetry") @@ -596,6 +597,7 @@ func initClient() *api.Client { netParams.E2EParams.MaxKeys = uint16(viper.GetUint("e2eMaxKeys")) netParams.E2EParams.NumRekeys = uint16( viper.GetUint("e2eNumReKeys")) + netParams.E2EParams.RekeyThreshold = viper.GetFloat64("e2eRekeyThreshold") netParams.ForceHistoricalRounds = viper.GetBool("forceHistoricalRounds") netParams.FastPolling = viper.GetBool(" slowPolling") netParams.ForceMessagePickupRetry = viper.GetBool("forceMessagePickupRetry") @@ -1069,6 +1071,10 @@ func init() { "", uint(defaultE2EParams.NumRekeys), "Number of rekeys reserved for rekey operations") viper.BindPFlag("e2eNumReKeys", rootCmd.Flags().Lookup("e2eNumReKeys")) + rootCmd.Flags().Float64P("e2eRekeyThreshold", + "", defaultE2EParams.RekeyThreshold, + "Number between 0 an 1. Percent of keys used before a rekey is started") + viper.BindPFlag("e2eRekeyThreshold", rootCmd.Flags().Lookup("e2eRekeyThreshold")) rootCmd.Flags().String("profile-cpu", "", "Enable cpu profiling to this file") diff --git a/fileTransfer/manager_test.go b/fileTransfer/manager_test.go index b5951adaf07633ed26a95c9a844a5ee70a01b64c..c1133aa32a8f0f0e2fa09139678cdfd224e44fa2 100644 --- a/fileTransfer/manager_test.go +++ b/fileTransfer/manager_test.go @@ -84,6 +84,19 @@ func TestManager_Send(t *testing.T) { retry := float32(1.5) numFps := calcNumberOfFingerprints(numParts, retry) + rng := csprng.NewSystemRNG() + dhKey := m.store.E2e().GetGroup().NewInt(42) + pubKey := diffieHellman.GeneratePublicKey(dhKey, m.store.E2e().GetGroup()) + _, mySidhPriv := util.GenerateSIDHKeyPair(sidh.KeyVariantSidhA, rng) + theirSidhPub, _ := util.GenerateSIDHKeyPair(sidh.KeyVariantSidhB, rng) + p := params.GetDefaultE2ESessionParams() + + err := m.store.E2e().AddPartner(recipient, pubKey, dhKey, + mySidhPriv, theirSidhPub, p, p) + if err != nil { + t.Errorf("Failed to add partner %s: %+v", recipient, err) + } + tid, err := m.Send( fileName, fileType, fileData, recipient, retry, preview, nil, 0) if err != nil { @@ -244,9 +257,22 @@ func TestManager_Send_SendE2eError(t *testing.T) { preview := []byte("filePreview") retry := float32(1.5) + rng := csprng.NewSystemRNG() + dhKey := m.store.E2e().GetGroup().NewInt(42) + pubKey := diffieHellman.GeneratePublicKey(dhKey, m.store.E2e().GetGroup()) + _, mySidhPriv := util.GenerateSIDHKeyPair(sidh.KeyVariantSidhA, rng) + theirSidhPub, _ := util.GenerateSIDHKeyPair(sidh.KeyVariantSidhB, rng) + p := params.GetDefaultE2ESessionParams() + + err := m.store.E2e().AddPartner(recipient, pubKey, dhKey, + mySidhPriv, theirSidhPub, p, p) + if err != nil { + t.Errorf("Failed to add partner %s: %+v", recipient, err) + } + expectedErr := fmt.Sprintf(newFtSendE2eErr, recipient, "") - _, err := m.Send( + _, err = m.Send( fileName, fileType, fileData, recipient, retry, preview, nil, 0) if err == nil || !strings.Contains(err.Error(), expectedErr) { t.Errorf("Send did not return the expected error when the E2E message "+ diff --git a/fileTransfer/receive.go b/fileTransfer/receive.go index c1974176d183aed4ff24f9777d35e78d7f9b1753..9a70f5853212bae431653c096d9168d567008b2a 100644 --- a/fileTransfer/receive.go +++ b/fileTransfer/receive.go @@ -59,7 +59,10 @@ func (m *Manager) receive(rawMsgs chan message.Receive, stop *stoppable.Single) // error, it can be either marked as used not used. func (m *Manager) readMessage(msg message.Receive) (format.Message, error) { // Unmarshal payload into cMix message - cMixMsg := format.Unmarshal(msg.Payload) + cMixMsg, err := format.Unmarshal(msg.Payload) + if err != nil { + return cMixMsg, err + } // Unmarshal cMix message contents into a file part message partMsg, err := unmarshalPartMessage(cMixMsg.GetContents()) diff --git a/fileTransfer/sendNew.go b/fileTransfer/sendNew.go index e78b7f9c4f49ee96b55abf550e46083bf06dc2b6..99a935b737a6d0cfff495cc4e458ab1314fd3414 100644 --- a/fileTransfer/sendNew.go +++ b/fileTransfer/sendNew.go @@ -34,8 +34,18 @@ func (m *Manager) sendNewFileTransfer(recipient *id.ID, fileName, return errors.Errorf(newFtProtoMarshalErr, err) } + // Get partner relationship so that the silent preimage can be generated + relationship, err := m.store.E2e().GetPartner(recipient) + if err != nil { + return err + } + + // Sends as a silent message to avoid a notification + p := params.GetDefaultE2E() + p.CMIX.IdentityPreimage = relationship.GetSilentPreimage() + // Send E2E message - rounds, _, _, err := m.net.SendE2E(sendMsg, params.GetDefaultE2E(), nil) + rounds, _, _, err := m.net.SendE2E(sendMsg, p, nil) if err != nil && len(rounds) == 0 { return errors.Errorf(newFtSendE2eErr, recipient, err) } diff --git a/fileTransfer/sendNew_test.go b/fileTransfer/sendNew_test.go index 6465f359c41ce50b65781729777e5be3b6b3d6cc..8a4da00a6d666001813c728311496ba39fcf9a1b 100644 --- a/fileTransfer/sendNew_test.go +++ b/fileTransfer/sendNew_test.go @@ -9,9 +9,14 @@ package fileTransfer import ( "fmt" + "github.com/cloudflare/circl/dh/sidh" "github.com/golang/protobuf/proto" "gitlab.com/elixxir/client/interfaces/message" + "gitlab.com/elixxir/client/interfaces/params" + util "gitlab.com/elixxir/client/storage/utility" + "gitlab.com/elixxir/crypto/diffieHellman" ftCrypto "gitlab.com/elixxir/crypto/fileTransfer" + "gitlab.com/xx_network/crypto/csprng" "gitlab.com/xx_network/primitives/id" "reflect" "strings" @@ -31,6 +36,19 @@ func TestManager_sendNewFileTransfer(t *testing.T) { numParts, fileSize, retry := uint16(16), uint32(256), float32(1.5) preview := []byte("filePreview") + rng := csprng.NewSystemRNG() + dhKey := m.store.E2e().GetGroup().NewInt(42) + pubKey := diffieHellman.GeneratePublicKey(dhKey, m.store.E2e().GetGroup()) + _, mySidhPriv := util.GenerateSIDHKeyPair(sidh.KeyVariantSidhA, rng) + theirSidhPub, _ := util.GenerateSIDHKeyPair(sidh.KeyVariantSidhB, rng) + p := params.GetDefaultE2ESessionParams() + + err := m.store.E2e().AddPartner(recipient, pubKey, dhKey, + mySidhPriv, theirSidhPub, p, p) + if err != nil { + t.Errorf("Failed to add partner %s: %+v", recipient, err) + } + expected, err := newNewFileTransferE2eMessage(recipient, fileName, fileType, key, mac, numParts, fileSize, retry, preview) if err != nil { @@ -60,8 +78,21 @@ func TestManager_sendNewFileTransfer_E2eError(t *testing.T) { recipient := id.NewIdFromString("recipient", id.User, t) key, _ := ftCrypto.NewTransferKey(NewPrng(42)) + rng := csprng.NewSystemRNG() + dhKey := m.store.E2e().GetGroup().NewInt(42) + pubKey := diffieHellman.GeneratePublicKey(dhKey, m.store.E2e().GetGroup()) + _, mySidhPriv := util.GenerateSIDHKeyPair(sidh.KeyVariantSidhA, rng) + theirSidhPub, _ := util.GenerateSIDHKeyPair(sidh.KeyVariantSidhB, rng) + p := params.GetDefaultE2ESessionParams() + + err := m.store.E2e().AddPartner(recipient, pubKey, dhKey, + mySidhPriv, theirSidhPub, p, p) + if err != nil { + t.Errorf("Failed to add partner %s: %+v", recipient, err) + } + expectedErr := fmt.Sprintf(newFtSendE2eErr, recipient, "") - err := m.sendNewFileTransfer(recipient, "", "", key, nil, 16, 256, 1.5, nil) + err = m.sendNewFileTransfer(recipient, "", "", key, nil, 16, 256, 1.5, nil) if err == nil || !strings.Contains(err.Error(), expectedErr) { t.Errorf("sendNewFileTransfer di dnot return the expected error when "+ "SendE2E failed.\nexpected: %s\nreceived: %+v", expectedErr, err) diff --git a/fileTransfer/send_test.go b/fileTransfer/send_test.go index 45c273e90bb41657c4ff9042808bcb1497bf93be..ee10658b1fe6c190cbaa9a04879c72829a726787 100644 --- a/fileTransfer/send_test.go +++ b/fileTransfer/send_test.go @@ -1007,7 +1007,7 @@ func TestManager_getPartSize(t *testing.T) { primeByteLen := m.store.Cmix().GetGroup().GetP().ByteLen() cmixMsgUsedLen := format.AssociatedDataSize filePartMsgUsedLen := fmMinSize - expected := 2*primeByteLen - cmixMsgUsedLen - filePartMsgUsedLen + expected := 2*primeByteLen - cmixMsgUsedLen - filePartMsgUsedLen-1 // Get the part size partSize, err := m.getPartSize() diff --git a/go.mod b/go.mod index ed3be37ff1c1041ef3c5092825036211e3848c71..aee6663ff61632b61834e8ea6e9f11547915197d 100644 --- a/go.mod +++ b/go.mod @@ -19,10 +19,10 @@ require ( github.com/spf13/jwalterweatherman v1.1.0 github.com/spf13/viper v1.7.1 gitlab.com/elixxir/bloomfilter v0.0.0-20200930191214-10e9ac31b228 - gitlab.com/elixxir/comms v0.0.4-0.20211229173444-33e7e70d1a86 - gitlab.com/elixxir/crypto v0.0.7-0.20211227194530-ccd1a1c13a48 + gitlab.com/elixxir/comms v0.0.4-0.20211230230712-35eff148e814 + gitlab.com/elixxir/crypto v0.0.7-0.20211230230452-bca020488964 gitlab.com/elixxir/ekv v0.1.5 - gitlab.com/elixxir/primitives v0.0.3-0.20211229172706-598759fdea5f + gitlab.com/elixxir/primitives v0.0.3-0.20211230224340-fc0905d8776e gitlab.com/xx_network/comms v0.0.4-0.20211227194445-c099754b3cda gitlab.com/xx_network/crypto v0.0.5-0.20211227194420-f311e8920467 gitlab.com/xx_network/primitives v0.0.4-0.20211222205802-03e9d7d835b0 diff --git a/go.sum b/go.sum index cc800676886870809281d40a21a196afd18e55fc..1c5a33fa3c527fcf960bf4a2f1197db006e40e6d 100644 --- a/go.sum +++ b/go.sum @@ -270,39 +270,25 @@ github.com/zeebo/pcg v1.0.0 h1:dt+dx+HvX8g7Un32rY9XWoYnd0NmKmrIzpHF7qiTDj0= github.com/zeebo/pcg v1.0.0/go.mod h1:09F0S9iiKrwn9rlI5yjLkmrug154/YRW6KnnXVDM/l4= gitlab.com/elixxir/bloomfilter v0.0.0-20200930191214-10e9ac31b228 h1:Gi6rj4mAlK0BJIk1HIzBVMjWNjIUfstrsXC2VqLYPcA= gitlab.com/elixxir/bloomfilter v0.0.0-20200930191214-10e9ac31b228/go.mod h1:H6jztdm0k+wEV2QGK/KYA+MY9nj9Zzatux/qIvDDv3k= -gitlab.com/elixxir/comms v0.0.4-0.20211228002620-2500b6eeaf9f h1:I7f/t7yoxTGmD4lQT/BNHnUZEq1j9hMwmT/4v0ofJcA= -gitlab.com/elixxir/comms v0.0.4-0.20211228002620-2500b6eeaf9f/go.mod h1:NbCBXlCMOhf4CwkaASTcG1ysEjlWSHezxwQtiC43zPQ= -gitlab.com/elixxir/comms v0.0.4-0.20211229172415-5d1030537852 h1:rnMUMEjReDZ0APcPl9ihJMKCDwOL8o5C+ckPfpbDzIY= -gitlab.com/elixxir/comms v0.0.4-0.20211229172415-5d1030537852/go.mod h1:ZsjJQEUHZTCmvRMbdPu/vtZ6Gp/MRx3JEIWoAXWj69g= -gitlab.com/elixxir/comms v0.0.4-0.20211229173129-9c5c27d061ce h1:RlSLHFMyfT8oyBvEYerzsH5vlX/wf4F1U5/T0dIJ2AM= -gitlab.com/elixxir/comms v0.0.4-0.20211229173129-9c5c27d061ce/go.mod h1:4NeH/X0nA2QkHmqNuq+tDEq6cqRwgaHD6ggdwwZ2uHE= -gitlab.com/elixxir/comms v0.0.4-0.20211229173444-33e7e70d1a86 h1:Sy2az2aX0x74xYj1MPCetRHVtvALPGVUH9z9kG8kbyw= -gitlab.com/elixxir/comms v0.0.4-0.20211229173444-33e7e70d1a86/go.mod h1:4NeH/X0nA2QkHmqNuq+tDEq6cqRwgaHD6ggdwwZ2uHE= +gitlab.com/elixxir/comms v0.0.4-0.20211230230712-35eff148e814 h1:Zqz6MqjylcSa2wegq6EWN5KopehpyisAYSswOWHjxhE= +gitlab.com/elixxir/comms v0.0.4-0.20211230230712-35eff148e814/go.mod h1:r4xZgd+DPDujHTJZ6Y8+JXM2Ae2rZHa6rFggOPxs/Gc= gitlab.com/elixxir/crypto v0.0.0-20200804182833-984246dea2c4/go.mod h1:ucm9SFKJo+K0N2GwRRpaNr+tKXMIOVWzmyUD0SbOu2c= gitlab.com/elixxir/crypto v0.0.3/go.mod h1:ZNgBOblhYToR4m8tj4cMvJ9UsJAUKq+p0gCp07WQmhA= -gitlab.com/elixxir/crypto v0.0.6/go.mod h1:V8lricBRpa8v1ySymXQ1/lsb+8/lSak5S7ZWRT6OACY= -gitlab.com/elixxir/crypto v0.0.7-0.20211227194530-ccd1a1c13a48 h1:ryuk6ZssItz7ArQANxC3ZJ5l2w3EoxaPQHUnUNpVUSw= -gitlab.com/elixxir/crypto v0.0.7-0.20211227194530-ccd1a1c13a48/go.mod h1:0ebir69661mjvXspL0wPg8g1F0/yZm9AdPZ343OlgPU= +gitlab.com/elixxir/crypto v0.0.7-0.20211230230452-bca020488964 h1:93S9QAWB9sXWShx60hbDndYj0ZXuZGam2II4cGgDqiU= +gitlab.com/elixxir/crypto v0.0.7-0.20211230230452-bca020488964/go.mod h1:fexaw14nwGMlT6vL9eIJ1ixgiomyAp88hSHl0Yx0/xU= gitlab.com/elixxir/ekv v0.1.5 h1:R8M1PA5zRU1HVnTyrtwybdABh7gUJSCvt1JZwUSeTzk= gitlab.com/elixxir/ekv v0.1.5/go.mod h1:e6WPUt97taFZe5PFLPb1Dupk7tqmDCTQu1kkstqJvw4= gitlab.com/elixxir/primitives v0.0.0-20200731184040-494269b53b4d/go.mod h1:OQgUZq7SjnE0b+8+iIAT2eqQF+2IFHn73tOo+aV11mg= gitlab.com/elixxir/primitives v0.0.0-20200804170709-a1896d262cd9/go.mod h1:p0VelQda72OzoUckr1O+vPW0AiFe0nyKQ6gYcmFSuF8= gitlab.com/elixxir/primitives v0.0.0-20200804182913-788f47bded40/go.mod h1:tzdFFvb1ESmuTCOl1z6+yf6oAICDxH2NPUemVgoNLxc= gitlab.com/elixxir/primitives v0.0.1/go.mod h1:kNp47yPqja2lHSiS4DddTvFpB/4D9dB2YKnw5c+LJCE= -gitlab.com/elixxir/primitives v0.0.3-0.20201116174806-97f190989704/go.mod h1:3fxFHSlQhkV4vs+S0dZEz3Om3m+40WX8L806yvSnNFc= -gitlab.com/elixxir/primitives v0.0.3-0.20211227194457-8745b796d1f5/go.mod h1:ZCFCwLN5ZFfEXduWnzhRtoCEq56kLTHAmj5IDaRcwDs= -gitlab.com/elixxir/primitives v0.0.3-0.20211228002547-80bc4c20cac2 h1:sJZ8L0clBpQNTI3tHCAchKTZiuWI0MehcnQG8DPYI5M= -gitlab.com/elixxir/primitives v0.0.3-0.20211228002547-80bc4c20cac2/go.mod h1:zA+1Lp9fGPo6pl1QxtMoNPLeZJ1O5m4kcH7HNxePQnQ= -gitlab.com/elixxir/primitives v0.0.3-0.20211229171537-3c77f26aa483 h1:dRSk52OjUtq1PFhirOwJ8dRKXLhOr9odgtNsC0DSCWk= -gitlab.com/elixxir/primitives v0.0.3-0.20211229171537-3c77f26aa483/go.mod h1:zA+1Lp9fGPo6pl1QxtMoNPLeZJ1O5m4kcH7HNxePQnQ= -gitlab.com/elixxir/primitives v0.0.3-0.20211229172706-598759fdea5f h1:O4khwXGk8LlPpEhtwGz/PXYo8TY0bp/12kmd2lmPWgI= -gitlab.com/elixxir/primitives v0.0.3-0.20211229172706-598759fdea5f/go.mod h1:zA+1Lp9fGPo6pl1QxtMoNPLeZJ1O5m4kcH7HNxePQnQ= +gitlab.com/elixxir/primitives v0.0.3-0.20211230224340-fc0905d8776e h1:ne2fn3W+Nk613f4g0dvrIYuUu8029t5NyQneMeR5b6Q= +gitlab.com/elixxir/primitives v0.0.3-0.20211230224340-fc0905d8776e/go.mod h1:zA+1Lp9fGPo6pl1QxtMoNPLeZJ1O5m4kcH7HNxePQnQ= gitlab.com/xx_network/comms v0.0.0-20200805174823-841427dd5023/go.mod h1:owEcxTRl7gsoM8c3RQ5KAm5GstxrJp5tn+6JfQ4z5Hw= gitlab.com/xx_network/comms v0.0.4-0.20211227194445-c099754b3cda h1:oWl8TuAgdx/6J9lwdH8wYGSb4W6D5TG1AZkzbA8EzbE= gitlab.com/xx_network/comms v0.0.4-0.20211227194445-c099754b3cda/go.mod h1:5arueRMa2MNa6dALnfJwyZOhqhV53Gqc+tlHRz+Ycjw= gitlab.com/xx_network/crypto v0.0.3/go.mod h1:DF2HYvvCw9wkBybXcXAgQMzX+MiGbFPjwt3t17VRqRE= gitlab.com/xx_network/crypto v0.0.4/go.mod h1:+lcQEy+Th4eswFgQDwT0EXKp4AXrlubxalwQFH5O0Mk= -gitlab.com/xx_network/crypto v0.0.5-0.20201124194022-366c10b1bce0/go.mod h1:+lcQEy+Th4eswFgQDwT0EXKp4AXrlubxalwQFH5O0Mk= gitlab.com/xx_network/crypto v0.0.5-0.20211227194420-f311e8920467 h1:LkZtWBYrM2e7QRf5aaBAcy7s7CpYGhAqgXRFVCdBRy4= gitlab.com/xx_network/crypto v0.0.5-0.20211227194420-f311e8920467/go.mod h1:c+x0w3Xk6QZe5w2Redn5SiaBpqAhgNSfwBr0JGa/yyo= gitlab.com/xx_network/primitives v0.0.0-20200803231956-9b192c57ea7c/go.mod h1:wtdCMr7DPePz9qwctNoAUzZtbOSHSedcK++3Df3psjA= diff --git a/groupChat/receive.go b/groupChat/receive.go index b2915735ca0eea7e44e16ae12fc8526e197dba24..93a149c4e0e480d4d56b98123ce1627cf5962d55 100644 --- a/groupChat/receive.go +++ b/groupChat/receive.go @@ -87,8 +87,11 @@ func (m Manager) receive(rawMsgs chan message.Receive, stop *stoppable.Single) { func (m *Manager) readMessage(msg message.Receive) (gs.Group, group.MessageID, time.Time, *id.ID, []byte, bool, error) { // Unmarshal payload into cMix message - cMixMsg := format.Unmarshal(msg.Payload) - + cMixMsg, err := format.Unmarshal(msg.Payload) + if err != nil { + return gs.Group{}, group.MessageID{}, time.Time{}, nil, nil, + false, err + } // Unmarshal cMix message contents to get public message format pubMsg, err := unmarshalPublicMsg(cMixMsg.GetContents()) if err != nil { diff --git a/interfaces/params/E2E.go b/interfaces/params/E2E.go index 9d473c429b17414c34ca4bf7cc6672637d9160b5..66f5b8f989468ba4988dfeefd7dc00cd5d438510 100644 --- a/interfaces/params/E2E.go +++ b/interfaces/params/E2E.go @@ -10,7 +10,6 @@ package params import ( "encoding/json" "fmt" - "gitlab.com/elixxir/crypto/e2e" ) type E2E struct { @@ -64,35 +63,37 @@ func (st SendType) String() string { // Network E2E Params + + +type E2ESessionParams struct { + // using the DH as a seed, both sides generate a number + // of keys to use before they must rekey because + // there are no keys to use. + MinKeys uint16 + MaxKeys uint16 + // the percent of keys before a rekey is attempted. must be <0 + RekeyThreshold float64 + // extra keys generated and reserved for rekey attempts. This + // many keys are not allowed to be used for sending messages + // in order to ensure there are extras for rekeying. + NumRekeys uint16 +} + // DEFAULT KEY GENERATION PARAMETERS // Hardcoded limits for keys -// With 16 receiving states we can hold -// 16*64=1024 dirty bits for receiving keys -// With that limit, and setting maxKeys to 800, -// we need a Threshold of 224, and a scalar -// smaller than 1.28 to ensure we never generate -// more than 1024 keys -// With 1 receiving states for ReKeys we can hold -// 64 Rekeys +// sets the number of keys very high, but with a low rekey threshold. In this case, if the other party is online, you will read const ( - minKeys uint16 = 500 - maxKeys uint16 = 800 - ttlScalar float64 = 1.2 // generate 20% extra keys - threshold uint16 = 224 - numReKeys uint16 = 16 + minKeys uint16 = 1000 + maxKeys uint16 = 2000 + rekeyThrshold float64 = 0.05 + numReKeys uint16 = 16 ) -type E2ESessionParams struct { - MinKeys uint16 - MaxKeys uint16 - NumRekeys uint16 - e2e.TTLParams -} - func GetDefaultE2ESessionParams() E2ESessionParams { return E2ESessionParams{ MinKeys: minKeys, MaxKeys: maxKeys, + RekeyThreshold: rekeyThrshold, NumRekeys: numReKeys, } } diff --git a/keyExchange/rekey.go b/keyExchange/rekey.go index 9acbae9930e384b0c06140a8408c78b721797d92..773a8544c5a1d614f8b5ab1bbaab14035d3488f3 100644 --- a/keyExchange/rekey.go +++ b/keyExchange/rekey.go @@ -24,14 +24,17 @@ import ( "gitlab.com/elixxir/crypto/diffieHellman" "gitlab.com/elixxir/primitives/states" "time" + "fmt" ) func CheckKeyExchanges(instance *network.Instance, sendE2E interfaces.SendE2E, - sess *storage.Session, manager *e2e.Manager, sendTimeout time.Duration, + events interfaces.EventManager, sess *storage.Session, + manager *e2e.Manager, sendTimeout time.Duration, stop *stoppable.Single) { sessions := manager.TriggerNegotiations() for _, session := range sessions { - go trigger(instance, sendE2E, sess, manager, session, sendTimeout, stop) + go trigger(instance, sendE2E, events, sess, manager, session, + sendTimeout, stop) } } @@ -40,7 +43,8 @@ func CheckKeyExchanges(instance *network.Instance, sendE2E interfaces.SendE2E, // session. They run the same negotiation, the former does it on a newly created // session while the latter on an extant session func trigger(instance *network.Instance, sendE2E interfaces.SendE2E, - sess *storage.Session, manager *e2e.Manager, session *e2e.Session, + events interfaces.EventManager, sess *storage.Session, + manager *e2e.Manager, session *e2e.Session, sendTimeout time.Duration, stop *stoppable.Single) { var negotiatingSession *e2e.Session jww.INFO.Printf("[REKEY] Negotiation triggered for session %s with "+ @@ -66,12 +70,14 @@ func trigger(instance *network.Instance, sendE2E interfaces.SendE2E, rekeyPreimage := manager.GetSilentPreimage() // send the rekey notification to the partner - err := negotiate(instance, sendE2E, sess, negotiatingSession, sendTimeout, rekeyPreimage, stop) + err := negotiate(instance, sendE2E, sess, negotiatingSession, + sendTimeout, rekeyPreimage, stop) // if sending the negotiation fails, revert the state of the session to // unconfirmed so it will be triggered in the future if err != nil { jww.ERROR.Printf("[REKEY] Failed to do Key Negotiation with "+ "session %s: %s", session, err) + events.Report(1, "Rekey", "NegotiationFailed", err.Error()) } } @@ -155,7 +161,13 @@ func negotiate(instance *network.Instance, sendE2E interfaces.SendE2E, // in the session and the log jww.INFO.Printf("[REKEY] Key Negotiation rekey transmission for %s, msgID %s successful", session, msgID) - session.SetNegotiationStatus(e2e.Sent) - - return nil + err = session.TrySetNegotiationStatus(e2e.Sent) + if err != nil { + if (session.NegotiationStatus() == e2e.NewSessionTriggered) { + msg := fmt.Sprintf("All channels exhausted for %s, " + + "rekey impossible.", session) + return errors.WithMessage(err, msg) + } + } + return err } diff --git a/network/message/sendCmix.go b/network/message/sendCmix.go index 2126e1ac22f34fd6a40aeda155eb7f18b978de64..173fb1f8c2bbb87eb56d48472a737aff950300f9 100644 --- a/network/message/sendCmix.go +++ b/network/message/sendCmix.go @@ -98,6 +98,8 @@ func sendCmixHelper(sender *gateway.Sender, msg format.Message, for numRoundTries := uint(0); numRoundTries < cmixParams.RoundTries; numRoundTries++ { elapsed := netTime.Since(timeStart) + jww.TRACE.Printf("[SendCMIX] try %d, elapsed: %s", + numRoundTries, elapsed) if elapsed > cmixParams.Timeout { jww.INFO.Printf("No rounds to send to %s (msgDigest: %s) "+ @@ -121,6 +123,7 @@ func sendCmixHelper(sender *gateway.Sender, msg format.Message, jww.WARN.Printf("Best round on send is nil") continue } + jww.DEBUG.Printf("[sendCMIX] bestRound: %v", bestRound) // add the round on to the list of attempted, so it is not tried again attempted.Insert(bestRound.GetRoundId()) @@ -146,6 +149,9 @@ func sendCmixHelper(sender *gateway.Sender, msg format.Message, continue } + jww.DEBUG.Printf("[sendCMIX] round %v processed, firstGW: %s", + bestRound, firstGateway) + // Build the messages to send wrappedMsg, encMsg, ephID, err := buildSlotMessage(msg, recipient, @@ -154,8 +160,9 @@ func sendCmixHelper(sender *gateway.Sender, msg format.Message, return 0, ephemeral.Id{}, err } - jww.INFO.Printf("Sending to EphID %d (%s) on round %d, "+ - "(msgDigest: %s, ecrMsgDigest: %s) via gateway %s", + jww.INFO.Printf("[sendCMIX] Sending to EphID %d (%s), "+ + "on round %d (msgDigest: %s, ecrMsgDigest: %s) "+ + "via gateway %s", ephID.Int64(), recipient, bestRound.ID, msg.Digest(), encMsg.Digest(), firstGateway.String()) @@ -164,23 +171,36 @@ func sendCmixHelper(sender *gateway.Sender, msg format.Message, timeout time.Duration) (interface{}, error) { wrappedMsg.Target = target.Marshal() + jww.TRACE.Printf("[sendCMIX] sendFunc %s", host) + timeout = calculateSendTimeout(bestRound, maxTimeout) // Use the smaller of the two timeout durations calculatedTimeout := calculateSendTimeout(bestRound, maxTimeout) if calculatedTimeout < timeout { timeout = calculatedTimeout } - result, err := comms.SendPutMessage(host, wrappedMsg, timeout) + //send the message + result, err := comms.SendPutMessage(host, wrappedMsg, + timeout) + jww.TRACE.Printf("[sendCMIX] sendFunc %s putmsg", host) + if err != nil { // fixme: should we provide as a slice the whole topology? - err := handlePutMessageError(firstGateway, instance, session, nodeRegistration, recipient.String(), bestRound, err) - return result, errors.WithMessagef(err, "SendCmix %s", unrecoverableError) - + err := handlePutMessageError(firstGateway, + instance, session, nodeRegistration, + recipient.String(), bestRound, err) + jww.TRACE.Printf("[sendCMIX] sendFunc %s err %+v", + host, err) + return result, errors.WithMessagef(err, + "SendCmix %s", unrecoverableError) } return result, err } + jww.DEBUG.Printf("[sendCMIX] sendToPreferred %s", firstGateway) result, err := sender.SendToPreferred( []*id.ID{firstGateway}, sendFunc, stop, cmixParams.SendTimeout) + jww.DEBUG.Printf("[sendCMIX] sendToPreferred %s returned", + firstGateway) // Exit if the thread has been stopped if stoppable.CheckErr(err) { diff --git a/network/message/sendE2E.go b/network/message/sendE2E.go index 3751dc9ee929cceacf629843dddb9ff7eafa6ae4..73a1427355fd01a8b238f5987ed361bcc1189fa3 100644 --- a/network/message/sendE2E.go +++ b/network/message/sendE2E.go @@ -63,7 +63,8 @@ func (m *Manager) SendE2E(msg message.Send, param params.E2E, if msg.MessageType != message.KeyExchangeTrigger { // check if any rekeys need to happen and trigger them keyExchange.CheckKeyExchanges(m.Instance, m.SendE2E, - m.Session, partner, 1*time.Minute, stop) + m.Events, m.Session, partner, + 1*time.Minute, stop) } //create the cmix message diff --git a/single/collator_test.go b/single/collator_test.go index 79474123ecffa02a2cd68968579cde881bf11d6e..b940f0c7498b7d7d3ff67b92e367a6621ecc2266 100644 --- a/single/collator_test.go +++ b/single/collator_test.go @@ -33,7 +33,7 @@ func TestCollator_collate(t *testing.T) { buff := bytes.NewBuffer(expectedData) for i := 0; i < messageCount; i++ { - msgParts[i] = newResponseMessagePart(msgPayloadSize + 4) + msgParts[i] = newResponseMessagePart(msgPayloadSize + 5) msgParts[i].SetMaxParts(uint8(messageCount)) msgParts[i].SetPartNum(uint8(i)) msgParts[i].SetContents(buff.Next(msgPayloadSize)) @@ -88,7 +88,7 @@ func TestCollator_collate_UnmarshalError(t *testing.T) { // Error path: max reported parts by payload larger then set in collator func TestCollator_collate_MaxPartsError(t *testing.T) { - payloadBytes := []byte{0xFF, 0xFF, 0xFF, 0xFF} + payloadBytes := []byte{0xFF, 0xFF, 0xFF, 0xFF, 0xFF} c := newCollator(1) payload, collated, err := c.collate(payloadBytes) @@ -105,7 +105,7 @@ func TestCollator_collate_MaxPartsError(t *testing.T) { // Error path: the message part number is greater than the max number of parts. func TestCollator_collate_PartNumTooLargeError(t *testing.T) { - payloadBytes := []byte{25, 5, 5, 5} + payloadBytes := []byte{25, 5, 5, 5, 5} c := newCollator(5) payload, collated, err := c.collate(payloadBytes) @@ -122,7 +122,7 @@ func TestCollator_collate_PartNumTooLargeError(t *testing.T) { // Error path: a message with the part number already exists. func TestCollator_collate_PartExistsError(t *testing.T) { - payloadBytes := []byte{1, 5, 0, 1, 20} + payloadBytes := []byte{0, 1, 5, 0, 1, 20} c := newCollator(5) payload, collated, err := c.collate(payloadBytes) if err != nil { diff --git a/single/manager_test.go b/single/manager_test.go index e240211160072a32b9b966d893a5610103afe6f8..acffc2f62eca15feaf1c1305cbf084cdbf0e4c64 100644 --- a/single/manager_test.go +++ b/single/manager_test.go @@ -64,7 +64,7 @@ func TestManager_StartProcesses(t *testing.T) { DhPubKey: m.store.E2e().GetDHPublicKey(), } tag := "Test tag" - payload := make([]byte, 132) + payload := make([]byte, 130) rand.New(rand.NewSource(42)).Read(payload) callback, callbackChan := createReceiveComm() @@ -150,7 +150,7 @@ func TestManager_StartProcesses_Stop(t *testing.T) { DhPubKey: m.store.E2e().GetDHPublicKey(), } tag := "Test tag" - payload := make([]byte, 132) + payload := make([]byte, 130) rand.New(rand.NewSource(42)).Read(payload) callback, callbackChan := createReceiveComm() diff --git a/single/receiveResponse.go b/single/receiveResponse.go index 72a613b74acba023868fa3d34ab3c2e21b64c62a..7c27bda09a9082f171a7a4d35304b42f111c0227 100644 --- a/single/receiveResponse.go +++ b/single/receiveResponse.go @@ -70,7 +70,10 @@ func (m *Manager) processesResponse(rid *id.ID, ephID ephemeral.Id, } // Unmarshal CMIX message - cmixMsg := format.Unmarshal(msgBytes) + cmixMsg, err := format.Unmarshal(msgBytes) + if err != nil { + return err + } // Ensure the fingerprints match fp := cmixMsg.GetKeyFP() diff --git a/single/reception.go b/single/reception.go index 7616af03228ec5e643b1da4a1c20ad936d1332d9..e1aa8e08d3341e6bf3bdb4d5354d6e4cf90dfda6 100644 --- a/single/reception.go +++ b/single/reception.go @@ -35,7 +35,12 @@ func (m *Manager) receiveTransmissionHandler(rawMessages chan message.Receive, "single-use transmission.") // Check if message is a single-use transmit message - cmixMsg := format.Unmarshal(msg.Payload) + cmixMsg, err := format.Unmarshal(msg.Payload) + if err != nil { + jww.ERROR.Printf("Could not unmarshal msg: %s", + err.Error()) + continue + } if fp != cmixMsg.GetKeyFP() { // If the verification fails, then ignore the message as it is // likely garbled or for a different protocol diff --git a/single/reception_test.go b/single/reception_test.go index 27a87b9a181e94437b8a3b471492123c2451547b..efa83406dbb27791d2a209430043520b4a30c4c3 100644 --- a/single/reception_test.go +++ b/single/reception_test.go @@ -23,7 +23,7 @@ func TestManager_receiveTransmissionHandler(t *testing.T) { DhPubKey: m.store.E2e().GetDHPublicKey(), } tag := "Test tag" - payload := make([]byte, 132) + payload := make([]byte, 130) rand.New(rand.NewSource(42)).Read(payload) callback, callbackChan := createReceiveComm() @@ -91,7 +91,7 @@ func TestManager_receiveTransmissionHandler_FingerPrintError(t *testing.T) { DhPubKey: m.store.E2e().GetGroup().NewInt(42), } tag := "Test tag" - payload := make([]byte, 132) + payload := make([]byte, 130) rand.New(rand.NewSource(42)).Read(payload) callback, callbackChan := createReceiveComm() @@ -128,7 +128,7 @@ func TestManager_receiveTransmissionHandler_ProcessMessageError(t *testing.T) { DhPubKey: m.store.E2e().GetDHPublicKey(), } tag := "Test tag" - payload := make([]byte, 132) + payload := make([]byte, 130) rand.New(rand.NewSource(42)).Read(payload) callback, callbackChan := createReceiveComm() @@ -167,7 +167,7 @@ func TestManager_receiveTransmissionHandler_TagFpError(t *testing.T) { DhPubKey: m.store.E2e().GetDHPublicKey(), } tag := "Test tag" - payload := make([]byte, 132) + payload := make([]byte, 130) rand.New(rand.NewSource(42)).Read(payload) msg, _, _, _, err := m.makeTransmitCmixMessage(partner, payload, tag, 8, 32, diff --git a/single/responseMessage.go b/single/responseMessage.go index 28aec540b40c2386842c0e4eb79701d82ee78e41..72d3bbdb666f4d6153ed59cf20caa9e64b49d7fd 100644 --- a/single/responseMessage.go +++ b/single/responseMessage.go @@ -16,20 +16,23 @@ import ( const ( partNumLen = 1 maxPartsLen = 1 - responseMinSize = partNumLen + maxPartsLen + sizeSize + responseMinSize = receptionMessageVersionLen + partNumLen + maxPartsLen + sizeSize + receptionMessageVersion = 0 + receptionMessageVersionLen = 1 ) /* -+-----------------------------------------+ -| CMIX Message Contents | -+---------+----------+---------+----------+ -| partNum | maxParts | size | contents | -| 1 bytes | 1 byte | 2 bytes | variable | -+------------+----------+---------+-------+ ++---------------------------------------------------+ +| CMIX Message Contents | ++---------+----------+---------+---------+----------+ +| version | maxParts | size | partNum | contents | +| 1 bytes | 1 byte | 2 bytes | 1 bytes | variable | ++------------+----------+---------+------+----------+ */ type responseMessagePart struct { data []byte // Serial of all contents + version []byte // Version of the message partNum []byte // Index of message in a series of messages maxParts []byte // The number of parts in this message. size []byte // Size of the contents @@ -46,7 +49,9 @@ func newResponseMessagePart(externalPayloadSize int) responseMessagePart { externalPayloadSize, responseMinSize) } - return mapResponseMessagePart(make([]byte, externalPayloadSize)) + rmp := mapResponseMessagePart(make([]byte, externalPayloadSize)) + rmp.version[0] = receptionMessageVersion + return rmp } // mapResponseMessagePart builds a message part mapped to the passed in data. @@ -54,9 +59,10 @@ func newResponseMessagePart(externalPayloadSize int) responseMessagePart { func mapResponseMessagePart(data []byte) responseMessagePart { return responseMessagePart{ data: data, - partNum: data[:partNumLen], - maxParts: data[partNumLen : maxPartsLen+partNumLen], - size: data[maxPartsLen+partNumLen : responseMinSize], + version: data[:receptionMessageVersionLen], + partNum: data[receptionMessageVersionLen:receptionMessageVersionLen+partNumLen], + maxParts: data[receptionMessageVersionLen+partNumLen : receptionMessageVersionLen+maxPartsLen+partNumLen], + size: data[receptionMessageVersionLen+maxPartsLen+partNumLen : responseMinSize], contents: data[responseMinSize:], } } diff --git a/single/responseMessage_test.go b/single/responseMessage_test.go index 8871fdc898efa1b00f84c27a914dc6850bcef25c..b97aa27b544b03779bc28c32e6a9232722fd815f 100644 --- a/single/responseMessage_test.go +++ b/single/responseMessage_test.go @@ -21,10 +21,11 @@ func Test_newResponseMessagePart(t *testing.T) { payloadSize := prng.Intn(2000) expected := responseMessagePart{ data: make([]byte, payloadSize), + version: make([]byte, receptionMessageVersionLen), partNum: make([]byte, partNumLen), maxParts: make([]byte, maxPartsLen), size: make([]byte, sizeSize), - contents: make([]byte, payloadSize-partNumLen-maxPartsLen-sizeSize), + contents: make([]byte, payloadSize-partNumLen-maxPartsLen-sizeSize-receptionMessageVersionLen), } rmp := newResponseMessagePart(payloadSize) @@ -50,13 +51,14 @@ func Test_newResponseMessagePart_PayloadSizeError(t *testing.T) { // Happy path. func Test_mapResponseMessagePart(t *testing.T) { prng := rand.New(rand.NewSource(42)) + expectedVersion := uint8(0) expectedPartNum := uint8(prng.Uint32()) expectedMaxParts := uint8(prng.Uint32()) size := []byte{uint8(prng.Uint64()), uint8(prng.Uint64())} expectedContents := make([]byte, prng.Intn(2000)) prng.Read(expectedContents) var data []byte - data = append(data, expectedPartNum, expectedMaxParts) + data = append(data, expectedVersion, expectedPartNum, expectedMaxParts) data = append(data, size...) data = append(data, expectedContents...) diff --git a/single/response_test.go b/single/response_test.go index c19f472bafa42e122aea75f3f1085f3fc2f522cb..2a6c74521b593b490f3ee53f92aeff488f6bba7f 100644 --- a/single/response_test.go +++ b/single/response_test.go @@ -23,7 +23,7 @@ import ( func TestManager_GetMaxResponsePayloadSize(t *testing.T) { m := newTestManager(0, false, t) cmixPrimeSize := m.store.Cmix().GetGroup().GetP().ByteLen() - expectedSize := 2*cmixPrimeSize - format.KeyFPLen - format.MacLen - format.RecipientIDLen - responseMinSize + expectedSize := 2*cmixPrimeSize - format.KeyFPLen - format.MacLen - format.RecipientIDLen - responseMinSize-1 testSize := m.GetMaxResponsePayloadSize() if expectedSize != testSize { diff --git a/single/transmission_test.go b/single/transmission_test.go index 761f65d9fc5e757aa84710cf7f3ef476f0ae91ac..548cefa7c0be402eda3671278a4234e6fb610672 100644 --- a/single/transmission_test.go +++ b/single/transmission_test.go @@ -26,7 +26,7 @@ func TestManager_GetMaxTransmissionPayloadSize(t *testing.T) { m := newTestManager(0, false, t) cmixPrimeSize := m.store.Cmix().GetGroup().GetP().ByteLen() e2ePrimeSize := m.store.E2e().GetGroup().GetP().ByteLen() - expectedSize := 2*cmixPrimeSize - e2ePrimeSize - format.KeyFPLen - format.MacLen - format.RecipientIDLen - transmitPlMinSize + expectedSize := 2*cmixPrimeSize - e2ePrimeSize - format.KeyFPLen - format.MacLen - format.RecipientIDLen - transmitPlMinSize - transmitMessageVersionSize-1 testSize := m.GetMaxTransmissionPayloadSize() if expectedSize != testSize { @@ -260,7 +260,7 @@ func TestManager_makeTransmitCmixMessage(t *testing.T) { DhPubKey: m.store.E2e().GetGroup().NewInt(42), } tag := "Test tag" - payload := make([]byte, 132) + payload := make([]byte, 130) rand.New(rand.NewSource(42)).Read(payload) maxMsgs := uint8(8) timeNow := netTime.Now() diff --git a/single/transmitMessage.go b/single/transmitMessage.go index b3915fbe8ae43a87c3e0b7ce5fcff32e9c7adf12..6676a3ae5d754efa6bf1434e9ccb263214f23bb6 100644 --- a/single/transmitMessage.go +++ b/single/transmitMessage.go @@ -19,19 +19,23 @@ import ( ) /* -+-----------------------------------------------------------------+ -| CMIX Message Contents | -+------------+----------------------------------------------------+ -| pubKey | payload (transmitMessagePayload) | -| pubKeySize | externalPayloadSize - pubKeySize | -+------------+----------+---------+----------+---------+----------+ - | Tag FP | nonce | maxParts | size | contents | - | 16 bytes | 8 bytes | 1 byte | 2 bytes | variable | - +----------+---------+----------+---------+----------+ ++------------------------------------------------------------------------------+ +| CMIX Message Contents | ++------------+-------------------------------------------- --------+ +| Version | pubKey | payload (transmitMessagePayload) | +| 1 byte | pubKeySize | externalPayloadSize - pubKeySize | ++------------+------------+----------+---------+----------+---------+----------+ + | Tag FP | nonce | maxParts | size | contents | + | 16 bytes | 8 bytes | 1 byte | 2 bytes | variable | + +----------+---------+----------+---------+----------+ */ +const transmitMessageVersion = 0 +const transmitMessageVersionSize = 1 + type transmitMessage struct { data []byte // Serial of all contents + version []byte pubKey []byte payload []byte // The encrypted payload containing reception ID and contents } @@ -45,7 +49,10 @@ func newTransmitMessage(externalPayloadSize, pubKeySize int) transmitMessage { externalPayloadSize, pubKeySize) } - return mapTransmitMessage(make([]byte, externalPayloadSize), pubKeySize) + tm := mapTransmitMessage(make([]byte, externalPayloadSize), pubKeySize) + tm.version[0] = transmitMessageVersion + + return tm } // mapTransmitMessage builds a message mapped to the passed in data. It is @@ -53,8 +60,9 @@ func newTransmitMessage(externalPayloadSize, pubKeySize int) transmitMessage { func mapTransmitMessage(data []byte, pubKeySize int) transmitMessage { return transmitMessage{ data: data, - pubKey: data[:pubKeySize], - payload: data[pubKeySize:], + version: data[:transmitMessageVersionSize], + pubKey: data[transmitMessageVersionSize:transmitMessageVersionSize+pubKeySize], + payload: data[transmitMessageVersionSize+pubKeySize:], } } @@ -79,6 +87,11 @@ func (m transmitMessage) GetPubKey(grp *cyclic.Group) *cyclic.Int { return grp.NewIntFromBytes(m.pubKey) } +// Version returns the version of the message. +func (m transmitMessage) Version() uint8 { + return m.version[0] +} + // GetPubKeySize returns the length of the public key. func (m transmitMessage) GetPubKeySize() int { return len(m.pubKey) diff --git a/single/transmitMessage_test.go b/single/transmitMessage_test.go index 833fd5f4b91530547684e8a4f117ab22f628da43..5125526ab47e3d98e7f9e4c81306492f129a1772 100644 --- a/single/transmitMessage_test.go +++ b/single/transmitMessage_test.go @@ -27,8 +27,9 @@ func Test_newTransmitMessage(t *testing.T) { pubKeySize := prng.Intn(externalPayloadSize) expected := transmitMessage{ data: make([]byte, externalPayloadSize), + version: make([]byte, transmitMessageVersionSize), pubKey: make([]byte, pubKeySize), - payload: make([]byte, externalPayloadSize-pubKeySize), + payload: make([]byte, externalPayloadSize-pubKeySize-transmitMessageVersionSize), } m := newTransmitMessage(externalPayloadSize, pubKeySize) @@ -60,7 +61,9 @@ func Test_mapTransmitMessage(t *testing.T) { prng.Read(pubKey) payload := make([]byte, externalPayloadSize-pubKeySize) prng.Read(payload) + version := make([]byte, 1) var data []byte + data = append(data, version...) data = append(data, pubKey...) data = append(data, payload...) m := mapTransmitMessage(data, pubKeySize) @@ -137,7 +140,8 @@ func TestTransmitMessage_SetPayload_GetPayload_GetPayloadSize(t *testing.T) { prng := rand.New(rand.NewSource(42)) externalPayloadSize := prng.Intn(2000) pubKeySize := prng.Intn(externalPayloadSize) - payload := make([]byte, externalPayloadSize-pubKeySize) + payloadSize := externalPayloadSize - pubKeySize-transmitMessageVersionSize + payload := make([]byte, payloadSize) prng.Read(payload) m := newTransmitMessage(externalPayloadSize, pubKeySize) @@ -149,7 +153,7 @@ func TestTransmitMessage_SetPayload_GetPayload_GetPayloadSize(t *testing.T) { "\nexpected: %+v\nreceived: %+v", payload, testPayload) } - payloadSize := externalPayloadSize - pubKeySize + if payloadSize != m.GetPayloadSize() { t.Errorf("GetContentsSize() returned incorrect content size."+ "\nexpected: %d\nreceived: %d", payloadSize, m.GetPayloadSize()) diff --git a/storage/cmix/roundKeys_test.go b/storage/cmix/roundKeys_test.go index 0b701375bab6fee74bd509bab1d208f33dcde61b..1a360385cffa22784aef101c9b95dda900f899c2 100644 --- a/storage/cmix/roundKeys_test.go +++ b/storage/cmix/roundKeys_test.go @@ -23,28 +23,44 @@ import ( func TestRoundKeys_Encrypt_Consistency(t *testing.T) { const numKeys = 5 - expectedPayload := []byte{240, 199, 83, 226, 28, 164, 104, 139, 171, 255, 234, 86, 170, 65, 29, 254, 100, 4, 81, - 112, 154, 115, 224, 245, 29, 60, 226, 209, 135, 75, 108, 62, 95, 185, 211, 56, 83, 55, 250, 159, 173, 176, 137, - 181, 1, 155, 228, 223, 170, 232, 71, 225, 55, 27, 189, 218, 146, 74, 134, 133, 105, 17, 69, 105, 160, 60, 206, - 32, 244, 175, 98, 142, 217, 27, 92, 132, 225, 146, 171, 59, 2, 191, 220, 125, 212, 81, 114, 98, 75, 253, 93, - 126, 48, 230, 249, 118, 215, 90, 231, 126, 43, 235, 151, 191, 23, 77, 147, 98, 212, 86, 89, 42, 189, 24, 124, - 189, 201, 184, 82, 152, 255, 137, 119, 21, 74, 118, 157, 114, 229, 232, 36, 185, 104, 101, 132, 23, 79, 65, 195, - 53, 222, 27, 66, 80, 123, 252, 109, 254, 44, 120, 114, 126, 237, 159, 252, 185, 187, 95, 255, 31, 41, 245, 225, - 95, 101, 118, 190, 233, 44, 5, 42, 239, 140, 70, 216, 211, 129, 43, 189, 1, 11, 111, 2, 64, 254, 44, 87, 164, - 28, 188, 227, 1, 32, 134, 183, 156, 84, 222, 79, 27, 210, 124, 46, 153, 56, 122, 117, 17, 171, 85, 232, 112, - 170, 10, 31, 115, 17, 119, 233, 150, 200, 183, 198, 74, 70, 179, 135, 27, 195, 190, 56, 126, 143, 226, 93, 16, - 46, 147, 248, 128, 124, 182, 254, 187, 223, 187, 54, 181, 62, 89, 202, 176, 25, 249, 139, 167, 26, 98, 143, 3, - 78, 54, 116, 201, 6, 33, 158, 225, 254, 106, 15, 6, 175, 96, 2, 63, 0, 59, 188, 124, 120, 147, 95, 24, 26, 115, - 235, 154, 240, 65, 226, 133, 91, 249, 223, 55, 122, 0, 76, 225, 104, 101, 242, 46, 136, 122, 127, 159, 0, 9, - 210, 42, 181, 31, 94, 20, 106, 175, 195, 56, 223, 165, 217, 164, 93, 55, 190, 253, 192, 249, 117, 226, 222, 65, - 82, 136, 36, 58, 3, 246, 76, 101, 24, 20, 50, 89, 22, 144, 184, 38, 82, 103, 2, 48, 59, 73, 75, 58, 33, 206, 49, - 88, 201, 44, 176, 242, 248, 254, 127, 101, 62, 57, 103, 75, 213, 73, 30, 146, 223, 118, 104, 126, 189, 179, 132, - 25, 183, 178, 65, 131, 72, 121, 42, 170, 40, 186, 65, 73, 175, 234, 52, 10, 171, 36, 165, 24, 156, 12, 198, 100, - 77, 137, 91, 221, 152, 219, 207, 244, 44, 126, 178, 119, 133, 147, 158, 54, 188, 52, 10, 63, 138, 180, 44, 29, - 40, 236, 255, 163, 208, 2, 212, 184, 50, 157, 82, 199, 90, 1, 205, 214, 143, 123, 92, 210, 88, 98, 182, 197, 49, - 170, 100, 143, 145, 9, 156, 0, 45, 59, 196, 6, 8, 157, 98, 15, 111, 162, 51, 12, 223, 0, 173, 187, 178, 1, 156, - 68, 183, 64, 178, 250, 40, 65, 50, 161, 96, 163, 106, 14, 43, 179, 75, 199, 15, 223, 192, 121, 144, 223, 167, - 254, 150, 188} + expectedPayload := []byte{220, 95, 160, 88, 229, 136, 42, 254, 239, 32, + 57, 120, 7, 187, 69, 66, 199, 95, 136, 118, 130, 192, 167, 143, + 94, 80, 250, 22, 85, 47, 200, 208, 68, 179, 143, 31, 21, 215, + 17, 117, 179, 170, 67, 59, 14, 158, 116, 249, 10, 116, 166, 127, + 168, 26, 11, 41, 129, 166, 133, 135, 93, 217, 61, 99, 29, 198, + 86, 34, 83, 72, 158, 44, 178, 57, 158, 168, 107, 43, 54, 107, + 183, 16, 149, 133, 109, 166, 154, 248, 185, 218, 32, 11, 200, + 191, 240, 197, 27, 21, 82, 198, 42, 109, 79, 28, 116, 64, 34, + 44, 178, 75, 142, 79, 17, 31, 17, 196, 104, 20, 44, 125, 80, 72, + 205, 76, 23, 69, 132, 176, 180, 211, 193, 200, 175, 149, 133, 2, + 153, 114, 21, 239, 107, 46, 237, 41, 48, 188, 241, 97, 89, 65, + 213, 218, 73, 38, 213, 194, 113, 142, 203, 176, 124, 222, 172, + 128, 152, 228, 18, 128, 26, 122, 199, 192, 255, 84, 222, 165, + 77, 199, 57, 56, 7, 72, 20, 158, 133, 90, 63, 68, 145, 54, 34, + 223, 152, 157, 105, 217, 30, 111, 83, 4, 200, 125, 120, 189, + 232, 146, 130, 84, 119, 240, 144, 166, 111, 6, 56, 26, 93, 95, + 69, 225, 103, 174, 211, 204, 66, 181, 33, 198, 65, 140, 53, 255, + 37, 120, 204, 59, 128, 70, 54, 228, 26, 197, 107, 186, 22, 93, + 189, 234, 89, 217, 90, 133, 153, 189, 114, 73, 75, 55, 77, 209, + 136, 102, 193, 60, 241, 25, 101, 238, 162, 49, 94, 219, 46, 152, + 100, 120, 152, 131, 78, 128, 226, 47, 21, 253, 171, 40, 122, 161, + 69, 56, 102, 63, 89, 160, 209, 219, 142, 51, 179, 165, 243, 70, + 137, 24, 221, 105, 39, 0, 214, 201, 221, 184, 104, 165, 44, 82, + 13, 239, 197, 80, 252, 200, 115, 146, 200, 51, 63, 173, 88, 163, + 3, 214, 135, 89, 118, 99, 197, 98, 80, 176, 150, 139, 71, 6, 7, + 37, 252, 82, 225, 187, 212, 65, 4, 154, 28, 170, 224, 242, 17, + 68, 245, 73, 234, 216, 255, 2, 168, 235, 116, 147, 252, 217, 85, + 157, 38, 243, 43, 213, 250, 219, 124, 86, 155, 129, 99, 195, + 217, 163, 9, 133, 217, 6, 77, 127, 88, 168, 217, 84, 66, 224, + 90, 11, 210, 218, 215, 143, 239, 221, 138, 231, 57, 149, 175, + 221, 188, 128, 169, 28, 215, 39, 147, 36, 52, 146, 75, 20, 228, + 230, 197, 1, 80, 38, 208, 139, 4, 240, 163, 104, 158, 49, 29, + 248, 206, 79, 52, 203, 219, 178, 46, 81, 170, 100, 14, 253, 150, + 240, 191, 92, 18, 23, 94, 73, 110, 212, 237, 84, 86, 102, 32, + 78, 209, 207, 213, 117, 141, 148, 218, 209, 253, 220, 108, 135, + 163, 159, 134, 125, 6, 225, 163, 35, 115, 146, 103, 169, 152, + 251, 188, 125, 159, 185, 119, 67, 80, 92, 232, 208, 1, 32, 144, + 250, 32, 187} expectedKmacs := [][]byte{{110, 235, 79, 128, 16, 94, 181, 95, 101, 152, 187, 204, 87, 236, 211, 102, 88, 130, 191, 103, 23, 229, diff --git a/storage/conversation/message.go b/storage/conversation/message.go new file mode 100644 index 0000000000000000000000000000000000000000..748256a5d20c83a59476eb9cc91cf1e6030bba79 --- /dev/null +++ b/storage/conversation/message.go @@ -0,0 +1,135 @@ +/////////////////////////////////////////////////////////////////////////////// +// Copyright © 2020 xx network SEZC // +// // +// Use of this source code is governed by a license that can be found in the // +// LICENSE file // +/////////////////////////////////////////////////////////////////////////////// + +package conversation + +import ( + "bytes" + "encoding/base64" + "encoding/binary" + "time" +) + +// Constants for data length. +const ( + MessageIdLen = 32 + TruncatedMessageIdLen = 8 +) + +// MessageId is the ID of a message stored in a Message. +type MessageId [MessageIdLen]byte + +// truncatedMessageId represents the first64 bits of the MessageId. +type truncatedMessageId [TruncatedMessageIdLen]byte + +// A Message is the structure held in a ring buffer. +// It represents a received message by the user, which needs +// its reception verified to the original sender of the message. +type Message struct { + // id is the sequential ID of the Message in the ring buffer + id uint32 + // The ID of the message + MessageId MessageId + Timestamp time.Time +} + +// newMessage is the constructor for a Message object. +func newMessage(id uint32, mid MessageId, timestamp time.Time) *Message { + return &Message{ + id: id, + MessageId: mid, + Timestamp: timestamp, + } +} + +// marshal creates a byte buffer containing the serialized information +// of a Message. +func (m *Message) marshal() []byte { + buff := bytes.NewBuffer(nil) + + // Serialize and write the ID into a byte buffer + b := make([]byte, 4) + binary.LittleEndian.PutUint32(b, m.id) + buff.Write(b) + + // Serialize and write the MessageID into a byte buffer + buff.Write(m.MessageId.Bytes()) + + // Serialize and write the timestamp into a byte buffer + b = make([]byte, 8) + binary.LittleEndian.PutUint64(b, uint64(m.Timestamp.UnixNano())) + buff.Write(b) + + return buff.Bytes() +} + +// unmarshalMessage deserializes byte data into a Message. +func unmarshalMessage(data []byte) *Message { + buff := bytes.NewBuffer(data) + + // Deserialize the ID + ID := binary.LittleEndian.Uint32(buff.Next(4)) + + // Deserialize the message ID + midData := buff.Next(MessageIdLen) + mid := NewMessageIdFromBytes(midData) + + tsNano := binary.LittleEndian.Uint64(buff.Next(8)) + ts := time.Unix(0, int64(tsNano)) + + return &Message{ + id: ID, + MessageId: mid, + Timestamp: ts, + } + +} + +// NewMessageIdFromBytes is a constructor for MessageId +// creates a MessageId from byte data. +func NewMessageIdFromBytes(data []byte) MessageId { + mid := MessageId{} + copy(mid[:], data) + return mid +} + +// String returns a base64 encode of the MessageId. This functions +// satisfies the fmt.Stringer interface. +func (mid MessageId) String() string { + return base64.StdEncoding.EncodeToString(mid[:]) +} + +// truncate converts a MessageId into a truncatedMessageId. +func (mid MessageId) truncate() truncatedMessageId { + return newTruncatedMessageId(mid.Bytes()) +} + +// Bytes returns the byte data of the MessageId. +func (mid MessageId) Bytes() []byte { + return mid[:] +} + +// newTruncatedMessageId is a constructor for truncatedMessageId +// creates a truncatedMessageId from byte data. +func newTruncatedMessageId(data []byte) truncatedMessageId { + tmid := truncatedMessageId{} + copy(tmid[:], data) + return tmid + +} + +// String returns a base64 encode of the truncatedMessageId. This functions +// satisfies the fmt.Stringer interface. +func (tmid truncatedMessageId) String() string { + return base64.StdEncoding.EncodeToString(tmid[:]) + +} + +// Bytes returns the byte data of the truncatedMessageId. +func (tmid truncatedMessageId) Bytes() []byte { + return tmid[:] +} diff --git a/storage/conversation/message_test.go b/storage/conversation/message_test.go new file mode 100644 index 0000000000000000000000000000000000000000..eb11eabfb99e3d6fb02983af597a3355a852bf40 --- /dev/null +++ b/storage/conversation/message_test.go @@ -0,0 +1,85 @@ +/////////////////////////////////////////////////////////////////////////////// +// Copyright © 2020 xx network SEZC // +// // +// Use of this source code is governed by a license that can be found in the // +// LICENSE file // +/////////////////////////////////////////////////////////////////////////////// + +package conversation + +import ( + "bytes" + "reflect" + "testing" + "time" +) + +// TestMessage_MarshalUnmarshal tests whether a marshalled Message deserializes into +// the same Message using unmarshalMessage. +func TestMessage_MarshalUnmarshal(t *testing.T) { + timestamp := time.Date(2009, 11, 17, 20, 34, 58, 651387237, time.Local) + testId := NewMessageIdFromBytes([]byte("messageId123")) + + message := &Message{ + id: 0, + MessageId: testId, + Timestamp: timestamp, + } + + serialized := message.marshal() + + unmarshalled := unmarshalMessage(serialized) + + if !reflect.DeepEqual(unmarshalled, message) { + t.Fatalf("Unmarshal did not output expected data."+ + "\n\tExpected: %v"+ + "\n\tReceived: %v", message, unmarshalled) + } + +} + +// TestMessageId_truncate tests the MessageId truncate function. +func TestMessageId_truncate(t *testing.T) { + testId := NewMessageIdFromBytes([]byte("This is going to be 32 bytes....")) + + tmid := testId.truncate() + expected := truncatedMessageId{} + copy(expected[:], testId.Bytes()) + if len(tmid.Bytes()) != TruncatedMessageIdLen { + t.Fatalf("MessageId.Truncate() did not produce a truncatedMessageId of "+ + "TruncatedMessageIdLen (%d)."+ + "\n\tExpected: %v"+ + "\n\tReceived: %v", TruncatedMessageIdLen, expected, tmid) + } +} + +// TestNewMessageIdFromBytes tests that NewMessageIdFromBytes +// properly constructs a MessageId. +func TestNewMessageIdFromBytes(t *testing.T) { + expected := make([]byte, 0, MessageIdLen) + for i := 0; i < MessageIdLen; i++ { + expected = append(expected, byte(i)) + } + testId := NewMessageIdFromBytes(expected) + if !bytes.Equal(expected, testId.Bytes()) { + t.Fatalf("Unexpected output from NewMessageIdFromBytes."+ + "\n\tExpected: %v"+ + "\n\tReceived: %v", expected, testId.Bytes()) + } + +} + +// TestNewTruncatedMessageId tests that newTruncatedMessageId +// constructs a proper truncatedMessageId. +func TestNewTruncatedMessageId(t *testing.T) { + expected := make([]byte, 0, TruncatedMessageIdLen) + for i := 0; i < TruncatedMessageIdLen; i++ { + expected = append(expected, byte(i)) + } + testId := newTruncatedMessageId(expected) + if !bytes.Equal(expected, testId.Bytes()) { + t.Fatalf("Unexpected output from newTruncatedMessageId."+ + "\n\tExpected: %v"+ + "\n\tReceived: %v", expected, testId.Bytes()) + } +} diff --git a/storage/conversation/ring.go b/storage/conversation/ring.go new file mode 100644 index 0000000000000000000000000000000000000000..320e93d4e459f8561ebfaa6b70aa6f9c82f5a435 --- /dev/null +++ b/storage/conversation/ring.go @@ -0,0 +1,321 @@ +/////////////////////////////////////////////////////////////////////////////// +// Copyright © 2020 xx network SEZC // +// // +// Use of this source code is governed by a license that can be found in the // +// LICENSE file // +/////////////////////////////////////////////////////////////////////////////// + +package conversation + +import ( + "bytes" + "encoding/binary" + "github.com/pkg/errors" + "gitlab.com/elixxir/client/storage/versioned" + "gitlab.com/xx_network/primitives/netTime" + "math" + "sync" + "time" +) + +// Storage keys and versions. +const ( + ringBuffPrefix = "ringBuffPrefix" + ringBuffKey = "ringBuffKey" + ringBuffVersion = 0 + messageKey = "ringBuffMessageKey" + messageVersion = 0 +) + +// Error messages. +const ( + saveMessageErr = "failed to save message with message ID %s to storage: %+v" + loadMessageErr = "failed to load message with truncated ID %s from storage: %+v" + loadBuffErr = "failed to load ring buffer from storage: %+v" + noMessageFoundErr = "failed to find message with message ID %s" + lookupTooOldErr = "requested ID %d is lower than oldest id %d" + lookupPastRecentErr = "requested id %d is higher than most recent id %d" +) + +// Buff is a circular buffer which containing Message's. +type Buff struct { + buff []*Message + lookup map[truncatedMessageId]*Message + oldest, newest uint32 + mux sync.RWMutex + kv *versioned.KV +} + +// NewBuff initializes a new ring buffer with size n. +func NewBuff(kv *versioned.KV, n int) (*Buff, error) { + kv = kv.Prefix(ringBuffPrefix) + + // Construct object + rb := &Buff{ + buff: make([]*Message, n), + lookup: make(map[truncatedMessageId]*Message, n), + oldest: 0, + // Set to max int since index is unsigned. + // Upon first insert, index will overflow back to zero. + newest: math.MaxUint32, + kv: kv, + } + + // Save to storage and return + return rb, rb.save() +} + +// Add pushes a message to the circular buffer Buff. +func (rb *Buff) Add(id MessageId, timestamp time.Time) error { + rb.mux.Lock() + defer rb.mux.Unlock() + rb.push(&Message{ + MessageId: id, + Timestamp: timestamp, + }) + + return rb.save() +} + +// Get retrieves the most recent entry. +func (rb *Buff) Get() *Message { + rb.mux.RLock() + defer rb.mux.RUnlock() + + mostRecentIndex := rb.newest % uint32(len(rb.buff)) + return rb.buff[mostRecentIndex] + +} + +// GetByMessageId looks up and returns the message with MessageId id from +// Buff.lookup. If the message does not exist, an error is returned. +func (rb *Buff) GetByMessageId(id MessageId) (*Message, error) { + rb.mux.RLock() + defer rb.mux.RUnlock() + + // Look up message + msg, exists := rb.lookup[id.truncate()] + if !exists { // If message not found, return an error + return nil, errors.Errorf(noMessageFoundErr, id) + } + + // Return message if found + return msg, nil +} + +// GetNextMessage looks up the Message with the next sequential Message.id +// in the ring buffer after the Message with the requested MessageId. +func (rb *Buff) GetNextMessage(id MessageId) (*Message, error) { + rb.mux.RLock() + defer rb.mux.RUnlock() + + // Look up message + msg, exists := rb.lookup[id.truncate()] + if !exists { // If message not found, return an error + return nil, errors.Errorf(noMessageFoundErr, id) + } + + lookupId := msg.id + 1 + + // Check it's not before our first known id + if lookupId < rb.oldest { + return nil, errors.Errorf(lookupTooOldErr, id, rb.oldest) + } + + // Check it's not after our last known id + if lookupId > rb.newest { + return nil, errors.Errorf(lookupPastRecentErr, id, rb.newest) + } + + return rb.buff[(lookupId % uint32(len(rb.buff)))], nil +} + +// next is a helper function for Buff, which handles incrementing +// the old & new markers. +func (rb *Buff) next() { + rb.newest++ + if rb.newest >= uint32(len(rb.buff)) { + rb.oldest++ + } +} + +// push adds a Message to the Buff, clearing the overwritten message from +// both the buff and the lookup structures. +func (rb *Buff) push(val *Message) { + // Update circular buffer trackers + rb.next() + + val.id = rb.newest + + // Handle overwrite of the oldest message + rb.handleMessageOverwrite() + + // Set message in RAM + rb.buff[rb.newest%uint32(len(rb.buff))] = val + rb.lookup[val.MessageId.truncate()] = val + +} + +// handleMessageOverwrite is a helper function which deletes the message +// that will be overwritten by push from the lookup structure. +func (rb *Buff) handleMessageOverwrite() { + overwriteIndex := rb.newest % uint32(len(rb.buff)) + messageToOverwrite := rb.buff[overwriteIndex] + if messageToOverwrite != nil { + delete(rb.lookup, messageToOverwrite.MessageId.truncate()) + } +} + +//////////////////////////////////////////////////////////////////////////////// +// Storage Functions // +//////////////////////////////////////////////////////////////////////////////// + +// LoadBuff loads the ring buffer from storage. It loads all +// messages from storage and repopulates the buffer. +func LoadBuff(kv *versioned.KV) (*Buff, error) { + kv = kv.Prefix(ringBuffPrefix) + + // Extract ring buffer from storage + vo, err := kv.Get(ringBuffKey, ringBuffVersion) + if err != nil { + return nil, errors.Errorf(loadBuffErr, err) + } + + // Unmarshal ring buffer from data + newest, oldest, list := unmarshalBuffer(vo.Data) + + // Construct buffer + rb := &Buff{ + buff: make([]*Message, len(list)), + lookup: make(map[truncatedMessageId]*Message, len(list)), + oldest: oldest, + newest: newest, + mux: sync.RWMutex{}, + kv: kv, + } + + // Load each message from storage + for i, tmid := range list { + msg, err := loadMessage(tmid, kv) + if err != nil { + return nil, err + } + + // Place message into reconstructed buffer (RAM) + rb.lookup[tmid] = msg + rb.buff[i] = msg + } + + return rb, nil +} + +// save stores the ring buffer and its elements to storage. +// NOTE: save is unsafe, a lock should be held by the caller. +func (rb *Buff) save() error { + + // Save each message individually to storage + for _, msg := range rb.buff { + if msg != nil { + if err := rb.saveMessage(msg); err != nil { + return errors.Errorf(saveMessageErr, + msg.MessageId, err) + } + } + } + + return rb.saveBuff() +} + +// saveBuff is a function which saves the marshalled Buff. +func (rb *Buff) saveBuff() error { + obj := &versioned.Object{ + Version: ringBuffVersion, + Timestamp: netTime.Now(), + Data: rb.marshal(), + } + + return rb.kv.Set(ringBuffKey, ringBuffVersion, obj) + +} + +// marshal creates a byte buffer containing serialized information +// on the Buff. +func (rb *Buff) marshal() []byte { + // Create buffer of proper size + // (newest (4 bytes) + oldest (4 bytes) + + // (TruncatedMessageIdLen * length of buffer) + buff := bytes.NewBuffer(nil) + buff.Grow(4 + 4 + (TruncatedMessageIdLen * len(rb.lookup))) + + // Write newest index into buffer + b := make([]byte, 4) + binary.LittleEndian.PutUint32(b, uint32(rb.newest)) + buff.Write(b) + + // Write oldest index into buffer + b = make([]byte, 4) + binary.LittleEndian.PutUint32(b, uint32(rb.oldest)) + buff.Write(b) + + // Write the truncated message IDs into buffer + for _, msg := range rb.buff { + if msg != nil { + buff.Write(msg.MessageId.truncate().Bytes()) + } + } + + return buff.Bytes() +} + +// unmarshalBuffer unmarshalls a byte slice into Buff information. +func unmarshalBuffer(b []byte) (newest, oldest uint32, + list []truncatedMessageId) { + buff := bytes.NewBuffer(b) + + // Read the newest index from the buffer + newest = binary.LittleEndian.Uint32(buff.Next(4)) + + // Read the oldest index from the buffer + oldest = binary.LittleEndian.Uint32(buff.Next(4)) + + // Initialize list to the number of truncated IDs + list = make([]truncatedMessageId, 0, buff.Len()/TruncatedMessageIdLen) + + // Read each truncatedMessageId and save into list + for next := buff.Next(TruncatedMessageIdLen); len(next) == TruncatedMessageIdLen; next = buff.Next(TruncatedMessageIdLen) { + list = append(list, newTruncatedMessageId(next)) + } + + return +} + +// saveMessage saves a Message to storage, using the truncatedMessageId +// as the KV key. +func (rb *Buff) saveMessage(msg *Message) error { + obj := &versioned.Object{ + Version: messageVersion, + Timestamp: netTime.Now(), + Data: msg.marshal(), + } + + return rb.kv.Set( + makeMessageKey(msg.MessageId.truncate()), messageVersion, obj) + +} + +// loadMessage loads a message given truncatedMessageId from storage. +func loadMessage(tmid truncatedMessageId, kv *versioned.KV) (*Message, error) { + // Load message from storage + vo, err := kv.Get(makeMessageKey(tmid), messageVersion) + if err != nil { + return nil, errors.Errorf(loadMessageErr, tmid, err) + } + + // Unmarshal message + return unmarshalMessage(vo.Data), nil +} + +// makeMessageKey generates te key used to save a message to storage. +func makeMessageKey(tmid truncatedMessageId) string { + return messageKey + tmid.String() +} diff --git a/storage/conversation/ring_test.go b/storage/conversation/ring_test.go new file mode 100644 index 0000000000000000000000000000000000000000..5b1c02a0711c52ed19906ead6bb18ac25970c0d3 --- /dev/null +++ b/storage/conversation/ring_test.go @@ -0,0 +1,352 @@ +/////////////////////////////////////////////////////////////////////////////// +// Copyright © 2020 xx network SEZC // +// // +// Use of this source code is governed by a license that can be found in the // +// LICENSE file // +/////////////////////////////////////////////////////////////////////////////// + +package conversation + +import ( + "gitlab.com/elixxir/client/storage/versioned" + "gitlab.com/elixxir/ekv" + "reflect" + "strconv" + "testing" + "time" +) + +// TestNewBuff tests the creation of a Buff object. +func TestNewBuff(t *testing.T) { + // Initialize buffer + kv := versioned.NewKV(make(ekv.Memstore)) + buffLen := 20 + testBuff, err := NewBuff(kv, buffLen) + if err != nil { + t.Fatalf("NewBuff error: %v", err) + } + + /// Check buffer was initialized to expected length + if len(testBuff.buff) != buffLen { + t.Fatalf("NewBuff did not produce buffer of "+ + "expected size. "+ + "\n\tExpected: %d"+ + "\n\tReceived slice size: %v", + buffLen, len(testBuff.lookup)) + } + + // Check that buffer exists in KV + _, err = kv.Prefix(ringBuffPrefix).Get(ringBuffKey, ringBuffVersion) + if err != nil { + t.Fatalf("Could not pull Buff from KV: %v", err) + } + +} + +// TestBuff_Add tests whether Buff.Add properly adds to the Buff object. +// This includes modifying the Buff.buff, buff.lookup and proper index updates. +func TestBuff_Add(t *testing.T) { + // Initialize buffer + kv := versioned.NewKV(make(ekv.Memstore)) + buffLen := 20 + testBuff, err := NewBuff(kv, buffLen) + if err != nil { + t.Fatalf("NewBuff error: %v", err) + } + + // Insert initial message + timestamp := time.Date(2009, 11, 17, 20, 34, 58, 651387237, time.UTC) + mid := NewMessageIdFromBytes([]byte("test")) + err = testBuff.Add(mid, timestamp) + if err != nil { + t.Fatalf("Add error: %v", err) + } + + // Check that map entries exist + if len(testBuff.lookup) != 1 { + t.Fatalf("Message was not added to buffer's map") + } + + // Check that expected entry exists in the map + received, exists := testBuff.lookup[mid.truncate()] + if !exists { + t.Fatalf("Message does not exist in buffer after add.") + } + + // Reconstruct added message + expected := &Message{ + MessageId: mid, + Timestamp: timestamp, + id: 0, + } + + // Check map for inserted Message + if !reflect.DeepEqual(expected, received) { + t.Fatalf("Expected Message not found in map."+ + "\n\tExpected: %v"+ + "\n\tReceived: %v", expected, received) + } + + // Check buffer for inserted Message + if !reflect.DeepEqual(testBuff.buff[0], expected) { + t.Fatalf("Expected message not found in buffer."+ + "\n\tExpected: %v"+ + "\n\tReceived: %v", expected, testBuff.buff[0]) + } + + // Check that newest index was updated + if testBuff.newest != 0 { + t.Fatalf("Buffer's newest index was not updated to expected value."+ + "\n\tExpected: %d"+ + "\n\tReceived: %d", 0, testBuff.newest) + } +} + +// TestBuff_Add_Overflow inserts buffer length + 1 Message's to the buffer +// and ensures the oldest value is overwritten. +func TestBuff_Add_Overflow(t *testing.T) { + kv := versioned.NewKV(make(ekv.Memstore)) + buffLen := 20 + testBuff, err := NewBuff(kv, buffLen) + if err != nil { + t.Fatalf("NewBuff error: %v", err) + } + + // Insert message to be overwritten + timestamp := time.Date(2009, 11, 17, 20, 34, 58, 651387237, time.UTC) + oldest := NewMessageIdFromBytes([]byte("will be overwritten")) + err = testBuff.Add(oldest, timestamp) + if err != nil { + t.Fatalf("Add error: %v", err) + } + + // Insert buffLen elements to overwrite element inserted above + for i := 0; i < buffLen; i++ { + timestamp = time.Date(2009, 11, 17, 20, 34, 58, 651387237, time.UTC) + mid := NewMessageIdFromBytes([]byte(strconv.Itoa(i))) + err = testBuff.Add(mid, timestamp) + if err != nil { + t.Fatalf("Add error: %v", err) + } + + if testBuff.newest != uint32(i+1) { + t.Fatalf("Buffer's newest index was not updated for insert."+ + "\n\tExpected: %d"+ + "\n\tReceived: %d", i+1, testBuff.newest) + } + } + + // Test that the oldest index has been updated + if testBuff.oldest != 1 { + t.Fatalf("Buffer's oldest index was not updated to expected value."+ + "\n\tExpected: %d"+ + "\n\tReceived: %d", 1, testBuff.oldest) + } + + // Check that oldest value no longer exists in map + _, exists := testBuff.lookup[oldest.truncate()] + if exists { + t.Fatalf("Oldest value expected to be overwritten in map!") + } + +} + +// TestBuff_Get tests that Buff.Get returns the latest inserted Message. +func TestBuff_Get(t *testing.T) { + // Initialize buffer + kv := versioned.NewKV(make(ekv.Memstore)) + buffLen := 20 + testBuff, err := NewBuff(kv, buffLen) + if err != nil { + t.Fatalf("NewBuff error: %v", err) + } + + // Insert initial message + timestamp := time.Date(2009, 11, 17, 20, 34, 58, 651387237, time.UTC) + mid := NewMessageIdFromBytes([]byte("test")) + err = testBuff.Add(mid, timestamp) + if err != nil { + t.Fatalf("Add error: %v", err) + } + + // Reconstruct expected message + expected := &Message{ + MessageId: mid, + Timestamp: timestamp, + id: 0, + } + + // Retrieve newly inserted value using Get() + received := testBuff.Get() + + // Check that retrieved value is expected + if !reflect.DeepEqual(received, expected) { + t.Fatalf("Get() did not retrieve expected value."+ + "\n\tExpected: %v"+ + "\n\tReceived: %v", expected, received) + } + + // Construct new message to insert + newlyInsertedMid := NewMessageIdFromBytes([]byte("test2")) + newlyInserted := &Message{ + MessageId: newlyInsertedMid, + Timestamp: timestamp, + id: 1, + } + + // Add new message to buffer + err = testBuff.Add(newlyInsertedMid, timestamp) + if err != nil { + t.Fatalf("Add error: %v", err) + } + + // Ensure newly inserted message is returned by Get() + if !reflect.DeepEqual(testBuff.Get(), newlyInserted) { + t.Fatalf("Get() did not retrieve expected value."+ + "\n\tExpected: %v"+ + "\n\tReceived: %v", expected, received) + } + +} + +// TestBuff_GetByMessageId tests that Buff.GetByMessageId returns the Message with +// the requested MessageId. +func TestBuff_GetByMessageId(t *testing.T) { + // Initialize buffer + kv := versioned.NewKV(make(ekv.Memstore)) + buffLen := 20 + testBuff, err := NewBuff(kv, buffLen) + if err != nil { + t.Fatalf("NewBuff error: %v", err) + } + + // Insert initial message + timestamp := time.Date(2009, 11, 17, 20, 34, 58, 651387237, time.UTC) + mid := NewMessageIdFromBytes([]byte("test")) + err = testBuff.Add(mid, timestamp) + if err != nil { + t.Fatalf("Add error: %v", err) + } + + // Reconstruct expected message + expected := &Message{ + MessageId: mid, + Timestamp: timestamp, + id: 0, + } + + // Retrieve message using getter + received, err := testBuff.GetByMessageId(mid) + if err != nil { + t.Fatalf("GetMessageId error: %v", err) + } + + // Check retrieved value matches expected + if !reflect.DeepEqual(received, expected) { + t.Fatalf("GetByMessageId retrieved unexpected value."+ + "\n\tExpected: %v"+ + "\n\tReceived: %v", expected, received) + } + +} + +// TestBuff_GetByMessageId_Error tests that Buff.GetByMessageId returns an error +// when requesting a MessageId that does not exist in Buff. +func TestBuff_GetByMessageId_Error(t *testing.T) { + // Initialize buffer + kv := versioned.NewKV(make(ekv.Memstore)) + buffLen := 20 + testBuff, err := NewBuff(kv, buffLen) + if err != nil { + t.Fatalf("NewBuff error: %v", err) + } + + uninsertedMid := NewMessageIdFromBytes([]byte("test")) + + // Un-inserted MessageId should not exist in Buff, causing an error + _, err = testBuff.GetByMessageId(uninsertedMid) + if err == nil { + t.Fatalf("GetByMessageId should error when requesting a " + + "MessageId not in the buffer") + } + +} + +// TestBuff_GetNextMessage tests whether +func TestBuff_GetNextMessage(t *testing.T) { + // Initialize buffer + kv := versioned.NewKV(make(ekv.Memstore)) + buffLen := 20 + testBuff, err := NewBuff(kv, buffLen) + if err != nil { + t.Fatalf("NewBuff error: %v", err) + } + + // Insert initial message + timestamp := time.Date(2009, 11, 17, 20, 34, 58, 651387237, time.UTC) + oldMsgId := NewMessageIdFromBytes([]byte("test")) + err = testBuff.Add(oldMsgId, timestamp) + if err != nil { + t.Fatalf("Add error: %v", err) + } + + // Insert next message + nextMsgId := NewMessageIdFromBytes([]byte("test2")) + err = testBuff.Add(nextMsgId, timestamp) + if err != nil { + t.Fatalf("Add error: %v", err) + } + + // Construct expected message (the newest message) + expected := &Message{ + MessageId: nextMsgId, + Timestamp: timestamp, + id: 1, + } + + // Retrieve message after the old message + received, err := testBuff.GetNextMessage(oldMsgId) + if err != nil { + t.Fatalf("GetNextMessage error: %v", err) + } + + if !reflect.DeepEqual(expected, received) { + t.Fatalf("GetNextMessage did not retrieve expected value."+ + "\n\tExpected: %v"+ + "\n\tReceived: %v", expected, received) + } + +} + +// TestBuff_marshalUnmarshal tests that the Buff's marshal and unmarshalBuffer functionality +// are inverse methods. +func TestLoadBuff(t *testing.T) { + // Initialize buffer + kv := versioned.NewKV(make(ekv.Memstore)) + buffLen := 20 + testBuff, err := NewBuff(kv, buffLen) + if err != nil { + t.Fatalf("NewBuff error: %v", err) + } + + // Insert buffLen elements to overwrite element inserted above + for i := 0; i < buffLen; i++ { + timestamp := time.Date(2009, 11, 17, 20, 34, 58, 651387237, time.UTC) + mid := NewMessageIdFromBytes([]byte(strconv.Itoa(i))) + err = testBuff.Add(mid, timestamp) + if err != nil { + t.Fatalf("Add error: %v", err) + } + } + + // Load buffer from storage + received, err := LoadBuff(kv) + if err != nil { + t.Fatalf("LoadBuff error: %v", err) + } + + if reflect.DeepEqual(testBuff, received) { + t.Fatalf("Loaded buffer does not match stored.") + } + +} diff --git a/storage/e2e/session.go b/storage/e2e/session.go index 5f05d882b572a9f6bd9a36caaa1d18252a85d5e0..8fbcae123c8a22ab87504bdac5188e78c9139902 100644 --- a/storage/e2e/session.go +++ b/storage/e2e/session.go @@ -22,6 +22,7 @@ import ( "gitlab.com/xx_network/crypto/randomness" "gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/netTime" + "math" "math/big" "sync" "testing" @@ -637,8 +638,8 @@ func (s *Session) generate(kv *versioned.KV) *versioned.KV { int64(p.MaxKeys-p.MinKeys)), s.baseKey.Bytes(), h).Int64() + int64(p.MinKeys)) - // start rekeying when 75% of keys have been used - s.rekeyThreshold = (numKeys * 3) / 4 + // start rekeying when enough keys have been used + s.rekeyThreshold = uint32(math.Ceil(s.e2eParams.RekeyThreshold*float64(numKeys))) // the total number of keys should be the number of rekeys plus the // number of keys to use diff --git a/storage/e2e/session_test.go b/storage/e2e/session_test.go index 58b2ed87b525963863bbf9c4d8b066fd2d8a533f..2719df3c7d28ee09726c0bb9eec31682e96b865c 100644 --- a/storage/e2e/session_test.go +++ b/storage/e2e/session_test.go @@ -263,12 +263,6 @@ func cmpSerializedFields(a *Session, b *Session) error { if a.e2eParams.NumRekeys != b.e2eParams.NumRekeys { return errors.New("NumRekeys differed") } - if a.e2eParams.MinNumKeys != b.e2eParams.MinNumKeys { - return errors.New("minNumKeys differed") - } - if a.e2eParams.TTLScalar != b.e2eParams.TTLScalar { - return errors.New("ttlScalar differed") - } if a.baseKey.Cmp(b.baseKey) != 0 { return errors.New("baseKey differed") } diff --git a/storage/session.go b/storage/session.go index fea09353f9438ab0f3ca8c13094a3f307e6b7230..e54ab01fbc4a2002bac5ffa445d906ffd8710186 100644 --- a/storage/session.go +++ b/storage/session.go @@ -73,6 +73,7 @@ type Session struct { uncheckedRounds *rounds.UncheckedRoundStore hostList *hostList.Store edgeCheck *edge.Store + ringBuff *conversation.Buff } // Initialize a new Session object @@ -490,5 +491,11 @@ func InitTestingSession(i interface{}) *Session { jww.FATAL.Panicf("Failed to create new edge Store: %+v", err) } + // todo: uncomment once NewBuff has been added properly + //s.ringBuff, err = conversation.NewBuff(s.kv, 100) + //if err != nil { + // jww.FATAL.Panicf("Failed to create ring buffer store: %+v", err) + //} + return s } diff --git a/storage/utility/cmixMessageBuffer.go b/storage/utility/cmixMessageBuffer.go index 8c67658932ab64e75d91e2272f3b94c9c90f6314..e072f05d4bf1dbc7fab4ba5371c32ba786f63860 100644 --- a/storage/utility/cmixMessageBuffer.go +++ b/storage/utility/cmixMessageBuffer.go @@ -137,7 +137,11 @@ func (cmb *CmixMessageBuffer) Next() (format.Message, *id.ID, bool) { } sm := m.(storedMessage) - msg := format.Unmarshal(sm.Msg) + msg, err := format.Unmarshal(sm.Msg) + if err != nil { + jww.FATAL.Panicf("Could not unmarshal for stored cmix "+ + "message buffer: %+v", err) + } recpient, err := id.Unmarshal(sm.Recipient) if err != nil { jww.FATAL.Panicf("Could nto get an id for stored cmix "+ diff --git a/storage/utility/meteredCmixMessageBuffer.go b/storage/utility/meteredCmixMessageBuffer.go index 8060b4781fc25af49b97251abcb87981e9929cff..ed1920ff42f2cf0f122549584ef11d480d43ebbb 100644 --- a/storage/utility/meteredCmixMessageBuffer.go +++ b/storage/utility/meteredCmixMessageBuffer.go @@ -149,7 +149,11 @@ func (mcmb *MeteredCmixMessageBuffer) Next() (format.Message, uint, time.Time, b "update: %s", err) } - msfFormat := format.Unmarshal(msg.M) + msfFormat, err := format.Unmarshal(msg.M) + if err != nil { + jww.FATAL.Panicf("Failed to unmarshal message after count "+ + "update: %s", err) + } return msfFormat, rtnCnt, msg.Timestamp, true }