diff --git a/scheduling/nodeStateChange.go b/scheduling/nodeStateChange.go index 58490424bdfecaf70ddd284456e3980321059183..3629b91402f68a7b9bacd0086cf51731e3cb9c9b 100644 --- a/scheduling/nodeStateChange.go +++ b/scheduling/nodeStateChange.go @@ -22,16 +22,31 @@ import ( "time" ) +type stateChanger struct { + lastRealtime time.Time + + realtimeDelay time.Duration + realtimeDelta time.Duration + + realtimeTimeout time.Duration + + pool *waitingPool + + state *storage.NetworkState + + roundTracker *RoundTracker + + roundTimeoutChan chan id.Round +} + // HandleNodeUpdates handles the node state changes. // A node in waiting is added to the pool in preparation for precomputing. // A node in standby is added to a round in preparation for realtime. // A node in completed waits for all other nodes in the team to transition // before the round is updated. -func HandleNodeUpdates(update node.UpdateNotification, pool *waitingPool, state *storage.NetworkState, - realtimeDelay time.Duration, roundTracker *RoundTracker, roundTimeoutChan chan id.Round, - realtimeTimeout time.Duration) error { +func (sc *stateChanger) HandleNodeUpdates(update node.UpdateNotification) error { // Check the round's error state - n := state.GetNodeMap().GetNode(update.Node) + n := sc.state.GetNodeMap().GetNode(update.Node) // when a node poll is received, the nodes polling lock is taken. If there // is no update, it is released in the endpoint, otherwise it is released // here which blocks all future polls until processing completes @@ -57,14 +72,14 @@ func HandleNodeUpdates(update node.UpdateNotification, pool *waitingPool, state NodeId: id.Permissioning.Marshal(), Error: fmt.Sprintf("Round killed due to particiption of banned node %s", update.Node), } - err := signature.SignRsa(banError, state.GetPrivateKey()) + err := signature.SignRsa(banError, sc.state.GetPrivateKey()) if err != nil { return errors.Errorf("Failed to sign error message for banned node %s: %+v", update.Node, err) } n.ClearRound() - return killRound(state, r, banError, roundTracker) + return killRound(sc.state, r, banError, sc.roundTracker) } else { - pool.Ban(n) + sc.pool.Ban(n) return nil } } @@ -77,10 +92,10 @@ func HandleNodeUpdates(update node.UpdateNotification, pool *waitingPool, state // If the node was in the offline pool, set it to online // (which also adds it to the online pool) if update.FromStatus == node.Inactive && update.ToStatus == node.Active { - pool.SetNodeToOnline(n) + sc.pool.SetNodeToOnline(n) } else { // Otherwise, add it to the online pool normally - pool.Add(n) + sc.pool.Add(n) } case current.PRECOMPUTING: @@ -111,11 +126,19 @@ func HandleNodeUpdates(update node.UpdateNotification, pool *waitingPool, state // kill the precomp timeout and start a realtime timeout r.DenoteRoundCompleted() - go waitForRoundTimeout(roundTimeoutChan, state, r, - realtimeTimeout, "realtime") + go waitForRoundTimeout(sc.roundTimeoutChan, sc.state, r, + sc.realtimeTimeout, "realtime") + + startTime := time.Now().Add(sc.realtimeDelay) + nextRoundMinimum := sc.lastRealtime.Add(sc.realtimeDelta) + if nextRoundMinimum.After(startTime) { + startTime = nextRoundMinimum + } + + sc.lastRealtime = startTime // Update the round for realtime transition - err = r.Update(states.QUEUED, time.Now().Add(realtimeDelay)) + err = r.Update(states.QUEUED, startTime) if err != nil { return errors.WithMessagef(err, @@ -124,7 +147,7 @@ func HandleNodeUpdates(update node.UpdateNotification, pool *waitingPool, state } // Build the round info and add to the networkState - err = state.AddRoundUpdate(r.BuildRoundInfo()) + err = sc.state.AddRoundUpdate(r.BuildRoundInfo()) if err != nil { return errors.WithMessagef(err, "Could not issue "+ "update for round %v transitioning from %s to %s", @@ -181,7 +204,7 @@ func HandleNodeUpdates(update node.UpdateNotification, pool *waitingPool, state // Build the round info and add to the networkState roundInfo := r.BuildRoundInfo() - err = state.AddRoundUpdate(roundInfo) + err = sc.state.AddRoundUpdate(roundInfo) if err != nil { return errors.WithMessagef(err, "Could not issue "+ "update for round %v transitioning from %s to %s", @@ -190,7 +213,7 @@ func HandleNodeUpdates(update node.UpdateNotification, pool *waitingPool, state //send the signal that the round is complete r.DenoteRoundCompleted() - roundTracker.RemoveActiveRound(r.GetRoundID()) + sc.roundTracker.RemoveActiveRound(r.GetRoundID()) // Store round metric in another thread for completed round go StoreRoundMetric(roundInfo, r.GetRoundState(), r.GetRealtimeCompletedTs()) @@ -206,7 +229,7 @@ func HandleNodeUpdates(update node.UpdateNotification, pool *waitingPool, state //send the signal that the round is complete r.DenoteRoundCompleted() n.ClearRound() - err = killRound(state, r, update.Error, roundTracker) + err = killRound(sc.state, r, update.Error, sc.roundTracker) } return err } diff --git a/scheduling/nodeStateChange_test.go b/scheduling/nodeStateChange_test.go index bc77f06889af3eaa6da18a017323b56426d4a073..c11d2062367c61c6ab3e765a6933b7a2b4375324 100644 --- a/scheduling/nodeStateChange_test.go +++ b/scheduling/nodeStateChange_test.go @@ -73,8 +73,18 @@ func TestHandleNodeStateChance_Waiting(t *testing.T) { roundTracker := NewRoundTracker() timeoutCh := make(chan id.Round, 1) - err = HandleNodeUpdates(testUpdate, testPool, testState, 0, - roundTracker, timeoutCh, 15*time.Second) + sc := &stateChanger{ + lastRealtime: time.Unix(0, 0), + realtimeDelay: 0, + realtimeDelta: 0, + realtimeTimeout: 15 * time.Second, + pool: testPool, + state: testState, + roundTracker: roundTracker, + roundTimeoutChan: timeoutCh, + } + + err = sc.HandleNodeUpdates(testUpdate) if err != nil { t.Errorf("Happy path received error: %v", err) } @@ -132,8 +142,18 @@ func TestHandleNodeStateChance_Waiting_SetNodeToOnline(t *testing.T) { roundTracker := NewRoundTracker() timeoutCh := make(chan id.Round, 1) - err = HandleNodeUpdates(testUpdate, testPool, testState, 0, - roundTracker, timeoutCh, 15*time.Second) + sc := &stateChanger{ + lastRealtime: time.Unix(0, 0), + realtimeDelay: 0, + realtimeDelta: 0, + realtimeTimeout: 15 * time.Second, + pool: testPool, + state: testState, + roundTracker: roundTracker, + roundTimeoutChan: timeoutCh, + } + + err = sc.HandleNodeUpdates(testUpdate) if err != nil { t.Errorf("Happy path received error: %v", err) } @@ -196,8 +216,19 @@ func TestHandleNodeStateChance_Standby(t *testing.T) { timeoutCh := make(chan id.Round, 1) - err = HandleNodeUpdates(testUpdate, testPool, testState, 0, - testTracker, timeoutCh, 15*time.Second) + sc := &stateChanger{ + lastRealtime: time.Unix(0, 0), + realtimeDelay: 0, + realtimeDelta: 0, + realtimeTimeout: 15 * time.Second, + pool: testPool, + state: testState, + roundTracker: testTracker, + roundTimeoutChan: timeoutCh, + } + + err = sc.HandleNodeUpdates(testUpdate) + if err != nil { t.Errorf("Waiting pool is full for %d: %v", i, err) } @@ -214,8 +245,18 @@ func TestHandleNodeStateChance_Standby(t *testing.T) { testState.GetNodeMap().GetNode(nodeList[i]).GetPollingLock().Lock() timeoutCh := make(chan id.Round, 1) - err = HandleNodeUpdates(testUpdate, testPool, testState, 0, - nil, timeoutCh, 15*time.Second) + sc := &stateChanger{ + lastRealtime: time.Unix(0, 0), + realtimeDelay: 0, + realtimeDelta: 0, + realtimeTimeout: 15 * time.Second, + pool: testPool, + state: testState, + roundTracker: nil, + roundTimeoutChan: timeoutCh, + } + + err = sc.HandleNodeUpdates(testUpdate) if err != nil { t.Errorf("Error in standby happy path: %v", err) } @@ -270,8 +311,18 @@ func TestHandleNodeStateChance_Standby_NoRound(t *testing.T) { testTracker := NewRoundTracker() timeoutCh := make(chan id.Round, 1) - err = HandleNodeUpdates(testUpdate, testPool, testState, 0, - testTracker, timeoutCh, 15*time.Second) + sc := &stateChanger{ + lastRealtime: time.Unix(0, 0), + realtimeDelay: 0, + realtimeDelta: 0, + realtimeTimeout: 15 * time.Second, + pool: testPool, + state: testState, + roundTracker: testTracker, + roundTimeoutChan: timeoutCh, + } + + err = sc.HandleNodeUpdates(testUpdate) if err == nil { t.Errorf("Expected error for %d was not received. Node should not have round", i) } @@ -337,8 +388,18 @@ func TestHandleNodeUpdates_Completed(t *testing.T) { timeoutCh := make(chan id.Round, 1) - err = HandleNodeUpdates(testUpdate, testPool, testState, 0, - testTracker, timeoutCh, 15*time.Second) + sc := &stateChanger{ + lastRealtime: time.Unix(0, 0), + realtimeDelay: 0, + realtimeDelta: 0, + realtimeTimeout: 15 * time.Second, + pool: testPool, + state: testState, + roundTracker: testTracker, + roundTimeoutChan: timeoutCh, + } + + err = sc.HandleNodeUpdates(testUpdate) if err != nil { t.Errorf("Waiting pool is full for %d: %v", i, err) } @@ -357,8 +418,18 @@ func TestHandleNodeUpdates_Completed(t *testing.T) { timeoutCh := make(chan id.Round, 1) - err = HandleNodeUpdates(testUpdate, testPool, testState, 0, - testTracker, timeoutCh, 15*time.Second) + sc := &stateChanger{ + lastRealtime: time.Unix(0, 0), + realtimeDelay: 0, + realtimeDelta: 0, + realtimeTimeout: 15 * time.Second, + pool: testPool, + state: testState, + roundTracker: testTracker, + roundTimeoutChan: timeoutCh, + } + + err = sc.HandleNodeUpdates(testUpdate) if err != nil { t.Errorf("Expected happy path for completed: %v", err) } @@ -407,8 +478,18 @@ func TestHandleNodeUpdates_Completed_NoRound(t *testing.T) { timeoutCh := make(chan id.Round, 1) - err = HandleNodeUpdates(testUpdate, testPool, testState, 0, - testTracker, timeoutCh, 15*time.Second) + sc := &stateChanger{ + lastRealtime: time.Unix(0, 0), + realtimeDelay: 0, + realtimeDelta: 0, + realtimeTimeout: 15 * time.Second, + pool: testPool, + state: testState, + roundTracker: testTracker, + roundTimeoutChan: timeoutCh, + } + + err = sc.HandleNodeUpdates(testUpdate) if err == nil { t.Errorf("Expected error for %d was not received. Node should not have round", i) } @@ -475,8 +556,18 @@ func TestHandleNodeUpdates_Error(t *testing.T) { timeoutCh := make(chan id.Round, 1) - err = HandleNodeUpdates(testUpdate, testPool, testState, 0, - testTracker, timeoutCh, 15*time.Second) + sc := &stateChanger{ + lastRealtime: time.Unix(0, 0), + realtimeDelay: 0, + realtimeDelta: 0, + realtimeTimeout: 15 * time.Second, + pool: testPool, + state: testState, + roundTracker: testTracker, + roundTimeoutChan: timeoutCh, + } + + err = sc.HandleNodeUpdates(testUpdate) if err != nil { t.Errorf("Happy path received error: %v", err) } @@ -521,15 +612,24 @@ func TestHandleNodeUpdates_BannedNode(t *testing.T) { Node: nodeList[0], ToStatus: node.Banned, } - testTracker := NewRoundTracker() // Ban the first node in the state map testState.GetNodeMap().GetNode(nodeList[0]).GetPollingLock().Lock() roundTracker := NewRoundTracker() timeoutCh := make(chan id.Round, 1) - err = HandleNodeUpdates(testUpdate, testPool, testState, 0, - roundTracker, timeoutCh, 15*time.Second) + sc := &stateChanger{ + lastRealtime: time.Unix(0, 0), + realtimeDelay: 0, + realtimeDelta: 0, + realtimeTimeout: 15 * time.Second, + pool: testPool, + state: testState, + roundTracker: roundTracker, + roundTimeoutChan: timeoutCh, + } + + err = sc.HandleNodeUpdates(testUpdate) if err != nil { t.Errorf("Happy path received error: %v", err) } @@ -560,8 +660,7 @@ func TestHandleNodeUpdates_BannedNode(t *testing.T) { // Ban the the second node in the state map testState.GetNodeMap().GetNode(nodeList[1]).GetPollingLock().Lock() - err = HandleNodeUpdates(testUpdate, testPool, testState, 0, - testTracker, timeoutCh, 15*time.Second) + err = sc.HandleNodeUpdates(testUpdate) if err != nil { t.Errorf("Happy path received error: %v", err) } @@ -670,8 +769,18 @@ func TestHandleNodeUpdates_Precomputing_RoundError(t *testing.T) { roundTracker := NewRoundTracker() timeoutCh := make(chan id.Round, 1) - err = HandleNodeUpdates(testUpdate, testPool, testState, 0, - roundTracker, timeoutCh, 15*time.Second) + sc := &stateChanger{ + lastRealtime: time.Unix(0, 0), + realtimeDelay: 0, + realtimeDelta: 0, + realtimeTimeout: 15 * time.Second, + pool: testPool, + state: testState, + roundTracker: roundTracker, + roundTimeoutChan: timeoutCh, + } + + err = sc.HandleNodeUpdates(testUpdate) if err == nil { t.Errorf("HandleNodeUpdates() did not produce the expected error when"+ "there is no round.\n\texpected: %v\n\treceived: %v", @@ -726,8 +835,18 @@ func TestHandleNodeUpdates_Realtime(t *testing.T) { roundTracker := NewRoundTracker() timeoutCh := make(chan id.Round, 1) - err = HandleNodeUpdates(testUpdate, testPool, testState, 0, - roundTracker, timeoutCh, 15*time.Second) + sc := &stateChanger{ + lastRealtime: time.Unix(0, 0), + realtimeDelay: 0, + realtimeDelta: 0, + realtimeTimeout: 15 * time.Second, + pool: testPool, + state: testState, + roundTracker: roundTracker, + roundTimeoutChan: timeoutCh, + } + + err = sc.HandleNodeUpdates(testUpdate) if err != nil { t.Errorf("Happy path received error: %v", err) } @@ -775,8 +894,18 @@ func TestHandleNodeUpdates_Realtime_RoundError(t *testing.T) { roundTracker := NewRoundTracker() timeoutCh := make(chan id.Round, 1) - err = HandleNodeUpdates(testUpdate, testPool, testState, 0, - roundTracker, timeoutCh, 15*time.Second) + sc := &stateChanger{ + lastRealtime: time.Unix(0, 0), + realtimeDelay: 0, + realtimeDelta: 0, + realtimeTimeout: 15 * time.Second, + pool: testPool, + state: testState, + roundTracker: roundTracker, + roundTimeoutChan: timeoutCh, + } + + err = sc.HandleNodeUpdates(testUpdate) if err == nil { t.Errorf("HandleNodeUpdates() did not produce the expected error when"+ "there is no round.\n\texpected: %v\n\treceived: %v", @@ -835,8 +964,18 @@ func TestHandleNodeUpdates_Realtime_UpdateError(t *testing.T) { roundTracker := NewRoundTracker() timeoutCh := make(chan id.Round, 1) - err = HandleNodeUpdates(testUpdate, testPool, testState, 0, - roundTracker, timeoutCh, 15*time.Second) + sc := &stateChanger{ + lastRealtime: time.Unix(0, 0), + realtimeDelay: 0, + realtimeDelta: 0, + realtimeTimeout: 15 * time.Second, + pool: testPool, + state: testState, + roundTracker: roundTracker, + roundTimeoutChan: timeoutCh, + } + + err = sc.HandleNodeUpdates(testUpdate) if err == nil { t.Errorf("HandleNodeUpdates() did not produce the expected error."+ "\n\texpected: %v\n\treceived: %v", @@ -891,8 +1030,18 @@ func TestHandleNodeUpdates_RoundErrored(t *testing.T) { roundTracker := NewRoundTracker() timeoutCh := make(chan id.Round, 1) - err = HandleNodeUpdates(testUpdate, testPool, testState, 0, - roundTracker, timeoutCh, 15*time.Second) + sc := &stateChanger{ + lastRealtime: time.Unix(0, 0), + realtimeDelay: 0, + realtimeDelta: 0, + realtimeTimeout: 15 * time.Second, + pool: testPool, + state: testState, + roundTracker: roundTracker, + roundTimeoutChan: timeoutCh, + } + + err = sc.HandleNodeUpdates(testUpdate) if err != nil { t.Errorf("Expected no error return!") } @@ -930,9 +1079,18 @@ func TestHandleNodeUpdates_NOT_STARTED(t *testing.T) { testState.GetNodeMap().GetNode(nodeList[0]).GetPollingLock().Lock() timeoutCh := make(chan id.Round, 1) - - err = HandleNodeUpdates(testUpdate, nil, testState, 0, - nil, timeoutCh, 15*time.Second) + sc := &stateChanger{ + lastRealtime: time.Unix(0, 0), + realtimeDelay: 0, + realtimeDelta: 0, + realtimeTimeout: 15 * time.Second, + pool: nil, + state: testState, + roundTracker: nil, + roundTimeoutChan: timeoutCh, + } + + err = sc.HandleNodeUpdates(testUpdate) if err != nil { t.Errorf("Happy path received error: %v", err) } diff --git a/scheduling/schedule.go b/scheduling/schedule.go index 5c22b443bbcce2df325dbeae375bda2dcff2c01c..d4f77fb9bfeab631b0eef0a88ba48a04aa960d15 100644 --- a/scheduling/schedule.go +++ b/scheduling/schedule.go @@ -158,11 +158,12 @@ func Scheduler(params *SafeParams, state *storage.NetworkState, killchan chan ch //begin the thread that starts rounds go func() { - paramsCopy := params.SafeCopy() lastRound := time.Now() + + paramsCopy := params.SafeCopy() + minRoundDelay := (paramsCopy.MinimumDelay * time.Millisecond)/3 var err error - minRoundDelay := paramsCopy.MinimumDelay * time.Millisecond for newRound := range newRoundChan { // To avoid back-to-back teaming, we make sure to sleep until the minimum delay @@ -194,9 +195,27 @@ func Scheduler(params *SafeParams, state *storage.NetworkState, killchan chan ch go trackRounds(state, pool, roundTracker, &iterationsCount) } + paramsCopy := params.SafeCopy() + + sc := &stateChanger{ + lastRealtime: time.Unix(0, 0), + realtimeDelay: paramsCopy.RealtimeDelay * time.Millisecond, + realtimeDelta: paramsCopy.MinimumDelay * time.Millisecond, + realtimeTimeout: paramsCopy.RealtimeTimeout * time.Millisecond, + pool: pool, + state: state, + roundTracker: roundTracker, + roundTimeoutChan: roundTimeoutTracker, + } + + jww.INFO.Printf("Initialized state changer with: " + + "\n\t realtimeDelay: %s, " + + "\n\t realtimeDelta: %s" + + "\n\t realtimeTimeout: %s", sc.realtimeDelay, + sc.realtimeDelta, sc.realtimeTimeout) + // Start receiving updates from nodes for true { - paramsCopy := params.SafeCopy() isRoundTimeout := false var update node.UpdateNotification @@ -227,9 +246,7 @@ func Scheduler(params *SafeParams, state *storage.NetworkState, killchan chan ch var err error // Handle the node's state change - err = HandleNodeUpdates(update, pool, state, - paramsCopy.RealtimeDelay*time.Millisecond, roundTracker, roundTimeoutTracker, - paramsCopy.RealtimeTimeout*time.Millisecond) + err = sc.HandleNodeUpdates(update) if err != nil { return err }