Skip to content
Snippets Groups Projects
Commit 1ac1dc9a authored by Benjamin Wenger's avatar Benjamin Wenger
Browse files

finished implementations within the message package

parent b9d5198d
No related branches found
No related tags found
4 merge requests!510Release,!207WIP: Client Restructure,!203Symmetric broadcast,!187Xx 3829/triggers
......@@ -14,8 +14,8 @@ import (
type Messages struct {
MessageReceptionBuffLen uint
MessageReceptionWorkerPoolSize uint
MaxChecksGarbledMessage uint
GarbledMessageWait time.Duration
MaxChecksInProcessMessage uint
InProcessMessageWait time.Duration
RealtimeOnly bool
}
......@@ -23,8 +23,8 @@ func GetDefaultMessage() Messages {
return Messages{
MessageReceptionBuffLen: 500,
MessageReceptionWorkerPoolSize: 4,
MaxChecksGarbledMessage: 10,
GarbledMessageWait: 15 * time.Minute,
MaxChecksInProcessMessage: 10,
InProcessMessageWait: 15 * time.Minute,
RealtimeOnly: false,
}
}
......@@ -8,7 +8,7 @@
package message
import (
"gitlab.com/elixxir/client/storage/reception"
"gitlab.com/elixxir/client/interfaces"
pb "gitlab.com/elixxir/comms/mixmessages"
"gitlab.com/elixxir/primitives/format"
"gitlab.com/xx_network/primitives/id"
......@@ -19,5 +19,5 @@ type Bundle struct {
RoundInfo *pb.RoundInfo
Messages []format.Message
Finish func()
Identity reception.IdentityUse
Identity interfaces.Identity
}
......@@ -9,74 +9,71 @@ package message
import (
"fmt"
"sync"
"time"
jww "github.com/spf13/jwalterweatherman"
"gitlab.com/elixxir/client/interfaces"
"gitlab.com/elixxir/client/interfaces/message"
"gitlab.com/elixxir/client/interfaces/preimage"
"gitlab.com/elixxir/client/stoppable"
"gitlab.com/elixxir/client/storage/edge"
"gitlab.com/elixxir/crypto/e2e"
fingerprint2 "gitlab.com/elixxir/crypto/fingerprint"
"gitlab.com/elixxir/primitives/format"
"gitlab.com/elixxir/primitives/states"
"gitlab.com/xx_network/primitives/id"
"gitlab.com/xx_network/primitives/netTime"
"sync"
)
func (m *Manager) handleMessages(stop *stoppable.Single) {
func (p *pickup) handleMessages(stop *stoppable.Single) {
for {
select {
case <-stop.Quit():
stop.ToStopped()
return
case bundle := <-m.messageReception:
case bundle := <-p.messageReception:
go func() {
wg := sync.WaitGroup{}
wg.Add(len(bundle.Messages))
for _, msg := range bundle.Messages {
for i := range bundle.Messages {
msg := bundle.Messages[i]
go func() {
m.handleMessage(msg, bundle)
count, ts := p.inProcess.Add(msg, bundle.RoundInfo, bundle.Identity)
wg.Done()
success := p.handleMessage(msg, bundle)
if success {
p.inProcess.Remove(msg, bundle.RoundInfo, bundle.Identity)
} else {
// fail the message if any part of the decryption fails,
// unless it is the last attempts and has been in the buffer long
// enough, in which case remove it
if count == p.param.MaxChecksInProcessMessage &&
netTime.Since(ts) > p.param.InProcessMessageWait {
p.inProcess.Remove(msg, bundle.RoundInfo, bundle.Identity)
} else {
p.inProcess.Failed(msg, bundle.RoundInfo, bundle.Identity)
}
}
}()
}
wg.Wait()
bundle.Finish()
}()
}
}
}
}
func (m *Manager) handleMessage(ecrMsg format.Message, bundle Bundle) {
func (p *pickup) handleMessage(ecrMsg format.Message, bundle Bundle) bool {
fingerprint := ecrMsg.GetKeyFP()
// msgDigest := ecrMsg.Digest()
identity := bundle.Identity
round := bundle.RoundInfo
// newID := *id.ID{} // todo use new id systme from ticket
// {
// ID id.ID
// ephID ephemeral.Id
// }
//save to garbled
m.garbledStore.Add(ecrMsg)
var receptionID interfaces.Identity
// If we have a fingerprint, process it.
if proc, exists := m.pop(fingerprint); exists {
if proc, exists := p.pop(identity.Source, fingerprint); exists {
proc.Process(ecrMsg, receptionID, round)
m.garbledStore.Remove(ecrMsg)
return
return true
}
triggers, exists := m.get(ecrMsg.GetIdentityFP(), ecrMsg.GetContents())
triggers, exists := p.get(identity.Source, ecrMsg.GetIdentityFP(), ecrMsg.GetContents())
if exists {
for _, t := range triggers {
go t.Process(ecrMsg, receptionID, round)
......@@ -85,8 +82,7 @@ func (m *Manager) handleMessage(ecrMsg format.Message, bundle Bundle) {
jww.ERROR.Printf("empty trigger list for %s",
ecrMsg.GetIdentityFP()) // get preimage
}
m.garbledStore.Remove(ecrMsg)
return
return true
} else {
// TODO: delete this else block because it should not be needed.
jww.INFO.Printf("checking backup %v", preimage.MakeDefault(identity.Source))
......@@ -106,7 +102,6 @@ func (m *Manager) handleMessage(ecrMsg format.Message, bundle Bundle) {
im := fmt.Sprintf("Garbled/RAW Message: keyFP: %v, round: %d"+
"msgDigest: %s, not determined to be for client",
ecrMsg.GetKeyFP(), bundle.Round, ecrMsg.Digest())
m.Internal.Events.Report(1, "MessageReception", "Garbled", im)
//denote as active in garbled
m.garbledStore.Failed(ecrMsg)
p.events.Report(1, "MessageReception", "Garbled", im)
return false
}
......@@ -23,9 +23,9 @@ import (
// CheckInProgressMessages triggers rechecking all in progress messages
// if the queue is not full Exposed on the network pickup
func (m *pickup) CheckInProgressMessages() {
func (p *pickup) CheckInProgressMessages() {
select {
case m.checkInProgress <- struct{}{}:
case p.checkInProgress <- struct{}{}:
default:
jww.WARN.Println("Failed to check garbled messages " +
"due to full channel")
......@@ -33,23 +33,23 @@ func (m *pickup) CheckInProgressMessages() {
}
//long running thread which processes messages that need to be checked
func (m *pickup) recheckInProgressRunner(stop *stoppable.Single) {
func (p *pickup) recheckInProgressRunner(stop *stoppable.Single) {
for {
select {
case <-stop.Quit():
stop.ToStopped()
return
case <-m.checkInProgress:
case <-p.checkInProgress:
jww.INFO.Printf("[GARBLE] Checking Garbled messages")
m.recheckInProgress()
p.recheckInProgress()
}
}
}
//handler for a single run of recheck messages
func (m *pickup) recheckInProgress() {
func (p *pickup) recheckInProgress() {
//try to decrypt every garbled message, excising those who's counts are too high
for grbldMsg, ri, identity, has := m.inProcess.Next(); has; grbldMsg, ri, identity, has = m.inProcess.Next() {
for grbldMsg, ri, identity, has := p.inProcess.Next(); has; grbldMsg, ri, identity, has = p.inProcess.Next() {
bundle := Bundle{
Round: id.Round(ri.ID),
RoundInfo: ri,
......@@ -58,7 +58,7 @@ func (m *pickup) recheckInProgress() {
Identity: identity,
}
select {
case m.messageReception <- bundle:
case p.messageReception <- bundle:
default:
jww.WARN.Printf("failed to send bundle, channel full")
......
......@@ -80,8 +80,8 @@ func TestManager_CheckGarbledMessages(t *testing.T) {
m := NewManager(i, params.Network{Messages: params.Messages{
MessageReceptionBuffLen: 20,
MessageReceptionWorkerPoolSize: 20,
MaxChecksGarbledMessage: 20,
GarbledMessageWait: time.Hour,
MaxChecksInProcessMessage: 20,
InProcessMessageWait: time.Hour,
}}, nil, sender)
rng := csprng.NewSystemRNG()
......
......@@ -115,24 +115,24 @@ func NewManager(param params.Network,
}
//Gets the channel to send received messages on
func (m *pickup) GetMessageReceptionChannel() chan<- Bundle {
return m.messageReception
func (p *pickup) GetMessageReceptionChannel() chan<- Bundle {
return p.messageReception
}
//Starts all worker pool
func (m *pickup) StartProcessies() stoppable.Stoppable {
func (p *pickup) StartProcessies() stoppable.Stoppable {
multi := stoppable.NewMulti("MessageReception")
//create the message handler workers
for i := uint(0); i < m.param.MessageReceptionWorkerPoolSize; i++ {
for i := uint(0); i < p.param.MessageReceptionWorkerPoolSize; i++ {
stop := stoppable.NewSingle(fmt.Sprintf("MessageReception Worker %v", i))
go m.handleMessages(stop)
go p.handleMessages(stop)
multi.Add(stop)
}
//create the in progress messages thread
garbledStop := stoppable.NewSingle("GarbledMessages")
go m.recheckInProgressRunner(garbledStop)
go p.recheckInProgressRunner(garbledStop)
multi.Add(garbledStop)
return multi
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment