Skip to content
Snippets Groups Projects
Commit 2d42c016 authored by Richard T. Carback III's avatar Richard T. Carback III
Browse files

Initial restore contacts functionality

parent 9c8f99b9
No related branches found
No related tags found
2 merge requests!231Revert "Update store to print changes to the partners list",!171RestoreContactsFromBackup
...@@ -73,3 +73,8 @@ func (c *Contact) GetFactList() *FactList { ...@@ -73,3 +73,8 @@ func (c *Contact) GetFactList() *FactList {
func (c *Contact) Marshal() ([]byte, error) { func (c *Contact) Marshal() ([]byte, error) {
return c.c.Marshal(), nil return c.c.Marshal(), nil
} }
// GetAPIContact returns the api contact object. Not exported to bindings.
func (c *Contact) GetAPIContact() *contact.Contact {
return c.c
}
...@@ -9,22 +9,31 @@ package xxmutils ...@@ -9,22 +9,31 @@ package xxmutils
import ( import (
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"math"
"sync"
"time" "time"
jww "github.com/spf13/jwalterweatherman"
"gitlab.com/elixxir/client/api"
"gitlab.com/elixxir/client/bindings" "gitlab.com/elixxir/client/bindings"
"gitlab.com/elixxir/client/storage"
"gitlab.com/elixxir/client/storage/versioned"
"gitlab.com/elixxir/crypto/contact" "gitlab.com/elixxir/crypto/contact"
"gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/id"
) )
// RestoreContactsUpdater interface provides a callback funciton // RestoreContactsUpdater interface provides a callback function
// for receiving update information from RestoreContactsFromBackup. // for receiving update information from RestoreContactsFromBackup.
type RestoreContactsUpdater interface { type RestoreContactsUpdater interface {
// RestoreContactsCallback is called to report the current # of contacts // RestoreContactsCallback is called to report the current # of contacts
// that are done processing against the total number that need to be // that have been found and how many have been restored
// against the total number that need to be
// processed. If an error occurs it it set on the err variable as a // processed. If an error occurs it it set on the err variable as a
// plain string. // plain string.
RestoreContactsCallback(current, total int, err string) RestoreContactsCallback(numFound, numRestored, total int, err string)
} }
// RestoreContactsFromBackup takes as input the jason output of the // RestoreContactsFromBackup takes as input the jason output of the
...@@ -35,84 +44,253 @@ type RestoreContactsUpdater interface { ...@@ -35,84 +44,253 @@ type RestoreContactsUpdater interface {
func RestoreContactsFromBackup(backupPartnerIDs []byte, client *bindings.Client, func RestoreContactsFromBackup(backupPartnerIDs []byte, client *bindings.Client,
udManager *bindings.UserDiscovery, udManager *bindings.UserDiscovery,
updatesCb RestoreContactsUpdater) error { updatesCb RestoreContactsUpdater) error {
// Constants/control settings
numRoutines := 8
maxChanSize := 10000
restoreTimeout := time.Duration(30 * time.Second)
api := client.GetInternalClient() api := client.GetInternalClient()
// Unmarshal IDs store := stateStore{
apiStore: api.GetStorage(),
}
// Unmarshal IDs and then check restore state
var idList []*id.ID var idList []*id.ID
if err := json.Unmarshal(backupPartnerIDs, &idList); err != nil { if err := json.Unmarshal(backupPartnerIDs, &idList); err != nil {
return err return err
} }
lookupIDs, resetContacts := checkRestoreState(idList, store)
var idsToLookup []*id.ID // State variables, how many we have looked up successfully
for id := range idList { // and how many we have already reset.
// TODO: Check storage for ID, if present, skip totalCnt := len(idList)
// if !IDRestored(id) { lookupCnt := len(resetContacts)
idsToLookup := append(idsToLookup, id) resetCnt := totalCnt - len(resetContacts) - len(lookupIDs)
//}
// Before we start, report initial state
updatesCb.RestoreContactsCallback(lookupCnt, resetCnt, totalCnt, "")
// Initialize channels
chanSize := int(math.Min(float64(maxChanSize), float64(len(idList))))
// Jobs are processed via the following pipeline:
// lookupCh -> foundCh -> resetContactCh -> restoredCh
// foundCh and restoredCh are used to track progress
lookupCh := make(chan *id.ID, chanSize)
foundCh := make(chan *contact.Contact, chanSize)
resetContactCh := make(chan *contact.Contact, chanSize)
restoredCh := make(chan *contact.Contact, chanSize)
// Start routines for processing
lcWg := sync.WaitGroup{}
lcWg.Add(numRoutines)
rsWg := sync.WaitGroup{}
rsWg.Add(numRoutines)
for i := 0; i < numRoutines; i++ {
go LookupContacts(lookupCh, foundCh, udManager, lcWg)
go ResetSessions(resetContactCh, restoredCh, api, rsWg)
} }
idList = idsToLookup
// Load channels based on previous state
go func() {
for i := range lookupIDs {
lookupCh <- lookupIDs[i]
}
}()
go func() {
for i := range resetContacts {
lookupCnt += 1
resetContactCh <- resetContacts[i]
}
}()
// Event Processing
done := false
var err error
for !done {
// NOTE: Timer is reset every loop
timeoutTimer := time.NewTimer(restoreTimeout)
select {
case <-timeoutTimer.C:
err = errors.New("restoring accounts timed out")
done = true
case c := <-foundCh:
store.set(c, contactFound)
lookupCnt += 1
// NOTE: Prevent blocking by using routine here
go func() { resetContactCh <- c }()
case c := <-restoredCh:
store.set(c, contactRestored)
resetCnt += 1
}
if resetCnt == totalCnt {
done = true
}
updatesCb.RestoreContactsCallback(lookupCnt, resetCnt, totalCnt,
"")
}
// Cleanup
close(lookupCh)
close(resetContactCh)
// Now wait for subroutines to close before closing their output chans
lcWg.Wait()
close(foundCh)
rsWg.Wait()
close(restoredCh)
return err
}
// LookupContacts routine looks up contacts
func LookupContacts(in chan *id.ID, out chan *contact.Contact,
udManager *bindings.UserDiscovery, wg sync.WaitGroup) {
defer wg.Done()
// Start looking up contacts with user discovery and feed this // Start looking up contacts with user discovery and feed this
// contacts channel. // contacts channel.
contactsCh := LookupContacts(idList, client, udManager) for lookupID := range in {
c, err := LookupContact(lookupID, udManager)
// Send a reset for each contact we looked up if err != nil {
cnt := 0 jww.WARN.Printf("could not lookup %s: %v", lookupID,
total := len(idList) err)
msg := "Restored from backup" // Retry later
me := api.GetUser() in <- lookupID
for contact := range contactsCh { } else {
cnt += 1 out <- c
// Report lookup failures
if contact.DhPubKey == nil {
updatesCb.RestoreContactsCallback(cnt, total,
fmt.Sprintf("ID %s could not be found in "+
" User Discovery.", contact.ID))
continue
} }
_, err := api.ResetSession(contact, me, msg) }
}
func ResetSessions(in, out chan *contact.Contact, api api.Client,
wg sync.WaitGroup) {
defer wg.Done()
me := api.GetUser().GetContact()
msg := "Account reset from backup"
for c := range in {
_, err := api.ResetSession(*c, me, msg)
if err != nil { if err != nil {
jww.WARN.Printf("Could not reset: %v", err) jww.WARN.Printf("could not reset %s: %v",
// TODO: Add contact object back into channel? c.ID, err)
// other retry logic? in <- c
continue } else {
out <- c
} }
}
}
// TODO: Mark ID done in storage // LookupContact lookups up a contact using the user discovery manager
func LookupContact(userID *id.ID, udManager *bindings.UserDiscovery) (
*contact.Contact, error) {
// This is a little wonky, but wait until we get called then
// set the result to the contact objects details if there is
// no error
lookup := &lookupcb{}
waiter := sync.Mutex{}
var result *contact.Contact
var err error
lookup.CB = func(c *bindings.Contact, errStr string) {
defer waiter.Unlock()
if errStr != "" {
err = errors.New(errStr)
}
result = c.GetAPIContact()
} }
// Take lock once to make sure I will wait
waiter.Lock()
return nil // in MS, so 90 seconds
timeout := 90 * 1000
udManager.Lookup(userID[:], lookup, timeout)
// Now force a wait for callback to exit
waiter.Lock()
defer waiter.Unlock()
return result, err
} }
// lookupcb provides the callback interface for UserDiscovery lookup function.
type lookupcb struct { type lookupcb struct {
contactsCh chan *contact.Contact CB func(c *bindings.Contact, err string)
} }
func (l lookupcb) Callback(contact *bindings.Contact, err string) { // Callback implements desired interface
if err != nil && err != "" { func (l *lookupcb) Callback(c *bindings.Contact, err string) { l.CB(c, err) }
jww.WARN.Printf("Restoring contact: %s", err)
} // restoreState is the internal state of a contact
l.contactsCh <- contact type restoreState byte
const (
contactNotFound restoreState = iota
contactFound
contactRestored
)
////
// stateStore wraps a kv and stores contact state for the restoration
// TODO: Right now, it uses 1 contact-per-key approach, but it might make sense
// to wrap this in a mutex and load/store a whole list
////
const stateStoreFmt = "restoreContactsFromBackup/v1/%s"
type stateStore struct {
apiStore *storage.Session
// TODO: We could put a syncmap or something here instead of
// 1-key-per-id
} }
// LookupContacts starts a thread that looks up the contacts for each user func (s stateStore) key(id *id.ID) string {
// in the idList and returns their contact object. It returns a buffered channel return fmt.Sprintf(stateStoreFmt, id)
// of contact objects which can be used to reset sessions. If a user cannot be }
// found in user discovery, it returns a contact with an empty DhPubKey.
func LookupContacts(idList []*id.ID, udManager *bindings.UserDiscovery) chan *contact.Contact { func (s stateStore) set(user *contact.Contact, state restoreState) error {
contactsCh := make(chan *contact.Contact, len(idList)) key := s.key(user.ID)
timeout := int(time.Duration(90 * time.Second)) // First byte is state var, second is contact object
data := []byte{byte(state)}
lookup := lookupcb{contactsCh: contactsCh} data = append(data, user.Marshal()...)
val := &versioned.Object{
// TODO: Version: 0,
// numConcurrent := 8 Timestamp: time.Now(),
// then only run 8 at a time... Data: data,
for _, uid := range idList { }
go func() { return s.apiStore.Set(key, val)
udManager.Lookup(uid[:], lookup, timeout) }
}() func (s stateStore) get(id *id.ID) (restoreState, *contact.Contact, error) {
key := s.key(id)
val, err := s.apiStore.Get(key)
if err != nil {
return contactNotFound, nil, err
} }
user, err := contact.Unmarshal(val.Data[1:])
if err != nil {
return contactFound, nil, err
}
return restoreState(val.Data[0]), &user, nil
}
// stateStore END
return contactsCh func checkRestoreState(IDs []*id.ID, store stateStore) ([]*id.ID,
[]*contact.Contact) {
var idsToLookup []*id.ID
var contactsToReset []*contact.Contact
for i := range IDs {
id := IDs[i]
idState, user, err := store.get(id)
if err != nil {
// Ignore errors here since they always will result
// in a retry.
jww.WARN.Printf("Error on restore check for %s: %v",
id, err)
}
switch idState {
case contactNotFound:
idsToLookup = append(idsToLookup, id)
case contactFound:
contactsToReset = append(contactsToReset, user)
}
// Restored state means we do nothing.
}
return idsToLookup, contactsToReset
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment