Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/en/engines/table-engines/integrations/jdbc.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ ENGINE = JDBC(datasource_uri, external_database, external_table)

- `external_table` — Name of the table in `external_database` or a select query like `select * from table1 where column1=1`.

- These parameters can also be passed using [named collections](operations/named-collections.md).

## Usage Example {#usage-example}

Creating a table in MySQL server by connecting directly with it’s console client:
Expand Down
2 changes: 2 additions & 0 deletions docs/en/engines/table-engines/integrations/odbc.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ The table structure can differ from the source table structure:
- `external_database` — Name of a database in an external DBMS.
- `external_table` — Name of a table in the `external_database`.

These parameters can also be passed using [named collections](operations/named-collections.md).

## Usage Example {#usage-example}

**Retrieving data from the local MySQL installation via ODBC**
Expand Down
10 changes: 9 additions & 1 deletion docs/en/sql-reference/table-functions/jdbc.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,19 @@ clickhouse-jdbc-bridge contains experimental codes and is no longer supported. I
ClickHouse recommend using built-in table functions in ClickHouse which provide a better alternative for ad-hoc querying scenarios (Postgres, MySQL, MongoDB, etc).
:::

`jdbc(datasource, schema, table)` - returns table that is connected via JDBC driver.
JDBC table function returns table that is connected via JDBC driver.

This table function requires separate [clickhouse-jdbc-bridge](https://github.com/ClickHouse/clickhouse-jdbc-bridge) program to be running.
It supports Nullable types (based on DDL of remote table that is queried).

## Syntax {#syntax}

```sql
jdbc(datasource, schema, table)
jdbc(datasource, table)
jdbc(named_collection)
```

**Examples**

``` sql
Expand Down
4 changes: 4 additions & 0 deletions docs/en/sql-reference/table-functions/odbc.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ Returns table that is connected via [ODBC](https://en.wikipedia.org/wiki/Open_Da

``` sql
odbc(connection_settings, external_database, external_table)
odbc(connection_settings, external_table)
odbc(named_collection)
```

Parameters:
Expand All @@ -18,6 +20,8 @@ Parameters:
- `external_database` — Name of a database in an external DBMS.
- `external_table` — Name of a table in the `external_database`.

These parameters can also be passed using [named collections](operations/named-collections.md).

To safely implement ODBC connections, ClickHouse uses a separate program `clickhouse-odbc-bridge`. If the ODBC driver is loaded directly from `clickhouse-server`, driver problems can crash the ClickHouse server. ClickHouse automatically starts `clickhouse-odbc-bridge` when it is required. The ODBC bridge program is installed from the same package as the `clickhouse-server`.

The fields with the `NULL` values from the external table are converted into the default values for the base data type. For example, if a remote MySQL table field has the `INT NULL` type it is converted to 0 (the default value for ClickHouse `Int32` data type).
Expand Down
56 changes: 45 additions & 11 deletions src/Storages/StorageXDBC.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <Storages/StorageURL.h>
#include <Storages/transformQueryForExternalDatabase.h>
#include <Storages/checkAndGetLiteralArgument.h>
#include <Storages/NamedCollectionsHelpers.h>

#include <Formats/FormatFactory.h>
#include <IO/ConnectionTimeouts.h>
Expand Down Expand Up @@ -171,21 +172,54 @@ namespace
{
ASTs & engine_args = args.engine_args;

if (engine_args.size() != 3)
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Storage {} requires exactly 3 parameters: {}('DSN', database or schema, table)", name, name);

for (size_t i = 0; i < 3; ++i)
engine_args[i] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[i], args.getLocalContext());

BridgeHelperPtr bridge_helper = std::make_shared<XDBCBridgeHelper<BridgeHelperMixin>>(args.getContext(),
String connection_string;
String database_or_schema;
String table;

if (auto named_collection = tryGetNamedCollectionWithOverrides(engine_args, args.getLocalContext()))
{
if (name == "JDBC")
{
validateNamedCollection<>(*named_collection, {"datasource", "schema", "table"}, {});
connection_string = named_collection->get<String>("datasource");
database_or_schema = named_collection->get<String>("schema");
table = named_collection->get<String>("table");
}
else
{
validateNamedCollection<>(*named_collection, {"connection_settings", "external_database", "external_table"}, {});
connection_string = named_collection->get<String>("connection_settings");
database_or_schema = named_collection->get<String>("external_database");
table = named_collection->get<String>("external_table");
}
}
else
{
if (engine_args.size() != 3)
throw Exception(
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Storage {} requires exactly 3 parameters: {}('DSN', database or schema, table)",
name,
name);

for (size_t i = 0; i < 3; ++i)
engine_args[i] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[i], args.getLocalContext());

connection_string = checkAndGetLiteralArgument<String>(engine_args[0], "connection_string");
database_or_schema = checkAndGetLiteralArgument<String>(engine_args[1], "database_name");
table = checkAndGetLiteralArgument<String>(engine_args[2], "table_name");
}

BridgeHelperPtr bridge_helper = std::make_shared<XDBCBridgeHelper<BridgeHelperMixin>>(
args.getContext(),
args.getContext()->getSettingsRef().http_receive_timeout.value,
checkAndGetLiteralArgument<String>(engine_args[0], "connection_string"),
connection_string,
args.getContext()->getSettingsRef().odbc_bridge_use_connection_pooling.value);

return std::make_shared<StorageXDBC>(
args.table_id,
checkAndGetLiteralArgument<String>(engine_args[1], "database_name"),
checkAndGetLiteralArgument<String>(engine_args[2], "table_name"),
database_or_schema,
table,
args.columns,
args.constraints,
args.comment,
Expand Down
57 changes: 45 additions & 12 deletions src/TableFunctions/ITableFunctionXDBC.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <Parsers/ASTLiteral.h>
#include <Parsers/parseQuery.h>
#include <Storages/StorageXDBC.h>
#include <Storages/NamedCollectionsHelpers.h>
#include <TableFunctions/ITableFunction.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <Poco/Net/HTTPRequest.h>
Expand All @@ -26,6 +27,7 @@ namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int LOGICAL_ERROR;
extern const int BAD_ARGUMENTS;
}

namespace
Expand Down Expand Up @@ -109,23 +111,54 @@ void ITableFunctionXDBC::parseArguments(const ASTPtr & ast_function, ContextPtr
throw Exception(ErrorCodes::LOGICAL_ERROR, "Table function '{}' must have arguments.", getName());

ASTs & args = args_func.arguments->children;
if (args.size() != 2 && args.size() != 3)
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Table function '{0}' requires 2 or 3 arguments: {0}('DSN', table) or {0}('DSN', schema, table)", getName());

for (auto & arg : args)
arg = evaluateConstantExpressionOrIdentifierAsLiteral(arg, context);
if (args.empty() || args.size() > 3)
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Table function '{0}' requires 1, 2 or 3 arguments: {0}(named_collection) or {0}('DSN', table) or {0}('DSN', schema, table)", getName());

if (args.size() == 3)
if (args.size() == 1)
{
connection_string = args[0]->as<ASTLiteral &>().value.safeGet<String>();
schema_name = args[1]->as<ASTLiteral &>().value.safeGet<String>();
remote_table_name = args[2]->as<ASTLiteral &>().value.safeGet<String>();
if (auto named_collection = tryGetNamedCollectionWithOverrides(ast_function->children.at(0)->children, context))
{
if (getName() == "JDBC")
{
validateNamedCollection<>(*named_collection, {"datasource"}, {"schema", "table"});
connection_string = named_collection->get<String>("datasource");
schema_name = named_collection->getOrDefault<String>("schema", "");
remote_table_name = named_collection->getOrDefault<String>("table", "");
}
else
{
validateNamedCollection<>(*named_collection, {"connection_settings"}, {"external_database", "external_table"});

connection_string = named_collection->get<String>("connection_settings");
schema_name = named_collection->getOrDefault<String>("external_database", "");
remote_table_name = named_collection->getOrDefault<String>("external_table", "");

}
}
else
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Table function '{0}' has 1 argument, it is expected to be named collection", getName());
}
}
else if (args.size() == 2)
else
{
connection_string = args[0]->as<ASTLiteral &>().value.safeGet<String>();
remote_table_name = args[1]->as<ASTLiteral &>().value.safeGet<String>();
for (auto & arg : args)
arg = evaluateConstantExpressionOrIdentifierAsLiteral(arg, context);

if (args.size() == 3)
{
connection_string = args[0]->as<ASTLiteral &>().value.safeGet<String>();
schema_name = args[1]->as<ASTLiteral &>().value.safeGet<String>();
remote_table_name = args[2]->as<ASTLiteral &>().value.safeGet<String>();
}
else if (args.size() == 2)
{
connection_string = args[0]->as<ASTLiteral &>().value.safeGet<String>();
remote_table_name = args[1]->as<ASTLiteral &>().value.safeGet<String>();
}
}
}

Expand Down
2 changes: 2 additions & 0 deletions tests/integration/test_odbc_interaction/configs/users.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
</networks>
<profile>default</profile>
<quota>default</quota>
<named_collection_control>1</named_collection_control>
<use_named_collections>1</use_named_collections>
</default>
</users>

Expand Down
71 changes: 71 additions & 0 deletions tests/integration/test_odbc_interaction/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,15 @@ def started_cluster():
privileged=True,
user="root",
)
node1.exec_in_container(
[
"sqlite3",
sqlite_db,
"CREATE TABLE t5(id INTEGER PRIMARY KEY ASC, X INTEGER, Y, Z);",
],
privileged=True,
user="root",
)
node1.exec_in_container(
[
"sqlite3",
Expand Down Expand Up @@ -275,6 +284,37 @@ def test_mysql_simple_select_works(started_cluster):
conn.close()


def test_table_function_odbc_with_named_collection(started_cluster):
skip_test_msan(node1)

mysql_setup = node1.odbc_drivers["MySQL"]

table_name = "test_mysql_with_named_collection"
conn = get_mysql_conn()
create_mysql_table(conn, table_name)

# Check that NULL-values are handled correctly by the ODBC-bridge
with conn.cursor() as cursor:
cursor.execute(
"INSERT INTO clickhouse.{} VALUES(50, 'name1', 127, 255, 512), (100, 'name2', 127, 255, 511);".format(
table_name
)
)
conn.commit()

node1.query(f"""
DROP NAMED COLLECTION IF EXISTS odbc_collection;
CREATE NAMED COLLECTION odbc_collection AS
connection_settings = 'DSN={mysql_setup["DSN"]}',
external_table = '{table_name}';
""")
assert node1.query("SELECT name FROM odbc(odbc_collection)") == "name1\nname2\n"

node1.query(f"DROP TABLE IF EXISTS {table_name}")
drop_mysql_table(conn, table_name)
conn.close()


def test_mysql_insert(started_cluster):
skip_test_msan(node1)

Expand Down Expand Up @@ -447,6 +487,37 @@ def test_sqlite_simple_select_storage_works(started_cluster):
)


def test_table_engine_odbc_named_collection(started_cluster):
skip_test_msan(node1)

sqlite_setup = node1.odbc_drivers["SQLite3"]
sqlite_db = sqlite_setup["Database"]

node1.exec_in_container(
["sqlite3", sqlite_db, "INSERT INTO t5 values(1, 1, 2, 3);"],
privileged=True,
user="root",
)

node1.query(f"""
DROP NAMED COLLECTION IF EXISTS engine_odbc_collection;
CREATE NAMED COLLECTION engine_odbc_collection AS
connection_settings = 'DSN={sqlite_setup["DSN"]}',
external_database = '',
external_table = 't5';
""")
node1.query("CREATE TABLE SqliteODBCNamedCol (x Int32, y String, z String) ENGINE = ODBC(engine_odbc_collection)")

assert node1.query("SELECT * FROM SqliteODBCNamedCol") == "1\t2\t3\n"
node1.query("DROP TABLE IF EXISTS SqliteODBCNamedCol")

node1.exec_in_container(
["sqlite3", sqlite_db, "DELETE FROM t5;"],
privileged=True,
user="root",
)


def test_sqlite_odbc_hashed_dictionary(started_cluster):
skip_test_msan(node1)

Expand Down
Loading