Skip to content
Snippets Groups Projects
Select Git revision
  • 11-22-implement-kv-interface-defined-in-collectiveversionedkvgo
  • release default protected
  • hotfix/TestHostPool_UpdateNdf_AddFilter
  • XX-4719/announcementChannels
  • xx-4717/logLevel
  • jonah/noob-channel
  • master protected
  • XX-4707/tagDiskJson
  • xx-4698/notification-retry
  • hotfix/notifylockup
  • syncNodes
  • hotfix/localCB
  • XX-4677/NewChanManagerMobile
  • XX-4689/DmSync
  • duplicatePrefix
  • XX-4601/HavenInvites
  • finalizedUICallbacks
  • XX-4673/AdminKeySync
  • debugNotifID
  • anne/test
  • v4.7.5
  • v4.7.4
  • v4.7.3
  • v4.7.2
  • v4.7.1
  • v4.6.3
  • v4.6.1
  • v4.5.0
  • v4.4.4
  • v4.3.11
  • v4.3.8
  • v4.3.7
  • v4.3.6
  • v4.3.5
  • v4.2.0
  • v4.3.0
  • v4.3.4
  • v4.3.3
  • v4.3.2
  • v4.3.1
40 results

manager.go

Blame
  • manager.go 2.15 KiB
    ///////////////////////////////////////////////////////////////////////////////
    // Copyright © 2020 xx network SEZC                                          //
    //                                                                           //
    // Use of this source code is governed by a license that can be found in the //
    // LICENSE file                                                              //
    ///////////////////////////////////////////////////////////////////////////////
    
    package rounds
    
    import (
    	"fmt"
    	"gitlab.com/elixxir/client/interfaces/params"
    	"gitlab.com/elixxir/client/network/gateway"
    	"gitlab.com/elixxir/client/network/internal"
    	"gitlab.com/elixxir/client/network/message"
    	"gitlab.com/elixxir/client/stoppable"
    )
    
    // Manager holds the configuration, shared dependencies, and work queues used
    // by the round-processing goroutines launched from StartProcessors.
    type Manager struct {
    	// params supplies the channel buffer lengths, the number of message
    	// retrieval workers, and the unchecked round period.
    	params params.Rounds
    	// Internal is embedded so its members (e.g. Comms) are promoted onto
    	// the Manager.
    	internal.Internal
    	// sender is the gateway sender handed to NewManager.
    	sender *gateway.Sender
    
    	// historicalRounds is a buffered queue of historical round requests;
    	// presumably drained by processHistoricalRounds (confirm in that file).
    	historicalRounds    chan historicalRoundRequest
    	// lookupRoundMessages is a buffered queue of round lookups; presumably
    	// drained by the message retrieval workers (confirm in that file).
    	lookupRoundMessages chan roundLookup
    	// messageBundles is the send-only channel, supplied by the caller of
    	// NewManager, on which message bundles are emitted.
    	messageBundles      chan<- message.Bundle
    }
    
    // NewManager builds a rounds Manager from the supplied dependencies.
    //
    // The historical-round and round-lookup queues are allocated here as
    // buffered channels sized by params.HistoricalRoundsBufferLen and
    // params.LookupRoundsBufferLen respectively. The bundles channel and the
    // gateway sender are stored as given. No goroutines are started; call
    // StartProcessors for that.
    func NewManager(internal internal.Internal, params params.Rounds,
    	bundles chan<- message.Bundle, sender *gateway.Sender) *Manager {
    	// Allocate the work queues up front, in the same order as the fields
    	// are declared on the struct.
    	historicalCh := make(chan historicalRoundRequest,
    		params.HistoricalRoundsBufferLen)
    	lookupCh := make(chan roundLookup, params.LookupRoundsBufferLen)

    	return &Manager{
    		Internal:            internal,
    		params:              params,
    		sender:              sender,
    		historicalRounds:    historicalCh,
    		lookupRoundMessages: lookupCh,
    		messageBundles:      bundles,
    	}
    }
    
    // StartProcessors launches every round-processing goroutine and returns a
    // single stoppable that aggregates the individual stoppers.
    //
    // It starts, in order:
    //   - one historical rounds processor,
    //   - m.params.NumMessageRetrievalWorkers message retrieval workers,
    //   - one periodic unchecked rounds processor.
    //
    // Each goroutine is handed the quit channel of its own single stoppable,
    // and each of those stoppables is added to the returned multi stoppable.
    func (m *Manager) StartProcessors() stoppable.Stoppable {
    	group := stoppable.NewMulti("Rounds")

    	// Historical rounds thread.
    	historicalStop := stoppable.NewSingle("ProcessHistoricalRounds")
    	historicalQuit := historicalStop.Quit()
    	go m.processHistoricalRounds(m.Comms, historicalQuit)
    	group.Add(historicalStop)

    	// Message retrieval worker pool; every worker gets its own stopper,
    	// named after its index in the pool.
    	for worker := uint(0); worker < m.params.NumMessageRetrievalWorkers; worker++ {
    		name := fmt.Sprintf("Messager Retriever %v", worker)
    		workerStop := stoppable.NewSingle(name)
    		workerQuit := workerStop.Quit()
    		go m.processMessageRetrieval(m.Comms, workerQuit)
    		group.Add(workerStop)
    	}

    	// Periodic unchecked round worker.
    	uncheckedStop := stoppable.NewSingle("UncheckRound")
    	uncheckedQuit := uncheckedStop.Quit()
    	go m.processUncheckedRounds(m.params.UncheckRoundPeriod, backOffTable,
    		uncheckedQuit)
    	group.Add(uncheckedStop)

    	return group
    }