Skip to content

Commit 1db6172

Browse files
committed
Enhance BigQuery client with flexible authentication methods and update documentation
1 parent 0c70b89 commit 1db6172

File tree

2 files changed

+129
-23
lines changed

2 files changed

+129
-23
lines changed

CLAUDE.md

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,12 +67,32 @@ The server exposes these tools to AI agents:
6767
- `QUERY_ACCESS_EVALUATION_ENABLED`: Optional flag to enable/disable query access evaluation (defaults to `true`). Set to `false` to skip AI-based access evaluation when AI is not enabled in Data Mesh Manager.
6868

6969
#### BigQuery Configuration
70-
- `BIGQUERY_CREDENTIALS_PATH`: Path to service account key file
70+
71+
The BigQuery client supports three authentication methods with automatic fallback:
72+
73+
1. **Service Account JSON** (Recommended for production)
74+
- Set `BIGQUERY_CREDENTIALS_PATH` to the path of a service account key file
75+
- Example: `BIGQUERY_CREDENTIALS_PATH=/path/to/service-account-key.json`
76+
77+
2. **Workload Identity Federation** (Recommended for cloud environments)
78+
- Set `BIGQUERY_CREDENTIALS_PATH` to the path of a workload identity federation config file
79+
- Example: `BIGQUERY_CREDENTIALS_PATH=/path/to/wif-config.json`
80+
81+
3. **Application Default Credentials** (Recommended for local development)
82+
- Do not set `BIGQUERY_CREDENTIALS_PATH` or set it to empty
83+
- Run `gcloud auth application-default login` for local user credentials
84+
- Automatically used in cloud environments (GCE, Cloud Run, GKE, etc.)
85+
86+
**Authentication Priority:**
87+
- If `BIGQUERY_CREDENTIALS_PATH` is set, tries service account JSON first, then workload identity federation
88+
- If not set (or set to empty), falls back to Application Default Credentials. If it is set but the file does not exist, an error is raised; there is no fallback in that case.
7189

7290
**Note**: Google Cloud Project ID and dataset information are specified in the data product's output port server configuration, not as environment variables.
7391

7492
### Claude Desktop Integration
7593
Configure in `~/Library/Application Support/Claude/claude_desktop_config.json`:
94+
95+
**Example 1: Using Service Account JSON**
7696
```json
7797
{
7898
"mcpServers": {
@@ -90,6 +110,24 @@ Configure in `~/Library/Application Support/Claude/claude_desktop_config.json`:
90110
}
91111
```
92112

113+
**Example 2: Using Application Default Credentials (local development)**
114+
```json
115+
{
116+
"mcpServers": {
117+
"dataproduct": {
118+
"command": "uv",
119+
"args": ["run", "--directory", "<path_to_folder>/dataproduct-mcp", "python", "-m", "dataproduct_mcp.server"],
120+
"env": {
121+
"DATAMESH_MANAGER_API_KEY": "dmm_live_...",
122+
"DATAMESH_MANAGER_HOST": "https://your-self-hosted-instance.com",
123+
"QUERY_ACCESS_EVALUATION_ENABLED": "true"
124+
}
125+
}
126+
}
127+
}
128+
```
129+
Note: Run `gcloud auth application-default login` before using ADC.
130+
93131
## Code Patterns
94132

95133
- All tools are async and use proper error handling with try/except blocks
Lines changed: 90 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,50 +1,118 @@
1-
from typing import Any, Dict, List
1+
from typing import Any, Dict, List, Optional
22
import logging
33
import os
4+
import json
45
from google.cloud import bigquery
56
from google.oauth2 import service_account
7+
from google.auth import default
8+
from google.auth.exceptions import DefaultCredentialsError
69

710
logger = logging.getLogger(__name__)
811

912

13+
def _load_credentials(credentials_path: Optional[str]) -> tuple[Optional[Any], Optional[str]]:
    """
    Load Google Cloud credentials using a flexible authentication strategy.

    Authentication priority:
    1. If credentials_path is provided (non-empty), the file must exist and is tried:
       a. as a service account JSON key file
       b. as a workload identity federation (external account) config
    2. If credentials_path is not provided, fall back to Application Default
       Credentials (ADC).

    Note that a provided-but-missing file is an error; it does NOT fall back to ADC.

    Args:
        credentials_path: Path to a credentials file, or None/empty to use ADC.

    Returns:
        tuple: (credentials, project_id) - project_id may be None if not in credentials

    Raises:
        ValueError: If the file does not exist, if it cannot be loaded by either
            file-based strategy, or if no Application Default Credentials are
            available. The underlying error is chained as ``__cause__``.
    """
    # Strategy 1: Try credentials file if path is provided
    if credentials_path:
        if not os.path.exists(credentials_path):
            raise ValueError(f"Credentials file not found: {credentials_path}")

        try:
            # Try service account JSON
            credentials = service_account.Credentials.from_service_account_file(credentials_path)
        except (ValueError, KeyError) as e:
            # Not a valid service account file, try workload identity federation
            logger.debug(f"Not a service account file: {e}")
        else:
            logger.info("Using service account credentials from JSON file")
            # The loaded credentials already carry the key file's project_id
            # (None when absent), so the file does not need to be parsed again.
            return credentials, credentials.project_id

        try:
            # Try workload identity federation (external account).
            # The import stays inside the try so an import failure is reported
            # the same way as a load failure.
            from google.auth import load_credentials_from_file
            credentials, project_id = load_credentials_from_file(credentials_path)
        except Exception as e:
            logger.warning(f"Failed to load credentials from file: {e}")
            # Chain the cause so the real loader error stays visible in tracebacks.
            raise ValueError(
                f"Credentials file is neither a valid service account JSON nor "
                f"workload identity federation config: {credentials_path}"
            ) from e

        logger.info("Using workload identity federation credentials")
        return credentials, project_id

    # Strategy 2: Fall back to Application Default Credentials (ADC)
    try:
        credentials, project_id = default()
    except DefaultCredentialsError as e:
        raise ValueError(
            "No valid credentials found. Please either:\n"
            "1. Set BIGQUERY_CREDENTIALS_PATH to a service account JSON file\n"
            "2. Set BIGQUERY_CREDENTIALS_PATH to a workload identity federation config\n"
            "3. Run 'gcloud auth application-default login' for local development\n"
            "4. Ensure your environment has valid Application Default Credentials\n"
            f"Error: {e}"
        ) from e

    logger.info("Using Application Default Credentials (local user credentials or environment-based)")
    return credentials, project_id
79+
80+
1081
async def execute_bigquery_query(server_info: Dict[str, Any], query: str) -> List[Dict[str, Any]]:
1182
"""Execute query on BigQuery."""
1283
# Parse connection parameters
1384
project_id = server_info.get("project_id") or server_info.get("project")
1485
credentials_path = os.getenv("BIGQUERY_CREDENTIALS_PATH")
15-
86+
1687
# Validate required parameters
1788
if not project_id:
1889
raise ValueError("Missing required parameter: project_id must be specified in server configuration")
19-
20-
if not credentials_path:
21-
raise ValueError(
22-
"Missing required parameter: credentials_path\n"
23-
"Set BIGQUERY_CREDENTIALS_PATH environment variable or specify credentials_path in server configuration"
24-
)
25-
26-
if not os.path.exists(credentials_path):
27-
raise ValueError(f"Credentials file not found: {credentials_path}")
28-
90+
2991
try:
3092
logger.info(f"Executing BigQuery query: {query[:100]}...")
31-
32-
# Create credentials and client
33-
credentials = service_account.Credentials.from_service_account_file(credentials_path)
93+
94+
# Load credentials using flexible authentication strategy
95+
credentials, cred_project_id = _load_credentials(credentials_path)
96+
97+
# Use project_id from server_info, fall back to credentials if not specified
98+
if not project_id and cred_project_id:
99+
project_id = cred_project_id
100+
logger.info(f"Using project_id from credentials: {project_id}")
101+
34102
client = bigquery.Client(project=project_id, credentials=credentials)
35-
103+
36104
# Execute query
37105
query_job = client.query(query)
38106
results = query_job.result()
39-
107+
40108
# Convert results to list of dictionaries
41109
rows = []
42110
for row in results:
43111
row_dict = {}
44112
for field in results.schema:
45113
field_name = field.name
46114
field_value = row[field_name]
47-
115+
48116
# Handle special BigQuery types
49117
if field_value is None:
50118
row_dict[field_name] = None
@@ -54,15 +122,15 @@ async def execute_bigquery_query(server_info: Dict[str, Any], query: str) -> Lis
54122
row_dict[field_name] = field_value
55123
else:
56124
row_dict[field_name] = str(field_value)
57-
125+
58126
rows.append(row_dict)
59-
127+
60128
logger.info(f"BigQuery query executed successfully, returned {len(rows)} rows")
61129
return rows
62-
130+
63131
except ImportError:
64132
logger.error("google-cloud-bigquery is not installed")
65133
raise ValueError("google-cloud-bigquery package is required for BigQuery connections")
66134
except Exception as e:
67135
logger.error(f"Failed to execute query on BigQuery: {str(e)}")
68-
raise
136+
raise

0 commit comments

Comments
 (0)