diff --git a/cmd/root.go b/cmd/root.go index d9373fe1ae63ae380573fab85b4e2585d3ac7aa4..ac0e48756e933735e8fe9500a5d339439cffe86b 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -15,7 +15,6 @@ import ( "encoding/json" "errors" "fmt" - "gitlab.com/elixxir/client/storage/user" "io/fs" "io/ioutil" "log" @@ -26,6 +25,8 @@ import ( "sync" "time" + "gitlab.com/elixxir/client/storage/user" + "gitlab.com/elixxir/client/backup" "gitlab.com/elixxir/client/e2e" "gitlab.com/elixxir/client/xxdk" @@ -195,6 +196,8 @@ var rootCmd = &cobra.Command{ client := initClient() + jww.INFO.Printf("Client Initialized...") + user := client.GetUser() jww.INFO.Printf("USERPUBKEY: %s", user.E2eDhPublicKey.TextVerbose(16, 0)) @@ -219,14 +222,21 @@ var rootCmd = &cobra.Command{ recipientContact = user.GetContact() } + jww.INFO.Printf("Client: %s, Partner: %s", user.ReceptionID, + recipientID) + client.GetE2E().EnableUnsafeReception() recvCh := registerMessageListener(client) + jww.INFO.Printf("Starting Network followers...") + err := client.StartNetworkFollower(5 * time.Second) if err != nil { jww.FATAL.Panicf("%+v", err) } + jww.INFO.Printf("Network followers started!") + // Wait until connected or crash on timeout connected := make(chan bool, 10) client.GetCmix().AddHealthCallback( @@ -244,6 +254,8 @@ var rootCmd = &cobra.Command{ // 85% of the nodes numReg := 1 total := 100 + jww.INFO.Printf("Registering with nodes...") + for numReg < (total*3)/4 { time.Sleep(1 * time.Second) numReg, total, err = client.GetNodeRegistrationStatus() @@ -256,6 +268,8 @@ var rootCmd = &cobra.Command{ client.GetBackupContainer().TriggerBackup("Integration test.") + jww.INFO.Printf("Client backup triggered...") + // Send Messages msgBody := viper.GetString("message") time.Sleep(10 * time.Second) @@ -376,6 +390,8 @@ var rootCmd = &cobra.Command{ paramsE2E.ExcludedRounds = excludedRounds.NewSet() } + jww.INFO.Printf("Client Sending messages...") + wg := &sync.WaitGroup{} sendCnt := int(viper.GetUint("sendCount")) wg.Add(sendCnt) @@ -455,6 +471,8 @@ var rootCmd = &cobra.Command{ waitTimeout := time.Duration(waitSecs) * time.Second done := false + jww.INFO.Printf("Client receiving messages...") + for !done && expectedCnt != 0 { timeoutTimer := time.NewTimer(waitTimeout) select { @@ -511,7 +529,7 @@ var rootCmd = &cobra.Command{ if profileOut != "" { pprof.StopCPUProfile() } - + jww.INFO.Printf("Client exiting!") }, } diff --git a/cmix/client.go b/cmix/client.go index 92c2d1a1486f1a68c2e2d7672bab34e126cd4a16..a06e21f4d3a1c94e492a81ddd7bed6679a1547f9 100644 --- a/cmix/client.go +++ b/cmix/client.go @@ -117,10 +117,14 @@ func NewClient(params Params, comms *commClient.Comms, session storage.Session, c.Handler = message.NewHandler(c.param.Message, c.session.GetKV(), c.events, c.session.GetReceptionID()) - return c, nil + err := c.initialize(session.GetNDF()) + return c, err } -func (c *client) Connect(ndf *ndf.NetworkDefinition) error { +// initialize turns on network handlers, initializing a host pool and +// network health monitors. This should be called before +// network Follow command is called. +func (c *client) initialize(ndf *ndf.NetworkDefinition) error { // Start network instance instance, err := commNetwork.NewInstance( c.comms.ProtoComms, ndf, nil, nil, commNetwork.None, diff --git a/cmix/interface.go b/cmix/interface.go index 41e15dd3e981daf718f53f2198d0d708aa525e48..a5db8b3cfe2089e94d03861769785ab99bd4390a 100644 --- a/cmix/interface.go +++ b/cmix/interface.go @@ -14,15 +14,9 @@ import ( "gitlab.com/xx_network/comms/connect" "gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/id/ephemeral" - "gitlab.com/xx_network/primitives/ndf" ) type Client interface { - // Connect turns on network handlers, initializing a host pool and - // network health monitors. This should be called before - // network Follow command is called. - Connect(ndf *ndf.NetworkDefinition) error - // Follow starts the tracking of the network in a new thread. // Errors that occur are reported on the ClientErrorReport function if // passed. The returned stoppable can be used to stop the follower. diff --git a/connect/utils_test.go b/connect/utils_test.go index 45038764f61017d7caf2403cd629f8c3e821ca39..dec043d0433cfe2efe7d6016a45ae61a1c1d1488 100644 --- a/connect/utils_test.go +++ b/connect/utils_test.go @@ -199,10 +199,6 @@ func newMockCmix() *mockCmix { return &mockCmix{} } -func (m mockCmix) Connect(ndf *ndf.NetworkDefinition) error { - return nil -} - func (m *mockCmix) Follow(report cmix.ClientErrorReport) (stoppable.Stoppable, error) { return nil, nil } diff --git a/e2e/critical.go b/e2e/critical.go index a605775419c8e2181e3d22eb3f984fed7a05bf3e..9a803886ff81faeded4e546a8462af9e78c73dc9 100644 --- a/e2e/critical.go +++ b/e2e/critical.go @@ -53,6 +53,7 @@ func newCritical(kv *versioned.KV, hm func(f func(bool)) uint64, E2eMessageBuffer: cm, trigger: make(chan bool, 100), send: send, + healthcb: hm, } return c diff --git a/e2e/fpGenerator_test.go b/e2e/fpGenerator_test.go index daf82b417700983987618f9c1a1df171ecd1436c..ebd2d7cc381a3d3ccc908b9118b697226bb0af59 100644 --- a/e2e/fpGenerator_test.go +++ b/e2e/fpGenerator_test.go @@ -20,7 +20,6 @@ import ( "gitlab.com/xx_network/comms/connect" "gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/id/ephemeral" - "gitlab.com/xx_network/primitives/ndf" "math/rand" "sync" "testing" @@ -112,7 +111,6 @@ func newMockFpgCmix() *mockFpgCmix { } } -func (m *mockFpgCmix) Connect(*ndf.NetworkDefinition) error { return nil } func (m *mockFpgCmix) Follow(cmix.ClientErrorReport) (stoppable.Stoppable, error) { return nil, nil } func (m *mockFpgCmix) GetMaxMessageLength() int { return 0 } func (m *mockFpgCmix) Send(*id.ID, format.Fingerprint, message.Service, []byte, []byte, cmix.CMIXParams) (id.Round, ephemeral.Id, error) { diff --git a/e2e/manager.go b/e2e/manager.go index cbfcddd6ec0d4b4126983d800e96dce03b25a375..6d4102ebba81c19ee4ddce1bb7cf65ad8e3b7ecf 100644 --- a/e2e/manager.go +++ b/e2e/manager.go @@ -27,7 +27,7 @@ import ( type manager struct { *ratchet.Ratchet *receive.Switchboard - partitioner parse.Partitioner + partitioner *parse.Partitioner net cmix.Client myID *id.ID rng *fastRNG.StreamGenerator @@ -35,6 +35,7 @@ type manager struct { grp *cyclic.Group crit *critical rekeyParams rekey.Params + kv *versioned.KV } const e2eRekeyParamsKey = "e2eRekeyParams" @@ -69,6 +70,8 @@ func initE2E(kv *versioned.KV, myID *id.ID, privKey *cyclic.Int, // Load returns an e2e manager from storage. It uses an ID to prefix the kv // and is used for partner relationships. // You can use a memkv for an ephemeral e2e id +// Can be initialized with a nil cmix.Client, but will crash on start - use when +// prebuilding e2e identity to be used later func Load(kv *versioned.KV, net cmix.Client, myID *id.ID, grp *cyclic.Group, rng *fastRNG.StreamGenerator, events event.Reporter) (Handler, error) { @@ -82,6 +85,8 @@ func Load(kv *versioned.KV, net cmix.Client, myID *id.ID, // Does not modify the kv prefix in any way to maintain backwards compatibility // before multiple IDs were supported // You can use a memkv for an ephemeral e2e id +// Can be initialized with a nil cmix.Client, but will crash on start - use when +// prebuilding e2e identity to be used later func LoadLegacy(kv *versioned.KV, net cmix.Client, myID *id.ID, grp *cyclic.Group, rng *fastRNG.StreamGenerator, events event.Reporter, params rekey.Params) (Handler, error) { @@ -118,12 +123,12 @@ func loadE2E(kv *versioned.KV, net cmix.Client, myDefaultID *id.ID, m := &manager{ Switchboard: receive.New(), - partitioner: parse.NewPartitioner(kv, net.GetMaxMessageLength()), net: net, myID: myDefaultID, events: events, grp: grp, rekeyParams: rekey.Params{}, + kv: kv, } var err error @@ -144,17 +149,23 @@ func loadE2E(kv *versioned.KV, net cmix.Client, myDefaultID *id.ID, "Failed to unmarshal rekeyParams data") } - m.crit = newCritical(kv, net.AddHealthCallback, m.SendE2E) - return m, nil } func (m *manager) StartProcesses() (stoppable.Stoppable, error) { multi := stoppable.NewMulti("e2eManager") + if m.partitioner == nil { + m.partitioner = parse.NewPartitioner(m.kv, m.net.GetMaxMessageLength()) + } + + if m.crit == nil { + m.crit = newCritical(m.kv, m.net.AddHealthCallback, m.SendE2E) + } + critcalNetworkStopper := stoppable.NewSingle( "e2eCriticalMessagesStopper") - m.crit.runCriticalMessages(critcalNetworkStopper, + go m.crit.runCriticalMessages(critcalNetworkStopper, m.net.GetInstance().GetRoundEvents()) multi.Add(critcalNetworkStopper) diff --git a/e2e/parse/partition.go b/e2e/parse/partition.go index c0f5c0b95338bd6044063fbe7cecbac114fbbd3f..da7628381175971f748d456761ccc836345bd227 100644 --- a/e2e/parse/partition.go +++ b/e2e/parse/partition.go @@ -32,7 +32,7 @@ type Partitioner struct { partition *partition.Store } -func NewPartitioner(kv *versioned.KV, messageSize int) Partitioner { +func NewPartitioner(kv *versioned.KV, messageSize int) *Partitioner { p := Partitioner{ baseMessageSize: messageSize, firstContentsSize: messageSize - firstHeaderLen, @@ -43,10 +43,10 @@ func NewPartitioner(kv *versioned.KV, messageSize int) Partitioner { } p.maxSize = p.firstContentsSize + (MaxMessageParts-1)*p.partContentsSize - return p + return &p } -func (p Partitioner) Partition(recipient *id.ID, mt catalog.MessageType, +func (p *Partitioner) Partition(recipient *id.ID, mt catalog.MessageType, timestamp time.Time, payload []byte) ([][]byte, uint64, error) { if len(payload) > p.maxSize { @@ -77,7 +77,7 @@ func (p Partitioner) Partition(recipient *id.ID, mt catalog.MessageType, return parts, fullMessageID, nil } -func (p Partitioner) HandlePartition(sender *id.ID, +func (p *Partitioner) HandlePartition(sender *id.ID, contents []byte, relationshipFingerprint []byte) (receive.Message, bool) { if isFirst(contents) { @@ -106,19 +106,19 @@ func (p Partitioner) HandlePartition(sender *id.ID, // FirstPartitionSize returns the max partition payload size for the // first payload -func (p Partitioner) FirstPartitionSize() uint { +func (p *Partitioner) FirstPartitionSize() uint { return uint(p.firstContentsSize) } // SecondPartitionSize returns the max partition payload size for all // payloads after the first payload -func (p Partitioner) SecondPartitionSize() uint { +func (p *Partitioner) SecondPartitionSize() uint { return uint(p.partContentsSize) } // PayloadSize Returns the max payload size for a partitionable E2E // message -func (p Partitioner) PayloadSize() uint { +func (p *Partitioner) PayloadSize() uint { return uint(p.maxSize) } diff --git a/e2e/rekey/utils_test.go b/e2e/rekey/utils_test.go index e30b22c255aa47c36dd3cb917c2c34a7a84219ef..3e1f77f1dcdc80a760498da5ea6b1905b90cec84 100644 --- a/e2e/rekey/utils_test.go +++ b/e2e/rekey/utils_test.go @@ -218,11 +218,6 @@ func (m mockServiceHandler) DeleteService(clientID *id.ID, toDelete message.Serv type mockNetManager struct{} -func (m *mockNetManager) Connect(ndf *ndf.NetworkDefinition) error { - // TODO implement me - panic("implement me") -} - func (m *mockNetManager) GetIdentity(get *id.ID) (identity.TrackedID, error) { //TODO implement me panic("implement me") diff --git a/e2e/utils_test.go b/e2e/utils_test.go index 36bccbe729f8d9f72b650f960d85a81e110575f2..72bcfb3402d76c2d22b8351f1283a367ba771fc9 100644 --- a/e2e/utils_test.go +++ b/e2e/utils_test.go @@ -166,7 +166,6 @@ func newMockCmix(myID *id.ID, handler *mockCmixHandler, t testing.TB) *mockCmix } } -func (m *mockCmix) Connect(*ndf.NetworkDefinition) error { return nil } func (m *mockCmix) Follow(cmix.ClientErrorReport) (stoppable.Stoppable, error) { return nil, nil } func (m *mockCmix) GetMaxMessageLength() int { diff --git a/xxdk/cmix.go b/xxdk/cmix.go index e8aaed28256b8c1ff8b7b4ae98190b4a1a35cce8..059c9d061ffa8b83db87cf30db140f05d5311f7f 100644 --- a/xxdk/cmix.go +++ b/xxdk/cmix.go @@ -156,12 +156,6 @@ func OpenCmix(storageDir string, password []byte, return nil, err } - c.network, err = cmix.NewClient(parameters.CMix, c.comms, c.storage, - c.rng, c.events) - if err != nil { - return nil, err - } - return c, nil } @@ -213,6 +207,12 @@ func LoadCmix(storageDir string, password []byte, parameters Params) (*Cmix, err return nil, err } + c.network, err = cmix.NewClient(parameters.CMix, c.comms, c.storage, + c.rng, c.events) + if err != nil { + return nil, err + } + jww.INFO.Printf("Cmix Logged in: \n\tTransmissionID: %s "+ "\n\tReceptionID: %s", c.storage.GetTransmissionID(), c.storage.GetReceptionID()) @@ -245,11 +245,6 @@ func LoadCmix(storageDir string, password []byte, parameters Params) (*Cmix, err } } - err = c.network.Connect(def) - if err != nil { - return nil, err - } - err = c.registerFollower() if err != nil { return nil, err diff --git a/xxdk/e2e.go b/xxdk/e2e.go index 055f18cfe01ea95a49a548d9530a377f52a10782..ce139c2941c26eb86ccd917403864718b61c2946 100644 --- a/xxdk/e2e.go +++ b/xxdk/e2e.go @@ -64,6 +64,12 @@ func LoginLegacy(client *Cmix, callbacks auth.Callbacks) (m *E2e, err error) { } client.GetCmix().AddIdentity(client.GetUser().ReceptionID, time.Time{}, true) + err = client.AddService(m.e2e.StartProcesses) + if err != nil { + return nil, errors.WithMessage(err, "Failed to add "+ + "the e2e processies") + } + m.auth, err = auth.NewState(client.GetStorage().GetKV(), client.GetCmix(), m.e2e, client.GetRng(), client.GetEventReporter(), auth.GetDefaultParams(), callbacks, m.backup.TriggerBackup) @@ -94,7 +100,7 @@ func LoginWithNewBaseNDF_UNSAFE(storageDir string, password []byte, return nil, err } - c, err := OpenCmix(storageDir, password, params) + c, err := LoadCmix(storageDir, password, params) if err != nil { return nil, err } @@ -113,11 +119,6 @@ func LoginWithNewBaseNDF_UNSAFE(storageDir string, password []byte, "able to register or track network.") } - err = c.network.Connect(def) - if err != nil { - return nil, err - } - err = c.registerFollower() if err != nil { return nil, err @@ -151,7 +152,7 @@ func LoginWithProtoClient(storageDir string, password []byte, return nil, err } - c, err := OpenCmix(storageDir, password, params) + c, err := LoadCmix(storageDir, password, params) if err != nil { return nil, err } @@ -163,11 +164,6 @@ func LoginWithProtoClient(storageDir string, password []byte, return nil, err } - err = c.network.Connect(def) - if err != nil { - return nil, err - } - c.network.AddIdentity(c.GetUser().ReceptionID, time.Time{}, true) // FIXME: The callbacks need to be set, so I suppose we would need to @@ -226,6 +222,12 @@ func login(client *Cmix, callbacks auth.Callbacks, "newly created e2e store") } + err = client.AddService(m.e2e.StartProcesses) + if err != nil { + return nil, errors.WithMessage(err, "Failed to add "+ + "the e2e processies") + } + m.auth, err = auth.NewState(kv, client.GetCmix(), m.e2e, client.GetRng(), client.GetEventReporter(), auth.GetDefaultTemporaryParams(), callbacks, m.backup.TriggerBackup) diff --git a/xxdk/utilsInterfaces_test.go b/xxdk/utilsInterfaces_test.go index 107fb06778868bdf829e537c51d3cd1ae6cb9571..29317d133e3031c8af10eb945ad6fc726d174c1e 100644 --- a/xxdk/utilsInterfaces_test.go +++ b/xxdk/utilsInterfaces_test.go @@ -7,7 +7,6 @@ package xxdk import ( - "gitlab.com/xx_network/primitives/ndf" "time" "gitlab.com/elixxir/client/cmix" @@ -93,9 +92,6 @@ func (d *dummyEventMgr) EventService() (stoppable.Stoppable, error) { } /* Below methods built for interface adherence */ -func (t *testNetworkManagerGeneric) Connect(ndf *ndf.NetworkDefinition) error { - return nil -} func (t *testNetworkManagerGeneric) Follow(report cmix.ClientErrorReport) (stoppable.Stoppable, error) { return nil, nil }