
Commit 59cf291

release 2.0.11.0
1 parent 9b9e7f7 commit 59cf291

14 files changed: +212 additions, -138 deletions


README.md

Lines changed: 1 addition & 1 deletion
```diff
@@ -10,7 +10,7 @@ To use DolphinDB Python SDK, you'll need:
 - CPython: version 3.6 and newer
 - DolphinDB Server
 - Packages:
-  - NumPy: version 1.18 to 1.24.4
+  - NumPy: version 1.18 and newer
   - pandas: version 1.0.0 and newer, but not version 1.3.0
   - future
 - Extension Packages:
```
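A quick way to verify these requirements in an existing environment; a minimal sketch, assuming the packages are already installed:

```python
# Minimal environment check against the requirements above (illustrative only).
import sys

import numpy
import pandas

assert sys.version_info >= (3, 6), "CPython 3.6 or newer is required"
assert tuple(int(x) for x in numpy.__version__.split(".")[:2]) >= (1, 18), "NumPy 1.18+"
assert pandas.__version__ != "1.3.0", "pandas 1.3.0 is explicitly unsupported"
print("NumPy", numpy.__version__, "| pandas", pandas.__version__)
```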

core/include/Streaming.h

Lines changed: 2 additions & 0 deletions
```diff
@@ -86,13 +86,15 @@ class EXPORT_DECL StreamDeserializer {
     //symbol->[dbPath,tableName], dbPath can be empty for table in memory.
     StreamDeserializer(const unordered_map<string, pair<string, string>> &sym2tableName, DBConnection *pconn = nullptr);
     StreamDeserializer(const unordered_map<string, DictionarySP> &sym2schema);
+    // do not use this constructor if there are decimal or decimal-array columns (need schema to get decimal scale)
     StreamDeserializer(const unordered_map<string, vector<DATA_TYPE>> &symbol2col);
     bool parseBlob(const ConstantSP &src, vector<VectorSP> &rows, vector<string> &symbols, ErrorCodeInfo &errorInfo);
 private:
     void create(DBConnection &conn);
     void parseSchema(const unordered_map<string, DictionarySP> &sym2schema);
     unordered_map<string, pair<string, string>> sym2tableName_;
     unordered_map<string, vector<DATA_TYPE>> symbol2col_;
+    unordered_map<string, vector<int>> symbol2scale_;
     Mutex mutex_;
     friend class StreamingClientImpl;
 };
```
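The new comment matters for the Python API as well: a deserializer constructed from bare column types cannot know a DECIMAL column's scale, so streams containing decimals should be described by table name (or schema) instead. A hedged sketch, with host, credentials, and table names as placeholders:

```python
import dolphindb as ddb

s = ddb.session()
s.connect("localhost", 8848, "admin", "123456")  # placeholder host/credentials

# Mapping each message symbol to (dbPath, tableName) lets the deserializer
# fetch the full schema - including the decimal scale - from the server.
sd = ddb.streamDeserializer({"msg1": ("dfs://demo", "quotes")}, s)
```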

core/src/ConstantImp.cpp

Lines changed: 2 additions & 2 deletions
```diff
@@ -451,8 +451,8 @@ int StringVector::serialize(char* buf, int bufSize, INDEX indexStart, int offset
     if (!blob_) {
         while (bufSize > 0 && indexStart < size_) {
             const string& str = data_[indexStart];
-            if (str.size() >= 65536) {
-                throw RuntimeException("String too long, Serialization failed, length must be less than 64K bytes.");
+            if (str.size() >= 262144) {
+                throw RuntimeException("String too long, Serialization failed, length must be less than 256K bytes.");
             }
             int len = str.size() + 1 - offset;
             if (bufSize >= len) {
```
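The limit raised here (65536 to 262144 bytes) also appears in the scalar and symbol serializers below. If oversized strings are possible, a client-side guard along these lines (illustrative, not SDK API) avoids the RuntimeException:

```python
# Illustrative pre-upload guard for the 256 KB string serialization limit.
MAX_STRING_BYTES = 262144  # matches the new check in the serializers

def ensure_serializable(value: str) -> str:
    # The limit is on encoded byte length, so measure after encoding.
    if len(value.encode("utf-8")) >= MAX_STRING_BYTES:
        raise ValueError("string is 256 KB or larger and cannot be serialized")
    return value
```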

core/src/ConstantMarshall.cpp

Lines changed: 1 addition & 1 deletion
```diff
@@ -22,7 +22,7 @@ short ConstantMarshallImp::encodeFlag(const ConstantSP& target, bool compress){
     if (target->isTable()) {
         flag += ((Table*)target.get())->getTableType();
     }else
-        flag += (target->isVector() && target->getType() == DT_SYMBOL) ? DT_SYMBOL + 128 : target->getType();
+        flag += (target->isVector() && target->getType() == DT_SYMBOL && target->size() != 0) ? DT_SYMBOL + 128 : target->getType();
 }
 else {
     if (target->isTable())
```

core/src/DdbPythonUtil.cpp

Lines changed: 6 additions & 2 deletions
```diff
@@ -1783,9 +1783,11 @@ ConstantSP toDolphinDB_Vector_SeriesOrIndex(py::object obj, Type typeIndicator,
         }
         case DT_UUID:
         case DT_INT128: {
+            char tmp[16] = {0};
+            py::bytes na_value = py::bytes(tmp, 16);
             py::array values = DdbPythonUtil::preserved_->pdseries_(obj.attr("values"),
                     "dtype"_a=DdbPythonUtil::preserved_->pdarrowdtype_(DdbPythonUtil::preserved_->pafixed_size_binary_16_))
-                .attr("to_numpy")("dtype"_a="object", "na_value"_a="");
+                .attr("to_numpy")("dtype"_a="object", "na_value"_a=na_value);
             size_t size = values.size();
             VectorSP valVec = Util::createVector(elemType.first, size, size);
             int index = 0;
@@ -1993,7 +1995,9 @@ ConstantSP toDolphinDB_Vector_SeriesOrIndex(py::object obj, Type typeIndicator,
         }
         case DT_UUID:
         case DT_INT128: {
-            py::array series_array = obj.attr("to_numpy")("dtype"_a="object", "na_value"_a="");
+            char tmp[16] = {0};
+            py::bytes na_value = py::bytes(tmp, 16);
+            py::array series_array = obj.attr("to_numpy")("dtype"_a="object", "na_value"_a=na_value);
             size_t size = series_array.size();
             ddbVec = Util::createVector(typeInfer, size, size);
             int index = 0;
```
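Seen from pandas, this change means a missing UUID/INT128 cell is now materialized as 16 zero bytes (the null value for these fixed-width types) rather than an empty string, so every element stays a 16-byte buffer. A rough Python equivalent of the new na_value handling:

```python
import pandas as pd

s = pd.Series([b"\x01" * 16, None], dtype=object)
na_value = bytes(16)  # 16 zero bytes, mirroring `char tmp[16] = {0}` above
arr = s.to_numpy(dtype="object", na_value=na_value)
assert arr[1] == b"\x00" * 16  # nulls keep the fixed 16-byte width
```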

core/src/DolphinDB.cpp

Lines changed: 2 additions & 2 deletions
```diff
@@ -2595,8 +2595,8 @@ int SymbolBase::serialize(char* buf, int bufSize, INDEX indexStart, int offset,
     int index = indexStart;
     int initSize = bufSize;
     while(index < (int)syms_.size() && bufSize > 0){
-        if (syms_[index].size() >= 65536) {
-            throw RuntimeException("String too long, Serialization failed, length must be less than 64K bytes.");
+        if (syms_[index].size() >= 262144) {
+            throw RuntimeException("String too long, Serialization failed, length must be less than 256K bytes.");
         }
         int size = std::min(bufSize, (int)syms_[index].size() + 1 - offset);
         memcpy(buf, syms_[index].data() + offset, size);
```

core/src/ScalarImp.cpp

Lines changed: 7 additions & 3 deletions
```diff
@@ -212,10 +212,10 @@ IO_ERR Void::deserialize(DataInputStream* in, INDEX indexStart, INDEX targetNumE
 
 int String::serialize(char* buf, int bufSize, INDEX indexStart, int offset, int& numElement, int& partial) const {
     int len = val_.size();
-    if (len >= 65536) {
-        throw RuntimeException("String too long, Serialization failed, length must be less than 64K bytes.");
-    }
     if (!blob_) {
+        if (len >= 262144) {
+            throw RuntimeException("String too long, Serialization failed, length must be less than 256K bytes.");
+        }
         if (offset > len)
             return -1;
         if (bufSize >= len - offset + 1) {
@@ -266,6 +266,10 @@ IO_ERR String::deserialize(DataInputStream* in, INDEX indexStart, INDEX targetNu
     size_t acLen = 0;
     if ((ret = in->readInt(len)) != OK)
         return ret;
+    if (len == 0) {
+        val_.clear();
+        return OK;
+    }
     std::unique_ptr<char[]> buf(new char[len]);
     if ((ret = in->readBytes(buf.get(), len, acLen)) != OK)
         return ret;
```

core/src/Streaming.cpp

Lines changed: 48 additions & 19 deletions
```diff
@@ -289,30 +289,52 @@ bool StreamDeserializer::parseBlob(const ConstantSP &src, vector<VectorSP> &rows
     INDEX rowSize = symbolVec->rows();
     rows.resize(rowSize);
     symbols.resize(rowSize);
-    unordered_map<string, vector<DATA_TYPE>>::iterator iter;
+    unordered_map<string, vector<DATA_TYPE>>::iterator colTypeIter;
+    unordered_map<string, vector<int>>::iterator colScaleIter;
     for (INDEX rowIndex = 0; rowIndex < rowSize; rowIndex++) {
         string symbol = symbolVec->getString(rowIndex);
         {
             LockGuard<Mutex> lock(&mutex_);
-            iter = symbol2col_.find(symbol);
-            if (iter == symbol2col_.end()) {
-                errorInfo.set(ErrorCodeInfo::EC_InvalidParameter, string("Unknow symbol ") + symbol);
+            colTypeIter = symbol2col_.find(symbol);
+            colScaleIter = symbol2scale_.find(symbol);
+            if (colTypeIter == symbol2col_.end()) {
+                errorInfo.set(ErrorCodeInfo::EC_InvalidParameter, string("Unknown symbol ") + symbol);
                 return false;
             }
         }
         symbols[rowIndex] = std::move(symbol);
-        vector<DATA_TYPE> &cols = iter->second;
+
+        vector<DATA_TYPE> &cols = colTypeIter->second;
+        vector<int> scales;
+        if (colScaleIter != symbol2scale_.end()) {
+            scales = colScaleIter->second;
+        }
+
         const string &blob = blobVec->getStringRef(rowIndex);
         DataInputStreamSP dis = new DataInputStream(blob.data(), blob.size(), false);
         INDEX num;
         IO_ERR ioError;
         ConstantSP value;
         int colIndex = 0;
         VectorSP rowVec = Util::createVector(DT_ANY, cols.size());
-        for (auto &colOne : cols) {
+        for (auto i = 0; i < (int)cols.size(); ++i) {
+            auto &colOne = cols[i];
             num = 0;
+            auto scale = 0;
+            if (Util::getCategory(colOne) == DENARY || colOne == DT_DECIMAL32_ARRAY ||
+                colOne == DT_DECIMAL64_ARRAY || colOne == DT_DECIMAL128_ARRAY) {
+                if (scales.empty()) {
+                    errorInfo.set(
+                        ErrorCodeInfo::EC_InvalidParameter,
+                        string("Unknown scale for decimal. StreamDeserializer should be initialized with sym2schema")
+                    );
+                    return false;
+                }
+                scale = scales[i];
+            }
+
             if (colOne < ARRAY_TYPE_BASE) {
-                value = Util::createConstant(colOne);
+                value = Util::createConstant(colOne, scale);
                 ioError = value->deserialize(dis.get(), 0, 1, num);
                 if (ioError != OK) {
                     errorInfo.set(ErrorCodeInfo::EC_InvalidObject, "Deserialize blob error " + std::to_string(ioError));
@@ -321,7 +343,7 @@ bool StreamDeserializer::parseBlob(const ConstantSP &src, vector<VectorSP> &rows
                 rowVec->set(colIndex, value);
             }
             else {
-                value = Util::createArrayVector(colOne, 1);
+                value = Util::createArrayVector(colOne, 1, 1, true, scale);
                 ioError = value->deserialize(dis.get(), 0, 1, num);
                 if (ioError != OK) {
                     errorInfo.set(ErrorCodeInfo::EC_InvalidObject, "Deserialize blob error " + std::to_string(ioError));
@@ -336,22 +358,29 @@ bool StreamDeserializer::parseBlob(const ConstantSP &src, vector<VectorSP> &rows
     return true;
 }
 void StreamDeserializer::parseSchema(const unordered_map<string, DictionarySP> &sym2schema) {
+    LockGuard<Mutex> lock(&mutex_);
     for (auto &one : sym2schema) {
         const DictionarySP &schema = one.second;
         TableSP colDefs = schema->getMember("colDefs");
-        ConstantSP colDefsTypeInt = colDefs->getColumn("typeInt");
-        ConstantSP colDefsTypeString = colDefs->getColumn("typeString");
         size_t columnSize = colDefs->size();
 
-        vector<DATA_TYPE> colTypes;
-        //tableInfo.colNames.resize(columnSize);
-        colTypes.resize(columnSize);
-        for (size_t i = 0; i < columnSize; i++) {
-            colTypes[i] = (DATA_TYPE)colDefsTypeInt->getInt(i);
-            //tableInfo.colNames[i] = colDefsTypeString->getString(i);
-        }
-        LockGuard<Mutex> lock(&mutex_);
-        symbol2col_[one.first] = colTypes;
+        // types
+        ConstantSP colDefsTypeInt = colDefs->getColumn("typeInt");
+        vector<DATA_TYPE> colTypes(columnSize);
+        for (auto i = 0; i < (int)columnSize; i++) {
+            colTypes[i] = (DATA_TYPE)colDefsTypeInt->getInt(i);
+        }
+        symbol2col_[one.first] = colTypes;
+
+        // scales for decimals (server 130 doesn't have this column)
+        if (colDefs->contain("extra")) {
+            ConstantSP colDefsScales = colDefs->getColumn("extra");
+            vector<int> colScales(columnSize);
+            for (auto i = 0; i < (int)columnSize; i++) {
+                colScales[i] = colDefsScales->getInt(i);
+            }
+            symbol2scale_[one.first] = colScales;
+        }
     }
 }
```
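The new "Unknown scale for decimal" error fires when decimal data arrives through a deserializer that only knows column types. A hedged end-to-end sketch of the safe path from Python (host, credentials, and names are placeholders):

```python
import dolphindb as ddb

s = ddb.session()
s.connect("localhost", 8848, "admin", "123456")  # placeholder host/credentials
s.enableStreaming()

# Built from table names, so parseSchema() records typeInt and, when present,
# the "extra" column holding each decimal column's scale.
sd = ddb.streamDeserializer({"msg1": ("dfs://demo", "quotes")}, s)

s.subscribe("localhost", 8848, handler=print, tableName="outTable",
            actionName="demo", offset=0, streamDeserializer=sd)
```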

dolphindb/__init__.py

Lines changed: 1 addition & 1 deletion
```diff
@@ -19,7 +19,7 @@
 from .database import Database
 from .utils import month
 
-__version__ = "1.30.22.6"
+__version__ = "2.0.11.0"
 
 name = "dolphindb"
```
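After upgrading, the installed build can be confirmed directly:

```python
import dolphindb

print(dolphindb.__version__)  # "2.0.11.0" for this release
```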

dolphindb/session.py

Lines changed: 25 additions & 17 deletions
```diff
@@ -266,7 +266,7 @@ def shutDown(self):
         self.loop = None
         self.thread = None
 
-    def getSessionId(self) -> List[int]:
+    def getSessionId(self) -> List[str]:
         """Obtain Session ID of all sessions.
 
         Returns:
@@ -525,7 +525,7 @@ def runFile(self, filepath: str, *args, **kwargs):
             script = fp.read()
         return self.run(script, *args, **kwargs)
 
-    def getSessionId(self) -> int:
+    def getSessionId(self) -> str:
         """Get the Session ID of the current Session.
 
         Returns:
@@ -551,7 +551,7 @@ def subscribe(
         self, host: str, port: int, handler: Callable, tableName: str, actionName: str = None,
         offset: int = -1, resub: bool = False, filter=None,
         msgAsTable: bool = False, batchSize: int = 0, throttle: float = 1.0,
-        userName: str = None, password: str = None, streamDeserializer: Optional[Type["streamDeserializer"]] = None
+        userName: str = None, password: str = None, streamDeserializer: Optional["streamDeserializer"] = None
     ) -> None:
         """Subscribe to stream tables in DolphinDB.
@@ -666,7 +666,7 @@ def saveTable(self, tbl: Table, dbPath: str) -> bool:
         self.run(s2)
         return True
 
-    def loadText(self, remoteFilePath: str, delimiter: str = ",") -> Type["Table"]:
+    def loadText(self, remoteFilePath: str, delimiter: str = ",") -> "Table":
         """Import text files into DolphinDB as an in-memory table.
 
         Args:
@@ -685,9 +685,9 @@ def loadText(self, remoteFilePath: str, delimiter: str = ",") -> Type["Table"]:
         return Table(data=tableName, s=self, isMaterialized=True)
 
     def loadTextEx(
-        self, dbPath: str, tableName: str, partitionColumns: Optional[List[str]] = None,
-        remoteFilePath: str = None, delimiter: str = ","
-    ) -> Type["Table"]:
+        self, dbPath: str, tableName: str, partitionColumns: Optional[List[str]] = None,
+        remoteFilePath: str = None, delimiter: str = ",", sortColumns: Optional[List[str]] = None,
+    ) -> "Table":
         """Import a partitioned in-memory table.
 
         Args:
@@ -696,35 +696,43 @@ def loadTextEx(
             partitionColumns : list of strings indicating the partitioning columns. Defaults to None.
             remoteFilePath : remote file path. Defaults to None.
             delimiter : delimiter of each column. Defaults to ",".
+            sortColumns : list of strings indicating the sort columns. Defaults to None.
 
         Returns:
             a DolphinDB Table object.
         """
         if partitionColumns is None:
             partitionColumns = []
+        if sortColumns is None:
+            sortColumns = []
         isDBPath = True
         if "/" in dbPath or "\\" in dbPath or "dfs://" in dbPath:
-            dbstr ='db=database("' + dbPath + '")'
+            dbstr = 'db=database("' + dbPath + '")'
             self.run(dbstr)
-            tbl_str = '{tableNameNEW} = loadTextEx(db, "{tableName}", {partitionColumns}, "{remoteFilePath}", {delimiter})'
+            tbl_str = '{tableNameNEW} = loadTextEx(db, "{tableName}", {partitionColumns}, "{remoteFilePath}", {delimiter} {extraParams})'
         else:
             isDBPath = False
-            tbl_str = '{tableNameNEW} = loadTextEx('+dbPath+', "{tableName}", {partitionColumns}, "{remoteFilePath}", {delimiter})'
+            tbl_str = '{tableNameNEW} = loadTextEx("{dbPath}", "{tableName}", {partitionColumns}, "{remoteFilePath}", {delimiter} {extraParams})'
         fmtDict = dict()
         fmtDict['tableNameNEW'] = _generate_tablename()
+        fmtDict['dbPath'] = dbPath
         fmtDict['tableName'] = tableName
-        fmtDict['partitionColumns'] = str(partitionColumns)
+        fmtDict['partitionColumns'] = '[' + ','.join([f'"{_}"' for _ in partitionColumns]) + ']'
         fmtDict['remoteFilePath'] = remoteFilePath if remoteFilePath is not None else ""
         fmtDict['delimiter'] = delimiter
-        # tbl_str = tableName+'=loadTextEx(db,"' + tableName + '",'+ str(partitionColumns) +',"'+ remoteFilePath+"\",'"+delimiter+"')"
+        if sortColumns:
+            extraParams = ", sortColumns=" + '[' + ','.join([f'"{_}"' for _ in sortColumns]) + ']'
+        else:
+            extraParams = ""
+        fmtDict['extraParams'] = extraParams
         tbl_str = re.sub(' +', ' ', tbl_str.format(**fmtDict).strip())
         self.run(tbl_str)
         if isDBPath:
             return Table(data=fmtDict['tableName'], dbPath=dbPath, s=self)
         else:
             return Table(data=fmtDict['tableNameNEW'], s=self)
 
-    def ploadText(self, remoteFilePath: str, delimiter: str = ",") -> Type["Table"]:
+    def ploadText(self, remoteFilePath: str, delimiter: str = ",") -> "Table":
         """Import text files in parallel into DolphinDB as a partitioned in-memory table, which is faster than method loadText.
 
         Args:
@@ -742,7 +750,7 @@ def ploadText(self, remoteFilePath: str, delimiter: str = ",") -> Type["Table"]:
     def table(
         self, dbPath: str = None, data=None, tableAliasName: str = None,
         inMem: bool = False, partitions: Optional[List[str]] = None
-    ) -> Type["Table"]:
+    ) -> "Table":
         """Create a DolphinDB table object and upload it to the server.
 
         Deprecated:
@@ -762,7 +770,7 @@ def table(
             partitions = []
         return Table(dbPath=dbPath, data=data, tableAliasName=tableAliasName, inMem=inMem, partitions=partitions, s=self, isMaterialized=True)
 
-    def loadTable(self, tableName: str, dbPath: Optional[str] = None, partitions=None, memoryMode: bool = False) -> Type["Table"]:
+    def loadTable(self, tableName: str, dbPath: Optional[str] = None, partitions=None, memoryMode: bool = False) -> "Table":
         """Load a DolphinDB table.
 
         Args:
@@ -828,7 +836,7 @@ def myStr(x):
         else:
             return Table(data=tableName, s=self, needGC=False, isMaterialized=True)
 
-    def loadTableBySQL(self, tableName: str, dbPath: str, sql: str) -> Type["Table"]:
+    def loadTableBySQL(self, tableName: str, dbPath: str, sql: str) -> "Table":
         """Load records that satisfy the filtering conditions in a SQL query as a partitioned in-memory table.
 
         Args:
@@ -850,7 +858,7 @@ def loadTableBySQL(self, tableName: str, dbPath: str, sql: str) -> Type["Table"]
     def database(
         self, dbName: str = None, partitionType: int = None, partitions=None,
         dbPath: str = None, engine: str = None, atomic: str = None, chunkGranularity: str = None
-    ) -> Type["Database"]:
+    ) -> "Database":
         """Create database.
 
         Args:
```