Skip to content

Commit 7b12a5a

Browse files
hdikemanmeta-codesync[bot]
authored andcommitted
feat: Add tableParameters to HiveInsertTableHandle (facebookincubator#16637)
Summary: Pull Request resolved: facebookincubator#16637 Add a `tableParameters` field to `HiveInsertTableHandle` to carry table-level metadata from the coordinator to the native worker write path, following the existing `serdeParameters` pattern. - Add `tableParameters` constructor parameter, private member, and accessor to `HiveInsertTableHandle` serde changes. - Add `tableParameters` to `FileSink::Options`. - Thread through `createHiveFileSink` and both call sites. - Update all downstream callers and add serde test coverage. Differential Revision: D94986580
1 parent d807c6e commit 7b12a5a

File tree

8 files changed

+57
-15
lines changed

8 files changed

+57
-15
lines changed

velox/connectors/hive/HiveDataSink.cpp

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,8 @@ std::unique_ptr<dwio::common::FileSink> createHiveFileSink(
7070
const std::shared_ptr<const HiveConfig>& hiveConfig,
7171
memory::MemoryPool* sinkPool,
7272
io::IoStatistics* ioStats,
73-
IoStats* fileSystemStats) {
73+
IoStats* fileSystemStats,
74+
const std::unordered_map<std::string, std::string>& tableParameters) {
7475
return dwio::common::FileSink::create(
7576
path,
7677
{
@@ -81,6 +82,7 @@ std::unique_ptr<dwio::common::FileSink> createHiveFileSink(
8182
.metricLogger = dwio::common::MetricsLog::voidLog(),
8283
.stats = ioStats,
8384
.fileSystemStats = fileSystemStats,
85+
.tableParameters = tableParameters,
8486
});
8587
}
8688

@@ -430,7 +432,8 @@ HiveInsertTableHandle::HiveInsertTableHandle(
430432
// if there's no data. This is useful when the table is bucketed, but the
431433
// engine handles ensuring a 1 to 1 mapping from task to bucket.
432434
const bool ensureFiles,
433-
std::shared_ptr<const FileNameGenerator> fileNameGenerator)
435+
std::shared_ptr<const FileNameGenerator> fileNameGenerator,
436+
const std::unordered_map<std::string, std::string>& tableParameters)
434437
: inputColumns_(std::move(inputColumns)),
435438
locationHandle_(std::move(locationHandle)),
436439
storageFormat_(storageFormat),
@@ -440,6 +443,7 @@ HiveInsertTableHandle::HiveInsertTableHandle(
440443
writerOptions_(writerOptions),
441444
ensureFiles_(ensureFiles),
442445
fileNameGenerator_(std::move(fileNameGenerator)),
446+
tableParameters_(tableParameters),
443447
partitionChannels_(computePartitionChannels(inputColumns_)),
444448
nonPartitionChannels_(computeNonPartitionChannels(inputColumns_)) {
445449
if (compressionKind.has_value()) {
@@ -1077,7 +1081,8 @@ std::unique_ptr<dwio::common::Writer> HiveDataSink::createWriterForIndex(
10771081
hiveConfig_,
10781082
info->sinkPool.get(),
10791083
ioStats_[writerIndex].get(),
1080-
fileSystemStats_.get()),
1084+
fileSystemStats_.get(),
1085+
insertTableHandle_->tableParameters()),
10811086
options);
10821087
return maybeCreateBucketSortWriter(writerIndex, std::move(writer));
10831088
}
@@ -1369,6 +1374,13 @@ folly::dynamic HiveInsertTableHandle::serialize() const {
13691374
params[key] = value;
13701375
}
13711376
obj["serdeParameters"] = params;
1377+
1378+
folly::dynamic tableParams = folly::dynamic::object;
1379+
for (const auto& [key, value] : tableParameters_) {
1380+
tableParams[key] = value;
1381+
}
1382+
obj["tableParameters"] = tableParams;
1383+
13721384
obj["ensureFiles"] = ensureFiles_;
13731385
obj["fileNameGenerator"] = fileNameGenerator_->serialize();
13741386
return obj;
@@ -1400,6 +1412,13 @@ HiveInsertTableHandlePtr HiveInsertTableHandle::create(
14001412
serdeParameters.emplace(pair.first.asString(), pair.second.asString());
14011413
}
14021414

1415+
std::unordered_map<std::string, std::string> tableParameters;
1416+
if (obj.count("tableParameters") > 0) {
1417+
for (const auto& pair : obj["tableParameters"].items()) {
1418+
tableParameters.emplace(pair.first.asString(), pair.second.asString());
1419+
}
1420+
}
1421+
14031422
bool ensureFiles = obj["ensureFiles"].asBool();
14041423

14051424
auto fileNameGenerator =
@@ -1413,7 +1432,8 @@ HiveInsertTableHandlePtr HiveInsertTableHandle::create(
14131432
serdeParameters,
14141433
nullptr, // writerOptions is not serializable
14151434
ensureFiles,
1416-
fileNameGenerator);
1435+
fileNameGenerator,
1436+
tableParameters);
14171437
}
14181438

14191439
void HiveInsertTableHandle::registerSerDe() {

velox/connectors/hive/HiveDataSink.h

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,8 @@ class HiveInsertTableHandle : public ConnectorInsertTableHandle {
254254
// engine handles ensuring a 1 to 1 mapping from task to bucket.
255255
const bool ensureFiles = false,
256256
std::shared_ptr<const FileNameGenerator> fileNameGenerator =
257-
std::make_shared<const HiveInsertFileNameGenerator>());
257+
std::make_shared<const HiveInsertFileNameGenerator>(),
258+
const std::unordered_map<std::string, std::string>& tableParameters = {});
258259

259260
virtual ~HiveInsertTableHandle() = default;
260261

@@ -279,6 +280,10 @@ class HiveInsertTableHandle : public ConnectorInsertTableHandle {
279280
return serdeParameters_;
280281
}
281282

283+
const std::unordered_map<std::string, std::string>& tableParameters() const {
284+
return tableParameters_;
285+
}
286+
282287
const std::shared_ptr<dwio::common::WriterOptions>& writerOptions() const {
283288
return writerOptions_;
284289
}
@@ -333,6 +338,7 @@ class HiveInsertTableHandle : public ConnectorInsertTableHandle {
333338
const std::shared_ptr<dwio::common::WriterOptions> writerOptions_;
334339
const bool ensureFiles_;
335340
const std::shared_ptr<const FileNameGenerator> fileNameGenerator_;
341+
const std::unordered_map<std::string, std::string> tableParameters_;
336342
const std::vector<column_index_t> partitionChannels_;
337343
const std::vector<column_index_t> nonPartitionChannels_;
338344
};

velox/connectors/hive/tests/HiveConnectorSerDeTest.cpp

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,11 @@ TEST_F(HiveConnectorSerDeTest, hiveInsertTableHandle) {
230230
{"key2", "value2"},
231231
};
232232

233+
std::unordered_map<std::string, std::string> tableParameters = {
234+
{"key3", "value3"},
235+
{"key4", "value4"},
236+
};
237+
233238
auto hiveInsertTableHandle =
234239
exec::test::HiveConnectorTestBase::makeHiveInsertTableHandle(
235240
tableColumnNames,
@@ -239,7 +244,10 @@ TEST_F(HiveConnectorSerDeTest, hiveInsertTableHandle) {
239244
locationHandle,
240245
dwio::common::FileFormat::NIMBLE,
241246
common::CompressionKind::CompressionKind_SNAPPY,
242-
serdeParameters);
247+
serdeParameters,
248+
nullptr, // writerOptions
249+
false, // ensureFiles
250+
tableParameters);
243251
testSerde(*hiveInsertTableHandle);
244252
}
245253

velox/connectors/hive/tests/HiveDataSinkTest.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ class HiveDataSinkTest : public exec::test::HiveConnectorTestBase {
167167
connector::hive::LocationHandle::TableType::kNew),
168168
fileFormat,
169169
CompressionKind::CompressionKind_ZSTD,
170-
{},
170+
{}, // serdeParameters
171171
writerOptions,
172172
ensureFiles);
173173
}
@@ -1290,7 +1290,7 @@ TEST_F(HiveDataSinkTest, ensureFilesUnsupported) {
12901290
dwio::common::FileFormat::DWRF,
12911291
CompressionKind::CompressionKind_ZSTD,
12921292
{}, // serdeParameters
1293-
nullptr, // writeOptions
1293+
nullptr, // writerOptions
12941294
true // ensureFiles
12951295
),
12961296
"ensureFiles is not supported with partition keys in the data");
@@ -1313,7 +1313,7 @@ TEST_F(HiveDataSinkTest, ensureFilesUnsupported) {
13131313
dwio::common::FileFormat::DWRF,
13141314
CompressionKind::CompressionKind_ZSTD,
13151315
{}, // serdeParameters
1316-
nullptr, // writeOptions
1316+
nullptr, // writerOptions
13171317
true // ensureFiles
13181318
),
13191319
"ensureFiles is not supported with bucketing");

velox/dwio/common/FileSink.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ class FileSink : public Closeable {
4747
MetricsLogPtr metricLogger{MetricsLog::voidLog()};
4848
IoStatistics* stats{nullptr};
4949
velox::IoStats* fileSystemStats{nullptr};
50+
std::unordered_map<std::string, std::string> tableParameters{};
5051
};
5152

5253
FileSink(std::string name, const Options& options)

velox/exec/tests/utils/HiveConnectorTestBase.cpp

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -329,7 +329,7 @@ HiveConnectorTestBase::makeHiveInsertTableHandle(
329329
std::move(locationHandle),
330330
tableStorageFormat,
331331
compressionKind,
332-
{},
332+
{}, // serdeParameters
333333
writerOptions,
334334
ensureFiles);
335335
}
@@ -346,7 +346,8 @@ HiveConnectorTestBase::makeHiveInsertTableHandle(
346346
const std::optional<common::CompressionKind> compressionKind,
347347
const std::unordered_map<std::string, std::string>& serdeParameters,
348348
const std::shared_ptr<dwio::common::WriterOptions>& writerOptions,
349-
const bool ensureFiles) {
349+
const bool ensureFiles,
350+
const std::unordered_map<std::string, std::string>& tableParameters) {
350351
std::vector<std::shared_ptr<const connector::hive::HiveColumnHandle>>
351352
columnHandles;
352353
std::vector<std::string> bucketedBy;
@@ -404,7 +405,9 @@ HiveConnectorTestBase::makeHiveInsertTableHandle(
404405
compressionKind,
405406
serdeParameters,
406407
writerOptions,
407-
ensureFiles);
408+
ensureFiles,
409+
std::make_shared<const connector::hive::HiveInsertFileNameGenerator>(),
410+
tableParameters);
408411
}
409412

410413
std::shared_ptr<connector::hive::HiveColumnHandle>

velox/exec/tests/utils/HiveConnectorTestBase.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,8 @@ class HiveConnectorTestBase : public OperatorTestBase {
207207
const std::unordered_map<std::string, std::string>& serdeParameters = {},
208208
const std::shared_ptr<dwio::common::WriterOptions>& writerOptions =
209209
nullptr,
210-
const bool ensureFiles = false);
210+
const bool ensureFiles = false,
211+
const std::unordered_map<std::string, std::string>& tableParameters = {});
211212

212213
static std::shared_ptr<connector::hive::HiveInsertTableHandle>
213214
makeHiveInsertTableHandle(

velox/tool/trace/TableWriterReplayer.cpp

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,11 @@ makeHiveInsertTableHandle(
5353
: std::make_shared<connector::hive::HiveBucketProperty>(
5454
*tracedHandle->bucketProperty()),
5555
compressionKind,
56-
std::unordered_map<std::string, std::string>{},
57-
writerOptions);
56+
std::unordered_map<std::string, std::string>{}, // serdeParameters
57+
writerOptions,
58+
tracedHandle->ensureFiles(),
59+
tracedHandle->fileNameGenerator(),
60+
tracedHandle->tableParameters());
5861
}
5962

6063
std::shared_ptr<core::InsertTableHandle> createInsertTableHanlde(

0 commit comments

Comments
 (0)