diff --git a/bindings/delivery.go b/bindings/delivery.go index 89521d887175367d6fd40966363b8e12a7fbd89a..92ccdd759425226f206bac0f7f280fefdbd70f60 100644 --- a/bindings/delivery.go +++ b/bindings/delivery.go @@ -115,7 +115,7 @@ func (c *Cmix) WaitForRoundResult( timeout := time.Duration(timeoutMS) * time.Millisecond - err = c.api.GetCmix().GetRoundResults(timeout, f, rl...) + c.api.GetCmix().GetRoundResults(timeout, f, rl...) - return err + return nil } diff --git a/channels/adminListener.go b/channels/adminListener.go index 7d5b7b8baf2d8da7b8b4f800a82f970d2dd73a2c..0b26859d97f51c953ed543fad52cf14bd39d9483 100644 --- a/channels/adminListener.go +++ b/channels/adminListener.go @@ -20,8 +20,9 @@ import ( // adminListener adheres to the [broadcast.ListenerFunc] interface and is used // when admin messages are received on the channel. type adminListener struct { - chID *id.ID - trigger triggerAdminEventFunc + chID *id.ID + trigger triggerAdminEventFunc + checkSent messageReceiveFunc } // Listen is called when a message is received for the admin listener @@ -47,6 +48,11 @@ func (al *adminListener) Listen(payload []byte, return } + //check if we sent the message, ignore triggering if we sent + if al.checkSent(msgID) { + return + } + /* CRYPTOGRAPHICALLY RELEVANT CHECKS */ // Check the round to ensure that the message is not a replay @@ -58,7 +64,7 @@ func (al *adminListener) Listen(payload []byte, } // Submit the message to the event model for listening - al.trigger(al.chID, cm, msgID, receptionID, round) + al.trigger(al.chID, cm, msgID, receptionID, round, Delivered) return } diff --git a/channels/adminListener_test.go b/channels/adminListener_test.go index a6b78926d0c62bd43acb990ac395d7bcfe48a998..c3414b673711bc5d4329aa458f35d7e7072c4dbe 100644 --- a/channels/adminListener_test.go +++ b/channels/adminListener_test.go @@ -34,7 +34,8 @@ type triggerAdminEventDummy struct { func (taed *triggerAdminEventDummy) triggerAdminEvent(chID *id.ID, cm *ChannelMessage, messageID cryptoChannel.MessageID, - receptionID receptionID.EphemeralIdentity, round rounds.Round) { + receptionID receptionID.EphemeralIdentity, round rounds.Round, + status SentStatus) { taed.gotData = true taed.chID = chID @@ -78,8 +79,9 @@ func TestAdminListener_Listen(t *testing.T) { dummy := &triggerAdminEventDummy{} al := adminListener{ - chID: chID, - trigger: dummy.triggerAdminEvent, + chID: chID, + trigger: dummy.triggerAdminEvent, + checkSent: func(messageID cryptoChannel.MessageID) bool { return false }, } // Call the listener @@ -144,8 +146,9 @@ func TestAdminListener_Listen_BadRound(t *testing.T) { dummy := &triggerAdminEventDummy{} al := adminListener{ - chID: chID, - trigger: dummy.triggerAdminEvent, + chID: chID, + trigger: dummy.triggerAdminEvent, + checkSent: func(messageID cryptoChannel.MessageID) bool { return false }, } // Call the listener @@ -181,8 +184,9 @@ func TestAdminListener_Listen_BadChannelMessage(t *testing.T) { dummy := &triggerAdminEventDummy{} al := adminListener{ - chID: chID, - trigger: dummy.triggerAdminEvent, + chID: chID, + trigger: dummy.triggerAdminEvent, + checkSent: func(messageID cryptoChannel.MessageID) bool { return false }, } // Call the listener @@ -233,8 +237,9 @@ func TestAdminListener_Listen_BadSizedBroadcast(t *testing.T) { dummy := &triggerAdminEventDummy{} al := adminListener{ - chID: chID, - trigger: dummy.triggerAdminEvent, + chID: chID, + trigger: dummy.triggerAdminEvent, + checkSent: func(messageID cryptoChannel.MessageID) bool { return false }, } // Call the listener diff --git a/channels/eventModel.go b/channels/eventModel.go index 1a244956b3a88237f1b11b188fc6a0ae40408232..ba68e417cd823d1b22b5580dcdd159bb1909456f 100644 --- a/channels/eventModel.go +++ b/channels/eventModel.go @@ -86,7 +86,11 @@ type EventModel interface { type MessageTypeReceiveMessage func(channelID *id.ID, messageID cryptoChannel.MessageID, messageType MessageType, senderUsername string, content []byte, timestamp time.Time, - lease time.Duration, round rounds.Round) + lease time.Duration, round rounds.Round, status SentStatus) + +// updateStatusFunc is a function type for EventModel.UpdateSentStatus so it +// can be mocked for testing where used +type updateStatusFunc func(messageID cryptoChannel.MessageID, status SentStatus) // events is an internal structure which processes events and stores the // handlers for those events @@ -134,13 +138,14 @@ func (e *events) RegisterReceiveHandler(messageType MessageType, } type triggerEventFunc func(chID *id.ID, umi *userMessageInternal, - receptionID receptionID.EphemeralIdentity, round rounds.Round) + receptionID receptionID.EphemeralIdentity, round rounds.Round, status SentStatus) // triggerEvent is an internal function which is used to trigger message // reception on a message received from a user (symmetric encryption) // It will call the appropriate MessageTypeHandler assuming one exists. func (e *events) triggerEvent(chID *id.ID, umi *userMessageInternal, - receptionID receptionID.EphemeralIdentity, round rounds.Round) { + receptionID receptionID.EphemeralIdentity, round rounds.Round, + status SentStatus) { um := umi.GetUserMessage() cm := umi.GetChannelMessage() messageType := MessageType(cm.PayloadType) @@ -162,19 +167,21 @@ func (e *events) triggerEvent(chID *id.ID, umi *userMessageInternal, //Call the listener. This is already in an instanced event, no new thread needed. listener(chID, umi.GetMessageID(), messageType, um.Username, - cm.Payload, ts, time.Duration(cm.Lease), round) + cm.Payload, ts, time.Duration(cm.Lease), round, status) return } type triggerAdminEventFunc func(chID *id.ID, cm *ChannelMessage, messageID cryptoChannel.MessageID, receptionID receptionID.EphemeralIdentity, - round rounds.Round) + round rounds.Round, status SentStatus) // triggerAdminEvent is an internal function which is used to trigger message // reception on a message received from the admin (asymmetric encryption) // It will call the appropriate MessageTypeHandler assuming one exists. func (e *events) triggerAdminEvent(chID *id.ID, cm *ChannelMessage, - messageID cryptoChannel.MessageID, receptionID receptionID.EphemeralIdentity, round rounds.Round) { + messageID cryptoChannel.MessageID, + receptionID receptionID.EphemeralIdentity, round rounds.Round, + status SentStatus) { messageType := MessageType(cm.PayloadType) //check if the type is already registered @@ -194,7 +201,7 @@ func (e *events) triggerAdminEvent(chID *id.ID, cm *ChannelMessage, //Call the listener. This is already in an instanced event, no new thread needed. listener(chID, messageID, messageType, AdminUsername, - cm.Payload, ts, time.Duration(cm.Lease), round) + cm.Payload, ts, time.Duration(cm.Lease), round, status) return } @@ -206,7 +213,7 @@ func (e *events) triggerAdminEvent(chID *id.ID, cm *ChannelMessage, func (e *events) receiveTextMessage(channelID *id.ID, messageID cryptoChannel.MessageID, messageType MessageType, senderUsername string, content []byte, timestamp time.Time, - lease time.Duration, round rounds.Round) { + lease time.Duration, round rounds.Round, status SentStatus) { txt := &CMIXChannelText{} if err := proto.Unmarshal(content, txt); err != nil { @@ -223,7 +230,7 @@ func (e *events) receiveTextMessage(channelID *id.ID, var replyTo cryptoChannel.MessageID copy(replyTo[:], txt.ReplyMessageID) e.model.ReceiveReply(channelID, messageID, replyTo, - senderUsername, txt.Text, timestamp, lease, round, Delivered) + senderUsername, txt.Text, timestamp, lease, round, status) return } else { @@ -240,7 +247,7 @@ func (e *events) receiveTextMessage(channelID *id.ID, fmt.Println(channelID) e.model.ReceiveMessage(channelID, messageID, senderUsername, txt.Text, - timestamp, lease, round, Delivered) + timestamp, lease, round, status) } // receiveReaction is the internal function which handles the reception of @@ -252,7 +259,7 @@ func (e *events) receiveTextMessage(channelID *id.ID, func (e *events) receiveReaction(channelID *id.ID, messageID cryptoChannel.MessageID, messageType MessageType, senderUsername string, content []byte, timestamp time.Time, - lease time.Duration, round rounds.Round) { + lease time.Duration, round rounds.Round, status SentStatus) { react := &CMIXChannelReaction{} if err := proto.Unmarshal(content, react); err != nil { jww.ERROR.Printf("Failed to text unmarshal message %s from %s on "+ @@ -276,7 +283,7 @@ func (e *events) receiveReaction(channelID *id.ID, var reactTo cryptoChannel.MessageID copy(reactTo[:], react.ReactionMessageID) e.model.ReceiveReaction(channelID, messageID, reactTo, senderUsername, - react.Reaction, timestamp, lease, round, Delivered) + react.Reaction, timestamp, lease, round, status) } else { jww.ERROR.Printf("Failed process reaction %s from %s on channel "+ "%s, type %s, ts: %s, lease: %s, round: %d, reacting to "+ diff --git a/channels/eventModel_test.go b/channels/eventModel_test.go index 5241c9fc6acb1796aa1af20a435c3d0b349aa279..ef1b635a9de5dc96ebe032c859219da36cc193fc 100644 --- a/channels/eventModel_test.go +++ b/channels/eventModel_test.go @@ -195,7 +195,8 @@ type dummyMessageTypeHandler struct { func (dmth *dummyMessageTypeHandler) dummyMessageTypeReceiveMessage( channelID *id.ID, messageID cryptoChannel.MessageID, messageType MessageType, senderUsername string, content []byte, - timestamp time.Time, lease time.Duration, round rounds.Round) { + timestamp time.Time, lease time.Duration, round rounds.Round, + status SentStatus) { dmth.triggered = true dmth.channelID = channelID dmth.messageID = messageID @@ -232,7 +233,7 @@ func TestEvents_triggerEvents(t *testing.T) { r.Timestamps[states.QUEUED] = time.Now() //call the trigger - e.triggerEvent(chID, umi, receptionID.EphemeralIdentity{}, r) + e.triggerEvent(chID, umi, receptionID.EphemeralIdentity{}, r, Delivered) //check that the event was triggered if !dummy.triggered { @@ -301,7 +302,7 @@ func TestEvents_triggerEvents_noChannel(t *testing.T) { r.Timestamps[states.QUEUED] = time.Now() //call the trigger - e.triggerEvent(chID, umi, receptionID.EphemeralIdentity{}, r) + e.triggerEvent(chID, umi, receptionID.EphemeralIdentity{}, r, Delivered) //check that the event was triggered if dummy.triggered { @@ -336,7 +337,8 @@ func TestEvents_triggerAdminEvents(t *testing.T) { msgID := cryptoChannel.MakeMessageID(u.userMessage.Message) //call the trigger - e.triggerAdminEvent(chID, cm, msgID, receptionID.EphemeralIdentity{}, r) + e.triggerAdminEvent(chID, cm, msgID, receptionID.EphemeralIdentity{}, r, + Delivered) //check that the event was triggered if !dummy.triggered { @@ -407,7 +409,8 @@ func TestEvents_triggerAdminEvents_noChannel(t *testing.T) { msgID := cryptoChannel.MakeMessageID(u.userMessage.Message) //call the trigger - e.triggerAdminEvent(chID, cm, msgID, receptionID.EphemeralIdentity{}, r) + e.triggerAdminEvent(chID, cm, msgID, receptionID.EphemeralIdentity{}, r, + Delivered) //check that the event was triggered if dummy.triggered { @@ -447,7 +450,7 @@ func TestEvents_receiveTextMessage_Message(t *testing.T) { //call the handler e.receiveTextMessage(chID, msgID, 0, senderUsername, - textMarshaled, ts, lease, r) + textMarshaled, ts, lease, r, Delivered) //check the results on the model if !me.eventReceive.channelID.Cmp(chID) { @@ -520,7 +523,7 @@ func TestEvents_receiveTextMessage_Reply(t *testing.T) { //call the handler e.receiveTextMessage(chID, msgID, Text, senderUsername, - textMarshaled, ts, lease, r) + textMarshaled, ts, lease, r, Delivered) //check the results on the model if !me.eventReceive.channelID.Cmp(chID) { @@ -593,7 +596,7 @@ func TestEvents_receiveTextMessage_Reply_BadReply(t *testing.T) { //call the handler e.receiveTextMessage(chID, msgID, 0, senderUsername, - textMarshaled, ts, lease, r) + textMarshaled, ts, lease, r, Delivered) //check the results on the model if !me.eventReceive.channelID.Cmp(chID) { @@ -666,7 +669,7 @@ func TestEvents_receiveReaction(t *testing.T) { //call the handler e.receiveReaction(chID, msgID, 0, senderUsername, - textMarshaled, ts, lease, r) + textMarshaled, ts, lease, r, Delivered) //check the results on the model if !me.eventReceive.channelID.Cmp(chID) { @@ -739,7 +742,7 @@ func TestEvents_receiveReaction_InvalidReactionMessageID(t *testing.T) { //call the handler e.receiveReaction(chID, msgID, 0, senderUsername, - textMarshaled, ts, lease, r) + textMarshaled, ts, lease, r, Delivered) //check the results on the model if me.eventReceive.channelID != nil { @@ -802,7 +805,7 @@ func TestEvents_receiveReaction_InvalidReactionContent(t *testing.T) { //call the handler e.receiveReaction(chID, msgID, 0, senderUsername, - textMarshaled, ts, lease, r) + textMarshaled, ts, lease, r, Delivered) //check the results on the model if me.eventReceive.channelID != nil { diff --git a/channels/interface.go b/channels/interface.go index 219857e43939df8b2a0784784115a198b36b5189..01e4066032f88a52fa9a1e3d118caf799e0beba8 100644 --- a/channels/interface.go +++ b/channels/interface.go @@ -9,6 +9,7 @@ package channels import ( "gitlab.com/elixxir/client/cmix" + "gitlab.com/elixxir/client/cmix/rounds" cryptoBroadcast "gitlab.com/elixxir/crypto/broadcast" cryptoChannel "gitlab.com/elixxir/crypto/channel" "gitlab.com/xx_network/crypto/signature/rsa" @@ -43,7 +44,7 @@ type Manager interface { // Them meaning of validUntil depends on the use case. SendGeneric(channelID *id.ID, messageType MessageType, msg []byte, validUntil time.Duration, params cmix.CMIXParams) ( - cryptoChannel.MessageID, id.Round, ephemeral.Id, error) + cryptoChannel.MessageID, rounds.Round, ephemeral.Id, error) // SendAdminGeneric is used to send a raw message over a channel encrypted // with admin keys, identifying it as sent by the admin. In general, it @@ -52,8 +53,8 @@ type Manager interface { // return an error. The message must be at most 510 bytes long. SendAdminGeneric(privKey *rsa.PrivateKey, channelID *id.ID, messageType MessageType, msg []byte, validUntil time.Duration, - params cmix.CMIXParams) (cryptoChannel.MessageID, id.Round, ephemeral.Id, - error) + params cmix.CMIXParams) (cryptoChannel.MessageID, rounds.Round, + ephemeral.Id, error) // SendMessage is used to send a formatted message over a channel. // Due to the underlying encoding using compression, it isn't @@ -63,7 +64,7 @@ type Manager interface { // lasting forever if ValidForever is used SendMessage(channelID *id.ID, msg string, validUntil time.Duration, params cmix.CMIXParams) ( - cryptoChannel.MessageID, id.Round, ephemeral.Id, error) + cryptoChannel.MessageID, rounds.Round, ephemeral.Id, error) // SendReply is used to send a formatted message over a channel. // Due to the underlying encoding using compression, it isn't @@ -75,14 +76,14 @@ type Manager interface { // lasting forever if ValidForever is used SendReply(channelID *id.ID, msg string, replyTo cryptoChannel.MessageID, validUntil time.Duration, params cmix.CMIXParams) ( - cryptoChannel.MessageID, id.Round, ephemeral.Id, error) + cryptoChannel.MessageID, rounds.Round, ephemeral.Id, error) // SendReaction is used to send a reaction to a message over a channel. // The reaction must be a single emoji with no other characters, and will // be rejected otherwise. // Clients will drop the reaction if they do not recognize the reactTo message SendReaction(channelID *id.ID, reaction string, reactTo cryptoChannel.MessageID, - params cmix.CMIXParams) (cryptoChannel.MessageID, id.Round, + params cmix.CMIXParams) (cryptoChannel.MessageID, rounds.Round, ephemeral.Id, error) // RegisterReceiveHandler is used to register handlers for non default message diff --git a/channels/joinedChannel.go b/channels/joinedChannel.go index 2137740120da242b2d9bfcd1fff360dd63fda34a..8d41a47c7e404aab79e76b5788c65c86f14b11a9 100644 --- a/channels/joinedChannel.go +++ b/channels/joinedChannel.go @@ -72,7 +72,8 @@ func (m *manager) loadChannels() { for i := range chList { jc, err := loadJoinedChannel( - chList[i], m.kv, m.net, m.rng, m.name, m.events, m.broadcastMaker) + chList[i], m.kv, m.net, m.rng, m.name, m.events, m.broadcastMaker, + m.st.MessageReceive) if err != nil { jww.FATAL.Panicf("Failed to load channel %s: %+v", chList[i], err) } @@ -110,17 +111,19 @@ func (m *manager) addChannel(channel *cryptoBroadcast.Channel) error { // Connect to listeners err = b.RegisterListener((&userListener{ - name: m.name, - chID: channel.ReceptionID, - trigger: m.events.triggerEvent, + name: m.name, + chID: channel.ReceptionID, + trigger: m.events.triggerEvent, + checkSent: m.st.MessageReceive, }).Listen, broadcast.Symmetric) if err != nil { return err } err = b.RegisterListener((&adminListener{ - chID: channel.ReceptionID, - trigger: m.events.triggerAdminEvent, + chID: channel.ReceptionID, + trigger: m.events.triggerAdminEvent, + checkSent: m.st.MessageReceive, }).Listen, broadcast.Asymmetric) if err != nil { return err @@ -209,7 +212,7 @@ func (jc *joinedChannel) Store(kv *versioned.KV) error { // loadJoinedChannel loads a given channel from ekv storage. func loadJoinedChannel(chId *id.ID, kv *versioned.KV, net broadcast.Client, rngGen *fastRNG.StreamGenerator, name NameService, e *events, - broadcastMaker broadcast.NewBroadcastChannelFunc) (*joinedChannel, error) { + broadcastMaker broadcast.NewBroadcastChannelFunc, mr messageReceiveFunc) (*joinedChannel, error) { obj, err := kv.Get(makeJoinedChannelKey(chId), joinedChannelVersion) if err != nil { return nil, err @@ -222,7 +225,7 @@ func loadJoinedChannel(chId *id.ID, kv *versioned.KV, net broadcast.Client, return nil, err } - b, err := initBroadcast(jcd.Broadcast, name, e, net, broadcastMaker, rngGen) + b, err := initBroadcast(jcd.Broadcast, name, e, net, broadcastMaker, rngGen, mr) jc := &joinedChannel{broadcast: b} return jc, nil @@ -241,24 +244,26 @@ func makeJoinedChannelKey(chId *id.ID) string { func initBroadcast(c *cryptoBroadcast.Channel, name NameService, e *events, net broadcast.Client, broadcastMaker broadcast.NewBroadcastChannelFunc, - rngGen *fastRNG.StreamGenerator) (broadcast.Channel, error) { + rngGen *fastRNG.StreamGenerator, mr messageReceiveFunc) (broadcast.Channel, error) { b, err := broadcastMaker(c, net, rngGen) if err != nil { return nil, err } err = b.RegisterListener((&userListener{ - name: name, - chID: c.ReceptionID, - trigger: e.triggerEvent, + name: name, + chID: c.ReceptionID, + trigger: e.triggerEvent, + checkSent: mr, }).Listen, broadcast.Symmetric) if err != nil { return nil, err } err = b.RegisterListener((&adminListener{ - chID: c.ReceptionID, - trigger: e.triggerAdminEvent, + chID: c.ReceptionID, + trigger: e.triggerAdminEvent, + checkSent: mr, }).Listen, broadcast.Asymmetric) if err != nil { return nil, err diff --git a/channels/joinedChannel_test.go b/channels/joinedChannel_test.go index ba20c46dcece6249505c7d724286f5e40d0f0e1e..1d4750deb0eeddb9adbcfe5dd5c6189e50e19f83 100644 --- a/channels/joinedChannel_test.go +++ b/channels/joinedChannel_test.go @@ -370,7 +370,9 @@ func Test_loadJoinedChannel(t *testing.T) { } loadedJc, err := loadJoinedChannel(ch.ReceptionID, m.kv, m.net, m.rng, - m.name, m.events, m.broadcastMaker) + m.name, m.events, m.broadcastMaker, func(messageID cryptoChannel.MessageID) bool { + return false + }) if err != nil { t.Errorf("Failed to load joinedChannel: %+v", err) } @@ -490,9 +492,10 @@ func (m *mockBroadcastClient) AddService(*id.ID, message.Service, message.Proces func (m *mockBroadcastClient) DeleteClientService(*id.ID) {} func (m *mockBroadcastClient) RemoveIdentity(*id.ID) {} func (m *mockBroadcastClient) GetRoundResults(timeout time.Duration, - roundCallback clientCmix.RoundEventCallback, roundList ...id.Round) error { - return nil + roundCallback clientCmix.RoundEventCallback, roundList ...id.Round) { } +func (m *mockBroadcastClient) AddHealthCallback(f func(bool)) uint64 { return 0 } +func (m *mockBroadcastClient) RemoveHealthCallback(uint64) {} //////////////////////////////////////////////////////////////////////////////// // Mock EventModel // diff --git a/channels/manager.go b/channels/manager.go index d0f7a117fe77e63f43efbb9dccbaf1ae56908d53..2c63aed82c7000b446b0617785775a1b1f8e3ad0 100644 --- a/channels/manager.go +++ b/channels/manager.go @@ -38,6 +38,9 @@ type manager struct { // Events model *events + //send tracker + st *sendTracker + // Makes the function that is used to create broadcasts be a pointer so that // it can be replaced in tests broadcastMaker broadcast.NewBroadcastChannelFunc @@ -56,7 +59,9 @@ type Client interface { DeleteClientService(clientID *id.ID) RemoveIdentity(id *id.ID) GetRoundResults(timeout time.Duration, roundCallback cmix.RoundEventCallback, - roundList ...id.Round) error + roundList ...id.Round) + AddHealthCallback(f func(bool)) uint64 + RemoveHealthCallback(uint64) } // NewManager creates a new channel.Manager. It prefixes the KV with the @@ -77,6 +82,9 @@ func NewManager(kv *versioned.KV, net Client, m.events = initEvents(model) + m.st = loadSendTracker(net, kv, m.events.triggerEvent, + m.events.triggerAdminEvent, model.UpdateSentStatus) + m.loadChannels() return &m @@ -145,7 +153,8 @@ func (m *manager) ReplayChannel(chID *id.ID) error { jc.broadcast.Stop() //re-instantiate the broadcast, re-registering it from scratch - b, err := initBroadcast(c, m.name, m.events, m.net, m.broadcastMaker, m.rng) + b, err := initBroadcast(c, m.name, m.events, m.net, m.broadcastMaker, m.rng, + m.st.MessageReceive) if err != nil { return err } diff --git a/channels/send.go b/channels/send.go index ecbb607fba1810c2c92d1f45a2096599fef4fe12..86c6b92e24b2c2d10c2a99c8b92328a36e7eb4d2 100644 --- a/channels/send.go +++ b/channels/send.go @@ -10,6 +10,7 @@ package channels import ( "gitlab.com/elixxir/client/broadcast" "gitlab.com/elixxir/client/cmix" + "gitlab.com/elixxir/client/cmix/rounds" cryptoChannel "gitlab.com/elixxir/crypto/channel" "gitlab.com/xx_network/crypto/signature/rsa" "gitlab.com/xx_network/primitives/id" @@ -31,15 +32,17 @@ const ( // it will always be possible to send a payload of 802 bytes at minimum func (m *manager) SendGeneric(channelID *id.ID, messageType MessageType, msg []byte, validUntil time.Duration, params cmix.CMIXParams) ( - cryptoChannel.MessageID, id.Round, ephemeral.Id, error) { + cryptoChannel.MessageID, rounds.Round, ephemeral.Id, error) { //find the channel ch, err := m.getChannel(channelID) if err != nil { - return cryptoChannel.MessageID{}, 0, ephemeral.Id{}, err + return cryptoChannel.MessageID{}, rounds.Round{}, ephemeral.Id{}, err } var msgId cryptoChannel.MessageID + var usrMsg *UserMessage + var chMsg *ChannelMessage //Note: we are not checking check if message is too long before trying to //find a round @@ -47,7 +50,7 @@ func (m *manager) SendGeneric(channelID *id.ID, messageType MessageType, assemble := func(rid id.Round) ([]byte, error) { //Build the message - chMsg := &ChannelMessage{ + chMsg = &ChannelMessage{ Lease: validUntil.Nanoseconds(), RoundID: uint64(rid), PayloadType: uint32(messageType), @@ -72,7 +75,7 @@ func (m *manager) SendGeneric(channelID *id.ID, messageType MessageType, //Build the user message validationSig, unameLease := m.name.GetChannelValidationSignature() - usrMsg := &UserMessage{ + usrMsg = &UserMessage{ Message: chMsgSerial, ValidationSignature: validationSig, Signature: messageSig, @@ -97,11 +100,16 @@ func (m *manager) SendGeneric(channelID *id.ID, messageType MessageType, return usrMsgSerialSized, nil } - // TODO: send the send message over to reception manually so it is added to - // the database early This requires an entire project in order to track - // round state. - rid, ephid, err := ch.broadcast.BroadcastWithAssembler(assemble, params) - return msgId, rid.ID, ephid, err + r, ephid, err := ch.broadcast.BroadcastWithAssembler(assemble, params) + if err != nil { + return cryptoChannel.MessageID{}, rounds.Round{}, ephemeral.Id{}, err + } + m.st.send(channelID, &userMessageInternal{ + userMessage: usrMsg, + channelMessage: chMsg, + messageID: msgId, + }, r) + return msgId, r, ephid, err } // SendAdminGeneric is used to send a raw message over a channel encrypted @@ -111,21 +119,23 @@ func (m *manager) SendGeneric(channelID *id.ID, messageType MessageType, // return an error. The message must be at most 510 bytes long. func (m *manager) SendAdminGeneric(privKey *rsa.PrivateKey, channelID *id.ID, messageType MessageType, msg []byte, validUntil time.Duration, - params cmix.CMIXParams) (cryptoChannel.MessageID, id.Round, ephemeral.Id, + params cmix.CMIXParams) (cryptoChannel.MessageID, rounds.Round, ephemeral.Id, error) { //find the channel ch, err := m.getChannel(channelID) if err != nil { - return cryptoChannel.MessageID{}, 0, ephemeral.Id{}, err + return cryptoChannel.MessageID{}, rounds.Round{}, ephemeral.Id{}, err } //verify the private key is correct if ch.broadcast.Get().RsaPubKey.N.Cmp(privKey.GetPublic().N) != 0 { - return cryptoChannel.MessageID{}, 0, ephemeral.Id{}, WrongPrivateKey + return cryptoChannel.MessageID{}, rounds.Round{}, ephemeral.Id{}, + WrongPrivateKey } var msgId cryptoChannel.MessageID + var chMsg *ChannelMessage //Note: we are not checking check if message is too long before trying to //find a round @@ -133,7 +143,7 @@ func (m *manager) SendAdminGeneric(privKey *rsa.PrivateKey, channelID *id.ID, assemble := func(rid id.Round) ([]byte, error) { //Build the message - chMsg := &ChannelMessage{ + chMsg = &ChannelMessage{ Lease: validUntil.Nanoseconds(), RoundID: uint64(rid), PayloadType: uint32(messageType), @@ -163,12 +173,11 @@ func (m *manager) SendAdminGeneric(privKey *rsa.PrivateKey, channelID *id.ID, return chMsgSerialSized, nil } - // TODO: send the send message over to reception manually so it is added to - // the database early. This requires an entire project in order to track - // round state. - rid, ephid, err := ch.broadcast.BroadcastAsymmetricWithAssembler(privKey, + r, ephid, err := ch.broadcast.BroadcastAsymmetricWithAssembler(privKey, assemble, params) - return msgId, rid.ID, ephid, err + + m.st.sendAdmin(channelID, chMsg, msgId, r) + return msgId, r, ephid, err } // SendMessage is used to send a formatted message over a channel. @@ -177,7 +186,7 @@ func (m *manager) SendAdminGeneric(privKey *rsa.PrivateKey, channelID *id.ID, // it will always be possible to send a payload of 798 bytes at minimum func (m *manager) SendMessage(channelID *id.ID, msg string, validUntil time.Duration, params cmix.CMIXParams) ( - cryptoChannel.MessageID, id.Round, ephemeral.Id, error) { + cryptoChannel.MessageID, rounds.Round, ephemeral.Id, error) { txt := &CMIXChannelText{ Version: cmixChannelTextVersion, Text: msg, @@ -186,7 +195,7 @@ func (m *manager) SendMessage(channelID *id.ID, msg string, txtMarshaled, err := proto.Marshal(txt) if err != nil { - return cryptoChannel.MessageID{}, 0, ephemeral.Id{}, err + return cryptoChannel.MessageID{}, rounds.Round{}, ephemeral.Id{}, err } return m.SendGeneric(channelID, Text, txtMarshaled, validUntil, params) @@ -200,8 +209,8 @@ func (m *manager) SendMessage(channelID *id.ID, msg string, // post the message as a normal message and not a reply. func (m *manager) SendReply(channelID *id.ID, msg string, replyTo cryptoChannel.MessageID, validUntil time.Duration, - params cmix.CMIXParams) (cryptoChannel.MessageID, id.Round, ephemeral.Id, - error) { + params cmix.CMIXParams) (cryptoChannel.MessageID, rounds.Round, + ephemeral.Id, error) { txt := &CMIXChannelText{ Version: cmixChannelTextVersion, Text: msg, @@ -210,7 +219,7 @@ func (m *manager) SendReply(channelID *id.ID, msg string, txtMarshaled, err := proto.Marshal(txt) if err != nil { - return cryptoChannel.MessageID{}, 0, ephemeral.Id{}, err + return cryptoChannel.MessageID{}, rounds.Round{}, ephemeral.Id{}, err } return m.SendGeneric(channelID, Text, txtMarshaled, validUntil, params) @@ -222,10 +231,10 @@ func (m *manager) SendReply(channelID *id.ID, msg string, // Clients will drop the reaction if they do not recognize the reactTo message func (m *manager) SendReaction(channelID *id.ID, reaction string, reactTo cryptoChannel.MessageID, params cmix.CMIXParams) ( - cryptoChannel.MessageID, id.Round, ephemeral.Id, error) { + cryptoChannel.MessageID, rounds.Round, ephemeral.Id, error) { if err := ValidateReaction(reaction); err != nil { - return cryptoChannel.MessageID{}, 0, ephemeral.Id{}, err + return cryptoChannel.MessageID{}, rounds.Round{}, ephemeral.Id{}, err } react := &CMIXChannelReaction{ @@ -236,7 +245,7 @@ func (m *manager) SendReaction(channelID *id.ID, reaction string, reactMarshaled, err := proto.Marshal(react) if err != nil { - return cryptoChannel.MessageID{}, 0, ephemeral.Id{}, err + return cryptoChannel.MessageID{}, rounds.Round{}, ephemeral.Id{}, err } return m.SendGeneric(channelID, Reaction, reactMarshaled, ValidForever, diff --git a/channels/sendTracker.go b/channels/sendTracker.go index f4878b8b9273ea6434c6c32e677679304acc0a09..30a36545ef451751065f36daf123721a4b060fc6 100644 --- a/channels/sendTracker.go +++ b/channels/sendTracker.go @@ -1,80 +1,242 @@ +//////////////////////////////////////////////////////////////////////////////// +// Copyright © 2022 xx foundation // +// // +// Use of this source code is governed by a license that can be found in the // +// LICENSE file // +//////////////////////////////////////////////////////////////////////////////// + package channels import ( + "encoding/json" + jww "github.com/spf13/jwalterweatherman" "gitlab.com/elixxir/client/cmix" + "gitlab.com/elixxir/client/cmix/identity/receptionID" "gitlab.com/elixxir/client/cmix/rounds" + "gitlab.com/elixxir/client/storage/versioned" cryptoChannel "gitlab.com/elixxir/crypto/channel" "gitlab.com/xx_network/primitives/id" "sync" "time" ) -type trackedRound struct { - msgID cryptoChannel.MessageID - channelID *id.ID -} +const ( + sendTrackerStorageKey = "sendTrackerStorageKey" + sendTrackerStorageVersion = 0 + getRoundResultsTimeout = 60 * time.Second + // number of times it will attempt to get round status before the round + // is assumed to have failed. Tracking per round does not persist across + // runs + maxChecks = 3 +) -type trackedMessage struct { - roundID id.Round - channelID *id.ID +type tracked struct { + MsgID cryptoChannel.MessageID + ChannelID *id.ID + RoundID id.Round } +// the sendTracker tracks outbound messages and denotes when they are delivered +// to the event model. It also captures incoming messages and in the event they +// were sent by this user diverts them as status updates on the previously sent +// messages type sendTracker struct { - byRound map[id.Round][]trackedRound - - byMessageID map[cryptoChannel.MessageID]trackedMessage + byRound map[id.Round][]*tracked - mux sync.Mutex + byMessageID map[cryptoChannel.MessageID]*tracked - em EventModel + mux sync.RWMutex - myUsername string + trigger triggerEventFunc + adminTrigger triggerAdminEventFunc + updateStatus updateStatusFunc net Client + + kv *versioned.KV +} + +// messageReceiveFunc is a function type for sendTracker.MessageReceive so it +// can be mocked for testing where used +type messageReceiveFunc func(messageID cryptoChannel.MessageID) bool + +// loadSendTracker loads a sent tracker, restoring from disk. It will register a +// function with the cmix client, delayed on when the network goes healthy, +// which will attempt to discover the status of all rounds that are outstanding. +func loadSendTracker(net Client, kv *versioned.KV, trigger triggerEventFunc, + adminTrigger triggerAdminEventFunc, + updateStatus updateStatusFunc) *sendTracker { + st := &sendTracker{ + byRound: make(map[id.Round][]*tracked), + byMessageID: make(map[cryptoChannel.MessageID]*tracked), + trigger: trigger, + adminTrigger: adminTrigger, + updateStatus: updateStatus, + net: net, + kv: kv, + } + + /*if err := st.load(); !kv.Exists(err){ + jww.FATAL.Panicf("failed to load sent tracker: %+v", err) + }*/ + st.load() + + //register to check all outstanding rounds when the network becomes healthy + var callBackID uint64 + callBackID = net.AddHealthCallback(func(f bool) { + if !f { + return + } + net.RemoveHealthCallback(callBackID) + for rid := range st.byRound { + rr := &roundResults{ + round: rid, + st: st, + } + st.net.GetRoundResults(getRoundResultsTimeout, rr.callback, rr.round) + } + }) + + return st +} + +// store writes the list of rounds that have been +func (st *sendTracker) store() error { + data, err := json.Marshal(&st.byRound) + if err != nil { + return err + } + return st.kv.Set(sendTrackerStorageKey, &versioned.Object{ + Version: sendTrackerStorageVersion, + Timestamp: time.Now(), + Data: data, + }) } -func (sr *sendTracker) Send(channelID *id.ID, - messageID cryptoChannel.MessageID, myUsername string, - text string, timestamp time.Time, lease time.Duration, - round rounds.Round) { +// load will get the stored rounds to be checked from disk and builds +// internal datastructures +func (st *sendTracker) load() error { + obj, err := st.kv.Get(sendTrackerStorageKey, sendTrackerStorageVersion) + if err != nil { + return err + } + + err = json.Unmarshal(obj.Data, &st.byRound) + if err != nil { + return err + } + + for rid := range st.byRound { + roundList := st.byRound[rid] + for j := range roundList { + st.byMessageID[roundList[j].MsgID] = roundList[j] + } + } + return nil +} + +// send tracks a generic send message +func (st *sendTracker) send(channelID *id.ID, + umi *userMessageInternal, round rounds.Round) { + st.handleSend(channelID, umi.GetMessageID(), round) + go st.trigger(channelID, umi, + receptionID.EphemeralIdentity{}, round, Sent) +} +// sendAdmin tracks a generic sendAdmin message +func (st *sendTracker) sendAdmin(channelID *id.ID, + cm *ChannelMessage, msgID cryptoChannel.MessageID, round rounds.Round) { + st.handleSend(channelID, msgID, round) + go st.adminTrigger(channelID, cm, msgID, + receptionID.EphemeralIdentity{}, round, Sent) } -func (sr *sendTracker) handleSend(channelID *id.ID, +// handleSend does the nity gritty of editing internal structures +func (st *sendTracker) handleSend(channelID *id.ID, messageID cryptoChannel.MessageID, round rounds.Round) { - sr.mux.Lock() - defer sr.mux.Unlock() + st.mux.Lock() + defer st.mux.Unlock() //skip if already added - _, existsMessage := sr.byMessageID[messageID] + _, existsMessage := st.byMessageID[messageID] if existsMessage { return } + t := &tracked{messageID, channelID, round.ID} + //add the roundID - roundsList, existsRound := sr.byRound[round.ID] - sr.byRound[round.ID] = append(roundsList, trackedRound{messageID, - channelID}) + roundsList, existsRound := st.byRound[round.ID] + st.byRound[round.ID] = append(roundsList, t) //add the round - sr.byMessageID[messageID] = trackedMessage{round.ID, - channelID} + st.byMessageID[messageID] = t if !existsRound { - callback := func(allRoundsSucceeded, timedOut bool, rounds map[id.Round]cmix.RoundResult) { + rr := &roundResults{ + round: round.ID, + st: st, + } + st.net.GetRoundResults(getRoundResultsTimeout, rr.callback, rr.round) + } + + //store the changed list to disk + err := st.store() + if err != nil { + jww.FATAL.Panicf(err.Error()) + } +} + +// MessageReceive is used when a message is received to check if the message +// was sent by this user. If it was, the correct signal is sent to the event +// model and the function returns true, notifying the caller to not process +// the message +func (st *sendTracker) MessageReceive(messageID cryptoChannel.MessageID) bool { + st.mux.RLock() + //skip if already added + _, existsMessage := st.byMessageID[messageID] + st.mux.RUnlock() + if existsMessage { + return false + } + + st.mux.Lock() + defer st.mux.Unlock() + msgData, existsMessage := st.byMessageID[messageID] + if existsMessage { + return false + } + + delete(st.byMessageID, messageID) + + roundList := st.byRound[msgData.RoundID] + if len(roundList) == 1 { + delete(st.byRound, msgData.RoundID) + } else { + newRoundList := make([]*tracked, len(roundList)-1) + for i := range roundList { + if !roundList[i].MsgID.Equals(messageID) { + newRoundList = append(newRoundList, roundList[i]) + } } - sr.net.GetRoundResults(60 * time.Second) + st.byRound[msgData.RoundID] = newRoundList } + return true } +// roundResults represents a round which results are waiting on from the cmix layer type roundResults struct { - round id.Round - st *sendTracker + round id.Round + st *sendTracker + numChecks uint } -func (rr roundResults) callback(allRoundsSucceeded, timedOut bool, rounds map[id.Round]cmix.RoundResult) { +// callback is called when results are known about a round. it will re-trigger +// the wait if it fails up to 'maxChecks' times. +func (rr *roundResults) callback(allRoundsSucceeded, timedOut bool, rounds map[id.Round]cmix.RoundResult) { + rr.st.mux.Lock() //if the message was already handled, do nothing @@ -84,50 +246,38 @@ func (rr roundResults) callback(allRoundsSucceeded, timedOut bool, rounds map[id return } - delete(rr.st.byRound, rr.round) - - for i := range registered { - delete(rr.st.byMessageID, registered[i].msgID) + status := Delivered + if !allRoundsSucceeded { + status = Failed } - rr.st.mux.Unlock() - - for i := range registered { - rr.st.eventModel. - - } + if timedOut { + if rr.numChecks >= maxChecks { + jww.WARN.Printf("Channel messages sent on %d assumed to "+ + "have failed after %d attempts to get round status", rr.round, + maxChecks) + status = Failed + } else { + rr.numChecks++ -} + rr.st.mux.Unlock() -//deleteUnsafe deletes a tracked message from the database -func (sr *sendTracker) deleteUnsafe(channelID *id.ID, - messageID cryptoChannel.MessageID, round rounds.Round) { - sr.mux.Lock() - defer sr.mux.Unlock() + //retry if timed out + go rr.st.net.GetRoundResults(getRoundResultsTimeout, rr.callback, []id.Round{rr.round}...) + return + } - //skip if already added - _, existsMessage := sr.byMessageID[messageID] - if existsMessage { - return } - //add the roundID - roundsList, existsRound := sr.byRound[round.ID] - sr.byRound[round.ID] = append(roundsList, trackedRound{messageID, - channelID}) + delete(rr.st.byRound, rr.round) - //add the round - sr.byMessageID[messageID] = trackedMessage{round.ID, - channelID} + for i := range registered { + delete(rr.st.byMessageID, registered[i].MsgID) + } - if !existsRound { - callback := func(allRoundsSucceeded, timedOut bool, rounds map[id.Round]cmix.RoundResult) { - sr.mux.Lock() - defer sr.mux.Unlock() + rr.st.mux.Unlock() - if !allRoundsSucceeded - } - sr.net.GetRoundResults(60 * time.Second) + for i := range registered { + go rr.st.updateStatus(registered[i].MsgID, status) } - } diff --git a/channels/send_test.go b/channels/send_test.go index 4657041dabb0dbbad520b1caa08011025e22d820..2e2985ccaf12f6bd6c843486bf07d48f3b65c76b 100644 --- a/channels/send_test.go +++ b/channels/send_test.go @@ -11,8 +11,11 @@ import ( "bytes" "crypto/ed25519" "github.com/golang/protobuf/proto" + "gitlab.com/elixxir/client/cmix/identity/receptionID" "gitlab.com/elixxir/client/cmix/rounds" + "gitlab.com/elixxir/client/storage/versioned" cryptoChannel "gitlab.com/elixxir/crypto/channel" + "gitlab.com/elixxir/ekv" "gitlab.com/xx_network/crypto/csprng" "testing" "time" @@ -141,6 +144,15 @@ func TestSendGeneric(t *testing.T) { m := &manager{ channels: make(map[id.ID]*joinedChannel), name: nameService, + st: loadSendTracker(&mockBroadcastClient{}, + versioned.NewKV(ekv.MakeMemstore()), func(chID *id.ID, + umi *userMessageInternal, + receptionID receptionID.EphemeralIdentity, + round rounds.Round, status SentStatus) { + }, func(chID *id.ID, cm *ChannelMessage, + messageID cryptoChannel.MessageID, receptionID receptionID.EphemeralIdentity, + round rounds.Round, status SentStatus) { + }, func(messageID cryptoChannel.MessageID, status SentStatus) {}), } channelID := new(id.ID) @@ -212,6 +224,15 @@ func TestAdminGeneric(t *testing.T) { m := &manager{ channels: make(map[id.ID]*joinedChannel), name: nameService, + st: loadSendTracker(&mockBroadcastClient{}, + versioned.NewKV(ekv.MakeMemstore()), func(chID *id.ID, + umi *userMessageInternal, + receptionID receptionID.EphemeralIdentity, + round rounds.Round, status SentStatus) { + }, func(chID *id.ID, cm *ChannelMessage, + messageID cryptoChannel.MessageID, receptionID receptionID.EphemeralIdentity, + round rounds.Round, status SentStatus) { + }, func(messageID cryptoChannel.MessageID, status SentStatus) {}), } messageType := Text @@ -282,6 +303,15 @@ func TestSendMessage(t *testing.T) { m := &manager{ channels: make(map[id.ID]*joinedChannel), name: nameService, + st: loadSendTracker(&mockBroadcastClient{}, + versioned.NewKV(ekv.MakeMemstore()), func(chID *id.ID, + umi *userMessageInternal, + receptionID receptionID.EphemeralIdentity, + round rounds.Round, status SentStatus) { + }, func(chID *id.ID, cm *ChannelMessage, + messageID cryptoChannel.MessageID, receptionID receptionID.EphemeralIdentity, + round rounds.Round, status SentStatus) { + }, func(messageID cryptoChannel.MessageID, status SentStatus) {}), } channelID := new(id.ID) @@ -361,6 +391,15 @@ func TestSendReply(t *testing.T) { m := &manager{ channels: make(map[id.ID]*joinedChannel), name: nameService, + st: loadSendTracker(&mockBroadcastClient{}, + versioned.NewKV(ekv.MakeMemstore()), func(chID *id.ID, + umi *userMessageInternal, + receptionID receptionID.EphemeralIdentity, + round rounds.Round, status SentStatus) { + }, func(chID *id.ID, cm *ChannelMessage, + messageID cryptoChannel.MessageID, receptionID receptionID.EphemeralIdentity, + round rounds.Round, status SentStatus) { + }, func(messageID cryptoChannel.MessageID, status SentStatus) {}), } channelID := new(id.ID) @@ -440,6 +479,15 @@ func TestSendReaction(t *testing.T) { m := &manager{ channels: make(map[id.ID]*joinedChannel), name: nameService, + st: loadSendTracker(&mockBroadcastClient{}, + versioned.NewKV(ekv.MakeMemstore()), func(chID *id.ID, + umi *userMessageInternal, + receptionID receptionID.EphemeralIdentity, + round rounds.Round, status SentStatus) { + }, func(chID *id.ID, cm *ChannelMessage, + messageID cryptoChannel.MessageID, receptionID receptionID.EphemeralIdentity, + round rounds.Round, status SentStatus) { + }, func(messageID cryptoChannel.MessageID, status SentStatus) {}), } channelID := new(id.ID) diff --git a/channels/userListener.go b/channels/userListener.go index 959e5b487aaa22f8fb2999a51b5b79e7d4e2ecd7..227afb742a189d84c38cb2f0cf9cdb43747ba159 100644 --- a/channels/userListener.go +++ b/channels/userListener.go @@ -21,20 +21,21 @@ import ( // the userListener adheres to the [broadcast.ListenerFunc] interface and is // used when user messages are received on the channel type userListener struct { - name NameService - chID *id.ID - trigger triggerEventFunc + name NameService + chID *id.ID + trigger triggerEventFunc + checkSent messageReceiveFunc } // Listen is called when a message is received for the user listener -func (gul *userListener) Listen(payload []byte, +func (ul *userListener) Listen(payload []byte, receptionID receptionID.EphemeralIdentity, round rounds.Round) { //Remove the padding payloadUnpadded, err := broadcast.DecodeSizedBroadcast(payload) if err != nil { jww.WARN.Printf("Failed to strip the padding on User Message "+ - "on channel %s", gul.chID) + "on channel %s", ul.chID) return } @@ -42,7 +43,7 @@ func (gul *userListener) Listen(payload []byte, umi, err := unmarshalUserMessageInternal(payloadUnpadded) if err != nil { jww.WARN.Printf("Failed to unmarshal User Message on "+ - "channel %s", gul.chID) + "channel %s", ul.chID) return } @@ -50,13 +51,18 @@ func (gul *userListener) Listen(payload []byte, cm := umi.GetChannelMessage() msgID := umi.GetMessageID() + //check if we sent the message, ignore triggering if we sent + if ul.checkSent(msgID) { + return + } + /*CRYPTOGRAPHICALLY RELEVANT CHECKS*/ // check the round to ensure the message is not a replay if id.Round(cm.RoundID) != round.ID { jww.WARN.Printf("The round message %s send on %d referenced "+ "(%d) was not the same as the round the message was found on (%d)", - msgID, gul.chID, cm.RoundID, round.ID) + msgID, ul.chID, cm.RoundID, round.ID) return } @@ -65,17 +71,17 @@ func (gul *userListener) Listen(payload []byte, if !usernameLeaseEnd.After(round.Timestamps[states.QUEUED]) { jww.WARN.Printf("Message %s on channel %s purportedly from %s "+ "has an expired lease, ended %s, round %d was sent at %s", msgID, - gul.chID, um.Username, usernameLeaseEnd, round.ID, + ul.chID, um.Username, usernameLeaseEnd, round.ID, round.Timestamps[states.QUEUED]) return } // check that the signature from the nameserver is valid - if !gul.name.ValidateChannelMessage(um.Username, + if !ul.name.ValidateChannelMessage(um.Username, time.Unix(0, um.UsernameLease), um.ECCPublicKey, um.ValidationSignature) { jww.WARN.Printf("Message %s on channel %s purportedly from %s "+ "failed the check of its Name Server with signature %v", msgID, - gul.chID, um.Username, um.ValidationSignature) + ul.chID, um.Username, um.ValidationSignature) return } @@ -83,14 +89,14 @@ func (gul *userListener) Listen(payload []byte, if !ed25519.Verify(um.ECCPublicKey, um.Message, um.Signature) { jww.WARN.Printf("Message %s on channel %s purportedly from %s "+ "failed its user signature with signature %v", msgID, - gul.chID, um.Username, um.Signature) + ul.chID, um.Username, um.Signature) return } //TODO: Processing of the message relative to admin commands will be here //Submit the message to the event model for listening - gul.trigger(gul.chID, umi, receptionID, round) + ul.trigger(ul.chID, umi, receptionID, round, Delivered) return } diff --git a/channels/userListener_test.go b/channels/userListener_test.go index 78ec0120f26cc3f1db2c69c665a757ba3b908cf4..fcf69fbffd92cf60e1098fbc40a5924e8c1abeeb 100644 --- a/channels/userListener_test.go +++ b/channels/userListener_test.go @@ -35,7 +35,7 @@ type triggerEventDummy struct { } func (ted *triggerEventDummy) triggerEvent(chID *id.ID, umi *userMessageInternal, - receptionID receptionID.EphemeralIdentity, round rounds.Round) { + receptionID receptionID.EphemeralIdentity, round rounds.Round, sent SentStatus) { ted.gotData = true ted.chID = chID @@ -102,9 +102,10 @@ func TestUserListener_Listen(t *testing.T) { dummy := &triggerEventDummy{} al := userListener{ - chID: chID, - name: ns, - trigger: dummy.triggerEvent, + chID: chID, + name: ns, + trigger: dummy.triggerEvent, + checkSent: func(messageID cryptoChannel.MessageID) bool { return false }, } //call the listener @@ -195,9 +196,10 @@ func TestUserListener_Listen_BadUserSig(t *testing.T) { dummy := &triggerEventDummy{} al := userListener{ - chID: chID, - name: ns, - trigger: dummy.triggerEvent, + chID: chID, + name: ns, + trigger: dummy.triggerEvent, + checkSent: func(messageID cryptoChannel.MessageID) bool { return false }, } //call the listener @@ -266,9 +268,10 @@ func TestUserListener_Listen_BadValidSig(t *testing.T) { dummy := &triggerEventDummy{} al := userListener{ - chID: chID, - name: ns, - trigger: dummy.triggerEvent, + chID: chID, + name: ns, + trigger: dummy.triggerEvent, + checkSent: func(messageID cryptoChannel.MessageID) bool { return false }, } //call the listener @@ -336,9 +339,10 @@ func TestUserListener_Listen_BadUnameTs(t *testing.T) { dummy := &triggerEventDummy{} al := userListener{ - chID: chID, - name: ns, - trigger: dummy.triggerEvent, + chID: chID, + name: ns, + trigger: dummy.triggerEvent, + checkSent: func(messageID cryptoChannel.MessageID) bool { return false }, } //call the listener @@ -407,9 +411,10 @@ func TestUserListener_Listen_BadRound(t *testing.T) { dummy := &triggerEventDummy{} al := userListener{ - chID: chID, - name: ns, - trigger: dummy.triggerEvent, + chID: chID, + name: ns, + trigger: dummy.triggerEvent, + checkSent: func(messageID cryptoChannel.MessageID) bool { return false }, } //call the listener @@ -445,9 +450,10 @@ func TestUserListener_Listen_BadMessage(t *testing.T) { dummy := &triggerEventDummy{} al := userListener{ - chID: chID, - name: ns, - trigger: dummy.triggerEvent, + chID: chID, + name: ns, + trigger: dummy.triggerEvent, + checkSent: func(messageID cryptoChannel.MessageID) bool { return false }, } //call the listener @@ -518,9 +524,10 @@ func TestUserListener_Listen_BadSizedBroadcast(t *testing.T) { dummy := &triggerEventDummy{} al := userListener{ - chID: chID, - name: ns, - trigger: dummy.triggerEvent, + chID: chID, + name: ns, + trigger: dummy.triggerEvent, + checkSent: func(messageID cryptoChannel.MessageID) bool { return false }, } //call the listener diff --git a/cmd/root.go b/cmd/root.go index 66240fdb9b20a332daaca3d7a77ff24a9186a6e6..af4fa2ac28c5c716b82b7b7fa42e3f0127370040 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -636,14 +636,8 @@ func acceptChannelVerified(user *xxdk.E2e, recipientID *id.ID, rid := acceptChannel(user, recipientID) // Monitor rounds for results - err := user.GetCmix().GetRoundResults(roundTimeout, + user.GetCmix().GetRoundResults(roundTimeout, makeVerifySendsCallback(retryChan, done), rid) - if err != nil { - jww.DEBUG.Printf("Could not verify "+ - "confirmation message for relationship with %s were sent "+ - "successfully, resending messages...", recipientID) - continue - } select { case <-retryChan: @@ -677,14 +671,9 @@ func requestChannelVerified(user *xxdk.E2e, } // Monitor rounds for results - err = user.GetCmix().GetRoundResults(roundTimeout, + user.GetCmix().GetRoundResults(roundTimeout, makeVerifySendsCallback(retryChan, done), rid) - if err != nil { - jww.DEBUG.Printf("Could not verify auth request was sent " + - "successfully, resending...") - continue - } select { case <-retryChan: @@ -716,14 +705,9 @@ func resetChannelVerified(user *xxdk.E2e, recipientContact contact.Contact, } // Monitor rounds for results - err = user.GetCmix().GetRoundResults(roundTimeout, + user.GetCmix().GetRoundResults(roundTimeout, makeVerifySendsCallback(retryChan, done), rid) - if err != nil { - jww.DEBUG.Printf("Could not verify auth request was sent " + - "successfully, resending...") - continue - } select { case <-retryChan: diff --git a/cmd/utils.go b/cmd/utils.go index 3752dcc7e9e548181b3982db440c39c1a5d3abb5..ad6de14de40c1b265dca23bb9b996c0e8f4b5e9d 100644 --- a/cmd/utils.go +++ b/cmd/utils.go @@ -53,13 +53,8 @@ func verifySendSuccess(user *xxdk.E2e, paramsE2E e2e.Params, } // Monitor rounds for results - err := user.GetCmix().GetRoundResults( + user.GetCmix().GetRoundResults( paramsE2E.CMIXParams.Timeout, f, roundIDs...) - if err != nil { - jww.DEBUG.Printf("Could not verify messages were sent " + - "successfully, resending messages...") - return false - } select { case <-retryChan: diff --git a/cmix/interface.go b/cmix/interface.go index 53309dc127d101595545a31df92b1df6ba49fc56..f6572fb1dbdf6811052cb5d541e05286288c20cb 100644 --- a/cmix/interface.go +++ b/cmix/interface.go @@ -254,7 +254,7 @@ type Client interface { // GetRoundResults adjudicates on the rounds requested. Checks if they are // older rounds or in progress rounds. GetRoundResults(timeout time.Duration, roundCallback RoundEventCallback, - roundList ...id.Round) error + roundList ...id.Round) // LookupHistoricalRound looks up the passed historical round on the network. // GetRoundResults does this lookup when needed, generally that is diff --git a/cmix/results.go b/cmix/results.go index 7c510525ea6bb0ba6bd073892327dc2c8f421702..916bdb206610fc2c98ed0509acba705d6c9c1a33 100644 --- a/cmix/results.go +++ b/cmix/results.go @@ -65,19 +65,19 @@ type RoundEventCallback func(allRoundsSucceeded, timedOut bool, rounds map[id.Ro // GetRoundResults adjudicates on the rounds requested. Checks if they are // older rounds or in progress rounds. func (c *client) GetRoundResults(timeout time.Duration, - roundCallback RoundEventCallback, roundList ...id.Round) error { + roundCallback RoundEventCallback, roundList ...id.Round) { jww.INFO.Printf("GetRoundResults(%v, %s)", roundList, timeout) sendResults := make(chan ds.EventReturn, len(roundList)) - return c.getRoundResults(roundList, timeout, roundCallback, + c.getRoundResults(roundList, timeout, roundCallback, sendResults) } // Helper function which does all the logic for GetRoundResults func (c *client) getRoundResults(roundList []id.Round, timeout time.Duration, - roundCallback RoundEventCallback, sendResults chan ds.EventReturn) error { + roundCallback RoundEventCallback, sendResults chan ds.EventReturn) { networkInstance := c.GetInstance() @@ -221,6 +221,4 @@ func (c *client) getRoundResults(roundList []id.Round, timeout time.Duration, } }() - - return nil } diff --git a/connect/authenticated.go b/connect/authenticated.go index 45f8dac4a97c204d9c25e87d07bbe86558933328..c5493deb0da13d227017e6695789f9317da2c830 100644 --- a/connect/authenticated.go +++ b/connect/authenticated.go @@ -138,12 +138,8 @@ func connectWithAuthentication(conn Connection, timeStart time.Time, // Track the result of the round(s) we sent the // identity authentication message on - err = net.GetRoundResults(remainingTime, + net.GetRoundResults(remainingTime, roundCb, sendReport.RoundList...) - if err != nil { - return nil, errors.Errorf("could not track rounds for successful " + - "identity confirmation message delivery") - } // Block waiting for confirmation of the round(s) success (or timeout jww.DEBUG.Printf("AuthenticatedConnection waiting for authenticated "+ "connection with %s to be established...", recipient.ID.String()) diff --git a/connect/utils_test.go b/connect/utils_test.go index 53d19ee6e1c21cb62671b8886ce3ec731f88adb9..86f190808b95d44e80cff854d87be643339f2f0c 100644 --- a/connect/utils_test.go +++ b/connect/utils_test.go @@ -195,9 +195,8 @@ func (m *mockCmix) HasNode(*id.ID) bool func (m *mockCmix) NumRegisteredNodes() int { return 24 } func (m *mockCmix) TriggerNodeRegistration(*id.ID) {} -func (m *mockCmix) GetRoundResults(_ time.Duration, roundCallback cmix.RoundEventCallback, _ ...id.Round) error { +func (m *mockCmix) GetRoundResults(_ time.Duration, roundCallback cmix.RoundEventCallback, _ ...id.Round) { roundCallback(true, false, nil) - return nil } func (m *mockCmix) LookupHistoricalRound(id.Round, rounds.RoundResultCallback) error { return nil } diff --git a/dummy/mockCmix_test.go b/dummy/mockCmix_test.go index 05e3ed20272c987ef8f5826ab8bb91e8243b147c..96bab83384e0d57d84d50c5cac95ce71bc9fb7df 100644 --- a/dummy/mockCmix_test.go +++ b/dummy/mockCmix_test.go @@ -177,7 +177,7 @@ func (m mockCmix) TriggerNodeRegistration(nid *id.ID) { panic("implement me") } -func (m mockCmix) GetRoundResults(timeout time.Duration, roundCallback cmix.RoundEventCallback, roundList ...id.Round) error { +func (m mockCmix) GetRoundResults(timeout time.Duration, roundCallback cmix.RoundEventCallback, roundList ...id.Round) { //TODO implement me panic("implement me") } diff --git a/e2e/fpGenerator_test.go b/e2e/fpGenerator_test.go index fb039e83bb36935bff6ca5393ddb963591f815e4..249dc5bba79fc07fb38439a64b4266a658215ee0 100644 --- a/e2e/fpGenerator_test.go +++ b/e2e/fpGenerator_test.go @@ -170,8 +170,7 @@ func (m *mockFpgCmix) RemoveHealthCallback(uint64) func (m *mockFpgCmix) HasNode(*id.ID) bool { return false } func (m *mockFpgCmix) NumRegisteredNodes() int { return 0 } func (m *mockFpgCmix) TriggerNodeRegistration(*id.ID) {} -func (m *mockFpgCmix) GetRoundResults(time.Duration, cmix.RoundEventCallback, ...id.Round) error { - return nil +func (m *mockFpgCmix) GetRoundResults(time.Duration, cmix.RoundEventCallback, ...id.Round) { } func (m *mockFpgCmix) LookupHistoricalRound(id.Round, rounds.RoundResultCallback) error { return nil } func (m *mockFpgCmix) SendToAny(func(host *connect.Host) (interface{}, error), *stoppable.Single) (interface{}, error) { diff --git a/e2e/rekey/utils_test.go b/e2e/rekey/utils_test.go index fda69f2f515a2f415c8f3bd3c633d119aa7b3e28..0b9fd388668ff77f9cbfed8bc7dc6dc6572d3eeb 100644 --- a/e2e/rekey/utils_test.go +++ b/e2e/rekey/utils_test.go @@ -302,8 +302,7 @@ func (m *mockNetManager) NumRegisteredNodes() int { func (m *mockNetManager) TriggerNodeRegistration(nid *id.ID) {} func (m *mockNetManager) GetRoundResults(timeout time.Duration, roundCallback cmix.RoundEventCallback, - roundList ...id.Round) error { - return nil + roundList ...id.Round) { } func (m *mockNetManager) LookupHistoricalRound( diff --git a/e2e/utils_test.go b/e2e/utils_test.go index 526ca4f7dc5fd34abd07405c4fbb185a10dd0d46..6d8835b8c51edaf47ca2821521e9e55de95f5db3 100644 --- a/e2e/utils_test.go +++ b/e2e/utils_test.go @@ -250,8 +250,7 @@ func (m *mockCmix) RemoveHealthCallback(uint64) {} func (m *mockCmix) HasNode(*id.ID) bool { return true } func (m *mockCmix) NumRegisteredNodes() int { return 0 } func (m *mockCmix) TriggerNodeRegistration(*id.ID) {} -func (m *mockCmix) GetRoundResults(time.Duration, cmix.RoundEventCallback, ...id.Round) error { - return nil +func (m *mockCmix) GetRoundResults(time.Duration, cmix.RoundEventCallback, ...id.Round) { } func (m *mockCmix) LookupHistoricalRound(id.Round, rounds.RoundResultCallback) error { return nil } func (m *mockCmix) SendToAny(func(host *connect.Host) (interface{}, error), *stoppable.Single) (interface{}, error) { diff --git a/fileTransfer/connect/utils_test.go b/fileTransfer/connect/utils_test.go index 524aa47a733c0b4b2d9f3dabe0f992e5673e84b0..83d75d020beccafe57c72331b0595a05253c0470 100644 --- a/fileTransfer/connect/utils_test.go +++ b/fileTransfer/connect/utils_test.go @@ -189,9 +189,8 @@ func (m *mockCmix) NumRegisteredNodes() int { panic("implement me") } func (m *mockCmix) TriggerNodeRegistration(*id.ID) { panic("implement me") } func (m *mockCmix) GetRoundResults(_ time.Duration, - roundCallback cmix.RoundEventCallback, _ ...id.Round) error { + roundCallback cmix.RoundEventCallback, _ ...id.Round) { go roundCallback(true, false, map[id.Round]cmix.RoundResult{42: {}}) - return nil } func (m *mockCmix) LookupHistoricalRound(id.Round, rounds.RoundResultCallback) error { diff --git a/fileTransfer/e2e/utils_test.go b/fileTransfer/e2e/utils_test.go index e759a51ecce01cefc2fd59cc0b96fb8923c6ac68..a9e17b912810dbf24bf640d518bf2c434e95181b 100644 --- a/fileTransfer/e2e/utils_test.go +++ b/fileTransfer/e2e/utils_test.go @@ -191,9 +191,8 @@ func (m *mockCmix) NumRegisteredNodes() int { panic("implement me") } func (m *mockCmix) TriggerNodeRegistration(*id.ID) { panic("implement me") } func (m *mockCmix) GetRoundResults(_ time.Duration, - roundCallback cmix.RoundEventCallback, _ ...id.Round) error { + roundCallback cmix.RoundEventCallback, _ ...id.Round) { go roundCallback(true, false, map[id.Round]cmix.RoundResult{42: {}}) - return nil } func (m *mockCmix) LookupHistoricalRound(id.Round, rounds.RoundResultCallback) error { diff --git a/fileTransfer/groupChat/utils_test.go b/fileTransfer/groupChat/utils_test.go index 2455982c49fcff3eeadcb5080de3cb52017fb981..76b2ba1285a05381f719089e03bca00248182218 100644 --- a/fileTransfer/groupChat/utils_test.go +++ b/fileTransfer/groupChat/utils_test.go @@ -186,9 +186,8 @@ func (m *mockCmix) NumRegisteredNodes() int { panic("implement me") } func (m *mockCmix) TriggerNodeRegistration(*id.ID) { panic("implement me") } func (m *mockCmix) GetRoundResults(_ time.Duration, - roundCallback cmix.RoundEventCallback, _ ...id.Round) error { + roundCallback cmix.RoundEventCallback, _ ...id.Round) { go roundCallback(true, false, map[id.Round]cmix.RoundResult{42: {}}) - return nil } func (m *mockCmix) LookupHistoricalRound(id.Round, rounds.RoundResultCallback) error { diff --git a/fileTransfer/manager.go b/fileTransfer/manager.go index 5c75c7a6c7f3985f105a4880ec0c988cd39ee746..904af8664f69bf8b357dd4cf0a498f044ef964b8 100644 --- a/fileTransfer/manager.go +++ b/fileTransfer/manager.go @@ -156,7 +156,7 @@ type Cmix interface { AddHealthCallback(f func(bool)) uint64 RemoveHealthCallback(uint64) GetRoundResults(timeout time.Duration, - roundCallback cmix.RoundEventCallback, roundList ...id.Round) error + roundCallback cmix.RoundEventCallback, roundList ...id.Round) } // Storage interface matches a subset of the storage.Session methods used by the diff --git a/fileTransfer/send.go b/fileTransfer/send.go index 25f92296bba0c90b885d4ac763cdc51084b1ff3e..2ddf9e1a051229e732309fe8ed7d10dc91047924 100644 --- a/fileTransfer/send.go +++ b/fileTransfer/send.go @@ -134,7 +134,7 @@ func (m *manager) sendCmix(packet []store.Part) { } } - err = m.cmix.GetRoundResults( + m.cmix.GetRoundResults( roundResultsTimeout, m.roundResultsCallback(validParts), rid.ID) } diff --git a/fileTransfer/utils_test.go b/fileTransfer/utils_test.go index 01efb4fc0bb2be98bd2884daad9ecaca87923c53..96c400a9dfdaa63cb84242b083e41325c25606d1 100644 --- a/fileTransfer/utils_test.go +++ b/fileTransfer/utils_test.go @@ -242,9 +242,8 @@ func (m *mockCmix) NumRegisteredNodes() int { panic("implement me") } func (m *mockCmix) TriggerNodeRegistration(*id.ID) { panic("implement me") } func (m *mockCmix) GetRoundResults(_ time.Duration, - roundCallback cmix.RoundEventCallback, rids ...id.Round) error { + roundCallback cmix.RoundEventCallback, rids ...id.Round) { go roundCallback(true, false, map[id.Round]cmix.RoundResult{rids[0]: {}}) - return nil } func (m *mockCmix) LookupHistoricalRound(id.Round, rounds.RoundResultCallback) error { diff --git a/groupChat/networkManager_test.go b/groupChat/networkManager_test.go index b798551866d20e50119b4b10f377a2d7e78be471..e87b3f655bca1e2d5ccbf3d6547a25c1b1ac471d 100644 --- a/groupChat/networkManager_test.go +++ b/groupChat/networkManager_test.go @@ -170,7 +170,7 @@ func (tnm *testNetworkManager) TriggerNodeRegistration(nid *id.ID) { panic("implement me") } -func (tnm *testNetworkManager) GetRoundResults(timeout time.Duration, roundCallback cmix.RoundEventCallback, roundList ...id.Round) error { +func (tnm *testNetworkManager) GetRoundResults(timeout time.Duration, roundCallback cmix.RoundEventCallback, roundList ...id.Round) { //TODO implement me panic("implement me") } diff --git a/ud/networkManager_test.go b/ud/networkManager_test.go index aaf13c44775be47acb65d80bc2b4dea7b4aaf289..bd3630ebf9016e37535dbba6ea332c5676da4a1a 100644 --- a/ud/networkManager_test.go +++ b/ud/networkManager_test.go @@ -231,7 +231,7 @@ func (tnm *testNetworkManager) TriggerNodeRegistration(nid *id.ID) { panic("implement me") } -func (tnm *testNetworkManager) GetRoundResults(timeout time.Duration, roundCallback cmix.RoundEventCallback, roundList ...id.Round) error { +func (tnm *testNetworkManager) GetRoundResults(timeout time.Duration, roundCallback cmix.RoundEventCallback, roundList ...id.Round) { //TODO implement me panic("implement me") } diff --git a/xxdk/utilsInterfaces_test.go b/xxdk/utilsInterfaces_test.go index e42ecebc506d78f185e1e039ed2440dded76c5d4..4fbf3852387c7c095934c2fbf65f388cfa046e2f 100644 --- a/xxdk/utilsInterfaces_test.go +++ b/xxdk/utilsInterfaces_test.go @@ -183,8 +183,7 @@ func (t *testNetworkManagerGeneric) GetIdentity(get *id.ID) ( return identity.TrackedID{}, nil } func (t *testNetworkManagerGeneric) GetRoundResults(timeout time.Duration, - roundCallback cmix.RoundEventCallback, roundList ...id.Round) error { - return nil + roundCallback cmix.RoundEventCallback, roundList ...id.Round) { } func (t *testNetworkManagerGeneric) HasNode(nid *id.ID) bool { return false } func (t *testNetworkManagerGeneric) IsHealthy() bool { return true }