@@ -17,13 +17,33 @@ import { TTD_LEGACY_FLOW_FLAG_NAME } from '../../functions'
1717import { getAWSCredentialsFromEKS , AWSCredentials } from '../../../../lib/AWS/sts'
1818jest . mock ( '../../../../lib/AWS/sts' )
1919
20+ // Create a mock send function that can be controlled per test
21+ const mockS3Send = jest . fn ( )
22+
23+ // Mock AWS S3 client for AWS flow tests
24+ jest . mock ( '@aws-sdk/client-s3' , ( ) => ( {
25+ S3Client : jest . fn ( ) . mockImplementation ( ( ) => ( {
26+ send : mockS3Send
27+ } ) ) ,
28+ PutObjectCommand : jest . fn ( ) . mockImplementation ( ( input ) => ( {
29+ constructor : { name : 'PutObjectCommand' } ,
30+ input
31+ } ) )
32+ } ) )
33+
2034let testDestination = createTestIntegration ( Destination )
2135
2236beforeEach ( ( ) => {
2337 // Re-Initialize the destination before each test
2438 // This is done to mitigate a bug where action responses persist into other tests
2539 testDestination = createTestIntegration ( Destination )
2640
41+ // Reset S3 mock to default success behavior
42+ mockS3Send . mockReset ( )
43+ mockS3Send . mockResolvedValue ( {
44+ $metadata : { httpStatusCode : 200 }
45+ } )
46+
2747 // Mock function to fetch AWS Credentials from STS
2848 ; ( getAWSCredentialsFromEKS as jest . Mock ) . mockResolvedValue ( {
2949 accessKeyId : 'TESTACCESSKEY' ,
@@ -37,7 +57,7 @@ afterAll(() => {
3757} )
3858
3959const events : SegmentEvent [ ] = [ ]
40- for ( let index = 1 ; index <= 1500 ; index ++ ) {
60+ for ( let index = 0 ; index < 1500 ; index ++ ) {
4161 events . push (
4262 createTestEvent ( {
4363 event : 'Audience Entered' ,
@@ -50,7 +70,7 @@ for (let index = 1; index <= 1500; index++) {
5070 advertisingId : '123'
5171 } ,
5272 traits : {
53- email : `testing${ index } @testing.com`
73+ email : index < 100 ? 'invalid-email' : `testing${ index } @testing.com`
5474 } ,
5575 personas : {
5676 external_audience_id : 'external_audience_id'
@@ -60,28 +80,6 @@ for (let index = 1; index <= 1500; index++) {
6080 )
6181}
6282
63- // Push Gmail addresses
64- events . push (
65- createTestEvent ( {
66- event : 'Audience Entered' ,
67- type : 'track' ,
68- properties : {
69- audience_key : 'personas_test_audience'
70- } ,
71- context : {
72- device : {
73- advertisingId : '123'
74- } ,
75- traits : {
76- email : `some.id+testing@gmail.com`
77- } ,
78- personas : {
79- external_audience_id : 'external_audience_id'
80- }
81- }
82- } )
83- )
84-
8583const event = createTestEvent ( {
8684 event : 'Audience Entered' ,
8785 type : 'track' ,
@@ -101,7 +99,7 @@ const event = createTestEvent({
10199 }
102100} )
103101
104- describe ( 'TheTradeDeskCrm.syncAudience' , ( ) => {
102+ describe ( 'TheTradeDeskCrm.syncAudience legacy flow ' , ( ) => {
105103 it ( 'should fail if batch has less than 1500 and using legacy flow' , async ( ) => {
106104 nock ( `https://api.thetradedesk.com/v3/crmdata/segment/advertiser_id` )
107105 . get ( / .* / )
@@ -501,3 +499,200 @@ describe('TheTradeDeskCrm.syncAudience', () => {
501499 ` )
502500 } )
503501} )
502+
503+ describe ( 'TheTradeDeskCrm.syncAudience AWS Flow' , ( ) => {
504+ it ( 'should execute AWS flow when legacy flag is disabled' , async ( ) => {
505+ const responses = await testDestination . testBatchAction ( 'syncAudience' , {
506+ events,
507+ settings : {
508+ advertiser_id : 'advertiser_id' ,
509+ auth_token : 'test_token' ,
510+ __segment_internal_engage_force_full_sync : true ,
511+ __segment_internal_engage_batch_sync : true
512+ } ,
513+ features : {
514+ [ TTD_LEGACY_FLOW_FLAG_NAME ] : false
515+ } ,
516+ useDefaultMappings : true ,
517+ mapping : {
518+ name : 'test_audience' ,
519+ region : 'US' ,
520+ pii_type : 'Email'
521+ }
522+ } )
523+
524+ // AWS flow returns a MultiStatusResponse, not HTTP responses
525+ expect ( responses ) . toBeDefined ( )
526+ } )
527+
528+ it ( 'should process batch successfully with AWS flow' , async ( ) => {
529+ const responses = await testDestination . testBatchAction ( 'syncAudience' , {
530+ events,
531+ settings : {
532+ advertiser_id : 'advertiser_id' ,
533+ auth_token : 'test_token' ,
534+ __segment_internal_engage_force_full_sync : true ,
535+ __segment_internal_engage_batch_sync : true
536+ } ,
537+ features : {
538+ [ TTD_LEGACY_FLOW_FLAG_NAME ] : false
539+ } ,
540+ useDefaultMappings : true ,
541+ mapping : {
542+ name : 'test_audience' ,
543+ region : 'US' ,
544+ pii_type : 'Email'
545+ }
546+ } )
547+
548+ // Verify the responses exist
549+ expect ( responses . length ) . toBe ( 0 )
550+
551+ // Verify multiStatusResponse is populated with success responses
552+ const multiStatusResponse = testDestination . results ?. [ 0 ] ?. multistatus
553+ expect ( multiStatusResponse ?. length ) . toBe ( 1500 )
554+ if ( multiStatusResponse ) {
555+ // 99 events have invalid emails (index 1-99), rest are valid (1401 + 1 Gmail = 1402)
556+ const successResponses = multiStatusResponse . filter ( ( r : { status : number } ) => r . status === 200 )
557+ expect ( successResponses . length ) . toBe ( 1400 )
558+ }
559+ } )
560+
561+ it ( 'should fail with invalid email in AWS flow' , async ( ) => {
562+ await testDestination . testBatchAction ( 'syncAudience' , {
563+ events : events ,
564+ settings : {
565+ advertiser_id : 'advertiser_id' ,
566+ auth_token : 'test_token' ,
567+ __segment_internal_engage_force_full_sync : true ,
568+ __segment_internal_engage_batch_sync : true
569+ } ,
570+ features : {
571+ [ TTD_LEGACY_FLOW_FLAG_NAME ] : false
572+ } ,
573+ useDefaultMappings : true ,
574+ mapping : {
575+ name : 'test_audience' ,
576+ region : 'US' ,
577+ pii_type : 'Email'
578+ }
579+ } )
580+
581+ const multiStatusResponse = testDestination . results ?. [ 0 ] ?. multistatus
582+ expect ( multiStatusResponse ) . toBeDefined ( )
583+ if ( multiStatusResponse ) {
584+ // The first event with should have error status due to invalid email
585+ const invalidEmailResponse = multiStatusResponse [ 0 ]
586+ expect ( invalidEmailResponse . status ) . toBe ( 400 )
587+ const errorResponse = invalidEmailResponse as MultiStatusErrorNode
588+ expect ( errorResponse . errortype ) . toBe ( 'PAYLOAD_VALIDATION_FAILED' )
589+ expect ( errorResponse . errormessage ) . toContain ( 'Invalid email format' )
590+ }
591+ } )
592+
593+ it ( 'should mark all payloads as failed when AWS upload fails with default 400 status' , async ( ) => {
594+ // Mock S3 client to throw an error without $metadata
595+ mockS3Send . mockRejectedValue ( new Error ( 'AWS connection failed' ) )
596+
597+ await testDestination . testBatchAction ( 'syncAudience' , {
598+ events : events ,
599+ settings : {
600+ advertiser_id : 'advertiser_id' ,
601+ auth_token : 'test_token' ,
602+ __segment_internal_engage_force_full_sync : true ,
603+ __segment_internal_engage_batch_sync : true
604+ } ,
605+ features : {
606+ [ TTD_LEGACY_FLOW_FLAG_NAME ] : false
607+ } ,
608+ useDefaultMappings : true ,
609+ mapping : {
610+ name : 'test_audience' ,
611+ region : 'US' ,
612+ pii_type : 'Email'
613+ }
614+ } )
615+
616+ const multiStatusResponse = testDestination . results ?. [ 0 ] ?. multistatus
617+ expect ( multiStatusResponse ) . toBeDefined ( )
618+ if ( multiStatusResponse ) {
619+ expect ( multiStatusResponse [ multiStatusResponse . length - 1 ] . status ) . toBe ( 400 )
620+ const errorResponse = multiStatusResponse [ multiStatusResponse . length - 1 ] as MultiStatusErrorNode
621+ expect ( errorResponse . errormessage ) . toContain ( 'Failed to upload payload to Integrations Outbound Controller' )
622+ }
623+ } )
624+
625+ it ( 'should use AWS error status code when available in $metadata' , async ( ) => {
626+ // Mock S3 client to throw an error with $metadata.httpStatusCode
627+ const awsError = new Error ( 'Service Unavailable' ) as Error & { $metadata ?: { httpStatusCode : number } }
628+ awsError . $metadata = { httpStatusCode : 503 }
629+ mockS3Send . mockRejectedValue ( awsError )
630+
631+ await testDestination . testBatchAction ( 'syncAudience' , {
632+ events : events ,
633+ settings : {
634+ advertiser_id : 'advertiser_id' ,
635+ auth_token : 'test_token' ,
636+ __segment_internal_engage_force_full_sync : true ,
637+ __segment_internal_engage_batch_sync : true
638+ } ,
639+ features : {
640+ [ TTD_LEGACY_FLOW_FLAG_NAME ] : false
641+ } ,
642+ useDefaultMappings : true ,
643+ mapping : {
644+ name : 'test_audience' ,
645+ region : 'US' ,
646+ pii_type : 'Email'
647+ }
648+ } )
649+
650+ const multiStatusResponse = testDestination . results ?. [ 0 ] ?. multistatus
651+ expect ( multiStatusResponse ) . toBeDefined ( )
652+ if ( multiStatusResponse ) {
653+ // Should use the 503 status code from AWS error
654+ expect ( multiStatusResponse [ multiStatusResponse . length - 1 ] . status ) . toBe ( 503 )
655+ }
656+ } )
657+
658+ it ( 'should preserve already-errored payloads when AWS upload fails' , async ( ) => {
659+ // Mock S3 client to throw an error
660+ mockS3Send . mockRejectedValue ( new Error ( 'AWS connection failed' ) )
661+
662+ await testDestination . testBatchAction ( 'syncAudience' , {
663+ events : events ,
664+ settings : {
665+ advertiser_id : 'advertiser_id' ,
666+ auth_token : 'test_token' ,
667+ __segment_internal_engage_force_full_sync : true ,
668+ __segment_internal_engage_batch_sync : true
669+ } ,
670+ features : {
671+ [ TTD_LEGACY_FLOW_FLAG_NAME ] : false
672+ } ,
673+ useDefaultMappings : true ,
674+ mapping : {
675+ name : 'test_audience' ,
676+ region : 'US' ,
677+ pii_type : 'Email'
678+ }
679+ } )
680+
681+ const multiStatusResponse = testDestination . results ?. [ 0 ] ?. multistatus
682+ expect ( multiStatusResponse ) . toBeDefined ( )
683+ if ( multiStatusResponse ) {
684+ expect ( multiStatusResponse . length ) . toBe ( 1500 )
685+
686+ // First payload should have PAYLOAD_VALIDATION_FAILED error (from extractUsers)
687+ const invalidEmailError = multiStatusResponse [ 0 ] as MultiStatusErrorNode
688+ expect ( invalidEmailError . status ) . toBe ( 400 )
689+ expect ( invalidEmailError . errortype ) . toBe ( 'PAYLOAD_VALIDATION_FAILED' )
690+ expect ( invalidEmailError . errormessage ) . toContain ( 'Invalid email format' )
691+
692+ // Second payload should have AWS upload failure error
693+ const awsFailureError = multiStatusResponse [ multiStatusResponse . length - 1 ] as MultiStatusErrorNode
694+ expect ( awsFailureError . status ) . toBe ( 400 )
695+ expect ( awsFailureError . errormessage ) . toContain ( 'Failed to upload payload to Integrations Outbound Controller' )
696+ }
697+ } )
698+ } )
0 commit comments