Skip to content
Snippets Groups Projects
Commit ab9e8a09 authored by Jonah Husson's avatar Jonah Husson
Browse files

Minor fixes surrounding garbled messages, add unit test for garbled

parent c8b3f6ba
Branches
Tags
No related merge requests found
......@@ -9,6 +9,7 @@ package message
import (
"gitlab.com/elixxir/client/interfaces/message"
"gitlab.com/elixxir/primitives/format"
"time"
)
......@@ -45,6 +46,7 @@ func (m *Manager) processGarbledMessages(quitCh <-chan struct{}) {
func (m *Manager) handleGarbledMessages() {
garbledMsgs := m.Session.GetGarbledMessages()
e2eKv := m.Session.E2e()
var failedMsgs []format.Message
//try to decrypt every garbled message, excising those who's counts are too high
for grbldMsg, count, timestamp, has := garbledMsgs.Next(); has; grbldMsg, count, timestamp, has = garbledMsgs.Next() {
fingerprint := grbldMsg.GetKeyFP()
......@@ -52,9 +54,9 @@ func (m *Manager) handleGarbledMessages() {
if key, isE2E := e2eKv.PopKey(fingerprint); isE2E {
// Decrypt encrypted message
msg, err := key.Decrypt(grbldMsg)
if err == nil {
// get the sender
sender := key.GetSession().GetPartner()
if err == nil {
//remove from the buffer if decryption is successful
garbledMsgs.Remove(grbldMsg)
//handle the successfully decrypted message
......@@ -74,7 +76,10 @@ func (m *Manager) handleGarbledMessages() {
time.Since(timestamp) > m.param.GarbledMessageWait {
garbledMsgs.Remove(grbldMsg)
} else {
garbledMsgs.Failed(grbldMsg)
failedMsgs = append(failedMsgs, grbldMsg)
}
}
for _, grbldMsg := range failedMsgs {
garbledMsgs.Failed(grbldMsg)
}
}
package message
import (
"encoding/binary"
"gitlab.com/elixxir/client/interfaces/message"
"gitlab.com/elixxir/client/interfaces/params"
"gitlab.com/elixxir/client/network/internal"
"gitlab.com/elixxir/client/network/message/parse"
"gitlab.com/elixxir/client/storage"
"gitlab.com/elixxir/client/storage/e2e"
"gitlab.com/elixxir/client/switchboard"
"gitlab.com/elixxir/comms/client"
"gitlab.com/elixxir/crypto/fastRNG"
"gitlab.com/elixxir/primitives/format"
"gitlab.com/xx_network/crypto/csprng"
"math/rand"
"testing"
"time"
)
type TestListener struct {
ch chan bool
}
// the Hear function is called to exercise the listener, passing in the
// data as an item
func (l TestListener) Hear(item message.Receive) {
l.ch <- true
}
// Returns a name, used for debugging
func (l TestListener) Name() string {
return "TEST LISTENER FOR GARBLED MESSAGES"
}
func TestManager_CheckGarbledMessages(t *testing.T) {
sess1 := storage.InitTestingSession(t)
sess2 := storage.InitTestingSession(t)
sw := switchboard.New()
l := TestListener{
ch: make(chan bool),
}
sw.RegisterListener(sess2.GetUser().ID, message.Raw, l)
comms, err := client.NewClientComms(sess1.GetUser().ID, nil, nil, nil)
if err != nil {
t.Errorf("Failed to start client comms: %+v", err)
}
i := internal.Internal{
Session: sess1,
Switchboard: sw,
Rng: fastRNG.NewStreamGenerator(1, 1, csprng.NewSystemRNG),
Comms: comms,
Health: nil,
Uid: sess1.GetUser().ID,
Instance: nil,
NodeRegistration: nil,
}
m := NewManager(i, params.Messages{
MessageReceptionBuffLen: 20,
MessageReceptionWorkerPoolSize: 20,
MaxChecksGarbledMessage: 20,
GarbledMessageWait: time.Hour,
}, nil)
e2ekv := i.Session.E2e()
err = e2ekv.AddPartner(sess2.GetUser().ID, sess2.E2e().GetDHPublicKey(), e2ekv.GetDHPrivateKey(), e2e.GetDefaultSessionParams(), e2e.GetDefaultSessionParams())
if err != nil {
t.Errorf("Failed to add e2e partner: %+v", err)
t.FailNow()
}
err = sess2.E2e().AddPartner(sess1.GetUser().ID, sess1.E2e().GetDHPublicKey(), sess2.E2e().GetDHPrivateKey(), e2e.GetDefaultSessionParams(), e2e.GetDefaultSessionParams())
if err != nil {
t.Errorf("Failed to add e2e partner: %+v", err)
t.FailNow()
}
partner1, err := sess2.E2e().GetPartner(sess1.GetUser().ID)
if err != nil {
t.Errorf("Failed to get partner: %+v", err)
t.FailNow()
}
msg := format.NewMessage(m.Session.Cmix().GetGroup().GetP().ByteLen())
key, err := partner1.GetKeyForSending(params.Standard)
if err != nil {
t.Errorf("failed to get key: %+v", err)
t.FailNow()
}
contents := make([]byte, msg.ContentsSize())
prng := rand.New(rand.NewSource(42))
prng.Read(contents)
fmp := parse.FirstMessagePartFromBytes(contents)
binary.BigEndian.PutUint32(fmp.Type, uint32(message.Raw))
fmp.NumParts[0] = uint8(1)
binary.BigEndian.PutUint16(fmp.Len, 256)
fmp.Part[0] = 0
ts, err := time.Now().MarshalBinary()
if err != nil {
t.Errorf("failed to martial ts: %+v", err)
}
copy(fmp.Timestamp, ts)
msg.SetContents(fmp.Bytes())
encryptedMsg := key.Encrypt(msg)
i.Session.GetGarbledMessages().Add(encryptedMsg)
quitch := make(chan struct{})
go m.processGarbledMessages(quitch)
m.CheckGarbledMessages()
ticker := time.NewTicker(time.Second)
select {
case <-ticker.C:
t.Error("Didn't hear anything")
case <-l.ch:
t.Log("Heard something")
}
}
......@@ -335,5 +335,13 @@ func InitTestingSession(i interface{}) *Session {
globals.Log.FATAL.Panicf("InitTestingSession failed to create dummy critical messages: %+v", err)
}
s.garbledMessages, err = utility.NewMeteredCmixMessageBuffer(s.kv, garbledMessagesKey)
if err != nil {
globals.Log.FATAL.Panicf("Failed to create garbledMessages buffer: %+v")
}
s.conversations = conversation.NewStore(s.kv)
s.partition = partition.New(s.kv)
return s
}
......@@ -257,6 +257,7 @@ func (mb *MessageBuffer) Next() (interface{}, bool) {
if err != nil {
jww.FATAL.Panicf("Could not load message: %v", err)
}
return m, true
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment