33#include " buffer_pool.h"
44#include " compress.h"
55#include " cso.h"
6+ #include " dax.h"
67
78namespace maxcso {
89
@@ -50,14 +51,7 @@ void Output::SetFile(uv_file file, int64_t srcSize, uint32_t blockSize, CSOForma
5051 const uint32_t sectors = static_cast <uint32_t >((srcSize + blockSize_ - 1 ) >> blockShift_);
5152 // Start after the header and index, which we'll fill in later.
5253 index_ = new uint32_t [sectors + 1 ];
53- if (flags_ & TASKFLAG_DECOMPRESS) {
54- // Decompressing, so no header.
55- // We still track the index for code simplicity, but throw it away.
56- dstPos_ = 0 ;
57- } else {
58- // Start after the end of the index data and header.
59- dstPos_ = sizeof (CSOHeader) + (sectors + 1 ) * sizeof (uint32_t );
60- }
54+ dstPos_ = DstFirstSectorPos (sectors);
6155
6256 // TODO: We might be able to optimize shift better by running through the data.
6357 // That would require either a second pass or keeping the entire result in RAM.
@@ -74,6 +68,17 @@ void Output::SetFile(uv_file file, int64_t srcSize, uint32_t blockSize, CSOForma
7468 }
7569 }
7670
71+ if (fmt == CSO_FMT_DAX) {
72+ if (indexShift_ != 0 || static_cast <uint32_t >(srcSize_) < srcSize_) {
73+ finish_ (false , " File too large to compress as DAX" );
74+ return ;
75+ }
76+ if (blockSize_ != DAX_FRAME_SIZE) {
77+ finish_ (false , " DAX requires a block size of 8192" );
78+ return ;
79+ }
80+ }
81+
7782 // If the shift is above 11, the padding could make it need more space.
7883 // But that would be > 4 TB anyway, so let's not worry about it.
7984 indexAlign_ = 1 << indexShift_;
@@ -88,6 +93,21 @@ void Output::SetFile(uv_file file, int64_t srcSize, uint32_t blockSize, CSOForma
8893 }
8994}
9095
96+ int64_t Output::DstFirstSectorPos (uint32_t totalSectors) {
97+ if (flags_ & TASKFLAG_DECOMPRESS) {
98+ // Decompressing, so no header.
99+ // We still track the index for code simplicity, but throw it away.
100+ return 0 ;
101+ } else if (flags_ & TASKFLAG_FMT_DAX) {
102+ // Pos (32 bits) and size (16 bits) per sector, plus header.
103+ // TODO: We don't support NC areas, but if we did, we'd have to know them here already...
104+ return sizeof (DAXHeader) + totalSectors * (sizeof (uint32_t ) + sizeof (uint16_t ));
105+ } else {
106+ // Start after the end of the index data and header.
107+ return sizeof (CSOHeader) + (totalSectors + 1 ) * sizeof (uint32_t );
108+ }
109+ }
110+
91111int32_t Output::Align (int64_t &pos) {
92112 uint32_t off = static_cast <uint32_t >(pos % indexAlign_);
93113 if (off != 0 ) {
@@ -200,35 +220,11 @@ void Output::HandleReadySector(Sector *sector) {
200220 static char padding[2048 ] = {0 };
201221 for (size_t i = 0 ; i < sectors.size (); ++i) {
202222 unsigned int bestSize = sectors[i]->BestSize ();
203- bufs[nbufs++] = uv_buf_init (reinterpret_cast <char *>(sectors[i]->BestBuffer ()), bestSize);
204-
205- // Update the index.
206- const int32_t s = static_cast <int32_t >(sectors[i]->Pos () >> blockShift_);
207- index_[s] = static_cast <int32_t >(dstPos >> indexShift_);
208- // CSO2 doesn't use a flag for uncompressed, only the size of the block.
209- if (!sectors[i]->Compressed () && fmt_ != CSO_FMT_CSO2) {
210- index_[s] |= CSO_INDEX_UNCOMPRESSED;
211- }
212- switch (fmt_) {
213- case CSO_FMT_CSO1:
214- if (sectors[i]->Format () == SECTOR_FMT_LZ4) {
215- finish_ (false , " LZ4 format not supported within CSO v1 file" );
216- return ;
217- }
218- break ;
219- case CSO_FMT_ZSO:
220- if (sectors[i]->Format () == SECTOR_FMT_DEFLATE) {
221- finish_ (false , " Deflate format not supported within ZSO file" );
222- return ;
223- }
224- break ;
225- case CSO_FMT_CSO2:
226- if (sectors[i]->Format () == SECTOR_FMT_LZ4) {
227- index_[s] |= CSO2_INDEX_LZ4;
228- }
229- break ;
223+ if (!UpdateIndex (sectors[i]->Pos (), dstPos, bestSize, sectors[i]->Format ())) {
224+ return ;
230225 }
231226
227+ bufs[nbufs++] = uv_buf_init (reinterpret_cast <char *>(sectors[i]->BestBuffer ()), bestSize);
232228 dstPos += bestSize;
233229 int32_t padSize = Align (dstPos);
234230 if (padSize != 0 ) {
@@ -276,6 +272,42 @@ void Output::HandleReadySector(Sector *sector) {
276272 });
277273}
278274
275+ bool Output::UpdateIndex (int64_t srcPos, int64_t dstPos, uint32_t compressedSize, SectorFormat compressedFmt) {
276+ const int32_t s = static_cast <int32_t >(srcPos >> blockShift_);
277+ index_[s] = static_cast <int32_t >(dstPos >> indexShift_);
278+ // CSO2 doesn't use a flag for uncompressed, only the size of the block.
279+ if (compressedFmt == SECTOR_FMT_ORIG && fmt_ != CSO_FMT_CSO2 && fmt_ != CSO_FMT_DAX) {
280+ index_[s] |= CSO_INDEX_UNCOMPRESSED;
281+ }
282+ switch (fmt_) {
283+ case CSO_FMT_CSO1:
284+ if (compressedFmt == SECTOR_FMT_LZ4) {
285+ finish_ (false , " LZ4 format not supported within CSO v1 file" );
286+ return false ;
287+ }
288+ break ;
289+ case CSO_FMT_ZSO:
290+ if (compressedFmt == SECTOR_FMT_DEFLATE) {
291+ finish_ (false , " Deflate format not supported within ZSO file" );
292+ return false ;
293+ }
294+ break ;
295+ case CSO_FMT_CSO2:
296+ if (compressedFmt == SECTOR_FMT_LZ4) {
297+ index_[s] |= CSO2_INDEX_LZ4;
298+ }
299+ break ;
300+ case CSO_FMT_DAX:
301+ if (compressedFmt != SECTOR_FMT_DEFLATE) {
302+ finish_ (false , " Sector format must be ZLIB for entire file" );
303+ return false ;
304+ }
305+ break ;
306+ }
307+
308+ return true ;
309+ }
310+
279311bool Output::ShouldCompress (int64_t pos, uint8_t *buffer) {
280312 if (flags_ & TASKFLAG_DECOMPRESS) {
281313 return false ;
@@ -320,6 +352,20 @@ void Output::Flush() {
320352 return ;
321353 }
322354
355+ switch (fmt_) {
356+ case CSO_FMT_CSO1:
357+ case CSO_FMT_CSO2:
358+ case CSO_FMT_ZSO:
359+ WriteCSOIndex ();
360+ break ;
361+
362+ case CSO_FMT_DAX:
363+ WriteDAXIndex ();
364+ break ;
365+ }
366+ }
367+
368+ void Output::WriteCSOIndex () {
323369 CSOHeader *header = new CSOHeader;
324370 if (fmt_ == CSO_FMT_ZSO) {
325371 memcpy (header->magic , ZSO_MAGIC, sizeof (header->magic ));
@@ -352,6 +398,48 @@ void Output::Flush() {
352398 });
353399}
354400
401+ void Output::WriteDAXIndex () {
402+ DAXHeader *header = new DAXHeader;
403+ memcpy (header->magic , DAX_MAGIC, sizeof (header->magic ));
404+ header->uncompressed_size = static_cast <uint32_t >(srcSize_);
405+ // TODO: 0 because we don't support NC areas in writing currently.
406+ header->version = 0 ;
407+ header->nc_areas = 0 ;
408+ header->unused [0 ] = 0 ;
409+ header->unused [1 ] = 0 ;
410+ header->unused [2 ] = 0 ;
411+ header->unused [3 ] = 0 ;
412+
413+ const uint32_t sectors = static_cast <uint32_t >(SrcSizeAligned () >> blockShift_);
414+ uint16_t *sizes = new uint16_t [sectors];
415+ for (uint32_t i = 0 ; i < sectors; ++i) {
416+ uint32_t size = index_[i + 1 ] - index_[i];
417+ if (size < (1 << 16 )) {
418+ sizes[i] = size;
419+ } else {
420+ finish_ (false , " Compressed sector larger than 16 bits" );
421+ }
422+ }
423+
424+ uv_buf_t bufs[3 ];
425+ bufs[0 ] = uv_buf_init (reinterpret_cast <char *>(header), sizeof (DAXHeader));
426+ // We skip the last entry of the index, which is the end.
427+ bufs[1 ] = uv_buf_init (reinterpret_cast <char *>(index_), sectors * sizeof (uint32_t ));
428+ bufs[2 ] = uv_buf_init (reinterpret_cast <char *>(sizes), sectors * sizeof (uint16_t ));
429+ const ssize_t totalBytes = sizeof (DAXHeader) + sectors * (sizeof (uint32_t ) + sizeof (uint16_t ));
430+ uv_.fs_write (loop_, &flush_, file_, bufs, 3 , 0 , [this , header, sizes, totalBytes](uv_fs_t *req) {
431+ if (req->result != totalBytes) {
432+ finish_ (false , " Unable to write header data" );
433+ } else {
434+ state_ |= STATE_INDEX_WRITTEN;
435+ CheckFinish ();
436+ }
437+ uv_fs_req_cleanup (req);
438+ delete header;
439+ delete [] sizes;
440+ });
441+ }
442+
355443void Output::CheckFinish () {
356444 if ((state_ & STATE_INDEX_WRITTEN) && (state_ & STATE_DATA_WRITTEN)) {
357445 finish_ (true , nullptr );
0 commit comments