Skip to content

Commit e26dbf7

Browse files
committed
[add] improved subscriber performance. displaying received message rate on subscriber
1 parent 1f10c9f commit e26dbf7

File tree

2 files changed

+109
-65
lines changed

2 files changed

+109
-65
lines changed

.gitignore

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,3 +82,8 @@ bin/
8282
.Trashes
8383
ehthumbs.db
8484
Thumbs.db
85+
86+
87+
# Json Results #
88+
################
89+
*.json

subscriber.go

Lines changed: 104 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -1,48 +1,54 @@
11
package main
22

33
import (
4+
"encoding/json"
45
"flag"
56
"fmt"
67
"github.com/mediocregopher/radix"
8+
"io/ioutil"
79
"log"
810
"math/rand"
911
"os"
1012
"os/signal"
11-
"strconv"
1213
"strings"
1314
"sync"
15+
"sync/atomic"
1416
"text/tabwriter"
1517
"time"
1618
)
1719

18-
func subscriberRoutine(addr string, subscriberName string, channel string, printMessages bool, stop chan struct{}, wg *sync.WaitGroup) {
20+
var totalMessages uint64
21+
22+
type testResult struct {
23+
StartTime int64 `json:"StartTime"`
24+
Duration float64 `json:"Duration"`
25+
MessageRate float64 `json:"MessageRate"`
26+
TotalMessages uint64 `json:"TotalMessages"`
27+
TotalSubscriptions int `json:"TotalSubscriptions"`
28+
ChannelMin int `json:"ChannelMin"`
29+
ChannelMax int `json:"ChannelMax"`
30+
SubscribersPerChannel int `json:"SubscribersPerChannel"`
31+
MessagesPerChannel int64 `json:"MessagesPerChannel"`
32+
MessageRateTs []float64 `json:"MessageRateTs"`
33+
OSSDistributedSlots bool `json:"OSSDistributedSlots"`
34+
Addresses []string `json:"Addresses"`
35+
}
36+
37+
func subscriberRoutine(addr string, subscriberName string, channel string, messages_per_channel int64, printMessages bool, stop chan struct{}, wg *sync.WaitGroup) {
1938
// tell the caller we've stopped
2039
defer wg.Done()
2140

22-
conn, err, ps, msgCh, tick := bootstrapPubSub(addr, subscriberName, channel)
41+
conn, _, _, msgCh, tick := bootstrapPubSub(addr, subscriberName, channel)
2342
defer conn.Close()
2443
defer tick.Stop()
2544

2645
for {
2746
select {
28-
case <-tick.C:
29-
err = ps.Ping()
30-
if err != nil {
31-
//try to bootstrap again
32-
conn, err, ps, msgCh, tick = bootstrapPubSub(addr, subscriberName, channel)
33-
defer conn.Close()
34-
defer tick.Stop()
35-
if err != nil {
36-
panic(err)
37-
}
38-
}
39-
// loop
40-
4147
case msg := <-msgCh:
4248
if printMessages {
4349
fmt.Println(fmt.Sprintf("received message in channel %s. Message: %s", msg.Channel, msg.Message))
4450
}
45-
51+
atomic.AddUint64(&totalMessages, 1)
4652
break
4753
case <-stop:
4854
return
@@ -70,7 +76,7 @@ func bootstrapPubSub(addr string, subscriberName string, channel string) (radix.
7076
if err != nil {
7177
panic(err)
7278
}
73-
tickTime := 10 + rand.Intn(10)
79+
tickTime := 1 + rand.Intn(2)
7480
tick := time.NewTicker(time.Duration(tickTime) * time.Second)
7581

7682
return conn, err, ps, msgCh, tick
@@ -82,16 +88,24 @@ func main() {
8288
channel_minimum := flag.Int("channel-minimum", 1, "channel ID minimum value ( each channel has a dedicated thread ).")
8389
channel_maximum := flag.Int("channel-maximum", 100, "channel ID maximum value ( each channel has a dedicated thread ).")
8490
subscribers_per_channel := flag.Int("subscribers-per-channel", 1, "number of subscribers per channel.")
91+
messages_per_channel_subscriber := flag.Int64("messages", 0, "Number of total messages per subscriber per channel.")
92+
json_out_file := flag.String("json-out-file", "", "Name of json output file, if not set, will not print to json.")
8593
client_update_tick := flag.Int("client-update-tick", 1, "client update tick.")
94+
test_time := flag.Int("test-time", 0, "Number of seconds to run the test, after receiving the first message.")
8695
subscribe_prefix := flag.String("subscriber-prefix", "channel-", "prefix for subscribing to channel, used in conjunction with key-minimum and key-maximum.")
8796
client_output_buffer_limit_pubsub := flag.String("client-output-buffer-limit-pubsub", "", "Specify client output buffer limits for clients subscribed to at least one pubsub channel or pattern. If the value specified is different that the one present on the DB, this setting will apply.")
8897
distributeSubscribers := flag.Bool("oss-cluster-api-distribute-subscribers", false, "read cluster slots and distribute subscribers among them.")
8998
printMessages := flag.Bool("print-messages", false, "print messages.")
9099
flag.Parse()
91-
100+
totalMessages = 0
92101
var nodes []radix.ClusterNode
102+
var nodesAddresses []string
93103
var node_subscriptions_count []int
94104

105+
if *test_time != 0 && *messages_per_channel_subscriber != 0 {
106+
log.Fatal(fmt.Errorf("--messages and --test-time are mutially exclusive ( please specify one or the other )"))
107+
}
108+
95109
if *distributeSubscribers {
96110
// Create a normal redis connection
97111
conn, err := radix.Dial("tcp", fmt.Sprintf("%s:%s", *host, *port))
@@ -111,6 +125,7 @@ func main() {
111125
slot.Addr = fmt.Sprintf("%s:%s", *host, slot_port)
112126
}
113127
nodes = append(nodes, slot)
128+
nodesAddresses = append(nodesAddresses, slot.Addr)
114129
node_subscriptions_count = append(node_subscriptions_count, 0)
115130
}
116131
conn.Close()
@@ -126,6 +141,7 @@ func main() {
126141
SecondaryOfID: "",
127142
}
128143
nodes = append(nodes, node)
144+
nodesAddresses = append(nodesAddresses, node.Addr)
129145
node_subscriptions_count = append(node_subscriptions_count, 0)
130146
}
131147

@@ -143,100 +159,123 @@ func main() {
143159
total_channels := *channel_maximum - *channel_minimum + 1
144160
subscriptions_per_node := total_channels / len(nodes)
145161
total_subscriptions := total_channels * *subscribers_per_channel
146-
fmt.Println(fmt.Sprintf("Total subcriptions: %d. Subscriptions per node %d", total_subscriptions, subscriptions_per_node))
162+
total_messages := int64(total_subscriptions) * *messages_per_channel_subscriber
163+
fmt.Println(fmt.Sprintf("Total subcriptions: %d. Subscriptions per node %d. Total messages: %d", total_subscriptions, subscriptions_per_node, total_messages))
164+
165+
// Launch routine to accumulate message count
166+
//go accumulate()
147167

148168
for channel_id := *channel_minimum; channel_id <= *channel_maximum; channel_id++ {
149169
for channel_subscriber_number := 1; channel_subscriber_number <= *subscribers_per_channel; channel_subscriber_number++ {
150170
nodes_pos := channel_id % len(nodes)
151-
node_subscriptions_count[nodes_pos] = node_subscriptions_count[nodes_pos] + 1
171+
node_subscriptions_count[nodes_pos]++
152172
addr := nodes[nodes_pos]
153173
channel := fmt.Sprintf("%s%d", *subscribe_prefix, channel_id)
154174
subscriberName := fmt.Sprintf("subscriber#%d-%s%d", channel_subscriber_number, *subscribe_prefix, channel_id)
155175
wg.Add(1)
156-
go subscriberRoutine(addr.Addr, subscriberName, channel, *printMessages, stopChan, &wg)
176+
go subscriberRoutine(addr.Addr, subscriberName, channel, *messages_per_channel_subscriber, *printMessages, stopChan, &wg)
157177
}
158-
159178
}
160179

161180
// listen for C-c
162181
c := make(chan os.Signal, 1)
163182
signal.Notify(c, os.Interrupt)
183+
w := new(tabwriter.Writer)
164184

165185
tick := time.NewTicker(time.Duration(*client_update_tick) * time.Second)
166186
var connections []radix.Conn
167-
if updateCLI(nodes, connections, node_subscriptions_count, tick, c) {
187+
closed, start_time, duration, totalMessages, messageRateTs := updateCLI(nodes, connections, node_subscriptions_count, tick, c, total_messages, w, *test_time)
188+
messageRate := float64(totalMessages) / float64(duration.Seconds())
189+
fmt.Fprint(w, fmt.Sprintf("#################################################\nTotal Duration %f Seconds\nMessage Rate %f\n#################################################\n", duration.Seconds(), messageRate))
190+
fmt.Fprint(w, "\r\n")
191+
w.Flush()
192+
193+
if strings.Compare(*json_out_file, "") != 0 {
194+
res := testResult{
195+
StartTime: start_time.Unix(),
196+
Duration: duration.Seconds(),
197+
MessageRate: messageRate,
198+
TotalMessages: totalMessages,
199+
TotalSubscriptions: total_subscriptions,
200+
ChannelMin: *channel_minimum,
201+
ChannelMax: *channel_maximum,
202+
SubscribersPerChannel: *subscribers_per_channel,
203+
MessagesPerChannel: *messages_per_channel_subscriber,
204+
MessageRateTs: messageRateTs,
205+
OSSDistributedSlots: *distributeSubscribers,
206+
Addresses: nodesAddresses,
207+
}
208+
file, err := json.MarshalIndent(res, "", " ")
209+
if err != nil {
210+
log.Fatal(err)
211+
}
212+
213+
err = ioutil.WriteFile(*json_out_file, file, 0644)
214+
if err != nil {
215+
log.Fatal(err)
216+
}
217+
}
218+
219+
if closed {
168220
return
169221
}
170222

171223
// tell the goroutine to stop
172224
close(stopChan)
173225
// and wait for them both to reply back
174226
wg.Wait()
175-
176227
}
177228

178-
func updateCLI(nodes []radix.ClusterNode, connections []radix.Conn, node_subscriptions_count []int, tick *time.Ticker, c chan os.Signal) bool {
179-
w := new(tabwriter.Writer)
229+
func updateCLI(nodes []radix.ClusterNode, connections []radix.Conn, node_subscriptions_count []int, tick *time.Ticker, c chan os.Signal, message_limit int64, w *tabwriter.Writer, test_time int) (bool, time.Time, time.Duration, uint64, []float64) {
230+
231+
start := time.Now()
232+
prevTime := time.Now()
233+
prevMessageCount := uint64(0)
234+
messageRateTs := []float64{}
235+
180236
w.Init(os.Stdout, 25, 0, 1, ' ', tabwriter.AlignRight)
181-
for idx, slot := range nodes {
182-
c, err := radix.Dial("tcp", slot.Addr)
183-
if err != nil {
184-
panic(err)
185-
}
186-
fmt.Fprint(w, fmt.Sprintf("shard #%d\t", idx+1))
187-
connections = append(connections, c)
188-
}
237+
fmt.Fprint(w, fmt.Sprintf("Test Time\tTotal Messages\t Message Rate \t"))
189238
fmt.Fprint(w, "\n")
190239
w.Flush()
191240
for {
192241
select {
193242
case <-tick.C:
194243
{
195-
for idx, c := range connections {
196-
var infoOutput string
197-
e := c.Do(radix.FlatCmd(&infoOutput, "INFO", "CLIENTS"))
198-
if e != nil {
199-
fmt.Fprint(w, fmt.Sprintf("----\t"))
200-
} else {
201-
connected_clients_line := strings.TrimSuffix(strings.Split(infoOutput, "\r\n")[1], "\r\n")
202-
i := strings.Index(connected_clients_line, ":")
203-
shard_clients, eint := strconv.Atoi(connected_clients_line[i+1:])
204-
if eint != nil {
205-
fmt.Fprint(w, fmt.Sprintf("----\t"))
206-
} else {
207-
var pubsubList string
208-
e := c.Do(radix.FlatCmd(&pubsubList, "CLIENT", "LIST", "TYPE","PUBSUB"))
209-
if e != nil {
210-
fmt.Fprint(w, fmt.Sprintf("----\t"))
211-
}
212-
subscribersList := strings.Split(pubsubList, "\n")
213-
//fmt.Println(subscribersList)
214-
expected_subscribers := node_subscriptions_count[idx]
215-
number_subscribers := len(subscribersList)-1
216-
others := shard_clients - 1 - number_subscribers
217-
218-
fmt.Fprint(w, fmt.Sprintf("%d (other %d sub %d==%d)\t", shard_clients, others, number_subscribers, expected_subscribers))
219-
}
220-
221-
}
244+
now := time.Now()
245+
took := now.Sub(prevTime)
246+
messageRate := float64(totalMessages-prevMessageCount) / float64(took.Seconds())
247+
if prevMessageCount == 0 && totalMessages != 0 {
248+
start = time.Now()
222249
}
250+
if totalMessages != 0 {
251+
messageRateTs = append(messageRateTs, messageRate)
252+
}
253+
prevMessageCount = totalMessages
254+
prevTime = now
255+
256+
fmt.Fprint(w, fmt.Sprintf("%.0f\t%d\t%.2f\t", time.Since(start).Seconds(), totalMessages, messageRate))
223257
fmt.Fprint(w, "\r\n")
224258
w.Flush()
259+
if message_limit > 0 && totalMessages >= uint64(message_limit) {
260+
return true, start, time.Since(start), totalMessages, messageRateTs
261+
}
262+
if test_time > 0 && time.Since(start) >= time.Duration(test_time*1000*1000*1000) && totalMessages != 0 {
263+
return true, start, time.Since(start), totalMessages, messageRateTs
264+
}
225265

226266
break
227267
}
228268

229269
case <-c:
230270
fmt.Println("received Ctrl-c - shutting down")
231-
return true
271+
return true, start, time.Since(start), totalMessages, messageRateTs
232272
}
233273
}
234-
return false
274+
return false, start, time.Since(start), totalMessages, messageRateTs
235275
}
236276

237277
func checkClientOutputBufferLimitPubSub(nodes []radix.ClusterNode, client_output_buffer_limit_pubsub *string) {
238278
for _, slot := range nodes {
239-
//fmt.Println(slot)
240279
conn, err := radix.Dial("tcp", slot.Addr)
241280
if err != nil {
242281
panic(err)

0 commit comments

Comments
 (0)