Skip to content
Snippets Groups Projects
Commit 98cb0269 authored by Benjamin Wenger's avatar Benjamin Wenger
Browse files

Merge branch 'XX-4228_clock_skew_tracker' into 'project/Channels'

Add rough draft timeTracker

See merge request !404
parents d17d147c d87da450
No related branches found
No related tags found
4 merge requests!510Release,!419rewrote the health tracker to both consider if there are waiting rounds and...,!404Add rough draft timeTracker,!340Project/channels
////////////////////////////////////////////////////////////////////////////////
// Copyright © 2022 xx foundation //
// //
// Use of this source code is governed by a license that can be found in the //
// LICENSE file. //
////////////////////////////////////////////////////////////////////////////////
// package timeTracker tracks local clock skew relative to gateways.
package timeTracker
import (
"sync"
"time"
"gitlab.com/xx_network/primitives/id"
)
const maxHistogramSize = 50
// TimeOffsetTracker tracks local clock skew relative to various
// gateways.
type TimeOffsetTracker interface {
// Add additional data to our aggregate clock skews.
Add(gwID *id.ID, startTime, rTs time.Time, rtt, gwD time.Duration)
// Aggregate returns the average of the last n offsets.
Aggregate() time.Duration
}
// gatewayDelays is a helper type used by the timeOffsetTracker below
// to keep track of the last maxHistogramSize number of durations.
type gatewayDelays struct {
lock sync.RWMutex
delays []*time.Duration
currentIndex int
}
func newGatewayDelays() *gatewayDelays {
return &gatewayDelays{
delays: make([]*time.Duration, maxHistogramSize),
currentIndex: 0,
}
}
func (g *gatewayDelays) Add(d time.Duration) {
g.lock.Lock()
defer g.lock.Unlock()
g.delays[g.currentIndex] = &d
g.currentIndex += 1
if g.currentIndex == len(g.delays) {
g.currentIndex = 0
}
}
func (g *gatewayDelays) Average() time.Duration {
g.lock.RLock()
defer g.lock.RUnlock()
return average(g.delays)
}
// timeOffsetTracker implements the TimeOffsetTracker
type timeOffsetTracker struct {
gatewayClockDelays *sync.Map // id.ID -> *gatewayDelays
lock sync.RWMutex
offsets []*time.Duration
currentIndex int
}
// New returns an implementation of TimeOffsetTracker.
func New() TimeOffsetTracker {
t := &timeOffsetTracker{
gatewayClockDelays: new(sync.Map),
offsets: make([]*time.Duration, maxHistogramSize),
currentIndex: 0,
}
return t
}
// Add implements the Add method of the TimeOffsetTracker interface.
func (t *timeOffsetTracker) Add(gwID *id.ID, startTime, rTs time.Time, rtt, gwD time.Duration) {
delay := rtt/2 - gwD
delays, _ := t.gatewayClockDelays.LoadOrStore(*gwID, newGatewayDelays())
gwdelays := delays.(*gatewayDelays)
gwdelays.Add(delay)
gwDelay := gwdelays.Average()
offset := startTime.Sub(rTs.Add(-gwDelay))
t.addOffset(offset)
}
func (t *timeOffsetTracker) addOffset(offset time.Duration) {
t.lock.Lock()
defer t.lock.Unlock()
t.offsets[t.currentIndex] = &offset
t.currentIndex += 1
if t.currentIndex == len(t.offsets) {
t.currentIndex = 0
}
}
// Aggregate implements the Aggregate method fo the TimeOffsetTracker interface.
func (t *timeOffsetTracker) Aggregate() time.Duration {
t.lock.RLock()
defer t.lock.RUnlock()
return average(t.offsets)
}
func average(durations []*time.Duration) time.Duration {
sum := int64(0)
count := int64(0)
for i := 0; i < len(durations); i++ {
if durations[i] == nil {
break
}
sum += int64(*durations[i])
count += 1
}
return time.Duration(sum / count)
}
////////////////////////////////////////////////////////////////////////////////
// Copyright © 2022 xx foundation //
// //
// Use of this source code is governed by a license that can be found in the //
// LICENSE file. //
////////////////////////////////////////////////////////////////////////////////
// package timeTracker tracks local clock skew relative to gateways.
package timeTracker
import (
"crypto/rand"
"sync"
"testing"
"time"
"github.com/stretchr/testify/require"
"gitlab.com/xx_network/primitives/id"
)
func TestTimeTrackerSmokeTest(t *testing.T) {
tracker := New()
gwID := &id.ID{}
_, err := rand.Read(gwID[:])
require.NoError(t, err)
startTime := time.Now().AddDate(0, 0, -1) // this time yesterday
rTs := startTime.Add(time.Second * 10)
rtt := time.Second * 10
gwD := time.Second * 3
tracker.Add(gwID, startTime, rTs, rtt, gwD)
tracker.Add(gwID, startTime, rTs, rtt, gwD)
tracker.Add(gwID, startTime, rTs, rtt, gwD)
aggregate := tracker.Aggregate()
t.Logf("aggregate: %v", aggregate)
}
func TestAverage(t *testing.T) {
t1 := time.Duration(int64(10))
t2 := time.Duration(int64(20))
t3 := time.Duration(int64(30))
t4 := time.Duration(int64(1000))
durations := make([]*time.Duration, 100)
durations[0] = &t1
durations[1] = &t2
durations[2] = &t3
durations[3] = &t4
avg := average(durations)
require.Equal(t, int(avg), 265)
}
func TestGatewayDelayAverage(t *testing.T) {
t1 := time.Duration(int64(10))
t2 := time.Duration(int64(20))
t3 := time.Duration(int64(30))
t4 := time.Duration(int64(1000))
gwDelays := newGatewayDelays()
gwDelays.Add(t1)
gwDelays.Add(t2)
gwDelays.Add(t3)
gwDelays.Add(t4)
avg := gwDelays.Average()
require.Equal(t, int(avg), 265)
}
func TestAddOffset(t *testing.T) {
tracker := &timeOffsetTracker{
gatewayClockDelays: new(sync.Map),
offsets: make([]*time.Duration, maxHistogramSize),
currentIndex: 0,
}
offset := time.Second * 10
for i := 0; i < maxHistogramSize-1; i++ {
tracker.addOffset(offset)
require.Equal(t, i+1, tracker.currentIndex)
}
tracker.addOffset(offset)
require.Equal(t, 0, tracker.currentIndex)
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment