Skip to content

Commit 7cbea30

Browse files
CDRIVER-2386 refactor retryable writes into shared function (#1576)
Adds a new private function `mongoc_cluster_run_retryable_write` and removes duplicate retry logic. No functional changes are expected. * rename `is_retryable` to `can_retry` Intended to better describe. `is_retryable_write` describes if the command is eligible for retry (regardless of whether retry occurred). `can_retry` is initially true if `is_retryable_write` is true. `can_retry` is set to false after a retry (can only retry once). * simplify condition `_mongoc_write_error_update_if_unsupported_storage_engine` only applies if the server does not support retryable writes. Apply regardless of whether a retry occurred. * remove `_mongoc_client_retryable_write_command_with_stream` --------- Co-authored-by: Ezra Chung <[email protected]>
1 parent badc2af commit 7cbea30

File tree

5 files changed

+138
-275
lines changed

5 files changed

+138
-275
lines changed

src/libmongoc/src/mongoc/mongoc-client.c

Lines changed: 11 additions & 104 deletions
Original file line numberDiff line numberDiff line change
@@ -1554,109 +1554,6 @@ mongoc_client_command (mongoc_client_t *client,
15541554
}
15551555

15561556

1557-
static bool
1558-
_mongoc_client_retryable_write_command_with_stream (mongoc_client_t *client,
1559-
mongoc_cmd_parts_t *parts,
1560-
mongoc_server_stream_t *server_stream,
1561-
bson_t *reply,
1562-
bson_error_t *error)
1563-
{
1564-
mongoc_server_stream_t *retry_server_stream = NULL;
1565-
bson_iter_t txn_number_iter;
1566-
bool is_retryable = true;
1567-
bool ret;
1568-
1569-
ENTRY;
1570-
1571-
BSON_ASSERT_PARAM (client);
1572-
BSON_ASSERT (parts->is_retryable_write);
1573-
1574-
/* increment the transaction number for the first attempt of each retryable
1575-
* write command */
1576-
BSON_ASSERT (bson_iter_init_find (&txn_number_iter, parts->assembled.command, "txnNumber"));
1577-
bson_iter_overwrite_int64 (&txn_number_iter, ++parts->assembled.session->server_session->txn_number);
1578-
1579-
// Store the original error and reply if needed.
1580-
struct {
1581-
bson_t reply;
1582-
bson_error_t error;
1583-
bool set;
1584-
} original_error = {.reply = {0}, .error = {0}, false};
1585-
1586-
retry:
1587-
ret = mongoc_cluster_run_command_monitored (&client->cluster, &parts->assembled, reply, error);
1588-
1589-
_mongoc_write_error_handle_labels (ret, error, reply, server_stream->sd);
1590-
1591-
if (is_retryable) {
1592-
_mongoc_write_error_update_if_unsupported_storage_engine (ret, error, reply);
1593-
}
1594-
1595-
/* If a retryable error is encountered and the write is retryable, select
1596-
* a new writable stream and retry. If server selection fails or the selected
1597-
* server does not support retryable writes, fall through and allow the
1598-
* original error to be reported. */
1599-
if (is_retryable && _mongoc_write_error_get_type (reply) == MONGOC_WRITE_ERR_RETRY) {
1600-
bson_error_t ignored_error;
1601-
1602-
// The write command may be retried at most once.
1603-
is_retryable = false;
1604-
1605-
{
1606-
mongoc_deprioritized_servers_t *const ds = mongoc_deprioritized_servers_new ();
1607-
1608-
mongoc_deprioritized_servers_add_if_sharded (ds, server_stream->topology_type, server_stream->sd);
1609-
1610-
BSON_ASSERT (!retry_server_stream);
1611-
retry_server_stream =
1612-
mongoc_cluster_stream_for_writes (&client->cluster, parts->assembled.session, ds, NULL, &ignored_error);
1613-
1614-
mongoc_deprioritized_servers_destroy (ds);
1615-
}
1616-
1617-
if (retry_server_stream) {
1618-
parts->assembled.server_stream = retry_server_stream;
1619-
{
1620-
// Store the original error and reply before retry.
1621-
BSON_ASSERT (!original_error.set); // Retry only happens once.
1622-
original_error.set = true;
1623-
bson_copy_to (reply, &original_error.reply);
1624-
if (error) {
1625-
original_error.error = *error;
1626-
}
1627-
}
1628-
bson_destroy (reply);
1629-
GOTO (retry);
1630-
}
1631-
}
1632-
1633-
if (retry_server_stream) {
1634-
mongoc_server_stream_cleanup (retry_server_stream);
1635-
}
1636-
1637-
// If a retry attempt fails with an error labeled NoWritesPerformed,
1638-
// drivers MUST return the original error.
1639-
if (original_error.set && mongoc_error_has_label (reply, "NoWritesPerformed")) {
1640-
if (error) {
1641-
*error = original_error.error;
1642-
}
1643-
bson_destroy (reply);
1644-
bson_copy_to (&original_error.reply, reply);
1645-
}
1646-
1647-
if (original_error.set) {
1648-
bson_destroy (&original_error.reply);
1649-
}
1650-
1651-
if (ret && error) {
1652-
/* if a retry succeeded, clear the initial error */
1653-
memset (error, 0, sizeof (bson_error_t));
1654-
}
1655-
1656-
RETURN (ret);
1657-
}
1658-
1659-
16601557
static bool
16611558
_mongoc_client_retryable_read_command_with_stream (mongoc_client_t *client,
16621559
mongoc_cmd_parts_t *parts,
@@ -1750,7 +1647,17 @@ _mongoc_client_command_with_stream (mongoc_client_t *client,
17501647
}
17511648

17521649
if (parts->is_retryable_write) {
1753-
RETURN (_mongoc_client_retryable_write_command_with_stream (client, parts, server_stream, reply, error));
1650+
mongoc_server_stream_t *retry_server_stream = NULL;
1651+
1652+
bool ret = mongoc_cluster_run_retryable_write (
1653+
&client->cluster, &parts->assembled, true /* is_retryable */, &retry_server_stream, reply, error);
1654+
1655+
if (retry_server_stream) {
1656+
mongoc_server_stream_cleanup (retry_server_stream);
1657+
parts->assembled.server_stream = NULL;
1658+
}
1659+
1660+
RETURN (ret);
17541661
}
17551662

17561663
if (parts->is_retryable_read) {

src/libmongoc/src/mongoc/mongoc-cluster-private.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,18 @@ mongoc_cluster_stream_valid (mongoc_cluster_t *cluster, mongoc_server_stream_t *
186186
bool
187187
mongoc_cluster_run_command_monitored (mongoc_cluster_t *cluster, mongoc_cmd_t *cmd, bson_t *reply, bson_error_t *error);
188188

189+
// `mongoc_cluster_run_retryable_write` executes a write command and may apply retryable writes behavior.
190+
// `cmd->server_stream` is set to `*retry_server_stream` on retry. Otherwise, it is unmodified.
191+
// `*retry_server_stream` is set to a new stream on retry. The caller must call `mongoc_server_stream_cleanup`.
192+
// `*reply` must be uninitialized and is always initialized upon return. The caller must call `bson_destroy`.
193+
bool
194+
mongoc_cluster_run_retryable_write (mongoc_cluster_t *cluster,
195+
mongoc_cmd_t *cmd,
196+
bool is_retryable_write,
197+
mongoc_server_stream_t **retry_server_stream,
198+
bson_t *reply,
199+
bson_error_t *error);
200+
189201
bool
190202
mongoc_cluster_run_command_parts (mongoc_cluster_t *cluster,
191203
mongoc_server_stream_t *server_stream,

src/libmongoc/src/mongoc/mongoc-cluster.c

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3636,3 +3636,99 @@ mcd_rpc_message_decompress_if_necessary (mcd_rpc_message *rpc, void **data, size
36363636

36373637
return mcd_rpc_message_decompress (rpc, data, data_len);
36383638
}
3639+
3640+
bool
3641+
mongoc_cluster_run_retryable_write (mongoc_cluster_t *cluster,
3642+
mongoc_cmd_t *cmd,
3643+
bool is_retryable_write,
3644+
mongoc_server_stream_t **retry_server_stream,
3645+
bson_t *reply,
3646+
bson_error_t *error)
3647+
{
3648+
BSON_ASSERT_PARAM (cluster);
3649+
BSON_ASSERT_PARAM (cmd);
3650+
BSON_ASSERT_PARAM (retry_server_stream);
3651+
BSON_ASSERT_PARAM (reply);
3652+
BSON_ASSERT (error || true);
3653+
3654+
bool ret;
3655+
// `can_retry` is set to false on retry. A retry may only happen once.
3656+
bool can_retry = is_retryable_write;
3657+
3658+
// Increment the transaction number for the first attempt of each retryable write command.
3659+
if (is_retryable_write) {
3660+
bson_iter_t txn_number_iter;
3661+
BSON_ASSERT (bson_iter_init_find (&txn_number_iter, cmd->command, "txnNumber"));
3662+
bson_iter_overwrite_int64 (&txn_number_iter, ++cmd->session->server_session->txn_number);
3663+
}
3664+
3665+
// Store the original error and reply if needed.
3666+
struct {
3667+
bson_t reply;
3668+
bson_error_t error;
3669+
bool set;
3670+
} original_error = {.reply = {0}, .error = {0}, .set = false};
3671+
3672+
// Ensure `*retry_server_stream` is always valid or null.
3673+
*retry_server_stream = NULL;
3674+
3675+
retry:
3676+
ret = mongoc_cluster_run_command_monitored (cluster, cmd, reply, error);
3677+
3678+
if (is_retryable_write) {
3679+
_mongoc_write_error_handle_labels (ret, error, reply, cmd->server_stream->sd);
3680+
_mongoc_write_error_update_if_unsupported_storage_engine (ret, error, reply);
3681+
}
3682+
3683+
// If a retryable error is encountered and the write is retryable, select a new writable stream and retry. If server
3684+
// selection fails or the selected server does not support retryable writes, fall through and allow the original
3685+
// error to be reported.
3686+
if (can_retry && _mongoc_write_error_get_type (reply) == MONGOC_WRITE_ERR_RETRY) {
3687+
bson_error_t ignored_error;
3688+
3689+
can_retry = false; // Only retry once.
3690+
3691+
// Select a server.
3692+
{
3693+
mongoc_deprioritized_servers_t *const ds = mongoc_deprioritized_servers_new ();
3694+
3695+
// If talking to a sharded cluster, deprioritize the just-used mongos to prefer a new mongos for the retry.
3696+
mongoc_deprioritized_servers_add_if_sharded (ds, cmd->server_stream->topology_type, cmd->server_stream->sd);
3697+
3698+
*retry_server_stream =
3699+
mongoc_cluster_stream_for_writes (cluster, cmd->session, ds, NULL /* reply */, &ignored_error);
3700+
3701+
mongoc_deprioritized_servers_destroy (ds);
3702+
}
3703+
3704+
if (*retry_server_stream) {
3705+
cmd->server_stream = *retry_server_stream; // Non-owning.
3706+
{
3707+
// Store the original error and reply before retry.
3708+
BSON_ASSERT (!original_error.set); // Retry only happens once.
3709+
original_error.set = true;
3710+
bson_copy_to (reply, &original_error.reply);
3711+
if (error) {
3712+
original_error.error = *error;
3713+
}
3714+
}
3715+
bson_destroy (reply);
3716+
GOTO (retry);
3717+
}
3718+
}
3719+
3720+
// If a retry attempt fails with an error labeled NoWritesPerformed, drivers MUST return the original error.
3721+
if (original_error.set && mongoc_error_has_label (reply, "NoWritesPerformed")) {
3722+
if (error) {
3723+
*error = original_error.error;
3724+
}
3725+
bson_destroy (reply);
3726+
bson_copy_to (&original_error.reply, reply);
3727+
}
3728+
3729+
if (original_error.set) {
3730+
bson_destroy (&original_error.reply);
3731+
}
3732+
3733+
RETURN (ret);
3734+
}

src/libmongoc/src/mongoc/mongoc-collection.c

Lines changed: 13 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -3130,13 +3130,11 @@ mongoc_collection_find_and_modify_with_opts (mongoc_collection_t *collection,
31303130
{
31313131
mongoc_cluster_t *cluster;
31323132
mongoc_cmd_parts_t parts;
3133-
bool is_retryable;
31343133
bson_iter_t iter;
31353134
bson_iter_t inner;
31363135
const char *name;
31373136
bson_t ss_reply;
3138-
bson_t reply_local;
3139-
bson_t *reply_ptr;
3137+
bson_t reply_local = BSON_INITIALIZER;
31403138
bool ret = false;
31413139
bson_t command = BSON_INITIALIZER;
31423140
mongoc_server_stream_t *server_stream = NULL;
@@ -3150,23 +3148,26 @@ mongoc_collection_find_and_modify_with_opts (mongoc_collection_t *collection,
31503148
BSON_ASSERT_PARAM (query);
31513149
BSON_ASSERT_PARAM (opts);
31523150

3153-
reply_ptr = reply ? reply : &reply_local;
3151+
if (reply) {
3152+
bson_init (reply);
3153+
} else {
3154+
// Caller did not pass an output `reply`. Use a local `reply` to determine if a server error is retryable.
3155+
reply = &reply_local;
3156+
}
31543157
cluster = &collection->client->cluster;
31553158

31563159
mongoc_cmd_parts_init (&parts, collection->client, collection->db, MONGOC_QUERY_NONE, &command);
31573160
parts.is_read_command = true;
31583161
parts.is_write_command = true;
31593162

3160-
bson_init (reply_ptr);
3161-
31623163
if (!_mongoc_find_and_modify_appended_opts_parse (cluster->client, &opts->extra, &appended_opts, error)) {
31633164
GOTO (done);
31643165
}
31653166

31663167
server_stream = mongoc_cluster_stream_for_writes (cluster, appended_opts.client_session, NULL, &ss_reply, error);
31673168

31683169
if (!server_stream) {
3169-
bson_concat (reply_ptr, &ss_reply);
3170+
bson_concat (reply, &ss_reply);
31703171
bson_destroy (&ss_reply);
31713172
GOTO (done);
31723173
}
@@ -3282,86 +3283,11 @@ mongoc_collection_find_and_modify_with_opts (mongoc_collection_t *collection,
32823283
GOTO (done);
32833284
}
32843285

3285-
is_retryable = parts.is_retryable_write;
3286-
3287-
/* increment the transaction number for the first attempt of each retryable
3288-
* write command */
3289-
if (is_retryable) {
3290-
bson_iter_t txn_number_iter;
3291-
BSON_ASSERT (bson_iter_init_find (&txn_number_iter, parts.assembled.command, "txnNumber"));
3292-
bson_iter_overwrite_int64 (&txn_number_iter, ++parts.assembled.session->server_session->txn_number);
3293-
}
3294-
3295-
// Store the original error and reply if needed.
3296-
struct {
3297-
bson_t reply;
3298-
bson_error_t error;
3299-
bool set;
3300-
} original_error = {.reply = {0}, .error = {0}, .set = false};
3301-
3302-
retry:
3303-
bson_destroy (reply_ptr);
3304-
ret = mongoc_cluster_run_command_monitored (cluster, &parts.assembled, reply_ptr, error);
3305-
3306-
if (parts.is_retryable_write) {
3307-
_mongoc_write_error_handle_labels (ret, error, reply_ptr, server_stream->sd);
3308-
}
3309-
3310-
if (is_retryable) {
3311-
_mongoc_write_error_update_if_unsupported_storage_engine (ret, error, reply_ptr);
3312-
}
3313-
3314-
/* If a retryable error is encountered and the write is retryable, select
3315-
* a new writable stream and retry. If server selection fails or the selected
3316-
* server does not support retryable writes, fall through and allow the
3317-
* original error to be reported. */
3318-
if (is_retryable && _mongoc_write_error_get_type (reply_ptr) == MONGOC_WRITE_ERR_RETRY) {
3319-
bson_error_t ignored_error;
3286+
bson_destroy (reply);
3287+
ret = mongoc_cluster_run_retryable_write (
3288+
cluster, &parts.assembled, parts.is_retryable_write, &retry_server_stream, reply, error);
33203289

3321-
/* each write command may be retried at most once */
3322-
is_retryable = false;
3323-
3324-
{
3325-
mongoc_deprioritized_servers_t *const ds = mongoc_deprioritized_servers_new ();
3326-
3327-
mongoc_deprioritized_servers_add_if_sharded (ds, server_stream->topology_type, server_stream->sd);
3328-
3329-
retry_server_stream =
3330-
mongoc_cluster_stream_for_writes (cluster, parts.assembled.session, ds, NULL /* reply */, &ignored_error);
3331-
3332-
mongoc_deprioritized_servers_destroy (ds);
3333-
}
3334-
3335-
if (retry_server_stream) {
3336-
parts.assembled.server_stream = retry_server_stream;
3337-
{
3338-
// Store the original error and reply before retry.
3339-
BSON_ASSERT (!original_error.set); // Retry only happens once.
3340-
original_error.set = true;
3341-
bson_copy_to (reply_ptr, &original_error.reply);
3342-
if (error) {
3343-
original_error.error = *error;
3344-
}
3345-
}
3346-
GOTO (retry);
3347-
}
3348-
}
3349-
3350-
// If a retry attempt fails with an error labeled NoWritesPerformed,
3351-
// drivers MUST return the original error.
3352-
if (original_error.set && mongoc_error_has_label (reply_ptr, "NoWritesPerformed")) {
3353-
if (error) {
3354-
*error = original_error.error;
3355-
}
3356-
bson_destroy (reply_ptr);
3357-
bson_copy_to (&original_error.reply, reply_ptr);
3358-
}
3359-
3360-
if (original_error.set) {
3361-
bson_destroy (&original_error.reply);
3362-
}
3363-
3364-
if (bson_iter_init_find (&iter, reply_ptr, "writeConcernError") && BSON_ITER_HOLDS_DOCUMENT (&iter)) {
3290+
if (bson_iter_init_find (&iter, reply, "writeConcernError") && BSON_ITER_HOLDS_DOCUMENT (&iter)) {
33653291
const char *errmsg = NULL;
33663292
int32_t code = 0;
33673293

@@ -3388,9 +3314,7 @@ mongoc_collection_find_and_modify_with_opts (mongoc_collection_t *collection,
33883314
}
33893315
mongoc_cmd_parts_cleanup (&parts);
33903316
bson_destroy (&command);
3391-
if (&reply_local == reply_ptr) {
3392-
bson_destroy (&reply_local);
3393-
}
3317+
bson_destroy (&reply_local);
33943318
_mongoc_find_and_modify_appended_opts_cleanup (&appended_opts);
33953319
RETURN (ret);
33963320
}

0 commit comments

Comments
 (0)