Skip to content
Snippets Groups Projects
Commit 2c2b16c2 authored by Jono Wenger's avatar Jono Wenger
Browse files

Fix network/message errors and formatting

parent de7e3954
No related branches found
No related tags found
3 merge requests!510Release,!207WIP: Client Restructure,!203Symmetric broadcast
...@@ -15,23 +15,23 @@ import ( ...@@ -15,23 +15,23 @@ import (
"gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/id"
) )
// FingerprintsManager is a thread-safe map, mapping format.Fingerprint's to // FingerprintsManager is a thread-safe map, mapping format.Fingerprint's to a
// a Handler object. // Handler object.
type FingerprintsManager struct { type FingerprintsManager struct {
fpMap map[id.ID]map[format.Fingerprint]Processor fpMap map[id.ID]map[format.Fingerprint]Processor
sync.Mutex sync.Mutex
} }
// newFingerprints is a constructor function for the Fingerprints tracker. // newFingerprints is a constructor function for the fingerprints tracker.
func newFingerprints() *FingerprintsManager { func newFingerprints() *FingerprintsManager {
return &FingerprintsManager{ return &FingerprintsManager{
fpMap: make(map[id.ID]map[format.Fingerprint]Processor), fpMap: make(map[id.ID]map[format.Fingerprint]Processor),
} }
} }
// Pop returns the associated handler to the fingerprint and removes // pop returns the associated handler to the fingerprint and removes it from the
// it from our list. // list.
// CRITICAL: it is never ok to process a fingerprint twice. This is a security // CRITICAL: It is never ok to process a fingerprint twice. This is a security
// vulnerability. // vulnerability.
func (f *FingerprintsManager) pop(clientID *id.ID, func (f *FingerprintsManager) pop(clientID *id.ID,
fingerprint format.Fingerprint) ( fingerprint format.Fingerprint) (
...@@ -52,10 +52,10 @@ func (f *FingerprintsManager) pop(clientID *id.ID, ...@@ -52,10 +52,10 @@ func (f *FingerprintsManager) pop(clientID *id.ID,
return nil, false return nil, false
} }
// AddFingerprint is a thread-safe setter for the Fingerprints // AddFingerprint is a thread-safe setter for the FingerprintsManager map.
// map. AddFingerprint maps the given fingerprint key to the handler // AddFingerprint maps the given fingerprint key to the handler value. If there
// value. If there is already an entry for this fingerprint, the // is already an entry for this fingerprint, the method returns with no write
// method returns with no write operation. // operation.
func (f *FingerprintsManager) AddFingerprint(clientID *id.ID, func (f *FingerprintsManager) AddFingerprint(clientID *id.ID,
fingerprint format.Fingerprint, mp Processor) error { fingerprint format.Fingerprint, mp Processor) error {
f.Lock() f.Lock()
...@@ -69,16 +69,15 @@ func (f *FingerprintsManager) AddFingerprint(clientID *id.ID, ...@@ -69,16 +69,15 @@ func (f *FingerprintsManager) AddFingerprint(clientID *id.ID,
} }
if _, exists := f.fpMap[cid][fingerprint]; exists { if _, exists := f.fpMap[cid][fingerprint]; exists {
return errors.Errorf("fingerprint %s already exists", return errors.Errorf("fingerprint %s already exists", fingerprint)
fingerprint)
} }
f.fpMap[cid][fingerprint] = mp f.fpMap[cid][fingerprint] = mp
return nil return nil
} }
// DeleteFingerprint is a thread-safe deletion operation on the Fingerprints map. // DeleteFingerprint is a thread-safe deletion operation on the Fingerprints
// It will remove the entry for the given fingerprint from the map. // map. It will remove the entry for the given fingerprint from the map.
func (f *FingerprintsManager) DeleteFingerprint(clientID *id.ID, func (f *FingerprintsManager) DeleteFingerprint(clientID *id.ID,
fingerprint format.Fingerprint) { fingerprint format.Fingerprint) {
f.Lock() f.Lock()
......
...@@ -9,7 +9,7 @@ package message ...@@ -9,7 +9,7 @@ package message
import ( import (
jww "github.com/spf13/jwalterweatherman" jww "github.com/spf13/jwalterweatherman"
"gitlab.com/elixxir/client/interfaces" "gitlab.com/elixxir/client/network/identity/receptionID"
"gitlab.com/elixxir/comms/mixmessages" "gitlab.com/elixxir/comms/mixmessages"
"gitlab.com/elixxir/primitives/format" "gitlab.com/elixxir/primitives/format"
"gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/id"
...@@ -21,7 +21,7 @@ import ( ...@@ -21,7 +21,7 @@ import (
// Unit test. // Unit test.
func Test_newFingerprints(t *testing.T) { func Test_newFingerprints(t *testing.T) {
expected := &FingerprintsManager{ expected := &FingerprintsManager{
fpMap: make(map[id.ID]map[format.Fingerprint]interfaces.MessageProcessor), fpMap: make(map[id.ID]map[format.Fingerprint]Processor),
} }
received := newFingerprints() received := newFingerprints()
...@@ -87,7 +87,7 @@ func TestFingerprintsManager_AddFingerprint(t *testing.T) { ...@@ -87,7 +87,7 @@ func TestFingerprintsManager_AddFingerprint(t *testing.T) {
} }
// Check that received value contains the expected data // Check that received value contains the expected data
expected := map[format.Fingerprint]interfaces.MessageProcessor{fp: mp} expected := map[format.Fingerprint]Processor{fp: mp}
if !reflect.DeepEqual(received, expected) { if !reflect.DeepEqual(received, expected) {
t.Fatalf("Add error: Map does not contain expected data."+ t.Fatalf("Add error: Map does not contain expected data."+
"\nexpected: %v\nreceived: %v", expected, received) "\nexpected: %v\nreceived: %v", expected, received)
...@@ -129,7 +129,7 @@ func TestFingerprintsManager_DeleteClientFingerprints(t *testing.T) { ...@@ -129,7 +129,7 @@ func TestFingerprintsManager_DeleteClientFingerprints(t *testing.T) {
numTests := 100 numTests := 100
cid := id.NewIdFromString("clientID", id.User, t) cid := id.NewIdFromString("clientID", id.User, t)
fingerprints := make([]format.Fingerprint, 0, numTests) fingerprints := make([]format.Fingerprint, 0, numTests)
processors := make([]interfaces.MessageProcessor, 0, numTests) processors := make([]Processor, 0, numTests)
for i := 0; i < numTests; i++ { for i := 0; i < numTests; i++ {
fp := format.NewFingerprint([]byte(strconv.Itoa(i))) fp := format.NewFingerprint([]byte(strconv.Itoa(i)))
mp := NewMockMsgProcessor(t) mp := NewMockMsgProcessor(t)
...@@ -170,7 +170,7 @@ func (mock *MockMsgProcessor) MarkFingerprintUsed(_ format.Fingerprint) { ...@@ -170,7 +170,7 @@ func (mock *MockMsgProcessor) MarkFingerprintUsed(_ format.Fingerprint) {
return return
} }
func (mock *MockMsgProcessor) Process(format.Message, interfaces.EphemeralIdentity, func (mock *MockMsgProcessor) Process(format.Message, receptionID.EphemeralIdentity,
*mixmessages.RoundInfo) { *mixmessages.RoundInfo) {
return return
} }
...@@ -10,7 +10,6 @@ package message ...@@ -10,7 +10,6 @@ package message
import ( import (
"fmt" "fmt"
jww "github.com/spf13/jwalterweatherman" jww "github.com/spf13/jwalterweatherman"
"gitlab.com/elixxir/client/interfaces/preimage"
"gitlab.com/elixxir/client/stoppable" "gitlab.com/elixxir/client/stoppable"
"gitlab.com/elixxir/primitives/format" "gitlab.com/elixxir/primitives/format"
"gitlab.com/xx_network/primitives/netTime" "gitlab.com/xx_network/primitives/netTime"
...@@ -29,21 +28,27 @@ func (p *handler) handleMessages(stop *stoppable.Single) { ...@@ -29,21 +28,27 @@ func (p *handler) handleMessages(stop *stoppable.Single) {
wg.Add(len(bundle.Messages)) wg.Add(len(bundle.Messages))
for i := range bundle.Messages { for i := range bundle.Messages {
msg := bundle.Messages[i] msg := bundle.Messages[i]
go func() { go func() {
count, ts := p.inProcess.Add(msg, bundle.RoundInfo, bundle.Identity) count, ts := p.inProcess.Add(
msg, bundle.RoundInfo, bundle.Identity)
wg.Done() wg.Done()
success := p.handleMessage(msg, bundle) success := p.handleMessage(msg, bundle)
if success { if success {
p.inProcess.Remove(msg, bundle.RoundInfo, bundle.Identity) p.inProcess.Remove(
msg, bundle.RoundInfo, bundle.Identity)
} else { } else {
// fail the message if any part of the decryption fails, // Fail the message if any part of the decryption
// unless it is the last attempts and has been in the buffer long // fails, unless it is the last attempts and has
// enough, in which case remove it // been in the buffer long enough, in which case
// remove it
if count == p.param.MaxChecksInProcessMessage && if count == p.param.MaxChecksInProcessMessage &&
netTime.Since(ts) > p.param.InProcessMessageWait { netTime.Since(ts) > p.param.InProcessMessageWait {
p.inProcess.Remove(msg, bundle.RoundInfo, bundle.Identity) p.inProcess.Remove(
msg, bundle.RoundInfo, bundle.Identity)
} else { } else {
p.inProcess.Failed(msg, bundle.RoundInfo, bundle.Identity) p.inProcess.Failed(
msg, bundle.RoundInfo, bundle.Identity)
} }
} }
...@@ -63,26 +68,26 @@ func (p *handler) handleMessage(ecrMsg format.Message, bundle Bundle) bool { ...@@ -63,26 +68,26 @@ func (p *handler) handleMessage(ecrMsg format.Message, bundle Bundle) bool {
identity := bundle.Identity identity := bundle.Identity
round := bundle.RoundInfo round := bundle.RoundInfo
// If we have a fingerprint, process it. // If we have a fingerprint, process it
if proc, exists := p.pop(identity.Source, fingerprint); exists { if proc, exists := p.pop(identity.Source, fingerprint); exists {
proc.Process(ecrMsg, identity, round) proc.Process(ecrMsg, identity, round)
return true return true
} }
triggers, exists := p.get(identity.Source, ecrMsg.GetSIH(), ecrMsg.GetContents()) triggers, exists := p.get(
identity.Source, ecrMsg.GetSIH(), ecrMsg.GetContents())
if exists { if exists {
for _, t := range triggers { for _, t := range triggers {
go t.Process(ecrMsg, identity, round) go t.Process(ecrMsg, identity, round)
} }
if len(triggers) == 0 { if len(triggers) == 0 {
jww.ERROR.Printf("empty service list for %s", jww.ERROR.Printf("empty service list for %s", ecrMsg.GetSIH())
ecrMsg.GetSIH()) // get preimage
} }
return true return true
} else { } else {
// TODO: delete this else block because it should not be needed. // TODO: Delete this else block because it should not be needed.
jww.INFO.Printf("checking backup %v", preimage.MakeDefault(identity.Source)) jww.INFO.Printf("checking backup %v", identity.Source)
// //if it doesnt exist, check against the default fingerprint for the identity // //if it does not exist, check against the default fingerprint for the identity
// forMe = fingerprint2.CheckIdentityFP(ecrMsg.GetSIH(), // forMe = fingerprint2.CheckIdentityFP(ecrMsg.GetSIH(),
// ecrMsgContents, preimage.MakeDefault(identity.Source)) // ecrMsgContents, preimage.MakeDefault(identity.Source))
} }
...@@ -91,6 +96,8 @@ func (p *handler) handleMessage(ecrMsg format.Message, bundle Bundle) bool { ...@@ -91,6 +96,8 @@ func (p *handler) handleMessage(ecrMsg format.Message, bundle Bundle) bool {
"msgDigest: %s, not determined to be for client", "msgDigest: %s, not determined to be for client",
ecrMsg.GetKeyFP(), bundle.Round, ecrMsg.Digest()) ecrMsg.GetKeyFP(), bundle.Round, ecrMsg.Digest())
jww.TRACE.Printf(im) jww.TRACE.Printf(im)
p.events.Report(1, "MessageReception", "Garbled", im) p.events.Report(1, "MessageReception", "Garbled", im)
return false return false
} }
...@@ -2,9 +2,7 @@ package message ...@@ -2,9 +2,7 @@ package message
import ( import (
jww "github.com/spf13/jwalterweatherman" jww "github.com/spf13/jwalterweatherman"
"gitlab.com/elixxir/client/interfaces" "gitlab.com/elixxir/client/network/identity/receptionID"
"gitlab.com/elixxir/client/interfaces/message"
"gitlab.com/elixxir/client/interfaces/params"
"gitlab.com/elixxir/client/stoppable" "gitlab.com/elixxir/client/stoppable"
"gitlab.com/elixxir/client/storage/versioned" "gitlab.com/elixxir/client/storage/versioned"
pb "gitlab.com/elixxir/comms/mixmessages" pb "gitlab.com/elixxir/comms/mixmessages"
...@@ -23,28 +21,14 @@ func TestMain(m *testing.M) { ...@@ -23,28 +21,14 @@ func TestMain(m *testing.M) {
os.Exit(m.Run()) os.Exit(m.Run())
} }
type TestListener struct {
ch chan bool
}
// Hear is called to exercise the listener, passing in the data as an item.
func (l TestListener) Hear(item message.Receive) {
l.ch <- true
}
// Name returns a name; used for debugging.
func (l TestListener) Name() string {
return "TEST LISTENER FOR GARBLED MESSAGES"
}
func Test_pickup_CheckInProgressMessages(t *testing.T) { func Test_pickup_CheckInProgressMessages(t *testing.T) {
kv := versioned.NewKV(make(ekv.Memstore)) kv := versioned.NewKV(make(ekv.Memstore))
p := NewHandler(params.Network{Messages: params.Messages{ p := NewHandler(Params{
MessageReceptionBuffLen: 20, MessageReceptionBuffLen: 20,
MessageReceptionWorkerPoolSize: 20, MessageReceptionWorkerPoolSize: 20,
MaxChecksInProcessMessage: 20, MaxChecksInProcessMessage: 20,
InProcessMessageWait: time.Hour, InProcessMessageWait: time.Hour,
}}, kv, nil).(*handler) }, kv, nil).(*handler)
msg := makeTestFormatMessages(1)[0] msg := makeTestFormatMessages(1)[0]
cid := id.NewIdFromString("clientID", id.User, t) cid := id.NewIdFromString("clientID", id.User, t)
...@@ -56,7 +40,7 @@ func Test_pickup_CheckInProgressMessages(t *testing.T) { ...@@ -56,7 +40,7 @@ func Test_pickup_CheckInProgressMessages(t *testing.T) {
} }
p.inProcess.Add(msg, p.inProcess.Add(msg,
&pb.RoundInfo{ID: 1, Timestamps: []uint64{0, 1, 2, 3}}, &pb.RoundInfo{ID: 1, Timestamps: []uint64{0, 1, 2, 3}},
interfaces.EphemeralIdentity{Source: cid}) receptionID.EphemeralIdentity{Source: cid})
stop := stoppable.NewSingle("stop") stop := stoppable.NewSingle("stop")
go p.recheckInProgressRunner(stop) go p.recheckInProgressRunner(stop)
......
...@@ -11,7 +11,7 @@ import ( ...@@ -11,7 +11,7 @@ import (
"encoding/json" "encoding/json"
"github.com/pkg/errors" "github.com/pkg/errors"
jww "github.com/spf13/jwalterweatherman" jww "github.com/spf13/jwalterweatherman"
"gitlab.com/elixxir/client/interfaces" "gitlab.com/elixxir/client/network/identity/receptionID"
"gitlab.com/elixxir/client/storage/utility" "gitlab.com/elixxir/client/storage/utility"
"gitlab.com/elixxir/client/storage/versioned" "gitlab.com/elixxir/client/storage/versioned"
pb "gitlab.com/elixxir/comms/mixmessages" pb "gitlab.com/elixxir/comms/mixmessages"
...@@ -131,7 +131,8 @@ func NewOrLoadMeteredCmixMessageBuffer(kv *versioned.KV, key string) ( ...@@ -131,7 +131,8 @@ func NewOrLoadMeteredCmixMessageBuffer(kv *versioned.KV, key string) (
*MeteredCmixMessageBuffer, error) { *MeteredCmixMessageBuffer, error) {
mb, err := utility.LoadMessageBuffer(kv, &meteredCmixMessageHandler{}, key) mb, err := utility.LoadMessageBuffer(kv, &meteredCmixMessageHandler{}, key)
if err != nil { if err != nil {
jww.WARN.Printf("Failed to find MeteredCmixMessageBuffer %s, making a new one", key) jww.WARN.Printf(
"Failed to find MeteredCmixMessageBuffer %s, making a new one", key)
return NewMeteredCmixMessageBuffer(kv, key) return NewMeteredCmixMessageBuffer(kv, key)
} }
...@@ -139,10 +140,10 @@ func NewOrLoadMeteredCmixMessageBuffer(kv *versioned.KV, key string) ( ...@@ -139,10 +140,10 @@ func NewOrLoadMeteredCmixMessageBuffer(kv *versioned.KV, key string) (
} }
func (mcmb *MeteredCmixMessageBuffer) Add(m format.Message, ri *pb.RoundInfo, func (mcmb *MeteredCmixMessageBuffer) Add(m format.Message, ri *pb.RoundInfo,
identity interfaces.EphemeralIdentity) (uint, time.Time) { identity receptionID.EphemeralIdentity) (uint, time.Time) {
if m.GetPrimeByteLen() == 0 { if m.GetPrimeByteLen() == 0 {
jww.FATAL.Panicf("Cannot handle a metered " + jww.FATAL.Panic(
"cmix message with a length of 0") "Cannot handle a metered cMix message with a length of 0.")
} }
msg := buildMsg(m, ri, identity) msg := buildMsg(m, ri, identity)
...@@ -153,10 +154,10 @@ func (mcmb *MeteredCmixMessageBuffer) Add(m format.Message, ri *pb.RoundInfo, ...@@ -153,10 +154,10 @@ func (mcmb *MeteredCmixMessageBuffer) Add(m format.Message, ri *pb.RoundInfo,
} }
func (mcmb *MeteredCmixMessageBuffer) AddProcessing(m format.Message, func (mcmb *MeteredCmixMessageBuffer) AddProcessing(m format.Message,
ri *pb.RoundInfo, identity interfaces.EphemeralIdentity) (uint, time.Time) { ri *pb.RoundInfo, identity receptionID.EphemeralIdentity) (uint, time.Time) {
if m.GetPrimeByteLen() == 0 { if m.GetPrimeByteLen() == 0 {
jww.FATAL.Panicf("Cannot handle a metered " + jww.FATAL.Panic(
"cmix message with a length of 0") "Cannot handle a metered cMix message with a length of 0.")
} }
msg := buildMsg(m, ri, identity) msg := buildMsg(m, ri, identity)
...@@ -167,68 +168,71 @@ func (mcmb *MeteredCmixMessageBuffer) AddProcessing(m format.Message, ...@@ -167,68 +168,71 @@ func (mcmb *MeteredCmixMessageBuffer) AddProcessing(m format.Message,
} }
func (mcmb *MeteredCmixMessageBuffer) Next() (format.Message, *pb.RoundInfo, func (mcmb *MeteredCmixMessageBuffer) Next() (format.Message, *pb.RoundInfo,
interfaces.EphemeralIdentity, bool) { receptionID.EphemeralIdentity, bool) {
m, ok := mcmb.mb.Next() m, ok := mcmb.mb.Next()
if !ok { if !ok {
return format.Message{}, nil, interfaces.EphemeralIdentity{}, false return format.Message{}, nil, receptionID.EphemeralIdentity{}, false
} }
msg := m.(meteredCmixMessage) msg := m.(meteredCmixMessage)
// increment the count and save // Increment the count and save
msg.Count++ msg.Count++
mcmh := &meteredCmixMessageHandler{} mcmh := &meteredCmixMessageHandler{}
err := mcmh.SaveMessage(mcmb.kv, msg, err := mcmh.SaveMessage(mcmb.kv, msg,
utility.MakeStoredMessageKey(mcmb.key, mcmh.HashMessage(msg))) utility.MakeStoredMessageKey(mcmb.key, mcmh.HashMessage(msg)))
if err != nil { if err != nil {
jww.FATAL.Panicf("Failed to save metered message after count "+ jww.FATAL.Panicf(
"update: %s", err) "Failed to save metered message after count update: %s", err)
} }
msfFormat, err := format.Unmarshal(msg.M) msfFormat, err := format.Unmarshal(msg.M)
if err != nil { if err != nil {
jww.FATAL.Panicf("Failed to unmarshal message after count "+ jww.FATAL.Panicf(
"update: %s", err) "Failed to unmarshal message after count update: %s", err)
} }
ri := &pb.RoundInfo{} ri := &pb.RoundInfo{}
err = proto.Unmarshal(msg.Ri, ri) err = proto.Unmarshal(msg.Ri, ri)
if err != nil { if err != nil {
jww.FATAL.Panicf("Failed to unmarshal round info from msg format") jww.FATAL.Panicf(
"Failed to unmarshal round info from msg format: %s", err)
} }
identity := interfaces.EphemeralIdentity{} identity := receptionID.EphemeralIdentity{}
err = json.Unmarshal(msg.Identity, &identity) err = json.Unmarshal(msg.Identity, &identity)
if err != nil { if err != nil {
jww.FATAL.Panicf("Failed to unmarshal identity from msg format") jww.FATAL.Panicf(
"Failed to unmarshal identity from msg format: %s", err)
} }
return msfFormat, ri, identity, true return msfFormat, ri, identity, true
} }
func (mcmb *MeteredCmixMessageBuffer) Remove(m format.Message, ri *pb.RoundInfo, func (mcmb *MeteredCmixMessageBuffer) Remove(m format.Message, ri *pb.RoundInfo,
identity interfaces.EphemeralIdentity) { identity receptionID.EphemeralIdentity) {
mcmb.mb.Succeeded(buildMsg(m, ri, identity)) mcmb.mb.Succeeded(buildMsg(m, ri, identity))
} }
func (mcmb *MeteredCmixMessageBuffer) Failed(m format.Message, ri *pb.RoundInfo, func (mcmb *MeteredCmixMessageBuffer) Failed(m format.Message, ri *pb.RoundInfo,
identity interfaces.EphemeralIdentity) { identity receptionID.EphemeralIdentity) {
mcmb.mb.Failed(buildMsg(m, ri, identity)) mcmb.mb.Failed(buildMsg(m, ri, identity))
} }
func buildMsg(m format.Message, ri *pb.RoundInfo, identity interfaces.EphemeralIdentity) meteredCmixMessage { func buildMsg(m format.Message, ri *pb.RoundInfo,
identity receptionID.EphemeralIdentity) meteredCmixMessage {
if m.GetPrimeByteLen() == 0 { if m.GetPrimeByteLen() == 0 {
jww.FATAL.Panicf("Cannot handle a metered " + jww.FATAL.Panic(
"cmix message with a length of 0") "Cannot handle a metered cMix message with a length of 0.")
} }
riMarshal, err := proto.Marshal(ri) riMarshal, err := proto.Marshal(ri)
if err != nil { if err != nil {
jww.FATAL.Panicf("Failed to marshal round info") jww.FATAL.Panicf("Failed to marshal round info: %s", err)
} }
identityMarshal, err := json.Marshal(&identity) identityMarshal, err := json.Marshal(&identity)
if err != nil { if err != nil {
jww.FATAL.Panicf("Failed to marshal identity") jww.FATAL.Panicf("Failed to marshal identity: %s", err)
} }
return meteredCmixMessage{ return meteredCmixMessage{
......
...@@ -10,7 +10,7 @@ package message ...@@ -10,7 +10,7 @@ package message
import ( import (
"bytes" "bytes"
"encoding/json" "encoding/json"
"gitlab.com/elixxir/client/interfaces" "gitlab.com/elixxir/client/network/identity/receptionID"
"gitlab.com/elixxir/client/storage/utility" "gitlab.com/elixxir/client/storage/utility"
"gitlab.com/elixxir/client/storage/versioned" "gitlab.com/elixxir/client/storage/versioned"
pb "gitlab.com/elixxir/comms/mixmessages" pb "gitlab.com/elixxir/comms/mixmessages"
...@@ -137,10 +137,10 @@ func Test_meteredCmixMessageHandler_Smoke(t *testing.T) { ...@@ -137,10 +137,10 @@ func Test_meteredCmixMessageHandler_Smoke(t *testing.T) {
// AddFingerprint two messages // AddFingerprint two messages
mcmb.Add(testMsgs[0], mcmb.Add(testMsgs[0],
&pb.RoundInfo{ID: 1, Timestamps: []uint64{0, 1, 2, 3}}, &pb.RoundInfo{ID: 1, Timestamps: []uint64{0, 1, 2, 3}},
interfaces.EphemeralIdentity{Source: id.NewIdFromString("user1", id.User, t)}) receptionID.EphemeralIdentity{Source: id.NewIdFromString("user1", id.User, t)})
mcmb.Add(testMsgs[1], mcmb.Add(testMsgs[1],
&pb.RoundInfo{ID: 2, Timestamps: []uint64{0, 1, 2, 3}}, &pb.RoundInfo{ID: 2, Timestamps: []uint64{0, 1, 2, 3}},
interfaces.EphemeralIdentity{Source: id.NewIdFromString("user2", id.User, t)}) receptionID.EphemeralIdentity{Source: id.NewIdFromString("user2", id.User, t)})
msg, ri, identity, exists := mcmb.Next() msg, ri, identity, exists := mcmb.Next()
if !exists { if !exists {
......
...@@ -57,7 +57,8 @@ func NewHandler(param Params, kv *versioned.KV, events event.Manager) Handler { ...@@ -57,7 +57,8 @@ func NewHandler(param Params, kv *versioned.KV, events event.Manager) Handler {
garbled, err := NewOrLoadMeteredCmixMessageBuffer(kv, inProcessKey) garbled, err := NewOrLoadMeteredCmixMessageBuffer(kv, inProcessKey)
if err != nil { if err != nil {
jww.FATAL.Panicf("Failed to load or new the Garbled Messages system: %v", err) jww.FATAL.Panicf(
"Failed to load or new the Garbled Messages system: %v", err)
} }
m := handler{ m := handler{
......
...@@ -7,13 +7,13 @@ import ( ...@@ -7,13 +7,13 @@ import (
) )
type Processor interface { type Processor interface {
// Process decrypts and hands off the message to its internal down // Process decrypts and hands off the message to its internal down stream
// stream message processing system. // message processing system.
// CRITICAL: Fingerprints should never be used twice. Process must // CRITICAL: Fingerprints should never be used twice. Process must denote,
// denote, in long term storage, usage of a fingerprint and that // in long term storage, usage of a fingerprint and that fingerprint must
// fingerprint must not be added again during application load. // not be added again during application load.
// It is a security vulnerability to reuse a fingerprint. It leaks // It is a security vulnerability to reuse a fingerprint. It leaks privacy
// privacy and can lead to compromise of message contents and integrity. // and can lead to compromise of message contents and integrity.
Process(message format.Message, receptionID receptionID.EphemeralIdentity, Process(message format.Message, receptionID receptionID.EphemeralIdentity,
round *mixmessages.RoundInfo) round *mixmessages.RoundInfo)
} }
...@@ -7,13 +7,13 @@ import ( ...@@ -7,13 +7,13 @@ import (
"gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/id"
) )
// GetDefaultService is used to generate a default service. All identities // GetDefaultService is used to generate a default service. All identities will
// will respond to their default service, but it lacks privacy because it // respond to their default service, but it lacks privacy because it uses the
// uses the public ID as the key. Used for initial reach out in some protocols, // public ID as the key. Used for initial reach out in some protocols, otherwise
// otherwise should not be used // should not be used.
func GetDefaultService(recipient *id.ID) Service { func GetDefaultService(recipient *id.ID) Service {
jww.WARN.Printf("Generating Default Service for %s - "+ jww.WARN.Printf(
"may not be private", recipient) "Generating Default Service for %s - may not be private", recipient)
return Service{ return Service{
Identifier: recipient[:], Identifier: recipient[:],
Tag: sih.Default, Tag: sih.Default,
...@@ -21,9 +21,9 @@ func GetDefaultService(recipient *id.ID) Service { ...@@ -21,9 +21,9 @@ func GetDefaultService(recipient *id.ID) Service {
} }
} }
// GetRandomService is used to make a servivce for cMix sending when no // GetRandomService is used to make a service for cMix sending when no service
// service is needed. It fills the Identifier with random, bits in order to // is needed. It fills the Identifier with random, bits in order to preserve
// preserve privacy // privacy.
func GetRandomService(rng csprng.Source) Service { func GetRandomService(rng csprng.Source) Service {
identifier := make([]byte, 32) identifier := make([]byte, 32)
if _, err := rng.Read(identifier); err != nil { if _, err := rng.Read(identifier); err != nil {
......
...@@ -10,10 +10,10 @@ import ( ...@@ -10,10 +10,10 @@ import (
type Service struct { type Service struct {
Identifier []byte Identifier []byte
Tag string Tag string
Source []byte //optional metadata field, only used on reception Source []byte // Optional metadata field, only used on reception
//private field for lazy evaluation of preimage // Private field for lazy evaluation of preimage
//Nil denotes not yet evaluated // A value of nil denotes not yet evaluated
lazyPreimage *sih.Preimage lazyPreimage *sih.Preimage
} }
...@@ -28,7 +28,6 @@ func (si Service) HashFromMessageHash(messageHash []byte) []byte { ...@@ -28,7 +28,6 @@ func (si Service) HashFromMessageHash(messageHash []byte) []byte {
} }
func (si Service) preimage() sih.Preimage { func (si Service) preimage() sih.Preimage {
// calculate
if si.lazyPreimage == nil { if si.lazyPreimage == nil {
p := sih.MakePreimage(si.Identifier, si.Tag) p := sih.MakePreimage(si.Identifier, si.Tag)
si.lazyPreimage = &p si.lazyPreimage = &p
......
...@@ -7,9 +7,9 @@ import ( ...@@ -7,9 +7,9 @@ import (
type ServicesTracker func(ServiceList) type ServicesTracker func(ServiceList)
// TrackServices adds a service tracker to be triggered when a nee service // TrackServices adds a service tracker to be triggered when a new service is
// as added. Generally used for notificatiosn to use this system to identify a // added. Generally used for notification to use this system to identify a
// received message // received message.
func (sm *ServicesManager) TrackServices(tracker ServicesTracker) { func (sm *ServicesManager) TrackServices(tracker ServicesTracker) {
if tracker == nil { if tracker == nil {
return return
...@@ -20,8 +20,8 @@ func (sm *ServicesManager) TrackServices(tracker ServicesTracker) { ...@@ -20,8 +20,8 @@ func (sm *ServicesManager) TrackServices(tracker ServicesTracker) {
sm.trackers = append(sm.trackers, tracker) sm.trackers = append(sm.trackers, tracker)
} }
// triggerServiceTracking triggers the tracking of services. // triggerServiceTracking triggers the tracking of services. Is it called when a
// Is it called when a service is added or removed. // service is added or removed.
func (sm *ServicesManager) triggerServiceTracking() { func (sm *ServicesManager) triggerServiceTracking() {
if len(sm.trackers) == 0 { if len(sm.trackers) == 0 {
return return
...@@ -44,15 +44,15 @@ func (sm *ServicesManager) triggerServiceTracking() { ...@@ -44,15 +44,15 @@ func (sm *ServicesManager) triggerServiceTracking() {
// The ServiceList holds all services. // The ServiceList holds all services.
type ServiceList map[id.ID][]Service type ServiceList map[id.ID][]Service
type slMarshled struct { type slMarshaled struct {
Id id.ID Id id.ID
Services []Service Services []Service
} }
func (sl ServiceList) MarshalJSON() ([]byte, error) { func (sl ServiceList) MarshalJSON() ([]byte, error) {
slList := make([]slMarshled, 0, len(sl)) slList := make([]slMarshaled, 0, len(sl))
for uid, s := range sl { for uid, s := range sl {
slList = append(slList, slMarshled{ slList = append(slList, slMarshaled{
Id: uid, Id: uid,
Services: s, Services: s,
}) })
...@@ -61,7 +61,7 @@ func (sl ServiceList) MarshalJSON() ([]byte, error) { ...@@ -61,7 +61,7 @@ func (sl ServiceList) MarshalJSON() ([]byte, error) {
} }
func (sl ServiceList) UnmarshalJSON(b []byte) error { func (sl ServiceList) UnmarshalJSON(b []byte) error {
slList := make([]slMarshled, 0) slList := make([]slMarshaled, 0)
if err := json.Unmarshal(b, &slList); err != nil { if err := json.Unmarshal(b, &slList); err != nil {
return err return err
} }
......
...@@ -16,9 +16,9 @@ import ( ...@@ -16,9 +16,9 @@ import (
"gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/id"
) )
/* Service Identification Hash - predefined hash based tags appended to /* Service Identification Hash - predefined hash based tags appended to all cMix
all cMix messages which,though trial hashing, are used to determine if a message messages which,though trial hashing, are used to determine if a message applies
applies to this client. to this client.
Services are used for 2 purposes - can be processed by the notification system, Services are used for 2 purposes - can be processed by the notification system,
or can be used to implement custom non fingerprint processing of payloads (i.e. or can be used to implement custom non fingerprint processing of payloads (i.e.
...@@ -63,7 +63,7 @@ func NewServices() *ServicesManager { ...@@ -63,7 +63,7 @@ func NewServices() *ServicesManager {
// fingerprint is received or it has exhausted the map. // fingerprint is received or it has exhausted the map.
// If a match is found, this means the message received is for the client, and // If a match is found, this means the message received is for the client, and
// that one or multiple services exist to process this message. // that one or multiple services exist to process this message.
// These services are returned to the caller along with the a true boolean. // These services are returned to the caller along with a true boolean.
// If the map has been exhausted with no matches found, it returns nil and false. // If the map has been exhausted with no matches found, it returns nil and false.
func (sm *ServicesManager) get(clientID *id.ID, receivedSIH, func (sm *ServicesManager) get(clientID *id.ID, receivedSIH,
ecrMsgContents []byte) ([]Processor, ecrMsgContents []byte) ([]Processor,
...@@ -77,17 +77,18 @@ func (sm *ServicesManager) get(clientID *id.ID, receivedSIH, ...@@ -77,17 +77,18 @@ func (sm *ServicesManager) get(clientID *id.ID, receivedSIH,
return nil, false return nil, false
} }
for _, s := range services { for _, s := range services {
// check if the sih matches this service // Check if the SIH matches this service
if s.ForMe(ecrMsgContents, receivedSIH) { if s.ForMe(ecrMsgContents, receivedSIH) {
// return this service directly if not the default service
if s.defaultList == nil && s.Tag != sih.Default { if s.defaultList == nil && s.Tag != sih.Default {
// Return this service directly if not the default service
return []Processor{s}, true return []Processor{s}, true
// if it is default, and the default list isn't empty,
// return the default list
} else if s.defaultList != nil { } else if s.defaultList != nil {
// If it is default and the default list is not empty, then
// return the default list
return s.defaultList, true return s.defaultList, true
} }
// return false if its for me but i have nothing registered to
// Return false if it is for me, but I have nothing registered to
// respond to default queries // respond to default queries
return []Processor{}, false return []Processor{}, false
} }
...@@ -157,8 +158,8 @@ func (sm *ServicesManager) DeleteService(clientID *id.ID, toDelete Service, ...@@ -157,8 +158,8 @@ func (sm *ServicesManager) DeleteService(clientID *id.ID, toDelete Service,
return return
} }
// do unique handling if this is a default service and there is more // Do unique handling if this is a default service and there is more than
// then one registered // one registered
if services.defaultList != nil && len(services.defaultList) > 1 { if services.defaultList != nil && len(services.defaultList) > 1 {
for i, p := range services.defaultList { for i, p := range services.defaultList {
if p == processor { if p == processor {
......
package message
import (
"gitlab.com/elixxir/comms/mixmessages"
"gitlab.com/xx_network/comms/connect"
"gitlab.com/xx_network/primitives/id"
"gitlab.com/xx_network/primitives/ndf"
"testing"
"time"
)
type MockSendCMIXComms struct {
t *testing.T
}
func (mc *MockSendCMIXComms) GetHost(*id.ID) (*connect.Host, bool) {
nid1 := id.NewIdFromString("zezima", id.Node, mc.t)
gwID := nid1.DeepCopy()
gwID.SetType(id.Gateway)
p := connect.GetDefaultHostParams()
p.MaxRetries = 0
p.AuthEnabled = false
h, _ := connect.NewHost(gwID, "0.0.0.0", []byte(""), p)
return h, true
}
func (mc *MockSendCMIXComms) AddHost(
*id.ID, string, []byte, connect.HostParams) (host *connect.Host, err error) {
host, _ = mc.GetHost(nil)
return host, nil
}
func (mc *MockSendCMIXComms) RemoveHost(*id.ID) {
}
func (mc *MockSendCMIXComms) SendPutMessage(*connect.Host,
*mixmessages.GatewaySlot, time.Duration) (*mixmessages.GatewaySlotResponse, error) {
return &mixmessages.GatewaySlotResponse{
Accepted: true,
RoundID: 3,
}, nil
}
func (mc *MockSendCMIXComms) SendPutManyMessages(*connect.Host,
*mixmessages.GatewaySlots, time.Duration) (*mixmessages.GatewaySlotResponse, error) {
return &mixmessages.GatewaySlotResponse{
Accepted: true,
RoundID: 3,
}, nil
}
func getNDF() *ndf.NetworkDefinition {
nodeId := id.NewIdFromString("zezima", id.Node, &testing.T{})
gwId := nodeId.DeepCopy()
gwId.SetType(id.Gateway)
return &ndf.NetworkDefinition{
E2E: ndf.Group{
Prime: "E2EE983D031DC1DB6F1A7A67DF0E9A8E5561DB8E8D49413394C049B7A" +
"8ACCEDC298708F121951D9CF920EC5D146727AA4AE535B0922C688B55B3D" +
"D2AEDF6C01C94764DAB937935AA83BE36E67760713AB44A6337C20E78615" +
"75E745D31F8B9E9AD8412118C62A3E2E29DF46B0864D0C951C394A5CBBDC" +
"6ADC718DD2A3E041023DBB5AB23EBB4742DE9C1687B5B34FA48C3521632C" +
"4A530E8FFB1BC51DADDF453B0B2717C2BC6669ED76B4BDD5C9FF558E88F2" +
"6E5785302BEDBCA23EAC5ACE92096EE8A60642FB61E8F3D24990B8CB12EE" +
"448EEF78E184C7242DD161C7738F32BF29A841698978825B4111B4BC3E1E" +
"198455095958333D776D8B2BEEED3A1A1A221A6E37E664A64B83981C46FF" +
"DDC1A45E3D5211AAF8BFBC072768C4F50D7D7803D2D4F278DE8014A47323" +
"631D7E064DE81C0C6BFA43EF0E6998860F1390B5D3FEACAF1696015CB79C" +
"3F9C2D93D961120CD0E5F12CBB687EAB045241F96789C38E89D796138E63" +
"19BE62E35D87B1048CA28BE389B575E994DCA755471584A09EC723742DC3" +
"5873847AEF49F66E43873",
Generator: "2",
},
CMIX: ndf.Group{
Prime: "9DB6FB5951B66BB6FE1E140F1D2CE5502374161FD6538DF1648218642" +
"F0B5C48C8F7A41AADFA187324B87674FA1822B00F1ECF8136943D7C55757" +
"264E5A1A44FFE012E9936E00C1D3E9310B01C7D179805D3058B2A9F4BB6F" +
"9716BFE6117C6B5B3CC4D9BE341104AD4A80AD6C94E005F4B993E14F091E" +
"B51743BF33050C38DE235567E1B34C3D6A5C0CEAA1A0F368213C3D19843D" +
"0B4B09DCB9FC72D39C8DE41F1BF14D4BB4563CA28371621CAD3324B6A2D3" +
"92145BEBFAC748805236F5CA2FE92B871CD8F9C36D3292B5509CA8CAA77A" +
"2ADFC7BFD77DDA6F71125A7456FEA153E433256A2261C6A06ED3693797E7" +
"995FAD5AABBCFBE3EDA2741E375404AE25B",
Generator: "5C7FF6B06F8F143FE8288433493E4769C4D988ACE5BE25A0E2480" +
"9670716C613D7B0CEE6932F8FAA7C44D2CB24523DA53FBE4F6EC3595892D" +
"1AA58C4328A06C46A15662E7EAA703A1DECF8BBB2D05DBE2EB956C142A33" +
"8661D10461C0D135472085057F3494309FFA73C611F78B32ADBB5740C361" +
"C9F35BE90997DB2014E2EF5AA61782F52ABEB8BD6432C4DD097BC5423B28" +
"5DAFB60DC364E8161F4A2A35ACA3A10B1C4D203CC76A470A33AFDCBDD929" +
"59859ABD8B56E1725252D78EAC66E71BA9AE3F1DD2487199874393CD4D83" +
"2186800654760E1E34C09E4D155179F9EC0DC4473F996BDCE6EED1CABED8" +
"B6F116F7AD9CF505DF0F998E34AB27514B0FFE7",
},
Gateways: []ndf.Gateway{
{
ID: gwId.Marshal(),
Address: "0.0.0.0",
TlsCertificate: "",
},
},
Nodes: []ndf.Node{
{
ID: nodeId.Marshal(),
Address: "0.0.0.0",
TlsCertificate: "",
Status: ndf.Active,
},
},
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment