Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
client
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Wiki
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Deploy
Releases
Package registry
Container registry
Model registry
Operate
Terraform modules
Analyze
Contributor analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
elixxir
client
Commits
8b6834a7
Commit
8b6834a7
authored
2 years ago
by
Jonah Husson
Browse files
Options
Downloads
Patches
Plain Diff
Reintroduce logging changes w/o issue in roundchecker
parent
c404746b
No related branches found
No related tags found
2 merge requests
!562
Revert "Revert "Merge branch 'xx-4509/batch-pickup' into 'release'""
,
!515
Release
Changes
1
Show whitespace changes
Inline
Side-by-side
Showing
1 changed file
cmix/follow.go
+23
-20
23 additions, 20 deletions
cmix/follow.go
with
23 additions
and
20 deletions
cmix/follow.go
+
23
−
20
View file @
8b6834a7
...
@@ -149,18 +149,18 @@ func (c *client) followNetwork(report ClientErrorReport,
...
@@ -149,18 +149,18 @@ func (c *client) followNetwork(report ClientErrorReport,
c
.
latencySum
/
c
.
numLatencies
)
c
.
latencySum
/
c
.
numLatencies
)
c
.
latencySum
,
c
.
numLatencies
=
0
,
0
c
.
latencySum
,
c
.
numLatencies
=
0
,
0
infoMsg
:=
fmt
.
Sprintf
(
"Polled the network %d times in the "
+
infoMsg
:=
fmt
.
Sprintf
(
"
[Follow]
Polled the network %d times in the "
+
"last %s, with an average newest packet latency of %s"
,
"last %s, with an average newest packet latency of %s"
,
numPolls
,
debugTrackPeriod
,
latencyAvg
)
numPolls
,
debugTrackPeriod
,
latencyAvg
)
jww
.
INFO
.
Print
f
(
infoMsg
)
jww
.
INFO
.
Print
ln
(
infoMsg
)
c
.
events
.
Report
(
1
,
"Polling"
,
"MetricsWithLatency"
,
infoMsg
)
c
.
events
.
Report
(
1
,
"Polling"
,
"MetricsWithLatency"
,
infoMsg
)
}
else
{
}
else
{
infoMsg
:=
fmt
.
Sprintf
(
infoMsg
:=
fmt
.
Sprintf
(
"Polled the network %d times in the last %s"
,
numPolls
,
"
[Follow]
Polled the network %d times in the last %s"
,
numPolls
,
debugTrackPeriod
)
debugTrackPeriod
)
jww
.
INFO
.
Print
f
(
infoMsg
)
jww
.
INFO
.
Print
ln
(
infoMsg
)
c
.
events
.
Report
(
1
,
"Polling"
,
"Metrics"
,
infoMsg
)
c
.
events
.
Report
(
1
,
"Polling"
,
"Metrics"
,
infoMsg
)
}
}
}
}
...
@@ -204,7 +204,7 @@ func (c *client) follow(identity receptionID.IdentityUse,
...
@@ -204,7 +204,7 @@ func (c *client) follow(identity receptionID.IdentityUse,
var
startTime
time
.
Time
var
startTime
time
.
Time
result
,
err
:=
c
.
SendToAny
(
func
(
host
*
connect
.
Host
)
(
interface
{},
error
)
{
result
,
err
:=
c
.
SendToAny
(
func
(
host
*
connect
.
Host
)
(
interface
{},
error
)
{
jww
.
DEBUG
.
Printf
(
"Executing poll for %v(%s) range: %s-%s(%s) from %s"
,
jww
.
DEBUG
.
Printf
(
"
[Follow]
Executing poll for %v(%s) range: %s-%s(%s) from %s"
,
identity
.
EphId
.
Int64
(),
identity
.
Source
,
identity
.
StartValid
,
identity
.
EphId
.
Int64
(),
identity
.
Source
,
identity
.
StartValid
,
identity
.
EndValid
,
identity
.
EndValid
.
Sub
(
identity
.
StartValid
),
identity
.
EndValid
,
identity
.
EndValid
.
Sub
(
identity
.
StartValid
),
host
.
GetId
())
host
.
GetId
())
...
@@ -233,7 +233,7 @@ func (c *client) follow(identity receptionID.IdentityUse,
...
@@ -233,7 +233,7 @@ func (c *client) follow(identity receptionID.IdentityUse,
}
}
errMsg
:=
fmt
.
Sprintf
(
"Unable to poll gateway: %+v"
,
err
)
errMsg
:=
fmt
.
Sprintf
(
"Unable to poll gateway: %+v"
,
err
)
c
.
events
.
Report
(
10
,
"Polling"
,
"Error"
,
errMsg
)
c
.
events
.
Report
(
10
,
"Polling"
,
"Error"
,
errMsg
)
jww
.
ERROR
.
Print
(
errMsg
)
jww
.
ERROR
.
Print
(
"[Follow] "
+
errMsg
)
return
return
}
}
...
@@ -248,7 +248,7 @@ func (c *client) follow(identity receptionID.IdentityUse,
...
@@ -248,7 +248,7 @@ func (c *client) follow(identity receptionID.IdentityUse,
gwRoundsState
:=
&
knownRounds
.
KnownRounds
{}
gwRoundsState
:=
&
knownRounds
.
KnownRounds
{}
err
=
gwRoundsState
.
Unmarshal
(
pollResp
.
KnownRounds
)
err
=
gwRoundsState
.
Unmarshal
(
pollResp
.
KnownRounds
)
if
err
!=
nil
{
if
err
!=
nil
{
jww
.
ERROR
.
Printf
(
"Failed to unmarshal: %+v"
,
err
)
jww
.
ERROR
.
Printf
(
"
[Follow]
Failed to unmarshal: %+v"
,
err
)
return
return
}
}
...
@@ -258,7 +258,7 @@ func (c *client) follow(identity receptionID.IdentityUse,
...
@@ -258,7 +258,7 @@ func (c *client) follow(identity receptionID.IdentityUse,
if
pollResp
.
PartialNDF
!=
nil
{
if
pollResp
.
PartialNDF
!=
nil
{
err
=
c
.
instance
.
UpdatePartialNdf
(
pollResp
.
PartialNDF
)
err
=
c
.
instance
.
UpdatePartialNdf
(
pollResp
.
PartialNDF
)
if
err
!=
nil
{
if
err
!=
nil
{
jww
.
ERROR
.
Printf
(
"Unable to update partial NDF: %+v"
,
err
)
jww
.
ERROR
.
Printf
(
"
[Follow]
Unable to update partial NDF: %+v"
,
err
)
return
return
}
}
...
@@ -302,7 +302,7 @@ func (c *client) follow(identity receptionID.IdentityUse,
...
@@ -302,7 +302,7 @@ func (c *client) follow(identity receptionID.IdentityUse,
// Obtain relevant NodeGateway information
// Obtain relevant NodeGateway information
nid
,
err
:=
id
.
Unmarshal
(
clientErr
.
Source
)
nid
,
err
:=
id
.
Unmarshal
(
clientErr
.
Source
)
if
err
!=
nil
{
if
err
!=
nil
{
jww
.
ERROR
.
Printf
(
"Unable to get NodeID: %+v"
,
err
)
jww
.
ERROR
.
Printf
(
"
[Follow]
Unable to get NodeID: %+v"
,
err
)
return
return
}
}
...
@@ -322,7 +322,7 @@ func (c *client) follow(identity receptionID.IdentityUse,
...
@@ -322,7 +322,7 @@ func (c *client) follow(identity receptionID.IdentityUse,
// with ClientErrors
// with ClientErrors
err
=
c
.
instance
.
RoundUpdates
(
pollResp
.
Updates
)
err
=
c
.
instance
.
RoundUpdates
(
pollResp
.
Updates
)
if
err
!=
nil
{
if
err
!=
nil
{
jww
.
ERROR
.
Printf
(
"%+v"
,
err
)
jww
.
ERROR
.
Printf
(
"
[Follow]
%+v"
,
err
)
return
return
}
}
...
@@ -344,12 +344,12 @@ func (c *client) follow(identity receptionID.IdentityUse,
...
@@ -344,12 +344,12 @@ func (c *client) follow(identity receptionID.IdentityUse,
// ---- Identity Specific Round Processing -----
// ---- Identity Specific Round Processing -----
if
identity
.
Fake
{
if
identity
.
Fake
{
jww
.
DEBUG
.
Printf
(
"Not processing result, identity.Fake == true"
)
jww
.
DEBUG
.
Printf
(
"
[Follow]
Not processing result, identity.Fake == true"
)
return
return
}
}
if
len
(
pollResp
.
Filters
.
Filters
)
==
0
{
if
len
(
pollResp
.
Filters
.
Filters
)
==
0
{
jww
.
TRACE
.
Printf
(
"No filters found for the passed ID %d (%s), "
+
jww
.
TRACE
.
Printf
(
"
[Follow]
No filters found for the passed ID %d (%s), "
+
"skipping processing."
,
identity
.
EphId
.
Int64
(),
identity
.
Source
)
"skipping processing."
,
identity
.
EphId
.
Int64
(),
identity
.
Source
)
return
return
}
}
...
@@ -367,7 +367,6 @@ func (c *client) follow(identity receptionID.IdentityUse,
...
@@ -367,7 +367,6 @@ func (c *client) follow(identity receptionID.IdentityUse,
// are messages waiting in rounds and then sends signals to the appropriate
// are messages waiting in rounds and then sends signals to the appropriate
// handling threads
// handling threads
roundChecker
:=
func
(
rid
id
.
Round
)
bool
{
roundChecker
:=
func
(
rid
id
.
Round
)
bool
{
// IMPORTANT: DO NOT PUT LOGS HERE, IT RUNS TOO OFTEN AND WILL BREAK
hasMessage
:=
Checker
(
rid
,
filterList
,
identity
.
CR
)
hasMessage
:=
Checker
(
rid
,
filterList
,
identity
.
CR
)
if
!
hasMessage
&&
c
.
verboseRounds
!=
nil
{
if
!
hasMessage
&&
c
.
verboseRounds
!=
nil
{
c
.
verboseRounds
.
denote
(
rid
,
RoundState
(
NoMessageAvailable
))
c
.
verboseRounds
.
denote
(
rid
,
RoundState
(
NoMessageAvailable
))
...
@@ -389,8 +388,7 @@ func (c *client) follow(identity receptionID.IdentityUse,
...
@@ -389,8 +388,7 @@ func (c *client) follow(identity receptionID.IdentityUse,
// received on this ID by using an estimate of how many rounds the
// received on this ID by using an estimate of how many rounds the
// network runs per second
// network runs per second
timeSinceStartValid := netTime.Now().Sub(identity.StartValid)
timeSinceStartValid := netTime.Now().Sub(identity.StartValid)
roundsDelta :=
roundsDelta := uint((timeSinceStartValid / time.Second).Seconds() * estimatedRoundsPerSecond)
uint(timeSinceStartValid / time.Second * estimatedRoundsPerSecond)
if roundsDelta < c.param.KnownRoundsThreshold {
if roundsDelta < c.param.KnownRoundsThreshold {
roundsDelta = c.param.KnownRoundsThreshold
roundsDelta = c.param.KnownRoundsThreshold
}
}
...
@@ -398,9 +396,13 @@ func (c *client) follow(identity receptionID.IdentityUse,
...
@@ -398,9 +396,13 @@ func (c *client) follow(identity receptionID.IdentityUse,
if id.Round(roundsDelta) > lastCheckedRound {
if id.Round(roundsDelta) > lastCheckedRound {
// Handles edge case for new networks to prevent starting at
// Handles edge case for new networks to prevent starting at
// negative rounds
// negative rounds
jww.WARN.Printf("[Follow] roundsDelta(%d) > lastCheckedRound(%d)",
roundsDelta, lastCheckedRound)
updatedEarliestRound = 1
updatedEarliestRound = 1
} else {
} else {
updatedEarliestRound = lastCheckedRound - id.Round(roundsDelta)
updatedEarliestRound = lastCheckedRound - id.Round(roundsDelta)
jww.TRACE.Printf("[Follow] UpdatedEarliestRound (%d) set to %d - %d",
updatedEarliestRound, lastCheckedRound, id.Round(roundsDelta))
earliestFilterRound := filterList[0].FirstRound() // Length of filterList always > 0
earliestFilterRound := filterList[0].FirstRound() // Length of filterList always > 0
// If the network appears to be moving faster than our estimate,
// If the network appears to be moving faster than our estimate,
...
@@ -409,6 +411,7 @@ func (c *client) follow(identity receptionID.IdentityUse,
...
@@ -409,6 +411,7 @@ func (c *client) follow(identity receptionID.IdentityUse,
// as long as contacted gateway has all data
// as long as contacted gateway has all data
if updatedEarliestRound > earliestFilterRound {
if updatedEarliestRound > earliestFilterRound {
updatedEarliestRound = earliestFilterRound
updatedEarliestRound = earliestFilterRound
jww.TRACE.Printf("[Follow] updatedEarliestRound set to earliestFilterRound (%d)", earliestFilterRound)
}
}
}
}
identity.ER.Set(updatedEarliestRound)
identity.ER.Set(updatedEarliestRound)
...
@@ -423,7 +426,7 @@ func (c *client) follow(identity receptionID.IdentityUse,
...
@@ -423,7 +426,7 @@ func (c *client) follow(identity receptionID.IdentityUse,
gwRoundsState
.
RangeUnchecked
(
gwRoundsState
.
RangeUnchecked
(
updatedEarliestRound
,
c
.
param
.
KnownRoundsThreshold
,
roundChecker
,
100
)
updatedEarliestRound
,
c
.
param
.
KnownRoundsThreshold
,
roundChecker
,
100
)
jww
.
DEBUG
.
Printf
(
"Processed RangeUnchecked for %d, Oldest: %d, "
+
jww
.
DEBUG
.
Printf
(
"
[Follow]
Processed RangeUnchecked for %d, Oldest: %d, "
+
"firstUnchecked: %d, last Checked: %d, threshold: %d, "
+
"firstUnchecked: %d, last Checked: %d, threshold: %d, "
+
"NewEarliestRemaining: %d, NumWithMessages: %d, NumUnknown: %d"
,
"NewEarliestRemaining: %d, NumWithMessages: %d, NumUnknown: %d"
,
identity
.
EphId
.
Int64
(),
updatedEarliestRound
,
gwRoundsState
.
GetFirstUnchecked
(),
identity
.
EphId
.
Int64
(),
updatedEarliestRound
,
gwRoundsState
.
GetFirstUnchecked
(),
...
@@ -432,9 +435,9 @@ func (c *client) follow(identity receptionID.IdentityUse,
...
@@ -432,9 +435,9 @@ func (c *client) follow(identity receptionID.IdentityUse,
_
,
_
,
changed
:=
identity
.
ER
.
Set
(
earliestRemaining
)
_
,
_
,
changed
:=
identity
.
ER
.
Set
(
earliestRemaining
)
if
changed
{
if
changed
{
jww
.
TRACE
.
Printf
(
"External returns of RangeUnchecked: %d, %v, %v"
,
jww
.
DEBUG
.
Printf
(
"
[Follow]
External returns of RangeUnchecked: %d, %v, %v"
,
earliestRemaining
,
roundsWithMessages
,
roundsUnknown
)
earliestRemaining
,
roundsWithMessages
,
roundsUnknown
)
jww
.
DEBUG
.
Printf
(
"New Earliest Remaining: %d, Gateways last checked: %d"
,
jww
.
DEBUG
.
Printf
(
"
[Follow]
New Earliest Remaining: %d, Gateways last checked: %d"
,
earliestRemaining
,
gwRoundsState
.
GetLastChecked
())
earliestRemaining
,
gwRoundsState
.
GetLastChecked
())
}
}
...
@@ -458,7 +461,7 @@ func (c *client) follow(identity receptionID.IdentityUse,
...
@@ -458,7 +461,7 @@ func (c *client) follow(identity receptionID.IdentityUse,
identity
.
CR
.
Prune
()
identity
.
CR
.
Prune
()
err
=
identity
.
CR
.
SaveCheckedRounds
()
err
=
identity
.
CR
.
SaveCheckedRounds
()
if
err
!=
nil
{
if
err
!=
nil
{
jww
.
ERROR
.
Printf
(
"Could not save rounds for identity %d (%s): %+v"
,
jww
.
ERROR
.
Printf
(
"
[Follow]
Could not save rounds for identity %d (%s): %+v"
,
identity
.
EphId
.
Int64
(),
identity
.
Source
,
err
)
identity
.
EphId
.
Int64
(),
identity
.
Source
,
err
)
}
}
...
@@ -473,7 +476,7 @@ func (c *client) follow(identity receptionID.IdentityUse,
...
@@ -473,7 +476,7 @@ func (c *client) follow(identity receptionID.IdentityUse,
trackingStart
=
earliestRemaining
-
id
.
Round
(
c
.
param
.
KnownRoundsThreshold
)
trackingStart
=
earliestRemaining
-
id
.
Round
(
c
.
param
.
KnownRoundsThreshold
)
}
}
jww
.
DEBUG
.
Printf
(
"Rounds tracked: %v to %v"
,
trackingStart
,
earliestRemaining
)
jww
.
DEBUG
.
Printf
(
"
[Follow]
Rounds tracked: %v to %v"
,
trackingStart
,
earliestRemaining
)
for
i
:=
trackingStart
;
i
<=
earliestRemaining
;
i
++
{
for
i
:=
trackingStart
;
i
<=
earliestRemaining
;
i
++
{
state
:=
Unchecked
state
:=
Unchecked
...
...
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment