From 1120d1b7736b34e7224eb753c08819c0dbec850d Mon Sep 17 00:00:00 2001 From: "Richard T. Carback III" <rick.carback@gmail.com> Date: Fri, 28 Aug 2020 19:01:13 +0000 Subject: [PATCH] More thread high level logic --- io/threads.go | 74 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 74 insertions(+) diff --git a/io/threads.go b/io/threads.go index d9488f1b6..6713e3a86 100644 --- a/io/threads.go +++ b/io/threads.go @@ -143,4 +143,78 @@ func processHistoricalRounds(ctx *context.Context, rids []RoundID) []*RoundInfo return ris } +func StartMessageReceivers(ctx *context.Context) Stoppable { + // We assume receivers channel is set up elsewhere, but note that this + // would also be a reasonable place under assumption of 1 call to + // message receivers (would also make sense to .Close it instead of + // using quit channel, which somewhat simplifies for loop later. + receiverCh := ctx.GetNetwork().GetMessageReceiverCh() + for i := 0; i < ctx.GetNumReceivers(); i++ { + // quitCh created for each thread, add to multistop + quitCh := make(chan bool) + go MessageReceiver(ctx, messagesCh, quitCh) + } + + // Return multistoppable +} + +func MessageReceiver(ctx *context.Context, messagesCh chan ClientMessage, + quitCh chan bool) { + done := false + for !done { + select { + case <-quitCh: + done = true + case m := <- messagesCh: + ReceiveMessage(ctx, m) // defined elsewhere... + } + } +} + +func StartNodeKeyExchange(ctx *context.Context) { + keyCh := ctx.GetNetwork().GetNodeKeysCh() + for i := 0; i < ctx.GetNumNodeKeyExchangers(); i ++ { + // quitCh created for each thread, add to multistop + quitCh := make(chan bool) + go ExchangeNodeKeys(ctx, keyCh, quitCh) + } + + // return multistoppable +} + +func ExchangeNodeKeys(ctx *context.Context, keyCh chan node.ID, quitCh chan bool) { + done := false + for !done { + select { + case <-quitCh: + done = true + case nid := <- keyCh: + nodekey := RegisterNode(ctx, nid) // defined elsewhere... + ctx.GetStorage().SetNodeKey(nid, nodekey) + } + } +} + +func StartNodeRemover(ctx *context.Context) { + remCh := ctx.GetNetwork().GetNodeRemCh() + for i := 0; i < ctx.GetNumNodeRemovers(); i ++ { + // quitCh created for each thread, add to multistop + quitCh := make(chan bool) + go RemoveNode(ctx, remCh, quitCh) + } + + // return multistoppable +} + +func RemoveNode(ctx *context.Context, remCh chan node.ID, quitCh chan bool) { + done := false + for !done { + select { + case <-quitCh: + done = true + case nid := <- keyCh: + ctx.GetStorage().RemoveNodeKey(nid) + } + } +} -- GitLab