Skip to content

Commit a9de18f

Browse files
committed
adding splitting functionality, i.e. splitting an input into equal parts to be processed simultaneously
Signed-off-by: gal salomon <gal.salomon@gmail.com>
1 parent 0e7e69b commit a9de18f

File tree

1 file changed

+152
-21
lines changed

1 file changed

+152
-21
lines changed

example/s3select_scaleup.cpp

Lines changed: 152 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -100,14 +100,116 @@ class csv_streamer {
100100
}
101101
};
102102

103-
int csv_splitter(const char* fn,std::vector<std::pair<size_t,size_t>>& ranges)
{//split a single CSV object into byte ranges, each range ending right after a
 //row delimiter, so every range can be processed on a separate thread.
 //returns 0 on success, -1 on failure (file not found / no delimiter in probe).
  const char row_delim = 10; // '\n'

  std::ifstream is(fn, std::ifstream::binary);
  if(!is.is_open())
  {
    printf("failed to open %s. abort\n", fn);
    return -1;
  }

  //size of file
  is.seekg(0, is.end);
  uint64_t length = is.tellg();
  is.seekg(0, is.beg);

  uint16_t num_of_split = getenv("NUM_OF_INST") ? atoi(getenv("NUM_OF_INST")) : 8;//number of cores
  if(num_of_split == 0) num_of_split = 1; //guard against NUM_OF_INST=0 (division below)

  //calculate split size
  uint64_t split_sz = length / num_of_split;

  //the probe buffer must be at least twice as big as the longest row so the
  //backward scan below always finds a delimiter; cap it at 1KB for big splits.
  const uint32_t max_row_size = (split_sz > (uint64_t)num_of_split*1024) ? 1024 : split_sz/10;

  if(max_row_size == 0 || split_sz < max_row_size)
  {//object too small to split safely; hand it out as a single range
    ranges.push_back(std::pair<size_t,size_t>(0, length));
    return 0;
  }

  std::vector<char> buff(max_row_size); //heap buffer (a VLA is not standard C++)

  uint64_t mark = 0;      //start offset of the range being built
  uint64_t prev_mark = 0;
  int range_no = 0;

  do
  {
    uint64_t probe_pos = mark + (split_sz - max_row_size); //location of next "cut"
    is.seekg(probe_pos);
    is.read(buff.data(), max_row_size); //reading small probe buffer
    uint64_t actual_read = is.gcount();
    //compute the position ourselves: tellg() returns -1 once eofbit is set
    uint64_t current_pos = probe_pos + actual_read;
    if(is.eof()) is.clear(); //allow further seeks after a short read

    if(actual_read == 0)
    {//nothing left to probe; close the final range at end-of-file
      ranges.push_back(std::pair<size_t,size_t>(mark, length));
      break;
    }

    //scan backwards for the last row delimiter inside the probe
    char* p = &buff[actual_read-1];
    while(*p != row_delim && p != &buff[0]) p--;

    if(*p != row_delim)
    {//a row longer than the probe buffer; cannot cut safely
      printf("row delimiter not found. abort\n");
      return -1;
    }

    prev_mark = mark;
    range_no++;

    if(range_no < num_of_split)
    {//cut right after the delimiter found in the probe
      mark = current_pos - (&buff[actual_read-1] - p);
    }
    else
    {//last range extends to the end of the object
      mark = length;
    }

    ranges.push_back(std::pair<size_t,size_t>(prev_mark, mark));
    printf("%d: range[%ld %ld] %ld\n", range_no, (long)prev_mark, (long)mark, (long)(mark-prev_mark));

  }while(mark != length);

  return 0;
}
109161

110-
//TODO stream_chunk()
162+
int stream_partof_file(const char* file, csv_streamer *cs,size_t from,size_t to)
163+
{//each part is processed on seperate thread
164+
std::ifstream input_file_stream;
165+
size_t length = to - from;
166+
size_t bytes_been_read = 0;
167+
int status=0;
168+
169+
//open-file
170+
try {
171+
input_file_stream = std::ifstream(file, std::ios::in | std::ios::binary);
172+
input_file_stream.seekg(from);
173+
}
174+
catch( ... )
175+
{
176+
std::cout << "failed to open file " << file << std::endl;
177+
return(-1);
178+
}
179+
180+
//read-chunk
181+
#define BUFFER_SIZE (4*1024*1024)
182+
std::string buff(BUFFER_SIZE,0);
183+
size_t buffer_to_read = BUFFER_SIZE;
184+
while (true)
185+
{
186+
if(buffer_to_read > (length - bytes_been_read) )
187+
{
188+
buffer_to_read = length - bytes_been_read;
189+
}
190+
191+
size_t read_sz = input_file_stream.readsome(buff.data(),buffer_to_read);
192+
bytes_been_read += read_sz;
193+
if(!read_sz || input_file_stream.eof())
194+
{//signaling end of stream
195+
cs->process_stream(0,0,true);
196+
break;
197+
}
198+
199+
status = cs->process_stream(buff.data(),read_sz,false);
200+
if(status<0)
201+
{
202+
std::cout << "failure on execution " << std::endl;
203+
break;
204+
}
205+
206+
if(!read_sz || input_file_stream.eof())
207+
{
208+
break;
209+
}
210+
}
211+
return 0;
212+
}
111213

112214
int stream_file(char* file, csv_streamer *cs)
113215
{//each file processed on seperate thread
@@ -151,23 +253,12 @@ int stream_file(char* file, csv_streamer *cs)
151253
return 0;
152254
}
153255

154-
int start_multiple_execution_flows(std::string q, std::vector<char*> files)
256+
int start_multiple_execution_flows(std::string q, std::vector<csv_streamer*>& all_streamers, std::vector<std::function<int(void)>>& vec_of_fp, shared_queue& sq)
155257
{ //the object-set defines one finite data-set for the query.
156258

157-
shared_queue sq;
158259
boost::thread_group producer_threads, consumer_threads;
159-
std::vector<std::function<int(void)>> vec_of_fp;
160-
std::vector<csv_streamer*> all_streamers;
161260
std::vector<s3select*> s3select_processing_objects;
162261

163-
for(auto f : files)
164-
{
165-
csv_streamer *cs = new csv_streamer(q,&sq);
166-
all_streamers.push_back(cs);
167-
auto thread_func = [f,cs](){return stream_file(f,cs);};
168-
vec_of_fp.push_back( thread_func );
169-
}
170-
171262
for(auto& t : vec_of_fp)
172263
{
173264
//start with query processing
@@ -215,14 +306,15 @@ int start_multiple_execution_flows(std::string q, std::vector<char*> files)
215306

216307
return 0;
217308
}
218-
219-
int main(int argc, char **argv)
309+
int main_for_many_files(int argc, char **argv)
220310
{
221311
if(argc<2) return -1;
222312

223-
char* query=argv[1];
224313
std::string sql_query;
225-
sql_query.assign(query);
314+
sql_query.assign(argv[1]);
315+
shared_queue sq;
316+
std::vector<std::function<int(void)>> vec_of_fp;
317+
std::vector<csv_streamer*> all_streamers;
226318

227319
std::vector<char*> list_of_files;
228320
setvbuf(stdout, NULL, _IONBF, 0);//unbuffered stdout
@@ -232,8 +324,47 @@ int main(int argc, char **argv)
232324
list_of_files.push_back(argv[i]);
233325
}
234326

235-
start_multiple_execution_flows(sql_query, list_of_files);
327+
for(auto f : list_of_files)
328+
{
329+
csv_streamer *cs = new csv_streamer(sql_query,&sq);
330+
all_streamers.push_back(cs);
331+
auto thread_func = [f,cs](){return stream_file(f,cs);};
332+
vec_of_fp.push_back( thread_func );
333+
}
334+
335+
start_multiple_execution_flows(sql_query,all_streamers,vec_of_fp,sq);
236336

237337
return 0;
238338
}
239339

340+
int main(int argc, char **argv)
341+
{
342+
if(argc<2) return -1;
343+
344+
std::string sql_query;
345+
sql_query.assign(argv[1]);
346+
shared_queue sq;
347+
std::vector<std::function<int(void)>> vec_of_fp;
348+
std::vector<csv_streamer*> all_streamers;
349+
350+
std::string fn;
351+
fn.assign(argv[2]);
352+
353+
setvbuf(stdout, NULL, _IONBF, 0);//unbuffered stdout
354+
355+
//spiliting single input file into ranges
356+
std::vector<std::pair<size_t,size_t>> ranges;
357+
csv_splitter(fn.data(),ranges);
358+
359+
for(auto r : ranges)
360+
{
361+
csv_streamer *cs = new csv_streamer(sql_query,&sq);
362+
all_streamers.push_back(cs);
363+
auto thread_func = [fn,r,cs](){return stream_partof_file(fn.data(), cs, r.first, r.second);};
364+
vec_of_fp.push_back( thread_func );
365+
}
366+
367+
start_multiple_execution_flows(sql_query,all_streamers,vec_of_fp,sq);
368+
369+
return 0;
370+
}

0 commit comments

Comments
 (0)