1818
1919import com .navercorp .pinpoint .collector .heatmap .config .HeatmapProperties ;
2020import com .navercorp .pinpoint .collector .heatmap .dao .HeatmapDao ;
21+ import com .navercorp .pinpoint .collector .heatmap .util .HashmapSortKeyUtils ;
2122import com .navercorp .pinpoint .collector .heatmap .vo .HeatmapStat ;
2223import com .navercorp .pinpoint .common .server .metric .dao .TopicNameManager ;
2324import org .apache .logging .log4j .LogManager ;
2425import org .apache .logging .log4j .Logger ;
26+ import org .springframework .beans .factory .annotation .Value ;
2527import org .springframework .kafka .core .KafkaTemplate ;
2628import org .springframework .stereotype .Repository ;
2729
2830import java .util .Objects ;
31+ import java .util .concurrent .ThreadLocalRandom ;
2932
3033/**
3134 * @author minwoo-jung
@@ -36,10 +39,12 @@ public class PinotHeatmapDao implements HeatmapDao {
3639 private final Logger logger = LogManager .getLogger (getClass ());
3740 private final KafkaTemplate <String , HeatmapStat > kafkaHeatmapStatTemplate ;
3841 private final TopicNameManager topicNameManager ;
42+ private final int keyPartitionCount ;
3943
4044 public PinotHeatmapDao (KafkaTemplate <String , HeatmapStat > kafkaHeatmapStatTemplate , HeatmapProperties heatmapProperties ) {
4145 this .kafkaHeatmapStatTemplate = Objects .requireNonNull (kafkaHeatmapStatTemplate , "kafkaHeatmapStatTemplate" );
4246 this .topicNameManager = new TopicNameManager (heatmapProperties .getHeatmapTopicPrefix (), heatmapProperties .getHeatMapTopicPaddingLength (), heatmapProperties .getHeatmapTopicCount ());
47+ this .keyPartitionCount = heatmapProperties .getHeatmapKeyPartitionCount ();
4348 }
4449
4550 @ Override
@@ -49,6 +54,14 @@ public void insert(HeatmapStat heatmapStat) {
4954 return ;
5055 }
5156 String topic = topicNameManager .getTopicName (heatmapStat .getApplicationName ());
52- kafkaHeatmapStatTemplate .send (topic , heatmapStat .getAgentId (), heatmapStat );
57+ kafkaHeatmapStatTemplate .send (topic , heatmapStat .getSortKey () + "#" + randomPartitionSuffix (), heatmapStat );
58+ }
59+
60+ private int randomPartitionSuffix () {
61+ if (keyPartitionCount <= 1 ) {
62+ return 0 ;
63+ }
64+ int nonNegativeRandom = ThreadLocalRandom .current ().nextInt () & 0x7fffffff ;
65+ return nonNegativeRandom % keyPartitionCount ;
5366 }
5467}
0 commit comments