Skip to content

Commit 33a5f17

Browse files
jtlayton and idryomov
authored and committed
ceph: add read/modify/write to ceph_sync_write
When doing a synchronous write on an encrypted inode, we have no guarantee that the caller is writing crypto block-aligned data. When the write is not block-aligned, we must do a read/modify/write cycle. First, expand the range to cover complete blocks. If we had to change the original pos or length, issue a read to fill the first and/or last pages, and fetch the version of the object from the result. We then copy data into the pages as usual, encrypt the result and issue a write prefixed by an assertion that the version hasn't changed. If it has changed, we restart the whole cycle. If there is no object at that position in the file (-ENOENT), we prefix the write with an exclusive create of the object instead. Signed-off-by: Jeff Layton <[email protected]> Reviewed-by: Xiubo Li <[email protected]> Reviewed-and-tested-by: Luís Henriques <[email protected]> Reviewed-by: Milind Changire <[email protected]> Signed-off-by: Ilya Dryomov <[email protected]>
1 parent b294fa2 commit 33a5f17

File tree

1 file changed

+290
-28
lines changed

1 file changed

+290
-28
lines changed

fs/ceph/file.c

Lines changed: 290 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1571,18 +1571,16 @@ ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
15711571
struct inode *inode = file_inode(file);
15721572
struct ceph_inode_info *ci = ceph_inode(inode);
15731573
struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
1574-
struct ceph_vino vino;
1574+
struct ceph_osd_client *osdc = &fsc->client->osdc;
15751575
struct ceph_osd_request *req;
15761576
struct page **pages;
15771577
u64 len;
15781578
int num_pages;
15791579
int written = 0;
1580-
int flags;
15811580
int ret;
15821581
bool check_caps = false;
15831582
struct timespec64 mtime = current_time(inode);
15841583
size_t count = iov_iter_count(from);
1585-
size_t off;
15861584

15871585
if (ceph_snap(file_inode(file)) != CEPH_NOSNAP)
15881586
return -EROFS;
@@ -1602,72 +1600,335 @@ ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
16021600
if (ret < 0)
16031601
dout("invalidate_inode_pages2_range returned %d\n", ret);
16041602

1605-
flags = /* CEPH_OSD_FLAG_ORDERSNAP | */ CEPH_OSD_FLAG_WRITE;
1606-
16071603
while ((len = iov_iter_count(from)) > 0) {
16081604
size_t left;
16091605
int n;
1606+
u64 write_pos = pos;
1607+
u64 write_len = len;
1608+
u64 objnum, objoff;
1609+
u32 xlen;
1610+
u64 assert_ver = 0;
1611+
bool rmw;
1612+
bool first, last;
1613+
struct iov_iter saved_iter = *from;
1614+
size_t off;
1615+
1616+
ceph_fscrypt_adjust_off_and_len(inode, &write_pos, &write_len);
1617+
1618+
/* clamp the length to the end of first object */
1619+
ceph_calc_file_object_mapping(&ci->i_layout, write_pos,
1620+
write_len, &objnum, &objoff,
1621+
&xlen);
1622+
write_len = xlen;
1623+
1624+
/* adjust len downward if it goes beyond current object */
1625+
if (pos + len > write_pos + write_len)
1626+
len = write_pos + write_len - pos;
16101627

1611-
vino = ceph_vino(inode);
1612-
req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
1613-
vino, pos, &len, 0, 1,
1614-
CEPH_OSD_OP_WRITE, flags, snapc,
1615-
ci->i_truncate_seq,
1616-
ci->i_truncate_size,
1617-
false);
1618-
if (IS_ERR(req)) {
1619-
ret = PTR_ERR(req);
1620-
break;
1621-
}
1628+
/*
1629+
* If we had to adjust the length or position to align with a
1630+
* crypto block, then we must do a read/modify/write cycle. We
1631+
* use a version assertion to redrive the thing if something
1632+
* changes in between.
1633+
*/
1634+
first = pos != write_pos;
1635+
last = (pos + len) != (write_pos + write_len);
1636+
rmw = first || last;
16221637

1623-
num_pages = calc_pages_for(pos, len);
1638+
dout("sync_write ino %llx %lld~%llu adjusted %lld~%llu -- %srmw\n",
1639+
ci->i_vino.ino, pos, len, write_pos, write_len,
1640+
rmw ? "" : "no ");
1641+
1642+
/*
1643+
* The data is emplaced into the page as it would be if it were
1644+
* in an array of pagecache pages.
1645+
*/
1646+
num_pages = calc_pages_for(write_pos, write_len);
16241647
pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
16251648
if (IS_ERR(pages)) {
16261649
ret = PTR_ERR(pages);
1627-
goto out;
1650+
break;
1651+
}
1652+
1653+
/* Do we need to preload the pages? */
1654+
if (rmw) {
1655+
u64 first_pos = write_pos;
1656+
u64 last_pos = (write_pos + write_len) - CEPH_FSCRYPT_BLOCK_SIZE;
1657+
u64 read_len = CEPH_FSCRYPT_BLOCK_SIZE;
1658+
struct ceph_osd_req_op *op;
1659+
1660+
/* We should only need to do this for encrypted inodes */
1661+
WARN_ON_ONCE(!IS_ENCRYPTED(inode));
1662+
1663+
/* No need to do two reads if first and last blocks are same */
1664+
if (first && last_pos == first_pos)
1665+
last = false;
1666+
1667+
/*
1668+
* Allocate a read request for one or two extents,
1669+
* depending on how the request was aligned.
1670+
*/
1671+
req = ceph_osdc_new_request(osdc, &ci->i_layout,
1672+
ci->i_vino, first ? first_pos : last_pos,
1673+
&read_len, 0, (first && last) ? 2 : 1,
1674+
CEPH_OSD_OP_SPARSE_READ, CEPH_OSD_FLAG_READ,
1675+
NULL, ci->i_truncate_seq,
1676+
ci->i_truncate_size, false);
1677+
if (IS_ERR(req)) {
1678+
ceph_release_page_vector(pages, num_pages);
1679+
ret = PTR_ERR(req);
1680+
break;
1681+
}
1682+
1683+
/* Something is misaligned! */
1684+
if (read_len != CEPH_FSCRYPT_BLOCK_SIZE) {
1685+
ceph_osdc_put_request(req);
1686+
ceph_release_page_vector(pages, num_pages);
1687+
ret = -EIO;
1688+
break;
1689+
}
1690+
1691+
/* Add extent for first block? */
1692+
op = &req->r_ops[0];
1693+
1694+
if (first) {
1695+
osd_req_op_extent_osd_data_pages(req, 0, pages,
1696+
CEPH_FSCRYPT_BLOCK_SIZE,
1697+
offset_in_page(first_pos),
1698+
false, false);
1699+
/* We only expect a single extent here */
1700+
ret = __ceph_alloc_sparse_ext_map(op, 1);
1701+
if (ret) {
1702+
ceph_osdc_put_request(req);
1703+
ceph_release_page_vector(pages, num_pages);
1704+
break;
1705+
}
1706+
}
1707+
1708+
/* Add extent for last block */
1709+
if (last) {
1710+
/* Init the other extent if first extent has been used */
1711+
if (first) {
1712+
op = &req->r_ops[1];
1713+
osd_req_op_extent_init(req, 1,
1714+
CEPH_OSD_OP_SPARSE_READ,
1715+
last_pos, CEPH_FSCRYPT_BLOCK_SIZE,
1716+
ci->i_truncate_size,
1717+
ci->i_truncate_seq);
1718+
}
1719+
1720+
ret = __ceph_alloc_sparse_ext_map(op, 1);
1721+
if (ret) {
1722+
ceph_osdc_put_request(req);
1723+
ceph_release_page_vector(pages, num_pages);
1724+
break;
1725+
}
1726+
1727+
osd_req_op_extent_osd_data_pages(req, first ? 1 : 0,
1728+
&pages[num_pages - 1],
1729+
CEPH_FSCRYPT_BLOCK_SIZE,
1730+
offset_in_page(last_pos),
1731+
false, false);
1732+
}
1733+
1734+
ceph_osdc_start_request(osdc, req);
1735+
ret = ceph_osdc_wait_request(osdc, req);
1736+
1737+
/* FIXME: length field is wrong if there are 2 extents */
1738+
ceph_update_read_metrics(&fsc->mdsc->metric,
1739+
req->r_start_latency,
1740+
req->r_end_latency,
1741+
read_len, ret);
1742+
1743+
/* Ok if object is not already present */
1744+
if (ret == -ENOENT) {
1745+
/*
1746+
* If there is no object, then we can't assert
1747+
* on its version. Set it to 0, and we'll use an
1748+
* exclusive create instead.
1749+
*/
1750+
ceph_osdc_put_request(req);
1751+
ret = 0;
1752+
1753+
/*
1754+
* zero out the soon-to-be uncopied parts of the
1755+
* first and last pages.
1756+
*/
1757+
if (first)
1758+
zero_user_segment(pages[0], 0,
1759+
offset_in_page(first_pos));
1760+
if (last)
1761+
zero_user_segment(pages[num_pages - 1],
1762+
offset_in_page(last_pos),
1763+
PAGE_SIZE);
1764+
} else {
1765+
if (ret < 0) {
1766+
ceph_osdc_put_request(req);
1767+
ceph_release_page_vector(pages, num_pages);
1768+
break;
1769+
}
1770+
1771+
op = &req->r_ops[0];
1772+
if (op->extent.sparse_ext_cnt == 0) {
1773+
if (first)
1774+
zero_user_segment(pages[0], 0,
1775+
offset_in_page(first_pos));
1776+
else
1777+
zero_user_segment(pages[num_pages - 1],
1778+
offset_in_page(last_pos),
1779+
PAGE_SIZE);
1780+
} else if (op->extent.sparse_ext_cnt != 1 ||
1781+
ceph_sparse_ext_map_end(op) !=
1782+
CEPH_FSCRYPT_BLOCK_SIZE) {
1783+
ret = -EIO;
1784+
ceph_osdc_put_request(req);
1785+
ceph_release_page_vector(pages, num_pages);
1786+
break;
1787+
}
1788+
1789+
if (first && last) {
1790+
op = &req->r_ops[1];
1791+
if (op->extent.sparse_ext_cnt == 0) {
1792+
zero_user_segment(pages[num_pages - 1],
1793+
offset_in_page(last_pos),
1794+
PAGE_SIZE);
1795+
} else if (op->extent.sparse_ext_cnt != 1 ||
1796+
ceph_sparse_ext_map_end(op) !=
1797+
CEPH_FSCRYPT_BLOCK_SIZE) {
1798+
ret = -EIO;
1799+
ceph_osdc_put_request(req);
1800+
ceph_release_page_vector(pages, num_pages);
1801+
break;
1802+
}
1803+
}
1804+
1805+
/* Grab assert version. It must be non-zero. */
1806+
assert_ver = req->r_version;
1807+
WARN_ON_ONCE(ret > 0 && assert_ver == 0);
1808+
1809+
ceph_osdc_put_request(req);
1810+
if (first) {
1811+
ret = ceph_fscrypt_decrypt_block_inplace(inode,
1812+
pages[0], CEPH_FSCRYPT_BLOCK_SIZE,
1813+
offset_in_page(first_pos),
1814+
first_pos >> CEPH_FSCRYPT_BLOCK_SHIFT);
1815+
if (ret < 0) {
1816+
ceph_release_page_vector(pages, num_pages);
1817+
break;
1818+
}
1819+
}
1820+
if (last) {
1821+
ret = ceph_fscrypt_decrypt_block_inplace(inode,
1822+
pages[num_pages - 1],
1823+
CEPH_FSCRYPT_BLOCK_SIZE,
1824+
offset_in_page(last_pos),
1825+
last_pos >> CEPH_FSCRYPT_BLOCK_SHIFT);
1826+
if (ret < 0) {
1827+
ceph_release_page_vector(pages, num_pages);
1828+
break;
1829+
}
1830+
}
1831+
}
16281832
}
16291833

16301834
left = len;
16311835
off = offset_in_page(pos);
16321836
for (n = 0; n < num_pages; n++) {
16331837
size_t plen = min_t(size_t, left, PAGE_SIZE - off);
16341838

1839+
/* copy the data */
16351840
ret = copy_page_from_iter(pages[n], off, plen, from);
1636-
off = 0;
16371841
if (ret != plen) {
16381842
ret = -EFAULT;
16391843
break;
16401844
}
1845+
off = 0;
16411846
left -= ret;
16421847
}
1643-
16441848
if (ret < 0) {
1849+
dout("sync_write write failed with %d\n", ret);
16451850
ceph_release_page_vector(pages, num_pages);
1646-
goto out;
1851+
break;
16471852
}
16481853

1649-
req->r_inode = inode;
1854+
if (IS_ENCRYPTED(inode)) {
1855+
ret = ceph_fscrypt_encrypt_pages(inode, pages,
1856+
write_pos, write_len,
1857+
GFP_KERNEL);
1858+
if (ret < 0) {
1859+
dout("encryption failed with %d\n", ret);
1860+
ceph_release_page_vector(pages, num_pages);
1861+
break;
1862+
}
1863+
}
16501864

1651-
osd_req_op_extent_osd_data_pages(req, 0, pages, len,
1652-
offset_in_page(pos),
1653-
false, true);
1865+
req = ceph_osdc_new_request(osdc, &ci->i_layout,
1866+
ci->i_vino, write_pos, &write_len,
1867+
rmw ? 1 : 0, rmw ? 2 : 1,
1868+
CEPH_OSD_OP_WRITE,
1869+
CEPH_OSD_FLAG_WRITE,
1870+
snapc, ci->i_truncate_seq,
1871+
ci->i_truncate_size, false);
1872+
if (IS_ERR(req)) {
1873+
ret = PTR_ERR(req);
1874+
ceph_release_page_vector(pages, num_pages);
1875+
break;
1876+
}
16541877

1878+
dout("sync_write write op %lld~%llu\n", write_pos, write_len);
1879+
osd_req_op_extent_osd_data_pages(req, rmw ? 1 : 0, pages, write_len,
1880+
offset_in_page(write_pos), false,
1881+
true);
1882+
req->r_inode = inode;
16551883
req->r_mtime = mtime;
1656-
ceph_osdc_start_request(&fsc->client->osdc, req);
1657-
ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
1884+
1885+
/* Set up the assertion */
1886+
if (rmw) {
1887+
/*
1888+
* Set up the assertion. If we don't have a version
1889+
* number, then the object doesn't exist yet. Use an
1890+
* exclusive create instead of a version assertion in
1891+
* that case.
1892+
*/
1893+
if (assert_ver) {
1894+
osd_req_op_init(req, 0, CEPH_OSD_OP_ASSERT_VER, 0);
1895+
req->r_ops[0].assert_ver.ver = assert_ver;
1896+
} else {
1897+
osd_req_op_init(req, 0, CEPH_OSD_OP_CREATE,
1898+
CEPH_OSD_OP_FLAG_EXCL);
1899+
}
1900+
}
1901+
1902+
ceph_osdc_start_request(osdc, req);
1903+
ret = ceph_osdc_wait_request(osdc, req);
16581904

16591905
ceph_update_write_metrics(&fsc->mdsc->metric, req->r_start_latency,
16601906
req->r_end_latency, len, ret);
1661-
out:
16621907
ceph_osdc_put_request(req);
16631908
if (ret != 0) {
1909+
dout("sync_write osd write returned %d\n", ret);
1910+
/* Version changed! Must re-do the rmw cycle */
1911+
if ((assert_ver && (ret == -ERANGE || ret == -EOVERFLOW)) ||
1912+
(!assert_ver && ret == -EEXIST)) {
1913+
/* We should only ever see this on a rmw */
1914+
WARN_ON_ONCE(!rmw);
1915+
1916+
/* The version should never go backward */
1917+
WARN_ON_ONCE(ret == -EOVERFLOW);
1918+
1919+
*from = saved_iter;
1920+
1921+
/* FIXME: limit number of times we loop? */
1922+
continue;
1923+
}
16641924
ceph_set_error_write(ci);
16651925
break;
16661926
}
16671927

16681928
ceph_clear_error_write(ci);
16691929
pos += len;
16701930
written += len;
1931+
dout("sync_write written %d\n", written);
16711932
if (pos > i_size_read(inode)) {
16721933
check_caps = ceph_inode_set_size(inode, pos);
16731934
if (check_caps)
@@ -1681,6 +1942,7 @@ ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
16811942
ret = written;
16821943
iocb->ki_pos = pos;
16831944
}
1945+
dout("sync_write returning %d\n", ret);
16841946
return ret;
16851947
}
16861948

0 commit comments

Comments
 (0)