Skip to content
Snippets Groups Projects
Commit 5fb3d681 authored by Richard T. Carback III's avatar Richard T. Carback III
Browse files

SendCMIX progress so far

parent db3284e0
Branches
Tags
No related merge requests found
...@@ -157,37 +157,54 @@ var rootCmd = &cobra.Command{ ...@@ -157,37 +157,54 @@ var rootCmd = &cobra.Command{
jww.FATAL.Panicf("%+v", err) jww.FATAL.Panicf("%+v", err)
} }
// Send Messages time.Sleep(10 * time.Second)
msgBody := viper.GetString("message")
recipientIDBytes, err := hex.DecodeString( // Wait until connected or crash on timeout
viper.GetString("destid")) connected := make(chan bool, 1)
if err != nil { client.GetHealth().AddChannel(connected)
jww.FATAL.Panicf("%+v", err) waitTimeout := time.Duration(viper.GetUint("waitTimeout"))
timeoutTick := time.NewTicker(waitTimeout * time.Second)
isConnected := false
for !isConnected {
select {
case isConnected = <-connected:
jww.INFO.Printf("health status: %b\n",
isConnected)
break
case <-timeoutTick.C:
jww.FATAL.Panic("timeout on connection")
} }
recipientID, err := id.Unmarshal(recipientIDBytes)
if err != nil {
jww.FATAL.Panicf("%+v", err)
} }
// Send Messages
msgBody := viper.GetString("message")
recipientID := getUIDFromString(viper.GetString("destid"))
msg := client.NewCMIXMessage(recipientID, []byte(msgBody)) msg := client.NewCMIXMessage(recipientID, []byte(msgBody))
params := params.GetDefaultCMIX() params := params.GetDefaultCMIX()
sendCnt := int(viper.GetUint("sendCount")) sendCnt := int(viper.GetUint("sendCount"))
sendDelay := time.Duration(viper.GetUint("sendDelay")) sendDelay := time.Duration(viper.GetUint("sendDelay"))
for i := 0; i < sendCnt; i++ { for i := 0; i < sendCnt; i++ {
client.SendCMIX(msg, params) fmt.Printf("Sending to %s: %s\n", recipientID, msgBody)
roundID, err := client.SendCMIX(msg, params)
if err != nil {
jww.FATAL.Panicf("%+v", err)
}
jww.INFO.Printf("RoundID: %d\n", roundID)
time.Sleep(sendDelay * time.Millisecond) time.Sleep(sendDelay * time.Millisecond)
} }
// Wait until message timeout or we receive enough then exit // Wait until message timeout or we receive enough then exit
// TODO: Actually check for how many messages we've received // TODO: Actually check for how many messages we've received
receiveCnt := viper.GetUint("receiveCount") receiveCnt := viper.GetUint("receiveCount")
waitTimeout := time.Duration(viper.GetUint("waitTimeout")) timeoutTick = time.NewTicker(waitTimeout * time.Second)
timeoutTick := time.NewTicker(waitTimeout * time.Second) done := false
for { for !done {
select { select {
case <-timeoutTick.C: case <-timeoutTick.C:
fmt.Println("Timed out!") fmt.Println("Timed out!")
done = true
break break
} }
} }
...@@ -195,6 +212,19 @@ var rootCmd = &cobra.Command{ ...@@ -195,6 +212,19 @@ var rootCmd = &cobra.Command{
}, },
} }
func getUIDFromString(idStr string) *id.ID {
idBytes, err := hex.DecodeString(fmt.Sprintf("%0*d%s",
66-len(idStr), 0, idStr))
if err != nil {
jww.FATAL.Panicf("%+v", err)
}
ID, err := id.Unmarshal(idBytes)
if err != nil {
jww.FATAL.Panicf("%+v", err)
}
return ID
}
func initLog(verbose bool, logPath string) { func initLog(verbose bool, logPath string) {
if logPath != "-" && logPath != "" { if logPath != "-" && logPath != "" {
// Disable stdout output // Disable stdout output
......
...@@ -54,13 +54,22 @@ func newTracker(timeout time.Duration) *Tracker { ...@@ -54,13 +54,22 @@ func newTracker(timeout time.Duration) *Tracker {
// Add a channel to the list of Tracker channels // Add a channel to the list of Tracker channels
// such that each channel can be notified of network changes // such that each channel can be notified of network changes
func (t *Tracker) AddChannel(c chan bool) { func (t *Tracker) AddChannel(c chan bool) {
t.mux.Lock()
t.channels = append(t.channels, c) t.channels = append(t.channels, c)
t.mux.Unlock()
select {
case c <- t.IsHealthy():
default:
}
} }
// Add a function to the list of Tracker function // Add a function to the list of Tracker function
// such that each function can be run after network changes // such that each function can be run after network changes
func (t *Tracker) AddFunc(f func(isHealthy bool)) { func (t *Tracker) AddFunc(f func(isHealthy bool)) {
t.mux.Lock()
t.funcs = append(t.funcs, f) t.funcs = append(t.funcs, f)
t.mux.Unlock()
go f(t.IsHealthy())
} }
func (t *Tracker) IsHealthy() bool { func (t *Tracker) IsHealthy() bool {
...@@ -115,6 +124,7 @@ func (t *Tracker) start(quitCh <-chan struct{}) { ...@@ -115,6 +124,7 @@ func (t *Tracker) start(quitCh <-chan struct{}) {
// Handle thread kill // Handle thread kill
break break
case heartbeat = <-t.heartbeat: case heartbeat = <-t.heartbeat:
jww.INFO.Printf("heartbeat: %+v", heartbeat)
if healthy(heartbeat) { if healthy(heartbeat) {
timerChan = time.NewTimer(t.timeout).C timerChan = time.NewTimer(t.timeout).C
t.setHealth(true) t.setHealth(true)
......
...@@ -39,7 +39,7 @@ func (m *Manager) SendCMIX(msg format.Message, param params.CMIX) (id.Round, err ...@@ -39,7 +39,7 @@ func (m *Manager) SendCMIX(msg format.Message, param params.CMIX) (id.Round, err
//build the topology //build the topology
idList, err := id.NewIDListFromBytes(bestRound.Topology) idList, err := id.NewIDListFromBytes(bestRound.Topology)
if err == nil { if err != nil {
jww.ERROR.Printf("Failed to use topology for round %v: %s", bestRound.ID, err) jww.ERROR.Printf("Failed to use topology for round %v: %s", bestRound.ID, err)
continue continue
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment