diff --git a/network/collate.go b/network/collate.go deleted file mode 100644 index e628eb4406815e78a2e894389f4eb54d191b72c4..0000000000000000000000000000000000000000 --- a/network/collate.go +++ /dev/null @@ -1,179 +0,0 @@ -//////////////////////////////////////////////////////////////////////////////// -// Copyright © 2019 Privategrity Corporation / -// / -// All rights reserved. / -//////////////////////////////////////////////////////////////////////////////// - -package network - -import ( - "crypto/sha256" - "fmt" - "github.com/pkg/errors" - "gitlab.com/elixxir/client/globals" - "gitlab.com/elixxir/client/parse" - "gitlab.com/elixxir/primitives/format" - "gitlab.com/xx_network/primitives/id" - "sync" - "time" -) - -type multiPartMessage struct { - parts [][]byte - numPartsReceived uint8 -} - -const PendingMessageKeyLenBits = uint64(256) -const PendingMessageKeyLen = PendingMessageKeyLenBits / 8 - -type PendingMessageKey [PendingMessageKeyLen]byte - -type Collator struct { - pendingMessages map[PendingMessageKey]*multiPartMessage - // TODO do we need a lock here? or can we assume that requests will come - // from only one thread? - mux sync.Mutex -} - -func NewCollator() *Collator { - return &Collator{ - pendingMessages: make(map[PendingMessageKey]*multiPartMessage), - } -} - -// AddMessage validates its input and silently does nothing on failure -// TODO should this return an error? -// TODO this should take a different type as parameter. -// TODO this takes too many types. i should split it up. -// This method returns a byte slice with the assembled message if it's -// received a completed message. -func (mb *Collator) AddMessage(message *format.Message, sender *id.ID, - timeout time.Duration) (*parse.Message, error) { - - payload := message.Contents.GetRightAligned() - recipient, err := message.GetRecipient() - if err != nil { - return nil, err - } - - //get the time - timestamp := time.Time{} - - err = timestamp.UnmarshalBinary(message.GetTimestamp()[:len(message.GetTimestamp())-1]) - - if err != nil { - globals.Log.WARN.Printf("Failed to parse timestamp for message %v: %+v", - message.GetTimestamp(), errors.New(err.Error())) - } - - partition, err := parse.ValidatePartition(payload) - - if err != nil { - return nil, errors.WithMessage(err, "Received an invalid partition: ") - } else { - if partition.MaxIndex == 0 { - //this is the only part of the message. we should take the fast - //path and skip putting it in the map - typedBody, err := parse.Parse(partition.Body) - // Log an error if the message is malformed and return nothing - if err != nil { - return nil, errors.WithMessage(err, "Malformed message received") - } - - msg := parse.Message{ - TypedBody: *typedBody, - InferredType: parse.Unencrypted, - Sender: sender, - Receiver: recipient, - Timestamp: timestamp, - } - - return &msg, nil - } else { - // assemble the map key into a new chunk of memory - var key PendingMessageKey - h := sha256.New() - h.Write(partition.ID) - h.Write(sender.Bytes()) - keyHash := h.Sum(nil) - copy(key[:], keyHash[:PendingMessageKeyLen]) - - mb.mux.Lock() - defer mb.mux.Unlock() - message, ok := mb.pendingMessages[key] - if !ok { - // this is a multi-part message we haven't seen before. - // make a new array of partitions for this key - newMessage := make([][]byte, partition.MaxIndex+1) - newMessage[partition.Index] = partition.Body - - message = &multiPartMessage{ - parts: newMessage, - numPartsReceived: 1, - } - - mb.pendingMessages[key] = message - - // start timeout for these partitions - // TODO vary timeout depending on number of messages? - time.AfterFunc(timeout, func() { - mb.mux.Lock() - defer mb.mux.Unlock() - _, ok := mb.pendingMessages[key] - if ok { - delete(mb.pendingMessages, key) - } - }) - } else { - // append to array for this key - message.numPartsReceived++ - message.parts[partition.Index] = partition.Body - } - if message.numPartsReceived > partition.MaxIndex { - // Construct message - fullMsg, err := parse.Assemble(message.parts) - if err != nil { - delete(mb.pendingMessages, key) - return nil, errors.WithMessage(err, "Malformed message: padding error, ") - } - typedBody, err := parse.Parse(fullMsg) - // Log an error if the message is malformed and return nothing - if err != nil { - delete(mb.pendingMessages, key) - return nil, errors.WithMessage(err, "Malformed message received") - } - - msg := parse.Message{ - TypedBody: *typedBody, - InferredType: parse.Unencrypted, - Sender: sender, - Receiver: recipient, - Timestamp: timestamp, - } - - delete(mb.pendingMessages, key) - return &msg, nil - } else { - // need more parts - return nil, nil - } - } - } -} - -// Debug: dump all messages that are currently in the map -func (mb *Collator) dump() string { - dump := "" - mb.mux.Lock() - for key := range mb.pendingMessages { - if mb.pendingMessages[key].parts != nil { - for i, part := range mb.pendingMessages[key].parts { - dump += fmt.Sprintf("Part %v: %v\n", i, part) - } - dump += fmt.Sprintf("Total parts received: %v\n", - mb.pendingMessages[key].numPartsReceived) - } - } - mb.mux.Unlock() - return dump -} diff --git a/network/collate_test.go b/network/collate_test.go deleted file mode 100644 index b4c7714e1c1ef2a99d24d5889fa4bd140e965a19..0000000000000000000000000000000000000000 --- a/network/collate_test.go +++ /dev/null @@ -1,108 +0,0 @@ -//////////////////////////////////////////////////////////////////////////////// -// Copyright © 2019 Privategrity Corporation / -// / -// All rights reserved. / -//////////////////////////////////////////////////////////////////////////////// - -package network - -import ( - "bytes" - "encoding/hex" - "gitlab.com/elixxir/client/parse" - "gitlab.com/elixxir/primitives/format" - "gitlab.com/xx_network/primitives/id" - "math/rand" - "testing" - "time" -) - -func TestCollator_AddMessage(t *testing.T) { - - uid := id.NewIdFromUInt(69, id.User, t) - - collator := &Collator{ - pendingMessages: make(map[PendingMessageKey]*multiPartMessage), - } - var bodies [][]byte - for length := 5; length < 20*format.TotalLen; length += 20 { - newBody := make([]byte, length) - _, err := rand.Read(newBody) - if err != nil { - t.Errorf("Couldn't generate enough random bytes: %v", err.Error()) - } - - bodies = append(bodies, newBody) - } - for i := range bodies { - partitions, err := parse.Partition([]byte(bodies[i]), []byte{5}) - if err != nil { - t.Errorf("Error partitioning messages: %v", err.Error()) - } - - var result *parse.Message - for j := range partitions { - - fm := format.NewMessage() - fm.SetRecipient(id.NewIdFromUInt(6, id.User, t)) - fm.Contents.SetRightAligned(partitions[j]) - - result, err = collator.AddMessage(fm, uid, time.Minute) - if err != nil { - t.Fatal(err) - } - } - - typedBody, err := parse.Parse(bodies[i]) - - // This always fails because of the trailing zeroes. Question is, how - // much does it matter in regular usage? Protobufs know their length - // already, and strings should respect null terminators, - // so it's probably not actually that much of a problem. - if !bytes.Contains(result.Body, typedBody.Body) { - t.Errorf("Input didn't match output for %v. \n Got: %v\n Expected %v", - i, hex.EncodeToString(result.Body), - hex.EncodeToString(typedBody.Body)) - } - } -} - -func TestCollator_AddMessage_Timeout(t *testing.T) { - - uid := id.NewIdFromUInt(69, id.User, t) - - collator := &Collator{ - pendingMessages: make(map[PendingMessageKey]*multiPartMessage), - } - //enough for four partitions, probably - body := make([]byte, 3*format.ContentsLen) - partitions, err := parse.Partition(body, []byte{88}) - if err != nil { - t.Errorf("Error partitioning messages: %v", err.Error()) - } - var result *parse.Message - for i := range partitions { - fm := format.NewMessage() - now := time.Now() - nowBytes, _ := now.MarshalBinary() - nowBytes = append(nowBytes, make([]byte, format.TimestampLen-len(nowBytes))...) - fm.SetTimestamp(nowBytes) - fm.SetRecipient(id.NewIdFromUInt(6, id.User, t)) - fm.Contents.SetRightAligned(partitions[i]) - - result, err = collator.AddMessage(fm, uid, 80*time.Millisecond) - if err != nil { - t.Fatal(err) - } - if result != nil { - t.Error("Got a result from collator when it should be timing out" + - " submessages") - } - time.Sleep(50 * time.Millisecond) - } - - time.Sleep(80 * time.Millisecond) - if len(collator.pendingMessages) != 0 { - t.Error("Multi-part message didn't get timed out properly") - } -}