-
Notifications
You must be signed in to change notification settings - Fork 310
HPCC-33806 Add support of building indexes to dafilesrv #20197
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: candidate-9.14.x
Are you sure you want to change the base?
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -70,6 +70,9 @@ | |||||||||||||||||||||||||||||||||
| #include "rmtfile.hpp" | ||||||||||||||||||||||||||||||||||
| #include "rmtclient_impl.hpp" | ||||||||||||||||||||||||||||||||||
| #include "dafsserver.hpp" | ||||||||||||||||||||||||||||||||||
| #include "keybuild.hpp" | ||||||||||||||||||||||||||||||||||
| #include "eclhelper_base.hpp" | ||||||||||||||||||||||||||||||||||
| #include "thorfile.hpp" | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| #include "ftslavelib.hpp" | ||||||||||||||||||||||||||||||||||
| #include "filecopy.hpp" | ||||||||||||||||||||||||||||||||||
|
|
@@ -2575,8 +2578,6 @@ class CRemoteWriteBaseActivity : public CSimpleInterfaceOf<IRemoteWriteActivity> | |||||||||||||||||||||||||||||||||
| return this; | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
| }; | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| class CRemoteDiskWriteActivity : public CRemoteWriteBaseActivity | ||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||
| typedef CRemoteWriteBaseActivity PARENT; | ||||||||||||||||||||||||||||||||||
|
|
@@ -2655,6 +2656,255 @@ class CRemoteDiskWriteActivity : public CRemoteWriteBaseActivity | |||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
| }; | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| class CRemoteIndexWriteHelper : public CThorIndexWriteArg | ||||||||||||||||||||||||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does this class actually provide any benefit? It is legal to call createKeyBuilder() with null for the helper. Long term that is the direction that disk write is going for many options. |
||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||
| UnexpectedVirtualFieldCallback fieldCallback; | ||||||||||||||||||||||||||||||||||
| Owned<const IDynamicTransform> translator; | ||||||||||||||||||||||||||||||||||
| std::map<std::string, std::string> indexMetaData; | ||||||||||||||||||||||||||||||||||
| public: | ||||||||||||||||||||||||||||||||||
| CRemoteIndexWriteHelper(const char * _filename, const char* _compression, IOutputMetaData * _inMeta, IOutputMetaData * _outMeta, unsigned _flags) | ||||||||||||||||||||||||||||||||||
| : filename(_filename), compression(_compression), inMeta(_inMeta), outMeta(_outMeta), flags(_flags) | ||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||
| const RtlRecord &inRecord = inMeta->queryRecordAccessor(true); | ||||||||||||||||||||||||||||||||||
| const RtlRecord &outRecord = outMeta->queryRecordAccessor(true); | ||||||||||||||||||||||||||||||||||
| translator.setown(createRecordTranslator(outRecord, inRecord)); | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| virtual bool getIndexMeta(size32_t & lenName, char * & name, size32_t & lenValue, char * & value, unsigned idx) | ||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||
| if (idx >= indexMetaData.size()) | ||||||||||||||||||||||||||||||||||
| return false; | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| auto it = indexMetaData.begin(); | ||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
| std::advance(it, idx); | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| lenName = it->first.length(); | ||||||||||||||||||||||||||||||||||
| name = (char*) rtlMalloc(lenName); | ||||||||||||||||||||||||||||||||||
| memcpy(name, it->first.c_str(), lenName); | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| lenValue = it->second.length(); | ||||||||||||||||||||||||||||||||||
| value = (char*) rtlMalloc(lenValue); | ||||||||||||||||||||||||||||||||||
| memcpy(value, it->second.c_str(), lenValue); | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| return true; | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| void setIndexMeta(const std::string& name, const std::string& value) | ||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||
| indexMetaData[name] = value; | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| virtual const char * getFileName() { return filename.c_str(); } | ||||||||||||||||||||||||||||||||||
| virtual int getSequence() { return 0; } | ||||||||||||||||||||||||||||||||||
| virtual IOutputMetaData * queryDiskRecordSize() { return outMeta; } | ||||||||||||||||||||||||||||||||||
| virtual const char * queryRecordECL() { return nullptr; } | ||||||||||||||||||||||||||||||||||
| virtual unsigned getFlags() { return flags; } | ||||||||||||||||||||||||||||||||||
| virtual size32_t transform(ARowBuilder & rowBuilder, const void * row, IBlobCreator * blobs, unsigned __int64 & filepos) | ||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||
| // Seems like an UnexpectedVirtualFieldCallback could be used but what about blobs? | ||||||||||||||||||||||||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please create a separate jira for supporting blobs. It will need changes to the translator, including a new virtual in the callback interface. |
||||||||||||||||||||||||||||||||||
| return translator->translate(rowBuilder, fieldCallback, (const byte *)row); | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
| virtual unsigned getKeyedSize() | ||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||
| if (outMeta == nullptr) | ||||||||||||||||||||||||||||||||||
| return 0; | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| const RtlRecord& recAccessor = outMeta->queryRecordAccessor(true); | ||||||||||||||||||||||||||||||||||
| return recAccessor.getFixedOffset(recAccessor.getNumKeyedFields()); | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
| virtual unsigned getMaxKeySize() { return 0; } | ||||||||||||||||||||||||||||||||||
| virtual unsigned getFormatCrc() { return 0; } | ||||||||||||||||||||||||||||||||||
| virtual const char * queryCompression() { return compression.c_str(); } | ||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| public: | ||||||||||||||||||||||||||||||||||
| std::string filename; | ||||||||||||||||||||||||||||||||||
| std::string compression; | ||||||||||||||||||||||||||||||||||
| IOutputMetaData * inMeta = nullptr; | ||||||||||||||||||||||||||||||||||
| IOutputMetaData * outMeta = nullptr; | ||||||||||||||||||||||||||||||||||
| unsigned flags = 0; | ||||||||||||||||||||||||||||||||||
| }; | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| class CRemoteIndexWriteActivity : public CRemoteWriteBaseActivity, implements IBlobCreator | ||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||
| Owned<IFileIOStream> iFileIOStream; | ||||||||||||||||||||||||||||||||||
| Owned<IKeyBuilder> builder; | ||||||||||||||||||||||||||||||||||
| Owned<CRemoteIndexWriteHelper> helper; | ||||||||||||||||||||||||||||||||||
| Linked<IOutputMetaData> inMeta, outMeta; | ||||||||||||||||||||||||||||||||||
| UnexpectedVirtualFieldCallback fieldCallback; | ||||||||||||||||||||||||||||||||||
| OwnedMalloc<char> prevRowBuffer; | ||||||||||||||||||||||||||||||||||
| OwnedMalloc<char> rowBuffer; | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| uint64_t uncompressedSize = 0; | ||||||||||||||||||||||||||||||||||
| uint64_t processed = 0; | ||||||||||||||||||||||||||||||||||
| size32_t maxDiskRecordSize = 0; | ||||||||||||||||||||||||||||||||||
| size32_t maxRecordSizeSeen = 0; // used to store the maximum record size seen, for metadata | ||||||||||||||||||||||||||||||||||
| bool isTlk = false; | ||||||||||||||||||||||||||||||||||
| bool opened = false; | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| inline void processRow(const void *row, uint64_t rowSize) | ||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||
| unsigned __int64 fpos = 0; | ||||||||||||||||||||||||||||||||||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Setting the fpos correctly here is a bit odd, this would definitely need to come from the incoming record, but an fpos may not always make sense, and because the datasets are often projected it isn't easy to reliably get the fpos of a read dataset.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fpos only applicable if building an index of a base dataset (where fpos' refer to offset in flat file).
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Unfortunately ECL has weird semantics to reuse the fileposition field if the last field in the payload is numeric. I think this is ok as it is, but the index will only be generally readable if it is defined with the FILEPOSITION(FALSE) attribute (if the last field is a numeric value). If you want to be able to create all keys then you will need to do some horrible transformations to read the integer value of the last field, put it into the fileposition field. |
||||||||||||||||||||||||||||||||||
| RtlStaticRowBuilder rowBuilder(rowBuffer, maxDiskRecordSize); | ||||||||||||||||||||||||||||||||||
| size32_t indexRowSize = helper->transform(rowBuilder, row, this, fpos); | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| // Key builder checks for duplicate records so we can just check for sortedness | ||||||||||||||||||||||||||||||||||
| if (memcmp(prevRowBuffer.get(), rowBuffer.get(), helper->getKeyedSize()) > 0) | ||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||
| throw createDafsExceptionV(DAFSERR_cmdstream_generalwritefailure, "CRemoteIndexWriteActivity: Incoming rows are not sorted."); | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| builder->processKeyData(rowBuffer, fpos, indexRowSize); | ||||||||||||||||||||||||||||||||||
| uncompressedSize += (indexRowSize + sizeof(offset_t)); // Include FPOS in the uncompressed size | ||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| if (indexRowSize > maxRecordSizeSeen) | ||||||||||||||||||||||||||||||||||
| maxRecordSizeSeen = indexRowSize; | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| processed++; | ||||||||||||||||||||||||||||||||||
| memcpy(prevRowBuffer.get(), rowBuffer.get(), maxDiskRecordSize); | ||||||||||||||||||||||||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. related to earlier comment - this is only validation, should it be in a debug build only, or via a configurable option?
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Only need to save keyedSize. I suspect benefit of catching invalid input data outweighs the cost. |
||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| void openFileStream() | ||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||
| if (!recursiveCreateDirectoryForFile(fileName)) | ||||||||||||||||||||||||||||||||||
| throw createDafsExceptionV(DAFSERR_cmdstream_openfailure, "Failed to create dirtory for file: '%s'", fileName.get()); | ||||||||||||||||||||||||||||||||||
jpmcmu marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||||||||||||||||||
| throw createDafsExceptionV(DAFSERR_cmdstream_openfailure, "Failed to create dirtory for file: '%s'", fileName.get()); | |
| throw createDafsExceptionV(DAFSERR_cmdstream_openfailure, "Failed to create directory for file: '%s'", fileName.get()); |
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need to check - never fails.
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should probably use a buffered io (e.g. createBufferedIOStream)
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
trivial: missing "override"
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
curious why this string might need leading spaces trimmed ? (vs any other string)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree, I'm not sure why you would trim this field.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a better way to determine the "default" compression format?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this code should probably use translateToCompMethod(compression)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not really - it is only a very small subset of compression types.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Realistically I think you should always set htree_compressed_key and then pass through the compression as is. Row compression is not used outside the regression suite.
I would change the check in keybuild.cpp:
if (!isEmptyString(compression))
to
if (!isEmptyString(compression) && !strsame(compression, "lzw") && !strsame(compression, "default"))
Which will allow lzw to be explicitly defined if we ever change the default.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is an example of some code that is made more complicated by having the helper.
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does it make sense to expose these options? I tried to match was exposed to ECL
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this should be defaulting to true - it should be true for blob storage, and doesn't really harm to be true for other systems.
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
trivial: can use boolToStr
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should always default on.
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Throw an error if it has a trailing fileposition - will require changes elsewhere.
Copilot
AI
Jul 29, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The prevRowBuffer is allocated with the full maxDiskRecordSize but only helper->getKeyedSize() bytes are used in the comparison. Consider allocating only the needed size for the keyed portion to reduce memory usage, especially for records with large non-keyed portions.
| prevRowBuffer.allocateN(maxDiskRecordSize, true); | |
| prevRowBuffer.allocateN(helper->getKeyedSize(), true); |
Copilot
AI
Jul 29, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The destructor performs complex operations including calling builder->finish() which could potentially throw exceptions. Destructors should not throw exceptions as this can lead to undefined behavior. Consider moving the finish() logic to a separate cleanup method that can be called explicitly before destruction.
| { | |
| { | |
| try | |
| { | |
| cleanup(); | |
| } | |
| catch (...) | |
| { | |
| // Log the exception or handle it appropriately | |
| // Avoid propagating exceptions from the destructor | |
| } | |
| close(); | |
| } | |
| void cleanup() | |
| { |
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
there is no alternative at the moment, but when the file is closed (StreamCmd::CLOSE), it should call through to the acitivity to close, so we don't depend on dtor's to do this kind of work.
For now, it would be worth adding a try/catch - as any unhandled exception at this point (within a dtor) will cause the process to exit.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Related to this, I think you are going to need to serialize back the last row, so that when the client has finished writing all parts of an index, it can use those last parts to create the TLK.
The response from StreamCmd::CLOSE could be extended to return structured info that contains this serialized row data.
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
trivial: ^ could be on 1 line : Owned&lt;IPropertyTree&gt; metadata = createPTree("metadata");
not worth diverging away from default to specify ipt_fast in this case, it's the default anyway.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Need to handle partial records here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it should be illegal to write partial records to this function. Otherwise you have some notable complications - the call to find the row size needs protecting if the row is partial.
For the moment throw an error if rowOffset > sz
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could be done once and stored as member.
| Original file line number | Diff line number | Diff line change | ||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -2699,7 +2699,12 @@ void toLower(std::string & value) | |||||||||||
| std::transform(value.cbegin(), value.cend(), value.begin(), func); | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
|
|
||||||||||||
| void trim(std::string & value) | ||||||||||||
| { | ||||||||||||
| value.erase(value.begin(), std::find_if(value.begin(), value.end(), [](unsigned char ch) { | ||||||||||||
| return !std::isspace(ch); | ||||||||||||
| })); | ||||||||||||
|
||||||||||||
| })); | |
| })); | |
| value.erase(std::find_if(value.rbegin(), value.rend(), [](unsigned char ch) { | |
| return !std::isspace(ch); | |
| }).base(), value.end()); |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -588,6 +588,7 @@ inline StringBuffer& operator << (StringBuffer& s, const TValue& value) | |
| } | ||
|
|
||
| extern jlib_decl void toLower(std::string & value); | ||
| extern jlib_decl void trim(std::string & value); | ||
|
||
|
|
||
| extern jlib_decl bool checkUnicodeLiteral(char const * str, unsigned length, unsigned & ep, StringBuffer & msg); | ||
| extern jlib_decl void decodeCppEscapeSequence(StringBuffer & out, const char * in, bool errorIfInvalid); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
trivial: leave one line, consistent with other spacing between classes.