This repository was archived by the owner on May 9, 2024. It is now read-only.

Commit a573239

l0 async fetch
1 parent a46a5e2 commit a573239

File tree

17 files changed: +578 −24 lines

Lines changed: 296 additions & 0 deletions
New Jupyter notebook (kernelspec "omnisci-dev", Python 3.9.16, nbformat 4); its code cells are listed below in percent format, with captured stdout shown as comments.

# %%
# Initialization
import pyhdk
import pandas
import time
import pyarrow as pa
import pyarrow.csv
import os, sys

config = pyhdk.buildConfig(enable_heterogeneous=True,
                           force_heterogeneous_distribution=True,
                           enable_multifrag_heterogeneous=True,
                           enable_debug_timer=True,
                           )
pyhdk.initLogger(log_severity="INFO")
storage = pyhdk.storage.ArrowStorage(1)
data_mgr = pyhdk.storage.DataMgr(config)
data_mgr.registerDataProvider(storage)

calcite = pyhdk.sql.Calcite(storage, config)
executor = pyhdk.Executor(data_mgr, config)

# %%
# Helper Functions
default_step = 50
default_iters = 3

def get_rel_alg(sql):
    return calcite.process(sql)

def run_query(sql):
    ra = get_rel_alg(sql)
    # One RelAlgExecutor per query
    rel_alg_executor = pyhdk.sql.RelAlgExecutor(executor, storage, data_mgr, ra)
    return rel_alg_executor.execute()


def import_hdk_pyarrow(storage, arrow_table, hdk_table_name, fragment_size, overwrite=True):
    """
    Imports a pyarrow table into HDK with the given fragment size.
    overwrite: if True (default), an existing table with the same name is dropped and re-imported.
    """
    opt = pyhdk.storage.TableOptions(fragment_size)
    start_timer = time.perf_counter()
    try:
        storage.importArrowTable(arrow_table, hdk_table_name, opt)
    except Exception:
        if not overwrite:
            raise Exception(f"Cannot overwrite table {hdk_table_name}, overwrite={overwrite}")
        storage.dropTable(hdk_table_name)
        storage.importArrowTable(arrow_table, hdk_table_name, opt)
    print(f"[PyHDK] Importing pyarrow table: {(time.perf_counter()-start_timer):.4f}s")


def run_query_het_all_props(sql, query_name="", prop_step=default_step, n_iters=default_iters, clear_memory_devices=[]):
    """
    Runs the SQL query several times at each GPU proportion; feel free to experiment with the loop order.
    clear_memory_devices: devices whose memory to clear after each iteration: "CPU", "GPU" (mapped to 1 and 2 internally).
    """
    cython_enum_dict = {"CPU": 1, "GPU": 2}  # Could be moved into Cython for an easier interface
    ra = get_rel_alg(sql)
    col_names = ["GPU_prop", f"QueryT_{query_name}"]
    prop_time = {col_names[0]: [], col_names[1]: []}
    # Walk over proportions
    for gpu_proportion in range(0, 101, prop_step):
        # Multiple iterations per proportion
        for _ in range(1, n_iters + 1):
            rel_alg_executor = pyhdk.sql.RelAlgExecutor(executor, storage, data_mgr, ra)
            query_start = time.perf_counter()
            result = rel_alg_executor.execute(forced_gpu_proportion=gpu_proportion)
            query_finish = time.perf_counter()
            prop_time[col_names[0]].append(gpu_proportion)
            prop_time[col_names[1]].append(query_finish - query_start)
            for device in clear_memory_devices:
                executor.clearMemory(data_mgr, cython_enum_dict[device])

    df_prop_time = pandas.DataFrame(prop_time, columns=col_names)
    # Some metadata to get an idea of the output cardinality
    df_output = result.to_arrow().to_pandas()
    output_size_KB = df_output.memory_usage(index=True).sum() // 1024
    df_prop_time.rename(columns={col_names[1]: f"{col_names[1]}_{output_size_KB}KB"}, inplace=True)
    return [df_prop_time, df_output]

def run_queries_all_props(query_dict, step=default_step, n_iters=default_iters, clear_memory_devices=[]):
    """
    Runs a dictionary of SQL queries with the structure dict(query_name: SQL_string).
    clear_memory_devices: devices whose memory to clear after each query: "CPU", "GPU"
    """
    q_timings_dict = dict()
    for q_name in query_dict:
        df_prop_time, df_output = run_query_het_all_props(query_dict[q_name],
                                                          query_name=q_name,
                                                          prop_step=step,
                                                          n_iters=n_iters,
                                                          clear_memory_devices=clear_memory_devices)
        df_prop_time.set_index("GPU_prop", inplace=True)
        q_timings_dict[q_name] = df_prop_time
    return q_timings_dict

def fragment_size_calc(num_rows):
    """Taken from Modin; you can experiment with it."""
    cpu_count = os.cpu_count()
    if cpu_count is not None:
        fragment_size = num_rows // cpu_count
        fragment_size = min(fragment_size, 2**25)
        fragment_size = max(fragment_size, 2**18)
        return fragment_size
    else:
        return None

def fragment_size_test_range(num_rows):
    """
    Take two power-of-two steps around the default frag_size: [x/4, x/2, x, x*2, x*4].
    """
    res_range = []
    default_fragment_size = fragment_size_calc(num_rows)
    print(f"Default fragment_size={default_fragment_size}")
    power_two_steps = 2
    range_start = default_fragment_size // (2**power_two_steps)
    range_end = default_fragment_size * (2**power_two_steps)
    fragment_size = range_start
    while fragment_size < range_end + 1:
        res_range.append(fragment_size)
        fragment_size *= 2
    return res_range

def test_groups_fragment_sizes(storage, pyarrow_tbl, table_name, get_q_dict_callback, step, n_iters, clear_memory_devices=[]):
    """
    Produces the following result grouping: fragment_size{query_name{timings_df}}
    """
    part_group_timings_dict = dict()
    for frag_size in fragment_size_test_range(pyarrow_tbl.num_rows):
        table_size_MB = pyarrow_tbl.nbytes // (1024*1024)
        print(f"Testing {table_size_MB}MB Table with Frag.size={frag_size}")
        refragmented_view_name = f"{table_name}_{frag_size}"
        storage.createRefragmentedView(table_name, refragmented_view_name, frag_size)
        part_group_timings_dict[f"Tbl_size_{table_size_MB}MB_frag_size_{frag_size}"] = run_queries_all_props(
            get_q_dict_callback(refragmented_view_name), step, n_iters, clear_memory_devices)
        storage.dropTable(refragmented_view_name)
    return part_group_timings_dict

# %%
# Generate a synthetic table: seven integer grouping columns plus one float64 column to aggregate
import numpy as np

num_groups = [100 * i for i in range(1, 8)]  # [500, 1000, 2000, 5000, 10000]#, 4000, 30000, 20000, 1000000]
# num_groups = [200, 512, 513, 1000, 2000, 5000, 10000]  # 4000, 10000, 20000, 1000000]
total_rows = int(100*1000*1000)
table_columns = []
column_data = []
chunk_size = 40000
for group in num_groups:
    groups = np.random.randint(0, group, total_rows)
    column_name = f"group_{group}"
    chunks = [pa.array(groups[i:i+chunk_size], pa.int64()) for i in range(0, len(groups), chunk_size)]
    column = pa.chunked_array(chunks)
    table_columns.append(pa.field(column_name, column.type))
    column_data.append(column)

aggregated_data = np.random.uniform(0.0, 1000000.0, total_rows).astype(np.float64)
chunks = [pa.array(aggregated_data[i:i+chunk_size], pa.float64()) for i in range(0, len(aggregated_data), chunk_size)]
aggregated_column = pa.chunked_array(chunks)

# aggregated_data = np.random.randint(0, 1000000, total_rows)
# aggregated_column = pa.array(aggregated_data, pa.int64())
table_columns.append(pa.field("aggregated_data", aggregated_column.type))
column_data.append(aggregated_column)

table_schema = pa.schema(table_columns)
groups_tbl = pa.Table.from_arrays(column_data, schema=table_schema)
print(f"One column of {total_rows/(1000000)} Mil. rows takes {(total_rows*8)/(1024*1024)} MB")
print(f"Chunk size: {len(groups_tbl.column(0).chunks[0])}")

# Output:
#   One column of 100.0 Mil. rows takes 762.939453125 MB
#   Chunk size: 40000

# %%
# Import the generated table into HDK, split into 200 fragments
default_fragment_size = fragment_size_calc(groups_tbl.num_rows)
import_hdk_pyarrow(storage, groups_tbl, "groups_table", groups_tbl.num_rows // 200)

# Output:
#   [PyHDK] Importing pyarrow table: 0.2104s

# %%
# Sanity check: the grouped COUNT(*) sums back to the total row count for every grouping column
for group in num_groups[:8]:
    # ra = get_rel_alg(f"SELECT MIN({groups_tbl.column_names[-1]}), MAX({groups_tbl.column_names[-1]}) FROM groups_table GROUP BY group_{group};")
    ra = get_rel_alg(f"SELECT SUM(x) FROM (SELECT COUNT(*) x FROM groups_table GROUP BY group_{group});")
    gpu_prop = 100
    rel_alg_executor = pyhdk.sql.RelAlgExecutor(executor, storage, data_mgr, ra)
    result = rel_alg_executor.execute(forced_gpu_proportion=50)
    print(result.to_arrow().to_pandas())

# Output (the same frame is printed once per grouping column, 7 times in total):
#         EXPR$0
#   0  100000000

# %%
# Free GPU memory (device enum: 1 = CPU, 2 = GPU)
executor.clearMemory(data_mgr, 2)

# %%
# HDK Cleanup
storage.dropTable("groups_table")

omniscidb/CudaMgr/CudaMgr.cpp

Lines changed: 11 additions & 0 deletions
@@ -112,6 +112,17 @@ void CudaMgr::copyHostToDevice(int8_t* device_ptr,
       cuMemcpyHtoD(reinterpret_cast<CUdeviceptr>(device_ptr), host_ptr, num_bytes));
 }
 
+void CudaMgr::copyHostToDeviceAsyncIfPossible(int8_t* device_ptr,
+                                              const int8_t* host_ptr,
+                                              const size_t num_bytes,
+                                              const int device_num) {
+  if constexpr (async_data_load_available) {
+    copyHostToDeviceAsync(device_ptr, host_ptr, num_bytes, device_num);
+  } else {
+    copyHostToDevice(device_ptr, host_ptr, num_bytes, device_num);
+  }
+}
+
 void CudaMgr::copyHostToDeviceAsync(int8_t* device_ptr,
                                     const int8_t* host_ptr,
                                     const size_t num_bytes,
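
copyHostToDeviceAsyncIfPossible picks between the asynchronous and the blocking copy at compile time via if constexpr. The standalone sketch below is not the CudaMgr implementation; assuming a single device and a single non-default stream, it illustrates what the asynchronous branch amounts to at the CUDA driver API level: queue the host-to-device copy on a stream, and block on that stream only when the data is actually needed (the role synchronizeStream and DeviceAllocator::sync play later in this commit).

// Hedged sketch, not HDK code: enqueue an async H2D copy, then synchronize.
#include <cuda.h>

#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <vector>

#define CU_CHECK(call)                                      \
  do {                                                      \
    CUresult status = (call);                               \
    if (status != CUDA_SUCCESS) {                           \
      std::fprintf(stderr, "CUDA error %d\n", (int)status); \
      std::exit(1);                                         \
    }                                                       \
  } while (0)

int main() {
  CU_CHECK(cuInit(0));
  CUdevice dev;
  CU_CHECK(cuDeviceGet(&dev, 0));
  CUcontext ctx;
  CU_CHECK(cuCtxCreate(&ctx, 0, dev));

  std::vector<int8_t> host(64 << 20, 1);  // 64 MB payload on the host
  CUdeviceptr device_ptr;
  CU_CHECK(cuMemAlloc(&device_ptr, host.size()));

  CUstream stream;
  CU_CHECK(cuStreamCreate(&stream, 0));

  // Analogue of copyHostToDeviceAsync: returns once the copy is queued on the
  // stream. (Full copy/compute overlap additionally needs pinned host memory.)
  CU_CHECK(cuMemcpyHtoDAsync(device_ptr, host.data(), host.size(), stream));

  // ... further fragments could be queued here, or CPU work could proceed ...

  // Analogue of synchronizeStream / DeviceAllocator::sync(): wait until all
  // copies queued on this device's stream have finished.
  CU_CHECK(cuStreamSynchronize(stream));

  CU_CHECK(cuStreamDestroy(stream));
  CU_CHECK(cuMemFree(device_ptr));
  CU_CHECK(cuCtxDestroy(ctx));
  std::puts("async H2D copy complete");
  return 0;
}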

omniscidb/CudaMgr/CudaMgr.h

Lines changed: 5 additions & 0 deletions
@@ -96,6 +96,11 @@ class CudaMgr : public GpuMgr {
                         const size_t num_bytes,
                         const int device_num) override;
 
+  void copyHostToDeviceAsyncIfPossible(int8_t* device_ptr,
+                                       const int8_t* host_ptr,
+                                       const size_t num_bytes,
+                                       const int device_num) override;
+
   void copyHostToDeviceAsync(int8_t* device_ptr,
                              const int8_t* host_ptr,
                              const size_t num_bytes,

omniscidb/CudaMgr/CudaMgrNoCuda.cpp

Lines changed: 7 additions & 0 deletions
@@ -43,6 +43,13 @@ void CudaMgr::copyHostToDeviceAsync(int8_t* device_ptr,
   CHECK(false);
 }
 
+void CudaMgr::copyHostToDeviceAsyncIfPossible(int8_t* device_ptr,
+                                              const int8_t* host_ptr,
+                                              const size_t num_bytes,
+                                              const int device_num) {
+  CHECK(false);
+}
+
 void CudaMgr::synchronizeStream(const int device_num) {
   CHECK(false);
 }

omniscidb/DataMgr/Allocators/DeviceAllocator.h

Lines changed: 1 addition & 0 deletions
@@ -53,4 +53,5 @@ class DeviceAllocator : public Allocator {
   virtual void setDeviceMem(int8_t* device_ptr,
                             unsigned char uc,
                             const size_t num_bytes) const = 0;
+  virtual void sync() = 0;
 };

omniscidb/DataMgr/Allocators/GpuAllocator.cpp

Lines changed: 4 additions & 0 deletions
@@ -84,3 +84,7 @@ void GpuAllocator::setDeviceMem(int8_t* device_ptr,
                                 const size_t num_bytes) const {
   buffer_provider_->setDeviceMem(device_ptr, uc, num_bytes, device_id_);
 }
+
+void GpuAllocator::sync() {
+  buffer_provider_->synchronizeStream(device_id_);
+}
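
GpuAllocator::sync() forwards to buffer_provider_->synchronizeStream(device_id_), giving callers a single point at which to wait for copies that may have been issued asynchronously. The following standalone sketch illustrates that intended usage pattern with a hypothetical, trimmed-down allocator interface (FetchAllocator, copyToDevice, and the CPU-only FakeGpuAllocator are illustrations, not the HDK API): queue one copy per column, then call sync() once before the data is consumed.

#include <cstdint>
#include <cstring>
#include <iostream>
#include <vector>

// Hypothetical, trimmed-down allocator interface; only sync() mirrors the
// method added to DeviceAllocator in this commit.
class FetchAllocator {
 public:
  virtual ~FetchAllocator() = default;
  // Would enqueue an async host-to-device copy on the device's stream.
  virtual void copyToDevice(int8_t* device_ptr,
                            const int8_t* host_ptr,
                            size_t num_bytes) = 0;
  // Would forward to buffer_provider_->synchronizeStream(device_id_).
  virtual void sync() = 0;
};

// CPU stand-in so the sketch runs anywhere: the "device" is host memory and
// copies complete immediately, so sync() has nothing to wait for.
class FakeGpuAllocator : public FetchAllocator {
 public:
  void copyToDevice(int8_t* dst, const int8_t* src, size_t n) override {
    std::memcpy(dst, src, n);
  }
  void sync() override {}
};

// The fetch pattern enabled by sync(): issue every column copy first (they
// may overlap on the stream), then synchronize once before kernels run.
void fetchColumns(FetchAllocator& alloc,
                  const std::vector<std::vector<int8_t>>& host_cols,
                  std::vector<std::vector<int8_t>>& device_cols) {
  device_cols.resize(host_cols.size());
  for (size_t i = 0; i < host_cols.size(); ++i) {
    device_cols[i].resize(host_cols[i].size());
    alloc.copyToDevice(device_cols[i].data(), host_cols[i].data(),
                       host_cols[i].size());
  }
  alloc.sync();  // single wait instead of one blocking copy per column
}

int main() {
  FakeGpuAllocator alloc;
  std::vector<std::vector<int8_t>> host = {{1, 2, 3}, {4, 5}};
  std::vector<std::vector<int8_t>> device;
  fetchColumns(alloc, host, device);
  std::cout << static_cast<int>(device[1][1]) << "\n";  // prints 5
}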

omniscidb/DataMgr/Allocators/GpuAllocator.h

Lines changed: 1 addition & 0 deletions
@@ -58,6 +58,7 @@ class GpuAllocator : public DeviceAllocator {
   void setDeviceMem(int8_t* device_ptr,
                     unsigned char uc,
                     const size_t num_bytes) const override;
+  void sync() override;
 
  private:
  std::vector<Data_Namespace::AbstractBuffer*> owned_buffers_;

omniscidb/DataMgr/BufferMgr/CpuBufferMgr/CpuBuffer.cpp

Lines changed: 2 additions & 1 deletion
@@ -48,7 +48,8 @@ void CpuBuffer::readData(int8_t* const dst,
     memcpy(dst, mem_ + offset, num_bytes);
   } else if (dst_memory_level == GPU_LEVEL) {
     CHECK_GE(dst_device_id, 0);
-    gpu_mgr_->copyHostToDevice(dst, mem_ + offset, num_bytes, dst_device_id);
+    gpu_mgr_->copyHostToDeviceAsyncIfPossible(
+        dst, mem_ + offset, num_bytes, dst_device_id);
   } else {
     LOG(FATAL) << "Unsupported buffer type";
   }

omniscidb/DataMgr/DataMgrBufferProvider.cpp

Lines changed: 1 addition & 5 deletions
@@ -57,11 +57,7 @@ void DataMgrBufferProvider::copyToDeviceAsyncIfPossible(int8_t* device_ptr,
   CHECK(data_mgr_);
   const auto gpu_mgr = data_mgr_->getGpuMgr();
   CHECK(gpu_mgr);
-  if (gpu_mgr->canLoadAsync()) {
-    gpu_mgr->copyHostToDeviceAsync(device_ptr, host_ptr, num_bytes, device_id);
-  } else {
-    gpu_mgr->copyHostToDevice(device_ptr, host_ptr, num_bytes, device_id);
-  }
+  gpu_mgr->copyHostToDeviceAsyncIfPossible(device_ptr, host_ptr, num_bytes, device_id);
 }
 
 void DataMgrBufferProvider::synchronizeStream(const int device_num) const {

omniscidb/DataMgr/GpuMgr.h

Lines changed: 5 additions & 0 deletions
@@ -38,6 +38,11 @@ struct GpuMgr {
                                 const size_t num_bytes,
                                 const int device_num) = 0;
 
+  virtual void copyHostToDeviceAsyncIfPossible(int8_t* device_ptr,
+                                               const int8_t* host_ptr,
+                                               const size_t num_bytes,
+                                               const int device_num) = 0;
+
   virtual void synchronizeStream(const int device_num) = 0;
 
   virtual void copyDeviceToHost(int8_t* host_ptr,
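
Since copyHostToDeviceAsyncIfPossible is a pure virtual on GpuMgr, every backend must provide it, and callers such as CpuBuffer::readData can invoke it unconditionally. A minimal sketch of the implementer's side of that contract, assuming a hypothetical backend with no native asynchronous copy (GpuMgrLike and MinimalGpuMgr are illustrations, not the HDK GpuMgr): the "if possible" entry point degrades to the blocking copy, and synchronizeStream has nothing to wait on.

#include <cstdint>
#include <cstring>

// Reduced, hypothetical version of the GpuMgr interface: just the copy
// entry points relevant to this commit.
struct GpuMgrLike {
  virtual ~GpuMgrLike() = default;
  virtual void copyHostToDevice(int8_t* device_ptr, const int8_t* host_ptr,
                                size_t num_bytes, int device_num) = 0;
  virtual void copyHostToDeviceAsyncIfPossible(int8_t* device_ptr,
                                               const int8_t* host_ptr,
                                               size_t num_bytes,
                                               int device_num) = 0;
  virtual void synchronizeStream(int device_num) = 0;
};

// A backend without an async copy path: "async if possible" degrades to the
// plain blocking copy, and there is nothing for synchronizeStream to wait on.
struct MinimalGpuMgr : GpuMgrLike {
  void copyHostToDevice(int8_t* device_ptr, const int8_t* host_ptr,
                        size_t num_bytes, int /*device_num*/) override {
    std::memcpy(device_ptr, host_ptr, num_bytes);  // stand-in for a real H2D copy
  }
  void copyHostToDeviceAsyncIfPossible(int8_t* device_ptr, const int8_t* host_ptr,
                                       size_t num_bytes, int device_num) override {
    copyHostToDevice(device_ptr, host_ptr, num_bytes, device_num);
  }
  void synchronizeStream(int /*device_num*/) override {}  // no queued work
};

int main() {
  MinimalGpuMgr mgr;
  int8_t src[3] = {7, 8, 9};
  int8_t dst[3] = {};
  mgr.copyHostToDeviceAsyncIfPossible(dst, src, sizeof(src), /*device_num=*/0);
  mgr.synchronizeStream(0);  // safe to call even though nothing was queued
  return dst[2] == 9 ? 0 : 1;
}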

0 commit comments
