Skip to content
Snippets Groups Projects
Commit 101ed7a9 authored by Jake Taylor's avatar Jake Taylor
Browse files

add configurable flag for lazy connections to hostParams.go

parent 7c497413
No related branches found
No related tags found
2 merge requests!39Merge release into master,!27add configurable flag for lazy connections to hostParams.go
......@@ -31,7 +31,7 @@ import (
"time"
)
// Information used to describe a connection to a host
// Host information used to describe a remote connection
type Host struct {
// System-wide ID of the Host
id *id.ID
......@@ -53,6 +53,9 @@ type Host struct {
// GRPC connection object
connection *grpc.ClientConn
connectionCount uint64
// lock which ensures only a single thread is connecting at a time and
// that connections do not interrupt sends
connectionMux sync.RWMutex
// TLS credentials object used to establish the connection
credentials credentials.TransportCredentials
......@@ -63,10 +66,6 @@ type Host struct {
// State tracking for host metric
metrics *Metric
// lock which ensures only a single thread is connecting at a time and
// that connections do not interrupt sends
connectionMux sync.RWMutex
coolOffBucket *rateLimiting.Bucket
inCoolOff bool
......@@ -78,7 +77,7 @@ type Host struct {
windowSize *int32
}
// Creates a new Host object
// NewHost creates a new Host object
func NewHost(id *id.ID, address string, cert []byte, params HostParams) (host *Host, err error) {
windowSize := int32(0)
......@@ -108,16 +107,25 @@ func NewHost(id *id.ID, address string, cert []byte, params HostParams) (host *H
// Configure the host credentials
err = host.setCredentials()
if err != nil {
return
}
// the amount of data, when streaming, that a sender can send before receiving an ACK
// Connect immediately if configured to do so
if !params.LazyConnection {
// No mutex required
err = host.connect()
}
return
}
// SetWindowSize sets the amount of data, when streaming, that a sender can send before receiving an ACK
// keep at zero to use the default GRPC algorithm to determine
func (h *Host) SetWindowSize(size int32) {
atomic.StoreInt32(h.windowSize, size)
}
// Simple getter for the public key
// GetPubKey simple getter for the public key
func (h *Host) GetPubKey() *rsa.PublicKey {
return h.rsaPublicKey
}
......@@ -132,7 +140,7 @@ func (h *Host) Connected() (bool, uint64) {
}
// connectedUnsafe checks if the given Host's connection is alive without taking
// a connection lock. Only use if already under a connection lock. The the uint is
// a connection lock. Only use if already under a connection lock. The uint is
//the connection count, it increments every time a reconnect occurs
func (h *Host) connectedUnsafe() (bool, uint64) {
return h.isAlive() && !h.authenticationRequired(), h.connectionCount
......@@ -143,7 +151,7 @@ func (h *Host) GetMessagingContext() (context.Context, context.CancelFunc) {
return h.GetMessagingContextWithTimeout(h.params.SendTimeout)
}
// GetMessagingContext returns a context object for message sending configured according to HostParams
// GetMessagingContextWithTimeout returns a context object for message sending configured according to HostParams
func (h *Host) GetMessagingContextWithTimeout(
timeout time.Duration) (context.Context, context.CancelFunc) {
return newContext(timeout)
......@@ -189,7 +197,7 @@ func (h *Host) isExcludedMetricError(err string) bool {
return false
}
// Sets the host metrics to an arbitrary value. Used for testing
// SetMetricsTesting sets the host metrics to an arbitrary value. Used for testing
// purposes only
func (h *Host) SetMetricsTesting(m *Metric, face interface{}) {
// Ensure that this function is only run in testing environments
......@@ -204,17 +212,17 @@ func (h *Host) SetMetricsTesting(m *Metric, face interface{}) {
}
// Disconnect closes a the Host connection under the write lock
// Due to asynchorous connection handling, this may result in
// Disconnect closes the Host connection under the write lock
// Due to asynchronous connection handling, this may result in
// killing a good connection and could result in an immediate
// reconnection by a seperate thread
// reconnection by a separate thread
func (h *Host) Disconnect() {
h.connectionMux.Lock()
defer h.connectionMux.Unlock()
h.disconnect()
}
// ConditionalDisconnect closes a the Host connection under the write lock only
// ConditionalDisconnect closes the Host connection under the write lock only
// if the connection count has not increased
func (h *Host) conditionalDisconnect(count uint64) {
if count == h.connectionCount {
......@@ -222,7 +230,7 @@ func (h *Host) conditionalDisconnect(count uint64) {
}
}
// Returns whether or not the Host is able to be contacted
// IsOnline returns whether the Host is able to be contacted
// before the timeout by attempting to dial a tcp connection
// Returns how long the ping took, and whether it was successful
func (h *Host) IsOnline() (time.Duration, bool) {
......@@ -281,7 +289,7 @@ func (h *Host) connect() error {
}
// authenticationRequired Checks if new authentication is required with
// the remote. This is used exclusively under the lock in protocoms.transmit so
// the remote. This is used exclusively under the lock in protocomm.transmit so
// no lock is needed
func (h *Host) authenticationRequired() bool {
return h.params.AuthEnabled && !h.transmissionToken.Has()
......@@ -298,10 +306,10 @@ func (h *Host) isAlive() bool {
state == connectivity.Ready
}
// disconnect closes a the Host connection while not under a write lock.
// disconnect closes the Host connection while not under a write lock.
// undefined behavior if the caller has not taken the write lock
func (h *Host) disconnect() {
// its possible to close a host which never sent so it never made a
// it's possible to close a host which never sent so that it never made a
// connection. In that case, we should not close a connection which does not
// exist
if h.connection != nil {
......@@ -442,7 +450,7 @@ func (h *Host) String() string {
h.id, addr)
}
// Stringer interface for connection
// StringVerbose stringer interface for connection
func (h *Host) StringVerbose() string {
return fmt.Sprintf("%s\t CERTIFICATE: %s", h, h.certificate)
}
......
......@@ -41,6 +41,10 @@ type HostParams struct {
// If set, metric handling will be enabled on this host
EnableMetrics bool
// If true, a connection will only be established when a comm is sent
// else, a connection will be established immediately upon host creation
LazyConnection bool
// List of sending errors that are deemed unimportant
// Reception of these errors will not update the Metric state
ExcludeMetricErrors []string
......@@ -61,6 +65,7 @@ func GetDefaultHostParams() HostParams {
SendTimeout: 2 * time.Minute,
PingTimeout: 5 * time.Second,
EnableMetrics: false,
LazyConnection: true,
ExcludeMetricErrors: make([]string, 0),
KaClientOpts: keepalive.ClientParameters{
// Send keepAlive every Time interval
......
......@@ -88,7 +88,7 @@ func (c *ProtoComms) connect(host *Host, count uint64) (uint64, error) {
}
//if the connection is alive return, it is possible for another transmission
//to connect between releasing the read lock and taking the write lick
//to connect between releasing the read lock and taking the write lock
if !host.isAlive() {
//connect to host
jww.INFO.Printf("Host %s not connected, attempting to connect...",
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment