diff --git a/.gitignore b/.gitignore index 1e7a1de7..8382554c 100644 --- a/.gitignore +++ b/.gitignore @@ -6,6 +6,7 @@ packages/api-main/data .env dist dist/ +data data/* .vscode/ .idea/ diff --git a/docker-compose.yml b/docker-compose.yml index dec349f5..be766748 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -79,6 +79,11 @@ services: environment: RABBITMQ_ENDPOINT: "amqp://guest:guest@rabbitmq:5672" RABBITMQ_EXCHANGE: "dither" + RABBITMQ_DLX_EXCHANGE: "dither-dlx" + RABBITMQ_DLX_QUEUE: "dlx-queue" + RABBITMQ_QUEUE: "dither.Post" + RABBITMQ_LOG_EXCHANGE: "dither-log" + RABBITMQ_LOG_QUEUE: "log-queue" API_ROOT: "http://api-main:3001/v1" restart: always command: ["pnpm", "start"] @@ -94,6 +99,11 @@ services: environment: RABBITMQ_ENDPOINT: "amqp://guest:guest@rabbitmq:5672" RABBITMQ_EXCHANGE: "dither" + RABBITMQ_DLX_EXCHANGE: "dither-dlx" + RABBITMQ_DLX_QUEUE: "dlx-queue" + RABBITMQ_QUEUE: "dither.Reply" + RABBITMQ_LOG_EXCHANGE: "dither-log" + RABBITMQ_LOG_QUEUE: "log-queue" API_ROOT: "http://api-main:3001/v1" restart: always command: ["pnpm", "start"] @@ -109,6 +119,11 @@ services: environment: RABBITMQ_ENDPOINT: "amqp://guest:guest@rabbitmq:5672" RABBITMQ_EXCHANGE: "dither" + RABBITMQ_DLX_EXCHANGE: "dither-dlx" + RABBITMQ_DLX_QUEUE: "dlx-queue" + RABBITMQ_QUEUE: "dither.Like" + RABBITMQ_LOG_EXCHANGE: "dither-log" + RABBITMQ_LOG_QUEUE: "log-queue" API_ROOT: "http://api-main:3001/v1" restart: always command: ["pnpm", "start"] @@ -124,6 +139,11 @@ services: environment: RABBITMQ_ENDPOINT: "amqp://guest:guest@rabbitmq:5672" RABBITMQ_EXCHANGE: "dither" + RABBITMQ_DLX_EXCHANGE: "dither-dlx" + RABBITMQ_DLX_QUEUE: "dlx-queue" + RABBITMQ_QUEUE: "dither.Dislike" + RABBITMQ_LOG_EXCHANGE: "dither-log" + RABBITMQ_LOG_QUEUE: "log-queue" API_ROOT: "http://api-main:3001/v1" restart: always command: ["pnpm", "start"] @@ -139,6 +159,11 @@ services: environment: RABBITMQ_ENDPOINT: "amqp://guest:guest@rabbitmq:5672" RABBITMQ_EXCHANGE: "dither" + RABBITMQ_DLX_EXCHANGE: "dither-dlx" + RABBITMQ_DLX_QUEUE: "dlx-queue" + RABBITMQ_QUEUE: "dither.Follow" + RABBITMQ_LOG_EXCHANGE: "dither-log" + RABBITMQ_LOG_QUEUE: "log-queue" API_ROOT: "http://api-main:3001/v1" restart: always command: ["pnpm", "start"] @@ -153,6 +178,11 @@ services: environment: RABBITMQ_ENDPOINT: "amqp://guest:guest@rabbitmq:5672" RABBITMQ_EXCHANGE: "dither" + RABBITMQ_DLX_EXCHANGE: "dither-dlx" + RABBITMQ_DLX_QUEUE: "dlx-queue" + RABBITMQ_QUEUE: "dither.Unfollow" + RABBITMQ_LOG_EXCHANGE: "dither-log" + RABBITMQ_LOG_QUEUE: "log-queue" API_ROOT: "http://api-main:3001/v1" restart: always command: ["pnpm", "start"] @@ -167,6 +197,11 @@ services: environment: RABBITMQ_ENDPOINT: "amqp://guest:guest@rabbitmq:5672" RABBITMQ_EXCHANGE: "dither" + RABBITMQ_DLX_EXCHANGE: "dither-dlx" + RABBITMQ_DLX_QUEUE: "dlx-queue" + RABBITMQ_QUEUE: "dither.Flag" + RABBITMQ_LOG_EXCHANGE: "dither-log" + RABBITMQ_LOG_QUEUE: "log-queue" API_ROOT: "http://api-main:3001/v1" restart: always command: ["pnpm", "start"] @@ -181,6 +216,27 @@ services: environment: RABBITMQ_ENDPOINT: "amqp://guest:guest@rabbitmq:5672" RABBITMQ_EXCHANGE: "dither" + RABBITMQ_DLX_EXCHANGE: "dither-dlx" + RABBITMQ_DLX_QUEUE: "dlx-queue" + RABBITMQ_QUEUE: "dither.Remove" + RABBITMQ_LOG_EXCHANGE: "dither-log" + RABBITMQ_LOG_QUEUE: "log-queue" + API_ROOT: "http://api-main:3001/v1" + restart: always + command: ["pnpm", "start"] + dead-letter-service: + container_name: "dead-letter-service" + build: ./packages/dead-letter-service + depends_on: + rabbitmq: + condition: service_healthy + environment: + RABBITMQ_ENDPOINT: "amqp://guest:guest@rabbitmq:5672" + RABBITMQ_EXCHANGE: "dither-dlx" + RABBITMQ_DLX_EXCHANGE: "dither" + RABBITMQ_DLX_QUEUE: "dlx-queue" + RABBITMQ_LOG_EXCHANGE: "dither-log" + RABBITMQ_LOG_QUEUE: "log-queue" API_ROOT: "http://api-main:3001/v1" restart: always command: ["pnpm", "start"] @@ -225,4 +281,4 @@ volumes: rabbitmq-lib: driver: local rabbitmq-log: - driver: local + driver: local \ No newline at end of file diff --git a/packages/dead-letter-service/Dockerfile b/packages/dead-letter-service/Dockerfile new file mode 100644 index 00000000..483c39e3 --- /dev/null +++ b/packages/dead-letter-service/Dockerfile @@ -0,0 +1,14 @@ +FROM node:23-slim + +WORKDIR /app + +RUN apt-get update && rm -rf /var/lib/apt/lists/* +RUN corepack enable + +COPY package.json pnpm-lock.yaml ./ +COPY src ./src +COPY tsconfig.json ./tsconfig.json + +RUN pnpm install --no-cache + +CMD ["pnpm", "start"] \ No newline at end of file diff --git a/packages/dead-letter-service/package.json b/packages/dead-letter-service/package.json new file mode 100644 index 00000000..2c67eeb9 --- /dev/null +++ b/packages/dead-letter-service/package.json @@ -0,0 +1,30 @@ +{ + "name": "dead-letter-service", + "version": "1.0.0", + "description": "", + "main": "dist/index.js", + "types": "dist/index.d.ts", + "license": "GnoGPL", + "publishConfig": { + "access": "public" + }, + "files": [ + "dist" + ], + "scripts": { + "start": "tsx ./src/index.ts", + "build": "tsc" + }, + "author": "Stuyk", + "devDependencies": { + "@atomone/event-consumer": "^1.0.12", + "@types/node": "^22.13.10", + "typescript": "^5.8.2", + "vitest": "^3.0.9" + }, + "dependencies": { + "@types/amqplib": "^0.10.7", + "amqplib": "^0.10.7", + "dead-letter-service": "link:" + } +} \ No newline at end of file diff --git a/packages/dead-letter-service/pnpm-lock.yaml b/packages/dead-letter-service/pnpm-lock.yaml new file mode 100644 index 00000000..8c53dca3 --- /dev/null +++ b/packages/dead-letter-service/pnpm-lock.yaml @@ -0,0 +1,454 @@ +lockfileVersion: '9.0' + +settings: + autoInstallPeers: true + excludeLinksFromLockfile: false + +importers: + + .: + dependencies: + '@atomone/temporastate': + specifier: ^2.0.0 + version: 2.0.0 + '@elysiajs/node': + specifier: ^1.2.6 + version: 1.2.6(elysia@1.2.25(@sinclair/typebox@0.34.31)(typescript@5.8.2))(formidable@3.5.2)(ws@8.18.1) + '@types/node': + specifier: ^22.13.10 + version: 22.13.10 + elysia: + specifier: ^1.2.25 + version: 1.2.25(@sinclair/typebox@0.34.31)(typescript@5.8.2) + tsx: + specifier: ^4.19.3 + version: 4.19.3 + devDependencies: + typescript: + specifier: ^5.8.2 + version: 5.8.2 + +packages: + + '@atomone/temporastate@2.0.0': + resolution: {integrity: sha512-xiC6Kt99kJ1922Obe5l1+fIh9g7u0tEfsKTi9UH9dWNOy3qzJBtA9Kb5lmxH8fcz/ILQPT4sYOWesKQqAmMDFw==} + + '@elysiajs/node@1.2.6': + resolution: {integrity: sha512-qauTs0YNLvfSyqW8k8pDCazd3nPQtFOeDH/cSz1wBhAGn1HL1PQlywnK6V0+bOGemkKlxLIjhmvbHxGrNzZMSg==} + peerDependencies: + bufferutil: '>= 4.0.1' + elysia: '>= 1.2.7' + formidable: '>= 3.5.2' + ws: '>= 8.18.0' + peerDependenciesMeta: + bufferutil: + optional: true + + '@esbuild/aix-ppc64@0.25.1': + resolution: {integrity: sha512-kfYGy8IdzTGy+z0vFGvExZtxkFlA4zAxgKEahG9KE1ScBjpQnFsNOX8KTU5ojNru5ed5CVoJYXFtoxaq5nFbjQ==} + engines: {node: '>=18'} + cpu: [ppc64] + os: [aix] + + '@esbuild/android-arm64@0.25.1': + resolution: {integrity: sha512-50tM0zCJW5kGqgG7fQ7IHvQOcAn9TKiVRuQ/lN0xR+T2lzEFvAi1ZcS8DiksFcEpf1t/GYOeOfCAgDHFpkiSmA==} + engines: {node: '>=18'} + cpu: [arm64] + os: [android] + + '@esbuild/android-arm@0.25.1': + resolution: {integrity: sha512-dp+MshLYux6j/JjdqVLnMglQlFu+MuVeNrmT5nk6q07wNhCdSnB7QZj+7G8VMUGh1q+vj2Bq8kRsuyA00I/k+Q==} + engines: {node: '>=18'} + cpu: [arm] + os: [android] + + '@esbuild/android-x64@0.25.1': + resolution: {integrity: sha512-GCj6WfUtNldqUzYkN/ITtlhwQqGWu9S45vUXs7EIYf+7rCiiqH9bCloatO9VhxsL0Pji+PF4Lz2XXCES+Q8hDw==} + engines: {node: '>=18'} + cpu: [x64] + os: [android] + + '@esbuild/darwin-arm64@0.25.1': + resolution: {integrity: sha512-5hEZKPf+nQjYoSr/elb62U19/l1mZDdqidGfmFutVUjjUZrOazAtwK+Kr+3y0C/oeJfLlxo9fXb1w7L+P7E4FQ==} + engines: {node: '>=18'} + cpu: [arm64] + os: [darwin] + + '@esbuild/darwin-x64@0.25.1': + resolution: {integrity: sha512-hxVnwL2Dqs3fM1IWq8Iezh0cX7ZGdVhbTfnOy5uURtao5OIVCEyj9xIzemDi7sRvKsuSdtCAhMKarxqtlyVyfA==} + engines: {node: '>=18'} + cpu: [x64] + os: [darwin] + + '@esbuild/freebsd-arm64@0.25.1': + resolution: {integrity: sha512-1MrCZs0fZa2g8E+FUo2ipw6jw5qqQiH+tERoS5fAfKnRx6NXH31tXBKI3VpmLijLH6yriMZsxJtaXUyFt/8Y4A==} + engines: {node: '>=18'} + cpu: [arm64] + os: [freebsd] + + '@esbuild/freebsd-x64@0.25.1': + resolution: {integrity: sha512-0IZWLiTyz7nm0xuIs0q1Y3QWJC52R8aSXxe40VUxm6BB1RNmkODtW6LHvWRrGiICulcX7ZvyH6h5fqdLu4gkww==} + engines: {node: '>=18'} + cpu: [x64] + os: [freebsd] + + '@esbuild/linux-arm64@0.25.1': + resolution: {integrity: sha512-jaN3dHi0/DDPelk0nLcXRm1q7DNJpjXy7yWaWvbfkPvI+7XNSc/lDOnCLN7gzsyzgu6qSAmgSvP9oXAhP973uQ==} + engines: {node: '>=18'} + cpu: [arm64] + os: [linux] + + '@esbuild/linux-arm@0.25.1': + resolution: {integrity: sha512-NdKOhS4u7JhDKw9G3cY6sWqFcnLITn6SqivVArbzIaf3cemShqfLGHYMx8Xlm/lBit3/5d7kXvriTUGa5YViuQ==} + engines: {node: '>=18'} + cpu: [arm] + os: [linux] + + '@esbuild/linux-ia32@0.25.1': + resolution: {integrity: sha512-OJykPaF4v8JidKNGz8c/q1lBO44sQNUQtq1KktJXdBLn1hPod5rE/Hko5ugKKZd+D2+o1a9MFGUEIUwO2YfgkQ==} + engines: {node: '>=18'} + cpu: [ia32] + os: [linux] + + '@esbuild/linux-loong64@0.25.1': + resolution: {integrity: sha512-nGfornQj4dzcq5Vp835oM/o21UMlXzn79KobKlcs3Wz9smwiifknLy4xDCLUU0BWp7b/houtdrgUz7nOGnfIYg==} + engines: {node: '>=18'} + cpu: [loong64] + os: [linux] + + '@esbuild/linux-mips64el@0.25.1': + resolution: {integrity: sha512-1osBbPEFYwIE5IVB/0g2X6i1qInZa1aIoj1TdL4AaAb55xIIgbg8Doq6a5BzYWgr+tEcDzYH67XVnTmUzL+nXg==} + engines: {node: '>=18'} + cpu: [mips64el] + os: [linux] + + '@esbuild/linux-ppc64@0.25.1': + resolution: {integrity: sha512-/6VBJOwUf3TdTvJZ82qF3tbLuWsscd7/1w+D9LH0W/SqUgM5/JJD0lrJ1fVIfZsqB6RFmLCe0Xz3fmZc3WtyVg==} + engines: {node: '>=18'} + cpu: [ppc64] + os: [linux] + + '@esbuild/linux-riscv64@0.25.1': + resolution: {integrity: sha512-nSut/Mx5gnilhcq2yIMLMe3Wl4FK5wx/o0QuuCLMtmJn+WeWYoEGDN1ipcN72g1WHsnIbxGXd4i/MF0gTcuAjQ==} + engines: {node: '>=18'} + cpu: [riscv64] + os: [linux] + + '@esbuild/linux-s390x@0.25.1': + resolution: {integrity: sha512-cEECeLlJNfT8kZHqLarDBQso9a27o2Zd2AQ8USAEoGtejOrCYHNtKP8XQhMDJMtthdF4GBmjR2au3x1udADQQQ==} + engines: {node: '>=18'} + cpu: [s390x] + os: [linux] + + '@esbuild/linux-x64@0.25.1': + resolution: {integrity: sha512-xbfUhu/gnvSEg+EGovRc+kjBAkrvtk38RlerAzQxvMzlB4fXpCFCeUAYzJvrnhFtdeyVCDANSjJvOvGYoeKzFA==} + engines: {node: '>=18'} + cpu: [x64] + os: [linux] + + '@esbuild/netbsd-arm64@0.25.1': + resolution: {integrity: sha512-O96poM2XGhLtpTh+s4+nP7YCCAfb4tJNRVZHfIE7dgmax+yMP2WgMd2OecBuaATHKTHsLWHQeuaxMRnCsH8+5g==} + engines: {node: '>=18'} + cpu: [arm64] + os: [netbsd] + + '@esbuild/netbsd-x64@0.25.1': + resolution: {integrity: sha512-X53z6uXip6KFXBQ+Krbx25XHV/NCbzryM6ehOAeAil7X7oa4XIq+394PWGnwaSQ2WRA0KI6PUO6hTO5zeF5ijA==} + engines: {node: '>=18'} + cpu: [x64] + os: [netbsd] + + '@esbuild/openbsd-arm64@0.25.1': + resolution: {integrity: sha512-Na9T3szbXezdzM/Kfs3GcRQNjHzM6GzFBeU1/6IV/npKP5ORtp9zbQjvkDJ47s6BCgaAZnnnu/cY1x342+MvZg==} + engines: {node: '>=18'} + cpu: [arm64] + os: [openbsd] + + '@esbuild/openbsd-x64@0.25.1': + resolution: {integrity: sha512-T3H78X2h1tszfRSf+txbt5aOp/e7TAz3ptVKu9Oyir3IAOFPGV6O9c2naym5TOriy1l0nNf6a4X5UXRZSGX/dw==} + engines: {node: '>=18'} + cpu: [x64] + os: [openbsd] + + '@esbuild/sunos-x64@0.25.1': + resolution: {integrity: sha512-2H3RUvcmULO7dIE5EWJH8eubZAI4xw54H1ilJnRNZdeo8dTADEZ21w6J22XBkXqGJbe0+wnNJtw3UXRoLJnFEg==} + engines: {node: '>=18'} + cpu: [x64] + os: [sunos] + + '@esbuild/win32-arm64@0.25.1': + resolution: {integrity: sha512-GE7XvrdOzrb+yVKB9KsRMq+7a2U/K5Cf/8grVFRAGJmfADr/e/ODQ134RK2/eeHqYV5eQRFxb1hY7Nr15fv1NQ==} + engines: {node: '>=18'} + cpu: [arm64] + os: [win32] + + '@esbuild/win32-ia32@0.25.1': + resolution: {integrity: sha512-uOxSJCIcavSiT6UnBhBzE8wy3n0hOkJsBOzy7HDAuTDE++1DJMRRVCPGisULScHL+a/ZwdXPpXD3IyFKjA7K8A==} + engines: {node: '>=18'} + cpu: [ia32] + os: [win32] + + '@esbuild/win32-x64@0.25.1': + resolution: {integrity: sha512-Y1EQdcfwMSeQN/ujR5VayLOJ1BHaK+ssyk0AEzPjC+t1lITgsnccPqFjb6V+LsTp/9Iov4ysfjxLaGJ9RPtkVg==} + engines: {node: '>=18'} + cpu: [x64] + os: [win32] + + '@sinclair/typebox@0.34.31': + resolution: {integrity: sha512-qQ71T9DsITbX3dVCrcBERbs11YuSMg3wZPnT472JhqhWGPdiLgyvihJXU8m+ADJtJvRdjATIiACJD22dEknBrQ==} + + '@types/node@22.13.10': + resolution: {integrity: sha512-I6LPUvlRH+O6VRUqYOcMudhaIdUVWfsjnZavnsraHvpBwaEyMN29ry+0UVJhImYL16xsscu0aske3yA+uPOWfw==} + + asap@2.0.6: + resolution: {integrity: sha512-BSHWgDSAiKs50o2Re8ppvp3seVHXSRM44cdSsT9FfNEUUZLOGWVCsiWaRPWM1Znn+mqZ1OfVZ3z3DWEzSp7hRA==} + + cookie@1.0.2: + resolution: {integrity: sha512-9Kr/j4O16ISv8zBBhJoi4bXOYNTkFLOqSL3UDB0njXxCXNezjeyVrJyGOWtgfs/q2km1gwBcfH8q1yEGoMYunA==} + engines: {node: '>=18'} + + dezalgo@1.0.4: + resolution: {integrity: sha512-rXSP0bf+5n0Qonsb+SVVfNfIsimO4HEtmnIpPHY8Q1UCzKlQrDMfdobr8nJOOsRgWCyMRqeSBQzmWUMq7zvVig==} + + elysia@1.2.25: + resolution: {integrity: sha512-WsdQpORJvb4uszzeqYT0lg97knw1iBW1NTzJ1Jm57tiHg+DfAotlWXYbjmvQ039ssV0fYELDHinLLoUazZkEHg==} + peerDependencies: + '@sinclair/typebox': '>= 0.34.0' + openapi-types: '>= 12.0.0' + typescript: '>= 5.0.0' + peerDependenciesMeta: + openapi-types: + optional: true + typescript: + optional: true + + esbuild@0.25.1: + resolution: {integrity: sha512-BGO5LtrGC7vxnqucAe/rmvKdJllfGaYWdyABvyMoXQlfYMb2bbRuReWR5tEGE//4LcNJj9XrkovTqNYRFZHAMQ==} + engines: {node: '>=18'} + hasBin: true + + formidable@3.5.2: + resolution: {integrity: sha512-Jqc1btCy3QzRbJaICGwKcBfGWuLADRerLzDqi2NwSt/UkXLsHJw2TVResiaoBufHVHy9aSgClOHCeJsSsFLTbg==} + + fsevents@2.3.3: + resolution: {integrity: sha512-5xoDfX+fL7faATnagmWPpbFtwh/R77WmMMqqHGS65C3vvB0YHrgF+B1YmZ3441tMj5n63k0212XNoJwzlhffQw==} + engines: {node: ^8.16.0 || ^10.6.0 || >=11.0.0} + os: [darwin] + + get-tsconfig@4.10.0: + resolution: {integrity: sha512-kGzZ3LWWQcGIAmg6iWvXn0ei6WDtV26wzHRMwDSzmAbcXrTEXxHy6IehI6/4eT6VRKyMP1eF1VqwrVUmE/LR7A==} + + hexoid@2.0.0: + resolution: {integrity: sha512-qlspKUK7IlSQv2o+5I7yhUd7TxlOG2Vr5LTa3ve2XSNVKAL/n/u/7KLvKmFNimomDIKvZFXWHv0T12mv7rT8Aw==} + engines: {node: '>=8'} + + memoirist@0.3.0: + resolution: {integrity: sha512-wR+4chMgVPq+T6OOsk40u9Wlpw1Pjx66NMNiYxCQQ4EUJ7jDs3D9kTCeKdBOkvAiqXlHLVJlvYL01PvIJ1MPNg==} + + once@1.4.0: + resolution: {integrity: sha512-lNaJgI+2Q5URQBkccEKHTQOPaXdUxnZZElQTZY0MFUAuaEqe1E+Nyvgdz/aIyNi6Z9MzO5dv1H8n58/GELp3+w==} + + resolve-pkg-maps@1.0.0: + resolution: {integrity: sha512-seS2Tj26TBVOC2NIc2rOe2y2ZO7efxITtLZcGSOnHHNOQ7CkiUBfw0Iw2ck6xkIhPwLhKNLS8BO+hEpngQlqzw==} + + tsx@4.19.3: + resolution: {integrity: sha512-4H8vUNGNjQ4V2EOoGw005+c+dGuPSnhpPBPHBtsZdGZBk/iJb4kguGlPWaZTZ3q5nMtFOEsY0nRDlh9PJyd6SQ==} + engines: {node: '>=18.0.0'} + hasBin: true + + typescript@5.8.2: + resolution: {integrity: sha512-aJn6wq13/afZp/jT9QZmwEjDqqvSGp1VT5GVg+f/t6/oVyrgXM6BY1h9BRh/O5p3PlUPAe+WuiEZOmb/49RqoQ==} + engines: {node: '>=14.17'} + hasBin: true + + undici-types@6.20.0: + resolution: {integrity: sha512-Ny6QZ2Nju20vw1SRHe3d9jVu6gJ+4e3+MMpqu7pqE5HT6WsTSlce++GQmK5UXS8mzV8DSYHrQH+Xrf2jVcuKNg==} + + wrappy@1.0.2: + resolution: {integrity: sha512-l4Sp/DRseor9wL6EvV2+TuQn63dMkPjZ/sp9XkghTEbV9KlPS1xUsZ3u7/IQO4wxtcFB4bgpQPRcR3QCvezPcQ==} + + ws@8.18.1: + resolution: {integrity: sha512-RKW2aJZMXeMxVpnZ6bck+RswznaxmzdULiBr6KY7XkTnW8uvt0iT9H5DkHUChXrc+uurzwa0rVI16n/Xzjdz1w==} + engines: {node: '>=10.0.0'} + peerDependencies: + bufferutil: ^4.0.1 + utf-8-validate: '>=5.0.2' + peerDependenciesMeta: + bufferutil: + optional: true + utf-8-validate: + optional: true + +snapshots: + + '@atomone/temporastate@2.0.0': {} + + '@elysiajs/node@1.2.6(elysia@1.2.25(@sinclair/typebox@0.34.31)(typescript@5.8.2))(formidable@3.5.2)(ws@8.18.1)': + dependencies: + elysia: 1.2.25(@sinclair/typebox@0.34.31)(typescript@5.8.2) + formidable: 3.5.2 + ws: 8.18.1 + + '@esbuild/aix-ppc64@0.25.1': + optional: true + + '@esbuild/android-arm64@0.25.1': + optional: true + + '@esbuild/android-arm@0.25.1': + optional: true + + '@esbuild/android-x64@0.25.1': + optional: true + + '@esbuild/darwin-arm64@0.25.1': + optional: true + + '@esbuild/darwin-x64@0.25.1': + optional: true + + '@esbuild/freebsd-arm64@0.25.1': + optional: true + + '@esbuild/freebsd-x64@0.25.1': + optional: true + + '@esbuild/linux-arm64@0.25.1': + optional: true + + '@esbuild/linux-arm@0.25.1': + optional: true + + '@esbuild/linux-ia32@0.25.1': + optional: true + + '@esbuild/linux-loong64@0.25.1': + optional: true + + '@esbuild/linux-mips64el@0.25.1': + optional: true + + '@esbuild/linux-ppc64@0.25.1': + optional: true + + '@esbuild/linux-riscv64@0.25.1': + optional: true + + '@esbuild/linux-s390x@0.25.1': + optional: true + + '@esbuild/linux-x64@0.25.1': + optional: true + + '@esbuild/netbsd-arm64@0.25.1': + optional: true + + '@esbuild/netbsd-x64@0.25.1': + optional: true + + '@esbuild/openbsd-arm64@0.25.1': + optional: true + + '@esbuild/openbsd-x64@0.25.1': + optional: true + + '@esbuild/sunos-x64@0.25.1': + optional: true + + '@esbuild/win32-arm64@0.25.1': + optional: true + + '@esbuild/win32-ia32@0.25.1': + optional: true + + '@esbuild/win32-x64@0.25.1': + optional: true + + '@sinclair/typebox@0.34.31': {} + + '@types/node@22.13.10': + dependencies: + undici-types: 6.20.0 + + asap@2.0.6: {} + + cookie@1.0.2: {} + + dezalgo@1.0.4: + dependencies: + asap: 2.0.6 + wrappy: 1.0.2 + + elysia@1.2.25(@sinclair/typebox@0.34.31)(typescript@5.8.2): + dependencies: + '@sinclair/typebox': 0.34.31 + cookie: 1.0.2 + memoirist: 0.3.0 + optionalDependencies: + typescript: 5.8.2 + + esbuild@0.25.1: + optionalDependencies: + '@esbuild/aix-ppc64': 0.25.1 + '@esbuild/android-arm': 0.25.1 + '@esbuild/android-arm64': 0.25.1 + '@esbuild/android-x64': 0.25.1 + '@esbuild/darwin-arm64': 0.25.1 + '@esbuild/darwin-x64': 0.25.1 + '@esbuild/freebsd-arm64': 0.25.1 + '@esbuild/freebsd-x64': 0.25.1 + '@esbuild/linux-arm': 0.25.1 + '@esbuild/linux-arm64': 0.25.1 + '@esbuild/linux-ia32': 0.25.1 + '@esbuild/linux-loong64': 0.25.1 + '@esbuild/linux-mips64el': 0.25.1 + '@esbuild/linux-ppc64': 0.25.1 + '@esbuild/linux-riscv64': 0.25.1 + '@esbuild/linux-s390x': 0.25.1 + '@esbuild/linux-x64': 0.25.1 + '@esbuild/netbsd-arm64': 0.25.1 + '@esbuild/netbsd-x64': 0.25.1 + '@esbuild/openbsd-arm64': 0.25.1 + '@esbuild/openbsd-x64': 0.25.1 + '@esbuild/sunos-x64': 0.25.1 + '@esbuild/win32-arm64': 0.25.1 + '@esbuild/win32-ia32': 0.25.1 + '@esbuild/win32-x64': 0.25.1 + + formidable@3.5.2: + dependencies: + dezalgo: 1.0.4 + hexoid: 2.0.0 + once: 1.4.0 + + fsevents@2.3.3: + optional: true + + get-tsconfig@4.10.0: + dependencies: + resolve-pkg-maps: 1.0.0 + + hexoid@2.0.0: {} + + memoirist@0.3.0: {} + + once@1.4.0: + dependencies: + wrappy: 1.0.2 + + resolve-pkg-maps@1.0.0: {} + + tsx@4.19.3: + dependencies: + esbuild: 0.25.1 + get-tsconfig: 4.10.0 + optionalDependencies: + fsevents: 2.3.3 + + typescript@5.8.2: {} + + undici-types@6.20.0: {} + + wrappy@1.0.2: {} + + ws@8.18.1: {} diff --git a/packages/dead-letter-service/src/config.ts b/packages/dead-letter-service/src/config.ts new file mode 100644 index 00000000..667ff92b --- /dev/null +++ b/packages/dead-letter-service/src/config.ts @@ -0,0 +1,22 @@ +import type { EventConsumerConfig } from '@atomone/event-consumer'; + +let config: EventConsumerConfig; + +export function useConfig(): EventConsumerConfig { + if (typeof config !== 'undefined') { + return config; + } + + config = { + exchange: process.env.RABBITMQ_EXCHANGE || 'dither', + queue: process.env.RABBITMQ_QUEUE || '', + dlxExchange: process.env.RABBITMQ_DLX_EXCHANGE || 'dither-dlx', + dlxQueue: process.env.RABBITMQ_DLX_QUEUE || 'dlx-queue', + logExchange: process.env.RABBITMQ_LOG_EXCHANGE || 'dither-log', + logQueue: process.env.RABBITMQ_LOG_QUEUE || 'log-queue', + durable: true, + rabbitMQEndpoint: process.env.RABBITMQ_ENDPOINT || 'amqp://localhost', + }; + + return config; +} diff --git a/packages/dead-letter-service/src/index.ts b/packages/dead-letter-service/src/index.ts new file mode 100644 index 00000000..00425a4e --- /dev/null +++ b/packages/dead-letter-service/src/index.ts @@ -0,0 +1,18 @@ +import amqplib from 'amqplib'; + +import { useConfig } from './config'; + +const config = useConfig(); + +const _apiRoot = process.env.API_ROOT || 'http://localhost:3000'; + +const start = async () => { + const conn = await amqplib.connect(config.rabbitMQEndpoint); + const channel = await conn.createChannel(); + await channel.assertExchange(config.logExchange, 'direct', { durable: config.durable }); + await channel.assertQueue(config.logQueue, { durable: config.durable }); + await channel.consume(config.logQueue, async (msg) => { + console.log('Logging failed message: ', msg?.content.toString()); + }); +}; +start(); diff --git a/packages/dead-letter-service/tsconfig.json b/packages/dead-letter-service/tsconfig.json new file mode 100644 index 00000000..4389c6eb --- /dev/null +++ b/packages/dead-letter-service/tsconfig.json @@ -0,0 +1,18 @@ +{ + "compilerOptions": { + "target": "ESNext", + "module": "ESNext", + "moduleResolution": "Bundler", + "lib": ["ESNext", "DOM"], + "outDir": "./dist", + "strict": true, + "allowJs": true, + "checkJs": false, + "esModuleInterop": true, + "skipLibCheck": true, + "forceConsistentCasingInFileNames": true, + "declaration": true + }, + "include": ["src"], + "exclude": ["node_modules", "dist"] +} \ No newline at end of file diff --git a/packages/dislike-service/package.json b/packages/dislike-service/package.json index 647cd7aa..de1f5143 100644 --- a/packages/dislike-service/package.json +++ b/packages/dislike-service/package.json @@ -11,7 +11,7 @@ }, "dependencies": { "@atomone/chronostate": "^2.2.2", - "@atomone/event-consumer": "^1.0.2", + "@atomone/event-consumer": "^1.0.12", "@types/node": "^22.13.10", "amqplib": "^0.10.7", "tsx": "^4.19.3" diff --git a/packages/dislike-service/src/config.ts b/packages/dislike-service/src/config.ts index 4a97f013..667ff92b 100644 --- a/packages/dislike-service/src/config.ts +++ b/packages/dislike-service/src/config.ts @@ -9,7 +9,11 @@ export function useConfig(): EventConsumerConfig { config = { exchange: process.env.RABBITMQ_EXCHANGE || 'dither', - queue: 'Dislike', + queue: process.env.RABBITMQ_QUEUE || '', + dlxExchange: process.env.RABBITMQ_DLX_EXCHANGE || 'dither-dlx', + dlxQueue: process.env.RABBITMQ_DLX_QUEUE || 'dlx-queue', + logExchange: process.env.RABBITMQ_LOG_EXCHANGE || 'dither-log', + logQueue: process.env.RABBITMQ_LOG_QUEUE || 'log-queue', durable: true, rabbitMQEndpoint: process.env.RABBITMQ_ENDPOINT || 'amqp://localhost', }; diff --git a/packages/dislike-service/src/index.ts b/packages/dislike-service/src/index.ts index 38d74838..6bf53b88 100644 --- a/packages/dislike-service/src/index.ts +++ b/packages/dislike-service/src/index.ts @@ -3,6 +3,7 @@ import type amqplib from 'amqplib'; import { extractMemoContent } from '@atomone/chronostate'; import { EventConsumer } from '@atomone/event-consumer'; +import { HandlerResponse } from '@atomone/event-consumer/dist/consumer'; import { useConfig } from './config'; declare module '@atomone/chronostate' { @@ -37,16 +38,16 @@ const dislikesHandler = async (msg: amqplib.Message) => { }); if (rawResponse.status !== 200) { console.error('Error posting to API:', rawResponse); - return false; + return HandlerResponse.REJECT; } else { console.log(`dither.Dislike message processed successfully: ${parsedContent.hash}`); - return true; + return HandlerResponse.SUCCESS; } } catch (error) { console.error('Error processing message:', error); - return false; + return HandlerResponse.FAILURE; }; }; diff --git a/packages/event-consumer/.gitignore b/packages/event-consumer/.gitignore new file mode 100644 index 00000000..875c3698 --- /dev/null +++ b/packages/event-consumer/.gitignore @@ -0,0 +1,3 @@ +node_modules/ +dist +dist/ \ No newline at end of file diff --git a/packages/event-consumer/dist/consumer.d.ts b/packages/event-consumer/dist/consumer.d.ts deleted file mode 100644 index 2fdd6ed2..00000000 --- a/packages/event-consumer/dist/consumer.d.ts +++ /dev/null @@ -1,10 +0,0 @@ -import type amqplib from 'amqplib'; -import type { EventConsumerConfig } from './types'; -export declare class EventConsumer { - private channel; - private config; - private handler; - constructor(config: EventConsumerConfig, handler: (msg: amqplib.Message) => Promise); - connect(): Promise; - consume(): Promise; -} diff --git a/packages/event-consumer/dist/consumer.js b/packages/event-consumer/dist/consumer.js deleted file mode 100644 index d6311574..00000000 --- a/packages/event-consumer/dist/consumer.js +++ /dev/null @@ -1,37 +0,0 @@ -import amqplib from 'amqplib'; -export class EventConsumer { - channel; - config; - handler; - constructor(config, handler) { - this.config = config; - this.handler = handler; - } - - async connect() { - const conn = await amqplib.connect(this.config.rabbitMQEndpoint); - this.channel = await conn.createChannel(); - await this.channel.assertExchange(this.config.exchange, 'direct', { durable: this.config.durable }); - await this.channel.assertQueue(this.config.queue, { durable: this.config.durable }); - } - - async consume() { - this.channel.consume(this.config.queue, async (msg) => { - if (msg) { - try { - const res = await this.handler(msg); - if (res) { - this.channel.ack(msg); - } - else { - this.channel.nack(msg); - } - } - catch (error) { - console.error('Error processing message:', error); - this.channel.nack(msg); - } - } - }); - } -} diff --git a/packages/event-consumer/dist/index.d.ts b/packages/event-consumer/dist/index.d.ts deleted file mode 100644 index 5bf16565..00000000 --- a/packages/event-consumer/dist/index.d.ts +++ /dev/null @@ -1,2 +0,0 @@ -export { EventConsumer } from './consumer'; -export { EventConsumerConfig } from './types'; diff --git a/packages/event-consumer/dist/index.js b/packages/event-consumer/dist/index.js deleted file mode 100644 index 77d3cfd5..00000000 --- a/packages/event-consumer/dist/index.js +++ /dev/null @@ -1 +0,0 @@ -export { EventConsumer } from './consumer'; diff --git a/packages/event-consumer/dist/types.d.ts b/packages/event-consumer/dist/types.d.ts deleted file mode 100644 index 549a8b6d..00000000 --- a/packages/event-consumer/dist/types.d.ts +++ /dev/null @@ -1,6 +0,0 @@ -export type EventConsumerConfig = { - exchange: string; - queue: string; - durable: boolean; - rabbitMQEndpoint: string; -}; diff --git a/packages/event-consumer/dist/types.js b/packages/event-consumer/dist/types.js deleted file mode 100644 index cb0ff5c3..00000000 --- a/packages/event-consumer/dist/types.js +++ /dev/null @@ -1 +0,0 @@ -export {}; diff --git a/packages/event-consumer/package.json b/packages/event-consumer/package.json index 1f26d924..68df3b76 100644 --- a/packages/event-consumer/package.json +++ b/packages/event-consumer/package.json @@ -1,6 +1,6 @@ { "name": "@atomone/event-consumer", - "version": "1.0.7", + "version": "1.0.12", "description": "A package for consuming events from a RabbitMQ queue.", "main": "dist/index.js", "types": "dist/index.d.ts", @@ -22,6 +22,7 @@ "vitest": "^3.0.9" }, "dependencies": { + "@atomone/event-consumer": "link:", "@types/amqplib": "^0.10.7", "amqplib": "^0.10.7" } diff --git a/packages/event-consumer/src/consumer.ts b/packages/event-consumer/src/consumer.ts index 59442de6..7037ff32 100644 --- a/packages/event-consumer/src/consumer.ts +++ b/packages/event-consumer/src/consumer.ts @@ -2,11 +2,16 @@ import type { EventConsumerConfig } from './types'; import amqplib from 'amqplib'; +export enum HandlerResponse { + SUCCESS = 'success', + FAILURE = 'failure', + REJECT = 'reject', +} export class EventConsumer { private channel!: amqplib.Channel; private config: EventConsumerConfig; - private handler: (msg: amqplib.Message) => Promise; - constructor(config: EventConsumerConfig, handler: (msg: amqplib.Message) => Promise) { + private handler: (msg: amqplib.Message) => Promise; + constructor(config: EventConsumerConfig, handler: (msg: amqplib.Message) => Promise) { this.config = config; this.handler = handler.bind(this); } @@ -15,25 +20,41 @@ export class EventConsumer { const conn = await amqplib.connect(this.config.rabbitMQEndpoint); this.channel = await conn.createChannel(); await this.channel.assertExchange(this.config.exchange, 'direct', { durable: this.config.durable }); - await this.channel.assertQueue(this.config.queue, { durable: this.config.durable }); + await this.channel.assertExchange(this.config.logExchange, 'direct', { durable: this.config.durable }); + await this.channel.assertQueue(this.config.logQueue, { durable: this.config.durable }); + await this.channel.assertQueue(this.config.queue, { durable: this.config.durable, deadLetterExchange: this.config.dlxExchange }); } async consume() { this.channel.consume(this.config.queue, async (msg) => { if (msg) { - try { - const res = await this.handler(msg); - if (res) { - this.channel.ack(msg); + if (msg.properties.headers && msg.properties.headers['x-death'] && msg.properties.headers['x-death'].length > 0 && msg.properties.headers['x-death'][0].count > 3) { + console.log('Message has been retried more than 3 times, storing to investigate.'); + this.channel.ack(msg); + this.channel.publish(this.config.logExchange, this.config.logQueue, msg.content); + } + else { + try { + const res = await this.handler(msg); + switch (res) { + case HandlerResponse.SUCCESS: + this.channel.ack(msg); + break; + case HandlerResponse.REJECT: + this.channel.ack(msg); + this.channel.publish(this.config.logExchange, this.config.logQueue, msg.content); + break; + case HandlerResponse.FAILURE: + default: + this.channel.reject(msg, false); + break; + } } - else { - this.channel.nack(msg); + catch (error) { + console.error('Error processing message:', error); + this.channel.reject(msg, false); } } - catch (error) { - console.error('Error processing message:', error); - this.channel.nack(msg); - } } }); } diff --git a/packages/event-consumer/src/types.ts b/packages/event-consumer/src/types.ts index 549a8b6d..a783643a 100644 --- a/packages/event-consumer/src/types.ts +++ b/packages/event-consumer/src/types.ts @@ -1,5 +1,9 @@ export type EventConsumerConfig = { exchange: string; + dlxExchange: string; + logExchange: string; + dlxQueue: string; + logQueue: string; queue: string; durable: boolean; rabbitMQEndpoint: string; diff --git a/packages/flag-service/package.json b/packages/flag-service/package.json index 47b6d152..7f0d0c4b 100644 --- a/packages/flag-service/package.json +++ b/packages/flag-service/package.json @@ -11,7 +11,7 @@ }, "dependencies": { "@atomone/chronostate": "^2.2.2", - "@atomone/event-consumer": "^1.0.2", + "@atomone/event-consumer": "^1.0.12", "@types/node": "^22.13.10", "amqplib": "^0.10.7", "tsx": "^4.19.3" diff --git a/packages/flag-service/src/config.ts b/packages/flag-service/src/config.ts index dbefc69d..667ff92b 100644 --- a/packages/flag-service/src/config.ts +++ b/packages/flag-service/src/config.ts @@ -9,7 +9,11 @@ export function useConfig(): EventConsumerConfig { config = { exchange: process.env.RABBITMQ_EXCHANGE || 'dither', - queue: 'Flag', + queue: process.env.RABBITMQ_QUEUE || '', + dlxExchange: process.env.RABBITMQ_DLX_EXCHANGE || 'dither-dlx', + dlxQueue: process.env.RABBITMQ_DLX_QUEUE || 'dlx-queue', + logExchange: process.env.RABBITMQ_LOG_EXCHANGE || 'dither-log', + logQueue: process.env.RABBITMQ_LOG_QUEUE || 'log-queue', durable: true, rabbitMQEndpoint: process.env.RABBITMQ_ENDPOINT || 'amqp://localhost', }; diff --git a/packages/flag-service/src/index.ts b/packages/flag-service/src/index.ts index a9374fd2..c42542c2 100644 --- a/packages/flag-service/src/index.ts +++ b/packages/flag-service/src/index.ts @@ -3,6 +3,7 @@ import type amqplib from 'amqplib'; import { extractMemoContent } from '@atomone/chronostate'; import { EventConsumer } from '@atomone/event-consumer'; +import { HandlerResponse } from '@atomone/event-consumer/dist/consumer'; import { useConfig } from './config'; @@ -38,16 +39,16 @@ const flagsHandler = async (msg: amqplib.Message) => { }); if (rawResponse.status !== 200) { console.error('Error posting to API:', rawResponse); - return false; + return HandlerResponse.REJECT; } else { console.log(`dither.Flag message processed successfully: ${parsedContent.hash}`); - return true; + return HandlerResponse.SUCCESS; } } catch (error) { console.error('Error processing message:', error); - return false; + return HandlerResponse.FAILURE; }; }; diff --git a/packages/follow-service/package.json b/packages/follow-service/package.json index 64031034..8ac6213e 100644 --- a/packages/follow-service/package.json +++ b/packages/follow-service/package.json @@ -11,7 +11,7 @@ }, "dependencies": { "@atomone/chronostate": "^2.2.2", - "@atomone/event-consumer": "^1.0.2", + "@atomone/event-consumer": "^1.0.12", "@types/node": "^22.13.10", "amqplib": "^0.10.7", "tsx": "^4.19.3" diff --git a/packages/follow-service/src/config.ts b/packages/follow-service/src/config.ts index d3695902..667ff92b 100644 --- a/packages/follow-service/src/config.ts +++ b/packages/follow-service/src/config.ts @@ -9,7 +9,11 @@ export function useConfig(): EventConsumerConfig { config = { exchange: process.env.RABBITMQ_EXCHANGE || 'dither', - queue: 'Follow', + queue: process.env.RABBITMQ_QUEUE || '', + dlxExchange: process.env.RABBITMQ_DLX_EXCHANGE || 'dither-dlx', + dlxQueue: process.env.RABBITMQ_DLX_QUEUE || 'dlx-queue', + logExchange: process.env.RABBITMQ_LOG_EXCHANGE || 'dither-log', + logQueue: process.env.RABBITMQ_LOG_QUEUE || 'log-queue', durable: true, rabbitMQEndpoint: process.env.RABBITMQ_ENDPOINT || 'amqp://localhost', }; diff --git a/packages/follow-service/src/index.ts b/packages/follow-service/src/index.ts index 8dbee884..b2ab131c 100644 --- a/packages/follow-service/src/index.ts +++ b/packages/follow-service/src/index.ts @@ -3,6 +3,7 @@ import type amqplib from 'amqplib'; import { extractMemoContent } from '@atomone/chronostate'; import { EventConsumer } from '@atomone/event-consumer'; +import { HandlerResponse } from '@atomone/event-consumer/dist/consumer'; import { useConfig } from './config'; @@ -37,16 +38,16 @@ const followsHandler = async (msg: amqplib.Message) => { }); if (rawResponse.status !== 200) { console.error('Error posting to API:', rawResponse); - return false; + return HandlerResponse.REJECT; } else { console.log(`dither.Follow message processed successfully: ${parsedContent.hash}`); - return true; + return HandlerResponse.SUCCESS; } } catch (error) { console.error('Error processing message:', error); - return false; + return HandlerResponse.FAILURE; }; }; diff --git a/packages/like-service/package.json b/packages/like-service/package.json index 45f15830..807badff 100644 --- a/packages/like-service/package.json +++ b/packages/like-service/package.json @@ -11,7 +11,7 @@ }, "dependencies": { "@atomone/chronostate": "^2.2.2", - "@atomone/event-consumer": "^1.0.2", + "@atomone/event-consumer": "^1.0.12", "@types/node": "^22.13.10", "amqplib": "^0.10.7", "tsx": "^4.19.3" diff --git a/packages/like-service/src/config.ts b/packages/like-service/src/config.ts index 6190d903..667ff92b 100644 --- a/packages/like-service/src/config.ts +++ b/packages/like-service/src/config.ts @@ -9,7 +9,11 @@ export function useConfig(): EventConsumerConfig { config = { exchange: process.env.RABBITMQ_EXCHANGE || 'dither', - queue: 'Like', + queue: process.env.RABBITMQ_QUEUE || '', + dlxExchange: process.env.RABBITMQ_DLX_EXCHANGE || 'dither-dlx', + dlxQueue: process.env.RABBITMQ_DLX_QUEUE || 'dlx-queue', + logExchange: process.env.RABBITMQ_LOG_EXCHANGE || 'dither-log', + logQueue: process.env.RABBITMQ_LOG_QUEUE || 'log-queue', durable: true, rabbitMQEndpoint: process.env.RABBITMQ_ENDPOINT || 'amqp://localhost', }; diff --git a/packages/like-service/src/index.ts b/packages/like-service/src/index.ts index c4f23ed7..bf8dcef6 100644 --- a/packages/like-service/src/index.ts +++ b/packages/like-service/src/index.ts @@ -3,6 +3,7 @@ import type amqplib from 'amqplib'; import { extractMemoContent } from '@atomone/chronostate'; import { EventConsumer } from '@atomone/event-consumer'; +import { HandlerResponse } from '@atomone/event-consumer/dist/consumer'; import { useConfig } from './config'; @@ -39,16 +40,16 @@ const likesHandler = async (msg: amqplib.Message) => { }); if (rawResponse.status !== 200) { console.error('Error posting to API:', rawResponse); - return false; + return HandlerResponse.REJECT; } else { console.log(`dither.Like message processed successfully: ${parsedContent.hash}`); - return true; + return HandlerResponse.SUCCESS; } } catch (error) { console.error('Error processing message:', error); - return false; + return HandlerResponse.FAILURE; }; }; diff --git a/packages/post-remove-service/package.json b/packages/post-remove-service/package.json index ef6cd3bd..cb2742c7 100644 --- a/packages/post-remove-service/package.json +++ b/packages/post-remove-service/package.json @@ -11,7 +11,7 @@ }, "dependencies": { "@atomone/chronostate": "^2.2.2", - "@atomone/event-consumer": "^1.0.2", + "@atomone/event-consumer": "^1.0.12", "@types/node": "^22.13.10", "amqplib": "^0.10.7", "tsx": "^4.19.3" diff --git a/packages/post-remove-service/src/config.ts b/packages/post-remove-service/src/config.ts index 2155ada3..667ff92b 100644 --- a/packages/post-remove-service/src/config.ts +++ b/packages/post-remove-service/src/config.ts @@ -9,7 +9,11 @@ export function useConfig(): EventConsumerConfig { config = { exchange: process.env.RABBITMQ_EXCHANGE || 'dither', - queue: 'Remove', + queue: process.env.RABBITMQ_QUEUE || '', + dlxExchange: process.env.RABBITMQ_DLX_EXCHANGE || 'dither-dlx', + dlxQueue: process.env.RABBITMQ_DLX_QUEUE || 'dlx-queue', + logExchange: process.env.RABBITMQ_LOG_EXCHANGE || 'dither-log', + logQueue: process.env.RABBITMQ_LOG_QUEUE || 'log-queue', durable: true, rabbitMQEndpoint: process.env.RABBITMQ_ENDPOINT || 'amqp://localhost', }; diff --git a/packages/post-remove-service/src/index.ts b/packages/post-remove-service/src/index.ts index 14b422ac..573379df 100644 --- a/packages/post-remove-service/src/index.ts +++ b/packages/post-remove-service/src/index.ts @@ -3,6 +3,7 @@ import type amqplib from 'amqplib'; import { extractMemoContent } from '@atomone/chronostate'; import { EventConsumer } from '@atomone/event-consumer'; +import { HandlerResponse } from '@atomone/event-consumer/dist/consumer'; import { useConfig } from './config'; @@ -37,16 +38,16 @@ const postsHandler = async (msg: amqplib.Message) => { }); if (rawResponse.status !== 200) { console.error('Error posting to API:', rawResponse); - return false; + return HandlerResponse.REJECT; } else { console.log(`dither.Remove message processed successfully: ${parsedContent.hash}`); - return true; + return HandlerResponse.SUCCESS; } } catch (error) { console.error('Error processing message:', error); - return false; + return HandlerResponse.FAILURE; }; }; diff --git a/packages/post-service/package.json b/packages/post-service/package.json index 6a933770..0bb876a5 100644 --- a/packages/post-service/package.json +++ b/packages/post-service/package.json @@ -11,7 +11,7 @@ }, "dependencies": { "@atomone/chronostate": "^2.2.2", - "@atomone/event-consumer": "^1.0.2", + "@atomone/event-consumer": "^1.0.12", "@types/node": "^22.13.10", "amqplib": "^0.10.7", "tsx": "^4.19.3" diff --git a/packages/post-service/src/config.ts b/packages/post-service/src/config.ts index 63d5b59b..667ff92b 100644 --- a/packages/post-service/src/config.ts +++ b/packages/post-service/src/config.ts @@ -9,7 +9,11 @@ export function useConfig(): EventConsumerConfig { config = { exchange: process.env.RABBITMQ_EXCHANGE || 'dither', - queue: 'Post', + queue: process.env.RABBITMQ_QUEUE || '', + dlxExchange: process.env.RABBITMQ_DLX_EXCHANGE || 'dither-dlx', + dlxQueue: process.env.RABBITMQ_DLX_QUEUE || 'dlx-queue', + logExchange: process.env.RABBITMQ_LOG_EXCHANGE || 'dither-log', + logQueue: process.env.RABBITMQ_LOG_QUEUE || 'log-queue', durable: true, rabbitMQEndpoint: process.env.RABBITMQ_ENDPOINT || 'amqp://localhost', }; diff --git a/packages/post-service/src/index.ts b/packages/post-service/src/index.ts index 9493f385..61d2de6b 100644 --- a/packages/post-service/src/index.ts +++ b/packages/post-service/src/index.ts @@ -3,6 +3,7 @@ import type amqplib from 'amqplib'; import { extractMemoContent } from '@atomone/chronostate'; import { EventConsumer } from '@atomone/event-consumer'; +import { HandlerResponse } from '@atomone/event-consumer/dist/consumer'; import { useConfig } from './config'; @@ -38,16 +39,16 @@ const postsHandler = async (msg: amqplib.Message) => { }); if (rawResponse.status !== 200) { console.error('Error posting to API:', rawResponse); - return false; + return HandlerResponse.REJECT; } else { console.log(`dither.Post message processed successfully: ${parsedContent.hash}`); - return true; + return HandlerResponse.SUCCESS; } } catch (error) { console.error('Error processing message:', error); - return false; + return HandlerResponse.FAILURE; }; }; diff --git a/packages/rabbit-mq/Dockerfile b/packages/rabbit-mq/Dockerfile index c1708a6f..ccc2afef 100644 --- a/packages/rabbit-mq/Dockerfile +++ b/packages/rabbit-mq/Dockerfile @@ -1,3 +1,6 @@ FROM rabbitmq:latest RUN rabbitmq-plugins enable rabbitmq_management +RUN echo "management_agent.disable_metrics_collector = false" > /etc/rabbitmq/conf.d/20-management_agent.disable_metrics_collector.conf +#RUN rabbitmqctl set_policy DLX "dither.*" '{"dead-letter-exchange": "dither-dlx"}' --apply-to queues --priority 7 +#RUN rabbitmqctl set_policy PUSHBACK "dlx-queue" '{"dead-letter-exchange": "dither","message-ttl": 10000}' --apply-to queues --priority 7 \ No newline at end of file diff --git a/packages/reader-main/src/event-config.ts b/packages/reader-main/src/event-config.ts index 402d2563..710bcd4a 100644 --- a/packages/reader-main/src/event-config.ts +++ b/packages/reader-main/src/event-config.ts @@ -9,6 +9,10 @@ export function useEventConfig(): EventConfig { config = { exchange: process.env.RABBITMQ_EXCHANGE || 'dither', + dlxExchange: process.env.RABBITMQ_DLX_EXCHANGE || 'dither-dlx', + dlxQueue: process.env.RABBITMQ_DLX_QUEUE || 'dlx-queue', + logExchange: process.env.RABBITMQ_LOG_EXCHANGE || 'dither-log', + logQueue: process.env.RABBITMQ_LOG_QUEUE || 'log-queue', durable: true, rabbitMQEndpoint: process.env.RABBITMQ_ENDPOINT || 'amqp://localhost', }; diff --git a/packages/reader-main/src/index.ts b/packages/reader-main/src/index.ts index 96f1ab70..15d65299 100644 --- a/packages/reader-main/src/index.ts +++ b/packages/reader-main/src/index.ts @@ -45,7 +45,7 @@ async function handleAction(action: Action) { if (action.memo.startsWith(config.MEMO_PREFIX + actionType)) { const transfer = getTransferMessage(action.messages as Array); const quantity = getTransferQuantities(action.messages as Array); - await channel.publish(eventConfig.exchange, actionType, Buffer.from(JSON.stringify({ sender: transfer?.from_address, quantity: quantity, ...action }))); + await channel.publish(eventConfig.exchange, eventConfig.exchange + '.' + actionType, Buffer.from(JSON.stringify({ sender: transfer?.from_address, quantity: quantity, ...action }))); break; } else { @@ -67,10 +67,18 @@ export async function start() { const conn = await amqplib.connect(eventConfig.rabbitMQEndpoint); channel = await conn.createChannel(); const exchange = eventConfig.exchange; + const dlxExchange = eventConfig.dlxExchange; + const logExchange = eventConfig.logExchange; await channel.assertExchange(exchange, 'direct', { durable: eventConfig.durable }); + await channel.assertExchange(logExchange, 'direct', { durable: eventConfig.durable }); + await channel.assertExchange(dlxExchange, 'fanout', { durable: eventConfig.durable }); + await channel.assertQueue(eventConfig.dlxQueue, { durable: eventConfig.durable, deadLetterExchange: exchange, messageTtl: 1000 * 10 }); + await channel.bindQueue(eventConfig.dlxQueue, dlxExchange, '*'); + await channel.assertQueue(eventConfig.logQueue, { durable: eventConfig.durable }); + await channel.bindQueue(eventConfig.logQueue, logExchange, eventConfig.logQueue); for (const actionType of actionTypes) { - await channel.assertQueue(actionType, { durable: eventConfig.durable }); - await channel.bindQueue(actionType, exchange, actionType); + await channel.assertQueue(exchange + '.' + actionType, { durable: eventConfig.durable, deadLetterExchange: dlxExchange }); + await channel.bindQueue(exchange + '.' + actionType, exchange, exchange + '.' + actionType); } state = new ChronoState({ ...config }); state.onLastBlock(handleLastBlock); diff --git a/packages/reader-main/src/types.ts b/packages/reader-main/src/types.ts index ce0e0c9a..d45933cb 100644 --- a/packages/reader-main/src/types.ts +++ b/packages/reader-main/src/types.ts @@ -14,6 +14,10 @@ export type MsgGeneric = { export type EventConfig = { exchange: string; + dlxExchange: string; + logExchange: string; + logQueue: string; + dlxQueue: string; durable: boolean; rabbitMQEndpoint: string; }; diff --git a/packages/reply-service/package.json b/packages/reply-service/package.json index 371c5bce..40b93071 100644 --- a/packages/reply-service/package.json +++ b/packages/reply-service/package.json @@ -11,7 +11,7 @@ }, "dependencies": { "@atomone/chronostate": "^2.2.2", - "@atomone/event-consumer": "^1.0.2", + "@atomone/event-consumer": "^1.0.12", "@types/node": "^22.13.10", "amqplib": "^0.10.7", "tsx": "^4.19.3" diff --git a/packages/reply-service/src/config.ts b/packages/reply-service/src/config.ts index 3a5b0cc8..667ff92b 100644 --- a/packages/reply-service/src/config.ts +++ b/packages/reply-service/src/config.ts @@ -9,7 +9,11 @@ export function useConfig(): EventConsumerConfig { config = { exchange: process.env.RABBITMQ_EXCHANGE || 'dither', - queue: 'Reply', + queue: process.env.RABBITMQ_QUEUE || '', + dlxExchange: process.env.RABBITMQ_DLX_EXCHANGE || 'dither-dlx', + dlxQueue: process.env.RABBITMQ_DLX_QUEUE || 'dlx-queue', + logExchange: process.env.RABBITMQ_LOG_EXCHANGE || 'dither-log', + logQueue: process.env.RABBITMQ_LOG_QUEUE || 'log-queue', durable: true, rabbitMQEndpoint: process.env.RABBITMQ_ENDPOINT || 'amqp://localhost', }; diff --git a/packages/reply-service/src/index.ts b/packages/reply-service/src/index.ts index 5b91e81a..2cf2a28f 100644 --- a/packages/reply-service/src/index.ts +++ b/packages/reply-service/src/index.ts @@ -3,6 +3,7 @@ import type amqplib from 'amqplib'; import { extractMemoContent } from '@atomone/chronostate'; import { EventConsumer } from '@atomone/event-consumer'; +import { HandlerResponse } from '@atomone/event-consumer/dist/consumer'; import { useConfig } from './config'; @@ -39,16 +40,16 @@ const repliesHandler = async (msg: amqplib.Message) => { }); if (rawResponse.status !== 200) { console.error('Error posting to API:', rawResponse); - return false; + return HandlerResponse.REJECT; } else { console.log(`dither.Reply message processed successfully: ${parsedContent.hash}`); - return true; + return HandlerResponse.SUCCESS; } } catch (error) { console.error('Error processing message:', error); - return false; + return HandlerResponse.FAILURE; }; }; diff --git a/packages/unfollow-service/package.json b/packages/unfollow-service/package.json index 6a933770..0bb876a5 100644 --- a/packages/unfollow-service/package.json +++ b/packages/unfollow-service/package.json @@ -11,7 +11,7 @@ }, "dependencies": { "@atomone/chronostate": "^2.2.2", - "@atomone/event-consumer": "^1.0.2", + "@atomone/event-consumer": "^1.0.12", "@types/node": "^22.13.10", "amqplib": "^0.10.7", "tsx": "^4.19.3" diff --git a/packages/unfollow-service/src/config.ts b/packages/unfollow-service/src/config.ts index 9be9e6e3..667ff92b 100644 --- a/packages/unfollow-service/src/config.ts +++ b/packages/unfollow-service/src/config.ts @@ -9,7 +9,11 @@ export function useConfig(): EventConsumerConfig { config = { exchange: process.env.RABBITMQ_EXCHANGE || 'dither', - queue: 'Unfollow', + queue: process.env.RABBITMQ_QUEUE || '', + dlxExchange: process.env.RABBITMQ_DLX_EXCHANGE || 'dither-dlx', + dlxQueue: process.env.RABBITMQ_DLX_QUEUE || 'dlx-queue', + logExchange: process.env.RABBITMQ_LOG_EXCHANGE || 'dither-log', + logQueue: process.env.RABBITMQ_LOG_QUEUE || 'log-queue', durable: true, rabbitMQEndpoint: process.env.RABBITMQ_ENDPOINT || 'amqp://localhost', }; diff --git a/packages/unfollow-service/src/index.ts b/packages/unfollow-service/src/index.ts index 53d9dea7..82c0eae2 100644 --- a/packages/unfollow-service/src/index.ts +++ b/packages/unfollow-service/src/index.ts @@ -3,6 +3,7 @@ import type amqplib from 'amqplib'; import { extractMemoContent } from '@atomone/chronostate'; import { EventConsumer } from '@atomone/event-consumer'; +import { HandlerResponse } from '@atomone/event-consumer/dist/consumer'; import { useConfig } from './config'; @@ -38,16 +39,16 @@ const unfollowsHandler = async (msg: amqplib.Message) => { }); if (rawResponse.status !== 200) { console.error('Error posting to API:', rawResponse); - return false; + return HandlerResponse.REJECT; } else { console.log(`dither.Unfollow message processed successfully: ${parsedContent.hash}`); - return true; + return HandlerResponse.SUCCESS; } } catch (error) { console.error('Error processing message:', error); - return false; + return HandlerResponse.FAILURE; }; }; diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 824f7ffe..f3b97933 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -112,13 +112,38 @@ importers: specifier: ^3.1.2 version: 3.1.3(@types/node@22.15.17)(jiti@2.4.2)(lightningcss@1.29.2)(tsx@4.19.4) + packages/dead-letter-service: + dependencies: + '@types/amqplib': + specifier: ^0.10.7 + version: 0.10.7 + amqplib: + specifier: ^0.10.7 + version: 0.10.8 + dead-letter-service: + specifier: 'link:' + version: 'link:' + devDependencies: + '@atomone/event-consumer': + specifier: ^1.0.12 + version: 1.0.12 + '@types/node': + specifier: ^22.13.10 + version: 22.15.17 + typescript: + specifier: ^5.8.2 + version: 5.8.3 + vitest: + specifier: ^3.0.9 + version: 3.1.3(@types/node@22.15.17)(jiti@2.4.2)(lightningcss@1.29.2)(tsx@4.19.4) + packages/dislike-service: dependencies: '@atomone/chronostate': specifier: ^2.2.2 version: 2.2.2 '@atomone/event-consumer': - specifier: ^1.0.2 + specifier: ^1.0.12 version: 1.0.12 '@types/node': specifier: ^22.13.10 @@ -139,6 +164,9 @@ importers: packages/event-consumer: dependencies: + '@atomone/event-consumer': + specifier: 'link:' + version: 'link:' '@types/amqplib': specifier: ^0.10.7 version: 0.10.7 @@ -162,7 +190,7 @@ importers: specifier: ^2.2.2 version: 2.2.2 '@atomone/event-consumer': - specifier: ^1.0.2 + specifier: ^1.0.12 version: 1.0.12 '@types/node': specifier: ^22.13.10 @@ -187,7 +215,7 @@ importers: specifier: ^2.2.2 version: 2.2.2 '@atomone/event-consumer': - specifier: ^1.0.2 + specifier: ^1.0.12 version: 1.0.12 '@types/node': specifier: ^22.13.10 @@ -303,7 +331,7 @@ importers: specifier: ^2.2.2 version: 2.2.2 '@atomone/event-consumer': - specifier: ^1.0.2 + specifier: ^1.0.12 version: 1.0.12 '@types/node': specifier: ^22.13.10 @@ -328,7 +356,7 @@ importers: specifier: ^2.2.2 version: 2.2.2 '@atomone/event-consumer': - specifier: ^1.0.2 + specifier: ^1.0.12 version: 1.0.12 '@types/node': specifier: ^22.13.10 @@ -353,7 +381,7 @@ importers: specifier: ^2.2.2 version: 2.2.2 '@atomone/event-consumer': - specifier: ^1.0.2 + specifier: ^1.0.12 version: 1.0.12 '@types/node': specifier: ^22.13.10 @@ -403,7 +431,7 @@ importers: specifier: ^2.2.2 version: 2.2.2 '@atomone/event-consumer': - specifier: ^1.0.2 + specifier: ^1.0.12 version: 1.0.12 '@types/node': specifier: ^22.13.10 @@ -450,7 +478,7 @@ importers: specifier: ^2.2.2 version: 2.2.2 '@atomone/event-consumer': - specifier: ^1.0.2 + specifier: ^1.0.12 version: 1.0.12 '@types/node': specifier: ^22.13.10