Skip to content
Snippets Groups Projects
Commit 8a41ef3e authored by Jake Taylor's avatar Jake Taylor
Browse files

reeeeeee

parent 90679725
No related branches found
No related tags found
2 merge requests!23Release,!21Reee/reeeeeeeeeeeee
......@@ -119,7 +119,7 @@ func NewManager(session *storage.Session, switchboard *switchboard.Switchboard,
})
//create sub managers
m.message = message.NewManager(m.Internal, m.param.Messages, m.NodeRegistration, m.sender)
m.message = message.NewManager(m.Internal, m.param, m.NodeRegistration, m.sender)
m.round = rounds.NewManager(m.Internal, m.param.Rounds, m.message.GetMessageReceptionChannel(), m.sender)
return &m, nil
......
......@@ -65,12 +65,12 @@ func TestManager_CheckGarbledMessages(t *testing.T) {
if err != nil {
t.Errorf(err.Error())
}
m := NewManager(i, params.Messages{
m := NewManager(i, params.Network{Messages: params.Messages{
MessageReceptionBuffLen: 20,
MessageReceptionWorkerPoolSize: 20,
MaxChecksGarbledMessage: 20,
GarbledMessageWait: time.Hour,
}, nil, sender)
}}, nil, sender)
e2ekv := i.Session.E2e()
err = e2ekv.AddPartner(sess2.GetUser().TransmissionID, sess2.E2e().GetDHPublicKey(), e2ekv.GetDHPrivateKey(),
......
......@@ -8,7 +8,9 @@
package message
import (
"encoding/base64"
"fmt"
jww "github.com/spf13/jwalterweatherman"
"gitlab.com/elixxir/client/interfaces/params"
"gitlab.com/elixxir/client/network/gateway"
"gitlab.com/elixxir/client/network/internal"
......@@ -19,10 +21,11 @@ import (
)
type Manager struct {
param params.Messages
param params.Network
partitioner parse.Partitioner
internal.Internal
sender *gateway.Sender
blacklistedNodes map[string]interface{}
messageReception chan Bundle
nodeRegistration chan network.NodeGateway
......@@ -30,7 +33,7 @@ type Manager struct {
triggerGarbled chan struct{}
}
func NewManager(internal internal.Internal, param params.Messages,
func NewManager(internal internal.Internal, param params.Network,
nodeRegistration chan network.NodeGateway, sender *gateway.Sender) *Manager {
dummyMessage := format.NewMessage(internal.Session.Cmix().GetGroup().GetP().ByteLen())
m := Manager{
......@@ -41,8 +44,16 @@ func NewManager(internal internal.Internal, param params.Messages,
triggerGarbled: make(chan struct{}, 100),
nodeRegistration: nodeRegistration,
sender: sender,
Internal: internal,
}
for _, nodeId := range param.BlacklistedNodes {
decodedId, err := base64.StdEncoding.DecodeString(nodeId)
if err != nil {
jww.ERROR.Printf("Unable to decode blacklisted Node ID %s: %+v", decodedId, err)
continue
}
m.blacklistedNodes[string(decodedId)] = nil
}
m.Internal = internal
return &m
}
......
......@@ -8,7 +8,6 @@
package message
import (
"bytes"
"fmt"
"github.com/golang-collections/collections/set"
"github.com/pkg/errors"
......@@ -32,11 +31,11 @@ import (
// WARNING: Potentially Unsafe
// Public manager function to send a message over CMIX
func (m *Manager) SendCMIX(sender *gateway.Sender, msg format.Message,
recipient *id.ID, cmixParams params.CMIX, networkParams params.Network,
recipient *id.ID, cmixParams params.CMIX,
stop *stoppable.Single) (id.Round, ephemeral.Id, error) {
msgCopy := msg.Copy()
return sendCmixHelper(sender, msgCopy, recipient, cmixParams, networkParams, m.Instance,
return sendCmixHelper(sender, msgCopy, recipient, cmixParams, m.blacklistedNodes, m.Instance,
m.Session, m.nodeRegistration, m.Rng, m.Internal.Events,
m.TransmissionID, m.Comms, stop)
}
......@@ -51,7 +50,7 @@ func (m *Manager) SendCMIX(sender *gateway.Sender, msg format.Message,
// which can be registered with the network instance to get a callback on
// its status
func sendCmixHelper(sender *gateway.Sender, msg format.Message,
recipient *id.ID, cmixParams params.CMIX, networkParams params.Network, instance *network.Instance,
recipient *id.ID, cmixParams params.CMIX, blacklistedNodes map[string]interface{}, instance *network.Instance,
session *storage.Session, nodeRegistration chan network.NodeGateway,
rng *fastRNG.StreamGenerator, events interfaces.EventManager,
senderId *id.ID, comms sendCmixCommsInterface,
......@@ -84,27 +83,27 @@ func sendCmixHelper(sender *gateway.Sender, msg format.Message,
if err != nil {
jww.WARN.Printf("Failed to GetUpcomingRealtime (msgDigest: %s): %+v", msg.Digest(), err)
}
if bestRound == nil {
continue
}
//add the round on to the list of attempted, so it is not tried again
attempted.Insert(bestRound)
// Determine whether the selected round contains any Nodes
// that are blacklisted by the params.Network object
containsBlacklisted := false
for _, blacklistedNodeId := range networkParams.BlacklistedNodes {
if containsBlacklisted {
break
}
for _, nodeId := range bestRound.Topology {
// TODO: Figure out encoding
if bytes.Equal(nodeId, []byte(blacklistedNodeId)) {
if _, isBlacklisted := blacklistedNodes[string(nodeId)]; isBlacklisted {
containsBlacklisted = true
break
}
}
}
if bestRound == nil || containsBlacklisted {
if containsBlacklisted {
jww.WARN.Printf("Round %d contains blacklisted node, skipping...", bestRound.ID)
continue
}
//add the round on to the list of attempted, so it is not tried again
attempted.Insert(bestRound)
// Retrieve host and key information from round
firstGateway, roundKeys, err := processRound(instance, session, nodeRegistration, bestRound, recipient.String(), msg.Digest())
if err != nil {
......
......@@ -108,17 +108,17 @@ func Test_attemptSendCmix(t *testing.T) {
t.Errorf("%+v", errors.New(err.Error()))
return
}
m := NewManager(i, params.Messages{
m := NewManager(i, params.Network{Messages: params.Messages{
MessageReceptionBuffLen: 20,
MessageReceptionWorkerPoolSize: 20,
MaxChecksGarbledMessage: 20,
GarbledMessageWait: time.Hour,
}, nil, sender)
}}, nil, sender)
msgCmix := format.NewMessage(m.Session.Cmix().GetGroup().GetP().ByteLen())
msgCmix.SetContents([]byte("test"))
e2e.SetUnencrypted(msgCmix, m.Session.User().GetCryptographicIdentity().GetTransmissionID())
_, _, err = sendCmixHelper(sender, msgCmix, sess2.GetUser().ReceptionID,
params.GetDefaultCMIX(), m.Instance, m.Session, m.nodeRegistration,
params.GetDefaultCMIX(), make(map[string]interface{}), m.Instance, m.Session, m.nodeRegistration,
m.Rng, events, m.TransmissionID, &MockSendCMIXComms{t: t}, nil)
if err != nil {
t.Errorf("Failed to sendcmix: %+v", err)
......
......@@ -107,12 +107,12 @@ func Test_attemptSendManyCmix(t *testing.T) {
t.Errorf("%+v", errors.New(err.Error()))
return
}
m := NewManager(i, params.Messages{
m := NewManager(i, params.Network{Messages: params.Messages{
MessageReceptionBuffLen: 20,
MessageReceptionWorkerPoolSize: 20,
MaxChecksGarbledMessage: 20,
GarbledMessageWait: time.Hour,
}, nil, sender)
}}, nil, sender)
msgCmix := format.NewMessage(m.Session.Cmix().GetGroup().GetP().ByteLen())
msgCmix.SetContents([]byte("test"))
e2e.SetUnencrypted(msgCmix, m.Session.User().GetCryptographicIdentity().GetTransmissionID())
......
......@@ -30,7 +30,7 @@ func (m *manager) SendCMIX(msg format.Message, recipient *id.ID, param params.CM
"network is not healthy")
}
return m.message.SendCMIX(m.GetSender(), msg, recipient, param, m.param, nil)
return m.message.SendCMIX(m.GetSender(), msg, recipient, param, nil)
}
// SendManyCMIX sends many "raw" CMIX message payloads to each of the
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment