Skip to content
Snippets Groups Projects

Tls websockets

Open
Jonah Hussonrequested to merge
tls-websockets into release
6 open threads
4 files
+ 15
16
Compare changes
  • Side-by-side
  • Inline

Files

+ 8
6
@@ -159,7 +159,7 @@ var NewUnary = func(host string, opts *ConnectOptions) UnaryTransport {
@@ -159,7 +159,7 @@ var NewUnary = func(host string, opts *ConnectOptions) UnaryTransport {
}
}
}
}
type ClientStreamTransport interface {
type WebsocketStreamingTransport interface {
Header() (http.Header, error)
Header() (http.Header, error)
Trailer() http.Header
Trailer() http.Header
@@ -200,6 +200,7 @@ type webSocketTransport struct {
@@ -200,6 +200,7 @@ type webSocketTransport struct {
}
}
func (t *webSocketTransport) Header() (http.Header, error) {
func (t *webSocketTransport) Header() (http.Header, error) {
 
// Try to read headers if we haven't already
if t.header == nil {
if t.header == nil {
t.resOnce.Do(t.receiveHeaders)
t.resOnce.Do(t.receiveHeaders)
}
}
@@ -306,6 +307,8 @@ func (t *webSocketTransport) Receive(context.Context) (_ io.ReadCloser, err erro
@@ -306,6 +307,8 @@ func (t *webSocketTransport) Receive(context.Context) (_ io.ReadCloser, err erro
return res, nil
return res, nil
}
}
 
// receiveHeaders should be called through resOnce.Do.
 
// It is used to retreive streaming headers if they have not been read.
func (t *webSocketTransport) receiveHeaders() {
func (t *webSocketTransport) receiveHeaders() {
ctx := context.Background()
ctx := context.Background()
var err error
var err error
@@ -338,8 +341,7 @@ func (t *webSocketTransport) receiveHeaders() {
@@ -338,8 +341,7 @@ func (t *webSocketTransport) receiveHeaders() {
func (t *webSocketTransport) CloseSend() error {
func (t *webSocketTransport) CloseSend() error {
// 0x01 means the finish send frame.
// 0x01 means the finish send frame.
// ref. transports/websocket/websocket.ts
// ref. transports/websocket/websocket.ts
t.writeMessage(int(websocket.MessageBinary), []byte{0x01})
return t.writeMessage(int(websocket.MessageBinary), []byte{0x01})
return nil
}
}
func (t *webSocketTransport) Close() error {
func (t *webSocketTransport) Close() error {
@@ -352,7 +354,6 @@ func (t *webSocketTransport) Close() error {
@@ -352,7 +354,6 @@ func (t *webSocketTransport) Close() error {
return err
return err
}
}
t.closed = true
t.closed = true
// Close the WebSocket connection.
return nil
return nil
}
}
@@ -362,14 +363,15 @@ func (t *webSocketTransport) writeMessage(msg int, b []byte) error {
@@ -362,14 +363,15 @@ func (t *webSocketTransport) writeMessage(msg int, b []byte) error {
return t.conn.Write(context.Background(), websocket.MessageType(msg), b)
return t.conn.Write(context.Background(), websocket.MessageType(msg), b)
}
}
var NewClientStream = func(host, endpoint string, opts *ConnectOptions) (ClientStreamTransport, error) {
var NewClientStream = func(host, endpoint string, opts *ConnectOptions) (WebsocketStreamingTransport, error) {
// TODO: WebSocket over TLS support.
h := http.Header{}
h := http.Header{}
h.Set("Sec-WebSocket-Protocol", "grpc-websockets")
h.Set("Sec-WebSocket-Protocol", "grpc-websockets")
var conn *websocket.Conn
var conn *websocket.Conn
dialer := &websocket.DialOptions{}
dialer := &websocket.DialOptions{}
dialer.HTTPClient = http.DefaultClient
dialer.HTTPClient = http.DefaultClient
 
// Set weebsocket dialer http header
dialer.HTTPHeader = h
dialer.HTTPHeader = h
 
// Set websocket dialer subprotocol
dialer.Subprotocols = []string{"grpc-websockets"}
dialer.Subprotocols = []string{"grpc-websockets"}
scheme := "ws"
scheme := "ws"
if opts.WithTLS {
if opts.WithTLS {
Loading