From 25f5c4ade12548809e3f30620d38f002f485e963 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=A9my=20Reynard?= Date: Fri, 20 Dec 2024 12:11:33 +0100 Subject: [PATCH] Add load_from_singlestore command --- .../commands/store/load_from_singlestore.py | 122 ++++++++++++++++++ cosmotech/coal/cli/commands/store/store.py | 2 + docs/csm-data/store/load-from-singlestore.md | 11 ++ requirements.txt | 1 + 4 files changed, 136 insertions(+) create mode 100644 cosmotech/coal/cli/commands/store/load_from_singlestore.py create mode 100644 docs/csm-data/store/load-from-singlestore.md diff --git a/cosmotech/coal/cli/commands/store/load_from_singlestore.py b/cosmotech/coal/cli/commands/store/load_from_singlestore.py new file mode 100644 index 00000000..6635a671 --- /dev/null +++ b/cosmotech/coal/cli/commands/store/load_from_singlestore.py @@ -0,0 +1,122 @@ +# Copyright (C) - 2023 - 2024 - Cosmo Tech +# This document and all information contained herein is the exclusive property - +# including all intellectual property rights pertaining thereto - of Cosmo Tech. +# Any use, reproduction, translation, broadcasting, transmission, distribution, +# etc., to any person is prohibited unless it has been previously and +# specifically authorized by written means by Cosmo Tech. 
import pathlib
import time
import csv
import singlestoredb as s2
from sqlite3 import Cursor  # pre-existing import, kept; the cursor used below is a singlestoredb one

from cosmotech.coal.cli.utils.click import click
from cosmotech.coal.cli.utils.decorators import web_help
from cosmotech.coal.store.csv import store_csv_file
from cosmotech.coal.store.store import Store
from cosmotech.coal.utils.logger import LOGGER


def _quote_identifier(name: str) -> str:
    """Return *name* backtick-quoted, each dot-separated part quoted on its own."""
    # Doubling embedded backticks is the MySQL-compatible escape, so a name can
    # neither break out of the identifier nor clash with a reserved word.
    return ".".join("`" + part.replace("`", "``") + "`" for part in name.split("."))


def _parse_table_names(raw) -> list:
    """Split a comma separated list of table names, dropping blanks and padding."""
    # "".split(",") yields [""], which is truthy: filter empties explicitly so
    # that "no table given" really ends up as an empty list.
    return [name.strip() for name in (raw or "").split(",") if name.strip()]


def _first_column(row):
    """Return the first value of a result row, whether it is a dict or a sequence."""
    if isinstance(row, dict):
        return next(iter(row.values()))
    return row[0]


def get_data(table_name: str, output_directory: str, cursor: "s2.connection.Cursor"):
    """
    Fetch every row of a table and write them to ``<output_directory>/<table_name>.csv``.

    :param table_name: name of the table to dump (optionally ``database.table``)
    :param output_directory: existing folder receiving the csv file
    :param cursor: open cursor returning rows as dicts (connection created
        with ``results_type="dicts"``); column names become the csv header
    :return: None. Nothing is written when the table holds no row.
    """
    start_time = time.perf_counter()
    cursor.execute(f"SELECT * FROM {_quote_identifier(table_name)}")
    rows = cursor.fetchall()
    end_time = time.perf_counter()
    LOGGER.info(f"Rows fetched in {table_name} table: {len(rows)} in {round(end_time - start_time, 2)} seconds")
    if not rows:
        # Without a first row there is no header to derive: skip instead of
        # failing on rows[0].
        LOGGER.warning(f"Table {table_name} is empty, no csv file written")
        return
    csv_file = pathlib.Path(output_directory) / f"{table_name}.csv"
    with open(csv_file, "w", newline="", encoding="utf-8") as csv_stock:
        writer = csv.DictWriter(csv_stock, fieldnames=list(rows[0].keys()))
        writer.writeheader()
        writer.writerows(rows)


@click.command()
@web_help("csm-data/store/load-from-singlestore")
@click.option("--singlestore-host",
              "single_store_host",
              envvar="SINGLE_STORE_HOST",
              help="SingleStore instance URI",
              type=str,
              show_envvar=True,
              required=True)
@click.option('--singlestore-port',
              "single_store_port",
              help='SingleStore port',
              envvar="SINGLE_STORE_PORT",
              show_envvar=True,
              required=False,
              type=int,
              default=3306)
@click.option('--singlestore-db',
              "single_store_db",
              help='SingleStore database name',
              envvar="SINGLE_STORE_DB",
              show_envvar=True,
              required=True)
@click.option('--singlestore-user',
              "single_store_user",
              help='SingleStore connection user name',
              envvar="SINGLE_STORE_USERNAME",
              show_envvar=True,
              required=True)
@click.option('--singlestore-password',
              "single_store_password",
              help='SingleStore connection password',
              envvar="SINGLE_STORE_PASSWORD",
              show_envvar=True,
              required=True)
@click.option('--singlestore-tables',
              "single_store_tables",
              help='SingleStore table names to fetch (separated by comma), '
                   'every table of the database is fetched when omitted',
              envvar="SINGLE_STORE_TABLES",
              show_envvar=True,
              required=False,
              default="")
@click.option("--store-folder",
              "store_folder",
              envvar="CSM_PARAMETERS_ABSOLUTE_PATH",
              help="The folder containing the store files",
              metavar="PATH",
              type=str,
              show_envvar=True,
              required=True)
def load_from_singlestore(
    single_store_host,
    single_store_port,
    single_store_db,
    single_store_user,
    single_store_password,
    store_folder,
    single_store_tables: str = "",
):
    """Load data from SingleStore tables into the store.
    Will download everything from a given SingleStore database following some configuration into the store.
    Makes use of the singlestoredb client to access SingleStore.
    More information is available on this page:
    [https://docs.singlestore.com/cloud/developer-resources/connect-with-application-development-tools/connect-with-python/connect-using-the-singlestore-python-client/]
    """
    # NOTE: the docstring above is rendered by click as the command help, so
    # developer-oriented notes are kept in comments.

    # Intermediate csv files live in <store_folder>/singlestore; create the
    # folder (and missing parents) without failing when it already exists.
    single_store_working_dir = pathlib.Path(store_folder) / "singlestore"
    single_store_working_dir.mkdir(parents=True, exist_ok=True)

    start_full = time.perf_counter()

    conn = s2.connect(host=single_store_host,
                      port=single_store_port,
                      database=single_store_db,
                      user=single_store_user,
                      password=single_store_password,
                      results_type="dicts")
    with conn:
        with conn.cursor() as cur:
            table_names = _parse_table_names(single_store_tables)
            if not table_names:
                # No explicit selection: dump every table of the database.
                # Rows come back as single-column records, keep only the name.
                cur.execute("SHOW TABLES")
                table_names = [_first_column(row) for row in cur.fetchall()]
            LOGGER.info(f"Tables to fetched: {table_names}")
            for name in table_names:
                get_data(name, str(single_store_working_dir), cur)
            end_full = time.perf_counter()
            LOGGER.info(f"Full dataset fetched and wrote in {round(end_full - start_full, 2)} seconds")

    # Every csv found in the working folder is loaded in the store under the
    # name of the file without its extension.
    for csv_path in single_store_working_dir.glob("*.csv"):
        LOGGER.info(f"Found {csv_path.name}, storing it")
        store_csv_file(csv_path.stem, csv_path, store=Store(False, store_folder))
b/cosmotech/coal/cli/commands/store/store.py index cb3e21fc..19b764f0 100644 --- a/cosmotech/coal/cli/commands/store/store.py +++ b/cosmotech/coal/cli/commands/store/store.py @@ -11,6 +11,7 @@ from cosmotech.coal.cli.commands.store.dump_to_s3 import dump_to_s3 from cosmotech.coal.cli.commands.store.list_tables import list_tables from cosmotech.coal.cli.commands.store.load_csv_folder import load_csv_folder +from cosmotech.coal.cli.commands.store.load_from_singlestore import load_from_singlestore from cosmotech.coal.cli.commands.store.reset import reset from cosmotech.coal.cli.utils.click import click from cosmotech.coal.cli.utils.decorators import web_help @@ -30,6 +31,7 @@ def store(): store.add_command(reset, "reset") store.add_command(list_tables, "list-tables") store.add_command(load_csv_folder, "load-csv-folder") +store.add_command(load_from_singlestore, "load-from-singlestore") store.add_command(dump_to_postgresql, "dump-to-postgresql") store.add_command(dump_to_s3, "dump-to-s3") store.add_command(dump_to_azure, "dump-to-azure") diff --git a/docs/csm-data/store/load-from-singlestore.md b/docs/csm-data/store/load-from-singlestore.md new file mode 100644 index 00000000..3870ea91 --- /dev/null +++ b/docs/csm-data/store/load-from-singlestore.md @@ -0,0 +1,11 @@ +--- +hide: + - toc +description: "Command help: `csm-data store load-from-singlestore`" +--- +# load-from-singlestore + +!!! info "Help command" + ```text + --8<-- "generated/commands_help/csm-data/store/load-from-singlestore.txt" + ``` diff --git a/requirements.txt b/requirements.txt index a2a8dbd6..cba267aa 100644 --- a/requirements.txt +++ b/requirements.txt @@ -19,6 +19,7 @@ cosmotech-api~=3.2 # Commands requirements boto3~=1.34 requests~=2.32.3 +singlestoredb~=1.10.0 # Orchestrator templates requirements cosmotech-run-orchestrator~=1.6.0