Skip to content

Commit 0d7ea60

Browse files
email-exporter: Add an LRU cache of seen hashed email addresses (#8219)
1 parent 23608e1 commit 0d7ea60

File tree

7 files changed

+142
-17
lines changed

7 files changed

+142
-17
lines changed

cmd/email-exporter/main.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,12 @@ type Config struct {
4949
// PardotBaseURL is the base URL for the Pardot API. (e.g.,
5050
// "https://pi.pardot.com")
5151
PardotBaseURL string `validate:"required"`
52+
53+
// EmailCacheSize controls how many hashed email addresses are retained
54+
// in memory to prevent duplicates from being sent to the Pardot API.
55+
// Each entry consumes ~120 bytes, so 100,000 entries use around 12 MB
56+
// of memory. If left unset, no caching is performed.
57+
EmailCacheSize int `validate:"omitempty,min=1"`
5258
}
5359
Syslog cmd.SyslogConfig
5460
OpenTelemetry cmd.OpenTelemetryConfig
@@ -87,13 +93,19 @@ func main() {
8793
clientSecret, err := c.EmailExporter.ClientSecret.Pass()
8894
cmd.FailOnError(err, "Loading clientSecret")
8995

96+
var cache *email.EmailCache
97+
if c.EmailExporter.EmailCacheSize > 0 {
98+
cache = email.NewHashedEmailCache(c.EmailExporter.EmailCacheSize, scope)
99+
}
100+
90101
pardotClient, err := email.NewPardotClientImpl(
91102
clk,
92103
c.EmailExporter.PardotBusinessUnit,
93104
clientId,
94105
clientSecret,
95106
c.EmailExporter.SalesforceBaseURL,
96107
c.EmailExporter.PardotBaseURL,
108+
cache,
97109
)
98110
cmd.FailOnError(err, "Creating Pardot API client")
99111
exporterServer := email.NewExporterImpl(pardotClient, c.EmailExporter.PerDayLimit, c.EmailExporter.MaxConcurrentRequests, scope, logger)

email/cache.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
package email
2+
3+
import (
4+
"crypto/sha256"
5+
"encoding/hex"
6+
"sync"
7+
8+
"github.com/golang/groupcache/lru"
9+
"github.com/prometheus/client_golang/prometheus"
10+
)
11+
12+
// EmailCache is an LRU cache of hashed email addresses, used to avoid sending
// the same address more than once. The embedded mutex is taken by Seen and
// Store around every access to cache. A nil *EmailCache is valid: Seen
// reports false and Store does nothing, as if caching were not configured.
type EmailCache struct {
13+
sync.Mutex
14+
cache *lru.Cache // keyed by the hex-encoded SHA-256 of the email address; values are nil
15+
requests *prometheus.CounterVec // lookups made through Seen, labeled "status" = "hit" or "miss"
16+
}
17+
18+
func NewHashedEmailCache(maxEntries int, stats prometheus.Registerer) *EmailCache {
19+
requests := prometheus.NewCounterVec(prometheus.CounterOpts{
20+
Name: "email_cache_requests",
21+
}, []string{"status"})
22+
stats.MustRegister(requests)
23+
24+
return &EmailCache{
25+
cache: lru.New(maxEntries),
26+
requests: requests,
27+
}
28+
}
29+
30+
// hashEmail returns the lowercase hex encoding of the SHA-256 digest of email.
// The input is hashed exactly as given; no case folding or trimming is done.
func hashEmail(email string) string {
	digest := sha256.Sum256([]byte(email))
	encoded := make([]byte, hex.EncodedLen(len(digest)))
	hex.Encode(encoded, digest[:])
	return string(encoded)
}
34+
35+
func (c *EmailCache) Seen(email string) bool {
36+
if c == nil {
37+
// If the cache is nil we assume it was not configured.
38+
return false
39+
}
40+
41+
hash := hashEmail(email)
42+
43+
c.Lock()
44+
defer c.Unlock()
45+
46+
_, ok := c.cache.Get(hash)
47+
if !ok {
48+
c.requests.WithLabelValues("miss").Inc()
49+
return false
50+
}
51+
52+
c.requests.WithLabelValues("hit").Inc()
53+
return true
54+
}
55+
56+
func (c *EmailCache) Store(email string) {
57+
if c == nil {
58+
// If the cache is nil we assume it was not configured.
59+
return
60+
}
61+
62+
hash := hashEmail(email)
63+
64+
c.Lock()
65+
defer c.Unlock()
66+
67+
c.cache.Add(hash, nil)
68+
}

email/exporter.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@ import (
1717

1818
// contactsQueueCap limits the queue size to prevent unbounded growth. This
1919
// value is adjustable as needed. Each RFC 5321 email address, encoded in UTF-8,
20-
// is at most 320 bytes. Storing 10,000 emails requires ~3.44 MB of memory.
21-
const contactsQueueCap = 10000
20+
// is at most 320 bytes. Storing 100,000 emails requires ~34.4 MB of memory.
21+
const contactsQueueCap = 100000
2222

2323
var ErrQueueFull = errors.New("email-exporter queue is full")
2424

email/exporter_test.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,24 +22,27 @@ var ctx = context.Background()
2222
type mockPardotClientImpl struct {
2323
sync.Mutex // held by SendContact while appending to CreatedContacts
2424
CreatedContacts []string // every email passed to SendContact, appended as received
25+
cache *EmailCache // SendContact stores each email here; may be nil (Store is a no-op on a nil cache)
2526
}
2627

2728
// newMockPardotClientImpl returns a MockPardotClientImpl, implementing the
2829
// PardotClient interface. Both refer to the same instance, with the interface
2930
// for mock interaction and the struct for state inspection and modification.
30-
func newMockPardotClientImpl() (PardotClient, *mockPardotClientImpl) {
31+
func newMockPardotClientImpl(cache *EmailCache) (PardotClient, *mockPardotClientImpl) {
3132
mockImpl := &mockPardotClientImpl{
3233
CreatedContacts: []string{},
34+
cache: cache,
3335
}
3436
return mockImpl, mockImpl
3537
}
3638

3739
// SendContact adds an email to CreatedContacts.
3840
func (m *mockPardotClientImpl) SendContact(email string) error {
3941
m.Lock()
40-
defer m.Unlock()
41-
4242
m.CreatedContacts = append(m.CreatedContacts, email)
43+
m.Unlock()
44+
45+
m.cache.Store(email)
4346
return nil
4447
}
4548

@@ -56,7 +59,7 @@ func (m *mockPardotClientImpl) getCreatedContacts() []string {
5659
// ExporterImpl queue and cleanup() to drain and shutdown. If start() is called,
5760
// cleanup() must be called.
5861
func setup() (*ExporterImpl, *mockPardotClientImpl, func(), func()) {
59-
mockClient, clientImpl := newMockPardotClientImpl()
62+
mockClient, clientImpl := newMockPardotClientImpl(nil)
6063
exporter := NewExporterImpl(mockClient, 1000000, 5, metrics.NoopRegisterer, blog.NewMock())
6164
daemonCtx, cancel := context.WithCancel(context.Background())
6265
return exporter, clientImpl,

email/pardot.go

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -63,13 +63,14 @@ type PardotClientImpl struct {
6363
contactsURL string
6464
tokenURL string
6565
token *oAuthToken
66+
emailCache *EmailCache
6667
clk clock.Clock
6768
}
6869

6970
var _ PardotClient = &PardotClientImpl{}
7071

7172
// NewPardotClientImpl creates a new PardotClientImpl.
72-
func NewPardotClientImpl(clk clock.Clock, businessUnit, clientId, clientSecret, oauthbaseURL, pardotBaseURL string) (*PardotClientImpl, error) {
73+
func NewPardotClientImpl(clk clock.Clock, businessUnit, clientId, clientSecret, oauthbaseURL, pardotBaseURL string, cache *EmailCache) (*PardotClientImpl, error) {
7374
contactsURL, err := url.JoinPath(pardotBaseURL, contactsPath)
7475
if err != nil {
7576
return nil, fmt.Errorf("failed to join contacts path: %w", err)
@@ -85,9 +86,9 @@ func NewPardotClientImpl(clk clock.Clock, businessUnit, clientId, clientSecret,
8586
clientSecret: clientSecret,
8687
contactsURL: contactsURL,
8788
tokenURL: tokenURL,
88-
89-
token: &oAuthToken{},
90-
clk: clk,
89+
token: &oAuthToken{},
90+
emailCache: cache,
91+
clk: clk,
9192
}, nil
9293
}
9394

@@ -144,6 +145,15 @@ func redactEmail(body []byte, email string) string {
144145
// SendContact submits an email to the Pardot Contacts endpoint, retrying up
145146
// to 3 times with exponential backoff.
146147
func (pc *PardotClientImpl) SendContact(email string) error {
148+
if pc.emailCache.Seen(email) {
149+
// Another goroutine has already sent this email address.
150+
return nil
151+
}
152+
// There is a possible race here where two goroutines could enqueue and send
153+
// the same email address between this check and the actual HTTP request.
154+
// However, at an average rate of ~1 email every 2 seconds, this is unlikely
155+
// to happen in practice.
156+
147157
var err error
148158
for attempt := range maxAttempts {
149159
time.Sleep(core.RetryBackoff(attempt, retryBackoffMin, retryBackoffMax, retryBackoffBase))
@@ -183,6 +193,7 @@ func (pc *PardotClientImpl) SendContact(email string) error {
183193

184194
defer resp.Body.Close()
185195
if resp.StatusCode >= 200 && resp.StatusCode < 300 {
196+
pc.emailCache.Store(email)
186197
return nil
187198
}
188199

email/pardot_test.go

Lines changed: 36 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,14 @@ import (
66
"io"
77
"net/http"
88
"net/http/httptest"
9+
"sync/atomic"
910
"testing"
1011
"time"
1112

1213
"github.com/jmhodges/clock"
14+
"github.com/letsencrypt/boulder/metrics"
1315
"github.com/letsencrypt/boulder/test"
16+
"github.com/prometheus/client_golang/prometheus"
1417
)
1518

1619
func defaultTokenHandler(w http.ResponseWriter, r *http.Request) {
@@ -44,7 +47,7 @@ func TestSendContactSuccess(t *testing.T) {
4447
defer contactSrv.Close()
4548

4649
clk := clock.NewFake()
47-
client, err := NewPardotClientImpl(clk, "biz-unit", "cid", "csec", tokenSrv.URL, contactSrv.URL)
50+
client, err := NewPardotClientImpl(clk, "biz-unit", "cid", "csec", tokenSrv.URL, contactSrv.URL, nil)
4851
test.AssertNotError(t, err, "failed to create client")
4952

5053
err = client.SendContact("[email protected]")
@@ -70,7 +73,7 @@ func TestSendContactUpdateTokenFails(t *testing.T) {
7073
defer contactSrv.Close()
7174

7275
clk := clock.NewFake()
73-
client, err := NewPardotClientImpl(clk, "biz-unit", "cid", "csec", tokenSrv.URL, contactSrv.URL)
76+
client, err := NewPardotClientImpl(clk, "biz-unit", "cid", "csec", tokenSrv.URL, contactSrv.URL, nil)
7477
test.AssertNotError(t, err, "Failed to create client")
7578

7679
err = client.SendContact("[email protected]")
@@ -94,7 +97,7 @@ func TestSendContact4xx(t *testing.T) {
9497
defer contactSrv.Close()
9598

9699
clk := clock.NewFake()
97-
client, err := NewPardotClientImpl(clk, "biz-unit", "cid", "csec", tokenSrv.URL, contactSrv.URL)
100+
client, err := NewPardotClientImpl(clk, "biz-unit", "cid", "csec", tokenSrv.URL, contactSrv.URL, nil)
98101
test.AssertNotError(t, err, "Failed to create client")
99102

100103
err = client.SendContact("[email protected]")
@@ -142,7 +145,7 @@ func TestSendContactTokenExpiry(t *testing.T) {
142145
defer contactSrv.Close()
143146

144147
clk := clock.NewFake()
145-
client, err := NewPardotClientImpl(clk, "biz-unit", "cid", "csec", tokenSrv.URL, contactSrv.URL)
148+
client, err := NewPardotClientImpl(clk, "biz-unit", "cid", "csec", tokenSrv.URL, contactSrv.URL, nil)
146149
test.AssertNotError(t, err, "Failed to create client")
147150

148151
// First call uses the initial token ("old_token").
@@ -172,7 +175,7 @@ func TestSendContactServerErrorsAfterMaxAttempts(t *testing.T) {
172175
contactSrv := httptest.NewServer(http.HandlerFunc(contactHandler))
173176
defer contactSrv.Close()
174177

175-
client, _ := NewPardotClientImpl(clock.NewFake(), "biz-unit", "cid", "csec", tokenSrv.URL, contactSrv.URL)
178+
client, _ := NewPardotClientImpl(clock.NewFake(), "biz-unit", "cid", "csec", tokenSrv.URL, contactSrv.URL, nil)
176179

177180
err := client.SendContact("[email protected]")
178181
test.AssertError(t, err, "Should fail after retrying all attempts")
@@ -200,11 +203,38 @@ func TestSendContactRedactsEmail(t *testing.T) {
200203
defer contactSrv.Close()
201204

202205
clk := clock.NewFake()
203-
client, err := NewPardotClientImpl(clk, "biz-unit", "cid", "csec", tokenSrv.URL, contactSrv.URL)
206+
client, err := NewPardotClientImpl(clk, "biz-unit", "cid", "csec", tokenSrv.URL, contactSrv.URL, nil)
204207
test.AssertNotError(t, err, "failed to create client")
205208

206209
err = client.SendContact(emailToTest)
207210
test.AssertError(t, err, "SendContact should fail")
208211
test.AssertNotContains(t, err.Error(), emailToTest)
209212
test.AssertContains(t, err.Error(), "[REDACTED]")
210213
}
214+
215+
func TestSendContactDeduplication(t *testing.T) {
216+
t.Parallel()
217+
218+
tokenSrv := httptest.NewServer(http.HandlerFunc(defaultTokenHandler))
219+
defer tokenSrv.Close()
220+
221+
var contactHits int32
222+
contactSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
223+
atomic.AddInt32(&contactHits, 1)
224+
w.WriteHeader(http.StatusOK)
225+
}))
226+
defer contactSrv.Close()
227+
228+
cache := NewHashedEmailCache(1000, metrics.NoopRegisterer)
229+
client, _ := NewPardotClientImpl(clock.New(), "biz", "cid", "csec", tokenSrv.URL, contactSrv.URL, cache)
230+
231+
err := client.SendContact("[email protected]")
232+
test.AssertNotError(t, err, "SendContact should succeed on first call")
233+
test.AssertMetricWithLabelsEquals(t, client.emailCache.requests, prometheus.Labels{"status": "miss"}, 1)
234+
235+
err = client.SendContact("[email protected]")
236+
test.AssertNotError(t, err, "SendContact should succeed on second call")
237+
238+
test.AssertEquals(t, int32(1), atomic.LoadInt32(&contactHits))
239+
test.AssertMetricWithLabelsEquals(t, client.emailCache.requests, prometheus.Labels{"status": "hit"}, 1)
240+
}

test/config-next/email-exporter.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@
3232
"passwordFile": "test/secrets/salesforce_client_secret"
3333
},
3434
"salesforceBaseURL": "http://localhost:9601",
35-
"pardotBaseURL": "http://localhost:9602"
35+
"pardotBaseURL": "http://localhost:9602",
36+
"emailCacheSize": 100000
3637
},
3738
"syslog": {
3839
"stdoutlevel": 6,

0 commit comments

Comments
 (0)