Skip to content
Snippets Groups Projects

Channels impl

Merged Benjamin Wenger requested to merge channelsImpl into project/Channels
1 file
+ 27
0
Compare changes
  • Side-by-side
  • Inline
+ 27
0
@@ -49,6 +49,8 @@ type NameService interface {
ValidateChannelMessage(username string, lease time.Time, pubKey ed25519.PublicKey, authorIDSignature []byte) bool
}
// loadRegistrationDisk loads a registrationDisk from the kv
// and returns the registrationDisk.
func loadRegistrationDisk(kv *versioned.KV) (registrationDisk, error) {
obj, err := kv.Get(registrationDiskKey, registrationDiskVersion)
if err != nil {
@@ -57,6 +59,8 @@ func loadRegistrationDisk(kv *versioned.KV) (registrationDisk, error) {
return UnmarshallRegistrationDisk(obj.Data)
}
// saveRegistrationDisk saves the given saveRegistrationDisk to
// the given kv.
func saveRegistrationDisk(kv *versioned.KV, reg registrationDisk) error {
regBytes, err := reg.Marshall()
if err != nil {
@@ -71,6 +75,8 @@ func saveRegistrationDisk(kv *versioned.KV, reg registrationDisk) error {
return nil
}
// registrationDisk is used to encapsulate the channel user's key pair,
// lease and lease signature.
type registrationDisk struct {
rwmutex sync.RWMutex
@@ -80,6 +86,7 @@ type registrationDisk struct {
Signature []byte
}
// newRegistrationDisk creates a new newRegistrationDisk.
func newRegistrationDisk(publicKey ed25519.PublicKey, privateKey ed25519.PrivateKey,
lease time.Time, signature []byte) registrationDisk {
return registrationDisk{
@@ -90,6 +97,8 @@ func newRegistrationDisk(publicKey ed25519.PublicKey, privateKey ed25519.Private
}
}
// Update updates the registrationDisk that is currently
// stored on the kv with a new lease and lease signature.
func (r registrationDisk) Update(lease int64, signature []byte) {
r.rwmutex.Lock()
defer r.rwmutex.Unlock()
@@ -98,6 +107,7 @@ func (r registrationDisk) Update(lease int64, signature []byte) {
r.Signature = signature
}
// Marshall marshalls the registrationDisk.
func (r registrationDisk) Marshall() ([]byte, error) {
r.rwmutex.RLock()
defer r.rwmutex.RUnlock()
@@ -105,6 +115,7 @@ func (r registrationDisk) Marshall() ([]byte, error) {
return json.Marshal(&r)
}
// UnmarshallRegistrationDisk unmarshalls a registrationDisk
func UnmarshallRegistrationDisk(data []byte) (registrationDisk, error) {
var r registrationDisk
err := json.Unmarshal(data, &r)
@@ -114,6 +125,7 @@ func UnmarshallRegistrationDisk(data []byte) (registrationDisk, error) {
return r, nil
}
// GetLease returns the current registrationDisk lease.
func (r registrationDisk) GetLease() time.Time {
r.rwmutex.RLock()
defer r.rwmutex.RUnlock()
@@ -121,6 +133,7 @@ func (r registrationDisk) GetLease() time.Time {
return time.Unix(0, r.Lease)
}
// GetPublicKey returns the current public key.
func (r registrationDisk) GetPublicKey() ed25519.PublicKey {
r.rwmutex.RLock()
defer r.rwmutex.RUnlock()
@@ -128,6 +141,7 @@ func (r registrationDisk) GetPublicKey() ed25519.PublicKey {
return r.PublicKey
}
// GetPrivateKey returns the current private key.
func (r registrationDisk) GetPrivateKey() ed25519.PrivateKey {
r.rwmutex.RLock()
defer r.rwmutex.RUnlock()
@@ -135,6 +149,7 @@ func (r registrationDisk) GetPrivateKey() ed25519.PrivateKey {
return r.PrivateKey
}
// GetLeaseSignature returns the currentl signature and lease time.
func (r registrationDisk) GetLeaseSignature() ([]byte, time.Time) {
r.rwmutex.RLock()
defer r.rwmutex.RUnlock()
@@ -142,6 +157,9 @@ func (r registrationDisk) GetLeaseSignature() ([]byte, time.Time) {
return r.Signature, time.Unix(0, r.Lease)
}
// clientIDTracker encapsulates the client channel lease and the
// repetitive scheduling of new lease registrations when the
// current lease expires.
type clientIDTracker struct {
kv *versioned.KV
@@ -157,8 +175,10 @@ type clientIDTracker struct {
udPubKey ed25519.PublicKey
}
// clientIDTracker implements the NameService interface.
var _ NameService = (*clientIDTracker)(nil)
// newclientIDTracker creates a new clientIDTracker.
func newclientIDTracker(comms channelLeaseComms, host *connect.Host, username string, kv *versioned.KV,
receptionIdentity xxdk.ReceptionIdentity, udPubKey ed25519.PublicKey, rngSource *fastRNG.StreamGenerator) *clientIDTracker {
@@ -204,6 +224,8 @@ func (c *clientIDTracker) Start() (stoppable.Stoppable, error) {
return stopper, nil
}
// registrationWorker is meant to run in it's own goroutine
// periodically registering, getting a new lease.
func (c *clientIDTracker) registrationWorker(stopper *stoppable.Single) {
for {
if time.Now().After(c.registrationDisk.GetLease().Add(-graceDuration)) {
@@ -256,6 +278,9 @@ func (c *clientIDTracker) ValidateChannelMessage(username string, lease time.Tim
return channel.VerifyChannelLease(authorIDSignature, pubKey, username, lease, c.udPubKey)
}
// register causes a request for a new channel lease to be sent to
// the user discovery server. If successful in procuration of a new lease
// then it is written to the registrationDisk on the kv.
func (c *clientIDTracker) register() error {
lease, signature, err := c.requestChannelLease()
if err != nil {
@@ -267,6 +292,8 @@ func (c *clientIDTracker) register() error {
return nil
}
// requestChannelLease requests a new channel lease
// from the user discovery server.
func (c *clientIDTracker) requestChannelLease() (int64, []byte, error) {
ts := time.Now().UnixNano()
privKey, err := c.receptionIdentity.GetRSAPrivateKey()
Loading