@@ -143,6 +143,44 @@ int redis_pubsub::punsubscribe(const std::vector<string>& patterns)
143143 return subop (" PUNSUBSCRIBE" , patterns);
144144}
145145
146+ int redis_pubsub::subop_result (const char * cmd,
147+ const std::vector<const char *>& channels)
148+ {
149+ int nchannels = 0 , ret;
150+ size_t i = 0 ;
151+ do
152+ {
153+ const redis_result* res = run ();
154+ if (res == NULL || res->get_type () != REDIS_RESULT_ARRAY)
155+ return -1 ;
156+
157+ // clear request, so in next loop we just read the data from
158+ // redis-server that the data maybe the message to be skipped
159+ clear_request ();
160+
161+ const redis_result* o = res->get_child (0 );
162+ if (o == NULL || o->get_type () != REDIS_RESULT_STRING)
163+ return -1 ;
164+
165+ string tmp;
166+ o->argv_to_string (tmp);
167+ // just skip message in subscribe process
168+ if (tmp.equal (" message" , false ) || tmp.equal (" pmessage" , false ))
169+ continue ;
170+
171+ if ((ret = check_channel (res, cmd, channels[i])) < 0 )
172+ return -1 ;
173+
174+ if (ret > nchannels)
175+ nchannels = ret;
176+
177+ i++;
178+
179+ } while (i < channels.size ());
180+
181+ return nchannels;
182+ }
183+
146184int redis_pubsub::subop (const char * cmd, const std::vector<const char *>& channels)
147185{
148186 size_t argc = 1 + channels.size ();
@@ -163,23 +201,42 @@ int redis_pubsub::subop(const char* cmd, const std::vector<const char*>& channel
163201 hash_slot (channels[0 ]);
164202
165203 build_request (argc, argv, lens);
166- const redis_result* result = run (channels.size ());
167- if (result == NULL || result->get_type () != REDIS_RESULT_ARRAY)
168- return -1 ;
169204
170- size_t size = channels. size ( );
171- int nchannels = 0 , ret;
205+ return subop_result (cmd, channels);
206+ }
172207
173- for (size_t i = 0 ; i < size; i++)
208+ int redis_pubsub::subop_result (const char * cmd,
209+ const std::vector<string>& channels)
210+ {
211+ int nchannels = 0 , ret;
212+ size_t i = 0 ;
213+ do
174214 {
175- const redis_result* obj = result-> get_child (i );
176- if (obj == NULL )
215+ const redis_result* res = run ( );
216+ if (res == NULL || res-> get_type () != REDIS_RESULT_ARRAY )
177217 return -1 ;
178- if (( ret = check_channel (obj, argv[0 ], channels[i])) < 0 )
218+
219+ clear_request ();
220+
221+ const redis_result* o = res->get_child (0 );
222+ if (o == NULL || o->get_type () != REDIS_RESULT_STRING)
223+ return -1 ;
224+
225+ string tmp;
226+ o->argv_to_string (tmp);
227+ if (tmp.equal (" message" , false ) || tmp.equal (" pmessage" , false ))
228+ continue ;
229+
230+ if ((ret = check_channel (res, cmd, channels[i])) < 0 )
179231 return -1 ;
232+
180233 if (ret > nchannels)
181234 nchannels = ret;
182- }
235+
236+ i++;
237+
238+ } while (i < channels.size ());
239+
183240 return nchannels;
184241}
185242
@@ -203,28 +260,11 @@ int redis_pubsub::subop(const char* cmd, const std::vector<string>& channels)
203260 hash_slot (channels[0 ].c_str ());
204261
205262 build_request (argc, argv, lens);
206- const redis_result* result = run (channels.size ());
207- if (result == NULL || result->get_type () != REDIS_RESULT_ARRAY)
208- return -1 ;
209-
210- size_t size = channels.size ();
211- int nchannels = 0 , ret;
212-
213- for (size_t i = 0 ; i < size; i++)
214- {
215- const redis_result* obj = result->get_child (i);
216- if (obj == NULL )
217- return -1 ;
218- if (( ret = check_channel (obj, argv[0 ], channels[i])) < 0 )
219- return -1 ;
220- if (ret > nchannels)
221- nchannels = ret;
222- }
223- return nchannels;
263+ return subop_result (cmd, channels);
224264}
225265
226266int redis_pubsub::check_channel (const redis_result* obj, const char * cmd,
227- const string& channel)
267+ const char * channel)
228268{
229269 if (obj->get_type () != REDIS_RESULT_ARRAY)
230270 return -1 ;
@@ -236,16 +276,28 @@ int redis_pubsub::check_channel(const redis_result* obj, const char* cmd,
236276 string buf;
237277 rr->argv_to_string (buf);
238278 if (strcasecmp (buf.c_str (), cmd) != 0 )
279+ {
280+ acl::string tmp;
281+ obj->to_string (tmp);
282+ logger_warn (" invalid cmd=%s, %s, result=%s" ,
283+ buf.c_str (), cmd, tmp.c_str ());
239284 return -1 ;
285+ }
240286
241287 rr = obj->get_child (1 );
242288 if (rr == NULL || rr->get_type () != REDIS_RESULT_STRING)
243289 return -1 ;
244290
245291 buf.clear ();
246292 rr->argv_to_string (buf);
247- if (strcasecmp (buf.c_str (), channel.c_str ()) != 0 )
293+ if (strcasecmp (buf.c_str (), channel) != 0 )
294+ {
295+ acl::string tmp;
296+ obj->to_string (tmp);
297+ logger_warn (" invalid channel=%s, %s, result=%s" ,
298+ buf.c_str (), channel, tmp.c_str ());
248299 return -1 ;
300+ }
249301
250302 rr = obj->get_child (2 );
251303 if (rr == NULL || rr->get_type () != REDIS_RESULT_INTEGER)
0 commit comments