Releases: event-driven-io/emmett
0.42.0
🚀 What's New
- Added the blue-green strategy for PostgreSQL projection rebuilds and locking for multiple instances by @oskardudycz in #286, #299.
Now, when you start async consumers with processors, or run rebuildPostgreSQLProjections to rebuild projections, Emmett ensures that only a single instance of each processor is running. By default, the processor id is equal to emt:processor:projector:${options.projectionName}, but it can be set explicitly. What matters most is that all instances of the same processor type share the same id, because:
- it guarantees that only a single instance is running,
- progress is tracked correctly, so processing can resume where it left off.
The lock is kept for a set period, and lock acquisition is retried automatically. You can fine-tune the default settings by passing lock options, e.g.:
```ts
// This allows taking over processing when no other instance
// has updated the processor progress for one minute.
// If you pass 0, takeover always happens immediately.
const oneMinute = 60;

// This is the same as the default rebuild policy; you can customise it.
// You can also choose to fail or skip acquisition, stopping the processor
// when another processor is active.
const defaultRebuildLockPolicy: LockAcquisitionPolicy = {
  type: 'retry',
  retries: 100,
  minTimeout: 100,
  maxTimeout: 5000,
};

const consumer = rebuildPostgreSQLProjections({
  connectionString,
  projection,
  lock: { timeoutSeconds: oneMinute, acquisitionPolicy: defaultRebuildLockPolicy },
});
```
You can pass the same settings to regular async processors.
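To illustrate the semantics of the retry policy, here's a minimal, self-contained sketch; the `acquireWithRetry` helper and the exponential backoff are assumptions for illustration, not Emmett's actual implementation. The idea: acquisition is attempted, and on failure retried up to `retries` times with a delay bounded by `minTimeout` and `maxTimeout` (in milliseconds).

```ts
// Minimal sketch of a 'retry' lock acquisition loop (illustrative only,
// not Emmett's implementation).
type RetryLockPolicy = {
  type: 'retry';
  retries: number;
  minTimeout: number; // ms
  maxTimeout: number; // ms
};

const acquireWithRetry = async (
  tryAcquire: () => Promise<boolean>,
  policy: RetryLockPolicy,
): Promise<boolean> => {
  for (let attempt = 0; attempt <= policy.retries; attempt++) {
    if (await tryAcquire()) return true;
    // Exponential backoff capped at maxTimeout.
    const backoff = Math.min(policy.minTimeout * 2 ** attempt, policy.maxTimeout);
    await new Promise((resolve) => setTimeout(resolve, backoff));
  }
  return false;
};
```

A policy of `{ retries: 100, minTimeout: 100, maxTimeout: 5000 }` thus keeps an instance patiently waiting for its turn, rather than failing fast.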
You can do an in-place rebuild, truncating and rebuilding the same storage location, by providing a different evolve method:
```ts
const shoppingCartsSummaryProjectionNew = pongoSingleStreamProjection({
  collectionName: shoppingCartsSummaryCollectionName,
  evolve: evolveV2,
  canHandle: ['ProductItemAdded', 'ShoppingCartConfirmed'],
  initialState: () => ({
    status: 'pending',
    productItemsCount: 0,
  }),
});
```
Or (recommended) specify a new projection version, which adds a version suffix to the Pongo collection name, targeting a different table:
```ts
const shoppingCartsSummaryProjectionV2 = pongoSingleStreamProjection({
  collectionName: shoppingCartsSummaryCollectionName,
  version: 2, // <= new version
  evolve: evolveV2,
  canHandle: ['ProductItemAdded', 'ShoppingCartConfirmed'],
  initialState: () => ({
    status: 'pending',
    productItemsCount: 0,
  }),
});
```
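As a rough illustration of why the version matters: the version becomes part of the target collection name, so the old and new read models can live side by side during a rebuild. The helper below and the `_v${version}` suffix are assumptions for illustration only; the exact naming scheme is a Pongo implementation detail.

```ts
// Hypothetical helper illustrating versioned collection naming.
// The actual suffix format used by Pongo/Emmett may differ.
const versionedCollectionName = (name: string, version?: number): string =>
  version === undefined ? name : `${name}_v${version}`;
```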
Read more on the details in Rebuilding Event-Driven Read Models in a safe and resilient way.
- Implemented initial events schema versioning with upcasting and downcasting by @oskardudycz in #292.
This feature enables not only event payload evolution, but also mapping of types that JSON doesn't support natively, like Date, BigInt, etc.
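A quick, self-contained illustration of that JSON limitation (the type and function names here are hypothetical, not Emmett's API): Date and BigInt don't survive JSON serialization as-is, so the stored payload holds strings, and an upcast restores the richer types.

```ts
// Stored (JSON-safe) shape vs. the domain shape (illustrative types).
type StoredPayload = { addedAt: string; quantity: string };
type DomainPayload = { addedAt: Date; quantity: bigint };

const upcastPayload = (raw: StoredPayload): DomainPayload => ({
  addedAt: new Date(raw.addedAt), // ISO string -> Date
  quantity: BigInt(raw.quantity), // numeric string -> BigInt
});
```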
For instance, having the following events payload:
```ts
type PricedProductItem = { productId: string; quantity: number; price: number };

type ProductItemAdded = Event<
  'ProductItemAdded',
  { productItem: PricedProductItem }
>;
type DiscountApplied = Event<'DiscountApplied', { percent: number }>;

type ShoppingCartEvent = ProductItemAdded | DiscountApplied;
```
You can also define the payload structure as:
```ts
type ProductItemPayload = {
  productId: string;
  quantity: string;
  price: string;
};

type ProductItemAddedFromPayload = Event<
  'ProductItemAdded',
  { productItem: ProductItemPayload }
>;
type DiscountAppliedFromPayload = Event<'DiscountApplied', { percent: string }>;

type ShoppingCartEventFromPayload =
  | ProductItemAddedFromPayload
  | DiscountAppliedFromPayload;
```
Then define the upcast strategy and use it in e.g. the CommandHandler:
```ts
const upcast = (event: ShoppingCartEventFromPayload): ShoppingCartEvent => {
  switch (event.type) {
    case 'ProductItemAdded': {
      return {
        type: 'ProductItemAdded',
        data: {
          productItem: {
            productId: event.data.productItem.productId,
            quantity: Number(event.data.productItem.quantity),
            price: Number(event.data.productItem.price),
          },
        },
      };
    }
    case 'DiscountApplied': {
      return {
        type: 'DiscountApplied',
        data: { percent: Number(event.data.percent) },
      };
    }
    default:
      return event as ShoppingCartEvent;
  }
};

const handleCommandWithUpcast = CommandHandler({
  evolve,
  initialState,
  schema: { versioning: { upcast } },
});
```
This keeps the regular evolve method strongly typed and not polluted by type mapping, e.g.:
```ts
const evolve = (
  state: ShoppingCart,
  { type, data }: ShoppingCartEvent,
): ShoppingCart => {
  switch (type) {
    case 'ProductItemAdded': {
      const productItem = data.productItem;
      return {
        productItems: [...state.productItems, productItem],
        totalAmount:
          state.totalAmount + productItem.price * productItem.quantity,
      };
    }
    case 'DiscountApplied':
      return {
        ...state,
        totalAmount: state.totalAmount * (1 - data.percent / 100),
      };
  }
};
```
You can version the same way in projections, e.g.:
```ts
const shoppingCartsSummaryProjection = pongoMultiStreamProjection({
  // (...) the rest of the definition
  eventsOptions: {
    schema: { versioning: { upcast } },
  },
});
```
Read more in:
- Added Hono.js integration by @dawidhermann in #278, @oskardudycz in #280.
You can use it as described in the Express.js integration section of the docs, respecting Hono.js specifics, by setting up the application:
```ts
import { getApplication } from '@event-driven-io/emmett-honojs';

const app = getApplication({ apis: [shoppingCartApi(eventStore)] });
```
Then routes, e.g.:
```ts
export const shoppingCartApi =
  (eventStore: EventStore<ReadEventMetadataWithGlobalPosition>) =>
  (router: Hono) => {
    // Open Shopping cart
    router.post(
      '/clients/:clientId/shopping-carts/',
      async (context: Context) => {
        const clientId = assertNotEmptyString(context.req.param('clientId'));
        const shoppingCartId = clientId;

        const result = await handle(
          eventStore,
          shoppingCartId,
          {
            type: 'OpenShoppingCart',
            data: { clientId, shoppingCartId, now: new Date() },
          },
          { expectedStreamVersion: STREAM_DOES_NOT_EXIST },
        );

        return Created({
          context,
          createdId: shoppingCartId,
          eTag: toWeakETag(result.nextExpectedStreamVersion),
        });
      },
    );

    // (...) other routes
  };
```
You can also do e2e and integration tests, e.g.:
```ts
import { ApiSpecification } from '@event-driven-io/emmett-honojs';

void describe('ShoppingCart', () => {
  let clientId: string;
  let shoppingCartId: string;

  beforeEach(() => {
    clientId = randomUUID();
    shoppingCartId = `shopping_cart:${clientId}:current`;
  });

  void describe('When empty', () => {
    void it('should open shopping cart', () => {
      return given()
        .when((request) => {
          return request
            .post(`/clients/${clientId}/shopping-carts/`)
            .send(productItem);
        })
        .then([expectResponse(201)]);
    });

    void it('should NOT add product item', () => {
      return given()
        .when((request) => {
          return request
            .post(
              `/clients/${clientId}/shopping-carts/${shoppingCartId}/product-items`,
            )
            .set({ [HeaderNames.IF_MATCH]: toWeakETag(0) })
            .send(productItem);
        })
        .then([expectResponse(403)]);
    });
  });

  const given = ApiSpecification.for<
    ShoppingCartEvent,
    EventStore<ReadEventMetadataWithGlobalPosition>
  >(
    () => getInMemoryEventStore(),
    (eventStore) =>
      getApplication({
        apis: [shoppingCartApi(eventStore)],
      }),
  );
});
```
- Added streamExists for all event store implementations. This is useful for pre-condition checks to determine whether the referenced stream exists. by @talyssonoc in #288.
Example usage:
```ts
const exists = await eventStore.streamExists(streamName);
```
- Added basic graceful shutdown capability and plugged it into processors. This is important to release processors' locks and other managed resources. It was already possible through the consumer's close method, but now it'll also be handled automatically by handling the SIGINT and SIGTERM signals internally by @oskardudycz in #293, #287
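Conceptually, the automatic handling boils down to something like the sketch below; `registerGracefulShutdown` and the `Closeable` type are hypothetical names for illustration, not Emmett's internals.

```ts
import { EventEmitter } from 'node:events';

type Closeable = { close: () => Promise<void> };

// Wire SIGINT/SIGTERM to the consumer's close method so locks and other
// managed resources are released on shutdown. Taking the emitter as a
// parameter (in production: the global `process`) keeps the sketch testable.
const registerGracefulShutdown = (
  consumer: Closeable,
  signals: EventEmitter,
): void => {
  const shutdown = () => void consumer.close();
  signals.once('SIGINT', shutdown);
  signals.once('SIGTERM', shutdown);
};
```

Using `once` rather than `on` ensures the close logic runs a single time even if the signal is delivered repeatedly.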
- Improved event store migrations resiliency and developer experience by enabling specifying a timeout, doing a dry run, and skipping the migration hash check. Some migrations (like those from this release) can take longer, so you can now specify a custom timeout:
```ts
const threeMinutes = 3 * 60 * 1000;

await eventStore.schema.migrate({
  mi...
```
0.41.0
🚀 What's New
- Added inline projection initialisation for PostgreSQL and SQLite event stores. Now they're called together with regular event store initialisation! by @oskardudycz in #276
- Added SQLite Event Store samples. See them here. It also contains examples of an advanced raw SQL projection with multiple tables. See it here. by @oskardudycz in #276
- Removed the need to pass the SQLite connection to the projection integration test setup. Now, if you don't provide it, it'll use an in-memory database. You can also just pass the file name. by @oskardudycz in #276
- Added a simple SQLConnectionPool to make it easier to reuse connections. It'll be replaced with the Dumbo one once the new version is out and integrated. by @oskardudycz in #276
📝 What's Changed
- Removed Dumbo usage in the SQLite package. This is an intermediate fix; it'll be brought back when work on refactoring Pongo and Dumbo is finished. by @oskardudycz in #276
Full Changelog: 0.40.0...0.41.0
0.40.0
🚀 What's New
- Added MongoDB Consumer based on the MongoDB Change Streams subscription. This is a first beta version of MongoDB consumers using Change Streams. It works, but we still need to test it more and ensure that it's resilient and stable. It requires a clustered setup and MongoDB 5 or newer. Example usage:
```ts
const consumer = mongoDBEventStoreConsumer({
  connectionString,
  processors: [
    inMemoryReactor<NumberRecorded>({
      processorId: 'test processor',
      eachMessage: (event) => {
        console.log(`I just handled an event! ${JSONSerializer.serialize(event)}`);
      },
    }),
  ],
});

await consumer.start();
```
See more in tests for reactors and projections.
By @arturwojnar with a bit of help from @oskardudycz in #258, #271
📝 What's Changed
- Exported SQLite projections It appears that we somehow missed it 🤦 by @oskardudycz in #272
🎉 New Contributors
- @arturwojnar made their first contribution in #258
Full Changelog: 0.39.1...0.40.0
0.39.1
📝 What's Changed
- Fixed Emmett error checks and mapping It seems that because of transpilation, TypeScript settings, Node.js version, or moon phases, instanceof may or may not work for Error types. Used structural checking instead by @oskardudycz in #270
Full Changelog: 0.39.0...0.39.1
0.39.0
📝 What's Changed
- BREAKING: Removed the dependency on web-streams-polyfill and removed the emmett-shims package. The initial idea of using streaming shims was to provide streaming support for the event store, both on the web and in Node.js. Still, this polyfill dependency can cause compatibility issues with other tools, like Svelte. For now, I moved the streaming code to emmett-postgresql so we wouldn't lose it, but it'll eventually be removed from there and moved to a dedicated package. Also removed the stream function from InMemoryEventStore, which I think/hope no one used. by @oskardudycz in #269
Full Changelog: 0.38.7...0.39.0
0.38.7
📝 What's Changed
- Fixed PostgreSQL Event Store automatic migration from pre-0.38.0 versions. It appeared that PostgreSQL cannot replace a function if the difference is in the result type. 🤷 Added integration tests against a snapshotted version of the schema to ensure that such bugs will be caught in the future by @oskardudycz in #268
Full Changelog: 0.38.6...0.38.7
0.38.6
📝 What's Changed
- All command handler options are now passed to stream aggregation options. Partition was missing, which wasn't an issue for the current implementations, as they always use the default partition for now, but this could cause issues for custom implementations using partitioning. by @wataruoguchi in #265
🥳 New Contributors
- @wataruoguchi made their first contribution in #265
Full Changelog: 0.38.5...0.38.6
0.38.5
📝 What's Changed
- Fixed the deepEquals setup to cover a wider range of cases. Now it should work for dates, sets, maps and other types in a more rigid way. by @oskardudycz in #262
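For context, structural comparison needs explicit handling for such types, since `===` compares object references. A condensed sketch of the idea (this is an illustration, not Emmett's actual deepEquals):

```ts
// Simplified structural equality with special cases for Date, Set and Map.
// Set members are compared by reference here; a real implementation goes deeper.
const deepEquals = (a: unknown, b: unknown): boolean => {
  if (a instanceof Date && b instanceof Date) return a.getTime() === b.getTime();
  if (a instanceof Set && b instanceof Set)
    return a.size === b.size && [...a].every((v) => b.has(v));
  if (a instanceof Map && b instanceof Map)
    return (
      a.size === b.size &&
      [...a].every(([k, v]) => b.has(k) && deepEquals(b.get(k), v))
    );
  if (typeof a === 'object' && a !== null && typeof b === 'object' && b !== null) {
    const keysA = Object.keys(a);
    const keysB = Object.keys(b);
    return (
      keysA.length === keysB.length &&
      keysA.every((k) =>
        deepEquals(
          (a as Record<string, unknown>)[k],
          (b as Record<string, unknown>)[k],
        ),
      )
    );
  }
  return Object.is(a, b);
};
```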
Full Changelog: 0.38.4...0.38.5
0.38.4
📝 What's Changed
- Improved PostgreSQL Event Store performance. Ensured that the information that migrations were run is preserved throughout event store creation in the command handler; previously this caused redundant attempts to create the schema again. Improved the check for an existing stream during append to avoid a potential full scan instead of an index check. by @oskardudycz in #259 and #261
📚 Docs
- Added read models documentation to getting started by @oskardudycz in #254
Full Changelog: 0.38.3...0.38.4
0.38.3
📝 What's Changed
- Fixed ESDB reactors to handle events sequentially by @oskardudycz in #252
- Fixed the InMemoryDatabase replace method to correctly select the document to replace by @oskardudycz in #252
- Made reactors skip events that they have already processed. This is done by checking if the message checkpoint is greater than the last processed one. by @oskardudycz in #252
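The skip logic described above amounts to a simple guard like this (a sketch of the described check, not the exact Emmett code):

```ts
// Process a message only if its checkpoint is beyond the last processed one.
const shouldProcess = (
  messageCheckpoint: bigint,
  lastProcessed: bigint | null,
): boolean => lastProcessed === null || messageCheckpoint > lastProcessed;
```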
Full Changelog: 0.38.2...0.38.3