Data workflows and DAGs - orchestrating data transformations
The Pipeline entity represents data workflows, DAGs (Directed Acyclic Graphs), or data pipelines that orchestrate data processing tasks. Pipelines define the sequence and dependencies of tasks that move and transform data across systems.
Hierarchy:
```mermaid
graph LR
SVC[Pipeline Service] --> PIPE[Pipeline]
PIPE --> TASK[Task]
style SVC fill:#667eea,color:#fff
style PIPE fill:#4facfe,color:#fff,stroke:#4c51bf,stroke-width:3px
style TASK fill:#00f2fe,color:#333
```
View the complete Pipeline schema in your preferred format:
=== "JSON Schema"
**Complete JSON Schema Definition**
```json
{
"$id": "https://open-metadata.org/schema/entity/data/pipeline.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Pipeline",
"description": "A `Pipeline` entity represents a workflow or DAG that orchestrates data processing tasks.",
"type": "object",
"javaType": "org.openmetadata.schema.entity.data.Pipeline",
"definitions": {
"pipelineStatus": {
"description": "Pipeline execution status",
"type": "string",
"enum": [
"Successful", "Failed", "Pending", "Running",
"Stopped", "Skipped", "UpForRetry", "Queued"
]
},
"scheduleInterval": {
"description": "Pipeline schedule",
"type": "object",
"properties": {
"scheduleExpression": {
"description": "Cron or rate expression",
"type": "string"
},
"startDate": {
"description": "Schedule start date",
"type": "string",
"format": "date-time"
},
"endDate": {
"description": "Schedule end date",
"type": "string",
"format": "date-time"
}
}
}
},
"properties": {
"id": {
"description": "Unique identifier",
"$ref": "../../type/basic.json#/definitions/uuid"
},
"name": {
"description": "Pipeline name",
"$ref": "../../type/basic.json#/definitions/entityName"
},
"fullyQualifiedName": {
"description": "Fully qualified name: service.pipeline",
"$ref": "../../type/basic.json#/definitions/fullyQualifiedEntityName"
},
"displayName": {
"description": "Display name",
"type": "string"
},
"description": {
"description": "Markdown description",
"$ref": "../../type/basic.json#/definitions/markdown"
},
"pipelineUrl": {
"description": "URL to pipeline in orchestration tool",
"type": "string",
"format": "uri"
},
"sourceUrl": {
"description": "URL to pipeline source code",
"type": "string",
"format": "uri"
},
"tasks": {
"description": "Pipeline tasks",
"type": "array",
"items": {
"$ref": "../../type/entityReference.json"
}
},
"scheduleInterval": {
"$ref": "#/definitions/scheduleInterval"
},
"pipelineStatus": {
"$ref": "#/definitions/pipelineStatus"
},
"startDate": {
"description": "Pipeline creation/start date",
"type": "string",
"format": "date-time"
},
"concurrency": {
"description": "Maximum concurrent runs",
"type": "integer"
},
"pipelineLocation": {
"description": "Pipeline code location",
"type": "string"
},
"service": {
"description": "Pipeline service",
"$ref": "../../type/entityReference.json"
},
"owner": {
"description": "Owner (user or team)",
"$ref": "../../type/entityReference.json"
},
"domain": {
"description": "Data domain",
"$ref": "../../type/entityReference.json"
},
"tags": {
"description": "Classification tags",
"type": "array",
"items": {
"$ref": "../../type/tagLabel.json"
}
},
"glossaryTerms": {
"description": "Business glossary terms",
"type": "array",
"items": {
"$ref": "../../type/entityReference.json"
}
},
"version": {
"description": "Metadata version",
"$ref": "../../type/entityHistory.json#/definitions/entityVersion"
}
},
"required": ["id", "name", "service"]
}
```
**[View Full JSON Schema →](https://github.com/open-metadata/OpenMetadataStandards/blob/main/schemas/entity/data/pipeline.json)**
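A full check needs a JSON Schema validator (for example the third-party `jsonschema` package). Two rules are most often broken in hand-written payloads: the `required` list and the `pipelineStatus` enum. Both can be sketched with the standard library alone. The helper `check_pipeline` is hypothetical and mirrors only the excerpt above, not the full published schema.

```python
from typing import Any

# Copied from the schema excerpt above: "required" and the pipelineStatus enum.
REQUIRED_FIELDS = ("id", "name", "service")
PIPELINE_STATUSES = {
    "Successful", "Failed", "Pending", "Running",
    "Stopped", "Skipped", "UpForRetry", "Queued",
}


def check_pipeline(doc: dict[str, Any]) -> list[str]:
    """Return human-readable problems; an empty list means both checks passed."""
    problems = [f"missing required field: {name}" for name in REQUIRED_FIELDS if name not in doc]
    status = doc.get("pipelineStatus")
    if status is not None and status not in PIPELINE_STATUSES:
        problems.append(f"invalid pipelineStatus: {status!r}")
    return problems


example = {
    "id": "2a3b4c5d-6e7f-8a9b-0c1d-2e3f4a5b6c7d",
    "name": "customer_etl",
    "service": {"type": "pipelineService", "name": "airflow_prod"},
    "pipelineStatus": "Successful",
}
print(check_pipeline(example))  # []
```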
=== "RDF"
**RDF/OWL Ontology Definition**
```turtle
@prefix om: <https://open-metadata.org/schema/> .
@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .
@prefix owl: <http://www.w3.org/2002/07/owl#> .
@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .
@prefix ex: <https://example.com/> .
# Pipeline Class Definition
om:Pipeline a owl:Class ;
rdfs:subClassOf om:DataAsset ;
rdfs:label "Pipeline" ;
rdfs:comment "A workflow or DAG that orchestrates data processing tasks" ;
om:hierarchyLevel 2 .
# Properties
om:pipelineName a owl:DatatypeProperty ;
rdfs:domain om:Pipeline ;
rdfs:range xsd:string ;
rdfs:label "name" ;
rdfs:comment "Name of the pipeline" .
om:fullyQualifiedName a owl:DatatypeProperty ;
rdfs:domain om:Pipeline ;
rdfs:range xsd:string ;
rdfs:label "fullyQualifiedName" ;
rdfs:comment "Complete hierarchical name: service.pipeline" .
om:pipelineUrl a owl:DatatypeProperty ;
rdfs:domain om:Pipeline ;
rdfs:range xsd:anyURI ;
rdfs:label "pipelineUrl" ;
rdfs:comment "URL to pipeline in orchestration tool" .
om:scheduleInterval a owl:DatatypeProperty ;
rdfs:domain om:Pipeline ;
rdfs:range xsd:string ;
rdfs:label "scheduleInterval" ;
rdfs:comment "Cron or rate expression for pipeline schedule" .
om:pipelineStatus a owl:ObjectProperty ;
rdfs:domain om:Pipeline ;
rdfs:range om:PipelineStatus ;
rdfs:label "pipelineStatus" ;
rdfs:comment "Current execution status" .
om:hasTask a owl:ObjectProperty ;
rdfs:domain om:Pipeline ;
rdfs:range om:Task ;
rdfs:label "hasTask" ;
rdfs:comment "Tasks in this pipeline" .
om:belongsToPipelineService a owl:ObjectProperty ;
rdfs:domain om:Pipeline ;
rdfs:range om:PipelineService ;
rdfs:label "belongsToService" ;
rdfs:comment "Service managing this pipeline" .
om:pipelineOwnedBy a owl:ObjectProperty ;
rdfs:domain om:Pipeline ;
rdfs:range om:Owner ;
rdfs:label "ownedBy" ;
rdfs:comment "User or team that owns this pipeline" .
om:pipelineHasTag a owl:ObjectProperty ;
rdfs:domain om:Pipeline ;
rdfs:range om:Tag ;
rdfs:label "hasTag" ;
rdfs:comment "Classification tags applied to pipeline" .
om:pipelineLinkedToGlossaryTerm a owl:ObjectProperty ;
rdfs:domain om:Pipeline ;
rdfs:range om:GlossaryTerm ;
rdfs:label "linkedToGlossaryTerm" ;
rdfs:comment "Business glossary terms" .
# Pipeline Status Enumeration
om:PipelineStatus a owl:Class ;
owl:oneOf (
om:Successful
om:Failed
om:Pending
om:Running
om:Stopped
om:Skipped
om:UpForRetry
om:Queued
) .
# Example Instance
ex:customerEtlPipeline a om:Pipeline ;
om:pipelineName "customer_etl" ;
om:fullyQualifiedName "airflow_prod.customer_etl" ;
om:displayName "Customer ETL Pipeline" ;
om:scheduleInterval "0 2 * * *" ;
om:pipelineStatus om:Successful ;
om:belongsToPipelineService ex:airflowProdService ;
om:pipelineOwnedBy ex:dataEngTeam ;
om:pipelineHasTag ex:tierGold ;
om:hasTask ex:extractCustomersTask ;
om:hasTask ex:transformCustomersTask ;
om:hasTask ex:loadCustomersTask .
```
**[View Full RDF Ontology →](https://github.com/open-metadata/OpenMetadataStandards/blob/main/rdf/ontology/openmetadata.ttl)**
=== "JSON-LD"
**JSON-LD Context and Example**
```json
{
"@context": {
"@vocab": "https://open-metadata.org/schema/",
"om": "https://open-metadata.org/schema/",
"rdfs": "http://www.w3.org/2000/01/rdf-schema#",
"xsd": "http://www.w3.org/2001/XMLSchema#",
"Pipeline": "om:Pipeline",
"name": {
"@id": "om:pipelineName",
"@type": "xsd:string"
},
"fullyQualifiedName": {
"@id": "om:fullyQualifiedName",
"@type": "xsd:string"
},
"displayName": {
"@id": "om:displayName",
"@type": "xsd:string"
},
"description": {
"@id": "om:description",
"@type": "xsd:string"
},
"pipelineUrl": {
"@id": "om:pipelineUrl",
"@type": "xsd:anyURI"
},
"scheduleInterval": {
"@id": "om:scheduleInterval"
},
"pipelineStatus": {
"@id": "om:pipelineStatus",
"@type": "@vocab"
},
"tasks": {
"@id": "om:hasTask",
"@type": "@id",
"@container": "@list"
},
"service": {
"@id": "om:belongsToPipelineService",
"@type": "@id"
},
"owner": {
"@id": "om:pipelineOwnedBy",
"@type": "@id"
},
"domain": {
"@id": "om:inDomain",
"@type": "@id"
},
"tags": {
"@id": "om:pipelineHasTag",
"@type": "@id",
"@container": "@set"
},
"glossaryTerms": {
"@id": "om:pipelineLinkedToGlossaryTerm",
"@type": "@id",
"@container": "@set"
}
}
}
```
**Example JSON-LD Instance**:
```json
{
"@context": "https://open-metadata.org/context/pipeline.jsonld",
"@type": "Pipeline",
"@id": "https://example.com/pipelines/customer_etl",
"name": "customer_etl",
"fullyQualifiedName": "airflow_prod.customer_etl",
"displayName": "Customer ETL Pipeline",
"description": "Daily ETL pipeline for customer data",
"pipelineUrl": "https://airflow.company.com/dags/customer_etl",
"sourceUrl": "https://github.com/company/pipelines/blob/main/dags/customer_etl.py",
"scheduleInterval": {
"scheduleExpression": "0 2 * * *",
"startDate": "2024-01-01T00:00:00Z"
},
"pipelineStatus": "Successful",
"concurrency": 1,
"service": {
"@id": "https://example.com/services/airflow_prod",
"@type": "PipelineService",
"name": "airflow_prod"
},
"owner": {
"@id": "https://example.com/teams/data-engineering",
"@type": "Team",
"name": "data-engineering",
"displayName": "Data Engineering"
},
"tags": [
{
"@id": "https://open-metadata.org/tags/Tier/Gold",
"tagFQN": "Tier.Gold"
},
{
"@id": "https://open-metadata.org/tags/Schedule/Daily",
"tagFQN": "Schedule.Daily"
}
],
"glossaryTerms": [
{
"@id": "https://example.com/glossary/ETL",
"@type": "GlossaryTerm",
"fullyQualifiedName": "BusinessGlossary.ETL"
}
],
"tasks": [
{
"@type": "Task",
"@id": "https://example.com/pipelines/customer_etl/tasks/extract_customers",
"name": "extract_customers"
},
{
"@type": "Task",
"@id": "https://example.com/pipelines/customer_etl/tasks/transform_customers",
"name": "transform_customers"
},
{
"@type": "Task",
"@id": "https://example.com/pipelines/customer_etl/tasks/load_customers",
"name": "load_customers"
}
]
}
```
**[View Full JSON-LD Context →](https://github.com/open-metadata/OpenMetadataStandards/blob/main/rdf/contexts/pipeline.jsonld)**
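The context's job is to map short JSON keys to full IRIs. The sketch below performs only that term and prefix lookup on a trimmed copy of the context above. That is enough to show, for example, that `name` becomes `https://open-metadata.org/schema/pipelineName`. It is not a JSON-LD processor, because containers, `@type` coercion and remote contexts are ignored. A library such as PyLD implements the full expansion algorithm. The function names are hypothetical.

```python
# Trimmed copy of the Pipeline JSON-LD context above (only the entries used here).
CONTEXT = {
    "om": "https://open-metadata.org/schema/",
    "xsd": "http://www.w3.org/2001/XMLSchema#",
    "Pipeline": "om:Pipeline",
    "name": {"@id": "om:pipelineName", "@type": "xsd:string"},
    "tasks": {"@id": "om:hasTask", "@type": "@id", "@container": "@list"},
    "service": {"@id": "om:belongsToPipelineService", "@type": "@id"},
}


def expand_compact_iri(value: str, context: dict) -> str:
    """Expand 'prefix:suffix' when prefix is a string entry in the context; otherwise return value."""
    prefix, sep, suffix = value.partition(":")
    # A suffix starting with '//' means value is already an absolute IRI such as https://...
    if sep and isinstance(context.get(prefix), str) and not suffix.startswith("//"):
        return context[prefix] + suffix
    return value


def term_iri(term: str, context: dict) -> str:
    """Return the full IRI that a JSON key maps to under the context."""
    definition = context[term]
    iri = definition["@id"] if isinstance(definition, dict) else definition
    return expand_compact_iri(iri, context)


for key in ("Pipeline", "name", "tasks", "service"):
    print(f"{key} -> {term_iri(key, CONTEXT)}")
```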

Use Cases:

- Document ETL/ELT workflows and data pipelines
- Track pipeline schedules and execution history
- Monitor pipeline success rates and failures
- Capture data lineage through pipelines
- Define pipeline ownership and responsibilities
- Apply governance tags to critical pipelines
- Link pipelines to business processes
- Audit data transformation logic

**id**

Type: string (UUID format)
Required: Yes (system-generated)
Description: Unique identifier for this pipeline instance
```json
{
"id": "2a3b4c5d-6e7f-8a9b-0c1d-2e3f4a5b6c7d"
}
```

**name**

Type: string
Required: Yes
Pattern: ^[^.]*$ (no dots allowed)
Min Length: 1
Max Length: 256
Description: Name of the pipeline (unqualified)
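The name constraints above can be expressed in a few lines, together with the way the fully qualified name is formed from them. The sketch also checks the documented `fullyQualifiedName` pattern, which forbids `::`. The helper names are hypothetical. The service name is not validated because this section does not state its constraints.

```python
import re

# Documented constraints for `name` and `fullyQualifiedName`.
NAME_PATTERN = re.compile(r"^[^.]*$")      # no dots in the unqualified name
FQN_PATTERN = re.compile(r"^((?!::).)*$")  # no '::' sequence anywhere in the FQN
MAX_NAME_LENGTH = 256


def validate_pipeline_name(name: str) -> None:
    """Raise ValueError if name breaks the length or no-dot rules."""
    if not 1 <= len(name) <= MAX_NAME_LENGTH:
        raise ValueError(f"name must be 1-{MAX_NAME_LENGTH} characters, got {len(name)}")
    if not NAME_PATTERN.match(name):
        raise ValueError(f"name must not contain '.': {name!r}")


def pipeline_fqn(service_name: str, pipeline_name: str) -> str:
    """Join service and pipeline names into 'service.pipeline'."""
    validate_pipeline_name(pipeline_name)
    fqn = f"{service_name}.{pipeline_name}"
    if not FQN_PATTERN.match(fqn):
        raise ValueError(f"fullyQualifiedName must not contain '::': {fqn!r}")
    return fqn


print(pipeline_fqn("airflow_prod", "customer_etl"))  # airflow_prod.customer_etl
```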
```json
{
"name": "customer_etl"
}
```

**fullyQualifiedName**

Type: string
Required: Yes (system-generated)
Pattern: ^((?!::).)*$
Description: Fully qualified name in the format service.pipeline
```json
{
"fullyQualifiedName": "airflow_prod.customer_etl"
}
```

**displayName**

Type: string
Required: No
Description: Human-readable display name
```json
{
"displayName": "Customer ETL Pipeline"
}
```

**description**

Type: string (Markdown format)
Required: No
Description: Rich text description of the pipeline's purpose and logic
```json
{
"description": "# Customer ETL Pipeline\n\nDaily pipeline that extracts customer data from MongoDB, transforms it, and loads into PostgreSQL.\n\n## Schedule\n- Runs daily at 2 AM UTC\n- Average duration: 45 minutes\n\n## Data Flow\n1. Extract from MongoDB `customers` collection\n2. Deduplicate and validate\n3. Enrich with geographic data\n4. Load to PostgreSQL `public.customers` table"
}
```

**pipelineUrl**

Type: string (URI format)
Required: No
Description: URL to view pipeline in orchestration tool
```json
{
"pipelineUrl": "https://airflow.company.com/dags/customer_etl"
}
```

**sourceUrl**

Type: string (URI format)
Required: No
Description: URL to pipeline source code repository
```json
{
"sourceUrl": "https://github.com/company/pipelines/blob/main/dags/customer_etl.py"
}
```

**scheduleInterval**

Type: object
Required: No
Description: Pipeline execution schedule
ScheduleInterval Object Properties:

| Property | Type | Required | Description |
|---|---|---|---|
| scheduleExpression | string | No | Cron expression or rate |
| startDate | string (ISO 8601) | No | Schedule start date |
| endDate | string (ISO 8601) | No | Schedule end date |
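
All three properties are optional and free-form, so a client may want to sanity-check a `scheduleInterval` object before sending it. The sketch assumes `scheduleExpression` is either a standard five-field cron expression or an Airflow-style preset such as `@hourly`. It checks only the field count, not the field values. It would also reject rate expressions, whose syntax this section does not specify. The names are hypothetical.

```python
from datetime import datetime

# Assumption: Airflow-style presets; the schema itself does not enumerate them.
CRON_PRESETS = {"@once", "@hourly", "@daily", "@weekly", "@monthly", "@yearly"}


def parse_iso8601(value: str) -> datetime:
    """Parse ISO 8601; replacing 'Z' keeps this working on Python versions before 3.11."""
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


def check_schedule(schedule: dict) -> list[str]:
    """Return problems found in a scheduleInterval object; empty means it looks plausible."""
    problems = []
    expr = schedule.get("scheduleExpression")
    if expr is not None and expr not in CRON_PRESETS and len(expr.split()) != 5:
        problems.append(f"unrecognised scheduleExpression: {expr!r}")
    start, end = schedule.get("startDate"), schedule.get("endDate")
    if start and end and parse_iso8601(end) <= parse_iso8601(start):
        problems.append("endDate must be after startDate")
    return problems


print(check_schedule({"scheduleExpression": "0 2 * * *", "startDate": "2024-01-01T00:00:00Z"}))  # []
```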
Example - Cron Schedule:
```json
{
"scheduleInterval": {
"scheduleExpression": "0 2 * * *",
"startDate": "2024-01-01T00:00:00Z"
}
}
```

Example - Cron Preset:
```json
{
"scheduleInterval": {
"scheduleExpression": "@hourly",
"startDate": "2024-01-01T00:00:00Z",
"endDate": "2024-12-31T23:59:59Z"
}
}
```

**pipelineStatus**

Type: string enum
Required: No (system-populated from latest run)
Allowed Values:

- `Successful` - Last run completed successfully
- `Failed` - Last run failed
- `Pending` - Waiting to start
- `Running` - Currently executing
- `Stopped` - Manually stopped
- `Skipped` - Execution skipped
- `UpForRetry` - Failed, waiting for retry
- `Queued` - In execution queue
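
In client code, the allowed values map naturally onto an enumeration. The `is_finished` grouping below treats `Successful`, `Failed`, `Stopped` and `Skipped` as states that end a run. That grouping is an assumption drawn from the value descriptions above; it is not part of the schema.

```python
from enum import Enum


class PipelineStatus(str, Enum):
    """The eight pipelineStatus values; the str mixin lets members compare equal to raw JSON strings."""
    SUCCESSFUL = "Successful"
    FAILED = "Failed"
    PENDING = "Pending"
    RUNNING = "Running"
    STOPPED = "Stopped"
    SKIPPED = "Skipped"
    UP_FOR_RETRY = "UpForRetry"
    QUEUED = "Queued"

    @property
    def is_finished(self) -> bool:
        """Assumed grouping: states after which the run will not change again."""
        return self in (PipelineStatus.SUCCESSFUL, PipelineStatus.FAILED,
                        PipelineStatus.STOPPED, PipelineStatus.SKIPPED)


status = PipelineStatus("UpForRetry")  # raises ValueError for values outside the enum
print(status.name, status.is_finished)  # UP_FOR_RETRY False
```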
```json
{
"pipelineStatus": "Successful"
}
```

**concurrency**

Type: integer
Required: No
Description: Maximum number of concurrent pipeline runs
```json
{
"concurrency": 1
}
```

**pipelineLocation**

Type: string
Required: No
Description: File path or location of pipeline definition
```json
{
"pipelineLocation": "/opt/airflow/dags/customer_etl.py"
}
```

**startDate**

Type: string (ISO 8601 date-time)
Required: No
Description: Pipeline creation or first run date
```json
{
"startDate": "2024-01-01T00:00:00Z"
}
```

**tasks**

Type: array of EntityReference
Required: No
Description: List of tasks in the pipeline
```json
{
"tasks": [
{
"id": "3b4c5d6e-7f8a-9b0c-1d2e-3f4a5b6c7d8e",
"type": "task",
"name": "extract_customers",
"fullyQualifiedName": "airflow_prod.customer_etl.extract_customers"
},
{
"id": "4c5d6e7f-8a9b-0c1d-2e3f-4a5b6c7d8e9f",
"type": "task",
"name": "transform_customers",
"fullyQualifiedName": "airflow_prod.customer_etl.transform_customers"
},
{
"id": "5d6e7f8a-9b0c-1d2e-3f4a-5b6c7d8e9f0a",
"type": "task",
"name": "load_customers",
"fullyQualifiedName": "airflow_prod.customer_etl.load_customers"
}
]
}
```

**service**

Type: object
Required: Yes
Description: Reference to parent pipeline service
```json
{
"service": {
"id": "1a2b3c4d-5e6f-7a8b-9c0d-1e2f3a4b5c6d",
"type": "pipelineService",
"name": "airflow_prod",
"fullyQualifiedName": "airflow_prod"
}
}
```

**owner**

Type: object
Required: No
Description: User or team that owns this pipeline
```json
{
"owner": {
"id": "6e7f8a9b-0c1d-2e3f-4a5b-6c7d8e9f0a1b",
"type": "team",
"name": "data-engineering",
"displayName": "Data Engineering Team"
}
}
```

**domain**

Type: object
Required: No
Description: Data domain this pipeline belongs to
```json
{
"domain": {
"id": "7f8a9b0c-1d2e-3f4a-5b6c-7d8e9f0a1b2c",
"type": "domain",
"name": "CustomerData",
"fullyQualifiedName": "CustomerData"
}
}
```

**tags**

Type: array
Required: No
Description: Classification tags applied to the pipeline
```json
{
"tags": [
{
"tagFQN": "Tier.Gold",
"description": "Critical production pipeline",
"source": "Classification",
"labelType": "Manual",
"state": "Confirmed"
},
{
"tagFQN": "Schedule.Daily",
"source": "Classification",
"labelType": "Automated",
"state": "Confirmed"
}
]
}
```

**glossaryTerms**

Type: array
Required: No
Description: Business glossary terms linked to this pipeline
```json
{
"glossaryTerms": [
{
"fullyQualifiedName": "BusinessGlossary.ETL"
},
{
"fullyQualifiedName": "BusinessGlossary.CustomerData"
}
]
}
```

**upstream**

Type: array
Required: No (system-populated)
Description: Upstream data assets (tables, files, etc.)
```json
{
"upstream": [
{
"id": "source-table-uuid",
"type": "table",
"name": "customers",
"fullyQualifiedName": "mongodb_prod.crm.customers"
}
]
}
```

**downstream**

Type: array
Required: No (system-populated)
Description: Downstream data assets
```json
{
"downstream": [
{
"id": "target-table-uuid",
"type": "table",
"name": "customers",
"fullyQualifiedName": "postgres_prod.ecommerce.public.customers"
}
]
}
```

**version**

Type: number
Required: Yes (system-managed)
Description: Metadata version number, incremented on changes
```json
{
"version": 3.1
}
```

**updatedAt**

Type: integer (Unix epoch milliseconds)
Required: Yes (system-managed)
Description: Last update timestamp
```json
{
"updatedAt": 1704240000000
}
```

**updatedBy**

Type: string
Required: Yes (system-managed)
Description: User who made the update
```json
{
"updatedBy": "john.doe"
}
```

**changeDescription**

Type: object
Required: No
Description: Details of what changed in this version
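A `changeDescription` can be derived by comparing two versions of the entity. The sketch below flattens nested objects into dot-separated names, matching `scheduleInterval.scheduleExpression` in the example, and compares arrays as whole values. The 0.1 version bump mirrors the 3.0 to 3.1 example but is an assumption. The function names are hypothetical, and the sketch is not claimed to match the platform's own change tracking.

```python
from typing import Any


def flatten(obj: dict[str, Any], prefix: str = "") -> dict[str, Any]:
    """Flatten nested dicts into dot-separated paths; lists and scalars are kept as leaf values."""
    flat: dict[str, Any] = {}
    for key, value in obj.items():
        path = f"{prefix}{key}"
        if isinstance(value, dict):
            flat.update(flatten(value, path + "."))
        else:
            flat[path] = value
    return flat


def change_description(old: dict[str, Any], new: dict[str, Any], previous_version: float) -> dict[str, Any]:
    """Compare two entity versions and report added, updated, and deleted fields (sorted by name)."""
    before, after = flatten(old), flatten(new)
    return {
        "fieldsAdded": [{"name": k, "newValue": after[k]} for k in sorted(after.keys() - before.keys())],
        "fieldsUpdated": [
            {"name": k, "oldValue": before[k], "newValue": after[k]}
            for k in sorted(before.keys() & after.keys())
            if before[k] != after[k]
        ],
        "fieldsDeleted": [{"name": k, "oldValue": before[k]} for k in sorted(before.keys() - after.keys())],
        "previousVersion": previous_version,
    }


def next_minor_version(version: float) -> float:
    """Assumed bump rule; rounding keeps binary float error out of the stored version."""
    return round(version + 0.1, 1)


old = {"description": "Daily ETL", "scheduleInterval": {"scheduleExpression": "0 1 * * *"}}
new = {"description": "Daily ETL", "scheduleInterval": {"scheduleExpression": "0 2 * * *"}}
print(change_description(old, new, 3.0)["fieldsUpdated"])
print(next_minor_version(3.0))  # 3.1
```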
```json
{
"changeDescription": {
"fieldsAdded": [
{
"name": "tasks.data_quality_check",
"newValue": "Task for validating data quality"
}
],
"fieldsUpdated": [
{
"name": "scheduleInterval.scheduleExpression",
"oldValue": "0 1 * * *",
"newValue": "0 2 * * *"
}
],
"fieldsDeleted": [],
"previousVersion": 3.0
}
}
```

**Complete Pipeline Example (JSON)**:

```json
{
"id": "2a3b4c5d-6e7f-8a9b-0c1d-2e3f4a5b6c7d",
"name": "customer_etl",
"fullyQualifiedName": "airflow_prod.customer_etl",
"displayName": "Customer ETL Pipeline",
"description": "# Customer ETL Pipeline\n\nDaily pipeline that extracts customer data from MongoDB, transforms it, and loads into PostgreSQL.",
"pipelineUrl": "https://airflow.company.com/dags/customer_etl",
"sourceUrl": "https://github.com/company/pipelines/blob/main/dags/customer_etl.py",
"scheduleInterval": {
"scheduleExpression": "0 2 * * *",
"startDate": "2024-01-01T00:00:00Z"
},
"pipelineStatus": "Successful",
"concurrency": 1,
"startDate": "2024-01-01T00:00:00Z",
"tasks": [
{
"id": "3b4c5d6e-7f8a-9b0c-1d2e-3f4a5b6c7d8e",
"type": "task",
"name": "extract_customers"
},
{
"id": "4c5d6e7f-8a9b-0c1d-2e3f-4a5b6c7d8e9f",
"type": "task",
"name": "transform_customers"
},
{
"id": "5d6e7f8a-9b0c-1d2e-3f4a-5b6c7d8e9f0a",
"type": "task",
"name": "load_customers"
}
],
"service": {
"id": "1a2b3c4d-5e6f-7a8b-9c0d-1e2f3a4b5c6d",
"type": "pipelineService",
"name": "airflow_prod"
},
"owner": {
"id": "6e7f8a9b-0c1d-2e3f-4a5b-6c7d8e9f0a1b",
"type": "team",
"name": "data-engineering"
},
"domain": {
"id": "7f8a9b0c-1d2e-3f4a-5b6c-7d8e9f0a1b2c",
"type": "domain",
"name": "CustomerData"
},
"tags": [
{"tagFQN": "Tier.Gold"},
{"tagFQN": "Schedule.Daily"}
],
"glossaryTerms": [
{"fullyQualifiedName": "BusinessGlossary.ETL"}
],
"version": 3.1,
"updatedAt": 1704240000000,
"updatedBy": "john.doe"
}
```

**RDF Class Definition (Turtle)**:

```turtle
@prefix om: <https://open-metadata.org/schema/> .
@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .
@prefix owl: <http://www.w3.org/2002/07/owl#> .
om:Pipeline a owl:Class ;
rdfs:subClassOf om:DataAsset ;
rdfs:label "Pipeline" ;
rdfs:comment "A workflow or DAG that orchestrates data processing" ;
om:hasProperties [
om:name "string" ;
om:tasks "Task[]" ;
om:scheduleInterval "string" ;
om:service "PipelineService" ;
om:owner "Owner" ;
om:tags "Tag[]" ;
] .
```

**RDF Instance (Turtle)**:

```turtle
@prefix om: <https://open-metadata.org/schema/> .
@prefix ex: <https://example.com/pipelines/> .
@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .
ex:customer_etl a om:Pipeline ;
om:pipelineName "customer_etl" ;
om:fullyQualifiedName "airflow_prod.customer_etl" ;
om:displayName "Customer ETL Pipeline" ;
om:description "Daily pipeline that extracts customer data" ;
om:pipelineUrl "https://airflow.company.com/dags/customer_etl"^^xsd:anyURI ;
om:scheduleInterval "0 2 * * *" ;
om:pipelineStatus om:Successful ;
om:belongsToPipelineService ex:airflow_prod ;
om:pipelineOwnedBy ex:data_engineering_team ;
om:pipelineHasTag ex:tier_gold ;
om:hasTask ex:extract_customers ;
om:hasTask ex:transform_customers ;
om:hasTask ex:load_customers .
```

**JSON-LD Context**:

```json
{
"@context": {
"@vocab": "https://open-metadata.org/schema/",
"om": "https://open-metadata.org/schema/",
"rdfs": "http://www.w3.org/2000/01/rdf-schema#",
"Pipeline": "om:Pipeline",
"name": "om:pipelineName",
"fullyQualifiedName": "om:fullyQualifiedName",
"displayName": "om:displayName",
"description": "om:description",
"pipelineUrl": "om:pipelineUrl",
"scheduleInterval": "om:scheduleInterval",
"tasks": {
"@id": "om:hasTask",
"@type": "@id",
"@container": "@list"
},
"service": {
"@id": "om:belongsToPipelineService",
"@type": "@id"
},
"owner": {
"@id": "om:pipelineOwnedBy",
"@type": "@id"
},
"tags": {
"@id": "om:pipelineHasTag",
"@type": "@id",
"@container": "@set"
}
}
}
```

**JSON-LD Instance**:

```json
{
"@context": "https://open-metadata.org/context/pipeline.jsonld",
"@type": "Pipeline",
"@id": "https://example.com/pipelines/customer_etl",
"name": "customer_etl",
"fullyQualifiedName": "airflow_prod.customer_etl",
"displayName": "Customer ETL Pipeline",
"pipelineUrl": "https://airflow.company.com/dags/customer_etl",
"scheduleInterval": {
"scheduleExpression": "0 2 * * *"
},
"service": {
"@id": "https://example.com/services/airflow_prod",
"@type": "PipelineService"
},
"owner": {
"@id": "https://example.com/teams/data-engineering",
"@type": "Team"
},
"tags": [
{"@id": "https://open-metadata.org/tags/Tier/Gold"}
]
}
```

Pipeline has comprehensive relationships with entities across the metadata platform:

```mermaid
graph TD
subgraph Hierarchy
SVC[PipelineService<br/>airflow_prod]
SVC --> PIPE[Pipeline<br/>customer_etl]
end
subgraph Tasks
PIPE --> TASK1[Task<br/>extract_customers]
PIPE --> TASK2[Task<br/>transform_customers]
PIPE --> TASK3[Task<br/>load_customers]
PIPE --> TASK4[Task<br/>data_quality_check]
TASK1 --> TASK2
TASK2 --> TASK3
TASK3 --> TASK4
end
subgraph Source Tables
SRC1[Table<br/>mongodb.crm.customers] -.->|extracted by| PIPE
SRC2[Table<br/>postgres.raw.customer_data] -.->|read by| PIPE
end
subgraph Target Tables
PIPE -.->|loads to| TGT1[Table<br/>warehouse.customers]
PIPE -.->|creates| TGT2[Table<br/>warehouse.customer_summary]
PIPE -.->|updates| TGT3[Table<br/>warehouse.customer_metrics]
end
subgraph Downstream
TGT1 -.->|feeds| DASH[Dashboard<br/>Customer Analytics]
TGT1 -.->|trains| ML[MLModel<br/>churn_predictor]
TGT2 -.->|consumed by| API[ApiEndpoint<br/>customer_stats]
end
subgraph Ownership
PIPE -.->|owned by| TEAM[Team<br/>Data Engineering]
PIPE -.->|owned by| USER[User<br/>etl.admin]
end
subgraph Governance
PIPE -.->|in domain| DOM[Domain<br/>Customer Data]
PIPE -.->|tagged| TAG1[Tag<br/>Tier.Gold]
PIPE -.->|tagged| TAG2[Tag<br/>Schedule.Daily]
PIPE -.->|tagged| TAG3[Tag<br/>Production]
PIPE -.->|linked to| GT[GlossaryTerm<br/>ETL Process]
end
subgraph Quality
TC1[TestCase<br/>execution_success] -.->|validates| PIPE
TC2[TestCase<br/>data_freshness] -.->|monitors| TGT1
TC3[TestCase<br/>row_count_check] -.->|validates| TGT1
TC4[TestCase<br/>schema_validation] -.->|checks| TGT2
end
subgraph Monitoring
MON[Dashboard<br/>Pipeline Monitor] -.->|tracks| PIPE
ALERT[Alert<br/>failure_notification] -.->|watches| PIPE
LOG[Logs<br/>execution_history] -.->|records| PIPE
end
subgraph Configuration
REPO[Repository<br/>github.com/pipelines] -.->|source code| PIPE
SCHED[Schedule<br/>Daily at 2 AM UTC] -.->|triggers| PIPE
CONFIG[Config<br/>pipeline_config.yaml] -.->|configures| PIPE
end
style SVC fill:#667eea,color:#fff
style PIPE fill:#4facfe,color:#fff,stroke:#4c51bf,stroke-width:3px
style TASK1 fill:#00f2fe,color:#333
style TASK2 fill:#00f2fe,color:#333
style TASK3 fill:#00f2fe,color:#333
style TASK4 fill:#00f2fe,color:#333
style SRC1 fill:#764ba2,color:#fff
style SRC2 fill:#764ba2,color:#fff
style TGT1 fill:#764ba2,color:#fff
style TGT2 fill:#764ba2,color:#fff
style TGT3 fill:#764ba2,color:#fff
style DASH fill:#ff6b6b,color:#fff
style ML fill:#ff6b6b,color:#fff
style API fill:#ff6b6b,color:#fff
style TEAM fill:#43e97b,color:#fff
style USER fill:#43e97b,color:#fff
style DOM fill:#fa709a,color:#fff
style TAG1 fill:#f093fb,color:#fff
style TAG2 fill:#f093fb,color:#fff
style TAG3 fill:#f093fb,color:#fff
style GT fill:#ffd700,color:#333
style TC1 fill:#9b59b6,color:#fff
style TC2 fill:#9b59b6,color:#fff
style TC3 fill:#9b59b6,color:#fff
style TC4 fill:#9b59b6,color:#fff
style MON fill:#00ac69,color:#fff
style ALERT fill:#00ac69,color:#fff
style LOG fill:#00ac69,color:#fff
style REPO fill:#f5576c,color:#fff
style SCHED fill:#f5576c,color:#fff
style CONFIG fill:#f5576c,color:#fff
```
Relationship Types:
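- Solid lines (→): Hierarchical containment (Service contains Pipeline, Pipeline contains Tasks) and, between tasks, execution order (extract_customers → transform_customers → load_customers → data_quality_check)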
- Dashed lines (-.->): References and associations (ownership, governance, lineage)

Related Entities:

- PipelineService: The service managing this pipeline
- Task: Individual tasks/steps within the pipeline
- Owner: User or team owning this pipeline
- Domain: Business domain assignment
- Tag: Classification tags
- GlossaryTerm: Business terminology
- Table: Source tables (upstream) and target tables (downstream)
- Dashboard: Dashboards consuming pipeline outputs or monitoring pipeline health
- MLModel: ML models trained on pipeline outputs
- ApiEndpoint: APIs serving pipeline outputs
- TestCase: Pipeline execution, data quality, and schema validation tests
- Alert: Failure notifications and monitoring alerts
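
Impact analysis walks the dashed lineage edges in the diagram. Starting from a pipeline (or a source table), a breadth-first search finds every asset that a failure could affect. The adjacency list below is transcribed from the diagram and uses its short labels rather than fully qualified names. In practice the edges would come from the lineage stored in the metadata platform. The function name is hypothetical.

```python
from collections import deque

# Lineage edges transcribed from the diagram above (producer -> consumers), using its short labels.
LINEAGE = {
    "mongodb.crm.customers": ["customer_etl"],
    "postgres.raw.customer_data": ["customer_etl"],
    "customer_etl": [
        "warehouse.customers",
        "warehouse.customer_summary",
        "warehouse.customer_metrics",
    ],
    "warehouse.customers": ["Customer Analytics", "churn_predictor"],
    "warehouse.customer_summary": ["customer_stats"],
}


def downstream_of(node: str, graph: dict[str, list[str]]) -> list[str]:
    """Breadth-first search: every node reachable from `node`, nearest first, each listed once."""
    seen = {node}
    order: list[str] = []
    queue = deque([node])
    while queue:
        current = queue.popleft()
        for consumer in graph.get(current, []):
            if consumer not in seen:
                seen.add(consumer)
                order.append(consumer)
                queue.append(consumer)
    return order


print(downstream_of("customer_etl", LINEAGE))
```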
This entity supports custom properties through the extension field.
Common custom properties include:
- Data Classification: Sensitivity level
- Cost Center: Billing allocation
- Retention Period: Data retention requirements
- Application Owner: Owning application/team
See Custom Properties for details on defining and using custom properties.
Users can follow pipelines to receive notifications about configuration changes, task modifications, and schedule updates. See Followers for details.
All Pipeline operations are available under the /v1/pipelines endpoint.
Get a list of pipelines, optionally filtered by service.
```http
GET /v1/pipelines
```
Query Parameters:
- fields: Fields to include (tasks, tags, owner, lineage, pipelineStatus, etc.)
- service: Filter by pipeline service name
- limit: Number of results (1-1000000, default 10)
- before/after: Cursor-based pagination
- include: all | deleted | non-deleted (default: non-deleted)
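
Listing every pipeline of a service means following the pagination cursor until it runs out. Several details are not specified in this section and are assumptions here:

- The `PipelineList` response carries the page items in `data` and the next cursor in `paging.after`.
- The API is reached at a configurable base URL.
- A bearer token is accepted for authentication.

The paging logic takes a fetch function as a parameter, so it can be exercised without a server. All names are hypothetical.

```python
import json
import urllib.parse
import urllib.request
from typing import Any, Callable, Dict, Iterator, Optional

Page = Dict[str, Any]
FetchPage = Callable[[Optional[str]], Page]


def iter_pipelines(fetch_page: FetchPage) -> Iterator[dict]:
    """Yield pipelines page by page, following paging.after until it is missing."""
    after: Optional[str] = None
    while True:
        page = fetch_page(after)
        yield from page.get("data", [])
        after = page.get("paging", {}).get("after")
        if not after:
            return


def http_fetcher(base_url: str, token: str, service: str, limit: int = 100) -> FetchPage:
    """Build a fetch function for GET {base_url}/v1/pipelines (base_url and auth scheme are assumptions)."""
    def fetch(after: Optional[str]) -> Page:
        params: Dict[str, Any] = {"service": service, "limit": limit, "fields": "tasks,owner"}
        if after:
            params["after"] = after
        url = f"{base_url}/v1/pipelines?{urllib.parse.urlencode(params)}"
        request = urllib.request.Request(url, headers={"Authorization": f"Bearer {token}"})
        with urllib.request.urlopen(request) as response:
            return json.load(response)
    return fetch
```

Usage, with a placeholder host: `for p in iter_pipelines(http_fetcher("https://openmetadata.example.com/api", token, "airflow_prod")): print(p["fullyQualifiedName"])`. Keeping the HTTP call behind a function also makes it easy to add retries without touching the cursor logic.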
Response: PipelineList

Create a new pipeline under a pipeline service.
```http
POST /v1/pipelines
Content-Type: application/json

{
"name": "customer_etl",
"service": "airflow_prod",
"description": "Customer data ETL pipeline",
"scheduleInterval": {
"scheduleExpression": "0 2 * * *",
"scheduleType": "CRON"
},
"tasks": [
{
"name": "extract_customers",
"taskType": "PythonOperator",
"description": "Extract customer data from source",
"downstreamTasks": ["transform_data"]
},
{
"name": "transform_data",
"taskType": "SparkSubmitOperator",
"downstreamTasks": ["load_to_warehouse"]
},
{
"name": "load_to_warehouse",
"taskType": "PostgresOperator"
}
]
}
```
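`downstreamTasks` names the tasks that run after a given task, so the `tasks` array of a create request describes a graph that must be acyclic. A topological sort with Kahn's algorithm recovers one valid execution order. It also rejects cycles, and references to undeclared tasks, before the request is sent. The function name is hypothetical and the check is client-side only.

```python
from collections import deque


def execution_order(tasks: list[dict]) -> list[str]:
    """Kahn's algorithm over downstreamTasks edges (task -> tasks that run after it)."""
    indegree = {task["name"]: 0 for task in tasks}
    children: dict[str, list[str]] = {name: [] for name in indegree}
    for task in tasks:
        for child in task.get("downstreamTasks", []):
            if child not in indegree:
                raise ValueError(f"{task['name']!r} lists unknown downstream task {child!r}")
            children[task["name"]].append(child)
            indegree[child] += 1

    # Start with tasks that have no upstream task, in declaration order.
    ready = deque(name for name, degree in indegree.items() if degree == 0)
    order: list[str] = []
    while ready:
        current = ready.popleft()
        order.append(current)
        for child in children[current]:
            indegree[child] -= 1
            if indegree[child] == 0:
                ready.append(child)

    if len(order) != len(indegree):
        raise ValueError("downstreamTasks form a cycle; a pipeline must be a DAG")
    return order


tasks = [
    {"name": "extract_customers", "downstreamTasks": ["transform_data"]},
    {"name": "transform_data", "downstreamTasks": ["load_to_warehouse"]},
    {"name": "load_to_warehouse"},
]
print(execution_order(tasks))  # ['extract_customers', 'transform_data', 'load_to_warehouse']
```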
Response: Pipeline

Get a pipeline by its fully qualified name.
```http
GET /v1/pipelines/name/{fqn}
```
Query Parameters:
- fields: Fields to include (tasks, tags, owner, lineage, etc.)
- include: all | deleted | non-deleted
Example:
```http
GET /v1/pipelines/name/airflow_prod.customer_etl?fields=tasks,tags,owner,lineage,pipelineStatus
```
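The fully qualified name travels as a single path segment, so any character outside the URL-unreserved set (a space, for instance) should be percent-encoded. The sketch below uses only the standard library. `base_url` is a placeholder and the function name is hypothetical.

```python
from urllib.parse import quote, urlencode


def pipeline_by_name_url(base_url: str, fqn: str, fields: list, include: str = "non-deleted") -> str:
    """Build GET {base_url}/v1/pipelines/name/{fqn} with the FQN encoded as one path segment."""
    segment = quote(fqn, safe="")  # also encodes '/', spaces, and quotes
    query = urlencode({"fields": ",".join(fields), "include": include})
    return f"{base_url}/v1/pipelines/name/{segment}?{query}"


print(pipeline_by_name_url("http://localhost:8585/api", "airflow_prod.customer_etl",
                           ["tasks", "tags", "owner", "lineage", "pipelineStatus"]))
```

`urlencode` encodes the commas in `fields` as `%2C`, which servers decode back to the same list.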
Response: Pipeline

Get a pipeline by its unique identifier.
```http
GET /v1/pipelines/{id}
```
Query Parameters:
- fields: Fields to include
- include: all | deleted | non-deleted
Response: Pipeline

Update a pipeline using JSON Patch.
```http
PATCH /v1/pipelines/name/{fqn}
Content-Type: application/json-patch+json

[
{"op": "replace", "path": "/scheduleInterval/scheduleExpression", "value": "0 3 * * *"},
{"op": "add", "path": "/tags/-", "value": {"tagFQN": "Critical"}},
{"op": "replace", "path": "/description", "value": "Updated ETL pipeline"}
]
```
Response: Pipeline

Create a new pipeline or update if it exists.
```http
PUT /v1/pipelines
Content-Type: application/json

{
"name": "orders_pipeline",
"service": "airflow_prod",
"scheduleInterval": {...},
"tasks": [...]
}
```
Response: Pipeline

Delete a pipeline by fully qualified name.
```http
DELETE /v1/pipelines/name/{fqn}
```
Query Parameters:
- hardDelete: Permanently delete (default: false)
Response: 200 OK

Update tasks in a pipeline.
```http
PUT /v1/pipelines/{id}/tasks
Content-Type: application/json

{
"tasks": [
{
"name": "data_quality_check",
"taskType": "PythonOperator",
"description": "Run data quality tests"
}
]
}
```
Response: Pipeline

Get the execution status of a pipeline.
```http
GET /v1/pipelines/{id}/pipelineStatus
```
Response: PipelineStatus (latest runs, success/failure, execution time)

Update the status of a pipeline execution.
```http
PUT /v1/pipelines/{id}/pipelineStatus
Content-Type: application/json

{
"executionStatus": "Successful",
"timestamp": 1705276800000,
"executionDate": "2024-01-15"
}
```
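When reporting a run, `timestamp` and `executionDate` should describe the same instant. The sketch derives both from one UTC datetime. It assumes `timestamp` is Unix epoch milliseconds, like `updatedAt` in the field reference. The helper name is hypothetical.

```python
from datetime import datetime, timezone

PIPELINE_STATUSES = {"Successful", "Failed", "Pending", "Running", "Stopped", "Skipped", "UpForRetry", "Queued"}


def status_payload(status: str, run_start: datetime) -> dict:
    """Build a pipelineStatus body whose timestamp (epoch ms) and executionDate share one UTC instant."""
    if status not in PIPELINE_STATUSES:
        raise ValueError(f"unknown status: {status!r}")
    if run_start.tzinfo is None:
        raise ValueError("run_start must be timezone-aware to avoid local-time drift")
    utc = run_start.astimezone(timezone.utc)
    return {
        "executionStatus": status,
        "timestamp": int(utc.timestamp() * 1000),
        "executionDate": utc.date().isoformat(),
    }


print(status_payload("Successful", datetime(2024, 1, 15, tzinfo=timezone.utc)))
```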
Response: PipelineStatus

Get all versions of a pipeline.
```http
GET /v1/pipelines/{id}/versions
```
Response: EntityHistory

Add a follower to a pipeline.
```http
PUT /v1/pipelines/{id}/followers/{userId}
```
Response: ChangeEvent

Get all followers of a pipeline.
```http
GET /v1/pipelines/{id}/followers
```
Response: EntityReference[]

Create or update multiple pipelines.
```http
PUT /v1/pipelines/bulk
Content-Type: application/json

{
"entities": [...]
}
```
Response: BulkOperationResult

Related Documentation:

- Pipeline Service - Service configuration
- Task - Task specification
- Lineage - Pipeline lineage tracking
- Data Quality - Testing pipeline outputs