Skip to content

Commit d1b317e

Browse files
author
zhengshuxin
committed
fixed one bug in redis_client_pipeline.cpp about handling redirect message.
1 parent 471ef22 commit d1b317e

File tree

10 files changed

+170
-59
lines changed

10 files changed

+170
-59
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ endif
8282
##############################################################################
8383

8484
.PHONY = check help all_lib all samples all clean install uninstall uninstall_all build_one
85-
VERSION = 3.5.3-13
85+
VERSION = 3.5.3-14
8686

8787
default: build_one acl_master
8888
help h:

lib_acl/src/init/acl_init.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525

2626
#include "init.h"
2727

28-
static char *version = "3.5.3-13 20220607-20:36";
28+
static char *version = "3.5.3-14 20220615-15:00";
2929

3030
const char *acl_version(void)
3131
{

lib_acl_cpp/include/acl_cpp/redis/redis_client_pipeline.hpp

Lines changed: 43 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,15 @@ class token_tree;
2222
class redis_client;
2323

2424
typedef enum {
25-
redis_pipeline_t_cmd, // redis command type
26-
redis_pipeline_t_redirect, // should redirect to another node
27-
redis_pipeline_t_clusterdonw, // the redis node has been down
28-
redis_pipeline_t_stop, // the current channel should stop
25+
redis_pipeline_t_cmd, // Redis command type
26+
redis_pipeline_t_redirect, // Should redirect to another node
27+
redis_pipeline_t_clusterdonw, // The redis node has been down
28+
redis_pipeline_t_stop, // The current channel should stop
2929
} redis_pipeline_type_t;
3030

3131
/**
32-
* the message for transfering between redis command, redis client pipline
33-
* and redis pipeline channel, which holds the redis command or not.
32+
* The message for transfering between redis command, redis client pipline
33+
* and redis pipeline channel, which holds the redis command.
3434
*/
3535
class redis_pipeline_message {
3636
public:
@@ -176,6 +176,10 @@ class redis_pipeline_message {
176176

177177
class redis_client_pipeline;
178178

179+
/**
180+
* One pipeline channel thread for one redis node, which waits for message
181+
* from pipline thread and try to combine more messages and sends to redis.
182+
*/
179183
class redis_pipeline_channel : public thread {
180184
public:
181185
redis_pipeline_channel(redis_client_pipeline& pipeline,
@@ -214,7 +218,7 @@ class redis_pipeline_channel : public thread {
214218
};
215219

216220
/**
217-
* redis pipline communication, be set and used in redis_command to
221+
* Redis pipline communication, be set and used in redis_command to
218222
* improve the performance of redis commands, but not all redis commands
219223
* in acl can be used in pipeline mode, such as below:
220224
* 1. multiple keys operation
@@ -225,36 +229,36 @@ class ACL_CPP_API redis_client_pipeline : public thread {
225229
redis_client_pipeline(const char* addr);
226230
~redis_client_pipeline(void);
227231

228-
// start the pipeline thread
232+
// Start the pipeline thread
229233
void start_thread(void);
230234

231-
// stop the pipeline thread
235+
// Stop the pipeline thread
232236
void stop_thread(void);
233237

234238
public:
235-
// called by redis_command in pipeline mode
239+
// Called by redis_command in pipeline mode
236240
const redis_result* run(redis_pipeline_message& msg);
237241

238-
// called by redis_pipeline_channel
242+
// Called by redis_pipeline_channel
239243
void push(redis_pipeline_message* msg);
240244

241245
public:
242-
// set the password for connecting the redis server
246+
// Set the password for connecting the redis server
243247
redis_client_pipeline& set_password(const char* passwd);
244248

245-
// set network IO timeout
249+
// Set network IO timeout
246250
redis_client_pipeline& set_timeout(int conn_timeout, int rw_timeout);
247251

248-
// set if retry on IO failed in redis_client
252+
// Set if retry on IO failed in redis_client
249253
redis_client_pipeline& set_retry(bool on);
250254

251-
// set the max hash slot of redis, the default valud is 16384
255+
// Set the max hash slot of redis, the default valud is 16384
252256
redis_client_pipeline& set_max_slot(int max_slot);
253257

254-
// set if connecting all the redis nodes after starting
258+
// Set if connecting all the redis nodes after starting
255259
redis_client_pipeline& set_preconnect(bool yes);
256260

257-
// get the max hash slot of redis
261+
// Get the max hash slot of redis
258262
int get_max_slot(void) const {
259263
return max_slot_;
260264
}
@@ -264,62 +268,62 @@ class ACL_CPP_API redis_client_pipeline : public thread {
264268
void* run(void);
265269

266270
private:
267-
string addr_; // the default redis address
268-
string passwd_; // password for connecting redis
269-
int max_slot_; // the max hash slot for redis cluster
270-
int conn_timeout_; // timeout to connect redis
271+
string addr_; // The default redis address
272+
string passwd_; // Password for connecting redis
273+
int max_slot_; // The max hash slot for redis cluster
274+
int conn_timeout_; // Timeout to connect redis
271275
int rw_timeout_; // IO timeout with redis
272-
bool retry_; // if try again when disconnect from redis
273-
bool preconn_; // if connecting all redis nodes when starting
276+
bool retry_; // If try again when disconnect from redis
277+
bool preconn_; // If connecting all redis nodes when starting
274278

275279
token_tree* channels_; // holds and manage all pipeline channels
276280

277-
// the message queue for receiving redis message from other threads
281+
// The message queue for receiving redis message from other threads
278282
BOX<redis_pipeline_message> box_;
279283

280-
std::vector<char*> addrs_; // hold all redises addresses
281-
const char** slot_addrs_; // map hash slot with address
284+
std::vector<char*> addrs_; // Hold all redis's addresses
285+
const char** slot_addrs_; // Map hash slot with address
282286

283-
// set the hash slot with the specified redis address
287+
// Set the hash slot with the specified redis address
284288
void set_slot(int slot, const char* addr);
285289

286-
// set all hash slots' addresses of all redises
290+
// Set all hash slots' addresses of all redises
287291
void set_all_slot(void);
288292

289-
// start all pipeline channels threads
293+
// Start all pipeline channels threads
290294
void start_channels(void);
291295

292-
// stop all pipeline channels threads
296+
// Stop all pipeline channels threads
293297
void stop_channels(void);
294298

295-
// start one pipeline channel thread with the specified redis address
299+
// Start one pipeline channel thread with the specified redis address
296300
redis_pipeline_channel* start_channel(const char* addr);
297301

298-
// stop one pipeline channel thread with the specified redis address
302+
// Stop one pipeline channel thread with the specified redis address
299303
void stop_channel(const char* addr);
300304

301-
// get one pipeline channel thread with the specified hash slot
305+
// Get one pipeline channel thread with the specified hash slot
302306
redis_pipeline_channel* get_channel(int slot);
303307

304-
// redirect one slot to another redis address
308+
// Redirect one slot to another redis address
305309
void redirect(const redis_pipeline_message& msg, int slot);
306310

307-
// when one redis node down, we should clear the node's hash slot map
311+
// When one redis node down, we should clear the node's hash slot map
308312
// and stop the pipeline channel thread
309313
void cluster_down(const redis_pipeline_message& msg);
310314
};
311315

312316
/**
313-
* sample:
317+
* Sample:
314318
* void main_thread(void) {
315319
* acl::redis_client_pipeline pipeline("127.0.0.1:6379");
316320
* pipeline.start_thread();
317-
* // start some threads
321+
* // Start some threads
318322
* ...
319-
* // wait for thease threads to exit and stop pipeline thread.
323+
* // Wait for thease threads to exit and stop pipeline thread.
320324
* pipeline.stop_thread();
321325
* }
322-
* // execute redis command in one thread
326+
* // Execute redis command in one thread
323327
* void test_thread(acl::redis_client_pipeline& pipeline) {
324328
* acl::redis cmd;
325329
* cmd.set_pipeline(&pipeline);
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
base_path = ../../..
2+
PROG = redis_pipeline
3+
include ../../Makefile.in
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
#include "stdafx.h"
2+
3+
static void usage(const char* procname) {
4+
printf("usage: %s -h[help]\r\n"
5+
"-s one_redis_addr[127.0.0.1:6379]\r\n"
6+
"-n loop_count[default: 10]\r\n"
7+
"-p password [set the password of redis cluster]\r\n"
8+
"-a cmd[hset|get|expire|ttl|exists|type|del]\r\n",
9+
procname);
10+
}
11+
12+
int main(int argc, char* argv[]) {
13+
int ch, count = 2;
14+
acl::string addr("127.0.0.1:6379"), passwd;
15+
acl::string cmd("del");
16+
17+
while ((ch = getopt(argc, argv, "ha:s:n:p:")) > 0) {
18+
switch (ch) {
19+
case 'h':
20+
usage(argv[0]);
21+
return 0;
22+
case 'a':
23+
cmd = optarg;
24+
break;
25+
case 's':
26+
addr = optarg;
27+
break;
28+
case 'n':
29+
count = atoi(optarg);
30+
break;
31+
case 'p':
32+
passwd = optarg;
33+
break;;
34+
default:
35+
break;
36+
}
37+
}
38+
39+
acl::acl_cpp_init();
40+
acl::log::stdout_open(true);
41+
42+
acl::redis_client_pipeline pipeline(addr);
43+
if (!passwd.empty()) {
44+
pipeline.set_password(passwd);
45+
}
46+
pipeline.start_thread();
47+
48+
acl::string key, name, value;
49+
50+
acl::redis redis(&pipeline);
51+
for (int i = 0; i < count; i++) {
52+
key.format("hkey-%d", i);
53+
name.format("hname-%d", i);
54+
value.format("hvalue-%d", i);
55+
56+
if (redis.hset(key, name, value) >= 0) {
57+
printf("hset %s %s %s ok\n",
58+
key.c_str(), name.c_str(), value.c_str());
59+
} else {
60+
printf("hset %s %s %s error\n",
61+
key.c_str(), name.c_str(), value.c_str());
62+
}
63+
}
64+
65+
pipeline.stop_thread();
66+
return 0;
67+
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
// stdafx.cpp : 只包括标准包含文件的源文件
2+
// xml.pch 将成为预编译头
3+
// stdafx.obj 将包含预编译类型信息
4+
5+
#include "stdafx.h"
6+
7+
// TODO: 在 STDAFX.H 中
8+
//引用任何所需的附加头文件,而不是在此文件中引用
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
// stdafx.h : 标准系统包含文件的包含文件,
2+
// 或是常用但不常更改的项目特定的包含文件
3+
//
4+
5+
#pragma once
6+
7+
//
8+
//#include <iostream>
9+
//#include <tchar.h>
10+
11+
// TODO: 在此处引用程序要求的附加头文件
12+
#include "acl_cpp/lib_acl.hpp"
13+
#include "lib_acl.h"
14+
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
#!/bin/sh
2+
3+
valgrind --tool=memcheck --leak-check=yes -v ./redis_pipeline -s 127.0.0.1:6379 -a del -n 10 -c 10

0 commit comments

Comments
 (0)