diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/heatmap/config/HeatmapProperties.java b/collector/src/main/java/com/navercorp/pinpoint/collector/heatmap/config/HeatmapProperties.java index 1a9d9ffa5eb3..6bcd6df1ca23 100644 --- a/collector/src/main/java/com/navercorp/pinpoint/collector/heatmap/config/HeatmapProperties.java +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/heatmap/config/HeatmapProperties.java @@ -31,7 +31,8 @@ public class HeatmapProperties { private int heatMapTopicPaddingLength; @Value("${kafka.heatmap.topic.count}") private int heatmapTopicCount; - + @Value("${kafka.heatmap.key.partition.count}") + private int heatmapKeyPartitionCount; public String getHeatmapTopicPrefix() { return heatmapTopicPrefix; @@ -44,4 +45,8 @@ public int getHeatMapTopicPaddingLength() { public int getHeatmapTopicCount() { return heatmapTopicCount; } + + public int getHeatmapKeyPartitionCount() { + return heatmapKeyPartitionCount; + } } diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/heatmap/dao/pinot/PinotHeatmapDao.java b/collector/src/main/java/com/navercorp/pinpoint/collector/heatmap/dao/pinot/PinotHeatmapDao.java index d71dbc16df5d..42099bdaaa46 100644 --- a/collector/src/main/java/com/navercorp/pinpoint/collector/heatmap/dao/pinot/PinotHeatmapDao.java +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/heatmap/dao/pinot/PinotHeatmapDao.java @@ -26,6 +26,7 @@ import org.springframework.stereotype.Repository; import java.util.Objects; +import java.util.concurrent.ThreadLocalRandom; /** * @author minwoo-jung @@ -36,10 +37,12 @@ public class PinotHeatmapDao implements HeatmapDao { private final Logger logger = LogManager.getLogger(getClass()); private final KafkaTemplate kafkaHeatmapStatTemplate; private final TopicNameManager topicNameManager; + private final int keyPartitionCount; public PinotHeatmapDao(KafkaTemplate kafkaHeatmapStatTemplate, HeatmapProperties heatmapProperties) { this.kafkaHeatmapStatTemplate = Objects.requireNonNull(kafkaHeatmapStatTemplate, "kafkaHeatmapStatTemplate"); this.topicNameManager = new TopicNameManager(heatmapProperties.getHeatmapTopicPrefix(), heatmapProperties.getHeatMapTopicPaddingLength(), heatmapProperties.getHeatmapTopicCount()); + this.keyPartitionCount = heatmapProperties.getHeatmapKeyPartitionCount(); } @Override @@ -49,6 +52,14 @@ public void insert(HeatmapStat heatmapStat) { return; } String topic = topicNameManager.getTopicName(heatmapStat.getApplicationName()); - kafkaHeatmapStatTemplate.send(topic, heatmapStat.getAgentId(), heatmapStat); + kafkaHeatmapStatTemplate.send(topic, heatmapStat.getSortKey() + "#" + randomPartitionSuffix(), heatmapStat); + } + + private int randomPartitionSuffix() { + if (keyPartitionCount <= 1) { + return 0; + } + int nonNegativeRandom = ThreadLocalRandom.current().nextInt() & 0x7fffffff; + return nonNegativeRandom % keyPartitionCount; } } diff --git a/collector/src/main/resources/heatmap/collector/profiles/local/heatmap-collector.properties b/collector/src/main/resources/heatmap/collector/profiles/local/heatmap-collector.properties index 80f445a9d6ee..05226a5987e0 100644 --- a/collector/src/main/resources/heatmap/collector/profiles/local/heatmap-collector.properties +++ b/collector/src/main/resources/heatmap/collector/profiles/local/heatmap-collector.properties @@ -1,3 +1,4 @@ kafka.heatmap.topic.count=1 kafka.heatmap.topic.prefix=heatmap-stat-app- -kafka.heatmap.topic.padding.length=2 \ No newline at end of file +kafka.heatmap.topic.padding.length=2 +kafka.heatmap.key.partition.count=1 \ No newline at end of file diff --git a/collector/src/main/resources/heatmap/collector/profiles/release/heatmap-collector.properties b/collector/src/main/resources/heatmap/collector/profiles/release/heatmap-collector.properties index 80f445a9d6ee..05226a5987e0 100644 --- a/collector/src/main/resources/heatmap/collector/profiles/release/heatmap-collector.properties +++ b/collector/src/main/resources/heatmap/collector/profiles/release/heatmap-collector.properties @@ -1,3 +1,4 @@ kafka.heatmap.topic.count=1 kafka.heatmap.topic.prefix=heatmap-stat-app- -kafka.heatmap.topic.padding.length=2 \ No newline at end of file +kafka.heatmap.topic.padding.length=2 +kafka.heatmap.key.partition.count=1 \ No newline at end of file