diff --git a/cmix/attempts/histrogram.go b/cmix/attempts/histrogram.go new file mode 100644 index 0000000000000000000000000000000000000000..52bf207a48345818f0f1900d5c69b45e0259d7d8 --- /dev/null +++ b/cmix/attempts/histrogram.go @@ -0,0 +1,84 @@ +package attempts + +import ( + "sort" + "sync" + "sync/atomic" +) + +const maxHistogramSize = 100 +const minElements = 3 +const percentileNumerator = 66 +const percentileDenominator = 99 +const percentileDenominatorOffset = 49 + +type SendAttemptTracker interface { + SubmitProbeAttempt(numAttemptsUntilSuccessful int) + GetOptimalNumAttempts() (attempts int, ready bool) +} + +type sendAttempts struct { + lock sync.Mutex + numAttempts []int + currentIndex int + isFull bool + + optimalAttempts *int32 +} + +func NewSendAttempts() SendAttemptTracker { + optimalAttempts := int32(-1) + + sa := &sendAttempts{ + numAttempts: make([]int, maxHistogramSize), + currentIndex: 0, + isFull: false, + optimalAttempts: &optimalAttempts, + } + return sa +} + +func (sa *sendAttempts) SubmitProbeAttempt(a int) { + sa.lock.Lock() + defer sa.lock.Unlock() + + sa.numAttempts[sa.currentIndex] = a + sa.currentIndex += 1 + if sa.currentIndex == len(sa.numAttempts) { + sa.currentIndex = 0 + sa.isFull = true + } + + sa.computeOptimalUnsafe() +} + +func (sa *sendAttempts) GetOptimalNumAttempts() (attempts int, ready bool) { + optimalAttempts := atomic.LoadInt32(sa.optimalAttempts) + + if optimalAttempts == -1 { + return 0, false + } + + return int(optimalAttempts), true +} + +func (sa *sendAttempts) computeOptimalUnsafe() { + toCopy := maxHistogramSize + if !sa.isFull { + if sa.currentIndex < minElements { + return + } + toCopy = sa.currentIndex + + } + + histoCopy := make([]int, toCopy) + copy(histoCopy, sa.numAttempts[:toCopy]) + + sort.Slice(histoCopy, func(i, j int) bool { + return histoCopy[i] < histoCopy[j] + }) + + optimal := histoCopy[((toCopy*percentileNumerator)+percentileDenominatorOffset)/percentileDenominator] + atomic.StoreInt32(sa.optimalAttempts, int32(optimal)) +} diff --git a/cmix/client.go b/cmix/client.go index 406fe0181350274152195b540978d73744f9a81b..bc7eaeaecc7a6a8fdb28f189e2d9ad9488870f2d 100644 --- a/cmix/client.go +++ b/cmix/client.go @@ -11,6 +11,7 @@ package cmix // and intra-client state are accessible through the context object. import ( + "gitlab.com/elixxir/client/cmix/attempts" "gitlab.com/elixxir/client/cmix/clockSkew" "gitlab.com/xx_network/primitives/netTime" "math" @@ -74,7 +75,8 @@ type client struct { address.Space identity.Tracker health.Monitor - crit *critical + crit *critical + attemptTracker attempts.SendAttemptTracker // Earliest tracked round earliestRound *uint64 @@ -105,15 +107,16 @@ func NewClient(params Params, comms *commClient.Comms, session storage.Session, // Create client object c := &client{ - param: params, - tracker: &tracker, - events: events, - earliestRound: &earliest, - session: session, - rng: rng, - comms: comms, - maxMsgLen: tmpMsg.ContentsSize(), - skewTracker: clockSkew.New(params.ClockSkewClamp), + param: params, + tracker: &tracker, + events: events, + earliestRound: &earliest, + session: session, + rng: rng, + comms: comms, + maxMsgLen: tmpMsg.ContentsSize(), + skewTracker: clockSkew.New(params.ClockSkewClamp), + attemptTracker: attempts.NewSendAttempts(), } if params.VerboseRoundTracking { @@ -201,7 +204,7 @@ func (c *client) initialize(ndf *ndf.NetworkDefinition) error { } r, eid, _, sendErr := sendCmixHelper(c.Sender, compiler, recipient, params, c.instance, c.session.GetCmixGroup(), c.Registrar, c.rng, c.events, - c.session.GetTransmissionID(), c.comms) + c.session.GetTransmissionID(), c.comms, c.attemptTracker) return r, eid, sendErr } diff --git a/cmix/gateway/defaults.go b/cmix/gateway/defaults.go new file mode 100644 index 0000000000000000000000000000000000000000..9e19c49cf3577888da2c31daf5bed9d8eabf52fd --- /dev/null +++ b/cmix/gateway/defaults.go @@ -0,0 +1,9 @@ +//go:build !js || !wasm +// +build !js !wasm + +// This file is compiled for all architectures except WebAssembly. +package gateway + +const ( + MaxPoolSize = 20 +) diff --git a/cmix/gateway/defaults_js.go b/cmix/gateway/defaults_js.go new file mode 100644 index 0000000000000000000000000000000000000000..8eb93858dc95aefd6506ab7bed520f83ab274632 --- /dev/null +++ b/cmix/gateway/defaults_js.go @@ -0,0 +1,5 @@ +package gateway + +const ( + MaxPoolSize = 7 +) diff --git a/cmix/gateway/hostPool.go b/cmix/gateway/hostPool.go index d39e71596ef644a444f5cd6056e97e9a10ea3f39..31f9ba3ca400705a2ddf82268bc6e0b5f4d1c266 100644 --- a/cmix/gateway/hostPool.go +++ b/cmix/gateway/hostPool.go @@ -128,7 +128,7 @@ type poolParamsDisk struct { // DefaultPoolParams returns a default set of PoolParams. func DefaultPoolParams() PoolParams { p := PoolParams{ - MaxPoolSize: 5, + MaxPoolSize: MaxPoolSize, ProxyAttempts: 5, PoolSize: 0, MaxPings: 0, diff --git a/cmix/params.go b/cmix/params.go index d933dbc9df696105de2cccc9aba8bae98f78625d..ff6418328203b2b728b62c0a96bf4101e9b6d686 100644 --- a/cmix/params.go +++ b/cmix/params.go @@ -219,6 +219,10 @@ type CMIXParams struct { // should only be used in cases where repeats cannot be different. Only used // in sendCmix, not sendManyCmix. Critical bool + + // Probe tells the client that this send can be used to test network performance, + // that outgoing latency is not important + Probe bool } // cMixParamsDisk will be the marshal-able and umarshal-able object. @@ -241,7 +245,8 @@ func GetDefaultCMIXParams() CMIXParams { DebugTag: DefaultDebugTag, // Unused stoppable so components that require one have a channel to // wait on - Stop: stoppable.NewSingle("cmixParamsDefault"), + Stop: stoppable.NewSingle("cmixParamsDefault"), + Probe: false, } } diff --git a/cmix/sendCmix.go b/cmix/sendCmix.go index bc9891a4abfea507ebe4f18672fbd72ab8270a3c..7cb72ec57262a9e6a84e7e39b508b67e06543e4d 100644 --- a/cmix/sendCmix.go +++ b/cmix/sendCmix.go @@ -9,7 +9,9 @@ package cmix import ( "fmt" + "gitlab.com/elixxir/client/cmix/attempts" "gitlab.com/elixxir/client/cmix/rounds" + "gitlab.com/elixxir/primitives/states" "strings" "time" @@ -131,7 +133,7 @@ func (c *client) sendWithAssembler(recipient *id.ID, assembler MessageAssembler, r, ephID, msg, rtnErr := sendCmixHelper(c.Sender, assemblerFunc, recipient, cmixParams, c.instance, c.session.GetCmixGroup(), c.Registrar, c.rng, c.events, - c.session.GetTransmissionID(), c.comms) + c.session.GetTransmissionID(), c.comms, c.attemptTracker) if cmixParams.Critical { c.crit.handle(msg, recipient, r.ID, rtnErr) @@ -153,7 +155,7 @@ func (c *client) sendWithAssembler(recipient *id.ID, assembler MessageAssembler, func sendCmixHelper(sender gateway.Sender, assembler messageAssembler, recipient *id.ID, cmixParams CMIXParams, instance *network.Instance, grp *cyclic.Group, nodes nodes.Registrar, rng *fastRNG.StreamGenerator, events event.Reporter, - senderId *id.ID, comms SendCmixCommsInterface) (rounds.Round, ephemeral.Id, format.Message, error) { + senderId *id.ID, comms SendCmixCommsInterface, attemptTracker attempts.SendAttemptTracker) (rounds.Round, ephemeral.Id, format.Message, error) { if cmixParams.RoundTries == 0 { return rounds.Round{}, ephemeral.Id{}, format.Message{}, @@ -171,14 +173,33 @@ func sendCmixHelper(sender gateway.Sender, assembler messageAssembler, recipient attempted = excludedRounds.NewSet() } - jww.INFO.Printf("[Send-%s] Looking for round to send cMix message to "+ - "%s", cmixParams.DebugTag, recipient) - stream := rng.GetStream() defer stream.Close() - for numRoundTries := uint( - 0); numRoundTries < cmixParams.RoundTries; numRoundTries++ { + numAttempts := 0 + if !cmixParams.Probe { + optimalAttempts, ready := attemptTracker.GetOptimalNumAttempts() + if ready { + numAttempts = optimalAttempts + jww.INFO.Printf("[Send-%s] Looking for round to send cMix message to "+ + "%s, sending non probe with %d optimalAttempts", cmixParams.DebugTag, recipient, numAttempts) + } else { + numAttempts = 4 + jww.INFO.Printf("[Send-%s] Looking for round to send cMix message to "+ + "%s, sending non probe with %d non optimalAttempts, insufficient data", + cmixParams.DebugTag, recipient, numAttempts) + } + } else { + jww.INFO.Printf("[Send-%s] Looking for round to send cMix message to "+ + "%s, sending probe with %d Attempts, insufficient data", + cmixParams.DebugTag, recipient, numAttempts) + defer attemptTracker.SubmitProbeAttempt(numAttempts) + } + + jww.INFO.Printf("") + + for numRoundTries := uint(0); numRoundTries < cmixParams.RoundTries; numRoundTries, + numAttempts = numRoundTries+1, numAttempts+1 { elapsed := netTime.Since(timeStart) jww.TRACE.Printf("[Send-%s] try %d, elapsed: %s", cmixParams.DebugTag, numRoundTries, elapsed) @@ -196,11 +217,12 @@ func sendCmixHelper(sender gateway.Sender, assembler messageAssembler, recipient numRoundTries+1, recipient) } + startSearch := netTime.Now() // Find the best round to send to, excluding attempted rounds remainingTime := cmixParams.Timeout - elapsed waitingRounds := instance.GetWaitingRounds() - bestRound, err := waitingRounds.GetUpcomingRealtime( - remainingTime, attempted, sendTimeBuffer) + bestRound, _, err := waitingRounds.GetUpcomingRealtime( + remainingTime, attempted, numAttempts, sendTimeBuffer) if err != nil { jww.WARN.Printf("[Send-%s] failed to GetUpcomingRealtime: "+ "%+v", cmixParams.DebugTag, err) @@ -213,8 +235,8 @@ func sendCmixHelper(sender gateway.Sender, assembler messageAssembler, recipient continue } - jww.TRACE.Printf("[Send-%s] Best round found: %+v", - cmixParams.DebugTag, bestRound) + jww.DEBUG.Printf("[Send-%s] Best round found, took %s: %d", + cmixParams.DebugTag, netTime.Since(startSearch), bestRound.ID) // Determine whether the selected round contains any // nodes that are blacklisted by the CMIXParams object @@ -269,10 +291,13 @@ func sendCmixHelper(sender gateway.Sender, assembler messageAssembler, recipient return rounds.Round{}, ephemeral.Id{}, format.Message{}, err } + timeRoundStart := time.Unix(0, int64(bestRound.Timestamps[states.QUEUED])) + jww.INFO.Printf("[Send-%s] Sending to EphID %d (%s), on round %d "+ - "(msgDigest: %s, ecrMsgDigest: %s) via gateway %s", - cmixParams.DebugTag, ephID.Int64(), recipient, bestRound.ID, - msg.Digest(), encMsg.Digest(), firstGateway.String()) + "(msgDigest: %s, ecrMsgDigest: %s) via gateway %s starting "+ + "at %s (%s in the future)", cmixParams.DebugTag, ephID.Int64(), + recipient, bestRound.ID, msg.Digest(), encMsg.Digest(), + firstGateway.String(), timeRoundStart, netTime.Until(timeRoundStart)) // Send the payload sendFunc := func(host *connect.Host, target *id.ID, diff --git a/cmix/sendCmixUtils.go b/cmix/sendCmixUtils.go index 146e63117d5cde18c1ee8d9dc794156eded72ad1..e2d8ea178920d52c3fd3dcfbd78b4386896d5b75 100644 --- a/cmix/sendCmixUtils.go +++ b/cmix/sendCmixUtils.go @@ -41,7 +41,7 @@ type SendCmixCommsInterface interface { } // How much in the future a round needs to be to send to it -const sendTimeBuffer = 1000 * time.Millisecond +const sendTimeBuffer = 150 * time.Millisecond const unrecoverableError = "failed with an unrecoverable error" // handlePutMessageError handles errors received from a PutMessage or a diff --git a/cmix/sendManyCmix.go b/cmix/sendManyCmix.go index b9700c0d813a15754713a322b96422e484c24f3e..aaca9e031bb04ceb9f8e62509ac76470f1b00128 100644 --- a/cmix/sendManyCmix.go +++ b/cmix/sendManyCmix.go @@ -161,8 +161,8 @@ func sendManyCmixHelper(sender gateway.Sender, remainingTime := param.Timeout - elapsed // Find the best round to send to, excluding attempted rounds - bestRound, _ := instance.GetWaitingRounds().GetUpcomingRealtime( - remainingTime, attempted, sendTimeBuffer) + bestRound, _, _ := instance.GetWaitingRounds().GetUpcomingRealtime( + remainingTime, attempted, int(numRoundTries), sendTimeBuffer) if bestRound == nil { continue } diff --git a/dummy/mockCmix_test.go b/dummy/mockCmix_test.go index 96bab83384e0d57d84d50c5cac95ce71bc9fb7df..2aca679d0702225c69d01e7449c028ddd57cd7ee 100644 --- a/dummy/mockCmix_test.go +++ b/dummy/mockCmix_test.go @@ -78,8 +78,7 @@ func (m mockCmix) Follow(report cmix.ClientErrorReport) (stoppable.Stoppable, er } func (m mockCmix) GetMaxMessageLength() int { - //TODO implement me - panic("implement me") + return 100 } func (m *mockCmix) SendMany(messages []cmix.TargetedCmixMessage, p cmix.CMIXParams) (rounds.Round, []ephemeral.Id, error) { diff --git a/dummy/random.go b/dummy/random.go index 2a41d8243decc43ae56ae93068c5106a8f9371e7..2f1abc73ec290ead86c142f527e85e8db176a136 100644 --- a/dummy/random.go +++ b/dummy/random.go @@ -49,7 +49,7 @@ func (m *Manager) newRandomCmixMessage(rng csprng.Source) ( } // Generate random message payload - payloadSize := m.store.GetCmixGroup().GetP().ByteLen() + payloadSize := m.net.GetMaxMessageLength() payload, err = newRandomPayload(payloadSize, rng) if err != nil { return nil, format.Fingerprint{}, message.Service{}, nil, nil, @@ -79,13 +79,8 @@ func (m *Manager) newRandomCmixMessage(rng csprng.Source) ( // newRandomPayload generates a random payload of a random length // within the maxPayloadSize. func newRandomPayload(maxPayloadSize int, rng csprng.Source) ([]byte, error) { - // Generate random payload size - randomPayloadSize, err := randomInt(maxPayloadSize, rng) - if err != nil { - return nil, errors.Errorf(payloadSizeRngErr, err) - } - randomMsg, err := csprng.Generate(randomPayloadSize, rng) + randomMsg, err := csprng.Generate(maxPayloadSize, rng) if err != nil { return nil, err } diff --git a/dummy/random_test.go b/dummy/random_test.go index fab84c22fc0e552931026ec0f27cc0c07e511b32..a4336983df79a8a33e3f95e56763d85728962d6c 100644 --- a/dummy/random_test.go +++ b/dummy/random_test.go @@ -75,16 +75,16 @@ func Test_durationRng_Consistency(t *testing.T) { // when using a PRNG and that the result is not larger than the max payload. func Test_newRandomPayload_Consistency(t *testing.T) { expectedPayloads := []string{ - "l7ufS7Ry6J9bFITyUgnJ", - "Ut/Xm012Qpthegyfnw07pVsMwNYUTIiFNQ==", - "CD9h", - "GSnh", - "joE=", - "uoQ+6NY+jE/+HOvqVG2PrBPdGqwEzi6ih3xVec+ix44bC6+uiBuCpw==", - "qkNGWnhiBhaXiu0M48bE8657w+BJW1cS/v2+DBAoh+EA2s0tiF9pLLYH2gChHBxwcec=", - "suEpcF4nPwXJIyaCjisFbg==", - "R/3zREEO1MEWAj+o41drb+0n/4l0usDK/ZrQVpKxNhnnOJZN/ceejVNDc2Yc/WbXTw==", - "bkt1IQ==", + "U4x/lrFkvxuXu59LtHLon1sUhPJSCcnZND6SugndnVLf15tNdkKbYXoMn58NO6VbDMDWFEyIhTWEGsvgcJsHWA==", + "CD9h03W8ArQd9PkZKeGP2p5vguVOdI6B555LvW/jTNy6hD7o1j6MT/4c6+pUbY+sE90arATOLqKHfFV5z6LHjg==", + "GwuvrogbgqdREIpC7TyQPKpDRlp4YgYWl4rtDOPGxPOue8PgSVtXEv79vgwQKIfhANrNLYhfaSy2B9oAoRwccA==", + "ceeWotwtwlpbdLLhKXBeJz8FySMmgo4rBW44F2WOEGFJiUf980RBDtTBFgI/qONXa2/tJ/+JdLrAyv2a0FaSsQ==", + "NhnnOJZN/ceejVNDc2Yc/WbXT+weG4lJGrcjbkt1IWKQzyvrQsPKJzKFYPGqwGfOpui/RtSrK0aAQCxfsoIOiA==", + "XTJg8d6XgoPUoJo2+WwglBdG4+1NpkaprotPp7T8OiC6+hp17TJ6hriww5rxz9KztRIZ6nlTOr9EjSxHnTJgdQ==", + "M5BZFMjMHPCdo54Okp0CSry8sWk5e7c05+8KbgHxhU3rX+Qk/vesIQiR9ZdeKSqiuKoEfGHNszNz6+csJ6CYwA==", + "IZfa5rcyw1HfZo+HTiyfHOCcqGAX5+IXSDA/9BwbI+EcSO0XU51oX3byp5i8ZN4OXbKGSyrTwmzmOCNCdloT1g==", + "luUt92D2w0ZeKaDcpGrDoNVwEzvCFXH19UpkMQVRP9hCmxlK4bqfKoOGrnKzZh/oLCrGTb9GFRgk4jBTEmN8mQ==", + "wrh9bfDdXvKDZxkHLWcvYfqgvob0V5Iew3wORgzw1wPQfcX1ZhpFATNAmnEramar17plIkyiaXjZpc5i/rEagw==", } prng := NewPrng(42) diff --git a/dummy/send.go b/dummy/send.go index ac6b39796add64bbfed8be9ed8c3e3f8687904f8..77b0f4e7de569e36b985c60521c6083b40006ee3 100644 --- a/dummy/send.go +++ b/dummy/send.go @@ -125,6 +125,7 @@ func (m *Manager) sendMessage(index, totalMessages int, rng csprng.Source) error // Send message p := cmix.GetDefaultCMIXParams() + p.Probe = true _, _, err = m.net.Send(recipient, fp, service, payload, mac, p) if err != nil { return errors.Errorf("Failed to send message: %+v", err) diff --git a/go.mod b/go.mod index 560bd5f6066e46252b9ebd75e579aa159e79c5ef..84763a587f505360a7d5060638c6d864861a030d 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/spf13/viper v1.12.0 github.com/stretchr/testify v1.8.0 gitlab.com/elixxir/bloomfilter v0.0.0-20211222005329-7d931ceead6f - gitlab.com/elixxir/comms v0.0.4-0.20221017173926-4eaa6061dfaa + gitlab.com/elixxir/comms v0.0.4-0.20221024050701-bced94c1b026 gitlab.com/elixxir/crypto v0.0.7-0.20221024012326-cf941c375c1f gitlab.com/elixxir/ekv v0.2.1 gitlab.com/elixxir/primitives v0.0.3-0.20221017172918-6176818d1aba diff --git a/go.sum b/go.sum index 427f82ca79d5ae0a92d4aa61acd455b35d4f1a80..87c84117eb84016d562b5d837059578cec993216 100644 --- a/go.sum +++ b/go.sum @@ -633,6 +633,16 @@ gitlab.com/elixxir/bloomfilter v0.0.0-20211222005329-7d931ceead6f h1:yXGvNBqzZwA gitlab.com/elixxir/bloomfilter v0.0.0-20211222005329-7d931ceead6f/go.mod h1:H6jztdm0k+wEV2QGK/KYA+MY9nj9Zzatux/qIvDDv3k= gitlab.com/elixxir/comms v0.0.4-0.20221017173926-4eaa6061dfaa h1:/FEpu0N0rAyq74FkvO3uY8BcQoWLSbVPhj/s5QfscZw= gitlab.com/elixxir/comms v0.0.4-0.20221017173926-4eaa6061dfaa/go.mod h1:rW7xdbHntP2MoF3q+2+f+IR8OHol94MRyviotfR5rXg= +gitlab.com/elixxir/comms v0.0.4-0.20221021234520-a4f94f752e3e h1:Go3Ec+LOm8t6j8wVgI4GqTfuy+PQkyblZCYQU7yGB2E= +gitlab.com/elixxir/comms v0.0.4-0.20221021234520-a4f94f752e3e/go.mod h1:rW7xdbHntP2MoF3q+2+f+IR8OHol94MRyviotfR5rXg= +gitlab.com/elixxir/comms v0.0.4-0.20221023173239-c75420d94293 h1:QIiZYjdtwjBCYaO7dZI6K8cEaA73wtd/YKIWD1oeDWw= +gitlab.com/elixxir/comms v0.0.4-0.20221023173239-c75420d94293/go.mod h1:rW7xdbHntP2MoF3q+2+f+IR8OHol94MRyviotfR5rXg= +gitlab.com/elixxir/comms v0.0.4-0.20221023190124-3441c3fdc3de h1:1YnKkJn3a7xiftFBRqLK5os7C6uF4okMtDXZyLrzIuY= +gitlab.com/elixxir/comms v0.0.4-0.20221023190124-3441c3fdc3de/go.mod h1:rW7xdbHntP2MoF3q+2+f+IR8OHol94MRyviotfR5rXg= +gitlab.com/elixxir/comms v0.0.4-0.20221024012811-e6754f7740db h1:LQUde8pjIfQpVdg7trANu6o5uzZv9ADK13S8bkMgkBw= +gitlab.com/elixxir/comms v0.0.4-0.20221024012811-e6754f7740db/go.mod h1:NevrBdsi5wJvitUeMsid3xI1FrzzuzfxKy4Bapnhzao= +gitlab.com/elixxir/comms v0.0.4-0.20221024050701-bced94c1b026 h1:CdqvzyM91wN6u4MmGj0n+gKO/0tJabWPN3EQ4SFsZsg= +gitlab.com/elixxir/comms v0.0.4-0.20221024050701-bced94c1b026/go.mod h1:NevrBdsi5wJvitUeMsid3xI1FrzzuzfxKy4Bapnhzao= gitlab.com/elixxir/crypto v0.0.0-20200804182833-984246dea2c4/go.mod h1:ucm9SFKJo+K0N2GwRRpaNr+tKXMIOVWzmyUD0SbOu2c= gitlab.com/elixxir/crypto v0.0.3/go.mod h1:ZNgBOblhYToR4m8tj4cMvJ9UsJAUKq+p0gCp07WQmhA= gitlab.com/elixxir/crypto v0.0.7-0.20221017173452-565da4101a3b/go.mod h1:1rftbwSVdy49LkBIkPr+w+P2mDOerYeBKoZuB3r0yqI=