Skip to content
Snippets Groups Projects
Select Git revision
  • f24abef729b7389d3af11f7397b467a7f40cf58f
  • main default protected
  • development
  • integration
  • v1.1.5
  • v1.1.4
  • v1.1.3
  • v1.1.2
  • v1.1.1
  • v1.1.0
  • v1.0.0
11 results

Environment.swift

Blame
  • handler.go 5.70 KiB
    ///////////////////////////////////////////////////////////////////////////////
    // Copyright © 2020 xx network SEZC                                          //
    //                                                                           //
    // Use of this source code is governed by a license that can be found in the //
    // LICENSE file                                                              //
    ///////////////////////////////////////////////////////////////////////////////
    
    package message
    
    import (
    	"fmt"
    	"strconv"
    	"sync"
    	"time"
    
    	"gitlab.com/elixxir/client/event"
    	"gitlab.com/elixxir/client/storage/versioned"
    	"gitlab.com/xx_network/primitives/id"
    
    	jww "github.com/spf13/jwalterweatherman"
    	"gitlab.com/elixxir/client/stoppable"
    	"gitlab.com/elixxir/primitives/format"
    	"gitlab.com/xx_network/primitives/netTime"
    )
    
    const (
    	inProcessKey = "InProcessMessagesKey"
    )
    
    type Handler interface {
    	GetMessageReceptionChannel() chan<- Bundle
    	StartProcesses() stoppable.Stoppable
    	CheckInProgressMessages()
    
    	// Fingerprints
    	AddFingerprint(clientID *id.ID, fingerprint format.Fingerprint, mp Processor) error
    	DeleteFingerprint(clientID *id.ID, fingerprint format.Fingerprint)
    	DeleteClientFingerprints(clientID *id.ID)
    
    	// Triggers
    	AddService(clientID *id.ID, newService Service, response Processor)
    	DeleteService(clientID *id.ID, toDelete Service, response Processor)
    	DeleteClientService(clientID *id.ID)
    	TrackServices(triggerTracker ServicesTracker)
    }
    
    type handler struct {
    	param Params
    
    	messageReception chan Bundle
    	checkInProgress  chan struct{}
    
    	inProcess *MeteredCmixMessageBuffer
    
    	events event.Reporter
    
    	FingerprintsManager
    	ServicesManager
    }
    
    func NewHandler(param Params, kv *versioned.KV, events event.Reporter,
    	standardID *id.ID) Handler {
    
    	garbled, err := NewOrLoadMeteredCmixMessageBuffer(kv, inProcessKey)
    	if err != nil {
    		jww.FATAL.Panicf(
    			"Failed to load or new the Garbled Messages system: %v", err)
    	}
    
    	m := handler{
    		param:            param,
    		messageReception: make(chan Bundle, param.MessageReceptionBuffLen),
    		checkInProgress:  make(chan struct{}, 100),
    		inProcess:        garbled,
    		events:           events,
    	}
    
    	m.FingerprintsManager = *newFingerprints(standardID)
    	m.ServicesManager = *NewServices()
    	return &m
    }
    
    // GetMessageReceptionChannel gets the channel to send received messages on.
    func (h *handler) GetMessageReceptionChannel() chan<- Bundle {
    	return h.messageReception
    }
    
    // StartProcesses starts all worker pool.
    func (h *handler) StartProcesses() stoppable.Stoppable {
    	multi := stoppable.NewMulti("MessageReception")
    
    	// Create the message handler workers
    	for i := uint(0); i < h.param.MessageReceptionWorkerPoolSize; i++ {
    		stop := stoppable.NewSingle(
    			"MessageReception Worker " + strconv.Itoa(int(i)))
    		go h.handleMessages(stop)
    		multi.Add(stop)
    	}
    
    	// Create the in progress messages thread
    	garbledStop := stoppable.NewSingle("GarbledMessages")
    	go h.recheckInProgressRunner(garbledStop)
    	multi.Add(garbledStop)
    
    	return multi
    }
    
    // handleMessages is a long-running thread that receives each Bundle from messageReception
    // and processes the messages in the Bundle
    func (h *handler) handleMessages(stop *stoppable.Single) {
    	for {
    		select {
    		case <-stop.Quit():
    			stop.ToStopped()
    			return
    		case bundle := <-h.messageReception:
    			go func() {
    				wg := sync.WaitGroup{}
    				wg.Add(len(bundle.Messages))
    				for i := range bundle.Messages {
    					msg := bundle.Messages[i]
    					jww.TRACE.Printf("handle IterMsgs: %s",
    						msg.Digest())
    
    					go func() {
    						count, ts := h.inProcess.Add(
    							msg, bundle.RoundInfo.Raw, bundle.Identity)
    						wg.Done()
    						h.handleMessage(count, ts, msg, bundle)
    					}()
    				}
    				wg.Wait()
    				bundle.Finish()
    			}()
    		}
    	}
    
    }
    
    // handleMessage processes an individual message in the Bundle
    // and handles the inProcess logic
    func (h *handler) handleMessage(count uint, ts time.Time, msg format.Message, bundle Bundle) {
    	success := h.handleMessageHelper(msg, bundle)
    	if success {
    		h.inProcess.Remove(
    			msg, bundle.RoundInfo.Raw, bundle.Identity)
    	} else {
    		// Fail the message if any part of the decryption
    		// fails, unless it is the last attempts and has
    		// been in the buffer long enough, in which case
    		// remove it
    		if count == h.param.MaxChecksInProcessMessage &&
    			netTime.Since(ts) > h.param.InProcessMessageWait {
    			h.inProcess.Remove(
    				msg, bundle.RoundInfo.Raw, bundle.Identity)
    		} else {
    			h.inProcess.Failed(
    				msg, bundle.RoundInfo.Raw, bundle.Identity)
    		}
    	}
    }
    
    // handleMessageHelper determines if any services or fingerprints match the given message
    // and runs the processor, returning whether a processor was found
    func (h *handler) handleMessageHelper(ecrMsg format.Message, bundle Bundle) bool {
    	fingerprint := ecrMsg.GetKeyFP()
    	identity := bundle.Identity
    	round := bundle.RoundInfo
    
    	jww.INFO.Printf("handleMessage(%s)", ecrMsg.Digest())
    
    	// If we have a fingerprint, process it
    	if proc, exists := h.pop(identity.Source, fingerprint); exists {
    		jww.DEBUG.Printf("handleMessage found fingerprint: %s",
    			ecrMsg.Digest())
    		proc.Process(ecrMsg, identity, round)
    		return true
    	}
    
    	services, exists := h.get(
    		identity.Source, ecrMsg.GetSIH(), ecrMsg.GetContents())
    	if exists {
    		for _, t := range services {
    			jww.DEBUG.Printf("handleMessage service found: %s, %s",
    				ecrMsg.Digest(), t)
    			go t.Process(ecrMsg, identity, round)
    		}
    		if len(services) == 0 {
    			jww.WARN.Printf("empty service list for %s: %s",
    				ecrMsg.Digest(), ecrMsg.GetSIH())
    		}
    		return true
    	}
    
    	im := fmt.Sprintf("Message cannot be identified: keyFP: %v, round: %d "+
    		"msgDigest: %s, not determined to be for client",
    		ecrMsg.GetKeyFP(), bundle.Round, ecrMsg.Digest())
    	jww.TRACE.Printf(im)
    
    	h.events.Report(1, "MessageReception", "Garbled", im)
    
    	return false
    }