From c7a089399cf859067385d86d430e54450924f7e5 Mon Sep 17 00:00:00 2001 From: KobbiNour13 Date: Fri, 28 Mar 2025 02:44:13 +0100 Subject: [PATCH 1/3] FEAT: Add GuepardDataFrame for automated version tracking and rollback functionality --- guepard_pandas/guepard_dataframe.py | 36 ++++++++++++++++++++++ guepard_pandas/readme.md | 48 +++++++++++++++++++++++++++++ 2 files changed, 84 insertions(+) create mode 100644 guepard_pandas/guepard_dataframe.py create mode 100644 guepard_pandas/readme.md diff --git a/guepard_pandas/guepard_dataframe.py b/guepard_pandas/guepard_dataframe.py new file mode 100644 index 0000000000000..3f5450eedb1f8 --- /dev/null +++ b/guepard_pandas/guepard_dataframe.py @@ -0,0 +1,36 @@ +import pandas as pd +import requests + +class GuepardDataFrame(pd.DataFrame): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.api_url = "https://api.guepard.com" + self.dataset_id = kwargs.get('dataset_id', 'default') + + def commit(self, message=""): + version_id = self._generate_version_id() + data = self.to_parquet() + response = requests.post(f"{self.api_url}/datasets/{self.dataset_id}/versions", + files={"data": data}, + data={"message": message, "version_id": version_id}) + response.raise_for_status() + return version_id + + def list_versions(self): + response = requests.get(f"{self.api_url}/datasets/{self.dataset_id}/versions") + response.raise_for_status() + return response.json() + + def rollback(self, version_id): + response = requests.get(f"{self.api_url}/datasets/{self.dataset_id}/versions/{version_id}") + response.raise_for_status() + data = response.content + df = pd.read_parquet(data) + self.__init__(df) + + def next_version(self): + return self.commit() + + def _generate_version_id(self): + from datetime import datetime + return datetime.now().strftime("%Y%m%d_%H%M%S") \ No newline at end of file diff --git a/guepard_pandas/readme.md b/guepard_pandas/readme.md new file mode 100644 index 0000000000000..84da96992af03 --- /dev/null +++ b/guepard_pandas/readme.md @@ -0,0 +1,48 @@ +# Guepard-Pandas Wrapper + +## Introduction +The Guepard-Pandas Wrapper is an extension of the Pandas DataFrame that integrates seamlessly with Guepard’s data versioning capabilities. This wrapper allows data engineers to use DataFrames as usual while automatically tracking versions, enabling rollback, and maintaining historical snapshots without additional effort. + +## Features +- Automated version tracking for DataFrames. +- Easy rollback to previous states. +- Seamless integration with Guepard, ensuring efficient storage and retrieval. + +## Example Usage +```python +import pandas as pd +from guepard_pandas.guepard_dataframe import GuepardDataFrame + +# Load a DataFrame +df = GuepardDataFrame(pd.read_csv("data.csv"), dataset_id="1234") + +# Modify it +df["new_col"] = df["existing_col"] * 2 + +# Commit the changes +df.commit("Added new column") + +# List versions +print(df.list_versions()) + +# Rollback to an older version +df.rollback(version_id="20240326_123456") +``` + +## Implementation Plan +1. Prototype Development + - Extend `pd.DataFrame` with versioning methods. + - Implement basic version storage using Parquet or Pickle. + +2. Integration with Guepard API + - Store versions directly in Guepard’s data management system. + - Optimize performance for large DataFrames. + +3. Testing & Optimization + - Benchmark storage and retrieval performance. + - Validate Pandas compatibility. + +## Conclusion +This wrapper offers an elegant solution to integrate version control within Pandas using Guepard, enhancing data engineering workflows while maintaining full compatibility with Pandas. + +Next Steps: Review feedback and develop a proof of concept. \ No newline at end of file From 4e1f823ff979fbdb92a2c17e5cc59f3a54f9523b Mon Sep 17 00:00:00 2001 From: KobbiNour13 Date: Fri, 28 Mar 2025 02:47:54 +0100 Subject: [PATCH 2/3] FEAT: Implement version tracking and rollback functionality in GuepardDataFrame --- guepard_pandas/guepard_dataframe.py | 45 ++++++++++++++++++----------- 1 file changed, 28 insertions(+), 17 deletions(-) diff --git a/guepard_pandas/guepard_dataframe.py b/guepard_pandas/guepard_dataframe.py index 3f5450eedb1f8..a1eb33cf0de9c 100644 --- a/guepard_pandas/guepard_dataframe.py +++ b/guepard_pandas/guepard_dataframe.py @@ -1,36 +1,47 @@ import pandas as pd -import requests +import os +import pickle +from datetime import datetime class GuepardDataFrame(pd.DataFrame): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) - self.api_url = "https://api.guepard.com" - self.dataset_id = kwargs.get('dataset_id', 'default') + self.version_dir = kwargs.pop('version_dir', './versions') + if not os.path.exists(self.version_dir): + os.makedirs(self.version_dir) def commit(self, message=""): version_id = self._generate_version_id() - data = self.to_parquet() - response = requests.post(f"{self.api_url}/datasets/{self.dataset_id}/versions", - files={"data": data}, - data={"message": message, "version_id": version_id}) - response.raise_for_status() + version_path = os.path.join(self.version_dir, f"{version_id}.pkl") + with open(version_path, 'wb') as f: + pickle.dump(self, f) return version_id def list_versions(self): - response = requests.get(f"{self.api_url}/datasets/{self.dataset_id}/versions") - response.raise_for_status() - return response.json() + versions = [] + for filename in os.listdir(self.version_dir): + if filename.endswith(".pkl"): + version_id = filename.split('.')[0] + versions.append(version_id) + return versions def rollback(self, version_id): - response = requests.get(f"{self.api_url}/datasets/{self.dataset_id}/versions/{version_id}") - response.raise_for_status() - data = response.content - df = pd.read_parquet(data) + version_path = os.path.join(self.version_dir, f"{version_id}.pkl") + if not os.path.exists(version_path): + raise ValueError("Version ID not found") + with open(version_path, 'rb') as f: + df = pickle.load(f) self.__init__(df) def next_version(self): return self.commit() def _generate_version_id(self): - from datetime import datetime - return datetime.now().strftime("%Y%m%d_%H%M%S") \ No newline at end of file + return datetime.now().strftime("%Y%m%d_%H%M%S") + +# Example usage: +# df = GuepardDataFrame(pd.read_csv("data.csv"), version_dir="path/to/versions") +# df["new_col"] = df["existing_col"] * 2 +# df.commit("Added new column") +# print(df.list_versions()) +# df.rollback(version_id="20240326_123456") \ No newline at end of file From 2cb40d1b3972b514fc95427927c076a10781ad36 Mon Sep 17 00:00:00 2001 From: KobbiNour13 Date: Fri, 28 Mar 2025 03:43:02 +0100 Subject: [PATCH 3/3] FEAT: Enhance GuepardDataFrame with version management and rollback capabilities --- guepard_pandas/data.csv | 3 + guepard_pandas/guepard_dataframe.ipynb | 332 +++++++++++++++++++++++++ guepard_pandas/guepard_dataframe.py | 56 +++-- 3 files changed, 372 insertions(+), 19 deletions(-) create mode 100644 guepard_pandas/data.csv create mode 100644 guepard_pandas/guepard_dataframe.ipynb diff --git a/guepard_pandas/data.csv b/guepard_pandas/data.csv new file mode 100644 index 0000000000000..b08ed430872f3 --- /dev/null +++ b/guepard_pandas/data.csv @@ -0,0 +1,3 @@ +id,nom +1,nour +2,kobbi \ No newline at end of file diff --git a/guepard_pandas/guepard_dataframe.ipynb b/guepard_pandas/guepard_dataframe.ipynb new file mode 100644 index 0000000000000..4e8badf08fcf7 --- /dev/null +++ b/guepard_pandas/guepard_dataframe.ipynb @@ -0,0 +1,332 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 23, + "metadata": {}, + "outputs": [], + "source": [ + "import pandas as pd\n", + "import os\n", + "import pickle\n", + "from datetime import datetime\n", + "\n", + "class GuepardDataFrame(pd.DataFrame):\n", + " def __init__(self, *args, **kwargs):\n", + " version_dir = kwargs.pop('version_dir', './versions')\n", + " super().__init__(*args, **kwargs)\n", + " self.current_version_path = os.path.join(version_dir, 'current_version.pkl')\n", + " self.version_dir = version_dir\n", + " self.versions_meta_file = os.path.join(version_dir, 'versions_meta.pkl')\n", + " if not os.path.exists(self.version_dir):\n", + " os.makedirs(self.version_dir)\n", + " if 'data' in kwargs:\n", + " self._load_data(kwargs['data'])\n", + " else:\n", + " self._load_current_version()\n", + " \n", + " def _load_data(self, data):\n", + " super().__init__(data)\n", + " \n", + " def _load_current_version(self):\n", + " if os.path.exists(self.current_version_path):\n", + " with open(self.current_version_path, 'rb') as f:\n", + " df = pickle.load(f)\n", + " super().__init__(df)\n", + " \n", + " def commit(self, message=\"\"):\n", + " version_id = self._generate_version_id()\n", + " self._save_current_version()\n", + " self._store_version_meta(version_id, message)\n", + " return version_id\n", + " \n", + " def _save_current_version(self):\n", + " with open(self.current_version_path, 'wb') as f:\n", + " pickle.dump(self, f)\n", + " \n", + " def _store_version_meta(self, version_id, message):\n", + " versions_meta = self._load_versions_meta()\n", + " versions_meta.append({'version_id': version_id, 'message': message, 'timestamp': datetime.now()})\n", + " with open(self.versions_meta_file, 'wb') as f:\n", + " pickle.dump(versions_meta, f)\n", + " \n", + " def _load_versions_meta(self):\n", + " if os.path.exists(self.versions_meta_file):\n", + " with open(self.versions_meta_file, 'rb') as f:\n", + " return pickle.load(f)\n", + " return []\n", + " \n", + " def list_versions(self):\n", + " versions_meta = self._load_versions_meta()\n", + " return [{'version_id': meta['version_id'], 'message': meta['message'], 'timestamp': meta['timestamp']} for meta in versions_meta]\n", + " \n", + " def rollback(self, version_id):\n", + " version_path = os.path.join(self.version_dir, f\"{version_id}.pkl\")\n", + " if not os.path.exists(version_path):\n", + " raise ValueError(\"Version ID not found\")\n", + " with open(version_path, 'rb') as f:\n", + " df = pickle.load(f)\n", + " self._load_data(df)\n", + " self._save_current_version()\n", + " \n", + " def save_version(self, version_id):\n", + " version_path = os.path.join(self.version_dir, f\"{version_id}.pkl\")\n", + " with open(version_path, 'wb') as f:\n", + " pickle.dump(self, f)\n", + " \n", + " def _generate_version_id(self):\n", + " return datetime.now().strftime(\"%Y%m%d_%H%M%S\")\n", + "\n", + " def get_current_version(self):\n", + " if os.path.exists(self.current_version_path):\n", + " with open(self.current_version_path, 'rb') as f:\n", + " df = pickle.load(f)\n", + " return df\n", + " else:\n", + " return None" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Initial DataFrame:\n", + " id nom\n", + "0 1 nour\n", + "1 2 kobbi\n" + ] + } + ], + "source": [ + "\n", + "df = GuepardDataFrame(pd.read_csv(\"data.csv\"), version_dir=\"./versions\")\n", + "\n", + "print(\"Initial DataFrame:\")\n", + "print(df)\n", + "\n", + "initial_version_id = df.commit(\"Initial version\")\n", + "df.save_version(initial_version_id)\n", + "\n", + "new_rows = pd.DataFrame({\n", + " 'id': [3, 4],\n", + " 'nom': ['alice', 'bob']\n", + "})\n", + "\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + " id nom\n", + "0 1 nour\n", + "1 2 kobbi\n", + "2 3 alice\n", + "3 4 bob\n", + " id nom\n", + "0 1 nour\n", + "1 2 kobbi\n", + "2 3 alice\n", + "3 4 bob\n" + ] + } + ], + "source": [ + "df=pd.concat([df, new_rows], ignore_index=True)\n", + "df = GuepardDataFrame(data=df, version_dir=\"./versions\")\n", + "print(df)" + ] + }, + { + "cell_type": "code", + "execution_count": 26, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
idnom
01nour
12kobbi
23alice
34bob
\n", + "
" + ], + "text/plain": [ + " id nom\n", + "0 1 nour\n", + "1 2 kobbi\n", + "2 3 alice\n", + "3 4 bob" + ] + }, + "execution_count": 26, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "df" + ] + }, + { + "cell_type": "code", + "execution_count": 27, + "metadata": {}, + "outputs": [], + "source": [ + "# Commit the changes\n", + "new_version_id = df.commit(\"Added specific rows\")\n", + "df.save_version(new_version_id)" + ] + }, + { + "cell_type": "code", + "execution_count": 28, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "\n", + "Available versions:\n", + "{'version_id': '20250328_033906', 'message': 'Initial version', 'timestamp': datetime.datetime(2025, 3, 28, 3, 39, 6, 470004)}\n", + "{'version_id': '20250328_033915', 'message': 'Added specific rows', 'timestamp': datetime.datetime(2025, 3, 28, 3, 39, 15, 927749)}\n" + ] + } + ], + "source": [ + "# List versions\n", + "print(\"\\nAvailable versions:\")\n", + "for version in df.list_versions():\n", + " print(version)" + ] + }, + { + "cell_type": "code", + "execution_count": 33, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "\n", + "DataFrame after rollback to version 20250328_033906:\n", + " id nom\n", + "0 1 nour\n", + "1 2 kobbi\n", + "2 3 alice\n", + "3 4 bob\n" + ] + } + ], + "source": [ + "# Rollback to the initial version\n", + "df.rollback(version_id='20250328_033915')\n", + "print(f\"\\nDataFrame after rollback to version {initial_version_id}:\")\n", + "print(df)" + ] + }, + { + "cell_type": "code", + "execution_count": 34, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "\n", + "Current Version DataFrame:\n", + "None\n" + ] + } + ], + "source": [ + "current_version = df.get_current_version()\n", + "print(\"\\nCurrent Version DataFrame:\")\n", + "print(current_version)\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.13.2" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/guepard_pandas/guepard_dataframe.py b/guepard_pandas/guepard_dataframe.py index a1eb33cf0de9c..1bfa16bb1f564 100644 --- a/guepard_pandas/guepard_dataframe.py +++ b/guepard_pandas/guepard_dataframe.py @@ -5,25 +5,46 @@ class GuepardDataFrame(pd.DataFrame): def __init__(self, *args, **kwargs): + version_dir = kwargs.pop('version_dir', './versions') super().__init__(*args, **kwargs) - self.version_dir = kwargs.pop('version_dir', './versions') + self.current_version_path = os.path.join(version_dir, 'current_version.pkl') + self.version_dir = version_dir + self.versions_meta_file = os.path.join(version_dir, 'versions_meta.pkl') if not os.path.exists(self.version_dir): os.makedirs(self.version_dir) + self._load_current_version() + + def _load_current_version(self): + if os.path.exists(self.current_version_path): + with open(self.current_version_path, 'rb') as f: + df = pickle.load(f) + super().__init__(df) def commit(self, message=""): version_id = self._generate_version_id() - version_path = os.path.join(self.version_dir, f"{version_id}.pkl") - with open(version_path, 'wb') as f: - pickle.dump(self, f) + self._save_current_version() + self._store_version_meta(version_id, message) return version_id + def _save_current_version(self): + with open(self.current_version_path, 'wb') as f: + pickle.dump(self, f) + + def _store_version_meta(self, version_id, message): + versions_meta = self._load_versions_meta() + versions_meta.append({'version_id': version_id, 'message': message, 'timestamp': datetime.now()}) + with open(self.versions_meta_file, 'wb') as f: + pickle.dump(versions_meta, f) + + def _load_versions_meta(self): + if os.path.exists(self.versions_meta_file): + with open(self.versions_meta_file, 'rb') as f: + return pickle.load(f) + return [] + def list_versions(self): - versions = [] - for filename in os.listdir(self.version_dir): - if filename.endswith(".pkl"): - version_id = filename.split('.')[0] - versions.append(version_id) - return versions + versions_meta = self._load_versions_meta() + return [{'version_id': meta['version_id'], 'message': meta['message'], 'timestamp': meta['timestamp']} for meta in versions_meta] def rollback(self, version_id): version_path = os.path.join(self.version_dir, f"{version_id}.pkl") @@ -32,16 +53,13 @@ def rollback(self, version_id): with open(version_path, 'rb') as f: df = pickle.load(f) self.__init__(df) + self._save_current_version() + + def save_version(self, version_id): + version_path = os.path.join(self.version_dir, f"{version_id}.pkl") + with open(version_path, 'wb') as f: + pickle.dump(self, f) - def next_version(self): - return self.commit() - def _generate_version_id(self): return datetime.now().strftime("%Y%m%d_%H%M%S") -# Example usage: -# df = GuepardDataFrame(pd.read_csv("data.csv"), version_dir="path/to/versions") -# df["new_col"] = df["existing_col"] * 2 -# df.commit("Added new column") -# print(df.list_versions()) -# df.rollback(version_id="20240326_123456") \ No newline at end of file