diff --git a/README.md b/README.md index cea5fab8b9a8747cdba27356d70d8187ed49de7f..a5041e876b23c787616d2f260830f2aca19de0d3 100644 --- a/README.md +++ b/README.md @@ -179,7 +179,7 @@ Flags: (default 1) --sendDelay uint The delay between sending the messages in ms (default 500) - --sendid uiverboseRoundTrackingnt Use precanned user id (must be between 1 and + --sendid uint Use precanned user id (must be between 1 and 40, inclusive) --slowPolling bool Enables polling for all network updates and RSA signed rounds. Defaults to true (filtered updates with ECC signed rounds) if not set diff --git a/cmd/root.go b/cmd/root.go index 503b46710b7f56c276585405967b33262987347b..84f456cbccd2999f98ff3e46ff78ef714029084a 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -29,6 +29,7 @@ import ( "runtime/pprof" "strconv" "strings" + "sync" "time" ) @@ -187,44 +188,48 @@ var rootCmd = &cobra.Command{ } paramsE2E := params.GetDefaultE2E() paramsUnsafe := params.GetDefaultUnsafe() - + wg := &sync.WaitGroup{} sendCnt := int(viper.GetUint("sendCount")) - //sendDelay := time.Duration(viper.GetUint("sendDelay")) - for i := 0; i < sendCnt; i++ { - go func(){ - fmt.Printf("Sending to %s: %s\n", recipientID, msgBody) - var roundIDs []id.Round - var roundTimeout time.Duration - if unsafe { - roundIDs, err = client.SendUnsafe(msg, - paramsUnsafe) - roundTimeout = paramsUnsafe.Timeout - } else { - roundIDs, _, _, err = client.SendE2E(msg, - paramsE2E) - roundTimeout = paramsE2E.Timeout - } - if err != nil { - jww.FATAL.Panicf("%+v", err) - } - - // Construct the callback function which prints out the rounds' results - f := func(allRoundsSucceeded, timedOut bool, - rounds map[id.Round]api.RoundResult) { - printRoundResults(allRoundsSucceeded, timedOut, rounds, roundIDs, msg) - } - - // Have the client report back the round results - err = errors.New("derp") - for j:=0;j<5&&err!=nil;j++{ - err = client.GetRoundResults(roundIDs, roundTimeout, f) - } - - if err != nil { - jww.FATAL.Panicf("%+v", err) - } - }() - } + wg.Add(sendCnt) + go func(){ + //sendDelay := time.Duration(viper.GetUint("sendDelay")) + for i := 0; i < sendCnt; i++ { + go func(){ + defer wg.Done() + fmt.Printf("Sending to %s: %s\n", recipientID, msgBody) + var roundIDs []id.Round + var roundTimeout time.Duration + if unsafe { + roundIDs, err = client.SendUnsafe(msg, + paramsUnsafe) + roundTimeout = paramsUnsafe.Timeout + } else { + roundIDs, _, _, err = client.SendE2E(msg, + paramsE2E) + roundTimeout = paramsE2E.Timeout + } + if err != nil { + jww.FATAL.Panicf("%+v", err) + } + + // Construct the callback function which prints out the rounds' results + f := func(allRoundsSucceeded, timedOut bool, + rounds map[id.Round]api.RoundResult) { + printRoundResults(allRoundsSucceeded, timedOut, rounds, roundIDs, msg) + } + + // Have the client report back the round results + err = errors.New("derp") + for j:=0;j<5&&err!=nil;j++{ + err = client.GetRoundResults(roundIDs, roundTimeout, f) + } + + if err != nil { + jww.FATAL.Panicf("%+v", err) + } + }() + } + }() // Wait until message timeout or we receive enough then exit // TODO: Actually check for how many messages we've received @@ -233,6 +238,7 @@ var rootCmd = &cobra.Command{ waitSecs := viper.GetUint("waitTimeout") waitTimeout := time.Duration(waitSecs) done := false + for !done && expectedCnt != 0 { timeoutTimer := time.NewTimer(waitTimeout * time.Second) select { @@ -255,7 +261,7 @@ var rootCmd = &cobra.Command{ if roundsNotepad != nil { roundsNotepad.INFO.Printf("\n%s", client.GetNetworkInterface().GetVerboseRounds()) } - + wg.Wait() err = client.StopNetworkFollower() if err != nil { jww.WARN.Printf( diff --git a/keyExchange/trigger.go b/keyExchange/trigger.go index e25da821f9a3485969608bd320a7f22534622255..68878349408043aa439fe45608e10141246d9cc6 100644 --- a/keyExchange/trigger.go +++ b/keyExchange/trigger.go @@ -120,6 +120,8 @@ func handleTrigger(sess *storage.Session, net interfaces.NetworkManager, MessageType: message.KeyExchangeConfirm, } + + //send the message under the key exchange e2eParams := params.GetDefaultE2E() @@ -150,13 +152,13 @@ func handleTrigger(sess *storage.Session, net interfaces.NetworkManager, "transmit %v/%v paritions: %v round failures, %v timeouts", session, numRoundFail+numTimeOut, len(rounds), numRoundFail, numTimeOut) - sess.GetCriticalMessages().Failed(m) + sess.GetCriticalMessages().Failed(m, e2eParams) return nil } // otherwise, the transmission is a success and this should be denoted // in the session and the log - sess.GetCriticalMessages().Succeeded(m) + sess.GetCriticalMessages().Succeeded(m, e2eParams) jww.INFO.Printf("Key Negotiation transmission for %s successfully", session) diff --git a/network/message/critical.go b/network/message/critical.go index 9c5b6bff7c824a925c969459ddb683e38a1faa1d..5d4720ba595a635b4517bf94f1896936e51157a2 100644 --- a/network/message/critical.go +++ b/network/message/critical.go @@ -51,8 +51,9 @@ func (m *Manager) criticalMessages(stop *stoppable.Single) { //critical messages for msg, param, has := critMsgs.Next(); has; msg, param, has = critMsgs.Next() { go func(msg message.Send, param params.E2E) { - jww.INFO.Printf("Resending critical message to %s ", - msg.Recipient) + + jww.INFO.Printf("Resending critical message to %s with params %#v", + msg.Recipient, msg, param) //send the message rounds, _, _, err := m.SendE2E(msg, param, stop) //if the message fail to send, notify the buffer so it can be handled @@ -61,7 +62,7 @@ func (m *Manager) criticalMessages(stop *stoppable.Single) { jww.ERROR.Printf("Failed to send critical message to %s "+ " on notification of healthy network: %+v", msg.Recipient, err) - critMsgs.Failed(msg) + critMsgs.Failed(msg, param) return } //wait on the results to make sure the rounds were successful @@ -77,13 +78,13 @@ func (m *Manager) criticalMessages(stop *stoppable.Single) { "to transmit transmit %v/%v paritions on rounds %d: %v "+ "round failures, %v timeouts", msg.Recipient, numRoundFail+numTimeOut, len(rounds), rounds, numRoundFail, numTimeOut) - critMsgs.Failed(msg) + critMsgs.Failed(msg, param) return } jww.INFO.Printf("Successful resend of critical message "+ "to %s on rounds %d", msg.Recipient, rounds) - critMsgs.Succeeded(msg) + critMsgs.Succeeded(msg, param) }(msg, param) } diff --git a/network/message/sendCmix.go b/network/message/sendCmix.go index ed2c1cf86242056ae7bf2b5093c296554337b3cb..bfb57a56903cb9f4f23e46cae2c59657c0d06da1 100644 --- a/network/message/sendCmix.go +++ b/network/message/sendCmix.go @@ -103,6 +103,7 @@ func sendCmixHelper(sender *gateway.Sender, msg format.Message, jww.WARN.Printf("Failed to GetUpcomingRealtime (msgDigest: %s): %+v", msg.Digest(), err) } if bestRound == nil { + jww.WARN.Printf("Best round on send is nil") continue } diff --git a/storage/utility/e2eMessageBuffer.go b/storage/utility/e2eMessageBuffer.go index 259c6407a2c08c45b4f2e486182964031669b98c..5e4abcb3aa168332e5603899bf73e1e0facc8cdc 100644 --- a/storage/utility/e2eMessageBuffer.go +++ b/storage/utility/e2eMessageBuffer.go @@ -155,12 +155,12 @@ func (emb *E2eMessageBuffer) Next() (message.Send, params.E2E, bool) { message.Type(msg.MessageType)}, msg.Params, true } -func (emb *E2eMessageBuffer) Succeeded(m message.Send) { +func (emb *E2eMessageBuffer) Succeeded(m message.Send, p params.E2E) { emb.mb.Succeeded(e2eMessage{m.Recipient.Marshal(), - m.Payload, uint32(m.MessageType), params.E2E{}}) + m.Payload, uint32(m.MessageType), p}) } -func (emb *E2eMessageBuffer) Failed(m message.Send) { +func (emb *E2eMessageBuffer) Failed(m message.Send, p params.E2E) { emb.mb.Failed(e2eMessage{m.Recipient.Marshal(), - m.Payload, uint32(m.MessageType), params.E2E{}}) + m.Payload, uint32(m.MessageType), p}) } diff --git a/storage/utility/e2eMessageBuffer_test.go b/storage/utility/e2eMessageBuffer_test.go index 0aca79126fe53903957780f298e26ae1434597db..0fcef8fc65eb74560cce387fa250e111b79d1562 100644 --- a/storage/utility/e2eMessageBuffer_test.go +++ b/storage/utility/e2eMessageBuffer_test.go @@ -9,6 +9,7 @@ package utility import ( "encoding/json" + "fmt" "gitlab.com/elixxir/client/interfaces/message" "gitlab.com/elixxir/client/interfaces/params" "gitlab.com/elixxir/client/storage/versioned" @@ -112,7 +113,7 @@ func TestE2EMessageHandler_Smoke(t *testing.T) { if !exists { t.Error("Next() did not find any messages in buffer.") } - cmb.Succeeded(msg) + cmb.Succeeded(msg, params.E2E{}) if len(cmb.mb.messages) != 1 { t.Errorf("Unexpected length of buffer.\n\texpected: %d\n\trecieved: %d", @@ -127,7 +128,7 @@ func TestE2EMessageHandler_Smoke(t *testing.T) { t.Errorf("Unexpected length of buffer.\n\texpected: %d\n\trecieved: %d", 0, len(cmb.mb.messages)) } - cmb.Failed(msg) + cmb.Failed(msg,params.E2E{}) if len(cmb.mb.messages) != 1 { t.Errorf("Unexpected length of buffer.\n\texpected: %d\n\trecieved: %d", @@ -138,7 +139,7 @@ func TestE2EMessageHandler_Smoke(t *testing.T) { if !exists { t.Error("Next() did not find any messages in buffer.") } - cmb.Succeeded(msg) + cmb.Succeeded(msg, params.E2E{}) msg, _, exists = cmb.Next() if exists { @@ -174,3 +175,43 @@ func makeTestE2EMessages(n int, t *testing.T) ([]e2eMessage, []message.Send) { return msgs, send } + +func TestE2EParamMarshalUnmarshal(t *testing.T) { + msg := e2eMessage{ + Recipient: id.DummyUser[:], + Payload: []byte{1,2,3,4,5,6,7,8,9}, + MessageType: 42, + Params: params.E2E{ + Type: 1, + RetryCount: 7, + CMIX: params.CMIX{ + RoundTries: 6, + Timeout: 99, + RetryDelay: -4, + }, + }, + } + + b, err := json.Marshal(&msg) + + if err!=nil{ + t.Errorf("Failed to Marshal E2eMessage") + } + + fmt.Printf("%s\n", string(b)) + + msg2 := &e2eMessage{} + + err = json.Unmarshal(b,&msg2) + + if err!=nil{ + t.Errorf("Failed to Unmarshal E2eMessage") + } + + fmt.Printf("%#v\n", msg2) + + if !reflect.DeepEqual(msg,msg2){ + t.Errorf("Unmarshaled message is not the same") + } + +} \ No newline at end of file