|
12 | 12 | import pytest |
13 | 13 | import requests |
14 | 14 | import urllib3 |
| 15 | +from datetime import datetime, timedelta |
15 | 16 | from minio import Minio |
16 | 17 | from pyiceberg.catalog import load_catalog |
17 | 18 | from pyiceberg.partitioning import PartitionField, PartitionSpec |
18 | 19 | from pyiceberg.schema import Schema |
19 | 20 | from pyiceberg.table.sorting import SortField, SortOrder |
20 | 21 | from pyiceberg.transforms import DayTransform, IdentityTransform |
| 22 | +from helpers.config_cluster import minio_access_key, minio_secret_key |
| 23 | +import decimal |
21 | 24 | from pyiceberg.types import ( |
22 | 25 | DoubleType, |
23 | | - FloatType, |
24 | 26 | NestedField, |
25 | 27 | StringType, |
26 | 28 | StructType, |
27 | 29 | TimestampType, |
| 30 | + MapType, |
| 31 | + DecimalType, |
28 | 32 | ) |
29 | 33 |
|
30 | 34 | from helpers.cluster import ClickHouseCluster, ClickHouseInstance, is_arm |
|
39 | 43 | BASE_URL = "http://glue:3000" |
40 | 44 | BASE_URL_LOCAL_HOST = "http://localhost:3000" |
41 | 45 |
|
def generate_decimal(precision=9, scale=2):
    """Return a random non-negative ``decimal.Decimal`` with ``scale`` fractional digits.

    The value lies in ``[0, 10**(precision - scale) - 1]``, so it always fits
    a ``decimal(precision, scale)`` column.

    Args:
        precision: Total number of significant digits of the target type.
        scale: Number of digits after the decimal point.

    Returns:
        A ``decimal.Decimal`` whose exponent is exactly ``-scale``.

    Raises:
        ValueError: If ``scale`` is negative or greater than ``precision``.
    """
    if not 0 <= scale <= precision:
        raise ValueError(
            f"scale must satisfy 0 <= scale <= precision, got "
            f"precision={precision}, scale={scale}"
        )

    max_whole = 10 ** (precision - scale) - 1
    # Draw the unscaled integer (the value expressed in units of 10**-scale)
    # instead of a float: floats are inexact for large precisions, and
    # rounding a Decimal goes through the current decimal context, which
    # raises InvalidOperation once the result needs more than 28 digits.
    unscaled = random.randint(0, max_whole * 10**scale)
    digits = tuple(int(ch) for ch in str(unscaled))
    # The (sign, digits, exponent) constructor is exact and context-independent.
    return decimal.Decimal((0, digits, -scale))
| 50 | + |
42 | 51 | DEFAULT_SCHEMA = Schema( |
43 | 52 | NestedField( |
44 | 53 | field_id=1, name="datetime", field_type=TimestampType(), required=False |
|
59 | 68 | ), |
60 | 69 | required=False, |
61 | 70 | ), |
| 71 | + NestedField( |
| 72 | + field_id=6, |
| 73 | + name="map_string_decimal", |
| 74 | + field_type=MapType( |
| 75 | + key_type=StringType(), |
| 76 | + value_type=DecimalType(9, 2), |
| 77 | + key_id=7, |
| 78 | + value_id=8, |
| 79 | + value_required=False, |
| 80 | + ), |
| 81 | + required=False, |
| 82 | + ), |
62 | 83 | ) |
63 | 84 |
|
64 | | -DEFAULT_CREATE_TABLE = "CREATE TABLE {}.`{}.{}`\\n(\\n `datetime` Nullable(DateTime64(6)),\\n `symbol` Nullable(String),\\n `bid` Nullable(Float64),\\n `ask` Nullable(Float64),\\n `details` Tuple(created_by Nullable(String))\\n)\\nENGINE = Iceberg(\\'http://minio:9000/warehouse/data/\\', \\'minio\\', \\'[HIDDEN]\\')\n" |
| 85 | +DEFAULT_CREATE_TABLE = "CREATE TABLE {}.`{}.{}`\\n(\\n `datetime` Nullable(DateTime64(6)),\\n `symbol` Nullable(String),\\n `bid` Nullable(Float64),\\n `ask` Nullable(Float64),\\n `details` Tuple(created_by Nullable(String)),\\n `map_string_decimal` Map(String, Nullable(Decimal(9, 2)))\\n)\\nENGINE = Iceberg(\\'http://minio:9000/warehouse-glue/data/\\', \\'minio\\', \\'[HIDDEN]\\')\n" |
65 | 86 |
|
66 | 87 | DEFAULT_PARTITION_SPEC = PartitionSpec( |
67 | 88 | PartitionField( |
@@ -107,15 +128,59 @@ def create_table( |
107 | 128 | ) |
108 | 129 |
|
109 | 130 |
|
110 | | -def generate_record(): |
111 | | - return { |
112 | | - "datetime": datetime.now(), |
113 | | - "symbol": str("kek"), |
114 | | - "bid": round(random.uniform(100, 200), 2), |
115 | | - "ask": round(random.uniform(200, 300), 2), |
116 | | - "details": {"created_by": "Alice Smith"}, |
117 | | - } |
118 | 131 |
|
def generate_arrow_data(num_rows=5):
    """Build a PyArrow table containing ``num_rows`` rows of random data.

    Columns produced:
        datetime: ``timestamp[us]``, a naive UTC time up to 60 minutes in the past.
        symbol: ``string``, one of "AAPL", "GOOG", "MSFT".
        bid / ask: ``float64`` drawn from [100, 150] and [150, 200].
        details: struct with a single ``created_by`` string field.
        map_string_decimal: map from string to ``decimal128(9, 2)`` holding
            one to three entries per row.

    Args:
        num_rows: Number of rows to generate.

    Returns:
        A ``pyarrow.Table`` with the columns listed above.
    """
    # Function-scope import: the module header only imports ``datetime`` and
    # ``timedelta`` from the datetime module.
    from datetime import timezone

    datetimes = []
    symbols = []
    bids = []
    asks = []
    details_created_by = []
    map_keys = []
    map_values = []

    # offsets[i]:offsets[i + 1] delimits the entries of row i inside the
    # flattened map_keys / map_values lists.
    offsets = [0]

    for _ in range(num_rows):
        # datetime.utcnow() is deprecated since Python 3.12; this yields the
        # same naive UTC value, which is what the tz-less timestamp column
        # below expects.
        now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
        datetimes.append(now_utc - timedelta(minutes=random.randint(0, 60)))
        symbols.append(random.choice(["AAPL", "GOOG", "MSFT"]))
        bids.append(random.uniform(100, 150))
        asks.append(random.uniform(150, 200))
        details_created_by.append(random.choice(["alice", "bob", "carol"]))

        # map<string, decimal(9,2)>: between one and three entries per row.
        entry_count = random.randint(1, 3)
        map_keys.extend(f"key{i}" for i in range(entry_count))
        map_values.extend(generate_decimal() for _ in range(entry_count))
        offsets.append(offsets[-1] + entry_count)

    # Struct column for 'details'.
    struct_array = pa.StructArray.from_arrays(
        [pa.array(details_created_by, type=pa.string())],
        names=["created_by"],
    )

    # Map column assembled from the flattened keys/values plus row offsets.
    map_array = pa.MapArray.from_arrays(
        offsets=pa.array(offsets, type=pa.int32()),
        keys=pa.array(map_keys, type=pa.string()),
        items=pa.array(map_values, type=pa.decimal128(9, 2)),
    )

    return pa.table(
        {
            "datetime": pa.array(datetimes, type=pa.timestamp("us")),
            "symbol": pa.array(symbols, type=pa.string()),
            "bid": pa.array(bids, type=pa.float64()),
            "ask": pa.array(asks, type=pa.float64()),
            "details": struct_array,
            "map_string_decimal": map_array,
        }
    )
119 | 184 |
|
120 | 185 | def create_clickhouse_glue_database( |
121 | 186 | started_cluster, node, name, additional_settings={} |
@@ -259,8 +324,7 @@ def test_select(started_cluster): |
259 | 324 | table = create_table(catalog, namespace, table_name) |
260 | 325 |
|
261 | 326 | num_rows = 10 |
262 | | - data = [generate_record() for _ in range(num_rows)] |
263 | | - df = pa.Table.from_pylist(data) |
| 327 | + df = generate_arrow_data(num_rows) |
264 | 328 | table.append(df) |
265 | 329 |
|
266 | 330 | create_clickhouse_glue_database(started_cluster, node, CATALOG_NAME) |
|
0 commit comments