Skip to content
Snippets Groups Projects
Commit ebffad81 authored by Benjamin Wenger's avatar Benjamin Wenger
Browse files

finished message partioning

parent 32e83ff9
No related branches found
No related tags found
No related merge requests found
...@@ -19,39 +19,57 @@ type firstMessagePart struct { ...@@ -19,39 +19,57 @@ type firstMessagePart struct {
Timestamp []byte Timestamp []byte
} }
func newFirstMessagePart(mt message.Type, id uint32, part uint8, numParts uint8, //creates a new first message part for the passed in contents. Does no length checks
func newFirstMessagePart(mt message.Type, id uint32, numParts uint8,
timestamp time.Time, contents []byte) firstMessagePart { timestamp time.Time, contents []byte) firstMessagePart {
//create the message structure
data := make([]byte, len(contents)+firstHeaderLen) data := make([]byte, len(contents)+firstHeaderLen)
m := FirstMessagePartFromBytes(data) m := FirstMessagePartFromBytes(data)
//Put the message type in the message
binary.BigEndian.PutUint32(m.Type, uint32(mt)) binary.BigEndian.PutUint32(m.Type, uint32(mt))
//Add the message ID
binary.BigEndian.PutUint32(m.Id, id) binary.BigEndian.PutUint32(m.Id, id)
m.Part[0] = part
// Add the part number to the message, its always zero because this is the
// first part. Because the default is zero this step could be skipped, but\
// keep it in the code for clarity
m.Part[0] = 0
// Add the number of parts to the message
m.NumParts[0] = numParts m.NumParts[0] = numParts
//Serialize and add the timestamp to the payload
timestampBytes, err := timestamp.MarshalBinary() timestampBytes, err := timestamp.MarshalBinary()
if err != nil { if err != nil {
jww.FATAL.Panicf("Failed to create firstMessagePart: %s", err.Error()) jww.FATAL.Panicf("Failed to create firstMessagePart: %s", err.Error())
} }
copy(m.Timestamp, timestampBytes) copy(m.Timestamp, timestampBytes)
//set the contents length
binary.BigEndian.PutUint16(m.Len, uint16(len(contents)))
//add the contents to the payload
copy(m.Contents[:len(contents)], contents) copy(m.Contents[:len(contents)], contents)
//set the first bit to 1 to denote this is not a raw message
data[0] |= 0b10000000
return m return m
} }
// Builds a first message part mapped to the passed in data slice. Mapped by
// reference, a copy is not made.
func FirstMessagePartFromBytes(data []byte) firstMessagePart { func FirstMessagePartFromBytes(data []byte) firstMessagePart {
m := firstMessagePart{ m := firstMessagePart{
messagePart: messagePart{ messagePart: messagePart{
Data: data, Data: data,
Id: data[:idLen], Id: data[:idLen],
Part: data[idLen : idLen+partLen], Part: data[idLen : idLen+partLen],
Contents: data[idLen+partLen+numPartsLen+typeLen+timestampLen:], Len: data[idLen+partLen : idLen+partLen+lenLen],
Contents: data[idLen+partLen+numPartsLen+typeLen+timestampLen+lenLen:],
}, },
NumParts: data[idLen+partLen : idLen+partLen+numPartsLen], NumParts: data[idLen+partLen+lenLen : idLen+partLen+numPartsLen+lenLen],
Type: data[idLen+partLen+numPartsLen : idLen+partLen+numPartsLen+typeLen], Type: data[idLen+partLen+numPartsLen+lenLen : idLen+partLen+numPartsLen+typeLen+lenLen],
Timestamp: data[idLen+partLen+numPartsLen+typeLen : idLen+partLen+numPartsLen+typeLen+timestampLen], Timestamp: data[idLen+partLen+numPartsLen+typeLen+lenLen : idLen+partLen+numPartsLen+typeLen+timestampLen+lenLen],
} }
return m return m
} }
......
...@@ -6,32 +6,46 @@ import ( ...@@ -6,32 +6,46 @@ import (
const idLen = 4 const idLen = 4
const partLen = 1 const partLen = 1
const headerLen = idLen + partLen const lenLen = 2
const headerLen = idLen + partLen + lenLen
type messagePart struct { type messagePart struct {
Data []byte Data []byte
Id []byte Id []byte
Part []byte Part []byte
Len []byte
Contents []byte Contents []byte
} }
//creates a new message part for the passed in contents. Does no length checks
func newMessagePart(id uint32, part uint8, contents []byte) messagePart { func newMessagePart(id uint32, part uint8, contents []byte) messagePart {
//create the message structure
data := make([]byte, len(contents)+headerLen) data := make([]byte, len(contents)+headerLen)
m := MessagePartFromBytes(data) m := MessagePartFromBytes(data)
//add the message ID to the message
binary.BigEndian.PutUint32(m.Id, id) binary.BigEndian.PutUint32(m.Id, id)
//set the message part number
m.Part[0] = part m.Part[0] = part
//set the contents length
binary.BigEndian.PutUint16(m.Len, uint16(len(contents)))
//copy the contents into the message
copy(m.Contents[:len(contents)], contents) copy(m.Contents[:len(contents)], contents)
//set the first bit to 1 to denote this is not a raw message
data[0] |= 0b10000000
return m return m
} }
// Builds a Message part mapped to the passed in data slice. Mapped by
// reference, a copy is not made.
func MessagePartFromBytes(data []byte) messagePart { func MessagePartFromBytes(data []byte) messagePart {
m := messagePart{ m := messagePart{
Data: data, Data: data,
Id: data[:idLen], Id: data[:idLen],
Part: data[idLen : idLen+partLen], Part: data[idLen : idLen+partLen],
Contents: data[idLen+partLen+numPartsLen+typeLen:], Len: data[idLen+partLen : idLen+partLen+lenLen],
Contents: data[idLen+partLen+lenLen:],
} }
return m return m
} }
...@@ -48,10 +62,16 @@ func (m messagePart) GetContents() []byte { ...@@ -48,10 +62,16 @@ func (m messagePart) GetContents() []byte {
return m.Contents return m.Contents
} }
func (m messagePart) GetSizedContents() []byte {
size := m.GetContentsLength()
return m.Contents[:size]
}
func (m messagePart) GetContentsLength() int {
return int(binary.BigEndian.Uint16(m.Len))
}
func (m messagePart) Bytes() []byte { func (m messagePart) Bytes() []byte {
return m.Data return m.Data
} }
func (m messagePart) IsRaw() bool {
return isRaw(m.Data[0])
}
...@@ -40,15 +40,20 @@ func (p Partitioner) Partition(recipient *id.ID, mt message.Type, ...@@ -40,15 +40,20 @@ func (p Partitioner) Partition(recipient *id.ID, mt message.Type,
"length is %v, received %v", p.maxSize, len(payload)) "length is %v, received %v", p.maxSize, len(payload))
} }
//Get the ID of the sent message
_, messageID := p.ctx.Session.Conversations().Get(recipient).GetNextSendID() _, messageID := p.ctx.Session.Conversations().Get(recipient).GetNextSendID()
// get the number of parts of the message. This equates to just a linear
// equation
numParts := uint8((len(payload) + p.deltaFirstPart + p.partContentsSize - 1) / p.partContentsSize) numParts := uint8((len(payload) + p.deltaFirstPart + p.partContentsSize - 1) / p.partContentsSize)
parts := make([][]byte, numParts) parts := make([][]byte, numParts)
//Create the first message part
var sub []byte var sub []byte
sub, payload = splitPayload(payload, p.firstContentsSize) sub, payload = splitPayload(payload, p.firstContentsSize)
parts[0] = newFirstMessagePart(mt, messageID, 0, numParts, timestamp, sub).Bytes() parts[0] = newFirstMessagePart(mt, messageID, numParts, timestamp, sub).Bytes()
//create all subsiquent message parts
for i := uint8(1); i < numParts; i++ { for i := uint8(1); i < numParts; i++ {
sub, payload = splitPayload(payload, p.partContentsSize) sub, payload = splitPayload(payload, p.partContentsSize)
parts[i] = newMessagePart(messageID, i, sub).Bytes() parts[i] = newMessagePart(messageID, i, sub).Bytes()
...@@ -58,19 +63,11 @@ func (p Partitioner) Partition(recipient *id.ID, mt message.Type, ...@@ -58,19 +63,11 @@ func (p Partitioner) Partition(recipient *id.ID, mt message.Type,
} }
func (p Partitioner) HandlePartition(sender *id.ID, e message.EncryptionType, func (p Partitioner) HandlePartition(sender *id.ID, e message.EncryptionType,
contents []byte) (message.Receive, bool, error) { contents []byte) (message.Receive, bool) {
//if it is a raw message, there is nothing to do
if isRaw(contents) {
return message.Receive{
Payload: contents,
MessageType: message.Raw,
Sender: sender,
Timestamp: time.Time{},
Encryption: e,
}, true, nil
}
//If it is the first message in a set, handle it as so
if isFirst(contents) { if isFirst(contents) {
//decode the message structure
fm := FirstMessagePartFromBytes(contents) fm := FirstMessagePartFromBytes(contents)
timestamp, err := fm.GetTimestamp() timestamp, err := fm.GetTimestamp()
if err != nil { if err != nil {
...@@ -79,29 +76,22 @@ func (p Partitioner) HandlePartition(sender *id.ID, e message.EncryptionType, ...@@ -79,29 +76,22 @@ func (p Partitioner) HandlePartition(sender *id.ID, e message.EncryptionType,
fm.Timestamp, err) fm.Timestamp, err)
} }
//Handle the message ID
messageID := p.ctx.Session.Conversations().Get(sender). messageID := p.ctx.Session.Conversations().Get(sender).
ProcessReceivedMessageID(fm.GetID()) ProcessReceivedMessageID(fm.GetID())
m, ok := p.ctx.Session.Partition().AddFirst(sender, fm.GetType(), //Return the
return p.ctx.Session.Partition().AddFirst(sender, fm.GetType(),
messageID, fm.GetPart(), fm.GetNumParts(), timestamp, messageID, fm.GetPart(), fm.GetNumParts(), timestamp,
fm.GetContents()) fm.GetSizedContents())
if ok { //If it is a subsiquent message part, handle it as so
return m, true, nil
} else {
return message.Receive{}, false, nil
}
} else { } else {
mp := MessagePartFromBytes(contents) mp := MessagePartFromBytes(contents)
messageID := p.ctx.Session.Conversations().Get(sender). messageID := p.ctx.Session.Conversations().Get(sender).
ProcessReceivedMessageID(mp.GetID()) ProcessReceivedMessageID(mp.GetID())
m, ok := p.ctx.Session.Partition().Add(sender, messageID, mp.GetPart(), return p.ctx.Session.Partition().Add(sender, messageID, mp.GetPart(),
mp.GetContents()) mp.GetSizedContents())
if ok {
return m, true, nil
} else {
return message.Receive{}, false, nil
}
} }
} }
...@@ -112,10 +102,6 @@ func splitPayload(payload []byte, length int) ([]byte, []byte) { ...@@ -112,10 +102,6 @@ func splitPayload(payload []byte, length int) ([]byte, []byte) {
return payload[:length], payload[length:] return payload[:length], payload[length:]
} }
func isRaw(payload []byte) bool {
return payload[0]&0b10000000 == 0
}
func isFirst(payload []byte) bool { func isFirst(payload []byte) bool {
return payload[idLen] == 0 return payload[idLen] == 0
} }
...@@ -14,7 +14,7 @@ import ( ...@@ -14,7 +14,7 @@ import (
const conversationKeyPrefix = "conversation" const conversationKeyPrefix = "conversation"
const currentConversationVersion = 0 const currentConversationVersion = 0
const maxTruncatedID = math.MaxUint32 / 2 const maxTruncatedID = math.MaxUint32
const bottomRegion = maxTruncatedID / 4 const bottomRegion = maxTruncatedID / 4
const topRegion = bottomRegion * 3 const topRegion = bottomRegion * 3
...@@ -82,7 +82,7 @@ func (c *Conversation) ProcessReceivedMessageID(mid uint32) uint64 { ...@@ -82,7 +82,7 @@ func (c *Conversation) ProcessReceivedMessageID(mid uint32) uint64 {
high = c.numReceivedRevolutions - 1 high = c.numReceivedRevolutions - 1
} }
return (uint64(high) << 31) | uint64(mid) return (uint64(high) << 32) | uint64(mid)
} }
func cmp(a, b uint32) int { func cmp(a, b uint32) int {
......
...@@ -74,7 +74,6 @@ func (k *Key) Decrypt(msg format.Message) (format.Message, error) { ...@@ -74,7 +74,6 @@ func (k *Key) Decrypt(msg format.Message) (format.Message, error) {
return format.Message{}, errors.New("HMAC verification failed for E2E message") return format.Message{}, errors.New("HMAC verification failed for E2E message")
} }
// Decrypt the payload // Decrypt the payload
decryptedPayload := e2eCrypto.Crypt(key, fp, msg.GetContents()) decryptedPayload := e2eCrypto.Crypt(key, fp, msg.GetContents())
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment