|
| 1 | +# Event Sourcing with Kafka and ksqlDB |
| 2 | + |
| 3 | +* [Introduction](#0b79795d3efc95b9976c7c5b933afce2) |
| 4 | +* [Example Domain](#8753dff3c2879207fa06ef1844b1ea4d) |
| 5 | +* [Event Sourcing and CQRS 101](#19025f75ca30ec4a46f55a6b9afdeba6) |
| 6 | + * [State-Oriented Persistence](#436b314e78fec59a76bad8b93b52ee75) |
| 7 | + * [Event Sourcing](#c4b3d1c8edab1825366ac1d541d8226f) |
| 8 | + * [CQRS](#b2cf9293622451d86574d2973398ca70) |
| 9 | + * [Advantages of Event Sourcing and CQRS](#d8818c2c5ba0364540a49273f684b85c) |
| 10 | +* [Requirements for Event Store](#70b356f41293ace9df0d04cd8175ac35) |
| 11 | +* [Solution Architecture](#9f6302143996033ebb94d536b860acc3) |
| 12 | + * [Permanent Storage](#205928bf89c3012be2e11d1e5e7ad01f) |
| 13 | + * [Optimistic concurrency control](#6eec4db0e612f3a70dab7d96c463e8f6) |
| 14 | + * [Loading current state](#323effe18de24bcc666f161931c903f3) |
| 15 | + * [Subscribe to all events by aggregate type](#784ff5dca3b046266edf61637822bbff) |
| 16 | + * [Checkpoints](#0b584912c4fa746206884e080303ed49) |
| 17 | + * [Drawbacks](#0cfc0523189294ac086e11c8e286ba2d) |
| 18 | +* [How to Run the Sample?](#53af957fc9dc9f7083531a00fe3f364e) |
| 19 | + |
| 20 | +## <a name="0b79795d3efc95b9976c7c5b933afce2"></a>Introduction |
| 21 | + |
| 22 | +PostgreSQL is the world's most advanced open source database. Also, PostgreSQL is suitable for Event |
| 23 | +Sourcing. |
| 24 | + |
| 25 | +This repository provides a sample of event sourced system that uses PostgreSQL as event store. |
| 26 | + |
| 27 | + |
| 28 | + |
| 29 | +## <a name="8753dff3c2879207fa06ef1844b1ea4d"></a>Example Domain |
| 30 | + |
| 31 | +The example domain is ride hailing. |
| 32 | + |
| 33 | +* A rider can place an order for a ride along a route specifying a price. |
| 34 | +* A driver can accept and complete an order. |
| 35 | +* An order can be cancelled before completion. |
| 36 | + |
| 37 | +## <a name="19025f75ca30ec4a46f55a6b9afdeba6"></a>Event Sourcing and CQRS 101 |
| 38 | + |
| 39 | +### <a name="436b314e78fec59a76bad8b93b52ee75"></a>State-Oriented Persistence |
| 40 | + |
| 41 | + |
| 42 | + |
| 43 | +### <a name="c4b3d1c8edab1825366ac1d541d8226f"></a>Event Sourcing |
| 44 | + |
| 45 | +Event sourcing persists the state of an entity as a sequence of immutable state-changing events. |
| 46 | + |
| 47 | + |
| 48 | + |
| 49 | +Whenever the state of an entity changes, a new event is appended to the list of events. |
| 50 | + |
| 51 | + |
| 52 | + |
| 53 | +Current state of an entity can be restored by replaying all its events. |
| 54 | + |
| 55 | +An entity in event sourcing is also referenced as an aggregate. |
| 56 | + |
| 57 | +### <a name="b2cf9293622451d86574d2973398ca70"></a>CQRS |
| 58 | + |
| 59 | +CQRS (Command-query responsibility segregation) stands for segregating the responsibility between |
| 60 | +commands (write requests) and queries (read requests). The write requests and the read requests are |
| 61 | +processed by different handlers. |
| 62 | + |
| 63 | +A command generates zero or more events or results in an error. |
| 64 | + |
| 65 | + |
| 66 | + |
| 67 | +Event sourcing is usually used in conjunction with CQRS. |
| 68 | + |
| 69 | + |
| 70 | + |
| 71 | +Events in event sourcing are a part of a bounded context and should not be used "as-is" for |
| 72 | +integration with other bounded contexts. Integration events representing the current state of an |
| 73 | +aggregate should be used for communication between bounded contexts instead of a raw event sourcing |
| 74 | +change events. |
| 75 | + |
| 76 | +### <a name="d8818c2c5ba0364540a49273f684b85c"></a>Advantages of Event Sourcing and CQRS |
| 77 | + |
| 78 | +* Having a true history of the system (audit and traceability). |
| 79 | +* Ability to put the system in any prior state (e.g. for debugging). |
| 80 | +* Read-side projections can be created as needed (later) from events. It allows responding to future |
| 81 | + needs and new requirements. |
| 82 | +* Independent scaling. CQRS allows We can scale the read and write databases independently of each |
| 83 | + other. |
| 84 | +* Optimized data schema for read database (e.g. the read databases can be denormalized). |
| 85 | +* Simpler queries (e.g. complex `JOIN` operations can be avoided). |
| 86 | + |
| 87 | +## <a name="70b356f41293ace9df0d04cd8175ac35"></a>Requirements for Event Store |
| 88 | + |
| 89 | +* Permanent storage. Store events forever. |
| 90 | +* Optimistic concurrency control. Prevent lost update anomaly (write-write conflicts). |
| 91 | +* Loading current state. Loading all previous events for the particular aggregate ID from an event |
| 92 | + store. |
| 93 | +* Subscribe to all events by aggregate type. Instead of subscribing to a single event stream that |
| 94 | + represents an aggregate. |
| 95 | +* Checkpoints. Store the event offset (a position in the stream) after handling it. Subscribe from |
| 96 | + the last known position instead of the stream start after the application restart. |
| 97 | + |
| 98 | +## <a name="9f6302143996033ebb94d536b860acc3"></a>Solution Architecture |
| 99 | + |
| 100 | +TBD |
| 101 | + |
| 102 | +### <a name="205928bf89c3012be2e11d1e5e7ad01f"></a>Permanent Storage |
| 103 | + |
| 104 | +TBD |
| 105 | + |
| 106 | +### <a name="6eec4db0e612f3a70dab7d96c463e8f6"></a>Optimistic concurrency control |
| 107 | + |
| 108 | +TBD |
| 109 | + |
| 110 | +### <a name="323effe18de24bcc666f161931c903f3"></a>Loading current state |
| 111 | + |
| 112 | +TBD |
| 113 | + |
| 114 | +### <a name="784ff5dca3b046266edf61637822bbff"></a>Subscribe to all events by aggregate type |
| 115 | + |
| 116 | +Dual writes problem occur when you need to update the database **and** send the message |
| 117 | +reliable/atomically and 2-phase commit (2PC) is not an option. |
| 118 | + |
| 119 | +PostgreSQL doesn't allow subscribing on changes, so the solution to the dual writes problem is |
| 120 | +Transactional outbox pattern. An *outbox* table is used to keep track of handled events. Outbox |
| 121 | +handler (aka *message relay* and *polling publisher*) processes new events by polling the |
| 122 | +database's *outbox* table. Event processing includes updating the read model and publishing |
| 123 | +integration events. |
| 124 | + |
| 125 | +### <a name="0b584912c4fa746206884e080303ed49"></a>Checkpoints |
| 126 | + |
| 127 | +TBD |
| 128 | + |
| 129 | +### <a name="0cfc0523189294ac086e11c8e286ba2d"></a>Drawbacks |
| 130 | + |
| 131 | +Polling database's *outbox* table for new messages with fixed delay introduces a big lag (delay |
| 132 | +between polls) in eventual consistency between the write and read models. |
| 133 | + |
| 134 | +**In case of outbox handler failure duplicates and out of order events are possible.** |
| 135 | + |
| 136 | +Consumers of integration events should be idempotent and filter duplicates and out of order |
| 137 | +integration events. |
| 138 | + |
| 139 | +If your system can't accept even small chance of duplicates or unordering, then persistent |
| 140 | +subscription listener must be extracted into a separate microservice and run in a single replica ( |
| 141 | +`.spec.replicas=1` in Kubernetes). This microservice must not be updated using RollingUpdate |
| 142 | +Deployment strategy. Recreate Deployment strategy must be used |
| 143 | +instead (`.spec.strategy.type=Recreate` |
| 144 | +in Kubernetes) when all existing Pods are killed before new ones are created. |
| 145 | + |
| 146 | +## <a name="53af957fc9dc9f7083531a00fe3f364e"></a>How to Run the Sample? |
| 147 | + |
| 148 | +1. Download & installOpenJDK 11 (LTS) at [AdoptOpenJDK](https://adoptopenjdk.net/). |
| 149 | + |
| 150 | +2. Download and install [Docker](https://docs.docker.com/engine/install/) |
| 151 | + and [Docker Compose](https://docs.docker.com/compose/install/). |
| 152 | + |
| 153 | +3. Build Java project and Docker image |
| 154 | + ```bash |
| 155 | + ./gradlew clean build jibDockerBuild -i |
| 156 | + ``` |
| 157 | + |
| 158 | +4. Run Kafka, ksqlDB and event-sourcing-app |
| 159 | + ```bash |
| 160 | + docker-compose up -d --scale event-sourcing-app=2 |
| 161 | + # wait a few minutes |
| 162 | + ``` |
| 163 | +5. Follow the logs of the application |
| 164 | + ```bash |
| 165 | + docker-compose logs -f event-sourcing-app |
| 166 | + ``` |
| 167 | +6. Install [curl](https://curl.se/) and [jq](https://stedolan.github.io/jq/). |
| 168 | + |
| 169 | +7. Run [`test.sh`](test.sh) script and see the output. |
| 170 | + |
| 171 | +The `test.sh` script has the following instructions: |
| 172 | + |
| 173 | +1. Place new order. |
| 174 | + ```bash |
| 175 | + ORDER_ID=$(curl -s -X POST http://localhost:8080/orders/ -d '{"riderId":"63770803-38f4-4594-aec2-4c74918f7165","price":"123.45","route":[{"address":"Київ, вулиця Полярна, 17А","lat":50.51980052414157,"lon":30.467197278948536},{"address":"Київ, вулиця Новокостянтинівська, 18В","lat":50.48509161169076,"lon":30.485170724431292}]}' -H 'Content-Type: application/json' | jq -r .orderId) |
| 176 | + sleep 2s |
| 177 | + ``` |
| 178 | +2. Get the placed order. |
| 179 | + ```bash |
| 180 | + curl -s -X GET http://localhost:8080/orders/$ORDER_ID | jq |
| 181 | + ``` |
| 182 | + ```json |
| 183 | + { |
| 184 | + "id": "466aafd1-288c-4299-be26-3be0c9c5aef1", |
| 185 | + "version": 1, |
| 186 | + "status": "PLACED", |
| 187 | + "riderId": "63770803-38f4-4594-aec2-4c74918f7165", |
| 188 | + "price": 123.45, |
| 189 | + "route": [ |
| 190 | + { |
| 191 | + "address": "Київ, вулиця Полярна, 17А", |
| 192 | + "lat": 50.51980052414157, |
| 193 | + "lon": 30.467197278948536 |
| 194 | + }, |
| 195 | + { |
| 196 | + "address": "Київ, вулиця Новокостянтинівська, 18В", |
| 197 | + "lat": 50.48509161169076, |
| 198 | + "lon": 30.485170724431292 |
| 199 | + } |
| 200 | + ], |
| 201 | + "placedDate": "2021-04-25T16:51:52.680374Z" |
| 202 | + } |
| 203 | + ``` |
| 204 | +3. Accept the order. |
| 205 | + ```bash |
| 206 | + curl -s -X PATCH http://localhost:8080/orders/$ORDER_ID -d '{"status":"ACCEPTED","driverId":"2c068a1a-9263-433f-a70b-067d51b98378","version":1}' -H 'Content-Type: application/json' |
| 207 | + sleep 2s |
| 208 | + ``` |
| 209 | +4. Get the accepted order. |
| 210 | + ```bash |
| 211 | + curl -s -X GET http://localhost:8080/orders/$ORDER_ID | jq |
| 212 | + ``` |
| 213 | + ```json |
| 214 | + { |
| 215 | + "id": "466aafd1-288c-4299-be26-3be0c9c5aef1", |
| 216 | + "version": 2, |
| 217 | + "status": "ACCEPTED", |
| 218 | + "riderId": "63770803-38f4-4594-aec2-4c74918f7165", |
| 219 | + "price": 123.45, |
| 220 | + "route": [ |
| 221 | + { |
| 222 | + "address": "Київ, вулиця Полярна, 17А", |
| 223 | + "lat": 50.51980052414157, |
| 224 | + "lon": 30.467197278948536 |
| 225 | + }, |
| 226 | + { |
| 227 | + "address": "Київ, вулиця Новокостянтинівська, 18В", |
| 228 | + "lat": 50.48509161169076, |
| 229 | + "lon": 30.485170724431292 |
| 230 | + } |
| 231 | + ], |
| 232 | + "driverId": "2c068a1a-9263-433f-a70b-067d51b98378", |
| 233 | + "placedDate": "2021-04-25T16:51:52.680374Z", |
| 234 | + "acceptedDate": "2021-04-25T16:51:55.114824Z" |
| 235 | + } |
| 236 | + ``` |
| 237 | +5. Try to cancel an outdated version of the order to simulate lost update. |
| 238 | + ```bash |
| 239 | + curl -s -X PATCH http://localhost:8080/orders/$ORDER_ID -d '{"status":"CANCELLED","version":1}' -H 'Content-Type: application/json' | jq |
| 240 | + ``` |
| 241 | + ```json |
| 242 | + { |
| 243 | + "error": "Actual revision 1 doesn't match expected revision 0" |
| 244 | + } |
| 245 | + ``` |
| 246 | +6. Try to cancel a version of the order 'from the future' to simulate unordering. |
| 247 | + ```bash |
| 248 | + curl -s -X PATCH http://localhost:8080/orders/$ORDER_ID -d '{"status":"CANCELLED","version":3}' -H 'Content-Type: application/json' | jq |
| 249 | + ``` |
| 250 | + ```json |
| 251 | + { |
| 252 | + "error": "Actual revision 1 doesn't match expected revision 2" |
| 253 | + } |
| 254 | + ``` |
| 255 | +7. Complete the order. |
| 256 | + ```bash |
| 257 | + curl -s -X PATCH http://localhost:8080/orders/$ORDER_ID -d '{"status":"COMPLETED","version":2}' -H 'Content-Type: application/json' |
| 258 | + sleep 2s |
| 259 | + ``` |
| 260 | +8. Get the completed order. |
| 261 | + ```bash |
| 262 | + curl -s -X GET http://localhost:8080/orders/$ORDER_ID | jq |
| 263 | + ``` |
| 264 | + ```json |
| 265 | + { |
| 266 | + "id": "466aafd1-288c-4299-be26-3be0c9c5aef1", |
| 267 | + "version": 3, |
| 268 | + "status": "COMPLETED", |
| 269 | + "riderId": "63770803-38f4-4594-aec2-4c74918f7165", |
| 270 | + "price": 123.45, |
| 271 | + "route": [ |
| 272 | + { |
| 273 | + "address": "Київ, вулиця Полярна, 17А", |
| 274 | + "lat": 50.51980052414157, |
| 275 | + "lon": 30.467197278948536 |
| 276 | + }, |
| 277 | + { |
| 278 | + "address": "Київ, вулиця Новокостянтинівська, 18В", |
| 279 | + "lat": 50.48509161169076, |
| 280 | + "lon": 30.485170724431292 |
| 281 | + } |
| 282 | + ], |
| 283 | + "driverId": "2c068a1a-9263-433f-a70b-067d51b98378", |
| 284 | + "placedDate": "2021-04-25T16:51:52.680374Z", |
| 285 | + "acceptedDate": "2021-04-25T16:51:55.114824Z", |
| 286 | + "completedDate": "2021-04-25T16:51:57.314153Z" |
| 287 | + } |
| 288 | + ``` |
| 289 | +9. Try to cancel a completed order to simulate business rule violation. |
| 290 | + ```bash |
| 291 | + curl -s -X PATCH http://localhost:8080/orders/$ORDER_ID -d '{"status":"CANCELLED","version":3}' -H 'Content-Type: application/json' | jq |
| 292 | + ``` |
| 293 | + ```json |
| 294 | + { |
| 295 | + "error": "Order in status COMPLETED can't be cancelled" |
| 296 | + } |
| 297 | + ``` |
| 298 | +10. Print integration events. |
| 299 | + ```bash |
| 300 | + docker exec -it kafka /bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic order-integration-events --from-beginning --property print.key=true --timeout-ms 3000 |
| 301 | + ``` |
| 302 | + ``` |
| 303 | + 466aafd1-288c-4299-be26-3be0c9c5aef1 {"order_id":"466aafd1-288c-4299-be26-3be0c9c5aef1","event_type":"OrderPlacedEvent","event_timestamp":1619369512680,"version":1,"status":"PLACED","rider_id":"63770803-38f4-4594-aec2-4c74918f7165","price":123.45,"route":[{"ADDRESS":"Київ, вулиця Полярна, 17А","LAT":50.51980052414157,"LON":30.467197278948536},{"ADDRESS":"Київ, вулиця Новокостянтинівська, 18В","LAT":50.48509161169076,"LON":30.485170724431292}]} |
| 304 | + 466aafd1-288c-4299-be26-3be0c9c5aef1 {"order_id":"466aafd1-288c-4299-be26-3be0c9c5aef1","event_type":"OrderAcceptedEvent","event_timestamp":1619369515114,"version":2,"status":"ACCEPTED","rider_id":"63770803-38f4-4594-aec2-4c74918f7165","price":123.45,"route":[{"ADDRESS":"Київ, вулиця Полярна, 17А","LAT":50.51980052414157,"LON":30.467197278948536},{"ADDRESS":"Київ, вулиця Новокостянтинівська, 18В","LAT":50.48509161169076,"LON":30.485170724431292}],"driver_id":"2c068a1a-9263-433f-a70b-067d51b98378"} |
| 305 | + 466aafd1-288c-4299-be26-3be0c9c5aef1 {"order_id":"466aafd1-288c-4299-be26-3be0c9c5aef1","event_type":"OrderCompletedEvent","event_timestamp":1619369517314,"version":3,"status":"COMPLETED","rider_id":"63770803-38f4-4594-aec2-4c74918f7165","price":123.45,"route":[{"ADDRESS":"Київ, вулиця Полярна, 17А","LAT":50.51980052414157,"LON":30.467197278948536},{"ADDRESS":"Київ, вулиця Новокостянтинівська, 18В","LAT":50.48509161169076,"LON":30.485170724431292}],"driver_id":"2c068a1a-9263-433f-a70b-067d51b98378"} |
| 306 | + ``` |
| 307 | + |
| 308 | + |
0 commit comments