Skip to content

Commit 4990d23

Browse files
committed
Java:MultiDataSource 为 APIJSON 和 SQLAuto 新增支持 Cassandra, Snowflake, Databricks
1 parent 438406f commit 4990d23

File tree

6 files changed

+161
-62
lines changed

6 files changed

+161
-62
lines changed

APIJSON-Java-Server/APIJSONBoot-MultiDataSource/pom.xml

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,21 @@
116116
<artifactId>influxdb-java</artifactId>
117117
<version>2.23</version>
118118
</dependency>
119+
<dependency>
120+
<groupId>net.snowflake</groupId>
121+
<artifactId>snowflake-jdbc</artifactId>
122+
<version>3.0.1</version>
123+
</dependency>
124+
<dependency>
125+
<groupId>com.datastax.oss</groupId>
126+
<artifactId>java-driver-core</artifactId>
127+
<version>4.15.0</version>
128+
</dependency>
129+
<dependency>
130+
<groupId>com.databricks</groupId>
131+
<artifactId>databricks-jdbc</artifactId>
132+
<version>2.6.25-1</version>
133+
</dependency>
119134
<!-- Oracle, SQLServer 等其它数据库的 JDBC 驱动,可以在这里加上 Maven 依赖或 libs 目录放 Jar 包并依赖 -->
120135
<!-- 数据库 JDBC 驱动 >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> -->
121136

APIJSON-Java-Server/APIJSONBoot-MultiDataSource/src/main/java/apijson/boot/DemoApplication.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ public static void main(String[] args) throws Exception {
100100
// SpringBoot 2.x 自定义端口方式
101101
@Override
102102
public void customize(ConfigurableServletWebServerFactory server) {
103-
server.setPort(8080);
103+
server.setPort(9090);
104104
}
105105

106106
// 支持 APIAuto 中 JavaScript 代码跨域请求
@@ -171,6 +171,26 @@ public void addCorsMappings(CorsRegistry registry) {
171171
Log.e(TAG, "加载 Dameng 驱动失败,请检查 pom.xml 中 com.dameng 版本是否存在以及可用 !!!");
172172
}
173173

174+
try { //加载驱动程序
175+
Log.d(TAG, "尝试加载 Snowflake 驱动 <<<<<<<<<<<<<<<<<<<<< ");
176+
Class.forName("net.snowflake.client.jdbc.SnowflakeDriver");
177+
Log.d(TAG, "成功加载 Snowflake 驱动!>>>>>>>>>>>>>>>>>>>>> ");
178+
}
179+
catch (ClassNotFoundException e) {
180+
e.printStackTrace();
181+
Log.e(TAG, "加载 Snowflake 驱动失败,请检查 pom.xml 中 net.snowflake 版本是否存在以及可用 !!!");
182+
}
183+
184+
try { //加载驱动程序
185+
Log.d(TAG, "尝试加载 Databricks 驱动 <<<<<<<<<<<<<<<<<<<<< ");
186+
Class.forName("com.databricks.client.jdbc.Driver");
187+
Log.d(TAG, "成功加载 Databricks 驱动!>>>>>>>>>>>>>>>>>>>>> ");
188+
}
189+
catch (ClassNotFoundException e) {
190+
e.printStackTrace();
191+
Log.e(TAG, "加载 Databricks 驱动失败,请检查 pom.xml 中 com.databricks 版本是否存在以及可用 !!!");
192+
}
193+
174194
// try { //加载驱动程序
175195
// Log.d(TAG, "尝试加载 TDengine 驱动 <<<<<<<<<<<<<<<<<<<<< ");
176196
// Class.forName("com.taosdata.jdbc.TSDBDriver");

APIJSON-Java-Server/APIJSONBoot-MultiDataSource/src/main/java/apijson/boot/DemoController.java

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1373,12 +1373,16 @@ public String execute(@RequestBody String request, HttpSession session) {
13731373
JSONObject req = JSON.parseObject(request);
13741374
String sql = req == null ? null : req.getString("sql");
13751375
if (StringUtil.isEmpty(sql)) {
1376-
throw new IllegalArgumentException("SQL 不能为空!");
1376+
throw new IllegalArgumentException("sql 不能为空!");
13771377
}
13781378

13791379
String database = req.getString("database");
1380-
String schema = req.getString("schema");
13811380
String uri = req.getString("uri");
1381+
if (StringUtil.isEmpty(database, true) && StringUtil.isEmpty(uri, true)) {
1382+
throw new NullPointerException("database 和 uri 不能同时为空!至少其中一个要传有效值!");
1383+
}
1384+
String schema = req.getString("schema");
1385+
13821386
String account = req.getString("account");
13831387
String password = req.getString("password");
13841388
JSONArray arg = req.getJSONArray("args");
@@ -1392,7 +1396,7 @@ public String execute(@RequestBody String request, HttpSession session) {
13921396
prefix = prefix.substring(mid + 1);
13931397
}
13941398

1395-
if (DemoSQLConfig.DATABASE_INFLUXDB.equalsIgnoreCase(prefix)) {
1399+
if (DemoSQLConfig.DATABASE_INFLUXDB.equalsIgnoreCase(prefix) || DemoSQLExecutor.DATABASE_CASSANDRA.equalsIgnoreCase(prefix)) {
13961400
database = prefix.toUpperCase();
13971401

13981402
int end = uri.lastIndexOf("/");
@@ -1403,10 +1407,16 @@ public String execute(@RequestBody String request, HttpSession session) {
14031407
uri = uri.substring(0, end);
14041408
}
14051409

1406-
uri = "http" + uri.substring(start);
1410+
// if (DemoSQLConfig.DATABASE_INFLUXDB.equalsIgnoreCase(prefix)) {
1411+
uri = "http" + uri.substring(start);
1412+
// account = "root";
1413+
// password = "apijson@123";
1414+
// }
14071415

1408-
// account = "root";
1409-
// password = "apijson@123";
1416+
if (DemoSQLExecutor.DATABASE_CASSANDRA.equalsIgnoreCase(prefix)) { // unknown protocol: jdbc
1417+
account = "test"; // "peter";
1418+
password = "test"; // null;
1419+
}
14101420
} else if (DemoSQLExecutor.DATABASE_NEBULA.equalsIgnoreCase(prefix)) {
14111421
database = prefix.toUpperCase();
14121422
}
@@ -1440,7 +1450,7 @@ public String execute(@RequestBody String request, HttpSession session) {
14401450
long rsDuration = 0;
14411451
long executeStartTime = System.currentTimeMillis();
14421452

1443-
if (DemoSQLConfig.DATABASE_INFLUXDB.equals(database)) {
1453+
if (DemoSQLConfig.DATABASE_INFLUXDB.equals(database) || DemoSQLExecutor.DATABASE_CASSANDRA.equals(database)) {
14441454
JSONObject result = executor.execute(config, false);
14451455
executeDuration = System.currentTimeMillis() - executeStartTime;
14461456

APIJSON-Java-Server/APIJSONBoot-MultiDataSource/src/main/java/apijson/demo/DemoFunctionParser.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
import javax.servlet.http.HttpSession;
2424

25+
import apijson.orm.script.JavaScriptExecutor;
2526
import com.alibaba.fastjson.JSONArray;
2627
import com.alibaba.fastjson.JSONObject;
2728

@@ -42,6 +43,10 @@
4243
public class DemoFunctionParser extends APIJSONFunctionParser {
4344
public static final String TAG = "DemoFunctionParser";
4445

46+
static {
47+
SCRIPT_EXECUTOR_MAP.put("js", new JavaScriptExecutor());
48+
}
49+
4550
public DemoFunctionParser() {
4651
this(null, null, 0, null, null);
4752
}

APIJSON-Java-Server/APIJSONBoot-MultiDataSource/src/main/java/apijson/demo/DemoSQLConfig.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -260,7 +260,7 @@ public String getDBAccount() {
260260
return "root";
261261
}
262262

263-
return "";
263+
return null;
264264
}
265265

266266
private String dbPassword;
@@ -303,7 +303,7 @@ public String getDBPassword() {
303303
return "apijson@123"; //TODO 改成你自己的
304304
}
305305

306-
return "";
306+
return null;
307307
}
308308

309309
private String sql;

APIJSON-Java-Server/APIJSONBoot-MultiDataSource/src/main/java/apijson/demo/DemoSQLExecutor.java

Lines changed: 101 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,17 @@
1717
import apijson.*;
1818
import com.alibaba.druid.pool.DruidDataSource;
1919
import com.alibaba.fastjson.JSONObject;
20+
import com.datastax.oss.driver.api.core.CqlSession;
21+
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
22+
import com.datastax.oss.driver.api.core.cql.ResultSet;
23+
import com.datastax.oss.driver.api.core.cql.Row;
2024
import com.vesoft.nebula.jdbc.impl.NebulaDriver;
2125
import com.zaxxer.hikari.HikariDataSource;
2226

2327
import java.io.Serializable;
28+
import java.net.URI;
29+
import java.net.URL;
30+
import java.nio.file.Paths;
2431
import java.sql.Connection;
2532
import java.sql.SQLException;
2633
import java.util.Collection;
@@ -37,7 +44,6 @@
3744
import org.influxdb.BatchOptions;
3845
import org.influxdb.InfluxDB;
3946
import org.influxdb.InfluxDBFactory;
40-
import org.influxdb.dto.Point;
4147
import org.influxdb.dto.Query;
4248
import org.influxdb.dto.QueryResult;
4349
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
@@ -94,7 +100,7 @@ public synchronized void putCache(String sql, List<JSONObject> list, SQLConfig c
94100
super.putCache(sql, list, config);
95101

96102
String table = config != null && config.isMain() ? config.getTable() : null;
97-
if (table != null && DemoSQLConfig.CONFIG_TABLE_LIST.contains(table) == false) {
103+
if (table != null && ! DemoSQLConfig.CONFIG_TABLE_LIST.contains(table)) {
98104
try {
99105
if (config.isExplain() || RequestMethod.isHeadMethod(config.getMethod(), true)) {
100106
REDIS_TEMPLATE.opsForValue().set(sql, JSON.toJSONString(list), 10 * 60, TimeUnit.SECONDS);
@@ -124,6 +130,8 @@ public synchronized void removeCache(String sql, SQLConfig config) {
124130
// Redis 缓存 >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
125131

126132
public static final String DATABASE_NEBULA = "NEBULA";
133+
public static final String DATABASE_SNOWFLAKE = "SNOWFLAKE";
134+
public static final String DATABASE_CASSANDRA = "CASSANDRA";
127135

128136
// 适配连接池,如果这里能拿到连接池的有效 Connection,则 SQLConfig 不需要配置 dbVersion, dbUri, dbAccount, dbPassword
129137
@Override
@@ -196,82 +204,123 @@ public Connection getConnection(SQLConfig config) throws Exception {
196204

197205
@Override
198206
public JSONObject execute(@NotNull SQLConfig config, boolean unknownType) throws Exception {
199-
if (DemoSQLConfig.DATABASE_INFLUXDB.equals(config.getDatabase())) {
200-
InfluxDB influxDB = InfluxDBFactory.connect(config.getDBUri(), config.getDBAccount(), config.getDBPassword());
201-
influxDB.setDatabase(config.getSchema());
207+
if (DATABASE_CASSANDRA.equals(config.getDatabase()) || DemoSQLConfig.DATABASE_INFLUXDB.equals(config.getDatabase())) {
202208

203209
String sql = config.getSQL(config.isPrepared());
204210

205211
RequestMethod method = config.getMethod();
206-
boolean isWrite = ! RequestMethod.isQueryMethod(method);
207-
if (method == null && ! isWrite) {
212+
boolean isWrite = !RequestMethod.isQueryMethod(method);
213+
if (method == null && !isWrite) {
208214
String trimmedSQL = sql == null ? null : sql.trim();
209215
String sqlPrefix = trimmedSQL == null || trimmedSQL.length() < 7 ? "" : trimmedSQL.substring(0, 7).toUpperCase();
210216
isWrite = sqlPrefix.startsWith("INSERT ") || sqlPrefix.startsWith("UPDATE ") || sqlPrefix.startsWith("DELETE ");
211217
}
212218

213-
if (isWrite) {
214-
influxDB.enableBatch(
215-
BatchOptions.DEFAULTS
216-
.threadFactory(runnable -> {
217-
Thread thread = new Thread(runnable);
218-
thread.setDaemon(true);
219-
return thread;
220-
})
221-
);
222219

223-
Runtime.getRuntime().addShutdownHook(new Thread(influxDB::close));
220+
if (DATABASE_CASSANDRA.equals(config.getDatabase())) {
221+
CqlSession session = CqlSession.builder()
222+
// .withCloudSecureConnectBundle(Paths.get("/path/to/secure-connect-database_name.zip"))
223+
.withCloudSecureConnectBundle(new URL(config.getDBUri()))
224+
.withAuthCredentials(config.getDBAccount(), config.getDBPassword())
225+
.withKeyspace(config.getSchema())
226+
.build();
227+
228+
// if (config.isPrepared()) {
229+
// PreparedStatement stt = session.prepare(sql);
230+
//
231+
// List<Object> pl = config.getPreparedValueList();
232+
// if (pl != null) {
233+
// for (Object o : pl) {
234+
// stt.bind(pl.toArray());
235+
// }
236+
// }
237+
// sql = stt.getQuery();
238+
// }
239+
240+
ResultSet rs = session.execute(sql);
241+
242+
List<Row> list = rs.all();
243+
if (list == null || list.isEmpty()) {
244+
return new JSONObject(true);
245+
}
224246

225-
influxDB.write(sql);
247+
JSONObject result = JSON.parseObject(list.get(0));
248+
if (list.size() > 1) {
249+
result.put(KEY_RAW_LIST, list);
250+
}
226251

227-
JSONObject result = DemoParser.newSuccessResult();
252+
return result;
253+
}
228254

229-
if (method == RequestMethod.POST) {
230-
List<List<Object>> values = config.getValues();
231-
result.put(JSONResponse.KEY_COUNT, values == null ? 0 : values.size());
232-
} else {
233-
String idKey = config.getIdKey();
234-
Object id = config.getId();
235-
Object idIn = config.getIdIn();
236-
if (id != null) {
237-
result.put(idKey, id);
238-
}
239-
if (idIn != null) {
240-
result.put(idKey + "[]", idIn);
241-
}
242255

243-
if (method == RequestMethod.PUT) {
244-
Map<String, Object> content = config.getContent();
245-
result.put(JSONResponse.KEY_COUNT, content == null ? 0 : content.size());
256+
if (DemoSQLConfig.DATABASE_INFLUXDB.equals(config.getDatabase())) {
257+
InfluxDB influxDB = InfluxDBFactory.connect(config.getDBUri(), config.getDBAccount(), config.getDBPassword());
258+
influxDB.setDatabase(config.getSchema());
259+
260+
261+
if (isWrite) {
262+
influxDB.enableBatch(
263+
BatchOptions.DEFAULTS
264+
.threadFactory(runnable -> {
265+
Thread thread = new Thread(runnable);
266+
thread.setDaemon(true);
267+
return thread;
268+
})
269+
);
270+
271+
Runtime.getRuntime().addShutdownHook(new Thread(influxDB::close));
272+
273+
influxDB.write(sql);
274+
275+
JSONObject result = DemoParser.newSuccessResult();
276+
277+
if (method == RequestMethod.POST) {
278+
List<List<Object>> values = config.getValues();
279+
result.put(JSONResponse.KEY_COUNT, values == null ? 0 : values.size());
246280
} else {
247-
result.put(JSONResponse.KEY_COUNT, id == null && idIn instanceof Collection ? ((Collection<?>) idIn).size() : 1); // FIXME 直接 SQLAuto 传 Flux/InfluxQL INSERT 如何取数量?
281+
String idKey = config.getIdKey();
282+
Object id = config.getId();
283+
Object idIn = config.getIdIn();
284+
if (id != null) {
285+
result.put(idKey, id);
286+
}
287+
if (idIn != null) {
288+
result.put(idKey + "[]", idIn);
289+
}
290+
291+
if (method == RequestMethod.PUT) {
292+
Map<String, Object> content = config.getContent();
293+
result.put(JSONResponse.KEY_COUNT, content == null ? 0 : content.size());
294+
} else {
295+
result.put(JSONResponse.KEY_COUNT, id == null && idIn instanceof Collection ? ((Collection<?>) idIn).size() : 1); // FIXME 直接 SQLAuto 传 Flux/InfluxQL INSERT 如何取数量?
296+
}
248297
}
298+
299+
return result;
249300
}
250301

251-
return result;
252-
}
302+
QueryResult qr = influxDB.query(new Query(sql));
253303

254-
QueryResult qr = influxDB.query(new Query(sql));
304+
String err = qr == null ? null : qr.getError();
305+
if (StringUtil.isNotEmpty(qr, true)) {
306+
throw new SQLException(err);
307+
}
255308

256-
String err = qr == null ? null : qr.getError();
257-
if (StringUtil.isNotEmpty(qr, true)) {
258-
throw new SQLException(err);
259-
}
309+
List<QueryResult.Result> list = qr == null ? null : qr.getResults();
310+
if (list == null || list.isEmpty()) {
311+
return new JSONObject(true);
312+
}
260313

261-
List<QueryResult.Result> list = qr == null ? null : qr.getResults();
262-
if (list == null || list.isEmpty()) {
263-
return new JSONObject(true);
264-
}
314+
JSONObject result = JSON.parseObject(list.get(0));
315+
if (list.size() > 1) {
316+
result.put(KEY_RAW_LIST, list);
317+
}
265318

266-
JSONObject result = JSON.parseObject(list.get(0));
267-
if (list.size() > 1) {
268-
result.put(KEY_RAW_LIST, list);
319+
return result;
269320
}
270321

271-
return result;
272322
}
273323

274-
275324
return super.execute(config, unknownType);
276325
}
277326

0 commit comments

Comments
 (0)