diff --git a/single/listener.go b/single/listener.go index 869cbac443b93baf6fed142f89653a7f88113987..138eb86f02322bd269a324001e80b2822413eb2d 100644 --- a/single/listener.go +++ b/single/listener.go @@ -89,28 +89,28 @@ func (l *listener) Process(ecrMsg format.Message, func (l *listener) process(ecrMsg format.Message, receptionID receptionID.EphemeralIdentity, round rounds.Round) error { // Unmarshal the cMix message contents to a request message - requestMsg, err := message.UnmarshalRequest(ecrMsg.GetContents(), + request, err := message.UnmarshalRequest(ecrMsg.GetContents(), l.grp.GetP().ByteLen()) if err != nil { return errors.Errorf("could not unmarshal contents: %+v", err) } // Generate DH key and symmetric key - senderPubkey := requestMsg.GetPubKey(l.grp) + senderPubkey := request.GetPubKey(l.grp) dhKey := l.grp.Exp(senderPubkey, l.myPrivKey, l.grp.NewInt(1)) key := singleUse.NewRequestKey(dhKey) // Verify the MAC - if !singleUse.VerifyMAC(key, requestMsg.GetPayload(), ecrMsg.GetMac()) { + if !singleUse.VerifyMAC(key, request.GetPayload(), ecrMsg.GetMac()) { return errors.New("failed to verify MAC") } // Decrypt the request message payload fp := ecrMsg.GetKeyFP() - decryptedPayload := cAuth.Crypt(key, fp[:24], requestMsg.GetPayload()) + decryptedPayload := cAuth.Crypt(key, fp[:24], request.GetPayload()) // Unmarshal payload - payload, err := message.UnmarshalRequestPayload(decryptedPayload) + requestPayload, err := message.UnmarshalRequestPayload(decryptedPayload) if err != nil { return errors.Errorf("could not unmarshal decrypted payload: %+v", err) } @@ -118,11 +118,11 @@ func (l *listener) process(ecrMsg format.Message, cbFunc := func(payloadContents []byte, rounds []rounds.Round) { used := uint32(0) r := Request{ - sender: payload.GetRecipientID(requestMsg.GetPubKey(l.grp)), + sender: requestPayload.GetRecipientID(request.GetPubKey(l.grp)), senderPubKey: senderPubkey, dhKey: dhKey, tag: l.tag, - maxParts: payload.GetMaxResponseParts(), + maxParts: requestPayload.GetMaxResponseParts(), used: &used, requestPayload: payloadContents, net: l.net, @@ -131,9 +131,9 @@ func (l *listener) process(ecrMsg format.Message, go l.cb.Callback(&r, receptionID, rounds) } - if numParts := payload.GetNumParts(); numParts > 1 { + if numParts := requestPayload.GetNumRequestParts(); numParts > 1 { c := message.NewCollator(numParts) - _, _, err = c.Collate(payload) + _, _, err = c.Collate(requestPayload) if err != nil { return errors.Errorf("could not collate initial payload: %+v", err) } @@ -143,8 +143,6 @@ func (l *listener) process(ecrMsg format.Message, ridCollector := newRoundIdCollector(int(numParts)) for i, cy := range cyphers { - key = singleUse.NewRequestPartKey(dhKey, uint64(i+1)) - fp = singleUse.NewRequestPartFingerprint(dhKey, uint64(i+1)) p := &requestPartProcessor{ myId: l.myID, tag: l.tag, @@ -154,7 +152,7 @@ func (l *listener) process(ecrMsg format.Message, roundIDs: ridCollector, } - err = l.net.AddFingerprint(l.myID, fp, p) + err = l.net.AddFingerprint(l.myID, cy.getFingerprint(), p) if err != nil { return errors.Errorf("could not add fingerprint for single-"+ "use request part %d of %d: %+v", i, numParts, err) @@ -163,7 +161,7 @@ func (l *listener) process(ecrMsg format.Message, l.net.CheckInProgressMessages() } else { - cbFunc(payload.GetContents(), []rounds.Round{round}) + cbFunc(requestPayload.GetContents(), []rounds.Round{round}) } return nil diff --git a/single/listener_test.go b/single/listener_test.go index 366907e527492c921d382bebead2499cf7d7a18e..f6f6bed68b4c5f1fe9f1a3e5849e73df2579e0ee 100644 --- a/single/listener_test.go +++ b/single/listener_test.go @@ -124,9 +124,10 @@ func Test_listener_Process(t *testing.T) { } // newRequestMessage creates a new encrypted request message for testing. -func newRequestMessage(payload []byte, grp *cyclic.Group, recipient contact.Contact, - rng io.Reader, handler *mockListenCmixHandler, t *testing.T) ( - format.Message, receptionID.EphemeralIdentity, *cyclic.Int, *cyclic.Int) { +func newRequestMessage(payload []byte, grp *cyclic.Group, + recipient contact.Contact, rng io.Reader, handler *mockListenCmixHandler, + t *testing.T) (format.Message, receptionID.EphemeralIdentity, *cyclic.Int, + *cyclic.Int) { net := newMockListenCmix(handler) maxResponseMessages := uint8(6) diff --git a/single/message/collator_test.go b/single/message/collator_test.go index 617697db480b6dd6f6375757b326952db17021f9..e45d169741e9b510164157cbaee2835ca53157de 100644 --- a/single/message/collator_test.go +++ b/single/message/collator_test.go @@ -127,4 +127,6 @@ type mockPart struct { func (m mockPart) GetNumParts() uint8 { return m.numParts } func (m mockPart) GetPartNum() uint8 { return m.partNum } func (m mockPart) GetContents() []byte { return m.contents } -func (m mockPart) Marshal() []byte { return append([]byte{m.numParts, m.partNum}, m.contents...) } +func (m mockPart) Marshal() []byte { + return append([]byte{m.numParts, m.partNum}, m.contents...) +} diff --git a/single/message/request.go b/single/message/request.go index 3519902e75b6804df0e4ca0108b396e5af79863b..f20c36a7fe025bd0ec0a8490a31f4e5bfa38b426 100644 --- a/single/message/request.go +++ b/single/message/request.go @@ -220,7 +220,6 @@ func mapRequestPayload(data []byte) RequestPayload { size: data[nonceSize+numRequestPartsSize+maxResponsePartsSize : requestMinSize], contents: data[requestMinSize:], } - mp.numRequestParts[0] = 1 return mp } diff --git a/single/message/request_test.go b/single/message/request_test.go index 8bdc81cf91acd357db9d471bffc2235fe34ef2d2..3a9120f11883fd19af2630e14cb8a0603a0a2aa0 100644 --- a/single/message/request_test.go +++ b/single/message/request_test.go @@ -194,7 +194,6 @@ func TestNewRequestPayload(t *testing.T) { size: make([]byte, sizeSize), contents: make([]byte, payloadSize-requestMinSize), } - expected.numRequestParts[0] = 1 binary.BigEndian.PutUint16(expected.size, uint16(payloadSize-requestMinSize)) expected.SetMaxResponseParts(10) expected.data = append(expected.nonce, diff --git a/single/params.go b/single/params.go new file mode 100644 index 0000000000000000000000000000000000000000..10d60a908022f6d7c0e4857133a8e7a7b8444306 --- /dev/null +++ b/single/params.go @@ -0,0 +1,43 @@ +//////////////////////////////////////////////////////////////////////////////// +// Copyright © 2020 xx network SEZC // +// // +// Use of this source code is governed by a license that can be found in the // +// LICENSE file // +//////////////////////////////////////////////////////////////////////////////// + +package single + +import ( + "gitlab.com/elixxir/client/cmix" + "time" +) + +// Default values. +const ( + defaultRequestTimeout = 30 * time.Second + defaultMaxResponseMessages = 255 +) + +// RequestParams contains configurable parameters for sending a single-use +// request message. +type RequestParams struct { + // Timeout is the duration to wait before timing out while sending a request + Timeout time.Duration + + // MaxResponseMessages is the maximum number of messages allowed in the + // response to the request + MaxResponseMessages uint8 + + // CmixParams is the parameters used in sending a cMix message + CmixParams cmix.CMIXParams +} + +// GetDefaultRequestParams returns a RequestParams with the default +// configuration. +func GetDefaultRequestParams() RequestParams { + return RequestParams{ + Timeout: defaultRequestTimeout, + MaxResponseMessages: defaultMaxResponseMessages, + CmixParams: cmix.GetDefaultCMIXParams(), + } +} diff --git a/single/params_test.go b/single/params_test.go new file mode 100644 index 0000000000000000000000000000000000000000..39f56c80c888e1496dffebb2b9f60d8951cd055e --- /dev/null +++ b/single/params_test.go @@ -0,0 +1,32 @@ +//////////////////////////////////////////////////////////////////////////////// +// Copyright © 2020 xx network SEZC // +// // +// Use of this source code is governed by a license that can be found in the // +// LICENSE file // +//////////////////////////////////////////////////////////////////////////////// + +package single + +import ( + "gitlab.com/elixxir/client/cmix" + "reflect" + "testing" +) + +// Tests that GetDefaultRequestParams returns a RequestParams with the expected +// default values. +func TestGetDefaultRequestParams(t *testing.T) { + expected := RequestParams{ + Timeout: defaultRequestTimeout, + MaxResponseMessages: defaultMaxResponseMessages, + CmixParams: cmix.GetDefaultCMIXParams(), + } + + params := GetDefaultRequestParams() + params.CmixParams.Stop = expected.CmixParams.Stop + + if !reflect.DeepEqual(expected, params) { + t.Errorf("Failed to get expected default params."+ + "\nexpected: %+v\nreceived: %+v", expected, params) + } +} diff --git a/single/receivedRequest.go b/single/receivedRequest.go index cb445e8b0a02b15d6458dd824a3cdd478d1167f3..6f81f17e7c0832a4addea0ed8c8f155560b2ab7f 100644 --- a/single/receivedRequest.go +++ b/single/receivedRequest.go @@ -22,6 +22,15 @@ import ( "time" ) +// Error messages. +const ( + // Request.Respond + errSendResponse = "%d responses failed to send, the response will be handleable and will time out" + errUsed = "cannot respond to single-use response that has already been responded to" + errMaxResponseLength = "length of provided payload %d greater than max payload %d" + errTrackResults = "tracking results of %d rounds: %d round failures, %d round event time-outs; the send cannot be retried." +) + // Request contains the information contained in a single-use request message. type Request struct { sender *id.ID // ID of the sender/ID to send response to @@ -54,15 +63,13 @@ func (r *Request) Respond(payload []byte, cMixParams cmix.CMIXParams, // Make sure this has only been run once newRun := atomic.CompareAndSwapUint32(r.used, 0, 1) if !newRun { - return nil, errors.Errorf("cannot respond to single-use response " + - "that has already been responded to") + return nil, errors.New(errUsed) } // Check that the payload is not too long if len(payload) > r.GetMaxResponseLength() { - return nil, errors.Errorf("length of provided payload too long for "+ - "message payload capacity, max: %d, received: %d", - r.GetMaxResponseLength(), len(payload)) + return nil, errors.Errorf( + errMaxResponseLength, len(payload), r.GetMaxResponseLength()) } // Partition the payload @@ -74,21 +81,17 @@ func (r *Request) Respond(payload []byte, cMixParams cmix.CMIXParams, rounds := make([]id.Round, len(parts)) sendResults := make(chan ds.EventReturn, len(parts)) - if cMixParams.DebugTag != cmix.DefaultDebugTag { - cMixParams.DebugTag = "single.Response" - } - - // fixme: should the above debug tag and the below service tag be flipped?? - svc := cmixMsg.Service{ - Identifier: r.dhKey.Bytes(), - Tag: "single.response-dummyService", - Metadata: nil, + if cMixParams.DebugTag == cmix.DefaultDebugTag || cMixParams.DebugTag == "" { + cMixParams.DebugTag = "single-use.Response" } var wg sync.WaitGroup wg.Add(len(parts)) failed := uint32(0) + jww.INFO.Printf("[SU] Sending single-use response cMix message with %d "+ + "parts to %s (%s)", len(parts), r.sender, r.tag) + for i := 0; i < len(parts); i++ { go func(i int, part []byte) { defer wg.Done() @@ -96,16 +99,18 @@ func (r *Request) Respond(payload []byte, cMixParams cmix.CMIXParams, // Send Message round, ephID, err := r.net.Send( - r.sender, partFP, svc, ecrPart, mac, cMixParams) + r.sender, partFP, cmixMsg.Service{}, ecrPart, mac, cMixParams) if err != nil { atomic.AddUint32(&failed, 1) jww.ERROR.Printf("[SU] Failed to send single-use response "+ - "cMix message part %d of %d: %+v", i, len(parts), err) + "cMix message part %d of %d to %s (%s): %+v", + i, len(parts), r.sender, r.tag, err) return } jww.DEBUG.Printf("[SU] Sent single-use response cMix message part "+ - "%d on round %d to address ID %d.", i, round, ephID.Int64()) + "%d of %d on round %d to %s (eph ID %d) (%s).", + i, len(parts), round, r.sender, ephID.Int64(), r.tag) rounds[i] = round r.net.GetInstance().GetRoundEvents().AddRoundEventChan( @@ -117,12 +122,11 @@ func (r *Request) Respond(payload []byte, cMixParams cmix.CMIXParams, wg.Wait() if failed > 0 { - return nil, errors.Errorf("One or more send failed for the " + - "response, the response will be handleable and will time out") + return nil, errors.Errorf(errSendResponse, failed) } - jww.DEBUG.Printf("Sent %d single-use response CMIX messages to %s.", - len(parts), r.sender) + jww.INFO.Printf("[SU] Sent single-use response cMix message with %d "+ + "parts to %s (%s).", len(parts), r.sender, r.tag) // Count the number of rounds roundMap := map[id.Round]struct{}{} @@ -136,12 +140,11 @@ func (r *Request) Respond(payload []byte, cMixParams cmix.CMIXParams, success, numRoundFail, numTimeOut := cmix.TrackResults( sendResults, len(roundMap)) if !success { - return nil, errors.Errorf("tracking results of %d rounds: %d round "+ - "failures, %d round event time outs; the send cannot be retried.", - len(rounds), numRoundFail, numTimeOut) + return nil, errors.Errorf( + errTrackResults, len(rounds), numRoundFail, numTimeOut) } - jww.DEBUG.Printf("Tracked %d single-use response message round(s).", + jww.DEBUG.Printf("[SU] Tracked %d single-use response message round(s).", len(roundMap)) return rounds, nil @@ -225,8 +228,9 @@ func splitPayload(payload []byte, maxSize, maxParts int) [][]byte { return parts } -// BuildTestRequest can be used for mocking a Request -func BuildTestRequest(payload []byte, t *testing.T) *Request { +// BuildTestRequest can be used for mocking a Request. Should only be used for +// tests. +func BuildTestRequest(payload []byte, _ testing.TB) *Request { return &Request{ sender: nil, senderPubKey: nil, diff --git a/single/request.go b/single/request.go index 6cb079652cca7e72a82c2c1f623802a5ff19f64a..ec068aebc084f1d682a82e561c6a6b17ead1f2af 100644 --- a/single/request.go +++ b/single/request.go @@ -19,6 +19,7 @@ import ( "gitlab.com/xx_network/primitives/netTime" "io" "sync" + "sync/atomic" "time" ) @@ -28,20 +29,6 @@ type Response interface { rounds []rounds.Round, err error) } -type RequestParams struct { - Timeout time.Duration - MaxResponseMessages uint8 - CmixParam cmix.CMIXParams -} - -func GetDefaultRequestParams() RequestParams { - return RequestParams{ - Timeout: 30 * time.Second, - MaxResponseMessages: 255, - CmixParam: cmix.GetDefaultCMIXParams(), - } -} - // Error messages. const ( // TransmitRequest @@ -51,7 +38,7 @@ const ( errMakeIDs = "failed to generate IDs (%s for %s): %+v" errAddFingerprint = "failed to add fingerprint %d of %d: %+v (%s for %s)" errSendRequest = "failed to send %s request to %s: %+v" - errSendRequestPart = "failed to send request part %d of %d (%s for %s): %+v" + errSendRequestPart = "%d requests failed to send, the request will be handleable and will time out" // generateDhKeys errGenerateInGroup = "failed to generate private key in group: %+v" @@ -69,8 +56,8 @@ const maxNumRequestParts = 255 // GetMaxRequestSize returns the maximum size of a request payload. func GetMaxRequestSize(net Cmix, e2eGrp *cyclic.Group) int { - payloadSize := message.GetRequestPayloadSize(net.GetMaxMessageLength(), - e2eGrp.GetP().ByteLen()) + payloadSize := message.GetRequestPayloadSize( + net.GetMaxMessageLength(), e2eGrp.GetP().ByteLen()) requestSize := message.GetRequestContentsSize(payloadSize) requestPartSize := message.GetRequestPartContentsSize( net.GetMaxMessageLength()) @@ -96,7 +83,7 @@ func GetMaxRequestSize(net Cmix, e2eGrp *cyclic.Group) int { // // The network follower must be running and healthy to transmit. func TransmitRequest(recipient contact.Contact, tag string, payload []byte, - callback Response, param RequestParams, net Cmix, rng csprng.Source, + responseCB Response, params RequestParams, net Cmix, rng csprng.Source, e2eGrp *cyclic.Group) ([]id.Round, receptionID.EphemeralIdentity, error) { if len(payload) > GetMaxRequestSize(net, e2eGrp) { @@ -132,12 +119,13 @@ func TransmitRequest(recipient contact.Contact, tag string, payload []byte, request := message.NewRequest( net.GetMaxMessageLength(), e2eGrp.GetP().ByteLen()) requestPayload := message.NewRequestPayload( - request.GetPayloadSize(), firstPart, param.MaxResponseMessages) + request.GetPayloadSize(), firstPart, params.MaxResponseMessages) + requestPayload.SetNumRequestParts(1 + uint8(len(parts))) // Generate new user ID and address ID var sendingID receptionID.EphemeralIdentity requestPayload, sendingID, err = makeIDs( - requestPayload, publicKey, addressSize, param.Timeout, timeStart, rng) + requestPayload, publicKey, addressSize, params.Timeout, timeStart, rng) if err != nil { return nil, receptionID.EphemeralIdentity{}, errors.Errorf(errMakeIDs, tag, recipient, err) @@ -156,7 +144,7 @@ func TransmitRequest(recipient contact.Contact, tag string, payload []byte, request.SetPayload(encryptedPayload) // Register the response pickup - collator := message.NewCollator(param.MaxResponseMessages) + collator := message.NewCollator(params.MaxResponseMessages) timeoutKillChan := make(chan bool) var callbackOnce sync.Once wrapper := func(payload []byte, receptionID receptionID.EphemeralIdentity, @@ -168,11 +156,11 @@ func TransmitRequest(recipient contact.Contact, tag string, payload []byte, callbackOnce.Do(func() { net.DeleteClientFingerprints(sendingID.Source) - go callback.Callback(payload, receptionID, rounds, err) + go responseCB.Callback(payload, receptionID, rounds, err) }) } - cyphers := makeCyphers(dhKey, param.MaxResponseMessages, + cyphers := makeCyphers(dhKey, params.MaxResponseMessages, singleUse.NewResponseKey, singleUse.NewResponseFingerprint) roundIds := newRoundIdCollector(len(cyphers)) @@ -195,7 +183,7 @@ func TransmitRequest(recipient contact.Contact, tag string, payload []byte, } } - net.AddIdentity(sendingID.Source, timeStart.Add(param.Timeout), false) + net.AddIdentity(sendingID.Source, timeStart.Add(params.Timeout), false) // Send the payload svc := cmixMsg.Service{ @@ -203,36 +191,74 @@ func TransmitRequest(recipient contact.Contact, tag string, payload []byte, Tag: tag, Metadata: nil, } - param.CmixParam.Timeout = param.Timeout + params.CmixParams.Timeout = params.Timeout + if params.CmixParams.DebugTag == cmix.DefaultDebugTag || + params.CmixParams.DebugTag == "" { + params.CmixParams.DebugTag = "single-use.Request" + } - rid, _, err := net.Send( - recipient.ID, fp, svc, request.Marshal(), mac, param.CmixParam) + jww.INFO.Printf("[SU] Sending single-use request cMix message with %d "+ + "parts to %s (%s).", 1+len(parts), recipient.ID, tag) + + rid, ephID, err := net.Send( + recipient.ID, fp, svc, request.Marshal(), mac, params.CmixParams) if err != nil { return nil, receptionID.EphemeralIdentity{}, errors.Errorf(errSendRequest, tag, recipient, err) } + jww.DEBUG.Printf("[SU] Sent single-use request cMix message part "+ + "%d of %d on round %d to %s (eph ID %d) (%s).", + 0, len(parts)+1, rid, recipient.ID, ephID.Int64(), tag) + + var wg sync.WaitGroup + wg.Add(len(parts)) + failed := uint32(0) + roundIDs := make([]id.Round, len(parts)+1) roundIDs[0] = rid for i, part := range parts { - requestPart := message.NewRequestPart(net.GetMaxMessageLength()) - requestPart.SetPartNum(uint8(i + 1)) - requestPart.SetContents(part) + go func(i int, part []byte) { + defer wg.Done() + requestPart := message.NewRequestPart(net.GetMaxMessageLength()) + requestPart.SetPartNum(uint8(i)) + requestPart.SetContents(part) + + key := singleUse.NewRequestPartKey(dhKey, uint64(i)) + fp := singleUse.NewRequestPartFingerprint(dhKey, uint64(i)) + encryptedPayload := auth.Crypt(key, fp[:24], requestPart.Marshal()) + mac := singleUse.MakeMAC(key, encryptedPayload) + + var ephID ephemeral.Id + var err error + roundIDs[i], ephID, err = net.Send(recipient.ID, fp, + cmixMsg.Service{}, encryptedPayload, mac, params.CmixParams) + if err != nil { + atomic.AddUint32(&failed, 1) + jww.ERROR.Printf("[SU] Failed to send single-use request "+ + "cMix message part %d of %d to %s (%s): %+v", + i, len(part)+1, recipient.ID, tag, err) + return + } + + jww.DEBUG.Printf("[SU] Sent single-use request cMix message part "+ + "%d of %d on round %d to %s (eph ID %d) (%s).", i, + len(parts)+1, roundIDs[i], recipient.ID, ephID.Int64(), tag) + }(i+1, part) + } - key = singleUse.NewResponseKey(dhKey, uint64(i+1)) - fp = singleUse.NewResponseFingerprint(dhKey, uint64(i+1)) - encryptedPayload = auth.Crypt(key, fp[:24], requestPart.Marshal()) - mac = singleUse.MakeMAC(key, encryptedPayload) + // Wait for all go routines to finish + wg.Wait() - roundIDs[i+1], _, err = net.Send( - recipient.ID, fp, svc, encryptedPayload, mac, param.CmixParam) - if err != nil { - return nil, receptionID.EphemeralIdentity{}, errors.Errorf( - errSendRequestPart, i, len(part), tag, recipient, err) - } + if failed > 0 { + return nil, receptionID.EphemeralIdentity{}, + errors.Errorf(errSendRequestPart, failed) } - remainingTimeout := param.Timeout - netTime.Since(timeStart) + jww.INFO.Printf("[SU] Sent single-use request cMix message with %d "+ + "parts to %s (%s).", 1+len(parts), recipient.ID, tag) + + remainingTimeout := params.Timeout - netTime.Since(timeStart) go waitForTimeout(timeoutKillChan, wrapper, remainingTimeout) return []id.Round{rid}, sendingID, nil @@ -294,8 +320,8 @@ func makeIDs(payload message.RequestPayload, publicKey *cyclic.Int, } jww.INFO.Printf("[SU] Generated singe-use sender reception ID: %s, "+ - "ephId: %d, publicKey: %x, payload: %q", - rid, ephID.Int64(), publicKey.Bytes(), payload) + "ephId: %d, publicKey: %s, payload: %q", + rid, ephID.Int64(), publicKey.Text(10), payload) return payload, receptionID.EphemeralIdentity{ EphId: ephID, @@ -310,12 +336,8 @@ func waitForTimeout(kill chan bool, cb callbackWrapper, timeout time.Duration) { case <-kill: return case <-time.After(timeout): - cb( - nil, - receptionID.EphemeralIdentity{}, - nil, - errors.Errorf(errResponseTimeout, timeout), - ) + cb(nil, receptionID.EphemeralIdentity{}, nil, + errors.Errorf(errResponseTimeout, timeout)) } } diff --git a/single/requestPartProcessor.go b/single/requestPartProcessor.go index 55575023ebcb62dad92c10d3a0e5045191c27ba5..8649237e539e6892c3e7fcdea9bc13c961b71fa4 100644 --- a/single/requestPartProcessor.go +++ b/single/requestPartProcessor.go @@ -34,21 +34,21 @@ func (rpp *requestPartProcessor) Process(msg format.Message, decrypted, err := rpp.cy.decrypt(msg.GetContents(), msg.GetMac()) if err != nil { jww.ERROR.Printf("[SU] Failed to decrypt single-use request payload "+ - "for %s to %s: %+v", rpp.tag, rpp.myId, err) + "%d (%s): %+v", rpp.cy.num, rpp.tag, err) return } requestPart, err := message.UnmarshalRequestPart(decrypted) if err != nil { jww.ERROR.Printf("[SU] Failed to unmarshal single-use request part "+ - "payload for %s to %s: %+v", rpp.tag, rpp.myId, err) + "payload %d (%s): %+v", rpp.cy.num, rpp.tag, err) return } payload, done, err := rpp.c.Collate(requestPart) if err != nil { jww.ERROR.Printf("[SU] Failed to collate single-use request payload "+ - "for %s to %s: %+v", rpp.tag, rpp.myId, err) + "%d (%s): %+v", rpp.cy.num, rpp.tag, err) return } diff --git a/single/request_test.go b/single/request_test.go new file mode 100644 index 0000000000000000000000000000000000000000..7e2847324e140fba5e36320c6a22c01650cfac04 --- /dev/null +++ b/single/request_test.go @@ -0,0 +1,204 @@ +//////////////////////////////////////////////////////////////////////////////// +// Copyright © 2020 xx network SEZC // +// // +// Use of this source code is governed by a license that can be found in the // +// LICENSE file // +//////////////////////////////////////////////////////////////////////////////// + +package single + +import ( + "bytes" + "fmt" + jww "github.com/spf13/jwalterweatherman" + "gitlab.com/elixxir/client/cmix" + "gitlab.com/elixxir/client/cmix/identity/receptionID" + "gitlab.com/elixxir/client/cmix/rounds" + "gitlab.com/elixxir/client/single/message" + "gitlab.com/elixxir/crypto/contact" + dh "gitlab.com/elixxir/crypto/diffieHellman" + "gitlab.com/elixxir/crypto/fastRNG" + "gitlab.com/xx_network/crypto/csprng" + "gitlab.com/xx_network/primitives/id" + "reflect" + "testing" + "time" +) + +func TestGetMaxRequestSize(t *testing.T) { +} + +type mockResponse struct { + payloadChan chan []byte +} + +func (m mockResponse) Callback( + payload []byte, _ receptionID.EphemeralIdentity, _ []rounds.Round, _ error) { + m.payloadChan <- payload +} + +type mockReceiver struct { + t testing.TB + response []byte + requestChan chan *Request +} + +func (m *mockReceiver) Callback( + request *Request, _ receptionID.EphemeralIdentity, _ []rounds.Round) { + m.requestChan <- request + _, err := request.Respond(m.response, cmix.GetDefaultCMIXParams(), 0) + if err != nil { + m.t.Errorf("Failed to respond: %+v", err) + } +} + +// Tests single-use request and response. +func TestTransmitRequest(t *testing.T) { + jww.SetStdoutThreshold(jww.LevelDebug) + rng := fastRNG.NewStreamGenerator(12, 1024, csprng.NewSystemRNG).GetStream() + handler := newMockCmixHandler() + myID := id.NewIdFromString("myID", id.User, t) + net := newMockCmix(myID, handler, t) + grp := net.GetInstance().GetE2EGroup() + + partnerPrivKey := dh.GeneratePrivateKey(dh.DefaultPrivateKeyLength, grp, rng) + partnerPubKey := dh.GeneratePublicKey(partnerPrivKey, grp) + + recipient := contact.Contact{ + ID: id.NewIdFromString("recipient", id.User, t), + DhPubKey: partnerPubKey, + } + + buff := bytes.NewBuffer(nil) + payloadSize := message.GetRequestPayloadSize(net.GetMaxMessageLength(), + grp.GetP().ByteLen()) + requestSize := message.GetRequestContentsSize(payloadSize) + firstPart := make([]byte, requestSize) + copy(firstPart, "First part payload.") + buff.Write(firstPart) + requestPartSize := message.GetRequestPartContentsSize( + net.GetMaxMessageLength()) + + for i := 0; i < 10; i++ { + part := make([]byte, requestPartSize) + copy(part, fmt.Sprintf("Part #%d payload.", i)) + buff.Write(part) + } + payload := buff.Bytes() + + tag := "TestTransmitRequest" + responsePayload := make([]byte, 4096) + copy(responsePayload, "My response.") + responseChan := make(chan []byte, 10) + response := mockResponse{responseChan} + params := GetDefaultRequestParams() + + requestChan := make(chan *Request, 10) + recipientNet := newMockCmix(recipient.ID, handler, t) + _ = Listen(tag, recipient.ID, partnerPrivKey, recipientNet, grp, + &mockReceiver{t, responsePayload, requestChan}) + + _, _, err := TransmitRequest( + recipient, tag, payload, response, params, net, rng, grp) + if err != nil { + t.Errorf("TransmitRequest returned an error: %+v", err) + } + + select { + case r := <-requestChan: + if !bytes.Equal(r.GetPayload(), payload) { + t.Errorf("Received unexpected request payload."+ + "\nexpected: %q\nreceived: %q", payload, r.GetPayload()) + } + case <-time.After(30 * time.Millisecond): + t.Errorf("Timed out waiting to receive request.") + } + + select { + case r := <-responseChan: + if !bytes.Equal(r, responsePayload) { + t.Errorf("Received unexpected response.\nexpected: %q\nreceived: %q", + payload, r) + } + case <-time.After(30 * time.Millisecond): + t.Errorf("Timed out waiting to receive response.") + } +} + +// Tests that waitForTimeout returns and does not call the callback when the +// kill channel is used. +func Test_waitForTimeout(t *testing.T) { + timeout := 15 * time.Millisecond + cbChan := make(chan error, 1) + cb := func( + _ []byte, _ receptionID.EphemeralIdentity, _ []rounds.Round, err error) { + cbChan <- err + } + killChan := make(chan bool, 1) + + go func() { + time.Sleep(timeout / 2) + killChan <- true + }() + + waitForTimeout(killChan, cb, timeout) + + select { + case <-cbChan: + t.Error("Callback called when waitForTimeout should have been killed.") + case <-time.After(timeout): + } +} + +// Error path: tests that waitForTimeout returns an error on the callback when +// the timeout is reached. +func Test_waitForTimeout_TimeoutError(t *testing.T) { + timeout := 15 * time.Millisecond + expectedErr := fmt.Sprintf(errResponseTimeout, timeout) + cbChan := make(chan error) + cb := func( + _ []byte, _ receptionID.EphemeralIdentity, _ []rounds.Round, err error) { + cbChan <- err + } + killChan := make(chan bool) + + go waitForTimeout(killChan, cb, timeout) + + select { + case r := <-cbChan: + if r == nil || r.Error() != expectedErr { + t.Errorf("Did not get expected error on callback."+ + "\nexpected: %s\nreceived: %+v", expectedErr, r) + } + case <-time.After(timeout * 2): + t.Errorf("Timed out waiting on callback.") + } +} + +// Builds a payload alongside the expected first part and list of subsequent +// parts and tests that partitionPayload properly partitions the payload into +// the expected parts. +func Test_partitionPayload(t *testing.T) { + const partSize = 16 + expectedFirstPart := []byte("first part") + expectedParts := make([][]byte, 10) + payload := bytes.NewBuffer(expectedFirstPart) + for i := range expectedParts { + expectedParts[i] = make([]byte, partSize) + copy(expectedParts[i], fmt.Sprintf("Part #%d", i)) + payload.Write(expectedParts[i]) + } + + firstPart, parts := partitionPayload( + len(expectedFirstPart), partSize, payload.Bytes()) + + if !bytes.Equal(expectedFirstPart, firstPart) { + t.Errorf("Received unexpected first part.\nexpected: %q\nreceived: %q", + expectedFirstPart, firstPart) + } + + if !reflect.DeepEqual(expectedParts, parts) { + t.Errorf("Received unexpected parts.\nexpected: %q\nreceived: %q", + expectedParts, parts) + } +} diff --git a/single/utils_test.go b/single/utils_test.go new file mode 100644 index 0000000000000000000000000000000000000000..473bb2dfa4025c02cee6bbca4d25e7e7506da3ef --- /dev/null +++ b/single/utils_test.go @@ -0,0 +1,221 @@ +//////////////////////////////////////////////////////////////////////////////// +// Copyright © 2020 xx network SEZC // +// // +// Use of this source code is governed by a license that can be found in the // +// LICENSE file // +//////////////////////////////////////////////////////////////////////////////// + +package single + +import ( + "gitlab.com/elixxir/client/cmix" + "gitlab.com/elixxir/client/cmix/identity/receptionID" + "gitlab.com/elixxir/client/cmix/message" + "gitlab.com/elixxir/client/cmix/rounds" + "gitlab.com/elixxir/comms/network" + "gitlab.com/elixxir/primitives/format" + "gitlab.com/xx_network/comms/connect" + "gitlab.com/xx_network/primitives/id" + "gitlab.com/xx_network/primitives/id/ephemeral" + "sync" + "testing" + "time" +) + +//////////////////////////////////////////////////////////////////////////////// +// Mock cMix Client // +//////////////////////////////////////////////////////////////////////////////// + +// Tests that mockCmix adheres to the Cmix interface. +var _ Cmix = (*mockCmix)(nil) + +type mockCmixHandler struct { + fingerprints map[id.ID]map[format.Fingerprint]message.Processor + services map[id.ID]map[string]message.Processor + sends []send + sync.Mutex +} + +func newMockCmixHandler() *mockCmixHandler { + return &mockCmixHandler{ + fingerprints: make(map[id.ID]map[format.Fingerprint]message.Processor), + services: make(map[id.ID]map[string]message.Processor), + sends: []send{}, + } +} + +type mockCmix struct { + myID *id.ID + numPrimeBytes int + addressSpaceSize uint8 + health bool + instance *network.Instance + handler *mockCmixHandler + t testing.TB + sync.Mutex +} + +type send struct { + myID *id.ID + recipient *id.ID + ms message.Service + msg format.Message +} + +func newMockCmix(myID *id.ID, handler *mockCmixHandler, t testing.TB) *mockCmix { + comms := &connect.ProtoComms{Manager: connect.NewManagerTesting(t)} + def := getNDF() + + instance, err := network.NewInstanceTesting(comms, def, def, nil, nil, t) + if err != nil { + panic(err) + } + + return &mockCmix{ + myID: myID, + numPrimeBytes: 1024, + addressSpaceSize: 18, + health: true, + instance: instance, + handler: handler, + t: t, + } +} + +func (m *mockCmix) IsHealthy() bool { + return m.health +} + +func (m *mockCmix) GetMaxMessageLength() int { + msg := format.NewMessage(m.numPrimeBytes) + return msg.ContentsSize() +} + +func (m *mockCmix) GetAddressSpace() uint8 { + return m.addressSpaceSize +} + +func (m *mockCmix) DeleteClientFingerprints(identity *id.ID) { + m.handler.Lock() + defer m.handler.Unlock() + delete(m.handler.fingerprints, *identity) +} + +func (m *mockCmix) AddFingerprint( + identity *id.ID, fp format.Fingerprint, mp message.Processor) error { + m.handler.Lock() + defer m.handler.Unlock() + if _, exists := m.handler.fingerprints[*identity]; !exists { + m.handler.fingerprints[*identity] = + map[format.Fingerprint]message.Processor{fp: mp} + } else { + m.handler.fingerprints[*identity][fp] = mp + } + return nil +} + +func (m *mockCmix) AddIdentity(*id.ID, time.Time, bool) {} + +func (m *mockCmix) Send(recipient *id.ID, fp format.Fingerprint, + ms message.Service, payload, mac []byte, _ cmix.CMIXParams) ( + id.Round, ephemeral.Id, error) { + + msg := format.NewMessage(m.numPrimeBytes) + msg.SetMac(mac) + msg.SetKeyFP(fp) + msg.SetContents(payload) + + m.handler.Lock() + defer m.handler.Unlock() + + var sent bool + + if _, exists := m.handler.fingerprints[*recipient]; exists { + if mp, exists := m.handler.fingerprints[*recipient][fp]; exists { + sent = true + go mp.Process( + msg, receptionID.EphemeralIdentity{Source: m.myID}, rounds.Round{}) + } + } + + if _, exists := m.handler.services[*recipient]; exists { + if mp, exists := m.handler.services[*recipient][serviceKey(ms)]; exists { + sent = true + go mp.Process( + msg, receptionID.EphemeralIdentity{Source: m.myID}, rounds.Round{}) + } + } + + if !sent { + m.handler.sends = append(m.handler.sends, send{ + myID: m.myID, + recipient: recipient, + ms: ms, + msg: msg, + }) + } + + return 0, ephemeral.Id{}, nil +} + +func serviceKey(ms message.Service) string { + return string(append(ms.Identifier, []byte(ms.Tag)...)) +} + +func (m *mockCmix) AddService(clientID *id.ID, ms message.Service, mp message.Processor) { + m.handler.Lock() + defer m.handler.Unlock() + + if _, exists := m.handler.services[*clientID]; !exists { + m.handler.services[*clientID] = map[string]message.Processor{ + serviceKey(ms): mp} + } else { + m.handler.services[*clientID][serviceKey(ms)] = mp + } +} + +func (m *mockCmix) DeleteService(clientID *id.ID, ms message.Service, _ message.Processor) { + m.handler.Lock() + defer m.handler.Unlock() + + if _, exists := m.handler.services[*clientID]; exists { + delete(m.handler.services[*clientID], serviceKey(ms)) + } +} + +func (m *mockCmix) GetInstance() *network.Instance { + return m.instance +} + +func (m *mockCmix) CheckInProgressMessages() { + m.handler.Lock() + defer m.handler.Unlock() + + var newSends []send + + for _, s := range m.handler.sends { + var sent bool + + if _, exists := m.handler.fingerprints[*s.recipient]; exists { + if mp, exists := m.handler.fingerprints[*s.recipient][s.msg.GetKeyFP()]; exists { + sent = true + go mp.Process( + s.msg, receptionID.EphemeralIdentity{Source: s.myID}, rounds.Round{}) + } + } + + if _, exists := m.handler.services[*s.recipient]; exists { + if mp, exists := m.handler.services[*s.recipient][serviceKey(s.ms)]; exists { + sent = true + go mp.Process( + s.msg, receptionID.EphemeralIdentity{Source: s.myID}, rounds.Round{}) + } + } + + if !sent { + newSends = append(newSends, s) + } + } + + m.handler.sends = newSends +}