diff --git a/api/utilsInterfaces_test.go b/api/utilsInterfaces_test.go index 62ab16f5231d0bdc81ab30f8e0a1b890fb0931c7..259349681d4c3466fcf5af3bddc84be63215d076 100644 --- a/api/utilsInterfaces_test.go +++ b/api/utilsInterfaces_test.go @@ -125,3 +125,11 @@ func (t *testNetworkManagerGeneric) InProgressRegistrations() int { func (t *testNetworkManagerGeneric) GetSender() *gateway.Sender { return t.sender } + +func (t *testNetworkManagerGeneric) GetAddressSize() uint8 { return 0 } + +func (t *testNetworkManagerGeneric) RegisterAddressSizeNotification(string) (chan uint8, error) { + return nil, nil +} + +func (t *testNetworkManagerGeneric) UnregisterAddressSizeNotification(string) {} diff --git a/cmd/root.go b/cmd/root.go index c0ff54e0b1c349b46a968c6ca292391fc800d2ce..3b5bc7dc283c29b2137be041228df6df0a714bcb 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -512,7 +512,7 @@ func waitUntilConnected(connected chan bool) { isConnected) break case <-timeoutTimer.C: - jww.FATAL.Panic("timeout on connection") + jww.FATAL.Panicf("timeout on connection after %s", waitTimeout*time.Second) } } diff --git a/go.mod b/go.mod index 06ffdffc77ede3d213855d7619bbb69f14d36243..483ac6df4b2f8141acf2df6842f76473fdb1ea0c 100644 --- a/go.mod +++ b/go.mod @@ -23,7 +23,7 @@ require ( gitlab.com/elixxir/primitives v0.0.3-0.20210524170524-9780695d2b55 gitlab.com/xx_network/comms v0.0.4-0.20210524170426-175f698a7b07 gitlab.com/xx_network/crypto v0.0.5-0.20210524170434-dc9a398a2581 - gitlab.com/xx_network/primitives v0.0.4-0.20210524170438-ab712af183db + gitlab.com/xx_network/primitives v0.0.4-0.20210525232109-3f99a04adcfd golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2 golang.org/x/net v0.0.0-20210226172049-e18ecbb05110 golang.org/x/sys v0.0.0-20210403161142-5e06dd20ab57 // indirect diff --git a/go.sum b/go.sum index 84d7662f419e86ed2bb6c694e95e1d5a92332a4f..530dd2f546baf121fca93ddf313b409e401c9f18 100644 --- a/go.sum +++ b/go.sum @@ -273,6 +273,12 @@ gitlab.com/xx_network/primitives v0.0.4-0.20210517202253-c7b4bd0087ea/go.mod h1: gitlab.com/xx_network/primitives v0.0.4-0.20210521183842-3b12812ac984/go.mod h1:9imZHvYwNFobxueSvVtHneZLk9wTK7HQTzxPm+zhFhE= gitlab.com/xx_network/primitives v0.0.4-0.20210524170438-ab712af183db h1:JD/ttwHOzJLEZg/Q01f59uo9p8ZoPLvIyTPayl5kBxA= gitlab.com/xx_network/primitives v0.0.4-0.20210524170438-ab712af183db/go.mod h1:9imZHvYwNFobxueSvVtHneZLk9wTK7HQTzxPm+zhFhE= +gitlab.com/xx_network/primitives v0.0.4-0.20210524180355-07f8a01a856f h1:xvPCYeH79Op96BSPc/XJ9CpxZXWn0KJJ2i5SDvey/gc= +gitlab.com/xx_network/primitives v0.0.4-0.20210524180355-07f8a01a856f/go.mod h1:9imZHvYwNFobxueSvVtHneZLk9wTK7HQTzxPm+zhFhE= +gitlab.com/xx_network/primitives v0.0.4-0.20210524201134-f2e4de31b813 h1:flLCpV/6uFJl+UKLDy116iyET9x/IRlomWISZnA79cs= +gitlab.com/xx_network/primitives v0.0.4-0.20210524201134-f2e4de31b813/go.mod h1:9imZHvYwNFobxueSvVtHneZLk9wTK7HQTzxPm+zhFhE= +gitlab.com/xx_network/primitives v0.0.4-0.20210525232109-3f99a04adcfd h1:+vQkuM/o3TqS6bHqIyy36+aGn3TTrvvWFALol2TU13w= +gitlab.com/xx_network/primitives v0.0.4-0.20210525232109-3f99a04adcfd/go.mod h1:9imZHvYwNFobxueSvVtHneZLk9wTK7HQTzxPm+zhFhE= gitlab.com/xx_network/ring v0.0.2 h1:TlPjlbFdhtJrwvRgIg4ScdngMTaynx/ByHBRZiXCoL0= gitlab.com/xx_network/ring v0.0.2/go.mod h1:aLzpP2TiZTQut/PVHR40EJAomzugDdHXetbieRClXIM= go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= diff --git a/interfaces/networkManager.go b/interfaces/networkManager.go index 1daa7d38104731870459b1323c8d918401a398fd..ad9e03caf6f53e13e9d8e9190f2417749c99f490 100644 --- a/interfaces/networkManager.go +++ b/interfaces/networkManager.go @@ -29,6 +29,19 @@ type NetworkManager interface { Follow(report ClientErrorReport) (stoppable.Stoppable, error) CheckGarbledMessages() InProgressRegistrations() int + + // GetAddressSize returns the current address size of IDs. Blocks until an + // address size is known. + GetAddressSize() uint8 + + // RegisterAddressSizeNotification returns a channel that will trigger for + // every address space size update. The provided tag is the unique ID for + // the channel. Returns an error if the tag is already used. + RegisterAddressSizeNotification(tag string) (chan uint8, error) + + // UnregisterAddressSizeNotification stops broadcasting address space size + // updates on the channel with the specified tag. + UnregisterAddressSizeNotification(tag string) } //for use in key exchange which needs to be callable inside of network diff --git a/keyExchange/utils_test.go b/keyExchange/utils_test.go index 58c21e7fef3e722ce5377217e36edc72248ffb31..d9dd4ef14b2dea6fd6c3f06db794e14a87b44cc8 100644 --- a/keyExchange/utils_test.go +++ b/keyExchange/utils_test.go @@ -108,6 +108,14 @@ func (t *testNetworkManagerGeneric) GetSender() *gateway.Sender { return nil } +func (t *testNetworkManagerGeneric) GetAddressSize() uint8 { return 0 } + +func (t *testNetworkManagerGeneric) RegisterAddressSizeNotification(string) (chan uint8, error) { + return nil, nil +} + +func (t *testNetworkManagerGeneric) UnregisterAddressSizeNotification(string) {} + func InitTestingContextGeneric(i interface{}) (*storage.Session, interfaces.NetworkManager, error) { switch i.(type) { case *testing.T, *testing.M, *testing.B, *testing.PB: @@ -219,6 +227,14 @@ func (t *testNetworkManagerFullExchange) GetSender() *gateway.Sender { return nil } +func (t *testNetworkManagerFullExchange) GetAddressSize() uint8 { return 0 } + +func (t *testNetworkManagerFullExchange) RegisterAddressSizeNotification(string) (chan uint8, error) { + return nil, nil +} + +func (t *testNetworkManagerFullExchange) UnregisterAddressSizeNotification(string) {} + func InitTestingContextFullExchange(i interface{}) (*storage.Session, *switchboard.Switchboard, interfaces.NetworkManager) { switch i.(type) { case *testing.T, *testing.M, *testing.B, *testing.PB: diff --git a/network/ephemeral/addressSpace.go b/network/ephemeral/addressSpace.go new file mode 100644 index 0000000000000000000000000000000000000000..f942df77df38bfa4455bdf096ad2d4e0add40b49 --- /dev/null +++ b/network/ephemeral/addressSpace.go @@ -0,0 +1,138 @@ +package ephemeral + +import ( + "github.com/pkg/errors" + jww "github.com/spf13/jwalterweatherman" + "sync" + "testing" +) + +const ( + // The initial value for the address space size. This value signifies that + // the address space size has not yet been updated. + initSize = 1 +) + +// AddressSpace contains the current address space size used for creating +// ephemeral IDs and the infrastructure to alert other processes when an Update +// occurs. +type AddressSpace struct { + size uint8 + notifyMap map[string]chan uint8 + cond *sync.Cond +} + +// NewAddressSpace initialises a new AddressSpace and returns it. +func NewAddressSpace() *AddressSpace { + return &AddressSpace{ + size: initSize, + notifyMap: make(map[string]chan uint8), + cond: sync.NewCond(&sync.Mutex{}), + } +} + +// Get returns the current address space size. It blocks until an address space +// size is set. +func (as *AddressSpace) Get() uint8 { + as.cond.L.Lock() + defer as.cond.L.Unlock() + + // If the size has been set, then return the current size + if as.size != initSize { + return as.size + } + + // If the size is not set, then block until it is set + as.cond.Wait() + + return as.size +} + +// GetWithoutWait returns the current address space size regardless if it has +// been set yet. +func (as *AddressSpace) GetWithoutWait() uint8 { + as.cond.L.Lock() + defer as.cond.L.Unlock() + return as.size +} + +// Update updates the address space size to the new size, if it is larger. Then, +// each registered channel is notified of the Update. If this was the first time +// that the address space size was set, then the conditional broadcasts to stop +// blocking for all threads waiting on Get. +func (as *AddressSpace) Update(newSize uint8) { + as.cond.L.Lock() + defer as.cond.L.Unlock() + + // Skip Update if the address space size is unchanged + if as.size >= newSize { + return + } + + // Update address space size + oldSize := as.size + as.size = newSize + jww.INFO.Printf("Updated address space size from %d to %d", oldSize, as.size) + + // Broadcast that the address space size is set, if set for the first time + if oldSize == initSize { + as.cond.Broadcast() + } else { + // Broadcast the new address space size to all registered channels + for chanID, sizeChan := range as.notifyMap { + select { + case sizeChan <- as.size: + default: + jww.ERROR.Printf("Failed to send address space Update of %d on "+ + "channel with ID %s", as.size, chanID) + } + } + } +} + +// RegisterNotification returns a channel that will trigger for every address +// space size Update. The provided tag is the unique ID for the channel. +// Returns an error if the tag is already used. +func (as *AddressSpace) RegisterNotification(tag string) (chan uint8, error) { + as.cond.L.Lock() + defer as.cond.L.Unlock() + + if _, exists := as.notifyMap[tag]; exists { + return nil, errors.Errorf("tag \"%s\" already exists in notify map", tag) + } + + as.notifyMap[tag] = make(chan uint8, 1) + + return as.notifyMap[tag], nil +} + +// UnregisterNotification stops broadcasting address space size updates on the +// channel with the specified tag. +func (as *AddressSpace) UnregisterNotification(tag string) { + as.cond.L.Lock() + defer as.cond.L.Unlock() + + delete(as.notifyMap, tag) +} + +// NewTestAddressSpace initialises a new AddressSpace for testing with the given +// size. +func NewTestAddressSpace(newSize uint8, x interface{}) *AddressSpace { + switch x.(type) { + case *testing.T, *testing.M, *testing.B, *testing.PB: + break + default: + jww.FATAL.Panicf("NewTestAddressSpace is restricted to testing only. "+ + "Got %T", x) + } + + as := &AddressSpace{ + size: initSize, + notifyMap: make(map[string]chan uint8), + cond: sync.NewCond(&sync.Mutex{}), + } + + as.Update(newSize) + + return as +} diff --git a/network/ephemeral/addressSpace_test.go b/network/ephemeral/addressSpace_test.go new file mode 100644 index 0000000000000000000000000000000000000000..e08f08d844a6aff3bf387db5456a2147fe89f16b --- /dev/null +++ b/network/ephemeral/addressSpace_test.go @@ -0,0 +1,291 @@ +package ephemeral + +import ( + "reflect" + "strconv" + "sync" + "testing" + "time" +) + +// Unit test of NewAddressSpace. +func Test_newAddressSpace(t *testing.T) { + expected := &AddressSpace{ + size: initSize, + notifyMap: make(map[string]chan uint8), + cond: sync.NewCond(&sync.Mutex{}), + } + + as := NewAddressSpace() + + if !reflect.DeepEqual(expected, as) { + t.Errorf("NewAddressSpace failed to return the expected AddressSpace."+ + "\nexpected: %+v\nreceived: %+v", expected, as) + } +} + +// Test that AddressSpace.Get blocks when the address space size has not been +// set and that it does not block when it has been set. +func Test_addressSpace_Get(t *testing.T) { + as := NewAddressSpace() + expectedSize := uint8(42) + + // Call Get and error if it does not block + wait := make(chan uint8) + go func() { wait <- as.Get() }() + select { + case size := <-wait: + t.Errorf("Get failed to block and returned size %d.", size) + case <-time.NewTimer(10 * time.Millisecond).C: + } + + // Update address size + as.cond.L.Lock() + as.size = expectedSize + as.cond.L.Unlock() + + // Call Get and error if it does block + wait = make(chan uint8) + go func() { wait <- as.Get() }() + select { + case size := <-wait: + if size != expectedSize { + t.Errorf("Get returned the wrong size.\nexpected: %d\nreceived: %d", + expectedSize, size) + } + case <-time.NewTimer(15 * time.Millisecond).C: + t.Error("Get blocking when the size has been updated.") + } +} + +// Test that AddressSpace.Get blocks until the condition broadcasts. +func Test_addressSpace_Get_WaitBroadcast(t *testing.T) { + as := NewAddressSpace() + + wait := make(chan uint8) + go func() { wait <- as.Get() }() + + go func() { + select { + case size := <-wait: + if size != initSize { + t.Errorf("Get returned the wrong size.\nexpected: %d\nreceived: %d", + initSize, size) + } + case <-time.NewTimer(15 * time.Millisecond).C: + t.Error("Get blocking when the Cond has broadcast.") + } + }() + + time.Sleep(5 * time.Millisecond) + + as.cond.Broadcast() +} + +// Unit test of AddressSpace.GetWithoutWait. +func Test_addressSpace_GetWithoutWait(t *testing.T) { + as := NewAddressSpace() + + size := as.GetWithoutWait() + if size != initSize { + t.Errorf("GetWithoutWait returned the wrong size."+ + "\nexpected: %d\nreceived: %d", initSize, size) + } +} + +// Tests that AddressSpace.Update only updates the size when it is larger. +func Test_addressSpace_update(t *testing.T) { + as := NewAddressSpace() + expectedSize := uint8(42) + + // Attempt to Update to larger size + as.Update(expectedSize) + if as.size != expectedSize { + t.Errorf("Update failed to set the new size."+ + "\nexpected: %d\nreceived: %d", expectedSize, as.size) + } + + // Attempt to Update to smaller size + as.Update(expectedSize - 1) + if as.size != expectedSize { + t.Errorf("Update failed to set the new size."+ + "\nexpected: %d\nreceived: %d", expectedSize, as.size) + } +} + +// Tests that AddressSpace.Update sends the new size to all registered channels. +func Test_addressSpace_update_GetAndChannels(t *testing.T) { + as := NewAddressSpace() + var wg sync.WaitGroup + expectedSize := uint8(42) + + // Start threads that are waiting for an Update + wait := []chan uint8{make(chan uint8), make(chan uint8), make(chan uint8)} + for _, waitChan := range wait { + go func(waitChan chan uint8) { waitChan <- as.Get() }(waitChan) + } + + // Wait on threads + for i, waitChan := range wait { + go func(i int, waitChan chan uint8) { + wg.Add(1) + defer wg.Done() + + select { + case size := <-waitChan: + if size != expectedSize { + t.Errorf("Thread %d received unexpected size."+ + "\nexpected: %d\nreceived: %d", i, expectedSize, size) + } + case <-time.NewTimer(15 * time.Millisecond).C: + t.Errorf("Timed out waiting for Get to return on thread %d.", i) + } + }(i, waitChan) + } + + // Register channels + notifyChannels := make(map[string]chan uint8) + var notifyChan chan uint8 + var err error + var chanID string + for i := 0; i < 3; i++ { + chanID = strconv.Itoa(i) + notifyChannels[chanID], err = as.RegisterNotification(chanID) + if err != nil { + t.Errorf("Failed to regisdter channel: %+v", err) + } + } + + // Wait for new size on channels + for chanID, notifyChan := range notifyChannels { + go func(chanID string, notifyChan chan uint8) { + wg.Add(1) + defer wg.Done() + + select { + case size := <-notifyChan: + t.Errorf("Received size %d on channel %s when it should not have.", + size, chanID) + case <-time.NewTimer(15 * time.Millisecond).C: + } + }(chanID, notifyChan) + } + + time.Sleep(5 * time.Millisecond) + + // Attempt to Update to larger size + as.Update(expectedSize) + + wg.Wait() + + // Unregistered one channel and make sure it will not receive + delete(notifyChannels, chanID) + as.UnregisterNotification(chanID) + + expectedSize++ + + // Wait for new size on channels + for chanID, notifyChan := range notifyChannels { + go func(chanID string, notifyChan chan uint8) { + wg.Add(1) + defer wg.Done() + + select { + case size := <-notifyChan: + if size != expectedSize { + t.Errorf("Failed to receive expected size on channel %s."+ + "\nexpected: %d\nreceived: %d", chanID, expectedSize, size) + } + case <-time.NewTimer(15 * time.Millisecond).C: + t.Errorf("Timed out waiting on channel %s", chanID) + } + }(chanID, notifyChan) + } + + // Wait for timeout on unregistered channel + go func() { + wg.Add(1) + defer wg.Done() + + select { + case size := <-notifyChan: + t.Errorf("Received size %d on channel %s when it should not have.", + size, chanID) + case <-time.NewTimer(15 * time.Millisecond).C: + } + }() + + time.Sleep(5 * time.Millisecond) + + // Attempt to Update to larger size + as.Update(expectedSize) + + wg.Wait() +} + +// Tests that a channel created by AddressSpace.RegisterNotification receives +// the expected size when triggered. +func Test_addressSpace_RegisterNotification(t *testing.T) { + as := NewAddressSpace() + expectedSize := uint8(42) + + // Register channel + chanID := "chanID" + sizeChan, err := as.RegisterNotification(chanID) + if err != nil { + t.Errorf("RegisterNotification returned an error: %+v", err) + } + + // Wait on channel or error after timing out + go func() { + select { + case size := <-sizeChan: + if size != expectedSize { + t.Errorf("received wrong size on channel."+ + "\nexpected: %d\nreceived: %d", expectedSize, size) + } + case <-time.NewTimer(10 * time.Millisecond).C: + t.Error("Timed out waiting on channel.") + } + }() + + // Send on channel + select { + case as.notifyMap[chanID] <- expectedSize: + default: + t.Errorf("Sent on channel %s that should not be in map.", chanID) + } +} + +// Tests that when AddressSpace.UnregisterNotification unregisters a channel, +// it no longer can be triggered from the map. +func Test_addressSpace_UnregisterNotification(t *testing.T) { + as := NewAddressSpace() + expectedSize := uint8(42) + + // Register channel and then unregister it + chanID := "chanID" + sizeChan, err := as.RegisterNotification(chanID) + if err != nil { + t.Errorf("RegisterNotification returned an error: %+v", err) + } + as.UnregisterNotification(chanID) + + // Wait for timeout or error if the channel receives + go func() { + select { + case size := <-sizeChan: + t.Errorf("Received %d on channel %s that should not be in map.", + size, chanID) + case <-time.NewTimer(10 * time.Millisecond).C: + } + }() + + // Send on channel + select { + case as.notifyMap[chanID] <- expectedSize: + t.Errorf("Sent size %d on channel %s that should not be in map.", + expectedSize, chanID) + default: + } +} diff --git a/network/ephemeral/testutil.go b/network/ephemeral/testutil.go index d2708b0c71c7d112151e4494d3ef631a78adaf62..c213078eeecaca6178d6f2cd7617caf468da792e 100644 --- a/network/ephemeral/testutil.go +++ b/network/ephemeral/testutil.go @@ -70,7 +70,7 @@ func (t *testNetworkManager) GetHealthTracker() interfaces.HealthTracker { return nil } -func (t *testNetworkManager) Follow(report interfaces.ClientErrorReport) (stoppable.Stoppable, error) { +func (t *testNetworkManager) Follow(_ interfaces.ClientErrorReport) (stoppable.Stoppable, error) { return nil, nil } @@ -84,12 +84,19 @@ func (t *testNetworkManager) GetSender() *gateway.Sender { return nil } +func (t *testNetworkManager) GetAddressSize() uint8 { return 15 } +func (t *testNetworkManager) RegisterAddressSizeNotification(string) (chan uint8, error) { + return nil, nil +} + +func (t *testNetworkManager) UnregisterAddressSizeNotification(string) {} + func NewTestNetworkManager(i interface{}) interfaces.NetworkManager { switch i.(type) { case *testing.T, *testing.M, *testing.B: break default: - jww.FATAL.Panicf("initTesting is restricted to testing only."+ + jww.FATAL.Panicf("NewTestNetworkManager is restricted to testing only."+ "Got %T", i) } @@ -97,17 +104,22 @@ func NewTestNetworkManager(i interface{}) interfaces.NetworkManager { cert, err := utils.ReadFile(testkeys.GetNodeCertPath()) if err != nil { - jww.FATAL.Panicf("Failed to create new test Instance: %v", err) + jww.FATAL.Panicf("Failed to create new test Instance: %+v", err) } - commsManager.AddHost(&id.Permissioning, "", cert, connect.GetDefaultHostParams()) + _, err = commsManager.AddHost( + &id.Permissioning, "", cert, connect.GetDefaultHostParams()) + if err != nil { + jww.FATAL.Panicf("Failed to add host: %+v", err) + } instanceComms := &connect.ProtoComms{ Manager: commsManager, } - thisInstance, err := network.NewInstanceTesting(instanceComms, getNDF(), getNDF(), nil, nil, i) + thisInstance, err := network.NewInstanceTesting( + instanceComms, getNDF(), getNDF(), nil, nil, i) if err != nil { - jww.FATAL.Panicf("Failed to create new test Instance: %v", err) + jww.FATAL.Panicf("Failed to create new test Instance: %+v", err) } thisManager := &testNetworkManager{instance: thisInstance} diff --git a/network/ephemeral/tracker.go b/network/ephemeral/tracker.go index 7292f73d8cfa8741c4bac631dd9c43ab3859b53d..da81a326914389233631d15fd19bf9ee65744ed0 100644 --- a/network/ephemeral/tracker.go +++ b/network/ephemeral/tracker.go @@ -22,150 +22,157 @@ import ( const validityGracePeriod = 5 * time.Minute const TimestampKey = "IDTrackingTimestamp" +const TimestampStoreVersion = 0 const ephemeralStoppable = "EphemeralCheck" +const addressSpaceSizeChanTag = "ephemeralTracker" -// Track runs a thread which checks for past and present ephemeral ids -func Track(session *storage.Session, ourId *id.ID) stoppable.Stoppable { +// Track runs a thread which checks for past and present ephemeral ID. +func Track(session *storage.Session, addrSpace *AddressSpace, ourId *id.ID) stoppable.Stoppable { stop := stoppable.NewSingle(ephemeralStoppable) - go track(session, ourId, stop) + go track(session, addrSpace, ourId, stop) return stop } -// track is a thread which continuously processes ephemeral ids. -// If any error occurs, the thread crashes -func track(session *storage.Session, ourId *id.ID, stop *stoppable.Single) { +// track is a thread which continuously processes ephemeral IDs. Panics if any +// error occurs. +func track(session *storage.Session, addrSpace *AddressSpace, ourId *id.ID, stop *stoppable.Single) { // Check that there is a timestamp in store at all err := checkTimestampStore(session) if err != nil { - jww.FATAL.Panicf("Could not store timestamp "+ - "for ephemeral ID tracking: %v", err) + jww.FATAL.Panicf("Could not store timestamp for ephemeral ID "+ + "tracking: %+v", err) } // Get the latest timestamp from store lastTimestampObj, err := session.Get(TimestampKey) if err != nil { - jww.FATAL.Panicf("Could not get timestamp: %v", err) + jww.FATAL.Panicf("Could not get timestamp: %+v", err) } lastCheck, err := unmarshalTimestamp(lastTimestampObj) if err != nil { - jww.FATAL.Panicf("Could not parse stored timestamp: %v", err) + jww.FATAL.Panicf("Could not parse stored timestamp: %+v", err) } - // Wait until we get the id size from the network + // Wait until we get the ID size from the network receptionStore := session.Reception() - receptionStore.WaitForIdSizeUpdate() + addressSizeUpdate, err := addrSpace.RegisterNotification(addressSpaceSizeChanTag) + if err != nil { + jww.FATAL.Panicf("failed to register address size notification "+ + "channel: %+v", err) + } + addressSize := addrSpace.Get() - for true { + for { now := netTime.Now() - //hack for inconsistent time on android - if now.Sub(lastCheck) <=0{ + // Hack for inconsistent time on android + if now.Before(lastCheck) || now.Equal(lastCheck) { now = lastCheck.Add(time.Nanosecond) } // Generates the IDs since the last track - protoIds, err := ephemeral.GetIdsByRange(ourId, receptionStore.GetIDSize(), - now, now.Sub(lastCheck)) + protoIds, err := ephemeral.GetIdsByRange( + ourId, uint(addressSize), now, now.Sub(lastCheck)) jww.DEBUG.Printf("Now: %s, LastCheck: %s, Different: %s", now, lastCheck, now.Sub(lastCheck)) - jww.DEBUG.Printf("protoIds Count: %d", len(protoIds)) if err != nil { - jww.FATAL.Panicf("Could not generate "+ - "upcoming IDs: %v", err) + jww.FATAL.Panicf("Could not generate upcoming IDs: %+v", err) } // Generate identities off of that list - identities := generateIdentities(protoIds, ourId) - - jww.INFO.Printf("Number of Identities Generated: %d", - len(identities)) + identities := generateIdentities(protoIds, ourId, addressSize) + jww.INFO.Printf("Number of Identities Generated: %d", len(identities)) jww.INFO.Printf("Current Identity: %d (source: %s), Start: %s, End: %s", - identities[len(identities)-1].EphId.Int64(), identities[len(identities)-1].Source, - identities[len(identities)-1].StartValid, identities[len(identities)-1].EndValid) + identities[len(identities)-1].EphId.Int64(), + identities[len(identities)-1].Source, + identities[len(identities)-1].StartValid, + identities[len(identities)-1].EndValid) - // Add identities to storage if unique + // Add identities to storage, if unique for _, identity := range identities { if err = receptionStore.AddIdentity(identity); err != nil { - jww.FATAL.Panicf("Could not insert "+ - "identity: %v", err) + jww.FATAL.Panicf("Could not insert identity: %+v", err) } } - // Generate the time stamp for storage + // Generate the timestamp for storage vo, err := marshalTimestamp(now) if err != nil { - jww.FATAL.Panicf("Could not marshal "+ - "timestamp for storage: %v", err) + jww.FATAL.Panicf("Could not marshal timestamp for storage: %+v", err) } // Store the timestamp if err = session.Set(TimestampKey, vo); err != nil { - jww.FATAL.Panicf("Could not store timestamp: %v", err) + jww.FATAL.Panicf("Could not store timestamp: %+v", err) } - // Sleep until the last Id has expired + // Sleep until the last ID has expired timeToSleep := calculateTickerTime(protoIds) - t := time.NewTimer(timeToSleep) select { - case <-t.C: + case <-time.NewTimer(timeToSleep).C: + case addressSize = <-addressSizeUpdate: + receptionStore.SetToExpire(addressSize) case <-stop.Quit(): return } } } -// generateIdentities is a constructor which generates a list of -// identities off of the list of protoIdentities passed in -func generateIdentities(protoIds []ephemeral.ProtoIdentity, - ourId *id.ID) []reception.Identity { +// generateIdentities generates a list of identities off of the list of passed +// in ProtoIdentity. +func generateIdentities(protoIds []ephemeral.ProtoIdentity, ourId *id.ID, + addressSize uint8) []reception.Identity { - identities := make([]reception.Identity, 0) + identities := make([]reception.Identity, len(protoIds)) - // Add identities for every ephemeral id - for _, eid := range protoIds { + // Add identities for every ephemeral ID + for i, eid := range protoIds { // Expand the grace period for both start and end eid.End.Add(validityGracePeriod) eid.Start.Add(-validityGracePeriod) - identities = append(identities, reception.Identity{ - EphId: eid.Id, - Source: ourId, - End: eid.End, - StartValid: eid.Start, - EndValid: eid.End, - Ephemeral: false, - }) + identities[i] = reception.Identity{ + EphId: eid.Id, + Source: ourId, + AddressSize: addressSize, + End: eid.End, + StartValid: eid.Start, + EndValid: eid.End, + Ephemeral: false, + } } return identities } -// Sanitation check of timestamp store. If a value has not been stored yet -// then the current time is stored +// checkTimestampStore performs a sanitation check of timestamp store. If a +// value has not been stored yet, then the current time is stored. func checkTimestampStore(session *storage.Session) error { if _, err := session.Get(TimestampKey); err != nil { - // only generate from the last hour because this is a new id, it - // couldn't receive messages yet + // Only generate from the last hour because this is a new ID; it could + // not yet receive messages now, err := marshalTimestamp(netTime.Now().Add(-1 * time.Hour)) if err != nil { - return errors.Errorf("Could not marshal new timestamp for storage: %v", err) + return errors.Errorf("Could not marshal new timestamp for "+ + "storage: %+v", err) } + return session.Set(TimestampKey, now) } return nil } -// Takes the stored timestamp and unmarshal into a time object +// unmarshalTimestamp unmarshal the stored timestamp into a time.Time. func unmarshalTimestamp(lastTimestampObj *versioned.Object) (time.Time, error) { if lastTimestampObj == nil || lastTimestampObj.Data == nil { return netTime.Now(), nil @@ -176,27 +183,29 @@ func unmarshalTimestamp(lastTimestampObj *versioned.Object) (time.Time, error) { return lastTimestamp, err } -// Marshals the timestamp for ekv storage. Generates a storable object +// marshalTimestamp marshals the timestamp and generates a storable object for +// ekv storage. func marshalTimestamp(timeToStore time.Time) (*versioned.Object, error) { data, err := timeToStore.MarshalBinary() return &versioned.Object{ - Version: 0, + Version: TimestampStoreVersion, Timestamp: netTime.Now(), Data: data, }, err } -// Helper function which calculates the time for the ticker based -// off of the last ephemeral ID to expire +// calculateTickerTime calculates the time for the ticker based off of the last +// ephemeral ID to expire. func calculateTickerTime(baseIDs []ephemeral.ProtoIdentity) time.Duration { if len(baseIDs) == 0 { return time.Duration(0) } + // Get the last identity in the list lastIdentity := baseIDs[len(baseIDs)-1] - // Factor out the grace period previously expanded upon. + // Factor out the grace period previously expanded upon // Calculate and return that duration gracePeriod := lastIdentity.End.Add(-validityGracePeriod) return lastIdentity.End.Sub(gracePeriod) diff --git a/network/ephemeral/tracker_test.go b/network/ephemeral/tracker_test.go index e6c3e11f8fd3b8a9e120f3634cac636b385f66c6..3b9e03b29db0b470eebb6a56e490d5f98ac0385b 100644 --- a/network/ephemeral/tracker_test.go +++ b/network/ephemeral/tracker_test.go @@ -28,35 +28,34 @@ func TestCheck(t *testing.T) { session := storage.InitTestingSession(t) instance := NewTestNetworkManager(t) if err := setupInstance(instance); err != nil { - t.Errorf("Could not set up instance: %v", err) + t.Errorf("Could not set up instance: %+v", err) } - /// Store a mock initial timestamp the store + // Store a mock initial timestamp the store now := netTime.Now() twoDaysAgo := now.Add(-2 * 24 * time.Hour) twoDaysTimestamp, err := marshalTimestamp(twoDaysAgo) if err != nil { - t.Errorf("Could not marshal timestamp for test setup: %v", err) + t.Errorf("Could not marshal timestamp for test setup: %+v", err) } + err = session.Set(TimestampKey, twoDaysTimestamp) if err != nil { - t.Errorf("Could not set mock timestamp for test setup: %v", err) + t.Errorf("Could not set mock timestamp for test setup: %+v", err) } ourId := id.NewIdFromBytes([]byte("Sauron"), t) - stop := Track(session, ourId) - session.Reception().MarkIdSizeAsSet() + stop := Track(session, NewTestAddressSpace(15, t), ourId) err = stop.Close(3 * time.Second) if err != nil { - t.Errorf("Could not close thread: %v", err) + t.Errorf("Could not close thread: %+v", err) } } -// Unit test for track +// Unit test for track. func TestCheck_Thread(t *testing.T) { - session := storage.InitTestingSession(t) instance := NewTestNetworkManager(t) if err := setupInstance(instance); err != nil { @@ -65,26 +64,25 @@ func TestCheck_Thread(t *testing.T) { ourId := id.NewIdFromBytes([]byte("Sauron"), t) stop := stoppable.NewSingle(ephemeralStoppable) - /// Store a mock initial timestamp the store + // Store a mock initial timestamp the store now := netTime.Now() yesterday := now.Add(-24 * time.Hour) yesterdayTimestamp, err := marshalTimestamp(yesterday) if err != nil { - t.Errorf("Could not marshal timestamp for test setup: %v", err) + t.Errorf("Could not marshal timestamp for test setup: %+v", err) } + err = session.Set(TimestampKey, yesterdayTimestamp) if err != nil { - t.Errorf("Could not set mock timestamp for test setup: %v", err) + t.Errorf("Could not set mock timestamp for test setup: %+v", err) } // Run the tracker go func() { - track(session, ourId, stop) + track(session, NewTestAddressSpace(15, t), ourId, stop) }() time.Sleep(3 * time.Second) - session.Reception().MarkIdSizeAsSet() - err = stop.Close(3 * time.Second) if err != nil { t.Errorf("Could not close thread: %v", err) @@ -95,7 +93,7 @@ func TestCheck_Thread(t *testing.T) { func setupInstance(instance interfaces.NetworkManager) error { cert, err := utils.ReadFile(testkeys.GetNodeKeyPath()) if err != nil { - return errors.Errorf("Failed to read cert from from file: %v", err) + return errors.Errorf("Failed to read cert from from file: %+v", err) } ri := &mixmessages.RoundInfo{ ID: 1, @@ -103,20 +101,20 @@ func setupInstance(instance interfaces.NetworkManager) error { testCert, err := rsa.LoadPrivateKeyFromPem(cert) if err != nil { - return errors.Errorf("Failed to load cert from from file: %v", err) + return errors.Errorf("Failed to load cert from from file: %+v", err) } if err = signature.SignRsa(ri, testCert); err != nil { - return errors.Errorf("Failed to sign round info: %v", err) + return errors.Errorf("Failed to sign round info: %+v", err) } if err = instance.GetInstance().RoundUpdate(ri); err != nil { - return errors.Errorf("Failed to RoundUpdate from from file: %v", err) + return errors.Errorf("Failed to RoundUpdate from from file: %+v", err) } ri = &mixmessages.RoundInfo{ ID: 2, } if err = signature.SignRsa(ri, testCert); err != nil { - return errors.Errorf("Failed to sign round info: %v", err) + return errors.Errorf("Failed to sign round info: %+v", err) } if err = instance.GetInstance().RoundUpdate(ri); err != nil { return errors.Errorf("Failed to RoundUpdate from from file: %v", err) diff --git a/network/follow.go b/network/follow.go index 8fedb69e085bd699bcb55414a0c95e0a6e20eb34..05d8134600d2370e647363e56624005d39fc472a 100644 --- a/network/follow.go +++ b/network/follow.go @@ -64,20 +64,20 @@ func (m *manager) followNetwork(report interfaces.ClientErrorReport, quitCh <-ch m.follow(report, rng, m.Comms, isRunning) case <-TrackTicker.C: numPolls := atomic.SwapUint64(m.tracker, 0) - if m.numLatencies !=0{ - latencyAvg := time.Nanosecond*time.Duration(m.latencySum/m.numLatencies) + if m.numLatencies != 0 { + latencyAvg := time.Nanosecond * time.Duration(m.latencySum/m.numLatencies) m.latencySum, m.numLatencies = 0, 0 jww.INFO.Printf("Polled the network %d times in the "+ "last %s, with an average newest packet latency of %s", numPolls, debugTrackPeriod, latencyAvg) - }else{ + } else { jww.INFO.Printf("Polled the network %d times in the "+ "last %s", numPolls, debugTrackPeriod) } } - if !isRunning.IsRunning(){ + if !isRunning.IsRunning() { jww.ERROR.Printf("Killing network follower " + "due to failed exit") return @@ -89,9 +89,8 @@ func (m *manager) followNetwork(report interfaces.ClientErrorReport, quitCh <-ch func (m *manager) follow(report interfaces.ClientErrorReport, rng csprng.Source, comms followNetworkComms, isRunning interfaces.Running) { - - //get the identity we will poll for - identity, err := m.Session.Reception().GetIdentity(rng) + //Get the identity we will poll for + identity, err := m.Session.Reception().GetIdentity(rng, m.addrSpace.GetWithoutWait()) if err != nil { jww.FATAL.Panicf("Failed to get an identity, this should be "+ "impossible: %+v", err) @@ -121,7 +120,7 @@ func (m *manager) follow(report interfaces.ClientErrorReport, rng csprng.Source, identity.EndRequest, identity.EndRequest.Sub(identity.StartRequest), host.GetId()) return comms.SendPoll(host, &pollReq) }) - if !isRunning.IsRunning(){ + if !isRunning.IsRunning() { jww.ERROR.Printf("Killing network follower " + "due to failed exit") return @@ -165,10 +164,9 @@ func (m *manager) follow(report interfaces.ClientErrorReport, rng csprng.Source, m.GetSender().UpdateNdf(m.GetInstance().GetPartialNdf().Get()) } - //check that the stored address space is correct - m.Session.Reception().UpdateIdSize(uint(m.Instance.GetPartialNdf().Get().AddressSpaceSize)) - // Updates any id size readers of a network compliant id size - m.Session.Reception().MarkIdSizeAsSet() + // Update the address space size + m.addrSpace.Update(m.Instance.GetPartialNdf().Get().AddressSpace[0].Size) + // NOTE: this updates rounds and updates the tracking of the health of the // network if pollResp.Updates != nil { @@ -224,17 +222,16 @@ func (m *manager) follow(report interfaces.ClientErrorReport, rng csprng.Source, } } - newestTS := uint64(0) - for i:=0;i<len(pollResp.Updates[len(pollResp.Updates)-1].Timestamps);i++{ - if pollResp.Updates[len(pollResp.Updates)-1].Timestamps[i]!=0{ + for i := 0; i < len(pollResp.Updates[len(pollResp.Updates)-1].Timestamps); i++ { + if pollResp.Updates[len(pollResp.Updates)-1].Timestamps[i] != 0 { newestTS = pollResp.Updates[len(pollResp.Updates)-1].Timestamps[i] } } - newest := time.Unix(0,int64(newestTS)) + newest := time.Unix(0, int64(newestTS)) - if newest.After(now){ + if newest.After(now) { deltaDur := newest.Sub(now) m.latencySum = uint64(deltaDur) m.numLatencies++ diff --git a/network/manager.go b/network/manager.go index cf443e9912d43ffc14bdd73ddc09064d9ec3f51c..c072da4eb05bd74f60fd95dfdcfcdd7bd47e7428 100644 --- a/network/manager.go +++ b/network/manager.go @@ -47,9 +47,12 @@ type manager struct { message *message.Manager //number of polls done in a period of time - tracker *uint64 - latencySum uint64 + tracker *uint64 + latencySum uint64 numLatencies uint64 + + // Address space size + addrSpace *ephemeral.AddressSpace } // NewManager builds a new reception manager object using inputted key fields @@ -71,10 +74,11 @@ func NewManager(session *storage.Session, switchboard *switchboard.Switchboard, tracker := uint64(0) - //create manager object + // create manager object m := manager{ - param: params, - tracker: &tracker, + param: params, + tracker: &tracker, + addrSpace: ephemeral.NewAddressSpace(), } m.Internal = internal.Internal{ @@ -141,7 +145,7 @@ func (m *manager) Follow(report interfaces.ClientErrorReport) (stoppable.Stoppab // Round processing multi.Add(m.round.StartProcessors()) - multi.Add(ephemeral.Track(m.Session, m.ReceptionID)) + multi.Add(ephemeral.Track(m.Session, m.addrSpace, m.ReceptionID)) return multi, nil } @@ -173,3 +177,22 @@ func (m *manager) CheckGarbledMessages() { func (m *manager) InProgressRegistrations() int { return len(m.Internal.NodeRegistration) } + +// GetAddressSize returns the current address space size. It blocks until an +// address space size is set. +func (m *manager) GetAddressSize() uint8 { + return m.addrSpace.Get() +} + +// RegisterAddressSizeNotification returns a channel that will trigger for every +// address space size update. The provided tag is the unique ID for the channel. +// Returns an error if the tag is already used. +func (m *manager) RegisterAddressSizeNotification(tag string) (chan uint8, error) { + return m.addrSpace.RegisterNotification(tag) +} + +// UnregisterAddressSizeNotification stops broadcasting address space size +// updates on the channel with the specified tag. +func (m *manager) UnregisterAddressSizeNotification(tag string) { + m.addrSpace.UnregisterNotification(tag) +} diff --git a/single/manager_test.go b/single/manager_test.go index 2ce788f7b86ba44fd61a2aaa68814d1d62a49a02..a4384b166322df3206937ff58d56187aed0a6f52 100644 --- a/single/manager_test.go +++ b/single/manager_test.go @@ -324,10 +324,18 @@ func (tnm *testNetworkManager) InProgressRegistrations() int { return 0 } -func (t *testNetworkManager) GetSender() *gateway.Sender { +func (tnm *testNetworkManager) GetSender() *gateway.Sender { return nil } +func (tnm *testNetworkManager) GetAddressSize() uint8 { return 16 } + +func (tnm *testNetworkManager) RegisterAddressSizeNotification(string) (chan uint8, error) { + return nil, nil +} + +func (tnm *testNetworkManager) UnregisterAddressSizeNotification(string) {} + func getNDF() *ndf.NetworkDefinition { return &ndf.NetworkDefinition{ E2E: ndf.Group{ diff --git a/single/transmission.go b/single/transmission.go index d5289f339b1120c87256228d87fd7b7514c0a9d7..bbe41371d7252b29bf78b87bed37088af95c733c 100644 --- a/single/transmission.go +++ b/single/transmission.go @@ -64,12 +64,9 @@ type roundEvents interface { func (m *Manager) transmitSingleUse(partner contact2.Contact, payload []byte, tag string, MaxMsgs uint8, rng io.Reader, callback ReplyComm, timeout time.Duration, roundEvents roundEvents) error { - // Get ephemeral ID address size; this will block until the client knows the - // address size if it is currently unknown - if m.store.Reception().IsIdSizeDefault() { - m.store.Reception().WaitForIdSizeUpdate() - } - addressSize := m.store.Reception().GetIDSize() + // Get ephemeral ID address space size; this blocks until the address space + // size is set for the first time + addressSize := m.net.GetAddressSize() // Create new CMIX message containing the transmission payload cmixMsg, dhKey, rid, ephID, err := m.makeTransmitCmixMessage(partner, @@ -93,6 +90,7 @@ func (m *Manager) transmitSingleUse(partner contact2.Contact, payload []byte, err = m.reception.AddIdentity(reception.Identity{ EphId: ephID, Source: rid, + AddressSize: addressSize, End: timeStart.Add(2 * timeout), ExtraChecks: 10, StartValid: timeStart.Add(-2 * timeout), @@ -175,7 +173,7 @@ func (m *Manager) transmitSingleUse(partner contact2.Contact, payload []byte, // makeTransmitCmixMessage generates a CMIX message containing the transmission message, // which contains the encrypted payload. func (m *Manager) makeTransmitCmixMessage(partner contact2.Contact, - payload []byte, tag string, maxMsgs uint8, addressSize uint, + payload []byte, tag string, maxMsgs uint8, addressSize uint8, timeout time.Duration, timeNow time.Time, rng io.Reader) (format.Message, *cyclic.Int, *id.ID, ephemeral.Id, error) { e2eGrp := m.store.E2e().GetGroup() @@ -255,8 +253,9 @@ func generateDhKeys(grp *cyclic.Group, dhPubKey *cyclic.Int, // contains a nonce. If the generated ephemeral ID has a window that is not // within +/- the given 2*timeout from now, then the IDs are generated again // using a new nonce. -func makeIDs(msg *transmitMessagePayload, publicKey *cyclic.Int, addressSize uint, - timeout time.Duration, timeNow time.Time, rng io.Reader) (*id.ID, ephemeral.Id, error) { +func makeIDs(msg *transmitMessagePayload, publicKey *cyclic.Int, + addressSize uint8, timeout time.Duration, timeNow time.Time, + rng io.Reader) (*id.ID, ephemeral.Id, error) { var rid *id.ID var ephID ephemeral.Id @@ -277,7 +276,7 @@ func makeIDs(msg *transmitMessagePayload, publicKey *cyclic.Int, addressSize uin rid = msg.GetRID(publicKey) // Generate the ephemeral ID - ephID, start, end, err = ephemeral.GetId(rid, addressSize, timeNow.UnixNano()) + ephID, start, end, err = ephemeral.GetId(rid, uint(addressSize), timeNow.UnixNano()) if err != nil { return nil, ephemeral.Id{}, errors.Errorf("failed to generate "+ "ephemeral ID from newly generated ID: %+v", err) diff --git a/single/transmission_test.go b/single/transmission_test.go index 367a8b3bfa4bd07df0e266045281624934ea3ee3..761f65d9fc5e757aa84710cf7f3ef476f0ae91ac 100644 --- a/single/transmission_test.go +++ b/single/transmission_test.go @@ -366,7 +366,7 @@ func Test_makeIDs_Consistency(t *testing.T) { if err != nil { t.Fatalf("Failed to generate public key: %+v", err) } - addressSize := uint(32) + addressSize := uint8(32) expectedPayload, err := unmarshalTransmitMessagePayload(msgPayload.Marshal()) if err != nil { @@ -397,7 +397,7 @@ func Test_makeIDs_Consistency(t *testing.T) { } expectedEphID, _, _, err := ephemeral.GetId(expectedPayload.GetRID(publicKey), - addressSize, timeNow.UnixNano()) + uint(addressSize), timeNow.UnixNano()) if err != nil { t.Fatalf("Failed to generate expected ephemeral ID: %+v", err) } diff --git a/storage/reception/IdentityUse.go b/storage/reception/IdentityUse.go index 2c8b9df3b64601651d7489e96935ff1a3d360db9..08f1a070aadc0ac5271156516e860371c0ca7daa 100644 --- a/storage/reception/IdentityUse.go +++ b/storage/reception/IdentityUse.go @@ -1,12 +1,15 @@ package reception import ( + "fmt" "github.com/pkg/errors" "gitlab.com/elixxir/client/storage/rounds" "gitlab.com/elixxir/crypto/hash" "gitlab.com/xx_network/crypto/randomness" "io" "math/big" + "strconv" + "strings" "time" ) @@ -48,3 +51,17 @@ func (iu IdentityUse) setSamplingPeriod(rng io.Reader) (IdentityUse, error) { iu.EndRequest = iu.EndValid.Add(iu.RequestMask - time.Duration(periodOffset)) return iu, nil } + +func (iu IdentityUse) GoString() string { + str := make([]string, 0, 7) + + str = append(str, "Identity:"+iu.Identity.GoString()) + str = append(str, "StartRequest:"+iu.StartRequest.String()) + str = append(str, "EndRequest:"+iu.EndRequest.String()) + str = append(str, "Fake:"+strconv.FormatBool(iu.Fake)) + str = append(str, "UR:"+fmt.Sprintf("%+v", iu.UR)) + str = append(str, "ER:"+fmt.Sprintf("%+v", iu.ER)) + str = append(str, "CR:"+fmt.Sprintf("%+v", iu.CR)) + + return "{" + strings.Join(str, ", ") + "}" +} diff --git a/storage/reception/fake.go b/storage/reception/fake.go index 38cb63a241cc25122b98ff8b5a1762da0f3994a5..3e6ba00fc711c097b7b765027962e60dc81d98bb 100644 --- a/storage/reception/fake.go +++ b/storage/reception/fake.go @@ -10,7 +10,8 @@ import ( // generateFakeIdentity generates a fake identity of the given size with the // given random number generator -func generateFakeIdentity(rng io.Reader, idSize uint, now time.Time) (IdentityUse, error) { +func generateFakeIdentity(rng io.Reader, addressSize uint8, + now time.Time) (IdentityUse, error) { // Randomly generate an identity randIdBytes := make([]byte, id.ArrIDLen-1) if _, err := rng.Read(randIdBytes); err != nil { @@ -23,7 +24,8 @@ func generateFakeIdentity(rng io.Reader, idSize uint, now time.Time) (IdentityUs randID.SetType(id.User) // Generate the current ephemeral ID from the random identity - ephID, start, end, err := ephemeral.GetId(randID, idSize, now.UnixNano()) + ephID, start, end, err := ephemeral.GetId( + randID, uint(addressSize), now.UnixNano()) if err != nil { return IdentityUse{}, errors.WithMessage(err, "failed to generate an "+ "ephemeral ID for random identity when none is available") @@ -33,6 +35,7 @@ func generateFakeIdentity(rng io.Reader, idSize uint, now time.Time) (IdentityUs Identity: Identity{ EphId: ephID, Source: randID, + AddressSize: addressSize, End: end, ExtraChecks: 0, StartValid: start, diff --git a/storage/reception/fake_test.go b/storage/reception/fake_test.go index 2c748ac2258e0950e8901a2748c0057c498770ef..0cb52480ac8eb3b713f9334eefa1dad9b4351dc3 100644 --- a/storage/reception/fake_test.go +++ b/storage/reception/fake_test.go @@ -4,6 +4,7 @@ import ( "encoding/json" "math" "math/rand" + "strconv" "strings" "testing" "time" @@ -13,12 +14,14 @@ import ( func Test_generateFakeIdentity(t *testing.T) { rng := rand.New(rand.NewSource(42)) + addressSize := uint8(15) end, _ := json.Marshal(time.Unix(0, 1258494203759765625)) startValid, _ := json.Marshal(time.Unix(0, 1258407803759765625)) endValid, _ := json.Marshal(time.Unix(0, 1258494203759765625)) expected := "{\"EphId\":[0,0,0,0,0,0,46,197]," + "\"Source\":[83,140,127,150,177,100,191,27,151,187,159,75,180,114," + "232,159,91,20,132,242,82,9,201,217,52,62,146,186,9,221,157,82,3]," + + "\"AddressSize\":" + strconv.Itoa(int(addressSize)) + "," + "\"End\":" + string(end) + ",\"ExtraChecks\":0," + "\"StartValid\":" + string(startValid) + "," + "\"EndValid\":" + string(endValid) + "," + @@ -28,7 +31,7 @@ func Test_generateFakeIdentity(t *testing.T) { timestamp := time.Date(2009, 11, 17, 20, 34, 58, 651387237, time.UTC) - received, err := generateFakeIdentity(rng, 15, timestamp) + received, err := generateFakeIdentity(rng, addressSize, timestamp) if err != nil { t.Errorf("generateFakeIdentity() returned an error: %+v", err) } @@ -58,7 +61,7 @@ func Test_generateFakeIdentity_GetEphemeralIdError(t *testing.T) { rng := rand.New(rand.NewSource(42)) timestamp := time.Date(2009, 11, 17, 20, 34, 58, 651387237, time.UTC) - _, err := generateFakeIdentity(rng, math.MaxUint64, timestamp) + _, err := generateFakeIdentity(rng, math.MaxInt8, timestamp) if err == nil || !strings.Contains(err.Error(), "ephemeral ID") { t.Errorf("generateFakeIdentity() did not return the correct error on "+ "failure to generate ephemeral ID: %+v", err) diff --git a/storage/reception/identity.go b/storage/reception/identity.go index 4c65d9696ba4a6d0fd567395a4d86e8bd9145b45..5d0721e5c0830edbb01b6e95ea8a6f63ad0bf638 100644 --- a/storage/reception/identity.go +++ b/storage/reception/identity.go @@ -8,6 +8,7 @@ import ( "gitlab.com/xx_network/primitives/id/ephemeral" "gitlab.com/xx_network/primitives/netTime" "strconv" + "strings" "time" ) @@ -16,8 +17,9 @@ const identityStorageVersion = 0 type Identity struct { // Identity - EphId ephemeral.Id - Source *id.ID + EphId ephemeral.Id + Source *id.ID + AddressSize uint8 // Usage variables End time.Time // Timestamp when active polling will stop @@ -76,13 +78,30 @@ func (i Identity) delete(kv *versioned.KV) error { return kv.Delete(identityStorageKey, identityStorageVersion) } -func (i *Identity) String() string { +func (i Identity) String() string { return strconv.FormatInt(i.EphId.Int64(), 16) + " " + i.Source.String() } +func (i Identity) GoString() string { + str := make([]string, 0, 9) + + str = append(str, "EphId:"+strconv.FormatInt(i.EphId.Int64(), 16)) + str = append(str, "Source:"+i.Source.String()) + str = append(str, "AddressSize:"+strconv.FormatUint(uint64(i.AddressSize), 10)) + str = append(str, "End:"+i.End.String()) + str = append(str, "ExtraChecks:"+strconv.FormatUint(uint64(i.ExtraChecks), 10)) + str = append(str, "StartValid:"+i.StartValid.String()) + str = append(str, "EndValid:"+i.EndValid.String()) + str = append(str, "RequestMask:"+i.RequestMask.String()) + str = append(str, "Ephemeral:"+strconv.FormatBool(i.Ephemeral)) + + return "{" + strings.Join(str, ", ") + "}" +} + func (i Identity) Equal(b Identity) bool { return i.EphId == b.EphId && i.Source.Cmp(b.Source) && + i.AddressSize == b.AddressSize && i.End.Equal(b.End) && i.ExtraChecks == b.ExtraChecks && i.StartValid.Equal(b.StartValid) && diff --git a/storage/reception/identity_test.go b/storage/reception/identity_test.go index d80b9501486c94045209da5c99dc0b77a07306f8..1fada0d707c848b5d2ea9f66398b4c1cc2c5a0c7 100644 --- a/storage/reception/identity_test.go +++ b/storage/reception/identity_test.go @@ -16,6 +16,7 @@ func TestIdentity_EncodeDecode(t *testing.T) { r := Identity{ EphId: ephemeral.Id{}, Source: &id.Permissioning, + AddressSize: 15, End: netTime.Now().Round(0), ExtraChecks: 12, StartValid: netTime.Now().Round(0), @@ -45,6 +46,7 @@ func TestIdentity_Delete(t *testing.T) { r := Identity{ EphId: ephemeral.Id{}, Source: &id.Permissioning, + AddressSize: 15, End: netTime.Now().Round(0), ExtraChecks: 12, StartValid: netTime.Now().Round(0), @@ -90,11 +92,11 @@ func TestIdentity_Equal(t *testing.T) { if !a.Identity.Equal(b.Identity) { t.Errorf("Equal() found two equal identities as unequal."+ - "\na: %s\nb: %s", a.String(), b.String()) + "\na: %s\nb: %s", a, b) } if a.Identity.Equal(c.Identity) { t.Errorf("Equal() found two unequal identities as equal."+ - "\na: %s\nc: %s", a.String(), c.String()) + "\na: %s\nc: %s", a, c) } } diff --git a/storage/reception/store.go b/storage/reception/store.go index 08498aa5c0aa856727fe60c989419f2aeb18616a..b6bdf41f7db520e5e9d88880a1246ced9dc2b3b0 100644 --- a/storage/reception/store.go +++ b/storage/reception/store.go @@ -1,7 +1,6 @@ package reception import ( - "bytes" "encoding/json" "github.com/pkg/errors" jww "github.com/spf13/jwalterweatherman" @@ -12,7 +11,6 @@ import ( "gitlab.com/xx_network/primitives/netTime" "golang.org/x/crypto/blake2b" "io" - "strconv" "sync" "time" ) @@ -20,17 +18,11 @@ import ( const receptionPrefix = "reception" const receptionStoreStorageKey = "receptionStoreKey" const receptionStoreStorageVersion = 0 -const receptionIDSizeStorageKey = "receptionIDSizeKey" -const receptionIDSizeStorageVersion = 0 -const defaultIDSize = 12 type Store struct { // Identities which are being actively checked - active []*registration - present map[idHash]interface{} - idSize int - idSizeCond *sync.Cond - isIdSizeSet bool + active []*registration + present map[idHash]struct{} kv *versioned.KV @@ -56,13 +48,10 @@ func makeIdHash(ephID ephemeral.Id, source *id.ID) idHash { // NewStore creates a new reception store that starts empty. func NewStore(kv *versioned.KV) *Store { - kv = kv.Prefix(receptionPrefix) s := &Store{ - active: make([]*registration, 0), - present: make(map[idHash]interface{}), - idSize: defaultIDSize * 2, - kv: kv, - idSizeCond: sync.NewCond(&sync.Mutex{}), + active: []*registration{}, + present: make(map[idHash]struct{}), + kv: kv.Prefix(receptionPrefix), } // Store the empty list @@ -70,53 +59,37 @@ func NewStore(kv *versioned.KV) *Store { jww.FATAL.Panicf("Failed to save new reception store: %+v", err) } - // Update the size so queries can be made - s.UpdateIdSize(defaultIDSize) - return s } func LoadStore(kv *versioned.KV) *Store { kv = kv.Prefix(receptionPrefix) - s := &Store{ - kv: kv, - present: make(map[idHash]interface{}), - idSizeCond: sync.NewCond(&sync.Mutex{}), - } // Load the versioned object for the reception list - vo, err := kv.Get(receptionStoreStorageKey, - receptionStoreStorageVersion) + vo, err := kv.Get(receptionStoreStorageKey, receptionStoreStorageVersion) if err != nil { jww.FATAL.Panicf("Failed to get the reception storage list: %+v", err) } - identities := make([]storedReference, len(s.active)) - err = json.Unmarshal(vo.Data, &identities) - if err != nil { - jww.FATAL.Panicf("Failed to unmarshal the reception storage list: %+v", err) + // JSON unmarshal identities list + var identities []storedReference + if err = json.Unmarshal(vo.Data, &identities); err != nil { + jww.FATAL.Panicf("Failed to unmarshal the stored identity list: %+v", err) + } + + s := &Store{ + active: make([]*registration, len(identities)), + present: make(map[idHash]struct{}, len(identities)), + kv: kv, } - s.active = make([]*registration, len(identities)) for i, sr := range identities { s.active[i], err = loadRegistration(sr.Eph, sr.Source, sr.StartValid, s.kv) if err != nil { jww.FATAL.Panicf("Failed to load registration for %s: %+v", regPrefix(sr.Eph, sr.Source, sr.StartValid), err) } - s.present[makeIdHash(sr.Eph, sr.Source)] = nil - } - - // Load the ephemeral ID length - vo, err = kv.Get(receptionIDSizeStorageKey, - receptionIDSizeStorageVersion) - if err != nil { - jww.FATAL.Panicf("Failed to get the reception ID size: %+v", err) - } - - if s.idSize, err = strconv.Atoi(string(vo.Data)); err != nil { - jww.FATAL.Panicf("Failed to unmarshal the reception ID size: %+v", - err) + s.present[makeIdHash(sr.Eph, sr.Source)] = struct{}{} } return s @@ -124,7 +97,6 @@ func LoadStore(kv *versioned.KV) *Store { func (s *Store) save() error { identities := s.makeStoredReferences() - data, err := json.Marshal(&identities) if err != nil { return errors.WithMessage(err, "failed to store reception store") @@ -165,7 +137,7 @@ func (s *Store) makeStoredReferences() []storedReference { return identities[:i] } -func (s *Store) GetIdentity(rng io.Reader) (IdentityUse, error) { +func (s *Store) GetIdentity(rng io.Reader, addressSize uint8) (IdentityUse, error) { s.mux.Lock() defer s.mux.Unlock() @@ -181,7 +153,7 @@ func (s *Store) GetIdentity(rng io.Reader) (IdentityUse, error) { // poll with so we can continue tracking the network and to further // obfuscate network identities. if len(s.active) == 0 { - identity, err = generateFakeIdentity(rng, uint(s.idSize), now) + identity, err = generateFakeIdentity(rng, addressSize, now) if err != nil { jww.FATAL.Panicf("Failed to generate a new ID when none "+ "available: %+v", err) @@ -196,20 +168,18 @@ func (s *Store) GetIdentity(rng io.Reader) (IdentityUse, error) { // Calculate the sampling period identity, err = identity.setSamplingPeriod(rng) if err != nil { - jww.FATAL.Panicf("Failed to calculate the sampling period: "+ - "%+v", err) + jww.FATAL.Panicf("Failed to calculate the sampling period: %+v", err) } return identity, nil } func (s *Store) AddIdentity(identity Identity) error { - idH := makeIdHash(identity.EphId, identity.Source) s.mux.Lock() defer s.mux.Unlock() - //do not make duplicates of IDs + // Do not make duplicates of IDs if _, ok := s.present[idH]; ok { jww.DEBUG.Printf("Ignoring duplicate identity for %d (%s)", identity.EphId, identity.Source) @@ -218,7 +188,7 @@ func (s *Store) AddIdentity(identity Identity) error { if identity.StartValid.After(identity.EndValid) { return errors.Errorf("Cannot add an identity which start valid "+ - "time (%s) is after its end valid time(%s)", identity.StartValid, + "time (%s) is after its end valid time (%s)", identity.StartValid, identity.EndValid) } @@ -229,11 +199,11 @@ func (s *Store) AddIdentity(identity Identity) error { } s.active = append(s.active, reg) - s.present[idH] = nil + s.present[idH] = struct{}{} if !identity.Ephemeral { if err := s.save(); err != nil { - jww.FATAL.Panicf("Failed to save reception store after identity " + - "addition") + jww.FATAL.Panicf("Failed to save reception store after identity "+ + "addition: %+v", err) } } @@ -244,86 +214,44 @@ func (s *Store) RemoveIdentity(ephID ephemeral.Id) { s.mux.Lock() defer s.mux.Unlock() - for i := 0; i < len(s.active); i++ { - inQuestion := s.active[i] - if bytes.Equal(inQuestion.EphId[:], ephID[:]) { + for i, inQuestion := range s.active { + if inQuestion.EphId == ephID { s.active = append(s.active[:i], s.active[i+1:]...) + err := inQuestion.Delete() if err != nil { jww.FATAL.Panicf("Failed to delete identity: %+v", err) } + if !inQuestion.Ephemeral { if err := s.save(); err != nil { - jww.FATAL.Panicf("Failed to save reception store after " + - "identity removal") + jww.FATAL.Panicf("Failed to save reception store after "+ + "identity removal: %+v", err) } } + return } } } -// Returns whether idSize is set to default -func (s *Store) IsIdSizeDefault() bool { - s.mux.Lock() - defer s.mux.Unlock() - return s.isIdSizeSet -} - -// Updates idSize boolean and broadcasts to any waiting -// idSize readers that id size is now updated with the network -func (s *Store) MarkIdSizeAsSet() { - s.mux.Lock() - s.idSizeCond.L.Lock() - defer s.mux.Unlock() - defer s.idSizeCond.L.Unlock() - s.isIdSizeSet = true - s.idSizeCond.Broadcast() -} - -// Wrapper function which calls a -// sync.Cond wait. Used on any reader of idSize -// who cannot use the default id size -func (s *Store) WaitForIdSizeUpdate() { - s.idSizeCond.L.Lock() - defer s.idSizeCond.L.Unlock() - for !s.IsIdSizeDefault() { - - s.idSizeCond.Wait() - } -} - -func (s *Store) UpdateIdSize(idSize uint) { +func (s *Store) SetToExpire(addressSize uint8) { s.mux.Lock() defer s.mux.Unlock() - if s.idSize == int(idSize) { - return - } - jww.INFO.Printf("Updating address space size to %v", idSize) - - s.idSize = int(idSize) - - // Store the ID size - obj := &versioned.Object{ - Version: receptionIDSizeStorageVersion, - Timestamp: netTime.Now(), - Data: []byte(strconv.Itoa(s.idSize)), - } + expire := netTime.Now().Add(5 * time.Minute) - err := s.kv.Set(receptionIDSizeStorageKey, - receptionIDSizeStorageVersion, obj) - if err != nil { - jww.FATAL.Panicf("Failed to store reception ID size: %+v", err) + for i, active := range s.active { + if active.AddressSize < addressSize && active.EndValid.After(expire) { + s.active[i].EndValid = expire + err := s.active[i].store(s.kv) + if err != nil { + jww.ERROR.Printf("Failed to store identity %d: %+v", i, err) + } + } } } -func (s *Store) GetIDSize() uint { - s.mux.Lock() - defer s.mux.Unlock() - return uint(s.idSize) -} - func (s *Store) prune(now time.Time) { lengthBefore := len(s.active) @@ -332,8 +260,8 @@ func (s *Store) prune(now time.Time) { inQuestion := s.active[i] if now.After(inQuestion.End) && inQuestion.ExtraChecks == 0 { if err := inQuestion.Delete(); err != nil { - jww.ERROR.Printf("Failed to delete Identity for %s: "+ - "%+v", inQuestion, err) + jww.ERROR.Printf("Failed to delete Identity for %s: %+v", + inQuestion, err) } s.active = append(s.active[:i], s.active[i+1:]...) @@ -346,7 +274,7 @@ func (s *Store) prune(now time.Time) { if lengthBefore != len(s.active) { jww.INFO.Printf("Pruned %d identities", lengthBefore-len(s.active)) if err := s.save(); err != nil { - jww.FATAL.Panicf("Failed to store reception storage") + jww.FATAL.Panicf("Failed to store reception storage: %+v", err) } } } @@ -360,11 +288,15 @@ func (s *Store) selectIdentity(rng io.Reader, now time.Time) (IdentityUse, error } else { seed := make([]byte, 32) if _, err := rng.Read(seed); err != nil { - return IdentityUse{}, errors.WithMessage(err, "Failed to "+ - "choose ID due to rng failure") + return IdentityUse{}, errors.WithMessage(err, "Failed to choose "+ + "ID due to RNG failure") } - selectedNum := large.NewInt(1).Mod(large.NewIntFromBytes(seed), large.NewInt(int64(len(s.active)))) + selectedNum := large.NewInt(1).Mod( + large.NewIntFromBytes(seed), + large.NewInt(int64(len(s.active))), + ) + selected = s.active[selectedNum.Uint64()] } @@ -372,9 +304,12 @@ func (s *Store) selectIdentity(rng io.Reader, now time.Time) (IdentityUse, error selected.ExtraChecks-- } - jww.TRACE.Printf("Selected identity: EphId: %d ID: %s End: %s StartValid: %s EndValid: %s", - selected.EphId.Int64(), selected.Source, selected.End.Format("01/02/06 03:04:05 pm"), - selected.StartValid.Format("01/02/06 03:04:05 pm"), selected.EndValid.Format("01/02/06 03:04:05 pm")) + jww.TRACE.Printf("Selected identity: EphId: %d ID: %s End: %s "+ + "StartValid: %s EndValid: %s", + selected.EphId.Int64(), selected.Source, + selected.End.Format("01/02/06 03:04:05 pm"), + selected.StartValid.Format("01/02/06 03:04:05 pm"), + selected.EndValid.Format("01/02/06 03:04:05 pm")) return IdentityUse{ Identity: selected.Identity, diff --git a/storage/reception/store_test.go b/storage/reception/store_test.go index 8779e8f54cedacd24ae9e4c892fb3b19c279b814..b969c9680a0f4e0885808df8b9144db6ca82d819 100644 --- a/storage/reception/store_test.go +++ b/storage/reception/store_test.go @@ -16,13 +16,12 @@ func TestNewStore(t *testing.T) { kv := versioned.NewKV(make(ekv.Memstore)) expected := &Store{ active: make([]*registration, 0), - idSize: defaultIDSize, kv: kv, } s := NewStore(kv) - if !reflect.DeepEqual([]*registration{}, s.active) || s.idSize != defaultIDSize { + if !reflect.DeepEqual([]*registration{}, s.active) { t.Errorf("NewStore() failed to return the expected Store."+ "\nexpected: %+v\nreceived: %+v", expected, s) } @@ -154,14 +153,14 @@ func TestStore_GetIdentity(t *testing.T) { t.Errorf("AddIdentity() produced an error: %+v", err) } - idu, err := s.GetIdentity(prng) + idu, err := s.GetIdentity(prng, 15) if err != nil { t.Errorf("GetIdentity() produced an error: %+v", err) } if !testID.Equal(idu.Identity) { t.Errorf("GetIdentity() did not return the expected Identity."+ - "\nexpected: %s\nreceived: %s", testID.String(), idu.String()) + "\nexpected: %s\nreceived: %s", testID, idu) } } @@ -181,7 +180,7 @@ func TestStore_AddIdentity(t *testing.T) { if !s.active[0].Identity.Equal(testID.Identity) { t.Errorf("Failed to get expected Identity.\nexpected: %s\nreceived: %s", - testID.Identity.String(), s.active[0]) + testID.Identity, s.active[0]) } } @@ -204,19 +203,6 @@ func TestStore_RemoveIdentity(t *testing.T) { } } -func TestStore_UpdateIdSize(t *testing.T) { - kv := versioned.NewKV(make(ekv.Memstore)) - s := NewStore(kv) - newSize := s.idSize * 2 - - s.UpdateIdSize(uint(newSize)) - - if s.idSize != newSize { - t.Errorf("UpdateIdSize() failed to update the size."+ - "\nexpected: %d\nrecieved: %d", newSize, s.idSize) - } -} - func TestStore_prune(t *testing.T) { kv := versioned.NewKV(make(ekv.Memstore)) s := NewStore(kv)