Commit cf15933

Merge pull request #268202 from sreekzz/patch-3
New page-Sharded SQL connector
2 parents 5b77f27 + ccf9487

File tree

3 files changed

+168
-0
lines changed


articles/hdinsight-aks/TOC.yml

Lines changed: 2 additions & 0 deletions

@@ -127,6 +127,8 @@ items:
       href: ./trino/trino-add-delta-lake-catalog.md
     - name: Configure Iceberg catalog
       href: ./trino/trino-add-iceberg-catalog.md
+    - name: Configure sharded SQL connector
+      href: ./trino/trino-sharded-sql-connector.md
     - name: Cluster management
       items:
       - name: Trino configuration

articles/hdinsight-aks/trino/trino-connectors.md

Lines changed: 1 addition & 0 deletions

@@ -41,3 +41,4 @@ Trino in HDInsight on AKS enables seamless integration with data sources. You ca
 * [Thrift](https://trino.io/docs/410/connector/thrift.html)
 * [TPCDS](https://trino.io/docs/410/connector/tpcds.html)
 * [TPCH](https://trino.io/docs/410/connector/tpch.html)
+* [Sharded SQL server](trino-sharded-sql-connector.md)
articles/hdinsight-aks/trino/trino-sharded-sql-connector.md

Lines changed: 165 additions & 0 deletions
---
title: Sharded SQL connector
description: Learn how to configure and use the sharded SQL connector.
ms.service: hdinsight-aks
ms.topic: how-to
ms.date: 02/06/2024
---

# Sharded SQL connector

[!INCLUDE [feature-in-preview](../includes/feature-in-preview.md)]

The sharded SQL connector allows queries to be executed over data distributed across any number of SQL servers.

## Prerequisites

To connect to sharded SQL servers, you need:

- SQL Server 2012 or later, or Azure SQL Database.
- Network access from the Trino coordinator and workers to SQL Server. The default port is 1433.
## General configuration

The connector can query multiple SQL servers as a single data source. Create a catalog properties file, and set `connector.name=sharded_sqlserver` to use the sharded SQL connector.

Configuration example:

```
connector.name=sharded_sqlserver
connection-user=<user-name>
connection-password=<user-password>
sharded-cluster=true
shard-config-location=<path-to-sharding-schema>
```

|Property|Description|
|--------|-----------|
|`connector.name`| Name of the connector. For sharded SQL, it should be `sharded_sqlserver`.|
|`connection-user`| User name in SQL Server.|
|`connection-password`| Password for the user in SQL Server.|
|`sharded-cluster`| Required to be set to `TRUE` for the sharded SQL connector.|
|`shard-config-location`| Location of the configuration file that defines the sharding schema.|

## Data source authentication

The connector uses user-password authentication to query the SQL servers. The same user specified in the configuration is expected to authenticate against all the SQL servers.

## Schema definition

The connector assumes a two-dimensional partitioned/bucketed layout of the physical data across the SQL servers. The schema definition describes this layout. Currently, only a file-based sharding schema definition is supported.

You can specify the location of the sharding schema JSON in the catalog properties, for example `shard-config-location=etc/shard-schema.json`. Configure the sharding schema JSON with the desired properties to specify the layout.

The following JSON file describes the configuration for a Trino sharded SQL connector. Here's a breakdown of its structure:

- **tables**: An array of objects, each representing a table in the database. Each table object contains:
  - **schema**: The schema name of the table, which corresponds to the database in the SQL server.
  - **name**: The name of the table.
  - **sharding_schema**: The name of the sharding schema associated with the table, which acts as a reference to the `sharding_schema` described in the next steps.

- **sharding_schema**: An array of objects, each representing a sharding schema. Each sharding schema object contains:
  - **name**: The name of the sharding schema.
  - **partitioned_by**: An array containing one or more columns by which the sharding schema is partitioned.
  - **bucket_count** (optional): An integer representing the total number of buckets across which the table is distributed. Defaults to 1.
  - **bucketed_by** (optional): An array containing one or more columns by which the data is bucketed. Note that partitioning and bucketing are hierarchical, which means each partition is bucketed.
  - **partition_map**: An array of objects, each representing a partition within the sharding schema. Each partition object contains:
    - **partition**: The partition value, specified in the form `partition-key=partitionvalue`.
    - **shards**: An array of objects, each representing a shard within the partition. Each element of the array represents a replica; Trino queries any one of them at random to fetch data for a partition/bucket. Each shard object contains:
      - **connectionUrl**: The JDBC connection URL to the shard's database.

For example, if there are two tables, `lineitem` and `part`, that you want to query using this connector, you can specify them as follows:

```json
"tables": [
    {
        "schema": "dbo",
        "name": "lineitem",
        "sharding_schema": "schema1"
    },
    {
        "schema": "dbo",
        "name": "part",
        "sharding_schema": "schema2"
    }
]
```

> [!NOTE]
> The connector expects all the tables to be present in the SQL server defined in the schema for a table. If that's not the case, queries for that table fail.

In the previous example, you can specify the layout of table `lineitem` as:

```json
"sharding_schema": [
    {
        "name": "schema1",
        "partitioned_by": [
            "shipmode"
        ],
        "bucketed_by": [
            "partkey"
        ],
        "bucket_count": 10,
        "partition_map": [
            {
                "partition": "shipmode='AIR'",
                "buckets": "1-7",
                "shards": [
                    {
                        "connectionUrl": "jdbc:sqlserver://sampleserver.database.windows.net:1433;database=test1"
                    }
                ]
            },
            {
                "partition": "shipmode='AIR'",
                "buckets": "8-10",
                "shards": [
                    {
                        "connectionUrl": "jdbc:sqlserver://sampleserver.database.windows.net:1433;database=test2"
                    }
                ]
            }
        ]
    }
]
```

This example describes:

- The data for table `lineitem` is partitioned by `shipmode`.
- Each partition has 10 buckets.
- Each partition is bucketed by the `partkey` column.
- Buckets `1-7` for partition value `AIR` are located in the `test1` database.
- Buckets `8-10` for partition value `AIR` are located in the `test2` database.
- Shards are an array of `connectionUrl`. Each member of the array represents a replica set. During query execution, Trino selects a shard randomly from the array to query data.

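To make the layout lookup concrete, here's a minimal Python sketch that resolves which shard's `connectionUrl` serves a given partition value and bucket, for a schema shaped like the JSON example above. This is an illustration of the described behavior, not the connector's actual implementation; the `"1-7"` range-string parsing and the helper names are assumptions.

```python
import random

# Hypothetical in-memory form of the sharding schema example above.
sharding_schema = {
    "name": "schema1",
    "partitioned_by": ["shipmode"],
    "bucketed_by": ["partkey"],
    "bucket_count": 10,
    "partition_map": [
        {
            "partition": "shipmode='AIR'",
            "buckets": "1-7",
            "shards": [{"connectionUrl": "jdbc:sqlserver://sampleserver.database.windows.net:1433;database=test1"}],
        },
        {
            "partition": "shipmode='AIR'",
            "buckets": "8-10",
            "shards": [{"connectionUrl": "jdbc:sqlserver://sampleserver.database.windows.net:1433;database=test2"}],
        },
    ],
}

def bucket_range(spec: str) -> range:
    """Parse a bucket range like "1-7" into an inclusive range (assumed format)."""
    lo, hi = (int(x) for x in spec.split("-"))
    return range(lo, hi + 1)

def resolve_shard(schema: dict, partition: str, bucket: int) -> str:
    """Return the connectionUrl of a replica holding the given partition/bucket."""
    for entry in schema["partition_map"]:
        if entry["partition"] == partition and bucket in bucket_range(entry["buckets"]):
            # Each element of "shards" is a replica; pick one at random,
            # mirroring the behavior the article describes.
            return random.choice(entry["shards"])["connectionUrl"]
    raise LookupError(f"no shard for {partition} bucket {bucket}")

print(resolve_shard(sharding_schema, "shipmode='AIR'", 3))  # test1 database
print(resolve_shard(sharding_schema, "shipmode='AIR'", 9))  # test2 database
```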
### Partition and bucket pruning

The connector evaluates the query constraints during planning and performs partition and bucket pruning based on the provided query predicates. This helps speed up query performance and allows the connector to query large amounts of data.

Bucket assignments are determined using the MurmurHash3 function implementation described [here](https://commons.apache.org/proper/commons-codec/apidocs/src-html/org/apache/commons/codec/digest/MurmurHash3.html#line.388).

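For illustration, the following Python sketch implements the standard MurmurHash3 x86 32-bit function (the algorithm the linked commons-codec source implements) and an assumed bucket-assignment formula. The connector's exact key encoding and bucket mapping are not documented here, so `assign_bucket` is hypothetical.

```python
def murmur3_32(data: bytes, seed: int = 0) -> int:
    """Standard MurmurHash3 x86 32-bit hash."""
    c1, c2 = 0xCC9E2D51, 0x1B873593
    h = seed
    n_blocks = len(data) // 4
    # Process the body in 4-byte little-endian blocks.
    for i in range(0, n_blocks * 4, 4):
        k = int.from_bytes(data[i:i + 4], "little")
        k = (k * c1) & 0xFFFFFFFF
        k = ((k << 15) | (k >> 17)) & 0xFFFFFFFF
        k = (k * c2) & 0xFFFFFFFF
        h ^= k
        h = ((h << 13) | (h >> 19)) & 0xFFFFFFFF
        h = (h * 5 + 0xE6546B64) & 0xFFFFFFFF
    # Process the remaining 0-3 tail bytes.
    tail = data[n_blocks * 4:]
    k = 0
    if len(tail) == 3:
        k ^= tail[2] << 16
    if len(tail) >= 2:
        k ^= tail[1] << 8
    if len(tail) >= 1:
        k ^= tail[0]
        k = (k * c1) & 0xFFFFFFFF
        k = ((k << 15) | (k >> 17)) & 0xFFFFFFFF
        k = (k * c2) & 0xFFFFFFFF
        h ^= k
    # Finalization mix.
    h ^= len(data)
    h ^= h >> 16
    h = (h * 0x85EBCA6B) & 0xFFFFFFFF
    h ^= h >> 13
    h = (h * 0xC2B2AE35) & 0xFFFFFFFF
    h ^= h >> 16
    return h

def assign_bucket(key: bytes, bucket_count: int) -> int:
    """Hypothetical 1-based bucket assignment; the connector's exact
    formula and key encoding may differ."""
    return murmur3_32(key) % bucket_count + 1
```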
### Type mapping

The sharded SQL connector supports the same [type mappings](https://trino.io/docs/current/connector/sqlserver.html#type-mapping) as the SQL Server connector.

### Pushdown

The following pushdown optimizations are supported:

- Limit pushdown
- Distributive aggregates
- Join pushdown

A `JOIN` operation can be pushed down to the server only when the connector determines that the data is colocated for the build and probe tables. The connector determines that the data is colocated when:

- the `sharding_schema` for both the `left` and the `right` table is the same.
- the join conditions are a superset of the partitioning and bucketing keys.

To use the `JOIN` pushdown optimization, the catalog property `join-pushdown.strategy` should be set to `EAGER`.

`AGGREGATE` pushdown for this connector can only be done for distributive aggregates. The optimizer config `optimizer.partial-aggregate-pushdown-enabled` needs to be set to `true` to enable this optimization.
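Putting these settings together, a catalog properties file with both pushdown optimizations enabled might look like the following. Treat this as an illustrative sketch built from the properties named in this article; per the text, the aggregate setting is an optimizer config and may belong in the cluster's configuration rather than the catalog file.

```
# Illustrative catalog properties (names taken from this article)
connector.name=sharded_sqlserver
connection-user=<user-name>
connection-password=<user-password>
sharded-cluster=true
shard-config-location=etc/shard-schema.json
# Enable JOIN pushdown for colocated tables
join-pushdown.strategy=EAGER
# Described as an optimizer config; may belong in the coordinator's
# configuration rather than this catalog file
optimizer.partial-aggregate-pushdown-enabled=true
```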

0 commit comments
