diff --git a/network/message/sendCmix.go b/network/message/sendCmix.go index 8b72fe995a753a900d4558819ddb78187f5c7970..ed2c1cf86242056ae7bf2b5093c296554337b3cb 100644 --- a/network/message/sendCmix.go +++ b/network/message/sendCmix.go @@ -155,12 +155,9 @@ func sendCmixHelper(sender *gateway.Sender, msg format.Message, timeout) if err != nil { // fixme: should we provide as a slice the whole topology? - warn, err := handlePutMessageError(firstGateway, instance, session, nodeRegistration, recipient.String(), bestRound, err) - if warn { - jww.WARN.Printf("SendCmix Failed: %+v", err) - } else { - return result, errors.WithMessagef(err, "SendCmix %s", unrecoverableError) - } + err := handlePutMessageError(firstGateway, instance, session, nodeRegistration, recipient.String(), bestRound, err) + return result, errors.WithMessagef(err, "SendCmix %s", unrecoverableError) + } return result, err } diff --git a/network/message/sendCmixUtils.go b/network/message/sendCmixUtils.go index ee1bd20ec9ec6390baa342a0a8bbc3a04a5a3282..4e71f5731f775386d9ab3afa1d10316214d67b32 100644 --- a/network/message/sendCmixUtils.go +++ b/network/message/sendCmixUtils.go @@ -51,12 +51,12 @@ const unrecoverableError = "failed with an unrecoverable error" func handlePutMessageError(firstGateway *id.ID, instance *network.Instance, session *storage.Session, nodeRegistration chan network.NodeGateway, recipientString string, bestRound *pb.RoundInfo, - err error) (recoverable bool, returnErr error) { + err error) (returnErr error) { // If the comm errors or the message fails to send, then continue retrying; // otherwise, return if it sends properly if strings.Contains(err.Error(), "try a different round.") { - return false, errors.WithMessagef(err, "Failed to send to [%s] due to "+ + return errors.WithMessagef(err, "Failed to send to [%s] due to "+ "round error with round %d, bailing...", recipientString, bestRound.ID) } else if strings.Contains(err.Error(), "Could not authenticate client. "+ @@ -72,12 +72,12 @@ func handlePutMessageError(firstGateway *id.ID, instance *network.Instance, // Trigger go handleMissingNodeKeys(instance, nodeRegistration, []*id.ID{nodeID}) - return false, errors.WithMessagef(err, "Failed to send to [%s] via %s "+ + return errors.WithMessagef(err, "Failed to send to [%s] via %s "+ "due to failed authentication, retrying...", recipientString, firstGateway) } - return false, errors.WithMessage(err, "Failed to put cmix message") + return errors.WithMessage(err, "Failed to put cmix message") } diff --git a/network/message/sendE2E.go b/network/message/sendE2E.go index e26dcb6de4ee21a8a72bcebf2dbf7ea26220453d..5f42729f651de44afa707d10b52d2d5c83caf428 100644 --- a/network/message/sendE2E.go +++ b/network/message/sendE2E.go @@ -90,8 +90,8 @@ func (m *Manager) SendE2E(msg message.Send, param params.E2E, //end to end encrypt the cmix message msgEnc := key.Encrypt(msgCmix) - jww.INFO.Printf("E2E sending %d/%d to %s with msgDigest: %s", - i+i, len(partitions), msg.Recipient, msgEnc.Digest()) + jww.INFO.Printf("E2E sending %d/%d to %s with msgDigest: %s, key fp: %s", + i+i, len(partitions), msg.Recipient, msgEnc.Digest(), key.Fingerprint()) //send the cmix message, each partition in its own thread wg.Add(1) diff --git a/network/message/sendManyCmix.go b/network/message/sendManyCmix.go index ade627067eb49a5cfdaa1387eff8c4fe37b81436..04719d10de62efb7a4045135b1b898704fa1d77b 100644 --- a/network/message/sendManyCmix.go +++ b/network/message/sendManyCmix.go @@ -145,14 +145,11 @@ func sendManyCmixHelper(sender *gateway.Sender, msgs map[id.ID]format.Message, result, err := comms.SendPutManyMessages(host, wrappedMessage, timeout) if err != nil { - warn, err := handlePutMessageError(firstGateway, instance, + err := handlePutMessageError(firstGateway, instance, session, nodeRegistration, recipientString, bestRound, err) - if warn { - jww.WARN.Printf("SendManyCMIX Failed: %+v", err) - } else { - return result, errors.WithMessagef(err, + return result, errors.WithMessagef(err, "SendManyCMIX %s", unrecoverableError) - } + } return result, err } diff --git a/network/roundTracking.go b/network/roundTracking.go index 08ca3b74902bd489ac1194ee51cd3eb26e5e6cc4..602d8c55dbb72cdc5cddf1034e6290bf00512e75 100644 --- a/network/roundTracking.go +++ b/network/roundTracking.go @@ -62,7 +62,7 @@ func (rt *RoundTracker) denote(rid id.Round, state RoundState) { defer rt.mux.Unlock() // this ensures a lower state will not overwrite a higher state. // eg. Unchecked does not overwrite MessageAvailable - if storedState, exists := rt.state[rid]; exists && storedState < state { + if storedState, exists := rt.state[rid]; exists && storedState > state { jww.TRACE.Printf("did not denote round %d because " + "stored state of %s (%d) > passed state %s (%d)", rid, storedState, storedState, state, state) diff --git a/network/rounds/unchecked.go b/network/rounds/unchecked.go index e62bff0c6885d71080b4544cf6602ec684fa2a9a..bd91584fad326ae04b8c6574951681fd93266b34 100644 --- a/network/rounds/unchecked.go +++ b/network/rounds/unchecked.go @@ -11,6 +11,8 @@ import ( jww "github.com/spf13/jwalterweatherman" "gitlab.com/elixxir/client/stoppable" "gitlab.com/elixxir/client/storage/reception" + "gitlab.com/elixxir/client/storage/rounds" + "gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/netTime" "time" ) @@ -47,9 +49,7 @@ func (m *Manager) processUncheckedRounds(checkInterval time.Duration, backoffTab return case <-ticker.C: - // Pull and iterate through uncheckedRound list - roundList := m.Session.UncheckedRounds().GetList() - for rid, rnd := range roundList { + iterator := func(rid id.Round, rnd rounds.UncheckedRound){ // If this round is due for a round check, send the round over // to the retrieval thread. If not due, check next round. if isRoundCheckDue(rnd.NumChecks, rnd.LastCheck, backoffTable) { @@ -80,8 +80,9 @@ func (m *Manager) processUncheckedRounds(checkInterval time.Duration, backoffTab } } - } + // Pull and iterate through uncheckedRound list + m.Session.UncheckedRounds().IterateOverList(iterator) } } } diff --git a/storage/e2e/stateVector.go b/storage/e2e/stateVector.go index 10306ae562bde79a5d6382d167f83b823ccbc8df..c01b1a2ecab7cbb03f7455eee7e758bdf1210684 100644 --- a/storage/e2e/stateVector.go +++ b/storage/e2e/stateVector.go @@ -212,17 +212,16 @@ func (sv *stateVector) Delete() error { // execute a store and a store must be executed after. func (sv *stateVector) nextAvailable() { - block := (sv.firstAvailable + 1) / 64 - pos := (sv.firstAvailable + 1) % 64 + //plus one so we start at the next one + pos := sv.firstAvailable+1 + block := pos/64 - for ; block < uint32(len(sv.vect)) && (sv.vect[block]>>pos)&1 == 1; pos++ { - if pos == 63 { - pos = 0 - block++ - } + for block < uint32(len(sv.vect)) && (sv.vect[block]>>(pos%64))&1 == 1 { + pos++ + block = pos/64 } - sv.firstAvailable = block*64 + pos + sv.firstAvailable = pos } //ekv functions diff --git a/storage/e2e/stateVector_test.go b/storage/e2e/stateVector_test.go index 99fd5cf606088b3d55ddda3f3e6045a04effe9af..271c982d01652c7ca556db06dd3983da9e9c268d 100644 --- a/storage/e2e/stateVector_test.go +++ b/storage/e2e/stateVector_test.go @@ -11,6 +11,7 @@ import ( "fmt" "gitlab.com/elixxir/client/storage/versioned" "gitlab.com/elixxir/ekv" + "math" "math/bits" "reflect" "testing" @@ -110,6 +111,32 @@ func TestStateVector_Next(t *testing.T) { } } + +// Shows that Next mutates vector state as expected +// Shows that Next can find key indexes all throughout the bitfield +// A bug was found when the next avalible was in the first index of a word, this tests that case +func TestStateVector_Next_EdgeCase(t *testing.T) { + // Expected results: all keynums, and beyond the last key + //expectedFirstAvail := []uint32{139, 145, 300, 360, 420, 761, 868, 875, 893, 995} + + const numKeys = 1000 + sv, err := newStateVector(versioned.NewKV(make(ekv.Memstore)), "key", numKeys) + if err != nil { + t.Fatal(err) + } + + // Set a few clean bits randomly + sv.vect[0] = math.MaxUint64 + + sv.firstAvailable = 0 + sv.nextAvailable() + + // firstAvailable should now be beyond the end of the bitfield + if sv.firstAvailable !=64 { + t.Errorf("Next avalivle skiped the first of the next word, should be 64, is %d", sv.firstAvailable) + } +} + // Shows that Use() mutates the state vector itself func TestStateVector_Use(t *testing.T) { // These keyNums will be set to dirty with Use diff --git a/storage/rounds/uncheckedRounds.go b/storage/rounds/uncheckedRounds.go index 898222b71556ef86146d996e23c9804bff356f85..a8a92cc7859855735d8da7380c5ca9b5ee71ac76 100644 --- a/storage/rounds/uncheckedRounds.go +++ b/storage/rounds/uncheckedRounds.go @@ -200,10 +200,12 @@ func (s *UncheckedRoundStore) GetRound(rid id.Round) (UncheckedRound, bool) { } // Retrieves the list of rounds -func (s *UncheckedRoundStore) GetList() map[id.Round]UncheckedRound { +func (s *UncheckedRoundStore) IterateOverList(iterator func(rid id.Round, rnd UncheckedRound)) { s.mux.RLock() defer s.mux.RUnlock() - return s.list + for rid, rnd := range s.list { + go iterator(rid, rnd) + } } // Increments the amount of checks performed on this stored round