|
16 | 16 | #include "common/ceph_mutex.h" |
17 | 17 |
|
18 | 18 |
|
19 | | -// Forward declaration |
20 | | -class BucketIndexAioManager; |
21 | | -/* |
22 | | - * Bucket index AIO request argument, this is used to pass a argument |
23 | | - * to callback. |
24 | | - */ |
25 | | -struct BucketIndexAioArg : public RefCountedObject { |
26 | | - BucketIndexAioArg(int _id, BucketIndexAioManager* _manager) : |
27 | | - id(_id), manager(_manager) {} |
28 | | - int id; |
29 | | - BucketIndexAioManager* manager; |
30 | | -}; |
31 | | - |
32 | | -/* |
33 | | - * This class manages AIO completions. This class is not completely |
34 | | - * thread-safe, methods like *get_next_request_id* is not thread-safe |
35 | | - * and is expected to be called from within one thread. |
36 | | - */ |
37 | | -class BucketIndexAioManager { |
38 | | -public: |
39 | | - |
40 | | - // allows us to reaccess the shard id and shard's oid during and |
41 | | - // after the asynchronous call is made |
42 | | - struct RequestObj { |
43 | | - int shard_id; |
44 | | - std::string oid; |
45 | | - |
46 | | - RequestObj(int _shard_id, const std::string& _oid) : |
47 | | - shard_id(_shard_id), oid(_oid) |
48 | | - {/* empty */} |
49 | | - }; |
50 | | - |
51 | | - |
52 | | -private: |
53 | | - // NB: the following 4 maps use the request_id as the key; this |
54 | | - // is not the same as the shard_id! |
55 | | - std::map<int, librados::AioCompletion*> pendings; |
56 | | - std::map<int, librados::AioCompletion*> completions; |
57 | | - std::map<int, const RequestObj> pending_objs; |
58 | | - std::map<int, const RequestObj> completion_objs; |
59 | | - |
60 | | - int next = 0; |
61 | | - ceph::mutex lock = ceph::make_mutex("BucketIndexAioManager::lock"); |
62 | | - ceph::condition_variable cond; |
63 | | - /* |
64 | | - * Callback implementation for AIO request. |
65 | | - */ |
66 | | - static void bucket_index_op_completion_cb(void* cb, void* arg) { |
67 | | - BucketIndexAioArg* cb_arg = (BucketIndexAioArg*) arg; |
68 | | - cb_arg->manager->do_completion(cb_arg->id); |
69 | | - cb_arg->put(); |
70 | | - } |
71 | | - |
72 | | - /* |
73 | | - * Get next request ID. This method is not thread-safe. |
74 | | - * |
75 | | - * Return next request ID. |
76 | | - */ |
77 | | - int get_next_request_id() { return next++; } |
78 | | - |
79 | | - /* |
80 | | - * Add a new pending AIO completion instance. |
81 | | - * |
82 | | - * @param id - the request ID. |
83 | | - * @param completion - the AIO completion instance. |
84 | | - * @param oid - the object id associated with the object, if it is NULL, we don't |
85 | | - * track the object id per callback. |
86 | | - */ |
87 | | - void add_pending(int request_id, librados::AioCompletion* completion, const int shard_id, const std::string& oid) { |
88 | | - pendings[request_id] = completion; |
89 | | - pending_objs.emplace(request_id, RequestObj(shard_id, oid)); |
90 | | - } |
91 | | - |
92 | | -public: |
93 | | - /* |
94 | | - * Create a new instance. |
95 | | - */ |
96 | | - BucketIndexAioManager() = default; |
97 | | - |
98 | | - /* |
99 | | - * Do completion for the given AIO request. |
100 | | - */ |
101 | | - void do_completion(int request_id); |
102 | | - |
103 | | - /* |
104 | | - * Wait for AIO completions. |
105 | | - * |
106 | | - * valid_ret_code - valid AIO return code. |
107 | | - * num_completions - number of completions. |
108 | | - * ret_code - return code of failed AIO. |
109 | | - * objs - a std::list of objects that has been finished the AIO. |
110 | | - * |
111 | | - * Return false if there is no pending AIO, true otherwise. |
112 | | - */ |
113 | | - bool wait_for_completions(int valid_ret_code, |
114 | | - int *num_completions = nullptr, |
115 | | - int *ret_code = nullptr, |
116 | | - std::map<int, std::string> *completed_objs = nullptr, |
117 | | - std::map<int, std::string> *retry_objs = nullptr); |
118 | | - |
119 | | - /** |
120 | | - * Do aio read operation. |
121 | | - */ |
122 | | - bool aio_operate(librados::IoCtx& io_ctx, const int shard_id, const std::string& oid, librados::ObjectReadOperation *op) { |
123 | | - std::lock_guard l{lock}; |
124 | | - const int request_id = get_next_request_id(); |
125 | | - BucketIndexAioArg *arg = new BucketIndexAioArg(request_id, this); |
126 | | - librados::AioCompletion *c = librados::Rados::aio_create_completion((void*)arg, bucket_index_op_completion_cb); |
127 | | - int r = io_ctx.aio_operate(oid, c, (librados::ObjectReadOperation*)op, NULL); |
128 | | - if (r >= 0) { |
129 | | - add_pending(arg->id, c, shard_id, oid); |
130 | | - } else { |
131 | | - arg->put(); |
132 | | - c->release(); |
133 | | - } |
134 | | - return r; |
135 | | - } |
136 | | - |
137 | | - /** |
138 | | - * Do aio write operation. |
139 | | - */ |
140 | | - bool aio_operate(librados::IoCtx& io_ctx, const int shard_id, const std::string& oid, librados::ObjectWriteOperation *op) { |
141 | | - std::lock_guard l{lock}; |
142 | | - const int request_id = get_next_request_id(); |
143 | | - BucketIndexAioArg *arg = new BucketIndexAioArg(request_id, this); |
144 | | - librados::AioCompletion *c = librados::Rados::aio_create_completion((void*)arg, bucket_index_op_completion_cb); |
145 | | - int r = io_ctx.aio_operate(oid, c, (librados::ObjectWriteOperation*)op); |
146 | | - if (r >= 0) { |
147 | | - add_pending(arg->id, c, shard_id, oid); |
148 | | - } else { |
149 | | - arg->put(); |
150 | | - c->release(); |
151 | | - } |
152 | | - return r; |
153 | | - } |
154 | | -}; |
155 | | - |
156 | 19 | class RGWGetDirHeader_CB : public boost::intrusive_ref_counter<RGWGetDirHeader_CB> { |
157 | 20 | public: |
158 | 21 | virtual ~RGWGetDirHeader_CB() {} |
|
0 commit comments