
Commit 649c29e

Merge pull request #344 from youngsofun/fix
feat: support streaming load
2 parents 21af7da + 855303f commit 649c29e

62 files changed: +1346, -1354 lines


.github/actions/setup_databend_cluster/action.yml

Lines changed: 0 additions & 55 deletions
This file was deleted.

.github/workflows/cron.integration.yml

Lines changed: 8 additions & 22 deletions
```diff
@@ -17,21 +17,8 @@ concurrency:
 jobs:
   test:
     runs-on: ubuntu-latest
-    services:
-      databend:
-        image: datafuselabs/databend:nightly
-        env:
-          QUERY_DEFAULT_USER: databend
-          QUERY_DEFAULT_PASSWORD: databend
-          MINIO_ENABLED: true
-        ports:
-          - 8000:8000
-          - 9000:9000
     steps:
-      - name: Checkout repository
-        uses: actions/checkout@v4
-        with:
-          ref: ${{ github.ref }}
+      - uses: actions/checkout@v4

       - name: Set up JDK 17
         uses: actions/setup-java@v4
@@ -42,17 +29,16 @@ jobs:
           gpg-private-key: ${{ secrets.GPG_PRIVATE_KEY }} # Value of the GPG private key to import
           gpg-passphrase: MAVEN_GPG_PASSPHRASE # env variable for GPG private key passphrase

-      - name: Verify Service Running
-        run: |
-          sleep 30
-          cid=$(docker ps -a | grep databend | cut -d' ' -f1)
-          docker logs ${cid}
-          curl -u databend:databend --request POST localhost:8000/v1/query --header 'Content-Type:application/json' --data-raw '{"sql":"select 1"}'
+      - name: Start Cluster With Nginx and Minio
+        working-directory: tests
+        run: make up

-      - name: Run Maven clean deploy with release profile
-        run: mvn test -DexcludedGroups=cluster,FLAKY
+      - name: Test with conn to nginx
+        run: mvn test -DexcludedGroups=FLAKY
         env:
           MAVEN_GPG_PASSPHRASE: ${{ secrets.GPG_PASSPHRASE }}
+          DATABEND_TEST_CONN_PORT: 8000
+
   notify:
     if: failure()
     needs: [ test ]
```

.github/workflows/test_cluster.yml

Lines changed: 14 additions & 31 deletions
```diff
@@ -1,23 +1,24 @@
-name: Databend Cluster Tests
+name: Cluster Tests

 on:
   push:
     branches:
       - main
-      - master
   pull_request:
     branches:
       - main
-      - master

 jobs:
   test:
     runs-on: ubuntu-latest
+    strategy:
+      fail-fast: false
+      matrix:
+        version:
+          - "nightly"
+          - "v1.2.790-nightly"
     steps:
-      - name: Checkout repository
-        uses: actions/checkout@v4
-        with:
-          ref: ${{ github.ref }}
+      - uses: actions/checkout@v4

       - name: Set up JDK 17
         uses: actions/setup-java@v4
@@ -28,33 +29,15 @@ jobs:
           gpg-private-key: ${{ secrets.GPG_PRIVATE_KEY }} # Value of the GPG private key to import
           gpg-passphrase: MAVEN_GPG_PASSPHRASE # env variable for GPG private key passphrase

-      - uses: ./.github/actions/setup_databend_cluster
-        timeout-minutes: 15
-        with:
-          version: '1.2.754-nightly'
-          target: 'x86_64-unknown-linux-gnu'
-
-      - name: Test with conn to node 1
-        run: mvn test -DexcludedGroups=FLAKY
-        env:
-          MAVEN_GPG_PASSPHRASE: ${{ secrets.GPG_PASSPHRASE }}
-
-      - name: View Nginx logs
-        run: docker logs nginx-lb
-
-      - name: check nginx
-        run: |
-          curl -u 'databend:databend' -X POST "http://localhost:8010/v1/query" \
-            -H 'Content-Type: application/json' \
-            -d '{"sql": "select 1", "pagination": { "wait_time_secs": 5 }}' || true
+      - name: Start Cluster With Nginx and Minio
+        working-directory: tests
+        run: make up
         env:
-          MAVEN_GPG_PASSPHRASE: ${{ secrets.GPG_PASSPHRASE }}
-
-      - name: View Nginx logs
-        run: docker logs nginx-lb
+          DATABEND_QUERY_VERSION: ${{ matrix.version }}

       - name: Test with conn to nginx
         run: mvn test -DexcludedGroups=FLAKY
         env:
           MAVEN_GPG_PASSPHRASE: ${{ secrets.GPG_PASSPHRASE }}
-          DATABEND_TEST_CONN_PORT: 8010
+          DATABEND_TEST_CONN_PORT: 8000
+          DATABEND_QUERY_VERSION: ${{ matrix.version }}
```
Lines changed: 54 additions & 0 deletions
```diff
@@ -0,0 +1,54 @@
+name: Compatibility Tests
+
+on:
+  push:
+    branches:
+      - main
+  pull_request:
+    branches:
+      - main
+
+jobs:
+  test:
+    runs-on: ubuntu-latest
+    strategy:
+      fail-fast: false
+      matrix:
+        server:
+          - "nightly"
+          - "v1.2.790-nightly"
+        driver:
+          - "0.4.0"
+          - "0.3.9"
+    steps:
+      - uses: actions/checkout@v4
+
+      - name: Set up JDK 17
+        uses: actions/setup-java@v4
+        with:
+          distribution: 'temurin'
+          java-version: '17'
+          cache: 'maven'
+          gpg-private-key: ${{ secrets.GPG_PRIVATE_KEY }} # Value of the GPG private key to import
+          gpg-passphrase: MAVEN_GPG_PASSPHRASE # env variable for GPG private key passphrase
+
+      - name: Start Cluster With Nginx and Minio
+        working-directory: tests
+        run: make up
+        env:
+          DATABEND_QUERY_VERSION: ${{ matrix.server }}
+
+      - name: Build the driver package
+        run: mvn clean package -DskipTests
+        env:
+          MAVEN_GPG_PASSPHRASE: ${{ secrets.GPG_PASSPHRASE }}
+
+      - name: Test with conn to nginx
+        working-directory: tests/compatibility
+        run: sh test_compatibility.sh
+        env:
+          MAVEN_GPG_PASSPHRASE: ${{ secrets.GPG_PASSPHRASE }}
+          DATABEND_TEST_CONN_PORT: 8000
+          DATABEND_QUERY_VERSION: ${{ matrix.server }}
+          DATABEND_JDBC_VERSION: ${{ matrix.driver }}
+          TEST_SIDE: "driver"
```
Lines changed: 2 additions & 2 deletions
```diff
@@ -1,4 +1,4 @@
-name: Tests
+name: Standalone Test

 on:
   push:
@@ -51,6 +51,6 @@ jobs:
           curl -u databend:databend --request POST localhost:8000/v1/query --header 'Content-Type:application/json' --data-raw '{"sql":"select 1"}'

       - name: Run Maven clean deploy with release profile
-        run: mvn test -DexcludedGroups=CLUSTER,FLAKY
+        run: mvn test -DexcludedGroups=MULTI_HOST,FLAKY
         env:
           MAVEN_GPG_PASSPHRASE: ${{ secrets.GPG_PASSPHRASE }}
```

.gitignore

Lines changed: 3 additions & 0 deletions
```diff
@@ -6,3 +6,6 @@ databend-jdbc/databend-jdbc-debug.log
 target/
 /databend/
 .databend/
+tests/data
+tests/compatibility/*.jar
+test-output
```

README.md

Lines changed: 94 additions & 10 deletions
```diff
@@ -50,25 +50,28 @@ import java.sql.ResultSet;

 public class Main {
     public static void main(String[] args) throws SQLException {
-        Connection conn = DriverManager.getConnection("jdbc:databend://localhost:8000", "root", "");
-        Statement statement = conn.createStatement();
-        statement.execute("SELECT number from numbers(200000) order by number");
-        ResultSet r = statement.getResultSet();
-        // ** We must call `r.next()` otherwise the query may be canceled **
-        while (r.next()) {
-            System.out.println(r.getInt(1));
+        try (Connection conn = DriverManager.getConnection("jdbc:databend://localhost:8000", "root", "");
+             Statement statement = conn.createStatement()
+        ) {
+            statement.execute("SELECT number from numbers(200000) order by number");
+            try (ResultSet r = statement.getResultSet()) {
+                // ** We must call `r.next()` otherwise the query may be canceled **
+                while (r.next()) {
+                    System.out.println(r.getInt(1));
+                }
+            }
         }
-        conn.close();
     }
 }
 ```

 ### Important Notes

-1. Because `select`, `copy into`, and `merge into` are query-type SQL, they return a `ResultSet` object; you must
+1. Close the Connection, Statement, and ResultSet to release resources faster.
+2. Because `select`, `copy into`, and `merge into` are query-type SQL, they return a `ResultSet` object; you must
    call `rs.next()` before accessing the data. Otherwise, the query may be canceled. If you do not want the result,
    you can call `while (rs.next()) {}` to drain the result set.
-2. For other non-query SQL such as `create/drop table`, you can call `statement.execute()` directly.
+3. For other non-query SQL such as `create/drop table`, you can call `statement.execute()` directly.

 ## JDBC Java type mapping
 The Databend type is mapped to Java type as follows:
```
# FileTransfer API

The `FileTransferAPI` interface provides a high-performance, Java-based mechanism for streaming data directly between your application and Databend's internal stage, eliminating the need for intermediate local files. It is designed for efficient bulk data operations.

## Key Features

* **Streaming Upload/Download:** Directly transfer data using `InputStream`, supporting large files without excessive memory consumption
* **Direct Table Loading:** Ingest data from streams or staged files directly into Databend tables using the `COPY INTO` command
* **Compression:** Supports on-the-fly compression and decompression during transfer to optimize network traffic
* **Flexible Data Ingestion:** Offers both stage-based and streaming-based methods for loading data into tables
## Core Methods

### `uploadStream`
Uploads a data stream as a single file to the specified internal stage.

**Parameters:**
- `stageName`: The stage which will receive the uploaded file
- `destPrefix`: The prefix of the file name in the stage
- `inputStream`: The input stream of the file data
- `destFileName`: The destination file name in the stage
- `fileSize`: The size of the file being uploaded
- `compressData`: Whether to compress the data during transfer
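A minimal sketch of an upload from memory, assuming an open `Connection conn` and the `unwrap` pattern shown in the Quick Start below; the stage name `my_stage` and file names are illustrative:

```java
// Hypothetical sketch: upload an in-memory CSV to a stage.
// Assumes java.io.*, java.nio.charset.StandardCharsets, and the driver's
// FileTransferAPI / DatabendConnection are imported.
byte[] csv = "1,hello\n2,world\n".getBytes(StandardCharsets.UTF_8);
FileTransferAPI api = conn.unwrap(DatabendConnection.class);
try (InputStream in = new ByteArrayInputStream(csv)) {
    api.uploadStream("my_stage", "uploads/", in, "mem.csv", csv.length, false); // no compression
}
```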
### `downloadStream`
Downloads a file from the internal stage and returns it as an `InputStream`.

**Parameters:**
- `stageName`: The stage which contains the file to download
- `sourceFileName`: The name of the file in the stage
- `decompress`: Whether to decompress the data during download

**Returns:** `InputStream` of the downloaded file content
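A matching sketch for reading a staged file back, under the same assumptions (the stage and file names are illustrative, and the caller closes the returned stream):

```java
// Hypothetical sketch: download a staged file and print it line by line.
FileTransferAPI api = conn.unwrap(DatabendConnection.class);
try (InputStream in = api.downloadStream("my_stage", "uploads/mem.csv", false);
     BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
    String line;
    while ((line = reader.readLine()) != null) {
        System.out.println(line);
    }
}
```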
### `loadStreamToTable`
A versatile method to load data from a stream directly into a table, using either a staging or a streaming approach.

Available with databend-jdbc >= 0.4 and databend-query >= 1.2.791.

**Parameters:**
- `sql`: SQL statement with specific syntax for data loading
- `inputStream`: The input stream of the file data to load
- `fileSize`: The size of the file being loaded
- `loadMethod`: The loading method, "stage" or "streaming". The `stage` method first uploads the file to a special path in the user stage and then loads it into the table, while the `streaming` method loads the data into the table as it is transferred.

**Returns:** Number of rows successfully loaded
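A minimal sketch of the `streaming` method, assuming the same special `@_databend_load` syntax used in the Quick Start below; `my_table` and `data.csv` are illustrative names:

```java
// Hypothetical sketch: load a local CSV with the "streaming" method.
// Assumes java.nio.file.{Path, Paths, Files} are imported.
FileTransferAPI api = conn.unwrap(DatabendConnection.class);
Path csv = Paths.get("data.csv");
try (InputStream in = Files.newInputStream(csv)) {
    String sql = "insert into my_table from @_databend_load file_format=(type=csv)";
    System.out.println("Loaded rows: " + api.loadStreamToTable(sql, in, Files.size(csv), "streaming"));
}
```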
## Quick Start

The following example demonstrates how to upload data and load it into a table:

```java
// 1. Upload a file to the internal stage
Connection conn = DriverManager.getConnection("jdbc:databend://localhost:8000");
FileTransferAPI api = conn.unwrap(DatabendConnection.class);

InputStream fileStream = new FileInputStream("data.csv");
api.uploadStream(
        "my_stage",
        "uploads/",
        fileStream,
        "data.csv",
        Files.size(Paths.get("data.csv")),
        true // Compress the data during upload
);
fileStream.close();

// 2. Load the staged file into a table
fileStream = new FileInputStream("data.csv");
String sql = "insert into my_table from @_databend_load file_format=(type=csv)"; // uses the special stage `_databend_load`
api.loadStreamToTable(sql, fileStream, Files.size(Paths.get("data.csv")), "stage");
fileStream.close();
conn.close();
```
> **Important:** Callers are responsible for properly closing the provided `InputStream` objects after operations are complete.
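One way to do that reliably, sketched under the same assumptions as the Quick Start above, is try-with-resources:

```java
// Same upload as above, with the connection and stream closed automatically.
try (Connection conn = DriverManager.getConnection("jdbc:databend://localhost:8000");
     InputStream fileStream = new FileInputStream("data.csv")) {
    FileTransferAPI api = conn.unwrap(DatabendConnection.class);
    api.uploadStream("my_stage", "uploads/", fileStream, "data.csv",
            Files.size(Paths.get("data.csv")), true);
}
```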
