diff --git a/interfaces/params/message.go b/interfaces/params/message.go index fbf9779829b939145cf7bc1277fa79b5617b826a..27a8ebd7d626445cc4c2ace03772d5dbbb6105ed 100644 --- a/interfaces/params/message.go +++ b/interfaces/params/message.go @@ -16,6 +16,7 @@ type Messages struct { MessageReceptionWorkerPoolSize uint MaxChecksGarbledMessage uint GarbledMessageWait time.Duration + RealtimeOnly bool } func GetDefaultMessage() Messages { @@ -24,5 +25,6 @@ func GetDefaultMessage() Messages { MessageReceptionWorkerPoolSize: 4, MaxChecksGarbledMessage: 10, GarbledMessageWait: 15 * time.Minute, + RealtimeOnly: false, } } diff --git a/interfaces/params/network.go b/interfaces/params/network.go index 67d083802e516de5d0feca95a299416489561a8a..d4c37b06cc6e5b9e508185b226f02dec5df749df 100644 --- a/interfaces/params/network.go +++ b/interfaces/params/network.go @@ -34,6 +34,8 @@ type Network struct { // Determines if the state of every round processed is tracked in ram. // This is very memory intensive and is primarily used for debugging VerboseRoundTracking bool + //disables all attempts to pick up dropped or missed messages + RealtimeOnly bool Rounds Messages @@ -54,6 +56,7 @@ func GetDefaultNetwork() Network { FastPolling: true, BlacklistedNodes: make([]string, 0), VerboseRoundTracking: false, + RealtimeOnly: false, } n.Rounds = GetDefaultRounds() n.Messages = GetDefaultMessage() @@ -65,6 +68,13 @@ func (n Network) Marshal() ([]byte, error) { return json.Marshal(n) } +func (n Network) SetRealtimeOnlyAll()Network { + n.RealtimeOnly = true + n.Rounds.RealtimeOnly = true + n.Messages.RealtimeOnly = true + return n +} + // Obtain default Network parameters, or override with given parameters if set func GetNetworkParameters(params string) (Network, error) { p := GetDefaultNetwork() diff --git a/interfaces/params/rounds.go b/interfaces/params/rounds.go index 40bd41bfbd80a6cef2ecf30d8a1c645782f7b305..75e4270987ab2b5d8ea1c4edb6e49efb9f32a863 100644 --- a/interfaces/params/rounds.go +++ b/interfaces/params/rounds.go @@ -43,6 +43,9 @@ type Rounds struct { // Duration to wait before sending on a round times out and a new round is // tried SendTimeout time.Duration + + //disables all attempts to pick up dropped or missed messages + RealtimeOnly bool } func GetDefaultRounds() Rounds { @@ -58,5 +61,6 @@ func GetDefaultRounds() Rounds { UncheckRoundPeriod: 20 * time.Second, ForceMessagePickupRetry: false, SendTimeout: 1 * time.Second, + RealtimeOnly: false, } } diff --git a/network/follow.go b/network/follow.go index e25f5ff6cd43857aa7e5fd4cd5856f2a2eaa5172..89d3d1546a624fc4986445772fdd149bec1890a3 100644 --- a/network/follow.go +++ b/network/follow.go @@ -367,12 +367,17 @@ func (m *manager) follow(report interfaces.ClientErrorReport, rng csprng.Source, jww.DEBUG.Printf("New Earliest Remaining: %d, Gateways last checked: %d", earliestRemaining, gwRoundsState.GetLastChecked()) } - roundsWithMessages2 := identity.UR.Iterate(func(rid id.Round) bool { - if gwRoundsState.Checked(rid) { - return rounds.Checker(rid, filterList, identity.CR) - } - return false - }, roundsUnknown, abandon) + var roundsWithMessages2 []id.Round + + if !m.param.RealtimeOnly{ + roundsWithMessages2 = identity.UR.Iterate(func(rid id.Round) bool { + if gwRoundsState.Checked(rid) { + return rounds.Checker(rid, filterList, identity.CR) + } + return false + }, roundsUnknown, abandon) + } + for _, rid := range roundsWithMessages { //denote that the round has been looked at in the tracking store diff --git a/network/rounds/check.go b/network/rounds/check.go index 3b9eb2d2849400d2fea2227373839cbc0def44e6..295a63f0abdb607660c0e6bc6865e7d026d608f6 100644 --- a/network/rounds/check.go +++ b/network/rounds/check.go @@ -56,6 +56,11 @@ func serializeRound(roundId id.Round) []byte { func (m *Manager) GetMessagesFromRound(roundID id.Round, identity reception.IdentityUse) { ri, err := m.Instance.GetRound(roundID) if err != nil || m.params.ForceHistoricalRounds { + if m.params.RealtimeOnly { + jww.WARN.Printf("Skipping round %d because it is not in ram and we are realtime only mode", + roundID) + return + } if m.params.ForceHistoricalRounds { jww.WARN.Printf("Forcing use of historical rounds for round ID %d.", roundID) @@ -80,11 +85,14 @@ func (m *Manager) GetMessagesFromRound(roundID id.Round, identity reception.Iden "up messages via in ram lookup", roundID, identity.EphId.Int64(), identity.Source) //store the round as an unretreived round - err = m.Session.UncheckedRounds().AddRound(roundID, ri, - identity.Source, identity.EphId) - if err != nil { - jww.FATAL.Panicf("Failed to denote Unchecked Round for round %d", roundID) + if !m.params.RealtimeOnly { + err = m.Session.UncheckedRounds().AddRound(roundID, ri, + identity.Source, identity.EphId) + if err != nil { + jww.FATAL.Panicf("Failed to denote Unchecked Round for round %d", roundID) + } } + // If found, send to Message Retrieval Workers m.lookupRoundMessages <- roundLookup{ roundInfo: ri, diff --git a/network/rounds/manager.go b/network/rounds/manager.go index c695cd7a8c78be622769a4388a669eda5757b304..f220583de8de98ab706b48db3a2b24d6a6168078 100644 --- a/network/rounds/manager.go +++ b/network/rounds/manager.go @@ -58,9 +58,12 @@ func (m *Manager) StartProcessors() stoppable.Stoppable { } // Start the periodic unchecked round worker - stopper := stoppable.NewSingle("UncheckRound") - go m.processUncheckedRounds(m.params.UncheckRoundPeriod, backOffTable, stopper) - multi.Add(stopper) + if !m.params.RealtimeOnly{ + stopper := stoppable.NewSingle("UncheckRound") + go m.processUncheckedRounds(m.params.UncheckRoundPeriod, backOffTable, stopper) + multi.Add(stopper) + } + return multi } diff --git a/network/rounds/retrieve.go b/network/rounds/retrieve.go index bd378ecf24728018dbdf824d97efc522bd227ed3..00f6675375243ef4025ad86f87751b578f185c1c 100644 --- a/network/rounds/retrieve.go +++ b/network/rounds/retrieve.go @@ -49,12 +49,15 @@ func (m *Manager) processMessageRetrieval(comms messageRetrievalComms, case rl := <-m.lookupRoundMessages: ri := rl.roundInfo jww.DEBUG.Printf("Checking for messages in round %d", ri.ID) - err := m.Session.UncheckedRounds().AddRound(id.Round(ri.ID), ri, - rl.identity.Source, rl.identity.EphId) - if err != nil { - jww.FATAL.Panicf("Failed to denote Unchecked Round for round %d", id.Round(ri.ID)) + if !m.params.RealtimeOnly{ + err := m.Session.UncheckedRounds().AddRound(id.Round(ri.ID), ri, + rl.identity.Source, rl.identity.EphId) + if err != nil { + jww.FATAL.Panicf("Failed to denote Unchecked Round for round %d", id.Round(ri.ID)) + } } + // Convert gateways in round to proper ID format gwIds := make([]*id.ID, len(ri.Topology)) for i, idBytes := range ri.Topology { @@ -73,7 +76,7 @@ func (m *Manager) processMessageRetrieval(comms messageRetrievalComms, // messages first, randomize other members of the team var rndBytes [32]byte stream := m.Rng.GetStream() - _, err = stream.Read(rndBytes[:]) + _, err := stream.Read(rndBytes[:]) stream.Close() if err != nil { jww.FATAL.Panicf("Failed to randomize shuffle in round %d "+ @@ -129,12 +132,15 @@ func (m *Manager) processMessageRetrieval(comms messageRetrievalComms, m.messageBundles <- bundle jww.DEBUG.Printf("Removing round %d from unchecked store", ri.ID) - err = m.Session.UncheckedRounds().Remove(id.Round(ri.ID), rl.identity.Source, rl.identity.EphId) - if err != nil { - jww.ERROR.Printf("Could not remove round %d "+ - "from unchecked rounds store: %v", ri.ID, err) + if !m.params.RealtimeOnly{ + err = m.Session.UncheckedRounds().Remove(id.Round(ri.ID), rl.identity.Source, rl.identity.EphId) + if err != nil { + jww.ERROR.Printf("Could not remove round %d "+ + "from unchecked rounds store: %v", ri.ID, err) + } } + } } @@ -190,12 +196,14 @@ func (m *Manager) getMessagesFromGateway(roundID id.Round, " in round %d. This happening every once in a while is normal,"+ " but can be indicative of a problem if it is consistent", m.TransmissionID, roundID) - - err = m.Session.UncheckedRounds().Remove(roundID, identity.Source, identity.EphId) - if err != nil { - jww.ERROR.Printf("Failed to remove round %d: %+v", roundID, err) + if m.params.RealtimeOnly{ + err = m.Session.UncheckedRounds().Remove(roundID, identity.Source, identity.EphId) + if err != nil { + jww.ERROR.Printf("Failed to remove round %d: %+v", roundID, err) + } } + return message.Bundle{}, nil }