Skip to content
Snippets Groups Projects
Commit b038dd56 authored by Benjamin Wenger's avatar Benjamin Wenger
Browse files

moved send to the top level network manager,

integrated critical messages resending,
enabled key exchange
parent c7a6dd14
No related branches found
No related tags found
No related merge requests found
......@@ -16,6 +16,7 @@ import (
"gitlab.com/elixxir/client/context/stoppable"
"gitlab.com/elixxir/client/network/health"
"gitlab.com/elixxir/client/network/internal"
"gitlab.com/elixxir/client/network/keyExchange"
"gitlab.com/elixxir/client/network/message"
"gitlab.com/elixxir/client/network/node"
"gitlab.com/elixxir/client/network/permissioning"
......@@ -140,11 +141,14 @@ func (m *manager) startRunners() error {
m.runners.Add(trackNetworkStopper)
// Message reception
m.runners.Add(m.message.StartMessageReceptionWorkerPool())
m.runners.Add(m.message.StartProcessies())
// Round processing
m.runners.Add(m.round.StartProcessors())
// Key exchange
m.runners.Add(keyExchange.Start(m.Context))
return nil
}
......
package message
import (
"github.com/pkg/errors"
"gitlab.com/elixxir/client/context/message"
"gitlab.com/elixxir/client/context/params"
jww "github.com/spf13/jwalterweatherman"
"gitlab.com/elixxir/client/context/utility"
"gitlab.com/elixxir/client/storage/e2e"
ds "gitlab.com/elixxir/comms/network/dataStructures"
"gitlab.com/elixxir/primitives/states"
"time"
)
func (m *Manager) processCriticalMessages(quitCh <-chan struct{}) {
done := false
for !done {
select {
case <-quitCh:
done = true
case isHealthy := <-m.networkIsHealthy:
if isHealthy {
m.criticalMessages()
}
}
}
}
func (m *Manager) criticalMessages() {
critMsgs := m.Session.GetCriticalMessages()
//try to send every message in the critical messages buffer in paralell
for msg, param, has := critMsgs.Next(); has; msg, param, has = critMsgs.Next() {
go func(msg message.Send, param params.E2E) {
//send the message
rounds, err := m.sendE2E(msg, param)
//if the message fail to send, notify the buffer so it can be handled
//in the future and exit
if err != nil {
jww.ERROR.Printf("Failed to send critical message on " +
"notification of healthy network")
critMsgs.Failed(msg)
return
}
//wait on the results to make sure the rounds were sucesfull
sendResults := make(chan ds.EventReturn, len(rounds))
roundEvents := m.Instance.GetRoundEvents()
for _, r := range rounds {
roundEvents.AddRoundEventChan(r, sendResults, 1*time.Minute,
states.COMPLETED, states.FAILED)
}
success, numTimeOut, numRoundFail := utility.TrackResults(sendResults, len(rounds))
if !success {
jww.ERROR.Printf("critical message send failed to transmit "+
"transmit %v/%v paritions: %v round failures, %v timeouts",
numRoundFail+numTimeOut, len(rounds), numRoundFail, numTimeOut)
critMsgs.Failed(msg)
return
}
critMsgs.Succeeded(msg)
}(msg, param)
}
}
......@@ -17,6 +17,7 @@ type Manager struct {
messageReception chan Bundle
nodeRegistration chan network.NodeGateway
networkIsHealthy chan bool
}
func NewManager(internal internal.Internal, param params.Messages,
......@@ -26,6 +27,7 @@ func NewManager(internal internal.Internal, param params.Messages,
param: param,
partitioner: parse.NewPartitioner(dummyMessage.ContentsSize(), internal.Session),
messageReception: make(chan Bundle, param.MessageReceptionBuffLen),
networkIsHealthy: make(chan bool, 1),
nodeRegistration: nodeRegistration,
}
m.Internal = internal
......@@ -38,7 +40,7 @@ func (m *Manager) GetMessageReceptionChannel() chan<- Bundle {
}
//Starts all worker pool
func (m *Manager) StartMessageReceptionWorkerPool() stoppable.Stoppable {
func (m *Manager) StartProcessies() stoppable.Stoppable {
multi := stoppable.NewMulti("MessageReception")
for i := uint(0); i < m.param.MessageReceptionWorkerPoolSize; i++ {
......@@ -47,5 +49,9 @@ func (m *Manager) StartMessageReceptionWorkerPool() stoppable.Stoppable {
multi.Add(stop)
}
critStop := stoppable.NewSingle("Critical Messages Handler")
go m.processCriticalMessages(critStop.Quit())
m.Health.AddChannel(m.networkIsHealthy)
return multi
}
......@@ -13,22 +13,10 @@ import (
"time"
)
// SendCMIX sends a "raw" CMIX message payload to the provided
// recipient. Note that both SendE2E and SendUnsafe call SendCMIX.
// Returns the round ID of the round the payload was sent or an error
// if it fails.
func (m *Manager) SendCMIX(msg format.Message, param params.CMIX) (id.Round, error) {
if !m.Health.IsRunning() {
return 0, errors.New("Cannot send cmix message when the " +
"network is not healthy")
}
return m.sendCMIX(msg, param)
}
// Internal send e2e which bypasses the network check, for use in SendE2E and
// SendUnsafe which do their own network checks
func (m *Manager) sendCMIX(msg format.Message, param params.CMIX) (id.Round, error) {
func (m *Manager) SendCMIX(msg format.Message, param params.CMIX) (id.Round, error) {
timeStart := time.Now()
attempted := set.New()
......
......@@ -17,21 +17,7 @@ import (
"time"
)
// SendE2E sends an end-to-end payload to the provided recipient with
// the provided msgType. Returns the list of rounds in which parts of
// the message were sent or an error if it fails.
func (m *Manager) SendE2E(msg message.Send, e2eP params.E2E) (
[]id.Round, error) {
if !m.Health.IsRunning() {
return nil, errors.New("Cannot send e2e message when the " +
"network is not healthy")
}
return m.sendE2E(msg, e2eP)
}
func (m *Manager) sendE2E(msg message.Send, param params.E2E) ([]id.Round, error) {
func (m *Manager) SendE2E(msg message.Send, param params.E2E) ([]id.Round, error) {
//timestamp the message
ts := time.Now()
......@@ -75,7 +61,7 @@ func (m *Manager) sendE2E(msg message.Send, param params.E2E) ([]id.Round, error
wg.Add(1)
go func(i int) {
var err error
roundIds[i], err = m.sendCMIX(msgEnc, param.CMIX)
roundIds[i], err = m.SendCMIX(msgEnc, param.CMIX)
if err != nil {
errCh <- err
}
......
......@@ -2,7 +2,6 @@ package message
import (
"github.com/pkg/errors"
jww "github.com/spf13/jwalterweatherman"
"gitlab.com/elixxir/client/context/message"
"gitlab.com/elixxir/client/context/params"
"gitlab.com/elixxir/crypto/e2e"
......@@ -12,25 +11,7 @@ import (
"time"
)
// SendUnsafe sends an unencrypted payload to the provided recipient
// with the provided msgType. Returns the list of rounds in which parts
// of the message were sent or an error if it fails.
// NOTE: Do not use this function unless you know what you are doing.
// This function always produces an error message in client logging.
func (m *rounds.Manager) SendUnsafe(msg message.Send, param params.Unsafe) ([]id.Round, error) {
if !m.health.IsRunning() {
return nil, errors.New("cannot send unsafe message when the " +
"network is not healthy")
}
jww.WARN.Println("Sending unsafe message. Unsafe payloads have no end" +
" to end encryption, they have limited security and privacy " +
"preserving properties")
return m.sendUnsafe(msg, param)
}
func (m *rounds.Manager) sendUnsafe(msg message.Send, param params.Unsafe) ([]id.Round, error) {
func (m *Manager) SendUnsafe(msg message.Send, param params.Unsafe) ([]id.Round, error) {
//timestamp the message
ts := time.Now()
......@@ -56,7 +37,7 @@ func (m *rounds.Manager) sendUnsafe(msg message.Send, param params.Unsafe) ([]id
wg.Add(1)
go func(i int) {
var err error
roundIds[i], err = m.sendCMIX(msgCmix, param.CMIX)
roundIds[i], err = m.SendCMIX(msgCmix, param.CMIX)
if err != nil {
errCh <- err
}
......
package network
import (
"github.com/pkg/errors"
"gitlab.com/elixxir/client/context/message"
"gitlab.com/elixxir/client/context/params"
"gitlab.com/elixxir/primitives/format"
"gitlab.com/xx_network/primitives/id"
jww "github.com/spf13/jwalterweatherman"
)
// SendCMIX sends a "raw" CMIX message payload to the provided
// recipient. Note that both SendE2E and SendUnsafe call SendCMIX.
// Returns the round ID of the round the payload was sent or an error
// if it fails.
func (m *Manager) SendCMIX(msg format.Message, param params.CMIX) (id.Round, error) {
if !m.m.Health.IsRunning() {
return 0, errors.New("Cannot send cmix message when the " +
"network is not healthy")
}
return m.m.message.SendCMIX(msg, param)
}
// SendUnsafe sends an unencrypted payload to the provided recipient
// with the provided msgType. Returns the list of rounds in which parts
// of the message were sent or an error if it fails.
// NOTE: Do not use this function unless you know what you are doing.
// This function always produces an error message in client logging.
func (m *Manager) SendUnsafe(msg message.Send, param params.Unsafe) ([]id.Round, error) {
if !m.m.Health.IsRunning() {
return nil, errors.New("cannot send unsafe message when the " +
"network is not healthy")
}
jww.WARN.Println("Sending unsafe message. Unsafe payloads have no end" +
" to end encryption, they have limited security and privacy " +
"preserving properties")
return m.SendUnsafe(msg, param)
}
// SendE2E sends an end-to-end payload to the provided recipient with
// the provided msgType. Returns the list of rounds in which parts of
// the message were sent or an error if it fails.
func (m *Manager) SendE2E(msg message.Send, e2eP params.E2E) (
[]id.Round, error) {
if !m.m.Health.IsRunning() {
return nil, errors.New("Cannot send e2e message when the " +
"network is not healthy")
}
return m.m.message.SendE2E(msg, e2eP)
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment