Skip to content
Snippets Groups Projects
Commit 853bcdab authored by Jake Taylor's avatar Jake Taylor :lips:
Browse files

Merge branch 'XX-2875/ClientErrMaster' into 'master'

port xx-2875 partially to master

See merge request elixxir/registration!247
parents be5dcc89 3aa5051f
No related branches found
No related tags found
No related merge requests found
...@@ -119,48 +119,43 @@ func (m *RegistrationImpl) Poll(msg *pb.PermissioningPoll, auth *connect.Auth, ...@@ -119,48 +119,43 @@ func (m *RegistrationImpl) Poll(msg *pb.PermissioningPoll, auth *connect.Auth,
return response, err return response, err
} }
// if the node is in not started state, do not produce an update // If round creation stopped OR if the node is in not started state,
if activity == current.NOT_STARTED { // return early before we get the polling lock
stopped := atomic.LoadUint32(m.Stopped) == 1
if activity == current.NOT_STARTED || stopped {
return response, err
}
// Ensure any errors are properly formatted before sending an update
err = verifyError(msg, n, m)
if err != nil {
return response, err return response, err
} }
// Return early before we get the polling lock if round creation stopped
stopped := atomic.LoadUint32(m.Stopped) == 1
if !stopped {
// when a node poll is received, the nodes polling lock is taken here. If // when a node poll is received, the nodes polling lock is taken here. If
// there is no update, it is released in this endpoint, otherwise it is // there is no update, it is released in this endpoint, otherwise it is
// released in the scheduling algorithm which blocks all future polls until // released in the scheduling algorithm which blocks all future polls until
// processing completes // processing completes
n.GetPollingLock().Lock() n.GetPollingLock().Lock()
err = verifyError(msg, n, m) // update does edge checking. It ensures the state change received was a
if err != nil { // valid one and the state of the node and
n.GetPollingLock().Unlock()
return response, err
}
}
// update does edge checking. It ensures the state change recieved was a
// valid one and the state fo the node and
// any associated round allows for that change. If the change was not // any associated round allows for that change. If the change was not
// acceptable, it is not recorded and an error is returned, which is // acceptable, it is not recorded and an error is returned, which is
// propagated to the node // propagated to the node
update, updateNotification, err := n.Update(current.Activity(msg.Activity)) isUpdate, updateNotification, err := n.Update(current.Activity(msg.Activity))
if !stopped { if !isUpdate || err != nil {
//if updating to an error state, attach the error the the update
if update && err == nil && updateNotification.ToActivity == current.ERROR {
updateNotification.Error = msg.Error
}
//if an update ocured, report it to the control thread
if update {
err = m.State.SendUpdateNotification(updateNotification)
} else {
n.GetPollingLock().Unlock() n.GetPollingLock().Unlock()
return response, err
} }
// If updating to an error state, attach the error the the update
if updateNotification.ToActivity == current.ERROR {
updateNotification.Error = msg.Error
} }
return response, err // Update occurred, report it to the control thread
return response, m.State.SendUpdateNotification(updateNotification)
} }
// PollNdf handles the client polling for an updated NDF // PollNdf handles the client polling for an updated NDF
......
...@@ -19,23 +19,23 @@ func Permute(items []*node.State) [][]*node.State { ...@@ -19,23 +19,23 @@ func Permute(items []*node.State) [][]*node.State {
var output [][]*node.State var output [][]*node.State
// Place inline to make appending output easier // Place inline to make appending output easier
helper = func(items []*node.State, n int) { helper = func(items []*node.State, numItems int) {
if n == 1 { if numItems == 1 {
// Create a copy and append the copy to the output // Create a copy and append the copy to the output
ourCopy := make([]*node.State, len(items)) ourCopy := make([]*node.State, len(items))
copy(ourCopy, items) copy(ourCopy, items)
output = append(output, ourCopy) output = append(output, ourCopy)
} else { } else {
for i := 0; i < n; i++ { for i := 0; i < numItems; i++ {
helper(items, n-1) helper(items, numItems-1)
// Swap choice dependent on parity of k (even or odd) // Swap choice dependent on parity of k (even or odd)
if n%2 == 1 { if numItems%2 == 1 {
// Swap the values // Swap the values
items[i], items[n-1] = items[n-1], items[i] items[i], items[numItems-1] = items[numItems-1], items[i]
} else { } else {
// Swap the values // Swap the values
items[0], items[n-1] = items[n-1], items[0] items[0], items[numItems-1] = items[numItems-1], items[0]
} }
} }
......
...@@ -17,6 +17,7 @@ import ( ...@@ -17,6 +17,7 @@ import (
"gitlab.com/elixxir/primitives/states" "gitlab.com/elixxir/primitives/states"
"gitlab.com/elixxir/registration/storage" "gitlab.com/elixxir/registration/storage"
"gitlab.com/elixxir/registration/storage/node" "gitlab.com/elixxir/registration/storage/node"
"gitlab.com/elixxir/registration/storage/round"
"sync/atomic" "sync/atomic"
"time" "time"
) )
...@@ -95,14 +96,13 @@ func scheduler(params Params, state *storage.NetworkState, killchan chan chan st ...@@ -95,14 +96,13 @@ func scheduler(params Params, state *storage.NetworkState, killchan chan chan st
} }
lastRound = time.Now() lastRound = time.Now()
err = startRound(newRound, state, roundTracker) ourRound, err := startRound(newRound, state, roundTracker)
if err != nil { if err != nil {
break break
} }
go func(roundID id.Round) { go func(roundID id.Round, localRound *round.State) {
// Allow for round the to be added to the map // Allow for round the to be added to the map
ourRound := state.GetRoundMap().GetRound(roundID)
roundTimer := time.NewTimer(params.RoundTimeout * time.Second) roundTimer := time.NewTimer(params.RoundTimeout * time.Second)
select { select {
// Wait for the timer to go off // Wait for the timer to go off
...@@ -113,10 +113,11 @@ func scheduler(params Params, state *storage.NetworkState, killchan chan chan st ...@@ -113,10 +113,11 @@ func scheduler(params Params, state *storage.NetworkState, killchan chan chan st
roundTimeoutTracker <- roundID roundTimeoutTracker <- roundID
// Signals the round has been completed. // Signals the round has been completed.
// In this case, we can exit the go-routine // In this case, we can exit the go-routine
case <-ourRound.GetRoundCompletedChan(): case <-localRound.GetRoundCompletedChan():
state.GetRoundMap().DeleteRound(roundID)
return return
} }
}(newRound.ID) }(newRound.ID, ourRound)
} }
jww.ERROR.Printf("Round creation thread should never exit: %s", err) jww.ERROR.Printf("Round creation thread should never exit: %s", err)
......
...@@ -159,7 +159,7 @@ func getRegion(region string) (int, error) { ...@@ -159,7 +159,7 @@ func getRegion(region string) (int, error) {
return WesternEurope, nil return WesternEurope, nil
case "CentralEurope": case "CentralEurope":
return CentralEurope, nil return CentralEurope, nil
case "EasternEurope": case "EasternEurope", "EastEurope":
return EasternEurope, nil return EasternEurope, nil
case "MiddleEast": case "MiddleEast":
return MiddleEast, nil return MiddleEast, nil
......
...@@ -10,25 +10,26 @@ import ( ...@@ -10,25 +10,26 @@ import (
"github.com/spf13/jwalterweatherman" "github.com/spf13/jwalterweatherman"
"gitlab.com/elixxir/primitives/states" "gitlab.com/elixxir/primitives/states"
"gitlab.com/elixxir/registration/storage" "gitlab.com/elixxir/registration/storage"
"gitlab.com/elixxir/registration/storage/round"
"time" "time"
) )
// startRound is a function which takes the info from createSimpleRound and updates the // startRound is a function which takes the info from createSimpleRound and updates the
// node and network states in order to begin the round // node and network states in order to begin the round
func startRound(round protoRound, state *storage.NetworkState, roundTracker *RoundTracker) error { func startRound(round protoRound, state *storage.NetworkState, roundTracker *RoundTracker) (*round.State, error) {
// Add the round to the manager // Add the round to the manager
r, err := state.GetRoundMap().AddRound(round.ID, round.BatchSize, round.ResourceQueueTimeout, r, err := state.GetRoundMap().AddRound(round.ID, round.BatchSize, round.ResourceQueueTimeout,
round.Topology) round.Topology)
if err != nil { if err != nil {
err = errors.WithMessagef(err, "Failed to create new round %v", round.ID) err = errors.WithMessagef(err, "Failed to create new round %v", round.ID)
return err return nil, err
} }
// Move the round to precomputing // Move the round to precomputing
err = r.Update(states.PRECOMPUTING, time.Now()) err = r.Update(states.PRECOMPUTING, time.Now())
if err != nil { if err != nil {
err = errors.WithMessagef(err, "Could not move new round into %s", states.PRECOMPUTING) err = errors.WithMessagef(err, "Could not move new round into %s", states.PRECOMPUTING)
return err return nil, err
} }
// Tag all nodes to the round // Tag all nodes to the round
...@@ -38,7 +39,7 @@ func startRound(round protoRound, state *storage.NetworkState, roundTracker *Rou ...@@ -38,7 +39,7 @@ func startRound(round protoRound, state *storage.NetworkState, roundTracker *Rou
err := n.SetRound(r) err := n.SetRound(r)
if err != nil { if err != nil {
err = errors.WithMessagef(err, "could not add round %v to node %s", r.GetRoundID(), n.GetID()) err = errors.WithMessagef(err, "could not add round %v to node %s", r.GetRoundID(), n.GetID())
return err return nil, err
} }
} }
...@@ -47,11 +48,11 @@ func startRound(round protoRound, state *storage.NetworkState, roundTracker *Rou ...@@ -47,11 +48,11 @@ func startRound(round protoRound, state *storage.NetworkState, roundTracker *Rou
if err != nil { if err != nil {
err = errors.WithMessagef(err, "Could not issue "+ err = errors.WithMessagef(err, "Could not issue "+
"update to create round %v", r.GetRoundID()) "update to create round %v", r.GetRoundID())
return err return nil, err
} }
// Add round to active set of rounds // Add round to active set of rounds
roundTracker.AddActiveRound(r.GetRoundID()) roundTracker.AddActiveRound(r.GetRoundID())
return nil return r, nil
} }
...@@ -68,7 +68,7 @@ func TestStartRound(t *testing.T) { ...@@ -68,7 +68,7 @@ func TestStartRound(t *testing.T) {
testTracker := NewRoundTracker() testTracker := NewRoundTracker()
err = startRound(testProtoRound, testState, testTracker) _, err = startRound(testProtoRound, testState, testTracker)
if err != nil { if err != nil {
t.Errorf("Received error from startRound(): %v", err) t.Errorf("Received error from startRound(): %v", err)
} }
...@@ -134,7 +134,7 @@ func TestStartRound_BadState(t *testing.T) { ...@@ -134,7 +134,7 @@ func TestStartRound_BadState(t *testing.T) {
testTracker := NewRoundTracker() testTracker := NewRoundTracker()
err = startRound(testProtoRound, testState, testTracker) _, err = startRound(testProtoRound, testState, testTracker)
if err == nil { if err == nil {
t.Errorf("Expected error. Artificially created round " + t.Errorf("Expected error. Artificially created round " +
"should make starting precomputing impossible") "should make starting precomputing impossible")
...@@ -199,7 +199,7 @@ func TestStartRound_BadNode(t *testing.T) { ...@@ -199,7 +199,7 @@ func TestStartRound_BadNode(t *testing.T) {
testProtoRound.NodeStateList[0].SetRound(badState) testProtoRound.NodeStateList[0].SetRound(badState)
testTracker := NewRoundTracker() testTracker := NewRoundTracker()
err = startRound(testProtoRound, testState, testTracker) _, err = startRound(testProtoRound, testState, testTracker)
if err == nil { if err == nil {
t.Log(err) t.Log(err)
t.Errorf("Expected error. Artificially created round " + t.Errorf("Expected error. Artificially created round " +
......
...@@ -52,6 +52,18 @@ func (rsm *StateMap) GetRound(id id.Round) *State { ...@@ -52,6 +52,18 @@ func (rsm *StateMap) GetRound(id id.Round) *State {
return rsm.rounds[id] return rsm.rounds[id]
} }
// add a schedule to delete timestamp
// Cleans out rounds from round map.
// ONLY to be used upon round completion
func (rsm *StateMap) DeleteRound(id id.Round) {
// Delete the round from the map
rsm.mux.Lock()
delete(rsm.rounds, id)
rsm.mux.Unlock()
return
}
//adds rounds for testing without checks //adds rounds for testing without checks
func (rsm *StateMap) AddRound_Testing(state *State, t *testing.T) { func (rsm *StateMap) AddRound_Testing(state *State, t *testing.T) {
if t == nil { if t == nil {
......
...@@ -114,6 +114,24 @@ func TestStateMap_GetNode_invalid(t *testing.T) { ...@@ -114,6 +114,24 @@ func TestStateMap_GetNode_invalid(t *testing.T) {
} }
} }
// Happy path
func TestStateMap_DeleteRound(t *testing.T) {
sm := &StateMap{
rounds: make(map[id.Round]*State),
}
rid := id.Round(2)
sm.rounds[rid] = &State{}
sm.DeleteRound(rid)
// Check that the round is empty upon deletion
r := sm.GetRound(rid)
if r != nil {
t.Errorf("Round retrieved after deletion from map")
}
}
func buildMockTopology(numNodes int, t *testing.T) *connect.Circuit { func buildMockTopology(numNodes int, t *testing.T) *connect.Circuit {
nodeLst := make([]*id.ID, numNodes) nodeLst := make([]*id.ID, numNodes)
for i := 0; i < numNodes; i++ { for i := 0; i < numNodes; i++ {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment