|
| 1 | +// Copyright 2019-2020 CERN and copyright holders of ALICE O2. |
| 2 | +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. |
| 3 | +// All rights not expressly granted are reserved. |
| 4 | +// |
| 5 | +// This software is distributed under the terms of the GNU General Public |
| 6 | +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". |
| 7 | +// |
| 8 | +// In applying this license CERN does not waive the privileges and immunities |
| 9 | +// granted to it by virtue of its status as an Intergovernmental Organization |
| 10 | +// or submit itself to any jurisdiction. |
| 11 | + |
| 12 | + |
| 13 | + |
| 14 | +// This utility performs timeframe building on a set of RAW data files and outputs a single merged RAW data file. |
| 15 | +// Input files must have a certain level of synchronization: the same timeframes in all files, in the same order. |
| 16 | + |
| 17 | + |
#include <inttypes.h>
#include <stdio.h>
#include <string.h>

#include <cerrno>
#include <cstdlib>
#include <filesystem>
#include <string>
#include <system_error>
#include <vector>

#include <lz4.h>

#include "DataBlock.h"
#include "DataBlockContainer.h"
#include "DataSet.h"
#include "RdhUtils.h"
#include "CounterStats.h"
| 31 | + |
| 32 | + |
// Per-input-file state used while merging RAW data files.
// Every member has a default initializer so that a default-constructed
// descriptor is in a well-defined "nothing opened, nothing read" state.
// Member order is part of the interface: the struct is filled with
// designated initializers, which must follow declaration order.
struct RawFileDescriptor {
  std::string path;               // path to file
  FILE* fp = nullptr;             // input stream handle
  void* buffer = nullptr;         // memory buffer to read file chunk by chunk
  uint64_t bufferUsed = 0;        // amount of buffer in use (filled with data from file)
  uint64_t bufferProcessed = 0;   // number of bytes in buffer already processed
  long fileSize = 0;              // size of file on disk
  long fileOffset = 0;            // current location in file
  uint64_t currentTimeframe = 0;  // current timeframe
  uint64_t nextTimeframe = 0;     // next timeframe detected
  long bytesOut = 0;              // number of bytes written
  bool done = false;              // flag set when file read completed
};
| 46 | + |
| 47 | +int main(int argc, const char* argv[]) |
| 48 | +{ |
| 49 | + std::vector<std::string> filenames; |
| 50 | + std::vector<RawFileDescriptor> fds; |
| 51 | + bool isError = 0; |
| 52 | + |
| 53 | + std::string outputFile = "/tmp/out.raw"; // path to output merged file |
| 54 | + long bufferSize = 1000000; // chunck size for reading in memory |
| 55 | + bool fileReadVerbose = true; // flag to print more info (chunk size, etc) when reading file |
| 56 | + uint64_t TFperiod = 32; // period of a timeframe |
| 57 | + long totalSize = 0; // input files total size (bytes) |
| 58 | + |
| 59 | + // parse arguments (key=value pairs) and build list of input files |
| 60 | + for (int i = 1; i < argc; i++) { |
| 61 | + |
| 62 | + // check if argument is an option of the form key=value |
| 63 | + const char* option = argv[i]; |
| 64 | + std::string key(option); |
| 65 | + size_t separatorPosition = key.find('='); |
| 66 | + if (separatorPosition != std::string::npos) { |
| 67 | + key.resize(separatorPosition); |
| 68 | + std::string value = &(option[separatorPosition + 1]); |
| 69 | + |
| 70 | + if (key == "outputFile") { |
| 71 | + outputFile = value; |
| 72 | + } else if (key == "bufferSize") { |
| 73 | + bufferSize = std::atoi(value.c_str()); |
| 74 | + } else { |
| 75 | + printf("unknown option %s\n", key.c_str()); |
| 76 | + isError = 1; |
| 77 | + } |
| 78 | + |
| 79 | + continue; |
| 80 | + } |
| 81 | + |
| 82 | + filenames.push_back(option); |
| 83 | + } |
| 84 | + |
| 85 | + // check success |
| 86 | + if (isError) { |
| 87 | + printf("Aborting\n"); |
| 88 | + return -1; |
| 89 | + } |
| 90 | + |
| 91 | + // summary of options |
| 92 | + printf("Using options:\n\t outputFile = %s\n\t bufferSize = %lu\n", outputFile.c_str(), (long unsigned)bufferSize); |
| 93 | + |
| 94 | + // open files and init |
| 95 | + // not done in arg paring loop, so that we now have all options set |
| 96 | + for (const auto &fn : filenames) { |
| 97 | + FILE *fp = fopen(fn.c_str(), "rb"); |
| 98 | + if (fp == NULL) { |
| 99 | + printf("Can't open %s\n", fn.c_str()); |
| 100 | + isError = 1 ; |
| 101 | + continue; |
| 102 | + } |
| 103 | + printf("%s\n", fn.c_str()); |
| 104 | + |
| 105 | + // get file size |
| 106 | + long fileSize = std::filesystem::file_size(fn); |
| 107 | + if (fileReadVerbose) { |
| 108 | + printf("File size: %ld bytes\n", fileSize); |
| 109 | + } |
| 110 | + totalSize += fileSize; |
| 111 | + |
| 112 | + void *buffer = malloc(bufferSize); |
| 113 | + if (buffer == NULL) { |
| 114 | + printf("Failed to allocate buffer\n"); |
| 115 | + isError = 1; |
| 116 | + break; |
| 117 | + } |
| 118 | + |
| 119 | + fds.push_back({.path = fn, .fp = fp, .buffer = buffer, .bufferUsed = 0, .bufferProcessed = 0, .fileSize = fileSize, .fileOffset = 0, .currentTimeframe = 0, .nextTimeframe = 0, .bytesOut = 0, .done = 0}); |
| 120 | + } |
| 121 | + |
| 122 | + // check success |
| 123 | + if (isError) { |
| 124 | + printf("Aborting\n"); |
| 125 | + return -1; |
| 126 | + } |
| 127 | + |
| 128 | + // open output file |
| 129 | + printf("Opening %s for output\n", outputFile.c_str()); |
| 130 | + FILE *fdout = fopen(outputFile.c_str(), "w"); |
| 131 | + if (fdout == NULL) { |
| 132 | + printf("Can't open %s for writing\n", outputFile.c_str()); |
| 133 | + return -1; |
| 134 | + } |
| 135 | + printf("Expected output size: %ld\n", totalSize); |
| 136 | + |
| 137 | + for(;;) { |
| 138 | + |
| 139 | + unsigned int nCompleted = 0; |
| 140 | + printf("\n\n\n*** LOOP\n"); |
| 141 | + |
| 142 | + // are all files at the same TF now ? |
| 143 | + bool sameTimeframeId = true; |
| 144 | + uint64_t nextTF=0; |
| 145 | + for(auto &fd : fds) { |
| 146 | + if (fd.fileOffset >= fd.fileSize) continue; |
| 147 | + |
| 148 | + if (nextTF == 0) { |
| 149 | + nextTF=fd.nextTimeframe; |
| 150 | + } |
| 151 | + |
| 152 | + if (fd.nextTimeframe != nextTF) { |
| 153 | + |
| 154 | + sameTimeframeId = 0; |
| 155 | + printf("TF %d != %d @ file %s\n", (int) fd.nextTimeframe, (int) nextTF, fd.path.c_str()); |
| 156 | + break; |
| 157 | + } |
| 158 | + } |
| 159 | + |
| 160 | + |
| 161 | + for(auto &fd : fds) { |
| 162 | + printf("\nFile %s\n",fd.path.c_str()); |
| 163 | + |
| 164 | + bool skip=0; |
| 165 | + for (; !fd.done; ) { |
| 166 | + |
| 167 | + if ((fd.bufferUsed == 0)||(fd.bufferUsed == fd.bufferProcessed)) { |
| 168 | + // read new chunk |
| 169 | + |
| 170 | + long dataSize = fd.fileSize - fd.fileOffset; |
| 171 | + if (dataSize > bufferSize) { |
| 172 | + dataSize = bufferSize; |
| 173 | + } |
| 174 | + |
| 175 | + if (fread(fd.buffer, dataSize, 1, fd.fp) != 1) { |
| 176 | + break; |
| 177 | + } |
| 178 | + printf("Got block %ld bytes @ %ld (total: %ld /%ld)\n", dataSize, fd.fileOffset, fd.fileOffset + dataSize, fd.fileSize); |
| 179 | + |
| 180 | + fd.bufferUsed = dataSize; |
| 181 | + fd.bufferProcessed = 0; |
| 182 | + fd.fileOffset += dataSize; |
| 183 | + } else { |
| 184 | + printf("Continuing with buffer @ %ld (%ld /%ld) \n",fd.fileOffset, fd.bufferProcessed, fd.bufferUsed); |
| 185 | + } |
| 186 | + |
| 187 | + uint64_t bufferProcessedInIteration = 0; |
| 188 | + |
| 189 | + // process current chunk until next timeframe |
| 190 | + while (fd.bufferProcessed < fd.bufferUsed) { |
| 191 | + |
| 192 | + // check we are not at page boundary |
| 193 | + if (fd.bufferProcessed + sizeof(o2::Header::RAWDataHeader) <= fd.bufferUsed) { |
| 194 | + |
| 195 | + RdhHandle h(((uint8_t*)fd.buffer) + fd.bufferProcessed); |
| 196 | + |
| 197 | + std::string err; |
| 198 | + if (h.validateRdh(err)) { |
| 199 | + printf("RDH error @ %ld: %s", (long)fd.bufferProcessed, err.c_str()); |
| 200 | + return -1; |
| 201 | + } |
| 202 | + |
| 203 | + long nBytes = h.getOffsetNextPacket(); |
| 204 | + |
| 205 | + if (fd.bufferProcessed + nBytes <= fd.bufferUsed) { |
| 206 | + uint64_t TFid = 1 + h.getHbOrbit() / TFperiod; |
| 207 | + |
| 208 | + if (TFid != fd.currentTimeframe) { |
| 209 | + if (TFid != fd.nextTimeframe) { |
| 210 | + printf("Next TF detected %ld @ %ld\n", TFid, fd.bufferProcessed); |
| 211 | + if (sameTimeframeId) { |
| 212 | + // wait that all files are at the same TF before checking next |
| 213 | + fd.nextTimeframe = TFid; |
| 214 | + } |
| 215 | + skip = 1; |
| 216 | + break; |
| 217 | + } else { |
| 218 | + if (!sameTimeframeId) { |
| 219 | + skip = 1; |
| 220 | + break; |
| 221 | + } |
| 222 | + fd.currentTimeframe = TFid; // we can start with this one |
| 223 | + printf("Starting new TF %ld @ %ld\n", fd.currentTimeframe, fd.bufferProcessed); |
| 224 | + skip=0; |
| 225 | + } |
| 226 | + } |
| 227 | + // h.dumpRdh(fd.fileOffset + fd.bufferProcessed - fd.bufferUsed, 1); |
| 228 | + bufferProcessedInIteration += nBytes; |
| 229 | + fd.bufferProcessed += nBytes; |
| 230 | + continue; |
| 231 | + } |
| 232 | + } |
| 233 | + |
| 234 | + if (!skip) { |
| 235 | + // rewind a bit |
| 236 | + int delta = fd.bufferUsed - fd.bufferProcessed; |
| 237 | + if (delta) { |
| 238 | + fd.fileOffset -= delta; |
| 239 | + printf("%ld / %ld : %d -> new position %ld\n", fd.bufferProcessed, fd.bufferUsed, delta, fd.fileOffset); |
| 240 | + if (fseek(fd.fp, fd.fileOffset, SEEK_SET)) { |
| 241 | + printf("Failed to seek in file"); |
| 242 | + return -1; |
| 243 | + } |
| 244 | + } |
| 245 | + fd.bufferUsed = 0; // re-read from file from beginning of chunk |
| 246 | + } |
| 247 | + } |
| 248 | + |
| 249 | + // write validated data |
| 250 | + if (bufferProcessedInIteration) { |
| 251 | + if (fwrite(&((char *)fd.buffer)[fd.bufferProcessed - bufferProcessedInIteration], bufferProcessedInIteration, 1, fdout)!=1) { |
| 252 | + printf("Failed to write %d bytes\n",(int) fd.bufferProcessed); |
| 253 | + printf("%s\n",strerror(errno)); |
| 254 | + return -1; |
| 255 | + } |
| 256 | + printf("Wrote %d bytes\n", (int)bufferProcessedInIteration); |
| 257 | + fd.bytesOut += bufferProcessedInIteration; |
| 258 | + } |
| 259 | + if (skip) { |
| 260 | + printf("skipping until next loop \n"); |
| 261 | + break; |
| 262 | + } |
| 263 | + } |
| 264 | + if ((fd.fileOffset >= fd.fileSize)&&(fd.bufferUsed == fd.bufferProcessed)) { |
| 265 | + fd.done = 1; |
| 266 | + printf("File read completed %ld %ld\n",fd.bufferUsed,fd.bufferProcessed); |
| 267 | + nCompleted++; |
| 268 | + } |
| 269 | + } |
| 270 | + |
| 271 | + |
| 272 | + printf("*** %d / %ld completed\n", nCompleted, fds.size()); |
| 273 | + if (nCompleted == fds.size()) { |
| 274 | + // all files read |
| 275 | + break; |
| 276 | + } |
| 277 | + |
| 278 | + } |
| 279 | + |
| 280 | + fclose(fdout); |
| 281 | + |
| 282 | + long totalBytesOut = 0; |
| 283 | + for(auto &fd : fds) { |
| 284 | + printf("\nFile %s: %ld / %ld\n",fd.path.c_str(),fd.bytesOut, fd.fileSize); |
| 285 | + totalBytesOut += fd.bytesOut; |
| 286 | + } |
| 287 | + if (totalBytesOut!=totalSize) { |
| 288 | + printf("Warning: output size mismatch input %ld != %ld\n",totalBytesOut, totalSize); |
| 289 | + } |
| 290 | + return 0; |
| 291 | + |
| 292 | +} |
0 commit comments