Skip to content
Snippets Groups Projects
Commit 0b2c562a authored by Richard T. Carback III's avatar Richard T. Carback III
Browse files

Merge branch 'hotfix/historicalRounds' into 'release'

removed processign round and updated historical rounds to handle failures and retries

See merge request !573
parents e6d6fb53 23a5a42e
No related branches found
No related tags found
No related merge requests found
......@@ -12,9 +12,6 @@ import (
)
type Rounds struct {
// Maximum number of times to attempt to retrieve a round from a gateway
// before giving up on it
MaxAttemptsCheckingARound uint
// Number of historical rounds required to automatically send a historical
// rounds query
MaxHistoricalRounds uint
......@@ -31,11 +28,13 @@ type Rounds struct {
// Toggles if historical rounds should always be used
ForceHistoricalRounds bool
// Maximum number of times a historical round lookup will be attempted
MaxHistoricalRoundsRetries uint
}
func GetDefaultRounds() Rounds {
return Rounds{
MaxAttemptsCheckingARound: 5,
MaxHistoricalRounds: 100,
HistoricalRoundsPeriod: 100 * time.Millisecond,
NumMessageRetrievalWorkers: 8,
......@@ -43,5 +42,6 @@ func GetDefaultRounds() Rounds {
HistoricalRoundsBufferLen: 1000,
LookupRoundsBufferLen: 2000,
ForceHistoricalRounds: false,
MaxHistoricalRoundsRetries: 3,
}
}
......@@ -61,6 +61,7 @@ func (m *Manager) GetMessagesFromRound(roundID id.Round, identity reception.Iden
m.historicalRounds <- historicalRoundRequest{
rid: roundID,
identity: identity,
numAttempts: 0,
}
} else {
jww.INFO.Printf("Messages found in round %d for %d (%s), looking "+
......
......@@ -36,6 +36,7 @@ type historicalRoundsComms interface {
type historicalRoundRequest struct {
rid id.Round
identity reception.IdentityUse
numAttempts uint
}
// Long running thread which process historical rounds
......@@ -62,7 +63,6 @@ func (m *Manager) processHistoricalRounds(comm historicalRoundsComms, quitCh <-c
select {
case m.historicalRounds <- r:
default:
m.p.NotProcessing(r.rid, r.identity.EphId, r.identity.Source)
}
}
done = true
......@@ -123,9 +123,23 @@ func (m *Manager) processHistoricalRounds(comm historicalRoundsComms, quitCh <-c
// need be be removes as processing so the network follower will
// pick them up in the future.
if roundInfo == nil {
jww.ERROR.Printf("Failed to retreive "+
"historical round %d", roundRequests[i].rid)
m.p.Fail(roundRequests[i].rid, roundRequests[i].identity.EphId, roundRequests[i].identity.Source)
roundRequests[i].numAttempts++
if roundRequests[i].numAttempts==m.params.MaxHistoricalRoundsRetries{
jww.ERROR.Printf("Failed to retreive historical " +
"round %d on last attempt, will not try again",
roundRequests[i].rid)
}else{
select {
case m.historicalRounds <-roundRequests[i]:
jww.WARN.Printf("Failed to retreive historical " +
"round %d, will try up to %d more times",
roundRequests[i].rid, m.params.MaxHistoricalRoundsRetries-roundRequests[i].numAttempts)
default:
jww.WARN.Printf("Failed to retreive historical " +
"round %d, failed to try again, round will not be " +
"retreived", roundRequests[i].rid)
}
}
continue
}
// Successfully retrieved roundRequests are sent to the Message
......
......@@ -13,15 +13,11 @@ import (
"gitlab.com/elixxir/client/network/internal"
"gitlab.com/elixxir/client/network/message"
"gitlab.com/elixxir/client/stoppable"
"gitlab.com/xx_network/primitives/id"
"gitlab.com/xx_network/primitives/id/ephemeral"
)
type Manager struct {
params params.Rounds
p *processing
internal.Internal
historicalRounds chan historicalRoundRequest
......@@ -33,7 +29,6 @@ func NewManager(internal internal.Internal, params params.Rounds,
bundles chan<- message.Bundle) *Manager {
m := &Manager{
params: params,
p: newProcessingRounds(),
historicalRounds: make(chan historicalRoundRequest, params.HistoricalRoundsBufferLen),
lookupRoundMessages: make(chan roundLookup, params.LookupRoundsBufferLen),
......@@ -61,8 +56,3 @@ func (m *Manager) StartProcessors() stoppable.Stoppable {
}
return multi
}
\ No newline at end of file
func (m *Manager) DeleteProcessingRoundDelete(round id.Round, eph ephemeral.Id, source *id.ID) {
m.p.Delete(round, eph, source)
}
///////////////////////////////////////////////////////////////////////////////
// Copyright © 2020 xx network SEZC //
// //
// Use of this source code is governed by a license that can be found in the //
// LICENSE file //
///////////////////////////////////////////////////////////////////////////////
package rounds
// File for storing info about which rounds are processing
import (
"crypto/md5"
"encoding/binary"
"fmt"
"gitlab.com/xx_network/primitives/id"
"gitlab.com/xx_network/primitives/id/ephemeral"
"sync"
)
type Status uint8
const (
NotProcessing Status = iota
Processing
Done
)
func (s Status) String() string {
switch s {
case NotProcessing:
return "NotProcessing"
case Processing:
return "Processing"
case Done:
return "Done"
default:
return fmt.Sprintf("Unknown Status: %d", s)
}
}
type status struct {
failCount uint
Status
}
// processing struct with a lock so it can be managed with concurrent threads.
type processing struct {
rounds map[hashID]*status
sync.RWMutex
}
type hashID [16]byte
func makeHashID(round id.Round, eph ephemeral.Id, source *id.ID) hashID {
h := md5.New()
ridbytes := make([]byte, 8)
binary.BigEndian.PutUint64(ridbytes, uint64(round))
h.Write(ridbytes)
h.Write(eph[:])
h.Write(source.Bytes())
hBytes := h.Sum(nil)
hid := hashID{}
copy(hid[:], hBytes)
return hid
}
// newProcessingRounds returns a new processing rounds object.
func newProcessingRounds() *processing {
return &processing{
rounds: make(map[hashID]*status),
}
}
// Process adds a round to the list of processing rounds. The returned boolean
// is true when the round changes from "not processing" to "processing". The
// returned count is the number of times the round has been processed.
func (pr *processing) Process(round id.Round, eph ephemeral.Id, source *id.ID) (Status, uint) {
hid := makeHashID(round, eph, source)
pr.Lock()
defer pr.Unlock()
var rs *status
var ok bool
if rs, ok = pr.rounds[hid]; ok && rs.Status == NotProcessing {
rs.Status = Processing
return NotProcessing, rs.failCount
} else if !ok {
rs = &status{
failCount: 0,
Status: Processing,
}
pr.rounds[hid] = rs
return NotProcessing, rs.failCount
}
return rs.Status, rs.failCount
}
// IsProcessing determines if a round ID is marked as processing.
func (pr *processing) IsProcessing(round id.Round, eph ephemeral.Id, source *id.ID) bool {
hid := makeHashID(round, eph, source)
pr.RLock()
defer pr.RUnlock()
if rs, ok := pr.rounds[hid]; ok {
return rs.Status == Processing
}
return false
}
// Fail sets a round's processing status to failed and increments its fail
// counter so that it can be retried.
func (pr *processing) Fail(round id.Round, eph ephemeral.Id, source *id.ID) {
hid := makeHashID(round, eph, source)
pr.Lock()
defer pr.Unlock()
if rs, ok := pr.rounds[hid]; ok {
rs.Status = NotProcessing
rs.failCount++
}
}
// Done deletes a round from the processing list.
func (pr *processing) Done(round id.Round, eph ephemeral.Id, source *id.ID) {
hid := makeHashID(round, eph, source)
pr.Lock()
defer pr.Unlock()
if rs, ok := pr.rounds[hid]; ok {
rs.Status = Done
}
}
// NotProcessing sets a round's processing status to failed so that it can be
// retried but does not increment its fail counter.
func (pr *processing) NotProcessing(round id.Round, eph ephemeral.Id, source *id.ID) {
hid := makeHashID(round, eph, source)
pr.Lock()
defer pr.Unlock()
if rs, ok := pr.rounds[hid]; ok {
rs.Status = NotProcessing
}
}
// Done deletes a round from the processing list.
func (pr *processing) Delete(round id.Round, eph ephemeral.Id, source *id.ID) {
hid := makeHashID(round, eph, source)
pr.Lock()
defer pr.Unlock()
delete(pr.rounds, hid)
}
///////////////////////////////////////////////////////////////////////////////
// Copyright © 2020 xx network SEZC //
// //
// Use of this source code is governed by a license that can be found in the //
// LICENSE file //
///////////////////////////////////////////////////////////////////////////////
package rounds
import (
"gitlab.com/xx_network/primitives/id"
"gitlab.com/xx_network/primitives/id/ephemeral"
"reflect"
"testing"
)
// Testing functions for Processing Round structure
// Tests happy path of newProcessingRounds.
func Test_newProcessingRounds(t *testing.T) {
expectedPr := &processing{
rounds: make(map[hashID]*status),
}
pr := newProcessingRounds()
if !reflect.DeepEqual(expectedPr, pr) {
t.Errorf("Did not get expected processing."+
"\n\texpected: %v\n\trecieved: %v", expectedPr, pr)
}
}
// Tests happy path of Process.
func TestProcessing_Process(t *testing.T) {
pr := newProcessingRounds()
ephID := ephemeral.Id{}
source := &id.ID{}
testData := []struct {
rid id.Round
status Status
count uint
}{
{10, NotProcessing, 0},
{10, NotProcessing, 0},
{10, Processing, 0},
{100, NotProcessing, 0},
{100, NotProcessing, 0},
{100, Processing, 0},
}
for i, d := range testData {
hid := makeHashID(d.rid, ephID, source)
if _, exists := pr.rounds[hid]; exists {
pr.rounds[hid].Status = d.status
}
status, count := pr.Process(d.rid, ephID, source)
if status != d.status {
t.Errorf("Process() did not return the correct boolean for round "+
"ID %d (%d).\nexpected: %s\nrecieved: %s",
d.rid, i, d.status, status)
}
if count != d.count {
t.Errorf("Process did not return the expected count for round ID "+
"%d (%d).\n\texpected: %d\n\trecieved: %d",
d.rid, i, d.count, count)
}
if _, ok := pr.rounds[hid]; !ok {
t.Errorf("Process() did not add round ID %d to the map (%d).",
d.rid, i)
}
}
}
// Tests happy path of IsProcessing.
func TestProcessing_IsProcessing(t *testing.T) {
pr := newProcessingRounds()
ephID := ephemeral.Id{}
source := &id.ID{}
rid := id.Round(10)
hid := makeHashID(rid, ephID, source)
pr.rounds[hid] = &status{0, Processing}
if !pr.IsProcessing(rid, ephID, source) {
t.Errorf("IsProcessing() should have returned %s for round ID %d.", Processing, rid)
}
pr.rounds[hid].Status = NotProcessing
if pr.IsProcessing(rid, ephID, source) {
t.Errorf("IsProcessing() should have returned %s for round ID %d.", NotProcessing, rid)
}
}
// Tests happy path of Fail.
func TestProcessing_Fail(t *testing.T) {
pr := newProcessingRounds()
rid := id.Round(10)
ephID := ephemeral.Id{}
source := &id.ID{}
hid := makeHashID(rid, ephID, source)
pr.rounds[hid] = &status{0, Processing}
pr.Fail(rid, ephID, source)
if pr.rounds[hid].Status == Processing {
t.Errorf("Fail() did not mark processing as false for round id %d.", rid)
}
if pr.rounds[hid].failCount != 1 {
t.Errorf("Fail() did not increment the fail count of round id %d.", rid)
}
}
// Tests happy path of Done.
func TestProcessing_Done(t *testing.T) {
pr := newProcessingRounds()
rid := id.Round(10)
ephID := ephemeral.Id{}
source := &id.ID{}
hid := makeHashID(rid, ephID, source)
pr.rounds[hid] = &status{0, Processing}
pr.Done(rid, ephID, source)
if s, _ := pr.rounds[hid]; s.Status != Done {
t.Errorf("Done() failed to flag round ID %d.", rid)
}
}
......@@ -82,7 +82,6 @@ func (m *Manager) processMessageRetrieval(comms messageRetrievalComms,
// After trying all gateways, if none returned we mark the round as a
// failure and print out the last error
if err != nil {
m.p.Fail(id.Round(ri.ID), rl.identity.EphId, rl.identity.Source)
jww.ERROR.Printf("Failed to get pickup round %d "+
"from all gateways (%v): final gateway %s returned : %s",
id.Round(ri.ID), gwIDs, gwHosts[len(gwHosts)-1].GetId(), err)
......@@ -116,7 +115,6 @@ func (m *Manager) getMessagesFromGateway(roundID id.Round, identity reception.Id
}
// if the gateway doesnt have the round, return an error
if !msgResp.GetHasRound() {
m.p.Done(roundID, identity.EphId, identity.Source)
return message.Bundle{}, errors.Errorf(noRoundError)
}
......@@ -139,7 +137,7 @@ func (m *Manager) getMessagesFromGateway(roundID id.Round, identity reception.Id
Round: roundID,
Messages: make([]format.Message, len(msgs)),
Finish: func() {
m.p.Done(roundID, identity.EphId, identity.Source)
return
},
}
......
......@@ -23,7 +23,6 @@ func newManager(face interface{}) *Manager {
testManager := &Manager{
lookupRoundMessages: make(chan roundLookup),
messageBundles: make(chan message.Bundle),
p: newProcessingRounds(),
Internal: internal.Internal{
Session: sess1,
TransmissionID: sess1.GetUser().TransmissionID,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment