|
| 1 | +--- |
| 2 | +title: Use MirrorMaker 2 to replicate Apache Kafka topics - Azure HDInsight |
| 3 | +description: Learn how to use MirrorMaker 2 to replicate Apache Kafka topics |
| 4 | +ms.service: hdinsight |
| 5 | +ms.topic: how-to |
| 6 | +ms.custom: hdinsightactive |
| 7 | +ms.date: 03/10/2023 |
| 8 | +--- |
| 9 | + |
| 10 | +# Use MirrorMaker 2 to replicate Apache Kafka topics with Kafka on HDInsight |
| 11 | + |
| 12 | +Learn how to use Apache Kafka's mirroring feature to replicate topics to a secondary cluster. You can run mirroring as a continuous process, or intermittently, to migrate data from one cluster to another. |
| 13 | + |
| 14 | +In this article, you use mirroring to replicate topics between two HDInsight clusters. These clusters are in different virtual networks in different datacenters. |
| 15 | + |
| 16 | +> [!WARNING] |
| 17 | +> Don't use mirroring as a means to achieve fault tolerance. The offsets of items within a topic differ between the primary and secondary clusters, so clients can't use the two interchangeably. If you're concerned about fault tolerance, set replication for the topics within your cluster. For more information, see [Get started with Apache Kafka on HDInsight](apache-kafka-get-started.md). |
| 18 | +
|
| 19 | +## How Apache Kafka mirroring works |
| 20 | + |
| 21 | +Mirroring works by using the [MirrorMaker2](https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0) tool, which is part of Apache Kafka. MirrorMaker2 consumes records from topics on the primary cluster, and then creates a local copy on the secondary cluster. It uses one or more *consumers* that read from the primary cluster, and a *producer* that writes to the local (secondary) cluster. |
| 22 | + |
| 23 | +The most useful mirroring setup for disaster recovery uses Kafka clusters in different Azure regions. To achieve this result, the virtual networks where the clusters reside are peered together. |
| 24 | + |
| 25 | +The primary and secondary clusters can differ in the number of nodes and partitions, and offsets within the topics also differ. Mirroring maintains the key value that is used for partitioning, so record order is preserved on a per-key basis. |
| 26 | + |
| 27 | +### Mirroring across network boundaries |
| 28 | + |
| 29 | +If you need to mirror between Kafka clusters in different networks, consider the following additional points: |
| 30 | + |
| 31 | +* **Gateways**: The networks must be able to communicate at the TCP/IP level. |
| 32 | + |
| 33 | +* **Server addressing**: You can choose to address your cluster nodes by using their IP addresses or fully qualified domain names. |
| 34 | + |
| 35 | + * **IP addresses**: If you configure your Kafka clusters to use IP address advertising, you can proceed with the mirroring setup by using the IP addresses of the broker nodes and ZooKeeper nodes. |
| 36 | + |
| 37 | + * **Domain names**: If you don't configure your Kafka clusters for IP address advertising, the clusters must be able to connect to each other by using fully qualified domain names (FQDNs). This requires a domain name system (DNS) server in each network that is configured to forward requests to the other networks. When you create an Azure virtual network, instead of using the automatic DNS provided with the network, you must specify a custom DNS server and the IP address for the server. After you create the virtual network, you must then create an Azure virtual machine that uses that IP address. Then you install and configure DNS software on it. |
| 38 | + |
| 39 | + > [!IMPORTANT] |
| 40 | + > Create and configure the custom DNS server before installing HDInsight into the virtual network. There is no additional configuration required for HDInsight to use the DNS server configured for the virtual network. |
| 41 | +
|
| 42 | +For more information on connecting two Azure virtual networks, see [Configure a connection](../../vpn-gateway/vpn-gateway-vnet-vnet-rm-ps.md). |
| 43 | + |
| 44 | +## Mirroring architecture |
| 45 | + |
| 46 | +This architecture features two clusters in different resource groups and virtual networks: a primary and a secondary. |
| 47 | + |
| 48 | +### Creation steps |
| 49 | + |
| 50 | +1. Create two new resource groups: |
| 51 | + |
| 52 | + |Resource group | Location | |
| 53 | + |---|---| |
| 54 | + | kafka-primary-rg | Central US | |
| 55 | + | kafka-secondary-rg | North Central US | |
| 56 | + |
| 57 | +1. Create a new virtual network **kafka-primary-vnet** in **kafka-primary-rg**. Leave the default settings. |
| 58 | +1. Create a new virtual network **kafka-secondary-vnet** in **kafka-secondary-rg**, also with default settings. |
| 59 | + > [!NOTE] |
| 60 | + > Keep the address spaces of the two virtual networks nonoverlapping; otherwise, virtual network peering won't work. |
| 61 | + > Example: |
| 62 | + > 1. kafka-primary-vnet can have address space 10.0.0.0 |
| 63 | + > 2. kafka-secondary-vnet can have address space 10.1.0.0 |
| 64 | + |
| 65 | +1. Create virtual network peerings. This step creates two peerings: one from **kafka-primary-vnet** to **kafka-secondary-vnet**, and one back from **kafka-secondary-vnet** to **kafka-primary-vnet**. |
| 66 | + 1. Select the **kafka-primary-vnet** virtual network. |
| 67 | + 1. Under **Settings**, select **Peerings**. |
| 68 | + 1. Select **Add**. |
| 69 | + 1. On the **Add peering** screen, enter the details as shown in the following screenshot. |
| 70 | + |
| 71 | + :::image type="content" source="./media/apache-kafka-mirror-maker2/peer-1.png" alt-text="Screenshot that shows HDInsight Kafka add virtual network peering primary to secondary." border="true"::: |
| 72 | + :::image type="content" source="./media/apache-kafka-mirror-maker2/peer-2.png" alt-text="Screenshot that shows HDInsight Kafka add virtual network peering from secondary to primary." border="true"::: |
| 73 | + |
| 74 | +1. Create two new Kafka clusters: |
| 75 | + |
| 76 | + | Cluster name | Resource group | Virtual network | Storage account | |
| 77 | + |---|---|---|---| |
| 78 | + | primary-kafka-cluster | kafka-primary-rg | kafka-primary-vnet | kafkaprimarystorage | |
| 79 | + | secondary-kafka-cluster | kafka-secondary-rg | kafka-secondary-vnet | kafkasecondarystorage | |
| 80 | + |
| 81 | + > [!NOTE] |
| 82 | + > From this point on, this article refers to `primary-kafka-cluster` as `PRIMARYCLUSTER` and to `secondary-kafka-cluster` as `SECONDARYCLUSTER`. |
| 83 | +
|
| 84 | +## Add the IP addresses of PRIMARYCLUSTER worker nodes to the client machine for name resolution |
| 85 | + |
| 86 | +1. You run the MirrorMaker script from the head node of `SECONDARYCLUSTER`, so the `/etc/hosts` file on `SECONDARYCLUSTER` needs the IP addresses of the `PRIMARYCLUSTER` worker nodes. |
| 87 | + |
| 88 | +1. Connect to `PRIMARYCLUSTER` |
| 89 | + ``` |
| 90 | + |
| 91 | + ``` |
| 92 | + |
| 93 | +1. Run `cat /etc/hosts` and note the entries that list the IP addresses and FQDNs of the worker nodes. |
| 94 | + |
| 95 | +1. Copy those entries, connect to `SECONDARYCLUSTER`, and run |
| 96 | + ``` |
| 97 | + |
| 98 | + ``` |
| 99 | +1. Edit the `/etc/hosts` file on the secondary cluster and add those entries. |
| 100 | + |
| 101 | +1. After you make the changes, the `/etc/hosts` file for `SECONDARYCLUSTER` looks like the following image. |
| 102 | + |
| 103 | + :::image type="content" source="./media/apache-kafka-mirror-maker2/ect-host.png" lightbox="./media/apache-kafka-mirror-maker2/ect-host.png" alt-text="Screenshot that shows etc hosts file output." border="false"::: |
| 104 | + |
| 105 | +1. Save and close the file. |
| 106 | + |
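The copy-and-edit steps above can also be scripted. The following is a minimal sketch, not a supported tool. It assumes that worker-node host names start with `wn`, as in the bootstrap server examples in this article. The function name `append_worker_entries` and both file arguments are hypothetical. It reads a saved copy of the primary cluster's hosts file and appends only the worker entries that the target file doesn't already contain.

```shell
#!/usr/bin/env bash
# Hypothetical helper: append worker-node entries from a saved copy of the
# primary cluster's /etc/hosts to a target hosts file, skipping duplicates.
# Assumption: worker-node host names start with "wn".
append_worker_entries() {
  local source_hosts="$1"   # saved copy of PRIMARYCLUSTER's /etc/hosts
  local target_hosts="$2"   # hosts file to update on SECONDARYCLUSTER
  local line
  # Keep only lines of the form "<ip> <name starting with wn> ...".
  grep -E '^[0-9.]+[[:space:]]+wn' "$source_hosts" | while IFS= read -r line; do
    # -F: fixed string, -x: match the whole line, -q: no output.
    if ! grep -Fxq "$line" "$target_hosts"; then
      printf '%s\n' "$line" >> "$target_hosts"
    fi
  done
}
```

Try the function against a copy of the hosts file first. Updating `/etc/hosts` itself requires write permission on that file.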
| 107 | +### Create multiple topics in PRIMARYCLUSTER |
| 108 | +1. Use the following command to create topics, replacing the variables with your own values. |
| 109 | + |
| 110 | + ``` |
| 111 | + bash /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --zookeeper $KAFKAZKHOSTS --create --topic $TOPICNAME --partitions $NUM_PARTITIONS --replication-factor $REPLICATION_FACTOR |
| 112 | + ``` |
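Because the command above creates one topic per invocation, creating several topics means repeating it. The following is a minimal sketch of one way to do that. The function name `print_create_topic_commands` and the topic names in the usage note are hypothetical. It only prints one `kafka-topics.sh` command per topic, with the same flags as the command above, so that you can review the commands before running them.

```shell
#!/usr/bin/env bash
# Hypothetical helper: print one kafka-topics.sh command per topic name.
# Printing instead of executing lets you review the commands first.
print_create_topic_commands() {
  local zk_hosts="$1" partitions="$2" replication="$3"
  shift 3   # the remaining arguments are topic names
  local topic
  for topic in "$@"; do
    printf 'bash /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --zookeeper %s --create --topic %s --partitions %s --replication-factor %s\n' \
      "$zk_hosts" "$topic" "$partitions" "$replication"
  done
}
```

For example, `print_create_topic_commands "$KAFKAZKHOSTS" 3 3 topicA topicB` prints two commands. After reviewing the output, you can run each line on `PRIMARYCLUSTER`.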
| 113 | +### Configure MirrorMaker2 in SECONDARYCLUSTER |
| 114 | + |
| 115 | +1. Now change the configuration in the MirrorMaker2 properties file. |
| 116 | + |
| 117 | +1. Run the following commands with admin privileges: |
| 118 | + |
| 119 | + ``` |
| 120 | + sudo su |
| 121 | + vi /etc/kafka/conf/connect-mirror-maker.properties |
| 122 | + ``` |
| 123 | + > [!NOTE] |
| 124 | + > This article contains references to the term *blacklist*, a term that Microsoft no longer uses. When the term is removed from the software, we’ll remove it from this article. |
| 125 | + |
| 126 | +1. The properties file looks like this. |
| 127 | + ``` |
| 128 | + # specify any number of cluster aliases |
| 129 | + clusters = source, destination |
| 130 | + |
| 131 | + # connection information for each cluster |
| 132 | + # This is a comma separated host:port pairs for each cluster |
| 133 | + # for example. "A_host1:9092, A_host2:9092, A_host3:9092" and you can see the exact host name on Ambari > Hosts |
| | + source.bootstrap.servers = wn0-src-kafka.bx.internal.cloudapp.net:9092,wn1-src-kafka.bx.internal.cloudapp.net:9092,wn2-src-kafka.bx.internal.cloudapp.net:9092 |
| | + destination.bootstrap.servers = wn0-dest-kafka.bx.internal.cloudapp.net:9092,wn1-dest-kafka.bx.internal.cloudapp.net:9092,wn2-dest-kafka.bx.internal.cloudapp.net:9092 |
| 134 | + # enable and configure individual replication flows |
| 135 | + source->destination.enabled = true |
| 136 | + |
| 137 | + # regex that defines which topics get replicated, for example "foo-.*" |
| 138 | + source->destination.topics = .* |
| 139 | + groups=.* |
| 140 | + topics.blacklist="*.internal,__.*" |
| 141 | + |
| 142 | + # Setting replication factor of newly created remote topics |
| 143 | + replication.factor=3 |
| 144 | + |
| 145 | + checkpoints.topic.replication.factor=1 |
| 146 | + heartbeats.topic.replication.factor=1 |
| 147 | + offset-syncs.topic.replication.factor=1 |
| 148 | + |
| 149 | + offset.storage.replication.factor=1 |
| 150 | + status.storage.replication.factor=1 |
| 151 | + config.storage.replication.factor=1 |
| 152 | + ``` |
| 153 | + |
| 154 | +1. Here, `source` is your `PRIMARYCLUSTER` and `destination` is your `SECONDARYCLUSTER`. Replace both aliases everywhere with the correct cluster names, and set `source.bootstrap.servers` and `destination.bootstrap.servers` to the FQDNs or IP addresses of the respective worker nodes. |
| 155 | +1. You can use regular expressions to control which topics and configurations are replicated. `replication.factor=3` sets the replication factor to 3 for every topic that the MirrorMaker script creates by itself. |
| 156 | +1. Increase the replication factor from 1 to 3 for these topics: |
| 157 | + ``` |
| 158 | + checkpoints.topic.replication.factor=1 |
| 159 | + heartbeats.topic.replication.factor=1 |
| 160 | + offset-syncs.topic.replication.factor=1 |
| 161 | + |
| 162 | + offset.storage.replication.factor=1 |
| 163 | + status.storage.replication.factor=1 |
| 164 | + config.storage.replication.factor=1 |
| 165 | + ``` |
| 166 | + > [!NOTE] |
| 167 | + > The default number of in-sync replicas for all topics at the broker level is 2. If you keep the replication factor at 1, MirrorMaker2 throws an exception when it runs. |
| 168 | + |
| 169 | +1. Turn on the [Enable Auto Topic Creation](./apache-kafka-auto-create-topics.md) functionality. The MirrorMaker script then replicates each topic to the secondary cluster under the name `PRIMARYCLUSTER.TOPICNAME` with the same configuration. Save the file to finish the configuration. |
| 170 | +1. To mirror topics in both directions, from primary to secondary and from secondary to primary (active-active), add the following settings: |
| 171 | + ``` |
| 172 | + destination->source.enabled=true |
| 173 | + destination->source.topics = .* |
| 174 | + ``` |
| 175 | +1. After these changes, the final configuration file should look like this: |
| 176 | + ``` |
| 177 | + # specify any number of cluster aliases |
| 178 | + clusters = primary-kafka-cluster, secondary-kafka-cluster |
| 179 | + |
| 180 | + # connection information for each cluster |
| 181 | + # This is a comma separated host:port pairs for each cluster |
| 182 | + # for example. "A_host1:9092, A_host2:9092, A_host3:9092" and you can see the exact host name on Ambari -> Hosts |
| 183 | + primary-kafka-cluster.bootstrap.servers = wn0-src-kafka.bx.internal.cloudapp.net:9092,wn1-src-kafka.bx.internal.cloudapp.net:9092,wn2-src-kafka.bx.internal.cloudapp.net:9092 |
| 184 | + secondary-kafka-cluster.bootstrap.servers = wn0-dest-kafka.bx.internal.cloudapp.net:9092,wn1-dest-kafka.bx.internal.cloudapp.net:9092,wn2-dest-kafka.bx.internal.cloudapp.net:9092 |
| 185 | + |
| 186 | + # enable and configure individual replication flows |
| 187 | + primary-kafka-cluster->secondary-kafka-cluster.enabled = true |
| 188 | + |
| 189 | + # enable this for both side replication |
| 190 | + secondary-kafka-cluster->primary-kafka-cluster.enabled = true |
| 191 | +
|
| 192 | + # regex that defines which topics get replicated, for example "foo-.*" |
| 193 | + primary-kafka-cluster->secondary-kafka-cluster.topics = .* |
| 194 | + secondary-kafka-cluster->primary-kafka-cluster.topics = .* |
| 195 | + |
| 196 | + groups=.* |
| 197 | + topics.blacklist="*.internal,__.*" |
| 198 | + |
| 199 | + # Setting replication factor of newly created remote topics |
| 200 | + replication.factor=3 |
| 201 | + |
| 202 | + checkpoints.topic.replication.factor=3 |
| 203 | + heartbeats.topic.replication.factor=3 |
| 204 | + offset-syncs.topic.replication.factor=3 |
| 205 | + offset.storage.replication.factor=3 |
| 206 | + status.storage.replication.factor=3 |
| 207 | + config.storage.replication.factor=3 |
| 208 | + ``` |
| 209 | + |
| 210 | +1. Start MirrorMaker2 on `SECONDARYCLUSTER`. |
| 211 | + |
| 212 | + ``` |
| 213 | + cd /usr/hdp/current/kafka-broker |
| 214 | + ./bin/connect-mirror-maker.sh ./config/connect-mirror-maker.properties |
| 215 | + ``` |
| 216 | + |
| 217 | +1. Now start a producer on `PRIMARYCLUSTER`. |
| 218 | + |
| 219 | + ``` |
| 220 | + export clusterName='primary-kafka-cluster' |
| 221 | + export TOPICNAME='TestMirrorMakerTopic' |
| 222 | + export KAFKABROKERS='wn0-primar:9092' |
| 223 | + export KAFKAZKHOSTS='zk0-primar:2181' |
| 224 | + |
| 225 | + # Start producer |
| 226 | + bash /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list $KAFKABROKERS --topic $TOPICNAME |
| 227 | + ``` |
| 228 | +1. Now start a consumer on `SECONDARYCLUSTER`. |
| 229 | + |
| 230 | + ``` |
| 231 | + export clusterName='secondary-kafka-cluster' |
| 232 | + export TOPICNAME='TestMirrorMakerTopic' |
| 233 | + export KAFKABROKERS='wn0-second:9092' |
| 234 | + export KAFKAZKHOSTS='zk0-second:2181' |
| 235 | + |
| 236 | + # List all topics to check whether they were replicated |
| 237 | + bash /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --zookeeper $KAFKAZKHOSTS --list |
| 238 | + |
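| 239 | + # Start consumer. As noted earlier, the mirrored topic is named <source cluster alias>.<topic> |
| 240 | + bash /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --bootstrap-server $KAFKABROKERS --topic primary-kafka-cluster.$TOPICNAME --from-beginning |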
| 241 | + ``` |
| 242 | + |
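The note in this section says that a replication factor of 1 makes MirrorMaker2 throw an exception, because the broker-level in-sync replica count is 2. The following is a minimal sketch for checking the properties file before you start MirrorMaker2. The function name `find_low_replication_factors` is hypothetical, and the default minimum of 2 is taken from that note rather than read from the brokers.

```shell
#!/usr/bin/env bash
# Hypothetical helper: list "*replication.factor" settings below a minimum.
# Assumption: the broker-level in-sync replica count is 2, as stated in the
# note in this article. Pass a second argument to use a different minimum.
find_low_replication_factors() {
  local properties_file="$1"
  local minimum="${2:-2}"
  # Split each line on "=", skip comments, and print every key ending in
  # "replication.factor" whose numeric value is below the minimum.
  awk -F'=' -v min="$minimum" '
    /^[[:space:]]*#/ { next }
    $1 ~ /replication\.factor[[:space:]]*$/ {
      key = $1; value = $2
      gsub(/[[:space:]]/, "", key)
      gsub(/[[:space:]]/, "", value)
      if (value + 0 < min + 0) print key "=" value
    }
  ' "$properties_file"
}
```

Running `find_low_replication_factors /etc/kafka/conf/connect-mirror-maker.properties` prints any setting that is still too low. Empty output means that every matching setting meets the minimum. The match is case-sensitive, so a key that doesn't end in lowercase `replication.factor` isn't checked.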
| 243 | +## Delete cluster |
| 244 | + |
| 245 | +[!INCLUDE [delete-cluster-warning](../includes/hdinsight-delete-cluster-warning.md)] |
| 246 | + |
| 247 | +The steps in this article created clusters in different Azure resource groups. To delete all the resources created, you can delete the two resource groups created: **kafka-primary-rg** and **kafka-secondary-rg**. Deleting the resource groups removes all of the resources created by following this article, including clusters, virtual networks, and storage accounts. |
| 248 | + |
| 249 | +## Next steps |
| 250 | + |
| 251 | +In this article, you learned how to use [MirrorMaker2](https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0) to create a replica of an [Apache Kafka](https://kafka.apache.org/) cluster. |
| 252 | + |
| 253 | +Use the following links to discover other ways to work with Kafka: |
| 254 | + |
| 255 | +* [Apache Kafka MirrorMaker2 documentation](https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0) at cwiki.apache.org. |
| 256 | +* [Get started with Apache Kafka on HDInsight](apache-kafka-get-started.md) |
| 257 | +* [Use Apache Spark with Apache Kafka on HDInsight](../hdinsight-apache-spark-with-kafka.md) |
| 258 | +* [Connect to Apache Kafka through an Azure virtual network](apache-kafka-connect-vpn-gateway.md) |