Skip to content

Commit f4cdc0a

Browse files
hdikemanfacebook-github-bot
authored andcommitted
feat: Add tableParameters to HiveInsertTableHandle
Summary: Add a `tableParameters` field to `HiveInsertTableHandle` to carry table-level metadata from the coordinator to the native worker write path, following the existing `serdeParameters` pattern. - Add `tableParameters` constructor parameter, private member, and accessor to `HiveInsertTableHandle` serde changes. - Add `tableParameters` to `FileSink::Options`. - Thread through `createHiveFileSink` and both call sites. - Update all downstream callers and add serde test coverage. Differential Revision: D94986580
1 parent 2042c2a commit f4cdc0a

File tree

8 files changed

+57
-13
lines changed

8 files changed

+57
-13
lines changed

velox/connectors/hive/HiveDataSink.cpp

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,8 @@ std::unique_ptr<dwio::common::FileSink> createHiveFileSink(
6969
const std::shared_ptr<const HiveConfig>& hiveConfig,
7070
memory::MemoryPool* sinkPool,
7171
io::IoStatistics* ioStats,
72-
IoStats* fileSystemStats) {
72+
IoStats* fileSystemStats,
73+
const std::unordered_map<std::string, std::string>& tableParameters) {
7374
return dwio::common::FileSink::create(
7475
path,
7576
{
@@ -80,6 +81,7 @@ std::unique_ptr<dwio::common::FileSink> createHiveFileSink(
8081
.metricLogger = dwio::common::MetricsLog::voidLog(),
8182
.stats = ioStats,
8283
.fileSystemStats = fileSystemStats,
84+
.tableParameters = tableParameters,
8385
});
8486
}
8587

@@ -429,7 +431,8 @@ HiveInsertTableHandle::HiveInsertTableHandle(
429431
// if there's no data. This is useful when the table is bucketed, but the
430432
// engine handles ensuring a 1 to 1 mapping from task to bucket.
431433
const bool ensureFiles,
432-
std::shared_ptr<const FileNameGenerator> fileNameGenerator)
434+
std::shared_ptr<const FileNameGenerator> fileNameGenerator,
435+
const std::unordered_map<std::string, std::string>& tableParameters)
433436
: inputColumns_(std::move(inputColumns)),
434437
locationHandle_(std::move(locationHandle)),
435438
storageFormat_(storageFormat),
@@ -439,6 +442,7 @@ HiveInsertTableHandle::HiveInsertTableHandle(
439442
writerOptions_(writerOptions),
440443
ensureFiles_(ensureFiles),
441444
fileNameGenerator_(std::move(fileNameGenerator)),
445+
tableParameters_(tableParameters),
442446
partitionChannels_(computePartitionChannels(inputColumns_)),
443447
nonPartitionChannels_(computeNonPartitionChannels(inputColumns_)) {
444448
if (compressionKind.has_value()) {
@@ -1076,7 +1080,8 @@ std::unique_ptr<dwio::common::Writer> HiveDataSink::createWriterForIndex(
10761080
hiveConfig_,
10771081
info->sinkPool.get(),
10781082
ioStats_[writerIndex].get(),
1079-
fileSystemStats_.get()),
1083+
fileSystemStats_.get(),
1084+
insertTableHandle_->tableParameters()),
10801085
options);
10811086
return maybeCreateBucketSortWriter(writerIndex, std::move(writer));
10821087
}
@@ -1358,6 +1363,13 @@ folly::dynamic HiveInsertTableHandle::serialize() const {
13581363
params[key] = value;
13591364
}
13601365
obj["serdeParameters"] = params;
1366+
1367+
folly::dynamic tableParams = folly::dynamic::object;
1368+
for (const auto& [key, value] : tableParameters_) {
1369+
tableParams[key] = value;
1370+
}
1371+
obj["tableParameters"] = tableParams;
1372+
13611373
obj["ensureFiles"] = ensureFiles_;
13621374
obj["fileNameGenerator"] = fileNameGenerator_->serialize();
13631375
return obj;
@@ -1389,6 +1401,13 @@ HiveInsertTableHandlePtr HiveInsertTableHandle::create(
13891401
serdeParameters.emplace(pair.first.asString(), pair.second.asString());
13901402
}
13911403

1404+
std::unordered_map<std::string, std::string> tableParameters;
1405+
if (obj.count("tableParameters") > 0) {
1406+
for (const auto& pair : obj["tableParameters"].items()) {
1407+
tableParameters.emplace(pair.first.asString(), pair.second.asString());
1408+
}
1409+
}
1410+
13921411
bool ensureFiles = obj["ensureFiles"].asBool();
13931412

13941413
auto fileNameGenerator =
@@ -1402,7 +1421,8 @@ HiveInsertTableHandlePtr HiveInsertTableHandle::create(
14021421
serdeParameters,
14031422
nullptr, // writerOptions is not serializable
14041423
ensureFiles,
1405-
fileNameGenerator);
1424+
fileNameGenerator,
1425+
tableParameters);
14061426
}
14071427

14081428
void HiveInsertTableHandle::registerSerDe() {

velox/connectors/hive/HiveDataSink.h

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,8 @@ class HiveInsertTableHandle : public ConnectorInsertTableHandle {
251251
// engine handles ensuring a 1 to 1 mapping from task to bucket.
252252
const bool ensureFiles = false,
253253
std::shared_ptr<const FileNameGenerator> fileNameGenerator =
254-
std::make_shared<const HiveInsertFileNameGenerator>());
254+
std::make_shared<const HiveInsertFileNameGenerator>(),
255+
const std::unordered_map<std::string, std::string>& tableParameters = {});
255256

256257
virtual ~HiveInsertTableHandle() = default;
257258

@@ -276,6 +277,10 @@ class HiveInsertTableHandle : public ConnectorInsertTableHandle {
276277
return serdeParameters_;
277278
}
278279

280+
const std::unordered_map<std::string, std::string>& tableParameters() const {
281+
return tableParameters_;
282+
}
283+
279284
const std::shared_ptr<dwio::common::WriterOptions>& writerOptions() const {
280285
return writerOptions_;
281286
}
@@ -330,6 +335,7 @@ class HiveInsertTableHandle : public ConnectorInsertTableHandle {
330335
const std::shared_ptr<dwio::common::WriterOptions> writerOptions_;
331336
const bool ensureFiles_;
332337
const std::shared_ptr<const FileNameGenerator> fileNameGenerator_;
338+
const std::unordered_map<std::string, std::string> tableParameters_;
333339
const std::vector<column_index_t> partitionChannels_;
334340
const std::vector<column_index_t> nonPartitionChannels_;
335341
};

velox/connectors/hive/tests/HiveConnectorSerDeTest.cpp

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,11 @@ TEST_F(HiveConnectorSerDeTest, hiveInsertTableHandle) {
231231
{"key2", "value2"},
232232
};
233233

234+
std::unordered_map<std::string, std::string> tableParameters = {
235+
{"key3", "value3"},
236+
{"key4", "value4"},
237+
};
238+
234239
auto hiveInsertTableHandle =
235240
exec::test::HiveConnectorTestBase::makeHiveInsertTableHandle(
236241
tableColumnNames,
@@ -240,7 +245,10 @@ TEST_F(HiveConnectorSerDeTest, hiveInsertTableHandle) {
240245
locationHandle,
241246
dwio::common::FileFormat::NIMBLE,
242247
common::CompressionKind::CompressionKind_SNAPPY,
243-
serdeParameters);
248+
serdeParameters,
249+
nullptr, // writerOptions
250+
false, // ensureFiles
251+
tableParameters);
244252
testSerde(*hiveInsertTableHandle);
245253
}
246254

velox/connectors/hive/tests/HiveDataSinkTest.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ class HiveDataSinkTest : public exec::test::HiveConnectorTestBase {
167167
connector::hive::LocationHandle::TableType::kNew),
168168
fileFormat,
169169
CompressionKind::CompressionKind_ZSTD,
170-
{},
170+
{}, // serdeParameters
171171
writerOptions,
172172
ensureFiles);
173173
}
@@ -1290,6 +1290,7 @@ TEST_F(HiveDataSinkTest, ensureFilesUnsupported) {
12901290
dwio::common::FileFormat::DWRF,
12911291
CompressionKind::CompressionKind_ZSTD,
12921292
{}, // serdeParameters
1293+
{}, // tableParameters
12931294
nullptr, // writeOptions
12941295
true // ensureFiles
12951296
),
@@ -1313,6 +1314,7 @@ TEST_F(HiveDataSinkTest, ensureFilesUnsupported) {
13131314
dwio::common::FileFormat::DWRF,
13141315
CompressionKind::CompressionKind_ZSTD,
13151316
{}, // serdeParameters
1317+
{}, // tableParameters
13161318
nullptr, // writeOptions
13171319
true // ensureFiles
13181320
),

velox/dwio/common/FileSink.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ class FileSink : public Closeable {
4747
MetricsLogPtr metricLogger{MetricsLog::voidLog()};
4848
IoStatistics* stats{nullptr};
4949
velox::IoStats* fileSystemStats{nullptr};
50+
std::unordered_map<std::string, std::string> tableParameters{};
5051
};
5152

5253
FileSink(std::string name, const Options& options)

velox/exec/tests/utils/HiveConnectorTestBase.cpp

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -329,7 +329,7 @@ HiveConnectorTestBase::makeHiveInsertTableHandle(
329329
std::move(locationHandle),
330330
tableStorageFormat,
331331
compressionKind,
332-
{},
332+
{}, // serdeParameters
333333
writerOptions,
334334
ensureFiles);
335335
}
@@ -346,7 +346,8 @@ HiveConnectorTestBase::makeHiveInsertTableHandle(
346346
const std::optional<common::CompressionKind> compressionKind,
347347
const std::unordered_map<std::string, std::string>& serdeParameters,
348348
const std::shared_ptr<dwio::common::WriterOptions>& writerOptions,
349-
const bool ensureFiles) {
349+
const bool ensureFiles,
350+
const std::unordered_map<std::string, std::string>& tableParameters) {
350351
std::vector<std::shared_ptr<const connector::hive::HiveColumnHandle>>
351352
columnHandles;
352353
std::vector<std::string> bucketedBy;
@@ -404,7 +405,9 @@ HiveConnectorTestBase::makeHiveInsertTableHandle(
404405
compressionKind,
405406
serdeParameters,
406407
writerOptions,
407-
ensureFiles);
408+
ensureFiles,
409+
std::make_shared<const connector::hive::HiveInsertFileNameGenerator>(),
410+
tableParameters);
408411
}
409412

410413
std::shared_ptr<connector::hive::HiveColumnHandle>

velox/exec/tests/utils/HiveConnectorTestBase.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,8 @@ class HiveConnectorTestBase : public OperatorTestBase {
209209
const std::unordered_map<std::string, std::string>& serdeParameters = {},
210210
const std::shared_ptr<dwio::common::WriterOptions>& writerOptions =
211211
nullptr,
212-
const bool ensureFiles = false);
212+
const bool ensureFiles = false,
213+
const std::unordered_map<std::string, std::string>& tableParameters = {});
213214

214215
static std::shared_ptr<connector::hive::HiveInsertTableHandle>
215216
makeHiveInsertTableHandle(

velox/tool/trace/TableWriterReplayer.cpp

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,11 @@ makeHiveInsertTableHandle(
5353
: std::make_shared<connector::hive::HiveBucketProperty>(
5454
*tracedHandle->bucketProperty()),
5555
compressionKind,
56-
std::unordered_map<std::string, std::string>{},
57-
writerOptions);
56+
std::unordered_map<std::string, std::string>{}, // serdeParameters
57+
writerOptions,
58+
tracedHandle->ensureFiles(),
59+
tracedHandle->fileNameGenerator(),
60+
tracedHandle->tableParameters());
5861
}
5962

6063
std::shared_ptr<core::InsertTableHandle> createInsertTableHanlde(

0 commit comments

Comments
 (0)