Skip to content

Commit 12b43e3

Browse files
committed
PAX: optimize io read for multiple discrete columns in a group
When reading multiple discrete columns in a group, the code reads columnar data block by block in synchronous mode. This means that all I/O requests on the columnar data are completed serially, which is inefficient. This commit uses io_uring to submit a batch of I/O requests so that the OS can process them in parallel for better throughput. libaio is another candidate, but it did not bring any improvement in our benchmark tests (without O_DIRECT).
1 parent 9210269 commit 12b43e3

File tree

10 files changed

+314
-8
lines changed

10 files changed

+314
-8
lines changed

contrib/pax_storage/doc/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ PAX will be built with `--enable-pax` when you build the Cloudberry. Dependency
4646
- **CMake**: 3.11 or later
4747
- **Protobuf**: 3.5.0 or later
4848
- **ZSTD (libzstd)**: 1.4.0 or later
49+
- **liburing**: 2.12 or later
4950

5051
Also, you need to run the following command at the top level of the Cloudberry source code directory to download the submodules:
5152

contrib/pax_storage/src/cpp/cmake/pax.cmake

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ set(pax_comm_src
3030
comm/bitmap.cc
3131
comm/bloomfilter.cc
3232
comm/byte_buffer.cc
33+
comm/fast_io.cc
3334
comm/guc.cc
3435
comm/paxc_wrappers.cc
3536
comm/pax_memory.cc
@@ -173,7 +174,7 @@ add_subdirectory(contrib/tabulate)
173174
set(pax_target_src ${PROTO_SRCS} ${pax_storage_src} ${pax_clustering_src} ${pax_exceptions_src}
174175
${pax_access_src} ${pax_comm_src} ${pax_catalog_src} ${pax_vec_src})
175176
set(pax_target_include ${pax_target_include} ${ZTSD_HEADER} ${CMAKE_CURRENT_SOURCE_DIR} ${CBDB_INCLUDE_DIR} contrib/tabulate/include)
176-
set(pax_target_link_libs ${pax_target_link_libs} protobuf zstd z postgres)
177+
set(pax_target_link_libs ${pax_target_link_libs} protobuf zstd z postgres uring)
177178
if (PAX_USE_LZ4)
178179
list(APPEND pax_target_link_libs lz4)
179180
endif()
@@ -207,7 +208,7 @@ endif(VEC_BUILD)
207208

208209
target_include_directories(pax PUBLIC ${pax_target_include})
209210
target_link_directories(pax PUBLIC ${pax_target_link_directories})
210-
target_link_libraries(pax PUBLIC ${pax_target_link_libs})
211+
target_link_libraries(pax PRIVATE ${pax_target_link_libs})
211212
set_target_properties(pax PROPERTIES
212213
BUILD_RPATH_USE_ORIGIN ON
213214
BUILD_WITH_INSTALL_RPATH ON
@@ -233,8 +234,8 @@ if (BUILD_GTEST)
233234
add_dependencies(test_main ${pax_target_dependencies} gtest gmock)
234235
target_include_directories(test_main PUBLIC ${pax_target_include} ${CMAKE_CURRENT_SOURCE_DIR} ${gtest_SOURCE_DIR}/include contrib/cpp-stub/src/ contrib/cpp-stub/src_linux/)
235236

236-
target_link_directories(test_main PUBLIC ${pax_target_link_directories})
237-
target_link_libraries(test_main PUBLIC ${pax_target_link_libs} gtest gmock postgres)
237+
target_link_directories(test_main PRIVATE ${pax_target_link_directories})
238+
target_link_libraries(test_main PRIVATE ${pax_target_link_libs} gtest gmock postgres)
238239
endif(BUILD_GTEST)
239240

240241
if(BUILD_GBENCH)

contrib/pax_storage/src/cpp/cmake/pax_format.cmake

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ set(pax_comm_src
2020
comm/bitmap.cc
2121
comm/bloomfilter.cc
2222
comm/byte_buffer.cc
23+
comm/fast_io.cc
2324
comm/guc.cc
2425
comm/paxc_wrappers.cc
2526
comm/pax_memory.cc
@@ -108,7 +109,7 @@ set(pax_vec_src ${pax_vec_src}
108109
endif()
109110

110111
set(pax_target_include ${ZTSD_HEADER} ${CMAKE_CURRENT_SOURCE_DIR} ${CBDB_INCLUDE_DIR} contrib/tabulate/include)
111-
set(pax_target_link_libs uuid protobuf zstd z)
112+
set(pax_target_link_libs uuid protobuf zstd z uring)
112113
if (PAX_USE_LZ4)
113114
list(APPEND pax_target_link_libs lz4)
114115
endif()
@@ -135,7 +136,7 @@ endif(VEC_BUILD)
135136
add_library(paxformat SHARED ${PROTO_SRCS} ${pax_storage_src} ${pax_clustering_src} ${pax_exceptions_src} ${pax_comm_src} ${pax_vec_src})
136137
target_include_directories(paxformat PUBLIC ${pax_target_include})
137138
target_link_directories(paxformat PUBLIC ${pax_target_link_directories})
138-
target_link_libraries(paxformat PUBLIC ${pax_target_link_libs})
139+
target_link_libraries(paxformat PRIVATE ${pax_target_link_libs})
139140

140141
set_target_properties(paxformat PROPERTIES
141142
OUTPUT_NAME paxformat)
@@ -196,4 +197,4 @@ install(TARGETS paxformat
196197
add_executable(paxformat_test paxformat_test.cc)
197198
target_include_directories(paxformat_test PUBLIC ${pax_target_include} ${CMAKE_CURRENT_SOURCE_DIR})
198199
add_dependencies(paxformat_test paxformat)
199-
target_link_libraries(paxformat_test PUBLIC paxformat postgres)
200+
target_link_libraries(paxformat_test PRIVATE paxformat postgres)
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*-------------------------------------------------------------------------
2+
*
3+
* Licensed to the Apache Software Foundation (ASF) under one
4+
* or more contributor license agreements. See the NOTICE file
5+
* distributed with this work for additional information
6+
* regarding copyright ownership. The ASF licenses this file
7+
* to you under the Apache License, Version 2.0 (the
8+
* "License"); you may not use this file except in compliance
9+
* with the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing,
14+
* software distributed under the License is distributed on an
15+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
* KIND, either express or implied. See the License for the
17+
* specific language governing permissions and limitations
18+
* under the License.
19+
*
20+
* common_io.h
21+
*
22+
* IDENTIFICATION
23+
* contrib/pax_storage/src/cpp/comm/common_io.h
24+
*
25+
*-------------------------------------------------------------------------
26+
*/
27+
28+
#pragma once
29+
#include <cstddef>
30+
#include <fcntl.h>
31+
32+
namespace pax
33+
{
34+
// Describes a single positional read: fill `buffer` with `size` bytes taken
// from the file starting at byte `offset`.
struct IORequest {
  // Destination memory for the data that is read.
  void *buffer;
  // Number of bytes requested.
  size_t size;
  // Absolute file offset at which the read starts.
  off_t offset;
};
39+
} // namespace pax
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
/*-------------------------------------------------------------------------
2+
*
3+
* Licensed to the Apache Software Foundation (ASF) under one
4+
* or more contributor license agreements. See the NOTICE file
5+
* distributed with this work for additional information
6+
* regarding copyright ownership. The ASF licenses this file
7+
* to you under the Apache License, Version 2.0 (the
8+
* "License"); you may not use this file except in compliance
9+
* with the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing,
14+
* software distributed under the License is distributed on an
15+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
* KIND, either express or implied. See the License for the
17+
* specific language governing permissions and limitations
18+
* under the License.
19+
*
20+
* fast_io.cc
21+
*
22+
* IDENTIFICATION
23+
* contrib/pax_storage/src/cpp/comm/fast_io.cc
24+
*
25+
*-------------------------------------------------------------------------
26+
*/
27+
28+
#include "fast_io.h"
29+
30+
namespace pax
31+
{
32+
33+
bool IOUringFastIO::available() {
34+
static char support_io_uring = 0;
35+
36+
if (support_io_uring == 1) return true;
37+
if (support_io_uring == -1) return false;
38+
39+
struct io_uring ring;
40+
bool supported = (io_uring_queue_init(128, &ring, 0) == 0);
41+
if (supported) {
42+
io_uring_queue_exit(&ring);
43+
}
44+
support_io_uring = supported ? 1 : -1;
45+
return supported;
46+
}
47+
48+
// if pair.first == 0, all read requests are successful
49+
// pair.second indicates the number of successful read requests
50+
std::pair<int, int> IOUringFastIO::read(int fd, std::vector<IORequest> &request, std::vector<bool> &result) {
51+
size_t index = 0;
52+
int success_read = 0;
53+
int retcode = 0;
54+
size_t completed = 0;
55+
size_t total_requests = request.size();
56+
57+
// Implementation for synchronous read using io_uring
58+
if (uring_likely(request.empty())) return {0, 0};
59+
if (status_ != 'i') return {-EINVAL, 0};
60+
61+
result.resize(request.size(), false);
62+
63+
while (completed < total_requests) {
64+
struct io_uring_sqe *sqe;
65+
struct io_uring_cqe *cqe;
66+
unsigned head;
67+
unsigned count;
68+
int rc;
69+
// Submit read requests
70+
while (index < total_requests) {
71+
sqe = io_uring_get_sqe(&ring_);
72+
if (!sqe) break; // No more SQEs available, retry later
73+
74+
io_uring_prep_read(sqe, fd, request[index].buffer, request[index].size, request[index].offset);
75+
io_uring_sqe_set_data(sqe, (void*)(uintptr_t)index);
76+
index++;
77+
}
78+
79+
// submit and wait for completions
80+
do {
81+
rc = io_uring_submit_and_wait(&ring_, 1);
82+
} while (rc == -EINTR);
83+
if (rc < 0) return {rc, success_read};
84+
85+
count = 0;
86+
io_uring_for_each_cqe(&ring_, head, cqe) {
87+
size_t req_index = (size_t)(uintptr_t)io_uring_cqe_get_data(cqe);
88+
if (cqe->res >= 0) {
89+
// Successful read
90+
result[req_index] = true;
91+
success_read++;
92+
} else if (retcode == 0) {
93+
retcode = cqe->res; // capture the first error
94+
}
95+
completed++;
96+
count++;
97+
}
98+
io_uring_cq_advance(&ring_, count);
99+
}
100+
return {retcode, success_read}; // Placeholder
101+
}
102+
103+
std::pair<int, int> SyncFastIO::read(int fd, std::vector<IORequest> &request, std::vector<bool> &result) {
104+
size_t total_requests = request.size();
105+
if (total_requests == 0) return {0, 0};
106+
107+
result.resize(total_requests, false);
108+
109+
int success_read = 0;
110+
int retcode = 0;
111+
112+
for (size_t i = 0; i < total_requests; ++i) {
113+
ssize_t bytes_read = 0;
114+
ssize_t nbytes;
115+
auto &req = request[i];
116+
do {
117+
nbytes = pread(fd, (char *)req.buffer + bytes_read, req.size - bytes_read, req.offset + bytes_read);
118+
if (nbytes > 0) bytes_read += nbytes;
119+
} while ((nbytes == -1 && errno == EINTR) || (nbytes > 0 && static_cast<size_t>(bytes_read) < req.size));
120+
121+
if (bytes_read < 0) {
122+
if (retcode == 0) {
123+
retcode = static_cast<int>(bytes_read); // capture first error
124+
}
125+
} else if (static_cast<size_t>(bytes_read) == request[i].size) {
126+
result[i] = true;
127+
success_read++;
128+
}
129+
}
130+
131+
return {retcode, success_read};
132+
}
133+
134+
} // namespace pax
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*-------------------------------------------------------------------------
2+
*
3+
* Licensed to the Apache Software Foundation (ASF) under one
4+
* or more contributor license agreements. See the NOTICE file
5+
* distributed with this work for additional information
6+
* regarding copyright ownership. The ASF licenses this file
7+
* to you under the Apache License, Version 2.0 (the
8+
* "License"); you may not use this file except in compliance
9+
* with the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing,
14+
* software distributed under the License is distributed on an
15+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
* KIND, either express or implied. See the License for the
17+
* specific language governing permissions and limitations
18+
* under the License.
19+
*
20+
* fast_io.h
21+
*
22+
* IDENTIFICATION
23+
* contrib/pax_storage/src/cpp/comm/fast_io.h
24+
*
25+
*-------------------------------------------------------------------------
26+
*/
27+
28+
#pragma once
29+
30+
#include "comm/common_io.h"
31+
32+
#include <liburing.h>
33+
#include <cstddef>
34+
#include <cstdio>
35+
#include <algorithm>
36+
#include <vector>
37+
38+
namespace pax
39+
{
40+
41+
template<typename T>
42+
int fast_io_read(int fd, std::vector<IORequest> &request) {
43+
T io_handler(request.size());
44+
return io_handler.read(fd, request).first;
45+
}
46+
47+
template<typename T>
48+
std::pair<int, int> fast_io_read2(int fd, std::vector<IORequest> &request) {
49+
T io_handler(request.size());
50+
return io_handler.read(fd, request);
51+
}
52+
53+
class SyncFastIO {
54+
public:
55+
SyncFastIO(size_t dummy_queue_size = 0) {}
56+
std::pair<int, int> read(int fd, std::vector<IORequest> &request, std::vector<bool> &result);
57+
};
58+
59+
// io_uring-based FastIO
60+
class IOUringFastIO {
61+
public:
62+
IOUringFastIO(size_t queue_size = 128) {
63+
int ret = io_uring_queue_init(std::max(queue_size, static_cast<size_t>(128)), &ring_, 0);
64+
65+
// ret < 0: unsupported
66+
// otherwise initialized
67+
status_ = ret < 0 ? 'x' : 'i';
68+
}
69+
70+
~IOUringFastIO() {
71+
if (status_ == 'i')
72+
io_uring_queue_exit(&ring_);
73+
}
74+
75+
static bool available();
76+
77+
// if pair.first == 0, all read requests are successful
78+
// pair.second indicates the number of successful read requests
79+
std::pair<int, int> read(int fd, std::vector<IORequest> &request, std::vector<bool> &result);
80+
81+
private:
82+
struct io_uring ring_;
83+
84+
// 'u' for uninitialized, 'i' for initialized, 'x' for unsupported
85+
char status_ = 'u';
86+
};
87+
88+
} // namespace pax

contrib/pax_storage/src/cpp/storage/file_system.cc

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,4 +65,14 @@ void File::PWriteN(const void *buf, size_t count, off64_t offset) {
6565
"errno=%d], %s",
6666
offset, count, num, errno, DebugString().c_str()));
6767
}
68+
69+
void File::ReadBatch(const std::vector<IORequest> &requests) const {
70+
if (requests.empty()) {
71+
return;
72+
}
73+
for (const auto &req : requests) {
74+
PReadN(req.buffer, req.size, req.offset);
75+
}
76+
}
77+
6878
} // namespace pax

contrib/pax_storage/src/cpp/storage/file_system.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
#include <string>
3434
#include <vector>
3535

36+
#include "comm/common_io.h"
3637
#include "comm/pax_memory.h"
3738

3839
namespace pax {
@@ -74,6 +75,7 @@ class File {
7475
virtual void WriteN(const void *ptr, size_t n);
7576
virtual void PWriteN(const void *buf, size_t count, off_t offset);
7677
virtual void PReadN(void *buf, size_t count, off_t offset) const;
78+
virtual void ReadBatch(const std::vector<IORequest> &requests) const;
7779

7880
virtual void Flush() = 0;
7981
virtual void Delete() = 0;

0 commit comments

Comments
 (0)