Skip to content

Commit 99d0bcf

Browse files
authored
1. Adds validation for action in sendOdpEvent (#370)
2. Allows support for different fs_user_id keys in identifiers
1 parent 03876a5 commit 99d0bcf

File tree

3 files changed

+129
-27
lines changed

3 files changed

+129
-27
lines changed

pkg/odp/event/event_manager.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"context"
2222
"errors"
2323
"fmt"
24+
"strings"
2425
"sync"
2526
"time"
2627

@@ -175,6 +176,11 @@ func (bm *BatchEventManager) ProcessEvent(apiKey, apiHost string, odpEvent Event
175176
return errors.New(utils.OdpNotIntegrated)
176177
}
177178

179+
if odpEvent.Action == "" {
180+
bm.logger.Error(utils.OdpInvalidAction, errors.New("invalid event action"))
181+
return errors.New(utils.OdpInvalidAction)
182+
}
183+
178184
if !utils.IsValidOdpData(odpEvent.Data) {
179185
bm.logger.Error(utils.OdpInvalidData, errors.New("invalid event data"))
180186
return errors.New(utils.OdpInvalidData)
@@ -187,6 +193,7 @@ func (bm *BatchEventManager) ProcessEvent(apiKey, apiHost string, odpEvent Event
187193
}
188194

189195
bm.addCommonData(&odpEvent)
196+
bm.convertIdentifiers(&odpEvent)
190197
bm.eventQueue.Add(odpEvent)
191198

192199
if bm.eventQueue.Size() < bm.batchSize {
@@ -323,3 +330,19 @@ func (bm *BatchEventManager) addCommonData(odpEvent *Event) {
323330
}
324331
odpEvent.Data = commonData
325332
}
333+
334+
// Convert incorrect case/separator of identifier key `fs_user_id`
335+
// (ie. `fs-user-id`, `FS_USER_ID`).
336+
func (bm *BatchEventManager) convertIdentifiers(odpEvent *Event) {
337+
if odpEvent.Identifiers[utils.OdpFSUserIDKey] != "" {
338+
return
339+
}
340+
keysToCheck := map[string]bool{"fs-user-id": true, utils.OdpFSUserIDKey: true}
341+
for k, v := range odpEvent.Identifiers {
342+
if keysToCheck[strings.ToLower(k)] {
343+
odpEvent.Identifiers[utils.OdpFSUserIDKey] = v
344+
delete(odpEvent.Identifiers, k)
345+
break
346+
}
347+
}
348+
}

pkg/odp/event/event_manager_test.go

Lines changed: 103 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ func (e *EventManagerTestSuite) TestTickerWhenODPConfigIsUpdated() {
127127
em := NewBatchEventManager(WithFlushInterval(flushInterval),
128128
WithAPIManager(e.eventAPIManager))
129129
e.eventAPIManager.wg.Add(1)
130-
em.ProcessEvent("a", "b", Event{})
130+
em.ProcessEvent("a", "b", Event{Action: "123"})
131131
eg := newExecutionContext()
132132
odpConfig := config.NewConfig("a", "b", nil)
133133
eg.Go(func(ctx context.Context) {
@@ -142,15 +142,15 @@ func (e *EventManagerTestSuite) TestTickerWhenODPConfigIsUpdated() {
142142
// Check events fired with updated config using ticker
143143
e.eventAPIManager.wg.Add(1)
144144
odpConfig.Update("c", "d", nil)
145-
em.ProcessEvent("a", "b", Event{})
145+
em.ProcessEvent("a", "b", Event{Action: "123"})
146146
e.eventAPIManager.wg.Wait()
147147
e.Equal("c", e.eventAPIManager.apiKey)
148148
e.Equal("d", e.eventAPIManager.apiHost)
149149
}
150150

151151
func (e *EventManagerTestSuite) TestEventsDispatchedWhenContextIsTerminated() {
152152
eg := newExecutionContext()
153-
e.eventManager.eventQueue.Add(Event{})
153+
e.eventManager.eventQueue.Add(Event{Action: "123"})
154154
e.eventAPIManager.wg.Add(1)
155155
e.Equal(1, e.eventManager.eventQueue.Size())
156156
eg.Go(func(ctx context.Context) {
@@ -165,7 +165,7 @@ func (e *EventManagerTestSuite) TestEventsDispatchedWhenContextIsTerminated() {
165165
func (e *EventManagerTestSuite) TestEventsDispatchedWhenFlushIntervalReached() {
166166
eg := newExecutionContext()
167167
e.eventManager.flushInterval = 50 * time.Millisecond
168-
e.eventManager.eventQueue.Add(Event{})
168+
e.eventManager.eventQueue.Add(Event{Action: "123"})
169169
e.eventAPIManager.wg.Add(1)
170170
e.Equal(1, e.eventManager.eventQueue.Size())
171171
eg.Go(func(ctx context.Context) {
@@ -202,7 +202,7 @@ func (e *EventManagerTestSuite) TestIdentifyUserWhenODPIntegrated() {
202202

203203
func (e *EventManagerTestSuite) TestProcessEventWithInvalidODPConfig() {
204204
em := NewBatchEventManager(WithAPIManager(&MockEventAPIManager{}))
205-
e.Error(em.ProcessEvent("", "", Event{}))
205+
e.Error(em.ProcessEvent("", "", Event{Action: "123"}))
206206
e.Equal(0, em.eventQueue.Size())
207207
}
208208

@@ -224,6 +224,37 @@ func (e *EventManagerTestSuite) TestProcessEventWithValidEventData() {
224224
e.Equal(1, e.eventManager.eventQueue.Size())
225225
}
226226

227+
func (e *EventManagerTestSuite) TestProcessEventWithValidUserIdentifiers() {
228+
em := NewBatchEventManager(WithAPIManager(&MockEventAPIManager{}))
229+
validIdentifiers := []string{utils.OdpFSUserIDKey, "fs-user-id", "FS-USER-ID", "FS_USER_ID"}
230+
for i, userID := range validIdentifiers {
231+
expectedValue := fmt.Sprintf("%d", i)
232+
tmpEvent := Event{
233+
Type: "t1",
234+
Action: "a1",
235+
Identifiers: map[string]string{userID: expectedValue},
236+
Data: map[string]interface{}{
237+
"key11": "value-1",
238+
},
239+
}
240+
241+
e.NoError(em.ProcessEvent("1", "2", tmpEvent))
242+
e.Equal(i+1, em.eventQueue.Size())
243+
// Check event was added to queue
244+
sentEvent := em.eventQueue.Get(i + 1)
245+
fEvent, exists := sentEvent[i].(Event)
246+
e.True(exists)
247+
248+
// Check valid key was added
249+
e.Equal(expectedValue, fEvent.Identifiers[utils.OdpFSUserIDKey])
250+
// Check old key was deleted
251+
if validIdentifiers[i] != utils.OdpFSUserIDKey {
252+
_, exists := fEvent.Identifiers[validIdentifiers[i]]
253+
e.False(exists)
254+
}
255+
}
256+
}
257+
227258
func (e *EventManagerTestSuite) TestProcessEventWithInvalidEventData() {
228259
tmpEvent := Event{
229260
Type: "t1",
@@ -238,16 +269,44 @@ func (e *EventManagerTestSuite) TestProcessEventWithInvalidEventData() {
238269
e.Equal(0, e.eventManager.eventQueue.Size())
239270
}
240271

272+
func (e *EventManagerTestSuite) TestProcessEventWithValidAction() {
273+
tmpEvent := Event{
274+
Type: "t1",
275+
Action: "a1",
276+
Identifiers: map[string]string{"id-key-1": "id-value-1"},
277+
Data: map[string]interface{}{
278+
"key11": "value-1",
279+
},
280+
}
281+
282+
e.NoError(e.eventManager.ProcessEvent("1", "2", tmpEvent))
283+
e.Equal(1, e.eventManager.eventQueue.Size())
284+
}
285+
286+
func (e *EventManagerTestSuite) TestProcessEventWithInvalidAction() {
287+
tmpEvent := Event{
288+
Type: "t1",
289+
Action: "",
290+
Identifiers: map[string]string{"id-key-1": "id-value-1"},
291+
Data: map[string]interface{}{
292+
"key11": "value-1",
293+
},
294+
}
295+
296+
e.Equal(errors.New(utils.OdpInvalidAction), e.eventManager.ProcessEvent("a", "b", tmpEvent))
297+
e.Equal(0, e.eventManager.eventQueue.Size())
298+
}
299+
241300
func (e *EventManagerTestSuite) TestProcessEventDiscardsEventExceedingMaxQueueSize() {
242301
e.eventManager.maxQueueSize = 1
243-
e.eventManager.eventQueue.Add(Event{})
244-
e.Error(e.eventManager.ProcessEvent("a", "b", Event{}))
302+
e.eventManager.eventQueue.Add(Event{Action: "123"})
303+
e.Error(e.eventManager.ProcessEvent("a", "b", Event{Action: "123"}))
245304
e.Equal(1, e.eventManager.eventQueue.Size())
246305
}
247306

248307
func (e *EventManagerTestSuite) TestProcessEventWithBatchSizeNotReached() {
249308
em := NewBatchEventManager(WithAPIManager(&MockEventAPIManager{}))
250-
e.NoError(em.ProcessEvent("a", "b", Event{}))
309+
e.NoError(em.ProcessEvent("a", "b", Event{Action: "123"}))
251310
e.Equal(1, em.eventQueue.Size())
252311
e.Equal(0, e.eventAPIManager.timesSendEventsCalled)
253312
}
@@ -257,7 +316,7 @@ func (e *EventManagerTestSuite) TestProcessEventWithBatchSizeReached() {
257316
em := NewBatchEventManager(WithAPIManager(apiManager), WithFlushInterval(0))
258317
e.Equal(0, em.eventQueue.Size())
259318
apiManager.wg.Add(1)
260-
e.NoError(em.ProcessEvent("a", "b", Event{}))
319+
e.NoError(em.ProcessEvent("a", "b", Event{Action: "123"}))
261320
// Wait for event fire through go routine
262321
apiManager.wg.Wait()
263322
e.Equal(0, em.eventQueue.Size())
@@ -267,12 +326,12 @@ func (e *EventManagerTestSuite) TestProcessEventWithBatchSizeReached() {
267326
func (e *EventManagerTestSuite) TestProcessEventsExceedingBatchSize() {
268327
apiManager := &MockEventAPIManager{}
269328
em := NewBatchEventManager(WithAPIManager(apiManager), WithFlushInterval(0))
270-
em.eventQueue.Add(Event{})
271-
em.eventQueue.Add(Event{})
329+
em.eventQueue.Add(Event{Action: "123"})
330+
em.eventQueue.Add(Event{Action: "123"})
272331
e.Equal(2, em.eventQueue.Size())
273332
// Three batch events should be fired
274333
apiManager.wg.Add(3)
275-
e.NoError(em.ProcessEvent("a", "b", Event{}))
334+
e.NoError(em.ProcessEvent("a", "b", Event{Action: "123"}))
276335
// Wait for event fire through go routine
277336
apiManager.wg.Wait()
278337
// Since all events fired successfully, queue should be empty
@@ -283,7 +342,7 @@ func (e *EventManagerTestSuite) TestProcessEventsExceedingBatchSize() {
283342
func (e *EventManagerTestSuite) TestProcessEventFirstEventFailsWithRetries() {
284343
apiManager := &MockEventAPIManager{}
285344
em := NewBatchEventManager(WithAPIManager(apiManager), WithFlushInterval(0))
286-
em.eventQueue.Add(Event{})
345+
em.eventQueue.Add(Event{Action: "123"})
287346
e.Equal(1, em.eventQueue.Size())
288347
// Return true for retry for all calls
289348
apiManager.retryResponses = []bool{true, true, true}
@@ -293,7 +352,7 @@ func (e *EventManagerTestSuite) TestProcessEventFirstEventFailsWithRetries() {
293352
// Total 2 events in queue which make 2 batches
294353
// first batch will be retried thrice, second one wont be fired since first failed thrice
295354
apiManager.wg.Add(maxRetries)
296-
e.NoError(em.ProcessEvent("a", "b", Event{}))
355+
e.NoError(em.ProcessEvent("a", "b", Event{Action: "123"}))
297356
// Wait for three retries
298357
apiManager.wg.Wait()
299358
// Since all events failed, queue should contain all events
@@ -304,14 +363,14 @@ func (e *EventManagerTestSuite) TestProcessEventFirstEventFailsWithRetries() {
304363
func (e *EventManagerTestSuite) TestProcessEventFirstEventFailsWithRetryNotAllowed() {
305364
apiManager := &MockEventAPIManager{}
306365
em := NewBatchEventManager(WithAPIManager(apiManager), WithFlushInterval(0))
307-
em.eventQueue.Add(Event{})
366+
em.eventQueue.Add(Event{Action: "123"})
308367
e.Equal(1, em.eventQueue.Size())
309368
apiManager.retryResponses = []bool{false}
310369
tmpError := errors.New("")
311370
apiManager.errResponses = []error{tmpError}
312371
// first batch will not be retried, second one wont be fired since first failed
313372
apiManager.wg.Add(1)
314-
e.NoError(em.ProcessEvent("a", "b", Event{}))
373+
e.NoError(em.ProcessEvent("a", "b", Event{Action: "123"}))
315374
// Wait for three retries
316375
apiManager.wg.Wait()
317376
// Since first batch of 2 events failed with no retry allowed, queue should only contain 1 event
@@ -322,7 +381,7 @@ func (e *EventManagerTestSuite) TestProcessEventFirstEventFailsWithRetryNotAllow
322381
func (e *EventManagerTestSuite) TestProcessEventSecondEventFailsWithRetriesLaterPasses() {
323382
apiManager := &MockEventAPIManager{}
324383
em := NewBatchEventManager(WithAPIManager(apiManager), WithFlushInterval(0))
325-
em.eventQueue.Add(Event{})
384+
em.eventQueue.Add(Event{Action: "123"})
326385
e.Equal(1, em.eventQueue.Size())
327386
// Return true for retry for all second batch calls
328387
apiManager.retryResponses = []bool{false, true, true, true, false, false}
@@ -332,7 +391,7 @@ func (e *EventManagerTestSuite) TestProcessEventSecondEventFailsWithRetriesLater
332391
// Total 2 events in queue which make 2 batches
333392
// first batch will be successfully dispatched, second will be retried thrice
334393
apiManager.wg.Add(4)
335-
e.NoError(em.ProcessEvent("a", "b", Event{}))
394+
e.NoError(em.ProcessEvent("a", "b", Event{Action: "123"}))
336395
// Wait for events to fire
337396
apiManager.wg.Wait()
338397
// Since second batch of 1 event failed, queue should be contain 1 event
@@ -344,7 +403,7 @@ func (e *EventManagerTestSuite) TestProcessEventSecondEventFailsWithRetriesLater
344403
time.Sleep(200 * time.Millisecond)
345404

346405
apiManager.wg.Add(2)
347-
e.NoError(em.ProcessEvent("a", "b", Event{}))
406+
e.NoError(em.ProcessEvent("a", "b", Event{Action: "123"}))
348407
// Wait for events to fire
349408
apiManager.wg.Wait()
350409
// Queue should be empty since remaining event was sent now
@@ -356,7 +415,7 @@ func (e *EventManagerTestSuite) TestProcessEventSecondEventFailsWithRetriesLater
356415
func (e *EventManagerTestSuite) TestProcessEventFirstEventPassesWithRetries() {
357416
apiManager := &MockEventAPIManager{}
358417
em := NewBatchEventManager(WithAPIManager(apiManager), WithFlushInterval(0))
359-
em.eventQueue.Add(Event{})
418+
em.eventQueue.Add(Event{Action: "123"})
360419
e.Equal(1, em.eventQueue.Size())
361420
// Return true for first batch call only
362421
apiManager.retryResponses = []bool{true, false, false}
@@ -366,7 +425,7 @@ func (e *EventManagerTestSuite) TestProcessEventFirstEventPassesWithRetries() {
366425
// Total 2 events in queue which make 2 batches
367426
// first batch will be retried once, second will be successful immediately
368427
apiManager.wg.Add(3)
369-
e.NoError(em.ProcessEvent("a", "b", Event{}))
428+
e.NoError(em.ProcessEvent("a", "b", Event{Action: "123"}))
370429
// Wait for events to fire
371430
apiManager.wg.Wait()
372431
// Since all events were successful, queue should be empty
@@ -384,7 +443,7 @@ func (e *EventManagerTestSuite) TestEventManagerAsyncBehaviour() {
384443
eg := newExecutionContext()
385444
callAllMethods := func(id string) {
386445
eventManager.IdentifyUser("-1", "-1", id)
387-
eventManager.ProcessEvent("-1", "-1", Event{})
446+
eventManager.ProcessEvent("-1", "-1", Event{Action: "123"})
388447
}
389448
for i := 0; i < iterations; i++ {
390449
eg.Go(func(ctx context.Context) {
@@ -428,7 +487,7 @@ func (e *EventManagerTestSuite) TestFlushEventsAsyncBehaviour() {
428487
}
429488

430489
func (e *EventManagerTestSuite) TestAddCommonData() {
431-
userEvent := Event{}
490+
userEvent := Event{Action: "123"}
432491
e.eventManager.addCommonData(&userEvent)
433492
e.NotNil(userEvent.Data)
434493
e.NotEmpty(userEvent.Data["idempotence_id"])
@@ -437,6 +496,23 @@ func (e *EventManagerTestSuite) TestAddCommonData() {
437496
e.Equal(event.Version, userEvent.Data["data_source_version"])
438497
}
439498

499+
func (e *EventManagerTestSuite) TestConvertIdentifiers() {
500+
validKeys := []string{utils.OdpFSUserIDKey, "fs-user-id", "FS-USER-ID", "FS_USER_ID"}
501+
expectedValue := "123"
502+
503+
for _, k := range validKeys {
504+
userEvent := Event{Identifiers: map[string]string{k: "123"}}
505+
e.eventManager.convertIdentifiers(&userEvent)
506+
// Check valid key was added
507+
e.Equal(expectedValue, userEvent.Identifiers[utils.OdpFSUserIDKey])
508+
// Check old key was deleted
509+
if k != utils.OdpFSUserIDKey {
510+
_, exists := userEvent.Identifiers[k]
511+
e.False(exists)
512+
}
513+
}
514+
}
515+
440516
func (e *EventManagerTestSuite) TestUserDataOverridesCommonData() {
441517
userEvent := Event{Data: map[string]interface{}{
442518
"abc": nil,
@@ -456,17 +532,17 @@ func (e *EventManagerTestSuite) TestUserDataOverridesCommonData() {
456532

457533
func (e *EventManagerTestSuite) TestIsOdpServiceIntegrated() {
458534
e.True(e.eventManager.IsOdpServiceIntegrated("a", "b"))
459-
e.eventManager.eventQueue.Add(Event{})
535+
e.eventManager.eventQueue.Add(Event{Action: "123"})
460536
e.Equal(1, e.eventManager.eventQueue.Size())
461537

462538
e.False(e.eventManager.IsOdpServiceIntegrated("", ""))
463539
e.Equal(0, e.eventManager.eventQueue.Size())
464540

465-
e.eventManager.eventQueue.Add(Event{})
541+
e.eventManager.eventQueue.Add(Event{Action: "123"})
466542
e.False(e.eventManager.IsOdpServiceIntegrated("a", ""))
467543
e.Equal(0, e.eventManager.eventQueue.Size())
468544

469-
e.eventManager.eventQueue.Add(Event{})
545+
e.eventManager.eventQueue.Add(Event{Action: "123"})
470546
e.False(e.eventManager.IsOdpServiceIntegrated("", "b"))
471547
e.Equal(0, e.eventManager.eventQueue.Size())
472548
}
@@ -475,7 +551,7 @@ func (e *EventManagerTestSuite) TestEventQueueRaceCondition() {
475551
testIterations := 10
476552
var wg sync.WaitGroup
477553
asyncfunc := func() {
478-
e.eventManager.eventQueue.Add(Event{})
554+
e.eventManager.eventQueue.Add(Event{Action: "123"})
479555
e.eventManager.eventQueue.Size()
480556
e.eventManager.eventQueue.Get(1)
481557
e.eventManager.eventQueue.Remove(1)

pkg/odp/utils/errors.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,3 +40,6 @@ const OdpEventFailed = "ODP event send failed (%s)"
4040

4141
// OdpInvalidData error string when odp event data is invalid
4242
const OdpInvalidData = "ODP data is not valid"
43+
44+
// OdpInvalidAction error string when odp event action is invalid
45+
const OdpInvalidAction = "ODP action is not valid (cannot be empty)"

0 commit comments

Comments
 (0)