Skip to content
Snippets Groups Projects
Commit 536d0ab4 authored by Richard T. Carback III's avatar Richard T. Carback III
Browse files

Merge branch 'release' into 'XX-3566_const_time_comparisons'

# Conflicts:
#   cmix/follow.go
parents 0fa8acc0 121dc3f3
No related branches found
No related tags found
2 merge requests!510Release,!434constant time byte slice comparisons
Showing
with 565 additions and 365 deletions
...@@ -17,6 +17,7 @@ stages: ...@@ -17,6 +17,7 @@ stages:
- test - test
- build - build
- trigger_integration - trigger_integration
- version_check
test: test:
stage: test stage: test
...@@ -72,8 +73,19 @@ tag: ...@@ -72,8 +73,19 @@ tag:
image: $DOCKER_IMAGE image: $DOCKER_IMAGE
script: script:
- git remote add origin_tags git@$GITLAB_SERVER:elixxir/client.git || true - git remote add origin_tags git@$GITLAB_SERVER:elixxir/client.git || true
- git tag $(release/client.linux64 version | grep "Elixxir Client v"| cut -d ' ' -f3) -f - git tag $(release/client.linux64 version | grep "Elixxir Client v"| cut -d ' ' -f3)
- git push origin_tags -f --tags - git push origin_tags --tags
version_check:
stage: version_check
only:
- master
- release
image: $DOCKER_IMAGE
script:
- GITTAG=$(git describe --tags)
- CODEVERS=$(release/client.darwin64 version | grep "Elixxir Client v"| cut -d ' ' -f3)
- if [[ $GITTAG != $CODEVERS ]]; then echo "VERSION NUMBER BAD $GITTAG != $CODEVER"; exit -1; fi
bindings-ios: bindings-ios:
stage: build stage: build
......
...@@ -462,19 +462,19 @@ use the correct versions as listed below. ...@@ -462,19 +462,19 @@ use the correct versions as listed below.
| | Version | Download | Documentation | | | Version | Download | Documentation |
|----------------------|--------:|---------------------------------------------------------------------|-------------------------------------------------------------------------| |----------------------|--------:|---------------------------------------------------------------------|-------------------------------------------------------------------------|
| `protoc` | 3.15.6 | https://github.com/protocolbuffers/protobuf/releases/tag/v3.15.6 | https://developers.google.com/protocol-buffers/docs/gotutorial | | `protoc` | 3.21.9 | https://github.com/protocolbuffers/protobuf/releases/tag/v3.21.9 | https://developers.google.com/protocol-buffers/docs/gotutorial |
| `protoc-gen-go` | 1.27.1 | https://github.com/protocolbuffers/protobuf-go/releases/tag/v1.27.1 | https://pkg.go.dev/google.golang.org/protobuf@v1.27.1/cmd/protoc-gen-go | | `protoc-gen-go` | 1.28.1 | https://github.com/protocolbuffers/protobuf-go/releases/tag/v1.28.1 | https://pkg.go.dev/google.golang.org/protobuf@v1.28.1/cmd/protoc-gen-go |
| `protoc-gen-go-grpc` | 1.2.0 | https://github.com/grpc/grpc-go/releases/tag/v1.2.0 | https://pkg.go.dev/google.golang.org/grpc/cmd/protoc-gen-go-grpc | | `protoc-gen-go-grpc` | 1.2.0 | https://github.com/grpc/grpc-go/releases/tag/v1.2.0 | https://pkg.go.dev/google.golang.org/grpc/cmd/protoc-gen-go-grpc |
1. Download the correct release of `protoc` from the 1. Download the correct release of `protoc` from the
[release page](https://github.com/protocolbuffers/protobuf/releases) or use [release page](https://github.com/protocolbuffers/protobuf/releases) or use
the link from the table above to get the download for your OS. the link from the table above to get the download for your OS.
wget https://github.com/protocolbuffers/protobuf/releases/download/v3.15.6/protoc-3.15.6-linux-x86_64.zip wget https://github.com/protocolbuffers/protobuf/releases/download/v3.21.9/protoc-3.21.9-linux-x86_64.zip
2. Extract the files to a folder, such as `$HOME/.local`. 2. Extract the files to a folder, such as `$HOME/.local`.
unzip protoc-3.15.6-linux-x86_64.zip -d $HOME/.local unzip protoc-3.21.9-linux-x86_64.zip -d $HOME/.local
3. Add the selected directory to your environment’s `PATH` variable, make sure 3. Add the selected directory to your environment’s `PATH` variable, make sure
to include it in your `.profile` or `.bashrc` file. Also, include your go bin to include it in your `.profile` or `.bashrc` file. Also, include your go bin
...@@ -492,18 +492,18 @@ use the correct versions as listed below. ...@@ -492,18 +492,18 @@ use the correct versions as listed below.
Which prints the current version Which prints the current version
libprotoc 3.15.6 libprotoc 3.21.9
5. Next, download `protoc-gen-go` and `protoc-gen-go-grpc` using the version 5. Next, download `protoc-gen-go` and `protoc-gen-go-grpc` using the version
found in the table above. found in the table above.
go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.27 go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.28
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.2 go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.2
6. Check that `protoc-gen-go` is installed with the correct version. 6. Check that `protoc-gen-go` is installed with the correct version.
protoc-gen-go --version protoc-gen-go --version
protoc-gen-go v1.27.1 protoc-gen-go v1.28.1
7. Check that `protoc-gen-go-grpc` is installed with the correct version. 7. Check that `protoc-gen-go-grpc` is installed with the correct version.
......
...@@ -906,6 +906,9 @@ func (cm *ChannelsManager) SendAdminGeneric(adminPrivateKey, ...@@ -906,6 +906,9 @@ func (cm *ChannelsManager) SendAdminGeneric(adminPrivateKey,
chanMsgId, rnd, ephId, err := cm.api.SendAdminGeneric(rsaPrivKey, chanMsgId, rnd, ephId, err := cm.api.SendAdminGeneric(rsaPrivKey,
chanId, msgTy, message, time.Duration(leaseTimeMS), chanId, msgTy, message, time.Duration(leaseTimeMS),
params.CMIX) params.CMIX)
if err != nil {
return nil, err
}
// Construct send report // Construct send report
return constructChannelSendReport(chanMsgId, rnd.ID, ephId) return constructChannelSendReport(chanMsgId, rnd.ID, ephId)
......
...@@ -8,7 +8,9 @@ ...@@ -8,7 +8,9 @@
package bindings package bindings
import ( import (
"encoding/base64"
"encoding/json" "encoding/json"
"github.com/pkg/errors"
jww "github.com/spf13/jwalterweatherman" jww "github.com/spf13/jwalterweatherman"
"time" "time"
...@@ -23,62 +25,77 @@ import ( ...@@ -23,62 +25,77 @@ import (
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
// FileTransfer object is a bindings-layer struct which wraps a // FileTransfer object is a bindings-layer struct which wraps a
// fileTransfer.FileTransfer interface. // [fileTransfer.FileTransfer] interface.
type FileTransfer struct { type FileTransfer struct {
w *e2e.Wrapper w *e2e.Wrapper
} }
// ReceivedFile is a public struct that contains the metadata of a new file // ReceivedFile contains the metadata of a new received file transfer. It is
// transfer. // received from a sender on a new file transfer. It is returned by
// [ReceiveFileCallback.Callback].
// //
// Example JSON: // Example JSON:
// { // {
// "TransferID":"B4Z9cwU18beRoGbk5xBjbcd5Ryi9ZUFA2UBvi8FOHWo=", // "TransferID": "0U+QY1nMOUzQGxGpqZyxDw8Cd6+qm8t870CzLtVoUM8=",
// "SenderID":"emV6aW1hAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAD", // "SenderID": "UL3+S8XdJHAfUtCUm7iZMxW8orR8Nd5JM9Ky7/5jds8D",
// "Preview":"aXQncyBtZSBhIHByZXZpZXc=", // "Preview": "aXQNcyBtZSBhIHByZXZpZXc=",
// "Name": "testfile.txt", // "Name": "testfile.txt",
// "Type": "text file", // "Type": "text file",
// "Size": 2048 // "Size": 2048
// } // }
type ReceivedFile struct { type ReceivedFile struct {
TransferID []byte // ID of the file transfer TransferID *ftCrypto.TransferID // ID of the file transfer
SenderID []byte // ID of the file sender SenderID *id.ID // ID of the file sender
Preview []byte // A preview of the file Preview []byte // A preview of the file
Name string // Name of the file Name string // Name of the file
Type string // String that indicates type of file Type string // String that indicates type of file
Size int // The size of the file, in bytes Size int // The size of the file, in bytes
} }
// FileSend is a public struct that contains the file contents and its name, // FileSend contains the file and its metadata to send. This structure is JSON
// type, and preview. // marshalled and passed as the payload to [FileTransfer.Send].
//
// Example JSON:
// { // {
// "Name": "testfile.txt", // "Name": "testfile.txt",
// "Type": "text file", // "Type": "text file",
// "Preview":"aXQncyBtZSBhIHByZXZpZXc=", // "Preview": "RMlsZSBwCmV2aWV3Lg==",
// "Contents":"VGhpcyBpcyB0aGUgZnVsbCBjb250ZW50cyBvZiB0aGUgZmlsZSBpbiBieXRlcw==" // "Contents": "RMlsZSBjb250ZW50cy4="
// } // }
type FileSend struct { type FileSend struct {
Name string // Name of the file // Name is the human-readable file name. Get max length from
Type string // String that indicates type of file // [FileTransfer.MaxFileNameLen].
Preview []byte // A preview of the file Name string
Contents []byte // Full contents of the file
// Type is a shorthand that identifies the type of file. Get max length from
// [FileTransfer.MaxFileTypeLen].
Type string
// Preview of the file data (e.g. a thumbnail). Get max length from
// [FileTransfer.MaxPreviewSize].
Preview []byte
// Contents is the full file contents. Get max length from
// [FileTransfer.MaxFileSize].
Contents []byte
} }
// Progress is a public struct that represents the progress of an in-progress // Progress contains the progress information of a transfer. It is returned by
// file transfer. // [FileTransferSentProgressCallback.Callback] and
// [FileTransferReceiveProgressCallback.Callback].
// //
// Example JSON: // Example JSON:
// { // {
// "TransferID": "RyJcMqtI3IIM1+YMxRwCcFiOX6AGuIzS+vQaPnqXVT8=",
// "Completed": false, // "Completed": false,
// "Transmitted": 128, // "Transmitted": 128,
// "Total":2048, // "Total": 2048
// "Err":null
// } // }
type Progress struct { type Progress struct {
TransferID *ftCrypto.TransferID // Transfer ID
Completed bool // Status of transfer (true if done) Completed bool // Status of transfer (true if done)
Transmitted int // Number of file parts sent/received Transmitted int // Number of file parts sent/received
Total int // Total number of file parts Total int // Total number of file parts
Err error // Error status (if any)
} }
// ReceiveFileCallback is a bindings-layer interface that contains a callback // ReceiveFileCallback is a bindings-layer interface that contains a callback
...@@ -87,33 +104,42 @@ type ReceiveFileCallback interface { ...@@ -87,33 +104,42 @@ type ReceiveFileCallback interface {
// Callback is called when a new file transfer is received. // Callback is called when a new file transfer is received.
// //
// Parameters: // Parameters:
// - payload - the JSON marshalled bytes of a ReceivedFile object. // - payload - JSON of [ReceivedFile], which contains information about the
// - err - any errors that occurred during reception // incoming file transfer.
Callback(payload []byte, err error) Callback(payload []byte)
} }
// FileTransferSentProgressCallback is a bindings-layer interface that contains // FileTransferSentProgressCallback is a bindings-layer interface that contains
// a callback that is called when the sent progress updates. // a callback that is called when the sent progress updates.
type FileTransferSentProgressCallback interface { type FileTransferSentProgressCallback interface {
// Callback is called when a file part is sent or an error occurs. // Callback is called when a file part is sent or an error occurs. Once a
// transfer completes, it should be closed using [FileTransfer.CloseSend].
// //
// Parameters: // Parameters:
// - payload - the JSON marshalled bytes of a Progress object. // - payload - JSON of [Progress], which describes the progress of the
// - t - tracker that allows the lookup of the status of any file part // current transfer.
// - err - any errors that occurred during sending // - t - file part tracker that allows the lookup of the status of
// individual file parts.
// - err - Fatal errors during sending. If an error is returned, the
// transfer has failed and will not resume. It must be cleared using
// [FileTransfer.CloseSend].
Callback(payload []byte, t *FilePartTracker, err error) Callback(payload []byte, t *FilePartTracker, err error)
} }
// FileTransferReceiveProgressCallback is a bindings-layer interface that is // FileTransferReceiveProgressCallback is a bindings-layer interface that is
// called with the progress of a received file. // called with the progress of a received file.
//
type FileTransferReceiveProgressCallback interface { type FileTransferReceiveProgressCallback interface {
// Callback is called when a file part is sent or an error occurs. // Callback is called when a file part is received or an error occurs. Once
// a transfer completes, the file can be received using
// [FileTransfer.Receive].
// //
// Parameters: // Parameters:
// - payload - the JSON marshalled bytes of a Progress object. // - payload - JSON of [Progress], which describes the progress of the
// - t - tracker that allows the lookup of the status of any file part // current transfer.
// - err - any errors that occurred during sending // - t - file part tracker that allows the lookup of the status of
// individual file parts.
// - err - Fatal errors during receiving. If an error is returned, the
// transfer has failed and will not resume.
Callback(payload []byte, t *FilePartTracker, err error) Callback(payload []byte, t *FilePartTracker, err error)
} }
...@@ -124,18 +150,28 @@ type FileTransferReceiveProgressCallback interface { ...@@ -124,18 +150,28 @@ type FileTransferReceiveProgressCallback interface {
// InitFileTransfer creates a bindings-level file transfer manager. // InitFileTransfer creates a bindings-level file transfer manager.
// //
// Parameters: // Parameters:
// - e2eID - e2e object ID in the tracker // - e2eID - ID of [E2e] object in tracker.
// - paramsJSON - JSON marshalled fileTransfer.Params // - receiveFileCallback - A callback that is called when a new file transfer
// is received.
// - e2eFileTransferParamsJson - JSON of
// [gitlab.com/elixxir/client/fileTransfer/e2e.Params].
// - fileTransferParamsJson - JSON of [fileTransfer.Params].
//
// Returns:
// - New [FileTransfer] object.
func InitFileTransfer(e2eID int, receiveFileCallback ReceiveFileCallback, func InitFileTransfer(e2eID int, receiveFileCallback ReceiveFileCallback,
e2eFileTransferParamsJson, fileTransferParamsJson []byte) (*FileTransfer, error) { e2eFileTransferParamsJson, fileTransferParamsJson []byte) (*FileTransfer, error) {
jww.INFO.Printf("Calling InitFileTransfer()") jww.INFO.Printf("[FT] Calling InitFileTransfer(e2eID:%d params:%s)",
e2eID, fileTransferParamsJson)
// Get user from singleton // Get user from singleton
user, err := e2eTrackerSingleton.get(e2eID) user, err := e2eTrackerSingleton.get(e2eID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
e2eFileTransferParams, err := parseE2eFileTransferParams(e2eFileTransferParamsJson) e2eFileTransferParams, err :=
parseE2eFileTransferParams(e2eFileTransferParamsJson)
if err != nil { if err != nil {
return nil, err return nil, err
} }
...@@ -147,17 +183,26 @@ func InitFileTransfer(e2eID int, receiveFileCallback ReceiveFileCallback, ...@@ -147,17 +183,26 @@ func InitFileTransfer(e2eID int, receiveFileCallback ReceiveFileCallback,
// Create file transfer manager // Create file transfer manager
m, err := fileTransfer.NewManager(fileTransferParams, user.api) m, err := fileTransfer.NewManager(fileTransferParams, user.api)
if err != nil {
return nil, errors.Errorf(
"could not create new file transfer manager: %+v", err)
}
rcb := func(tid *ftCrypto.TransferID, fileName, fileType string, rcb := func(tid *ftCrypto.TransferID, fileName, fileType string,
sender *id.ID, size uint32, preview []byte) { sender *id.ID, size uint32, preview []byte) {
receiveFileCallback.Callback(json.Marshal(ReceivedFile{ data, err := json.Marshal(ReceivedFile{
TransferID: tid.Bytes(), TransferID: tid,
SenderID: sender.Marshal(), SenderID: sender,
Preview: preview, Preview: preview,
Name: fileName, Name: fileName,
Type: fileType, Type: fileType,
Size: int(size), Size: int(size),
})) })
if err != nil {
jww.FATAL.Panicf(
"[FT] Failed to JSON marshal ReceivedFile: %+v", err)
}
receiveFileCallback.Callback(data)
} }
w, err := e2e.NewWrapper(rcb, e2eFileTransferParams, m, user.api) w, err := e2e.NewWrapper(rcb, e2eFileTransferParams, m, user.api)
...@@ -175,21 +220,31 @@ func InitFileTransfer(e2eID int, receiveFileCallback ReceiveFileCallback, ...@@ -175,21 +220,31 @@ func InitFileTransfer(e2eID int, receiveFileCallback ReceiveFileCallback,
return &FileTransfer{w: w}, nil return &FileTransfer{w: w}, nil
} }
// Send is the bindings-level function for sending a file. // Send initiates the sending of a file to a recipient and returns a transfer ID
// that uniquely identifies this file transfer. Progress for the file transfer
// is reported to that passed in callback.
// //
// Parameters: // Parameters:
// - payload - JSON marshalled FileSend // - payload - JSON of [FileSend], which contains the file contents and its
// - recipientID - marshalled recipient id.ID // metadata.
// - retry - number of retries allowed // - recipientID - marshalled bytes of the recipient's [id.ID].
// - callback - callback that reports file sending progress // - retry - The number of sending retries allowed on send failure (e.g. a
// - period - Duration (in ms) to wait between progress callbacks triggering. // retry of 2.0 with 6 parts means 12 total possible sends).
// This value should depend on how frequently you want to receive updates, // - callback - A callback that reports the progress of the file transfer. The
// and should be tuned to your implementation. // callback is called once on initialization, on every progress update (or
// less if restricted by the period), or on fatal error.
// - period - The progress callback will be limited from triggering only once
// per period. It is a duration in milliseconds. This value should depend on
// how frequently you want to receive updates, and should be tuned to your
// implementation.
// //
// Returns: // Returns:
// - []byte - unique file transfer ID // - The bytes of the unique [fileTransfer.TransferID].
func (f *FileTransfer) Send(payload, recipientID []byte, retry float32, func (f *FileTransfer) Send(payload, recipientID []byte, retry float32,
callback FileTransferSentProgressCallback, period int) ([]byte, error) { callback FileTransferSentProgressCallback, period int) ([]byte, error) {
jww.INFO.Printf("[FT] Sending file transfer to %s.",
base64.StdEncoding.EncodeToString(recipientID))
// Unmarshal recipient ID // Unmarshal recipient ID
recipient, err := id.Unmarshal(recipientID) recipient, err := id.Unmarshal(recipientID)
if err != nil { if err != nil {
...@@ -201,25 +256,29 @@ func (f *FileTransfer) Send(payload, recipientID []byte, retry float32, ...@@ -201,25 +256,29 @@ func (f *FileTransfer) Send(payload, recipientID []byte, retry float32,
// Wrap transfer progress callback to be passed to fileTransfer layer // Wrap transfer progress callback to be passed to fileTransfer layer
cb := func(completed bool, arrived, total uint16, cb := func(completed bool, arrived, total uint16,
st fileTransfer.SentTransfer, t fileTransfer.FilePartTracker, err error) { st fileTransfer.SentTransfer, t fileTransfer.FilePartTracker, err error) {
prog := &Progress{ progress := &Progress{
TransferID: st.TransferID(),
Completed: completed, Completed: completed,
Transmitted: int(arrived), Transmitted: int(arrived),
Total: int(total), Total: int(total),
Err: err,
} }
pm, err := json.Marshal(prog) pm, err2 := json.Marshal(progress)
if err2 != nil {
jww.FATAL.Panicf(
"[FT] Failed to JSON marshal sent Progress object: %+v", err)
}
callback.Callback(pm, &FilePartTracker{t}, err) callback.Callback(pm, &FilePartTracker{t}, err)
} }
// Unmarshal payload // Unmarshal payload
fs := &FileSend{} var fs FileSend
err = json.Unmarshal(payload, fs) if err = json.Unmarshal(payload, &fs); err != nil {
if err != nil {
return nil, err return nil, err
} }
// Send file // Send file
ftID, err := f.w.Send(recipient, fs.Name, fs.Type, fs.Contents, retry, fs.Preview, cb, p) ftID, err := f.w.Send(
recipient, fs.Name, fs.Type, fs.Contents, retry, fs.Preview, cb, p)
if err != nil { if err != nil {
return nil, err return nil, err
} }
...@@ -230,28 +289,28 @@ func (f *FileTransfer) Send(payload, recipientID []byte, retry float32, ...@@ -230,28 +289,28 @@ func (f *FileTransfer) Send(payload, recipientID []byte, retry float32,
// Receive returns the full file on the completion of the transfer. It deletes // Receive returns the full file on the completion of the transfer. It deletes
// internal references to the data and unregisters any attached progress // internal references to the data and unregisters any attached progress
// callbacks. Returns an error if the transfer is not complete, the full file // callback. Returns an error if the transfer is not complete, the full file
// cannot be verified, or if the transfer cannot be found. // cannot be verified, or if the transfer cannot be found.
// //
// Receive can only be called once the progress callback returns that the // Receive can only be called once the progress callback returns that the file
// file transfer is complete. // transfer is complete.
// //
// Parameters: // Parameters:
// - tidBytes - file transfer ID // - tidBytes - The file transfer's unique [fileTransfer.TransferID].
func (f *FileTransfer) Receive(tidBytes []byte) ([]byte, error) { func (f *FileTransfer) Receive(tidBytes []byte) ([]byte, error) {
tid := ftCrypto.UnmarshalTransferID(tidBytes) tid := ftCrypto.UnmarshalTransferID(tidBytes)
return f.w.Receive(&tid) return f.w.Receive(&tid)
} }
// CloseSend deletes a file from the internal storage once a transfer has // CloseSend deletes a file from the internal storage once a transfer has
// completed or reached the retry limit. Returns an error if the transfer has // completed or reached the retry limit. If neither of those condition are met,
// not run out of retries. // an error is returned.
// //
// This function should be called once a transfer completes or errors out (as // This function should be called once a transfer completes or errors out (as
// reported by the progress callback). // reported by the progress callback).
// //
// Parameters: // Parameters:
// - tidBytes - file transfer ID // - tidBytes - the file transfer's unique [fileTransfer.TransferID].
func (f *FileTransfer) CloseSend(tidBytes []byte) error { func (f *FileTransfer) CloseSend(tidBytes []byte) error {
tid := ftCrypto.UnmarshalTransferID(tidBytes) tid := ftCrypto.UnmarshalTransferID(tidBytes)
return f.w.CloseSend(&tid) return f.w.CloseSend(&tid)
...@@ -264,26 +323,39 @@ func (f *FileTransfer) CloseSend(tidBytes []byte) error { ...@@ -264,26 +323,39 @@ func (f *FileTransfer) CloseSend(tidBytes []byte) error {
// RegisterSentProgressCallback allows for the registration of a callback to // RegisterSentProgressCallback allows for the registration of a callback to
// track the progress of an individual sent file transfer. // track the progress of an individual sent file transfer.
// //
// SentProgressCallback is auto registered on Send; this function should be // The callback will be called immediately when added to report the current
// called when resuming clients or registering extra callbacks. // progress of the transfer. It will then call every time a file part
// arrives, the transfer completes, or a fatal error occurs. It is called at
// most once every period regardless of the number of progress updates.
//
// In the event that the client is closed and resumed, this function must be
// used to re-register any callbacks previously registered with this
// function or Send.
// //
// Parameters: // Parameters:
// - tidBytes - file transfer ID // - tidBytes - The file transfer's unique [fileTransfer.TransferID].
// - callback - callback that reports file reception progress // - callback - A callback that reports the progress of the file transfer. The
// - period - Duration (in ms) to wait between progress callbacks triggering. // callback is called once on initialization, on every progress update (or
// This value should depend on how frequently you want to receive updates, // less if restricted by the period), or on fatal error.
// and should be tuned to your implementation. // - period - The progress callback will be limited from triggering only once
// per period. It is a duration in milliseconds. This value should depend on
// how frequently you want to receive updates, and should be tuned to your
// implementation.
func (f *FileTransfer) RegisterSentProgressCallback(tidBytes []byte, func (f *FileTransfer) RegisterSentProgressCallback(tidBytes []byte,
callback FileTransferSentProgressCallback, period int) error { callback FileTransferSentProgressCallback, period int) error {
cb := func(completed bool, arrived, total uint16, cb := func(completed bool, arrived, total uint16,
st fileTransfer.SentTransfer, t fileTransfer.FilePartTracker, err error) { st fileTransfer.SentTransfer, t fileTransfer.FilePartTracker, err error) {
prog := &Progress{ progress := &Progress{
TransferID: st.TransferID(),
Completed: completed, Completed: completed,
Transmitted: int(arrived), Transmitted: int(arrived),
Total: int(total), Total: int(total),
Err: err,
} }
pm, err := json.Marshal(prog) pm, err2 := json.Marshal(progress)
if err2 != nil {
jww.FATAL.Panicf(
"[FT] Failed to JSON marshal sent Progress object: %+v", err)
}
callback.Callback(pm, &FilePartTracker{t}, err) callback.Callback(pm, &FilePartTracker{t}, err)
} }
p := time.Millisecond * time.Duration(period) p := time.Millisecond * time.Duration(period)
...@@ -295,25 +367,42 @@ func (f *FileTransfer) RegisterSentProgressCallback(tidBytes []byte, ...@@ -295,25 +367,42 @@ func (f *FileTransfer) RegisterSentProgressCallback(tidBytes []byte,
// RegisterReceivedProgressCallback allows for the registration of a callback to // RegisterReceivedProgressCallback allows for the registration of a callback to
// track the progress of an individual received file transfer. // track the progress of an individual received file transfer.
// //
// This should be done when a new transfer is received on the ReceiveCallback. // The callback will be called immediately when added to report the current
// progress of the transfer. It will then call every time a file part is
// received, the transfer completes, or a fatal error occurs. It is called at
// most once every period regardless of the number of progress updates.
//
// In the event that the client is closed and resumed, this function must be
// used to re-register any callbacks previously registered.
//
// Once the callback reports that the transfer has completed, the recipient can
// get the full file by calling Receive.
// //
// Parameters: // Parameters:
// - tidBytes - file transfer ID // - tidBytes - The file transfer's unique [fileTransfer.TransferID].
// - callback - callback that reports file reception progress // - callback - A callback that reports the progress of the file transfer. The
// - period - Duration (in ms) to wait between progress callbacks triggering. // callback is called once on initialization, on every progress update (or
// This value should depend on how frequently you want to receive updates, // less if restricted by the period), or on fatal error.
// and should be tuned to your implementation. // - period - The progress callback will be limited from triggering only once
// per period. It is a duration in milliseconds. This value should depend on
// how frequently you want to receive updates, and should be tuned to your
// implementation.
func (f *FileTransfer) RegisterReceivedProgressCallback(tidBytes []byte, func (f *FileTransfer) RegisterReceivedProgressCallback(tidBytes []byte,
callback FileTransferReceiveProgressCallback, period int) error { callback FileTransferReceiveProgressCallback, period int) error {
cb := func(completed bool, received, total uint16, cb := func(completed bool, received, total uint16,
rt fileTransfer.ReceivedTransfer, t fileTransfer.FilePartTracker, err error) { rt fileTransfer.ReceivedTransfer, t fileTransfer.FilePartTracker,
prog := &Progress{ err error) {
progress := &Progress{
TransferID: rt.TransferID(),
Completed: completed, Completed: completed,
Transmitted: int(received), Transmitted: int(received),
Total: int(total), Total: int(total),
Err: err,
} }
pm, err := json.Marshal(prog) pm, err2 := json.Marshal(progress)
if err2 != nil {
jww.FATAL.Panicf(
"[FT] Failed to JSON marshal received Progress object: %+v", err)
}
callback.Callback(pm, &FilePartTracker{t}, err) callback.Callback(pm, &FilePartTracker{t}, err)
} }
p := time.Millisecond * time.Duration(period) p := time.Millisecond * time.Duration(period)
......
...@@ -9,6 +9,7 @@ package bindings ...@@ -9,6 +9,7 @@ package bindings
import ( import (
"encoding/json" "encoding/json"
"fmt"
"testing" "testing"
"gitlab.com/elixxir/crypto/fileTransfer" "gitlab.com/elixxir/crypto/fileTransfer"
...@@ -16,52 +17,53 @@ import ( ...@@ -16,52 +17,53 @@ import (
"gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/id"
) )
// Creates example JSON outputs used in documentation.
func TestFileTransfer_inputs(t *testing.T) { func TestFileTransfer_inputs(t *testing.T) {
fs := &FileSend{ // ReceivedFile
Name: "testfile.txt",
Type: "text file",
Preview: []byte("it's me a preview"),
Contents: []byte("This is the full contents of the file in bytes"),
}
fsm, _ := json.Marshal(fs)
t.Log("FileSend example json:")
t.Log(string(fsm))
t.Log("\n")
tid, _ := fileTransfer.NewTransferID(csprng.NewSystemRNG()) tid, _ := fileTransfer.NewTransferID(csprng.NewSystemRNG())
sid := id.NewIdFromString("zezima", id.User, t) sid, _ := id.NewRandomID(csprng.NewSystemRNG(), id.User)
rf := &ReceivedFile{ rf := &ReceivedFile{
TransferID: tid.Bytes(), TransferID: &tid,
SenderID: sid.Marshal(), SenderID: sid,
Preview: []byte("it's me a preview"), Preview: []byte("it's me a preview"),
Name: "testfile.txt", Name: "testfile.txt",
Type: "text file", Type: "text file",
Size: 2048, Size: 2048,
} }
rfm, _ := json.Marshal(rf) rfm, _ := json.MarshalIndent(rf, "", " ")
t.Log("ReceivedFile example json:") t.Log("ReceivedFile example JSON:")
t.Log(string(rfm)) fmt.Printf("%s\n\n", rfm)
t.Log("\n")
// FileSend
fs := &FileSend{
Name: "testFile",
Type: "txt",
Preview: []byte("File preview."),
Contents: []byte("File contents."),
}
fsm, _ := json.MarshalIndent(fs, "", " ")
t.Log("FileSend example JSON:")
fmt.Printf("%s\n\n", fsm)
// Progress
p := &Progress{ p := &Progress{
TransferID: &tid,
Completed: false, Completed: false,
Transmitted: 128, Transmitted: 128,
Total: 2048, Total: 2048,
Err: nil,
} }
pm, _ := json.Marshal(p) pm, _ := json.MarshalIndent(p, "", " ")
t.Log("Progress example json:") t.Log("Progress example JSON:")
t.Log(string(pm)) fmt.Printf("%s\n\n", pm)
t.Log("\n")
// EventReport
er := &EventReport{ er := &EventReport{
Priority: 1, Priority: 1,
Category: "Test Events", Category: "Test Events",
EventType: "Ping", EventType: "Ping",
Details: "This is an example of an event report", Details: "This is an example of an event report",
} }
erm, _ := json.Marshal(er) erm, _ := json.MarshalIndent(er, "", " ")
t.Log("EventReport example json:") t.Log("EventReport example JSON:")
t.Log(string(erm)) fmt.Printf("%s\n\n", erm)
t.Log("\n")
} }
...@@ -99,15 +99,26 @@ func (c *Cmix) ReadyToSend() bool { ...@@ -99,15 +99,26 @@ func (c *Cmix) ReadyToSend() bool {
jww.FATAL.Panicf("Failed to get node registration status: %+v", err) jww.FATAL.Panicf("Failed to get node registration status: %+v", err)
} }
// FIXME: This is a fix put in place because not all nodes in the NDF are
// online. This should be fixed.
total = 340
return numReg >= total*7/10 return numReg >= total*7/10
} }
// IsReadyInfo contains information on if the network is ready and how close it
// is to being ready.
//
// Example JSON:
//
// {
// "IsReady": true,
// "HowClose": 0.534
// }
type IsReadyInfo struct {
IsReady bool
HowClose float64
}
// NetworkFollowerStatus gets the state of the network follower. It returns a // NetworkFollowerStatus gets the state of the network follower. It returns a
// status with the following values: // status with the following values:
//
// Stopped - 0 // Stopped - 0
// Running - 2000 // Running - 2000
// Stopping - 3000 // Stopping - 3000
...@@ -144,6 +155,44 @@ func (c *Cmix) GetNodeRegistrationStatus() ([]byte, error) { ...@@ -144,6 +155,44 @@ func (c *Cmix) GetNodeRegistrationStatus() ([]byte, error) {
return json.Marshal(nodeRegReport) return json.Marshal(nodeRegReport)
} }
// IsReady returns true if at least percentReady of node registrations has
// completed. If not all have completed, then it returns false and howClose will
// be a percent (0-1) of node registrations completed.
//
// Parameters:
// - percentReady - The percentage of nodes required to be registered with to
// be ready. This is a number between 0 and 1.
//
// Returns:
// - JSON of [IsReadyInfo].
func (c *Cmix) IsReady(percentReady float64) ([]byte, error) {
isReady, howClose := c.api.IsReady(percentReady)
return json.Marshal(&IsReadyInfo{isReady, howClose})
}
// PauseNodeRegistrations stops all node registrations and returns a function to
// resume them.
//
// Parameters:
// - timeoutMS - The timeout, in milliseconds, to wait when stopping threads
// before failing.
func (c *Cmix) PauseNodeRegistrations(timeoutMS int) error {
timeout := time.Duration(timeoutMS) * time.Millisecond
return c.api.PauseNodeRegistrations(timeout)
}
// ChangeNumberOfNodeRegistrations changes the number of parallel node
// registrations up to the initialized maximum.
//
// Parameters:
// - toRun - The number of parallel node registrations.
// - timeoutMS - The timeout, in milliseconds, to wait when changing node
// registrations before failing.
func (c *Cmix) ChangeNumberOfNodeRegistrations(toRun, timeoutMS int) error {
timeout := time.Duration(timeoutMS) * time.Millisecond
return c.api.ChangeNumberOfNodeRegistrations(toRun, timeout)
}
// HasRunningProcessies checks if any background threads are running and returns // HasRunningProcessies checks if any background threads are running and returns
// true if one or more are. // true if one or more are.
// //
...@@ -168,6 +217,7 @@ func (c *Cmix) IsHealthy() bool { ...@@ -168,6 +217,7 @@ func (c *Cmix) IsHealthy() bool {
// - []byte - A JSON marshalled list of all running processes. // - []byte - A JSON marshalled list of all running processes.
// //
// JSON Example: // JSON Example:
//
// { // {
// "FileTransfer{BatchBuilderThread, FilePartSendingThread#0, FilePartSendingThread#1, FilePartSendingThread#2, FilePartSendingThread#3}", // "FileTransfer{BatchBuilderThread, FilePartSendingThread#0, FilePartSendingThread#1, FilePartSendingThread#2, FilePartSendingThread#3}",
// "MessageReception Worker 0" // "MessageReception Worker 0"
...@@ -220,6 +270,7 @@ func (c *Cmix) RegisterClientErrorCallback(clientError ClientError) { ...@@ -220,6 +270,7 @@ func (c *Cmix) RegisterClientErrorCallback(clientError ClientError) {
// - err - JSON unmarshalling error // - err - JSON unmarshalling error
// //
// Example JSON: // Example JSON:
//
// [ // [
// { // {
// "Id": "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAD", // bytes of id.ID encoded as base64 string // "Id": "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAD", // bytes of id.ID encoded as base64 string
...@@ -246,10 +297,38 @@ type TrackServicesCallback interface { ...@@ -246,10 +297,38 @@ type TrackServicesCallback interface {
Callback(marshalData []byte, err error) Callback(marshalData []byte, err error)
} }
// TrackServicesWithIdentity will return via a callback the list of services the
// backend keeps track of for the provided identity. This may be passed into
// other bindings call which may need context on the available services for this
// single identity. This will only return services for the given identity.
//
// Parameters:
// - e2eID - e2e object ID in the tracker.
// - cb - A TrackServicesCallback, which will be passed the marshalled
// message.ServiceList.
func (c *Cmix) TrackServicesWithIdentity(e2eId int,
cb TrackServicesCallback) error {
// Retrieve the user from the tracker
user, err := e2eTrackerSingleton.get(e2eId)
if err != nil {
return err
}
receptionId := user.api.GetReceptionIdentity().ID
c.api.GetCmix().TrackServices(func(list message.ServiceList) {
res := make(message.ServiceList)
res[*receptionId] = list[*receptionId]
cb.Callback(json.Marshal(res))
})
return nil
}
// TrackServices will return via a callback the list of services the // TrackServices will return via a callback the list of services the
// backend keeps track of, which is formally referred to as a // backend keeps track of, which is formally referred to as a
// [message.ServiceList]. This may be passed into other bindings call which // [message.ServiceList]. This may be passed into other bindings call which
// may need context on the available services for this client. // may need context on the available services for this client. This will
// provide services for all identities that the client tracks.
// //
// Parameters: // Parameters:
// - cb - A TrackServicesCallback, which will be passed the marshalled // - cb - A TrackServicesCallback, which will be passed the marshalled
......
...@@ -17,6 +17,7 @@ import ( ...@@ -17,6 +17,7 @@ import (
// via GetNotificationsReport as a JSON marshalled byte data. // via GetNotificationsReport as a JSON marshalled byte data.
// //
// Example JSON: // Example JSON:
//
// [ // [
// { // {
// "ForMe": true, // boolean // "ForMe": true, // boolean
...@@ -44,6 +45,7 @@ type NotificationReports []NotificationReport ...@@ -44,6 +45,7 @@ type NotificationReports []NotificationReport
// this user. // this user.
// //
// Example NotificationReport JSON: // Example NotificationReport JSON:
//
// { // {
// "ForMe": true, // "ForMe": true,
// "Type": "e2e", // "Type": "e2e",
...@@ -80,33 +82,27 @@ type NotificationReport struct { ...@@ -80,33 +82,27 @@ type NotificationReport struct {
// NotificationReports. // NotificationReports.
// //
// Parameters: // Parameters:
// - e2eID - e2e object ID in the tracker
// - notificationCSV - the notification data received from the // - notificationCSV - the notification data received from the
// notifications' server. // notifications' server.
// - marshalledServices - the JSON-marshalled list of services the backend // - marshalledServices - the JSON-marshalled list of services the backend
// keeps track of. Refer to Cmix.TrackServices for information about this. // keeps track of. Refer to Cmix.TrackServices or
// Cmix.TrackServicesWithIdentity for information about this.
// //
// Returns: // Returns:
// - []byte - A JSON marshalled NotificationReports. Some NotificationReport's // - []byte - A JSON marshalled NotificationReports. Some NotificationReport's
// within in this structure may have their NotificationReport.ForMe // within in this structure may have their NotificationReport.ForMe
// set to false. These may be ignored. // set to false. These may be ignored.
func GetNotificationsReport(e2eId int, notificationCSV string, func GetNotificationsReport(notificationCSV string,
marshalledServices []byte) ([]byte, error) { marshalledServices []byte) ([]byte, error) {
// Retrieve user
user, err := e2eTrackerSingleton.get(e2eId)
if err != nil {
return nil, err
}
// If services are retrieved using TrackServicesWithIdentity, this
// should return a single list.
serviceList := message.ServiceList{} serviceList := message.ServiceList{}
err = json.Unmarshal(marshalledServices, &serviceList) err := json.Unmarshal(marshalledServices, &serviceList)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// Retrieve the services for this user
services := serviceList[*user.api.GetReceptionIdentity().ID]
// Decode notifications' server data // Decode notifications' server data
notificationList, err := notifications.DecodeNotificationsCSV(notificationCSV) notificationList, err := notifications.DecodeNotificationsCSV(notificationCSV)
if err != nil { if err != nil {
...@@ -117,6 +113,7 @@ func GetNotificationsReport(e2eId int, notificationCSV string, ...@@ -117,6 +113,7 @@ func GetNotificationsReport(e2eId int, notificationCSV string,
reportList := make([]*NotificationReport, len(notificationList)) reportList := make([]*NotificationReport, len(notificationList))
// Iterate over data provided by server // Iterate over data provided by server
for _, services := range serviceList {
for i := range notificationList { for i := range notificationList {
notifData := notificationList[i] notifData := notificationList[i]
...@@ -139,6 +136,7 @@ func GetNotificationsReport(e2eId int, notificationCSV string, ...@@ -139,6 +136,7 @@ func GetNotificationsReport(e2eId int, notificationCSV string,
} }
} }
} }
}
return json.Marshal(reportList) return json.Marshal(reportList)
} }
......
...@@ -24,6 +24,8 @@ type broadcastClient struct { ...@@ -24,6 +24,8 @@ type broadcastClient struct {
rng *fastRNG.StreamGenerator rng *fastRNG.StreamGenerator
} }
// NewBroadcastChannelFunc creates a broadcast Channel. Used so that it can be
// replaced in tests.
type NewBroadcastChannelFunc func(channel *crypto.Channel, net Client, type NewBroadcastChannelFunc func(channel *crypto.Channel, net Client,
rng *fastRNG.StreamGenerator) (Channel, error) rng *fastRNG.StreamGenerator) (Channel, error)
...@@ -52,7 +54,8 @@ func NewBroadcastChannel(channel *crypto.Channel, net Client, ...@@ -52,7 +54,8 @@ func NewBroadcastChannel(channel *crypto.Channel, net Client,
} }
// RegisterListener registers a listener for broadcast messages. // RegisterListener registers a listener for broadcast messages.
func (bc *broadcastClient) RegisterListener(listenerCb ListenerFunc, method Method) error { func (bc *broadcastClient) RegisterListener(
listenerCb ListenerFunc, method Method) error {
var tag string var tag string
switch method { switch method {
case Symmetric: case Symmetric:
......
...@@ -83,7 +83,7 @@ type Channel interface { ...@@ -83,7 +83,7 @@ type Channel interface {
Stop() Stop()
} }
// Assembler is a function which allows a bre // Assembler is a function which allows a bre.
type Assembler func(rid id.Round) (payload []byte, err error) type Assembler func(rid id.Round) (payload []byte, err error)
// Client contains the methods from [cmix.Client] that are required by // Client contains the methods from [cmix.Client] that are required by
...@@ -92,7 +92,8 @@ type Client interface { ...@@ -92,7 +92,8 @@ type Client interface {
SendWithAssembler(recipient *id.ID, assembler cmix.MessageAssembler, SendWithAssembler(recipient *id.ID, assembler cmix.MessageAssembler,
cmixParams cmix.CMIXParams) (rounds.Round, ephemeral.Id, error) cmixParams cmix.CMIXParams) (rounds.Round, ephemeral.Id, error)
IsHealthy() bool IsHealthy() bool
AddIdentityWithHistory(id *id.ID, validUntil, beginning time.Time, persistent bool) AddIdentityWithHistory(
id *id.ID, validUntil, beginning time.Time, persistent bool)
AddService(clientID *id.ID, newService message.Service, AddService(clientID *id.ID, newService message.Service,
response message.Processor) response message.Processor)
DeleteClientService(clientID *id.ID) DeleteClientService(clientID *id.ID)
......
...@@ -7,7 +7,7 @@ ...@@ -7,7 +7,7 @@
package broadcast package broadcast
// Method enum for broadcast type // Method enum for broadcast type.
type Method uint8 type Method uint8
const ( const (
......
...@@ -21,7 +21,7 @@ const ( ...@@ -21,7 +21,7 @@ const (
errDecrypt = "[BCAST] Failed to decrypt payload for broadcast %s (%q): %+v" errDecrypt = "[BCAST] Failed to decrypt payload for broadcast %s (%q): %+v"
) )
// processor struct for message handling // processor struct for message handling.
type processor struct { type processor struct {
c *crypto.Channel c *crypto.Channel
cb ListenerFunc cb ListenerFunc
...@@ -36,16 +36,19 @@ func (p *processor) Process(msg format.Message, ...@@ -36,16 +36,19 @@ func (p *processor) Process(msg format.Message,
var err error var err error
switch p.method { switch p.method {
case RSAToPublic: case RSAToPublic:
decodedMessage, decryptErr := p.c.DecryptRSAToPublic(msg.GetContents(), msg.GetMac(), msg.GetKeyFP()) decodedMessage, decryptErr := p.c.DecryptRSAToPublic(
msg.GetContents(), msg.GetMac(), msg.GetKeyFP())
if decryptErr != nil { if decryptErr != nil {
jww.ERROR.Printf(errDecrypt, p.c.ReceptionID, p.c.Name, decryptErr) jww.ERROR.Printf(errDecrypt, p.c.ReceptionID, p.c.Name, decryptErr)
return return
} }
size := binary.BigEndian.Uint16(decodedMessage[:internalPayloadSizeLength]) size := binary.BigEndian.Uint16(
decodedMessage[:internalPayloadSizeLength])
payload = decodedMessage[internalPayloadSizeLength : size+internalPayloadSizeLength] payload = decodedMessage[internalPayloadSizeLength : size+internalPayloadSizeLength]
case Symmetric: case Symmetric:
payload, err = p.c.DecryptSymmetric(msg.GetContents(), msg.GetMac(), msg.GetKeyFP()) payload, err = p.c.DecryptSymmetric(
msg.GetContents(), msg.GetMac(), msg.GetKeyFP())
if err != nil { if err != nil {
jww.ERROR.Printf(errDecrypt, p.c.ReceptionID, p.c.Name, err) jww.ERROR.Printf(errDecrypt, p.c.ReceptionID, p.c.Name, err)
return return
...@@ -57,7 +60,8 @@ func (p *processor) Process(msg format.Message, ...@@ -57,7 +60,8 @@ func (p *processor) Process(msg format.Message,
p.cb(payload, receptionID, round) p.cb(payload, receptionID, round)
} }
// String returns a string identifying the symmetricProcessor for debugging purposes. // String returns a string identifying the symmetricProcessor for debugging
// purposes.
func (p *processor) String() string { func (p *processor) String() string {
return "broadcastChannel-" + p.c.Name return "broadcastChannel-" + p.c.Name
} }
...@@ -35,13 +35,10 @@ const ( ...@@ -35,13 +35,10 @@ const (
// or smaller and the channel [rsa.PrivateKey] must be passed in. // or smaller and the channel [rsa.PrivateKey] must be passed in.
// //
// The network must be healthy to send. // The network must be healthy to send.
func (bc *broadcastClient) BroadcastRSAtoPublic(pk rsa.PrivateKey, func (bc *broadcastClient) BroadcastRSAtoPublic(
payload []byte, cMixParams cmix.CMIXParams) (rounds.Round, ephemeral.Id, error) { pk rsa.PrivateKey, payload []byte, cMixParams cmix.CMIXParams) (
// Confirm network health rounds.Round, ephemeral.Id, error) {
assemble := func(rid id.Round) ([]byte, error) { return payload, nil }
assemble := func(rid id.Round) ([]byte, error) {
return payload, nil
}
return bc.BroadcastRSAToPublicWithAssembler(pk, assemble, cMixParams) return bc.BroadcastRSAToPublicWithAssembler(pk, assemble, cMixParams)
} }
...@@ -91,7 +88,7 @@ func (bc *broadcastClient) BroadcastRSAToPublicWithAssembler( ...@@ -91,7 +88,7 @@ func (bc *broadcastClient) BroadcastRSAToPublicWithAssembler(
"asymmetric broadcast message") "asymmetric broadcast message")
} }
// Create service using asymmetric broadcast service tag & channel // Create service using asymmetric broadcast service tag and channel
// reception ID allows anybody with this info to listen for messages on // reception ID allows anybody with this info to listen for messages on
// this channel // this channel
service = message.Service{ service = message.Service{
......
...@@ -158,7 +158,8 @@ func Test_asymmetricClient_Smoke(t *testing.T) { ...@@ -158,7 +158,8 @@ func Test_asymmetricClient_Smoke(t *testing.T) {
} }
// Broadcast payload // Broadcast payload
_, _, err := clients[0].BroadcastRSAtoPublic(pk, payload, cmix.GetDefaultCMIXParams()) _, _, err := clients[0].BroadcastRSAtoPublic(
pk, payload, cmix.GetDefaultCMIXParams())
if err != nil { if err != nil {
t.Errorf("Cmix 0 failed to send broadcast: %+v", err) t.Errorf("Cmix 0 failed to send broadcast: %+v", err)
} }
......
...@@ -22,7 +22,6 @@ const ( ...@@ -22,7 +22,6 @@ const (
// broadcastClient.Broadcast // broadcastClient.Broadcast
errNetworkHealth = "cannot send broadcast when the network is not healthy" errNetworkHealth = "cannot send broadcast when the network is not healthy"
errPayloadSize = "size of payload %d must be less than %d" errPayloadSize = "size of payload %d must be less than %d"
errBroadcastMethodType = "cannot call %s broadcast using %s channel"
) )
// Tags. // Tags.
...@@ -50,7 +49,8 @@ func (bc *broadcastClient) Broadcast(payload []byte, cMixParams cmix.CMIXParams) ...@@ -50,7 +49,8 @@ func (bc *broadcastClient) Broadcast(payload []byte, cMixParams cmix.CMIXParams)
// The payload must be of the size [Channel.MaxPayloadSize] or smaller. // The payload must be of the size [Channel.MaxPayloadSize] or smaller.
// //
// The network must be healthy to send. // The network must be healthy to send.
func (bc *broadcastClient) BroadcastWithAssembler(assembler Assembler, cMixParams cmix.CMIXParams) ( func (bc *broadcastClient) BroadcastWithAssembler(
assembler Assembler, cMixParams cmix.CMIXParams) (
rounds.Round, ephemeral.Id, error) { rounds.Round, ephemeral.Id, error) {
if !bc.net.IsHealthy() { if !bc.net.IsHealthy() {
return rounds.Round{}, ephemeral.Id{}, errors.New(errNetworkHealth) return rounds.Round{}, ephemeral.Id{}, errors.New(errNetworkHealth)
...@@ -59,7 +59,7 @@ func (bc *broadcastClient) BroadcastWithAssembler(assembler Assembler, cMixParam ...@@ -59,7 +59,7 @@ func (bc *broadcastClient) BroadcastWithAssembler(assembler Assembler, cMixParam
assemble := func(rid id.Round) (fp format.Fingerprint, assemble := func(rid id.Round) (fp format.Fingerprint,
service message.Service, encryptedPayload, mac []byte, err error) { service message.Service, encryptedPayload, mac []byte, err error) {
//assemble the passed payload // Assemble the passed payload
payload, err := assembler(rid) payload, err := assembler(rid)
if err != nil { if err != nil {
return format.Fingerprint{}, message.Service{}, nil, nil, err return format.Fingerprint{}, message.Service{}, nil, nil, err
...@@ -93,6 +93,6 @@ func (bc *broadcastClient) BroadcastWithAssembler(assembler Assembler, cMixParam ...@@ -93,6 +93,6 @@ func (bc *broadcastClient) BroadcastWithAssembler(assembler Assembler, cMixParam
return return
} }
return bc.net.SendWithAssembler(bc.channel.ReceptionID, assemble, return bc.net.SendWithAssembler(
cMixParams) bc.channel.ReceptionID, assemble, cMixParams)
} }
Channels provides a channels implementation on top of broadcast which is capable of handing the user facing features of Channels provide a channels implementation on top of broadcast which is capable
channels, including replies, reactions, and eventually admin commands. of handing the user facing features of channels, including replies, reactions,
and eventually admin commands.
on sending, data propagates as follows: on sending, data propagates as follows:
```text
Send function (Example: SendMessage) - > SendGeneric -> Send function (Example: SendMessage) - > SendGeneric ->
Broadcast.BroadcastWithAssembler -> cmix.SendWithAssembler Broadcast.BroadcastWithAssembler -> cmix.SendWithAssembler
```
on receiving messages propagate as follows: on receiving messages propagate as follows:
```text
cmix message pickup (by service)- > broadcast.Processor -> cmix message pickup (by service)- > broadcast.Processor ->
userListener -> events.triggerEvent -> userListener -> events.triggerEvent ->
messageTypeHandler (example: Text) -> messageTypeHandler (example: Text) ->
eventModel (example: ReceiveMessage) eventModel (example: ReceiveMessage)
```
on sendingAdmin, data propagates as follows: on sendingAdmin, data propagates as follows:
```text
Send function - > SendAdminGeneric -> Send function - > SendAdminGeneric ->
Broadcast.BroadcastAsymmetricWithAssembler -> cmix.SendWithAssembler Broadcast.BroadcastAsymmetricWithAssembler -> cmix.SendWithAssembler
```
on receiving admin messages propagate as follows: on receiving admin messages propagate as follows:
```text
cmix message pickup (by service)- > broadcast.Processor -> adminListener -> cmix message pickup (by service)- > broadcast.Processor -> adminListener ->
events.triggerAdminEvent -> messageTypeHandler (example: Text) -> events.triggerAdminEvent -> messageTypeHandler (example: Text) ->
eventModel (example: ReceiveMessage) eventModel (example: ReceiveMessage)
```
...@@ -26,7 +26,7 @@ type adminListener struct { ...@@ -26,7 +26,7 @@ type adminListener struct {
checkSent messageReceiveFunc checkSent messageReceiveFunc
} }
// Listen is called when a message is received for the admin listener // Listen is called when a message is received for the admin listener.
func (al *adminListener) Listen(payload []byte, func (al *adminListener) Listen(payload []byte,
receptionID receptionID.EphemeralIdentity, round rounds.Round) { receptionID receptionID.EphemeralIdentity, round rounds.Round) {
// Get the message ID // Get the message ID
...@@ -40,7 +40,7 @@ func (al *adminListener) Listen(payload []byte, ...@@ -40,7 +40,7 @@ func (al *adminListener) Listen(payload []byte,
return return
} }
//check if we sent the message, ignore triggering if we sent // Check if we sent the message, ignore triggering if we sent
if al.checkSent(msgID, round) { if al.checkSent(msgID, round) {
return return
} }
...@@ -49,14 +49,13 @@ func (al *adminListener) Listen(payload []byte, ...@@ -49,14 +49,13 @@ func (al *adminListener) Listen(payload []byte,
// Check the round to ensure that the message is not a replay // Check the round to ensure that the message is not a replay
if id.Round(cm.RoundID) != round.ID { if id.Round(cm.RoundID) != round.ID {
jww.WARN.Printf("The round message %s send on %s referenced "+ jww.WARN.Printf("The round message %s send on %s referenced (%d) was "+
"(%d) was not the same as the round the message was found on (%d)", "not the same as the round the message was found on (%d)",
msgID, al.chID, cm.RoundID, round.ID) msgID, al.chID, cm.RoundID, round.ID)
return return
} }
// Replace the timestamp on the message if it is outside of the // Replace the timestamp on the message if it is outside the allowable range
// allowable range
ts := vetTimestamp(time.Unix(0, cm.LocalTimestamp), ts := vetTimestamp(time.Unix(0, cm.LocalTimestamp),
round.Timestamps[states.QUEUED], msgID) round.Timestamps[states.QUEUED], msgID)
......
...@@ -33,9 +33,9 @@ type triggerAdminEventDummy struct { ...@@ -33,9 +33,9 @@ type triggerAdminEventDummy struct {
} }
func (taed *triggerAdminEventDummy) triggerAdminEvent(chID *id.ID, func (taed *triggerAdminEventDummy) triggerAdminEvent(chID *id.ID,
cm *ChannelMessage, ts time.Time, messageID cryptoChannel.MessageID, cm *ChannelMessage, _ time.Time, messageID cryptoChannel.MessageID,
receptionID receptionID.EphemeralIdentity, round rounds.Round, receptionID receptionID.EphemeralIdentity, round rounds.Round,
status SentStatus) (uint64, error) { _ SentStatus) (uint64, error) {
taed.gotData = true taed.gotData = true
taed.chID = chID taed.chID = chID
...@@ -49,7 +49,6 @@ func (taed *triggerAdminEventDummy) triggerAdminEvent(chID *id.ID, ...@@ -49,7 +49,6 @@ func (taed *triggerAdminEventDummy) triggerAdminEvent(chID *id.ID,
// Tests the happy path. // Tests the happy path.
func TestAdminListener_Listen(t *testing.T) { func TestAdminListener_Listen(t *testing.T) {
// Build inputs // Build inputs
chID := &id.ID{} chID := &id.ID{}
chID[0] = 1 chID[0] = 1
...@@ -77,7 +76,9 @@ func TestAdminListener_Listen(t *testing.T) { ...@@ -77,7 +76,9 @@ func TestAdminListener_Listen(t *testing.T) {
al := adminListener{ al := adminListener{
chID: chID, chID: chID,
trigger: dummy.triggerAdminEvent, trigger: dummy.triggerAdminEvent,
checkSent: func(messageID cryptoChannel.MessageID, r rounds.Round) bool { return false }, checkSent: func(cryptoChannel.MessageID, rounds.Round) bool {
return false
},
} }
// Call the listener // Call the listener
...@@ -111,8 +112,7 @@ func TestAdminListener_Listen(t *testing.T) { ...@@ -111,8 +112,7 @@ func TestAdminListener_Listen(t *testing.T) {
// Tests that the message is rejected when the round it came on doesn't match // Tests that the message is rejected when the round it came on doesn't match
// the round in the channel message. // the round in the channel message.
func TestAdminListener_Listen_BadRound(t *testing.T) { func TestAdminListener_Listen_BadRound(t *testing.T) {
// Build inputs
// build inputs
chID := &id.ID{} chID := &id.ID{}
chID[0] = 1 chID[0] = 1
...@@ -138,18 +138,19 @@ func TestAdminListener_Listen_BadRound(t *testing.T) { ...@@ -138,18 +138,19 @@ func TestAdminListener_Listen_BadRound(t *testing.T) {
al := adminListener{ al := adminListener{
chID: chID, chID: chID,
trigger: dummy.triggerAdminEvent, trigger: dummy.triggerAdminEvent,
checkSent: func(messageID cryptoChannel.MessageID, r rounds.Round) bool { return false }, checkSent: func(cryptoChannel.MessageID, rounds.Round) bool {
return false
},
} }
// Call the listener // Call the listener
al.Listen(cmSerial, receptionID.EphemeralIdentity{}, r) al.Listen(cmSerial, receptionID.EphemeralIdentity{}, r)
// check the results // Check the results
if dummy.gotData { if dummy.gotData {
t.Fatalf("payload handled when it should have failed due to " + t.Fatal(
"a round issue") "Payload handled when it should have failed due to a round issue.")
} }
} }
// Tests that the message is rejected when the channel message is malformed. // Tests that the message is rejected when the channel message is malformed.
...@@ -170,7 +171,9 @@ func TestAdminListener_Listen_BadChannelMessage(t *testing.T) { ...@@ -170,7 +171,9 @@ func TestAdminListener_Listen_BadChannelMessage(t *testing.T) {
al := adminListener{ al := adminListener{
chID: chID, chID: chID,
trigger: dummy.triggerAdminEvent, trigger: dummy.triggerAdminEvent,
checkSent: func(messageID cryptoChannel.MessageID, r rounds.Round) bool { return false }, checkSent: func(cryptoChannel.MessageID, rounds.Round) bool {
return false
},
} }
// Call the listener // Call the listener
...@@ -181,14 +184,12 @@ func TestAdminListener_Listen_BadChannelMessage(t *testing.T) { ...@@ -181,14 +184,12 @@ func TestAdminListener_Listen_BadChannelMessage(t *testing.T) {
t.Fatalf("payload handled when it should have failed due to " + t.Fatalf("payload handled when it should have failed due to " +
"a malformed channel message") "a malformed channel message")
} }
} }
// Tests that the message is rejected when the sized broadcast message is // Tests that the message is rejected when the sized broadcast message is
// malformed. // malformed.
func TestAdminListener_Listen_BadSizedBroadcast(t *testing.T) { func TestAdminListener_Listen_BadSizedBroadcast(t *testing.T) {
// Build inputs
// build inputs
chID := &id.ID{} chID := &id.ID{}
chID[0] = 1 chID[0] = 1
...@@ -217,7 +218,9 @@ func TestAdminListener_Listen_BadSizedBroadcast(t *testing.T) { ...@@ -217,7 +218,9 @@ func TestAdminListener_Listen_BadSizedBroadcast(t *testing.T) {
al := adminListener{ al := adminListener{
chID: chID, chID: chID,
trigger: dummy.triggerAdminEvent, trigger: dummy.triggerAdminEvent,
checkSent: func(messageID cryptoChannel.MessageID, r rounds.Round) bool { return false }, checkSent: func(cryptoChannel.MessageID, rounds.Round) bool {
return false
},
} }
// Call the listener // Call the listener
......
...@@ -7,8 +7,8 @@ ...@@ -7,8 +7,8 @@
// Code generated by protoc-gen-go. DO NOT EDIT. // Code generated by protoc-gen-go. DO NOT EDIT.
// versions: // versions:
// protoc-gen-go v1.27.1 // protoc-gen-go v1.28.1
// protoc v3.15.6 // protoc v3.21.9
// source: channelMessages.proto // source: channelMessages.proto
package channels package channels
...@@ -44,17 +44,17 @@ type ChannelMessage struct { ...@@ -44,17 +44,17 @@ type ChannelMessage struct {
// Payload is the actual message payload. It will be processed differently // Payload is the actual message payload. It will be processed differently
// based on the PayloadType. // based on the PayloadType.
Payload []byte `protobuf:"bytes,4,opt,name=Payload,proto3" json:"Payload,omitempty"` Payload []byte `protobuf:"bytes,4,opt,name=Payload,proto3" json:"Payload,omitempty"`
// nickname is the name which the user is using for this message // nickname is the name which the user is using for this message it will not
// it will not be longer than 24 characters // be longer than 24 characters.
Nickname string `protobuf:"bytes,5,opt,name=Nickname,proto3" json:"Nickname,omitempty"` Nickname string `protobuf:"bytes,5,opt,name=Nickname,proto3" json:"Nickname,omitempty"`
// Nonce is 32 bits of randomness to ensure that two messages in the same // Nonce is 32 bits of randomness to ensure that two messages in the same
// round with that have the same nickname, payload, and lease will not have // round with that have the same nickname, payload, and lease will not have
// the same message ID. // the same message ID.
Nonce []byte `protobuf:"bytes,6,opt,name=Nonce,proto3" json:"Nonce,omitempty"` Nonce []byte `protobuf:"bytes,6,opt,name=Nonce,proto3" json:"Nonce,omitempty"`
// LocalTimestamp is the timestamp when the "send call" is made based upon the // LocalTimestamp is the timestamp when the "send call" is made based upon
// local clock. If this differs by more than 5 seconds +/- from when the round // the local clock. If this differs by more than 5 seconds +/- from when the
// it sent on is queued, then a random mutation on the queued time (+/- 200ms) // round it sent on is queued, then a random mutation on the queued time
// will be used by local clients instead // (+/- 200ms) will be used by local clients instead.
LocalTimestamp int64 `protobuf:"varint,7,opt,name=LocalTimestamp,proto3" json:"LocalTimestamp,omitempty"` LocalTimestamp int64 `protobuf:"varint,7,opt,name=LocalTimestamp,proto3" json:"LocalTimestamp,omitempty"`
} }
......
...@@ -28,8 +28,8 @@ message ChannelMessage{ ...@@ -28,8 +28,8 @@ message ChannelMessage{
// based on the PayloadType. // based on the PayloadType.
bytes Payload = 4; bytes Payload = 4;
// nickname is the name which the user is using for this message // nickname is the name which the user is using for this message it will not
// it will not be longer than 24 characters // be longer than 24 characters.
string Nickname = 5; string Nickname = 5;
// Nonce is 32 bits of randomness to ensure that two messages in the same // Nonce is 32 bits of randomness to ensure that two messages in the same
...@@ -37,10 +37,10 @@ message ChannelMessage{ ...@@ -37,10 +37,10 @@ message ChannelMessage{
// the same message ID. // the same message ID.
bytes Nonce = 6; bytes Nonce = 6;
// LocalTimestamp is the timestamp when the "send call" is made based upon the // LocalTimestamp is the timestamp when the "send call" is made based upon
// local clock. If this differs by more than 5 seconds +/- from when the round // the local clock. If this differs by more than 5 seconds +/- from when the
// it sent on is queued, then a random mutation on the queued time (+/- 200ms) // round it sent on is queued, then a random mutation on the queued time
// will be used by local clients instead // (+/- 200ms) will be used by local clients instead.
int64 LocalTimestamp = 7; int64 LocalTimestamp = 7;
} }
......
...@@ -16,8 +16,8 @@ import ( ...@@ -16,8 +16,8 @@ import (
"time" "time"
) )
// NewDummyNameService returns a dummy object adhering to the name service // NewDummyNameService returns a dummy object adhering to the name service. This
// This neither produces valid signatures nor validates passed signatures. // neither produces valid signatures nor validates passed signatures.
// //
// THIS IS FOR DEVELOPMENT AND DEBUGGING PURPOSES ONLY. // THIS IS FOR DEVELOPMENT AND DEBUGGING PURPOSES ONLY.
func NewDummyNameService(username string, rng io.Reader) (NameService, error) { func NewDummyNameService(username string, rng io.Reader) (NameService, error) {
...@@ -31,23 +31,23 @@ func NewDummyNameService(username string, rng io.Reader) (NameService, error) { ...@@ -31,23 +31,23 @@ func NewDummyNameService(username string, rng io.Reader) (NameService, error) {
lease: netTime.Now().Add(35 * 24 * time.Hour), lease: netTime.Now().Add(35 * 24 * time.Hour),
} }
//generate the private key // Generate the private key
var err error var err error
dns.public, dns.private, err = ed25519.GenerateKey(rng) dns.public, dns.private, err = ed25519.GenerateKey(rng)
if err != nil { if err != nil {
return nil, err return nil, err
} }
//generate a dummy user discover identity to produce a validation signature // Generate a dummy user discover identity to produce a validation signature
//just sign with our own key, it wont be evaluated anyhow // just sign with our own key, it will not be evaluated anyhow
dns.validationSig = channel.SignChannelLease(dns.public, dns.username, dns.validationSig = channel.SignChannelLease(dns.public, dns.username,
dns.lease, dns.private) dns.lease, dns.private)
return dns, nil return dns, nil
} }
// dummyNameService is a dummy NameService implementation. This is NOT meant // dummyNameService is a dummy NameService implementation. This is NOT meant for
// for use in production // use in production.
type dummyNameService struct { type dummyNameService struct {
private ed25519.PrivateKey private ed25519.PrivateKey
public ed25519.PublicKey public ed25519.PublicKey
...@@ -94,14 +94,14 @@ func (dns *dummyNameService) SignChannelMessage(message []byte) ( ...@@ -94,14 +94,14 @@ func (dns *dummyNameService) SignChannelMessage(message []byte) (
return sig, nil return sig, nil
} }
// ValidateChannelMessage will always return true, indicating the the channel // ValidateChannelMessage will always return true, indicating that the channel
// message is valid. This will ignore the passed in arguments. As a result, // message is valid. This will ignore the passed in arguments. As a result,
// these values may be dummy or precanned. // these values may be dummy or precanned.
// //
// THIS IS FOR DEVELOPMENT AND DEBUGGING PURPOSES ONLY. // THIS IS FOR DEVELOPMENT AND DEBUGGING PURPOSES ONLY.
func (dns *dummyNameService) ValidateChannelMessage(username string, lease time.Time, func (dns *dummyNameService) ValidateChannelMessage(
pubKey ed25519.PublicKey, authorIDSignature []byte) bool { string, time.Time, ed25519.PublicKey, []byte) bool {
//ignore the authorIDSignature // Ignore the authorIDSignature
jww.WARN.Printf("ValidateChannelMessage called on Dummy Name Service, " + jww.WARN.Printf("ValidateChannelMessage called on Dummy Name Service, " +
"no validation done - identity not validated. YOU SHOULD NEVER SEE " + "no validation done - identity not validated. YOU SHOULD NEVER SEE " +
"THIS MESSAGE IN PRODUCTION") "THIS MESSAGE IN PRODUCTION")
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment