Skip to content
Snippets Groups Projects
Commit 74305fe7 authored by Benjamin Wenger's avatar Benjamin Wenger
Browse files

rewrite of host manager to a seperatly rw muxed map and to allow it to handle...

rewrite of host manager to a seperatly rw muxed map and to allow it to handle re creation of hosts apropreatly
parent 45eb18f5
No related branches found
No related tags found
No related merge requests found
......@@ -60,7 +60,7 @@ func TestSignVerify(t *testing.T) {
func TestProtoComms_AuthenticatedReceiver(t *testing.T) {
// Create comm object
pc := ProtoComms{
Manager: Manager{},
Manager: newManager(),
tokens: token.NewMap(),
LocalServer: nil,
ListeningAddr: "",
......@@ -110,7 +110,7 @@ func TestProtoComms_AuthenticatedReceiver(t *testing.T) {
func TestProtoComms_AuthenticatedReceiver_BadId(t *testing.T) {
// Create comm object
pc := ProtoComms{
Manager: Manager{},
Manager: newManager(),
tokens: token.NewMap(),
LocalServer: nil,
ListeningAddr: "",
......@@ -142,13 +142,12 @@ func TestProtoComms_AuthenticatedReceiver_BadId(t *testing.T) {
}
// Try the authenticated received
_, err = pc.AuthenticatedReceiver(msg)
if err != nil {
return
}
a, _ := pc.AuthenticatedReceiver(msg)
if a.IsAuthenticated {
t.Errorf("Expected error path!"+
"Should not be able to marshal a message with id: %v", badId)
}
}
......@@ -227,6 +226,7 @@ func TestProtoComms_ValidateToken(t *testing.T) {
ListeningAddr: "",
privateKey: nil,
tokens: token.NewMap(),
Manager: newManager(),
}
err := comm.setPrivateKey(testkeys.LoadFromPath(testkeys.GetNodeKeyPath()))
if err != nil {
......@@ -280,6 +280,7 @@ func TestProtoComms_ValidateToken_BadId(t *testing.T) {
ListeningAddr: "",
privateKey: nil,
tokens: token.NewMap(),
Manager: newManager(),
}
err := comm.setPrivateKey(testkeys.LoadFromPath(testkeys.GetNodeKeyPath()))
if err != nil {
......@@ -348,6 +349,7 @@ func TestProtoComms_ValidateTokenDynamic(t *testing.T) {
Id: uid,
ListeningAddr: "",
tokens: token.NewMap(),
Manager: newManager(),
}
err = comm.setPrivateKey(rsa.CreatePrivateKeyPem(privKey))
if err != nil {
......
......@@ -56,7 +56,7 @@ var MaxConcurrentStreams = uint32(250000)
// Proto object containing a gRPC server
type ProtoComms struct {
// Inherit the Manager object
Manager
*Manager
// The network ID of this comms server
Id *id.ID
......@@ -98,6 +98,7 @@ func CreateCommClient(id *id.ID, pubKeyPem, privKeyPem,
pubKeyPem: pubKeyPem,
salt: salt,
tokens: token.NewMap(),
Manager: newManager(),
}
// Set the private key if specified
......@@ -119,6 +120,7 @@ func StartCommServer(id *id.ID, localServer string, certPEMblock,
Id: id,
ListeningAddr: localServer,
tokens: token.NewMap(),
Manager: newManager(),
}
listen:
......
......@@ -15,7 +15,7 @@ import (
)
func TestHost_address(t *testing.T) {
var mgr Manager
mgr := newManager()
testId := id.NewIdFromString("test", id.Node, t)
testAddress := "test"
host, err := mgr.AddHost(testId, testAddress, nil, false, false)
......
......@@ -21,61 +21,77 @@ import (
// to Host objects for top-level libraries
type Manager struct {
// A map of id.IDs to Hosts
connections sync.Map
connections map[id.ID]*Host
mux sync.RWMutex
}
func newManager() *Manager {
return &Manager{
connections: make(map[id.ID]*Host),
mux: sync.RWMutex{},
}
}
// Fetch a Host from the internal map
func (m *Manager) GetHost(hostId *id.ID) (*Host, bool) {
value, ok := m.connections.Load(*hostId)
m.mux.RLock()
defer m.mux.RUnlock()
host, ok := m.connections[*hostId]
if !ok {
return nil, false
}
host, ok := value.(*Host)
return host, ok
}
// Creates and adds a Host object to the Manager using the given id
func (m *Manager) AddHost(id *id.ID, address string,
func (m *Manager) AddHost(hid *id.ID, address string,
cert []byte, disableTimeout, enableAuth bool) (host *Host, err error) {
m.mux.Lock()
defer m.mux.Unlock()
host, err = NewHost(id, address, cert, disableTimeout, enableAuth)
//check if the host already exists, if it does return it
host, ok := m.connections[*hid]
if ok {
return host, nil
}
//create the new host
host, err = NewHost(hid, address, cert, disableTimeout, enableAuth)
if err != nil {
return nil, err
}
//add the host to the map
m.addHost(host)
return
}
// Removes a host from the connection manager
func (m *Manager) RemoveHost(id *id.ID) {
jww.DEBUG.Printf("Removing host: %v", id)
m.connections.Delete(*id)
return host, nil
}
// Internal helper function that can add Hosts directly
func (m *Manager) addHost(host *Host) {
jww.DEBUG.Printf("Adding host: %s", host)
m.connections.Store(*host.id, host)
m.connections[*(host.id)] = host
}
// Removes a host from the connection manager
func (m *Manager) RemoveHost(hid *id.ID) {
m.mux.Lock()
defer m.mux.Unlock()
delete(m.connections, *hid)
}
// Closes all client connections and removes them from Manager
func (m *Manager) DisconnectAll() {
m.connections.Range(func(key interface{}, value interface{}) bool {
value.(*Host).Disconnect()
return true
})
for _, host := range m.connections {
host.Disconnect()
}
}
// Implements Stringer for debug printing
func (m *Manager) String() string {
var result bytes.Buffer
m.connections.Range(func(key interface{}, value interface{}) bool {
k := key.(id.ID)
for k, host := range m.connections {
result.WriteString(fmt.Sprintf("[%s]: %+v",
(&k).String(), value.(*Host)))
return true
})
(&k).String(), host))
}
return result.String()
}
......@@ -58,14 +58,14 @@ func TestConnectionManager_Disconnect(t *testing.T) {
test := 2
pass := 0
address := ServerAddress
var manager Manager
manager := newManager()
testId := id.NewIdFromString("testId", id.Node, t)
host, err := manager.AddHost(testId, address, nil, false, false)
if err != nil {
t.Errorf("Unable to call connnect: %+v", err)
}
_, inMap := manager.connections.Load(*testId)
_, inMap := manager.connections[*testId]
if !inMap {
t.Errorf("connect Function didn't add connection to map")
......@@ -96,7 +96,7 @@ func TestConnectionManager_DisconnectAll(t *testing.T) {
pass := 0
address := ServerAddress
address2 := ServerAddress2
var manager Manager
manager := newManager()
testId := id.NewIdFromString("testId", id.Generic, t)
testId2 := id.NewIdFromString("TestId2", id.Generic, t)
......@@ -127,7 +127,7 @@ func TestConnectionManager_DisconnectAll(t *testing.T) {
t.Errorf("Unable to call connnect: %+v", err)
}
_, inMap = manager.connections.Load(*testId2)
_, inMap = manager.connections[*testId2]
if !inMap {
t.Errorf("connect Function didn't add connection to map")
......@@ -152,7 +152,7 @@ func TestConnectionManager_DisconnectAll(t *testing.T) {
}
func TestConnectionManager_String(t *testing.T) {
var manager Manager
manager := newManager()
//t.Log(manager)
certPath := testkeys.GetNodeCertPath()
......@@ -170,7 +170,7 @@ func TestConnectionManager_String(t *testing.T) {
// Show that if a connection is in the map,
// it's no longer in the map after RemoveHost is called
func TestConnectionManager_RemoveHost(t *testing.T) {
var manager Manager
manager := newManager()
// After adding the host, the connection should be accessible
id := id.NewIdFromString("i am a connection", id.Gateway, t)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment