Skip to content

Commit dce4d7d

Browse files
authored
Application: accept "quix_portal_api" param (#902)
* Application: accept "quix_portal_api" param * Concentrate env variable access in QuixEnvironment
1 parent a2f4ee7 commit dce4d7d

File tree

8 files changed

+162
-97
lines changed

8 files changed

+162
-97
lines changed

quixstreams/app.py

Lines changed: 23 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,12 @@
11
import contextlib
22
import functools
33
import logging
4-
import os
54
import signal
65
import time
76
import warnings
87
from collections import defaultdict
98
from pathlib import Path
10-
from typing import (
11-
Callable,
12-
List,
13-
Literal,
14-
Optional,
15-
Protocol,
16-
Tuple,
17-
Type,
18-
Union,
19-
)
9+
from typing import Callable, List, Literal, Optional, Protocol, Tuple, Type, Union
2010

2111
from confluent_kafka import TopicPartition
2212
from pydantic import AliasGenerator, Field
@@ -45,12 +35,14 @@
4535
TopicManager,
4636
)
4737
from .platforms.quix import (
38+
DEFAULT_PORTAL_API_URL,
4839
QuixKafkaConfigsBuilder,
4940
QuixTopicManager,
5041
check_state_dir,
5142
check_state_management_enabled,
5243
is_quix_deployment,
5344
)
45+
from .platforms.quix.env import QUIX_ENVIRONMENT
5446
from .processing import ProcessingContext
5547
from .runtracker import RunTracker
5648
from .sinks import SinkManager
@@ -134,6 +126,7 @@ def __init__(
134126
broker_address: Optional[Union[str, ConnectionConfig]] = None,
135127
*,
136128
quix_sdk_token: Optional[str] = None,
129+
quix_portal_api: Optional[str] = None,
137130
consumer_group: Optional[str] = None,
138131
auto_offset_reset: AutoOffsetReset = "latest",
139132
commit_interval: float = 5.0,
@@ -172,6 +165,11 @@ def __init__(
172165
Linked Environment Variable: `Quix__Sdk__Token`.
173166
Default: None (if not run on Quix Cloud)
174167
>***NOTE:*** the environment variable is set for you in the Quix Cloud
168+
:param quix_portal_api: If using the Quix Cloud, the cluster API URL to use.
169+
Use it to connect to the dedicated Quix Cloud environment.
170+
Linked Environment Variable: `Quix__Portal__Api`.
171+
Default: `https://portal-api.platform.quix.io/`.
172+
>***NOTE:*** the environment variable is set for you in the Quix Cloud
175173
:param consumer_group: Kafka consumer group.
176174
Passed as `group.id` to `confluent_kafka.Consumer`.
177175
Linked Environment Variable: `Quix__Consumer_Group`.
@@ -244,19 +242,20 @@ def __init__(
244242
consumer_extra_config = consumer_extra_config or {}
245243

246244
if state_dir is None:
247-
state_dir = os.getenv(
248-
"Quix__State__Dir", "/app/state" if is_quix_deployment() else "state"
245+
state_dir = QUIX_ENVIRONMENT.state_dir or (
246+
"/app/state" if is_quix_deployment() else "state"
249247
)
250248
state_dir = Path(state_dir)
251249

252-
# We can't use os.getenv as defaults (and have testing work nicely)
253-
# since it evaluates getenv when the function is defined.
254-
# In general this is just a most robust approach.
255-
broker_address = broker_address or os.getenv("Quix__Broker__Address")
256-
quix_sdk_token = quix_sdk_token or os.getenv("Quix__Sdk__Token")
250+
broker_address = broker_address or QUIX_ENVIRONMENT.broker_address
251+
quix_sdk_token = quix_sdk_token or QUIX_ENVIRONMENT.sdk_token
252+
quix_portal_api = (
253+
quix_portal_api or QUIX_ENVIRONMENT.portal_api or DEFAULT_PORTAL_API_URL
254+
)
257255

258-
if not consumer_group:
259-
consumer_group = os.getenv("Quix__Consumer_Group", "quixstreams-default")
256+
consumer_group = (
257+
consumer_group or QUIX_ENVIRONMENT.consumer_group or "quixstreams-default"
258+
)
260259

261260
if broker_address:
262261
# If broker_address is passed to the app it takes priority over any quix configuration
@@ -276,8 +275,8 @@ def __init__(
276275
)
277276
elif quix_sdk_token:
278277
quix_app_source = "Quix SDK Token"
279-
quix_config_builder = QuixKafkaConfigsBuilder(
280-
quix_sdk_token=quix_sdk_token
278+
quix_config_builder = QuixKafkaConfigsBuilder.from_credentials(
279+
quix_sdk_token=quix_sdk_token, quix_portal_api=quix_portal_api
281280
)
282281
else:
283282
raise ValueError(
@@ -287,7 +286,8 @@ def __init__(
287286
# SDK Token or QuixKafkaConfigsBuilder were provided
288287
logger.info(
289288
f"{quix_app_source} detected; "
290-
f"the application will connect to Quix Cloud brokers"
289+
f"the application will connect to Quix Cloud brokers "
290+
f'(quix_portal_api="{quix_portal_api}")'
291291
)
292292
self._topic_manager_factory = functools.partial(
293293
QuixTopicManager, quix_config_builder=quix_config_builder

quixstreams/platforms/quix/api.py

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,6 @@
1414

1515
__all__ = ("QuixPortalApiService",)
1616

17-
DEFAULT_PORTAL_API_URL = "https://portal-api.platform.quix.io/"
18-
1917

2018
class QuixPortalApiService:
2119
"""
@@ -34,13 +32,11 @@ class QuixPortalApiService:
3432
def __init__(
3533
self,
3634
auth_token: str,
37-
portal_api: Optional[str] = None,
35+
portal_api: str,
3836
api_version: Optional[str] = None,
3937
default_workspace_id: Optional[str] = None,
4038
):
41-
self._portal_api_url = (
42-
portal_api or QUIX_ENVIRONMENT.portal_api or DEFAULT_PORTAL_API_URL
43-
)
39+
self._portal_api_url = portal_api
4440
if not auth_token:
4541
raise MissingConnectionRequirements(
4642
f"A Quix Cloud auth token (SDK or PAT) is required; "

quixstreams/platforms/quix/config.py

Lines changed: 39 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,12 @@
2020
UndefinedQuixWorkspaceId,
2121
)
2222

23+
__all__ = ("QuixKafkaConfigsBuilder", "QuixApplicationConfig", "DEFAULT_PORTAL_API_URL")
24+
2325
logger = logging.getLogger(__name__)
2426

25-
__all__ = ("QuixKafkaConfigsBuilder", "QuixApplicationConfig")
2627

28+
DEFAULT_PORTAL_API_URL = "https://portal-api.platform.quix.io/"
2729
QUIX_CONNECTIONS_MAX_IDLE_MS = 3 * 60 * 1000
2830
QUIX_METADATA_MAX_AGE_MS = 3 * 60 * 1000
2931

@@ -95,32 +97,21 @@ class QuixKafkaConfigsBuilder:
9597
It also currently handles the app_auto_create_topics setting for Quix Applications.
9698
"""
9799

98-
# TODO: Consider a workspace class?
99100
def __init__(
100101
self,
101-
quix_sdk_token: Optional[str] = None,
102+
quix_portal_api_service: QuixPortalApiService,
102103
workspace_id: Optional[str] = None,
103-
quix_portal_api_service: Optional[QuixPortalApiService] = None,
104104
timeout: float = 30,
105105
topic_create_timeout: float = 60,
106106
):
107107
"""
108-
:param quix_portal_api_service: A QuixPortalApiService instance (else generated)
108+
:param quix_portal_api_service: A QuixPortalApiService instance
109109
:param workspace_id: A valid Quix Workspace ID (else searched for)
110110
"""
111-
if quix_sdk_token:
112-
self.api = QuixPortalApiService(
113-
default_workspace_id=workspace_id, auth_token=quix_sdk_token
114-
)
115-
elif quix_portal_api_service:
116-
self.api = quix_portal_api_service
117-
else:
118-
raise ValueError(
119-
'Either "quix_sdk_token" or "quix_portal_api_service" must be provided'
120-
)
121111

112+
self._api = quix_portal_api_service
122113
try:
123-
self._workspace_id = workspace_id or self.api.default_workspace_id
114+
self._workspace_id = workspace_id or self._api.default_workspace_id
124115
except UndefinedQuixWorkspaceId:
125116
self._workspace_id = ""
126117
logger.warning(
@@ -136,6 +127,30 @@ def __init__(
136127
self._timeout = timeout
137128
self._topic_create_timeout = topic_create_timeout
138129

130+
@classmethod
131+
def from_credentials(
132+
cls,
133+
quix_sdk_token: str,
134+
quix_portal_api: str = DEFAULT_PORTAL_API_URL,
135+
workspace_id: Optional[str] = None,
136+
timeout: float = 30,
137+
topic_create_timeout: float = 60,
138+
) -> "QuixKafkaConfigsBuilder":
139+
"""
140+
Initialize class using the quix_sdk_token and quix_portal_api params.
141+
"""
142+
api_service = QuixPortalApiService(
143+
default_workspace_id=workspace_id,
144+
auth_token=quix_sdk_token,
145+
portal_api=quix_portal_api,
146+
)
147+
return cls(
148+
quix_portal_api_service=api_service,
149+
workspace_id=workspace_id,
150+
timeout=timeout,
151+
topic_create_timeout=topic_create_timeout,
152+
)
153+
139154
@property
140155
def workspace_id(self) -> str:
141156
if not self._workspace_id:
@@ -250,12 +265,12 @@ def search_for_workspace(
250265
if not workspace_name_or_id:
251266
workspace_name_or_id = self._workspace_id
252267
try:
253-
return self.api.get_workspace(
268+
return self._api.get_workspace(
254269
workspace_id=workspace_name_or_id, timeout=timeout
255270
)
256271
except HTTPError:
257272
# check to see if they provided the workspace name instead
258-
ws_list = self.api.get_workspaces(timeout=timeout)
273+
ws_list = self._api.get_workspaces(timeout=timeout)
259274
for ws in ws_list:
260275
if ws["name"] == workspace_name_or_id:
261276
return ws
@@ -315,7 +330,7 @@ def search_workspace_for_topic(
315330
316331
:return: the workspace_id if success, else None
317332
"""
318-
topics = self.api.get_topics(
333+
topics = self._api.get_topics(
319334
workspace_id=workspace_id,
320335
timeout=timeout if timeout is not None else self._timeout,
321336
)
@@ -338,7 +353,7 @@ def search_for_topic_workspace(
338353
339354
:return: workspace data dict if topic search success, else None
340355
"""
341-
ws_list = self.api.get_workspaces(
356+
ws_list = self._api.get_workspaces(
342357
timeout=timeout if timeout is not None else self._timeout
343358
)
344359
if len(ws_list) == 1:
@@ -376,7 +391,7 @@ def create_topic(self, topic: Topic, timeout: Optional[float] = None) -> dict:
376391
f'Creating topic "{topic.name}" '
377392
f'with a config: "{topic.create_config.as_dict() if topic.create_config is not None else {} }"'
378393
)
379-
resp = self.api.post_topic(
394+
resp = self._api.post_topic(
380395
topic_name=topic.name,
381396
workspace_id=self.workspace_id,
382397
topic_partitions=cfg.num_partitions,
@@ -473,14 +488,14 @@ def get_topic(self, topic: Topic, timeout: Optional[float] = None) -> dict:
473488
:return: response dict of the topic info if topic found, else None
474489
:raises QuixApiRequestFailure: when topic does not exist
475490
"""
476-
return self.api.get_topic(
491+
return self._api.get_topic(
477492
topic_name=topic.name,
478493
workspace_id=self.workspace_id,
479494
timeout=timeout if timeout is not None else self._timeout,
480495
)
481496

482497
def get_topics(self, timeout: Optional[float] = None) -> List[dict]:
483-
return self.api.get_topics(
498+
return self._api.get_topics(
484499
workspace_id=self.workspace_id,
485500
timeout=timeout if timeout is not None else self._timeout,
486501
)
@@ -490,7 +505,7 @@ def _get_librdkafka_connection_config(self) -> ConnectionConfig:
490505
Get the full client config required to authenticate a confluent-kafka
491506
client to a Quix platform broker/workspace as a ConnectionConfig
492507
"""
493-
librdkafka_dict = self.api.get_librdkafka_connection_config(self.workspace_id)
508+
librdkafka_dict = self._api.get_librdkafka_connection_config(self.workspace_id)
494509
if (cert := librdkafka_dict.pop("ssl.ca.cert", None)) is not None:
495510
librdkafka_dict["ssl.ca.pem"] = base64.b64decode(cert)
496511
return ConnectionConfig.from_librdkafka_dict(librdkafka_dict)

quixstreams/platforms/quix/env.py

Lines changed: 30 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,17 @@
66

77
class QuixEnvironment:
88
"""
9-
Class to access various Quix platform environment settings
9+
A class to access various Quix Streams environment variables
1010
"""
1111

1212
SDK_TOKEN = "Quix__Sdk__Token" # noqa: S105
13+
BROKER_ADDRESS = "Quix__Broker__Address"
1314
PORTAL_API = "Quix__Portal__Api"
1415
WORKSPACE_ID = "Quix__Workspace__Id"
1516
DEPLOYMENT_ID = "Quix__Deployment__Id"
1617
STATE_MANAGEMENT_ENABLED = "Quix__Deployment__State__Enabled"
18+
STATE_DIR = "Quix__State__Dir"
19+
CONSUMER_GROUP = "Quix__Consumer_Group"
1720

1821
@property
1922
def state_management_enabled(self) -> bool:
@@ -44,22 +47,40 @@ def workspace_id(self) -> Optional[str]:
4447
return os.environ.get(self.WORKSPACE_ID)
4548

4649
@property
47-
def portal_api(self) -> Optional[str]:
50+
def state_dir(self) -> Optional[str]:
4851
"""
49-
Return Quix Portal API url if set
52+
Return application state directory on Quix.
53+
:return: path to state dir
54+
"""
55+
return os.environ.get(self.STATE_DIR)
5056

51-
:return: portal API URL or None
57+
@property
58+
def portal_api(self) -> Optional[str]:
59+
"""
60+
Quix Portal API URL
5261
"""
5362
return os.environ.get(self.PORTAL_API)
5463

5564
@property
56-
def state_dir(self) -> str:
65+
def broker_address(self) -> Optional[str]:
5766
"""
58-
Return application state directory on Quix.
59-
:return: path to state dir
67+
Kafka broker address
68+
"""
69+
return os.environ.get(self.BROKER_ADDRESS)
70+
71+
@property
72+
def sdk_token(self) -> Optional[str]:
73+
"""
74+
Quix SDK token
75+
"""
76+
return os.environ.get(self.SDK_TOKEN)
77+
78+
@property
79+
def consumer_group(self) -> Optional[str]:
80+
"""
81+
Kafka consumer group
6082
"""
61-
# TODO: Use env variables instead when they're available
62-
return "/app/state"
83+
return os.environ.get(self.CONSUMER_GROUP)
6384

6485

6586
QUIX_ENVIRONMENT = QuixEnvironment()

quixstreams/platforms/quix/exceptions.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,10 @@
66
"MissingConnectionRequirements",
77
"UndefinedQuixWorkspaceId",
88
"QuixApiRequestFailure",
9+
"MultipleWorkspaces",
10+
"NoWorkspaceFound",
11+
"QuixCreateTopicFailure",
12+
"QuixCreateTopicTimeout",
913
)
1014

1115

quixstreams/sources/core/kafka/quix.py

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,7 @@
44
from quixstreams.kafka import AutoOffsetReset
55
from quixstreams.models.serializers import DeserializerType
66
from quixstreams.models.topics import Topic
7-
from quixstreams.platforms.quix import QuixKafkaConfigsBuilder
8-
from quixstreams.platforms.quix.api import QuixPortalApiService
7+
from quixstreams.platforms.quix import DEFAULT_PORTAL_API_URL, QuixKafkaConfigsBuilder
98
from quixstreams.sources import (
109
ClientConnectFailureCallback,
1110
ClientConnectSuccessCallback,
@@ -83,12 +82,10 @@ def __init__(
8382

8483
self._short_topic = topic
8584
self._quix_workspace_id = quix_workspace_id
86-
self._quix_config = QuixKafkaConfigsBuilder(
87-
quix_portal_api_service=QuixPortalApiService(
88-
default_workspace_id=quix_workspace_id,
89-
auth_token=quix_sdk_token,
90-
portal_api=quix_portal_api,
91-
)
85+
self._quix_config = QuixKafkaConfigsBuilder.from_credentials(
86+
quix_sdk_token=quix_sdk_token,
87+
quix_portal_api=quix_portal_api or DEFAULT_PORTAL_API_URL,
88+
workspace_id=quix_workspace_id,
9289
)
9390

9491
quix_topic = self._quix_config.convert_topic_response(

0 commit comments

Comments
 (0)