55 changes: 0 additions & 55 deletions .github/actions/setup_databend_cluster/action.yml

This file was deleted.

30 changes: 8 additions & 22 deletions .github/workflows/cron.integration.yml
@@ -17,21 +17,8 @@ concurrency:
jobs:
test:
runs-on: ubuntu-latest
services:
databend:
image: datafuselabs/databend:nightly
env:
QUERY_DEFAULT_USER: databend
QUERY_DEFAULT_PASSWORD: databend
MINIO_ENABLED: true
ports:
- 8000:8000
- 9000:9000
steps:
- name: Checkout repository
uses: actions/checkout@v4
with:
ref: ${{ github.ref }}
- uses: actions/checkout@v4

- name: Set up JDK 17
uses: actions/setup-java@v4
@@ -42,17 +29,16 @@ jobs:
gpg-private-key: ${{ secrets.GPG_PRIVATE_KEY }} # Value of the GPG private key to import
gpg-passphrase: MAVEN_GPG_PASSPHRASE # env variable for GPG private key passphrase

- name: Verify Service Running
run: |
sleep 30
cid=$(docker ps -a | grep databend | cut -d' ' -f1)
docker logs ${cid}
curl -u databend:databend --request POST localhost:8000/v1/query --header 'Content-Type:application/json' --data-raw '{"sql":"select 1"}'
- name: Start Cluster With Nginx and Minio
working-directory: tests
run: make up

- name: Run Maven clean deploy with release profile
run: mvn test -DexcludedGroups=cluster,FLAKY
- name: Test with conn to nginx
run: mvn test -DexcludedGroups=FLAKY
env:
MAVEN_GPG_PASSPHRASE: ${{ secrets.GPG_PASSPHRASE }}
DATABEND_TEST_CONN_PORT: 8000

notify:
if: failure()
needs: [ test ]
38 changes: 6 additions & 32 deletions .github/workflows/test_cluster.yml
@@ -1,23 +1,18 @@
name: Databend Cluster Tests
name: Cluster Tests

on:
push:
branches:
- main
- master
pull_request:
branches:
- main
- master

jobs:
test:
runs-on: ubuntu-latest
steps:
- name: Checkout repository
uses: actions/checkout@v4
with:
ref: ${{ github.ref }}
- uses: actions/checkout@v4

- name: Set up JDK 17
uses: actions/setup-java@v4
@@ -28,33 +23,12 @@ jobs:
gpg-private-key: ${{ secrets.GPG_PRIVATE_KEY }} # Value of the GPG private key to import
gpg-passphrase: MAVEN_GPG_PASSPHRASE # env variable for GPG private key passphrase

- uses: ./.github/actions/setup_databend_cluster
timeout-minutes: 15
with:
version: '1.2.754-nightly'
target: 'x86_64-unknown-linux-gnu'

- name: Test with conn to node 1
run: mvn test -DexcludedGroups=FLAKY
env:
MAVEN_GPG_PASSPHRASE: ${{ secrets.GPG_PASSPHRASE }}

- name: View Nginx logs
run: docker logs nginx-lb

- name: check nginx
run: |
curl -u 'databend:databend' -X POST "http://localhost:8010/v1/query" \
-H 'Content-Type: application/json' \
-d '{"sql": "select 1", "pagination": { "wait_time_secs": 5 }}' || true
env:
MAVEN_GPG_PASSPHRASE: ${{ secrets.GPG_PASSPHRASE }}

- name: View Nginx logs
run: docker logs nginx-lb
- name: Start Cluster With Nginx and Minio
working-directory: tests
run: make up

- name: Test with conn to nginx
run: mvn test -DexcludedGroups=FLAKY
env:
MAVEN_GPG_PASSPHRASE: ${{ secrets.GPG_PASSPHRASE }}
DATABEND_TEST_CONN_PORT: 8010
DATABEND_TEST_CONN_PORT: 8000
@@ -1,4 +1,4 @@
name: Tests
name: Standalone Test

on:
push:
@@ -51,6 +51,6 @@ jobs:
curl -u databend:databend --request POST localhost:8000/v1/query --header 'Content-Type:application/json' --data-raw '{"sql":"select 1"}'

- name: Run Maven clean deploy with release profile
run: mvn test -DexcludedGroups=CLUSTER,FLAKY
run: mvn test -DexcludedGroups=MULTI_HOST,FLAKY
env:
MAVEN_GPG_PASSPHRASE: ${{ secrets.GPG_PASSPHRASE }}
3 changes: 3 additions & 0 deletions .gitignore
@@ -6,3 +6,6 @@ databend-jdbc/databend-jdbc-debug.log
target/
/databend/
.databend/
tests/data
tests/compatibility/*.jar
test-output
104 changes: 94 additions & 10 deletions README.md
@@ -50,25 +50,28 @@ import java.sql.ResultSet;

public class Main {
public static void main(String[] args) throws SQLException {
Connection conn = DriverManager.getConnection("jdbc:databend://localhost:8000", "root", "");
Statement statement = conn.createStatement();
statement.execute("SELECT number from numbers(200000) order by number");
ResultSet r = statement.getResultSet();
// ** We must call `rs.next()` otherwise the query may be canceled **
while (rs.next()) {
System.out.println(r.getInt(1));
try (Connection conn = DriverManager.getConnection("jdbc:databend://localhost:8000", "root", "");
Statement statement = conn.createStatement()
) {
statement.execute("SELECT number from numbers(200000) order by number");
try (ResultSet r = statement.getResultSet()) {
// ** We must call `r.next()` otherwise the query may be canceled **
while (r.next()) {
System.out.println(r.getInt(1));
}
}
}
conn.close();
}
}
```

### Important Notes

1. Because the `select`, `copy into`, `merge into` are query type SQL, they will return a `ResultSet` object, you must
1. Close Connection/Statement/ResultSet to release resources faster.
2. Because `select`, `copy into`, and `merge into` are query-type SQL, they return a `ResultSet` object; you must
call `rs.next()` before accessing the data. Otherwise, the query may be canceled. If you do not want to read the result,
you can call `while (rs.next()) {}` to drain the result set.
2. For other SQL such as `create/drop table` non-query type SQL, you can call `statement.execute()` directly.
3. For non-query SQL such as `create table` or `drop table`, you can call `statement.execute()` directly, as sketched below.
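
For example, a minimal sketch of note 3 using only standard JDBC calls (the table name `t1` is only illustrative):

```java
try (Connection conn = DriverManager.getConnection("jdbc:databend://localhost:8000", "root", "");
     Statement statement = conn.createStatement()) {
    // Non-query SQL returns no ResultSet, so execute() alone is enough
    statement.execute("CREATE TABLE IF NOT EXISTS t1 (id INT, name VARCHAR)");
    statement.execute("DROP TABLE IF EXISTS t1");
}
```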

## JDBC Java type mapping
The Databend type is mapped to Java type as follows:
@@ -99,3 +102,84 @@ For detailed references, please take a look at the following Links:

1. [Connection Parameters](./docs/Connection.md) : detailed documentation about how to use connection parameters in a
jdbc connection


# FileTransfer API

The `FileTransferAPI` interface provides a high-performance, Java-based mechanism for streaming data directly between your application and Databend's internal stage, eliminating the need for intermediate local files. It is designed for efficient bulk data operations.

## Key Features

* **Streaming Upload/Download:** Directly transfer data using `InputStream`, supporting large files without excessive memory consumption
* **Direct Table Loading:** Ingest data from streams or staged files directly into Databend tables using the `COPY INTO` command
* **Compression:** Supports on-the-fly compression and decompression during transfer to optimize network traffic
* **Flexible Data Ingestion:** Offers both stage-based and streaming-based methods for loading data into tables

## Core Methods

### `uploadStream`
Uploads a data stream as a single file to the specified internal stage.

**Parameters:**
- `stageName`: The stage which will receive the uploaded file
- `destPrefix`: The prefix of the file name in the stage
- `inputStream`: The input stream of the file data
- `destFileName`: The destination file name in the stage
- `fileSize`: The size of the file being uploaded
- `compressData`: Whether to compress the data during transfer

### `downloadStream`
Downloads a file from the internal stage and returns it as an `InputStream`.

**Parameters:**
- `stageName`: The stage which contains the file to download
- `sourceFileName`: The name of the file in the stage
- `decompress`: Whether to decompress the data during download

**Returns:** `InputStream` of the downloaded file content
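
A hedged sketch of reading a staged file back with `downloadStream`, assuming the same `unwrap` pattern as the Quick Start below and that `uploads/data.csv` was previously uploaded to `my_stage`:

```java
Connection conn = DriverManager.getConnection("jdbc:databend://localhost:8000");
FileTransferAPI api = conn.unwrap(DatabendConnection.class);

// Stream the staged file back without writing it to a local temp file first
try (InputStream in = api.downloadStream("my_stage", "uploads/data.csv", false);
     BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
    String line;
    while ((line = reader.readLine()) != null) {
        System.out.println(line);
    }
}
conn.close();
```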


### `loadStreamToTable`
A versatile method to load data from a stream directly into a table, using either a staging or streaming approach.

Available with databend-jdbc >= 0.4 and databend-query >= 1.2.791.

**Parameters:**
- `sql`: SQL statement with specific syntax for data loading
- `inputStream`: The input stream of the file data to load
- `fileSize`: The size of the file being loaded
- `loadMethod`: The loading method, either `"stage"` or `"streaming"`. The `stage` method first uploads the file to a special path in the user stage and then loads it into the table, while the `streaming` method loads the data while it is being transferred.

**Returns:** Number of rows successfully loaded
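
For comparison with the stage-based Quick Start below, a hedged sketch of the `streaming` load method; the table name, file name, and CSV file format are placeholders:

```java
Connection conn = DriverManager.getConnection("jdbc:databend://localhost:8000");
FileTransferAPI api = conn.unwrap(DatabendConnection.class);

try (InputStream in = new FileInputStream("data.csv")) {
    String sql = "insert into my_table from @_databend_load file_format=(type=csv)";
    // "streaming" sends the data within the load request itself instead of staging it first
    System.out.println("Loaded rows: " + api.loadStreamToTable(sql, in, Files.size(Paths.get("data.csv")), "streaming"));
}
conn.close();
```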

## Quick Start

The following example demonstrates how to upload data and load it into a table:

```java
// 1. Upload a file to the internal stage
Connection conn = DriverManager.getConnection("jdbc:databend://localhost:8000");
FileTransferAPI api = conn.unwrap(DatabendConnection.class);

FileInputStream fileStream = new FileInputStream("data.csv");
api.uploadStream(
"my_stage",
"uploads/",
fileStream,
"data.csv",
Files.size(Paths.get("data.csv")),
true // Compress the data during upload
);
fileStream.close();

// 2. Load the staged file into a table
fileStream = new FileInputStream("data.csv");
String sql = "insert into my_table from @_databend_load file_format=(type=csv)"; // uses the special stage `_databend_load`
api.loadStreamToTable(sql, fileStream, Files.size(Paths.get("data.csv")), "stage");
fileStream.close();
conn.close();


```

> **Important:** Callers are responsible for properly closing the provided `InputStream` objects after operations are complete.
17 changes: 1 addition & 16 deletions databend-client/pom.xml
@@ -18,25 +18,21 @@
<properties>
<air.main.basedir>${project.parent.basedir}</air.main.basedir>
<project.build.targetJdk>8</project.build.targetJdk>
<jackson.version>2.15.2</jackson.version>
</properties>
<dependencies>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>${jackson.version}</version>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>${jackson.version}</version>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>

<dependency>
@@ -53,14 +49,13 @@
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>32.0.1-jre</version>
</dependency>

<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/com.squareup.okio/okio -->

<dependency>
<groupId>com.squareup.okio</groupId>
<artifactId>okio</artifactId>
@@ -76,18 +71,8 @@
<artifactId>okhttp-urlconnection</artifactId>
</dependency>

<dependency>
<groupId>com.github.zafarkhaja</groupId>
<artifactId>java-semver</artifactId>
</dependency>

<!-- for testing -->

<dependency>
<groupId>io.airlift</groupId>
<artifactId>json</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
@@ -31,6 +31,8 @@ public class ClientSettings {
public static final String X_DATABEND_STICKY_NODE = "X-DATABEND-STICKY-NODE";
public static final String DatabendWarehouseHeader = "X-DATABEND-WAREHOUSE";
public static final String DatabendTenantHeader = "X-DATABEND-TENANT";
public static final String DatabendSQLHeader = "X-DATABEND-SQL";
public static final String DatabendQueryContextHeader = "X-DATABEND-QUERY-CONTEXT";
private final String host;
private final DatabendSession session;
private final Integer queryTimeoutSecs;
Expand All @@ -40,14 +42,14 @@ public class ClientSettings {
private final PaginationOptions paginationOptions;

private final StageAttachment stageAttachment;
private Map<String, String> additionalHeaders;
private final Map<String, String> additionalHeaders;

private final int retryAttempts;
// TODO(zhihanz) timezone and locale info

//ClientSettings for test case use
public ClientSettings(String host) {
this(host, DatabendSession.createDefault(), DEFAULT_QUERY_TIMEOUT, DEFAULT_CONNECTION_TIMEOUT, DEFAULT_SOCKET_TIMEOUT, PaginationOptions.defaultPaginationOptions(), new HashMap<String, String>(), null, DEFAULT_RETRY_ATTEMPTS);
this(host, DatabendSession.createDefault(), DEFAULT_QUERY_TIMEOUT, DEFAULT_CONNECTION_TIMEOUT, DEFAULT_SOCKET_TIMEOUT, PaginationOptions.defaultPaginationOptions(), new HashMap<>(), null, DEFAULT_RETRY_ATTEMPTS);
}

public ClientSettings(String host, String database) {