Skip to content
Snippets Groups Projects
Commit 17962663 authored by Jake Taylor's avatar Jake Taylor
Browse files

Made contexts require timeout, added default timeout to hostparams

parent bfe43343
No related branches found
No related tags found
No related merge requests found
......@@ -44,7 +44,7 @@ func (c *ProtoComms) clientHandshake(host *Host) (err error) {
// Set up the context
client := pb.NewGenericClient(host.connection)
ctx, cancel := MessagingContext()
ctx, cancel := MessagingContext(host.GetSendTimeout())
defer cancel()
// Send the token request message
......@@ -75,7 +75,7 @@ func (c *ProtoComms) clientHandshake(host *Host) (err error) {
}
// Set up the context
ctx, cancel = MessagingContext()
ctx, cancel = MessagingContext(host.GetSendTimeout())
defer cancel()
// Send the authenticate token message
......
......@@ -17,23 +17,15 @@ import (
"time"
)
// Used for creating connections with the default timeout
func ConnectionContext(waitingPeriod time.Duration) (context.Context, context.CancelFunc) {
// MessagingContext builds a context for connections and message sending with the given timeout
func MessagingContext(waitingPeriod time.Duration) (context.Context, context.CancelFunc) {
jww.DEBUG.Printf("Timing out in: %s", waitingPeriod)
ctx, cancel := context.WithTimeout(context.Background(),
waitingPeriod)
return ctx, cancel
}
// Used for sending messages with the default timeout
func MessagingContext() (context.Context, context.CancelFunc) {
ctx, cancel := context.WithTimeout(context.Background(),
2*time.Minute)
return ctx, cancel
}
// Creates a context object with the default context
// StreamingContext Creates a context object with the default context
// for all client streaming messages. This is primarily used to
// allow a cancel option for clients and is suitable for unary streaming.
func StreamingContext() (context.Context, context.CancelFunc) {
......@@ -41,7 +33,7 @@ func StreamingContext() (context.Context, context.CancelFunc) {
return ctx, cancel
}
// Obtain address:port from the context of an incoming communication
// GetAddressFromContext obtains address:port from the context of an incoming communication
func GetAddressFromContext(ctx context.Context) (address string, port string, err error) {
info, _ := peer.FromContext(ctx)
address, port, err = net.SplitHostPort(info.Addr.String())
......
......@@ -40,7 +40,7 @@ var KaClientOpts = keepalive.ClientParameters{
Time: infinityTime,
// 60s after ping before closing
Timeout: 60 * time.Second,
// For all connections, streaming and nonstreaming
// For all connections, with and without streaming
PermitWithoutStream: true,
}
......@@ -167,6 +167,11 @@ func (h *Host) Connected() (bool, uint64) {
return h.isAlive() && !h.authenticationRequired(), h.connectionCount
}
// GetSendTimeout returns the timeout for message sending
func (h *Host) GetSendTimeout() time.Duration {
return h.params.SendTimeout
}
// GetId returns the id of the host
func (h *Host) GetId() *id.ID {
if h == nil {
......@@ -370,7 +375,7 @@ func (h *Host) connectHelper() (err error) {
if backoffTime > 15000 {
backoffTime = 15000
}
ctx, cancel := ConnectionContext(time.Duration(backoffTime) * time.Millisecond)
ctx, cancel := MessagingContext(time.Duration(backoffTime) * time.Millisecond)
// Create the connection
h.connection, err = grpc.DialContext(ctx, h.GetAddress(),
......
......@@ -11,12 +11,12 @@ import (
"time"
)
// Params object for host creation
// HostParams is the configuration object for Host creation
type HostParams struct {
MaxRetries uint32
AuthEnabled bool
// Toggles cool off of connections
// Toggles connection cool off
EnableCoolOff bool
// Number of leaky bucket sends before it stops
......@@ -25,15 +25,18 @@ type HostParams struct {
// Amount of time after a cool off is triggered before allowed to send again
CoolOffTimeout time.Duration
// Message sending timeout
SendTimeout time.Duration
// If set, metric handling will be enabled on this host
EnableMetrics bool
// List of sending errors that are deemed unimportant
// Reception of these errors will not update the Metric's state
// Reception of these errors will not update the Metric state
ExcludeMetricErrors []string
}
// Get default set of host params
// GetDefaultHostParams Get default set of host params
func GetDefaultHostParams() HostParams {
return HostParams{
MaxRetries: 100,
......@@ -41,6 +44,7 @@ func GetDefaultHostParams() HostParams {
EnableCoolOff: false,
NumSendsBeforeCoolOff: 3,
CoolOffTimeout: 60 * time.Second,
SendTimeout: 2 * time.Minute,
EnableMetrics: false,
ExcludeMetricErrors: make([]string, 0),
}
......
......@@ -20,13 +20,13 @@ import (
)
// CMixServer -> consensus node Send Function
func (s *CMixServer) GetNdf(host *connect.Host,
func (c *CMixServer) GetNdf(host *connect.Host,
message *messages.Ping) (*NDF, error) {
// Create the Send Function
f := func(conn *grpc.ClientConn) (*any.Any, error) {
// Set up the context
ctx, cancel := connect.MessagingContext()
ctx, cancel := connect.MessagingContext(host.GetSendTimeout())
defer cancel()
//Format to authenticated message type
// Send the message
......@@ -40,7 +40,7 @@ func (s *CMixServer) GetNdf(host *connect.Host,
// Execute the Send function
jww.DEBUG.Printf("Sending Post Phase message: %+v", message)
resultMsg, err := s.Send(host, f)
resultMsg, err := c.Send(host, f)
if err != nil {
return nil, err
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment