 static const struct inode_operations ceph_symlink_iops;
 
-static void ceph_invalidate_work(struct work_struct *work);
-static void ceph_writeback_work(struct work_struct *work);
-static void ceph_vmtruncate_work(struct work_struct *work);
+static void ceph_inode_work(struct work_struct *work);
 
 /*
  * find or create an inode, given the ceph ino number
@@ -509,10 +507,8 @@ struct inode *ceph_alloc_inode(struct super_block *sb)
 	INIT_LIST_HEAD(&ci->i_snap_realm_item);
 	INIT_LIST_HEAD(&ci->i_snap_flush_item);
 
-	INIT_WORK(&ci->i_wb_work, ceph_writeback_work);
-	INIT_WORK(&ci->i_pg_inv_work, ceph_invalidate_work);
-
-	INIT_WORK(&ci->i_vmtruncate_work, ceph_vmtruncate_work);
+	INIT_WORK(&ci->i_work, ceph_inode_work);
+	ci->i_work_mask = 0;
 
 	ceph_fscache_inode_init(ci);
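
The i_work and i_work_mask fields, and the CEPH_I_WORK_* bits used later in this diff, come from a companion change to fs/ceph/super.h that is not part of this file; likewise, inode_wq replaces the separate wb_wq/pg_inv_wq/trunc_wq workqueues in struct ceph_fs_client. A minimal sketch of what the header change presumably adds, reconstructed from how this diff uses the names (field placement and exact bit values are assumptions):

/* sketch of the assumed fs/ceph/super.h companion change */
#define CEPH_I_WORK_WRITEBACK		0	/* bit values assumed */
#define CEPH_I_WORK_INVALIDATE_PAGES	1
#define CEPH_I_WORK_VMTRUNCATE		2

struct ceph_inode_info {
	/* ... existing fields ... */
	struct work_struct i_work;	/* replaces i_wb_work, i_pg_inv_work
					 * and i_vmtruncate_work */
	unsigned long i_work_mask;	/* pending CEPH_I_WORK_* bits */
	/* ... */
};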
@@ -1480,7 +1476,8 @@ static int readdir_prepopulate_inodes_only(struct ceph_mds_request *req,
 			pr_err("fill_inode badness on %p got %d\n", in, rc);
 			err = rc;
 		}
-		iput(in);
+		/* avoid calling iput_final() in mds dispatch threads */
+		ceph_async_iput(in);
 	}
 
 	return err;
@@ -1678,8 +1675,11 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req,
 				 &req->r_caps_reservation);
 		if (ret < 0) {
 			pr_err("fill_inode badness on %p\n", in);
-			if (d_really_is_negative(dn))
-				iput(in);
+			if (d_really_is_negative(dn)) {
+				/* avoid calling iput_final() in mds
+				 * dispatch threads */
+				ceph_async_iput(in);
+			}
 			d_drop(dn);
 			err = ret;
 			goto next_item;
@@ -1689,7 +1689,7 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req,
 		if (ceph_security_xattr_deadlock(in)) {
 			dout(" skip splicing dn %p to inode %p"
 			     " (security xattr deadlock)\n", dn, in);
-			iput(in);
+			ceph_async_iput(in);
 			skipped++;
 			goto next_item;
 		}
@@ -1740,57 +1740,87 @@ bool ceph_inode_set_size(struct inode *inode, loff_t size)
 	return ret;
 }
 
+/*
+ * Put reference to inode, but avoid calling iput_final() in current thread.
+ * iput_final() may wait for readahead pages. The wait can cause deadlock in
+ * some contexts.
+ */
+void ceph_async_iput(struct inode *inode)
+{
+	if (!inode)
+		return;
+	for (;;) {
+		if (atomic_add_unless(&inode->i_count, -1, 1))
+			break;
+		if (queue_work(ceph_inode_to_client(inode)->inode_wq,
+			       &ceph_inode(inode)->i_work))
+			break;
+		/* queue work failed, i_count must be at least 2 */
+	}
+}
+
 /*
  * Write back inode data in a worker thread. (This can't be done
  * in the message handler context.)
  */
 void ceph_queue_writeback(struct inode *inode)
 {
+	struct ceph_inode_info *ci = ceph_inode(inode);
+	set_bit(CEPH_I_WORK_WRITEBACK, &ci->i_work_mask);
+
 	ihold(inode);
-	if (queue_work(ceph_inode_to_client(inode)->wb_wq,
-		       &ceph_inode(inode)->i_wb_work)) {
+	if (queue_work(ceph_inode_to_client(inode)->inode_wq,
+		       &ci->i_work)) {
 		dout("ceph_queue_writeback %p\n", inode);
 	} else {
-		dout("ceph_queue_writeback %p failed\n", inode);
+		dout("ceph_queue_writeback %p already queued, mask=%lx\n",
+		     inode, ci->i_work_mask);
 		iput(inode);
 	}
 }
 
-static void ceph_writeback_work(struct work_struct *work)
-{
-	struct ceph_inode_info *ci = container_of(work, struct ceph_inode_info,
-						  i_wb_work);
-	struct inode *inode = &ci->vfs_inode;
-
-	dout("writeback %p\n", inode);
-	filemap_fdatawrite(&inode->i_data);
-	iput(inode);
-}
-
 /*
  * queue an async invalidation
  */
 void ceph_queue_invalidate(struct inode *inode)
 {
+	struct ceph_inode_info *ci = ceph_inode(inode);
+	set_bit(CEPH_I_WORK_INVALIDATE_PAGES, &ci->i_work_mask);
+
 	ihold(inode);
-	if (queue_work(ceph_inode_to_client(inode)->pg_inv_wq,
-		       &ceph_inode(inode)->i_pg_inv_work)) {
+	if (queue_work(ceph_inode_to_client(inode)->inode_wq,
+		       &ceph_inode(inode)->i_work)) {
 		dout("ceph_queue_invalidate %p\n", inode);
 	} else {
-		dout("ceph_queue_invalidate %p failed\n", inode);
+		dout("ceph_queue_invalidate %p already queued, mask=%lx\n",
+		     inode, ci->i_work_mask);
 		iput(inode);
 	}
 }
 
 /*
- * Invalidate inode pages in a worker thread. (This can't be done
- * in the message handler context.)
+ * Queue an async vmtruncate. If we fail to queue work, we will handle
+ * the truncation the next time we call __ceph_do_pending_vmtruncate.
  */
-static void ceph_invalidate_work(struct work_struct *work)
+void ceph_queue_vmtruncate(struct inode *inode)
 {
-	struct ceph_inode_info *ci = container_of(work, struct ceph_inode_info,
-						  i_pg_inv_work);
-	struct inode *inode = &ci->vfs_inode;
+	struct ceph_inode_info *ci = ceph_inode(inode);
+	set_bit(CEPH_I_WORK_VMTRUNCATE, &ci->i_work_mask);
+
+	ihold(inode);
+	if (queue_work(ceph_inode_to_client(inode)->inode_wq,
+		       &ci->i_work)) {
+		dout("ceph_queue_vmtruncate %p\n", inode);
+	} else {
+		dout("ceph_queue_vmtruncate %p already queued, mask=%lx\n",
+		     inode, ci->i_work_mask);
+		iput(inode);
+	}
+}
+
+static void ceph_do_invalidate_pages(struct inode *inode)
+{
+	struct ceph_inode_info *ci = ceph_inode(inode);
 	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
 	u32 orig_gen;
 	int check = 0;
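
The ceph_async_iput() loop above is worth unpacking: atomic_add_unless(&inode->i_count, -1, 1) drops a reference only when it is not the last one; otherwise the final put is handed to inode_wq, and a failed queue_work() means the work item is already pending, which itself pins a reference, so the retry is guaranteed to make progress. A userspace sketch of the same pattern, with C11 atomics standing in for the kernel primitives (obj, obj_put_async, queue_final_put are illustrative names, not kernel APIs):

/*
 * Userspace sketch of the ceph_async_iput() pattern: drop a reference,
 * but hand the *final* drop to a worker. Build: cc -std=c11 sketch.c
 */
#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>

struct obj {
	atomic_int refcount;
};

/* analogue of atomic_add_unless(v, -1, 1): decrement unless count == 1 */
static bool dec_unless_last(atomic_int *v)
{
	int c = atomic_load(v);

	while (c != 1) {
		if (atomic_compare_exchange_weak(v, &c, c - 1))
			return true;
	}
	return false;
}

/*
 * Stand-in for queue_work(). The real function returns false only if the
 * work item is already pending; that pending work holds a reference, so
 * the caller's retry loop cannot spin forever.
 */
static bool queue_final_put(struct obj *o)
{
	printf("final put deferred to worker\n");
	return true;
}

static void obj_put_async(struct obj *o)
{
	if (!o)
		return;
	for (;;) {
		if (dec_unless_last(&o->refcount))
			break;		/* not the last reference */
		if (queue_final_put(o))
			break;		/* worker now owns the last drop */
		/* queue failed => already pending => refcount >= 2; retry */
	}
}

int main(void)
{
	struct obj o = { .refcount = 2 };

	obj_put_async(&o);	/* plain decrement: 2 -> 1 */
	obj_put_async(&o);	/* last ref: deferred to the worker */
	printf("refcount now %d\n", atomic_load(&o.refcount));
	return 0;
}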
@@ -1842,44 +1872,6 @@ static void ceph_invalidate_work(struct work_struct *work)
 out:
 	if (check)
 		ceph_check_caps(ci, 0, NULL);
-	iput(inode);
-}
-
-
-/*
- * called by trunc_wq;
- *
- * We also truncate in a separate thread as well.
- */
-static void ceph_vmtruncate_work(struct work_struct *work)
-{
-	struct ceph_inode_info *ci = container_of(work, struct ceph_inode_info,
-						  i_vmtruncate_work);
-	struct inode *inode = &ci->vfs_inode;
-
-	dout("vmtruncate_work %p\n", inode);
-	__ceph_do_pending_vmtruncate(inode);
-	iput(inode);
-}
-
-/*
- * Queue an async vmtruncate. If we fail to queue work, we will handle
- * the truncation the next time we call __ceph_do_pending_vmtruncate.
- */
-void ceph_queue_vmtruncate(struct inode *inode)
-{
-	struct ceph_inode_info *ci = ceph_inode(inode);
-
-	ihold(inode);
-
-	if (queue_work(ceph_sb_to_client(inode->i_sb)->trunc_wq,
-		       &ci->i_vmtruncate_work)) {
-		dout("ceph_queue_vmtruncate %p\n", inode);
-	} else {
-		dout("ceph_queue_vmtruncate %p failed, pending=%d\n",
-		     inode, ci->i_truncate_pending);
-		iput(inode);
-	}
 }
 
 /*
@@ -1943,6 +1935,25 @@ void __ceph_do_pending_vmtruncate(struct inode *inode)
 	wake_up_all(&ci->i_cap_wq);
 }
 
+static void ceph_inode_work(struct work_struct *work)
+{
+	struct ceph_inode_info *ci = container_of(work, struct ceph_inode_info,
+						  i_work);
+	struct inode *inode = &ci->vfs_inode;
+
+	if (test_and_clear_bit(CEPH_I_WORK_WRITEBACK, &ci->i_work_mask)) {
+		dout("writeback %p\n", inode);
+		filemap_fdatawrite(&inode->i_data);
+	}
+	if (test_and_clear_bit(CEPH_I_WORK_INVALIDATE_PAGES, &ci->i_work_mask))
+		ceph_do_invalidate_pages(inode);
+
+	if (test_and_clear_bit(CEPH_I_WORK_VMTRUNCATE, &ci->i_work_mask))
+		__ceph_do_pending_vmtruncate(inode);
+
+	iput(inode);
+}
+
 /*
  * symlinks
  */
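
A note on why folding three work items into one is race-free: each ceph_queue_*() helper sets its CEPH_I_WORK_* bit before calling queue_work(), and queue_work() only returns false while the item is pending but not yet running, i.e. before the worker has executed its test_and_clear_bit() checks, so a bit set by a "failed" queue attempt is still picked up. A userspace sketch of that dispatch, with C11 atomics in place of the kernel bitops (all names illustrative):

/*
 * Userspace sketch of the i_work_mask dispatch in ceph_inode_work().
 * set_work_bit()/test_and_clear_work_bit() are stand-ins for the
 * kernel's set_bit()/test_and_clear_bit(). Build: cc -std=c11 sketch.c
 */
#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>

enum { WORK_WRITEBACK, WORK_INVALIDATE_PAGES, WORK_VMTRUNCATE };

static atomic_ulong work_mask;

static void set_work_bit(int bit)
{
	atomic_fetch_or(&work_mask, 1UL << bit);
}

static bool test_and_clear_work_bit(int bit)
{
	return atomic_fetch_and(&work_mask, ~(1UL << bit)) & (1UL << bit);
}

/* one worker body drains every pending job type, like ceph_inode_work() */
static void inode_work(void)
{
	if (test_and_clear_work_bit(WORK_WRITEBACK))
		printf("writeback\n");
	if (test_and_clear_work_bit(WORK_INVALIDATE_PAGES))
		printf("invalidate pages\n");
	if (test_and_clear_work_bit(WORK_VMTRUNCATE))
		printf("vmtruncate\n");
}

int main(void)
{
	set_work_bit(WORK_WRITEBACK);
	set_work_bit(WORK_VMTRUNCATE);
	inode_work();		/* prints "writeback", then "vmtruncate" */
	return 0;
}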