From ccaf880fa9bd5fcfda59ae5bc445c4b41b28af42 Mon Sep 17 00:00:00 2001
From: benjamin <ben@elixxir.io>
Date: Sun, 23 Oct 2022 16:19:35 -0700
Subject: [PATCH] added the fast sending tracker and implemented its logic in
 send

---
 cmix/attempts/histrogram.go | 84 +++++++++++++++++++++++++++++++++++++
 cmix/client.go              | 25 ++++++-----
 cmix/params.go              |  7 +++-
 cmix/sendCmix.go            | 15 ++++++-
 cmix/sendCmixUtils.go       |  2 +-
 dummy/send.go               |  1 +
 6 files changed, 119 insertions(+), 15 deletions(-)
 create mode 100644 cmix/attempts/histrogram.go

diff --git a/cmix/attempts/histrogram.go b/cmix/attempts/histrogram.go
new file mode 100644
index 000000000..52bf207a4
--- /dev/null
+++ b/cmix/attempts/histrogram.go
@@ -0,0 +1,84 @@
+package attempts
+
+import (
+	"sort"
+	"sync"
+	"sync/atomic"
+)
+
+const maxHistogramSize = 100
+const minElements = 3
+const percentileNumerator = 66
+const percentileDenominator = 99
+const percentileDenominatorOffset = 49
+
+type SendAttemptTracker interface {
+	SubmitProbeAttempt(numAttemptsUntilSuccessful int)
+	GetOptimalNumAttempts() (attempts int, ready bool)
+}
+
+type sendAttempts struct {
+	lock         sync.Mutex
+	numAttempts  []int
+	currentIndex int
+	isFull       bool
+
+	optimalAttempts *int32
+}
+
+func NewSendAttempts() SendAttemptTracker {
+	optimalAttempts := int32(-1)
+
+	sa := &sendAttempts{
+		numAttempts:     make([]int, maxHistogramSize),
+		currentIndex:    0,
+		isFull:          false,
+		optimalAttempts: &optimalAttempts,
+	}
+	return sa
+}
+
+func (sa *sendAttempts) SubmitProbeAttempt(a int) {
+	sa.lock.Lock()
+	defer sa.lock.Unlock()
+
+	sa.numAttempts[sa.currentIndex] = a
+	sa.currentIndex += 1
+	if sa.currentIndex == len(sa.numAttempts) {
+		sa.currentIndex = 0
+		sa.isFull = true
+	}
+
+	sa.computeOptimalUnsafe()
+}
+
+func (sa *sendAttempts) GetOptimalNumAttempts() (attempts int, ready bool) {
+	optimalAttempts := atomic.LoadInt32(sa.optimalAttempts)
+
+	if optimalAttempts == -1 {
+		return 0, false
+	}
+
+	return int(optimalAttempts), true
+}
+
+func (sa *sendAttempts) computeOptimalUnsafe() {
+	toCopy := maxHistogramSize
+	if !sa.isFull {
+		if sa.currentIndex < minElements {
+			return
+		}
+		toCopy = sa.currentIndex
+
+	}
+
+	histoCopy := make([]int, toCopy)
+	copy(histoCopy, sa.numAttempts[:toCopy])
+
+	sort.Slice(histoCopy, func(i, j int) bool {
+		return histoCopy[i] < histoCopy[j]
+	})
+
+	optimal := histoCopy[((toCopy*percentileNumerator)+percentileDenominatorOffset)/percentileDenominator]
+	atomic.StoreInt32(sa.optimalAttempts, int32(optimal))
+}
diff --git a/cmix/client.go b/cmix/client.go
index 406fe0181..bc7eaeaec 100644
--- a/cmix/client.go
+++ b/cmix/client.go
@@ -11,6 +11,7 @@ package cmix
 // and intra-client state are accessible through the context object.
 
 import (
+	"gitlab.com/elixxir/client/cmix/attempts"
 	"gitlab.com/elixxir/client/cmix/clockSkew"
 	"gitlab.com/xx_network/primitives/netTime"
 	"math"
@@ -74,7 +75,8 @@ type client struct {
 	address.Space
 	identity.Tracker
 	health.Monitor
-	crit *critical
+	crit           *critical
+	attemptTracker attempts.SendAttemptTracker
 
 	// Earliest tracked round
 	earliestRound *uint64
@@ -105,15 +107,16 @@ func NewClient(params Params, comms *commClient.Comms, session storage.Session,
 
 	// Create client object
 	c := &client{
-		param:         params,
-		tracker:       &tracker,
-		events:        events,
-		earliestRound: &earliest,
-		session:       session,
-		rng:           rng,
-		comms:         comms,
-		maxMsgLen:     tmpMsg.ContentsSize(),
-		skewTracker:   clockSkew.New(params.ClockSkewClamp),
+		param:          params,
+		tracker:        &tracker,
+		events:         events,
+		earliestRound:  &earliest,
+		session:        session,
+		rng:            rng,
+		comms:          comms,
+		maxMsgLen:      tmpMsg.ContentsSize(),
+		skewTracker:    clockSkew.New(params.ClockSkewClamp),
+		attemptTracker: attempts.NewSendAttempts(),
 	}
 
 	if params.VerboseRoundTracking {
@@ -201,7 +204,7 @@ func (c *client) initialize(ndf *ndf.NetworkDefinition) error {
 		}
 		r, eid, _, sendErr := sendCmixHelper(c.Sender, compiler, recipient, params, c.instance,
 			c.session.GetCmixGroup(), c.Registrar, c.rng, c.events,
-			c.session.GetTransmissionID(), c.comms)
+			c.session.GetTransmissionID(), c.comms, c.attemptTracker)
 		return r, eid, sendErr
 
 	}
diff --git a/cmix/params.go b/cmix/params.go
index d933dbc9d..ff6418328 100644
--- a/cmix/params.go
+++ b/cmix/params.go
@@ -219,6 +219,10 @@ type CMIXParams struct {
 	// should only be used in cases where repeats cannot be different. Only used
 	// in sendCmix, not sendManyCmix.
 	Critical bool
+
+	// Probe tells the client that this send can be used to test network performance,
+	// that outgoing latency is not important
+	Probe bool
 }
 
 // cMixParamsDisk will be the marshal-able and umarshal-able object.
@@ -241,7 +245,8 @@ func GetDefaultCMIXParams() CMIXParams {
 		DebugTag:    DefaultDebugTag,
 		// Unused stoppable so components that require one have a channel to
 		// wait on
-		Stop: stoppable.NewSingle("cmixParamsDefault"),
+		Stop:  stoppable.NewSingle("cmixParamsDefault"),
+		Probe: false,
 	}
 }
 
diff --git a/cmix/sendCmix.go b/cmix/sendCmix.go
index beff00fc6..3815528ec 100644
--- a/cmix/sendCmix.go
+++ b/cmix/sendCmix.go
@@ -9,6 +9,7 @@ package cmix
 
 import (
 	"fmt"
+	"gitlab.com/elixxir/client/cmix/attempts"
 	"gitlab.com/elixxir/client/cmix/rounds"
 	"gitlab.com/elixxir/primitives/states"
 	"strings"
@@ -132,7 +133,7 @@ func (c *client) sendWithAssembler(recipient *id.ID, assembler MessageAssembler,
 
 	r, ephID, msg, rtnErr := sendCmixHelper(c.Sender, assemblerFunc, recipient, cmixParams,
 		c.instance, c.session.GetCmixGroup(), c.Registrar, c.rng, c.events,
-		c.session.GetTransmissionID(), c.comms)
+		c.session.GetTransmissionID(), c.comms, c.attemptTracker)
 
 	if cmixParams.Critical {
 		c.crit.handle(msg, recipient, r.ID, rtnErr)
@@ -154,7 +155,7 @@ func (c *client) sendWithAssembler(recipient *id.ID, assembler MessageAssembler,
 func sendCmixHelper(sender gateway.Sender, assembler messageAssembler, recipient *id.ID,
 	cmixParams CMIXParams, instance *network.Instance, grp *cyclic.Group,
 	nodes nodes.Registrar, rng *fastRNG.StreamGenerator, events event.Reporter,
-	senderId *id.ID, comms SendCmixCommsInterface) (rounds.Round, ephemeral.Id, format.Message, error) {
+	senderId *id.ID, comms SendCmixCommsInterface, attemptTracker attempts.SendAttemptTracker) (rounds.Round, ephemeral.Id, format.Message, error) {
 
 	if cmixParams.RoundTries == 0 {
 		return rounds.Round{}, ephemeral.Id{}, format.Message{},
@@ -179,6 +180,16 @@ func sendCmixHelper(sender gateway.Sender, assembler messageAssembler, recipient
 	defer stream.Close()
 
 	numAttempts := 0
+	if !cmixParams.Probe {
+		optimalAttempts, ready := attemptTracker.GetOptimalNumAttempts()
+		if ready {
+			numAttempts = optimalAttempts
+		} else {
+			numAttempts = 4
+		}
+	} else {
+		defer attemptTracker.SubmitProbeAttempt(numAttempts)
+	}
 
 	for numRoundTries := uint(0); numRoundTries < cmixParams.RoundTries; numRoundTries,
 		numAttempts = numRoundTries+1, numAttempts+1 {
diff --git a/cmix/sendCmixUtils.go b/cmix/sendCmixUtils.go
index 09ac505cd..e2d8ea178 100644
--- a/cmix/sendCmixUtils.go
+++ b/cmix/sendCmixUtils.go
@@ -41,7 +41,7 @@ type SendCmixCommsInterface interface {
 }
 
 // How much in the future a round needs to be to send to it
-const sendTimeBuffer = 500 * time.Millisecond
+const sendTimeBuffer = 150 * time.Millisecond
 const unrecoverableError = "failed with an unrecoverable error"
 
 // handlePutMessageError handles errors received from a PutMessage or a
diff --git a/dummy/send.go b/dummy/send.go
index ac6b39796..77b0f4e7d 100644
--- a/dummy/send.go
+++ b/dummy/send.go
@@ -125,6 +125,7 @@ func (m *Manager) sendMessage(index, totalMessages int, rng csprng.Source) error
 
 	// Send message
 	p := cmix.GetDefaultCMIXParams()
+	p.Probe = true
 	_, _, err = m.net.Send(recipient, fp, service, payload, mac, p)
 	if err != nil {
 		return errors.Errorf("Failed to send message: %+v", err)
-- 
GitLab