Skip to content

Commit 89b1c7d

Browse files
author
bingtao.yin
committed
add argo catalog
1 parent 61a493d commit 89b1c7d

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

54 files changed

+1465
-106
lines changed

be/CMakeLists.txt

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -690,6 +690,17 @@ else()
690690
endif()
691691
endif()
692692

693+
# Add tddms deps
694+
set(TDDMS_INCLUDE_DIR $ENV{DORIS_THIRDPARTY}/tddms_client/include)
695+
set(TDDMS_LIB_DIR $ENV{DORIS_THIRDPARTY}/tddms_client/lib)
696+
set(DORIS_LINK_LIBS ${DORIS_LINK_LIBS} shiva_client)
697+
link_directories(${TDDMS_LIB_DIR} ${TDDMS_LIB_DIR})
698+
699+
# Enable multithread for linking; see ld.gold --help
700+
if(COMPILER_GCC)
701+
set(DORIS_LINK_LIBS "${DORIS_LINK_LIBS} -fuse-ld=gold -Wl,--threads -Wl,--thread-count=4")
702+
endif()
703+
693704
# Set libraries for test
694705
set (TEST_LINK_LIBS ${DORIS_LINK_LIBS}
695706
${WL_START_GROUP}

be/cmake/thirdparty.cmake

Lines changed: 42 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -27,43 +27,55 @@ set(COMMON_THIRDPARTY)
2727
# if arg exist wholelibpath, use wholelibpath to find library
2828
function(add_thirdparty)
2929
cmake_parse_arguments(DORIS_THIRDPARTY
30-
"NOTADD;LIB64"
30+
"NOTADD;LIB64;SHARED"
3131
"LIBNAME;WHOLELIBPATH"
3232
""
3333
${ARGN})
3434

3535
set(DORIS_THIRDPARTY_NAME ${DORIS_THIRDPARTY_UNPARSED_ARGUMENTS})
36-
add_library(${DORIS_THIRDPARTY_NAME} STATIC IMPORTED)
36+
if (DORIS_THIRDPARTY_SHARED)
37+
add_library(${DORIS_THIRDPARTY_NAME} SHARED IMPORTED)
38+
else()
39+
add_library(${DORIS_THIRDPARTY_NAME} STATIC IMPORTED)
40+
endif()
3741

3842
if (NOT DORIS_THIRDPARTY_NOTADD)
3943
set(COMMON_THIRDPARTY ${COMMON_THIRDPARTY} ${DORIS_THIRDPARTY_NAME} PARENT_SCOPE)
4044
endif()
4145

4246
if (DORIS_THIRDPARTY_LIB64)
43-
set(DORIS_THIRDPARTY_LIBPATH ${THIRDPARTY_DIR}/lib64/lib${DORIS_THIRDPARTY_NAME}.a)
47+
if (DORIS_THIRDPARTY_SHARED)
48+
set(DORIS_THIRDPARTY_LIBPATH ${THIRDPARTY_DIR}/lib64/lib${DORIS_THIRDPARTY_NAME}.so)
49+
else()
50+
set(DORIS_THIRDPARTY_LIBPATH ${THIRDPARTY_DIR}/lib64/lib${DORIS_THIRDPARTY_NAME}.a)
51+
endif()
4452
elseif (DORIS_THIRDPARTY_LIBNAME)
4553
set(DORIS_THIRDPARTY_LIBPATH ${THIRDPARTY_DIR}/${DORIS_THIRDPARTY_LIBNAME})
4654
elseif (DORIS_THIRDPARTY_WHOLELIBPATH)
4755
set(DORIS_THIRDPARTY_LIBPATH ${DORIS_THIRDPARTY_WHOLELIBPATH})
4856
else()
49-
set(DORIS_THIRDPARTY_LIBPATH ${THIRDPARTY_DIR}/lib/lib${DORIS_THIRDPARTY_NAME}.a)
57+
if (DORIS_THIRDPARTY_SHARED)
58+
set(DORIS_THIRDPARTY_LIBPATH ${THIRDPARTY_DIR}/lib/lib${DORIS_THIRDPARTY_NAME}.so)
59+
else()
60+
set(DORIS_THIRDPARTY_LIBPATH ${THIRDPARTY_DIR}/lib/lib${DORIS_THIRDPARTY_NAME}.a)
61+
endif()
5062
endif()
5163
set_target_properties(${DORIS_THIRDPARTY_NAME} PROPERTIES IMPORTED_LOCATION ${DORIS_THIRDPARTY_LIBPATH})
5264
endfunction()
5365

54-
add_thirdparty(gflags)
55-
add_thirdparty(glog)
66+
add_thirdparty(gflags SHARED)
67+
add_thirdparty(glog SHARED)
5668
add_thirdparty(backtrace)
57-
add_thirdparty(re2)
69+
add_thirdparty(re2 SHARED)
5870
add_thirdparty(hyperscan LIBNAME "lib64/libhs.a")
5971
add_thirdparty(odbc)
6072
add_thirdparty(pprof WHOLELIBPATH ${GPERFTOOLS_HOME}/lib/libprofiler.a)
61-
add_thirdparty(protobuf)
73+
add_thirdparty(protobuf SHARED)
6274
add_thirdparty(gtest)
6375
add_thirdparty(gtest_main)
6476
add_thirdparty(benchmark)
6577
add_thirdparty(gmock)
66-
add_thirdparty(snappy)
78+
add_thirdparty(snappy SHARED)
6779
add_thirdparty(curl)
6880
add_thirdparty(lz4)
6981
add_thirdparty(thrift)
@@ -90,14 +102,14 @@ if (USE_UNWIND)
90102
add_thirdparty(libunwind LIBNAME "lib64/libunwind.a")
91103
endif()
92104

93-
add_thirdparty(grpc++_reflection LIB64)
94-
add_thirdparty(grpc LIB64)
95-
add_thirdparty(grpc++ LIB64)
96-
add_thirdparty(grpc++_unsecure LIB64)
97-
add_thirdparty(gpr LIB64)
98-
add_thirdparty(upb LIB64)
99-
add_thirdparty(cares LIB64)
100-
add_thirdparty(address_sorting LIB64)
105+
add_thirdparty(grpc++_reflection SHARED LIB64)
106+
add_thirdparty(grpc SHARED LIB64)
107+
add_thirdparty(grpc++ SHARED LIB64)
108+
add_thirdparty(grpc++_unsecure SHARED LIB64)
109+
add_thirdparty(gpr SHARED LIB64)
110+
add_thirdparty(upb SHARED LIB64)
111+
add_thirdparty(cares SHARED LIB64)
112+
add_thirdparty(address_sorting SHARED LIB64)
101113
add_thirdparty(z LIB64)
102114

103115
add_thirdparty(brotlicommon LIB64)
@@ -108,9 +120,9 @@ add_thirdparty(arrow LIB64)
108120
add_thirdparty(arrow_flight LIB64)
109121
add_thirdparty(arrow_flight_sql LIB64)
110122
add_thirdparty(parquet LIB64)
111-
add_thirdparty(brpc LIB64)
123+
add_thirdparty(brpc SHARED LIB64)
112124
add_thirdparty(rocksdb)
113-
add_thirdparty(cyrus-sasl WHOLELIBPATH "/lib/x86_64-linux-gnu/libsasl2.so")
125+
add_thirdparty(cyrus-sasl SHARED LIBNAME "lib/libsasl2.so")
114126
# put this after lz4 to avoid using lz4 lib in librdkafka
115127
add_thirdparty(rdkafka_cpp LIBNAME "lib/librdkafka++.a")
116128
add_thirdparty(rdkafka)
@@ -121,10 +133,10 @@ add_thirdparty(fmt)
121133
add_thirdparty(cctz)
122134
add_thirdparty(base64)
123135

124-
add_thirdparty(aws-cpp-sdk-core LIB64)
125-
add_thirdparty(aws-cpp-sdk-s3 LIB64)
126-
add_thirdparty(aws-cpp-sdk-transfer LIB64)
127-
add_thirdparty(aws-cpp-sdk-s3-crt LIB64)
136+
add_thirdparty(aws-cpp-sdk-core SHARED LIB64)
137+
add_thirdparty(aws-cpp-sdk-s3 SHARED LIB64)
138+
add_thirdparty(aws-cpp-sdk-transfer SHARED LIB64)
139+
add_thirdparty(aws-cpp-sdk-s3-crt SHARED LIB64)
128140
add_thirdparty(aws-crt-cpp LIB64)
129141
add_thirdparty(aws-c-cal LIB64)
130142
add_thirdparty(aws-c-auth LIB64)
@@ -137,8 +149,8 @@ add_thirdparty(aws-c-mqtt LIB64)
137149
add_thirdparty(aws-checksums LIB64)
138150
add_thirdparty(aws-c-s3 LIB64)
139151
add_thirdparty(aws-c-sdkutils LIB64)
140-
add_thirdparty(aws-cpp-sdk-identity-management LIB64)
141-
add_thirdparty(aws-cpp-sdk-sts LIB64)
152+
add_thirdparty(aws-cpp-sdk-identity-management SHARED LIB64)
153+
add_thirdparty(aws-cpp-sdk-sts SHARED LIB64)
142154
if (NOT OS_MACOSX)
143155
add_thirdparty(aws-s2n LIBNAME "lib/libs2n.a")
144156
endif()
@@ -156,11 +168,11 @@ add_thirdparty(idn LIB64)
156168
add_thirdparty(xml2 LIB64)
157169
add_thirdparty(lzma LIB64)
158170
add_thirdparty(gsasl)
159-
add_thirdparty(krb5support WHOLELIBPATH "/lib/x86_64-linux-gnu/libkrb5support.so")
160-
add_thirdparty(krb5 WHOLELIBPATH "/lib/x86_64-linux-gnu/libkrb5.so")
161-
add_thirdparty(com_err WHOLELIBPATH "/lib/x86_64-linux-gnu/libcom_err.so")
162-
add_thirdparty(k5crypto WHOLELIBPATH "/lib/x86_64-linux-gnu/libk5crypto.so")
163-
add_thirdparty(gssapi_krb5 WHOLELIBPATH "/lib/x86_64-linux-gnu/libgssapi_krb5.so")
171+
add_thirdparty(krb5support SHARED)
172+
add_thirdparty(krb5 SHARED)
173+
add_thirdparty(com_err SHARED)
174+
add_thirdparty(k5crypto SHARED)
175+
add_thirdparty(gssapi_krb5 SHARED)
164176
add_thirdparty(streamvbyte LIB64)
165177

166178
if (OS_MACOSX)
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Created on: 2025-08-28
3+
* Author: bingtao.yin@transwarp.io
4+
*/
5+
6+
#include "pipeline/exec/argo_scan_operator.h"
7+
8+
#include "vec/exec/scan/argo_scanner.h"
9+
10+
namespace doris::pipeline {
11+
12+
Status ArgoScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* scanners) {
13+
auto& p = _parent->cast<ArgoScanOperatorX>();
14+
std::unique_ptr<vectorized::ArgoScanner> scanner =
15+
vectorized::ArgoScanner::create_unique(state(), this, p._limit, _scanner_profile.get());
16+
RETURN_IF_ERROR(scanner->prepare(state(), _conjuncts));
17+
scanners->push_back(std::move(scanner));
18+
return Status::OK();
19+
}
20+
21+
std::string ArgoScanLocalState::name_suffix() const {
22+
return fmt::format(" (id={}. nereids_id={}. argo table name= {})",
23+
std::to_string(_parent->node_id()), std::to_string(_parent->nereids_id()),
24+
_parent->cast<ArgoScanOperatorX>()._table_name);
25+
};
26+
27+
ArgoScanOperatorX::ArgoScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id,
28+
const DescriptorTbl& descs, int parallel_tasks)
29+
: ScanOperatorX<ArgoScanLocalState>(pool, tnode, operator_id, descs, parallel_tasks),
30+
_tuple_id(tnode.argo_scan_node.tuple_id),
31+
_table_name(tnode.argo_scan_node.table_name) {
32+
_output_tuple_id = tnode.argo_scan_node.tuple_id;
33+
}
34+
35+
}; // namespace doris::pipeline
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Created on: 2025-08-28
3+
* Author: bingtao.yin@transwarp.io
4+
*/
5+
6+
#pragma once
7+
8+
#include "common/status.h"
9+
#include "operator.h"
10+
#include "pipeline/exec/scan_operator.h"
11+
12+
namespace doris {}
13+
14+
namespace doris::pipeline {
15+
16+
class ArgoScanOperatorX;
17+
class ArgoScanLocalState final : public ScanLocalState<ArgoScanLocalState> {
18+
public:
19+
using Parent = ArgoScanOperatorX;
20+
ENABLE_FACTORY_CREATOR(ArgoScanLocalState);
21+
ArgoScanLocalState(RuntimeState* state, OperatorXBase* parent)
22+
: ScanLocalState<ArgoScanLocalState>(state, parent) {}
23+
24+
Status _init_scanners(std::list<vectorized::VScannerSPtr> *scanners) override;
25+
26+
std::string name_suffix() const override;
27+
};
28+
29+
class ArgoScanOperatorX final : public ScanOperatorX<ArgoScanLocalState> {
30+
public:
31+
ArgoScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id,
32+
const DescriptorTbl& descs, int parallel_tasks);
33+
private:
34+
friend class ArgoScanLocalState;
35+
36+
TupleId _tuple_id;
37+
std::string _table_name;
38+
39+
TupleDescriptor* _tuple_desc = nullptr;
40+
};
41+
42+
} // namespace doris::pipeline

be/src/pipeline/exec/operator.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
#include "pipeline/exec/aggregation_source_operator.h"
2424
#include "pipeline/exec/analytic_sink_operator.h"
2525
#include "pipeline/exec/analytic_source_operator.h"
26+
#include "pipeline/exec/argo_scan_operator.h"
2627
#include "pipeline/exec/assert_num_rows_operator.h"
2728
#include "pipeline/exec/cache_sink_operator.h"
2829
#include "pipeline/exec/cache_source_operator.h"
@@ -701,6 +702,7 @@ DECLARE_OPERATOR(GroupCommitLocalState)
701702
DECLARE_OPERATOR(JDBCScanLocalState)
702703
DECLARE_OPERATOR(FileScanLocalState)
703704
DECLARE_OPERATOR(EsScanLocalState)
705+
DECLARE_OPERATOR(ArgoScanLocalState)
704706
DECLARE_OPERATOR(AnalyticLocalState)
705707
DECLARE_OPERATOR(SortLocalState)
706708
DECLARE_OPERATOR(SpillSortLocalState)

be/src/pipeline/exec/scan_operator.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
#include <memory>
2424

2525
#include "pipeline/common/runtime_filter_consumer.h"
26+
#include "pipeline/exec/argo_scan_operator.h"
2627
#include "pipeline/exec/es_scan_operator.h"
2728
#include "pipeline/exec/file_scan_operator.h"
2829
#include "pipeline/exec/group_commit_scan_operator.h"
@@ -1322,6 +1323,8 @@ template class ScanOperatorX<EsScanLocalState>;
13221323
template class ScanLocalState<EsScanLocalState>;
13231324
template class ScanLocalState<MetaScanLocalState>;
13241325
template class ScanOperatorX<MetaScanLocalState>;
1326+
template class ScanLocalState<ArgoScanLocalState>;
1327+
template class ScanOperatorX<ArgoScanLocalState>;
13251328
template class ScanOperatorX<GroupCommitLocalState>;
13261329
template class ScanLocalState<GroupCommitLocalState>;
13271330

be/src/pipeline/pipeline_fragment_context.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
#include "pipeline/exec/aggregation_source_operator.h"
4444
#include "pipeline/exec/analytic_sink_operator.h"
4545
#include "pipeline/exec/analytic_source_operator.h"
46+
#include "pipeline/exec/argo_scan_operator.h"
4647
#include "pipeline/exec/assert_num_rows_operator.h"
4748
#include "pipeline/exec/cache_sink_operator.h"
4849
#include "pipeline/exec/cache_source_operator.h"
@@ -1220,6 +1221,13 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
12201221
fe_with_old_version = !tnode.__isset.is_serial_operator;
12211222
break;
12221223
}
1224+
case TPlanNodeType::ARGO_SCAN_NODE:{
1225+
op.reset(new ArgoScanOperatorX(pool, tnode, next_operator_id(), descs, _num_instances));
1226+
RETURN_IF_ERROR(cur_pipe->add_operator(
1227+
op, request.__isset.parallel_instances ? request.parallel_instances : 0));
1228+
fe_with_old_version = !tnode.__isset.is_serial_operator;
1229+
break;
1230+
}
12231231
case TPlanNodeType::EXCHANGE_NODE: {
12241232
int num_senders = find_with_default(request.per_exch_num_senders, tnode.node_id, 0);
12251233
DCHECK_GT(num_senders, 0);

be/src/runtime/descriptors.cpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -305,6 +305,13 @@ std::string JdbcTableDescriptor::debug_string() const {
305305
return fmt::to_string(buf);
306306
}
307307

308+
ArgoTableDescriptor::ArgoTableDescriptor(const TTableDescriptor& tdesc) : TableDescriptor(tdesc) {}
309+
std::string ArgoTableDescriptor::debug_string() const {
310+
fmt::memory_buffer buf;
311+
fmt::format_to(buf, "ArgoTable({})", TableDescriptor::debug_string());
312+
return fmt::to_string(buf);
313+
}
314+
308315
TupleDescriptor::TupleDescriptor(const TTupleDescriptor& tdesc, bool own_slots)
309316
: _id(tdesc.id),
310317
_num_materialized_slots(0),
@@ -577,6 +584,9 @@ Status DescriptorTbl::create(ObjectPool* pool, const TDescriptorTable& thrift_tb
577584
case TTableType::TRINO_CONNECTOR_TABLE:
578585
desc = pool->add(new TrinoConnectorTableDescriptor(tdesc));
579586
break;
587+
case TTableType::ARGO_TABLE:
588+
desc = pool->add(new ArgoTableDescriptor(tdesc));
589+
break;
580590
default:
581591
DCHECK(false) << "invalid table type: " << tdesc.tableType;
582592
}

be/src/runtime/descriptors.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -333,6 +333,12 @@ class JdbcTableDescriptor : public TableDescriptor {
333333
bool _connection_pool_keep_alive;
334334
};
335335

336+
class ArgoTableDescriptor : public TableDescriptor {
337+
public:
338+
ArgoTableDescriptor(const TTableDescriptor& tdesc);
339+
std::string debug_string() const override;
340+
};
341+
336342
class TupleDescriptor {
337343
public:
338344
TupleDescriptor(TupleDescriptor&&) = delete;

be/src/util/stack_util.cpp

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,11 @@
2727
#include "util/mem_info.h"
2828
#include "util/pretty_printer.h"
2929

30-
namespace google {
31-
namespace glog_internal_namespace_ {
32-
void DumpStackTraceToString(std::string* stacktrace);
33-
}
34-
} // namespace google
30+
// namespace google {
31+
// namespace glog_internal_namespace_ {
32+
// void DumpStackTraceToString(std::string* stacktrace);
33+
// }
34+
// } // namespace google
3535

3636
namespace doris {
3737

@@ -67,7 +67,7 @@ std::string get_stack_trace(int start_pointers_index, std::string dwarf_location
6767

6868
std::string get_stack_trace_by_glog() {
6969
std::string s;
70-
google::glog_internal_namespace_::DumpStackTraceToString(&s);
70+
// google::glog_internal_namespace_::DumpStackTraceToString(&s);
7171
return s;
7272
}
7373

0 commit comments

Comments
 (0)