From 91b0914f42dda6ff3b772c48ac1a8a6f803c188f Mon Sep 17 00:00:00 2001
From: Benjamin Wenger <ben@elixxir.ioo>
Date: Tue, 22 Sep 2020 13:45:01 -0700
Subject: [PATCH] implemented garbled message handling

---
 context/params/message.go                   |  6 +++
 network/keyExchange/exchange.go             |  4 +-
 network/keyExchange/trigger.go              | 19 ++++++--
 network/manager.go                          |  2 +-
 network/message/garbled.go                  | 54 +++++++++++++++++++++
 network/message/manager.go                  | 18 ++++++-
 storage/utility/messageBuffer.go            |  2 +-
 storage/utility/messageBuffer_test.go       |  4 +-
 storage/utility/meteredCmixMessageBuffer.go |  2 +-
 9 files changed, 100 insertions(+), 11 deletions(-)
 create mode 100644 network/message/garbled.go

diff --git a/context/params/message.go b/context/params/message.go
index 078d5ca98..2e1f3858f 100644
--- a/context/params/message.go
+++ b/context/params/message.go
@@ -1,13 +1,19 @@
 package params
 
+import "time"
+
 type Messages struct {
 	MessageReceptionBuffLen        uint
 	MessageReceptionWorkerPoolSize uint
+	MaxChecksGarbledMessage        uint
+	GarbledMessageWait             time.Duration
 }
 
 func GetDefaultMessage() Messages {
 	return Messages{
 		MessageReceptionBuffLen:        500,
 		MessageReceptionWorkerPoolSize: 4,
+		MaxChecksGarbledMessage:        10,
+		GarbledMessageWait:             15 * time.Minute,
 	}
 }
diff --git a/network/keyExchange/exchange.go b/network/keyExchange/exchange.go
index 12c425641..9f4ab35c5 100644
--- a/network/keyExchange/exchange.go
+++ b/network/keyExchange/exchange.go
@@ -12,7 +12,7 @@ const keyExchangeTriggerName = "KeyExchangeTrigger"
 const keyExchangeConfirmName = "KeyExchangeConfirm"
 const keyExchangeMulti = "KeyExchange"
 
-func Start(ctx *context.Context) stoppable.Stoppable {
+func Start(ctx *context.Context, garbledMessageTrigger chan<- struct{}) stoppable.Stoppable {
 
 	// register the rekey trigger thread
 	triggerCh := make(chan message.Receive, 100)
@@ -28,7 +28,7 @@ func Start(ctx *context.Context) stoppable.Stoppable {
 		})
 
 	// start the trigger thread
-	go startTrigger(ctx, triggerCh, triggerStop)
+	go startTrigger(ctx, triggerCh, triggerStop, garbledMessageTrigger)
 
 	//register the rekey confirm thread
 	confirmCh := make(chan message.Receive, 100)
diff --git a/network/keyExchange/trigger.go b/network/keyExchange/trigger.go
index fd07c0c6d..c40713e88 100644
--- a/network/keyExchange/trigger.go
+++ b/network/keyExchange/trigger.go
@@ -23,18 +23,23 @@ const (
 )
 
 func startTrigger(ctx *context.Context, c chan message.Receive,
-	stop *stoppable.Single) {
+	stop *stoppable.Single, garbledMessageTrigger chan<- struct{}) {
 	for true {
 		select {
 		case <-stop.Quit():
 			return
 		case request := <-c:
-			handleTrigger(ctx, request)
+			err := handleTrigger(ctx, request, garbledMessageTrigger)
+			if err != nil {
+				jww.ERROR.Printf("Failed to handle rekey trigger: %s",
+					err)
+			}
 		}
 	}
 }
 
-func handleTrigger(ctx *context.Context, request message.Receive) error {
+func handleTrigger(ctx *context.Context, request message.Receive,
+	garbledMessageTrigger chan<- struct{}) error {
 	//ensure the message was encrypted properly
 	if request.Encryption != message.E2E {
 		errMsg := fmt.Sprintf(errBadTrigger, request.Sender)
@@ -78,6 +83,14 @@ func handleTrigger(ctx *context.Context, request message.Receive) error {
 		jww.INFO.Printf("New session from Key Exchange Trigger to "+
 			"create session %s for partner %s is a duplicate, request ignored",
 			session.GetID(), request.Sender)
+	} else {
+		//if the session is new, attempt to trigger garbled message processing
+		//if there is contention, skip
+		select {
+		case garbledMessageTrigger <- struct{}{}:
+		default:
+			jww.WARN.Println("Failed to trigger garbled messages")
+		}
 	}
 
 	//Send the Confirmation Message
diff --git a/network/manager.go b/network/manager.go
index efa775d2a..b39179cbd 100644
--- a/network/manager.go
+++ b/network/manager.go
@@ -139,7 +139,7 @@ func (m *manager) StartRunners() error {
 	m.runners.Add(m.round.StartProcessors())
 
 	// Key exchange
-	m.runners.Add(keyExchange.Start(m.Context))
+	m.runners.Add(keyExchange.Start(m.Context, m.message.GetTriggerGarbledCheckChannel()))
 
 	return nil
 }
diff --git a/network/message/garbled.go b/network/message/garbled.go
new file mode 100644
index 000000000..b9327fe19
--- /dev/null
+++ b/network/message/garbled.go
@@ -0,0 +1,54 @@
+package message
+
+import (
+	"gitlab.com/elixxir/client/context/message"
+	"time"
+)
+
+func (m *Manager) processGarbledMessages(quitCh <-chan struct{}) {
+	done := false
+	for !done {
+		select {
+		case <-quitCh:
+			done = true
+		case <-m.triggerGarbled:
+			m.handleGarbledMessages()
+		}
+	}
+}
+
+func (m *Manager) handleGarbledMessages() {
+	garbledMsgs := m.Session.GetGarbledMessages()
+	e2eKv := m.Session.E2e()
+	//try to decrypt every garbled message, excising those who's counts are too high
+	for grbldMsg, count, timestamp, has := garbledMsgs.Next(); has;
+	grbldMsg, count, timestamp, has = garbledMsgs.Next() {
+		fingerprint := grbldMsg.GetKeyFP()
+		// Check if the key is there, process it if it is
+		if key, isE2E := e2eKv.PopKey(fingerprint); isE2E {
+			// Decrypt encrypted message
+			msg, err := key.Decrypt(grbldMsg)
+			// get the sender
+			sender := key.GetSession().GetPartner()
+			if err == nil {
+				//remove from the buffer if decryption is successful
+				garbledMsgs.Remove(grbldMsg)
+				//handle the successfully decrypted message
+				xxMsg, ok := m.partitioner.HandlePartition(sender, message.E2E, msg.GetContents())
+				if ok {
+					m.Switchboard.Speak(xxMsg)
+					continue
+				}
+			}
+		}
+		// fail the message if any part of the decryption fails,
+		// unless it is our of attempts and has been in the buffer long enough,
+		// then remove it
+		if count == m.param.MaxChecksGarbledMessage &&
+			time.Since(timestamp) > m.param.GarbledMessageWait {
+			garbledMsgs.Remove(grbldMsg)
+		} else {
+			garbledMsgs.Failed(grbldMsg)
+		}
+	}
+}
diff --git a/network/message/manager.go b/network/message/manager.go
index 6cbfe4b32..6776f8864 100644
--- a/network/message/manager.go
+++ b/network/message/manager.go
@@ -18,6 +18,7 @@ type Manager struct {
 	messageReception chan Bundle
 	nodeRegistration chan network.NodeGateway
 	networkIsHealthy chan bool
+	triggerGarbled   chan struct{}
 }
 
 func NewManager(internal internal.Internal, param params.Messages,
@@ -28,6 +29,7 @@ func NewManager(internal internal.Internal, param params.Messages,
 		partitioner:      parse.NewPartitioner(dummyMessage.ContentsSize(), internal.Session),
 		messageReception: make(chan Bundle, param.MessageReceptionBuffLen),
 		networkIsHealthy: make(chan bool, 1),
+		triggerGarbled:   make(chan struct{}, 1),
 		nodeRegistration: nodeRegistration,
 	}
 	m.Internal = internal
@@ -39,19 +41,33 @@ func (m *Manager) GetMessageReceptionChannel() chan<- Bundle {
 	return m.messageReception
 }
 
+//Gets the channel to send received messages on
+func (m *Manager) GetTriggerGarbledCheckChannel() chan<- struct{} {
+	return m.triggerGarbled
+}
+
 //Starts all worker pool
 func (m *Manager) StartProcessies() stoppable.Stoppable {
 	multi := stoppable.NewMulti("MessageReception")
 
+	//create the message reception workers
 	for i := uint(0); i < m.param.MessageReceptionWorkerPoolSize; i++ {
 		stop := stoppable.NewSingle(fmt.Sprintf("MessageReception Worker %v", i))
 		go m.processMessages(stop.Quit())
 		multi.Add(stop)
 	}
 
-	critStop := stoppable.NewSingle("Critical Messages Handler")
+	//create the critical messages thread
+	critStop := stoppable.NewSingle("CriticalMessages")
 	go m.processCriticalMessages(critStop.Quit())
 	m.Health.AddChannel(m.networkIsHealthy)
+	multi.Add(critStop)
+
+	//create the garbled messages thread
+	garbledStop := stoppable.NewSingle("GarbledMessages")
+	go m.processGarbledMessages(garbledStop.Quit())
+	multi.Add(garbledStop)
+
 
 	return multi
 }
diff --git a/storage/utility/messageBuffer.go b/storage/utility/messageBuffer.go
index fe8d376b2..881c7ddaa 100644
--- a/storage/utility/messageBuffer.go
+++ b/storage/utility/messageBuffer.go
@@ -261,7 +261,7 @@ func next(msgMap map[MessageHash]struct{}) MessageHash {
 	return MessageHash{}
 }
 
-// Succeeded sets a messaged as processed and removed it from the buffer.
+// Remove sets a messaged as processed and removed it from the buffer.
 func (mb *MessageBuffer) Succeeded(m interface{}) {
 	h := mb.handler.HashMessage(m)
 
diff --git a/storage/utility/messageBuffer_test.go b/storage/utility/messageBuffer_test.go
index f2ebcd092..212c16ad4 100644
--- a/storage/utility/messageBuffer_test.go
+++ b/storage/utility/messageBuffer_test.go
@@ -233,7 +233,7 @@ func TestMessageBuffer_Next(t *testing.T) {
 	}
 }
 
-// Tests happy path of MessageBuffer.Succeeded().
+// Tests happy path of MessageBuffer.Remove().
 func TestMessageBuffer_Succeeded(t *testing.T) {
 	th := newTestHandler()
 	// Create new MessageBuffer and fill with message
@@ -254,7 +254,7 @@ func TestMessageBuffer_Succeeded(t *testing.T) {
 	_, exists1 := testMB.messages[th.HashMessage(m)]
 	_, exists2 := testMB.processingMessages[th.HashMessage(m)]
 	if exists1 || exists2 {
-		t.Errorf("Succeeded() did not remove the message from the buffer."+
+		t.Errorf("Remove() did not remove the message from the buffer."+
 			"\n\tbuffer: %+v", testMB)
 	}
 }
diff --git a/storage/utility/meteredCmixMessageBuffer.go b/storage/utility/meteredCmixMessageBuffer.go
index 688887f7d..79a621ef0 100644
--- a/storage/utility/meteredCmixMessageBuffer.go
+++ b/storage/utility/meteredCmixMessageBuffer.go
@@ -142,7 +142,7 @@ func (mcmb *MeteredCmixMessageBuffer) Next() (format.Message, uint, time.Time, b
 	return msfFormat, rtnCnt, msg.Timestamp, true
 }
 
-func (mcmb *MeteredCmixMessageBuffer) Succeeded(m format.Message) {
+func (mcmb *MeteredCmixMessageBuffer) Remove(m format.Message) {
 	mcmb.mb.Succeeded(m)
 }
 
-- 
GitLab