Skip to content

Commit b3bccab

Browse files
authored
Feat: Add username, password to Pinot Connection (apache#46826)
* feat: add username, password to Pinot Connection * fix: rename params conn.user to conn.login to align with Airflow Connection * fix: don't add login/password to PinotCLI if None or empty string (Airflow UI connection behavior) test: set login/password as empty string not to return mock
1 parent 2a39959 commit b3bccab

File tree

2 files changed

+217
-1
lines changed
  • providers/apache/pinot

2 files changed

+217
-1
lines changed

providers/apache/pinot/src/airflow/providers/apache/pinot/hooks/pinot.py

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import subprocess
2222
from collections.abc import Iterable, Mapping
2323
from typing import TYPE_CHECKING, Any
24+
from urllib.parse import quote_plus
2425

2526
from pinotdb import connect
2627

@@ -74,6 +75,8 @@ def __init__(
7475
conn = self.get_connection(conn_id)
7576
self.host = conn.host
7677
self.port = str(conn.port)
78+
self.username = conn.login
79+
self.password = conn.password
7780
if cmd_path != "pinot-admin.sh":
7881
raise RuntimeError(
7982
"In version 4.0.0 of the PinotAdminHook the cmd_path has been hard-coded to"
@@ -99,6 +102,10 @@ def add_schema(self, schema_file: str, with_exec: bool = True) -> Any:
99102
:param with_exec: bool
100103
"""
101104
cmd = ["AddSchema"]
105+
if self.username:
106+
cmd += ["-user", self.username]
107+
if self.password:
108+
cmd += ["-password", self.password]
102109
cmd += ["-controllerHost", self.host]
103110
cmd += ["-controllerPort", self.port]
104111
cmd += ["-schemaFile", schema_file]
@@ -114,6 +121,10 @@ def add_table(self, file_path: str, with_exec: bool = True) -> Any:
114121
:param with_exec: bool
115122
"""
116123
cmd = ["AddTable"]
124+
if self.username:
125+
cmd += ["-user", self.username]
126+
if self.password:
127+
cmd += ["-password", self.password]
117128
cmd += ["-controllerHost", self.host]
118129
cmd += ["-controllerPort", self.port]
119130
cmd += ["-filePath", file_path]
@@ -144,6 +155,11 @@ def create_segment(
144155
) -> Any:
145156
"""Create Pinot segment by run CreateSegment command."""
146157
cmd = ["CreateSegment"]
158+
if self.username:
159+
cmd += ["-user", self.username]
160+
161+
if self.password:
162+
cmd += ["-password", self.password]
147163

148164
if generator_config_file:
149165
cmd += ["-generatorConfigFile", generator_config_file]
@@ -210,6 +226,10 @@ def upload_segment(self, segment_dir: str, table_name: str | None = None) -> Any
210226
:return:
211227
"""
212228
cmd = ["UploadSegment"]
229+
if self.username:
230+
cmd += ["-user", self.username]
231+
if self.password:
232+
cmd += ["-password", self.password]
213233
cmd += ["-controllerHost", self.host]
214234
cmd += ["-controllerPort", self.port]
215235
cmd += ["-segmentDir", segment_dir]
@@ -277,6 +297,8 @@ def get_conn(self) -> Any:
277297
pinot_broker_conn = connect(
278298
host=conn.host,
279299
port=conn.port,
300+
username=conn.login,
301+
password=conn.password,
280302
path=conn.extra_dejson.get("endpoint", "/query/sql"),
281303
scheme=conn.extra_dejson.get("schema", "http"),
282304
)
@@ -291,7 +313,9 @@ def get_uri(self) -> str:
291313
"""
292314
conn = self.get_connection(self.get_conn_id())
293315
host = conn.host
294-
if conn.port is not None:
316+
if conn.login and conn.password:
317+
host = f"{quote_plus(conn.login)}:{quote_plus(conn.password)}@{host}"
318+
if conn.port:
295319
host += f":{conn.port}"
296320
conn_type = conn.conn_type or "http"
297321
endpoint = conn.extra_dejson.get("endpoint", "query/sql")

providers/apache/pinot/tests/unit/apache/pinot/hooks/test_pinot.py

Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ def setup_method(self):
3333
self.conn = conn = mock.MagicMock()
3434
self.conn.host = "host"
3535
self.conn.port = "1000"
36+
self.conn.login = ""
37+
self.conn.password = ""
3638
self.conn.extra_dejson = {}
3739

3840
class PinotAdminHookTest(PinotAdminHook):
@@ -217,6 +219,8 @@ def setup_method(self):
217219
self.conn = conn = mock.MagicMock()
218220
self.conn.host = "host"
219221
self.conn.port = "1000"
222+
self.conn.login = ""
223+
self.conn.password = ""
220224
self.conn.conn_type = "http"
221225
self.conn.extra_dejson = {"endpoint": "query/sql"}
222226
self.cur = mock.MagicMock(rowcount=0)
@@ -272,3 +276,191 @@ def test_get_pandas_df(self):
272276
assert column == df.columns[0]
273277
for i, item in enumerate(result_sets):
274278
assert item[0] == df.values.tolist()[i][0]
279+
280+
281+
class TestPinotAdminHookWithAuth:
282+
def setup_method(self):
283+
self.conn = conn = mock.MagicMock()
284+
self.conn.host = "host"
285+
self.conn.port = "1000"
286+
self.conn.login = "user"
287+
self.conn.password = "pwd"
288+
self.conn.extra_dejson = {}
289+
290+
class PinotAdminHookTest(PinotAdminHook):
291+
def get_connection(self, conn_id):
292+
return conn
293+
294+
self.db_hook = PinotAdminHookTest()
295+
296+
@mock.patch("airflow.providers.apache.pinot.hooks.pinot.PinotAdminHook.run_cli")
297+
def test_add_schema_with_auth(self, mock_run_cli):
298+
params = ["schema_file", False]
299+
self.db_hook.add_schema(*params)
300+
mock_run_cli.assert_called_once_with(
301+
[
302+
"AddSchema",
303+
"-user",
304+
self.conn.login,
305+
"-password",
306+
self.conn.password,
307+
"-controllerHost",
308+
self.conn.host,
309+
"-controllerPort",
310+
self.conn.port,
311+
"-schemaFile",
312+
params[0],
313+
]
314+
)
315+
316+
@mock.patch("airflow.providers.apache.pinot.hooks.pinot.PinotAdminHook.run_cli")
317+
def test_add_table_with_auth(self, mock_run_cli):
318+
params = ["config_file", False]
319+
self.db_hook.add_table(*params)
320+
mock_run_cli.assert_called_once_with(
321+
[
322+
"AddTable",
323+
"-user",
324+
self.conn.login,
325+
"-password",
326+
self.conn.password,
327+
"-controllerHost",
328+
self.conn.host,
329+
"-controllerPort",
330+
self.conn.port,
331+
"-filePath",
332+
params[0],
333+
]
334+
)
335+
336+
@mock.patch("airflow.providers.apache.pinot.hooks.pinot.PinotAdminHook.run_cli")
337+
def test_create_segment_with_auth(self, mock_run_cli):
338+
params = {
339+
"generator_config_file": "a",
340+
"data_dir": "b",
341+
"segment_format": "c",
342+
"out_dir": "d",
343+
"overwrite": True,
344+
"table_name": "e",
345+
"segment_name": "f",
346+
"time_column_name": "g",
347+
"schema_file": "h",
348+
"reader_config_file": "i",
349+
"enable_star_tree_index": False,
350+
"star_tree_index_spec_file": "j",
351+
"hll_size": 9,
352+
"hll_columns": "k",
353+
"hll_suffix": "l",
354+
"num_threads": 8,
355+
"post_creation_verification": True,
356+
"retry": 7,
357+
}
358+
359+
self.db_hook.create_segment(**params)
360+
361+
mock_run_cli.assert_called_once_with(
362+
[
363+
"CreateSegment",
364+
"-user",
365+
self.conn.login,
366+
"-password",
367+
self.conn.password,
368+
"-generatorConfigFile",
369+
params["generator_config_file"],
370+
"-dataDir",
371+
params["data_dir"],
372+
"-format",
373+
params["segment_format"],
374+
"-outDir",
375+
params["out_dir"],
376+
"-overwrite",
377+
params["overwrite"],
378+
"-tableName",
379+
params["table_name"],
380+
"-segmentName",
381+
params["segment_name"],
382+
"-timeColumnName",
383+
params["time_column_name"],
384+
"-schemaFile",
385+
params["schema_file"],
386+
"-readerConfigFile",
387+
params["reader_config_file"],
388+
"-starTreeIndexSpecFile",
389+
params["star_tree_index_spec_file"],
390+
"-hllSize",
391+
params["hll_size"],
392+
"-hllColumns",
393+
params["hll_columns"],
394+
"-hllSuffix",
395+
params["hll_suffix"],
396+
"-numThreads",
397+
params["num_threads"],
398+
"-postCreationVerification",
399+
params["post_creation_verification"],
400+
"-retry",
401+
params["retry"],
402+
]
403+
)
404+
405+
@mock.patch("airflow.providers.apache.pinot.hooks.pinot.PinotAdminHook.run_cli")
406+
def test_upload_segment_with_auth(self, mock_run_cli):
407+
params = ["segment_dir", False]
408+
self.db_hook.upload_segment(*params)
409+
mock_run_cli.assert_called_once_with(
410+
[
411+
"UploadSegment",
412+
"-user",
413+
self.conn.login,
414+
"-password",
415+
self.conn.password,
416+
"-controllerHost",
417+
self.conn.host,
418+
"-controllerPort",
419+
self.conn.port,
420+
"-segmentDir",
421+
params[0],
422+
]
423+
)
424+
425+
426+
class TestPinotDbApiHookWithAuth:
427+
def setup_method(self):
428+
self.conn = conn = mock.MagicMock()
429+
self.conn.host = "host"
430+
self.conn.port = "1000"
431+
self.conn.conn_type = "http"
432+
self.conn.login = "user"
433+
self.conn.password = "pwd"
434+
self.conn.extra_dejson = {"endpoint": "query/sql"}
435+
self.cur = mock.MagicMock(rowcount=0)
436+
self.conn.cursor.return_value = self.cur
437+
self.conn.__enter__.return_value = self.cur
438+
self.conn.__exit__.return_value = None
439+
440+
class TestPinotDBApiHook(PinotDbApiHook):
441+
def get_conn(self):
442+
return conn
443+
444+
def get_connection(self, conn_id):
445+
return conn
446+
447+
self.db_hook = TestPinotDBApiHook
448+
449+
def test_get_uri_with_auth(self):
450+
"""
451+
Test on getting a pinot connection uri
452+
"""
453+
db_hook = self.db_hook()
454+
assert db_hook.get_uri() == "http://user:pwd@host:1000/query/sql"
455+
456+
def test_get_conn_with_auth(self):
457+
"""
458+
Test on getting a pinot connection
459+
"""
460+
conn = self.db_hook().get_conn()
461+
assert conn.host == "host"
462+
assert conn.port == "1000"
463+
assert conn.login == "user"
464+
assert conn.password == "pwd"
465+
assert conn.conn_type == "http"
466+
assert conn.extra_dejson.get("endpoint") == "query/sql"

0 commit comments

Comments
 (0)