Skip to content
Snippets Groups Projects
Commit df0fd4c1 authored by Jake Taylor's avatar Jake Taylor
Browse files

add proxied message sending flag

parent 651fa192
Branches
Tags
1 merge request!23Release
...@@ -16,6 +16,8 @@ type Messages struct { ...@@ -16,6 +16,8 @@ type Messages struct {
MessageReceptionWorkerPoolSize uint MessageReceptionWorkerPoolSize uint
MaxChecksGarbledMessage uint MaxChecksGarbledMessage uint
GarbledMessageWait time.Duration GarbledMessageWait time.Duration
// Use proxied (rather than direct) message sending
ProxySending bool
} }
func GetDefaultMessage() Messages { func GetDefaultMessage() Messages {
...@@ -24,5 +26,6 @@ func GetDefaultMessage() Messages { ...@@ -24,5 +26,6 @@ func GetDefaultMessage() Messages {
MessageReceptionWorkerPoolSize: 4, MessageReceptionWorkerPoolSize: 4,
MaxChecksGarbledMessage: 10, MaxChecksGarbledMessage: 10,
GarbledMessageWait: 15 * time.Minute, GarbledMessageWait: 15 * time.Minute,
ProxySending: false,
} }
} }
...@@ -40,7 +40,7 @@ const sendTimeBuffer = 2500 * time.Millisecond ...@@ -40,7 +40,7 @@ const sendTimeBuffer = 2500 * time.Millisecond
// Public manager function to send a message over CMIX // Public manager function to send a message over CMIX
func (m *Manager) SendCMIX(sender *gateway.Sender, msg format.Message, recipient *id.ID, param params.CMIX) (id.Round, ephemeral.Id, error) { func (m *Manager) SendCMIX(sender *gateway.Sender, msg format.Message, recipient *id.ID, param params.CMIX) (id.Round, ephemeral.Id, error) {
msgCopy := msg.Copy() msgCopy := msg.Copy()
return sendCmixHelper(sender, msgCopy, recipient, param, m.Instance, m.Session, m.nodeRegistration, m.Rng, m.TransmissionID, m.Comms) return sendCmixHelper(sender, msgCopy, recipient, m.param, param, m.Instance, m.Session, m.nodeRegistration, m.Rng, m.TransmissionID, m.Comms)
} }
// Payloads send are not End to End encrypted, MetaData is NOT protected with // Payloads send are not End to End encrypted, MetaData is NOT protected with
...@@ -51,7 +51,7 @@ func (m *Manager) SendCMIX(sender *gateway.Sender, msg format.Message, recipient ...@@ -51,7 +51,7 @@ func (m *Manager) SendCMIX(sender *gateway.Sender, msg format.Message, recipient
// If the message is successfully sent, the id of the round sent it is returned, // If the message is successfully sent, the id of the round sent it is returned,
// which can be registered with the network instance to get a callback on // which can be registered with the network instance to get a callback on
// its status // its status
func sendCmixHelper(sender *gateway.Sender, msg format.Message, recipient *id.ID, param params.CMIX, instance *network.Instance, func sendCmixHelper(sender *gateway.Sender, msg format.Message, recipient *id.ID, messageParams params.Messages, cmixParams params.CMIX, instance *network.Instance,
session *storage.Session, nodeRegistration chan network.NodeGateway, rng *fastRNG.StreamGenerator, senderId *id.ID, session *storage.Session, nodeRegistration chan network.NodeGateway, rng *fastRNG.StreamGenerator, senderId *id.ID,
comms sendCmixCommsInterface) (id.Round, ephemeral.Id, error) { comms sendCmixCommsInterface) (id.Round, ephemeral.Id, error) {
...@@ -61,13 +61,13 @@ func sendCmixHelper(sender *gateway.Sender, msg format.Message, recipient *id.ID ...@@ -61,13 +61,13 @@ func sendCmixHelper(sender *gateway.Sender, msg format.Message, recipient *id.ID
jww.INFO.Printf("Looking for round to send cMix message to %s "+ jww.INFO.Printf("Looking for round to send cMix message to %s "+
"(msgDigest: %s)", recipient, msg.Digest()) "(msgDigest: %s)", recipient, msg.Digest())
for numRoundTries := uint(0); numRoundTries < param.RoundTries; numRoundTries++ { for numRoundTries := uint(0); numRoundTries < cmixParams.RoundTries; numRoundTries++ {
elapsed := netTime.Now().Sub(timeStart) elapsed := netTime.Now().Sub(timeStart)
if elapsed > param.Timeout { if elapsed > cmixParams.Timeout {
jww.INFO.Printf("No rounds to send to %s (msgDigest: %s) "+ jww.INFO.Printf("No rounds to send to %s (msgDigest: %s) "+
"were found before timeout %s", recipient, msg.Digest(), "were found before timeout %s", recipient, msg.Digest(),
param.Timeout) cmixParams.Timeout)
return 0, ephemeral.Id{}, errors.New("Sending cmix message timed out") return 0, ephemeral.Id{}, errors.New("Sending cmix message timed out")
} }
if numRoundTries > 0 { if numRoundTries > 0 {
...@@ -76,7 +76,7 @@ func sendCmixHelper(sender *gateway.Sender, msg format.Message, recipient *id.ID ...@@ -76,7 +76,7 @@ func sendCmixHelper(sender *gateway.Sender, msg format.Message, recipient *id.ID
msg.Digest()) msg.Digest())
} }
remainingTime := param.Timeout - elapsed remainingTime := cmixParams.Timeout - elapsed
//find the best round to send to, excluding attempted rounds //find the best round to send to, excluding attempted rounds
bestRound, _ := instance.GetWaitingRounds().GetUpcomingRealtime(remainingTime, attempted, sendTimeBuffer) bestRound, _ := instance.GetWaitingRounds().GetUpcomingRealtime(remainingTime, attempted, sendTimeBuffer)
if bestRound == nil { if bestRound == nil {
...@@ -128,7 +128,7 @@ func sendCmixHelper(sender *gateway.Sender, msg format.Message, recipient *id.ID ...@@ -128,7 +128,7 @@ func sendCmixHelper(sender *gateway.Sender, msg format.Message, recipient *id.ID
"(msgDigest: %s) due to missing relationships with nodes: %s", "(msgDigest: %s) due to missing relationships with nodes: %s",
bestRound.ID, recipient, msg.Digest(), missingKeys) bestRound.ID, recipient, msg.Digest(), missingKeys)
go handleMissingNodeKeys(instance, nodeRegistration, missingKeys) go handleMissingNodeKeys(instance, nodeRegistration, missingKeys)
time.Sleep(param.RetryDelay) time.Sleep(cmixParams.RetryDelay)
continue continue
} }
...@@ -175,7 +175,7 @@ func sendCmixHelper(sender *gateway.Sender, msg format.Message, recipient *id.ID ...@@ -175,7 +175,7 @@ func sendCmixHelper(sender *gateway.Sender, msg format.Message, recipient *id.ID
encMsg.Digest(), firstGateway.String()) encMsg.Digest(), firstGateway.String())
// Send the payload // Send the payload
result, err := sender.SendToSpecific(firstGateway, func(host *connect.Host, target *id.ID) (interface{}, bool, error) { sendFunc := func(host *connect.Host, target *id.ID) (interface{}, bool, error) {
wrappedMsg.Target = target.Marshal() wrappedMsg.Target = target.Marshal()
result, err := comms.SendPutMessage(host, wrappedMsg) result, err := comms.SendPutMessage(host, wrappedMsg)
if err != nil { if err != nil {
...@@ -203,7 +203,13 @@ func sendCmixHelper(sender *gateway.Sender, msg format.Message, recipient *id.ID ...@@ -203,7 +203,13 @@ func sendCmixHelper(sender *gateway.Sender, msg format.Message, recipient *id.ID
} }
} }
return result, false, err return result, false, err
}) }
var result interface{}
if messageParams.ProxySending {
result, err = sender.SendToSpecific(firstGateway, sendFunc)
} else {
result, err = sender.SendToSpecific(firstGateway, sendFunc)
}
//if the comm errors or the message fails to send, continue retrying. //if the comm errors or the message fails to send, continue retrying.
//return if it sends properly //return if it sends properly
......
...@@ -143,7 +143,7 @@ func Test_attemptSendCmix(t *testing.T) { ...@@ -143,7 +143,7 @@ func Test_attemptSendCmix(t *testing.T) {
msgCmix := format.NewMessage(m.Session.Cmix().GetGroup().GetP().ByteLen()) msgCmix := format.NewMessage(m.Session.Cmix().GetGroup().GetP().ByteLen())
msgCmix.SetContents([]byte("test")) msgCmix.SetContents([]byte("test"))
e2e.SetUnencrypted(msgCmix, m.Session.User().GetCryptographicIdentity().GetTransmissionID()) e2e.SetUnencrypted(msgCmix, m.Session.User().GetCryptographicIdentity().GetTransmissionID())
_, _, err = sendCmixHelper(sender, msgCmix, sess2.GetUser().ReceptionID, params.GetDefaultCMIX(), _, _, err = sendCmixHelper(sender, msgCmix, sess2.GetUser().ReceptionID, params.GetDefaultMessage(), params.GetDefaultCMIX(),
m.Instance, m.Session, m.nodeRegistration, m.Rng, m.Instance, m.Session, m.nodeRegistration, m.Rng,
m.TransmissionID, &MockSendCMIXComms{t: t}) m.TransmissionID, &MockSendCMIXComms{t: t})
if err != nil { if err != nil {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment