11#include < algorithm>
22#include < cassert>
3+ #include < fcntl.h>
4+ #include < string_view>
5+ #include < sys/mman.h>
6+ #include < sys/stat.h>
37#include < thread>
48#include < condition_variable>
59#include < cstdint>
@@ -59,6 +63,30 @@ struct Op {
5963
6064};
6165
66+ struct ParserContext {
67+ map<string, shared_ptr<string>> string_cache;
68+ vector<Op> ops;
69+ char *start; // starts and ends in new line or eof
70+ char *end;
71+ uint64_t max_buffer_size;
72+ };
73+
74+ class MemoryStreamBuf : public std ::streambuf {
75+ public:
76+ MemoryStreamBuf (const char * start, const char * end) {
77+ this ->setg (const_cast <char *>(start), const_cast <char *>(start), const_cast <char *>(end));
78+ }
79+ };
80+
81+ class MemoryInputStream : public std ::istream {
82+ MemoryStreamBuf _buffer;
83+ public:
84+ MemoryInputStream (const char * start, const char * end)
85+ : std::istream(&_buffer), _buffer(start, end) {
86+ rdbuf (&_buffer);
87+ }
88+ };
89+
6290void gen_buffer (bufferlist& bl, uint64_t size) {
6391 std::unique_ptr<char []> buffer = std::make_unique<char []>(size);
6492 std::independent_bits_engine<std::default_random_engine, CHAR_BIT, unsigned char > e;
@@ -69,7 +97,7 @@ void gen_buffer(bufferlist& bl, uint64_t size) {
6997void completion_cb (librados::completion_t cb, void *arg) {
7098 Op *op = static_cast <Op*>(arg);
7199 // Process the completed operation here
72- std::cout << fmt::format (" Completed op {} object={} range={}~{}" , op->type , *op->object , op->offset , op->length ) << std::endl;
100+ // std::cout << fmt::format("Completed op {} object={} range={}~{}", op->type, *op->object, op->offset, op->length) << std::endl;
73101
74102 delete op->completion ;
75103 op->completion = nullptr ;
@@ -84,6 +112,91 @@ void completion_cb(librados::completion_t cb, void *arg) {
84112 cv.notify_one ();
85113}
86114
115+ void parse_entry_point (shared_ptr<ParserContext> context) {
116+ string date, time, who, type, range, object, collection;
117+ MemoryInputStream fstream (context->start , context->end );
118+ const char * date_format_first_column = " %Y-%m-%d" ;
119+ // we expect this input:
120+ // 2024-05-10 12:06:24.990831+00:00 client.607247697.0:5632274 write 4096~4096 2:d03a455a:::08b0f2fd5f20f504e76c2dd3d24683a1:head 2.1c0b
121+ while (fstream >> date){
122+ // cout << date << endl;
123+ tm t;
124+ char * res = strptime (date.c_str (), date_format_first_column, &t);
125+ if (res == nullptr ) {
126+ fstream.ignore (std::numeric_limits<std::streamsize>::max (), ' \n ' );
127+ continue ;
128+ }
129+ fstream >> time >> who >> type >> range >> object >> collection;
130+
131+ date += " " + time;
132+ // cout << date << endl;
133+ // FIXME: this is wrong but it returns a reasonable bad timestamp :P
134+ const char * date_format_full = " %Y-%m-%d %H:%M:%S.%f%z" ;
135+ res = strptime (date.c_str (), date_format_full, &t);
136+ time_t at = mktime (&t);
137+
138+ // cout << fmt::format("{} {} {} {} {} {} {}", date, at, who, type, range, object, collection) << endl;
139+
140+ shared_ptr<string> who_ptr = make_shared<string>(who);
141+ auto who_it = string_cache.find (who);
142+ if (who_it == string_cache.end ()) {
143+ string_cache.insert ({ who, who_ptr });
144+ } else {
145+ who_ptr = who_it->second ;
146+ }
147+
148+ shared_ptr<string> object_ptr = make_shared<string>(object);
149+ auto object_it = string_cache.find (object);
150+ if (object_it == string_cache.end ()) {
151+ string_cache.insert ({ object, object_ptr });
152+ } else {
153+ object_ptr = object_it->second ;
154+ }
155+
156+ op_type ot;
157+ if (type == " write" ) {
158+ ot = Write;
159+ } else if (type == " writefull" ) {
160+ ot = WriteFull;
161+ } else if (type == " read" ) {
162+ ot = Read;
163+ } else if (type == " sparse-read" ) {
164+ ot = Read;
165+ } else if (type == " truncate" ) {
166+ ot = Truncate;
167+ } else if (type == " zero" ) {
168+ ot = Zero;
169+ } else {
170+ cout << " invalid type " << type << endl;
171+ exit (1 );
172+ }
173+
174+ shared_ptr<string> collection_ptr = make_shared<string>(collection);
175+ auto collection_it = string_cache.find (collection);
176+ if (collection_it == string_cache.end ()) {
177+ string_cache.insert ({ collection, collection_ptr });
178+ } else {
179+ collection_ptr = collection_it->second ;
180+ }
181+
182+ uint64_t offset = 0 , length = 0 ;
183+ stringstream range_stream (range);
184+ string offset_str, length_str;
185+ getline (range_stream, offset_str, ' ~' );
186+ offset = stoll (offset_str);
187+
188+ if (ot != Truncate) {
189+ // Truncate doesn't only has one number
190+ getline (range_stream, length_str, ' ~' );
191+ length = stoll (length_str);
192+ }
193+
194+ context->max_buffer_size = max (length, context->max_buffer_size );
195+
196+ context->ops .push_back (Op (at, ot, offset, length, object_ptr, collection_ptr, who_ptr));
197+ }
198+ }
199+
87200int main (int argc, char ** argv) {
88201 vector<Op> ops;
89202 librados::Rados cluster;
@@ -100,90 +213,49 @@ int main(int argc, char** argv) {
100213 ceph_conf_path = argv[2 ];
101214 cout << file << endl;
102215
103-
104-
105- string date, time, who, type, range, object, collection;
106- ifstream fstream (file, ifstream::in);
107- const char * date_format_first_column = " %Y-%m-%d" ;
108- // we expect this input:
109- // 2024-05-10 12:06:24.990831+00:00 client.607247697.0:5632274 write 4096~4096 2:d03a455a:::08b0f2fd5f20f504e76c2dd3d24683a1:head 2.1c0b
110- while (fstream >> date){
111- // cout << date << endl;
112- tm t;
113- char * res = strptime (date.c_str (), date_format_first_column, &t);
114- if (res == nullptr ) {
115- fstream.ignore (std::numeric_limits<std::streamsize>::max (), ' \n ' );
116- continue ;
117- }
118- fstream >> time >> who >> type >> range >> object >> collection;
119-
120- date += " " + time;
121- // cout << date << endl;
122- // FIXME: this is wrong but it returns a reasonable bad timestamp :P
123- const char * date_format_full = " %Y-%m-%d %H:%M:%S.%f%z" ;
124- res = strptime (date.c_str (), date_format_full, &t);
125- time_t at = mktime (&t);
126-
127- // cout << fmt::format("{} {} {} {} {} {} {}", date, at, who, type, range, object, collection) << endl;
128-
129- shared_ptr<string> who_ptr = make_shared<string>(who);
130- auto who_it = string_cache.find (who);
131- if (who_it == string_cache.end ()) {
132- string_cache.insert ({ who, who_ptr });
133- } else {
134- who_ptr = who_it->second ;
135- }
136-
137- shared_ptr<string> object_ptr = make_shared<string>(object);
138- auto object_it = string_cache.find (object);
139- if (object_it == string_cache.end ()) {
140- string_cache.insert ({ object, object_ptr });
141- } else {
142- object_ptr = object_it->second ;
143- }
144-
145- op_type ot;
146- if (type == " write" ) {
147- ot = Write;
148- } else if (type == " writefull" ) {
149- ot = WriteFull;
150- } else if (type == " read" ) {
151- ot = Read;
152- } else if (type == " sparse-read" ) {
153- ot = Read;
154- } else if (type == " truncate" ) {
155- ot = Truncate;
156- } else if (type == " zero" ) {
157- ot = Zero;
158- } else {
159- cout << " invalid type " << type << endl;
160- exit (1 );
161- }
162-
163- shared_ptr<string> collection_ptr = make_shared<string>(collection);
164- auto collection_it = string_cache.find (collection);
165- if (collection_it == string_cache.end ()) {
166- string_cache.insert ({ collection, collection_ptr });
167- } else {
168- collection_ptr = collection_it->second ;
216+ uint64_t nthreads = 16 ;
217+ vector<std::thread> parser_threads;
218+ vector<shared_ptr<ParserContext>> parser_contexts;
219+ int fd = open (file.c_str (), O_RDONLY);
220+ if (fd == -1 ) {
221+ cout << " Error opening file" << endl;
222+ }
223+ struct stat file_stat;
224+ fstat (fd, &file_stat);
225+ char * mapped_buffer = (char *)mmap (NULL , file_stat.st_size , PROT_READ, MAP_SHARED, fd, 0 );
226+ if (mapped_buffer == nullptr ) {
227+ cout << " error mapping buffer" << endl;
228+ }
229+ uint64_t start_offset = 0 ;
230+ uint64_t step_size = file_stat.st_size / nthreads;
231+ for (int i = 0 ; i < nthreads; i++) {
232+ char * end = mapped_buffer + start_offset + step_size;
233+ while (*end != ' \n ' ) {
234+ end--;
169235 }
170-
171- uint64_t offset = 0 , length = 0 ;
172- stringstream range_stream (range);
173- string offset_str, length_str;
174- getline (range_stream, offset_str, ' ~' );
175- offset = stoll (offset_str);
176-
177- if (ot != Truncate) {
178- // Truncate doesn't only has one number
179- getline (range_stream, length_str, ' ~' );
180- length = stoll (length_str);
236+ if (i == nthreads-1 ) {
237+ end = mapped_buffer + file_stat.st_size ;
181238 }
239+ shared_ptr<ParserContext> context = make_shared<ParserContext>();
240+ context->start = mapped_buffer + start_offset;
241+ context->end = end;
242+ context->max_buffer_size = 0 ;
243+ parser_contexts.push_back (context);
244+ parser_threads.push_back (std::thread (parse_entry_point, context));
245+ start_offset += (end - mapped_buffer - start_offset);
246+ }
247+ for (auto & t : parser_threads) {
248+ t.join ();
249+ }
250+ for (auto context : parser_contexts) {
251+ string_cache.insert (context->string_cache .begin (), context->string_cache .end ());
252+ ops.insert (ops.end (), context->ops .begin (), context->ops .end ());
253+ max_buffer_size = max (context->max_buffer_size , max_buffer_size);
254+ context->string_cache .clear ();
255+ context->ops .clear ();
256+ }
182257
183- max_buffer_size = max (length, max_buffer_size);
184258
185- ops.push_back (Op (at, ot, offset, length, object_ptr, collection_ptr, who_ptr));
186- }
187259
188260 int ret = cluster.init2 (" client.admin" , " ceph" , 0 );
189261 if (ret < 0 ) {
0 commit comments