Skip to content

Commit ad99ec2

Browse files
authored
Merge pull request #913 from Altinity/backports/25.3.6/78926
25.3.6 Backport of ClickHouse#78926 Add _time virtual column in S3Queue engine
2 parents 26ba19a + aeabbb0 commit ad99ec2

File tree

4 files changed

+63
-5
lines changed

4 files changed

+63
-5
lines changed

docs/en/engines/table-engines/integrations/s3queue.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,8 @@ Example:
260260

261261
- `_path` — Path to the file.
262262
- `_file` — Name of the file.
263+
- `_size` — Size of the file.
264+
- `_time` — Time of the file creation.
263265

264266
For more information about virtual columns see [here](../../../engines/table-engines/index.md#table_engines-virtual_columns).
265267

src/Storages/ObjectStorageQueue/ObjectStorageQueueSource.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -958,11 +958,14 @@ Chunk ObjectStorageQueueSource::generateImpl()
958958
ProfileEvents::increment(ProfileEvents::ObjectStorageQueueReadRows, chunk.getNumRows());
959959
ProfileEvents::increment(ProfileEvents::ObjectStorageQueueReadBytes, chunk.bytes());
960960

961+
const auto & object_metadata = reader.getObjectInfo()->metadata;
962+
961963
VirtualColumnUtils::addRequestedFileLikeStorageVirtualsToChunk(
962964
chunk, read_from_format_info.requested_virtual_columns,
963965
{
964966
.path = path,
965-
.size = reader.getObjectInfo()->metadata->size_bytes
967+
.size = object_metadata->size_bytes,
968+
.last_modified = object_metadata->last_modified
966969
}, getContext());
967970

968971
return chunk;

tests/integration/helpers/s3_queue_common.py

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,7 @@ def create_mv(
283283
mv_name=None,
284284
create_dst_table_first=True,
285285
format="column1 UInt32, column2 UInt32, column3 UInt32",
286+
virtual_columns="_path String",
286287
):
287288
if mv_name is None:
288289
mv_name = f"{src_table_name}_mv"
@@ -292,21 +293,29 @@ def create_mv(
292293
DROP TABLE IF EXISTS {mv_name};
293294
""")
294295

296+
virtual_format = ""
297+
virtual_names = ""
298+
virtual_columns_list = virtual_columns.split(",")
299+
for column in virtual_columns_list:
300+
virtual_format += f", {column}"
301+
name, _ = column.strip().rsplit(" ", 1)
302+
virtual_names += f", {name}"
303+
295304
if create_dst_table_first:
296305
node.query(
297306
f"""
298-
CREATE TABLE {dst_table_name} ({format}, _path String)
307+
CREATE TABLE {dst_table_name} ({format} {virtual_format})
299308
ENGINE = MergeTree()
300309
ORDER BY column1;
301-
CREATE MATERIALIZED VIEW {mv_name} TO {dst_table_name} AS SELECT *, _path FROM {src_table_name};
310+
CREATE MATERIALIZED VIEW {mv_name} TO {dst_table_name} AS SELECT * {virtual_names} FROM {src_table_name};
302311
"""
303312
)
304313
else:
305314
node.query(
306315
f"""
307316
SET allow_materialized_view_with_bad_select=1;
308-
CREATE MATERIALIZED VIEW {mv_name} TO {dst_table_name} AS SELECT *, _path FROM {src_table_name};
309-
CREATE TABLE {dst_table_name} ({format}, _path String)
317+
CREATE MATERIALIZED VIEW {mv_name} TO {dst_table_name} AS SELECT * {virtual_names} FROM {src_table_name};
318+
CREATE TABLE {dst_table_name} ({format} {virtual_format})
310319
ENGINE = MergeTree()
311320
ORDER BY column1;
312321
"""

tests/integration/test_storage_s3_queue/test_0.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import string
66
import time
77
import uuid
8+
from datetime import datetime
89
from multiprocessing.dummy import Pool
910

1011
import pytest
@@ -595,3 +596,46 @@ def test_multiple_tables_meta_mismatch(started_cluster):
595596
"keeper_path": keeper_path,
596597
},
597598
)
599+
600+
601+
def test_virtual_columns(started_cluster):
602+
start_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
603+
node = started_cluster.instances["instance"]
604+
table_name = f"test_s3queue_virtual_columns_{generate_random_string()}"
605+
# A unique path is necessary for repeatable tests
606+
keeper_path = f"/clickhouse/test_{table_name}"
607+
dst_table_name = f"{table_name}_dst"
608+
files_path = f"{table_name}_data"
609+
610+
total_values = generate_random_files(started_cluster, files_path, 1)
611+
create_table(
612+
started_cluster,
613+
node,
614+
table_name,
615+
"ordered",
616+
files_path,
617+
additional_settings={"keeper_path": keeper_path},
618+
)
619+
create_mv(node, table_name, dst_table_name, virtual_columns="_path String, _file String, _size UInt64, _time DateTime")
620+
expected_values = set([tuple(i) for i in total_values])
621+
for i in range(20):
622+
selected_values = {
623+
tuple(map(int, l.split()))
624+
for l in node.query(
625+
f"SELECT column1, column2, column3 FROM {dst_table_name}"
626+
).splitlines()
627+
}
628+
if selected_values == expected_values:
629+
break
630+
time.sleep(1)
631+
assert selected_values == expected_values
632+
virtual_values = node.query(
633+
f"SELECT count(), _path, _file, _size, _time FROM {dst_table_name} GROUP BY _path, _file, _size, _time"
634+
).splitlines()
635+
assert len(virtual_values) > 0
636+
(_, res_path, res_file, res_size, res_time) = virtual_values[0].split("\t")
637+
finish_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
638+
assert f"{files_path}/{res_file}" == res_path
639+
assert int(res_size) > 0
640+
assert start_time <= res_time
641+
assert res_time <= finish_time

0 commit comments

Comments
 (0)