Skip to content

Commit b9b3411

Browse files
authored
chore: vdm record v2 changes (#5033)
🔒 Scanned for secrets using gitleaks 8.29.1 ## What are the changes introduced in this PR? - Adding support for vdm record v2 changes. ## What is the related Linear task? - Resolves INT-5736
1 parent 2ee3bec commit b9b3411

File tree

10 files changed

+3234
-2387
lines changed

10 files changed

+3234
-2387
lines changed

src/v0/destinations/tiktok_audience/config.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,16 @@
1+
import { EVENT_TYPES } from '../../util/recordUtils';
2+
13
export const ACTION_MAP: Record<string, string> = {
24
add: 'add',
35
remove: 'delete',
46
};
57

8+
export const ACTION_RECORD_MAP: Record<string, string> = {
9+
[EVENT_TYPES.INSERT]: 'add',
10+
[EVENT_TYPES.UPDATE]: 'add',
11+
[EVENT_TYPES.DELETE]: 'delete',
12+
};
13+
614
export const SHA256_TRAITS = ['IDFA_SHA256', 'AAID_SHA256', 'EMAIL_SHA256', 'PHONE_SHA256'];
715

816
export const BASE_URL = 'https://business-api.tiktok.com/open_api/v1.3';
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
import md5 from 'md5';
2+
import { hashToSha256, InstrumentationError, formatZodError } from '@rudderstack/integrations-lib';
3+
import type {
4+
TiktokAudienceRecordRequest,
5+
IdentifiersPayload,
6+
Identifier,
7+
SegmentMappingPayload,
8+
ProcessTiktokAudienceRecordsResponse,
9+
} from './recordTypes';
10+
import { TiktokAudienceRecordRouterRequestSchema } from './recordTypes';
11+
import { SHA256_TRAITS, ENDPOINT, ENDPOINT_PATH, ACTION_RECORD_MAP } from './config';
12+
import { defaultRequestConfig, getSuccessRespEvents, handleRtTfSingleEventError } from '../../util';
13+
14+
function prepareIdentifiersPayload(event: TiktokAudienceRecordRequest): IdentifiersPayload {
15+
const { message, connection, destination } = event;
16+
const { isHashRequired, audienceId } = connection.config.destination;
17+
const { advertiserId } = destination.Config;
18+
const { action, identifiers } = message;
19+
20+
const hashIdentifier = (fieldName: string, value: string) => {
21+
if (isHashRequired) {
22+
if (SHA256_TRAITS.includes(fieldName)) {
23+
return hashToSha256(value);
24+
}
25+
return md5(value);
26+
}
27+
return value;
28+
};
29+
30+
const identifiersList: Identifier[] = [];
31+
for (const [fieldName, value] of Object.entries(identifiers)) {
32+
if (value) {
33+
identifiersList.push({
34+
id: hashIdentifier(fieldName, value),
35+
audience_ids: [audienceId],
36+
});
37+
}
38+
}
39+
40+
const payload: IdentifiersPayload = {
41+
event,
42+
batchIdentifiers: identifiersList,
43+
idSchema: Object.keys(identifiers).sort(),
44+
advertiserId,
45+
action: ACTION_RECORD_MAP[action],
46+
};
47+
return payload;
48+
}
49+
50+
function prepareSegmentMappingRequest(
51+
payload: SegmentMappingPayload,
52+
event: TiktokAudienceRecordRequest,
53+
) {
54+
const accessToken = event.metadata?.secret?.accessToken;
55+
const userId = event.message?.userId;
56+
57+
const response = defaultRequestConfig();
58+
response.body.JSON = payload;
59+
response.userId = userId;
60+
response.endpoint = ENDPOINT;
61+
response.endpointPath = ENDPOINT_PATH;
62+
response.headers = {
63+
'Access-Token': accessToken,
64+
'Content-Type': 'application/json',
65+
};
66+
return response;
67+
}
68+
69+
function validateAudienceRecordEvent(event: unknown): TiktokAudienceRecordRequest {
70+
const result = TiktokAudienceRecordRouterRequestSchema.safeParse(event);
71+
if (!result.success) {
72+
throw new InstrumentationError(formatZodError(result.error));
73+
}
74+
return result.data;
75+
}
76+
77+
function processTiktokAudienceRecords(events: unknown[]): ProcessTiktokAudienceRecordsResponse {
78+
const recordResponse: ProcessTiktokAudienceRecordsResponse = {
79+
failedResponses: [],
80+
successfulResponses: [],
81+
};
82+
const groupedPayloads: {
83+
advertiserId: string;
84+
action: string;
85+
idSchema: string[];
86+
payloads: IdentifiersPayload[];
87+
}[] = [];
88+
89+
for (const event of events) {
90+
try {
91+
const recordEvent = validateAudienceRecordEvent(event);
92+
const identifiersPayload = prepareIdentifiersPayload(recordEvent);
93+
94+
const existingGroup = groupedPayloads.find(
95+
(group) =>
96+
group.advertiserId === identifiersPayload.advertiserId &&
97+
group.action === identifiersPayload.action &&
98+
group.idSchema.length === identifiersPayload.idSchema.length &&
99+
group.idSchema.every((field, index) => field === identifiersPayload.idSchema[index]),
100+
);
101+
102+
if (existingGroup) {
103+
existingGroup.payloads.push(identifiersPayload);
104+
} else {
105+
groupedPayloads.push({
106+
advertiserId: identifiersPayload.advertiserId,
107+
action: identifiersPayload.action,
108+
idSchema: identifiersPayload.idSchema,
109+
payloads: [identifiersPayload],
110+
});
111+
}
112+
} catch (error) {
113+
recordResponse.failedResponses.push(handleRtTfSingleEventError(event, error, {}));
114+
}
115+
}
116+
117+
for (const group of groupedPayloads) {
118+
try {
119+
const batchIdentifiers = group.payloads.map((payload) => payload.batchIdentifiers);
120+
const metadataList = group.payloads.map((payload) => payload.event.metadata);
121+
122+
const payload: SegmentMappingPayload = {
123+
batch_data: batchIdentifiers,
124+
id_schema: group.idSchema,
125+
advertiser_ids: [group.advertiserId],
126+
action: group.action,
127+
};
128+
const response = prepareSegmentMappingRequest(payload, group.payloads[0].event);
129+
130+
recordResponse.successfulResponses.push(
131+
getSuccessRespEvents(response, metadataList, group.payloads[0].event.destination, true),
132+
);
133+
} catch (error) {
134+
recordResponse.failedResponses.push(
135+
...group.payloads.map((payload) => handleRtTfSingleEventError(payload.event, error, {})),
136+
);
137+
}
138+
}
139+
return recordResponse;
140+
}
141+
142+
export { processTiktokAudienceRecords };
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
import { z } from 'zod';
2+
import { RouterTransformationResponse } from '../../../types';
3+
4+
const TiktokAudienceDestinationSchema = z
5+
.object({
6+
Config: z
7+
.object({
8+
advertiserId: z.string(),
9+
})
10+
.passthrough(),
11+
})
12+
.passthrough();
13+
14+
const TiktokAudienceConnectionSchema = z
15+
.object({
16+
config: z
17+
.object({
18+
destination: z
19+
.object({
20+
isHashRequired: z.boolean(),
21+
audienceId: z.string(),
22+
})
23+
.passthrough(),
24+
})
25+
.passthrough(),
26+
})
27+
.passthrough();
28+
29+
const TiktokAudienceMessageSchema = z
30+
.object({
31+
type: z.enum(['record'], {
32+
required_error: 'message Type is not present. Aborting message.',
33+
}),
34+
action: z.enum(['insert', 'delete', 'update'], {
35+
required_error: 'action is not present. Aborting message.',
36+
}),
37+
userId: z.string().optional(),
38+
identifiers: z.record(z.string(), z.string().nullable()),
39+
fields: z.record(z.string(), z.string().nullable()),
40+
})
41+
.passthrough();
42+
43+
const TiktokAudienceMetadataSchema = z
44+
.object({
45+
secret: z
46+
.object({
47+
accessToken: z.string(),
48+
})
49+
.passthrough(),
50+
})
51+
.passthrough();
52+
53+
export const TiktokAudienceRecordRouterRequestSchema = z
54+
.object({
55+
message: TiktokAudienceMessageSchema,
56+
destination: TiktokAudienceDestinationSchema,
57+
connection: TiktokAudienceConnectionSchema,
58+
metadata: TiktokAudienceMetadataSchema,
59+
})
60+
.passthrough();
61+
62+
export type ProcessTiktokAudienceRecordsResponse = {
63+
failedResponses: RouterTransformationResponse[];
64+
successfulResponses: RouterTransformationResponse[];
65+
};
66+
67+
export type Identifier = {
68+
id: string;
69+
audience_ids: string[];
70+
};
71+
72+
export type IdentifiersPayload = {
73+
event: TiktokAudienceRecordRequest;
74+
batchIdentifiers: Identifier[];
75+
idSchema: string[];
76+
advertiserId: string;
77+
action: string;
78+
};
79+
80+
export type SegmentMappingPayload = {
81+
batch_data: Identifier[][];
82+
id_schema: string[];
83+
advertiser_ids: string[];
84+
action: string;
85+
};
86+
87+
export type TiktokAudienceRecordRequest = z.infer<typeof TiktokAudienceRecordRouterRequestSchema>;

src/v0/destinations/tiktok_audience/transform.ts

Lines changed: 59 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,24 @@
11
import md5 from 'md5';
2-
import { hashToSha256, InstrumentationError, formatZodError } from '@rudderstack/integrations-lib';
2+
import {
3+
hashToSha256,
4+
InstrumentationError,
5+
formatZodError,
6+
groupByInBatches,
7+
} from '@rudderstack/integrations-lib';
38
import type { RouterTransformationResponse } from '../../../types';
4-
import type { TiktokAudienceRequest } from './types';
5-
import { TiktokAudienceRouterRequestSchema } from './types';
9+
import type { TiktokAudienceListRequest } from './types';
10+
import { TiktokAudienceListRouterRequestSchema } from './types';
611
import { SHA256_TRAITS, ACTION_MAP, ENDPOINT, ENDPOINT_PATH } from './config';
712
import {
813
defaultRequestConfig,
914
getDestinationExternalIDInfoForRetl,
1015
getSuccessRespEvents,
1116
handleRtTfSingleEventError,
1217
} from '../../util';
18+
import { processTiktokAudienceRecords } from './recordTransform';
19+
import { ProcessTiktokAudienceRecordsResponse, TiktokAudienceRecordRequest } from './recordTypes';
1320

14-
function prepareIdentifiersList(event: TiktokAudienceRequest) {
21+
function prepareIdentifiersList(event: TiktokAudienceListRequest) {
1522
const { message, destination, metadata } = event;
1623
const { isHashRequired } = destination.Config;
1724

@@ -57,7 +64,7 @@ function prepareIdentifiersList(event: TiktokAudienceRequest) {
5764

5865
function buildResponseForProcessTransformation(
5966
identifiersList: any[],
60-
event: TiktokAudienceRequest,
67+
event: TiktokAudienceListRequest,
6168
) {
6269
const accessToken = event.metadata?.secret?.accessToken;
6370
const anonymousId = event.message?.anonymousId;
@@ -80,41 +87,70 @@ function buildResponseForProcessTransformation(
8087
return responses;
8188
}
8289

83-
function validateEvent(event: unknown) {
84-
const result = TiktokAudienceRouterRequestSchema.safeParse(event);
90+
function validateAudienceListEvent(event: unknown) {
91+
const result = TiktokAudienceListRouterRequestSchema.safeParse(event);
8592
if (!result.success) {
8693
throw new InstrumentationError(formatZodError(result.error));
8794
}
8895
return result.data;
8996
}
9097

91-
function processTiktokAudience(event: TiktokAudienceRequest) {
98+
function processTiktokAudienceList(event: TiktokAudienceListRequest) {
9299
const identifierLists = prepareIdentifiersList(event);
93100
return buildResponseForProcessTransformation(identifierLists, event);
94101
}
95102

96-
function process(event: unknown) {
97-
return processTiktokAudience(validateEvent(event));
98-
}
99-
100-
const processRouterDest = async (events: unknown[]): Promise<RouterTransformationResponse[]> => {
103+
const processRouterDest = async (
104+
events: (TiktokAudienceListRequest | TiktokAudienceRecordRequest)[],
105+
): Promise<RouterTransformationResponse[]> => {
101106
if (!events || events.length === 0) return [];
102107

103-
const successfulResponses: RouterTransformationResponse[] = [];
108+
const groupedEvents = await groupByInBatches<
109+
TiktokAudienceListRequest | TiktokAudienceRecordRequest,
110+
string
111+
>(events, (event) => event.message?.type?.toLowerCase());
112+
113+
const supportedEventTypes = ['record', 'audiencelist'];
114+
const eventTypes = Object.keys(groupedEvents);
115+
const unsupportedEventList = eventTypes.filter(
116+
(eventType) => !supportedEventTypes.includes(eventType),
117+
);
118+
104119
const failedResponses: RouterTransformationResponse[] = [];
120+
const successfulResponses: RouterTransformationResponse[] = [];
105121

106-
for (const event of events) {
107-
try {
108-
const tiktokEvent = validateEvent(event);
109-
const response = processTiktokAudience(tiktokEvent);
110-
successfulResponses.push(
111-
getSuccessRespEvents(response, [tiktokEvent.metadata], tiktokEvent.destination, true),
122+
if (groupedEvents.record) {
123+
const response: ProcessTiktokAudienceRecordsResponse = processTiktokAudienceRecords(
124+
groupedEvents.record,
125+
);
126+
failedResponses.push(...response.failedResponses);
127+
successfulResponses.push(...response.successfulResponses);
128+
}
129+
if (groupedEvents.audiencelist) {
130+
for (const event of groupedEvents.audiencelist) {
131+
try {
132+
const tiktokEvent = validateAudienceListEvent(event);
133+
const response = processTiktokAudienceList(tiktokEvent);
134+
successfulResponses.push(
135+
getSuccessRespEvents(response, [tiktokEvent.metadata], tiktokEvent.destination, true),
136+
);
137+
} catch (error) {
138+
failedResponses.push(handleRtTfSingleEventError(event, error, {}));
139+
}
140+
}
141+
}
142+
for (const unsupportedEvent of unsupportedEventList) {
143+
for (const event of groupedEvents[unsupportedEvent]) {
144+
failedResponses.push(
145+
handleRtTfSingleEventError(
146+
event,
147+
new InstrumentationError(`unsupported event found ${unsupportedEvent}`),
148+
{},
149+
),
112150
);
113-
} catch (error) {
114-
failedResponses.push(handleRtTfSingleEventError(event, error, {}));
115151
}
116152
}
117153
return [...failedResponses, ...successfulResponses];
118154
};
119155

120-
export { process, processRouterDest };
156+
export { processRouterDest };

0 commit comments

Comments
 (0)