Skip to content
Snippets Groups Projects
Commit 07665ab6 authored by Josh Brooks's avatar Josh Brooks
Browse files

Move OnSend bucket increment to network/message/ package

parent bf8e025b
Branches
Tags
2 merge requests!117Release,!83Add EKV backed leaky bucket storage
...@@ -50,7 +50,6 @@ func (c *Client) SendUnsafe(m message.Send, param params.Unsafe) ([]id.Round, ...@@ -50,7 +50,6 @@ func (c *Client) SendUnsafe(m message.Send, param params.Unsafe) ([]id.Round,
func (c *Client) SendCMIX(msg format.Message, recipientID *id.ID, func (c *Client) SendCMIX(msg format.Message, recipientID *id.ID,
param params.CMIX) (id.Round, ephemeral.Id, error) { param params.CMIX) (id.Round, ephemeral.Id, error) {
jww.INFO.Printf("SendCMIX(%s)", string(msg.GetContents())) jww.INFO.Printf("SendCMIX(%s)", string(msg.GetContents()))
c.OnSend(1)
return c.network.SendCMIX(msg, recipientID, param) return c.network.SendCMIX(msg, recipientID, param)
} }
...@@ -59,7 +58,6 @@ func (c *Client) SendCMIX(msg format.Message, recipientID *id.ID, ...@@ -59,7 +58,6 @@ func (c *Client) SendCMIX(msg format.Message, recipientID *id.ID,
// round ID of the round the payload was sent or an error if it fails. // round ID of the round the payload was sent or an error if it fails.
func (c *Client) SendManyCMIX(messages []message.TargetedCmixMessage, func (c *Client) SendManyCMIX(messages []message.TargetedCmixMessage,
params params.CMIX) (id.Round, []ephemeral.Id, error) { params params.CMIX) (id.Round, []ephemeral.Id, error) {
c.OnSend(uint32(len(messages)))
return c.network.SendManyCMIX(messages, params) return c.network.SendManyCMIX(messages, params)
} }
...@@ -75,13 +73,3 @@ func (c *Client) NewCMIXMessage(contents []byte) (format.Message, error) { ...@@ -75,13 +73,3 @@ func (c *Client) NewCMIXMessage(contents []byte) (format.Message, error) {
msg.SetContents(contents) msg.SetContents(contents)
return msg, nil return msg, nil
} }
// OnSend performs a bucket addition on a call to Client.SendCMIX or
// Client.SendManyCMIX, updating the bucket for the amount of messages sent.
func (c *Client) OnSend(messages uint32) {
rateLimitingParam := c.storage.GetBucketParams().Get()
c.storage.GetBucket().AddWithExternalParams(messages,
rateLimitingParam.Capacity, rateLimitingParam.LeakedTokens,
rateLimitingParam.LeakDuration)
}
...@@ -187,6 +187,7 @@ func sendCmixHelper(sender *gateway.Sender, msg format.Message, ...@@ -187,6 +187,7 @@ func sendCmixHelper(sender *gateway.Sender, msg format.Message,
elapsed, numRoundTries) elapsed, numRoundTries)
jww.INFO.Print(m) jww.INFO.Print(m)
events.Report(1, "MessageSend", "Metric", m) events.Report(1, "MessageSend", "Metric", m)
onSend(1, session)
return id.Round(bestRound.ID), ephID, nil return id.Round(bestRound.ID), ephID, nil
} else { } else {
jww.FATAL.Panicf("Gateway %s returned no error, but failed "+ jww.FATAL.Panicf("Gateway %s returned no error, but failed "+
...@@ -198,3 +199,13 @@ func sendCmixHelper(sender *gateway.Sender, msg format.Message, ...@@ -198,3 +199,13 @@ func sendCmixHelper(sender *gateway.Sender, msg format.Message,
return 0, ephemeral.Id{}, errors.New("failed to send the message, " + return 0, ephemeral.Id{}, errors.New("failed to send the message, " +
"unknown error") "unknown error")
} }
// OnSend performs a bucket addition on a call to Manager.SendCMIX or
// Manager.SendManyCMIX, updating the bucket for the amount of messages sent.
func onSend(messages uint32, session *storage.Session) {
rateLimitingParam := session.GetBucketParams().Get()
session.GetBucket().AddWithExternalParams(messages,
rateLimitingParam.Capacity, rateLimitingParam.LeakedTokens,
rateLimitingParam.LeakDuration)
}
...@@ -203,6 +203,7 @@ func sendManyCmixHelper(sender *gateway.Sender, ...@@ -203,6 +203,7 @@ func sendManyCmixHelper(sender *gateway.Sender,
"in round %d", ephemeralIDsString, recipientString, bestRound.ID) "in round %d", ephemeralIDsString, recipientString, bestRound.ID)
jww.INFO.Print(m) jww.INFO.Print(m)
events.Report(1, "MessageSendMany", "Metric", m) events.Report(1, "MessageSendMany", "Metric", m)
onSend(uint32(len(msgs)), session)
return id.Round(bestRound.ID), ephemeralIDs, nil return id.Round(bestRound.ID), ephemeralIDs, nil
} else { } else {
jww.FATAL.Panicf("Gateway %s returned no error, but failed to "+ jww.FATAL.Panicf("Gateway %s returned no error, but failed to "+
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment