Skip to content
Snippets Groups Projects
Commit 8e98267f authored by Richard T. Carback III's avatar Richard T. Carback III
Browse files

Merge branch 'hotfix/RefactorConnect' into 'release'

Hotfix/refactor connect

See merge request !250
parents 362a4bc9 eeec6a9c
No related branches found
No related tags found
2 merge requests!510Release,!250Hotfix/refactor connect
......@@ -15,7 +15,6 @@ import (
"encoding/json"
"errors"
"fmt"
"gitlab.com/elixxir/client/storage/user"
"io/fs"
"io/ioutil"
"log"
......@@ -26,6 +25,8 @@ import (
"sync"
"time"
"gitlab.com/elixxir/client/storage/user"
"gitlab.com/elixxir/client/backup"
"gitlab.com/elixxir/client/e2e"
"gitlab.com/elixxir/client/xxdk"
......@@ -195,6 +196,8 @@ var rootCmd = &cobra.Command{
client := initClient()
jww.INFO.Printf("Client Initialized...")
user := client.GetUser()
jww.INFO.Printf("USERPUBKEY: %s",
user.E2eDhPublicKey.TextVerbose(16, 0))
......@@ -219,14 +222,21 @@ var rootCmd = &cobra.Command{
recipientContact = user.GetContact()
}
jww.INFO.Printf("Client: %s, Partner: %s", user.ReceptionID,
recipientID)
client.GetE2E().EnableUnsafeReception()
recvCh := registerMessageListener(client)
jww.INFO.Printf("Starting Network followers...")
err := client.StartNetworkFollower(5 * time.Second)
if err != nil {
jww.FATAL.Panicf("%+v", err)
}
jww.INFO.Printf("Network followers started!")
// Wait until connected or crash on timeout
connected := make(chan bool, 10)
client.GetCmix().AddHealthCallback(
......@@ -244,6 +254,8 @@ var rootCmd = &cobra.Command{
// 85% of the nodes
numReg := 1
total := 100
jww.INFO.Printf("Registering with nodes...")
for numReg < (total*3)/4 {
time.Sleep(1 * time.Second)
numReg, total, err = client.GetNodeRegistrationStatus()
......@@ -256,6 +268,8 @@ var rootCmd = &cobra.Command{
client.GetBackupContainer().TriggerBackup("Integration test.")
jww.INFO.Printf("Client backup triggered...")
// Send Messages
msgBody := viper.GetString("message")
time.Sleep(10 * time.Second)
......@@ -376,6 +390,8 @@ var rootCmd = &cobra.Command{
paramsE2E.ExcludedRounds = excludedRounds.NewSet()
}
jww.INFO.Printf("Client Sending messages...")
wg := &sync.WaitGroup{}
sendCnt := int(viper.GetUint("sendCount"))
wg.Add(sendCnt)
......@@ -455,6 +471,8 @@ var rootCmd = &cobra.Command{
waitTimeout := time.Duration(waitSecs) * time.Second
done := false
jww.INFO.Printf("Client receiving messages...")
for !done && expectedCnt != 0 {
timeoutTimer := time.NewTimer(waitTimeout)
select {
......@@ -511,7 +529,7 @@ var rootCmd = &cobra.Command{
if profileOut != "" {
pprof.StopCPUProfile()
}
jww.INFO.Printf("Client exiting!")
},
}
......
......@@ -117,10 +117,14 @@ func NewClient(params Params, comms *commClient.Comms, session storage.Session,
c.Handler = message.NewHandler(c.param.Message, c.session.GetKV(),
c.events, c.session.GetReceptionID())
return c, nil
err := c.initialize(session.GetNDF())
return c, err
}
func (c *client) Connect(ndf *ndf.NetworkDefinition) error {
// initialize turns on network handlers, initializing a host pool and
// network health monitors. This should be called before
// network Follow command is called.
func (c *client) initialize(ndf *ndf.NetworkDefinition) error {
// Start network instance
instance, err := commNetwork.NewInstance(
c.comms.ProtoComms, ndf, nil, nil, commNetwork.None,
......
......@@ -14,15 +14,9 @@ import (
"gitlab.com/xx_network/comms/connect"
"gitlab.com/xx_network/primitives/id"
"gitlab.com/xx_network/primitives/id/ephemeral"
"gitlab.com/xx_network/primitives/ndf"
)
type Client interface {
// Connect turns on network handlers, initializing a host pool and
// network health monitors. This should be called before
// network Follow command is called.
Connect(ndf *ndf.NetworkDefinition) error
// Follow starts the tracking of the network in a new thread.
// Errors that occur are reported on the ClientErrorReport function if
// passed. The returned stoppable can be used to stop the follower.
......
......@@ -199,10 +199,6 @@ func newMockCmix() *mockCmix {
return &mockCmix{}
}
func (m mockCmix) Connect(ndf *ndf.NetworkDefinition) error {
return nil
}
func (m *mockCmix) Follow(report cmix.ClientErrorReport) (stoppable.Stoppable, error) {
return nil, nil
}
......
......@@ -53,6 +53,7 @@ func newCritical(kv *versioned.KV, hm func(f func(bool)) uint64,
E2eMessageBuffer: cm,
trigger: make(chan bool, 100),
send: send,
healthcb: hm,
}
return c
......
......@@ -20,7 +20,6 @@ import (
"gitlab.com/xx_network/comms/connect"
"gitlab.com/xx_network/primitives/id"
"gitlab.com/xx_network/primitives/id/ephemeral"
"gitlab.com/xx_network/primitives/ndf"
"math/rand"
"sync"
"testing"
......@@ -112,7 +111,6 @@ func newMockFpgCmix() *mockFpgCmix {
}
}
func (m *mockFpgCmix) Connect(*ndf.NetworkDefinition) error { return nil }
func (m *mockFpgCmix) Follow(cmix.ClientErrorReport) (stoppable.Stoppable, error) { return nil, nil }
func (m *mockFpgCmix) GetMaxMessageLength() int { return 0 }
func (m *mockFpgCmix) Send(*id.ID, format.Fingerprint, message.Service, []byte, []byte, cmix.CMIXParams) (id.Round, ephemeral.Id, error) {
......
......@@ -27,7 +27,7 @@ import (
type manager struct {
*ratchet.Ratchet
*receive.Switchboard
partitioner parse.Partitioner
partitioner *parse.Partitioner
net cmix.Client
myID *id.ID
rng *fastRNG.StreamGenerator
......@@ -35,6 +35,7 @@ type manager struct {
grp *cyclic.Group
crit *critical
rekeyParams rekey.Params
kv *versioned.KV
}
const e2eRekeyParamsKey = "e2eRekeyParams"
......@@ -69,6 +70,8 @@ func initE2E(kv *versioned.KV, myID *id.ID, privKey *cyclic.Int,
// Load returns an e2e manager from storage. It uses an ID to prefix the kv
// and is used for partner relationships.
// You can use a memkv for an ephemeral e2e id
// Can be initialized with a nil cmix.Client, but will crash on start - use when
// prebuilding e2e identity to be used later
func Load(kv *versioned.KV, net cmix.Client, myID *id.ID,
grp *cyclic.Group, rng *fastRNG.StreamGenerator,
events event.Reporter) (Handler, error) {
......@@ -82,6 +85,8 @@ func Load(kv *versioned.KV, net cmix.Client, myID *id.ID,
// Does not modify the kv prefix in any way to maintain backwards compatibility
// before multiple IDs were supported
// You can use a memkv for an ephemeral e2e id
// Can be initialized with a nil cmix.Client, but will crash on start - use when
// prebuilding e2e identity to be used later
func LoadLegacy(kv *versioned.KV, net cmix.Client, myID *id.ID,
grp *cyclic.Group, rng *fastRNG.StreamGenerator,
events event.Reporter, params rekey.Params) (Handler, error) {
......@@ -118,12 +123,12 @@ func loadE2E(kv *versioned.KV, net cmix.Client, myDefaultID *id.ID,
m := &manager{
Switchboard: receive.New(),
partitioner: parse.NewPartitioner(kv, net.GetMaxMessageLength()),
net: net,
myID: myDefaultID,
events: events,
grp: grp,
rekeyParams: rekey.Params{},
kv: kv,
}
var err error
......@@ -144,17 +149,23 @@ func loadE2E(kv *versioned.KV, net cmix.Client, myDefaultID *id.ID,
"Failed to unmarshal rekeyParams data")
}
m.crit = newCritical(kv, net.AddHealthCallback, m.SendE2E)
return m, nil
}
func (m *manager) StartProcesses() (stoppable.Stoppable, error) {
multi := stoppable.NewMulti("e2eManager")
if m.partitioner == nil {
m.partitioner = parse.NewPartitioner(m.kv, m.net.GetMaxMessageLength())
}
if m.crit == nil {
m.crit = newCritical(m.kv, m.net.AddHealthCallback, m.SendE2E)
}
critcalNetworkStopper := stoppable.NewSingle(
"e2eCriticalMessagesStopper")
m.crit.runCriticalMessages(critcalNetworkStopper,
go m.crit.runCriticalMessages(critcalNetworkStopper,
m.net.GetInstance().GetRoundEvents())
multi.Add(critcalNetworkStopper)
......
......@@ -32,7 +32,7 @@ type Partitioner struct {
partition *partition.Store
}
func NewPartitioner(kv *versioned.KV, messageSize int) Partitioner {
func NewPartitioner(kv *versioned.KV, messageSize int) *Partitioner {
p := Partitioner{
baseMessageSize: messageSize,
firstContentsSize: messageSize - firstHeaderLen,
......@@ -43,10 +43,10 @@ func NewPartitioner(kv *versioned.KV, messageSize int) Partitioner {
}
p.maxSize = p.firstContentsSize + (MaxMessageParts-1)*p.partContentsSize
return p
return &p
}
func (p Partitioner) Partition(recipient *id.ID, mt catalog.MessageType,
func (p *Partitioner) Partition(recipient *id.ID, mt catalog.MessageType,
timestamp time.Time, payload []byte) ([][]byte, uint64, error) {
if len(payload) > p.maxSize {
......@@ -77,7 +77,7 @@ func (p Partitioner) Partition(recipient *id.ID, mt catalog.MessageType,
return parts, fullMessageID, nil
}
func (p Partitioner) HandlePartition(sender *id.ID,
func (p *Partitioner) HandlePartition(sender *id.ID,
contents []byte, relationshipFingerprint []byte) (receive.Message, bool) {
if isFirst(contents) {
......@@ -106,19 +106,19 @@ func (p Partitioner) HandlePartition(sender *id.ID,
// FirstPartitionSize returns the max partition payload size for the
// first payload
func (p Partitioner) FirstPartitionSize() uint {
func (p *Partitioner) FirstPartitionSize() uint {
return uint(p.firstContentsSize)
}
// SecondPartitionSize returns the max partition payload size for all
// payloads after the first payload
func (p Partitioner) SecondPartitionSize() uint {
func (p *Partitioner) SecondPartitionSize() uint {
return uint(p.partContentsSize)
}
// PayloadSize Returns the max payload size for a partitionable E2E
// message
func (p Partitioner) PayloadSize() uint {
func (p *Partitioner) PayloadSize() uint {
return uint(p.maxSize)
}
......
......@@ -218,11 +218,6 @@ func (m mockServiceHandler) DeleteService(clientID *id.ID, toDelete message.Serv
type mockNetManager struct{}
func (m *mockNetManager) Connect(ndf *ndf.NetworkDefinition) error {
// TODO implement me
panic("implement me")
}
func (m *mockNetManager) GetIdentity(get *id.ID) (identity.TrackedID, error) {
//TODO implement me
panic("implement me")
......
......@@ -166,7 +166,6 @@ func newMockCmix(myID *id.ID, handler *mockCmixHandler, t testing.TB) *mockCmix
}
}
func (m *mockCmix) Connect(*ndf.NetworkDefinition) error { return nil }
func (m *mockCmix) Follow(cmix.ClientErrorReport) (stoppable.Stoppable, error) { return nil, nil }
func (m *mockCmix) GetMaxMessageLength() int {
......
......@@ -156,12 +156,6 @@ func OpenCmix(storageDir string, password []byte,
return nil, err
}
c.network, err = cmix.NewClient(parameters.CMix, c.comms, c.storage,
c.rng, c.events)
if err != nil {
return nil, err
}
return c, nil
}
......@@ -213,6 +207,12 @@ func LoadCmix(storageDir string, password []byte, parameters Params) (*Cmix, err
return nil, err
}
c.network, err = cmix.NewClient(parameters.CMix, c.comms, c.storage,
c.rng, c.events)
if err != nil {
return nil, err
}
jww.INFO.Printf("Cmix Logged in: \n\tTransmissionID: %s "+
"\n\tReceptionID: %s", c.storage.GetTransmissionID(), c.storage.GetReceptionID())
......@@ -245,11 +245,6 @@ func LoadCmix(storageDir string, password []byte, parameters Params) (*Cmix, err
}
}
err = c.network.Connect(def)
if err != nil {
return nil, err
}
err = c.registerFollower()
if err != nil {
return nil, err
......
......@@ -64,6 +64,12 @@ func LoginLegacy(client *Cmix, callbacks auth.Callbacks) (m *E2e, err error) {
}
client.GetCmix().AddIdentity(client.GetUser().ReceptionID, time.Time{}, true)
err = client.AddService(m.e2e.StartProcesses)
if err != nil {
return nil, errors.WithMessage(err, "Failed to add "+
"the e2e processies")
}
m.auth, err = auth.NewState(client.GetStorage().GetKV(), client.GetCmix(),
m.e2e, client.GetRng(), client.GetEventReporter(),
auth.GetDefaultParams(), callbacks, m.backup.TriggerBackup)
......@@ -94,7 +100,7 @@ func LoginWithNewBaseNDF_UNSAFE(storageDir string, password []byte,
return nil, err
}
c, err := OpenCmix(storageDir, password, params)
c, err := LoadCmix(storageDir, password, params)
if err != nil {
return nil, err
}
......@@ -113,11 +119,6 @@ func LoginWithNewBaseNDF_UNSAFE(storageDir string, password []byte,
"able to register or track network.")
}
err = c.network.Connect(def)
if err != nil {
return nil, err
}
err = c.registerFollower()
if err != nil {
return nil, err
......@@ -151,7 +152,7 @@ func LoginWithProtoClient(storageDir string, password []byte,
return nil, err
}
c, err := OpenCmix(storageDir, password, params)
c, err := LoadCmix(storageDir, password, params)
if err != nil {
return nil, err
}
......@@ -163,11 +164,6 @@ func LoginWithProtoClient(storageDir string, password []byte,
return nil, err
}
err = c.network.Connect(def)
if err != nil {
return nil, err
}
c.network.AddIdentity(c.GetUser().ReceptionID, time.Time{}, true)
// FIXME: The callbacks need to be set, so I suppose we would need to
......@@ -226,6 +222,12 @@ func login(client *Cmix, callbacks auth.Callbacks,
"newly created e2e store")
}
err = client.AddService(m.e2e.StartProcesses)
if err != nil {
return nil, errors.WithMessage(err, "Failed to add "+
"the e2e processies")
}
m.auth, err = auth.NewState(kv, client.GetCmix(),
m.e2e, client.GetRng(), client.GetEventReporter(),
auth.GetDefaultTemporaryParams(), callbacks, m.backup.TriggerBackup)
......
......@@ -7,7 +7,6 @@
package xxdk
import (
"gitlab.com/xx_network/primitives/ndf"
"time"
"gitlab.com/elixxir/client/cmix"
......@@ -93,9 +92,6 @@ func (d *dummyEventMgr) EventService() (stoppable.Stoppable, error) {
}
/* Below methods built for interface adherence */
func (t *testNetworkManagerGeneric) Connect(ndf *ndf.NetworkDefinition) error {
return nil
}
func (t *testNetworkManagerGeneric) Follow(report cmix.ClientErrorReport) (stoppable.Stoppable, error) {
return nil, nil
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment