diff --git a/channels/manager.go b/channels/manager.go index cfede7fdf5dbdf327ceb297076e17cd2d14bb367..f3de1b863b589c23f50f04e2f78f9f14c88924d0 100644 --- a/channels/manager.go +++ b/channels/manager.go @@ -143,7 +143,7 @@ func setupManager(identity cryptoChannel.PrivateIdentity, kv *versioned.KV, m.events = initEvents(model) m.st = loadSendTracker(net, kv, m.events.triggerEvent, - m.events.triggerAdminEvent, model.UpdateSentStatus) + m.events.triggerAdminEvent, model.UpdateSentStatus, rng) m.loadChannels() diff --git a/channels/sendTracker.go b/channels/sendTracker.go index 0c6aecb4c5c2f6b21b013700b56b5d6f22f7ec91..1742d52ba4e8dae2d46d09b6aa91e6ab57df4654 100644 --- a/channels/sendTracker.go +++ b/channels/sendTracker.go @@ -16,6 +16,7 @@ import ( "gitlab.com/elixxir/client/cmix/rounds" "gitlab.com/elixxir/client/storage/versioned" cryptoChannel "gitlab.com/elixxir/crypto/channel" + "gitlab.com/elixxir/crypto/fastRNG" "gitlab.com/elixxir/primitives/states" "gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/netTime" @@ -66,6 +67,8 @@ type sendTracker struct { net Client kv *versioned.KV + + rngSrc *fastRNG.StreamGenerator } // messageReceiveFunc is a function type for sendTracker.MessageReceive so it @@ -76,8 +79,8 @@ type messageReceiveFunc func(messageID cryptoChannel.MessageID, r rounds.Round) // function with the cmix client, delayed on when the network goes healthy, // which will attempt to discover the status of all rounds that are outstanding. func loadSendTracker(net Client, kv *versioned.KV, trigger triggerEventFunc, - adminTrigger triggerAdminEventFunc, - updateStatus updateStatusFunc) *sendTracker { + adminTrigger triggerAdminEventFunc, updateStatus updateStatusFunc, + rngSource *fastRNG.StreamGenerator) *sendTracker { st := &sendTracker{ byRound: make(map[id.Round][]*tracked), byMessageID: make(map[cryptoChannel.MessageID]*tracked), @@ -87,6 +90,7 @@ func loadSendTracker(net Client, kv *versioned.KV, trigger triggerEventFunc, updateStatus: updateStatus, net: net, kv: kv, + rngSrc: rngSource, } /*if err := st.load(); !kv.Exists(err){ @@ -208,8 +212,19 @@ func (st *sendTracker) denotePendingSend(channelID *id.ID, return 0, err } + // create a random message id so there will not be collisions in a database + // that requires a unique message ID + stream := st.rngSrc.GetStream() + randMid := cryptoChannel.MessageID{} + num, err := stream.Read(randMid[:]) + if num != len(randMid[:]) || err != nil { + jww.FATAL.Panicf("failed to get a random message ID, read "+ + "len: %d, err: %+v", num, err) + } + stream.Close() + // track the message on disk - st.handleDenoteSend(uuid, channelID, cryptoChannel.MessageID{}, + st.handleDenoteSend(uuid, channelID, randMid, rounds.Round{}) return uuid, nil } @@ -227,11 +242,23 @@ func (st *sendTracker) denotePendingAdminSend(channelID *id.ID, receptionID.EphemeralIdentity{}, rounds.Round{}, Unsent) - // track the message on disk if err != nil { return 0, err } - st.handleDenoteSend(uuid, channelID, cryptoChannel.MessageID{}, + + // create a random message id so there will not be collisions in a database + // that requires a unique message ID + stream := st.rngSrc.GetStream() + randMid := cryptoChannel.MessageID{} + num, err := stream.Read(randMid[:]) + if num != len(randMid[:]) || err != nil { + jww.FATAL.Panicf("failed to get a random message ID, read "+ + "len: %d, err: %+v", num, err) + } + stream.Close() + + // track the message on disk + st.handleDenoteSend(uuid, channelID, randMid, rounds.Round{}) return uuid, nil } diff --git a/channels/sendTracker_test.go b/channels/sendTracker_test.go index 5210b20e6c0598f5c8bf16e831a9b5b9e3c3dda6..c2062c6d8c514983b2ab8950fb7b7dc41567a0f6 100644 --- a/channels/sendTracker_test.go +++ b/channels/sendTracker_test.go @@ -7,8 +7,10 @@ import ( "gitlab.com/elixxir/client/cmix/rounds" "gitlab.com/elixxir/client/storage/versioned" cryptoChannel "gitlab.com/elixxir/crypto/channel" + "gitlab.com/elixxir/crypto/fastRNG" "gitlab.com/elixxir/ekv" "gitlab.com/elixxir/primitives/states" + "gitlab.com/xx_network/crypto/csprng" "gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/id/ephemeral" "gitlab.com/xx_network/primitives/netTime" @@ -67,7 +69,9 @@ func TestSendTracker_MessageReceive(t *testing.T) { cid := id.NewIdFromString("channel", id.User, t) - st := loadSendTracker(&mockClient{}, kv, trigger, nil, updateStatus) + crng := fastRNG.NewStreamGenerator(100, 5, csprng.NewSystemRNG) + + st := loadSendTracker(&mockClient{}, kv, trigger, nil, updateStatus, crng) mid := cryptoChannel.MakeMessageID([]byte("hello"), cid) process := st.MessageReceive(mid, r) @@ -140,7 +144,9 @@ func TestSendTracker_failedSend(t *testing.T) { triggerCh <- status } - st := loadSendTracker(&mockClient{}, kv, nil, adminTrigger, updateStatus) + crng := fastRNG.NewStreamGenerator(100, 5, csprng.NewSystemRNG) + + st := loadSendTracker(&mockClient{}, kv, nil, adminTrigger, updateStatus, crng) cid := id.NewIdFromString("channel", id.User, t) mid := cryptoChannel.MakeMessageID([]byte("hello"), cid) @@ -206,7 +212,9 @@ func TestSendTracker_send(t *testing.T) { triggerCh <- true } - st := loadSendTracker(&mockClient{}, kv, trigger, nil, updateStatus) + crng := fastRNG.NewStreamGenerator(100, 5, csprng.NewSystemRNG) + + st := loadSendTracker(&mockClient{}, kv, trigger, nil, updateStatus, crng) cid := id.NewIdFromString("channel", id.User, t) mid := cryptoChannel.MakeMessageID([]byte("hello"), cid) @@ -265,7 +273,9 @@ func TestSendTracker_send(t *testing.T) { func TestSendTracker_load_store(t *testing.T) { kv := versioned.NewKV(ekv.MakeMemstore()) - st := loadSendTracker(&mockClient{}, kv, nil, nil, nil) + crng := fastRNG.NewStreamGenerator(100, 5, csprng.NewSystemRNG) + + st := loadSendTracker(&mockClient{}, kv, nil, nil, nil, crng) cid := id.NewIdFromString("channel", id.User, t) mid := cryptoChannel.MakeMessageID([]byte("hello"), cid) rid := id.Round(2) @@ -275,7 +285,7 @@ func TestSendTracker_load_store(t *testing.T) { t.Fatalf("Failed to store byRound: %+v", err) } - st2 := loadSendTracker(&mockClient{}, kv, nil, nil, nil) + st2 := loadSendTracker(&mockClient{}, kv, nil, nil, nil, crng) if len(st2.byRound) != len(st.byRound) { t.Fatalf("byRound was not properly loaded") } @@ -294,7 +304,9 @@ func TestRoundResult_callback(t *testing.T) { return 0, nil } - st := loadSendTracker(&mockClient{}, kv, trigger, nil, update) + crng := fastRNG.NewStreamGenerator(100, 5, csprng.NewSystemRNG) + + st := loadSendTracker(&mockClient{}, kv, trigger, nil, update, crng) cid := id.NewIdFromString("channel", id.User, t) mid := cryptoChannel.MakeMessageID([]byte("hello"), cid) diff --git a/channels/send_test.go b/channels/send_test.go index 5da3010922520fe229f918784d72ad32146e59f0..c3c8114c85d69193216c04bf903991f69129917e 100644 --- a/channels/send_test.go +++ b/channels/send_test.go @@ -152,6 +152,8 @@ func TestSendGeneric(t *testing.T) { t.Fatalf(err.Error()) } + crng := fastRNG.NewStreamGenerator(100, 5, csprng.NewSystemRNG) + m := &manager{ me: pi, channels: make(map[id.ID]*joinedChannel), @@ -173,7 +175,7 @@ func TestSendGeneric(t *testing.T) { return 0, nil }, func(uuid uint64, messageID cryptoChannel.MessageID, timestamp time.Time, round rounds.Round, status SentStatus) { - }), + }, crng), } channelID := new(id.ID) @@ -240,6 +242,8 @@ func TestAdminGeneric(t *testing.T) { t.Fatalf(err.Error()) } + crng := fastRNG.NewStreamGenerator(100, 5, csprng.NewSystemRNG) + m := &manager{ channels: make(map[id.ID]*joinedChannel), nicknameManager: &nicknameManager{ @@ -260,7 +264,7 @@ func TestAdminGeneric(t *testing.T) { return 0, nil }, func(uuid uint64, messageID cryptoChannel.MessageID, timestamp time.Time, round rounds.Round, status SentStatus) { - }), + }, crng), } messageType := Text @@ -330,6 +334,8 @@ func TestSendMessage(t *testing.T) { t.Fatalf(err.Error()) } + crng := fastRNG.NewStreamGenerator(100, 5, csprng.NewSystemRNG) + m := &manager{ me: pi, channels: make(map[id.ID]*joinedChannel), @@ -350,7 +356,7 @@ func TestSendMessage(t *testing.T) { return 0, nil }, func(uuid uint64, messageID cryptoChannel.MessageID, timestamp time.Time, round rounds.Round, status SentStatus) { - }), + }, crng), } channelID := new(id.ID) @@ -425,6 +431,8 @@ func TestSendReply(t *testing.T) { t.Fatalf(err.Error()) } + crng := fastRNG.NewStreamGenerator(100, 5, csprng.NewSystemRNG) + m := &manager{ me: pi, channels: make(map[id.ID]*joinedChannel), @@ -445,7 +453,7 @@ func TestSendReply(t *testing.T) { return 0, nil }, func(uuid uint64, messageID cryptoChannel.MessageID, timestamp time.Time, round rounds.Round, status SentStatus) { - }), + }, crng), } channelID := new(id.ID) @@ -520,6 +528,8 @@ func TestSendReaction(t *testing.T) { t.Fatalf(err.Error()) } + crng := fastRNG.NewStreamGenerator(100, 5, csprng.NewSystemRNG) + m := &manager{ me: pi, nicknameManager: &nicknameManager{ @@ -540,7 +550,7 @@ func TestSendReaction(t *testing.T) { return 0, nil }, func(uuid uint64, messageID cryptoChannel.MessageID, timestamp time.Time, round rounds.Round, status SentStatus) { - }), + }, crng), } channelID := new(id.ID)