HPCC-33806 Add support of building indexes to dafilesrv #20197
jpmcmu wants to merge 2 commits into hpcc-systems:candidate-9.14.x
Conversation
Jira Issue: https://hpccsystems.atlassian.net//browse/HPCC-33806
    if (compression == "default")
    {
        flags |= HTREE_COMPRESSED_KEY;
        compression = "";
Is there a better way to determine the "default" compression format?

this code should probably use translateToCompMethod(compression)

Not really - it is only a very small subset of compression types.

Realistically I think you should always set HTREE_COMPRESSED_KEY and then pass through the compression as is. Row compression is not used outside the regression suite.
I would change the check in keybuild.cpp:
if (!isEmptyString(compression))
to
if (!isEmptyString(compression) && !strsame(compression, "lzw") && !strsame(compression, "default"))
Which will allow lzw to be explicitly defined if we ever change the default.
fs/dafsserver/dafsserver.cpp
Outdated
    helper->setIndexMeta("_nodeSize", std::to_string(nodeSize));
}

if (config.hasProp("noSeek"))
Does it make sense to expose these options?

I tried to match what was exposed to ECL.

I think this should be defaulting to true - it should be true for blob storage, and doesn't really harm to be true for other systems.
inline void processRow(const void *row, uint64_t rowSize)
{
    unsigned __int64 fpos = 0;
Setting the fpos correctly here is a bit odd. It would definitely need to come from the incoming record, but an fpos may not always make sense, and because the datasets are often projected it isn't easy to reliably get the fpos of a read dataset.

fpos is only applicable if building an index over a base dataset (where fpos values refer to offsets in the flat file).
Not sure there's any need to support it.
Unfortunately ECL has weird semantics to reuse the fileposition field if the last field in the payload is numeric.
I think this is ok as it is, but the index will only be generally readable if it is defined with the FILEPOSITION(FALSE) attribute (if the last field is a numeric value).
If you want to be able to create all keys then you will need to do some horrible transformations to read the integer value of the last field, put it into the fileposition field.
Again create a separate jira to revisit.
@ghalliday @jakesmith Still working on a few things here especially in relation to the TLK / publishing, but writing of an index is working.
virtual void write(size32_t sz, const void *rowData) override
{
    size32_t rowOffset = 0;
    while (rowOffset < sz)
Need to handle partial records here.

I think it should be illegal to write partial records to this function. Otherwise you have some notable complications - the call to find the row size needs protecting if the row is partial.
For the moment throw an error if rowOffset > sz.
| } | ||
| }; | ||
|
|
||
|
|
trivial: leave one line, consistent with other spacing between classes.
fs/dafsserver/dafsserver.cpp
Outdated
    translator.setown(createRecordTranslator(outRecord, inRecord));
}

virtual bool getIndexMeta(size32_t & lenName, char * & name, size32_t & lenValue, char * & value, unsigned idx)
fs/dafsserver/dafsserver.cpp
Outdated
if (config.hasProp("noSeek"))
{
    bool noSeek = config.getPropBool("noSeek");
    helper->setIndexMeta("_noSeek", noSeek ? "true" : "false");
fs/dafsserver/dafsserver.cpp
Outdated
    return true;
}

void setIndexMeta(const std::string& name, const std::string& value)
picky: nicer if virtuals of IHThorIndexWriteArg kept together.

doesn't getWidth() need to be implemented with count of meta fields for getIndexMeta to be callable?
fs/dafsserver/dafsserver.cpp
Outdated
if (idx >= indexMetaData.size())
    return false;

auto it = indexMetaData.begin();
would a std::vector of a pair of std::string's be more suitable?
fs/dafsserver/dafsserver.cpp
Outdated
~CRemoteIndexWriteActivity()
{
    if (builder != nullptr && helper != nullptr)
there is no alternative at the moment, but when the file is closed (StreamCmd::CLOSE), it should call through to the activity to close, so we don't depend on dtors to do this kind of work.
For now, it would be worth adding a try/catch - as any unhandled exception at this point (within a dtor) will cause the process to exit.

Related to this, I think you are going to need to serialize back the last row, so that when the client has finished writing all parts of an index, it can use those last parts to create the TLK.
The response from StreamCmd::CLOSE could be extended to return structured info that contains this serialized row data.
fs/dafsserver/dafsserver.cpp
Outdated
if (builder != nullptr && helper != nullptr)
{
    Owned<IPropertyTree> metadata;
    metadata.setown(createPTree("metadata", ipt_fast));
trivial: ^ could be on one line: Owned<IPropertyTree> metadata = createPTree("metadata");
not worth diverging away from default to specify ipt_fast in this case, it's the default anyway.
fs/dafsserver/dafsserver.cpp
Outdated
size32_t rowOffset = 0;
while (rowOffset < sz)
{
    const RtlRecord& inputRecordAccessor = inMeta->queryRecordAccessor(true);
could be done once and stored as member.
system/jlib/jstring.hpp
Outdated
| } | ||
|
|
||
| extern jlib_decl void toLower(std::string & value); | ||
| extern jlib_decl void trim(std::string & value); |
could do with a comment... what does it do? Looks like it trims leading whitespace only, not trailing.
fs/dafsserver/dafsserver.cpp
Outdated
std::string compression = config.queryProp("compressed", "default");
toLower(compression);
trim(compression);
curious why this string might need leading spaces trimmed? (vs any other string)

I agree, I'm not sure why you would trim this field.
Converting to non-draft - since ready to review and to allow copilot to run.
Pull Request Overview
This PR adds support for building indexes to the dafilesrv (data file server) component, enabling remote index creation capabilities as part of HPCC-33806. The implementation includes a new index write activity class and supporting infrastructure.
- Adds a new CRemoteIndexWriteActivity class to handle remote index building operations
- Implements helper classes and utilities for index metadata management and record transformation
- Adds configuration support for index compression, node size, and other index-specific options
Reviewed Changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| system/jlib/jstring.hpp | Adds declaration for new trim utility function |
| system/jlib/jstring.cpp | Implements trim function for string whitespace removal |
| fs/dafsserver/dafsserver.cpp | Adds complete index writing functionality with new classes and TAKindexwrite support |
| esp/services/ws_dfu/ws_dfuService.cpp | Adds missing break statement and index file descriptor configuration |
{
    value.erase(value.begin(), std::find_if(value.begin(), value.end(), [](unsigned char ch) {
        return !std::isspace(ch);
    }));
The trim function only removes leading whitespace but not trailing whitespace. A complete trim implementation should remove both leading and trailing whitespace. Consider using value.erase(std::find_if(value.rbegin(), value.rend(), [](unsigned char ch) { return !std::isspace(ch); }).base(), value.end()) to also remove trailing whitespace.
Suggested change:
    }));
    value.erase(std::find_if(value.rbegin(), value.rend(), [](unsigned char ch) {
        return !std::isspace(ch);
    }).base(), value.end());
fs/dafsserver/dafsserver.cpp
Outdated
size32_t indexRowSize = helper->transform(rowBuilder, row, this, fpos);

// Key builder checks for duplicate records so we can just check for sortedness
if (memcmp(prevRowBuffer.get(), rowBuffer.get(), helper->getKeyedSize()) > 0)
The comparison uses memcmp which performs byte-wise comparison, but this may not be correct for all data types. For complex types like strings with different encodings or numeric types with different byte representations, this could give incorrect sort order results. Consider using a proper record comparison function that understands the data types.
void openFileStream()
{
    if (!recursiveCreateDirectoryForFile(fileName))
        throw createDafsExceptionV(DAFSERR_cmdstream_openfailure, "Failed to create dirtory for file: '%s'", fileName.get());
Typo in error message: 'dirtory' should be 'directory'.

Suggested change:
    throw createDafsExceptionV(DAFSERR_cmdstream_openfailure, "Failed to create directory for file: '%s'", fileName.get());
    throw MakeStringException(99, "Index maximum record length (%d) exceeds 32k internal limit", maxDiskRecordSize);

rowBuffer.allocateN(maxDiskRecordSize, true);
prevRowBuffer.allocateN(maxDiskRecordSize, true);
The prevRowBuffer is allocated with the full maxDiskRecordSize but only helper->getKeyedSize() bytes are used in the comparison. Consider allocating only the needed size for the keyed portion to reduce memory usage, especially for records with large non-keyed portions.

Suggested change:
    prevRowBuffer.allocateN(helper->getKeyedSize(), true);
| } | ||
|
|
||
| ~CRemoteIndexWriteActivity() | ||
| { |
The destructor performs complex operations including calling builder->finish() which could potentially throw exceptions. Destructors should not throw exceptions as this can lead to undefined behavior. Consider moving the finish() logic to a separate cleanup method that can be called explicitly before destruction.

Suggested change:
    {
        try
        {
            cleanup();
        }
        catch (...)
        {
            // Log the exception or handle it appropriately
            // Avoid propagating exceptions from the destructor
        }
        close();
    }
    void cleanup()
    {
virtual unsigned getFlags() { return flags; }
virtual size32_t transform(ARowBuilder & rowBuilder, const void * row, IBlobCreator * blobs, unsigned __int64 & filepos)
{
    // Seems like an UnexpectedVirtualFieldCallback could be used but what about blobs?
Please create a separate jira for supporting blobs. It will need changes to the translator, including a new virtual in the callback interface.
    maxRecordSizeSeen = indexRowSize;

processed++;
memcpy(prevRowBuffer.get(), rowBuffer.get(), maxDiskRecordSize);
Only need to save keyedSize. I suspect benefit of catching invalid input data outweighs the cost.
fs/dafsserver/dafsserver.cpp
Outdated
size32_t indexRowSize = helper->transform(rowBuilder, row, this, fpos);

// Key builder checks for duplicate records so we can just check for sortedness
if (memcmp(prevRowBuffer.get(), rowBuffer.get(), helper->getKeyedSize()) > 0)
cache helper->getKeyedSize() in a member variable.
fs/dafsserver/dafsserver.cpp
Outdated
flags &= ~USE_TRAILING_HEADER;
}

size32_t fileposSize = hasTrailingFileposition(helper->queryDiskRecordSize()->queryTypeInfo()) ? sizeof(offset_t) : 0;
Throw an error if it has a trailing fileposition - will require changes elsewhere.
| } | ||
| }; | ||
|
|
||
| class CRemoteIndexWriteHelper : public CThorIndexWriteArg |
Does this class actually provide any benefit? It is legal to call createKeyBuilder() with null for the helper.
I suspect it adds complication with no benefit.
I don't think there is currently a way of adding bloom filters without a helper, but it would be better to add virtuals to allow that, and apply the values directly from a property tree.
Long term that is the direction that disk write is going for many options.
unsigned nodeSize = NODESIZE;
if (config.hasProp("nodeSize"))
{
    nodeSize = config.getPropInt("nodeSize");
I think this is an example of some code that is made more complicated by having the helper.