Skip to content
Snippets Groups Projects
Commit 60abe762 authored by Josh Brooks's avatar Josh Brooks
Browse files

Merge branch 'XX-2474/ClientComms' into 'release'

Add new endpoints for client changes

Closes XX-2474

See merge request elixxir/comms!261
parents 7b218ca6 42d8416b
No related branches found
No related tags found
1 merge request!58Revert "Modify waiting lock"
...@@ -198,3 +198,90 @@ func (c *Comms) SendPoll(host *connect.Host, ...@@ -198,3 +198,90 @@ func (c *Comms) SendPoll(host *connect.Host,
result := &pb.GatewayPollResponse{} result := &pb.GatewayPollResponse{}
return result, ptypes.UnmarshalAny(resultMsg, result) return result, ptypes.UnmarshalAny(resultMsg, result)
} }
// Client -> Gateway Send Function
func (c *Comms) RequestHistoricalRounds(host *connect.Host,
message *pb.HistoricalRounds) (*pb.HistoricalRoundsResponse, error) {
// Create the Send Function
f := func(conn *grpc.ClientConn) (*any.Any, error) {
// Set up the context
ctx, cancel := connect.MessagingContext()
defer cancel()
// Send the message
resultMsg, err := pb.NewGatewayClient(conn).RequestHistoricalRounds(ctx, message)
if err != nil {
return nil, errors.New(err.Error())
}
return ptypes.MarshalAny(resultMsg)
}
// Execute the Send function
jww.DEBUG.Printf("Sending Poll message: %+v", message)
resultMsg, err := c.Send(host, f)
if err != nil {
return nil, err
}
// Marshall the result
result := &pb.HistoricalRoundsResponse{}
return result, ptypes.UnmarshalAny(resultMsg, result)
}
// Client -> Gateway Send Function
func (c *Comms) RequestMessages(host *connect.Host,
message *pb.GetMessages) (*pb.GetMessagesResponse, error) {
// Create the Send Function
f := func(conn *grpc.ClientConn) (*any.Any, error) {
// Set up the context
ctx, cancel := connect.MessagingContext()
defer cancel()
// Send the message
resultMsg, err := pb.NewGatewayClient(conn).RequestMessages(ctx, message)
if err != nil {
return nil, errors.New(err.Error())
}
return ptypes.MarshalAny(resultMsg)
}
// Execute the Send function
jww.DEBUG.Printf("Sending Poll message: %+v", message)
resultMsg, err := c.Send(host, f)
if err != nil {
return nil, err
}
// Marshall the result
result := &pb.GetMessagesResponse{}
return result, ptypes.UnmarshalAny(resultMsg, result)
}
// Client -> Gateway Send Function
func (c *Comms) RequestBloom(host *connect.Host,
message *pb.GetBloom) (*pb.GetBloomResponse, error) {
// Create the Send Function
f := func(conn *grpc.ClientConn) (*any.Any, error) {
// Set up the context
ctx, cancel := connect.MessagingContext()
defer cancel()
// Send the message
resultMsg, err := pb.NewGatewayClient(conn).RequestBloom(ctx, message)
if err != nil {
return nil, errors.New(err.Error())
}
return ptypes.MarshalAny(resultMsg)
}
// Execute the Send function
jww.DEBUG.Printf("Sending Poll message: %+v", message)
resultMsg, err := c.Send(host, f)
if err != nil {
return nil, err
}
// Marshall the result
result := &pb.GetBloomResponse{}
return result, ptypes.UnmarshalAny(resultMsg, result)
}
...@@ -154,3 +154,47 @@ func TestComms_SendPoll(t *testing.T) { ...@@ -154,3 +154,47 @@ func TestComms_SendPoll(t *testing.T) {
t.Errorf("SendPoll: Error received: %+v", err) t.Errorf("SendPoll: Error received: %+v", err)
} }
} }
// Smoke test RequestMessages
func TestComms_RequestMessages(t *testing.T) {
gatewayAddress := getNextAddress()
testID := id.NewIdFromString("test", id.Gateway, t)
gw := gateway.StartGateway(testID, gatewayAddress,
gateway.NewImplementation(), nil, nil)
defer gw.Shutdown()
var c Comms
var manager connect.Manager
host, err := manager.AddHost(testID, gatewayAddress, nil, false, false)
if err != nil {
t.Errorf("Unable to call NewHost: %+v", err)
}
_, err = c.RequestMessages(host,
&pb.GetMessages{})
if err != nil {
t.Errorf("SendPoll: Error received: %+v", err)
}
}
// Smoke test RequestHistoricalRounds
func TestComms_RequestHistoricalRounds(t *testing.T) {
gatewayAddress := getNextAddress()
testID := id.NewIdFromString("test", id.Gateway, t)
gw := gateway.StartGateway(testID, gatewayAddress,
gateway.NewImplementation(), nil, nil)
defer gw.Shutdown()
var c Comms
var manager connect.Manager
host, err := manager.AddHost(testID, gatewayAddress, nil, false, false)
if err != nil {
t.Errorf("Unable to call NewHost: %+v", err)
}
_, err = c.RequestBloom(host,
&pb.GetBloom{})
if err != nil {
t.Errorf("SendPoll: Error received: %+v", err)
}
}
...@@ -145,3 +145,18 @@ func (g *Comms) PollForNotifications(ctx context.Context, msg *messages.Authenti ...@@ -145,3 +145,18 @@ func (g *Comms) PollForNotifications(ctx context.Context, msg *messages.Authenti
func (g *Comms) Poll(ctx context.Context, msg *pb.GatewayPoll) (*pb.GatewayPollResponse, error) { func (g *Comms) Poll(ctx context.Context, msg *pb.GatewayPoll) (*pb.GatewayPollResponse, error) {
return g.handler.Poll(msg) return g.handler.Poll(msg)
} }
// Client -> Gateway historical round request
func (g *Comms) RequestHistoricalRounds(ctx context.Context, msg *pb.HistoricalRounds) (*pb.HistoricalRoundsResponse, error) {
return g.handler.RequestHistoricalRounds(msg)
}
// Client -> Gateway message request
func (g *Comms) RequestMessages(ctx context.Context, msg *pb.GetMessages) (*pb.GetMessagesResponse, error) {
return g.handler.RequestMessages(msg)
}
// Client -> Gateway bloom filter request
func (g *Comms) RequestBloom(ctx context.Context, msg *pb.GetBloom) (*pb.GetBloomResponse, error) {
return g.handler.RequestBloom(msg)
}
...@@ -25,7 +25,7 @@ type Handler interface { ...@@ -25,7 +25,7 @@ type Handler interface {
// Return any MessageIDs in the buffer for this UserID // Return any MessageIDs in the buffer for this UserID
CheckMessages(userID *id.ID, messageID string, ipAddress string) ([]string, error) CheckMessages(userID *id.ID, messageID string, ipAddress string) ([]string, error)
// Returns the message matching the given parameters to the client // Returns the message matching the given parameters to the client
GetMessage(userID *id.ID, msgID string, ipAddress string) (*pb.Slot, error) GetMessage(userID *id.ID, msgID string, ipAddress string) (*pb.Slot, error) // todo: depracate?
// Upload a message to the cMix Gateway // Upload a message to the cMix Gateway
PutMessage(message *pb.GatewaySlot, ipAddress string) (*pb.GatewaySlotResponse, error) PutMessage(message *pb.GatewaySlot, ipAddress string) (*pb.GatewaySlotResponse, error)
// Pass-through for Registration Nonce Communication // Pass-through for Registration Nonce Communication
...@@ -37,6 +37,12 @@ type Handler interface { ...@@ -37,6 +37,12 @@ type Handler interface {
PollForNotifications(auth *connect.Auth) ([]*id.ID, error) PollForNotifications(auth *connect.Auth) ([]*id.ID, error)
// Client -> Gateway unified polling // Client -> Gateway unified polling
Poll(msg *pb.GatewayPoll) (*pb.GatewayPollResponse, error) Poll(msg *pb.GatewayPoll) (*pb.GatewayPollResponse, error)
// Client -> Gateway historical round request
RequestHistoricalRounds(msg *pb.HistoricalRounds) (*pb.HistoricalRoundsResponse, error)
// Client -> Gateway message request
RequestMessages(msg *pb.GetMessages) (*pb.GetMessagesResponse, error)
// Client -> Gateway bloom request
RequestBloom(msg *pb.GetBloom) (*pb.GetBloomResponse, error)
} }
// Gateway object used to implement endpoints and top-level comms functionality // Gateway object used to implement endpoints and top-level comms functionality
...@@ -97,6 +103,12 @@ type implementationFunctions struct { ...@@ -97,6 +103,12 @@ type implementationFunctions struct {
PollForNotifications func(auth *connect.Auth) ([]*id.ID, error) PollForNotifications func(auth *connect.Auth) ([]*id.ID, error)
// Client -> Gateway unified polling // Client -> Gateway unified polling
Poll func(msg *pb.GatewayPoll) (*pb.GatewayPollResponse, error) Poll func(msg *pb.GatewayPoll) (*pb.GatewayPollResponse, error)
// Client -> Gateway historical round request
RequestHistoricalRounds func(msg *pb.HistoricalRounds) (*pb.HistoricalRoundsResponse, error)
// Client -> Gateway message request
RequestMessages func(msg *pb.GetMessages) (*pb.GetMessagesResponse, error)
// Client -> Gateway bloom request
RequestBloom func(msg *pb.GetBloom) (*pb.GetBloomResponse, error)
} }
// Implementation allows users of the client library to set the // Implementation allows users of the client library to set the
...@@ -142,6 +154,18 @@ func NewImplementation() *Implementation { ...@@ -142,6 +154,18 @@ func NewImplementation() *Implementation {
warn(um) warn(um)
return &pb.GatewayPollResponse{}, nil return &pb.GatewayPollResponse{}, nil
}, },
RequestHistoricalRounds: func(msg *pb.HistoricalRounds) (*pb.HistoricalRoundsResponse, error) {
warn(um)
return &pb.HistoricalRoundsResponse{}, nil
},
RequestMessages: func(msg *pb.GetMessages) (*pb.GetMessagesResponse, error) {
warn(um)
return &pb.GetMessagesResponse{}, nil
},
RequestBloom: func(msg *pb.GetBloom) (*pb.GetBloomResponse, error) {
warn(um)
return &pb.GetBloomResponse{}, nil
},
}, },
} }
} }
...@@ -184,3 +208,18 @@ func (s *Implementation) PollForNotifications(auth *connect.Auth) ([]*id.ID, err ...@@ -184,3 +208,18 @@ func (s *Implementation) PollForNotifications(auth *connect.Auth) ([]*id.ID, err
func (s *Implementation) Poll(msg *pb.GatewayPoll) (*pb.GatewayPollResponse, error) { func (s *Implementation) Poll(msg *pb.GatewayPoll) (*pb.GatewayPollResponse, error) {
return s.Functions.Poll(msg) return s.Functions.Poll(msg)
} }
// Client -> Gateway historical round request
func (s *Implementation) RequestHistoricalRounds(msg *pb.HistoricalRounds) (*pb.HistoricalRoundsResponse, error) {
return s.Functions.RequestHistoricalRounds(msg)
}
// Client -> Gateway historical round request
func (s *Implementation) RequestMessages(msg *pb.GetMessages) (*pb.GetMessagesResponse, error) {
return s.Functions.RequestMessages(msg)
}
// Client -> Gateway bloom request
func (s *Implementation) RequestBloom(msg *pb.GetBloom) (*pb.GetBloomResponse, error) {
return s.Functions.RequestBloom(msg)
}
This diff is collapsed.
...@@ -171,6 +171,52 @@ service Gateway { ...@@ -171,6 +171,52 @@ service Gateway {
// Client -> Gateway unified polling // Client -> Gateway unified polling
rpc Poll (GatewayPoll) returns (GatewayPollResponse) { rpc Poll (GatewayPoll) returns (GatewayPollResponse) {
} }
// Client -> Gateway historical round request
rpc RequestHistoricalRounds(HistoricalRounds) returns (HistoricalRoundsResponse) {
}
// Client -> Gateway message request
rpc RequestMessages(GetMessages) returns (GetMessagesResponse) {
}
// Client -> Gateway bloom request
rpc RequestBloom(GetBloom) returns (GetBloomResponse) {
}
}
// Client -> Gateway request for bloom filter for available messages
message GetBloom{
bytes ClientID = 1;
}
// Gateway's response to client's request for bloom filters
message GetBloomResponse{
repeated bytes Filters = 1;
}
// Client -> gateway request for information about historical rounds
message HistoricalRounds{
repeated uint64 rounds = 1;
}
// Gateway's response to client's request for previous (historical) rounds
message HistoricalRoundsResponse{
repeated RoundInfo Rounds = 1;
}
// Client -> gateway request for available messages
// The query will be a request for all messages
// available in a round.
message GetMessages{
bytes ClientID = 1;
bytes RoundID = 2;
}
// Gateway response to a GetMessages request
message GetMessagesResponse{
repeated Slot Messages = 1;
bool HasRound = 2;
} }
// ClientRequest message for clients to poll new CMIX messages // ClientRequest message for clients to poll new CMIX messages
...@@ -208,15 +254,15 @@ message Slot { ...@@ -208,15 +254,15 @@ message Slot {
message GatewayPoll { message GatewayPoll {
NDFHash Partial = 1; NDFHash Partial = 1;
uint64 LastUpdate = 2; uint64 LastUpdate = 2;
string LastMessageID = 3; string LastMessageID = 3; // fixme: depracate this field
} }
// Unified Client->Gateway polling response // Unified Client->Gateway polling response
message GatewayPollResponse { message GatewayPollResponse {
NDF PartialNDF = 1; // Empty if no update needed NDF PartialNDF = 1; // Empty if no update needed
RoundInfo LastRound = 2; // Last round the gateway has messages from RoundInfo LastRound = 2; // Last round the gateway has messages from // fixme: depracate this field
repeated RoundInfo Updates = 3; // Empty if no update needed repeated RoundInfo Updates = 3; // Empty if no update needed
repeated string NewMessageIDs = 4; // Empty if no update needed repeated string NewMessageIDs = 4; // Empty if no update needed // fixme: depracate this field
} }
// Client -> Gateway authentication message // Client -> Gateway authentication message
......
...@@ -12,8 +12,8 @@ package dataStructures ...@@ -12,8 +12,8 @@ package dataStructures
import ( import (
"github.com/pkg/errors" "github.com/pkg/errors"
"gitlab.com/elixxir/comms/mixmessages" "gitlab.com/elixxir/comms/mixmessages"
"gitlab.com/xx_network/ring"
"gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/id"
"gitlab.com/xx_network/ring"
) )
const RoundInfoBufLen = 500 const RoundInfoBufLen = 500
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment