Skip to content
Snippets Groups Projects
Commit a0a79656 authored by Richard T. Carback III's avatar Richard T. Carback III
Browse files

remove deprecated collator

parent 7f9ae590
Branches
Tags
No related merge requests found
////////////////////////////////////////////////////////////////////////////////
// Copyright © 2019 Privategrity Corporation /
// /
// All rights reserved. /
////////////////////////////////////////////////////////////////////////////////
package network
import (
"crypto/sha256"
"fmt"
"github.com/pkg/errors"
"gitlab.com/elixxir/client/globals"
"gitlab.com/elixxir/client/parse"
"gitlab.com/elixxir/primitives/format"
"gitlab.com/xx_network/primitives/id"
"sync"
"time"
)
type multiPartMessage struct {
parts [][]byte
numPartsReceived uint8
}
const PendingMessageKeyLenBits = uint64(256)
const PendingMessageKeyLen = PendingMessageKeyLenBits / 8
type PendingMessageKey [PendingMessageKeyLen]byte
type Collator struct {
pendingMessages map[PendingMessageKey]*multiPartMessage
// TODO do we need a lock here? or can we assume that requests will come
// from only one thread?
mux sync.Mutex
}
func NewCollator() *Collator {
return &Collator{
pendingMessages: make(map[PendingMessageKey]*multiPartMessage),
}
}
// AddMessage validates its input and silently does nothing on failure
// TODO should this return an error?
// TODO this should take a different type as parameter.
// TODO this takes too many types. i should split it up.
// This method returns a byte slice with the assembled message if it's
// received a completed message.
func (mb *Collator) AddMessage(message *format.Message, sender *id.ID,
timeout time.Duration) (*parse.Message, error) {
payload := message.Contents.GetRightAligned()
recipient, err := message.GetRecipient()
if err != nil {
return nil, err
}
//get the time
timestamp := time.Time{}
err = timestamp.UnmarshalBinary(message.GetTimestamp()[:len(message.GetTimestamp())-1])
if err != nil {
globals.Log.WARN.Printf("Failed to parse timestamp for message %v: %+v",
message.GetTimestamp(), errors.New(err.Error()))
}
partition, err := parse.ValidatePartition(payload)
if err != nil {
return nil, errors.WithMessage(err, "Received an invalid partition: ")
} else {
if partition.MaxIndex == 0 {
//this is the only part of the message. we should take the fast
//path and skip putting it in the map
typedBody, err := parse.Parse(partition.Body)
// Log an error if the message is malformed and return nothing
if err != nil {
return nil, errors.WithMessage(err, "Malformed message received")
}
msg := parse.Message{
TypedBody: *typedBody,
InferredType: parse.Unencrypted,
Sender: sender,
Receiver: recipient,
Timestamp: timestamp,
}
return &msg, nil
} else {
// assemble the map key into a new chunk of memory
var key PendingMessageKey
h := sha256.New()
h.Write(partition.ID)
h.Write(sender.Bytes())
keyHash := h.Sum(nil)
copy(key[:], keyHash[:PendingMessageKeyLen])
mb.mux.Lock()
defer mb.mux.Unlock()
message, ok := mb.pendingMessages[key]
if !ok {
// this is a multi-part message we haven't seen before.
// make a new array of partitions for this key
newMessage := make([][]byte, partition.MaxIndex+1)
newMessage[partition.Index] = partition.Body
message = &multiPartMessage{
parts: newMessage,
numPartsReceived: 1,
}
mb.pendingMessages[key] = message
// start timeout for these partitions
// TODO vary timeout depending on number of messages?
time.AfterFunc(timeout, func() {
mb.mux.Lock()
defer mb.mux.Unlock()
_, ok := mb.pendingMessages[key]
if ok {
delete(mb.pendingMessages, key)
}
})
} else {
// append to array for this key
message.numPartsReceived++
message.parts[partition.Index] = partition.Body
}
if message.numPartsReceived > partition.MaxIndex {
// Construct message
fullMsg, err := parse.Assemble(message.parts)
if err != nil {
delete(mb.pendingMessages, key)
return nil, errors.WithMessage(err, "Malformed message: padding error, ")
}
typedBody, err := parse.Parse(fullMsg)
// Log an error if the message is malformed and return nothing
if err != nil {
delete(mb.pendingMessages, key)
return nil, errors.WithMessage(err, "Malformed message received")
}
msg := parse.Message{
TypedBody: *typedBody,
InferredType: parse.Unencrypted,
Sender: sender,
Receiver: recipient,
Timestamp: timestamp,
}
delete(mb.pendingMessages, key)
return &msg, nil
} else {
// need more parts
return nil, nil
}
}
}
}
// Debug: dump all messages that are currently in the map
func (mb *Collator) dump() string {
dump := ""
mb.mux.Lock()
for key := range mb.pendingMessages {
if mb.pendingMessages[key].parts != nil {
for i, part := range mb.pendingMessages[key].parts {
dump += fmt.Sprintf("Part %v: %v\n", i, part)
}
dump += fmt.Sprintf("Total parts received: %v\n",
mb.pendingMessages[key].numPartsReceived)
}
}
mb.mux.Unlock()
return dump
}
////////////////////////////////////////////////////////////////////////////////
// Copyright © 2019 Privategrity Corporation /
// /
// All rights reserved. /
////////////////////////////////////////////////////////////////////////////////
package network
import (
"bytes"
"encoding/hex"
"gitlab.com/elixxir/client/parse"
"gitlab.com/elixxir/primitives/format"
"gitlab.com/xx_network/primitives/id"
"math/rand"
"testing"
"time"
)
func TestCollator_AddMessage(t *testing.T) {
uid := id.NewIdFromUInt(69, id.User, t)
collator := &Collator{
pendingMessages: make(map[PendingMessageKey]*multiPartMessage),
}
var bodies [][]byte
for length := 5; length < 20*format.TotalLen; length += 20 {
newBody := make([]byte, length)
_, err := rand.Read(newBody)
if err != nil {
t.Errorf("Couldn't generate enough random bytes: %v", err.Error())
}
bodies = append(bodies, newBody)
}
for i := range bodies {
partitions, err := parse.Partition([]byte(bodies[i]), []byte{5})
if err != nil {
t.Errorf("Error partitioning messages: %v", err.Error())
}
var result *parse.Message
for j := range partitions {
fm := format.NewMessage()
fm.SetRecipient(id.NewIdFromUInt(6, id.User, t))
fm.Contents.SetRightAligned(partitions[j])
result, err = collator.AddMessage(fm, uid, time.Minute)
if err != nil {
t.Fatal(err)
}
}
typedBody, err := parse.Parse(bodies[i])
// This always fails because of the trailing zeroes. Question is, how
// much does it matter in regular usage? Protobufs know their length
// already, and strings should respect null terminators,
// so it's probably not actually that much of a problem.
if !bytes.Contains(result.Body, typedBody.Body) {
t.Errorf("Input didn't match output for %v. \n Got: %v\n Expected %v",
i, hex.EncodeToString(result.Body),
hex.EncodeToString(typedBody.Body))
}
}
}
func TestCollator_AddMessage_Timeout(t *testing.T) {
uid := id.NewIdFromUInt(69, id.User, t)
collator := &Collator{
pendingMessages: make(map[PendingMessageKey]*multiPartMessage),
}
//enough for four partitions, probably
body := make([]byte, 3*format.ContentsLen)
partitions, err := parse.Partition(body, []byte{88})
if err != nil {
t.Errorf("Error partitioning messages: %v", err.Error())
}
var result *parse.Message
for i := range partitions {
fm := format.NewMessage()
now := time.Now()
nowBytes, _ := now.MarshalBinary()
nowBytes = append(nowBytes, make([]byte, format.TimestampLen-len(nowBytes))...)
fm.SetTimestamp(nowBytes)
fm.SetRecipient(id.NewIdFromUInt(6, id.User, t))
fm.Contents.SetRightAligned(partitions[i])
result, err = collator.AddMessage(fm, uid, 80*time.Millisecond)
if err != nil {
t.Fatal(err)
}
if result != nil {
t.Error("Got a result from collator when it should be timing out" +
" submessages")
}
time.Sleep(50 * time.Millisecond)
}
time.Sleep(80 * time.Millisecond)
if len(collator.pendingMessages) != 0 {
t.Error("Multi-part message didn't get timed out properly")
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment