Skip to content
Snippets Groups Projects
Select Git revision
  • 96dfef9c9e87c841897b6c8347e780eb59e99bdd
  • release default protected
  • 11-22-implement-kv-interface-defined-in-collectiveversionedkvgo
  • hotfix/TestHostPool_UpdateNdf_AddFilter
  • XX-4719/announcementChannels
  • xx-4717/logLevel
  • jonah/noob-channel
  • master protected
  • XX-4707/tagDiskJson
  • xx-4698/notification-retry
  • hotfix/notifylockup
  • syncNodes
  • hotfix/localCB
  • XX-4677/NewChanManagerMobile
  • XX-4689/DmSync
  • duplicatePrefix
  • XX-4601/HavenInvites
  • finalizedUICallbacks
  • XX-4673/AdminKeySync
  • debugNotifID
  • anne/test
  • v4.7.5
  • v4.7.4
  • v4.7.3
  • v4.7.2
  • v4.7.1
  • v4.6.3
  • v4.6.1
  • v4.5.0
  • v4.4.4
  • v4.3.11
  • v4.3.8
  • v4.3.7
  • v4.3.6
  • v4.3.5
  • v4.2.0
  • v4.3.0
  • v4.3.4
  • v4.3.3
  • v4.3.2
  • v4.3.1
41 results

manager_test.go

Blame
  • critical.go 4.20 KiB
    ///////////////////////////////////////////////////////////////////////////////
    // Copyright © 2020 xx network SEZC                                          //
    //                                                                           //
    // Use of this source code is governed by a license that can be found in the //
    // LICENSE file                                                              //
    ///////////////////////////////////////////////////////////////////////////////
    
    package message
    
    import (
    	jww "github.com/spf13/jwalterweatherman"
    	"gitlab.com/elixxir/client/interfaces/message"
    	"gitlab.com/elixxir/client/interfaces/params"
    	"gitlab.com/elixxir/client/interfaces/utility"
    	ds "gitlab.com/elixxir/comms/network/dataStructures"
    	"gitlab.com/elixxir/primitives/format"
    	"gitlab.com/elixxir/primitives/states"
    	"gitlab.com/xx_network/primitives/id"
    	"time"
    )
    
    // Critical Messages are protocol layer communications that must succeed. These
    // are added to the persistent critical messages store. This thread waits for
    // network access to move from unhealthy to healthy and then sends all critical
    // messages.
    // Health is tracked by registering with the Health
    // Tracker (/network/Health/Tracker.go)
    
    // processCriticalMessages is the long-running loop that drives the sending
    // of critical messages. On every iteration it blocks until one of two things
    // happens:
    //   - a receive on quitCh completes, in which case the loop ends and the
    //     method returns;
    //   - a value arrives on m.networkIsHealthy; when that value is true,
    //     criticalMessages is called, and when it is false nothing is done.
    func (m *Manager) processCriticalMessages(quitCh <-chan struct{}) {
    	for {
    		select {
    		case <-quitCh:
    			return
    		case healthy := <-m.networkIsHealthy:
    			// only a report of a healthy network triggers a send attempt
    			if !healthy {
    				continue
    			}
    			m.criticalMessages()
    		}
    	}
    }
    
    // criticalMessages attempts to send every message currently returned by the
    // session's critical message buffer and critical raw message buffer.
    //
    // Each message is handed to its own goroutine; this method returns once all
    // goroutines have been started and does not wait for them to finish.
    //
    // Inside each goroutine the message is sent (m.SendE2E for critical messages,
    // m.SendCMIX with the default CMIX params for raw critical messages), then the
    // goroutine registers for the COMPLETED/FAILED events of the returned round(s)
    // and collects them with utility.TrackResults. If the send returns an error or
    // the tracked results are not successful, the buffer's Failed method is called
    // for the message; otherwise its Succeeded method is called.
    func (m *Manager) criticalMessages() {
    	// timeout passed to AddRoundEventChan for every round that is tracked
    	const roundEventTimeout = 1 * time.Minute

    	critMsgs := m.Session.GetCriticalMessages()
    	// try to send every message in the critical messages and the raw critical
    	// messages buffer in parallel

    	// critical messages
    	for msg, param, has := critMsgs.Next(); has; msg, param, has = critMsgs.Next() {
    		// msg and param are passed as arguments so each goroutine works on
    		// the values of its own iteration
    		go func(msg message.Send, param params.E2E) {
    			// send the message
    			rounds, _, err := m.SendE2E(msg, param)
    			// if the message fails to send, notify the buffer so it can be
    			// handled in the future and exit
    			if err != nil {
    				jww.ERROR.Printf("Failed to send critical message on "+
    					"notification of healthy network: %+v", err)
    				critMsgs.Failed(msg)
    				return
    			}
    			jww.INFO.Printf("critical RoundIDs: %v", rounds)

    			// wait on the results to make sure the rounds were successful;
    			// the channel is buffered with one slot per round
    			sendResults := make(chan ds.EventReturn, len(rounds))
    			roundEvents := m.Instance.GetRoundEvents()
    			for _, r := range rounds {
    				roundEvents.AddRoundEventChan(r, sendResults, roundEventTimeout,
    					states.COMPLETED, states.FAILED)
    			}
    			success, numTimeOut, numRoundFail := utility.TrackResults(sendResults, len(rounds))
    			if !success {
    				jww.ERROR.Printf("critical message send failed to "+
    					"transmit %v/%v partitions: %v round failures, %v timeouts",
    					numRoundFail+numTimeOut, len(rounds), numRoundFail, numTimeOut)
    				critMsgs.Failed(msg)
    				return
    			}
    			critMsgs.Succeeded(msg)
    		}(msg, param)
    	}

    	critRawMsgs := m.Session.GetCriticalRawMessages()
    	// the same default CMIX params are used for every raw message
    	param := params.GetDefaultCMIX()
    	// raw critical messages
    	for msg, rid, has := critRawMsgs.Next(); has; msg, rid, has = critRawMsgs.Next() {
    		// the goroutine is given a deep copy of the recipient ID rather than
    		// the pointer returned by Next
    		// NOTE(review): presumably this guards against the buffer reusing or
    		// mutating the ID - confirm against the buffer implementation
    		localRid := rid.DeepCopy()
    		go func(msg format.Message, rid *id.ID) {
    			// send the message
    			round, _, err := m.SendCMIX(msg, rid, param)
    			// if the message fails to send, notify the buffer so it can be
    			// handled in the future and exit
    			if err != nil {
    				jww.ERROR.Printf("Failed to send critical message on "+
    					"notification of healthy network: %+v", err)
    				critRawMsgs.Failed(msg, rid)
    				return
    			}
    			jww.INFO.Printf("critical healthy RoundIDs: %v", round)

    			// wait on the results to make sure the round was successful;
    			// a single round is tracked, so one buffer slot is enough
    			sendResults := make(chan ds.EventReturn, 1)
    			roundEvents := m.Instance.GetRoundEvents()

    			roundEvents.AddRoundEventChan(round, sendResults, roundEventTimeout,
    				states.COMPLETED, states.FAILED)

    			success, numTimeOut, numRoundFail := utility.TrackResults(sendResults, 1)
    			if !success {
    				jww.ERROR.Printf("critical message send failed to "+
    					"transmit %v/%v partitions: %v round failures, %v timeouts",
    					numRoundFail+numTimeOut, 1, numRoundFail, numTimeOut)
    				critRawMsgs.Failed(msg, rid)
    				return
    			}
    			critRawMsgs.Succeeded(msg, rid)
    		}(msg, localRid)
    	}

    }