diff --git a/broadcast/interface.go b/broadcast/interface.go new file mode 100644 index 0000000000000000000000000000000000000000..1da049fc8807fd0d170242b4d1c0de9ff68060e4 --- /dev/null +++ b/broadcast/interface.go @@ -0,0 +1,42 @@ +//////////////////////////////////////////////////////////////////////////////// +// Copyright © 2020 xx network SEZC // +// // +// Use of this source code is governed by a license that can be found in the // +// LICENSE file // +//////////////////////////////////////////////////////////////////////////////// + +package broadcast + +import ( + "gitlab.com/elixxir/client/cmix" + "gitlab.com/elixxir/client/cmix/identity/receptionID" + "gitlab.com/elixxir/client/cmix/rounds" + crypto "gitlab.com/elixxir/crypto/broadcast" + "gitlab.com/xx_network/primitives/id" + "gitlab.com/xx_network/primitives/id/ephemeral" +) + +// ListenerFunc is registered when creating a new symmetric broadcasting channel +// and receives all new broadcast messages for the channel. +type ListenerFunc func(payload []byte, + receptionID receptionID.EphemeralIdentity, round rounds.Round) + +// Symmetric manages the listening and broadcasting of a symmetric broadcast +// channel. +type Symmetric interface { + // MaxPayloadSize returns the maximum size for a broadcasted payload. + MaxPayloadSize() int + + // Get returns the crypto.Symmetric object containing the cryptographic and + // identifying information about the channel. + Get() crypto.Symmetric + + // Broadcast broadcasts the payload to the channel. The payload size must be + // equal to MaxPayloadSize. + Broadcast(payload []byte, cMixParams cmix.CMIXParams) ( + id.Round, ephemeral.Id, error) + + // Stop unregisters the listener callback and stops the channel's identity + // from being tracked. + Stop() +} diff --git a/broadcast/processor.go b/broadcast/processor.go new file mode 100644 index 0000000000000000000000000000000000000000..419072fdc06517f57b0ec1d788d9ec284dc4acfd --- /dev/null +++ b/broadcast/processor.go @@ -0,0 +1,46 @@ +//////////////////////////////////////////////////////////////////////////////// +// Copyright © 2020 xx network SEZC // +// // +// Use of this source code is governed by a license that can be found in the // +// LICENSE file // +//////////////////////////////////////////////////////////////////////////////// + +package broadcast + +import ( + jww "github.com/spf13/jwalterweatherman" + "gitlab.com/elixxir/client/cmix/identity/receptionID" + "gitlab.com/elixxir/client/cmix/rounds" + crypto "gitlab.com/elixxir/crypto/broadcast" + "gitlab.com/elixxir/primitives/format" +) + +// Error messages. +const ( + errDecrypt = "[BCAST] Failed to decrypt payload for broadcast %s (%q): %+v" +) + +// processor manages the reception and decryption of a broadcast message. +// Adheres to the message.Processor interface. +type processor struct { + s *crypto.Symmetric + cb ListenerFunc +} + +// Process decrypts the broadcast message and sends the results on the callback. +func (p *processor) Process(msg format.Message, + receptionID receptionID.EphemeralIdentity, round rounds.Round) { + + payload, err := p.s.Decrypt(msg.GetContents(), msg.GetMac(), msg.GetKeyFP()) + if err != nil { + jww.ERROR.Printf(errDecrypt, p.s.ReceptionID, p.s.Name, err) + return + } + + go p.cb(payload, receptionID, round) +} + +// String returns a string identifying the processor for debugging purposes. +func (p *processor) String() string { + return "symmetricChannel-" + p.s.Name +} diff --git a/broadcast/processor_test.go b/broadcast/processor_test.go new file mode 100644 index 0000000000000000000000000000000000000000..04e2f4a791485e7b5b7410f62785164d9d18f753 --- /dev/null +++ b/broadcast/processor_test.go @@ -0,0 +1,69 @@ +//////////////////////////////////////////////////////////////////////////////// +// Copyright © 2020 xx network SEZC // +// // +// Use of this source code is governed by a license that can be found in the // +// LICENSE file // +//////////////////////////////////////////////////////////////////////////////// + +package broadcast + +import ( + "bytes" + "gitlab.com/elixxir/client/cmix/identity/receptionID" + "gitlab.com/elixxir/client/cmix/rounds" + crypto "gitlab.com/elixxir/crypto/broadcast" + "gitlab.com/elixxir/crypto/cmix" + "gitlab.com/elixxir/primitives/format" + "gitlab.com/xx_network/crypto/csprng" + "gitlab.com/xx_network/crypto/signature/rsa" + "gitlab.com/xx_network/primitives/id" + "testing" + "time" +) + +// Tests that process.Process properly decrypts the payload and passes it to the +// callback. +func Test_processor_Process(t *testing.T) { + rng := csprng.NewSystemRNG() + rsaPrivKey, err := rsa.GenerateKey(rng, 64) + if err != nil { + t.Errorf("Failed to generate RSA key: %+v", err) + } + s := &crypto.Symmetric{ + ReceptionID: id.NewIdFromString("channel", id.User, t), + Name: "MyChannel", + Description: "This is my channel that I channel stuff on.", + Salt: cmix.NewSalt(rng, 32), + RsaPubKey: rsaPrivKey.GetPublic(), + } + + cbChan := make(chan []byte) + cb := func(payload []byte, _ receptionID.EphemeralIdentity, _ rounds.Round) { + cbChan <- payload + } + + p := &processor{ + s: s, + cb: cb, + } + + msg := format.NewMessage(4092) + payload := make([]byte, msg.ContentsSize()) + _, _ = rng.Read(payload) + encryptedPayload, mac, fp := p.s.Encrypt(payload, rng) + msg.SetContents(encryptedPayload) + msg.SetMac(mac) + msg.SetKeyFP(fp) + + p.Process(msg, receptionID.EphemeralIdentity{}, rounds.Round{}) + + select { + case r := <-cbChan: + if !bytes.Equal(r, payload) { + t.Errorf("Did not receive expected payload."+ + "\nexpected: %v\nreceived: %v", payload, r) + } + case <-time.After(15 * time.Millisecond): + t.Error("Timed out waiting for listener channel to be called.") + } +} diff --git a/broadcast/sizedBroadcast.go b/broadcast/sizedBroadcast.go new file mode 100644 index 0000000000000000000000000000000000000000..e909ab30014ca926ad03a9040d8e5bbc9211ca29 --- /dev/null +++ b/broadcast/sizedBroadcast.go @@ -0,0 +1,75 @@ +//////////////////////////////////////////////////////////////////////////////// +// Copyright © 2020 xx network SEZC // +// // +// Use of this source code is governed by a license that can be found in the // +// LICENSE file // +//////////////////////////////////////////////////////////////////////////////// + +package broadcast + +import ( + "encoding/binary" + "github.com/pkg/errors" +) + +// Message field sizes. +const ( + sizeSize = 2 + sizedBroadcastMinSize = sizeSize +) + +// Error messages. +const ( + // NewSizedBroadcast + errNewSizedBroadcastMaxSize = "size of payload and its size %d too large to fit in max payload size %d" + + // DecodeSizedBroadcast + errDecodeSizedBroadcastDataLen = "size of data %d must be greater than %d" + errDecodeSizedBroadcastSize = "stated payload size %d larger than provided data %d" +) + +/* ++---------------------------+ +| cMix Message Contents | ++---------+-----------------+ +| Size | Payload | +| 2 bytes | remaining space | ++---------+-----------------+ +*/ + +// NewSizedBroadcast creates a new broadcast with its size information embedded. +// The maxPayloadSize is the maximum size of the resulting payload. Returns an +// error when the sized broadcast cannot fit in the max payload size. +func NewSizedBroadcast(maxPayloadSize int, payload []byte) ([]byte, error) { + if len(payload)+sizedBroadcastMinSize > maxPayloadSize { + return nil, errors.Errorf(errNewSizedBroadcastMaxSize, + len(payload)+sizedBroadcastMinSize, maxPayloadSize) + } + + b := make([]byte, sizeSize) + binary.LittleEndian.PutUint16(b, uint16(len(payload))) + + return append(b, payload...), nil +} + +// DecodeSizedBroadcast the data into its original payload of the correct size. +func DecodeSizedBroadcast(data []byte) ([]byte, error) { + if len(data) < sizedBroadcastMinSize { + return nil, errors.Errorf( + errDecodeSizedBroadcastDataLen, len(data), sizedBroadcastMinSize) + } + + size := binary.LittleEndian.Uint16(data[:sizeSize]) + if int(size) > len(data[sizeSize:]) { + return nil, errors.Errorf( + errDecodeSizedBroadcastSize, size, len(data[sizeSize:])) + } + + return data[sizeSize : size+sizeSize], nil +} + +// MaxSizedBroadcastPayloadSize returns the maximum payload size in a sized +// broadcast for the given out message max payload size. +func MaxSizedBroadcastPayloadSize(maxPayloadSize int) int { + return maxPayloadSize - sizedBroadcastMinSize +} diff --git a/broadcast/sizedBroadcast_test.go b/broadcast/sizedBroadcast_test.go new file mode 100644 index 0000000000000000000000000000000000000000..a065101498b5bb6fe22ba66804f85190b8aab887 --- /dev/null +++ b/broadcast/sizedBroadcast_test.go @@ -0,0 +1,115 @@ +//////////////////////////////////////////////////////////////////////////////// +// Copyright © 2020 xx network SEZC // +// // +// Use of this source code is governed by a license that can be found in the // +// LICENSE file // +//////////////////////////////////////////////////////////////////////////////// + +package broadcast + +import ( + "bytes" + "fmt" + "testing" +) + +// Tests that a payload smaller than the max payload size encoded via +// NewSizedBroadcast and decoded via DecodeSizedBroadcast matches the original. +func TestNewSizedBroadcast_DecodeSizedBroadcast_SmallPayload(t *testing.T) { + const maxPayloadSize = 512 + payload := []byte("This is my payload message.") + + data, err := NewSizedBroadcast(maxPayloadSize, payload) + if err != nil { + t.Errorf("NewSizedBroadcast returned an error: %+v", err) + } + + decodedPayload, err := DecodeSizedBroadcast(data) + if err != nil { + t.Errorf("DecodeSizedBroadcast returned an error: %+v", err) + } + + if !bytes.Equal(payload, decodedPayload) { + t.Errorf("Decoded payload does not match original."+ + "\nexpected: %q\nreceived: %q", payload, decodedPayload) + } +} + +// Tests that a payload the same size as the max payload size encoded via +// NewSizedBroadcast and decoded via DecodeSizedBroadcast matches the original. +func TestNewSizedBroadcast_DecodeSizedBroadcast_FullSizesPayload(t *testing.T) { + payload := []byte("This is my payload message.") + maxPayloadSize := len(payload) + sizeSize + + data, err := NewSizedBroadcast(maxPayloadSize, payload) + if err != nil { + t.Errorf("NewSizedBroadcast returned an error: %+v", err) + } + + decodedPayload, err := DecodeSizedBroadcast(data) + if err != nil { + t.Errorf("DecodeSizedBroadcast returned an error: %+v", err) + } + + if !bytes.Equal(payload, decodedPayload) { + t.Errorf("Decoded payload does not match original."+ + "\nexpected: %q\nreceived: %q", payload, decodedPayload) + } +} + +// Error path: tests that NewSizedBroadcast returns an error when the payload is +// larger than the max payload size. +func TestNewSizedBroadcast_MaxPayloadSizeError(t *testing.T) { + payload := []byte("This is my payload message.") + maxPayloadSize := len(payload) + expectedErr := fmt.Sprintf(errNewSizedBroadcastMaxSize, + len(payload)+sizedBroadcastMinSize, maxPayloadSize) + + _, err := NewSizedBroadcast(maxPayloadSize, payload) + if err == nil || err.Error() != expectedErr { + t.Errorf("NewSizedBroadcast did not return the expected error when "+ + "the payload is too large.\nexpected: %s\nreceived: %+v", + expectedErr, err) + } +} + +// Error path: tests that DecodeSizedBroadcast returns an error when the length +// of the data is shorter than the minimum length of a sized broadcast. +func TestDecodeSizedBroadcast_DataTooShortError(t *testing.T) { + data := []byte{0} + expectedErr := fmt.Sprintf( + errDecodeSizedBroadcastDataLen, len(data), sizedBroadcastMinSize) + + _, err := DecodeSizedBroadcast(data) + if err == nil || err.Error() != expectedErr { + t.Errorf("DecodeSizedBroadcast did not return the expected error "+ + "when the data is too small.\nexpected: %s\nreceived: %+v", + expectedErr, err) + } +} + +// Error path: tests that DecodeSizedBroadcast returns an error when the payload +// size is larger than the actual payload contained in the data. +func TestDecodeSizedBroadcast_SizeMismatchError(t *testing.T) { + data := []byte{255, 0, 10} + expectedErr := fmt.Sprintf( + errDecodeSizedBroadcastSize, data[0], len(data[sizeSize:])) + + _, err := DecodeSizedBroadcast(data) + if err == nil || err.Error() != expectedErr { + t.Errorf("DecodeSizedBroadcast did not return the expected error "+ + "when the size is too large.\nexpected: %s\nreceived: %+v", + expectedErr, err) + } +} + +// Tests that MaxSizedBroadcastPayloadSize returns the correct max size. +func TestMaxSizedBroadcastPayloadSize(t *testing.T) { + maxPayloadSize := 512 + expectedSize := maxPayloadSize - sizedBroadcastMinSize + receivedSize := MaxSizedBroadcastPayloadSize(maxPayloadSize) + if receivedSize != expectedSize { + t.Errorf("Incorrect max paylaod size.\nexpected: %d\nreceived: %d", + expectedSize, receivedSize) + } +} diff --git a/broadcast/symmetricClient.go b/broadcast/symmetricClient.go new file mode 100644 index 0000000000000000000000000000000000000000..28d718868a0659f9f2816f7fd2b16a4feb9cb116 --- /dev/null +++ b/broadcast/symmetricClient.go @@ -0,0 +1,143 @@ +//////////////////////////////////////////////////////////////////////////////// +// Copyright © 2020 xx network SEZC // +// // +// Use of this source code is governed by a license that can be found in the // +// LICENSE file // +//////////////////////////////////////////////////////////////////////////////// + +package broadcast + +import ( + "github.com/pkg/errors" + jww "github.com/spf13/jwalterweatherman" + "gitlab.com/elixxir/client/cmix" + "gitlab.com/elixxir/client/cmix/identity" + "gitlab.com/elixxir/client/cmix/message" + crypto "gitlab.com/elixxir/crypto/broadcast" + "gitlab.com/elixxir/crypto/fastRNG" + "gitlab.com/elixxir/primitives/format" + "gitlab.com/xx_network/primitives/id" + "gitlab.com/xx_network/primitives/id/ephemeral" + "time" +) + +// Error messages. +const ( + // symmetricClient.Broadcast + errNetworkHealth = "cannot send broadcast when the network is not healthy" + errPayloadSize = "size of payload %d must be %d" +) + +// Tags. +const ( + cMixSendTag = "SymmBcast" + symmetricBroadcastServiceTag = "SymmetricBroadcast" +) + +// symmetricClient manages the sending and receiving of symmetric broadcast +// messages on a given symmetric broadcast channel. Adheres to the Symmetric +// interface. +type symmetricClient struct { + channel crypto.Symmetric + net Client + rng *fastRNG.StreamGenerator +} + +// Client contains the methods from cmix.Client that are required by +// symmetricClient. +type Client interface { + GetMaxMessageLength() int + Send(recipient *id.ID, fingerprint format.Fingerprint, + service message.Service, payload, mac []byte, + cMixParams cmix.CMIXParams) (id.Round, ephemeral.Id, error) + IsHealthy() bool + AddIdentity(id *id.ID, validUntil time.Time, persistent bool) + AddService(clientID *id.ID, newService message.Service, + response message.Processor) + DeleteClientService(clientID *id.ID) + RemoveIdentity(id *id.ID) +} + +// NewSymmetricClient generates a new Symmetric for the given channel. It starts +// listening for new messages on the callback immediately. +func NewSymmetricClient(channel crypto.Symmetric, listenerCb ListenerFunc, + net Client, rng *fastRNG.StreamGenerator) Symmetric { + // Add channel's identity + net.AddIdentity(channel.ReceptionID, identity.Forever, true) + + // Create new service + service := message.Service{ + Identifier: channel.ReceptionID.Bytes(), + Tag: symmetricBroadcastServiceTag, + } + + // Create new message processor + p := &processor{ + s: &channel, + cb: listenerCb, + } + + // Add service + net.AddService(channel.ReceptionID, service, p) + + jww.INFO.Printf("New symmetric broadcast client created for channel %q (%s)", + channel.Name, channel.ReceptionID) + + return &symmetricClient{ + channel: channel, + net: net, + rng: rng, + } +} + +// MaxPayloadSize returns the maximum size for a broadcasted payload. +func (s *symmetricClient) MaxPayloadSize() int { + return s.net.GetMaxMessageLength() +} + +// Get returns the crypto.Symmetric object containing the cryptographic and +// identifying information about the channel. +func (s *symmetricClient) Get() crypto.Symmetric { + return s.channel +} + +// Broadcast broadcasts the payload to the channel. +func (s *symmetricClient) Broadcast(payload []byte, cMixParams cmix.CMIXParams) ( + id.Round, ephemeral.Id, error) { + if !s.net.IsHealthy() { + return 0, ephemeral.Id{}, errors.New(errNetworkHealth) + } + + if len(payload) != s.MaxPayloadSize() { + return 0, ephemeral.Id{}, + errors.Errorf(errPayloadSize, len(payload), s.MaxPayloadSize()) + } + + // Encrypt payload + rng := s.rng.GetStream() + encryptedPayload, mac, fp := s.channel.Encrypt(payload, rng) + rng.Close() + + // Create service + service := message.Service{ + Identifier: s.channel.ReceptionID.Bytes(), + Tag: symmetricBroadcastServiceTag, + } + + if cMixParams.DebugTag == cmix.DefaultDebugTag { + cMixParams.DebugTag = cMixSendTag + } + + return s.net.Send( + s.channel.ReceptionID, fp, service, encryptedPayload, mac, cMixParams) +} + +// Stop unregisters the listener callback and stops the channel's identity +// from being tracked. +func (s *symmetricClient) Stop() { + // Removes currently tracked identity + s.net.RemoveIdentity(s.channel.ReceptionID) + + // Delete all registered services + s.net.DeleteClientService(s.channel.ReceptionID) +} diff --git a/broadcast/symmetricClient_test.go b/broadcast/symmetricClient_test.go new file mode 100644 index 0000000000000000000000000000000000000000..4b6f60ac89fb1b7a06e5ce1a3fa7398720848250 --- /dev/null +++ b/broadcast/symmetricClient_test.go @@ -0,0 +1,135 @@ +//////////////////////////////////////////////////////////////////////////////// +// Copyright © 2020 xx network SEZC // +// // +// Use of this source code is governed by a license that can be found in the // +// LICENSE file // +//////////////////////////////////////////////////////////////////////////////// + +package broadcast + +import ( + "bytes" + "fmt" + "gitlab.com/elixxir/client/cmix" + "gitlab.com/elixxir/client/cmix/identity/receptionID" + "gitlab.com/elixxir/client/cmix/rounds" + crypto "gitlab.com/elixxir/crypto/broadcast" + cMixCrypto "gitlab.com/elixxir/crypto/cmix" + "gitlab.com/elixxir/crypto/fastRNG" + "gitlab.com/xx_network/crypto/csprng" + "gitlab.com/xx_network/primitives/id" + "reflect" + "sync" + "testing" + "time" +) + +// Tests that symmetricClient adheres to the Symmetric interface. +var _ Symmetric = (*symmetricClient)(nil) + +// Tests that symmetricClient adheres to the Symmetric interface. +var _ Client = (cmix.Client)(nil) + +// Tests that all clients listening on a symmetric broadcast channel receive the +// message that is broadcasted. +func Test_symmetricClient_Smoke(t *testing.T) { + // Initialise objects used by all clients + cMixHandler := newMockCmixHandler() + rngGen := fastRNG.NewStreamGenerator(1000, 10, csprng.NewSystemRNG) + channel := crypto.Symmetric{ + ReceptionID: id.NewIdFromString("ReceptionID", id.User, t), + Name: "MyChannel", + Description: "This is my channel about stuff.", + Salt: cMixCrypto.NewSalt(csprng.NewSystemRNG(), 32), + RsaPubKey: newRsaPubKey(64, t), + } + + // Set up callbacks, callback channels, and the symmetric clients + const n = 5 + cbChans := make([]chan []byte, n) + clients := make([]Symmetric, n) + for i := range clients { + cbChan := make(chan []byte, 10) + cb := func(payload []byte, _ receptionID.EphemeralIdentity, + _ rounds.Round) { + cbChan <- payload + } + + s := NewSymmetricClient(channel, cb, newMockCmix(cMixHandler), rngGen) + + cbChans[i] = cbChan + clients[i] = s + + // Test that Get returns the expected channel + if !reflect.DeepEqual(s.Get(), channel) { + t.Errorf("Client %d returned wrong channel."+ + "\nexpected: %+v\nreceived: %+v", i, channel, s.Get()) + } + } + + // Send broadcast from each client + for i := range clients { + payload := make([]byte, newMockCmix(cMixHandler).GetMaxMessageLength()) + copy(payload, + fmt.Sprintf("Hello from client %d of %d.", i, len(clients))) + + // Start processes that waits for each client to receive broadcast + var wg sync.WaitGroup + for j := range cbChans { + wg.Add(1) + go func(i, j int, cbChan chan []byte) { + defer wg.Done() + select { + case r := <-cbChan: + if !bytes.Equal(payload, r) { + t.Errorf("Client %d failed to receive expected "+ + "payload from client %d."+ + "\nexpected: %q\nreceived: %q", j, i, payload, r) + } + case <-time.After(25 * time.Millisecond): + t.Errorf("Client %d timed out waiting for broadcast "+ + "payload from client %d.", j, i) + } + }(i, j, cbChans[j]) + } + + // Broadcast payload + _, _, err := clients[i].Broadcast(payload, cmix.GetDefaultCMIXParams()) + if err != nil { + t.Errorf("Client %d failed to send broadcast: %+v", i, err) + } + + // Wait for all clients to receive payload or time out + wg.Wait() + } + + // Stop each client + for i := range clients { + clients[i].Stop() + } + + payload := make([]byte, newMockCmix(cMixHandler).GetMaxMessageLength()) + copy(payload, "This message should not get through.") + + // Start waiting on channels and error if anything is received + var wg sync.WaitGroup + for i := range cbChans { + wg.Add(1) + go func(i int, cbChan chan []byte) { + defer wg.Done() + select { + case r := <-cbChan: + t.Errorf("Client %d received message: %q", i, r) + case <-time.After(25 * time.Millisecond): + } + }(i, cbChans[i]) + } + + // Broadcast payload + _, _, err := clients[0].Broadcast(payload, cmix.GetDefaultCMIXParams()) + if err != nil { + t.Errorf("Client 0 failed to send broadcast: %+v", err) + } + + wg.Wait() +} diff --git a/broadcast/utils_test.go b/broadcast/utils_test.go new file mode 100644 index 0000000000000000000000000000000000000000..944fd3c3993dce55cfe153b8b1c29cd828b66c5c --- /dev/null +++ b/broadcast/utils_test.go @@ -0,0 +1,130 @@ +//////////////////////////////////////////////////////////////////////////////// +// Copyright © 2020 xx network SEZC // +// // +// Use of this source code is governed by a license that can be found in the // +// LICENSE file // +//////////////////////////////////////////////////////////////////////////////// + +package broadcast + +import ( + "gitlab.com/elixxir/client/cmix" + "gitlab.com/elixxir/client/cmix/identity/receptionID" + "gitlab.com/elixxir/client/cmix/message" + "gitlab.com/elixxir/client/cmix/rounds" + "gitlab.com/elixxir/primitives/format" + "gitlab.com/xx_network/crypto/signature/rsa" + "gitlab.com/xx_network/primitives/id" + "gitlab.com/xx_network/primitives/id/ephemeral" + "math/rand" + "sync" + "testing" + "time" +) + +// newRsaPubKey generates a new random RSA public key for testing. +func newRsaPubKey(seed int64, t *testing.T) *rsa.PublicKey { + prng := rand.New(rand.NewSource(seed)) + privKey, err := rsa.GenerateKey(prng, 64) + if err != nil { + t.Errorf("Failed to generate new RSA key: %+v", err) + } + + return privKey.GetPublic() +} + +//////////////////////////////////////////////////////////////////////////////// +// Mock cMix Client // +//////////////////////////////////////////////////////////////////////////////// + +type mockCmixHandler struct { + processorMap map[id.ID]map[string][]message.Processor + sync.Mutex +} + +func newMockCmixHandler() *mockCmixHandler { + return &mockCmixHandler{ + processorMap: make(map[id.ID]map[string][]message.Processor), + } +} + +type mockCmix struct { + numPrimeBytes int + health bool + handler *mockCmixHandler +} + +func newMockCmix(handler *mockCmixHandler) *mockCmix { + return &mockCmix{ + numPrimeBytes: 4096, + health: true, + handler: handler, + } +} + +func (m *mockCmix) GetMaxMessageLength() int { + return format.NewMessage(m.numPrimeBytes).ContentsSize() +} + +func (m *mockCmix) Send(recipient *id.ID, fingerprint format.Fingerprint, + service message.Service, payload, mac []byte, _ cmix.CMIXParams) ( + id.Round, ephemeral.Id, error) { + msg := format.NewMessage(m.numPrimeBytes) + msg.SetContents(payload) + msg.SetMac(mac) + msg.SetKeyFP(fingerprint) + + m.handler.Lock() + defer m.handler.Unlock() + for _, p := range m.handler.processorMap[*recipient][service.Tag] { + p.Process(msg, receptionID.EphemeralIdentity{}, rounds.Round{}) + } + + return 0, ephemeral.Id{}, nil +} + +func (m *mockCmix) IsHealthy() bool { + return m.health +} + +func (m *mockCmix) AddIdentity(id *id.ID, _ time.Time, _ bool) { + m.handler.Lock() + defer m.handler.Unlock() + + if _, exists := m.handler.processorMap[*id]; exists { + return + } + + m.handler.processorMap[*id] = make(map[string][]message.Processor) +} + +func (m *mockCmix) AddService(clientID *id.ID, newService message.Service, + response message.Processor) { + m.handler.Lock() + defer m.handler.Unlock() + + if _, exists := m.handler.processorMap[*clientID][newService.Tag]; !exists { + m.handler.processorMap[*clientID][newService.Tag] = + []message.Processor{response} + return + } + + m.handler.processorMap[*clientID][newService.Tag] = + append(m.handler.processorMap[*clientID][newService.Tag], response) +} + +func (m *mockCmix) DeleteClientService(clientID *id.ID) { + m.handler.Lock() + defer m.handler.Unlock() + + for tag := range m.handler.processorMap[*clientID] { + delete(m.handler.processorMap[*clientID], tag) + } +} + +func (m *mockCmix) RemoveIdentity(id *id.ID) { + m.handler.Lock() + defer m.handler.Unlock() + + delete(m.handler.processorMap, *id) +} diff --git a/cmix/identity/tracker.go b/cmix/identity/tracker.go index 51f324b67f88c6a01512391817cb3554b04117d8..a6162ff7418437f307e16d9983fd6e9ec8f151bf 100644 --- a/cmix/identity/tracker.go +++ b/cmix/identity/tracker.go @@ -142,7 +142,7 @@ func (t manager) StartProcesses() stoppable.Stoppable { func (t *manager) AddIdentity(id *id.ID, validUntil time.Time, persistent bool) { t.newIdentity <- TrackedID{ NextGeneration: netTime.Now().Add(-time.Second), - LastGeneration: time.Time{}, + LastGeneration: netTime.Now().Add(-time.Duration(ephemeral.Period)), Source: id, ValidUntil: validUntil, Persistent: persistent, diff --git a/go.mod b/go.mod index 7e99394cf5300b8d2a17c9659890f42a16b70330..412d59c59862e11e2ea681e4b3048c8c49f0ade2 100644 --- a/go.mod +++ b/go.mod @@ -13,8 +13,8 @@ require ( github.com/spf13/viper v1.7.1 gitlab.com/elixxir/bloomfilter v0.0.0-20200930191214-10e9ac31b228 gitlab.com/elixxir/comms v0.0.4-0.20220323190139-9ed75f3a8b2c - gitlab.com/elixxir/crypto v0.0.7-0.20220420170330-979607dcc6da - gitlab.com/elixxir/ekv v0.1.7 + gitlab.com/elixxir/crypto v0.0.7-0.20220425192911-a23209a58073 + gitlab.com/elixxir/ekv v0.1.7 gitlab.com/elixxir/primitives v0.0.3-0.20220330212736-cce83b5f948f gitlab.com/xx_network/comms v0.0.4-0.20220315161313-76acb14429ac gitlab.com/xx_network/crypto v0.0.5-0.20220317171841-084640957d71 diff --git a/go.sum b/go.sum index 68096279f9c5896d461e7d7810d4a99058a52a93..449abdbda8c4778ece9f8de9852071e33cf18f92 100644 --- a/go.sum +++ b/go.sum @@ -303,6 +303,16 @@ gitlab.com/elixxir/crypto v0.0.7-0.20220418163058-a76028e93dd3 h1:tYr7CjBj3p4tmU gitlab.com/elixxir/crypto v0.0.7-0.20220418163058-a76028e93dd3/go.mod h1:JkByWX/TXCjdu6pRJsx+jwttbBGvlAljYSJMImDmt+4= gitlab.com/elixxir/crypto v0.0.7-0.20220420170330-979607dcc6da h1:SZQTa7Gp2vPAwu3yQDhkiKPhFqQXCsiRi0s40e/em/Y= gitlab.com/elixxir/crypto v0.0.7-0.20220420170330-979607dcc6da/go.mod h1:JkByWX/TXCjdu6pRJsx+jwttbBGvlAljYSJMImDmt+4= +gitlab.com/elixxir/crypto v0.0.7-0.20220421222239-fb92da398627 h1:vJ+hTCN0W+CHAUquqHseJC9+mWmTMdTViQbha7zJ/Rs= +gitlab.com/elixxir/crypto v0.0.7-0.20220421222239-fb92da398627/go.mod h1:JkByWX/TXCjdu6pRJsx+jwttbBGvlAljYSJMImDmt+4= +gitlab.com/elixxir/crypto v0.0.7-0.20220425183558-d1b1b40a5ba8 h1:HifQbDMGC9jgnsT6pGilE4JKaYbA/X6+YF9qe+FhjxY= +gitlab.com/elixxir/crypto v0.0.7-0.20220425183558-d1b1b40a5ba8/go.mod h1:JkByWX/TXCjdu6pRJsx+jwttbBGvlAljYSJMImDmt+4= +gitlab.com/elixxir/crypto v0.0.7-0.20220425185754-5050a93bc64c h1:jUyeQpETz8iKbxZirZNUAlul66sUwzMxtKwCUJPrhBE= +gitlab.com/elixxir/crypto v0.0.7-0.20220425185754-5050a93bc64c/go.mod h1:JkByWX/TXCjdu6pRJsx+jwttbBGvlAljYSJMImDmt+4= +gitlab.com/elixxir/crypto v0.0.7-0.20220425192754-f9f5ca6ad2eb h1:K10bnRhnZS6XFxkwGkI6wMLAWLN/lTy9Qjx6f6exSsE= +gitlab.com/elixxir/crypto v0.0.7-0.20220425192754-f9f5ca6ad2eb/go.mod h1:JkByWX/TXCjdu6pRJsx+jwttbBGvlAljYSJMImDmt+4= +gitlab.com/elixxir/crypto v0.0.7-0.20220425192911-a23209a58073 h1:T0BK30t1F9M8RCdsuMN1bL34fSLNPqjqC+oufnjPh3I= +gitlab.com/elixxir/crypto v0.0.7-0.20220425192911-a23209a58073/go.mod h1:JkByWX/TXCjdu6pRJsx+jwttbBGvlAljYSJMImDmt+4= gitlab.com/elixxir/ekv v0.1.6 h1:M2hUSNhH/ChxDd+s8xBqSEKgoPtmE6hOEBqQ73KbN6A= gitlab.com/elixxir/ekv v0.1.6/go.mod h1:e6WPUt97taFZe5PFLPb1Dupk7tqmDCTQu1kkstqJvw4= gitlab.com/elixxir/ekv v0.1.7 h1:OW2z+N4QCqqMFzouAwFTWWMKz0Y/PDhyYReN7gQ5NiQ= diff --git a/storage/utility/messageBuffer.go b/storage/utility/messageBuffer.go index 8f513bdb948a61d11b7486eea80848a97a9876b0..f661e73ca3daed2070079501fc1db73567840862 100644 --- a/storage/utility/messageBuffer.go +++ b/storage/utility/messageBuffer.go @@ -230,7 +230,7 @@ func (mb *MessageBuffer) Add(m interface{}) interface{} { // Save buffer err = mb.save() if err != nil { - jww.FATAL.Panicf("Error whilse saving buffer: %v", err) + jww.FATAL.Panicf("Error while saving buffer: %v", err) } return m