diff --git a/cmd/poll.go b/cmd/poll.go index 546136a242c7fb8debba37a500f0c451b4fdd019..c6a24dc64f0403f64cf38e2ebbef839411bcf2c1 100644 --- a/cmd/poll.go +++ b/cmd/poll.go @@ -119,48 +119,43 @@ func (m *RegistrationImpl) Poll(msg *pb.PermissioningPoll, auth *connect.Auth, return response, err } - // if the node is in not started state, do not produce an update - if activity == current.NOT_STARTED { + // If round creation stopped OR if the node is in not started state, + // return early before we get the polling lock + stopped := atomic.LoadUint32(m.Stopped) == 1 + if activity == current.NOT_STARTED || stopped { return response, err } - // Return early before we get the polling lock if round creation stopped - stopped := atomic.LoadUint32(m.Stopped) == 1 - if !stopped { - // when a node poll is received, the nodes polling lock is taken here. If - // there is no update, it is released in this endpoint, otherwise it is - // released in the scheduling algorithm which blocks all future polls until - // processing completes - n.GetPollingLock().Lock() - - err = verifyError(msg, n, m) - if err != nil { - n.GetPollingLock().Unlock() - return response, err - } + // Ensure any errors are properly formatted before sending an update + err = verifyError(msg, n, m) + if err != nil { + return response, err } - // update does edge checking. It ensures the state change recieved was a - // valid one and the state fo the node and + // when a node poll is received, the nodes polling lock is taken here. If + // there is no update, it is released in this endpoint, otherwise it is + // released in the scheduling algorithm which blocks all future polls until + // processing completes + n.GetPollingLock().Lock() + + // update does edge checking. It ensures the state change received was a + // valid one and the state of the node and // any associated round allows for that change. If the change was not // acceptable, it is not recorded and an error is returned, which is // propagated to the node - update, updateNotification, err := n.Update(current.Activity(msg.Activity)) - if !stopped { - //if updating to an error state, attach the error the the update - if update && err == nil && updateNotification.ToActivity == current.ERROR { - updateNotification.Error = msg.Error - } + isUpdate, updateNotification, err := n.Update(current.Activity(msg.Activity)) + if !isUpdate || err != nil { + n.GetPollingLock().Unlock() + return response, err + } - //if an update ocured, report it to the control thread - if update { - err = m.State.SendUpdateNotification(updateNotification) - } else { - n.GetPollingLock().Unlock() - } + // If updating to an error state, attach the error the the update + if updateNotification.ToActivity == current.ERROR { + updateNotification.Error = msg.Error } - return response, err + // Update occurred, report it to the control thread + return response, m.State.SendUpdateNotification(updateNotification) } // PollNdf handles the client polling for an updated NDF diff --git a/scheduling/permute.go b/scheduling/permute.go index 37b39e7ef10757a317f9bcd8a3cab9796568c024..0aeeb3265e4d44acdc6f542d31eed20cb4b98e1c 100644 --- a/scheduling/permute.go +++ b/scheduling/permute.go @@ -19,23 +19,23 @@ func Permute(items []*node.State) [][]*node.State { var output [][]*node.State // Place inline to make appending output easier - helper = func(items []*node.State, n int) { - if n == 1 { + helper = func(items []*node.State, numItems int) { + if numItems == 1 { // Create a copy and append the copy to the output ourCopy := make([]*node.State, len(items)) copy(ourCopy, items) output = append(output, ourCopy) } else { - for i := 0; i < n; i++ { - helper(items, n-1) + for i := 0; i < numItems; i++ { + helper(items, numItems-1) // Swap choice dependent on parity of k (even or odd) - if n%2 == 1 { + if numItems%2 == 1 { // Swap the values - items[i], items[n-1] = items[n-1], items[i] + items[i], items[numItems-1] = items[numItems-1], items[i] } else { // Swap the values - items[0], items[n-1] = items[n-1], items[0] + items[0], items[numItems-1] = items[numItems-1], items[0] } } diff --git a/scheduling/schedule.go b/scheduling/schedule.go index 45937b9331db585b3a2cb554a8a0373aa721a58e..3b2d9f252bf2616c218cef081e5392ccb5f27961 100644 --- a/scheduling/schedule.go +++ b/scheduling/schedule.go @@ -17,6 +17,7 @@ import ( "gitlab.com/elixxir/primitives/states" "gitlab.com/elixxir/registration/storage" "gitlab.com/elixxir/registration/storage/node" + "gitlab.com/elixxir/registration/storage/round" "sync/atomic" "time" ) @@ -95,14 +96,13 @@ func scheduler(params Params, state *storage.NetworkState, killchan chan chan st } lastRound = time.Now() - err = startRound(newRound, state, roundTracker) + ourRound, err := startRound(newRound, state, roundTracker) if err != nil { break } - go func(roundID id.Round) { + go func(roundID id.Round, localRound *round.State) { // Allow for round the to be added to the map - ourRound := state.GetRoundMap().GetRound(roundID) roundTimer := time.NewTimer(params.RoundTimeout * time.Second) select { // Wait for the timer to go off @@ -113,10 +113,11 @@ func scheduler(params Params, state *storage.NetworkState, killchan chan chan st roundTimeoutTracker <- roundID // Signals the round has been completed. // In this case, we can exit the go-routine - case <-ourRound.GetRoundCompletedChan(): + case <-localRound.GetRoundCompletedChan(): + state.GetRoundMap().DeleteRound(roundID) return } - }(newRound.ID) + }(newRound.ID, ourRound) } jww.ERROR.Printf("Round creation thread should never exit: %s", err) diff --git a/scheduling/semiOptimalOrdering.go b/scheduling/semiOptimalOrdering.go index 1a9bb2d3f493f2ad66aae9a83b86b255b386292d..85ab3519509b65e6f4462f6f3aa31191d7aa2ef1 100644 --- a/scheduling/semiOptimalOrdering.go +++ b/scheduling/semiOptimalOrdering.go @@ -159,7 +159,7 @@ func getRegion(region string) (int, error) { return WesternEurope, nil case "CentralEurope": return CentralEurope, nil - case "EasternEurope": + case "EasternEurope", "EastEurope": return EasternEurope, nil case "MiddleEast": return MiddleEast, nil diff --git a/scheduling/startRound.go b/scheduling/startRound.go index e49f21c9d408c7ab18008485d4fd52d37edb8306..95cbcd17a8acfdcee9960564e830ed9bbbbf0a9b 100644 --- a/scheduling/startRound.go +++ b/scheduling/startRound.go @@ -10,25 +10,26 @@ import ( "github.com/spf13/jwalterweatherman" "gitlab.com/elixxir/primitives/states" "gitlab.com/elixxir/registration/storage" + "gitlab.com/elixxir/registration/storage/round" "time" ) // startRound is a function which takes the info from createSimpleRound and updates the // node and network states in order to begin the round -func startRound(round protoRound, state *storage.NetworkState, roundTracker *RoundTracker) error { +func startRound(round protoRound, state *storage.NetworkState, roundTracker *RoundTracker) (*round.State, error) { // Add the round to the manager r, err := state.GetRoundMap().AddRound(round.ID, round.BatchSize, round.ResourceQueueTimeout, round.Topology) if err != nil { err = errors.WithMessagef(err, "Failed to create new round %v", round.ID) - return err + return nil, err } // Move the round to precomputing err = r.Update(states.PRECOMPUTING, time.Now()) if err != nil { err = errors.WithMessagef(err, "Could not move new round into %s", states.PRECOMPUTING) - return err + return nil, err } // Tag all nodes to the round @@ -38,7 +39,7 @@ func startRound(round protoRound, state *storage.NetworkState, roundTracker *Rou err := n.SetRound(r) if err != nil { err = errors.WithMessagef(err, "could not add round %v to node %s", r.GetRoundID(), n.GetID()) - return err + return nil, err } } @@ -47,11 +48,11 @@ func startRound(round protoRound, state *storage.NetworkState, roundTracker *Rou if err != nil { err = errors.WithMessagef(err, "Could not issue "+ "update to create round %v", r.GetRoundID()) - return err + return nil, err } // Add round to active set of rounds roundTracker.AddActiveRound(r.GetRoundID()) - return nil + return r, nil } diff --git a/scheduling/startRound_test.go b/scheduling/startRound_test.go index 713d381ab931e83d41da442cdf6267f130ecb92a..38d60d709646f2d46b73f461a6529e0270ddf3db 100644 --- a/scheduling/startRound_test.go +++ b/scheduling/startRound_test.go @@ -68,7 +68,7 @@ func TestStartRound(t *testing.T) { testTracker := NewRoundTracker() - err = startRound(testProtoRound, testState, testTracker) + _, err = startRound(testProtoRound, testState, testTracker) if err != nil { t.Errorf("Received error from startRound(): %v", err) } @@ -134,7 +134,7 @@ func TestStartRound_BadState(t *testing.T) { testTracker := NewRoundTracker() - err = startRound(testProtoRound, testState, testTracker) + _, err = startRound(testProtoRound, testState, testTracker) if err == nil { t.Errorf("Expected error. Artificially created round " + "should make starting precomputing impossible") @@ -199,7 +199,7 @@ func TestStartRound_BadNode(t *testing.T) { testProtoRound.NodeStateList[0].SetRound(badState) testTracker := NewRoundTracker() - err = startRound(testProtoRound, testState, testTracker) + _, err = startRound(testProtoRound, testState, testTracker) if err == nil { t.Log(err) t.Errorf("Expected error. Artificially created round " + diff --git a/storage/round/map.go b/storage/round/map.go index 7016474130d175fa9d099127edde4188ca1ff152..010f2ceae1671e61e9bf980ce4afaeda72595a7a 100644 --- a/storage/round/map.go +++ b/storage/round/map.go @@ -52,6 +52,18 @@ func (rsm *StateMap) GetRound(id id.Round) *State { return rsm.rounds[id] } +// add a schedule to delete timestamp + +// Cleans out rounds from round map. +// ONLY to be used upon round completion +func (rsm *StateMap) DeleteRound(id id.Round) { + // Delete the round from the map + rsm.mux.Lock() + delete(rsm.rounds, id) + rsm.mux.Unlock() + return +} + //adds rounds for testing without checks func (rsm *StateMap) AddRound_Testing(state *State, t *testing.T) { if t == nil { diff --git a/storage/round/map_test.go b/storage/round/map_test.go index 6f36d4c5970ff8fe4a3485d0dc4244d89d81f053..b64d5175f643c207f03d7986f98c9a3f98a02ba9 100644 --- a/storage/round/map_test.go +++ b/storage/round/map_test.go @@ -114,6 +114,24 @@ func TestStateMap_GetNode_invalid(t *testing.T) { } } +// Happy path +func TestStateMap_DeleteRound(t *testing.T) { + sm := &StateMap{ + rounds: make(map[id.Round]*State), + } + rid := id.Round(2) + sm.rounds[rid] = &State{} + + sm.DeleteRound(rid) + + // Check that the round is empty upon deletion + r := sm.GetRound(rid) + if r != nil { + t.Errorf("Round retrieved after deletion from map") + } + +} + func buildMockTopology(numNodes int, t *testing.T) *connect.Circuit { nodeLst := make([]*id.ID, numNodes) for i := 0; i < numNodes; i++ {