diff --git a/cmix/client.go b/cmix/client.go index 71b2c3c206431c3c98eaac08098a36a444d30db0..b83c8425f04776724e715e9a6752f8e3654ef802 100644 --- a/cmix/client.go +++ b/cmix/client.go @@ -11,6 +11,8 @@ package cmix // and intra-client state are accessible through the context object. import ( + "gitlab.com/elixxir/client/cmix/clockSkew" + "gitlab.com/xx_network/primitives/netTime" "math" "strconv" "sync/atomic" @@ -57,6 +59,8 @@ type client struct { comms *commClient.Comms // Contains the network instance instance *commNetwork.Instance + //contains the clock skew tracker + skewTracker clockSkew.Tracker // Parameters of the network param Params @@ -97,6 +101,8 @@ func NewClient(params Params, comms *commClient.Comms, session storage.Session, tracker := uint64(0) earliest := uint64(0) + netTime.SetTimeSource(localTime{}) + // Create client object c := &client{ param: params, @@ -107,6 +113,7 @@ func NewClient(params Params, comms *commClient.Comms, session storage.Session, rng: rng, comms: comms, maxMsgLen: tmpMsg.ContentsSize(), + skewTracker: clockSkew.New(), } if params.VerboseRoundTracking { diff --git a/cmix/timeTracker/timeTracker.go b/cmix/clockSkew/timeTracker.go similarity index 85% rename from cmix/timeTracker/timeTracker.go rename to cmix/clockSkew/timeTracker.go index 67b0017a9051d8a5eefb7b2011ab2a0d8016b58a..1d9b729a452410e88c36d820eaabd9a56d9847e0 100644 --- a/cmix/timeTracker/timeTracker.go +++ b/cmix/clockSkew/timeTracker.go @@ -5,8 +5,8 @@ // LICENSE file. // //////////////////////////////////////////////////////////////////////////////// -// package timeTracker tracks local clock skew relative to gateways. -package timeTracker +// package clockSkew tracks local clock skew relative to gateways. +package clockSkew import ( "sync" @@ -17,9 +17,9 @@ import ( const maxHistogramSize = 50 -// TimeOffsetTracker tracks local clock skew relative to various +// Tracker tracks local clock skew relative to various // gateways. -type TimeOffsetTracker interface { +type Tracker interface { // Add additional data to our aggregate clock skews. Add(gwID *id.ID, startTime, rTs time.Time, rtt, gwD time.Duration) @@ -59,7 +59,7 @@ func (g *gatewayDelays) Average() time.Duration { return average(g.delays) } -// timeOffsetTracker implements the TimeOffsetTracker +// timeOffsetTracker implements the Tracker type timeOffsetTracker struct { gatewayClockDelays *sync.Map // id.ID -> *gatewayDelays @@ -68,8 +68,8 @@ type timeOffsetTracker struct { currentIndex int } -// New returns an implementation of TimeOffsetTracker. -func New() TimeOffsetTracker { +// New returns an implementation of Tracker. +func New() Tracker { t := &timeOffsetTracker{ gatewayClockDelays: new(sync.Map), offsets: make([]*time.Duration, maxHistogramSize), @@ -78,7 +78,7 @@ func New() TimeOffsetTracker { return t } -// Add implements the Add method of the TimeOffsetTracker interface. +// Add implements the Add method of the Tracker interface. func (t *timeOffsetTracker) Add(gwID *id.ID, startTime, rTs time.Time, rtt, gwD time.Duration) { delay := rtt/2 - gwD @@ -103,7 +103,7 @@ func (t *timeOffsetTracker) addOffset(offset time.Duration) { } } -// Aggregate implements the Aggregate method fo the TimeOffsetTracker interface. +// Aggregate implements the Aggregate method fo the Tracker interface. func (t *timeOffsetTracker) Aggregate() time.Duration { t.lock.RLock() defer t.lock.RUnlock() diff --git a/cmix/timeTracker/timeTracker_test.go b/cmix/clockSkew/timeTracker_test.go similarity index 96% rename from cmix/timeTracker/timeTracker_test.go rename to cmix/clockSkew/timeTracker_test.go index 8d2293e15c7a05ecf88cf21845dd55dd07144fc1..b21be2bb3c96600032da9e7f4470355dd35106da 100644 --- a/cmix/timeTracker/timeTracker_test.go +++ b/cmix/clockSkew/timeTracker_test.go @@ -5,8 +5,8 @@ // LICENSE file. // //////////////////////////////////////////////////////////////////////////////// -// package timeTracker tracks local clock skew relative to gateways. -package timeTracker +// package clockSkew tracks local clock skew relative to gateways. +package clockSkew import ( "crypto/rand" diff --git a/cmix/follow.go b/cmix/follow.go index 4d8688369fc250805f4e35b47aa21179e9177ed6..84bb62b72429d1dd10daa03e16c5810c453e0571 100644 --- a/cmix/follow.go +++ b/cmix/follow.go @@ -57,7 +57,7 @@ const ( type followNetworkComms interface { GetHost(hostId *id.ID) (*connect.Host, bool) SendPoll(host *connect.Host, message *pb.GatewayPoll) ( - *pb.GatewayPollResponse, error) + *pb.GatewayPollResponse, time.Time, time.Duration, error) RequestMessages(host *connect.Host, message *pb.GetMessages) ( *pb.GetMessagesResponse, error) } @@ -125,6 +125,14 @@ func (c *client) followNetwork(report ClientErrorReport, operator) stream.Close() + //update clock skew + estimatedSkew := c.skewTracker.Aggregate() + if estimatedSkew < (-c.param.ClockSkewClamp) || estimatedSkew > c.param.ClockSkewClamp { + netTime.SetOffset(estimatedSkew) + } else { + netTime.SetOffset(0) + } + if err != nil { jww.ERROR.Printf("failed to operate on identities to "+ "track: %s", err) @@ -190,12 +198,20 @@ func (c *client) follow(identity receptionID.IdentityUse, DisableUpdates: !getUpdates, } + var rtt time.Duration + var sendTo *id.ID + var startTime time.Time + result, err := c.SendToAny(func(host *connect.Host) (interface{}, error) { jww.DEBUG.Printf("Executing poll for %v(%s) range: %s-%s(%s) from %s", identity.EphId.Int64(), identity.Source, identity.StartValid, identity.EndValid, identity.EndValid.Sub(identity.StartValid), host.GetId()) - return comms.SendPoll(host, &pollReq) + var err error + var response *pb.GatewayPollResponse + response, startTime, rtt, err = comms.SendPoll(host, &pollReq) + sendTo = host.GetId() + return response, err }, stop) // Exit if the thread has been stopped @@ -222,6 +238,11 @@ func (c *client) follow(identity receptionID.IdentityUse, pollResp := result.(*pb.GatewayPollResponse) + //execute clock skew update + c.skewTracker.Add(sendTo, startTime, + time.Unix(0, pollResp.ReceivedTs), + rtt, time.Duration(pollResp.GatewayDelay)) + // ---- Process Network State Update Data ---- gwRoundsState := &knownRounds.KnownRounds{} err = gwRoundsState.Unmarshal(pollResp.KnownRounds) diff --git a/cmix/localTime.go b/cmix/localTime.go new file mode 100644 index 0000000000000000000000000000000000000000..8e990ac9e455227ad392b1a1ef09d969d73d3b30 --- /dev/null +++ b/cmix/localTime.go @@ -0,0 +1,10 @@ +package cmix + +import "time" + +type localTime struct{} + +func (localTime) NowMs() int64 { + t := time.Now() + return t.UnixNano() / int64(time.Millisecond) +} diff --git a/cmix/params.go b/cmix/params.go index df692139efe88af82753e6a26453a757e5b3a37f..4016f64aebd84f7aade7bf62ee464f2a7bdb694f 100644 --- a/cmix/params.go +++ b/cmix/params.go @@ -62,6 +62,10 @@ type Params struct { // the system will poll in one iteration of the follower MaxParallelIdentityTracks uint + // ClockSkewClamp is the window (+/-) in which clock skew is + // ignored and local time is used + ClockSkewClamp time.Duration + Rounds rounds.Params Pickup pickup.Params Message message.Params @@ -102,6 +106,7 @@ func GetDefaultParams() Params { RealtimeOnly: false, ReplayRequests: true, MaxParallelIdentityTracks: 20, + ClockSkewClamp: 150 * time.Millisecond, } n.Rounds = rounds.GetDefaultParams() n.Pickup = pickup.GetDefaultParams() diff --git a/go.mod b/go.mod index 59c47e65eefef2ecefcbe56c04249bb26a70c3e2..3defe062c0381e443f4a827f396d4bd85528ab57 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/spf13/viper v1.12.0 github.com/stretchr/testify v1.8.0 gitlab.com/elixxir/bloomfilter v0.0.0-20211222005329-7d931ceead6f - gitlab.com/elixxir/comms v0.0.4-0.20221010233602-6ed8c94ddac0 + gitlab.com/elixxir/comms v0.0.4-0.20221011183106-8c4450ba3cfb gitlab.com/elixxir/crypto v0.0.7-0.20221003185354-b091598d2322 gitlab.com/elixxir/ekv v0.2.1 gitlab.com/elixxir/primitives v0.0.3-0.20220901220638-1acc75fabdc6 diff --git a/go.sum b/go.sum index da4905f1ee2a4ed03054afa1476f87896a51a09f..e283526bded061afbaa498703e4448f550a9fef8 100644 --- a/go.sum +++ b/go.sum @@ -637,6 +637,10 @@ gitlab.com/elixxir/comms v0.0.4-0.20221005205938-10f2defa5b33 h1:mtn/b+/+cMoZNSE gitlab.com/elixxir/comms v0.0.4-0.20221005205938-10f2defa5b33/go.mod h1:oRteMH+R5t1j/FZ+KJJnZUcqJO2sLXnWksN5HPkZUIo= gitlab.com/elixxir/comms v0.0.4-0.20221010233602-6ed8c94ddac0 h1:Z8VcCdfmA1AHlGdPe/L8QSGhbjSW2NyCNrDxzByfuqI= gitlab.com/elixxir/comms v0.0.4-0.20221010233602-6ed8c94ddac0/go.mod h1:oRteMH+R5t1j/FZ+KJJnZUcqJO2sLXnWksN5HPkZUIo= +gitlab.com/elixxir/comms v0.0.4-0.20221011164757-27723f6e2a41 h1:xUjEFv7aoiJFCLpTMBPhCjNUWan8+b0GFnjwHORJvU4= +gitlab.com/elixxir/comms v0.0.4-0.20221011164757-27723f6e2a41/go.mod h1:oRteMH+R5t1j/FZ+KJJnZUcqJO2sLXnWksN5HPkZUIo= +gitlab.com/elixxir/comms v0.0.4-0.20221011183106-8c4450ba3cfb h1:aw7Ao1oqho+97gO35HkzBmv2e25qluRXEiNXw/oD8MM= +gitlab.com/elixxir/comms v0.0.4-0.20221011183106-8c4450ba3cfb/go.mod h1:oRteMH+R5t1j/FZ+KJJnZUcqJO2sLXnWksN5HPkZUIo= gitlab.com/elixxir/crypto v0.0.0-20200804182833-984246dea2c4/go.mod h1:ucm9SFKJo+K0N2GwRRpaNr+tKXMIOVWzmyUD0SbOu2c= gitlab.com/elixxir/crypto v0.0.3/go.mod h1:ZNgBOblhYToR4m8tj4cMvJ9UsJAUKq+p0gCp07WQmhA= gitlab.com/elixxir/crypto v0.0.7-0.20220913220142-ab0771bad0af/go.mod h1:QF8SzsrYh9Elip9EUYUDAhPjqO9DGrrrQxYHvn+VXok=