diff --git a/guepard_pandas/data.csv b/guepard_pandas/data.csv new file mode 100644 index 0000000000000..b08ed430872f3 --- /dev/null +++ b/guepard_pandas/data.csv @@ -0,0 +1,3 @@ +id,nom +1,nour +2,kobbi \ No newline at end of file diff --git a/guepard_pandas/guepard_dataframe.ipynb b/guepard_pandas/guepard_dataframe.ipynb new file mode 100644 index 0000000000000..4e8badf08fcf7 --- /dev/null +++ b/guepard_pandas/guepard_dataframe.ipynb @@ -0,0 +1,332 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 23, + "metadata": {}, + "outputs": [], + "source": [ + "import pandas as pd\n", + "import os\n", + "import pickle\n", + "from datetime import datetime\n", + "\n", + "class GuepardDataFrame(pd.DataFrame):\n", + " def __init__(self, *args, **kwargs):\n", + " version_dir = kwargs.pop('version_dir', './versions')\n", + " super().__init__(*args, **kwargs)\n", + " self.current_version_path = os.path.join(version_dir, 'current_version.pkl')\n", + " self.version_dir = version_dir\n", + " self.versions_meta_file = os.path.join(version_dir, 'versions_meta.pkl')\n", + " if not os.path.exists(self.version_dir):\n", + " os.makedirs(self.version_dir)\n", + " if 'data' in kwargs:\n", + " self._load_data(kwargs['data'])\n", + " else:\n", + " self._load_current_version()\n", + " \n", + " def _load_data(self, data):\n", + " super().__init__(data)\n", + " \n", + " def _load_current_version(self):\n", + " if os.path.exists(self.current_version_path):\n", + " with open(self.current_version_path, 'rb') as f:\n", + " df = pickle.load(f)\n", + " super().__init__(df)\n", + " \n", + " def commit(self, message=\"\"):\n", + " version_id = self._generate_version_id()\n", + " self._save_current_version()\n", + " self._store_version_meta(version_id, message)\n", + " return version_id\n", + " \n", + " def _save_current_version(self):\n", + " with open(self.current_version_path, 'wb') as f:\n", + " pickle.dump(self, f)\n", + " \n", + " def _store_version_meta(self, version_id, message):\n", + " versions_meta = self._load_versions_meta()\n", + " versions_meta.append({'version_id': version_id, 'message': message, 'timestamp': datetime.now()})\n", + " with open(self.versions_meta_file, 'wb') as f:\n", + " pickle.dump(versions_meta, f)\n", + " \n", + " def _load_versions_meta(self):\n", + " if os.path.exists(self.versions_meta_file):\n", + " with open(self.versions_meta_file, 'rb') as f:\n", + " return pickle.load(f)\n", + " return []\n", + " \n", + " def list_versions(self):\n", + " versions_meta = self._load_versions_meta()\n", + " return [{'version_id': meta['version_id'], 'message': meta['message'], 'timestamp': meta['timestamp']} for meta in versions_meta]\n", + " \n", + " def rollback(self, version_id):\n", + " version_path = os.path.join(self.version_dir, f\"{version_id}.pkl\")\n", + " if not os.path.exists(version_path):\n", + " raise ValueError(\"Version ID not found\")\n", + " with open(version_path, 'rb') as f:\n", + " df = pickle.load(f)\n", + " self._load_data(df)\n", + " self._save_current_version()\n", + " \n", + " def save_version(self, version_id):\n", + " version_path = os.path.join(self.version_dir, f\"{version_id}.pkl\")\n", + " with open(version_path, 'wb') as f:\n", + " pickle.dump(self, f)\n", + " \n", + " def _generate_version_id(self):\n", + " return datetime.now().strftime(\"%Y%m%d_%H%M%S\")\n", + "\n", + " def get_current_version(self):\n", + " if os.path.exists(self.current_version_path):\n", + " with open(self.current_version_path, 'rb') as f:\n", + " df = pickle.load(f)\n", + " return df\n", + " else:\n", + " return None" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Initial DataFrame:\n", + " id nom\n", + "0 1 nour\n", + "1 2 kobbi\n" + ] + } + ], + "source": [ + "\n", + "df = GuepardDataFrame(pd.read_csv(\"data.csv\"), version_dir=\"./versions\")\n", + "\n", + "print(\"Initial DataFrame:\")\n", + "print(df)\n", + "\n", + "initial_version_id = df.commit(\"Initial version\")\n", + "df.save_version(initial_version_id)\n", + "\n", + "new_rows = pd.DataFrame({\n", + " 'id': [3, 4],\n", + " 'nom': ['alice', 'bob']\n", + "})\n", + "\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + " id nom\n", + "0 1 nour\n", + "1 2 kobbi\n", + "2 3 alice\n", + "3 4 bob\n", + " id nom\n", + "0 1 nour\n", + "1 2 kobbi\n", + "2 3 alice\n", + "3 4 bob\n" + ] + } + ], + "source": [ + "df=pd.concat([df, new_rows], ignore_index=True)\n", + "df = GuepardDataFrame(data=df, version_dir=\"./versions\")\n", + "print(df)" + ] + }, + { + "cell_type": "code", + "execution_count": 26, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
idnom
01nour
12kobbi
23alice
34bob
\n", + "
" + ], + "text/plain": [ + " id nom\n", + "0 1 nour\n", + "1 2 kobbi\n", + "2 3 alice\n", + "3 4 bob" + ] + }, + "execution_count": 26, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "df" + ] + }, + { + "cell_type": "code", + "execution_count": 27, + "metadata": {}, + "outputs": [], + "source": [ + "# Commit the changes\n", + "new_version_id = df.commit(\"Added specific rows\")\n", + "df.save_version(new_version_id)" + ] + }, + { + "cell_type": "code", + "execution_count": 28, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "\n", + "Available versions:\n", + "{'version_id': '20250328_033906', 'message': 'Initial version', 'timestamp': datetime.datetime(2025, 3, 28, 3, 39, 6, 470004)}\n", + "{'version_id': '20250328_033915', 'message': 'Added specific rows', 'timestamp': datetime.datetime(2025, 3, 28, 3, 39, 15, 927749)}\n" + ] + } + ], + "source": [ + "# List versions\n", + "print(\"\\nAvailable versions:\")\n", + "for version in df.list_versions():\n", + " print(version)" + ] + }, + { + "cell_type": "code", + "execution_count": 33, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "\n", + "DataFrame after rollback to version 20250328_033906:\n", + " id nom\n", + "0 1 nour\n", + "1 2 kobbi\n", + "2 3 alice\n", + "3 4 bob\n" + ] + } + ], + "source": [ + "# Rollback to the initial version\n", + "df.rollback(version_id='20250328_033915')\n", + "print(f\"\\nDataFrame after rollback to version {initial_version_id}:\")\n", + "print(df)" + ] + }, + { + "cell_type": "code", + "execution_count": 34, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "\n", + "Current Version DataFrame:\n", + "None\n" + ] + } + ], + "source": [ + "current_version = df.get_current_version()\n", + "print(\"\\nCurrent Version DataFrame:\")\n", + "print(current_version)\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.13.2" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/guepard_pandas/guepard_dataframe.py b/guepard_pandas/guepard_dataframe.py new file mode 100644 index 0000000000000..1bfa16bb1f564 --- /dev/null +++ b/guepard_pandas/guepard_dataframe.py @@ -0,0 +1,65 @@ +import pandas as pd +import os +import pickle +from datetime import datetime + +class GuepardDataFrame(pd.DataFrame): + def __init__(self, *args, **kwargs): + version_dir = kwargs.pop('version_dir', './versions') + super().__init__(*args, **kwargs) + self.current_version_path = os.path.join(version_dir, 'current_version.pkl') + self.version_dir = version_dir + self.versions_meta_file = os.path.join(version_dir, 'versions_meta.pkl') + if not os.path.exists(self.version_dir): + os.makedirs(self.version_dir) + self._load_current_version() + + def _load_current_version(self): + if os.path.exists(self.current_version_path): + with open(self.current_version_path, 'rb') as f: + df = pickle.load(f) + super().__init__(df) + + def commit(self, message=""): + version_id = self._generate_version_id() + self._save_current_version() + self._store_version_meta(version_id, message) + return version_id + + def _save_current_version(self): + with open(self.current_version_path, 'wb') as f: + pickle.dump(self, f) + + def _store_version_meta(self, version_id, message): + versions_meta = self._load_versions_meta() + versions_meta.append({'version_id': version_id, 'message': message, 'timestamp': datetime.now()}) + with open(self.versions_meta_file, 'wb') as f: + pickle.dump(versions_meta, f) + + def _load_versions_meta(self): + if os.path.exists(self.versions_meta_file): + with open(self.versions_meta_file, 'rb') as f: + return pickle.load(f) + return [] + + def list_versions(self): + versions_meta = self._load_versions_meta() + return [{'version_id': meta['version_id'], 'message': meta['message'], 'timestamp': meta['timestamp']} for meta in versions_meta] + + def rollback(self, version_id): + version_path = os.path.join(self.version_dir, f"{version_id}.pkl") + if not os.path.exists(version_path): + raise ValueError("Version ID not found") + with open(version_path, 'rb') as f: + df = pickle.load(f) + self.__init__(df) + self._save_current_version() + + def save_version(self, version_id): + version_path = os.path.join(self.version_dir, f"{version_id}.pkl") + with open(version_path, 'wb') as f: + pickle.dump(self, f) + + def _generate_version_id(self): + return datetime.now().strftime("%Y%m%d_%H%M%S") + diff --git a/guepard_pandas/readme.md b/guepard_pandas/readme.md new file mode 100644 index 0000000000000..84da96992af03 --- /dev/null +++ b/guepard_pandas/readme.md @@ -0,0 +1,48 @@ +# Guepard-Pandas Wrapper + +## Introduction +The Guepard-Pandas Wrapper is an extension of the Pandas DataFrame that integrates seamlessly with Guepard’s data versioning capabilities. This wrapper allows data engineers to use DataFrames as usual while automatically tracking versions, enabling rollback, and maintaining historical snapshots without additional effort. + +## Features +- Automated version tracking for DataFrames. +- Easy rollback to previous states. +- Seamless integration with Guepard, ensuring efficient storage and retrieval. + +## Example Usage +```python +import pandas as pd +from guepard_pandas.guepard_dataframe import GuepardDataFrame + +# Load a DataFrame +df = GuepardDataFrame(pd.read_csv("data.csv"), dataset_id="1234") + +# Modify it +df["new_col"] = df["existing_col"] * 2 + +# Commit the changes +df.commit("Added new column") + +# List versions +print(df.list_versions()) + +# Rollback to an older version +df.rollback(version_id="20240326_123456") +``` + +## Implementation Plan +1. Prototype Development + - Extend `pd.DataFrame` with versioning methods. + - Implement basic version storage using Parquet or Pickle. + +2. Integration with Guepard API + - Store versions directly in Guepard’s data management system. + - Optimize performance for large DataFrames. + +3. Testing & Optimization + - Benchmark storage and retrieval performance. + - Validate Pandas compatibility. + +## Conclusion +This wrapper offers an elegant solution to integrate version control within Pandas using Guepard, enhancing data engineering workflows while maintaining full compatibility with Pandas. + +Next Steps: Review feedback and develop a proof of concept. \ No newline at end of file