diff --git a/channels/interface.go b/channels/interface.go index ac07646c0abcdf6fe4c37cd97c8e25139ecfbdaf..efbf26b56c445050dca9252289a31238ca5b87bc 100644 --- a/channels/interface.go +++ b/channels/interface.go @@ -9,6 +9,7 @@ package channels import ( "gitlab.com/elixxir/client/cmix" + cryptoBroadcast "gitlab.com/elixxir/crypto/broadcast" cryptoChannel "gitlab.com/elixxir/crypto/channel" "gitlab.com/xx_network/crypto/signature/rsa" "gitlab.com/xx_network/primitives/id" @@ -82,4 +83,17 @@ type Manager interface { // on a multiple registration. RegisterReceiveHandler(messageType MessageType, listener MessageTypeReceiveMessage) error + + // GetChannels returns the IDs of all channels that have been joined. Use + // getChannelsUnsafe if you already have taken the mux. + GetChannels() []*id.ID + + // GetChannel returns the underlying cryptographic structure for a given channel. + GetChannel(chID *id.ID) (*cryptoBroadcast.Channel, error) + + // ReplayChannel replays all messages from the channel within the network's + // memory (~3 weeks) over the event model. It does this by wiping the + // underlying state tracking for message pickup for the channel, causing all + // messages to be re-retrieved from the network + ReplayChannel(chID *id.ID) error } diff --git a/channels/joinedChannel.go b/channels/joinedChannel.go index 6d4fc2e7b22a8cc226d1f31df60125f9a6ead27a..ab24aaa8bf98441b4af0bf2a6560251d4313d126 100644 --- a/channels/joinedChannel.go +++ b/channels/joinedChannel.go @@ -72,7 +72,7 @@ func (m *manager) loadChannels() { for i := range chList { jc, err := loadJoinedChannel( - chList[i], m.kv, m.client, m.rng, m.name, m.events, m.broadcastMaker) + chList[i], m.kv, m.net, m.rng, m.name, m.events, m.broadcastMaker) if err != nil { jww.FATAL.Panicf("Failed to load channel %s: %+v", chList[i], err) } @@ -90,7 +90,7 @@ func (m *manager) addChannel(channel *cryptoBroadcast.Channel) error { return ChannelAlreadyExistsErr } - b, err := m.broadcastMaker(channel, m.client, m.rng) + b, err := m.broadcastMaker(channel, m.net, m.rng) if err != nil { return err } @@ -167,14 +167,6 @@ func (m *manager) getChannel(channelID *id.ID) (*joinedChannel, error) { return jc, nil } -// getChannels returns the IDs of all channels that have been joined. Use -// getChannelsUnsafe if you already have taken the mux. -func (m *manager) getChannels() []*id.ID { - m.mux.Lock() - defer m.mux.Unlock() - return m.getChannelsUnsafe() -} - // getChannelsUnsafe returns the IDs of all channels that have been joined. This // function is unsafe because it does not take the mux; only use this function // when under a lock. @@ -230,14 +222,35 @@ func loadJoinedChannel(chId *id.ID, kv *versioned.KV, net broadcast.Client, if err != nil { return nil, err } - b, err := broadcastMaker(jcd.Broadcast, net, rngGen) + + b, err := initBroadcast(jcd.Broadcast, name, e, net, broadcastMaker, rngGen) + + jc := &joinedChannel{broadcast: b} + return jc, nil +} + +// delete removes the channel from the kv. +func (jc *joinedChannel) delete(kv *versioned.KV) error { + return kv.Delete(makeJoinedChannelKey(jc.broadcast.Get().ReceptionID), + joinedChannelVersion) +} + +func makeJoinedChannelKey(chId *id.ID) string { + return joinedChannelKey + chId.HexEncode() +} + +func initBroadcast(c *cryptoBroadcast.Channel, + name NameService, e *events, net broadcast.Client, + broadcastMaker broadcast.NewBroadcastChannelFunc, + rngGen *fastRNG.StreamGenerator) (broadcast.Channel, error) { + b, err := broadcastMaker(c, net, rngGen) if err != nil { return nil, err } err = b.RegisterListener((&userListener{ name: name, - chID: jcd.Broadcast.ReceptionID, + chID: c.ReceptionID, trigger: e.triggerEvent, }).Listen, broadcast.Symmetric) if err != nil { @@ -245,23 +258,12 @@ func loadJoinedChannel(chId *id.ID, kv *versioned.KV, net broadcast.Client, } err = b.RegisterListener((&adminListener{ - chID: jcd.Broadcast.ReceptionID, + chID: c.ReceptionID, trigger: e.triggerAdminEvent, }).Listen, broadcast.Asymmetric) if err != nil { return nil, err } - jc := &joinedChannel{broadcast: b} - return jc, nil -} - -// delete removes the channel from the kv. -func (jc *joinedChannel) delete(kv *versioned.KV) error { - return kv.Delete(makeJoinedChannelKey(jc.broadcast.Get().ReceptionID), - joinedChannelVersion) -} - -func makeJoinedChannelKey(chId *id.ID) string { - return joinedChannelKey + chId.HexEncode() + return b, nil } diff --git a/channels/joinedChannel_test.go b/channels/joinedChannel_test.go index a82a75884d775987f6d108f5923413febcf6f0e7..cebec207d35738d0c8f7ab3d04b267604130a87f 100644 --- a/channels/joinedChannel_test.go +++ b/channels/joinedChannel_test.go @@ -45,7 +45,7 @@ func Test_manager_store(t *testing.T) { t.Errorf("Failed to create new channel %d: %+v", i, err) } - b, err := broadcast.NewBroadcastChannel(ch, m.client, m.rng) + b, err := broadcast.NewBroadcastChannel(ch, m.net, m.rng) if err != nil { t.Errorf("Failed to make new broadcast channel: %+v", err) } @@ -81,7 +81,7 @@ func Test_manager_loadChannels(t *testing.T) { t.Errorf("Failed to create new channel %d: %+v", i, err) } - b, err := broadcast.NewBroadcastChannel(ch, m.client, m.rng) + b, err := broadcast.NewBroadcastChannel(ch, m.net, m.rng) if err != nil { t.Errorf("Failed to make new broadcast channel: %+v", err) } @@ -104,7 +104,7 @@ func Test_manager_loadChannels(t *testing.T) { newManager := &manager{ channels: make(map[id.ID]*joinedChannel), kv: m.kv, - client: m.client, + net: m.net, rng: m.rng, broadcastMaker: m.broadcastMaker, } @@ -369,7 +369,7 @@ func Test_loadJoinedChannel(t *testing.T) { t.Errorf("Failed to add channel: %+v", err) } - loadedJc, err := loadJoinedChannel(ch.ReceptionID, m.kv, m.client, m.rng, + loadedJc, err := loadJoinedChannel(ch.ReceptionID, m.kv, m.net, m.rng, m.name, m.events, m.broadcastMaker) if err != nil { t.Errorf("Failed to load joinedChannel: %+v", err) diff --git a/channels/manager.go b/channels/manager.go index 434ad90dd83dde233fe30b378b51c2bb037fa3ce..539d1a4e2734f0715f3e179dc64d3cf6244412f5 100644 --- a/channels/manager.go +++ b/channels/manager.go @@ -44,10 +44,10 @@ type manager struct { mux sync.RWMutex // External references - kv *versioned.KV - client broadcast.Client - rng *fastRNG.StreamGenerator - name NameService + kv *versioned.KV + net broadcast.Client + rng *fastRNG.StreamGenerator + name NameService // Events model *events @@ -67,7 +67,7 @@ func NewManager(kv *versioned.KV, client broadcast.Client, m := manager{ kv: kv, - client: client, + net: client, rng: rng, name: name, broadcastMaker: broadcast.NewBroadcastChannel, @@ -105,3 +105,50 @@ func (m *manager) LeaveChannel(channelID *id.ID) error { return nil } + +// GetChannels returns the IDs of all channels that have been joined. Use +// getChannelsUnsafe if you already have taken the mux. +func (m *manager) GetChannels() []*id.ID { + m.mux.Lock() + defer m.mux.Unlock() + return m.getChannelsUnsafe() +} + +// GetChannel returns the underlying cryptographic structure for a given channel. +func (m *manager) GetChannel(chID *id.ID) (*cryptoBroadcast.Channel, error) { + jc, err := m.getChannel(chID) + if err != nil { + return nil, err + } + return jc.broadcast.Get(), nil +} + +// ReplayChannel replays all messages from the channel within the network's +// memory (~3 weeks) over the event model. It does this by wiping the +// underlying state tracking for message pickup for the channel, causing all +// messages to be re-retrieved from the network +func (m *manager) ReplayChannel(chID *id.ID) error { + m.mux.RLock() + defer m.mux.RUnlock() + + jc, exists := m.channels[*chID] + if !exists { + return ChannelDoesNotExistsErr + } + + c := jc.broadcast.Get() + + // stop the broadcast which will completely wipe it from the underlying + // cmix object + jc.broadcast.Stop() + + //re-instantiate the broadcast, re-registering it from scratch + b, err := initBroadcast(c, m.name, m.events, m.net, m.broadcastMaker, m.rng) + if err != nil { + return err + } + jc.broadcast = b + + return nil + +} diff --git a/channels/manager_test.go b/channels/manager_test.go index fdf3e2bd2badb8e998916e9dabee75e247f7e416..29d283f4b53e573f304325589d047a609e5e30ea 100644 --- a/channels/manager_test.go +++ b/channels/manager_test.go @@ -8,11 +8,15 @@ package channels import ( + "fmt" + "gitlab.com/elixxir/client/broadcast" "gitlab.com/elixxir/client/storage/versioned" "gitlab.com/elixxir/crypto/fastRNG" "gitlab.com/elixxir/ekv" "gitlab.com/xx_network/crypto/csprng" + "gitlab.com/xx_network/primitives/id" "os" + "sync" "testing" "time" @@ -92,3 +96,102 @@ func TestManager_LeaveChannel(t *testing.T) { "model") } } + +func TestManager_GetChannels(t *testing.T) { + m := &manager{ + channels: make(map[id.ID]*joinedChannel), + mux: sync.RWMutex{}, + } + + rng := fastRNG.NewStreamGenerator(1, 1, csprng.NewSystemRNG) + + numtests := 10 + + chList := make(map[id.ID]interface{}) + + for i := 0; i < 10; i++ { + name := fmt.Sprintf("testChannel %d", numtests) + s := rng.GetStream() + tc, _, err := newTestChannel(name, "blarg", s) + s.Close() + if err != nil { + t.Fatalf("failed to generate channel %s", name) + } + bc, err := broadcast.NewBroadcastChannel(tc, new(mockBroadcastClient), rng) + if err != nil { + t.Fatalf("failed to generate broadcast %s", name) + } + m.channels[*tc.ReceptionID] = &joinedChannel{broadcast: bc} + chList[*tc.ReceptionID] = nil + } + + receivedChList := m.GetChannels() + + for _, receivedCh := range receivedChList { + if _, exists := chList[*receivedCh]; !exists { + t.Errorf("Channel was not returned") + } + } +} + +func TestManager_GetChannel(t *testing.T) { + m := &manager{ + channels: make(map[id.ID]*joinedChannel), + mux: sync.RWMutex{}, + } + + rng := fastRNG.NewStreamGenerator(1, 1, csprng.NewSystemRNG) + + numtests := 10 + + chList := make([]*id.ID, 0, numtests) + + for i := 0; i < 10; i++ { + name := fmt.Sprintf("testChannel %d", numtests) + s := rng.GetStream() + tc, _, err := newTestChannel(name, "blarg", s) + s.Close() + if err != nil { + t.Fatalf("failed to generate channel %s", name) + } + bc, err := broadcast.NewBroadcastChannel(tc, new(mockBroadcastClient), rng) + if err != nil { + t.Fatalf("failed to generate broadcast %s", name) + } + m.channels[*tc.ReceptionID] = &joinedChannel{broadcast: bc} + chList = append(chList, tc.ReceptionID) + } + + for i, receivedCh := range chList { + ch, err := m.GetChannel(receivedCh) + if err != nil { + t.Errorf("Channel %d failed to be gotten", i) + } else if !ch.ReceptionID.Cmp(receivedCh) { + t.Errorf("Channel %d Get returned wrong channel", i) + } + } +} + +func TestManager_GetChannel_BadChannel(t *testing.T) { + m := &manager{ + channels: make(map[id.ID]*joinedChannel), + mux: sync.RWMutex{}, + } + + numtests := 10 + + chList := make([]*id.ID, 0, numtests) + + for i := 0; i < 10; i++ { + chId := &id.ID{} + chId[0] = byte(i) + chList = append(chList, chId) + } + + for i, receivedCh := range chList { + _, err := m.GetChannel(receivedCh) + if err == nil { + t.Errorf("Channel %d returned when it doesnt exist", i) + } + } +}