Skip to content

Commit 5183b9e

Browse files
fix(fb_custom_audience): one invalid event causes entire batch to fail
1 parent d4d9801 commit 5183b9e

File tree

3 files changed

+221
-61
lines changed

3 files changed

+221
-61
lines changed

src/v0/destinations/fb_custom_audience/recordTransform.ts

Lines changed: 46 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import {
55
ConfigurationError,
66
groupByInBatches,
77
forEachInBatches,
8-
mapInBatches,
98
} from '@rudderstack/integrations-lib';
109
import type { Metadata } from '../../../types';
1110
import type {
@@ -22,6 +21,8 @@ import {
2221
checkSubsetOfArray,
2322
returnArrayOfSubarrays,
2423
getSuccessRespEvents,
24+
getErrorRespEvents,
25+
generateErrorObject,
2526
isEventSentByVDMV2Flow,
2627
isEventSentByVDMV1Flow,
2728
isDefinedAndNotNullAndNotEmpty,
@@ -52,7 +53,7 @@ const processRecord = (
5253
disableFormat: boolean | undefined,
5354
workspaceId: string,
5455
destinationId: string,
55-
): { dataElement: unknown[]; metadata: Metadata } => {
56+
): { metadata: Metadata; error?: string; dataElement?: unknown[] } => {
5657
const fields = record.message.fields!;
5758
let dataElement: unknown[] = [];
5859
let nullUserData = true;
@@ -85,9 +86,10 @@ const processRecord = (
8586
});
8687

8788
if (nullUserData) {
88-
throw new InstrumentationError(
89-
`All user properties [${userSchema.join(', ')}] are invalid or null. At least one valid field is required.`,
90-
);
89+
return {
90+
error: `All user properties [${userSchema.join(', ')}] are invalid or null. At least one valid field is required.`,
91+
metadata: record.metadata,
92+
};
9193
}
9294

9395
return { dataElement, metadata: record.metadata };
@@ -112,21 +114,40 @@ const processRecordEventArray = async (
112114
const { userSchema, isHashRequired, disableFormat, paramsPayload, prepareParams } = config;
113115
const toSendEvents: unknown[] = [];
114116
const metadata: Metadata[] = [];
117+
const invalidEvents: unknown[] = [];
115118

116119
await forEachInBatches(recordChunksArray, async (recordArray) => {
117-
const data = await mapInBatches(recordArray, async (input) => {
118-
const { dataElement, metadata: recordMetadata } = processRecord(
120+
const data: unknown[][] = [];
121+
await forEachInBatches(recordArray, async (input) => {
122+
const result = processRecord(
119123
input,
120124
userSchema,
121125
isHashRequired,
122126
disableFormat,
123127
input.metadata.workspaceId,
124128
destination.ID,
125129
);
126-
metadata.push(recordMetadata);
127-
return dataElement;
130+
if (result.error) {
131+
const error = new InstrumentationError(result.error);
132+
const errorObj = generateErrorObject(error);
133+
invalidEvents.push(
134+
getErrorRespEvents(
135+
[result.metadata],
136+
errorObj.status,
137+
errorObj.message,
138+
errorObj.statTags,
139+
),
140+
);
141+
} else {
142+
data.push(result.dataElement!);
143+
metadata.push(result.metadata);
144+
}
128145
});
129146

147+
if (data.length === 0) {
148+
return;
149+
}
150+
130151
const prepareFinalPayload = lodash.cloneDeep(paramsPayload);
131152
prepareFinalPayload.schema = userSchema;
132153
prepareFinalPayload.data = data;
@@ -149,7 +170,12 @@ const processRecordEventArray = async (
149170
});
150171
});
151172

152-
return getSuccessRespEvents(toSendEvents, metadata, destination, true);
173+
const successResponse =
174+
toSendEvents.length > 0
175+
? getSuccessRespEvents(toSendEvents, metadata, destination, true)
176+
: null;
177+
178+
return { successResponse, invalidEvents };
153179
};
154180

155181
/**
@@ -258,14 +284,20 @@ async function preparePayload(
258284
const insertResponse = await processAction('insert', 'add');
259285
const updateResponse = await processAction('update', 'add');
260286

261-
const errorResponse = getErrorResponse(groupedRecordsByAction);
287+
const errorResponse = [
288+
...getErrorResponse(groupedRecordsByAction),
289+
...(deleteResponse?.invalidEvents || []),
290+
...(insertResponse?.invalidEvents || []),
291+
...(updateResponse?.invalidEvents || []),
292+
];
262293

263294
const finalResponse = createFinalResponse(
264-
deleteResponse,
265-
insertResponse,
266-
updateResponse,
295+
deleteResponse?.successResponse,
296+
insertResponse?.successResponse,
297+
updateResponse?.successResponse,
267298
errorResponse,
268299
);
300+
269301
if (finalResponse.length === 0) {
270302
throw new InstrumentationError(
271303
'Missing valid parameters, unable to generate transformed payload',

test/integrations/destinations/fb_custom_audience/router/data.ts

Lines changed: 123 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import {
22
rETLRecordV1RouterRequest,
33
rETLRecordV2RouterRequest,
4+
rETLRecordV2AllNullRouterRequest,
45
rETLRecordV2RouterInvalidRequest,
56
rETLRecordV2RouterRequestWithValueBasedAudience,
67
rETLRecordV2RouterInvalidRequestWithValueBasedAudience,
@@ -884,59 +885,134 @@ export const data = [
884885
body: {
885886
output: [
886887
{
887-
batched: false,
888-
error:
889-
'All user properties [EMAIL, FI] are invalid or null. At least one valid field is required.',
890-
metadata: [
888+
batched: true,
889+
batchedRequest: [
891890
{
892-
attemptNum: 1,
893-
destinationId: 'default-destinationId',
894-
dontBatch: false,
895-
jobId: 1,
896-
secret: {
897-
accessToken: defaultAccessToken,
891+
version: '1',
892+
type: 'REST',
893+
method: 'POST',
894+
endpoint: 'https://graph.facebook.com/v23.0/23848494844100489/users',
895+
endpointPath: 'users',
896+
headers: {},
897+
params: {
898+
access_token: 'ABC',
899+
payload: {
900+
schema: ['EMAIL', 'FI'],
901+
data: [
902+
[
903+
'b100c2ec0718fe6b4805b623aeec6710719d042ceea55f5c8135b010ec1c7b36',
904+
'1e14a2f476f7611a8b22bc85d14237fdc88aac828737e739416c32c5bce3bd16',
905+
],
906+
[
907+
'b100c2ec0718fe6b4805b623aeec6710719d042ceea55f5c8135b010ec1c7b36',
908+
'1e14a2f476f7611a8b22bc85d14237fdc88aac828737e739416c32c5bce3bd16',
909+
],
910+
[
911+
'b100c2ec0718fe6b4805b623aeec6710719d042ceea55f5c8135b010ec1c7b36',
912+
'1e14a2f476f7611a8b22bc85d14237fdc88aac828737e739416c32c5bce3bd16',
913+
],
914+
],
915+
},
898916
},
899-
sourceId: 'default-sourceId',
900-
userId: 'default-userId',
901-
workspaceId: 'default-workspaceId',
902-
},
903-
{
904-
attemptNum: 1,
905-
destinationId: 'default-destinationId',
906-
dontBatch: false,
907-
jobId: 2,
908-
secret: {
909-
accessToken: defaultAccessToken,
917+
body: {
918+
JSON: {},
919+
JSON_ARRAY: {},
920+
XML: {},
921+
FORM: {},
910922
},
911-
sourceId: 'default-sourceId',
912-
userId: 'default-userId',
913-
workspaceId: 'default-workspaceId',
923+
files: {},
914924
},
915-
{
916-
attemptNum: 1,
917-
destinationId: 'default-destinationId',
918-
dontBatch: false,
919-
jobId: 3,
920-
secret: {
921-
accessToken: defaultAccessToken,
922-
},
923-
sourceId: 'default-sourceId',
924-
userId: 'default-userId',
925-
workspaceId: 'default-workspaceId',
925+
],
926+
metadata: [generateMetadata(1), generateMetadata(2), generateMetadata(3)],
927+
destination: {
928+
Config: {
929+
accessToken: 'ABC',
930+
disableFormat: false,
931+
isHashRequired: true,
932+
isRaw: false,
933+
skipVerify: false,
934+
subType: 'NA',
935+
type: 'NA',
926936
},
927-
{
928-
attemptNum: 1,
929-
destinationId: 'default-destinationId',
930-
dontBatch: false,
931-
jobId: 4,
932-
secret: {
933-
accessToken: defaultAccessToken,
934-
},
935-
sourceId: 'default-sourceId',
936-
userId: 'default-userId',
937-
workspaceId: 'default-workspaceId',
937+
ID: '1mMy5cqbtfuaKZv1IhVQKnBdVwe',
938+
Name: 'FB_CUSTOM_AUDIENCE',
939+
Enabled: true,
940+
WorkspaceID: '1TSN08muJTZwH8iCDmnnRt1pmLd',
941+
DestinationDefinition: {
942+
ID: '1aIXqM806xAVm92nx07YwKbRrO9',
943+
Name: 'FB_CUSTOM_AUDIENCE',
944+
DisplayName: 'FB_CUSTOM_AUDIENCE',
945+
Config: {},
938946
},
939-
],
947+
Transformations: [],
948+
IsConnectionEnabled: true,
949+
IsProcessorEnabled: true,
950+
},
951+
statusCode: 200,
952+
},
953+
{
954+
batched: false,
955+
error:
956+
'All user properties [EMAIL, FI] are invalid or null. At least one valid field is required.',
957+
metadata: [generateMetadata(4)],
958+
statusCode: 400,
959+
statTags: {
960+
errorCategory: 'dataValidation',
961+
errorType: 'instrumentation',
962+
destType: 'FB_CUSTOM_AUDIENCE',
963+
module: 'destination',
964+
implementation: 'native',
965+
feature: 'router',
966+
destinationId: 'default-destinationId',
967+
workspaceId: 'default-workspaceId',
968+
},
969+
},
970+
],
971+
},
972+
},
973+
},
974+
},
975+
{
976+
name: 'fb_custom_audience',
977+
description: 'rETL record V2 all events have null user data',
978+
scenario: 'Framework',
979+
successCriteria:
980+
'all record events should return individual error responses when all user properties are null',
981+
feature: 'router',
982+
module: 'destination',
983+
version: 'v0',
984+
input: {
985+
request: {
986+
body: rETLRecordV2AllNullRouterRequest,
987+
},
988+
},
989+
output: {
990+
response: {
991+
status: 200,
992+
body: {
993+
output: [
994+
{
995+
batched: false,
996+
error:
997+
'All user properties [EMAIL, FI] are invalid or null. At least one valid field is required.',
998+
metadata: [generateMetadata(1)],
999+
statusCode: 400,
1000+
statTags: {
1001+
errorCategory: 'dataValidation',
1002+
errorType: 'instrumentation',
1003+
destType: 'FB_CUSTOM_AUDIENCE',
1004+
module: 'destination',
1005+
implementation: 'native',
1006+
feature: 'router',
1007+
destinationId: 'default-destinationId',
1008+
workspaceId: 'default-workspaceId',
1009+
},
1010+
},
1011+
{
1012+
batched: false,
1013+
error:
1014+
'All user properties [EMAIL, FI] are invalid or null. At least one valid field is required.',
1015+
metadata: [generateMetadata(2)],
9401016
statusCode: 400,
9411017
statTags: {
9421018
errorCategory: 'dataValidation',

test/integrations/destinations/fb_custom_audience/router/rETL.ts

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,58 @@ export const rETLRecordV2RouterRequest: RouterTransformationRequest = {
164164
destType: 'fb_custom_audience',
165165
};
166166

167+
export const rETLRecordV2AllNullRouterRequest: RouterTransformationRequest = {
168+
input: [
169+
{
170+
destination: destinationV2,
171+
connection: connection,
172+
message: {
173+
action: 'insert',
174+
context: {
175+
sources: {
176+
job_run_id: 'cgiiurt8um7k7n5dq480',
177+
task_run_id: 'cgiiurt8um7k7n5dq48g',
178+
job_id: '2MUWghI7u85n91dd1qzGyswpZan',
179+
version: '895/merge',
180+
},
181+
},
182+
recordId: '1',
183+
rudderId: '1',
184+
identifiers: {
185+
EMAIL: null,
186+
FI: null,
187+
},
188+
type: 'record',
189+
},
190+
metadata: generateMetadata(1),
191+
},
192+
{
193+
destination: destinationV2,
194+
connection: connection,
195+
message: {
196+
action: 'insert',
197+
context: {
198+
sources: {
199+
job_run_id: 'cgiiurt8um7k7n5dq480',
200+
task_run_id: 'cgiiurt8um7k7n5dq48g',
201+
job_id: '2MUWghI7u85n91dd1qzGyswpZan',
202+
version: '895/merge',
203+
},
204+
},
205+
recordId: '2',
206+
rudderId: '2',
207+
identifiers: {
208+
EMAIL: null,
209+
FI: null,
210+
},
211+
type: 'record',
212+
},
213+
metadata: generateMetadata(2),
214+
},
215+
],
216+
destType: 'fb_custom_audience',
217+
};
218+
167219
export const rETLRecordV2RouterInvalidRequest: RouterTransformationRequest = {
168220
input: [
169221
{

0 commit comments

Comments
 (0)