* [Apache Flink cluster on HDInsight on AKS](../flink/flink-create-cluster-portal.md)
* [Apache Kafka cluster on HDInsight](../../hdinsight/kafka/apache-kafka-get-started.md)
* Ensure that the network settings are taken care of as described in [Using Apache Kafka on HDInsight](../flink/process-and-consume-data.md). Make sure that the HDInsight on AKS and HDInsight clusters are in the same virtual network.
* Use MSI to access ADLS Gen2
* IntelliJ for development on an Azure VM in HDInsight on AKS Virtual Network
> Make sure to set `classloader.resolve-order` to `parent-first` and `hadoop.classpath.enable` to `true`.
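As a reference sketch, these two settings from the note would appear together in the Flink configuration like this (property names are taken from the note above):

```yaml
# Resolve classes through the parent (Flink) classloader first,
# instead of Flink's default child-first order.
classloader.resolve-order: parent-first

# Put the Hadoop classpath on the Flink classpath so the ABFS
# file system implementation can be found.
hadoop.classpath.enable: true
```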
1. Select **Job Log aggregation** to push job logs to the storage account.
:::image type="content" source="./media/assign-kafka-topic-event-message-to-azure-data-lake-storage-gen2/enable-job-log.png" alt-text="Screenshot showing how to enable job log." lightbox="./media/assign-kafka-topic-event-message-to-azure-data-lake-storage-gen2/enable-job-log.png":::
1. You can see the job running.
HDInsight on AKS provides a set of default Apache Flink configurations for most properties, and a few based on common application profiles. However, if you're required to tweak Flink configuration properties to improve performance for certain applications with state usage, parallelism, or memory settings, you can change the Flink job configuration using the **Flink Jobs** section in the HDInsight on AKS cluster.
Here the checkpoint interval is changed at *Cluster level*.
1. Update the changes by clicking **OK** and then **Save**.

Once saved, the new configurations are updated in a few minutes (~5 minutes).

The following configurations can be updated using the Configuration Management settings.

**Process memory size**

The default process memory size for both the job manager and the task manager is the memory configured by the user during cluster creation.

This size can be configured by using the following configuration property. To change the task manager process memory, use this configuration:
`taskmanager.memory.process.size : <value>`
Example:
`taskmanager.memory.process.size : 2000mb`
For the job manager:
`jobmanager.memory.process.size : <value>`
> [!NOTE]
> The maximum configurable process memory is equal to the memory configured for `jobmanager/taskmanager`.
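As a consolidated sketch, the two memory properties described above might appear together in the cluster configuration as follows (the values are illustrative, not recommendations):

```yaml
# Total process memory for each task manager (illustrative value).
taskmanager.memory.process.size: 2000mb

# Total process memory for the job manager (illustrative value).
jobmanager.memory.process.size: 2000mb
```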
## Checkpoint Interval
The checkpoint interval determines how often Flink triggers a checkpoint. It's defined in milliseconds and can be set using the following configuration property:
`execution.checkpoint.interval: <value>`
The default setting is 60,000 milliseconds (1 minute); this value can be changed as desired.
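For example, to checkpoint every 30 seconds instead of the default 1 minute (an illustrative value, not a recommendation):

```yaml
# Trigger a checkpoint every 30,000 ms (30 seconds).
execution.checkpoint.interval: 30000
```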
## State Backend
The state backend determines how Flink manages and persists the state of your application. It impacts how checkpoints are stored. You can configure the state backend using the following property:
`state.backend: <value>`
By default, Apache Flink clusters in HDInsight on AKS use RocksDB.
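Setting the property explicitly would look like this (shown for illustration; RocksDB is already the default here):

```yaml
# Use the embedded RocksDB state backend (the default on HDInsight on AKS).
state.backend: rocksdb
```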
## Checkpoint Storage Path
We allow persistent checkpoints by default by storing them in `abfs` storage as configured by the user. Because the checkpoints are persisted, even if the job fails, it can easily be restarted from the latest checkpoint.
`state.checkpoints.dir: <path>`
Replace `<path>` with the desired path where the checkpoints are stored.
By default, checkpoints are stored in the storage account (ABFS) configured by the user. This value can be changed to any desired path as long as the Flink pods can access it.
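A sketch of a checkpoint path override; the container and storage account placeholders are hypothetical and must be replaced with your own:

```yaml
# Hypothetical ABFS path; replace <container> and <account> with your own values.
state.checkpoints.dir: abfs://<container>@<account>.dfs.core.windows.net/flink/checkpoints
```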
## Maximum Concurrent Checkpoints
Replace `<value>` with the desired maximum number. By default, we retain a maximum of five checkpoints.
We allow persistent savepoints by default by storing the savepoints in `abfs` storage (as configured by the user). If the user wants to stop and later start the job with a particular savepoint, they can configure this location.
`state.savepoints.dir: <path>`
Replace `<path>` with the desired path where the savepoints are stored.
By default, savepoints are stored in the storage account configured by the user (we support ABFS). This value can be changed to any desired path as long as the Flink pods can access it.
## Job manager high availability
In HDInsight on AKS, Flink uses Kubernetes as its backend. Even if the Job Manager fails due to any known or unknown issue, the pod is restarted within a few seconds, and the job is recovered from the **latest checkpoint**.
### FAQ
**Why does the job fail in between?**

Even if jobs fail abruptly, as long as checkpoints are taken continuously, the job is restarted by default from the latest checkpoint.
**How do I change the job strategy in between?**
There are use cases where the job needs to be modified while in production due to a job-level bug. In that case, the user can stop the job, which automatically takes a savepoint and saves it in the savepoint location.
1. Click **Savepoint** and wait for the savepoint to complete.
:::image type="content" source="./media/flink-configuration-management/save-point.png" alt-text="Screenshot showing save point options." lightbox="./media/flink-configuration-management/save-point.png":::
1. After the savepoint completes, click **Start**; the **Start Job** tab appears. Select the savepoint name from the dropdown, edit any configurations if necessary, and click **OK**.
:::image type="content" source="./media/flink-configuration-management/start-job.png" alt-text="Screenshot showing how to start job." lightbox="./media/flink-configuration-management/start-job.png":::
Since the savepoint is provided with the job, Flink knows where to start processing the data.