Skip to content

Commit c9c43f9

Browse files
author
bingtao.yin
committed
add argo catalog
1 parent 15f570d commit c9c43f9

40 files changed

+1016
-12
lines changed
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Created on: 2025-08-28
3+
* Author: bingtao.yin@transwarp.io
4+
*/
5+
6+
#include "pipeline/exec/argo_scan_operator.h"
7+
8+
#include "vec/exec/scan/argo_scanner.h"
9+
10+
namespace doris::pipeline {
11+
12+
Status ArgoScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* scanners) {
13+
auto& p = _parent->cast<ArgoScanOperatorX>();
14+
std::unique_ptr<vectorized::ArgoScanner> scanner =
15+
vectorized::ArgoScanner::create_unique(state(), this, p._limit, _scanner_profile.get());
16+
RETURN_IF_ERROR(scanner->prepare(state(), _conjuncts));
17+
scanners->push_back(std::move(scanner));
18+
return Status::OK();
19+
}
20+
21+
std::string ArgoScanLocalState::name_suffix() const {
22+
return fmt::format(" (id={}. nereids_id={}. argo table name= {})",
23+
std::to_string(_parent->node_id()), std::to_string(_parent->nereids_id()),
24+
_parent->cast<ArgoScanOperatorX>()._table_name);
25+
};
26+
27+
ArgoScanOperatorX::ArgoScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id,
28+
const DescriptorTbl& descs, int parallel_tasks)
29+
: ScanOperatorX<ArgoScanLocalState>(pool, tnode, operator_id, descs, parallel_tasks),
30+
_tuple_id(tnode.argo_scan_node.tuple_id),
31+
_table_name(tnode.argo_scan_node.table_name) {
32+
_output_tuple_id = tnode.argo_scan_node.tuple_id;
33+
}
34+
35+
}; // namespace doris::pipeline
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Created on: 2025-08-28
3+
* Author: bingtao.yin@transwarp.io
4+
*/
5+
6+
#pragma once
7+
8+
#include "common/status.h"
9+
#include "operator.h"
10+
#include "pipeline/exec/scan_operator.h"
11+
12+
namespace doris {}
13+
14+
namespace doris::pipeline {
15+
16+
class ArgoScanOperatorX;
17+
class ArgoScanLocalState final : public ScanLocalState<ArgoScanLocalState> {
18+
public:
19+
using Parent = ArgoScanOperatorX;
20+
ENABLE_FACTORY_CREATOR(ArgoScanLocalState);
21+
ArgoScanLocalState(RuntimeState* state, OperatorXBase* parent)
22+
: ScanLocalState<ArgoScanLocalState>(state, parent) {}
23+
24+
Status _init_scanners(std::list<vectorized::VScannerSPtr> *scanners) override;
25+
26+
std::string name_suffix() const override;
27+
};
28+
29+
class ArgoScanOperatorX final : public ScanOperatorX<ArgoScanLocalState> {
30+
public:
31+
ArgoScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id,
32+
const DescriptorTbl& descs, int parallel_tasks);
33+
private:
34+
friend class ArgoScanLocalState;
35+
36+
TupleId _tuple_id;
37+
std::string _table_name;
38+
39+
TupleDescriptor* _tuple_desc = nullptr;
40+
};
41+
42+
} // namespace doris::pipeline

be/src/pipeline/exec/operator.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
#include "pipeline/exec/aggregation_source_operator.h"
2424
#include "pipeline/exec/analytic_sink_operator.h"
2525
#include "pipeline/exec/analytic_source_operator.h"
26+
#include "pipeline/exec/argo_scan_operator.h"
2627
#include "pipeline/exec/assert_num_rows_operator.h"
2728
#include "pipeline/exec/cache_sink_operator.h"
2829
#include "pipeline/exec/cache_source_operator.h"
@@ -701,6 +702,7 @@ DECLARE_OPERATOR(GroupCommitLocalState)
701702
DECLARE_OPERATOR(JDBCScanLocalState)
702703
DECLARE_OPERATOR(FileScanLocalState)
703704
DECLARE_OPERATOR(EsScanLocalState)
705+
DECLARE_OPERATOR(ArgoScanLocalState)
704706
DECLARE_OPERATOR(AnalyticLocalState)
705707
DECLARE_OPERATOR(SortLocalState)
706708
DECLARE_OPERATOR(SpillSortLocalState)

be/src/pipeline/exec/scan_operator.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
#include <memory>
2424

2525
#include "pipeline/common/runtime_filter_consumer.h"
26+
#include "pipeline/exec/argo_scan_operator.h"
2627
#include "pipeline/exec/es_scan_operator.h"
2728
#include "pipeline/exec/file_scan_operator.h"
2829
#include "pipeline/exec/group_commit_scan_operator.h"
@@ -1322,6 +1323,8 @@ template class ScanOperatorX<EsScanLocalState>;
13221323
template class ScanLocalState<EsScanLocalState>;
13231324
template class ScanLocalState<MetaScanLocalState>;
13241325
template class ScanOperatorX<MetaScanLocalState>;
1326+
template class ScanLocalState<ArgoScanLocalState>;
1327+
template class ScanOperatorX<ArgoScanLocalState>;
13251328
template class ScanOperatorX<GroupCommitLocalState>;
13261329
template class ScanLocalState<GroupCommitLocalState>;
13271330

be/src/pipeline/pipeline_fragment_context.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
#include "pipeline/exec/aggregation_source_operator.h"
4444
#include "pipeline/exec/analytic_sink_operator.h"
4545
#include "pipeline/exec/analytic_source_operator.h"
46+
#include "pipeline/exec/argo_scan_operator.h"
4647
#include "pipeline/exec/assert_num_rows_operator.h"
4748
#include "pipeline/exec/cache_sink_operator.h"
4849
#include "pipeline/exec/cache_source_operator.h"
@@ -1220,6 +1221,13 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
12201221
fe_with_old_version = !tnode.__isset.is_serial_operator;
12211222
break;
12221223
}
1224+
case TPlanNodeType::ARGO_SCAN_NODE:{
1225+
op.reset(new ArgoScanOperatorX(pool, tnode, next_operator_id(), descs, _num_instances));
1226+
RETURN_IF_ERROR(cur_pipe->add_operator(
1227+
op, request.__isset.parallel_instances ? request.parallel_instances : 0));
1228+
fe_with_old_version = !tnode.__isset.is_serial_operator;
1229+
break;
1230+
}
12231231
case TPlanNodeType::EXCHANGE_NODE: {
12241232
int num_senders = find_with_default(request.per_exch_num_senders, tnode.node_id, 0);
12251233
DCHECK_GT(num_senders, 0);

be/src/runtime/descriptors.cpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -305,6 +305,13 @@ std::string JdbcTableDescriptor::debug_string() const {
305305
return fmt::to_string(buf);
306306
}
307307

308+
ArgoTableDescriptor::ArgoTableDescriptor(const TTableDescriptor& tdesc) : TableDescriptor(tdesc) {}
309+
std::string ArgoTableDescriptor::debug_string() const {
310+
fmt::memory_buffer buf;
311+
fmt::format_to(buf, "ArgoTable({})", TableDescriptor::debug_string());
312+
return fmt::to_string(buf);
313+
}
314+
308315
TupleDescriptor::TupleDescriptor(const TTupleDescriptor& tdesc, bool own_slots)
309316
: _id(tdesc.id),
310317
_num_materialized_slots(0),
@@ -577,6 +584,9 @@ Status DescriptorTbl::create(ObjectPool* pool, const TDescriptorTable& thrift_tb
577584
case TTableType::TRINO_CONNECTOR_TABLE:
578585
desc = pool->add(new TrinoConnectorTableDescriptor(tdesc));
579586
break;
587+
case TTableType::ARGO_TABLE:
588+
desc = pool->add(new ArgoTableDescriptor(tdesc));
589+
break;
580590
default:
581591
DCHECK(false) << "invalid table type: " << tdesc.tableType;
582592
}

be/src/runtime/descriptors.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -333,6 +333,12 @@ class JdbcTableDescriptor : public TableDescriptor {
333333
bool _connection_pool_keep_alive;
334334
};
335335

336+
class ArgoTableDescriptor : public TableDescriptor {
337+
public:
338+
ArgoTableDescriptor(const TTableDescriptor& tdesc);
339+
std::string debug_string() const override;
340+
};
341+
336342
class TupleDescriptor {
337343
public:
338344
TupleDescriptor(TupleDescriptor&&) = delete;
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Created on: 2025-08-28
3+
* Author: bingtao.yin@transwarp.io
4+
*/
5+
6+
#include "vec/exec/scan/argo_scanner.h"
7+
8+
#include "common/logging.h"
9+
#include "common/status.h"
10+
11+
namespace doris::vectorized {
12+
13+
ArgoScanner::ArgoScanner(RuntimeState* state, doris::pipeline::ArgoScanLocalState* local_state,
14+
int64_t limit, RuntimeProfile* profile)
15+
: VScanner(state, local_state, limit, profile) {}
16+
17+
Status ArgoScanner::open(RuntimeState* state) {
18+
VLOG_CRITICAL << "ArgoScanner::open";
19+
RETURN_IF_CANCELLED(state);
20+
RETURN_IF_ERROR(VScanner::open(state));
21+
return Status::OK();
22+
}
23+
24+
Status ArgoScanner::close(RuntimeState* state) {
25+
VLOG_CRITICAL << "ArgoScanner::close";
26+
RETURN_IF_ERROR(VScanner::close(state));
27+
return Status::OK();
28+
}
29+
30+
Status ArgoScanner::prepare(RuntimeState* state, const VExprContextSPtrs& conjuncts) {
31+
VLOG_CRITICAL << "ArgoScanner::prepare";
32+
RETURN_IF_ERROR(VScanner::prepare(state, conjuncts));
33+
// TODO: tuple descriptor
34+
return Status::OK();
35+
}
36+
37+
Status ArgoScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eos) {
38+
*eos = true;
39+
return Status::OK();
40+
}
41+
42+
} // namespace doris::vectorized
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Created on: 2025-08-28
3+
* Author: bingtao.yin@transwarp.io
4+
*/
5+
6+
#pragma once
7+
8+
#include "pipeline/exec/argo_scan_operator.h"
9+
#include "vec/exec/scan/vscanner.h"
10+
11+
namespace doris::vectorized {
12+
13+
class ArgoScanner : public VScanner {
14+
ENABLE_FACTORY_CREATOR(ArgoScanner);
15+
16+
public:
17+
ArgoScanner(RuntimeState* state, doris::pipeline::ArgoScanLocalState* parent, int64_t limit,
18+
RuntimeProfile* profile);
19+
Status open(RuntimeState* state) override;
20+
Status close(RuntimeState* state) override;
21+
22+
Status prepare(RuntimeState* state, const VExprContextSPtrs& conjuncts) override;
23+
24+
protected:
25+
Status _get_block_impl(RuntimeState* state, Block* block, bool* eos) override;
26+
};
27+
28+
} // namespace doris::vectorized

fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ public class JdbcResource extends Resource {
7979
public static final String JDBC_OCEANBASE = "jdbc:oceanbase";
8080
public static final String JDBC_DB2 = "jdbc:db2";
8181
public static final String JDBC_GBASE = "jdbc:gbase";
82+
public static final String JDBC_ARGO = "jdbc:transwarp2";
8283

8384
public static final String MYSQL = "MYSQL";
8485
public static final String POSTGRESQL = "POSTGRESQL";
@@ -92,6 +93,7 @@ public class JdbcResource extends Resource {
9293
public static final String OCEANBASE_ORACLE = "OCEANBASE_ORACLE";
9394
public static final String DB2 = "DB2";
9495
public static final String GBASE = "GBASE";
96+
public static final String ARGO = "ARGO";
9597

9698
public static final String JDBC_PROPERTIES_PREFIX = "jdbc.";
9799
public static final String JDBC_URL = "jdbc_url";
@@ -342,6 +344,8 @@ public static String parseDbType(String url) throws DdlException {
342344
return DB2;
343345
} else if (url.startsWith(JDBC_GBASE)) {
344346
return GBASE;
347+
} else if (url.startsWith(JDBC_ARGO)) {
348+
return ARGO;
345349
}
346350
throw new DdlException("Unsupported jdbc database type, please check jdbcUrl: " + url);
347351
}

0 commit comments

Comments
 (0)