Skip to content
Snippets Groups Projects
Commit fb478b3a authored by Benjamin Wenger's avatar Benjamin Wenger
Browse files

added the realtime only network flag which doesnt do any attempt at picking up missed messages

parent 789574c0
No related branches found
No related tags found
2 merge requests!170Release,!137added the realtime only network flag which doesnt do any attempt at picking up missed messages
......@@ -16,6 +16,7 @@ type Messages struct {
MessageReceptionWorkerPoolSize uint
MaxChecksGarbledMessage uint
GarbledMessageWait time.Duration
RealtimeOnly bool
}
func GetDefaultMessage() Messages {
......@@ -24,5 +25,6 @@ func GetDefaultMessage() Messages {
MessageReceptionWorkerPoolSize: 4,
MaxChecksGarbledMessage: 10,
GarbledMessageWait: 15 * time.Minute,
RealtimeOnly: false,
}
}
......@@ -34,6 +34,8 @@ type Network struct {
// Determines if the state of every round processed is tracked in ram.
// This is very memory intensive and is primarily used for debugging
VerboseRoundTracking bool
//disables all attempts to pick up dropped or missed messages
RealtimeOnly bool
Rounds
Messages
......@@ -54,6 +56,7 @@ func GetDefaultNetwork() Network {
FastPolling: true,
BlacklistedNodes: make([]string, 0),
VerboseRoundTracking: false,
RealtimeOnly: false,
}
n.Rounds = GetDefaultRounds()
n.Messages = GetDefaultMessage()
......@@ -65,6 +68,13 @@ func (n Network) Marshal() ([]byte, error) {
return json.Marshal(n)
}
func (n Network) SetRealtimeOnlyAll()Network {
n.RealtimeOnly = true
n.Rounds.RealtimeOnly = true
n.Messages.RealtimeOnly = true
return n
}
// Obtain default Network parameters, or override with given parameters if set
func GetNetworkParameters(params string) (Network, error) {
p := GetDefaultNetwork()
......
......@@ -43,6 +43,9 @@ type Rounds struct {
// Duration to wait before sending on a round times out and a new round is
// tried
SendTimeout time.Duration
//disables all attempts to pick up dropped or missed messages
RealtimeOnly bool
}
func GetDefaultRounds() Rounds {
......@@ -58,5 +61,6 @@ func GetDefaultRounds() Rounds {
UncheckRoundPeriod: 20 * time.Second,
ForceMessagePickupRetry: false,
SendTimeout: 1 * time.Second,
RealtimeOnly: false,
}
}
......@@ -367,12 +367,17 @@ func (m *manager) follow(report interfaces.ClientErrorReport, rng csprng.Source,
jww.DEBUG.Printf("New Earliest Remaining: %d, Gateways last checked: %d", earliestRemaining, gwRoundsState.GetLastChecked())
}
roundsWithMessages2 := identity.UR.Iterate(func(rid id.Round) bool {
if gwRoundsState.Checked(rid) {
return rounds.Checker(rid, filterList, identity.CR)
}
return false
}, roundsUnknown, abandon)
var roundsWithMessages2 []id.Round
if !m.param.RealtimeOnly{
roundsWithMessages2 = identity.UR.Iterate(func(rid id.Round) bool {
if gwRoundsState.Checked(rid) {
return rounds.Checker(rid, filterList, identity.CR)
}
return false
}, roundsUnknown, abandon)
}
for _, rid := range roundsWithMessages {
//denote that the round has been looked at in the tracking store
......
......@@ -56,6 +56,11 @@ func serializeRound(roundId id.Round) []byte {
func (m *Manager) GetMessagesFromRound(roundID id.Round, identity reception.IdentityUse) {
ri, err := m.Instance.GetRound(roundID)
if err != nil || m.params.ForceHistoricalRounds {
if m.params.RealtimeOnly {
jww.WARN.Printf("Skipping round %d because it is not in ram and we are realtime only mode",
roundID)
return
}
if m.params.ForceHistoricalRounds {
jww.WARN.Printf("Forcing use of historical rounds for round ID %d.",
roundID)
......@@ -80,11 +85,14 @@ func (m *Manager) GetMessagesFromRound(roundID id.Round, identity reception.Iden
"up messages via in ram lookup", roundID, identity.EphId.Int64(),
identity.Source)
//store the round as an unretreived round
err = m.Session.UncheckedRounds().AddRound(roundID, ri,
identity.Source, identity.EphId)
if err != nil {
jww.FATAL.Panicf("Failed to denote Unchecked Round for round %d", roundID)
if !m.params.RealtimeOnly {
err = m.Session.UncheckedRounds().AddRound(roundID, ri,
identity.Source, identity.EphId)
if err != nil {
jww.FATAL.Panicf("Failed to denote Unchecked Round for round %d", roundID)
}
}
// If found, send to Message Retrieval Workers
m.lookupRoundMessages <- roundLookup{
roundInfo: ri,
......
......@@ -58,9 +58,12 @@ func (m *Manager) StartProcessors() stoppable.Stoppable {
}
// Start the periodic unchecked round worker
stopper := stoppable.NewSingle("UncheckRound")
go m.processUncheckedRounds(m.params.UncheckRoundPeriod, backOffTable, stopper)
multi.Add(stopper)
if !m.params.RealtimeOnly{
stopper := stoppable.NewSingle("UncheckRound")
go m.processUncheckedRounds(m.params.UncheckRoundPeriod, backOffTable, stopper)
multi.Add(stopper)
}
return multi
}
......@@ -49,12 +49,15 @@ func (m *Manager) processMessageRetrieval(comms messageRetrievalComms,
case rl := <-m.lookupRoundMessages:
ri := rl.roundInfo
jww.DEBUG.Printf("Checking for messages in round %d", ri.ID)
err := m.Session.UncheckedRounds().AddRound(id.Round(ri.ID), ri,
rl.identity.Source, rl.identity.EphId)
if err != nil {
jww.FATAL.Panicf("Failed to denote Unchecked Round for round %d", id.Round(ri.ID))
if !m.params.RealtimeOnly{
err := m.Session.UncheckedRounds().AddRound(id.Round(ri.ID), ri,
rl.identity.Source, rl.identity.EphId)
if err != nil {
jww.FATAL.Panicf("Failed to denote Unchecked Round for round %d", id.Round(ri.ID))
}
}
// Convert gateways in round to proper ID format
gwIds := make([]*id.ID, len(ri.Topology))
for i, idBytes := range ri.Topology {
......@@ -73,7 +76,7 @@ func (m *Manager) processMessageRetrieval(comms messageRetrievalComms,
// messages first, randomize other members of the team
var rndBytes [32]byte
stream := m.Rng.GetStream()
_, err = stream.Read(rndBytes[:])
_, err := stream.Read(rndBytes[:])
stream.Close()
if err != nil {
jww.FATAL.Panicf("Failed to randomize shuffle in round %d "+
......@@ -129,12 +132,15 @@ func (m *Manager) processMessageRetrieval(comms messageRetrievalComms,
m.messageBundles <- bundle
jww.DEBUG.Printf("Removing round %d from unchecked store", ri.ID)
err = m.Session.UncheckedRounds().Remove(id.Round(ri.ID), rl.identity.Source, rl.identity.EphId)
if err != nil {
jww.ERROR.Printf("Could not remove round %d "+
"from unchecked rounds store: %v", ri.ID, err)
if !m.params.RealtimeOnly{
err = m.Session.UncheckedRounds().Remove(id.Round(ri.ID), rl.identity.Source, rl.identity.EphId)
if err != nil {
jww.ERROR.Printf("Could not remove round %d "+
"from unchecked rounds store: %v", ri.ID, err)
}
}
}
}
......@@ -190,12 +196,14 @@ func (m *Manager) getMessagesFromGateway(roundID id.Round,
" in round %d. This happening every once in a while is normal,"+
" but can be indicative of a problem if it is consistent",
m.TransmissionID, roundID)
err = m.Session.UncheckedRounds().Remove(roundID, identity.Source, identity.EphId)
if err != nil {
jww.ERROR.Printf("Failed to remove round %d: %+v", roundID, err)
if m.params.RealtimeOnly{
err = m.Session.UncheckedRounds().Remove(roundID, identity.Source, identity.EphId)
if err != nil {
jww.ERROR.Printf("Failed to remove round %d: %+v", roundID, err)
}
}
return message.Bundle{}, nil
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment