@@ -43,9 +43,11 @@ func DetermineBaseTime(files []string) time.Time {
43
43
// Handles a single connection/client
44
44
type ClientWorker struct {
45
45
redis * redis.Client
46
+ compare * redis.Client
46
47
incoming chan Record
47
48
processed uint
48
49
pipe redis.Pipeliner
50
+ comparePipe redis.Pipeliner
49
51
}
50
52
51
53
// Pipeline length ranges for summary
@@ -60,6 +62,16 @@ var pipelineRanges = []struct {
60
62
{"200+" , 200 , 1 << 30 },
61
63
}
62
64
65
+ var compareIgnoreCmds = []string {
66
+ "HELLO" ,
67
+ "AUTH" ,
68
+ "SELECT" ,
69
+ "INFO" ,
70
+ "TIME" ,
71
+ "CLIENT" ,
72
+ "CONFIG" ,
73
+ }
74
+
63
75
// Handles a single file and distributes messages to clients
64
76
type FileWorker struct {
65
77
clientGroup sync.WaitGroup
@@ -100,11 +112,82 @@ func trackLatency(worker *FileWorker, batchLatency float64, size int) {
100
112
}
101
113
}
102
114
115
+ func ignoreCompareCmd (c redis.Cmder ) bool {
116
+ args := c .Args ()
117
+ if len (args ) == 0 {
118
+ return true
119
+ }
120
+ name := strings .ToUpper (fmt .Sprint (args [0 ]))
121
+ for _ , ign := range compareIgnoreCmds {
122
+ if name == ign {
123
+ return true
124
+ }
125
+ }
126
+ return false
127
+ }
128
+
129
+ func cmdAsString (c redis.Cmder ) string {
130
+ args := c .Args ()
131
+ if len (args ) == 0 {
132
+ return "<no-args>"
133
+ }
134
+
135
+ name := strings .ToUpper (fmt .Sprint (args [0 ]))
136
+ if len (args ) == 1 {
137
+ return name
138
+ }
139
+
140
+ parts := make ([]string , 0 , len (args ) - 1 )
141
+ for _ , a := range args [1 :] {
142
+ s := fmt .Sprint (a )
143
+ parts = append (parts , s )
144
+ }
145
+ return name + " " + strings .Join (parts , " " )
146
+ }
147
+
148
+ func cmdResultString (cm redis.Cmder ) string {
149
+ if err := cm .Err (); err != nil {
150
+ if err == redis .Nil {
151
+ return "(nil)"
152
+ }
153
+ return "ERR: " + err .Error ()
154
+ }
155
+
156
+ if cmd , ok := cm .(* redis.Cmd ); ok {
157
+ v := cmd .Val ()
158
+ s := fmt .Sprintf ("%v" , v )
159
+ return s
160
+ }
161
+
162
+ return fmt .Sprintf ("<unknown Cmder %T>" , cm )
163
+ }
164
+
165
+ func compareCmdResults (a , b []redis.Cmder , lastMsg Record ) {
166
+ if len (a ) != len (b ) {
167
+ log .Fatalf ("[COMPARE] mismatch count: primary=%d compare=%d (last client=%d time=%d)" , len (a ), len (b ), lastMsg .Client , lastMsg .Time )
168
+ return
169
+ }
170
+
171
+ for i := range a {
172
+ if (ignoreCompareCmd (a [i ])) {
173
+ continue
174
+ }
175
+ pa := cmdResultString (a [i ])
176
+ pb := cmdResultString (b [i ])
177
+ if pa != pb {
178
+ log .Fatalf ("[COMPARE] mismatch at idx %d cmd=%s\n primary=%s\n compare=%s\n (client=%d time=%d)" , i , cmdAsString (a [i ]), pa , pb , lastMsg .Client , lastMsg .Time )
179
+ }
180
+ }
181
+ }
182
+
103
183
func (c * ClientWorker ) Run (pace bool , worker * FileWorker ) {
104
184
for msg := range c .incoming {
105
185
if c .processed == 0 && msg .DbIndex != 0 {
106
186
// There is no easy way to switch, we rely on connection pool consisting only of one connection
107
187
c .redis .Do (context .Background (), []interface {}{"SELECT" , fmt .Sprint (msg .DbIndex )})
188
+ if c .compare != nil {
189
+ c .compare .Do (context .Background (), []interface {}{"SELECT" , fmt .Sprint (msg .DbIndex )})
190
+ }
108
191
}
109
192
110
193
lag := time .Until (worker .HappensAt (time .Unix (0 , int64 (msg .Time ))))
@@ -117,15 +200,24 @@ func (c *ClientWorker) Run(pace bool, worker *FileWorker) {
117
200
}
118
201
119
202
c .pipe .Do (context .Background (), msg .values ... ).Result ()
203
+ if c .comparePipe != nil {
204
+ c .comparePipe .Do (context .Background (), msg .values ... ).Result ()
205
+ }
206
+
120
207
atomic .AddUint64 (& worker .processed , 1 )
121
208
122
209
if msg .HasMore == 0 {
123
210
size := c .pipe .Len ()
124
211
start := time .Now ()
125
- c .pipe .Exec (context .Background ())
212
+ cmds , _ := c .pipe .Exec (context .Background ())
126
213
batchLatency := float64 (time .Since (start ).Microseconds ())
127
214
trackLatency (worker , batchLatency , size )
128
215
c .processed += uint (size )
216
+
217
+ if c .comparePipe != nil {
218
+ ccmds , _ := c .comparePipe .Exec (context .Background ())
219
+ compareCmdResults (cmds , ccmds , msg )
220
+ }
129
221
}
130
222
}
131
223
@@ -147,6 +239,11 @@ func NewClient(w *FileWorker, pace bool) *ClientWorker {
147
239
}
148
240
client .pipe = client .redis .Pipeline ()
149
241
242
+ if * fCompareHost != "" {
243
+ client .compare = redis .NewClient (& redis.Options {Addr : * fCompareHost , PoolSize : 1 , DisableIndentity : true })
244
+ client .comparePipe = client .compare .Pipeline ()
245
+ }
246
+
150
247
atomic .AddUint64 (& w .clients , 1 )
151
248
w .clientGroup .Add (1 )
152
249
go client .Run (pace , w )
0 commit comments