diff --git a/io/threads.go b/io/threads.go index d9488f1b6d8c208ae0526ca24d9356a653553173..6713e3a86eb0a73f2416f8e3cb8211e2acd8be2d 100644 --- a/io/threads.go +++ b/io/threads.go @@ -143,4 +143,78 @@ func processHistoricalRounds(ctx *context.Context, rids []RoundID) []*RoundInfo return ris } +func StartMessageReceivers(ctx *context.Context) Stoppable { + // We assume receivers channel is set up elsewhere, but note that this + // would also be a reasonable place under assumption of 1 call to + // message receivers (would also make sense to .Close it instead of + // using quit channel, which somewhat simplifies for loop later. + receiverCh := ctx.GetNetwork().GetMessageReceiverCh() + for i := 0; i < ctx.GetNumReceivers(); i++ { + // quitCh created for each thread, add to multistop + quitCh := make(chan bool) + go MessageReceiver(ctx, messagesCh, quitCh) + } + + // Return multistoppable +} + +func MessageReceiver(ctx *context.Context, messagesCh chan ClientMessage, + quitCh chan bool) { + done := false + for !done { + select { + case <-quitCh: + done = true + case m := <- messagesCh: + ReceiveMessage(ctx, m) // defined elsewhere... + } + } +} + +func StartNodeKeyExchange(ctx *context.Context) { + keyCh := ctx.GetNetwork().GetNodeKeysCh() + for i := 0; i < ctx.GetNumNodeKeyExchangers(); i ++ { + // quitCh created for each thread, add to multistop + quitCh := make(chan bool) + go ExchangeNodeKeys(ctx, keyCh, quitCh) + } + + // return multistoppable +} + +func ExchangeNodeKeys(ctx *context.Context, keyCh chan node.ID, quitCh chan bool) { + done := false + for !done { + select { + case <-quitCh: + done = true + case nid := <- keyCh: + nodekey := RegisterNode(ctx, nid) // defined elsewhere... + ctx.GetStorage().SetNodeKey(nid, nodekey) + } + } +} + +func StartNodeRemover(ctx *context.Context) { + remCh := ctx.GetNetwork().GetNodeRemCh() + for i := 0; i < ctx.GetNumNodeRemovers(); i ++ { + // quitCh created for each thread, add to multistop + quitCh := make(chan bool) + go RemoveNode(ctx, remCh, quitCh) + } + + // return multistoppable +} + +func RemoveNode(ctx *context.Context, remCh chan node.ID, quitCh chan bool) { + done := false + for !done { + select { + case <-quitCh: + done = true + case nid := <- keyCh: + ctx.GetStorage().RemoveNodeKey(nid) + } + } +}