diff --git a/api/results.go b/api/results.go index 885458fe2974d22b16592ba9e3f3ce2f5d4154a2..cf30e253356567ae2e777f99a061c56b02299940 100644 --- a/api/results.go +++ b/api/results.go @@ -67,6 +67,8 @@ type historicalRoundsComm interface { func (c *Client) GetRoundResults(roundList []id.Round, timeout time.Duration, roundCallback RoundEventCallback) error { + jww.INFO.Printf("GetRoundResults(%v, %s)", roundList, timeout) + sendResults := make(chan ds.EventReturn, len(roundList)) return c.getRoundResults(roundList, timeout, roundCallback, @@ -91,6 +93,8 @@ func (c *Client) getRoundResults(roundList []id.Round, timeout time.Duration, allRoundsSucceeded := true numResults := 0 + oldestRound := networkInstance.GetOldestRoundID() + // Parse and adjudicate every round for _, rnd := range roundList { // Every round is timed out by default, until proven to have finished @@ -111,9 +115,7 @@ func (c *Client) getRoundResults(roundList []id.Round, timeout time.Duration, numResults++ } } else { - jww.DEBUG.Printf("Failed to ger round [%d] in buffer: %v", rnd, err) // Update oldest round (buffer may have updated externally) - oldestRound := networkInstance.GetOldestRoundID() if rnd < oldestRound { // If round is older that oldest round in our buffer // Add it to the historical round request (performed later) @@ -151,10 +153,12 @@ func (c *Client) getRoundResults(roundList []id.Round, timeout time.Duration, roundCallback(false, true, roundsResults) return case roundReport := <-sendResults: + numResults-- + // Skip if the round is nil (unknown from historical rounds) // they default to timed out, so correct behavior is preserved - if roundReport.RoundInfo == nil || roundReport.TimedOut { + if roundReport.RoundInfo == nil || roundReport.TimedOut { allRoundsSucceeded = false } else { // If available, denote the result @@ -182,12 +186,14 @@ func (c *Client) getHistoricalRounds(msg *pb.HistoricalRounds, var resp *pb.HistoricalRoundsResponse - for { + //retry 5 times + for i:=0;i<5;i++{ // Find a gateway to request about the roundRequests gwHost, err := gateway.Get(instance.GetPartialNdf().Get(), comms, c.rng.GetStream()) if err != nil { - jww.FATAL.Panicf("Failed to track network, NDF has corrupt "+ + jww.ERROR.Printf("Failed to track network, NDF has corrupt "+ "data: %s", err) + continue } // If an error, retry with (potentially) a different gw host. @@ -195,10 +201,17 @@ func (c *Client) getHistoricalRounds(msg *pb.HistoricalRounds, // and process rounds resp, err = comms.RequestHistoricalRounds(gwHost, msg) if err == nil { + jww.ERROR.Printf("Failed to lookup historical rounds: %s", + err) + }else{ break } } + if resp == nil{ + return + } + // Process historical rounds, sending back to the caller thread for _, ri := range resp.Rounds { sendResults <- ds.EventReturn{ diff --git a/auth/callback.go b/auth/callback.go index ef21512f88a06c19322c03518bc262d7d2e94ebe..2c39824cb8631b222d6140fd66d2e9f143873639 100644 --- a/auth/callback.go +++ b/auth/callback.go @@ -213,7 +213,7 @@ func (m *Manager) handleConfirm(cmixMsg format.Message, sr *auth.SentRequest, if mgr, err := m.storage.E2e().GetPartner(sr.GetPartner()); mgr != nil || err == nil { jww.WARN.Printf("Cannot confirm auth for %s, channel already "+ "exists.", sr.GetPartner()) - m.storage.Auth().Fail(sr.GetPartner()) + m.storage.Auth().Done(sr.GetPartner()) return } @@ -221,7 +221,7 @@ func (m *Manager) handleConfirm(cmixMsg format.Message, sr *auth.SentRequest, baseFmt, partnerPubKey, err := handleBaseFormat(cmixMsg, grp) if err != nil { jww.WARN.Printf("Failed to handle auth confirm: %s", err) - m.storage.Auth().Fail(sr.GetPartner()) + m.storage.Auth().Done(sr.GetPartner()) return } @@ -236,7 +236,7 @@ func (m *Manager) handleConfirm(cmixMsg format.Message, sr *auth.SentRequest, if !success { jww.WARN.Printf("Recieved auth confirmation failed its mac " + "check") - m.storage.Auth().Fail(sr.GetPartner()) + m.storage.Auth().Done(sr.GetPartner()) return } @@ -244,7 +244,7 @@ func (m *Manager) handleConfirm(cmixMsg format.Message, sr *auth.SentRequest, if err != nil { jww.WARN.Printf("Failed to unmarshal auth confirmation's "+ "encrypted payload: %s", err) - m.storage.Auth().Fail(sr.GetPartner()) + m.storage.Auth().Done(sr.GetPartner()) return } @@ -252,7 +252,7 @@ func (m *Manager) handleConfirm(cmixMsg format.Message, sr *auth.SentRequest, if err := m.doConfirm(sr, grp, partnerPubKey, sr.GetMyPrivKey(), sr.GetPartnerHistoricalPubKey(), ecrFmt.GetOwnership()); err != nil { jww.WARN.Printf("Confirmation failed: %s", err) - m.storage.Auth().Fail(sr.GetPartner()) + m.storage.Auth().Done(sr.GetPartner()) return } } diff --git a/auth/confirm.go b/auth/confirm.go index d91f81e33e9e1d52aac5136e620ec405842ccd24..40c983c6feb73446495bb9307a2c6793fca7b259 100644 --- a/auth/confirm.go +++ b/auth/confirm.go @@ -40,10 +40,11 @@ func ConfirmRequestAuth(partner contact.Contact, rng io.Reader, return 0, errors.Errorf("failed to find a pending Auth Request: %s", err) } + defer storage.Auth().Done(partner.ID) // verify the passed contact matches what is stored if storedContact.DhPubKey.Cmp(partner.DhPubKey) != 0 { - storage.Auth().Fail(partner.ID) + storage.Auth().Done(partner.ID) return 0, errors.WithMessage(err, "Pending Auth Request has different "+ "pubkey than stored") } @@ -64,7 +65,6 @@ func ConfirmRequestAuth(partner contact.Contact, rng io.Reader, salt := make([]byte, saltSize) _, err = rng.Read(salt) if err != nil { - storage.Auth().Fail(partner.ID) return 0, errors.Wrap(err, "Failed to generate salt for "+ "confirmation") } @@ -104,10 +104,9 @@ func ConfirmRequestAuth(partner contact.Contact, rng io.Reader, p := storage.E2e().GetE2ESessionParams() if err := storage.E2e().AddPartner(partner.ID, partner.DhPubKey, newPrivKey, p, p); err != nil { - storage.Auth().Fail(partner.ID) - return 0, errors.Errorf("Failed to create channel with partner (%s) "+ - "on confirmation: %+v", - partner.ID, err) + jww.WARN.Printf("Failed to create channel with partner (%s) "+ + "on confirmation, this is likley a replay: %s", + partner.ID, err.Error()) } // delete the in progress negotiation diff --git a/bindings/client.go b/bindings/client.go index b2c94fa95f46926ae891a8e9a41cb794cbdb28c8..d5c40e456de1b263a48184250cbb67fecc9e0c88 100644 --- a/bindings/client.go +++ b/bindings/client.go @@ -8,7 +8,6 @@ package bindings import ( - "encoding/json" "errors" "fmt" jww "github.com/spf13/jwalterweatherman" @@ -157,11 +156,7 @@ func UnmarshalContact(b []byte) (*Contact, error) { //Unmarshals a marshaled send report object, returns an error if it fails func UnmarshalSendReport(b []byte) (*SendReport, error) { sr := &SendReport{} - if err := json.Unmarshal(b, sr); err != nil { - return nil, errors.New(fmt.Sprintf("Failed to Unmarshal "+ - "Send Report: %+v", err)) - } - return sr, nil + return sr, sr.Unmarshal(b) } // StartNetworkFollower kicks off the tracking of the network. It starts @@ -363,16 +358,25 @@ func (c *Client) WaitForRoundCompletion(roundID int, // the same pointer. func (c *Client) WaitForMessageDelivery(marshaledSendReport []byte, mdc MessageDeliveryCallback, timeoutMS int) error { - + jww.INFO.Printf("WaitForMessageDelivery(%v, _, %v)", + marshaledSendReport, timeoutMS) sr, err := UnmarshalSendReport(marshaledSendReport) if err != nil { return errors.New(fmt.Sprintf("Failed to "+ - "WaitForRoundCompletion callback due to bad Send Report: %+v", err)) + "WaitForMessageDelivery callback due to bad Send Report: %+v", err)) + } + + if sr==nil || sr.rl == nil || len(sr.rl.list) == 0{ + return errors.New(fmt.Sprintf("Failed to "+ + "WaitForMessageDelivery callback due to invalid Send Report " + + "unmarshal: %s", string(marshaledSendReport))) } f := func(allRoundsSucceeded, timedOut bool, rounds map[id.Round]api.RoundResult) { results := make([]byte, len(sr.rl.list)) - + jww.INFO.Printf("Processing WaitForMessageDelivery report " + + "for %v, success: %v, timedout: %v", sr.mid, allRoundsSucceeded, + timedOut) for i, r := range sr.rl.list { if result, exists := rounds[r]; exists { results[i] = byte(result) @@ -384,7 +388,9 @@ func (c *Client) WaitForMessageDelivery(marshaledSendReport []byte, timeout := time.Duration(timeoutMS) * time.Millisecond - return c.api.GetRoundResults(sr.rl.list, timeout, f) + err = c.api.GetRoundResults(sr.rl.list, timeout, f) + + return err } // Returns a user object from which all information about the current user diff --git a/bindings/message.go b/bindings/message.go index efe899de839473e304a3d57d4bc149e8aba2df2f..6cead4ad09a8be5fc49433bf9e68e5cf0ab87a9f 100644 --- a/bindings/message.go +++ b/bindings/message.go @@ -7,7 +7,9 @@ package bindings -import "gitlab.com/elixxir/client/interfaces/message" +import ( + "gitlab.com/elixxir/client/interfaces/message" +) // Message is a message received from the cMix network in the clear // or that has been decrypted using established E2E keys. @@ -36,10 +38,12 @@ func (m *Message) GetMessageType() int { } // Returns the message's timestamp in ms -func (m *Message) GetTimestampMS() int { - return int(m.r.Timestamp.UnixNano()/1000000) +func (m *Message) GetTimestampMS() int64 { + ts := m.r.Timestamp.UnixNano() + ts = (ts+999999)/1000000 + return ts } -func (m *Message) GetTimestampNano() int { - return int(m.r.Timestamp.UnixNano()) +func (m *Message) GetTimestampNano() int64 { + return m.r.Timestamp.UnixNano() } diff --git a/bindings/send.go b/bindings/send.go index a7e35873447758c22cb9ddfc8eddeaf86fe1bb73..673fad0003ac009b707de67be8f3edc9804f639f 100644 --- a/bindings/send.go +++ b/bindings/send.go @@ -135,6 +135,11 @@ type SendReport struct { mid e2e.MessageID } +type SendReportDisk struct{ + List []id.Round + Mid []byte +} + func (sr *SendReport) GetRoundList() *RoundList { return sr.rl } @@ -144,5 +149,22 @@ func (sr *SendReport) GetMessageID() []byte { } func (sr *SendReport) Marshal() ([]byte, error) { - return json.Marshal(sr) + srd := SendReportDisk{ + List: sr.rl.list, + Mid: sr.mid[:], + } + return json.Marshal(&srd) +} + +func (sr *SendReport) Unmarshal(b []byte) error { + srd := SendReportDisk{ + } + if err := json.Unmarshal(b, &srd); err!=nil{ + return errors.New(fmt.Sprintf("Failed to unmarshal send " + + "report: %s", err.Error())) + } + + copy(sr.mid[:],srd.Mid) + sr.rl = &RoundList{list:srd.List} + return nil } diff --git a/network/follow.go b/network/follow.go index 752476ea1e7b821d942ca88d3b36dba19d87c5a0..b3edc5a686bd751b7169f9208bdb219679f7b47a 100644 --- a/network/follow.go +++ b/network/follow.go @@ -26,6 +26,7 @@ import ( "bytes" "fmt" "math" + "sync/atomic" "time" jww "github.com/spf13/jwalterweatherman" @@ -64,8 +65,9 @@ func (m *manager) followNetwork(report interfaces.ClientErrorReport, quitCh <-ch case <-ticker.C: m.follow(report, rng, m.Comms) case <-TrackTicker.C: - jww.INFO.Println(m.tracker.Report()) - m.tracker = newPollTracker() + numPolls := atomic.SwapUint64(m.tracker, 0) + jww.INFO.Printf("Polled the network %d times in the " + + "last %s", numPolls, debugTrackPeriod) } } } @@ -80,7 +82,7 @@ func (m *manager) follow(report interfaces.ClientErrorReport, rng csprng.Source, "impossible: %+v", err) } - m.tracker.Track(identity.EphId, identity.Source) + atomic.AddUint64(m.tracker, 1) //randomly select a gateway to poll //TODO: make this more intelligent @@ -110,11 +112,13 @@ func (m *manager) follow(report interfaces.ClientErrorReport, rng csprng.Source, pollResp, err := comms.SendPoll(gwHost, &pollReq) if err != nil { - report( - "NetworkFollower", - fmt.Sprintf("Failed to poll network, \"%s\", Gateway: %s", err.Error(), gwHost.String()), - fmt.Sprintf("%+v", err), - ) + if report!=nil{ + report( + "NetworkFollower", + fmt.Sprintf("Failed to poll network, \"%s\", Gateway: %s", err.Error(), gwHost.String()), + fmt.Sprintf("%+v", err), + ) + } jww.ERROR.Printf("Unable to poll %s for NDF: %+v", gwHost, err) return } diff --git a/network/manager.go b/network/manager.go index b62bb1d007953db87ea63c010f4090c081c9e4b7..5d726cb088e53c4c12db98b8ac6fc9e13bd3de78 100644 --- a/network/manager.go +++ b/network/manager.go @@ -43,8 +43,8 @@ type manager struct { round *rounds.Manager message *message.Manager - //map of polls for debugging - tracker *pollTracker + //number of polls done in a period of time + tracker *uint64 //tracks already checked rounds checked *checkedRounds @@ -67,10 +67,12 @@ func NewManager(session *storage.Session, switchboard *switchboard.Switchboard, // set them here when they are needed on startup session.E2e().SetE2ESessionParams(params.E2EParams) + tracker := uint64(0) + //create manager object m := manager{ param: params, - tracker: newPollTracker(), + tracker: &tracker, checked: newCheckedRounds(), } diff --git a/storage/auth/store.go b/storage/auth/store.go index a03dee9eaced47223f597f14d78c6c093ede6cfd..d6730d03d8fcde7db0ee9edb76cb056942be8b7c 100644 --- a/storage/auth/store.go +++ b/storage/auth/store.go @@ -355,17 +355,18 @@ func (s *Store) GetRequest(partner *id.ID) (RequestType, *SentRequest, contact.C } } -// Fail is one of two calls after using a request. This one is to be used when +// Done is one of two calls after using a request. This one is to be used when // the use is unsuccessful. It will allow any thread waiting on access to // continue using the structure. // It does not return an error because an error is not handleable. -func (s *Store) Fail(partner *id.ID) { +func (s *Store) Done(partner *id.ID) { s.mux.RLock() r, ok := s.requests[*partner] s.mux.RUnlock() if !ok { - jww.ERROR.Panicf("Request cannot be failed, not found: %s", partner) + jww.ERROR.Panicf("Request cannot be finished, not " + + "found: %s", partner) return } diff --git a/storage/auth/store_test.go b/storage/auth/store_test.go index 94d0318f3d702cd7f9fb58860ada31a1b6f54b17..9f94169dc07eefe12c2d7ec9685d3d256fa2f326 100644 --- a/storage/auth/store_test.go +++ b/storage/auth/store_test.go @@ -526,11 +526,11 @@ func TestStore_Fail(t *testing.T) { } }() - s.Fail(c.ID) + s.Done(c.ID) // Check if the request's mutex is locked if reflect.ValueOf(&s.requests[*c.ID].mux).Elem().FieldByName("state").Int() != 0 { - t.Errorf("Fail() did not unlock mutex.") + t.Errorf("Done() did not unlock mutex.") } } @@ -540,11 +540,11 @@ func TestStore_Fail_RequestNotInMap(t *testing.T) { defer func() { if r := recover(); r == nil { - t.Errorf("Fail() did not panic when the request is not in map.") + t.Errorf("Done() did not panic when the request is not in map.") } }() - s.Fail(id.NewIdFromUInt(rand.Uint64(), id.User, t)) + s.Done(id.NewIdFromUInt(rand.Uint64(), id.User, t)) } // Happy path: receive request.