Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
224 changes: 108 additions & 116 deletions README.md

Large diffs are not rendered by default.

28 changes: 20 additions & 8 deletions datacontract/export/custom_exporter.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
from pathlib import Path

from jinja2 import Environment, FileSystemLoader
from open_data_contract_standard.model import OpenDataContractStandard
from open_data_contract_standard.model import OpenDataContractStandard, SchemaObject

from datacontract.export.exporter import Exporter
from datacontract.export.exporter import Exporter, _check_schema_name_for_export


class CustomExporter(Exporter):
Expand All @@ -22,16 +22,28 @@ def export(
if template is None:
raise RuntimeError("Export to custom requires template argument.")

return to_custom(data_contract, template)
if schema_name and schema_name != "all":
schema_name, model_obj = _check_schema_name_for_export(data_contract, schema_name, self.export_format)
return to_custom(data_contract, template, schema_name=schema_name, schema=model_obj)
else:
return to_custom(data_contract, template)


def to_custom(
    data_contract: OpenDataContractStandard,
    template_path: Path,
    schema_name: str | None = None,
    schema: SchemaObject | None = None,
) -> str:
    """Render a custom Jinja2 template against a data contract.

    Args:
        data_contract: Contract exposed to the template as ``data_contract``.
        template_path: Path of the template file; loaded via ``get_template``.
        schema_name: Name exposed to the template as ``schema_name``. It is
            only added to the context together with ``schema``.
        schema: A single schema object exposed to the template as ``schema``.
            When ``None``, the template only sees ``data_contract``.

    Returns:
        The rendered template text.
    """
    template = get_template(template_path)
    context = {"data_contract": data_contract}
    if schema is not None:
        # Per-schema export: give the template direct access to the selected
        # schema so it does not have to search the contract itself.
        context["schema"] = schema
        context["schema_name"] = schema_name
    return template.render(**context)


def get_template(path: Path | str):
    """Load a Jinja2 template from a file on disk.

    The template's parent directory becomes the loader search path, so the
    template can reference sibling files by name.

    Args:
        path: Location of the template file, as a ``Path`` or a string.

    Returns:
        The template object returned by ``Environment.get_template``.
    """
    absolute_path = Path(path).resolve()
    env = Environment(loader=FileSystemLoader(str(absolute_path.parent)))
    # Look the template up by the resolved file name: this works when `path`
    # is a plain string and stays consistent with the loader directory, which
    # is derived from the resolved path as well.
    return env.get_template(absolute_path.name)
86 changes: 86 additions & 0 deletions tests/fixtures/custom/export_model/datacontract.odcs.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
kind: DataContract
apiVersion: v3.1.0
id: orders-unit-test
name: Orders Unit Test
version: 1.0.0
status: active
description:
purpose: The orders data contract
team:
name: checkout
description: Checkout team
servers:
- server: production
type: bigquery
environment: production
account: my-account
project: my-database
dataset: my-schema
schema:
- name: orders
businessName: orders
physicalType: table
description: The orders model
properties:
- name: order_id
businessName: Order ID
logicalType: string
physicalType: varchar
unique: true
required: true
classification: sensitive
tags:
- order_id
logicalTypeOptions:
minLength: 8
maxLength: 10
pattern: ^B[0-9]+$
customProperties:
- property: pii
value: "true"
examples:
- B12345678
- B12345679
- name: order_total
logicalType: integer
physicalType: bigint
required: true
description: The order_total field
logicalTypeOptions:
minimum: 0
maximum: 1000000
quality:
- type: sql
description: 95% of all order total values are expected to be between 10 and 499 EUR.
query: |
SELECT quantile_cont(order_total, 0.95) AS percentile_95
FROM orders
mustBeBetween:
- 1000
- 49900
- name: order_status
logicalType: string
physicalType: text
required: true
customProperties:
- property: enum
value: "[\"pending\", \"shipped\", \"delivered\"]"
- name: user_id
businessName: User ID
logicalType: string
physicalType: varchar
required: true
relationships:
- type: foreignKey
to: users.user_id
- name: users
businessName: users
physicalType: table
description: The users model
properties:
- name: user_id
businessName: User ID
logicalType: string
physicalType: varchar
unique: true
required: true
216 changes: 216 additions & 0 deletions tests/fixtures/custom/export_model/datacontract.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
dataContractSpecification: 1.2.1
id: urn:datacontract:checkout:orders-latest
info:
title: Orders Latest
version: 2.0.0
description: |
Successful customer orders in the webshop.
All orders since 2020-01-01.
Orders with their line items are in their current state (no history included).
owner: Checkout Team
contact:
name: John Doe (Data Product Owner)
url: https://teams.microsoft.com/l/channel/example/checkout
servers:
production:
type: s3
environment: prod
location: s3://datacontract-example-orders-latest/v2/{model}/*.json
format: json
delimiter: new_line
description: "One folder per model. One file per day."
roles:
- name: analyst_us
description: Access to the data for US region
- name: analyst_cn
description: Access to the data for China region
terms:
usage: |
Data can be used for reports, analytics and machine learning use cases.
Order may be linked and joined by other tables
limitations: |
Not suitable for real-time use cases.
Data may not be used to identify individual customers.
Max data processing per day: 10 TiB
policies:
- name: privacy-policy
url: https://example.com/privacy-policy
- name: license
description: External data is licensed under agreement 1234.
url: https://example.com/license/1234
billing: 5000 USD per month
noticePeriod: P3M
models:
orders:
description: One record per order. Includes cancelled and deleted orders.
type: table
fields:
order_id:
$ref: "#/definitions/order_id"
required: true
unique: true
primaryKey: true
order_timestamp:
description: The business timestamp in UTC when the order was successfully registered in the source system and the payment was successful.
type: timestamp
required: true
examples:
- "2024-09-09T08:30:00Z"
tags: ["business-timestamp"]
order_total:
description: Total amount in the smallest monetary unit (e.g., cents).
type: long
required: true
examples:
- 9999
quality:
- type: sql
description: 95% of all order total values are expected to be between 10 and 499 EUR.
query: |
SELECT quantile_cont(order_total, 0.95) AS percentile_95
FROM orders
mustBeBetween: [1000, 49900]
customer_id:
description: Unique identifier for the customer.
type: text
minLength: 10
maxLength: 20
customer_email_address:
description: The email address, as entered by the customer.
type: text
format: email
required: true
pii: true
classification: sensitive
quality:
- type: text
description: The email address is not verified and may be invalid.
lineage:
inputFields:
- namespace: com.example.service.checkout
name: checkout_db.orders
field: email_address
processed_timestamp:
description: The timestamp when the record was processed by the data platform.
type: timestamp
required: true
config:
jsonType: string
jsonFormat: date-time
quality:
- type: sql
description: The maximum duration between two orders should be less than 3600 seconds
query: |
SELECT MAX(duration) AS max_duration FROM (SELECT EXTRACT(EPOCH FROM (order_timestamp - LAG(order_timestamp)
OVER (ORDER BY order_timestamp))) AS duration FROM orders)
mustBeLessThan: 3600
- type: sql
description: Row Count
query: |
SELECT count(*) as row_count
FROM orders
mustBeGreaterThan: 5
examples:
- |
order_id,order_timestamp,order_total,customer_id,customer_email_address,processed_timestamp
"1001","2030-09-09T08:30:00Z",2500,"1000000001","customer-1000000001@example.com","2030-09-09T08:31:00Z"
"1002","2030-09-08T15:45:00Z",1800,"1000000002","customer-1000000002@example.com","2030-09-09T08:31:00Z"
"1003","2030-09-07T12:15:00Z",3200,"1000000003","customer-1000000003@example.com","2030-09-09T08:31:00Z"
"1004","2030-09-06T19:20:00Z",1500,"1000000004","customer-1000000004@example.com","2030-09-09T08:31:00Z"
"1005","2030-09-05T10:10:00Z",4200,"1000000004","customer-1000000004@example.com","2030-09-09T08:31:00Z"
"1006","2030-09-04T14:55:00Z",2800,"1000000005","customer-1000000005@example.com","2030-09-09T08:31:00Z"
"1007","2030-09-03T21:05:00Z",1900,"1000000006","customer-1000000006@example.com","2030-09-09T08:31:00Z"
"1008","2030-09-02T17:40:00Z",3600,"1000000007","customer-1000000007@example.com","2030-09-09T08:31:00Z"
"1009","2030-09-01T09:25:00Z",3100,"1000000008","customer-1000000008@example.com","2030-09-09T08:31:00Z"
"1010","2030-08-31T22:50:00Z",2700,"1000000009","customer-1000000009@example.com","2030-09-09T08:31:00Z"
line_items:
description: A single article that is part of an order.
type: table
fields:
line_item_id:
type: text
description: Primary key of the line_items table
required: true
order_id:
$ref: "#/definitions/order_id"
references: orders.order_id
sku:
description: The purchased article number
$ref: "#/definitions/sku"
primaryKey: ["order_id", "line_item_id"]
examples:
- |
line_item_id,order_id,sku
"LI-1","1001","5901234123457"
"LI-2","1001","4001234567890"
"LI-3","1002","5901234123457"
"LI-4","1002","2001234567893"
"LI-5","1003","4001234567890"
"LI-6","1003","5001234567892"
"LI-7","1004","5901234123457"
"LI-8","1005","2001234567893"
"LI-9","1005","5001234567892"
"LI-10","1005","6001234567891"
definitions:
order_id:
title: Order ID
type: text
format: uuid
description: An internal ID that identifies an order in the online shop.
examples:
- 243c25e5-a081-43a9-aeab-6d5d5b6cb5e2
pii: true
classification: restricted
tags:
- orders
sku:
title: Stock Keeping Unit
type: text
pattern: ^[A-Za-z0-9]{8,14}$
examples:
- "96385074"
description: |
A Stock Keeping Unit (SKU) is an internal unique identifier for an article.
It is typically associated with an article's barcode, such as the EAN/GTIN.
links:
wikipedia: https://en.wikipedia.org/wiki/Stock_keeping_unit
tags:
- inventory
servicelevels:
availability:
description: The server is available during support hours
percentage: 99.9%
retention:
description: Data is retained for one year
period: P1Y
unlimited: false
latency:
description: Data is available within 25 hours after the order was placed
threshold: 25h
sourceTimestampField: orders.order_timestamp
processedTimestampField: orders.processed_timestamp
freshness:
description: The age of the youngest row in a table.
threshold: 25h
timestampField: orders.order_timestamp
frequency:
description: Data is delivered once a day
type: batch # or streaming
interval: daily # for batch: set either this or cron
cron: 0 0 * * * # for batch: set either this or interval
support:
description: The data is available during typical business hours at headquarters
time: 9am to 5pm in EST on business days
responseTime: 1h
backup:
description: Data is backed up once a week, every Sunday at 0:00 UTC.
interval: weekly
cron: 0 0 * * 0
recoveryTime: 24 hours
recoveryPoint: 1 week
tags:
- checkout
- orders
- s3
links:
datacontractCli: https://cli.datacontract.com
6 changes: 6 additions & 0 deletions tests/fixtures/custom/export_model/expected.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@

SELECT
line_item_id AS line_item_id,
order_id AS order_id,
sku AS sku,
FROM {{ ref('line_items') }}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@

select
try_cast(user_id as varchar) as user_id
from {{ source('orders-unit-test', 'users') }}
4 changes: 4 additions & 0 deletions tests/fixtures/custom/export_model/expected_odcs_users.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@

SELECT
user_id AS user_id,
FROM {{ ref('users') }}
6 changes: 6 additions & 0 deletions tests/fixtures/custom/export_model/expected_stg.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@

select
try_cast(line_item_id as text) as line_item_id
try_cast(order_id as text) as order_id
try_cast(sku as text) as sku
from {{ source('orders-latest', 'line_items') }}
10 changes: 10 additions & 0 deletions tests/fixtures/custom/export_model/template.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@

SELECT
{%- for field in schema.properties %}
{%- if field.physicalType == "timestamp" %}
DATETIME({{ field.name }}, "Asia/Tokyo") AS {{ field.name }},
{%- else %}
{{ field.name }} AS {{ field.name }},
{%- endif %}
{%- endfor %}
FROM {{ "{{" }} ref('{{ schema_name }}') {{ "}}" }}
6 changes: 6 additions & 0 deletions tests/fixtures/custom/export_model/template_stg.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@

select
{%- for field in schema.properties %}
try_cast({{ field.name }} as {{ field.physicalType | lower }}) as {{ field.name }}
{%- endfor %}
from {{ "{{" }} source('{{ data_contract.id.split(':')[-1] }}', '{{ schema_name }}') {{ "}}" }}
Loading
Loading