diff --git a/e2e/parse/conversation/message.go b/e2e/parse/conversation/message.go index 748256a5d20c83a59476eb9cc91cf1e6030bba79..bc7f92aa3f4bee6b2decd98a85f19e0c020be16e 100644 --- a/e2e/parse/conversation/message.go +++ b/e2e/parse/conversation/message.go @@ -20,25 +20,23 @@ const ( TruncatedMessageIdLen = 8 ) -// MessageId is the ID of a message stored in a Message. -type MessageId [MessageIdLen]byte +// MessageID is the ID of a message stored in a Message. +type MessageID [MessageIdLen]byte -// truncatedMessageId represents the first64 bits of the MessageId. -type truncatedMessageId [TruncatedMessageIdLen]byte +// truncatedMessageID represents the first64 bits of the MessageID. +type truncatedMessageID [TruncatedMessageIdLen]byte -// A Message is the structure held in a ring buffer. -// It represents a received message by the user, which needs -// its reception verified to the original sender of the message. +// Message is the structure held in a ring buffer. It represents a received +// message by the user, which needs its reception verified to the original +// sender of the message. type Message struct { - // id is the sequential ID of the Message in the ring buffer - id uint32 - // The ID of the message - MessageId MessageId + id uint32 // The sequential ID of the Message in the ring buffer + MessageId MessageID // The ID of the message Timestamp time.Time } // newMessage is the constructor for a Message object. -func newMessage(id uint32, mid MessageId, timestamp time.Time) *Message { +func newMessage(id uint32, mid MessageID, timestamp time.Time) *Message { return &Message{ id: id, MessageId: mid, @@ -46,8 +44,8 @@ func newMessage(id uint32, mid MessageId, timestamp time.Time) *Message { } } -// marshal creates a byte buffer containing the serialized information -// of a Message. +// marshal creates a byte buffer containing the serialized information of a +// Message. func (m *Message) marshal() []byte { buff := bytes.NewBuffer(nil) @@ -86,50 +84,47 @@ func unmarshalMessage(data []byte) *Message { MessageId: mid, Timestamp: ts, } - } -// NewMessageIdFromBytes is a constructor for MessageId -// creates a MessageId from byte data. -func NewMessageIdFromBytes(data []byte) MessageId { - mid := MessageId{} +// NewMessageIdFromBytes creates a MessageID from byte data. +func NewMessageIdFromBytes(data []byte) MessageID { + mid := MessageID{} copy(mid[:], data) return mid } -// String returns a base64 encode of the MessageId. This functions -// satisfies the fmt.Stringer interface. -func (mid MessageId) String() string { +// String returns a base 64 encoding of the MessageID. This functions adheres to +// the fmt.Stringer interface. +func (mid MessageID) String() string { return base64.StdEncoding.EncodeToString(mid[:]) } -// truncate converts a MessageId into a truncatedMessageId. -func (mid MessageId) truncate() truncatedMessageId { - return newTruncatedMessageId(mid.Bytes()) +// truncate converts a MessageID into a truncatedMessageID. +func (mid MessageID) truncate() truncatedMessageID { + return newTruncatedMessageID(mid.Bytes()) } -// Bytes returns the byte data of the MessageId. -func (mid MessageId) Bytes() []byte { +// Bytes returns the byte data of the MessageID. +func (mid MessageID) Bytes() []byte { return mid[:] } -// newTruncatedMessageId is a constructor for truncatedMessageId -// creates a truncatedMessageId from byte data. -func newTruncatedMessageId(data []byte) truncatedMessageId { - tmid := truncatedMessageId{} - copy(tmid[:], data) - return tmid +// newTruncatedMessageID creates a truncatedMessageID from byte data. +func newTruncatedMessageID(data []byte) truncatedMessageID { + tmID := truncatedMessageID{} + copy(tmID[:], data) + return tmID } -// String returns a base64 encode of the truncatedMessageId. This functions -// satisfies the fmt.Stringer interface. -func (tmid truncatedMessageId) String() string { - return base64.StdEncoding.EncodeToString(tmid[:]) +// String returns the base 64 encoding of the truncatedMessageID. This functions +// adheres to the fmt.Stringer interface. +func (tmID truncatedMessageID) String() string { + return base64.StdEncoding.EncodeToString(tmID[:]) } -// Bytes returns the byte data of the truncatedMessageId. -func (tmid truncatedMessageId) Bytes() []byte { - return tmid[:] +// Bytes returns the byte data of the truncatedMessageID. +func (tmID truncatedMessageID) Bytes() []byte { + return tmID[:] } diff --git a/e2e/parse/conversation/message_test.go b/e2e/parse/conversation/message_test.go index eb11eabfb99e3d6fb02983af597a3355a852bf40..35cc9403604caeb40f4932cee3ed52a058520945 100644 --- a/e2e/parse/conversation/message_test.go +++ b/e2e/parse/conversation/message_test.go @@ -14,9 +14,9 @@ import ( "time" ) -// TestMessage_MarshalUnmarshal tests whether a marshalled Message deserializes into -// the same Message using unmarshalMessage. -func TestMessage_MarshalUnmarshal(t *testing.T) { +// Tests whether a marshalled Message deserializes into the same Message using +// unmarshalMessage. +func TestMessage_Marshal_unmarshalMessage(t *testing.T) { timestamp := time.Date(2009, 11, 17, 20, 34, 58, 651387237, time.Local) testId := NewMessageIdFromBytes([]byte("messageId123")) @@ -31,55 +31,49 @@ func TestMessage_MarshalUnmarshal(t *testing.T) { unmarshalled := unmarshalMessage(serialized) if !reflect.DeepEqual(unmarshalled, message) { - t.Fatalf("Unmarshal did not output expected data."+ - "\n\tExpected: %v"+ - "\n\tReceived: %v", message, unmarshalled) + t.Errorf("Unmarshal did not output expected data."+ + "\nexpected: %v\nreceived: %v", message, unmarshalled) } } -// TestMessageId_truncate tests the MessageId truncate function. -func TestMessageId_truncate(t *testing.T) { - testId := NewMessageIdFromBytes([]byte("This is going to be 32 bytes....")) +// Tests the MessageID truncate function. +func TestMessageID_truncate(t *testing.T) { + testID := NewMessageIdFromBytes([]byte("This is going to be 32 bytes...")) - tmid := testId.truncate() - expected := truncatedMessageId{} - copy(expected[:], testId.Bytes()) - if len(tmid.Bytes()) != TruncatedMessageIdLen { - t.Fatalf("MessageId.Truncate() did not produce a truncatedMessageId of "+ - "TruncatedMessageIdLen (%d)."+ - "\n\tExpected: %v"+ - "\n\tReceived: %v", TruncatedMessageIdLen, expected, tmid) + tmID := testID.truncate() + expected := truncatedMessageID{} + copy(expected[:], testID.Bytes()) + if len(tmID.Bytes()) != TruncatedMessageIdLen { + t.Errorf("truncatedMessageID has incorrect length."+ + "\nexpected: %v\nreceived: %v", expected, tmID) } } -// TestNewMessageIdFromBytes tests that NewMessageIdFromBytes -// properly constructs a MessageId. +// Tests that NewMessageIdFromBytes properly constructs a MessageID. func TestNewMessageIdFromBytes(t *testing.T) { - expected := make([]byte, 0, MessageIdLen) - for i := 0; i < MessageIdLen; i++ { - expected = append(expected, byte(i)) + expected := make([]byte, MessageIdLen) + for i := range expected { + expected[i] = byte(i) } + testId := NewMessageIdFromBytes(expected) if !bytes.Equal(expected, testId.Bytes()) { - t.Fatalf("Unexpected output from NewMessageIdFromBytes."+ - "\n\tExpected: %v"+ - "\n\tReceived: %v", expected, testId.Bytes()) + t.Errorf("Unexpected output from NewMessageIdFromBytes."+ + "\nexpected: %v\nreceived: %v", expected, testId.Bytes()) } } -// TestNewTruncatedMessageId tests that newTruncatedMessageId -// constructs a proper truncatedMessageId. +// Tests that newTruncatedMessageID constructs a proper truncatedMessageID. func TestNewTruncatedMessageId(t *testing.T) { expected := make([]byte, 0, TruncatedMessageIdLen) for i := 0; i < TruncatedMessageIdLen; i++ { expected = append(expected, byte(i)) } - testId := newTruncatedMessageId(expected) + testId := newTruncatedMessageID(expected) if !bytes.Equal(expected, testId.Bytes()) { - t.Fatalf("Unexpected output from newTruncatedMessageId."+ - "\n\tExpected: %v"+ - "\n\tReceived: %v", expected, testId.Bytes()) + t.Fatalf("Unexpected output from newTruncatedMessageID."+ + "\nexpected: %v\nreceived: %v", expected, testId.Bytes()) } } diff --git a/e2e/parse/conversation/partner.go b/e2e/parse/conversation/partner.go index 8bad184d3c1791d2751003f692651ae7112b0854..28349a8edba998c8c47c9f8225a678aaaafd20a6 100644 --- a/e2e/parse/conversation/partner.go +++ b/e2e/parse/conversation/partner.go @@ -15,6 +15,7 @@ import ( "gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/netTime" "math" + "os" "strings" "sync" ) @@ -40,7 +41,6 @@ type Conversation struct { // conversationDisk stores the public data of Conversation for saving to disk. type conversationDisk struct { - // Public and stored data LastReceivedID uint32 NumReceivedRevolutions uint32 NextSendID uint64 @@ -51,12 +51,10 @@ type conversationDisk struct { // saved to KV, and returned. func LoadOrMakeConversation(kv *versioned.KV, partner *id.ID) *Conversation { c, err := loadConversation(kv, partner) - if err != nil && !strings.Contains(err.Error(), "Failed to Load conversation") { - jww.FATAL.Panicf("Failed to loadOrMakeConversation: %s", err) - } - - // Create new conversation and save to KV if one does not exist - if c == nil { + if err != nil && !(os.IsNotExist(err) || strings.Contains(err.Error(), "object not found")) { + jww.FATAL.Panicf("Failed to load conversation from storage: %+v", err) + } else if c == nil { + // Create new conversation and save to KV if one does not exist c = &Conversation{ lastReceivedID: 0, numReceivedRevolutions: 0, @@ -66,7 +64,8 @@ func LoadOrMakeConversation(kv *versioned.KV, partner *id.ID) *Conversation { } if err = c.save(); err != nil { - jww.FATAL.Panicf("Failed to save new conversation: %s", err) + jww.FATAL.Panicf( + "Failed to save new conversation to storage: %+v", err) } } @@ -85,8 +84,8 @@ func (c *Conversation) ProcessReceivedMessageID(mid uint32) uint64 { c.numReceivedRevolutions++ c.lastReceivedID = mid if err := c.save(); err != nil { - jww.FATAL.Panicf("Failed to save after updating Last "+ - "Received ID in a conversation: %s", err) + jww.FATAL.Panicf("Failed to save after updating last "+ + "received ID in a conversation: %+v", err) } high = c.numReceivedRevolutions @@ -94,8 +93,8 @@ func (c *Conversation) ProcessReceivedMessageID(mid uint32) uint64 { if mid > c.lastReceivedID { c.lastReceivedID = mid if err := c.save(); err != nil { - jww.FATAL.Panicf("Failed to save after updating Last "+ - "Received ID in a conversation: %s", err) + jww.FATAL.Panicf("Failed to save after updating last "+ + "received ID in a conversation: %+v", err) } } high = c.numReceivedRevolutions @@ -122,8 +121,8 @@ func (c *Conversation) GetNextSendID() (uint64, uint32) { old := c.nextSentID c.nextSentID++ if err := c.save(); err != nil { - jww.FATAL.Panicf("Failed to save after incrementing the sendID: %s", - err) + jww.FATAL.Panicf( + "Failed to save after incrementing the sendID: %+v", err) } c.mux.Unlock() return old, uint32(old & 0x00000000FFFFFFFF) @@ -150,7 +149,7 @@ func loadConversation(kv *versioned.KV, partner *id.ID) (*Conversation, error) { return c, nil } -// save saves the Conversation to KV storage. +// save stores the Conversation in storage. func (c *Conversation) save() error { data, err := c.marshal() if err != nil { diff --git a/e2e/parse/conversation/partner_test.go b/e2e/parse/conversation/partner_test.go index 1e4d2097b50ab988a9f87e269ab9daff6bfdc476..9a52791f147f1ad9b69eace2bc6cc3378320f265 100644 --- a/e2e/parse/conversation/partner_test.go +++ b/e2e/parse/conversation/partner_test.go @@ -8,7 +8,6 @@ package conversation import ( - jww "github.com/spf13/jwalterweatherman" "gitlab.com/elixxir/client/storage/versioned" "gitlab.com/elixxir/ekv" "gitlab.com/xx_network/primitives/id" @@ -17,10 +16,8 @@ import ( "testing" ) -// Tests happy path of LoadOrMakeConversation() when making a new Conversation. -func TestLoadOrMakeConversation_Make(t *testing.T) { - // Uncomment to print keys that Set and get are called on - jww.SetStdoutThreshold(jww.LevelTrace) +// Tests happy path of LoadOrMakeConversation when making a new Conversation. +func TestLoadOrMakeConversation_New(t *testing.T) { // Set up test values kv := versioned.NewKV(make(ekv.Memstore)) partner := id.NewIdFromString("partner ID", id.User, t) @@ -37,12 +34,12 @@ func TestLoadOrMakeConversation_Make(t *testing.T) { // Check that the result matches the expected Conversation if !reflect.DeepEqual(expectedConv, conv) { - t.Errorf("LoadOrMakeConversation() made unexpected Conversation."+ - "\n\texpected: %+v\n\treceived: %+v", expectedConv, conv) + t.Errorf("LoadOrMakeConversation made unexpected Conversation."+ + "\nexpected: %+v\nreceived: %+v", expectedConv, conv) } } -// Tests happy path of LoadOrMakeConversation() when loading a Conversation. +// Tests happy path of LoadOrMakeConversation when loading a Conversation. func TestLoadOrMakeConversation_Load(t *testing.T) { // Set up test values kv := versioned.NewKV(make(ekv.Memstore)) @@ -54,12 +51,12 @@ func TestLoadOrMakeConversation_Load(t *testing.T) { // Check that the result matches the expected Conversation if !reflect.DeepEqual(expectedConv, conv) { - t.Errorf("LoadOrMakeConversation() made unexpected Conversation."+ - "\n\texpected: %+v\n\treceived: %+v", expectedConv, conv) + t.Errorf("LoadOrMakeConversation made unexpected Conversation."+ + "\nexpected: %+v\nreceived: %+v", expectedConv, conv) } } -// Tests case 1 of Conversation.ProcessReceivedMessageID(). +// Tests case 1 of Conversation.ProcessReceivedMessageID. func TestConversation_ProcessReceivedMessageID_Case_1(t *testing.T) { // Set up test values mid := uint32(5) @@ -74,18 +71,16 @@ func TestConversation_ProcessReceivedMessageID_Case_1(t *testing.T) { result := conv.ProcessReceivedMessageID(mid) if result != expectedResult { - t.Errorf("ProcessReceivedMessageID() did not product the expected "+ - "result.\n\texpected: %+v\n\trecieved: %+v", - expectedResult, result) + t.Errorf("ProcessReceivedMessageID did not product the expected "+ + "result.\nexpected: %+v\n\trecieved: %+v", expectedResult, result) } if !reflect.DeepEqual(expectedConv, conv) { - t.Errorf("ProcessReceivedMessageID() did not product the expected "+ - "Conversation.\n\texpected: %+v\n\trecieved: %+v", - expectedConv, conv) + t.Errorf("ProcessReceivedMessageID did not product the expected "+ + "Conversation.\nexpected: %+v\n\trecieved: %+v", expectedConv, conv) } } -// Tests case 0 of Conversation.ProcessReceivedMessageID(). +// Tests case 0 of Conversation.ProcessReceivedMessageID. func TestConversation_ProcessReceivedMessageID_Case_0(t *testing.T) { // Set up test values mid := uint32(5) @@ -98,18 +93,16 @@ func TestConversation_ProcessReceivedMessageID_Case_0(t *testing.T) { result := conv.ProcessReceivedMessageID(mid) if result != expectedResult { - t.Errorf("ProcessReceivedMessageID() did not product the expected "+ - "result.\n\texpected: %+v\n\trecieved: %+v", - expectedResult, result) + t.Errorf("ProcessReceivedMessageID did not product the expected "+ + "result.\nexpected: %+v\n\trecieved: %+v", expectedResult, result) } if !reflect.DeepEqual(expectedConv, conv) { - t.Errorf("ProcessReceivedMessageID() did not product the expected "+ - "Conversation.\n\texpected: %+v\n\trecieved: %+v", - expectedConv, conv) + t.Errorf("ProcessReceivedMessageID did not product the expected "+ + "Conversation.\nexpected: %+v\n\trecieved: %+v", expectedConv, conv) } } -// Tests case -1 of Conversation.ProcessReceivedMessageID(). +// Tests case -1 of Conversation.ProcessReceivedMessageID. func TestConversation_ProcessReceivedMessageID_Case_Neg1(t *testing.T) { // Set up test values mid := uint32(topRegion + 5) @@ -123,18 +116,16 @@ func TestConversation_ProcessReceivedMessageID_Case_Neg1(t *testing.T) { result := conv.ProcessReceivedMessageID(mid) if result != expectedResult { - t.Errorf("ProcessReceivedMessageID() did not product the expected "+ - "result.\n\texpected: %+v\n\trecieved: %+v", - expectedResult, result) + t.Errorf("ProcessReceivedMessageID did not product the expected "+ + "result.\nexpected: %+v\n\trecieved: %+v", expectedResult, result) } if !reflect.DeepEqual(expectedConv, conv) { - t.Errorf("ProcessReceivedMessageID() did not product the expected "+ - "Conversation.\n\texpected: %+v\n\trecieved: %+v", - expectedConv, conv) + t.Errorf("ProcessReceivedMessageID did not product the expected "+ + "Conversation.\nexpected: %+v\n\trecieved: %+v", expectedConv, conv) } } -// Tests happy path of Conversation.GetNextSendID(). +// Tests happy path of Conversation.GetNextSendID. func TestConversation_GetNextSendID(t *testing.T) { // Set up test values kv := versioned.NewKV(make(ekv.Memstore)) @@ -146,42 +137,42 @@ func TestConversation_GetNextSendID(t *testing.T) { fullID, truncID := conv.GetNextSendID() if fullID != i { t.Errorf("Returned incorrect full sendID."+ - "\n\texpected: %d\n\treceived: %d", i, fullID) + "\nexpected: %d\nreceived: %d", i, fullID) } if truncID != uint32(i) { t.Errorf("Returned incorrect truncated sendID."+ - "\n\texpected: %d\n\treceived: %d", uint32(i), truncID) + "\nexpected: %d\nreceived: %d", uint32(i), truncID) } } } -// Tests the happy path of save() and loadConversation(). +// Tests the happy path of save and loadConversation. func TestConversation_save_load(t *testing.T) { kv := versioned.NewKV(make(ekv.Memstore)) partner := id.NewIdFromString("partner ID", id.User, t) expectedConv := makeRandomConv(kv, partner) - expectedErr := "loadConversation() produced an error: Failed to Load " + + expectedErr := "loadConversation produced an error: Failed to Load " + "conversation: object not found" err := expectedConv.save() if err != nil { - t.Errorf("save() produced an error: %v", err) + t.Errorf("save produced an error: %v", err) } testConv, err := loadConversation(kv, partner) if err != nil { - t.Errorf("loadConversation() produced an error: %v", err) + t.Errorf("loadConversation produced an error: %v", err) } if !reflect.DeepEqual(expectedConv, testConv) { t.Errorf("saving and loading Conversation failed."+ - "\n\texpected: %+v\n\treceived: %+v", expectedConv, testConv) + "\nexpected: %+v\nreceived: %+v", expectedConv, testConv) } _, err = loadConversation(versioned.NewKV(make(ekv.Memstore)), partner) if err == nil { - t.Errorf("loadConversation() failed to produce an error."+ - "\n\texpected: %s\n\treceived: %v", expectedErr, nil) + t.Errorf("loadConversation failed to produce an error."+ + "\nexpected: %s\nreceived: %v", expectedErr, nil) } } @@ -200,7 +191,7 @@ func TestConversation_Delete(t *testing.T) { } if err := conv.delete(); err != nil { - t.Errorf("delete() produced an error: %+v", err) + t.Errorf("delete produced an error: %+v", err) } if _, err := loadConversation(kv, partner); err == nil { @@ -208,7 +199,7 @@ func TestConversation_Delete(t *testing.T) { } } -// Tests the happy path of marshal() and unmarshal(). +// Tests the happy path of marshal and unmarshal. func TestConversation_marshal_unmarshal(t *testing.T) { expectedConv := makeRandomConv(versioned.NewKV(make(ekv.Memstore)), id.NewIdFromString("partner ID", id.User, t)) @@ -216,17 +207,17 @@ func TestConversation_marshal_unmarshal(t *testing.T) { data, err := expectedConv.marshal() if err != nil { - t.Errorf("marshal() returned an error: %v", err) + t.Errorf("marshal returned an error: %v", err) } err = testConv.unmarshal(data) if err != nil { - t.Errorf("unmarshal() returned an error: %v", err) + t.Errorf("unmarshal returned an error: %v", err) } if !reflect.DeepEqual(expectedConv, testConv) { t.Errorf("marshaling and unmarshaling Conversation failed."+ - "\n\texpected: %+v\n\treceived: %+v", expectedConv, testConv) + "\nexpected: %+v\nreceived: %+v", expectedConv, testConv) } } diff --git a/e2e/parse/conversation/ring.go b/e2e/parse/conversation/ring.go index 320e93d4e459f8561ebfaa6b70aa6f9c82f5a435..7e255451bae9eae4fbada051fc85ea4a1bb4067f 100644 --- a/e2e/parse/conversation/ring.go +++ b/e2e/parse/conversation/ring.go @@ -33,14 +33,14 @@ const ( loadMessageErr = "failed to load message with truncated ID %s from storage: %+v" loadBuffErr = "failed to load ring buffer from storage: %+v" noMessageFoundErr = "failed to find message with message ID %s" - lookupTooOldErr = "requested ID %d is lower than oldest id %d" - lookupPastRecentErr = "requested id %d is higher than most recent id %d" + lookupTooOldErr = "requested ID %d is lower than oldest ID %d" + lookupPastRecentErr = "requested ID %d is higher than most recent ID %d" ) // Buff is a circular buffer which containing Message's. type Buff struct { buff []*Message - lookup map[truncatedMessageId]*Message + lookup map[truncatedMessageID]*Message oldest, newest uint32 mux sync.RWMutex kv *versioned.KV @@ -53,7 +53,7 @@ func NewBuff(kv *versioned.KV, n int) (*Buff, error) { // Construct object rb := &Buff{ buff: make([]*Message, n), - lookup: make(map[truncatedMessageId]*Message, n), + lookup: make(map[truncatedMessageID]*Message, n), oldest: 0, // Set to max int since index is unsigned. // Upon first insert, index will overflow back to zero. @@ -66,36 +66,35 @@ func NewBuff(kv *versioned.KV, n int) (*Buff, error) { } // Add pushes a message to the circular buffer Buff. -func (rb *Buff) Add(id MessageId, timestamp time.Time) error { - rb.mux.Lock() - defer rb.mux.Unlock() - rb.push(&Message{ +func (b *Buff) Add(id MessageID, timestamp time.Time) error { + b.mux.Lock() + defer b.mux.Unlock() + b.push(&Message{ MessageId: id, Timestamp: timestamp, }) - return rb.save() + return b.save() } // Get retrieves the most recent entry. -func (rb *Buff) Get() *Message { - rb.mux.RLock() - defer rb.mux.RUnlock() - - mostRecentIndex := rb.newest % uint32(len(rb.buff)) - return rb.buff[mostRecentIndex] +func (b *Buff) Get() *Message { + b.mux.RLock() + defer b.mux.RUnlock() + mostRecentIndex := b.newest % uint32(len(b.buff)) + return b.buff[mostRecentIndex] } -// GetByMessageId looks up and returns the message with MessageId id from -// Buff.lookup. If the message does not exist, an error is returned. -func (rb *Buff) GetByMessageId(id MessageId) (*Message, error) { - rb.mux.RLock() - defer rb.mux.RUnlock() +// GetByMessageID looks up and returns the message with MessageID ID from the +// lookup map. If the message does not exist, an error is returned. +func (b *Buff) GetByMessageID(id MessageID) (*Message, error) { + b.mux.RLock() + defer b.mux.RUnlock() // Look up message - msg, exists := rb.lookup[id.truncate()] - if !exists { // If message not found, return an error + msg, exists := b.lookup[id.truncate()] + if !exists { return nil, errors.Errorf(noMessageFoundErr, id) } @@ -103,66 +102,65 @@ func (rb *Buff) GetByMessageId(id MessageId) (*Message, error) { return msg, nil } -// GetNextMessage looks up the Message with the next sequential Message.id -// in the ring buffer after the Message with the requested MessageId. -func (rb *Buff) GetNextMessage(id MessageId) (*Message, error) { - rb.mux.RLock() - defer rb.mux.RUnlock() +// GetNextMessage looks up the Message with the next sequential MessageID in the +// ring buffer after the Message with the requested MessageID. +func (b *Buff) GetNextMessage(id MessageID) (*Message, error) { + b.mux.RLock() + defer b.mux.RUnlock() // Look up message - msg, exists := rb.lookup[id.truncate()] - if !exists { // If message not found, return an error + msg, exists := b.lookup[id.truncate()] + if !exists { return nil, errors.Errorf(noMessageFoundErr, id) } lookupId := msg.id + 1 - // Check it's not before our first known id - if lookupId < rb.oldest { - return nil, errors.Errorf(lookupTooOldErr, id, rb.oldest) + // Check that it is not before our first known ID + if lookupId < b.oldest { + return nil, errors.Errorf(lookupTooOldErr, id, b.oldest) } - // Check it's not after our last known id - if lookupId > rb.newest { - return nil, errors.Errorf(lookupPastRecentErr, id, rb.newest) + // Check that it is not after our last known ID + if lookupId > b.newest { + return nil, errors.Errorf(lookupPastRecentErr, id, b.newest) } - return rb.buff[(lookupId % uint32(len(rb.buff)))], nil + return b.buff[(lookupId % uint32(len(b.buff)))], nil } -// next is a helper function for Buff, which handles incrementing -// the old & new markers. -func (rb *Buff) next() { - rb.newest++ - if rb.newest >= uint32(len(rb.buff)) { - rb.oldest++ +// next handles incrementing the old and new markers. +func (b *Buff) next() { + b.newest++ + if b.newest >= uint32(len(b.buff)) { + b.oldest++ } } -// push adds a Message to the Buff, clearing the overwritten message from -// both the buff and the lookup structures. -func (rb *Buff) push(val *Message) { +// push adds a Message to the Buff, clearing the overwritten message from both +// the buff and the lookup structures. +func (b *Buff) push(val *Message) { // Update circular buffer trackers - rb.next() + b.next() - val.id = rb.newest + val.id = b.newest // Handle overwrite of the oldest message - rb.handleMessageOverwrite() + b.handleMessageOverwrite() // Set message in RAM - rb.buff[rb.newest%uint32(len(rb.buff))] = val - rb.lookup[val.MessageId.truncate()] = val + b.buff[b.newest%uint32(len(b.buff))] = val + b.lookup[val.MessageId.truncate()] = val } -// handleMessageOverwrite is a helper function which deletes the message -// that will be overwritten by push from the lookup structure. -func (rb *Buff) handleMessageOverwrite() { - overwriteIndex := rb.newest % uint32(len(rb.buff)) - messageToOverwrite := rb.buff[overwriteIndex] +// handleMessageOverwrite deletes the message that will be overwritten by push +// from the lookup structure. +func (b *Buff) handleMessageOverwrite() { + overwriteIndex := b.newest % uint32(len(b.buff)) + messageToOverwrite := b.buff[overwriteIndex] if messageToOverwrite != nil { - delete(rb.lookup, messageToOverwrite.MessageId.truncate()) + delete(b.lookup, messageToOverwrite.MessageId.truncate()) } } @@ -170,8 +168,8 @@ func (rb *Buff) handleMessageOverwrite() { // Storage Functions // //////////////////////////////////////////////////////////////////////////////// -// LoadBuff loads the ring buffer from storage. It loads all -// messages from storage and repopulates the buffer. +// LoadBuff loads the ring buffer from storage. It loads all messages from +// storage and repopulates the buffer. func LoadBuff(kv *versioned.KV) (*Buff, error) { kv = kv.Prefix(ringBuffPrefix) @@ -187,7 +185,7 @@ func LoadBuff(kv *versioned.KV) (*Buff, error) { // Construct buffer rb := &Buff{ buff: make([]*Message, len(list)), - lookup: make(map[truncatedMessageId]*Message, len(list)), + lookup: make(map[truncatedMessageID]*Message, len(list)), oldest: oldest, newest: newest, mux: sync.RWMutex{}, @@ -195,14 +193,14 @@ func LoadBuff(kv *versioned.KV) (*Buff, error) { } // Load each message from storage - for i, tmid := range list { - msg, err := loadMessage(tmid, kv) + for i, tmID := range list { + msg, err := loadMessage(tmID, kv) if err != nil { return nil, err } - // Place message into reconstructed buffer (RAM) - rb.lookup[tmid] = msg + // Place message into reconstructed buffer in memory + rb.lookup[tmID] = msg rb.buff[i] = msg } @@ -210,55 +208,51 @@ func LoadBuff(kv *versioned.KV) (*Buff, error) { } // save stores the ring buffer and its elements to storage. -// NOTE: save is unsafe, a lock should be held by the caller. -func (rb *Buff) save() error { +// NOTE: This function is not thread-safe; a lock should be held by the caller. +func (b *Buff) save() error { // Save each message individually to storage - for _, msg := range rb.buff { + for _, msg := range b.buff { if msg != nil { - if err := rb.saveMessage(msg); err != nil { - return errors.Errorf(saveMessageErr, - msg.MessageId, err) + if err := b.saveMessage(msg); err != nil { + return errors.Errorf(saveMessageErr, msg.MessageId, err) } } } - return rb.saveBuff() + return b.saveBuff() } -// saveBuff is a function which saves the marshalled Buff. -func (rb *Buff) saveBuff() error { +// saveBuff saves the marshalled Buff to storage. +func (b *Buff) saveBuff() error { obj := &versioned.Object{ Version: ringBuffVersion, Timestamp: netTime.Now(), - Data: rb.marshal(), + Data: b.marshal(), } - return rb.kv.Set(ringBuffKey, ringBuffVersion, obj) - + return b.kv.Set(ringBuffKey, ringBuffVersion, obj) } -// marshal creates a byte buffer containing serialized information -// on the Buff. -func (rb *Buff) marshal() []byte { +// marshal creates a byte buffer containing serialized information on the Buff. +func (b *Buff) marshal() []byte { // Create buffer of proper size - // (newest (4 bytes) + oldest (4 bytes) + - // (TruncatedMessageIdLen * length of buffer) + // (newest (4) + oldest (4) + (TruncatedMessageIdLen * length of buffer)) buff := bytes.NewBuffer(nil) - buff.Grow(4 + 4 + (TruncatedMessageIdLen * len(rb.lookup))) + buff.Grow(4 + 4 + (TruncatedMessageIdLen * len(b.lookup))) // Write newest index into buffer - b := make([]byte, 4) - binary.LittleEndian.PutUint32(b, uint32(rb.newest)) - buff.Write(b) + bb := make([]byte, 4) + binary.LittleEndian.PutUint32(bb, b.newest) + buff.Write(bb) // Write oldest index into buffer - b = make([]byte, 4) - binary.LittleEndian.PutUint32(b, uint32(rb.oldest)) - buff.Write(b) + bb = make([]byte, 4) + binary.LittleEndian.PutUint32(bb, b.oldest) + buff.Write(bb) // Write the truncated message IDs into buffer - for _, msg := range rb.buff { + for _, msg := range b.buff { if msg != nil { buff.Write(msg.MessageId.truncate().Bytes()) } @@ -268,8 +262,8 @@ func (rb *Buff) marshal() []byte { } // unmarshalBuffer unmarshalls a byte slice into Buff information. -func unmarshalBuffer(b []byte) (newest, oldest uint32, - list []truncatedMessageId) { +func unmarshalBuffer(b []byte) ( + newest, oldest uint32, list []truncatedMessageID) { buff := bytes.NewBuffer(b) // Read the newest index from the buffer @@ -279,36 +273,36 @@ func unmarshalBuffer(b []byte) (newest, oldest uint32, oldest = binary.LittleEndian.Uint32(buff.Next(4)) // Initialize list to the number of truncated IDs - list = make([]truncatedMessageId, 0, buff.Len()/TruncatedMessageIdLen) + list = make([]truncatedMessageID, 0, buff.Len()/TruncatedMessageIdLen) - // Read each truncatedMessageId and save into list - for next := buff.Next(TruncatedMessageIdLen); len(next) == TruncatedMessageIdLen; next = buff.Next(TruncatedMessageIdLen) { - list = append(list, newTruncatedMessageId(next)) + // Read each truncatedMessageID and save into list + const n = TruncatedMessageIdLen + for next := buff.Next(n); len(next) == n; next = buff.Next(n) { + list = append(list, newTruncatedMessageID(next)) } return } -// saveMessage saves a Message to storage, using the truncatedMessageId -// as the KV key. -func (rb *Buff) saveMessage(msg *Message) error { +// saveMessage saves a Message to storage. +func (b *Buff) saveMessage(msg *Message) error { obj := &versioned.Object{ Version: messageVersion, Timestamp: netTime.Now(), Data: msg.marshal(), } - return rb.kv.Set( + return b.kv.Set( makeMessageKey(msg.MessageId.truncate()), messageVersion, obj) } -// loadMessage loads a message given truncatedMessageId from storage. -func loadMessage(tmid truncatedMessageId, kv *versioned.KV) (*Message, error) { +// loadMessage loads a message given truncatedMessageID from storage. +func loadMessage(tmID truncatedMessageID, kv *versioned.KV) (*Message, error) { // Load message from storage - vo, err := kv.Get(makeMessageKey(tmid), messageVersion) + vo, err := kv.Get(makeMessageKey(tmID), messageVersion) if err != nil { - return nil, errors.Errorf(loadMessageErr, tmid, err) + return nil, errors.Errorf(loadMessageErr, tmID, err) } // Unmarshal message @@ -316,6 +310,6 @@ func loadMessage(tmid truncatedMessageId, kv *versioned.KV) (*Message, error) { } // makeMessageKey generates te key used to save a message to storage. -func makeMessageKey(tmid truncatedMessageId) string { - return messageKey + tmid.String() +func makeMessageKey(tmID truncatedMessageID) string { + return messageKey + tmID.String() } diff --git a/e2e/parse/conversation/ring_test.go b/e2e/parse/conversation/ring_test.go index 76624bc5ac56e881f75c2b11afa36ed5baa29459..e9174d366681c427b33bfb626c51a292001a1f30 100644 --- a/e2e/parse/conversation/ring_test.go +++ b/e2e/parse/conversation/ring_test.go @@ -23,35 +23,29 @@ func TestNewBuff(t *testing.T) { buffLen := 20 testBuff, err := NewBuff(kv, buffLen) if err != nil { - t.Fatalf("NewBuff error: %v", err) + t.Errorf("Failed to make new Buff: %+v", err) } - /// Check buffer was initialized to expected length + // Check buffer was initialized to expected length if len(testBuff.buff) != buffLen { - t.Fatalf("NewBuff did not produce buffer of "+ - "expected size. "+ - "\n\tExpected: %d"+ - "\n\tReceived slice size: %v", + t.Errorf("New Buff has incorrect length.\nexpected: %d\nreceived: %d", buffLen, len(testBuff.lookup)) } // Check that buffer exists in KV _, err = kv.Prefix(ringBuffPrefix).Get(ringBuffKey, ringBuffVersion) if err != nil { - t.Fatalf("Could not pull Buff from KV: %v", err) + t.Errorf("Failed to load Buff from KV: %+v", err) } - } -// TestBuff_Add tests whether Buff.Add properly adds to the Buff object. -// This includes modifying the Buff.buff, buff.lookup and proper index updates. +// Tests that Buff.Add properly adds to the Buff object. This includes modifying +// the Buff.buff, buff.lookup, and proper index updates. func TestBuff_Add(t *testing.T) { // Initialize buffer - kv := versioned.NewKV(make(ekv.Memstore)) - buffLen := 20 - testBuff, err := NewBuff(kv, buffLen) + testBuff, err := NewBuff(versioned.NewKV(make(ekv.Memstore)), 20) if err != nil { - t.Fatalf("NewBuff error: %v", err) + t.Errorf("Failed to make new Buff: %+v", err) } // Insert initial message @@ -59,18 +53,19 @@ func TestBuff_Add(t *testing.T) { mid := NewMessageIdFromBytes([]byte("test")) err = testBuff.Add(mid, timestamp) if err != nil { - t.Fatalf("Add error: %v", err) + t.Errorf("Add returned an error: %+v", err) } // Check that map entries exist if len(testBuff.lookup) != 1 { - t.Fatalf("Message was not added to buffer's map") + t.Errorf("Incorrect length: message was not added to buffer's map."+ + "\nexpected: %d\nreceived: %d", 1, len(testBuff.lookup)) } // Check that expected entry exists in the map received, exists := testBuff.lookup[mid.truncate()] if !exists { - t.Fatalf("Message does not exist in buffer after add.") + t.Error("Message does not exist in buffer after add.") } // Reconstruct added message @@ -82,34 +77,30 @@ func TestBuff_Add(t *testing.T) { // Check map for inserted Message if !reflect.DeepEqual(expected, received) { - t.Fatalf("Expected Message not found in map."+ - "\n\tExpected: %v"+ - "\n\tReceived: %v", expected, received) + t.Errorf("Expected message not found in map."+ + "\nexpected: %+v\nreceived: %+v", expected, received) } // Check buffer for inserted Message if !reflect.DeepEqual(testBuff.buff[0], expected) { - t.Fatalf("Expected message not found in buffer."+ - "\n\tExpected: %v"+ - "\n\tReceived: %v", expected, testBuff.buff[0]) + t.Errorf("Expected message not found in buffer."+ + "\nexpected: %+v\nreceived: %+v", expected, testBuff.buff[0]) } // Check that newest index was updated if testBuff.newest != 0 { - t.Fatalf("Buffer's newest index was not updated to expected value."+ - "\n\tExpected: %d"+ - "\n\tReceived: %d", 0, testBuff.newest) + t.Errorf("Buffer's newest index was not updated to expected value."+ + "\nexpected: %d\nreceived: %d", 0, testBuff.newest) } } -// TestBuff_Add_Overflow inserts buffer length + 1 Message's to the buffer -// and ensures the oldest value is overwritten. +// Inserts buffer length + 1 Message's to the buffer and ensures the oldest +// value is overwritten. func TestBuff_Add_Overflow(t *testing.T) { - kv := versioned.NewKV(make(ekv.Memstore)) buffLen := 20 - testBuff, err := NewBuff(kv, buffLen) + testBuff, err := NewBuff(versioned.NewKV(make(ekv.Memstore)), buffLen) if err != nil { - t.Fatalf("NewBuff error: %v", err) + t.Errorf("Failed to make new Buff: %+v", err) } // Insert message to be overwritten @@ -117,7 +108,7 @@ func TestBuff_Add_Overflow(t *testing.T) { oldest := NewMessageIdFromBytes([]byte("will be overwritten")) err = testBuff.Add(oldest, timestamp) if err != nil { - t.Fatalf("Add error: %v", err) + t.Errorf("Failed to add message to buffer: %+v", err) } // Insert buffLen elements to overwrite element inserted above @@ -126,39 +117,34 @@ func TestBuff_Add_Overflow(t *testing.T) { mid := NewMessageIdFromBytes([]byte(strconv.Itoa(i))) err = testBuff.Add(mid, timestamp) if err != nil { - t.Fatalf("Add error: %v", err) + t.Errorf("Failed to add message to buffer: %+v", err) } if testBuff.newest != uint32(i+1) { - t.Fatalf("Buffer's newest index was not updated for insert."+ - "\n\tExpected: %d"+ - "\n\tReceived: %d", i+1, testBuff.newest) + t.Errorf("Buffer's newest index was not updated for insert."+ + "\nexpected: %d\nreceived: %d", i+1, testBuff.newest) } } // Test that the oldest index has been updated if testBuff.oldest != 1 { - t.Fatalf("Buffer's oldest index was not updated to expected value."+ - "\n\tExpected: %d"+ - "\n\tReceived: %d", 1, testBuff.oldest) + t.Errorf("Buffer's oldest index was not updated to expected value."+ + "\nexpected: %d\nreceived: %d", 1, testBuff.oldest) } // Check that oldest value no longer exists in map _, exists := testBuff.lookup[oldest.truncate()] if exists { - t.Fatalf("Oldest value expected to be overwritten in map!") + t.Errorf("Oldest value expected to be overwritten in map!") } - } -// TestBuff_Get tests that Buff.Get returns the latest inserted Message. +// Tests that Buff.Get returns the latest inserted Message. func TestBuff_Get(t *testing.T) { // Initialize buffer - kv := versioned.NewKV(make(ekv.Memstore)) - buffLen := 20 - testBuff, err := NewBuff(kv, buffLen) + testBuff, err := NewBuff(versioned.NewKV(make(ekv.Memstore)), 20) if err != nil { - t.Fatalf("NewBuff error: %v", err) + t.Errorf("Failed to make new Buff: %+v", err) } // Insert initial message @@ -166,7 +152,7 @@ func TestBuff_Get(t *testing.T) { mid := NewMessageIdFromBytes([]byte("test")) err = testBuff.Add(mid, timestamp) if err != nil { - t.Fatalf("Add error: %v", err) + t.Errorf("Failed to add message to buffer: %+v", err) } // Reconstruct expected message @@ -176,14 +162,13 @@ func TestBuff_Get(t *testing.T) { id: 0, } - // Retrieve newly inserted value using get() + // Retrieve newly inserted value using get received := testBuff.Get() // Check that retrieved value is expected if !reflect.DeepEqual(received, expected) { - t.Fatalf("get() did not retrieve expected value."+ - "\n\tExpected: %v"+ - "\n\tReceived: %v", expected, received) + t.Errorf("Get did not retrieve expected value."+ + "\nexpected: %+v\nreceived: %+v", expected, received) } // Construct new message to insert @@ -197,27 +182,23 @@ func TestBuff_Get(t *testing.T) { // Add new message to buffer err = testBuff.Add(newlyInsertedMid, timestamp) if err != nil { - t.Fatalf("Add error: %v", err) + t.Errorf("Failed to add message to buffer: %+v", err) } - // Ensure newly inserted message is returned by get() + // Ensure newly inserted message is returned by get if !reflect.DeepEqual(testBuff.Get(), newlyInserted) { - t.Fatalf("get() did not retrieve expected value."+ - "\n\tExpected: %v"+ - "\n\tReceived: %v", expected, received) + t.Errorf("Get did not retrieve expected value."+ + "\nexpected: %+v\nreceived: %+v", expected, received) } - } -// TestBuff_GetByMessageId tests that Buff.GetByMessageId returns the Message with -// the requested MessageId. -func TestBuff_GetByMessageId(t *testing.T) { +// Tests that Buff.GetByMessageID returns the Message with the requested +// MessageID. +func TestBuff_GetByMessageID(t *testing.T) { // Initialize buffer - kv := versioned.NewKV(make(ekv.Memstore)) - buffLen := 20 - testBuff, err := NewBuff(kv, buffLen) + testBuff, err := NewBuff(versioned.NewKV(make(ekv.Memstore)), 20) if err != nil { - t.Fatalf("NewBuff error: %v", err) + t.Errorf("Failed to make new Buff: %+v", err) } // Insert initial message @@ -225,7 +206,7 @@ func TestBuff_GetByMessageId(t *testing.T) { mid := NewMessageIdFromBytes([]byte("test")) err = testBuff.Add(mid, timestamp) if err != nil { - t.Fatalf("Add error: %v", err) + t.Errorf("Failed to add message to buffer: %+v", err) } // Reconstruct expected message @@ -236,50 +217,48 @@ func TestBuff_GetByMessageId(t *testing.T) { } // Retrieve message using getter - received, err := testBuff.GetByMessageId(mid) + received, err := testBuff.GetByMessageID(mid) if err != nil { - t.Fatalf("GetMessageId error: %v", err) + t.Errorf("GetByMessageID error: %+v", err) } // Check retrieved value matches expected if !reflect.DeepEqual(received, expected) { - t.Fatalf("GetByMessageId retrieved unexpected value."+ - "\n\tExpected: %v"+ - "\n\tReceived: %v", expected, received) + t.Errorf("GetByMessageID retrieved unexpected value."+ + "\nexpected: %+v\nreceived: %+v", expected, received) } } -// TestBuff_GetByMessageId_Error tests that Buff.GetByMessageId returns an error -// when requesting a MessageId that does not exist in Buff. -func TestBuff_GetByMessageId_Error(t *testing.T) { +// Tests that Buff.GetByMessageID returns an error when requesting a MessageID +// that does not exist in Buff. +func TestBuff_GetByMessageID_Error(t *testing.T) { // Initialize buffer kv := versioned.NewKV(make(ekv.Memstore)) buffLen := 20 testBuff, err := NewBuff(kv, buffLen) if err != nil { - t.Fatalf("NewBuff error: %v", err) + t.Errorf("Failed to make new Buff: %+v", err) } - uninsertedMid := NewMessageIdFromBytes([]byte("test")) + unInsertedMid := NewMessageIdFromBytes([]byte("test")) - // Un-inserted MessageId should not exist in Buff, causing an error - _, err = testBuff.GetByMessageId(uninsertedMid) + // Un-inserted MessageID should not exist in Buff, causing an error + _, err = testBuff.GetByMessageID(unInsertedMid) if err == nil { - t.Fatalf("GetByMessageId should error when requesting a " + - "MessageId not in the buffer") + t.Errorf("GetByMessageID should error when requesting a MessageID " + + "not in the buffer.") } } -// TestBuff_GetNextMessage tests whether func TestBuff_GetNextMessage(t *testing.T) { // Initialize buffer kv := versioned.NewKV(make(ekv.Memstore)) buffLen := 20 testBuff, err := NewBuff(kv, buffLen) if err != nil { - t.Fatalf("NewBuff error: %v", err) + t.Errorf("Failed to make new Buff: %+v", err) } // Insert initial message @@ -287,14 +266,14 @@ func TestBuff_GetNextMessage(t *testing.T) { oldMsgId := NewMessageIdFromBytes([]byte("test")) err = testBuff.Add(oldMsgId, timestamp) if err != nil { - t.Fatalf("Add error: %v", err) + t.Errorf("Failed to add message to buffer: %+v", err) } // Insert next message nextMsgId := NewMessageIdFromBytes([]byte("test2")) err = testBuff.Add(nextMsgId, timestamp) if err != nil { - t.Fatalf("Add error: %v", err) + t.Errorf("Failed to add message to buffer: %+v", err) } // Construct expected message (the newest message) @@ -307,26 +286,23 @@ func TestBuff_GetNextMessage(t *testing.T) { // Retrieve message after the old message received, err := testBuff.GetNextMessage(oldMsgId) if err != nil { - t.Fatalf("GetNextMessage error: %v", err) + t.Errorf("GetNextMessage returned an error: %+v", err) } if !reflect.DeepEqual(expected, received) { - t.Fatalf("GetNextMessage did not retrieve expected value."+ - "\n\tExpected: %v"+ - "\n\tReceived: %v", expected, received) + t.Errorf("GetNextMessage did not retrieve expected value."+ + "\nexpected: %+v\nreceived: %+v", expected, received) } } -// TestBuff_marshalUnmarshal tests that the Buff's marshal and unmarshalBuffer functionality -// are inverse methods. func TestLoadBuff(t *testing.T) { // Initialize buffer kv := versioned.NewKV(make(ekv.Memstore)) buffLen := 20 testBuff, err := NewBuff(kv, buffLen) if err != nil { - t.Fatalf("NewBuff error: %v", err) + t.Errorf("Failed to make new Buff: %+v", err) } // Insert buffLen elements to overwrite element inserted above @@ -335,18 +311,18 @@ func TestLoadBuff(t *testing.T) { mid := NewMessageIdFromBytes([]byte(strconv.Itoa(i))) err = testBuff.Add(mid, timestamp) if err != nil { - t.Fatalf("Add error: %v", err) + t.Errorf("Failed to add message to buffer: %+v", err) } } // Load buffer from storage received, err := LoadBuff(kv) if err != nil { - t.Fatalf("LoadBuff error: %v", err) + t.Errorf("LoadBuff returned an error: %+v", err) } if reflect.DeepEqual(testBuff, received) { - t.Fatalf("Loaded buffer does not match stored.") + t.Errorf("Loaded buffer does not match stored."+ + "\nexpected: %+v\nreceived: %+v", testBuff, received) } - } diff --git a/e2e/parse/conversation/store.go b/e2e/parse/conversation/store.go index aeaa1a42bc8339489d5e6927edb9e9673465d0ad..b698d473f95c9af0e64cd6bfc1bc6f51a1789b16 100644 --- a/e2e/parse/conversation/store.go +++ b/e2e/parse/conversation/store.go @@ -22,17 +22,16 @@ type Store struct { mux sync.RWMutex } -// NewStore returns a new conversation store made off of the KV. +// NewStore returns a new conversation Store made off of the KV. func NewStore(kv *versioned.KV) *Store { - kv = kv.Prefix(conversationKeyPrefix) return &Store{ loadedConversations: make(map[id.ID]*Conversation), - kv: kv, + kv: kv.Prefix(conversationKeyPrefix), } } -// Get gets the conversation with the given partner ID from RAM, if it is there. -// Otherwise, it loads it from disk. +// Get gets the conversation with the given partner ID from memory, if it is +// there. Otherwise, it loads it from disk. func (s *Store) Get(partner *id.ID) *Conversation { s.mux.RLock() c, ok := s.loadedConversations[*partner] @@ -50,25 +49,25 @@ func (s *Store) Get(partner *id.ID) *Conversation { return c } -// delete deletes the conversation with the given partner ID from memory and +// Delete deletes the conversation with the given partner ID from memory and // storage. Panics if the object cannot be deleted from storage. func (s *Store) Delete(partner *id.ID) { s.mux.Lock() defer s.mux.Unlock() - // get contact from memory + // Get contact from memory c, exists := s.loadedConversations[*partner] if !exists { return } - // delete contact from storage + // Delete contact from storage err := c.delete() if err != nil { - jww.FATAL.Panicf("Failed to remover conversation with ID %s from "+ + jww.FATAL.Panicf("Failed to remove conversation with ID %s from "+ "storage: %+v", partner, err) } - // delete contact from memory + // Delete contact from memory delete(s.loadedConversations, *partner) } diff --git a/e2e/parse/conversation/store_test.go b/e2e/parse/conversation/store_test.go index f4d109d98e67a504844bdd698586e2f47e74cc69..16598c3cd268194c32f89eba021ccd11e4e7733b 100644 --- a/e2e/parse/conversation/store_test.go +++ b/e2e/parse/conversation/store_test.go @@ -8,7 +8,6 @@ package conversation import ( - jww "github.com/spf13/jwalterweatherman" "gitlab.com/elixxir/client/storage/versioned" "gitlab.com/elixxir/ekv" "gitlab.com/xx_network/primitives/id" @@ -16,50 +15,38 @@ import ( "testing" ) -// Read jww trace output to determine if key names are ok -func TestStore_Get_Prefix(t *testing.T) { - // Uncomment to print keys that Set and get are called on - jww.SetStdoutThreshold(jww.LevelTrace) - - // It's a conversation with a partner, so does there need to be an additional layer of hierarchy here later? - rootKv := versioned.NewKV(make(ekv.Memstore)) - store := NewStore(rootKv) - conv := store.Get(id.NewIdFromUInt(8, id.User, t)) - t.Log(conv) -} - // Happy path. func TestStore_Delete(t *testing.T) { kv := versioned.NewKV(make(ekv.Memstore)) store := NewStore(kv) - pids := make([]*id.ID, 10) + pIDs := make([]*id.ID, 10) // Generate list of IDs - for i := range pids { - pids[i] = id.NewIdFromUInt(rand.Uint64(), id.User, t) + for i := range pIDs { + pIDs[i] = id.NewIdFromUInt(rand.Uint64(), id.User, t) } // Add IDs to storage and memory - for _, pid := range pids { + for _, pid := range pIDs { store.Get(pid) } - // delete conversations with IDs with even numbered indexes - for i := 0; i < len(pids); i += 2 { - store.Delete(pids[i]) + // Delete conversations with IDs with even numbered indexes + for i := 0; i < len(pIDs); i += 2 { + store.Delete(pIDs[i]) } // Ensure even numbered conversation were deleted and all others still exist - for i, pid := range pids { + for i, pid := range pIDs { _, exists := store.loadedConversations[*pid] if i%2 == 0 { if exists { - t.Errorf("%d. delete() failed to delete the conversation "+ - "(ID %s) from memory.", i, pid) + t.Errorf("Delete failed to delete the conversation for ID "+ + "%s (%d).", pid, i) } } else if !exists { - t.Errorf("%d. delete() unexpetedly deleted the conversation "+ - "(ID %s) from memory.", i, pid) + t.Errorf("Delete unexpetedly deletde the conversation for ID "+ + "%s (%d).", pid, i) } } } diff --git a/e2e/parse/firstMessagePart.go b/e2e/parse/firstMessagePart.go index f24ccc33e4cee57dba08a3cbf20c8b40225f647c..05280cd16e0bbed42b7a1cbe3e62c49241143233 100644 --- a/e2e/parse/firstMessagePart.go +++ b/e2e/parse/firstMessagePart.go @@ -39,7 +39,7 @@ func newFirstMessagePart(mt catalog.MessageType, id uint32, numParts uint8, timestamp time.Time, contents []byte) firstMessagePart { // Create the message structure - m := FirstMessagePartFromBytes(make([]byte, len(contents)+firstHeaderLen)) + m := firstMessagePartFromBytes(make([]byte, len(contents)+firstHeaderLen)) // Set the message type binary.BigEndian.PutUint32(m.Type, uint32(mt)) @@ -75,9 +75,9 @@ var firstMessagePartFromBytesVersions = map[uint8]func([]byte) firstMessagePart{ firstMessagePartCurrentVersion: firstMessagePartFromBytesVer0, } -// FirstMessagePartFromBytes builds a firstMessagePart mapped to the passed in +// firstMessagePartFromBytes builds a firstMessagePart mapped to the passed in // data slice. Mapped by reference; a copy is not made. -func FirstMessagePartFromBytes(data []byte) firstMessagePart { +func firstMessagePartFromBytes(data []byte) firstMessagePart { // Map the data according to its version version := data[len(data)-1] @@ -105,27 +105,27 @@ func firstMessagePartFromBytesVer0(data []byte) firstMessagePart { } } -// GetType returns the message type. -func (m firstMessagePart) GetType() catalog.MessageType { +// getType returns the message type. +func (m firstMessagePart) getType() catalog.MessageType { return catalog.MessageType(binary.BigEndian.Uint32(m.Type)) } -// GetNumParts returns the number of message parts. -func (m firstMessagePart) GetNumParts() uint8 { +// getNumParts returns the number of message parts. +func (m firstMessagePart) getNumParts() uint8 { return m.NumParts[0] } -// GetTimestamp returns the timestamp as a time.Time. -func (m firstMessagePart) GetTimestamp() time.Time { +// getTimestamp returns the timestamp as a time.Time. +func (m firstMessagePart) getTimestamp() time.Time { return time.Unix(0, int64(binary.BigEndian.Uint64(m.Timestamp))) } -// GetVersion returns the version number of the data encoding. -func (m firstMessagePart) GetVersion() uint8 { +// getVersion returns the version number of the data encoding. +func (m firstMessagePart) getVersion() uint8 { return m.Version[0] } -// Bytes returns the serialised message data. -func (m firstMessagePart) Bytes() []byte { +// bytes returns the serialised message data. +func (m firstMessagePart) bytes() []byte { return m.Data } diff --git a/e2e/parse/firstMessagePart_test.go b/e2e/parse/firstMessagePart_test.go index ae8f9af8f2914c7ef4e2b1035893764a7ff748a9..d1459191cf165ebaeb2e2248105097a57e3ece1c 100644 --- a/e2e/parse/firstMessagePart_test.go +++ b/e2e/parse/firstMessagePart_test.go @@ -15,8 +15,9 @@ import ( "time" ) -// Expected firstMessagePart for checking against, generated by fmp in TestNewFirstMessagePart -var efmp = firstMessagePart{ +// Expected firstMessagePart for checking against, generated by fmp in +// TestNewFirstMessagePart. +var expectedFMP = firstMessagePart{ messagePart: messagePart{ Data: []byte{0, 0, 4, 53, 0, 0, 13, 2, 0, 0, 0, 2, 22, 87, 28, 11, 215, 220, 82, 0, 116, 101, 115, 116, 105, 110, 103, 115, 116, 114, 105, @@ -33,8 +34,8 @@ var efmp = firstMessagePart{ Version: []byte{firstMessagePartCurrentVersion}, } -// Test that newFirstMessagePart returns a correctly made firstMessagePart -func TestNewFirstMessagePart(t *testing.T) { +// Test that newFirstMessagePart returns a correctly made firstMessagePart. +func Test_newFirstMessagePart(t *testing.T) { fmp := newFirstMessagePart( catalog.XxMessage, 1077, @@ -43,53 +44,54 @@ func TestNewFirstMessagePart(t *testing.T) { []byte{'t', 'e', 's', 't', 'i', 'n', 'g', 's', 't', 'r', 'i', 'n', 'g'}, ) - gotTime := fmp.GetTimestamp() + gotTime := fmp.getTimestamp() expectedTime := time.Unix(1609786229, 0).UTC() if !gotTime.Equal(expectedTime) { t.Errorf("Failed to get expected timestamp."+ "\nexpected: %s\nreceived: %s", expectedTime, gotTime) } - if !reflect.DeepEqual(fmp, efmp) { + if !reflect.DeepEqual(fmp, expectedFMP) { t.Errorf("Expected and got firstMessagePart did not match."+ - "\nexpected: %+v\nrecieved: %+v", efmp, fmp) + "\nexpected: %+v\nrecieved: %+v", expectedFMP, fmp) } } -// Test that FirstMessagePartFromBytes returns a correctly made firstMessagePart from the bytes of one -func TestFirstMessagePartFromBytes(t *testing.T) { - fmp := FirstMessagePartFromBytes(efmp.Data) +// Test that firstMessagePartFromBytes returns a correctly made firstMessagePart +// from the bytes of one. +func Test_firstMessagePartFromBytes(t *testing.T) { + fmp := firstMessagePartFromBytes(expectedFMP.Data) - if !reflect.DeepEqual(fmp, efmp) { + if !reflect.DeepEqual(fmp, expectedFMP) { t.Error("Expected and got firstMessagePart did not match") } } -// Test that GetType returns the correct type for a firstMessagePart -func TestFirstMessagePart_GetType(t *testing.T) { - if efmp.GetType() != catalog.XxMessage { - t.Errorf("Got %v, expected %v", efmp.GetType(), catalog.XxMessage) +// Test that firstMessagePart.getType returns the correct type. +func Test_firstMessagePart_getType(t *testing.T) { + if expectedFMP.getType() != catalog.XxMessage { + t.Errorf("Got %v, expected %v", expectedFMP.getType(), catalog.XxMessage) } } -// Test that GetNumParts returns the correct number of parts for a firstMessagePart -func TestFirstMessagePart_GetNumParts(t *testing.T) { - if efmp.GetNumParts() != 2 { - t.Errorf("Got %v, expected %v", efmp.GetNumParts(), 2) +// Test that firstMessagePart.getNumParts returns the correct number of parts. +func Test_firstMessagePart_getNumParts(t *testing.T) { + if expectedFMP.getNumParts() != 2 { + t.Errorf("Got %v, expected %v", expectedFMP.getNumParts(), 2) } } -// Test that GetTimestamp returns the correct timestamp for a firstMessagePart -func TestFirstMessagePart_GetTimestamp(t *testing.T) { - et := efmp.GetTimestamp() +// Test that firstMessagePart.getTimestamp returns the correct timestamp. +func Test_firstMessagePart_getTimestamp(t *testing.T) { + et := expectedFMP.getTimestamp() if !time.Unix(1609786229, 0).Equal(et) { t.Errorf("Got %v, expected %v", et, time.Unix(1609786229, 0)) } } -// Test that GetTimestamp returns the correct bytes for a firstMessagePart -func TestFirstMessagePart_Bytes(t *testing.T) { - if bytes.Compare(efmp.Bytes(), efmp.Data) != 0 { - t.Errorf("Got %v, expected %v", efmp.Bytes(), efmp.Data) +// Test that firstMessagePart.bytes returns the correct bytes. +func Test_firstMessagePart_bytes(t *testing.T) { + if !bytes.Equal(expectedFMP.bytes(), expectedFMP.Data) { + t.Errorf("Got %v, expected %v", expectedFMP.bytes(), expectedFMP.Data) } } diff --git a/e2e/parse/messagePart.go b/e2e/parse/messagePart.go index 01c514d7c33fe1654534a67aeaf8671744d93982..95d33cd6ac889d9050c76b62b948c1cee1fd4e8a 100644 --- a/e2e/parse/messagePart.go +++ b/e2e/parse/messagePart.go @@ -87,32 +87,32 @@ func messagePartFromBytesVer0(data []byte) messagePart { } } -// GetID returns the message ID. -func (m messagePart) GetID() uint32 { +// getID returns the message ID. +func (m messagePart) getID() uint32 { return binary.BigEndian.Uint32(m.Id) } -// GetPart returns the message part number. -func (m messagePart) GetPart() uint8 { +// getPart returns the message part number. +func (m messagePart) getPart() uint8 { return m.Part[0] } -// GetContents returns the entire contents slice. -func (m messagePart) GetContents() []byte { +// getContents returns the entire contents slice. +func (m messagePart) getContents() []byte { return m.Contents } -// GetSizedContents returns the contents truncated to include only stored data. -func (m messagePart) GetSizedContents() []byte { - return m.Contents[:m.GetContentsLength()] +// getSizedContents returns the contents truncated to include only stored data. +func (m messagePart) getSizedContents() []byte { + return m.Contents[:m.getContentsLength()] } -// GetContentsLength returns the length of the data in the contents. -func (m messagePart) GetContentsLength() int { +// getContentsLength returns the length of the data in the contents. +func (m messagePart) getContentsLength() int { return int(binary.BigEndian.Uint16(m.Len)) } -// Bytes returns the serialised message data. -func (m messagePart) Bytes() []byte { +// bytes returns the serialised message data. +func (m messagePart) bytes() []byte { return m.Data } diff --git a/e2e/parse/messagePart_test.go b/e2e/parse/messagePart_test.go index 61ecc7467ff5c95dd2cf89830cacbb319cf9660f..93c1bb83df372fcd456822bf7b984d8bb47ff937 100644 --- a/e2e/parse/messagePart_test.go +++ b/e2e/parse/messagePart_test.go @@ -13,60 +13,64 @@ import ( "testing" ) -// Expected messagePart for checking against, generated by gotmp in Test_newMessagePart -var emp = messagePart{ - Data: []uint8{0x0, 0x0, 0x0, 0x20, 0x6, 0x0, 0x7, 0x74, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x67, messagePartCurrentVersion}, - Id: []uint8{0x0, 0x0, 0x0, 0x20}, Part: []uint8{0x6}, +// Expected messagePart for checking against; generated by goTmp in +// Test_newMessagePart. +var expectedMP = messagePart{ + Data: []uint8{0x0, 0x0, 0x0, 0x20, 0x6, 0x0, 0x7, 0x74, 0x65, 0x73, 0x74, + 0x69, 0x6e, 0x67, messagePartCurrentVersion}, + Id: []uint8{0x0, 0x0, 0x0, 0x20}, Part: []uint8{0x6}, Len: []uint8{0x0, 0x7}, Contents: []uint8{0x74, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x67}, Version: []uint8{messagePartCurrentVersion}, } -// This tests that a new function part is successfully created +// Tests that a new function part is successfully created. func Test_newMessagePart(t *testing.T) { - gotmp := newMessagePart(32, 6, []byte{'t', 'e', 's', 't', 'i', 'n', 'g'}) - if !reflect.DeepEqual(gotmp, emp) { + goTmp := newMessagePart(32, 6, []byte{'t', 'e', 's', 't', 'i', 'n', 'g'}) + if !reflect.DeepEqual(goTmp, expectedMP) { t.Errorf("MessagePart received and MessagePart expected do not match."+ - "\nexpected: %#v\nreceived: %#v", emp, gotmp) + "\nexpected: %#v\nreceived: %#v", expectedMP, goTmp) } } -// Test that GetID returns the correct ID -func TestMessagePart_GetID(t *testing.T) { - if emp.GetID() != 32 { +// Test that messagePart.getID returns the correct ID. +func Test_messagePart_getID(t *testing.T) { + if expectedMP.getID() != 32 { t.Errorf("received and expected do not match."+ - "\n\tGot: %#v\n\tExpected: %#v", emp.GetID(), 32) + "\n\tGot: %#v\nexpected: %#v", expectedMP.getID(), 32) } } -// Test that GetPart returns the correct part number -func TestMessagePart_GetPart(t *testing.T) { - if emp.GetPart() != 6 { +// Test that getPart returns the correct part number +func TestMessagePart_getPart(t *testing.T) { + if expectedMP.getPart() != 6 { t.Errorf("received and expected do not match."+ - "\n\tGot: %#v\n\tExpected: %#v", emp.GetPart(), 6) + "\n\tGot: %#v\nexpected: %#v", expectedMP.getPart(), 6) } } -// Test that GetContents returns the message contests -func TestMessagePart_GetContents(t *testing.T) { - if bytes.Compare(emp.GetContents(), []byte{'t', 'e', 's', 't', 'i', 'n', 'g'}) != 0 { +// Test that getContents returns the message contests +func TestMessagePart_getContents(t *testing.T) { + if !bytes.Equal(expectedMP.getContents(), + []byte{'t', 'e', 's', 't', 'i', 'n', 'g'}) { t.Errorf("received and expected do not match."+ - "\n\tGot: %#v\n\tExpected: %#v", emp.GetContents(), 6) + "\n\tGot: %#v\nexpected: %#v", expectedMP.getContents(), 6) } } -// Test that GetSizedContents returns the message contests -func TestMessagePart_GetSizedContents(t *testing.T) { - if bytes.Compare(emp.GetSizedContents(), []byte{'t', 'e', 's', 't', 'i', 'n', 'g'}) != 0 { +// Test that getSizedContents returns the message contests +func TestMessagePart_getSizedContents(t *testing.T) { + if !bytes.Equal(expectedMP.getSizedContents(), + []byte{'t', 'e', 's', 't', 'i', 'n', 'g'}) { t.Errorf("received and expected do not match."+ - "\n\tGot: %#v\n\tExpected: %#v", emp.GetSizedContents(), 6) + "\n\tGot: %#v\nexpected: %#v", expectedMP.getSizedContents(), 6) } } -// Test that GetContentsLength returns the message length -func TestMessagePart_GetContentsLength(t *testing.T) { - if emp.GetContentsLength() != 7 { +// Test that getContentsLength returns the message length +func TestMessagePart_getContentsLength(t *testing.T) { + if expectedMP.getContentsLength() != 7 { t.Errorf("received and expected do not match."+ - "\n\tGot: %#v\n\tExpected: %#v", emp.GetContentsLength(), 7) + "\n\tGot: %#v\nexpected: %#v", expectedMP.getContentsLength(), 7) } } diff --git a/e2e/parse/partition.go b/e2e/parse/partition.go index 06e10bcae00615e356ad305d4fef8d5ae5a38e4e..0b6d5dde38c14a1f0b2d5156dbfb787b34792452 100644 --- a/e2e/parse/partition.go +++ b/e2e/parse/partition.go @@ -41,6 +41,7 @@ func NewPartitioner(messageSize int, kv *versioned.KV) Partitioner { partition: partition.NewOrLoad(kv), } p.maxSize = p.firstContentsSize + (MaxMessageParts-1)*p.partContentsSize + return p } @@ -63,12 +64,12 @@ func (p Partitioner) Partition(recipient *id.ID, mt catalog.MessageType, // Create the first message part var sub []byte sub, payload = splitPayload(payload, p.firstContentsSize) - parts[0] = newFirstMessagePart(mt, messageID, numParts, timestamp, sub).Bytes() + parts[0] = newFirstMessagePart(mt, messageID, numParts, timestamp, sub).bytes() // Create all subsequent message parts for i := uint8(1); i < numParts; i++ { sub, payload = splitPayload(payload, p.partContentsSize) - parts[i] = newMessagePart(messageID, i, sub).Bytes() + parts[i] = newMessagePart(messageID, i, sub).bytes() } return parts, fullMessageID, nil @@ -81,23 +82,23 @@ func (p Partitioner) HandlePartition(sender *id.ID, // If it is the first message in a set, then handle it as so // Decode the message structure - fm := FirstMessagePartFromBytes(contents) + fm := firstMessagePartFromBytes(contents) // Handle the message ID messageID := p.conversation.Get(sender). - ProcessReceivedMessageID(fm.GetID()) - storeageTimestamp := netTime.Now() - return p.partition.AddFirst(sender, fm.GetType(), - messageID, fm.GetPart(), fm.GetNumParts(), fm.GetTimestamp(), storeageTimestamp, - fm.GetSizedContents(), relationshipFingerprint) + ProcessReceivedMessageID(fm.getID()) + storageTimestamp := netTime.Now() + return p.partition.AddFirst(sender, fm.getType(), messageID, + fm.getPart(), fm.getNumParts(), fm.getTimestamp(), storageTimestamp, + fm.getSizedContents(), relationshipFingerprint) } else { // If it is a subsequent message part, handle it as so mp := messagePartFromBytes(contents) - messageID := p.conversation.Get(sender). - ProcessReceivedMessageID(mp.GetID()) + messageID := + p.conversation.Get(sender).ProcessReceivedMessageID(mp.getID()) - return p.partition.Add(sender, messageID, mp.GetPart(), - mp.GetSizedContents(), relationshipFingerprint) + return p.partition.Add(sender, messageID, mp.getPart(), + mp.getSizedContents(), relationshipFingerprint) } } diff --git a/e2e/parse/partition/multiPartMessage.go b/e2e/parse/partition/multiPartMessage.go index 581689984e6b561c4abbc78ca18632d89f5fada9..d067248d2a792fe846a9cb2e42c92ba001d5b3d5 100644 --- a/e2e/parse/partition/multiPartMessage.go +++ b/e2e/parse/partition/multiPartMessage.go @@ -9,7 +9,6 @@ package partition import ( "encoding/json" - "fmt" "github.com/pkg/errors" jww "github.com/spf13/jwalterweatherman" "gitlab.com/elixxir/client/catalog" @@ -19,21 +18,26 @@ import ( "gitlab.com/elixxir/ekv" "gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/netTime" + "strconv" "sync" "time" ) -const currentMultiPartMessageVersion = 0 -const messageKey = "MultiPart" +const ( + currentMultiPartMessageVersion = 0 + messageKey = "MultiPart" +) type multiPartMessage struct { Sender *id.ID MessageID uint64 NumParts uint8 PresentParts uint8 - // Timestamp of message from sender + + // SenderTimestamp is the timestamp of message from sender. SenderTimestamp time.Time - // Timestamp in which message was stored in RAM + + // StorageTimestamp is the timestamp in which message was stored in RAM StorageTimestamp time.Time MessageType catalog.MessageType @@ -46,7 +50,8 @@ type multiPartMessage struct { // creates a new one and saves it if one does not exist. func loadOrCreateMultiPartMessage(sender *id.ID, messageID uint64, kv *versioned.KV) *multiPartMessage { - kv = kv.Prefix(versioned.MakePartnerPrefix(sender)).Prefix(fmt.Sprintf("MessageID:%d", messageID)) + kv = kv.Prefix(versioned.MakePartnerPrefix(sender)). + Prefix(makeMultiPartMessagePrefix(messageID)) obj, err := kv.Get(messageKey, currentMultiPartMessageVersion) if err != nil { @@ -60,23 +65,24 @@ func loadOrCreateMultiPartMessage(sender *id.ID, messageID uint64, MessageType: 0, kv: kv, } + if err = mpm.save(); err != nil { - jww.FATAL.Panicf("Failed to save new multi part "+ - "message from %s messageID %v: %s", sender, messageID, err) + jww.FATAL.Panicf("Failed to save new multipart message from "+ + "%s messageID %d: %+v", sender, messageID, err) } + return mpm } - jww.FATAL.Panicf("Failed to open multi part "+ - "message from %s messageID %v: %s", sender, messageID, err) - } - mpm := &multiPartMessage{ - kv: kv, + jww.FATAL.Panicf("Failed to open multipart message from %s messageID "+ + "%d: %+v", sender, messageID, err) } + mpm := &multiPartMessage{kv: kv} + if err = json.Unmarshal(obj.Data, mpm); err != nil { - jww.FATAL.Panicf("Failed to unmarshal multi part "+ - "message from %s messageID %v: %s", sender, messageID, err) + jww.FATAL.Panicf("Failed to unmarshal multipart message from %s "+ + "messageID %d: %+v", sender, messageID, err) } return mpm @@ -85,7 +91,7 @@ func loadOrCreateMultiPartMessage(sender *id.ID, messageID uint64, func (mpm *multiPartMessage) save() error { data, err := json.Marshal(mpm) if err != nil { - return errors.Wrap(err, "Failed to unmarshal multi-part message") + return errors.Wrap(err, "Failed to unmarshal multipart message") } obj := versioned.Object{ @@ -103,22 +109,22 @@ func (mpm *multiPartMessage) Add(partNumber uint8, part []byte) { // Extend the list if needed if len(mpm.parts) <= int(partNumber) { - mpm.parts = append(mpm.parts, make([][]byte, int(partNumber)-len(mpm.parts)+1)...) + mpm.parts = append(mpm.parts, + make([][]byte, int(partNumber)-len(mpm.parts)+1)...) } mpm.parts[partNumber] = part mpm.PresentParts++ if err := savePart(mpm.kv, partNumber, part); err != nil { - jww.FATAL.Panicf("Failed to save multi part "+ - "message part %v from %s messageID %v: %s", partNumber, mpm.Sender, - mpm.MessageID, err) + jww.FATAL.Panicf("Failed to save multipart message part %d from %s "+ + "messageID %d: %+v", partNumber, mpm.Sender, mpm.MessageID, err) } if err := mpm.save(); err != nil { - jww.FATAL.Panicf("Failed to save multi part "+ - "message after adding part %v from %s messageID %v: %s", partNumber, - mpm.Sender, mpm.MessageID, err) + jww.FATAL.Panicf("Failed to save multipart message after adding part "+ + "%d from %s messageID %d: %+v", partNumber, mpm.Sender, + mpm.MessageID, err) } } @@ -129,7 +135,8 @@ func (mpm *multiPartMessage) AddFirst(mt catalog.MessageType, partNumber uint8, // Extend the list if needed if len(mpm.parts) <= int(partNumber) { - mpm.parts = append(mpm.parts, make([][]byte, int(partNumber)-len(mpm.parts)+1)...) + mpm.parts = append(mpm.parts, + make([][]byte, int(partNumber)-len(mpm.parts)+1)...) } mpm.NumParts = numParts @@ -140,20 +147,21 @@ func (mpm *multiPartMessage) AddFirst(mt catalog.MessageType, partNumber uint8, mpm.StorageTimestamp = storageTimestamp if err := savePart(mpm.kv, partNumber, part); err != nil { - jww.FATAL.Panicf("Failed to save multi part "+ - "message part %v from %s messageID %v: %s", partNumber, mpm.Sender, - mpm.MessageID, err) + jww.FATAL.Panicf("Failed to save multipart message part %d from %s "+ + "messageID %d: %+v", partNumber, mpm.Sender, mpm.MessageID, err) } if err := mpm.save(); err != nil { - jww.FATAL.Panicf("Failed to save multi part message after adding part "+ - "%v from %s messageID %v: %s", - partNumber, mpm.Sender, mpm.MessageID, err) + jww.FATAL.Panicf("Failed to save multipart message after adding part "+ + "%d from %s messageID %d: %+v", partNumber, mpm.Sender, + mpm.MessageID, err) } } -func (mpm *multiPartMessage) IsComplete(relationshipFingerprint []byte) (receive.Message, bool) { +func (mpm *multiPartMessage) IsComplete(relationshipFingerprint []byte) ( + receive.Message, bool) { mpm.mux.Lock() + if mpm.NumParts == 0 || mpm.NumParts != mpm.PresentParts { mpm.mux.Unlock() return receive.Message{}, false @@ -161,10 +169,11 @@ func (mpm *multiPartMessage) IsComplete(relationshipFingerprint []byte) (receive // Make sure the parts buffer is large enough to load all parts from disk if len(mpm.parts) < int(mpm.NumParts) { - mpm.parts = append(mpm.parts, make([][]byte, int(mpm.NumParts)-len(mpm.parts))...) + mpm.parts = append(mpm.parts, + make([][]byte, int(mpm.NumParts)-len(mpm.parts))...) } - // delete the multipart message + // Delete the multipart message lenMsg := mpm.delete() mpm.mux.Unlock() @@ -193,34 +202,41 @@ func (mpm *multiPartMessage) IsComplete(relationshipFingerprint []byte) (receive return m, true } -// deletes all parts from disk and RAM. Returns the message length for reconstruction +// delete removes all parts from disk and memory. Returns the message length for +// reconstruction. func (mpm *multiPartMessage) delete() int { - // Load all parts from disk, deleting files from disk as we go along var err error lenMsg := 0 + + // Load all parts from disk, deleting files from disk as we go along for i := uint8(0); i < mpm.NumParts; i++ { if mpm.parts[i] == nil { if mpm.parts[i], err = loadPart(mpm.kv, i); err != nil { - jww.FATAL.Panicf("Failed to load multi part "+ - "message part %v from %s messageID %v: %s", i, mpm.Sender, - mpm.MessageID, err) + jww.FATAL.Panicf("Failed to load multipart message part %d "+ + "from %s messageID %d: %+v", + i, mpm.Sender, mpm.MessageID, err) } + if err = deletePart(mpm.kv, i); err != nil { - jww.FATAL.Panicf("Failed to delete multi part "+ - "message part %v from %s messageID %v: %s", i, mpm.Sender, - mpm.MessageID, err) + jww.FATAL.Panicf("Failed to delete multipart message part "+ + "%d from %s messageID %d: %+v", + i, mpm.Sender, mpm.MessageID, err) } } + lenMsg += len(mpm.parts[i]) } - //key := makeMultiPartMessageKey(mpm.MessageID) - if err := mpm.kv.Delete(messageKey, - currentMultiPartMessageVersion); err != nil { - jww.FATAL.Panicf("Failed to delete multi part "+ - "message from %s messageID %v: %s", mpm.Sender, - mpm.MessageID, err) + // key := makeMultiPartMessageKey(mpm.MessageID) + err = mpm.kv.Delete(messageKey, currentMultiPartMessageVersion) + if err != nil { + jww.FATAL.Panicf("Failed to delete multipart message from %s "+ + "messageID %d: %+v", mpm.Sender, mpm.MessageID, err) } return lenMsg } + +func makeMultiPartMessagePrefix(messageID uint64) string { + return "MessageID:" + strconv.FormatUint(messageID, 10) +} diff --git a/e2e/parse/partition/multiPartMessage_test.go b/e2e/parse/partition/multiPartMessage_test.go index 040b3dfde1624c5124d73072018fad8fd6b5eef0..8bd4846625bde3c50dcdb30a2c6ab19e3e423bdb 100644 --- a/e2e/parse/partition/multiPartMessage_test.go +++ b/e2e/parse/partition/multiPartMessage_test.go @@ -10,7 +10,8 @@ package partition import ( "bytes" "encoding/json" - "gitlab.com/elixxir/client/interfaces/message" + "gitlab.com/elixxir/client/catalog" + "gitlab.com/elixxir/client/e2e/receive" "gitlab.com/elixxir/client/storage/versioned" "gitlab.com/elixxir/crypto/e2e" "gitlab.com/elixxir/ekv" @@ -22,7 +23,7 @@ import ( "time" ) -// Tests the creation part of loadOrCreateMultiPartMessage(). +// Tests the creation part of loadOrCreateMultiPartMessage. func Test_loadOrCreateMultiPartMessage_Create(t *testing.T) { // Set up expected test value prng := rand.New(rand.NewSource(netTime.Now().UnixNano())) @@ -37,28 +38,28 @@ func Test_loadOrCreateMultiPartMessage_Create(t *testing.T) { } expectedData, err := json.Marshal(expectedMpm) if err != nil { - t.Fatalf("Failed to marshal expected multiPartMessage: %v", err) + t.Errorf("Failed to JSON marshal expected multiPartMessage: %+v", err) } // Make new multiPartMessage - mpm := loadOrCreateMultiPartMessage(expectedMpm.Sender, - expectedMpm.MessageID, expectedMpm.kv) + mpm := loadOrCreateMultiPartMessage( + expectedMpm.Sender, expectedMpm.MessageID, expectedMpm.kv) CheckMultiPartMessages(expectedMpm, mpm, t) obj, err := mpm.kv.Get(messageKey, 0) if err != nil { - t.Errorf("get() failed to get multiPartMessage from key value store: %v", err) + t.Errorf("Get failed to get multiPartMessage storage: %+v", err) } if !bytes.Equal(expectedData, obj.Data) { - t.Errorf("loadOrCreateMultiPartMessage() did not save the "+ - "multiPartMessage correctly.\n\texpected: %+v\n\treceived: %+v", + t.Errorf("loadOrCreateMultiPartMessage did not save the "+ + "multiPartMessage correctly.\nexpected: %+v\nreceived: %+v", expectedData, obj.Data) } } -// Tests the loading part of loadOrCreateMultiPartMessage(). +// Tests the loading part of loadOrCreateMultiPartMessage. func Test_loadOrCreateMultiPartMessage_Load(t *testing.T) { // Set up expected test value prng := rand.New(rand.NewSource(netTime.Now().UnixNano())) @@ -73,52 +74,69 @@ func Test_loadOrCreateMultiPartMessage_Load(t *testing.T) { } err := expectedMpm.save() if err != nil { - t.Fatalf("Failed to save multiPartMessage: %v", err) + t.Errorf("Failed to JSON marshal expected multiPartMessage: %+v", err) } // Make new multiPartMessage - mpm := loadOrCreateMultiPartMessage(expectedMpm.Sender, - expectedMpm.MessageID, expectedMpm.kv) + mpm := loadOrCreateMultiPartMessage( + expectedMpm.Sender, expectedMpm.MessageID, expectedMpm.kv) CheckMultiPartMessages(expectedMpm, mpm, t) } -func CheckMultiPartMessages(expectedMpm *multiPartMessage, mpm *multiPartMessage, t *testing.T) { - // The kv differs because it has prefix called, so we compare fields individually +func CheckMultiPartMessages( + expectedMpm *multiPartMessage, mpm *multiPartMessage, t *testing.T) { + // The kv differs because it has prefix called, so we compare fields + // individually if expectedMpm.SenderTimestamp != mpm.SenderTimestamp { - t.Errorf("timestamps mismatch: expected %v, got %v", expectedMpm.SenderTimestamp, mpm.SenderTimestamp) + t.Errorf("Timestamps mismatch: expected %s, got %s", + expectedMpm.SenderTimestamp, mpm.SenderTimestamp) } + if expectedMpm.MessageType != mpm.MessageType { - t.Errorf("messagetype mismatch: expected %v, got %v", expectedMpm.MessageID, mpm.MessageID) + t.Errorf("Messagetype mismatch: expected %d, got %d", + expectedMpm.MessageType, mpm.MessageType) } + if expectedMpm.MessageID != mpm.MessageID { - t.Errorf("messageid mismatch: expected %v, got %v", expectedMpm.MessageID, mpm.MessageID) + t.Errorf("MessageID mismatch: expected %d, got %d", + expectedMpm.MessageID, mpm.MessageID) } + if expectedMpm.NumParts != mpm.NumParts { - t.Errorf("numparts mismatch: expected %v, got %v", expectedMpm.NumParts, mpm.NumParts) + t.Errorf("NumParts mismatch: expected %d, got %d", + expectedMpm.NumParts, mpm.NumParts) } + if expectedMpm.PresentParts != mpm.PresentParts { - t.Errorf("presentparts mismatch: expected %v, got %v", expectedMpm.PresentParts, mpm.PresentParts) + t.Errorf("PresentParts mismatch: expected %d, got %d", + expectedMpm.PresentParts, mpm.PresentParts) } + if !expectedMpm.Sender.Cmp(mpm.Sender) { - t.Errorf("sender mismatch: expected %v, got %v", expectedMpm.Sender, mpm.Sender) + t.Errorf("Sender mismatch: expected %s, got %s", + expectedMpm.Sender, mpm.Sender) } + if len(expectedMpm.parts) != len(mpm.parts) { - t.Error("parts different length") + t.Errorf("parts length mismatch: expected %d, got %d", + len(expectedMpm.parts), len(mpm.parts)) } + for i := range expectedMpm.parts { if !bytes.Equal(expectedMpm.parts[i], mpm.parts[i]) { - t.Errorf("parts differed at index %v", i) + t.Errorf("Parts differed at index %d.", i) } } } -// Tests happy path of multiPartMessage.AddFingerprint(). +// Tests happy path of multiPartMessage.AddFingerprint. func TestMultiPartMessage_Add(t *testing.T) { // Generate test values prng := rand.New(rand.NewSource(netTime.Now().UnixNano())) - mpm := loadOrCreateMultiPartMessage(id.NewIdFromUInt(prng.Uint64(), id.User, t), - prng.Uint64(), versioned.NewKV(make(ekv.Memstore))) + mpm := loadOrCreateMultiPartMessage( + id.NewIdFromUInt(prng.Uint64(), id.User, t), prng.Uint64(), + versioned.NewKV(make(ekv.Memstore))) partNums, parts := generateParts(prng, 0) for i := range partNums { @@ -128,12 +146,12 @@ func TestMultiPartMessage_Add(t *testing.T) { for i, p := range partNums { if !bytes.Equal(mpm.parts[p], parts[i]) { t.Errorf("Incorrect part at index %d (%d)."+ - "\n\texpected: %v\n\treceived: %v", p, i, parts[i], mpm.parts[p]) + "\nexpected: %v\nreceived: %v", p, i, parts[i], mpm.parts[p]) } } if len(partNums) != int(mpm.PresentParts) { - t.Errorf("Incorrect PresentParts.\n\texpected: %d\n\treceived: %d", + t.Errorf("Incorrect PresentParts.\nexpected: %d\nreceived: %d", len(partNums), int(mpm.PresentParts)) } @@ -144,17 +162,17 @@ func TestMultiPartMessage_Add(t *testing.T) { obj, err := mpm.kv.Get(messageKey, 0) if err != nil { - t.Errorf("get() failed to get multiPartMessage from key value store: %v", err) + t.Errorf("get failed to get multiPartMessage from key value store: %v", err) } if !bytes.Equal(expectedData, obj.Data) { - t.Errorf("loadOrCreateMultiPartMessage() did not save the "+ - "multiPartMessage correctly.\n\texpected: %+v\n\treceived: %+v", + t.Errorf("loadOrCreateMultiPartMessage did not save the "+ + "multiPartMessage correctly.\nexpected: %+v\nreceived: %+v", expectedData, obj.Data) } } -// Tests happy path of multiPartMessage.AddFirst(). +// Tests happy path of multiPartMessage.AddFirst. func TestMultiPartMessage_AddFirst(t *testing.T) { // Generate test values prng := rand.New(rand.NewSource(netTime.Now().UnixNano())) @@ -164,7 +182,7 @@ func TestMultiPartMessage_AddFirst(t *testing.T) { NumParts: uint8(prng.Uint32()), PresentParts: 1, SenderTimestamp: netTime.Now(), - MessageType: message.NoType, + MessageType: catalog.NoType, parts: make([][]byte, 3), kv: versioned.NewKV(make(ekv.Memstore)), } @@ -179,31 +197,33 @@ func TestMultiPartMessage_AddFirst(t *testing.T) { data, err := loadPart(npm.kv, 2) if err != nil { - t.Errorf("loadPart() produced an error: %v", err) + t.Errorf("loadPart produced an error: %v", err) } if !bytes.Equal(data, expectedMpm.parts[2]) { - t.Errorf("AddFirst() did not save multiPartMessage correctly."+ - "\n\texpected: %#v\n\treceived: %#v", expectedMpm.parts[2], data) + t.Errorf("AddFirst did not save multiPartMessage correctly."+ + "\nexpected: %#v\nreceived: %#v", expectedMpm.parts[2], data) } } -// Tests happy path of multiPartMessage.IsComplete(). +// Tests happy path of multiPartMessage.IsComplete. func TestMultiPartMessage_IsComplete(t *testing.T) { // Create multiPartMessage and fill with random parts prng := rand.New(rand.NewSource(netTime.Now().UnixNano())) mid := prng.Uint64() - mpm := loadOrCreateMultiPartMessage(id.NewIdFromUInt(prng.Uint64(), id.User, t), - mid, versioned.NewKV(make(ekv.Memstore))) + mpm := loadOrCreateMultiPartMessage( + id.NewIdFromUInt(prng.Uint64(), id.User, t), mid, + versioned.NewKV(make(ekv.Memstore))) partNums, parts := generateParts(prng, 75) - // Check that IsComplete() is false where there are no parts + // Check that IsComplete is false where there are no parts msg, complete := mpm.IsComplete([]byte{0}) if complete { - t.Error("IsComplete() returned true when NumParts == 0.") + t.Error("IsComplete returned true when NumParts == 0.") } - mpm.AddFirst(message.XxMessage, partNums[0], 75, netTime.Now(), netTime.Now(), parts[0]) + mpm.AddFirst(catalog.XxMessage, partNums[0], 75, netTime.Now(), + netTime.Now(), parts[0]) for i := range partNums { if i > 0 { mpm.Add(partNums[i], parts[i]) @@ -212,7 +232,7 @@ func TestMultiPartMessage_IsComplete(t *testing.T) { msg, complete = mpm.IsComplete([]byte{0}) if !complete { - t.Error("IsComplete() returned false when the message should be complete.") + t.Error("IsComplete returned false when the message should be complete.") } var payload []byte @@ -220,33 +240,32 @@ func TestMultiPartMessage_IsComplete(t *testing.T) { payload = append(payload, b...) } - expectedMsg := message.Receive{ + expectedMsg := receive.Message{ Payload: payload, MessageType: mpm.MessageType, Sender: mpm.Sender, Timestamp: msg.Timestamp, - Encryption: 0, ID: e2e.NewMessageID([]byte{0}, mid), } if !reflect.DeepEqual(expectedMsg, msg) { - t.Errorf("IsComplete() did not return the expected message."+ - "\n\texpected: %v\n\treceived: %v", expectedMsg, msg) + t.Errorf("IsComplete did not return the expected message."+ + "\nexpected: %v\nreceived: %v", expectedMsg, msg) } } -// Tests happy path of multiPartMessage.delete(). +// Tests happy path of multiPartMessage.delete. func TestMultiPartMessage_delete(t *testing.T) { prng := rand.New(rand.NewSource(netTime.Now().UnixNano())) kv := versioned.NewKV(make(ekv.Memstore)) - mpm := loadOrCreateMultiPartMessage(id.NewIdFromUInt(prng.Uint64(), id.User, t), - prng.Uint64(), kv) + mpm := loadOrCreateMultiPartMessage( + id.NewIdFromUInt(prng.Uint64(), id.User, t), prng.Uint64(), kv) mpm.delete() obj, err := kv.Get(messageKey, 0) if ekv.Exists(err) { - t.Errorf("delete() did not properly delete key %s."+ + t.Errorf("delete did not properly delete key %s."+ "\n\tobject received: %+v", messageKey, obj) } } diff --git a/e2e/parse/partition/part.go b/e2e/parse/partition/part.go index 9c24861ff83e732606341e1130bf041cc344c75a..25b0e2566abe28ef39d5605ab1cdc90d5f2542f9 100644 --- a/e2e/parse/partition/part.go +++ b/e2e/parse/partition/part.go @@ -8,9 +8,9 @@ package partition import ( - "fmt" "gitlab.com/elixxir/client/storage/versioned" "gitlab.com/xx_network/primitives/netTime" + "strconv" ) const currentMultiPartMessagePartVersion = 0 @@ -43,12 +43,7 @@ func deletePart(kv *versioned.KV, partNum uint8) error { return kv.Delete(key, currentMultiPartMessageVersion) } -// Make the key for a part +// makeMultiPartMessagePartKey makes the key for a part. func makeMultiPartMessagePartKey(part uint8) string { - return fmt.Sprintf("part:%v", part) + return "part:" + strconv.FormatUint(uint64(part), 10) } - -//func multiPartMessagePartPrefix(kv *versioned.KV, id uint64) *versioned.KV { -// return kv.prefix(keyMultiPartMessagePartPrefix). -// prefix(strconv.FormatUint(id, 32)) -//} diff --git a/e2e/parse/partition/part_test.go b/e2e/parse/partition/part_test.go index b421d6af31f6f06236805ac6b472ca310b669cce..936de7af865b4379ee4e72eb396bf926f1ff56bd 100644 --- a/e2e/parse/partition/part_test.go +++ b/e2e/parse/partition/part_test.go @@ -16,7 +16,7 @@ import ( "testing" ) -// Tests happy path of savePart(). +// Tests happy path of savePart. func Test_savePart(t *testing.T) { // Set up test values prng := rand.New(rand.NewSource(netTime.Now().UnixNano())) @@ -29,23 +29,23 @@ func Test_savePart(t *testing.T) { // Save part err := savePart(kv, partNum, part) if err != nil { - t.Errorf("savePart() produced an error: %v", err) + t.Errorf("savePart produced an error: %+v", err) } // Attempt to get from key value store obj, err := kv.Get(key, 0) if err != nil { - t.Errorf("get() produced an error: %v", err) + t.Errorf("Get produced an error: %+v", err) } // Check if the data is correct if !bytes.Equal(part, obj.Data) { t.Errorf("Part retrieved from key value store is not expected."+ - "\n\texpected: %v\n\treceived: %v", part, obj.Data) + "\nexpected: %v\nreceived: %v", part, obj.Data) } } -// Tests happy path of loadPart(). +// Tests happy path of loadPart. func Test_loadPart(t *testing.T) { // Set up test values prng := rand.New(rand.NewSource(netTime.Now().UnixNano())) @@ -56,25 +56,26 @@ func Test_loadPart(t *testing.T) { key := makeMultiPartMessagePartKey(partNum) // Save part to key value store - err := rootKv.Set(key, 0, &versioned.Object{Timestamp: netTime.Now(), Data: part}) + err := rootKv.Set( + key, 0, &versioned.Object{Timestamp: netTime.Now(), Data: part}) if err != nil { - t.Fatalf("Failed to set object: %v", err) + t.Errorf("Failed to set object: %+v", err) } // Load part from key value store data, err := loadPart(rootKv, partNum) if err != nil { - t.Errorf("loadPart() produced an error: %v", err) + t.Errorf("loadPart produced an error: %v+", err) } // Check if the data is correct if !bytes.Equal(part, data) { t.Errorf("Part loaded from key value store is not expected."+ - "\n\texpected: %v\n\treceived: %v", part, data) + "\nexpected: %v\nreceived: %v", part, data) } } -// Tests that loadPart() returns an error that an item was not found for unsaved +// Tests that loadPart returns an error that an item was not found for unsaved // key. func Test_loadPart_NotFoundError(t *testing.T) { // Set up test values @@ -87,17 +88,17 @@ func Test_loadPart_NotFoundError(t *testing.T) { // Load part from key value store data, err := loadPart(kv, partNum) if ekv.Exists(err) { - t.Errorf("loadPart() found an item for the key: %v", err) + t.Errorf("loadPart found an item for the key: %v", err) } // Check if the data is correct if !bytes.Equal([]byte{}, data) { t.Errorf("Part loaded from key value store is not expected."+ - "\n\texpected: %v\n\treceived: %v", []byte{}, data) + "\nexpected: %v\nreceived: %v", []byte{}, data) } } -// Test happy path of deletePart(). +// Test happy path of deletePart. func TestDeletePart(t *testing.T) { // Set up test values prng := rand.New(rand.NewSource(netTime.Now().UnixNano())) @@ -109,18 +110,18 @@ func TestDeletePart(t *testing.T) { // Save part err := savePart(kv, partNum, part) if err != nil { - t.Fatalf("savePart() produced an error: %v", err) + t.Errorf("savePart produced an error: %+v", err) } // Attempt to delete part err = deletePart(kv, partNum) if err != nil { - t.Errorf("deletePart() produced an error: %v", err) + t.Errorf("deletePart produced an error: %+v", err) } // Check if part was deleted _, err = loadPart(kv, partNum) if ekv.Exists(err) { - t.Errorf("part was found in key value store: %v", err) + t.Errorf("part was found in key value store: %+v", err) } } diff --git a/e2e/parse/partition/store.go b/e2e/parse/partition/store.go index 6ad8cb4fb48014ffb1d87c5ab0f2f81cabb1759f..7774e3187b90235d79f2ce4082024e8f4a547dec 100644 --- a/e2e/parse/partition/store.go +++ b/e2e/parse/partition/store.go @@ -49,16 +49,19 @@ func NewOrLoad(kv *versioned.KV) *Store { return partitionStore } -func (s *Store) AddFirst(partner *id.ID, mt catalog.MessageType, messageID uint64, - partNum, numParts uint8, senderTimestamp, storageTimestamp time.Time, - part []byte, relationshipFingerprint []byte) (receive.Message, bool) { +func (s *Store) AddFirst(partner *id.ID, mt catalog.MessageType, + messageID uint64, partNum, numParts uint8, senderTimestamp, + storageTimestamp time.Time, part []byte, relationshipFingerprint []byte) ( + receive.Message, bool) { mpm := s.load(partner, messageID) mpm.AddFirst(mt, partNum, numParts, senderTimestamp, storageTimestamp, part) msg, ok := mpm.IsComplete(relationshipFingerprint) + s.mux.Lock() defer s.mux.Unlock() + if !ok { s.activeParts[mpm] = true s.saveActiveParts() @@ -89,14 +92,15 @@ func (s *Store) Add(partner *id.ID, messageID uint64, partNum uint8, return msg, ok } -// Prune clear old messages on it's stored timestamp +// prune clears old messages on it's stored timestamp. func (s *Store) prune() { s.mux.Lock() defer s.mux.Unlock() + now := netTime.Now() - for mpm, _ := range s.activeParts { + for mpm := range s.activeParts { if now.Sub(mpm.StorageTimestamp) >= clearPartitionThreshold { - jww.INFO.Printf("prune partition: %v", mpm) + jww.INFO.Printf("Prune partition: %v", mpm) mpm.mux.Lock() mpm.delete() mpID := getMultiPartID(mpm.Sender, mpm.MessageID) @@ -121,6 +125,7 @@ func (s *Store) load(partner *id.ID, messageID uint64) *multiPartMessage { func (s *Store) saveActiveParts() { jww.INFO.Printf("Saving %d active partitions", len(s.activeParts)) + activeList := make([]*multiPartMessage, 0, len(s.activeParts)) for mpm := range s.activeParts { mpm.mux.Lock() @@ -131,7 +136,7 @@ func (s *Store) saveActiveParts() { data, err := json.Marshal(&activeList) if err != nil { - jww.FATAL.Panicf("Could not save active partitions: %v", err) + jww.FATAL.Panicf("Could not save active partitions: %+v", err) } obj := versioned.Object{ @@ -142,7 +147,7 @@ func (s *Store) saveActiveParts() { err = s.kv.Set(activePartitions, activePartitionVersion, &obj) if err != nil { - jww.FATAL.Panicf("Could not save active partitions: %v", err) + jww.FATAL.Panicf("Could not save active partitions: %+v", err) } } @@ -151,19 +156,19 @@ func (s *Store) loadActivePartitions() { defer s.mux.Unlock() obj, err := s.kv.Get(activePartitions, activePartitionVersion) if err != nil { - jww.DEBUG.Printf("Could not load active partitions: %v", err) + jww.DEBUG.Printf("Could not load active partitions: %+v", err) return } activeList := make([]*multiPartMessage, 0) - if err := json.Unmarshal(obj.Data, &activeList); err != nil { - jww.FATAL.Panicf("Failed to "+ - "unmarshal active partitions: %v", err) + if err = json.Unmarshal(obj.Data, &activeList); err != nil { + jww.FATAL.Panicf("Failed to unmarshal active partitions: %+v", err) } jww.INFO.Printf("loadActivePartitions found %d active", len(activeList)) for _, activeMpm := range activeList { - mpm := loadOrCreateMultiPartMessage(activeMpm.Sender, activeMpm.MessageID, s.kv) + mpm := loadOrCreateMultiPartMessage( + activeMpm.Sender, activeMpm.MessageID, s.kv) s.activeParts[mpm] = true } diff --git a/e2e/parse/partition/store_test.go b/e2e/parse/partition/store_test.go index 1fcd8c54a74cc5976c022d38b0d597826ed13991..9a86fb17b65997dc08be57aeb63800a3ba775f4c 100644 --- a/e2e/parse/partition/store_test.go +++ b/e2e/parse/partition/store_test.go @@ -9,7 +9,7 @@ package partition import ( "bytes" - "gitlab.com/elixxir/client/interfaces/message" + "gitlab.com/elixxir/client/catalog" "gitlab.com/elixxir/client/storage/versioned" "gitlab.com/elixxir/ekv" "gitlab.com/xx_network/primitives/id" @@ -18,8 +18,8 @@ import ( "testing" ) -// Tests happy path of New(). -func TestNew(t *testing.T) { +// Tests happy path of NewOrLoad. +func TestNewOrLoad(t *testing.T) { rootKv := versioned.NewKV(make(ekv.Memstore)) expectedStore := &Store{ multiParts: make(map[multiPartID]*multiPartMessage), @@ -30,69 +30,70 @@ func TestNew(t *testing.T) { store := NewOrLoad(rootKv) if !reflect.DeepEqual(expectedStore, store) { - t.Errorf("New() did not return the expecte Store."+ - "\n\texpected: %v\n\treceived: %v", expectedStore, store) + t.Errorf("New did not return the expecte Store."+ + "\nexpected: %v\nreceived: %v", expectedStore, store) } } -// Tests happy path of Store.AddFirst(). +// Tests happy path of Store.AddFirst. func TestStore_AddFirst(t *testing.T) { part := []byte("Test message.") s := NewOrLoad(versioned.NewKV(ekv.Memstore{})) msg, complete := s.AddFirst(id.NewIdFromString("User", id.User, t), - message.XxMessage, 5, 0, 1, netTime.Now(), netTime.Now(), part, + catalog.XxMessage, 5, 0, 1, netTime.Now(), netTime.Now(), part, []byte{0}) if !complete { - t.Errorf("AddFirst() returned that the message was not complete.") + t.Errorf("AddFirst returned that the message was not complete.") } if !bytes.Equal(part, msg.Payload) { - t.Errorf("AddFirst() returned message with invalid payload."+ - "\n\texpected: %v\n\treceived: %v", part, msg.Payload) + t.Errorf("AddFirst returned message with invalid payload."+ + "\nexpected: %v\nreceived: %v", part, msg.Payload) } } -// Tests happy path of Store.Add(). +// Tests happy path of Store.Add. func TestStore_Add(t *testing.T) { part1 := []byte("Test message.") part2 := []byte("Second Sentence.") - s := New(versioned.NewKV(ekv.Memstore{})) + s := NewOrLoad(versioned.NewKV(ekv.Memstore{})) msg, complete := s.AddFirst(id.NewIdFromString("User", id.User, t), - message.XxMessage, 5, 0, 2, netTime.Now(), netTime.Now(), part1, + catalog.XxMessage, 5, 0, 2, netTime.Now(), netTime.Now(), part1, []byte{0}) if complete { - t.Errorf("AddFirst() returned that the message was complete.") + t.Errorf("AddFirst returned that the message was complete.") } msg, complete = s.Add(id.NewIdFromString("User", id.User, t), 5, 1, part2, []byte{0}) if !complete { - t.Errorf("AddFirst() returned that the message was not complete.") + t.Errorf("AddFirst returned that the message was not complete.") } part := append(part1, part2...) if !bytes.Equal(part, msg.Payload) { - t.Errorf("AddFirst() returned message with invalid payload."+ - "\n\texpected: %v\n\treceived: %v", part, msg.Payload) + t.Errorf("AddFirst returned message with invalid payload."+ + "\nexpected: %v\nreceived: %v", part, msg.Payload) } } -// Unit test of prune -func TestStore_ClearMessages(t *testing.T) { - // Setup: Add 2 message to store: an old message past the threshold and a new message +// Unit test of Store.prune. +func TestStore_prune(t *testing.T) { + // Setup: Add 2 message to store: an old message past the threshold and a + // new message part1 := []byte("Test message.") part2 := []byte("Second Sentence.") - s := New(versioned.NewKV(ekv.Memstore{})) + s := NewOrLoad(versioned.NewKV(ekv.Memstore{})) partner1 := id.NewIdFromString("User", id.User, t) messageId1 := uint64(5) oldTimestamp := netTime.Now().Add(-2 * clearPartitionThreshold) s.AddFirst(partner1, - message.XxMessage, messageId1, 0, 2, netTime.Now(), + catalog.XxMessage, messageId1, 0, 2, netTime.Now(), oldTimestamp, part1, []byte{0}) s.Add(partner1, messageId1, 1, part2, []byte{0}) @@ -100,7 +101,7 @@ func TestStore_ClearMessages(t *testing.T) { partner2 := id.NewIdFromString("User1", id.User, t) messageId2 := uint64(6) newTimestamp := netTime.Now() - s.AddFirst(partner2, message.XxMessage, messageId2, 0, 2, netTime.Now(), + s.AddFirst(partner2, catalog.XxMessage, messageId2, 0, 2, netTime.Now(), newTimestamp, part1, []byte{0}) @@ -110,14 +111,12 @@ func TestStore_ClearMessages(t *testing.T) { // Check if old message cleared mpmId := getMultiPartID(partner1, messageId1) if _, ok := s.multiParts[mpmId]; ok { - t.Errorf("Prune error: " + - "Expected old message to be cleared out of store") + t.Errorf("Prune error: Expected old message to be cleared out of store") } // Check if new message remains mpmId2 := getMultiPartID(partner2, messageId2) if _, ok := s.multiParts[mpmId2]; !ok { - t.Errorf("Prune error: " + - "Expected new message to be remain in store") + t.Errorf("Prune error: expected new message to remain in store") } } diff --git a/e2e/parse/partition_test.go b/e2e/parse/partition_test.go index 4d1f64c82880ff38f311fb23a2a1e4513db13f86..3412924f7678db58038ba33e93724587bb602baf 100644 --- a/e2e/parse/partition_test.go +++ b/e2e/parse/partition_test.go @@ -8,8 +8,9 @@ package parse import ( - "gitlab.com/elixxir/client/interfaces/message" - "gitlab.com/elixxir/client/storage" + "gitlab.com/elixxir/client/catalog" + "gitlab.com/elixxir/client/storage/versioned" + "gitlab.com/elixxir/ekv" "gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/netTime" "testing" @@ -23,81 +24,69 @@ var ipsumTestStr = "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Cra // Test that NewPartitioner outputs a correctly made Partitioner func TestNewPartitioner(t *testing.T) { - storeSession := storage.InitTestingSession(t) - p := NewPartitioner(4096, storeSession) + p := NewPartitioner(4096, versioned.NewKV(make(ekv.Memstore))) if p.baseMessageSize != 4096 { - t.Errorf("baseMessageSize content mismatch"+ - "\n\texpected: %v\n\treceived: %v", - 4096, p.baseMessageSize) + t.Errorf("baseMessageSize content mismatch."+ + "\nexpected: %d\nreceived: %d", 4096, p.baseMessageSize) } if p.deltaFirstPart != firstHeaderLen-headerLen { - t.Errorf("deltaFirstPart content mismatch"+ - "\n\texpected: %v\n\treceived: %v", + t.Errorf("deltaFirstPart content mismatch.\nexpected: %d\nreceived: %d", firstHeaderLen-headerLen, p.deltaFirstPart) } if p.firstContentsSize != 4096-firstHeaderLen { - t.Errorf("firstContentsSize content mismatch"+ - "\n\texpected: %v\n\treceived: %v", + t.Errorf("firstContentsSize content mismatch."+ + "\nexpected: %d\nreceived: %d", 4096-firstHeaderLen, p.firstContentsSize) } if p.maxSize != (4096-firstHeaderLen)+(MaxMessageParts-1)*(4096-headerLen) { - t.Errorf("maxSize content mismatch"+ - "\n\texpected: %v\n\treceived: %v", - (4096-firstHeaderLen)+(MaxMessageParts-1)*(4096-headerLen), p.maxSize) + t.Errorf("maxSize content mismatch.\nexpected: %d\nreceived: %d", + (4096-firstHeaderLen)+(MaxMessageParts-1)*(4096-headerLen), + p.maxSize) } if p.partContentsSize != 4088 { - t.Errorf("partContentsSize content mismatch"+ - "\n\texpected: %v\n\treceived: %v", - 4088, p.partContentsSize) + t.Errorf("partContentsSize content mismatch."+ + "\nexpected: %d\nreceived: %d", 4088, p.partContentsSize) } } -// Test that no error is returned running Partition +// Test that no error is returned running Partitioner.Partition. func TestPartitioner_Partition(t *testing.T) { - storeSession := storage.InitTestingSession(t) - p := NewPartitioner(len(ipsumTestStr), storeSession) + p := NewPartitioner(len(ipsumTestStr), versioned.NewKV(make(ekv.Memstore))) - _, _, err := p.Partition(&id.DummyUser, message.XxMessage, - netTime.Now(), []byte(ipsumTestStr)) + _, _, err := p.Partition( + &id.DummyUser, catalog.XxMessage, netTime.Now(), []byte(ipsumTestStr)) if err != nil { t.Error(err) } } -// Test that HandlePartition can handle a message part +// Test that Partitioner.HandlePartition can handle a message part. func TestPartitioner_HandlePartition(t *testing.T) { - storeSession := storage.InitTestingSession(t) - p := NewPartitioner(len(ipsumTestStr), storeSession) - + p := NewPartitioner(len(ipsumTestStr), versioned.NewKV(make(ekv.Memstore))) m := newMessagePart(1107, 1, []byte(ipsumTestStr)) _, _ = p.HandlePartition( &id.DummyUser, - message.None, - m.Bytes(), - []byte{'t', 'e', 's', 't', 'i', 'n', 'g', - 's', 't', 'r', 'i', 'n', 'g'}, + m.bytes(), + []byte{'t', 'e', 's', 't', 'i', 'n', 'g', 's', 't', 'r', 'i', 'n', 'g'}, ) } // Test that HandlePartition can handle a first message part func TestPartitioner_HandleFirstPartition(t *testing.T) { - storeSession := storage.InitTestingSession(t) - p := NewPartitioner(len(ipsumTestStr), storeSession) - - m := newFirstMessagePart(message.XxMessage, 1107, 1, netTime.Now(), []byte(ipsumTestStr)) + p := NewPartitioner(len(ipsumTestStr), versioned.NewKV(make(ekv.Memstore))) + m := newFirstMessagePart( + catalog.XxMessage, 1107, 1, netTime.Now(), []byte(ipsumTestStr)) _, _ = p.HandlePartition( &id.DummyUser, - message.None, - m.Bytes(), - []byte{'t', 'e', 's', 't', 'i', 'n', 'g', - 's', 't', 'r', 'i', 'n', 'g'}, + m.bytes(), + []byte{'t', 'e', 's', 't', 'i', 'n', 'g', 's', 't', 'r', 'i', 'n', 'g'}, ) } diff --git a/network/gateway/hostPool.go b/network/gateway/hostPool.go index d605c7190fec5111fb1e7b47f8e5c9eea3ddb793..25dc7192ed9933dcf323190a526c772a1548e8a3 100644 --- a/network/gateway/hostPool.go +++ b/network/gateway/hostPool.go @@ -742,7 +742,7 @@ func (h *HostPool) addGateway(gwId *id.ID, ndfIndex int) { case h.addGatewayChan <- ng: default: jww.WARN.Printf( - "Unable to send AddGateway event for id %s", gwId) + "Unable to send AddGateway event for ID %s", gwId) } } else if host.GetAddress() != gw.Address { diff --git a/network/gateway/hostpool_test.go b/network/gateway/hostpool_test.go index 9c60be66153f6d19cc7fffc8e57fb8d032e460d5..f5a49cc4e9c0d052ac704d901976da4635069c3c 100644 --- a/network/gateway/hostpool_test.go +++ b/network/gateway/hostpool_test.go @@ -165,7 +165,7 @@ func TestHostPool_ManageHostPool(t *testing.T) { for _, ndfGw := range testNdf.Gateways { gwId, err := id.Unmarshal(ndfGw.ID) if err != nil { - t.Fatalf("Failed to marshal gateway id for %v", ndfGw) + t.Fatalf("Failed to marshal gateway ID for %v", ndfGw) } if _, ok := testPool.hostMap[*gwId]; ok { t.Errorf("Expected gateway %v to be removed from pool", gwId) @@ -765,7 +765,7 @@ func TestHostPool_updateConns_AddGateways(t *testing.T) { for _, ndfGw := range newGateways { gwId, err := id.Unmarshal(ndfGw.ID) if err != nil { - t.Errorf("Failed to marshal gateway id for %v", ndfGw) + t.Errorf("Failed to marshal gateway ID for %v", ndfGw) } _, ok := testPool.getSpecific(gwId) if !ok { @@ -840,7 +840,7 @@ func TestHostPool_updateConns_RemoveGateways(t *testing.T) { for _, ndfGw := range testNdf.Gateways { gwId, err := id.Unmarshal(ndfGw.ID) if err != nil { - t.Fatalf("Failed to marshal gateway id for %v", ndfGw) + t.Fatalf("Failed to marshal gateway ID for %v", ndfGw) } if _, ok := testPool.hostMap[*gwId]; ok { t.Errorf("Expected gateway %v to be removed from pool", gwId)