Skip to content

Commit 52132d8

Browse files
authored
refactor: Remove unnecessary BroadcastFactory (#26448)
Summary: Broadcast reader/writer is not used in a factory framework so there is no need to have a factory class. Differential Revision: D85187166 ``` == NO RELEASE NOTE == ```
1 parent fab96c5 commit 52132d8

File tree

7 files changed

+30
-76
lines changed

7 files changed

+30
-76
lines changed

presto-native-execution/presto_cpp/main/operators/BroadcastExchangeSource.cpp

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
#include <fmt/format.h>
1515
#include <folly/Uri.h>
1616

17-
#include "presto_cpp/main/common/Configs.h"
1817
#include "presto_cpp/main/operators/BroadcastExchangeSource.h"
1918

2019
using namespace facebook::velox;
@@ -133,12 +132,14 @@ BroadcastExchangeSource::createExchangeSource(
133132
VELOX_USER_FAIL("BroadcastInfo deserialization failed: {}", e.what());
134133
}
135134

136-
auto fileSystemBroadcast = BroadcastFactory(broadcastFileInfo->filePath_);
135+
auto fileSystem =
136+
velox::filesystems::getFileSystem(broadcastFileInfo->filePath_, nullptr);
137137
return std::make_shared<BroadcastExchangeSource>(
138138
uri.host(),
139139
destination,
140140
queue,
141-
fileSystemBroadcast.createReader(std::move(broadcastFileInfo), pool),
141+
std::make_shared<BroadcastFileReader>(
142+
broadcastFileInfo, fileSystem, pool),
142143
pool);
143144
}
144145
} // namespace facebook::presto::operators

presto-native-execution/presto_cpp/main/operators/BroadcastExchangeSource.h

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,9 @@
1313
*/
1414
#pragma once
1515

16-
#include "presto_cpp/main/operators/BroadcastFactory.h"
17-
#include "velox/core/PlanNode.h"
18-
#include "velox/exec/Exchange.h"
19-
#include "velox/exec/Operator.h"
16+
#include "presto_cpp/main/operators/BroadcastFile.h"
17+
#include "velox/exec/ExchangeQueue.h"
18+
#include "velox/exec/ExchangeSource.h"
2019

2120
namespace facebook::presto::operators {
2221

presto-native-execution/presto_cpp/main/operators/BroadcastFactory.cpp renamed to presto-native-execution/presto_cpp/main/operators/BroadcastFile.cpp

Lines changed: 1 addition & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,7 @@
1111
* See the License for the specific language governing permissions and
1212
* limitations under the License.
1313
*/
14-
#include "presto_cpp/main/operators/BroadcastFactory.h"
15-
#include <boost/lexical_cast.hpp>
16-
#include <boost/uuid/uuid_generators.hpp>
17-
#include <boost/uuid/uuid_io.hpp>
14+
#include "presto_cpp/main/operators/BroadcastFile.h"
1815
#include "presto_cpp/external/json/nlohmann/json.hpp"
1916
#include "presto_cpp/main/common/Exception.h"
2017
#include "presto_cpp/main/thrift/ThriftIO.h"
@@ -39,40 +36,6 @@ namespace facebook::presto::operators {
3936
"{}", \
4037
errorMessage);
4138

42-
namespace {
43-
std::string makeUuid() {
44-
return boost::lexical_cast<std::string>(boost::uuids::random_generator()());
45-
}
46-
} // namespace
47-
48-
BroadcastFactory::BroadcastFactory(const std::string& basePath)
49-
: basePath_(basePath) {
50-
VELOX_CHECK(!basePath.empty(), "Base path for broadcast files is empty!");
51-
fileSystem_ = velox::filesystems::getFileSystem(basePath, nullptr);
52-
}
53-
54-
std::unique_ptr<BroadcastFileWriter> BroadcastFactory::createWriter(
55-
uint64_t writeBufferSize,
56-
uint64_t maxBroadcastBytes,
57-
velox::memory::MemoryPool* pool,
58-
std::unique_ptr<VectorSerde::Options> serdeOptions) {
59-
fileSystem_->mkdir(basePath_);
60-
return std::make_unique<BroadcastFileWriter>(
61-
fmt::format("{}/file_broadcast_{}", basePath_, makeUuid()),
62-
maxBroadcastBytes,
63-
writeBufferSize,
64-
std::move(serdeOptions),
65-
pool);
66-
}
67-
68-
std::shared_ptr<BroadcastFileReader> BroadcastFactory::createReader(
69-
std::unique_ptr<BroadcastFileInfo> fileInfo,
70-
velox::memory::MemoryPool* pool) {
71-
auto broadcastFileReader =
72-
std::make_shared<BroadcastFileReader>(fileInfo, fileSystem_, pool);
73-
return broadcastFileReader;
74-
}
75-
7639
// static
7740
std::unique_ptr<BroadcastFileInfo> BroadcastFileInfo::deserialize(
7841
const std::string& info) {

presto-native-execution/presto_cpp/main/operators/BroadcastFactory.h renamed to presto-native-execution/presto_cpp/main/operators/BroadcastFile.h

Lines changed: 0 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -118,27 +118,4 @@ class BroadcastFileReader {
118118
uint32_t numPagesRead_{0};
119119
std::vector<int64_t> pageSizes_;
120120
};
121-
122-
/// Factory to create Writers & Reader for file based broadcast.
123-
class BroadcastFactory {
124-
public:
125-
/// Create FileBroadcast to write files under specified basePath.
126-
BroadcastFactory(const std::string& basePath);
127-
128-
virtual ~BroadcastFactory() = default;
129-
130-
std::unique_ptr<BroadcastFileWriter> createWriter(
131-
uint64_t writeBufferSize,
132-
uint64_t maxBroadcastBytes,
133-
velox::memory::MemoryPool* pool,
134-
std::unique_ptr<velox::VectorSerde::Options> serdeOptions);
135-
136-
std::shared_ptr<BroadcastFileReader> createReader(
137-
const std::unique_ptr<BroadcastFileInfo> fileInfo,
138-
velox::memory::MemoryPool* pool);
139-
140-
private:
141-
const std::string basePath_;
142-
std::shared_ptr<velox::filesystems::FileSystem> fileSystem_;
143-
};
144121
} // namespace facebook::presto::operators

presto-native-execution/presto_cpp/main/operators/BroadcastWrite.cpp

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,21 @@
1212
* limitations under the License.
1313
*/
1414
#include "presto_cpp/main/operators/BroadcastWrite.h"
15-
#include "presto_cpp/main/common/Configs.h"
16-
#include "presto_cpp/main/operators/BroadcastFactory.h"
15+
#include <boost/lexical_cast.hpp>
16+
#include <boost/uuid/uuid_generators.hpp>
17+
#include <boost/uuid/uuid_io.hpp>
18+
#include "presto_cpp/main/operators/BroadcastFile.h"
19+
#include "velox/common/file/FileSystems.h"
1720

1821
using namespace facebook::velox::exec;
1922
using namespace facebook::velox;
2023

2124
namespace facebook::presto::operators {
2225
namespace {
26+
std::string makeUuid() {
27+
return boost::lexical_cast<std::string>(boost::uuids::random_generator()());
28+
}
29+
2330
velox::core::PlanNodeId deserializePlanNodeId(const folly::dynamic& obj) {
2431
return obj["id"].asString();
2532
}
@@ -58,12 +65,16 @@ class BroadcastWriteOperator : public Operator {
5865
planNode->serdeRowType(),
5966
planNode->serdeRowType())),
6067
maxBroadcastBytes_(planNode->maxBroadcastBytes()) {
61-
auto fileBroadcast = BroadcastFactory(planNode->basePath());
62-
fileBroadcastWriter_ = fileBroadcast.createWriter(
63-
8 << 20,
68+
const auto& basePath = planNode->basePath();
69+
VELOX_CHECK(!basePath.empty(), "Base path for broadcast files is empty!");
70+
auto fileSystem = velox::filesystems::getFileSystem(basePath, nullptr);
71+
fileSystem->mkdir(basePath);
72+
fileBroadcastWriter_ = std::make_unique<BroadcastFileWriter>(
73+
fmt::format("{}/file_broadcast_{}", basePath, makeUuid()),
6474
planNode->maxBroadcastBytes(),
65-
operatorCtx_->pool(),
66-
getVectorSerdeOptions(ctx->queryConfig(), VectorSerde::Kind::kPresto));
75+
8 << 20,
76+
getVectorSerdeOptions(ctx->queryConfig(), VectorSerde::Kind::kPresto),
77+
operatorCtx_->pool());
6778
}
6879

6980
bool needsInput() const override {

presto-native-execution/presto_cpp/main/operators/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ add_library(
1919
ShuffleWrite.cpp
2020
LocalShuffle.cpp
2121
BroadcastWrite.cpp
22-
BroadcastFactory.cpp
22+
BroadcastFile.cpp
2323
BroadcastExchangeSource.cpp
2424
BinarySortableSerializer.cpp
2525
)

presto-native-execution/presto_cpp/main/operators/tests/BroadcastTest.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,16 @@
1515
#include <folly/Uri.h>
1616
#include "presto_cpp/main/common/Exception.h"
1717
#include "presto_cpp/main/operators/BroadcastExchangeSource.h"
18+
#include "presto_cpp/main/operators/BroadcastFile.h"
1819
#include "presto_cpp/main/operators/BroadcastWrite.h"
1920
#include "presto_cpp/main/operators/tests/PlanBuilder.h"
2021
#include "velox/buffer/Buffer.h"
2122
#include "velox/common/base/tests/GTestUtils.h"
2223
#include "velox/common/compression/Compression.h"
2324
#include "velox/common/file/FileSystems.h"
2425
#include "velox/core/QueryConfig.h"
26+
#include "velox/exec/Exchange.h"
27+
#include "velox/exec/ExchangeSource.h"
2528
#include "velox/exec/tests/utils/OperatorTestBase.h"
2629
#include "velox/exec/tests/utils/PlanBuilder.h"
2730
#include "velox/exec/tests/utils/QueryAssertions.h"

0 commit comments

Comments
 (0)