@@ -8,6 +8,7 @@
 import uuid
 
 import pytest
+from pathlib import Path
 
 import helpers.client
 from helpers.cluster import ClickHouseCluster, ClickHouseInstance
@@ -2791,3 +2792,129 @@ def test_key_value_args(started_cluster):
         f"S3(\\'{url}\\', \\'TSVRaw\\', format = \\'TSVRaw\\', access_key_id = \\'minio\\', secret_access_key = \\'[HIDDEN]\\', compression_method = \\'gzip\\')"
         in node.query(f"SHOW CREATE TABLE {table_name}")
     )
+
+
+def test_file_pruning_with_hive_style_partitioning(started_cluster):
+    node = started_cluster.instances["dummy"]
+    table_name = f"test_pruning_with_hive_style_partitioning_{generate_random_string()}"
+    bucket = started_cluster.minio_bucket
+    minio = started_cluster.minio_client
+
+    url = f"http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{table_name}"
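+    # With partition_strategy = 'hive', files are written under key=value
+    # directories, e.g. {table_name}/b=3/c=1/<file>.parquet (asserted below).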
+    node.query(
+        f"""
+        CREATE TABLE {table_name} (a Int32, b Int32, c String) ENGINE = S3('{url}', format = 'Parquet', partition_strategy = 'hive')
+        PARTITION BY (b, c)
+        """
+    )
+    node.query(
+        f"INSERT INTO {table_name} SELECT number, number % 5, toString(number % 2) FROM numbers(20)",
+        settings={"use_hive_partitioning": True},
+    )
+
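+    # 20 rows with b = number % 5 and c = number % 2 -> 10 (b, c) partitions,
+    # 2 rows each.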
+    objects = sorted(
+        obj.object_name
+        for obj in minio.list_objects(bucket, prefix=table_name, recursive=True)
+    )
+    assert len(objects) == 10
+
+    prefixes = []
+    for obj in objects:
+        assert obj.endswith(".parquet")
+        prefixes.append(str(Path(obj).parent))
+
+    assert len(prefixes) == 10
+    assert prefixes == [
+        f"{table_name}/b=0/c=0",
+        f"{table_name}/b=0/c=1",
+        f"{table_name}/b=1/c=0",
+        f"{table_name}/b=1/c=1",
+        f"{table_name}/b=2/c=0",
+        f"{table_name}/b=2/c=1",
+        f"{table_name}/b=3/c=0",
+        f"{table_name}/b=3/c=1",
+        f"{table_name}/b=4/c=0",
+        f"{table_name}/b=4/c=1",
+    ]
+
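+    # EngineFileLikeReadFiles counts the files a query actually opened, so it
+    # tells us how many files were left after partition pruning.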
+    def check_read_files(expected, query_id):
+        node.query("SYSTEM FLUSH LOGS")
+        assert expected == int(
+            node.query(
+                f"SELECT ProfileEvents['EngineFileLikeReadFiles'] FROM system.query_log WHERE query_id = '{query_id}' AND type='QueryFinish'"
+            )
+        )
+
+    # c == '0' matches 5 of the 10 files (one per value of b), 2 rows each.
+    assert 5 == int(
+        node.query(f"SELECT uniqExact(_path) FROM {table_name} WHERE c == '0'")
+    )
+
+    query_id = f"{table_name}_query_1"
+    assert 10 == int(
+        node.query(
+            f"SELECT count() FROM {table_name} WHERE c == '0'", query_id=query_id
+        )
+    )
+    # Check files are pruned.
+    check_read_files(5, query_id)
+
+    # b == 3 matches 2 files (c = '0' and c = '1'), each with 2 rows.
+    assert 2 == int(
+        node.query(f"SELECT uniqExact(_path) FROM {table_name} WHERE b == 3")
+    )
+
+    query_id = f"{table_name}_query_2"
+    assert 4 == int(
+        node.query(f"SELECT count() FROM {table_name} WHERE b == 3", query_id=query_id)
+    )
+    # Check files are pruned.
+    check_read_files(2, query_id)
+
+    # b == 3 AND c == '1' pins down a single file with 2 rows.
+    assert 1 == int(
+        node.query(
+            f"SELECT uniqExact(_path) FROM {table_name} WHERE b == 3 AND c == '1'"
+        )
+    )
+
+    query_id = f"{table_name}_query_3"
+    assert 2 == int(
+        node.query(
+            f"SELECT count() FROM {table_name} WHERE b == 3 AND c == '1'",
+            query_id=query_id,
+        )
+    )
+    # Check files are pruned.
+    check_read_files(1, query_id)
+
+    query_id = f"{table_name}_query_4"
+    assert 1 == int(
+        node.query(f"SELECT count() FROM {table_name} WHERE a == 1", query_id=query_id)
+    )
+    # Nothing is pruned, because `a` is not a partition column.
+    check_read_files(10, query_id)
+
+
+def test_partition_by_without_wildcard(started_cluster):
+    node = started_cluster.instances["dummy"]
+    table_name = f"test_partition_by_without_wildcard_{generate_random_string()}"
+    bucket = started_cluster.minio_bucket
+
+    url = f"http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{table_name}"
+    # This must not throw "Partition strategy wildcard can not be used
+    # without a '_partition_id' wildcard".
+    node.query(
+        f"""
+CREATE TABLE {table_name} (a Int32, b Int32, c String) ENGINE = S3('{url}', format = 'Parquet')
+PARTITION BY (b, c)
+"""
+    )