Skip to content
Snippets Groups Projects
Commit fd49b8a9 authored by Benjamin Wenger's avatar Benjamin Wenger
Browse files

finished implementing roundTimeouts

parent d8c86cf4
No related branches found
No related tags found
No related merge requests found
// Code generated by go generate; DO NOT EDIT. // Code generated by go generate; DO NOT EDIT.
// This file was generated by robots at // This file was generated by robots at
// 2020-06-19 10:22:16.298262 -0700 PDT m=+0.030866691 // 2020-06-22 14:09:27.0710071 -0700 PDT m=+0.231969901
package cmd package cmd
const GITVERSION = `ca9242c Merge branch 'hotfix/DbIndexing' into 'release'` const GITVERSION = `d8c86cf WIP: Timeout round`
const SEMVER = "1.2.1" const SEMVER = "1.2.1"
const DEPENDENCIES = `module gitlab.com/elixxir/registration const DEPENDENCIES = `module gitlab.com/elixxir/registration
......
...@@ -7,7 +7,6 @@ require ( ...@@ -7,7 +7,6 @@ require (
github.com/fsnotify/fsnotify v1.4.9 github.com/fsnotify/fsnotify v1.4.9
github.com/go-sql-driver/mysql v1.5.0 // indirect github.com/go-sql-driver/mysql v1.5.0 // indirect
github.com/golang-collections/collections v0.0.0-20130729185459-604e922904d3 github.com/golang-collections/collections v0.0.0-20130729185459-604e922904d3
github.com/golang/protobuf v1.4.2
github.com/gopherjs/gopherjs v0.0.0-20200217142428-fce0ec30dd00 // indirect github.com/gopherjs/gopherjs v0.0.0-20200217142428-fce0ec30dd00 // indirect
github.com/jinzhu/gorm v1.9.12 github.com/jinzhu/gorm v1.9.12
github.com/jinzhu/now v1.1.1 // indirect github.com/jinzhu/now v1.1.1 // indirect
...@@ -28,8 +27,6 @@ require ( ...@@ -28,8 +27,6 @@ require (
gitlab.com/elixxir/comms v0.0.0-20200617200816-e0360e7c282e gitlab.com/elixxir/comms v0.0.0-20200617200816-e0360e7c282e
gitlab.com/elixxir/crypto v0.0.0-20200617200757-fac8d7aaf8da gitlab.com/elixxir/crypto v0.0.0-20200617200757-fac8d7aaf8da
gitlab.com/elixxir/primitives v0.0.0-20200617192826-98601f119cdf gitlab.com/elixxir/primitives v0.0.0-20200617192826-98601f119cdf
golang.org/x/net v0.0.0-20200513185701-a91f0712d120
google.golang.org/genproto v0.0.0-20200515170657-fc4c6c6a6587 // indirect google.golang.org/genproto v0.0.0-20200515170657-fc4c6c6a6587 // indirect
google.golang.org/grpc v1.29.1
gopkg.in/ini.v1 v1.56.0 // indirect gopkg.in/ini.v1 v1.56.0 // indirect
) )
...@@ -153,6 +153,10 @@ func HandleNodeUpdates(update node.UpdateNotification, pool *waitingPool, ...@@ -153,6 +153,10 @@ func HandleNodeUpdates(update node.UpdateNotification, pool *waitingPool,
"Could not move round %v from %s to %s", "Could not move round %v from %s to %s",
r.GetRoundID(), states.REALTIME, states.COMPLETED) r.GetRoundID(), states.REALTIME, states.COMPLETED)
} }
//send the signal that the round is complete
r.GetRoundCompletedChan() <- struct{}{}
// Build the round info and add to the networkState // Build the round info and add to the networkState
roundInfo := r.BuildRoundInfo() roundInfo := r.BuildRoundInfo()
err = state.AddRoundUpdate(roundInfo) err = state.AddRoundUpdate(roundInfo)
...@@ -171,6 +175,8 @@ func HandleNodeUpdates(update node.UpdateNotification, pool *waitingPool, ...@@ -171,6 +175,8 @@ func HandleNodeUpdates(update node.UpdateNotification, pool *waitingPool,
// If in an error state, kill the round if the node has one // If in an error state, kill the round if the node has one
var err error var err error
if hasRound { if hasRound {
//send the signal that the round is complete
r.GetRoundCompletedChan() <- struct{}{}
err = killRound(state, r, n, update.Error) err = killRound(state, r, n, update.Error)
} }
return false, err return false, err
......
...@@ -73,7 +73,7 @@ func scheduler(params Params, state *storage.NetworkState, killchan chan chan st ...@@ -73,7 +73,7 @@ func scheduler(params Params, state *storage.NetworkState, killchan chan chan st
} }
// Channel to communicate that a round has timed out // Channel to communicate that a round has timed out
roundTimeoutTracker := make(chan id.Round) roundTimeoutTracker := make(chan id.Round, 1000)
//begin the thread that starts rounds //begin the thread that starts rounds
go func() { go func() {
...@@ -85,9 +85,8 @@ func scheduler(params Params, state *storage.NetworkState, killchan chan chan st ...@@ -85,9 +85,8 @@ func scheduler(params Params, state *storage.NetworkState, killchan chan chan st
time.Sleep(timeDiff) time.Sleep(timeDiff)
} }
lastRound = time.Now() lastRound = time.Now()
roundCompletionChan := make(chan<- struct{}, (params.TeamSize*params.TeamSize)+1)
err = startRound(newRound, state, roundCompletionChan) err = startRound(newRound, state)
if err != nil { if err != nil {
break break
} }
...@@ -106,7 +105,6 @@ func scheduler(params Params, state *storage.NetworkState, killchan chan chan st ...@@ -106,7 +105,6 @@ func scheduler(params Params, state *storage.NetworkState, killchan chan chan st
case <-ourRound.GetRoundCompletedChan(): case <-ourRound.GetRoundCompletedChan():
return return
} }
}() }()
} }
...@@ -124,10 +122,9 @@ func scheduler(params Params, state *storage.NetworkState, killchan chan chan st ...@@ -124,10 +122,9 @@ func scheduler(params Params, state *storage.NetworkState, killchan chan chan st
go trackRounds(params, state, pool) go trackRounds(params, state, pool)
} }
isRoundTimeout := false
// Start receiving updates from nodes // Start receiving updates from nodes
for true { for true {
isRoundTimeout := false
var update node.UpdateNotification var update node.UpdateNotification
var timedOutRoundID id.Round var timedOutRoundID id.Round
select { select {
...@@ -140,21 +137,23 @@ func scheduler(params Params, state *storage.NetworkState, killchan chan chan st ...@@ -140,21 +137,23 @@ func scheduler(params Params, state *storage.NetworkState, killchan chan chan st
isRoundTimeout = true isRoundTimeout = true
} }
endRound := false
if isRoundTimeout { if isRoundTimeout {
// Handle the timed out round // Handle the timed out round
err := timeoutRound(state, timedOutRoundID) err := timeoutRound(state, timedOutRoundID)
if err != nil { if err != nil {
return err return err
} }
// Reset to false for next round to time out endRound = true
isRoundTimeout = false } else {
} var err error
// Handle the node's state change // Handle the node's state change
endRound, err := HandleNodeUpdates(update, pool, state, rtDelay) endRound, err = HandleNodeUpdates(update, pool, state, rtDelay)
if err != nil { if err != nil {
return err return err
} }
}
// Remove offline nodes from pool to more accurately determine if pool is eligible for round creation // Remove offline nodes from pool to more accurately determine if pool is eligible for round creation
pool.CleanOfflineNodes(params.NodeCleanUpInterval * time.Second) pool.CleanOfflineNodes(params.NodeCleanUpInterval * time.Second)
......
...@@ -15,10 +15,10 @@ import ( ...@@ -15,10 +15,10 @@ import (
// startRound is a function which takes the info from createSimpleRound and updates the // startRound is a function which takes the info from createSimpleRound and updates the
// node and network states in order to begin the round // node and network states in order to begin the round
func startRound(round protoRound, state *storage.NetworkState, roundCompleteChan chan<- struct{}) error { func startRound(round protoRound, state *storage.NetworkState) error {
// Add the round to the manager // Add the round to the manager
r, err := state.GetRoundMap().AddRound(round.ID, round.BatchSize, round.ResourceQueueTimeout, r, err := state.GetRoundMap().AddRound(round.ID, round.BatchSize, round.ResourceQueueTimeout,
round.Topology, roundCompleteChan) round.Topology)
if err != nil { if err != nil {
err = errors.WithMessagef(err, "Failed to create new round %v", round.ID) err = errors.WithMessagef(err, "Failed to create new round %v", round.ID)
return err return err
......
...@@ -32,7 +32,7 @@ func NewStateMap() *StateMap { ...@@ -32,7 +32,7 @@ func NewStateMap() *StateMap {
// Adds a new round state to the structure. Will not overwrite an existing one. // Adds a new round state to the structure. Will not overwrite an existing one.
func (rsm *StateMap) AddRound(id id.Round, batchsize uint32, resourceQueueTimeout time.Duration, func (rsm *StateMap) AddRound(id id.Round, batchsize uint32, resourceQueueTimeout time.Duration,
topology *connect.Circuit, roundCompleteChan chan<- struct{}) (*State, error) { topology *connect.Circuit) (*State, error) {
rsm.mux.Lock() rsm.mux.Lock()
defer rsm.mux.Unlock() defer rsm.mux.Unlock()
......
...@@ -54,7 +54,7 @@ func newState(id id.Round, batchsize uint32, resourceQueueTimeout time.Duration, ...@@ -54,7 +54,7 @@ func newState(id id.Round, batchsize uint32, resourceQueueTimeout time.Duration,
timestamps := make([]uint64, states.NUM_STATES) timestamps := make([]uint64, states.NUM_STATES)
timestamps[states.PENDING] = uint64(pendingTs.Unix()) timestamps[states.PENDING] = uint64(pendingTs.Unix())
roundCompleteChan := make(chan struct{}) roundCompleteChan := make(chan struct{}, topology.Len()*topology.Len()+1)
//build and return the round state object //build and return the round state object
return &State{ return &State{
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment