Skip to content

Commit 3c2712b

Browse files
drowosequem.slavoshevski@cian.ru
andauthored
Presto support in Luigi (#2885)
Co-authored-by: m.slavoshevski@cian.ru <m.slavoshevski@cian.ru>
1 parent c6551a5 commit 3c2712b

File tree

3 files changed

+499
-0
lines changed

3 files changed

+499
-0
lines changed

luigi/contrib/presto.py

Lines changed: 282 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,282 @@
1+
import inspect
2+
import logging
3+
import re
4+
from collections import OrderedDict
5+
from contextlib import closing
6+
from enum import Enum
7+
from time import sleep
8+
9+
import luigi
10+
from luigi.six import add_metaclass
11+
from luigi.contrib import rdbms
12+
from luigi.task_register import Register
13+
14+
logger = logging.getLogger('luigi-interface')
15+
16+
try:
17+
from pyhive.presto import Connection, Cursor
18+
from pyhive.exc import DatabaseError
19+
except ImportError:
20+
logger.warning("pyhive[presto] is not installed.")
21+
22+
23+
class presto(luigi.Config): # NOQA
24+
host = luigi.Parameter(default='localhost', description='Presto host')
25+
port = luigi.IntParameter(default=8090, description='Presto port')
26+
user = luigi.Parameter(default='anonymous', description='Presto user')
27+
catalog = luigi.Parameter(default='hive', description='Default catalog')
28+
password = luigi.Parameter(default=None, description='User password')
29+
protocol = luigi.Parameter(default='https', description='Presto connection protocol')
30+
poll_interval = luigi.FloatParameter(
31+
default=1.0,
32+
description=' how often to ask the Presto REST interface for a progress update, defaults to a second'
33+
)
34+
35+
36+
class PrestoClient:
37+
"""
38+
Helper class wrapping `pyhive.presto.Connection`
39+
for executing presto queries and tracking progress
40+
"""
41+
42+
def __init__(self, connection, sleep_time=1):
43+
self.sleep_time = sleep_time
44+
self._connection = connection
45+
self._status = {'state': 'initial'}
46+
47+
@property
48+
def percentage_progress(self):
49+
"""
50+
:return: percentage of query overall progress
51+
"""
52+
return self._status.get('stats', {}).get('progressPercentage', 0.1)
53+
54+
@property
55+
def info_uri(self):
56+
"""
57+
:return: query UI link
58+
"""
59+
return self._status.get('infoUri')
60+
61+
def execute(self, query, parameters=None, mode=None):
62+
"""
63+
64+
:param query: query to run
65+
:param parameters: parameters should be injected in the query
66+
:param mode: "fetch" - yields rows, "watch" - yields log entries
67+
:return:
68+
"""
69+
class Mode(Enum):
70+
watch = 'watch'
71+
fetch = 'fetch'
72+
73+
_mode = Mode(mode) if mode else Mode.watch
74+
75+
with closing(self._connection.cursor()) as cursor:
76+
cursor.execute(query, parameters)
77+
status = self._status
78+
while status:
79+
sleep(self.sleep_time)
80+
status = cursor.poll()
81+
if status:
82+
if _mode == Mode.watch:
83+
yield status
84+
self._status = status
85+
86+
if _mode == Mode.fetch:
87+
for row in cursor.fetchall():
88+
yield row
89+
90+
91+
class WithPrestoClient(Register):
92+
"""
93+
A metaclass for injecting `PrestoClient` as a `_client` field into a new instance of class `T`
94+
Presto connection options are taken from `T`-instance fields
95+
Fields should have the same names as in `pyhive.presto.Cursor`
96+
"""
97+
98+
def __new__(cls, name, bases, attrs):
99+
def _client(self):
100+
def _kwargs():
101+
"""
102+
replace to
103+
```
104+
(_self, *args), *_ = inspect.getfullargspec(Cursor.__init__)
105+
```
106+
after py2-deprecation
107+
"""
108+
args = inspect.getargspec(Cursor.__init__)[0][1:]
109+
for parameter in args:
110+
val = getattr(self, parameter)
111+
if val:
112+
yield parameter, val
113+
114+
connection = Connection(**dict(_kwargs()))
115+
return PrestoClient(connection=connection)
116+
117+
attrs.update({
118+
'_client': property(_client)
119+
})
120+
return super(cls, WithPrestoClient).__new__(cls, name, bases, attrs)
121+
122+
123+
class PrestoTarget(luigi.Target):
124+
"""
125+
Target for presto-accessible tables
126+
"""
127+
def __init__(self, client, catalog, database, table, partition=None):
128+
self.catalog = catalog
129+
self.database = database
130+
self.table = table
131+
self.partition = partition
132+
self._client = client
133+
self._count = None
134+
135+
@property
136+
def _count_query(self):
137+
partition = OrderedDict(self.partition or {1: 1})
138+
139+
def _clauses():
140+
for k in partition.keys():
141+
yield '{} = %s'.format(k)
142+
143+
clauses = ' AND '.join(_clauses())
144+
145+
query = 'SELECT COUNT(*) AS cnt FROM {}.{}.{} WHERE {} LIMIT 1'.format(
146+
self.catalog,
147+
self.database,
148+
self.table,
149+
clauses
150+
)
151+
params = list(partition.values())
152+
return query, params
153+
154+
def _table_doesnot_exist(self, exception):
155+
pattern = re.compile(
156+
r'line (\d+):(\d+): Table {}.{}.{} does not exist'.format(
157+
self.catalog,
158+
self.database,
159+
self.table
160+
)
161+
)
162+
try:
163+
message = exception.message['message']
164+
if pattern.match(message):
165+
return True
166+
finally:
167+
return False
168+
169+
def count(self):
170+
if not self._count:
171+
'''
172+
replace to
173+
self._count, *_ = next(self._client.execute(*self.count_query, 'fetch'))
174+
after py2 deprecation
175+
'''
176+
self._count = next(self._client.execute(*self._count_query, mode='fetch'))[0]
177+
return self._count
178+
179+
def exists(self):
180+
"""
181+
182+
:return: `True` if given table exists and there are any rows in a given partition
183+
`False` if no rows in the partition exists or table is absent
184+
"""
185+
try:
186+
return self.count() > 0
187+
except DatabaseError as exception:
188+
if self._table_doesnot_exist(exception):
189+
return False
190+
except Exception:
191+
raise
192+
193+
194+
@add_metaclass(WithPrestoClient)
195+
class PrestoTask(rdbms.Query):
196+
"""
197+
Task for executing presto queries
198+
During its executions tracking url and percentage progress are set
199+
"""
200+
_tracking_url_set = False
201+
202+
@property
203+
def host(self):
204+
return presto().host
205+
206+
@property
207+
def port(self):
208+
return presto().port
209+
210+
@property
211+
def user(self):
212+
return presto().user
213+
214+
@property
215+
def username(self):
216+
return self.user
217+
218+
@property
219+
def schema(self):
220+
return self.database
221+
222+
@property
223+
def password(self):
224+
return presto().password
225+
226+
@property
227+
def catalog(self):
228+
return presto().catalog
229+
230+
@property
231+
def poll_interval(self):
232+
return presto().poll_interval
233+
234+
@property
235+
def source(self):
236+
return 'pyhive'
237+
238+
@property
239+
def partition(self):
240+
return None
241+
242+
@property
243+
def protocol(self):
244+
return 'https' if self.password else presto().protocol
245+
246+
@property
247+
def session_props(self):
248+
return None
249+
250+
@property
251+
def requests_session(self):
252+
return None
253+
254+
@property
255+
def requests_kwargs(self):
256+
return {
257+
'verify': False
258+
}
259+
260+
query = None
261+
262+
def _maybe_set_tracking_url(self):
263+
if not self._tracking_url_set:
264+
self.set_tracking_url(self._client.info_uri)
265+
self._tracking_url_set = True
266+
267+
def _set_progress(self):
268+
self.set_progress_percentage(self._client.percentage_progress)
269+
270+
def run(self):
271+
for _ in self._client.execute(self.query):
272+
self._maybe_set_tracking_url()
273+
self._set_progress()
274+
275+
def output(self):
276+
return PrestoTarget(
277+
client=self._client,
278+
catalog=self.catalog,
279+
database=self.database,
280+
table=self.table,
281+
partition=self.partition,
282+
)

0 commit comments

Comments
 (0)