1515package  main
1616
1717import  (
18+ 	"hash/fnv" 
1819	"io/ioutil" 
1920	"net/http" 
21+ 	"strings" 
2022
2123	"github.com/gin-gonic/gin" 
2224	"github.com/sirupsen/logrus" 
@@ -62,10 +64,20 @@ func receiveHandler(producer *kafka.Producer, serializer Serializer) func(c *gin
6264		}
6365
6466		for  topic , metrics  :=  range  metricsPerTopic  {
65- 			t  :=  topic 
66- 			part  :=  kafka.TopicPartition {
67- 				Partition : kafka .PartitionAny ,
68- 				Topic :     & t ,
67+ 			parts  :=  strings .Split (topic , "|" )
68+ 			var  part  kafka.TopicPartition 
69+ 
70+ 			if  len (parts ) ==  1  {
71+ 				part  =  kafka.TopicPartition {
72+ 					Partition : kafka .PartitionAny ,
73+ 					Topic :     & parts [0 ],
74+ 				}
75+ 			} else  {
76+ 
77+ 				part  =  kafka.TopicPartition {
78+ 					Partition : getPartition (parts [0 ], parts [1 ]),
79+ 					Topic :     & parts [0 ],
80+ 				}
6981			}
7082			for  _ , metric  :=  range  metrics  {
7183				err  :=  producer .Produce (& kafka.Message {
@@ -83,3 +95,15 @@ func receiveHandler(producer *kafka.Producer, serializer Serializer) func(c *gin
8395
8496	}
8597}
98+ 
99+ func  getPartition (topic  string , hashKey  string ) int32  {
100+ 	h  :=  fnv .New32a ()
101+ 	h .Write ([]byte (hashKey ))
102+ 
103+ 	v , ok  :=  topicPartitionCount .Load (topic )
104+ 	if  ! ok  {
105+ 		logrus .WithField ("topic" , topic ).Error ("did not find metadata requested topic" )
106+ 		return  0 
107+ 	}
108+ 	return  int32 (h .Sum32 () %  uint32 (v .(int )))
109+ }
0 commit comments