Skip to content

Commit 3b25951

Browse files
committed
move timestamps to dedicated structure to be more organized
1 parent 00eb637 commit 3b25951

File tree

1 file changed

+61
-42
lines changed

1 file changed

+61
-42
lines changed

src/fread.c

Lines changed: 61 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -103,10 +103,12 @@ static void init_const_literals(void)
103103
typedef struct FieldParseContext {
104104
// Pointer to the current parsing location
105105
const char **ch;
106+
106107
// Parse target buffers, indexed by size. A parser that reads values of byte
107108
// size `sz` will attempt to write that value into `targets[sz]`. Thus,
108109
// generally this is an array with elements 0, 1, 4, and 8 defined, while all
109110
// other pointers are NULL.
111+
110112
void **targets;
111113
// String "anchor" for `Field()` parser -- the difference `ch - anchor` will
112114
// be written out as the string offset.
@@ -334,7 +336,7 @@ static inline int countfields(const char **pch)
334336
{
335337
static lenOff trash; // see comment on other trash declarations
336338
static void *targets[9];
337-
targets[8] = (void*) &trash;
339+
targets[8] = (void*)&trash;
338340
const char *ch = *pch;
339341
if (sep == ' ') while (*ch == ' ') ch++; // multiple sep==' ' at the start does not mean sep
340342
skip_white(&ch);
@@ -637,7 +639,7 @@ static void str_to_i32_core(const char **pch, int32_t *target, bool parse_date)
637639

638640
// Field parser entry point for 32-bit integers: forwards the current parse
// position (ctx->ch) and the target slot for 4-byte values
// (ctx->targets[sizeof(int32_t)]) to str_to_i32_core() with parse_date=false.
static void StrtoI32(FieldParseContext *ctx)
639641
{
640-
str_to_i32_core(ctx->ch, (int32_t*) ctx->targets[sizeof(int32_t)], false);
642+
str_to_i32_core(ctx->ch, (int32_t*)ctx->targets[sizeof(int32_t)], false);
641643
}
642644

643645
static void StrtoI64(FieldParseContext *ctx)
@@ -826,7 +828,7 @@ static void parse_double_regular_core(const char **pch, double *target)
826828

827829
// Field parser entry point for doubles: forwards the current parse position
// (ctx->ch) and the target slot for 8-byte values
// (ctx->targets[sizeof(double)]) to parse_double_regular_core().
static void parse_double_regular(FieldParseContext *ctx)
828830
{
829-
parse_double_regular_core(ctx->ch, (double*) ctx->targets[sizeof(double)]);
831+
parse_double_regular_core(ctx->ch, (double*)ctx->targets[sizeof(double)]);
830832
}
831833

832834
/**
@@ -1033,7 +1035,7 @@ static void parse_iso8601_date_core(const char **pch, int32_t *target)
10331035

10341036
// Field parser entry point for ISO-8601 dates: forwards the current parse
// position (ctx->ch) and the target slot for 4-byte values
// (ctx->targets[sizeof(int32_t)]) to parse_iso8601_date_core().
static void parse_iso8601_date(FieldParseContext *ctx)
10351037
{
1036-
parse_iso8601_date_core(ctx->ch, (int32_t*) ctx->targets[sizeof(int32_t)]);
1038+
parse_iso8601_date_core(ctx->ch, (int32_t*)ctx->targets[sizeof(int32_t)]);
10371039
}
10381040

10391041
static void parse_iso8601_timestamp(FieldParseContext *ctx)
@@ -1329,8 +1331,22 @@ static int detect_types(const char **pch, int ncol, bool *bumped)
13291331
//=================================================================================================
13301332
int freadMain(freadMainArgs _args)
13311333
{
1334+
// Wall-clock bookkeeping for freadMain, used for the verbose timing report and the progress ETA.
// Fields other than th_read, th_push and tot hold absolute wallclock() readings taken when a
// step completes; the report prints differences between consecutive readings.
struct
1335+
{
1336+
double t0; // wallclock() at entry to freadMain; baseline for `tot`
1337+
double map; // moment when memory-map step has finished
1338+
double layout; // moment when column-name assignment (step [8]) has finished
1339+
double coltype; // moment when user column class overrides (step [9]) have been applied
1340+
double alloc; // moment when allocateDT() has returned; also the baseline for the progress ETA
1341+
double read; // moment when the first read pass (firstTime) has finished
1342+
double reread; // moment when the final pass has finished; equals `read` when there was no reread
1343+
double th_read; // accumulated per-thread parse time (verbose only); divided by nth and reported as "Parse to row-major thread buffers"
1344+
double th_push; // reductions of timings within the parallel region; divided by nth and reported as "Transpose"
1345+
double tot; // reread - t0; clamped to at least 0.000001 before computing percentages
1346+
} timestamps = { 0 };
1347+
13321348
args = _args; // assign to global for use by DTPRINT() in other functions
1333-
double t0 = wallclock();
1349+
timestamps.t0 = wallclock();
13341350

13351351
//*********************************************************************************************
13361352
// [1] Extract the arguments and check their validity
@@ -1438,7 +1454,7 @@ int freadMain(freadMainArgs _args)
14381454
// [2] Open and memory-map the input file, setting up the parsing context
14391455
// (sof, eof, ch).
14401456
//*********************************************************************************************
1441-
double tMap; // moment when memory-map step has finished
1457+
14421458
{
14431459
if (verbose) DTPRINT(_("[02] Opening the file\n"));
14441460
mmp = NULL;
@@ -1514,13 +1530,13 @@ int freadMain(freadMainArgs _args)
15141530
STOP(_("Opened %s file ok but could not memory map it. This is a %dbit process. %s."), filesize_to_str(fileSize), nbit, // # nocov
15151531
nbit <= 32 ? _("Please upgrade to 64bit") : _("There is probably not enough contiguous virtual memory available")); // # nocov
15161532
}
1517-
sof = (const char*) mmp;
1533+
sof = (const char*)mmp;
15181534
if (verbose) DTPRINT(_(" Memory mapped ok\n"));
15191535
} else {
15201536
INTERNAL_STOP("neither `input` nor `filename` are given, nothing to read"); // # nocov
15211537
}
15221538
eof = sof + fileSize;
1523-
tMap = wallclock();
1539+
timestamps.map = wallclock();
15241540
}
15251541

15261542

@@ -2179,7 +2195,6 @@ int freadMain(freadMainArgs _args)
21792195
// [8] Assign column names
21802196
// Updates pos(ition) to rest after the column names (if any) at the start of the first data row
21812197
//*********************************************************************************************
2182-
double tLayout; // Timer for assigning column names
21832198
const char *colNamesAnchor = pos;
21842199
{
21852200
if (verbose) DTPRINT(_("[08] Assign column names\n"));
@@ -2204,7 +2219,7 @@ int freadMain(freadMainArgs _args)
22042219
// Use Field() here as it handles quotes, leading space etc inside it
22052220
ch++;
22062221
Field(&fctx); // stores the string length and offset as <uint,uint> in colNames[i]
2207-
((lenOff**) fctx.targets)[8]++;
2222+
((lenOff**)fctx.targets)[8]++;
22082223
if (*ch != sep) break;
22092224
if (sep == ' ') {
22102225
while (ch[1] == ' ') ch++;
@@ -2217,13 +2232,12 @@ int freadMain(freadMainArgs _args)
22172232
// now on first data row (row after column names)
22182233
// when fill=TRUE and column names shorter (test 1635.2), leave calloc initialized lenOff.len==0
22192234
}
2220-
tLayout = wallclock();
2235+
timestamps.layout = wallclock();
22212236
}
22222237

22232238
//*********************************************************************************************
22242239
// [9] Apply colClasses, select, drop and integer64
22252240
//*********************************************************************************************
2226-
double tColType; // Timer for applying user column class overrides
22272241
int ndrop; // Number of columns that will be dropped from the file being read
22282242
int nStringCols; // Number of string columns in the file
22292243
int nNonStringCols; // Number of all other columns in the file
@@ -2269,7 +2283,7 @@ int freadMain(freadMainArgs _args)
22692283
if (type[j] == CT_STRING) nStringCols++; else nNonStringCols++;
22702284
}
22712285
if (verbose) DTPRINT(_(" After %d type and %d drop user overrides : %s\n"), nUserBumped, ndrop, typesAsString(ncol));
2272-
tColType = wallclock();
2286+
timestamps.coltype = wallclock();
22732287
}
22742288

22752289
//*********************************************************************************************
@@ -2281,15 +2295,13 @@ int freadMain(freadMainArgs _args)
22812295
ncol - ndrop, ncol, ndrop, allocnrow);
22822296
}
22832297
size_t DTbytes = allocateDT(type, size, ncol, ndrop, allocnrow);
2284-
double tAlloc = wallclock();
2298+
timestamps.alloc = wallclock();
22852299

22862300
//*********************************************************************************************
22872301
// [11] Read the data
22882302
//*********************************************************************************************
22892303
bool stopTeam = false, firstTime = true, restartTeam = false; // bool for MT-safety (cannot ever read half written bool value badly)
22902304
int nTypeBump = 0, nTypeBumpCols = 0;
2291-
double tRead = 0, tReread = 0;
2292-
double thRead = 0, thPush = 0; // reductions of timings within the parallel region
22932305
int max_col = 0;
22942306
char *typeBumpMsg = NULL; size_t typeBumpMsgSize = 0;
22952307
int typeCounts[NUMTYPE]; // used for verbose output; needs populating after first read and before reread (if any) -- see later comment
@@ -2384,8 +2396,10 @@ int freadMain(freadMainArgs _args)
23842396
stopTeam = true;
23852397
}
23862398
prepareThreadContext(&ctx);
2387-
2388-
#pragma omp for ordered schedule(dynamic) reduction(+:thRead,thPush) reduction(max:max_col)
2399+
2400+
// Scalar stand-ins for timestamps.th_read / timestamps.th_push, presumably because an OpenMP
// reduction clause cannot name a struct member. They are copied back into the struct after the loop.
// NOTE(review): the loop body below still accumulates into timestamps.th_push and timestamps.th_read,
// not into th_push_openmp / th_read_openmp. The reduction variables therefore never receive the
// per-thread timings, the (presumably shared) struct fields are updated without synchronisation, and
// the copy-back after the loop overwrites whatever was accumulated. The loop body should add to the
// *_openmp variables instead.
// NOTE(review): if these declarations sit inside the enclosing parallel region (not visible here), the
// variables are private to each thread, which OpenMP does not allow for a reduction on a worksharing
// loop -- confirm, and consider declaring them before the parallel region.
double th_read_openmp = timestamps.th_read;//pragma workaround
2401+
double th_push_openmp = timestamps.th_push;
2402+
#pragma omp for ordered schedule(dynamic) reduction(+:th_read_openmp,th_push_openmp) reduction(max:max_col)
23892403
for (int jump = jump0; jump < nJumps; jump++) {
23902404
if (stopTeam) continue; // must continue and not break. We desire not to depend on (relatively new) omp cancel directive, yet
23912405
double tLast = 0.0; // thread local wallclock time at last measuring point for verbose mode only.
@@ -2406,13 +2420,13 @@ int freadMain(freadMainArgs _args)
24062420
myNrow = 0;
24072421
if (verbose || myShowProgress) {
24082422
double now = wallclock();
2409-
thPush += now - tLast;
2423+
timestamps.th_push += now - tLast;
24102424
tLast = now;
24112425
if (myShowProgress && /*wait for all threads to process 2 jumps*/jump >= nth * 2) {
24122426
// Important for thread safety inside progress() that this is called not just from critical but that
24132427
// it's the master thread too, hence me==0. OpenMP doesn't allow '#pragma omp master' here, but we
24142428
// did check above that master's me==0.
2415-
int ETA = (int)(((now - tAlloc) / jump) * (nJumps - jump));
2429+
int ETA = (int)(((now - timestamps.alloc) / jump) * (nJumps - jump));
24162430
progress((int)(100.0 * jump / nJumps), ETA);
24172431
}
24182432
}
@@ -2462,7 +2476,7 @@ int freadMain(freadMainArgs _args)
24622476
fun[IGNORE_BUMP(thisType)](&fctx);
24632477
if (*tch != sep) break;
24642478
int8_t thisSize = size[j];
2465-
if (thisSize) ((char**) targets)[thisSize] += thisSize; // 'if' for when rereading to avoid undefined NULL+0
2479+
if (thisSize) ((char**)targets)[thisSize] += thisSize; // 'if' for when rereading to avoid undefined NULL+0
24662480
tch++;
24672481
j++;
24682482
}
@@ -2476,7 +2490,7 @@ int freadMain(freadMainArgs _args)
24762490
}
24772491
else if (eol(&tch) && j < ncol) { // j<ncol needed for #2523 (erroneous extra comma after last field)
24782492
int8_t thisSize = size[j];
2479-
if (thisSize) ((char**) targets)[thisSize] += thisSize;
2493+
if (thisSize) ((char**)targets)[thisSize] += thisSize;
24802494
j++;
24812495
if (j > max_col) max_col = j;
24822496
if (j == ncol) { tch++; myNrow++; continue; } // next line. Back up to while (tch<nextJumpStart). Usually happens, fastest path
@@ -2593,7 +2607,7 @@ int freadMain(freadMainArgs _args)
25932607
}
25942608
}
25952609
int8_t thisSize = size[j];
2596-
if (thisSize) ((char**) targets)[size[j]] += size[j]; // 'if' to avoid undefined NULL+=0 when rereading
2610+
if (thisSize) ((char**)targets)[size[j]] += size[j]; // 'if' to avoid undefined NULL+=0 when rereading
25972611
j++;
25982612
if (*tch == sep) { tch++; continue; }
25992613
if (fill && (*tch == '\n' || *tch == '\r' || tch == eof) && j < ncol) continue; // reuse processors to write appropriate NA to target; saves maintenance of a type switch down here
@@ -2609,7 +2623,7 @@ int freadMain(freadMainArgs _args)
26092623
if (tch != eof) tch++;
26102624
myNrow++;
26112625
}
2612-
if (verbose) { double now = wallclock(); thRead += now - tLast; tLast = now; }
2626+
if (verbose) { double now = wallclock(); timestamps.th_read += now - tLast; tLast = now; }
26132627
ctx.anchor = thisJumpStart;
26142628
ctx.nRows = myNrow;
26152629
postprocessBuffer(&ctx);
@@ -2672,14 +2686,18 @@ int freadMain(freadMainArgs _args)
26722686
// Next thread can now start her ordered section and write her results to the final DT at the same time as me.
26732687
// Ordered has to be last in some OpenMP implementations currently. Logically though, pushBuffer happens now.
26742688
}
2689+
2690+
timestamps.th_read = th_read_openmp;//pragma workaround
2691+
timestamps.th_push = th_push_openmp;
2692+
26752693
// End for loop over all jump points
26762694

26772695
// Push out all buffers one last time (only needed because of gomp ordered workaround above with push first in the loop)
26782696
// If stopped early, this will happen once for thread at headPos (the only one left with myNrow>0)
26792697
if (myNrow) {
26802698
double now = verbose ? wallclock() : 0;
26812699
pushBuffer(&ctx);
2682-
if (verbose) thPush += wallclock() - now;
2700+
if (verbose) timestamps.th_push += wallclock() - now;
26832701
}
26842702
// Each thread to free their own buffer.
26852703
free(ctx.buff8); ctx.buff8 = NULL;
@@ -2737,7 +2755,7 @@ int freadMain(freadMainArgs _args)
27372755
if (args.showProgress) progress(100, 0);
27382756

27392757
if (firstTime) {
2740-
tReread = tRead = wallclock();
2758+
timestamps.reread = timestamps.read = wallclock();
27412759

27422760
// if nTypeBump>0, not-bumped columns are about to be assigned parse type TOGGLE_BUMP(CT_STRING) for the reread, so we have to count
27432761
// parse types now (for log). We can't count final column types afterwards because many parse types map to the same column type.
@@ -2776,14 +2794,15 @@ int freadMain(freadMainArgs _args)
27762794
continue;
27772795
}
27782796
} else {
2779-
tReread = wallclock();
2797+
timestamps.reread = wallclock();
27802798
}
27812799

27822800
break;
27832801
}
2784-
double tTot = tReread - t0; // tReread==tRead when there was no reread
2802+
2803+
timestamps.tot = timestamps.reread - timestamps.t0; // timestamps.reread == timestamps.read when there was no reread
27852804
if (verbose) DTPRINT(_("Read %"PRIu64" rows x %d columns from %s file in %02d:%06.3f wall clock time\n"),
2786-
(uint64_t)DTi, ncol - ndrop, filesize_to_str(fileSize), (int)tTot / 60, fmod(tTot, 60.0));
2805+
(uint64_t)DTi, ncol - ndrop, filesize_to_str(fileSize), (int)timestamps.tot / 60, fmod(timestamps.tot, 60.0));
27872806

27882807
//*********************************************************************************************
27892808
// [12] Finalize the datatable
@@ -2829,25 +2848,25 @@ int freadMain(freadMainArgs _args)
28292848

28302849
if (verbose) {
28312850
DTPRINT("=============================\n"); // # notranslate
2832-
if (tTot < 0.000001) tTot = 0.000001; // to avoid nan% output in some trivially small tests where tot==0.000s
2833-
DTPRINT(_("%8.3fs (%3.0f%%) Memory map %.3fGiB file\n"), tMap - t0, 100.0 * (tMap - t0) / tTot, 1.0 * fileSize / (1024 * 1024 * 1024));
2834-
DTPRINT(_("%8.3fs (%3.0f%%) sep="), tLayout - tMap, 100.0 * (tLayout - tMap) / tTot);
2851+
if (timestamps.tot < 0.000001) timestamps.tot = 0.000001; // to avoid nan% output in some trivially small tests where tot==0.000s
2852+
DTPRINT(_("%8.3fs (%3.0f%%) Memory map %.3fGiB file\n"), timestamps.map - timestamps.t0, 100.0 * (timestamps.map - timestamps.t0) / timestamps.tot, 1.0 * fileSize / (1024 * 1024 * 1024));
2853+
DTPRINT(_("%8.3fs (%3.0f%%) sep="), timestamps.layout - timestamps.map, 100.0 * (timestamps.layout - timestamps.map) / timestamps.tot);
28352854
DTPRINT(sep == '\t' ? "'\\t'" : (sep == '\n' ? "'\\n'" : "'%c'"), sep); // # notranslate
28362855
DTPRINT(_(" ncol=%d and header detection\n"), ncol);
28372856
DTPRINT(_("%8.3fs (%3.0f%%) Column type detection using %"PRId64" sample rows\n"),
2838-
tColType - tLayout, 100.0 * (tColType - tLayout) / tTot, sampleLines);
2857+
timestamps.coltype - timestamps.layout, 100.0 * (timestamps.coltype - timestamps.layout) / timestamps.tot, sampleLines);
28392858
DTPRINT(_("%8.3fs (%3.0f%%) Allocation of %"PRId64" rows x %d cols (%.3fGiB) of which %"PRId64" (%3.0f%%) rows used\n"),
2840-
tAlloc - tColType, 100.0 * (tAlloc - tColType) / tTot, allocnrow, ncol, DTbytes / (1024.0 * 1024 * 1024), DTi, 100.0 * DTi / allocnrow);
2841-
thRead /= nth; thPush /= nth;
2842-
double thWaiting = tReread - tAlloc - thRead - thPush;
2859+
timestamps.alloc - timestamps.coltype, 100.0 * (timestamps.alloc - timestamps.coltype) / timestamps.tot, allocnrow, ncol, DTbytes / (1024.0 * 1024 * 1024), DTi, 100.0 * DTi / allocnrow);
2860+
timestamps.th_read /= nth; timestamps.th_push /= nth;
2861+
double thWaiting = timestamps.reread - timestamps.alloc - timestamps.th_read - timestamps.th_push;
28432862
DTPRINT(_("%8.3fs (%3.0f%%) Reading %d chunks (%d swept) of %.3fMiB (each chunk %"PRId64" rows) using %d threads\n"),
2844-
tReread - tAlloc, 100.0 * (tReread - tAlloc) / tTot, nJumps, nSwept, (double)chunkBytes / (1024 * 1024), DTi / nJumps, nth);
2845-
DTPRINT(_(" + %8.3fs (%3.0f%%) Parse to row-major thread buffers (grown %d times)\n"), thRead, 100.0 * thRead / tTot, buffGrown);
2846-
DTPRINT(_(" + %8.3fs (%3.0f%%) Transpose\n"), thPush, 100.0 * thPush / tTot);
2847-
DTPRINT(_(" + %8.3fs (%3.0f%%) Waiting\n"), thWaiting, 100.0 * thWaiting / tTot);
2863+
timestamps.reread - timestamps.alloc, 100.0 * (timestamps.reread - timestamps.alloc) / timestamps.tot, nJumps, nSwept, (double)chunkBytes / (1024 * 1024), DTi / nJumps, nth);
2864+
DTPRINT(_(" + %8.3fs (%3.0f%%) Parse to row-major thread buffers (grown %d times)\n"), timestamps.th_read, 100.0 * timestamps.th_read / timestamps.tot, buffGrown);
2865+
DTPRINT(_(" + %8.3fs (%3.0f%%) Transpose\n"), timestamps.th_push, 100.0 * timestamps.th_push / timestamps.tot);
2866+
DTPRINT(_(" + %8.3fs (%3.0f%%) Waiting\n"), thWaiting, 100.0 * thWaiting / timestamps.tot);
28482867
DTPRINT(_("%8.3fs (%3.0f%%) Rereading %d columns due to out-of-sample type exceptions\n"),
2849-
tReread - tRead, 100.0 * (tReread - tRead) / tTot, nTypeBumpCols);
2850-
DTPRINT(_("%8.3fs Total\n"), tTot);
2868+
timestamps.reread - timestamps.read, 100.0 * (timestamps.reread - timestamps.read) / timestamps.tot, nTypeBumpCols);
2869+
DTPRINT(_("%8.3fs Total\n"), timestamps.tot);
28512870
if (typeBumpMsg) {
28522871
// if type bumps happened, it's useful to see them at the end after the timing 2 lines up showing the reread time
28532872
// TODO - construct and output the copy and pastable colClasses argument so user can avoid the reread time if they are

0 commit comments

Comments
 (0)