Skip to content
Snippets Groups Projects
Commit edf3a780 authored by Jake Taylor's avatar Jake Taylor :lips:
Browse files

Merge branch 'hotfix/KaOpts' into 'release'

Hotfix/ka opts

See merge request xx_network/comms!70
parents 191473de 9d26bfa9
No related branches found
No related tags found
1 merge request!11Release
...@@ -26,9 +26,11 @@ import ( ...@@ -26,9 +26,11 @@ import (
"time" "time"
) )
// TODO: Set these via config // MaxWindowSize 4 MB
const MaxWindowSize = math.MaxInt32
// KaOpts are Keepalive options for servers // KaOpts are Keepalive options for servers
// TODO: Set these via config
var KaOpts = keepalive.ServerParameters{ var KaOpts = keepalive.ServerParameters{
// Idle for at most 60s // Idle for at most 60s
MaxConnectionIdle: 60 * time.Second, MaxConnectionIdle: 60 * time.Second,
...@@ -36,16 +38,16 @@ var KaOpts = keepalive.ServerParameters{ ...@@ -36,16 +38,16 @@ var KaOpts = keepalive.ServerParameters{
MaxConnectionAge: 1 * time.Hour, MaxConnectionAge: 1 * time.Hour,
// w/ 1m grace shutdown // w/ 1m grace shutdown
MaxConnectionAgeGrace: 1 * time.Minute, MaxConnectionAgeGrace: 1 * time.Minute,
// Never ping to keepalive // Send keepAlive every Time interval
Time: infinityTime, Time: 5 * time.Second,
// Close connection 60 seconds after ping // Timeout after last successful keepAlive to close connection
Timeout: 60 * time.Second, Timeout: 60 * time.Second,
} }
// KaEnforcement are keepalive enforcement options for servers // KaEnforcement are keepalive enforcement options for servers
var KaEnforcement = keepalive.EnforcementPolicy{ var KaEnforcement = keepalive.EnforcementPolicy{
// Client should never send keep alive ping // Send keepAlive every Time interval
MinTime: infinityTime, MinTime: 3 * time.Second,
// Doing KA on non-streams is OK // Doing KA on non-streams is OK
PermitWithoutStream: true, PermitWithoutStream: true,
} }
......
...@@ -22,7 +22,6 @@ import ( ...@@ -22,7 +22,6 @@ import (
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/connectivity" "google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
"math" "math"
"net" "net"
"strings" "strings"
...@@ -32,22 +31,6 @@ import ( ...@@ -32,22 +31,6 @@ import (
"time" "time"
) )
const infinityTime = time.Duration(math.MaxInt64)
//4 MB
const MaxWindowSize = math.MaxInt32
// KaClientOpts are the keepalive options for clients
// TODO: Set via configuration
var KaClientOpts = keepalive.ClientParameters{
// Never ping to keepalive
Time: infinityTime,
// 60s after ping before closing
Timeout: 60 * time.Second,
// For all connections, with and without streaming
PermitWithoutStream: true,
}
// Information used to describe a connection to a host // Information used to describe a connection to a host
type Host struct { type Host struct {
// System-wide ID of the Host // System-wide ID of the Host
...@@ -396,7 +379,7 @@ func (h *Host) connectHelper() (err error) { ...@@ -396,7 +379,7 @@ func (h *Host) connectHelper() (err error) {
dialOpts := []grpc.DialOption{ dialOpts := []grpc.DialOption{
grpc.WithBlock(), grpc.WithBlock(),
grpc.WithKeepaliveParams(KaClientOpts), grpc.WithKeepaliveParams(h.params.KaClientOpts),
securityDial, securityDial,
} }
......
...@@ -8,6 +8,7 @@ ...@@ -8,6 +8,7 @@
package connect package connect
import ( import (
"google.golang.org/grpc/keepalive"
"time" "time"
) )
...@@ -34,6 +35,9 @@ type HostParams struct { ...@@ -34,6 +35,9 @@ type HostParams struct {
// List of sending errors that are deemed unimportant // List of sending errors that are deemed unimportant
// Reception of these errors will not update the Metric state // Reception of these errors will not update the Metric state
ExcludeMetricErrors []string ExcludeMetricErrors []string
// KeepAlive Options for Host connections
KaClientOpts keepalive.ClientParameters
} }
// GetDefaultHostParams Get default set of host params // GetDefaultHostParams Get default set of host params
...@@ -47,5 +51,13 @@ func GetDefaultHostParams() HostParams { ...@@ -47,5 +51,13 @@ func GetDefaultHostParams() HostParams {
SendTimeout: 2 * time.Minute, SendTimeout: 2 * time.Minute,
EnableMetrics: false, EnableMetrics: false,
ExcludeMetricErrors: make([]string, 0), ExcludeMetricErrors: make([]string, 0),
KaClientOpts: keepalive.ClientParameters{
// Send keepAlive every Time interval
Time: 5 * time.Second,
// Timeout after last successful keepAlive to close connection
Timeout: 60 * time.Second,
// For all connections, with and without streaming
PermitWithoutStream: true,
},
} }
} }
...@@ -16,6 +16,7 @@ import ( ...@@ -16,6 +16,7 @@ import (
"gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/id"
"sync" "sync"
"testing" "testing"
"time"
) )
// The Manager object provides thread-safe access // The Manager object provides thread-safe access
...@@ -103,12 +104,35 @@ func (m *Manager) DisconnectAll() { ...@@ -103,12 +104,35 @@ func (m *Manager) DisconnectAll() {
} }
} }
// StartConnectionReport begins intermittently printing connection information
func (m *Manager) StartConnectionReport() {
go func() {
ticker := time.NewTicker(60 * time.Second)
for {
select {
case _ = <-ticker.C:
jww.INFO.Printf(m.String())
}
}
}()
}
// Implements Stringer for debug printing // Implements Stringer for debug printing
func (m *Manager) String() string { func (m *Manager) String() string {
var result bytes.Buffer var result bytes.Buffer
i := uint32(0)
result.WriteString(fmt.Sprintf("Host Manager Connections\n"))
m.mux.RLock()
for k, host := range m.connections { for k, host := range m.connections {
result.WriteString(fmt.Sprintf("[%s]: %+v", isConnected, _ := host.Connected()
(&k).String(), host)) if isConnected {
i++
}
result.WriteString(fmt.Sprintf("[%s] IsConnected: %t\n",
(&k).String(), isConnected))
} }
m.mux.RUnlock()
result.WriteString(fmt.Sprintf("%d/%d Hosts connected", i, len(m.connections)))
return result.String() return result.String()
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment