Skip to content
Snippets Groups Projects
Select Git revision
  • 684b11b3ab9666f9a78e61aa1b0781857f80bb73
  • release default protected
  • 11-22-implement-kv-interface-defined-in-collectiveversionedkvgo
  • hotfix/TestHostPool_UpdateNdf_AddFilter
  • XX-4719/announcementChannels
  • xx-4717/logLevel
  • jonah/noob-channel
  • master protected
  • XX-4707/tagDiskJson
  • xx-4698/notification-retry
  • hotfix/notifylockup
  • syncNodes
  • hotfix/localCB
  • XX-4677/NewChanManagerMobile
  • XX-4689/DmSync
  • duplicatePrefix
  • XX-4601/HavenInvites
  • finalizedUICallbacks
  • XX-4673/AdminKeySync
  • debugNotifID
  • anne/test
  • v4.7.5
  • v4.7.4
  • v4.7.3
  • v4.7.2
  • v4.7.1
  • v4.6.3
  • v4.6.1
  • v4.5.0
  • v4.4.4
  • v4.3.11
  • v4.3.8
  • v4.3.7
  • v4.3.6
  • v4.3.5
  • v4.2.0
  • v4.3.0
  • v4.3.4
  • v4.3.3
  • v4.3.2
  • v4.3.1
41 results

partition.go

  • user avatar
    684b11b3
    History
    partition.go 4.24 KiB
    ///////////////////////////////////////////////////////////////////////////////
    // Copyright © 2020 xx network SEZC                                          //
    //                                                                           //
    // Use of this source code is governed by a license that can be found in the //
    // LICENSE file                                                              //
    ///////////////////////////////////////////////////////////////////////////////
    
    package parse
    
    import (
    	"time"
    
    	"github.com/pkg/errors"
    	"gitlab.com/elixxir/client/catalog"
    	"gitlab.com/elixxir/client/e2e/parse/conversation"
    	"gitlab.com/elixxir/client/e2e/parse/partition"
    	"gitlab.com/elixxir/client/e2e/receive"
    	"gitlab.com/elixxir/client/storage/versioned"
    	"gitlab.com/xx_network/primitives/id"
    	"gitlab.com/xx_network/primitives/netTime"
    )
    
    // MaxMessageParts is the largest number of parts a single payload may be
    // split into. NewPartitioner uses it to derive the maximum payload size that
    // Partition accepts.
    const MaxMessageParts = 255
    
    // Partitioner splits outgoing payloads into message parts (Partition) and
    // hands received parts to partition storage (HandlePartition). Construct it
    // with NewPartitioner so that the size fields are consistent with each other.
    type Partitioner struct {
    	// baseMessageSize is the messageSize given to NewPartitioner; it is passed
    	// to the message part constructors when building each part.
    	baseMessageSize   int
    	// firstContentsSize is the number of payload bytes placed in the first
    	// part (messageSize minus firstHeaderLen).
    	firstContentsSize int
    	// partContentsSize is the number of payload bytes placed in every part
    	// after the first (messageSize minus headerLen).
    	partContentsSize  int
    	// deltaFirstPart is firstHeaderLen minus headerLen; Partition adds it to
    	// the payload length when computing the number of parts.
    	deltaFirstPart    int
    	// maxSize is the longest payload Partition accepts: one first part plus
    	// MaxMessageParts-1 subsequent parts.
    	maxSize           int
    	// conversation supplies send IDs for outgoing messages and processes the
    	// IDs of received parts, per sender/recipient.
    	conversation      *conversation.Store
    	// partition receives incoming parts via AddFirst and Add.
    	partition         *partition.Store
    }
    
    // NewPartitioner builds a Partitioner for messages that are messageSize bytes
    // long. The per-part content capacities are derived by subtracting the
    // first-part and subsequent-part header lengths from messageSize, and the
    // maximum payload size is what fits in one first part followed by
    // MaxMessageParts-1 subsequent parts. The conversation and partition stores
    // are created from (or loaded out of) kv.
    func NewPartitioner(kv *versioned.KV, messageSize int) *Partitioner {
    	firstSize := messageSize - firstHeaderLen
    	partSize := messageSize - headerLen

    	return &Partitioner{
    		baseMessageSize:   messageSize,
    		firstContentsSize: firstSize,
    		partContentsSize:  partSize,
    		deltaFirstPart:    firstHeaderLen - headerLen,
    		maxSize:           firstSize + (MaxMessageParts-1)*partSize,
    		conversation:      conversation.NewStore(kv),
    		partition:         partition.NewOrLoad(kv),
    	}
    }
    
    // Partition splits payload into an ordered list of serialized message parts
    // addressed to recipient. The first part is built with the message type, the
    // total part count, and the timestamp; every later part is built with its
    // part number. All parts share the send ID obtained from the conversation
    // with recipient.
    //
    // It returns the serialized parts and the full message ID, or an error if
    // payload is longer than the maximum partitionable size.
    func (p *Partitioner) Partition(recipient *id.ID, mt catalog.MessageType,
    	timestamp time.Time, payload []byte) ([][]byte, uint64, error) {

    	// Refuse anything that would need more than MaxMessageParts parts
    	if payloadLen := len(payload); payloadLen > p.maxSize {
    		return nil, 0, errors.Errorf("Payload is too long, max payload "+
    			"length is %d, received %d", p.maxSize, payloadLen)
    	}

    	// Take the next send ID from the conversation with the recipient
    	fullMessageID, messageID := p.conversation.Get(recipient).GetNextSendID()

    	// Ceiling division by the per-part capacity. Adding deltaFirstPart (the
    	// header length difference between first and later parts) makes the
    	// first part count as if it had the same capacity as the others.
    	padded := len(payload) + p.deltaFirstPart + p.partContentsSize - 1
    	numParts := uint8(padded / p.partContentsSize)
    	parts := make([][]byte, numParts)

    	// The first part has its own layout and content capacity
    	chunk, remaining := splitPayload(payload, p.firstContentsSize)
    	first := newFirstMessagePart(
    		mt, messageID, numParts, timestamp, chunk, p.baseMessageSize)
    	parts[0] = first.bytes()

    	// Every later part consumes the next slice of what is left
    	for partNum := uint8(1); partNum < numParts; partNum++ {
    		chunk, remaining = splitPayload(remaining, p.partContentsSize)
    		part := newMessagePart(messageID, partNum, chunk, p.baseMessageSize)
    		parts[partNum] = part.bytes()
    	}

    	return parts, fullMessageID, nil
    }
    
    // HandlePartition decodes one received message part from sender and adds it
    // to partition storage. First parts are stored through AddFirst together with
    // the message type, part count, sender timestamp, and the local time of
    // storage; all other parts are stored through Add. In both cases the part's
    // ID is first run through the conversation with sender.
    //
    // The returned message and flag are whatever the partition store returns
    // (presumably the reassembled message and whether it is complete; confirm
    // against partition.Store).
    func (p *Partitioner) HandlePartition(sender *id.ID,
    	contents []byte, relationshipFingerprint []byte) (receive.Message, bool) {

    	// Anything that is not a first part takes the short path
    	if !isFirst(contents) {
    		part := messagePartFromBytes(contents)
    		convo := p.conversation.Get(sender)
    		msgID := convo.ProcessReceivedMessageID(part.getID())

    		return p.partition.Add(sender, msgID, part.getPart(),
    			part.getSizedContents(), relationshipFingerprint)
    	}

    	// First part: decode the extended structure
    	first := firstMessagePartFromBytes(contents)

    	// Resolve the message ID within the conversation
    	convo := p.conversation.Get(sender)
    	msgID := convo.ProcessReceivedMessageID(first.getID())

    	// Record when the part was stored locally
    	storedAt := netTime.Now()

    	return p.partition.AddFirst(sender, first.getType(), msgID,
    		first.getPart(), first.getNumParts(), first.getTimestamp(), storedAt,
    		first.getSizedContents(), relationshipFingerprint)
    }
    
    // FirstPartitionSize returns the maximum number of payload bytes that fit in
    // the first part of a partitioned message, i.e. the message size minus the
    // first-part header length.
    func (p *Partitioner) FirstPartitionSize() uint {
    	return uint(p.firstContentsSize)
    }
    
    // SecondPartitionSize returns the maximum number of payload bytes that fit in
    // each part after the first, i.e. the message size minus the regular part
    // header length.
    func (p *Partitioner) SecondPartitionSize() uint {
    	return uint(p.partContentsSize)
    }
    
    // PayloadSize returns the maximum payload size, in bytes, that Partition
    // accepts for a single partitionable E2E message.
    func (p *Partitioner) PayloadSize() uint {
    	return uint(p.maxSize)
    }
    
    // splitPayload cuts the first length bytes off payload and returns them
    // together with the remaining bytes. If payload holds fewer than length
    // bytes, the whole payload is returned as the first slice and the remainder
    // is empty (nil). Both returned slices alias payload's backing array.
    func splitPayload(payload []byte, length int) ([]byte, []byte) {
    	if len(payload) < length {
    		// Everything fits in this chunk, so nothing is left over. Handing the
    		// payload back as the remainder would make a caller that keeps
    		// splitting emit the same bytes a second time.
    		return payload, nil
    	}
    	return payload[:length], payload[length:]
    }
    
    // isFirst reports whether payload is the first part of a message, which is
    // indicated by a zero byte at offset idLen (presumably the part-number field
    // that follows the message ID; confirm against the part layout). payload
    // must be longer than idLen bytes, otherwise the index panics.
    func isFirst(payload []byte) bool {
    	marker := payload[idLen]
    	return marker == 0
    }