|
1 |
| -import { gunzipSync } from 'node:zlib'; |
2 | 1 | import { fromBase64 } from '@aws-lambda-powertools/commons/utils/base64';
|
| 2 | +import { Base64Encoded } from 'src/helpers/index.js'; |
3 | 3 | import { z } from 'zod';
|
4 | 4 | import type { KinesisDataStreamEvent } from '../types/schema.js';
|
5 | 5 | import { DynamoDBStreamToKinesisRecord } from './dynamodb.js';
|
6 | 6 |
|
7 |
| -const decoder = new TextDecoder(); |
8 |
| - |
9 | 7 | const KinesisDataStreamRecordPayload = z.object({
|
10 | 8 | kinesisSchemaVersion: z.string(),
|
11 | 9 | partitionKey: z.string(),
|
12 | 10 | sequenceNumber: z.string(),
|
13 | 11 | approximateArrivalTimestamp: z.number(),
|
14 |
| - data: z.string().transform((data) => { |
15 |
| - const decompressed = decompress(data); |
16 |
| - const decoded = decoder.decode(fromBase64(data, 'base64')); |
17 |
| - try { |
18 |
| - // If data was not compressed, try to parse it as JSON otherwise it must be string |
19 |
| - return decompressed === data ? JSON.parse(decoded) : decompressed; |
20 |
| - } catch { |
21 |
| - return decoded; |
22 |
| - } |
23 |
| - }), |
| 12 | + data: Base64Encoded(z.any()), |
24 | 13 | });
|
25 | 14 |
|
26 |
| -const decompress = (data: string): string => { |
27 |
| - try { |
28 |
| - return JSON.parse(gunzipSync(fromBase64(data, 'base64')).toString('utf8')); |
29 |
| - } catch { |
30 |
| - return data; |
31 |
| - } |
32 |
| -}; |
33 |
| - |
34 | 15 | const KinesisDataStreamRecord = z.object({
|
35 | 16 | eventSource: z.literal('aws:kinesis'),
|
36 | 17 | eventVersion: z.string(),
|
|
0 commit comments