@@ -30,7 +30,10 @@ static std::mutex in_flight_mutex;
3030
3131enum op_type {
3232 Write,
33- Read
33+ WriteFull,
34+ Read,
35+ Truncate,
36+ Zero
3437};
3538
3639struct Op {
@@ -43,17 +46,17 @@ struct Op {
4346 shared_ptr<string> who;
4447 librados::AioCompletion *completion;
4548 bufferlist read_bl;
46-
49+
4750 Op (
4851 time_t at,
4952 op_type type,
5053 uint64_t offset,
5154 uint64_t length,
5255 shared_ptr<string> object,
5356 shared_ptr<string> collection,
54- shared_ptr<string> who
57+ shared_ptr<string> who
5558 ) : at(at), type(type), offset(offset), length(length), object(object), collection(collection), who(who), completion(nullptr ) {}
56-
59+
5760};
5861
5962void gen_buffer (bufferlist& bl, uint64_t size) {
@@ -67,11 +70,11 @@ void completion_cb(librados::completion_t cb, void *arg) {
6770 Op *op = static_cast <Op*>(arg);
6871 // Process the completed operation here
6972 std::cout << fmt::format (" Completed op {} object={} range={}~{}" , op->type , *op->object , op->offset , op->length ) << std::endl;
70-
73+
7174 delete op->completion ;
7275 op->completion = nullptr ;
7376 if (op->type == Read) {
74- op->read_bl .clear ();
77+ op->read_bl .clear ();
7578 }
7679
7780 {
@@ -89,16 +92,16 @@ int main(int argc, char** argv) {
8992 uint64_t io_depth = 64 ;
9093 string file;
9194 std::filesystem::path ceph_conf_path;
92-
95+
9396 if (argc < 3 ) {
9497 cout << fmt::format (" usage: ops_replayer file ceph.conf" ) << endl;
9598 }
9699 file = argv[1 ];
97100 ceph_conf_path = argv[2 ];
98101 cout << file << endl;
99-
100-
101-
102+
103+
104+
102105 string date, time, who, type, range, object, collection;
103106 ifstream fstream (file, ifstream::in);
104107 const char * date_format_first_column = " %Y-%m-%d" ;
@@ -113,61 +116,82 @@ int main(int argc, char** argv) {
113116 continue ;
114117 }
115118 fstream >> time >> who >> type >> range >> object >> collection;
116-
119+
117120 date += " " + time;
118121 cout << date << endl;
119122 // FIXME: this is wrong but it returns a reasonable bad timestamp :P
120123 const char * date_format_full = " %Y-%m-%d %H:%M:%S.%f%z" ;
121124 res = strptime (date.c_str (), date_format_full, &t);
122125 time_t at = mktime (&t);
123-
126+
124127 cout << fmt::format (" {} {} {} {} {} {} {}" , date, at, who, type, range, object, collection) << endl;
125-
128+
126129 shared_ptr<string> who_ptr = make_shared<string>(who);
127130 auto who_it = string_cache.find (who);
128131 if (who_it == string_cache.end ()) {
129132 string_cache.insert ({ who, who_ptr });
130133 } else {
131134 who_ptr = who_it->second ;
132135 }
133-
136+
134137 shared_ptr<string> object_ptr = make_shared<string>(object);
135138 auto object_it = string_cache.find (object);
136139 if (object_it == string_cache.end ()) {
137140 string_cache.insert ({ object, object_ptr });
138141 } else {
139142 object_ptr = object_it->second ;
140143 }
141-
144+
145+ op_type ot;
146+ if (type == " write" ) {
147+ ot = Write;
148+ } else if (type == " writefull" ) {
149+ ot = WriteFull;
150+ } else if (type == " read" ) {
151+ ot = Read;
152+ } else if (type == " sparse-read" ) {
153+ ot = Read;
154+ } else if (type == " truncate" ) {
155+ ot = Truncate;
156+ } else if (type == " zero" ) {
157+ ot = Zero;
158+ } else {
159+ cout << " invalid type " << type << endl;
160+ exit (1 );
161+ }
162+
142163 shared_ptr<string> collection_ptr = make_shared<string>(collection);
143164 auto collection_it = string_cache.find (collection);
144165 if (collection_it == string_cache.end ()) {
145166 string_cache.insert ({ collection, collection_ptr });
146167 } else {
147168 collection_ptr = collection_it->second ;
148169 }
149-
170+
150171 uint64_t offset = 0 , length = 0 ;
151172 stringstream range_stream (range);
152173 string offset_str, length_str;
153174 getline (range_stream, offset_str, ' ~' );
154- getline (range_stream, length_str, ' ~' );
155175 offset = stoll (offset_str);
156- length = stoll (length_str);
157-
176+
177+ if (ot != Truncate) {
178+ // Truncate doesn't only has one number
179+ getline (range_stream, length_str, ' ~' );
180+ length = stoll (length_str);
181+ }
182+
158183 max_buffer_size = max (length, max_buffer_size);
159-
160- op_type ot = type == " write" ? Write : Read;
184+
161185 ops.push_back (Op (at, ot, offset, length, object_ptr, collection_ptr, who_ptr));
162186 }
163-
187+
164188 int ret = cluster.init2 (" client.admin" , " ceph" , 0 );
165189 if (ret < 0 ) {
166190 std::cerr << " Couldn't init ceph! error " << ret << std::endl;
167191 return EXIT_FAILURE;
168192 }
169193 std::cout << " cluster init ready" << std::endl;
170-
194+
171195 ret = cluster.conf_read_file (ceph_conf_path.c_str ());
172196 if (ret < 0 ) {
173197 std::cerr << " Couldn't read the Ceph configuration file! error " << ret << std::endl;
@@ -186,18 +210,18 @@ int main(int argc, char** argv) {
186210 exit (EXIT_FAILURE);
187211 }
188212 std::cout << " test-pool ready" << std::endl;
189-
190-
213+
214+
191215 // process ops
192216 // Create a buffer big enough for every operation. We will take enoguh bytes from it for every operation
193217 bufferlist bl;
194218 gen_buffer (bl, max_buffer_size);
195-
219+
196220 for (auto &op : ops) {
197221 {
198222 std::unique_lock<std::mutex> lock (in_flight_mutex);
199223 cv.wait (lock, [&io_depth] { return in_flight_ops < io_depth; });
200-
224+
201225 }
202226 cout << fmt::format (" Running op {} object={} range={}~{}" , op.type , *op.object , op.offset , op.length ) << endl;
203227 op.completion = librados::Rados::aio_create_completion (static_cast <void *>(&op), completion_cb);
@@ -209,6 +233,13 @@ int main(int argc, char** argv) {
209233 }
210234 break ;
211235 }
236+ case WriteFull: {
237+ int ret = io.aio_write_full (*op.object , op.completion , bl);
238+ if (ret != 0 ) {
239+ cout << fmt::format (" Error writing full ecode={}" , ret) << endl;;
240+ }
241+ break ;
242+ }
212243 case Read: {
213244 bufferlist read;
214245 int ret = io.aio_read (*op.object , op.completion , &op.read_bl , op.length , op.offset );
@@ -217,14 +248,32 @@ int main(int argc, char** argv) {
217248 }
218249 break ;
219250 }
251+ case Truncate: {
252+ librados::ObjectWriteOperation write_operation;
253+ write_operation.truncate (op.offset );
254+ int ret = io.aio_operate (*op.object , op.completion , &write_operation);
255+ if (ret != 0 ) {
256+ cout << fmt::format (" Error truncating ecode={}" , ret) << endl;;
257+ }
258+ break ;
259+ }
260+ case Zero: {
261+ librados::ObjectWriteOperation write_operation;
262+ write_operation.zero (op.offset , op.length );
263+ int ret = io.aio_operate (*op.object , op.completion , &write_operation);
264+ if (ret != 0 ) {
265+ cout << fmt::format (" Error zeroing ecode={}" , ret) << endl;;
266+ }
267+ break ;
268+ }
220269 }
221270 in_flight_ops++;
222271 }
223272 while (in_flight_ops > 0 ) {
224273 std::this_thread::sleep_for (std::chrono::milliseconds (100 ));
225274 }
226275 // io.write(const std::string &oid, bufferlist &bl, size_t len, uint64_t off)
227-
276+
228277 cout << ops.size () << endl;
229278 return 0 ;
230279}
0 commit comments