diff --git a/api/utilsInterfaces_test.go b/api/utilsInterfaces_test.go index 0997993aebfabf2a0bd4fa4ca2efb9c269112a1f..730f7a242a93cd0027bb6713dcbc8ac6c2bd2a7e 100644 --- a/api/utilsInterfaces_test.go +++ b/api/utilsInterfaces_test.go @@ -100,6 +100,12 @@ func (t *testNetworkManagerGeneric) Follow(report interfaces.ClientErrorReport) func (t *testNetworkManagerGeneric) CheckGarbledMessages() { return } + +func (t *testNetworkManagerGeneric) GetVerboseRounds()string{ + return "" +} + + func (t *testNetworkManagerGeneric) SendE2E(message.Send, params.E2E, *stoppable.Single) ( []id.Round, cE2e.MessageID, time.Time, error) { rounds := []id.Round{id.Round(0), id.Round(1), id.Round(2)} diff --git a/cmd/root.go b/cmd/root.go index c09275041992074413fa95f5b2a9701cfc74dedb..af05fe0c6307cf3f0d4d3561e5317fe47b350d91 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -247,9 +247,10 @@ var rootCmd = &cobra.Command{ } } fmt.Printf("Received %d\n", receiveCnt) - - jww.DEBUG.Printf("Verbose round information: \n%s", + jww.DEBUG.Printf("TestPrintthisshit") + fmt.Printf("Verbose round information: %s", client.GetNetworkInterface().GetVerboseRounds()) + jww.DEBUG.Printf("printedVerboseRounds") err = client.StopNetworkFollower() if err != nil { jww.WARN.Printf( @@ -414,6 +415,7 @@ func initClient() *api.Client { jww.INFO.Printf("Setting Uncheck Round Period to %v", period) netParams.UncheckRoundPeriod = period } + netParams.VerboseRoundTracking = viper.GetUint("logLevel") > 0 //load the client client, err := api.Login(storeDir, []byte(pass), netParams) diff --git a/groupChat/utils_test.go b/groupChat/utils_test.go index df7fc4e35b68bad6d9a57fb7e2c34ef6f15613c0..9fc0fd6aab4babfba1ea9c26285abbfd15df3d98 100644 --- a/groupChat/utils_test.go +++ b/groupChat/utils_test.go @@ -261,6 +261,10 @@ func (tnm *testNetworkManager) SendUnsafe(message.Send, params.Unsafe) ([]id.Rou return []id.Round{}, nil } +func (tnm *testNetworkManager) GetVerboseRounds()string{ + return "" +} + func (tnm *testNetworkManager) SendCMIX(format.Message, *id.ID, params.CMIX) (id.Round, ephemeral.Id, error) { return 0, ephemeral.Id{}, nil } diff --git a/network/follow.go b/network/follow.go index 8c8404b88df79c5c5d5e1adf80159f3077a9b737..3416eeacd679f2c06562bba4a9a5f39078d1c6bb 100644 --- a/network/follow.go +++ b/network/follow.go @@ -294,7 +294,11 @@ func (m *manager) follow(report interfaces.ClientErrorReport, rng csprng.Source, // are messages waiting in rounds and then sends signals to the appropriate // handling threads roundChecker := func(rid id.Round) bool { - return rounds.Checker(rid, filterList, identity.CR) + hasMessage := rounds.Checker(rid, filterList, identity.CR) + if !hasMessage && m.verboseRounds != nil{ + m.verboseRounds.denote(rid, RoundState(NoMessageAvailable)) + } + return hasMessage } // move the earliest unknown round tracker forward to the earliest @@ -339,16 +343,21 @@ func (m *manager) follow(report interfaces.ClientErrorReport, rng csprng.Source, } if m.verboseRounds != nil { - for i := earliestTrackedRound; i <= earliestRemaining; i++ { + trackingStart := updated + if uint(earliestRemaining-updated)>m.param.KnownRoundsThreshold{ + trackingStart = earliestRemaining-id.Round(m.param.KnownRoundsThreshold) + } + jww.DEBUG.Printf("Rounds tracked: %v to %v", trackingStart, earliestRemaining) + for i := trackingStart; i <= earliestRemaining; i++ { state := Unchecked for _, rid := range roundsWithMessages { if rid == i { - state = Checked + state = MessageAvailable } } for _, rid := range roundsWithMessages2 { if rid == i { - state = Checked + state = MessageAvailable } } for _, rid := range roundsUnknown { diff --git a/network/gateway/hostPool.go b/network/gateway/hostPool.go index 18122389d048141dbc71fd8d56de62cfbebd1253..10de8277e31e346305a2baaaa760ed29c52086a3 100644 --- a/network/gateway/hostPool.go +++ b/network/gateway/hostPool.go @@ -543,3 +543,4 @@ func readRangeUint32(start, end uint32, rng io.Reader) uint32 { return (res % size) + start } } + diff --git a/network/gateway/sender.go b/network/gateway/sender.go index ddd41478cc7845347bd5adde640794766cf9cc8f..0f507d7e536912788502e8e83bc7e808fc24bfa3 100644 --- a/network/gateway/sender.go +++ b/network/gateway/sender.go @@ -18,6 +18,7 @@ import ( "gitlab.com/xx_network/comms/connect" "gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/ndf" + "strings" ) // Sender Object used for sending that wraps the HostPool for providing destinations @@ -40,18 +41,21 @@ func NewSender(poolParams PoolParams, rng *fastRNG.StreamGenerator, ndf *ndf.Net func (s *Sender) SendToAny(sendFunc func(host *connect.Host) (interface{}, error), stop *stoppable.Single) (interface{}, error) { proxies := s.getAny(s.poolParams.ProxyAttempts, nil) - for i := range proxies { - result, err := sendFunc(proxies[i]) + for proxy := range proxies { + result, err := sendFunc(proxies[proxy]) if stop != nil && !stop.IsRunning() { return nil, errors.Errorf(stoppable.ErrMsg, stop.Name(), "SendToAny") - } else if err == nil { + }else if err==nil{ return result, nil - } else { - jww.WARN.Printf("Unable to SendToAny %s: %s", proxies[i].GetId().String(), err) - _, err = s.checkReplace(proxies[i].GetId(), err) - if err != nil { - jww.ERROR.Printf("Unable to checkReplace: %+v", err) - } + } else if strings.Contains(err.Error(),"unable to connect to target host") { + // Retry of the proxy could not communicate + jww.WARN.Printf("Unable to SendToAny via %s: proxy could not contact requested host: %s", + proxies[proxy].GetId().String(), err) + }else if replaced, checkReplaceErr := s.checkReplace(proxies[proxy].GetId(), err); replaced{ + jww.WARN.Printf("Unable to SendToAny, replaced a proxy %s: %s", + proxies[proxy].GetId().String(), checkReplaceErr) + }else{ + return nil, errors.WithMessage(err,"Received error from remote") } } @@ -60,7 +64,7 @@ func (s *Sender) SendToAny(sendFunc func(host *connect.Host) (interface{}, error // SendToPreferred Call given sendFunc to any Host in the HostPool, attempting with up to numProxies destinations func (s *Sender) SendToPreferred(targets []*id.ID, - sendFunc func(host *connect.Host, target *id.ID) (interface{}, bool, error), + sendFunc func(host *connect.Host, target *id.ID) (interface{}, error), stop *stoppable.Single) (interface{}, error) { // Get the hosts and shuffle randomly @@ -68,22 +72,24 @@ func (s *Sender) SendToPreferred(targets []*id.ID, // Attempt to send directly to targets if they are in the HostPool for i := range targetHosts { - result, didAbort, err := sendFunc(targetHosts[i], targets[i]) + result, err := sendFunc(targetHosts[i], targets[i]) if stop != nil && !stop.IsRunning() { return nil, errors.Errorf(stoppable.ErrMsg, stop.Name(), "SendToPreferred") } else if err == nil { return result, nil + } else if strings.Contains(err.Error(),"unable to connect to target host") { + // Retry of the proxy could not communicate + jww.WARN.Printf("Unable to SendToPreferred first pass %s via %s: %s, proxy could not contact requested host", + targets[i], targetHosts[i].GetId(), err) + continue + } else if replaced, checkReplaceErr := s.checkReplace(targetHosts[i].GetId(), err); replaced { + jww.WARN.Printf("Unable to SendToPreferred first pass %s via %s: %s, proxy failed, was replaced", + targets[i], targetHosts[i].GetId(), checkReplaceErr) + continue } else { - if didAbort { - return nil, errors.WithMessagef(err, "Aborted SendToPreferred gateway %s", - targetHosts[i].GetId().String()) - } - jww.WARN.Printf("Unable to SendToPreferred %s via %s: %s", + jww.WARN.Printf("Unable to SendToPreferred first pass %s via %s: %s, comm returned an error", targets[i], targetHosts[i].GetId(), err) - _, err = s.checkReplace(targetHosts[i].GetId(), err) - if err != nil { - jww.ERROR.Printf("Unable to checkReplace: %+v", err) - } + return result, err } } @@ -115,29 +121,35 @@ func (s *Sender) SendToPreferred(targets []*id.ID, continue } - result, didAbort, err := sendFunc(targetProxies[proxyIdx], target) + result, err := sendFunc(proxy, target) if stop != nil && !stop.IsRunning() { return nil, errors.Errorf(stoppable.ErrMsg, stop.Name(), "SendToPreferred") } else if err == nil { return result, nil + } else if strings.Contains(err.Error(),"unable to connect to target host") { + // Retry of the proxy could not communicate + jww.WARN.Printf("Unable to SendToPreferred second pass %s via %s: %s, proxy could not contact requested host", + target, proxy, err) + continue + } else if replaced, checkReplaceErr := s.checkReplace(proxy.GetId(), err); replaced { + jww.WARN.Printf("Unable to SendToPreferred second pass %s via %s: %s, proxy failed, was replaced", + target, proxy.GetId(), checkReplaceErr) + badProxies[proxy.String()] = nil + continue } else { - if didAbort { - return nil, errors.WithMessagef(err, "Aborted SendToPreferred gateway proxy %s", - proxy.GetId().String()) - } - jww.WARN.Printf("Unable to SendToPreferred %s via proxy "+ - "%s: %s", target, proxy.GetId(), err) - wasReplaced, err := s.checkReplace(proxy.GetId(), err) - if err != nil { - jww.ERROR.Printf("Unable to checkReplace: %+v", err) - } - // If the proxy was replaced, add as a bad proxy - if wasReplaced { - badProxies[proxy.String()] = nil - } + jww.WARN.Printf("Unable to SendToPreferred second pass %s via %s: %s, comm returned an error", + target, proxy.GetId(), err) + return result, err } } } return nil, errors.Errorf("Unable to send to any preferred") } + + +func (s *Sender)sendHelper(target []*id.ID, + sendFunc func(host *connect.Host, target *id.ID) (interface{}, error), + stop *stoppable.Single){ + +} diff --git a/network/gateway/utils_test.go b/network/gateway/utils_test.go index 9f75ace1429e947ef7c8a399f3f088e2583777de..0ec7dc11f8edde72a82affd3b718bcec121bf60c 100644 --- a/network/gateway/utils_test.go +++ b/network/gateway/utils_test.go @@ -129,16 +129,16 @@ func getTestNdf(face interface{}) *ndf.NetworkDefinition { const happyPathReturn = "happyPathReturn" -func SendToPreferred_HappyPath(host *connect.Host, target *id.ID) (interface{}, bool, error) { - return happyPathReturn, false, nil +func SendToPreferred_HappyPath(host *connect.Host, target *id.ID) (interface{}, error) { + return happyPathReturn, nil } -func SendToPreferred_KnownError(host *connect.Host, target *id.ID) (interface{}, bool, error) { - return nil, false, fmt.Errorf(errorsList[0]) +func SendToPreferred_KnownError(host *connect.Host, target *id.ID) (interface{}, error) { + return nil, fmt.Errorf(errorsList[0]) } -func SendToPreferred_UnknownError(host *connect.Host, target *id.ID) (interface{}, bool, error) { - return nil, false, fmt.Errorf("Unexpected error: Oopsie") +func SendToPreferred_UnknownError(host *connect.Host, target *id.ID) (interface{}, error) { + return nil, fmt.Errorf("Unexpected error: Oopsie") } func SendToAny_HappyPath(host *connect.Host) (interface{}, error) { diff --git a/network/manager.go b/network/manager.go index 2e53a7a0f12c2a6c73abd42e6ed65e5dce88c0c1..f47191c0bdb39add026c8ff11ee7b7b946368975 100644 --- a/network/manager.go +++ b/network/manager.go @@ -231,5 +231,8 @@ func (m *manager) SetPoolFilter(f gateway.Filter) { // GetVerboseRounds returns verbose round information func (m *manager) GetVerboseRounds() string { + if m.verboseRounds==nil{ + return "Verbose Round tracking not enabled" + } return m.verboseRounds.String() } diff --git a/network/message/sendCmix.go b/network/message/sendCmix.go index 2028e435692425f6df01d2bdcbc9d03087c37250..6484c0ffac997790daa31f2ebd71aaf07be72a4b 100644 --- a/network/message/sendCmix.go +++ b/network/message/sendCmix.go @@ -128,7 +128,7 @@ func sendCmixHelper(sender *gateway.Sender, msg format.Message, encMsg.Digest(), firstGateway.String()) // Send the payload - sendFunc := func(host *connect.Host, target *id.ID) (interface{}, bool, error) { + sendFunc := func(host *connect.Host, target *id.ID) (interface{}, error) { wrappedMsg.Target = target.Marshal() result, err := comms.SendPutMessage(host, wrappedMsg) if err != nil { @@ -137,10 +137,10 @@ func sendCmixHelper(sender *gateway.Sender, msg format.Message, if warn { jww.WARN.Printf("SendCmix Failed: %+v", err) } else { - return result, true, errors.WithMessagef(err, "SendCmix %s", unrecoverableError) + return result, errors.WithMessagef(err, "SendCmix %s", unrecoverableError) } } - return result, false, err + return result, err } result, err := sender.SendToPreferred([]*id.ID{firstGateway}, sendFunc, stop) diff --git a/network/message/sendCmixUtils.go b/network/message/sendCmixUtils.go index 28917e297a1d0f86e27c9dc2837567196bf1cfbd..43973970d0d00d6554aa8fa8382d879e4e0dc1ef 100644 --- a/network/message/sendCmixUtils.go +++ b/network/message/sendCmixUtils.go @@ -41,6 +41,7 @@ const unrecoverableError = "failed with an unrecoverable error" // context. If the error is not among recoverable errors, then the recoverable // boolean will be returned false. If the error is among recoverable errors, // then the boolean will return true. +// recoverable means we should try resending to the round func handlePutMessageError(firstGateway *id.ID, instance *network.Instance, session *storage.Session, nodeRegistration chan network.NodeGateway, recipientString string, bestRound *pb.RoundInfo, @@ -65,7 +66,7 @@ func handlePutMessageError(firstGateway *id.ID, instance *network.Instance, // Trigger go handleMissingNodeKeys(instance, nodeRegistration, []*id.ID{nodeID}) - return true, errors.WithMessagef(err, "Failed to send to [%s] via %s "+ + return false, errors.WithMessagef(err, "Failed to send to [%s] via %s "+ "due to failed authentication, retrying...", recipientString, firstGateway) } diff --git a/network/message/sendManyCmix.go b/network/message/sendManyCmix.go index d9e098715a5032c90ed2bef7438f15ce2714f890..6bf064a712906f8f747ff11f66c57ec084702d5a 100644 --- a/network/message/sendManyCmix.go +++ b/network/message/sendManyCmix.go @@ -137,7 +137,7 @@ func sendManyCmixHelper(sender *gateway.Sender, msgs map[id.ID]format.Message, } // Send the payload - sendFunc := func(host *connect.Host, target *id.ID) (interface{}, bool, error) { + sendFunc := func(host *connect.Host, target *id.ID) (interface{}, error) { wrappedMessage.Target = target.Marshal() result, err := comms.SendPutManyMessages(host, wrappedMessage) if err != nil { @@ -146,11 +146,11 @@ func sendManyCmixHelper(sender *gateway.Sender, msgs map[id.ID]format.Message, if warn { jww.WARN.Printf("SendManyCMIX Failed: %+v", err) } else { - return result, false, errors.WithMessagef(err, + return result, errors.WithMessagef(err, "SendManyCMIX %s", unrecoverableError) } } - return result, false, err + return result, err } result, err := sender.SendToPreferred([]*id.ID{firstGateway}, sendFunc, nil) diff --git a/network/node/register.go b/network/node/register.go index 2aa7a7d203a23cd800b202075f6ee169b1e2592e..471e892a3d33d522e9c897debcd06607aa70e0e5 100644 --- a/network/node/register.go +++ b/network/node/register.go @@ -194,12 +194,10 @@ func requestNonce(sender *gateway.Sender, comms RegisterNodeCommsInterface, gwId TimeStamp: registrationTimestampNano, }) if err != nil { - errMsg := fmt.Sprintf("Register: Failed requesting nonce from gateway: %+v", err) - return nil, errors.New(errMsg) + return nil, errors.WithMessage(err,"Register: Failed requesting nonce from gateway") } if nonceResponse.Error != "" { - err := errors.New(fmt.Sprintf("requestNonce: nonceResponse error: %s", nonceResponse.Error)) - return nil, err + return nil, errors.WithMessage(err,"requestNonce: nonceResponse error") } return nonceResponse, nil }, stop) @@ -207,6 +205,7 @@ func requestNonce(sender *gateway.Sender, comms RegisterNodeCommsInterface, gwId if err != nil { return nil, nil, err } + nonceResponse := result.(*pb.Nonce) // Use Client keypair to sign Server nonce diff --git a/network/roundTracking.go b/network/roundTracking.go index eeeeff42c9b68e76f98a03f48939b3b2331d711e..650747e1c8ff5145f597eb909812bbeafef9fa38 100644 --- a/network/roundTracking.go +++ b/network/roundTracking.go @@ -8,6 +8,7 @@ package network import ( "fmt" + jww "github.com/spf13/jwalterweatherman" "gitlab.com/xx_network/primitives/id" "sort" "sync" @@ -18,7 +19,8 @@ type RoundState uint8 const ( Unchecked = iota Unknown - Checked + NoMessageAvailable + MessageAvailable Abandoned ) @@ -28,8 +30,10 @@ func (rs RoundState) String() string { return "Unchecked" case Unknown: return "Unknown" - case Checked: - return "Checked" + case MessageAvailable: + return "Message Available" + case NoMessageAvailable: + return "No Message Available" case Abandoned: return "Abandoned" default: @@ -51,12 +55,16 @@ func NewRoundTracker() *RoundTracker { func (rt *RoundTracker) denote(rid id.Round, state RoundState) { rt.mux.Lock() defer rt.mux.Unlock() + if storedState, exists := rt.state[rid]; exists || storedState > state { + return + } rt.state[rid] = state } func (rt *RoundTracker) String() string { rt.mux.Lock() defer rt.mux.Unlock() + jww.DEBUG.Printf("Debug Printing status of %d rounds", len(rt.state)) keys := make([]int, 0, len(rt.state)) for key := range rt.state { keys = append(keys, int(key)) diff --git a/network/rounds/retrieve.go b/network/rounds/retrieve.go index caf5ce0e70d2b688a2045feb8c0dd21bcecdee6a..bd7f1d682760d2f0160e2b9c4f9ca32f749bc534 100644 --- a/network/rounds/retrieve.go +++ b/network/rounds/retrieve.go @@ -143,7 +143,7 @@ func (m *Manager) getMessagesFromGateway(roundID id.Round, stop *stoppable.Single) (message.Bundle, error) { start := time.Now() // Send to the gateways using backup proxies - result, err := m.sender.SendToPreferred(gwIds, func(host *connect.Host, target *id.ID) (interface{}, bool, error) { + result, err := m.sender.SendToPreferred(gwIds, func(host *connect.Host, target *id.ID) (interface{}, error) { jww.DEBUG.Printf("Trying to get messages for round %v for ephemeralID %d (%v) "+ "via Gateway: %s", roundID, identity.EphId.Int64(), identity.Source.String(), host.GetId()) @@ -158,10 +158,10 @@ func (m *Manager) getMessagesFromGateway(roundID id.Round, msgResp, err := comms.RequestMessages(host, msgReq) if err == nil && !msgResp.GetHasRound() { jww.INFO.Printf("No round error for round %d received from %s", roundID, target) - return message.Bundle{}, false, errors.Errorf(noRoundError, roundID) + return message.Bundle{}, errors.Errorf(noRoundError, roundID) } - return msgResp, false, err + return msgResp, err }, stop) jww.INFO.Printf("Received message for round %d, processing...", roundID) // Fail the round if an error occurs so it can be tried again later diff --git a/single/manager_test.go b/single/manager_test.go index ac5dd4253820f4fa8f2f50d5cdba079f0c94147c..9657e47b4b9dd1030156eb15e5250184d64d3579 100644 --- a/single/manager_test.go +++ b/single/manager_test.go @@ -287,6 +287,10 @@ func (tnm *testNetworkManager) SendE2E(message.Send, params.E2E, *stoppable.Sing return nil, e2e.MessageID{}, time.Time{}, nil } +func (tnm *testNetworkManager) GetVerboseRounds()string{ + return "" +} + func (tnm *testNetworkManager) SendUnsafe(_ message.Send, _ params.Unsafe) ([]id.Round, error) { return []id.Round{}, nil }