From fbcdd95097956ecbeb448baa915be9a137ce03fc Mon Sep 17 00:00:00 2001
From: Benjamin Wenger <ben@elixxir.ioo>
Date: Mon, 20 Sep 2021 15:32:50 -0700
Subject: [PATCH] improved host pool

---
 api/utilsInterfaces_test.go      |  6 +++
 cmd/root.go                      |  6 ++-
 groupChat/utils_test.go          |  4 ++
 network/follow.go                | 17 +++++--
 network/gateway/hostPool.go      |  1 +
 network/gateway/sender.go        | 82 ++++++++++++++++++--------------
 network/gateway/utils_test.go    | 12 ++---
 network/manager.go               |  3 ++
 network/message/sendCmix.go      |  6 +--
 network/message/sendCmixUtils.go |  3 +-
 network/message/sendManyCmix.go  |  6 +--
 network/node/register.go         |  7 ++-
 network/roundTracking.go         | 14 ++++--
 network/rounds/retrieve.go       |  6 +--
 single/manager_test.go           |  4 ++
 15 files changed, 113 insertions(+), 64 deletions(-)

diff --git a/api/utilsInterfaces_test.go b/api/utilsInterfaces_test.go
index 0997993ae..730f7a242 100644
--- a/api/utilsInterfaces_test.go
+++ b/api/utilsInterfaces_test.go
@@ -100,6 +100,12 @@ func (t *testNetworkManagerGeneric) Follow(report interfaces.ClientErrorReport)
 func (t *testNetworkManagerGeneric) CheckGarbledMessages() {
 	return
 }
+
+func (t *testNetworkManagerGeneric) GetVerboseRounds()string{
+	return ""
+}
+
+
 func (t *testNetworkManagerGeneric) SendE2E(message.Send, params.E2E, *stoppable.Single) (
 	[]id.Round, cE2e.MessageID, time.Time, error) {
 	rounds := []id.Round{id.Round(0), id.Round(1), id.Round(2)}
diff --git a/cmd/root.go b/cmd/root.go
index c09275041..af05fe0c6 100644
--- a/cmd/root.go
+++ b/cmd/root.go
@@ -247,9 +247,10 @@ var rootCmd = &cobra.Command{
 			}
 		}
 		fmt.Printf("Received %d\n", receiveCnt)
-
-		jww.DEBUG.Printf("Verbose round information: \n%s",
+		jww.DEBUG.Printf("TestPrintthisshit")
+		fmt.Printf("Verbose round information: %s",
 			client.GetNetworkInterface().GetVerboseRounds())
+		jww.DEBUG.Printf("printedVerboseRounds")
 		err = client.StopNetworkFollower()
 		if err != nil {
 			jww.WARN.Printf(
@@ -414,6 +415,7 @@ func initClient() *api.Client {
 		jww.INFO.Printf("Setting Uncheck Round Period to %v", period)
 		netParams.UncheckRoundPeriod = period
 	}
+	netParams.VerboseRoundTracking = viper.GetUint("logLevel") > 0
 
 	//load the client
 	client, err := api.Login(storeDir, []byte(pass), netParams)
diff --git a/groupChat/utils_test.go b/groupChat/utils_test.go
index df7fc4e35..9fc0fd6aa 100644
--- a/groupChat/utils_test.go
+++ b/groupChat/utils_test.go
@@ -261,6 +261,10 @@ func (tnm *testNetworkManager) SendUnsafe(message.Send, params.Unsafe) ([]id.Rou
 	return []id.Round{}, nil
 }
 
+func (tnm *testNetworkManager) GetVerboseRounds()string{
+	return ""
+}
+
 func (tnm *testNetworkManager) SendCMIX(format.Message, *id.ID, params.CMIX) (id.Round, ephemeral.Id, error) {
 	return 0, ephemeral.Id{}, nil
 }
diff --git a/network/follow.go b/network/follow.go
index 8c8404b88..3416eeacd 100644
--- a/network/follow.go
+++ b/network/follow.go
@@ -294,7 +294,11 @@ func (m *manager) follow(report interfaces.ClientErrorReport, rng csprng.Source,
 	// are messages waiting in rounds and then sends signals to the appropriate
 	// handling threads
 	roundChecker := func(rid id.Round) bool {
-		return rounds.Checker(rid, filterList, identity.CR)
+		hasMessage := rounds.Checker(rid, filterList, identity.CR)
+		if !hasMessage && m.verboseRounds != nil{
+			m.verboseRounds.denote(rid, RoundState(NoMessageAvailable))
+		}
+		return hasMessage
 	}
 
 	// move the earliest unknown round tracker forward to the earliest
@@ -339,16 +343,21 @@ func (m *manager) follow(report interfaces.ClientErrorReport, rng csprng.Source,
 	}
 
 	if m.verboseRounds != nil {
-		for i := earliestTrackedRound; i <= earliestRemaining; i++ {
+		trackingStart := updated
+		if uint(earliestRemaining-updated)>m.param.KnownRoundsThreshold{
+			trackingStart = earliestRemaining-id.Round(m.param.KnownRoundsThreshold)
+		}
+		jww.DEBUG.Printf("Rounds tracked: %v to %v", trackingStart, earliestRemaining)
+		for i := trackingStart; i <= earliestRemaining; i++ {
 			state := Unchecked
 			for _, rid := range roundsWithMessages {
 				if rid == i {
-					state = Checked
+					state = MessageAvailable
 				}
 			}
 			for _, rid := range roundsWithMessages2 {
 				if rid == i {
-					state = Checked
+					state = MessageAvailable
 				}
 			}
 			for _, rid := range roundsUnknown {
diff --git a/network/gateway/hostPool.go b/network/gateway/hostPool.go
index 18122389d..10de8277e 100644
--- a/network/gateway/hostPool.go
+++ b/network/gateway/hostPool.go
@@ -543,3 +543,4 @@ func readRangeUint32(start, end uint32, rng io.Reader) uint32 {
 		return (res % size) + start
 	}
 }
+
diff --git a/network/gateway/sender.go b/network/gateway/sender.go
index ddd41478c..0f507d7e5 100644
--- a/network/gateway/sender.go
+++ b/network/gateway/sender.go
@@ -18,6 +18,7 @@ import (
 	"gitlab.com/xx_network/comms/connect"
 	"gitlab.com/xx_network/primitives/id"
 	"gitlab.com/xx_network/primitives/ndf"
+	"strings"
 )
 
 // Sender Object used for sending that wraps the HostPool for providing destinations
@@ -40,18 +41,21 @@ func NewSender(poolParams PoolParams, rng *fastRNG.StreamGenerator, ndf *ndf.Net
 func (s *Sender) SendToAny(sendFunc func(host *connect.Host) (interface{}, error), stop *stoppable.Single) (interface{}, error) {
 
 	proxies := s.getAny(s.poolParams.ProxyAttempts, nil)
-	for i := range proxies {
-		result, err := sendFunc(proxies[i])
+	for proxy := range proxies {
+		result, err := sendFunc(proxies[proxy])
 		if stop != nil && !stop.IsRunning() {
 			return nil, errors.Errorf(stoppable.ErrMsg, stop.Name(), "SendToAny")
-		} else if err == nil {
+		}else if err==nil{
 			return result, nil
-		} else {
-			jww.WARN.Printf("Unable to SendToAny %s: %s", proxies[i].GetId().String(), err)
-			_, err = s.checkReplace(proxies[i].GetId(), err)
-			if err != nil {
-				jww.ERROR.Printf("Unable to checkReplace: %+v", err)
-			}
+		} else if strings.Contains(err.Error(),"unable to connect to target host") {
+			// Retry of the proxy could not communicate
+			jww.WARN.Printf("Unable to SendToAny via %s: proxy could not contact requested host: %s",
+				proxies[proxy].GetId().String(), err)
+		}else if replaced, checkReplaceErr := s.checkReplace(proxies[proxy].GetId(), err); replaced{
+			jww.WARN.Printf("Unable to SendToAny, replaced a proxy %s: %s",
+				proxies[proxy].GetId().String(), checkReplaceErr)
+		}else{
+			return nil, errors.WithMessage(err,"Received error from remote")
 		}
 	}
 
@@ -60,7 +64,7 @@ func (s *Sender) SendToAny(sendFunc func(host *connect.Host) (interface{}, error
 
 // SendToPreferred Call given sendFunc to any Host in the HostPool, attempting with up to numProxies destinations
 func (s *Sender) SendToPreferred(targets []*id.ID,
-	sendFunc func(host *connect.Host, target *id.ID) (interface{}, bool, error),
+	sendFunc func(host *connect.Host, target *id.ID) (interface{}, error),
 	stop *stoppable.Single) (interface{}, error) {
 
 	// Get the hosts and shuffle randomly
@@ -68,22 +72,24 @@ func (s *Sender) SendToPreferred(targets []*id.ID,
 
 	// Attempt to send directly to targets if they are in the HostPool
 	for i := range targetHosts {
-		result, didAbort, err := sendFunc(targetHosts[i], targets[i])
+		result, err := sendFunc(targetHosts[i], targets[i])
 		if stop != nil && !stop.IsRunning() {
 			return nil, errors.Errorf(stoppable.ErrMsg, stop.Name(), "SendToPreferred")
 		} else if err == nil {
 			return result, nil
+		} else if strings.Contains(err.Error(),"unable to connect to target host") {
+			// Retry of the proxy could not communicate
+			jww.WARN.Printf("Unable to SendToPreferred first pass %s via %s: %s, proxy could not contact requested host",
+				targets[i], targetHosts[i].GetId(), err)
+			continue
+		} else if replaced, checkReplaceErr := s.checkReplace(targetHosts[i].GetId(), err); replaced {
+			jww.WARN.Printf("Unable to SendToPreferred first pass %s via %s: %s, proxy failed, was replaced",
+				targets[i], targetHosts[i].GetId(), checkReplaceErr)
+			continue
 		} else {
-			if didAbort {
-				return nil, errors.WithMessagef(err, "Aborted SendToPreferred gateway %s",
-					targetHosts[i].GetId().String())
-			}
-			jww.WARN.Printf("Unable to SendToPreferred %s via %s: %s",
+			jww.WARN.Printf("Unable to SendToPreferred first pass %s via %s: %s, comm returned an error",
 				targets[i], targetHosts[i].GetId(), err)
-			_, err = s.checkReplace(targetHosts[i].GetId(), err)
-			if err != nil {
-				jww.ERROR.Printf("Unable to checkReplace: %+v", err)
-			}
+			return result, err
 		}
 	}
 
@@ -115,29 +121,35 @@ func (s *Sender) SendToPreferred(targets []*id.ID,
 				continue
 			}
 
-			result, didAbort, err := sendFunc(targetProxies[proxyIdx], target)
+			result, err := sendFunc(proxy, target)
 			if stop != nil && !stop.IsRunning() {
 				return nil, errors.Errorf(stoppable.ErrMsg, stop.Name(), "SendToPreferred")
 			} else if err == nil {
 				return result, nil
+			} else if strings.Contains(err.Error(),"unable to connect to target host") {
+				// Retry of the proxy could not communicate
+				jww.WARN.Printf("Unable to SendToPreferred second pass %s via %s: %s, proxy could not contact requested host",
+					target, proxy, err)
+				continue
+			} else if replaced, checkReplaceErr := s.checkReplace(proxy.GetId(), err); replaced {
+				jww.WARN.Printf("Unable to SendToPreferred second pass %s via %s: %s, proxy failed, was replaced",
+					target, proxy.GetId(), checkReplaceErr)
+				badProxies[proxy.String()] = nil
+				continue
 			} else {
-				if didAbort {
-					return nil, errors.WithMessagef(err, "Aborted SendToPreferred gateway proxy %s",
-						proxy.GetId().String())
-				}
-				jww.WARN.Printf("Unable to SendToPreferred %s via proxy "+
-					"%s: %s", target, proxy.GetId(), err)
-				wasReplaced, err := s.checkReplace(proxy.GetId(), err)
-				if err != nil {
-					jww.ERROR.Printf("Unable to checkReplace: %+v", err)
-				}
-				// If the proxy was replaced, add as a bad proxy
-				if wasReplaced {
-					badProxies[proxy.String()] = nil
-				}
+				jww.WARN.Printf("Unable to SendToPreferred second pass %s via %s: %s, comm returned an error",
+					target, proxy.GetId(), err)
+				return result, err
 			}
 		}
 	}
 
 	return nil, errors.Errorf("Unable to send to any preferred")
 }
+
+
+func (s *Sender)sendHelper(target  []*id.ID,
+	sendFunc func(host *connect.Host, target *id.ID) (interface{}, error),
+	stop *stoppable.Single){
+
+}
diff --git a/network/gateway/utils_test.go b/network/gateway/utils_test.go
index 9f75ace14..0ec7dc11f 100644
--- a/network/gateway/utils_test.go
+++ b/network/gateway/utils_test.go
@@ -129,16 +129,16 @@ func getTestNdf(face interface{}) *ndf.NetworkDefinition {
 
 const happyPathReturn = "happyPathReturn"
 
-func SendToPreferred_HappyPath(host *connect.Host, target *id.ID) (interface{}, bool, error) {
-	return happyPathReturn, false, nil
+func SendToPreferred_HappyPath(host *connect.Host, target *id.ID) (interface{}, error) {
+	return happyPathReturn, nil
 }
 
-func SendToPreferred_KnownError(host *connect.Host, target *id.ID) (interface{}, bool, error) {
-	return nil, false, fmt.Errorf(errorsList[0])
+func SendToPreferred_KnownError(host *connect.Host, target *id.ID) (interface{}, error) {
+	return nil, fmt.Errorf(errorsList[0])
 }
 
-func SendToPreferred_UnknownError(host *connect.Host, target *id.ID) (interface{}, bool, error) {
-	return nil, false, fmt.Errorf("Unexpected error: Oopsie")
+func SendToPreferred_UnknownError(host *connect.Host, target *id.ID) (interface{}, error) {
+	return nil, fmt.Errorf("Unexpected error: Oopsie")
 }
 
 func SendToAny_HappyPath(host *connect.Host) (interface{}, error) {
diff --git a/network/manager.go b/network/manager.go
index 2e53a7a0f..f47191c0b 100644
--- a/network/manager.go
+++ b/network/manager.go
@@ -231,5 +231,8 @@ func (m *manager) SetPoolFilter(f gateway.Filter) {
 
 // GetVerboseRounds returns verbose round information
 func (m *manager) GetVerboseRounds() string {
+	if m.verboseRounds==nil{
+		return "Verbose Round tracking not enabled"
+	}
 	return m.verboseRounds.String()
 }
diff --git a/network/message/sendCmix.go b/network/message/sendCmix.go
index 2028e4356..6484c0ffa 100644
--- a/network/message/sendCmix.go
+++ b/network/message/sendCmix.go
@@ -128,7 +128,7 @@ func sendCmixHelper(sender *gateway.Sender, msg format.Message,
 			encMsg.Digest(), firstGateway.String())
 
 		// Send the payload
-		sendFunc := func(host *connect.Host, target *id.ID) (interface{}, bool, error) {
+		sendFunc := func(host *connect.Host, target *id.ID) (interface{}, error) {
 			wrappedMsg.Target = target.Marshal()
 			result, err := comms.SendPutMessage(host, wrappedMsg)
 			if err != nil {
@@ -137,10 +137,10 @@ func sendCmixHelper(sender *gateway.Sender, msg format.Message,
 				if warn {
 					jww.WARN.Printf("SendCmix Failed: %+v", err)
 				} else {
-					return result, true, errors.WithMessagef(err, "SendCmix %s", unrecoverableError)
+					return result, errors.WithMessagef(err, "SendCmix %s", unrecoverableError)
 				}
 			}
-			return result, false, err
+			return result, err
 		}
 		result, err := sender.SendToPreferred([]*id.ID{firstGateway}, sendFunc, stop)
 
diff --git a/network/message/sendCmixUtils.go b/network/message/sendCmixUtils.go
index 28917e297..43973970d 100644
--- a/network/message/sendCmixUtils.go
+++ b/network/message/sendCmixUtils.go
@@ -41,6 +41,7 @@ const unrecoverableError = "failed with an unrecoverable error"
 // context. If the error is not among recoverable errors, then the recoverable
 // boolean will be returned false. If the error is among recoverable errors,
 // then the boolean will return true.
+// recoverable means we should try resending to the round
 func handlePutMessageError(firstGateway *id.ID, instance *network.Instance,
 	session *storage.Session, nodeRegistration chan network.NodeGateway,
 	recipientString string, bestRound *pb.RoundInfo,
@@ -65,7 +66,7 @@ func handlePutMessageError(firstGateway *id.ID, instance *network.Instance,
 		// Trigger
 		go handleMissingNodeKeys(instance, nodeRegistration, []*id.ID{nodeID})
 
-		return true, errors.WithMessagef(err, "Failed to send to [%s] via %s "+
+		return false, errors.WithMessagef(err, "Failed to send to [%s] via %s "+
 			"due to failed authentication, retrying...",
 			recipientString, firstGateway)
 	}
diff --git a/network/message/sendManyCmix.go b/network/message/sendManyCmix.go
index d9e098715..6bf064a71 100644
--- a/network/message/sendManyCmix.go
+++ b/network/message/sendManyCmix.go
@@ -137,7 +137,7 @@ func sendManyCmixHelper(sender *gateway.Sender, msgs map[id.ID]format.Message,
 		}
 
 		// Send the payload
-		sendFunc := func(host *connect.Host, target *id.ID) (interface{}, bool, error) {
+		sendFunc := func(host *connect.Host, target *id.ID) (interface{}, error) {
 			wrappedMessage.Target = target.Marshal()
 			result, err := comms.SendPutManyMessages(host, wrappedMessage)
 			if err != nil {
@@ -146,11 +146,11 @@ func sendManyCmixHelper(sender *gateway.Sender, msgs map[id.ID]format.Message,
 				if warn {
 					jww.WARN.Printf("SendManyCMIX Failed: %+v", err)
 				} else {
-					return result, false, errors.WithMessagef(err,
+					return result, errors.WithMessagef(err,
 						"SendManyCMIX %s", unrecoverableError)
 				}
 			}
-			return result, false, err
+			return result, err
 		}
 		result, err := sender.SendToPreferred([]*id.ID{firstGateway}, sendFunc, nil)
 
diff --git a/network/node/register.go b/network/node/register.go
index 2aa7a7d20..471e892a3 100644
--- a/network/node/register.go
+++ b/network/node/register.go
@@ -194,12 +194,10 @@ func requestNonce(sender *gateway.Sender, comms RegisterNodeCommsInterface, gwId
 				TimeStamp: registrationTimestampNano,
 			})
 		if err != nil {
-			errMsg := fmt.Sprintf("Register: Failed requesting nonce from gateway: %+v", err)
-			return nil, errors.New(errMsg)
+			return nil, errors.WithMessage(err,"Register: Failed requesting nonce from gateway")
 		}
 		if nonceResponse.Error != "" {
-			err := errors.New(fmt.Sprintf("requestNonce: nonceResponse error: %s", nonceResponse.Error))
-			return nil, err
+			return nil, errors.WithMessage(err,"requestNonce: nonceResponse error")
 		}
 		return nonceResponse, nil
 	}, stop)
@@ -207,6 +205,7 @@ func requestNonce(sender *gateway.Sender, comms RegisterNodeCommsInterface, gwId
 	if err != nil {
 		return nil, nil, err
 	}
+
 	nonceResponse := result.(*pb.Nonce)
 
 	// Use Client keypair to sign Server nonce
diff --git a/network/roundTracking.go b/network/roundTracking.go
index eeeeff42c..650747e1c 100644
--- a/network/roundTracking.go
+++ b/network/roundTracking.go
@@ -8,6 +8,7 @@ package network
 
 import (
 	"fmt"
+	jww "github.com/spf13/jwalterweatherman"
 	"gitlab.com/xx_network/primitives/id"
 	"sort"
 	"sync"
@@ -18,7 +19,8 @@ type RoundState uint8
 const (
 	Unchecked = iota
 	Unknown
-	Checked
+	NoMessageAvailable
+	MessageAvailable
 	Abandoned
 )
 
@@ -28,8 +30,10 @@ func (rs RoundState) String() string {
 		return "Unchecked"
 	case Unknown:
 		return "Unknown"
-	case Checked:
-		return "Checked"
+	case MessageAvailable:
+		return "Message Available"
+	case NoMessageAvailable:
+		return "No Message Available"
 	case Abandoned:
 		return "Abandoned"
 	default:
@@ -51,12 +55,16 @@ func NewRoundTracker() *RoundTracker {
 func (rt *RoundTracker) denote(rid id.Round, state RoundState) {
 	rt.mux.Lock()
 	defer rt.mux.Unlock()
+	if storedState, exists := rt.state[rid]; exists || storedState > state {
+		return
+	}
 	rt.state[rid] = state
 }
 
 func (rt *RoundTracker) String() string {
 	rt.mux.Lock()
 	defer rt.mux.Unlock()
+	jww.DEBUG.Printf("Debug Printing status of %d rounds", len(rt.state))
 	keys := make([]int, 0, len(rt.state))
 	for key := range rt.state {
 		keys = append(keys, int(key))
diff --git a/network/rounds/retrieve.go b/network/rounds/retrieve.go
index caf5ce0e7..bd7f1d682 100644
--- a/network/rounds/retrieve.go
+++ b/network/rounds/retrieve.go
@@ -143,7 +143,7 @@ func (m *Manager) getMessagesFromGateway(roundID id.Round,
 	stop *stoppable.Single) (message.Bundle, error) {
 	start := time.Now()
 	// Send to the gateways using backup proxies
-	result, err := m.sender.SendToPreferred(gwIds, func(host *connect.Host, target *id.ID) (interface{}, bool, error) {
+	result, err := m.sender.SendToPreferred(gwIds, func(host *connect.Host, target *id.ID) (interface{}, error) {
 		jww.DEBUG.Printf("Trying to get messages for round %v for ephemeralID %d (%v)  "+
 			"via Gateway: %s", roundID, identity.EphId.Int64(), identity.Source.String(), host.GetId())
 
@@ -158,10 +158,10 @@ func (m *Manager) getMessagesFromGateway(roundID id.Round,
 		msgResp, err := comms.RequestMessages(host, msgReq)
 		if err == nil && !msgResp.GetHasRound() {
 			jww.INFO.Printf("No round error for round %d received from %s", roundID, target)
-			return message.Bundle{}, false, errors.Errorf(noRoundError, roundID)
+			return message.Bundle{}, errors.Errorf(noRoundError, roundID)
 		}
 
-		return msgResp, false, err
+		return msgResp, err
 	}, stop)
 	jww.INFO.Printf("Received message for round %d, processing...", roundID)
 	// Fail the round if an error occurs so it can be tried again later
diff --git a/single/manager_test.go b/single/manager_test.go
index ac5dd4253..9657e47b4 100644
--- a/single/manager_test.go
+++ b/single/manager_test.go
@@ -287,6 +287,10 @@ func (tnm *testNetworkManager) SendE2E(message.Send, params.E2E, *stoppable.Sing
 	return nil, e2e.MessageID{}, time.Time{}, nil
 }
 
+func (tnm *testNetworkManager) GetVerboseRounds()string{
+	return ""
+}
+
 func (tnm *testNetworkManager) SendUnsafe(_ message.Send, _ params.Unsafe) ([]id.Round, error) {
 	return []id.Round{}, nil
 }
-- 
GitLab