Skip to content

Commit 99e5b35

Browse files
committed
workflow orchestration
1 parent 2590c6b commit 99e5b35

File tree

6 files changed

+130
-0
lines changed

6 files changed

+130
-0
lines changed

dag/.prefectignore

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
# prefect artifacts
2+
.prefectignore
3+
4+
# python artifacts
5+
__pycache__/
6+
*.py[cod]
7+
*$py.class
8+
*.egg-info/
9+
*.egg
10+
11+
# Type checking artifacts
12+
.mypy_cache/
13+
.dmypy.json
14+
dmypy.json
15+
.pyre/
16+
17+
# IPython
18+
profile_default/
19+
ipython_config.py
20+
*.ipynb_checkpoints/*
21+
22+
# Environments
23+
.python-version
24+
.env
25+
.venv
26+
env/
27+
venv/
28+
29+
# MacOS
30+
.DS_Store
31+
32+
# Dask
33+
dask-worker-space/
34+
35+
# Editors
36+
.idea/
37+
.vscode/
38+
*.yaml

dag/Dockerfile

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
# We're using the latest version of Prefect with Python 3.10
2+
FROM prefecthq/prefect:2-python3.10
3+
4+
# Add our requirements.txt file to the image and install dependencies
5+
COPY requirements.txt .
6+
RUN pip install -r requirements.txt --trusted-host pypi.python.org --no-cache-dir
7+
8+
# Add our flow code to the image
9+
ADD flows /opt/prefect/flows

dag/flows/healthcheck.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
import platform
2+
import prefect
3+
from prefect import task, flow, get_run_logger
4+
import sys
5+
6+
7+
@task
8+
def log_platform_info():
9+
logger = get_run_logger()
10+
logger.info("Host's network name = %s", platform.node())
11+
logger.info("Python version = %s", platform.python_version())
12+
logger.info("Platform information (instance type) = %s ", platform.platform())
13+
logger.info("OS/Arch = %s/%s", sys.platform, platform.machine())
14+
logger.info("Prefect Version = %s 🚀", prefect.__version__)
15+
16+
17+
@flow
18+
def healthcheck():
19+
log_platform_info()
20+
21+
22+
if __name__ == "__main__":
23+
healthcheck()

dag/flows/sample.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
import httpx
2+
from prefect import flow
3+
4+
5+
@flow(log_prints=True)
6+
def get_repo_info(repo_name: str = "PrefectHQ/prefect"):
7+
url = f"https://api.github.com/repos/{repo_name}"
8+
response = httpx.get(url)
9+
response.raise_for_status()
10+
repo = response.json()
11+
print(f"{repo_name} repository statistics 🤓:")
12+
print(f"Stars 🌠 : {repo['stargazers_count']}")
13+
print(f"Forks 🍴 : {repo['forks_count']}")
14+
15+
16+
if __name__ == "__main__":
17+
get_repo_info.serve(name="prefect-docker-guide")

dag/flows/ss_etl.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
from prefect import flow, task
2+
3+
from steam_sales.steam_etl import SteamDataClean, SteamSpyFetcher, SteamSpyMetadataFetcher, SteamStoreFetcher
4+
5+
6+
@task(name="Fetch & Update Metadata")
7+
def update_metadata():
8+
fetcher = SteamSpyMetadataFetcher()
9+
fetcher.run()
10+
return 0
11+
12+
13+
@task(name="Fetch & Update SteamSpy data")
14+
def update_steamspy_data():
15+
fetcher = SteamSpyFetcher()
16+
fetcher.run()
17+
18+
19+
@task(name="Fetch & Update SteamStore data")
20+
def update_steamstore_data():
21+
fetcher = SteamStoreFetcher()
22+
fetcher.run()
23+
24+
25+
@task(name="Clean New Steamstore database")
26+
def clean_steamstore_data():
27+
cleaner = SteamDataClean()
28+
cleaner.ingest()
29+
30+
31+
@flow(name="Steam ETL Pipeline", log_prints=True)
32+
def steam_etl():
33+
update_metadata()
34+
update_steamspy_data()
35+
update_steamstore_data()
36+
clean_steamstore_data()
37+
38+
39+
if __name__ == "__main__":
40+
steam_etl.serve(name="steam_etl")

dag/requirements.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
prefect>=2.12.0
2+
steamstore_etl
3+
httpx

0 commit comments

Comments
 (0)