Skip to content

Commit 1423136

Browse files
committed
refector py table session
1 parent d171a53 commit 1423136

22 files changed

+215
-99
lines changed
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
#
18+
from typing import Union
19+
20+
from iotdb.Session import Session
21+
from iotdb.utils.NumpyTablet import NumpyTablet
22+
from iotdb.utils.SessionDataSet import SessionDataSet
23+
from iotdb.utils.Tablet import Tablet
24+
25+
26+
class TableSession(object):
27+
28+
def __init__(self, **kwargs):
29+
self.__session_pool = kwargs.get("__session_pool", None)
30+
if self.__session_pool is None:
31+
__node_urls = kwargs.get("node_urls", ["localhost:6667"])
32+
__username = kwargs.get("username", Session.DEFAULT_USER)
33+
__password = kwargs.get("password", Session.DEFAULT_PASSWORD)
34+
__database = kwargs.get("database", None)
35+
__query_timeout_in_ms = kwargs.get("query_timeout_in_ms", 60000)
36+
__fetch_size = kwargs.get("fetch_size", 5000)
37+
__zone_id = kwargs.get("zone_id", Session.DEFAULT_ZONE_ID)
38+
self.__session = Session.init_from_node_urls(
39+
__node_urls,
40+
__username,
41+
__password,
42+
__database,
43+
__query_timeout_in_ms,
44+
__fetch_size,
45+
)
46+
self.__session.open(kwargs.get("enable_rpc_compression", False))
47+
else:
48+
self.__session = self.__session_pool.get_session()
49+
50+
def insert(self, tablet: Union[Tablet | NumpyTablet]):
51+
self.__session.insert_relational_tablet(tablet)
52+
53+
def execute_non_query_statement(self, sql: str):
54+
self.__session.execute_non_query_statement(sql)
55+
56+
def execute_query_statement(
57+
self, sql: str, timeout_in_ms: int = 0
58+
) -> SessionDataSet:
59+
return self.__session.execute_query_statement(sql, timeout_in_ms)
60+
61+
def close(self):
62+
if self.__session_pool is None:
63+
self.__session.close()
64+
else:
65+
self.__session_pool.put_back(self.__session)
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
#
18+
from iotdb.Session import Session
19+
from iotdb.SessionPool import SessionPool, PoolConfig
20+
from iotdb.table_session import TableSession
21+
22+
23+
class TableSessionPool(object):
24+
25+
def __init__(self, **kwargs):
26+
pool_config = PoolConfig(
27+
node_urls=kwargs.get("node_urls", ["localhost:6667"]),
28+
user_name=kwargs.get("username", Session.DEFAULT_USER),
29+
password=kwargs.get("password", Session.DEFAULT_PASSWORD),
30+
fetch_size=kwargs.get("fetch_size", 5000),
31+
time_zone=kwargs.get("zone_id", Session.DEFAULT_ZONE_ID),
32+
max_retry=3,
33+
)
34+
max_pool_size = 5
35+
wait_timeout_in_ms = 3000
36+
self.__session_pool = SessionPool(
37+
pool_config, max_pool_size, wait_timeout_in_ms
38+
)
39+
40+
def get_session(self) -> TableSession:
41+
return TableSession(config={"__session_pool": self.__session_pool})
42+
43+
def close(self):
44+
self.__session_pool.close()

iotdb-client/client-py/TableModelSessionExample.py renamed to iotdb-client/client-py/table_model_session_example.py

Lines changed: 29 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -17,20 +17,20 @@
1717
#
1818
import numpy as np
1919

20-
from iotdb.Session import Session
20+
from iotdb.table_session import TableSession
2121
from iotdb.utils.IoTDBConstants import TSDataType
2222
from iotdb.utils.NumpyTablet import NumpyTablet
2323
from iotdb.utils.Tablet import ColumnType, Tablet
2424

2525
# creating session connection.
26-
ip = "127.0.0.1"
27-
port_ = "6667"
28-
username_ = "root"
29-
password_ = "root"
30-
3126
# don't specify database in constructor
32-
session = Session(ip, port_, username_, password_, sql_dialect="table", database="db1")
33-
session.open(False)
27+
config = {
28+
"node_urls": ["localhost:6667"],
29+
"username": "root",
30+
"password": "root",
31+
"time_zone": "UTC+8",
32+
}
33+
session = TableSession(**config)
3434

3535
session.execute_non_query_statement("CREATE DATABASE test1")
3636
session.execute_non_query_statement("CREATE DATABASE test2")
@@ -64,10 +64,14 @@
6464
session.close()
6565

6666
# specify database in constructor
67-
session = Session(
68-
ip, port_, username_, password_, sql_dialect="table", database="test1"
69-
)
70-
session.open(False)
67+
config = {
68+
"node_urls": ["localhost:6667"],
69+
"username": "root",
70+
"password": "root",
71+
"time_zone": "UTC+8",
72+
"database": "test1",
73+
}
74+
session = TableSession(**config)
7175

7276
# show tables from current database
7377
with session.execute_query_statement("SHOW TABLES") as session_data_set:
@@ -87,9 +91,13 @@
8791

8892
session.close()
8993

90-
# insert tablet by insert_relational_tablet
91-
session = Session(ip, port_, username_, password_, sql_dialect="table")
92-
session.open(False)
94+
# insert data by tablet
95+
config = {
96+
"node_urls": ["localhost:6667"],
97+
"username": "root",
98+
"password": "root",
99+
}
100+
session = TableSession(**config)
93101
session.execute_non_query_statement("CREATE DATABASE IF NOT EXISTS db1")
94102
session.execute_non_query_statement('USE "db1"')
95103
session.execute_non_query_statement(
@@ -115,7 +123,7 @@
115123
timestamps.append(row)
116124
values.append(["id:" + str(row), "attr:" + str(row), row * 1.0])
117125
tablet = Tablet("table5", column_names, data_types, values, timestamps, column_types)
118-
session.insert_relational_tablet(tablet)
126+
session.insert(tablet)
119127

120128
session.execute_non_query_statement("FLush")
121129

@@ -134,16 +142,16 @@
134142
np_timestamps,
135143
column_types=column_types,
136144
)
137-
session.insert_relational_tablet(np_tablet)
145+
session.insert(np_tablet)
138146

139147
with session.execute_query_statement("select * from table5 order by time") as dataset:
140148
print(dataset.get_column_names())
141149
while dataset.has_next():
142150
row_record = dataset.next()
143-
# print(row_record.get_fields()[0].get_long_value())
144-
# print(row_record.get_fields()[1].get_string_value())
145-
# print(row_record.get_fields()[2].get_string_value())
146-
# print(row_record.get_fields()[3].get_double_value())
151+
print(row_record.get_fields()[0].get_long_value())
152+
print(row_record.get_fields()[1].get_string_value())
153+
print(row_record.get_fields()[2].get_string_value())
154+
print(row_record.get_fields()[3].get_double_value())
147155
print(row_record)
148156

149157
with session.execute_query_statement("select * from table5 order by time") as dataset:

iotdb-client/client-py/TableModelSessionPoolExample.py renamed to iotdb-client/client-py/table_model_session_pool_example.py

Lines changed: 16 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
import numpy as np
2121

22-
from iotdb.SessionPool import PoolConfig, SessionPool
22+
from iotdb.table_session_pool import TableSessionPool
2323
from iotdb.utils.IoTDBConstants import TSDataType
2424
from iotdb.utils.NumpyTablet import NumpyTablet
2525
from iotdb.utils.Tablet import ColumnType, Tablet
@@ -48,7 +48,7 @@ def prepare_data():
4848
while res.has_next():
4949
print(res.next())
5050

51-
session_pool.put_back(session)
51+
session.close()
5252

5353

5454
def insert_data(num: int):
@@ -74,7 +74,7 @@ def insert_data(num: int):
7474
tablet = Tablet(
7575
"table" + str(num), column_names, data_types, values, timestamps, column_types
7676
)
77-
session.insert_relational_tablet(tablet)
77+
session.insert(tablet)
7878
session.execute_non_query_statement("FLush")
7979

8080
np_timestamps = np.arange(15, 30, dtype=np.dtype(">i8"))
@@ -92,8 +92,8 @@ def insert_data(num: int):
9292
np_timestamps,
9393
column_types=column_types,
9494
)
95-
session.insert_relational_tablet(np_tablet)
96-
session_pool.put_back(session)
95+
session.insert(np_tablet)
96+
session.close()
9797

9898

9999
def query_data():
@@ -110,38 +110,35 @@ def query_data():
110110
while res.has_next():
111111
print(res.next())
112112

113-
session_pool.put_back(session)
113+
session.close()
114114

115115

116116
def delete_data():
117117
session = session_pool.get_session()
118118
session.execute_non_query_statement("drop database db1")
119119
print("data has been deleted. now the databases are:")
120-
res = session.execute_statement("show databases")
120+
res = session.execute_query_statement("show databases")
121121
while res.has_next():
122122
print(res.next())
123-
session_pool.put_back(session)
123+
session.close()
124124

125125

126126
ip = "127.0.0.1"
127127
port = "6667"
128128
username = "root"
129129
password = "root"
130-
pool_config = PoolConfig(
131-
node_urls=["127.0.0.1:6667", "127.0.0.1:6668", "127.0.0.1:6669"],
132-
user_name=username,
133-
password=password,
134-
fetch_size=1024,
135-
time_zone="UTC+8",
136-
max_retry=3,
137-
sql_dialect="table",
138-
database="db1",
139-
)
130+
config = {
131+
"node_urls": ["127.0.0.1:6667", "127.0.0.1:6668", "127.0.0.1:6669"],
132+
"username": username,
133+
"password": password,
134+
"fetch_size": 1024,
135+
"database": "db1",
136+
}
140137
max_pool_size = 5
141138
wait_timeout_in_ms = 3000
142139

143140
# Create a session pool
144-
session_pool = SessionPool(pool_config, max_pool_size, wait_timeout_in_ms)
141+
session_pool = TableSessionPool(**config)
145142

146143
prepare_data()
147144

iotdb-client/client-py/iotdb/dbapi/tests/__init__.py renamed to iotdb-client/client-py/tests/integration/dbapi/__init__.py

File renamed without changes.

iotdb-client/client-py/iotdb/dbapi/tests/test_connection.py renamed to iotdb-client/client-py/tests/integration/dbapi/test_connection.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
# under the License.
1717
#
1818

19-
from iotdb.IoTDBContainer import IoTDBContainer
19+
from tests.integration.iotdb_container import IoTDBContainer
2020
from iotdb.dbapi import connect
2121

2222
final_flag = True

iotdb-client/client-py/iotdb/dbapi/tests/test_cursor.py renamed to iotdb-client/client-py/tests/integration/dbapi/test_cursor.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
# under the License.
1717
#
1818

19-
from iotdb.IoTDBContainer import IoTDBContainer
19+
from tests.integration.iotdb_container import IoTDBContainer
2020
from iotdb.dbapi import connect
2121
from iotdb.dbapi.Cursor import Cursor
2222

File renamed without changes.

iotdb-client/client-py/iotdb/sqlalchemy/tests/__init__.py renamed to iotdb-client/client-py/tests/integration/sqlalchemy/__init__.py

File renamed without changes.

iotdb-client/client-py/iotdb/sqlalchemy/tests/test_dialect.py renamed to iotdb-client/client-py/tests/integration/sqlalchemy/test_dialect.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
from sqlalchemy import create_engine, inspect
2222
from sqlalchemy.dialects import registry
2323

24-
from iotdb.IoTDBContainer import IoTDBContainer
24+
from tests.integration.iotdb_container import IoTDBContainer
2525

2626
final_flag = True
2727
failed_count = 0

0 commit comments

Comments
 (0)