Skip to content

Commit b44d312

Browse files
authored
issue-5288: use O_SYNC/O_DSYNC write flags in libaio/io_uring FileIOService (#5327)
#5288
1 parent aff2df7 commit b44d312

File tree

9 files changed

+318
-30
lines changed

9 files changed

+318
-30
lines changed

cloud/storage/core/libs/aio/service.cpp

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#include <cloud/storage/core/libs/common/error.h>
44
#include <cloud/storage/core/libs/common/file_io_service.h>
55
#include <cloud/storage/core/libs/common/thread.h>
6+
#include <cloud/storage/core/libs/common/write_sync_flags.h>
67

78
#include <util/stream/file.h>
89
#include <util/string/builder.h>
@@ -221,16 +222,18 @@ class TAIOService final
221222
TFileIOCompletion* completion,
222223
ui32 flags) override
223224
{
224-
Y_UNUSED(flags);
225-
226225
auto req = std::make_unique<iocb>();
227-
228-
io_prep_pwrite(
226+
const ui32 syncFlags = GetWriteSyncFlags(flags);
227+
iovec iov;
228+
iov.iov_base = const_cast<char*>(buffer.data());
229+
iov.iov_len = buffer.size();
230+
io_prep_pwritev2(
229231
req.get(),
230232
file,
231-
const_cast<char*>(buffer.data()),
232-
buffer.size(),
233-
offset);
233+
&iov,
234+
1,
235+
offset,
236+
syncFlags);
234237

235238
req->data = completion;
236239

@@ -244,23 +247,21 @@ class TAIOService final
244247
TFileIOCompletion* completion,
245248
ui32 flags) override
246249
{
247-
Y_UNUSED(flags);
248-
249250
auto req = std::make_unique<iocb>();
251+
const ui32 syncFlags = GetWriteSyncFlags(flags);
250252

251253
TVector<iovec> iov(buffers.size());
252254
for (ui32 i = 0; i < buffers.size(); ++i) {
253-
iov[i].iov_base =
254-
static_cast<void*>(const_cast<char*>(buffers[i].data()));
255+
iov[i].iov_base = const_cast<char*>(buffers[i].data());
255256
iov[i].iov_len = buffers[i].size();
256257
}
257-
258-
io_prep_pwritev(
258+
io_prep_pwritev2(
259259
req.get(),
260260
file,
261261
iov.data(),
262262
iov.size(),
263-
offset);
263+
offset,
264+
syncFlags);
264265

265266
req->data = completion;
266267

cloud/storage/core/libs/aio/service_ut.cpp

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
#include <library/cpp/threading/future/future.h>
99

10+
#include <fcntl.h>
1011
#include <util/folder/dirut.h>
1112
#include <util/folder/tempdir.h>
1213
#include <util/generic/array_ref.h>
@@ -143,6 +144,105 @@ Y_UNIT_TEST_SUITE(TAioTest)
143144
}
144145
}
145146

147+
// Checks that AsyncWrite accepts O_SYNC / O_DSYNC passed as per-request flags
// on a file opened without any sync mode, and that the payload of every
// flagged write can be read back.
//
// Each flag combination writes its own fill byte and is verified right after
// the write. With a single shared pattern checked once at the end, the test
// would still pass if only one of the writes actually reached the file.
Y_UNIT_TEST(ShouldReadWriteWithSyncFlags)
{
    auto service = CreateAIOService();
    service->Start();
    Y_DEFER { service->Stop(); };

    const ui32 blockSize = 4_KB;
    const ui64 blockCount = 1024;
    const auto filePath = TryGetRamDrivePath() / "test_sync_flags";

    // Keep open mode non-sync; sync intent must come from write flags.
    TFileHandle fileData(filePath, OpenAlways | RdWr | DirectAligned);
    fileData.Resize(blockCount * blockSize);

    const ui64 requestStartIndex = 20;
    const ui64 requestBlockCount = 16;
    const ui64 length = requestBlockCount * blockSize;
    const i64 offset = requestStartIndex * blockSize;

    // The file is opened with DirectAligned, so the I/O buffer is allocated
    // with block-size alignment.
    std::shared_ptr<char> memory {
        static_cast<char*>(std::aligned_alloc(blockSize, length)),
        std::free
    };
    // aligned_alloc reports failure by returning nullptr.
    UNIT_ASSERT(memory);

    TArrayRef<char> buffer {memory.get(), length};

    struct TTestCase
    {
        ui32 Flags;
        char Fill;
    };

    // O_SYNC includes the O_DSYNC bits on Linux, so the combined case is
    // expected to behave like plain O_SYNC; it is kept to pin that down.
    const TTestCase testCases[] = {
        {ui32(O_SYNC), 'S'},
        {ui32(O_DSYNC), 'D'},
        {ui32(O_SYNC | O_DSYNC), 'B'},
    };

    for (const auto& testCase: testCases) {
        std::memset(buffer.data(), testCase.Fill, buffer.size());
        {
            auto result =
                service->AsyncWrite(fileData, offset, buffer, testCase.Flags);
            UNIT_ASSERT_VALUES_EQUAL(buffer.size(), result.GetValueSync());
        }

        // Scrub the buffer so that the check below can only succeed on data
        // that really came back from the file.
        std::memset(buffer.data(), '.', buffer.size());
        {
            auto result = service->AsyncRead(fileData, offset, buffer);
            UNIT_ASSERT_VALUES_EQUAL(buffer.size(), result.GetValueSync());
        }

        for (char val: buffer) {
            UNIT_ASSERT_VALUES_EQUAL(testCase.Fill, val);
        }
    }
}
189+
190+
// Vectored counterpart of the sync-flag test: checks that AsyncWriteV accepts
// O_SYNC / O_DSYNC passed as per-request flags on a file opened without any
// sync mode, and that the payload of every flagged write can be read back
// through AsyncReadV.
//
// Each flag combination writes its own fill byte and is verified right after
// the write. With a single shared pattern checked once at the end, the test
// would still pass if only one of the writes actually reached the file.
Y_UNIT_TEST(ShouldReadWriteVWithSyncFlags)
{
    auto service = CreateAIOService();
    service->Start();
    Y_DEFER { service->Stop(); };

    const ui32 blockSize = 4_KB;
    const ui64 blockCount = 1024;
    const auto filePath = TryGetRamDrivePath() / "test_sync_flags_v";

    // Keep open mode non-sync; sync intent must come from write flags.
    TFileHandle fileData(filePath, OpenAlways | RdWr | DirectAligned);
    fileData.Resize(blockCount * blockSize);

    const ui64 requestStartIndex = 20;
    const ui64 requestBlockCount = 16;
    const ui64 length = requestBlockCount * blockSize;
    const i64 offset = requestStartIndex * blockSize;

    // The file is opened with DirectAligned, so the I/O memory is allocated
    // with block-size alignment.
    std::shared_ptr<char> memory {
        static_cast<char*>(std::aligned_alloc(blockSize, length)),
        std::free
    };
    // aligned_alloc reports failure by returning nullptr.
    UNIT_ASSERT(memory);

    // Three adjacent slices (4 + 6 + 6 blocks) that together cover the whole
    // allocation.
    TVector<TArrayRef<char>> buffers;
    buffers.emplace_back(memory.get(), 4 * blockSize);
    buffers.emplace_back(memory.get() + 4 * blockSize, 6 * blockSize);
    buffers.emplace_back(memory.get() + 10 * blockSize, 6 * blockSize);

    // Read-only views over the same memory for the write call.
    TVector<TArrayRef<const char>> constBuffers;
    for (auto& buffer: buffers) {
        constBuffers.emplace_back(buffer.data(), buffer.size());
    }

    const auto fillBuffers = [&buffers](char value)
    {
        for (auto& buffer: buffers) {
            std::memset(buffer.data(), value, buffer.size());
        }
    };

    struct TTestCase
    {
        ui32 Flags;
        char Fill;
    };

    // O_SYNC includes the O_DSYNC bits on Linux, so the combined case is
    // expected to behave like plain O_SYNC; it is kept to pin that down.
    const TTestCase testCases[] = {
        {ui32(O_SYNC), 'S'},
        {ui32(O_DSYNC), 'D'},
        {ui32(O_SYNC | O_DSYNC), 'B'},
    };

    for (const auto& testCase: testCases) {
        fillBuffers(testCase.Fill);
        {
            auto result = service->AsyncWriteV(
                fileData,
                offset,
                constBuffers,
                testCase.Flags);
            UNIT_ASSERT_VALUES_EQUAL(length, result.GetValueSync());
        }

        // Scrub the memory so that the check below can only succeed on data
        // that really came back from the file.
        fillBuffers('.');
        {
            auto result = service->AsyncReadV(fileData, offset, buffers);
            UNIT_ASSERT_VALUES_EQUAL(length, result.GetValueSync());
        }

        for (auto& buffer: buffers) {
            for (char val: buffer) {
                UNIT_ASSERT_VALUES_EQUAL(testCase.Fill, val);
            }
        }
    }
}
245+
146246
Y_UNIT_TEST(ShouldRetryIoSetupErrors)
147247
{
148248
const ui32 eventCountLimit =
cloud/storage/core/libs/common/write_sync_flags.cpp

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
#include "write_sync_flags.h"
2+
3+
#include <fcntl.h>
4+
#include <linux/fs.h>
5+
6+
namespace NCloud {
7+
8+
////////////////////////////////////////////////////////////////////////////////
9+
10+
// Maps the open(2)-style sync bits carried in |flags| to the corresponding
// RWF_* value: RWF_SYNC when every O_SYNC bit is set, RWF_DSYNC when every
// O_DSYNC bit is set, and 0 otherwise.
ui32 GetWriteSyncFlags(ui32 flags)
{
    const auto hasAllBits = [flags](ui32 mask)
    {
        return (flags & mask) == mask;
    };

    // O_SYNC includes O_DSYNC bits on Linux, which is why it has to be
    // probed first: a plain O_DSYNC test would match O_SYNC requests too.
    if (hasAllBits(O_SYNC)) {
        return RWF_SYNC;
    }

    return hasAllBits(O_DSYNC) ? RWF_DSYNC : 0;
}
22+
23+
} // namespace NCloud
cloud/storage/core/libs/common/write_sync_flags.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
#pragma once
2+
3+
#include "public.h"
4+
5+
namespace NCloud {
6+
7+
////////////////////////////////////////////////////////////////////////////////
8+
9+
ui32 GetWriteSyncFlags(ui32 flags);
10+
11+
} // namespace NCloud

cloud/storage/core/libs/common/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ SRCS(
4747
thread_pool.cpp
4848
timer.cpp
4949
timer_test.cpp
50+
write_sync_flags.cpp
5051
verify.cpp
5152
)
5253

cloud/storage/core/libs/io_uring/context.cpp

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -233,10 +233,12 @@ void TContext::AsyncIO(
233233
ui32 len,
234234
ui64 offset,
235235
TFileIOCompletion* completion,
236-
ui32 flags)
236+
ui32 sqeFlags,
237+
ui32 rwFlags)
237238
{
238239
SubmissionThread->ExecuteSimple(
239-
[=, this] { SubmitIO(op, fd, addr, len, offset, completion, flags); });
240+
[=, this]
241+
{ SubmitIO(op, fd, addr, len, offset, completion, sqeFlags, rwFlags); });
240242
}
241243

242244
void TContext::AsyncNOP(TFileIOCompletion* completion, ui32 flags)
@@ -283,7 +285,8 @@ void TContext::SubmitIO(
283285
ui32 len,
284286
ui64 offset,
285287
TFileIOCompletion* completion,
286-
ui32 flags)
288+
ui32 sqeFlags,
289+
ui32 rwFlags)
287290
{
288291
io_uring_sqe* sqe = io_uring_get_sqe(&Ring);
289292
if (!sqe) {
@@ -293,7 +296,8 @@ void TContext::SubmitIO(
293296

294297
io_uring_prep_rw(op, sqe, fd, addr, len, offset);
295298
io_uring_sqe_set_data(sqe, completion);
296-
io_uring_sqe_set_flags(sqe, flags);
299+
io_uring_sqe_set_flags(sqe, sqeFlags);
300+
sqe->rw_flags = rwFlags;
297301

298302
NSan::Release(completion);
299303

@@ -402,7 +406,8 @@ void TContext::AsyncWrite(
402406
TArrayRef<const char> buffer,
403407
ui64 offset,
404408
TFileIOCompletion* completion,
405-
ui32 flags)
409+
ui32 sqeFlags,
410+
ui32 rwFlags)
406411
{
407412
AsyncIO(
408413
IORING_OP_WRITE,
@@ -411,7 +416,8 @@ void TContext::AsyncWrite(
411416
buffer.size(),
412417
offset,
413418
completion,
414-
flags);
419+
sqeFlags,
420+
rwFlags);
415421
}
416422

417423
void TContext::AsyncRead(
@@ -436,7 +442,8 @@ void TContext::AsyncWriteV(
436442
TArrayRef<const TArrayRef<const char>> buffers,
437443
ui64 offset,
438444
TFileIOCompletion* completion,
439-
ui32 flags)
445+
ui32 sqeFlags,
446+
ui32 rwFlags)
440447
{
441448
AsyncIO(
442449
IORING_OP_WRITEV,
@@ -445,7 +452,8 @@ void TContext::AsyncWriteV(
445452
buffers.size(),
446453
offset,
447454
completion,
448-
flags);
455+
sqeFlags,
456+
rwFlags);
449457
}
450458

451459
void TContext::AsyncReadV(

cloud/storage/core/libs/io_uring/context.h

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,8 @@ class TContext final
6060
TArrayRef<const char> buffer,
6161
ui64 offset,
6262
TFileIOCompletion* completion,
63-
ui32 flags = 0);
63+
ui32 sqeFlags = 0,
64+
ui32 rwFlags = 0);
6465

6566
void AsyncRead(
6667
int fd,
@@ -74,7 +75,8 @@ class TContext final
7475
TArrayRef<const TArrayRef<const char>> buffer,
7576
ui64 offset,
7677
TFileIOCompletion* completion,
77-
ui32 flags = 0);
78+
ui32 sqeFlags = 0,
79+
ui32 rwFlags = 0);
7880

7981
void AsyncReadV(
8082
int fd,
@@ -95,7 +97,8 @@ class TContext final
9597
ui32 len,
9698
ui64 offset,
9799
TFileIOCompletion* completion,
98-
ui32 flags);
100+
ui32 sqeFlags,
101+
ui32 rwFlags = 0);
99102

100103
void AsyncIO(
101104
int op,
@@ -104,7 +107,8 @@ class TContext final
104107
ui32 len,
105108
ui64 offset,
106109
TFileIOCompletion* completion,
107-
ui32 flags);
110+
ui32 sqeFlags,
111+
ui32 rwFlags = 0);
108112

109113
void SubmitNOP(TFileIOCompletion* completion, ui32 flags);
110114
void SubmitMsg(TFileIOCompletion* completion, int res);

cloud/storage/core/libs/io_uring/service.cpp

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
#include <cloud/storage/core/libs/common/task_queue.h>
77
#include <cloud/storage/core/libs/common/thread.h>
88
#include <cloud/storage/core/libs/common/thread_pool.h>
9+
#include <cloud/storage/core/libs/common/write_sync_flags.h>
910

1011
#include <library/cpp/threading/future/future.h>
1112

@@ -81,8 +82,13 @@ struct TIoUringService final
8182
TFileIOCompletion* completion,
8283
ui32 flags) final
8384
{
84-
Y_UNUSED(flags);
85-
Context.AsyncWrite(file, buffer, offset, completion, SqeFlags);
85+
Context.AsyncWrite(
86+
file,
87+
buffer,
88+
offset,
89+
completion,
90+
SqeFlags,
91+
GetWriteSyncFlags(flags));
8692
}
8793

8894
void AsyncWriteV(
@@ -92,8 +98,13 @@ struct TIoUringService final
9298
TFileIOCompletion* completion,
9399
ui32 flags) final
94100
{
95-
Y_UNUSED(flags);
96-
Context.AsyncWriteV(file, buffers, offset, completion, SqeFlags);
101+
Context.AsyncWriteV(
102+
file,
103+
buffers,
104+
offset,
105+
completion,
106+
SqeFlags,
107+
GetWriteSyncFlags(flags));
97108
}
98109
};
99110

0 commit comments

Comments
 (0)