Skip to content
Snippets Groups Projects
Commit 647e9956 authored by Jonah Husson's avatar Jonah Husson
Browse files

Refactor send in cmix to have sendWithCompiler as its base

parent 6452b2d0
No related branches found
No related tags found
7 merge requests!510Release,!419rewrote the health tracker to both consider if there are waiting rounds and...,!371[Channel RSAtoPrivate] Implement Reverse Asymmetric in Client/Broadcast,!354Channels impl,!340Project/channels,!339Project/channels,!317Xx 3999/send with compiler
......@@ -189,9 +189,15 @@ func (c *client) initialize(ndf *ndf.NetworkDefinition) error {
// Set up critical message tracking (sendCmix only)
critSender := func(msg format.Message, recipient *id.ID, params CMIXParams,
) (id.Round, ephemeral.Id, error) {
return sendCmixHelper(c.Sender, msg, recipient, params, c.instance,
// TODO: Does this need to be reworked to take in a message compiler? This has ramifications down the stack in critical messaging
compiler := func(round id.Round) (format.Message, error) {
return msg, nil
}
rid, eid, _, sendErr := sendCmixHelper(c.Sender, compiler, recipient, params, c.instance,
c.session.GetCmixGroup(), c.Registrar, c.rng, c.events,
c.session.GetTransmissionID(), c.comms)
return rid, eid, sendErr
}
c.crit = newCritical(c.session.GetKV(), c.Monitor,
......
......@@ -56,14 +56,29 @@ import (
// WARNING: Do not roll your own crypto.
func (c *client) Send(recipient *id.ID, fingerprint format.Fingerprint,
service message.Service, payload, mac []byte, cmixParams CMIXParams) (
id.Round, ephemeral.Id, error) {
compiler := func(rid id.Round) (format.Fingerprint, message.Service, []byte, []byte) {
return fingerprint, service, payload, mac
}
return c.SendWithCompiler(recipient, compiler, cmixParams)
}
type Compiler func(rid id.Round) (fingerprint format.Fingerprint, service message.Service, payload, mac []byte)
type messageCompiler func(rid id.Round) (format.Message, error)
func (c *client) SendWithCompiler(recipient *id.ID, compiler Compiler, cmixParams CMIXParams) (
id.Round, ephemeral.Id, error) {
if !c.Monitor.IsHealthy() {
return 0, ephemeral.Id{}, errors.New(
"Cannot send cmix message when the network is not healthy")
}
compilerFunc := func(rid id.Round) (format.Message, error) {
fingerprint, service, payload, mac := compiler(rid)
if len(payload) != c.maxMsgLen {
return 0, ephemeral.Id{}, errors.Errorf(
return format.Message{}, errors.Errorf(
"bad message length (%d, need %d)",
len(payload), c.maxMsgLen)
}
......@@ -82,8 +97,10 @@ func (c *client) Send(recipient *id.ID, fingerprint format.Fingerprint,
if cmixParams.Critical {
c.crit.AddProcessing(msg, recipient, cmixParams)
}
return msg, nil
}
rid, ephID, rtnErr := sendCmixHelper(c.Sender, msg, recipient, cmixParams,
rid, ephID, msg, rtnErr := sendCmixHelper(c.Sender, compilerFunc, recipient, cmixParams,
c.instance, c.session.GetCmixGroup(), c.Registrar, c.rng, c.events,
c.session.GetTransmissionID(), c.comms)
......@@ -104,10 +121,10 @@ func (c *client) Send(recipient *id.ID, fingerprint format.Fingerprint,
// If the message is successfully sent, the ID of the round sent it is returned,
// which can be registered with the network instance to get a callback on its
// status.
func sendCmixHelper(sender gateway.Sender, msg format.Message, recipient *id.ID,
func sendCmixHelper(sender gateway.Sender, msgCompiler messageCompiler, recipient *id.ID,
cmixParams CMIXParams, instance *network.Instance, grp *cyclic.Group,
nodes nodes.Registrar, rng *fastRNG.StreamGenerator, events event.Reporter,
senderId *id.ID, comms SendCmixCommsInterface) (id.Round, ephemeral.Id, error) {
senderId *id.ID, comms SendCmixCommsInterface) (id.Round, ephemeral.Id, format.Message, error) {
timeStart := netTime.Now()
maxTimeout := sender.GetHostParams().SendTimeout
......@@ -120,15 +137,11 @@ func sendCmixHelper(sender gateway.Sender, msg format.Message, recipient *id.ID,
}
jww.INFO.Printf("[Send-%s] Looking for round to send cMix message to "+
"%s (msgDigest: %s)", cmixParams.DebugTag, recipient, msg.Digest())
"%s", cmixParams.DebugTag, recipient)
stream := rng.GetStream()
defer stream.Close()
// Flip leading bits randomly to thwart a tagging attack.
// See cmix.SetGroupBits for more info.
cmix.SetGroupBits(msg, grp, stream)
for numRoundTries := uint(0); numRoundTries < cmixParams.RoundTries; numRoundTries++ {
elapsed := netTime.Since(timeStart)
jww.TRACE.Printf("[Send-%s] try %d, elapsed: %s",
......@@ -136,15 +149,15 @@ func sendCmixHelper(sender gateway.Sender, msg format.Message, recipient *id.ID,
if elapsed > cmixParams.Timeout {
jww.INFO.Printf("[Send-%s] No rounds to send to %s "+
"(msgDigest: %s) were found before timeout %s",
cmixParams.DebugTag, recipient, msg.Digest(), cmixParams.Timeout)
return 0, ephemeral.Id{}, errors.New("Sending cmix message timed out")
"were found before timeout %s",
cmixParams.DebugTag, recipient, cmixParams.Timeout)
return 0, ephemeral.Id{}, format.Message{}, errors.New("Sending cmix message timed out")
}
if numRoundTries > 0 {
jww.INFO.Printf("[Send-%s] Attempt %d to find round to send "+
"message to %s (msgDigest: %s)", cmixParams.DebugTag,
numRoundTries+1, recipient, msg.Digest())
"message to %s", cmixParams.DebugTag,
numRoundTries+1, recipient)
}
// Find the best round to send to, excluding attempted rounds
......@@ -152,8 +165,8 @@ func sendCmixHelper(sender gateway.Sender, msg format.Message, recipient *id.ID,
bestRound, err := instance.GetWaitingRounds().GetUpcomingRealtime(
remainingTime, attempted, sendTimeBuffer)
if err != nil {
jww.WARN.Printf("[Send-%s] Failed to GetUpcomingRealtime "+
"(msgDigest: %s): %+v", cmixParams.DebugTag, msg.Digest(), err)
jww.WARN.Printf("[Send-%s] Failed to GetUpcomingRealtime: "+
"%+v", cmixParams.DebugTag, err)
}
if bestRound == nil {
......@@ -185,6 +198,16 @@ func sendCmixHelper(sender gateway.Sender, msg format.Message, recipient *id.ID,
continue
}
msg, err := msgCompiler(id.Round(bestRound.ID))
if err != nil {
jww.ERROR.Printf("Failed to compile message: %+v", err)
return 0, ephemeral.Id{}, format.Message{}, err
}
// Flip leading bits randomly to thwart a tagging attack.
// See cmix.SetGroupBits for more info.
cmix.SetGroupBits(msg, grp, stream)
// Retrieve host and key information from round
firstGateway, roundKeys, err := processRound(
nodes, bestRound, recipient.String(), msg.Digest())
......@@ -201,7 +224,7 @@ func sendCmixHelper(sender gateway.Sender, msg format.Message, recipient *id.ID,
wrappedMsg, encMsg, ephID, err := buildSlotMessage(msg, recipient,
firstGateway, stream, senderId, bestRound, roundKeys)
if err != nil {
return 0, ephemeral.Id{}, err
return 0, ephemeral.Id{}, format.Message{}, err
}
jww.INFO.Printf("[Send-%s] Sending to EphID %d (%s), on round %d "+
......@@ -251,7 +274,7 @@ func sendCmixHelper(sender gateway.Sender, msg format.Message, recipient *id.ID,
// Exit if the thread has been stopped
if stoppable.CheckErr(err) {
return 0, ephemeral.Id{}, err
return 0, ephemeral.Id{}, format.Message{}, err
}
// If the comm errors or the message fails to send, continue retrying
......@@ -260,7 +283,7 @@ func sendCmixHelper(sender gateway.Sender, msg format.Message, recipient *id.ID,
jww.ERROR.Printf("[Send-%s] SendCmix failed to send to "+
"EphID %d (%s) on round %d: %+v", cmixParams.DebugTag,
ephID.Int64(), recipient, bestRound.ID, err)
return 0, ephemeral.Id{}, err
return 0, ephemeral.Id{}, format.Message{}, err
}
jww.ERROR.Printf("[Send-%s] SendCmix failed to send to "+
......@@ -280,7 +303,7 @@ func sendCmixHelper(sender gateway.Sender, msg format.Message, recipient *id.ID,
jww.INFO.Print(m)
events.Report(1, "MessageSend", "Metric", m)
return id.Round(bestRound.ID), ephID, nil
return id.Round(bestRound.ID), ephID, msg, nil
} else {
jww.FATAL.Panicf("[Send-%s] Gateway %s returned no error, "+
"but failed to accept message when sending to EphID %d (%s) "+
......@@ -289,6 +312,6 @@ func sendCmixHelper(sender gateway.Sender, msg format.Message, recipient *id.ID,
}
}
return 0, ephemeral.Id{},
return 0, ephemeral.Id{}, format.Message{},
errors.New("failed to send the message, unknown error")
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment