Skip to content

Commit 65e4a5f

Browse files
committed
[add] cleaned subscriber code. improved readability
1 parent e26dbf7 commit 65e4a5f

File tree

1 file changed

+65
-60
lines changed

1 file changed

+65
-60
lines changed

subscriber.go

Lines changed: 65 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77
"github.com/mediocregopher/radix"
88
"io/ioutil"
99
"log"
10-
"math/rand"
1110
"os"
1211
"os/signal"
1312
"strings"
@@ -34,13 +33,12 @@ type testResult struct {
3433
Addresses []string `json:"Addresses"`
3534
}
3635

37-
func subscriberRoutine(addr string, subscriberName string, channel string, messages_per_channel int64, printMessages bool, stop chan struct{}, wg *sync.WaitGroup) {
36+
func subscriberRoutine(addr string, subscriberName string, channel string, printMessages bool, stop chan struct{}, wg *sync.WaitGroup) {
3837
// tell the caller we've stopped
3938
defer wg.Done()
4039

41-
conn, _, _, msgCh, tick := bootstrapPubSub(addr, subscriberName, channel)
40+
conn, _, _, msgCh, _ := bootstrapPubSub(addr, subscriberName, channel)
4241
defer conn.Close()
43-
defer tick.Stop()
4442

4543
for {
4644
select {
@@ -76,15 +74,14 @@ func bootstrapPubSub(addr string, subscriberName string, channel string) (radix.
7674
if err != nil {
7775
panic(err)
7876
}
79-
tickTime := 1 + rand.Intn(2)
80-
tick := time.NewTicker(time.Duration(tickTime) * time.Second)
8177

82-
return conn, err, ps, msgCh, tick
78+
return conn, err, ps, msgCh, nil
8379
}
8480

8581
func main() {
8682
host := flag.String("host", "127.0.0.1", "redis host.")
8783
port := flag.String("port", "6379", "redis port.")
84+
subscribers_placement := flag.String("subscribers-placement-per-channel", "dense", "(dense,sparse) dense - Place all subscribers to channel in a specific shard. sparse- spread the subscribers across as many shards possible, in a round-robin manner.")
8885
channel_minimum := flag.Int("channel-minimum", 1, "channel ID minimum value ( each channel has a dedicated thread ).")
8986
channel_maximum := flag.Int("channel-maximum", 100, "channel ID maximum value ( each channel has a dedicated thread ).")
9087
subscribers_per_channel := flag.Int("subscribers-per-channel", 1, "number of subscribers per channel.")
@@ -107,51 +104,15 @@ func main() {
107104
}
108105

109106
if *distributeSubscribers {
110-
// Create a normal redis connection
111-
conn, err := radix.Dial("tcp", fmt.Sprintf("%s:%s", *host, *port))
112-
if err != nil {
113-
panic(err)
114-
}
115-
var topology radix.ClusterTopo
116-
err = conn.Do(radix.FlatCmd(&topology, "CLUSTER", "SLOTS"))
117-
if err != nil {
118-
log.Fatal(err)
119-
}
120-
121-
for _, slot := range topology.Map() {
122-
slot_host := strings.Split(slot.Addr, ":")[0]
123-
slot_port := strings.Split(slot.Addr, ":")[1]
124-
if strings.Compare(slot_host, "127.0.0.1") == 0 {
125-
slot.Addr = fmt.Sprintf("%s:%s", *host, slot_port)
126-
}
127-
nodes = append(nodes, slot)
128-
nodesAddresses = append(nodesAddresses, slot.Addr)
129-
node_subscriptions_count = append(node_subscriptions_count, 0)
130-
}
131-
conn.Close()
107+
nodes, nodesAddresses, node_subscriptions_count = getClusterNodesFromTopology(host, port, nodes, nodesAddresses, node_subscriptions_count)
132108
} else {
133-
nodes = []radix.ClusterNode{}
134-
ports := strings.Split(*port, ",")
135-
for idx, nhost := range strings.Split(*host, ",") {
136-
node := radix.ClusterNode{
137-
Addr: fmt.Sprintf("%s:%s", nhost, ports[idx]),
138-
ID: "",
139-
Slots: nil,
140-
SecondaryOfAddr: "",
141-
SecondaryOfID: "",
142-
}
143-
nodes = append(nodes, node)
144-
nodesAddresses = append(nodesAddresses, node.Addr)
145-
node_subscriptions_count = append(node_subscriptions_count, 0)
146-
}
147-
109+
nodes, nodesAddresses, node_subscriptions_count = getClusterNodesFromArgs(nodes, port, host, nodesAddresses, node_subscriptions_count)
148110
}
149111

150112
if strings.Compare(*client_output_buffer_limit_pubsub, "") != 0 {
151113
checkClientOutputBufferLimitPubSub(nodes, client_output_buffer_limit_pubsub)
152114
}
153115

154-
// a channel to tell `tick()` and `tock()` to stop
155116
stopChan := make(chan struct{})
156117

157118
// a WaitGroup for the goroutines to tell us they've stopped
@@ -162,18 +123,17 @@ func main() {
162123
total_messages := int64(total_subscriptions) * *messages_per_channel_subscriber
163124
fmt.Println(fmt.Sprintf("Total subcriptions: %d. Subscriptions per node %d. Total messages: %d", total_subscriptions, subscriptions_per_node, total_messages))
164125

165-
// Launch routine to accumulate message count
166-
//go accumulate()
167-
168-
for channel_id := *channel_minimum; channel_id <= *channel_maximum; channel_id++ {
169-
for channel_subscriber_number := 1; channel_subscriber_number <= *subscribers_per_channel; channel_subscriber_number++ {
170-
nodes_pos := channel_id % len(nodes)
171-
node_subscriptions_count[nodes_pos]++
172-
addr := nodes[nodes_pos]
173-
channel := fmt.Sprintf("%s%d", *subscribe_prefix, channel_id)
174-
subscriberName := fmt.Sprintf("subscriber#%d-%s%d", channel_subscriber_number, *subscribe_prefix, channel_id)
175-
wg.Add(1)
176-
go subscriberRoutine(addr.Addr, subscriberName, channel, *messages_per_channel_subscriber, *printMessages, stopChan, &wg)
126+
if strings.Compare(*subscribers_placement, "dense") == 0 {
127+
for channel_id := *channel_minimum; channel_id <= *channel_maximum; channel_id++ {
128+
for channel_subscriber_number := 1; channel_subscriber_number <= *subscribers_per_channel; channel_subscriber_number++ {
129+
nodes_pos := channel_id % len(nodes)
130+
node_subscriptions_count[nodes_pos]++
131+
addr := nodes[nodes_pos]
132+
channel := fmt.Sprintf("%s%d", *subscribe_prefix, channel_id)
133+
subscriberName := fmt.Sprintf("subscriber#%d-%s%d", channel_subscriber_number, *subscribe_prefix, channel_id)
134+
wg.Add(1)
135+
go subscriberRoutine(addr.Addr, subscriberName, channel, *printMessages, stopChan, &wg)
136+
}
177137
}
178138
}
179139

@@ -183,14 +143,15 @@ func main() {
183143
w := new(tabwriter.Writer)
184144

185145
tick := time.NewTicker(time.Duration(*client_update_tick) * time.Second)
186-
var connections []radix.Conn
187-
closed, start_time, duration, totalMessages, messageRateTs := updateCLI(nodes, connections, node_subscriptions_count, tick, c, total_messages, w, *test_time)
146+
closed, start_time, duration, totalMessages, messageRateTs := updateCLI(tick, c, total_messages, w, *test_time)
188147
messageRate := float64(totalMessages) / float64(duration.Seconds())
148+
189149
fmt.Fprint(w, fmt.Sprintf("#################################################\nTotal Duration %f Seconds\nMessage Rate %f\n#################################################\n", duration.Seconds(), messageRate))
190150
fmt.Fprint(w, "\r\n")
191151
w.Flush()
192152

193153
if strings.Compare(*json_out_file, "") != 0 {
154+
194155
res := testResult{
195156
StartTime: start_time.Unix(),
196157
Duration: duration.Seconds(),
@@ -226,7 +187,51 @@ func main() {
226187
wg.Wait()
227188
}
228189

229-
func updateCLI(nodes []radix.ClusterNode, connections []radix.Conn, node_subscriptions_count []int, tick *time.Ticker, c chan os.Signal, message_limit int64, w *tabwriter.Writer, test_time int) (bool, time.Time, time.Duration, uint64, []float64) {
190+
func getClusterNodesFromArgs(nodes []radix.ClusterNode, port *string, host *string, nodesAddresses []string, node_subscriptions_count []int) ([]radix.ClusterNode, []string, []int) {
191+
nodes = []radix.ClusterNode{}
192+
ports := strings.Split(*port, ",")
193+
for idx, nhost := range strings.Split(*host, ",") {
194+
node := radix.ClusterNode{
195+
Addr: fmt.Sprintf("%s:%s", nhost, ports[idx]),
196+
ID: "",
197+
Slots: nil,
198+
SecondaryOfAddr: "",
199+
SecondaryOfID: "",
200+
}
201+
nodes = append(nodes, node)
202+
nodesAddresses = append(nodesAddresses, node.Addr)
203+
node_subscriptions_count = append(node_subscriptions_count, 0)
204+
}
205+
return nodes, nodesAddresses, node_subscriptions_count
206+
}
207+
208+
func getClusterNodesFromTopology(host *string, port *string, nodes []radix.ClusterNode, nodesAddresses []string, node_subscriptions_count []int) ([]radix.ClusterNode, []string, []int) {
209+
// Create a normal redis connection
210+
conn, err := radix.Dial("tcp", fmt.Sprintf("%s:%s", *host, *port))
211+
if err != nil {
212+
panic(err)
213+
}
214+
var topology radix.ClusterTopo
215+
err = conn.Do(radix.FlatCmd(&topology, "CLUSTER", "SLOTS"))
216+
if err != nil {
217+
log.Fatal(err)
218+
}
219+
220+
for _, slot := range topology.Map() {
221+
slot_host := strings.Split(slot.Addr, ":")[0]
222+
slot_port := strings.Split(slot.Addr, ":")[1]
223+
if strings.Compare(slot_host, "127.0.0.1") == 0 {
224+
slot.Addr = fmt.Sprintf("%s:%s", *host, slot_port)
225+
}
226+
nodes = append(nodes, slot)
227+
nodesAddresses = append(nodesAddresses, slot.Addr)
228+
node_subscriptions_count = append(node_subscriptions_count, 0)
229+
}
230+
conn.Close()
231+
return nodes, nodesAddresses, node_subscriptions_count
232+
}
233+
234+
func updateCLI(tick *time.Ticker, c chan os.Signal, message_limit int64, w *tabwriter.Writer, test_time int) (bool, time.Time, time.Duration, uint64, []float64) {
230235

231236
start := time.Now()
232237
prevTime := time.Now()

0 commit comments

Comments
 (0)