diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000000000000000000000000000000000000..36859e77fa17a6019d108d2ac2ad5a0951a6cc11 --- /dev/null +++ b/LICENSE @@ -0,0 +1,25 @@ +To whom it may concern, + +You can download, modify, compile and deploy this source code for the purpose of participating in the xx network as a +beta node, in accordance with your beta node participation agreement. + +You can also download, modify, compile and deploy the source code for non-commercial testing and verification +(i.e., security and bug review) purposes. You can repost aspects of the source code on both the BetaNet forum +(forum.xx.network) and the official Discord (https://discord.gg/D4NHmv4) consistent with these purposes. +To release a bug or security report outside the BetaNet forum or official Discord, you must notify bugs@xx.network at +least three business days in advance Pacific time. + +The xx network SEZC hereby grants you a non-transferable license under its legal rights limited to the purposes above. + +This Agreement and the license that it grants you expires the earlier of April 1st 2022 or the launch of the xx network +MainNet. + +THE SOURCE CODE IS PROVIDED TO YOU ON AN “AS IS” BASIS WITHOUT WARRANTY OF ANY KIND, EITHER EXPRESS OR IMPLIED, +INCLUDING ANY WARRANTY OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE OR USE, OR ANY WARRANTY THAT THE SOURCE CODE + DOES NOT INFRINGE THE RIGHTS OF OTHERS (WHETHER PATENT RIGHTS, COPYRIGHTS OR OTHERWISE). + +THE XX NETWORK SEZC WILL NOT BE LIABLE TO YOU FOR ANY DAMAGES OF ANY KIND, WHETHER DIRECT, SPECIAL, CONSEQUENTIAL, +INCIDENTAL, INDIRECT OR OTHERWISE, EVEN IF THE XX NETWORK SEZC HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGES, +WHICH ARISE OUT OF THIS AGREEMENT OR THE USE OR PERFORMANCE OF THE SOURCE CODE. 
+ +The xx network SEZC diff --git a/README.md b/README.md index 185aa4585c85fe9b586d97694cac9a695ace5199..3e8b9d1c0bf7664cfbaa897c11adc4eb06bc5b5a 100644 --- a/README.md +++ b/README.md @@ -154,6 +154,11 @@ blockchainGeoBinning: false # How long offline nodes remain in the NDF. If a node is offline past this duration # the node is pruned from the NDF. Expects duration in"h". (Defaults to 1 week (168 hours) pruneRetentionLimit: "168h" + +# How long rounds will be tracked by gateways. Rounds (and messages as an extension) +# prior to this period are not guaranteed to be delivered to clients. +# Expects duration in "h". (Defaults to 1 week (168 hours)). +messageRetentionLimit: "168h" ``` ### SchedulingConfig template: diff --git a/cmd/geoBin.go b/cmd/geoBin.go index 423b92cd7d629c5fa36428c7439cb22dceb19ad1..e4a4974740f939d42829c5ade40dd626a0a2a0c1 100644 --- a/cmd/geoBin.go +++ b/cmd/geoBin.go @@ -75,7 +75,14 @@ func (m *RegistrationImpl) setNodeSequence(n *node.State, nodeIpAddr string) err return errors.Errorf(setDbSequenceErr, n.GetID(), countryCode) } - err = storage.PermissioningDb.UpdateGeoIP(n.GetAppID(), fmt.Sprintf("%s, %s", city, countryName), geobin.String(), gps) + // Generate the location string (exclude city if none is found) + location := countryName + if city != "" { + location = city + ", " + location + } + + err = storage.PermissioningDb.UpdateGeoIP( + n.GetAppID(), location, geobin.String(), gps) // Set the state ordering n.SetOrdering(countryCode) diff --git a/cmd/impl.go b/cmd/impl.go index fca90983dd0935d581abf9933eba28831c0d4538..f2b52ff48b0ef6b7594ff71ffc13d58ecc9e6536 100644 --- a/cmd/impl.go +++ b/cmd/impl.go @@ -16,6 +16,7 @@ import ( jww "github.com/spf13/jwalterweatherman" pb "gitlab.com/elixxir/comms/mixmessages" "gitlab.com/elixxir/comms/registration" + "gitlab.com/elixxir/registration/scheduling" "gitlab.com/elixxir/registration/storage" "gitlab.com/elixxir/registration/storage/node" "gitlab.com/xx_network/comms/connect" @@ -27,6 +28,7 @@ 
import ( "gitlab.com/xx_network/primitives/region" "gitlab.com/xx_network/primitives/utils" "sync" + "sync/atomic" "time" ) @@ -34,6 +36,7 @@ import ( type RegistrationImpl struct { Comms *registration.Comms params *Params + schedulingParams *scheduling.SafeParams State *storage.NetworkState Stopped *uint32 permissioningCert *x509.Certificate @@ -58,6 +61,8 @@ type RegistrationImpl struct { geoIPDBStatus geoipStatus NDFLock sync.Mutex + + earliestRoundTracker atomic.Value } // function used to schedule nodes @@ -65,6 +70,12 @@ type SchedulingAlgorithm func(params []byte, state *storage.NetworkState) error var LoadAllRegNodes bool +type earliestRoundTracking struct { + ClientRoundId uint64 + GatewayRoundId uint64 + GatewayTimestamp int64 +} + // Configure and start the Permissioning Server func StartRegistration(params Params) (*RegistrationImpl, error) { @@ -113,13 +124,14 @@ func StartRegistration(params Params) (*RegistrationImpl, error) { // Build default parameters regImpl := &RegistrationImpl{ - params: &params, - ndfOutputPath: params.NdfOutputPath, - NdfReady: &ndfReady, - Stopped: &roundCreationStopped, - numRegistered: 0, - beginScheduling: make(chan struct{}, 1), - registrationTimes: make(map[id.ID]int64), + params: &params, + ndfOutputPath: params.NdfOutputPath, + NdfReady: &ndfReady, + Stopped: &roundCreationStopped, + numRegistered: 0, + beginScheduling: make(chan struct{}, 1), + registrationTimes: make(map[id.ID]int64), + earliestRoundTracker: atomic.Value{}, } // If the the GeoIP2 database file is supplied, then use it to open the @@ -448,3 +460,24 @@ func NewImplementation(instance *RegistrationImpl) *registration.Implementation return impl } + +func (m *RegistrationImpl) UpdateEarliestRound(clientEarliestRoundId, + gatewayEarliestRound id.Round, gatewayEarliestTimestamp time.Time) { + newEarliestRound := &earliestRoundTracking{ + ClientRoundId: uint64(clientEarliestRoundId), + GatewayRoundId: uint64(gatewayEarliestRound), + GatewayTimestamp: 
gatewayEarliestTimestamp.UnixNano(), + } + + m.earliestRoundTracker.Store(newEarliestRound) +} + +func (m *RegistrationImpl) GetEarliestRoundInfo() (uint64, uint64, int64, error) { + earliestRound, ok := m.earliestRoundTracker.Load().(*earliestRoundTracking) + if !ok || earliestRound == nil { + return 0, 0, 0, errors.New("Earliest round state does not exist, try again") + } + + return earliestRound.ClientRoundId, + earliestRound.GatewayRoundId, earliestRound.GatewayTimestamp, nil +} diff --git a/cmd/nodeMetricTracker.go b/cmd/nodeMetricTracker.go index 5ad33acdc1e6c097d13bdedcc1b51a0fed23e0ea..79c7d760e2fa69df3711952f82f5d6c1a48c3af8 100644 --- a/cmd/nodeMetricTracker.go +++ b/cmd/nodeMetricTracker.go @@ -9,6 +9,7 @@ package cmd import ( "encoding/json" + "github.com/jinzhu/gorm" "github.com/pkg/errors" jww "github.com/spf13/jwalterweatherman" "gitlab.com/elixxir/registration/storage" @@ -145,6 +146,38 @@ func TrackNodeMetrics(impl *RegistrationImpl, quitChan chan struct{}, nodeMetric } impl.NDFLock.Unlock() } + + paramsCopy := impl.schedulingParams.SafeCopy() + + clientCutoff := impl.params.messageRetentionLimit + paramsCopy.RealtimeTimeout + gatewayCutoff := impl.params.messageRetentionLimit + paramsCopy.RealtimeTimeout + + 2*(paramsCopy.PrecomputationTimeout+paramsCopy.RealtimeDelay+paramsCopy.RealtimeTimeout) + + earliestClientRound, _, clientErr := storage.PermissioningDb. + GetEarliestRound(clientCutoff) + + earliestGwRound, earliestGwRoundTs, gatewayErr := storage.PermissioningDb. 
+ GetEarliestRound(gatewayCutoff) + + if clientErr != nil || gatewayErr != nil { + if clientErr != nil && errors.Is(clientErr, gorm.ErrRecordNotFound) { + jww.ERROR.Printf("GetEarliestRound returned no records for client cutoff") + } else if clientErr != nil { + jww.ERROR.Printf("GetEarliestRound returned an error "+ + "for client cutoff: %v", clientErr) + } + + if gatewayErr != nil && errors.Is(gatewayErr, gorm.ErrRecordNotFound) { + jww.ERROR.Printf("GetEarliestRound returned no records for gateway cutoff") + } else if gatewayErr != nil { + jww.ERROR.Printf("GetEarliestRound returned an error "+ + "for gateway cutoff: %v", gatewayErr) + } + } else { + // If no errors, update impl + impl.UpdateEarliestRound(earliestClientRound, earliestGwRound, earliestGwRoundTs) + } + } } } diff --git a/cmd/nodeMetricTracker_test.go b/cmd/nodeMetricTracker_test.go index de2e75ad663b4208006058e29883403d173e8fee..22bf2a7f2aebb4da90ec44baa63803d1b765b135 100644 --- a/cmd/nodeMetricTracker_test.go +++ b/cmd/nodeMetricTracker_test.go @@ -9,12 +9,15 @@ package cmd import ( "bytes" "crypto/rand" + "gitlab.com/elixxir/registration/scheduling" "gitlab.com/elixxir/registration/storage" "gitlab.com/elixxir/registration/storage/node" "gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/ndf" "gitlab.com/xx_network/primitives/region" "strconv" + "sync" + "sync/atomic" "testing" "time" ) @@ -138,8 +141,14 @@ func TestTrackNodeMetrics(t *testing.T) { } impl := &RegistrationImpl{ - params: &testParams, - State: state, + params: &testParams, + State: state, + earliestRoundTracker: atomic.Value{}, + } + + impl.schedulingParams = &scheduling.SafeParams{ + RWMutex: sync.RWMutex{}, + Params: &scheduling.Params{}, } go TrackNodeMetrics(impl, kill, diff --git a/cmd/params.go b/cmd/params.go index 1dcff9ba5012f5ee68100b8698fb74e60a2afe8b..2b99bb2278b3414b052eef753c141d0c54bf195b 100644 --- a/cmd/params.go +++ b/cmd/params.go @@ -63,6 +63,13 @@ type Params struct { // NDF. 
Expects duration in"h". (Defaults to 1 week (168 hours) pruneRetentionLimit time.Duration + // How long rounds will be tracked by gateways. + // Rounds (and messages as an extension) + // prior to this period are not guaranteed to be delivered to clients. + // Expects duration in "h". (Defaults to 1 week (168 hours)). + messageRetentionLimit time.Duration + messageRetentionLimitMux sync.Mutex + // Specs on rate limiting clients leakedCapacity uint32 leakedTokens uint32 @@ -82,3 +89,9 @@ func toGroup(grp map[string]string) (*ndf.Group, error) { } return &ndf.Group{Prime: pStr, Generator: gStr}, nil } + +func (p *Params) GetMessageRetention() time.Duration { + p.messageRetentionLimitMux.Lock() + defer p.messageRetentionLimitMux.Unlock() + return p.messageRetentionLimit +} diff --git a/cmd/poll.go b/cmd/poll.go index ec0023059b47cd6e280179edce9ecdf47df7f7f1..deb9fe31e6322541801f9cc8b18a60197d727ffe 100644 --- a/cmd/poll.go +++ b/cmd/poll.go @@ -31,6 +31,14 @@ func (m *RegistrationImpl) Poll(msg *pb.PermissioningPoll, auth *connect.Auth) ( // Initialize the response response := &pb.PermissionPollResponse{} + earliestClientRound, earliestGwRound, earliestGwRoundTs, err := m.GetEarliestRoundInfo() + if err != nil { + response.EarliestRoundErr = err.Error() + } else { + response.EarliestClientRound = earliestClientRound + response.EarliestGatewayRound = earliestGwRound + response.EarliestRoundTimestamp = earliestGwRoundTs + } //do edge check to ensure the message is not nil if msg == nil { @@ -44,7 +52,7 @@ func (m *RegistrationImpl) Poll(msg *pb.PermissioningPoll, auth *connect.Auth) ( } // Check for correct version - err := checkVersion(m.params, msg) + err = checkVersion(m.params, msg) if err != nil { return response, err } diff --git a/cmd/root.go b/cmd/root.go index 29f1fe75adc84443c6e059de030e224c7c566ca0..c57fabe8ca262917987a93d886b9c8965b05ea51 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -51,6 +51,7 @@ var ( // Default duration between polls of the disabled Node 
list for updates. const defaultDisabledNodesPollDuration = time.Minute const defaultPruneRetention = 24 * 7 * time.Hour +const defaultMessageRetention = 24 * 7 * time.Hour // rootCmd represents the base command when called without any subcommands var rootCmd = &cobra.Command{ @@ -187,6 +188,8 @@ var rootCmd = &cobra.Command{ viper.SetDefault("addressSpace", 5) viper.SetDefault("pruneRetentionLimit", defaultPruneRetention) + viper.SetDefault("messageRetentionLimit", defaultMessageRetention) + // Get rate limiting values capacity := viper.GetUint32("RateLimiting.Capacity") if capacity == 0 { @@ -233,11 +236,11 @@ var rootCmd = &cobra.Command{ onlyScheduleActive: viper.GetBool("onlyScheduleActive"), enableBlockchain: viper.GetBool("enableBlockchain"), - disableNDFPruning: viper.GetBool("disableNDFPruning"), - geoIPDBFile: viper.GetString("geoIPDBFile"), - pruneRetentionLimit: viper.GetDuration("pruneRetentionLimit"), - - versionLock: sync.RWMutex{}, + disableNDFPruning: viper.GetBool("disableNDFPruning"), + geoIPDBFile: viper.GetString("geoIPDBFile"), + pruneRetentionLimit: viper.GetDuration("pruneRetentionLimit"), + messageRetentionLimit: viper.GetDuration("messageRetentionLimit"), + versionLock: sync.RWMutex{}, // Rate limiting specs leakedCapacity: capacity, @@ -285,6 +288,16 @@ var rootCmd = &cobra.Command{ nodeMetricInterval := time.Duration( viper.GetInt64("nodeMetricInterval")) * time.Second + // Parse params JSON + params := scheduling.ParseParams(SchedulingConfig) + + // Initialize param update if it is enabled + if impl.params.enableBlockchain { + go scheduling.UpdateParams(params, nodeMetricInterval) + } + + impl.schedulingParams = params + // Run the Node metric tracker forever in another thread metricTrackerQuitChan := make(chan struct{}) go TrackNodeMetrics(impl, metricTrackerQuitChan, nodeMetricInterval) @@ -330,13 +343,6 @@ var rootCmd = &cobra.Command{ // Begin scheduling algorithm go func() { - // Parse params JSON - params := 
scheduling.ParseParams(SchedulingConfig) - - // Initialize param update if it is enabled - if impl.params.enableBlockchain { - go scheduling.UpdateParams(params, nodeMetricInterval) - } // Initialize scheduling err = scheduling.Scheduler(params, impl.State, roundCreationQuitChan) @@ -554,6 +560,19 @@ func initConfig() { func (m *RegistrationImpl) update(in fsnotify.Event) { m.updateVersions() m.updateRateLimiting() + m.updateEarliestRound() + +} + +func (m *RegistrationImpl) updateEarliestRound() { + msgRetention := viper.GetDuration("messageRetentionLimit") + if msgRetention.Seconds() == 0 { + msgRetention = defaultMessageRetention + } + + m.params.messageRetentionLimitMux.Lock() + m.params.messageRetentionLimit = msgRetention + m.params.messageRetentionLimitMux.Unlock() } diff --git a/cmd/version.go b/cmd/version.go index 88af0197ee8a8e17d48b26009ee258e6ed55f0b4..c0500d744414f273c20da9dec06d8f41dcf6fbbf 100644 --- a/cmd/version.go +++ b/cmd/version.go @@ -16,7 +16,7 @@ import ( ) // Change this value to set the version for this build -const currentVersion = "3.2.0" +const currentVersion = "3.3.0" func printVersion() { fmt.Printf("xx network Permissioning Server v%s -- %s\n\n", diff --git a/cmd/version_vars.go b/cmd/version_vars.go index bce4a9c822a72e84f278af5d678d6954508b8e05..f68424a2fc69b84f713b2ac7df67b9707c15c7ee 100644 --- a/cmd/version_vars.go +++ b/cmd/version_vars.go @@ -1,10 +1,10 @@ // Code generated by go generate; DO NOT EDIT. 
// This file was generated by robots at -// 2021-10-06 16:56:24.600964 -0500 CDT m=+0.037358375 +// 2021-11-11 13:01:22.955006 -0600 CST m=+0.046409656 package cmd -const GITVERSION = `7c5e07a update deps` -const SEMVER = "3.2.0" +const GITVERSION = `19f2713 Merge branch 'hotfix/CompletedTS' into 'release'` +const SEMVER = "3.3.0" const DEPENDENCIES = `module gitlab.com/elixxir/registration go 1.13 @@ -26,12 +26,12 @@ require ( github.com/spf13/cobra v1.1.3 github.com/spf13/jwalterweatherman v1.1.0 github.com/spf13/viper v1.7.1 - gitlab.com/elixxir/comms v0.0.4-0.20211006204855-b8344f58b9e2 - gitlab.com/elixxir/crypto v0.0.7-0.20210920180151-6c9b84bae372 - gitlab.com/elixxir/primitives v0.0.3-0.20210920180121-b85bca5212f4 - gitlab.com/xx_network/comms v0.0.4-0.20211006215149-7e899b800f6d - gitlab.com/xx_network/crypto v0.0.5-0.20210920180047-4dd4aed4a942 - gitlab.com/xx_network/primitives v0.0.4-0.20210915220237-70cb4551d6f3 + gitlab.com/elixxir/comms v0.0.4-0.20211101174956-590ba1b47887 + gitlab.com/elixxir/crypto v0.0.7-0.20211022013957-3a7899285c4c + gitlab.com/elixxir/primitives v0.0.3-0.20211102233208-a716d5c670b6 + gitlab.com/xx_network/comms v0.0.4-0.20211014163953-e774276b83ae + gitlab.com/xx_network/crypto v0.0.5-0.20211014163843-57b345890686 + gitlab.com/xx_network/primitives v0.0.4-0.20211014163031-53405cf191fb google.golang.org/genproto v0.0.0-20210315173758-2651cd453018 // indirect ) ` diff --git a/go.mod b/go.mod index f6dc96a8b3482dc50a52dc51f5e5ac660256a36b..79181a74fa3aae16e62798afdaec6da3d6419b56 100644 --- a/go.mod +++ b/go.mod @@ -19,9 +19,9 @@ require ( github.com/spf13/cobra v1.1.3 github.com/spf13/jwalterweatherman v1.1.0 github.com/spf13/viper v1.7.1 - gitlab.com/elixxir/comms v0.0.4-0.20211014164523-495493efb970 - gitlab.com/elixxir/crypto v0.0.7-0.20211014164205-95915de2ac0d - gitlab.com/elixxir/primitives v0.0.3-0.20211014164029-06022665b576 + gitlab.com/elixxir/comms v0.0.4-0.20211101174956-590ba1b47887 + gitlab.com/elixxir/crypto 
v0.0.7-0.20211022013957-3a7899285c4c + gitlab.com/elixxir/primitives v0.0.3-0.20211102233208-a716d5c670b6 gitlab.com/xx_network/comms v0.0.4-0.20211014163953-e774276b83ae gitlab.com/xx_network/crypto v0.0.5-0.20211014163843-57b345890686 gitlab.com/xx_network/primitives v0.0.4-0.20211014163031-53405cf191fb diff --git a/go.sum b/go.sum index fd291e97b186f7c787215a14b1f504d00a1cb3dc..b73c4e9a38ad62a7d455711810c6811f3e8edb28 100644 --- a/go.sum +++ b/go.sum @@ -269,18 +269,19 @@ github.com/zeebo/blake3 v0.0.4/go.mod h1:YOZo8A49yNqM0X/Y+JmDUZshJWLt1laHsNSn5ny github.com/zeebo/blake3 v0.1.1/go.mod h1:G9pM4qQwjRzF1/v7+vabMj/c5mWpGZ2Wzo3Eb4z0pb4= github.com/zeebo/pcg v0.0.0-20181207190024-3cdc6b625a05/go.mod h1:Gr+78ptB0MwXxm//LBaEvBiaXY7hXJ6KGe2V32X2F6E= github.com/zeebo/pcg v1.0.0/go.mod h1:09F0S9iiKrwn9rlI5yjLkmrug154/YRW6KnnXVDM/l4= -gitlab.com/elixxir/comms v0.0.4-0.20211014164523-495493efb970 h1:mSf5KidH231esbVvL1rozvLXhgAHn8S+BV70k0oxlW8= -gitlab.com/elixxir/comms v0.0.4-0.20211014164523-495493efb970/go.mod h1:L2fs1Me+L6SKyix7+Gyd9QKmBMjnyJPds/ikSPqdeNY= +gitlab.com/elixxir/comms v0.0.4-0.20211101174956-590ba1b47887 h1:SOQaoEvc6RqImz86jSjsj7wIW3ZhgxXc38GzvRkKdOw= +gitlab.com/elixxir/comms v0.0.4-0.20211101174956-590ba1b47887/go.mod h1:rQpTeFVSn08ocbQeEw5AbMhGWXHfXmQ0y1/ZprAIVVU= gitlab.com/elixxir/crypto v0.0.0-20200804182833-984246dea2c4/go.mod h1:ucm9SFKJo+K0N2GwRRpaNr+tKXMIOVWzmyUD0SbOu2c= gitlab.com/elixxir/crypto v0.0.3/go.mod h1:ZNgBOblhYToR4m8tj4cMvJ9UsJAUKq+p0gCp07WQmhA= -gitlab.com/elixxir/crypto v0.0.7-0.20211014164205-95915de2ac0d h1:tI3YYoHVb/KViRhagzQM0XKdw0hJ7KcuSQXFIWhmtSE= -gitlab.com/elixxir/crypto v0.0.7-0.20211014164205-95915de2ac0d/go.mod h1:teuTEXyqsqo4N/J1sshcTg9xYOt+wNTurop7pkZOiCg= +gitlab.com/elixxir/crypto v0.0.7-0.20211022013957-3a7899285c4c h1:HIr2HBhZqSAKdPRBdEY0/qravISL619O2yuTY/DQTdo= +gitlab.com/elixxir/crypto v0.0.7-0.20211022013957-3a7899285c4c/go.mod h1:teuTEXyqsqo4N/J1sshcTg9xYOt+wNTurop7pkZOiCg= gitlab.com/elixxir/primitives 
v0.0.0-20200731184040-494269b53b4d/go.mod h1:OQgUZq7SjnE0b+8+iIAT2eqQF+2IFHn73tOo+aV11mg= gitlab.com/elixxir/primitives v0.0.0-20200804170709-a1896d262cd9/go.mod h1:p0VelQda72OzoUckr1O+vPW0AiFe0nyKQ6gYcmFSuF8= gitlab.com/elixxir/primitives v0.0.0-20200804182913-788f47bded40/go.mod h1:tzdFFvb1ESmuTCOl1z6+yf6oAICDxH2NPUemVgoNLxc= gitlab.com/elixxir/primitives v0.0.1/go.mod h1:kNp47yPqja2lHSiS4DddTvFpB/4D9dB2YKnw5c+LJCE= -gitlab.com/elixxir/primitives v0.0.3-0.20211014164029-06022665b576 h1:sXX3/hewV4TQLxT2iKBfnfgW/A1eXoEfv5raJxTb79s= gitlab.com/elixxir/primitives v0.0.3-0.20211014164029-06022665b576/go.mod h1:zZy8AlOISFm5IG4G4sylypnz7xNBfZ5mpXiibqJT8+8= +gitlab.com/elixxir/primitives v0.0.3-0.20211102233208-a716d5c670b6 h1:ymWyFBFLcRQiuSId54dq8PVeiV4W7a9737kV46Thjlk= +gitlab.com/elixxir/primitives v0.0.3-0.20211102233208-a716d5c670b6/go.mod h1:zZy8AlOISFm5IG4G4sylypnz7xNBfZ5mpXiibqJT8+8= gitlab.com/xx_network/comms v0.0.0-20200805174823-841427dd5023/go.mod h1:owEcxTRl7gsoM8c3RQ5KAm5GstxrJp5tn+6JfQ4z5Hw= gitlab.com/xx_network/comms v0.0.4-0.20211014163953-e774276b83ae h1:jmZWmSm8eH40SX5B5uOw2XaYoHYqVn8daTfa6B80AOs= gitlab.com/xx_network/comms v0.0.4-0.20211014163953-e774276b83ae/go.mod h1:wR9Vx0KZLrIs0g2Efcp0UwFPStjcDRWkg/DJLVQI2vw= @@ -459,8 +460,9 @@ google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpAD google.golang.org/protobuf v1.24.0/go.mod h1:r/3tXBNzIEhYS9I1OUVjXDlt8tc493IdKGjtUeSXeh4= google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= -google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.27.1 h1:SnqbnDw1V7RiZcXPx5MEeqPv2s79L9i7BJUlG/+RurQ= +google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= 
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/scheduling/nodeStateChange.go b/scheduling/nodeStateChange.go index 5899b4942fa454ea691b6f8a63788748b9070492..58490424bdfecaf70ddd284456e3980321059183 100644 --- a/scheduling/nodeStateChange.go +++ b/scheduling/nodeStateChange.go @@ -90,16 +90,6 @@ func HandleNodeUpdates(update node.UpdateNotification, pool *waitingPool, state "not be moving to the %s state", update.Node, states.PRECOMPUTING) } - // fixme: nodes selected from pool are assigned to precomp in start round, inherently are synced - //stateComplete := r.NodeIsReadyForTransition() - //if stateComplete { - // err := r.Update(states.PRECOMPUTING, time.Now()) - // if err != nil { - // return errors.WithMessagef(err, - // "Could not move round %v from %s to %s", - // r.GetRoundID(), states.PENDING, states.PRECOMPUTING) - // } - //} case current.STANDBY: // Check that node in standby actually does have a round if !hasRound { @@ -170,6 +160,12 @@ func HandleNodeUpdates(update node.UpdateNotification, pool *waitingPool, state // Clear the round n.ClearRound() + + // Keep track of when the last node in the round's topology reached the completed state + if r.GetTopology().IsLastNode(n.GetID()) { + r.SetRealtimeCompletedTs(time.Now().UnixNano()) + } + // Check if the round is ready for all the nodes // in order to transition stateComplete := r.NodeIsReadyForTransition() @@ -197,7 +193,7 @@ func HandleNodeUpdates(update node.UpdateNotification, pool *waitingPool, state roundTracker.RemoveActiveRound(r.GetRoundID()) // Store round metric in another thread for completed round - go StoreRoundMetric(roundInfo, states.COMPLETED) + go StoreRoundMetric(roundInfo, r.GetRoundState(), r.GetRealtimeCompletedTs()) // Commit metrics 
about the round to storage return nil @@ -219,13 +215,14 @@ func HandleNodeUpdates(update node.UpdateNotification, pool *waitingPool, state } // Insert metrics about the newly-completed round into storage -func StoreRoundMetric(roundInfo *pb.RoundInfo, realtimeEnd states.Round) { +func StoreRoundMetric(roundInfo *pb.RoundInfo, roundEnd states.Round, realtimeTs int64) { metric := &storage.RoundMetric{ Id: roundInfo.ID, PrecompStart: time.Unix(0, int64(roundInfo.Timestamps[states.PRECOMPUTING])), PrecompEnd: time.Unix(0, int64(roundInfo.Timestamps[states.STANDBY])), RealtimeStart: time.Unix(0, int64(roundInfo.Timestamps[states.REALTIME])), - RealtimeEnd: time.Unix(0, int64(roundInfo.Timestamps[realtimeEnd])), + RealtimeEnd: time.Unix(0, realtimeTs), + RoundEnd: time.Unix(0, int64(roundInfo.Timestamps[roundEnd])), BatchSize: roundInfo.BatchSize, } @@ -265,7 +262,7 @@ func killRound(state *storage.NetworkState, r *round.State, go func() { // Attempt to insert the RoundMetric for the failed round - StoreRoundMetric(roundInfo, states.FAILED) + StoreRoundMetric(roundInfo, r.GetRoundState(), 0) // Return early if there is no roundError if roundError == nil { diff --git a/scheduling/params.go b/scheduling/params.go index 652fad551a5e451ccdbb8dda53ec17047698bb44..3da502cb833046e62999eb2619025e74feb2ac53 100644 --- a/scheduling/params.go +++ b/scheduling/params.go @@ -26,7 +26,7 @@ type SafeParams struct { } // Allows for safe duplication of the current internal Params object -func (s *SafeParams) safeCopy() Params { +func (s *SafeParams) SafeCopy() Params { s.RLock() defer s.RUnlock() return *s.Params diff --git a/scheduling/schedule.go b/scheduling/schedule.go index ae029c6c416d0b7ccead012fc840998d2796ddc3..2ebc20ec5b1d2c14e8dbd488a3c4c65a35f9e553 100644 --- a/scheduling/schedule.go +++ b/scheduling/schedule.go @@ -163,7 +163,7 @@ func Scheduler(params *SafeParams, state *storage.NetworkState, killchan chan ch //begin the thread that starts rounds go func() { - paramsCopy 
:= params.safeCopy() + paramsCopy := params.SafeCopy() lastRound := time.Now() var err error @@ -201,7 +201,7 @@ func Scheduler(params *SafeParams, state *storage.NetworkState, killchan chan ch // Start receiving updates from nodes for true { - paramsCopy := params.safeCopy() + paramsCopy := params.SafeCopy() isRoundTimeout := false var update node.UpdateNotification @@ -248,7 +248,7 @@ func Scheduler(params *SafeParams, state *storage.NetworkState, killchan chan ch // Create a new round if the pool is full var teamFormationThreshold uint32 if paramsCopy.Secure { - teamFormationThreshold = uint32(paramsCopy.Threshold * float64(numNodesInPool)) + teamFormationThreshold = uint32(paramsCopy.Threshold * float64(state.CountActiveNodes())) } else { teamFormationThreshold = paramsCopy.TeamSize } diff --git a/storage/interface.go b/storage/interface.go index 3b396bf4689ecb1adec1e3e99593651296174f7a..13386c634b7a72be260e53d104d9df5aea629901 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -27,6 +27,7 @@ type database interface { GetLatestEphemeralLength() (*EphemeralLength, error) GetEphemeralLengths() ([]*EphemeralLength, error) InsertEphemeralLength(length *EphemeralLength) error + GetEarliestRound(cutoff time.Duration) (id.Round, time.Time, error) getBins() ([]*GeoBin, error) // Node methods @@ -200,7 +201,8 @@ type RoundMetric struct { PrecompStart time.Time `gorm:"NOT NULL"` PrecompEnd time.Time `gorm:"NOT NULL;INDEX;"` RealtimeStart time.Time `gorm:"NOT NULL"` - RealtimeEnd time.Time `gorm:"NOT NULL;INDEX;"` // Index for TPS calc + RealtimeEnd time.Time `gorm:"NOT NULL;INDEX;"` // Index for TPS calc + RoundEnd time.Time `gorm:"NOT NULL;INDEX;default:to_timestamp(0)"` // Index for TPS calc BatchSize uint32 `gorm:"NOT NULL"` // Each RoundMetric has many Nodes participating in each Round diff --git a/storage/permissioningDb.go b/storage/permissioningDb.go index f80e6381e13beacf93e05a0850914342e25eae5a..ebbacbb9b980d9a894fb501ae6339add1e97805c 100644 --- 
a/storage/permissioningDb.go +++ b/storage/permissioningDb.go @@ -14,6 +14,7 @@ import ( "github.com/pkg/errors" jww "github.com/spf13/jwalterweatherman" "gitlab.com/xx_network/primitives/id" + "time" ) // Inserts the given State into Storage if it does not exist @@ -112,6 +113,19 @@ func (d *DatabaseImpl) InsertEphemeralLength(length *EphemeralLength) error { return d.db.Create(length).Error } +// Get the first round that is timestamped after the given cutoff +func (d *DatabaseImpl) GetEarliestRound(cutoff time.Duration) (id.Round, time.Time, error) { + var result RoundMetric + cutoffTs := time.Now().Add(-cutoff) + err := d.db.Where("? <= realtime_end", cutoffTs).Order("realtime_end ASC").Take(&result).Error + if err != nil { + return 0, time.Time{}, err + } + roundId := id.Round(result.Id) + jww.TRACE.Printf("Obtained EarliestRound: %d", roundId) + return roundId, result.RealtimeStart, nil +} + // Returns all GeoBin from Storage func (d *DatabaseImpl) getBins() ([]*GeoBin, error) { var result []*GeoBin diff --git a/storage/permissioningMap.go b/storage/permissioningMap.go index 65a0ca24ac324d2b0742247169df15491af3e1e2..e1ca813adb54271083cc0a899ed239418b34159c 100644 --- a/storage/permissioningMap.go +++ b/storage/permissioningMap.go @@ -12,6 +12,7 @@ import ( "github.com/pkg/errors" jww "github.com/spf13/jwalterweatherman" "gitlab.com/xx_network/primitives/id" + "time" ) // Inserts the given State into Storage if it does not exist @@ -153,6 +154,18 @@ func (m *MapImpl) InsertEphemeralLength(length *EphemeralLength) error { return nil } +// Get the first round that is timestamped after the given cutoff +func (m *MapImpl) GetEarliestRound(cutoff time.Duration) (id.Round, time.Time, error) { + cutoffTs := time.Now().Add(-cutoff) + minRound := &RoundMetric{} + for _, v := range m.roundMetrics { + if v.RealtimeEnd.After(cutoffTs) && (v.RealtimeEnd.Before(minRound.RealtimeEnd) || minRound.Id == 0) { + minRound = v + } + } + return id.Round(minRound.Id), 
minRound.PrecompEnd, nil +} + // Returns all GeoBin from Storage func (m *MapImpl) getBins() ([]*GeoBin, error) { m.mut.Lock() diff --git a/storage/permissioningMap_test.go b/storage/permissioningMap_test.go index b0bd4a93983627c2806c404ff3b126f60427816e..6773c61c7ee6e5604d562e00d6346e98337f5e8b 100644 --- a/storage/permissioningMap_test.go +++ b/storage/permissioningMap_test.go @@ -25,7 +25,50 @@ import ( // return // } // -// result, err := db.GetLatestEphemeralLength() +// err = db.InsertRoundMetric(&RoundMetric{ +// Id: 5, +// PrecompStart: time.Now(), +// PrecompEnd: time.Now(), +// RealtimeStart: time.Now(), +// RealtimeEnd: time.Now().Add(-59*time.Minute), +// BatchSize: 10, +// }, nil) +// if err != nil { +// t.Errorf(err.Error()) +// return +// } +// err = db.InsertRoundMetric(&RoundMetric{ +// Id: 6, +// PrecompStart: time.Now(), +// PrecompEnd: time.Now(), +// RealtimeStart: time.Now(), +// RealtimeEnd: time.Now().Add(-time.Hour), +// BatchSize: 10, +// }, nil) +// if err != nil { +// t.Errorf(err.Error()) +// return +// } +// err = db.InsertRoundMetric(&RoundMetric{ +// Id: 7, +// PrecompStart: time.Now(), +// PrecompEnd: time.Now(), +// RealtimeStart: time.Now(), +// RealtimeEnd: time.Now().Add(-(3*time.Hour)), +// BatchSize: 10, +// }, nil) +// if err != nil { +// t.Errorf(err.Error()) +// return +// } +// result, result2, err := db.GetEarliestRound(2*time.Hour) +// if err != nil { +// t.Errorf(err.Error()) +// return +// } +// jww.FATAL.Printf("GetEarliestRound: %d %v", result, result2) +// +// result, err = db.GetLatestEphemeralLength() // if err != nil { // t.Errorf(err.Error()) // } @@ -348,6 +391,52 @@ func TestMapImpl_GetLatestEphemeralLengthErr(t *testing.T) { } } +func TestMapImpl_GetEarliestRound(t *testing.T) { + m := &MapImpl{roundMetrics: make(map[uint64]*RoundMetric)} + + cutoff := 20 * time.Minute + roundId, _, err := m.GetEarliestRound(cutoff) + if err != nil || int(roundId) != 0 { + t.Errorf("Invalid return for empty roundMetrics: 
%+v", err) + } + + metrics := []*RoundMetric{{ + Id: 0, + PrecompStart: time.Now(), + PrecompEnd: time.Now(), + RealtimeStart: time.Now(), + RealtimeEnd: time.Now().Add(-30 * time.Minute), + BatchSize: 420, + }, + { + Id: 1, + PrecompStart: time.Now(), + PrecompEnd: time.Now(), + RealtimeStart: time.Now(), + RealtimeEnd: time.Now().Add(-time.Minute), + BatchSize: 420, + }, + { + Id: 2, + PrecompStart: time.Now(), + PrecompEnd: time.Now(), + RealtimeStart: time.Now(), + RealtimeEnd: time.Now().Add(-10 * time.Minute), + BatchSize: 420, + }, + } + + // Insert dummy metrics + for _, metric := range metrics { + m.roundMetrics[metric.Id] = metric + } + + roundId, _, err = m.GetEarliestRound(cutoff) + if err != nil || uint64(roundId) != 2 { + t.Errorf("Invalid return for GetEarliestRound: %d %+v", roundId, err) + } +} + // Test error path to ensure error message stays consistent func TestMapImpl_GetStateValue(t *testing.T) { m := &MapImpl{states: make(map[string]string)} diff --git a/storage/round/state.go b/storage/round/state.go index 7adfdee9f28444a7d4839e431381de109e16d65b..0dae0b676d37225b06657b3c0415c70d1f234c4f 100644 --- a/storage/round/state.go +++ b/storage/round/state.go @@ -43,6 +43,10 @@ type State struct { lastUpdate time.Time + // Keep track of the ns timestamp when the last node in the round reported completed + // in order to get better granularity for when realtime finished + realtimeCompletedTs int64 + mux sync.RWMutex } @@ -175,6 +179,16 @@ func (s *State) GetRoundID() id.Round { return rid } +// GetRealtimeCompletedTs returns realtimeCompletedTs (a ns timestamp); it does not lock mux +func (s *State) GetRealtimeCompletedTs() int64 { + return s.realtimeCompletedTs +} + +// SetRealtimeCompletedTs overwrites realtimeCompletedTs with ts; it does not lock mux +func (s *State) SetRealtimeCompletedTs(ts int64) { + s.realtimeCompletedTs = ts +} + // Append a round error to our list of stored rounderrors func (s *State) AppendError(roundError *pb.RoundError) { s.mux.Lock() diff --git a/storage/state.go b/storage/state.go index 
212bf541ba1e528188c01ee5cc354620fbd03aa3..921a24bba3a627f9f734cfb4813bffd4aceed987 100644 --- a/storage/state.go +++ b/storage/state.go @@ -186,6 +186,12 @@ func NewState(rsaPrivKey *rsa.PrivateKey, addressSpaceSize uint32, return state, nil } +// CountActiveNodes returns a count of active nodes in the state +// NOTE: Accounts for pruned, but not stale nodes +func (s *NetworkState) CountActiveNodes() int { + return len(s.GetFullNdf().Get().Nodes) +} + // Adds pruned nodes, used by disabledNodes func (s *NetworkState) setPrunedNodesNoReset(ids []*id.ID) { s.pruneListMux.Lock()