Skip to content

Commit 783a24e

Browse files
committed
feat(connectors): restore multi-service sync infrastructure
1 parent 15d59cc commit 783a24e

File tree

17 files changed

+1131
-1
lines changed

17 files changed

+1131
-1
lines changed

agent_pm/connectors/__init__.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
"""Connector package exports."""
2+
3+
from .base import Connector
4+
from .calendar import CalendarConnector
5+
from .email import EmailConnector
6+
from .github import GitHubConnector
7+
from .google_drive import GoogleDriveConnector
8+
from .notion import NotionConnector
9+
from .slack import SlackConnector
10+
11+
__all__ = [
12+
"Connector",
13+
"CalendarConnector",
14+
"EmailConnector",
15+
"GitHubConnector",
16+
"GoogleDriveConnector",
17+
"NotionConnector",
18+
"SlackConnector",
19+
]

agent_pm/connectors/base.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
"""Base connector protocol."""
2+
3+
from __future__ import annotations
4+
5+
from abc import ABC, abstractmethod
6+
from datetime import datetime
7+
from typing import Any
8+
9+
10+
class Connector(ABC):
11+
"""Abstract connector interface used by the sync manager."""
12+
13+
name: str
14+
15+
def __init__(self, *, name: str) -> None:
16+
self.name = name
17+
18+
@property
19+
@abstractmethod
20+
def enabled(self) -> bool:
21+
"""Return True if the connector has the credentials it needs."""
22+
23+
@abstractmethod
24+
async def sync(self, *, since: datetime | None = None) -> list[dict[str, Any]]:
25+
"""Fetch data updated since the provided timestamp."""
26+
27+
def format_metadata(self, **kwargs: Any) -> dict[str, Any]:
28+
data = {"connector": self.name}
29+
data.update(kwargs)
30+
return data

agent_pm/connectors/calendar.py

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
"""Google Calendar connector with dry-run support."""
2+
3+
from __future__ import annotations
4+
5+
import asyncio
6+
import json
7+
import logging
8+
from datetime import UTC, datetime, timedelta
9+
from pathlib import Path
10+
from typing import Any
11+
12+
import httpx
13+
14+
from agent_pm.connectors.base import Connector
15+
from agent_pm.settings import settings
16+
17+
logger = logging.getLogger(__name__)
18+
19+
20+
class CalendarConnector(Connector):
21+
def __init__(self) -> None:
22+
super().__init__(name="calendar")
23+
self._scopes = settings.google_calendar_scopes
24+
self._delegated_user = settings.google_calendar_delegated_user
25+
26+
@property
27+
def enabled(self) -> bool:
28+
json_creds = settings.google_service_account_json
29+
file_creds = settings.google_service_account_file
30+
return bool(settings.calendar_id and (json_creds or (file_creds and Path(file_creds).exists())))
31+
32+
async def sync(self, *, since: datetime | None = None) -> list[dict[str, Any]]:
33+
calendar_id = settings.calendar_id
34+
window_days = settings.calendar_sync_window_days
35+
if settings.dry_run or not self.enabled:
36+
return [
37+
{
38+
"dry_run": True,
39+
"calendar_id": calendar_id,
40+
"since": since.isoformat() if since else None,
41+
"window_days": window_days,
42+
}
43+
]
44+
45+
token = await self._get_token()
46+
headers = {"Authorization": f"Bearer {token}"}
47+
lower_bound = datetime.now(tz=UTC) - timedelta(days=window_days)
48+
time_min = (since if since and since > lower_bound else lower_bound).isoformat()
49+
time_max = (datetime.now(tz=UTC) + timedelta(days=window_days)).isoformat()
50+
params = {"timeMin": time_min, "timeMax": time_max, "singleEvents": "true", "orderBy": "startTime"}
51+
async with httpx.AsyncClient() as client:
52+
response = await client.get(
53+
f"https://www.googleapis.com/calendar/v3/calendars/{calendar_id}/events",
54+
headers=headers,
55+
params=params,
56+
timeout=30,
57+
)
58+
response.raise_for_status()
59+
return [response.json()]
60+
61+
async def _get_token(self) -> str:
62+
creds = await asyncio.to_thread(self._load_credentials)
63+
if not creds:
64+
raise RuntimeError("Google Calendar credentials missing")
65+
if not creds.valid or creds.expired:
66+
await asyncio.to_thread(creds.refresh, self._request())
67+
if not creds.token:
68+
raise RuntimeError("Failed to obtain calendar token")
69+
return creds.token
70+
71+
def _load_credentials(self):
72+
info: dict[str, Any] | None = None
73+
if settings.google_service_account_json:
74+
try:
75+
info = json.loads(settings.google_service_account_json)
76+
except json.JSONDecodeError as exc: # pragma: no cover - defensive
77+
logger.error("Invalid GOOGLE_SERVICE_ACCOUNT_JSON: %s", exc)
78+
elif settings.google_service_account_file:
79+
try:
80+
info = json.loads(Path(settings.google_service_account_file).read_text(encoding="utf-8"))
81+
except FileNotFoundError:
82+
logger.error("GOOGLE_SERVICE_ACCOUNT_FILE not found: %s", settings.google_service_account_file)
83+
except json.JSONDecodeError as exc: # pragma: no cover - defensive
84+
logger.error("Invalid GOOGLE_SERVICE_ACCOUNT_FILE: %s", exc)
85+
if not info:
86+
return None
87+
88+
try:
89+
from google.oauth2 import service_account
90+
except ImportError as exc: # pragma: no cover - optional dependency
91+
raise RuntimeError("google-auth not installed") from exc
92+
93+
credentials = service_account.Credentials.from_service_account_info(info, scopes=self._scopes)
94+
if self._delegated_user:
95+
credentials = credentials.with_subject(self._delegated_user)
96+
return credentials
97+
98+
@staticmethod
99+
def _request():
100+
try:
101+
from google.auth.transport.requests import Request
102+
except ImportError as exc: # pragma: no cover - optional dependency
103+
raise RuntimeError("google-auth not installed") from exc
104+
return Request()

agent_pm/connectors/email.py

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
"""Gmail connector returning structured message payloads."""
2+
3+
from __future__ import annotations
4+
5+
import asyncio
6+
import json
7+
import logging
8+
from datetime import UTC, datetime
9+
from pathlib import Path
10+
from typing import Any
11+
12+
import httpx
13+
14+
from agent_pm.connectors.base import Connector
15+
from agent_pm.settings import settings
16+
17+
logger = logging.getLogger(__name__)
18+
19+
20+
class EmailConnector(Connector):
21+
def __init__(self) -> None:
22+
super().__init__(name="gmail")
23+
self._delegated_user = settings.gmail_delegated_user
24+
self._labels = settings.gmail_label_filter
25+
self._scopes = settings.gmail_scopes
26+
27+
@property
28+
def enabled(self) -> bool:
29+
json_creds = settings.gmail_service_account_json
30+
file_creds = settings.gmail_service_account_file
31+
return bool(json_creds or (file_creds and Path(file_creds).exists()))
32+
33+
def _build_query(self, since: datetime | None) -> str:
34+
clauses: list[str] = []
35+
if since:
36+
clauses.append(f"after:{since.strftime('%Y/%m/%d')}")
37+
if self._labels:
38+
clauses.append(" OR ".join(f"label:{label}" for label in self._labels))
39+
return " ".join(filter(None, clauses))
40+
41+
async def sync(self, *, since: datetime | None = None) -> list[dict[str, Any]]:
42+
query = self._build_query(since)
43+
if settings.dry_run or not self.enabled:
44+
return [
45+
{
46+
"dry_run": True,
47+
"query": query,
48+
"labels": self._labels,
49+
"delegated_user": self._delegated_user,
50+
}
51+
]
52+
53+
token = await self._get_token()
54+
headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
55+
params = {"q": query} if query else {}
56+
async with httpx.AsyncClient() as client:
57+
response = await client.get(
58+
"https://gmail.googleapis.com/gmail/v1/users/me/messages",
59+
headers=headers,
60+
params=params,
61+
timeout=30,
62+
)
63+
response.raise_for_status()
64+
message_list = response.json()
65+
66+
ids = [item.get("id") for item in message_list.get("messages", []) if item.get("id")]
67+
if not ids:
68+
return [message_list]
69+
70+
messages: list[dict[str, Any]] = []
71+
async with httpx.AsyncClient() as client:
72+
for message_id in ids:
73+
detail = await client.get(
74+
f"https://gmail.googleapis.com/gmail/v1/users/me/messages/{message_id}",
75+
headers=headers,
76+
timeout=30,
77+
)
78+
detail.raise_for_status()
79+
data = detail.json()
80+
payload = data.get("payload", {})
81+
headers_list = payload.get("headers", [])
82+
header_map = {item.get("name"): item.get("value") for item in headers_list}
83+
messages.append(
84+
{
85+
"id": message_id,
86+
"thread_id": data.get("threadId"),
87+
"snippet": data.get("snippet"),
88+
"subject": header_map.get("Subject"),
89+
"from": header_map.get("From"),
90+
"to": header_map.get("To"),
91+
"date": header_map.get("Date"),
92+
"labels": data.get("labelIds", []),
93+
}
94+
)
95+
return [message_list, {"messages": messages}]
96+
97+
async def _get_token(self) -> str:
98+
creds = await asyncio.to_thread(self._load_credentials)
99+
if not creds:
100+
raise RuntimeError("Gmail credentials missing")
101+
if not creds.valid or creds.expired:
102+
await asyncio.to_thread(creds.refresh, self._request())
103+
if not creds.token:
104+
raise RuntimeError("Failed to refresh Gmail access token")
105+
return creds.token
106+
107+
def _load_credentials(self):
108+
info: dict[str, Any] | None = None
109+
if settings.gmail_service_account_json:
110+
try:
111+
info = json.loads(settings.gmail_service_account_json)
112+
except json.JSONDecodeError as exc: # pragma: no cover - defensive
113+
logger.error("Invalid GMAIL_SERVICE_ACCOUNT_JSON: %s", exc)
114+
elif settings.gmail_service_account_file:
115+
try:
116+
info = json.loads(Path(settings.gmail_service_account_file).read_text(encoding="utf-8"))
117+
except FileNotFoundError:
118+
logger.error("GMAIL_SERVICE_ACCOUNT_FILE not found: %s", settings.gmail_service_account_file)
119+
except json.JSONDecodeError as exc: # pragma: no cover - defensive
120+
logger.error("Invalid GMAIL_SERVICE_ACCOUNT_FILE: %s", exc)
121+
if not info:
122+
return None
123+
124+
try:
125+
from google.oauth2 import service_account
126+
except ImportError as exc: # pragma: no cover - optional dependency
127+
raise RuntimeError("google-auth not installed") from exc
128+
129+
credentials = service_account.Credentials.from_service_account_info(info, scopes=self._scopes)
130+
if self._delegated_user:
131+
credentials = credentials.with_subject(self._delegated_user)
132+
return credentials
133+
134+
@staticmethod
135+
def _request():
136+
try:
137+
from google.auth.transport.requests import Request
138+
except ImportError as exc: # pragma: no cover - optional dependency
139+
raise RuntimeError("google-auth not installed") from exc
140+
return Request()

agent_pm/connectors/github.py

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
"""GitHub connector for synchronizing repositories, issues, and pull requests."""
2+
3+
from __future__ import annotations
4+
5+
from datetime import UTC, datetime
6+
from typing import Any
7+
8+
import httpx
9+
10+
from agent_pm.connectors.base import Connector
11+
from agent_pm.settings import settings
12+
13+
14+
class GitHubConnector(Connector):
15+
def __init__(self) -> None:
16+
super().__init__(name="github")
17+
self._token = settings.github_token
18+
self._repositories = settings.github_repositories
19+
self._base_url = "https://api.github.com"
20+
21+
@property
22+
def enabled(self) -> bool:
23+
return bool(self._token and self._repositories)
24+
25+
async def _fetch(self, path: str, params: dict[str, Any] | None = None) -> dict[str, Any]:
26+
if settings.dry_run or not self.enabled:
27+
return {"dry_run": True, "path": path, "params": params or {}, "repositories": self._repositories}
28+
29+
headers = {
30+
"Authorization": f"Bearer {self._token}",
31+
"Accept": "application/vnd.github+json",
32+
}
33+
async with httpx.AsyncClient() as client:
34+
response = await client.get(f"{self._base_url}{path}", headers=headers, params=params, timeout=30)
35+
response.raise_for_status()
36+
return response.json()
37+
38+
async def _sync_repo(self, repo: str, since: datetime | None) -> dict[str, Any]:
39+
params: dict[str, Any] = {}
40+
if since:
41+
params["since"] = since.astimezone(UTC).isoformat()
42+
43+
repo_data = await self._fetch(f"/repos/{repo}")
44+
issues = await self._fetch(f"/repos/{repo}/issues", params)
45+
pulls = await self._fetch(f"/repos/{repo}/pulls", params)
46+
return {
47+
"repository": repo,
48+
"repository_data": repo_data,
49+
"issues": issues,
50+
"pull_requests": pulls,
51+
}
52+
53+
async def sync(self, *, since: datetime | None = None) -> list[dict[str, Any]]:
54+
if not self._repositories:
55+
return []
56+
payloads = []
57+
for repo in self._repositories:
58+
payloads.append(await self._sync_repo(repo, since))
59+
return payloads

0 commit comments

Comments
 (0)