diff --git a/README.md b/README.md index 4509379e01b025100393bdb345e9b0c63ee4d5e5..ebf490f59a5ab4ac6c8a154785443cff0ed28310 100644 --- a/README.md +++ b/README.md @@ -162,6 +162,9 @@ Flags: base64 representations) (default "0") --forceHistoricalRounds Force all rounds to be sent to historical round retrieval + --forceMessagePickupRetry Enable a mechanism which forces a 50% chance + of no message pickup, instead triggering the + message pickup retry mechanism -h, --help help for client -l, --log string Path to the log output path (- is stdout) (default "-") diff --git a/cmd/root.go b/cmd/root.go index 469b227d36d7e16fc60f1d189afc3c06c59ae5ce..3a7b9beeff41b1cd1f9a69a910c89159bd1f1271 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -368,6 +368,7 @@ func createClient() *api.Client { viper.GetUint("e2eNumReKeys")) netParams.ForceHistoricalRounds = viper.GetBool("forceHistoricalRounds") netParams.FastPolling = !viper.GetBool("slowPolling") + netParams.ForceMessagePickupRetry = viper.GetBool("forceMessagePickupRetry") client, err := api.OpenClient(storeDir, []byte(pass), netParams) if err != nil { @@ -389,6 +390,7 @@ func initClient() *api.Client { viper.GetUint("e2eNumReKeys")) netParams.ForceHistoricalRounds = viper.GetBool("forceHistoricalRounds") netParams.FastPolling = viper.GetBool(" slowPolling") + netParams.ForceMessagePickupRetry = viper.GetBool("forceMessagePickupRetry") //load the client client, err := api.Login(storeDir, []byte(pass), netParams) @@ -786,6 +788,11 @@ func init() { "Enables polling for unfiltered network updates with RSA signatures") viper.BindPFlag("slowPolling", rootCmd.Flags().Lookup("slowPolling")) + rootCmd.Flags().Bool("forceMessagePickupRetry", false, + "Enable a mechanism which forces a 50% chance of no message pickup, " + + "instead triggering the message pickup retry mechanism") + viper.BindPFlag("forceMessagePickupRetry", + rootCmd.Flags().Lookup("forceMessagePickupRetry")) // E2E Params defaultE2EParams := params.GetDefaultE2ESessionParams() diff --git a/go.mod b/go.mod index 4f83d7b3d397a2973da761470650fc72f7dfb164..b343cc9300217c314b0069445c468e35101bf135 100644 --- a/go.mod +++ b/go.mod @@ -17,13 +17,14 @@ require ( github.com/spf13/jwalterweatherman v1.1.0 github.com/spf13/viper v1.7.1 gitlab.com/elixxir/bloomfilter v0.0.0-20200930191214-10e9ac31b228 - gitlab.com/elixxir/comms v0.0.4-0.20210526002834-a1bcf83670ce + gitlab.com/elixxir/comms v0.0.4-0.20210527192434-4f4f53730861 gitlab.com/elixxir/crypto v0.0.7-0.20210526002540-1fb51df5b4b2 gitlab.com/elixxir/ekv v0.1.5 gitlab.com/elixxir/primitives v0.0.3-0.20210526002350-b9c947fec050 gitlab.com/xx_network/comms v0.0.4-0.20210526002311-2b5a66af0eac gitlab.com/xx_network/crypto v0.0.5-0.20210526002149-9c08ccb202be gitlab.com/xx_network/primitives v0.0.4-0.20210525232109-3f99a04adcfd + gitlab.com/xx_network/ring v0.0.3-0.20210527191221-ce3f170aabd5 // indirect golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2 golang.org/x/net v0.0.0-20210226172049-e18ecbb05110 golang.org/x/sys v0.0.0-20210403161142-5e06dd20ab57 // indirect diff --git a/go.sum b/go.sum index 650c87ee12905dc29b08dee2e584788bf874e984..e3f4a78aee130aa1e2f13684f6f6bb8a0f6606e3 100644 --- a/go.sum +++ b/go.sum @@ -244,6 +244,12 @@ gitlab.com/elixxir/comms v0.0.4-0.20210524170509-89dd425cb228 h1:btpdRz3MDO651yu gitlab.com/elixxir/comms v0.0.4-0.20210524170509-89dd425cb228/go.mod h1:IJ2l4MFEpYk5VdPdsbTlZYh3owurHlnvxPv2c+dJZAA= gitlab.com/elixxir/comms v0.0.4-0.20210526002834-a1bcf83670ce h1:aMaMqzBhLT3Ug3FhZlzwsb2MU7RFNKdOhC0l/pM8pZo= gitlab.com/elixxir/comms v0.0.4-0.20210526002834-a1bcf83670ce/go.mod h1:RyA3YVhyPRTxE+jjf6CV8elwQiFr8QuS54LcdLWw97E= +gitlab.com/elixxir/comms v0.0.4-0.20210526162037-5fc5b5633ccf h1:aICCHX2ADeVIjxEm1cT4W06tu4t4r0Akcba67msGlAU= +gitlab.com/elixxir/comms v0.0.4-0.20210526162037-5fc5b5633ccf/go.mod h1:RyA3YVhyPRTxE+jjf6CV8elwQiFr8QuS54LcdLWw97E= +gitlab.com/elixxir/comms v0.0.4-0.20210526170645-e7a77c41f345 h1:FvCaE82edqf1Ovl/5xHZmejlQwRukNL4hwOm1sfGKvc= +gitlab.com/elixxir/comms v0.0.4-0.20210526170645-e7a77c41f345/go.mod h1:7+fK1U++g5zbbDh1kCSd+LUf7ikWfD+1lM44rWUm0F4= +gitlab.com/elixxir/comms v0.0.4-0.20210527192434-4f4f53730861 h1:EwL+PZ5I2sumbR5A69vM94Nkuqr11eYv951Xw3yenOg= +gitlab.com/elixxir/comms v0.0.4-0.20210527192434-4f4f53730861/go.mod h1:Wru310EhfTp6ZPeTs3JqcPjBueCqf/VvsPkWw1QtL+Q= gitlab.com/elixxir/crypto v0.0.0-20200804182833-984246dea2c4/go.mod h1:ucm9SFKJo+K0N2GwRRpaNr+tKXMIOVWzmyUD0SbOu2c= gitlab.com/elixxir/crypto v0.0.3/go.mod h1:ZNgBOblhYToR4m8tj4cMvJ9UsJAUKq+p0gCp07WQmhA= gitlab.com/elixxir/crypto v0.0.7-0.20210521205349-cb0c5cdd44e3/go.mod h1:x6QKxPWIMH742i2p0IN47wkqcpTntwnMFLYcUfUeRfA= @@ -291,6 +297,10 @@ gitlab.com/xx_network/primitives v0.0.4-0.20210525232109-3f99a04adcfd h1:+vQkuM/ gitlab.com/xx_network/primitives v0.0.4-0.20210525232109-3f99a04adcfd/go.mod h1:9imZHvYwNFobxueSvVtHneZLk9wTK7HQTzxPm+zhFhE= gitlab.com/xx_network/ring v0.0.2 h1:TlPjlbFdhtJrwvRgIg4ScdngMTaynx/ByHBRZiXCoL0= gitlab.com/xx_network/ring v0.0.2/go.mod h1:aLzpP2TiZTQut/PVHR40EJAomzugDdHXetbieRClXIM= +gitlab.com/xx_network/ring v0.0.3-0.20210525230516-6a1185d19af8 h1:QUmZ2KyTquOjLygap91CU5p+MSRqq6Ssse8v6xGWw60= +gitlab.com/xx_network/ring v0.0.3-0.20210525230516-6a1185d19af8/go.mod h1:aLzpP2TiZTQut/PVHR40EJAomzugDdHXetbieRClXIM= +gitlab.com/xx_network/ring v0.0.3-0.20210527191221-ce3f170aabd5 h1:FY+4Rh1Q2rgLyv10aKJjhWApuKRCR/054XhreudfAvw= +gitlab.com/xx_network/ring v0.0.3-0.20210527191221-ce3f170aabd5/go.mod h1:aLzpP2TiZTQut/PVHR40EJAomzugDdHXetbieRClXIM= go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= diff --git a/interfaces/params/rounds.go b/interfaces/params/rounds.go index 07c4c3c25d83f3c115263bef0269ab1c9226c7ee..4a6ffab984f3ba309d7fdb77cf7cfb1710f8cef4 100644 --- a/interfaces/params/rounds.go +++ b/interfaces/params/rounds.go @@ -31,6 +31,14 @@ type Rounds struct { // Maximum number of times a historical round lookup will be attempted MaxHistoricalRoundsRetries uint + + // Interval between checking for rounds in UncheckedRoundStore + // due for a message retrieval retry + UncheckRoundPeriod time.Duration + + // Toggles if message pickup retrying mechanism if forced + // by intentionally not looking up messages + ForceMessagePickupRetry bool } func GetDefaultRounds() Rounds { @@ -43,5 +51,7 @@ func GetDefaultRounds() Rounds { LookupRoundsBufferLen: 2000, ForceHistoricalRounds: false, MaxHistoricalRoundsRetries: 3, + UncheckRoundPeriod: 20 * time.Second, + ForceMessagePickupRetry: false, } } diff --git a/network/follow.go b/network/follow.go index 05d8134600d2370e647363e56624005d39fc472a..8669ea162a38dc51bdbb7601f8023a19406b7351 100644 --- a/network/follow.go +++ b/network/follow.go @@ -165,7 +165,13 @@ func (m *manager) follow(report interfaces.ClientErrorReport, rng csprng.Source, } // Update the address space size - m.addrSpace.Update(m.Instance.GetPartialNdf().Get().AddressSpace[0].Size) + // todo: this is a fix for incompatibility with the live network + // remove once the live network has been pushed to + if len(m.Instance.GetPartialNdf().Get().AddressSpace)!=0{ + m.addrSpace.Update(m.Instance.GetPartialNdf().Get().AddressSpace[0].Size) + }else{ + m.addrSpace.Update(18) + } // NOTE: this updates rounds and updates the tracking of the health of the // network diff --git a/network/rounds/manager.go b/network/rounds/manager.go index 942e86319efe8ab05711b901360edbcd37865978..c488e39f3b2dc2311166b99d21b680dd3d0b8028 100644 --- a/network/rounds/manager.go +++ b/network/rounds/manager.go @@ -57,5 +57,11 @@ func (m *Manager) StartProcessors() stoppable.Stoppable { go m.processMessageRetrieval(m.Comms, stopper.Quit()) multi.Add(stopper) } + + // Start the periodic unchecked round worker + stopper := stoppable.NewSingle("UncheckRound") + go m.processUncheckedRounds(m.params.UncheckRoundPeriod, backOffTable, stopper.Quit()) + multi.Add(stopper) + return multi } diff --git a/network/rounds/retrieve.go b/network/rounds/retrieve.go index c12c91998f3aa775da68df6abc9bb60db1357cfe..e8c6d9f952680887c158cf6b9c11fc3557da3618 100644 --- a/network/rounds/retrieve.go +++ b/network/rounds/retrieve.go @@ -8,6 +8,7 @@ package rounds import ( + "encoding/binary" "github.com/pkg/errors" jww "github.com/spf13/jwalterweatherman" "gitlab.com/elixxir/client/network/message" @@ -44,7 +45,14 @@ func (m *Manager) processMessageRetrieval(comms messageRetrievalComms, case <-quitCh: done = true case rl := <-m.lookupRoundMessages: + // wrap this around the pickup logic ri := rl.roundInfo + err := m.Session.UncheckedRounds().AddRound(rl.roundInfo, + rl.identity.EphId, rl.identity.Source) + if err != nil { + jww.ERROR.Printf("Could not find round %d in unchecked rounds store: %v", + rl.roundInfo.ID, err) + } // Convert gateways in round to proper ID format gwIds := make([]*id.ID, len(ri.Topology)) @@ -56,12 +64,11 @@ func (m *Manager) processMessageRetrieval(comms messageRetrievalComms, gwId.SetType(id.Gateway) gwIds[i] = gwId } - // Target the last node in the team first because it has // messages first, randomize other members of the team var rndBytes [32]byte stream := m.Rng.GetStream() - _, err := stream.Read(rndBytes[:]) + _, err = stream.Read(rndBytes[:]) stream.Close() if err != nil { jww.FATAL.Panicf("Failed to randomize shuffle in round %d "+ @@ -73,18 +80,38 @@ func (m *Manager) processMessageRetrieval(comms messageRetrievalComms, gwIds[i+1], gwIds[j+1] = gwIds[j+1], gwIds[i+1] }) - // Attempt to request for this gateway - bundle, err := m.getMessagesFromGateway(id.Round(ri.ID), rl.identity, comms, gwIds) + // If ForceMessagePickupRetry, we are forcing processUncheckedRounds by + // randomly not picking up messages + var bundle message.Bundle + if m.params.ForceMessagePickupRetry { + jww.DEBUG.Printf("Forcing message pickup retry") + bundle, err = m.forceMessagePickupRetry(ri, rl, comms, gwIds) + if err != nil { + jww.ERROR.Printf("Failed to get pickup round %d "+ + "from all gateways (%v): %s", + id.Round(ri.ID), gwIds, err) + } + } else { + // Attempt to request for this gateway + bundle, err = m.getMessagesFromGateway(id.Round(ri.ID), rl.identity, comms, gwIds) + // After trying all gateways, if none returned we mark the round as a + // failure and print out the last error + if err != nil { + jww.ERROR.Printf("Failed to get pickup round %d "+ + "from all gateways (%v): %s", + id.Round(ri.ID), gwIds, err) + } - // After trying all gateways, if none returned we mark the round as a - // failure and print out the last error - if err != nil { - jww.ERROR.Printf("Failed to get pickup round %d "+ - "from all gateways (%v): %s", - id.Round(ri.ID), gwIds, err) } + if len(bundle.Messages) != 0 { + err = m.Session.UncheckedRounds().Remove(id.Round(ri.ID)) + if err != nil { + jww.ERROR.Printf("Could not remove round %d " + + "from unchecked rounds store: %v", ri.ID, err) + } + // If successful and there are messages, we send them to another thread bundle.Identity = rl.identity m.messageBundles <- bundle @@ -160,3 +187,26 @@ func (m *Manager) getMessagesFromGateway(roundID id.Round, identity reception.Id return bundle, nil } + +// Helper function which forces processUncheckedRounds by randomly +// not looking up messages +func (m *Manager) forceMessagePickupRetry(ri *pb.RoundInfo, rl roundLookup, + comms messageRetrievalComms, gwIds []*id.ID) (bundle message.Bundle, err error) { + // Flip a coin to determine whether to pick up message + stream := m.Rng.GetStream() + defer stream.Close() + b := make([]byte, 8) + _, err = stream.Read(b) + if err != nil { + jww.FATAL.Panic(err.Error()) + } + result := binary.BigEndian.Uint64(b) + if result%2 == 0 { + // Do not call get message, leaving the round to be picked up + // in unchecked round scheduler process + return + } + + // Attempt to request for this gateway + return m.getMessagesFromGateway(id.Round(ri.ID), rl.identity, comms, gwIds) +} \ No newline at end of file diff --git a/network/rounds/unchecked.go b/network/rounds/unchecked.go new file mode 100644 index 0000000000000000000000000000000000000000..d049e7dda07917d303bb381dc5d87d543d913f43 --- /dev/null +++ b/network/rounds/unchecked.go @@ -0,0 +1,100 @@ +/////////////////////////////////////////////////////////////////////////////// +// Copyright © 2020 xx network SEZC // +// // +// Use of this source code is governed by a license that can be found in the // +// LICENSE file // +/////////////////////////////////////////////////////////////////////////////// + +package rounds + +import ( + jww "github.com/spf13/jwalterweatherman" + "gitlab.com/elixxir/client/storage/reception" + "gitlab.com/xx_network/primitives/netTime" + "time" +) + +// Constants for message retrieval backoff delays +const ( + tryZero = 10 * time.Second + tryOne = 30 * time.Second + tryTwo = 5 * time.Minute + tryThree = 30 * time.Minute + tryFour = 3 * time.Hour + tryFive = 12 * time.Hour + trySix = 24 * time.Hour + // Amount of tries past which the + // backoff will not increase + cappedTries = 7 +) + + +var backOffTable = [cappedTries]time.Duration{tryZero, tryOne, tryTwo, tryThree, tryFour, tryFive, trySix} + +// processUncheckedRounds will (periodically) check every checkInterval +// for rounds that failed message retrieval in processMessageRetrieval. +// Rounds will have a backoff duration in which they will be tried again. +// If a round is found to be due on a periodical check, the round is sent +// back to processMessageRetrieval. +func (m *Manager) processUncheckedRounds(checkInterval time.Duration, backoffTable [cappedTries]time.Duration, + quitCh <-chan struct{}) { + ticker := time.NewTicker(checkInterval) + uncheckedRoundStore := m.Session.UncheckedRounds() + done := false + for !done { + select { + case <-quitCh: + done = true + + case <-ticker.C: + // Pull and iterate through uncheckedRound list + roundList := m.Session.UncheckedRounds().GetList() + for rid, rnd := range roundList { + // If this round is due for a round check, send the round over + // to the retrieval thread. If not due, check next round. + if isRoundCheckDue(rnd.NumChecks, rnd.LastCheck, backoffTable) { + jww.DEBUG.Printf("Round %d due for a message lookup, retrying...", rid) + // Construct roundLookup object to send + rl := roundLookup{ + roundInfo: rnd.Info, + identity: reception.IdentityUse{ + Identity: reception.Identity{ + EphId: rnd.EpdId, + Source: rnd.Source, + }, + }, + } + + // Send to processMessageRetrieval + select { + case m.lookupRoundMessages <- rl: + case <- time.After(500*time.Second): + } + + // Update the state of the round for next look-up (if needed) + err := uncheckedRoundStore.IncrementCheck(rid) + if err != nil { + jww.ERROR.Printf("processUncheckedRounds error: Could not "+ + "increment check attempts for round %d: %v", rid, err) + } + + } + + } + } + } +} + +// isRoundCheckDue given the amount of tries and the timestamp the round +// was stored, determines whether this round is due for another check. +// Returns true if a new check is due +func isRoundCheckDue(tries uint64, ts time.Time, backoffTable [cappedTries]time.Duration) bool { + now := netTime.Now() + + if tries > cappedTries { + tries = cappedTries + } + roundCheckTime := ts.Add(backoffTable[tries]) + + return now.After(roundCheckTime) +} diff --git a/network/rounds/unchecked_test.go b/network/rounds/unchecked_test.go new file mode 100644 index 0000000000000000000000000000000000000000..cc66c02712acbffae509eeb7270cc88463e80c58 --- /dev/null +++ b/network/rounds/unchecked_test.go @@ -0,0 +1,97 @@ +/////////////////////////////////////////////////////////////////////////////// +// Copyright © 2020 xx network SEZC // +// // +// Use of this source code is governed by a license that can be found in the // +// LICENSE file // +/////////////////////////////////////////////////////////////////////////////// + +package rounds + +import ( + "gitlab.com/elixxir/client/network/gateway" + "gitlab.com/elixxir/client/network/message" + pb "gitlab.com/elixxir/comms/mixmessages" + "gitlab.com/elixxir/crypto/fastRNG" + "gitlab.com/xx_network/crypto/csprng" + "gitlab.com/xx_network/primitives/id" + "gitlab.com/xx_network/primitives/id/ephemeral" + "gitlab.com/xx_network/primitives/ndf" + "reflect" + "testing" + "time" +) + +// Happy path +func TestUncheckedRoundScheduler(t *testing.T) { + // General initializations + testManager := newManager(t) + roundId := id.Round(5) + mockComms := &mockMessageRetrievalComms{testingSignature: t} + quitChan := make(chan struct{}) + testNdf := getNDF() + nodeId := id.NewIdFromString(ReturningGateway, id.Node, &testing.T{}) + gwId := nodeId.DeepCopy() + gwId.SetType(id.Gateway) + testNdf.Gateways = []ndf.Gateway{{ID: gwId.Marshal()}} + p := gateway.DefaultPoolParams() + p.MaxPoolSize = 1 + testManager.sender, _ = gateway.NewSender(p, + fastRNG.NewStreamGenerator(1, 1, csprng.NewSystemRNG), + testNdf, mockComms, testManager.Session, nil) + + // Create a local channel so reception is possible (testManager.messageBundles is + // send only via newManager call above) + messageBundleChan := make(chan message.Bundle) + testManager.messageBundles = messageBundleChan + + testBackoffTable := newTestBackoffTable(t) + checkInterval := 250 * time.Millisecond + // Initialize the message retrieval + go testManager.processMessageRetrieval(mockComms, quitChan) + go testManager.processUncheckedRounds(checkInterval, testBackoffTable, quitChan) + + requestGateway := id.NewIdFromString(ReturningGateway, id.Gateway, t) + + // Construct expected values for checking + expectedEphID := ephemeral.Id{1, 2, 3, 4, 5, 6, 7, 8} + idList := [][]byte{requestGateway.Bytes()} + roundInfo := &pb.RoundInfo{ + ID: uint64(roundId), + Topology: idList, + } + + // Add round ot check + err := testManager.Session.UncheckedRounds().AddRound(roundInfo, expectedEphID, requestGateway) + if err != nil { + t.Fatalf("Could not add round to session: %v", err) + } + + var testBundle message.Bundle + go func() { + // Receive the bundle over the channel + time.Sleep(1 * time.Second) + testBundle = <-messageBundleChan + + // Close the process + quitChan <- struct{}{} + + }() + + // Ensure bundle received and has expected values + time.Sleep(2 * time.Second) + if reflect.DeepEqual(testBundle, message.Bundle{}) { + t.Fatalf("Did not receive a message bundle over the channel") + } + + if testBundle.Identity.EphId.Int64() != expectedEphID.Int64() { + t.Errorf("Unexpected ephemeral ID in bundle."+ + "\n\tExpected: %v"+ + "\n\tReceived: %v", expectedEphID, testBundle.Identity.EphId) + } + + _, exists := testManager.Session.UncheckedRounds().GetRound(roundId) + if exists { + t.Fatalf("Expected round %d to be removed after being processed", roundId) + } + +} diff --git a/network/rounds/utils_test.go b/network/rounds/utils_test.go index d352078dcd1219b188c8c7fde0b807748d8c3521..479995d831956809fc772d7f28adacf102acdc7b 100644 --- a/network/rounds/utils_test.go +++ b/network/rounds/utils_test.go @@ -8,14 +8,18 @@ package rounds import ( "github.com/pkg/errors" + jww "github.com/spf13/jwalterweatherman" "gitlab.com/elixxir/client/network/internal" "gitlab.com/elixxir/client/network/message" "gitlab.com/elixxir/client/storage" pb "gitlab.com/elixxir/comms/mixmessages" + "gitlab.com/elixxir/crypto/fastRNG" "gitlab.com/xx_network/comms/connect" + "gitlab.com/xx_network/crypto/csprng" "gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/ndf" "testing" + "time" ) func newManager(face interface{}) *Manager { @@ -27,6 +31,7 @@ func newManager(face interface{}) *Manager { Internal: internal.Internal{ Session: sess1, TransmissionID: sess1.GetUser().TransmissionID, + Rng: fastRNG.NewStreamGenerator(1, 1, csprng.NewSystemRNG), }, } return testManager @@ -102,6 +107,23 @@ func (mmrc *mockMessageRetrievalComms) RequestMessages(host *connect.Host, return nil, nil } +func newTestBackoffTable(face interface{}) [cappedTries]time.Duration { + switch face.(type) { + case *testing.T, *testing.M, *testing.B, *testing.PB: + break + default: + jww.FATAL.Panicf("newTestBackoffTable is restricted to testing only. Got %T", face) + } + + var backoff [cappedTries]time.Duration + for i := 0; i < cappedTries; i++ { + backoff[uint64(i)] = 1 * time.Millisecond + } + + return backoff + +} + func getNDF() *ndf.NetworkDefinition { return &ndf.NetworkDefinition{ E2E: ndf.Group{ diff --git a/storage/rounds/uncheckedRounds.go b/storage/rounds/uncheckedRounds.go new file mode 100644 index 0000000000000000000000000000000000000000..898222b71556ef86146d996e23c9804bff356f85 --- /dev/null +++ b/storage/rounds/uncheckedRounds.go @@ -0,0 +1,314 @@ +/////////////////////////////////////////////////////////////////////////////// +// Copyright © 2020 xx network SEZC // +// // +// Use of this source code is governed by a license that can be found in the // +// LICENSE file // +/////////////////////////////////////////////////////////////////////////////// + +package rounds + +import ( + "bytes" + "encoding/binary" + "github.com/golang/protobuf/proto" + "github.com/pkg/errors" + "gitlab.com/elixxir/client/storage/versioned" + pb "gitlab.com/elixxir/comms/mixmessages" + "gitlab.com/xx_network/primitives/id" + "gitlab.com/xx_network/primitives/id/ephemeral" + "gitlab.com/xx_network/primitives/netTime" + "sync" + "time" +) + +const ( + uncheckedRoundVersion = 0 + uncheckedRoundPrefix = "uncheckedRoundPrefix" + // Key to store rounds + uncheckedRoundKey = "uncheckRounds" + // Key to store individual round + // Housekeeping constant (used for serializing uint64 ie id.Round) + uint64Size = 8 + // Maximum checks that can be performed on a round. Intended so that + // a round is checked no more than 1 week approximately (network/rounds.cappedTries + 7) + maxChecks = 14 +) + +// Round identity information used in message retrieval +// Derived from reception.Identity saving data needed +// for message retrieval +type Identity struct { + EpdId ephemeral.Id + Source *id.ID +} + +// Unchecked round structure is rounds which failed on message retrieval +// These rounds are stored for retry of message retrieval +type UncheckedRound struct { + Info *pb.RoundInfo + Identity + // Timestamp in which round has last been checked + LastCheck time.Time + // Number of times a round has been checked + NumChecks uint64 +} + +// marshal serializes UncheckedRound r into a byte slice +func (r UncheckedRound) marshal() ([]byte, error) { + buf := bytes.NewBuffer(nil) + // Write the round info + b := make([]byte, uint64Size) + infoBytes, err := proto.Marshal(r.Info) + binary.LittleEndian.PutUint64(b, uint64(len(infoBytes))) + buf.Write(b) + buf.Write(infoBytes) + + b = make([]byte, uint64Size) + + // Write the round identity info + buf.Write(r.Identity.EpdId[:]) + if r.Source != nil { + buf.Write(r.Identity.Source.Marshal()) + } else { + buf.Write(make([]byte, id.ArrIDLen)) + } + + // Write the time stamp bytes + tsBytes, err := r.LastCheck.MarshalBinary() + if err != nil { + return nil, errors.WithMessage(err, "Could not marshal timestamp ") + } + b = make([]byte, uint64Size) + binary.LittleEndian.PutUint64(b, uint64(len(tsBytes))) + buf.Write(b) + buf.Write(tsBytes) + + // Write the number of tries for this round + b = make([]byte, uint64Size) + binary.LittleEndian.PutUint64(b, r.NumChecks) + buf.Write(b) + + return buf.Bytes(), nil +} + +// unmarshal deserializes round data from buff into UncheckedRound r +func (r *UncheckedRound) unmarshal(buff *bytes.Buffer) error { + // Deserialize the roundInfo + roundInfoLen := binary.LittleEndian.Uint64(buff.Next(uint64Size)) + roundInfoBytes := buff.Next(int(roundInfoLen)) + ri := &pb.RoundInfo{} + if err := proto.Unmarshal(roundInfoBytes, ri); err != nil { + return errors.WithMessagef(err, "Failed to unmarshal roundInfo") + } + r.Info = ri + + // Deserialize the round identity information + copy(r.EpdId[:], buff.Next(uint64Size)) + + sourceId, err := id.Unmarshal(buff.Next(id.ArrIDLen)) + if err != nil { + return errors.WithMessage(err, "Failed to unmarshal round identity.source") + } + + r.Source = sourceId + + // Deserialize the timestamp bytes + timestampLen := binary.LittleEndian.Uint64(buff.Next(uint64Size)) + tsByes := buff.Next(int(uint64(timestampLen))) + if err = r.LastCheck.UnmarshalBinary(tsByes); err != nil { + return errors.WithMessage(err, "Failed to unmarshal round timestamp") + } + + r.NumChecks = binary.LittleEndian.Uint64(buff.Next(uint64Size)) + + return nil +} + +// Storage object saving rounds to retry for message retrieval +type UncheckedRoundStore struct { + list map[id.Round]UncheckedRound + mux sync.RWMutex + kv *versioned.KV +} + +// Constructor for a UncheckedRoundStore +func NewUncheckedStore(kv *versioned.KV) (*UncheckedRoundStore, error) { + kv = kv.Prefix(uncheckedRoundPrefix) + + urs := &UncheckedRoundStore{ + list: make(map[id.Round]UncheckedRound, 0), + kv: kv, + } + + return urs, urs.save() + +} + +// Loads an deserializes a UncheckedRoundStore from memory +func LoadUncheckedStore(kv *versioned.KV) (*UncheckedRoundStore, error) { + + kv = kv.Prefix(uncheckedRoundPrefix) + vo, err := kv.Get(uncheckedRoundKey, uncheckedRoundVersion) + if err != nil { + return nil, err + } + + urs := &UncheckedRoundStore{ + list: make(map[id.Round]UncheckedRound), + kv: kv, + } + + err = urs.unmarshal(vo.Data) + if err != nil { + return nil, errors.WithMessage(err, "Failed to load rounds from storage") + } + + return urs, err +} + +// Adds a round to check on the list and saves to memory +func (s *UncheckedRoundStore) AddRound(ri *pb.RoundInfo, ephID ephemeral.Id, source *id.ID) error { + s.mux.Lock() + defer s.mux.Unlock() + rid := id.Round(ri.ID) + + if _, exists := s.list[rid]; !exists { + newUncheckedRound := UncheckedRound{ + Info: ri, + Identity: Identity{ + EpdId: ephID, + Source: source, + }, + LastCheck: netTime.Now(), + NumChecks: 0, + } + + s.list[rid] = newUncheckedRound + + return s.save() + } + + return nil +} + +// Retrieves an UncheckedRound from the map, if it exists +func (s *UncheckedRoundStore) GetRound(rid id.Round) (UncheckedRound, bool) { + s.mux.RLock() + defer s.mux.RUnlock() + rnd, exists := s.list[rid] + return rnd, exists +} + +// Retrieves the list of rounds +func (s *UncheckedRoundStore) GetList() map[id.Round]UncheckedRound { + s.mux.RLock() + defer s.mux.RUnlock() + return s.list +} + +// Increments the amount of checks performed on this stored round +func (s *UncheckedRoundStore) IncrementCheck(rid id.Round) error { + s.mux.Lock() + defer s.mux.Unlock() + rnd, exists := s.list[rid] + if !exists { + return errors.Errorf("round %d could not be found in RAM", rid) + } + + // If a round has been checked the maximum amount of times, + // we bail the round by removing it from store and no longer checking + if rnd.NumChecks >= maxChecks { + if err := s.remove(rid); err != nil { + return errors.WithMessagef(err, "Round %d reached maximum checks "+ + "but could not be removed", rid) + } + return nil + } + + // Update the rounds state + rnd.LastCheck = netTime.Now() + rnd.NumChecks++ + s.list[rid] = rnd + return s.save() +} + +// Remove deletes a round from UncheckedRoundStore's list and from storage +func (s *UncheckedRoundStore) Remove(rid id.Round) error { + s.mux.Lock() + defer s.mux.Unlock() + return s.remove(rid) +} + +// Remove is a helper function which removes the round from UncheckedRoundStore's list +// Note this method is unsafe and should only be used by methods with a lock +func (s *UncheckedRoundStore) remove(rid id.Round) error { + if _, exists := s.list[rid]; !exists { + return errors.Errorf("round %d does not exist in store", rid) + } + delete(s.list, rid) + return s.save() +} + +// save stores the information from the round list into storage +func (s *UncheckedRoundStore) save() error { + // Store list of rounds + data, err := s.marshal() + if err != nil { + return errors.WithMessagef(err, "Could not marshal data for unchecked rounds") + } + + // Create the versioned object + obj := &versioned.Object{ + Version: uncheckedRoundVersion, + Timestamp: netTime.Now(), + Data: data, + } + + // Save to storage + err = s.kv.Set(uncheckedRoundKey, uncheckedRoundVersion, obj) + if err != nil { + return errors.WithMessagef(err, "Could not store data for unchecked rounds") + } + + return nil +} + +// marshal is a helper function which serializes all rounds in list to bytes +func (s *UncheckedRoundStore) marshal() ([]byte, error) { + buf := bytes.NewBuffer(nil) + // Write number of rounds the buffer + b := make([]byte, 8) + binary.PutVarint(b, int64(len(s.list))) + buf.Write(b) + + for rid, rnd := range s.list { + rndData, err := rnd.marshal() + if err != nil { + return nil, errors.WithMessagef(err, "Failed to marshal round %d", rid) + } + + buf.Write(rndData) + + } + + return buf.Bytes(), nil +} + +// unmarshal deserializes an UncheckedRound from its stored byte data +func (s *UncheckedRoundStore) unmarshal(data []byte) error { + buff := bytes.NewBuffer(data) + // Get number of rounds in list + length, _ := binary.Varint(buff.Next(8)) + + for i := 0; i < int(length); i++ { + rnd := UncheckedRound{} + err := rnd.unmarshal(buff) + if err != nil { + return errors.WithMessage(err, "Failed to unmarshal rounds in storage") + } + + s.list[id.Round(rnd.Info.ID)] = rnd + } + + return nil +} diff --git a/storage/rounds/uncheckedRounds_test.go b/storage/rounds/uncheckedRounds_test.go new file mode 100644 index 0000000000000000000000000000000000000000..63e8c195f4f2ffa752979e5f5780acdc6d2d3f16 --- /dev/null +++ b/storage/rounds/uncheckedRounds_test.go @@ -0,0 +1,371 @@ +/////////////////////////////////////////////////////////////////////////////// +// Copyright © 2020 xx network SEZC // +// // +// Use of this source code is governed by a license that can be found in the // +// LICENSE file // +/////////////////////////////////////////////////////////////////////////////// + +package rounds + +import ( + "bytes" + "gitlab.com/elixxir/client/storage/versioned" + pb "gitlab.com/elixxir/comms/mixmessages" + "gitlab.com/elixxir/ekv" + "gitlab.com/xx_network/primitives/id" + "gitlab.com/xx_network/primitives/id/ephemeral" + "gitlab.com/xx_network/primitives/netTime" + "reflect" + "testing" +) + +// Unit test +func TestNewUncheckedStore(t *testing.T) { + kv := versioned.NewKV(make(ekv.Memstore)) + + testStore := &UncheckedRoundStore{ + list: make(map[id.Round]UncheckedRound), + kv: kv.Prefix(uncheckedRoundPrefix), + } + + store, err := NewUncheckedStore(kv) + if err != nil { + t.Fatalf("NewUncheckedStore error: "+ + "Could not create unchecked stor: %v", err) + } + + // Compare manually created object with NewUnknownRoundsStore + if !reflect.DeepEqual(testStore, store) { + t.Fatalf("NewUncheckedStore error: "+ + "Returned incorrect Store."+ + "\n\texpected: %+v\n\treceived: %+v", testStore, store) + } + + rid := id.Round(1) + roundInfo := &pb.RoundInfo{ + ID: uint64(rid), + } + uncheckedRound := UncheckedRound{ + Info: roundInfo, + LastCheck: netTime.Now(), + NumChecks: 0, + } + + store.list[rid] = uncheckedRound + if err = store.save(); err != nil { + t.Fatalf("NewUncheckedStore error: "+ + "Could not save store: %v", err) + } + + // Test if round list data matches + expectedRoundData, err := store.marshal() + if err != nil { + t.Fatalf("NewUncheckedStore error: "+ + "Could not marshal data: %v", err) + } + roundData, err := store.kv.Get(uncheckedRoundKey, uncheckedRoundVersion) + if err != nil { + t.Fatalf("NewUncheckedStore error: "+ + "Could not retrieve round list form storage: %v", err) + } + + if !bytes.Equal(expectedRoundData, roundData.Data) { + t.Fatalf("NewUncheckedStore error: "+ + "Data from store was not expected"+ + "\n\tExpected %v\n\tReceived: %v", expectedRoundData, roundData.Data) + } + +} + +// Unit test +func TestLoadUncheckedStore(t *testing.T) { + kv := versioned.NewKV(make(ekv.Memstore)) + + testStore, err := NewUncheckedStore(kv) + if err != nil { + t.Fatalf("LoadUncheckedStore error: "+ + "Could not call constructor NewUncheckedStore: %v", err) + } + + // Add round to store + rid := id.Round(0) + roundInfo := &pb.RoundInfo{ + ID: uint64(rid), + } + + ephId := ephemeral.Id{1, 2, 3, 4, 5, 6, 7, 8} + source := id.NewIdFromBytes([]byte("Sauron"), t) + err = testStore.AddRound(roundInfo, ephId, source) + if err != nil { + t.Fatalf("LoadUncheckedStore error: "+ + "Could not add round to store: %v", err) + } + + // Load store + loadedStore, err := LoadUncheckedStore(kv) + if err != nil { + t.Fatalf("LoadUncheckedStore error: "+ + "Could not call LoadUncheckedStore: %v", err) + } + + // Check if round is in loaded store + rnd, exists := loadedStore.list[rid] + if !exists { + t.Fatalf("LoadUncheckedStore error: "+ + "Added round %d not found in loaded store", rid) + } + + // Check if set values are expected + if !bytes.Equal(rnd.EpdId[:], ephId[:]) || + !source.Cmp(rnd.Source) { + t.Fatalf("LoadUncheckedStore error: "+ + "Values in loaded round %d are not expected."+ + "\n\tExpected ephemeral: %v"+ + "\n\tReceived ephemeral: %v"+ + "\n\tExpected source: %v"+ + "\n\tReceived source: %v", rid, + ephId, rnd.EpdId, + source, rnd.Source) + } + +} + +// Unit test +func TestUncheckedRoundStore_AddRound(t *testing.T) { + kv := versioned.NewKV(make(ekv.Memstore)) + + testStore, err := NewUncheckedStore(kv) + if err != nil { + t.Fatalf("AddRound error: "+ + "Could not call constructor NewUncheckedStore: %v", err) + } + + // Add round to store + rid := id.Round(0) + roundInfo := &pb.RoundInfo{ + ID: uint64(rid), + } + ephId := ephemeral.Id{1, 2, 3, 4, 5, 6, 7, 8} + source := id.NewIdFromBytes([]byte("Sauron"), t) + err = testStore.AddRound(roundInfo, ephId, source) + if err != nil { + t.Fatalf("AddRound error: "+ + "Could not add round to store: %v", err) + } + + if _, exists := testStore.list[rid]; !exists { + t.Errorf("AddRound error: " + + "Could not find added round in list") + } + +} + +// Unit test +func TestUncheckedRoundStore_GetRound(t *testing.T) { + kv := versioned.NewKV(make(ekv.Memstore)) + + testStore, err := NewUncheckedStore(kv) + if err != nil { + t.Fatalf("GetRound error: "+ + "Could not call constructor NewUncheckedStore: %v", err) + } + + // Add round to store + rid := id.Round(0) + roundInfo := &pb.RoundInfo{ + ID: uint64(rid), + } + ephId := ephemeral.Id{1, 2, 3, 4, 5, 6, 7, 8} + source := id.NewIdFromBytes([]byte("Sauron"), t) + err = testStore.AddRound(roundInfo, ephId, source) + if err != nil { + t.Fatalf("GetRound error: "+ + "Could not add round to store: %v", err) + } + + // Retrieve round that was inserted + retrievedRound, exists := testStore.GetRound(rid) + if !exists { + t.Fatalf("GetRound error: " + + "Could not get round from store") + } + + if !bytes.Equal(retrievedRound.EpdId[:], ephId[:]) || + !source.Cmp(retrievedRound.Source) { + t.Fatalf("GetRound error: "+ + "Values in loaded round %d are not expected."+ + "\n\tExpected ephemeral: %v"+ + "\n\tReceived ephemeral: %v"+ + "\n\tExpected source: %v"+ + "\n\tReceived source: %v", rid, + ephId, retrievedRound.EpdId, + source, retrievedRound.Source) + } + + // Try to pull unknown round from store + unknownRound := id.Round(1) + _, exists = testStore.GetRound(unknownRound) + if exists { + t.Fatalf("GetRound error: " + + "Should not find unknown round in store.") + } + +} + +// Unit test +func TestUncheckedRoundStore_GetList(t *testing.T) { + kv := versioned.NewKV(make(ekv.Memstore)) + + testStore, err := NewUncheckedStore(kv) + if err != nil { + t.Fatalf("GetList error: "+ + "Could not call constructor NewUncheckedStore: %v", err) + } + + // Add rounds to store + numRounds := 10 + for i := 0; i < numRounds; i++ { + rid := id.Round(i) + roundInfo := &pb.RoundInfo{ + ID: uint64(rid), + } + ephId := ephemeral.Id{1, 2, 3, 4, 5, 6, 7, 8} + source := id.NewIdFromUInt(uint64(i), id.User, t) + err = testStore.AddRound(roundInfo, ephId, source) + if err != nil { + t.Errorf("GetList error: "+ + "Could not add round to store: %v", err) + } + } + + // Retrieve list + retrievedList := testStore.GetList() + if len(retrievedList) != numRounds { + t.Errorf("GetList error: "+ + "List returned is not of expected size."+ + "\n\tExpected: %v\n\tReceived: %v", numRounds, len(retrievedList)) + } + + for i := 0; i < numRounds; i++ { + rid := id.Round(i) + if _, exists := retrievedList[rid]; !exists { + t.Errorf("GetList error: "+ + "Retrieved list does not contain expected round %d.", rid) + } + } + +} + +// Unit test +func TestUncheckedRoundStore_IncrementCheck(t *testing.T) { + kv := versioned.NewKV(make(ekv.Memstore)) + + testStore, err := NewUncheckedStore(kv) + if err != nil { + t.Fatalf("IncrementCheck error: "+ + "Could not call constructor NewUncheckedStore: %v", err) + } + + // Add rounds to store + numRounds := 10 + for i := 0; i < numRounds; i++ { + rid := id.Round(i) + roundInfo := &pb.RoundInfo{ + ID: uint64(rid), + } + ephId := ephemeral.Id{1, 2, 3, 4, 5, 6, 7, 8} + source := id.NewIdFromUInt(uint64(i), id.User, t) + err = testStore.AddRound(roundInfo, ephId, source) + if err != nil { + t.Errorf("IncrementCheck error: "+ + "Could not add round to store: %v", err) + } + } + + testRound := id.Round(3) + numChecks := 4 + for i := 0; i < numChecks; i++ { + err = testStore.IncrementCheck(testRound) + if err != nil { + t.Errorf("IncrementCheck error: "+ + "Could not increment check for round %d: %v", testRound, err) + } + } + + rnd, _ := testStore.GetRound(testRound) + if rnd.NumChecks != uint64(numChecks) { + t.Errorf("IncrementCheck error: "+ + "Round %d did not have expected number of checks."+ + "\n\tExpected: %v\n\tReceived: %v", testRound, numChecks, rnd.NumChecks) + } + + // Error path: check unknown round can not be incremented + unknownRound := id.Round(numRounds + 5) + err = testStore.IncrementCheck(unknownRound) + if err == nil { + t.Errorf("IncrementCheck error: "+ + "Should not find round %d which was not added to store", unknownRound) + } + + // Reach max checks, ensure that round is removed + maxRound := id.Round(7) + for i := 0; i < maxChecks+1; i++ { + err = testStore.IncrementCheck(maxRound) + if err != nil { + t.Errorf("IncrementCheck error: "+ + "Could not increment check for round %d: %v", maxRound, err) + } + + } + +} + +// Unit test +func TestUncheckedRoundStore_Remove(t *testing.T) { + kv := versioned.NewKV(make(ekv.Memstore)) + testStore, err := NewUncheckedStore(kv) + if err != nil { + t.Fatalf("Remove error: "+ + "Could not call constructor NewUncheckedStore: %v", err) + } + + // Add rounds to store + numRounds := 10 + for i := 0; i < numRounds; i++ { + rid := id.Round(i) + roundInfo := &pb.RoundInfo{ + ID: uint64(rid), + } + ephId := ephemeral.Id{1, 2, 3, 4, 5, 6, 7, 8} + source := id.NewIdFromUInt(uint64(i), id.User, t) + err = testStore.AddRound(roundInfo, ephId, source) + if err != nil { + t.Errorf("Remove error: "+ + "Could not add round to store: %v", err) + } + } + + // Remove round from storage + removedRound := id.Round(1) + err = testStore.Remove(removedRound) + if err != nil { + t.Errorf("Remove error: "+ + "Could not removed round %d from storage: %v", removedRound, err) + } + + // Check that round was removed + _, exists := testStore.GetRound(removedRound) + if exists { + t.Errorf("Remove error: "+ + "Round %d expected to be removed from storage", removedRound) + } + + // Error path: attempt to remove unknown round + unknownRound := id.Round(numRounds + 5) + err = testStore.Remove(unknownRound) + if err == nil { + t.Errorf("Remove error: "+ + "Should not removed round %d which is not in storage", unknownRound) + } + +} diff --git a/storage/session.go b/storage/session.go index febb8d760fc8fc22d36ac3429b6014a2641a8e55..0277166a98b9dd51f06509efb6f35e514aae249d 100644 --- a/storage/session.go +++ b/storage/session.go @@ -10,6 +10,7 @@ package storage import ( + "gitlab.com/elixxir/client/storage/rounds" "sync" "testing" "time" @@ -63,6 +64,7 @@ type Session struct { garbledMessages *utility.MeteredCmixMessageBuffer reception *reception.Store clientVersion *clientVersion.Store + uncheckedRounds *rounds.UncheckedRoundStore } // Initialize a new Session object @@ -142,6 +144,10 @@ func New(baseDir, password string, u userInterface.User, currentVersion version. return nil, errors.WithMessage(err, "Failed to create client version store.") } + s.uncheckedRounds, err = rounds.NewUncheckedStore(s.kv) + if err != nil { + return nil, errors.WithMessage(err, "Failed to create unchecked round store") + } return s, nil } @@ -213,6 +219,11 @@ func Load(baseDir, password string, currentVersion version.Version, s.reception = reception.LoadStore(s.kv) + s.uncheckedRounds, err = rounds.LoadUncheckedStore(s.kv) + if err != nil { + return nil, errors.WithMessage(err, "Failed to load unchecked round store") + } + return s, nil } @@ -283,6 +294,12 @@ func (s *Session) Partition() *partition.Store { return s.partition } +func (s *Session) UncheckedRounds() *rounds.UncheckedRoundStore { + s.mux.RLock() + defer s.mux.RUnlock() + return s.uncheckedRounds +} + // Get an object from the session func (s *Session) Get(key string) (*versioned.Object, error) { return s.kv.Get(key, currentSessionVersion) @@ -373,5 +390,10 @@ func InitTestingSession(i interface{}) *Session { s.reception = reception.NewStore(s.kv) + s.uncheckedRounds, err = rounds.NewUncheckedStore(s.kv) + if err != nil { + jww.FATAL.Panicf("Failed to create uncheckRound store: %v", err) + } + return s }