diff --git a/.github/workflows/reusable-run-linting-check-and-unit-tests.yml b/.github/workflows/reusable-run-linting-check-and-unit-tests.yml index ac214ec07e..844d687874 100644 --- a/.github/workflows/reusable-run-linting-check-and-unit-tests.yml +++ b/.github/workflows/reusable-run-linting-check-and-unit-tests.yml @@ -51,7 +51,8 @@ jobs: "packages/parser", "packages/parameters", "packages/validation", - "packages/metrics" + "packages/metrics", + "packages/kafka" ] fail-fast: false steps: diff --git a/biome.json b/biome.json index 46c5a0c5d5..7d582f3a49 100644 --- a/biome.json +++ b/biome.json @@ -31,7 +31,9 @@ "lib", "cdk.out", "site", - ".aws-sam" + ".aws-sam", + "**/*.generated.js", + "**/*.generated.d.ts" ] } -} \ No newline at end of file +} diff --git a/package-lock.json b/package-lock.json index c488b5de41..a85f8fe6f5 100644 --- a/package-lock.json +++ b/package-lock.json @@ -23,7 +23,8 @@ "layers", "examples/app", "packages/event-handler", - "packages/validation" + "packages/validation", + "packages/kafka" ], "devDependencies": { "@biomejs/biome": "^1.9.4", @@ -343,14 +344,14 @@ } }, "node_modules/@aws-cdk/toolkit-lib/node_modules/glob": { - "version": "11.0.2", - "resolved": "https://registry.npmjs.org/glob/-/glob-11.0.2.tgz", - "integrity": "sha512-YT7U7Vye+t5fZ/QMkBFrTJ7ZQxInIUjwyAjVj84CYXqgBdv30MFUPGnBR6sQaVq6Is15wYJUsnzTuWaGRBhBAQ==", + "version": "11.0.3", + "resolved": "https://registry.npmjs.org/glob/-/glob-11.0.3.tgz", + "integrity": "sha512-2Nim7dha1KVkaiF4q6Dj+ngPPMdfvLJEOpZk/jKiUAkqKebpGAWQXAq9z1xu9HKu5lWfqw/FASuccEjyznjPaA==", "license": "ISC", "dependencies": { - "foreground-child": "^3.1.0", - "jackspeak": "^4.0.1", - "minimatch": "^10.0.0", + "foreground-child": "^3.3.1", + "jackspeak": "^4.1.1", + "minimatch": "^10.0.3", "minipass": "^7.1.2", "package-json-from-dist": "^1.0.0", "path-scurry": "^2.0.0" @@ -365,10 +366,25 @@ "url": "https://github.com/sponsors/isaacs" } }, + "node_modules/@aws-cdk/toolkit-lib/node_modules/glob/node_modules/minimatch": { + "version": "10.0.3", + "resolved": "https://registry.npmjs.org/minimatch/-/minimatch-10.0.3.tgz", + "integrity": "sha512-IPZ167aShDZZUMdRk66cyQAW3qr0WzbHkPdMYa8bzZhlHhO3jALbKdxcaak7W9FfT2rZNpQuUu4Od7ILEpXSaw==", + "license": "ISC", + "dependencies": { + "@isaacs/brace-expansion": "^5.0.0" + }, + "engines": { + "node": "20 || >=22" + }, + "funding": { + "url": "https://github.com/sponsors/isaacs" + } + }, "node_modules/@aws-cdk/toolkit-lib/node_modules/jackspeak": { - "version": "4.1.0", - "resolved": "https://registry.npmjs.org/jackspeak/-/jackspeak-4.1.0.tgz", - "integrity": "sha512-9DDdhb5j6cpeitCbvLO7n7J4IxnbM6hoF6O1g4HQ5TfhvvKN8ywDM7668ZhMHRqVmxqhps/F6syWK2KcPxYlkw==", + "version": "4.1.1", + "resolved": "https://registry.npmjs.org/jackspeak/-/jackspeak-4.1.1.tgz", + "integrity": "sha512-zptv57P3GpL+O0I7VdMJNBZCu+BPHVQUk55Ft8/QCJjTVxrnJHuVuX/0Bl2A6/+2oyR/ZMEuFKwmzqqZ/U5nPQ==", "license": "BlueOak-1.0.0", "dependencies": { "@isaacs/cliui": "^8.0.2" @@ -695,6 +711,10 @@ "resolved": "packages/jmespath", "link": true }, + "node_modules/@aws-lambda-powertools/kafka": { + "resolved": "packages/kafka", + "link": true + }, "node_modules/@aws-lambda-powertools/logger": { "resolved": "packages/logger", "link": true @@ -9968,16 +9988,6 @@ "@aws-sdk/client-s3": "^3.808.0" } }, - "node_modules/@aws-sdk/lib-storage/node_modules/buffer": { - "version": "5.6.0", - "resolved": "https://registry.npmjs.org/buffer/-/buffer-5.6.0.tgz", - "integrity": 
"sha512-/gDYp/UtU0eA1ys8bOs9J6a+E/KWIY+DZ+Q2WESNUA0jFRsJOc0SNUO6xJ5SGA1xueg3NL65W6s+NY5l9cunuw==", - "license": "MIT", - "dependencies": { - "base64-js": "^1.0.2", - "ieee754": "^1.1.4" - } - }, "node_modules/@aws-sdk/lib-storage/node_modules/events": { "version": "3.3.0", "resolved": "https://registry.npmjs.org/events/-/events-3.3.0.tgz", @@ -11423,6 +11433,27 @@ "node": ">=6.9.0" } }, + "node_modules/@isaacs/balanced-match": { + "version": "4.0.1", + "resolved": "https://registry.npmjs.org/@isaacs/balanced-match/-/balanced-match-4.0.1.tgz", + "integrity": "sha512-yzMTt9lEb8Gv7zRioUilSglI0c0smZ9k5D65677DLWLtWJaXIS3CqcGyUFByYKlnUj6TkjLVs54fBl6+TiGQDQ==", + "license": "MIT", + "engines": { + "node": "20 || >=22" + } + }, + "node_modules/@isaacs/brace-expansion": { + "version": "5.0.0", + "resolved": "https://registry.npmjs.org/@isaacs/brace-expansion/-/brace-expansion-5.0.0.tgz", + "integrity": "sha512-ZT55BDLV0yv0RBm2czMiZ+SqCGO7AvmOM3G/w2xhVPH+te0aKgFjmBvGlL1dH+ql2tgGO3MVrbb3jCKyvpgnxA==", + "license": "MIT", + "dependencies": { + "@isaacs/balanced-match": "^4.0.1" + }, + "engines": { + "node": "20 || >=22" + } + }, "node_modules/@isaacs/cliui": { "version": "8.0.2", "resolved": "https://registry.npmjs.org/@isaacs/cliui/-/cliui-8.0.2.tgz", @@ -12354,41 +12385,36 @@ "version": "1.1.2", "resolved": "https://registry.npmjs.org/@protobufjs/aspromise/-/aspromise-1.1.2.tgz", "integrity": "sha512-j+gKExEuLmKwvz3OgROXtrJ2UG2x8Ch2YZUxahh+s1F2HZ+wAceUNLkvy6zKCPVRkU++ZWQrdxsUeQXmcg4uoQ==", - "license": "BSD-3-Clause", - "optional": true, - "peer": true + "devOptional": true, + "license": "BSD-3-Clause" }, "node_modules/@protobufjs/base64": { "version": "1.1.2", "resolved": "https://registry.npmjs.org/@protobufjs/base64/-/base64-1.1.2.tgz", "integrity": "sha512-AZkcAA5vnN/v4PDqKyMR5lx7hZttPDgClv83E//FMNhR2TMcLUhfRUBHCmSl0oi9zMgDDqRUJkSxO3wm85+XLg==", - "license": "BSD-3-Clause", - "optional": true, - "peer": true + "devOptional": true, + "license": "BSD-3-Clause" }, "node_modules/@protobufjs/codegen": { "version": "2.0.4", "resolved": "https://registry.npmjs.org/@protobufjs/codegen/-/codegen-2.0.4.tgz", "integrity": "sha512-YyFaikqM5sH0ziFZCN3xDC7zeGaB/d0IUb9CATugHWbd1FRFwWwt4ld4OYMPWu5a3Xe01mGAULCdqhMlPl29Jg==", - "license": "BSD-3-Clause", - "optional": true, - "peer": true + "devOptional": true, + "license": "BSD-3-Clause" }, "node_modules/@protobufjs/eventemitter": { "version": "1.1.0", "resolved": "https://registry.npmjs.org/@protobufjs/eventemitter/-/eventemitter-1.1.0.tgz", "integrity": "sha512-j9ednRT81vYJ9OfVuXG6ERSTdEL1xVsNgqpkxMsbIabzSo3goCjDIveeGv5d03om39ML71RdmrGNjG5SReBP/Q==", - "license": "BSD-3-Clause", - "optional": true, - "peer": true + "devOptional": true, + "license": "BSD-3-Clause" }, "node_modules/@protobufjs/fetch": { "version": "1.1.0", "resolved": "https://registry.npmjs.org/@protobufjs/fetch/-/fetch-1.1.0.tgz", "integrity": "sha512-lljVXpqXebpsijW71PZaCYeIcE5on1w5DlQy5WH6GLbFryLUrBD4932W/E2BSpfRJWseIL4v/KPgBFxDOIdKpQ==", + "devOptional": true, "license": "BSD-3-Clause", - "optional": true, - "peer": true, "dependencies": { "@protobufjs/aspromise": "^1.1.1", "@protobufjs/inquire": "^1.1.0" @@ -12398,41 +12424,36 @@ "version": "1.0.2", "resolved": "https://registry.npmjs.org/@protobufjs/float/-/float-1.0.2.tgz", "integrity": "sha512-Ddb+kVXlXst9d+R9PfTIxh1EdNkgoRe5tOX6t01f1lYWOvJnSPDBlG241QLzcyPdoNTsblLUdujGSE4RzrTZGQ==", - "license": "BSD-3-Clause", - "optional": true, - "peer": true + "devOptional": true, + "license": "BSD-3-Clause" }, "node_modules/@protobufjs/inquire": { 
"version": "1.1.0", "resolved": "https://registry.npmjs.org/@protobufjs/inquire/-/inquire-1.1.0.tgz", "integrity": "sha512-kdSefcPdruJiFMVSbn801t4vFK7KB/5gd2fYvrxhuJYg8ILrmn9SKSX2tZdV6V+ksulWqS7aXjBcRXl3wHoD9Q==", - "license": "BSD-3-Clause", - "optional": true, - "peer": true + "devOptional": true, + "license": "BSD-3-Clause" }, "node_modules/@protobufjs/path": { "version": "1.1.2", "resolved": "https://registry.npmjs.org/@protobufjs/path/-/path-1.1.2.tgz", "integrity": "sha512-6JOcJ5Tm08dOHAbdR3GrvP+yUUfkjG5ePsHYczMFLq3ZmMkAD98cDgcT2iA1lJ9NVwFd4tH/iSSoe44YWkltEA==", - "license": "BSD-3-Clause", - "optional": true, - "peer": true + "devOptional": true, + "license": "BSD-3-Clause" }, "node_modules/@protobufjs/pool": { "version": "1.1.0", "resolved": "https://registry.npmjs.org/@protobufjs/pool/-/pool-1.1.0.tgz", "integrity": "sha512-0kELaGSIDBKvcgS4zkjz1PeddatrjYcmMWOlAuAPwAeccUrPHdUqo/J6LiymHHEiJT5NrF1UVwxY14f+fy4WQw==", - "license": "BSD-3-Clause", - "optional": true, - "peer": true + "devOptional": true, + "license": "BSD-3-Clause" }, "node_modules/@protobufjs/utf8": { "version": "1.1.0", "resolved": "https://registry.npmjs.org/@protobufjs/utf8/-/utf8-1.1.0.tgz", "integrity": "sha512-Vvn3zZrhQZkkBE8LSuW3em98c0FwgO4nxzv6OdSxPKJIEKY2bGbHn+mhGIPerzI4twdxaP8/0+06HBpwf345Lw==", - "license": "BSD-3-Clause", - "optional": true, - "peer": true + "devOptional": true, + "license": "BSD-3-Clause" }, "node_modules/@redis/client": { "version": "5.5.6", @@ -12855,28 +12876,6 @@ "node": "^14.17.0 || ^16.13.0 || >=18.0.0" } }, - "node_modules/@sigstore/sign/node_modules/glob": { - "version": "10.4.1", - "resolved": "https://registry.npmjs.org/glob/-/glob-10.4.1.tgz", - "integrity": "sha512-2jelhlq3E4ho74ZyVLN03oKdAZVUa6UDZzFLVH1H7dnoax+y9qyaq8zBkfDIggjniU19z0wU18y16jMB2eyVIw==", - "dev": true, - "dependencies": { - "foreground-child": "^3.1.0", - "jackspeak": "^3.1.2", - "minimatch": "^9.0.4", - "minipass": "^7.1.2", - "path-scurry": "^1.11.1" - }, - "bin": { - "glob": "dist/esm/bin.mjs" - }, - "engines": { - "node": ">=16 || 14 >=14.18" - }, - "funding": { - "url": "https://github.com/sponsors/isaacs" - } - }, "node_modules/@sigstore/sign/node_modules/http-proxy-agent": { "version": "5.0.0", "resolved": "https://registry.npmjs.org/http-proxy-agent/-/http-proxy-agent-5.0.0.tgz", @@ -12948,22 +12947,6 @@ "node": ">=8" } }, - "node_modules/@sigstore/sign/node_modules/minimatch": { - "version": "9.0.5", - "resolved": "https://registry.npmjs.org/minimatch/-/minimatch-9.0.5.tgz", - "integrity": "sha512-G6T0ZX48xgozx7587koeX9Ys2NYy6Gmv//P89sEte9V9whIapMNF4idKxnW2QtCcLiTWlb/wfCabAtAFWhhBow==", - "dev": true, - "license": "ISC", - "dependencies": { - "brace-expansion": "^2.0.1" - }, - "engines": { - "node": ">=16 || 14 >=14.17" - }, - "funding": { - "url": "https://github.com/sponsors/isaacs" - } - }, "node_modules/@sigstore/sign/node_modules/minipass-collect": { "version": "1.0.2", "resolved": "https://registry.npmjs.org/minipass-collect/-/minipass-collect-1.0.2.tgz", @@ -13859,6 +13842,12 @@ "node": ">=18.0.0" } }, + "node_modules/@standard-schema/spec": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/@standard-schema/spec/-/spec-1.0.0.tgz", + "integrity": "sha512-m2bOd0f2RT9k8QJx1JN85cZYyH1RqFBdlwtkSlf4tBDYLCiiZnv1fIIwacK6cqwXavOydf0NPToMQgpKq+dVlA==", + "license": "MIT" + }, "node_modules/@tootallnate/once": { "version": "2.0.0", "resolved": "https://registry.npmjs.org/@tootallnate/once/-/once-2.0.0.tgz", @@ -14917,6 +14906,16 @@ "url": "https://github.com/sponsors/ljharb" } }, + 
"node_modules/avro-js": { + "version": "1.12.0", + "resolved": "https://registry.npmjs.org/avro-js/-/avro-js-1.12.0.tgz", + "integrity": "sha512-mBhOjtHHua2MHrrgQ71YKKTGfZpS1sPvgL+QcCQ5SkUyp6qLkeTsCnQXUmATfpiOvoXB6CczzFEqn5UKbPUn3Q==", + "dev": true, + "license": "Apache-2.0", + "dependencies": { + "underscore": "^1.13.2" + } + }, "node_modules/aws-cdk": { "version": "2.1018.1", "resolved": "https://registry.npmjs.org/aws-cdk/-/aws-cdk-2.1018.1.tgz", @@ -15499,27 +15498,13 @@ } }, "node_modules/buffer": { - "version": "5.7.1", - "resolved": "https://registry.npmjs.org/buffer/-/buffer-5.7.1.tgz", - "integrity": "sha512-EHcyIPBQ4BSGlvjB16k5KgAJ27CIsHY/2JBmCRReo48y9rQ3MaUzWX3KVlBa4U7MyX02HdVj0K7C3WaB3ju7FQ==", - "dev": true, - "funding": [ - { - "type": "github", - "url": "https://github.com/sponsors/feross" - }, - { - "type": "patreon", - "url": "https://www.patreon.com/feross" - }, - { - "type": "consulting", - "url": "https://feross.org/support" - } - ], + "version": "5.6.0", + "resolved": "https://registry.npmjs.org/buffer/-/buffer-5.6.0.tgz", + "integrity": "sha512-/gDYp/UtU0eA1ys8bOs9J6a+E/KWIY+DZ+Q2WESNUA0jFRsJOc0SNUO6xJ5SGA1xueg3NL65W6s+NY5l9cunuw==", + "license": "MIT", "dependencies": { - "base64-js": "^1.3.1", - "ieee754": "^1.1.13" + "base64-js": "^1.0.2", + "ieee754": "^1.1.4" } }, "node_modules/buffer-crc32": { @@ -15685,14 +15670,14 @@ } }, "node_modules/cdk-assets/node_modules/glob": { - "version": "11.0.2", - "resolved": "https://registry.npmjs.org/glob/-/glob-11.0.2.tgz", - "integrity": "sha512-YT7U7Vye+t5fZ/QMkBFrTJ7ZQxInIUjwyAjVj84CYXqgBdv30MFUPGnBR6sQaVq6Is15wYJUsnzTuWaGRBhBAQ==", + "version": "11.0.3", + "resolved": "https://registry.npmjs.org/glob/-/glob-11.0.3.tgz", + "integrity": "sha512-2Nim7dha1KVkaiF4q6Dj+ngPPMdfvLJEOpZk/jKiUAkqKebpGAWQXAq9z1xu9HKu5lWfqw/FASuccEjyznjPaA==", "license": "ISC", "dependencies": { - "foreground-child": "^3.1.0", - "jackspeak": "^4.0.1", - "minimatch": "^10.0.0", + "foreground-child": "^3.3.1", + "jackspeak": "^4.1.1", + "minimatch": "^10.0.3", "minipass": "^7.1.2", "package-json-from-dist": "^1.0.0", "path-scurry": "^2.0.0" @@ -15708,9 +15693,9 @@ } }, "node_modules/cdk-assets/node_modules/jackspeak": { - "version": "4.1.0", - "resolved": "https://registry.npmjs.org/jackspeak/-/jackspeak-4.1.0.tgz", - "integrity": "sha512-9DDdhb5j6cpeitCbvLO7n7J4IxnbM6hoF6O1g4HQ5TfhvvKN8ywDM7668ZhMHRqVmxqhps/F6syWK2KcPxYlkw==", + "version": "4.1.1", + "resolved": "https://registry.npmjs.org/jackspeak/-/jackspeak-4.1.1.tgz", + "integrity": "sha512-zptv57P3GpL+O0I7VdMJNBZCu+BPHVQUk55Ft8/QCJjTVxrnJHuVuX/0Bl2A6/+2oyR/ZMEuFKwmzqqZ/U5nPQ==", "license": "BlueOak-1.0.0", "dependencies": { "@isaacs/cliui": "^8.0.2" @@ -15732,12 +15717,12 @@ } }, "node_modules/cdk-assets/node_modules/minimatch": { - "version": "10.0.1", - "resolved": "https://registry.npmjs.org/minimatch/-/minimatch-10.0.1.tgz", - "integrity": "sha512-ethXTt3SGGR+95gudmqJ1eNhRO7eGEGIgYA9vnPatK4/etz2MEVDno5GMCibdMTuBMyElzIlgxMna3K94XDIDQ==", + "version": "10.0.3", + "resolved": "https://registry.npmjs.org/minimatch/-/minimatch-10.0.3.tgz", + "integrity": "sha512-IPZ167aShDZZUMdRk66cyQAW3qr0WzbHkPdMYa8bzZhlHhO3jALbKdxcaak7W9FfT2rZNpQuUu4Od7ILEpXSaw==", "license": "ISC", "dependencies": { - "brace-expansion": "^2.0.1" + "@isaacs/brace-expansion": "^5.0.0" }, "engines": { "node": "20 || >=22" @@ -17388,11 +17373,12 @@ } }, "node_modules/foreground-child": { - "version": "3.1.1", - "resolved": "https://registry.npmjs.org/foreground-child/-/foreground-child-3.1.1.tgz", - "integrity": 
"sha512-TMKDUnIte6bfb5nWv7V/caI169OHgvwjb7V4WkeUvbQQdjr5rWKqHFiKWb/fcOwB+CzBT+qbWjvj+DVwRskpIg==", + "version": "3.3.1", + "resolved": "https://registry.npmjs.org/foreground-child/-/foreground-child-3.3.1.tgz", + "integrity": "sha512-gIXjKqtFuWEgzFRJA9WCQeSJLZDjgJUOMCMzxtvFq/37KojM1BFGufqsCy0r4qSQmYLsZYMeyRqzIWOMup03sw==", + "license": "ISC", "dependencies": { - "cross-spawn": "^7.0.0", + "cross-spawn": "^7.0.6", "signal-exit": "^4.0.1" }, "engines": { @@ -19640,9 +19626,8 @@ "version": "5.3.2", "resolved": "https://registry.npmjs.org/long/-/long-5.3.2.tgz", "integrity": "sha512-mNAgZ1GmyNhD7AuqnTG3/VQ26o760+ZYBPKjPvugO8+nLbYfX6TVpJPseBvopbdY+qpZ/lKUnmEc1LeZYS3QAA==", - "license": "Apache-2.0", - "optional": true, - "peer": true + "devOptional": true, + "license": "Apache-2.0" }, "node_modules/loupe": { "version": "3.1.3", @@ -19854,9 +19839,9 @@ } }, "node_modules/markdownlint-cli2/node_modules/ignore": { - "version": "7.0.4", - "resolved": "https://registry.npmjs.org/ignore/-/ignore-7.0.4.tgz", - "integrity": "sha512-gJzzk+PQNznz8ysRrC0aOkBNVRBDtE1n53IqyqEf3PXrYwomFs5q4pGMizBMJF+ykh03insJ27hB8gSrD2Hn8A==", + "version": "7.0.5", + "resolved": "https://registry.npmjs.org/ignore/-/ignore-7.0.5.tgz", + "integrity": "sha512-Hs59xBNfUIunMFgWAbGX5cq6893IbWg4KnrjbYwX3tx0ztorVgTDA6B2sxf8ejHJ4wz8BqGUMYlnzNBer5NvGg==", "dev": true, "license": "MIT", "engines": { @@ -22583,13 +22568,12 @@ } }, "node_modules/protobufjs": { - "version": "7.5.2", - "resolved": "https://registry.npmjs.org/protobufjs/-/protobufjs-7.5.2.tgz", - "integrity": "sha512-f2ls6rpO6G153Cy+o2XQ+Y0sARLOZ17+OGVLHrc3VUKcLHYKEKWbkSujdBWQXM7gKn5NTfp0XnRPZn1MIu8n9w==", + "version": "7.5.3", + "resolved": "https://registry.npmjs.org/protobufjs/-/protobufjs-7.5.3.tgz", + "integrity": "sha512-sildjKwVqOI2kmFDiXQ6aEB0fjYTafpEvIBs8tOR8qI4spuL9OPROLVu2qZqi/xgCfsHIwVqlaF8JBjWFHnKbw==", + "devOptional": true, "hasInstallScript": true, "license": "BSD-3-Clause", - "optional": true, - "peer": true, "dependencies": { "@protobufjs/aspromise": "^1.1.2", "@protobufjs/base64": "^1.1.2", @@ -24663,6 +24647,13 @@ "node": ">=0.8.0" } }, + "node_modules/underscore": { + "version": "1.13.7", + "resolved": "https://registry.npmjs.org/underscore/-/underscore-1.13.7.tgz", + "integrity": "sha512-GMXzWtsc57XAtguZgaQViUOzs0KTkk8ojr3/xAxXLITqf/3EMwxC0inyETfDFjH/Krbhuep0HNbbjI9i/q3F3g==", + "dev": true, + "license": "MIT" + }, "node_modules/undici-types": { "version": "7.8.0", "resolved": "https://registry.npmjs.org/undici-types/-/undici-types-7.8.0.tgz", @@ -25494,6 +25485,36 @@ "@aws-lambda-powertools/commons": "^2.21.0" } }, + "packages/kafka": { + "name": "@aws-lambda-powertools/kafka", + "version": "2.21.0", + "license": "MIT-0", + "dependencies": { + "@aws-lambda-powertools/commons": "2.21.0", + "@standard-schema/spec": "^1.0.0" + }, + "devDependencies": { + "avro-js": "^1.12.0", + "protobufjs": "^7.5.3", + "zod": "^3.25.67" + }, + "peerDependencies": { + "zod": ">=3.24.0", + "valibot": ">=1.0.0", + "arktype": ">=2.0.0" + }, + "peerDependenciesMeta": { + "zod": { + "optional": true + }, + "valibot": { + "optional": true + }, + "arktype": { + "optional": true + } + } + }, "packages/logger": { "name": "@aws-lambda-powertools/logger", "version": "2.21.0", diff --git a/package.json b/package.json index 5edd4217c6..eed4f5fc5b 100644 --- a/package.json +++ b/package.json @@ -17,7 +17,8 @@ "layers", "examples/app", "packages/event-handler", - "packages/validation" + "packages/validation", + "packages/kafka" ], "type": "module", "scripts": { diff --git 
a/packages/kafka/package.json b/packages/kafka/package.json new file mode 100644 index 0000000000..6660df63be --- /dev/null +++ b/packages/kafka/package.json @@ -0,0 +1,112 @@ +{ + "name": "@aws-lambda-powertools/kafka", + "version": "2.21.0", + "author": { + "name": "Amazon Web Services", + "url": "https://aws.amazon.com" + }, + "publishConfig": { + "access": "public" + }, + "scripts": { + "test": "vitest --run", + "test:unit": "vitest --run", + "test:unit:coverage": "vitest --run tests/unit --coverage.enabled --coverage.thresholds.100 --coverage.include='src/**'", + "test:unit:types": "echo 'Not Implemented'", + "test:e2e:nodejs18x": "echo \"Not implemented\"", + "test:e2e:nodejs20x": "echo \"Not implemented\"", + "test:e2e:nodejs22x": "echo \"Not implemented\"", + "test:e2e": "echo \"Not implemented\"", + "build:cjs": "tsc --build tsconfig.cjs.json && echo '{ \"type\": \"commonjs\" }' > lib/cjs/package.json", + "build:esm": "tsc --build tsconfig.json && echo '{ \"type\": \"module\" }' > lib/esm/package.json", + "build": "npm run build:esm & npm run build:cjs", + "lint": "biome lint .", + "lint:fix": "biome check --write .", + "prepack": "node ../../.github/scripts/release_patch_package_json.js .", + "proto:gen": "npx pbjs -t static-module -w es6 -o $(pwd)/tests/protos/product.es6.generated.js $(pwd)/tests/protos/product.proto && npx pbts -o $(pwd)/tests/protos/product.es6.generated.d.ts $(pwd)/tests/protos/product.es6.generated.js && npx pbjs -t static-module -w commonjs -o $(pwd)/tests/protos/product.cjs.generated.js $(pwd)/tests/protos/product.proto && npx pbts -o $(pwd)/tests/protos/product.cjs.generated.d.ts $(pwd)/tests/protos/product.cjs.generated.js" + }, + "license": "MIT-0", + "homepage": "https://github.com/aws-powertools/powertools-lambda-typescript#readme", + "repository": { + "type": "git", + "url": "git+https://github.com/aws-powertools/powertools-lambda-typescript.git" + }, + "bugs": { + "url": "https://github.com/aws-powertools/powertools-lambda-typescript/issues" + }, + "keywords": [], + "dependencies": { + "@aws-lambda-powertools/commons": "2.21.0", + "@standard-schema/spec": "^1.0.0" + }, + "peerDependencies": { + "zod": ">=3.24.0", + "valibot": ">=1.0.0", + "arktype": ">=2.0.0" + }, + "peerDependenciesMeta": { + "zod": { + "optional": true + }, + "valibot": { + "optional": true + }, + "arktype": { + "optional": true + } + }, + "files": [ + "lib" + ], + "type": "module", + "exports": { + ".": { + "require": { + "types": "./lib/cjs/index.d.ts", + "default": "./lib/cjs/index.js" + }, + "import": { + "types": "./lib/esm/index.d.ts", + "default": "./lib/esm/index.js" + } + }, + "./errors": { + "require": { + "types": "./lib/cjs/errors.d.ts", + "default": "./lib/cjs/errors.js" + }, + "import": { + "types": "./lib/esm/errors.d.ts", + "default": "./lib/esm/errors.js" + } + }, + "./types": { + "require": { + "types": "./lib/cjs/types/types.d.ts", + "default": "./lib/cjs/types/types.js" + }, + "import": { + "types": "./lib/esm/types/types.d.ts", + "default": "./lib/esm/types/types.js" + } + } + }, + "typesVersions": { + "*": { + "errors": [ + "lib/cjs/errors.d.ts", + "lib/esm/errors.d.ts" + ], + "types": [ + "lib/cjs/types/types.d.ts", + "lib/esm/types/types.d.ts" + ] + } + }, + "private": true, + "devDependencies": { + "avro-js": "^1.12.0", + "protobufjs": "^7.5.3", + "zod": "^3.25.67" + } +} diff --git a/packages/kafka/src/constants.ts b/packages/kafka/src/constants.ts new file mode 100644 index 0000000000..230276147b --- /dev/null +++ b/packages/kafka/src/constants.ts @@ 
-0,0 +1,10 @@
+/**
+ * Types of Kafka schema formats.
+ */
+const SchemaType = {
+  JSON: 'json',
+  AVRO: 'avro',
+  PROTOBUF: 'protobuf',
+} as const;
+
+export { SchemaType };
diff --git a/packages/kafka/src/consumer.ts b/packages/kafka/src/consumer.ts
new file mode 100644
index 0000000000..6f73b54351
--- /dev/null
+++ b/packages/kafka/src/consumer.ts
@@ -0,0 +1,227 @@
+import type { AsyncHandler } from '@aws-lambda-powertools/commons/types';
+import { isNull, isRecord } from '@aws-lambda-powertools/commons/typeutils';
+import type { StandardSchemaV1 } from '@standard-schema/spec';
+import type { Context, Handler } from 'aws-lambda';
+import {
+  KafkaConsumerAvroMissingSchemaError,
+  KafkaConsumerParserError,
+  KafkaConsumerProtobufMissingSchemaError,
+} from './errors.js';
+import type {
+  ConsumerRecord,
+  ConsumerRecords,
+  Record as KafkaRecord,
+  MSKEvent,
+  SchemaConfig,
+  SchemaConfigValue,
+} from './types/types.js';
+
+/**
+ * Type guard to assert that the event is a valid {@link MSKEvent | `MSKEvent`}.
+ *
+ * @param event - The event to validate, expected to be an MSKEvent.
+ */
+const assertIsMSKEvent = (event: unknown): event is MSKEvent => {
+  if (
+    !isRecord(event) ||
+    !isRecord(event.records) ||
+    !Object.values(event.records).every((arr) => Array.isArray(arr))
+  ) {
+    throw new Error(
+      'Event is not a valid MSKEvent. Expected an object with a "records" property.'
+    );
+  }
+
+  return true;
+};
+
+/**
+ * Deserialize Kafka message headers from an array of header objects.
+ *
+ * It returns `null` if the headers are `null`, or an array of header objects
+ * where each header value is decoded as a UTF-8 string.
+ *
+ * @param headers - An array of header objects, where each object maps header keys (string)
+ * to header values (`number[]`), representing the raw bytes of each header value -
+ * i.e. `[{ "headerKey": [104, 101, 108, 108, 111] }]`
+ */
+const deserializeHeaders = (headers: Record<string, number[]>[] | null) => {
+  if (headers === null) {
+    return null;
+  }
+  const result = [];
+  for (const header of headers) {
+    const entries = [];
+    for (const [headerKey, headerValue] of Object.entries(header)) {
+      entries.push([headerKey, Buffer.from(headerValue).toString('utf-8')]);
+    }
+    result.push(Object.fromEntries(entries));
+  }
+  return result;
+};
+
+/**
+ * Deserialize a base64-encoded value using the provided schema configuration.
+ *
+ * It returns the deserialized value, which may be a string, object, or other type depending on the schema type.
+ *
+ * @param value - The base64-encoded string to deserialize.
+ * @param config - The schema configuration to use for deserialization. See {@link SchemaConfigValue | `SchemaConfigValue`}.
+ * If not provided, the value is decoded as a UTF-8 string.
+ */
+const deserialize = async (value: string, config?: SchemaConfigValue) => {
+  // no config -> default to base64 decoding
+  if (config === undefined) {
+    return Buffer.from(value, 'base64').toString();
+  }
+
+  // if config is provided, we expect it to have a specific type
+  if (!['json', 'avro', 'protobuf'].includes(config.type)) {
+    throw new Error(
+      `Unsupported deserialization type: ${config.type}. Supported types are: json, avro, protobuf.`
+    );
+  }
+
+  if (config.type === 'json') {
+    const deserializer = await import('./deserializer/json.js');
+    return deserializer.deserialize(value);
+  }
+
+  if (config.type === 'avro') {
+    if (!config.schema) {
+      throw new KafkaConsumerAvroMissingSchemaError(
+        'Schema string is required for Avro deserialization'
+      );
+    }
+    const deserializer = await import('./deserializer/avro.js');
+    return deserializer.deserialize(value, config.schema);
+  }
+  if (config.type === 'protobuf') {
+    if (!config.schema) {
+      throw new KafkaConsumerProtobufMissingSchemaError(
+        'Schema string is required for Protobuf deserialization'
+      );
+    }
+    const deserializer = await import('./deserializer/protobuf.js');
+    return deserializer.deserialize(value, config.schema);
+  }
+};
+
+/**
+ * Deserialize the key of a Kafka record.
+ *
+ * If the key is `undefined` or an empty string, it returns `undefined`.
+ *
+ * @param key - The base64-encoded key to deserialize.
+ * @param config - The schema configuration for deserializing the key. See {@link SchemaConfigValue | `SchemaConfigValue`}.
+ */
+const deserializeKey = async (key?: string, config?: SchemaConfigValue) => {
+  if (key === undefined || key === '') {
+    return undefined;
+  }
+  if (isNull(key)) return null;
+  return await deserialize(key, config);
+};
+
+/**
+ * Validate a deserialized value against a Standard Schema-compatible schema (e.g. Zod).
+ *
+ * @param value - The deserialized value to validate.
+ * @param schema - The Standard Schema-compatible schema to validate against.
+ */
+const parseSchema = async (value: unknown, schema: StandardSchemaV1) => {
+  let result = schema['~standard'].validate(value);
+  /* v8 ignore start */
+  if (result instanceof Promise) result = await result;
+  /* v8 ignore stop */
+  if (result.issues) {
+    throw new KafkaConsumerParserError(
+      `Schema validation failed: ${JSON.stringify(result.issues)}`
+    );
+  }
+  return result.value;
+};
+
+/**
+ * Deserialize a single record from an MSK event.
+ *
+ * @param record - A single record from the MSK event.
+ * @param config - The schema configuration for deserializing the record's key and value.
+ */
+const deserializeRecord = async (record: KafkaRecord, config: SchemaConfig) => {
+  const { key, value, headers, ...rest } = record;
+  const { key: keyConfig, value: valueConfig } = config;
+
+  const deserializedKey = await deserializeKey(key, keyConfig);
+  const deserializedValue = await deserialize(value, valueConfig);
+
+  return {
+    ...rest,
+    key: keyConfig?.parserSchema
+      ? await parseSchema(deserializedKey, keyConfig.parserSchema)
+      : deserializedKey,
+    value: valueConfig?.parserSchema
+      ? await parseSchema(deserializedValue, valueConfig.parserSchema)
+      : deserializedValue,
+    originalKey: key,
+    originalValue: value,
+    headers: deserializeHeaders(headers),
+    originalHeaders: headers,
+  };
+};
+
+/**
+ * Wrap a handler function to automatically deserialize and validate Kafka records from an MSK event.
+ *
+ * The returned function will:
+ * - Deserialize the key and value of each record using the provided schema config.
+ * - Validate the deserialized key and value using the provided Standard Schema-compatible schemas (e.g. Zod), when present.
+ * - Replace the `records` property in the event with an array of deserialized and validated records.
+ * - Call the original handler with the modified event and original context/arguments.
+ *
+ * @example
+ * ```ts
+ * import { kafkaConsumer } from '@aws-lambda-powertools/kafka';
+ * import { z } from 'zod';
+ *
+ * const keySchema = z.string();
+ * const valueSchema = z.object({
+ *   id: z.number(),
+ * });
+ *
+ * export const handler = kafkaConsumer<z.infer<typeof keySchema>, z.infer<typeof valueSchema>>(async (event, context) => {
+ *   // event.records is now an array of deserialized and validated records
+ *   for (const record of event.records) {
+ *     console.log(record.key, record.value);
+ *   }
+ * }, {
+ *   key: { type: 'json', parserSchema: keySchema },
+ *   value: { type: 'json', parserSchema: valueSchema },
+ * });
+ * ```
+ *
+ * @typeParam K - Optional type of the deserialized key - defaults to `unknown`.
+ * @typeParam V - Optional type of the deserialized value - defaults to `unknown`.
+ *
+ * @param handler - The original handler function to wrap. It should accept the deserialized event as its first argument.
+ * @param config - The schema configuration for deserializing and validating record keys and values.
+ */
+const kafkaConsumer = <K = unknown, V = unknown>(
+  handler: AsyncHandler<Handler<ConsumerRecords<K, V>>>,
+  config: SchemaConfig
+): ((event: MSKEvent, context: Context) => Promise<unknown>) => {
+  return async (event: MSKEvent, context: Context): Promise<unknown> => {
+    assertIsMSKEvent(event);
+
+    const consumerRecords: ConsumerRecord<K, V>[] = [];
+    for (const recordsArray of Object.values(event.records)) {
+      for (const record of recordsArray) {
+        consumerRecords.push(await deserializeRecord(record, config));
+      }
+    }
+
+    return handler(
+      {
+        ...event,
+        records: consumerRecords,
+      },
+      context
+    );
+  };
+};
+
+export { kafkaConsumer };
diff --git a/packages/kafka/src/deserializer/avro.ts b/packages/kafka/src/deserializer/avro.ts
new file mode 100644
index 0000000000..91d8721464
--- /dev/null
+++ b/packages/kafka/src/deserializer/avro.ts
@@ -0,0 +1,20 @@
+import avro from 'avro-js';
+import { KafkaConsumerDeserializationError } from '../errors.js';
+
+/**
+ * Deserialize an Avro message from a base64-encoded string using the provided Avro schema.
+ *
+ * @param data - The base64-encoded string representing the Avro binary data.
+ * @param schema - The Avro schema as a JSON string.
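+ *
+ * The following sketch is illustrative only: the inline schema is an assumed
+ * `Product` record (id: int, name: string, price: double), and the payload is
+ * the first record value from `tests/events/avro.json` in this package.
+ *
+ * @example
+ * ```ts
+ * // internal module, shown here with a same-directory relative import
+ * import { deserialize } from './avro.js';
+ *
+ * const schema = JSON.stringify({
+ *   type: 'record',
+ *   name: 'Product',
+ *   fields: [
+ *     { name: 'id', type: 'int' },
+ *     { name: 'name', type: 'string' },
+ *     { name: 'price', type: 'double' },
+ *   ],
+ * });
+ *
+ * // decodes the base64-encoded Avro binary into a plain object
+ * const product = await deserialize('0g8MTGFwdG9wUrgehes/j0A=', schema);
+ * ```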
+ */
+export const deserialize = async (data: string, schema: string) => {
+  try {
+    const type = avro.parse(schema);
+    const buffer = Buffer.from(data, 'base64');
+    return type.fromBuffer(buffer);
+  } catch (error) {
+    throw new KafkaConsumerDeserializationError(
+      `Failed to deserialize Avro message: ${error}, message: ${data}, schema: ${schema}`
+    );
+  }
+};
diff --git a/packages/kafka/src/deserializer/json.ts b/packages/kafka/src/deserializer/json.ts
new file mode 100644
index 0000000000..30e937bff1
--- /dev/null
+++ b/packages/kafka/src/deserializer/json.ts
@@ -0,0 +1,19 @@
+/**
+ * Deserialize a base64-encoded string into either a JSON object or a plain string.
+ *
+ * @param data - The base64-encoded string to deserialize.
+ * @returns The deserialized data as either a JSON object or a string.
+ */
+export const deserialize = async (data: string) => {
+  // Decode the base64 string to a buffer
+  const decoded = Buffer.from(data, 'base64');
+  try {
+    // Attempt to parse the decoded data as JSON
+    // (we assume it's JSON, but it may just as well be a plain string)
+    return JSON.parse(decoded.toString());
+  } catch (error) {
+    // If JSON parsing fails, log the error and fall back to
+    // returning the base64-decoded value as a plain string
+    console.error(`Failed to parse JSON from base64 value: ${data}`, error);
+    return decoded.toString();
+  }
+};
diff --git a/packages/kafka/src/deserializer/protobuf.ts b/packages/kafka/src/deserializer/protobuf.ts
new file mode 100644
index 0000000000..f6c323e5f8
--- /dev/null
+++ b/packages/kafka/src/deserializer/protobuf.ts
@@ -0,0 +1,26 @@
+import { KafkaConsumerDeserializationError } from '../errors.js';
+import type { ProtobufMessage } from '../types/types.js';
+
+/**
+ * Deserialize a Protobuf message from a base64-encoded string.
+ *
+ * @template T - The type of the deserialized message object.
+ * @param data - The base64-encoded string representing the Protobuf binary data.
+ * @param messageType - The Protobuf message type definition.
+ * See {@link ProtobufMessage | `ProtobufMessage`}.
+ * @returns The deserialized message object of type T.
+ * @throws {KafkaConsumerDeserializationError} If deserialization fails.
+ */
+export const deserialize = <T>(
+  data: string,
+  messageType: ProtobufMessage<T>
+): T => {
+  try {
+    const buffer = Buffer.from(data, 'base64');
+    return messageType.decode(buffer, buffer.length);
+  } catch (error) {
+    throw new KafkaConsumerDeserializationError(
+      `Failed to deserialize Protobuf message: ${error}, message: ${data}, messageType: ${messageType}`
+    );
+  }
+};
diff --git a/packages/kafka/src/errors.ts b/packages/kafka/src/errors.ts
new file mode 100644
index 0000000000..5eac573f17
--- /dev/null
+++ b/packages/kafka/src/errors.ts
@@ -0,0 +1,55 @@
+/**
+ * Base error class for Kafka consumer-related errors.
+ * All Kafka consumer errors should extend this class.
+ */
+class KafkaConsumerError extends Error {
+  constructor(message: string) {
+    super(message);
+    this.name = 'KafkaConsumerError';
+  }
+}
+
+/**
+ * Error thrown when a required Protobuf schema is missing during Kafka message consumption.
+ */
+class KafkaConsumerProtobufMissingSchemaError extends KafkaConsumerError {
+  constructor(message: string) {
+    super(message);
+    this.name = 'KafkaConsumerProtobufMissingSchemaError';
+  }
+}
+
+/**
+ * Error thrown when deserialization of a Kafka message fails.
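+ * Raised by both the Avro and Protobuf deserializers when decoding the binary payload throws.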
+ */
+class KafkaConsumerDeserializationError extends KafkaConsumerError {
+  constructor(message: string) {
+    super(message);
+    this.name = 'KafkaConsumerDeserializationError';
+  }
+}
+
+/**
+ * Error thrown when a required Avro schema is missing during Kafka message consumption.
+ */
+class KafkaConsumerAvroMissingSchemaError extends KafkaConsumerError {
+  constructor(message: string) {
+    super(message);
+    this.name = 'KafkaConsumerAvroMissingSchemaError';
+  }
+}
+
+/**
+ * Error thrown when schema validation of a deserialized key or value fails.
+ */
+class KafkaConsumerParserError extends KafkaConsumerError {
+  constructor(message: string) {
+    super(message);
+    this.name = 'KafkaConsumerParserError';
+  }
+}
+
+export {
+  KafkaConsumerError,
+  KafkaConsumerAvroMissingSchemaError,
+  KafkaConsumerDeserializationError,
+  KafkaConsumerProtobufMissingSchemaError,
+  KafkaConsumerParserError,
+};
diff --git a/packages/kafka/src/index.ts b/packages/kafka/src/index.ts
new file mode 100644
index 0000000000..30205b1aef
--- /dev/null
+++ b/packages/kafka/src/index.ts
@@ -0,0 +1,2 @@
+export { SchemaType } from './constants.js';
+export { kafkaConsumer } from './consumer.js';
diff --git a/packages/kafka/src/types/avro-js.d.ts b/packages/kafka/src/types/avro-js.d.ts
new file mode 100644
index 0000000000..ed6d21fcbb
--- /dev/null
+++ b/packages/kafka/src/types/avro-js.d.ts
@@ -0,0 +1,32 @@
+/**
+ * Minimal TypeScript declaration for the avro-js library, which does not have its own type definitions.
+ */
+declare module 'avro-js' {
+  /**
+   * Interface for the parsed Avro type
+   */
+  interface AvroType {
+    /**
+     * Deserialize an Avro message from a binary buffer
+     *
+     * @param buffer - Binary buffer containing Avro-encoded data
+     * @returns The deserialized object
+     */
+    fromBuffer(buffer: Buffer): unknown;
+  }
+
+  /**
+   * Parse an Avro schema from a JSON string
+   *
+   * @param schema - Avro schema as a JSON string
+   * @returns A parsed Avro type that can be used to serialize and deserialize data
+   */
+  function parse(schema: string): AvroType;
+
+  // Export the default object with the parse method
+  const avro: {
+    parse: typeof parse;
+  };
+
+  export default avro;
+}
diff --git a/packages/kafka/src/types/types.ts b/packages/kafka/src/types/types.ts
new file mode 100644
index 0000000000..8ea00b8cf6
--- /dev/null
+++ b/packages/kafka/src/types/types.ts
@@ -0,0 +1,216 @@
+import type { StandardSchemaV1 } from '@standard-schema/spec';
+import type { Reader } from 'protobufjs';
+import type { SchemaType as SchemaTypeMap } from '../constants.js';
+
+/**
+ * Represents a single Kafka consumer record with generic key and value types.
+ */
+type ConsumerRecord<K = unknown, V = unknown> = {
+  /**
+   * The deserialized key of the record
+   */
+  key: K | undefined;
+  /**
+   * The deserialized value of the record
+   */
+  value: V;
+  /**
+   * The original (raw, base64-encoded) key as received from Kafka, or undefined if not present
+   */
+  originalKey: string | undefined;
+  /**
+   * The original (raw, base64-encoded) value as received from Kafka
+   */
+  originalValue: string;
+  /**
+   * Optional array of headers as key-value string pairs, or null/undefined if not present
+   */
+  headers?: { [k: string]: string }[] | null;
+  /**
+   * Optional array of original record headers
+   */
+  originalHeaders?: RecordHeader[] | null;
+};
+
+/**
+ * Represents a collection of Kafka consumer records, along with MSK event metadata.
+ */
+type ConsumerRecords<K = unknown, V = unknown> = {
+  /**
+   * Array of consumer records
+   */
+  records: Array<ConsumerRecord<K, V>>;
+} & Omit<MSKEvent, 'records'>;
+
+/**
+ * Union type for supported schema types (JSON, Avro, Protobuf).
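+ *
+ * Illustrative sketch: the runtime values live on the `SchemaType` constant
+ * exported from the package root, and this type is the union of those literal values.
+ *
+ * @example
+ * ```ts
+ * import { SchemaType } from '@aws-lambda-powertools/kafka';
+ * import type { SchemaType as SchemaTypeUnion } from '@aws-lambda-powertools/kafka/types';
+ *
+ * const format: SchemaTypeUnion = SchemaType.AVRO; // 'avro'
+ * ```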
+ */
+type SchemaType = (typeof SchemaTypeMap)[keyof typeof SchemaTypeMap];
+
+/**
+ * Union type for supported schema configurations (JSON, Avro, Protobuf).
+ */
+type SchemaConfigValue = JsonConfig | AvroConfig | ProtobufConfig;
+
+/**
+ * Configuration for JSON schema validation.
+ */
+type JsonConfig = {
+  /**
+   * Indicates the schema type is JSON
+   */
+  type: typeof SchemaTypeMap.JSON;
+  /**
+   * Optional Standard Schema-compatible schema (e.g. Zod) for runtime validation
+   */
+  parserSchema?: StandardSchemaV1;
+};
+
+/**
+ * Configuration for Avro schema validation.
+ */
+type AvroConfig = {
+  /**
+   * Indicates the schema type is Avro
+   */
+  type: typeof SchemaTypeMap.AVRO;
+  /**
+   * Avro schema definition as a string
+   */
+  schema: string;
+  /**
+   * Optional Standard Schema-compatible schema (e.g. Zod) for runtime validation
+   */
+  parserSchema?: StandardSchemaV1;
+};
+
+/**
+ * Configuration for Protobuf schema validation.
+ */
+type ProtobufConfig = {
+  /**
+   * Indicates the schema type is Protobuf
+   */
+  type: typeof SchemaTypeMap.PROTOBUF;
+  /**
+   * Protobuf message type for decoding
+   */
+  schema: ProtobufMessage;
+  /**
+   * Optional Standard Schema-compatible schema (e.g. Zod) for runtime validation
+   */
+  parserSchema?: StandardSchemaV1;
+};
+
+/**
+ * Configuration for key and value schema validation.
+ */
+type SchemaConfig = {
+  /**
+   * Schema configuration for the record value.
+   */
+  value: SchemaConfigValue;
+  /**
+   * Optional schema configuration for the record key.
+   * If not provided, the key will not be validated.
+   */
+  key?: SchemaConfigValue;
+};
+
+/**
+ * Represents a Kafka record header as a mapping of header key to byte array.
+ */
+interface RecordHeader {
+  /**
+   * Header key mapped to its value as an array of bytes
+   */
+  [headerKey: string]: number[];
+}
+
+/**
+ * Represents a single Kafka record as received from MSK.
+ */
+interface Record {
+  /**
+   * Kafka topic name
+   */
+  topic: string;
+  /**
+   * Partition number within the topic
+   */
+  partition: number;
+  /**
+   * Offset of the record within the partition
+   */
+  offset: number;
+  /**
+   * Timestamp of the record
+   */
+  timestamp: number;
+  /**
+   * Type of timestamp (creation or log append time)
+   */
+  timestampType: 'CREATE_TIME' | 'LOG_APPEND_TIME';
+  /**
+   * Base64-encoded key of the record
+   */
+  key: string;
+  /**
+   * Base64-encoded value of the record
+   */
+  value: string;
+  /**
+   * Array of record headers
+   */
+  headers: RecordHeader[];
+}
+
+/**
+ * AWS Lambda event structure for MSK (Managed Streaming for Kafka).
+ * See: https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html
+ */
+interface MSKEvent {
+  /**
+   * Event source identifier (always 'aws:kafka')
+   */
+  eventSource: 'aws:kafka';
+  /**
+   * ARN of the Kafka event source
+   */
+  eventSourceArn: string;
+  /**
+   * Comma-separated list of Kafka bootstrap servers
+   */
+  bootstrapServers: string;
+  /**
+   * Mapping of topic-partition keys (e.g. `mytopic-0`) to arrays of records
+   */
+  records: {
+    [topic: string]: Record[];
+  };
+}
+
+/**
+ * Minimal interface of a Protobuf message type capable of decoding a binary payload.
+ */
+interface ProtobufMessage<T = unknown> {
+  decode(reader: Reader | Uint8Array, length?: number): T;
+}
+
+/**
+ * Shape shared by the JSON, Avro, and Protobuf deserializer modules.
+ */
+interface Deserializer {
+  deserialize(
+    input: string,
+    schema: string | ProtobufMessage
+  ): unknown;
+}
+
+export type {
+  ConsumerRecord,
+  ConsumerRecords,
+  Deserializer,
+  MSKEvent,
+  ProtobufMessage,
+  Record,
+  RecordHeader,
+  SchemaType,
+  SchemaConfig,
+  SchemaConfigValue,
+};
diff --git a/packages/kafka/tests/events/avro.json b/packages/kafka/tests/events/avro.json
new file mode 100644
index 0000000000..d495060d7c
--- /dev/null
+++ b/packages/kafka/tests/events/avro.json
@@ -0,0 +1,51 @@
+{
+  "eventSource": "aws:kafka",
+  "eventSourceArn": "arn:aws:kafka:us-east-1:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4",
+  "bootstrapServers": "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
+  "records": {
+    "mytopic-0": [
+      {
+        "topic": "mytopic",
+        "partition": 0,
+        "offset": 15,
+        "timestamp": 1545084650987,
+        "timestampType": "CREATE_TIME",
+        "key": "NDI=",
+        "value": "0g8MTGFwdG9wUrgehes/j0A=",
+        "headers": [
+          {
+            "headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]
+          }
+        ]
+      },
+      {
+        "topic": "mytopic",
+        "partition": 0,
+        "offset": 16,
+        "timestamp": 1545084650988,
+        "timestampType": "CREATE_TIME",
+        "key": "NDI=",
+        "value": "1A8UU21hcnRwaG9uZVK4HoXrv4JA",
+        "headers": [
+          {
+            "headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]
+          }
+        ]
+      },
+      {
+        "topic": "mytopic",
+        "partition": 0,
+        "offset": 17,
+        "timestamp": 1545084650989,
+        "timestampType": "CREATE_TIME",
+        "key": null,
+        "value": "1g8USGVhZHBob25lc0jhehSuv2JA",
+        "headers": [
+          {
+            "headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]
+          }
+        ]
+      }
+    ]
+  }
+}
\ No newline at end of file
diff --git a/packages/kafka/tests/events/default.json b/packages/kafka/tests/events/default.json
new file mode 100644
index 0000000000..0f9aa2a80e
--- /dev/null
+++ b/packages/kafka/tests/events/default.json
@@ -0,0 +1,50 @@
+{
+  "eventSource": "aws:kafka",
+  "eventSourceArn": "arn:aws:kafka:us-east-1:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4",
+  "bootstrapServers": "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
+  "records": {
+    "mytopic-0": [
+      {
+        "topic": "mytopic",
+        "partition": 0,
+        "offset": 15,
+        "timestamp": 1545084650987,
+        "timestampType": "CREATE_TIME",
+        "key": "cmVjb3JkS2V5",
+        "value": "ewogICJpZCI6IDEyMzQ1LAogICJuYW1lIjogInByb2R1Y3Q1IiwKICAicHJpY2UiOiA0NQp9",
+        "headers": [
+          {
+            "headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]
+          }
+        ]
+      },
+      {
+        "topic": "mytopic",
+        "partition": 0,
+        "offset": 16,
+        "timestamp": 1545084650988,
+        "timestampType": "CREATE_TIME",
+        "value": "ewogICJpZCI6IDEyMzQ1LAogICJuYW1lIjogInByb2R1Y3Q1IiwKICAicHJpY2UiOiA0NQp9",
+        "headers": [
+          {
+            "headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]
+          }
+        ]
+      },
+      {
+        "topic": "mytopic",
+        "partition": 0,
+        "offset": 17,
+
"timestamp": 1545084650989, + "timestampType": "CREATE_TIME", + "key": null, + "value": "ewogICJpZCI6IDEyMzQ1LAogICJuYW1lIjogInByb2R1Y3Q1IiwKICAicHJpY2UiOiA0NQp9", + "headers": [ + { + "headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101] + } + ] + } + ] + } +} diff --git a/packages/kafka/tests/events/protobuf.json b/packages/kafka/tests/events/protobuf.json new file mode 100644 index 0000000000..e65096d9e5 --- /dev/null +++ b/packages/kafka/tests/events/protobuf.json @@ -0,0 +1,51 @@ +{ + "eventSource": "aws:kafka", + "eventSourceArn": "arn:aws:kafka:us-east-1:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4", + "bootstrapServers": "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092", + "records": { + "mytopic-0": [ + { + "topic": "mytopic", + "partition": 0, + "offset": 15, + "timestamp": 1545084650987, + "timestampType": "CREATE_TIME", + "key": "NDI=", + "value": "COkHEgZMYXB0b3AZUrgehes/j0A=", + "headers": [ + { + "headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101] + } + ] + }, + { + "topic": "mytopic", + "partition": 0, + "offset": 16, + "timestamp": 1545084650988, + "timestampType": "CREATE_TIME", + "key": "NDI=", + "value": "COoHEgpTbWFydHBob25lGVK4HoXrv4JA", + "headers": [ + { + "headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101] + } + ] + }, + { + "topic": "mytopic", + "partition": 0, + "offset": 17, + "timestamp": 1545084650989, + "timestampType": "CREATE_TIME", + "key": null, + "value": "COsHEgpIZWFkcGhvbmVzGUjhehSuv2JA", + "headers": [ + { + "headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101] + } + ] + } + ] + } +} \ No newline at end of file diff --git a/packages/kafka/tests/protos/product.cjs.generated.d.ts b/packages/kafka/tests/protos/product.cjs.generated.d.ts new file mode 100644 index 0000000000..ba2ec57ae6 --- /dev/null +++ b/packages/kafka/tests/protos/product.cjs.generated.d.ts @@ -0,0 +1,110 @@ +import * as $protobuf from "protobufjs"; +import Long = require("long"); +/** Properties of a Product. */ +export interface IProduct { + + /** Product id */ + id?: (number|null); + + /** Product name */ + name?: (string|null); + + /** Product price */ + price?: (number|null); +} + +/** Represents a Product. */ +export class Product implements IProduct { + + /** + * Constructs a new Product. + * @param [properties] Properties to set + */ + constructor(properties?: IProduct); + + /** Product id. */ + public id: number; + + /** Product name. */ + public name: string; + + /** Product price. */ + public price: number; + + /** + * Creates a new Product instance using the specified properties. + * @param [properties] Properties to set + * @returns Product instance + */ + public static create(properties?: IProduct): Product; + + /** + * Encodes the specified Product message. Does not implicitly {@link Product.verify|verify} messages. + * @param message Product message or plain object to encode + * @param [writer] Writer to encode to + * @returns Writer + */ + public static encode(message: IProduct, writer?: $protobuf.Writer): $protobuf.Writer; + + /** + * Encodes the specified Product message, length delimited. Does not implicitly {@link Product.verify|verify} messages. 
+ * @param message Product message or plain object to encode + * @param [writer] Writer to encode to + * @returns Writer + */ + public static encodeDelimited(message: IProduct, writer?: $protobuf.Writer): $protobuf.Writer; + + /** + * Decodes a Product message from the specified reader or buffer. + * @param reader Reader or buffer to decode from + * @param [length] Message length if known beforehand + * @returns Product + * @throws {Error} If the payload is not a reader or valid buffer + * @throws {$protobuf.util.ProtocolError} If required fields are missing + */ + public static decode(reader: ($protobuf.Reader|Uint8Array), length?: number): Product; + + /** + * Decodes a Product message from the specified reader or buffer, length delimited. + * @param reader Reader or buffer to decode from + * @returns Product + * @throws {Error} If the payload is not a reader or valid buffer + * @throws {$protobuf.util.ProtocolError} If required fields are missing + */ + public static decodeDelimited(reader: ($protobuf.Reader|Uint8Array)): Product; + + /** + * Verifies a Product message. + * @param message Plain object to verify + * @returns `null` if valid, otherwise the reason why it is not + */ + public static verify(message: { [k: string]: any }): (string|null); + + /** + * Creates a Product message from a plain object. Also converts values to their respective internal types. + * @param object Plain object + * @returns Product + */ + public static fromObject(object: { [k: string]: any }): Product; + + /** + * Creates a plain object from a Product message. Also converts values to other types if specified. + * @param message Product + * @param [options] Conversion options + * @returns Plain object + */ + public static toObject(message: Product, options?: $protobuf.IConversionOptions): { [k: string]: any }; + + /** + * Converts this Product to JSON. + * @returns JSON object + */ + public toJSON(): { [k: string]: any }; + + /** + * Gets the default type url for Product + * @param [typeUrlPrefix] your custom typeUrlPrefix(default "type.googleapis.com") + * @returns The default type url + */ + public static getTypeUrl(typeUrlPrefix?: string): string; +} diff --git a/packages/kafka/tests/protos/product.cjs.generated.js b/packages/kafka/tests/protos/product.cjs.generated.js new file mode 100644 index 0000000000..7ff5fb2c91 --- /dev/null +++ b/packages/kafka/tests/protos/product.cjs.generated.js @@ -0,0 +1,264 @@ +/*eslint-disable block-scoped-var, id-length, no-control-regex, no-magic-numbers, no-prototype-builtins, no-redeclare, no-shadow, no-var, sort-vars*/ +"use strict"; + +var $protobuf = require("protobufjs/minimal"); + +// Common aliases +var $Reader = $protobuf.Reader, $Writer = $protobuf.Writer, $util = $protobuf.util; + +// Exported root namespace +var $root = $protobuf.roots["default"] || ($protobuf.roots["default"] = {}); + +$root.Product = (function() { + + /** + * Properties of a Product. + * @exports IProduct + * @interface IProduct + * @property {number|null} [id] Product id + * @property {string|null} [name] Product name + * @property {number|null} [price] Product price + */ + + /** + * Constructs a new Product. + * @exports Product + * @classdesc Represents a Product. + * @implements IProduct + * @constructor + * @param {IProduct=} [properties] Properties to set + */ + function Product(properties) { + if (properties) + for (var keys = Object.keys(properties), i = 0; i < keys.length; ++i) + if (properties[keys[i]] != null) + this[keys[i]] = properties[keys[i]]; + } + + /** + * Product id. 
+ * @member {number} id + * @memberof Product + * @instance + */ + Product.prototype.id = 0; + + /** + * Product name. + * @member {string} name + * @memberof Product + * @instance + */ + Product.prototype.name = ""; + + /** + * Product price. + * @member {number} price + * @memberof Product + * @instance + */ + Product.prototype.price = 0; + + /** + * Creates a new Product instance using the specified properties. + * @function create + * @memberof Product + * @static + * @param {IProduct=} [properties] Properties to set + * @returns {Product} Product instance + */ + Product.create = function create(properties) { + return new Product(properties); + }; + + /** + * Encodes the specified Product message. Does not implicitly {@link Product.verify|verify} messages. + * @function encode + * @memberof Product + * @static + * @param {IProduct} message Product message or plain object to encode + * @param {$protobuf.Writer} [writer] Writer to encode to + * @returns {$protobuf.Writer} Writer + */ + Product.encode = function encode(message, writer) { + if (!writer) + writer = $Writer.create(); + if (message.id != null && Object.hasOwnProperty.call(message, "id")) + writer.uint32(/* id 1, wireType 0 =*/8).int32(message.id); + if (message.name != null && Object.hasOwnProperty.call(message, "name")) + writer.uint32(/* id 2, wireType 2 =*/18).string(message.name); + if (message.price != null && Object.hasOwnProperty.call(message, "price")) + writer.uint32(/* id 3, wireType 1 =*/25).double(message.price); + return writer; + }; + + /** + * Encodes the specified Product message, length delimited. Does not implicitly {@link Product.verify|verify} messages. + * @function encodeDelimited + * @memberof Product + * @static + * @param {IProduct} message Product message or plain object to encode + * @param {$protobuf.Writer} [writer] Writer to encode to + * @returns {$protobuf.Writer} Writer + */ + Product.encodeDelimited = function encodeDelimited(message, writer) { + return this.encode(message, writer).ldelim(); + }; + + /** + * Decodes a Product message from the specified reader or buffer. + * @function decode + * @memberof Product + * @static + * @param {$protobuf.Reader|Uint8Array} reader Reader or buffer to decode from + * @param {number} [length] Message length if known beforehand + * @returns {Product} Product + * @throws {Error} If the payload is not a reader or valid buffer + * @throws {$protobuf.util.ProtocolError} If required fields are missing + */ + Product.decode = function decode(reader, length, error) { + if (!(reader instanceof $Reader)) + reader = $Reader.create(reader); + var end = length === undefined ? reader.len : reader.pos + length, message = new $root.Product(); + while (reader.pos < end) { + var tag = reader.uint32(); + if (tag === error) + break; + switch (tag >>> 3) { + case 1: { + message.id = reader.int32(); + break; + } + case 2: { + message.name = reader.string(); + break; + } + case 3: { + message.price = reader.double(); + break; + } + default: + reader.skipType(tag & 7); + break; + } + } + return message; + }; + + /** + * Decodes a Product message from the specified reader or buffer, length delimited. 
+     * @function decodeDelimited
+     * @memberof Product
+     * @static
+     * @param {$protobuf.Reader|Uint8Array} reader Reader or buffer to decode from
+     * @returns {Product} Product
+     * @throws {Error} If the payload is not a reader or valid buffer
+     * @throws {$protobuf.util.ProtocolError} If required fields are missing
+     */
+    Product.decodeDelimited = function decodeDelimited(reader) {
+        if (!(reader instanceof $Reader))
+            reader = new $Reader(reader);
+        return this.decode(reader, reader.uint32());
+    };
+
+    /**
+     * Verifies a Product message.
+     * @function verify
+     * @memberof Product
+     * @static
+     * @param {Object.<string,*>} message Plain object to verify
+     * @returns {string|null} `null` if valid, otherwise the reason why it is not
+     */
+    Product.verify = function verify(message) {
+        if (typeof message !== "object" || message === null)
+            return "object expected";
+        if (message.id != null && message.hasOwnProperty("id"))
+            if (!$util.isInteger(message.id))
+                return "id: integer expected";
+        if (message.name != null && message.hasOwnProperty("name"))
+            if (!$util.isString(message.name))
+                return "name: string expected";
+        if (message.price != null && message.hasOwnProperty("price"))
+            if (typeof message.price !== "number")
+                return "price: number expected";
+        return null;
+    };
+
+    /**
+     * Creates a Product message from a plain object. Also converts values to their respective internal types.
+     * @function fromObject
+     * @memberof Product
+     * @static
+     * @param {Object.<string,*>} object Plain object
+     * @returns {Product} Product
+     */
+    Product.fromObject = function fromObject(object) {
+        if (object instanceof $root.Product)
+            return object;
+        var message = new $root.Product();
+        if (object.id != null)
+            message.id = object.id | 0;
+        if (object.name != null)
+            message.name = String(object.name);
+        if (object.price != null)
+            message.price = Number(object.price);
+        return message;
+    };
+
+    /**
+     * Creates a plain object from a Product message. Also converts values to other types if specified.
+     * @function toObject
+     * @memberof Product
+     * @static
+     * @param {Product} message Product
+     * @param {$protobuf.IConversionOptions} [options] Conversion options
+     * @returns {Object.<string,*>} Plain object
+     */
+    Product.toObject = function toObject(message, options) {
+        if (!options)
+            options = {};
+        var object = {};
+        if (options.defaults) {
+            object.id = 0;
+            object.name = "";
+            object.price = 0;
+        }
+        if (message.id != null && message.hasOwnProperty("id"))
+            object.id = message.id;
+        if (message.name != null && message.hasOwnProperty("name"))
+            object.name = message.name;
+        if (message.price != null && message.hasOwnProperty("price"))
+            object.price = options.json && !isFinite(message.price) ? String(message.price) : message.price;
+        return object;
+    };
+
+    /**
+     * Converts this Product to JSON.
+     * @function toJSON
+     * @memberof Product
+     * @instance
+     * @returns {Object.<string,*>} JSON object
+     */
+    Product.prototype.toJSON = function toJSON() {
+        return this.constructor.toObject(this, $protobuf.util.toJSONOptions);
+    };
+
+    /**
+     * Gets the default type url for Product
+     * @function getTypeUrl
+     * @memberof Product
+     * @static
+     * @param {string} [typeUrlPrefix] your custom typeUrlPrefix(default "type.googleapis.com")
+     * @returns {string} The default type url
+     */
+    Product.getTypeUrl = function getTypeUrl(typeUrlPrefix) {
+        if (typeUrlPrefix === undefined) {
+            typeUrlPrefix = "type.googleapis.com";
+        }
+        return typeUrlPrefix + "/Product";
+    };
+
+    return Product;
+})();
+
+module.exports = $root;
diff --git a/packages/kafka/tests/protos/product.es6.generated.d.ts b/packages/kafka/tests/protos/product.es6.generated.d.ts
new file mode 100644
index 0000000000..ba2ec57ae6
--- /dev/null
+++ b/packages/kafka/tests/protos/product.es6.generated.d.ts
@@ -0,0 +1,110 @@
+import * as $protobuf from "protobufjs";
+import Long = require("long");
+/** Properties of a Product. */
+export interface IProduct {
+
+    /** Product id */
+    id?: (number|null);
+
+    /** Product name */
+    name?: (string|null);
+
+    /** Product price */
+    price?: (number|null);
+}
+
+/** Represents a Product. */
+export class Product implements IProduct {
+
+    /**
+     * Constructs a new Product.
+     * @param [properties] Properties to set
+     */
+    constructor(properties?: IProduct);
+
+    /** Product id. */
+    public id: number;
+
+    /** Product name. */
+    public name: string;
+
+    /** Product price. */
+    public price: number;
+
+    /**
+     * Creates a new Product instance using the specified properties.
+     * @param [properties] Properties to set
+     * @returns Product instance
+     */
+    public static create(properties?: IProduct): Product;
+
+    /**
+     * Encodes the specified Product message. Does not implicitly {@link Product.verify|verify} messages.
+     * @param message Product message or plain object to encode
+     * @param [writer] Writer to encode to
+     * @returns Writer
+     */
+    public static encode(message: IProduct, writer?: $protobuf.Writer): $protobuf.Writer;
+
+    /**
+     * Encodes the specified Product message, length delimited. Does not implicitly {@link Product.verify|verify} messages.
+     * @param message Product message or plain object to encode
+     * @param [writer] Writer to encode to
+     * @returns Writer
+     */
+    public static encodeDelimited(message: IProduct, writer?: $protobuf.Writer): $protobuf.Writer;
+
+    /**
+     * Decodes a Product message from the specified reader or buffer.
+     * @param reader Reader or buffer to decode from
+     * @param [length] Message length if known beforehand
+     * @returns Product
+     * @throws {Error} If the payload is not a reader or valid buffer
+     * @throws {$protobuf.util.ProtocolError} If required fields are missing
+     */
+    public static decode(reader: ($protobuf.Reader|Uint8Array), length?: number): Product;
+
+    /**
+     * Decodes a Product message from the specified reader or buffer, length delimited.
+     * @param reader Reader or buffer to decode from
+     * @returns Product
+     * @throws {Error} If the payload is not a reader or valid buffer
+     * @throws {$protobuf.util.ProtocolError} If required fields are missing
+     */
+    public static decodeDelimited(reader: ($protobuf.Reader|Uint8Array)): Product;
+
+    /**
+     * Verifies a Product message.
+ * @param message Plain object to verify + * @returns `null` if valid, otherwise the reason why it is not + */ + public static verify(message: { [k: string]: any }): (string|null); + + /** + * Creates a Product message from a plain object. Also converts values to their respective internal types. + * @param object Plain object + * @returns Product + */ + public static fromObject(object: { [k: string]: any }): Product; + + /** + * Creates a plain object from a Product message. Also converts values to other types if specified. + * @param message Product + * @param [options] Conversion options + * @returns Plain object + */ + public static toObject(message: Product, options?: $protobuf.IConversionOptions): { [k: string]: any }; + + /** + * Converts this Product to JSON. + * @returns JSON object + */ + public toJSON(): { [k: string]: any }; + + /** + * Gets the default type url for Product + * @param [typeUrlPrefix] your custom typeUrlPrefix(default "type.googleapis.com") + * @returns The default type url + */ + public static getTypeUrl(typeUrlPrefix?: string): string; +} diff --git a/packages/kafka/tests/protos/product.es6.generated.js b/packages/kafka/tests/protos/product.es6.generated.js new file mode 100644 index 0000000000..c1127bc488 --- /dev/null +++ b/packages/kafka/tests/protos/product.es6.generated.js @@ -0,0 +1,262 @@ +/*eslint-disable block-scoped-var, id-length, no-control-regex, no-magic-numbers, no-prototype-builtins, no-redeclare, no-shadow, no-var, sort-vars*/ +import * as $protobuf from "protobufjs/minimal"; + +// Common aliases +const $Reader = $protobuf.Reader, $Writer = $protobuf.Writer, $util = $protobuf.util; + +// Exported root namespace +const $root = $protobuf.roots["default"] || ($protobuf.roots["default"] = {}); + +export const Product = $root.Product = (() => { + + /** + * Properties of a Product. + * @exports IProduct + * @interface IProduct + * @property {number|null} [id] Product id + * @property {string|null} [name] Product name + * @property {number|null} [price] Product price + */ + + /** + * Constructs a new Product. + * @exports Product + * @classdesc Represents a Product. + * @implements IProduct + * @constructor + * @param {IProduct=} [properties] Properties to set + */ + function Product(properties) { + if (properties) + for (let keys = Object.keys(properties), i = 0; i < keys.length; ++i) + if (properties[keys[i]] != null) + this[keys[i]] = properties[keys[i]]; + } + + /** + * Product id. + * @member {number} id + * @memberof Product + * @instance + */ + Product.prototype.id = 0; + + /** + * Product name. + * @member {string} name + * @memberof Product + * @instance + */ + Product.prototype.name = ""; + + /** + * Product price. + * @member {number} price + * @memberof Product + * @instance + */ + Product.prototype.price = 0; + + /** + * Creates a new Product instance using the specified properties. + * @function create + * @memberof Product + * @static + * @param {IProduct=} [properties] Properties to set + * @returns {Product} Product instance + */ + Product.create = function create(properties) { + return new Product(properties); + }; + + /** + * Encodes the specified Product message. Does not implicitly {@link Product.verify|verify} messages. 
+ * @function encode + * @memberof Product + * @static + * @param {IProduct} message Product message or plain object to encode + * @param {$protobuf.Writer} [writer] Writer to encode to + * @returns {$protobuf.Writer} Writer + */ + Product.encode = function encode(message, writer) { + if (!writer) + writer = $Writer.create(); + if (message.id != null && Object.hasOwnProperty.call(message, "id")) + writer.uint32(/* id 1, wireType 0 =*/8).int32(message.id); + if (message.name != null && Object.hasOwnProperty.call(message, "name")) + writer.uint32(/* id 2, wireType 2 =*/18).string(message.name); + if (message.price != null && Object.hasOwnProperty.call(message, "price")) + writer.uint32(/* id 3, wireType 1 =*/25).double(message.price); + return writer; + }; + + /** + * Encodes the specified Product message, length delimited. Does not implicitly {@link Product.verify|verify} messages. + * @function encodeDelimited + * @memberof Product + * @static + * @param {IProduct} message Product message or plain object to encode + * @param {$protobuf.Writer} [writer] Writer to encode to + * @returns {$protobuf.Writer} Writer + */ + Product.encodeDelimited = function encodeDelimited(message, writer) { + return this.encode(message, writer).ldelim(); + }; + + /** + * Decodes a Product message from the specified reader or buffer. + * @function decode + * @memberof Product + * @static + * @param {$protobuf.Reader|Uint8Array} reader Reader or buffer to decode from + * @param {number} [length] Message length if known beforehand + * @returns {Product} Product + * @throws {Error} If the payload is not a reader or valid buffer + * @throws {$protobuf.util.ProtocolError} If required fields are missing + */ + Product.decode = function decode(reader, length, error) { + if (!(reader instanceof $Reader)) + reader = $Reader.create(reader); + let end = length === undefined ? reader.len : reader.pos + length, message = new $root.Product(); + while (reader.pos < end) { + let tag = reader.uint32(); + if (tag === error) + break; + switch (tag >>> 3) { + case 1: { + message.id = reader.int32(); + break; + } + case 2: { + message.name = reader.string(); + break; + } + case 3: { + message.price = reader.double(); + break; + } + default: + reader.skipType(tag & 7); + break; + } + } + return message; + }; + + /** + * Decodes a Product message from the specified reader or buffer, length delimited. + * @function decodeDelimited + * @memberof Product + * @static + * @param {$protobuf.Reader|Uint8Array} reader Reader or buffer to decode from + * @returns {Product} Product + * @throws {Error} If the payload is not a reader or valid buffer + * @throws {$protobuf.util.ProtocolError} If required fields are missing + */ + Product.decodeDelimited = function decodeDelimited(reader) { + if (!(reader instanceof $Reader)) + reader = new $Reader(reader); + return this.decode(reader, reader.uint32()); + }; + + /** + * Verifies a Product message. 
+     * @function verify
+     * @memberof Product
+     * @static
+     * @param {Object.<string,*>} message Plain object to verify
+     * @returns {string|null} `null` if valid, otherwise the reason why it is not
+     */
+    Product.verify = function verify(message) {
+        if (typeof message !== "object" || message === null)
+            return "object expected";
+        if (message.id != null && message.hasOwnProperty("id"))
+            if (!$util.isInteger(message.id))
+                return "id: integer expected";
+        if (message.name != null && message.hasOwnProperty("name"))
+            if (!$util.isString(message.name))
+                return "name: string expected";
+        if (message.price != null && message.hasOwnProperty("price"))
+            if (typeof message.price !== "number")
+                return "price: number expected";
+        return null;
+    };
+
+    /**
+     * Creates a Product message from a plain object. Also converts values to their respective internal types.
+     * @function fromObject
+     * @memberof Product
+     * @static
+     * @param {Object.<string,*>} object Plain object
+     * @returns {Product} Product
+     */
+    Product.fromObject = function fromObject(object) {
+        if (object instanceof $root.Product)
+            return object;
+        let message = new $root.Product();
+        if (object.id != null)
+            message.id = object.id | 0;
+        if (object.name != null)
+            message.name = String(object.name);
+        if (object.price != null)
+            message.price = Number(object.price);
+        return message;
+    };
+
+    /**
+     * Creates a plain object from a Product message. Also converts values to other types if specified.
+     * @function toObject
+     * @memberof Product
+     * @static
+     * @param {Product} message Product
+     * @param {$protobuf.IConversionOptions} [options] Conversion options
+     * @returns {Object.<string,*>} Plain object
+     */
+    Product.toObject = function toObject(message, options) {
+        if (!options)
+            options = {};
+        let object = {};
+        if (options.defaults) {
+            object.id = 0;
+            object.name = "";
+            object.price = 0;
+        }
+        if (message.id != null && message.hasOwnProperty("id"))
+            object.id = message.id;
+        if (message.name != null && message.hasOwnProperty("name"))
+            object.name = message.name;
+        if (message.price != null && message.hasOwnProperty("price"))
+            object.price = options.json && !isFinite(message.price) ? String(message.price) : message.price;
+        return object;
+    };
+
+    /**
+     * Converts this Product to JSON.
+     * @function toJSON
+     * @memberof Product
+     * @instance
+     * @returns {Object.<string,*>} JSON object
+     */
+    Product.prototype.toJSON = function toJSON() {
+        return this.constructor.toObject(this, $protobuf.util.toJSONOptions);
+    };
+
+    /**
+     * Gets the default type url for Product
+     * @function getTypeUrl
+     * @memberof Product
+     * @static
+     * @param {string} [typeUrlPrefix] your custom typeUrlPrefix(default "type.googleapis.com")
+     * @returns {string} The default type url
+     */
+    Product.getTypeUrl = function getTypeUrl(typeUrlPrefix) {
+        if (typeUrlPrefix === undefined) {
+            typeUrlPrefix = "type.googleapis.com";
+        }
+        return typeUrlPrefix + "/Product";
+    };
+
+    return Product;
+})();
+
+export { $root as default };
diff --git a/packages/kafka/tests/protos/product.proto b/packages/kafka/tests/protos/product.proto
new file mode 100644
index 0000000000..b9f815a118
--- /dev/null
+++ b/packages/kafka/tests/protos/product.proto
@@ -0,0 +1,7 @@
+syntax = "proto3";
+
+message Product {
+  int32 id = 1;
+  string name = 2;
+  double price = 3;
+}
\ No newline at end of file
diff --git a/packages/kafka/tests/tsconfig.json b/packages/kafka/tests/tsconfig.json
new file mode 100644
index 0000000000..6b38e082b1
--- /dev/null
+++ b/packages/kafka/tests/tsconfig.json
@@ -0,0 +1,13 @@
+{
+  "extends": "../tsconfig.json",
+  "compilerOptions": {
+    "rootDir": "../../",
+    "noEmit": true,
+    "resolveJsonModule": true
+  },
+  "include": [
+    "../../testing/src/setupEnv.ts",
+    "../src/**/*",
+    "./**/*"
+  ]
+}
\ No newline at end of file
diff --git a/packages/kafka/tests/unit/consumer.test.ts b/packages/kafka/tests/unit/consumer.test.ts
new file mode 100644
index 0000000000..04b6caf201
--- /dev/null
+++ b/packages/kafka/tests/unit/consumer.test.ts
@@ -0,0 +1,349 @@
+import { readFileSync } from 'node:fs';
+import { join } from 'node:path';
+import type { Context } from 'aws-lambda';
+import { describe, expect, it } from 'vitest';
+import { z } from 'zod';
+import {
+  KafkaConsumerAvroMissingSchemaError,
+  KafkaConsumerParserError,
+  KafkaConsumerProtobufMissingSchemaError,
+} from '../../src/errors.js';
+import { SchemaType, kafkaConsumer } from '../../src/index.js';
+import type { ConsumerRecords, MSKEvent } from '../../src/types/types.js';
+import { Product as ProductProto } from '../protos/product.es6.generated.js';
+
+describe('Kafka consumer', () => {
+  // Common test setup
+  const keyZodSchema = z.string();
+  const valueZodSchema = z.object({
+    id: z.number(),
+    name: z.string(),
+    price: z.number().positive({
+      message: "Price can't be negative",
+    }),
+  });
+
+  type Key = z.infer<typeof keyZodSchema>;
+  type Product = z.infer<typeof valueZodSchema>;
+  type SerializationType = 'json' | 'avro' | 'protobuf';
+
+  const jsonTestEvent = JSON.parse(
+    readFileSync(join(__dirname, '..', 'events', 'default.json'), 'utf-8')
+  ) as unknown as MSKEvent;
+  const avroTestEvent = JSON.parse(
+    readFileSync(join(__dirname, '..', 'events', 'avro.json'), 'utf-8')
+  ) as unknown as MSKEvent;
+  const protobufTestEvent = JSON.parse(
+    readFileSync(join(__dirname, '..', 'events', 'protobuf.json'), 'utf-8')
+  ) as unknown as MSKEvent;
+  const context = {} as Context;
+  const baseHandler = async (
+    event: ConsumerRecords<Key, Product>,
+    _context: Context
+  ) => event;
+
+  // Test data constants
+  const TEST_DATA = {
+    json: {
+      key: 'recordKey',
+      value: { id: 12345, name: 'product5', price: 45 },
+      originalKey: 'cmVjb3JkS2V5',
+      originalValue:
+        'ewogICJpZCI6IDEyMzQ1LAogICJuYW1lIjogInByb2R1Y3Q1IiwKICAicHJpY2UiOiA0NQp9',
+    },
+    avro: {
+      data: {
+        key: 42,
+        value: { id: 1001, name: 'Laptop', price: 999.99 },
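+        // Note: originalKey is base64 for the raw bytes "42", and originalValue
+        // is base64 for the Avro binary encoding of the `value` object above.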
+        originalKey: 'NDI=',
+        originalValue: '0g8MTGFwdG9wUrgehes/j0A=',
+      },
+      schema: `{
+        "type": "record",
+        "name": "Product",
+        "fields": [
+          { "name": "id", "type": "int" },
+          { "name": "name", "type": "string" },
+          { "name": "price", "type": "double" }
+        ]
+      }`,
+    },
+    protobuf: {
+      data: {
+        key: 42,
+        value: { id: 1001, name: 'Laptop', price: 999.99 },
+        originalKey: 'NDI=',
+        originalValue: 'COkHEgZMYXB0b3AZUrgehes/j0A=',
+      },
+      schema: ProductProto,
+    },
+    headers: {
+      withHeaders: {
+        headers: [{ headerKey: 'headerValue' }],
+        originalHeaders: [
+          { headerKey: [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101] },
+        ],
+      },
+      withoutHeaders: {
+        headers: null,
+        originalHeaders: null,
+      },
+    },
+    otherFields: {
+      topic: 'mytopic',
+      partition: 0,
+      offset: 15,
+      timestamp: 1545084650987,
+      timestampType: 'CREATE_TIME',
+    },
+  } as const;
+
+  it('deserializes json message', async () => {
+    // Prepare
+    const handler = kafkaConsumer(baseHandler, {
+      value: { type: 'json' },
+      key: { type: 'json' },
+    });
+
+    // Act
+    const result = (await handler(jsonTestEvent, context)) as ConsumerRecords;
+
+    // Assess
+    expect(result.records[0]).toEqual({
+      ...TEST_DATA.json,
+      ...TEST_DATA.headers.withHeaders,
+      ...TEST_DATA.otherFields,
+    });
+  });
+
+  it('deserializes avro message', async () => {
+    // Prepare
+    const handler = kafkaConsumer(baseHandler, {
+      value: {
+        type: 'avro',
+        schema: TEST_DATA.avro.schema,
+      },
+      key: { type: 'json' },
+    });
+
+    // Act
+    const result = (await handler(avroTestEvent, context)) as ConsumerRecords<
+      unknown,
+      Product
+    >;
+
+    // Assess
+    expect(result.records[0]).toEqual({
+      ...TEST_DATA.avro.data,
+      ...TEST_DATA.headers.withHeaders,
+      ...TEST_DATA.otherFields,
+    });
+  });
+
+  it('deserializes protobuf message', async () => {
+    // Prepare
+    const handler = kafkaConsumer(baseHandler, {
+      value: {
+        type: 'protobuf',
+        schema: TEST_DATA.protobuf.schema,
+      },
+      key: { type: 'json' },
+    });
+
+    // Act
+    const event = (await handler(
+      protobufTestEvent,
+      context
+    )) as ConsumerRecords<unknown, Product>;
+
+    // Assess
+    expect(event.records[0]).toEqual({
+      ...TEST_DATA.protobuf.data,
+      ...TEST_DATA.headers.withHeaders,
+      ...TEST_DATA.otherFields,
+    });
+  });
+
+  it.each([
+    {
+      type: 'avro' as Extract<SerializationType, 'avro'>,
+      event: avroTestEvent,
+      error: KafkaConsumerAvroMissingSchemaError,
+    },
+    {
+      type: 'protobuf' as Extract<SerializationType, 'protobuf'>,
+      event: protobufTestEvent,
+      error: KafkaConsumerProtobufMissingSchemaError,
+    },
+  ])(
+    'throws when schema is not passed for $type event',
+    async ({ type, error, event }) => {
+      // Prepare
+      const handler = kafkaConsumer(baseHandler, {
+        // @ts-expect-error - testing missing schema
+        value: { type },
+      });
+
+      // Act & Assess
+      await expect(handler(event, context)).rejects.toThrow(error);
+    }
+  );
+
+  it('throws if schema type is not json, avro or protobuf', async () => {
+    // Prepare
+    const handler = kafkaConsumer(baseHandler, {
+      value: {
+        // @ts-expect-error - testing unsupported type
+        type: 'xml',
+      },
+    });
+
+    // Act & Assess
+    await expect(handler(jsonTestEvent, context)).rejects.toThrow();
+  });
+
+  it('deserializes with no headers provided', async () => {
+    // Prepare
+    const handler = kafkaConsumer(baseHandler, {
+      value: { type: 'json' },
+    });
+    const jsonTestEventWithoutHeaders = {
+      ...jsonTestEvent,
+      records: {
+        'test-topic': [
+          {
+            key: TEST_DATA.json.originalKey,
+            value: TEST_DATA.json.originalValue,
+            headers: null,
+          },
+        ],
+      },
+    } as unknown as MSKEvent;
+
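+    // Headers are optional on MSK records; the consumer is expected to surface
+    // them as null rather than throw (asserted below).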
+    // Act
+    const result = (await handler(
+      jsonTestEventWithoutHeaders,
+      context
+    )) as ConsumerRecords<Key, Product>;
+
+    // Assess
+    expect(result.records[0]).toEqual({
+      ...TEST_DATA.json,
+      ...TEST_DATA.headers.withoutHeaders,
+    });
+  });
+
+  it.each([
+    {
+      type: 'key',
+      event: {
+        ...jsonTestEvent,
+        records: {
+          'test-topic': [
+            {
+              key: 'eyJpZCI6NDIsIm5hbWUiOiJpbnZhbGlkUHJvZHVjdCIsInByaWNlIjotMTAwfQ==',
+              value: TEST_DATA.json.originalValue,
+              headers: null,
+            },
+          ],
+        },
+      } as unknown as MSKEvent,
+    },
+    {
+      type: 'value',
+      event: {
+        ...jsonTestEvent,
+        records: {
+          'test-topic': [
+            {
+              key: TEST_DATA.json.originalKey,
+              value:
+                'eyJpZCI6NDIsIm5hbWUiOiJpbnZhbGlkUHJvZHVjdCIsInByaWNlIjotMTAwfQ==',
+              headers: null,
+            },
+          ],
+        },
+      } as unknown as MSKEvent,
+    },
+  ])('throws when zod schema validation fails for $type', async ({ event }) => {
+    // Prepare
+    const handler = kafkaConsumer(baseHandler, {
+      value: {
+        type: SchemaType.JSON,
+        parserSchema: valueZodSchema,
+      },
+      key: {
+        type: SchemaType.JSON,
+        parserSchema: keyZodSchema,
+      },
+    });
+
+    // Act & Assess
+    await expect(handler(event, context)).rejects.toThrow(
+      KafkaConsumerParserError
+    );
+  });
+
+  it('throws when a non-MSK event is passed to the kafka consumer', async () => {
+    // Prepare
+    const handler = kafkaConsumer(baseHandler, {
+      value: { type: 'json' },
+    });
+
+    // Act & Assess
+    await expect(handler({} as MSKEvent, context)).rejects.toThrow(
+      'Event is not a valid MSKEvent. Expected an object with a "records" property.'
+    );
+  });
+
+  it.each([
+    {
+      type: 'key parserSchema but no value parserSchema',
+      config: {
+        key: {
+          type: SchemaType.JSON,
+          parserSchema: keyZodSchema,
+        },
+        value: { type: SchemaType.JSON },
+      },
+    },
+    {
+      type: 'value parserSchema but no key parserSchema',
+      config: {
+        key: { type: SchemaType.JSON },
+        value: {
+          type: SchemaType.JSON,
+          parserSchema: valueZodSchema,
+        },
+      },
+    },
+  ])('deserializes with $type', async ({ config }) => {
+    // Prepare
+    const handler = kafkaConsumer(baseHandler, config);
+    const customEvent = {
+      ...jsonTestEvent,
+      records: {
+        'test-topic': [
+          {
+            key: TEST_DATA.json.originalKey,
+            value: TEST_DATA.json.originalValue,
+            headers: null,
+          },
+        ],
+      },
+    } as unknown as MSKEvent;
+
+    // Act
+    const result = (await handler(customEvent, context)) as ConsumerRecords;
+
+    // Assess
+    expect(result.records[0]).toEqual({
+      ...TEST_DATA.json,
+      ...TEST_DATA.headers.withoutHeaders,
+    });
+  });
+});
diff --git a/packages/kafka/tests/unit/deserializer.avro.test.ts b/packages/kafka/tests/unit/deserializer.avro.test.ts
new file mode 100644
index 0000000000..a0dd6486e8
--- /dev/null
+++ b/packages/kafka/tests/unit/deserializer.avro.test.ts
@@ -0,0 +1,62 @@
+import { describe, expect, it } from 'vitest';
+import { deserialize } from '../../src/deserializer/avro.js';
+import { KafkaConsumerDeserializationError } from '../../src/errors.js';
+
+describe('Avro Deserializer: ', () => {
+  it('returns avro deserialised value', async () => {
+    // Prepare
+    const message = '0g8MTGFwdG9wUrgehes/j0A=';
+    const schema = `{
+      "type": "record",
+      "name": "Product",
+      "fields": [
+        { "name": "id", "type": "int" },
+        { "name": "name", "type": "string" },
+        { "name": "price", "type": "double" }
+      ]
+    }`;
+    const expected = { id: 1001, name: 'Laptop', price: 999.99 };
+
+    // Act & Assess
+    expect(await deserialize(message, schema)).toEqual(expected);
+  });
+
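+  // The same message is reused below with schemas that cannot be used to
+  // decode it, so the deserializer is expected to throw in both cases.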
"fields": [ + { "name": "id", "type": "int" }, + { "name": "name", "type": "string" }, + ] + }`; // Invalid schema, missing "price" field + + // Act & Assess + await expect(deserialize(message, schema)).rejects.toThrow( + KafkaConsumerDeserializationError + ); + }); + + it('throws when avro deserialiser has not matching schema', async () => { + // Prepare + const message = '0g8MTGFwdG9wUrgehes/j0A='; + const schema = `{ + "type": "record", + "name": "Product", + "fields": [ + { "name": "productId", "type": "int" }, + { "name": "productName", "type": "string" }, + { "name": "productPrice", "type": "double" }, + ] + }`; // Valid schema, but does not match the message content + + // Act & Assess + await expect(deserialize(message, schema)).rejects.toThrow( + KafkaConsumerDeserializationError + ); + }); +}); diff --git a/packages/kafka/tests/unit/deserializer.protobuf.test.ts b/packages/kafka/tests/unit/deserializer.protobuf.test.ts new file mode 100644 index 0000000000..6e38d622b4 --- /dev/null +++ b/packages/kafka/tests/unit/deserializer.protobuf.test.ts @@ -0,0 +1,28 @@ +import type { Message } from 'protobufjs'; +import { describe, expect, it } from 'vitest'; +import { deserialize } from '../../src/deserializer/protobuf.js'; +import { KafkaConsumerDeserializationError } from '../../src/errors.js'; +import type { ProtobufMessage } from '../../src/types/types.js'; +import { Product } from '../protos/product.es6.generated.js'; + +describe('Protobuf deserialiser: ', () => { + it('throws when protobuf serialise fails', () => { + // Prepare + const data = 'COkHEgZMYXB0b3AZUrgehes/j0A='; + const invalidType = {} as ProtobufMessage; + + // Act & Assess + expect(() => deserialize(data, invalidType)).toThrow( + KafkaConsumerDeserializationError + ); + }); + + it('returns protobuf deserialised value', () => { + // Prepare + const data = 'COkHEgZMYXB0b3AZUrgehes/j0A='; + const expected = { id: 1001, name: 'Laptop', price: 999.99 }; + + // Act & Assess + expect(deserialize(data, Product)).toEqual(expected); + }); +}); diff --git a/packages/kafka/tsconfig.cjs.json b/packages/kafka/tsconfig.cjs.json new file mode 100644 index 0000000000..7d570c8dbe --- /dev/null +++ b/packages/kafka/tsconfig.cjs.json @@ -0,0 +1,13 @@ +{ + "extends": "../../tsconfig.cjs.json", + "compilerOptions": { + "composite": true, + "declaration": true, + "outDir": "./lib/cjs/", + "rootDir": "./src", + "tsBuildInfoFile": ".tsbuildinfo/cjs.json" + }, + "include": [ + "./src/**/*" + ] +} \ No newline at end of file diff --git a/packages/kafka/tsconfig.json b/packages/kafka/tsconfig.json new file mode 100644 index 0000000000..204ff253d4 --- /dev/null +++ b/packages/kafka/tsconfig.json @@ -0,0 +1,14 @@ +{ + "extends": "../../tsconfig.json", + "compilerOptions": { + "baseUrl": ".", + "outDir": "./lib/esm", + "rootDir": "./src", + "tsBuildInfoFile": ".tsbuildinfo/esm.json", + "composite": true, + "declaration": true + }, + "include": [ + "./src/**/*" + ] +} \ No newline at end of file diff --git a/packages/kafka/vitest.config.ts b/packages/kafka/vitest.config.ts new file mode 100644 index 0000000000..9f1196ef1f --- /dev/null +++ b/packages/kafka/vitest.config.ts @@ -0,0 +1,8 @@ +import { defineProject } from 'vitest/config'; + +export default defineProject({ + test: { + environment: 'node', + setupFiles: ['../testing/src/setupEnv.ts'], + }, +});