From f6e604faa0540626c954dbeb7bc0d0409389311f Mon Sep 17 00:00:00 2001 From: Benjamin Wenger <ben@elixxir.ioo> Date: Thu, 7 Apr 2022 18:05:32 -0700 Subject: [PATCH] refactor cmix.Rounds -> cmix.Pickup, cmix.Historical -> cmix.Rounds --- auth/receivedConfirm.go | 4 +- auth/receivedRequest.go | 4 +- auth/state.go | 8 ++-- auth/store/receivedRequest.go | 12 ++--- auth/store/store.go | 4 +- cmix/client.go | 8 ++-- cmix/historical/params.go | 30 ------------- cmix/identity/receptionID/identity.go | 4 +- cmix/interface.go | 4 +- cmix/message/bundle.go | 4 +- cmix/message/fingerprints_test.go | 4 +- cmix/message/inProgress.go | 4 +- cmix/message/processor.go | 4 +- cmix/params.go | 8 ++-- cmix/{rounds => pickup}/get.go | 8 ++-- cmix/pickup/params.go | 44 +++++++++++++++++++ cmix/{rounds/manager.go => pickup/pickup.go} | 14 +++--- cmix/{rounds => pickup}/retrieve.go | 10 ++--- cmix/{rounds => pickup}/retrieve_test.go | 12 ++--- cmix/{rounds => pickup}/roundGetter.go | 0 .../{rounds => pickup}/store/roundIdentity.go | 0 cmix/{rounds => pickup}/store/store.go | 0 .../store/uncheckedRounds.go | 2 +- .../store/uncheckedRounds_test.go | 0 cmix/{rounds => pickup}/unchecked.go | 4 +- cmix/{rounds => pickup}/unchecked_test.go | 0 cmix/{rounds => pickup}/utils_test.go | 6 +-- cmix/results.go | 20 ++++----- cmix/{historical => rounds}/historical.go | 12 ++--- .../{historical => rounds}/historical_test.go | 2 +- cmix/rounds/params.go | 44 +++++++------------ cmix/{historical => rounds}/round.go | 2 +- .../round.go => cmix/rounds/roundStorage.go | 17 ++++--- e2e/processor.go | 4 +- e2e/receive/message.go | 4 +- e2e/rekey/utils_test.go | 4 +- e2e/unsafeProcessor.go | 4 +- groupChat/receive.go | 4 +- groupChat/send_test.go | 4 +- groupChat/utils_test.go | 4 +- 40 files changed, 163 insertions(+), 164 deletions(-) delete mode 100644 cmix/historical/params.go rename cmix/{rounds => pickup}/get.go (92%) create mode 100644 cmix/pickup/params.go rename cmix/{rounds/manager.go => pickup/pickup.go} (89%) rename cmix/{rounds => pickup}/retrieve.go (96%) rename cmix/{rounds => pickup}/retrieve_test.go (98%) rename cmix/{rounds => pickup}/roundGetter.go (100%) rename cmix/{rounds => pickup}/store/roundIdentity.go (100%) rename cmix/{rounds => pickup}/store/store.go (100%) rename cmix/{rounds => pickup}/store/uncheckedRounds.go (98%) rename cmix/{rounds => pickup}/store/uncheckedRounds_test.go (100%) rename cmix/{rounds => pickup}/unchecked.go (96%) rename cmix/{rounds => pickup}/unchecked_test.go (100%) rename cmix/{rounds => pickup}/utils_test.go (98%) rename cmix/{historical => rounds}/historical.go (95%) rename cmix/{historical => rounds}/historical_test.go (99%) rename cmix/{historical => rounds}/round.go (99%) rename storage/utility/round.go => cmix/rounds/roundStorage.go (64%) diff --git a/auth/receivedConfirm.go b/auth/receivedConfirm.go index 70da22a76..1990fd5bd 100644 --- a/auth/receivedConfirm.go +++ b/auth/receivedConfirm.go @@ -5,7 +5,7 @@ import ( "fmt" jww "github.com/spf13/jwalterweatherman" "gitlab.com/elixxir/client/auth/store" - "gitlab.com/elixxir/client/cmix/historical" + "gitlab.com/elixxir/client/cmix/rounds" "gitlab.com/elixxir/client/cmix/identity/receptionID" "gitlab.com/elixxir/client/cmix/message" "gitlab.com/elixxir/client/e2e/ratchet/partner/session" @@ -22,7 +22,7 @@ type receivedConfirmService struct { } func (rcs *receivedConfirmService) Process(msg format.Message, - receptionID receptionID.EphemeralIdentity, round historical.Round) { + receptionID receptionID.EphemeralIdentity, round rounds.Round) { state := rcs.s diff --git a/auth/receivedRequest.go b/auth/receivedRequest.go index 586b048aa..eaea3ddd6 100644 --- a/auth/receivedRequest.go +++ b/auth/receivedRequest.go @@ -7,7 +7,7 @@ import ( "github.com/pkg/errors" jww "github.com/spf13/jwalterweatherman" "gitlab.com/elixxir/client/auth/store" - "gitlab.com/elixxir/client/cmix/historical" + "gitlab.com/elixxir/client/cmix/rounds" "gitlab.com/elixxir/client/cmix/identity/receptionID" "gitlab.com/elixxir/client/e2e/ratchet" "gitlab.com/elixxir/crypto/contact" @@ -26,7 +26,7 @@ type receivedRequestService struct { } func (rrs *receivedRequestService) Process(message format.Message, - receptionID receptionID.EphemeralIdentity, round historical.Round) { + receptionID receptionID.EphemeralIdentity, round rounds.Round) { state := rrs.s diff --git a/auth/state.go b/auth/state.go index f75b7fbd5..9c78cc955 100644 --- a/auth/state.go +++ b/auth/state.go @@ -12,9 +12,9 @@ import ( "github.com/pkg/errors" "gitlab.com/elixxir/client/auth/store" "gitlab.com/elixxir/client/cmix" - "gitlab.com/elixxir/client/cmix/historical" "gitlab.com/elixxir/client/cmix/identity/receptionID" "gitlab.com/elixxir/client/cmix/message" + "gitlab.com/elixxir/client/cmix/rounds" "gitlab.com/elixxir/client/e2e" "gitlab.com/elixxir/client/event" "gitlab.com/elixxir/client/storage/versioned" @@ -38,11 +38,11 @@ type state struct { type Callbacks interface { Request(requestor contact.Contact, receptionID receptionID.EphemeralIdentity, - round historical.Round) + round rounds.Round) Confirm(requestor contact.Contact, receptionID receptionID.EphemeralIdentity, - round historical.Round) + round rounds.Round) Reset(requestor contact.Contact, receptionID receptionID.EphemeralIdentity, - round historical.Round) + round rounds.Round) } // NewState loads the auth state or creates new auth state if one cannot be diff --git a/auth/store/receivedRequest.go b/auth/store/receivedRequest.go index 957af798e..fe5fa1f7c 100644 --- a/auth/store/receivedRequest.go +++ b/auth/store/receivedRequest.go @@ -5,7 +5,7 @@ import ( "github.com/cloudflare/circl/dh/sidh" "github.com/pkg/errors" jww "github.com/spf13/jwalterweatherman" - "gitlab.com/elixxir/client/cmix/historical" + "gitlab.com/elixxir/client/cmix/rounds" util "gitlab.com/elixxir/client/storage/utility" "gitlab.com/elixxir/client/storage/versioned" "gitlab.com/elixxir/crypto/contact" @@ -23,14 +23,14 @@ type ReceivedRequest struct { theirSidHPubKeyA *sidh.PublicKey //round received on - round historical.Round + round rounds.Round //lock to make sure only one operator at a time mux *sync.Mutex } func newReceivedRequest(kv *versioned.KV, c contact.Contact, - key *sidh.PublicKey, round historical.Round) *ReceivedRequest { + key *sidh.PublicKey, round rounds.Round) *ReceivedRequest { if err := util.StoreContact(kv, c); err != nil { jww.FATAL.Panicf("Failed to save contact for partner %s", c.ID.String()) @@ -43,7 +43,7 @@ func newReceivedRequest(kv *versioned.KV, c contact.Contact, } roundStoreKey := makeRoundKey(c.ID) - if err := util.StoreRound(kv, round, roundStoreKey); err != nil { + if err := rounds.StoreRound(kv, round, roundStoreKey); err != nil { jww.FATAL.Panicf("Failed to save round request was received on "+ "for partner %s", c.ID.String()) } @@ -75,7 +75,7 @@ func loadReceivedRequest(kv *versioned.KV, partner *id.ID) ( partner) } - round, err := util.LoadRound(kv, makeRoundKey(partner)) + round, err := rounds.LoadRound(kv, makeRoundKey(partner)) if err != nil { return nil, errors.WithMessagef(err, "Failed to Load "+ "round request was received on with %s", @@ -98,7 +98,7 @@ func (rr *ReceivedRequest) GetTheirSidHPubKeyA() *sidh.PublicKey { return rr.theirSidHPubKeyA } -func (rr *ReceivedRequest) GetRound() historical.Round { +func (rr *ReceivedRequest) GetRound() rounds.Round { return rr.round } diff --git a/auth/store/store.go b/auth/store/store.go index 6a08e6b93..a81000f51 100644 --- a/auth/store/store.go +++ b/auth/store/store.go @@ -12,7 +12,7 @@ import ( "github.com/cloudflare/circl/dh/sidh" "github.com/pkg/errors" jww "github.com/spf13/jwalterweatherman" - "gitlab.com/elixxir/client/cmix/historical" + "gitlab.com/elixxir/client/cmix/rounds" "gitlab.com/elixxir/client/storage/versioned" "gitlab.com/elixxir/crypto/contact" "gitlab.com/elixxir/crypto/cyclic" @@ -210,7 +210,7 @@ func (s *Store) AddSent(partner *id.ID, partnerHistoricalPubKey, myPrivKey, } func (s *Store) AddReceived(c contact.Contact, key *sidh.PublicKey, - round historical.Round) error { + round rounds.Round) error { s.mux.Lock() defer s.mux.Unlock() jww.DEBUG.Printf("AddReceived new contact: %s", c.ID) diff --git a/cmix/client.go b/cmix/client.go index a161824c7..2beb6873d 100644 --- a/cmix/client.go +++ b/cmix/client.go @@ -15,11 +15,11 @@ import ( "gitlab.com/elixxir/client/cmix/address" "gitlab.com/elixxir/client/cmix/gateway" "gitlab.com/elixxir/client/cmix/health" - "gitlab.com/elixxir/client/cmix/historical" + "gitlab.com/elixxir/client/cmix/rounds" "gitlab.com/elixxir/client/cmix/identity" "gitlab.com/elixxir/client/cmix/message" "gitlab.com/elixxir/client/cmix/nodes" - "gitlab.com/elixxir/client/cmix/rounds" + "gitlab.com/elixxir/client/cmix/pickup" "gitlab.com/elixxir/client/event" "gitlab.com/elixxir/client/stoppable" "gitlab.com/elixxir/client/storage" @@ -64,7 +64,7 @@ type client struct { gateway.Sender message.Handler nodes.Registrar - historical.Retriever + rounds.Retriever rounds.Pickup address.Space identity.Tracker @@ -150,7 +150,7 @@ func NewManager(params Params, comms *commClient.Comms, session storage.Session, } // Set up the historical rounds handler - c.Retriever = historical.NewRetriever( + c.Retriever = rounds.NewRetriever( params.Historical, comms, c.Sender, events) // Set up Message Handler diff --git a/cmix/historical/params.go b/cmix/historical/params.go deleted file mode 100644 index ae97e1921..000000000 --- a/cmix/historical/params.go +++ /dev/null @@ -1,30 +0,0 @@ -package historical - -import "time" - -type Params struct { - // MaxHistoricalRounds is the number of historical rounds required to - // automatically send a historical rounds query. - MaxHistoricalRounds uint - - // HistoricalRoundsPeriod is the maximum period of time a pending historical - // round query will wait before it is transmitted. - HistoricalRoundsPeriod time.Duration - - // HistoricalRoundsBufferLen is the length of historical rounds channel - // buffer. - HistoricalRoundsBufferLen uint - - // MaxHistoricalRoundsRetries is the maximum number of times a historical - // round lookup will be attempted. - MaxHistoricalRoundsRetries uint -} - -func GetDefaultParams() Params { - return Params{ - MaxHistoricalRounds: 100, - HistoricalRoundsPeriod: 100 * time.Millisecond, - HistoricalRoundsBufferLen: 1000, - MaxHistoricalRoundsRetries: 3, - } -} diff --git a/cmix/identity/receptionID/identity.go b/cmix/identity/receptionID/identity.go index e0e75425c..3c7526be3 100644 --- a/cmix/identity/receptionID/identity.go +++ b/cmix/identity/receptionID/identity.go @@ -3,7 +3,7 @@ package receptionID import ( "encoding/json" "github.com/pkg/errors" - "gitlab.com/elixxir/client/cmix/historical" + "gitlab.com/elixxir/client/cmix/rounds" "gitlab.com/elixxir/client/storage/versioned" "gitlab.com/elixxir/primitives/states" "gitlab.com/xx_network/primitives/id" @@ -120,7 +120,7 @@ func (i Identity) Equal(b Identity) bool { // BuildIdentityFromRound returns an EphemeralIdentity that the source would // use to receive messages from the given round func BuildIdentityFromRound(source *id.ID, - round historical.Round) EphemeralIdentity { + round rounds.Round) EphemeralIdentity { ephID, _, _, _ := ephemeral.GetId(source, uint(round.AddressSpaceSize), round.Timestamps[states.QUEUED].UnixNano()) return EphemeralIdentity{ diff --git a/cmix/interface.go b/cmix/interface.go index d008d4398..893200297 100644 --- a/cmix/interface.go +++ b/cmix/interface.go @@ -2,7 +2,7 @@ package cmix import ( "gitlab.com/elixxir/client/cmix/gateway" - "gitlab.com/elixxir/client/cmix/historical" + "gitlab.com/elixxir/client/cmix/rounds" "gitlab.com/elixxir/client/cmix/identity" "gitlab.com/elixxir/client/cmix/message" "gitlab.com/elixxir/client/cmix/nodes" @@ -234,7 +234,7 @@ type Client interface { // GetRoundResults does this lookup when needed, generally that is // preferable LookupHistoricalRound( - rid id.Round, callback historical.RoundResultCallback) error + rid id.Round, callback rounds.RoundResultCallback) error /* === Sender =========================================================== */ /* The sender handles sending comms to the network. It tracks connections to diff --git a/cmix/message/bundle.go b/cmix/message/bundle.go index 2dd4a582d..887148285 100644 --- a/cmix/message/bundle.go +++ b/cmix/message/bundle.go @@ -8,7 +8,7 @@ package message import ( - "gitlab.com/elixxir/client/cmix/historical" + "gitlab.com/elixxir/client/cmix/rounds" "gitlab.com/elixxir/client/cmix/identity/receptionID" "gitlab.com/elixxir/primitives/format" "gitlab.com/xx_network/primitives/id" @@ -16,7 +16,7 @@ import ( type Bundle struct { Round id.Round - RoundInfo historical.Round + RoundInfo rounds.Round Messages []format.Message Finish func() Identity receptionID.EphemeralIdentity diff --git a/cmix/message/fingerprints_test.go b/cmix/message/fingerprints_test.go index 46407a1b0..6740f3a08 100644 --- a/cmix/message/fingerprints_test.go +++ b/cmix/message/fingerprints_test.go @@ -9,7 +9,7 @@ package message import ( jww "github.com/spf13/jwalterweatherman" - "gitlab.com/elixxir/client/cmix/historical" + "gitlab.com/elixxir/client/cmix/rounds" "gitlab.com/elixxir/client/cmix/identity/receptionID" "gitlab.com/elixxir/primitives/format" "gitlab.com/xx_network/primitives/id" @@ -174,6 +174,6 @@ func (mock *MockMsgProcessor) MarkFingerprintUsed(_ format.Fingerprint) { } func (mock *MockMsgProcessor) Process( - format.Message, receptionID.EphemeralIdentity, historical.Round) { + format.Message, receptionID.EphemeralIdentity, rounds.Round) { return } diff --git a/cmix/message/inProgress.go b/cmix/message/inProgress.go index c21ef6f64..147b6401b 100644 --- a/cmix/message/inProgress.go +++ b/cmix/message/inProgress.go @@ -9,7 +9,7 @@ package message import ( jww "github.com/spf13/jwalterweatherman" - "gitlab.com/elixxir/client/cmix/historical" + "gitlab.com/elixxir/client/cmix/rounds" "gitlab.com/elixxir/client/stoppable" "gitlab.com/elixxir/primitives/format" "gitlab.com/xx_network/primitives/id" @@ -56,7 +56,7 @@ func (p *handler) recheckInProgress() { for grbldMsg, ri, identity, has := p.inProcess.Next(); has; grbldMsg, ri, identity, has = p.inProcess.Next() { bundle := Bundle{ Round: id.Round(ri.ID), - RoundInfo: historical.MakeRound(ri), + RoundInfo: rounds.MakeRound(ri), Messages: []format.Message{grbldMsg}, Finish: func() {}, Identity: identity, diff --git a/cmix/message/processor.go b/cmix/message/processor.go index 8df34ae5a..c4107476d 100644 --- a/cmix/message/processor.go +++ b/cmix/message/processor.go @@ -1,7 +1,7 @@ package message import ( - "gitlab.com/elixxir/client/cmix/historical" + "gitlab.com/elixxir/client/cmix/rounds" "gitlab.com/elixxir/client/cmix/identity/receptionID" "gitlab.com/elixxir/primitives/format" ) @@ -15,5 +15,5 @@ type Processor interface { // It is a security vulnerability to reuse a fingerprint. It leaks privacy // and can lead to compromise of message contents and integrity. Process(message format.Message, receptionID receptionID.EphemeralIdentity, - round historical.Round) + round rounds.Round) } diff --git a/cmix/params.go b/cmix/params.go index 4d61bca66..e334217c5 100644 --- a/cmix/params.go +++ b/cmix/params.go @@ -3,9 +3,9 @@ package cmix import ( "encoding/base64" "encoding/json" - "gitlab.com/elixxir/client/cmix/historical" - "gitlab.com/elixxir/client/cmix/message" "gitlab.com/elixxir/client/cmix/rounds" + "gitlab.com/elixxir/client/cmix/message" + "gitlab.com/elixxir/client/cmix/pickup" "gitlab.com/elixxir/client/stoppable" "gitlab.com/elixxir/primitives/excludedRounds" "gitlab.com/xx_network/primitives/id" @@ -52,7 +52,7 @@ type Params struct { Rounds rounds.Params Message message.Params - Historical historical.Params + Historical rounds.Params } func GetDefaultParams() Params { @@ -70,7 +70,7 @@ func GetDefaultParams() Params { } n.Rounds = rounds.GetDefaultParams() n.Message = message.GetDefaultParams() - n.Historical = historical.GetDefaultParams() + n.Historical = rounds.GetDefaultParams() return n } diff --git a/cmix/rounds/get.go b/cmix/pickup/get.go similarity index 92% rename from cmix/rounds/get.go rename to cmix/pickup/get.go index 774d64fc0..9ecee34cb 100644 --- a/cmix/rounds/get.go +++ b/cmix/pickup/get.go @@ -9,12 +9,12 @@ package rounds import ( jww "github.com/spf13/jwalterweatherman" - "gitlab.com/elixxir/client/cmix/historical" + "gitlab.com/elixxir/client/cmix/rounds" "gitlab.com/elixxir/client/cmix/identity/receptionID" "gitlab.com/xx_network/primitives/id" ) -func (m *manager) GetMessagesFromRound( +func (m *pickup) GetMessagesFromRound( roundID id.Round, identity receptionID.EphemeralIdentity) { // Get the round from the in-RAM store ri, err := m.instance.GetRound(roundID) @@ -41,7 +41,7 @@ func (m *manager) GetMessagesFromRound( identity.Source) err = m.historical.LookupHistoricalRound( - roundID, func(round historical.Round, success bool) { + roundID, func(round rounds.Round, success bool) { if !success { // TODO: Implement me } @@ -70,7 +70,7 @@ func (m *manager) GetMessagesFromRound( // If found, send to Message Retrieval Workers m.lookupRoundMessages <- roundLookup{ - Round: historical.MakeRound(ri), + Round: rounds.MakeRound(ri), Identity: identity, } } diff --git a/cmix/pickup/params.go b/cmix/pickup/params.go new file mode 100644 index 000000000..ab7f956c9 --- /dev/null +++ b/cmix/pickup/params.go @@ -0,0 +1,44 @@ +package rounds + +import "time" + +type Params struct { + // Number of worker threads for retrieving messages from gateways + NumMessageRetrievalWorkers uint + + // Length of round lookup channel buffer + LookupRoundsBufferLen uint + + // Maximum number of times a historical round lookup will be attempted + MaxHistoricalRoundsRetries uint + + // Interval between checking for rounds in UncheckedRoundStore due for a + // message retrieval retry + UncheckRoundPeriod time.Duration + + // Toggles if message pickup retrying mechanism if forced + // by intentionally not looking up messages + ForceMessagePickupRetry bool + + // Duration to wait before sending on a round times out and a new round is + // tried + SendTimeout time.Duration + + // Disables all attempts to pick up dropped or missed messages + RealtimeOnly bool + + // Toggles if historical rounds should always be used + ForceHistoricalRounds bool +} + +func GetDefaultParams() Params { + return Params{ + NumMessageRetrievalWorkers: 8, + LookupRoundsBufferLen: 2000, + MaxHistoricalRoundsRetries: 3, + UncheckRoundPeriod: 20 * time.Second, + ForceMessagePickupRetry: false, + SendTimeout: 3 * time.Second, + RealtimeOnly: false, + } +} diff --git a/cmix/rounds/manager.go b/cmix/pickup/pickup.go similarity index 89% rename from cmix/rounds/manager.go rename to cmix/pickup/pickup.go index 46e53d5e7..f7653e44d 100644 --- a/cmix/rounds/manager.go +++ b/cmix/pickup/pickup.go @@ -9,10 +9,10 @@ package rounds import ( "gitlab.com/elixxir/client/cmix/gateway" - "gitlab.com/elixxir/client/cmix/historical" + "gitlab.com/elixxir/client/cmix/rounds" "gitlab.com/elixxir/client/cmix/identity/receptionID" "gitlab.com/elixxir/client/cmix/message" - "gitlab.com/elixxir/client/cmix/rounds/store" + "gitlab.com/elixxir/client/cmix/pickup/store" "gitlab.com/elixxir/client/stoppable" "gitlab.com/elixxir/client/storage" "gitlab.com/elixxir/crypto/fastRNG" @@ -25,14 +25,14 @@ type Pickup interface { GetMessagesFromRound(roundID id.Round, identity receptionID.EphemeralIdentity) } -type manager struct { +type pickup struct { params Params sender gateway.Sender session storage.Session comms MessageRetrievalComms - historical historical.Retriever + historical rounds.Retriever rng *fastRNG.StreamGenerator @@ -45,11 +45,11 @@ type manager struct { } func NewPickup(params Params, bundles chan<- message.Bundle, - sender gateway.Sender, historical historical.Retriever, + sender gateway.Sender, historical rounds.Retriever, rng *fastRNG.StreamGenerator, instance RoundGetter, session storage.Session) Pickup { unchecked := store.NewOrLoadUncheckedStore(session.GetKV()) - m := &manager{ + m := &pickup{ params: params, lookupRoundMessages: make(chan roundLookup, params.LookupRoundsBufferLen), messageBundles: bundles, @@ -64,7 +64,7 @@ func NewPickup(params Params, bundles chan<- message.Bundle, return m } -func (m *manager) StartProcessors() stoppable.Stoppable { +func (m *pickup) StartProcessors() stoppable.Stoppable { multi := stoppable.NewMulti("Rounds") diff --git a/cmix/rounds/retrieve.go b/cmix/pickup/retrieve.go similarity index 96% rename from cmix/rounds/retrieve.go rename to cmix/pickup/retrieve.go index b3c8e1823..d6d5889c6 100644 --- a/cmix/rounds/retrieve.go +++ b/cmix/pickup/retrieve.go @@ -12,7 +12,7 @@ import ( "github.com/pkg/errors" jww "github.com/spf13/jwalterweatherman" "gitlab.com/elixxir/client/cmix/gateway" - "gitlab.com/elixxir/client/cmix/historical" + "gitlab.com/elixxir/client/cmix/rounds" "gitlab.com/elixxir/client/cmix/identity/receptionID" "gitlab.com/elixxir/client/cmix/message" "gitlab.com/elixxir/client/stoppable" @@ -32,7 +32,7 @@ type MessageRetrievalComms interface { } type roundLookup struct { - Round historical.Round + Round rounds.Round Identity receptionID.EphemeralIdentity } @@ -40,7 +40,7 @@ const noRoundError = "does not have round %d" // processMessageRetrieval received a roundLookup request and pings the gateways // of that round for messages for the requested Identity in the roundLookup. -func (m *manager) processMessageRetrieval(comms MessageRetrievalComms, +func (m *pickup) processMessageRetrieval(comms MessageRetrievalComms, stop *stoppable.Single) { for { @@ -158,7 +158,7 @@ func (m *manager) processMessageRetrieval(comms MessageRetrievalComms, // getMessagesFromGateway attempts to get messages from their assigned gateway // host in the round specified. If successful -func (m *manager) getMessagesFromGateway(roundID id.Round, +func (m *pickup) getMessagesFromGateway(roundID id.Round, identity receptionID.EphemeralIdentity, comms MessageRetrievalComms, gwIds []*id.ID, stop *stoppable.Single) (message.Bundle, error) { start := netTime.Now() @@ -247,7 +247,7 @@ func (m *manager) getMessagesFromGateway(roundID id.Round, // Helper function which forces processUncheckedRounds by randomly not looking // up messages. -func (m *manager) forceMessagePickupRetry(ri historical.Round, rl roundLookup, +func (m *pickup) forceMessagePickupRetry(ri rounds.Round, rl roundLookup, comms MessageRetrievalComms, gwIds []*id.ID, stop *stoppable.Single) (bundle message.Bundle, err error) { rnd, _ := m.unchecked.GetRound( diff --git a/cmix/rounds/retrieve_test.go b/cmix/pickup/retrieve_test.go similarity index 98% rename from cmix/rounds/retrieve_test.go rename to cmix/pickup/retrieve_test.go index 2a1c940c9..baacbe18a 100644 --- a/cmix/rounds/retrieve_test.go +++ b/cmix/pickup/retrieve_test.go @@ -9,7 +9,7 @@ package rounds import ( "bytes" "gitlab.com/elixxir/client/cmix/gateway" - "gitlab.com/elixxir/client/cmix/historical" + "gitlab.com/elixxir/client/cmix/rounds" ephemeral2 "gitlab.com/elixxir/client/cmix/identity/receptionID" "gitlab.com/elixxir/client/cmix/message" "gitlab.com/elixxir/client/stoppable" @@ -71,7 +71,7 @@ func Test_manager_processMessageRetrieval(t *testing.T) { Source: requestGateway, } - round := historical.Round{ + round := rounds.Round{ ID: roundId, Topology: connect.NewCircuit([]*id.ID{requestGateway}), } @@ -159,7 +159,7 @@ func Test_manager_processMessageRetrieval_NoRound(t *testing.T) { Source: dummyGateway, } - round := historical.Round{ + round := rounds.Round{ ID: roundId, Topology: connect.NewCircuit([]*id.ID{dummyGateway}), } @@ -236,7 +236,7 @@ func Test_manager_processMessageRetrieval_FalsePositive(t *testing.T) { requestGateway := id.NewIdFromString(FalsePositive, id.Gateway, t) - round := historical.Round{ + round := rounds.Round{ ID: roundId, Topology: connect.NewCircuit([]*id.ID{requestGateway}), } @@ -309,7 +309,7 @@ func Test_manager_processMessageRetrieval_Quit(t *testing.T) { requestGateway := id.NewIdFromString(ReturningGateway, id.Gateway, t) - round := historical.Round{ + round := rounds.Round{ ID: roundId, Topology: connect.NewCircuit([]*id.ID{requestGateway}), } @@ -379,7 +379,7 @@ func Test_manager_processMessageRetrieval_MultipleGateways(t *testing.T) { Source: requestGateway, } - round := historical.Round{ + round := rounds.Round{ ID: roundId, // Create a list of IDs in which some error gateways must be // contacted before the happy path diff --git a/cmix/rounds/roundGetter.go b/cmix/pickup/roundGetter.go similarity index 100% rename from cmix/rounds/roundGetter.go rename to cmix/pickup/roundGetter.go diff --git a/cmix/rounds/store/roundIdentity.go b/cmix/pickup/store/roundIdentity.go similarity index 100% rename from cmix/rounds/store/roundIdentity.go rename to cmix/pickup/store/roundIdentity.go diff --git a/cmix/rounds/store/store.go b/cmix/pickup/store/store.go similarity index 100% rename from cmix/rounds/store/store.go rename to cmix/pickup/store/store.go diff --git a/cmix/rounds/store/uncheckedRounds.go b/cmix/pickup/store/uncheckedRounds.go similarity index 98% rename from cmix/rounds/store/uncheckedRounds.go rename to cmix/pickup/store/uncheckedRounds.go index 326fff5c2..b15cc5b80 100644 --- a/cmix/rounds/store/uncheckedRounds.go +++ b/cmix/pickup/store/uncheckedRounds.go @@ -33,7 +33,7 @@ const ( uint64Size = 8 // Maximum checks that can be performed on a round. Intended so that a round - // is checked no more than 1 week approximately (network/rounds.cappedTries + 7) + // is checked no more than 1 week approximately (network/pickup.cappedTries + 7) maxChecks = 14 ) diff --git a/cmix/rounds/store/uncheckedRounds_test.go b/cmix/pickup/store/uncheckedRounds_test.go similarity index 100% rename from cmix/rounds/store/uncheckedRounds_test.go rename to cmix/pickup/store/uncheckedRounds_test.go diff --git a/cmix/rounds/unchecked.go b/cmix/pickup/unchecked.go similarity index 96% rename from cmix/rounds/unchecked.go rename to cmix/pickup/unchecked.go index 15859e57f..a09909074 100644 --- a/cmix/rounds/unchecked.go +++ b/cmix/pickup/unchecked.go @@ -10,7 +10,7 @@ package rounds import ( jww "github.com/spf13/jwalterweatherman" "gitlab.com/elixxir/client/cmix/identity/receptionID" - "gitlab.com/elixxir/client/cmix/rounds/store" + "gitlab.com/elixxir/client/cmix/pickup/store" "gitlab.com/elixxir/client/stoppable" "gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/netTime" @@ -41,7 +41,7 @@ var backOffTable = [cappedTries]time.Duration{ // processMessageRetrieval. // TODO: Make this system know which rounds are still in progress instead of // just assume by time -func (m *manager) processUncheckedRounds(checkInterval time.Duration, +func (m *pickup) processUncheckedRounds(checkInterval time.Duration, backoffTable [cappedTries]time.Duration, stop *stoppable.Single) { ticker := time.NewTicker(checkInterval) uncheckedRoundStore := m.unchecked diff --git a/cmix/rounds/unchecked_test.go b/cmix/pickup/unchecked_test.go similarity index 100% rename from cmix/rounds/unchecked_test.go rename to cmix/pickup/unchecked_test.go diff --git a/cmix/rounds/utils_test.go b/cmix/pickup/utils_test.go similarity index 98% rename from cmix/rounds/utils_test.go rename to cmix/pickup/utils_test.go index f74306728..c2c11db71 100644 --- a/cmix/rounds/utils_test.go +++ b/cmix/pickup/utils_test.go @@ -10,7 +10,7 @@ import ( "github.com/pkg/errors" jww "github.com/spf13/jwalterweatherman" "gitlab.com/elixxir/client/cmix/message" - "gitlab.com/elixxir/client/cmix/rounds/store" + "gitlab.com/elixxir/client/cmix/pickup/store" "gitlab.com/elixxir/client/storage" pb "gitlab.com/elixxir/comms/mixmessages" "gitlab.com/elixxir/comms/testkeys" @@ -24,7 +24,7 @@ import ( "time" ) -func newManager(t *testing.T) *manager { +func newManager(t *testing.T) *pickup { session := storage.InitTestingSession(t) unchecked, err := store.NewUncheckedStore(session.GetKV()) @@ -41,7 +41,7 @@ func newManager(t *testing.T) *manager { }, } - testManager := &manager{ + testManager := &pickup{ params: GetDefaultParams(), session: session, rng: fastRNG.NewStreamGenerator(1, 1, csprng.NewSystemRNG), diff --git a/cmix/results.go b/cmix/results.go index 45f09279d..d5bdc7ad4 100644 --- a/cmix/results.go +++ b/cmix/results.go @@ -9,7 +9,7 @@ package cmix import ( "fmt" - "gitlab.com/elixxir/client/cmix/historical" + "gitlab.com/elixxir/client/cmix/rounds" "time" jww "github.com/spf13/jwalterweatherman" @@ -42,12 +42,12 @@ func (rr RoundLookupStatus) String() string { type RoundResult struct { Status RoundLookupStatus - Round historical.Round + Round rounds.Round } type historicalRoundsRtn struct { Success bool - Round historical.Round + Round rounds.Round } // RoundEventCallback interface which reports the requested rounds. @@ -106,12 +106,12 @@ func (c *client) getRoundResults(roundList []id.Round, timeout time.Duration, if states.Round(roundInfo.State) == states.COMPLETED { roundsResults[rnd] = RoundResult{ Status: Succeeded, - Round: historical.MakeRound(roundInfo), + Round: rounds.MakeRound(roundInfo), } } else if states.Round(roundInfo.State) == states.FAILED { roundsResults[rnd] = RoundResult{ Status: Failed, - Round: historical.MakeRound(roundInfo), + Round: rounds.MakeRound(roundInfo), } allRoundsSucceeded = false } else { @@ -139,7 +139,7 @@ func (c *client) getRoundResults(roundList []id.Round, timeout time.Duration, // Find out what happened to old (historical) rounds if any are needed if len(historicalRequest) > 0 { for _, rnd := range historicalRequest { - rrc := func(round historical.Round, success bool) { + rrc := func(round rounds.Round, success bool) { var status RoundLookupStatus if success { status = Succeeded @@ -198,12 +198,12 @@ func (c *client) getRoundResults(roundList []id.Round, timeout time.Duration, if states.Round(roundInfo.State) == states.COMPLETED { result = RoundResult{ Status: Succeeded, - Round: historical.MakeRound(roundInfo), + Round: rounds.MakeRound(roundInfo), } } else if states.Round(roundInfo.State) == states.FAILED { result = RoundResult{ Status: Failed, - Round: historical.MakeRound(roundInfo), + Round: rounds.MakeRound(roundInfo), } allRoundsSucceeded = false } @@ -217,12 +217,12 @@ func (c *client) getRoundResults(roundList []id.Round, timeout time.Duration, if states.Round(roundReport.RoundInfo.State) == states.COMPLETED { result = RoundResult{ Status: Succeeded, - Round: historical.MakeRound(roundReport.RoundInfo), + Round: rounds.MakeRound(roundReport.RoundInfo), } } else { result = RoundResult{ Status: Failed, - Round: historical.MakeRound(roundReport.RoundInfo), + Round: rounds.MakeRound(roundReport.RoundInfo), } allRoundsSucceeded = false } diff --git a/cmix/historical/historical.go b/cmix/rounds/historical.go similarity index 95% rename from cmix/historical/historical.go rename to cmix/rounds/historical.go index 5e5c4faef..98522670f 100644 --- a/cmix/historical/historical.go +++ b/cmix/rounds/historical.go @@ -5,7 +5,7 @@ // LICENSE file // /////////////////////////////////////////////////////////////////////////////// -package historical +package rounds import ( "fmt" @@ -38,15 +38,15 @@ type Retriever interface { type manager struct { params Params - comms RoundsComms + comms Comms sender gateway.Sender events event.Manager c chan roundRequest } -// RoundsComms interface to increase east of testing of historical rounds. -type RoundsComms interface { +// Comms interface to increase east of testing of historical rounds. +type Comms interface { GetHost(hostId *id.ID) (*connect.Host, bool) RequestHistoricalRounds(host *connect.Host, message *pb.HistoricalRounds) ( *pb.HistoricalRoundsResponse, error) @@ -62,7 +62,7 @@ type roundRequest struct { numAttempts uint } -func NewRetriever(param Params, comms RoundsComms, sender gateway.Sender, +func NewRetriever(param Params, comms Comms, sender gateway.Sender, events event.Manager) Retriever { return &manager{ params: param, @@ -100,7 +100,7 @@ func (m *manager) StartProcesses() *stoppable.Single { // processHistoricalRounds is a long-running thread that process historical // rounds. The thread can be killed by triggering the stoppable. It takes a // comms interface to aid in testing. -func (m *manager) processHistoricalRounds(comm RoundsComms, stop *stoppable.Single) { +func (m *manager) processHistoricalRounds(comm Comms, stop *stoppable.Single) { timerCh := make(<-chan time.Time) var roundRequests []roundRequest diff --git a/cmix/historical/historical_test.go b/cmix/rounds/historical_test.go similarity index 99% rename from cmix/historical/historical_test.go rename to cmix/rounds/historical_test.go index 250c66ab6..807386a25 100644 --- a/cmix/historical/historical_test.go +++ b/cmix/rounds/historical_test.go @@ -5,7 +5,7 @@ // LICENSE file // /////////////////////////////////////////////////////////////////////////////// -package historical +package rounds import ( "sync" diff --git a/cmix/rounds/params.go b/cmix/rounds/params.go index ab7f956c9..1023f6022 100644 --- a/cmix/rounds/params.go +++ b/cmix/rounds/params.go @@ -3,42 +3,28 @@ package rounds import "time" type Params struct { - // Number of worker threads for retrieving messages from gateways - NumMessageRetrievalWorkers uint + // MaxHistoricalRounds is the number of historical rounds required to + // automatically send a historical rounds query. + MaxHistoricalRounds uint - // Length of round lookup channel buffer - LookupRoundsBufferLen uint + // HistoricalRoundsPeriod is the maximum period of time a pending historical + // round query will wait before it is transmitted. + HistoricalRoundsPeriod time.Duration - // Maximum number of times a historical round lookup will be attempted - MaxHistoricalRoundsRetries uint - - // Interval between checking for rounds in UncheckedRoundStore due for a - // message retrieval retry - UncheckRoundPeriod time.Duration - - // Toggles if message pickup retrying mechanism if forced - // by intentionally not looking up messages - ForceMessagePickupRetry bool + // HistoricalRoundsBufferLen is the length of historical rounds channel + // buffer. + HistoricalRoundsBufferLen uint - // Duration to wait before sending on a round times out and a new round is - // tried - SendTimeout time.Duration - - // Disables all attempts to pick up dropped or missed messages - RealtimeOnly bool - - // Toggles if historical rounds should always be used - ForceHistoricalRounds bool + // MaxHistoricalRoundsRetries is the maximum number of times a historical + // round lookup will be attempted. + MaxHistoricalRoundsRetries uint } func GetDefaultParams() Params { return Params{ - NumMessageRetrievalWorkers: 8, - LookupRoundsBufferLen: 2000, + MaxHistoricalRounds: 100, + HistoricalRoundsPeriod: 100 * time.Millisecond, + HistoricalRoundsBufferLen: 1000, MaxHistoricalRoundsRetries: 3, - UncheckRoundPeriod: 20 * time.Second, - ForceMessagePickupRetry: false, - SendTimeout: 3 * time.Second, - RealtimeOnly: false, } } diff --git a/cmix/historical/round.go b/cmix/rounds/round.go similarity index 99% rename from cmix/historical/round.go rename to cmix/rounds/round.go index 5dac9bec5..0de38e8ae 100644 --- a/cmix/historical/round.go +++ b/cmix/rounds/round.go @@ -1,4 +1,4 @@ -package historical +package rounds import ( jww "github.com/spf13/jwalterweatherman" diff --git a/storage/utility/round.go b/cmix/rounds/roundStorage.go similarity index 64% rename from storage/utility/round.go rename to cmix/rounds/roundStorage.go index 1227bc6ce..2b92c819f 100644 --- a/storage/utility/round.go +++ b/cmix/rounds/roundStorage.go @@ -1,8 +1,7 @@ -package utility +package rounds import ( "github.com/golang/protobuf/proto" - "gitlab.com/elixxir/client/cmix/historical" "gitlab.com/elixxir/client/storage/versioned" pb "gitlab.com/elixxir/comms/mixmessages" "gitlab.com/xx_network/primitives/netTime" @@ -11,7 +10,7 @@ import ( const currentRoundVersion = 0 // StoreRound stores the round using the key. -func StoreRound(kv *versioned.KV, round historical.Round, key string) error { +func StoreRound(kv *versioned.KV, round Round, key string) error { now := netTime.Now() marshaled, err := proto.Marshal(round.Raw) @@ -21,7 +20,7 @@ func StoreRound(kv *versioned.KV, round historical.Round, key string) error { } obj := versioned.Object{ - Version: currentCyclicVersion, + Version: currentRoundVersion, Timestamp: now, Data: marshaled, } @@ -30,21 +29,21 @@ func StoreRound(kv *versioned.KV, round historical.Round, key string) error { } // LoadRound stores the round using the key. -func LoadRound(kv *versioned.KV, key string) (historical.Round, error) { +func LoadRound(kv *versioned.KV, key string) (Round, error) { vo, err := kv.Get(key, currentRoundVersion) if err != nil { - return historical.Round{}, err + return Round{}, err } ri := &pb.RoundInfo{} err = proto.Unmarshal(vo.Data, ri) if err != nil { - return historical.Round{}, err + return Round{}, err } - return historical.MakeRound(ri), nil + return MakeRound(ri), nil } func DeleteRound(kv *versioned.KV, key string) error { - return kv.Delete(key, currentCyclicVersion) + return kv.Delete(key, currentRoundVersion) } diff --git a/e2e/processor.go b/e2e/processor.go index 740585989..e57359960 100644 --- a/e2e/processor.go +++ b/e2e/processor.go @@ -2,7 +2,7 @@ package e2e import ( jww "github.com/spf13/jwalterweatherman" - "gitlab.com/elixxir/client/cmix/historical" + "gitlab.com/elixxir/client/cmix/rounds" "gitlab.com/elixxir/client/cmix/identity/receptionID" "gitlab.com/elixxir/client/e2e/ratchet/partner/session" "gitlab.com/elixxir/primitives/format" @@ -15,7 +15,7 @@ type processor struct { func (p *processor) Process(ecrMsg format.Message, receptionID receptionID.EphemeralIdentity, - round historical.Round) { + round rounds.Round) { // ensure the key will be marked used before returning defer p.cy.Use() diff --git a/e2e/receive/message.go b/e2e/receive/message.go index 560f2224a..673b1e6ae 100644 --- a/e2e/receive/message.go +++ b/e2e/receive/message.go @@ -2,7 +2,7 @@ package receive import ( "gitlab.com/elixxir/client/catalog" - "gitlab.com/elixxir/client/cmix/historical" + "gitlab.com/elixxir/client/cmix/rounds" "gitlab.com/elixxir/crypto/e2e" "gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/id/ephemeral" @@ -21,5 +21,5 @@ type Message struct { Encrypted bool - Round historical.Round + Round rounds.Round } diff --git a/e2e/rekey/utils_test.go b/e2e/rekey/utils_test.go index 173c0f14f..706d4b7bc 100644 --- a/e2e/rekey/utils_test.go +++ b/e2e/rekey/utils_test.go @@ -17,7 +17,7 @@ import ( "gitlab.com/elixxir/client/catalog" "gitlab.com/elixxir/client/cmix" "gitlab.com/elixxir/client/cmix/gateway" - "gitlab.com/elixxir/client/cmix/historical" + "gitlab.com/elixxir/client/cmix/rounds" "gitlab.com/elixxir/client/cmix/identity" "gitlab.com/elixxir/client/cmix/message" session2 "gitlab.com/elixxir/client/e2e/ratchet/partner/session" @@ -305,7 +305,7 @@ func (m *mockNetManager) GetRoundResults(timeout time.Duration, roundCallback cm } func (m *mockNetManager) LookupHistoricalRound( - rid id.Round, callback historical.RoundResultCallback) error { + rid id.Round, callback rounds.RoundResultCallback) error { return nil } diff --git a/e2e/unsafeProcessor.go b/e2e/unsafeProcessor.go index d49842fca..5a9e84390 100644 --- a/e2e/unsafeProcessor.go +++ b/e2e/unsafeProcessor.go @@ -2,7 +2,7 @@ package e2e import ( jww "github.com/spf13/jwalterweatherman" - "gitlab.com/elixxir/client/cmix/historical" + "gitlab.com/elixxir/client/cmix/rounds" "gitlab.com/elixxir/client/cmix/identity/receptionID" "gitlab.com/elixxir/crypto/e2e" "gitlab.com/elixxir/primitives/format" @@ -15,7 +15,7 @@ type UnsafeProcessor struct { func (up *UnsafeProcessor) Process(ecrMsg format.Message, receptionID receptionID.EphemeralIdentity, - round historical.Round) { + round rounds.Round) { //check if the message is unencrypted unencrypted, sender := e2e.IsUnencrypted(ecrMsg) if !unencrypted { diff --git a/groupChat/receive.go b/groupChat/receive.go index 87f8b1d35..8555fafa9 100644 --- a/groupChat/receive.go +++ b/groupChat/receive.go @@ -10,7 +10,7 @@ package groupChat import ( "github.com/pkg/errors" jww "github.com/spf13/jwalterweatherman" - "gitlab.com/elixxir/client/cmix/historical" + "gitlab.com/elixxir/client/cmix/rounds" "gitlab.com/elixxir/client/cmix/identity/receptionID" gs "gitlab.com/elixxir/client/groupChat/groupStore" "gitlab.com/elixxir/crypto/group" @@ -37,7 +37,7 @@ type receptionProcessor struct { } // Process incoming group chat messages -func (p *receptionProcessor) Process(message format.Message, receptionID receptionID.EphemeralIdentity, round historical.Round) { +func (p *receptionProcessor) Process(message format.Message, receptionID receptionID.EphemeralIdentity, round rounds.Round) { jww.TRACE.Print("Group message reception received cMix message.") // Attempt to read the message diff --git a/groupChat/send_test.go b/groupChat/send_test.go index 55a956d63..ec257fd4e 100644 --- a/groupChat/send_test.go +++ b/groupChat/send_test.go @@ -10,7 +10,7 @@ package groupChat import ( "bytes" "encoding/base64" - "gitlab.com/elixxir/client/cmix/historical" + "gitlab.com/elixxir/client/cmix/rounds" "gitlab.com/elixxir/client/cmix/identity/receptionID" gs "gitlab.com/elixxir/client/groupChat/groupStore" "gitlab.com/elixxir/crypto/group" @@ -51,7 +51,7 @@ func TestManager_Send(t *testing.T) { } for _, msg := range messages { - reception.Process(msg, receptionID.EphemeralIdentity{}, historical.Round{ID: roundId}) + reception.Process(msg, receptionID.EphemeralIdentity{}, rounds.Round{ID: roundId}) select { case result := <-receiveChan: if !result.SenderID.Cmp(m.receptionId) { diff --git a/groupChat/utils_test.go b/groupChat/utils_test.go index 88dc69e1b..efa002ebe 100644 --- a/groupChat/utils_test.go +++ b/groupChat/utils_test.go @@ -14,7 +14,7 @@ import ( "gitlab.com/elixxir/client/catalog" "gitlab.com/elixxir/client/cmix" "gitlab.com/elixxir/client/cmix/gateway" - "gitlab.com/elixxir/client/cmix/historical" + "gitlab.com/elixxir/client/cmix/rounds" "gitlab.com/elixxir/client/cmix/identity" "gitlab.com/elixxir/client/cmix/message" clientE2E "gitlab.com/elixxir/client/e2e" @@ -397,7 +397,7 @@ func (*testNetworkManager) GetRoundResults(timeout time.Duration, roundCallback panic("implement me") } -func (*testNetworkManager) LookupHistoricalRound(rid id.Round, callback historical.RoundResultCallback) error { +func (*testNetworkManager) LookupHistoricalRound(rid id.Round, callback rounds.RoundResultCallback) error { panic("implement me") } -- GitLab