# Streaming data with Apache Kafka, Apache Flink and CrateDB

## About

This example showcases what a data-streaming architecture leveraging Kafka and Flink could look
like.

We use:

- Kafka (Confluent)
- Apache Flink
- CrateDB
- Python (>= 3.7, <= 3.11)

## Overview

An HTTP call is scheduled to run every 60 seconds in `weather_producer`. The API returns a JSON
document with the specified city's weather, which is then sent through `Kafka`.
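
For illustration, a minimal sketch of that fetch-and-send step, using `kafka-python` (the broker
address, topic name, and query parameters here are assumptions; the real values come from `.env`):

```python
import json

import requests
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",  # assumption: the broker address comes from .env
    value_serializer=lambda m: json.dumps(m).encode("utf-8"),
)

def fetch_weather_data():
    # Hypothetical parameters; the real API key and city are configured in .env.
    response = requests.get(
        "https://api.weatherapi.com/v1/current.json",
        params={"key": "<WEATHER_API_KEY>", "q": "Vienna"},
        timeout=10,
    )
    producer.send("weather_topic", response.json())  # topic name is an assumption
```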

`flink_consumer` is a Flink application consuming the same Kafka topic;
upon receiving data, it sends the resulting datastream to the sink, which is CrateDB.

Both `flink_consumer` and `weather_producer` are written using their respective Python wrappers:
| 24 | + |
| 25 | +[kafka-python](https://kafka-python.readthedocs.io/en/master/) |
| 26 | + |
| 27 | +[apache-flink](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/python/overview/) |
| 28 | + |
| 29 | +Everything is customizable via environment variables, the API schedule, the topic, credentials... |
| 30 | +etc. |
| 31 | + |
| 32 | +See `.env` for more details. |
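
For illustration, entries along these lines might appear there (`RUN_EVERY_SECONDS` and
`BOOTSTRAP_SERVER` are names used in the snippets below; the API key entry is a hypothetical
name, so check the actual file):

```shell
RUN_EVERY_SECONDS=60
BOOTSTRAP_SERVER=kafka:9092
WEATHER_API_KEY=<your-api-key>
```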

## How to use

The Docker Compose configuration will get you started quickly.
You will need to fill in the API key of [Weather API](https://www.weatherapi.com/)
in your local `.env` file.

### Run Docker Compose (and build the images)

```shell
docker compose up -d --build
```

### Stop Docker Compose

```shell
docker compose down
```

To install the dependencies locally instead, use either Poetry or pip.

### Poetry

```shell
poetry install
```

### Pip

```shell
pip install -r requirements.txt
```

## Notes

### CrateDB initial settings

CrateDB stores the shard indexes on the file system by mapping a file into memory (mmap).
You might need to set `max_map_count` to something higher than the usual default, like `262144`.

You can do that by running `sysctl -w vm.max_map_count=262144`;
for more information, see the [CrateDB bootstrap checks](https://cratedb.com/docs/guide/admin/bootstrap-checks.html#linux).
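
To make the setting persist across reboots on a typical Linux setup, add it to `/etc/sysctl.conf`
or a file under `/etc/sysctl.d/` (the file name below is just a choice):

```shell
echo "vm.max_map_count=262144" | sudo tee /etc/sysctl.d/99-cratedb.conf
sudo sysctl --system
```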

### Mock API call

If you don't want to register for the weather API we use, you can use the provided
function `mock_fetch_weather_data`; call this instead in the scheduler call.

This is how it would look:

```python
scheduler.enter(
    RUN_EVERY_SECONDS,
    1,
    schedule_every,
    (RUN_EVERY_SECONDS, mock_fetch_weather_data, scheduler)
)
```
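
Here, `scheduler.enter(delay, priority, action, argument)` comes from Python's stdlib `sched`
module. A minimal sketch of what a self-re-arming `schedule_every` helper could look like (the
actual implementation in this repository may differ):

```python
def schedule_every(seconds, func, scheduler):
    # Run the task once, then re-register this wrapper so it fires again
    # after `seconds`, yielding a fixed-interval loop on top of sched.
    func()
    scheduler.enter(seconds, 1, schedule_every, (seconds, func, scheduler))
```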

*After changing this, re-build the Docker Compose images.*

### Initial Kafka topic

In this example, the Kafka topic is only initialized when the first record is sent to it. Because
of this, the Flink job could fail if it exceeds the default timeout (60 seconds); this should only
happen if the API takes too long to respond *the very first time this project runs*.

To solve this, you should [configure](https://kafka.apache.org/quickstart#quickstart_createtopic)
the topics at boot time. This is recommended for production scenarios.
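
For example, with the scripts that ship with Kafka (the topic name here is an assumption; use the
one configured in your `.env`):

```shell
bin/kafka-topics.sh --create --topic weather_topic --bootstrap-server localhost:9092
```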

If you are just testing things, you can also solve this by re-running `docker compose up -d`; it
will only start `flink_job`, and assuming everything else went well, the topic should already
exist and things will work as expected.

If it still fails, check whether any other container/service is down;
it could be a symptom of a wrong API token or an unresponsive Kafka server, for example.

## Data and schema

See `example.json` for the schema. As you can see in `weather_producer` and `flink_consumer`,
schema manipulation is minimal:
thanks to CrateDB's dynamic objects, we only need to map the `location` and `current` keys.

For more information on dynamic objects,
see [this blog post](https://cratedb.com/blog/handling-dynamic-objects-in-cratedb).
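
As a sketch, the sink table in CrateDB could then be as simple as two dynamic `OBJECT` columns.
The table name below is an assumption, and the actual DDL used by this project may differ; the
snippet uses CrateDB's `crate` Python client:

```python
from crate import client

conn = client.connect("http://localhost:4200", username="crate")
cursor = conn.cursor()
# Dynamic objects let CrateDB derive the nested schema from incoming documents.
cursor.execute("""
    CREATE TABLE IF NOT EXISTS weather (
        "location" OBJECT(DYNAMIC),
        "current" OBJECT(DYNAMIC)
    )
""")
```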

In `weather_producer`, the Kafka producer directly serializes the JSON payload into a string:

```python
KafkaProducer(bootstrap_servers=BOOTSTRAP_SERVER,
              # Encode each outgoing message as a UTF-8 JSON string.
              value_serializer=lambda m: json.dumps(m).encode('utf-8'))
```

In `flink_consumer`, we use a JSON deserialization schema and only specify the two top-level keys,
`location` and `current`:

```python
row_type_info = Types.ROW_NAMED(['location', 'current'], [Types.STRING(), Types.STRING()])
json_format = JsonRowDeserializationSchema.builder().type_info(row_type_info).build()
```
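
To show how those rows can end up in CrateDB, here is a hedged sketch of the surrounding pipeline,
wiring a Kafka source to Flink's JDBC sink (CrateDB speaks the PostgreSQL wire protocol). The
topic, table, addresses, and credentials are assumptions, and connector import paths vary between
Flink versions:

```python
from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.jdbc import JdbcConnectionOptions, JdbcSink
from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer
from pyflink.datastream.formats.json import JsonRowDeserializationSchema

env = StreamExecutionEnvironment.get_execution_environment()
# The Kafka and JDBC connector jars must be on the classpath; see "Jars and versions" below.

row_type_info = Types.ROW_NAMED(['location', 'current'], [Types.STRING(), Types.STRING()])
json_format = JsonRowDeserializationSchema.builder().type_info(row_type_info).build()

source = FlinkKafkaConsumer(
    topics='weather_topic',  # assumption: the real topic name comes from .env
    deserialization_schema=json_format,
    properties={'bootstrap.servers': 'kafka:9092'},
)

ds = env.add_source(source)
ds.add_sink(JdbcSink.sink(
    'INSERT INTO weather ("location", "current") VALUES (?, ?)',  # hypothetical table
    row_type_info,
    JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
    .with_url('jdbc:postgresql://cratedb:5432/doc')  # CrateDB via the PostgreSQL protocol
    .with_driver_name('org.postgresql.Driver')
    .with_user_name('crate')
    .build(),
))
env.execute('flink_consumer')
```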

If your format is not JSON, or if you want to specify the whole schema, adapt this as needed.

[Here](https://nightlies.apache.org/flink/flink-docs-master/api/python/examples/datastream/connectors.html)
you can find examples of other formats like `csv` or `avro`.

## Jars and versions

Jars are downloaded at build time to `/app/jars`; the versions are pinned in `.env`.

There is a `JARS_PATH` in `flink_consumer`; change it if you keep the jars somewhere else.
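
For reference, one common way to register such local jars with PyFlink is via
`StreamExecutionEnvironment.add_jars`; a minimal sketch, assuming hypothetical jar file names:

```python
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
env.add_jars(
    "file:///app/jars/flink-connector-jdbc.jar",  # hypothetical file name
    "file:///app/jars/postgresql.jar",            # hypothetical file name
)
```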