Skip to content

Commit 7d5927f

Browse files
authored
Merge pull request #70 from dbmdz/refactoring
Refactor Flusswerk example
2 parents 2387d3d + f828df1 commit 7d5927f

File tree

29 files changed

+333
-933
lines changed

29 files changed

+333
-933
lines changed

.github/workflows/ci.yml

Lines changed: 6 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,18 @@
1+
---
12
name: CI
23

34
on:
45
pull_request:
5-
branches: [ main ]
6+
branches: [main]
67
push:
7-
branches: [ main ]
8+
branches: [main]
89

910
jobs:
1011
build:
1112
runs-on: ubuntu-latest
1213
strategy:
1314
matrix:
14-
java: [11, 17]
15+
java: [25]
1516
steps:
1617
- uses: actions/checkout@v3
1718
- name: Set up JDK
@@ -26,35 +27,7 @@ jobs:
2627
with:
2728
path: ~/.m2/repository
2829
key: ${{ runner.os }}-maven-${{ hashFiles('**/pom.xml') }}
30+
- name: Get main for spotless
31+
run: git fetch origin main
2932
- name: Build with Maven
3033
run: mvn -B verify
31-
publish:
32-
if: github.event_name == 'push' && contains(github.ref, 'main')
33-
runs-on: ubuntu-latest
34-
needs: build
35-
steps:
36-
- uses: actions/checkout@v3
37-
- name: Set up JDK 11
38-
uses: actions/setup-java@v3
39-
with:
40-
check-latest: true
41-
distribution: temurin
42-
java-version: 11
43-
server-id: ossrh-snapshots
44-
server-username: MAVEN_USERNAME
45-
server-password: MAVEN_PASSWORD
46-
- uses: actions/cache@v3
47-
env:
48-
cache-name: cache-maven-artifacts
49-
with:
50-
path: ~/.m2/repository
51-
key: ${{ runner.os }}-maven-${{ hashFiles('**/pom.xml') }}
52-
- name: Install XML utils
53-
run: sudo apt update && sudo apt install libxml2-utils
54-
- name: Set project version
55-
run: echo "PROJECT_VERSION=$(xmllint --xpath '/*[local-name()="project"]/*[local-name()="version"]/text()' pom.xml)" >> $GITHUB_ENV
56-
- name: Publish to the Maven Central Repository
57-
run: if [[ "$PROJECT_VERSION" =~ .*SNAPSHOT ]]; then mvn -B deploy; fi
58-
env:
59-
MAVEN_USERNAME: ${{ secrets.OSSRH_USERNAME }}
60-
MAVEN_PASSWORD: ${{ secrets.OSSRH_PASSWORD }}

README.md

Lines changed: 20 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -1,74 +1,17 @@
11
# Flusswerk Application Example
2-
This application simulates a simple indexing application to show the Flusswerk framework in action.
3-
42
## Overview
53

6-
The data processing logic is split in three components:
7-
8-
The [`Reader`](src/main/java/com/github/dbmdz/flusswerk/example/flow/Reader.java) receives a message from RabbitMQ and loads the corresponding document (as instance of `Document`).
9-
10-
```java
11-
@Component
12-
public class Reader implements Function<IndexMessage, Document> {
13-
14-
@Override
15-
public Document apply(IndexMessage indexMessage) {
16-
Document document;
17-
try {
18-
document = loadDocument(indexMessage.getItemId());
19-
} catch (IOException exception) {
20-
throw new StopProcessingException(
21-
"Could not load document for id {}", indexMessage.getItemId())
22-
.causedBy(exception);
23-
}
24-
return document;
25-
}
26-
27-
private Document loadDocument(String itemId) throws IOException {
28-
// ...
29-
}
30-
31-
}
32-
```
33-
34-
The [`Transformer`](src/main/java/com/github/dbmdz/flusswerk/example/flow/Transformer.java) then takes the document and builds the required data for the Indexing API (an `IndexDocument`):
35-
36-
```java
37-
@Component
38-
public class Transformer implements Function<Document, IndexDocument> {
4+
This application demonstrates the Flusswerk framework in action. A Flusswerk application typically takes an incoming message from a RabbitMQ queue, does something, and then may send any number of messages to other topics for further processing.
395

40-
@Override
41-
public IndexDocument apply(Document document) {
42-
IndexDocument indexDocument = new IndexDocument();
43-
// ...
44-
return indexDocument;
45-
}
46-
}
47-
```
48-
49-
50-
The [`Writer`](src/main/java/com/github/dbmdz/flusswerk/example/flow/Writer.java) finally takes the processed data, writes it to the Indexing API and sends messages to notify the next processing application.
6+
The data processing logic is in [DemoProcessor](src/main/java/dev/mdz/flusswerk/example/flow/DemoProcessor.java). Usually all kinds of things can go wrong when processing data: Data might be corrupt, backends fail,... Flusswerk provides a simple way to handle these error cases with special exceptions.
517

52-
```java
53-
@Component
54-
public class Writer implements Function<IndexDocument, Message> {
8+
We simulate the three general error cases with the field `issue` in the incoming message:
559

56-
private static final Logger LOGGER = LoggerFactory.getLogger(Writer.class);
57-
58-
@Override
59-
public Message apply(IndexDocument indexDocument) {
60-
String id = (String) indexDocument.get("id");
61-
try {
62-
sendToSearchService(indexDocument);
63-
} catch (Exception exception) {
64-
throw new RetryProcessingException(
65-
"Could not index document for id %s, will try again later", id)
66-
.causedBy(exception);
67-
}
68-
return new RefreshWebsiteMessage(id, "search");
69-
}
70-
}
71-
```
10+
| Case | Field value | Behavior |
11+
| ------------------------------- | --------------------------- | --------- |
12+
| No error, everything just works | `EVERYTHING_FINE` (default) | This is the usual case, everything works as expected, data is processed. |
13+
| Temporary error | `TEMPORARY` | Sometimes there are issues that will resolve themselves, and we can try processing the message again later. A typical example would be a temporary network issue or an API service that is restarting. |
14+
| Permanent error | `PERMANENT` | In some cases it is clear that trying again later won't make a difference, e.g. if the data is corrupt. Then we stop processing for good. |
7215

7316
## Try yourself:
7417

@@ -77,26 +20,27 @@ To try yourself, get the repository and RabbitMQ-Server:
7720
```bash
7821
$ git clone https://github.com/dbmdz/flusswerk-example.git
7922
$ cd flusswerk-example
80-
$ docker-compose up
23+
$ docker-compose up -d
8124
```
8225

8326
Then start the `flusswerk-example` Application from your IDE and open the RabbitMQ-Management UI at http://localhost:15672 (Login in as `guest`/`guest`).
8427

85-
Drop the following message into the queue `search.index`:
28+
Drop the following message into the queue `incoming`:
8629

8730
```json
88-
{ "itemId": "42", "tracingId": "12345" }
31+
{ "id": "42" }
8932
```
9033

91-
In the queue `search.publish`, you will find the outgoing message send by the `Writer`:
34+
You now should find a message in the queue `next`.
35+
36+
Now try the error cases:
9237

9338
```json
94-
{
95-
"envelope":{},
96-
"tracingId":"12345",
97-
"itemId":"42",
98-
"source":"search"
99-
}
39+
{ "id": "42", "issue": "TEMPORARY" }
10040
```
10141

102-
The field `envelope` contains Flusswerk specific metadata and is usually only used by the Framework itself.
42+
and
43+
44+
```json
45+
{ "id": "42", "issue": "PERMANENT" }
46+
```

0 commit comments

Comments
 (0)