1 | 1 | import os |
2 | | -from pathlib import Path |
3 | 2 |
4 | | -import pandas as pd |
5 | | -from pysus import SINAN |
6 | 3 | from loguru import logger |
7 | 4 | from pangres import upsert |
8 | | -from pysus.classes.sinan import Disease |
| 5 | +from pysus.online_data import parquets_to_dataframe |
9 | 6 |
10 | 7 | from epigraphhub.connection import get_engine |
11 | | -from epigraphhub.data._config import SINAN_LOG_PATH, PYSUS_DATA_PATH |
| 8 | +from epigraphhub.data._config import SINAN_LOG_PATH |
12 | 9 | from epigraphhub.settings import env |
13 | 10 |
14 | 11 | from . import normalize_str |
17 | 14 | engine = get_engine(credential_name=env.db.default_credential) |
18 | 15 |
19 | 16 |
20 | | -def upload(disease: str, data_path: str = PYSUS_DATA_PATH): |
| 17 | +def upload(disease: str, parquet_dir: str) -> None: |
21 | 18 | """ |
22 | 19 | Connects to the EpiGraphHub SQL server and loads parquet chunks within
23 | | - directories, extracted using `extract.download`, into database. Receives |
24 | | - a disease and look for local parquets paths in PYSUS_DATA_PATH, extract theirs |
25 | | - DataFrames and upsert rows to Postgres connection following EGH table |
26 | | - convention, see more in EGH's documentation: |
| 20 | + directories, extracted using `extract.download`, into the database.
| 21 | + Receives a disease and a local parquet dir (e.g. ~/pysus/ZIKABR19.parquet),
| 22 | + extracts its DataFrame and upserts the rows to the Postgres connection
| 23 | + following the EGH table convention; see more in EGH's documentation:
27 | 24 | https://epigraphhub.readthedocs.io/en/latest/instruction_name_tables.html#about-metadata-tables |
28 | 25 | """ |
29 | | - disease_years = Disease(disease).get_years(stage='all') |
30 | | - |
31 | | - for year in disease_years: |
32 | | - df = SINAN.parquets_to_df(disease, year, data_path) |
33 | | - if not df.empty: |
34 | | - df.columns = df.columns.str.lower() |
35 | | - df.index.name = "index" |
36 | | - |
37 | | - tablename = "sinan_" + normalize_str(disease) + "_m" |
38 | | - schema = "brasil" |
39 | | - |
40 | | - print(f"Inserting {disease}-{year} on {schema}.{tablename}") |
41 | | - |
42 | | - with engine.connect() as conn: |
43 | | - try: |
44 | | - upsert( |
45 | | - con=conn, |
46 | | - df=df, |
47 | | - table_name=tablename, |
48 | | - schema=schema, |
49 | | - if_row_exists="update", |
50 | | - chunksize=1000, |
51 | | - add_new_columns=True, |
52 | | - create_table=True, |
53 | | - ) |
54 | | - |
55 | | - print(f"Table {tablename} updated") |
56 | | - |
57 | | - except Exception as e: |
58 | | - logger.error(f"Not able to upsert {tablename} \n{e}") |
59 | | - raise e |
60 | | - else: |
61 | | - print(f'[WARNING] No data for {disease} and year {year}. Skipping') |
62 | | - continue |
| 26 | + if os.listdir(parquet_dir):
| 27 | + df = parquets_to_dataframe(parquet_dir=parquet_dir) |
| 28 | + df.columns = df.columns.str.lower() |
| 29 | + df.index.name = "index" |
| 30 | + |
| 31 | + tablename = "sinan_" + normalize_str(disease) + "_m" |
| 32 | + schema = "brasil" |
| 33 | + print(f"Inserting {parquet_dir} into {schema}.{tablename}")
| 34 | + |
| 35 | + with engine.connect() as conn: |
| 36 | + try: |
| 37 | + upsert( |
| 38 | + con=conn, |
| 39 | + df=df, |
| 40 | + table_name=tablename, |
| 41 | + schema=schema, |
| 42 | + if_row_exists="update", |
| 43 | + chunksize=1000, |
| 44 | + add_new_columns=True, |
| 45 | + create_table=True, |
| 46 | + ) |
| 47 | + |
| 48 | + print(f"Table {tablename} updated") |
| 49 | + |
| 50 | + except Exception as e:
| 51 | + logger.error(f"Unable to upsert {tablename}\n{e}")
| 52 | + raise
| 53 | + else:
| 54 | + logger.warning(f"No data found in {parquet_dir}. Skipping")
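For context, a minimal sketch of how the revised `upload` could be driven end to end, assuming the parquet directory was produced by a prior `extract.download` call. The import path, disease name, and local directory below are illustrative assumptions, not taken from this diff:

# Hypothetical driver for the new upload() signature; the import path and
# the parquet directory are assumptions for illustration only.
from epigraphhub.data.brasil.sinan.loading import upload  # assumed module path

disease = "Zika"
# extract.download is expected to have materialized this directory beforehand
parquet_dir = "/home/user/pysus/ZIKABR19.parquet"

# Upserts the parquet contents into the brasil schema (table name built as
# "sinan_" + normalize_str(disease) + "_m"), creating the table and adding
# new columns as needed.
upload(disease=disease, parquet_dir=parquet_dir)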