diff --git a/network/message/sendCmix.go b/network/message/sendCmix.go index 81050831aa9db87bb6ab999842d744324f4348d8..c950eac65f2bba880aa93e5a6736ca91c85394d0 100644 --- a/network/message/sendCmix.go +++ b/network/message/sendCmix.go @@ -8,12 +8,15 @@ package message import ( + "fmt" "github.com/golang-collections/collections/set" "github.com/pkg/errors" jww "github.com/spf13/jwalterweatherman" "gitlab.com/elixxir/client/interfaces/params" - "gitlab.com/elixxir/comms/mixmessages" + "gitlab.com/elixxir/client/storage" + pb "gitlab.com/elixxir/comms/mixmessages" "gitlab.com/elixxir/comms/network" + "gitlab.com/elixxir/crypto/fastRNG" "gitlab.com/elixxir/crypto/fingerprint" "gitlab.com/elixxir/primitives/format" "gitlab.com/elixxir/primitives/states" @@ -24,6 +27,12 @@ import ( "time" ) +// interface for sendcmix comms; allows mocking this in testing +type sendCmixCommsInterface interface { + GetHost(hostId *id.ID) (*connect.Host, bool) + SendPutMessage(host *connect.Host, message *pb.GatewaySlot) (*pb.GatewaySlotResponse, error) +} + const sendTimeBuffer = uint64(100 * time.Millisecond) // WARNING: Potentially Unsafe @@ -36,12 +45,20 @@ const sendTimeBuffer = uint64(100 * time.Millisecond) // which can be registered with the network instance to get a callback on // its status func (m *Manager) SendCMIX(msg format.Message, recipient *id.ID, param params.CMIX) (id.Round, ephemeral.Id, error) { + return sendCmixHelper(msg, recipient, param, m.Instance, m.Session, m.nodeRegistration, m.Rng, m.Uid, m.Comms) +} + +// Attempt to send a message over cmix +func sendCmixHelper(msg format.Message, recipient *id.ID, param params.CMIX, instance *network.Instance, + session *storage.Session, nodeRegistration chan network.NodeGateway, rng *fastRNG.StreamGenerator, senderId *id.ID, + comms sendCmixCommsInterface) (id.Round, ephemeral.Id, error) { timeStart := time.Now() attempted := set.New() for numRoundTries := uint(0); numRoundTries < param.RoundTries; numRoundTries++ { elapsed := time.Now().Sub(timeStart) + jww.DEBUG.Printf("SendCMIX Send Attempt %d", numRoundTries+1) if elapsed > param.Timeout { return 0, ephemeral.Id{}, errors.New("Sending cmix message timed out") @@ -49,7 +66,7 @@ func (m *Manager) SendCMIX(msg format.Message, recipient *id.ID, param params.CM remainingTime := param.Timeout - elapsed jww.TRACE.Printf("SendCMIX GetUpcommingRealtime") //find the best round to send to, excluding attempted rounds - bestRound, _ := m.Instance.GetWaitingRounds().GetUpcomingRealtime(remainingTime, attempted) + bestRound, _ := instance.GetWaitingRounds().GetUpcomingRealtime(remainingTime, attempted) if bestRound == nil { continue } @@ -65,13 +82,12 @@ func (m *Manager) SendCMIX(msg format.Message, recipient *id.ID, param params.CM ephID, _, _, err := ephemeral.GetId(recipient, uint(bestRound.AddressSpaceSize), int64(bestRound.Timestamps[states.REALTIME])) - msg.SetEphemeralRID(ephID[:]) //set the identity fingerprint ifp, err := fingerprint.IdentityFP(msg.GetContents(), recipient) - if err!=nil{ - jww.FATAL.Panicf("failed to generate the Identity " + + if err != nil { + jww.FATAL.Panicf("failed to generate the Identity "+ "fingerprint due to unrecoverable error: %+v", err) } @@ -87,9 +103,9 @@ func (m *Manager) SendCMIX(msg format.Message, recipient *id.ID, param params.CM jww.TRACE.Printf("SendCMIX GetRoundKeys") //get they keys for the round, reject if any nodes do not have //keying relationships - roundKeys, missingKeys := m.Session.Cmix().GetRoundKeys(topology) + roundKeys, missingKeys := session.Cmix().GetRoundKeys(topology) if len(missingKeys) > 0 { - go handleMissingNodeKeys(m.Instance, m.nodeRegistration, missingKeys) + go handleMissingNodeKeys(instance, nodeRegistration, missingKeys) time.Sleep(param.RetryDelay) continue } @@ -98,7 +114,7 @@ func (m *Manager) SendCMIX(msg format.Message, recipient *id.ID, param params.CM firstGateway := topology.GetNodeAtIndex(0).DeepCopy() firstGateway.SetType(id.Gateway) - transmitGateway, ok := m.Comms.GetHost(firstGateway) + transmitGateway, ok := comms.GetHost(firstGateway) if !ok { jww.ERROR.Printf("Failed to get host for gateway %s", transmitGateway) time.Sleep(param.RetryDelay) @@ -107,7 +123,7 @@ func (m *Manager) SendCMIX(msg format.Message, recipient *id.ID, param params.CM //encrypt the message salt := make([]byte, 32) - stream := m.Rng.GetStream() + stream := rng.GetStream() _, err = stream.Read(salt) stream.Close() @@ -119,8 +135,8 @@ func (m *Manager) SendCMIX(msg format.Message, recipient *id.ID, param params.CM encMsg, kmacs := roundKeys.Encrypt(msg, salt) //build the message payload - msgPacket := &mixmessages.Slot{ - SenderID: m.Uid.Bytes(), + msgPacket := &pb.Slot{ + SenderID: senderId.Bytes(), PayloadA: encMsg.GetPayloadA(), PayloadB: encMsg.GetPayloadB(), Salt: salt, @@ -128,19 +144,21 @@ func (m *Manager) SendCMIX(msg format.Message, recipient *id.ID, param params.CM } //create the wrapper to the gateway - msg := &mixmessages.GatewaySlot{ + wrappedMsg := &pb.GatewaySlot{ Message: msgPacket, RoundID: bestRound.ID, } //Add the mac proving ownership - msg.MAC = roundKeys.MakeClientGatewayKey(salt, - network.GenerateSlotDigest(msg)) + wrappedMsg.MAC = roundKeys.MakeClientGatewayKey(salt, + network.GenerateSlotDigest(wrappedMsg)) //add the round on to the list of attempted so it is not tried again attempted.Insert(bestRound) + jww.DEBUG.Printf("SendCMIX SendPutMessage") //Send the payload - gwSlotResp, err := m.Comms.SendPutMessage(transmitGateway, msg) + fmt.Println("Sending") + gwSlotResp, err := comms.SendPutMessage(transmitGateway, wrappedMsg) //if the comm errors or the message fails to send, continue retrying. //return if it sends properly if err != nil { @@ -156,7 +174,6 @@ func (m *Manager) SendCMIX(msg format.Message, recipient *id.ID, param params.CM return id.Round(bestRound.ID), ephID, nil } } - return 0, ephemeral.Id{}, errors.New("failed to send the message") } diff --git a/network/message/sendCmix_test.go b/network/message/sendCmix_test.go new file mode 100644 index 0000000000000000000000000000000000000000..a623c77fa03842a62b3dddfb20a18b236bcceb22 --- /dev/null +++ b/network/message/sendCmix_test.go @@ -0,0 +1,150 @@ +package message + +import ( + "gitlab.com/elixxir/client/interfaces/message" + "gitlab.com/elixxir/client/interfaces/params" + "gitlab.com/elixxir/client/network/internal" + "gitlab.com/elixxir/client/storage" + "gitlab.com/elixxir/client/switchboard" + "gitlab.com/elixxir/comms/client" + "gitlab.com/elixxir/comms/mixmessages" + "gitlab.com/elixxir/comms/network" + "gitlab.com/elixxir/crypto/cyclic" + "gitlab.com/elixxir/crypto/e2e" + "gitlab.com/elixxir/crypto/fastRNG" + "gitlab.com/elixxir/primitives/format" + "gitlab.com/elixxir/primitives/states" + "gitlab.com/xx_network/comms/connect" + "gitlab.com/xx_network/crypto/csprng" + "gitlab.com/xx_network/crypto/large" + "gitlab.com/xx_network/primitives/id" + "gitlab.com/xx_network/primitives/ndf" + "testing" + "time" +) + +type MockSendCMIXComms struct { + t *testing.T +} + +func (mc *MockSendCMIXComms) GetHost(hostId *id.ID) (*connect.Host, bool) { + nid1 := id.NewIdFromString("zezima", id.Node, mc.t) + gwid := nid1.DeepCopy() + gwid.SetType(id.Gateway) + h, _ := connect.NewHost(gwid, "0.0.0.0", []byte(""), connect.HostParams{ + MaxRetries: 0, + AuthEnabled: false, + }) + return h, true +} +func (mc *MockSendCMIXComms) SendPutMessage(host *connect.Host, message *mixmessages.GatewaySlot) (*mixmessages.GatewaySlotResponse, error) { + return &mixmessages.GatewaySlotResponse{ + Accepted: true, + RoundID: 3, + }, nil +} + +func Test_attemptSendCmix(t *testing.T) { + sess1 := storage.InitTestingSession(t) + + sess2 := storage.InitTestingSession(t) + + sw := switchboard.New() + l := TestListener{ + ch: make(chan bool), + } + sw.RegisterListener(sess2.GetUser().TransmissionID, message.Raw, l) + comms, err := client.NewClientComms(sess1.GetUser().TransmissionID, nil, nil, nil) + if err != nil { + t.Errorf("Failed to start client comms: %+v", err) + } + inst, err := network.NewInstanceTesting(comms.ProtoComms, getNDF(), nil, nil, nil, t) + if err != nil { + t.Errorf("Failed to start instance: %+v", err) + } + now := uint64(time.Now().UnixNano()) + nid1 := id.NewIdFromString("zezima", id.Node, t) + nid2 := id.NewIdFromString("jakexx360", id.Node, t) + nid3 := id.NewIdFromString("westparkhome", id.Node, t) + grp := cyclic.NewGroup(large.NewInt(7), large.NewInt(13)) + sess1.Cmix().Add(nid1, grp.NewInt(1)) + sess1.Cmix().Add(nid2, grp.NewInt(2)) + sess1.Cmix().Add(nid3, grp.NewInt(3)) + inst.GetWaitingRounds().Insert(&mixmessages.RoundInfo{ + ID: 3, + UpdateID: 0, + State: uint32(states.QUEUED), + BatchSize: 0, + Topology: [][]byte{nid1.Marshal(), nid2.Marshal(), nid3.Marshal()}, + Timestamps: []uint64{now - 30, now - 15, now, now + 15, 0}, + Errors: nil, + ClientErrors: nil, + ResourceQueueTimeoutMillis: 0, + Signature: nil, + AddressSpaceSize: 0, + }) + i := internal.Internal{ + Session: sess1, + Switchboard: sw, + Rng: fastRNG.NewStreamGenerator(1, 1, csprng.NewSystemRNG), + Comms: comms, + Health: nil, + Uid: sess1.GetUser().TransmissionID, + Instance: inst, + NodeRegistration: nil, + } + m := NewManager(i, params.Messages{ + MessageReceptionBuffLen: 20, + MessageReceptionWorkerPoolSize: 20, + MaxChecksGarbledMessage: 20, + GarbledMessageWait: time.Hour, + }, nil) + msgCmix := format.NewMessage(m.Session.Cmix().GetGroup().GetP().ByteLen()) + msgCmix.SetContents([]byte("test")) + e2e.SetUnencrypted(msgCmix, m.Session.User().GetCryptographicIdentity().GetTransmissionID()) + _, _, err = sendCmixHelper(msgCmix, sess2.GetUser().ReceptionID, params.GetDefaultCMIX(), + m.Instance, m.Session, m.nodeRegistration, m.Rng, + m.Uid, &MockSendCMIXComms{t: t}) + if err != nil { + t.Errorf("Failed to sendcmix: %+v", err) + } +} + +func getNDF() *ndf.NetworkDefinition { + return &ndf.NetworkDefinition{ + E2E: ndf.Group{ + Prime: "E2EE983D031DC1DB6F1A7A67DF0E9A8E5561DB8E8D49413394C049B" + + "7A8ACCEDC298708F121951D9CF920EC5D146727AA4AE535B0922C688B55B3DD2AE" + + "DF6C01C94764DAB937935AA83BE36E67760713AB44A6337C20E7861575E745D31F" + + "8B9E9AD8412118C62A3E2E29DF46B0864D0C951C394A5CBBDC6ADC718DD2A3E041" + + "023DBB5AB23EBB4742DE9C1687B5B34FA48C3521632C4A530E8FFB1BC51DADDF45" + + "3B0B2717C2BC6669ED76B4BDD5C9FF558E88F26E5785302BEDBCA23EAC5ACE9209" + + "6EE8A60642FB61E8F3D24990B8CB12EE448EEF78E184C7242DD161C7738F32BF29" + + "A841698978825B4111B4BC3E1E198455095958333D776D8B2BEEED3A1A1A221A6E" + + "37E664A64B83981C46FFDDC1A45E3D5211AAF8BFBC072768C4F50D7D7803D2D4F2" + + "78DE8014A47323631D7E064DE81C0C6BFA43EF0E6998860F1390B5D3FEACAF1696" + + "015CB79C3F9C2D93D961120CD0E5F12CBB687EAB045241F96789C38E89D796138E" + + "6319BE62E35D87B1048CA28BE389B575E994DCA755471584A09EC723742DC35873" + + "847AEF49F66E43873", + Generator: "2", + }, + CMIX: ndf.Group{ + Prime: "9DB6FB5951B66BB6FE1E140F1D2CE5502374161FD6538DF1648218642F0B5C48" + + "C8F7A41AADFA187324B87674FA1822B00F1ECF8136943D7C55757264E5A1A44F" + + "FE012E9936E00C1D3E9310B01C7D179805D3058B2A9F4BB6F9716BFE6117C6B5" + + "B3CC4D9BE341104AD4A80AD6C94E005F4B993E14F091EB51743BF33050C38DE2" + + "35567E1B34C3D6A5C0CEAA1A0F368213C3D19843D0B4B09DCB9FC72D39C8DE41" + + "F1BF14D4BB4563CA28371621CAD3324B6A2D392145BEBFAC748805236F5CA2FE" + + "92B871CD8F9C36D3292B5509CA8CAA77A2ADFC7BFD77DDA6F71125A7456FEA15" + + "3E433256A2261C6A06ED3693797E7995FAD5AABBCFBE3EDA2741E375404AE25B", + Generator: "5C7FF6B06F8F143FE8288433493E4769C4D988ACE5BE25A0E24809670716C613" + + "D7B0CEE6932F8FAA7C44D2CB24523DA53FBE4F6EC3595892D1AA58C4328A06C4" + + "6A15662E7EAA703A1DECF8BBB2D05DBE2EB956C142A338661D10461C0D135472" + + "085057F3494309FFA73C611F78B32ADBB5740C361C9F35BE90997DB2014E2EF5" + + "AA61782F52ABEB8BD6432C4DD097BC5423B285DAFB60DC364E8161F4A2A35ACA" + + "3A10B1C4D203CC76A470A33AFDCBDD92959859ABD8B56E1725252D78EAC66E71" + + "BA9AE3F1DD2487199874393CD4D832186800654760E1E34C09E4D155179F9EC0" + + "DC4473F996BDCE6EED1CABED8B6F116F7AD9CF505DF0F998E34AB27514B0FFE7", + }, + } +}