From 20e1c77f57935371091c58765788ba45973aefd2 Mon Sep 17 00:00:00 2001 From: Jono Wenger <jono@elixxir.io> Date: Mon, 28 Mar 2022 16:02:06 -0700 Subject: [PATCH] Fix tests for network/rounds --- network/rounds/get.go | 47 ++++---- network/rounds/manager.go | 13 ++- network/rounds/params.go | 6 +- network/rounds/retrieve.go | 152 +++++++++++++++----------- network/rounds/retrieve_test.go | 6 +- network/rounds/store/roundIdentity.go | 2 +- network/rounds/store/store.go | 10 +- network/rounds/unchecked.go | 48 ++++---- network/rounds/unchecked_test.go | 42 +++---- network/rounds/utils_test.go | 143 +++++++++++++++--------- storage/session.go | 29 ++++- 11 files changed, 294 insertions(+), 204 deletions(-) diff --git a/network/rounds/get.go b/network/rounds/get.go index 07bef0fdc..7fd430d19 100644 --- a/network/rounds/get.go +++ b/network/rounds/get.go @@ -9,55 +9,60 @@ package rounds import ( jww "github.com/spf13/jwalterweatherman" - "gitlab.com/elixxir/client/interfaces" + "gitlab.com/elixxir/client/network/identity/receptionID" pb "gitlab.com/elixxir/comms/mixmessages" "gitlab.com/xx_network/primitives/id" ) -func (m *manager) GetMessagesFromRound(roundID id.Round, identity interfaces.EphemeralIdentity) { - //get the round from the in ram store +func (m *manager) GetMessagesFromRound(roundID id.Round, identity receptionID.EphemeralIdentity) { + // Get the round from the in-RAM store ri, err := m.instance.GetRound(roundID) - // If we didn't find it, send to Historical Rounds Retrieval + // If we did not find it, then send to Historical Rounds Retrieval if err != nil || m.params.ForceHistoricalRounds { - // store the round as an unretreived round without a round info + // Store the round as an un-retrieved round without a round info // This will silently do nothing if the round is err = m.unchecked.AddRound(roundID, nil, identity.Source, identity.EphId) if err != nil { - jww.FATAL.Panicf("Failed to denote Unchecked Round for round %d", roundID) + jww.FATAL.Panicf( + "Failed to denote Unchecked Round for round %d", roundID) } if m.params.ForceHistoricalRounds { - jww.WARN.Printf("Forcing use of historical rounds for round ID %d.", - roundID) + jww.WARN.Printf( + "Forcing use of historical rounds for round ID %d.", roundID) } jww.INFO.Printf("Messages found in round %d for %d (%s), looking "+ "up messages via historical lookup", roundID, identity.EphId.Int64(), identity.Source) - err = m.historical.LookupHistoricalRound(roundID, func(info *pb.RoundInfo, success bool) { - if !success { - - } - // If found, send to Message Retrieval Workers - m.lookupRoundMessages <- roundLookup{ - RoundInfo: info, - Identity: identity, - } - }) - } else { // if we did find it, send it to the round pickup thread + err = m.historical.LookupHistoricalRound( + roundID, func(info *pb.RoundInfo, success bool) { + if !success { + // TODO: implement me + } + // If found, send to Message Retrieval Workers + m.lookupRoundMessages <- roundLookup{ + RoundInfo: info, + Identity: identity, + } + }) + } else { + // If we did find it, send it to the round pickup thread jww.INFO.Printf("Messages found in round %d for %d (%s), looking "+ "up messages via in ram lookup", roundID, identity.EphId.Int64(), identity.Source) - //store the round as an unretreived round + + // store the round as an un-retrieved round if !m.params.RealtimeOnly { err = m.unchecked.AddRound(roundID, ri, identity.Source, identity.EphId) if err != nil { - jww.FATAL.Panicf("Failed to denote Unchecked Round for round %d", roundID) + jww.FATAL.Panicf( + "Failed to denote Unchecked Round for round %d", roundID) } } diff --git a/network/rounds/manager.go b/network/rounds/manager.go index f9f42d676..de4c952bb 100644 --- a/network/rounds/manager.go +++ b/network/rounds/manager.go @@ -45,8 +45,9 @@ type manager struct { } func NewPickup(params Params, bundles chan<- message.Bundle, - sender gateway.Sender, historical historical.Retriever, rng *fastRNG.StreamGenerator, - instance RoundGetter, session storage.Session) Pickup { + sender gateway.Sender, historical historical.Retriever, + rng *fastRNG.StreamGenerator, instance RoundGetter, + session storage.Session) Pickup { unchecked := store.NewOrLoadUncheckedStore(session.GetKV()) m := &manager{ params: params, @@ -67,9 +68,10 @@ func (m *manager) StartProcessors() stoppable.Stoppable { multi := stoppable.NewMulti("Rounds") - //start the message retrieval worker pool + // Start the message retrieval worker pool for i := uint(0); i < m.params.NumMessageRetrievalWorkers; i++ { - stopper := stoppable.NewSingle("Message Retriever " + strconv.Itoa(int(i))) + stopper := stoppable.NewSingle( + "Message Retriever " + strconv.Itoa(int(i))) go m.processMessageRetrieval(m.comms, stopper) multi.Add(stopper) } @@ -77,7 +79,8 @@ func (m *manager) StartProcessors() stoppable.Stoppable { // Start the periodic unchecked round worker if !m.params.RealtimeOnly { stopper := stoppable.NewSingle("UncheckRound") - go m.processUncheckedRounds(m.params.UncheckRoundPeriod, backOffTable, stopper) + go m.processUncheckedRounds( + m.params.UncheckRoundPeriod, backOffTable, stopper) multi.Add(stopper) } diff --git a/network/rounds/params.go b/network/rounds/params.go index dbd9e3e6a..ab7f956c9 100644 --- a/network/rounds/params.go +++ b/network/rounds/params.go @@ -12,8 +12,8 @@ type Params struct { // Maximum number of times a historical round lookup will be attempted MaxHistoricalRoundsRetries uint - // Interval between checking for rounds in UncheckedRoundStore - // due for a message retrieval retry + // Interval between checking for rounds in UncheckedRoundStore due for a + // message retrieval retry UncheckRoundPeriod time.Duration // Toggles if message pickup retrying mechanism if forced @@ -24,7 +24,7 @@ type Params struct { // tried SendTimeout time.Duration - //disables all attempts to pick up dropped or missed messages + // Disables all attempts to pick up dropped or missed messages RealtimeOnly bool // Toggles if historical rounds should always be used diff --git a/network/rounds/retrieve.go b/network/rounds/retrieve.go index 149306323..5891480cc 100644 --- a/network/rounds/retrieve.go +++ b/network/rounds/retrieve.go @@ -25,8 +25,8 @@ import ( type MessageRetrievalComms interface { GetHost(hostId *id.ID) (*connect.Host, bool) - RequestMessages(host *connect.Host, - message *pb.GetMessages) (*pb.GetMessagesResponse, error) + RequestMessages(host *connect.Host, message *pb.GetMessages) ( + *pb.GetMessagesResponse, error) } type roundLookup struct { @@ -37,7 +37,7 @@ type roundLookup struct { const noRoundError = "does not have round %d" // processMessageRetrieval received a roundLookup request and pings the gateways -// of that round for messages for the requested Identity in the roundLookup +// of that round for messages for the requested Identity in the roundLookup. func (m *manager) processMessageRetrieval(comms MessageRetrievalComms, stop *stoppable.Single) { @@ -49,11 +49,14 @@ func (m *manager) processMessageRetrieval(comms MessageRetrievalComms, case rl := <-m.lookupRoundMessages: ri := rl.RoundInfo jww.DEBUG.Printf("Checking for messages in round %d", ri.ID) + if !m.params.RealtimeOnly { err := m.unchecked.AddRound(id.Round(ri.ID), ri, rl.Identity.Source, rl.Identity.EphId) if err != nil { - jww.FATAL.Panicf("Failed to denote Unchecked Round for round %d", id.Round(ri.ID)) + jww.FATAL.Panicf( + "Failed to denote Unchecked Round for round %d", + id.Round(ri.ID)) } } @@ -62,37 +65,41 @@ func (m *manager) processMessageRetrieval(comms MessageRetrievalComms, for i, idBytes := range ri.Topology { gwId, err := id.Unmarshal(idBytes) if err != nil { - jww.FATAL.Panicf("processMessageRetrieval: Unable to unmarshal: %+v", err) + jww.FATAL.Panicf( + "processMessageRetrieval: Unable to unmarshal: %+v", err) } gwId.SetType(id.Gateway) gwIds[i] = gwId } + if len(gwIds) == 0 { jww.WARN.Printf("Empty gateway ID List") continue } - // Target the last nodes in the team first because it has - // messages first, randomize other members of the team + + // Target the last nodes in the team first because it has messages + // first, randomize other members of the team var rndBytes [32]byte stream := m.rng.GetStream() _, err := stream.Read(rndBytes[:]) stream.Close() if err != nil { jww.FATAL.Panicf("Failed to randomize shuffle in round %d "+ - "from all gateways (%v): %s", - id.Round(ri.ID), gwIds, err) + "from all gateways (%v): %s", ri.ID, gwIds, err) } + gwIds[0], gwIds[len(gwIds)-1] = gwIds[len(gwIds)-1], gwIds[0] shuffle.ShuffleSwap(rndBytes[:], len(gwIds)-1, func(i, j int) { gwIds[i+1], gwIds[j+1] = gwIds[j+1], gwIds[i+1] }) - // If ForceMessagePickupRetry, we are forcing processUncheckedRounds by - // randomly not picking up messages (FOR INTEGRATION TEST). Only done if - // round has not been ignored before + // If ForceMessagePickupRetry, we are forcing processUncheckedRounds + // by randomly not picking up messages (FOR INTEGRATION TEST). Only + // done if round has not been ignored before. var bundle message.Bundle if m.params.ForceMessagePickupRetry { - bundle, err = m.forceMessagePickupRetry(ri, rl, comms, gwIds, stop) + bundle, err = m.forceMessagePickupRetry( + ri, rl, comms, gwIds, stop) // Exit if the thread has been stopped if stoppable.CheckErr(err) { @@ -100,13 +107,13 @@ func (m *manager) processMessageRetrieval(comms MessageRetrievalComms, continue } if err != nil { - jww.ERROR.Printf("Failed to get pickup round %d "+ - "from all gateways (%v): %s", - id.Round(ri.ID), gwIds, err) + jww.ERROR.Printf("Failed to get pickup round %d from all "+ + "gateways (%v): %s", ri.ID, gwIds, err) } } else { // Attempt to request for this gateway - bundle, err = m.getMessagesFromGateway(id.Round(ri.ID), rl.Identity, comms, gwIds, stop) + bundle, err = m.getMessagesFromGateway( + id.Round(ri.ID), rl.Identity, comms, gwIds, stop) // Exit if the thread has been stopped if stoppable.CheckErr(err) { @@ -114,18 +121,20 @@ func (m *manager) processMessageRetrieval(comms MessageRetrievalComms, continue } - // After trying all gateways, if none returned we mark the round as a - // failure and print out the last error + // After trying all gateways, if none returned we mark the round + // as a failure and print out the last error if err != nil { jww.ERROR.Printf("Failed to get pickup round %d "+ - "from all gateways (%v): %s", - id.Round(ri.ID), gwIds, err) + "from all gateways (%v): %s", ri.ID, gwIds, err) } } + jww.DEBUG.Printf("messages: %v\n", bundle.Messages) + if len(bundle.Messages) != 0 { - // If successful and there are messages, we send them to another thread + // If successful and there are messages, we send them to another + // thread bundle.Identity = receptionID.EphemeralIdentity{ EphId: rl.Identity.EphId, Source: rl.Identity.Source, @@ -135,10 +144,11 @@ func (m *manager) processMessageRetrieval(comms MessageRetrievalComms, jww.DEBUG.Printf("Removing round %d from unchecked store", ri.ID) if !m.params.RealtimeOnly { - err = m.unchecked.Remove(id.Round(ri.ID), rl.Identity.Source, rl.Identity.EphId) + err = m.unchecked.Remove( + id.Round(ri.ID), rl.Identity.Source, rl.Identity.EphId) if err != nil { - jww.ERROR.Printf("Could not remove round %d "+ - "from unchecked rounds store: %v", ri.ID, err) + jww.ERROR.Printf("Could not remove round %d from "+ + "unchecked rounds store: %v", ri.ID, err) } } @@ -148,49 +158,55 @@ func (m *manager) processMessageRetrieval(comms MessageRetrievalComms, } } -// getMessagesFromGateway attempts to get messages from their assigned -// gateway host in the round specified. If successful +// getMessagesFromGateway attempts to get messages from their assigned gateway +// host in the round specified. If successful func (m *manager) getMessagesFromGateway(roundID id.Round, identity receptionID.EphemeralIdentity, comms MessageRetrievalComms, gwIds []*id.ID, stop *stoppable.Single) (message.Bundle, error) { start := time.Now() // Send to the gateways using backup proxies - result, err := m.sender.SendToPreferred(gwIds, func(host *connect.Host, target *id.ID, _ time.Duration) (interface{}, error) { - jww.DEBUG.Printf("Trying to get messages for round %v for ephemeralID %d (%v) "+ - "via Gateway: %s", roundID, identity.EphId.Int64(), identity.Source.String(), host.GetId()) - - // send the request - msgReq := &pb.GetMessages{ - ClientID: identity.EphId[:], - RoundID: uint64(roundID), - Target: target.Marshal(), - } + result, err := m.sender.SendToPreferred(gwIds, + func(host *connect.Host, target *id.ID, _ time.Duration) (interface{}, error) { + jww.DEBUG.Printf("Trying to get messages for round %d for "+ + "ephemeralID %d (%s) via Gateway: %s", roundID, + identity.EphId.Int64(), identity.Source, host.GetId()) + + // send the request + msgReq := &pb.GetMessages{ + ClientID: identity.EphId[:], + RoundID: uint64(roundID), + Target: target.Marshal(), + } - // If the gateway doesnt have the round, return an error - msgResp, err := comms.RequestMessages(host, msgReq) + // If the gateway doesn't have the round, return an error + msgResp, err := comms.RequestMessages(host, msgReq) - if err != nil { - // you need to default to a retryable errors because otherwise we cannot enumerate all errors - return nil, errors.WithMessage(err, gateway.RetryableError) - } + if err != nil { + // You need to default to a retryable errors because otherwise + // we cannot enumerate all errors + return nil, errors.WithMessage(err, gateway.RetryableError) + } - if !msgResp.GetHasRound() { - errRtn := errors.Errorf(noRoundError, roundID) - return message.Bundle{}, errors.WithMessage(errRtn, gateway.RetryableError) - } + if !msgResp.GetHasRound() { + errRtn := errors.Errorf(noRoundError, roundID) + return message.Bundle{}, + errors.WithMessage(errRtn, gateway.RetryableError) + } + + return msgResp, nil + }, stop, m.params.SendTimeout) - return msgResp, nil - }, stop, m.params.SendTimeout) jww.INFO.Printf("Received message for round %d, processing...", roundID) - // Fail the round if an error occurs so it can be tried again later + + // Fail the round if an error occurs so that it can be tried again later if err != nil { - return message.Bundle{}, errors.WithMessagef(err, "Failed to "+ - "request messages for round %d", roundID) + return message.Bundle{}, errors.WithMessagef( + err, "Failed to request messages for round %d", roundID) } msgResp := result.(*pb.GetMessagesResponse) - // If there are no messages print a warning. Due to the probabilistic nature - // of the bloom filters, false positives will happen sometimes + // If there are no messages, print a warning. Due to the probabilistic + // nature of the bloom filters, false positives will happen sometimes msgs := msgResp.GetMessages() if msgs == nil || len(msgs) == 0 { jww.WARN.Printf("no messages for client %s "+ @@ -207,10 +223,11 @@ func (m *manager) getMessagesFromGateway(roundID id.Round, return message.Bundle{}, nil } - jww.INFO.Printf("Received %d messages in Round %v for %d (%s) in %s", - len(msgs), roundID, identity.EphId.Int64(), identity.Source, time.Now().Sub(start)) + jww.INFO.Printf("Received %d messages in Round %d for %d (%s) in %s", + len(msgs), roundID, identity.EphId.Int64(), identity.Source, + time.Now().Sub(start)) - // build the bundle of messages to send to the message processor + // Build the bundle of messages to send to the message processor bundle := message.Bundle{ Round: roundID, Messages: make([]format.Message, len(msgs)), @@ -230,31 +247,34 @@ func (m *manager) getMessagesFromGateway(roundID id.Round, } -// Helper function which forces processUncheckedRounds by randomly -// not looking up messages +// Helper function which forces processUncheckedRounds by randomly not looking +// up messages. func (m *manager) forceMessagePickupRetry(ri *pb.RoundInfo, rl roundLookup, comms MessageRetrievalComms, gwIds []*id.ID, stop *stoppable.Single) (bundle message.Bundle, err error) { - rnd, _ := m.unchecked.GetRound(id.Round(ri.ID), rl.Identity.Source, rl.Identity.EphId) + rnd, _ := m.unchecked.GetRound( + id.Round(ri.ID), rl.Identity.Source, rl.Identity.EphId) if rnd.NumChecks == 0 { // Flip a coin to determine whether to pick up message - stream := m.rng.GetStream() - defer stream.Close() b := make([]byte, 8) + stream := m.rng.GetStream() _, err = stream.Read(b) if err != nil { - jww.FATAL.Panic(err.Error()) + jww.FATAL.Panic(err) } + stream.Close() + result := binary.BigEndian.Uint64(b) if result%2 == 0 { jww.INFO.Printf("Forcing a message pickup retry for round %d", ri.ID) - // Do not call get message, leaving the round to be picked up - // in unchecked round scheduler process + // Do not call get message, leaving the round to be picked up in + // unchecked round scheduler process return } } // Attempt to request for this gateway - return m.getMessagesFromGateway(id.Round(ri.ID), rl.Identity, comms, gwIds, stop) + return m.getMessagesFromGateway( + id.Round(ri.ID), rl.Identity, comms, gwIds, stop) } diff --git a/network/rounds/retrieve_test.go b/network/rounds/retrieve_test.go index 84e2cd4c3..81f2a3871 100644 --- a/network/rounds/retrieve_test.go +++ b/network/rounds/retrieve_test.go @@ -14,6 +14,7 @@ import ( "gitlab.com/elixxir/client/stoppable" pb "gitlab.com/elixxir/comms/mixmessages" "gitlab.com/elixxir/crypto/fastRNG" + "gitlab.com/xx_network/comms/connect" "gitlab.com/xx_network/crypto/csprng" "gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/id/ephemeral" @@ -26,6 +27,7 @@ import ( // Happy path func TestManager_ProcessMessageRetrieval(t *testing.T) { // General initializations + connect.TestingOnlyDisableTLS = true testManager := newManager(t) roundId := id.Round(5) mockComms := &mockMessageRetrievalComms{testingSignature: t} @@ -139,8 +141,8 @@ func TestManager_ProcessMessageRetrieval_NoRound(t *testing.T) { testNdf, mockComms, testManager.session, nil) stop := stoppable.NewSingle("singleStoppable") - // Create a local channel so reception is possible (testManager.messageBundles is - // send only via newManager call above) + // Create a local channel so reception is possible + // (testManager.messageBundles is sent only via newManager call above) messageBundleChan := make(chan message.Bundle) testManager.messageBundles = messageBundleChan diff --git a/network/rounds/store/roundIdentity.go b/network/rounds/store/roundIdentity.go index b63fac99a..a5c64b021 100644 --- a/network/rounds/store/roundIdentity.go +++ b/network/rounds/store/roundIdentity.go @@ -31,7 +31,7 @@ func newRoundIdentity(rid id.Round, recipient *id.ID, ephID ephemeral.Id) roundI } // String prints a base 64 string representation of roundIdentity. This function -// satisfies the fmt.Stringer interface. +// adheres to the fmt.Stringer interface. func (ri roundIdentity) String() string { return base64.StdEncoding.EncodeToString(ri[:]) } diff --git a/network/rounds/store/store.go b/network/rounds/store/store.go index b3d8a8983..5d6a52017 100644 --- a/network/rounds/store/store.go +++ b/network/rounds/store/store.go @@ -33,7 +33,7 @@ func NewUncheckedStore(kv *versioned.KV) (*UncheckedRoundStore, error) { return urs, urs.save() } -// NewUncheckedStore is a constructor for a UncheckedRoundStore. +// NewOrLoadUncheckedStore is a constructor for a UncheckedRoundStore. func NewOrLoadUncheckedStore(kv *versioned.KV) *UncheckedRoundStore { kv = kv.Prefix(uncheckedRoundPrefix) @@ -48,7 +48,7 @@ func NewOrLoadUncheckedStore(kv *versioned.KV) *UncheckedRoundStore { } if err = urs.save(); err != nil { - jww.FATAL.Panicf("failed to save a new unchecked round store") + jww.FATAL.Panicf("Failed to save a new unchecked round store: %v", err) } return urs @@ -56,7 +56,6 @@ func NewOrLoadUncheckedStore(kv *versioned.KV) *UncheckedRoundStore { // LoadUncheckedStore loads a deserializes a UncheckedRoundStore from memory. func LoadUncheckedStore(kv *versioned.KV) (*UncheckedRoundStore, error) { - kv = kv.Prefix(uncheckedRoundPrefix) vo, err := kv.Get(uncheckedRoundKey, uncheckedRoundVersion) if err != nil { @@ -126,9 +125,8 @@ func (s *UncheckedRoundStore) IterateOverList(iterator func(rid id.Round, defer s.mux.RUnlock() for _, rnd := range s.list { - jww.DEBUG.Printf("rnd for lookup: %d, %+v\n", rnd.Id, rnd) - go func(localRid id.Round, - localRnd UncheckedRound) { + jww.DEBUG.Printf("Round for lookup: %d, %+v\n", rnd.Id, rnd) + go func(localRid id.Round, localRnd UncheckedRound) { iterator(localRid, localRnd) }(rnd.Id, rnd) } diff --git a/network/rounds/unchecked.go b/network/rounds/unchecked.go index 74592292f..61ec17e77 100644 --- a/network/rounds/unchecked.go +++ b/network/rounds/unchecked.go @@ -18,7 +18,7 @@ import ( ) // Constants for message retrieval backoff delays -// todo - make this a real backoff +// TODO: Make this a real backoff const ( tryZero = 10 * time.Second tryOne = 30 * time.Second @@ -27,21 +27,22 @@ const ( tryFour = 3 * time.Hour tryFive = 12 * time.Hour trySix = 24 * time.Hour - // Amount of tries past which the - // backoff will not increase + // Amount of tries past which the backoff will not increase cappedTries = 7 ) -var backOffTable = [cappedTries]time.Duration{tryZero, tryOne, tryTwo, tryThree, tryFour, tryFive, trySix} +var backOffTable = [cappedTries]time.Duration{ + tryZero, tryOne, tryTwo, tryThree, tryFour, tryFive, trySix} -// processUncheckedRounds will (periodically) check every checkInterval -// for rounds that failed message retrieval in processMessageRetrieval. -// Rounds will have a backoff duration in which they will be tried again. -// If a round is found to be due on a periodical check, the round is sent -// back to processMessageRetrieval. -// todo - make this system know which rounds are still in progress instead of just assume by time -func (m *manager) processUncheckedRounds(checkInterval time.Duration, backoffTable [cappedTries]time.Duration, - stop *stoppable.Single) { +// processUncheckedRounds will (periodically) check every checkInterval for +// rounds that failed message retrieval in processMessageRetrieval. Rounds will +// have a backoff duration in which they will be tried again. If a round is +// found to be due on a periodical check, the round is sent back to +// processMessageRetrieval. +// TODO: Make this system know which rounds are still in progress instead of +// just assume by time +func (m *manager) processUncheckedRounds(checkInterval time.Duration, + backoffTable [cappedTries]time.Duration, stop *stoppable.Single) { ticker := time.NewTicker(checkInterval) uncheckedRoundStore := m.unchecked for { @@ -53,18 +54,23 @@ func (m *manager) processUncheckedRounds(checkInterval time.Duration, backoffTab case <-ticker.C: iterator := func(rid id.Round, rnd store.UncheckedRound) { - jww.DEBUG.Printf("checking if %d due for a message lookup", rid) + jww.DEBUG.Printf( + "Checking if round %d is due for a message lookup.", rid) // If this round is due for a round check, send the round over - // to the retrieval thread. If not due, check next round. + // to the retrieval thread. If not due, then check next round. if !isRoundCheckDue(rnd.NumChecks, rnd.LastCheck, backoffTable) { return } - jww.INFO.Printf("Round %d due for a message lookup, retrying...", rid) - //check if it needs to be processed by historical Rounds + + jww.INFO.Printf( + "Round %d due for a message lookup, retrying...", rid) + + // Check if it needs to be processed by historical Rounds m.GetMessagesFromRound(rid, receptionID.EphemeralIdentity{ EphId: rnd.EpdId, Source: rnd.Source, }) + // Update the state of the round for next look-up (if needed) err := uncheckedRoundStore.IncrementCheck(rid, rnd.Source, rnd.EpdId) if err != nil { @@ -72,16 +78,18 @@ func (m *manager) processUncheckedRounds(checkInterval time.Duration, backoffTab "increment check attempts for round %d: %v", rid, err) } } + // Pull and iterate through uncheckedRound list m.unchecked.IterateOverList(iterator) } } } -// isRoundCheckDue given the amount of tries and the timestamp the round -// was stored, determines whether this round is due for another check. -// Returns true if a new check is due -func isRoundCheckDue(tries uint64, ts time.Time, backoffTable [cappedTries]time.Duration) bool { +// isRoundCheckDue determines whether this round is due for another check given +// the amount of tries and the timestamp the round was stored. Returns true if a +// new check is due +func isRoundCheckDue(tries uint64, ts time.Time, + backoffTable [cappedTries]time.Duration) bool { now := netTime.Now() if tries >= uint64(len(backoffTable)) { diff --git a/network/rounds/unchecked_test.go b/network/rounds/unchecked_test.go index ae865d92c..66d475bcf 100644 --- a/network/rounds/unchecked_test.go +++ b/network/rounds/unchecked_test.go @@ -13,11 +13,11 @@ import ( "gitlab.com/elixxir/client/stoppable" pb "gitlab.com/elixxir/comms/mixmessages" "gitlab.com/elixxir/crypto/fastRNG" + "gitlab.com/xx_network/comms/connect" "gitlab.com/xx_network/crypto/csprng" "gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/id/ephemeral" "gitlab.com/xx_network/primitives/ndf" - "reflect" "testing" "time" ) @@ -25,6 +25,7 @@ import ( // Happy path func TestUncheckedRoundScheduler(t *testing.T) { // General initializations + connect.TestingOnlyDisableTLS = true testManager := newManager(t) roundId := id.Round(5) mockComms := &mockMessageRetrievalComms{testingSignature: t} @@ -37,9 +38,9 @@ func TestUncheckedRoundScheduler(t *testing.T) { testNdf.Gateways = []ndf.Gateway{{ID: gwId.Marshal()}} p := gateway.DefaultPoolParams() p.MaxPoolSize = 1 - testManager.sender, _ = gateway.NewSender(p, - fastRNG.NewStreamGenerator(1, 1, csprng.NewSystemRNG), - testNdf, mockComms, testManager.Session, nil) + rngGen := fastRNG.NewStreamGenerator(1, 1, csprng.NewSystemRNG) + testManager.sender, _ = gateway.NewSender( + p, rngGen, testNdf, mockComms, testManager.session, nil) // Create a local channel so reception is possible (testManager.messageBundles is // send only via newManager call above) @@ -62,34 +63,27 @@ func TestUncheckedRoundScheduler(t *testing.T) { Topology: idList, } - // Add round ot check - err := testManager.Session.UncheckedRounds().AddRound(roundId, roundInfo, requestGateway, expectedEphID) + // Add round to check + err := testManager.unchecked.AddRound(roundId, roundInfo, requestGateway, expectedEphID) if err != nil { t.Fatalf("Could not add round to session: %v", err) } var testBundle message.Bundle - go func() { - // Receive the bundle over the channel - time.Sleep(1 * time.Second) - testBundle = <-messageBundleChan - - // Close the process - if err := stop1.Close(); err != nil { - t.Errorf("Failed to signal close to process: %+v", err) - } - if err := stop2.Close(); err != nil { - t.Errorf("Failed to signal close to process: %+v", err) - } - - }() - - // Ensure bundle received and has expected values - time.Sleep(2 * time.Second) - if reflect.DeepEqual(testBundle, message.Bundle{}) { + select { + case testBundle = <-messageBundleChan: + case <-time.After(500 * time.Millisecond): t.Fatalf("Did not receive a message bundle over the channel") } + // Close the process + if err = stop1.Close(); err != nil { + t.Errorf("Failed to signal close to process: %+v", err) + } + if err = stop2.Close(); err != nil { + t.Errorf("Failed to signal close to process: %+v", err) + } + if testBundle.Identity.EphId.Int64() != expectedEphID.Int64() { t.Errorf("Unexpected address ID in bundle."+ "\n\tExpected: %v"+ diff --git a/network/rounds/utils_test.go b/network/rounds/utils_test.go index f5b9fc5df..08995ff1c 100644 --- a/network/rounds/utils_test.go +++ b/network/rounds/utils_test.go @@ -10,6 +10,7 @@ import ( "github.com/pkg/errors" jww "github.com/spf13/jwalterweatherman" "gitlab.com/elixxir/client/network/message" + "gitlab.com/elixxir/client/network/rounds/store" "gitlab.com/elixxir/client/storage" pb "gitlab.com/elixxir/comms/mixmessages" "gitlab.com/elixxir/comms/testkeys" @@ -23,38 +24,67 @@ import ( "time" ) -func newManager(face interface{}) *manager { - sess1 := storage.InitTestingSession(face) +func newManager(t *testing.T) *manager { + session := storage.InitTestingSession(t) + + unchecked, err := store.NewUncheckedStore(session.GetKV()) + if err != nil { + t.Errorf("Failed to make new UncheckedRoundStore: %+v", err) + } + + instance := &MockRoundGetter{ + topology: [][]byte{ + id.NewIdFromString("gateway0", id.Gateway, t).Bytes(), + id.NewIdFromString("gateway1", id.Gateway, t).Bytes(), + id.NewIdFromString("gateway2", id.Gateway, t).Bytes(), + id.NewIdFromString("gateway3", id.Gateway, t).Bytes(), + }, + } testManager := &manager{ params: GetDefaultParams(), + session: session, + rng: fastRNG.NewStreamGenerator(1, 1, csprng.NewSystemRNG), + instance: instance, lookupRoundMessages: make(chan roundLookup), messageBundles: make(chan message.Bundle), - session: sess1, - rng: fastRNG.NewStreamGenerator(1, 1, csprng.NewSystemRNG), + unchecked: unchecked, } return testManager } -// Build ID off of this string for expected gateway -// which will return on over mock comm -const ReturningGateway = "GetMessageRequest" -const FalsePositive = "FalsePositive" -const PayloadMessage = "Payload" -const ErrorGateway = "Error" +type MockRoundGetter struct { + topology [][]byte +} + +func (mrg *MockRoundGetter) GetRound(rid id.Round) (*pb.RoundInfo, error) { + return &pb.RoundInfo{ + ID: uint64(rid), + Topology: mrg.topology, + }, nil +} + +// Build ID off of this string for expected gateway that will be returned over +// mock comm +const ( + ReturningGateway = "GetMessageRequest" + FalsePositive = "FalsePositive" + PayloadMessage = "Payload" + ErrorGateway = "Error" +) type mockMessageRetrievalComms struct { testingSignature *testing.T } -func (mmrc *mockMessageRetrievalComms) AddHost(hid *id.ID, address string, cert []byte, params connect.HostParams) (host *connect.Host, err error) { +func (mmrc *mockMessageRetrievalComms) AddHost(_ *id.ID, _ string, _ []byte, + _ connect.HostParams) (host *connect.Host, err error) { host, _ = mmrc.GetHost(nil) return host, nil } -func (mmrc *mockMessageRetrievalComms) RemoveHost(hid *id.ID) { - +func (mmrc *mockMessageRetrievalComms) RemoveHost(_ *id.ID) { } func (mmrc *mockMessageRetrievalComms) GetHost(hostId *id.ID) (*connect.Host, bool) { @@ -65,13 +95,14 @@ func (mmrc *mockMessageRetrievalComms) GetHost(hostId *id.ID) (*connect.Host, bo return h, true } -// Mock comm which returns differently based on the host ID -// ReturningGateway returns a happy path response, in which there is a message -// FalsePositive returns a response in which there were no messages in the round -// ErrorGateway returns an error on the mock comm +// RequestMessages returns differently based on the host ID. +// ReturningGateway returns a happy path response, in which there is a message. +// FalsePositive returns a response in which there were no messages in the +// round. +// ErrorGateway returns an error on the mock comm. // Any other ID returns default no round errors func (mmrc *mockMessageRetrievalComms) RequestMessages(host *connect.Host, - message *pb.GetMessages) (*pb.GetMessagesResponse, error) { + _ *pb.GetMessages) (*pb.GetMessagesResponse, error) { payloadMsg := []byte(PayloadMessage) payload := make([]byte, 256) copy(payload, payloadMsg) @@ -81,7 +112,8 @@ func (mmrc *mockMessageRetrievalComms) RequestMessages(host *connect.Host, } // If we are the requesting on the returning gateway, return a mock response - returningGateway := id.NewIdFromString(ReturningGateway, id.Gateway, mmrc.testingSignature) + returningGateway := id.NewIdFromString( + ReturningGateway, id.Gateway, mmrc.testingSignature) if host.GetId().Cmp(returningGateway) { return &pb.GetMessagesResponse{ Messages: []*pb.Slot{testSlot}, @@ -89,8 +121,10 @@ func (mmrc *mockMessageRetrievalComms) RequestMessages(host *connect.Host, }, nil } - // Return an empty message structure (ie a false positive in the bloom filter) - falsePositive := id.NewIdFromString(FalsePositive, id.Gateway, mmrc.testingSignature) + // Return an empty message structure (i.e. a false positive in the bloom + // filter) + falsePositive := id.NewIdFromString( + FalsePositive, id.Gateway, mmrc.testingSignature) if host.GetId().Cmp(falsePositive) { return &pb.GetMessagesResponse{ Messages: []*pb.Slot{}, @@ -99,7 +133,8 @@ func (mmrc *mockMessageRetrievalComms) RequestMessages(host *connect.Host, } // Return a mock error - errorGateway := id.NewIdFromString(ErrorGateway, id.Gateway, mmrc.testingSignature) + errorGateway := id.NewIdFromString( + ErrorGateway, id.Gateway, mmrc.testingSignature) if host.GetId().Cmp(errorGateway) { return &pb.GetMessagesResponse{}, errors.Errorf("Connection error") } @@ -112,7 +147,8 @@ func newTestBackoffTable(face interface{}) [cappedTries]time.Duration { case *testing.T, *testing.M, *testing.B, *testing.PB: break default: - jww.FATAL.Panicf("newTestBackoffTable is restricted to testing only. Got %T", face) + jww.FATAL.Panicf( + "newTestBackoffTable is restricted to testing only. Got %T", face) } var backoff [cappedTries]time.Duration @@ -144,38 +180,41 @@ func getNDF() *ndf.NetworkDefinition { }, }, E2E: ndf.Group{ - Prime: "E2EE983D031DC1DB6F1A7A67DF0E9A8E5561DB8E8D49413394C049B" + - "7A8ACCEDC298708F121951D9CF920EC5D146727AA4AE535B0922C688B55B3DD2AE" + - "DF6C01C94764DAB937935AA83BE36E67760713AB44A6337C20E7861575E745D31F" + - "8B9E9AD8412118C62A3E2E29DF46B0864D0C951C394A5CBBDC6ADC718DD2A3E041" + - "023DBB5AB23EBB4742DE9C1687B5B34FA48C3521632C4A530E8FFB1BC51DADDF45" + - "3B0B2717C2BC6669ED76B4BDD5C9FF558E88F26E5785302BEDBCA23EAC5ACE9209" + - "6EE8A60642FB61E8F3D24990B8CB12EE448EEF78E184C7242DD161C7738F32BF29" + - "A841698978825B4111B4BC3E1E198455095958333D776D8B2BEEED3A1A1A221A6E" + - "37E664A64B83981C46FFDDC1A45E3D5211AAF8BFBC072768C4F50D7D7803D2D4F2" + - "78DE8014A47323631D7E064DE81C0C6BFA43EF0E6998860F1390B5D3FEACAF1696" + - "015CB79C3F9C2D93D961120CD0E5F12CBB687EAB045241F96789C38E89D796138E" + - "6319BE62E35D87B1048CA28BE389B575E994DCA755471584A09EC723742DC35873" + - "847AEF49F66E43873", + Prime: "E2EE983D031DC1DB6F1A7A67DF0E9A8E5561DB8E8D49413394C049B7A" + + "8ACCEDC298708F121951D9CF920EC5D146727AA4AE535B0922C688B55B3D" + + "D2AEDF6C01C94764DAB937935AA83BE36E67760713AB44A6337C20E78615" + + "75E745D31F8B9E9AD8412118C62A3E2E29DF46B0864D0C951C394A5CBBDC" + + "6ADC718DD2A3E041023DBB5AB23EBB4742DE9C1687B5B34FA48C3521632C" + + "4A530E8FFB1BC51DADDF453B0B2717C2BC6669ED76B4BDD5C9FF558E88F2" + + "6E5785302BEDBCA23EAC5ACE92096EE8A60642FB61E8F3D24990B8CB12EE" + + "448EEF78E184C7242DD161C7738F32BF29A841698978825B4111B4BC3E1E" + + "198455095958333D776D8B2BEEED3A1A1A221A6E37E664A64B83981C46FF" + + "DDC1A45E3D5211AAF8BFBC072768C4F50D7D7803D2D4F278DE8014A47323" + + "631D7E064DE81C0C6BFA43EF0E6998860F1390B5D3FEACAF1696015CB79C" + + "3F9C2D93D961120CD0E5F12CBB687EAB045241F96789C38E89D796138E63" + + "19BE62E35D87B1048CA28BE389B575E994DCA755471584A09EC723742DC3" + + "5873847AEF49F66E43873", Generator: "2", }, CMIX: ndf.Group{ - Prime: "9DB6FB5951B66BB6FE1E140F1D2CE5502374161FD6538DF1648218642F0B5C48" + - "C8F7A41AADFA187324B87674FA1822B00F1ECF8136943D7C55757264E5A1A44F" + - "FE012E9936E00C1D3E9310B01C7D179805D3058B2A9F4BB6F9716BFE6117C6B5" + - "B3CC4D9BE341104AD4A80AD6C94E005F4B993E14F091EB51743BF33050C38DE2" + - "35567E1B34C3D6A5C0CEAA1A0F368213C3D19843D0B4B09DCB9FC72D39C8DE41" + - "F1BF14D4BB4563CA28371621CAD3324B6A2D392145BEBFAC748805236F5CA2FE" + - "92B871CD8F9C36D3292B5509CA8CAA77A2ADFC7BFD77DDA6F71125A7456FEA15" + - "3E433256A2261C6A06ED3693797E7995FAD5AABBCFBE3EDA2741E375404AE25B", - Generator: "5C7FF6B06F8F143FE8288433493E4769C4D988ACE5BE25A0E24809670716C613" + - "D7B0CEE6932F8FAA7C44D2CB24523DA53FBE4F6EC3595892D1AA58C4328A06C4" + - "6A15662E7EAA703A1DECF8BBB2D05DBE2EB956C142A338661D10461C0D135472" + - "085057F3494309FFA73C611F78B32ADBB5740C361C9F35BE90997DB2014E2EF5" + - "AA61782F52ABEB8BD6432C4DD097BC5423B285DAFB60DC364E8161F4A2A35ACA" + - "3A10B1C4D203CC76A470A33AFDCBDD92959859ABD8B56E1725252D78EAC66E71" + - "BA9AE3F1DD2487199874393CD4D832186800654760E1E34C09E4D155179F9EC0" + - "DC4473F996BDCE6EED1CABED8B6F116F7AD9CF505DF0F998E34AB27514B0FFE7", + Prime: "9DB6FB5951B66BB6FE1E140F1D2CE5502374161FD6538DF1648218642" + + "F0B5C48C8F7A41AADFA187324B87674FA1822B00F1ECF8136943D7C55757" + + "264E5A1A44FFE012E9936E00C1D3E9310B01C7D179805D3058B2A9F4BB6F" + + "9716BFE6117C6B5B3CC4D9BE341104AD4A80AD6C94E005F4B993E14F091E" + + "B51743BF33050C38DE235567E1B34C3D6A5C0CEAA1A0F368213C3D19843D" + + "0B4B09DCB9FC72D39C8DE41F1BF14D4BB4563CA28371621CAD3324B6A2D3" + + "92145BEBFAC748805236F5CA2FE92B871CD8F9C36D3292B5509CA8CAA77A" + + "2ADFC7BFD77DDA6F71125A7456FEA153E433256A2261C6A06ED3693797E7" + + "995FAD5AABBCFBE3EDA2741E375404AE25B", + Generator: "5C7FF6B06F8F143FE8288433493E4769C4D988ACE5BE25A0E2480" + + "9670716C613D7B0CEE6932F8FAA7C44D2CB24523DA53FBE4F6EC3595892D" + + "1AA58C4328A06C46A15662E7EAA703A1DECF8BBB2D05DBE2EB956C142A33" + + "8661D10461C0D135472085057F3494309FFA73C611F78B32ADBB5740C361" + + "C9F35BE90997DB2014E2EF5AA61782F52ABEB8BD6432C4DD097BC5423B28" + + "5DAFB60DC364E8161F4A2A35ACA3A10B1C4D203CC76A470A33AFDCBDD929" + + "59859ABD8B56E1725252D78EAC66E71BA9AE3F1DD2487199874393CD4D83" + + "2186800654760E1E34C09E4D155179F9EC0DC4473F996BDCE6EED1CABED8" + + "B6F116F7AD9CF505DF0F998E34AB27514B0FFE7", }, } } diff --git a/storage/session.go b/storage/session.go index 2e5736232..2f2c543e8 100644 --- a/storage/session.go +++ b/storage/session.go @@ -11,6 +11,7 @@ package storage import ( "gitlab.com/elixxir/client/storage/utility" + "gitlab.com/xx_network/crypto/large" "sync" "testing" "time" @@ -71,16 +72,16 @@ type Session interface { type session struct { kv *versioned.KV - //memoized data + // memoized data mux sync.RWMutex regStatus RegistrationStatus ndf *ndf.NetworkDefinition - //network parameters + // network parameters cmixGroup *cyclic.Group e2eGroup *cyclic.Group - //sub-stores + // sub-stores *user.User clientVersion *clientVersion.Store } @@ -138,7 +139,7 @@ func New(baseDir, password string, u userInterface.Info, } // Loads existing user data into the session -func Load(baseDir, password string, currentVersion version.Version) (*Session, error) { +func Load(baseDir, password string, currentVersion version.Version) (Session, error) { s, err := initStore(baseDir, password) if err != nil { @@ -243,6 +244,26 @@ func InitTestingSession(i interface{}) Session { u.SetRegistrationTimestamp(testTime.UnixNano()) s.User = u + s.cmixGroup = cyclic.NewGroup( + large.NewIntFromString("9DB6FB5951B66BB6FE1E140F1D2CE5502374161FD6538DF1648218642"+ + "F0B5C48C8F7A41AADFA187324B87674FA1822B00F1ECF8136943D7C55757"+ + "264E5A1A44FFE012E9936E00C1D3E9310B01C7D179805D3058B2A9F4BB6F"+ + "9716BFE6117C6B5B3CC4D9BE341104AD4A80AD6C94E005F4B993E14F091E"+ + "B51743BF33050C38DE235567E1B34C3D6A5C0CEAA1A0F368213C3D19843D"+ + "0B4B09DCB9FC72D39C8DE41F1BF14D4BB4563CA28371621CAD3324B6A2D3"+ + "92145BEBFAC748805236F5CA2FE92B871CD8F9C36D3292B5509CA8CAA77A"+ + "2ADFC7BFD77DDA6F71125A7456FEA153E433256A2261C6A06ED3693797E7"+ + "995FAD5AABBCFBE3EDA2741E375404AE25B", 16), + large.NewIntFromString("5C7FF6B06F8F143FE8288433493E4769C4D988ACE5BE25A0E2480"+ + "9670716C613D7B0CEE6932F8FAA7C44D2CB24523DA53FBE4F6EC3595892D"+ + "1AA58C4328A06C46A15662E7EAA703A1DECF8BBB2D05DBE2EB956C142A33"+ + "8661D10461C0D135472085057F3494309FFA73C611F78B32ADBB5740C361"+ + "C9F35BE90997DB2014E2EF5AA61782F52ABEB8BD6432C4DD097BC5423B28"+ + "5DAFB60DC364E8161F4A2A35ACA3A10B1C4D203CC76A470A33AFDCBDD929"+ + "59859ABD8B56E1725252D78EAC66E71BA9AE3F1DD2487199874393CD4D83"+ + "2186800654760E1E34C09E4D155179F9EC0DC4473F996BDCE6EED1CABED8"+ + "B6F116F7AD9CF505DF0F998E34AB27514B0FFE7", 16), + ) return s } -- GitLab