Skip to content

Commit c339080

Browse files
committed
CDRIVER-3305 stop unordered bulk writes only on invalid server stream
1 parent c899fc3 commit c339080

File tree

2 files changed

+83
-4
lines changed

2 files changed

+83
-4
lines changed

src/libmongoc/src/mongoc/mongoc-write-command.c

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -625,10 +625,12 @@ _mongoc_write_opmsg (mongoc_write_command_t *command,
625625

626626
if (!ret) {
627627
result->failed = true;
628-
/* Conservatively set must_stop to true. Per CDRIVER-3305 we
629-
* shouldn't stop for unordered bulk writes, but also need to check
630-
* if the server stream was invalidated per CDRIVER-3306. */
631-
result->must_stop = true;
628+
/* Stop for ordered bulk writes or when the server stream has been
629+
* properly invalidated (e.g., due to a network error). */
630+
if (command->flags.ordered || !mongoc_cluster_stream_valid (
631+
&client->cluster, server_stream)) {
632+
result->must_stop = true;
633+
}
632634
}
633635

634636
/* Result merge needs to know the absolute index for a document

src/libmongoc/tests/test-mongoc-bulk.c

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2945,6 +2945,79 @@ test_write_concern_write_command_unordered_multi_err (void)
29452945
}
29462946

29472947

2948+
/* Test for CDRIVER-3305 - Continue unordered bulk writes on error */
2949+
static void
2950+
test_unordered_bulk_writes_with_error (void)
2951+
{
2952+
mock_server_t *server;
2953+
mongoc_uri_t *uri;
2954+
mongoc_client_t *client;
2955+
mongoc_collection_t *collection;
2956+
mongoc_bulk_operation_t *bulk;
2957+
uint32_t i;
2958+
bson_error_t error;
2959+
future_t *future;
2960+
request_t *request;
2961+
bson_t reply;
2962+
2963+
server = mock_server_new ();
2964+
mock_server_run (server);
2965+
2966+
/* server is "recovering": not master, not secondary */
2967+
mock_server_auto_ismaster (server,
2968+
"{'ok': 1,"
2969+
" 'maxWireVersion': %d,"
2970+
" 'maxWriteBatchSize': 1,"
2971+
" 'ismaster': true,"
2972+
" 'secondary': false,"
2973+
" 'setName': 'rs',"
2974+
" 'hosts': ['%s']}",
2975+
WIRE_VERSION_OP_MSG,
2976+
mock_server_get_host_and_port (server));
2977+
2978+
uri = mongoc_uri_copy (mock_server_get_uri (server));
2979+
/* disable retryable writes, so we move to the next operation on error */
2980+
mongoc_uri_set_option_as_bool (uri, MONGOC_URI_RETRYWRITES, false);
2981+
2982+
client = mongoc_client_new_from_uri (uri);
2983+
2984+
collection = mongoc_client_get_collection (client, "db", "test");
2985+
/* use an unordered bulk write; we expect to continue on error */
2986+
bulk = mongoc_collection_create_bulk_operation_with_opts (
2987+
collection, tmp_bson("{'ordered': false}"));
2988+
/* maxWriteBatchSize is set to 1; with 2 inserts we get a batch split */
2989+
for (i = 0; i < 2; i++) {
2990+
mongoc_bulk_operation_insert_with_opts (
2991+
bulk, tmp_bson("{'_id': %d}", i), NULL, &error);
2992+
}
2993+
future = future_bulk_operation_execute (bulk, &reply, &error);
2994+
2995+
request = mock_server_receives_request (server);
2996+
BSON_ASSERT (request);
2997+
mock_server_replies_simple (
2998+
request, "{ 'errmsg': 'random error', 'ok': 0 }");
2999+
request_destroy (request);
3000+
/* should receive a second request */
3001+
request = mock_server_receives_request (server);
3002+
/* a failure of this assertion means that the client did not continue with
3003+
* the next write operation; it stopped permaturely */
3004+
BSON_ASSERT (request);
3005+
mock_server_replies_simple (
3006+
request, "{ 'errmsg': 'random error', 'ok': 0 }");
3007+
request_destroy (request);
3008+
ASSERT (future_wait (future));
3009+
3010+
mongoc_client_destroy (client);
3011+
mongoc_collection_destroy (collection);
3012+
mongoc_bulk_operation_destroy (bulk);
3013+
bson_destroy (&reply);
3014+
future_destroy (future);
3015+
mongoc_uri_destroy (uri);
3016+
mock_server_destroy (server);
3017+
3018+
}
3019+
3020+
29483021
static void
29493022
_test_write_concern_err_api (int32_t error_api_version)
29503023
{
@@ -4899,6 +4972,10 @@ test_bulk_install (TestSuite *suite)
48994972
suite,
49004973
"/BulkOperation/write_concern/write_command/unordered/multi_err",
49014974
test_write_concern_write_command_unordered_multi_err);
4975+
TestSuite_AddMockServerTest (
4976+
suite,
4977+
"/BulkOperation/writes/unordered/error",
4978+
test_unordered_bulk_writes_with_error);
49024979
TestSuite_AddMockServerTest (
49034980
suite,
49044981
"/BulkOperation/write_concern/error/write_command/v1",

0 commit comments

Comments
 (0)