Skip to content

Commit ace9783

Browse files
Huameng (Michael) Jiang and meta-codesync[bot]
authored and committed
collect physical stats through aggregation at encode time (facebookincubator#446)
Summary: Pull Request resolved: facebookincubator#446. Builds the mapping from accumulated streams to the correct aggregation node id, and ensures physical stats are aggregated to those nodes after the streams are encoded. Replaces the old behavior in order to use less space and to be compatible with the current in-memory stats builders/collectors. Reviewed By: helfman. Differential Revision: D90174063. fbshipit-source-id: 8211309c26bd3b6b6282e50a624afc0fddc968c5
1 parent f36586d commit ace9783

File tree

5 files changed

+161
-82
lines changed

5 files changed

+161
-82
lines changed

dwio/nimble/encodings/EncodingSelectionPolicy.h

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -614,12 +614,11 @@ class ReplayedEncodingSelectionPolicy
614614
CYAN << "Replaying encoding " << encodingLayout_.encodingType());
615615
return {
616616
.encodingType = encodingLayout_.encodingType(),
617-
.compressionPolicyFactory =
618-
[this]() {
619-
return std::make_unique<ReplayedCompressionPolicy>(
620-
encodingLayout_.compressionType(), compressionOptions_);
621-
},
622-
.encodingConfig = encodingLayout_.config()};
617+
.encodingConfig = encodingLayout_.config(),
618+
.compressionPolicyFactory = [this]() {
619+
return std::make_unique<ReplayedCompressionPolicy>(
620+
encodingLayout_.compressionType(), compressionOptions_);
621+
}};
623622
}
624623

625624
EncodingSelectionResult selectNullable(

dwio/nimble/velox/FieldWriter.cpp

Lines changed: 38 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -340,7 +340,8 @@ class SimpleFieldWriter : public FieldWriter {
340340
context.schemaBuilder().createScalarTypeBuilder(
341341
NimbleTypeTraits<K>::scalarKind)),
342342
valuesStream_{context.createNullableContentStreamData<TargetType>(
343-
typeBuilder_->asScalar().scalarDescriptor())},
343+
typeBuilder_->asScalar().scalarDescriptor(),
344+
nodeId)},
344345
statisticsCollector_{context.getStatsCollector(nodeId)} {}
345346

346347
void write(
@@ -486,7 +487,8 @@ class StringFieldWriter : public FieldWriter {
486487
context.schemaBuilder().createScalarTypeBuilder(
487488
NimbleTypeTraits<K>::scalarKind)),
488489
valuesStream_{context.createNullableContentStringStreamData(
489-
typeBuilder_->asScalar().scalarDescriptor())},
490+
typeBuilder_->asScalar().scalarDescriptor(),
491+
nodeId)},
490492
statisticsCollector_{context.getStatsCollector(nodeId)} {
491493
static_assert(
492494
K == velox::TypeKind::VARCHAR || K == velox::TypeKind::VARBINARY,
@@ -597,9 +599,11 @@ class TimestampFieldWriter : public FieldWriter {
597599
explicit TimestampFieldWriter(FieldWriterContext& context, uint32_t nodeId)
598600
: FieldWriter{context, context.schemaBuilder().createTimestampMicroNanoTypeBuilder()},
599601
microsStream_{context.createNullableContentStreamData<int64_t>(
600-
typeBuilder_->asTimestampMicroNano().microsDescriptor())},
602+
typeBuilder_->asTimestampMicroNano().microsDescriptor(),
603+
nodeId)},
601604
nanosStream_{context.createContentStreamData<uint16_t>(
602-
typeBuilder_->asTimestampMicroNano().nanosDescriptor())},
605+
typeBuilder_->asTimestampMicroNano().nanosDescriptor(),
606+
nodeId)},
603607
statisticsCollector_{context.getStatsCollector(nodeId)} {}
604608

605609
void write(
@@ -690,7 +694,8 @@ class RowFieldWriter : public FieldWriter {
690694
const std::shared_ptr<const velox::dwio::common::TypeWithId>& type)
691695
: FieldWriter{context, context.schemaBuilder().createRowTypeBuilder(type->size())},
692696
nullsStream_{context_.createNullsStreamData(
693-
typeBuilder_->asRow().nullsDescriptor())},
697+
typeBuilder_->asRow().nullsDescriptor(),
698+
type->id())},
694699
statisticsCollector_{context.getStatsCollector(type->id())},
695700
ignoreNulls_{type->id() == 0 && context.ignoreTopLevelNulls()} {
696701
auto rowType =
@@ -815,8 +820,8 @@ class MultiValueFieldWriter : public FieldWriter {
815820
std::shared_ptr<LengthsTypeBuilder> typeBuilder)
816821
: FieldWriter{context, std::move(typeBuilder)},
817822
lengthsStream_{context.createNullableContentStreamData<uint32_t>(
818-
static_cast<LengthsTypeBuilder&>(*typeBuilder_)
819-
.lengthsDescriptor())},
823+
static_cast<LengthsTypeBuilder&>(*typeBuilder_).lengthsDescriptor(),
824+
type->id())},
820825
statisticsCollector_{context.getStatsCollector(type->id())} {}
821826

822827
void reset() override {
@@ -981,9 +986,11 @@ class SlidingWindowMapFieldWriter : public FieldWriter {
981986
const std::shared_ptr<const velox::dwio::common::TypeWithId>& type)
982987
: FieldWriter{context, context.schemaBuilder().createSlidingWindowMapTypeBuilder()},
983988
offsetsStream_{context.createNullableContentStreamData<uint32_t>(
984-
typeBuilder_->asSlidingWindowMap().offsetsDescriptor())},
989+
typeBuilder_->asSlidingWindowMap().offsetsDescriptor(),
990+
type->id())},
985991
lengthsStream_{context.createContentStreamData<uint32_t>(
986-
typeBuilder_->asSlidingWindowMap().lengthsDescriptor())},
992+
typeBuilder_->asSlidingWindowMap().lengthsDescriptor(),
993+
type->id())},
987994
currentOffset_(0),
988995
cached_{false},
989996
cachedLength_{0},
@@ -1161,9 +1168,11 @@ class FlatMapPassthroughValueFieldWriter {
11611168
FlatMapPassthroughValueFieldWriter(
11621169
FieldWriterContext& context,
11631170
const StreamDescriptorBuilder& inMapDescriptor,
1164-
std::unique_ptr<FieldWriter> valueField)
1171+
std::unique_ptr<FieldWriter> valueField,
1172+
uint32_t nodeId)
11651173
: valueField_{std::move(valueField)},
1166-
inMapStream_{context.createContentStreamData<bool>(inMapDescriptor)} {}
1174+
inMapStream_{
1175+
context.createContentStreamData<bool>(inMapDescriptor, nodeId)} {}
11671176

11681177
// Write without an explicit inMaps buffer; assume all inMap bits are set.
11691178
void write(const velox::VectorPtr& vector, const OrderedRanges& ranges) {
@@ -1225,9 +1234,11 @@ class FlatMapValueFieldWriter {
12251234
FlatMapValueFieldWriter(
12261235
FieldWriterContext& context,
12271236
const StreamDescriptorBuilder& inMapDescriptor,
1228-
std::unique_ptr<FieldWriter> valueField)
1237+
std::unique_ptr<FieldWriter> valueField,
1238+
uint32_t nodeId)
12291239
: valueField_{std::move(valueField)},
1230-
inMapStream_{context.createContentStreamData<bool>(inMapDescriptor)} {}
1240+
inMapStream_{
1241+
context.createContentStreamData<bool>(inMapDescriptor, nodeId)} {}
12311242

12321243
// Clear the ranges and extend the inMapBuffer
12331244
void prepare(uint32_t numValues) {
@@ -1312,7 +1323,8 @@ class FlatMapFieldWriter : public FieldWriter {
13121323
valueType_{type->childAt(1)},
13131324
nodeId_{type->id()},
13141325
nullsStream_{context_.createNullsStreamData(
1315-
typeBuilder_->asFlatMap().nullsDescriptor())} {
1326+
typeBuilder_->asFlatMap().nullsDescriptor(),
1327+
type->id())} {
13161328
auto statsBuilder = context.getStatsCollector(type->id());
13171329
// Sanity check that the stats builders are shared and thread safe.
13181330
NIMBLE_CHECK(statsBuilder->isShared());
@@ -1371,7 +1383,10 @@ class FlatMapFieldWriter : public FieldWriter {
13711383
.insert(
13721384
{key,
13731385
std::make_unique<FlatMapPassthroughValueFieldWriter>(
1374-
context_, inMapDescriptor, std::move(fieldWriter))})
1386+
context_,
1387+
inMapDescriptor,
1388+
std::move(fieldWriter),
1389+
nodeId_)})
13751390
.first;
13761391
return *it->second;
13771392
}
@@ -1386,12 +1401,15 @@ class FlatMapFieldWriter : public FieldWriter {
13861401
}
13871402

13881403
void collectStatistics(uint64_t nullCount, uint64_t valueCount) {
1404+
LOG(INFO) << "Collecting statistics for flatmap";
13891405
if (!statisticsCollector_) {
13901406
return;
13911407
}
13921408

13931409
statisticsCollector_->addCounts(valueCount, nullCount);
13941410
statisticsCollector_->addLogicalSize(nullCount);
1411+
LOG(INFO) << "New flatmap logical size: "
1412+
<< statisticsCollector_->getLogicalSize();
13951413
}
13961414

13971415
// Collects key statistics for flatmap with string keys (VARCHAR/VARBINARY).
@@ -1820,7 +1838,7 @@ class FlatMapFieldWriter : public FieldWriter {
18201838
context_.handleFlatmapFieldAddEvent(
18211839
*typeBuilder_, stringKey, *valueFieldWriter->typeBuilder());
18221840
auto flatMapValueField = std::make_unique<FlatMapValueFieldWriter>(
1823-
context_, inMapDescriptor, std::move(valueFieldWriter));
1841+
context_, inMapDescriptor, std::move(valueFieldWriter), nodeId_);
18241842
flatFieldIt =
18251843
allValueFields_.emplace(key, std::move(flatMapValueField)).first;
18261844
}
@@ -1903,9 +1921,11 @@ class ArrayWithOffsetsFieldWriter : public FieldWriter {
19031921
const std::shared_ptr<const velox::dwio::common::TypeWithId>& type)
19041922
: FieldWriter{context, context.schemaBuilder().createArrayWithOffsetsTypeBuilder()},
19051923
offsetsStream_{context.createNullableContentStreamData<uint32_t>(
1906-
typeBuilder_->asArrayWithOffsets().offsetsDescriptor())},
1924+
typeBuilder_->asArrayWithOffsets().offsetsDescriptor(),
1925+
type->id())},
19071926
lengthsStream_{context.createContentStreamData<uint32_t>(
1908-
typeBuilder_->asArrayWithOffsets().lengthsDescriptor())},
1927+
typeBuilder_->asArrayWithOffsets().lengthsDescriptor(),
1928+
type->id())},
19091929
cached_(false),
19101930
cachedValue_(nullptr),
19111931
cachedSize_(0),

dwio/nimble/velox/FieldWriter.h

Lines changed: 41 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -285,43 +285,63 @@ class FieldWriterContext {
285285
buffer_ = std::make_unique<Buffer>(*bufferMemoryPool_);
286286
}
287287

288-
inline const std::vector<std::unique_ptr<StreamData>>& streams() {
288+
inline const std::vector<std::pair<uint32_t, std::unique_ptr<StreamData>>>&
289+
streams() {
289290
return streams_;
290291
}
291292

292293
inline NullsStreamData& createNullsStreamData(
293-
const StreamDescriptorBuilder& descriptor) {
294-
return static_cast<NullsStreamData&>(*streams_.emplace_back(
295-
std::make_unique<NullsStreamData>(
296-
*bufferMemoryPool_, descriptor, *inputBufferGrowthPolicy_)));
294+
const StreamDescriptorBuilder& descriptor,
295+
uint32_t nodeId) {
296+
return static_cast<NullsStreamData&>(
297+
*streams_
298+
.emplace_back(
299+
nodeId,
300+
std::make_unique<NullsStreamData>(
301+
*bufferMemoryPool_, descriptor, *inputBufferGrowthPolicy_))
302+
.second);
297303
}
298304

299305
template <typename T>
300306
ContentStreamData<T>& createContentStreamData(
301-
const StreamDescriptorBuilder& descriptor) {
302-
return static_cast<ContentStreamData<T>&>(*streams_.emplace_back(
303-
std::make_unique<ContentStreamData<T>>(
304-
*bufferMemoryPool_, descriptor, *inputBufferGrowthPolicy_)));
307+
const StreamDescriptorBuilder& descriptor,
308+
uint32_t nodeId) {
309+
return static_cast<ContentStreamData<T>&>(
310+
*streams_
311+
.emplace_back(
312+
nodeId,
313+
std::make_unique<ContentStreamData<T>>(
314+
*bufferMemoryPool_, descriptor, *inputBufferGrowthPolicy_))
315+
.second);
305316
}
306317

307318
template <typename T>
308319
NullableContentStreamData<T>& createNullableContentStreamData(
309-
const StreamDescriptorBuilder& descriptor) {
310-
return static_cast<NullableContentStreamData<T>&>(*streams_.emplace_back(
311-
std::make_unique<NullableContentStreamData<T>>(
312-
*bufferMemoryPool_, descriptor, *inputBufferGrowthPolicy_)));
320+
const StreamDescriptorBuilder& descriptor,
321+
uint32_t nodeId) {
322+
return static_cast<NullableContentStreamData<T>&>(
323+
*streams_
324+
.emplace_back(
325+
nodeId,
326+
std::make_unique<NullableContentStreamData<T>>(
327+
*bufferMemoryPool_, descriptor, *inputBufferGrowthPolicy_))
328+
.second);
313329
}
314330

315331
nimble::NullableContentStringStreamData&
316332
createNullableContentStringStreamData(
317-
const nimble::StreamDescriptorBuilder& descriptor) {
333+
const nimble::StreamDescriptorBuilder& descriptor,
334+
uint32_t nodeId) {
318335
return static_cast<nimble::NullableContentStringStreamData&>(
319-
*streams_.emplace_back(
320-
std::make_unique<nimble::NullableContentStringStreamData>(
321-
*bufferMemoryPool_,
322-
descriptor,
323-
*inputBufferGrowthPolicy_,
324-
*stringBufferGrowthPolicy_)));
336+
*streams_
337+
.emplace_back(
338+
nodeId,
339+
std::make_unique<nimble::NullableContentStringStreamData>(
340+
*bufferMemoryPool_,
341+
descriptor,
342+
*inputBufferGrowthPolicy_,
343+
*stringBufferGrowthPolicy_))
344+
.second);
325345
}
326346

327347
inline bool disableSharedStringBuffers() const {
@@ -527,7 +547,7 @@ class FieldWriterContext {
527547

528548
std::unique_ptr<Buffer> buffer_;
529549
DecodingContextPool decodingContextPool_;
530-
std::vector<std::unique_ptr<StreamData>> streams_;
550+
std::vector<std::pair<uint32_t, std::unique_ptr<StreamData>>> streams_;
531551
std::shared_ptr<const velox::dwio::common::TypeWithId> schemaWithId_;
532552
std::vector<std::unique_ptr<StatisticsCollector>> statsCollectors_;
533553
bool statsFinalized_{false};

dwio/nimble/velox/VeloxWriter.cpp

Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -734,7 +734,7 @@ bool VeloxWriter::write(const velox::VectorPtr& input) {
734734
}
735735

736736
uint64_t memoryUsed{0};
737-
for (const auto& stream : context_->streams()) {
737+
for (const auto& [_, stream] : context_->streams()) {
738738
memoryUsed += stream->memoryUsed();
739739
}
740740

@@ -933,12 +933,13 @@ void VeloxWriter::writeStreams() {
933933
if (context_->options().encodingExecutor) {
934934
velox::dwio::common::ExecutorBarrier barrier{
935935
context_->options().encodingExecutor};
936-
// TODO: make context_->streams() vector<pair<nodeId, StreamData>>
937-
// Then get stats builder for the node id and pass it into the callback.
938-
for (auto& streamData : context_->streams()) {
939-
barrier.add([&, _streamData = streamData.get()]() {
940-
uint64_t streamSize = 0;
936+
for (auto& [nodeId, streamData] : context_->streams()) {
937+
barrier.add([&,
938+
statsCollector = context_->getStatsCollector(nodeId),
939+
_streamData = streamData.get()]() {
940+
uint64_t streamSize{0};
941941
processStream(*_streamData, streamSize, chunkSize);
942+
statsCollector->addPhysicalSize(streamSize);
942943
});
943944
}
944945

@@ -947,9 +948,11 @@ void VeloxWriter::writeStreams() {
947948
barrier.waitAll();
948949
} else {
949950
const auto& streams = context_->streams();
950-
for (auto& streamData : streams) {
951-
uint64_t streamSize = 0;
951+
for (auto& [nodeId, streamData] : streams) {
952+
auto statsCollector = context_->getStatsCollector(nodeId);
953+
uint64_t streamSize{0};
952954
processStream(*streamData, streamSize, chunkSize);
955+
statsCollector->addPhysicalSize(streamSize);
953956
}
954957

955958
// Handle keyStream if index is enabled
@@ -1124,10 +1127,12 @@ bool VeloxWriter::writeChunks(
11241127
velox::dwio::common::ExecutorBarrier barrier{
11251128
context_->options().encodingExecutor};
11261129
for (auto streamIndex : streamIndices) {
1127-
auto& streamData = streams[streamIndex];
1130+
auto& [nodeId, streamData] = streams[streamIndex];
11281131
const auto offset = streamData->descriptor().offset();
11291132
auto& encodeStream = encodedStreams_[offset];
1130-
barrier.add([&] {
1133+
barrier.add([&,
1134+
streamData = streamData.get(),
1135+
statsCollector = context_->getStatsCollector(nodeId)] {
11311136
uint64_t streamSize = 0;
11321137
if (encodeStreamChunk(
11331138
*streamData,
@@ -1140,6 +1145,7 @@ bool VeloxWriter::writeChunks(
11401145
logicalBytes)) {
11411146
writtenChunk = true;
11421147
}
1148+
statsCollector->addPhysicalSize(streamSize);
11431149
});
11441150
}
11451151

@@ -1148,9 +1154,10 @@ bool VeloxWriter::writeChunks(
11481154
barrier.waitAll();
11491155
} else {
11501156
for (auto streamIndex : streamIndices) {
1151-
auto* streamData = streams[streamIndex].get();
1152-
uint64_t streamSize = 0;
1157+
auto& [nodeId, streamData] = streams[streamIndex];
11531158
const auto offset = streamData->descriptor().offset();
1159+
uint64_t streamSize = 0;
1160+
auto statsCollector = context_->getStatsCollector(nodeId);
11541161
if (encodeStreamChunk(
11551162
*streamData,
11561163
minChunkSize,
@@ -1162,6 +1169,7 @@ bool VeloxWriter::writeChunks(
11621169
logicalBytes)) {
11631170
writtenChunk = true;
11641171
}
1172+
statsCollector->addPhysicalSize(streamSize);
11651173
}
11661174

11671175
maybeEncodeKeyStreamChunk(lastChunk, ensureFullChunks);
@@ -1287,7 +1295,7 @@ bool VeloxWriter::evaluateFlushPolicy() {
12871295
? options.wideSchemaMaxStreamChunkRawSize
12881296
: options.maxStreamChunkRawSize;
12891297
for (auto streamIndex = 0; streamIndex < streams.size(); ++streamIndex) {
1290-
if (streams[streamIndex]->memoryUsed() >= maxChunkSize) {
1298+
if (streams[streamIndex].second->memoryUsed() >= maxChunkSize) {
12911299
streamIndices.push_back(streamIndex);
12921300
}
12931301
}
@@ -1309,7 +1317,8 @@ bool VeloxWriter::evaluateFlushPolicy() {
13091317
streamIndices.begin(),
13101318
streamIndices.end(),
13111319
[&](const uint32_t& a, const uint32_t& b) {
1312-
return streams[a]->memoryUsed() > streams[b]->memoryUsed();
1320+
return streams[a].second->memoryUsed() >
1321+
streams[b].second->memoryUsed();
13131322
});
13141323
flushChunks(streamIndices, /*ensureFullChunks=*/false, flushPolicy.get());
13151324
}

0 commit comments

Comments
 (0)