Skip to content
Snippets Groups Projects
Commit c9dd2132 authored by Jake Taylor's avatar Jake Taylor :lips:
Browse files

Merge branch 'fixRoundTiming' into 'master'

modified the system so that is delays realtime starts to try to evently space...

See merge request !49
parents 15c20916 4f00d8ab
No related branches found
No related tags found
1 merge request!49modified the system so that is delays realtime starts to try to evently space...
...@@ -22,16 +22,31 @@ import ( ...@@ -22,16 +22,31 @@ import (
"time" "time"
) )
type stateChanger struct {
lastRealtime time.Time
realtimeDelay time.Duration
realtimeDelta time.Duration
realtimeTimeout time.Duration
pool *waitingPool
state *storage.NetworkState
roundTracker *RoundTracker
roundTimeoutChan chan id.Round
}
// HandleNodeUpdates handles the node state changes. // HandleNodeUpdates handles the node state changes.
// A node in waiting is added to the pool in preparation for precomputing. // A node in waiting is added to the pool in preparation for precomputing.
// A node in standby is added to a round in preparation for realtime. // A node in standby is added to a round in preparation for realtime.
// A node in completed waits for all other nodes in the team to transition // A node in completed waits for all other nodes in the team to transition
// before the round is updated. // before the round is updated.
func HandleNodeUpdates(update node.UpdateNotification, pool *waitingPool, state *storage.NetworkState, func (sc *stateChanger) HandleNodeUpdates(update node.UpdateNotification) error {
realtimeDelay time.Duration, roundTracker *RoundTracker, roundTimeoutChan chan id.Round,
realtimeTimeout time.Duration) error {
// Check the round's error state // Check the round's error state
n := state.GetNodeMap().GetNode(update.Node) n := sc.state.GetNodeMap().GetNode(update.Node)
// when a node poll is received, the nodes polling lock is taken. If there // when a node poll is received, the nodes polling lock is taken. If there
// is no update, it is released in the endpoint, otherwise it is released // is no update, it is released in the endpoint, otherwise it is released
// here which blocks all future polls until processing completes // here which blocks all future polls until processing completes
...@@ -57,14 +72,14 @@ func HandleNodeUpdates(update node.UpdateNotification, pool *waitingPool, state ...@@ -57,14 +72,14 @@ func HandleNodeUpdates(update node.UpdateNotification, pool *waitingPool, state
NodeId: id.Permissioning.Marshal(), NodeId: id.Permissioning.Marshal(),
Error: fmt.Sprintf("Round killed due to particiption of banned node %s", update.Node), Error: fmt.Sprintf("Round killed due to particiption of banned node %s", update.Node),
} }
err := signature.SignRsa(banError, state.GetPrivateKey()) err := signature.SignRsa(banError, sc.state.GetPrivateKey())
if err != nil { if err != nil {
return errors.Errorf("Failed to sign error message for banned node %s: %+v", update.Node, err) return errors.Errorf("Failed to sign error message for banned node %s: %+v", update.Node, err)
} }
n.ClearRound() n.ClearRound()
return killRound(state, r, banError, roundTracker) return killRound(sc.state, r, banError, sc.roundTracker)
} else { } else {
pool.Ban(n) sc.pool.Ban(n)
return nil return nil
} }
} }
...@@ -77,10 +92,10 @@ func HandleNodeUpdates(update node.UpdateNotification, pool *waitingPool, state ...@@ -77,10 +92,10 @@ func HandleNodeUpdates(update node.UpdateNotification, pool *waitingPool, state
// If the node was in the offline pool, set it to online // If the node was in the offline pool, set it to online
// (which also adds it to the online pool) // (which also adds it to the online pool)
if update.FromStatus == node.Inactive && update.ToStatus == node.Active { if update.FromStatus == node.Inactive && update.ToStatus == node.Active {
pool.SetNodeToOnline(n) sc.pool.SetNodeToOnline(n)
} else { } else {
// Otherwise, add it to the online pool normally // Otherwise, add it to the online pool normally
pool.Add(n) sc.pool.Add(n)
} }
case current.PRECOMPUTING: case current.PRECOMPUTING:
...@@ -111,11 +126,19 @@ func HandleNodeUpdates(update node.UpdateNotification, pool *waitingPool, state ...@@ -111,11 +126,19 @@ func HandleNodeUpdates(update node.UpdateNotification, pool *waitingPool, state
// kill the precomp timeout and start a realtime timeout // kill the precomp timeout and start a realtime timeout
r.DenoteRoundCompleted() r.DenoteRoundCompleted()
go waitForRoundTimeout(roundTimeoutChan, state, r, go waitForRoundTimeout(sc.roundTimeoutChan, sc.state, r,
realtimeTimeout, "realtime") sc.realtimeTimeout, "realtime")
startTime := time.Now().Add(sc.realtimeDelay)
nextRoundMinimum := sc.lastRealtime.Add(sc.realtimeDelta)
if nextRoundMinimum.After(startTime) {
startTime = nextRoundMinimum
}
sc.lastRealtime = startTime
// Update the round for realtime transition // Update the round for realtime transition
err = r.Update(states.QUEUED, time.Now().Add(realtimeDelay)) err = r.Update(states.QUEUED, startTime)
if err != nil { if err != nil {
return errors.WithMessagef(err, return errors.WithMessagef(err,
...@@ -124,7 +147,7 @@ func HandleNodeUpdates(update node.UpdateNotification, pool *waitingPool, state ...@@ -124,7 +147,7 @@ func HandleNodeUpdates(update node.UpdateNotification, pool *waitingPool, state
} }
// Build the round info and add to the networkState // Build the round info and add to the networkState
err = state.AddRoundUpdate(r.BuildRoundInfo()) err = sc.state.AddRoundUpdate(r.BuildRoundInfo())
if err != nil { if err != nil {
return errors.WithMessagef(err, "Could not issue "+ return errors.WithMessagef(err, "Could not issue "+
"update for round %v transitioning from %s to %s", "update for round %v transitioning from %s to %s",
...@@ -181,7 +204,7 @@ func HandleNodeUpdates(update node.UpdateNotification, pool *waitingPool, state ...@@ -181,7 +204,7 @@ func HandleNodeUpdates(update node.UpdateNotification, pool *waitingPool, state
// Build the round info and add to the networkState // Build the round info and add to the networkState
roundInfo := r.BuildRoundInfo() roundInfo := r.BuildRoundInfo()
err = state.AddRoundUpdate(roundInfo) err = sc.state.AddRoundUpdate(roundInfo)
if err != nil { if err != nil {
return errors.WithMessagef(err, "Could not issue "+ return errors.WithMessagef(err, "Could not issue "+
"update for round %v transitioning from %s to %s", "update for round %v transitioning from %s to %s",
...@@ -190,7 +213,7 @@ func HandleNodeUpdates(update node.UpdateNotification, pool *waitingPool, state ...@@ -190,7 +213,7 @@ func HandleNodeUpdates(update node.UpdateNotification, pool *waitingPool, state
//send the signal that the round is complete //send the signal that the round is complete
r.DenoteRoundCompleted() r.DenoteRoundCompleted()
roundTracker.RemoveActiveRound(r.GetRoundID()) sc.roundTracker.RemoveActiveRound(r.GetRoundID())
// Store round metric in another thread for completed round // Store round metric in another thread for completed round
go StoreRoundMetric(roundInfo, r.GetRoundState(), r.GetRealtimeCompletedTs()) go StoreRoundMetric(roundInfo, r.GetRoundState(), r.GetRealtimeCompletedTs())
...@@ -206,7 +229,7 @@ func HandleNodeUpdates(update node.UpdateNotification, pool *waitingPool, state ...@@ -206,7 +229,7 @@ func HandleNodeUpdates(update node.UpdateNotification, pool *waitingPool, state
//send the signal that the round is complete //send the signal that the round is complete
r.DenoteRoundCompleted() r.DenoteRoundCompleted()
n.ClearRound() n.ClearRound()
err = killRound(state, r, update.Error, roundTracker) err = killRound(sc.state, r, update.Error, sc.roundTracker)
} }
return err return err
} }
......
...@@ -73,8 +73,18 @@ func TestHandleNodeStateChance_Waiting(t *testing.T) { ...@@ -73,8 +73,18 @@ func TestHandleNodeStateChance_Waiting(t *testing.T) {
roundTracker := NewRoundTracker() roundTracker := NewRoundTracker()
timeoutCh := make(chan id.Round, 1) timeoutCh := make(chan id.Round, 1)
err = HandleNodeUpdates(testUpdate, testPool, testState, 0, sc := &stateChanger{
roundTracker, timeoutCh, 15*time.Second) lastRealtime: time.Unix(0, 0),
realtimeDelay: 0,
realtimeDelta: 0,
realtimeTimeout: 15 * time.Second,
pool: testPool,
state: testState,
roundTracker: roundTracker,
roundTimeoutChan: timeoutCh,
}
err = sc.HandleNodeUpdates(testUpdate)
if err != nil { if err != nil {
t.Errorf("Happy path received error: %v", err) t.Errorf("Happy path received error: %v", err)
} }
...@@ -132,8 +142,18 @@ func TestHandleNodeStateChance_Waiting_SetNodeToOnline(t *testing.T) { ...@@ -132,8 +142,18 @@ func TestHandleNodeStateChance_Waiting_SetNodeToOnline(t *testing.T) {
roundTracker := NewRoundTracker() roundTracker := NewRoundTracker()
timeoutCh := make(chan id.Round, 1) timeoutCh := make(chan id.Round, 1)
err = HandleNodeUpdates(testUpdate, testPool, testState, 0, sc := &stateChanger{
roundTracker, timeoutCh, 15*time.Second) lastRealtime: time.Unix(0, 0),
realtimeDelay: 0,
realtimeDelta: 0,
realtimeTimeout: 15 * time.Second,
pool: testPool,
state: testState,
roundTracker: roundTracker,
roundTimeoutChan: timeoutCh,
}
err = sc.HandleNodeUpdates(testUpdate)
if err != nil { if err != nil {
t.Errorf("Happy path received error: %v", err) t.Errorf("Happy path received error: %v", err)
} }
...@@ -196,8 +216,19 @@ func TestHandleNodeStateChance_Standby(t *testing.T) { ...@@ -196,8 +216,19 @@ func TestHandleNodeStateChance_Standby(t *testing.T) {
timeoutCh := make(chan id.Round, 1) timeoutCh := make(chan id.Round, 1)
err = HandleNodeUpdates(testUpdate, testPool, testState, 0, sc := &stateChanger{
testTracker, timeoutCh, 15*time.Second) lastRealtime: time.Unix(0, 0),
realtimeDelay: 0,
realtimeDelta: 0,
realtimeTimeout: 15 * time.Second,
pool: testPool,
state: testState,
roundTracker: testTracker,
roundTimeoutChan: timeoutCh,
}
err = sc.HandleNodeUpdates(testUpdate)
if err != nil { if err != nil {
t.Errorf("Waiting pool is full for %d: %v", i, err) t.Errorf("Waiting pool is full for %d: %v", i, err)
} }
...@@ -214,8 +245,18 @@ func TestHandleNodeStateChance_Standby(t *testing.T) { ...@@ -214,8 +245,18 @@ func TestHandleNodeStateChance_Standby(t *testing.T) {
testState.GetNodeMap().GetNode(nodeList[i]).GetPollingLock().Lock() testState.GetNodeMap().GetNode(nodeList[i]).GetPollingLock().Lock()
timeoutCh := make(chan id.Round, 1) timeoutCh := make(chan id.Round, 1)
err = HandleNodeUpdates(testUpdate, testPool, testState, 0, sc := &stateChanger{
nil, timeoutCh, 15*time.Second) lastRealtime: time.Unix(0, 0),
realtimeDelay: 0,
realtimeDelta: 0,
realtimeTimeout: 15 * time.Second,
pool: testPool,
state: testState,
roundTracker: nil,
roundTimeoutChan: timeoutCh,
}
err = sc.HandleNodeUpdates(testUpdate)
if err != nil { if err != nil {
t.Errorf("Error in standby happy path: %v", err) t.Errorf("Error in standby happy path: %v", err)
} }
...@@ -270,8 +311,18 @@ func TestHandleNodeStateChance_Standby_NoRound(t *testing.T) { ...@@ -270,8 +311,18 @@ func TestHandleNodeStateChance_Standby_NoRound(t *testing.T) {
testTracker := NewRoundTracker() testTracker := NewRoundTracker()
timeoutCh := make(chan id.Round, 1) timeoutCh := make(chan id.Round, 1)
err = HandleNodeUpdates(testUpdate, testPool, testState, 0, sc := &stateChanger{
testTracker, timeoutCh, 15*time.Second) lastRealtime: time.Unix(0, 0),
realtimeDelay: 0,
realtimeDelta: 0,
realtimeTimeout: 15 * time.Second,
pool: testPool,
state: testState,
roundTracker: testTracker,
roundTimeoutChan: timeoutCh,
}
err = sc.HandleNodeUpdates(testUpdate)
if err == nil { if err == nil {
t.Errorf("Expected error for %d was not received. Node should not have round", i) t.Errorf("Expected error for %d was not received. Node should not have round", i)
} }
...@@ -337,8 +388,18 @@ func TestHandleNodeUpdates_Completed(t *testing.T) { ...@@ -337,8 +388,18 @@ func TestHandleNodeUpdates_Completed(t *testing.T) {
timeoutCh := make(chan id.Round, 1) timeoutCh := make(chan id.Round, 1)
err = HandleNodeUpdates(testUpdate, testPool, testState, 0, sc := &stateChanger{
testTracker, timeoutCh, 15*time.Second) lastRealtime: time.Unix(0, 0),
realtimeDelay: 0,
realtimeDelta: 0,
realtimeTimeout: 15 * time.Second,
pool: testPool,
state: testState,
roundTracker: testTracker,
roundTimeoutChan: timeoutCh,
}
err = sc.HandleNodeUpdates(testUpdate)
if err != nil { if err != nil {
t.Errorf("Waiting pool is full for %d: %v", i, err) t.Errorf("Waiting pool is full for %d: %v", i, err)
} }
...@@ -357,8 +418,18 @@ func TestHandleNodeUpdates_Completed(t *testing.T) { ...@@ -357,8 +418,18 @@ func TestHandleNodeUpdates_Completed(t *testing.T) {
timeoutCh := make(chan id.Round, 1) timeoutCh := make(chan id.Round, 1)
err = HandleNodeUpdates(testUpdate, testPool, testState, 0, sc := &stateChanger{
testTracker, timeoutCh, 15*time.Second) lastRealtime: time.Unix(0, 0),
realtimeDelay: 0,
realtimeDelta: 0,
realtimeTimeout: 15 * time.Second,
pool: testPool,
state: testState,
roundTracker: testTracker,
roundTimeoutChan: timeoutCh,
}
err = sc.HandleNodeUpdates(testUpdate)
if err != nil { if err != nil {
t.Errorf("Expected happy path for completed: %v", err) t.Errorf("Expected happy path for completed: %v", err)
} }
...@@ -407,8 +478,18 @@ func TestHandleNodeUpdates_Completed_NoRound(t *testing.T) { ...@@ -407,8 +478,18 @@ func TestHandleNodeUpdates_Completed_NoRound(t *testing.T) {
timeoutCh := make(chan id.Round, 1) timeoutCh := make(chan id.Round, 1)
err = HandleNodeUpdates(testUpdate, testPool, testState, 0, sc := &stateChanger{
testTracker, timeoutCh, 15*time.Second) lastRealtime: time.Unix(0, 0),
realtimeDelay: 0,
realtimeDelta: 0,
realtimeTimeout: 15 * time.Second,
pool: testPool,
state: testState,
roundTracker: testTracker,
roundTimeoutChan: timeoutCh,
}
err = sc.HandleNodeUpdates(testUpdate)
if err == nil { if err == nil {
t.Errorf("Expected error for %d was not received. Node should not have round", i) t.Errorf("Expected error for %d was not received. Node should not have round", i)
} }
...@@ -475,8 +556,18 @@ func TestHandleNodeUpdates_Error(t *testing.T) { ...@@ -475,8 +556,18 @@ func TestHandleNodeUpdates_Error(t *testing.T) {
timeoutCh := make(chan id.Round, 1) timeoutCh := make(chan id.Round, 1)
err = HandleNodeUpdates(testUpdate, testPool, testState, 0, sc := &stateChanger{
testTracker, timeoutCh, 15*time.Second) lastRealtime: time.Unix(0, 0),
realtimeDelay: 0,
realtimeDelta: 0,
realtimeTimeout: 15 * time.Second,
pool: testPool,
state: testState,
roundTracker: testTracker,
roundTimeoutChan: timeoutCh,
}
err = sc.HandleNodeUpdates(testUpdate)
if err != nil { if err != nil {
t.Errorf("Happy path received error: %v", err) t.Errorf("Happy path received error: %v", err)
} }
...@@ -521,15 +612,24 @@ func TestHandleNodeUpdates_BannedNode(t *testing.T) { ...@@ -521,15 +612,24 @@ func TestHandleNodeUpdates_BannedNode(t *testing.T) {
Node: nodeList[0], Node: nodeList[0],
ToStatus: node.Banned, ToStatus: node.Banned,
} }
testTracker := NewRoundTracker()
// Ban the first node in the state map // Ban the first node in the state map
testState.GetNodeMap().GetNode(nodeList[0]).GetPollingLock().Lock() testState.GetNodeMap().GetNode(nodeList[0]).GetPollingLock().Lock()
roundTracker := NewRoundTracker() roundTracker := NewRoundTracker()
timeoutCh := make(chan id.Round, 1) timeoutCh := make(chan id.Round, 1)
err = HandleNodeUpdates(testUpdate, testPool, testState, 0, sc := &stateChanger{
roundTracker, timeoutCh, 15*time.Second) lastRealtime: time.Unix(0, 0),
realtimeDelay: 0,
realtimeDelta: 0,
realtimeTimeout: 15 * time.Second,
pool: testPool,
state: testState,
roundTracker: roundTracker,
roundTimeoutChan: timeoutCh,
}
err = sc.HandleNodeUpdates(testUpdate)
if err != nil { if err != nil {
t.Errorf("Happy path received error: %v", err) t.Errorf("Happy path received error: %v", err)
} }
...@@ -560,8 +660,7 @@ func TestHandleNodeUpdates_BannedNode(t *testing.T) { ...@@ -560,8 +660,7 @@ func TestHandleNodeUpdates_BannedNode(t *testing.T) {
// Ban the the second node in the state map // Ban the the second node in the state map
testState.GetNodeMap().GetNode(nodeList[1]).GetPollingLock().Lock() testState.GetNodeMap().GetNode(nodeList[1]).GetPollingLock().Lock()
err = HandleNodeUpdates(testUpdate, testPool, testState, 0, err = sc.HandleNodeUpdates(testUpdate)
testTracker, timeoutCh, 15*time.Second)
if err != nil { if err != nil {
t.Errorf("Happy path received error: %v", err) t.Errorf("Happy path received error: %v", err)
} }
...@@ -670,8 +769,18 @@ func TestHandleNodeUpdates_Precomputing_RoundError(t *testing.T) { ...@@ -670,8 +769,18 @@ func TestHandleNodeUpdates_Precomputing_RoundError(t *testing.T) {
roundTracker := NewRoundTracker() roundTracker := NewRoundTracker()
timeoutCh := make(chan id.Round, 1) timeoutCh := make(chan id.Round, 1)
err = HandleNodeUpdates(testUpdate, testPool, testState, 0, sc := &stateChanger{
roundTracker, timeoutCh, 15*time.Second) lastRealtime: time.Unix(0, 0),
realtimeDelay: 0,
realtimeDelta: 0,
realtimeTimeout: 15 * time.Second,
pool: testPool,
state: testState,
roundTracker: roundTracker,
roundTimeoutChan: timeoutCh,
}
err = sc.HandleNodeUpdates(testUpdate)
if err == nil { if err == nil {
t.Errorf("HandleNodeUpdates() did not produce the expected error when"+ t.Errorf("HandleNodeUpdates() did not produce the expected error when"+
"there is no round.\n\texpected: %v\n\treceived: %v", "there is no round.\n\texpected: %v\n\treceived: %v",
...@@ -726,8 +835,18 @@ func TestHandleNodeUpdates_Realtime(t *testing.T) { ...@@ -726,8 +835,18 @@ func TestHandleNodeUpdates_Realtime(t *testing.T) {
roundTracker := NewRoundTracker() roundTracker := NewRoundTracker()
timeoutCh := make(chan id.Round, 1) timeoutCh := make(chan id.Round, 1)
err = HandleNodeUpdates(testUpdate, testPool, testState, 0, sc := &stateChanger{
roundTracker, timeoutCh, 15*time.Second) lastRealtime: time.Unix(0, 0),
realtimeDelay: 0,
realtimeDelta: 0,
realtimeTimeout: 15 * time.Second,
pool: testPool,
state: testState,
roundTracker: roundTracker,
roundTimeoutChan: timeoutCh,
}
err = sc.HandleNodeUpdates(testUpdate)
if err != nil { if err != nil {
t.Errorf("Happy path received error: %v", err) t.Errorf("Happy path received error: %v", err)
} }
...@@ -775,8 +894,18 @@ func TestHandleNodeUpdates_Realtime_RoundError(t *testing.T) { ...@@ -775,8 +894,18 @@ func TestHandleNodeUpdates_Realtime_RoundError(t *testing.T) {
roundTracker := NewRoundTracker() roundTracker := NewRoundTracker()
timeoutCh := make(chan id.Round, 1) timeoutCh := make(chan id.Round, 1)
err = HandleNodeUpdates(testUpdate, testPool, testState, 0, sc := &stateChanger{
roundTracker, timeoutCh, 15*time.Second) lastRealtime: time.Unix(0, 0),
realtimeDelay: 0,
realtimeDelta: 0,
realtimeTimeout: 15 * time.Second,
pool: testPool,
state: testState,
roundTracker: roundTracker,
roundTimeoutChan: timeoutCh,
}
err = sc.HandleNodeUpdates(testUpdate)
if err == nil { if err == nil {
t.Errorf("HandleNodeUpdates() did not produce the expected error when"+ t.Errorf("HandleNodeUpdates() did not produce the expected error when"+
"there is no round.\n\texpected: %v\n\treceived: %v", "there is no round.\n\texpected: %v\n\treceived: %v",
...@@ -835,8 +964,18 @@ func TestHandleNodeUpdates_Realtime_UpdateError(t *testing.T) { ...@@ -835,8 +964,18 @@ func TestHandleNodeUpdates_Realtime_UpdateError(t *testing.T) {
roundTracker := NewRoundTracker() roundTracker := NewRoundTracker()
timeoutCh := make(chan id.Round, 1) timeoutCh := make(chan id.Round, 1)
err = HandleNodeUpdates(testUpdate, testPool, testState, 0, sc := &stateChanger{
roundTracker, timeoutCh, 15*time.Second) lastRealtime: time.Unix(0, 0),
realtimeDelay: 0,
realtimeDelta: 0,
realtimeTimeout: 15 * time.Second,
pool: testPool,
state: testState,
roundTracker: roundTracker,
roundTimeoutChan: timeoutCh,
}
err = sc.HandleNodeUpdates(testUpdate)
if err == nil { if err == nil {
t.Errorf("HandleNodeUpdates() did not produce the expected error."+ t.Errorf("HandleNodeUpdates() did not produce the expected error."+
"\n\texpected: %v\n\treceived: %v", "\n\texpected: %v\n\treceived: %v",
...@@ -891,8 +1030,18 @@ func TestHandleNodeUpdates_RoundErrored(t *testing.T) { ...@@ -891,8 +1030,18 @@ func TestHandleNodeUpdates_RoundErrored(t *testing.T) {
roundTracker := NewRoundTracker() roundTracker := NewRoundTracker()
timeoutCh := make(chan id.Round, 1) timeoutCh := make(chan id.Round, 1)
err = HandleNodeUpdates(testUpdate, testPool, testState, 0, sc := &stateChanger{
roundTracker, timeoutCh, 15*time.Second) lastRealtime: time.Unix(0, 0),
realtimeDelay: 0,
realtimeDelta: 0,
realtimeTimeout: 15 * time.Second,
pool: testPool,
state: testState,
roundTracker: roundTracker,
roundTimeoutChan: timeoutCh,
}
err = sc.HandleNodeUpdates(testUpdate)
if err != nil { if err != nil {
t.Errorf("Expected no error return!") t.Errorf("Expected no error return!")
} }
...@@ -930,9 +1079,18 @@ func TestHandleNodeUpdates_NOT_STARTED(t *testing.T) { ...@@ -930,9 +1079,18 @@ func TestHandleNodeUpdates_NOT_STARTED(t *testing.T) {
testState.GetNodeMap().GetNode(nodeList[0]).GetPollingLock().Lock() testState.GetNodeMap().GetNode(nodeList[0]).GetPollingLock().Lock()
timeoutCh := make(chan id.Round, 1) timeoutCh := make(chan id.Round, 1)
sc := &stateChanger{
err = HandleNodeUpdates(testUpdate, nil, testState, 0, lastRealtime: time.Unix(0, 0),
nil, timeoutCh, 15*time.Second) realtimeDelay: 0,
realtimeDelta: 0,
realtimeTimeout: 15 * time.Second,
pool: nil,
state: testState,
roundTracker: nil,
roundTimeoutChan: timeoutCh,
}
err = sc.HandleNodeUpdates(testUpdate)
if err != nil { if err != nil {
t.Errorf("Happy path received error: %v", err) t.Errorf("Happy path received error: %v", err)
} }
......
...@@ -158,11 +158,12 @@ func Scheduler(params *SafeParams, state *storage.NetworkState, killchan chan ch ...@@ -158,11 +158,12 @@ func Scheduler(params *SafeParams, state *storage.NetworkState, killchan chan ch
//begin the thread that starts rounds //begin the thread that starts rounds
go func() { go func() {
paramsCopy := params.SafeCopy()
lastRound := time.Now() lastRound := time.Now()
paramsCopy := params.SafeCopy()
minRoundDelay := (paramsCopy.MinimumDelay * time.Millisecond)/3
var err error var err error
minRoundDelay := paramsCopy.MinimumDelay * time.Millisecond
for newRound := range newRoundChan { for newRound := range newRoundChan {
// To avoid back-to-back teaming, we make sure to sleep until the minimum delay // To avoid back-to-back teaming, we make sure to sleep until the minimum delay
...@@ -194,9 +195,27 @@ func Scheduler(params *SafeParams, state *storage.NetworkState, killchan chan ch ...@@ -194,9 +195,27 @@ func Scheduler(params *SafeParams, state *storage.NetworkState, killchan chan ch
go trackRounds(state, pool, roundTracker, &iterationsCount) go trackRounds(state, pool, roundTracker, &iterationsCount)
} }
paramsCopy := params.SafeCopy()
sc := &stateChanger{
lastRealtime: time.Unix(0, 0),
realtimeDelay: paramsCopy.RealtimeDelay * time.Millisecond,
realtimeDelta: paramsCopy.MinimumDelay * time.Millisecond,
realtimeTimeout: paramsCopy.RealtimeTimeout * time.Millisecond,
pool: pool,
state: state,
roundTracker: roundTracker,
roundTimeoutChan: roundTimeoutTracker,
}
jww.INFO.Printf("Initialized state changer with: " +
"\n\t realtimeDelay: %s, " +
"\n\t realtimeDelta: %s" +
"\n\t realtimeTimeout: %s", sc.realtimeDelay,
sc.realtimeDelta, sc.realtimeTimeout)
// Start receiving updates from nodes // Start receiving updates from nodes
for true { for true {
paramsCopy := params.SafeCopy()
isRoundTimeout := false isRoundTimeout := false
var update node.UpdateNotification var update node.UpdateNotification
...@@ -227,9 +246,7 @@ func Scheduler(params *SafeParams, state *storage.NetworkState, killchan chan ch ...@@ -227,9 +246,7 @@ func Scheduler(params *SafeParams, state *storage.NetworkState, killchan chan ch
var err error var err error
// Handle the node's state change // Handle the node's state change
err = HandleNodeUpdates(update, pool, state, err = sc.HandleNodeUpdates(update)
paramsCopy.RealtimeDelay*time.Millisecond, roundTracker, roundTimeoutTracker,
paramsCopy.RealtimeTimeout*time.Millisecond)
if err != nil { if err != nil {
return err return err
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment