Skip to content
Snippets Groups Projects
Commit 8a3eff24 authored by Benjamin Wenger's avatar Benjamin Wenger
Browse files

finished graph and fully tested

parent d9cb392f
Branches
Tags
No related merge requests found
package globals
const MinSlotSize = uint32(2)
const MinSlotSize = uint32(4)
const MaxThreads = uint32(12)
......@@ -2,4 +2,4 @@ package graphs
import "gitlab.com/elixxir/server/services"
type Initializer func(batchSize uint32, errorHandler services.ErrorCallback) *services.Graph
type Initializer func(errorHandler services.ErrorCallback) *services.Graph
package graphs
package precomputation
import (
"gitlab.com/elixxir/crypto/cryptops"
......@@ -7,7 +7,12 @@ import (
"gitlab.com/elixxir/server/services"
)
type PrecompDispatchStream struct {
//This file implements the Graph for the Precomputation Decrypt phase
// Decrypt phase transforms first unpermuted internode keys
// and partial cypher texts into the data that the permute phase needs
//Stream holding data containing keys and inputs used by decrypt
type DecryptStream struct {
Grp *cyclic.Group
PublicCypherKey *cyclic.Int
......@@ -23,11 +28,11 @@ type PrecompDispatchStream struct {
CypherAD *cyclic.IntBuffer
}
func (s *PrecompDispatchStream) GetStreamName() string {
return "PrecompDispatchStream"
func (s *DecryptStream) GetName() string {
return "PrecompDecryptStream"
}
func (s *PrecompDispatchStream) Link(batchSize uint32, source ...interface{}) {
func (s *DecryptStream) Link(batchSize uint32, source ...interface{}) {
round := source[0].(*node.RoundBuffer)
s.Grp = round.Grp
......@@ -44,9 +49,11 @@ func (s *PrecompDispatchStream) Link(batchSize uint32, source ...interface{}) {
s.CypherAD = s.Grp.NewIntBuffer(batchSize, s.Grp.NewInt(1))
}
//Sole module in Precomputation Decrypt implementing cryptops.Elgamal
var DecryptElgamal = services.Module{
// Multiplies in own Encrypted Keys and Partial Cypher Texts
Adapt: func(streamInput services.Stream, cryptop cryptops.Cryptop, chunk services.Chunk) error {
s, ok := streamInput.(*PrecompDispatchStream)
s, ok := streamInput.(*DecryptStream)
elgamal, ok2 := cryptop.(cryptops.ElGamalPrototype)
if !ok || !ok2 {
......@@ -68,8 +75,9 @@ var DecryptElgamal = services.Module{
Name: "DecryptElgamal",
}
func InitPrecompDispatchGraph(batchSize uint32, errorHandler services.ErrorCallback) *services.Graph {
g := services.NewGraph("PrecomputationDecrypt", errorHandler, &PrecompDispatchStream{})
//Called to initialize the graph. Conforms to graphs.Initialize function type
func InitDecryptGraph(errorHandler services.ErrorCallback) *services.Graph {
g := services.NewGraph("PrecompDecrypt", errorHandler, &DecryptStream{})
g.First(&DecryptElgamal)
g.Last(&DecryptElgamal)
......
package graphs
package precomputation
import (
"fmt"
"gitlab.com/elixxir/crypto/cryptops"
"gitlab.com/elixxir/crypto/cyclic"
"gitlab.com/elixxir/crypto/large"
"gitlab.com/elixxir/server/graphs"
"gitlab.com/elixxir/server/node"
"gitlab.com/elixxir/server/services"
"testing"
)
//Test that DecryptStream.GetName() returns the correct name
func TestDecryptStream_GetName(t *testing.T) {
expected := "PrecompDecryptStream"
ds := DecryptStream{}
if ds.GetName() != expected {
t.Errorf("DecryptStream.GetName(), Expected %s, Recieved %s", expected, ds.GetName())
}
}
//Test that DecryptStream.Link() Links correctly
func TestDecryptStream_Link(t *testing.T) {
primeString := "FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD1" +
"29024E088A67CC74020BBEA63B139B22514A08798E3404DD" +
"EF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245" +
"E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7ED" +
"EE386BFB5A899FA5AE9F24117C4B1FE649286651ECE45B3D" +
"C2007CB8A163BF0598DA48361C55D39A69163FA8FD24CF5F" +
"83655D23DCA3AD961C62F356208552BB9ED529077096966D" +
"670C354E4ABC9804F1746C08CA18217C32905E462E36CE3B" +
"E39E772C180E86039B2783A2EC07A28FB5C55DF06F4C52C9" +
"DE2BCBF6955817183995497CEA956AE515D2261898FA0510" +
"15728E5A8AACAA68FFFFFFFFFFFFFFFF"
grp := cyclic.NewGroup(large.NewIntFromString(primeString, 16), large.NewInt(2), large.NewInt(1283))
ds := DecryptStream{}
batchSize := uint32(100)
round := node.NewRound(grp, 1, batchSize, batchSize)
ds.Link(batchSize, round)
checkStreamIntBuffer(grp, ds.R, round.R, "R", t)
checkStreamIntBuffer(grp, ds.U, round.U, "U", t)
checkStreamIntBuffer(grp, ds.R, round.R, "Y_R", t)
checkStreamIntBuffer(grp, ds.U, round.U, "Y_U", t)
checkIntBuffer(ds.KeysMsg, batchSize, "KeysMsg", grp.NewInt(1), t)
checkIntBuffer(ds.CypherMsg, batchSize, "CypherMsg", grp.NewInt(1), t)
checkIntBuffer(ds.KeysAD, batchSize, "KeysAD", grp.NewInt(1), t)
checkIntBuffer(ds.CypherAD, batchSize, "CypherAD", grp.NewInt(1), t)
}
func TestDecryptGraph(t *testing.T) {
primeString := "FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD1" +
"29024E088A67CC74020BBEA63B139B22514A08798E3404DD" +
"EF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245" +
"E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7ED" +
"EE386BFB5A899FA5AE9F24117C4B1FE649286651ECE45B3D" +
"C2007CB8A163BF0598DA48361C55D39A69163FA8FD24CF5F" +
"83655D23DCA3AD961C62F356208552BB9ED529077096966D" +
"670C354E4ABC9804F1746C08CA18217C32905E462E36CE3B" +
"E39E772C180E86039B2783A2EC07A28FB5C55DF06F4C52C9" +
"DE2BCBF6955817183995497CEA956AE515D2261898FA0510" +
"15728E5A8AACAA68FFFFFFFFFFFFFFFF"
grp := cyclic.NewGroup(large.NewIntFromString(primeString, 16), large.NewInt(2), large.NewInt(1283))
batchSize := uint32(100)
expectedName := "PrecompDecrypt"
//Show that the Inti function meets the function type
var graphInit graphs.Initializer
graphInit = InitDecryptGraph
//Initialize graph
g := graphInit(func(err error) { return })
if g.GetName() != expectedName {
t.Errorf("PrecompDecrypt has incorrect name Expected %s, Recieved %s", expectedName, g.GetName())
}
//Build the graph
g.Build(batchSize)
//Build the round
round := node.NewRound(grp, 1, g.GetBatchSize(), g.GetExpandedBatchSize())
//Link the graph to the round. building the stream object
g.Link(round)
stream := g.GetStream().(*DecryptStream)
//fill the fields of the stream object for testing
grp.Random(stream.PublicCypherKey)
for i := uint32(0); i < g.GetExpandedBatchSize(); i++ {
grp.Random(stream.R.Get(i))
grp.Random(stream.U.Get(i))
grp.Random(stream.Y_R.Get(i))
grp.Random(stream.Y_U.Get(i))
}
//Build i/o used for testing
KeysMsgExpected := grp.NewIntBuffer(g.GetExpandedBatchSize(), grp.NewInt(1))
CypherMsgExpected := grp.NewIntBuffer(g.GetExpandedBatchSize(), grp.NewInt(1))
KeysADExpected := grp.NewIntBuffer(g.GetExpandedBatchSize(), grp.NewInt(1))
CypherADExpected := grp.NewIntBuffer(g.GetExpandedBatchSize(), grp.NewInt(1))
//Run the graph
g.Run()
//Send inputs into the graph
go func(g *services.Graph) {
for i := uint32(0); i < g.GetExpandedBatchSize(); i++ {
g.Send(services.NewChunk(i, i+1))
}
}(g)
//Get the output
s := g.GetStream().(*DecryptStream)
for chunk := range g.ChunkDoneChannel() {
for i := chunk.Begin(); i < chunk.End(); i++ {
// Compute expected result for this slot
cryptops.ElGamal(s.Grp, s.R.Get(i), s.Y_R.Get(i), s.PublicCypherKey, KeysMsgExpected.Get(i), CypherMsgExpected.Get(i))
//Execute elgamal on the keys for the Associated Data
cryptops.ElGamal(s.Grp, s.U.Get(i), s.Y_U.Get(i), s.PublicCypherKey, KeysADExpected.Get(i), CypherADExpected.Get(i))
if KeysMsgExpected.Get(i).Cmp(s.KeysMsg.Get(i)) != 0 {
t.Error(fmt.Sprintf("PrecompDecrypt: Message Keys not equal on slot %v", i))
}
if CypherMsgExpected.Get(i).Cmp(s.CypherMsg.Get(i)) != 0 {
t.Error(fmt.Sprintf("PrecompDecrypt: Message Keys Cypher not equal on slot %v", i))
}
if KeysADExpected.Get(i).Cmp(s.KeysAD.Get(i)) != 0 {
t.Error(fmt.Sprintf("PrecompDecrypt: AD Keys not equal on slot %v", i))
}
if CypherADExpected.Get(i).Cmp(s.CypherAD.Get(i)) != 0 {
t.Error(fmt.Sprintf("PrecompDecrypt: AD Keys Cypher not equal on slot %v", i))
}
}
}
}
func checkStreamIntBuffer(grp *cyclic.Group, ib, sourceib *cyclic.IntBuffer, source string, t *testing.T) {
if ib.Len() != sourceib.Len() {
t.Errorf("preomp.DecryptStream.Link: Length of intBuffer %s not correct, "+
"Expected %v, Recieved: %v", source, sourceib.Len(), ib.Len())
}
numBad := 0
for i := 0; i < sourceib.Len(); i++ {
grp.SetUint64(sourceib.Get(uint32(i)), uint64(i))
ci := ib.Get(uint32(i))
if ci.Cmp(sourceib.Get(uint32(i))) != 0 {
numBad++
}
}
if numBad != 0 {
t.Errorf("preomp.DecryptStream.Link: Ints in %v/%v intBuffer %s intilized incorrectly",
numBad, sourceib.Len(), source)
}
}
func checkIntBuffer(ib *cyclic.IntBuffer, expandedBatchSize uint32, source string, defaultInt *cyclic.Int, t *testing.T) {
if ib.Len() != int(expandedBatchSize) {
t.Errorf("New RoundBuffer: Length of intBuffer %s not correct, "+
"Expected %v, Recieved: %v", source, expandedBatchSize, ib.Len())
}
numBad := 0
for i := uint32(0); i < expandedBatchSize; i++ {
ci := ib.Get(i)
if ci.Cmp(defaultInt) != 0 {
numBad++
}
}
if numBad != 0 {
t.Errorf("New RoundBuffer: Ints in %v/%v intBuffer %s intilized incorrectly",
numBad, expandedBatchSize, source)
}
}
......@@ -98,7 +98,7 @@ func checkIntBuffer(ib *cyclic.IntBuffer, expandedBatchSize uint32, source strin
}
numBad := 0
for i := 0; i < int(expandedBatchSize); i++ {
for i := uint32(0); i < expandedBatchSize; i++ {
ci := ib.Get(i)
if ci.Cmp(defaultInt) != 0 {
numBad++
......
......@@ -48,7 +48,7 @@ type Stream1 struct {
I []int
}
func (s *Stream1) GetStreamName() string {
func (s *Stream1) GetName() string {
return "Stream1"
}
......@@ -169,7 +169,7 @@ func TestGraph(t *testing.T) {
g.Build(batchSize)
roundSize := uint32(math.Ceil(1.2 * float64(g.Cap())))
roundSize := uint32(math.Ceil(1.2 * float64(g.GetExpandedBatchSize())))
roundBuf := RoundBuffer{}
roundBuf.Build(roundSize)
......@@ -179,7 +179,7 @@ func TestGraph(t *testing.T) {
go func(g *Graph) {
for i := uint32(0); i < g.Cap(); i++ {
for i := uint32(0); i < g.GetExpandedBatchSize(); i++ {
g.Send(NewChunk(i, i+1))
}
}(g)
......@@ -190,8 +190,6 @@ func TestGraph(t *testing.T) {
stream := g.GetStream().(*Stream1)
// This is probably the problem - we're ranging over a channel that
// doesn't get closed properly. So the range won't finish.
for chunk := range g.ChunkDoneChannel() {
for i := chunk.Begin(); i < chunk.End(); i++ {
// Compute expected result for this slot
......@@ -210,11 +208,6 @@ func TestGraph(t *testing.T) {
}
}
}
// Did he mean to have encCh and endCh? encCh is probably just a typo
// Also, is this the send that panics?
// There's a send on a closed channel that panics every time main is
// run, so we need to figure out if it's because of something main did,
// or if it's a problem in the dispatcher.
encCh <- true
}(g, endCh)
......
......@@ -52,7 +52,7 @@ func NewGraph(name string, callback ErrorCallback, stream Stream) *Graph {
}
// This is too long of a function
func (g *Graph) Build(batchSize uint32) uint32 {
func (g *Graph) Build(batchSize uint32) {
//Check if graph has modules
if len(g.modules) == 0 {
......@@ -84,8 +84,12 @@ func (g *Graph) Build(batchSize uint32) uint32 {
}
} else {
if m.ChunkSize < globals.MinSlotSize {
panic(fmt.Sprintf("ChunkSize (%v) cannot be smaller than the minimum slot range (%v), "+
"Module: %s", m.ChunkSize, globals.MinSlotSize, m.Name))
/*panic(fmt.Sprintf("ChunkSize (%v) cannot be smaller than the minimum slot range (%v), "+
"Module: %s", m.ChunkSize, globals.MinSlotSize, m.Name))*/
m.ChunkSize = globals.MinSlotSize
if m.AssignmentSize < globals.MinSlotSize {
m.AssignmentSize = globals.MinSlotSize
}
}
if m.AssignmentSize%m.ChunkSize != 0 {
......@@ -167,8 +171,6 @@ func (g *Graph) Build(batchSize uint32) uint32 {
g.outputChannel = g.outputModule.input
delete(g.modules, g.outputModule.id)
return lcm
}
func (g *Graph) Run() {
......@@ -253,11 +255,11 @@ func (g *Graph) ChunkDoneChannel() OutputNotify {
return g.outputChannel
}
func (g *Graph) Cap() uint32 {
func (g *Graph) GetExpandedBatchSize() uint32 {
return g.expandBatchSize
}
func (g *Graph) Len() uint32 {
func (g *Graph) GetBatchSize() uint32 {
return g.batchSize
}
......
package services
type Stream interface {
GetStreamName() string
GetName() string
Link(BatchSize uint32, source ...interface{})
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment