---
title: "Managed Service for Apache Flink"
linkTitle: "Managed Service for Apache Flink"
description: >
  Get started with Managed Service for Apache Flink on LocalStack
tags: ["Ultimate"]
---

{{< callout >}}
This service was formerly known as 'Kinesis Data Analytics for Apache Flink'.
{{< /callout >}}

## Introduction

[Apache Flink](https://flink.apache.org/) is a framework for building applications that process and analyze streaming data.
[Managed Service for Apache Flink (MSF)](https://docs.aws.amazon.com/managed-flink/latest/java/what-is.html) is an AWS service that provides the underlying infrastructure and a hosted Apache Flink cluster that can run Apache Flink applications.

LocalStack lets you run Flink applications locally and implements several [AWS-compatible API operations]({{< ref "coverage_kinesisanalyticsv2" >}}).

A separate Apache Flink cluster is started in [application mode](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/deployment/overview/#application-mode) for every Managed Flink application created.
Flink cluster deployment on LocalStack consists of two separate containers for the [JobManager](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/concepts/flink-architecture/#jobmanager) and the [TaskManager](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/concepts/flink-architecture/#taskmanagers).

{{< callout "note" >}}
The emulated MSF provider was introduced and made the default in LocalStack v4.1.

If you wish to use the older mock provider, you can set `PROVIDER_OVERRIDE_KINESISANALYTICSV2=legacy`.
{{< /callout >}}
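
For example, when starting LocalStack via `docker run` (a sketch assuming the standard image and default edge port):

```bash
# Select the legacy mock provider for Kinesis Analytics v2.
export PROVIDER_OVERRIDE_KINESISANALYTICSV2=legacy

# Start LocalStack with the override passed through to the container.
docker run --rm -p 4566:4566 \
  -e PROVIDER_OVERRIDE_KINESISANALYTICSV2 \
  localstack/localstack
```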

## Getting Started

This guide builds a demo Flink application and deploys it to LocalStack.
The application generates synthetic records, processes them, and sends the output to an S3 bucket.

Start the LocalStack container using your preferred method.

### Build Application Code

Begin by cloning the AWS sample repository.
We will use the [S3 Sink](https://github.com/localstack-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/S3Sink) application in this example.

{{< command >}}
$ git clone https://github.com/localstack-samples/amazon-managed-service-for-apache-flink-examples.git
$ cd java/S3Sink
{{< /command >}}

Next, use [Maven](https://maven.apache.org/) to compile and package the Flink application into a jar.

{{< command >}}
$ mvn package
{{< /command >}}

The packaged Flink application jar will be written to `./target/flink-kds-s3.jar`.

### Upload Application Code

MSF requires that all application code resides in S3.

Create an S3 bucket and upload the compiled Flink application jar.

{{< command >}}
$ awslocal s3api create-bucket --bucket flink-bucket
$ awslocal s3api put-object --bucket flink-bucket --key job.jar --body ./target/flink-kds-s3.jar
{{< /command >}}

### Output Sink

As mentioned earlier, this Flink application writes the output to an S3 bucket.

Create the S3 bucket that will serve as the sink.

{{< command >}}
$ awslocal s3api create-bucket --bucket sink-bucket
{{< /command >}}

### Permissions

MSF requires a service execution role which allows it to connect to other services.
Without the proper permissions policy and role, this example application will not be able to write its output to the S3 sink bucket.

Create an IAM role for the running MSF application to assume.
Save the following trust policy as `role.json`:

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {"Service": "kinesisanalytics.amazonaws.com"},
      "Action": "sts:AssumeRole"
    }
  ]
}
```

{{< command >}}
$ awslocal iam create-role --role-name msaf-role --assume-role-policy-document file://role.json
{{< /command >}}

Next, add a permissions policy to this role that grants read and write access to S3.
Save the following as `policy.json`:

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": ["s3:GetObject", "s3:GetObjectVersion", "s3:PutObject"],
      "Resource": "*"
    }
  ]
}
```

{{< command >}}
$ awslocal iam put-role-policy --role-name msaf-role --policy-name msaf-policy --policy-document file://policy.json
{{< /command >}}

Now, when the running MSF application assumes this role, it will have the necessary permissions to write to the S3 sink.

### Deploy Application

With all prerequisite resources in place, the Flink application can now be created and started.

{{< command >}}
$ awslocal kinesisanalyticsv2 create-application \
    --application-name msaf-app \
    --runtime-environment FLINK-1_20 \
    --application-mode STREAMING \
    --service-execution-role arn:aws:iam::000000000000:role/msaf-role \
    --application-configuration '{
      "ApplicationCodeConfiguration": {
        "CodeContent": {
          "S3ContentLocation": {
            "BucketARN": "arn:aws:s3:::flink-bucket",
            "FileKey": "job.jar"
          }
        },
        "CodeContentType": "ZIPFILE"
      },
      "EnvironmentProperties": {
        "PropertyGroups": [{
          "PropertyGroupId": "bucket", "PropertyMap": {"name": "sink-bucket"}
        }]
      }
    }'

$ awslocal kinesisanalyticsv2 start-application --application-name msaf-app
{{< /command >}}
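
Cluster startup takes a moment. One way to wait for the application to become ready is to poll its status (a sketch; the timeout and polling interval are arbitrary choices):

```bash
# Poll the application status until it reaches RUNNING, for at most
# 30 attempts spaced 5 seconds apart.
for _ in $(seq 1 30); do
  status=$(awslocal kinesisanalyticsv2 describe-application \
    --application-name msaf-app \
    --query 'ApplicationDetail.ApplicationStatus' --output text)
  echo "application status: $status"
  [ "$status" = "RUNNING" ] && break
  sleep 5
done
```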

Once the Flink cluster is up and running, the application will stream the results to the sink S3 bucket.
You can verify this with:

{{< command >}}
$ awslocal s3api list-objects --bucket sink-bucket
{{< /command >}}

## CloudWatch Logging

LocalStack MSF supports [CloudWatch Logs integration](https://docs.aws.amazon.com/managed-flink/latest/java/cloudwatch-logs.html) to help monitor the Flink cluster for application events or configuration problems.
The logging option can be added at the time of creating the Flink application using the [CreateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplication.html) operation.
Logging options can also be managed at a later point using the [AddApplicationCloudWatchLoggingOption](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_AddApplicationCloudWatchLoggingOption.html) and [DeleteApplicationCloudWatchLoggingOption](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_DeleteApplicationCloudWatchLoggingOption.html) operations.

The following prerequisites apply to CloudWatch Logs integration:
- You must create the application's log group and log stream.
  Flink will not create them for you.
- The service execution role must include the permissions your application needs to write to the log stream.
  Generally, the following IAM actions are sufficient: `logs:DescribeLogGroups`, `logs:DescribeLogStreams` and `logs:PutLogEvents`.

To add a logging option:

{{< command >}}
$ awslocal kinesisanalyticsv2 add-application-cloud-watch-logging-option \
    --application-name msaf-app \
    --cloud-watch-logging-option '{"LogStreamARN": "arn:aws:logs:us-east-1:000000000000:log-group:msaf-log-group:log-stream:msaf-log-stream"}'
{
    "ApplicationARN": "arn:aws:kinesisanalytics:us-east-1:000000000000:application/msaf-app",
    "ApplicationVersionId": 2,
    "CloudWatchLoggingOptionDescriptions": [
        {
            "CloudWatchLoggingOptionId": "1.1",
            "LogStreamARN": "arn:aws:logs:us-east-1:000000000000:log-group:msaf-log-group:log-stream:msaf-log-stream"
        }
    ]
}
{{< /command >}}

{{< callout >}}
Enabling CloudWatch Logs integration incurs a significant performance penalty.
{{< /callout >}}

Configured logging options can be retrieved using [DescribeApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_DescribeApplication.html):

{{< command >}}
$ awslocal kinesisanalyticsv2 describe-application --application-name msaf-app | jq .ApplicationDetail.CloudWatchLoggingOptionDescriptions
[
    {
        "CloudWatchLoggingOptionId": "1.1",
        "LogStreamARN": "arn:aws:logs:us-east-1:000000000000:log-group:msaf-log-group:log-stream:msaf-log-stream"
    }
]
{{< /command >}}

Log events can be retrieved from CloudWatch Logs using the appropriate operation.
To retrieve all events:

{{< command >}}
$ awslocal logs get-log-events --log-group-name msaf-log-group --log-stream-name msaf-log-stream
{{< /command >}}

{{< callout >}}
Log events are reported to CloudWatch every 10 seconds.
{{< /callout >}}

LocalStack reports both Flink application and Flink framework logs to CloudWatch.
However, certain extended information such as stack traces may be missing.
You can obtain this information by opening a shell inside the Flink Docker container created by LocalStack and inspecting `/opt/flink/log`.

## Resource Tagging

You can manage [resource tags](https://docs.aws.amazon.com/managed-flink/latest/java/how-tagging.html) using [TagResource](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_TagResource.html), [UntagResource](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UntagResource.html) and [ListTagsForResource](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_ListTagsForResource.html).
Tags can also be specified when creating the Flink application using the [CreateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplication.html) operation.

{{< command >}}
$ awslocal kinesisanalyticsv2 tag-resource \
    --resource-arn arn:aws:kinesisanalytics:us-east-1:000000000000:application/msaf-app \
    --tags Key=country,Value=SE

$ awslocal kinesisanalyticsv2 list-tags-for-resource \
    --resource-arn arn:aws:kinesisanalytics:us-east-1:000000000000:application/msaf-app
{
    "Tags": [
        {
            "Key": "country",
            "Value": "SE"
        }
    ]
}

$ awslocal kinesisanalyticsv2 untag-resource \
    --resource-arn arn:aws:kinesisanalytics:us-east-1:000000000000:application/msaf-app \
    --tag-keys country
{{< /command >}}

## Supported Flink Versions

| Flink version | Supported by LocalStack | Supported by Apache |
|:---:|:---:|:---:|
| 1.20.0 | yes | yes |
| 1.19.1 | yes | yes |
| 1.18.1 | yes | yes |
| 1.15.2 | yes | no |
| 1.13.1 | yes | no |

## Limitations

- Application versions are not maintained
- Only S3 zipfile code is supported
- Values of 20,000 ms for `execution.checkpointing.interval` and 5,000 ms for `execution.checkpointing.min-pause` are used for checkpointing.
  They cannot be overridden.
- In-place [version upgrades](https://docs.aws.amazon.com/managed-flink/latest/java/how-in-place-version-upgrades.html) and [roll-backs](https://docs.aws.amazon.com/managed-flink/latest/java/how-system-rollbacks.html) are not supported
- [Snapshot/savepoint management](https://docs.aws.amazon.com/managed-flink/latest/java/how-snapshots.html) is not implemented
- CloudTrail integration and CloudWatch metrics are not implemented.
  The application logging level defaults to `INFO` and cannot be overridden.
- Parallelism is limited to the default value of 1, with one TaskManager that has one [Task Slot](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/concepts/flink-architecture/#task-slots-and-resources) allocated.
  [Parallelism configuration](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_FlinkApplicationConfiguration.html#APIReference-Type-FlinkApplicationConfiguration-ParallelismConfiguration) provided on Flink application creation or update is ignored.