@@ -2208,3 +2208,78 @@ def test_filtering_by_virtual_columns(started_cluster):
22082208 f"SELECT ProfileEvents['EngineFileLikeReadFiles'] FROM system.query_log WHERE query_id = '{ query_id } ' and type = 'QueryFinish'"
22092209 )
22102210 )
2211+
2212+
def test_column_pruning(started_cluster):
    """Verify column pruning when reading a Delta Lake table from S3/MinIO.

    Writes a table with one deliberately wide column (``country``, ~1000
    bytes per row) via Spark, then queries only ``sum(id)`` through the
    ``deltaLake()`` table function and pins the exact number of bytes read
    from S3 — once with ``allow_experimental_delta_kernel_rs=0`` and once
    with the default (delta-kernel) path. If pruning regressed, the wide
    column would be downloaded and the byte counters would blow up.

    Args:
        started_cluster: pytest fixture providing the ClickHouse node,
            Spark session, and MinIO client/bucket.
    """
    instance = started_cluster.instances["node1"]
    spark = started_cluster.spark_session
    minio_client = started_cluster.minio_client
    bucket = started_cluster.minio_bucket
    TABLE_NAME = randomize_table_name("test_column_pruning")
    result_file = f"{TABLE_NAME}"
    partition_columns = []

    schema = StructType(
        [
            StructField("id", IntegerType(), nullable=False),
            StructField("name", StringType(), nullable=False),
            StructField("age", IntegerType(), nullable=False),
            StructField("country", StringType(), nullable=False),
            StructField("year", StringType(), nullable=False),
        ]
    )

    num_rows = 100000
    # Wide filler column: pruning it is what keeps the byte counters below
    # small. Built once and reused instead of re-joined per row.
    wide_value = "a" * 1000
    data = [(i, f"name_{i}", 32, wide_value, "2025") for i in range(num_rows)]
    df = spark.createDataFrame(data=data, schema=schema)
    df.printSchema()
    df.write.mode("append").format("delta").partitionBy(partition_columns).save(
        f"/{TABLE_NAME}"
    )

    files = upload_directory(minio_client, bucket, f"/{TABLE_NAME}", "")
    assert len(files) > 0
    print(f"Uploaded files: {files}")

    table_function = f"deltaLake('http://{started_cluster.minio_ip}:{started_cluster.minio_port}/{bucket}/{result_file}/', 'minio', '{minio_secret_key}')"

    # First read: legacy (non delta-kernel-rs) implementation. Keep the
    # result to cross-check the second query. Named `expected_sum` rather
    # than `sum` to avoid shadowing the builtin.
    query_id = f"query_{TABLE_NAME}_1"
    expected_sum = int(
        instance.query(
            f"SELECT sum(id) FROM {table_function} SETTINGS allow_experimental_delta_kernel_rs=0, max_read_buffer_size_remote_fs=100",
            query_id=query_id,
        )
    )
    instance.query("SYSTEM FLUSH LOGS")
    # Exact byte count read from S3 for the pruned (id-only) scan.
    assert 467413 == int(
        instance.query(
            f"SELECT ProfileEvents['ReadBufferFromS3Bytes'] FROM system.query_log WHERE query_id = '{query_id}' and type = 'QueryFinish'"
        )
    )

    # Second read: default (delta-kernel) path; must produce the same sum.
    query_id = f"query_{TABLE_NAME}_2"
    assert expected_sum == int(
        instance.query(
            f"SELECT sum(id) FROM {table_function} SETTINGS enable_filesystem_cache=0, max_read_buffer_size_remote_fs=100",
            query_id=query_id,
        )
    )
    instance.query("SYSTEM FLUSH LOGS")
    assert 1 == int(
        instance.query(
            f"SELECT ProfileEvents['EngineFileLikeReadFiles'] FROM system.query_log WHERE query_id = '{query_id}' and type = 'QueryFinish'"
        )
    )
    # Small diff because in case of delta-kernel metadata reading is not counted in the metric.
    assert 465864 == int(
        instance.query(
            f"SELECT ProfileEvents['ReadBufferFromS3Bytes'] FROM system.query_log WHERE query_id = '{query_id}' and type = 'QueryFinish'"
        )
    )
0 commit comments