Skip to content

Commit 4f335a3

Browse files
committed
chore: review comments
🔒 Scanned for secrets using gitleaks 8.29.1
1 parent 73c4bb7 commit 4f335a3

File tree

8 files changed

+2801
-165
lines changed

8 files changed

+2801
-165
lines changed

src/v0/destinations/tiktok_audience/config.ts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
1+
import { EVENT_TYPES } from '../../util/recordUtils';
2+
13
export const ACTION_MAP: Record<string, string> = {
24
add: 'add',
35
remove: 'delete',
46
};
57

68
export const ACTION_RECORD_MAP: Record<string, string> = {
7-
insert: 'add',
8-
delete: 'remove',
9-
update: 'add',
9+
[EVENT_TYPES.INSERT]: 'add',
10+
[EVENT_TYPES.UPDATE]: 'add',
11+
[EVENT_TYPES.DELETE]: 'remove',
1012
};
1113

1214
export const SHA256_TRAITS = ['IDFA_SHA256', 'AAID_SHA256', 'EMAIL_SHA256', 'PHONE_SHA256'];
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
import md5 from 'md5';
2+
import { hashToSha256, InstrumentationError, formatZodError } from '@rudderstack/integrations-lib';
3+
import { groupBy } from 'lodash';
4+
import type { TiktokAudienceRecordRequest } from './recordTypes';
5+
import { TiktokAudienceRecordRouterRequestSchema } from './recordTypes';
6+
import { SHA256_TRAITS, ENDPOINT, ENDPOINT_PATH, ACTION_RECORD_MAP } from './config';
7+
import { defaultRequestConfig, getSuccessRespEvents, handleRtTfSingleEventError } from '../../util';
8+
import { RouterTransformationResponse } from '../../../types';
9+
10+
type Identifier = {
11+
id: string;
12+
audience_ids: string[];
13+
};
14+
15+
type Payload = {
16+
event: TiktokAudienceRecordRequest;
17+
batchIdentifiers: Identifier[];
18+
idSchema: string[];
19+
advertiserId: string;
20+
action: string;
21+
};
22+
23+
function prepareIdentifiersPayload(event: TiktokAudienceRecordRequest): Payload {
24+
const { message, connection, destination } = event;
25+
const { isHashRequired, audienceId } = connection.config.destination;
26+
const { advertiserId } = destination.Config;
27+
const { action, identifiers } = message;
28+
29+
const hashIdentifier = (fieldName: string, value: string) => {
30+
if (isHashRequired) {
31+
if (SHA256_TRAITS.includes(fieldName)) {
32+
return hashToSha256(value);
33+
}
34+
return md5(value);
35+
}
36+
return value;
37+
};
38+
39+
const identifiersList: Identifier[] = [];
40+
for (const [fieldName, value] of Object.entries(identifiers)) {
41+
if (value) {
42+
identifiersList.push({
43+
id: hashIdentifier(fieldName, value),
44+
audience_ids: [audienceId],
45+
});
46+
}
47+
}
48+
49+
const payload: Payload = {
50+
event,
51+
batchIdentifiers: identifiersList,
52+
idSchema: Object.keys(identifiers).sort(),
53+
advertiserId,
54+
action: ACTION_RECORD_MAP[action],
55+
};
56+
return payload;
57+
}
58+
59+
function buildResponseForProcessTransformation(
60+
payload: Record<string, any>,
61+
event: TiktokAudienceRecordRequest,
62+
) {
63+
const accessToken = event.metadata?.secret?.accessToken;
64+
const userId = event.message?.userId;
65+
66+
const response = defaultRequestConfig();
67+
response.body.JSON = payload;
68+
response.userId = userId;
69+
response.endpoint = ENDPOINT;
70+
response.endpointPath = ENDPOINT_PATH;
71+
response.headers = {
72+
'Access-Token': accessToken,
73+
'Content-Type': 'application/json',
74+
};
75+
return response;
76+
}
77+
78+
function validateAudienceRecordEvent(event: unknown) {
79+
const result = TiktokAudienceRecordRouterRequestSchema.safeParse(event);
80+
if (!result.success) {
81+
throw new InstrumentationError(formatZodError(result.error));
82+
}
83+
return result.data;
84+
}
85+
86+
function processTiktokAudienceRecords(events: unknown[]): {
87+
recordFailedResponses: RouterTransformationResponse[];
88+
recordSuccessfulResponses: RouterTransformationResponse[];
89+
} {
90+
const recordSuccessfulResponses: RouterTransformationResponse[] = [];
91+
const recordFailedResponses: RouterTransformationResponse[] = [];
92+
93+
const payloads: Payload[] = [];
94+
95+
for (const event of events) {
96+
try {
97+
const tiktokEvent = validateAudienceRecordEvent(event);
98+
const payload = prepareIdentifiersPayload(tiktokEvent);
99+
payloads.push(payload);
100+
} catch (error) {
101+
recordFailedResponses.push(handleRtTfSingleEventError(event, error, {}));
102+
}
103+
}
104+
105+
const groupedPayloads = groupBy(
106+
payloads,
107+
(payload) => `${payload.advertiserId}-${payload.action}-${payload.idSchema.join(',')}`,
108+
);
109+
110+
for (const [key, payloadArray] of Object.entries(groupedPayloads)) {
111+
try {
112+
const [advertiserId, action, idSchema] = key.split('-');
113+
const idSchemaList = idSchema.split(',');
114+
const batchData = payloadArray.map((payload) => payload.batchIdentifiers);
115+
const metadataList = payloadArray.map((payload) => payload.event.metadata);
116+
117+
const payload: Record<string, any> = {
118+
batch_data: batchData,
119+
id_schema: idSchemaList,
120+
advertiser_ids: [advertiserId],
121+
action,
122+
};
123+
const response = buildResponseForProcessTransformation(payload, payloadArray[0].event);
124+
recordSuccessfulResponses.push(
125+
getSuccessRespEvents(response, metadataList, payloadArray[0].event.destination, true),
126+
);
127+
} catch (error) {
128+
recordFailedResponses.push(
129+
...payloadArray.map((payload) => handleRtTfSingleEventError(payload.event, error, {})),
130+
);
131+
}
132+
}
133+
134+
return { recordFailedResponses, recordSuccessfulResponses };
135+
}
136+
137+
export { processTiktokAudienceRecords };
File renamed without changes.

src/v0/destinations/tiktok_audience/transform.record.ts

Lines changed: 0 additions & 75 deletions
This file was deleted.

src/v0/destinations/tiktok_audience/transform.ts

Lines changed: 51 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
11
import md5 from 'md5';
2-
import { hashToSha256, InstrumentationError, formatZodError } from '@rudderstack/integrations-lib';
2+
import {
3+
hashToSha256,
4+
InstrumentationError,
5+
formatZodError,
6+
groupByInBatches,
7+
} from '@rudderstack/integrations-lib';
38
import type { RouterTransformationResponse } from '../../../types';
49
import type { TiktokAudienceListRequest } from './types';
510
import { TiktokAudienceListRouterRequestSchema } from './types';
@@ -9,9 +14,9 @@ import {
914
getDestinationExternalIDInfoForRetl,
1015
getSuccessRespEvents,
1116
handleRtTfSingleEventError,
12-
isEventSentByVDMV2Flow,
1317
} from '../../util';
14-
import { processTiktokAudienceRecord, validateAudienceRecordEvent } from './transform.record';
18+
import { processTiktokAudienceRecords } from './recordTransform';
19+
import { TiktokAudienceRecordRequest } from './recordTypes';
1520

1621
function prepareIdentifiersList(event: TiktokAudienceListRequest) {
1722
const { message, destination, metadata } = event;
@@ -95,39 +100,57 @@ function processTiktokAudienceList(event: TiktokAudienceListRequest) {
95100
return buildResponseForProcessTransformation(identifierLists, event);
96101
}
97102

98-
function process(event: unknown) {
99-
if (isEventSentByVDMV2Flow(event)) {
100-
return processTiktokAudienceRecord(validateAudienceRecordEvent(event));
101-
}
102-
return processTiktokAudienceList(validateAudienceListEvent(event));
103-
}
104-
105-
const processRouterDest = async (events: unknown[]): Promise<RouterTransformationResponse[]> => {
103+
const processRouterDest = async (
104+
events: (TiktokAudienceListRequest | TiktokAudienceRecordRequest)[],
105+
): Promise<RouterTransformationResponse[]> => {
106106
if (!events || events.length === 0) return [];
107107

108-
const successfulResponses: RouterTransformationResponse[] = [];
109-
const failedResponses: RouterTransformationResponse[] = [];
108+
const groupedEvents = await groupByInBatches<
109+
TiktokAudienceListRequest | TiktokAudienceRecordRequest,
110+
string
111+
>(events, (event) => event.message?.type?.toLowerCase());
110112

111-
for (const event of events) {
112-
try {
113-
let tiktokEvent; let response;
113+
const supportedEventTypes = ['record', 'audiencelist'];
114+
const eventTypes = Object.keys(groupedEvents);
115+
const unsupportedEventList = eventTypes.filter(
116+
(eventType) => !supportedEventTypes.includes(eventType),
117+
);
114118

115-
if (isEventSentByVDMV2Flow(event)) {
116-
tiktokEvent = validateAudienceRecordEvent(event);
117-
response = processTiktokAudienceRecord(tiktokEvent);
118-
} else {
119-
tiktokEvent = validateAudienceListEvent(event);
120-
response = processTiktokAudienceList(tiktokEvent);
121-
}
119+
const failedResponses: RouterTransformationResponse[] = [];
120+
const successfulResponses: RouterTransformationResponse[] = [];
122121

123-
successfulResponses.push(
124-
getSuccessRespEvents(response, [tiktokEvent.metadata], tiktokEvent.destination, true),
122+
if (groupedEvents.record) {
123+
const { recordFailedResponses, recordSuccessfulResponses } = processTiktokAudienceRecords(
124+
groupedEvents.record,
125+
);
126+
failedResponses.push(...recordFailedResponses);
127+
successfulResponses.push(...recordSuccessfulResponses);
128+
}
129+
if (groupedEvents.audiencelist) {
130+
for (const event of groupedEvents.audiencelist) {
131+
try {
132+
const tiktokEvent = validateAudienceListEvent(event);
133+
const response = processTiktokAudienceList(tiktokEvent);
134+
successfulResponses.push(
135+
getSuccessRespEvents(response, [tiktokEvent.metadata], tiktokEvent.destination, true),
136+
);
137+
} catch (error) {
138+
failedResponses.push(handleRtTfSingleEventError(event, error, {}));
139+
}
140+
}
141+
}
142+
for (const unsupportedEvent of unsupportedEventList) {
143+
for (const event of groupedEvents[unsupportedEvent]) {
144+
failedResponses.push(
145+
handleRtTfSingleEventError(
146+
event,
147+
new InstrumentationError(`unsupported event found ${unsupportedEvent}`),
148+
{},
149+
),
125150
);
126-
} catch (error) {
127-
failedResponses.push(handleRtTfSingleEventError(event, error, {}));
128151
}
129152
}
130153
return [...failedResponses, ...successfulResponses];
131154
};
132155

133-
export { process, processRouterDest };
156+
export { processRouterDest };

src/v0/util/recordUtils.js

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
11
const { InstrumentationError } = require('@rudderstack/integrations-lib');
22
const { generateErrorObject, getErrorRespEvents } = require('./index');
33

4-
const eventTypes = ['update', 'insert', 'delete'];
4+
const EVENT_TYPES = {
5+
INSERT: 'insert',
6+
DELETE: 'delete',
7+
UPDATE: 'update',
8+
};
59

610
function getErrorMetaData(inputs, acceptedOperations) {
711
const metadata = [];
@@ -18,7 +22,7 @@ function getErrorMetaData(inputs, acceptedOperations) {
1822

1923
function getErrorResponse(groupedRecordsByAction) {
2024
const errorMetaData = [];
21-
const errorMetaDataObject = getErrorMetaData(groupedRecordsByAction, eventTypes);
25+
const errorMetaDataObject = getErrorMetaData(groupedRecordsByAction, Object.values(EVENT_TYPES));
2226
if (errorMetaDataObject.length > 0) {
2327
errorMetaData.push(errorMetaDataObject);
2428
}
@@ -52,4 +56,5 @@ function createFinalResponse(deleteResponse, insertResponse, updateResponse, err
5256
module.exports = {
5357
getErrorResponse,
5458
createFinalResponse,
59+
EVENT_TYPES,
5560
};

0 commit comments

Comments
 (0)