1515package  main
1616
1717import  (
18+ 	"fmt" 
19+ 	"hash/fnv" 
1820	"io/ioutil" 
1921	"net/http" 
22+ 	"strings" 
2023
2124	"github.com/gin-gonic/gin" 
2225	"github.com/sirupsen/logrus" 
@@ -61,11 +64,16 @@ func receiveHandler(producer *kafka.Producer, serializer Serializer) func(c *gin
6164			return 
6265		}
6366
64- 		for  topic , metrics  :=  range  metricsPerTopic  {
65- 			t  :=  topic 
67+ 		for  topicAndHashKey , metrics  :=  range  metricsPerTopic  {
68+ 
69+ 			topic , partitionID , err  :=  getPartitionAndTopic (topicAndHashKey )
70+ 			if  err  !=  nil  {
71+ 				continue 
72+ 			}
73+ 
6674			part  :=  kafka.TopicPartition {
67- 				Partition : kafka . PartitionAny ,
68- 				Topic :     & t ,
75+ 				Partition : partitionID ,
76+ 				Topic :     & topic ,
6977			}
7078			for  _ , metric  :=  range  metrics  {
7179				err  :=  producer .Produce (& kafka.Message {
@@ -83,3 +91,20 @@ func receiveHandler(producer *kafka.Producer, serializer Serializer) func(c *gin
8391
8492	}
8593}
94+ 
95+ func  getPartitionAndTopic (topic  string ) (string , int32 , error ) {
96+ 	parts  :=  strings .Split (topic , "|" )
97+ 
98+ 	if  len (parts ) ==  1  {
99+ 		return  parts [0 ], kafka .PartitionAny , nil 
100+ 	}
101+ 	h  :=  fnv .New32a ()
102+ 	h .Write ([]byte (parts [1 ]))
103+ 
104+ 	v , ok  :=  topicPartitionCount .Load (parts [0 ])
105+ 	if  ! ok  {
106+ 		logrus .WithField ("topic" , parts [0 ]).Error ("did not find metadata requested topic" )
107+ 		return  topic , kafka .PartitionAny , fmt .Errorf ("could not" )
108+ 	}
109+ 	return  parts [0 ], int32 (h .Sum32 () %  uint32 (v .(int ))), nil 
110+ }
0 commit comments