Skip to content

Commit 5cf44ec

Browse files
committed
Merge remote-tracking branch 'origin/candidate-10.2.x'
Signed-off-by: Gordon Smith <GordonJSmith@gmail.com> # Conflicts: # helm/hpcc/Chart.yaml # helm/hpcc/templates/_helpers.tpl # version.cmake
2 parents a896d7f + a48da44 commit 5cf44ec

29 files changed

+703
-625
lines changed

common/eventconsumption/README.md

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -320,9 +320,13 @@ If algorithm estimation is enabled, the value is replaced. If the node will not
320320

321321

322322

323-
### CMetaInfoState
323+
### CMetaInfoState::CCollector
324324

325-
A concrete implementation of `IEventVisitationLink` that observes all `FileInformation` and `QueryStart` events. The relationships between `FileId` and `Path` attributes, and also `EventTraceId` and `ServiceName` attributes, are retained and made available to operations and other event visitors. When visited before a filter, the state information is available to all operations and visitors even if the filter suppresses the observed events.
325+
`CMetaInfoState` provides a runtime repository of event-defined state information, including values from the `FileInformation` and `QueryStart` events. As a concrete implementation of `IEventVisitationLink`, `CCollector` is a companion class that extracts event data for the repository.
326+
327+
To avoid data loss, this link must precede all other links. It is the responsibility of event iterating code to ensure this requirement is satisfied.
328+
329+
The repository is available to all operations and all other visitors. The collector itself is not.
326330

327331
# CEventVisitationLinkTester
328332

common/eventconsumption/eventindexhotspot.cpp

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -138,13 +138,12 @@ class CHotspotEventVisitor : public CInterfaceOf<IEventVisitor>
138138
if (event.hasAttribute(EvAttrFileId))
139139
{
140140
EventType type = event.queryType();
141-
if (type != MetaFileInformation && observedEvents.count(type) == 0)
141+
if (type == MetaFileInformation || observedEvents.count(type) == 0)
142142
return true;
143143
__uint64 fileId = event.queryNumericValue(EvAttrFileId);
144144
auto [it, inserted] = activity.try_emplace(fileId, *this, fileId);
145145
NodeKind nodeKind = queryIndexNodeKind(event);
146-
if (observedEvents.count(type))
147-
it->second.recordEvent(event.queryNumericValue(EvAttrFileOffset), nodeKind);
146+
it->second.recordEvent(event.queryNumericValue(EvAttrFileOffset), nodeKind);
148147
}
149148
return true;
150149
}

common/eventconsumption/eventindexhotspotbuckets.cpp

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,12 @@ class CAllBucketVisitor : public CStreamingBucketVisitor
148148
lines.append(" ").append(mapNodeKind(nodeKind)).append('\n');
149149
firstOfKind[nodeKind] = false;
150150
}
151-
lines.append(" - first-node: ").append(bucket2page(bucket, currentGranularity)).append('\n');
151+
152+
auto firstNode = bucket2page(bucket, currentGranularity);
153+
auto lastNode = bucket2page(bucket + 1, currentGranularity) - 1;
154+
155+
lines.append(" - node-range: ").append(firstNode).append('-').append(lastNode).append('\n');
156+
lines.append(" offset-range: ").append(page2Offset(firstNode, defaultPageBits)).append('-').append(page2Offset(lastNode + 1, defaultPageBits) - 1).append('\n');
152157
lines.append(" events: ").append(stat).append('\n');
153158
out->put(lines.length(), lines.str());
154159
}
@@ -281,7 +286,11 @@ class CTopBucketVisitor : public CStreamingBucketVisitor
281286
lines.append(" bucket:\n");
282287
for (const Hit& hit : info.hits)
283288
{
284-
lines.append(" - first-node: ").append(bucket2page(hit.second, currentGranularity)).append('\n');
289+
auto firstNode = bucket2page(hit.second, currentGranularity);
290+
auto lastNode = bucket2page(hit.second + 1, currentGranularity) - 1;
291+
292+
lines.append(" - node-range: ").append(firstNode).append('-').append(lastNode).append('\n');
293+
lines.append(" offset-range: ").append(page2Offset(firstNode, defaultPageBits)).append('-').append(page2Offset(lastNode + 1, defaultPageBits) - 1).append('\n');
285294
lines.append(" events: ").append(hit.first).append('\n');
286295
}
287296
if (info.droppedCount)

common/eventconsumption/eventmetaparser.cpp

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,11 @@
1818
#include "eventmetaparser.hpp"
1919
#include "jevent.hpp"
2020

21-
void CMetaInfoState::configure(const IPropertyTree& config)
21+
void CMetaInfoState::CCollector::configure(const IPropertyTree& config)
2222
{
2323
}
2424

25-
bool CMetaInfoState::visitEvent(CEvent& event)
25+
bool CMetaInfoState::CCollector::visitEvent(CEvent& event)
2626
{
2727
switch (event.queryType())
2828
{
@@ -32,8 +32,8 @@ bool CMetaInfoState::visitEvent(CEvent& event)
3232
const char* path = event.queryTextValue(EvAttrPath);
3333
if (!isEmptyString(path))
3434
{
35-
fileIdToPath[fileId].set(path);
36-
pathToFileId[path] = fileId;
35+
metaState->fileIdToPath[fileId].set(path);
36+
metaState->pathToFileId[path] = fileId;
3737
}
3838
}
3939
break;
@@ -44,7 +44,7 @@ bool CMetaInfoState::visitEvent(CEvent& event)
4444
const char* traceId = event.queryTextValue(EvAttrEventTraceId);
4545
const char* serviceName = event.queryTextValue(EvAttrServiceName);
4646
if (!isEmptyString(traceId) && !isEmptyString(serviceName))
47-
traceIdToService.emplace(traceId, serviceName);
47+
metaState->traceIdToService.emplace(traceId, serviceName);
4848
}
4949
}
5050
break;
@@ -56,6 +56,16 @@ bool CMetaInfoState::visitEvent(CEvent& event)
5656
return true;
5757
}
5858

59+
CMetaInfoState::CCollector::CCollector(CMetaInfoState& _metaState)
60+
: metaState(&_metaState)
61+
{
62+
}
63+
64+
IEventVisitationLink* CMetaInfoState::getCollector()
65+
{
66+
return new CCollector(*this);
67+
}
68+
5969
const char* CMetaInfoState::queryFilePath(__uint64 fileId) const
6070
{
6171
auto it = fileIdToPath.find(fileId);

common/eventconsumption/eventmetaparser.hpp

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -24,17 +24,23 @@
2424
#include <string>
2525

2626
// Visitor that parses and caches file ID to path mappings and trace ID to service name mappings
27-
class event_decl CMetaInfoState : public CInterfaceOf<IEventVisitationLink>
27+
class event_decl CMetaInfoState : public CInterface
2828
{
29-
public:
30-
CMetaInfoState() = default;
31-
~CMetaInfoState() = default;
32-
33-
// IEventVisitationLink interface
34-
IMPLEMENT_IEVENTVISITATIONLINK
35-
virtual void configure(const IPropertyTree& config) override;
36-
virtual bool visitEvent(CEvent& event) override;
29+
class event_decl CCollector : public CInterfaceOf<IEventVisitationLink>
30+
{
31+
public: // IEventVisitor
32+
virtual bool visitEvent(CEvent& event) override;
33+
public: // IEventVisitationLink
34+
IMPLEMENT_IEVENTVISITATIONLINK
35+
virtual void configure(const IPropertyTree& config) override;
36+
public:
37+
CCollector(CMetaInfoState& _metaState);
38+
private:
39+
Linked<CMetaInfoState> metaState;
40+
};
3741

42+
public:
43+
IEventVisitationLink* getCollector();
3844
// Accessor functions for file ID to path mappings
3945
const char* queryFilePath(__uint64 fileId) const;
4046
bool hasFileMapping(__uint64 fileId) const;
@@ -58,4 +64,4 @@ class event_decl CMetaInfoState : public CInterfaceOf<IEventVisitationLink>
5864

5965
// Trace ID to service name mappings (from EventQueryStart events)
6066
std::unordered_map<std::string, std::string> traceIdToService;
61-
};
67+
};

common/eventconsumption/eventoperation.cpp

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -77,10 +77,8 @@ bool CEventConsumingOp::traverseEvents(const char* path, IEventVisitor& visitor)
7777
model->setNextLink(*actual);
7878
actual = model;
7979
}
80+
Owned<IEventVisitationLink> metaCollector = metaState->getCollector();
81+
metaCollector->setNextLink(*actual);
8082

81-
// Always include the meta information parser as the first link in the chain
82-
// to ensure MetaFileInformation and EventQueryStart events are captured
83-
metaState->setNextLink(*actual);
84-
85-
return readEvents(path, *metaState);
83+
return readEvents(path, *metaCollector);
8684
}

common/eventconsumption/eventoperation.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ class event_decl CEventConsumingOp : public CInterface
4747
Owned<CMetaInfoState> metaState;
4848
StringAttr inputPath;
4949
Linked<IBufferedSerialOutputStream> out;
50+
private:
5051
Owned<IEventFilter> filter;
5152
Owned<IEventModel> model;
5253
};

common/eventconsumption/eventunittests.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -186,8 +186,9 @@ void testEventVisitationLinks(const IPropertyTree& inputTree, const IPropertyTre
186186
}
187187

188188
// Always include the meta information parser as the first link in the chain
189-
metaState->setNextLink(*currentVisitor);
190-
visitIterableEvents(*input, *metaState);
189+
Owned<IEventVisitationLink> metaCollector = metaState->getCollector();
190+
metaCollector->setNextLink(*currentVisitor);
191+
visitIterableEvents(*input, *metaCollector);
191192
return true;
192193
}
193194

dali/ft/daftformat.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,9 @@ void CPartitioner::commonCalcPartitions()
143143
//Don't add an empty block on the start of the this chunk to transfer.
144144
if ((split != firstSplit) || (inputOffset != startInputOffset))
145145
{
146-
results.append(*new PartitionPoint(whichInput, split-1, startInputOffset-thisOffset+thisHeaderSize, inputOffset - startInputOffset - cursor.trimLength, cursor.outputOffset-startOutputOffset));
146+
offset_t startInputOffsetWithTrim = startInputOffset + cursor.trimLength;
147+
assertex(startInputOffsetWithTrim <= inputOffset); // guard against length underflow
148+
results.append(*new PartitionPoint(whichInput, split-1, startInputOffset-thisOffset+thisHeaderSize, inputOffset - startInputOffsetWithTrim, cursor.outputOffset-startOutputOffset));
147149
JSON_DBGLOG("commonCalcPartitions: startInputOffset:%lld, inputOffset: %lld, startOutputOffset: %lld, cursor.outputOffset: %lld",startInputOffset ,inputOffset, startOutputOffset, cursor.outputOffset);
148150
startInputOffset = inputOffset;
149151
startOutputOffset = cursor.outputOffset;

dali/ft/daftformat.ipp

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -563,11 +563,13 @@ protected:
563563
JSON_DBGLOG("CJsonInputPartitioner::findSplitPoint: splitOffset %lld", splitOffset);
564564
JSON_DBGLOG("CJsonInputPartitioner::findSplitPoint: cursor(inputOffset: %lld, nextInputOffset: %lld, outputOffset: %lld, trimLength: %lld",
565565
cursor.inputOffset, cursor.nextInputOffset, cursor.outputOffset, cursor.trimLength);
566-
cursor.inputOffset = 0;
567566
if (!splitOffset) //header + 0 is first offset
567+
{
568+
cursor.inputOffset = 0;
568569
return;
570+
}
569571

570-
if (eof)
572+
if (eof) // leave cursor.inputOffset untouched
571573
return;
572574

573575
// To prevent the splitOffset points into an already processed file area
@@ -579,15 +581,12 @@ protected:
579581
bool foundRowEnd = json->findRowEnd(splitOffset-thisOffset + thisHeaderSize, prevRowEnd);
580582
JSON_DBGLOG("CJsonInputPartitioner::findSplitPoint: thisOffset: %lld, thisHeaderSize: %u, prevRowEnd: %lld, foundRowEnd:%s ", thisOffset, thisHeaderSize, prevRowEnd, (foundRowEnd ? "True" : "False"));
581583
if (! foundRowEnd) //false return just means we're processing the end
582-
{
583-
//cursor.inputOffset = prevRowEnd;
584-
return;
585-
}
584+
return; // leave cursor.inputOffset untouched
586585

587586
bool checkFoundRowStartRes = json->checkFoundRowStart();
588587
JSON_DBGLOG("CJsonInputPartitioner::findSplitPoint: checkFoundRowStartRes:%s)", (checkFoundRowStartRes ? "True" : "False"));
589588
if (!checkFoundRowStartRes)
590-
return;
589+
return; // leave cursor.inputOffset untouched
591590

592591
bool isNewRowSet = json->newRowSet;
593592
JSON_DBGLOG("CJsonInputPartitioner::findSplitPoint: isNewRowSet:%s", (isNewRowSet ? "True" : "False"));

0 commit comments

Comments
 (0)