Skip to content

Commit 559fbe9

Browse files
committed
Merge branch 'master' into obkv_params2
2 parents 8f24dba + 93181fb commit 559fbe9

File tree

17 files changed

+677
-369
lines changed

17 files changed

+677
-369
lines changed
Lines changed: 312 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,312 @@
1+
/*-
2+
* #%L
3+
* com.oceanbase:obkv-table-client
4+
* %%
5+
* Copyright (C) 2021 - 2023 OceanBase
6+
* %%
7+
* OBKV Table Client Framework is licensed under Mulan PSL v2.
8+
* You can use this software according to the terms and conditions of the Mulan PSL v2.
9+
* You may obtain a copy of Mulan PSL v2 at:
10+
* http://license.coscl.org.cn/MulanPSL2
11+
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
12+
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
13+
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
14+
* See the Mulan PSL v2 for more details.
15+
* #L%
16+
*/
17+
18+
package com.oceanbase.example;
19+
20+
import com.alipay.oceanbase.rpc.direct_load.ObDirectLoadBucket;
21+
import com.alipay.oceanbase.rpc.direct_load.ObDirectLoadConnection;
22+
import com.alipay.oceanbase.rpc.direct_load.ObDirectLoadManager;
23+
import com.alipay.oceanbase.rpc.direct_load.ObDirectLoadStatement;
24+
import com.alipay.oceanbase.rpc.direct_load.exception.ObDirectLoadException;
25+
import com.alipay.oceanbase.rpc.direct_load.execution.ObDirectLoadStatementExecutionId;
26+
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObLoadDupActionType;
27+
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObj;
28+
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObjType;
29+
30+
import java.sql.Connection;
31+
import java.sql.DriverManager;
32+
import java.sql.ResultSet;
33+
import java.sql.Statement;
34+
35+
public class ObDirectLoadDemo {
36+
37+
private static String host = "0.0.0.0";
38+
private static int port = 0;
39+
private static int sqlPort = 0;
40+
41+
private static String tenantName = "mysql";
42+
private static String userName = "root";
43+
private static String password = "";
44+
private static String dbName = "test";
45+
private static String tableName = "test1";
46+
47+
// parameters of direct load
48+
private static int parallel = 2; // Determines the number of server worker threads
49+
private static ObLoadDupActionType dupAction = ObLoadDupActionType.REPLACE;
50+
private static long timeout = 1000L * 1000 * 1000; // The overall timeout of the direct load task
51+
52+
public static void main(String[] args) {
53+
SimpleTest.run();
54+
ParallelWriteTest.run();
55+
MultiNodeWriteTest.run();
56+
}
57+
58+
private static void prepareTestTable() throws Exception {
59+
String url = String
60+
.format(
61+
"jdbc:mysql://%s/%s?useUnicode=true&characterEncoding=utf-8&connectTimeout=%d&socketTimeout=%d",
62+
host + ":" + sqlPort, dbName, 10000, 10000);
63+
String user = String.format("%s@%s", userName, tenantName);
64+
Class.forName("com.mysql.cj.jdbc.Driver");
65+
Connection connection = DriverManager.getConnection(url, user, password);
66+
Statement statement = connection.createStatement();
67+
String dropSql = "drop table " + tableName + ";";
68+
String tableDefinition = "create table " + tableName + " (c1 int, c2 varchar(255))";
69+
try {
70+
statement.execute(dropSql);
71+
} catch (Exception e) {
72+
// ignore drop error
73+
}
74+
statement.execute(tableDefinition);
75+
statement.close();
76+
connection.close();
77+
}
78+
79+
private static void queryTestTable(int expectedRowCount) throws Exception {
80+
String url = String
81+
.format(
82+
"jdbc:mysql://%s/%s?useUnicode=true&characterEncoding=utf-8&connectTimeout=%d&socketTimeout=%d",
83+
host + ":" + sqlPort, dbName, 10000, 10000);
84+
String user = String.format("%s@%s", userName, tenantName);
85+
Class.forName("com.mysql.cj.jdbc.Driver");
86+
Connection connection = DriverManager.getConnection(url, user, password);
87+
Statement statement = connection.createStatement();
88+
String querySql = "select count(*) from " + tableName + ";";
89+
ResultSet resultSet = statement.executeQuery(querySql);
90+
while (resultSet.next()) {
91+
int count = resultSet.getInt(1);
92+
if (count != expectedRowCount) {
93+
throw new RuntimeException("unexpected row count:" + count + ", expected:"
94+
+ expectedRowCount);
95+
}
96+
}
97+
statement.close();
98+
connection.close();
99+
}
100+
101+
private static ObDirectLoadConnection buildConnection(int writeThreadNum)
102+
throws ObDirectLoadException {
103+
return ObDirectLoadManager.getConnectionBuilder().setServerInfo(host, port)
104+
.setLoginInfo(tenantName, userName, password, dbName)
105+
.enableParallelWrite(writeThreadNum).build();
106+
}
107+
108+
private static ObDirectLoadStatement buildStatement(ObDirectLoadConnection connection)
109+
throws ObDirectLoadException {
110+
return connection.getStatementBuilder().setTableName(tableName).setDupAction(dupAction)
111+
.setParallel(parallel).setQueryTimeout(timeout).build();
112+
}
113+
114+
private static class SimpleTest {
115+
116+
public static void run() {
117+
System.out.println("SimpleTest start");
118+
ObDirectLoadConnection connection = null;
119+
ObDirectLoadStatement statement = null;
120+
try {
121+
prepareTestTable();
122+
123+
connection = buildConnection(1);
124+
statement = buildStatement(connection);
125+
126+
statement.begin();
127+
128+
ObDirectLoadBucket bucket = new ObDirectLoadBucket();
129+
ObObj[] rowObjs = new ObObj[2];
130+
rowObjs[0] = new ObObj(ObObjType.ObInt32Type.getDefaultObjMeta(), 1);
131+
rowObjs[1] = new ObObj(ObObjType.ObInt32Type.getDefaultObjMeta(), 2);
132+
bucket.addRow(rowObjs);
133+
statement.write(bucket);
134+
135+
statement.commit();
136+
137+
queryTestTable(1);
138+
} catch (Exception e) {
139+
throw new RuntimeException(e);
140+
} finally {
141+
if (null != statement) {
142+
statement.close();
143+
}
144+
if (null != connection) {
145+
connection.close();
146+
}
147+
}
148+
System.out.println("SimpleTest successful");
149+
}
150+
151+
};
152+
153+
private static class ParallelWriteTest {
154+
155+
private static class ParallelWriter implements Runnable {
156+
157+
private final ObDirectLoadStatement statement;
158+
private final int id;
159+
160+
ParallelWriter(ObDirectLoadStatement statement, int id) {
161+
this.statement = statement;
162+
this.id = id;
163+
}
164+
165+
@Override
166+
public void run() {
167+
try {
168+
ObDirectLoadBucket bucket = new ObDirectLoadBucket();
169+
ObObj[] rowObjs = new ObObj[2];
170+
rowObjs[0] = new ObObj(ObObjType.ObInt32Type.getDefaultObjMeta(), id);
171+
rowObjs[1] = new ObObj(ObObjType.ObInt32Type.getDefaultObjMeta(), id);
172+
bucket.addRow(rowObjs);
173+
statement.write(bucket);
174+
} catch (Exception e) {
175+
throw new RuntimeException(e);
176+
}
177+
}
178+
179+
};
180+
181+
public static void run() {
182+
System.out.println("ParallelWriteTest start");
183+
ObDirectLoadConnection connection = null;
184+
ObDirectLoadStatement statement = null;
185+
try {
186+
prepareTestTable();
187+
188+
connection = buildConnection(parallel);
189+
statement = buildStatement(connection);
190+
191+
statement.begin();
192+
193+
Thread[] threads = new Thread[parallel];
194+
for (int i = 0; i < threads.length; ++i) {
195+
ParallelWriter parallelWriter = new ParallelWriter(statement, i);
196+
Thread thread = new Thread(parallelWriter);
197+
thread.start();
198+
threads[i] = thread;
199+
}
200+
for (int i = 0; i < threads.length; ++i) {
201+
threads[i].join();
202+
}
203+
204+
statement.commit();
205+
206+
queryTestTable(2);
207+
} catch (Exception e) {
208+
throw new RuntimeException(e);
209+
} finally {
210+
if (null != statement) {
211+
statement.close();
212+
}
213+
if (null != connection) {
214+
connection.close();
215+
}
216+
}
217+
System.out.println("ParallelWriteTest successful");
218+
}
219+
220+
};
221+
222+
private static class MultiNodeWriteTest {
223+
224+
private static class MultiNodeWriter implements Runnable {
225+
226+
private final byte[] executionIdBytes;
227+
private final int id;
228+
229+
MultiNodeWriter(byte[] executionIdBytes, int id) {
230+
this.executionIdBytes = executionIdBytes;
231+
this.id = id;
232+
}
233+
234+
@Override
235+
public void run() {
236+
ObDirectLoadConnection connection = null;
237+
ObDirectLoadStatement statement = null;
238+
try {
239+
ObDirectLoadStatementExecutionId executionId = new ObDirectLoadStatementExecutionId();
240+
executionId.decode(executionIdBytes);
241+
242+
connection = buildConnection(1);
243+
statement = buildStatement(connection);
244+
245+
statement.resume(executionId);
246+
247+
ObDirectLoadBucket bucket = new ObDirectLoadBucket();
248+
ObObj[] rowObjs = new ObObj[2];
249+
rowObjs[0] = new ObObj(ObObjType.ObInt32Type.getDefaultObjMeta(), id);
250+
rowObjs[1] = new ObObj(ObObjType.ObInt32Type.getDefaultObjMeta(), id);
251+
bucket.addRow(rowObjs);
252+
statement.write(bucket);
253+
} catch (Exception e) {
254+
throw new RuntimeException(e);
255+
} finally {
256+
if (null != statement) {
257+
statement.close();
258+
}
259+
if (null != connection) {
260+
connection.close();
261+
}
262+
}
263+
}
264+
265+
};
266+
267+
public static void run() {
268+
System.out.println("MultiNodeWriteTest start");
269+
final int nodeNum = 10;
270+
ObDirectLoadConnection connection = null;
271+
ObDirectLoadStatement statement = null;
272+
try {
273+
prepareTestTable();
274+
275+
connection = buildConnection(1);
276+
statement = buildStatement(connection);
277+
278+
statement.begin();
279+
280+
ObDirectLoadStatementExecutionId executionId = statement.getExecutionId();
281+
byte[] executionIdBytes = executionId.encode();
282+
283+
Thread[] threads = new Thread[nodeNum];
284+
for (int i = 0; i < threads.length; ++i) {
285+
MultiNodeWriter multiNodeWriter = new MultiNodeWriter(executionIdBytes, i);
286+
Thread thread = new Thread(multiNodeWriter);
287+
thread.start();
288+
threads[i] = thread;
289+
}
290+
for (int i = 0; i < threads.length; ++i) {
291+
threads[i].join();
292+
}
293+
294+
statement.commit();
295+
296+
queryTestTable(nodeNum);
297+
} catch (Exception e) {
298+
throw new RuntimeException(e);
299+
} finally {
300+
if (null != statement) {
301+
statement.close();
302+
}
303+
if (null != connection) {
304+
connection.close();
305+
}
306+
}
307+
System.out.println("MultiNodeWriteTest successful");
308+
}
309+
310+
};
311+
312+
}

0 commit comments

Comments
 (0)