diff --git a/interfaces/params/message.go b/interfaces/params/message.go index fbf9779829b939145cf7bc1277fa79b5617b826a..acecde4ef9861ab722ff05c7e645272f20d21694 100644 --- a/interfaces/params/message.go +++ b/interfaces/params/message.go @@ -16,6 +16,8 @@ type Messages struct { MessageReceptionWorkerPoolSize uint MaxChecksGarbledMessage uint GarbledMessageWait time.Duration + // Use proxied (rather than direct) message sending + ProxySending bool } func GetDefaultMessage() Messages { @@ -24,5 +26,6 @@ func GetDefaultMessage() Messages { MessageReceptionWorkerPoolSize: 4, MaxChecksGarbledMessage: 10, GarbledMessageWait: 15 * time.Minute, + ProxySending: false, } } diff --git a/network/message/sendCmix.go b/network/message/sendCmix.go index 2a7095843eb57c6f9af2af16ea9725fab0aac627..b608e8c6e2c62ace780a8ce8391f943ac1712bb1 100644 --- a/network/message/sendCmix.go +++ b/network/message/sendCmix.go @@ -40,7 +40,7 @@ const sendTimeBuffer = 2500 * time.Millisecond // Public manager function to send a message over CMIX func (m *Manager) SendCMIX(sender *gateway.Sender, msg format.Message, recipient *id.ID, param params.CMIX) (id.Round, ephemeral.Id, error) { msgCopy := msg.Copy() - return sendCmixHelper(sender, msgCopy, recipient, param, m.Instance, m.Session, m.nodeRegistration, m.Rng, m.TransmissionID, m.Comms) + return sendCmixHelper(sender, msgCopy, recipient, m.param, param, m.Instance, m.Session, m.nodeRegistration, m.Rng, m.TransmissionID, m.Comms) } // Payloads send are not End to End encrypted, MetaData is NOT protected with @@ -51,7 +51,7 @@ func (m *Manager) SendCMIX(sender *gateway.Sender, msg format.Message, recipient // If the message is successfully sent, the id of the round sent it is returned, // which can be registered with the network instance to get a callback on // its status -func sendCmixHelper(sender *gateway.Sender, msg format.Message, recipient *id.ID, param params.CMIX, instance *network.Instance, +func sendCmixHelper(sender *gateway.Sender, msg format.Message, recipient *id.ID, messageParams params.Messages, cmixParams params.CMIX, instance *network.Instance, session *storage.Session, nodeRegistration chan network.NodeGateway, rng *fastRNG.StreamGenerator, senderId *id.ID, comms sendCmixCommsInterface) (id.Round, ephemeral.Id, error) { @@ -61,13 +61,13 @@ func sendCmixHelper(sender *gateway.Sender, msg format.Message, recipient *id.ID jww.INFO.Printf("Looking for round to send cMix message to %s "+ "(msgDigest: %s)", recipient, msg.Digest()) - for numRoundTries := uint(0); numRoundTries < param.RoundTries; numRoundTries++ { + for numRoundTries := uint(0); numRoundTries < cmixParams.RoundTries; numRoundTries++ { elapsed := netTime.Now().Sub(timeStart) - if elapsed > param.Timeout { + if elapsed > cmixParams.Timeout { jww.INFO.Printf("No rounds to send to %s (msgDigest: %s) "+ "were found before timeout %s", recipient, msg.Digest(), - param.Timeout) + cmixParams.Timeout) return 0, ephemeral.Id{}, errors.New("Sending cmix message timed out") } if numRoundTries > 0 { @@ -76,7 +76,7 @@ func sendCmixHelper(sender *gateway.Sender, msg format.Message, recipient *id.ID msg.Digest()) } - remainingTime := param.Timeout - elapsed + remainingTime := cmixParams.Timeout - elapsed //find the best round to send to, excluding attempted rounds bestRound, _ := instance.GetWaitingRounds().GetUpcomingRealtime(remainingTime, attempted, sendTimeBuffer) if bestRound == nil { @@ -128,7 +128,7 @@ func sendCmixHelper(sender *gateway.Sender, msg format.Message, recipient *id.ID "(msgDigest: %s) due to missing relationships with nodes: %s", bestRound.ID, recipient, msg.Digest(), missingKeys) go handleMissingNodeKeys(instance, nodeRegistration, missingKeys) - time.Sleep(param.RetryDelay) + time.Sleep(cmixParams.RetryDelay) continue } @@ -175,7 +175,7 @@ func sendCmixHelper(sender *gateway.Sender, msg format.Message, recipient *id.ID encMsg.Digest(), firstGateway.String()) // Send the payload - result, err := sender.SendToSpecific(firstGateway, func(host *connect.Host, target *id.ID) (interface{}, bool, error) { + sendFunc := func(host *connect.Host, target *id.ID) (interface{}, bool, error) { wrappedMsg.Target = target.Marshal() result, err := comms.SendPutMessage(host, wrappedMsg) if err != nil { @@ -203,7 +203,13 @@ func sendCmixHelper(sender *gateway.Sender, msg format.Message, recipient *id.ID } } return result, false, err - }) + } + var result interface{} + if messageParams.ProxySending { + result, err = sender.SendToSpecific(firstGateway, sendFunc) + } else { + result, err = sender.SendToSpecific(firstGateway, sendFunc) + } //if the comm errors or the message fails to send, continue retrying. //return if it sends properly diff --git a/network/message/sendCmix_test.go b/network/message/sendCmix_test.go index 04da108426dda151bec4777338fe4a0036366832..28a2cdbab74a68f12ce6e45814bde8ff521adcf1 100644 --- a/network/message/sendCmix_test.go +++ b/network/message/sendCmix_test.go @@ -143,7 +143,7 @@ func Test_attemptSendCmix(t *testing.T) { msgCmix := format.NewMessage(m.Session.Cmix().GetGroup().GetP().ByteLen()) msgCmix.SetContents([]byte("test")) e2e.SetUnencrypted(msgCmix, m.Session.User().GetCryptographicIdentity().GetTransmissionID()) - _, _, err = sendCmixHelper(sender, msgCmix, sess2.GetUser().ReceptionID, params.GetDefaultCMIX(), + _, _, err = sendCmixHelper(sender, msgCmix, sess2.GetUser().ReceptionID, params.GetDefaultMessage(), params.GetDefaultCMIX(), m.Instance, m.Session, m.nodeRegistration, m.Rng, m.TransmissionID, &MockSendCMIXComms{t: t}) if err != nil {