| 1 | +# Word Count with Kafka Streams Interactive Queries |
| 2 | + |
| 3 | +This project demonstrates how to use Kafka Streams Interactive Queries to build a distributed, stateful stream processing application with queryable state stores. The example implements a real-time word counting application that exposes its state through a REST API. |
| 4 | + |
| 5 | +## Overview |
| 6 | + |
| 7 | +The application processes text messages from a Kafka topic, counts word occurrences, and stores the results in two state stores: |
| 8 | + |
| 9 | +- **word-count**: A key-value store containing all-time word counts |
| 10 | +- **windowed-word-count**: A windowed store containing per-minute word counts |
| 11 | + |
| 12 | +The state stores are distributed across multiple application instances. The Interactive Queries API lets every instance discover which instance hosts a given store or key, so a single-key lookup can be sent to any instance and is proxied to the instance that actually hosts the data. |
| 13 | + |
| 14 | +## Architecture |
| 15 | + |
| 16 | +### Core Components |
| 17 | + |
| 18 | +- **WordCountInteractiveQueriesExample**: Main application that defines the stream processing topology |
| 19 | +- **WordCountInteractiveQueriesRestService**: Jetty-based REST API that exposes state store data via HTTP endpoints |
| 20 | +- **WordCountInteractiveQueriesDriver**: Sample data generator for testing |
| 21 | +- **MetadataService**: Handles discovery of which application instance contains specific data |
| 22 | +- **HostStoreInfo**: Represents metadata about application instances and their state stores |
| 23 | +- **KeyValueBean**: JSON-serializable response format for API endpoints |
| 24 | + |
| 25 | +### Stream Processing Topology |
| 26 | + |
| 27 | +1. **Input**: Reads text lines from `TextLinesTopic` |
| 28 | +2. **Processing**: |
| 29 | + - Splits text into words (lowercase, non-word characters as delimiters) |
| 30 | + - Groups by word |
| 31 | + - Counts occurrences |
| 32 | +3. **Output**: Maintains two state stores: |
| 33 | + - All-time word counts (KeyValue store) |
| 34 | + - 1-minute windowed word counts (Window store) |
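The processing steps above can be sketched in plain Java, without Kafka, to show what ends up in the all-time store. This is a minimal illustration, not the project's topology: the class name `WordCountSketch` is hypothetical, and the `\W+` delimiter is an assumption based on the "non-word characters as delimiters" description.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

final class WordCountSketch {

    // Lowercase each line, split on runs of non-word characters (assumed
    // delimiter: \W+), group identical words, and count each group.
    static Map<String, Long> countWords(List<String> lines) {
        return lines.stream()
                .flatMap(line -> Arrays.stream(line.toLowerCase().split("\\W+")))
                // split() yields an empty first token when a line starts with a delimiter
                .filter(word -> !word.isEmpty())
                .collect(Collectors.groupingBy(word -> word, TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        Map<String, Long> counts =
                countWords(List.of("hello world", "kafka streams", "hello kafka"));
        System.out.println(counts); // {hello=2, kafka=2, streams=1, world=1}
    }
}
```

In the streaming application the same grouping runs continuously, and the per-minute store additionally keys each count by the window the record falls into.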
| 35 | + |
| 36 | +## Run the application |
| 37 | + |
| 38 | +### Prerequisites |
| 39 | + |
| 40 | +* Java 17 or later |
| 41 | +* Maven 3.6 or later |
| 42 | +* Docker running via [Docker Desktop](https://docs.docker.com/desktop/) or [Docker Engine](https://docs.docker.com/engine/install/) |
| 43 | +* The [Confluent CLI](https://docs.confluent.io/confluent-cli/current/install.html) installed on your machine |
| 44 | + |
| 45 | +### 1. Start Kafka |
| 46 | + |
| 47 | +```bash |
| 48 | +confluent local kafka start |
| 49 | +``` |
| 50 | + |
| 51 | +Note the port listed under `Plaintext Ports` in the output. You will use it in the bootstrap servers endpoint in later steps. |
| 52 | + |
| 53 | +### 2. Create the input topic |
| 54 | + |
| 55 | +```bash |
| 56 | +confluent local kafka topic create TextLinesTopic --partitions 3 --replication-factor 1 |
| 57 | +``` |
| 58 | + |
| 59 | +### 3. Build the application |
| 60 | + |
| 61 | +```bash |
| 62 | +mvn clean package -DskipTests |
| 63 | +``` |
| 64 | + |
| 65 | +### 4. Start Kafka Streams application instances |
| 66 | + |
| 67 | +Start multiple instances to see the distributed nature of the application. Replace the `<PLAINTEXT_PORT>` placeholder with the `Plaintext Ports` value printed when you started Kafka: |
| 68 | + |
| 69 | +**Instance 1 (Port 7070):** |
| 70 | + |
| 71 | +```bash |
| 72 | +java -cp target/kafka-streams-examples-8.0.0-standalone.jar \ |
| 73 | + io.confluent.examples.streams.interactivequeries.WordCountInteractiveQueriesExample 7070 localhost:<PLAINTEXT_PORT> |
| 74 | +``` |
| 75 | + |
| 76 | +**Instance 2 (Port 7071):** |
| 77 | + |
| 78 | +```bash |
| 79 | +java -cp target/kafka-streams-examples-8.0.0-standalone.jar \ |
| 80 | + io.confluent.examples.streams.interactivequeries.WordCountInteractiveQueriesExample 7071 localhost:<PLAINTEXT_PORT> |
| 81 | +``` |
| 82 | + |
| 83 | +### 5. Generate sample data |
| 84 | + |
| 85 | +```bash |
| 86 | +java -cp target/kafka-streams-examples-8.0.0-standalone.jar \ |
| 87 | + io.confluent.examples.streams.interactivequeries.WordCountInteractiveQueriesDriver localhost:<PLAINTEXT_PORT> |
| 88 | +``` |
| 89 | + |
| 90 | +Or manually produce data: |
| 91 | + |
| 92 | +```bash |
| 93 | +confluent local kafka topic produce TextLinesTopic |
| 94 | + |
| 95 | +# Type some text lines: |
| 96 | +# hello world |
| 97 | +# kafka streams |
| 98 | +# hello kafka |
| 99 | +``` |
| 100 | + |
| 101 | +## Query the IQ-based REST APIs |
| 102 | + |
| 103 | +The application exposes the following endpoints on each instance: |
| 104 | + |
| 105 | +### Instance Discovery |
| 106 | + |
| 107 | +| Endpoint | Description | |
| 108 | +| --------------------------------------- | --------------------------------------- | |
| 109 | +| `GET /state/instances` | List all running instances | |
| 110 | +| `GET /state/instances/{storeName}` | List instances hosting a specific store | |
| 111 | +| `GET /state/instance/{storeName}/{key}` | Find instance containing a specific key | |
| 112 | + |
| 113 | +### Key-Value Store Queries |
| 114 | + |
| 115 | +| Endpoint | Description | |
| 116 | +| ---------------------------------------------------- | ---------------------------- | |
| 117 | +| `GET /state/keyvalue/{storeName}/{key}` | Get value for a specific key | |
| 118 | +| `GET /state/keyvalues/{storeName}/all` | Get all key-value pairs | |
| 119 | +| `GET /state/keyvalues/{storeName}/range/{from}/{to}` | Get key-value pairs in range | |
| 120 | + |
| 121 | +### Windowed Store Queries |
| 122 | + |
| 123 | +| Endpoint | Description | |
| 124 | +| --------------------------------------------------- | ----------------------------------------- | |
| 125 | +| `GET /state/windowed/{storeName}/{key}/{from}/{to}` | Get windowed values for key in time range | |
| 126 | + |
| 127 | +### Example API Calls |
| 128 | + |
| 129 | +```bash |
| 130 | +# List all instances |
| 131 | +curl http://localhost:7070/state/instances |
| 132 | + |
| 133 | +# List instances hosting the word-count store |
| 134 | +curl http://localhost:7070/state/instances/word-count |
| 135 | + |
| 136 | +# Get count for word "hello". The endpoint will act as a proxy to the instance hosting this key |
| 137 | +# if it's not local. |
| 138 | +curl http://localhost:7070/state/keyvalue/word-count/hello |
| 139 | + |
| 140 | +# Get all word counts on this instance. |
| 141 | +curl http://localhost:7070/state/keyvalues/word-count/all |
| 142 | + |
| 143 | +# Get word counts in range |
| 144 | +curl http://localhost:7070/state/keyvalues/word-count/range/a/m |
| 145 | + |
| 146 | +# Find which instance has the "hello" key |
| 147 | +curl http://localhost:7070/state/instance/word-count/hello |
| 148 | + |
| 149 | +# Get windowed counts for "hello" (timestamps in milliseconds) on this instance (does not proxy) |
| 150 | +curl "http://localhost:7070/state/windowed/windowed-word-count/hello/1754060800000/1754060956000" |
| 151 | +``` |
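Because the windowed endpoint takes epoch-millisecond timestamps, it can help to compute them rather than type them by hand. The sketch below builds a query URL for a recent time range and rounds the lower bound down to a one-minute boundary, since Kafka Streams tumbling windows are aligned to the epoch. The class and method names are hypothetical, and it assumes the endpoint matches windows by their start time within `{from}` and `{to}`.

```java
import java.time.Duration;
import java.time.Instant;

final class WindowedQueryUrlSketch {

    static final long WINDOW_SIZE_MS = Duration.ofMinutes(1).toMillis();

    // Start of the epoch-aligned one-minute window that contains the timestamp.
    static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % WINDOW_SIZE_MS);
    }

    // Builds the windowed-store URL covering `lookback` up to `now`.
    // The lower bound is rounded down so the window containing it is included.
    static String windowedUrl(String baseUrl, String store, String key,
                              Instant now, Duration lookback) {
        long to = now.toEpochMilli();
        long from = windowStart(to - lookback.toMillis());
        return baseUrl + "/state/windowed/" + store + "/" + key + "/" + from + "/" + to;
    }

    public static void main(String[] args) {
        // Query the last five minutes of counts for "hello" on the first instance.
        System.out.println(windowedUrl("http://localhost:7070", "windowed-word-count",
                "hello", Instant.now(), Duration.ofMinutes(5)));
    }
}
```

The printed URL can be passed to `curl` in quotes, as in the last example above.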
| 152 | + |
| 153 | +## Clean up |
| 154 | + |
| 155 | +When you are finished, stop the Kafka Streams application instances by pressing `Ctrl+C` in the terminals where they are running. Then stop Kafka: |
| 156 | + |
| 157 | +```bash |
| 158 | +confluent local kafka stop |
| 159 | +``` |
| 160 | + |
| 161 | +## Key Features |
| 162 | + |
| 163 | +### Distributed State Management |
| 164 | + |
| 165 | +- State stores are automatically partitioned across application instances |
| 166 | +- Each instance only stores a subset of the data based on key partitioning |
| 167 | +- Single-key lookups sent to any instance are proxied to the instance that hosts the key |
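The routing idea in this list can be illustrated with a small, Kafka-free sketch: map a key to a partition, find the instance that hosts that partition, and build the URL a proxying instance would call. Everything here is a simplification under stated assumptions. `KeyRoutingSketch` and `Instance` are hypothetical names, and `String.hashCode()` stands in for Kafka's default partitioner, which hashes the serialized key bytes with murmur2. In Kafka Streams itself the lookup is provided by `KafkaStreams#queryMetadataForKey`.

```java
import java.util.List;

final class KeyRoutingSketch {

    // Hypothetical description of one application instance and the partitions it hosts.
    static final class Instance {
        final String host;
        final int port;
        final List<Integer> partitions;

        Instance(String host, int port, List<Integer> partitions) {
            this.host = host;
            this.port = port;
            this.partitions = partitions;
        }
    }

    // Stand-in for Kafka's partitioner: maps a key to a partition in [0, numPartitions).
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Finds the instance that hosts the partition the key maps to.
    static Instance ownerOf(String key, int numPartitions, List<Instance> instances) {
        int partition = partitionFor(key, numPartitions);
        return instances.stream()
                .filter(instance -> instance.partitions.contains(partition))
                .findFirst()
                .orElseThrow(() -> new IllegalStateException(
                        "No instance hosts partition " + partition));
    }

    // URL a proxying instance would call when the key is not stored locally.
    static String lookupUrl(Instance owner, String store, String key) {
        return "http://" + owner.host + ":" + owner.port
                + "/state/keyvalue/" + store + "/" + key;
    }

    public static void main(String[] args) {
        // Three partitions, as in the topic created earlier, spread over two instances.
        List<Instance> instances = List.of(
                new Instance("localhost", 7070, List.of(0, 1)),
                new Instance("localhost", 7071, List.of(2)));
        Instance owner = ownerOf("hello", 3, instances);
        System.out.println(lookupUrl(owner, "word-count", "hello"));
    }
}
```

An instance that finds itself to be the owner reads its local store directly; otherwise it forwards the request to the owner's URL.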
| 168 | + |
| 169 | +### Fault Tolerance |
| 170 | + |
| 171 | +- State stores are backed by Kafka topics for durability |
| 172 | +- Application instances can be restarted and will rebuild their local state |
| 173 | +- If an instance fails, its tasks are reassigned to the remaining instances, which restore the state from the backing Kafka topics |
| 174 | + |
| 175 | +### Real-time Queries |
| 176 | + |
| 177 | +- Query state stores in real-time while stream processing continues |
| 178 | +- No need to wait for batch processing or external database updates |
| 179 | +- Low query latency for local data, because lookups are served from the instance's own state store |
| 180 | + |
| 181 | +### Interactive Queries |
| 182 | + |
| 183 | +- Discover which instance contains specific data |
| 184 | +- Forward single-key lookups to the instance that hosts the key |
| 185 | +- Get the same result for a single-key lookup from any instance; the `all` and windowed endpoints return only the queried instance's local data |
| 186 | + |
| 187 | +## Resources |
| 188 | + |
| 189 | +- [Kafka Streams Documentation](https://kafka.apache.org/documentation/streams/) |
| 190 | +- [Interactive Queries](https://kafka.apache.org/documentation/streams/developer-guide/interactive-queries.html) |
| 191 | +- [Confluent Documentation](https://docs.confluent.io/platform/current/streams/index.html) |