Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions esp/services/ws_dfu/ws_dfuService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6520,6 +6520,7 @@ bool CWsDfuEx::onDFUFileCreateV2(IEspContext &context, IEspDFUFileCreateV2Reques
break;
case CDFUFileType_Index:
fileType = "key";
break;
default:
throw makeStringExceptionV(ECLWATCH_MISSING_FILETYPE, "DFUFileCreateV2: File type not provided");
}
Expand Down Expand Up @@ -6568,7 +6569,15 @@ bool CWsDfuEx::onDFUFileCreateV2(IEspContext &context, IEspDFUFileCreateV2Reques
Owned<IGroup> group = queryNamedGroupStore().lookup(groupName.str());
if (!group)
throw makeStringExceptionV(ECLWATCH_INVALID_INPUT, "DFUFileCreateV2: Failed to get Group %s.", groupName.str());

fileDesc.setown(createFileDescriptor(tempFileName, clusterTypeString(clusterType, false), groupName, group));
if (kind == CDFUFileType_Index)
{
uint tlkPartIndex = fileDesc->numParts();
fileDesc->setNumParts(fileDesc->numParts() + 1);
fileDesc->queryPart(tlkPartIndex)->queryProperties().setProp("@kind", "topLevelKey");
}

// NB: if file has copies on >1 cluster, they must share the same key
std::vector<std::string> groups;
groups.push_back(groupName.str());
Expand Down
274 changes: 269 additions & 5 deletions fs/dafsserver/dafsserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@
#include "rmtfile.hpp"
#include "rmtclient_impl.hpp"
#include "dafsserver.hpp"
#include "keybuild.hpp"
#include "eclhelper_base.hpp"
#include "thorfile.hpp"

#include "ftslavelib.hpp"
#include "filecopy.hpp"
Expand Down Expand Up @@ -2575,8 +2578,6 @@ class CRemoteWriteBaseActivity : public CSimpleInterfaceOf<IRemoteWriteActivity>
return this;
}
};


Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

trivial: leave one line, consistent with other spacing between classes.

class CRemoteDiskWriteActivity : public CRemoteWriteBaseActivity
{
typedef CRemoteWriteBaseActivity PARENT;
Expand Down Expand Up @@ -2656,6 +2657,257 @@ class CRemoteDiskWriteActivity : public CRemoteWriteBaseActivity
};


class CRemoteIndexWriteHelper : public CThorIndexWriteArg
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this class actually provided any benefit? It is legal to call createKeyBuilder() with null for the helper.
I suspect it adds complication with no benefit.
I don't think there is currently a way of adding bloom filters without a helper, but that it would be better to add virtuals to allow that, and apply the values directly from a property tree.

Long term that is the direction that disk write is going for many options.

{
UnexpectedVirtualFieldCallback fieldCallback;
Owned<const IDynamicTransform> translator;
std::vector<std::pair<std::string, std::string>> indexMetaData;
public:
CRemoteIndexWriteHelper(const char * _filename, const char* _compression, IOutputMetaData * _inMeta, IOutputMetaData * _outMeta, unsigned _flags)
: filename(_filename), compression(_compression), inMeta(_inMeta), outMeta(_outMeta), flags(_flags)
{
const RtlRecord &inRecord = inMeta->queryRecordAccessor(true);
const RtlRecord &outRecord = outMeta->queryRecordAccessor(true);
translator.setown(createRecordTranslator(outRecord, inRecord));
}

void setIndexMeta(const std::string& name, const std::string& value)
{
indexMetaData.emplace_back(name, value);
}

virtual const char * getFileName() override { return filename.c_str(); }
virtual int getSequence() override { return 0; }
virtual IOutputMetaData * queryDiskRecordSize() override { return outMeta; }
virtual const char * queryRecordECL() override { return nullptr; }
virtual unsigned getFlags() override { return flags; }
virtual unsigned getKeyedSize() override
{
if (outMeta == nullptr)
return 0;

const RtlRecord& recAccessor = outMeta->queryRecordAccessor(true);
return recAccessor.getFixedOffset(recAccessor.getNumKeyedFields());
}
virtual unsigned getMaxKeySize() override { return 0; }
virtual unsigned getFormatCrc() override { return 0; }
virtual unsigned getWidth() override { return indexMetaData.size(); }
virtual const char * queryCompression() override { return compression.c_str(); }
virtual bool getIndexMeta(size32_t & lenName, char * & name, size32_t & lenValue, char * & value, unsigned idx) override
{
if (idx >= indexMetaData.size())
return false;

const auto &entry = indexMetaData[idx];

lenName = entry.first.length();
name = (char*) rtlMalloc(lenName);
memcpy(name, entry.first.c_str(), lenName);

lenValue = entry.second.length();
value = (char*) rtlMalloc(lenValue);
memcpy(value, entry.second.c_str(), lenValue);

return true;
}
virtual size32_t transform(ARowBuilder & rowBuilder, const void * row, IBlobCreator * blobs, unsigned __int64 & filepos) override
{
// Seems like an UnexpectedVirtualFieldCallback could be used but what about blobs?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please create a separate jira for supporting blobs. It will need changes to the translator, including a new virtual in the callback interface.

return translator->translate(rowBuilder, fieldCallback, (const byte *)row);
}

public:
std::string filename;
std::string compression;
IOutputMetaData * inMeta = nullptr;
IOutputMetaData * outMeta = nullptr;
unsigned flags = 0;
};

class CRemoteIndexWriteActivity : public CRemoteWriteBaseActivity, implements IBlobCreator
{
Owned<IFileIOStream> iFileIOStream;
Owned<IKeyBuilder> builder;
Owned<CRemoteIndexWriteHelper> helper;
Linked<IOutputMetaData> inMeta, outMeta;
UnexpectedVirtualFieldCallback fieldCallback;
OwnedMalloc<char> prevRowBuffer;
OwnedMalloc<char> rowBuffer;

uint64_t uncompressedSize = 0;
uint64_t processed = 0;
size32_t maxDiskRecordSize = 0;
size32_t keyedSize = 0;
size32_t maxRecordSizeSeen = 0; // used to store the maximum record size seen, for metadata
bool isTlk = false;
bool opened = false;

inline void processRow(const void *row, uint64_t rowSize)
{
unsigned __int64 fpos = 0;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Setting the fpos correctly here is a bit odd, this would definitely need to come from the incoming record, but an fpos may not always make sense, and because the datasets are often projected it isn't easy to reliable get the fpos of a read dataset.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fpos only applicable if building an index of a base dataset (where fpos' refer to offset in flat file).
Not sure there's any need to support it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately ECL has weird semantics to reuse the fileposition field if the last field in the payload is numeric.

I think this is ok as it is, but the index will only be generally readable if it is defined with the FILEPOSITION(FALSE) attribute (if the last field is a numeric value).

If you want to be able to create all keys then you will need to do some horrible transformations to read the integer value of the last field, put it into the fileposition field.
Again create a separate jira to revisit.

RtlStaticRowBuilder rowBuilder(rowBuffer, maxDiskRecordSize);
size32_t indexRowSize = helper->transform(rowBuilder, row, this, fpos);

// Key builder checks for duplicate records so we can just check for sortedness
if (memcmp(prevRowBuffer.get(), rowBuffer.get(), keyedSize) > 0)
{
throw createDafsExceptionV(DAFSERR_cmdstream_generalwritefailure, "CRemoteIndexWriteActivity: Incoming rows are not sorted.");
}

builder->processKeyData(rowBuffer, fpos, indexRowSize);
uncompressedSize += indexRowSize;

if (indexRowSize > maxRecordSizeSeen)
maxRecordSizeSeen = indexRowSize;

processed++;
memcpy(prevRowBuffer.get(), rowBuffer.get(), maxDiskRecordSize);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

related to earlier comment - this is only validation, should it be in a debug build only, or via a configurable option?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only need to save keyedSize. I suspect benefit of catching invalid input data outweighs the cost.

}

void openFileStream()
{
if (!recursiveCreateDirectoryForFile(fileName))
throw createDafsExceptionV(DAFSERR_cmdstream_openfailure, "Failed to create dirtory for file: '%s'", fileName.get());
Copy link

Copilot AI Jul 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo in error message: 'dirtory' should be 'directory'.

Suggested change
throw createDafsExceptionV(DAFSERR_cmdstream_openfailure, "Failed to create dirtory for file: '%s'", fileName.get());
throw createDafsExceptionV(DAFSERR_cmdstream_openfailure, "Failed to create directory for file: '%s'", fileName.get());

Copilot uses AI. Check for mistakes.
OwnedIFile iFile = createIFile(fileName);

iFileIO.setown(iFile->open(IFOcreate));
if (!iFileIO)
throw createDafsExceptionV(DAFSERR_cmdstream_openfailure, "Failed to open: '%s' for write", fileName.get());

iFileIOStream.setown(createBufferedIOStream(iFileIO));
opened = true;
}

virtual unsigned __int64 createBlob(size32_t size, const void * ptr) override
{
return builder->createBlob(size, (const char *) ptr);
}
public:
CRemoteIndexWriteActivity(IPropertyTree &config, IFileDescriptor *fileDesc) : CRemoteWriteBaseActivity(config, fileDesc)
{
inMeta.setown(getTypeInfoOutputMetaData(config, "input", false));
if (!inMeta)
throw createDafsException(DAFSERR_cmdstream_protocol_failure, "CRemoteIndexWriteActivity: input metadata missing");

outMeta.setown(getTypeInfoOutputMetaData(config, "output", false));
if (!outMeta)
throw createDafsException(DAFSERR_cmdstream_protocol_failure, "CRemoteIndexWriteActivity: output metadata missing");

std::string compression = config.queryProp("compressed", "default");
toLower(compression);

unsigned flags = COL_PREFIX | HTREE_FULLSORT_KEY | USE_TRAILING_HEADER | HTREE_COMPRESSED_KEY | HTREE_QUICK_COMPRESSED_KEY;

if (compression == "default" || compression == "lzw")
{
compression = "";
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a better way to determine the "default" compression format?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this code should probably use translateToCompMethod(compression)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not really - it is only a very small subset of compression types.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Realistically I think you should always set htree_compressed_key and then pass through the compression as is. Row compression is not used outside the regression suite.

I would change the check in keybuild.cpp:

if (!isEmptyString(compression))

to

if (!isEmptyString(compression) && !strsame(compression, "lzw") && !strsame(compression, "default"))

Which will allow lzw to be explicitly defined if we ever change the default.

}
else if (compression.substr(0,7) == "inplace")
{
// pass through the compression string as-is
}

bool isVariable = outMeta->isVariableSize();
if (isVariable)
flags |= HTREE_VARSIZE;

helper.setown(new CRemoteIndexWriteHelper(fileName.get(), compression.c_str(), inMeta, outMeta, flags));

unsigned nodeSize = NODESIZE;
if (config.hasProp("nodeSize"))
{
nodeSize = config.getPropInt("nodeSize");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is an example of some code that is made more complicated by having the helper.

helper->setIndexMeta("_nodeSize", std::to_string(nodeSize));
}

bool noSeek = config.getPropBool("noSeek", true);
helper->setIndexMeta("_noSeek", boolToStr(noSeek));
if (noSeek)
flags |= TRAILING_HEADER_ONLY;

bool useTrailingHeader = config.getPropBool("useTrailingHeader", true);
helper->setIndexMeta("_useTrailingHeader", boolToStr(useTrailingHeader));
if (useTrailingHeader)
flags |= USE_TRAILING_HEADER;
else
flags &= ~USE_TRAILING_HEADER;

if (hasTrailingFileposition(helper->queryDiskRecordSize()->queryTypeInfo()))
throw createDafsException(DAFSERR_cmdstream_protocol_failure, "CRemoteIndexWriteActivity: trailing fileposition not supported, use FILEPOSITION(FALSE)");

if (isVariable)
{
if (helper->getFlags() & TIWmaxlength)
maxDiskRecordSize = helper->getMaxKeySize();
else
maxDiskRecordSize = KEYBUILD_MAXLENGTH; // Current default behaviour, could be improved in the future
}
else
maxDiskRecordSize = helper->queryDiskRecordSize()->getFixedSize();

if (maxDiskRecordSize > KEYBUILD_MAXLENGTH)
throw MakeStringException(99, "Index maximum record length (%d) exceeds 32k internal limit", maxDiskRecordSize);

keyedSize = helper->getKeyedSize();

rowBuffer.allocateN(maxDiskRecordSize, true);
prevRowBuffer.allocateN(maxDiskRecordSize, true);
Copy link

Copilot AI Jul 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The prevRowBuffer is allocated with the full maxDiskRecordSize but only helper->getKeyedSize() bytes are used in the comparison. Consider allocating only the needed size for the keyed portion to reduce memory usage, especially for records with large non-keyed portions.

Suggested change
prevRowBuffer.allocateN(maxDiskRecordSize, true);
prevRowBuffer.allocateN(helper->getKeyedSize(), true);

Copilot uses AI. Check for mistakes.

openFileStream();
builder.setown(createKeyBuilder(iFileIOStream.get(), flags, maxDiskRecordSize, nodeSize, keyedSize, 0, helper.get(), compression.c_str(), true, false));
}

~CRemoteIndexWriteActivity()
{
Copy link

Copilot AI Jul 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The destructor performs complex operations including calling builder->finish() which could potentially throw exceptions. Destructors should not throw exceptions as this can lead to undefined behavior. Consider moving the finish() logic to a separate cleanup method that can be called explicitly before destruction.

Suggested change
{
{
try
{
cleanup();
}
catch (...)
{
// Log the exception or handle it appropriately
// Avoid propagating exceptions from the destructor
}
close();
}
void cleanup()
{

Copilot uses AI. Check for mistakes.
try
{
if (builder != nullptr && helper != nullptr)
{
Owned<IPropertyTree> metadata = createPTree("metadata");
buildUserMetadata(metadata, *helper);

metadata->setProp("_record_ECL", helper->queryRecordECL());
setRtlFormat(*metadata, helper->queryDiskRecordSize());

unsigned int fileCrc;
builder->finish(metadata, &fileCrc, maxRecordSizeSeen, nullptr);
}

close();
}
catch (IException *e)
{
EXCLOG(e, "~CRemoteIndexWriteActivity");
e->Release();
}
catch (...)
{
IERRLOG("~CRemoteIndexWriteActivity: unknown exception");
}
}

virtual void write(size32_t sz, const void *rowData) override
{
const RtlRecord& inputRecordAccessor = inMeta->queryRecordAccessor(true);
size32_t rowOffset = 0;
while(rowOffset < sz)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to handle partial records here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it should be illegal to write partial records to this function. Otherwise you have some notable complications - the call to find the row size needs protecting if the row is partial.
For the moment throw an error if rowOffset > sz

{
size32_t rowSize = inputRecordAccessor.getRecordSize((const byte *)rowData + rowOffset);
processRow((const byte *)rowData + rowOffset, rowSize);
rowOffset += rowSize;
}
if (rowOffset > sz)
throw createDafsExceptionV(DAFSERR_cmdstream_generalwritefailure, "CRemoteIndexWriteActivity: partial record detected (offset=%u, size=%u)", rowOffset, sz);
}

virtual void serializeCursor(MemoryBuffer &tgt) const override {}
virtual void restoreCursor(MemoryBuffer &src) override {}
virtual StringBuffer &getInfoStr(StringBuffer &out) const override
{
return out.appendf("indexwrite[%s]", fileName.get());
}
};

// create a { unsigned8 } output meta for the count
static const RtlIntTypeInfo indexCountFieldType(type_unsigned|type_int, 8);
static const RtlFieldStrInfo indexCountField("count", nullptr, &indexCountFieldType);
Expand Down Expand Up @@ -2924,6 +3176,11 @@ IRemoteActivity *createRemoteActivity(IPropertyTree &actNode, bool authorizedOnl
activity.setown(new CRemoteDiskWriteActivity(actNode, fileDesc));
break;
}
case TAKindexwrite:
{
activity.setown(new CRemoteIndexWriteActivity(actNode, fileDesc));
break;
}
default: // in absense of type, read is assumed and file format is auto-detected.
{
const char *action = actNode.queryProp("action");
Expand Down Expand Up @@ -4938,13 +5195,20 @@ class CRemoteFileServer : implements IRemoteFileServer, public CInterface
* {
* "format" : "binary",
* "command": "newstream"
* "replyLimit" : "64",
* "replylimit" : "64",
* "compressed" : "LZW", // Default, LZW, ROW, INPLACE:*
* "nodeSize" : 32768,
* "noSeek" : false, // if true don't add the header that allows seeking
* "node" : {
* "kind" : "indexwrite",
* "fileName": "examplefilename",
* "filename": "examplefilename",
* "input" : {
* "f1" : "string",
* "f2" : "string"
* },
* "output" : {
* "f1" : "string5",
* "f2" : "string5"
* "f2" : "string5",
* }
* }
* }
Expand Down
3 changes: 2 additions & 1 deletion system/jhtree/keybuild.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "eclhelper.hpp"
#include "bloom.hpp"
#include "jmisc.hpp"
#include "jstring.hpp"
#include "jhinplace.hpp"

struct CRC32HTE
Expand Down Expand Up @@ -220,7 +221,7 @@ class CKeyBuilder : public CInterfaceOf<IKeyBuilder>
compression = _helper->queryCompression();
}

if (!isEmptyString(compression))
if (!isEmptyString(compression) && !strsame(compression, "lzw") && !strsame(compression, "default"))
{
hdr->version = 2; // Old builds will give a reasonable error message
if (strieq(compression, "POC") || startsWithIgnoreCase(compression, "POC:"))
Expand Down
7 changes: 6 additions & 1 deletion system/jlib/jstring.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2699,7 +2699,12 @@ void toLower(std::string & value)
std::transform(value.cbegin(), value.cend(), value.begin(), func);
}


void ltrim(std::string & value)
{
    // Strip leading whitespace in place. Trailing whitespace is deliberately
    // left untouched - this is a left-trim only (see also toLower above).
    std::string::size_type firstNonSpace = 0;
    while (firstNonSpace < value.size() && std::isspace(static_cast<unsigned char>(value[firstNonSpace])))
        ++firstNonSpace;
    value.erase(0, firstNonSpace);
}

StringBuffer & ncnameEscape(char const * in, StringBuffer & out)
{
Expand Down
1 change: 1 addition & 0 deletions system/jlib/jstring.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,7 @@ inline StringBuffer& operator << (StringBuffer& s, const TValue& value)
}

extern jlib_decl void toLower(std::string & value);
extern jlib_decl void ltrim(std::string & value);

extern jlib_decl bool checkUnicodeLiteral(char const * str, unsigned length, unsigned & ep, StringBuffer & msg);
extern jlib_decl void decodeCppEscapeSequence(StringBuffer & out, const char * in, bool errorIfInvalid);
Expand Down
Loading