
Commit 8a2fb67

AE-956 [ads] Collect DAP results and add to BigQuery table (#474)
* Initial version of ads-attribution-dap-collector job
1 parent 7b86b2c commit 8a2fb67

File tree

22 files changed: +1682 −0 lines changed

Lines changed: 58 additions & 0 deletions
@@ -0,0 +1,58 @@
###
# This file was generated by docker-etl/ci_config.py.
# Changes should be made to job ci_job.yaml files and re-generated.
###

name: ads-attribution-dap-collector

on:
  push:
    branches:
      - main
    paths:
      - 'jobs/ads-attribution-dap-collector/**'
      - '.github/workflows/job-ads-attribution-dap-collector.yml'
  pull_request:
    paths:
      - 'jobs/ads-attribution-dap-collector/**'
      - '.github/workflows/job-ads-attribution-dap-collector.yml'

jobs:
  build-job-ads-attribution-dap-collector:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout code
        uses: actions/checkout@v6

      - name: Build the Docker image
        # yamllint disable
        run: |
          docker build jobs/ads-attribution-dap-collector -t us-docker.pkg.dev/moz-fx-data-artifacts-prod/docker-etl/ads-attribution-dap-collector:latest
        # yamllint enable
      - name: Test Code
        run: docker run us-docker.pkg.dev/moz-fx-data-artifacts-prod/docker-etl/ads-attribution-dap-collector:latest python3 -m pytest

  push-job-ads-attribution-dap-collector:
    runs-on: ubuntu-latest
    needs: build-job-ads-attribution-dap-collector
    if: github.ref == 'refs/heads/main'
    steps:
      - name: Checkout code
        uses: actions/checkout@v6

      - name: Authenticate to Google Cloud
        uses: google-github-actions/auth@v2
        with:
          credentials_json: ${{ secrets.GCP_CREDENTIALS }}

      - name: Set up Cloud SDK
        uses: google-github-actions/setup-gcloud@v2

      - name: Configure Docker for GCR
        run: gcloud auth configure-docker

      - name: Build Docker image
        run: docker build jobs/ads-attribution-dap-collector/ -t gcr.io/$${{ secrets.GCP_PROJECT }}/ads-attribution-dap-collector_docker_etl:latest

      - name: Push to GCR
        run: docker push gcr.io/$${{ secrets.GCP_PROJECT }}/ads-attribution-dap-collector_docker_etl:latest
Lines changed: 7 additions & 0 deletions
@@ -0,0 +1,7 @@
.ci_job.yaml
.ci_workflow.yaml
.DS_Store
*.pyc
.pytest_cache/
__pycache__/
venv/
Lines changed: 2 additions & 0 deletions
@@ -0,0 +1,2 @@
[flake8]
max-line-length = 88
Lines changed: 4 additions & 0 deletions
@@ -0,0 +1,4 @@
.DS_Store
*.pyc
__pycache__/
venv/
Lines changed: 42 additions & 0 deletions
@@ -0,0 +1,42 @@
FROM python:3.12
LABEL maintainer="Glenda Leonard <[email protected]>"
ARG HOME="/janus_build"
WORKDIR ${HOME}

RUN apt update && apt --yes install curl

RUN curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y
ENV PATH=$HOME/.cargo/bin:$PATH

# build the CLI tool
RUN git clone --depth 1 https://github.com/divviup/janus.git --branch '0.7.69'
RUN cd janus && cargo build -r -p janus_tools --bin collect

######### next stage

FROM python:3.12
LABEL maintainer="Glenda Leonard <[email protected]>"
# https://github.com/mozilla-services/Dockerflow/blob/master/docs/building-container.md
ARG USER_ID="10001"
ARG GROUP_ID="app"
ARG HOME="/app"
ENV HOME=${HOME}

RUN groupadd --gid ${USER_ID} ${GROUP_ID} && \
    useradd --create-home --uid ${USER_ID} --gid ${GROUP_ID} --home-dir ${HOME} ${GROUP_ID}

WORKDIR ${HOME}
COPY --from=0 /janus_build/janus/target/release/collect ./

RUN pip install --upgrade pip

COPY requirements.txt requirements.txt
RUN pip install -r requirements.txt

COPY . .

RUN pip install .

# Drop root and change ownership of the application folder to the user
RUN chown -R ${USER_ID}:${GROUP_ID} ${HOME}
USER ${USER_ID}
Lines changed: 83 additions & 0 deletions
@@ -0,0 +1,83 @@
# Ads Attribution DAP Collection Job

This job collects metrics from DAP (the Distributed Aggregation Protocol) and writes the results to BigQuery.

## Overview
This job is driven by a config file read from a GCS bucket. Use `job_config_gcp_project`
and `job_config_bucket` to specify where the file lives. The config file must be named
`attribution-conf.json`; a sample is available [here](https://github.com/mozilla-services/mars/tree/main/internal/gcp/storage/testdata/mars-attribution-config).

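For illustration, here is a minimal sketch of how that config might be fetched and parsed, assuming the `google-cloud-storage` client; the function name and exact loading code are assumptions, not necessarily what this job does internally:

```python
import json

from google.cloud import storage  # assumed dependency for this sketch


def load_attribution_config(job_config_gcp_project: str, job_config_bucket: str) -> dict:
    """Fetch attribution-conf.json from the configured GCS bucket and parse it."""
    client = storage.Client(project=job_config_gcp_project)
    blob = client.bucket(job_config_bucket).blob("attribution-conf.json")
    return json.loads(blob.download_as_text())
```
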
## Usage
11+
12+
This script is intended to be run in a docker container.
13+
Build the docker image with:
14+
15+
It requires setup of some environment variables that hold DAP credentials, and the job will look for those when it
16+
starts up. A dev script, `dev_run_docker.sh`, is included for convenience to build and run the job locally, and it
17+
also documents those variables.
18+
19+
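For illustration only, a sketch of how such credentials could be read at startup; the variable names below are hypothetical (`dev_run_docker.sh` documents the real ones):

```python
import os


def load_dap_credentials() -> tuple[str, str, str]:
    # Hypothetical variable names, for illustration only.
    return (
        os.environ["DAP_BEARER_TOKEN"],
        os.environ["DAP_HPKE_CONFIG"],
        os.environ["DAP_HPKE_PRIVATE_KEY"],
    )
```
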
Once the environment variables are set up, build and run the job with:

```sh
./dev_run_docker.sh
```
To just build the docker image, use:
```
docker build -t ads-attribution-dap-collector .
```

Sample `attribution-conf.json` file:
```json
{
  "collection_config": {
    "hpke_config": "hpke-config"
  },
  "advertisers": [
    {
      "name": "mozilla",
      "partner_id": "295beef7-1e3b-4128-b8f8-858e12aa1234",
      "start_date": "2026-01-08",
      "collector_duration": 604800,
      "conversion_type": "view",
      "lookback_window": 7
    }
  ],
  "partners": {
    "295beef7-1e3b-4128-b8f8-858e12aa1234": {
      "task_id": "<task_id>",
      "vdaf": "histogram",
      "bits": 0,
      "length": 40,
      "time_precision": 60,
      "default_measurement": 0
    }
  },
  "ads": {
    "provider:1234": {
      "partner_id": "295beef7-1e3b-4128-b8f8-858e12aa1234",
      "index": 1
    }
  }
}
```

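To show how the pieces of this config relate, here is a hedged sketch (not part of the job's code) that joins an advertiser entry with its partner's DAP task parameters; the helper name and returned keys are illustrative:

```python
from datetime import date


def resolve_collection(config: dict, advertiser_name: str) -> dict:
    """Join an advertiser entry with its partner's DAP task parameters (illustration only)."""
    advertiser = next(a for a in config["advertisers"] if a["name"] == advertiser_name)
    partner = config["partners"][advertiser["partner_id"]]
    return {
        "task_id": partner["task_id"],
        "vdaf_length": partner["length"],
        "collector_duration": advertiser["collector_duration"],
        "partner_start_date": date.fromisoformat(advertiser["start_date"]),
    }
```
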
## Testing

First create the job venv using:
```
python -m venv ./venv
source ./venv/bin/activate
pip install -r requirements.txt
```
Run tests from `/jobs/ads-attribution-dap-collector` using:
`python -m pytest`

## Linting and Formatting
```
black .
```
```
flake8 .
```

jobs/ads-attribution-dap-collector/ads_attribution_dap_collector/__init__.py

Whitespace-only changes.
Lines changed: 190 additions & 0 deletions
@@ -0,0 +1,190 @@
import ast
import logging
import subprocess
import re

from datetime import date, datetime, timedelta

DAP_LEADER = "https://dap-09-3.api.divviup.org"
VDAF = "histogram"
PROCESS_TIMEOUT = 1200  # 20 mins


def get_aggregated_results(
    process_date: date,
    batch_start: date,
    batch_end: date,
    task_id: str,
    vdaf_length: int,
    collector_duration: int,
    bearer_token: str,
    hpke_config: str,
    hpke_private_key: str,
) -> dict:
    process_batch = _should_collect_batch(process_date, batch_end)

    if process_batch:
        # Step 4 Collect DAP results.
        aggregated_results = collect_dap_result(
            task_id=task_id,
            vdaf_length=vdaf_length,
            batch_start=batch_start,
            duration=collector_duration,
            bearer_token=bearer_token,
            hpke_config=hpke_config,
            hpke_private_key=hpke_private_key,
        )

        return aggregated_results


def current_batch_start(
    process_date: date, partner_start_date: date, duration: int
) -> date | None:
    if process_date < partner_start_date:
        return None

    if (
        partner_start_date
        <= process_date
        < partner_start_date + timedelta(seconds=duration)
    ):
        return partner_start_date

    # After the first interval ...
    batch_start = partner_start_date
    while True:
        next_start = batch_start + timedelta(seconds=duration)
        # check if the process_date is the batch_end date
        # if yes we only need to go back 1 duration to get the start
        if next_start + timedelta(days=-1) == process_date:
            return next_start + timedelta(seconds=-duration)

        # this means the process date is in the next interval so
        # need to go back 2 durations to get the batch_start
        if next_start > process_date:
            return next_start + timedelta(seconds=-2 * duration)

        batch_start = next_start


def current_batch_end(batch_start: date, duration: int) -> date:
    # since the start and end dates are inclusive need to subtract 1 from duration
    return batch_start + timedelta(seconds=duration, days=-1)


def _should_collect_batch(process_date, batch_end) -> bool:
    return batch_end == process_date


def _correct_wraparound(num: int) -> int:
    field_prime = 340282366920938462946865773367900766209
    field_size = 128
    cutoff = 2 ** (field_size - 1)
    if num > cutoff:
        return num - field_prime
    return num


def _parse_histogram(histogram_str: str) -> dict:
    parsed_list = ast.literal_eval(histogram_str)
    return {i: _correct_wraparound(val) for i, val in enumerate(parsed_list)}


def _parse_http_error(text: str) -> tuple[int, str, str | None] | None:
    """
    Returns (status_code, status_text, error_message)
    or None if the pattern is not found.
    """
    ERROR_RE = re.compile(
        r"HTTP response status\s+(\d+)\s+([A-Za-z ]+)(?:\s+-\s+(.*))?$"
    )
    match = ERROR_RE.search(text)
    if not match:
        return None

    status_code = int(match.group(1))
    status_text = match.group(2).strip()
    error_message = match.group(3).strip() if match.group(3) else None
    return status_code, status_text, error_message


# DAP functions
def collect_dap_result(
    task_id: str,
    vdaf_length: int,
    batch_start: date,
    duration: int,
    bearer_token: str,
    hpke_config: str,
    hpke_private_key: str,
) -> dict:
    # Beware! This command string reveals secrets. Use logging only for
    # debugging in local dev.

    batch_start_epoch = int(
        datetime.combine(batch_start, datetime.min.time()).timestamp()
    )

    try:
        result = subprocess.run(
            [
                "./collect",
                "--task-id",
                task_id,
                "--leader",
                DAP_LEADER,
                "--vdaf",
                VDAF,
                "--length",
                f"{vdaf_length}",
                "--authorization-bearer-token",
                bearer_token,
                "--batch-interval-start",
                f"{batch_start_epoch}",
                "--batch-interval-duration",
                f"{duration}",
                "--hpke-config",
                hpke_config,
                "--hpke-private-key",
                hpke_private_key,
            ],
            capture_output=True,
            text=True,
            check=True,
            timeout=PROCESS_TIMEOUT,
        )
        for line in result.stdout.splitlines():
            if line.startswith("Aggregation result:"):
                entries = _parse_histogram(line[21:-1])
                return entries
    # Beware! Exceptions thrown by the subprocess reveal secrets.
    # Log them and include traceback only for debugging in local dev.
    except subprocess.CalledProcessError as e:
        result = _parse_http_error(e.stderr)
        if result is None:
            logging.error(e)
            raise Exception(
                f"Collection failed for {task_id}, {e.returncode}, stderr: {e.stderr}"
            ) from None
        else:
            status_code, status_text, error_message = result
            if status_code == 400:
                logging.info(
                    f"Collection failed for {task_id}, {status_code} {status_text}"
                    f" {error_message}"
                )
            elif status_code == 404:
                detail = (
                    error_message
                    if error_message is not None
                    else "Verify start date is not more than 14 days ago."
                )
                logging.info(
                    f"Collection failed for {task_id}, {status_code} {status_text} "
                    f"{detail}"
                )
    except subprocess.TimeoutExpired as e:
        raise Exception(
            f"Collection timed out for {task_id}, {e.timeout}, stderr: {e.stderr}"
        ) from None
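
A hedged usage sketch of how these helpers might be composed for one partner, with the histogram landing in BigQuery. The wrapper function, the BigQuery table name, and the row shape are illustrative assumptions and not part of this commit; only `current_batch_start`, `current_batch_end`, and `get_aggregated_results` come from the module above:

```python
from datetime import date

from google.cloud import bigquery  # assumed client for the BigQuery write


def run_collection_for_partner(
    process_date: date,
    partner_start_date: date,
    duration: int,
    task_id: str,
    vdaf_length: int,
    bearer_token: str,
    hpke_config: str,
    hpke_private_key: str,
) -> None:
    batch_start = current_batch_start(process_date, partner_start_date, duration)
    if batch_start is None:
        return  # collection has not started for this partner yet
    batch_end = current_batch_end(batch_start, duration)

    results = get_aggregated_results(
        process_date=process_date,
        batch_start=batch_start,
        batch_end=batch_end,
        task_id=task_id,
        vdaf_length=vdaf_length,
        collector_duration=duration,
        bearer_token=bearer_token,
        hpke_config=hpke_config,
        hpke_private_key=hpke_private_key,
    )
    if not results:
        return  # not the batch-end day, or the collect run produced no histogram

    # Hypothetical destination table and row shape, for illustration only.
    client = bigquery.Client()
    rows = [
        {"batch_start": batch_start.isoformat(), "bucket": bucket, "count": count}
        for bucket, count in results.items()
    ]
    client.insert_rows_json("<project>.<dataset>.ads_attribution_dap", rows)
```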

0 commit comments

Comments
 (0)