|
5 | 5 | * GNU General Public License version 2.
|
6 | 6 | */
|
7 | 7 |
|
| 8 | +use anyhow::anyhow; |
8 | 9 | use anyhow::Result;
|
9 | 10 | use blobstore::Blobstore;
|
10 | 11 | use blobstore::BlobstoreGetData;
|
| 12 | +use blobstore::BlobstoreIsPresent; |
11 | 13 | use blobstore::BlobstorePutOps;
|
12 | 14 | use blobstore_sync_queue::BlobstoreWal;
|
13 | 15 | use blobstore_sync_queue::OperationKey;
|
@@ -558,8 +560,214 @@ async fn test_get_on_existing(fb: FacebookInit) -> Result<()> {
|
558 | 560 | Ok(())
|
559 | 561 | }
|
560 | 562 |
|
561 |
| -async fn assert_pending<T: PartialEq + Debug>(fut: &mut (impl Future<Output = T> + Unpin)) { |
562 |
| - assert_eq!(PollOnce::new(fut).await, Poll::Pending); |
| 563 | +#[fbinit::test] |
| 564 | +async fn test_is_present_missing(fb: FacebookInit) -> Result<()> { |
| 565 | + let ctx = CoreContext::test_mock(fb); |
| 566 | + |
| 567 | + let (_tickable_queue, wal_queue) = setup_queue(); |
| 568 | + let (tickable_blobstores, blobstores) = setup_blobstores(3); |
| 569 | + |
| 570 | + let quorum = 2; |
| 571 | + let multiplex = |
| 572 | + WalMultiplexedBlobstore::new(MultiplexId::new(1), wal_queue, blobstores, vec![], quorum)?; |
| 573 | + |
| 574 | + // No blobstores have the key |
| 575 | + let k = "k1"; |
| 576 | + |
| 577 | + // all `is_present` succeed, multiplexed returns `Absent` |
| 578 | + { |
| 579 | + let mut fut = multiplex.is_present(&ctx, k).map_err(|_| ()).boxed(); |
| 580 | + assert_pending(&mut fut).await; |
| 581 | + |
| 582 | + tickable_blobstores[0].1.tick(None); |
| 583 | + assert_pending(&mut fut).await; |
| 584 | + tickable_blobstores[1].1.tick(None); |
| 585 | + |
| 586 | + // the read-quorum on `None` achieved, multiplexed returns `Absent` |
| 587 | + assert_is_present_ok(fut.await, BlobstoreIsPresent::Absent); |
| 588 | + tickable_blobstores[2].1.drain(1); |
| 589 | + } |
| 590 | + |
| 591 | + // two `is_present`s succeed, multiplexed returns `Absent` |
| 592 | + { |
| 593 | + let mut fut = multiplex.is_present(&ctx, k).map_err(|_| ()).boxed(); |
| 594 | + assert_pending(&mut fut).await; |
| 595 | + |
| 596 | + tickable_blobstores[0].1.tick(None); |
| 597 | + tickable_blobstores[1].1.tick(Some("bs1 failed")); |
| 598 | + // muliplexed is_present waits on the third |
| 599 | + assert_pending(&mut fut).await; |
| 600 | + tickable_blobstores[2].1.tick(None); |
| 601 | + |
| 602 | + // the read-quorum achieved, multiplexed returns `Absent` |
| 603 | + assert_is_present_ok(fut.await, BlobstoreIsPresent::Absent); |
| 604 | + } |
| 605 | + |
| 606 | + // two `is_present`s fail, multiplexed returns `ProbablyNotPresent` |
| 607 | + { |
| 608 | + let mut fut = multiplex.is_present(&ctx, k).map_err(|_| ()).boxed(); |
| 609 | + assert_pending(&mut fut).await; |
| 610 | + |
| 611 | + tickable_blobstores[0].1.tick(Some("bs0 failed")); |
| 612 | + tickable_blobstores[1].1.tick(None); |
| 613 | + // muliplexed is_present waits on the third |
| 614 | + assert_pending(&mut fut).await; |
| 615 | + tickable_blobstores[2].1.tick(Some("bs2 failed")); |
| 616 | + |
| 617 | + assert_is_present_ok( |
| 618 | + fut.await, |
| 619 | + BlobstoreIsPresent::ProbablyNotPresent(anyhow!("some failed!")), |
| 620 | + ); |
| 621 | + } |
| 622 | + |
| 623 | + // all `is_present`s fail, multiplexed fails |
| 624 | + { |
| 625 | + let mut fut = multiplex.is_present(&ctx, k).map_err(|_| ()).boxed(); |
| 626 | + assert_pending(&mut fut).await; |
| 627 | + |
| 628 | + for (id, store) in tickable_blobstores { |
| 629 | + store.tick(Some(format!("bs{} failed!", id).as_str())); |
| 630 | + } |
| 631 | + assert!(fut.await.is_err()); |
| 632 | + } |
| 633 | + |
| 634 | + Ok(()) |
| 635 | +} |
| 636 | + |
| 637 | +#[fbinit::test] |
| 638 | +async fn test_is_present_existing(fb: FacebookInit) -> Result<()> { |
| 639 | + let ctx = CoreContext::test_mock(fb); |
| 640 | + |
| 641 | + let (tickable_queue, wal_queue) = setup_queue(); |
| 642 | + let (tickable_blobstores, blobstores) = setup_blobstores(3); |
| 643 | + |
| 644 | + let quorum = 2; |
| 645 | + let multiplex = |
| 646 | + WalMultiplexedBlobstore::new(MultiplexId::new(1), wal_queue, blobstores, vec![], quorum)?; |
| 647 | + |
| 648 | + // Two blobstores have the key, one failed to write: [ ] [x] [ ] |
| 649 | + { |
| 650 | + let v = make_value("v1"); |
| 651 | + let k = "k1"; |
| 652 | + |
| 653 | + let mut put_fut = multiplex |
| 654 | + .put(&ctx, k.to_owned(), v.clone()) |
| 655 | + .map_err(|_| ()) |
| 656 | + .boxed(); |
| 657 | + assert_pending(&mut put_fut).await; |
| 658 | + |
| 659 | + // wal queue write succeeds |
| 660 | + tickable_queue.tick(None); |
| 661 | + assert_pending(&mut put_fut).await; |
| 662 | + |
| 663 | + tickable_blobstores[0].1.tick(None); |
| 664 | + tickable_blobstores[1].1.tick(Some("bs1 failed")); |
| 665 | + tickable_blobstores[2].1.tick(None); |
| 666 | + |
| 667 | + // multiplexed put succeeds: write quorum achieved |
| 668 | + assert!(put_fut.await.is_ok()); |
| 669 | + |
| 670 | + // first `is_present` succeed with `Present`, multiplexed returns `Present` |
| 671 | + { |
| 672 | + let mut fut = multiplex.is_present(&ctx, k).map_err(|_| ()).boxed(); |
| 673 | + assert_pending(&mut fut).await; |
| 674 | + |
| 675 | + tickable_blobstores[0].1.tick(None); |
| 676 | + assert_is_present_ok(fut.await, BlobstoreIsPresent::Present); |
| 677 | + |
| 678 | + for (_id, store) in &tickable_blobstores[1..] { |
| 679 | + store.drain(1); |
| 680 | + } |
| 681 | + } |
| 682 | + |
| 683 | + // first `is_present` fails, second succeed with `Absent`, third returns `Present` |
| 684 | + // multiplexed returns `Present` |
| 685 | + { |
| 686 | + let mut fut = multiplex.is_present(&ctx, k).map_err(|_| ()).boxed(); |
| 687 | + assert_pending(&mut fut).await; |
| 688 | + |
| 689 | + tickable_blobstores[0].1.tick(Some("bs0 failed")); |
| 690 | + tickable_blobstores[1].1.tick(None); |
| 691 | + assert_pending(&mut fut).await; |
| 692 | + |
| 693 | + tickable_blobstores[2].1.tick(None); |
| 694 | + assert_is_present_ok(fut.await, BlobstoreIsPresent::Present); |
| 695 | + } |
| 696 | + } |
| 697 | + |
| 698 | + // Two blobstores failed to write, one succeeded: [x] [ ] [x] |
| 699 | + { |
| 700 | + let v = make_value("v2"); |
| 701 | + let k = "k2"; |
| 702 | + |
| 703 | + let mut put_fut = multiplex |
| 704 | + .put(&ctx, k.to_owned(), v.clone()) |
| 705 | + .map_err(|_| ()) |
| 706 | + .boxed(); |
| 707 | + assert_pending(&mut put_fut).await; |
| 708 | + |
| 709 | + // wal queue write succeeds |
| 710 | + tickable_queue.tick(None); |
| 711 | + assert_pending(&mut put_fut).await; |
| 712 | + |
| 713 | + tickable_blobstores[0].1.tick(Some("bs0 failed")); |
| 714 | + tickable_blobstores[1].1.tick(None); |
| 715 | + tickable_blobstores[2].1.tick(Some("bs2 failed")); |
| 716 | + |
| 717 | + // multiplexed put failed: no write quorum |
| 718 | + assert!(put_fut.await.is_err()); |
| 719 | + |
| 720 | + // the first `is_present` to succeed returns `Present`, multiplexed returns `Present` |
| 721 | + { |
| 722 | + let mut fut = multiplex.is_present(&ctx, k).map_err(|_| ()).boxed(); |
| 723 | + assert_pending(&mut fut).await; |
| 724 | + |
| 725 | + tickable_blobstores[1].1.tick(None); |
| 726 | + assert_is_present_ok(fut.await, BlobstoreIsPresent::Present); |
| 727 | + |
| 728 | + tickable_blobstores[0].1.drain(1); |
| 729 | + tickable_blobstores[2].1.drain(1); |
| 730 | + } |
| 731 | + |
| 732 | + // if the first two `is_present` calls return `Absent`, multiplexed returns `Absent` |
| 733 | + { |
| 734 | + let mut fut = multiplex.is_present(&ctx, k).map_err(|_| ()).boxed(); |
| 735 | + assert_pending(&mut fut).await; |
| 736 | + |
| 737 | + tickable_blobstores[0].1.tick(None); |
| 738 | + tickable_blobstores[2].1.tick(None); |
| 739 | + |
| 740 | + assert_is_present_ok(fut.await, BlobstoreIsPresent::Absent); |
| 741 | + tickable_blobstores[1].1.drain(1); |
| 742 | + } |
| 743 | + |
| 744 | + // if one `is_present` returns `Absent`, another 2 fail, multiplexed is unsure |
| 745 | + { |
| 746 | + let mut fut = multiplex.is_present(&ctx, k).map_err(|_| ()).boxed(); |
| 747 | + assert_pending(&mut fut).await; |
| 748 | + |
| 749 | + tickable_blobstores[0].1.tick(None); |
| 750 | + for (id, store) in &tickable_blobstores[1..] { |
| 751 | + store.tick(Some(format!("bs{} failed", id).as_str())); |
| 752 | + } |
| 753 | + |
| 754 | + assert_is_present_ok( |
| 755 | + fut.await, |
| 756 | + BlobstoreIsPresent::ProbablyNotPresent(anyhow!("some failed!")), |
| 757 | + ); |
| 758 | + } |
| 759 | + } |
| 760 | + |
| 761 | + Ok(()) |
| 762 | +} |
| 763 | + |
| 764 | +async fn assert_pending<T: Debug>(fut: &mut (impl Future<Output = T> + Unpin)) { |
| 765 | + match PollOnce::new(fut).await { |
| 766 | + Poll::Pending => {} |
| 767 | + state => { |
| 768 | + panic!("future must be pending, received: {:?}", state); |
| 769 | + } |
| 770 | + } |
563 | 771 | }
|
564 | 772 |
|
565 | 773 | type TickableBytes = Tickable<(BlobstoreBytes, u64)>;
|
@@ -601,3 +809,19 @@ fn validate_blob(
|
601 | 809 | assert_eq!(get_data.as_ref(), expected);
|
602 | 810 | }
|
603 | 811 | }
|
| 812 | + |
| 813 | +fn assert_is_present_ok(result: Result<BlobstoreIsPresent, ()>, expected: BlobstoreIsPresent) { |
| 814 | + assert!(result.is_ok()); |
| 815 | + match (result.unwrap(), expected) { |
| 816 | + (BlobstoreIsPresent::Absent, BlobstoreIsPresent::Absent) |
| 817 | + | (BlobstoreIsPresent::Present, BlobstoreIsPresent::Present) |
| 818 | + | (BlobstoreIsPresent::ProbablyNotPresent(_), BlobstoreIsPresent::ProbablyNotPresent(_)) => { |
| 819 | + } |
| 820 | + (res, exp) => { |
| 821 | + panic!( |
| 822 | + "`is_present` call must return {:?}, received: {:?}", |
| 823 | + exp, res |
| 824 | + ); |
| 825 | + } |
| 826 | + } |
| 827 | +} |
0 commit comments