Skip to content
Snippets Groups Projects
Commit b38c9750 authored by Richard T. Carback III's avatar Richard T. Carback III
Browse files

add tests for historical rounds lookup code

parent 8a32c0d3
No related branches found
No related tags found
3 merge requests!510Release,!207WIP: Client Restructure,!203Symmetric broadcast
......@@ -9,6 +9,8 @@ package historical
import (
"fmt"
"time"
"github.com/pkg/errors"
jww "github.com/spf13/jwalterweatherman"
"gitlab.com/elixxir/client/event"
......@@ -17,7 +19,6 @@ import (
pb "gitlab.com/elixxir/comms/mixmessages"
"gitlab.com/xx_network/comms/connect"
"gitlab.com/xx_network/primitives/id"
"time"
)
// Historical Rounds looks up the round history via random gateways.
......@@ -103,7 +104,8 @@ func (m *manager) StartProcessies() *stoppable.Single {
// processHistoricalRounds is a long running thread which
// process historical rounds. Can be killed by sending
// a signal to the quit channel takes a comms interface to aid in testing
func (m *manager) processHistoricalRounds(comm RoundsComms, stop *stoppable.Single) {
func (m *manager) processHistoricalRounds(comm RoundsComms,
stop *stoppable.Single) {
timerCh := make(<-chan time.Time)
......@@ -115,9 +117,11 @@ func (m *manager) processHistoricalRounds(comm RoundsComms, stop *stoppable.Sing
// wait for a quit or new round to check
select {
case <-stop.Quit():
// return all roundRequests in the queue to the input channel so they can
// be checked in the future. If the queue is full, disable them as
// processing so they are picked up from the beginning
// return all roundRequests in the queue to
// the input channel so they can be checked in
// the future. If the queue is full, disable
// them as processing so they are picked up
// from the beginning
for _, r := range roundRequests {
select {
case m.c <- r:
......@@ -126,7 +130,8 @@ func (m *manager) processHistoricalRounds(comm RoundsComms, stop *stoppable.Sing
}
stop.ToStopped()
return
// if the timer elapses process roundRequests to ensure the delay isn't too long
// if the timer elapses process roundRequests to
// ensure the delay isn't too long
case <-timerCh:
if len(roundRequests) > 0 {
shouldProcess = true
......@@ -158,66 +163,84 @@ func (m *manager) processHistoricalRounds(comm RoundsComms, stop *stoppable.Sing
}
var gwHost *connect.Host
result, err := m.sender.SendToAny(func(host *connect.Host) (interface{}, error) {
jww.DEBUG.Printf("Requesting Historical rounds %v from "+
"gateway %s", rounds, host.GetId())
gwHost = host
return comm.RequestHistoricalRounds(host, hr)
}, stop)
result, err := m.sender.SendToAny(
func(host *connect.Host) (interface{}, error) {
jww.DEBUG.Printf("Requesting Historical "+
"rounds %v from gateway %s", rounds,
host.GetId())
gwHost = host
return comm.RequestHistoricalRounds(host, hr)
}, stop)
if err != nil {
jww.ERROR.Printf("Failed to request historical roundRequests "+
"data for rounds %v: %s", rounds, err)
// if the check fails to resolve, break the loop and so they will be
// checked again
jww.ERROR.Printf("Failed to request historical "+
"roundRequests data for rounds %v: %s",
rounds, err)
// if the check fails to resolve, break the
// loop and so they will be checked again
timerCh = time.NewTimer(m.params.HistoricalRoundsPeriod).C
continue
}
response := result.(*pb.HistoricalRoundsResponse)
rids := make([]uint64, 0)
// process the returned historical roundRequests.
for i, roundInfo := range response.Rounds {
// The interface has missing returns returned as nil, such roundRequests
// need be be removes as processing so the network follower will
// pick them up in the future.
if roundInfo == nil {
var errMsg string
roundRequests[i].numAttempts++
if roundRequests[i].numAttempts == m.params.MaxHistoricalRoundsRetries {
errMsg = fmt.Sprintf("Failed to retreive historical "+
"round %d on last attempt, will not try again",
roundRequests[i].rid)
go roundRequests[i].RoundResultCallback(nil, false)
} else {
select {
case m.c <- roundRequests[i]:
errMsg = fmt.Sprintf("Failed to retreive historical "+
"round %d, will try up to %d more times",
roundRequests[i].rid, m.params.MaxHistoricalRoundsRetries-roundRequests[i].numAttempts)
default:
errMsg = fmt.Sprintf("Failed to retreive historical "+
"round %d, failed to try again, round will not be "+
"retreived", roundRequests[i].rid)
}
}
jww.WARN.Printf(errMsg)
m.events.Report(5, "HistoricalRounds",
"Error", errMsg)
continue
}
// Successfully retrieved roundRequests are returned on the callback
go roundRequests[i].RoundResultCallback(roundInfo, true)
rids = append(rids, roundInfo.ID)
}
rids, retries := processHistoricalRoundsResponse(response,
roundRequests, m.params.MaxHistoricalRoundsRetries,
m.events)
m.events.Report(1, "HistoricalRounds", "Metrics",
fmt.Sprintf("Received %d historical rounds from"+
" gateway %s: %v", len(response.Rounds), gwHost,
rids))
//clear the buffer now that all have been checked
roundRequests = make([]roundRequest, 0)
// reset the buffer to those left to retry now that all
// have been checked
roundRequests = retries
// Now reset the timer, this prevents immediate reprocessing
// of the retries, limiting it to the next historical round
// request when buffer is full OR next timer tick
timerCh = time.NewTimer(m.params.HistoricalRoundsPeriod).C
}
}
func processHistoricalRoundsResponse(response *pb.HistoricalRoundsResponse,
roundRequests []roundRequest, maxRetries uint,
events event.Manager) ([]uint64, []roundRequest) {
retries := make([]roundRequest, 0)
rids := make([]uint64, 0)
// process the returned historical roundRequests.
for i, roundInfo := range response.Rounds {
// The interface has missing returns returned as nil,
// such roundRequests need be be removes as processing
// so the network follower will pick them up in the
// future.
if roundInfo == nil {
var errMsg string
roundRequests[i].numAttempts++
if roundRequests[i].numAttempts == maxRetries {
errMsg = fmt.Sprintf("Failed to retrieve "+
"historical round %d on last attempt,"+
" will not try again",
roundRequests[i].rid)
go roundRequests[i].RoundResultCallback(nil,
false)
} else {
retries = append(retries, roundRequests[i])
errMsg = fmt.Sprintf("Failed to retrieve "+
"historical round %d, will try up to "+
"%d more times", roundRequests[i].rid,
maxRetries-roundRequests[i].numAttempts)
}
jww.WARN.Printf(errMsg)
events.Report(5, "HistoricalRounds",
"Error", errMsg)
continue
}
// Successfully retrieved roundRequests are returned
// on the callback
go roundRequests[i].RoundResultCallback(roundInfo, true)
rids = append(rids, roundInfo.ID)
}
return rids, retries
}
///////////////////////////////////////////////////////////////////////////////
// Copyright © 2020 xx network SEZC //
// //
// Use of this source code is governed by a license that can be found in the //
// LICENSE file //
///////////////////////////////////////////////////////////////////////////////
package historical
import (
"testing"
"time"
"gitlab.com/elixxir/client/network/gateway"
"gitlab.com/elixxir/client/stoppable"
pb "gitlab.com/elixxir/comms/mixmessages"
"gitlab.com/xx_network/comms/connect"
"gitlab.com/xx_network/primitives/id"
"gitlab.com/xx_network/primitives/ndf"
)
// TestHistoricalRounds provides a smoke test to run through most of the code
// paths for historical round lookup.
func TestHistoricalRounds(t *testing.T) {
params := GetDefaultParams()
params.HistoricalRoundsPeriod = 500 * time.Millisecond
params.MaxHistoricalRounds = 3
comms := &testRoundsComms{}
sender := &testGWSender{sendCnt: 0}
events := &testEventMgr{}
hMgr := NewRetriever(params, comms, sender, events)
stopper := hMgr.StartProcessies()
// case 1: Send a round request and wait for timeout for
// processing
hMgr.LookupHistoricalRound(42, func(info *pb.RoundInfo, success bool) {
t.Errorf("first called when it shouldn't")
})
time.Sleep(501 * time.Millisecond)
if sender.sendCnt != 1 {
t.Errorf("did not send as expected")
}
// case 2: make round requests up to m.params.MaxHistoricalRounds
for i := 0; i < 3; i++ {
hMgr.LookupHistoricalRound(id.Round(40+i),
func(info *pb.RoundInfo, success bool) {
t.Errorf("i called when it shouldn't")
})
}
time.Sleep(10 * time.Millisecond)
if sender.sendCnt != 2 {
t.Errorf("unexpected send count: %d != 2", sender.sendCnt)
}
err := stopper.Close()
if err != nil {
t.Errorf("%+v", err)
}
if stopper.IsRunning() {
t.Errorf("historical rounds routine failed to close")
}
}
// TestHistoricalRoundsProcessing exercises the
func TestProcessHistoricalRoundsResponse(t *testing.T) {
params := GetDefaultParams()
bad_rr := roundRequest{
rid: id.Round(41),
RoundResultCallback: func(info *pb.RoundInfo, success bool) {
t.Errorf("bad called when it shouldn't")
},
numAttempts: params.MaxHistoricalRoundsRetries - 2,
}
expired_rr := roundRequest{
rid: id.Round(42),
RoundResultCallback: func(info *pb.RoundInfo, success bool) {
if info == nil && !success {
return
}
t.Errorf("expired called with bad params")
},
numAttempts: params.MaxHistoricalRoundsRetries - 1,
}
x := false
callbackCalled := &x
good_rr := roundRequest{
rid: id.Round(43),
RoundResultCallback: func(info *pb.RoundInfo, success bool) {
*callbackCalled = true
},
numAttempts: 0,
}
rrs := []roundRequest{bad_rr, expired_rr, good_rr}
rifs := make([]*pb.RoundInfo, 3)
rifs[0] = nil
rifs[1] = nil
rifs[2] = &pb.RoundInfo{ID: 43}
response := &pb.HistoricalRoundsResponse{
Rounds: rifs,
}
events := &testEventMgr{}
rids, retries := processHistoricalRoundsResponse(response, rrs,
params.MaxHistoricalRoundsRetries, events)
if len(rids) != 1 || rids[0] != 43 {
t.Errorf("bad return: %v, expected [43]", rids)
}
// Note: 1 of the entries was expired, thats why this is not 2.
if len(retries) != 1 {
t.Errorf("retries not right length: %d != 1", len(retries))
}
time.Sleep(5 * time.Millisecond)
if !*callbackCalled {
t.Errorf("expected callback to be called")
}
}
// Test structure implementations
type testRoundsComms struct{}
func (t *testRoundsComms) GetHost(hostId *id.ID) (*connect.Host, bool) {
return nil, false
}
func (t *testRoundsComms) RequestHistoricalRounds(host *connect.Host,
message *pb.HistoricalRounds) (*pb.HistoricalRoundsResponse, error) {
return nil, nil
}
type testGWSender struct {
sendCnt int
}
func (t *testGWSender) SendToAny(sendFunc func(host *connect.Host) (interface{},
error), stop *stoppable.Single) (interface{}, error) {
// this is always called with at least 1 round info set
rifs := make([]*pb.RoundInfo, 1)
rifs[0] = nil
m := &pb.HistoricalRoundsResponse{Rounds: rifs}
t.sendCnt += 1
return m, nil
}
func (t *testGWSender) SendToPreferred(targets []*id.ID, sendFunc gateway.SendToPreferredFunc,
stop *stoppable.Single, timeout time.Duration) (interface{}, error) {
return t, nil
}
func (t *testGWSender) UpdateNdf(ndf *ndf.NetworkDefinition) {
}
func (t *testGWSender) SetGatewayFilter(f gateway.Filter) {}
func (t *testGWSender) GetHostParams() connect.HostParams {
return connect.GetDefaultHostParams()
}
type testEventMgr struct{}
func (t *testEventMgr) Report(priority int, category, evtType, details string) {
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment