|
7 | 7 | import copy
|
8 | 8 | import json
|
9 | 9 | import logging
|
| 10 | +import os |
10 | 11 | import shutil
|
11 | 12 | import uuid
|
12 | 13 | from contextvars import ContextVar
|
|
41 | 42 | from pyatlan.client.asset import A, AssetClient, IndexSearchResults, LineageListResults
|
42 | 43 | from pyatlan.client.audit import AuditClient
|
43 | 44 | from pyatlan.client.common import CONNECTION_RETRY, HTTP_PREFIX, HTTPS_PREFIX
|
44 |
| -from pyatlan.client.constants import EVENT_STREAM, PARSE_QUERY, UPLOAD_IMAGE |
| 45 | +from pyatlan.client.constants import EVENT_STREAM, GET_TOKEN, PARSE_QUERY, UPLOAD_IMAGE |
45 | 46 | from pyatlan.client.contract import ContractClient
|
46 | 47 | from pyatlan.client.credential import CredentialClient
|
47 | 48 | from pyatlan.client.file import FileClient
|
|
75 | 76 | from pyatlan.model.group import AtlanGroup, CreateGroupResponse, GroupResponse
|
76 | 77 | from pyatlan.model.lineage import LineageListRequest
|
77 | 78 | from pyatlan.model.query import ParsedQuery, QueryParserRequest
|
78 |
| -from pyatlan.model.response import AssetMutationResponse |
| 79 | +from pyatlan.model.response import AccessTokenResponse, AssetMutationResponse |
79 | 80 | from pyatlan.model.role import RoleResponse
|
80 | 81 | from pyatlan.model.search import IndexSearchRequest
|
81 | 82 | from pyatlan.model.typedef import TypeDef, TypeDefResponse
|
@@ -346,6 +347,70 @@ def source_tag_cache(self) -> SourceTagCache:
|
346 | 347 | self._source_tag_cache = SourceTagCache(client=self)
|
347 | 348 | return self._source_tag_cache
|
348 | 349 |
|
| 350 | + @classmethod |
| 351 | + def from_token_guid(cls, guid: str) -> AtlanClient: |
| 352 | + """ |
| 353 | + Create an AtlanClient instance using an API token GUID. |
| 354 | +
|
| 355 | + This method performs a multi-step authentication flow: |
| 356 | + 1. Obtains Atlan-Argo (superuser) access token |
| 357 | + 2. Uses Argo token to retrieve the API token's client credentials |
| 358 | + 3. Exchanges those credentials for an access token |
| 359 | + 4. Returns a new AtlanClient authenticated with the resolved token |
| 360 | +
|
| 361 | + :param guid: API token GUID to resolve |
| 362 | + :returns: a new client instance authenticated with the resolved token |
| 363 | + :raises: ErrorCode.UNABLE_TO_ESCALATE_WITH_PARAM: If any step in the token resolution fails |
| 364 | + """ |
| 365 | + base_url = os.environ.get("ATLAN_BASE_URL", "INTERNAL") |
| 366 | + |
| 367 | + # Step 1: Initialize base client and get Atlan-Argo credentials |
| 368 | + # Note: Using empty api_key as we're bootstrapping authentication |
| 369 | + client = AtlanClient(base_url=base_url, api_key="") |
| 370 | + client_info = client.impersonate._get_client_info() |
| 371 | + |
| 372 | + # Prepare credentials for Atlan-Argo token request |
| 373 | + argo_credentials = { |
| 374 | + "grant_type": "client_credentials", |
| 375 | + "client_id": client_info.client_id, |
| 376 | + "client_secret": client_info.client_secret, |
| 377 | + "scope": "openid", |
| 378 | + } |
| 379 | + |
| 380 | + # Step 2: Obtain Atlan-Argo (superuser) access token |
| 381 | + try: |
| 382 | + raw_json = client._call_api(GET_TOKEN, request_obj=argo_credentials) |
| 383 | + argo_token = AccessTokenResponse(**raw_json).access_token |
| 384 | + temp_argo_client = AtlanClient(base_url=base_url, api_key=argo_token) |
| 385 | + except AtlanError as atlan_err: |
| 386 | + raise ErrorCode.UNABLE_TO_ESCALATE_WITH_PARAM.exception_with_parameters( |
| 387 | + "Failed to obtain Atlan-Argo token" |
| 388 | + ) from atlan_err |
| 389 | + |
| 390 | + # Step 3: Use Argo client to retrieve API token's credentials |
| 391 | + # Both endpoints require authentication, hence using the Argo token |
| 392 | + token_secret = temp_argo_client.impersonate.get_client_secret(client_guid=guid) |
| 393 | + token_client_id = temp_argo_client.token.get_by_guid(guid=guid).client_id |
| 394 | + |
| 395 | + # Step 4: Exchange API token credentials for access token |
| 396 | + token_credentials = { |
| 397 | + "grant_type": "client_credentials", |
| 398 | + "client_id": token_client_id, |
| 399 | + "client_secret": token_secret, |
| 400 | + "scope": "openid", |
| 401 | + } |
| 402 | + |
| 403 | + try: |
| 404 | + raw_json = client._call_api(GET_TOKEN, request_obj=token_credentials) |
| 405 | + token_api_key = AccessTokenResponse(**raw_json).access_token |
| 406 | + |
| 407 | + # Step 5: Create and return the authenticated client |
| 408 | + return AtlanClient(base_url=base_url, api_key=token_api_key) |
| 409 | + except AtlanError as atlan_err: |
| 410 | + raise ErrorCode.UNABLE_TO_ESCALATE_WITH_PARAM.exception_with_parameters( |
| 411 | + "Failed to obtain access token for API token" |
| 412 | + ) from atlan_err |
| 413 | + |
349 | 414 | def update_headers(self, header: Dict[str, str]):
|
350 | 415 | self._session.headers.update(header)
|
351 | 416 |
|
|
0 commit comments