Skip to content

Commit 1ee15cf

Browse files
committed
release 3.0.2.0
1 parent 1019c2a commit 1ee15cf

28 files changed

+735
-523
lines changed

core/include/DBConnectionPoolImpl.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ class DBConnectionPoolImpl{
3030

3131
DBConnectionPoolImpl(const string& hostName, int port, int threadNum = 10, const string& userId = "", const string& password = "",
3232
bool loadBalance = true, bool highAvailability = true, bool compress = false, bool reConnect = false,
33-
bool python = false, PROTOCOL protocol = PROTOCOL_DDB, bool show_output = true);
33+
bool python = false, PROTOCOL protocol = PROTOCOL_DDB, bool show_output = true, int sqlStd = 0, int tryReconnectNums = -1);
3434

3535
~DBConnectionPoolImpl(){
3636
shutDown();

core/include/DdbPythonUtil.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ struct HIDEVISIBILITY Preserved {
116116
py::object pafloat32_;
117117
py::object pafloat64_;
118118
py::object padictionary_int32_utf8_;
119+
py::object paDictionaryType_;
119120
py::object pautf8_;
120121
py::object pafixed_size_binary_16_;
121122
py::object palarge_binary_;

core/include/DolphinDB.h

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ class HIDEVISIBILITY ProtectGil{
8484

8585
class EXPORT_DECL DBConnection {
8686
public:
87-
DBConnection(bool enableSSL = false, bool asyncTask = false, int keepAliveTime = 7200, bool compress = false, bool python = false, bool isReverseStreaming = false);
87+
DBConnection(bool enableSSL = false, bool asyncTask = false, int keepAliveTime = 7200, bool compress = false, bool python = false, bool isReverseStreaming = false, int sqlStd = 0);
8888
virtual ~DBConnection();
8989
DBConnection(DBConnection&& oth);
9090
DBConnection& operator=(DBConnection&& oth);
@@ -95,7 +95,7 @@ class EXPORT_DECL DBConnection {
9595
* please use the login function for authentication separately.
9696
*/
9797
bool connect(const string& hostName, int port, const string& userId = "", const string& password = "", const string& initialScript = "",
98-
bool highAvailability = false, const vector<string>& highAvailabilitySites = vector<string>(), int keepAliveTime=7200, bool reconnect = false);
98+
bool highAvailability = false, const vector<string>& highAvailabilitySites = vector<string>(), int keepAliveTime=7200, bool reconnect = false, int tryReconnectNums = -1, int readTimeout = -1, int writeTimeout = -1);
9999

100100
/**
101101
* Log onto the DolphinDB server using the given userId and password. If the parameter enableEncryption
@@ -155,6 +155,7 @@ class EXPORT_DECL DBConnection {
155155
int fetchSize=0, bool clearMemory=false,
156156
bool pickleTableToList=false, bool disableDecimal=false);
157157
void setKeepAliveTime(int keepAliveTime);
158+
void setTimeout(int readTimeout, int writeTimeout);
158159
const string getSessionId() const;
159160
void setProtocol(PROTOCOL protocol);
160161
void setShowOutput(bool flag);
@@ -208,7 +209,7 @@ class EXPORT_DECL DBConnection {
208209
PROTOCOL protocol_;
209210
bool reconnect_, closed_;
210211
bool msg_;
211-
static const int maxRerunCnt_ = 30;
212+
int tryReconnectNums_;
212213
};
213214

214215
class EXPORT_DECL BlockReader : public Constant{
@@ -234,7 +235,7 @@ class EXPORT_DECL DBConnectionPool{
234235
public:
235236
DBConnectionPool(const string& hostName, int port, int threadNum = 10, const string& userId = "", const string& password = "",
236237
bool loadBalance = false, bool highAvailability = false, bool compress = false, bool reConnect = false, bool python = false,
237-
PROTOCOL protocol = PROTOCOL_DDB, bool showOutput = true);
238+
PROTOCOL protocol = PROTOCOL_DDB, bool showOutput = true, int sqlStd = 0, int tryReconnectNums = -1);
238239
virtual ~DBConnectionPool();
239240

240241
void run(const string& script, int identity, int priority=4, int parallelism=64, int fetchSize=0, bool clearMemory = false);

core/include/DolphinDBImp.h

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,9 @@ class DdbInit {
2424

2525
class DBConnectionImpl {
2626
public:
27-
DBConnectionImpl(bool sslEnable = false, bool asynTask = false, int keepAliveTime = 7200, bool compress = false, bool python = false, bool isReverseStreaming = false);
27+
DBConnectionImpl(bool sslEnable = false, bool asynTask = false, int keepAliveTime = 7200, bool compress = false, bool python = false, bool isReverseStreaming = false, int sqlStd = 0);
2828
~DBConnectionImpl();
29-
bool connect(const string& hostName, int port, const string& userId = "", const string& password = "",bool sslEnable = false, bool asynTask = false, int keepAliveTime = -1, bool compress= false, bool python = false);
29+
bool connect(const string& hostName, int port, const string& userId = "", const string& password = "",bool sslEnable = false, bool asynTask = false, int keepAliveTime = -1, bool compress= false, bool python = false, int readTimeout = -1, int writeTimeout = -1);
3030
void login(const string& userId, const string& password, bool enableEncryption);
3131
ConstantSP run(const string& script, int priority = 4, int parallelism = 64, int fetchSize = 0, bool clearMemory = false);
3232
ConstantSP run(const string& funcName, vector<ConstantSP>& args, int priority = 4, int parallelism = 64, int fetchSize = 0, bool clearMemory = false);
@@ -56,7 +56,14 @@ class DBConnectionImpl {
5656
int fetchSize = 0, bool clearMemory = false,
5757
bool pickleTableToList = false, bool disableDecimal = false);
5858
void setkeepAliveTime(int keepAliveTime){
59-
keepAliveTime_ = keepAliveTime;
59+
if (keepAliveTime > 0)
60+
keepAliveTime_ = keepAliveTime;
61+
}
62+
void setTimeout(int readTimeout, int writeTimeout) {
63+
if (readTimeout > 0)
64+
readTimeout_ = readTimeout;
65+
if (writeTimeout > 0)
66+
writeTimeout_ = writeTimeout;
6067
}
6168
const string getSessionId() const {
6269
return sessionId_;
@@ -91,6 +98,9 @@ class DBConnectionImpl {
9198
bool msg_;
9299
static DdbInit ddbInit_;
93100
bool isReverseStreaming_;
101+
int sqlStd_;
102+
int readTimeout_;
103+
int writeTimeout_;
94104
DataInputStreamSP inputStream_;
95105
Mutex mutex_;
96106
};

core/include/SysIO.h

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,15 +71,18 @@ class EXPORT_DECL Socket{
7171
bool isBlockingMode() const {return blocking_;}
7272
bool isValid();
7373
void setAutoClose(bool option) { autoClose_ = option;}
74+
void setTimeout(int readTimeout, int writeTimeout);
7475
static void enableTcpNoDelay(bool enable);
7576
static bool ENABLE_TCP_NODELAY;
7677
static void setTcpTimeout(unsigned int timeout);
7778
static unsigned int TcpTimeout;
7879
bool skipAll();
7980

8081
private:
81-
void getTimeout(int &timeoutMs);
82-
void setTimeout(int timeoutMs);
82+
void getReadTimeout(int &timeoutMs);
83+
void getWriteTimeout(int &timeoutMs);
84+
void setReadTimeout(int timeoutMs);
85+
void setWriteTimeout(int timeoutMs);
8386
bool setNonBlocking();
8487
bool setBlocking();
8588
bool setTcpNoDelay();
@@ -99,6 +102,8 @@ class EXPORT_DECL Socket{
99102
SSL_CTX* ctx_;
100103
SSL* ssl_;
101104
int keepAliveTime_;
105+
int readTimeout_;
106+
int writeTimeout_;
102107
};
103108

104109
class EXPORT_DECL UdpSocket{

core/src/DBConnectionPoolImpl.cpp

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,15 @@
55
namespace dolphindb {
66

77
DBConnectionPoolImpl::DBConnectionPoolImpl(const string& hostName, int port, int threadNum, const string& userId, const string& password,
8-
bool loadBalance, bool highAvailability, bool compress,bool reConnect, bool python, PROTOCOL protocol, bool show_output) :shutDownFlag_(
8+
bool loadBalance, bool highAvailability, bool compress,bool reConnect, bool python, PROTOCOL protocol, bool show_output, int sqlStd, int tryReconnectNums) :shutDownFlag_(
99
false), queue_(new SynchronizedQueue<Task>){
1010
latch_ = new CountDownLatch(threadNum);
1111
if(!loadBalance){
1212
for(int i = 0 ;i < threadNum; i++){
13-
SmartPointer<DBConnection> conn = new DBConnection(false, false, 7200, compress, python);
13+
SmartPointer<DBConnection> conn = new DBConnection(false, false, 7200, compress, python, false, sqlStd);
1414
conn->setProtocol(protocol);
1515
conn->setShowOutput(show_output);
16-
bool ret = conn->connect(hostName, port, userId, password, "", highAvailability, {},7200, reConnect);
16+
bool ret = conn->connect(hostName, port, userId, password, "", highAvailability, {},7200, reConnect, tryReconnectNums);
1717
if(!ret)
1818
throw IOException("Failed to connect to " + hostName + ":" + std::to_string(port));
1919
sessionIds_.push_back(conn->getSessionId());
@@ -23,8 +23,8 @@ DBConnectionPoolImpl::DBConnectionPoolImpl(const string& hostName, int port, int
2323
}
2424
}
2525
else{
26-
SmartPointer<DBConnection> entryPoint = new DBConnection(false, false, 7200, compress, python);
27-
bool ret = entryPoint->connect(hostName, port, userId, password, "", highAvailability, {},7200, reConnect);
26+
SmartPointer<DBConnection> entryPoint = new DBConnection(false, false, 7200, compress, python, false, sqlStd);
27+
bool ret = entryPoint->connect(hostName, port, userId, password, "", highAvailability, {},7200, reConnect, tryReconnectNums);
2828
if(!ret)
2929
throw IOException("Failed to connect to " + hostName + ":" + std::to_string(port));
3030
ConstantSP nodes = entryPoint->run("rpc(getControllerAlias(), getClusterLiveDataNodes{false})");
@@ -40,12 +40,12 @@ DBConnectionPoolImpl::DBConnectionPoolImpl(const string& hostName, int port, int
4040
ports[i] = std::atoi(fields.substr(p + 1, fields.size()).data());
4141
}
4242
for(int i = 0 ;i < threadNum; i++){
43-
SmartPointer<DBConnection> conn = new DBConnection(false, false, 7200, compress, python);
43+
SmartPointer<DBConnection> conn = new DBConnection(false, false, 7200, compress, python, false, sqlStd);
4444
conn->setProtocol(protocol);
4545
conn->setShowOutput(show_output);
4646
string &curhost = hosts[i % nodeCount];
4747
int &curport = ports[i % nodeCount];
48-
bool ret = conn->connect(curhost, curport, userId, password, "", highAvailability, {}, 7200, reConnect);
48+
bool ret = conn->connect(curhost, curport, userId, password, "", highAvailability, {}, 7200, reConnect, tryReconnectNums);
4949
if(!ret)
5050
throw IOException("Failed to connect to " + curhost + ":" + std::to_string(curport));
5151
sessionIds_.push_back(conn->getSessionId());

core/src/DdbPythonUtil.cpp

Lines changed: 29 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#include "DdbPythonUtil.h"
2+
#include <pybind11/pytypes.h>
23
#include "Concurrent.h"
34
#include "ConstantImp.h"
45
#include "ConstantMarshall.h"
@@ -86,6 +87,7 @@ Preserved::Preserved(){
8687
patimestamp_ns_ = pyarrow_.attr("timestamp")("ns");
8788
pautf8_ = pyarrow_.attr("utf8")();
8889
padictionary_int32_utf8_ = pyarrow_.attr("dictionary")(paint32_, pautf8_);
90+
paDictionaryType_ = pyarrow_.attr("DictionaryType");
8991
pafixed_size_binary_16_ = pyarrow_.attr("binary")(16);
9092
palarge_binary_ = pyarrow_.attr("large_binary")();
9193
padecimal128_ = pyarrow_.attr("Decimal128Type");
@@ -2190,22 +2192,35 @@ ConstantSP _toDolphinDB_Vector_SeriesOrIndex(
21902192
break;
21912193
}
21922194
case DT_SYMBOL: {
2193-
obj = DdbPythonUtil::preserved_->pyarrow_.attr("array")(obj.attr("array"));
2194-
if (py::isinstance(obj, DdbPythonUtil::preserved_->pachunkedarray_))
2195-
obj = obj.attr("combine_chunks")();
2196-
py::array_t<int32_t> indices = DdbPythonUtil::preserved_->pdseries_(obj.attr("indices"), "dtype"_a=DdbPythonUtil::preserved_->pdarrowdtype_(DdbPythonUtil::preserved_->paint32_))
2195+
if (py::isinstance(obj, DdbPythonUtil::preserved_->paDictionaryType_)) {
2196+
obj = DdbPythonUtil::preserved_->pyarrow_.attr("array")(obj.attr("array"));
2197+
if (py::isinstance(obj, DdbPythonUtil::preserved_->pachunkedarray_))
2198+
obj = obj.attr("combine_chunks")();
2199+
py::array_t<int32_t> indices = DdbPythonUtil::preserved_->pdseries_(obj.attr("indices"), "dtype"_a=DdbPythonUtil::preserved_->pdarrowdtype_(DdbPythonUtil::preserved_->paint32_))
21972200
.attr("to_numpy")("dtype"_a="int32", "na_value"_a=INT_MIN);
2198-
py::array dictionary = obj.attr("dictionary");
2199-
vector<std::string> dict_string;
2200-
dict_string.reserve(dictionary.size());
2201-
for (auto &it : dictionary) {
2202-
dict_string.push_back(py::cast<std::string>(it));
2201+
py::array dictionary = obj.attr("dictionary");
2202+
vector<std::string> dict_string;
2203+
dict_string.reserve(dictionary.size());
2204+
for (auto &it : dictionary) {
2205+
dict_string.push_back(py::cast<std::string>(it));
2206+
}
2207+
size_t size = indices.size();
2208+
ddbVec = Util::createVector(DT_SYMBOL, size, size);
2209+
const int32_t *data = indices.data();
2210+
for (int it = 0; it < size; ++it) {
2211+
ddbVec->setString(it, data[it] == INT_MIN ? "" : dict_string[data[it]]);
2212+
}
22032213
}
2204-
size_t size = indices.size();
2205-
ddbVec = Util::createVector(DT_SYMBOL, size, size);
2206-
const int32_t *data = indices.data();
2207-
for (int it = 0; it < size; ++it) {
2208-
ddbVec->setString(it, data[it] == INT_MIN ? "" : dict_string[data[it]]);
2214+
else {
2215+
// maybe string or others
2216+
py::array series_array = obj.attr("to_numpy")("dtype"_a="object", "na_value"_a="");
2217+
size_t size = series_array.size();
2218+
ddbVec = Util::createVector(typeInfer, size, size);
2219+
int index = 0;
2220+
for (auto &it : series_array) {
2221+
ddbVec->setString(index, py::cast<std::string>(it));
2222+
++index;
2223+
}
22092224
}
22102225
break;
22112226
}

0 commit comments

Comments
 (0)