Skip to content

Commit 737e42e

Browse files
feat(trino): add trino HTTP client
1 parent 068c2dc commit 737e42e

16 files changed

+1402
-0
lines changed

ohsome_quality_api/trino/client.py

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
import asyncio
2+
3+
import httpx
4+
from pydantic import BaseModel, ConfigDict, field_validator
5+
6+
from ohsome_quality_api.utils.helper import snake_to_lower_camel
7+
8+
# class TrinoQuery:
9+
# self._query_id: Optional[str] = None
10+
# self._stats: Dict[Any, Any] = {}
11+
# self._info_uri: Optional[str] = None
12+
# self._warnings: List[Dict[Any, Any]] = []
13+
# self._columns: Optional[List[str]] = None
14+
# self._finished = False
15+
# self._cancelled = False
16+
# self._request = request
17+
# self._update_type = None
18+
# self._update_count = None
19+
# self._next_uri = None
20+
# self._query = query
21+
# self._result: Optional[TrinoResult] = None
22+
# self._legacy_primitive_types = legacy_primitive_types
23+
# self._row_mapper: Optional[RowMapper] = None
24+
# self._fetch_mode = fetch_mode
25+
26+
27+
TRINO_HOST = "127.0.0.1"
28+
TRINO_PORT = 8084
29+
TRINO_USER = "mschaub"
30+
31+
URL = f"http://{TRINO_HOST}:{TRINO_PORT}/v1/statement"
32+
33+
HEADERS = {
34+
"X-Trino-User": TRINO_USER,
35+
}
36+
37+
AUTH = None
38+
39+
40+
class TrinoError(Exception):
41+
pass
42+
43+
44+
class BaseConfig(BaseModel):
45+
model_config = ConfigDict(
46+
alias_generator=snake_to_lower_camel,
47+
frozen=True,
48+
)
49+
50+
51+
class QueryError(BaseConfig):
52+
message: str
53+
error_code: int
54+
error_name: str
55+
error_type: str
56+
error_location: dict[str, int]
57+
failure_info: dict
58+
59+
60+
class QueryResults(BaseConfig):
61+
id: str
62+
next_uri: str | None = None
63+
columns: list | None = None
64+
data: list | None = None
65+
error: QueryError | None = None
66+
67+
@field_validator("error")
68+
@classmethod
69+
def check_error(cls, value: QueryError) -> str:
70+
if value is not None:
71+
raise TrinoError(f"{value.error_name}: {value.message}")
72+
else:
73+
return value
74+
75+
76+
async def query(sql) -> QueryResults:
77+
async with httpx.AsyncClient() as client:
78+
response = await client.post(
79+
url=URL,
80+
content=sql.encode("utf-8"),
81+
headers=HEADERS,
82+
auth=AUTH,
83+
)
84+
response.raise_for_status()
85+
results = QueryResults.model_validate(response.json())
86+
return results
87+
88+
89+
async def raise_for_status(response):
90+
# TODO: If the client request returns an HTTP 502, 503, or 504, that means
91+
# there was an intermittent problem processing request and the client
92+
# should try again in 50-100 ms.
93+
94+
# TODO: Additionally, if the request returns a 429 status code, the client should retry the request using the Retry-After header value provided.
95+
96+
# TODO: Any HTTP status other than 502, 503, 504 or 200 means that query processing has failed.
97+
pass
98+
99+
100+
# TODO: user of fetch() should be able to define a timeout instead wait+attempts.
101+
# A possible solution is to start with very low wait times between attempts and
102+
# scale exponentially for each failure
103+
async def fetch(
104+
results: QueryResults,
105+
wait: int = 1,
106+
attempts: int = 10,
107+
_attempt: int = 0,
108+
_data: tuple = tuple(),
109+
) -> tuple:
110+
"""Fetch all data of a query."""
111+
# TODO: use one client instance for the eitre fetching/polling
112+
# intervall
113+
async with httpx.AsyncClient() as client:
114+
response = await client.get(
115+
url=results.next_uri,
116+
headers=HEADERS,
117+
auth=AUTH,
118+
)
119+
response.raise_for_status()
120+
results = QueryResults.model_validate(response.json())
121+
122+
if results.data is not None:
123+
_data = _data + tuple(results.data)
124+
125+
if results.next_uri is None:
126+
return _data
127+
128+
elif results.next_uri is not None and _attempt <= attempts:
129+
await asyncio.sleep(wait)
130+
return await fetch(results, _attempt=_attempt + 1, _data=_data)
131+
else:
132+
raise Exception()
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
SELECT
2+
count(osm_id)
3+
FROM
4+
sotm2024_iceberg.geo_sort.contributions
5+
WHERE
6+
status = 'latest'
7+
AND element_at (tags, 'highway') IS NOT NULL
8+
AND tags['highway'] IN ('motorway', 'trunk', 'motorway_link',
9+
'trunk_link', 'primary', 'primary_link', 'secondary',
10+
'secondary_link', 'tertiary', 'tertiary_link', 'unclassified',
11+
'residential')
12+
AND (bbox.xmax >= 8.629761
13+
AND bbox.xmin <= 8.742371)
14+
AND (bbox.ymax >= 49.379556
15+
AND bbox.ymin <= 49.437890)
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
SELECT
2+
osm_id
3+
FROM
4+
sotm2024_iceberg.geo_sort.contributions
5+
WHERE
6+
status = 'latest'
7+
AND element_at (tags, 'highway') IS NOT NULL
8+
AND tags['highway'] IN ('motorway', 'trunk', 'motorway_link',
9+
'trunk_link', 'primary', 'primary_link', 'secondary',
10+
'secondary_link', 'tertiary', 'tertiary_link', 'unclassified',
11+
'residential')
12+
AND (bbox.xmax >= 8.629761
13+
AND bbox.xmin <= 8.742371)
14+
AND (bbox.ymax >= 49.379556
15+
AND bbox.ymin <= 49.437890)

tests/integrationtests/trino/test_fetch_highways

Lines changed: 450 additions & 0 deletions
Large diffs are not rendered by default.
Lines changed: 220 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,220 @@
1+
interactions:
2+
- request:
3+
body: SELECT version()
4+
headers:
5+
accept:
6+
- '*/*'
7+
accept-encoding:
8+
- gzip, deflate
9+
connection:
10+
- keep-alive
11+
content-length:
12+
- '16'
13+
host:
14+
- 127.0.0.1:8084
15+
user-agent:
16+
- python-httpx/0.23.3
17+
x-trino-user:
18+
- mschaub
19+
method: POST
20+
uri: http://127.0.0.1:8084/v1/statement
21+
response:
22+
content: '{"id":"20250117_195808_00519_3gijq","infoUri":"http://127.0.0.1:8084/ui/query.html?20250117_195808_00519_3gijq","nextUri":"http://127.0.0.1:8084/v1/statement/queued/20250117_195808_00519_3gijq/y06361689388e2a7236eb747c62e90c44ba8ce587/1","stats":{"state":"QUEUED","queued":true,"scheduled":false,"nodes":0,"totalSplits":0,"queuedSplits":0,"runningSplits":0,"completedSplits":0,"cpuTimeMillis":0,"wallTimeMillis":0,"queuedTimeMillis":0,"elapsedTimeMillis":0,"processedRows":0,"processedBytes":0,"physicalInputBytes":0,"physicalWrittenBytes":0,"peakMemoryBytes":0,"spilledBytes":0},"warnings":[]}
23+
24+
'
25+
headers:
26+
Content-Encoding:
27+
- gzip
28+
Content-Length:
29+
- '320'
30+
Content-Type:
31+
- application/json
32+
Date:
33+
- Fri, 17 Jan 2025 19:58:08 GMT
34+
Vary:
35+
- Accept-Encoding
36+
X-Content-Type-Options:
37+
- nosniff
38+
http_version: HTTP/1.1
39+
status_code: 200
40+
- request:
41+
body: ''
42+
headers:
43+
accept:
44+
- '*/*'
45+
accept-encoding:
46+
- gzip, deflate
47+
connection:
48+
- keep-alive
49+
host:
50+
- 127.0.0.1:8084
51+
user-agent:
52+
- python-httpx/0.23.3
53+
x-trino-user:
54+
- mschaub
55+
method: GET
56+
uri: http://127.0.0.1:8084/v1/statement/queued/20250117_195808_00519_3gijq/y06361689388e2a7236eb747c62e90c44ba8ce587/1
57+
response:
58+
content: '{"id":"20250117_195808_00519_3gijq","infoUri":"http://127.0.0.1:8084/ui/query.html?20250117_195808_00519_3gijq","nextUri":"http://127.0.0.1:8084/v1/statement/queued/20250117_195808_00519_3gijq/y59f89628f5bff9c4363cb5c93dc4c5f1a6032561/2","stats":{"state":"QUEUED","queued":true,"scheduled":false,"nodes":0,"totalSplits":0,"queuedSplits":0,"runningSplits":0,"completedSplits":0,"cpuTimeMillis":0,"wallTimeMillis":0,"queuedTimeMillis":1,"elapsedTimeMillis":2,"processedRows":0,"processedBytes":0,"physicalInputBytes":0,"physicalWrittenBytes":0,"peakMemoryBytes":0,"spilledBytes":0},"warnings":[]}
59+
60+
'
61+
headers:
62+
Content-Encoding:
63+
- gzip
64+
Content-Length:
65+
- '328'
66+
Content-Type:
67+
- application/json
68+
Date:
69+
- Fri, 17 Jan 2025 19:58:10 GMT
70+
Vary:
71+
- Accept-Encoding
72+
X-Content-Type-Options:
73+
- nosniff
74+
http_version: HTTP/1.1
75+
status_code: 200
76+
- request:
77+
body: ''
78+
headers:
79+
accept:
80+
- '*/*'
81+
accept-encoding:
82+
- gzip, deflate
83+
connection:
84+
- keep-alive
85+
host:
86+
- 127.0.0.1:8084
87+
user-agent:
88+
- python-httpx/0.23.3
89+
x-trino-user:
90+
- mschaub
91+
method: GET
92+
uri: http://127.0.0.1:8084/v1/statement/queued/20250117_195808_00519_3gijq/y59f89628f5bff9c4363cb5c93dc4c5f1a6032561/2
93+
response:
94+
content: '{"id":"20250117_195808_00519_3gijq","infoUri":"http://127.0.0.1:8084/ui/query.html?20250117_195808_00519_3gijq","nextUri":"http://127.0.0.1:8084/v1/statement/executing/20250117_195808_00519_3gijq/y0322ae69458590776af494e8e316d43fcd830012/0","stats":{"state":"QUEUED","queued":true,"scheduled":false,"nodes":0,"totalSplits":0,"queuedSplits":0,"runningSplits":0,"completedSplits":0,"cpuTimeMillis":0,"wallTimeMillis":0,"queuedTimeMillis":1,"elapsedTimeMillis":2276,"processedRows":0,"processedBytes":0,"physicalInputBytes":0,"physicalWrittenBytes":0,"peakMemoryBytes":0,"spilledBytes":0},"warnings":[]}
95+
96+
'
97+
headers:
98+
Content-Encoding:
99+
- gzip
100+
Content-Length:
101+
- '334'
102+
Content-Type:
103+
- application/json
104+
Date:
105+
- Fri, 17 Jan 2025 19:58:12 GMT
106+
Vary:
107+
- Accept-Encoding
108+
X-Content-Type-Options:
109+
- nosniff
110+
http_version: HTTP/1.1
111+
status_code: 200
112+
- request:
113+
body: ''
114+
headers:
115+
accept:
116+
- '*/*'
117+
accept-encoding:
118+
- gzip, deflate
119+
connection:
120+
- keep-alive
121+
host:
122+
- 127.0.0.1:8084
123+
user-agent:
124+
- python-httpx/0.23.3
125+
x-trino-user:
126+
- mschaub
127+
method: GET
128+
uri: http://127.0.0.1:8084/v1/statement/executing/20250117_195808_00519_3gijq/y0322ae69458590776af494e8e316d43fcd830012/0
129+
response:
130+
content: '{"id":"20250117_195808_00519_3gijq","infoUri":"http://127.0.0.1:8084/ui/query.html?20250117_195808_00519_3gijq","partialCancelUri":"http://127.0.0.1:8084/v1/statement/executing/partialCancel/20250117_195808_00519_3gijq/0/y587641876633b650fe6153b202536cacc1ce034b/1","nextUri":"http://127.0.0.1:8084/v1/statement/executing/20250117_195808_00519_3gijq/y587641876633b650fe6153b202536cacc1ce034b/1","columns":[{"name":"_col0","type":"varchar","typeSignature":{"rawType":"varchar","arguments":[{"kind":"LONG","value":2147483647}]}}],"stats":{"state":"RUNNING","queued":false,"scheduled":true,"progressPercentage":100.0,"runningPercentage":0.0,"nodes":1,"totalSplits":1,"queuedSplits":0,"runningSplits":0,"completedSplits":1,"cpuTimeMillis":0,"wallTimeMillis":0,"queuedTimeMillis":1,"elapsedTimeMillis":4381,"processedRows":0,"processedBytes":0,"physicalInputBytes":0,"physicalWrittenBytes":0,"peakMemoryBytes":148,"spilledBytes":0,"rootStage":{"stageId":"0","state":"RUNNING","done":false,"nodes":1,"totalSplits":1,"queuedSplits":0,"runningSplits":0,"completedSplits":1,"cpuTimeMillis":0,"wallTimeMillis":0,"processedRows":0,"processedBytes":0,"physicalInputBytes":0,"failedTasks":0,"coordinatorOnly":false,"subStages":[]}},"warnings":[]}
131+
132+
'
133+
headers:
134+
Content-Encoding:
135+
- gzip
136+
Content-Length:
137+
- '513'
138+
Content-Type:
139+
- application/json
140+
Date:
141+
- Fri, 17 Jan 2025 19:58:14 GMT
142+
Vary:
143+
- Accept-Encoding
144+
X-Content-Type-Options:
145+
- nosniff
146+
http_version: HTTP/1.1
147+
status_code: 200
148+
- request:
149+
body: ''
150+
headers:
151+
accept:
152+
- '*/*'
153+
accept-encoding:
154+
- gzip, deflate
155+
connection:
156+
- keep-alive
157+
host:
158+
- 127.0.0.1:8084
159+
user-agent:
160+
- python-httpx/0.23.3
161+
x-trino-user:
162+
- mschaub
163+
method: GET
164+
uri: http://127.0.0.1:8084/v1/statement/executing/20250117_195808_00519_3gijq/y587641876633b650fe6153b202536cacc1ce034b/1
165+
response:
166+
content: '{"id":"20250117_195808_00519_3gijq","infoUri":"http://127.0.0.1:8084/ui/query.html?20250117_195808_00519_3gijq","nextUri":"http://127.0.0.1:8084/v1/statement/executing/20250117_195808_00519_3gijq/y6d268f9ec1ab47bf99cf73cf881e086ba1456652/2","columns":[{"name":"_col0","type":"varchar","typeSignature":{"rawType":"varchar","arguments":[{"kind":"LONG","value":2147483647}]}}],"data":[["467"]],"stats":{"state":"FINISHED","queued":false,"scheduled":true,"progressPercentage":100.0,"runningPercentage":0.0,"nodes":1,"totalSplits":1,"queuedSplits":0,"runningSplits":0,"completedSplits":1,"cpuTimeMillis":0,"wallTimeMillis":0,"queuedTimeMillis":1,"elapsedTimeMillis":6298,"processedRows":0,"processedBytes":0,"physicalInputBytes":0,"physicalWrittenBytes":0,"peakMemoryBytes":148,"spilledBytes":0,"rootStage":{"stageId":"0","state":"FINISHED","done":true,"nodes":1,"totalSplits":1,"queuedSplits":0,"runningSplits":0,"completedSplits":1,"cpuTimeMillis":0,"wallTimeMillis":0,"processedRows":1,"processedBytes":0,"physicalInputBytes":0,"failedTasks":0,"coordinatorOnly":false,"subStages":[]}},"warnings":[]}
167+
168+
'
169+
headers:
170+
Content-Encoding:
171+
- gzip
172+
Content-Length:
173+
- '511'
174+
Content-Type:
175+
- application/json
176+
Date:
177+
- Fri, 17 Jan 2025 19:58:16 GMT
178+
Vary:
179+
- Accept-Encoding
180+
X-Content-Type-Options:
181+
- nosniff
182+
http_version: HTTP/1.1
183+
status_code: 200
184+
- request:
185+
body: ''
186+
headers:
187+
accept:
188+
- '*/*'
189+
accept-encoding:
190+
- gzip, deflate
191+
connection:
192+
- keep-alive
193+
host:
194+
- 127.0.0.1:8084
195+
user-agent:
196+
- python-httpx/0.23.3
197+
x-trino-user:
198+
- mschaub
199+
method: GET
200+
uri: http://127.0.0.1:8084/v1/statement/executing/20250117_195808_00519_3gijq/y6d268f9ec1ab47bf99cf73cf881e086ba1456652/2
201+
response:
202+
content: '{"id":"20250117_195808_00519_3gijq","infoUri":"http://127.0.0.1:8084/ui/query.html?20250117_195808_00519_3gijq","columns":[{"name":"_col0","type":"varchar","typeSignature":{"rawType":"varchar","arguments":[{"kind":"LONG","value":2147483647}]}}],"stats":{"state":"FINISHED","queued":false,"scheduled":true,"progressPercentage":100.0,"runningPercentage":0.0,"nodes":1,"totalSplits":1,"queuedSplits":0,"runningSplits":0,"completedSplits":1,"cpuTimeMillis":0,"wallTimeMillis":0,"queuedTimeMillis":1,"elapsedTimeMillis":6298,"processedRows":0,"processedBytes":0,"physicalInputBytes":0,"physicalWrittenBytes":0,"peakMemoryBytes":148,"spilledBytes":0,"rootStage":{"stageId":"0","state":"FINISHED","done":true,"nodes":1,"totalSplits":1,"queuedSplits":0,"runningSplits":0,"completedSplits":1,"cpuTimeMillis":0,"wallTimeMillis":0,"processedRows":1,"processedBytes":0,"physicalInputBytes":0,"failedTasks":0,"coordinatorOnly":false,"subStages":[]}},"warnings":[]}
203+
204+
'
205+
headers:
206+
Content-Encoding:
207+
- gzip
208+
Content-Length:
209+
- '444'
210+
Content-Type:
211+
- application/json
212+
Date:
213+
- Fri, 17 Jan 2025 19:58:18 GMT
214+
Vary:
215+
- Accept-Encoding
216+
X-Content-Type-Options:
217+
- nosniff
218+
http_version: HTTP/1.1
219+
status_code: 200
220+
version: 1

0 commit comments

Comments
 (0)