diff --git a/auth/receivedConfirm.go b/auth/receivedConfirm.go index 70da22a76cee612449d5c9aad3224468725344bd..1990fd5bdca5eeb826573b6c06c9151ce96ce25c 100644 --- a/auth/receivedConfirm.go +++ b/auth/receivedConfirm.go @@ -5,7 +5,7 @@ import ( "fmt" jww "github.com/spf13/jwalterweatherman" "gitlab.com/elixxir/client/auth/store" - "gitlab.com/elixxir/client/cmix/historical" + "gitlab.com/elixxir/client/cmix/rounds" "gitlab.com/elixxir/client/cmix/identity/receptionID" "gitlab.com/elixxir/client/cmix/message" "gitlab.com/elixxir/client/e2e/ratchet/partner/session" @@ -22,7 +22,7 @@ type receivedConfirmService struct { } func (rcs *receivedConfirmService) Process(msg format.Message, - receptionID receptionID.EphemeralIdentity, round historical.Round) { + receptionID receptionID.EphemeralIdentity, round rounds.Round) { state := rcs.s diff --git a/auth/receivedRequest.go b/auth/receivedRequest.go index 586b048aa8f35d0ace9ff9fcb32ecb00fdcf2cfa..eaea3ddd66ec6f81271cb89099224ed5a86d873c 100644 --- a/auth/receivedRequest.go +++ b/auth/receivedRequest.go @@ -7,7 +7,7 @@ import ( "github.com/pkg/errors" jww "github.com/spf13/jwalterweatherman" "gitlab.com/elixxir/client/auth/store" - "gitlab.com/elixxir/client/cmix/historical" + "gitlab.com/elixxir/client/cmix/rounds" "gitlab.com/elixxir/client/cmix/identity/receptionID" "gitlab.com/elixxir/client/e2e/ratchet" "gitlab.com/elixxir/crypto/contact" @@ -26,7 +26,7 @@ type receivedRequestService struct { } func (rrs *receivedRequestService) Process(message format.Message, - receptionID receptionID.EphemeralIdentity, round historical.Round) { + receptionID receptionID.EphemeralIdentity, round rounds.Round) { state := rrs.s diff --git a/auth/state.go b/auth/state.go index f75b7fbd508e94febe7791796fc5948489541adf..9c78cc9554924f8a2b212978c8c33d3aa18f0c95 100644 --- a/auth/state.go +++ b/auth/state.go @@ -12,9 +12,9 @@ import ( "github.com/pkg/errors" "gitlab.com/elixxir/client/auth/store" "gitlab.com/elixxir/client/cmix" - "gitlab.com/elixxir/client/cmix/historical" "gitlab.com/elixxir/client/cmix/identity/receptionID" "gitlab.com/elixxir/client/cmix/message" + "gitlab.com/elixxir/client/cmix/rounds" "gitlab.com/elixxir/client/e2e" "gitlab.com/elixxir/client/event" "gitlab.com/elixxir/client/storage/versioned" @@ -38,11 +38,11 @@ type state struct { type Callbacks interface { Request(requestor contact.Contact, receptionID receptionID.EphemeralIdentity, - round historical.Round) + round rounds.Round) Confirm(requestor contact.Contact, receptionID receptionID.EphemeralIdentity, - round historical.Round) + round rounds.Round) Reset(requestor contact.Contact, receptionID receptionID.EphemeralIdentity, - round historical.Round) + round rounds.Round) } // NewState loads the auth state or creates new auth state if one cannot be diff --git a/auth/store/receivedRequest.go b/auth/store/receivedRequest.go index 957af798edfe52c18cf52ba465520194a5a4224f..fe5fa1f7c60c85fdefebd56e9d875e448679a00e 100644 --- a/auth/store/receivedRequest.go +++ b/auth/store/receivedRequest.go @@ -5,7 +5,7 @@ import ( "github.com/cloudflare/circl/dh/sidh" "github.com/pkg/errors" jww "github.com/spf13/jwalterweatherman" - "gitlab.com/elixxir/client/cmix/historical" + "gitlab.com/elixxir/client/cmix/rounds" util "gitlab.com/elixxir/client/storage/utility" "gitlab.com/elixxir/client/storage/versioned" "gitlab.com/elixxir/crypto/contact" @@ -23,14 +23,14 @@ type ReceivedRequest struct { theirSidHPubKeyA *sidh.PublicKey //round received on - round historical.Round + round rounds.Round //lock to make sure only one operator at a time mux *sync.Mutex } func newReceivedRequest(kv *versioned.KV, c contact.Contact, - key *sidh.PublicKey, round historical.Round) *ReceivedRequest { + key *sidh.PublicKey, round rounds.Round) *ReceivedRequest { if err := util.StoreContact(kv, c); err != nil { jww.FATAL.Panicf("Failed to save contact for partner %s", c.ID.String()) @@ -43,7 +43,7 @@ func newReceivedRequest(kv *versioned.KV, c contact.Contact, } roundStoreKey := makeRoundKey(c.ID) - if err := util.StoreRound(kv, round, roundStoreKey); err != nil { + if err := rounds.StoreRound(kv, round, roundStoreKey); err != nil { jww.FATAL.Panicf("Failed to save round request was received on "+ "for partner %s", c.ID.String()) } @@ -75,7 +75,7 @@ func loadReceivedRequest(kv *versioned.KV, partner *id.ID) ( partner) } - round, err := util.LoadRound(kv, makeRoundKey(partner)) + round, err := rounds.LoadRound(kv, makeRoundKey(partner)) if err != nil { return nil, errors.WithMessagef(err, "Failed to Load "+ "round request was received on with %s", @@ -98,7 +98,7 @@ func (rr *ReceivedRequest) GetTheirSidHPubKeyA() *sidh.PublicKey { return rr.theirSidHPubKeyA } -func (rr *ReceivedRequest) GetRound() historical.Round { +func (rr *ReceivedRequest) GetRound() rounds.Round { return rr.round } diff --git a/auth/store/store.go b/auth/store/store.go index 6a08e6b931871754209c98dda25173cd83a92e92..a81000f51106e7c86f49ecfc0c3b391bc022c196 100644 --- a/auth/store/store.go +++ b/auth/store/store.go @@ -12,7 +12,7 @@ import ( "github.com/cloudflare/circl/dh/sidh" "github.com/pkg/errors" jww "github.com/spf13/jwalterweatherman" - "gitlab.com/elixxir/client/cmix/historical" + "gitlab.com/elixxir/client/cmix/rounds" "gitlab.com/elixxir/client/storage/versioned" "gitlab.com/elixxir/crypto/contact" "gitlab.com/elixxir/crypto/cyclic" @@ -210,7 +210,7 @@ func (s *Store) AddSent(partner *id.ID, partnerHistoricalPubKey, myPrivKey, } func (s *Store) AddReceived(c contact.Contact, key *sidh.PublicKey, - round historical.Round) error { + round rounds.Round) error { s.mux.Lock() defer s.mux.Unlock() jww.DEBUG.Printf("AddReceived new contact: %s", c.ID) diff --git a/cmix/client.go b/cmix/client.go index a161824c714cd4320d624f97b5b5ff970fd61918..2beb6873d434e4572f577aa0ebca00825d4ed697 100644 --- a/cmix/client.go +++ b/cmix/client.go @@ -15,11 +15,11 @@ import ( "gitlab.com/elixxir/client/cmix/address" "gitlab.com/elixxir/client/cmix/gateway" "gitlab.com/elixxir/client/cmix/health" - "gitlab.com/elixxir/client/cmix/historical" + "gitlab.com/elixxir/client/cmix/rounds" "gitlab.com/elixxir/client/cmix/identity" "gitlab.com/elixxir/client/cmix/message" "gitlab.com/elixxir/client/cmix/nodes" - "gitlab.com/elixxir/client/cmix/rounds" + "gitlab.com/elixxir/client/cmix/pickup" "gitlab.com/elixxir/client/event" "gitlab.com/elixxir/client/stoppable" "gitlab.com/elixxir/client/storage" @@ -64,7 +64,7 @@ type client struct { gateway.Sender message.Handler nodes.Registrar - historical.Retriever + rounds.Retriever rounds.Pickup address.Space identity.Tracker @@ -150,7 +150,7 @@ func NewManager(params Params, comms *commClient.Comms, session storage.Session, } // Set up the historical rounds handler - c.Retriever = historical.NewRetriever( + c.Retriever = rounds.NewRetriever( params.Historical, comms, c.Sender, events) // Set up Message Handler diff --git a/cmix/historical/params.go b/cmix/historical/params.go deleted file mode 100644 index ae97e1921f28269ac05cf74fb31772e6b4e61959..0000000000000000000000000000000000000000 --- a/cmix/historical/params.go +++ /dev/null @@ -1,30 +0,0 @@ -package historical - -import "time" - -type Params struct { - // MaxHistoricalRounds is the number of historical rounds required to - // automatically send a historical rounds query. - MaxHistoricalRounds uint - - // HistoricalRoundsPeriod is the maximum period of time a pending historical - // round query will wait before it is transmitted. - HistoricalRoundsPeriod time.Duration - - // HistoricalRoundsBufferLen is the length of historical rounds channel - // buffer. - HistoricalRoundsBufferLen uint - - // MaxHistoricalRoundsRetries is the maximum number of times a historical - // round lookup will be attempted. - MaxHistoricalRoundsRetries uint -} - -func GetDefaultParams() Params { - return Params{ - MaxHistoricalRounds: 100, - HistoricalRoundsPeriod: 100 * time.Millisecond, - HistoricalRoundsBufferLen: 1000, - MaxHistoricalRoundsRetries: 3, - } -} diff --git a/cmix/identity/receptionID/identity.go b/cmix/identity/receptionID/identity.go index e0e75425ca6a0db288de908b9c6a8f8736eb40a1..3c7526be3d0677b57694228f77bc98c66590293e 100644 --- a/cmix/identity/receptionID/identity.go +++ b/cmix/identity/receptionID/identity.go @@ -3,7 +3,7 @@ package receptionID import ( "encoding/json" "github.com/pkg/errors" - "gitlab.com/elixxir/client/cmix/historical" + "gitlab.com/elixxir/client/cmix/rounds" "gitlab.com/elixxir/client/storage/versioned" "gitlab.com/elixxir/primitives/states" "gitlab.com/xx_network/primitives/id" @@ -120,7 +120,7 @@ func (i Identity) Equal(b Identity) bool { // BuildIdentityFromRound returns an EphemeralIdentity that the source would // use to receive messages from the given round func BuildIdentityFromRound(source *id.ID, - round historical.Round) EphemeralIdentity { + round rounds.Round) EphemeralIdentity { ephID, _, _, _ := ephemeral.GetId(source, uint(round.AddressSpaceSize), round.Timestamps[states.QUEUED].UnixNano()) return EphemeralIdentity{ diff --git a/cmix/interface.go b/cmix/interface.go index d008d4398b37b7851f76a37a19f54feae9e6ef3d..89320029719c5f9be6465c26254c0d44869a5f8a 100644 --- a/cmix/interface.go +++ b/cmix/interface.go @@ -2,7 +2,7 @@ package cmix import ( "gitlab.com/elixxir/client/cmix/gateway" - "gitlab.com/elixxir/client/cmix/historical" + "gitlab.com/elixxir/client/cmix/rounds" "gitlab.com/elixxir/client/cmix/identity" "gitlab.com/elixxir/client/cmix/message" "gitlab.com/elixxir/client/cmix/nodes" @@ -234,7 +234,7 @@ type Client interface { // GetRoundResults does this lookup when needed, generally that is // preferable LookupHistoricalRound( - rid id.Round, callback historical.RoundResultCallback) error + rid id.Round, callback rounds.RoundResultCallback) error /* === Sender =========================================================== */ /* The sender handles sending comms to the network. It tracks connections to diff --git a/cmix/message/bundle.go b/cmix/message/bundle.go index 2dd4a582d409495714b1531348fe6d62ab65546c..887148285243b4eaa96c1d4b3ef2cdd74a137c0f 100644 --- a/cmix/message/bundle.go +++ b/cmix/message/bundle.go @@ -8,7 +8,7 @@ package message import ( - "gitlab.com/elixxir/client/cmix/historical" + "gitlab.com/elixxir/client/cmix/rounds" "gitlab.com/elixxir/client/cmix/identity/receptionID" "gitlab.com/elixxir/primitives/format" "gitlab.com/xx_network/primitives/id" @@ -16,7 +16,7 @@ import ( type Bundle struct { Round id.Round - RoundInfo historical.Round + RoundInfo rounds.Round Messages []format.Message Finish func() Identity receptionID.EphemeralIdentity diff --git a/cmix/message/fingerprints_test.go b/cmix/message/fingerprints_test.go index 46407a1b0d94cff0177ba992696e4097d4eee71e..6740f3a0878b3fbafad189807d50891fff4a6973 100644 --- a/cmix/message/fingerprints_test.go +++ b/cmix/message/fingerprints_test.go @@ -9,7 +9,7 @@ package message import ( jww "github.com/spf13/jwalterweatherman" - "gitlab.com/elixxir/client/cmix/historical" + "gitlab.com/elixxir/client/cmix/rounds" "gitlab.com/elixxir/client/cmix/identity/receptionID" "gitlab.com/elixxir/primitives/format" "gitlab.com/xx_network/primitives/id" @@ -174,6 +174,6 @@ func (mock *MockMsgProcessor) MarkFingerprintUsed(_ format.Fingerprint) { } func (mock *MockMsgProcessor) Process( - format.Message, receptionID.EphemeralIdentity, historical.Round) { + format.Message, receptionID.EphemeralIdentity, rounds.Round) { return } diff --git a/cmix/message/inProgress.go b/cmix/message/inProgress.go index c21ef6f64493e1978df6c0cc8761c2afc072df60..147b6401b7e2a292735cbfc0e90bc58f9248a178 100644 --- a/cmix/message/inProgress.go +++ b/cmix/message/inProgress.go @@ -9,7 +9,7 @@ package message import ( jww "github.com/spf13/jwalterweatherman" - "gitlab.com/elixxir/client/cmix/historical" + "gitlab.com/elixxir/client/cmix/rounds" "gitlab.com/elixxir/client/stoppable" "gitlab.com/elixxir/primitives/format" "gitlab.com/xx_network/primitives/id" @@ -56,7 +56,7 @@ func (p *handler) recheckInProgress() { for grbldMsg, ri, identity, has := p.inProcess.Next(); has; grbldMsg, ri, identity, has = p.inProcess.Next() { bundle := Bundle{ Round: id.Round(ri.ID), - RoundInfo: historical.MakeRound(ri), + RoundInfo: rounds.MakeRound(ri), Messages: []format.Message{grbldMsg}, Finish: func() {}, Identity: identity, diff --git a/cmix/message/processor.go b/cmix/message/processor.go index 8df34ae5ae90160e9b8ce1b78a96fb9abc4da2b6..c4107476d067af8cbd262c37865f3daffa8744d5 100644 --- a/cmix/message/processor.go +++ b/cmix/message/processor.go @@ -1,7 +1,7 @@ package message import ( - "gitlab.com/elixxir/client/cmix/historical" + "gitlab.com/elixxir/client/cmix/rounds" "gitlab.com/elixxir/client/cmix/identity/receptionID" "gitlab.com/elixxir/primitives/format" ) @@ -15,5 +15,5 @@ type Processor interface { // It is a security vulnerability to reuse a fingerprint. It leaks privacy // and can lead to compromise of message contents and integrity. Process(message format.Message, receptionID receptionID.EphemeralIdentity, - round historical.Round) + round rounds.Round) } diff --git a/cmix/params.go b/cmix/params.go index 4d61bca66e2895b3827e3327ccf543ce087d72e5..e334217c5b7dd3e7588ea83c44223275dfd5ef0d 100644 --- a/cmix/params.go +++ b/cmix/params.go @@ -3,9 +3,9 @@ package cmix import ( "encoding/base64" "encoding/json" - "gitlab.com/elixxir/client/cmix/historical" - "gitlab.com/elixxir/client/cmix/message" "gitlab.com/elixxir/client/cmix/rounds" + "gitlab.com/elixxir/client/cmix/message" + "gitlab.com/elixxir/client/cmix/pickup" "gitlab.com/elixxir/client/stoppable" "gitlab.com/elixxir/primitives/excludedRounds" "gitlab.com/xx_network/primitives/id" @@ -52,7 +52,7 @@ type Params struct { Rounds rounds.Params Message message.Params - Historical historical.Params + Historical rounds.Params } func GetDefaultParams() Params { @@ -70,7 +70,7 @@ func GetDefaultParams() Params { } n.Rounds = rounds.GetDefaultParams() n.Message = message.GetDefaultParams() - n.Historical = historical.GetDefaultParams() + n.Historical = rounds.GetDefaultParams() return n } diff --git a/cmix/rounds/get.go b/cmix/pickup/get.go similarity index 92% rename from cmix/rounds/get.go rename to cmix/pickup/get.go index 774d64fc0b4092b1d4af3a921716fed92b5eba92..9ecee34cb33a1c0ad491cb09beef3be1c4b8ae44 100644 --- a/cmix/rounds/get.go +++ b/cmix/pickup/get.go @@ -9,12 +9,12 @@ package rounds import ( jww "github.com/spf13/jwalterweatherman" - "gitlab.com/elixxir/client/cmix/historical" + "gitlab.com/elixxir/client/cmix/rounds" "gitlab.com/elixxir/client/cmix/identity/receptionID" "gitlab.com/xx_network/primitives/id" ) -func (m *manager) GetMessagesFromRound( +func (m *pickup) GetMessagesFromRound( roundID id.Round, identity receptionID.EphemeralIdentity) { // Get the round from the in-RAM store ri, err := m.instance.GetRound(roundID) @@ -41,7 +41,7 @@ func (m *manager) GetMessagesFromRound( identity.Source) err = m.historical.LookupHistoricalRound( - roundID, func(round historical.Round, success bool) { + roundID, func(round rounds.Round, success bool) { if !success { // TODO: Implement me } @@ -70,7 +70,7 @@ func (m *manager) GetMessagesFromRound( // If found, send to Message Retrieval Workers m.lookupRoundMessages <- roundLookup{ - Round: historical.MakeRound(ri), + Round: rounds.MakeRound(ri), Identity: identity, } } diff --git a/cmix/pickup/params.go b/cmix/pickup/params.go new file mode 100644 index 0000000000000000000000000000000000000000..ab7f956c91fdf7933a20b417e55cb20c82bca456 --- /dev/null +++ b/cmix/pickup/params.go @@ -0,0 +1,44 @@ +package rounds + +import "time" + +type Params struct { + // Number of worker threads for retrieving messages from gateways + NumMessageRetrievalWorkers uint + + // Length of round lookup channel buffer + LookupRoundsBufferLen uint + + // Maximum number of times a historical round lookup will be attempted + MaxHistoricalRoundsRetries uint + + // Interval between checking for rounds in UncheckedRoundStore due for a + // message retrieval retry + UncheckRoundPeriod time.Duration + + // Toggles if message pickup retrying mechanism if forced + // by intentionally not looking up messages + ForceMessagePickupRetry bool + + // Duration to wait before sending on a round times out and a new round is + // tried + SendTimeout time.Duration + + // Disables all attempts to pick up dropped or missed messages + RealtimeOnly bool + + // Toggles if historical rounds should always be used + ForceHistoricalRounds bool +} + +func GetDefaultParams() Params { + return Params{ + NumMessageRetrievalWorkers: 8, + LookupRoundsBufferLen: 2000, + MaxHistoricalRoundsRetries: 3, + UncheckRoundPeriod: 20 * time.Second, + ForceMessagePickupRetry: false, + SendTimeout: 3 * time.Second, + RealtimeOnly: false, + } +} diff --git a/cmix/rounds/manager.go b/cmix/pickup/pickup.go similarity index 89% rename from cmix/rounds/manager.go rename to cmix/pickup/pickup.go index 46e53d5e7c93ef71f15b44517d094aa673b73817..f7653e44d29413084b4d48f598ce8661507cc38a 100644 --- a/cmix/rounds/manager.go +++ b/cmix/pickup/pickup.go @@ -9,10 +9,10 @@ package rounds import ( "gitlab.com/elixxir/client/cmix/gateway" - "gitlab.com/elixxir/client/cmix/historical" + "gitlab.com/elixxir/client/cmix/rounds" "gitlab.com/elixxir/client/cmix/identity/receptionID" "gitlab.com/elixxir/client/cmix/message" - "gitlab.com/elixxir/client/cmix/rounds/store" + "gitlab.com/elixxir/client/cmix/pickup/store" "gitlab.com/elixxir/client/stoppable" "gitlab.com/elixxir/client/storage" "gitlab.com/elixxir/crypto/fastRNG" @@ -25,14 +25,14 @@ type Pickup interface { GetMessagesFromRound(roundID id.Round, identity receptionID.EphemeralIdentity) } -type manager struct { +type pickup struct { params Params sender gateway.Sender session storage.Session comms MessageRetrievalComms - historical historical.Retriever + historical rounds.Retriever rng *fastRNG.StreamGenerator @@ -45,11 +45,11 @@ type manager struct { } func NewPickup(params Params, bundles chan<- message.Bundle, - sender gateway.Sender, historical historical.Retriever, + sender gateway.Sender, historical rounds.Retriever, rng *fastRNG.StreamGenerator, instance RoundGetter, session storage.Session) Pickup { unchecked := store.NewOrLoadUncheckedStore(session.GetKV()) - m := &manager{ + m := &pickup{ params: params, lookupRoundMessages: make(chan roundLookup, params.LookupRoundsBufferLen), messageBundles: bundles, @@ -64,7 +64,7 @@ func NewPickup(params Params, bundles chan<- message.Bundle, return m } -func (m *manager) StartProcessors() stoppable.Stoppable { +func (m *pickup) StartProcessors() stoppable.Stoppable { multi := stoppable.NewMulti("Rounds") diff --git a/cmix/rounds/retrieve.go b/cmix/pickup/retrieve.go similarity index 96% rename from cmix/rounds/retrieve.go rename to cmix/pickup/retrieve.go index b3c8e1823f366c2763912e5e62d50a137480fe2f..d6d5889c676c29ff1a9491762014b4ee51c7458b 100644 --- a/cmix/rounds/retrieve.go +++ b/cmix/pickup/retrieve.go @@ -12,7 +12,7 @@ import ( "github.com/pkg/errors" jww "github.com/spf13/jwalterweatherman" "gitlab.com/elixxir/client/cmix/gateway" - "gitlab.com/elixxir/client/cmix/historical" + "gitlab.com/elixxir/client/cmix/rounds" "gitlab.com/elixxir/client/cmix/identity/receptionID" "gitlab.com/elixxir/client/cmix/message" "gitlab.com/elixxir/client/stoppable" @@ -32,7 +32,7 @@ type MessageRetrievalComms interface { } type roundLookup struct { - Round historical.Round + Round rounds.Round Identity receptionID.EphemeralIdentity } @@ -40,7 +40,7 @@ const noRoundError = "does not have round %d" // processMessageRetrieval received a roundLookup request and pings the gateways // of that round for messages for the requested Identity in the roundLookup. -func (m *manager) processMessageRetrieval(comms MessageRetrievalComms, +func (m *pickup) processMessageRetrieval(comms MessageRetrievalComms, stop *stoppable.Single) { for { @@ -158,7 +158,7 @@ func (m *manager) processMessageRetrieval(comms MessageRetrievalComms, // getMessagesFromGateway attempts to get messages from their assigned gateway // host in the round specified. If successful -func (m *manager) getMessagesFromGateway(roundID id.Round, +func (m *pickup) getMessagesFromGateway(roundID id.Round, identity receptionID.EphemeralIdentity, comms MessageRetrievalComms, gwIds []*id.ID, stop *stoppable.Single) (message.Bundle, error) { start := netTime.Now() @@ -247,7 +247,7 @@ func (m *manager) getMessagesFromGateway(roundID id.Round, // Helper function which forces processUncheckedRounds by randomly not looking // up messages. -func (m *manager) forceMessagePickupRetry(ri historical.Round, rl roundLookup, +func (m *pickup) forceMessagePickupRetry(ri rounds.Round, rl roundLookup, comms MessageRetrievalComms, gwIds []*id.ID, stop *stoppable.Single) (bundle message.Bundle, err error) { rnd, _ := m.unchecked.GetRound( diff --git a/cmix/rounds/retrieve_test.go b/cmix/pickup/retrieve_test.go similarity index 98% rename from cmix/rounds/retrieve_test.go rename to cmix/pickup/retrieve_test.go index 2a1c940c94d8977a918a3b348390603e8386f7e7..baacbe18a1cb6c364513064247bc655fa256970c 100644 --- a/cmix/rounds/retrieve_test.go +++ b/cmix/pickup/retrieve_test.go @@ -9,7 +9,7 @@ package rounds import ( "bytes" "gitlab.com/elixxir/client/cmix/gateway" - "gitlab.com/elixxir/client/cmix/historical" + "gitlab.com/elixxir/client/cmix/rounds" ephemeral2 "gitlab.com/elixxir/client/cmix/identity/receptionID" "gitlab.com/elixxir/client/cmix/message" "gitlab.com/elixxir/client/stoppable" @@ -71,7 +71,7 @@ func Test_manager_processMessageRetrieval(t *testing.T) { Source: requestGateway, } - round := historical.Round{ + round := rounds.Round{ ID: roundId, Topology: connect.NewCircuit([]*id.ID{requestGateway}), } @@ -159,7 +159,7 @@ func Test_manager_processMessageRetrieval_NoRound(t *testing.T) { Source: dummyGateway, } - round := historical.Round{ + round := rounds.Round{ ID: roundId, Topology: connect.NewCircuit([]*id.ID{dummyGateway}), } @@ -236,7 +236,7 @@ func Test_manager_processMessageRetrieval_FalsePositive(t *testing.T) { requestGateway := id.NewIdFromString(FalsePositive, id.Gateway, t) - round := historical.Round{ + round := rounds.Round{ ID: roundId, Topology: connect.NewCircuit([]*id.ID{requestGateway}), } @@ -309,7 +309,7 @@ func Test_manager_processMessageRetrieval_Quit(t *testing.T) { requestGateway := id.NewIdFromString(ReturningGateway, id.Gateway, t) - round := historical.Round{ + round := rounds.Round{ ID: roundId, Topology: connect.NewCircuit([]*id.ID{requestGateway}), } @@ -379,7 +379,7 @@ func Test_manager_processMessageRetrieval_MultipleGateways(t *testing.T) { Source: requestGateway, } - round := historical.Round{ + round := rounds.Round{ ID: roundId, // Create a list of IDs in which some error gateways must be // contacted before the happy path diff --git a/cmix/rounds/roundGetter.go b/cmix/pickup/roundGetter.go similarity index 100% rename from cmix/rounds/roundGetter.go rename to cmix/pickup/roundGetter.go diff --git a/cmix/rounds/store/roundIdentity.go b/cmix/pickup/store/roundIdentity.go similarity index 100% rename from cmix/rounds/store/roundIdentity.go rename to cmix/pickup/store/roundIdentity.go diff --git a/cmix/rounds/store/store.go b/cmix/pickup/store/store.go similarity index 100% rename from cmix/rounds/store/store.go rename to cmix/pickup/store/store.go diff --git a/cmix/rounds/store/uncheckedRounds.go b/cmix/pickup/store/uncheckedRounds.go similarity index 98% rename from cmix/rounds/store/uncheckedRounds.go rename to cmix/pickup/store/uncheckedRounds.go index 326fff5c2e9432240de17647cfab452ee9696307..b15cc5b803cc9d3139746833c60b6baf14e5ac35 100644 --- a/cmix/rounds/store/uncheckedRounds.go +++ b/cmix/pickup/store/uncheckedRounds.go @@ -33,7 +33,7 @@ const ( uint64Size = 8 // Maximum checks that can be performed on a round. Intended so that a round - // is checked no more than 1 week approximately (network/rounds.cappedTries + 7) + // is checked no more than 1 week approximately (network/pickup.cappedTries + 7) maxChecks = 14 ) diff --git a/cmix/rounds/store/uncheckedRounds_test.go b/cmix/pickup/store/uncheckedRounds_test.go similarity index 100% rename from cmix/rounds/store/uncheckedRounds_test.go rename to cmix/pickup/store/uncheckedRounds_test.go diff --git a/cmix/rounds/unchecked.go b/cmix/pickup/unchecked.go similarity index 96% rename from cmix/rounds/unchecked.go rename to cmix/pickup/unchecked.go index 15859e57fb631f23006c640332a482c5719377c2..a09909074a62c0ac6b0f5d66a99feca16086f2bf 100644 --- a/cmix/rounds/unchecked.go +++ b/cmix/pickup/unchecked.go @@ -10,7 +10,7 @@ package rounds import ( jww "github.com/spf13/jwalterweatherman" "gitlab.com/elixxir/client/cmix/identity/receptionID" - "gitlab.com/elixxir/client/cmix/rounds/store" + "gitlab.com/elixxir/client/cmix/pickup/store" "gitlab.com/elixxir/client/stoppable" "gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/netTime" @@ -41,7 +41,7 @@ var backOffTable = [cappedTries]time.Duration{ // processMessageRetrieval. // TODO: Make this system know which rounds are still in progress instead of // just assume by time -func (m *manager) processUncheckedRounds(checkInterval time.Duration, +func (m *pickup) processUncheckedRounds(checkInterval time.Duration, backoffTable [cappedTries]time.Duration, stop *stoppable.Single) { ticker := time.NewTicker(checkInterval) uncheckedRoundStore := m.unchecked diff --git a/cmix/rounds/unchecked_test.go b/cmix/pickup/unchecked_test.go similarity index 100% rename from cmix/rounds/unchecked_test.go rename to cmix/pickup/unchecked_test.go diff --git a/cmix/rounds/utils_test.go b/cmix/pickup/utils_test.go similarity index 98% rename from cmix/rounds/utils_test.go rename to cmix/pickup/utils_test.go index f743067287a0c00750edc4e4f775f8d94062053c..c2c11db7119d7e8e14d10bcbecead29f5a648fd9 100644 --- a/cmix/rounds/utils_test.go +++ b/cmix/pickup/utils_test.go @@ -10,7 +10,7 @@ import ( "github.com/pkg/errors" jww "github.com/spf13/jwalterweatherman" "gitlab.com/elixxir/client/cmix/message" - "gitlab.com/elixxir/client/cmix/rounds/store" + "gitlab.com/elixxir/client/cmix/pickup/store" "gitlab.com/elixxir/client/storage" pb "gitlab.com/elixxir/comms/mixmessages" "gitlab.com/elixxir/comms/testkeys" @@ -24,7 +24,7 @@ import ( "time" ) -func newManager(t *testing.T) *manager { +func newManager(t *testing.T) *pickup { session := storage.InitTestingSession(t) unchecked, err := store.NewUncheckedStore(session.GetKV()) @@ -41,7 +41,7 @@ func newManager(t *testing.T) *manager { }, } - testManager := &manager{ + testManager := &pickup{ params: GetDefaultParams(), session: session, rng: fastRNG.NewStreamGenerator(1, 1, csprng.NewSystemRNG), diff --git a/cmix/results.go b/cmix/results.go index 45f09279de37b13727520a77654c3ce4d84222e1..d5bdc7ad42541ac14895e8b685f0b753b81af454 100644 --- a/cmix/results.go +++ b/cmix/results.go @@ -9,7 +9,7 @@ package cmix import ( "fmt" - "gitlab.com/elixxir/client/cmix/historical" + "gitlab.com/elixxir/client/cmix/rounds" "time" jww "github.com/spf13/jwalterweatherman" @@ -42,12 +42,12 @@ func (rr RoundLookupStatus) String() string { type RoundResult struct { Status RoundLookupStatus - Round historical.Round + Round rounds.Round } type historicalRoundsRtn struct { Success bool - Round historical.Round + Round rounds.Round } // RoundEventCallback interface which reports the requested rounds. @@ -106,12 +106,12 @@ func (c *client) getRoundResults(roundList []id.Round, timeout time.Duration, if states.Round(roundInfo.State) == states.COMPLETED { roundsResults[rnd] = RoundResult{ Status: Succeeded, - Round: historical.MakeRound(roundInfo), + Round: rounds.MakeRound(roundInfo), } } else if states.Round(roundInfo.State) == states.FAILED { roundsResults[rnd] = RoundResult{ Status: Failed, - Round: historical.MakeRound(roundInfo), + Round: rounds.MakeRound(roundInfo), } allRoundsSucceeded = false } else { @@ -139,7 +139,7 @@ func (c *client) getRoundResults(roundList []id.Round, timeout time.Duration, // Find out what happened to old (historical) rounds if any are needed if len(historicalRequest) > 0 { for _, rnd := range historicalRequest { - rrc := func(round historical.Round, success bool) { + rrc := func(round rounds.Round, success bool) { var status RoundLookupStatus if success { status = Succeeded @@ -198,12 +198,12 @@ func (c *client) getRoundResults(roundList []id.Round, timeout time.Duration, if states.Round(roundInfo.State) == states.COMPLETED { result = RoundResult{ Status: Succeeded, - Round: historical.MakeRound(roundInfo), + Round: rounds.MakeRound(roundInfo), } } else if states.Round(roundInfo.State) == states.FAILED { result = RoundResult{ Status: Failed, - Round: historical.MakeRound(roundInfo), + Round: rounds.MakeRound(roundInfo), } allRoundsSucceeded = false } @@ -217,12 +217,12 @@ func (c *client) getRoundResults(roundList []id.Round, timeout time.Duration, if states.Round(roundReport.RoundInfo.State) == states.COMPLETED { result = RoundResult{ Status: Succeeded, - Round: historical.MakeRound(roundReport.RoundInfo), + Round: rounds.MakeRound(roundReport.RoundInfo), } } else { result = RoundResult{ Status: Failed, - Round: historical.MakeRound(roundReport.RoundInfo), + Round: rounds.MakeRound(roundReport.RoundInfo), } allRoundsSucceeded = false } diff --git a/cmix/historical/historical.go b/cmix/rounds/historical.go similarity index 95% rename from cmix/historical/historical.go rename to cmix/rounds/historical.go index 5e5c4faefcd04a92fd3734775aa6a11459c45439..98522670faf6a2635e37d300f5df25db4c451b75 100644 --- a/cmix/historical/historical.go +++ b/cmix/rounds/historical.go @@ -5,7 +5,7 @@ // LICENSE file // /////////////////////////////////////////////////////////////////////////////// -package historical +package rounds import ( "fmt" @@ -38,15 +38,15 @@ type Retriever interface { type manager struct { params Params - comms RoundsComms + comms Comms sender gateway.Sender events event.Manager c chan roundRequest } -// RoundsComms interface to increase east of testing of historical rounds. -type RoundsComms interface { +// Comms interface to increase east of testing of historical rounds. +type Comms interface { GetHost(hostId *id.ID) (*connect.Host, bool) RequestHistoricalRounds(host *connect.Host, message *pb.HistoricalRounds) ( *pb.HistoricalRoundsResponse, error) @@ -62,7 +62,7 @@ type roundRequest struct { numAttempts uint } -func NewRetriever(param Params, comms RoundsComms, sender gateway.Sender, +func NewRetriever(param Params, comms Comms, sender gateway.Sender, events event.Manager) Retriever { return &manager{ params: param, @@ -100,7 +100,7 @@ func (m *manager) StartProcesses() *stoppable.Single { // processHistoricalRounds is a long-running thread that process historical // rounds. The thread can be killed by triggering the stoppable. It takes a // comms interface to aid in testing. -func (m *manager) processHistoricalRounds(comm RoundsComms, stop *stoppable.Single) { +func (m *manager) processHistoricalRounds(comm Comms, stop *stoppable.Single) { timerCh := make(<-chan time.Time) var roundRequests []roundRequest diff --git a/cmix/historical/historical_test.go b/cmix/rounds/historical_test.go similarity index 99% rename from cmix/historical/historical_test.go rename to cmix/rounds/historical_test.go index 250c66ab6b7066089af308d1ea677ad502f51991..807386a2557bcc84392e024d99d1ca0aedc30e1d 100644 --- a/cmix/historical/historical_test.go +++ b/cmix/rounds/historical_test.go @@ -5,7 +5,7 @@ // LICENSE file // /////////////////////////////////////////////////////////////////////////////// -package historical +package rounds import ( "sync" diff --git a/cmix/rounds/params.go b/cmix/rounds/params.go index ab7f956c91fdf7933a20b417e55cb20c82bca456..1023f6022acce31a971747c5f31bfebdec724c03 100644 --- a/cmix/rounds/params.go +++ b/cmix/rounds/params.go @@ -3,42 +3,28 @@ package rounds import "time" type Params struct { - // Number of worker threads for retrieving messages from gateways - NumMessageRetrievalWorkers uint + // MaxHistoricalRounds is the number of historical rounds required to + // automatically send a historical rounds query. + MaxHistoricalRounds uint - // Length of round lookup channel buffer - LookupRoundsBufferLen uint + // HistoricalRoundsPeriod is the maximum period of time a pending historical + // round query will wait before it is transmitted. + HistoricalRoundsPeriod time.Duration - // Maximum number of times a historical round lookup will be attempted - MaxHistoricalRoundsRetries uint - - // Interval between checking for rounds in UncheckedRoundStore due for a - // message retrieval retry - UncheckRoundPeriod time.Duration - - // Toggles if message pickup retrying mechanism if forced - // by intentionally not looking up messages - ForceMessagePickupRetry bool + // HistoricalRoundsBufferLen is the length of historical rounds channel + // buffer. + HistoricalRoundsBufferLen uint - // Duration to wait before sending on a round times out and a new round is - // tried - SendTimeout time.Duration - - // Disables all attempts to pick up dropped or missed messages - RealtimeOnly bool - - // Toggles if historical rounds should always be used - ForceHistoricalRounds bool + // MaxHistoricalRoundsRetries is the maximum number of times a historical + // round lookup will be attempted. + MaxHistoricalRoundsRetries uint } func GetDefaultParams() Params { return Params{ - NumMessageRetrievalWorkers: 8, - LookupRoundsBufferLen: 2000, + MaxHistoricalRounds: 100, + HistoricalRoundsPeriod: 100 * time.Millisecond, + HistoricalRoundsBufferLen: 1000, MaxHistoricalRoundsRetries: 3, - UncheckRoundPeriod: 20 * time.Second, - ForceMessagePickupRetry: false, - SendTimeout: 3 * time.Second, - RealtimeOnly: false, } } diff --git a/cmix/historical/round.go b/cmix/rounds/round.go similarity index 99% rename from cmix/historical/round.go rename to cmix/rounds/round.go index 5dac9bec59a62c99ab9059b68fc86ed07f1a4aec..0de38e8aedf6ba06b2eaf6d363d999235ccde874 100644 --- a/cmix/historical/round.go +++ b/cmix/rounds/round.go @@ -1,4 +1,4 @@ -package historical +package rounds import ( jww "github.com/spf13/jwalterweatherman" diff --git a/storage/utility/round.go b/cmix/rounds/roundStorage.go similarity index 64% rename from storage/utility/round.go rename to cmix/rounds/roundStorage.go index 1227bc6ce2fc5c8fcdec24d12ca5836f31cc6fcf..2b92c819f9b6279dd30d4b8b37a503b146168501 100644 --- a/storage/utility/round.go +++ b/cmix/rounds/roundStorage.go @@ -1,8 +1,7 @@ -package utility +package rounds import ( "github.com/golang/protobuf/proto" - "gitlab.com/elixxir/client/cmix/historical" "gitlab.com/elixxir/client/storage/versioned" pb "gitlab.com/elixxir/comms/mixmessages" "gitlab.com/xx_network/primitives/netTime" @@ -11,7 +10,7 @@ import ( const currentRoundVersion = 0 // StoreRound stores the round using the key. -func StoreRound(kv *versioned.KV, round historical.Round, key string) error { +func StoreRound(kv *versioned.KV, round Round, key string) error { now := netTime.Now() marshaled, err := proto.Marshal(round.Raw) @@ -21,7 +20,7 @@ func StoreRound(kv *versioned.KV, round historical.Round, key string) error { } obj := versioned.Object{ - Version: currentCyclicVersion, + Version: currentRoundVersion, Timestamp: now, Data: marshaled, } @@ -30,21 +29,21 @@ func StoreRound(kv *versioned.KV, round historical.Round, key string) error { } // LoadRound stores the round using the key. -func LoadRound(kv *versioned.KV, key string) (historical.Round, error) { +func LoadRound(kv *versioned.KV, key string) (Round, error) { vo, err := kv.Get(key, currentRoundVersion) if err != nil { - return historical.Round{}, err + return Round{}, err } ri := &pb.RoundInfo{} err = proto.Unmarshal(vo.Data, ri) if err != nil { - return historical.Round{}, err + return Round{}, err } - return historical.MakeRound(ri), nil + return MakeRound(ri), nil } func DeleteRound(kv *versioned.KV, key string) error { - return kv.Delete(key, currentCyclicVersion) + return kv.Delete(key, currentRoundVersion) } diff --git a/e2e/processor.go b/e2e/processor.go index 7405859894c12ece7243f9aba78e2bc0ec1eb6a6..e57359960e926cd9e8d19275a94447f62c7f2094 100644 --- a/e2e/processor.go +++ b/e2e/processor.go @@ -2,7 +2,7 @@ package e2e import ( jww "github.com/spf13/jwalterweatherman" - "gitlab.com/elixxir/client/cmix/historical" + "gitlab.com/elixxir/client/cmix/rounds" "gitlab.com/elixxir/client/cmix/identity/receptionID" "gitlab.com/elixxir/client/e2e/ratchet/partner/session" "gitlab.com/elixxir/primitives/format" @@ -15,7 +15,7 @@ type processor struct { func (p *processor) Process(ecrMsg format.Message, receptionID receptionID.EphemeralIdentity, - round historical.Round) { + round rounds.Round) { // ensure the key will be marked used before returning defer p.cy.Use() diff --git a/e2e/receive/message.go b/e2e/receive/message.go index 560f2224a5aa21e581a4b84e86c1d3e385f92b4b..673b1e6ae0f8d2a72bdfc1ee6076f011a8b96088 100644 --- a/e2e/receive/message.go +++ b/e2e/receive/message.go @@ -2,7 +2,7 @@ package receive import ( "gitlab.com/elixxir/client/catalog" - "gitlab.com/elixxir/client/cmix/historical" + "gitlab.com/elixxir/client/cmix/rounds" "gitlab.com/elixxir/crypto/e2e" "gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/id/ephemeral" @@ -21,5 +21,5 @@ type Message struct { Encrypted bool - Round historical.Round + Round rounds.Round } diff --git a/e2e/rekey/utils_test.go b/e2e/rekey/utils_test.go index 173c0f14f02060736cdd167c2748435ae01b1f8a..706d4b7bc19371cb771a845829bf507074e3b489 100644 --- a/e2e/rekey/utils_test.go +++ b/e2e/rekey/utils_test.go @@ -17,7 +17,7 @@ import ( "gitlab.com/elixxir/client/catalog" "gitlab.com/elixxir/client/cmix" "gitlab.com/elixxir/client/cmix/gateway" - "gitlab.com/elixxir/client/cmix/historical" + "gitlab.com/elixxir/client/cmix/rounds" "gitlab.com/elixxir/client/cmix/identity" "gitlab.com/elixxir/client/cmix/message" session2 "gitlab.com/elixxir/client/e2e/ratchet/partner/session" @@ -305,7 +305,7 @@ func (m *mockNetManager) GetRoundResults(timeout time.Duration, roundCallback cm } func (m *mockNetManager) LookupHistoricalRound( - rid id.Round, callback historical.RoundResultCallback) error { + rid id.Round, callback rounds.RoundResultCallback) error { return nil } diff --git a/e2e/unsafeProcessor.go b/e2e/unsafeProcessor.go index d49842fca45d63520c20a0e64a8793b48b1a107b..5a9e84390531f6d37fe5e391ed646ee5530b2d0c 100644 --- a/e2e/unsafeProcessor.go +++ b/e2e/unsafeProcessor.go @@ -2,7 +2,7 @@ package e2e import ( jww "github.com/spf13/jwalterweatherman" - "gitlab.com/elixxir/client/cmix/historical" + "gitlab.com/elixxir/client/cmix/rounds" "gitlab.com/elixxir/client/cmix/identity/receptionID" "gitlab.com/elixxir/crypto/e2e" "gitlab.com/elixxir/primitives/format" @@ -15,7 +15,7 @@ type UnsafeProcessor struct { func (up *UnsafeProcessor) Process(ecrMsg format.Message, receptionID receptionID.EphemeralIdentity, - round historical.Round) { + round rounds.Round) { //check if the message is unencrypted unencrypted, sender := e2e.IsUnencrypted(ecrMsg) if !unencrypted { diff --git a/groupChat/receive.go b/groupChat/receive.go index 87f8b1d35db9ffeebebe699503ee8692db5702e1..8555fafa98e39a73565a6c1c4975e3d83004f803 100644 --- a/groupChat/receive.go +++ b/groupChat/receive.go @@ -10,7 +10,7 @@ package groupChat import ( "github.com/pkg/errors" jww "github.com/spf13/jwalterweatherman" - "gitlab.com/elixxir/client/cmix/historical" + "gitlab.com/elixxir/client/cmix/rounds" "gitlab.com/elixxir/client/cmix/identity/receptionID" gs "gitlab.com/elixxir/client/groupChat/groupStore" "gitlab.com/elixxir/crypto/group" @@ -37,7 +37,7 @@ type receptionProcessor struct { } // Process incoming group chat messages -func (p *receptionProcessor) Process(message format.Message, receptionID receptionID.EphemeralIdentity, round historical.Round) { +func (p *receptionProcessor) Process(message format.Message, receptionID receptionID.EphemeralIdentity, round rounds.Round) { jww.TRACE.Print("Group message reception received cMix message.") // Attempt to read the message diff --git a/groupChat/send_test.go b/groupChat/send_test.go index 55a956d63a035623d1ca90ad4ac135aa40c6c973..ec257fd4e11dd2bbadc9f614fcb5b1afc12c4972 100644 --- a/groupChat/send_test.go +++ b/groupChat/send_test.go @@ -10,7 +10,7 @@ package groupChat import ( "bytes" "encoding/base64" - "gitlab.com/elixxir/client/cmix/historical" + "gitlab.com/elixxir/client/cmix/rounds" "gitlab.com/elixxir/client/cmix/identity/receptionID" gs "gitlab.com/elixxir/client/groupChat/groupStore" "gitlab.com/elixxir/crypto/group" @@ -51,7 +51,7 @@ func TestManager_Send(t *testing.T) { } for _, msg := range messages { - reception.Process(msg, receptionID.EphemeralIdentity{}, historical.Round{ID: roundId}) + reception.Process(msg, receptionID.EphemeralIdentity{}, rounds.Round{ID: roundId}) select { case result := <-receiveChan: if !result.SenderID.Cmp(m.receptionId) { diff --git a/groupChat/utils_test.go b/groupChat/utils_test.go index 88dc69e1b2e79081c0d4817ef675f41bb39628fb..efa002ebef24db9516325115dee171da848da1d8 100644 --- a/groupChat/utils_test.go +++ b/groupChat/utils_test.go @@ -14,7 +14,7 @@ import ( "gitlab.com/elixxir/client/catalog" "gitlab.com/elixxir/client/cmix" "gitlab.com/elixxir/client/cmix/gateway" - "gitlab.com/elixxir/client/cmix/historical" + "gitlab.com/elixxir/client/cmix/rounds" "gitlab.com/elixxir/client/cmix/identity" "gitlab.com/elixxir/client/cmix/message" clientE2E "gitlab.com/elixxir/client/e2e" @@ -397,7 +397,7 @@ func (*testNetworkManager) GetRoundResults(timeout time.Duration, roundCallback panic("implement me") } -func (*testNetworkManager) LookupHistoricalRound(rid id.Round, callback historical.RoundResultCallback) error { +func (*testNetworkManager) LookupHistoricalRound(rid id.Round, callback rounds.RoundResultCallback) error { panic("implement me") }