@@ -2656,11 +2656,12 @@ class CRemoteDiskWriteActivity : public CRemoteWriteBaseActivity
26562656 }
26572657};
26582658
2659+
26592660class CRemoteIndexWriteHelper : public CThorIndexWriteArg
26602661{
26612662 UnexpectedVirtualFieldCallback fieldCallback;
26622663 Owned<const IDynamicTransform> translator;
2663- std::map <std::string, std::string> indexMetaData;
2664+ std::vector <std::pair<std:: string, std::string> > indexMetaData;
26642665public:
26652666 CRemoteIndexWriteHelper (const char * _filename, const char * _compression, IOutputMetaData * _inMeta, IOutputMetaData * _outMeta, unsigned _flags)
26662667 : filename(_filename), compression(_compression), inMeta(_inMeta), outMeta(_outMeta), flags(_flags)
@@ -2670,51 +2671,50 @@ class CRemoteIndexWriteHelper : public CThorIndexWriteArg
26702671 translator.setown (createRecordTranslator (outRecord, inRecord));
26712672 }
26722673
2673- virtual bool getIndexMeta (size32_t & lenName, char * & name, size32_t & lenValue, char * & value, unsigned idx)
2674+ void setIndexMeta (const std::string& name, const std::string& value)
2675+ {
2676+ indexMetaData.emplace_back (name, value);
2677+ }
2678+
2679+ virtual const char * getFileName () override { return filename.c_str (); }
2680+ virtual int getSequence () override { return 0 ; }
2681+ virtual IOutputMetaData * queryDiskRecordSize () override { return outMeta; }
2682+ virtual const char * queryRecordECL () override { return nullptr ; }
2683+ virtual unsigned getFlags () override { return flags; }
2684+ virtual unsigned getKeyedSize () override
2685+ {
2686+ if (outMeta == nullptr )
2687+ return 0 ;
2688+
2689+ const RtlRecord& recAccessor = outMeta->queryRecordAccessor (true );
2690+ return recAccessor.getFixedOffset (recAccessor.getNumKeyedFields ());
2691+ }
2692+ virtual unsigned getMaxKeySize () override { return 0 ; }
2693+ virtual unsigned getFormatCrc () override { return 0 ; }
2694+ virtual unsigned getWidth () override { return indexMetaData.size (); }
2695+ virtual const char * queryCompression () override { return compression.c_str (); }
2696+ virtual bool getIndexMeta (size32_t & lenName, char * & name, size32_t & lenValue, char * & value, unsigned idx) override
26742697 {
26752698 if (idx >= indexMetaData.size ())
26762699 return false ;
26772700
2678- auto it = indexMetaData.begin ();
2679- std::advance (it, idx);
2701+ const auto &entry = indexMetaData[idx];
26802702
2681- lenName = it-> first .length ();
2703+ lenName = entry. first .length ();
26822704 name = (char *) rtlMalloc (lenName);
2683- memcpy (name, it-> first .c_str (), lenName);
2705+ memcpy (name, entry. first .c_str (), lenName);
26842706
2685- lenValue = it-> second .length ();
2707+ lenValue = entry. second .length ();
26862708 value = (char *) rtlMalloc (lenValue);
2687- memcpy (value, it-> second .c_str (), lenValue);
2709+ memcpy (value, entry. second .c_str (), lenValue);
26882710
26892711 return true ;
26902712 }
2691-
2692- void setIndexMeta (const std::string& name, const std::string& value)
2693- {
2694- indexMetaData[name] = value;
2695- }
2696-
2697- virtual const char * getFileName () { return filename.c_str (); }
2698- virtual int getSequence () { return 0 ; }
2699- virtual IOutputMetaData * queryDiskRecordSize () { return outMeta; }
2700- virtual const char * queryRecordECL () { return nullptr ; }
2701- virtual unsigned getFlags () { return flags; }
2702- virtual size32_t transform (ARowBuilder & rowBuilder, const void * row, IBlobCreator * blobs, unsigned __int64 & filepos)
2713+ virtual size32_t transform (ARowBuilder & rowBuilder, const void * row, IBlobCreator * blobs, unsigned __int64 & filepos) override
27032714 {
27042715 // Seems like an UnexpectedVirtualFieldCallback could be used but what about blobs?
27052716 return translator->translate (rowBuilder, fieldCallback, (const byte *)row);
27062717 }
2707- virtual unsigned getKeyedSize ()
2708- {
2709- if (outMeta == nullptr )
2710- return 0 ;
2711-
2712- const RtlRecord& recAccessor = outMeta->queryRecordAccessor (true );
2713- return recAccessor.getFixedOffset (recAccessor.getNumKeyedFields ());
2714- }
2715- virtual unsigned getMaxKeySize () { return 0 ; }
2716- virtual unsigned getFormatCrc () { return 0 ; }
2717- virtual const char * queryCompression () { return compression.c_str (); }
27182718
27192719public:
27202720 std::string filename;
@@ -2737,6 +2737,7 @@ class CRemoteIndexWriteActivity : public CRemoteWriteBaseActivity, implements IB
27372737 uint64_t uncompressedSize = 0 ;
27382738 uint64_t processed = 0 ;
27392739 size32_t maxDiskRecordSize = 0 ;
2740+ size32_t keyedSize = 0 ;
27402741 size32_t maxRecordSizeSeen = 0 ; // used to store the maximum record size seen, for metadata
27412742 bool isTlk = false ;
27422743 bool opened = false ;
@@ -2748,13 +2749,13 @@ class CRemoteIndexWriteActivity : public CRemoteWriteBaseActivity, implements IB
27482749 size32_t indexRowSize = helper->transform (rowBuilder, row, this , fpos);
27492750
27502751 // Key builder checks for duplicate records so we can just check for sortedness
2751- if (memcmp (prevRowBuffer.get (), rowBuffer.get (), helper-> getKeyedSize () ) > 0 )
2752+ if (memcmp (prevRowBuffer.get (), rowBuffer.get (), keyedSize ) > 0 )
27522753 {
27532754 throw createDafsExceptionV (DAFSERR_cmdstream_generalwritefailure, " CRemoteIndexWriteActivity: Incoming rows are not sorted." );
27542755 }
27552756
27562757 builder->processKeyData (rowBuffer, fpos, indexRowSize);
2757- uncompressedSize += ( indexRowSize + sizeof ( offset_t )); // Include FPOS in the uncompressed size
2758+ uncompressedSize += indexRowSize;
27582759
27592760 if (indexRowSize > maxRecordSizeSeen)
27602761 maxRecordSizeSeen = indexRowSize;
@@ -2768,22 +2769,20 @@ class CRemoteIndexWriteActivity : public CRemoteWriteBaseActivity, implements IB
27682769 if (!recursiveCreateDirectoryForFile (fileName))
27692770 throw createDafsExceptionV (DAFSERR_cmdstream_openfailure, " Failed to create dirtory for file: '%s'" , fileName.get ());
27702771 OwnedIFile iFile = createIFile (fileName);
2771- assertex (iFile);
27722772
27732773 iFileIO.setown (iFile->open (IFOcreate));
27742774 if (!iFileIO)
27752775 throw createDafsExceptionV (DAFSERR_cmdstream_openfailure, " Failed to open: '%s' for write" , fileName.get ());
27762776
2777- iFileIOStream.setown (createIOStream (iFileIO));
2777+ iFileIOStream.setown (createBufferedIOStream (iFileIO));
27782778 opened = true ;
27792779 }
27802780
2781- virtual unsigned __int64 createBlob (size32_t size, const void * ptr)
2781+ virtual unsigned __int64 createBlob (size32_t size, const void * ptr) override
27822782 {
27832783 return builder->createBlob (size, (const char *) ptr);
27842784 }
27852785public:
2786-
27872786 CRemoteIndexWriteActivity (IPropertyTree &config, IFileDescriptor *fileDesc) : CRemoteWriteBaseActivity(config, fileDesc)
27882787 {
27892788 inMeta.setown (getTypeInfoOutputMetaData (config, " input" , false ));
@@ -2796,28 +2795,16 @@ class CRemoteIndexWriteActivity : public CRemoteWriteBaseActivity, implements IB
27962795
27972796 std::string compression = config.queryProp (" compressed" , " default" );
27982797 toLower (compression);
2799- trim (compression);
28002798
2801- unsigned flags = COL_PREFIX | HTREE_FULLSORT_KEY | USE_TRAILING_HEADER;
2799+ unsigned flags = COL_PREFIX | HTREE_FULLSORT_KEY | USE_TRAILING_HEADER | HTREE_COMPRESSED_KEY | HTREE_QUICK_COMPRESSED_KEY ;
28022800
2803- if (compression == " default" )
2804- {
2805- flags |= HTREE_COMPRESSED_KEY;
2806- compression = " " ;
2807- }
2808- else if (compression == " lzw" )
2809- {
2810- flags |= HTREE_COMPRESSED_KEY;
2811- compression = " " ;
2812- }
2813- else if (compression == " row" )
2801+ if (compression == " default" || compression == " lzw" )
28142802 {
28152803 compression = " " ;
2816- flags |= HTREE_COMPRESSED_KEY | HTREE_QUICK_COMPRESSED_KEY;
28172804 }
28182805 else if (compression.substr (0 ,7 ) == " inplace" )
28192806 {
2820- flags |= HTREE_COMPRESSED_KEY;
2807+ // pass through the compression string as-is
28212808 }
28222809
28232810 bool isVariable = outMeta->isVariableSize ();
@@ -2833,25 +2820,21 @@ class CRemoteIndexWriteActivity : public CRemoteWriteBaseActivity, implements IB
28332820 helper->setIndexMeta (" _nodeSize" , std::to_string (nodeSize));
28342821 }
28352822
2836- if (config.hasProp (" noSeek" ))
2837- {
2838- bool noSeek = config.getPropBool (" noSeek" );
2839- helper->setIndexMeta (" _noSeek" , noSeek ? " true" : " false" );
2840- if (noSeek)
2841- flags |= TRAILING_HEADER_ONLY;
2842- }
2823+ bool noSeek = config.getPropBool (" noSeek" , true );
2824+ helper->setIndexMeta (" _noSeek" , boolToStr (noSeek));
2825+ if (noSeek)
2826+ flags |= TRAILING_HEADER_ONLY;
28432827
2844- if ( config.hasProp (" useTrailingHeader" ))
2845- {
2846- bool useTrailingHeader = config. getPropBool ( " useTrailingHeader" );
2847- helper-> setIndexMeta ( " _useTrailingHeader " , useTrailingHeader ? " true " : " false " ) ;
2848- if (useTrailingHeader)
2849- flags |= USE_TRAILING_HEADER;
2850- else
2851- flags &= ~USE_TRAILING_HEADER;
2852- }
2828+ bool useTrailingHeader = config.getPropBool (" useTrailingHeader" , true );
2829+ helper-> setIndexMeta ( " _useTrailingHeader " , boolToStr (useTrailingHeader));
2830+ if ( useTrailingHeader)
2831+ flags |= USE_TRAILING_HEADER ;
2832+ else
2833+ flags &= ~ USE_TRAILING_HEADER;
2834+
2835+ if ( hasTrailingFileposition (helper-> queryDiskRecordSize ()-> queryTypeInfo ()))
2836+ throw createDafsException (DAFSERR_cmdstream_protocol_failure, " CRemoteIndexWriteActivity: trailing fileposition not supported, use FILEPOSITION(FALSE) " );
28532837
2854- size32_t fileposSize = hasTrailingFileposition (helper->queryDiskRecordSize ()->queryTypeInfo ()) ? sizeof (offset_t ) : 0 ;
28552838 if (isVariable)
28562839 {
28572840 if (helper->getFlags () & TIWmaxlength)
@@ -2860,50 +2843,69 @@ class CRemoteIndexWriteActivity : public CRemoteWriteBaseActivity, implements IB
28602843 maxDiskRecordSize = KEYBUILD_MAXLENGTH; // Current default behaviour, could be improved in the future
28612844 }
28622845 else
2863- maxDiskRecordSize = helper->queryDiskRecordSize ()->getFixedSize ()-fileposSize ;
2846+ maxDiskRecordSize = helper->queryDiskRecordSize ()->getFixedSize ();
28642847
28652848 if (maxDiskRecordSize > KEYBUILD_MAXLENGTH)
28662849 throw MakeStringException (99 , " Index maximum record length (%d) exceeds 32k internal limit" , maxDiskRecordSize);
28672850
2851+ keyedSize = helper->getKeyedSize ();
2852+
28682853 rowBuffer.allocateN (maxDiskRecordSize, true );
28692854 prevRowBuffer.allocateN (maxDiskRecordSize, true );
28702855
28712856 openFileStream ();
2872- builder.setown (createKeyBuilder (iFileIOStream.get (), flags, maxDiskRecordSize, nodeSize, helper-> getKeyedSize () , 0 , helper.get (), compression.c_str (), true , false ));
2857+ builder.setown (createKeyBuilder (iFileIOStream.get (), flags, maxDiskRecordSize, nodeSize, keyedSize , 0 , helper.get (), compression.c_str (), true , false ));
28732858 }
28742859
28752860 ~CRemoteIndexWriteActivity ()
28762861 {
2877- if (builder != nullptr && helper != nullptr )
2862+ try
28782863 {
2879- Owned<IPropertyTree> metadata;
2880- metadata.setown (createPTree (" metadata" , ipt_fast));
2881- buildUserMetadata (metadata, *helper);
2864+ if (builder != nullptr && helper != nullptr )
2865+ {
2866+ Owned<IPropertyTree> metadata = createPTree (" metadata" );
2867+ buildUserMetadata (metadata, *helper);
28822868
2883- metadata->setProp (" _record_ECL" , helper->queryRecordECL ());
2884- setRtlFormat (*metadata, helper->queryDiskRecordSize ());
2869+ metadata->setProp (" _record_ECL" , helper->queryRecordECL ());
2870+ setRtlFormat (*metadata, helper->queryDiskRecordSize ());
28852871
2886- unsigned int fileCrc;
2887- builder->finish (metadata, &fileCrc, maxRecordSizeSeen, nullptr );
2888- }
2872+ unsigned int fileCrc;
2873+ builder->finish (metadata, &fileCrc, maxRecordSizeSeen, nullptr );
2874+ }
28892875
2890- close ();
2876+ close ();
2877+ }
2878+ catch (IException *e)
2879+ {
2880+ EXCLOG (e, " ~CRemoteIndexWriteActivity" );
2881+ e->Release ();
2882+ }
2883+ catch (...)
2884+ {
2885+ IERRLOG (" ~CRemoteIndexWriteActivity: unknown exception" );
2886+ }
28912887 }
28922888
28932889 virtual void write (size32_t sz, const void *rowData) override
28942890 {
2891+ const RtlRecord& inputRecordAccessor = inMeta->queryRecordAccessor (true );
28952892 size32_t rowOffset = 0 ;
28962893 while (rowOffset < sz)
28972894 {
2898- const RtlRecord& inputRecordAccessor = inMeta->queryRecordAccessor (true );
2899- size32_t rowSize = inputRecordAccessor.getRecordSize (rowData);
2895+ size32_t rowSize = inputRecordAccessor.getRecordSize ((const byte *)rowData + rowOffset);
29002896 processRow ((const byte *)rowData + rowOffset, rowSize);
29012897 rowOffset += rowSize;
29022898 }
2899+ if (rowOffset > sz)
2900+ throw createDafsExceptionV (DAFSERR_cmdstream_generalwritefailure, " CRemoteIndexWriteActivity: partial record detected (offset=%u, size=%u)" , rowOffset, sz);
29032901 }
29042902
29052903 virtual void serializeCursor (MemoryBuffer &tgt) const override {}
29062904 virtual void restoreCursor (MemoryBuffer &src) override {}
2905+ virtual StringBuffer &getInfoStr (StringBuffer &out) const override
2906+ {
2907+ return out.appendf (" indexwrite[%s]" , fileName.get ());
2908+ }
29072909};
29082910
29092911// create a { unsigned8 } output meta for the count
0 commit comments