diff --git a/network/follow.go b/network/follow.go index 9dcc4085a3a4097c96fac8688923330b31df32cd..ca0634be7f36a44e00c487046e488ee57525a55b 100644 --- a/network/follow.go +++ b/network/follow.go @@ -336,6 +336,7 @@ func (m *manager) follow(report interfaces.ClientErrorReport, rng csprng.Source, }, roundsUnknown, abandon) for _, rid := range roundsWithMessages { + //denote that the round has been looked at in the tracking store if identity.CR.Check(rid) { m.round.GetMessagesFromRound(rid, identity) } diff --git a/network/rounds/check.go b/network/rounds/check.go index 03cd83718495b71ef5455e9112dfe95fdda89fe3..acd81a429a3d8dd8d9f394b50f772a2412eaa3b5 100644 --- a/network/rounds/check.go +++ b/network/rounds/check.go @@ -63,6 +63,12 @@ func (m *Manager) GetMessagesFromRound(roundID id.Round, identity reception.Iden jww.INFO.Printf("Messages found in round %d for %d (%s), looking "+ "up messages via historical lookup", roundID, identity.EphId.Int64(), identity.Source) + //store the round as an unretreived round + err = m.Session.UncheckedRounds().AddRound(roundID,nil, + identity.EphId, identity.Source) + if err != nil { + jww.FATAL.Panicf("Failed to denote Unchecked Round for round %d", roundID) + } // If we didn't find it, send to Historical Rounds Retrieval m.historicalRounds <- historicalRoundRequest{ rid: roundID, @@ -73,6 +79,12 @@ func (m *Manager) GetMessagesFromRound(roundID id.Round, identity reception.Iden jww.INFO.Printf("Messages found in round %d for %d (%s), looking "+ "up messages via in ram lookup", roundID, identity.EphId.Int64(), identity.Source) + //store the round as an unretreived round + err = m.Session.UncheckedRounds().AddRound(roundID,ri, + identity.EphId, identity.Source) + if err != nil { + jww.FATAL.Panicf("Failed to denote Unchecked Round for round %d", roundID) + } // If found, send to Message Retrieval Workers m.lookupRoundMessages <- roundLookup{ roundInfo: ri, diff --git a/network/rounds/remoteFilters_test.go b/network/rounds/remoteFilters_test.go index 73ce8476be59b4c814cb1a3d42ec8d6d8791db3d..6addedec8555e5bf7a55dec115813d8af0ac7993 100644 --- a/network/rounds/remoteFilters_test.go +++ b/network/rounds/remoteFilters_test.go @@ -118,7 +118,7 @@ func TestValidFilterRange(t *testing.T) { Identity: reception.Identity{ EphId: expectedEphID, Source: requestGateway, - StartValid: time.Now().Add(12 * time.Hour), + StartValid: time.Now().Add(-12 * time.Hour), EndValid: time.Now().Add(24 * time.Hour), }, } diff --git a/network/rounds/retrieve.go b/network/rounds/retrieve.go index 9559e51b573757dccabc7f962cbb115809adb129..f531292c56829a0f943dc639fe356283defb47b1 100644 --- a/network/rounds/retrieve.go +++ b/network/rounds/retrieve.go @@ -49,11 +49,10 @@ func (m *Manager) processMessageRetrieval(comms messageRetrievalComms, case rl := <-m.lookupRoundMessages: ri := rl.roundInfo jww.DEBUG.Printf("Checking for messages in round %d", ri.ID) - err := m.Session.UncheckedRounds().AddRound(rl.roundInfo, + err := m.Session.UncheckedRounds().AddRound(id.Round(ri.ID),nil, rl.identity.EphId, rl.identity.Source) if err != nil { - jww.ERROR.Printf("Could not add round %d in unchecked rounds store: %v", - rl.roundInfo.ID, err) + jww.FATAL.Panicf("Failed to denote Unchecked Round for round %d",id.Round(ri.ID)) } // Convert gateways in round to proper ID format diff --git a/network/rounds/unchecked.go b/network/rounds/unchecked.go index bd91584fad326ae04b8c6574951681fd93266b34..72b51fa15bbfd42fde5dbf778525bfc7874cc9c2 100644 --- a/network/rounds/unchecked.go +++ b/network/rounds/unchecked.go @@ -50,6 +50,24 @@ func (m *Manager) processUncheckedRounds(checkInterval time.Duration, backoffTab case <-ticker.C: iterator := func(rid id.Round, rnd rounds.UncheckedRound){ + //check if it needs to be processed by historical Rounds + if rnd.Info==nil{ + jww.INFO.Printf("Messages in round %d for %d (%s), looking "+ + "up messages via historical lookup", rnd.Id, rnd.EpdId.Int64(), + rnd.Source) + // If we didn't find it, send to Historical Rounds Retrieval + m.historicalRounds <- historicalRoundRequest{ + rid: rnd.Id, + identity: reception.IdentityUse{ + Identity: reception.Identity{ + EphId: rnd.EpdId, + Source: rnd.Source, + }, + }, + numAttempts: 0, + } + return + } // If this round is due for a round check, send the round over // to the retrieval thread. If not due, check next round. if isRoundCheckDue(rnd.NumChecks, rnd.LastCheck, backoffTable) { diff --git a/network/rounds/unchecked_test.go b/network/rounds/unchecked_test.go index 980ca1e6157a583cc239cab4332e358c5b04e84a..d50c936425f1b741eee94f51162b875ba987a9a2 100644 --- a/network/rounds/unchecked_test.go +++ b/network/rounds/unchecked_test.go @@ -63,7 +63,7 @@ func TestUncheckedRoundScheduler(t *testing.T) { } // Add round ot check - err := testManager.Session.UncheckedRounds().AddRound(roundInfo, expectedEphID, requestGateway) + err := testManager.Session.UncheckedRounds().AddRound(roundId, roundInfo, expectedEphID, requestGateway) if err != nil { t.Fatalf("Could not add round to session: %v", err) } diff --git a/storage/rounds/checkedRounds.go b/storage/rounds/checkedRounds.go index a7e3ebdecd425a3ba97902a2197fd3189b7b6f59..dcfd1e6678926285859d90a0f53efcf40f691b77 100644 --- a/storage/rounds/checkedRounds.go +++ b/storage/rounds/checkedRounds.go @@ -52,9 +52,10 @@ func newCheckedRounds(maxRounds int, store *utility.BlockStore) *CheckedRounds { return &CheckedRounds{ m: make(map[id.Round]interface{}), l: list.New(), - recent: []id.Round{}, + recent: make([]id.Round,0,maxRounds), store: store, maxRounds: maxRounds, + } } diff --git a/storage/rounds/uncheckedRounds.go b/storage/rounds/uncheckedRounds.go index a8a92cc7859855735d8da7380c5ca9b5ee71ac76..176144c96162e0b2e5382f312bf568f3cdda9d55 100644 --- a/storage/rounds/uncheckedRounds.go +++ b/storage/rounds/uncheckedRounds.go @@ -10,6 +10,7 @@ package rounds import ( "bytes" "encoding/binary" + "fmt" "github.com/golang/protobuf/proto" "github.com/pkg/errors" "gitlab.com/elixxir/client/storage/versioned" @@ -18,11 +19,13 @@ import ( "gitlab.com/xx_network/primitives/id/ephemeral" "gitlab.com/xx_network/primitives/netTime" "sync" + "testing" "time" ) const ( uncheckedRoundVersion = 0 + roundInfoVersion = 0 uncheckedRoundPrefix = "uncheckedRoundPrefix" // Key to store rounds uncheckedRoundKey = "uncheckRounds" @@ -46,6 +49,8 @@ type Identity struct { // These rounds are stored for retry of message retrieval type UncheckedRound struct { Info *pb.RoundInfo + Id id.Round + Identity // Timestamp in which round has last been checked LastCheck time.Time @@ -54,16 +59,19 @@ type UncheckedRound struct { } // marshal serializes UncheckedRound r into a byte slice -func (r UncheckedRound) marshal() ([]byte, error) { +func (r UncheckedRound) marshal(kv *versioned.KV) ([]byte, error) { buf := bytes.NewBuffer(nil) - // Write the round info + // Store teh round info + if r.Info !=nil{ + if err := storeRoundInfo(kv, r.Info); err!=nil{ + return nil,errors.WithMessagef(err,"failed to marshal unchecked rounds") + } + } + + //marshel the round ID b := make([]byte, uint64Size) - infoBytes, err := proto.Marshal(r.Info) - binary.LittleEndian.PutUint64(b, uint64(len(infoBytes))) + binary.LittleEndian.PutUint64(b, uint64(r.Id)) buf.Write(b) - buf.Write(infoBytes) - - b = make([]byte, uint64Size) // Write the round identity info buf.Write(r.Identity.EpdId[:]) @@ -92,22 +100,16 @@ func (r UncheckedRound) marshal() ([]byte, error) { } // unmarshal deserializes round data from buff into UncheckedRound r -func (r *UncheckedRound) unmarshal(buff *bytes.Buffer) error { +func (r *UncheckedRound) unmarshal(kv *versioned.KV, buff *bytes.Buffer) error { // Deserialize the roundInfo - roundInfoLen := binary.LittleEndian.Uint64(buff.Next(uint64Size)) - roundInfoBytes := buff.Next(int(roundInfoLen)) - ri := &pb.RoundInfo{} - if err := proto.Unmarshal(roundInfoBytes, ri); err != nil { - return errors.WithMessagef(err, "Failed to unmarshal roundInfo") - } - r.Info = ri + r.Id = id.Round(binary.LittleEndian.Uint64(buff.Next(uint64Size))) // Deserialize the round identity information copy(r.EpdId[:], buff.Next(uint64Size)) sourceId, err := id.Unmarshal(buff.Next(id.ArrIDLen)) if err != nil { - return errors.WithMessage(err, "Failed to unmarshal round identity.source") + return errors.WithMessagef(err, "Failed to unmarshal round identity.source of %d", r.Id) } r.Source = sourceId @@ -116,11 +118,13 @@ func (r *UncheckedRound) unmarshal(buff *bytes.Buffer) error { timestampLen := binary.LittleEndian.Uint64(buff.Next(uint64Size)) tsByes := buff.Next(int(uint64(timestampLen))) if err = r.LastCheck.UnmarshalBinary(tsByes); err != nil { - return errors.WithMessage(err, "Failed to unmarshal round timestamp") + return errors.WithMessagef(err, "Failed to unmarshal round timestamp of %d", r.Id) } r.NumChecks = binary.LittleEndian.Uint64(buff.Next(uint64Size)) + r.Info, _ = loadRoundInfo(kv, id.Round(r.Id)) + return nil } @@ -167,12 +171,13 @@ func LoadUncheckedStore(kv *versioned.KV) (*UncheckedRoundStore, error) { } // Adds a round to check on the list and saves to memory -func (s *UncheckedRoundStore) AddRound(ri *pb.RoundInfo, ephID ephemeral.Id, source *id.ID) error { +func (s *UncheckedRoundStore) AddRound(rid id.Round, ri *pb.RoundInfo, ephID ephemeral.Id, source *id.ID) error { s.mux.Lock() defer s.mux.Unlock() - rid := id.Round(ri.ID) - if _, exists := s.list[rid]; !exists { + stored, exists := s.list[rid] + + if !exists ||stored.Info == nil { newUncheckedRound := UncheckedRound{ Info: ri, Identity: Identity{ @@ -184,7 +189,6 @@ func (s *UncheckedRoundStore) AddRound(ri *pb.RoundInfo, ephID ephemeral.Id, sou } s.list[rid] = newUncheckedRound - return s.save() } @@ -199,6 +203,13 @@ func (s *UncheckedRoundStore) GetRound(rid id.Round) (UncheckedRound, bool) { return rnd, exists } +func (s *UncheckedRoundStore) GetList(t *testing.T) map[id.Round]UncheckedRound{ + s.mux.RLock() + defer s.mux.RUnlock() + return s.list +} + + // Retrieves the list of rounds func (s *UncheckedRoundStore) IterateOverList(iterator func(rid id.Round, rnd UncheckedRound)) { s.mux.RLock() @@ -284,7 +295,7 @@ func (s *UncheckedRoundStore) marshal() ([]byte, error) { buf.Write(b) for rid, rnd := range s.list { - rndData, err := rnd.marshal() + rndData, err := rnd.marshal(s.kv) if err != nil { return nil, errors.WithMessagef(err, "Failed to marshal round %d", rid) } @@ -304,13 +315,48 @@ func (s *UncheckedRoundStore) unmarshal(data []byte) error { for i := 0; i < int(length); i++ { rnd := UncheckedRound{} - err := rnd.unmarshal(buff) + err := rnd.unmarshal(s.kv, buff) if err != nil { return errors.WithMessage(err, "Failed to unmarshal rounds in storage") } - s.list[id.Round(rnd.Info.ID)] = rnd + s.list[rnd.Id] = rnd } return nil } + +func storeRoundInfo(kv *versioned.KV, info *pb.RoundInfo)error{ + now := netTime.Now() + + data, err := proto.Marshal(info) + if err!=nil{ + return errors.WithMessagef(err, "Failed to store individual unchecked round") + } + + obj := versioned.Object{ + Version: roundInfoVersion, + Timestamp: now, + Data: data, + } + + return kv.Set(roundKey(id.Round(info.ID)), roundInfoVersion, &obj) +} + +func loadRoundInfo(kv *versioned.KV, id id.Round)( *pb.RoundInfo, error){ + vo, err := kv.Get(roundKey(id), roundInfoVersion) + if err != nil { + return nil, err + } + + ri := &pb.RoundInfo{} + if err = proto.Unmarshal(vo.Data, ri); err != nil { + return nil, errors.WithMessagef(err, "Failed to unmarshal roundInfo") + } + + return ri, nil +} + +func roundKey(roundID id.Round)string{ + return fmt.Sprintf("roundInfo:%d", roundID) +} diff --git a/storage/rounds/uncheckedRounds_test.go b/storage/rounds/uncheckedRounds_test.go index 63e8c195f4f2ffa752979e5f5780acdc6d2d3f16..6d962a0dc5fc8e72fb2d42b978f81d41c89eb431 100644 --- a/storage/rounds/uncheckedRounds_test.go +++ b/storage/rounds/uncheckedRounds_test.go @@ -95,7 +95,7 @@ func TestLoadUncheckedStore(t *testing.T) { ephId := ephemeral.Id{1, 2, 3, 4, 5, 6, 7, 8} source := id.NewIdFromBytes([]byte("Sauron"), t) - err = testStore.AddRound(roundInfo, ephId, source) + err = testStore.AddRound(id.Round(roundInfo.ID), roundInfo, ephId, source) if err != nil { t.Fatalf("LoadUncheckedStore error: "+ "Could not add round to store: %v", err) @@ -147,7 +147,7 @@ func TestUncheckedRoundStore_AddRound(t *testing.T) { } ephId := ephemeral.Id{1, 2, 3, 4, 5, 6, 7, 8} source := id.NewIdFromBytes([]byte("Sauron"), t) - err = testStore.AddRound(roundInfo, ephId, source) + err = testStore.AddRound(id.Round(roundInfo.ID), roundInfo, ephId, source) if err != nil { t.Fatalf("AddRound error: "+ "Could not add round to store: %v", err) @@ -177,7 +177,7 @@ func TestUncheckedRoundStore_GetRound(t *testing.T) { } ephId := ephemeral.Id{1, 2, 3, 4, 5, 6, 7, 8} source := id.NewIdFromBytes([]byte("Sauron"), t) - err = testStore.AddRound(roundInfo, ephId, source) + err = testStore.AddRound(id.Round(roundInfo.ID), roundInfo, ephId, source) if err != nil { t.Fatalf("GetRound error: "+ "Could not add round to store: %v", err) @@ -231,7 +231,7 @@ func TestUncheckedRoundStore_GetList(t *testing.T) { } ephId := ephemeral.Id{1, 2, 3, 4, 5, 6, 7, 8} source := id.NewIdFromUInt(uint64(i), id.User, t) - err = testStore.AddRound(roundInfo, ephId, source) + err = testStore.AddRound(id.Round(roundInfo.ID), roundInfo, ephId, source) if err != nil { t.Errorf("GetList error: "+ "Could not add round to store: %v", err) @@ -239,7 +239,7 @@ func TestUncheckedRoundStore_GetList(t *testing.T) { } // Retrieve list - retrievedList := testStore.GetList() + retrievedList := testStore.GetList(t) if len(retrievedList) != numRounds { t.Errorf("GetList error: "+ "List returned is not of expected size."+ @@ -275,7 +275,7 @@ func TestUncheckedRoundStore_IncrementCheck(t *testing.T) { } ephId := ephemeral.Id{1, 2, 3, 4, 5, 6, 7, 8} source := id.NewIdFromUInt(uint64(i), id.User, t) - err = testStore.AddRound(roundInfo, ephId, source) + err = testStore.AddRound(id.Round(roundInfo.ID), roundInfo, ephId, source) if err != nil { t.Errorf("IncrementCheck error: "+ "Could not add round to store: %v", err) @@ -338,7 +338,7 @@ func TestUncheckedRoundStore_Remove(t *testing.T) { } ephId := ephemeral.Id{1, 2, 3, 4, 5, 6, 7, 8} source := id.NewIdFromUInt(uint64(i), id.User, t) - err = testStore.AddRound(roundInfo, ephId, source) + err = testStore.AddRound(id.Round(roundInfo.ID), roundInfo, ephId, source) if err != nil { t.Errorf("Remove error: "+ "Could not add round to store: %v", err)