Skip to content
Snippets Groups Projects
Commit 20e1c77f authored by Jono Wenger's avatar Jono Wenger
Browse files

Fix tests for network/rounds

parent f7e09f0c
No related branches found
No related tags found
3 merge requests!510Release,!207WIP: Client Restructure,!203Symmetric broadcast
...@@ -9,38 +9,40 @@ package rounds ...@@ -9,38 +9,40 @@ package rounds
import ( import (
jww "github.com/spf13/jwalterweatherman" jww "github.com/spf13/jwalterweatherman"
"gitlab.com/elixxir/client/interfaces" "gitlab.com/elixxir/client/network/identity/receptionID"
pb "gitlab.com/elixxir/comms/mixmessages" pb "gitlab.com/elixxir/comms/mixmessages"
"gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/id"
) )
func (m *manager) GetMessagesFromRound(roundID id.Round, identity interfaces.EphemeralIdentity) { func (m *manager) GetMessagesFromRound(roundID id.Round, identity receptionID.EphemeralIdentity) {
//get the round from the in ram store // Get the round from the in-RAM store
ri, err := m.instance.GetRound(roundID) ri, err := m.instance.GetRound(roundID)
// If we didn't find it, send to Historical Rounds Retrieval // If we did not find it, then send to Historical Rounds Retrieval
if err != nil || m.params.ForceHistoricalRounds { if err != nil || m.params.ForceHistoricalRounds {
// store the round as an unretreived round without a round info // Store the round as an un-retrieved round without a round info
// This will silently do nothing if the round is // This will silently do nothing if the round is
err = m.unchecked.AddRound(roundID, nil, err = m.unchecked.AddRound(roundID, nil,
identity.Source, identity.EphId) identity.Source, identity.EphId)
if err != nil { if err != nil {
jww.FATAL.Panicf("Failed to denote Unchecked Round for round %d", roundID) jww.FATAL.Panicf(
"Failed to denote Unchecked Round for round %d", roundID)
} }
if m.params.ForceHistoricalRounds { if m.params.ForceHistoricalRounds {
jww.WARN.Printf("Forcing use of historical rounds for round ID %d.", jww.WARN.Printf(
roundID) "Forcing use of historical rounds for round ID %d.", roundID)
} }
jww.INFO.Printf("Messages found in round %d for %d (%s), looking "+ jww.INFO.Printf("Messages found in round %d for %d (%s), looking "+
"up messages via historical lookup", roundID, identity.EphId.Int64(), "up messages via historical lookup", roundID, identity.EphId.Int64(),
identity.Source) identity.Source)
err = m.historical.LookupHistoricalRound(roundID, func(info *pb.RoundInfo, success bool) { err = m.historical.LookupHistoricalRound(
roundID, func(info *pb.RoundInfo, success bool) {
if !success { if !success {
// TODO: implement me
} }
// If found, send to Message Retrieval Workers // If found, send to Message Retrieval Workers
m.lookupRoundMessages <- roundLookup{ m.lookupRoundMessages <- roundLookup{
...@@ -48,16 +50,19 @@ func (m *manager) GetMessagesFromRound(roundID id.Round, identity interfaces.Eph ...@@ -48,16 +50,19 @@ func (m *manager) GetMessagesFromRound(roundID id.Round, identity interfaces.Eph
Identity: identity, Identity: identity,
} }
}) })
} else { // if we did find it, send it to the round pickup thread } else {
// If we did find it, send it to the round pickup thread
jww.INFO.Printf("Messages found in round %d for %d (%s), looking "+ jww.INFO.Printf("Messages found in round %d for %d (%s), looking "+
"up messages via in ram lookup", roundID, identity.EphId.Int64(), "up messages via in ram lookup", roundID, identity.EphId.Int64(),
identity.Source) identity.Source)
//store the round as an unretreived round
// store the round as an un-retrieved round
if !m.params.RealtimeOnly { if !m.params.RealtimeOnly {
err = m.unchecked.AddRound(roundID, ri, err = m.unchecked.AddRound(roundID, ri,
identity.Source, identity.EphId) identity.Source, identity.EphId)
if err != nil { if err != nil {
jww.FATAL.Panicf("Failed to denote Unchecked Round for round %d", roundID) jww.FATAL.Panicf(
"Failed to denote Unchecked Round for round %d", roundID)
} }
} }
......
...@@ -45,8 +45,9 @@ type manager struct { ...@@ -45,8 +45,9 @@ type manager struct {
} }
func NewPickup(params Params, bundles chan<- message.Bundle, func NewPickup(params Params, bundles chan<- message.Bundle,
sender gateway.Sender, historical historical.Retriever, rng *fastRNG.StreamGenerator, sender gateway.Sender, historical historical.Retriever,
instance RoundGetter, session storage.Session) Pickup { rng *fastRNG.StreamGenerator, instance RoundGetter,
session storage.Session) Pickup {
unchecked := store.NewOrLoadUncheckedStore(session.GetKV()) unchecked := store.NewOrLoadUncheckedStore(session.GetKV())
m := &manager{ m := &manager{
params: params, params: params,
...@@ -67,9 +68,10 @@ func (m *manager) StartProcessors() stoppable.Stoppable { ...@@ -67,9 +68,10 @@ func (m *manager) StartProcessors() stoppable.Stoppable {
multi := stoppable.NewMulti("Rounds") multi := stoppable.NewMulti("Rounds")
//start the message retrieval worker pool // Start the message retrieval worker pool
for i := uint(0); i < m.params.NumMessageRetrievalWorkers; i++ { for i := uint(0); i < m.params.NumMessageRetrievalWorkers; i++ {
stopper := stoppable.NewSingle("Message Retriever " + strconv.Itoa(int(i))) stopper := stoppable.NewSingle(
"Message Retriever " + strconv.Itoa(int(i)))
go m.processMessageRetrieval(m.comms, stopper) go m.processMessageRetrieval(m.comms, stopper)
multi.Add(stopper) multi.Add(stopper)
} }
...@@ -77,7 +79,8 @@ func (m *manager) StartProcessors() stoppable.Stoppable { ...@@ -77,7 +79,8 @@ func (m *manager) StartProcessors() stoppable.Stoppable {
// Start the periodic unchecked round worker // Start the periodic unchecked round worker
if !m.params.RealtimeOnly { if !m.params.RealtimeOnly {
stopper := stoppable.NewSingle("UncheckRound") stopper := stoppable.NewSingle("UncheckRound")
go m.processUncheckedRounds(m.params.UncheckRoundPeriod, backOffTable, stopper) go m.processUncheckedRounds(
m.params.UncheckRoundPeriod, backOffTable, stopper)
multi.Add(stopper) multi.Add(stopper)
} }
......
...@@ -12,8 +12,8 @@ type Params struct { ...@@ -12,8 +12,8 @@ type Params struct {
// Maximum number of times a historical round lookup will be attempted // Maximum number of times a historical round lookup will be attempted
MaxHistoricalRoundsRetries uint MaxHistoricalRoundsRetries uint
// Interval between checking for rounds in UncheckedRoundStore // Interval between checking for rounds in UncheckedRoundStore due for a
// due for a message retrieval retry // message retrieval retry
UncheckRoundPeriod time.Duration UncheckRoundPeriod time.Duration
// Toggles if message pickup retrying mechanism if forced // Toggles if message pickup retrying mechanism if forced
...@@ -24,7 +24,7 @@ type Params struct { ...@@ -24,7 +24,7 @@ type Params struct {
// tried // tried
SendTimeout time.Duration SendTimeout time.Duration
//disables all attempts to pick up dropped or missed messages // Disables all attempts to pick up dropped or missed messages
RealtimeOnly bool RealtimeOnly bool
// Toggles if historical rounds should always be used // Toggles if historical rounds should always be used
......
...@@ -25,8 +25,8 @@ import ( ...@@ -25,8 +25,8 @@ import (
type MessageRetrievalComms interface { type MessageRetrievalComms interface {
GetHost(hostId *id.ID) (*connect.Host, bool) GetHost(hostId *id.ID) (*connect.Host, bool)
RequestMessages(host *connect.Host, RequestMessages(host *connect.Host, message *pb.GetMessages) (
message *pb.GetMessages) (*pb.GetMessagesResponse, error) *pb.GetMessagesResponse, error)
} }
type roundLookup struct { type roundLookup struct {
...@@ -37,7 +37,7 @@ type roundLookup struct { ...@@ -37,7 +37,7 @@ type roundLookup struct {
const noRoundError = "does not have round %d" const noRoundError = "does not have round %d"
// processMessageRetrieval received a roundLookup request and pings the gateways // processMessageRetrieval received a roundLookup request and pings the gateways
// of that round for messages for the requested Identity in the roundLookup // of that round for messages for the requested Identity in the roundLookup.
func (m *manager) processMessageRetrieval(comms MessageRetrievalComms, func (m *manager) processMessageRetrieval(comms MessageRetrievalComms,
stop *stoppable.Single) { stop *stoppable.Single) {
...@@ -49,11 +49,14 @@ func (m *manager) processMessageRetrieval(comms MessageRetrievalComms, ...@@ -49,11 +49,14 @@ func (m *manager) processMessageRetrieval(comms MessageRetrievalComms,
case rl := <-m.lookupRoundMessages: case rl := <-m.lookupRoundMessages:
ri := rl.RoundInfo ri := rl.RoundInfo
jww.DEBUG.Printf("Checking for messages in round %d", ri.ID) jww.DEBUG.Printf("Checking for messages in round %d", ri.ID)
if !m.params.RealtimeOnly { if !m.params.RealtimeOnly {
err := m.unchecked.AddRound(id.Round(ri.ID), ri, err := m.unchecked.AddRound(id.Round(ri.ID), ri,
rl.Identity.Source, rl.Identity.EphId) rl.Identity.Source, rl.Identity.EphId)
if err != nil { if err != nil {
jww.FATAL.Panicf("Failed to denote Unchecked Round for round %d", id.Round(ri.ID)) jww.FATAL.Panicf(
"Failed to denote Unchecked Round for round %d",
id.Round(ri.ID))
} }
} }
...@@ -62,37 +65,41 @@ func (m *manager) processMessageRetrieval(comms MessageRetrievalComms, ...@@ -62,37 +65,41 @@ func (m *manager) processMessageRetrieval(comms MessageRetrievalComms,
for i, idBytes := range ri.Topology { for i, idBytes := range ri.Topology {
gwId, err := id.Unmarshal(idBytes) gwId, err := id.Unmarshal(idBytes)
if err != nil { if err != nil {
jww.FATAL.Panicf("processMessageRetrieval: Unable to unmarshal: %+v", err) jww.FATAL.Panicf(
"processMessageRetrieval: Unable to unmarshal: %+v", err)
} }
gwId.SetType(id.Gateway) gwId.SetType(id.Gateway)
gwIds[i] = gwId gwIds[i] = gwId
} }
if len(gwIds) == 0 { if len(gwIds) == 0 {
jww.WARN.Printf("Empty gateway ID List") jww.WARN.Printf("Empty gateway ID List")
continue continue
} }
// Target the last nodes in the team first because it has
// messages first, randomize other members of the team // Target the last nodes in the team first because it has messages
// first, randomize other members of the team
var rndBytes [32]byte var rndBytes [32]byte
stream := m.rng.GetStream() stream := m.rng.GetStream()
_, err := stream.Read(rndBytes[:]) _, err := stream.Read(rndBytes[:])
stream.Close() stream.Close()
if err != nil { if err != nil {
jww.FATAL.Panicf("Failed to randomize shuffle in round %d "+ jww.FATAL.Panicf("Failed to randomize shuffle in round %d "+
"from all gateways (%v): %s", "from all gateways (%v): %s", ri.ID, gwIds, err)
id.Round(ri.ID), gwIds, err)
} }
gwIds[0], gwIds[len(gwIds)-1] = gwIds[len(gwIds)-1], gwIds[0] gwIds[0], gwIds[len(gwIds)-1] = gwIds[len(gwIds)-1], gwIds[0]
shuffle.ShuffleSwap(rndBytes[:], len(gwIds)-1, func(i, j int) { shuffle.ShuffleSwap(rndBytes[:], len(gwIds)-1, func(i, j int) {
gwIds[i+1], gwIds[j+1] = gwIds[j+1], gwIds[i+1] gwIds[i+1], gwIds[j+1] = gwIds[j+1], gwIds[i+1]
}) })
// If ForceMessagePickupRetry, we are forcing processUncheckedRounds by // If ForceMessagePickupRetry, we are forcing processUncheckedRounds
// randomly not picking up messages (FOR INTEGRATION TEST). Only done if // by randomly not picking up messages (FOR INTEGRATION TEST). Only
// round has not been ignored before // done if round has not been ignored before.
var bundle message.Bundle var bundle message.Bundle
if m.params.ForceMessagePickupRetry { if m.params.ForceMessagePickupRetry {
bundle, err = m.forceMessagePickupRetry(ri, rl, comms, gwIds, stop) bundle, err = m.forceMessagePickupRetry(
ri, rl, comms, gwIds, stop)
// Exit if the thread has been stopped // Exit if the thread has been stopped
if stoppable.CheckErr(err) { if stoppable.CheckErr(err) {
...@@ -100,13 +107,13 @@ func (m *manager) processMessageRetrieval(comms MessageRetrievalComms, ...@@ -100,13 +107,13 @@ func (m *manager) processMessageRetrieval(comms MessageRetrievalComms,
continue continue
} }
if err != nil { if err != nil {
jww.ERROR.Printf("Failed to get pickup round %d "+ jww.ERROR.Printf("Failed to get pickup round %d from all "+
"from all gateways (%v): %s", "gateways (%v): %s", ri.ID, gwIds, err)
id.Round(ri.ID), gwIds, err)
} }
} else { } else {
// Attempt to request for this gateway // Attempt to request for this gateway
bundle, err = m.getMessagesFromGateway(id.Round(ri.ID), rl.Identity, comms, gwIds, stop) bundle, err = m.getMessagesFromGateway(
id.Round(ri.ID), rl.Identity, comms, gwIds, stop)
// Exit if the thread has been stopped // Exit if the thread has been stopped
if stoppable.CheckErr(err) { if stoppable.CheckErr(err) {
...@@ -114,18 +121,20 @@ func (m *manager) processMessageRetrieval(comms MessageRetrievalComms, ...@@ -114,18 +121,20 @@ func (m *manager) processMessageRetrieval(comms MessageRetrievalComms,
continue continue
} }
// After trying all gateways, if none returned we mark the round as a // After trying all gateways, if none returned we mark the round
// failure and print out the last error // as a failure and print out the last error
if err != nil { if err != nil {
jww.ERROR.Printf("Failed to get pickup round %d "+ jww.ERROR.Printf("Failed to get pickup round %d "+
"from all gateways (%v): %s", "from all gateways (%v): %s", ri.ID, gwIds, err)
id.Round(ri.ID), gwIds, err)
} }
} }
jww.DEBUG.Printf("messages: %v\n", bundle.Messages)
if len(bundle.Messages) != 0 { if len(bundle.Messages) != 0 {
// If successful and there are messages, we send them to another thread // If successful and there are messages, we send them to another
// thread
bundle.Identity = receptionID.EphemeralIdentity{ bundle.Identity = receptionID.EphemeralIdentity{
EphId: rl.Identity.EphId, EphId: rl.Identity.EphId,
Source: rl.Identity.Source, Source: rl.Identity.Source,
...@@ -135,10 +144,11 @@ func (m *manager) processMessageRetrieval(comms MessageRetrievalComms, ...@@ -135,10 +144,11 @@ func (m *manager) processMessageRetrieval(comms MessageRetrievalComms,
jww.DEBUG.Printf("Removing round %d from unchecked store", ri.ID) jww.DEBUG.Printf("Removing round %d from unchecked store", ri.ID)
if !m.params.RealtimeOnly { if !m.params.RealtimeOnly {
err = m.unchecked.Remove(id.Round(ri.ID), rl.Identity.Source, rl.Identity.EphId) err = m.unchecked.Remove(
id.Round(ri.ID), rl.Identity.Source, rl.Identity.EphId)
if err != nil { if err != nil {
jww.ERROR.Printf("Could not remove round %d "+ jww.ERROR.Printf("Could not remove round %d from "+
"from unchecked rounds store: %v", ri.ID, err) "unchecked rounds store: %v", ri.ID, err)
} }
} }
...@@ -148,16 +158,18 @@ func (m *manager) processMessageRetrieval(comms MessageRetrievalComms, ...@@ -148,16 +158,18 @@ func (m *manager) processMessageRetrieval(comms MessageRetrievalComms,
} }
} }
// getMessagesFromGateway attempts to get messages from their assigned // getMessagesFromGateway attempts to get messages from their assigned gateway
// gateway host in the round specified. If successful // host in the round specified. If successful
func (m *manager) getMessagesFromGateway(roundID id.Round, func (m *manager) getMessagesFromGateway(roundID id.Round,
identity receptionID.EphemeralIdentity, comms MessageRetrievalComms, gwIds []*id.ID, identity receptionID.EphemeralIdentity, comms MessageRetrievalComms, gwIds []*id.ID,
stop *stoppable.Single) (message.Bundle, error) { stop *stoppable.Single) (message.Bundle, error) {
start := time.Now() start := time.Now()
// Send to the gateways using backup proxies // Send to the gateways using backup proxies
result, err := m.sender.SendToPreferred(gwIds, func(host *connect.Host, target *id.ID, _ time.Duration) (interface{}, error) { result, err := m.sender.SendToPreferred(gwIds,
jww.DEBUG.Printf("Trying to get messages for round %v for ephemeralID %d (%v) "+ func(host *connect.Host, target *id.ID, _ time.Duration) (interface{}, error) {
"via Gateway: %s", roundID, identity.EphId.Int64(), identity.Source.String(), host.GetId()) jww.DEBUG.Printf("Trying to get messages for round %d for "+
"ephemeralID %d (%s) via Gateway: %s", roundID,
identity.EphId.Int64(), identity.Source, host.GetId())
// send the request // send the request
msgReq := &pb.GetMessages{ msgReq := &pb.GetMessages{
...@@ -166,31 +178,35 @@ func (m *manager) getMessagesFromGateway(roundID id.Round, ...@@ -166,31 +178,35 @@ func (m *manager) getMessagesFromGateway(roundID id.Round,
Target: target.Marshal(), Target: target.Marshal(),
} }
// If the gateway doesnt have the round, return an error // If the gateway doesn't have the round, return an error
msgResp, err := comms.RequestMessages(host, msgReq) msgResp, err := comms.RequestMessages(host, msgReq)
if err != nil { if err != nil {
// you need to default to a retryable errors because otherwise we cannot enumerate all errors // You need to default to a retryable errors because otherwise
// we cannot enumerate all errors
return nil, errors.WithMessage(err, gateway.RetryableError) return nil, errors.WithMessage(err, gateway.RetryableError)
} }
if !msgResp.GetHasRound() { if !msgResp.GetHasRound() {
errRtn := errors.Errorf(noRoundError, roundID) errRtn := errors.Errorf(noRoundError, roundID)
return message.Bundle{}, errors.WithMessage(errRtn, gateway.RetryableError) return message.Bundle{},
errors.WithMessage(errRtn, gateway.RetryableError)
} }
return msgResp, nil return msgResp, nil
}, stop, m.params.SendTimeout) }, stop, m.params.SendTimeout)
jww.INFO.Printf("Received message for round %d, processing...", roundID) jww.INFO.Printf("Received message for round %d, processing...", roundID)
// Fail the round if an error occurs so it can be tried again later
// Fail the round if an error occurs so that it can be tried again later
if err != nil { if err != nil {
return message.Bundle{}, errors.WithMessagef(err, "Failed to "+ return message.Bundle{}, errors.WithMessagef(
"request messages for round %d", roundID) err, "Failed to request messages for round %d", roundID)
} }
msgResp := result.(*pb.GetMessagesResponse) msgResp := result.(*pb.GetMessagesResponse)
// If there are no messages print a warning. Due to the probabilistic nature // If there are no messages, print a warning. Due to the probabilistic
// of the bloom filters, false positives will happen sometimes // nature of the bloom filters, false positives will happen sometimes
msgs := msgResp.GetMessages() msgs := msgResp.GetMessages()
if msgs == nil || len(msgs) == 0 { if msgs == nil || len(msgs) == 0 {
jww.WARN.Printf("no messages for client %s "+ jww.WARN.Printf("no messages for client %s "+
...@@ -207,10 +223,11 @@ func (m *manager) getMessagesFromGateway(roundID id.Round, ...@@ -207,10 +223,11 @@ func (m *manager) getMessagesFromGateway(roundID id.Round,
return message.Bundle{}, nil return message.Bundle{}, nil
} }
jww.INFO.Printf("Received %d messages in Round %v for %d (%s) in %s", jww.INFO.Printf("Received %d messages in Round %d for %d (%s) in %s",
len(msgs), roundID, identity.EphId.Int64(), identity.Source, time.Now().Sub(start)) len(msgs), roundID, identity.EphId.Int64(), identity.Source,
time.Now().Sub(start))
// build the bundle of messages to send to the message processor // Build the bundle of messages to send to the message processor
bundle := message.Bundle{ bundle := message.Bundle{
Round: roundID, Round: roundID,
Messages: make([]format.Message, len(msgs)), Messages: make([]format.Message, len(msgs)),
...@@ -230,31 +247,34 @@ func (m *manager) getMessagesFromGateway(roundID id.Round, ...@@ -230,31 +247,34 @@ func (m *manager) getMessagesFromGateway(roundID id.Round,
} }
// Helper function which forces processUncheckedRounds by randomly // Helper function which forces processUncheckedRounds by randomly not looking
// not looking up messages // up messages.
func (m *manager) forceMessagePickupRetry(ri *pb.RoundInfo, rl roundLookup, func (m *manager) forceMessagePickupRetry(ri *pb.RoundInfo, rl roundLookup,
comms MessageRetrievalComms, gwIds []*id.ID, comms MessageRetrievalComms, gwIds []*id.ID,
stop *stoppable.Single) (bundle message.Bundle, err error) { stop *stoppable.Single) (bundle message.Bundle, err error) {
rnd, _ := m.unchecked.GetRound(id.Round(ri.ID), rl.Identity.Source, rl.Identity.EphId) rnd, _ := m.unchecked.GetRound(
id.Round(ri.ID), rl.Identity.Source, rl.Identity.EphId)
if rnd.NumChecks == 0 { if rnd.NumChecks == 0 {
// Flip a coin to determine whether to pick up message // Flip a coin to determine whether to pick up message
stream := m.rng.GetStream()
defer stream.Close()
b := make([]byte, 8) b := make([]byte, 8)
stream := m.rng.GetStream()
_, err = stream.Read(b) _, err = stream.Read(b)
if err != nil { if err != nil {
jww.FATAL.Panic(err.Error()) jww.FATAL.Panic(err)
} }
stream.Close()
result := binary.BigEndian.Uint64(b) result := binary.BigEndian.Uint64(b)
if result%2 == 0 { if result%2 == 0 {
jww.INFO.Printf("Forcing a message pickup retry for round %d", ri.ID) jww.INFO.Printf("Forcing a message pickup retry for round %d", ri.ID)
// Do not call get message, leaving the round to be picked up // Do not call get message, leaving the round to be picked up in
// in unchecked round scheduler process // unchecked round scheduler process
return return
} }
} }
// Attempt to request for this gateway // Attempt to request for this gateway
return m.getMessagesFromGateway(id.Round(ri.ID), rl.Identity, comms, gwIds, stop) return m.getMessagesFromGateway(
id.Round(ri.ID), rl.Identity, comms, gwIds, stop)
} }
...@@ -14,6 +14,7 @@ import ( ...@@ -14,6 +14,7 @@ import (
"gitlab.com/elixxir/client/stoppable" "gitlab.com/elixxir/client/stoppable"
pb "gitlab.com/elixxir/comms/mixmessages" pb "gitlab.com/elixxir/comms/mixmessages"
"gitlab.com/elixxir/crypto/fastRNG" "gitlab.com/elixxir/crypto/fastRNG"
"gitlab.com/xx_network/comms/connect"
"gitlab.com/xx_network/crypto/csprng" "gitlab.com/xx_network/crypto/csprng"
"gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/id"
"gitlab.com/xx_network/primitives/id/ephemeral" "gitlab.com/xx_network/primitives/id/ephemeral"
...@@ -26,6 +27,7 @@ import ( ...@@ -26,6 +27,7 @@ import (
// Happy path // Happy path
func TestManager_ProcessMessageRetrieval(t *testing.T) { func TestManager_ProcessMessageRetrieval(t *testing.T) {
// General initializations // General initializations
connect.TestingOnlyDisableTLS = true
testManager := newManager(t) testManager := newManager(t)
roundId := id.Round(5) roundId := id.Round(5)
mockComms := &mockMessageRetrievalComms{testingSignature: t} mockComms := &mockMessageRetrievalComms{testingSignature: t}
...@@ -139,8 +141,8 @@ func TestManager_ProcessMessageRetrieval_NoRound(t *testing.T) { ...@@ -139,8 +141,8 @@ func TestManager_ProcessMessageRetrieval_NoRound(t *testing.T) {
testNdf, mockComms, testManager.session, nil) testNdf, mockComms, testManager.session, nil)
stop := stoppable.NewSingle("singleStoppable") stop := stoppable.NewSingle("singleStoppable")
// Create a local channel so reception is possible (testManager.messageBundles is // Create a local channel so reception is possible
// send only via newManager call above) // (testManager.messageBundles is sent only via newManager call above)
messageBundleChan := make(chan message.Bundle) messageBundleChan := make(chan message.Bundle)
testManager.messageBundles = messageBundleChan testManager.messageBundles = messageBundleChan
......
...@@ -31,7 +31,7 @@ func newRoundIdentity(rid id.Round, recipient *id.ID, ephID ephemeral.Id) roundI ...@@ -31,7 +31,7 @@ func newRoundIdentity(rid id.Round, recipient *id.ID, ephID ephemeral.Id) roundI
} }
// String prints a base 64 string representation of roundIdentity. This function // String prints a base 64 string representation of roundIdentity. This function
// satisfies the fmt.Stringer interface. // adheres to the fmt.Stringer interface.
func (ri roundIdentity) String() string { func (ri roundIdentity) String() string {
return base64.StdEncoding.EncodeToString(ri[:]) return base64.StdEncoding.EncodeToString(ri[:])
} }
......
...@@ -33,7 +33,7 @@ func NewUncheckedStore(kv *versioned.KV) (*UncheckedRoundStore, error) { ...@@ -33,7 +33,7 @@ func NewUncheckedStore(kv *versioned.KV) (*UncheckedRoundStore, error) {
return urs, urs.save() return urs, urs.save()
} }
// NewUncheckedStore is a constructor for a UncheckedRoundStore. // NewOrLoadUncheckedStore is a constructor for a UncheckedRoundStore.
func NewOrLoadUncheckedStore(kv *versioned.KV) *UncheckedRoundStore { func NewOrLoadUncheckedStore(kv *versioned.KV) *UncheckedRoundStore {
kv = kv.Prefix(uncheckedRoundPrefix) kv = kv.Prefix(uncheckedRoundPrefix)
...@@ -48,7 +48,7 @@ func NewOrLoadUncheckedStore(kv *versioned.KV) *UncheckedRoundStore { ...@@ -48,7 +48,7 @@ func NewOrLoadUncheckedStore(kv *versioned.KV) *UncheckedRoundStore {
} }
if err = urs.save(); err != nil { if err = urs.save(); err != nil {
jww.FATAL.Panicf("failed to save a new unchecked round store") jww.FATAL.Panicf("Failed to save a new unchecked round store: %v", err)
} }
return urs return urs
...@@ -56,7 +56,6 @@ func NewOrLoadUncheckedStore(kv *versioned.KV) *UncheckedRoundStore { ...@@ -56,7 +56,6 @@ func NewOrLoadUncheckedStore(kv *versioned.KV) *UncheckedRoundStore {
// LoadUncheckedStore loads a deserializes a UncheckedRoundStore from memory. // LoadUncheckedStore loads a deserializes a UncheckedRoundStore from memory.
func LoadUncheckedStore(kv *versioned.KV) (*UncheckedRoundStore, error) { func LoadUncheckedStore(kv *versioned.KV) (*UncheckedRoundStore, error) {
kv = kv.Prefix(uncheckedRoundPrefix) kv = kv.Prefix(uncheckedRoundPrefix)
vo, err := kv.Get(uncheckedRoundKey, uncheckedRoundVersion) vo, err := kv.Get(uncheckedRoundKey, uncheckedRoundVersion)
if err != nil { if err != nil {
...@@ -126,9 +125,8 @@ func (s *UncheckedRoundStore) IterateOverList(iterator func(rid id.Round, ...@@ -126,9 +125,8 @@ func (s *UncheckedRoundStore) IterateOverList(iterator func(rid id.Round,
defer s.mux.RUnlock() defer s.mux.RUnlock()
for _, rnd := range s.list { for _, rnd := range s.list {
jww.DEBUG.Printf("rnd for lookup: %d, %+v\n", rnd.Id, rnd) jww.DEBUG.Printf("Round for lookup: %d, %+v\n", rnd.Id, rnd)
go func(localRid id.Round, go func(localRid id.Round, localRnd UncheckedRound) {
localRnd UncheckedRound) {
iterator(localRid, localRnd) iterator(localRid, localRnd)
}(rnd.Id, rnd) }(rnd.Id, rnd)
} }
......
...@@ -18,7 +18,7 @@ import ( ...@@ -18,7 +18,7 @@ import (
) )
// Constants for message retrieval backoff delays // Constants for message retrieval backoff delays
// todo - make this a real backoff // TODO: Make this a real backoff
const ( const (
tryZero = 10 * time.Second tryZero = 10 * time.Second
tryOne = 30 * time.Second tryOne = 30 * time.Second
...@@ -27,21 +27,22 @@ const ( ...@@ -27,21 +27,22 @@ const (
tryFour = 3 * time.Hour tryFour = 3 * time.Hour
tryFive = 12 * time.Hour tryFive = 12 * time.Hour
trySix = 24 * time.Hour trySix = 24 * time.Hour
// Amount of tries past which the // Amount of tries past which the backoff will not increase
// backoff will not increase
cappedTries = 7 cappedTries = 7
) )
var backOffTable = [cappedTries]time.Duration{tryZero, tryOne, tryTwo, tryThree, tryFour, tryFive, trySix} var backOffTable = [cappedTries]time.Duration{
tryZero, tryOne, tryTwo, tryThree, tryFour, tryFive, trySix}
// processUncheckedRounds will (periodically) check every checkInterval // processUncheckedRounds will (periodically) check every checkInterval for
// for rounds that failed message retrieval in processMessageRetrieval. // rounds that failed message retrieval in processMessageRetrieval. Rounds will
// Rounds will have a backoff duration in which they will be tried again. // have a backoff duration in which they will be tried again. If a round is
// If a round is found to be due on a periodical check, the round is sent // found to be due on a periodical check, the round is sent back to
// back to processMessageRetrieval. // processMessageRetrieval.
// todo - make this system know which rounds are still in progress instead of just assume by time // TODO: Make this system know which rounds are still in progress instead of
func (m *manager) processUncheckedRounds(checkInterval time.Duration, backoffTable [cappedTries]time.Duration, // just assume by time
stop *stoppable.Single) { func (m *manager) processUncheckedRounds(checkInterval time.Duration,
backoffTable [cappedTries]time.Duration, stop *stoppable.Single) {
ticker := time.NewTicker(checkInterval) ticker := time.NewTicker(checkInterval)
uncheckedRoundStore := m.unchecked uncheckedRoundStore := m.unchecked
for { for {
...@@ -53,18 +54,23 @@ func (m *manager) processUncheckedRounds(checkInterval time.Duration, backoffTab ...@@ -53,18 +54,23 @@ func (m *manager) processUncheckedRounds(checkInterval time.Duration, backoffTab
case <-ticker.C: case <-ticker.C:
iterator := func(rid id.Round, rnd store.UncheckedRound) { iterator := func(rid id.Round, rnd store.UncheckedRound) {
jww.DEBUG.Printf("checking if %d due for a message lookup", rid) jww.DEBUG.Printf(
"Checking if round %d is due for a message lookup.", rid)
// If this round is due for a round check, send the round over // If this round is due for a round check, send the round over
// to the retrieval thread. If not due, check next round. // to the retrieval thread. If not due, then check next round.
if !isRoundCheckDue(rnd.NumChecks, rnd.LastCheck, backoffTable) { if !isRoundCheckDue(rnd.NumChecks, rnd.LastCheck, backoffTable) {
return return
} }
jww.INFO.Printf("Round %d due for a message lookup, retrying...", rid)
//check if it needs to be processed by historical Rounds jww.INFO.Printf(
"Round %d due for a message lookup, retrying...", rid)
// Check if it needs to be processed by historical Rounds
m.GetMessagesFromRound(rid, receptionID.EphemeralIdentity{ m.GetMessagesFromRound(rid, receptionID.EphemeralIdentity{
EphId: rnd.EpdId, EphId: rnd.EpdId,
Source: rnd.Source, Source: rnd.Source,
}) })
// Update the state of the round for next look-up (if needed) // Update the state of the round for next look-up (if needed)
err := uncheckedRoundStore.IncrementCheck(rid, rnd.Source, rnd.EpdId) err := uncheckedRoundStore.IncrementCheck(rid, rnd.Source, rnd.EpdId)
if err != nil { if err != nil {
...@@ -72,16 +78,18 @@ func (m *manager) processUncheckedRounds(checkInterval time.Duration, backoffTab ...@@ -72,16 +78,18 @@ func (m *manager) processUncheckedRounds(checkInterval time.Duration, backoffTab
"increment check attempts for round %d: %v", rid, err) "increment check attempts for round %d: %v", rid, err)
} }
} }
// Pull and iterate through uncheckedRound list // Pull and iterate through uncheckedRound list
m.unchecked.IterateOverList(iterator) m.unchecked.IterateOverList(iterator)
} }
} }
} }
// isRoundCheckDue given the amount of tries and the timestamp the round // isRoundCheckDue determines whether this round is due for another check given
// was stored, determines whether this round is due for another check. // the amount of tries and the timestamp the round was stored. Returns true if a
// Returns true if a new check is due // new check is due
func isRoundCheckDue(tries uint64, ts time.Time, backoffTable [cappedTries]time.Duration) bool { func isRoundCheckDue(tries uint64, ts time.Time,
backoffTable [cappedTries]time.Duration) bool {
now := netTime.Now() now := netTime.Now()
if tries >= uint64(len(backoffTable)) { if tries >= uint64(len(backoffTable)) {
......
...@@ -13,11 +13,11 @@ import ( ...@@ -13,11 +13,11 @@ import (
"gitlab.com/elixxir/client/stoppable" "gitlab.com/elixxir/client/stoppable"
pb "gitlab.com/elixxir/comms/mixmessages" pb "gitlab.com/elixxir/comms/mixmessages"
"gitlab.com/elixxir/crypto/fastRNG" "gitlab.com/elixxir/crypto/fastRNG"
"gitlab.com/xx_network/comms/connect"
"gitlab.com/xx_network/crypto/csprng" "gitlab.com/xx_network/crypto/csprng"
"gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/id"
"gitlab.com/xx_network/primitives/id/ephemeral" "gitlab.com/xx_network/primitives/id/ephemeral"
"gitlab.com/xx_network/primitives/ndf" "gitlab.com/xx_network/primitives/ndf"
"reflect"
"testing" "testing"
"time" "time"
) )
...@@ -25,6 +25,7 @@ import ( ...@@ -25,6 +25,7 @@ import (
// Happy path // Happy path
func TestUncheckedRoundScheduler(t *testing.T) { func TestUncheckedRoundScheduler(t *testing.T) {
// General initializations // General initializations
connect.TestingOnlyDisableTLS = true
testManager := newManager(t) testManager := newManager(t)
roundId := id.Round(5) roundId := id.Round(5)
mockComms := &mockMessageRetrievalComms{testingSignature: t} mockComms := &mockMessageRetrievalComms{testingSignature: t}
...@@ -37,9 +38,9 @@ func TestUncheckedRoundScheduler(t *testing.T) { ...@@ -37,9 +38,9 @@ func TestUncheckedRoundScheduler(t *testing.T) {
testNdf.Gateways = []ndf.Gateway{{ID: gwId.Marshal()}} testNdf.Gateways = []ndf.Gateway{{ID: gwId.Marshal()}}
p := gateway.DefaultPoolParams() p := gateway.DefaultPoolParams()
p.MaxPoolSize = 1 p.MaxPoolSize = 1
testManager.sender, _ = gateway.NewSender(p, rngGen := fastRNG.NewStreamGenerator(1, 1, csprng.NewSystemRNG)
fastRNG.NewStreamGenerator(1, 1, csprng.NewSystemRNG), testManager.sender, _ = gateway.NewSender(
testNdf, mockComms, testManager.Session, nil) p, rngGen, testNdf, mockComms, testManager.session, nil)
// Create a local channel so reception is possible (testManager.messageBundles is // Create a local channel so reception is possible (testManager.messageBundles is
// send only via newManager call above) // send only via newManager call above)
...@@ -62,34 +63,27 @@ func TestUncheckedRoundScheduler(t *testing.T) { ...@@ -62,34 +63,27 @@ func TestUncheckedRoundScheduler(t *testing.T) {
Topology: idList, Topology: idList,
} }
// Add round ot check // Add round to check
err := testManager.Session.UncheckedRounds().AddRound(roundId, roundInfo, requestGateway, expectedEphID) err := testManager.unchecked.AddRound(roundId, roundInfo, requestGateway, expectedEphID)
if err != nil { if err != nil {
t.Fatalf("Could not add round to session: %v", err) t.Fatalf("Could not add round to session: %v", err)
} }
var testBundle message.Bundle var testBundle message.Bundle
go func() { select {
// Receive the bundle over the channel case testBundle = <-messageBundleChan:
time.Sleep(1 * time.Second) case <-time.After(500 * time.Millisecond):
testBundle = <-messageBundleChan t.Fatalf("Did not receive a message bundle over the channel")
}
// Close the process // Close the process
if err := stop1.Close(); err != nil { if err = stop1.Close(); err != nil {
t.Errorf("Failed to signal close to process: %+v", err) t.Errorf("Failed to signal close to process: %+v", err)
} }
if err := stop2.Close(); err != nil { if err = stop2.Close(); err != nil {
t.Errorf("Failed to signal close to process: %+v", err) t.Errorf("Failed to signal close to process: %+v", err)
} }
}()
// Ensure bundle received and has expected values
time.Sleep(2 * time.Second)
if reflect.DeepEqual(testBundle, message.Bundle{}) {
t.Fatalf("Did not receive a message bundle over the channel")
}
if testBundle.Identity.EphId.Int64() != expectedEphID.Int64() { if testBundle.Identity.EphId.Int64() != expectedEphID.Int64() {
t.Errorf("Unexpected address ID in bundle."+ t.Errorf("Unexpected address ID in bundle."+
"\n\tExpected: %v"+ "\n\tExpected: %v"+
......
...@@ -10,6 +10,7 @@ import ( ...@@ -10,6 +10,7 @@ import (
"github.com/pkg/errors" "github.com/pkg/errors"
jww "github.com/spf13/jwalterweatherman" jww "github.com/spf13/jwalterweatherman"
"gitlab.com/elixxir/client/network/message" "gitlab.com/elixxir/client/network/message"
"gitlab.com/elixxir/client/network/rounds/store"
"gitlab.com/elixxir/client/storage" "gitlab.com/elixxir/client/storage"
pb "gitlab.com/elixxir/comms/mixmessages" pb "gitlab.com/elixxir/comms/mixmessages"
"gitlab.com/elixxir/comms/testkeys" "gitlab.com/elixxir/comms/testkeys"
...@@ -23,38 +24,67 @@ import ( ...@@ -23,38 +24,67 @@ import (
"time" "time"
) )
func newManager(face interface{}) *manager { func newManager(t *testing.T) *manager {
sess1 := storage.InitTestingSession(face) session := storage.InitTestingSession(t)
unchecked, err := store.NewUncheckedStore(session.GetKV())
if err != nil {
t.Errorf("Failed to make new UncheckedRoundStore: %+v", err)
}
instance := &MockRoundGetter{
topology: [][]byte{
id.NewIdFromString("gateway0", id.Gateway, t).Bytes(),
id.NewIdFromString("gateway1", id.Gateway, t).Bytes(),
id.NewIdFromString("gateway2", id.Gateway, t).Bytes(),
id.NewIdFromString("gateway3", id.Gateway, t).Bytes(),
},
}
testManager := &manager{ testManager := &manager{
params: GetDefaultParams(), params: GetDefaultParams(),
session: session,
rng: fastRNG.NewStreamGenerator(1, 1, csprng.NewSystemRNG),
instance: instance,
lookupRoundMessages: make(chan roundLookup), lookupRoundMessages: make(chan roundLookup),
messageBundles: make(chan message.Bundle), messageBundles: make(chan message.Bundle),
session: sess1, unchecked: unchecked,
rng: fastRNG.NewStreamGenerator(1, 1, csprng.NewSystemRNG),
} }
return testManager return testManager
} }
// Build ID off of this string for expected gateway type MockRoundGetter struct {
// which will return on over mock comm topology [][]byte
const ReturningGateway = "GetMessageRequest" }
const FalsePositive = "FalsePositive"
const PayloadMessage = "Payload" func (mrg *MockRoundGetter) GetRound(rid id.Round) (*pb.RoundInfo, error) {
const ErrorGateway = "Error" return &pb.RoundInfo{
ID: uint64(rid),
Topology: mrg.topology,
}, nil
}
// Build ID off of this string for expected gateway that will be returned over
// mock comm
const (
ReturningGateway = "GetMessageRequest"
FalsePositive = "FalsePositive"
PayloadMessage = "Payload"
ErrorGateway = "Error"
)
type mockMessageRetrievalComms struct { type mockMessageRetrievalComms struct {
testingSignature *testing.T testingSignature *testing.T
} }
func (mmrc *mockMessageRetrievalComms) AddHost(hid *id.ID, address string, cert []byte, params connect.HostParams) (host *connect.Host, err error) { func (mmrc *mockMessageRetrievalComms) AddHost(_ *id.ID, _ string, _ []byte,
_ connect.HostParams) (host *connect.Host, err error) {
host, _ = mmrc.GetHost(nil) host, _ = mmrc.GetHost(nil)
return host, nil return host, nil
} }
func (mmrc *mockMessageRetrievalComms) RemoveHost(hid *id.ID) { func (mmrc *mockMessageRetrievalComms) RemoveHost(_ *id.ID) {
} }
func (mmrc *mockMessageRetrievalComms) GetHost(hostId *id.ID) (*connect.Host, bool) { func (mmrc *mockMessageRetrievalComms) GetHost(hostId *id.ID) (*connect.Host, bool) {
...@@ -65,13 +95,14 @@ func (mmrc *mockMessageRetrievalComms) GetHost(hostId *id.ID) (*connect.Host, bo ...@@ -65,13 +95,14 @@ func (mmrc *mockMessageRetrievalComms) GetHost(hostId *id.ID) (*connect.Host, bo
return h, true return h, true
} }
// Mock comm which returns differently based on the host ID // RequestMessages returns differently based on the host ID.
// ReturningGateway returns a happy path response, in which there is a message // ReturningGateway returns a happy path response, in which there is a message.
// FalsePositive returns a response in which there were no messages in the round // FalsePositive returns a response in which there were no messages in the
// ErrorGateway returns an error on the mock comm // round.
// ErrorGateway returns an error on the mock comm.
// Any other ID returns default no round errors // Any other ID returns default no round errors
func (mmrc *mockMessageRetrievalComms) RequestMessages(host *connect.Host, func (mmrc *mockMessageRetrievalComms) RequestMessages(host *connect.Host,
message *pb.GetMessages) (*pb.GetMessagesResponse, error) { _ *pb.GetMessages) (*pb.GetMessagesResponse, error) {
payloadMsg := []byte(PayloadMessage) payloadMsg := []byte(PayloadMessage)
payload := make([]byte, 256) payload := make([]byte, 256)
copy(payload, payloadMsg) copy(payload, payloadMsg)
...@@ -81,7 +112,8 @@ func (mmrc *mockMessageRetrievalComms) RequestMessages(host *connect.Host, ...@@ -81,7 +112,8 @@ func (mmrc *mockMessageRetrievalComms) RequestMessages(host *connect.Host,
} }
// If we are the requesting on the returning gateway, return a mock response // If we are the requesting on the returning gateway, return a mock response
returningGateway := id.NewIdFromString(ReturningGateway, id.Gateway, mmrc.testingSignature) returningGateway := id.NewIdFromString(
ReturningGateway, id.Gateway, mmrc.testingSignature)
if host.GetId().Cmp(returningGateway) { if host.GetId().Cmp(returningGateway) {
return &pb.GetMessagesResponse{ return &pb.GetMessagesResponse{
Messages: []*pb.Slot{testSlot}, Messages: []*pb.Slot{testSlot},
...@@ -89,8 +121,10 @@ func (mmrc *mockMessageRetrievalComms) RequestMessages(host *connect.Host, ...@@ -89,8 +121,10 @@ func (mmrc *mockMessageRetrievalComms) RequestMessages(host *connect.Host,
}, nil }, nil
} }
// Return an empty message structure (ie a false positive in the bloom filter) // Return an empty message structure (i.e. a false positive in the bloom
falsePositive := id.NewIdFromString(FalsePositive, id.Gateway, mmrc.testingSignature) // filter)
falsePositive := id.NewIdFromString(
FalsePositive, id.Gateway, mmrc.testingSignature)
if host.GetId().Cmp(falsePositive) { if host.GetId().Cmp(falsePositive) {
return &pb.GetMessagesResponse{ return &pb.GetMessagesResponse{
Messages: []*pb.Slot{}, Messages: []*pb.Slot{},
...@@ -99,7 +133,8 @@ func (mmrc *mockMessageRetrievalComms) RequestMessages(host *connect.Host, ...@@ -99,7 +133,8 @@ func (mmrc *mockMessageRetrievalComms) RequestMessages(host *connect.Host,
} }
// Return a mock error // Return a mock error
errorGateway := id.NewIdFromString(ErrorGateway, id.Gateway, mmrc.testingSignature) errorGateway := id.NewIdFromString(
ErrorGateway, id.Gateway, mmrc.testingSignature)
if host.GetId().Cmp(errorGateway) { if host.GetId().Cmp(errorGateway) {
return &pb.GetMessagesResponse{}, errors.Errorf("Connection error") return &pb.GetMessagesResponse{}, errors.Errorf("Connection error")
} }
...@@ -112,7 +147,8 @@ func newTestBackoffTable(face interface{}) [cappedTries]time.Duration { ...@@ -112,7 +147,8 @@ func newTestBackoffTable(face interface{}) [cappedTries]time.Duration {
case *testing.T, *testing.M, *testing.B, *testing.PB: case *testing.T, *testing.M, *testing.B, *testing.PB:
break break
default: default:
jww.FATAL.Panicf("newTestBackoffTable is restricted to testing only. Got %T", face) jww.FATAL.Panicf(
"newTestBackoffTable is restricted to testing only. Got %T", face)
} }
var backoff [cappedTries]time.Duration var backoff [cappedTries]time.Duration
...@@ -144,38 +180,41 @@ func getNDF() *ndf.NetworkDefinition { ...@@ -144,38 +180,41 @@ func getNDF() *ndf.NetworkDefinition {
}, },
}, },
E2E: ndf.Group{ E2E: ndf.Group{
Prime: "E2EE983D031DC1DB6F1A7A67DF0E9A8E5561DB8E8D49413394C049B" + Prime: "E2EE983D031DC1DB6F1A7A67DF0E9A8E5561DB8E8D49413394C049B7A" +
"7A8ACCEDC298708F121951D9CF920EC5D146727AA4AE535B0922C688B55B3DD2AE" + "8ACCEDC298708F121951D9CF920EC5D146727AA4AE535B0922C688B55B3D" +
"DF6C01C94764DAB937935AA83BE36E67760713AB44A6337C20E7861575E745D31F" + "D2AEDF6C01C94764DAB937935AA83BE36E67760713AB44A6337C20E78615" +
"8B9E9AD8412118C62A3E2E29DF46B0864D0C951C394A5CBBDC6ADC718DD2A3E041" + "75E745D31F8B9E9AD8412118C62A3E2E29DF46B0864D0C951C394A5CBBDC" +
"023DBB5AB23EBB4742DE9C1687B5B34FA48C3521632C4A530E8FFB1BC51DADDF45" + "6ADC718DD2A3E041023DBB5AB23EBB4742DE9C1687B5B34FA48C3521632C" +
"3B0B2717C2BC6669ED76B4BDD5C9FF558E88F26E5785302BEDBCA23EAC5ACE9209" + "4A530E8FFB1BC51DADDF453B0B2717C2BC6669ED76B4BDD5C9FF558E88F2" +
"6EE8A60642FB61E8F3D24990B8CB12EE448EEF78E184C7242DD161C7738F32BF29" + "6E5785302BEDBCA23EAC5ACE92096EE8A60642FB61E8F3D24990B8CB12EE" +
"A841698978825B4111B4BC3E1E198455095958333D776D8B2BEEED3A1A1A221A6E" + "448EEF78E184C7242DD161C7738F32BF29A841698978825B4111B4BC3E1E" +
"37E664A64B83981C46FFDDC1A45E3D5211AAF8BFBC072768C4F50D7D7803D2D4F2" + "198455095958333D776D8B2BEEED3A1A1A221A6E37E664A64B83981C46FF" +
"78DE8014A47323631D7E064DE81C0C6BFA43EF0E6998860F1390B5D3FEACAF1696" + "DDC1A45E3D5211AAF8BFBC072768C4F50D7D7803D2D4F278DE8014A47323" +
"015CB79C3F9C2D93D961120CD0E5F12CBB687EAB045241F96789C38E89D796138E" + "631D7E064DE81C0C6BFA43EF0E6998860F1390B5D3FEACAF1696015CB79C" +
"6319BE62E35D87B1048CA28BE389B575E994DCA755471584A09EC723742DC35873" + "3F9C2D93D961120CD0E5F12CBB687EAB045241F96789C38E89D796138E63" +
"847AEF49F66E43873", "19BE62E35D87B1048CA28BE389B575E994DCA755471584A09EC723742DC3" +
"5873847AEF49F66E43873",
Generator: "2", Generator: "2",
}, },
CMIX: ndf.Group{ CMIX: ndf.Group{
Prime: "9DB6FB5951B66BB6FE1E140F1D2CE5502374161FD6538DF1648218642F0B5C48" + Prime: "9DB6FB5951B66BB6FE1E140F1D2CE5502374161FD6538DF1648218642" +
"C8F7A41AADFA187324B87674FA1822B00F1ECF8136943D7C55757264E5A1A44F" + "F0B5C48C8F7A41AADFA187324B87674FA1822B00F1ECF8136943D7C55757" +
"FE012E9936E00C1D3E9310B01C7D179805D3058B2A9F4BB6F9716BFE6117C6B5" + "264E5A1A44FFE012E9936E00C1D3E9310B01C7D179805D3058B2A9F4BB6F" +
"B3CC4D9BE341104AD4A80AD6C94E005F4B993E14F091EB51743BF33050C38DE2" + "9716BFE6117C6B5B3CC4D9BE341104AD4A80AD6C94E005F4B993E14F091E" +
"35567E1B34C3D6A5C0CEAA1A0F368213C3D19843D0B4B09DCB9FC72D39C8DE41" + "B51743BF33050C38DE235567E1B34C3D6A5C0CEAA1A0F368213C3D19843D" +
"F1BF14D4BB4563CA28371621CAD3324B6A2D392145BEBFAC748805236F5CA2FE" + "0B4B09DCB9FC72D39C8DE41F1BF14D4BB4563CA28371621CAD3324B6A2D3" +
"92B871CD8F9C36D3292B5509CA8CAA77A2ADFC7BFD77DDA6F71125A7456FEA15" + "92145BEBFAC748805236F5CA2FE92B871CD8F9C36D3292B5509CA8CAA77A" +
"3E433256A2261C6A06ED3693797E7995FAD5AABBCFBE3EDA2741E375404AE25B", "2ADFC7BFD77DDA6F71125A7456FEA153E433256A2261C6A06ED3693797E7" +
Generator: "5C7FF6B06F8F143FE8288433493E4769C4D988ACE5BE25A0E24809670716C613" + "995FAD5AABBCFBE3EDA2741E375404AE25B",
"D7B0CEE6932F8FAA7C44D2CB24523DA53FBE4F6EC3595892D1AA58C4328A06C4" + Generator: "5C7FF6B06F8F143FE8288433493E4769C4D988ACE5BE25A0E2480" +
"6A15662E7EAA703A1DECF8BBB2D05DBE2EB956C142A338661D10461C0D135472" + "9670716C613D7B0CEE6932F8FAA7C44D2CB24523DA53FBE4F6EC3595892D" +
"085057F3494309FFA73C611F78B32ADBB5740C361C9F35BE90997DB2014E2EF5" + "1AA58C4328A06C46A15662E7EAA703A1DECF8BBB2D05DBE2EB956C142A33" +
"AA61782F52ABEB8BD6432C4DD097BC5423B285DAFB60DC364E8161F4A2A35ACA" + "8661D10461C0D135472085057F3494309FFA73C611F78B32ADBB5740C361" +
"3A10B1C4D203CC76A470A33AFDCBDD92959859ABD8B56E1725252D78EAC66E71" + "C9F35BE90997DB2014E2EF5AA61782F52ABEB8BD6432C4DD097BC5423B28" +
"BA9AE3F1DD2487199874393CD4D832186800654760E1E34C09E4D155179F9EC0" + "5DAFB60DC364E8161F4A2A35ACA3A10B1C4D203CC76A470A33AFDCBDD929" +
"DC4473F996BDCE6EED1CABED8B6F116F7AD9CF505DF0F998E34AB27514B0FFE7", "59859ABD8B56E1725252D78EAC66E71BA9AE3F1DD2487199874393CD4D83" +
"2186800654760E1E34C09E4D155179F9EC0DC4473F996BDCE6EED1CABED8" +
"B6F116F7AD9CF505DF0F998E34AB27514B0FFE7",
}, },
} }
} }
...@@ -11,6 +11,7 @@ package storage ...@@ -11,6 +11,7 @@ package storage
import ( import (
"gitlab.com/elixxir/client/storage/utility" "gitlab.com/elixxir/client/storage/utility"
"gitlab.com/xx_network/crypto/large"
"sync" "sync"
"testing" "testing"
"time" "time"
...@@ -138,7 +139,7 @@ func New(baseDir, password string, u userInterface.Info, ...@@ -138,7 +139,7 @@ func New(baseDir, password string, u userInterface.Info,
} }
// Loads existing user data into the session // Loads existing user data into the session
func Load(baseDir, password string, currentVersion version.Version) (*Session, error) { func Load(baseDir, password string, currentVersion version.Version) (Session, error) {
s, err := initStore(baseDir, password) s, err := initStore(baseDir, password)
if err != nil { if err != nil {
...@@ -243,6 +244,26 @@ func InitTestingSession(i interface{}) Session { ...@@ -243,6 +244,26 @@ func InitTestingSession(i interface{}) Session {
u.SetRegistrationTimestamp(testTime.UnixNano()) u.SetRegistrationTimestamp(testTime.UnixNano())
s.User = u s.User = u
s.cmixGroup = cyclic.NewGroup(
large.NewIntFromString("9DB6FB5951B66BB6FE1E140F1D2CE5502374161FD6538DF1648218642"+
"F0B5C48C8F7A41AADFA187324B87674FA1822B00F1ECF8136943D7C55757"+
"264E5A1A44FFE012E9936E00C1D3E9310B01C7D179805D3058B2A9F4BB6F"+
"9716BFE6117C6B5B3CC4D9BE341104AD4A80AD6C94E005F4B993E14F091E"+
"B51743BF33050C38DE235567E1B34C3D6A5C0CEAA1A0F368213C3D19843D"+
"0B4B09DCB9FC72D39C8DE41F1BF14D4BB4563CA28371621CAD3324B6A2D3"+
"92145BEBFAC748805236F5CA2FE92B871CD8F9C36D3292B5509CA8CAA77A"+
"2ADFC7BFD77DDA6F71125A7456FEA153E433256A2261C6A06ED3693797E7"+
"995FAD5AABBCFBE3EDA2741E375404AE25B", 16),
large.NewIntFromString("5C7FF6B06F8F143FE8288433493E4769C4D988ACE5BE25A0E2480"+
"9670716C613D7B0CEE6932F8FAA7C44D2CB24523DA53FBE4F6EC3595892D"+
"1AA58C4328A06C46A15662E7EAA703A1DECF8BBB2D05DBE2EB956C142A33"+
"8661D10461C0D135472085057F3494309FFA73C611F78B32ADBB5740C361"+
"C9F35BE90997DB2014E2EF5AA61782F52ABEB8BD6432C4DD097BC5423B28"+
"5DAFB60DC364E8161F4A2A35ACA3A10B1C4D203CC76A470A33AFDCBDD929"+
"59859ABD8B56E1725252D78EAC66E71BA9AE3F1DD2487199874393CD4D83"+
"2186800654760E1E34C09E4D155179F9EC0DC4473F996BDCE6EED1CABED8"+
"B6F116F7AD9CF505DF0F998E34AB27514B0FFE7", 16),
)
return s return s
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment