File tree Expand file tree Collapse file tree 1 file changed +16
-3
lines changed
connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror Expand file tree Collapse file tree 1 file changed +16
-3
lines changed Original file line number Diff line number Diff line change @@ -33,7 +33,6 @@ public String offsetSyncsTopic(String clusterAlias) {
33
33
if (offsetSyncsTopic == null ) {
34
34
return super .offsetSyncsTopic (clusterAlias );
35
35
}
36
- log .info ("Using offset syncs topic: {}" , offsetSyncsTopic );
37
36
return offsetSyncsTopic ;
38
37
}
39
38
@@ -43,7 +42,6 @@ public String checkpointsTopic(String clusterAlias) {
43
42
if (checkpointsTopic == null ) {
44
43
return super .checkpointsTopic (clusterAlias );
45
44
}
46
- log .info ("Using checkpoints topic: {}" , checkpointsTopic );
47
45
return checkpointsTopic ;
48
46
}
49
47
@@ -53,7 +51,22 @@ public String heartbeatsTopic() {
53
51
if (heartbeatsTopic == null ) {
54
52
return super .heartbeatsTopic ();
55
53
}
56
- log .info ("Using heartbeats topic: {}" , heartbeatsTopic );
57
54
return heartbeatsTopic ;
58
55
}
56
+
57
+ @ Override
58
+ public boolean isCheckpointsTopic (String topic ) {
59
+ String checkpointsTopic = System .getenv (CHECKPOINTS_TOPIC_ENV_KEY );
60
+ return super .isCheckpointsTopic (topic ) || topic .equals (checkpointsTopic );
61
+ }
62
+
63
+ @ Override
64
+ public boolean isHeartbeatsTopic (String topic ) {
65
+ return super .isHeartbeatsTopic (topic ) || topic .equals (heartbeatsTopic ());
66
+ }
67
+
68
+ @ Override
69
+ public boolean isMM2InternalTopic (String topic ) {
70
+ return super .isMM2InternalTopic (topic ) || isHeartbeatsTopic (topic ) || isCheckpointsTopic (topic );
71
+ }
59
72
}
You can’t perform that action at this time.
0 commit comments