diff --git a/connect/host.go b/connect/host.go index 38df1126ad3a77762035c6764b55299a0b646b61..f786293a5e892b690f15b3f2346480758906c59c 100644 --- a/connect/host.go +++ b/connect/host.go @@ -31,7 +31,7 @@ import ( "time" ) -// Information used to describe a connection to a host +// Host information used to describe a remote connection type Host struct { // System-wide ID of the Host id *id.ID @@ -53,6 +53,9 @@ type Host struct { // GRPC connection object connection *grpc.ClientConn connectionCount uint64 + // lock which ensures only a single thread is connecting at a time and + // that connections do not interrupt sends + connectionMux sync.RWMutex // TLS credentials object used to establish the connection credentials credentials.TransportCredentials @@ -63,10 +66,6 @@ type Host struct { // State tracking for host metric metrics *Metric - // lock which ensures only a single thread is connecting at a time and - // that connections do not interrupt sends - connectionMux sync.RWMutex - coolOffBucket *rateLimiting.Bucket inCoolOff bool @@ -78,7 +77,7 @@ type Host struct { windowSize *int32 } -// Creates a new Host object +// NewHost creates a new Host object func NewHost(id *id.ID, address string, cert []byte, params HostParams) (host *Host, err error) { windowSize := int32(0) @@ -108,16 +107,25 @@ func NewHost(id *id.ID, address string, cert []byte, params HostParams) (host *H // Configure the host credentials err = host.setCredentials() + if err != nil { + return + } + + // Connect immediately if configured to do so + if !params.LazyConnection { + // No mutex required + err = host.connect() + } return } -// the amount of data, when streaming, that a sender can send before receiving an ACK +// SetWindowSize sets the amount of data, when streaming, that a sender can send before receiving an ACK // keep at zero to use the default GRPC algorithm to determine func (h *Host) SetWindowSize(size int32) { atomic.StoreInt32(h.windowSize, size) } -// Simple getter for the public key +// GetPubKey simple getter for the public key func (h *Host) GetPubKey() *rsa.PublicKey { return h.rsaPublicKey } @@ -132,7 +140,7 @@ func (h *Host) Connected() (bool, uint64) { } // connectedUnsafe checks if the given Host's connection is alive without taking -// a connection lock. Only use if already under a connection lock. The the uint is +// a connection lock. Only use if already under a connection lock. The uint is //the connection count, it increments every time a reconnect occurs func (h *Host) connectedUnsafe() (bool, uint64) { return h.isAlive() && !h.authenticationRequired(), h.connectionCount @@ -143,7 +151,7 @@ func (h *Host) GetMessagingContext() (context.Context, context.CancelFunc) { return h.GetMessagingContextWithTimeout(h.params.SendTimeout) } -// GetMessagingContext returns a context object for message sending configured according to HostParams +// GetMessagingContextWithTimeout returns a context object for message sending configured according to HostParams func (h *Host) GetMessagingContextWithTimeout( timeout time.Duration) (context.Context, context.CancelFunc) { return newContext(timeout) @@ -189,7 +197,7 @@ func (h *Host) isExcludedMetricError(err string) bool { return false } -// Sets the host metrics to an arbitrary value. Used for testing +// SetMetricsTesting sets the host metrics to an arbitrary value. Used for testing // purposes only func (h *Host) SetMetricsTesting(m *Metric, face interface{}) { // Ensure that this function is only run in testing environments @@ -204,17 +212,17 @@ func (h *Host) SetMetricsTesting(m *Metric, face interface{}) { } -// Disconnect closes a the Host connection under the write lock -// Due to asynchorous connection handling, this may result in +// Disconnect closes the Host connection under the write lock +// Due to asynchronous connection handling, this may result in // killing a good connection and could result in an immediate -// reconnection by a seperate thread +// reconnection by a separate thread func (h *Host) Disconnect() { h.connectionMux.Lock() defer h.connectionMux.Unlock() h.disconnect() } -// ConditionalDisconnect closes a the Host connection under the write lock only +// ConditionalDisconnect closes the Host connection under the write lock only // if the connection count has not increased func (h *Host) conditionalDisconnect(count uint64) { if count == h.connectionCount { @@ -222,7 +230,7 @@ func (h *Host) conditionalDisconnect(count uint64) { } } -// Returns whether or not the Host is able to be contacted +// IsOnline returns whether the Host is able to be contacted // before the timeout by attempting to dial a tcp connection // Returns how long the ping took, and whether it was successful func (h *Host) IsOnline() (time.Duration, bool) { @@ -281,7 +289,7 @@ func (h *Host) connect() error { } // authenticationRequired Checks if new authentication is required with -// the remote. This is used exclusively under the lock in protocoms.transmit so +// the remote. This is used exclusively under the lock in protocomm.transmit so // no lock is needed func (h *Host) authenticationRequired() bool { return h.params.AuthEnabled && !h.transmissionToken.Has() @@ -298,10 +306,10 @@ func (h *Host) isAlive() bool { state == connectivity.Ready } -// disconnect closes a the Host connection while not under a write lock. +// disconnect closes the Host connection while not under a write lock. // undefined behavior if the caller has not taken the write lock func (h *Host) disconnect() { - // its possible to close a host which never sent so it never made a + // it's possible to close a host which never sent so that it never made a // connection. In that case, we should not close a connection which does not // exist if h.connection != nil { @@ -442,7 +450,7 @@ func (h *Host) String() string { h.id, addr) } -// Stringer interface for connection +// StringVerbose stringer interface for connection func (h *Host) StringVerbose() string { return fmt.Sprintf("%s\t CERTIFICATE: %s", h, h.certificate) } diff --git a/connect/hostParams.go b/connect/hostParams.go index 013c719b1e6a607bba91a8300f8c52b80eb872a5..18413425323a05422ba9048a8ed793739126dc83 100644 --- a/connect/hostParams.go +++ b/connect/hostParams.go @@ -41,6 +41,10 @@ type HostParams struct { // If set, metric handling will be enabled on this host EnableMetrics bool + // If true, a connection will only be established when a comm is sent + // else, a connection will be established immediately upon host creation + LazyConnection bool + // List of sending errors that are deemed unimportant // Reception of these errors will not update the Metric state ExcludeMetricErrors []string @@ -61,6 +65,7 @@ func GetDefaultHostParams() HostParams { SendTimeout: 2 * time.Minute, PingTimeout: 5 * time.Second, EnableMetrics: false, + LazyConnection: true, ExcludeMetricErrors: make([]string, 0), KaClientOpts: keepalive.ClientParameters{ // Send keepAlive every Time interval diff --git a/connect/transmit.go b/connect/transmit.go index de1a7747b640c57c71dc5fe2495fbc071c2aab78..307d7ca5ea7a89f7259304f5b596fa6dc748fc8f 100644 --- a/connect/transmit.go +++ b/connect/transmit.go @@ -88,7 +88,7 @@ func (c *ProtoComms) connect(host *Host, count uint64) (uint64, error) { } //if the connection is alive return, it is possible for another transmission - //to connect between releasing the read lock and taking the write lick + //to connect between releasing the read lock and taking the write lock if !host.isAlive() { //connect to host jww.INFO.Printf("Host %s not connected, attempting to connect...",