diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json
index 3dda2be..fa0d87b 100644
--- a/.devcontainer/devcontainer.json
+++ b/.devcontainer/devcontainer.json
@@ -52,7 +52,7 @@
 				"Davidsekar.redis-xplorer"
 			]
 		}
-	},
+	}
 
 	// Use 'forwardPorts' to make a list of ports inside the container available locally.
 	// "forwardPorts": [],
diff --git a/.devcontainer/docker-compose.yaml b/.devcontainer/docker-compose.yaml
index b9c3136..eaad1b8 100644
--- a/.devcontainer/docker-compose.yaml
+++ b/.devcontainer/docker-compose.yaml
@@ -67,7 +67,29 @@ services:
     #   - SYS_PTRACE
     # security_opt:
     #   - seccomp:unconfined
+  hydra:
+    container_name: hydra
+    image: ghcr.io/hydrasdb/hydra:latest
+    ports:
+      - 5432:5432
+    environment:
+      POSTGRES_USER: admin
+      POSTGRES_PASSWORD: hello123
+    volumes:
+      - hydravolume:/var/lib/postgresql/data
+
+
+  createdatabase:
+    image: ghcr.io/hydrasdb/hydra:latest
+    depends_on:
+      - hydra
+    entrypoint: >
+      /bin/sh -c "
+      PGPASSWORD=hello123 /usr/bin/psql -h hydra --user admin -c 'create database dataplanedb;';
+      exit 0;
+      "
 
 
 volumes:
   miniovolume:
+  hydravolume:
\ No newline at end of file
diff --git a/pyproject.toml b/pyproject.toml
index 9555098..b97a95c 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,5 +1,5 @@
 [build-system]
-requires = ["setuptools>=61.0","redis>=1.0.0","pyarrow>1.0.0", "pandas>1.0.0", "requests>1.0.0"]
+requires = ["setuptools>=61.0","redis>=1.0.0","pyarrow>1.0.0", "pandas>1.0.0", "requests>1.0.0", "SQLAlchemy>1.4.43", "psycopg2>2.9.5"]
 build-backend = "setuptools.build_meta"
 
 [project]
diff --git a/src/dataplane/DataStorage/Hydra/__init__.py b/src/dataplane/DataStorage/Hydra/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/dataplane/DataStorage/Hydra/hydra_download.py b/src/dataplane/DataStorage/Hydra/hydra_download.py
new file mode 100644
index 0000000..95b0ab0
--- /dev/null
+++ b/src/dataplane/DataStorage/Hydra/hydra_download.py
@@ -0,0 +1,41 @@
+"""
+Download a Hydra (PostgreSQL) table as CSV.
+
+DBClient: Database Client (sqlalchemy.engine.Engine or sqlalchemy.engine.Connection)
+SourceTableName: table name in Hydra
+DownloadMethod: Object (return the CSV as UTF-8 bytes) or File (write the CSV to LocalFilePath)
+LocalFilePath: File to write (needs DownloadMethod="File")
+"""
+
+def hydra_download(DBClient, SourceTableName, DownloadMethod="Object", LocalFilePath=""):
+    """
+    Read SourceTableName with pandas and return a result dict.
+
+    The dict always has "result" and "duration". On success it also has
+    "FilePath" (File method) or "content" (CSV bytes, Object method); on
+    failure it has "reason". Database errors are raised, not caught.
+    """
+    import pandas as pd
+    from datetime import datetime
+
+    # Start the timer
+    start = datetime.now()
+
+    # Validate before querying so a bad request never costs a table read
+    if DownloadMethod=="File" and LocalFilePath == "":
+        duration = datetime.now() - start
+        return {"result":"Fail", "duration": str(duration), "reason":"File method requires a local file path"}
+
+    df = pd.read_sql_table(SourceTableName,DBClient)
+
+    if DownloadMethod=="File":
+        # pandas opens the file itself: UTF-8 like the Object branch, and
+        # no second newline translation by a text-mode file handle
+        df.to_csv(LocalFilePath, encoding='utf-8')
+
+        duration = datetime.now() - start
+        return {"result":"OK", "duration": str(duration), "FilePath": LocalFilePath}
+
+    objectGet = df.to_csv().encode('utf-8')
+    duration = datetime.now() - start
+    return {"result":"OK", "duration": str(duration), "content": objectGet}
diff --git a/src/dataplane/DataStorage/Hydra/hydra_upload.py b/src/dataplane/DataStorage/Hydra/hydra_upload.py
new file mode 100644
index 0000000..eae7dba
--- /dev/null
+++ b/src/dataplane/DataStorage/Hydra/hydra_upload.py
@@ -0,0 +1,80 @@
+"""
+Upload CSV data into a Hydra (PostgreSQL) table.
+
+DBClient: Database Client (sqlalchemy.engine.Engine or sqlalchemy.engine.Connection)
+TargetTableName: table name in Hydra
+SourceFilePath: /tmp/source.csv (needs UploadMethod="File")
+UploadMethod: Object or File
+UploadObject: CSV file-like object such as io.BytesIO (needs UploadMethod="Object")
+IfExists: fail, replace or append - passed to pandas to_sql(if_exists=...)
+"""
+
+
+def hydra_upload(DBClient, TargetTableName, SourceFilePath="/tmp/default.txt", UploadMethod="Object", UploadObject="", IfExists="fail"):
+    """
+    Parse the CSV with pandas and bulk-load it into TargetTableName.
+
+    Returns a dict with "result" and "duration", plus "reason" on failure.
+    CSV parsing and database errors are raised, not caught.
+    """
+    import pandas as pd
+    from datetime import datetime
+
+    # Start the timer
+    start = datetime.now()
+
+    # ====== Obtain the CSV from disk or from the supplied object ======
+    if UploadMethod == "File":
+        df = pd.read_csv(SourceFilePath)
+    else:
+        # The default "" would make pandas try to open a file named ""
+        if isinstance(UploadObject, str) and UploadObject == "":
+            duration = datetime.now() - start
+            return {"result":"Fail", "duration": str(duration), "reason":"Object method requires an upload object"}
+        df = pd.read_csv(UploadObject)
+
+    df.to_sql(TargetTableName,con=DBClient,if_exists=IfExists,method=_psql_insert_copy)
+
+    duration = datetime.now() - start
+
+    return {"result":"OK", "duration": str(duration)}
+
+
+def _quote_identifier(name):
+    """Return name as a double-quoted PostgreSQL identifier, doubling embedded quotes."""
+    return '"%s"' % str(name).replace('"', '""')
+
+
+def _psql_insert_copy(table, conn, keys, data_iter):
+    """
+    Execute SQL statement inserting data
+
+    Parameters
+    ----------
+    table : pandas.io.sql.SQLTable
+    conn : sqlalchemy.engine.Engine or sqlalchemy.engine.Connection
+    keys : list of str
+        Column names
+    data_iter : Iterable that iterates the values to be inserted
+    """
+    from io import StringIO
+    import csv
+
+    # gets a DBAPI connection that can provide a cursor
+    dbapi_conn = conn.connection
+    with dbapi_conn.cursor() as cur:
+        s_buf = StringIO()
+        writer = csv.writer(s_buf)
+        writer.writerows(data_iter)
+        s_buf.seek(0)
+
+        # Quote every identifier: CSV headers may contain quotes or spaces,
+        # and unquoted names are folded to lower case by PostgreSQL
+        columns = ', '.join(_quote_identifier(k) for k in keys)
+        table_name = _quote_identifier(table.name)
+        if table.schema:
+            table_name = '{}.{}'.format(_quote_identifier(table.schema), table_name)
+
+        sql = 'COPY {} ({}) FROM STDIN WITH CSV'.format(
+            table_name, columns)
+        cur.copy_expert(sql=sql, file=s_buf)
diff --git a/src/dataplane/DataStorage/Hydra/test_hydra.py b/src/dataplane/DataStorage/Hydra/test_hydra.py
new file mode 100644
index 0000000..6ddc80e
--- /dev/null
+++ b/src/dataplane/DataStorage/Hydra/test_hydra.py
@@ -0,0 +1,53 @@
+import sqlalchemy
+from .hydra_download import hydra_download
+from .hydra_upload import hydra_upload
+import os
+from dotenv import load_dotenv
+from nanoid import generate
+
+def test_hydra():
+    # ---------- DATAPLANE RUN ------------
+
+    # Dataplane run id
+    os.environ["DP_RUNID"] = generate('1234567890abcdef', 10)
+
+    # Load settings from a local .env file, if any
+    load_dotenv()
+
+    RUN_ID = os.environ["DP_RUNID"]
+
+    DBClient = sqlalchemy.create_engine("postgresql://admin:hello123@hydra:5432/dataplanedb?sslmode=disable")
+
+    table = 'newtable'
+
+    CURRENT_DIRECTORY = os.path.realpath(os.path.dirname(__file__))
+
+    # Remove the file left by a previous run so the download is checked afresh
+    if os.path.exists(CURRENT_DIRECTORY+"/test_cities_delete.csv"):
+        os.remove(CURRENT_DIRECTORY+"/test_cities_delete.csv")
+
+    # ------------- Store file to Hydra DB -------------
+    print(CURRENT_DIRECTORY)
+    # replace: the table survives between runs and is shared with test_hydra_object
+    rs = hydra_upload(
+        DBClient=DBClient,
+        TargetTableName=table,
+        SourceFilePath=CURRENT_DIRECTORY+"/test_hydra_cities.csv",
+        UploadMethod='File',
+        IfExists='replace'
+    )
+    print(rs)
+    assert rs['result']=='OK'
+
+
+    # ------------ Retrieve Table from Hydra -------------
+
+    rs = hydra_download(
+        DBClient=DBClient,
+        SourceTableName=table,
+        DownloadMethod="File",
+        LocalFilePath=CURRENT_DIRECTORY+"/test_cities_delete.csv",
+    )
+    print(rs)
+    assert rs["result"]=="OK"
+    assert os.path.exists(CURRENT_DIRECTORY+"/test_cities_delete.csv")
diff --git a/src/dataplane/DataStorage/Hydra/test_hydra_cities.csv b/src/dataplane/DataStorage/Hydra/test_hydra_cities.csv
new file mode 100644
index 0000000..52476fe
--- /dev/null
+++ b/src/dataplane/DataStorage/Hydra/test_hydra_cities.csv
@@ -0,0 +1,14 @@
+"LatD", "LatM", "LatS", "NS", "LonD", "LonM", "LonS", "EW", "City", "State"
+ 41, 5, 59, "N", 80, 39, 0, "W", "Youngstown", OH
+ 42, 52, 48, "N", 97, 23, 23, "W", "Yankton", SD
+ 46, 35, 59, "N", 120, 30, 36, "W", "Yakima", WA
+ 42, 16, 12, "N", 71, 48, 0, "W", "Worcester", MA
+ 43, 37, 48, "N", 89, 46, 11, "W", "Wisconsin Dells", WI
+ 36, 5, 59, "N", 80, 15, 0, "W", "Winston-Salem", NC
+ 49, 52, 48, "N", 97, 9, 0, "W", "Winnipeg", MB
+ 39, 11, 23, "N", 78, 9, 36, "W", "Winchester", VA
+ 34, 14, 24, "N", 77, 55, 11, "W", "Wilmington", NC
+ 39, 45, 0, "N", 75, 33, 0, "W", "Wilmington", DE
+ 48, 9, 0, "N", 103, 37, 12, "W", "Williston", ND
+ 41, 15, 0, "N", 77, 0, 0, "W", "Williamsport", PA
+ 37, 40, 48, "N", 82, 16, 47, "W", "Williamson", WV
diff --git a/src/dataplane/DataStorage/Hydra/test_hydra_object.py b/src/dataplane/DataStorage/Hydra/test_hydra_object.py
new file mode 100644
index 0000000..f633bce
--- /dev/null
+++ b/src/dataplane/DataStorage/Hydra/test_hydra_object.py
@@ -0,0 +1,52 @@
+import sqlalchemy
+from .hydra_download import hydra_download
+from .hydra_upload import hydra_upload
+import os
+from dotenv import load_dotenv
+from nanoid import generate
+from io import BytesIO
+
+def test_hydra_object():
+    # ---------- DATAPLANE RUN ------------
+
+    # Dataplane run id
+    os.environ["DP_RUNID"] = generate('1234567890abcdef', 10)
+
+    # Load settings from a local .env file, if any
+    load_dotenv()
+
+    RUN_ID = os.environ["DP_RUNID"]
+
+    DBClient = sqlalchemy.create_engine("postgresql://admin:hello123@hydra:5432/dataplanedb?sslmode=disable")
+
+    table = 'newtable'
+
+    CURRENT_DIRECTORY = os.path.realpath(os.path.dirname(__file__))
+
+    # ------------- Store file to Hydra DB -------------
+    print(CURRENT_DIRECTORY)
+    with open(CURRENT_DIRECTORY+"/test_hydra_cities.csv",'rb') as f:
+        buf = BytesIO(f.read())
+        buf.seek(0)
+    # replace: the table survives between runs and is shared with test_hydra
+    rs = hydra_upload(
+        DBClient=DBClient,
+        TargetTableName=table,
+        UploadObject=buf,
+        UploadMethod='Object',
+        IfExists='replace'
+    )
+    print(rs)
+    assert rs['result']=='OK'
+
+
+    # ------------ Retrieve Table from Hydra -------------
+
+    rs = hydra_download(
+        DBClient=DBClient,
+        SourceTableName=table,
+        DownloadMethod="Object",
+    )
+    print(rs)
+    assert rs["result"]=="OK"
+    assert rs["content"]