Commit 78f5d2e

Performance/9697143845/mvp arrow performance profiling and optimisation (#2573)
#### Reference Issues/PRs

[9697143845](https://man312219.monday.com/boards/7852509418/pulses/9697143845)

#### What does this implement or fix?

Adds basic ASV benchmarks for Arrow reads, and improves the performance of reading strings. Eventually we can parametrize all the benchmarks with Arrow as the input/output formats; this just makes sure we don't break anything egregiously until we get to that point.

#### Benchmarking findings summary

Timings:

- Plain read calls `lib.read(sym)` (see the usage sketch after this summary)
  - Numeric data. Tested with 10 columns / 1, 1k, 100k, 1m, and 100m rows of int64 data
    - Arrow performance is identical to returning Pandas when consolidation is disabled (the default with the V2 API for Pandas >= 2.0.0). This is expected: the only real difference in this case is that the output frame is made up of multiple buffers per column with Arrow and just one with Pandas, but the timing difference from this is negligible, and the time is dominated by decoding + memcpying in both cases.
    - Pandas with consolidation enabled (the default inside Man) is strictly slower, although this is not noticeable at very low row counts.
  - String data. Tested with 10 columns / 1, 100, 10k, and 1m rows / 1, 100, and 100k unique 10-character strings
    - No noticeable difference with <= 100 rows, as timings are dominated by other operations
    - On a single core, Arrow was ~2x faster than Pandas before this change, and ~3x faster after it
    - Using more cores increased the performance difference
- Date range read calls `lib.read(sym, date_range=blah)`
  - Tested with a variety of ranges to exercise the various Arrow truncation code paths and Pandas post-processing
  - Numeric data. Tested with 10 columns / 1m rows of int64 data
    - The additional memcpys are visible in the timings
    - In the pathological case of excluding just 1 row from each end of the symbol, this adds ~17% compared to Pandas with consolidation disabled, taking performance back to about where Pandas is with consolidation enabled
  - String data. Tested with 10 columns / 1m rows / 1, 100, and 100k unique 10-character strings
    - The same performance hit is observed as with numeric data
    - But because string performance is faster than with Pandas, overall it is still always faster to use Arrow

Peak memory usage:

- For reading 10 columns x 100m rows = 1B 64-bit integers (8GB of data), Arrow needs a few hundred MB in addition to the raw data size, and Pandas without consolidation is similar. Pandas with consolidation needs about double, as it memcpys all of the buffers we return into one larger buffer.
- Reading 10 columns x 1m rows = 10m strings with 1/100/100k unique strings:
  - Arrow needs 400MB/400MB/666MB
  - Pandas without consolidation needs 400MB/400MB/900MB
  - Pandas with consolidation needs 500MB/500MB/1GB
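A minimal usage sketch of the read path these numbers cover (illustrative only, not part of the commit: the URI, library, symbol, and tiny DataFrame are made up; the `output_format` flag is the one used by the new benchmarks below):

```python
import pandas as pd
from arcticdb import Arctic, OutputFormat

# Hypothetical LMDB-backed setup; any backend is read the same way
ac = Arctic("lmdb://arrow_demo", output_format=OutputFormat.EXPERIMENTAL_ARROW)
lib = ac.get_library("demo", create_if_missing=True)

df = pd.DataFrame(
    {"col0": range(5)},
    index=pd.date_range("1970-01-01", freq="ns", periods=5),
)
lib.write("sym", df)

# Plain read: .data comes back in the Arrow output format rather than as a Pandas DataFrame
table = lib.read("sym").data

# Date range read, which exercises the Arrow truncation code paths benchmarked above
table = lib.read("sym", date_range=(pd.Timestamp(1), pd.Timestamp(3))).data
```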
1 parent 2785b43 commit 78f5d2e

File tree

3 files changed: +245 -16 lines changed


cpp/arcticdb/arrow/arrow_handlers.cpp

Lines changed: 16 additions & 16 deletions
@@ -63,41 +63,41 @@ void ArrowStringHandler::convert_type(
     struct DictEntry {
         int32_t offset_buffer_pos_;
         int64_t string_buffer_pos_;
+        std::string_view strv;
     };
     std::vector<StringPool::offset_t> unique_offsets_in_order;
     ankerl::unordered_dense::map<StringPool::offset_t, DictEntry> unique_offsets;
+    // Trade some memory for more performance
+    // TODO: Use unique count column stat in V2 encoding
+    unique_offsets_in_order.reserve(source_column.row_count());
+    unique_offsets.reserve(source_column.row_count());
     int64_t bytes = 0;
+    int32_t unique_offset_count = 0;
     auto dest_ptr = reinterpret_cast<int32_t*>(dest_column.bytes_at(mapping.offset_bytes_, source_column.row_count() * sizeof(int32_t)));

     // First go through the source column once to compute the size of offset and string buffers.
     while(pos != end) {
-        auto [entry, is_emplaced] = unique_offsets.try_emplace(*pos, DictEntry{static_cast<int32_t>(unique_offsets_in_order.size()), bytes});
+        auto [entry, is_emplaced] = unique_offsets.try_emplace(*pos, DictEntry{unique_offset_count, bytes, string_pool->get_const_view(*pos)});
         if(is_emplaced) {
-            bytes += string_pool->get_const_view(*pos).size();
+            bytes += entry->second.strv.size();
             unique_offsets_in_order.push_back(*pos);
+            ++unique_offset_count;
         }
         ++pos;
-        *dest_ptr = entry->second.offset_buffer_pos_;
-        ++dest_ptr;
+        *dest_ptr++ = entry->second.offset_buffer_pos_;
     }
     auto& string_buffer = dest_column.create_extra_buffer(mapping.offset_bytes_, ExtraBufferType::STRING, bytes, AllocationType::DETACHABLE);
     auto& offsets_buffer = dest_column.create_extra_buffer(mapping.offset_bytes_, ExtraBufferType::OFFSET, (unique_offsets_in_order.size() + 1) * sizeof(int64_t), AllocationType::DETACHABLE);
-
     // Then go through unique_offsets to fill up the offset and string buffers.
     auto offsets_ptr = reinterpret_cast<int64_t*>(offsets_buffer.data());
     auto string_ptr = reinterpret_cast<char*>(string_buffer.data());
-    auto string_begin_ptr = string_ptr;
-    for(auto i=0u; i<unique_offsets_in_order.size(); ++i) {
-        auto string_pool_offset = unique_offsets_in_order[i];
-        auto& entry = unique_offsets[string_pool_offset];
-        util::check(static_cast<int32_t>(i) == entry.offset_buffer_pos_, "Mismatch in offset buffer pos");
-        util::check(string_ptr - string_begin_ptr == entry.string_buffer_pos_, "Mismatch in string buffer pos");
-        offsets_ptr[i] = entry.string_buffer_pos_;
-        const auto strv = string_pool->get_const_view(string_pool_offset);
-        memcpy(string_ptr, strv.data(), strv.size());
-        string_ptr += strv.size();
+    for (auto unique_offset: unique_offsets_in_order) {
+        const auto& entry = unique_offsets[unique_offset];
+        *offsets_ptr++ = entry.string_buffer_pos_;
+        memcpy(string_ptr, entry.strv.data(), entry.strv.size());
+        string_ptr += entry.strv.size();
     }
-    offsets_ptr[unique_offsets_in_order.size()] = bytes;
+    *offsets_ptr = bytes;
 }

 TypeDescriptor ArrowStringHandler::output_type(const TypeDescriptor&) const {
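The structural change above is that `DictEntry` now caches the `std::string_view` fetched from the string pool on the first pass, so the second pass that fills the offset and string buffers no longer calls `string_pool->get_const_view` (or the per-string checks) again, and both containers are reserved up front. As a rough illustration of the two-pass dictionary build the handler performs, here is a small Python sketch (plain Python strings stand in for string-pool offsets and raw Arrow buffers; this is not the ArcticDB implementation):

```python
# Illustrative sketch of a two-pass Arrow-style dictionary build.
def build_arrow_dictionary(values):
    entries = {}       # value -> (code, byte offset into the string buffer, cached string)
    order = []         # unique values in first-seen order
    codes = []         # int32-style codes buffer, one per row
    total_bytes = 0

    # Pass 1: assign each unique string a code and a byte position, caching the string itself.
    for value in values:
        if value not in entries:
            entries[value] = (len(order), total_bytes, value)
            order.append(value)
            total_bytes += len(value.encode())
        codes.append(entries[value][0])

    # Pass 2: emit the offsets buffer and the concatenated string buffer from the cached
    # strings, without looking anything up in the original source again.
    offsets = []
    data = bytearray()
    for value in order:
        _, offset, cached = entries[value]
        offsets.append(offset)
        data.extend(cached.encode())
    offsets.append(total_bytes)  # final offset closes the last string

    return codes, offsets, bytes(data)


codes, offsets, data = build_arrow_dictionary(["a", "bb", "a", "ccc"])
# codes == [0, 1, 0, 2], offsets == [0, 1, 3, 6], data == b"abbccc"
```

The first pass assigns codes and byte offsets while caching each unique string; the second pass only walks the insertion-ordered uniques and copies the cached bytes, which is what storing the string_view in `DictEntry` achieves in the C++ change.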

python/.asv/results/benchmarks.json

Lines changed: 116 additions & 0 deletions
@@ -1,4 +1,120 @@
 {
+    "arrow.ArrowReadNumeric.peakmem_read": {
+        "code": "class ArrowReadNumeric:\n def peakmem_read(self, rows, date_range):\n self.lib.read(self.symbol_name(rows), date_range=self.date_range)\n\n def setup(self, rows, date_range):\n self.ac = Arctic(self.connection_string, output_format=OutputFormat.EXPERIMENTAL_ARROW)\n self.lib = self.ac.get_library(self.lib_name)\n if date_range is None:\n self.date_range = None\n else:\n # Create a date range that excludes the first and last 10 rows of the data only\n self.date_range = (pd.Timestamp(10), pd.Timestamp(rows - 10))\n\n def setup_cache(self):\n self.ac = Arctic(self.connection_string, output_format=OutputFormat.EXPERIMENTAL_ARROW)\n num_rows, date_ranges = self.params\n num_cols = 9 # 10 including the index column\n self.ac.delete_library(self.lib_name)\n self.ac.create_library(self.lib_name)\n lib = self.ac.get_library(self.lib_name)\n for rows in num_rows:\n df = pd.DataFrame(\n {\n f\"col{idx}\": np.arange(idx * rows, (idx + 1) * rows, dtype=np.int64) for idx in range(num_cols)\n },\n index = pd.date_range(\"1970-01-01\", freq=\"ns\", periods=rows)\n )\n lib.write(self.symbol_name(rows), df)",
+        "name": "arrow.ArrowReadNumeric.peakmem_read",
+        "param_names": [
+            "rows",
+            "date_range"
+        ],
+        "params": [
+            [
+                "100000",
+                "100000000"
+            ],
+            [
+                "None",
+                "'middle'"
+            ]
+        ],
+        "setup_cache_key": "arrow:30",
+        "timeout": 6000,
+        "type": "peakmemory",
+        "unit": "bytes",
+        "version": "f41e907f991caa155f765981fb845308e6ac55ba912a18f44b73ec87587d3667"
+    },
+    "arrow.ArrowReadNumeric.time_read": {
+        "code": "class ArrowReadNumeric:\n def time_read(self, rows, date_range):\n self.lib.read(self.symbol_name(rows), date_range=self.date_range)\n\n def setup(self, rows, date_range):\n self.ac = Arctic(self.connection_string, output_format=OutputFormat.EXPERIMENTAL_ARROW)\n self.lib = self.ac.get_library(self.lib_name)\n if date_range is None:\n self.date_range = None\n else:\n # Create a date range that excludes the first and last 10 rows of the data only\n self.date_range = (pd.Timestamp(10), pd.Timestamp(rows - 10))\n\n def setup_cache(self):\n self.ac = Arctic(self.connection_string, output_format=OutputFormat.EXPERIMENTAL_ARROW)\n num_rows, date_ranges = self.params\n num_cols = 9 # 10 including the index column\n self.ac.delete_library(self.lib_name)\n self.ac.create_library(self.lib_name)\n lib = self.ac.get_library(self.lib_name)\n for rows in num_rows:\n df = pd.DataFrame(\n {\n f\"col{idx}\": np.arange(idx * rows, (idx + 1) * rows, dtype=np.int64) for idx in range(num_cols)\n },\n index = pd.date_range(\"1970-01-01\", freq=\"ns\", periods=rows)\n )\n lib.write(self.symbol_name(rows), df)",
+        "min_run_count": 2,
+        "name": "arrow.ArrowReadNumeric.time_read",
+        "number": 5,
+        "param_names": [
+            "rows",
+            "date_range"
+        ],
+        "params": [
+            [
+                "100000",
+                "100000000"
+            ],
+            [
+                "None",
+                "'middle'"
+            ]
+        ],
+        "repeat": 0,
+        "rounds": 1,
+        "sample_time": 0.01,
+        "setup_cache_key": "arrow:30",
+        "timeout": 6000,
+        "type": "time",
+        "unit": "seconds",
+        "version": "2e1ac87c8fa79349da6d59f4f8618a1fdb207b72b692092d0c9c2c69c26c297f",
+        "warmup_time": 0
+    },
+    "arrow.ArrowReadStrings.peakmem_read": {
+        "code": "class ArrowReadStrings:\n def peakmem_read(self, rows, date_range, unique_string_count):\n self.lib.read(self.symbol_name(rows, unique_string_count), date_range=self.date_range)\n\n def setup(self, rows, date_range, unique_string_count):\n self.ac = Arctic(self.connection_string, output_format=OutputFormat.EXPERIMENTAL_ARROW)\n self.lib = self.ac.get_library(self.lib_name)\n if date_range is None:\n self.date_range = None\n else:\n # Create a date range that excludes the first and last 10 rows of the data only\n self.date_range = (pd.Timestamp(10), pd.Timestamp(rows - 10))\n\n def setup_cache(self):\n rng = np.random.default_rng()\n self.ac = Arctic(self.connection_string, output_format=OutputFormat.EXPERIMENTAL_ARROW)\n num_rows, date_ranges, unique_string_counts = self.params\n num_cols = 10\n self.ac.delete_library(self.lib_name)\n self.ac.create_library(self.lib_name)\n lib = self.ac.get_library(self.lib_name)\n for unique_string_count in unique_string_counts:\n strings = np.array(random_strings_of_length(unique_string_count, 10, unique=True))\n for rows in num_rows:\n df = pd.DataFrame(\n {\n f\"col{idx}\": rng.choice(strings, rows) for idx in range(num_cols)\n },\n index = pd.date_range(\"1970-01-01\", freq=\"ns\", periods=rows)\n )\n lib.write(self.symbol_name(rows, unique_string_count), df)",
+        "name": "arrow.ArrowReadStrings.peakmem_read",
+        "param_names": [
+            "rows",
+            "date_range",
+            "unique_string_count"
+        ],
+        "params": [
+            [
+                "10000",
+                "1000000"
+            ],
+            [
+                "None",
+                "'middle'"
+            ],
+            [
+                "1",
+                "100",
+                "100000"
+            ]
+        ],
+        "setup_cache_key": "arrow:78",
+        "timeout": 6000,
+        "type": "peakmemory",
+        "unit": "bytes",
+        "version": "daf31f0aa67cce7ace0d495e3648983ba46ca8cb5a4184f5875421398de5a862"
+    },
+    "arrow.ArrowReadStrings.time_read": {
+        "code": "class ArrowReadStrings:\n def time_read(self, rows, date_range, unique_string_count):\n self.lib.read(self.symbol_name(rows, unique_string_count), date_range=self.date_range)\n\n def setup(self, rows, date_range, unique_string_count):\n self.ac = Arctic(self.connection_string, output_format=OutputFormat.EXPERIMENTAL_ARROW)\n self.lib = self.ac.get_library(self.lib_name)\n if date_range is None:\n self.date_range = None\n else:\n # Create a date range that excludes the first and last 10 rows of the data only\n self.date_range = (pd.Timestamp(10), pd.Timestamp(rows - 10))\n\n def setup_cache(self):\n rng = np.random.default_rng()\n self.ac = Arctic(self.connection_string, output_format=OutputFormat.EXPERIMENTAL_ARROW)\n num_rows, date_ranges, unique_string_counts = self.params\n num_cols = 10\n self.ac.delete_library(self.lib_name)\n self.ac.create_library(self.lib_name)\n lib = self.ac.get_library(self.lib_name)\n for unique_string_count in unique_string_counts:\n strings = np.array(random_strings_of_length(unique_string_count, 10, unique=True))\n for rows in num_rows:\n df = pd.DataFrame(\n {\n f\"col{idx}\": rng.choice(strings, rows) for idx in range(num_cols)\n },\n index = pd.date_range(\"1970-01-01\", freq=\"ns\", periods=rows)\n )\n lib.write(self.symbol_name(rows, unique_string_count), df)",
+        "min_run_count": 2,
+        "name": "arrow.ArrowReadStrings.time_read",
+        "number": 5,
+        "param_names": [
+            "rows",
+            "date_range",
+            "unique_string_count"
+        ],
+        "params": [
+            [
+                "10000",
+                "1000000"
+            ],
+            [
+                "None",
+                "'middle'"
+            ],
+            [
+                "1",
+                "100",
+                "100000"
+            ]
+        ],
+        "repeat": 0,
+        "rounds": 1,
+        "sample_time": 0.01,
+        "setup_cache_key": "arrow:78",
+        "timeout": 6000,
+        "type": "time",
+        "unit": "seconds",
+        "version": "5879913710f02f75634c0d679be8af1f8ec3e017af5a6300c3ee965779f5ffbb",
+        "warmup_time": 0
+    },
     "basic_functions.BasicFunctions.peakmem_read": {
         "code": "class BasicFunctions:\n def peakmem_read(self, rows):\n self.lib.read(f\"sym\").data\n\n def setup(self, rows):\n self.ac = Arctic(BasicFunctions.CONNECTION_STRING)\n \n self.df = generate_pseudo_random_dataframe(rows)\n self.df_short_wide = generate_random_floats_dataframe(BasicFunctions.WIDE_DF_ROWS, BasicFunctions.WIDE_DF_COLS)\n \n self.lib = self.ac[get_prewritten_lib_name(rows)]\n self.fresh_lib = self.get_fresh_lib()\n\n def setup_cache(self):\n self.ac = Arctic(BasicFunctions.CONNECTION_STRING)\n rows_values = BasicFunctions.params\n \n self.dfs = {rows: generate_pseudo_random_dataframe(rows) for rows in rows_values}\n for rows in rows_values:\n lib = get_prewritten_lib_name(rows)\n self.ac.delete_library(lib)\n self.ac.create_library(lib)\n lib = self.ac[lib]\n lib.write(f\"sym\", self.dfs[rows])\n \n lib_name = get_prewritten_lib_name(BasicFunctions.WIDE_DF_ROWS)\n self.ac.delete_library(lib_name)\n lib = self.ac.create_library(lib_name)\n lib.write(\n \"short_wide_sym\",\n generate_random_floats_dataframe(BasicFunctions.WIDE_DF_ROWS, BasicFunctions.WIDE_DF_COLS),\n )\n \n lib_name = get_prewritten_lib_name(BasicFunctions.ULTRA_SHORT_WIDE_DF_ROWS)\n self.ac.delete_library(lib_name)\n lib = self.ac.create_library(lib_name)\n lib.write(\n \"ultra_short_wide_sym\",\n generate_random_floats_dataframe(BasicFunctions.ULTRA_SHORT_WIDE_DF_ROWS, BasicFunctions.WIDE_DF_COLS),\n )",
         "name": "basic_functions.BasicFunctions.peakmem_read",

python/benchmarks/arrow.py

Lines changed: 113 additions & 0 deletions
@@ -0,0 +1,113 @@
+"""
+Copyright 2025 Man Group Operations Limited
+
+Use of this software is governed by the Business Source License 1.1 included in the file licenses/BSL.txt.
+
+As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0.
+"""
+
+
+import numpy as np
+import pandas as pd
+
+from arcticdb import Arctic, OutputFormat
+from arcticdb.util.test import random_strings_of_length
+
+
+class ArrowReadNumeric:
+    number = 5
+    warmup_time = 0
+    timeout = 6000
+    rounds = 1
+    connection_string = "lmdb://arrow_read_numeric?map_size=20GB"
+    lib_name = "arrow_read_numeric"
+    params = ([100_000, 100_000_000], [None, "middle"])
+    param_names = ["rows", "date_range"]
+
+    def symbol_name(self, num_rows: int):
+        return f"numeric_{num_rows}_rows"
+
+    def setup_cache(self):
+        self.ac = Arctic(self.connection_string, output_format=OutputFormat.EXPERIMENTAL_ARROW)
+        num_rows, date_ranges = self.params
+        num_cols = 9 # 10 including the index column
+        self.ac.delete_library(self.lib_name)
+        self.ac.create_library(self.lib_name)
+        lib = self.ac.get_library(self.lib_name)
+        for rows in num_rows:
+            df = pd.DataFrame(
+                {
+                    f"col{idx}": np.arange(idx * rows, (idx + 1) * rows, dtype=np.int64) for idx in range(num_cols)
+                },
+                index = pd.date_range("1970-01-01", freq="ns", periods=rows)
+            )
+            lib.write(self.symbol_name(rows), df)
+
+    def teardown(self, rows, date_range):
+        del self.ac
+
+    def setup(self, rows, date_range):
+        self.ac = Arctic(self.connection_string, output_format=OutputFormat.EXPERIMENTAL_ARROW)
+        self.lib = self.ac.get_library(self.lib_name)
+        if date_range is None:
+            self.date_range = None
+        else:
+            # Create a date range that excludes the first and last 10 rows of the data only
+            self.date_range = (pd.Timestamp(10), pd.Timestamp(rows - 10))
+
+    def time_read(self, rows, date_range):
+        self.lib.read(self.symbol_name(rows), date_range=self.date_range)
+
+    def peakmem_read(self, rows, date_range):
+        self.lib.read(self.symbol_name(rows), date_range=self.date_range)
+
+
+class ArrowReadStrings:
+    number = 5
+    warmup_time = 0
+    timeout = 6000
+    rounds = 1
+    connection_string = "lmdb://arrow_read_strings?map_size=20GB"
+    lib_name = "arrow_read_strings"
+    params = ([10_000, 1_000_000], [None, "middle"], [1, 100, 100_000])
+    param_names = ["rows", "date_range", "unique_string_count"]
+
+    def symbol_name(self, num_rows: int, unique_strings: int):
+        return f"string_{num_rows}_rows_{unique_strings}_unique_strings"
+
+    def setup_cache(self):
+        rng = np.random.default_rng()
+        self.ac = Arctic(self.connection_string, output_format=OutputFormat.EXPERIMENTAL_ARROW)
+        num_rows, date_ranges, unique_string_counts = self.params
+        num_cols = 10
+        self.ac.delete_library(self.lib_name)
+        self.ac.create_library(self.lib_name)
+        lib = self.ac.get_library(self.lib_name)
+        for unique_string_count in unique_string_counts:
+            strings = np.array(random_strings_of_length(unique_string_count, 10, unique=True))
+            for rows in num_rows:
+                df = pd.DataFrame(
+                    {
+                        f"col{idx}": rng.choice(strings, rows) for idx in range(num_cols)
+                    },
+                    index = pd.date_range("1970-01-01", freq="ns", periods=rows)
+                )
+                lib.write(self.symbol_name(rows, unique_string_count), df)
+
+    def teardown(self, rows, date_range, unique_string_count):
+        del self.ac
+
+    def setup(self, rows, date_range, unique_string_count):
+        self.ac = Arctic(self.connection_string, output_format=OutputFormat.EXPERIMENTAL_ARROW)
+        self.lib = self.ac.get_library(self.lib_name)
+        if date_range is None:
+            self.date_range = None
+        else:
+            # Create a date range that excludes the first and last 10 rows of the data only
+            self.date_range = (pd.Timestamp(10), pd.Timestamp(rows - 10))
+
+    def time_read(self, rows, date_range, unique_string_count):
+        self.lib.read(self.symbol_name(rows, unique_string_count), date_range=self.date_range)
+
+    def peakmem_read(self, rows, date_range, unique_string_count):
+        self.lib.read(self.symbol_name(rows, unique_string_count), date_range=self.date_range)
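For reference, a quick standalone check (not part of the commit) of what the "middle" `date_range` above selects: `setup_cache` writes a nanosecond-frequency index starting at the epoch, so row `i` sits at `i` nanoseconds and `(pd.Timestamp(10), pd.Timestamp(rows - 10))` trims roughly ten rows from each end.

```python
import pandas as pd

rows = 1_000
index = pd.date_range("1970-01-01", freq="ns", periods=rows)

# pd.Timestamp(10) is 10ns after the epoch, i.e. the 11th row of the index
start, end = pd.Timestamp(10), pd.Timestamp(rows - 10)
selected = index[(index >= start) & (index <= end)]

print(len(index), len(selected))  # 1000 981: the first 10 and last 9 rows are excluded
```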
