Skip to content

Commit e99c17f

Browse files
committed
Merge PR ceph#48038 into main
* refs/pull/48038/head: client test: Add fsync to ll_preadv_pwritev test libcephfs: Option to write + fsync via ceph_ll_nonblocking_readv_writev Client: Hook nonblocking fsync into the write path of ll_preadv_pwritev Client: Add non-blocking fsync Client/Inode: wait_for_caps fixups Client: change several waitfor_* to use Context list test: Add nonblocking I/O client test libcephfs: Add nonblocking readv/writev I/O interface Client: Add ll_preadv_pwritev to expose non-blocking I/O to libcephfs Client: Add non-blocking helper classes Client: Break some code into new methods in prep for non-blocking I/O Buffers: Add function to buffer.h to copy bufferlist to an iovec ObjectCacher: Prepare file_write path for non-blocking I/O Reviewed-by: Venky Shankar <[email protected]> Reviewed-by: Adam C. Emerson <[email protected]>
2 parents f3a659f + 13162f6 commit e99c17f

File tree

11 files changed

+1291
-127
lines changed

11 files changed

+1291
-127
lines changed

src/client/Client.cc

Lines changed: 776 additions & 110 deletions
Large diffs are not rendered by default.

src/client/Client.h

Lines changed: 278 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -646,6 +646,11 @@ class Client : public Dispatcher, public md_config_obs_t {
646646
int ll_write(Fh *fh, loff_t off, loff_t len, const char *data);
647647
int64_t ll_readv(struct Fh *fh, const struct iovec *iov, int iovcnt, int64_t off);
648648
int64_t ll_writev(struct Fh *fh, const struct iovec *iov, int iovcnt, int64_t off);
649+
int64_t ll_preadv_pwritev(struct Fh *fh, const struct iovec *iov, int iovcnt,
650+
int64_t offset, bool write,
651+
Context *onfinish = nullptr,
652+
bufferlist *blp = nullptr,
653+
bool do_fsync = false, bool syncdataonly = false);
649654
loff_t ll_lseek(Fh *fh, loff_t offset, int whence);
650655
int ll_flush(Fh *fh);
651656
int ll_fsync(Fh *fh, bool syncdataonly);
@@ -753,6 +758,7 @@ class Client : public Dispatcher, public md_config_obs_t {
753758
void flush_snaps(Inode *in);
754759
void get_cap_ref(Inode *in, int cap);
755760
void put_cap_ref(Inode *in, int cap);
761+
void submit_sync_caps(Inode *in, ceph_tid_t want, Context *onfinish);
756762
void wait_sync_caps(Inode *in, ceph_tid_t want);
757763
void wait_sync_caps(ceph_tid_t want);
758764
void queue_cap_snap(Inode *in, SnapContext &old_snapc);
@@ -1019,8 +1025,12 @@ class Client : public Dispatcher, public md_config_obs_t {
10191025
// helpers
10201026
void wake_up_session_caps(MetaSession *s, bool reconnect);
10211027

1028+
void add_nonblocking_onfinish_to_context_list(std::list<Context*>& ls, Context *onfinish) {
1029+
ls.push_back(onfinish);
1030+
}
10221031
void wait_on_context_list(std::list<Context*>& ls);
10231032
void signal_context_list(std::list<Context*>& ls);
1033+
void signal_caps_inode(Inode *in);
10241034

10251035
// -- metadata cache stuff
10261036

@@ -1245,6 +1255,258 @@ class Client : public Dispatcher, public md_config_obs_t {
12451255
struct initialize_state_t initialize_state;
12461256

12471257
private:
1258+
class C_Read_Finisher : public Context {
1259+
public:
1260+
bool iofinished;
1261+
void finish_io(int r);
1262+
1263+
C_Read_Finisher(Client *clnt, Context *onfinish, Context *iofinish,
1264+
bool is_read_async, int have_caps, bool movepos,
1265+
utime_t start, Fh *f, Inode *in, uint64_t fpos,
1266+
int64_t offset, uint64_t size)
1267+
: clnt(clnt), onfinish(onfinish), iofinish(iofinish),
1268+
is_read_async(is_read_async), have_caps(have_caps), f(f), in(in),
1269+
start(start), fpos(fpos), offset(offset), size(size), movepos(movepos) {
1270+
iofinished = false;
1271+
}
1272+
1273+
void finish(int r) override {
1274+
// We need to override finish, but have nothing to do.
1275+
}
1276+
1277+
private:
1278+
Client *clnt;
1279+
Context *onfinish;
1280+
Context *iofinish;
1281+
bool is_read_async;
1282+
int have_caps;
1283+
Fh *f;
1284+
Inode *in;
1285+
utime_t start;
1286+
uint64_t fpos;
1287+
int64_t offset;
1288+
uint64_t size;
1289+
bool movepos;
1290+
};
1291+
1292+
struct CRF_iofinish : public Context {
1293+
Client::C_Read_Finisher *CRF;
1294+
1295+
CRF_iofinish()
1296+
: CRF(nullptr) {}
1297+
1298+
void finish(int r) override {
1299+
CRF->finish_io(r);
1300+
}
1301+
1302+
// For _read_async, we may not finish in one go, so be prepared for multiple
1303+
// calls to complete. All the handling though is in C_Read_Finisher.
1304+
void complete(int r) override {
1305+
finish(r);
1306+
if (CRF->iofinished)
1307+
delete this;
1308+
}
1309+
};
1310+
1311+
class C_Read_Sync_NonBlocking : public Context {
1312+
// When operating in non-blocking mode, what used to be done by _read_sync
1313+
// still needs to be handled, but it needs to be handled without blocking
1314+
// while still following the semantics. Note that _read_sync is actually
1315+
// asynchronous. it just uses condition variables to wait. Now instead, we use
1316+
// this Context class to synchronize the steps.
1317+
//
1318+
// The steps will be accomplished by complete/finish being called to complete
1319+
// each step, with complete only releasing this object once all is finally
1320+
// complete.
1321+
public:
1322+
C_Read_Sync_NonBlocking(Client *clnt, Context *onfinish, Fh *f, Inode *in,
1323+
uint64_t fpos, uint64_t off, uint64_t len,
1324+
bufferlist *bl, Filer *filer, int have_caps)
1325+
: clnt(clnt), onfinish(onfinish), f(f), in(in), off(off), len(len), bl(bl),
1326+
filer(filer), have_caps(have_caps)
1327+
{
1328+
left = len;
1329+
wanted = len;
1330+
read = 0;
1331+
pos = off;
1332+
fini = false;
1333+
}
1334+
1335+
void retry();
1336+
1337+
private:
1338+
Client *clnt;
1339+
Context *onfinish;
1340+
Fh *f;
1341+
Inode *in;
1342+
uint64_t off;
1343+
uint64_t len;
1344+
int left;
1345+
int wanted;
1346+
bufferlist *bl;
1347+
bufferlist tbl;
1348+
Filer *filer;
1349+
int have_caps;
1350+
int read;
1351+
uint64_t pos;
1352+
bool fini;
1353+
1354+
void finish(int r) override;
1355+
1356+
void complete(int r) override
1357+
{
1358+
finish(r);
1359+
if (fini)
1360+
delete this;
1361+
}
1362+
};
1363+
1364+
class C_Read_Async_Finisher : public Context {
1365+
public:
1366+
C_Read_Async_Finisher(Client *clnt, Context *onfinish, Fh *f, Inode *in,
1367+
uint64_t fpos, uint64_t off, uint64_t len)
1368+
: clnt(clnt), onfinish(onfinish), f(f), in(in), off(off), len(len) {}
1369+
1370+
private:
1371+
Client *clnt;
1372+
Context *onfinish;
1373+
Fh *f;
1374+
Inode *in;
1375+
uint64_t off;
1376+
uint64_t len;
1377+
1378+
void finish(int r) override;
1379+
};
1380+
1381+
class C_Write_Finisher : public Context {
1382+
public:
1383+
void finish_io(int r);
1384+
void finish_onuninline(int r);
1385+
void finish_fsync(int r);
1386+
1387+
C_Write_Finisher(Client *clnt, Context *onfinish, bool dont_need_uninline,
1388+
bool is_file_write, utime_t start, Fh *f, Inode *in,
1389+
uint64_t fpos, int64_t offset, uint64_t size,
1390+
bool do_fsync, bool syncdataonly)
1391+
: clnt(clnt), onfinish(onfinish),
1392+
is_file_write(is_file_write), start(start), f(f), in(in), fpos(fpos),
1393+
offset(offset), size(size), syncdataonly(syncdataonly) {
1394+
iofinished_r = 0;
1395+
onuninlinefinished_r = 0;
1396+
fsync_r = 0;
1397+
iofinished = false;
1398+
onuninlinefinished = dont_need_uninline;
1399+
fsync_finished = !do_fsync;
1400+
}
1401+
1402+
void finish(int r) override {
1403+
// We need to override finish, but have nothing to do.
1404+
}
1405+
1406+
private:
1407+
Client *clnt;
1408+
Context *onfinish;
1409+
bool is_file_write;
1410+
utime_t start;
1411+
Fh *f;
1412+
Inode *in;
1413+
uint64_t fpos;
1414+
int64_t offset;
1415+
uint64_t size;
1416+
bool syncdataonly;
1417+
int64_t iofinished_r;
1418+
int64_t onuninlinefinished_r;
1419+
int64_t fsync_r;
1420+
bool iofinished;
1421+
bool onuninlinefinished;
1422+
bool fsync_finished;
1423+
bool try_complete();
1424+
};
1425+
1426+
struct CWF_iofinish : public Context {
1427+
C_Write_Finisher *CWF;
1428+
1429+
CWF_iofinish()
1430+
: CWF(nullptr) {}
1431+
1432+
void finish(int r) override {
1433+
CWF->finish_io(r);
1434+
}
1435+
};
1436+
1437+
struct CWF_fsync_finish : public Context {
1438+
C_Write_Finisher *CWF;
1439+
1440+
CWF_fsync_finish(C_Write_Finisher *CWF)
1441+
: CWF(CWF) {}
1442+
1443+
void finish(int r) override {
1444+
CWF->finish_fsync(r);
1445+
}
1446+
};
1447+
1448+
struct C_nonblocking_fsync_state {
1449+
Client *clnt;
1450+
1451+
// nonblocking_fsync parms
1452+
Inode *in;
1453+
bool syncdataonly;
1454+
Context *onfinish;
1455+
1456+
// were local variables
1457+
ceph_tid_t flush_tid;
1458+
InodeRef tmp_ref;
1459+
utime_t start;
1460+
MetaRequest *req;
1461+
1462+
// we need to keep track of where we are
1463+
int progress;
1464+
bool flush_wait;
1465+
bool flush_completed;
1466+
int result;
1467+
bool waitfor_safe;
1468+
1469+
C_nonblocking_fsync_state(Client *clnt, Inode *in, bool syncdataonly, Context *onfinish)
1470+
: clnt(clnt), in(in), syncdataonly(syncdataonly), onfinish(onfinish) {
1471+
flush_tid = 0;
1472+
start = ceph_clock_now();
1473+
progress = 0;
1474+
flush_wait = false;
1475+
flush_completed = false;
1476+
result = 0;
1477+
waitfor_safe = false;
1478+
}
1479+
1480+
void advance();
1481+
1482+
void complete_flush(int r);
1483+
};
1484+
1485+
struct C_nonblocking_fsync_state_advancer : Context {
1486+
Client *clnt;
1487+
Client::C_nonblocking_fsync_state *state;
1488+
1489+
C_nonblocking_fsync_state_advancer(Client *clnt, Client::C_nonblocking_fsync_state *state)
1490+
: clnt(clnt), state(state) {
1491+
}
1492+
1493+
void finish(int r) override;
1494+
};
1495+
1496+
struct C_nonblocking_fsync_flush_finisher : Context {
1497+
Client *clnt;
1498+
Client::C_nonblocking_fsync_state *state;
1499+
1500+
C_nonblocking_fsync_flush_finisher(Client *clnt, Client::C_nonblocking_fsync_state *state)
1501+
: clnt(clnt), state(state) {
1502+
}
1503+
1504+
void finish(int r) override {
1505+
ceph_assert(ceph_mutex_is_locked_by_me(clnt->client_lock));
1506+
state->complete_flush(r);
1507+
}
1508+
};
1509+
12481510
struct C_Readahead : public Context {
12491511
C_Readahead(Client *c, Fh *f);
12501512
~C_Readahead() override;
@@ -1331,7 +1593,8 @@ class Client : public Dispatcher, public md_config_obs_t {
13311593
std::pair<int, bool> _do_remount(bool retry_on_error);
13321594

13331595
int _read_sync(Fh *f, uint64_t off, uint64_t len, bufferlist *bl, bool *checkeof);
1334-
int _read_async(Fh *f, uint64_t off, uint64_t len, bufferlist *bl);
1596+
int _read_async(Fh *f, uint64_t off, uint64_t len, bufferlist *bl,
1597+
Context *onfinish);
13351598

13361599
bool _dentry_valid(const Dentry *dn);
13371600

@@ -1399,15 +1662,25 @@ class Client : public Dispatcher, public md_config_obs_t {
13991662
std::string alternate_name);
14001663

14011664
loff_t _lseek(Fh *fh, loff_t offset, int whence);
1402-
int64_t _read(Fh *fh, int64_t offset, uint64_t size, bufferlist *bl);
1665+
int64_t _read(Fh *fh, int64_t offset, uint64_t size, bufferlist *bl,
1666+
Context *onfinish = nullptr);
1667+
void do_readahead(Fh *f, Inode *in, uint64_t off, uint64_t len);
1668+
int64_t _write_success(Fh *fh, utime_t start, uint64_t fpos,
1669+
int64_t offset, uint64_t size, Inode *in);
14031670
int64_t _write(Fh *fh, int64_t offset, uint64_t size, const char *buf,
1404-
const struct iovec *iov, int iovcnt);
1671+
const struct iovec *iov, int iovcnt, Context *onfinish = nullptr,
1672+
bool do_fsync = false, bool syncdataonly = false);
14051673
int64_t _preadv_pwritev_locked(Fh *fh, const struct iovec *iov,
14061674
unsigned iovcnt, int64_t offset,
1407-
bool write, bool clamp_to_int);
1675+
bool write, bool clamp_to_int,
1676+
Context *onfinish = nullptr,
1677+
bufferlist *blp = nullptr,
1678+
bool do_fsync = false, bool syncdataonly = false);
14081679
int _preadv_pwritev(int fd, const struct iovec *iov, unsigned iovcnt,
1409-
int64_t offset, bool write);
1680+
int64_t offset, bool write, Context *onfinish = nullptr,
1681+
bufferlist *blp = nullptr);
14101682
int _flush(Fh *fh);
1683+
void nonblocking_fsync(Inode *in, bool syncdataonly, Context *onfinish);
14111684
int _fsync(Fh *fh, bool syncdataonly);
14121685
int _fsync(Inode *in, bool syncdataonly);
14131686
int _sync_fs();

src/client/Inode.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -241,8 +241,9 @@ struct Inode : RefCountedObject {
241241
std::map<frag_t,int> fragmap; // known frag -> mds mappings
242242
std::map<frag_t, std::vector<mds_rank_t>> frag_repmap; // non-auth mds mappings
243243

244-
std::list<ceph::condition_variable*> waitfor_caps;
245-
std::list<ceph::condition_variable*> waitfor_commit;
244+
std::list<Context*> waitfor_caps;
245+
std::list<Context*> waitfor_caps_pending;
246+
std::list<Context*> waitfor_commit;
246247
std::list<ceph::condition_variable*> waitfor_deleg;
247248

248249
Dentry *get_first_parent() {

src/client/MetaRequest.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ struct MetaRequest {
7070

7171
ceph::condition_variable *caller_cond = NULL; // who to take up
7272
ceph::condition_variable *dispatch_cond = NULL; // who to kick back
73-
std::list<ceph::condition_variable*> waitfor_safe;
73+
std::list<Context*> waitfor_safe;
7474

7575
InodeRef target;
7676
UserPerm perms;

src/include/buffer.h

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1286,6 +1286,23 @@ inline bufferhash& operator<<(bufferhash& l, const bufferlist &r) {
12861286
return l;
12871287
}
12881288

1289+
static inline
1290+
void copy_bufferlist_to_iovec(const struct iovec *iov, unsigned iovcnt,
1291+
bufferlist *bl, int64_t r)
1292+
{
1293+
auto iter = bl->cbegin();
1294+
for (unsigned j = 0, resid = r; j < iovcnt && resid > 0; j++) {
1295+
/*
1296+
* This piece of code aims to handle the case that bufferlist
1297+
* does not have enough data to fill in the iov
1298+
*/
1299+
const auto round_size = std::min<unsigned>(resid, iov[j].iov_len);
1300+
iter.copy(round_size, reinterpret_cast<char*>(iov[j].iov_base));
1301+
resid -= round_size;
1302+
/* iter is self-updating */
1303+
}
1304+
}
1305+
12891306
} // namespace buffer
12901307

12911308
} // namespace ceph

0 commit comments

Comments
 (0)