@@ -451,12 +451,11 @@ mongoc_cluster_run_command_internal (mongoc_cluster_t *cluster,
451
451
}
452
452
doc_len = (size_t ) msg_len - reply_header_size ;
453
453
454
- _mongoc_rpc_swab_from_le (& rpc );
455
- if (rpc .header .opcode == MONGOC_OPCODE_COMPRESSED ) {
454
+ if (BSON_UINT32_FROM_LE (rpc .header .opcode ) == MONGOC_OPCODE_COMPRESSED ) {
456
455
bson_t tmp = BSON_INITIALIZER ;
457
456
uint8_t * buf = NULL ;
458
- size_t len =
459
- rpc . compressed . uncompressed_size + sizeof (mongoc_rpc_header_t );
457
+ size_t len = BSON_UINT32_FROM_LE ( rpc . compressed . uncompressed_size ) +
458
+ sizeof (mongoc_rpc_header_t );
460
459
461
460
reply_buf = bson_malloc0 (msg_len );
462
461
memcpy (reply_buf , reply_header_buf , reply_header_size );
@@ -474,7 +473,6 @@ mongoc_cluster_run_command_internal (mongoc_cluster_t *cluster,
474
473
if (!_mongoc_rpc_scatter (& rpc , reply_buf , msg_len )) {
475
474
GOTO (done );
476
475
}
477
- _mongoc_cluster_inc_ingress_rpc (& rpc );
478
476
479
477
buf = bson_malloc0 (len );
480
478
if (!_mongoc_rpc_decompress (& rpc , buf , len )) {
@@ -486,12 +484,15 @@ mongoc_cluster_run_command_internal (mongoc_cluster_t *cluster,
486
484
GOTO (done );
487
485
}
488
486
487
+ _mongoc_rpc_swab_from_le (& rpc );
488
+ _mongoc_cluster_inc_ingress_rpc (& rpc );
489
+
489
490
_mongoc_rpc_get_first_document (& rpc , & tmp );
490
491
bson_copy_to (& tmp , reply_ptr );
491
492
bson_free (reply_buf );
492
493
bson_free (buf );
493
- } else if (rpc .header .opcode == MONGOC_OPCODE_REPLY &&
494
- rpc .reply_header .n_returned == 1 ) {
494
+ } else if (BSON_UINT32_FROM_LE ( rpc .header .opcode ) == MONGOC_OPCODE_REPLY &&
495
+ BSON_UINT32_FROM_LE ( rpc .reply_header .n_returned ) == 1 ) {
495
496
reply_buf = bson_reserve_buffer (reply_ptr , (uint32_t ) doc_len );
496
497
BSON_ASSERT (reply_buf );
497
498
@@ -505,6 +506,7 @@ mongoc_cluster_run_command_internal (mongoc_cluster_t *cluster,
505
506
"socket error or timeout" );
506
507
GOTO (done );
507
508
}
509
+ _mongoc_rpc_swab_from_le (& rpc );
508
510
} else {
509
511
GOTO (done );
510
512
}
@@ -1981,8 +1983,8 @@ _mongoc_cluster_stream_for_optype (mongoc_cluster_t *cluster,
1981
1983
1982
1984
if (!mongoc_cluster_check_interval (cluster , server_id )) {
1983
1985
/* Server Selection Spec: try once more */
1984
- server_id = mongoc_topology_select_server_id (
1985
- topology , optype , read_prefs , error );
1986
+ server_id =
1987
+ mongoc_topology_select_server_id ( topology , optype , read_prefs , error );
1986
1988
1987
1989
if (!server_id ) {
1988
1990
RETURN (NULL );
@@ -2332,6 +2334,7 @@ mongoc_cluster_sendv_to_server (mongoc_cluster_t *cluster,
2332
2334
need_gle = _mongoc_rpc_needs_gle (rpc , write_concern );
2333
2335
_mongoc_cluster_inc_egress_rpc (rpc );
2334
2336
_mongoc_rpc_gather (rpc , & cluster -> iov );
2337
+ _mongoc_rpc_swab_to_le (rpc );
2335
2338
2336
2339
#ifdef MONGOC_ENABLE_COMPRESSION
2337
2340
if (compressor_id ) {
@@ -2344,13 +2347,13 @@ mongoc_cluster_sendv_to_server (mongoc_cluster_t *cluster,
2344
2347
2345
2348
max_msg_size = mongoc_server_stream_max_msg_size (server_stream );
2346
2349
2347
- if (rpc -> header .msg_len > max_msg_size ) {
2350
+ if (BSON_UINT32_FROM_LE ( rpc -> header .msg_len ) > max_msg_size ) {
2348
2351
bson_set_error (error ,
2349
2352
MONGOC_ERROR_CLIENT ,
2350
2353
MONGOC_ERROR_CLIENT_TOO_BIG ,
2351
2354
"Attempted to send an RPC larger than the "
2352
2355
"max allowed message size. Was %u, allowed %u." ,
2353
- rpc -> header .msg_len ,
2356
+ BSON_UINT32_FROM_LE ( rpc -> header .msg_len ) ,
2354
2357
max_msg_size );
2355
2358
GOTO (done );
2356
2359
}
@@ -2362,7 +2365,7 @@ mongoc_cluster_sendv_to_server (mongoc_cluster_t *cluster,
2362
2365
gle .header .opcode = MONGOC_OPCODE_QUERY ;
2363
2366
gle .query .flags = MONGOC_QUERY_NONE ;
2364
2367
2365
- switch (rpc -> header .opcode ) {
2368
+ switch (BSON_UINT32_FROM_LE ( rpc -> header .opcode ) ) {
2366
2369
case MONGOC_OPCODE_INSERT :
2367
2370
DB_AND_CMD_FROM_COLLECTION (cmdname , rpc -> insert .collection );
2368
2371
break ;
@@ -2389,8 +2392,6 @@ mongoc_cluster_sendv_to_server (mongoc_cluster_t *cluster,
2389
2392
_mongoc_rpc_swab_to_le (& gle );
2390
2393
}
2391
2394
2392
- _mongoc_rpc_swab_to_le (rpc );
2393
-
2394
2395
if (!_mongoc_stream_writev_full (server_stream -> stream ,
2395
2396
cluster -> iov .data ,
2396
2397
cluster -> iov .len ,
@@ -2523,6 +2524,24 @@ mongoc_cluster_try_recv (mongoc_cluster_t *cluster,
2523
2524
RETURN (false);
2524
2525
}
2525
2526
2527
+ if (BSON_UINT32_FROM_LE (rpc -> header .opcode ) == MONGOC_OPCODE_COMPRESSED ) {
2528
+ uint8_t * buf = NULL ;
2529
+ size_t len = BSON_UINT32_FROM_LE (rpc -> compressed .uncompressed_size ) +
2530
+ sizeof (mongoc_rpc_header_t );
2531
+
2532
+ buf = bson_malloc0 (len );
2533
+ if (!_mongoc_rpc_decompress (rpc , buf , len )) {
2534
+ bson_free (buf );
2535
+ bson_set_error (error ,
2536
+ MONGOC_ERROR_PROTOCOL ,
2537
+ MONGOC_ERROR_PROTOCOL_INVALID_REPLY ,
2538
+ "Could not decompress server reply" );
2539
+ RETURN (false);
2540
+ }
2541
+
2542
+ _mongoc_buffer_destroy (buffer );
2543
+ _mongoc_buffer_init (buffer , buf , len , NULL , NULL );
2544
+ }
2526
2545
_mongoc_rpc_swab_from_le (rpc );
2527
2546
2528
2547
_mongoc_cluster_inc_ingress_rpc (rpc );
0 commit comments