Skip to content

Commit aefbfb3

Browse files
authored
feat(array): split out snapshot creation from task processing (#42488)
1 parent 6fec5f1 commit aefbfb3

37 files changed

+1358
-1279
lines changed

products/tasks/backend/models.py

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -245,19 +245,8 @@ def get_artifact_s3_prefix(self) -> str:
245245

246246
def append_log(self, entries: list[dict]):
247247
"""Append log entries to S3 storage."""
248-
249-
existing_content = ""
250-
is_new_file = False
251-
try:
252-
existing_content = object_storage.read(self.log_url) or ""
253-
except Exception as e:
254-
is_new_file = True
255-
logger.debug(
256-
"task_run.no_existing_logs",
257-
task_run_id=str(self.id),
258-
log_url=self.log_url,
259-
error=str(e),
260-
)
248+
existing_content = object_storage.read(self.log_url, missing_ok=True) or ""
249+
is_new_file = not existing_content
261250

262251
new_lines = "\n".join(json.dumps(entry) for entry in entries)
263252
content = existing_content + ("\n" if existing_content else "") + new_lines
Lines changed: 20 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,38 @@
1+
from .create_snapshot.activities import (
2+
cleanup_sandbox as snapshot_cleanup_sandbox,
3+
clone_repository as snapshot_clone_repository,
4+
create_sandbox as snapshot_create_sandbox,
5+
create_snapshot as snapshot_create_snapshot,
6+
get_snapshot_context,
7+
setup_repository as snapshot_setup_repository,
8+
)
9+
from .create_snapshot.workflow import CreateSnapshotForRepositoryWorkflow
110
from .process_task.activities import (
2-
check_snapshot_exists_for_repository,
3-
cleanup_personal_api_key,
411
cleanup_sandbox,
5-
clone_repository,
6-
create_sandbox_from_snapshot,
7-
create_snapshot,
812
execute_task_in_sandbox,
9-
get_sandbox_for_setup,
13+
get_sandbox_for_repository,
1014
get_task_processing_context,
11-
setup_repository,
1215
track_workflow_event,
1316
)
1417
from .process_task.workflow import ProcessTaskWorkflow
1518

1619
WORKFLOWS = [
1720
ProcessTaskWorkflow,
21+
CreateSnapshotForRepositoryWorkflow,
1822
]
1923

2024
ACTIVITIES = [
25+
# process_task activities
2126
get_task_processing_context,
22-
check_snapshot_exists_for_repository,
23-
get_sandbox_for_setup,
24-
clone_repository,
25-
setup_repository,
26-
create_snapshot,
27-
create_sandbox_from_snapshot,
27+
get_sandbox_for_repository,
2828
execute_task_in_sandbox,
29-
cleanup_personal_api_key,
3029
cleanup_sandbox,
3130
track_workflow_event,
31+
# create_snapshot activities
32+
get_snapshot_context,
33+
snapshot_create_sandbox,
34+
snapshot_clone_repository,
35+
snapshot_setup_repository,
36+
snapshot_create_snapshot,
37+
snapshot_cleanup_sandbox,
3238
]

products/tasks/backend/temporal/conftest.py

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,13 @@
55

66
from temporalio.testing import ActivityEnvironment
77

8-
from posthog.models import Integration, Organization, OrganizationMembership, Team, User
8+
from posthog.models import Integration, OAuthApplication, Organization, OrganizationMembership, Team, User
99
from posthog.temporal.common.logger import configure_logger
1010

1111
from products.tasks.backend.models import SandboxSnapshot, Task, TaskRun
1212
from products.tasks.backend.services.sandbox import Sandbox, SandboxConfig, SandboxTemplate
13+
from products.tasks.backend.temporal.create_snapshot.activities.get_snapshot_context import SnapshotContext
14+
from products.tasks.backend.temporal.oauth import ARRAY_APP_CLIENT_ID_DEV
1315
from products.tasks.backend.temporal.process_task.activities.get_task_processing_context import TaskProcessingContext
1416

1517

@@ -19,6 +21,22 @@ def activity_environment():
1921
return ActivityEnvironment()
2022

2123

24+
@pytest.fixture
25+
def array_oauth_app():
26+
"""Create the Array OAuth application for tests."""
27+
app, _ = OAuthApplication.objects.get_or_create(
28+
client_id=ARRAY_APP_CLIENT_ID_DEV,
29+
defaults={
30+
"name": "Array Test App",
31+
"client_type": OAuthApplication.CLIENT_PUBLIC,
32+
"authorization_grant_type": OAuthApplication.GRANT_AUTHORIZATION_CODE,
33+
"redirect_uris": "https://app.posthog.com/callback",
34+
"algorithm": "RS256",
35+
},
36+
)
37+
yield app
38+
39+
2240
@pytest.fixture
2341
def organization():
2442
"""A test organization."""
@@ -43,6 +61,12 @@ def team(organization):
4361
team.delete()
4462

4563

64+
@pytest.fixture
65+
def test_team(team):
66+
"""Alias for team fixture."""
67+
return team
68+
69+
4670
@pytest.fixture
4771
def user(team):
4872
user = User.objects.create(
@@ -75,7 +99,7 @@ def github_integration(team):
7599

76100

77101
@pytest.fixture
78-
def test_task(team, user, github_integration):
102+
def test_task(team, user, github_integration, array_oauth_app):
79103
"""Create a test task."""
80104

81105
task = Task.objects.create(
@@ -120,6 +144,16 @@ def task_context(test_task, test_task_run) -> TaskProcessingContext:
120144
)
121145

122146

147+
@pytest.fixture
148+
def snapshot_context(github_integration, team) -> SnapshotContext:
149+
"""Create a SnapshotContext for testing."""
150+
return SnapshotContext(
151+
github_integration_id=github_integration.id,
152+
repository="posthog/posthog-js",
153+
team_id=team.id,
154+
)
155+
156+
123157
@pytest.fixture(autouse=True)
124158
def configure_logger_auto() -> None:
125159
"""Configure logger when running in a Temporal activity environment."""
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
from .activities import (
2+
CleanupSandboxInput,
3+
CloneRepositoryInput,
4+
CreateSandboxInput,
5+
CreateSandboxOutput,
6+
CreateSnapshotInput,
7+
GetSnapshotContextInput,
8+
SetupRepositoryInput,
9+
SnapshotContext,
10+
cleanup_sandbox,
11+
clone_repository,
12+
create_sandbox,
13+
create_snapshot,
14+
get_snapshot_context,
15+
setup_repository,
16+
)
17+
from .workflow import (
18+
CreateSnapshotForRepositoryInput,
19+
CreateSnapshotForRepositoryOutput,
20+
CreateSnapshotForRepositoryWorkflow,
21+
)
22+
23+
__all__ = [
24+
"CleanupSandboxInput",
25+
"CloneRepositoryInput",
26+
"CreateSandboxInput",
27+
"CreateSandboxOutput",
28+
"CreateSnapshotForRepositoryInput",
29+
"CreateSnapshotForRepositoryOutput",
30+
"CreateSnapshotForRepositoryWorkflow",
31+
"CreateSnapshotInput",
32+
"GetSnapshotContextInput",
33+
"SetupRepositoryInput",
34+
"SnapshotContext",
35+
"cleanup_sandbox",
36+
"clone_repository",
37+
"create_sandbox",
38+
"create_snapshot",
39+
"get_snapshot_context",
40+
"setup_repository",
41+
]
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
from .cleanup_sandbox import CleanupSandboxInput, cleanup_sandbox
2+
from .clone_repository import CloneRepositoryInput, clone_repository
3+
from .create_sandbox import CreateSandboxInput, CreateSandboxOutput, create_sandbox
4+
from .create_snapshot import CreateSnapshotInput, create_snapshot
5+
from .get_snapshot_context import GetSnapshotContextInput, SnapshotContext, get_snapshot_context
6+
from .setup_repository import SetupRepositoryInput, setup_repository
7+
8+
__all__ = [
9+
"CleanupSandboxInput",
10+
"CloneRepositoryInput",
11+
"CreateSandboxInput",
12+
"CreateSandboxOutput",
13+
"CreateSnapshotInput",
14+
"GetSnapshotContextInput",
15+
"SetupRepositoryInput",
16+
"SnapshotContext",
17+
"cleanup_sandbox",
18+
"clone_repository",
19+
"create_sandbox",
20+
"create_snapshot",
21+
"get_snapshot_context",
22+
"setup_repository",
23+
]
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
import logging
2+
from dataclasses import dataclass
3+
4+
from temporalio import activity
5+
6+
from posthog.temporal.common.utils import asyncify
7+
8+
from products.tasks.backend.services.sandbox import Sandbox
9+
from products.tasks.backend.temporal.observability import log_activity_execution
10+
11+
logger = logging.getLogger(__name__)
12+
13+
14+
@dataclass
15+
class CleanupSandboxInput:
16+
sandbox_id: str
17+
18+
19+
@activity.defn
20+
@asyncify
21+
def cleanup_sandbox(input: CleanupSandboxInput) -> None:
22+
with log_activity_execution(
23+
"cleanup_sandbox",
24+
sandbox_id=input.sandbox_id,
25+
):
26+
try:
27+
sandbox = Sandbox.get_by_id(input.sandbox_id)
28+
sandbox.destroy()
29+
except Exception:
30+
# The sandbox has a timeout, and it will eventually terminate if we failed to cleanup
31+
pass

products/tasks/backend/temporal/process_task/activities/clone_repository.py renamed to products/tasks/backend/temporal/create_snapshot/activities/clone_repository.py

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,37 +6,34 @@
66

77
from products.tasks.backend.services.sandbox import Sandbox
88
from products.tasks.backend.temporal.exceptions import GitHubAuthenticationError, RepositoryCloneError
9-
from products.tasks.backend.temporal.observability import emit_agent_log, log_activity_execution
9+
from products.tasks.backend.temporal.observability import log_activity_execution
1010

1111
from ..utils import get_github_token
12-
from .get_task_processing_context import TaskProcessingContext
12+
from .get_snapshot_context import SnapshotContext
1313

1414

1515
@dataclass
1616
class CloneRepositoryInput:
17-
context: TaskProcessingContext
17+
context: SnapshotContext
1818
sandbox_id: str
1919

2020

2121
@activity.defn
2222
@asyncify
2323
def clone_repository(input: CloneRepositoryInput) -> str:
24-
"""Clone repository into sandbox. Idempotent: wipes existing directory. Returns clone logs."""
2524
ctx = input.context
2625

2726
with log_activity_execution(
2827
"clone_repository",
2928
sandbox_id=input.sandbox_id,
3029
**ctx.to_log_context(),
3130
):
32-
emit_agent_log(ctx.run_id, "info", f"Cloning repository {ctx.repository}")
33-
3431
try:
3532
github_token = get_github_token(ctx.github_integration_id)
3633
except Exception as e:
3734
raise GitHubAuthenticationError(
3835
f"Failed to get GitHub token for integration {ctx.github_integration_id}",
39-
{"github_integration_id": ctx.github_integration_id, "task_id": ctx.task_id, "error": str(e)},
36+
{"github_integration_id": ctx.github_integration_id, "error": str(e)},
4037
cause=e,
4138
)
4239

@@ -50,7 +47,6 @@ def clone_repository(input: CloneRepositoryInput) -> str:
5047
{
5148
"repository": ctx.repository,
5249
"sandbox_id": input.sandbox_id,
53-
"task_id": ctx.task_id,
5450
"error": str(e),
5551
},
5652
cause=e,
@@ -63,10 +59,8 @@ def clone_repository(input: CloneRepositoryInput) -> str:
6359
"repository": ctx.repository,
6460
"exit_code": result.exit_code,
6561
"stderr": result.stderr[:500],
66-
"task_id": ctx.task_id,
6762
},
6863
cause=RuntimeError(f"Git clone exited with code {result.exit_code}: {result.stderr[:200]}"),
6964
)
7065

71-
# NOTE: git clone returns it's output in stderr
7266
return result.stderr
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
from dataclasses import dataclass
2+
3+
from temporalio import activity
4+
5+
from posthog.temporal.common.utils import asyncify
6+
7+
from products.tasks.backend.services.sandbox import Sandbox, SandboxConfig, SandboxTemplate
8+
from products.tasks.backend.temporal.observability import log_activity_execution
9+
10+
from ..utils import get_sandbox_name_for_snapshot
11+
from .get_snapshot_context import SnapshotContext
12+
13+
14+
@dataclass
15+
class CreateSandboxInput:
16+
context: SnapshotContext
17+
18+
19+
@dataclass
20+
class CreateSandboxOutput:
21+
sandbox_id: str
22+
23+
24+
@activity.defn
25+
@asyncify
26+
def create_sandbox(input: CreateSandboxInput) -> CreateSandboxOutput:
27+
ctx = input.context
28+
29+
with log_activity_execution(
30+
"create_sandbox",
31+
**ctx.to_log_context(),
32+
):
33+
config = SandboxConfig(
34+
name=get_sandbox_name_for_snapshot(ctx.github_integration_id, ctx.repository),
35+
template=SandboxTemplate.DEFAULT_BASE,
36+
environment_variables={},
37+
snapshot_id=None,
38+
metadata={"purpose": "snapshot_creation"},
39+
)
40+
41+
sandbox = Sandbox.create(config)
42+
43+
activity.logger.info(f"Created sandbox {sandbox.id} for snapshot creation")
44+
45+
return CreateSandboxOutput(sandbox_id=sandbox.id)

0 commit comments

Comments
 (0)