Skip to content

Commit 28955bd

Browse files
authored
Merge pull request ceph#56352 from ifed01/wip-ifed-many-many-extents-read
blk/aio: fix long batch (64+K entries) submission. Reviewed-by: Adam Kupczyk <[email protected]>
2 parents 25fb8aa + acf7f15 commit 28955bd

File tree

6 files changed

+186
-30
lines changed

6 files changed

+186
-30
lines changed

src/blk/aio/aio.cc

Lines changed: 30 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ std::ostream& operator<<(std::ostream& os, const aio_t& aio)
1616
}
1717

1818
int aio_queue_t::submit_batch(aio_iter begin, aio_iter end,
19-
uint16_t aios_size, void *priv,
19+
void *priv,
2020
int *retries)
2121
{
2222
// 2^16 * 125us = ~8 seconds, so max sleep is ~16 seconds
@@ -25,33 +25,43 @@ int aio_queue_t::submit_batch(aio_iter begin, aio_iter end,
2525
int r;
2626

2727
aio_iter cur = begin;
28-
struct aio_t *piocb[aios_size];
29-
int left = 0;
30-
while (cur != end) {
31-
cur->priv = priv;
32-
*(piocb+left) = &(*cur);
33-
++left;
34-
++cur;
35-
}
36-
ceph_assert(aios_size >= left);
28+
#if defined(HAVE_LIBAIO)
29+
struct aio_t *piocb[max_iodepth];
30+
#endif
3731
int done = 0;
38-
while (left > 0) {
32+
int pushed = 0; //used for LIBAIO only
33+
int pulled = 0;
34+
while (cur != end || pushed < pulled) {
3935
#if defined(HAVE_LIBAIO)
40-
r = io_submit(ctx, std::min(left, max_iodepth), (struct iocb**)(piocb + done));
36+
while (cur != end && pulled < max_iodepth) {
37+
cur->priv = priv;
38+
piocb[pulled] = &(*cur);
39+
++pulled;
40+
++cur;
41+
}
42+
int toSubmit = pulled - pushed;
43+
r = io_submit(ctx, toSubmit, (struct iocb**)(piocb + pushed));
44+
if (r >= 0 && r < toSubmit) {
45+
pushed += r;
46+
done += r;
47+
r = -EAGAIN;
48+
}
4149
#elif defined(HAVE_POSIXAIO)
42-
if (piocb[done]->n_aiocb == 1) {
50+
cur->priv = priv;
51+
if ((cur->n_aiocb == 1) {
4352
// TODO: consider batching multiple reads together with lio_listio
44-
piocb[done]->aio.aiocb.aio_sigevent.sigev_notify = SIGEV_KEVENT;
45-
piocb[done]->aio.aiocb.aio_sigevent.sigev_notify_kqueue = ctx;
46-
piocb[done]->aio.aiocb.aio_sigevent.sigev_value.sival_ptr = piocb[done];
47-
r = aio_read(&piocb[done]->aio.aiocb);
53+
cur->aio.aiocb.aio_sigevent.sigev_notify = SIGEV_KEVENT;
54+
cur->aio.aiocb.aio_sigevent.sigev_notify_kqueue = ctx;
55+
cur->aio.aiocb.aio_sigevent.sigev_value.sival_ptr = &(*cur);
56+
r = aio_write(&cur->aio.aiocb);
4857
} else {
4958
struct sigevent sev;
5059
sev.sigev_notify = SIGEV_KEVENT;
5160
sev.sigev_notify_kqueue = ctx;
52-
sev.sigev_value.sival_ptr = piocb[done];
53-
r = lio_listio(LIO_NOWAIT, &piocb[done]->aio.aiocbp, piocb[done]->n_aiocb, &sev);
61+
sev.sigev_value.sival_ptr = &(*cur);
62+
r = lio_listio(LIO_NOWAIT, &cur->aio.aiocbp, cur->n_aiocb, &sev);
5463
}
64+
++cur;
5565
#endif
5666
if (r < 0) {
5767
if (r == -EAGAIN && attempts-- > 0) {
@@ -64,9 +74,9 @@ int aio_queue_t::submit_batch(aio_iter begin, aio_iter end,
6474
}
6575
ceph_assert(r > 0);
6676
done += r;
67-
left -= r;
6877
attempts = 16;
6978
delay = 125;
79+
pushed = pulled = 0;
7080
}
7181
return done;
7282
}

src/blk/aio/aio.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ struct io_queue_t {
100100

101101
virtual int init(std::vector<int> &fds) = 0;
102102
virtual void shutdown() = 0;
103-
virtual int submit_batch(aio_iter begin, aio_iter end, uint16_t aios_size,
103+
virtual int submit_batch(aio_iter begin, aio_iter end,
104104
void *priv, int *retries) = 0;
105105
virtual int get_next_completed(int timeout_ms, aio_t **paio, int max) = 0;
106106
};
@@ -153,7 +153,7 @@ struct aio_queue_t final : public io_queue_t {
153153
}
154154
}
155155

156-
int submit_batch(aio_iter begin, aio_iter end, uint16_t aios_size,
156+
int submit_batch(aio_iter begin, aio_iter end,
157157
void *priv, int *retries) final;
158158
int get_next_completed(int timeout_ms, aio_t **paio, int max) final;
159159
};

src/blk/kernel/KernelDevice.cc

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -344,11 +344,11 @@ void KernelDevice::close()
344344
extblkdev::release_device(ebd_impl);
345345

346346
for (int i = 0; i < WRITE_LIFE_MAX; i++) {
347-
assert(fd_directs[i] >= 0);
347+
ceph_assert(fd_directs[i] >= 0);
348348
VOID_TEMP_FAILURE_RETRY(::close(fd_directs[i]));
349349
fd_directs[i] = -1;
350350

351-
assert(fd_buffereds[i] >= 0);
351+
ceph_assert(fd_buffereds[i] >= 0);
352352
VOID_TEMP_FAILURE_RETRY(::close(fd_buffereds[i]));
353353
fd_buffereds[i] = -1;
354354
}
@@ -925,10 +925,8 @@ void KernelDevice::aio_submit(IOContext *ioc)
925925

926926
void *priv = static_cast<void*>(ioc);
927927
int r, retries = 0;
928-
// num of pending aios should not overflow when passed to submit_batch()
929-
assert(pending <= std::numeric_limits<uint16_t>::max());
930928
r = io_queue->submit_batch(ioc->running_aios.begin(), e,
931-
pending, priv, &retries);
929+
priv, &retries);
932930

933931
if (retries)
934932
derr << __func__ << " retries " << retries << dendl;

src/blk/kernel/io_uring.cc

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -176,10 +176,9 @@ void ioring_queue_t::shutdown()
176176
}
177177

178178
int ioring_queue_t::submit_batch(aio_iter beg, aio_iter end,
179-
uint16_t aios_size, void *priv,
179+
void *priv,
180180
int *retries)
181181
{
182-
(void)aios_size;
183182
(void)retries;
184183

185184
pthread_mutex_lock(&d->sq_mutex);

src/blk/kernel/io_uring.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ struct ioring_queue_t final : public io_queue_t {
2727
int init(std::vector<int> &fds) final;
2828
void shutdown() final;
2929

30-
int submit_batch(aio_iter begin, aio_iter end, uint16_t aios_size,
30+
int submit_batch(aio_iter begin, aio_iter end,
3131
void *priv, int *retries) final;
3232
int get_next_completed(int timeout_ms, aio_t **paio, int max) final;
3333
};

src/test/objectstore/store_test.cc

Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7750,6 +7750,155 @@ TEST_P(StoreTestSpecificAUSize, BlobReuseOnOverwrite) {
77507750
}
77517751
}
77527752

7753+
TEST_P(StoreTestSpecificAUSize, ManyManyExtents) {
7754+
7755+
if (string(GetParam()) != "bluestore")
7756+
return;
7757+
7758+
size_t block_size = 4096;
7759+
StartDeferred(block_size);
7760+
7761+
int r;
7762+
coll_t cid;
7763+
ghobject_t hoid(hobject_t("test", "", CEPH_NOSNAP, 0, -1, ""));
7764+
7765+
const PerfCounters* logger = store->get_perf_counters();
7766+
7767+
auto ch = store->create_new_collection(cid);
7768+
{
7769+
ObjectStore::Transaction t;
7770+
t.create_collection(cid, 0);
7771+
r = queue_transaction(store, ch, std::move(t));
7772+
ASSERT_EQ(r, 0);
7773+
}
7774+
const size_t max_iterations = 129;
7775+
const size_t max_txn_ops = 512;
7776+
bufferlist bl;
7777+
{
7778+
for (size_t i = 0; i < max_iterations; i++) {
7779+
ObjectStore::Transaction t;
7780+
for (size_t j = 0; j < max_txn_ops; j++) {
7781+
bl.clear();
7782+
bl.append(std::string(1, 'a' + j % 26));
7783+
t.write(cid, hoid, (i * max_txn_ops + j) * 4096, bl.length(), bl, CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
7784+
}
7785+
r = queue_transaction(store, ch, std::move(t));
7786+
ASSERT_EQ(r, 0);
7787+
cerr << "iter " << i << "/" << max_iterations - 1 << std::endl;
7788+
}
7789+
}
7790+
ch.reset();
7791+
store->umount();
7792+
store->mount();
7793+
ch = store->open_collection(cid);
7794+
{
7795+
bl.clear();
7796+
size_t len = (max_iterations * max_txn_ops) * 4096 - 4095;
7797+
cerr << "reading in a single chunk, size =" << len << std::endl;
7798+
r = store->read(ch, hoid,
7799+
0, len,
7800+
bl, CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
7801+
ASSERT_EQ(r, len);
7802+
ASSERT_EQ(r, bl.length());
7803+
size_t idx = 0;
7804+
for (size_t i = 0; i < max_iterations; i++) {
7805+
for (size_t j = 0; j < max_txn_ops; j++) {
7806+
ASSERT_EQ(bl[idx], 'a' + j % 26);
7807+
idx += 4096;
7808+
}
7809+
}
7810+
}
7811+
ch.reset();
7812+
store->umount();
7813+
store->mount();
7814+
ch = store->open_collection(cid);
7815+
{
7816+
cerr << "reading in multiple chunks..." << std::endl;
7817+
bl.clear();
7818+
store->fiemap(ch, hoid, 0, 1ull << 31, bl);
7819+
map<uint64_t,uint64_t> m;
7820+
auto p = bl.cbegin();
7821+
decode(m, p);
7822+
7823+
bl.clear();
7824+
interval_set<uint64_t> im(std::move(m));
7825+
r = store->readv(ch, hoid, im, bl, 0);
7826+
ASSERT_EQ(r, max_txn_ops * max_iterations);
7827+
ASSERT_EQ(r, bl.length());
7828+
size_t idx = 0;
7829+
for (size_t i = 0; i < max_iterations; i++) {
7830+
for (size_t j = 0; j < max_txn_ops; j++) {
7831+
ASSERT_EQ(bl[idx++], 'a' + j % 26);
7832+
}
7833+
}
7834+
}
7835+
store->refresh_perf_counters();
7836+
cerr << "blobs = " << logger->get(l_bluestore_blobs)
7837+
<< " extents = " << logger->get(l_bluestore_extents)
7838+
<< std::endl;
7839+
{
7840+
ObjectStore::Transaction t;
7841+
t.remove(cid, hoid);
7842+
t.remove_collection(cid);
7843+
cerr << "Cleaning" << std::endl;
7844+
r = queue_transaction(store, ch, std::move(t));
7845+
ASSERT_EQ(r, 0);
7846+
}
7847+
}
7848+
7849+
TEST_P(StoreTestSpecificAUSize, ManyManyExtents2) {
7850+
7851+
if (string(GetParam()) != "bluestore")
7852+
return;
7853+
7854+
size_t block_size = 4096;
7855+
StartDeferred(block_size);
7856+
7857+
int r;
7858+
coll_t cid;
7859+
ghobject_t hoid(hobject_t("test", "", CEPH_NOSNAP, 0, -1, ""));
7860+
7861+
auto ch = store->create_new_collection(cid);
7862+
{
7863+
ObjectStore::Transaction t;
7864+
t.create_collection(cid, 0);
7865+
r = queue_transaction(store, ch, std::move(t));
7866+
ASSERT_EQ(r, 0);
7867+
}
7868+
{
7869+
ObjectStore::Transaction t;
7870+
bufferlist bl;
7871+
bl.append(std::string(1024 * 1024, 'a'));
7872+
t.write(cid, hoid, 0, bl.length(), bl, 0);
7873+
r = queue_transaction(store, ch, std::move(t));
7874+
ASSERT_EQ(r, 0);
7875+
}
7876+
ch.reset();
7877+
store->umount();
7878+
store->mount();
7879+
ch = store->open_collection(cid);
7880+
{
7881+
cerr << "reading in multiple chunks..." << std::endl;
7882+
bufferlist bl;
7883+
interval_set<uint64_t> im;
7884+
for (int i=0; i < 100000;i++) {
7885+
im.insert(i * 2, 1);
7886+
}
7887+
r = store->readv(ch, hoid, im, bl, 0);
7888+
ASSERT_EQ(r, 100000);
7889+
ASSERT_EQ(r, bl.length());
7890+
}
7891+
store->refresh_perf_counters();
7892+
{
7893+
ObjectStore::Transaction t;
7894+
t.remove(cid, hoid);
7895+
t.remove_collection(cid);
7896+
cerr << "Cleaning" << std::endl;
7897+
r = queue_transaction(store, ch, std::move(t));
7898+
ASSERT_EQ(r, 0);
7899+
}
7900+
}
7901+
77537902
TEST_P(StoreTestSpecificAUSize, ZeroBlockDetectionSmallAppend) {
77547903
CephContext *cct = (new CephContext(CEPH_ENTITY_TYPE_CLIENT))->get();
77557904
if (string(GetParam()) != "bluestore" || !cct->_conf->bluestore_zero_block_detection) {

0 commit comments

Comments
 (0)