Skip to content

Commit 34d7a0a

Browse files
author
bingtao.yin
committed
add argo catalog
1 parent 15f570d commit 34d7a0a

28 files changed

+793
-10
lines changed

fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ public class JdbcResource extends Resource {
7979
public static final String JDBC_OCEANBASE = "jdbc:oceanbase";
8080
public static final String JDBC_DB2 = "jdbc:db2";
8181
public static final String JDBC_GBASE = "jdbc:gbase";
82+
public static final String JDBC_ARGO = "jdbc:transwarp2";
8283

8384
public static final String MYSQL = "MYSQL";
8485
public static final String POSTGRESQL = "POSTGRESQL";
@@ -92,6 +93,7 @@ public class JdbcResource extends Resource {
9293
public static final String OCEANBASE_ORACLE = "OCEANBASE_ORACLE";
9394
public static final String DB2 = "DB2";
9495
public static final String GBASE = "GBASE";
96+
public static final String ARGO = "ARGO";
9597

9698
public static final String JDBC_PROPERTIES_PREFIX = "jdbc.";
9799
public static final String JDBC_URL = "jdbc_url";
@@ -342,6 +344,8 @@ public static String parseDbType(String url) throws DdlException {
342344
return DB2;
343345
} else if (url.startsWith(JDBC_GBASE)) {
344346
return GBASE;
347+
} else if (url.startsWith(JDBC_ARGO)) {
348+
return ARGO;
345349
}
346350
throw new DdlException("Unsupported jdbc database type, please check jdbcUrl: " + url);
347351
}

fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -387,7 +387,7 @@ enum TableType {
387387
@Deprecated ICEBERG, @Deprecated HUDI, JDBC,
388388
TABLE_VALUED_FUNCTION, HMS_EXTERNAL_TABLE, ES_EXTERNAL_TABLE, MATERIALIZED_VIEW, JDBC_EXTERNAL_TABLE,
389389
ICEBERG_EXTERNAL_TABLE, TEST_EXTERNAL_TABLE, PAIMON_EXTERNAL_TABLE, MAX_COMPUTE_EXTERNAL_TABLE,
390-
HUDI_EXTERNAL_TABLE, TRINO_CONNECTOR_EXTERNAL_TABLE, LAKESOUl_EXTERNAL_TABLE;
390+
HUDI_EXTERNAL_TABLE, TRINO_CONNECTOR_EXTERNAL_TABLE, LAKESOUl_EXTERNAL_TABLE, ARGO_EXTERNAL_TABLE;
391391

392392
public String toEngineName() {
393393
switch (this) {
@@ -424,6 +424,8 @@ public String toEngineName() {
424424
case ICEBERG:
425425
case ICEBERG_EXTERNAL_TABLE:
426426
return "iceberg";
427+
case ARGO_EXTERNAL_TABLE:
428+
return "argo";
427429
default:
428430
return null;
429431
}
@@ -462,6 +464,7 @@ public String toMysqlType() {
462464
case PAIMON_EXTERNAL_TABLE:
463465
case MATERIALIZED_VIEW:
464466
case TRINO_CONNECTOR_EXTERNAL_TABLE:
467+
case ARGO_EXTERNAL_TABLE:
465468
return "BASE TABLE";
466469
default:
467470
return null;

fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogFactory.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.doris.catalog.Resource;
2929
import org.apache.doris.common.DdlException;
3030
import org.apache.doris.common.FeConstants;
31+
import org.apache.doris.datasource.argo.ArgoExternalCatalog;
3132
import org.apache.doris.datasource.es.EsExternalCatalog;
3233
import org.apache.doris.datasource.hive.HMSExternalCatalog;
3334
import org.apache.doris.datasource.iceberg.IcebergExternalCatalogFactory;
@@ -147,6 +148,9 @@ private static CatalogIf createCatalog(long catalogId, String name, String resou
147148
}
148149
catalog = new TestExternalCatalog(catalogId, name, resource, props, comment);
149150
break;
151+
case "argo":
152+
catalog = new ArgoExternalCatalog(catalogId, name, resource, props, comment);
153+
break;
150154
default:
151155
throw new DdlException("Unknown catalog type: " + catalogType);
152156
}

fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
4343
import org.apache.doris.common.util.Util;
4444
import org.apache.doris.datasource.ExternalSchemaCache.SchemaCacheKey;
45+
import org.apache.doris.datasource.argo.ArgoExternalDatabase;
4546
import org.apache.doris.datasource.es.EsExternalDatabase;
4647
import org.apache.doris.datasource.hive.HMSExternalCatalog;
4748
import org.apache.doris.datasource.hive.HMSExternalDatabase;
@@ -928,6 +929,8 @@ protected ExternalDatabase<? extends ExternalTable> buildDbForInit(String remote
928929
return new PaimonExternalDatabase(this, dbId, localDbName, remoteDbName);
929930
case TRINO_CONNECTOR:
930931
return new TrinoConnectorExternalDatabase(this, dbId, localDbName, remoteDbName);
932+
case ARGO:
933+
return new ArgoExternalDatabase(this, dbId, localDbName, remoteDbName);
931934
default:
932935
break;
933936
}

fe/fe-core/src/main/java/org/apache/doris/datasource/InitCatalogLog.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ public enum Type {
4343
LAKESOUL,
4444
TEST,
4545
TRINO_CONNECTOR,
46+
ARGO,
4647
UNKNOWN;
4748
}
4849

fe/fe-core/src/main/java/org/apache/doris/datasource/InitDatabaseLog.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ public enum Type {
4444
TEST,
4545
INFO_SCHEMA_DB,
4646
TRINO_CONNECTOR,
47+
ARGO,
4748
UNKNOWN;
4849
}
4950

Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
package org.apache.doris.datasource.argo;
2+
3+
import org.apache.doris.common.DdlException;
4+
import org.apache.doris.datasource.CatalogProperty;
5+
import org.apache.doris.datasource.ExternalCatalog;
6+
import org.apache.doris.datasource.InitCatalogLog;
7+
import org.apache.doris.datasource.SessionContext;
8+
import org.apache.doris.datasource.argo.util.Auth;
9+
import org.apache.doris.datasource.jdbc.client.JdbcArgoClient;
10+
import org.apache.doris.datasource.jdbc.client.JdbcClientConfig;
11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
13+
14+
import java.io.File;
15+
import java.util.List;
16+
import java.util.Locale;
17+
import java.util.Map;
18+
19+
/**
20+
* Created by bingtao.yin@transwarp.io on 2025/8/26.
21+
*/
22+
public class ArgoExternalCatalog extends ExternalCatalog {
23+
24+
private static final Logger log = LogManager.getLogger(ArgoExternalCatalog.class);
25+
private static final String ARGO_DIR = System.getenv("DORIS_HOME") + "/argo";
26+
27+
public ArgoExternalCatalog(long catalogId, String name, String resource, Map<String, String> props, String comment) {
28+
super(catalogId, name, InitCatalogLog.Type.ARGO, comment);
29+
30+
this.catalogProperty = new CatalogProperty(resource, props);
31+
}
32+
33+
@Override
34+
public void checkProperties() throws DdlException {
35+
super.checkProperties();
36+
final Map<String, String> properties = catalogProperty.getProperties();
37+
if (!properties.containsKey("jdbc.url")) {
38+
throw new DdlException("[jdbc.url] is required");
39+
}
40+
if (!properties.containsKey("driver.url")) {
41+
throw new DdlException("[driver.url] is required");
42+
}
43+
44+
String authType = catalogProperty.getOrDefault("auth.type", "plain").toLowerCase(Locale.ROOT);
45+
switch (authType) {
46+
case "kerberos":
47+
if (!properties.containsKey("auth.kerberos.principal")) {
48+
throw new DdlException("For kerberos authentication, [auth.kerberos.principal] is required");
49+
}
50+
if (!properties.containsKey("auth.kerberos.kuser")) {
51+
throw new DdlException("For kerberos authentication, [auth.kerberos.kuser] is required");
52+
}
53+
if (!properties.containsKey("auth.kerberos.keytab")) {
54+
throw new DdlException("For kerberos authentication, [auth.kerberos.keytab] is required");
55+
}
56+
break;
57+
default:
58+
break;
59+
}
60+
}
61+
62+
@Override
63+
public void setDefaultPropsIfMissing(boolean isReplay) {
64+
super.setDefaultPropsIfMissing(isReplay);
65+
}
66+
67+
@Override
68+
public void onClose() {
69+
if (metadataOps != null) {
70+
metadataOps.close();
71+
metadataOps = null;
72+
}
73+
}
74+
75+
@Override
76+
public List<String> listTableNames(SessionContext ctx, String dbName) {
77+
return getMetadataOps().listTableNames(dbName);
78+
}
79+
80+
@Override
81+
public boolean tableExist(SessionContext ctx, String dbName, String tblName) {
82+
return getMetadataOps().tableExist(dbName, tblName);
83+
}
84+
85+
@Override
86+
protected void initLocalObjectsImpl() {
87+
final Map<String, String> properties = catalogProperty.getProperties();
88+
JdbcClientConfig jdbcClientConfig = new JdbcClientConfig();
89+
jdbcClientConfig.setCatalog(name).setUser("").setPassword("")
90+
.setDriverClass("org.apache.hive.jdbc.HiveDriver")
91+
.setDriverUrl(properties.get("driver.url"));
92+
93+
String authType = catalogProperty.getOrDefault("auth.type", "plain").toLowerCase(Locale.ROOT);
94+
Auth auth = null;
95+
switch (authType) {
96+
case "plain":
97+
auth = new Auth.PlainAuth();
98+
break;
99+
case "ldap": {
100+
final String username = catalogProperty.getOrDefault("auth.plain.username", "");
101+
final String password = catalogProperty.getOrDefault("auth.plain.password", "");
102+
103+
Auth.LdapAuth ldapAuth = new Auth.LdapAuth();
104+
ldapAuth.setUsername(username);
105+
ldapAuth.setPassword(password);
106+
auth = ldapAuth;
107+
108+
jdbcClientConfig.setUser(username).setPassword(password);
109+
}
110+
break;
111+
case "kerberos": {
112+
Auth.KerberosAuth kerberosAuth = new Auth.KerberosAuth();
113+
kerberosAuth.setPrincipal(properties.get("auth.kerberos.principal"));
114+
kerberosAuth.setKuser(properties.get("auth.kerberos.kuser"));
115+
String keytab = properties.get("auth.kerberos.keytab");
116+
String keytabAbs = ARGO_DIR + "/" + keytab;
117+
if (!new File(keytabAbs).exists()) {
118+
throw new RuntimeException("keytab file does not exist: " + keytabAbs);
119+
}
120+
kerberosAuth.setKeytab(keytabAbs);
121+
122+
String krb5Conf = catalogProperty.getOrDefault("auth.kerberos.krb5conf", "krb5.conf");
123+
final String krb5ConfAbs = ARGO_DIR + "/" + krb5Conf;
124+
if (!new File(krb5ConfAbs).exists()) {
125+
throw new RuntimeException("krb5conf file does not exist: " + krb5ConfAbs);
126+
}
127+
kerberosAuth.setKrb5conf(krb5ConfAbs);
128+
if (System.getProperty("java.security.krb5.conf") != null) {
129+
log.warn("java.security.krb5.conf is already set to {}, it will be overridden to {}",
130+
System.getProperty("java.security.krb5.conf"), krb5ConfAbs);
131+
}
132+
// TODO: also set -Dsun.security.krb5.debug=true?
133+
System.setProperty("java.security.krb5.conf", krb5ConfAbs);
134+
135+
auth = kerberosAuth;
136+
}
137+
break;
138+
default:
139+
throw new RuntimeException("Unknown auth type: " + authType);
140+
}
141+
142+
jdbcClientConfig.setJdbcUrl(auth.getJdbcUrl(properties.get("jdbc.url")));
143+
metadataOps = new ArgoMetadataOps(new JdbcArgoClient(jdbcClientConfig));
144+
}
145+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package org.apache.doris.datasource.argo;
2+
3+
import org.apache.doris.datasource.ExternalCatalog;
4+
import org.apache.doris.datasource.ExternalDatabase;
5+
import org.apache.doris.datasource.InitDatabaseLog;
6+
import org.apache.logging.log4j.LogManager;
7+
import org.apache.logging.log4j.Logger;
8+
9+
/**
10+
* Created by bingtao.yin@transwarp.io on 2025/8/26.
11+
*/
12+
public class ArgoExternalDatabase extends ExternalDatabase<ArgoExternalTable> {
13+
14+
private static final Logger log = LogManager.getLogger(ArgoExternalDatabase.class);
15+
16+
/**
17+
* Create external database.
18+
*
19+
* @param extCatalog The catalog this database belongs to.
20+
* @param id Database id.
21+
* @param name Database name.
22+
* @param remoteName Remote database name.
23+
*/
24+
public ArgoExternalDatabase(ExternalCatalog extCatalog, long id, String name, String remoteName) {
25+
super(extCatalog, id, name, remoteName, InitDatabaseLog.Type.ARGO);
26+
}
27+
28+
@Override
29+
protected ArgoExternalTable buildTableInternal(String remoteTableName, String localTableName, long tblId, ExternalCatalog catalog, ExternalDatabase db) {
30+
return new ArgoExternalTable(tblId, localTableName, remoteTableName, (ArgoExternalCatalog) extCatalog, (ArgoExternalDatabase) db);
31+
}
32+
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package org.apache.doris.datasource.argo;
2+
3+
import org.apache.doris.catalog.Column;
4+
import org.apache.doris.datasource.ExternalCatalog;
5+
import org.apache.doris.datasource.ExternalDatabase;
6+
import org.apache.doris.datasource.ExternalTable;
7+
import org.apache.doris.datasource.SchemaCacheValue;
8+
import org.apache.doris.datasource.operations.ExternalMetadataOps;
9+
import org.apache.logging.log4j.LogManager;
10+
import org.apache.logging.log4j.Logger;
11+
12+
import java.util.List;
13+
import java.util.Optional;
14+
15+
/**
16+
* Created by bingtao.yin@transwarp.io on 2025/8/26.
17+
*/
18+
public class ArgoExternalTable extends ExternalTable {
19+
20+
private static final Logger log = LogManager.getLogger(ArgoExternalTable.class);
21+
22+
public ArgoExternalTable() {
23+
}
24+
25+
public ArgoExternalTable(long id, String name, String remoteName, ArgoExternalCatalog catalog, ArgoExternalDatabase db) {
26+
super(id, name, remoteName, catalog, db, TableType.ARGO_EXTERNAL_TABLE);
27+
}
28+
29+
@Override
30+
public Optional<SchemaCacheValue> initSchema() {
31+
ArgoExternalCatalog catalog = (ArgoExternalCatalog) getCatalog();
32+
ArgoMetadataOps metadataOps = (ArgoMetadataOps) catalog.getMetadataOps();
33+
34+
List<Column> columns = metadataOps.describeTable(dbName, name);
35+
return Optional.of(new ArgoSchemaCacheValue(columns));
36+
}
37+
}

0 commit comments

Comments
 (0)