Skip to content
Snippets Groups Projects
Commit 930ef2e6 authored by Jono Wenger's avatar Jono Wenger
Browse files

Fix duplicate file transfer callback bug

parent dd2970d0
No related branches found
No related tags found
2 merge requests!510Release,!247Fix duplicate file transfer callback bug
...@@ -74,7 +74,7 @@ func (ct *callbackTracker) call(err error) { ...@@ -74,7 +74,7 @@ func (ct *callbackTracker) call(err error) {
if timeSinceLastCall > ct.period { if timeSinceLastCall > ct.period {
// If no callback occurred, then trigger the callback now // If no callback occurred, then trigger the callback now
go ct.cb(err) ct.cb(err)
ct.lastCall = netTime.Now() ct.lastCall = netTime.Now()
} else { } else {
// If a callback did occur, then schedule a new callback to occur at the // If a callback did occur, then schedule a new callback to occur at the
...@@ -89,7 +89,7 @@ func (ct *callbackTracker) call(err error) { ...@@ -89,7 +89,7 @@ func (ct *callbackTracker) call(err error) {
return return
case <-timer.C: case <-timer.C:
ct.mux.Lock() ct.mux.Lock()
go ct.cb(err) ct.cb(err)
ct.lastCall = netTime.Now() ct.lastCall = netTime.Now()
ct.scheduled = false ct.scheduled = false
ct.mux.Unlock() ct.mux.Unlock()
......
...@@ -358,6 +358,11 @@ func (m *manager) registerSentProgressCallback(st *store.SentTransfer, ...@@ -358,6 +358,11 @@ func (m *manager) registerSentProgressCallback(st *store.SentTransfer,
// Build part tracker from copy of part statuses vector // Build part tracker from copy of part statuses vector
tracker := &sentFilePartTracker{st.CopyPartStatusVector()} tracker := &sentFilePartTracker{st.CopyPartStatusVector()}
// If the callback data is the same as the last call, skip the call
if !st.CompareAndSwapCallbackFps(completed, arrived, total, err) {
return
}
// Call the progress callback // Call the progress callback
progressCB(completed, arrived, total, st, tracker, err) progressCB(completed, arrived, total, st, tracker, err)
} }
...@@ -518,6 +523,11 @@ func (m *manager) registerReceivedProgressCallback(rt *store.ReceivedTransfer, ...@@ -518,6 +523,11 @@ func (m *manager) registerReceivedProgressCallback(rt *store.ReceivedTransfer,
// Build part tracker from copy of part statuses vector // Build part tracker from copy of part statuses vector
tracker := &receivedFilePartTracker{rt.CopyPartStatusVector()} tracker := &receivedFilePartTracker{rt.CopyPartStatusVector()}
// If the callback data is the same as the last call, skip the call
if !rt.CompareAndSwapCallbackFps(completed, received, total, err) {
return
}
// Call the progress callback // Call the progress callback
progressCB(completed, received, total, rt, tracker, err) progressCB(completed, received, total, rt, tracker, err)
} }
......
...@@ -85,8 +85,6 @@ func Test_FileTransfer_Smoke(t *testing.T) { ...@@ -85,8 +85,6 @@ func Test_FileTransfer_Smoke(t *testing.T) {
cMixHandler := newMockCmixHandler() cMixHandler := newMockCmixHandler()
rngGen := fastRNG.NewStreamGenerator(1000, 10, csprng.NewSystemRNG) rngGen := fastRNG.NewStreamGenerator(1000, 10, csprng.NewSystemRNG)
params := DefaultParams() params := DefaultParams()
// params.MaxThroughput = math.MaxInt
// params.MaxThroughput = 0
// Set up the first client // Set up the first client
myID1 := id.NewIdFromString("myID1", id.User, t) myID1 := id.NewIdFromString("myID1", id.User, t)
......
...@@ -15,8 +15,8 @@ import ( ...@@ -15,8 +15,8 @@ import (
"testing" "testing"
) )
// Tests that no data is lost when marshaling and // Tests that no data is lost when marshaling and unmarshalling the Params
// unmarshaling the Params object. // object.
func TestParams_MarshalUnmarshal(t *testing.T) { func TestParams_MarshalUnmarshal(t *testing.T) {
// Construct a set of params // Construct a set of params
p := DefaultParams() p := DefaultParams()
...@@ -27,8 +27,6 @@ func TestParams_MarshalUnmarshal(t *testing.T) { ...@@ -27,8 +27,6 @@ func TestParams_MarshalUnmarshal(t *testing.T) {
t.Fatalf("Marshal error: %v", err) t.Fatalf("Marshal error: %v", err)
} }
t.Logf("%s", string(data))
// Unmarshal the params object // Unmarshal the params object
received := Params{} received := Params{}
err = json.Unmarshal(data, &received) err = json.Unmarshal(data, &received)
...@@ -42,11 +40,8 @@ func TestParams_MarshalUnmarshal(t *testing.T) { ...@@ -42,11 +40,8 @@ func TestParams_MarshalUnmarshal(t *testing.T) {
t.Fatalf("Marshal error: %v", err) t.Fatalf("Marshal error: %v", err)
} }
t.Logf("%s", string(data2)) // Check that they match (it is done this way to avoid false failures with
// the reflect.DeepEqual function and pointers)
// Check that they match (it is done this way to avoid
// false failures with the reflect.DeepEqual function and
// pointers)
if !bytes.Equal(data, data2) { if !bytes.Equal(data, data2) {
t.Fatalf("Data was lost in marshal/unmarshal.") t.Fatalf("Data was lost in marshal/unmarshal.")
} }
......
...@@ -85,6 +85,10 @@ type ReceivedTransfer struct { ...@@ -85,6 +85,10 @@ type ReceivedTransfer struct {
// Stores the received status for each file part in a bitstream format // Stores the received status for each file part in a bitstream format
partStatus *utility.StateVector partStatus *utility.StateVector
// Unique identifier of the last progress callback called (used to prevent
// callback calls with duplicate data)
lastCallbackFingerprint string
mux sync.RWMutex mux sync.RWMutex
kv *versioned.KV kv *versioned.KV
} }
...@@ -202,6 +206,37 @@ func (rt *ReceivedTransfer) CopyPartStatusVector() *utility.StateVector { ...@@ -202,6 +206,37 @@ func (rt *ReceivedTransfer) CopyPartStatusVector() *utility.StateVector {
return rt.partStatus.DeepCopy() return rt.partStatus.DeepCopy()
} }
// CompareAndSwapCallbackFps compares the fingerprint to the previous callback
// call's fingerprint. If they are different, the new one is stored, and it
// returns true. Returns fall if they are the same.
func (rt *ReceivedTransfer) CompareAndSwapCallbackFps(
completed bool, received, total uint16, err error) bool {
fp := generateReceivedFp(completed, received, total, err)
rt.mux.Lock()
defer rt.mux.Unlock()
if fp != rt.lastCallbackFingerprint {
rt.lastCallbackFingerprint = fp
return true
}
return false
}
// generateReceivedFp generates a fingerprint for a received progress callback.
func generateReceivedFp(completed bool, received, total uint16, err error) string {
errString := "<nil>"
if err != nil {
errString = err.Error()
}
return strconv.FormatBool(completed) +
strconv.FormatUint(uint64(received), 10) +
strconv.FormatUint(uint64(total), 10) +
errString
}
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
// Storage Functions // // Storage Functions //
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
......
...@@ -18,6 +18,7 @@ import ( ...@@ -18,6 +18,7 @@ import (
ftCrypto "gitlab.com/elixxir/crypto/fileTransfer" ftCrypto "gitlab.com/elixxir/crypto/fileTransfer"
"gitlab.com/xx_network/primitives/id" "gitlab.com/xx_network/primitives/id"
"gitlab.com/xx_network/primitives/netTime" "gitlab.com/xx_network/primitives/netTime"
"strconv"
"sync" "sync"
) )
...@@ -83,6 +84,10 @@ type SentTransfer struct { ...@@ -83,6 +84,10 @@ type SentTransfer struct {
// Stores the status of each part in a bitstream format // Stores the status of each part in a bitstream format
partStatus *utility.StateVector partStatus *utility.StateVector
// Unique identifier of the last progress callback called (used to prevent
// callback calls with duplicate data)
lastCallbackFingerprint string
mux sync.RWMutex mux sync.RWMutex
kv *versioned.KV kv *versioned.KV
} }
...@@ -214,6 +219,36 @@ func (st *SentTransfer) CopyPartStatusVector() *utility.StateVector { ...@@ -214,6 +219,36 @@ func (st *SentTransfer) CopyPartStatusVector() *utility.StateVector {
return st.partStatus.DeepCopy() return st.partStatus.DeepCopy()
} }
// CompareAndSwapCallbackFps compares the fingerprint to the previous callback
// call's fingerprint. If they are different, the new one is stored, and it
// returns true. Returns fall if they are the same.
func (st *SentTransfer) CompareAndSwapCallbackFps(
completed bool, arrived, total uint16, err error) bool {
fp := generateSentFp(completed, arrived, total, err)
st.mux.Lock()
defer st.mux.Unlock()
if fp != st.lastCallbackFingerprint {
st.lastCallbackFingerprint = fp
return true
}
return false
}
// generateSentFp generates a fingerprint for a sent progress callback.
func generateSentFp(completed bool, arrived, total uint16, err error) string {
errString := "<nil>"
if err != nil {
errString = err.Error()
}
return strconv.FormatBool(completed) +
strconv.FormatUint(uint64(arrived), 10) +
strconv.FormatUint(uint64(total), 10) +
errString
}
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
// Storage Functions // // Storage Functions //
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment