Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion db/sltdbt.c
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,10 @@ int handle_ireq(struct ireq *iq)
}

if (rc == RC_INTERNAL_FORWARD) {
rc = 0;
if (iq->ipc_sndbak) {
rc = ERR_NOMASTER;
} else
rc = 0;
} else {
/* SNDBAK RESPONSE */
if (iq->debug) {
Expand Down
114 changes: 58 additions & 56 deletions db/sql.h
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,7 @@ struct plugin_callbacks {
plugin_func *clr_high_availability; /* newsql_clr_high_availability */
plugin_func *get_high_availability; /* newsql_get_high_availability*/
plugin_func *has_parallel_sql; /* newsql_has_parallel_sql */
plugin_func *is_legacy_request; /* newsql_is_legacy_request */

add_steps_func *add_steps; /* newsql_add_steps */
setup_client_info_func *setup_client_info; /* newsql_setup_client_info */
Expand Down Expand Up @@ -515,62 +516,63 @@ struct plugin_callbacks {
#define make_plugin_optional_null(clnt, name) \
(clnt)->plugin.column_##name = NULL

#define plugin_set_callbacks(clnt, name) \
do { \
make_plugin_callback(clnt, name, write_response); \
make_plugin_callback(clnt, name, read_response); \
make_plugin_callback(clnt, name, save_stmt); \
make_plugin_callback(clnt, name, restore_stmt); \
make_plugin_callback(clnt, name, destroy_stmt); \
make_plugin_callback(clnt, name, print_stmt); \
make_plugin_callback(clnt, name, param_count); \
make_plugin_callback(clnt, name, param_index); \
make_plugin_callback(clnt, name, param_value); \
make_plugin_callback(clnt, name, override_count); \
make_plugin_callback(clnt, name, override_type); \
make_plugin_callback(clnt, name, has_cnonce); \
make_plugin_callback(clnt, name, set_cnonce); \
make_plugin_callback(clnt, name, clr_cnonce); \
make_plugin_callback(clnt, name, get_cnonce); \
make_plugin_callback(clnt, name, get_snapshot); \
make_plugin_callback(clnt, name, upd_snapshot); \
make_plugin_callback(clnt, name, clr_snapshot); \
make_plugin_callback(clnt, name, has_high_availability); \
make_plugin_callback(clnt, name, set_high_availability); \
make_plugin_callback(clnt, name, clr_high_availability); \
make_plugin_callback(clnt, name, get_high_availability); \
make_plugin_callback(clnt, name, has_parallel_sql); \
make_plugin_callback(clnt, name, add_steps); \
make_plugin_callback(clnt, name, setup_client_info); \
make_plugin_callback(clnt, name, skip_row); \
make_plugin_callback(clnt, name, log_context); \
make_plugin_callback(clnt, name, get_client_starttime); \
make_plugin_callback(clnt, name, get_client_retries); \
make_plugin_callback(clnt, name, send_intrans_response); \
make_plugin_callback(clnt, name, close); \
make_plugin_callback(clnt, name, flush); \
make_plugin_callback(clnt, name, get_fileno); \
make_plugin_callback(clnt, name, get_x509_attr); \
make_plugin_callback(clnt, name, has_ssl); \
make_plugin_callback(clnt, name, has_x509); \
make_plugin_callback(clnt, name, local_check); \
make_plugin_callback(clnt, name, peer_check); \
make_plugin_callback(clnt, name, get_authdata); \
make_plugin_callback(clnt, name, api_type); \
make_plugin_optional_null(clnt, count); \
make_plugin_optional_null(clnt, type); \
make_plugin_optional_null(clnt, int64); \
make_plugin_optional_null(clnt, double); \
make_plugin_optional_null(clnt, text); \
make_plugin_optional_null(clnt, bytes); \
make_plugin_optional_null(clnt, blob); \
make_plugin_optional_null(clnt, datetime); \
make_plugin_optional_null(clnt, interval); \
make_plugin_optional_null(clnt, value); \
(clnt)->plugin.state = NULL; \
(clnt)->plugin.next_row = NULL; \
(clnt)->plugin.tzname = NULL; \
(clnt)->plugin.query_data_func = NULL; \
#define plugin_set_callbacks(clnt, name) \
do { \
make_plugin_callback(clnt, name, write_response); \
make_plugin_callback(clnt, name, read_response); \
make_plugin_callback(clnt, name, save_stmt); \
make_plugin_callback(clnt, name, restore_stmt); \
make_plugin_callback(clnt, name, destroy_stmt); \
make_plugin_callback(clnt, name, print_stmt); \
make_plugin_callback(clnt, name, param_count); \
make_plugin_callback(clnt, name, param_index); \
make_plugin_callback(clnt, name, param_value); \
make_plugin_callback(clnt, name, override_count); \
make_plugin_callback(clnt, name, override_type); \
make_plugin_callback(clnt, name, has_cnonce); \
make_plugin_callback(clnt, name, set_cnonce); \
make_plugin_callback(clnt, name, clr_cnonce); \
make_plugin_callback(clnt, name, get_cnonce); \
make_plugin_callback(clnt, name, get_snapshot); \
make_plugin_callback(clnt, name, upd_snapshot); \
make_plugin_callback(clnt, name, clr_snapshot); \
make_plugin_callback(clnt, name, has_high_availability); \
make_plugin_callback(clnt, name, set_high_availability); \
make_plugin_callback(clnt, name, clr_high_availability); \
make_plugin_callback(clnt, name, get_high_availability); \
make_plugin_callback(clnt, name, has_parallel_sql); \
make_plugin_callback(clnt, name, add_steps); \
make_plugin_callback(clnt, name, setup_client_info); \
make_plugin_callback(clnt, name, skip_row); \
make_plugin_callback(clnt, name, log_context); \
make_plugin_callback(clnt, name, get_client_starttime); \
make_plugin_callback(clnt, name, get_client_retries); \
make_plugin_callback(clnt, name, send_intrans_response); \
make_plugin_callback(clnt, name, close); \
make_plugin_callback(clnt, name, flush); \
make_plugin_callback(clnt, name, get_fileno); \
make_plugin_callback(clnt, name, get_x509_attr); \
make_plugin_callback(clnt, name, has_ssl); \
make_plugin_callback(clnt, name, has_x509); \
make_plugin_callback(clnt, name, local_check); \
make_plugin_callback(clnt, name, peer_check); \
make_plugin_callback(clnt, name, get_authdata); \
make_plugin_callback(clnt, name, api_type); \
make_plugin_callback(clnt, name, is_legacy_request); \
make_plugin_optional_null(clnt, count); \
make_plugin_optional_null(clnt, type); \
make_plugin_optional_null(clnt, int64); \
make_plugin_optional_null(clnt, double); \
make_plugin_optional_null(clnt, text); \
make_plugin_optional_null(clnt, bytes); \
make_plugin_optional_null(clnt, blob); \
make_plugin_optional_null(clnt, datetime); \
make_plugin_optional_null(clnt, interval); \
make_plugin_optional_null(clnt, value); \
(clnt)->plugin.state = NULL; \
(clnt)->plugin.next_row = NULL; \
(clnt)->plugin.tzname = NULL; \
(clnt)->plugin.query_data_func = NULL; \
} while (0)

int param_count(struct sqlclntstate *);
Expand Down
43 changes: 27 additions & 16 deletions db/sqlinterfaces.c
Original file line number Diff line number Diff line change
Expand Up @@ -4715,18 +4715,21 @@ void sqlengine_work_appsock(struct sqlthdstate *thd, struct sqlclntstate *clnt)

assert(clnt->dbtran.pStmt == NULL);

/* everything going in is cursor based */
int rc = get_curtran(thedb->bdb_env, clnt);
if (rc) {
logmsg(LOGMSG_ERROR, "%s td %p: unable to get a CURSOR transaction, rc=%d!\n", __func__, (void *)pthread_self(),
rc);
send_run_error(clnt, "Transaction is not durable", CDB2ERR_NOTDURABLE);
clnt->query_rc = -1;
clnt->osql.timings.query_finished = osql_log_time();
osql_log_time_done(clnt);
clnt_change_state(clnt, CONNECTION_IDLE);
signal_clnt_as_done(clnt);
return;
int is_legacy_request = clnt->plugin.is_legacy_request(clnt);
if (!is_legacy_request) {
/* everything going in is cursor based */
int rc = get_curtran(thedb->bdb_env, clnt);
if (rc) {
logmsg(LOGMSG_ERROR, "%s td %p: unable to get a CURSOR transaction, rc=%d!\n", __func__,
(void *)pthread_self(), rc);
send_run_error(clnt, "Transaction is not durable", CDB2ERR_NOTDURABLE);
clnt->query_rc = -1;
clnt->osql.timings.query_finished = osql_log_time();
osql_log_time_done(clnt);
clnt_change_state(clnt, CONNECTION_IDLE);
signal_clnt_as_done(clnt);
return;
}
}

/* it is a new query, it is time to clean the error */
Expand All @@ -4746,7 +4749,8 @@ void sqlengine_work_appsock(struct sqlthdstate *thd, struct sqlclntstate *clnt)
/* actually execute the query */
thrman_setfd(thd->thr_self, get_fileno(clnt));

osql_shadtbl_begin_query(thedb->bdb_env, clnt);
if (!is_legacy_request)
osql_shadtbl_begin_query(thedb->bdb_env, clnt);

if (clnt->fdb_state.remote_sql_sb) {
clnt->query_rc = execute_sql_query_offload(thd, clnt);
Expand All @@ -4760,16 +4764,19 @@ void sqlengine_work_appsock(struct sqlthdstate *thd, struct sqlclntstate *clnt)
put_ref(&clnt->sql_ref);
}

osql_shadtbl_done_query(thedb->bdb_env, clnt);
if (!is_legacy_request)
osql_shadtbl_done_query(thedb->bdb_env, clnt);
thrman_setfd(thd->thr_self, -1);

/* this is a compromise; we release the curtran here, even though
we might have a begin/commit transaction pending
any query inside the begin/commit will be performed under its
own locker id;
*/
if (put_curtran(thedb->bdb_env, clnt)) {
logmsg(LOGMSG_ERROR, "%s: unable to destroy a CURSOR transaction!\n", __func__);
if (!is_legacy_request) {
if (put_curtran(thedb->bdb_env, clnt)) {
logmsg(LOGMSG_ERROR, "%s: unable to destroy a CURSOR transaction!\n", __func__);
}
}
clnt->osql.timings.query_finished = osql_log_time();
osql_log_time_done(clnt);
Expand Down Expand Up @@ -6804,6 +6811,10 @@ static const char * internal_api_type(struct sqlclntstate *clnt)
{
return "internal";
}
static int internal_is_legacy_request(struct sqlclntstate *clnt)
{
return 0;
}

void start_internal_sql_clnt(struct sqlclntstate *clnt)
{
Expand Down
8 changes: 8 additions & 0 deletions db/toblock.c
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,11 @@ static int forward_longblock_to_master(struct ireq *iq,
return ERR_NOMASTER;
}

// Don't attempt to forward legacy requests - return an error and let
// the API/proxy handle it.
if (iq->ipc_sndbak)
return ERR_NOMASTER;

/*modify request to indicate forwarded and send off to remote */
if (req_hdr_get(&req_hdr, iq->p_buf_out_start,
p_blkstate->p_buf_req_start, 0) != p_blkstate->p_buf_req_start)
Expand Down Expand Up @@ -532,6 +537,9 @@ static int forward_block_to_master(struct ireq *iq, block_state_t *p_blkstate,
return ERR_NOMASTER;
}

if (iq->ipc_sndbak) {
return ERR_NOMASTER;
}

/*modify request to indicate forwarded and send off to remote */
if (req_hdr_get(&req_hdr, iq->p_buf_out_start,
Expand Down
16 changes: 14 additions & 2 deletions plugins/newsql/newsql.c
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ extern int gbl_incoherent_clnt_wait;
extern int gbl_new_leader_duration;
extern int gbl_use_modsnap_for_snapshot;
extern int gbl_gen_shard_verbose;
void dump_response(const CDB2SQLRESPONSE *r);
static void dump_response(const CDB2SQLRESPONSE *r);

struct newsql_appdata {
NEWSQL_APPDATA_COMMON
Expand Down Expand Up @@ -2551,6 +2551,17 @@ static void *newsql_get_authdata(struct sqlclntstate *clnt)
return NULL;
}

static int newsql_is_legacy_request(sqlclntstate *clnt)
{
struct newsql_appdata *appdata = clnt->appdata;
if (appdata) {
CDB2SQLQUERY *sql_query = appdata->sqlquery;
if (sql_query->is_legacy_request)
return 1;
}
return 0;
}

void newsql_setup_clnt(struct sqlclntstate *clnt)
{
struct newsql_appdata *appdata = clnt->appdata;
Expand Down Expand Up @@ -2878,7 +2889,8 @@ static const char *feature_str(CDB2ServerFeatures f) {
};
}

void dump_response(const CDB2SQLRESPONSE *r) {
static void dump_response(const CDB2SQLRESPONSE *r)
{
int depth = 0;
dump(depth, "CDB2_SQLRESPONSE: {\n");
depth++;
Expand Down
1 change: 1 addition & 0 deletions protobuf/sqlquery.proto
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ message CDB2_SQLQUERY {
required bytes data = 4;
}
optional IdentityBlob identity = 18;
optional bool is_legacy_request = 19 [ default = false ];
}

message CDB2_DBINFO {
Expand Down