|
| 1 | +import os |
1 | 2 | import time |
2 | 3 | from typing import Callable, Dict, Generator |
3 | 4 | from unittest.mock import patch |
4 | 5 |
|
| 6 | +import httpx |
5 | 7 | from httpx import URL |
6 | 8 | from pytest import fixture, mark |
7 | 9 | from pytest_httpx import HTTPXMock |
8 | 10 |
|
9 | 11 | from firebolt.async_db import connect |
10 | 12 | from firebolt.client.auth import Auth |
| 13 | +from firebolt.client.auth.client_credentials import ClientCredentials |
11 | 14 | from firebolt.utils.cache import CACHE_EXPIRY_SECONDS, _firebolt_cache |
| 15 | +from tests.unit.response import Response |
12 | 16 |
|
13 | 17 |
|
14 | 18 | @fixture(autouse=True) |
@@ -627,3 +631,331 @@ def use_engine_callback_counter(request, **kwargs): |
627 | 631 |
|
628 | 632 | # Verify USE ENGINE was not called again (cache hit) |
629 | 633 | assert use_engine_call_counter == 1, "USE ENGINE was called when cache should hit" |
| 634 | + |
| 635 | + |
| 636 | +async def test_token_cache_expiry_forces_reauthentication( |
| 637 | + db_name: str, |
| 638 | + engine_name: str, |
| 639 | + api_endpoint: str, |
| 640 | + auth: Auth, |
| 641 | + account_name: str, |
| 642 | + mock_connection_flow: Callable, |
| 643 | +): |
| 644 | + """Test that expired authentication tokens force re-authentication.""" |
| 645 | + mock_connection_flow() |
| 646 | + |
| 647 | + fixed_time = 1000000 |
| 648 | + |
| 649 | + with patch("time.time", return_value=fixed_time): |
| 650 | + # First connection should cache token |
| 651 | + async with await connect( |
| 652 | + database=db_name, |
| 653 | + engine_name=engine_name, |
| 654 | + auth=auth, |
| 655 | + account_name=account_name, |
| 656 | + api_endpoint=api_endpoint, |
| 657 | + ) as connection: |
| 658 | + # Connection established and token cached |
| 659 | + assert auth.token is not None |
| 660 | + cached_token_1 = auth.token |
| 661 | + |
| 662 | + # Simulate time passing within token validity |
| 663 | + with patch("time.time", return_value=fixed_time + 1800): # 30 minutes later |
| 664 | + async with await connect( |
| 665 | + database=db_name, |
| 666 | + engine_name=engine_name, |
| 667 | + auth=auth, |
| 668 | + account_name=account_name, |
| 669 | + api_endpoint=api_endpoint, |
| 670 | + ) as connection: |
| 671 | + # Should use cached token |
| 672 | + assert auth.token == cached_token_1 |
| 673 | + |
| 674 | + # Simulate time passing beyond token expiry |
| 675 | + expired_time = fixed_time + CACHE_EXPIRY_SECONDS + 100 |
| 676 | + with patch("time.time", return_value=expired_time): |
| 677 | + # Clear the current token to simulate expiry |
| 678 | + auth._token = None |
| 679 | + auth._expires = fixed_time # Set to expired time |
| 680 | + |
| 681 | + async with await connect( |
| 682 | + database=db_name, |
| 683 | + engine_name=engine_name, |
| 684 | + auth=auth, |
| 685 | + account_name=account_name, |
| 686 | + api_endpoint=api_endpoint, |
| 687 | + ) as connection: |
| 688 | + # Token expired, new authentication should occur |
| 689 | + assert auth.token is not None |
| 690 | + # Should get a new token (implementation detail may vary) |
| 691 | + assert auth.token is not None |
| 692 | + |
| 693 | + |
| 694 | +@mark.usefixtures("fs") |
| 695 | +@mark.parametrize("use_cache", [True, False]) |
| 696 | +async def test_connection_cache_isolation_by_credentials( |
| 697 | + db_name: str, |
| 698 | + engine_name: str, |
| 699 | + auth_url: str, |
| 700 | + httpx_mock: HTTPXMock, |
| 701 | + get_system_engine_url: str, |
| 702 | + get_system_engine_callback: Callable, |
| 703 | + system_engine_query_url: str, |
| 704 | + system_engine_no_db_query_url: str, |
| 705 | + query_url: str, |
| 706 | + use_database_callback: Callable, |
| 707 | + use_engine_callback: Callable, |
| 708 | + query_callback: Callable, |
| 709 | + account_name: str, |
| 710 | + api_endpoint: str, |
| 711 | + access_token: str, |
| 712 | + use_cache: bool, |
| 713 | + enable_cache, # Use enable_cache directly instead of autouse fixture |
| 714 | +): |
| 715 | + """Test that connections with different credentials are cached separately.""" |
| 716 | + |
| 717 | + # Manual cache isolation for this pyfakefs test |
| 718 | + original_memory = _firebolt_cache.memory_cache._cache.copy() |
| 719 | + original_file_cache_disabled = _firebolt_cache.disabled |
| 720 | + original_data_dir = _firebolt_cache._data_dir |
| 721 | + |
| 722 | + try: |
| 723 | + _firebolt_cache.clear() |
| 724 | + _firebolt_cache.enable() |
| 725 | + |
| 726 | + # Set cache directory to fake filesystem path (pyfakefs is active) |
| 727 | + _firebolt_cache._data_dir = "/tmp/test_firebolt_cache" |
| 728 | + os.makedirs(_firebolt_cache._data_dir, exist_ok=True) |
| 729 | + |
| 730 | + # Create a flexible credentials callback that accepts any client_id |
| 731 | + def flexible_check_credentials( |
| 732 | + request: httpx.Request = None, |
| 733 | + **kwargs, |
| 734 | + ) -> Response: |
| 735 | + assert request, "empty request" |
| 736 | + body = request.read().decode("utf-8") |
| 737 | + assert "client_id" in body, "Missing id" |
| 738 | + assert "client_secret" in body, "Missing secret" |
| 739 | + assert "grant_type" in body, "Missing grant_type" |
| 740 | + assert "grant_type=client_credentials" in body, "Invalid grant_type" |
| 741 | + |
| 742 | + return Response( |
| 743 | + status_code=200, |
| 744 | + json={ |
| 745 | + "access_token": access_token, |
| 746 | + "token_type": "Bearer", |
| 747 | + "expires_in": 3600, |
| 748 | + }, |
| 749 | + ) |
| 750 | + |
| 751 | + # Create two different auth objects |
| 752 | + auth1 = ClientCredentials( |
| 753 | + client_id="client_1", client_secret="secret_1", use_token_cache=use_cache |
| 754 | + ) |
| 755 | + auth2 = ClientCredentials( |
| 756 | + client_id="client_2", client_secret="secret_2", use_token_cache=use_cache |
| 757 | + ) |
| 758 | + |
| 759 | + system_engine_call_counter = 0 |
| 760 | + |
| 761 | + def system_engine_callback_counter(request, **kwargs): |
| 762 | + nonlocal system_engine_call_counter |
| 763 | + system_engine_call_counter += 1 |
| 764 | + return get_system_engine_callback(request, **kwargs) |
| 765 | + |
| 766 | + # Set up mocks for both auth credentials |
| 767 | + httpx_mock.add_callback( |
| 768 | + flexible_check_credentials, url=auth_url, is_reusable=True |
| 769 | + ) |
| 770 | + httpx_mock.add_callback( |
| 771 | + system_engine_callback_counter, |
| 772 | + url=get_system_engine_url, |
| 773 | + is_reusable=True, |
| 774 | + ) |
| 775 | + httpx_mock.add_callback( |
| 776 | + use_database_callback, |
| 777 | + url=system_engine_no_db_query_url, |
| 778 | + match_content=f'USE DATABASE "{db_name}"'.encode("utf-8"), |
| 779 | + is_reusable=True, |
| 780 | + ) |
| 781 | + httpx_mock.add_callback( |
| 782 | + use_engine_callback, |
| 783 | + url=system_engine_query_url, |
| 784 | + match_content=f'USE ENGINE "{engine_name}"'.encode("utf-8"), |
| 785 | + is_reusable=True, |
| 786 | + ) |
| 787 | + httpx_mock.add_callback( |
| 788 | + query_callback, |
| 789 | + url=query_url, |
| 790 | + is_reusable=True, |
| 791 | + ) |
| 792 | + |
| 793 | + # Connect with first credentials |
| 794 | + async with await connect( |
| 795 | + database=db_name, |
| 796 | + engine_name=engine_name, |
| 797 | + auth=auth1, |
| 798 | + account_name=account_name, |
| 799 | + api_endpoint=api_endpoint, |
| 800 | + disable_cache=not use_cache, |
| 801 | + ) as connection: |
| 802 | + await connection.cursor().execute("SELECT 1") |
| 803 | + |
| 804 | + first_call_count = system_engine_call_counter |
| 805 | + |
| 806 | + # Connect with second credentials |
| 807 | + async with await connect( |
| 808 | + database=db_name, |
| 809 | + engine_name=engine_name, |
| 810 | + auth=auth2, |
| 811 | + account_name=account_name, |
| 812 | + api_endpoint=api_endpoint, |
| 813 | + disable_cache=not use_cache, |
| 814 | + ) as connection: |
| 815 | + await connection.cursor().execute("SELECT 1") |
| 816 | + |
| 817 | + second_call_count = system_engine_call_counter |
| 818 | + |
| 819 | + # Connect again with first credentials |
| 820 | + async with await connect( |
| 821 | + database=db_name, |
| 822 | + engine_name=engine_name, |
| 823 | + auth=auth1, |
| 824 | + account_name=account_name, |
| 825 | + api_endpoint=api_endpoint, |
| 826 | + disable_cache=not use_cache, |
| 827 | + ) as connection: |
| 828 | + await connection.cursor().execute("SELECT 1") |
| 829 | + |
| 830 | + third_call_count = system_engine_call_counter |
| 831 | + |
| 832 | + if use_cache: |
| 833 | + # With caching: different credentials should trigger separate system engine calls, |
| 834 | + # but same credentials should reuse cache |
| 835 | + assert first_call_count == 1, "First auth should trigger system engine call" |
| 836 | + assert ( |
| 837 | + second_call_count == 2 |
| 838 | + ), "Second auth (different credentials) should trigger another system engine call" |
| 839 | + assert third_call_count == 2, "Third call should reuse cache" |
| 840 | + else: |
| 841 | + # Without caching: every connection should trigger system engine call |
| 842 | + assert first_call_count >= 1, "System engine should be called" |
| 843 | + assert ( |
| 844 | + second_call_count > first_call_count |
| 845 | + ), "Each connection should call system engine" |
| 846 | + assert third_call_count > second_call_count, "No caching means more calls" |
| 847 | + |
| 848 | + finally: |
| 849 | + # Restore original cache state |
| 850 | + _firebolt_cache.memory_cache._cache = original_memory |
| 851 | + _firebolt_cache.disabled = original_file_cache_disabled |
| 852 | + _firebolt_cache._data_dir = original_data_dir |
| 853 | + |
| 854 | + |
| 855 | +async def test_connection_cache_isolation_by_accounts( |
| 856 | + db_name: str, |
| 857 | + engine_name: str, |
| 858 | + auth_url: str, |
| 859 | + httpx_mock: HTTPXMock, |
| 860 | + check_credentials_callback: Callable, |
| 861 | + get_system_engine_url: str, |
| 862 | + get_system_engine_callback: Callable, |
| 863 | + system_engine_query_url: str, |
| 864 | + system_engine_no_db_query_url: str, |
| 865 | + query_url: str, |
| 866 | + use_database_callback: Callable, |
| 867 | + use_engine_callback: Callable, |
| 868 | + query_callback: Callable, |
| 869 | + auth: Auth, |
| 870 | + api_endpoint: str, |
| 871 | +): |
| 872 | + """Test that connections to different accounts are cached separately.""" |
| 873 | + system_engine_call_counter = 0 |
| 874 | + |
| 875 | + def system_engine_callback_counter(request, **kwargs): |
| 876 | + nonlocal system_engine_call_counter |
| 877 | + system_engine_call_counter += 1 |
| 878 | + return get_system_engine_callback(request, **kwargs) |
| 879 | + |
| 880 | + httpx_mock.add_callback(check_credentials_callback, url=auth_url, is_reusable=True) |
| 881 | + |
| 882 | + # Add callbacks for both accounts |
| 883 | + # Mock both account URLs using regex patterns or multiple exact URLs |
| 884 | + account_1_url = str(get_system_engine_url).replace("mock_account_name", "account_1") |
| 885 | + account_2_url = str(get_system_engine_url).replace("mock_account_name", "account_2") |
| 886 | + |
| 887 | + httpx_mock.add_callback( |
| 888 | + system_engine_callback_counter, |
| 889 | + url=account_1_url, |
| 890 | + is_reusable=True, |
| 891 | + ) |
| 892 | + httpx_mock.add_callback( |
| 893 | + system_engine_callback_counter, |
| 894 | + url=account_2_url, |
| 895 | + is_reusable=True, |
| 896 | + ) |
| 897 | + |
| 898 | + httpx_mock.add_callback( |
| 899 | + use_database_callback, |
| 900 | + url=system_engine_no_db_query_url, |
| 901 | + match_content=f'USE DATABASE "{db_name}"'.encode("utf-8"), |
| 902 | + is_reusable=True, |
| 903 | + ) |
| 904 | + httpx_mock.add_callback( |
| 905 | + use_engine_callback, |
| 906 | + url=system_engine_query_url, |
| 907 | + match_content=f'USE ENGINE "{engine_name}"'.encode("utf-8"), |
| 908 | + is_reusable=True, |
| 909 | + ) |
| 910 | + httpx_mock.add_callback( |
| 911 | + query_callback, |
| 912 | + url=query_url, |
| 913 | + is_reusable=True, |
| 914 | + ) |
| 915 | + |
| 916 | + # Connect to first account |
| 917 | + async with await connect( |
| 918 | + database=db_name, |
| 919 | + engine_name=engine_name, |
| 920 | + auth=auth, |
| 921 | + account_name="account_1", |
| 922 | + api_endpoint=api_endpoint, |
| 923 | + ) as connection: |
| 924 | + await connection.cursor().execute("SELECT 1") |
| 925 | + |
| 926 | + first_account_calls = system_engine_call_counter |
| 927 | + |
| 928 | + # Connect to second account with same credentials |
| 929 | + async with await connect( |
| 930 | + database=db_name, |
| 931 | + engine_name=engine_name, |
| 932 | + auth=auth, |
| 933 | + account_name="account_2", |
| 934 | + api_endpoint=api_endpoint, |
| 935 | + ) as connection: |
| 936 | + await connection.cursor().execute("SELECT 1") |
| 937 | + |
| 938 | + second_account_calls = system_engine_call_counter |
| 939 | + |
| 940 | + # Connect again to first account |
| 941 | + async with await connect( |
| 942 | + database=db_name, |
| 943 | + engine_name=engine_name, |
| 944 | + auth=auth, |
| 945 | + account_name="account_1", |
| 946 | + api_endpoint=api_endpoint, |
| 947 | + ) as connection: |
| 948 | + await connection.cursor().execute("SELECT 1") |
| 949 | + |
| 950 | + third_connection_calls = system_engine_call_counter |
| 951 | + |
| 952 | + # Each account should require its own system engine call initially |
| 953 | + assert first_account_calls == 1, "First account should trigger system engine call" |
| 954 | + assert ( |
| 955 | + second_account_calls == 2 |
| 956 | + ), "Second account should trigger another system engine call" |
| 957 | + |
| 958 | + # Return to first account should use cached data |
| 959 | + assert ( |
| 960 | + third_connection_calls == 2 |
| 961 | + ), "First account second connection should use cache" |
0 commit comments