diff --git a/api/utilsInterfaces_test.go b/api/utilsInterfaces_test.go
index 62ab16f5231d0bdc81ab30f8e0a1b890fb0931c7..259349681d4c3466fcf5af3bddc84be63215d076 100644
--- a/api/utilsInterfaces_test.go
+++ b/api/utilsInterfaces_test.go
@@ -125,3 +125,11 @@ func (t *testNetworkManagerGeneric) InProgressRegistrations() int {
func (t *testNetworkManagerGeneric) GetSender() *gateway.Sender {
return t.sender
}
+
+func (t *testNetworkManagerGeneric) GetAddressSize() uint8 { return 0 }
+
+func (t *testNetworkManagerGeneric) RegisterAddressSizeNotification(string) (chan uint8, error) {
+ return nil, nil
+}
+
+func (t *testNetworkManagerGeneric) UnregisterAddressSizeNotification(string) {}
diff --git a/cmd/root.go b/cmd/root.go
index 2e0ee849d36a0329e199e47dfbb13262ce6de027..6a8329f13d4f37c85dd2ac680d147168baffab06 100644
--- a/cmd/root.go
+++ b/cmd/root.go
@@ -525,7 +525,7 @@ func waitUntilConnected(connected chan bool) {
isConnected)
break
case <-timeoutTimer.C:
- jww.FATAL.Panic("timeout on connection")
+ jww.FATAL.Panicf("timeout on connection after %s", waitTimeout*time.Second)
}
}
diff --git a/go.mod b/go.mod
index 06ffdffc77ede3d213855d7619bbb69f14d36243..4f83d7b3d397a2973da761470650fc72f7dfb164 100644
--- a/go.mod
+++ b/go.mod
@@ -17,13 +17,13 @@ require (
github.com/spf13/jwalterweatherman v1.1.0
github.com/spf13/viper v1.7.1
gitlab.com/elixxir/bloomfilter v0.0.0-20200930191214-10e9ac31b228
- gitlab.com/elixxir/comms v0.0.4-0.20210524170509-89dd425cb228
- gitlab.com/elixxir/crypto v0.0.7-0.20210524170447-264b215ce90b
+ gitlab.com/elixxir/comms v0.0.4-0.20210526002834-a1bcf83670ce
+ gitlab.com/elixxir/crypto v0.0.7-0.20210526002540-1fb51df5b4b2
gitlab.com/elixxir/ekv v0.1.5
- gitlab.com/elixxir/primitives v0.0.3-0.20210524170524-9780695d2b55
- gitlab.com/xx_network/comms v0.0.4-0.20210524170426-175f698a7b07
- gitlab.com/xx_network/crypto v0.0.5-0.20210524170434-dc9a398a2581
- gitlab.com/xx_network/primitives v0.0.4-0.20210524170438-ab712af183db
+ gitlab.com/elixxir/primitives v0.0.3-0.20210526002350-b9c947fec050
+ gitlab.com/xx_network/comms v0.0.4-0.20210526002311-2b5a66af0eac
+ gitlab.com/xx_network/crypto v0.0.5-0.20210526002149-9c08ccb202be
+ gitlab.com/xx_network/primitives v0.0.4-0.20210525232109-3f99a04adcfd
golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110
golang.org/x/sys v0.0.0-20210403161142-5e06dd20ab57 // indirect
diff --git a/go.sum b/go.sum
index 84d7662f419e86ed2bb6c694e95e1d5a92332a4f..650c87ee12905dc29b08dee2e584788bf874e984 100644
--- a/go.sum
+++ b/go.sum
@@ -242,11 +242,15 @@ gitlab.com/elixxir/bloomfilter v0.0.0-20200930191214-10e9ac31b228 h1:Gi6rj4mAlK0
gitlab.com/elixxir/bloomfilter v0.0.0-20200930191214-10e9ac31b228/go.mod h1:H6jztdm0k+wEV2QGK/KYA+MY9nj9Zzatux/qIvDDv3k=
gitlab.com/elixxir/comms v0.0.4-0.20210524170509-89dd425cb228 h1:btpdRz3MDO651yuH698feDEoBFgOmR020xCrjKQuxSk=
gitlab.com/elixxir/comms v0.0.4-0.20210524170509-89dd425cb228/go.mod h1:IJ2l4MFEpYk5VdPdsbTlZYh3owurHlnvxPv2c+dJZAA=
+gitlab.com/elixxir/comms v0.0.4-0.20210526002834-a1bcf83670ce h1:aMaMqzBhLT3Ug3FhZlzwsb2MU7RFNKdOhC0l/pM8pZo=
+gitlab.com/elixxir/comms v0.0.4-0.20210526002834-a1bcf83670ce/go.mod h1:RyA3YVhyPRTxE+jjf6CV8elwQiFr8QuS54LcdLWw97E=
gitlab.com/elixxir/crypto v0.0.0-20200804182833-984246dea2c4/go.mod h1:ucm9SFKJo+K0N2GwRRpaNr+tKXMIOVWzmyUD0SbOu2c=
gitlab.com/elixxir/crypto v0.0.3/go.mod h1:ZNgBOblhYToR4m8tj4cMvJ9UsJAUKq+p0gCp07WQmhA=
gitlab.com/elixxir/crypto v0.0.7-0.20210521205349-cb0c5cdd44e3/go.mod h1:x6QKxPWIMH742i2p0IN47wkqcpTntwnMFLYcUfUeRfA=
gitlab.com/elixxir/crypto v0.0.7-0.20210524170447-264b215ce90b h1:gEgfJB6VUNNSFoB4TFHuz4pwUMerF3wLTbJnmeFmBas=
gitlab.com/elixxir/crypto v0.0.7-0.20210524170447-264b215ce90b/go.mod h1:x6QKxPWIMH742i2p0IN47wkqcpTntwnMFLYcUfUeRfA=
+gitlab.com/elixxir/crypto v0.0.7-0.20210526002540-1fb51df5b4b2 h1:vB50nscyNp1Qy/5HIINaG5Wj2cBH40Ur6D3uLpVVDpc=
+gitlab.com/elixxir/crypto v0.0.7-0.20210526002540-1fb51df5b4b2/go.mod h1:Zf+/0EwVPNcpenZHK5KEn70XQoONsoYTmOQmTtZBUUA=
gitlab.com/elixxir/ekv v0.1.5 h1:R8M1PA5zRU1HVnTyrtwybdABh7gUJSCvt1JZwUSeTzk=
gitlab.com/elixxir/ekv v0.1.5/go.mod h1:e6WPUt97taFZe5PFLPb1Dupk7tqmDCTQu1kkstqJvw4=
gitlab.com/elixxir/primitives v0.0.0-20200731184040-494269b53b4d/go.mod h1:OQgUZq7SjnE0b+8+iIAT2eqQF+2IFHn73tOo+aV11mg=
@@ -256,16 +260,22 @@ gitlab.com/elixxir/primitives v0.0.1/go.mod h1:kNp47yPqja2lHSiS4DddTvFpB/4D9dB2Y
gitlab.com/elixxir/primitives v0.0.3-0.20210521205228-746e9ff840fb/go.mod h1:iqQxR8Vl9+79pTRM4EG9/GDhJ+IBlMsA7sVepVSaOYA=
gitlab.com/elixxir/primitives v0.0.3-0.20210524170524-9780695d2b55 h1:Y/avAGl/IQjS56bKB+xodaieKAYR7ihUctZXsdlbI/s=
gitlab.com/elixxir/primitives v0.0.3-0.20210524170524-9780695d2b55/go.mod h1:iqQxR8Vl9+79pTRM4EG9/GDhJ+IBlMsA7sVepVSaOYA=
+gitlab.com/elixxir/primitives v0.0.3-0.20210526002350-b9c947fec050 h1:yPDOkhKZrLa1WBRkecx1uatDrltCLifr7I2gMsgTb0M=
+gitlab.com/elixxir/primitives v0.0.3-0.20210526002350-b9c947fec050/go.mod h1:WWANCivJyXYh5h61q7f9k6eDEvi9CZwmmM/EAximSk0=
gitlab.com/xx_network/comms v0.0.0-20200805174823-841427dd5023/go.mod h1:owEcxTRl7gsoM8c3RQ5KAm5GstxrJp5tn+6JfQ4z5Hw=
gitlab.com/xx_network/comms v0.0.4-0.20210521205156-5dbbf700c6c7/go.mod h1:HiwVwX6+N4yVpq8ZKAduVsGxK0CL604sYbuW8pQNcok=
gitlab.com/xx_network/comms v0.0.4-0.20210524170426-175f698a7b07 h1:GHBfA8dWTWVhemavUfE9egGd9nNEVt+P/TAT18cnWi8=
gitlab.com/xx_network/comms v0.0.4-0.20210524170426-175f698a7b07/go.mod h1:qxX3x7yCATvaK8hhFibl2Rnnb+xvLior/AJlx2dk1UM=
+gitlab.com/xx_network/comms v0.0.4-0.20210526002311-2b5a66af0eac h1:jGvh1HpdMOQ0CFlrSTwMTfjOM4puIWmgFMV69OtxIvc=
+gitlab.com/xx_network/comms v0.0.4-0.20210526002311-2b5a66af0eac/go.mod h1:RGc5f2KZz5YpcMKKL7vc4dDuOhE7RnDrHa1h3lERLtY=
gitlab.com/xx_network/crypto v0.0.3/go.mod h1:DF2HYvvCw9wkBybXcXAgQMzX+MiGbFPjwt3t17VRqRE=
gitlab.com/xx_network/crypto v0.0.4/go.mod h1:+lcQEy+Th4eswFgQDwT0EXKp4AXrlubxalwQFH5O0Mk=
gitlab.com/xx_network/crypto v0.0.5-0.20210517205543-4ae99cbb9063/go.mod h1:AOUw4RJfBrKDXbe9nY/8bM3ID1czcooefUbVd82zrCY=
gitlab.com/xx_network/crypto v0.0.5-0.20210521205053-9423260a7c0f/go.mod h1:yPo/IIAhFo0nWxd6ykDH+JLFjTCYO3GIw05+TcpaaWQ=
gitlab.com/xx_network/crypto v0.0.5-0.20210524170434-dc9a398a2581 h1:/p2NlxhPexJAXXEB5rajvwzRoFY9dbq2DcXV4t7TSZ4=
gitlab.com/xx_network/crypto v0.0.5-0.20210524170434-dc9a398a2581/go.mod h1:AOUw4RJfBrKDXbe9nY/8bM3ID1czcooefUbVd82zrCY=
+gitlab.com/xx_network/crypto v0.0.5-0.20210526002149-9c08ccb202be h1:mOoKScRRcKB6tf5jJGT+43YbgnpCYnvdcMQg2byNzK8=
+gitlab.com/xx_network/crypto v0.0.5-0.20210526002149-9c08ccb202be/go.mod h1:bOto0zsMIJz0hrvcMcEff6YFI0+tP8cmfV/5B060B34=
gitlab.com/xx_network/primitives v0.0.0-20200803231956-9b192c57ea7c/go.mod h1:wtdCMr7DPePz9qwctNoAUzZtbOSHSedcK++3Df3psjA=
gitlab.com/xx_network/primitives v0.0.0-20200804183002-f99f7a7284da/go.mod h1:OK9xevzWCaPO7b1wiluVJGk7R5ZsuC7pHY5hteZFQug=
gitlab.com/xx_network/primitives v0.0.2/go.mod h1:cs0QlFpdMDI6lAo61lDRH2JZz+3aVkHy+QogOB6F/qc=
@@ -273,6 +283,12 @@ gitlab.com/xx_network/primitives v0.0.4-0.20210517202253-c7b4bd0087ea/go.mod h1:
gitlab.com/xx_network/primitives v0.0.4-0.20210521183842-3b12812ac984/go.mod h1:9imZHvYwNFobxueSvVtHneZLk9wTK7HQTzxPm+zhFhE=
gitlab.com/xx_network/primitives v0.0.4-0.20210524170438-ab712af183db h1:JD/ttwHOzJLEZg/Q01f59uo9p8ZoPLvIyTPayl5kBxA=
gitlab.com/xx_network/primitives v0.0.4-0.20210524170438-ab712af183db/go.mod h1:9imZHvYwNFobxueSvVtHneZLk9wTK7HQTzxPm+zhFhE=
+gitlab.com/xx_network/primitives v0.0.4-0.20210524180355-07f8a01a856f h1:xvPCYeH79Op96BSPc/XJ9CpxZXWn0KJJ2i5SDvey/gc=
+gitlab.com/xx_network/primitives v0.0.4-0.20210524180355-07f8a01a856f/go.mod h1:9imZHvYwNFobxueSvVtHneZLk9wTK7HQTzxPm+zhFhE=
+gitlab.com/xx_network/primitives v0.0.4-0.20210524201134-f2e4de31b813 h1:flLCpV/6uFJl+UKLDy116iyET9x/IRlomWISZnA79cs=
+gitlab.com/xx_network/primitives v0.0.4-0.20210524201134-f2e4de31b813/go.mod h1:9imZHvYwNFobxueSvVtHneZLk9wTK7HQTzxPm+zhFhE=
+gitlab.com/xx_network/primitives v0.0.4-0.20210525232109-3f99a04adcfd h1:+vQkuM/o3TqS6bHqIyy36+aGn3TTrvvWFALol2TU13w=
+gitlab.com/xx_network/primitives v0.0.4-0.20210525232109-3f99a04adcfd/go.mod h1:9imZHvYwNFobxueSvVtHneZLk9wTK7HQTzxPm+zhFhE=
gitlab.com/xx_network/ring v0.0.2 h1:TlPjlbFdhtJrwvRgIg4ScdngMTaynx/ByHBRZiXCoL0=
gitlab.com/xx_network/ring v0.0.2/go.mod h1:aLzpP2TiZTQut/PVHR40EJAomzugDdHXetbieRClXIM=
go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
diff --git a/interfaces/networkManager.go b/interfaces/networkManager.go
index 1daa7d38104731870459b1323c8d918401a398fd..ad9e03caf6f53e13e9d8e9190f2417749c99f490 100644
--- a/interfaces/networkManager.go
+++ b/interfaces/networkManager.go
@@ -29,6 +29,19 @@ type NetworkManager interface {
Follow(report ClientErrorReport) (stoppable.Stoppable, error)
CheckGarbledMessages()
InProgressRegistrations() int
+
+ // GetAddressSize returns the current address size of IDs. Blocks until an
+ // address size is known.
+ GetAddressSize() uint8
+
+ // RegisterAddressSizeNotification returns a channel that will trigger for
+ // every address space size update. The provided tag is the unique ID for
+ // the channel. Returns an error if the tag is already used.
+ RegisterAddressSizeNotification(tag string) (chan uint8, error)
+
+ // UnregisterAddressSizeNotification stops broadcasting address space size
+ // updates on the channel with the specified tag.
+ UnregisterAddressSizeNotification(tag string)
}
//for use in key exchange which needs to be callable inside of network
diff --git a/keyExchange/utils_test.go b/keyExchange/utils_test.go
index 58c21e7fef3e722ce5377217e36edc72248ffb31..d9dd4ef14b2dea6fd6c3f06db794e14a87b44cc8 100644
--- a/keyExchange/utils_test.go
+++ b/keyExchange/utils_test.go
@@ -108,6 +108,14 @@ func (t *testNetworkManagerGeneric) GetSender() *gateway.Sender {
return nil
}
+func (t *testNetworkManagerGeneric) GetAddressSize() uint8 { return 0 }
+
+func (t *testNetworkManagerGeneric) RegisterAddressSizeNotification(string) (chan uint8, error) {
+ return nil, nil
+}
+
+func (t *testNetworkManagerGeneric) UnregisterAddressSizeNotification(string) {}
+
func InitTestingContextGeneric(i interface{}) (*storage.Session, interfaces.NetworkManager, error) {
switch i.(type) {
case *testing.T, *testing.M, *testing.B, *testing.PB:
@@ -219,6 +227,14 @@ func (t *testNetworkManagerFullExchange) GetSender() *gateway.Sender {
return nil
}
+func (t *testNetworkManagerFullExchange) GetAddressSize() uint8 { return 0 }
+
+func (t *testNetworkManagerFullExchange) RegisterAddressSizeNotification(string) (chan uint8, error) {
+ return nil, nil
+}
+
+func (t *testNetworkManagerFullExchange) UnregisterAddressSizeNotification(string) {}
+
func InitTestingContextFullExchange(i interface{}) (*storage.Session, *switchboard.Switchboard, interfaces.NetworkManager) {
switch i.(type) {
case *testing.T, *testing.M, *testing.B, *testing.PB:
diff --git a/network/ephemeral/addressSpace.go b/network/ephemeral/addressSpace.go
new file mode 100644
index 0000000000000000000000000000000000000000..f942df77df38bfa4455bdf096ad2d4e0add40b49
--- /dev/null
+++ b/network/ephemeral/addressSpace.go
@@ -0,0 +1,138 @@
+package ephemeral
+
+import (
+ "github.com/pkg/errors"
+ jww "github.com/spf13/jwalterweatherman"
+ "sync"
+ "testing"
+)
+
+const (
+ // The initial value for the address space size. This value signifies that
+ // the address space size has not yet been updated.
+ initSize = 1
+)
+
+// AddressSpace contains the current address space size used for creating
+// ephemeral IDs and the infrastructure to alert other processes when an Update
+// occurs.
+type AddressSpace struct {
+ size uint8
+ notifyMap map[string]chan uint8
+ cond *sync.Cond
+}
+
+// NewAddressSpace initialises a new AddressSpace and returns it.
+func NewAddressSpace() *AddressSpace {
+ return &AddressSpace{
+ size: initSize,
+ notifyMap: make(map[string]chan uint8),
+ cond: sync.NewCond(&sync.Mutex{}),
+ }
+}
+
+// Get returns the current address space size. It blocks until an address space
+// size is set.
+func (as *AddressSpace) Get() uint8 {
+ as.cond.L.Lock()
+ defer as.cond.L.Unlock()
+
+ // If the size has been set, then return the current size
+ if as.size != initSize {
+ return as.size
+ }
+
+ // If the size is not set, then block until it is set
+ as.cond.Wait()
+
+ return as.size
+}
+
+// GetWithoutWait returns the current address space size regardless if it has
+// been set yet.
+func (as *AddressSpace) GetWithoutWait() uint8 {
+ as.cond.L.Lock()
+ defer as.cond.L.Unlock()
+ return as.size
+}
+
+// Update updates the address space size to the new size, if it is larger. Then,
+// each registered channel is notified of the Update. If this was the first time
+// that the address space size was set, then the conditional broadcasts to stop
+// blocking for all threads waiting on Get.
+func (as *AddressSpace) Update(newSize uint8) {
+ as.cond.L.Lock()
+ defer as.cond.L.Unlock()
+
+ // Skip Update if the address space size is unchanged
+ if as.size >= newSize {
+ return
+ }
+
+ // Update address space size
+ oldSize := as.size
+ as.size = newSize
+ jww.INFO.Printf("Updated address space size from %d to %d", oldSize, as.size)
+
+ // Broadcast that the address space size is set, if set for the first time
+ if oldSize == initSize {
+ as.cond.Broadcast()
+ } else {
+ // Broadcast the new address space size to all registered channels
+ for chanID, sizeChan := range as.notifyMap {
+ select {
+ case sizeChan <- as.size:
+ default:
+ jww.ERROR.Printf("Failed to send address space Update of %d on "+
+ "channel with ID %s", as.size, chanID)
+ }
+ }
+ }
+}
+
+// RegisterNotification returns a channel that will trigger for every address
+// space size Update. The provided tag is the unique ID for the channel.
+// Returns an error if the tag is already used.
+func (as *AddressSpace) RegisterNotification(tag string) (chan uint8, error) {
+ as.cond.L.Lock()
+ defer as.cond.L.Unlock()
+
+ if _, exists := as.notifyMap[tag]; exists {
+ return nil, errors.Errorf("tag \"%s\" already exists in notify map", tag)
+ }
+
+ as.notifyMap[tag] = make(chan uint8, 1)
+
+ return as.notifyMap[tag], nil
+}
+
+// UnregisterNotification stops broadcasting address space size updates on the
+// channel with the specified tag.
+func (as *AddressSpace) UnregisterNotification(tag string) {
+ as.cond.L.Lock()
+ defer as.cond.L.Unlock()
+
+ delete(as.notifyMap, tag)
+}
+
+// NewTestAddressSpace initialises a new AddressSpace for testing with the given
+// size.
+func NewTestAddressSpace(newSize uint8, x interface{}) *AddressSpace {
+ switch x.(type) {
+ case *testing.T, *testing.M, *testing.B, *testing.PB:
+ break
+ default:
+ jww.FATAL.Panicf("NewTestAddressSpace is restricted to testing only. "+
+ "Got %T", x)
+ }
+
+ as := &AddressSpace{
+ size: initSize,
+ notifyMap: make(map[string]chan uint8),
+ cond: sync.NewCond(&sync.Mutex{}),
+ }
+
+ as.Update(newSize)
+
+ return as
+}
diff --git a/network/ephemeral/addressSpace_test.go b/network/ephemeral/addressSpace_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..e08f08d844a6aff3bf387db5456a2147fe89f16b
--- /dev/null
+++ b/network/ephemeral/addressSpace_test.go
@@ -0,0 +1,291 @@
+package ephemeral
+
+import (
+ "reflect"
+ "strconv"
+ "sync"
+ "testing"
+ "time"
+)
+
+// Unit test of NewAddressSpace.
+func Test_newAddressSpace(t *testing.T) {
+ expected := &AddressSpace{
+ size: initSize,
+ notifyMap: make(map[string]chan uint8),
+ cond: sync.NewCond(&sync.Mutex{}),
+ }
+
+ as := NewAddressSpace()
+
+ if !reflect.DeepEqual(expected, as) {
+ t.Errorf("NewAddressSpace failed to return the expected AddressSpace."+
+ "\nexpected: %+v\nreceived: %+v", expected, as)
+ }
+}
+
+// Test that AddressSpace.Get blocks when the address space size has not been
+// set and that it does not block when it has been set.
+func Test_addressSpace_Get(t *testing.T) {
+ as := NewAddressSpace()
+ expectedSize := uint8(42)
+
+ // Call Get and error if it does not block
+ wait := make(chan uint8)
+ go func() { wait <- as.Get() }()
+ select {
+ case size := <-wait:
+ t.Errorf("Get failed to block and returned size %d.", size)
+ case <-time.NewTimer(10 * time.Millisecond).C:
+ }
+
+ // Update address size
+ as.cond.L.Lock()
+ as.size = expectedSize
+ as.cond.L.Unlock()
+
+ // Call Get and error if it does block
+ wait = make(chan uint8)
+ go func() { wait <- as.Get() }()
+ select {
+ case size := <-wait:
+ if size != expectedSize {
+ t.Errorf("Get returned the wrong size.\nexpected: %d\nreceived: %d",
+ expectedSize, size)
+ }
+ case <-time.NewTimer(15 * time.Millisecond).C:
+ t.Error("Get blocking when the size has been updated.")
+ }
+}
+
+// Test that AddressSpace.Get blocks until the condition broadcasts.
+func Test_addressSpace_Get_WaitBroadcast(t *testing.T) {
+ as := NewAddressSpace()
+
+ wait := make(chan uint8)
+ go func() { wait <- as.Get() }()
+
+ go func() {
+ select {
+ case size := <-wait:
+ if size != initSize {
+ t.Errorf("Get returned the wrong size.\nexpected: %d\nreceived: %d",
+ initSize, size)
+ }
+ case <-time.NewTimer(15 * time.Millisecond).C:
+ t.Error("Get blocking when the Cond has broadcast.")
+ }
+ }()
+
+ time.Sleep(5 * time.Millisecond)
+
+ as.cond.Broadcast()
+}
+
+// Unit test of AddressSpace.GetWithoutWait.
+func Test_addressSpace_GetWithoutWait(t *testing.T) {
+ as := NewAddressSpace()
+
+ size := as.GetWithoutWait()
+ if size != initSize {
+ t.Errorf("GetWithoutWait returned the wrong size."+
+ "\nexpected: %d\nreceived: %d", initSize, size)
+ }
+}
+
+// Tests that AddressSpace.Update only updates the size when it is larger.
+func Test_addressSpace_update(t *testing.T) {
+ as := NewAddressSpace()
+ expectedSize := uint8(42)
+
+ // Attempt to Update to larger size
+ as.Update(expectedSize)
+ if as.size != expectedSize {
+ t.Errorf("Update failed to set the new size."+
+ "\nexpected: %d\nreceived: %d", expectedSize, as.size)
+ }
+
+ // Attempt to Update to smaller size
+ as.Update(expectedSize - 1)
+ if as.size != expectedSize {
+ t.Errorf("Update failed to set the new size."+
+ "\nexpected: %d\nreceived: %d", expectedSize, as.size)
+ }
+}
+
+// Tests that AddressSpace.Update sends the new size to all registered channels.
+func Test_addressSpace_update_GetAndChannels(t *testing.T) {
+ as := NewAddressSpace()
+ var wg sync.WaitGroup
+ expectedSize := uint8(42)
+
+ // Start threads that are waiting for an Update
+ wait := []chan uint8{make(chan uint8), make(chan uint8), make(chan uint8)}
+ for _, waitChan := range wait {
+ go func(waitChan chan uint8) { waitChan <- as.Get() }(waitChan)
+ }
+
+ // Wait on threads
+ for i, waitChan := range wait {
+ go func(i int, waitChan chan uint8) {
+ wg.Add(1)
+ defer wg.Done()
+
+ select {
+ case size := <-waitChan:
+ if size != expectedSize {
+ t.Errorf("Thread %d received unexpected size."+
+ "\nexpected: %d\nreceived: %d", i, expectedSize, size)
+ }
+ case <-time.NewTimer(15 * time.Millisecond).C:
+ t.Errorf("Timed out waiting for Get to return on thread %d.", i)
+ }
+ }(i, waitChan)
+ }
+
+ // Register channels
+ notifyChannels := make(map[string]chan uint8)
+ var notifyChan chan uint8
+ var err error
+ var chanID string
+ for i := 0; i < 3; i++ {
+ chanID = strconv.Itoa(i)
+ notifyChannels[chanID], err = as.RegisterNotification(chanID)
+ if err != nil {
+ t.Errorf("Failed to regisdter channel: %+v", err)
+ }
+ }
+
+ // Wait for new size on channels
+ for chanID, notifyChan := range notifyChannels {
+ go func(chanID string, notifyChan chan uint8) {
+ wg.Add(1)
+ defer wg.Done()
+
+ select {
+ case size := <-notifyChan:
+ t.Errorf("Received size %d on channel %s when it should not have.",
+ size, chanID)
+ case <-time.NewTimer(15 * time.Millisecond).C:
+ }
+ }(chanID, notifyChan)
+ }
+
+ time.Sleep(5 * time.Millisecond)
+
+ // Attempt to Update to larger size
+ as.Update(expectedSize)
+
+ wg.Wait()
+
+ // Unregistered one channel and make sure it will not receive
+ delete(notifyChannels, chanID)
+ as.UnregisterNotification(chanID)
+
+ expectedSize++
+
+ // Wait for new size on channels
+ for chanID, notifyChan := range notifyChannels {
+ go func(chanID string, notifyChan chan uint8) {
+ wg.Add(1)
+ defer wg.Done()
+
+ select {
+ case size := <-notifyChan:
+ if size != expectedSize {
+ t.Errorf("Failed to receive expected size on channel %s."+
+ "\nexpected: %d\nreceived: %d", chanID, expectedSize, size)
+ }
+ case <-time.NewTimer(15 * time.Millisecond).C:
+ t.Errorf("Timed out waiting on channel %s", chanID)
+ }
+ }(chanID, notifyChan)
+ }
+
+ // Wait for timeout on unregistered channel
+ go func() {
+ wg.Add(1)
+ defer wg.Done()
+
+ select {
+ case size := <-notifyChan:
+ t.Errorf("Received size %d on channel %s when it should not have.",
+ size, chanID)
+ case <-time.NewTimer(15 * time.Millisecond).C:
+ }
+ }()
+
+ time.Sleep(5 * time.Millisecond)
+
+ // Attempt to Update to larger size
+ as.Update(expectedSize)
+
+ wg.Wait()
+}
+
+// Tests that a channel created by AddressSpace.RegisterNotification receives
+// the expected size when triggered.
+func Test_addressSpace_RegisterNotification(t *testing.T) {
+ as := NewAddressSpace()
+ expectedSize := uint8(42)
+
+ // Register channel
+ chanID := "chanID"
+ sizeChan, err := as.RegisterNotification(chanID)
+ if err != nil {
+ t.Errorf("RegisterNotification returned an error: %+v", err)
+ }
+
+ // Wait on channel or error after timing out
+ go func() {
+ select {
+ case size := <-sizeChan:
+ if size != expectedSize {
+ t.Errorf("received wrong size on channel."+
+ "\nexpected: %d\nreceived: %d", expectedSize, size)
+ }
+ case <-time.NewTimer(10 * time.Millisecond).C:
+ t.Error("Timed out waiting on channel.")
+ }
+ }()
+
+ // Send on channel
+ select {
+ case as.notifyMap[chanID] <- expectedSize:
+ default:
+ t.Errorf("Sent on channel %s that should not be in map.", chanID)
+ }
+}
+
+// Tests that when AddressSpace.UnregisterNotification unregisters a channel,
+// it no longer can be triggered from the map.
+func Test_addressSpace_UnregisterNotification(t *testing.T) {
+ as := NewAddressSpace()
+ expectedSize := uint8(42)
+
+ // Register channel and then unregister it
+ chanID := "chanID"
+ sizeChan, err := as.RegisterNotification(chanID)
+ if err != nil {
+ t.Errorf("RegisterNotification returned an error: %+v", err)
+ }
+ as.UnregisterNotification(chanID)
+
+ // Wait for timeout or error if the channel receives
+ go func() {
+ select {
+ case size := <-sizeChan:
+ t.Errorf("Received %d on channel %s that should not be in map.",
+ size, chanID)
+ case <-time.NewTimer(10 * time.Millisecond).C:
+ }
+ }()
+
+ // Send on channel
+ select {
+ case as.notifyMap[chanID] <- expectedSize:
+ t.Errorf("Sent size %d on channel %s that should not be in map.",
+ expectedSize, chanID)
+ default:
+ }
+}
diff --git a/network/ephemeral/testutil.go b/network/ephemeral/testutil.go
index d2708b0c71c7d112151e4494d3ef631a78adaf62..c213078eeecaca6178d6f2cd7617caf468da792e 100644
--- a/network/ephemeral/testutil.go
+++ b/network/ephemeral/testutil.go
@@ -70,7 +70,7 @@ func (t *testNetworkManager) GetHealthTracker() interfaces.HealthTracker {
return nil
}
-func (t *testNetworkManager) Follow(report interfaces.ClientErrorReport) (stoppable.Stoppable, error) {
+func (t *testNetworkManager) Follow(_ interfaces.ClientErrorReport) (stoppable.Stoppable, error) {
return nil, nil
}
@@ -84,12 +84,19 @@ func (t *testNetworkManager) GetSender() *gateway.Sender {
return nil
}
+func (t *testNetworkManager) GetAddressSize() uint8 { return 15 }
+func (t *testNetworkManager) RegisterAddressSizeNotification(string) (chan uint8, error) {
+ return nil, nil
+}
+
+func (t *testNetworkManager) UnregisterAddressSizeNotification(string) {}
+
func NewTestNetworkManager(i interface{}) interfaces.NetworkManager {
switch i.(type) {
case *testing.T, *testing.M, *testing.B:
break
default:
- jww.FATAL.Panicf("initTesting is restricted to testing only."+
+ jww.FATAL.Panicf("NewTestNetworkManager is restricted to testing only."+
"Got %T", i)
}
@@ -97,17 +104,22 @@ func NewTestNetworkManager(i interface{}) interfaces.NetworkManager {
cert, err := utils.ReadFile(testkeys.GetNodeCertPath())
if err != nil {
- jww.FATAL.Panicf("Failed to create new test Instance: %v", err)
+ jww.FATAL.Panicf("Failed to create new test Instance: %+v", err)
}
- commsManager.AddHost(&id.Permissioning, "", cert, connect.GetDefaultHostParams())
+ _, err = commsManager.AddHost(
+ &id.Permissioning, "", cert, connect.GetDefaultHostParams())
+ if err != nil {
+ jww.FATAL.Panicf("Failed to add host: %+v", err)
+ }
instanceComms := &connect.ProtoComms{
Manager: commsManager,
}
- thisInstance, err := network.NewInstanceTesting(instanceComms, getNDF(), getNDF(), nil, nil, i)
+ thisInstance, err := network.NewInstanceTesting(
+ instanceComms, getNDF(), getNDF(), nil, nil, i)
if err != nil {
- jww.FATAL.Panicf("Failed to create new test Instance: %v", err)
+ jww.FATAL.Panicf("Failed to create new test Instance: %+v", err)
}
thisManager := &testNetworkManager{instance: thisInstance}
diff --git a/network/ephemeral/tracker.go b/network/ephemeral/tracker.go
index 7292f73d8cfa8741c4bac631dd9c43ab3859b53d..da81a326914389233631d15fd19bf9ee65744ed0 100644
--- a/network/ephemeral/tracker.go
+++ b/network/ephemeral/tracker.go
@@ -22,150 +22,157 @@ import (
const validityGracePeriod = 5 * time.Minute
const TimestampKey = "IDTrackingTimestamp"
+const TimestampStoreVersion = 0
const ephemeralStoppable = "EphemeralCheck"
+const addressSpaceSizeChanTag = "ephemeralTracker"
-// Track runs a thread which checks for past and present ephemeral ids
-func Track(session *storage.Session, ourId *id.ID) stoppable.Stoppable {
+// Track runs a thread which checks for past and present ephemeral ID.
+func Track(session *storage.Session, addrSpace *AddressSpace, ourId *id.ID) stoppable.Stoppable {
stop := stoppable.NewSingle(ephemeralStoppable)
- go track(session, ourId, stop)
+ go track(session, addrSpace, ourId, stop)
return stop
}
-// track is a thread which continuously processes ephemeral ids.
-// If any error occurs, the thread crashes
-func track(session *storage.Session, ourId *id.ID, stop *stoppable.Single) {
+// track is a thread which continuously processes ephemeral IDs. Panics if any
+// error occurs.
+func track(session *storage.Session, addrSpace *AddressSpace, ourId *id.ID, stop *stoppable.Single) {
// Check that there is a timestamp in store at all
err := checkTimestampStore(session)
if err != nil {
- jww.FATAL.Panicf("Could not store timestamp "+
- "for ephemeral ID tracking: %v", err)
+ jww.FATAL.Panicf("Could not store timestamp for ephemeral ID "+
+ "tracking: %+v", err)
}
// Get the latest timestamp from store
lastTimestampObj, err := session.Get(TimestampKey)
if err != nil {
- jww.FATAL.Panicf("Could not get timestamp: %v", err)
+ jww.FATAL.Panicf("Could not get timestamp: %+v", err)
}
lastCheck, err := unmarshalTimestamp(lastTimestampObj)
if err != nil {
- jww.FATAL.Panicf("Could not parse stored timestamp: %v", err)
+ jww.FATAL.Panicf("Could not parse stored timestamp: %+v", err)
}
- // Wait until we get the id size from the network
+ // Wait until we get the ID size from the network
receptionStore := session.Reception()
- receptionStore.WaitForIdSizeUpdate()
+ addressSizeUpdate, err := addrSpace.RegisterNotification(addressSpaceSizeChanTag)
+ if err != nil {
+ jww.FATAL.Panicf("failed to register address size notification "+
+ "channel: %+v", err)
+ }
+ addressSize := addrSpace.Get()
- for true {
+ for {
now := netTime.Now()
- //hack for inconsistent time on android
- if now.Sub(lastCheck) <=0{
+ // Hack for inconsistent time on android
+ if now.Before(lastCheck) || now.Equal(lastCheck) {
now = lastCheck.Add(time.Nanosecond)
}
// Generates the IDs since the last track
- protoIds, err := ephemeral.GetIdsByRange(ourId, receptionStore.GetIDSize(),
- now, now.Sub(lastCheck))
+ protoIds, err := ephemeral.GetIdsByRange(
+ ourId, uint(addressSize), now, now.Sub(lastCheck))
jww.DEBUG.Printf("Now: %s, LastCheck: %s, Different: %s",
now, lastCheck, now.Sub(lastCheck))
-
jww.DEBUG.Printf("protoIds Count: %d", len(protoIds))
if err != nil {
- jww.FATAL.Panicf("Could not generate "+
- "upcoming IDs: %v", err)
+ jww.FATAL.Panicf("Could not generate upcoming IDs: %+v", err)
}
// Generate identities off of that list
- identities := generateIdentities(protoIds, ourId)
-
- jww.INFO.Printf("Number of Identities Generated: %d",
- len(identities))
+ identities := generateIdentities(protoIds, ourId, addressSize)
+ jww.INFO.Printf("Number of Identities Generated: %d", len(identities))
jww.INFO.Printf("Current Identity: %d (source: %s), Start: %s, End: %s",
- identities[len(identities)-1].EphId.Int64(), identities[len(identities)-1].Source,
- identities[len(identities)-1].StartValid, identities[len(identities)-1].EndValid)
+ identities[len(identities)-1].EphId.Int64(),
+ identities[len(identities)-1].Source,
+ identities[len(identities)-1].StartValid,
+ identities[len(identities)-1].EndValid)
- // Add identities to storage if unique
+ // Add identities to storage, if unique
for _, identity := range identities {
if err = receptionStore.AddIdentity(identity); err != nil {
- jww.FATAL.Panicf("Could not insert "+
- "identity: %v", err)
+ jww.FATAL.Panicf("Could not insert identity: %+v", err)
}
}
- // Generate the time stamp for storage
+ // Generate the timestamp for storage
vo, err := marshalTimestamp(now)
if err != nil {
- jww.FATAL.Panicf("Could not marshal "+
- "timestamp for storage: %v", err)
+ jww.FATAL.Panicf("Could not marshal timestamp for storage: %+v", err)
}
// Store the timestamp
if err = session.Set(TimestampKey, vo); err != nil {
- jww.FATAL.Panicf("Could not store timestamp: %v", err)
+ jww.FATAL.Panicf("Could not store timestamp: %+v", err)
}
- // Sleep until the last Id has expired
+ // Sleep until the last ID has expired
timeToSleep := calculateTickerTime(protoIds)
- t := time.NewTimer(timeToSleep)
select {
- case <-t.C:
+ case <-time.NewTimer(timeToSleep).C:
+ case addressSize = <-addressSizeUpdate:
+ receptionStore.SetToExpire(addressSize)
case <-stop.Quit():
return
}
}
}
-// generateIdentities is a constructor which generates a list of
-// identities off of the list of protoIdentities passed in
-func generateIdentities(protoIds []ephemeral.ProtoIdentity,
- ourId *id.ID) []reception.Identity {
+// generateIdentities generates a list of identities off of the list of passed
+// in ProtoIdentity.
+func generateIdentities(protoIds []ephemeral.ProtoIdentity, ourId *id.ID,
+ addressSize uint8) []reception.Identity {
- identities := make([]reception.Identity, 0)
+ identities := make([]reception.Identity, len(protoIds))
- // Add identities for every ephemeral id
- for _, eid := range protoIds {
+ // Add identities for every ephemeral ID
+ for i, eid := range protoIds {
// Expand the grace period for both start and end
eid.End.Add(validityGracePeriod)
eid.Start.Add(-validityGracePeriod)
- identities = append(identities, reception.Identity{
- EphId: eid.Id,
- Source: ourId,
- End: eid.End,
- StartValid: eid.Start,
- EndValid: eid.End,
- Ephemeral: false,
- })
+ identities[i] = reception.Identity{
+ EphId: eid.Id,
+ Source: ourId,
+ AddressSize: addressSize,
+ End: eid.End,
+ StartValid: eid.Start,
+ EndValid: eid.End,
+ Ephemeral: false,
+ }
}
return identities
}
-// Sanitation check of timestamp store. If a value has not been stored yet
-// then the current time is stored
+// checkTimestampStore performs a sanitation check of timestamp store. If a
+// value has not been stored yet, then the current time is stored.
func checkTimestampStore(session *storage.Session) error {
if _, err := session.Get(TimestampKey); err != nil {
- // only generate from the last hour because this is a new id, it
- // couldn't receive messages yet
+ // Only generate from the last hour because this is a new ID; it could
+ // not yet receive messages
now, err := marshalTimestamp(netTime.Now().Add(-1 * time.Hour))
if err != nil {
- return errors.Errorf("Could not marshal new timestamp for storage: %v", err)
+ return errors.Errorf("Could not marshal new timestamp for "+
+ "storage: %+v", err)
}
+
return session.Set(TimestampKey, now)
}
return nil
}
-// Takes the stored timestamp and unmarshal into a time object
+// unmarshalTimestamp unmarshal the stored timestamp into a time.Time.
func unmarshalTimestamp(lastTimestampObj *versioned.Object) (time.Time, error) {
if lastTimestampObj == nil || lastTimestampObj.Data == nil {
return netTime.Now(), nil
@@ -176,27 +183,29 @@ func unmarshalTimestamp(lastTimestampObj *versioned.Object) (time.Time, error) {
return lastTimestamp, err
}
-// Marshals the timestamp for ekv storage. Generates a storable object
+// marshalTimestamp marshals the timestamp and generates a storable object for
+// ekv storage.
func marshalTimestamp(timeToStore time.Time) (*versioned.Object, error) {
data, err := timeToStore.MarshalBinary()
return &versioned.Object{
- Version: 0,
+ Version: TimestampStoreVersion,
Timestamp: netTime.Now(),
Data: data,
}, err
}
-// Helper function which calculates the time for the ticker based
-// off of the last ephemeral ID to expire
+// calculateTickerTime calculates the time for the ticker based off of the last
+// ephemeral ID to expire.
func calculateTickerTime(baseIDs []ephemeral.ProtoIdentity) time.Duration {
if len(baseIDs) == 0 {
return time.Duration(0)
}
+
// Get the last identity in the list
lastIdentity := baseIDs[len(baseIDs)-1]
- // Factor out the grace period previously expanded upon.
+ // Factor out the grace period previously expanded upon
// Calculate and return that duration
gracePeriod := lastIdentity.End.Add(-validityGracePeriod)
return lastIdentity.End.Sub(gracePeriod)
diff --git a/network/ephemeral/tracker_test.go b/network/ephemeral/tracker_test.go
index e6c3e11f8fd3b8a9e120f3634cac636b385f66c6..3b9e03b29db0b470eebb6a56e490d5f98ac0385b 100644
--- a/network/ephemeral/tracker_test.go
+++ b/network/ephemeral/tracker_test.go
@@ -28,35 +28,34 @@ func TestCheck(t *testing.T) {
session := storage.InitTestingSession(t)
instance := NewTestNetworkManager(t)
if err := setupInstance(instance); err != nil {
- t.Errorf("Could not set up instance: %v", err)
+ t.Errorf("Could not set up instance: %+v", err)
}
- /// Store a mock initial timestamp the store
+ // Store a mock initial timestamp the store
now := netTime.Now()
twoDaysAgo := now.Add(-2 * 24 * time.Hour)
twoDaysTimestamp, err := marshalTimestamp(twoDaysAgo)
if err != nil {
- t.Errorf("Could not marshal timestamp for test setup: %v", err)
+ t.Errorf("Could not marshal timestamp for test setup: %+v", err)
}
+
err = session.Set(TimestampKey, twoDaysTimestamp)
if err != nil {
- t.Errorf("Could not set mock timestamp for test setup: %v", err)
+ t.Errorf("Could not set mock timestamp for test setup: %+v", err)
}
ourId := id.NewIdFromBytes([]byte("Sauron"), t)
- stop := Track(session, ourId)
- session.Reception().MarkIdSizeAsSet()
+ stop := Track(session, NewTestAddressSpace(15, t), ourId)
err = stop.Close(3 * time.Second)
if err != nil {
- t.Errorf("Could not close thread: %v", err)
+ t.Errorf("Could not close thread: %+v", err)
}
}
-// Unit test for track
+// Unit test for track.
func TestCheck_Thread(t *testing.T) {
-
session := storage.InitTestingSession(t)
instance := NewTestNetworkManager(t)
if err := setupInstance(instance); err != nil {
@@ -65,26 +64,25 @@ func TestCheck_Thread(t *testing.T) {
ourId := id.NewIdFromBytes([]byte("Sauron"), t)
stop := stoppable.NewSingle(ephemeralStoppable)
- /// Store a mock initial timestamp the store
+ // Store a mock initial timestamp the store
now := netTime.Now()
yesterday := now.Add(-24 * time.Hour)
yesterdayTimestamp, err := marshalTimestamp(yesterday)
if err != nil {
- t.Errorf("Could not marshal timestamp for test setup: %v", err)
+ t.Errorf("Could not marshal timestamp for test setup: %+v", err)
}
+
err = session.Set(TimestampKey, yesterdayTimestamp)
if err != nil {
- t.Errorf("Could not set mock timestamp for test setup: %v", err)
+ t.Errorf("Could not set mock timestamp for test setup: %+v", err)
}
// Run the tracker
go func() {
- track(session, ourId, stop)
+ track(session, NewTestAddressSpace(15, t), ourId, stop)
}()
time.Sleep(3 * time.Second)
- session.Reception().MarkIdSizeAsSet()
-
err = stop.Close(3 * time.Second)
if err != nil {
t.Errorf("Could not close thread: %v", err)
@@ -95,7 +93,7 @@ func TestCheck_Thread(t *testing.T) {
func setupInstance(instance interfaces.NetworkManager) error {
cert, err := utils.ReadFile(testkeys.GetNodeKeyPath())
if err != nil {
- return errors.Errorf("Failed to read cert from from file: %v", err)
+ return errors.Errorf("Failed to read cert from from file: %+v", err)
}
ri := &mixmessages.RoundInfo{
ID: 1,
@@ -103,20 +101,20 @@ func setupInstance(instance interfaces.NetworkManager) error {
testCert, err := rsa.LoadPrivateKeyFromPem(cert)
if err != nil {
- return errors.Errorf("Failed to load cert from from file: %v", err)
+ return errors.Errorf("Failed to load cert from from file: %+v", err)
}
if err = signature.SignRsa(ri, testCert); err != nil {
- return errors.Errorf("Failed to sign round info: %v", err)
+ return errors.Errorf("Failed to sign round info: %+v", err)
}
if err = instance.GetInstance().RoundUpdate(ri); err != nil {
- return errors.Errorf("Failed to RoundUpdate from from file: %v", err)
+ return errors.Errorf("Failed to RoundUpdate from from file: %+v", err)
}
ri = &mixmessages.RoundInfo{
ID: 2,
}
if err = signature.SignRsa(ri, testCert); err != nil {
- return errors.Errorf("Failed to sign round info: %v", err)
+ return errors.Errorf("Failed to sign round info: %+v", err)
}
if err = instance.GetInstance().RoundUpdate(ri); err != nil {
return errors.Errorf("Failed to RoundUpdate from from file: %v", err)
diff --git a/network/follow.go b/network/follow.go
index 8fedb69e085bd699bcb55414a0c95e0a6e20eb34..05d8134600d2370e647363e56624005d39fc472a 100644
--- a/network/follow.go
+++ b/network/follow.go
@@ -64,20 +64,20 @@ func (m *manager) followNetwork(report interfaces.ClientErrorReport, quitCh <-ch
m.follow(report, rng, m.Comms, isRunning)
case <-TrackTicker.C:
numPolls := atomic.SwapUint64(m.tracker, 0)
- if m.numLatencies !=0{
- latencyAvg := time.Nanosecond*time.Duration(m.latencySum/m.numLatencies)
+ if m.numLatencies != 0 {
+ latencyAvg := time.Nanosecond * time.Duration(m.latencySum/m.numLatencies)
m.latencySum, m.numLatencies = 0, 0
jww.INFO.Printf("Polled the network %d times in the "+
"last %s, with an average newest packet latency of %s", numPolls,
debugTrackPeriod, latencyAvg)
- }else{
+ } else {
jww.INFO.Printf("Polled the network %d times in the "+
"last %s", numPolls, debugTrackPeriod)
}
}
- if !isRunning.IsRunning(){
+ if !isRunning.IsRunning() {
jww.ERROR.Printf("Killing network follower " +
"due to failed exit")
return
@@ -89,9 +89,8 @@ func (m *manager) followNetwork(report interfaces.ClientErrorReport, quitCh <-ch
func (m *manager) follow(report interfaces.ClientErrorReport, rng csprng.Source,
comms followNetworkComms, isRunning interfaces.Running) {
-
- //get the identity we will poll for
- identity, err := m.Session.Reception().GetIdentity(rng)
+ //Get the identity we will poll for
+ identity, err := m.Session.Reception().GetIdentity(rng, m.addrSpace.GetWithoutWait())
if err != nil {
jww.FATAL.Panicf("Failed to get an identity, this should be "+
"impossible: %+v", err)
@@ -121,7 +120,7 @@ func (m *manager) follow(report interfaces.ClientErrorReport, rng csprng.Source,
identity.EndRequest, identity.EndRequest.Sub(identity.StartRequest), host.GetId())
return comms.SendPoll(host, &pollReq)
})
- if !isRunning.IsRunning(){
+ if !isRunning.IsRunning() {
jww.ERROR.Printf("Killing network follower " +
"due to failed exit")
return
@@ -165,10 +164,9 @@ func (m *manager) follow(report interfaces.ClientErrorReport, rng csprng.Source,
m.GetSender().UpdateNdf(m.GetInstance().GetPartialNdf().Get())
}
- //check that the stored address space is correct
- m.Session.Reception().UpdateIdSize(uint(m.Instance.GetPartialNdf().Get().AddressSpaceSize))
- // Updates any id size readers of a network compliant id size
- m.Session.Reception().MarkIdSizeAsSet()
+ // Update the address space size
+ m.addrSpace.Update(m.Instance.GetPartialNdf().Get().AddressSpace[0].Size)
+
// NOTE: this updates rounds and updates the tracking of the health of the
// network
if pollResp.Updates != nil {
@@ -224,17 +222,16 @@ func (m *manager) follow(report interfaces.ClientErrorReport, rng csprng.Source,
}
}
-
newestTS := uint64(0)
- for i:=0;i<len(pollResp.Updates[len(pollResp.Updates)-1].Timestamps);i++{
- if pollResp.Updates[len(pollResp.Updates)-1].Timestamps[i]!=0{
+ for i := 0; i < len(pollResp.Updates[len(pollResp.Updates)-1].Timestamps); i++ {
+ if pollResp.Updates[len(pollResp.Updates)-1].Timestamps[i] != 0 {
newestTS = pollResp.Updates[len(pollResp.Updates)-1].Timestamps[i]
}
}
- newest := time.Unix(0,int64(newestTS))
+ newest := time.Unix(0, int64(newestTS))
- if newest.After(now){
+ if newest.After(now) {
deltaDur := newest.Sub(now)
m.latencySum = uint64(deltaDur)
m.numLatencies++
diff --git a/network/manager.go b/network/manager.go
index cf443e9912d43ffc14bdd73ddc09064d9ec3f51c..c072da4eb05bd74f60fd95dfdcfcdd7bd47e7428 100644
--- a/network/manager.go
+++ b/network/manager.go
@@ -47,9 +47,12 @@ type manager struct {
message *message.Manager
//number of polls done in a period of time
- tracker *uint64
- latencySum uint64
+ tracker *uint64
+ latencySum uint64
numLatencies uint64
+
+ // Address space size
+ addrSpace *ephemeral.AddressSpace
}
// NewManager builds a new reception manager object using inputted key fields
@@ -71,10 +74,11 @@ func NewManager(session *storage.Session, switchboard *switchboard.Switchboard,
tracker := uint64(0)
- //create manager object
+ // create manager object
m := manager{
- param: params,
- tracker: &tracker,
+ param: params,
+ tracker: &tracker,
+ addrSpace: ephemeral.NewAddressSpace(),
}
m.Internal = internal.Internal{
@@ -141,7 +145,7 @@ func (m *manager) Follow(report interfaces.ClientErrorReport) (stoppable.Stoppab
// Round processing
multi.Add(m.round.StartProcessors())
- multi.Add(ephemeral.Track(m.Session, m.ReceptionID))
+ multi.Add(ephemeral.Track(m.Session, m.addrSpace, m.ReceptionID))
return multi, nil
}
@@ -173,3 +177,22 @@ func (m *manager) CheckGarbledMessages() {
func (m *manager) InProgressRegistrations() int {
return len(m.Internal.NodeRegistration)
}
+
+// GetAddressSize returns the current address space size. It blocks until an
+// address space size is set.
+func (m *manager) GetAddressSize() uint8 {
+ return m.addrSpace.Get()
+}
+
+// RegisterAddressSizeNotification returns a channel that will trigger for every
+// address space size update. The provided tag is the unique ID for the channel.
+// Returns an error if the tag is already used.
+func (m *manager) RegisterAddressSizeNotification(tag string) (chan uint8, error) {
+ return m.addrSpace.RegisterNotification(tag)
+}
+
+// UnregisterAddressSizeNotification stops broadcasting address space size
+// updates on the channel with the specified tag.
+func (m *manager) UnregisterAddressSizeNotification(tag string) {
+ m.addrSpace.UnregisterNotification(tag)
+}
diff --git a/single/manager_test.go b/single/manager_test.go
index 2ce788f7b86ba44fd61a2aaa68814d1d62a49a02..a4384b166322df3206937ff58d56187aed0a6f52 100644
--- a/single/manager_test.go
+++ b/single/manager_test.go
@@ -324,10 +324,18 @@ func (tnm *testNetworkManager) InProgressRegistrations() int {
return 0
}
-func (t *testNetworkManager) GetSender() *gateway.Sender {
+func (tnm *testNetworkManager) GetSender() *gateway.Sender {
return nil
}
+func (tnm *testNetworkManager) GetAddressSize() uint8 { return 16 }
+
+func (tnm *testNetworkManager) RegisterAddressSizeNotification(string) (chan uint8, error) {
+ return nil, nil
+}
+
+func (tnm *testNetworkManager) UnregisterAddressSizeNotification(string) {}
+
func getNDF() *ndf.NetworkDefinition {
return &ndf.NetworkDefinition{
E2E: ndf.Group{
diff --git a/single/transmission.go b/single/transmission.go
index d5289f339b1120c87256228d87fd7b7514c0a9d7..bbe41371d7252b29bf78b87bed37088af95c733c 100644
--- a/single/transmission.go
+++ b/single/transmission.go
@@ -64,12 +64,9 @@ type roundEvents interface {
func (m *Manager) transmitSingleUse(partner contact2.Contact, payload []byte,
tag string, MaxMsgs uint8, rng io.Reader, callback ReplyComm, timeout time.Duration, roundEvents roundEvents) error {
- // Get ephemeral ID address size; this will block until the client knows the
- // address size if it is currently unknown
- if m.store.Reception().IsIdSizeDefault() {
- m.store.Reception().WaitForIdSizeUpdate()
- }
- addressSize := m.store.Reception().GetIDSize()
+ // Get ephemeral ID address space size; this blocks until the address space
+ // size is set for the first time
+ addressSize := m.net.GetAddressSize()
// Create new CMIX message containing the transmission payload
cmixMsg, dhKey, rid, ephID, err := m.makeTransmitCmixMessage(partner,
@@ -93,6 +90,7 @@ func (m *Manager) transmitSingleUse(partner contact2.Contact, payload []byte,
err = m.reception.AddIdentity(reception.Identity{
EphId: ephID,
Source: rid,
+ AddressSize: addressSize,
End: timeStart.Add(2 * timeout),
ExtraChecks: 10,
StartValid: timeStart.Add(-2 * timeout),
@@ -175,7 +173,7 @@ func (m *Manager) transmitSingleUse(partner contact2.Contact, payload []byte,
// makeTransmitCmixMessage generates a CMIX message containing the transmission message,
// which contains the encrypted payload.
func (m *Manager) makeTransmitCmixMessage(partner contact2.Contact,
- payload []byte, tag string, maxMsgs uint8, addressSize uint,
+ payload []byte, tag string, maxMsgs uint8, addressSize uint8,
timeout time.Duration, timeNow time.Time, rng io.Reader) (format.Message,
*cyclic.Int, *id.ID, ephemeral.Id, error) {
e2eGrp := m.store.E2e().GetGroup()
@@ -255,8 +253,9 @@ func generateDhKeys(grp *cyclic.Group, dhPubKey *cyclic.Int,
// contains a nonce. If the generated ephemeral ID has a window that is not
// within +/- the given 2*timeout from now, then the IDs are generated again
// using a new nonce.
-func makeIDs(msg *transmitMessagePayload, publicKey *cyclic.Int, addressSize uint,
- timeout time.Duration, timeNow time.Time, rng io.Reader) (*id.ID, ephemeral.Id, error) {
+func makeIDs(msg *transmitMessagePayload, publicKey *cyclic.Int,
+ addressSize uint8, timeout time.Duration, timeNow time.Time,
+ rng io.Reader) (*id.ID, ephemeral.Id, error) {
var rid *id.ID
var ephID ephemeral.Id
@@ -277,7 +276,7 @@ func makeIDs(msg *transmitMessagePayload, publicKey *cyclic.Int, addressSize uin
rid = msg.GetRID(publicKey)
// Generate the ephemeral ID
- ephID, start, end, err = ephemeral.GetId(rid, addressSize, timeNow.UnixNano())
+ ephID, start, end, err = ephemeral.GetId(rid, uint(addressSize), timeNow.UnixNano())
if err != nil {
return nil, ephemeral.Id{}, errors.Errorf("failed to generate "+
"ephemeral ID from newly generated ID: %+v", err)
diff --git a/single/transmission_test.go b/single/transmission_test.go
index 367a8b3bfa4bd07df0e266045281624934ea3ee3..761f65d9fc5e757aa84710cf7f3ef476f0ae91ac 100644
--- a/single/transmission_test.go
+++ b/single/transmission_test.go
@@ -366,7 +366,7 @@ func Test_makeIDs_Consistency(t *testing.T) {
if err != nil {
t.Fatalf("Failed to generate public key: %+v", err)
}
- addressSize := uint(32)
+ addressSize := uint8(32)
expectedPayload, err := unmarshalTransmitMessagePayload(msgPayload.Marshal())
if err != nil {
@@ -397,7 +397,7 @@ func Test_makeIDs_Consistency(t *testing.T) {
}
expectedEphID, _, _, err := ephemeral.GetId(expectedPayload.GetRID(publicKey),
- addressSize, timeNow.UnixNano())
+ uint(addressSize), timeNow.UnixNano())
if err != nil {
t.Fatalf("Failed to generate expected ephemeral ID: %+v", err)
}
diff --git a/storage/reception/IdentityUse.go b/storage/reception/IdentityUse.go
index 2c8b9df3b64601651d7489e96935ff1a3d360db9..08f1a070aadc0ac5271156516e860371c0ca7daa 100644
--- a/storage/reception/IdentityUse.go
+++ b/storage/reception/IdentityUse.go
@@ -1,12 +1,15 @@
package reception
import (
+ "fmt"
"github.com/pkg/errors"
"gitlab.com/elixxir/client/storage/rounds"
"gitlab.com/elixxir/crypto/hash"
"gitlab.com/xx_network/crypto/randomness"
"io"
"math/big"
+ "strconv"
+ "strings"
"time"
)
@@ -48,3 +51,17 @@ func (iu IdentityUse) setSamplingPeriod(rng io.Reader) (IdentityUse, error) {
iu.EndRequest = iu.EndValid.Add(iu.RequestMask - time.Duration(periodOffset))
return iu, nil
}
+
+func (iu IdentityUse) GoString() string {
+ str := make([]string, 0, 7)
+
+ str = append(str, "Identity:"+iu.Identity.GoString())
+ str = append(str, "StartRequest:"+iu.StartRequest.String())
+ str = append(str, "EndRequest:"+iu.EndRequest.String())
+ str = append(str, "Fake:"+strconv.FormatBool(iu.Fake))
+ str = append(str, "UR:"+fmt.Sprintf("%+v", iu.UR))
+ str = append(str, "ER:"+fmt.Sprintf("%+v", iu.ER))
+ str = append(str, "CR:"+fmt.Sprintf("%+v", iu.CR))
+
+ return "{" + strings.Join(str, ", ") + "}"
+}
diff --git a/storage/reception/fake.go b/storage/reception/fake.go
index 38cb63a241cc25122b98ff8b5a1762da0f3994a5..3e6ba00fc711c097b7b765027962e60dc81d98bb 100644
--- a/storage/reception/fake.go
+++ b/storage/reception/fake.go
@@ -10,7 +10,8 @@ import (
// generateFakeIdentity generates a fake identity of the given size with the
// given random number generator
-func generateFakeIdentity(rng io.Reader, idSize uint, now time.Time) (IdentityUse, error) {
+func generateFakeIdentity(rng io.Reader, addressSize uint8,
+ now time.Time) (IdentityUse, error) {
// Randomly generate an identity
randIdBytes := make([]byte, id.ArrIDLen-1)
if _, err := rng.Read(randIdBytes); err != nil {
@@ -23,7 +24,8 @@ func generateFakeIdentity(rng io.Reader, idSize uint, now time.Time) (IdentityUs
randID.SetType(id.User)
// Generate the current ephemeral ID from the random identity
- ephID, start, end, err := ephemeral.GetId(randID, idSize, now.UnixNano())
+ ephID, start, end, err := ephemeral.GetId(
+ randID, uint(addressSize), now.UnixNano())
if err != nil {
return IdentityUse{}, errors.WithMessage(err, "failed to generate an "+
"ephemeral ID for random identity when none is available")
@@ -33,6 +35,7 @@ func generateFakeIdentity(rng io.Reader, idSize uint, now time.Time) (IdentityUs
Identity: Identity{
EphId: ephID,
Source: randID,
+ AddressSize: addressSize,
End: end,
ExtraChecks: 0,
StartValid: start,
diff --git a/storage/reception/fake_test.go b/storage/reception/fake_test.go
index 2c748ac2258e0950e8901a2748c0057c498770ef..0cb52480ac8eb3b713f9334eefa1dad9b4351dc3 100644
--- a/storage/reception/fake_test.go
+++ b/storage/reception/fake_test.go
@@ -4,6 +4,7 @@ import (
"encoding/json"
"math"
"math/rand"
+ "strconv"
"strings"
"testing"
"time"
@@ -13,12 +14,14 @@ import (
func Test_generateFakeIdentity(t *testing.T) {
rng := rand.New(rand.NewSource(42))
+ addressSize := uint8(15)
end, _ := json.Marshal(time.Unix(0, 1258494203759765625))
startValid, _ := json.Marshal(time.Unix(0, 1258407803759765625))
endValid, _ := json.Marshal(time.Unix(0, 1258494203759765625))
expected := "{\"EphId\":[0,0,0,0,0,0,46,197]," +
"\"Source\":[83,140,127,150,177,100,191,27,151,187,159,75,180,114," +
"232,159,91,20,132,242,82,9,201,217,52,62,146,186,9,221,157,82,3]," +
+ "\"AddressSize\":" + strconv.Itoa(int(addressSize)) + "," +
"\"End\":" + string(end) + ",\"ExtraChecks\":0," +
"\"StartValid\":" + string(startValid) + "," +
"\"EndValid\":" + string(endValid) + "," +
@@ -28,7 +31,7 @@ func Test_generateFakeIdentity(t *testing.T) {
timestamp := time.Date(2009, 11, 17, 20, 34, 58, 651387237, time.UTC)
- received, err := generateFakeIdentity(rng, 15, timestamp)
+ received, err := generateFakeIdentity(rng, addressSize, timestamp)
if err != nil {
t.Errorf("generateFakeIdentity() returned an error: %+v", err)
}
@@ -58,7 +61,7 @@ func Test_generateFakeIdentity_GetEphemeralIdError(t *testing.T) {
rng := rand.New(rand.NewSource(42))
timestamp := time.Date(2009, 11, 17, 20, 34, 58, 651387237, time.UTC)
- _, err := generateFakeIdentity(rng, math.MaxUint64, timestamp)
+ _, err := generateFakeIdentity(rng, math.MaxInt8, timestamp)
if err == nil || !strings.Contains(err.Error(), "ephemeral ID") {
t.Errorf("generateFakeIdentity() did not return the correct error on "+
"failure to generate ephemeral ID: %+v", err)
diff --git a/storage/reception/identity.go b/storage/reception/identity.go
index 4c65d9696ba4a6d0fd567395a4d86e8bd9145b45..5d0721e5c0830edbb01b6e95ea8a6f63ad0bf638 100644
--- a/storage/reception/identity.go
+++ b/storage/reception/identity.go
@@ -8,6 +8,7 @@ import (
"gitlab.com/xx_network/primitives/id/ephemeral"
"gitlab.com/xx_network/primitives/netTime"
"strconv"
+ "strings"
"time"
)
@@ -16,8 +17,9 @@ const identityStorageVersion = 0
type Identity struct {
// Identity
- EphId ephemeral.Id
- Source *id.ID
+ EphId ephemeral.Id
+ Source *id.ID
+ AddressSize uint8
// Usage variables
End time.Time // Timestamp when active polling will stop
@@ -76,13 +78,30 @@ func (i Identity) delete(kv *versioned.KV) error {
return kv.Delete(identityStorageKey, identityStorageVersion)
}
-func (i *Identity) String() string {
+func (i Identity) String() string {
return strconv.FormatInt(i.EphId.Int64(), 16) + " " + i.Source.String()
}
+func (i Identity) GoString() string {
+ str := make([]string, 0, 9)
+
+ str = append(str, "EphId:"+strconv.FormatInt(i.EphId.Int64(), 16))
+ str = append(str, "Source:"+i.Source.String())
+ str = append(str, "AddressSize:"+strconv.FormatUint(uint64(i.AddressSize), 10))
+ str = append(str, "End:"+i.End.String())
+ str = append(str, "ExtraChecks:"+strconv.FormatUint(uint64(i.ExtraChecks), 10))
+ str = append(str, "StartValid:"+i.StartValid.String())
+ str = append(str, "EndValid:"+i.EndValid.String())
+ str = append(str, "RequestMask:"+i.RequestMask.String())
+ str = append(str, "Ephemeral:"+strconv.FormatBool(i.Ephemeral))
+
+ return "{" + strings.Join(str, ", ") + "}"
+}
+
func (i Identity) Equal(b Identity) bool {
return i.EphId == b.EphId &&
i.Source.Cmp(b.Source) &&
+ i.AddressSize == b.AddressSize &&
i.End.Equal(b.End) &&
i.ExtraChecks == b.ExtraChecks &&
i.StartValid.Equal(b.StartValid) &&
diff --git a/storage/reception/identity_test.go b/storage/reception/identity_test.go
index d80b9501486c94045209da5c99dc0b77a07306f8..1fada0d707c848b5d2ea9f66398b4c1cc2c5a0c7 100644
--- a/storage/reception/identity_test.go
+++ b/storage/reception/identity_test.go
@@ -16,6 +16,7 @@ func TestIdentity_EncodeDecode(t *testing.T) {
r := Identity{
EphId: ephemeral.Id{},
Source: &id.Permissioning,
+ AddressSize: 15,
End: netTime.Now().Round(0),
ExtraChecks: 12,
StartValid: netTime.Now().Round(0),
@@ -45,6 +46,7 @@ func TestIdentity_Delete(t *testing.T) {
r := Identity{
EphId: ephemeral.Id{},
Source: &id.Permissioning,
+ AddressSize: 15,
End: netTime.Now().Round(0),
ExtraChecks: 12,
StartValid: netTime.Now().Round(0),
@@ -90,11 +92,11 @@ func TestIdentity_Equal(t *testing.T) {
if !a.Identity.Equal(b.Identity) {
t.Errorf("Equal() found two equal identities as unequal."+
- "\na: %s\nb: %s", a.String(), b.String())
+ "\na: %s\nb: %s", a, b)
}
if a.Identity.Equal(c.Identity) {
t.Errorf("Equal() found two unequal identities as equal."+
- "\na: %s\nc: %s", a.String(), c.String())
+ "\na: %s\nc: %s", a, c)
}
}
diff --git a/storage/reception/store.go b/storage/reception/store.go
index 08498aa5c0aa856727fe60c989419f2aeb18616a..b6bdf41f7db520e5e9d88880a1246ced9dc2b3b0 100644
--- a/storage/reception/store.go
+++ b/storage/reception/store.go
@@ -1,7 +1,6 @@
package reception
import (
- "bytes"
"encoding/json"
"github.com/pkg/errors"
jww "github.com/spf13/jwalterweatherman"
@@ -12,7 +11,6 @@ import (
"gitlab.com/xx_network/primitives/netTime"
"golang.org/x/crypto/blake2b"
"io"
- "strconv"
"sync"
"time"
)
@@ -20,17 +18,11 @@ import (
const receptionPrefix = "reception"
const receptionStoreStorageKey = "receptionStoreKey"
const receptionStoreStorageVersion = 0
-const receptionIDSizeStorageKey = "receptionIDSizeKey"
-const receptionIDSizeStorageVersion = 0
-const defaultIDSize = 12
type Store struct {
// Identities which are being actively checked
- active []*registration
- present map[idHash]interface{}
- idSize int
- idSizeCond *sync.Cond
- isIdSizeSet bool
+ active []*registration
+ present map[idHash]struct{}
kv *versioned.KV
@@ -56,13 +48,10 @@ func makeIdHash(ephID ephemeral.Id, source *id.ID) idHash {
// NewStore creates a new reception store that starts empty.
func NewStore(kv *versioned.KV) *Store {
- kv = kv.Prefix(receptionPrefix)
s := &Store{
- active: make([]*registration, 0),
- present: make(map[idHash]interface{}),
- idSize: defaultIDSize * 2,
- kv: kv,
- idSizeCond: sync.NewCond(&sync.Mutex{}),
+ active: []*registration{},
+ present: make(map[idHash]struct{}),
+ kv: kv.Prefix(receptionPrefix),
}
// Store the empty list
@@ -70,53 +59,37 @@ func NewStore(kv *versioned.KV) *Store {
jww.FATAL.Panicf("Failed to save new reception store: %+v", err)
}
- // Update the size so queries can be made
- s.UpdateIdSize(defaultIDSize)
-
return s
}
func LoadStore(kv *versioned.KV) *Store {
kv = kv.Prefix(receptionPrefix)
- s := &Store{
- kv: kv,
- present: make(map[idHash]interface{}),
- idSizeCond: sync.NewCond(&sync.Mutex{}),
- }
// Load the versioned object for the reception list
- vo, err := kv.Get(receptionStoreStorageKey,
- receptionStoreStorageVersion)
+ vo, err := kv.Get(receptionStoreStorageKey, receptionStoreStorageVersion)
if err != nil {
jww.FATAL.Panicf("Failed to get the reception storage list: %+v", err)
}
- identities := make([]storedReference, len(s.active))
- err = json.Unmarshal(vo.Data, &identities)
- if err != nil {
- jww.FATAL.Panicf("Failed to unmarshal the reception storage list: %+v", err)
+ // JSON unmarshal identities list
+ var identities []storedReference
+ if err = json.Unmarshal(vo.Data, &identities); err != nil {
+ jww.FATAL.Panicf("Failed to unmarshal the stored identity list: %+v", err)
+ }
+
+ s := &Store{
+ active: make([]*registration, len(identities)),
+ present: make(map[idHash]struct{}, len(identities)),
+ kv: kv,
}
- s.active = make([]*registration, len(identities))
for i, sr := range identities {
s.active[i], err = loadRegistration(sr.Eph, sr.Source, sr.StartValid, s.kv)
if err != nil {
jww.FATAL.Panicf("Failed to load registration for %s: %+v",
regPrefix(sr.Eph, sr.Source, sr.StartValid), err)
}
- s.present[makeIdHash(sr.Eph, sr.Source)] = nil
- }
-
- // Load the ephemeral ID length
- vo, err = kv.Get(receptionIDSizeStorageKey,
- receptionIDSizeStorageVersion)
- if err != nil {
- jww.FATAL.Panicf("Failed to get the reception ID size: %+v", err)
- }
-
- if s.idSize, err = strconv.Atoi(string(vo.Data)); err != nil {
- jww.FATAL.Panicf("Failed to unmarshal the reception ID size: %+v",
- err)
+ s.present[makeIdHash(sr.Eph, sr.Source)] = struct{}{}
}
return s
@@ -124,7 +97,6 @@ func LoadStore(kv *versioned.KV) *Store {
func (s *Store) save() error {
identities := s.makeStoredReferences()
-
data, err := json.Marshal(&identities)
if err != nil {
return errors.WithMessage(err, "failed to store reception store")
@@ -165,7 +137,7 @@ func (s *Store) makeStoredReferences() []storedReference {
return identities[:i]
}
-func (s *Store) GetIdentity(rng io.Reader) (IdentityUse, error) {
+func (s *Store) GetIdentity(rng io.Reader, addressSize uint8) (IdentityUse, error) {
s.mux.Lock()
defer s.mux.Unlock()
@@ -181,7 +153,7 @@ func (s *Store) GetIdentity(rng io.Reader) (IdentityUse, error) {
// poll with so we can continue tracking the network and to further
// obfuscate network identities.
if len(s.active) == 0 {
- identity, err = generateFakeIdentity(rng, uint(s.idSize), now)
+ identity, err = generateFakeIdentity(rng, addressSize, now)
if err != nil {
jww.FATAL.Panicf("Failed to generate a new ID when none "+
"available: %+v", err)
@@ -196,20 +168,18 @@ func (s *Store) GetIdentity(rng io.Reader) (IdentityUse, error) {
// Calculate the sampling period
identity, err = identity.setSamplingPeriod(rng)
if err != nil {
- jww.FATAL.Panicf("Failed to calculate the sampling period: "+
- "%+v", err)
+ jww.FATAL.Panicf("Failed to calculate the sampling period: %+v", err)
}
return identity, nil
}
func (s *Store) AddIdentity(identity Identity) error {
-
idH := makeIdHash(identity.EphId, identity.Source)
s.mux.Lock()
defer s.mux.Unlock()
- //do not make duplicates of IDs
+ // Do not make duplicates of IDs
if _, ok := s.present[idH]; ok {
jww.DEBUG.Printf("Ignoring duplicate identity for %d (%s)",
identity.EphId, identity.Source)
@@ -218,7 +188,7 @@ func (s *Store) AddIdentity(identity Identity) error {
if identity.StartValid.After(identity.EndValid) {
return errors.Errorf("Cannot add an identity which start valid "+
- "time (%s) is after its end valid time(%s)", identity.StartValid,
+ "time (%s) is after its end valid time (%s)", identity.StartValid,
identity.EndValid)
}
@@ -229,11 +199,11 @@ func (s *Store) AddIdentity(identity Identity) error {
}
s.active = append(s.active, reg)
- s.present[idH] = nil
+ s.present[idH] = struct{}{}
if !identity.Ephemeral {
if err := s.save(); err != nil {
- jww.FATAL.Panicf("Failed to save reception store after identity " +
- "addition")
+ jww.FATAL.Panicf("Failed to save reception store after identity "+
+ "addition: %+v", err)
}
}
@@ -244,86 +214,44 @@ func (s *Store) RemoveIdentity(ephID ephemeral.Id) {
s.mux.Lock()
defer s.mux.Unlock()
- for i := 0; i < len(s.active); i++ {
- inQuestion := s.active[i]
- if bytes.Equal(inQuestion.EphId[:], ephID[:]) {
+ for i, inQuestion := range s.active {
+ if inQuestion.EphId == ephID {
s.active = append(s.active[:i], s.active[i+1:]...)
+
err := inQuestion.Delete()
if err != nil {
jww.FATAL.Panicf("Failed to delete identity: %+v", err)
}
+
if !inQuestion.Ephemeral {
if err := s.save(); err != nil {
- jww.FATAL.Panicf("Failed to save reception store after " +
- "identity removal")
+ jww.FATAL.Panicf("Failed to save reception store after "+
+ "identity removal: %+v", err)
}
}
+
return
}
}
}
-// Returns whether idSize is set to default
-func (s *Store) IsIdSizeDefault() bool {
- s.mux.Lock()
- defer s.mux.Unlock()
- return s.isIdSizeSet
-}
-
-// Updates idSize boolean and broadcasts to any waiting
-// idSize readers that id size is now updated with the network
-func (s *Store) MarkIdSizeAsSet() {
- s.mux.Lock()
- s.idSizeCond.L.Lock()
- defer s.mux.Unlock()
- defer s.idSizeCond.L.Unlock()
- s.isIdSizeSet = true
- s.idSizeCond.Broadcast()
-}
-
-// Wrapper function which calls a
-// sync.Cond wait. Used on any reader of idSize
-// who cannot use the default id size
-func (s *Store) WaitForIdSizeUpdate() {
- s.idSizeCond.L.Lock()
- defer s.idSizeCond.L.Unlock()
- for !s.IsIdSizeDefault() {
-
- s.idSizeCond.Wait()
- }
-}
-
-func (s *Store) UpdateIdSize(idSize uint) {
+func (s *Store) SetToExpire(addressSize uint8) {
s.mux.Lock()
defer s.mux.Unlock()
- if s.idSize == int(idSize) {
- return
- }
- jww.INFO.Printf("Updating address space size to %v", idSize)
-
- s.idSize = int(idSize)
-
- // Store the ID size
- obj := &versioned.Object{
- Version: receptionIDSizeStorageVersion,
- Timestamp: netTime.Now(),
- Data: []byte(strconv.Itoa(s.idSize)),
- }
+ expire := netTime.Now().Add(5 * time.Minute)
- err := s.kv.Set(receptionIDSizeStorageKey,
- receptionIDSizeStorageVersion, obj)
- if err != nil {
- jww.FATAL.Panicf("Failed to store reception ID size: %+v", err)
+ for i, active := range s.active {
+ if active.AddressSize < addressSize && active.EndValid.After(expire) {
+ s.active[i].EndValid = expire
+ err := s.active[i].store(s.kv)
+ if err != nil {
+ jww.ERROR.Printf("Failed to store identity %d: %+v", i, err)
+ }
+ }
}
}
-func (s *Store) GetIDSize() uint {
- s.mux.Lock()
- defer s.mux.Unlock()
- return uint(s.idSize)
-}
-
func (s *Store) prune(now time.Time) {
lengthBefore := len(s.active)
@@ -332,8 +260,8 @@ func (s *Store) prune(now time.Time) {
inQuestion := s.active[i]
if now.After(inQuestion.End) && inQuestion.ExtraChecks == 0 {
if err := inQuestion.Delete(); err != nil {
- jww.ERROR.Printf("Failed to delete Identity for %s: "+
- "%+v", inQuestion, err)
+ jww.ERROR.Printf("Failed to delete Identity for %s: %+v",
+ inQuestion, err)
}
s.active = append(s.active[:i], s.active[i+1:]...)
@@ -346,7 +274,7 @@ func (s *Store) prune(now time.Time) {
if lengthBefore != len(s.active) {
jww.INFO.Printf("Pruned %d identities", lengthBefore-len(s.active))
if err := s.save(); err != nil {
- jww.FATAL.Panicf("Failed to store reception storage")
+ jww.FATAL.Panicf("Failed to store reception storage: %+v", err)
}
}
}
@@ -360,11 +288,15 @@ func (s *Store) selectIdentity(rng io.Reader, now time.Time) (IdentityUse, error
} else {
seed := make([]byte, 32)
if _, err := rng.Read(seed); err != nil {
- return IdentityUse{}, errors.WithMessage(err, "Failed to "+
- "choose ID due to rng failure")
+ return IdentityUse{}, errors.WithMessage(err, "Failed to choose "+
+ "ID due to RNG failure")
}
- selectedNum := large.NewInt(1).Mod(large.NewIntFromBytes(seed), large.NewInt(int64(len(s.active))))
+ selectedNum := large.NewInt(1).Mod(
+ large.NewIntFromBytes(seed),
+ large.NewInt(int64(len(s.active))),
+ )
+
selected = s.active[selectedNum.Uint64()]
}
@@ -372,9 +304,12 @@ func (s *Store) selectIdentity(rng io.Reader, now time.Time) (IdentityUse, error
selected.ExtraChecks--
}
- jww.TRACE.Printf("Selected identity: EphId: %d ID: %s End: %s StartValid: %s EndValid: %s",
- selected.EphId.Int64(), selected.Source, selected.End.Format("01/02/06 03:04:05 pm"),
- selected.StartValid.Format("01/02/06 03:04:05 pm"), selected.EndValid.Format("01/02/06 03:04:05 pm"))
+ jww.TRACE.Printf("Selected identity: EphId: %d ID: %s End: %s "+
+ "StartValid: %s EndValid: %s",
+ selected.EphId.Int64(), selected.Source,
+ selected.End.Format("01/02/06 03:04:05 pm"),
+ selected.StartValid.Format("01/02/06 03:04:05 pm"),
+ selected.EndValid.Format("01/02/06 03:04:05 pm"))
return IdentityUse{
Identity: selected.Identity,
diff --git a/storage/reception/store_test.go b/storage/reception/store_test.go
index 8779e8f54cedacd24ae9e4c892fb3b19c279b814..b969c9680a0f4e0885808df8b9144db6ca82d819 100644
--- a/storage/reception/store_test.go
+++ b/storage/reception/store_test.go
@@ -16,13 +16,12 @@ func TestNewStore(t *testing.T) {
kv := versioned.NewKV(make(ekv.Memstore))
expected := &Store{
active: make([]*registration, 0),
- idSize: defaultIDSize,
kv: kv,
}
s := NewStore(kv)
- if !reflect.DeepEqual([]*registration{}, s.active) || s.idSize != defaultIDSize {
+ if !reflect.DeepEqual([]*registration{}, s.active) {
t.Errorf("NewStore() failed to return the expected Store."+
"\nexpected: %+v\nreceived: %+v", expected, s)
}
@@ -154,14 +153,14 @@ func TestStore_GetIdentity(t *testing.T) {
t.Errorf("AddIdentity() produced an error: %+v", err)
}
- idu, err := s.GetIdentity(prng)
+ idu, err := s.GetIdentity(prng, 15)
if err != nil {
t.Errorf("GetIdentity() produced an error: %+v", err)
}
if !testID.Equal(idu.Identity) {
t.Errorf("GetIdentity() did not return the expected Identity."+
- "\nexpected: %s\nreceived: %s", testID.String(), idu.String())
+ "\nexpected: %s\nreceived: %s", testID, idu)
}
}
@@ -181,7 +180,7 @@ func TestStore_AddIdentity(t *testing.T) {
if !s.active[0].Identity.Equal(testID.Identity) {
t.Errorf("Failed to get expected Identity.\nexpected: %s\nreceived: %s",
- testID.Identity.String(), s.active[0])
+ testID.Identity, s.active[0])
}
}
@@ -204,19 +203,6 @@ func TestStore_RemoveIdentity(t *testing.T) {
}
}
-func TestStore_UpdateIdSize(t *testing.T) {
- kv := versioned.NewKV(make(ekv.Memstore))
- s := NewStore(kv)
- newSize := s.idSize * 2
-
- s.UpdateIdSize(uint(newSize))
-
- if s.idSize != newSize {
- t.Errorf("UpdateIdSize() failed to update the size."+
- "\nexpected: %d\nrecieved: %d", newSize, s.idSize)
- }
-}
-
func TestStore_prune(t *testing.T) {
kv := versioned.NewKV(make(ekv.Memstore))
s := NewStore(kv)