Skip to content
Snippets Groups Projects
Commit ee6642a8 authored by Richard T. Carback III's avatar Richard T. Carback III
Browse files

Merge branch 'release' of gitlab.com:elixxir/client into release

parents f7954168 4e6fe64d
Branches
Tags
No related merge requests found
...@@ -356,6 +356,7 @@ func (c *Client) GetSwitchboard() interfaces.Switchboard { ...@@ -356,6 +356,7 @@ func (c *Client) GetSwitchboard() interfaces.Switchboard {
// events. // events.
func (c *Client) GetRoundEvents() interfaces.RoundEvents { func (c *Client) GetRoundEvents() interfaces.RoundEvents {
jww.INFO.Printf("GetRoundEvents()") jww.INFO.Printf("GetRoundEvents()")
jww.WARN.Printf("GetRoundEvents does not handle Client Errors edge case!")
return c.network.GetInstance().GetRoundEvents() return c.network.GetInstance().GetRoundEvents()
} }
......
...@@ -297,15 +297,15 @@ func (c *Client) RegisterRoundEventsHandler(rid int, cb RoundEventCallback, ...@@ -297,15 +297,15 @@ func (c *Client) RegisterRoundEventsHandler(rid int, cb RoundEventCallback,
} }
// RegisterMessageDeliveryCB allows the caller to get notified if the rounds a // RegisterMessageDeliveryCB allows the caller to get notified if the rounds a
// message was sent in sucesfully completed. Under the hood, this uses the same // message was sent in successfully completed. Under the hood, this uses the same
// interface as RegisterRoundEventsHandler, but provides a convienet way to use // interface as RegisterRoundEventsHandler, but provides a convent way to use
// the interface in its most common form, looking up the result of message // the interface in its most common form, looking up the result of message
// retreval // retrieval
// //
// The callbacks will return at timeoutMS if no state update occurs // The callbacks will return at timeoutMS if no state update occurs
// //
// This function takes the marshaled send report to ensure a memory leak does // This function takes the marshaled send report to ensure a memory leak does
// not occur as a result of both sides of the bindings holding a refrence to // not occur as a result of both sides of the bindings holding a reference to
// the same pointer. // the same pointer.
func (c *Client) RegisterMessageDeliveryCB(marshaledSendReport []byte, func (c *Client) RegisterMessageDeliveryCB(marshaledSendReport []byte,
mdc MessageDeliveryCallback, timeoutMS int) (*Unregister, error) { mdc MessageDeliveryCallback, timeoutMS int) (*Unregister, error) {
......
...@@ -23,12 +23,13 @@ package network ...@@ -23,12 +23,13 @@ package network
// instance // instance
import ( import (
"gitlab.com/elixxir/client/network/gateway" "bytes"
//"gitlab.com/elixxir/client/storage"
jww "github.com/spf13/jwalterweatherman" jww "github.com/spf13/jwalterweatherman"
bloom "gitlab.com/elixxir/bloomfilter" bloom "gitlab.com/elixxir/bloomfilter"
"gitlab.com/elixxir/client/network/gateway"
pb "gitlab.com/elixxir/comms/mixmessages" pb "gitlab.com/elixxir/comms/mixmessages"
"gitlab.com/elixxir/primitives/knownRounds" "gitlab.com/elixxir/primitives/knownRounds"
"gitlab.com/elixxir/primitives/states"
"gitlab.com/xx_network/comms/connect" "gitlab.com/xx_network/comms/connect"
"gitlab.com/xx_network/crypto/csprng" "gitlab.com/xx_network/crypto/csprng"
"gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/id"
...@@ -62,7 +63,7 @@ func (m *manager) followNetwork(quitCh <-chan struct{}) { ...@@ -62,7 +63,7 @@ func (m *manager) followNetwork(quitCh <-chan struct{}) {
} }
} }
var followCnt int = 0 var followCnt = 0
// executes each iteration of the follower // executes each iteration of the follower
func (m *manager) follow(rng csprng.Source, comms followNetworkComms) { func (m *manager) follow(rng csprng.Source, comms followNetworkComms) {
...@@ -86,10 +87,10 @@ func (m *manager) follow(rng csprng.Source, comms followNetworkComms) { ...@@ -86,10 +87,10 @@ func (m *manager) follow(rng csprng.Source, comms followNetworkComms) {
LastUpdate: uint64(m.Instance.GetLastUpdateID()), LastUpdate: uint64(m.Instance.GetLastUpdateID()),
ClientID: m.Uid.Bytes(), ClientID: m.Uid.Bytes(),
} }
jww.TRACE.Printf("polling %s for NDF", gwHost) jww.TRACE.Printf("Polling %s for NDF...", gwHost)
pollResp, err := comms.SendPoll(gwHost, &pollReq) pollResp, err := comms.SendPoll(gwHost, &pollReq)
if err != nil { if err != nil {
jww.ERROR.Printf("%+v", err) jww.ERROR.Printf("Unable to poll %s for NDF: %+v", gwHost, err)
return return
} }
...@@ -98,7 +99,7 @@ func (m *manager) follow(rng csprng.Source, comms followNetworkComms) { ...@@ -98,7 +99,7 @@ func (m *manager) follow(rng csprng.Source, comms followNetworkComms) {
gwRoundsState := &knownRounds.KnownRounds{} gwRoundsState := &knownRounds.KnownRounds{}
err = gwRoundsState.Unmarshal(pollResp.KnownRounds) err = gwRoundsState.Unmarshal(pollResp.KnownRounds)
if err != nil { if err != nil {
jww.ERROR.Printf("Failed to unmartial: %+v", err) jww.ERROR.Printf("Failed to unmarshal: %+v", err)
return return
} }
var filterList []*bloom.Ring var filterList []*bloom.Ring
...@@ -127,13 +128,13 @@ func (m *manager) follow(rng csprng.Source, comms followNetworkComms) { ...@@ -127,13 +128,13 @@ func (m *manager) follow(rng csprng.Source, comms followNetworkComms) {
if pollResp.PartialNDF != nil { if pollResp.PartialNDF != nil {
err = m.Instance.UpdatePartialNdf(pollResp.PartialNDF) err = m.Instance.UpdatePartialNdf(pollResp.PartialNDF)
if err != nil { if err != nil {
jww.ERROR.Printf("%+v", err) jww.ERROR.Printf("Unable to update partial NDF: %+v", err)
return return
} }
err = m.Instance.UpdateGatewayConnections() err = m.Instance.UpdateGatewayConnections()
if err != nil { if err != nil {
jww.ERROR.Printf("%+v", err) jww.ERROR.Printf("Unable to update gateway connections: %+v", err)
return return
} }
} }
...@@ -142,11 +143,48 @@ func (m *manager) follow(rng csprng.Source, comms followNetworkComms) { ...@@ -142,11 +143,48 @@ func (m *manager) follow(rng csprng.Source, comms followNetworkComms) {
// network // network
if pollResp.Updates != nil { if pollResp.Updates != nil {
err = m.Instance.RoundUpdates(pollResp.Updates) err = m.Instance.RoundUpdates(pollResp.Updates)
//jww.TRACE.Printf("%+v", pollResp.Updates)
if err != nil { if err != nil {
jww.ERROR.Printf("%+v", err) jww.ERROR.Printf("%+v", err)
return return
} }
// Iterate over ClientErrors for each RoundUpdate
for _, update := range pollResp.Updates {
// Ignore irrelevant updates
if update.State != uint32(states.COMPLETED) && update.State != uint32(states.FAILED) {
continue
}
for _, clientErr := range update.ClientErrors {
// If this Client appears in the ClientError
if bytes.Equal(clientErr.ClientId, m.Session.GetUser().ID.Marshal()) {
// Obtain relevant NodeGateway information
nGw, err := m.Instance.GetNodeAndGateway(gwHost.GetId())
if err != nil {
jww.ERROR.Printf("Unable to get NodeGateway: %+v", err)
return
}
nid, err := nGw.Node.GetNodeId()
if err != nil {
jww.ERROR.Printf("Unable to get NodeID: %+v", err)
return
}
// FIXME: Should be able to trigger proper type of round event
// FIXME: without mutating the RoundInfo. Signature also needs verified
// FIXME: before keys are deleted
update.State = uint32(states.FAILED)
m.Instance.GetRoundEvents().TriggerRoundEvent(update)
// Delete all existing keys and trigger a re-registration with the relevant Node
m.Session.Cmix().Remove(nid)
m.Instance.GetAddGatewayChan() <- nGw
}
}
}
} }
// ---- Round Processing ----- // ---- Round Processing -----
......
...@@ -9,6 +9,7 @@ package message ...@@ -9,6 +9,7 @@ package message
import ( import (
"gitlab.com/elixxir/client/interfaces/message" "gitlab.com/elixxir/client/interfaces/message"
"gitlab.com/elixxir/primitives/format"
"time" "time"
) )
...@@ -45,6 +46,7 @@ func (m *Manager) processGarbledMessages(quitCh <-chan struct{}) { ...@@ -45,6 +46,7 @@ func (m *Manager) processGarbledMessages(quitCh <-chan struct{}) {
func (m *Manager) handleGarbledMessages() { func (m *Manager) handleGarbledMessages() {
garbledMsgs := m.Session.GetGarbledMessages() garbledMsgs := m.Session.GetGarbledMessages()
e2eKv := m.Session.E2e() e2eKv := m.Session.E2e()
var failedMsgs []format.Message
//try to decrypt every garbled message, excising those who's counts are too high //try to decrypt every garbled message, excising those who's counts are too high
for grbldMsg, count, timestamp, has := garbledMsgs.Next(); has; grbldMsg, count, timestamp, has = garbledMsgs.Next() { for grbldMsg, count, timestamp, has := garbledMsgs.Next(); has; grbldMsg, count, timestamp, has = garbledMsgs.Next() {
fingerprint := grbldMsg.GetKeyFP() fingerprint := grbldMsg.GetKeyFP()
...@@ -52,9 +54,9 @@ func (m *Manager) handleGarbledMessages() { ...@@ -52,9 +54,9 @@ func (m *Manager) handleGarbledMessages() {
if key, isE2E := e2eKv.PopKey(fingerprint); isE2E { if key, isE2E := e2eKv.PopKey(fingerprint); isE2E {
// Decrypt encrypted message // Decrypt encrypted message
msg, err := key.Decrypt(grbldMsg) msg, err := key.Decrypt(grbldMsg)
if err == nil {
// get the sender // get the sender
sender := key.GetSession().GetPartner() sender := key.GetSession().GetPartner()
if err == nil {
//remove from the buffer if decryption is successful //remove from the buffer if decryption is successful
garbledMsgs.Remove(grbldMsg) garbledMsgs.Remove(grbldMsg)
//handle the successfully decrypted message //handle the successfully decrypted message
...@@ -74,7 +76,10 @@ func (m *Manager) handleGarbledMessages() { ...@@ -74,7 +76,10 @@ func (m *Manager) handleGarbledMessages() {
time.Since(timestamp) > m.param.GarbledMessageWait { time.Since(timestamp) > m.param.GarbledMessageWait {
garbledMsgs.Remove(grbldMsg) garbledMsgs.Remove(grbldMsg)
} else { } else {
garbledMsgs.Failed(grbldMsg) failedMsgs = append(failedMsgs, grbldMsg)
} }
} }
for _, grbldMsg := range failedMsgs {
garbledMsgs.Failed(grbldMsg)
}
} }
package message
import (
"encoding/binary"
"gitlab.com/elixxir/client/interfaces/message"
"gitlab.com/elixxir/client/interfaces/params"
"gitlab.com/elixxir/client/network/internal"
"gitlab.com/elixxir/client/network/message/parse"
"gitlab.com/elixxir/client/storage"
"gitlab.com/elixxir/client/storage/e2e"
"gitlab.com/elixxir/client/switchboard"
"gitlab.com/elixxir/comms/client"
"gitlab.com/elixxir/crypto/fastRNG"
"gitlab.com/elixxir/primitives/format"
"gitlab.com/xx_network/crypto/csprng"
"math/rand"
"testing"
"time"
)
type TestListener struct {
ch chan bool
}
// the Hear function is called to exercise the listener, passing in the
// data as an item
func (l TestListener) Hear(item message.Receive) {
l.ch <- true
}
// Returns a name, used for debugging
func (l TestListener) Name() string {
return "TEST LISTENER FOR GARBLED MESSAGES"
}
func TestManager_CheckGarbledMessages(t *testing.T) {
sess1 := storage.InitTestingSession(t)
sess2 := storage.InitTestingSession(t)
sw := switchboard.New()
l := TestListener{
ch: make(chan bool),
}
sw.RegisterListener(sess2.GetUser().ID, message.Raw, l)
comms, err := client.NewClientComms(sess1.GetUser().ID, nil, nil, nil)
if err != nil {
t.Errorf("Failed to start client comms: %+v", err)
}
i := internal.Internal{
Session: sess1,
Switchboard: sw,
Rng: fastRNG.NewStreamGenerator(1, 1, csprng.NewSystemRNG),
Comms: comms,
Health: nil,
Uid: sess1.GetUser().ID,
Instance: nil,
NodeRegistration: nil,
}
m := NewManager(i, params.Messages{
MessageReceptionBuffLen: 20,
MessageReceptionWorkerPoolSize: 20,
MaxChecksGarbledMessage: 20,
GarbledMessageWait: time.Hour,
}, nil)
e2ekv := i.Session.E2e()
err = e2ekv.AddPartner(sess2.GetUser().ID, sess2.E2e().GetDHPublicKey(), e2ekv.GetDHPrivateKey(), e2e.GetDefaultSessionParams(), e2e.GetDefaultSessionParams())
if err != nil {
t.Errorf("Failed to add e2e partner: %+v", err)
t.FailNow()
}
err = sess2.E2e().AddPartner(sess1.GetUser().ID, sess1.E2e().GetDHPublicKey(), sess2.E2e().GetDHPrivateKey(), e2e.GetDefaultSessionParams(), e2e.GetDefaultSessionParams())
if err != nil {
t.Errorf("Failed to add e2e partner: %+v", err)
t.FailNow()
}
partner1, err := sess2.E2e().GetPartner(sess1.GetUser().ID)
if err != nil {
t.Errorf("Failed to get partner: %+v", err)
t.FailNow()
}
msg := format.NewMessage(m.Session.Cmix().GetGroup().GetP().ByteLen())
key, err := partner1.GetKeyForSending(params.Standard)
if err != nil {
t.Errorf("failed to get key: %+v", err)
t.FailNow()
}
contents := make([]byte, msg.ContentsSize())
prng := rand.New(rand.NewSource(42))
prng.Read(contents)
fmp := parse.FirstMessagePartFromBytes(contents)
binary.BigEndian.PutUint32(fmp.Type, uint32(message.Raw))
fmp.NumParts[0] = uint8(1)
binary.BigEndian.PutUint16(fmp.Len, 256)
fmp.Part[0] = 0
ts, err := time.Now().MarshalBinary()
if err != nil {
t.Errorf("failed to martial ts: %+v", err)
}
copy(fmp.Timestamp, ts)
msg.SetContents(fmp.Bytes())
encryptedMsg := key.Encrypt(msg)
i.Session.GetGarbledMessages().Add(encryptedMsg)
quitch := make(chan struct{})
go m.processGarbledMessages(quitch)
m.CheckGarbledMessages()
ticker := time.NewTicker(time.Second)
select {
case <-ticker.C:
t.Error("Didn't hear anything")
case <-l.ch:
t.Log("Heard something")
}
}
...@@ -335,5 +335,13 @@ func InitTestingSession(i interface{}) *Session { ...@@ -335,5 +335,13 @@ func InitTestingSession(i interface{}) *Session {
globals.Log.FATAL.Panicf("InitTestingSession failed to create dummy critical messages: %+v", err) globals.Log.FATAL.Panicf("InitTestingSession failed to create dummy critical messages: %+v", err)
} }
s.garbledMessages, err = utility.NewMeteredCmixMessageBuffer(s.kv, garbledMessagesKey)
if err != nil {
globals.Log.FATAL.Panicf("Failed to create garbledMessages buffer: %+v", err)
}
s.conversations = conversation.NewStore(s.kv)
s.partition = partition.New(s.kv)
return s return s
} }
...@@ -257,6 +257,7 @@ func (mb *MessageBuffer) Next() (interface{}, bool) { ...@@ -257,6 +257,7 @@ func (mb *MessageBuffer) Next() (interface{}, bool) {
if err != nil { if err != nil {
jww.FATAL.Panicf("Could not load message: %v", err) jww.FATAL.Panicf("Could not load message: %v", err)
} }
return m, true return m, true
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment