Summary
Replace the current scheduled workflow (every 15 minutes, 6 function executions) with data modeling event triggers to eliminate wasteful serverless function executions.
Current Architecture
The Problem
Today: Workflow triggers every 15 minutes, spinning up 6 functions:
- 1x fn_file_annotation_launch (includes prepare + launch logic)
- 5x fn_file_annotation_finalize (parallel instances)
Inefficiency: Functions speculatively poll for work. If there is nothing to process (common during low-activity periods), they exit immediately, having already spent cold-start time, a query, and function credits.
Current: 96 workflow runs per day (4/hour × 24h), which is 576 function executions per day (6 per run)
Reality: 60-90% exit early with no work done
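The arithmetic behind these figures can be checked with a short calculation. This is a rough sketch: it takes the schedule (every 15 minutes) and the 6 functions per run from the description above, and it assumes the 60-90% early-exit range applies uniformly to every execution, which is a simplification.

```python
# Back-of-the-envelope cost of the scheduled workflow.
FUNCTIONS_PER_RUN = 6       # 1 launch + 5 finalize instances
RUNS_PER_HOUR = 60 // 15    # one workflow run every 15 minutes
HOURS_PER_DAY = 24

runs_per_day = RUNS_PER_HOUR * HOURS_PER_DAY
executions_per_day = runs_per_day * FUNCTIONS_PER_RUN


def wasted_executions(early_exit_rate: float) -> int:
    """Daily executions that exit early, assuming a uniform early-exit rate."""
    return round(executions_per_day * early_exit_rate)


print(runs_per_day, executions_per_day)                   # 96 576
print(wasted_executions(0.60), wasted_executions(0.90))   # 346 518
```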
Current Flow
Every 15 minutes (scheduled):
├── Launch Function
│   ├── prepare() → Query for files with "ToAnnotate" tag
│   │   └── If none: exit early ❌
│   └── run() → Query for AnnotationStates with status="New"/"Retry"
│       └── If none: exit early ❌
└── Finalize Function (5 parallel)
    └── run() → Query for AnnotationStates with status="Processing"
        └── If none: exit early ❌
Proposed Architecture
High-Level Design
Three workflow versions, each with data modeling triggers:
| Workflow Version | Trigger Fires On | Function | Batch Config |
|---|---|---|---|
| v1_prepare | Files with ToAnnotate tag | fn_file_annotation_prepare (new) | size: 100, timeout: 60s |
| v1_launch | AnnotationState status = "New"/"Retry" | fn_file_annotation_launch | size: 50, timeout: 30s |
| v1_finalize | AnnotationState status = "Processing" | fn_file_annotation_finalize (5x) | size: 20, timeout: 60s |
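To get a feel for how the batch settings translate into workflow runs, here is a minimal estimate. It assumes that each run receives at most `size` items and that a partial batch is flushed once the timeout elapses. The helper `trigger_runs` and the `BATCH_CONFIG` dictionary are illustrative names, not part of any SDK.

```python
import math

# Batch settings copied from the table above; timeouts are in seconds.
BATCH_CONFIG = {
    "v1_prepare": {"size": 100, "timeout_s": 60},
    "v1_launch": {"size": 50, "timeout_s": 30},
    "v1_finalize": {"size": 20, "timeout_s": 60},
}


def trigger_runs(workflow_version: str, pending_items: int) -> int:
    """Estimate the workflow runs needed to drain `pending_items` matches.

    Assumes at most `size` items per run (an assumption about batching).
    """
    if pending_items <= 0:
        return 0  # nothing matches the trigger filter, so nothing executes
    size = BATCH_CONFIG[workflow_version]["size"]
    return math.ceil(pending_items / size)


for version in BATCH_CONFIG:
    print(version, trigger_runs(version, 0), trigger_runs(version, 120))
```

The point of the zero case is the contrast with the schedule: with no matching instances, no function runs at all, whereas the scheduled workflow still starts 6 functions per tick.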
Trigger Configurations
1. Prepare Trigger
Purpose: Create AnnotationState instances for files tagged ToAnnotate
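from cognite.client.data_classes.data_modeling.query import NodeResultSetExpression, Select, SourceSelector
from cognite.client.data_classes.filters import And, In, Not
from cognite.client.data_classes.workflows import WorkflowDataModelingTriggerRule, WorkflowTriggerDataModelingQuery

# file_view: the file view's ViewId, assumed to be defined elsewhere.
WorkflowDataModelingTriggerRule(
    data_modeling_query=WorkflowTriggerDataModelingQuery(
        with_={
            "files_to_prepare": NodeResultSetExpression(
                # Tagged for annotation, but not yet picked up or finished.
                filter=And(
                    In(property=file_view.as_property_ref("tags"), values=["ToAnnotate"]),
                    Not(In(property=file_view.as_property_ref("tags"),
                           values=["AnnotationInProcess", "Annotated", "AnnotationFailed"])),
                )
            )
        },
        select={"files_to_prepare": Select(sources=[SourceSelector(source=file_view, properties=["name", "tags"])])},
    ),
    batch_size=100,
    batch_timeout=60,  # seconds
)

Function Changes:
- Extract prepare() from fn_file_annotation_launch into new fn_file_annotation_prepare
- Input: ${workflow.input.items} (files from trigger)
- Output: Creates AnnotationState with status="New" → fires launch trigger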
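A minimal sketch of what the extracted prepare step might do with one trigger batch. The item shape (an `externalId` plus a `tags` list), the helper name `prepare_batch`, and the initial `attemptCount` of 0 are assumptions made for illustration; the real payload of `${workflow.input.items}` and the real write calls are not shown here.

```python
from typing import Any

# Tags that mean a file is already in the pipeline or finished
# (the same tags the prepare trigger filter excludes).
BLOCKING_TAGS = {"AnnotationInProcess", "Annotated", "AnnotationFailed"}


def prepare_batch(
    items: list[dict[str, Any]],
) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
    """Return (file_updates, new_states) for one trigger batch.

    `items` is a hypothetical, simplified view of ${workflow.input.items}.
    """
    file_updates: list[dict[str, Any]] = []
    new_states: list[dict[str, Any]] = []
    for item in items:
        tags = list(item.get("tags") or [])
        # Defensive re-check: the file may have changed since the trigger fired.
        if "ToAnnotate" not in tags or BLOCKING_TAGS.intersection(tags):
            continue
        file_updates.append(
            {"externalId": item["externalId"], "tags": tags + ["AnnotationInProcess"]}
        )
        new_states.append(
            {"linkedFile": item["externalId"], "annotationStatus": "New", "attemptCount": 0}
        )
    return file_updates, new_states


batch = [
    {"externalId": "file_a", "tags": ["ToAnnotate"]},
    {"externalId": "file_b", "tags": ["ToAnnotate", "Annotated"]},
]
updates, states = prepare_batch(batch)
print(updates)
print(states)
```

Returning plain dictionaries keeps the decision logic separate from the data modeling writes, so it can be tested without a CDF project.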
2. Launch Trigger
Purpose: Launch diagram detect jobs for ready files
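from cognite.client.data_classes.data_modeling.query import NodeResultSetExpression, Select, SourceSelector
from cognite.client.data_classes.filters import In
from cognite.client.data_classes.workflows import WorkflowDataModelingTriggerRule, WorkflowTriggerDataModelingQuery

# annotation_state_view: the AnnotationState view's ViewId, assumed to be defined elsewhere.
WorkflowDataModelingTriggerRule(
    data_modeling_query=WorkflowTriggerDataModelingQuery(
        with_={
            "states_to_launch": NodeResultSetExpression(
                # States waiting for a first attempt or a retry.
                filter=In(property=annotation_state_view.as_property_ref("annotationStatus"),
                          values=["New", "Retry"])
            )
        },
        select={"states_to_launch": Select(sources=[SourceSelector(
            source=annotation_state_view,
            properties=["annotationStatus", "linkedFile", "attemptCount"])])},
    ),
    batch_size=50,
    batch_timeout=30,  # seconds
)

Function Changes:
- Remove prepare() logic from fn_file_annotation_launch
- Keep existing cache management and batching logic
- Input: ${workflow.input.items} (AnnotationStates from trigger)
- Output: Updates status="Processing" → fires finalize trigger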
3. Finalize Trigger
Purpose: Process completed diagram detect jobs
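from cognite.client.data_classes.data_modeling.query import NodeResultSetExpression, Select, SourceSelector
from cognite.client.data_classes.filters import And, Equals, Exists
from cognite.client.data_classes.workflows import WorkflowDataModelingTriggerRule, WorkflowTriggerDataModelingQuery

# annotation_state_view: the AnnotationState view's ViewId, assumed to be defined elsewhere.
WorkflowDataModelingTriggerRule(
    data_modeling_query=WorkflowTriggerDataModelingQuery(
        with_={
            "jobs_to_finalize": NodeResultSetExpression(
                # In-flight states that already have a diagram detect job attached.
                filter=And(
                    Equals(property=annotation_state_view.as_property_ref("annotationStatus"), value="Processing"),
                    Exists(property=annotation_state_view.as_property_ref("diagramDetectJobId")),
                )
            )
        },
        select={"jobs_to_finalize": Select(sources=[SourceSelector(
            source=annotation_state_view,
            properties=["annotationStatus", "diagramDetectJobId", "patternModeJobId", "linkedFile"])])},
    ),
    batch_size=20,
    batch_timeout=60,  # seconds
)

Function Changes:
- Minimal changes (existing logic works with trigger batches)
- Keep 5 parallel instances (triggers fire multiple times naturally)
- Input: ${workflow.input.items} (AnnotationStates from trigger)
- Output: Updates status="Annotated"/"Failed"/"New" ("New" for multi-page files, which re-fires the launch trigger)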
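The three possible outputs of the finalize step can be written as a small decision function. This is only one reading of the list above: the inputs `job_succeeded` and `pages_remaining` are hypothetical, and treating "multi-page" as "pages still left to process" is an assumption rather than something the existing function is known to do.

```python
def next_status(job_succeeded: bool, pages_remaining: int) -> str:
    """Pick the annotationStatus to write after a diagram detect job ends.

    Hypothetical inputs: whether the job succeeded, and how many pages of
    the file still need processing.
    """
    if not job_succeeded:
        return "Failed"
    if pages_remaining > 0:
        return "New"  # matches the launch trigger again for the next pages
    return "Annotated"


print(next_status(True, 0))   # Annotated
print(next_status(True, 3))   # New
print(next_status(False, 3))  # Failed
```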
State Machine & Re-triggering Prevention
How we prevent infinite loops:
Prepare Trigger: Fires on files.tags contains "ToAnnotate" without "AnnotationInProcess", "Annotated", or "AnnotationFailed"
  ↓ Function updates file.tags → adds "AnnotationInProcess"
  ↓ Function creates AnnotationState with status="New"
  ✅ Won't re-trigger prepare (now has "AnnotationInProcess")

Launch Trigger: Fires on AnnotationState.status IN ["New", "Retry"]
  ↓ Function updates status="Processing"
  ✅ Won't re-trigger launch (status changed)

Finalize Trigger: Fires on AnnotationState.status="Processing" with diagramDetectJobId set
  ↓ Function updates status="Annotated"/"Failed" (or "New" for multi-page files, which deliberately re-fires launch)
  ✅ Won't re-trigger finalize (status changed)
No new flags needed: every trigger filters on the existing tags and annotationStatus properties, and the existing B-tree index on annotationStatus covers the status filters.
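The loop-prevention argument can be checked with a small in-memory simulation. It mirrors the three trigger filters as plain predicates and applies each function's state change. Everything here is a simplified stand-in (dictionaries instead of data model instances, a placeholder job id, a single-page file), not the real trigger engine.

```python
BLOCKING_TAGS = {"AnnotationInProcess", "Annotated", "AnnotationFailed"}


def prepare_fires(file: dict) -> bool:
    tags = set(file["tags"])
    return "ToAnnotate" in tags and not (tags & BLOCKING_TAGS)


def launch_fires(state: dict) -> bool:
    return state["annotationStatus"] in ("New", "Retry")


def finalize_fires(state: dict) -> bool:
    return (
        state["annotationStatus"] == "Processing"
        and state.get("diagramDetectJobId") is not None
    )


def run_prepare(file: dict) -> dict:
    file["tags"].append("AnnotationInProcess")
    return {"annotationStatus": "New", "diagramDetectJobId": None}


def run_launch(state: dict) -> None:
    state["annotationStatus"] = "Processing"
    state["diagramDetectJobId"] = 1  # placeholder job id


def run_finalize(state: dict) -> None:
    state["annotationStatus"] = "Annotated"  # single-page success case


def simulate() -> list[str]:
    """Run one file through the pipeline, recording which triggers fire."""
    file = {"tags": ["ToAnnotate"]}
    state = None
    fired: list[str] = []
    for _ in range(10):  # hard cap: an infinite loop would exhaust it
        if prepare_fires(file):
            state = run_prepare(file)
            fired.append("prepare")
        elif state is not None and launch_fires(state):
            run_launch(state)
            fired.append("launch")
        elif state is not None and finalize_fires(state):
            run_finalize(state)
            fired.append("finalize")
        else:
            break  # no trigger matches: the system is quiescent
    return fired


print(simulate())  # ['prepare', 'launch', 'finalize']
```

Each trigger fires exactly once because the function it starts always moves the instance out of that trigger's filter.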
Benefits
| Benefit | Impact |
|---|---|
| Cost Efficiency | 50-90% reduction in wasted function executions |
| Responsiveness | <2 min latency (vs 0-15 min) from file upload to processing start |
| Architecture | Clean separation of prepare/launch/finalize phases |
| Observability | Built-in trigger run history, focused logs per phase |