Skip to content

Commit 5e1b656

Browse files
committed
Unified cancel for sql statements, for both running and queued sql.
Syntax: exec procedure sys.cmd.cancel('all') // cancel all statements in libevent sql_evbuffers exec procedure sys.cmd.cancel('running') // cancel all running statements exec procedure sys.cmd.cancel('queued') // cancel all queued statements exec procedure sys.cmd.cancel('cnonce', 'uuid') // cancel 'uuid' statement, 'uuid' provided by comdb2_connections exec procedure sys.cmd.cancel('fp', 'uuid') // cancel all fp 'uuid' statements, fp provided by comdb2_connections. NOTE: canceled requests will show as 'queued_canceled', and so on, in comdb2_connections
1 parent 96912b8 commit 5e1b656

File tree

16 files changed

+705
-21
lines changed

16 files changed

+705
-21
lines changed

db/process_message.c

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,11 @@ static const char *HELP_SQL[] = {
237237
"hist - show recently run statements",
238238
"cancel N - cancel running statement with id N",
239239
"cancelcnonce N - cancel running statement with cnonce N",
240+
"ucancel cnonce N - cancel running statement with unified uuid N (per comdb2_connections)",
241+
"ucancel fp N - cancel running statement with fingerprint N (per comdb2_connections)",
242+
"ucancel running - cancel all running statement (leaves queued statements intact)",
243+
"ucancel queued - cancel all queued statement (leaves running statements intact)",
244+
"ucancel all - cancel all queued and running statements",
240245
"wrtimeout N - set write timeout in ms",
241246
"help - this information",
242247
NULL,
@@ -3118,6 +3123,34 @@ int process_command(struct dbenv *dbenv, char *line, int lline, int st)
31183123
cancel_sql_statement_with_cnonce(cnonce);
31193124
free(cnonce);
31203125
}
3126+
} else if (tokcmp(tok, ltok, "ucancel") == 0) {
3127+
char *uuid = NULL;
3128+
enum ucancel_type t = UCANCEL_INV;
3129+
tok = segtok(line, lline, &st, &ltok);
3130+
if (ltok) {
3131+
if (tokcmp(tok, ltok, "all") == 0)
3132+
t = UCANCEL_ALL;
3133+
else if (tokcmp(tok, ltok, "running") == 0)
3134+
t = UCANCEL_RUN;
3135+
else if (tokcmp(tok, ltok, "queued") == 0)
3136+
t = UCANCEL_QUE;
3137+
else if (tokcmp(tok, ltok, "cnonce") == 0)
3138+
t = UCANCEL_CNO;
3139+
else if (tokcmp(tok, ltok, "fp") == 0)
3140+
t = UCANCEL_FPT;
3141+
3142+
if (t == UCANCEL_CNO || t == UCANCEL_FPT) {
3143+
tok = segtok(line, lline, &st, &ltok);
3144+
if (ltok)
3145+
uuid = tokdup(tok, ltok);
3146+
else
3147+
t = UCANCEL_INV;
3148+
}
3149+
if (t != UCANCEL_INV)
3150+
ucancel_sql_statements(t, uuid);
3151+
}
3152+
if (t == UCANCEL_INV)
3153+
logmsg(LOGMSG_ERROR, "Usage sql ucancel [all|queued|running|fp N|cnonce N]");
31213154
} else if (tokcmp(tok, ltok, "help") == 0) {
31223155
print_help_page(HELP_SQL);
31233156
} else if (tokcmp(tok, ltok, "debug") == 0) {

db/sql.h

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -322,6 +322,17 @@ typedef struct sqlclntstate_fdb {
322322
int failed_heartbeats; /* used to signal failed communication with remotes */
323323
} sqlclntstate_fdb_t;
324324

325+
enum ucancel_type {
326+
UCANCEL_INV = 0,
327+
UCANCEL_ALL = 1, /* both queued and running */
328+
UCANCEL_RUN = 2, /* running only */
329+
UCANCEL_QUE = 4, /* queued only */
330+
UCANCEL_CNO = 8, /* filter by cnonce */
331+
UCANCEL_FPT = 16 /* filter by fp */
332+
};
333+
334+
int ucancel_sql_statements(enum ucancel_type type, char *uuid);
335+
325336
CurRange *currange_new();
326337
#define CURRANGEARR_INIT_CAP 2
327338
void currangearr_init(CurRangeArr *arr);
@@ -745,6 +756,7 @@ struct sqlclntstate {
745756

746757
struct rawnodestats *rawnodestats;
747758

759+
uuid_t unifieduuid; /* assigned to any statement running, used for canceling live sql */
748760
osqlstate_t osql; /* offload sql state is kept here */
749761
enum ctrl_sqleng ctrl_sqlengine; /* use to mark a begin/end out of state,
750762
see enum ctrl_sqleng
@@ -1068,6 +1080,7 @@ struct sqlclntstate {
10681080
int tail_offset;
10691081

10701082
struct features features;
1083+
int discard_this; /* set by a cancel() trap, complement thd->stop_this_statement for queued request */
10711084
};
10721085
typedef struct sqlclntstate sqlclntstate;
10731086

@@ -1338,6 +1351,8 @@ struct connection_info {
13381351
enum connection_state state_int;
13391352
int64_t in_transaction;
13401353
int64_t in_local_cache;
1354+
char *uuid;
1355+
int64_t is_canceled;
13411356
};
13421357

13431358
/* makes master swing verbose */
@@ -1721,6 +1736,8 @@ int osql_test_create_genshard(struct schema_change_type *sc, char **errmsg, int
17211736
char **dbnames, uint32_t numcols, char **columns, char **shardnames);
17221737
int osql_test_remove_genshard(struct schema_change_type *sc, char **errmsg);
17231738

1739+
void cancel_connections(int only_queued, uuid_t filter_fp);
1740+
17241741
struct sp_tmptbl {
17251742
pthread_mutex_t lk;
17261743
uint64_t rowid;

db/sqlglue.c

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -629,6 +629,10 @@ int check_sql_client_disconnect(struct sqlclntstate *clnt, char *file, int line)
629629
return 1;
630630
}
631631
}
632+
if (clnt->discard_this) {
633+
clnt->thd->sqlthd->stop_this_statement = 1;
634+
clnt->discard_this = 0;
635+
}
632636
return 0;
633637
}
634638

@@ -665,6 +669,10 @@ static int sql_tick(struct sql_thread *thd, int no_recover_deadlock)
665669
if (thd->stop_this_statement) {
666670
rc = SQLITE_ABORT;
667671
goto done;
672+
} else if (clnt->discard_this) {
673+
rc = SQLITE_ABORT;
674+
clnt->discard_this = 0;
675+
goto done;
668676
}
669677

670678
if (clnt->statement_timedout) {
@@ -9700,6 +9708,105 @@ void cancel_sql_statement_with_cnonce(const char *cnonce)
97009708
logmsg(LOGMSG_USER, "Query with cnonce %s not found (finished?)\n", cnonce);
97019709
}
97029710

9711+
static int _ucancel_sql_statements_cno(char *uuidstr)
9712+
{
9713+
struct sql_thread *thd;
9714+
int rc = -1;
9715+
9716+
uuid_t uuid;
9717+
9718+
if (uuid_parse(uuidstr, uuid)) {
9719+
logmsg(LOGMSG_ERROR, "Failed to parse uuid \"%s\"\n", uuidstr);
9720+
return rc;
9721+
}
9722+
9723+
Pthread_mutex_lock(&gbl_sql_lock);
9724+
LISTC_FOR_EACH(&thedb->sql_threads, thd, lnk) {
9725+
if (thd->clnt && memcmp(thd->clnt->unifieduuid, uuid, sizeof(uuid_t)) == 0) {
9726+
logmsg(LOGMSG_ERROR, "Cancelling sql %s\n", thd->clnt->sql);
9727+
thd->stop_this_statement = 1;
9728+
rc = 0;
9729+
break;
9730+
}
9731+
}
9732+
Pthread_mutex_unlock(&gbl_sql_lock);
9733+
if (rc)
9734+
logmsg(LOGMSG_ERROR, "Sql cnonce \"%s\" not found\n", uuidstr);
9735+
return rc;
9736+
}
9737+
9738+
static int _ucancel_sql_statements_run(void)
9739+
{
9740+
struct sql_thread *thd;
9741+
9742+
Pthread_mutex_lock(&gbl_sql_lock);
9743+
LISTC_FOR_EACH(&thedb->sql_threads, thd, lnk) {
9744+
if (thd->clnt)
9745+
thd->stop_this_statement = 1;
9746+
}
9747+
Pthread_mutex_unlock(&gbl_sql_lock);
9748+
9749+
return 0;
9750+
}
9751+
9752+
static int _ucancel_sql_statements_que(void)
9753+
{
9754+
uuid_t zerouuid;
9755+
comdb2uuid_clear(zerouuid);
9756+
cancel_connections(1, zerouuid);
9757+
return 0;
9758+
}
9759+
9760+
static int _ucancel_sql_statements_all(void)
9761+
{
9762+
uuid_t zerouuid;
9763+
comdb2uuid_clear(zerouuid);
9764+
cancel_connections(0, zerouuid);
9765+
return 0;
9766+
}
9767+
9768+
static int _ucancel_sql_statements_fp(char * uuidstr)
9769+
{
9770+
uuid_t uuid;
9771+
9772+
if (uuid_parse(uuidstr, uuid)) {
9773+
logmsg(LOGMSG_ERROR, "Failed to parse uuid \"%s\"\n", uuidstr);
9774+
return -1;
9775+
}
9776+
9777+
cancel_connections(0, uuid);
9778+
return 0;
9779+
}
9780+
9781+
/*
9782+
* "Unified" sql statements cancel routine
9783+
* MODES:
9784+
* type == UCANCEL_ALL #cancel all running and queued statements
9785+
* type == UCANCEL_RUN #cancel all running statements
9786+
* type == UCANCEL_QUE #cancel all queued statements
9787+
* type == UCANCEL_CNO #cancel one running statement with cnonce = "uuid"
9788+
* type == UCANCEL_FPT #cancel all running statement with fingerprint = "uuid"
9789+
*
9790+
*/
9791+
int ucancel_sql_statements(enum ucancel_type type, char *uuid) {
9792+
switch(type) {
9793+
case UCANCEL_CNO:
9794+
return _ucancel_sql_statements_cno(uuid);
9795+
case UCANCEL_RUN:
9796+
return _ucancel_sql_statements_run();
9797+
case UCANCEL_QUE:
9798+
return _ucancel_sql_statements_que();
9799+
case UCANCEL_ALL:
9800+
return _ucancel_sql_statements_all();
9801+
case UCANCEL_FPT:
9802+
return _ucancel_sql_statements_fp(uuid);
9803+
default:
9804+
logmsg(LOGMSG_ERROR, "%d unimplemented\n", type);
9805+
return -1;
9806+
}
9807+
return 0;
9808+
}
9809+
97039810
void sql_dump_running_statements(void)
97049811
{
97059812
struct sql_thread *thd;

db/sqlinterfaces.c

Lines changed: 46 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4437,7 +4437,13 @@ static void sqlengine_work_lua_thread(void *thddata, void *work)
44374437
clnt->deque_timeus = comdb2_time_epochus();
44384438
clnt->thd = thd;
44394439
/* Reset the cancel-statement flag */
4440-
thd->sqlthd->stop_this_statement = 0;
4440+
if (clnt->discard_this) {
4441+
thd->sqlthd->stop_this_statement = 1;
4442+
clnt->discard_this = 0;
4443+
} else
4444+
thd->sqlthd->stop_this_statement = 0;
4445+
4446+
clnt->discard_this = 0;
44414447
sql_update_usertran_state(clnt);
44424448

44434449
rdlock_schema_lk();
@@ -4695,7 +4701,11 @@ void sqlengine_work_appsock(struct sqlthdstate *thd, struct sqlclntstate *clnt)
46954701
sqlthd->clnt = clnt;
46964702
clnt->thd = thd;
46974703
/* Reset the cancel-statement flag */
4698-
sqlthd->stop_this_statement = 0;
4704+
if (clnt->discard_this) {
4705+
thd->sqlthd->stop_this_statement = 1;
4706+
clnt->discard_this = 0;
4707+
} else
4708+
thd->sqlthd->stop_this_statement = 0;
46994709

47004710
thr_set_user("appsock", (intptr_t)clnt->appsock_id);
47014711

@@ -6475,6 +6485,11 @@ static void gather_connection_int(struct connection_info *c, struct sqlclntstate
64756485
c->in_transaction = clnt->in_client_trans;
64766486
c->in_local_cache = clnt->in_local_cache;
64776487
Pthread_mutex_unlock(&clnt->state_lk);
6488+
uuidstr_t us;
6489+
comdb2uuidstr(clnt->unifieduuid, us);
6490+
c->uuid = malloc(strlen(us) + 1);
6491+
snprintf(c->uuid, strlen(us) + 1, "%s", us);
6492+
c->is_canceled = clnt->discard_this;
64786493
}
64796494

64806495
static void gather_connections_evbuffer(struct connection_info **info, int *num_connections)
@@ -6504,13 +6519,40 @@ void free_connection_info(struct connection_info *info, int num_connections)
65046519
{
65056520
if (info == NULL) return;
65066521
for (int i = 0; i < num_connections; i++) {
6507-
if (info[i].sql) free(info[i].sql);
6508-
if (info[i].fingerprint) free(info[i].fingerprint);
6522+
free(info[i].sql);
6523+
free(info[i].fingerprint);
6524+
free(info[i].uuid);
65096525
/* state is static, don't free */
65106526
}
65116527
free(info);
65126528
}
65136529

6530+
void cancel_connections(int only_queued, uuid_t filter_fp)
6531+
{
6532+
struct sqlclntstate *clnt;
6533+
Pthread_mutex_lock(&lru_evbuffers_mtx);
6534+
TAILQ_FOREACH(clnt, &sql_evbuffers, sql_entry) {
6535+
if (!comdb2uuid_is_zero(filter_fp)) {
6536+
if (!comdb2uuidcmp(filter_fp, clnt->unifieduuid)) {
6537+
clnt->discard_this = 1;
6538+
fprintf(stderr, "%s:%d Cancelling %s\n", __func__, __LINE__, clnt->sql);
6539+
}
6540+
} else {
6541+
if (only_queued) {
6542+
if (clnt->state == CONNECTION_QUEUED) {
6543+
clnt->discard_this = 1;
6544+
fprintf(stderr, "%s:%d Cancelling %s\n", __func__, __LINE__, clnt->sql);
6545+
}
6546+
} else if (clnt->state == CONNECTION_RUNNING ||
6547+
clnt->state == CONNECTION_QUEUED) {
6548+
clnt->discard_this = 1;
6549+
fprintf(stderr, "%s:%d Cancelling %s\n", __func__, __LINE__, clnt->sql);
6550+
}
6551+
}
6552+
}
6553+
Pthread_mutex_unlock(&lru_evbuffers_mtx);
6554+
}
6555+
65146556
void reqlog_long_running_sql_statements(void)
65156557
{
65166558
struct sqlclntstate *clnt;

0 commit comments

Comments
 (0)