Skip to content

Commit b06b026

Browse files
authored
chore(ducklake): proper feature flag gating for DuckLake data-model copy (#42541)
1 parent f75590b commit b06b026

File tree

7 files changed

+203
-20
lines changed

7 files changed

+203
-20
lines changed

posthog/ducklake/README.md

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,13 @@ For local dev the defaults are:
2222
- `DUCKLAKE_S3_ACCESS_KEY=object_storage_root_user`
2323
- `DUCKLAKE_S3_SECRET_KEY=object_storage_root_password`
2424

25+
## Feature flag gating
26+
27+
The modeling workflow launches the DuckLake copy child only when the
28+
`ducklake-data-modeling-copy-workflow` feature flag is enabled for the team (as evaluated
29+
via `feature_enabled`). Create or update that flag locally to target the team you are testing
30+
with—otherwise the copy workflow will be skipped even if the rest of the configuration is correct.
31+
2532
## Target bucket layout
2633

2734
Every model copy is written to a deterministic prefix inside the DuckLake data bucket. Each workflow
@@ -50,12 +57,7 @@ For AWS S3, grant the worker role at least `s3:ListBucket`, `s3:GetObject`, `s3:
5057
Follow this checklist to exercise the DuckLake copy workflow on a local checkout without needing extra tribal knowledge:
5158

5259
1. **Start the dev stack**
53-
Run `hogli start` (or `bin/start`) so Postgres, MinIO, Temporal, and all DuckLake defaults are up.
54-
55-
```bash
56-
export DUCKLAKE_DATA_MODELING_COPY_WORKFLOW_ENABLED=true
57-
hogli start
58-
```
60+
Run `hogli start` (or `bin/start`) so Postgres, MinIO, Temporal, and all DuckLake defaults are up. Make sure the `ducklake-data-modeling-copy-workflow` feature flag is enabled for the team you plan to use.
5961

6062
2. **Trigger a model materialization from the app**
6163
In the PostHog UI, open Data Warehouse → Views, pick (or create) a view, open the Materialization section, enable it if needed, and click **Sync now**. This schedules the `data-modeling-run` workflow for that team/view.

posthog/settings/base_variables.py

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -35,14 +35,6 @@
3535
"⚠️ WARNING! Environment variable E2E_TESTING is enabled. This is a security vulnerability unless you are running tests."
3636
)
3737

38-
ducklake_data_modeling_env = get_from_env(
39-
"DUCKLAKE_DATA_MODELING_COPY_WORKFLOW_ENABLED", optional=True, type_cast=str_to_bool
40-
)
41-
if ducklake_data_modeling_env is None:
42-
ducklake_data_modeling_env = get_from_env("DUCKLAKE_COPY_WORKFLOW_ENABLED", False, type_cast=str_to_bool)
43-
44-
DUCKLAKE_DATA_MODELING_COPY_WORKFLOW_ENABLED: bool = ducklake_data_modeling_env
45-
4638
IS_COLLECT_STATIC = len(sys.argv) > 1 and sys.argv[1] == "collectstatic"
4739
SERVER_GATEWAY_INTERFACE = get_from_env("SERVER_GATEWAY_INTERFACE", "WSGI", type_cast=str)
4840

posthog/temporal/data_modeling/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from posthog.temporal.data_modeling.ducklake_copy_workflow import (
22
DuckLakeCopyDataModelingWorkflow,
33
copy_data_modeling_model_to_ducklake_activity,
4+
ducklake_copy_workflow_gate_activity,
45
prepare_data_modeling_ducklake_metadata_activity,
56
)
67
from posthog.temporal.data_modeling.run_workflow import (
@@ -26,5 +27,6 @@
2627
create_job_model_activity,
2728
cleanup_running_jobs_activity,
2829
prepare_data_modeling_ducklake_metadata_activity,
30+
ducklake_copy_workflow_gate_activity,
2931
copy_data_modeling_model_to_ducklake_activity,
3032
]

posthog/temporal/data_modeling/ducklake_copy_workflow.py

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from django.conf import settings
88

99
import duckdb
10+
import posthoganalytics
1011
from structlog.contextvars import bind_contextvars
1112
from temporalio import activity, workflow
1213
from temporalio.common import RetryPolicy
@@ -18,6 +19,8 @@
1819
get_config,
1920
normalize_endpoint,
2021
)
22+
from posthog.exceptions_capture import capture_exception
23+
from posthog.models import Team
2124
from posthog.sync import database_sync_to_async
2225
from posthog.temporal.common.base import PostHogWorkflow
2326
from posthog.temporal.common.logger import get_logger
@@ -49,6 +52,39 @@ class DuckLakeCopyActivityInputs:
4952
model: DuckLakeCopyModelMetadata
5053

5154

55+
@dataclasses.dataclass
56+
class DuckLakeCopyWorkflowGateInputs:
57+
team_id: int
58+
59+
60+
@activity.defn
61+
async def ducklake_copy_workflow_gate_activity(inputs: DuckLakeCopyWorkflowGateInputs) -> bool:
62+
"""Evaluate whether the DuckLake copy workflow should run for a team."""
63+
bind_contextvars(team_id=inputs.team_id)
64+
logger = LOGGER.bind()
65+
66+
try:
67+
team = await database_sync_to_async(Team.objects.only("uuid", "organization_id").get)(id=inputs.team_id)
68+
except Team.DoesNotExist:
69+
await logger.aerror("Team does not exist when evaluating DuckLake copy workflow gate")
70+
return False
71+
72+
try:
73+
return posthoganalytics.feature_enabled(
74+
"ducklake-data-modeling-copy-workflow",
75+
str(team.uuid),
76+
groups={"organization": str(team.organization_id)},
77+
only_evaluate_locally=True,
78+
)
79+
except Exception as error:
80+
await logger.awarning(
81+
"Failed to evaluate DuckLake copy workflow feature flag",
82+
error=str(error),
83+
)
84+
capture_exception(error)
85+
return False
86+
87+
5288
@activity.defn
5389
async def prepare_data_modeling_ducklake_metadata_activity(
5490
inputs: DataModelingDuckLakeCopyInputs,
@@ -137,6 +173,20 @@ async def run(self, inputs: DataModelingDuckLakeCopyInputs) -> None:
137173
workflow.logger.info("No models to copy - exiting early", **inputs.properties_to_log)
138174
return
139175

176+
should_copy = await workflow.execute_activity(
177+
ducklake_copy_workflow_gate_activity,
178+
DuckLakeCopyWorkflowGateInputs(team_id=inputs.team_id),
179+
start_to_close_timeout=dt.timedelta(seconds=30),
180+
retry_policy=RetryPolicy(maximum_attempts=1),
181+
)
182+
183+
if not should_copy:
184+
workflow.logger.info(
185+
"DuckLake copy workflow disabled by feature flag",
186+
**inputs.properties_to_log,
187+
)
188+
return
189+
140190
metadata = await workflow.execute_activity(
141191
prepare_data_modeling_ducklake_metadata_activity,
142192
inputs,

posthog/temporal/data_modeling/run_workflow.py

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1565,11 +1565,7 @@ async def run(self, inputs: RunWorkflowInputs) -> Results:
15651565
),
15661566
)
15671567

1568-
if (
1569-
settings.DUCKLAKE_DATA_MODELING_COPY_WORKFLOW_ENABLED
1570-
and self.ducklake_copy_inputs
1571-
and self.ducklake_copy_inputs.models
1572-
):
1568+
if self.ducklake_copy_inputs and self.ducklake_copy_inputs.models:
15731569
temporalio.workflow.logger.info(
15741570
"Triggering DuckLake copy child workflow",
15751571
job_id=job_id,

posthog/temporal/tests/data_modeling/test_ducklake_copy_workflow.py

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,14 @@
11
import uuid
2+
import datetime as dt
23

34
import pytest
45

56
from django.test import override_settings
67

8+
import temporalio.worker
9+
from temporalio import activity as temporal_activity
10+
from temporalio.testing import WorkflowEnvironment
11+
712
from posthog.temporal.data_modeling import ducklake_copy_workflow as ducklake_module
813
from posthog.temporal.data_modeling.ducklake_copy_workflow import (
914
DuckLakeCopyActivityInputs,
@@ -126,3 +131,139 @@ def fake_ensure_bucket(config):
126131
assert "read_parquet('s3://source/table/**/*.parquet')" in table_calls[0]
127132
assert any("ATTACH" in statement for statement in fake_conn.sql_statements), "Expected DuckLake catalog attach"
128133
assert fake_conn.closed is True
134+
135+
136+
@pytest.mark.asyncio
137+
@pytest.mark.django_db
138+
async def test_ducklake_copy_workflow_skips_when_feature_flag_disabled(monkeypatch, ateam):
139+
call_counts = {"metadata": 0, "copy": 0}
140+
141+
@temporal_activity.defn
142+
async def metadata_stub(inputs: DataModelingDuckLakeCopyInputs):
143+
call_counts["metadata"] += 1
144+
return [
145+
DuckLakeCopyModelMetadata(
146+
model_label="model",
147+
saved_query_id=str(uuid.uuid4()),
148+
saved_query_name="model",
149+
normalized_name="model",
150+
source_glob_uri="s3://source/table/**/*.parquet",
151+
schema_name="data_modeling_team_1",
152+
table_name="model",
153+
)
154+
]
155+
156+
@temporal_activity.defn
157+
async def copy_stub(inputs: DuckLakeCopyActivityInputs):
158+
call_counts["copy"] += 1
159+
160+
monkeypatch.setattr(
161+
"posthog.temporal.data_modeling.ducklake_copy_workflow.posthoganalytics.feature_enabled",
162+
lambda *args, **kwargs: False,
163+
)
164+
monkeypatch.setattr(ducklake_module, "prepare_data_modeling_ducklake_metadata_activity", metadata_stub)
165+
monkeypatch.setattr(ducklake_module, "copy_data_modeling_model_to_ducklake_activity", copy_stub)
166+
167+
inputs = DataModelingDuckLakeCopyInputs(
168+
team_id=ateam.pk,
169+
job_id="job",
170+
models=[
171+
DuckLakeCopyModelInput(
172+
model_label="model",
173+
saved_query_id=str(uuid.uuid4()),
174+
table_uri="s3://source/table",
175+
file_uris=["s3://source/table/part-0.parquet"],
176+
)
177+
],
178+
)
179+
180+
async with await WorkflowEnvironment.start_time_skipping() as env:
181+
async with temporalio.worker.Worker(
182+
env.client,
183+
task_queue="ducklake-test",
184+
workflows=[ducklake_module.DuckLakeCopyDataModelingWorkflow],
185+
activities=[
186+
ducklake_module.ducklake_copy_workflow_gate_activity,
187+
ducklake_module.prepare_data_modeling_ducklake_metadata_activity,
188+
ducklake_module.copy_data_modeling_model_to_ducklake_activity,
189+
],
190+
workflow_runner=temporalio.worker.UnsandboxedWorkflowRunner(),
191+
):
192+
await env.client.execute_workflow(
193+
ducklake_module.DuckLakeCopyDataModelingWorkflow.run,
194+
inputs,
195+
id=str(uuid.uuid4()),
196+
task_queue="ducklake-test",
197+
execution_timeout=dt.timedelta(seconds=30),
198+
)
199+
200+
assert call_counts["metadata"] == 0
201+
assert call_counts["copy"] == 0
202+
203+
204+
@pytest.mark.asyncio
205+
@pytest.mark.django_db
206+
async def test_ducklake_copy_workflow_runs_when_feature_flag_enabled(monkeypatch, ateam):
207+
call_counts = {"metadata": 0, "copy": 0}
208+
209+
@temporal_activity.defn
210+
async def metadata_stub(inputs: DataModelingDuckLakeCopyInputs):
211+
call_counts["metadata"] += 1
212+
return [
213+
DuckLakeCopyModelMetadata(
214+
model_label="model",
215+
saved_query_id=str(uuid.uuid4()),
216+
saved_query_name="model",
217+
normalized_name="model",
218+
source_glob_uri="s3://source/table/**/*.parquet",
219+
schema_name="data_modeling_team_1",
220+
table_name="model",
221+
)
222+
]
223+
224+
@temporal_activity.defn
225+
async def copy_stub(inputs: DuckLakeCopyActivityInputs):
226+
call_counts["copy"] += 1
227+
228+
monkeypatch.setattr(
229+
"posthog.temporal.data_modeling.ducklake_copy_workflow.posthoganalytics.feature_enabled",
230+
lambda *args, **kwargs: True,
231+
)
232+
monkeypatch.setattr(ducklake_module, "prepare_data_modeling_ducklake_metadata_activity", metadata_stub)
233+
monkeypatch.setattr(ducklake_module, "copy_data_modeling_model_to_ducklake_activity", copy_stub)
234+
235+
inputs = DataModelingDuckLakeCopyInputs(
236+
team_id=ateam.pk,
237+
job_id="job",
238+
models=[
239+
DuckLakeCopyModelInput(
240+
model_label="model",
241+
saved_query_id=str(uuid.uuid4()),
242+
table_uri="s3://source/table",
243+
file_uris=["s3://source/table/part-0.parquet"],
244+
)
245+
],
246+
)
247+
248+
async with await WorkflowEnvironment.start_time_skipping() as env:
249+
async with temporalio.worker.Worker(
250+
env.client,
251+
task_queue="ducklake-test",
252+
workflows=[ducklake_module.DuckLakeCopyDataModelingWorkflow],
253+
activities=[
254+
ducklake_module.ducklake_copy_workflow_gate_activity,
255+
ducklake_module.prepare_data_modeling_ducklake_metadata_activity,
256+
ducklake_module.copy_data_modeling_model_to_ducklake_activity,
257+
],
258+
workflow_runner=temporalio.worker.UnsandboxedWorkflowRunner(),
259+
):
260+
await env.client.execute_workflow(
261+
ducklake_module.DuckLakeCopyDataModelingWorkflow.run,
262+
inputs,
263+
id=str(uuid.uuid4()),
264+
task_queue="ducklake-test",
265+
execution_timeout=dt.timedelta(seconds=30),
266+
)
267+
268+
assert call_counts["metadata"] == 1
269+
assert call_counts["copy"] == 1

posthog/temporal/tests/data_modeling/test_run_workflow.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1031,7 +1031,7 @@ async def fail_jobs_stub(inputs):
10311031
monkeypatch.setattr(run_workflow_module, "finish_run_activity", finish_run_stub)
10321032
monkeypatch.setattr(run_workflow_module, "fail_jobs_activity", fail_jobs_stub)
10331033

1034-
with override_settings(DUCKLAKE_DATA_MODELING_COPY_WORKFLOW_ENABLED=True, DATA_MODELING_TASK_QUEUE="ducklake-test"):
1034+
with override_settings(DATA_MODELING_TASK_QUEUE="ducklake-test"):
10351035
child_ducklake_workflow_runs.clear()
10361036
async with await WorkflowEnvironment.start_time_skipping() as env:
10371037
async with temporalio.worker.Worker(

0 commit comments

Comments
 (0)