Skip to content
Snippets Groups Projects
Commit 91b0914f authored by Benjamin Wenger's avatar Benjamin Wenger
Browse files

implemented garbled message handling

parent 5e42cfd3
Branches
Tags
No related merge requests found
package params package params
import "time"
type Messages struct { type Messages struct {
MessageReceptionBuffLen uint MessageReceptionBuffLen uint
MessageReceptionWorkerPoolSize uint MessageReceptionWorkerPoolSize uint
MaxChecksGarbledMessage uint
GarbledMessageWait time.Duration
} }
func GetDefaultMessage() Messages { func GetDefaultMessage() Messages {
return Messages{ return Messages{
MessageReceptionBuffLen: 500, MessageReceptionBuffLen: 500,
MessageReceptionWorkerPoolSize: 4, MessageReceptionWorkerPoolSize: 4,
MaxChecksGarbledMessage: 10,
GarbledMessageWait: 15 * time.Minute,
} }
} }
...@@ -12,7 +12,7 @@ const keyExchangeTriggerName = "KeyExchangeTrigger" ...@@ -12,7 +12,7 @@ const keyExchangeTriggerName = "KeyExchangeTrigger"
const keyExchangeConfirmName = "KeyExchangeConfirm" const keyExchangeConfirmName = "KeyExchangeConfirm"
const keyExchangeMulti = "KeyExchange" const keyExchangeMulti = "KeyExchange"
func Start(ctx *context.Context) stoppable.Stoppable { func Start(ctx *context.Context, garbledMessageTrigger chan<- struct{}) stoppable.Stoppable {
// register the rekey trigger thread // register the rekey trigger thread
triggerCh := make(chan message.Receive, 100) triggerCh := make(chan message.Receive, 100)
...@@ -28,7 +28,7 @@ func Start(ctx *context.Context) stoppable.Stoppable { ...@@ -28,7 +28,7 @@ func Start(ctx *context.Context) stoppable.Stoppable {
}) })
// start the trigger thread // start the trigger thread
go startTrigger(ctx, triggerCh, triggerStop) go startTrigger(ctx, triggerCh, triggerStop, garbledMessageTrigger)
//register the rekey confirm thread //register the rekey confirm thread
confirmCh := make(chan message.Receive, 100) confirmCh := make(chan message.Receive, 100)
......
...@@ -23,18 +23,23 @@ const ( ...@@ -23,18 +23,23 @@ const (
) )
func startTrigger(ctx *context.Context, c chan message.Receive, func startTrigger(ctx *context.Context, c chan message.Receive,
stop *stoppable.Single) { stop *stoppable.Single, garbledMessageTrigger chan<- struct{}) {
for true { for true {
select { select {
case <-stop.Quit(): case <-stop.Quit():
return return
case request := <-c: case request := <-c:
handleTrigger(ctx, request) err := handleTrigger(ctx, request, garbledMessageTrigger)
if err != nil {
jww.ERROR.Printf("Failed to handle rekey trigger: %s",
err)
}
} }
} }
} }
func handleTrigger(ctx *context.Context, request message.Receive) error { func handleTrigger(ctx *context.Context, request message.Receive,
garbledMessageTrigger chan<- struct{}) error {
//ensure the message was encrypted properly //ensure the message was encrypted properly
if request.Encryption != message.E2E { if request.Encryption != message.E2E {
errMsg := fmt.Sprintf(errBadTrigger, request.Sender) errMsg := fmt.Sprintf(errBadTrigger, request.Sender)
...@@ -78,6 +83,14 @@ func handleTrigger(ctx *context.Context, request message.Receive) error { ...@@ -78,6 +83,14 @@ func handleTrigger(ctx *context.Context, request message.Receive) error {
jww.INFO.Printf("New session from Key Exchange Trigger to "+ jww.INFO.Printf("New session from Key Exchange Trigger to "+
"create session %s for partner %s is a duplicate, request ignored", "create session %s for partner %s is a duplicate, request ignored",
session.GetID(), request.Sender) session.GetID(), request.Sender)
} else {
//if the session is new, attempt to trigger garbled message processing
//if there is contention, skip
select {
case garbledMessageTrigger <- struct{}{}:
default:
jww.WARN.Println("Failed to trigger garbled messages")
}
} }
//Send the Confirmation Message //Send the Confirmation Message
......
...@@ -139,7 +139,7 @@ func (m *manager) StartRunners() error { ...@@ -139,7 +139,7 @@ func (m *manager) StartRunners() error {
m.runners.Add(m.round.StartProcessors()) m.runners.Add(m.round.StartProcessors())
// Key exchange // Key exchange
m.runners.Add(keyExchange.Start(m.Context)) m.runners.Add(keyExchange.Start(m.Context, m.message.GetTriggerGarbledCheckChannel()))
return nil return nil
} }
......
package message
import (
"gitlab.com/elixxir/client/context/message"
"time"
)
func (m *Manager) processGarbledMessages(quitCh <-chan struct{}) {
done := false
for !done {
select {
case <-quitCh:
done = true
case <-m.triggerGarbled:
m.handleGarbledMessages()
}
}
}
func (m *Manager) handleGarbledMessages() {
garbledMsgs := m.Session.GetGarbledMessages()
e2eKv := m.Session.E2e()
//try to decrypt every garbled message, excising those who's counts are too high
for grbldMsg, count, timestamp, has := garbledMsgs.Next(); has;
grbldMsg, count, timestamp, has = garbledMsgs.Next() {
fingerprint := grbldMsg.GetKeyFP()
// Check if the key is there, process it if it is
if key, isE2E := e2eKv.PopKey(fingerprint); isE2E {
// Decrypt encrypted message
msg, err := key.Decrypt(grbldMsg)
// get the sender
sender := key.GetSession().GetPartner()
if err == nil {
//remove from the buffer if decryption is successful
garbledMsgs.Remove(grbldMsg)
//handle the successfully decrypted message
xxMsg, ok := m.partitioner.HandlePartition(sender, message.E2E, msg.GetContents())
if ok {
m.Switchboard.Speak(xxMsg)
continue
}
}
}
// fail the message if any part of the decryption fails,
// unless it is our of attempts and has been in the buffer long enough,
// then remove it
if count == m.param.MaxChecksGarbledMessage &&
time.Since(timestamp) > m.param.GarbledMessageWait {
garbledMsgs.Remove(grbldMsg)
} else {
garbledMsgs.Failed(grbldMsg)
}
}
}
...@@ -18,6 +18,7 @@ type Manager struct { ...@@ -18,6 +18,7 @@ type Manager struct {
messageReception chan Bundle messageReception chan Bundle
nodeRegistration chan network.NodeGateway nodeRegistration chan network.NodeGateway
networkIsHealthy chan bool networkIsHealthy chan bool
triggerGarbled chan struct{}
} }
func NewManager(internal internal.Internal, param params.Messages, func NewManager(internal internal.Internal, param params.Messages,
...@@ -28,6 +29,7 @@ func NewManager(internal internal.Internal, param params.Messages, ...@@ -28,6 +29,7 @@ func NewManager(internal internal.Internal, param params.Messages,
partitioner: parse.NewPartitioner(dummyMessage.ContentsSize(), internal.Session), partitioner: parse.NewPartitioner(dummyMessage.ContentsSize(), internal.Session),
messageReception: make(chan Bundle, param.MessageReceptionBuffLen), messageReception: make(chan Bundle, param.MessageReceptionBuffLen),
networkIsHealthy: make(chan bool, 1), networkIsHealthy: make(chan bool, 1),
triggerGarbled: make(chan struct{}, 1),
nodeRegistration: nodeRegistration, nodeRegistration: nodeRegistration,
} }
m.Internal = internal m.Internal = internal
...@@ -39,19 +41,33 @@ func (m *Manager) GetMessageReceptionChannel() chan<- Bundle { ...@@ -39,19 +41,33 @@ func (m *Manager) GetMessageReceptionChannel() chan<- Bundle {
return m.messageReception return m.messageReception
} }
//Gets the channel to send received messages on
func (m *Manager) GetTriggerGarbledCheckChannel() chan<- struct{} {
return m.triggerGarbled
}
//Starts all worker pool //Starts all worker pool
func (m *Manager) StartProcessies() stoppable.Stoppable { func (m *Manager) StartProcessies() stoppable.Stoppable {
multi := stoppable.NewMulti("MessageReception") multi := stoppable.NewMulti("MessageReception")
//create the message reception workers
for i := uint(0); i < m.param.MessageReceptionWorkerPoolSize; i++ { for i := uint(0); i < m.param.MessageReceptionWorkerPoolSize; i++ {
stop := stoppable.NewSingle(fmt.Sprintf("MessageReception Worker %v", i)) stop := stoppable.NewSingle(fmt.Sprintf("MessageReception Worker %v", i))
go m.processMessages(stop.Quit()) go m.processMessages(stop.Quit())
multi.Add(stop) multi.Add(stop)
} }
critStop := stoppable.NewSingle("Critical Messages Handler") //create the critical messages thread
critStop := stoppable.NewSingle("CriticalMessages")
go m.processCriticalMessages(critStop.Quit()) go m.processCriticalMessages(critStop.Quit())
m.Health.AddChannel(m.networkIsHealthy) m.Health.AddChannel(m.networkIsHealthy)
multi.Add(critStop)
//create the garbled messages thread
garbledStop := stoppable.NewSingle("GarbledMessages")
go m.processGarbledMessages(garbledStop.Quit())
multi.Add(garbledStop)
return multi return multi
} }
...@@ -261,7 +261,7 @@ func next(msgMap map[MessageHash]struct{}) MessageHash { ...@@ -261,7 +261,7 @@ func next(msgMap map[MessageHash]struct{}) MessageHash {
return MessageHash{} return MessageHash{}
} }
// Succeeded sets a messaged as processed and removed it from the buffer. // Remove sets a messaged as processed and removed it from the buffer.
func (mb *MessageBuffer) Succeeded(m interface{}) { func (mb *MessageBuffer) Succeeded(m interface{}) {
h := mb.handler.HashMessage(m) h := mb.handler.HashMessage(m)
......
...@@ -233,7 +233,7 @@ func TestMessageBuffer_Next(t *testing.T) { ...@@ -233,7 +233,7 @@ func TestMessageBuffer_Next(t *testing.T) {
} }
} }
// Tests happy path of MessageBuffer.Succeeded(). // Tests happy path of MessageBuffer.Remove().
func TestMessageBuffer_Succeeded(t *testing.T) { func TestMessageBuffer_Succeeded(t *testing.T) {
th := newTestHandler() th := newTestHandler()
// Create new MessageBuffer and fill with message // Create new MessageBuffer and fill with message
...@@ -254,7 +254,7 @@ func TestMessageBuffer_Succeeded(t *testing.T) { ...@@ -254,7 +254,7 @@ func TestMessageBuffer_Succeeded(t *testing.T) {
_, exists1 := testMB.messages[th.HashMessage(m)] _, exists1 := testMB.messages[th.HashMessage(m)]
_, exists2 := testMB.processingMessages[th.HashMessage(m)] _, exists2 := testMB.processingMessages[th.HashMessage(m)]
if exists1 || exists2 { if exists1 || exists2 {
t.Errorf("Succeeded() did not remove the message from the buffer."+ t.Errorf("Remove() did not remove the message from the buffer."+
"\n\tbuffer: %+v", testMB) "\n\tbuffer: %+v", testMB)
} }
} }
......
...@@ -142,7 +142,7 @@ func (mcmb *MeteredCmixMessageBuffer) Next() (format.Message, uint, time.Time, b ...@@ -142,7 +142,7 @@ func (mcmb *MeteredCmixMessageBuffer) Next() (format.Message, uint, time.Time, b
return msfFormat, rtnCnt, msg.Timestamp, true return msfFormat, rtnCnt, msg.Timestamp, true
} }
func (mcmb *MeteredCmixMessageBuffer) Succeeded(m format.Message) { func (mcmb *MeteredCmixMessageBuffer) Remove(m format.Message) {
mcmb.mb.Succeeded(m) mcmb.mb.Succeeded(m)
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment