diff --git a/cmd/impl.go b/cmd/impl.go index cb772f88817df37cd87e5263b36fe6bf33adbc34..b6db6dc8f47eb55eadddbcccea15d94ebefe9917 100644 --- a/cmd/impl.go +++ b/cmd/impl.go @@ -281,134 +281,45 @@ func NewImplementation(instance *RegistrationImpl) *registration.Implementation impl.Functions.RegisterUser = func( registrationCode, pubKey string) (signature []byte, err error) { - result := make(chan bool) - - var response []byte - - go func() { - defer func() { - if r := recover(); r != nil { - err = errors.Errorf("Register User crash recovered: %+v", r) - jww.ERROR.Printf("Register User crash recovered: %+v", r) - result <- true - } - }() - - response, err = instance.RegisterUser(registrationCode, pubKey) - if err != nil { - jww.ERROR.Printf("RegisterUser error: %+v", err) - } - result <- true - }() - - <-result - + response, err := instance.RegisterUser(registrationCode, pubKey) + if err != nil { + jww.ERROR.Printf("RegisterUser error: %+v", err) + } return response, err } impl.Functions.GetCurrentClientVersion = func() (version string, err error) { - result := make(chan bool) - - var response string - - go func() { - defer func() { - if r := recover(); r != nil { - err = errors.Errorf("GetCurrentClientVersion crash recovered: %+v", r) - jww.ERROR.Printf("GetCurrentClientVersion crash recovered: %+v", r) - result <- true - } - }() - - response, err = instance.GetCurrentClientVersion() - if err != nil { - jww.ERROR.Printf("GetCurrentClientVersion error: %+v", err) - } - result <- true - }() - - <-result + response, err := instance.GetCurrentClientVersion() + if err != nil { + jww.ERROR.Printf("GetCurrentClientVersion error: %+v", err) + } return response, err } impl.Functions.RegisterNode = func(ID *id.ID, ServerAddr, ServerTlsCert, GatewayAddr, GatewayTlsCert, RegistrationCode string) error { - result := make(chan bool) - var err error - - go func() { - defer func() { - if r := recover(); r != nil { - err = errors.Errorf("RegisterNode crash recovered: %+v", r) - jww.ERROR.Printf("RegisterNode crash recovered: %+v", r) - result <- true - } - }() - - err = instance.RegisterNode(ID, ServerAddr, - ServerTlsCert, GatewayAddr, GatewayTlsCert, RegistrationCode) - if err != nil { - jww.ERROR.Printf("RegisterNode error: %+v", err) - } - result <- true - }() - - <-result + err := instance.RegisterNode(ID, ServerAddr, + ServerTlsCert, GatewayAddr, GatewayTlsCert, RegistrationCode) + if err != nil { + jww.ERROR.Printf("RegisterNode error: %+v", err) + } return err } impl.Functions.PollNdf = func(theirNdfHash []byte, auth *connect.Auth) ([]byte, error) { - result := make(chan bool) - var err error - var response []byte - - go func() { - defer func() { - if r := recover(); r != nil { - err = errors.Errorf("PollNdf crash recovered: %+v", r) - jww.ERROR.Printf("PollNdf crash recovered: %+v", r) - result <- true - } - }() - - response, err = instance.PollNdf(theirNdfHash, auth) - if err != nil && err.Error() != ndf.NO_NDF { - jww.ERROR.Printf("PollNdf error: %+v", err) - } - result <- true - }() - - <-result + response, err := instance.PollNdf(theirNdfHash, auth) + if err != nil && err.Error() != ndf.NO_NDF { + jww.ERROR.Printf("PollNdf error: %+v", err) + } return response, err } impl.Functions.Poll = func(msg *pb.PermissioningPoll, auth *connect.Auth, serverAddress string) (*pb.PermissionPollResponse, error) { //ensure a bad poll can not take down the permisisoning server - result := make(chan bool) - - response := &pb.PermissionPollResponse{} - var err error - - go func() { - defer func() { - if r := recover(); r != nil { - err = errors.Errorf("Unified Poll crash recovered: %+v", r) - jww.ERROR.Printf("Unified Poll crash recovered: %+v", r) - result <- true - } - }() - - response, err = instance.Poll(msg, auth, serverAddress) - if err != nil && err.Error() != ndf.NO_NDF { - jww.ERROR.Printf("Unified Poll error: %+v", err) - } - result <- true - }() - - <-result + response, err := instance.Poll(msg, auth, serverAddress) return response, err } @@ -416,48 +327,13 @@ func NewImplementation(instance *RegistrationImpl) *registration.Implementation // This comm is not authenticated as servers call this early in their //lifecycle to check if they've already registered impl.Functions.CheckRegistration = func(msg *pb.RegisteredNodeCheck) (confirmation *pb.RegisteredNodeConfirmation, e error) { - result := make(chan bool) - - var response bool - var err error - - go func() { - defer func() { - if r := recover(); r != nil { - err = errors.Errorf("Check Node Registration crash recovered: %+v", r) - jww.ERROR.Printf("Check Node Registration crash recovered: %+v", r) - result <- true - } - }() - - response = instance.CheckNodeRegistration(msg.RegCode) - result <- true - }() - - <-result + response := instance.CheckNodeRegistration(msg.RegCode) // Returning any errors, such as database errors, would result in too much // leaked data for a public call. - return &pb.RegisteredNodeConfirmation{IsRegistered: response}, err + return &pb.RegisteredNodeConfirmation{IsRegistered: response}, nil } return impl } - -func recoverable(f func() error, source string) error { - result := make(chan bool) - var err error - go func() { - defer func() { - if r := recover(); r != nil { - err = errors.Errorf("crash recovered: %+v, %+v", source, r) - result <- true - } - }() - err = f() - result <- true - }() - <-result - return err -} diff --git a/cmd/impl_test.go b/cmd/impl_test.go deleted file mode 100644 index faeb7f979b47e7a90ca71d6e9fafda0845fc6f5b..0000000000000000000000000000000000000000 --- a/cmd/impl_test.go +++ /dev/null @@ -1,34 +0,0 @@ -package cmd - -import ( - "github.com/pkg/errors" - "strings" - "testing" -) - -func TestRecoverable(t *testing.T) { - panicfunc := func() error { - panic("Failed") - } - normfunc := func() error { - return errors.New("Error message two") - } - - err1 := recoverable(panicfunc, "test str one") - - if err1 == nil { - t.Error("Recovery did not succeed") - } - if !strings.Contains(err1.Error(), "test str one") { - t.Error("Did not return proper error") - } - - err2 := recoverable(normfunc, "test str") - - if err2 == nil { - t.Error("Recovery did not succeed second time") - } - if !strings.Contains(err2.Error(), "Error message two") { - t.Error("Did not receive correct error message") - } -} diff --git a/cmd/permissioning.go b/cmd/permissioning.go index 62e73614014f0d24cb51a46206e3cb47deb40e7c..57bf9f98de57b149a3ce0740f91e5063ff588320 100644 --- a/cmd/permissioning.go +++ b/cmd/permissioning.go @@ -156,12 +156,13 @@ func (m *RegistrationImpl) completeNodeRegistration(regCode string) error { m.numRegistered++ - jww.INFO.Printf("Registered %d node(s)! %s", m.numRegistered, regCode) + jww.INFO.Printf("Registered %d node(s)!", m.numRegistered) // Add the new node to the topology m.NDFLock.Lock() networkDef := m.State.GetFullNdf().Get() - gateway, node, order, err := assembleNdf(regCode) + gateway, n, order, err := assembleNdf(regCode) + if err != nil { m.NDFLock.Unlock() err := errors.Errorf("unable to assemble topology: %+v", err) @@ -177,10 +178,10 @@ func (m *RegistrationImpl) completeNodeRegistration(regCode string) error { } networkDef.Gateways[order] = gateway - networkDef.Nodes[order] = node + networkDef.Nodes[order] = n } else { networkDef.Gateways = append(networkDef.Gateways, gateway) - networkDef.Nodes = append(networkDef.Nodes, node) + networkDef.Nodes = append(networkDef.Nodes, n) } // update the internal state with the newly-updated ndf @@ -244,12 +245,14 @@ func assembleNdf(code string) (ndf.Gateway, ndf.Node, int, error) { return ndf.Gateway{}, ndf.Node{}, 0, errors.Errorf("Error parsing node ID: %v", err) } - node := ndf.Node{ + n := ndf.Node{ ID: nodeID.Bytes(), Address: nodeInfo.ServerAddress, TlsCertificate: nodeInfo.NodeCertificate, } + jww.INFO.Printf("Node %s (AppID: %d) registed with code %s", nodeID, nodeInfo.ApplicationId, code) + gwID := nodeID.DeepCopy() gwID.SetType(id.Gateway) gateway := ndf.Gateway{ @@ -260,10 +263,10 @@ func assembleNdf(code string) (ndf.Gateway, ndf.Node, int, error) { order, err := strconv.Atoi(nodeInfo.Sequence) if err != nil { - return gateway, node, -1, nil + return gateway, n, -1, nil } - return gateway, node, order, nil + return gateway, n, order, nil } // outputNodeTopologyToJSON encodes the NodeTopology structure to JSON and diff --git a/cmd/poll.go b/cmd/poll.go index 4c69df2852701e519ba458095464018b6a0dd3ab..a4892bbf0f3799d0f6f14603366c422611a8ebf3 100644 --- a/cmd/poll.go +++ b/cmd/poll.go @@ -32,6 +32,12 @@ func (m *RegistrationImpl) Poll(msg *pb.PermissioningPoll, auth *connect.Auth, // Initialize the response response := &pb.PermissionPollResponse{} + //do edge check to ensure the message is not nil + if msg == nil { + return nil, errors.Errorf("Message payload for unified poll " + + "is nil, poll cannot be processed") + } + // Ensure the NDF is ready to be returned regComplete := atomic.LoadUint32(m.NdfReady) if regComplete != 1 { diff --git a/cmd/poll_test.go b/cmd/poll_test.go index 841e793fb75249035a32a465acc4e52e612bc63b..76ac1e76152794044e19cd10d4fcff189bdb0fa5 100644 --- a/cmd/poll_test.go +++ b/cmd/poll_test.go @@ -148,7 +148,9 @@ func TestRegistrationImpl_PollNoNdf(t *testing.T) { }, } - _, err = impl.Poll(nil, nil, "") + dummyMessage := &pb.PermissioningPoll{} + + _, err = impl.Poll(dummyMessage, nil, "") if err == nil || err.Error() != ndf.NO_NDF { t.Errorf("Unexpected error polling: %+v", err) } @@ -189,7 +191,9 @@ func TestRegistrationImpl_PollFailAuth(t *testing.T) { Sender: testHost, } - _, err = impl.Poll(nil, testAuth, "0.0.0.0") + dummyMessage := &pb.PermissioningPoll{} + + _, err = impl.Poll(dummyMessage, testAuth, "0.0.0.0") if err == nil || err.Error() != connect.AuthError(testAuth.Sender.GetId()).Error() { t.Errorf("Unexpected error polling: %+v", err) } diff --git a/cmd/version.go b/cmd/version.go index b12cf1bd475a7a6651e703007923b6e9547c0506..46cdb27c0b366ae2f6f5f5f64f845eea6250ab06 100644 --- a/cmd/version.go +++ b/cmd/version.go @@ -15,7 +15,7 @@ import ( ) // Change this value to set the version for this build -const currentVersion = "1.2.3" +const currentVersion = "1.2.4" func printVersion() { fmt.Printf("xx network Permissioning Server v%s -- %s\n\n", diff --git a/cmd/version_vars.go b/cmd/version_vars.go index 866baa2f174a2bac3829ee1dd41c9adb62aa8d71..16b11982eed26b243e3932d6f622eba48d74d576 100644 --- a/cmd/version_vars.go +++ b/cmd/version_vars.go @@ -1,10 +1,10 @@ // Code generated by go generate; DO NOT EDIT. // This file was generated by robots at -// 2020-06-24 12:33:38.2042 -0700 PDT m=+0.019241721 +// 2020-06-25 16:47:50.2117064 -0700 PDT m=+0.210670301 package cmd -const GITVERSION = `49d5bcf Fix broken release` -const SEMVER = "1.2.3" +const GITVERSION = `79362eb Merge branch 'hotfix/slowdown' into 'release'` +const SEMVER = "1.2.4" const DEPENDENCIES = `module gitlab.com/elixxir/registration go 1.13 diff --git a/scheduling/nodeStateChange.go b/scheduling/nodeStateChange.go index 342b3bbab5e21293578fc41873610821ca69e670..842ecbafafee248e5e45da3e17c91ba33f533a9c 100644 --- a/scheduling/nodeStateChange.go +++ b/scheduling/nodeStateChange.go @@ -51,7 +51,8 @@ func HandleNodeUpdates(update node.UpdateNotification, pool *waitingPool, if err != nil { jww.FATAL.Panicf("Failed to sign error message for banned node %s: %+v", update.Node, err) } - return false, killRound(state, r, n, banError) + n.ClearRound() + return false, killRound(state, r, banError) } else { pool.Ban(n) return false, nil @@ -166,7 +167,7 @@ func HandleNodeUpdates(update node.UpdateNotification, pool *waitingPool, } //send the signal that the round is complete - r.GetRoundCompletedChan() <- struct{}{} + r.DenoteRoundCompleted() // Commit metrics about the round to storage return true, StoreRoundMetric(roundInfo) @@ -176,8 +177,9 @@ func HandleNodeUpdates(update node.UpdateNotification, pool *waitingPool, var err error if hasRound { //send the signal that the round is complete - r.GetRoundCompletedChan() <- struct{}{} - err = killRound(state, r, n, update.Error) + r.DenoteRoundCompleted() + n.ClearRound() + err = killRound(state, r, update.Error) } return false, err } @@ -206,11 +208,10 @@ func StoreRoundMetric(roundInfo *pb.RoundInfo) error { } // killRound sets the round to failed and clears the node's round -func killRound(state *storage.NetworkState, r *round.State, n *node.State, roundError *pb.RoundError) error { +func killRound(state *storage.NetworkState, r *round.State, roundError *pb.RoundError) error { r.AppendError(roundError) _ = r.Update(states.FAILED, time.Now()) - n.ClearRound() roundId := r.GetRoundID() // Build the round info and update the network state diff --git a/scheduling/nodeStateChange_test.go b/scheduling/nodeStateChange_test.go index 255e5ccfbe57796162b02d32eab827d773c61a8a..dbcf4df366afba66b122cc9178a00b1b791435e2 100644 --- a/scheduling/nodeStateChange_test.go +++ b/scheduling/nodeStateChange_test.go @@ -543,15 +543,13 @@ func TestKillRound(t *testing.T) { r := round.NewState_Testing(42, 0, t) - ns := testState.GetNodeMap().GetNode(nodeList[0]) - re := &mixmessages.RoundError{ Id: 0, NodeId: nil, Error: "test", } - // - err = killRound(testState, r, ns, re) + + err = killRound(testState, r, re) if err != nil { t.Errorf("Unexpected error in happy path: %v", err) } diff --git a/scheduling/schedule.go b/scheduling/schedule.go index 2f6341ccb8ec441796812532fbc25fcdfe748195..ee01913f0d4d52318ca04e0da7116e156bf6eb8a 100644 --- a/scheduling/schedule.go +++ b/scheduling/schedule.go @@ -83,10 +83,11 @@ func scheduler(params Params, state *storage.NetworkState, killchan chan chan st go func() { lastRound := time.Now() var err error + minRoundDelay := params.MinimumDelay * time.Millisecond for newRound := range newRoundChan { // To avoid back-to-back teaming, we make sure to sleep until the minimum delay - if timeDiff := time.Now().Sub(lastRound); timeDiff < params.MinimumDelay*time.Millisecond { - time.Sleep(timeDiff) + if timeDiff := time.Now().Sub(lastRound); timeDiff < minRoundDelay { + time.Sleep(minRoundDelay - timeDiff) } lastRound = time.Now() @@ -223,20 +224,10 @@ func timeoutRound(state *storage.NetworkState, timeoutRoundID id.Round) error { "timed out round %d: %+v", ourRound.GetRoundID(), err) } - // Parse the circuit, killing the round for each node - roundCircuit := ourRound.GetTopology() - for i := 0; i < roundCircuit.Len(); i++ { - // Get the node from the nodeMap - nid := roundCircuit.GetNodeAtIndex(i) - n := state.GetNodeMap().GetNode(nid) - - // Kill the round for this node - err = killRound(state, ourRound, n, timeoutError) - if err != nil { - return errors.WithMessagef(err, "Failed to kill round for node [%v]", nid) - } - - jww.DEBUG.Printf("Round [%d] killed due to timeout", ourRound.GetRoundID()) + err = killRound(state, ourRound, timeoutError) + if err != nil { + return errors.WithMessagef(err, "Failed to kill round %d: %s", + ourRound.GetRoundID(), err) } } diff --git a/scheduling/startRound.go b/scheduling/startRound.go index dcc56f1833a6c3b5ab4d13a61441ec6416f05ca4..ba46e0e0bbb23c1924221a325461236e987de6e0 100644 --- a/scheduling/startRound.go +++ b/scheduling/startRound.go @@ -31,14 +31,6 @@ func startRound(round protoRound, state *storage.NetworkState) error { return err } - // Issue the update to the network state - err = state.AddRoundUpdate(r.BuildRoundInfo()) - if err != nil { - err = errors.WithMessagef(err, "Could not issue "+ - "update to create round %v", r.GetRoundID()) - return err - } - // Tag all nodes to the round for i, n := range round.NodeStateList { jwalterweatherman.TRACE.Printf("Node %v is (%d)/(%d) of round", @@ -50,5 +42,13 @@ func startRound(round protoRound, state *storage.NetworkState) error { } } + // Issue the update to the network state + err = state.AddRoundUpdate(r.BuildRoundInfo()) + if err != nil { + err = errors.WithMessagef(err, "Could not issue "+ + "update to create round %v", r.GetRoundID()) + return err + } + return nil } diff --git a/storage/round/map.go b/storage/round/map.go index 1a03c7d9cd358478842b07b9266e7247d7445ffa..ab8e2118da111330f129fea943257655222613a5 100644 --- a/storage/round/map.go +++ b/storage/round/map.go @@ -48,7 +48,7 @@ func (rsm *StateMap) AddRound(id id.Round, batchsize uint32, resourceQueueTimeou // Gets rounds from the state structure func (rsm *StateMap) GetRound(id id.Round) *State { rsm.mux.RLock() - rsm.mux.RUnlock() + defer rsm.mux.RUnlock() return rsm.rounds[id] } diff --git a/storage/round/state.go b/storage/round/state.go index 5c79a7eb94747950853cb6b27dbe9e761f905357..aff35384d1e447ecc32d7836a35b6990bdb2071c 100644 --- a/storage/round/state.go +++ b/storage/round/state.go @@ -54,7 +54,7 @@ func newState(id id.Round, batchsize uint32, resourceQueueTimeout time.Duration, timestamps := make([]uint64, states.NUM_STATES) timestamps[states.PENDING] = uint64(pendingTs.Unix()) - roundCompleteChan := make(chan struct{}, topology.Len()*topology.Len()+1) + roundCompleteChan := make(chan struct{}, 1) //build and return the round state object return &State{ @@ -121,10 +121,6 @@ func (s *State) Update(state states.Round, stamp time.Time) error { "greater state") } - if state == states.FAILED || state == states.COMPLETED { - s.roundComplete <- struct{}{} - } - s.state = state s.base.Timestamps[state] = uint64(stamp.UnixNano()) return nil @@ -195,7 +191,16 @@ func (s *State) AppendError(roundError *pb.RoundError) { s.roundErrors = append(s.roundErrors, roundError) } -//returns the id of the round -func (s *State) GetRoundCompletedChan() chan struct{} { +//returns the channel used to stop the round timeout +func (s *State) GetRoundCompletedChan() <-chan struct{} { return s.roundComplete } + +// sends on the round complete channel to the timeout thread notifying it +// that the round has completed +func (s *State) DenoteRoundCompleted() { + select { + case s.roundComplete <- struct{}{}: + default: + } +}