Skip to content

Commit 829a792

Browse files
committed
[native] Fix session properties race condition
Summary: The global SessionProperties object shall not be updated by every query. It is meaningless to update the default value and it is not thread safe. Removing the updates and corresponding update method to avoid inconsistent session property display. Differential Revision: D79859171
1 parent b0b8a8d commit 829a792

File tree

5 files changed

+28
-43
lines changed

5 files changed

+28
-43
lines changed

presto-native-execution/presto_cpp/main/PrestoServer.cpp

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -107,11 +107,10 @@ protocol::NodeState convertNodeState(presto::NodeState nodeState) {
107107
}
108108

109109
void enableChecksum() {
110-
velox::exec::OutputBufferManager::getInstanceRef()->setListenerFactory(
111-
[]() {
112-
return std::make_unique<
113-
velox::serializer::presto::PrestoOutputStreamListener>();
114-
});
110+
velox::exec::OutputBufferManager::getInstanceRef()->setListenerFactory([]() {
111+
return std::make_unique<
112+
velox::serializer::presto::PrestoOutputStreamListener>();
113+
});
115114
}
116115

117116
// Log only the catalog keys that are configured to avoid leaking
@@ -502,7 +501,8 @@ void PrestoServer::run() {
502501
<< "' has " << httpSrvCpuExecutor_->numThreads() << " threads.";
503502
for (auto evb : httpSrvIoExecutor_->getAllEventBases()) {
504503
evb->setMaxLatency(
505-
std::chrono::milliseconds(systemConfig->httpSrvIoEvbViolationThresholdMs()),
504+
std::chrono::milliseconds(
505+
systemConfig->httpSrvIoEvbViolationThresholdMs()),
506506
[]() { RECORD_METRIC_VALUE(kCounterHttpServerIoEvbViolation, 1); },
507507
/*dampen=*/false);
508508
}
@@ -834,7 +834,8 @@ void PrestoServer::initializeThreadPools() {
834834
<< " threads.";
835835
for (auto evb : exchangeHttpIoExecutor_->getAllEventBases()) {
836836
evb->setMaxLatency(
837-
std::chrono::milliseconds(systemConfig->exchangeIoEvbViolationThresholdMs()),
837+
std::chrono::milliseconds(
838+
systemConfig->exchangeIoEvbViolationThresholdMs()),
838839
[]() { RECORD_METRIC_VALUE(kCounterExchangeIoEvbViolation, 1); },
839840
/*dampen=*/false);
840841
}

presto-native-execution/presto_cpp/main/QueryContextManager.cpp

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -49,21 +49,21 @@ void updateFromSystemConfigs(
4949
{core::QueryConfig::kSpillFileCreateConfig,
5050
std::string(SystemConfig::kSpillerFileCreateConfig)},
5151
{core::QueryConfig::kSpillEnabled,
52-
std::string(SystemConfig::kSpillEnabled)},
52+
std::string(SystemConfig::kSpillEnabled)},
5353
{core::QueryConfig::kJoinSpillEnabled,
54-
std::string(SystemConfig::kJoinSpillEnabled)},
54+
std::string(SystemConfig::kJoinSpillEnabled)},
5555
{core::QueryConfig::kOrderBySpillEnabled,
56-
std::string(SystemConfig::kOrderBySpillEnabled)},
56+
std::string(SystemConfig::kOrderBySpillEnabled)},
5757
{core::QueryConfig::kAggregationSpillEnabled,
58-
std::string(SystemConfig::kAggregationSpillEnabled)},
58+
std::string(SystemConfig::kAggregationSpillEnabled)},
5959
{core::QueryConfig::kMaxSpillBytes,
6060
std::string(SystemConfig::kMaxSpillBytes)},
6161
{core::QueryConfig::kRequestDataSizesMaxWaitSec,
62-
std::string(SystemConfig::kRequestDataSizesMaxWaitSec)},
62+
std::string(SystemConfig::kRequestDataSizesMaxWaitSec)},
6363
{core::QueryConfig::kMaxSplitPreloadPerDriver,
64-
std::string(SystemConfig::kDriverMaxSplitPreload)},
64+
std::string(SystemConfig::kDriverMaxSplitPreload)},
6565
{core::QueryConfig::kMaxLocalExchangePartitionBufferSize,
66-
std::string(SystemConfig::kMaxLocalExchangePartitionBufferSize)}};
66+
std::string(SystemConfig::kMaxLocalExchangePartitionBufferSize)}};
6767
for (const auto& configNameEntry : veloxToSystemConfigMapping) {
6868
const auto& veloxConfigName = configNameEntry.first;
6969
const auto& systemConfigName = configNameEntry.second;
@@ -93,8 +93,7 @@ toConnectorConfigs(const protocol::TaskUpdateRequest& taskUpdateRequest) {
9393
taskUpdateRequest.extraCredentials.begin(),
9494
taskUpdateRequest.extraCredentials.end());
9595
connectorConfig.insert({"user", taskUpdateRequest.session.user});
96-
connectorConfigs.insert(
97-
{entry.first, connectorConfig});
96+
connectorConfigs.insert({entry.first, connectorConfig});
9897
}
9998

10099
return connectorConfigs;
@@ -252,7 +251,6 @@ QueryContextManager::toVeloxConfigs(
252251
velox::common::compressionKindToString(compressionKind);
253252
} else {
254253
configs[sessionProperties_.toVeloxConfig(it.first)] = it.second;
255-
sessionProperties_.updateVeloxConfig(it.first, it.second);
256254
}
257255
}
258256

presto-native-execution/presto_cpp/main/SessionProperties.cpp

Lines changed: 5 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -316,7 +316,6 @@ SessionProperties::SessionProperties() {
316316
QueryConfig::kQueryTraceMaxBytes,
317317
std::to_string(c.queryTraceMaxBytes()));
318318

319-
320319
addSessionProperty(
321320
kOpTraceDirectoryCreateConfig,
322321
"Config used to create operator trace directory. This config is provided to"
@@ -507,31 +506,20 @@ SessionProperties::SessionProperties() {
507506
}
508507

509508
const std::unordered_map<std::string, std::shared_ptr<SessionProperty>>&
510-
SessionProperties::getSessionProperties() {
509+
SessionProperties::testingSessionProperties() const {
511510
return sessionProperties_;
512511
}
513512

514-
const std::string SessionProperties::toVeloxConfig(const std::string& name) {
513+
const std::string SessionProperties::toVeloxConfig(
514+
const std::string& name) const {
515515
auto it = sessionProperties_.find(name);
516516
return it == sessionProperties_.end() ? name
517517
: it->second->getVeloxConfigName();
518518
}
519519

520-
void SessionProperties::updateVeloxConfig(
521-
const std::string& name,
522-
const std::string& value) {
523-
auto it = sessionProperties_.find(name);
524-
// Velox config value is updated only for presto session properties.
525-
if (it == sessionProperties_.end()) {
526-
return;
527-
}
528-
it->second->updateValue(value);
529-
}
530-
531-
json SessionProperties::serialize() {
520+
json SessionProperties::serialize() const {
532521
json j = json::array();
533-
const auto sessionProperties = getSessionProperties();
534-
for (const auto& entry : sessionProperties) {
522+
for (const auto& entry : sessionProperties_) {
535523
j.push_back(entry.second->serialize());
536524
}
537525
return j;

presto-native-execution/presto_cpp/main/SessionProperties.h

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -324,18 +324,16 @@ class SessionProperties {
324324

325325
SessionProperties();
326326

327-
const std::unordered_map<std::string, std::shared_ptr<SessionProperty>>&
328-
getSessionProperties();
329-
330327
/// Utility function to translate a config name in Presto to its equivalent in
331328
/// Velox. Returns 'name' as is if there is no mapping.
332-
const std::string toVeloxConfig(const std::string& name);
329+
const std::string toVeloxConfig(const std::string& name) const;
333330

334-
void updateVeloxConfig(const std::string& name, const std::string& value);
331+
json serialize() const;
335332

336-
json serialize();
333+
const std::unordered_map<std::string, std::shared_ptr<SessionProperty>>&
334+
testingSessionProperties() const;
337335

338-
protected:
336+
private:
339337
void addSessionProperty(
340338
const std::string& name,
341339
const std::string& description,

presto-native-execution/presto_cpp/main/tests/SessionPropertiesTest.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ TEST_F(SessionPropertiesTest, validateMapping) {
5353
core::QueryConfig::kRequestDataSizesMaxWaitSec,
5454
core::QueryConfig::kQueryMemoryReclaimerPriority,
5555
core::QueryConfig::kMaxNumSplitsListenedTo};
56-
auto sessionProperties = SessionProperties().getSessionProperties();
56+
auto sessionProperties = SessionProperties().testingSessionProperties();
5757
const auto len = names.size();
5858
for (auto i = 0; i < len; i++) {
5959
EXPECT_EQ(
@@ -68,7 +68,7 @@ TEST_F(SessionPropertiesTest, serializeProperty) {
6868
for (const auto& property : j) {
6969
auto name = property["name"];
7070
json expectedProperty =
71-
properties.getSessionProperties().at(name)->serialize();
71+
properties.testingSessionProperties().at(name)->serialize();
7272
EXPECT_EQ(property, expectedProperty);
7373
}
7474
}

0 commit comments

Comments
 (0)