Skip to content
Snippets Groups Projects
Commit 409a0a9a authored by Benjamin Wenger's avatar Benjamin Wenger
Browse files

fixed bwLoggign to be more accurate

parent cb4831e2
No related branches found
No related tags found
No related merge requests found
......@@ -168,9 +168,8 @@ func ReceiveStreamPostPhase(streamServer mixmessages.Node_StreamPostPhaseServer,
//queue the phase to be operated on if it is not queued yet
p.AttemptToQueue(instance.GetResourceQueue().GetPhaseQueue())
start, strmErr := StreamPostPhase(p, batchInfo.BatchSize, streamServer)
start, end, strmErr := StreamPostPhase(p, batchInfo.BatchSize, streamServer)
end := time.Now()
jww.INFO.Printf("\tbwLogging: Round %d, "+
"received phase: %s, "+
"from: %s, to: %s, "+
......
......@@ -85,7 +85,7 @@ func StreamTransmitPhase(roundID id.Round, serverInstance phase.GenericInstance,
}
}
}
end := time.Now()
measureFunc := currentPhase.Measure
if measureFunc != nil {
measureFunc(measure.TagTransmitLastSlot)
......@@ -102,8 +102,6 @@ func StreamTransmitPhase(roundID id.Round, serverInstance phase.GenericInstance,
jww.INFO.Printf("[%s] RID %d StreamTransmitPhase FOR \"%s\""+
" COMPLETE/SEND", name, roundID, rType)
end := time.Now()
jww.INFO.Printf("\tbwLogging: Round %d, "+
"transmitted phase: %s, "+
"from: %s, to: %s, "+
......@@ -131,11 +129,11 @@ func StreamTransmitPhase(roundID id.Round, serverInstance phase.GenericInstance,
// StreamPostPhase implements the server gRPC handler for posting a
// phase from another node
func StreamPostPhase(p phase.Phase, batchSize uint32,
stream mixmessages.Node_StreamPostPhaseServer) (time.Time, error) {
stream mixmessages.Node_StreamPostPhaseServer) (time.Time, time.Time, error) {
// Send a chunk for each slot received along with
// its index until an error is received
slot, err := stream.Recv()
start := time.Now()
var start, end time.Time
slotsReceived := uint32(0)
for ; err == nil; slot, err = stream.Recv() {
index := slot.Index
......@@ -144,13 +142,19 @@ func StreamPostPhase(p phase.Phase, batchSize uint32,
if phaseErr != nil {
err = errors.Errorf("Failed on phase input %v for slot %v: %+v",
index, slot, phaseErr)
return start, phaseErr
return start, end, phaseErr
}
chunk := services.NewChunk(index, index+1)
p.Send(chunk)
slotsReceived++
if slotsReceived == 1 {
start = time.Now()
} else if slotsReceived == batchSize {
end = time.Now()
}
}
// Set error in ack message if we didn't receive all slots
......@@ -170,10 +174,10 @@ func StreamPostPhase(p phase.Phase, batchSize uint32,
errClose := stream.SendAndClose(&ack)
if errClose != nil && ack.Error != "" {
return start, errors.WithMessage(errClose, ack.Error)
return start, end, errors.WithMessage(errClose, ack.Error)
} else if errClose == nil && ack.Error != "" {
return start, errors.New(ack.Error)
return start, end, errors.New(ack.Error)
} else {
return start, errClose
return start, end, errClose
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment