Skip to content

Commit 3516670

Browse files
authored
chore(DAG-SINAN): proposal to split DAGs by disease (#211)
* chore(DAG-SINAN): proposal to split DAGs by disease
* Changing nomenclature to follow EGH table pattern
* change schemaname back to 'brasil'
* Fix \x00 error on SIFG
* Linter
* Fix docstrings
* Metadata sheets
* Fixing metadata files
* extract.medatada method
* Unittest for metadata
* Extracting all metadata & types from xlsx files
* Linter
* Poetry lock
* Passing files to PySUS
* Move changes to PySUS
* Minor fixes
* SQLAlchemy 2.0 breaks airflow
* Poetry lock
1 parent 2f33d6e commit 3516670

File tree

8 files changed

+3475
-3066
lines changed

8 files changed

+3475
-3066
lines changed

conda/dev.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ dependencies:
88
- nodejs
99
- pandas
1010
- pip
11-
- poetry
11+
- poetry >= 1.3.2
1212
- psycopg2
1313
- python 3.9.*
1414
- shellcheck
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
import unicodedata
2+
3+
from pysus.online_data import SINAN
4+
5+
DISEASES = SINAN.agravos
6+
7+
8+
def normalize_str(disease: str) -> str:
    """
    Normalize a disease name into a table-name-friendly slug.

    Animais Peçonhentos -> animais_peconhentos
    """
    # Decompose accented characters (NFKD), then drop the non-ASCII
    # combining marks so e.g. "ç" becomes "c".
    decomposed = unicodedata.normalize("NFKD", disease)
    ascii_only = decomposed.encode("ascii", "ignore").decode()
    return ascii_only.lower().replace(" ", "_")
Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,34 @@
1-
from pathlib import PosixPath
1+
import os
2+
from pathlib import Path
23

4+
import pandas as pd
35
from loguru import logger
4-
from pysus.online_data import SINAN
6+
from pysus import SINAN
57

68
from epigraphhub.data._config import PYSUS_DATA_PATH, SINAN_LOG_PATH
79

810
logger.add(SINAN_LOG_PATH, retention="7 days")
911

10-
diseases = SINAN.agravos
1112

12-
13-
def download(disease: str, years: list = None) -> None:
    """
    Download the SINAN parquet files for a disease,
    according to `SINAN.agravos`.

    Attrs:
        disease (str): The disease to be downloaded.
        years (list): Specific years to download; when None, every
            available year for the disease is fetched.
    """

    # PySUS writes the parquet chunks under PYSUS_DATA_PATH; a None
    # `years` is forwarded as-is and means "all available years".
    SINAN.download_parquets(disease, years, data_path=PYSUS_DATA_PATH)

    logger.info(f"All years for {disease} downloaded at {PYSUS_DATA_PATH}")
25+
26+
27+
def metadata_df(disease: str) -> pd.DataFrame:
    """
    Return a DataFrame containing metadata for a SINAN disease.

    Attrs:
        disease (str): The disease whose metadata sheet is fetched.

    Returns:
        pd.DataFrame with the disease metadata, or None when no
        metadata sheet is available (the failure is logged, not raised).
    """
    try:
        return SINAN.metadata_df(disease)
    except Exception as e:
        # Best-effort: missing metadata is not fatal, but keep the
        # underlying error in the log instead of swallowing it silently.
        logger.error(f"Metadata not available for {disease}: {e}")
        return None
Lines changed: 26 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,55 +1,62 @@
11
import os
22
from pathlib import Path
33

4+
import pandas as pd
5+
from pysus import SINAN
46
from loguru import logger
57
from pangres import upsert
6-
from pysus.online_data import parquets_to_dataframe as to_df
8+
from pysus.classes.sinan import Disease
79

810
from epigraphhub.connection import get_engine
9-
from epigraphhub.data._config import PYSUS_DATA_PATH, SINAN_LOG_PATH
11+
from epigraphhub.data._config import SINAN_LOG_PATH, PYSUS_DATA_PATH
1012
from epigraphhub.settings import env
1113

12-
logger.add(SINAN_LOG_PATH, retention="7 days")
14+
from . import normalize_str
1315

16+
logger.add(SINAN_LOG_PATH, retention="7 days")
1417
engine = get_engine(credential_name=env.db.default_credential)
1518

1619

17-
def upload(disease: str, data_path: str = PYSUS_DATA_PATH):
    """
    Connect to the EpiGraphHub SQL server and load parquet chunks,
    extracted using `extract.download`, into the database.

    Looks for the local parquet paths of `disease` under `data_path`,
    converts each year's chunks into a DataFrame and upserts the rows
    into Postgres following the EGH table convention, see:
    https://epigraphhub.readthedocs.io/en/latest/instruction_name_tables.html#about-metadata-tables

    Attrs:
        disease (str): The disease to be uploaded.
        data_path (str): Directory containing the local parquet files.

    Raises:
        Exception: re-raised when the upsert for any year fails.
    """
    disease_years = Disease(disease).get_years(stage="all")

    # Table name and schema are year-independent; compute them once
    # instead of on every loop iteration.
    tablename = "sinan_" + normalize_str(disease) + "_m"
    schema = "brasil"

    for year in disease_years:
        df = SINAN.parquets_to_df(disease, year, data_path)

        # Guard clause: nothing extracted for this year, move on.
        if df.empty:
            logger.warning(f"No data for {disease} and year {year}. Skipping")
            continue

        df.columns = df.columns.str.lower()
        df.index.name = "index"

        # Use the module logger instead of print() so progress shows up
        # in SINAN_LOG_PATH alongside the error messages.
        logger.info(f"Inserting {disease}-{year} on {schema}.{tablename}")

        with engine.connect() as conn:
            try:
                upsert(
                    con=conn,
                    df=df,
                    table_name=tablename,
                    schema=schema,
                    if_row_exists="update",
                    chunksize=1000,
                    add_new_columns=True,
                    create_table=True,
                )

                logger.info(f"Table {tablename} updated")

            except Exception as e:
                logger.error(f"Not able to upsert {tablename} \n{e}")
                # Bare raise preserves the original traceback.
                raise
Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,19 @@
11
import pandas as pd
22
from loguru import logger
3-
from pysus.online_data import SINAN
4-
from pysus.online_data import parquets_to_dataframe as to_df
3+
from pysus import SINAN
54

65
from epigraphhub.connection import get_engine
76
from epigraphhub.data._config import SINAN_LOG_PATH
87
from epigraphhub.settings import env
98

9+
from . import normalize_str
10+
1011
logger.add(SINAN_LOG_PATH, retention="7 days")
1112

1213
engine = get_engine(credential_name=env.db.default_credential)
13-
aggrs = SINAN.agravos
1414

1515

16-
def parquet(ppath: str, clean_after_read=False) -> pd.DataFrame:
16+
def parquet(disease: str, year: str|int) -> pd.DataFrame:
1717
"""
1818
Convert the parquet files into a pandas DataFrame.
1919
@@ -27,14 +27,13 @@ def parquet(ppath: str, clean_after_read=False) -> pd.DataFrame:
2727
df (DataFrame) : A Pandas DataFrame.
2828
"""
2929

30-
df = to_df(str(ppath), clean_after_read)
31-
logger.info("Parquet files converted to dataFrame")
30+
df = SINAN.parquet_to_df(disease, year)
3231
df.columns = df.columns.str.lower()
3332

3433
return df
3534

3635

37-
def table(disease: str, year: int) -> pd.DataFrame:
36+
def table(disease: str) -> pd.DataFrame:
3837
"""
3938
Connect to EGH SQL server and retrieve the data by disease and year.
4039
@@ -48,12 +47,10 @@ def table(disease: str, year: int) -> pd.DataFrame:
4847
4948
"""
5049

51-
year = str(year)[-2:].zfill(2)
52-
disease = SINAN.check_case(disease)
53-
dis_code = aggrs[disease].lower()
54-
tablename = f"{dis_code}{year}"
50+
tablename = "sinan_" + normalize_str(disease) + "_m"
51+
schema = "brasil"
5552

5653
with engine.connect() as conn:
57-
df = pd.read_sql(f"SELECT * FROM brasil.{tablename}", conn)
54+
df = pd.read_sql(f"SELECT * FROM {schema}.{tablename}", conn)
5855

5956
return df

0 commit comments

Comments
 (0)