Skip to content

Commit 4f8c525

Browse files
committed
Add CLP connector native code
1 parent 9c86114 commit 4f8c525

20 files changed

+906
-4
lines changed

presto-native-execution/presto_cpp/main/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ target_link_libraries(
5555
velox_abfs
5656
velox_aggregates
5757
velox_caching
58+
velox_clp_connector
5859
velox_common_base
5960
velox_core
6061
velox_dwio_common_exception

presto-native-execution/presto_cpp/main/connectors/PrestoToVeloxConnector.cpp

Lines changed: 66 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,15 @@
1515
#include "presto_cpp/main/connectors/PrestoToVeloxConnector.h"
1616
#include "presto_cpp/main/types/PrestoToVeloxExpr.h"
1717
#include "presto_cpp/main/types/TypeParser.h"
18+
#include "presto_cpp/presto_protocol/connector/clp/ClpConnectorProtocol.h"
1819
#include "presto_cpp/presto_protocol/connector/hive/HiveConnectorProtocol.h"
1920
#include "presto_cpp/presto_protocol/connector/iceberg/IcebergConnectorProtocol.h"
2021
#include "presto_cpp/presto_protocol/connector/tpch/TpchConnectorProtocol.h"
2122

2223
#include <velox/type/fbhive/HiveTypeParser.h>
24+
#include "velox/connectors/clp/ClpColumnHandle.h"
25+
#include "velox/connectors/clp/ClpConnectorSplit.h"
26+
#include "velox/connectors/clp/ClpTableHandle.h"
2327
#include "velox/connectors/hive/HiveConnector.h"
2428
#include "velox/connectors/hive/HiveConnectorSplit.h"
2529
#include "velox/connectors/hive/HiveDataSink.h"
@@ -1412,9 +1416,11 @@ IcebergPrestoToVeloxConnector::toVeloxColumnHandle(
14121416
// constructor similar to how Hive Connector is handling for bucketing
14131417
velox::type::fbhive::HiveTypeParser hiveTypeParser;
14141418
auto type = stringToType(icebergColumn->type, typeParser);
1415-
connector::hive::HiveColumnHandle::ColumnParseParameters columnParseParameters;
1419+
connector::hive::HiveColumnHandle::ColumnParseParameters
1420+
columnParseParameters;
14161421
if (type->isDate()) {
1417-
columnParseParameters.partitionDateValueFormat = connector::hive::HiveColumnHandle::ColumnParseParameters::kDaysSinceEpoch;
1422+
columnParseParameters.partitionDateValueFormat = connector::hive::
1423+
HiveColumnHandle::ColumnParseParameters::kDaysSinceEpoch;
14181424
}
14191425
return std::make_unique<connector::hive::HiveColumnHandle>(
14201426
icebergColumn->columnIdentity.name,
@@ -1548,4 +1554,62 @@ std::unique_ptr<protocol::ConnectorProtocol>
15481554
TpchPrestoToVeloxConnector::createConnectorProtocol() const {
15491555
return std::make_unique<protocol::tpch::TpchConnectorProtocol>();
15501556
}
1557+
1558+
std::unique_ptr<velox::connector::ConnectorSplit>
1559+
ClpPrestoToVeloxConnector::toVeloxSplit(
1560+
const protocol::ConnectorId& catalogId,
1561+
const protocol::ConnectorSplit* connectorSplit,
1562+
const protocol::SplitContext* splitContext) const {
1563+
auto clpSplit = dynamic_cast<const protocol::clp::ClpSplit*>(connectorSplit);
1564+
VELOX_CHECK_NOT_NULL(
1565+
clpSplit, "Unexpected split type {}", connectorSplit->_type);
1566+
return std::make_unique<connector::clp::ClpConnectorSplit>(
1567+
catalogId, clpSplit->path);
1568+
}
1569+
1570+
std::unique_ptr<velox::connector::ColumnHandle>
1571+
ClpPrestoToVeloxConnector::toVeloxColumnHandle(
1572+
const protocol::ColumnHandle* column,
1573+
const TypeParser& typeParser) const {
1574+
auto clpColumn = dynamic_cast<const protocol::clp::ClpColumnHandle*>(column);
1575+
VELOX_CHECK_NOT_NULL(
1576+
clpColumn, "Unexpected column handle type {}", column->_type);
1577+
return std::make_unique<connector::clp::ClpColumnHandle>(
1578+
clpColumn->columnName,
1579+
clpColumn->originalColumnName,
1580+
typeParser.parse(clpColumn->columnType),
1581+
clpColumn->nullable);
1582+
}
1583+
1584+
std::unique_ptr<velox::connector::ConnectorTableHandle>
1585+
ClpPrestoToVeloxConnector::toVeloxTableHandle(
1586+
const protocol::TableHandle& tableHandle,
1587+
const VeloxExprConverter& exprConverter,
1588+
const TypeParser& typeParser,
1589+
std::unordered_map<
1590+
std::string,
1591+
std::shared_ptr<velox::connector::ColumnHandle>>& assignments) const {
1592+
auto clpLayout =
1593+
std::dynamic_pointer_cast<const protocol::clp::ClpTableLayoutHandle>(
1594+
tableHandle.connectorTableLayout);
1595+
VELOX_CHECK_NOT_NULL(
1596+
clpLayout,
1597+
"Unexpected layout type {}",
1598+
tableHandle.connectorTableLayout->_type);
1599+
auto storageType =
1600+
(clpLayout->table.storageType == protocol::clp::StorageType::S3)
1601+
? connector::clp::ClpTableHandle::StorageType::kS3
1602+
: connector::clp::ClpTableHandle::StorageType::kFS;
1603+
return std::make_unique<connector::clp::ClpTableHandle>(
1604+
tableHandle.connectorId,
1605+
clpLayout->table.schemaTableName.table,
1606+
storageType,
1607+
clpLayout->kqlQuery);
1608+
}
1609+
1610+
std::unique_ptr<protocol::ConnectorProtocol>
1611+
ClpPrestoToVeloxConnector::createConnectorProtocol() const {
1612+
return std::make_unique<protocol::clp::ClpConnectorProtocol>();
1613+
}
1614+
15511615
} // namespace facebook::presto

presto-native-execution/presto_cpp/main/connectors/PrestoToVeloxConnector.h

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,9 @@
1313
*/
1414
#pragma once
1515

16+
#include "presto_cpp/main/types/PrestoToVeloxExpr.h"
1617
#include "presto_cpp/presto_protocol/connector/hive/presto_protocol_hive.h"
1718
#include "presto_cpp/presto_protocol/core/ConnectorProtocol.h"
18-
#include "presto_cpp/main/types/PrestoToVeloxExpr.h"
1919
#include "velox/connectors/Connector.h"
2020
#include "velox/connectors/hive/TableHandle.h"
2121
#include "velox/core/PlanNode.h"
@@ -223,4 +223,31 @@ class TpchPrestoToVeloxConnector final : public PrestoToVeloxConnector {
223223
std::unique_ptr<protocol::ConnectorProtocol> createConnectorProtocol()
224224
const final;
225225
};
226+
227+
class ClpPrestoToVeloxConnector final : public PrestoToVeloxConnector {
228+
public:
229+
explicit ClpPrestoToVeloxConnector(std::string connectorName)
230+
: PrestoToVeloxConnector(std::move(connectorName)) {}
231+
232+
std::unique_ptr<velox::connector::ConnectorSplit> toVeloxSplit(
233+
const protocol::ConnectorId& catalogId,
234+
const protocol::ConnectorSplit* connectorSplit,
235+
const protocol::SplitContext* splitContext) const final;
236+
237+
std::unique_ptr<velox::connector::ColumnHandle> toVeloxColumnHandle(
238+
const protocol::ColumnHandle* column,
239+
const TypeParser& typeParser) const final;
240+
241+
std::unique_ptr<velox::connector::ConnectorTableHandle> toVeloxTableHandle(
242+
const protocol::TableHandle& tableHandle,
243+
const VeloxExprConverter& exprConverter,
244+
const TypeParser& typeParser,
245+
std::unordered_map<
246+
std::string,
247+
std::shared_ptr<velox::connector::ColumnHandle>>& assignments)
248+
const final;
249+
250+
std::unique_ptr<protocol::ConnectorProtocol> createConnectorProtocol()
251+
const final;
252+
};
226253
} // namespace facebook::presto

presto-native-execution/presto_cpp/main/connectors/Registration.cpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include "presto_cpp/main/connectors/arrow_flight/ArrowPrestoToVeloxConnector.h"
2020
#endif
2121

22+
#include "velox/connectors/clp/ClpConnector.h"
2223
#include "velox/connectors/hive/HiveConnector.h"
2324
#include "velox/connectors/tpch/TpchConnector.h"
2425

@@ -45,6 +46,12 @@ void registerConnectorFactories() {
4546
std::make_shared<velox::connector::tpch::TpchConnectorFactory>());
4647
}
4748

49+
if (!velox::connector::hasConnectorFactory(
50+
velox::connector::clp::ClpConnectorFactory::kClpConnectorName)) {
51+
velox::connector::registerConnectorFactory(
52+
std::make_shared<velox::connector::clp::ClpConnectorFactory>());
53+
}
54+
4855
// Register Velox connector factory for iceberg.
4956
// The iceberg catalog is handled by the hive connector factory.
5057
if (!velox::connector::hasConnectorFactory(kIcebergConnectorName)) {
@@ -74,6 +81,8 @@ void registerConnectors() {
7481
std::make_unique<IcebergPrestoToVeloxConnector>(kIcebergConnectorName));
7582
registerPrestoToVeloxConnector(std::make_unique<TpchPrestoToVeloxConnector>(
7683
velox::connector::tpch::TpchConnectorFactory::kTpchConnectorName));
84+
registerPrestoToVeloxConnector(std::make_unique<ClpPrestoToVeloxConnector>(
85+
velox::connector::clp::ClpConnectorFactory::kClpConnectorName));
7786

7887
// Presto server uses system catalog or system schema in other catalogs
7988
// in different places in the code. All these resolve to the SystemConnector.

presto-native-execution/presto_cpp/main/tests/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ target_link_libraries(
4747
$<TARGET_OBJECTS:presto_types>
4848
velox_hive_connector
4949
velox_tpch_connector
50+
velox_clp_connector
5051
velox_presto_serializer
5152
velox_functions_prestosql
5253
velox_aggregates

presto-native-execution/presto_cpp/main/types/tests/CMakeLists.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ target_link_libraries(
2727
velox_dwio_orc_reader
2828
velox_hive_connector
2929
velox_tpch_connector
30+
velox_clp_connector
3031
velox_exec
3132
velox_dwio_common_exception
3233
presto_type_converter
@@ -64,6 +65,7 @@ target_link_libraries(
6465
velox_functions_lib
6566
velox_hive_connector
6667
velox_tpch_connector
68+
velox_clp_connector
6769
velox_hive_partition_function
6870
velox_presto_serializer
6971
velox_serialization
@@ -96,6 +98,7 @@ target_link_libraries(
9698
velox_dwio_common
9799
velox_hive_connector
98100
velox_tpch_connector
101+
velox_clp_connector
99102
GTest::gtest
100103
GTest::gtest_main)
101104

presto-native-execution/presto_cpp/main/types/tests/PrestoToVeloxConnectorTest.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ TEST_F(PrestoToVeloxConnectorTest, registerVariousConnectors) {
3333
"iceberg", std::make_unique<IcebergPrestoToVeloxConnector>("iceberg")));
3434
connectorList.emplace_back(
3535
std::pair("tpch", std::make_unique<HivePrestoToVeloxConnector>("tpch")));
36+
connectorList.emplace_back(
37+
std::pair("clp", std::make_unique<ClpPrestoToVeloxConnector>("clp")));
3638

3739
for (auto& [connectorName, connector] : connectorList) {
3840
registerPrestoToVeloxConnector(std::move(connector));

presto-native-execution/presto_cpp/presto_protocol/Makefile

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,16 +52,25 @@ presto_protocol-cpp: presto_protocol-json
5252
chevron -d connector/arrow_flight/presto_protocol_arrow_flight.json connector/arrow_flight/presto_protocol-json-hpp.mustache >> connector/arrow_flight/presto_protocol_arrow_flight.h
5353
clang-format -style=file -i connector/arrow_flight/presto_protocol_arrow_flight.h connector/arrow_flight/presto_protocol_arrow_flight.cpp
5454

55-
presto_protocol-json:
55+
# build clp connector related structs
56+
echo "// DO NOT EDIT : This file is generated by chevron" > connector/clp/presto_protocol_clp.cpp
57+
chevron -d connector/clp/presto_protocol_clp.json connector/clp/presto_protocol-json-cpp.mustache >> connector/clp/presto_protocol_clp.cpp
58+
echo "// DO NOT EDIT : This file is generated by chevron" > connector/clp/presto_protocol_clp.h
59+
chevron -d connector/clp/presto_protocol_clp.json connector/clp/presto_protocol-json-hpp.mustache >> connector/clp/presto_protocol_clp.h
60+
clang-format -style=file -i connector/clp/presto_protocol_clp.h connector/clp/presto_protocol_clp.cpp
61+
62+
presto_protocol-json:
5663
./java-to-struct-json.py --config core/presto_protocol_core.yml core/special/*.java core/special/*.inc -j | jq . > core/presto_protocol_core.json
5764
./java-to-struct-json.py --config connector/hive/presto_protocol_hive.yml connector/hive/special/*.inc -j | jq . > connector/hive/presto_protocol_hive.json
5865
./java-to-struct-json.py --config connector/iceberg/presto_protocol_iceberg.yml connector/iceberg/special/*.inc -j | jq . > connector/iceberg/presto_protocol_iceberg.json
5966
./java-to-struct-json.py --config connector/tpch/presto_protocol_tpch.yml connector/tpch/special/*.inc -j | jq . > connector/tpch/presto_protocol_tpch.json
6067
./java-to-struct-json.py --config connector/arrow_flight/presto_protocol_arrow_flight.yml connector/arrow_flight/special/*.inc -j | jq . > connector/arrow_flight/presto_protocol_arrow_flight.json
68+
./java-to-struct-json.py --config connector/clp/presto_protocol_clp.yml connector/clp/special/*.inc -j | jq . > connector/clp/presto_protocol_clp.json
6169

6270
presto_protocol.proto: presto_protocol-json
6371
pystache presto_protocol-protobuf.mustache core/presto_protocol_core.json > core/presto_protocol_core.proto
6472
pystache presto_protocol-protobuf.mustache connector/hive/presto_protocol_hive.json > connector/hive/presto_protocol_hive.proto
6573
pystache presto_protocol-protobuf.mustache connector/iceberg/presto_protocol_iceberg.json > connector/iceberg/presto_protocol_iceberg.proto
6674
pystache presto_protocol-protobuf.mustache connector/tpch/presto_protocol_tpch.json > connector/tpch/presto_protocol_tpch.proto
6775
pystache presto_protocol-protobuf.mustache connector/arrow_flight/presto_protocol_arrow_flight.json > connector/arrow_flight/presto_protocol_arrow_flight.proto
76+
pystache presto_protocol-protobuf.mustache connector/clp/presto_protocol_clp.json > connector/clp/presto_protocol_clp.proto
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
#pragma once
15+
#include "presto_cpp/presto_protocol/connector/clp/presto_protocol_clp.h"
16+
#include "presto_cpp/presto_protocol/core/ConnectorProtocol.h"
17+
18+
namespace facebook::presto::protocol::clp {
19+
using ClpConnectorProtocol = ConnectorProtocolTemplate<
20+
ClpTableHandle,
21+
ClpTableLayoutHandle,
22+
ClpColumnHandle,
23+
NotImplemented,
24+
NotImplemented,
25+
ClpSplit,
26+
NotImplemented,
27+
ClpTransactionHandle,
28+
NotImplemented>;
29+
} // namespace facebook::presto::protocol::clp

0 commit comments

Comments
 (0)