Skip to content

Commit 8e8f6cd

Browse files
feat: support apiserver query, dbapi close and fix (#2091)
apiserver query, POST, should set the execute mode in request body. e.g. `curl -X POST http://127.0.0.1:9080/dbs/airflow_example -d'{"sql":"select 1", "mode":"offsync"}'` apiserver get won't return ResultSet now, todo.
1 parent b44617e commit 8e8f6cd

File tree

13 files changed

+426
-306
lines changed

13 files changed

+426
-306
lines changed

python/openmldb/dbapi/dbapi.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -575,7 +575,7 @@ def commit(self):
575575
pass
576576

577577
def close(self):
578-
raise NotSupportedError("Unsupported in OpenMLDB")
578+
self._sdk = None
579579

580580
def cursor(self):
581581
return Cursor(self._db, self)

src/apiserver/api_server_impl.cc

Lines changed: 115 additions & 57 deletions
Large diffs are not rendered by default.

src/apiserver/api_server_impl.h

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -51,10 +51,10 @@ class APIServerImpl : public APIServer {
5151
google::protobuf::Closure* done) override;
5252
static std::string InnerTypeTransform(const std::string& s);
5353

54-
void Refresh(google::protobuf::RpcController* cntl_base, const HttpRequest*, HttpResponse*,
55-
google::protobuf::Closure* done) override;
54+
void Refresh();
5655

5756
private:
57+
void RegisterQuery();
5858
void RegisterPut();
5959
void RegisterExecSP();
6060
void RegisterExecDeployment();
@@ -63,8 +63,8 @@ class APIServerImpl : public APIServer {
6363
void RegisterGetDB();
6464
void RegisterGetTable();
6565

66-
void ExecuteProcedure(bool has_common_col, const InterfaceProvider::Params& param,
67-
const butil::IOBuf& req_body, JsonWriter& writer); // NOLINT
66+
void ExecuteProcedure(bool has_common_col, const InterfaceProvider::Params& param, const butil::IOBuf& req_body,
67+
JsonWriter& writer); // NOLINT
6868

6969
static bool Json2SQLRequestRow(const butil::rapidjson::Value& non_common_cols_v,
7070
const butil::rapidjson::Value& common_cols_v,
@@ -80,17 +80,17 @@ class APIServerImpl : public APIServer {
8080
::openmldb::sdk::DBSDK* cluster_sdk_ = nullptr;
8181
};
8282

83-
struct PutResp {
84-
PutResp() = default;
85-
int code = 0;
86-
std::string msg = "ok";
83+
struct QueryReq {
84+
std::string mode;
85+
std::string sql;
8786
};
8887

8988
template <typename Archiver>
90-
Archiver& operator&(Archiver& ar, PutResp& s) { // NOLINT
89+
Archiver& operator&(Archiver& ar, QueryReq& s) { // NOLINT
9190
ar.StartObject();
92-
ar.Member("code") & s.code;
93-
ar.Member("msg") & s.msg;
91+
// mode is not optional
92+
ar.Member("mode") & s.mode;
93+
ar.Member("sql") & s.sql;
9494
return ar.EndObject();
9595
}
9696

src/apiserver/api_server_test.cc

Lines changed: 59 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,11 @@
1414
* limitations under the License.
1515
*/
1616

17+
#include <memory>
18+
#include <random>
19+
1720
#include "apiserver/api_server_impl.h"
1821
#include "brpc/channel.h"
19-
#include "memory"
2022
#include "brpc/restful.h"
2123
#include "brpc/server.h"
2224
#include "butil/logging.h"
@@ -25,7 +27,6 @@
2527
#include "json2pb/rapidjson.h"
2628
#include "sdk/mini_cluster.h"
2729

28-
2930
namespace openmldb::apiserver {
3031

3132
class APIServerTestEnv : public testing::Environment {
@@ -148,7 +149,7 @@ TEST_F(APIServerTest, invalidPut) {
148149
const auto env = APIServerTestEnv::Instance();
149150
brpc::Controller cntl;
150151
cntl.http_request().set_method(brpc::HTTP_METHOD_PUT);
151-
PutResp resp;
152+
GeneralResp resp;
152153

153154
// Empty body
154155
// NOTE: host:port is defined in SetUp, so the host:port here won't work. Only the path works.
@@ -234,7 +235,7 @@ TEST_F(APIServerTest, validPut) {
234235
"\", 111, 1.4, \"2021-04-27\", 1620471840256, true, \"more str\", null]]}");
235236
env->http_channel.CallMethod(NULL, &cntl, NULL, NULL, NULL);
236237
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
237-
PutResp resp;
238+
GeneralResp resp;
238239
JsonReader reader(cntl.response_attachment().to_string().c_str());
239240
reader >> resp;
240241
ASSERT_EQ(0, resp.code) << resp.msg;
@@ -286,7 +287,7 @@ TEST_F(APIServerTest, putCase1) {
286287
env->http_channel.CallMethod(NULL, &cntl, NULL, NULL, NULL);
287288
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
288289
LOG(INFO) << cntl.response_attachment().to_string();
289-
PutResp resp;
290+
GeneralResp resp;
290291
JsonReader reader(cntl.response_attachment().to_string().c_str());
291292
reader >> resp;
292293
ASSERT_EQ(0, resp.code) << resp.msg;
@@ -304,7 +305,7 @@ TEST_F(APIServerTest, putCase1) {
304305
env->http_channel.CallMethod(NULL, &cntl, NULL, NULL, NULL);
305306
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
306307
LOG(INFO) << cntl.response_attachment().to_string();
307-
PutResp resp;
308+
GeneralResp resp;
308309
JsonReader reader(cntl.response_attachment().to_string().c_str());
309310
reader >> resp;
310311
ASSERT_EQ(0, resp.code) << resp.msg;
@@ -323,7 +324,7 @@ TEST_F(APIServerTest, putCase1) {
323324
env->http_channel.CallMethod(NULL, &cntl, NULL, NULL, NULL);
324325
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
325326
LOG(INFO) << cntl.response_attachment().to_string();
326-
PutResp resp;
327+
GeneralResp resp;
327328
JsonReader reader(cntl.response_attachment().to_string().c_str());
328329
reader >> resp;
329330
ASSERT_EQ(-1, resp.code);
@@ -341,7 +342,7 @@ TEST_F(APIServerTest, putCase1) {
341342
env->http_channel.CallMethod(NULL, &cntl, NULL, NULL, NULL);
342343
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
343344
LOG(INFO) << cntl.response_attachment().to_string();
344-
PutResp resp;
345+
GeneralResp resp;
345346
JsonReader reader(cntl.response_attachment().to_string().c_str());
346347
reader >> resp;
347348
ASSERT_EQ(0, resp.code) << resp.msg;
@@ -360,7 +361,7 @@ TEST_F(APIServerTest, putCase1) {
360361
env->http_channel.CallMethod(NULL, &cntl, NULL, NULL, NULL);
361362
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
362363
LOG(INFO) << cntl.response_attachment().to_string();
363-
PutResp resp;
364+
GeneralResp resp;
364365
JsonReader reader(cntl.response_attachment().to_string().c_str());
365366
reader >> resp;
366367
ASSERT_EQ(0, resp.code) << resp.msg;
@@ -509,10 +510,9 @@ TEST_F(APIServerTest, no_common) {
509510
std::string sql =
510511
"SELECT c1, c3, sum(c4) OVER w1 as w1_c4_sum FROM trans WINDOW w1 AS"
511512
" (PARTITION BY trans.c1 ORDER BY trans.c7 ROWS BETWEEN 2 PRECEDING AND CURRENT ROW);";
512-
std::string sp_ddl =
513-
"create procedure " + sp_name +
514-
" (c1 string, c3 int, c4 bigint, c5 float, c6 double, c7 timestamp, c8 date" + ")" +
515-
" begin " + sql + " end;";
513+
std::string sp_ddl = "create procedure " + sp_name +
514+
" (c1 string, c3 int, c4 bigint, c5 float, c6 double, c7 timestamp, c8 date" + ")" +
515+
" begin " + sql + " end;";
516516
ASSERT_TRUE(env->cluster_remote->ExecuteDDL(env->db, sp_ddl, &status)) << "fail to create procedure";
517517
ASSERT_TRUE(env->cluster_sdk->Refresh());
518518

@@ -592,14 +592,14 @@ TEST_F(APIServerTest, no_common_not_first_string) {
592592
ASSERT_TRUE(env->cluster_remote->ExecuteInsert(env->db, insert_sql, &status));
593593
// create procedure
594594
std::string sp_name = "sp1";
595-
std::string sql = "SELECT id, c1, sum(c4) OVER (w1) AS w1_c4_sum FROM trans1 "
595+
std::string sql =
596+
"SELECT id, c1, sum(c4) OVER (w1) AS w1_c4_sum FROM trans1 "
596597
"WINDOW w1 AS (PARTITION BY trans1.c1 ORDER BY trans1.c7 "
597598
"ROWS BETWEEN 2 PRECEDING AND 1 PRECEDING);";
598599

599-
std::string sp_ddl =
600-
"create procedure " + sp_name +
601-
" (id int, c1 string, c3 int, c4 bigint, c5 float, c6 double, c7 timestamp, c8 date" + ")" +
602-
" begin " + sql + " end;";
600+
std::string sp_ddl = "create procedure " + sp_name +
601+
" (id int, c1 string, c3 int, c4 bigint, c5 float, c6 double, c7 timestamp, c8 date" + ")" +
602+
" begin " + sql + " end;";
603603
ASSERT_TRUE(env->cluster_remote->ExecuteDDL(env->db, sp_ddl, &status)) << "fail to create procedure";
604604
ASSERT_TRUE(env->cluster_sdk->Refresh());
605605

@@ -656,9 +656,14 @@ TEST_F(APIServerTest, no_common_not_first_string) {
656656

657657
TEST_F(APIServerTest, getDBs) {
658658
const auto env = APIServerTestEnv::Instance();
659+
std::default_random_engine e;
660+
std::string db_name = "" + std::to_string(e() % 100000); // to avoid use exists db, e.g. api_server_test
661+
LOG(INFO) << "test on db " << db_name;
662+
663+
std::set<std::string> test_dbs = {db_name, "monkey", "shark", "zebra"};
664+
// cluster may have some dbs
665+
std::set<std::string> exists_db_set;
659666
{
660-
hybridse::sdk::Status status;
661-
env->cluster_remote->DropDB("api_server_test", &status);
662667
brpc::Controller show_cntl; // default is GET
663668
show_cntl.http_request().uri() = "http://127.0.0.1:8010/dbs";
664669
env->http_channel.CallMethod(NULL, &show_cntl, NULL, NULL, NULL);
@@ -671,16 +676,25 @@ TEST_F(APIServerTest, getDBs) {
671676
ASSERT_TRUE(document.HasMember("msg"));
672677
ASSERT_STREQ("ok", document["msg"].GetString());
673678
ASSERT_TRUE(document.HasMember("dbs"));
674-
ASSERT_TRUE(document["dbs"].IsArray());
675-
ASSERT_EQ(document["dbs"].Size(), 0);
679+
auto& exists_dbs = document["dbs"];
680+
ASSERT_TRUE(exists_dbs.IsArray());
681+
for (int i = 0; i < exists_dbs.Size(); ++i) {
682+
auto db = exists_dbs[i].GetString();
683+
if (test_dbs.find(db) != test_dbs.end()) {
684+
FAIL() << "can't have test db " << db;
685+
}
686+
exists_db_set.emplace(db);
687+
}
676688
}
677689
{
678690
hybridse::sdk::Status status;
679-
std::vector<std::string> db_names = {"api_server_test", "monkey", "shark", "zebra"};
680-
for (size_t i = 0; i < db_names.size(); i++) {
681-
env->cluster_remote->DropDB(db_names[i], &status);
682-
env->cluster_remote->CreateDB(db_names[i], &status);
691+
for (auto& db : test_dbs) {
692+
// empty db can be droped
693+
env->cluster_remote->DropDB(db, &status);
694+
ASSERT_TRUE(env->cluster_remote->CreateDB(db, &status));
683695
}
696+
env->queue_svc->Refresh();
697+
684698
brpc::Controller show_cntl;
685699
show_cntl.http_request().uri() = "http://127.0.0.1:8010/dbs";
686700
env->http_channel.CallMethod(NULL, &show_cntl, NULL, NULL, NULL);
@@ -694,23 +708,29 @@ TEST_F(APIServerTest, getDBs) {
694708
ASSERT_STREQ("ok", document["msg"].GetString());
695709
ASSERT_TRUE(document.HasMember("dbs"));
696710
ASSERT_TRUE(document["dbs"].IsArray());
697-
ASSERT_EQ(document["dbs"].Size(), db_names.size());
698-
std::vector<std::string> result;
711+
ASSERT_EQ(document["dbs"].Size(), test_dbs.size() + exists_db_set.size());
712+
std::set<std::string> result;
699713
for (size_t i = 0; i < document["dbs"].Size(); i++) {
700-
result.push_back(document["dbs"][i].GetString());
701-
}
702-
sort(result.begin(), result.end());
703-
for (size_t i = 0; i < document["dbs"].Size(); i++) {
704-
ASSERT_EQ(result[i], db_names[i]);
714+
result.emplace(document["dbs"][i].GetString());
705715
}
716+
717+
test_dbs.merge(exists_db_set);
718+
ASSERT_EQ(result, test_dbs);
706719
}
707720
}
708721

709722
TEST_F(APIServerTest, getTables) {
710723
const auto env = APIServerTestEnv::Instance();
724+
std::default_random_engine e;
725+
std::string db_name = "" + std::to_string(e() % 100000); // to avoid use db which has tables
726+
LOG(INFO) << "test on db " << db_name;
727+
// setup
711728
{
729+
hybridse::sdk::Status status;
730+
env->cluster_remote->CreateDB(db_name, &status);
731+
env->queue_svc->Refresh();
712732
brpc::Controller show_cntl; // default is GET
713-
show_cntl.http_request().uri() = "http://127.0.0.1:8010/dbs/api_server_test/tables";
733+
show_cntl.http_request().uri() = "http://127.0.0.1:8010/dbs/" + db_name + "/tables";
714734
env->http_channel.CallMethod(NULL, &show_cntl, NULL, NULL, NULL);
715735
ASSERT_FALSE(show_cntl.Failed()) << show_cntl.ErrorText();
716736
butil::rapidjson::Document document;
@@ -736,12 +756,12 @@ TEST_F(APIServerTest, getTables) {
736756
" c7 timestamp,\n"
737757
" c8 date,\n"
738758
" index(key=c1, ts=c7));";
739-
ASSERT_TRUE(env->cluster_remote->ExecuteDDL(env->db, ddl, &status)) << "fail to create table";
759+
ASSERT_TRUE(env->cluster_remote->ExecuteDDL(db_name, ddl, &status)) << "fail to create table";
740760
ASSERT_TRUE(env->cluster_sdk->Refresh());
741761
}
742762
{
743763
brpc::Controller show_cntl; // default is GET
744-
show_cntl.http_request().uri() = "http://127.0.0.1:8010/dbs/api_server_test/tables";
764+
show_cntl.http_request().uri() = "http://127.0.0.1:8010/dbs/" + db_name + "/tables";
745765
env->http_channel.CallMethod(NULL, &show_cntl, NULL, NULL, NULL);
746766
ASSERT_FALSE(show_cntl.Failed()) << show_cntl.ErrorText();
747767
butil::rapidjson::Document document;
@@ -779,7 +799,7 @@ TEST_F(APIServerTest, getTables) {
779799
}
780800
for (auto table : tables) {
781801
brpc::Controller show_cntl; // default is GET
782-
show_cntl.http_request().uri() = "http://127.0.0.1:8010/dbs/api_server_test/tables/" + table;
802+
show_cntl.http_request().uri() = "http://127.0.0.1:8010/dbs/" + db_name + "/tables/" + table;
783803
env->http_channel.CallMethod(NULL, &show_cntl, NULL, NULL, NULL);
784804
ASSERT_FALSE(show_cntl.Failed()) << show_cntl.ErrorText();
785805
butil::rapidjson::Document document;
@@ -795,7 +815,7 @@ TEST_F(APIServerTest, getTables) {
795815
}
796816
{
797817
brpc::Controller show_cntl; // default is GET
798-
show_cntl.http_request().uri() = "http://127.0.0.1:8010/dbs/api_server_test/tables/not_exist";
818+
show_cntl.http_request().uri() = "http://127.0.0.1:8010/dbs/" + db_name + "/tables/not_exist";
799819
env->http_channel.CallMethod(NULL, &show_cntl, NULL, NULL, NULL);
800820
ASSERT_FALSE(show_cntl.Failed()) << show_cntl.ErrorText();
801821
butil::rapidjson::Document document;
@@ -823,6 +843,7 @@ TEST_F(APIServerTest, getTables) {
823843
env->cluster_remote->ExecuteDDL(env->db, "drop table " + table + ";", &status);
824844
ASSERT_TRUE(env->cluster_sdk->Refresh());
825845
}
846+
env->cluster_remote->DropDB(db_name, &status);
826847
}
827848

828849
} // namespace openmldb::apiserver

src/apiserver/interface_provider.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ void InterfaceProvider::registerRequest(brpc::HttpMethod type, std::string const
159159

160160
bool InterfaceProvider::handle(const std::string& path, const brpc::HttpMethod& method, const butil::IOBuf& req_body,
161161
JsonWriter& writer) {
162-
auto err = GeneralError();
162+
auto err = GeneralResp();
163163
Url url;
164164

165165
if (!ReducedUrlParser::parse(path, &url)) {

src/apiserver/interface_provider.h

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -163,19 +163,26 @@ class InterfaceProvider {
163163
std::unordered_map<int, std::vector<BuiltRequest>> requests_;
164164
};
165165

166-
struct GeneralError {
167-
GeneralError() = default;
168-
explicit GeneralError(std::string m) : msg(std::move(m)) {}
169-
GeneralError& Set(std::string m) {
166+
struct GeneralResp {
167+
GeneralResp() = default;
168+
explicit GeneralResp(std::string m) : msg(std::move(m)) {}
169+
// If set err message without code, code will be -1
170+
GeneralResp& Set(std::string m) {
170171
msg = std::move(m);
172+
code = -1;
171173
return *this;
172174
}
173-
int code = -1;
174-
std::string msg;
175+
GeneralResp& Set(int c, std::string m) {
176+
msg = std::move(m);
177+
code = c;
178+
return *this;
179+
}
180+
int code = 0;
181+
std::string msg = "ok";
175182
};
176183

177184
template <typename Archiver>
178-
Archiver& operator&(Archiver& ar, GeneralError& s) { // NOLINT
185+
Archiver& operator&(Archiver& ar, GeneralResp& s) { // NOLINT
179186
ar.StartObject();
180187
ar.Member("code") & s.code;
181188
ar.Member("msg") & s.msg;

0 commit comments

Comments
 (0)