5 files changed: +26 −17 lines changed

+ .databricks/
+ .venv/
+ *.pyc
+ __pycache__/
+ .pytest_cache/
+ dist/
+ build/
+ covid_analysis.egg-info/

+ {
+   "python.testing.pytestArgs": ["."],
+   "python.testing.unittestEnabled": false,
+   "python.testing.pytestEnabled": true
+ }

@@ -2,7 +2,6 @@
Python Spark job that imports the latest COVID-19 hospitalization data
'''
import sys
- import urllib.request
import pandas as pd
from pyspark.sql import SparkSession

@@ -13,27 +12,26 @@
# check if job is running in production mode
is_prod = len(sys.argv) >= 2 and sys.argv[1] == "--prod"

- urllib.request.urlretrieve("https://raw.githubusercontent.com/owid/covid-19-data/master/public/data/hospitalizations/covid-hospitalizations.csv", "/tmp/covid-hospitalizations.csv")
-
# read from /tmp, subset for USA, pivot and fill missing values
- df = pd.read_csv("/tmp/covid-hospitalizations.csv")
- df = filter_country(df, country='DZA')
- df = pivot_and_clean(df, fillna=0)
+ df = pd.read_csv(
+     "https://raw.githubusercontent.com/owid/covid-19-data/master/public/data/hospitalizations/covid-hospitalizations.csv")
+
+ df = filter_country(df)
+ df = pivot_and_clean(df, fillna=0)
df = clean_spark_cols(df)
df = index_to_col(df, colname='date')

# Convert from Pandas to a pyspark sql DataFrame.
df = spark.createDataFrame(df)

- print("Covid data successfully imported.")
-
# only write table in production mode
if is_prod:
  # Write to Delta Lake
  df.write.mode('overwrite').saveAsTable('covid_stats')
+   print("Covid data successfully imported.")

- # display sample data
+ # display sample data
+ if is_prod:
  spark.sql('select * from covid_stats').show(10)
-
-
-
+ else:
+   df.show(10)
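
The helpers the job calls (filter_country, pivot_and_clean, clean_spark_cols, index_to_col) come from the covid_analysis.transforms module that the test file below also imports; the module itself is not part of this diff. A minimal sketch of what those functions might look like, assuming plain pandas implementations and the OWID covid-hospitalizations.csv columns (iso_code, indicator, date, value); none of this is taken from the actual repository:

# Hypothetical sketch of covid_analysis/transforms.py; function names match the
# calls in the job, but bodies, defaults, and column names are assumptions.
import pandas as pd

def filter_country(df: pd.DataFrame, country: str = 'USA') -> pd.DataFrame:
  # Keep rows for a single ISO country code; the default is assumed to be USA.
  return df[df.iso_code == country]

def pivot_and_clean(df: pd.DataFrame, fillna: int) -> pd.DataFrame:
  # One row per date, one column per indicator, with missing values filled.
  df = df.pivot_table(values='value', columns='indicator', index='date')
  return df.fillna(fillna)

def clean_spark_cols(df: pd.DataFrame) -> pd.DataFrame:
  # Spark column names cannot contain spaces or commas, so strip them out.
  df.columns = df.columns.str.replace(' ', '_').str.replace(',', '')
  return df

def index_to_col(df: pd.DataFrame, colname: str) -> pd.DataFrame:
  # Copy the date index into an ordinary column so Spark keeps it after conversion.
  df[colname] = df.index
  return df

Under these assumptions, the pipeline reduces the long per-country, per-indicator CSV to a date-indexed wide table before handing it to Spark.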

+ import sys
+ sys.path.append('.')
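
Assuming the two-line file above is a pytest conftest.py at the repository root (the filename is not visible in this extract), appending '.' to sys.path is what lets the test module below import the local package without installing it first, for example:

# Hypothetical example: this import resolves once the repo root is on sys.path,
# e.g. when pytest is launched from the root as configured in the VS Code
# settings above ("python.testing.pytestArgs": ["."]).
from covid_analysis.transforms import filter_country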

@@ -1,11 +1,7 @@
# Test each of the transform functions.
import pytest
- from textwrap import fill
- import os
import pandas as pd
- import numpy as np
from covid_analysis.transforms import *
- from pyspark.sql import SparkSession


@pytest.fixture
@@ -30,7 +26,7 @@ def colnames_df() -> pd.DataFrame:
    ],
  )
  return df
-
+

# Make sure the filter works as expected.
def test_filter(raw_input_df):
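
test_filter takes a raw_input_df fixture whose definition is collapsed out of the hunk above. A minimal sketch of what such a fixture could look like, assuming the same OWID column layout the job reads; the sample rows and values are made up for illustration:

# Hypothetical fixture; the real one in the test file is not shown in this diff.
import pandas as pd
import pytest

@pytest.fixture
def raw_input_df() -> pd.DataFrame:
  return pd.DataFrame(
    {
      'iso_code': ['USA', 'USA', 'FRA'],
      'date': ['2022-01-01', '2022-01-02', '2022-01-01'],
      'indicator': ['Daily hospital occupancy'] * 3,
      'value': [10.0, 12.0, 7.0],
    }
  )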