Skip to content
Snippets Groups Projects
Commit a6b49087 authored by Benjamin Wenger's avatar Benjamin Wenger
Browse files

implemented critical messages

parent de7e3954
No related branches found
No related tags found
3 merge requests!510Release,!207WIP: Client Restructure,!203Symmetric broadcast
...@@ -15,7 +15,7 @@ import ( ...@@ -15,7 +15,7 @@ import (
"gitlab.com/elixxir/client/api" "gitlab.com/elixxir/client/api"
"gitlab.com/elixxir/client/interfaces/message" "gitlab.com/elixxir/client/interfaces/message"
"gitlab.com/elixxir/client/interfaces/params" "gitlab.com/elixxir/client/interfaces/params"
"gitlab.com/elixxir/client/interfaces/utility" "gitlab.com/elixxir/client/network"
"gitlab.com/elixxir/client/stoppable" "gitlab.com/elixxir/client/stoppable"
ftStorage "gitlab.com/elixxir/client/storage/fileTransfer" ftStorage "gitlab.com/elixxir/client/storage/fileTransfer"
ds "gitlab.com/elixxir/comms/network/dataStructures" ds "gitlab.com/elixxir/comms/network/dataStructures"
...@@ -547,7 +547,7 @@ func (m *Manager) sendEndE2eMessage(recipient *id.ID) error { ...@@ -547,7 +547,7 @@ func (m *Manager) sendEndE2eMessage(recipient *id.ID) error {
} }
// Wait until the result tracking responds // Wait until the result tracking responds
success, numTimeOut, numRoundFail := utility.TrackResults( success, numTimeOut, numRoundFail := network.TrackResults(
sendResults, len(rounds)) sendResults, len(rounds))
// If a single partition of the end file transfer message does not transmit, // If a single partition of the end file transfer message does not transmit,
......
...@@ -16,7 +16,7 @@ import ( ...@@ -16,7 +16,7 @@ import (
"gitlab.com/elixxir/client/interfaces" "gitlab.com/elixxir/client/interfaces"
"gitlab.com/elixxir/client/interfaces/message" "gitlab.com/elixxir/client/interfaces/message"
"gitlab.com/elixxir/client/interfaces/params" "gitlab.com/elixxir/client/interfaces/params"
"gitlab.com/elixxir/client/interfaces/utility" network2 "gitlab.com/elixxir/client/network"
"gitlab.com/elixxir/client/stoppable" "gitlab.com/elixxir/client/stoppable"
"gitlab.com/elixxir/client/storage" "gitlab.com/elixxir/client/storage"
"gitlab.com/elixxir/client/storage/e2e" "gitlab.com/elixxir/client/storage/e2e"
...@@ -145,7 +145,7 @@ func negotiate(instance *network.Instance, sendE2E interfaces.SendE2E, ...@@ -145,7 +145,7 @@ func negotiate(instance *network.Instance, sendE2E interfaces.SendE2E,
} }
//Wait until the result tracking responds //Wait until the result tracking responds
success, numRoundFail, numTimeOut := utility.TrackResults(sendResults, success, numRoundFail, numTimeOut := network2.TrackResults(sendResults,
len(rounds)) len(rounds))
// If a single partition of the Key Negotiation request does not // If a single partition of the Key Negotiation request does not
......
...@@ -16,7 +16,7 @@ import ( ...@@ -16,7 +16,7 @@ import (
"gitlab.com/elixxir/client/interfaces" "gitlab.com/elixxir/client/interfaces"
"gitlab.com/elixxir/client/interfaces/message" "gitlab.com/elixxir/client/interfaces/message"
"gitlab.com/elixxir/client/interfaces/params" "gitlab.com/elixxir/client/interfaces/params"
"gitlab.com/elixxir/client/interfaces/utility" "gitlab.com/elixxir/client/network"
"gitlab.com/elixxir/client/stoppable" "gitlab.com/elixxir/client/stoppable"
"gitlab.com/elixxir/client/storage" "gitlab.com/elixxir/client/storage"
"gitlab.com/elixxir/client/storage/e2e" "gitlab.com/elixxir/client/storage/e2e"
...@@ -145,7 +145,7 @@ func handleTrigger(sess *storage.Session, net interfaces.NetworkManager, ...@@ -145,7 +145,7 @@ func handleTrigger(sess *storage.Session, net interfaces.NetworkManager,
} }
//Wait until the result tracking responds //Wait until the result tracking responds
success, numRoundFail, numTimeOut := utility.TrackResults(sendResults, success, numRoundFail, numTimeOut := network.TrackResults(sendResults,
len(rounds)) len(rounds))
// If a single partition of the Key Negotiation request does not // If a single partition of the Key Negotiation request does not
// transmit, the partner will not be able to read the confirmation. If // transmit, the partner will not be able to read the confirmation. If
......
...@@ -98,15 +98,20 @@ type CmixMessageBuffer struct { ...@@ -98,15 +98,20 @@ type CmixMessageBuffer struct {
mb *utility.MessageBuffer mb *utility.MessageBuffer
} }
func NewCmixMessageBuffer(kv *versioned.KV, key string) (*CmixMessageBuffer, error) { func NewOrLoadCmixMessageBuffer(kv *versioned.KV, key string) (*CmixMessageBuffer, error) {
cmb, err := LoadCmixMessageBuffer(kv, key)
if err != nil {
mb, err := utility.NewMessageBuffer(kv, &cmixMessageHandler{}, key) mb, err := utility.NewMessageBuffer(kv, &cmixMessageHandler{}, key)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &CmixMessageBuffer{mb: mb}, nil return &CmixMessageBuffer{mb: mb}, nil
} }
return cmb, nil
}
func LoadCmixMessageBuffer(kv *versioned.KV, key string) (*CmixMessageBuffer, error) { func LoadCmixMessageBuffer(kv *versioned.KV, key string) (*CmixMessageBuffer, error) {
mb, err := utility.LoadMessageBuffer(kv, &cmixMessageHandler{}, key) mb, err := utility.LoadMessageBuffer(kv, &cmixMessageHandler{}, key)
if err != nil { if err != nil {
...@@ -116,26 +121,31 @@ func LoadCmixMessageBuffer(kv *versioned.KV, key string) (*CmixMessageBuffer, er ...@@ -116,26 +121,31 @@ func LoadCmixMessageBuffer(kv *versioned.KV, key string) (*CmixMessageBuffer, er
return &CmixMessageBuffer{mb: mb}, nil return &CmixMessageBuffer{mb: mb}, nil
} }
func (cmb *CmixMessageBuffer) Add(msg format.Message, recipent *id.ID) { func (cmb *CmixMessageBuffer) Add(msg format.Message, recipent *id.ID, params CMIXParams) {
paramBytes, _ := params.MarshalJSON()
sm := storedMessage{ sm := storedMessage{
Msg: msg.Marshal(), Msg: msg.Marshal(),
Recipient: recipent.Marshal(), Recipient: recipent.Marshal(),
Params: paramBytes,
} }
cmb.mb.Add(sm) cmb.mb.Add(sm)
} }
func (cmb *CmixMessageBuffer) AddProcessing(msg format.Message, recipent *id.ID) { func (cmb *CmixMessageBuffer) AddProcessing(msg format.Message, recipent *id.ID,
params CMIXParams) {
paramBytes, _ := params.MarshalJSON()
sm := storedMessage{ sm := storedMessage{
Msg: msg.Marshal(), Msg: msg.Marshal(),
Recipient: recipent.Marshal(), Recipient: recipent.Marshal(),
Params: paramBytes,
} }
cmb.mb.AddProcessing(sm) cmb.mb.AddProcessing(sm)
} }
func (cmb *CmixMessageBuffer) Next() (format.Message, *id.ID, bool) { func (cmb *CmixMessageBuffer) Next() (format.Message, *id.ID, CMIXParams, bool) {
m, ok := cmb.mb.Next() m, ok := cmb.mb.Next()
if !ok { if !ok {
return format.Message{}, nil, false return format.Message{}, nil, CMIXParams{}, false
} }
sm := m.(storedMessage) sm := m.(storedMessage)
...@@ -146,10 +156,20 @@ func (cmb *CmixMessageBuffer) Next() (format.Message, *id.ID, bool) { ...@@ -146,10 +156,20 @@ func (cmb *CmixMessageBuffer) Next() (format.Message, *id.ID, bool) {
} }
recpient, err := id.Unmarshal(sm.Recipient) recpient, err := id.Unmarshal(sm.Recipient)
if err != nil { if err != nil {
jww.FATAL.Panicf("Could nto get an id for stored cmix "+ jww.FATAL.Panicf("Could not get an id for stored cmix "+
"message buffer: %+v", err) "message buffer: %+v", err)
} }
return msg, recpient, true
params := CMIXParams{}
if sm.Params == nil || len(sm.Params) == 0 {
params = GetDefaultCMIXParams()
} else {
if err = params.UnmarshalJSON(sm.Params); err != nil {
jww.FATAL.Panicf("Could not parse the parms for stored CMIX "+
"message buffer: %+v", err)
}
}
return msg, recpient, params, true
} }
func (cmb *CmixMessageBuffer) Succeeded(msg format.Message, recipent *id.ID) { func (cmb *CmixMessageBuffer) Succeeded(msg format.Message, recipent *id.ID) {
......
package network package network
import ( import (
jww "github.com/spf13/jwalterweatherman"
"gitlab.com/elixxir/client/network/health" "gitlab.com/elixxir/client/network/health"
"gitlab.com/elixxir/client/stoppable"
"gitlab.com/elixxir/client/storage/versioned"
ds "gitlab.com/elixxir/comms/network/dataStructures"
"gitlab.com/elixxir/primitives/format"
"gitlab.com/elixxir/primitives/states"
"gitlab.com/xx_network/primitives/id"
"gitlab.com/xx_network/primitives/id/ephemeral"
"time"
) )
type Manager struct { const criticalRawMessagesKey = "CriticalRawMessages"
storage *CmixMessageBuffer
// roundEventRegistrar is an interface for the round events system to allow
// for easy testing
type roundEventRegistrar interface {
AddRoundEventChan(rid id.Round, eventChan chan ds.EventReturn,
timeout time.Duration, validStates ...states.Round) *ds.EventCallback
}
// criticalSender is an anonymous function which takes the data critical
// knows for sending. It should call sendCmixHelper and use scope sharing in an
// anonymous function to include the structures from manager which critical is
// not aware of
type criticalSender func(msg format.Message, recipient *id.ID,
params CMIXParams) (id.Round, ephemeral.Id, error)
//Structure which allows the auto resending of messages that must be received
type critical struct {
*CmixMessageBuffer
roundEvents roundEventRegistrar
trigger chan bool trigger chan bool
hm *health.Monitor send criticalSender
}
func newCritical(kv *versioned.KV, hm health.Monitor,
roundEvents roundEventRegistrar, send criticalSender) *critical {
cm, err := NewOrLoadCmixMessageBuffer(kv, criticalRawMessagesKey)
if err != nil {
jww.FATAL.Panicf("Failed to load the buffer for critical "+
"messages: %v", err)
}
c := &critical{
CmixMessageBuffer: cm,
roundEvents: roundEvents,
trigger: make(chan bool, 100),
send: send,
} }
hm.AddHealthCallback(func(healthy bool) {
c.trigger <- healthy
})
return c
}
func (c *critical) runCriticalMessages(stop *stoppable.Single) {
for {
select {
case <-stop.Quit():
stop.ToStopped()
return
case isHealthy := <-c.trigger:
if isHealthy {
c.evaluate(stop)
}
}
}
}
func (c *critical) handle(msg format.Message, recipient *id.ID, rid id.Round, rtnErr error) {
if rtnErr != nil {
c.Failed(msg, recipient)
} else {
sendResults := make(chan ds.EventReturn, 1)
func NewManager() c.roundEvents.AddRoundEventChan(rid, sendResults, 1*time.Minute,
states.COMPLETED, states.FAILED)
success, numTimeOut, _ := TrackResults(sendResults, 1)
if !success {
if numTimeOut > 0 {
jww.ERROR.Printf("critical raw message resend to %s "+
"(msgDigest: %s) on round %d failed to transmit due to "+
"timeout", recipient, msg.Digest(), rid)
} else {
jww.ERROR.Printf("critical raw message resend to %s "+
"(msgDigest: %s) on round %d failed to transmit due to "+
"send failure", recipient, msg.Digest(), rid)
}
c.Failed(msg, recipient)
return
}
jww.INFO.Printf("Successful resend of critical raw message "+
"to %s (msgDigest: %s) on round %d", recipient, msg.Digest(), rid)
c.Succeeded(msg, recipient)
}
}
// evaluate tries to send every message in the critical messages and the raw critical
// messages buffer in parallel
func (c *critical) evaluate(stop *stoppable.Single) {
for msg, recipient, params, has := c.Next(); has; msg, recipient, params, has = c.Next() {
localRid := recipient.DeepCopy()
go func(msg format.Message, recipient *id.ID, params CMIXParams) {
params.Stop = stop
jww.INFO.Printf("Resending critical raw message to %s "+
"(msgDigest: %s)", recipient, msg.Digest())
//send the message
round, _, err := c.send(msg, recipient, params)
//pass to the handler
c.handle(msg, recipient, round, err)
//wait on the results to make sure the rounds were successful
}(msg, localRid, params)
}
}
...@@ -29,6 +29,7 @@ import ( ...@@ -29,6 +29,7 @@ import (
"gitlab.com/elixxir/crypto/fastRNG" "gitlab.com/elixxir/crypto/fastRNG"
"gitlab.com/elixxir/primitives/format" "gitlab.com/elixxir/primitives/format"
"gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/id"
"gitlab.com/xx_network/primitives/id/ephemeral"
"gitlab.com/xx_network/primitives/ndf" "gitlab.com/xx_network/primitives/ndf"
"math" "math"
"sync/atomic" "sync/atomic"
...@@ -69,6 +70,7 @@ type manager struct { ...@@ -69,6 +70,7 @@ type manager struct {
address.Space address.Space
identity.Tracker identity.Tracker
health.Monitor health.Monitor
crit *critical
// Earliest tracked round // Earliest tracked round
earliestRound *uint64 earliestRound *uint64
...@@ -122,7 +124,6 @@ func NewManager(params Params, comms *client.Comms, session storage.Session, ...@@ -122,7 +124,6 @@ func NewManager(params Params, comms *client.Comms, session storage.Session,
} }
/* set up modules */ /* set up modules */
nodechan := make(chan commNetwork.NodeGateway, nodes.InputChanLen) nodechan := make(chan commNetwork.NodeGateway, nodes.InputChanLen)
// Set up gateway.Sender // Set up gateway.Sender
...@@ -160,8 +161,20 @@ func NewManager(params Params, comms *client.Comms, session storage.Session, ...@@ -160,8 +161,20 @@ func NewManager(params Params, comms *client.Comms, session storage.Session,
// Set upthe ability to register with new nodes when they appear // Set upthe ability to register with new nodes when they appear
m.instance.SetAddGatewayChan(nodechan) m.instance.SetAddGatewayChan(nodechan)
// set up the health monitor
m.Monitor = health.Init(instance, params.NetworkHealthTimeout) m.Monitor = health.Init(instance, params.NetworkHealthTimeout)
//set up critical message tracking (sendCmix only)
critSender := func(msg format.Message, recipient *id.ID,
params CMIXParams) (id.Round, ephemeral.Id, error) {
return sendCmixHelper(m.Sender, msg, recipient, params, m.instance,
m.session.GetCmixGroup(), m.Registrar, m.rng, m.events,
m.session.GetTransmissionID(), m.comms)
}
m.crit = newCritical(session.GetKV(), m.Monitor,
m.instance.GetRoundEvents(), critSender)
// Report health events // Report health events
m.Monitor.AddHealthCallback(func(isHealthy bool) { m.Monitor.AddHealthCallback(func(isHealthy bool) {
m.events.Report(5, "health", "IsHealthy", m.events.Report(5, "health", "IsHealthy",
......
...@@ -2,7 +2,6 @@ package network ...@@ -2,7 +2,6 @@ package network
import ( import (
"encoding/json" "encoding/json"
"gitlab.com/elixxir/client/interfaces"
"gitlab.com/elixxir/client/network/historical" "gitlab.com/elixxir/client/network/historical"
"gitlab.com/elixxir/client/network/message" "gitlab.com/elixxir/client/network/message"
"gitlab.com/elixxir/client/network/rounds" "gitlab.com/elixxir/client/network/rounds"
...@@ -90,29 +89,34 @@ type CMIXParams struct { ...@@ -90,29 +89,34 @@ type CMIXParams struct {
RoundTries uint RoundTries uint
Timeout time.Duration Timeout time.Duration
RetryDelay time.Duration RetryDelay time.Duration
ExcludedRounds excludedRounds.ExcludedRounds ExcludedRounds excludedRounds.ExcludedRounds `json:"-"`
// Duration to wait before sending on a round times out and a new round is // Duration to wait before sending on a round times out and a new round is
// tried // tried
SendTimeout time.Duration SendTimeout time.Duration
// an alternate identity preimage to use on send. If not set, the default
// for the sending identity will be used
trigger interfaces.Trigger
// Tag which prints with sending logs to help localize the source // Tag which prints with sending logs to help localize the source
// All internal sends are tagged, so the default tag is "External" // All internal sends are tagged, so the default tag is "External"
DebugTag string DebugTag string
//Threading interface, can be used to stop the send early //Threading interface, can be used to stop the send early
Stop *stoppable.Single Stop *stoppable.Single `json:"-"`
//List of nodes to not send to, will skip a round with these //List of nodes to not send to, will skip a round with these
//nodes in it //nodes in it
BlacklistedNodes map[id.ID]interface{} //todo - do not omit this on json
BlacklistedNodes map[id.ID]bool `json:"-"`
// Sets the message as critical. The system will track that the round it
// sends on completes and will auto resend in the event the round fails or
// completion cannot be determined. The sent data will be byte identical,
// so this has a high chance of metadata leak. This system should only be
// used in cases where repeats cannot be different
// Only used in sendCmix, not sendManyCmix
Critical bool
} }
func GetDefaultCMIX() CMIXParams { func GetDefaultCMIXParams() CMIXParams {
return CMIXParams{ return CMIXParams{
RoundTries: 10, RoundTries: 10,
Timeout: 25 * time.Second, Timeout: 25 * time.Second,
...@@ -122,13 +126,17 @@ func GetDefaultCMIX() CMIXParams { ...@@ -122,13 +126,17 @@ func GetDefaultCMIX() CMIXParams {
} }
} }
func (c CMIXParams) Marshal() ([]byte, error) { func (c CMIXParams) MarshalJSON() ([]byte, error) {
return json.Marshal(c) return json.Marshal(c)
} }
func (c CMIXParams) UnmarshalJSON(b []byte) error {
return json.Unmarshal(b, &c)
}
// GetCMIXParameters func obtains default CMIX parameters, or overrides with given parameters if set // GetCMIXParameters func obtains default CMIX parameters, or overrides with given parameters if set
func GetCMIXParameters(params string) (CMIXParams, error) { func GetCMIXParameters(params string) (CMIXParams, error) {
p := GetDefaultCMIX() p := GetDefaultCMIXParams()
if len(params) > 0 { if len(params) > 0 {
err := json.Unmarshal([]byte(params), &p) err := json.Unmarshal([]byte(params), &p)
if err != nil { if err != nil {
......
...@@ -52,8 +52,9 @@ import ( ...@@ -52,8 +52,9 @@ import (
// Will return an error if the network is unhealthy or if it fails to send // Will return an error if the network is unhealthy or if it fails to send
// (along with the reason). Blocks until successful send or err. // (along with the reason). Blocks until successful send or err.
// WARNING: Do not roll your own crypto // WARNING: Do not roll your own crypto
func (m *manager) SendCMIX(recipient *id.ID, fingerprint format.Fingerprint, service message.Service, func (m *manager) SendCMIX(recipient *id.ID, fingerprint format.Fingerprint,
payload, mac []byte, cmixParams CMIXParams) (id.Round, ephemeral.Id, error) { service message.Service, payload, mac []byte, cmixParams CMIXParams) (
id.Round, ephemeral.Id, error) {
if !m.Monitor.IsHealthy() { if !m.Monitor.IsHealthy() {
return 0, ephemeral.Id{}, errors.New("Cannot send cmix message when the " + return 0, ephemeral.Id{}, errors.New("Cannot send cmix message when the " +
"network is not healthy") "network is not healthy")
...@@ -66,9 +67,19 @@ func (m *manager) SendCMIX(recipient *id.ID, fingerprint format.Fingerprint, ser ...@@ -66,9 +67,19 @@ func (m *manager) SendCMIX(recipient *id.ID, fingerprint format.Fingerprint, ser
msg.SetMac(mac) msg.SetMac(mac)
msg.SetSIH(service.Hash(msg.GetContents())) msg.SetSIH(service.Hash(msg.GetContents()))
return sendCmixHelper(m.Sender, msg, recipient, cmixParams, m.instance, if cmixParams.Critical {
m.session.GetCmixGroup(), m.Registrar, m.rng, m.events, m.crit.AddProcessing(msg, recipient, cmixParams)
}
rid, ephID, rtnErr := sendCmixHelper(m.Sender, msg, recipient, cmixParams,
m.instance, m.session.GetCmixGroup(), m.Registrar, m.rng, m.events,
m.session.GetTransmissionID(), m.comms) m.session.GetTransmissionID(), m.comms)
if cmixParams.Critical {
m.crit.handle(msg, recipient, rid, rtnErr)
}
return rid, ephID, rtnErr
} }
// Helper function for sendCmix // Helper function for sendCmix
......
...@@ -63,6 +63,7 @@ type TargetedCmixMessage struct { ...@@ -63,6 +63,7 @@ type TargetedCmixMessage struct {
// (panic otherwise). If used, fill with random bits. // (panic otherwise). If used, fill with random bits.
// Will return an error if the network is unhealthy or if it fails to send // Will return an error if the network is unhealthy or if it fails to send
// (along with the reason). Blocks until successful send or err. // (along with the reason). Blocks until successful send or err.
// Does not support Critical Messages
// WARNING: Do not roll your own crypto // WARNING: Do not roll your own crypto
func (m *manager) SendManyCMIX(messages []TargetedCmixMessage, func (m *manager) SendManyCMIX(messages []TargetedCmixMessage,
p CMIXParams) (id.Round, []ephemeral.Id, error) { p CMIXParams) (id.Round, []ephemeral.Id, error) {
......
...@@ -5,7 +5,7 @@ ...@@ -5,7 +5,7 @@
// LICENSE file // // LICENSE file //
/////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////
package utility package network
import ( import (
jww "github.com/spf13/jwalterweatherman" jww "github.com/spf13/jwalterweatherman"
......
...@@ -12,7 +12,7 @@ import ( ...@@ -12,7 +12,7 @@ import (
"github.com/pkg/errors" "github.com/pkg/errors"
jww "github.com/spf13/jwalterweatherman" jww "github.com/spf13/jwalterweatherman"
"gitlab.com/elixxir/client/interfaces/params" "gitlab.com/elixxir/client/interfaces/params"
"gitlab.com/elixxir/client/interfaces/utility" "gitlab.com/elixxir/client/network"
ds "gitlab.com/elixxir/comms/network/dataStructures" ds "gitlab.com/elixxir/comms/network/dataStructures"
cAuth "gitlab.com/elixxir/crypto/e2e/auth" cAuth "gitlab.com/elixxir/crypto/e2e/auth"
"gitlab.com/elixxir/crypto/e2e/singleUse" "gitlab.com/elixxir/crypto/e2e/singleUse"
...@@ -101,7 +101,7 @@ func (m *Manager) respondSingleUse(partner Contact, payload []byte, ...@@ -101,7 +101,7 @@ func (m *Manager) respondSingleUse(partner Contact, payload []byte,
} }
// Wait until the result tracking responds // Wait until the result tracking responds
success, numRoundFail, numTimeOut := utility.TrackResults(sendResults, len(roundMap)) success, numRoundFail, numTimeOut := network.TrackResults(sendResults, len(roundMap))
if !success { if !success {
return errors.Errorf("tracking results of %d rounds: %d round "+ return errors.Errorf("tracking results of %d rounds: %d round "+
"failures, %d round event time outs; the send cannot be retried.", "failures, %d round event time outs; the send cannot be retried.",
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment