Skip to content
Snippets Groups Projects
Commit ccaf880f authored by benjamin's avatar benjamin
Browse files

added the fast sending tracker and implemented its logic in send

parent d0e4fab3
Branches
Tags
3 merge requests!510Release,!424Hotfix/fast sending,!340Project/channels
package attempts
import (
"sort"
"sync"
"sync/atomic"
)
const maxHistogramSize = 100
const minElements = 3
const percentileNumerator = 66
const percentileDenominator = 99
const percentileDenominatorOffset = 49
type SendAttemptTracker interface {
SubmitProbeAttempt(numAttemptsUntilSuccessful int)
GetOptimalNumAttempts() (attempts int, ready bool)
}
type sendAttempts struct {
lock sync.Mutex
numAttempts []int
currentIndex int
isFull bool
optimalAttempts *int32
}
func NewSendAttempts() SendAttemptTracker {
optimalAttempts := int32(-1)
sa := &sendAttempts{
numAttempts: make([]int, maxHistogramSize),
currentIndex: 0,
isFull: false,
optimalAttempts: &optimalAttempts,
}
return sa
}
func (sa *sendAttempts) SubmitProbeAttempt(a int) {
sa.lock.Lock()
defer sa.lock.Unlock()
sa.numAttempts[sa.currentIndex] = a
sa.currentIndex += 1
if sa.currentIndex == len(sa.numAttempts) {
sa.currentIndex = 0
sa.isFull = true
}
sa.computeOptimalUnsafe()
}
func (sa *sendAttempts) GetOptimalNumAttempts() (attempts int, ready bool) {
optimalAttempts := atomic.LoadInt32(sa.optimalAttempts)
if optimalAttempts == -1 {
return 0, false
}
return int(optimalAttempts), true
}
func (sa *sendAttempts) computeOptimalUnsafe() {
toCopy := maxHistogramSize
if !sa.isFull {
if sa.currentIndex < minElements {
return
}
toCopy = sa.currentIndex
}
histoCopy := make([]int, toCopy)
copy(histoCopy, sa.numAttempts[:toCopy])
sort.Slice(histoCopy, func(i, j int) bool {
return histoCopy[i] < histoCopy[j]
})
optimal := histoCopy[((toCopy*percentileNumerator)+percentileDenominatorOffset)/percentileDenominator]
atomic.StoreInt32(sa.optimalAttempts, int32(optimal))
}
......@@ -11,6 +11,7 @@ package cmix
// and intra-client state are accessible through the context object.
import (
"gitlab.com/elixxir/client/cmix/attempts"
"gitlab.com/elixxir/client/cmix/clockSkew"
"gitlab.com/xx_network/primitives/netTime"
"math"
......@@ -75,6 +76,7 @@ type client struct {
identity.Tracker
health.Monitor
crit *critical
attemptTracker attempts.SendAttemptTracker
// Earliest tracked round
earliestRound *uint64
......@@ -114,6 +116,7 @@ func NewClient(params Params, comms *commClient.Comms, session storage.Session,
comms: comms,
maxMsgLen: tmpMsg.ContentsSize(),
skewTracker: clockSkew.New(params.ClockSkewClamp),
attemptTracker: attempts.NewSendAttempts(),
}
if params.VerboseRoundTracking {
......@@ -201,7 +204,7 @@ func (c *client) initialize(ndf *ndf.NetworkDefinition) error {
}
r, eid, _, sendErr := sendCmixHelper(c.Sender, compiler, recipient, params, c.instance,
c.session.GetCmixGroup(), c.Registrar, c.rng, c.events,
c.session.GetTransmissionID(), c.comms)
c.session.GetTransmissionID(), c.comms, c.attemptTracker)
return r, eid, sendErr
}
......
......@@ -219,6 +219,10 @@ type CMIXParams struct {
// should only be used in cases where repeats cannot be different. Only used
// in sendCmix, not sendManyCmix.
Critical bool
// Probe tells the client that this send can be used to test network performance,
// that outgoing latency is not important
Probe bool
}
// cMixParamsDisk will be the marshal-able and umarshal-able object.
......@@ -242,6 +246,7 @@ func GetDefaultCMIXParams() CMIXParams {
// Unused stoppable so components that require one have a channel to
// wait on
Stop: stoppable.NewSingle("cmixParamsDefault"),
Probe: false,
}
}
......
......@@ -9,6 +9,7 @@ package cmix
import (
"fmt"
"gitlab.com/elixxir/client/cmix/attempts"
"gitlab.com/elixxir/client/cmix/rounds"
"gitlab.com/elixxir/primitives/states"
"strings"
......@@ -132,7 +133,7 @@ func (c *client) sendWithAssembler(recipient *id.ID, assembler MessageAssembler,
r, ephID, msg, rtnErr := sendCmixHelper(c.Sender, assemblerFunc, recipient, cmixParams,
c.instance, c.session.GetCmixGroup(), c.Registrar, c.rng, c.events,
c.session.GetTransmissionID(), c.comms)
c.session.GetTransmissionID(), c.comms, c.attemptTracker)
if cmixParams.Critical {
c.crit.handle(msg, recipient, r.ID, rtnErr)
......@@ -154,7 +155,7 @@ func (c *client) sendWithAssembler(recipient *id.ID, assembler MessageAssembler,
func sendCmixHelper(sender gateway.Sender, assembler messageAssembler, recipient *id.ID,
cmixParams CMIXParams, instance *network.Instance, grp *cyclic.Group,
nodes nodes.Registrar, rng *fastRNG.StreamGenerator, events event.Reporter,
senderId *id.ID, comms SendCmixCommsInterface) (rounds.Round, ephemeral.Id, format.Message, error) {
senderId *id.ID, comms SendCmixCommsInterface, attemptTracker attempts.SendAttemptTracker) (rounds.Round, ephemeral.Id, format.Message, error) {
if cmixParams.RoundTries == 0 {
return rounds.Round{}, ephemeral.Id{}, format.Message{},
......@@ -179,6 +180,16 @@ func sendCmixHelper(sender gateway.Sender, assembler messageAssembler, recipient
defer stream.Close()
numAttempts := 0
if !cmixParams.Probe {
optimalAttempts, ready := attemptTracker.GetOptimalNumAttempts()
if ready {
numAttempts = optimalAttempts
} else {
numAttempts = 4
}
} else {
defer attemptTracker.SubmitProbeAttempt(numAttempts)
}
for numRoundTries := uint(0); numRoundTries < cmixParams.RoundTries; numRoundTries,
numAttempts = numRoundTries+1, numAttempts+1 {
......
......@@ -41,7 +41,7 @@ type SendCmixCommsInterface interface {
}
// How much in the future a round needs to be to send to it
const sendTimeBuffer = 500 * time.Millisecond
const sendTimeBuffer = 150 * time.Millisecond
const unrecoverableError = "failed with an unrecoverable error"
// handlePutMessageError handles errors received from a PutMessage or a
......
......@@ -125,6 +125,7 @@ func (m *Manager) sendMessage(index, totalMessages int, rng csprng.Source) error
// Send message
p := cmix.GetDefaultCMIXParams()
p.Probe = true
_, _, err = m.net.Send(recipient, fp, service, payload, mac, p)
if err != nil {
return errors.Errorf("Failed to send message: %+v", err)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment