Skip to content

Commit 9f31222

Browse files
CDRIVER-4363 Support multiple OP_MSG doc sequences (#1578)
Support sending an OP_MSG with two document sequences (OP_MSG Section with payloadType=1). Intended for upcoming support of the `bulkWrite` command. Drivers are expected to send document sequences for both `nsInfo` and `ops` fields. * remove unnecessary `_mongoc_get_documents_field_name` The payloadType=1 identifier is already available in `mongoc_cmd_t::payload_identifier` * define a `mongoc_cmd_payload_t` struct * define `MONGOC_CMD_PAYLOADS_COUNT_MAX` --------- Co-authored-by: Ezra Chung <[email protected]>
1 parent 460de49 commit 9f31222

File tree

9 files changed

+142
-82
lines changed

9 files changed

+142
-82
lines changed

src/libmongoc/src/mongoc/mongoc-apm.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,8 @@ static bson_oid_t kObjectIdZero = {{0}};
3131
static void
3232
append_documents_from_cmd (const mongoc_cmd_t *cmd, mongoc_apm_command_started_t *event)
3333
{
34-
if (!cmd->payload || !cmd->payload_size) {
34+
// If there are no document sequences (OP_MSG Section with payloadType=1), return the command unchanged.
35+
if (cmd->payloads_count == 0) {
3536
return;
3637
}
3738

src/libmongoc/src/mongoc/mongoc-client-side-encryption.c

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1010,8 +1010,8 @@ append_bson_range_opts (bson_t *bson_range_opts, const mongoc_client_encryption_
10101010
static void
10111011
_prep_for_auto_encryption (const mongoc_cmd_t *cmd, bson_t *out)
10121012
{
1013-
/* If there is no type=1 payload, return the command unchanged. */
1014-
if (!cmd->payload || !cmd->payload_size) {
1013+
// If there are no document sequences (OP_MSG Section with payloadType=1), return the command unchanged.
1014+
if (cmd->payloads_count == 0) {
10151015
BSON_ASSERT (bson_init_static (out, bson_get_data (cmd->command), cmd->command->len));
10161016
return;
10171017
}
@@ -1226,10 +1226,9 @@ _mongoc_cse_auto_encrypt (mongoc_client_t *client_encrypted,
12261226

12271227
/* Create the modified cmd_t. */
12281228
memcpy (encrypted_cmd, cmd, sizeof (mongoc_cmd_t));
1229-
/* Modify the mongoc_cmd_t and clear the payload, since
1230-
* _mongoc_cse_auto_encrypt converted the payload into an embedded array. */
1231-
encrypted_cmd->payload = NULL;
1232-
encrypted_cmd->payload_size = 0;
1229+
/* Modify the mongoc_cmd_t and clear the payloads, since
1230+
* _mongoc_cse_auto_encrypt converted the payloads into an embedded array. */
1231+
encrypted_cmd->payloads_count = 0;
12331232
encrypted_cmd->command = encrypted;
12341233

12351234
ret = true;

src/libmongoc/src/mongoc/mongoc-cluster.c

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3227,24 +3227,28 @@ _mongoc_cluster_run_opmsg_send (
32273227
message_length += mcd_rpc_header_set_response_to (rpc, 0);
32283228
message_length += mcd_rpc_header_set_op_code (rpc, MONGOC_OP_CODE_MSG);
32293229

3230-
mcd_rpc_op_msg_set_sections_count (rpc, cmd->payload ? 2u : 1u);
3230+
BSON_ASSERT (cmd->payloads_count <= MONGOC_CMD_PAYLOADS_COUNT_MAX);
3231+
// Reserve one section for the body (kind 0) and any needed sections for document sequences (kind 1)
3232+
mcd_rpc_op_msg_set_sections_count (rpc, 1u + cmd->payloads_count);
32313233

32323234
message_length += mcd_rpc_op_msg_set_flag_bits (rpc, flags);
32333235
message_length += mcd_rpc_op_msg_section_set_kind (rpc, 0u, 0);
32343236
message_length += mcd_rpc_op_msg_section_set_body (rpc, 0u, bson_get_data (cmd->command));
32353237

3236-
if (cmd->payload) {
3237-
BSON_ASSERT (bson_in_range_signed (size_t, cmd->payload_size));
3238+
for (size_t i = 0; i < cmd->payloads_count; i++) {
3239+
const mongoc_cmd_payload_t payload = cmd->payloads[i];
32383240

3239-
const size_t section_length =
3240-
sizeof (int32_t) + strlen (cmd->payload_identifier) + 1u + (size_t) cmd->payload_size;
3241+
BSON_ASSERT (bson_in_range_signed (size_t, payload.size));
3242+
3243+
const size_t section_length = sizeof (int32_t) + strlen (payload.identifier) + 1u + (size_t) payload.size;
32413244
BSON_ASSERT (bson_in_range_unsigned (int32_t, section_length));
32423245

3243-
message_length += mcd_rpc_op_msg_section_set_kind (rpc, 1u, 1);
3244-
message_length += mcd_rpc_op_msg_section_set_length (rpc, 1u, (int32_t) section_length);
3245-
message_length += mcd_rpc_op_msg_section_set_identifier (rpc, 1u, cmd->payload_identifier);
3246+
size_t section_idx = 1u + i;
3247+
message_length += mcd_rpc_op_msg_section_set_kind (rpc, section_idx, 1);
3248+
message_length += mcd_rpc_op_msg_section_set_length (rpc, section_idx, (int32_t) section_length);
3249+
message_length += mcd_rpc_op_msg_section_set_identifier (rpc, section_idx, payload.identifier);
32463250
message_length +=
3247-
mcd_rpc_op_msg_section_set_document_sequence (rpc, 1u, cmd->payload, (size_t) cmd->payload_size);
3251+
mcd_rpc_op_msg_section_set_document_sequence (rpc, section_idx, payload.documents, (size_t) payload.size);
32483252
}
32493253

32503254
mcd_rpc_message_set_length (rpc, message_length);

src/libmongoc/src/mongoc/mongoc-cmd-private.h

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,14 +51,24 @@ typedef enum {
5151
MONGOC_CMD_PARTS_ALLOW_TXN_NUMBER_NO
5252
} mongoc_cmd_parts_allow_txn_number_t;
5353

54+
// `mongoc_cmd_payload_t` represents a document sequence (OP_MSG Section with payloadType=1).
55+
typedef struct {
56+
int32_t size;
57+
const char *identifier;
58+
const uint8_t *documents;
59+
} mongoc_cmd_payload_t;
60+
61+
// OP_MSG supports any number of document sequences. Increase array size to support more document sequences.
62+
#define MONGOC_CMD_PAYLOADS_COUNT_MAX 2
63+
5464
typedef struct _mongoc_cmd_t {
5565
const char *db_name;
5666
mongoc_query_flags_t query_flags;
5767
const bson_t *command;
5868
const char *command_name;
59-
const uint8_t *payload;
60-
int32_t payload_size;
61-
const char *payload_identifier;
69+
size_t payloads_count;
70+
// `payloads[i]` may be read only when `0 <= i < payloads_count`.
71+
mongoc_cmd_payload_t payloads[MONGOC_CMD_PAYLOADS_COUNT_MAX];
6272
mongoc_server_stream_t *server_stream;
6373
int64_t operation_id;
6474
mongoc_client_session_t *session;

src/libmongoc/src/mongoc/mongoc-cmd.c

Lines changed: 24 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,7 @@ mongoc_cmd_parts_init (mongoc_cmd_parts_t *parts,
5555
parts->assembled.command = NULL;
5656
parts->assembled.query_flags = MONGOC_QUERY_NONE;
5757
parts->assembled.op_msg_is_exhaust = false;
58-
parts->assembled.payload_identifier = NULL;
59-
parts->assembled.payload = NULL;
58+
parts->assembled.payloads_count = 0;
6059
parts->assembled.session = NULL;
6160
parts->assembled.is_acknowledged = true;
6261
parts->assembled.is_txn_finish = false;
@@ -960,27 +959,11 @@ mongoc_cmd_is_compressible (mongoc_cmd_t *cmd)
960959
!!strcasecmp (cmd->command_name, "createuser") && !!strcasecmp (cmd->command_name, "updateuser");
961960
}
962961

963-
/*--------------------------------------------------------------------------
964-
*
965-
* _mongoc_cmd_append_payload_as_array --
966-
* Append a write command payload as an array in a BSON document.
967-
* Used by APM and Client-Side Encryption
968-
*
969-
* Arguments:
970-
* cmd The mongoc_cmd_t, which may contain a payload to be appended.
971-
* out A bson_t, which will be appended to if @cmd->payload is set.
972-
*
973-
* Pre-conditions:
974-
* - @out is initialized.
975-
* - cmd has a payload (i.e. is a write command).
976-
*
977-
* Post-conditions:
978-
* - If @cmd->payload is set, then @out is appended to with the payload
979-
* field's name ("documents" if insert, "updates" if update,
980-
* "deletes" if delete) an the payload as a BSON array.
981-
*
982-
*--------------------------------------------------------------------------
983-
*/
962+
963+
//`_mongoc_cmd_append_payload_as_array` appends document seqence payloads as BSON arrays.
964+
// `cmd` must contain one or more document sequence payloads (`cmd->payloads_count` > 0).
965+
// `out` must be initialized by the caller.
966+
// Used by APM and In-Use Encryption (document sequences are not supported for auto encryption).
984967
void
985968
_mongoc_cmd_append_payload_as_array (const mongoc_cmd_t *cmd, bson_t *out)
986969
{
@@ -990,25 +973,28 @@ _mongoc_cmd_append_payload_as_array (const mongoc_cmd_t *cmd, bson_t *out)
990973
const char *field_name;
991974
bson_array_builder_t *bson;
992975

993-
BSON_ASSERT (cmd->payload && cmd->payload_size);
976+
BSON_ASSERT (cmd->payloads_count > 0);
977+
BSON_ASSERT (cmd->payloads_count <= MONGOC_CMD_PAYLOADS_COUNT_MAX);
994978

995-
/* make array from outgoing OP_MSG payload type 1 on an "insert",
996-
* "update", or "delete" command. */
997-
field_name = _mongoc_get_documents_field_name (cmd->command_name);
998-
BSON_ASSERT (field_name);
999-
BSON_ASSERT (BSON_APPEND_ARRAY_BUILDER_BEGIN (out, field_name, &bson));
979+
for (size_t i = 0; i < cmd->payloads_count; i++) {
980+
BSON_ASSERT (cmd->payloads[i].documents && cmd->payloads[i].size);
1000981

1001-
pos = cmd->payload;
1002-
while (pos < cmd->payload + cmd->payload_size) {
1003-
memcpy (&doc_len, pos, sizeof (doc_len));
1004-
doc_len = BSON_UINT32_FROM_LE (doc_len);
1005-
BSON_ASSERT (bson_init_static (&doc, pos, (size_t) doc_len));
1006-
bson_array_builder_append_document (bson, &doc);
982+
// Create a BSON array from a document sequence (OP_MSG Section with payloadType=1).
983+
field_name = cmd->payloads[i].identifier;
984+
BSON_ASSERT (field_name);
985+
BSON_ASSERT (BSON_APPEND_ARRAY_BUILDER_BEGIN (out, field_name, &bson));
1007986

1008-
pos += doc_len;
1009-
}
987+
pos = cmd->payloads[i].documents;
988+
while (pos < cmd->payloads[i].documents + cmd->payloads[i].size) {
989+
memcpy (&doc_len, pos, sizeof (doc_len));
990+
doc_len = BSON_UINT32_FROM_LE (doc_len);
991+
BSON_ASSERT (bson_init_static (&doc, pos, (size_t) doc_len));
992+
bson_array_builder_append_document (bson, &doc);
1010993

1011-
bson_append_array_builder_end (out, bson);
994+
pos += doc_len;
995+
}
996+
bson_append_array_builder_end (out, bson);
997+
}
1012998
}
1013999

10141000
/*--------------------------------------------------------------------------

src/libmongoc/src/mongoc/mongoc-util-private.h

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -82,9 +82,6 @@ _mongoc_get_real_time_ms (void);
8282
const char *
8383
_mongoc_get_command_name (const bson_t *command);
8484

85-
const char *
86-
_mongoc_get_documents_field_name (const char *command_name);
87-
8885
bool
8986
_mongoc_lookup_bool (const bson_t *bson, const char *key, bool default_value);
9087

src/libmongoc/src/mongoc/mongoc-util.c

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -163,25 +163,6 @@ _mongoc_get_command_name (const bson_t *command)
163163
return name;
164164
}
165165

166-
167-
const char *
168-
_mongoc_get_documents_field_name (const char *command_name)
169-
{
170-
if (!strcmp (command_name, "insert")) {
171-
return "documents";
172-
}
173-
174-
if (!strcmp (command_name, "update")) {
175-
return "updates";
176-
}
177-
178-
if (!strcmp (command_name, "delete")) {
179-
return "deletes";
180-
}
181-
182-
return NULL;
183-
}
184-
185166
bool
186167
_mongoc_lookup_bool (const bson_t *bson, const char *key, bool default_value)
187168
{

src/libmongoc/src/mongoc/mongoc-write-command.c

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -664,11 +664,13 @@ _mongoc_write_opmsg (mongoc_write_command_t *command,
664664
}
665665

666666
if (ship_it) {
667+
parts.assembled.payloads_count = 1;
668+
mongoc_cmd_payload_t *const payload = &parts.assembled.payloads[0];
667669
/* Seek past the document offset we have already sent */
668-
parts.assembled.payload = command->payload.data + payload_total_offset;
670+
payload->documents = command->payload.data + payload_total_offset;
669671
/* Only send the documents up to this size */
670-
parts.assembled.payload_size = payload_batch_size;
671-
parts.assembled.payload_identifier = gCommandFields[command->type];
672+
payload->size = payload_batch_size;
673+
payload->identifier = gCommandFields[command->type];
672674

673675

674676
mongoc_server_stream_t *new_retry_server_stream = NULL;

src/libmongoc/tests/test-mongoc-cmd.c

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
#include "mock_server/mock-server.h"
2424
#include "mock_server/future-functions.h"
2525
#include "test-libmongoc.h"
26+
#include "mongoc-cluster-private.h"
2627

2728
#undef MONGOC_LOG_DOMAIN
2829
#define MONGOC_LOG_DOMAIN "cmd-test-options"
@@ -68,9 +69,88 @@ test_client_cmd_options (void)
6869
mock_server_destroy (server);
6970
}
7071

72+
static void
73+
capture_last_command (const mongoc_apm_command_started_t *event)
74+
{
75+
bson_t *last_captured = mongoc_apm_command_started_get_context (event);
76+
bson_destroy (last_captured);
77+
const bson_t *cmd = mongoc_apm_command_started_get_command (event);
78+
bson_copy_to (cmd, last_captured);
79+
}
80+
81+
// `test_cmd_with_two_payload1` tests sending an OP_MSG with two document sequence payloads (payloadType=1).
82+
static void
83+
test_cmd_with_two_payload1 (void *ctx)
84+
{
85+
BSON_UNUSED (ctx);
86+
mongoc_client_t *client = test_framework_new_default_client ();
87+
88+
bson_t last_captured = BSON_INITIALIZER;
89+
// Set callback to capture the last command.
90+
{
91+
mongoc_apm_callbacks_t *cbs = mongoc_apm_callbacks_new ();
92+
mongoc_apm_set_command_started_cb (cbs, capture_last_command);
93+
mongoc_client_set_apm_callbacks (client, cbs, &last_captured);
94+
mongoc_apm_callbacks_destroy (cbs);
95+
}
96+
97+
mongoc_cluster_t *cluster = &client->cluster;
98+
bson_error_t error;
99+
100+
// Use `bulkWrite`. Currently, only the `bulkWrite` command supports two document sequence payloads.
101+
bson_t *payload0 = tmp_bson (BSON_STR ({"bulkWrite" : 1}));
102+
bson_t *op = tmp_bson (BSON_STR ({"insert" : 0, "document" : {}}));
103+
bson_t *nsInfo = tmp_bson (BSON_STR ({"ns" : "db.coll"}));
104+
105+
// Create the `mongoc_cmd_t`.
106+
mongoc_cmd_parts_t parts;
107+
mongoc_cmd_parts_init (&parts, client, "admin", MONGOC_QUERY_NONE, payload0);
108+
mongoc_server_stream_t *server_stream = mongoc_cluster_stream_for_writes (
109+
cluster, NULL /* session */, NULL /* deprioritized servers */, NULL /* reply */, &error);
110+
ASSERT_OR_PRINT (server_stream, error);
111+
bool ok = mongoc_cmd_parts_assemble (&parts, server_stream, &error);
112+
ASSERT_OR_PRINT (ok, error);
113+
114+
parts.assembled.payloads_count = 2;
115+
116+
// Set `ops` as a payload1 (of one document)
117+
parts.assembled.payloads[0].identifier = "ops";
118+
parts.assembled.payloads[0].documents = bson_get_data (op);
119+
parts.assembled.payloads[0].size = op->len;
120+
121+
// Set `nsInfo` as a payload1 (of one document)
122+
parts.assembled.payloads[1].identifier = "nsInfo";
123+
parts.assembled.payloads[1].documents = bson_get_data (nsInfo);
124+
parts.assembled.payloads[1].size = nsInfo->len;
125+
126+
// Run the command.
127+
bson_t reply;
128+
ok = mongoc_cluster_run_command_monitored (cluster, &parts.assembled, &reply, &error);
129+
ASSERT_OR_PRINT (ok, error);
130+
ASSERT_MATCH (&reply, BSON_STR ({"ok" : 1}));
131+
132+
// Check that document sequences are converted to a BSON arrays for command monitoring.
133+
ASSERT_MATCH (
134+
&last_captured,
135+
BSON_STR ({"bulkWrite" : 1, "ops" : [ {"insert" : 0, "document" : {}} ], "nsInfo" : [ {"ns" : "db.coll"} ]}));
136+
137+
bson_destroy (&reply);
138+
mongoc_server_stream_cleanup (server_stream);
139+
mongoc_cmd_parts_cleanup (&parts);
140+
mongoc_client_destroy (client);
141+
// Destroy `last_captured` after `client`. `mongoc_client_destroy` sends an `endSessions` command.
142+
bson_destroy (&last_captured);
143+
}
71144

72145
void
73146
test_client_cmd_install (TestSuite *suite)
74147
{
75148
TestSuite_AddMockServerTest (suite, "/Client/cmd/options", test_client_cmd_options);
149+
TestSuite_AddFull (suite,
150+
"/cmd/with_two_payload1",
151+
test_cmd_with_two_payload1,
152+
NULL /* dtor */,
153+
NULL /* ctx */,
154+
test_framework_skip_if_max_wire_version_less_than_25 // require server 8.0 for `bulkWrite`
155+
);
76156
}

0 commit comments

Comments
 (0)