|
44 | 44 | #include "mongo/s/catalog/type_chunk.h"
|
45 | 45 | #include "mongo/s/catalog/type_collection.h"
|
46 | 46 | #include "mongo/s/catalog/type_config_version.h"
|
| 47 | +#include "mongo/s/catalog/type_database.h" |
47 | 48 | #include "mongo/s/catalog/type_lockpings.h"
|
48 | 49 | #include "mongo/s/catalog/type_locks.h"
|
49 | 50 | #include "mongo/s/catalog/type_shard.h"
|
50 | 51 | #include "mongo/s/catalog/type_tags.h"
|
51 | 52 | #include "mongo/s/client/shard_registry.h"
|
| 53 | +#include "mongo/s/database_version_gen.h" |
52 | 54 | #include "mongo/s/grid.h"
|
53 | 55 | #include "mongo/s/write_ops/batched_command_request.h"
|
54 | 56 | #include "mongo/s/write_ops/batched_command_response.h"
|
@@ -405,171 +407,134 @@ Lock::ExclusiveLock ShardingCatalogManager::lockZoneMutex(OperationContext* opCt
|
405 | 407 | return lk;
|
406 | 408 | }
|
407 | 409 |
|
408 |
| -void ShardingCatalogManager::upgradeChunksAndTags(OperationContext* opCtx) { |
409 |
| - // Upgrade each chunk document by deleting and re-inserting with the 4.4 _id format. |
410 |
| - { |
411 |
| - Lock::ExclusiveLock lk(opCtx->lockState(), _kChunkOpLock); |
412 |
| - |
413 |
| - auto const configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard(); |
414 |
| - auto findResponse = uassertStatusOK( |
415 |
| - configShard->exhaustiveFindOnConfig(opCtx, |
416 |
| - ReadPreferenceSetting{ReadPreference::PrimaryOnly}, |
417 |
| - repl::ReadConcernLevel::kLocalReadConcern, |
418 |
| - ChunkType::ConfigNS, |
419 |
| - {}, |
420 |
| - {}, |
421 |
| - boost::none /* limit */)); |
422 |
| - |
423 |
| - AlternativeSessionRegion asr(opCtx); |
424 |
| - AuthorizationSession::get(asr.opCtx()->getClient()) |
425 |
| - ->grantInternalAuthorization(asr.opCtx()->getClient()); |
426 |
| - TxnNumber txnNumber = 0; |
427 |
| - for (const auto& chunkObj : findResponse.docs) { |
428 |
| - auto chunk = uassertStatusOK(ChunkType::fromConfigBSON(chunkObj)); |
429 |
| - |
430 |
| - removeDocumentsInLocalTxn( |
431 |
| - asr.opCtx(), |
432 |
| - ChunkType::ConfigNS, |
433 |
| - BSON(ChunkType::ns(chunk.getNS().ns()) << ChunkType::min(chunk.getMin())), |
434 |
| - true /* startTransaction */, |
435 |
| - txnNumber); |
436 |
| - |
437 |
| - // Note that ChunkType::toConfigBSON() will not include an _id if one hasn't been set, |
438 |
| - // which will be the case for chunks written in the 4.2 format because parsing ignores |
439 |
| - // _ids in the 4.2 format, so the insert path will generate one for us. |
440 |
| - insertDocumentsInLocalTxn(asr.opCtx(), |
441 |
| - ChunkType::ConfigNS, |
442 |
| - {chunk.toConfigBSON()}, |
443 |
| - false /* startTransaction */, |
444 |
| - txnNumber); |
445 |
| - |
446 |
| - commitLocalTxn(asr.opCtx(), txnNumber); |
| 410 | +// TODO SERVER-44034: Remove this function. |
| 411 | +void deleteAndInsertChunk(OperationContext* opCtx, |
| 412 | + const BSONObj& chunkDoc, |
| 413 | + bool startTransaction, |
| 414 | + TxnNumber txnNumber, |
| 415 | + ShardingCatalogManager::ConfigUpgradeType upgradeType) { |
| 416 | + auto chunk = uassertStatusOK(ChunkType::fromConfigBSON(chunkDoc)); |
447 | 417 |
|
448 |
| - txnNumber += 1; |
449 |
| - } |
450 |
| - } |
| 418 | + removeDocumentsInLocalTxn( |
| 419 | + opCtx, |
| 420 | + ChunkType::ConfigNS, |
| 421 | + BSON(ChunkType::ns(chunk.getNS().ns()) << ChunkType::min(chunk.getMin())), |
| 422 | + startTransaction, |
| 423 | + txnNumber); |
451 | 424 |
|
452 |
| - // Upgrade each tag document by deleting and re-inserting with the 4.4 _id format. |
453 |
| - { |
454 |
| - Lock::ExclusiveLock lk(opCtx->lockState(), _kZoneOpLock); |
455 |
| - |
456 |
| - auto const configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard(); |
457 |
| - auto findResponse = uassertStatusOK( |
458 |
| - configShard->exhaustiveFindOnConfig(opCtx, |
459 |
| - ReadPreferenceSetting{ReadPreference::PrimaryOnly}, |
460 |
| - repl::ReadConcernLevel::kLocalReadConcern, |
461 |
| - TagsType::ConfigNS, |
462 |
| - {}, |
463 |
| - {}, |
464 |
| - boost::none /* limit */)); |
465 |
| - |
466 |
| - AlternativeSessionRegion asr(opCtx); |
467 |
| - AuthorizationSession::get(asr.opCtx()->getClient()) |
468 |
| - ->grantInternalAuthorization(asr.opCtx()->getClient()); |
469 |
| - TxnNumber txnNumber = 0; |
470 |
| - for (const auto& tagObj : findResponse.docs) { |
471 |
| - auto tag = uassertStatusOK(TagsType::fromBSON(tagObj)); |
472 |
| - |
473 |
| - removeDocumentsInLocalTxn( |
474 |
| - asr.opCtx(), |
475 |
| - TagsType::ConfigNS, |
476 |
| - BSON(TagsType::ns(tag.getNS().ns()) << TagsType::min(tag.getMinKey())), |
477 |
| - true /* startTransaction */, |
478 |
| - txnNumber); |
479 |
| - |
480 |
| - // Note that TagsType::toBSON() will not include an _id, so the insert path will |
481 |
| - // generate one for us. |
482 |
| - insertDocumentsInLocalTxn(asr.opCtx(), |
483 |
| - TagsType::ConfigNS, |
484 |
| - {tag.toBSON()}, |
485 |
| - false /* startTransaction */, |
486 |
| - txnNumber); |
| 425 | + insertDocumentsInLocalTxn( |
| 426 | + opCtx, |
| 427 | + ChunkType::ConfigNS, |
| 428 | + {upgradeType == ShardingCatalogManager::ConfigUpgradeType::kUpgrade |
| 429 | + // Note that ChunkType::toConfigBSON() will not include an _id if one hasn't been set, |
| 430 | + // which will be the case for chunks written in the 4.2 format because parsing ignores |
| 431 | + // _ids in the 4.2 format, so the insert path will generate one for us. |
| 432 | + ? chunk.toConfigBSON() |
| 433 | + : chunk.toConfigBSONLegacyID()}, |
| 434 | + false /* startTransaction */, |
| 435 | + txnNumber); |
| 436 | +} |
487 | 437 |
|
488 |
| - commitLocalTxn(asr.opCtx(), txnNumber); |
| 438 | +// TODO SERVER-44034: Remove this function. |
| 439 | +void deleteAndInsertTag(OperationContext* opCtx, |
| 440 | + const BSONObj& tagDoc, |
| 441 | + bool startTransaction, |
| 442 | + TxnNumber txnNumber, |
| 443 | + ShardingCatalogManager::ConfigUpgradeType upgradeType) { |
| 444 | + auto tag = uassertStatusOK(TagsType::fromBSON(tagDoc)); |
489 | 445 |
|
490 |
| - txnNumber += 1; |
491 |
| - } |
492 |
| - } |
| 446 | + removeDocumentsInLocalTxn( |
| 447 | + opCtx, |
| 448 | + TagsType::ConfigNS, |
| 449 | + BSON(TagsType::ns(tag.getNS().ns()) << TagsType::min(tag.getMinKey())), |
| 450 | + startTransaction, |
| 451 | + txnNumber); |
| 452 | + |
| 453 | + insertDocumentsInLocalTxn(opCtx, |
| 454 | + TagsType::ConfigNS, |
| 455 | + {upgradeType == ShardingCatalogManager::ConfigUpgradeType::kUpgrade |
| 456 | + // Note that TagsType::toBSON() will not include an _id, so the |
| 457 | + // insert path will generate one for us. |
| 458 | + ? tag.toBSON() |
| 459 | + : tag.toBSONLegacyID()}, |
| 460 | + false /* startTransaction */, |
| 461 | + txnNumber); |
493 | 462 | }
|
494 | 463 |
|
495 |
| -void ShardingCatalogManager::downgradeChunksAndTags(OperationContext* opCtx) { |
496 |
| - // Downgrade each chunk document by deleting and re-inserting with the 4.2 _id format. |
497 |
| - { |
498 |
| - Lock::ExclusiveLock lk(opCtx->lockState(), _kChunkOpLock); |
499 |
| - |
500 |
| - auto const configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard(); |
501 |
| - auto findResponse = uassertStatusOK( |
502 |
| - configShard->exhaustiveFindOnConfig(opCtx, |
503 |
| - ReadPreferenceSetting{ReadPreference::PrimaryOnly}, |
504 |
| - repl::ReadConcernLevel::kLocalReadConcern, |
505 |
| - ChunkType::ConfigNS, |
506 |
| - {}, |
507 |
| - {}, |
508 |
| - boost::none /* limit */)); |
509 |
| - |
510 |
| - AlternativeSessionRegion asr(opCtx); |
511 |
| - AuthorizationSession::get(asr.opCtx()->getClient()) |
512 |
| - ->grantInternalAuthorization(asr.opCtx()->getClient()); |
513 |
| - TxnNumber txnNumber = 0; |
514 |
| - for (const auto& chunkObj : findResponse.docs) { |
515 |
| - auto chunk = uassertStatusOK(ChunkType::fromConfigBSON(chunkObj)); |
516 |
| - |
517 |
| - removeDocumentsInLocalTxn( |
518 |
| - asr.opCtx(), |
519 |
| - ChunkType::ConfigNS, |
520 |
| - BSON(ChunkType::ns(chunk.getNS().ns()) << ChunkType::min(chunk.getMin())), |
521 |
| - true /* startTransaction */, |
522 |
| - txnNumber); |
523 |
| - |
524 |
| - insertDocumentsInLocalTxn(asr.opCtx(), |
525 |
| - ChunkType::ConfigNS, |
526 |
| - {chunk.toConfigBSONLegacyID()}, |
527 |
| - false /* startTransaction */, |
528 |
| - txnNumber); |
529 |
| - |
| 464 | +// TODO SERVER-44034: Remove this function and type. |
| 465 | +using ConfigDocModFunction = std::function<void( |
| 466 | + OperationContext*, BSONObj, bool, TxnNumber, ShardingCatalogManager::ConfigUpgradeType)>; |
| 467 | +void forEachConfigDocInBatchedTransactions(OperationContext* opCtx, |
| 468 | + const NamespaceString& configNss, |
| 469 | + const NamespaceString& shardedCollNss, |
| 470 | + ConfigDocModFunction configDocModFn, |
| 471 | + ShardingCatalogManager::ConfigUpgradeType upgradeType) { |
| 472 | + const auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard(); |
| 473 | + auto findResponse = uassertStatusOK( |
| 474 | + configShard->exhaustiveFindOnConfig(opCtx, |
| 475 | + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, |
| 476 | + repl::ReadConcernLevel::kLocalReadConcern, |
| 477 | + configNss, |
| 478 | + BSON("ns" << shardedCollNss.ns()), |
| 479 | + {}, |
| 480 | + boost::none /* limit */)); |
| 481 | + |
| 482 | + AlternativeSessionRegion asr(opCtx); |
| 483 | + AuthorizationSession::get(asr.opCtx()->getClient()) |
| 484 | + ->grantInternalAuthorization(asr.opCtx()->getClient()); |
| 485 | + TxnNumber txnNumber = 0; |
| 486 | + |
| 487 | + const auto batchSizeLimit = 100; |
| 488 | + auto currentBatchSize = 0; |
| 489 | + for (const auto& doc : findResponse.docs) { |
| 490 | + auto startTransaction = currentBatchSize == 0; |
| 491 | + |
| 492 | + configDocModFn(asr.opCtx(), doc, startTransaction, txnNumber, upgradeType); |
| 493 | + |
| 494 | + currentBatchSize += 1; |
| 495 | + if (currentBatchSize == batchSizeLimit) { |
530 | 496 | commitLocalTxn(asr.opCtx(), txnNumber);
|
531 |
| - |
532 | 497 | txnNumber += 1;
|
| 498 | + currentBatchSize = 0; |
533 | 499 | }
|
534 | 500 | }
|
535 | 501 |
|
536 |
| - // Downgrade each tag document by deleting and re-inserting with the 4.2 _id format. |
537 |
| - { |
538 |
| - Lock::ExclusiveLock lk(opCtx->lockState(), _kZoneOpLock); |
539 |
| - |
540 |
| - auto const configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard(); |
541 |
| - auto findResponse = uassertStatusOK( |
542 |
| - configShard->exhaustiveFindOnConfig(opCtx, |
543 |
| - ReadPreferenceSetting{ReadPreference::PrimaryOnly}, |
544 |
| - repl::ReadConcernLevel::kLocalReadConcern, |
545 |
| - TagsType::ConfigNS, |
546 |
| - {}, |
547 |
| - {}, |
548 |
| - boost::none /* limit */)); |
549 |
| - |
550 |
| - AlternativeSessionRegion asr(opCtx); |
551 |
| - AuthorizationSession::get(asr.opCtx()->getClient()) |
552 |
| - ->grantInternalAuthorization(asr.opCtx()->getClient()); |
553 |
| - TxnNumber txnNumber = 0; |
554 |
| - for (const auto& tagObj : findResponse.docs) { |
555 |
| - auto tag = uassertStatusOK(TagsType::fromBSON(tagObj)); |
556 |
| - |
557 |
| - removeDocumentsInLocalTxn( |
558 |
| - asr.opCtx(), |
559 |
| - TagsType::ConfigNS, |
560 |
| - BSON(TagsType::ns(tag.getNS().ns()) << TagsType::min(tag.getMinKey())), |
561 |
| - true /* startTransaction */, |
562 |
| - txnNumber); |
563 |
| - |
564 |
| - insertDocumentsInLocalTxn(asr.opCtx(), |
565 |
| - TagsType::ConfigNS, |
566 |
| - {tag.toBSONLegacyID()}, |
567 |
| - false /* startTransaction */, |
568 |
| - txnNumber); |
569 |
| - |
570 |
| - commitLocalTxn(asr.opCtx(), txnNumber); |
| 502 | + if (currentBatchSize != 0) { |
| 503 | + commitLocalTxn(asr.opCtx(), txnNumber); |
| 504 | + } |
| 505 | +} |
571 | 506 |
|
572 |
| - txnNumber += 1; |
| 507 | +void ShardingCatalogManager::upgradeOrDowngradeChunksAndTags(OperationContext* opCtx, |
| 508 | + ConfigUpgradeType upgradeType) { |
| 509 | + const auto grid = Grid::get(opCtx); |
| 510 | + auto allDbs = uassertStatusOK(grid->catalogClient()->getAllDBs( |
| 511 | + opCtx, repl::ReadConcernLevel::kLocalReadConcern)) |
| 512 | + .value; |
| 513 | + |
| 514 | + // The 'config' database contains the sharded 'config.system.sessions' collection but does not |
| 515 | + // have an entry in config.databases. |
| 516 | + allDbs.emplace_back("config", ShardId("config"), true, DatabaseVersion()); |
| 517 | + |
| 518 | + for (const auto& db : allDbs) { |
| 519 | + auto collections = uassertStatusOK(grid->catalogClient()->getCollections( |
| 520 | + opCtx, &db.getName(), nullptr, repl::ReadConcernLevel::kLocalReadConcern)); |
| 521 | + |
| 522 | + for (const auto& coll : collections) { |
| 523 | + if (coll.getDropped()) { |
| 524 | + continue; |
| 525 | + } |
| 526 | + |
| 527 | + { |
| 528 | + Lock::ExclusiveLock lk(opCtx->lockState(), _kChunkOpLock); |
| 529 | + forEachConfigDocInBatchedTransactions( |
| 530 | + opCtx, ChunkType::ConfigNS, coll.getNs(), deleteAndInsertChunk, upgradeType); |
| 531 | + } |
| 532 | + |
| 533 | + { |
| 534 | + Lock::ExclusiveLock lk(opCtx->lockState(), _kZoneOpLock); |
| 535 | + forEachConfigDocInBatchedTransactions( |
| 536 | + opCtx, TagsType::ConfigNS, coll.getNs(), deleteAndInsertTag, upgradeType); |
| 537 | + } |
573 | 538 | }
|
574 | 539 | }
|
575 | 540 | }
|
|
0 commit comments