Skip to content

Commit 5cd3127

Browse files
committed
Unified cancel for sql statements, for both running and queued sql.
Syntax:
exec procedure sys.cmd.cancel('all') // cancel all statements in libevent sql_evbuffers
exec procedure sys.cmd.cancel('running') // cancel all running statements
exec procedure sys.cmd.cancel('queued') // cancel all queued statements
exec procedure sys.cmd.cancel('cnonce', 'uuid') // cancel the statement identified by 'uuid'; 'uuid' is provided by comdb2_connections
exec procedure sys.cmd.cancel('fp', 'uuid') // cancel all statements with fingerprint 'uuid'; the fingerprint is provided by comdb2_connections
NOTE: canceled requests will show as 'queued_canceled', and so on, in comdb2_connections.
Signed-off-by: Dorin Hogea <dhogea@bloomberg.net>
1 parent 99a72d1 commit 5cd3127

File tree

16 files changed

+705
-21
lines changed

16 files changed

+705
-21
lines changed

db/process_message.c

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,11 @@ static const char *HELP_SQL[] = {
237237
"hist - show recently run statements",
238238
"cancel N - cancel running statement with id N",
239239
"cancelcnonce N - cancel running statement with cnonce N",
240+
"ucancel cnonce N - cancel running statement with unified uuid N (per comdb2_connections)",
241+
"ucancel fp N - cancel running statement with fingerprint N (per comdb2_connections)",
242+
"ucancel running - cancel all running statement (leaves queued statements intact)",
243+
"ucancel queued - cancel all queued statement (leaves running statements intact)",
244+
"ucancel all - cancel all queued and running statements",
240245
"wrtimeout N - set write timeout in ms",
241246
"help - this information",
242247
NULL,
@@ -3118,6 +3123,34 @@ int process_command(struct dbenv *dbenv, char *line, int lline, int st)
31183123
cancel_sql_statement_with_cnonce(cnonce);
31193124
free(cnonce);
31203125
}
3126+
} else if (tokcmp(tok, ltok, "ucancel") == 0) {
3127+
char *uuid = NULL;
3128+
enum ucancel_type t = UCANCEL_INV;
3129+
tok = segtok(line, lline, &st, &ltok);
3130+
if (ltok) {
3131+
if (tokcmp(tok, ltok, "all") == 0)
3132+
t = UCANCEL_ALL;
3133+
else if (tokcmp(tok, ltok, "running") == 0)
3134+
t = UCANCEL_RUN;
3135+
else if (tokcmp(tok, ltok, "queued") == 0)
3136+
t = UCANCEL_QUE;
3137+
else if (tokcmp(tok, ltok, "cnonce") == 0)
3138+
t = UCANCEL_CNO;
3139+
else if (tokcmp(tok, ltok, "fp") == 0)
3140+
t = UCANCEL_FPT;
3141+
3142+
if (t == UCANCEL_CNO || t == UCANCEL_FPT) {
3143+
tok = segtok(line, lline, &st, &ltok);
3144+
if (ltok)
3145+
uuid = tokdup(tok, ltok);
3146+
else
3147+
t = UCANCEL_INV;
3148+
}
3149+
if (t != UCANCEL_INV)
3150+
ucancel_sql_statements(t, uuid);
3151+
}
3152+
if (t == UCANCEL_INV)
3153+
logmsg(LOGMSG_ERROR, "Usage sql ucancel [all|queued|running|fp N|cnonce N]");
31213154
} else if (tokcmp(tok, ltok, "help") == 0) {
31223155
print_help_page(HELP_SQL);
31233156
} else if (tokcmp(tok, ltok, "debug") == 0) {

db/sql.h

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -322,6 +322,17 @@ typedef struct sqlclntstate_fdb {
322322
int failed_heartbeats; /* used to signal failed communication with remotes */
323323
} sqlclntstate_fdb_t;
324324

325+
/*
 * Mode selector passed to ucancel_sql_statements().
 * Each enumerator has a distinct value; ucancel_sql_statements() dispatches
 * on it with a switch, and any value without a case (UCANCEL_INV included)
 * is logged as unimplemented and yields -1.
 */
enum ucancel_type {
326+
UCANCEL_INV = 0, /* no valid mode selected */
327+
UCANCEL_ALL = 1, /* both queued and running */
328+
UCANCEL_RUN = 2, /* running only */
329+
UCANCEL_QUE = 4, /* queued only */
330+
UCANCEL_CNO = 8, /* filter by cnonce */
331+
UCANCEL_FPT = 16 /* filter by fp */
332+
};
333+
334+
/*
 * "Unified" cancel of sql statements.
 * type selects the mode (see enum ucancel_type); uuid is the textual uuid
 * consumed by the UCANCEL_CNO and UCANCEL_FPT modes and is not read by the
 * other modes.
 * Returns 0 on success, -1 when the uuid cannot be parsed, when no running
 * statement matches in UCANCEL_CNO mode, or when type has no handler.
 */
int ucancel_sql_statements(enum ucancel_type type, char *uuid);
335+
325336
CurRange *currange_new();
326337
#define CURRANGEARR_INIT_CAP 2
327338
void currangearr_init(CurRangeArr *arr);
@@ -745,6 +756,7 @@ struct sqlclntstate {
745756

746757
struct rawnodestats *rawnodestats;
747758

759+
uuid_t unifieduuid; /* assigned to any statement running, used for canceling live sql */
748760
osqlstate_t osql; /* offload sql state is kept here */
749761
enum ctrl_sqleng ctrl_sqlengine; /* use to mark a begin/end out of state,
750762
see enum ctrl_sqleng
@@ -1070,6 +1082,7 @@ struct sqlclntstate {
10701082
int tail_offset;
10711083

10721084
struct features features;
1085+
int discard_this; /* set by a cancel() trap, complement thd->stop_this_statement for queued request */
10731086
};
10741087
typedef struct sqlclntstate sqlclntstate;
10751088

@@ -1340,6 +1353,8 @@ struct connection_info {
13401353
enum connection_state state_int;
13411354
int64_t in_transaction;
13421355
int64_t in_local_cache;
1356+
char *uuid;
1357+
int64_t is_canceled;
13431358
};
13441359

13451360
/* makes master swing verbose */
@@ -1723,6 +1738,8 @@ int osql_test_create_genshard(struct schema_change_type *sc, char **errmsg, int
17231738
char **dbnames, uint32_t numcols, char **columns, char **shardnames);
17241739
int osql_test_remove_genshard(struct schema_change_type *sc, char **errmsg);
17251740

1741+
/*
 * Mark clients on the sql_evbuffers list for cancellation by setting their
 * discard_this flag.
 * When filter_fp is not the all-zero uuid, only clients whose unifieduuid
 * equals filter_fp are marked and only_queued is ignored. Otherwise queued
 * clients are marked, plus running ones when only_queued is 0.
 */
void cancel_connections(int only_queued, uuid_t filter_fp);
1742+
17261743
struct sp_tmptbl {
17271744
pthread_mutex_t lk;
17281745
uint64_t rowid;

db/sqlglue.c

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -629,6 +629,10 @@ int check_sql_client_disconnect(struct sqlclntstate *clnt, char *file, int line)
629629
return 1;
630630
}
631631
}
632+
if (clnt->discard_this) {
633+
clnt->thd->sqlthd->stop_this_statement = 1;
634+
clnt->discard_this = 0;
635+
}
632636
return 0;
633637
}
634638

@@ -665,6 +669,10 @@ static int sql_tick(struct sql_thread *thd, int no_recover_deadlock)
665669
if (thd->stop_this_statement) {
666670
rc = SQLITE_ABORT;
667671
goto done;
672+
} else if (clnt->discard_this) {
673+
rc = SQLITE_ABORT;
674+
clnt->discard_this = 0;
675+
goto done;
668676
}
669677

670678
if (clnt->statement_timedout) {
@@ -9707,6 +9715,105 @@ void cancel_sql_statement_with_cnonce(const char *cnonce)
97079715
logmsg(LOGMSG_USER, "Query with cnonce %s not found (finished?)\n", cnonce);
97089716
}
97099717

9718+
static int _ucancel_sql_statements_cno(char *uuidstr)
9719+
{
9720+
struct sql_thread *thd;
9721+
int rc = -1;
9722+
9723+
uuid_t uuid;
9724+
9725+
if (uuid_parse(uuidstr, uuid)) {
9726+
logmsg(LOGMSG_ERROR, "Failed to parse uuid \"%s\"\n", uuidstr);
9727+
return rc;
9728+
}
9729+
9730+
Pthread_mutex_lock(&gbl_sql_lock);
9731+
LISTC_FOR_EACH(&thedb->sql_threads, thd, lnk) {
9732+
if (thd->clnt && memcmp(thd->clnt->unifieduuid, uuid, sizeof(uuid_t)) == 0) {
9733+
logmsg(LOGMSG_ERROR, "Cancelling sql %s\n", thd->clnt->sql);
9734+
thd->stop_this_statement = 1;
9735+
rc = 0;
9736+
break;
9737+
}
9738+
}
9739+
Pthread_mutex_unlock(&gbl_sql_lock);
9740+
if (rc)
9741+
logmsg(LOGMSG_ERROR, "Sql cnonce \"%s\" not found\n", uuidstr);
9742+
return rc;
9743+
}
9744+
9745+
static int _ucancel_sql_statements_run(void)
9746+
{
9747+
struct sql_thread *thd;
9748+
9749+
Pthread_mutex_lock(&gbl_sql_lock);
9750+
LISTC_FOR_EACH(&thedb->sql_threads, thd, lnk) {
9751+
if (thd->clnt)
9752+
thd->stop_this_statement = 1;
9753+
}
9754+
Pthread_mutex_unlock(&gbl_sql_lock);
9755+
9756+
return 0;
9757+
}
9758+
9759+
static int _ucancel_sql_statements_que(void)
9760+
{
9761+
uuid_t zerouuid;
9762+
comdb2uuid_clear(zerouuid);
9763+
cancel_connections(1, zerouuid);
9764+
return 0;
9765+
}
9766+
9767+
static int _ucancel_sql_statements_all(void)
9768+
{
9769+
uuid_t zerouuid;
9770+
comdb2uuid_clear(zerouuid);
9771+
cancel_connections(0, zerouuid);
9772+
return 0;
9773+
}
9774+
9775+
static int _ucancel_sql_statements_fp(char * uuidstr)
9776+
{
9777+
uuid_t uuid;
9778+
9779+
if (uuid_parse(uuidstr, uuid)) {
9780+
logmsg(LOGMSG_ERROR, "Failed to parse uuid \"%s\"\n", uuidstr);
9781+
return -1;
9782+
}
9783+
9784+
cancel_connections(0, uuid);
9785+
return 0;
9786+
}
9787+
9788+
/*
9789+
* "Unified" sql statements cancel routine
9790+
* MODES:
9791+
* type == UCANCEL_ALL #cancel all running and queued statements
9792+
* type == UCANCEL_RUN #cancel all running statements
9793+
* type == UCANCEL_QUE #cancel all queued statements
9794+
* type == UCANCEL_CNO #cancel one running statement with cnonce = "uuid"
9795+
* type == UCANCEL_FPT #cancel all running statement with fingerprint = "uuid"
9796+
*
9797+
*/
9798+
int ucancel_sql_statements(enum ucancel_type type, char *uuid) {
9799+
switch(type) {
9800+
case UCANCEL_CNO:
9801+
return _ucancel_sql_statements_cno(uuid);
9802+
case UCANCEL_RUN:
9803+
return _ucancel_sql_statements_run();
9804+
case UCANCEL_QUE:
9805+
return _ucancel_sql_statements_que();
9806+
case UCANCEL_ALL:
9807+
return _ucancel_sql_statements_all();
9808+
case UCANCEL_FPT:
9809+
return _ucancel_sql_statements_fp(uuid);
9810+
default:
9811+
logmsg(LOGMSG_ERROR, "%d unimplemented\n", type);
9812+
return -1;
9813+
}
9814+
return 0;
9815+
}
9816+
97109817
void sql_dump_running_statements(void)
97119818
{
97129819
struct sql_thread *thd;

db/sqlinterfaces.c

Lines changed: 46 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4440,7 +4440,13 @@ static void sqlengine_work_lua_thread(void *thddata, void *work)
44404440
clnt->deque_timeus = comdb2_time_epochus();
44414441
clnt->thd = thd;
44424442
/* Reset the cancel-statement flag */
4443-
thd->sqlthd->stop_this_statement = 0;
4443+
if (clnt->discard_this) {
4444+
thd->sqlthd->stop_this_statement = 1;
4445+
clnt->discard_this = 0;
4446+
} else
4447+
thd->sqlthd->stop_this_statement = 0;
4448+
4449+
clnt->discard_this = 0;
44444450
sql_update_usertran_state(clnt);
44454451

44464452
rdlock_schema_lk();
@@ -4698,7 +4704,11 @@ void sqlengine_work_appsock(struct sqlthdstate *thd, struct sqlclntstate *clnt)
46984704
sqlthd->clnt = clnt;
46994705
clnt->thd = thd;
47004706
/* Reset the cancel-statement flag */
4701-
sqlthd->stop_this_statement = 0;
4707+
if (clnt->discard_this) {
4708+
thd->sqlthd->stop_this_statement = 1;
4709+
clnt->discard_this = 0;
4710+
} else
4711+
thd->sqlthd->stop_this_statement = 0;
47024712

47034713
thr_set_user("appsock", (intptr_t)clnt->appsock_id);
47044714

@@ -6480,6 +6490,11 @@ static void gather_connection_int(struct connection_info *c, struct sqlclntstate
64806490
c->in_transaction = clnt->in_client_trans;
64816491
c->in_local_cache = clnt->in_local_cache;
64826492
Pthread_mutex_unlock(&clnt->state_lk);
6493+
uuidstr_t us;
6494+
comdb2uuidstr(clnt->unifieduuid, us);
6495+
c->uuid = malloc(strlen(us) + 1);
6496+
snprintf(c->uuid, strlen(us) + 1, "%s", us);
6497+
c->is_canceled = clnt->discard_this;
64836498
}
64846499

64856500
static void gather_connections_evbuffer(struct connection_info **info, int *num_connections)
@@ -6509,13 +6524,40 @@ void free_connection_info(struct connection_info *info, int num_connections)
65096524
{
65106525
if (info == NULL) return;
65116526
for (int i = 0; i < num_connections; i++) {
6512-
if (info[i].sql) free(info[i].sql);
6513-
if (info[i].fingerprint) free(info[i].fingerprint);
6527+
free(info[i].sql);
6528+
free(info[i].fingerprint);
6529+
free(info[i].uuid);
65146530
/* state is static, don't free */
65156531
}
65166532
free(info);
65176533
}
65186534

6535+
/*
 * Mark clients on the sql_evbuffers list for cancellation by setting
 * discard_this; the list is walked with lru_evbuffers_mtx held.
 *
 * only_queued: when no uuid filter is given, non-zero restricts the
 *              selection to clients in CONNECTION_QUEUED state; zero also
 *              selects clients in CONNECTION_RUNNING state.
 * filter_fp:   when not the all-zero uuid, exactly the clients whose
 *              unifieduuid equals it are selected, whatever their state,
 *              and only_queued is ignored.
 *
 * Each selected client is reported on stderr.
 */
void cancel_connections(int only_queued, uuid_t filter_fp)
{
    struct sqlclntstate *clnt;
    /* the filter cannot change during the walk; evaluate it once */
    const int have_filter = !comdb2uuid_is_zero(filter_fp);

    Pthread_mutex_lock(&lru_evbuffers_mtx);
    TAILQ_FOREACH(clnt, &sql_evbuffers, sql_entry) {
        int selected;

        if (have_filter)
            selected = (comdb2uuidcmp(filter_fp, clnt->unifieduuid) == 0);
        else if (only_queued)
            selected = (clnt->state == CONNECTION_QUEUED);
        else
            selected = (clnt->state == CONNECTION_RUNNING ||
                        clnt->state == CONNECTION_QUEUED);

        if (!selected)
            continue;

        clnt->discard_this = 1;
        /* passing NULL to %s is undefined; substitute a placeholder */
        fprintf(stderr, "%s:%d Cancelling %s\n", __func__, __LINE__,
                clnt->sql ? clnt->sql : "(null)");
    }
    Pthread_mutex_unlock(&lru_evbuffers_mtx);
}
6560+
65196561
void reqlog_long_running_sql_statements(void)
65206562
{
65216563
struct sqlclntstate *clnt;

0 commit comments

Comments
 (0)