diff --git a/cmix/timeTracker/timeTracker.go b/cmix/timeTracker/timeTracker.go new file mode 100644 index 0000000000000000000000000000000000000000..67b0017a9051d8a5eefb7b2011ab2a0d8016b58a --- /dev/null +++ b/cmix/timeTracker/timeTracker.go @@ -0,0 +1,126 @@ +//////////////////////////////////////////////////////////////////////////////// +// Copyright © 2022 xx foundation // +// // +// Use of this source code is governed by a license that can be found in the // +// LICENSE file. // +//////////////////////////////////////////////////////////////////////////////// + +// package timeTracker tracks local clock skew relative to gateways. +package timeTracker + +import ( + "sync" + "time" + + "gitlab.com/xx_network/primitives/id" +) + +const maxHistogramSize = 50 + +// TimeOffsetTracker tracks local clock skew relative to various +// gateways. +type TimeOffsetTracker interface { + // Add additional data to our aggregate clock skews. + Add(gwID *id.ID, startTime, rTs time.Time, rtt, gwD time.Duration) + + // Aggregate returns the average of the last n offsets. + Aggregate() time.Duration +} + +// gatewayDelays is a helper type used by the timeOffsetTracker below +// to keep track of the last maxHistogramSize number of durations. +type gatewayDelays struct { + lock sync.RWMutex + delays []*time.Duration + currentIndex int +} + +func newGatewayDelays() *gatewayDelays { + return &gatewayDelays{ + delays: make([]*time.Duration, maxHistogramSize), + currentIndex: 0, + } +} + +func (g *gatewayDelays) Add(d time.Duration) { + g.lock.Lock() + defer g.lock.Unlock() + + g.delays[g.currentIndex] = &d + g.currentIndex += 1 + if g.currentIndex == len(g.delays) { + g.currentIndex = 0 + } +} + +func (g *gatewayDelays) Average() time.Duration { + g.lock.RLock() + defer g.lock.RUnlock() + return average(g.delays) +} + +// timeOffsetTracker implements the TimeOffsetTracker +type timeOffsetTracker struct { + gatewayClockDelays *sync.Map // id.ID -> *gatewayDelays + + lock sync.RWMutex + offsets []*time.Duration + currentIndex int +} + +// New returns an implementation of TimeOffsetTracker. +func New() TimeOffsetTracker { + t := &timeOffsetTracker{ + gatewayClockDelays: new(sync.Map), + offsets: make([]*time.Duration, maxHistogramSize), + currentIndex: 0, + } + return t +} + +// Add implements the Add method of the TimeOffsetTracker interface. +func (t *timeOffsetTracker) Add(gwID *id.ID, startTime, rTs time.Time, rtt, gwD time.Duration) { + delay := rtt/2 - gwD + + delays, _ := t.gatewayClockDelays.LoadOrStore(*gwID, newGatewayDelays()) + + gwdelays := delays.(*gatewayDelays) + gwdelays.Add(delay) + gwDelay := gwdelays.Average() + + offset := startTime.Sub(rTs.Add(-gwDelay)) + t.addOffset(offset) +} + +func (t *timeOffsetTracker) addOffset(offset time.Duration) { + t.lock.Lock() + defer t.lock.Unlock() + + t.offsets[t.currentIndex] = &offset + t.currentIndex += 1 + if t.currentIndex == len(t.offsets) { + t.currentIndex = 0 + } +} + +// Aggregate implements the Aggregate method fo the TimeOffsetTracker interface. +func (t *timeOffsetTracker) Aggregate() time.Duration { + t.lock.RLock() + defer t.lock.RUnlock() + + return average(t.offsets) +} + +func average(durations []*time.Duration) time.Duration { + sum := int64(0) + count := int64(0) + for i := 0; i < len(durations); i++ { + if durations[i] == nil { + break + } + sum += int64(*durations[i]) + count += 1 + } + + return time.Duration(sum / count) +} diff --git a/cmix/timeTracker/timeTracker_test.go b/cmix/timeTracker/timeTracker_test.go new file mode 100644 index 0000000000000000000000000000000000000000..8d2293e15c7a05ecf88cf21845dd55dd07144fc1 --- /dev/null +++ b/cmix/timeTracker/timeTracker_test.go @@ -0,0 +1,84 @@ +//////////////////////////////////////////////////////////////////////////////// +// Copyright © 2022 xx foundation // +// // +// Use of this source code is governed by a license that can be found in the // +// LICENSE file. // +//////////////////////////////////////////////////////////////////////////////// + +// package timeTracker tracks local clock skew relative to gateways. +package timeTracker + +import ( + "crypto/rand" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "gitlab.com/xx_network/primitives/id" +) + +func TestTimeTrackerSmokeTest(t *testing.T) { + tracker := New() + gwID := &id.ID{} + _, err := rand.Read(gwID[:]) + require.NoError(t, err) + + startTime := time.Now().AddDate(0, 0, -1) // this time yesterday + rTs := startTime.Add(time.Second * 10) + rtt := time.Second * 10 + gwD := time.Second * 3 + + tracker.Add(gwID, startTime, rTs, rtt, gwD) + tracker.Add(gwID, startTime, rTs, rtt, gwD) + tracker.Add(gwID, startTime, rTs, rtt, gwD) + + aggregate := tracker.Aggregate() + + t.Logf("aggregate: %v", aggregate) +} + +func TestAverage(t *testing.T) { + t1 := time.Duration(int64(10)) + t2 := time.Duration(int64(20)) + t3 := time.Duration(int64(30)) + t4 := time.Duration(int64(1000)) + durations := make([]*time.Duration, 100) + durations[0] = &t1 + durations[1] = &t2 + durations[2] = &t3 + durations[3] = &t4 + avg := average(durations) + require.Equal(t, int(avg), 265) +} + +func TestGatewayDelayAverage(t *testing.T) { + t1 := time.Duration(int64(10)) + t2 := time.Duration(int64(20)) + t3 := time.Duration(int64(30)) + t4 := time.Duration(int64(1000)) + gwDelays := newGatewayDelays() + gwDelays.Add(t1) + gwDelays.Add(t2) + gwDelays.Add(t3) + gwDelays.Add(t4) + avg := gwDelays.Average() + require.Equal(t, int(avg), 265) +} + +func TestAddOffset(t *testing.T) { + tracker := &timeOffsetTracker{ + gatewayClockDelays: new(sync.Map), + offsets: make([]*time.Duration, maxHistogramSize), + currentIndex: 0, + } + offset := time.Second * 10 + + for i := 0; i < maxHistogramSize-1; i++ { + tracker.addOffset(offset) + require.Equal(t, i+1, tracker.currentIndex) + } + tracker.addOffset(offset) + require.Equal(t, 0, tracker.currentIndex) +}