Skip to content
Snippets Groups Projects
Commit 7bd7001c authored by Benjamin Wenger's avatar Benjamin Wenger
Browse files

Merge branch 'noNilMesageIDs' into 'project/Channels'

modified sent code to hand random instead of nil ids to the layer below

See merge request !402
parents 114e6dba 2b243d52
No related branches found
No related tags found
4 merge requests!510Release,!419rewrote the health tracker to both consider if there are waiting rounds and...,!402modified sent code to hand random instead of nil ids to the layer below,!340Project/channels
...@@ -143,7 +143,7 @@ func setupManager(identity cryptoChannel.PrivateIdentity, kv *versioned.KV, ...@@ -143,7 +143,7 @@ func setupManager(identity cryptoChannel.PrivateIdentity, kv *versioned.KV,
m.events = initEvents(model) m.events = initEvents(model)
m.st = loadSendTracker(net, kv, m.events.triggerEvent, m.st = loadSendTracker(net, kv, m.events.triggerEvent,
m.events.triggerAdminEvent, model.UpdateSentStatus) m.events.triggerAdminEvent, model.UpdateSentStatus, rng)
m.loadChannels() m.loadChannels()
......
...@@ -16,6 +16,7 @@ import ( ...@@ -16,6 +16,7 @@ import (
"gitlab.com/elixxir/client/cmix/rounds" "gitlab.com/elixxir/client/cmix/rounds"
"gitlab.com/elixxir/client/storage/versioned" "gitlab.com/elixxir/client/storage/versioned"
cryptoChannel "gitlab.com/elixxir/crypto/channel" cryptoChannel "gitlab.com/elixxir/crypto/channel"
"gitlab.com/elixxir/crypto/fastRNG"
"gitlab.com/elixxir/primitives/states" "gitlab.com/elixxir/primitives/states"
"gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/id"
"gitlab.com/xx_network/primitives/netTime" "gitlab.com/xx_network/primitives/netTime"
...@@ -66,6 +67,8 @@ type sendTracker struct { ...@@ -66,6 +67,8 @@ type sendTracker struct {
net Client net Client
kv *versioned.KV kv *versioned.KV
rngSrc *fastRNG.StreamGenerator
} }
// messageReceiveFunc is a function type for sendTracker.MessageReceive so it // messageReceiveFunc is a function type for sendTracker.MessageReceive so it
...@@ -76,8 +79,8 @@ type messageReceiveFunc func(messageID cryptoChannel.MessageID, r rounds.Round) ...@@ -76,8 +79,8 @@ type messageReceiveFunc func(messageID cryptoChannel.MessageID, r rounds.Round)
// function with the cmix client, delayed on when the network goes healthy, // function with the cmix client, delayed on when the network goes healthy,
// which will attempt to discover the status of all rounds that are outstanding. // which will attempt to discover the status of all rounds that are outstanding.
func loadSendTracker(net Client, kv *versioned.KV, trigger triggerEventFunc, func loadSendTracker(net Client, kv *versioned.KV, trigger triggerEventFunc,
adminTrigger triggerAdminEventFunc, adminTrigger triggerAdminEventFunc, updateStatus updateStatusFunc,
updateStatus updateStatusFunc) *sendTracker { rngSource *fastRNG.StreamGenerator) *sendTracker {
st := &sendTracker{ st := &sendTracker{
byRound: make(map[id.Round][]*tracked), byRound: make(map[id.Round][]*tracked),
byMessageID: make(map[cryptoChannel.MessageID]*tracked), byMessageID: make(map[cryptoChannel.MessageID]*tracked),
...@@ -87,6 +90,7 @@ func loadSendTracker(net Client, kv *versioned.KV, trigger triggerEventFunc, ...@@ -87,6 +90,7 @@ func loadSendTracker(net Client, kv *versioned.KV, trigger triggerEventFunc,
updateStatus: updateStatus, updateStatus: updateStatus,
net: net, net: net,
kv: kv, kv: kv,
rngSrc: rngSource,
} }
/*if err := st.load(); !kv.Exists(err){ /*if err := st.load(); !kv.Exists(err){
...@@ -208,8 +212,19 @@ func (st *sendTracker) denotePendingSend(channelID *id.ID, ...@@ -208,8 +212,19 @@ func (st *sendTracker) denotePendingSend(channelID *id.ID,
return 0, err return 0, err
} }
// create a random message id so there will not be collisions in a database
// that requires a unique message ID
stream := st.rngSrc.GetStream()
randMid := cryptoChannel.MessageID{}
num, err := stream.Read(randMid[:])
if num != len(randMid[:]) || err != nil {
jww.FATAL.Panicf("failed to get a random message ID, read "+
"len: %d, err: %+v", num, err)
}
stream.Close()
// track the message on disk // track the message on disk
st.handleDenoteSend(uuid, channelID, cryptoChannel.MessageID{}, st.handleDenoteSend(uuid, channelID, randMid,
rounds.Round{}) rounds.Round{})
return uuid, nil return uuid, nil
} }
...@@ -227,11 +242,23 @@ func (st *sendTracker) denotePendingAdminSend(channelID *id.ID, ...@@ -227,11 +242,23 @@ func (st *sendTracker) denotePendingAdminSend(channelID *id.ID,
receptionID.EphemeralIdentity{}, receptionID.EphemeralIdentity{},
rounds.Round{}, Unsent) rounds.Round{}, Unsent)
// track the message on disk
if err != nil { if err != nil {
return 0, err return 0, err
} }
st.handleDenoteSend(uuid, channelID, cryptoChannel.MessageID{},
// create a random message id so there will not be collisions in a database
// that requires a unique message ID
stream := st.rngSrc.GetStream()
randMid := cryptoChannel.MessageID{}
num, err := stream.Read(randMid[:])
if num != len(randMid[:]) || err != nil {
jww.FATAL.Panicf("failed to get a random message ID, read "+
"len: %d, err: %+v", num, err)
}
stream.Close()
// track the message on disk
st.handleDenoteSend(uuid, channelID, randMid,
rounds.Round{}) rounds.Round{})
return uuid, nil return uuid, nil
} }
......
...@@ -7,8 +7,10 @@ import ( ...@@ -7,8 +7,10 @@ import (
"gitlab.com/elixxir/client/cmix/rounds" "gitlab.com/elixxir/client/cmix/rounds"
"gitlab.com/elixxir/client/storage/versioned" "gitlab.com/elixxir/client/storage/versioned"
cryptoChannel "gitlab.com/elixxir/crypto/channel" cryptoChannel "gitlab.com/elixxir/crypto/channel"
"gitlab.com/elixxir/crypto/fastRNG"
"gitlab.com/elixxir/ekv" "gitlab.com/elixxir/ekv"
"gitlab.com/elixxir/primitives/states" "gitlab.com/elixxir/primitives/states"
"gitlab.com/xx_network/crypto/csprng"
"gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/id"
"gitlab.com/xx_network/primitives/id/ephemeral" "gitlab.com/xx_network/primitives/id/ephemeral"
"gitlab.com/xx_network/primitives/netTime" "gitlab.com/xx_network/primitives/netTime"
...@@ -67,7 +69,9 @@ func TestSendTracker_MessageReceive(t *testing.T) { ...@@ -67,7 +69,9 @@ func TestSendTracker_MessageReceive(t *testing.T) {
cid := id.NewIdFromString("channel", id.User, t) cid := id.NewIdFromString("channel", id.User, t)
st := loadSendTracker(&mockClient{}, kv, trigger, nil, updateStatus) crng := fastRNG.NewStreamGenerator(100, 5, csprng.NewSystemRNG)
st := loadSendTracker(&mockClient{}, kv, trigger, nil, updateStatus, crng)
mid := cryptoChannel.MakeMessageID([]byte("hello"), cid) mid := cryptoChannel.MakeMessageID([]byte("hello"), cid)
process := st.MessageReceive(mid, r) process := st.MessageReceive(mid, r)
...@@ -140,7 +144,9 @@ func TestSendTracker_failedSend(t *testing.T) { ...@@ -140,7 +144,9 @@ func TestSendTracker_failedSend(t *testing.T) {
triggerCh <- status triggerCh <- status
} }
st := loadSendTracker(&mockClient{}, kv, nil, adminTrigger, updateStatus) crng := fastRNG.NewStreamGenerator(100, 5, csprng.NewSystemRNG)
st := loadSendTracker(&mockClient{}, kv, nil, adminTrigger, updateStatus, crng)
cid := id.NewIdFromString("channel", id.User, t) cid := id.NewIdFromString("channel", id.User, t)
mid := cryptoChannel.MakeMessageID([]byte("hello"), cid) mid := cryptoChannel.MakeMessageID([]byte("hello"), cid)
...@@ -206,7 +212,9 @@ func TestSendTracker_send(t *testing.T) { ...@@ -206,7 +212,9 @@ func TestSendTracker_send(t *testing.T) {
triggerCh <- true triggerCh <- true
} }
st := loadSendTracker(&mockClient{}, kv, trigger, nil, updateStatus) crng := fastRNG.NewStreamGenerator(100, 5, csprng.NewSystemRNG)
st := loadSendTracker(&mockClient{}, kv, trigger, nil, updateStatus, crng)
cid := id.NewIdFromString("channel", id.User, t) cid := id.NewIdFromString("channel", id.User, t)
mid := cryptoChannel.MakeMessageID([]byte("hello"), cid) mid := cryptoChannel.MakeMessageID([]byte("hello"), cid)
...@@ -265,7 +273,9 @@ func TestSendTracker_send(t *testing.T) { ...@@ -265,7 +273,9 @@ func TestSendTracker_send(t *testing.T) {
func TestSendTracker_load_store(t *testing.T) { func TestSendTracker_load_store(t *testing.T) {
kv := versioned.NewKV(ekv.MakeMemstore()) kv := versioned.NewKV(ekv.MakeMemstore())
st := loadSendTracker(&mockClient{}, kv, nil, nil, nil) crng := fastRNG.NewStreamGenerator(100, 5, csprng.NewSystemRNG)
st := loadSendTracker(&mockClient{}, kv, nil, nil, nil, crng)
cid := id.NewIdFromString("channel", id.User, t) cid := id.NewIdFromString("channel", id.User, t)
mid := cryptoChannel.MakeMessageID([]byte("hello"), cid) mid := cryptoChannel.MakeMessageID([]byte("hello"), cid)
rid := id.Round(2) rid := id.Round(2)
...@@ -275,7 +285,7 @@ func TestSendTracker_load_store(t *testing.T) { ...@@ -275,7 +285,7 @@ func TestSendTracker_load_store(t *testing.T) {
t.Fatalf("Failed to store byRound: %+v", err) t.Fatalf("Failed to store byRound: %+v", err)
} }
st2 := loadSendTracker(&mockClient{}, kv, nil, nil, nil) st2 := loadSendTracker(&mockClient{}, kv, nil, nil, nil, crng)
if len(st2.byRound) != len(st.byRound) { if len(st2.byRound) != len(st.byRound) {
t.Fatalf("byRound was not properly loaded") t.Fatalf("byRound was not properly loaded")
} }
...@@ -294,7 +304,9 @@ func TestRoundResult_callback(t *testing.T) { ...@@ -294,7 +304,9 @@ func TestRoundResult_callback(t *testing.T) {
return 0, nil return 0, nil
} }
st := loadSendTracker(&mockClient{}, kv, trigger, nil, update) crng := fastRNG.NewStreamGenerator(100, 5, csprng.NewSystemRNG)
st := loadSendTracker(&mockClient{}, kv, trigger, nil, update, crng)
cid := id.NewIdFromString("channel", id.User, t) cid := id.NewIdFromString("channel", id.User, t)
mid := cryptoChannel.MakeMessageID([]byte("hello"), cid) mid := cryptoChannel.MakeMessageID([]byte("hello"), cid)
......
...@@ -152,6 +152,8 @@ func TestSendGeneric(t *testing.T) { ...@@ -152,6 +152,8 @@ func TestSendGeneric(t *testing.T) {
t.Fatalf(err.Error()) t.Fatalf(err.Error())
} }
crng := fastRNG.NewStreamGenerator(100, 5, csprng.NewSystemRNG)
m := &manager{ m := &manager{
me: pi, me: pi,
channels: make(map[id.ID]*joinedChannel), channels: make(map[id.ID]*joinedChannel),
...@@ -173,7 +175,7 @@ func TestSendGeneric(t *testing.T) { ...@@ -173,7 +175,7 @@ func TestSendGeneric(t *testing.T) {
return 0, nil return 0, nil
}, func(uuid uint64, messageID cryptoChannel.MessageID, }, func(uuid uint64, messageID cryptoChannel.MessageID,
timestamp time.Time, round rounds.Round, status SentStatus) { timestamp time.Time, round rounds.Round, status SentStatus) {
}), }, crng),
} }
channelID := new(id.ID) channelID := new(id.ID)
...@@ -240,6 +242,8 @@ func TestAdminGeneric(t *testing.T) { ...@@ -240,6 +242,8 @@ func TestAdminGeneric(t *testing.T) {
t.Fatalf(err.Error()) t.Fatalf(err.Error())
} }
crng := fastRNG.NewStreamGenerator(100, 5, csprng.NewSystemRNG)
m := &manager{ m := &manager{
channels: make(map[id.ID]*joinedChannel), channels: make(map[id.ID]*joinedChannel),
nicknameManager: &nicknameManager{ nicknameManager: &nicknameManager{
...@@ -260,7 +264,7 @@ func TestAdminGeneric(t *testing.T) { ...@@ -260,7 +264,7 @@ func TestAdminGeneric(t *testing.T) {
return 0, nil return 0, nil
}, func(uuid uint64, messageID cryptoChannel.MessageID, }, func(uuid uint64, messageID cryptoChannel.MessageID,
timestamp time.Time, round rounds.Round, status SentStatus) { timestamp time.Time, round rounds.Round, status SentStatus) {
}), }, crng),
} }
messageType := Text messageType := Text
...@@ -330,6 +334,8 @@ func TestSendMessage(t *testing.T) { ...@@ -330,6 +334,8 @@ func TestSendMessage(t *testing.T) {
t.Fatalf(err.Error()) t.Fatalf(err.Error())
} }
crng := fastRNG.NewStreamGenerator(100, 5, csprng.NewSystemRNG)
m := &manager{ m := &manager{
me: pi, me: pi,
channels: make(map[id.ID]*joinedChannel), channels: make(map[id.ID]*joinedChannel),
...@@ -350,7 +356,7 @@ func TestSendMessage(t *testing.T) { ...@@ -350,7 +356,7 @@ func TestSendMessage(t *testing.T) {
return 0, nil return 0, nil
}, func(uuid uint64, messageID cryptoChannel.MessageID, }, func(uuid uint64, messageID cryptoChannel.MessageID,
timestamp time.Time, round rounds.Round, status SentStatus) { timestamp time.Time, round rounds.Round, status SentStatus) {
}), }, crng),
} }
channelID := new(id.ID) channelID := new(id.ID)
...@@ -425,6 +431,8 @@ func TestSendReply(t *testing.T) { ...@@ -425,6 +431,8 @@ func TestSendReply(t *testing.T) {
t.Fatalf(err.Error()) t.Fatalf(err.Error())
} }
crng := fastRNG.NewStreamGenerator(100, 5, csprng.NewSystemRNG)
m := &manager{ m := &manager{
me: pi, me: pi,
channels: make(map[id.ID]*joinedChannel), channels: make(map[id.ID]*joinedChannel),
...@@ -445,7 +453,7 @@ func TestSendReply(t *testing.T) { ...@@ -445,7 +453,7 @@ func TestSendReply(t *testing.T) {
return 0, nil return 0, nil
}, func(uuid uint64, messageID cryptoChannel.MessageID, }, func(uuid uint64, messageID cryptoChannel.MessageID,
timestamp time.Time, round rounds.Round, status SentStatus) { timestamp time.Time, round rounds.Round, status SentStatus) {
}), }, crng),
} }
channelID := new(id.ID) channelID := new(id.ID)
...@@ -520,6 +528,8 @@ func TestSendReaction(t *testing.T) { ...@@ -520,6 +528,8 @@ func TestSendReaction(t *testing.T) {
t.Fatalf(err.Error()) t.Fatalf(err.Error())
} }
crng := fastRNG.NewStreamGenerator(100, 5, csprng.NewSystemRNG)
m := &manager{ m := &manager{
me: pi, me: pi,
nicknameManager: &nicknameManager{ nicknameManager: &nicknameManager{
...@@ -540,7 +550,7 @@ func TestSendReaction(t *testing.T) { ...@@ -540,7 +550,7 @@ func TestSendReaction(t *testing.T) {
return 0, nil return 0, nil
}, func(uuid uint64, messageID cryptoChannel.MessageID, }, func(uuid uint64, messageID cryptoChannel.MessageID,
timestamp time.Time, round rounds.Round, status SentStatus) { timestamp time.Time, round rounds.Round, status SentStatus) {
}), }, crng),
} }
channelID := new(id.ID) channelID := new(id.ID)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment