Skip to content
Merged
17 changes: 8 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,10 @@ const contract = defineContract({
});

// 6. Client - type-safe publishing with explicit error handling
const clientResult = await TypedAmqpClient.create({ contract, connection });
if (clientResult.isError()) {
throw clientResult.error; // or handle error appropriately
}
const client = clientResult.get();
const client = await TypedAmqpClient.create({
contract,
urls: ["amqp://localhost"],
}).resultToPromise();

const result = await client.publish("orderCreated", {
orderId: "ORD-123", // ✅ TypeScript knows!
Expand All @@ -107,8 +106,8 @@ const worker = await TypedAmqpWorker.create({
console.log(message.orderId); // ✅ TypeScript knows!
},
},
connection,
});
urls: ["amqp://localhost"],
}).resultToPromise();
```

> **Note**: If your application both publishes and consumes messages, see the [Architecture Review](docs/review/2025-12-25-architecture-review.md#3-connection-sharing-analysis) for connection sharing strategies to optimize resource usage.
Expand Down Expand Up @@ -144,12 +143,12 @@ import { contract } from "./contract";
console.log("Processing:", message.orderId);
},
},
connection: "amqp://localhost",
urls: ["amqp://localhost"],
}),
// Client for publishing messages
AmqpClientModule.forRoot({
contract,
connection: "amqp://localhost",
urls: ["amqp://localhost"],
}),
],
})
Expand Down
14 changes: 8 additions & 6 deletions docs/TERMINOLOGY.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,9 @@ When implementing the contract, we use our terms:

```typescript
// Client = runtime publisher
const client = await TypedAmqpClient.create({ contract, urls });
await client.publish("orderCreated", message);
const client = await TypedAmqpClient.create({ contract, urls }).resultToPromise();

await client.publish("orderCreated", message).resultToPromise();

// Worker = runtime consumer
const worker = await TypedAmqpWorker.create({
Expand All @@ -92,7 +93,7 @@ const worker = await TypedAmqpWorker.create({
},
},
urls,
});
}).resultToPromise();
```

These terms (`TypedAmqpClient`, `TypedAmqpWorker`) describe the **runtime components** that implement the contract.
Expand Down Expand Up @@ -148,14 +149,15 @@ await publisher.publish(exchange, routingKey, message);
const consumer = await createConsumer(queue, handler);

// amqp-contract uses:
const client = await TypedAmqpClient.create({ contract, urls });
await client.publish("orderCreated", message);
const client = await TypedAmqpClient.create({ contract, urls }).resultToPromise();

await client.publish("orderCreated", message).resultToPromise();

const worker = await TypedAmqpWorker.create({
contract,
handlers: { processOrder: handler },
urls,
});
}).resultToPromise();
```

The functionality is identical; only the naming differs.
Expand Down
5 changes: 3 additions & 2 deletions docs/adr/002-separate-packages.md
Original file line number Diff line number Diff line change
Expand Up @@ -305,8 +305,9 @@ Each can be used independently or together.
```typescript
import { TypedAmqpClient } from '@amqp-contract/client';

const client = await TypedAmqpClient.create({ contract, urls });
await client.publish('orderCreated', { ... });
const client = await TypedAmqpClient.create({ contract, urls }).resultToPromise();

await client.publish('orderCreated', { ... }).resultToPromise();
```

**Use case**: API services, webhook handlers, event publishers
Expand Down
6 changes: 3 additions & 3 deletions docs/blog/introducing-amqp-contract.md
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ TypeScript catches errors before runtime:

```typescript
// ❌ TypeScript error - "orderDeleted" doesn't exist in contract
await client.publish("orderDeleted", { orderId: "123" });
await client.publish("orderDeleted", { orderId: "123" }).resultToPromise();

// ❌ TypeScript error - missing handler for "processOrder"
await TypedAmqpWorker.create({
Expand Down Expand Up @@ -316,12 +316,12 @@ import { contract } from "./contract";
console.log("Processing:", message.orderId);
},
},
connection: process.env.RABBITMQ_URL,
urls: [process.env.RABBITMQ_URL],
}),
// Client module for publishing messages
AmqpClientModule.forRoot({
contract,
connection: process.env.RABBITMQ_URL,
urls: [process.env.RABBITMQ_URL],
}),
],
})
Expand Down
162 changes: 94 additions & 68 deletions docs/guide/client-nestjs-usage.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ import { contract } from "./contract";
imports: [
AmqpClientModule.forRoot({
contract,
connection: "amqp://localhost",
urls: ["amqp://localhost"],
}),
],
})
Expand All @@ -121,7 +121,7 @@ export class OrderService {
async createOrder(customerId: string, amount: number, items: any[]) {
const orderId = this.generateOrderId();

const result = this.client.publish("orderCreated", {
const result = await this.client.publish("orderCreated", {
orderId,
customerId,
amount,
Expand Down Expand Up @@ -177,30 +177,53 @@ That's it! The client automatically connects when the application starts and dis

## Configuration with Environment Variables

Use `@nestjs/config` for environment-based configuration:
Use `@nestjs/config` with `registerAs` and Zod for type-safe configuration:

```typescript
import { Module } from "@nestjs/common";
import { ConfigModule, ConfigService } from "@nestjs/config";
import { ConfigModule } from "@nestjs/config";
import { AmqpClientModule } from "@amqp-contract/client-nestjs";
import { contract } from "./contract";
import { amqpConfig } from "./config/amqp.config";

@Module({
imports: [
ConfigModule.forRoot(),
ConfigModule.forRoot({
load: [amqpConfig],
}),
AmqpClientModule.forRootAsync({
imports: [ConfigModule],
useFactory: (configService: ConfigService) => ({
useFactory: () => ({
contract,
connection: configService.get("RABBITMQ_URL") || "amqp://localhost",
urls: [amqpConfig().url],
}),
inject: [ConfigService],
}),
],
})
export class AppModule {}
```

Create a config file with Zod validation:

```typescript
// config/amqp.config.ts
import { registerAs } from "@nestjs/config";
import { z } from "zod";

const amqpConfigSchema = z.object({
url: z.string().url().default("amqp://localhost"),
});

export const amqpConfig = registerAs("amqp", () => {
const config = amqpConfigSchema.parse({
url: process.env.RABBITMQ_URL,
});
return config;
});

export type AmqpConfig = z.infer<typeof amqpConfigSchema>;
```

Then set the environment variable:

```bash
Expand Down Expand Up @@ -476,10 +499,13 @@ export const orderRouter = initServer.router({

const orderId = generateOrderId();

await client.publish("orderCreated", {
orderId,
...input,
});
await client
.publish("orderCreated", {
orderId,
...input,
})
.mapError((error) => new Error(`Failed to publish order: ${error.message}`))
.resultToPromise();

return {
orderId,
Expand All @@ -499,7 +525,7 @@ import { OrderService } from "./order.service";
imports: [
AmqpClientModule.forRoot({
contract,
connection: "amqp://localhost",
urls: ["amqp://localhost"],
}),
],
controllers: [OrderController],
Expand Down Expand Up @@ -559,47 +585,53 @@ export class OrderEventService {
async publishOrderCreated(order: any) {
this.logger.log(`Publishing OrderCreated event for ${order.orderId}`);

await this.client.publish("orderCreated", order, {
persistent: true,
headers: {
"event-type": "OrderCreated",
"event-version": "1.0",
"aggregate-id": order.orderId,
timestamp: new Date().toISOString(),
},
});
await this.client
.publish("orderCreated", order, {
persistent: true,
headers: {
"event-type": "OrderCreated",
"event-version": "1.0",
"aggregate-id": order.orderId,
timestamp: new Date().toISOString(),
},
})
.resultToPromise();
}

async publishOrderUpdated(order: any) {
this.logger.log(`Publishing OrderUpdated event for ${order.orderId}`);

await this.client.publish("orderUpdated", order, {
persistent: true,
headers: {
"event-type": "OrderUpdated",
"event-version": "1.0",
"aggregate-id": order.orderId,
timestamp: new Date().toISOString(),
},
});
await this.client
.publish("orderUpdated", order, {
persistent: true,
headers: {
"event-type": "OrderUpdated",
"event-version": "1.0",
"aggregate-id": order.orderId,
timestamp: new Date().toISOString(),
},
})
.resultToPromise();
}

async publishOrderCancelled(orderId: string) {
this.logger.log(`Publishing OrderCancelled event for ${orderId}`);

await this.client.publish(
"orderCancelled",
{ orderId },
{
persistent: true,
headers: {
"event-type": "OrderCancelled",
"event-version": "1.0",
"aggregate-id": orderId,
timestamp: new Date().toISOString(),
await this.client
.publish(
"orderCancelled",
{ orderId },
{
persistent: true,
headers: {
"event-type": "OrderCancelled",
"event-version": "1.0",
"aggregate-id": orderId,
timestamp: new Date().toISOString(),
},
},
},
);
)
.resultToPromise();
}
}
```
Expand All @@ -614,7 +646,7 @@ Use multiple clients for different domains:
imports: [
AmqpClientModule.forRoot({
contract: orderContract,
connection: "amqp://localhost",
urls: ["amqp://localhost"],
}),
],
providers: [OrderService, OrderController],
Expand All @@ -627,7 +659,7 @@ export class OrderModule {}
imports: [
AmqpClientModule.forRoot({
contract: paymentContract,
connection: "amqp://localhost",
urls: ["amqp://localhost"],
}),
],
providers: [PaymentService, PaymentController],
Expand Down Expand Up @@ -837,8 +869,8 @@ export class OrderService {

this.logger.log(`Creating order ${orderId} for customer ${customerId}`);

try {
await this.client.publish(
await this.client
.publish(
"orderCreated",
{
orderId,
Expand All @@ -854,23 +886,15 @@ export class OrderService {
"x-customer-id": customerId,
},
},
);

this.logger.log(`Order ${orderId} published successfully`);
return { orderId };
} catch (error: unknown) {
if (error instanceof Error) {
this.logger.error(`Failed to publish order ${orderId}`, error.stack);
} else {
this.logger.error(`Failed to publish order ${orderId}`);
}
)
.mapError((error) => {
this.logger.error(`Failed to publish order ${orderId}`, error);
return new Error(`Failed to create order: ${error.message}`);
})
.resultToPromise();

// Handle schema validation errors
if (this.isValidationError(error)) {
throw new BadRequestException("Invalid order data");
}
throw error;
}
this.logger.log(`Order ${orderId} published successfully`);
return { orderId };
}

private isValidationError(error: unknown): error is { issues: unknown } {
Expand Down Expand Up @@ -916,22 +940,24 @@ export class OrderController {

```typescript [app.module.ts]
import { Module } from "@nestjs/common";
import { ConfigModule, ConfigService } from "@nestjs/config";
import { ConfigModule } from "@nestjs/config";
import { AmqpClientModule } from "@amqp-contract/client-nestjs";
import { contract } from "./contract";
import { OrderService } from "./order.service";
import { OrderController } from "./order.controller";
import { amqpConfig } from "./config/amqp.config";

@Module({
imports: [
ConfigModule.forRoot(),
ConfigModule.forRoot({
load: [amqpConfig],
}),
AmqpClientModule.forRootAsync({
imports: [ConfigModule],
useFactory: (configService: ConfigService) => ({
useFactory: () => ({
contract,
connection: configService.get("RABBITMQ_URL") || "amqp://localhost",
urls: [amqpConfig().url],
}),
inject: [ConfigService],
}),
],
controllers: [OrderController],
Expand Down
Loading
Loading