Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 6 additions & 33 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
---
name: CI

on:
pull_request:
branches: [ main ]
branches: [main]
push:
branches: [ main ]
branches: [main]

jobs:
build:
runs-on: ubuntu-latest
strategy:
matrix:
java: [11, 17]
java: [25]
steps:
- uses: actions/checkout@v3
- name: Set up JDK
Expand All @@ -26,35 +27,7 @@ jobs:
with:
path: ~/.m2/repository
key: ${{ runner.os }}-maven-${{ hashFiles('**/pom.xml') }}
- name: Get main for spotless
run: git fetch origin main
- name: Build with Maven
run: mvn -B verify
publish:
if: github.event_name == 'push' && contains(github.ref, 'main')
runs-on: ubuntu-latest
needs: build
steps:
- uses: actions/checkout@v3
- name: Set up JDK 11
uses: actions/setup-java@v3
with:
check-latest: true
distribution: temurin
java-version: 11
server-id: ossrh-snapshots
server-username: MAVEN_USERNAME
server-password: MAVEN_PASSWORD
- uses: actions/cache@v3
env:
cache-name: cache-maven-artifacts
with:
path: ~/.m2/repository
key: ${{ runner.os }}-maven-${{ hashFiles('**/pom.xml') }}
- name: Install XML utils
run: sudo apt update && sudo apt install libxml2-utils
- name: Set project version
run: echo "PROJECT_VERSION=$(xmllint --xpath '/*[local-name()="project"]/*[local-name()="version"]/text()' pom.xml)" >> $GITHUB_ENV
- name: Publish to the Maven Central Repository
run: if [[ "$PROJECT_VERSION" =~ .*SNAPSHOT ]]; then mvn -B deploy; fi
env:
MAVEN_USERNAME: ${{ secrets.OSSRH_USERNAME }}
MAVEN_PASSWORD: ${{ secrets.OSSRH_PASSWORD }}
96 changes: 20 additions & 76 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,74 +1,17 @@
# Flusswerk Application Example
This application simulates a simple indexing application to show the Flusswerk framework in action.

## Overview

The data processing logic is split in three components:

The [`Reader`](src/main/java/com/github/dbmdz/flusswerk/example/flow/Reader.java) receives a message from RabbitMQ and loads the corresponding document (as instance of `Document`).

```java
@Component
public class Reader implements Function<IndexMessage, Document> {

@Override
public Document apply(IndexMessage indexMessage) {
Document document;
try {
document = loadDocument(indexMessage.getItemId());
} catch (IOException exception) {
throw new StopProcessingException(
"Could not load document for id {}", indexMessage.getItemId())
.causedBy(exception);
}
return document;
}

private Document loadDocument(String itemId) throws IOException {
// ...
}

}
```

The [`Transformer`](src/main/java/com/github/dbmdz/flusswerk/example/flow/Transformer.java) then takes the document and builds the required data for the Indexing API (an `IndexDocument`):

```java
@Component
public class Transformer implements Function<Document, IndexDocument> {
This application demonstrates the Flusswerk framework in action. A Flusswerk application typically takes an incoming message from a RabbitMQ queue, does something, and then may send any number of messages to other topics for further processing.

@Override
public IndexDocument apply(Document document) {
IndexDocument indexDocument = new IndexDocument();
// ...
return indexDocument;
}
}
```


The [`Writer`](src/main/java/com/github/dbmdz/flusswerk/example/flow/Writer.java) finally takes the processed data, writes it to the Indexing API and sends messages to notify the next processing application.
The data processing logic is in [DemoProcessor](src/main/java/dev/mdz/flusswerk/example/flow/DemoProcessor.java). Usually all kinds of things can go wrong when processing data: Data might be corrupt, backends fail,... Flusswerk provides a simple way to handle these error cases with special exceptions.

```java
@Component
public class Writer implements Function<IndexDocument, Message> {
We simulate the three general error cases with the field `issue` in the incoming message:

private static final Logger LOGGER = LoggerFactory.getLogger(Writer.class);

@Override
public Message apply(IndexDocument indexDocument) {
String id = (String) indexDocument.get("id");
try {
sendToSearchService(indexDocument);
} catch (Exception exception) {
throw new RetryProcessingException(
"Could not index document for id %s, will try again later", id)
.causedBy(exception);
}
return new RefreshWebsiteMessage(id, "search");
}
}
```
| Case | Field value | Behavior |
| ------------------------------- | --------------------------- | --------- |
| No error, everything just works | `EVERYTHING_FINE` (default) | This is the usual case, everything works as expected, data is processed. |
| Temporary error | `TEMPORARY` | Sometimes there are issues that will resolve themselves, and we can try processing the message again later. A typical example would be a temporary network issue or an API service that is restarting. |
| Permanent error | `PERMANENT` | In some cases it is clear that trying again later won't make a difference, e.g. if the data is corrupt. Then we stop processing for good. |

## Try yourself:

Expand All @@ -77,26 +20,27 @@ To try yourself, get the repository and RabbitMQ-Server:
```bash
$ git clone https://github.com/dbmdz/flusswerk-example.git
$ cd flusswerk-example
$ docker-compose up
$ docker-compose up -d
```

Then start the `flusswerk-example` Application from your IDE and open the RabbitMQ-Management UI at http://localhost:15672 (Login in as `guest`/`guest`).

Drop the following message into the queue `search.index`:
Drop the following message into the queue `incoming`:

```json
{ "itemId": "42", "tracingId": "12345" }
{ "id": "42" }
```

In the queue `search.publish`, you will find the outgoing message send by the `Writer`:
You now should find a message in the queue `next`.

Now try the error cases:

```json
{
"envelope":{},
"tracingId":"12345",
"itemId":"42",
"source":"search"
}
{ "id": "42", "issue": "TEMPORARY" }
```

The field `envelope` contains Flusswerk specific metadata and is usually only used by the Framework itself.
and

```json
{ "id": "42", "issue": "PERMANENT" }
```
Loading