Skip to content

Commit 2b3ffd9

Browse files
authored
feat: support spark conf path for sdk (#2180)
1 parent 239ada3 commit 2b3ffd9

File tree

7 files changed

+26
-13
lines changed

7 files changed

+26
-13
lines changed

java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/SdkOption.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,20 @@
1818

1919
import lombok.Data;
2020

21-
2221
@Data
2322
public class SdkOption {
24-
// options for cluster mode
23+
// options for cluster mode
2524
private String zkCluster;
2625
private String zkPath;
27-
// options for standalone mode
26+
private String sparkConfPath = "";
27+
28+
// options for standalone mode
2829
private String host;
2930
private long port;
3031

3132
private long sessionTimeout = 10000;
3233
private Boolean enableDebug = false;
3334
private long requestTimeout = 60000;
3435
private boolean isClusterMode = true;
36+
3537
}

java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/impl/SqlClusterExecutor.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ public SqlClusterExecutor(SdkOption option, String libraryPath) throws SqlExcept
6969
sqlOpt.setZk_path(option.getZkPath());
7070
sqlOpt.setEnable_debug(option.getEnableDebug());
7171
sqlOpt.setRequest_timeout(option.getRequestTimeout());
72+
sqlOpt.setSpark_conf_path(option.getSparkConfPath());
7273
this.sqlRouter = sql_router_sdk.NewClusterSQLRouter(sqlOpt);
7374
sqlOpt.delete();
7475
} else {

python/openmldb/sdk/sdk.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,11 @@
3232

3333

3434
class OpenMLDBClusterSdkOptions(object):
35-
def __init__(self, zk_cluster, zk_path, session_timeout=3000):
35+
def __init__(self, zk_cluster, zk_path, session_timeout=3000, spark_conf_path=""):
3636
self.zk_cluster = zk_cluster
3737
self.zk_path = zk_path
3838
self.zk_session_timeout = session_timeout
39+
self.spark_conf_path = spark_conf_path
3940

4041

4142
class OpenMLDBStandaloneSdkOptions(object):

src/cmd/sql_cmd.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
DEFINE_bool(interactive, true, "Set the interactive");
3636
DEFINE_string(database, "", "Set database");
3737
DECLARE_string(cmd);
38+
DEFINE_string(spark_conf, "", "The config file of Spark job");
3839

3940
// cluster mode
4041
DECLARE_string(zk_cluster);
@@ -209,6 +210,7 @@ bool InitClusterSDK() {
209210
copt.zk_session_timeout = FLAGS_zk_session_timeout;
210211
copt.zk_log_level = FLAGS_zk_log_level;
211212
copt.zk_log_file = FLAGS_zk_log_file;
213+
212214
cs = new ::openmldb::sdk::ClusterSDK(copt);
213215
if (!cs->Init()) {
214216
std::cout << "ERROR: Failed to connect to db" << std::endl;
@@ -220,6 +222,9 @@ bool InitClusterSDK() {
220222
return false;
221223
}
222224
sr->SetInteractive(FLAGS_interactive);
225+
226+
sr->GetSqlRouterOptions().spark_conf_path = FLAGS_spark_conf;
227+
223228
return true;
224229
}
225230

src/sdk/sql_cluster_router.cc

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@
5050

5151
DECLARE_int32(request_timeout_ms);
5252
DECLARE_string(bucket_size);
53-
DEFINE_string(spark_conf, "", "The config file of Spark job");
5453
DECLARE_uint32(replica_num);
5554

5655
namespace openmldb {
@@ -2442,7 +2441,7 @@ std::shared_ptr<hybridse::sdk::ResultSet> SQLClusterRouter::ExecuteSQL(const std
24422441
} else {
24432442
::openmldb::taskmanager::JobInfo job_info;
24442443
std::map<std::string, std::string> config;
2445-
ReadSparkConfFromFile(FLAGS_spark_conf, &config);
2444+
ReadSparkConfFromFile(options_.spark_conf_path, &config);
24462445
auto base_status = ExportOfflineData(sql, config, db, is_sync_job, offline_job_timeout, &job_info);
24472446
if (base_status.OK()) {
24482447
*status = {};
@@ -2473,7 +2472,7 @@ std::shared_ptr<hybridse::sdk::ResultSet> SQLClusterRouter::ExecuteSQL(const std
24732472
// Handle in cluster mode
24742473
::openmldb::taskmanager::JobInfo job_info;
24752474
std::map<std::string, std::string> config;
2476-
ReadSparkConfFromFile(FLAGS_spark_conf, &config);
2475+
ReadSparkConfFromFile(options_.spark_conf_path, &config);
24772476

24782477
::openmldb::base::Status base_status;
24792478
if (is_online_mode) {
@@ -2518,7 +2517,7 @@ std::shared_ptr<hybridse::sdk::ResultSet> SQLClusterRouter::ExecuteOfflineQuery(
25182517
bool is_sync_job, int job_timeout,
25192518
::hybridse::sdk::Status* status) {
25202519
std::map<std::string, std::string> config;
2521-
ReadSparkConfFromFile(FLAGS_spark_conf, &config);
2520+
ReadSparkConfFromFile(options_.spark_conf_path, &config);
25222521

25232522
if (is_sync_job) {
25242523
// Run offline sql and wait to get output
@@ -3721,15 +3720,15 @@ std::shared_ptr<hybridse::sdk::ResultSet> SQLClusterRouter::ExecuteShowTableStat
37213720
return ResultSetSQL::MakeResultSet(GetTableStatusSchema(), data, status);
37223721
}
37233722

3724-
void SQLClusterRouter::ReadSparkConfFromFile(std::string conf_file, std::map<std::string, std::string>* config) {
3725-
if (!conf_file.empty()) {
3723+
void SQLClusterRouter::ReadSparkConfFromFile(std::string conf_file_path, std::map<std::string, std::string>* config) {
3724+
if (!conf_file_path.empty()) {
37263725
boost::property_tree::ptree pt;
37273726

37283727
try {
3729-
boost::property_tree::ini_parser::read_ini(FLAGS_spark_conf, pt);
3730-
LOG(INFO) << "Load Spark conf file: " << conf_file;
3728+
boost::property_tree::ini_parser::read_ini(conf_file_path, pt);
3729+
LOG(INFO) << "Load Spark conf file: " << conf_file_path;
37313730
} catch (...) {
3732-
LOG(WARNING) << "Fail to load Spark conf file: " << conf_file;
3731+
LOG(WARNING) << "Fail to load Spark conf file: " << conf_file_path;
37333732
return;
37343733
}
37353734

src/sdk/sql_cluster_router.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -298,6 +298,10 @@ class SQLClusterRouter : public SQLRouter {
298298

299299
void ReadSparkConfFromFile(std::string conf_file, std::map<std::string, std::string>* config);
300300

301+
SQLRouterOptions GetSqlRouterOptions() {
302+
return options_;
303+
}
304+
301305
private:
302306
bool IsSyncJob();
303307
// get job timeout from the session variables, we will use the timeout when sending requests to the taskmanager

src/sdk/sql_router.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ struct SQLRouterOptions : BasicRouterOptions {
4646
std::string zk_cluster;
4747
std::string zk_path;
4848
uint32_t zk_session_timeout = 2000;
49+
std::string spark_conf_path;
4950
};
5051

5152
struct StandaloneOptions : BasicRouterOptions {

0 commit comments

Comments (0)