From b38c97505d5e2e5025a520a40b5e16c33845b773 Mon Sep 17 00:00:00 2001 From: "Richard T. Carback III" <rick.carback@gmail.com> Date: Tue, 29 Mar 2022 16:01:25 +0000 Subject: [PATCH] add tests for historical rounds lookup code --- network/historical/historical.go | 131 +++++++++++--------- network/historical/historical_test.go | 164 ++++++++++++++++++++++++++ 2 files changed, 241 insertions(+), 54 deletions(-) create mode 100644 network/historical/historical_test.go diff --git a/network/historical/historical.go b/network/historical/historical.go index 25688c109..884b33778 100644 --- a/network/historical/historical.go +++ b/network/historical/historical.go @@ -9,6 +9,8 @@ package historical import ( "fmt" + "time" + "github.com/pkg/errors" jww "github.com/spf13/jwalterweatherman" "gitlab.com/elixxir/client/event" @@ -17,7 +19,6 @@ import ( pb "gitlab.com/elixxir/comms/mixmessages" "gitlab.com/xx_network/comms/connect" "gitlab.com/xx_network/primitives/id" - "time" ) // Historical Rounds looks up the round history via random gateways. @@ -103,7 +104,8 @@ func (m *manager) StartProcessies() *stoppable.Single { // processHistoricalRounds is a long running thread which // process historical rounds. Can be killed by sending // a signal to the quit channel takes a comms interface to aid in testing -func (m *manager) processHistoricalRounds(comm RoundsComms, stop *stoppable.Single) { +func (m *manager) processHistoricalRounds(comm RoundsComms, + stop *stoppable.Single) { timerCh := make(<-chan time.Time) @@ -115,9 +117,11 @@ func (m *manager) processHistoricalRounds(comm RoundsComms, stop *stoppable.Sing // wait for a quit or new round to check select { case <-stop.Quit(): - // return all roundRequests in the queue to the input channel so they can - // be checked in the future. If the queue is full, disable them as - // processing so they are picked up from the beginning + // return all roundRequests in the queue to + // the input channel so they can be checked in + // the future. If the queue is full, disable + // them as processing so they are picked up + // from the beginning for _, r := range roundRequests { select { case m.c <- r: @@ -126,7 +130,8 @@ func (m *manager) processHistoricalRounds(comm RoundsComms, stop *stoppable.Sing } stop.ToStopped() return - // if the timer elapses process roundRequests to ensure the delay isn't too long + // if the timer elapses process roundRequests to + // ensure the delay isn't too long case <-timerCh: if len(roundRequests) > 0 { shouldProcess = true @@ -158,66 +163,84 @@ func (m *manager) processHistoricalRounds(comm RoundsComms, stop *stoppable.Sing } var gwHost *connect.Host - result, err := m.sender.SendToAny(func(host *connect.Host) (interface{}, error) { - jww.DEBUG.Printf("Requesting Historical rounds %v from "+ - "gateway %s", rounds, host.GetId()) - gwHost = host - return comm.RequestHistoricalRounds(host, hr) - }, stop) + result, err := m.sender.SendToAny( + func(host *connect.Host) (interface{}, error) { + jww.DEBUG.Printf("Requesting Historical "+ + "rounds %v from gateway %s", rounds, + host.GetId()) + gwHost = host + return comm.RequestHistoricalRounds(host, hr) + }, stop) if err != nil { - jww.ERROR.Printf("Failed to request historical roundRequests "+ - "data for rounds %v: %s", rounds, err) - // if the check fails to resolve, break the loop and so they will be - // checked again + jww.ERROR.Printf("Failed to request historical "+ + "roundRequests data for rounds %v: %s", + rounds, err) + // if the check fails to resolve, break the + // loop and so they will be checked again timerCh = time.NewTimer(m.params.HistoricalRoundsPeriod).C continue } response := result.(*pb.HistoricalRoundsResponse) - rids := make([]uint64, 0) - // process the returned historical roundRequests. - for i, roundInfo := range response.Rounds { - // The interface has missing returns returned as nil, such roundRequests - // need be be removes as processing so the network follower will - // pick them up in the future. - if roundInfo == nil { - var errMsg string - roundRequests[i].numAttempts++ - if roundRequests[i].numAttempts == m.params.MaxHistoricalRoundsRetries { - errMsg = fmt.Sprintf("Failed to retreive historical "+ - "round %d on last attempt, will not try again", - roundRequests[i].rid) - go roundRequests[i].RoundResultCallback(nil, false) - } else { - select { - case m.c <- roundRequests[i]: - errMsg = fmt.Sprintf("Failed to retreive historical "+ - "round %d, will try up to %d more times", - roundRequests[i].rid, m.params.MaxHistoricalRoundsRetries-roundRequests[i].numAttempts) - default: - errMsg = fmt.Sprintf("Failed to retreive historical "+ - "round %d, failed to try again, round will not be "+ - "retreived", roundRequests[i].rid) - } - } - jww.WARN.Printf(errMsg) - m.events.Report(5, "HistoricalRounds", - "Error", errMsg) - continue - } - // Successfully retrieved roundRequests are returned on the callback - go roundRequests[i].RoundResultCallback(roundInfo, true) - - rids = append(rids, roundInfo.ID) - } + rids, retries := processHistoricalRoundsResponse(response, + roundRequests, m.params.MaxHistoricalRoundsRetries, + m.events) m.events.Report(1, "HistoricalRounds", "Metrics", fmt.Sprintf("Received %d historical rounds from"+ " gateway %s: %v", len(response.Rounds), gwHost, rids)) - //clear the buffer now that all have been checked - roundRequests = make([]roundRequest, 0) + // reset the buffer to those left to retry now that all + // have been checked + roundRequests = retries + // Now reset the timer, this prevents immediate reprocessing + // of the retries, limiting it to the next historical round + // request when buffer is full OR next timer tick + timerCh = time.NewTimer(m.params.HistoricalRoundsPeriod).C } } + +func processHistoricalRoundsResponse(response *pb.HistoricalRoundsResponse, + roundRequests []roundRequest, maxRetries uint, + events event.Manager) ([]uint64, []roundRequest) { + retries := make([]roundRequest, 0) + rids := make([]uint64, 0) + // process the returned historical roundRequests. + for i, roundInfo := range response.Rounds { + // The interface has missing returns returned as nil, + // such roundRequests need be be removes as processing + // so the network follower will pick them up in the + // future. + if roundInfo == nil { + var errMsg string + roundRequests[i].numAttempts++ + if roundRequests[i].numAttempts == maxRetries { + errMsg = fmt.Sprintf("Failed to retrieve "+ + "historical round %d on last attempt,"+ + " will not try again", + roundRequests[i].rid) + go roundRequests[i].RoundResultCallback(nil, + false) + } else { + retries = append(retries, roundRequests[i]) + errMsg = fmt.Sprintf("Failed to retrieve "+ + "historical round %d, will try up to "+ + "%d more times", roundRequests[i].rid, + maxRetries-roundRequests[i].numAttempts) + } + jww.WARN.Printf(errMsg) + events.Report(5, "HistoricalRounds", + "Error", errMsg) + continue + } + // Successfully retrieved roundRequests are returned + // on the callback + go roundRequests[i].RoundResultCallback(roundInfo, true) + + rids = append(rids, roundInfo.ID) + } + + return rids, retries +} diff --git a/network/historical/historical_test.go b/network/historical/historical_test.go new file mode 100644 index 000000000..22fef0d99 --- /dev/null +++ b/network/historical/historical_test.go @@ -0,0 +1,164 @@ +/////////////////////////////////////////////////////////////////////////////// +// Copyright © 2020 xx network SEZC // +// // +// Use of this source code is governed by a license that can be found in the // +// LICENSE file // +/////////////////////////////////////////////////////////////////////////////// + +package historical + +import ( + "testing" + "time" + + "gitlab.com/elixxir/client/network/gateway" + "gitlab.com/elixxir/client/stoppable" + pb "gitlab.com/elixxir/comms/mixmessages" + "gitlab.com/xx_network/comms/connect" + "gitlab.com/xx_network/primitives/id" + "gitlab.com/xx_network/primitives/ndf" +) + +// TestHistoricalRounds provides a smoke test to run through most of the code +// paths for historical round lookup. +func TestHistoricalRounds(t *testing.T) { + params := GetDefaultParams() + params.HistoricalRoundsPeriod = 500 * time.Millisecond + params.MaxHistoricalRounds = 3 + comms := &testRoundsComms{} + sender := &testGWSender{sendCnt: 0} + events := &testEventMgr{} + hMgr := NewRetriever(params, comms, sender, events) + stopper := hMgr.StartProcessies() + + // case 1: Send a round request and wait for timeout for + // processing + hMgr.LookupHistoricalRound(42, func(info *pb.RoundInfo, success bool) { + t.Errorf("first called when it shouldn't") + }) + time.Sleep(501 * time.Millisecond) + + if sender.sendCnt != 1 { + t.Errorf("did not send as expected") + } + + // case 2: make round requests up to m.params.MaxHistoricalRounds + for i := 0; i < 3; i++ { + hMgr.LookupHistoricalRound(id.Round(40+i), + func(info *pb.RoundInfo, success bool) { + t.Errorf("i called when it shouldn't") + }) + } + + time.Sleep(10 * time.Millisecond) + + if sender.sendCnt != 2 { + t.Errorf("unexpected send count: %d != 2", sender.sendCnt) + } + + err := stopper.Close() + if err != nil { + t.Errorf("%+v", err) + } + if stopper.IsRunning() { + t.Errorf("historical rounds routine failed to close") + } +} + +// TestHistoricalRoundsProcessing exercises the +func TestProcessHistoricalRoundsResponse(t *testing.T) { + params := GetDefaultParams() + bad_rr := roundRequest{ + rid: id.Round(41), + RoundResultCallback: func(info *pb.RoundInfo, success bool) { + t.Errorf("bad called when it shouldn't") + }, + numAttempts: params.MaxHistoricalRoundsRetries - 2, + } + expired_rr := roundRequest{ + rid: id.Round(42), + RoundResultCallback: func(info *pb.RoundInfo, success bool) { + if info == nil && !success { + return + } + t.Errorf("expired called with bad params") + }, + numAttempts: params.MaxHistoricalRoundsRetries - 1, + } + x := false + callbackCalled := &x + good_rr := roundRequest{ + rid: id.Round(43), + RoundResultCallback: func(info *pb.RoundInfo, success bool) { + *callbackCalled = true + }, + numAttempts: 0, + } + rrs := []roundRequest{bad_rr, expired_rr, good_rr} + rifs := make([]*pb.RoundInfo, 3) + rifs[0] = nil + rifs[1] = nil + rifs[2] = &pb.RoundInfo{ID: 43} + response := &pb.HistoricalRoundsResponse{ + Rounds: rifs, + } + events := &testEventMgr{} + + rids, retries := processHistoricalRoundsResponse(response, rrs, + params.MaxHistoricalRoundsRetries, events) + + if len(rids) != 1 || rids[0] != 43 { + t.Errorf("bad return: %v, expected [43]", rids) + } + + // Note: 1 of the entries was expired, thats why this is not 2. + if len(retries) != 1 { + t.Errorf("retries not right length: %d != 1", len(retries)) + } + + time.Sleep(5 * time.Millisecond) + + if !*callbackCalled { + t.Errorf("expected callback to be called") + } +} + +// Test structure implementations +type testRoundsComms struct{} + +func (t *testRoundsComms) GetHost(hostId *id.ID) (*connect.Host, bool) { + return nil, false +} +func (t *testRoundsComms) RequestHistoricalRounds(host *connect.Host, + message *pb.HistoricalRounds) (*pb.HistoricalRoundsResponse, error) { + return nil, nil +} + +type testGWSender struct { + sendCnt int +} + +func (t *testGWSender) SendToAny(sendFunc func(host *connect.Host) (interface{}, + error), stop *stoppable.Single) (interface{}, error) { + // this is always called with at least 1 round info set + rifs := make([]*pb.RoundInfo, 1) + rifs[0] = nil + m := &pb.HistoricalRoundsResponse{Rounds: rifs} + t.sendCnt += 1 + return m, nil +} +func (t *testGWSender) SendToPreferred(targets []*id.ID, sendFunc gateway.SendToPreferredFunc, + stop *stoppable.Single, timeout time.Duration) (interface{}, error) { + return t, nil +} +func (t *testGWSender) UpdateNdf(ndf *ndf.NetworkDefinition) { +} +func (t *testGWSender) SetGatewayFilter(f gateway.Filter) {} +func (t *testGWSender) GetHostParams() connect.HostParams { + return connect.GetDefaultHostParams() +} + +type testEventMgr struct{} + +func (t *testEventMgr) Report(priority int, category, evtType, details string) { +} -- GitLab