diff --git a/.github/workflows/daily_collection.yaml b/.github/workflows/daily_collection.yaml index b174bc5..1545d31 100644 --- a/.github/workflows/daily_collection.yaml +++ b/.github/workflows/daily_collection.yaml @@ -7,13 +7,23 @@ on: description: Slack channel to post the error message to if the builds fail. required: false default: "sdv-alerts-debug" + max_days_pypi: + description: 'Maximum number of days to collect, starting from today for PyPI.' + required: false + type: number + default: 30 + max_days_anaconda: + description: 'Maximum number of days to collect, starting from today for Anaconda' + required: false + type: number + default: 90 schedule: - cron: '0 0 * * *' jobs: collect: runs-on: ubuntu-latest-large - timeout-minutes: 20 + timeout-minutes: 25 steps: - uses: actions/checkout@v4 - name: Install uv @@ -21,21 +31,31 @@ jobs: with: enable-cache: true activate-environment: true + cache-dependency-glob: | + **/pyproject.toml + **/__main__.py - name: Install pip and dependencies run: | uv pip install -U pip - uv pip install -e . - - name: Collect Downloads Data + uv pip install . + - name: Collect PyPI Downloads run: | - uv run download-analytics collect \ + uv run download-analytics collect-pypi \ --verbose \ - --max-days 30 \ + --max-days ${{ inputs.max_days_pypi || 30 }} \ --add-metrics \ --output-folder gdrive://10QHbqyvptmZX4yhu2Y38YJbVHqINRr0n env: PYDRIVE_CREDENTIALS: ${{ secrets.PYDRIVE_CREDENTIALS }} BIGQUERY_CREDENTIALS: ${{ secrets.BIGQUERY_CREDENTIALS }} - + - name: Collect Anaconda Downloads + run: | + uv run download-analytics collect-anaconda \ + --output-folder gdrive://1UnDYovLkL4gletOF5328BG1X59mSHF-Z \ + --max-days ${{ inputs.max_days_anaconda || 90 }} \ + --verbose + env: + PYDRIVE_CREDENTIALS: ${{ secrets.PYDRIVE_CREDENTIALS }} alert: needs: [collect] runs-on: ubuntu-latest diff --git a/.github/workflows/daily_summarize.yaml b/.github/workflows/daily_summarize.yaml index 9a30761..b5c731e 100644 --- a/.github/workflows/daily_summarize.yaml +++ b/.github/workflows/daily_summarize.yaml @@ -21,6 +21,9 @@ jobs: with: enable-cache: true activate-environment: true + cache-dependency-glob: | + **/pyproject.toml + **/__main__.py - name: Install pip and dependencies run: | uv pip install -U pip diff --git a/.github/workflows/dryrun.yaml b/.github/workflows/dryrun.yaml index 8ebc06f..9811928 100644 --- a/.github/workflows/dryrun.yaml +++ b/.github/workflows/dryrun.yaml @@ -10,7 +10,8 @@ concurrency: cancel-in-progress: true jobs: dry_run: - runs-on: ubuntu-latest + runs-on: ubuntu-latest-large + timeout-minutes: 25 steps: - uses: actions/checkout@v4 - name: Install uv @@ -18,13 +19,16 @@ jobs: with: enable-cache: true activate-environment: true + cache-dependency-glob: | + **/pyproject.toml + **/__main__.py - name: Install pip and dependencies run: | uv pip install -U pip uv pip install . 
- - name: Collect Downloads Data - Dry Run + - name: Collect PyPI Downloads - Dry Run run: | - uv run download-analytics collect \ + uv run download-analytics collect-pypi \ --verbose \ --max-days 30 \ --add-metrics \ @@ -33,7 +37,16 @@ jobs: env: PYDRIVE_CREDENTIALS: ${{ secrets.PYDRIVE_CREDENTIALS }} BIGQUERY_CREDENTIALS: ${{ secrets.BIGQUERY_CREDENTIALS }} - - name: Run Summarize - Dry Run + - name: Collect Anaconda Downloads - Dry Run + run: | + uv run download-analytics collect-anaconda \ + --output-folder gdrive://1UnDYovLkL4gletOF5328BG1X59mSHF-Z \ + --max-days 90 \ + --verbose \ + --dry-run + env: + PYDRIVE_CREDENTIALS: ${{ secrets.PYDRIVE_CREDENTIALS }} + - name: Summarize - Dry Run run: | uv run download-analytics summarize \ --verbose \ diff --git a/.github/workflows/manual.yaml b/.github/workflows/manual.yaml index cffe1df..1dc642c 100644 --- a/.github/workflows/manual.yaml +++ b/.github/workflows/manual.yaml @@ -36,7 +36,7 @@ jobs: uv pip install . - name: Collect Downloads Data run: | - uv run download-analytics collect \ + uv run download-analytics collect-pypi \ --verbose \ --projects ${{ github.event.inputs.projects }} \ ${{ github.event.inputs.max_days && '--max-days ' || '' }} \ diff --git a/README.md b/README.md index 5dcba58..824eca0 100644 --- a/README.md +++ b/README.md @@ -10,16 +10,24 @@ engagement metrics. ### Data sources -Currently the download data is coming from the following distributions: +Currently the download data is collected from the following distributions: * [PyPI](https://pypi.org/): Information about the project downloads from [PyPI](https://pypi.org/) obtained from the public Big Query dataset, equivalent to the information shown on [pepy.tech](https://pepy.tech). +* [conda-forge](https://conda-forge.org/): Information about the project downloads from the + `conda-forge` channel on `conda`. + - The conda package download data provided by Anaconda. It includes package download counts + starting from January 2017. More information: + - https://github.com/anaconda/anaconda-package-data + - The conda package metadata data provided by Anaconda. There is a public API which allows for + the retrieval of package information, including current number of downloads. + - https://api.anaconda.org/package/{username}/{package_name} + - Replace {username} with the Anaconda username (`conda-forge`) and {package_name} with + the specific package name (`sdv`). In the future, we may also expand the source distributions to include: -* [conda-forge](https://conda-forge.org/): Information about the project downloads from the - `conda-forge` channel on `conda`. * [github](https://github.com/): Information about the project downloads from github releases. For more information about how to configure and use the software, or about the data that is being diff --git a/docs/DEVELOPMENT.md b/docs/DEVELOPMENT.md index 19c80fe..603eb30 100644 --- a/docs/DEVELOPMENT.md +++ b/docs/DEVELOPMENT.md @@ -33,14 +33,14 @@ For development, run `make install-develop` instead. ## Command Line Interface After the installation, a new `download-analytics` command will have been registered inside your -`virtualenv`. This command can be used in conjunction with the `collect` action to collect +`virtualenv`. This command can be used in conjunction with the `collect-pypi` action to collect downloads data from BigQuery and store the output locally or in Google Drive. 
Here is the entire list of arguments that the command line has: ```bash -$ download-analytics collect --help -usage: download-analytics collect [-h] [-v] [-l LOGFILE] [-o OUTPUT_FOLDER] [-a AUTHENTICATION_CREDENTIALS] +$ download-analytics collect-pypi --help +usage: download-analytics collect-pypi [-h] [-v] [-l LOGFILE] [-o OUTPUT_FOLDER] [-a AUTHENTICATION_CREDENTIALS] [-c CONFIG_FILE] [-p [PROJECTS [PROJECTS ...]]] [-s START_DATE] [-m MAX_DAYS] [-d] [-f] [-M] @@ -73,7 +73,7 @@ and store the downloads data into a Google Drive folder alongside the correspond metric spreadsheets would look like this: ```bash -$ download-analytics collect --verbose --projects sdv ctgan --start-date 2021-01-01 \ +$ download-analytics collect-pypi --verbose --projects sdv ctgan --start-date 2021-01-01 \ --add-metrics --output-folder gdrive://10QHbqyvptmZX4yhu2Y38YJbVHqINRr0n ``` diff --git a/docs/SETUP.md b/docs/SETUP.md index 7c2cdef..d30a806 100644 --- a/docs/SETUP.md +++ b/docs/SETUP.md @@ -31,10 +31,10 @@ if contains the application KEY which should never be made public. Once the file is created, you can follow these steps: -1. Run the `download-analytics collect` command. If the `settings.yaml` file has been properly +1. Run the `download-analytics collect-pypi` command. If the `settings.yaml` file has been properly created, this will **open a new tab on your web browser**, where you need to authenticate. - | ![pydrive-collect](imgs/pydrive-collect.png "Run the `download-analytics collect` Command") | + | ![pydrive-collect](imgs/pydrive-collect.png "Run the `download-analytics collect-pypi` Command") | | - | 2. Click on the Google account which you which to authenticate with. Notice that the account that @@ -67,7 +67,7 @@ be provided to you by a privileged admin. Once you have this JSON file, you have two options: 1. Pass the path to the authentication file with the `-a` or `--authentication-credentials` - argument to the `download-analytics collect` command. + argument to the `download-analytics collect-pypi` command. | ![bigquery-a](imgs/bigquery-a.png "Pass the credentials on command line") | | - | diff --git a/download_analytics/__main__.py b/download_analytics/__main__.py index fef9256..d0ebcec 100644 --- a/download_analytics/__main__.py +++ b/download_analytics/__main__.py @@ -9,6 +9,7 @@ import yaml +from download_analytics.anaconda import collect_anaconda_downloads from download_analytics.main import collect_downloads from download_analytics.summarize import summarize_downloads @@ -44,7 +45,7 @@ def _load_config(config_path): return config -def _collect(args): +def _collect_pypi(args): config = _load_config(args.config_file) projects = args.projects or config['projects'] output_folder = args.output_folder or config.get('output-folder', '.') @@ -62,6 +63,19 @@ def _collect(args): ) +def _collect_anaconda(args): + config = _load_config(args.config_file) + projects = config['projects'] + output_folder = args.output_folder or config.get('output-folder', '.') + collect_anaconda_downloads( + projects=projects, + output_folder=output_folder, + max_days=args.max_days, + dry_run=args.dry_run, + verbose=args.verbose, + ) + + def _summarize(args): config = _load_config(args.config_file) projects = config['projects'] @@ -98,7 +112,12 @@ def _get_parser(): logging_args.add_argument( '-l', '--logfile', help='If given, file where the logs will be written.' ) - + logging_args.add_argument( + '-d', + '--dry-run', + action='store_true', + help='Do not upload the results. 
Just calculate them.', + ) parser = argparse.ArgumentParser( prog='download-analytics', description='Download Analytics Command Line Interface', @@ -109,10 +128,12 @@ def _get_parser(): action.required = True # collect - collect = action.add_parser('collect', help='Collect downloads data.', parents=[logging_args]) - collect.set_defaults(action=_collect) + collect_pypi = action.add_parser( + 'collect-pypi', help='Collect download data from PyPi.', parents=[logging_args] + ) + collect_pypi.set_defaults(action=_collect_pypi) - collect.add_argument( + collect_pypi.add_argument( '-o', '--output-folder', type=str, @@ -122,54 +143,48 @@ def _get_parser(): ' Google Drive folder path in the format gdrive://' ), ) - collect.add_argument( + collect_pypi.add_argument( '-a', '--authentication-credentials', type=str, required=False, help='Path to the GCP (BigQuery) credentials file to use.', ) - collect.add_argument( + collect_pypi.add_argument( '-c', '--config-file', type=str, default='config.yaml', help='Path to the configuration file.', ) - collect.add_argument( + collect_pypi.add_argument( '-p', '--projects', nargs='*', help='List of projects to collect. If not given use the configured ones.', default=None, ) - collect.add_argument( + collect_pypi.add_argument( '-s', '--start-date', type=_valid_date, required=False, help='Date from which to start pulling data.', ) - collect.add_argument( + collect_pypi.add_argument( '-m', '--max-days', type=int, required=False, help='Max days of data to pull if start-date is not given.', ) - collect.add_argument( - '-d', - '--dry-run', - action='store_true', - help='Do not run the actual query, only simulate it.', - ) - collect.add_argument( + collect_pypi.add_argument( '-f', '--force', action='store_true', help='Force the download even if the data already exists or there is a gap', ) - collect.add_argument( + collect_pypi.add_argument( '-M', '--add-metrics', action='store_true', @@ -205,11 +220,36 @@ def _get_parser(): ' Google Drive folder path in the format gdrive://' ), ) - summarize.add_argument( - '-d', - '--dry-run', - action='store_true', - help='Do not upload the summary results. Just calculate them.', + + # collect + collect_anaconda = action.add_parser( + 'collect-anaconda', help='Collect download data from Anaconda.', parents=[logging_args] + ) + collect_anaconda.set_defaults(action=_collect_anaconda) + collect_anaconda.add_argument( + '-c', + '--config-file', + type=str, + default='config.yaml', + help='Path to the configuration file.', + ) + collect_anaconda.add_argument( + '-o', + '--output-folder', + type=str, + required=False, + help=( + 'Path to the folder where data will be outputted. 
It can be a local path or a' + ' Google Drive folder path in the format gdrive://' + ), + ) + collect_anaconda.add_argument( + '-m', + '--max-days', + type=int, + required=False, + default=90, + help='Max days of data to pull.', ) return parser diff --git a/download_analytics/anaconda.py b/download_analytics/anaconda.py new file mode 100644 index 0000000..c4cbc3c --- /dev/null +++ b/download_analytics/anaconda.py @@ -0,0 +1,207 @@ +"""Functions to get Anaconda downloads from Anaconda S3 bucket.""" + +import logging +import os +from datetime import datetime, timedelta +from zoneinfo import ZoneInfo + +import pandas as pd +import requests +from tqdm import tqdm + +from download_analytics.output import append_row, create_csv, get_path, load_csv +from download_analytics.time_utils import drop_duplicates_by_date + +LOGGER = logging.getLogger(__name__) +dir_path = os.path.dirname(os.path.realpath(__file__)) + + +BUCKET_NAME = 'anaconda-package-data' +PREVIOUS_ANACONDA_FILENAME = 'anaconda.csv' +PREVIOUS_ANACONDA_ORG_OVERALL_FILENAME = 'anaconda_org_overall.csv' +PREVIOUS_ANACONDA_ORG_VERSION_FILENAME = 'anaconda_org_per_version.csv' +TIME_COLUMN = 'time' +PKG_COLUMN = 'pkg_name' +ANACONDA_BUCKET_PATH = 's3://anaconda-package-data/conda' + + +def _read_anaconda_parquet(URL, pkg_names=None): + """Read parquet file in anaconda bucket.""" + storage_options = None + if 's3://' in URL: + storage_options = {'anon': True} + try: + df = pd.read_parquet( + URL, + storage_options=storage_options, + engine='pyarrow', + dtype_backend='pyarrow', + ) + df[TIME_COLUMN] = pd.to_datetime(df[TIME_COLUMN]) + if pkg_names: + df = df[df[PKG_COLUMN].isin(set(pkg_names))] + except FileNotFoundError: + return pd.DataFrame() + return df + + +def _anaconda_package_data_by_day(year, month, day, pkg_names=None): + """Anaconda download data on a per day basis. + + More information: https://github.com/anaconda/anaconda-package-data + + """ + padded_year = '{:04d}'.format(year) + padded_month = '{:02d}'.format(month) + padded_day = '{:02d}'.format(day) + + filename = f'{padded_year}-{padded_month}-{padded_day}.parquet' + URL = f'{ANACONDA_BUCKET_PATH}/hourly/{padded_year}/{padded_month}/{filename}' + return _read_anaconda_parquet(URL, pkg_names=pkg_names) + + +def anaconda_package_data_by_year_month(year, month, pkg_names=None): + """Anaconda download data on a per month basis. Unused. 
+ + More information: https://github.com/anaconda/anaconda-package-data + + """ + padded_year = '{:04d}'.format(year) + padded_month = '{:02d}'.format(month) + filename = f'{padded_year}-{padded_month}.parquet' + URL = f'{ANACONDA_BUCKET_PATH}/monthly/{padded_year}/{filename}' + return _read_anaconda_parquet(URL, pkg_names=pkg_names) + + +def _get_previous_anaconda_downloads(output_folder, filename): + """Read anaconda.csv to get previous downloads.""" + read_csv_kwargs = { + 'parse_dates': [TIME_COLUMN], + } + csv_path = get_path(output_folder, filename) + previous = load_csv(csv_path, read_csv_kwargs=read_csv_kwargs) + return previous + + +def _get_downloads_from_anaconda_org(packages, channel='conda-forge'): + overall_downloads = pd.DataFrame(columns=['pkg_name', TIME_COLUMN, 'total_ndownloads']) + per_version_downloads = pd.DataFrame(columns=['pkg_name', 'version', TIME_COLUMN, 'ndownloads']) + + for pkg_name in packages: + URL = f'https://api.anaconda.org/package/{channel}/{pkg_name}' + timestamp = datetime.now(ZoneInfo('UTC')) + response = requests.get(URL) + row_info = {'pkg_name': [pkg_name], TIME_COLUMN: [timestamp], 'total_ndownloads': 0} + data = response.json() + total_ndownloads = 0 + if 'could not be found' in data.get('error', ''): + pass + else: + for files_info in data['files']: + total_ndownloads += files_info['ndownloads'] + + per_release_row = { + 'pkg_name': [pkg_name], + 'version': [files_info.get('version', None)], + TIME_COLUMN: [timestamp], + 'ndownloads': [files_info.get('ndownloads', 0)], + } + per_version_downloads = append_row(per_version_downloads, per_release_row) + + row_info['total_ndownloads'] = total_ndownloads + overall_downloads = append_row(overall_downloads, row_info) + return overall_downloads, per_version_downloads + + +def _collect_ananconda_downloads_from_website(projects, output_folder): + previous_overall = _get_previous_anaconda_downloads( + output_folder=output_folder, filename=PREVIOUS_ANACONDA_ORG_OVERALL_FILENAME + ) + previous_version = _get_previous_anaconda_downloads( + output_folder=output_folder, filename=PREVIOUS_ANACONDA_ORG_VERSION_FILENAME + ) + new_overall_downloads, new_version_downloads = _get_downloads_from_anaconda_org( + projects, + ) + overall_df = pd.concat([previous_overall, new_overall_downloads], ignore_index=True) + overall_df = drop_duplicates_by_date( + overall_df, time_column=TIME_COLUMN, group_by_columns=[PKG_COLUMN] + ) + + version_downloads = pd.concat([previous_version, new_version_downloads], ignore_index=True) + version_downloads = drop_duplicates_by_date( + version_downloads, time_column=TIME_COLUMN, group_by_columns=[PKG_COLUMN] + ) + return overall_df, version_downloads + + +def collect_anaconda_downloads( + projects, + output_folder, + max_days=90, + dry_run=False, + verbose=False, +): + """Pull data about the downloads of a list of projects from Anaconda. + + Args: + projects (list[str]): + List of projects to analyze. + output_folder (str): + Folder in which project downloads will be stored. + It can be passed as a local folder or as a Google Drive path in the format + `gdrive://{folder_id}`. + The folder must contain 'anaconda.csv', 'anaconda_org_overall.csv', + and 'anaconda_org_per_version.csv'. + max_days (int): + Maximum amount of days to include in the query from current date back, in case + `start_date` has not been provided. Defaults to 90 days. + dry_run (bool): + If `True`, do not upload the results. Defaults to `False`. 
+ """ + overall_df, version_downloads = _collect_ananconda_downloads_from_website( + projects, output_folder=output_folder + ) + + previous = _get_previous_anaconda_downloads(output_folder, filename=PREVIOUS_ANACONDA_FILENAME) + previous = previous.sort_values(TIME_COLUMN) + + end_date = datetime.now(tz=ZoneInfo('UTC')).date() + start_date = end_date - timedelta(days=max_days) + LOGGER.info(f'Getting daily anaconda data for start_date>={start_date} to end_date<{end_date}') + date_ranges = pd.date_range(start=start_date, end=end_date, freq='D') + all_downloads_count = len(previous) + for iteration_datetime in tqdm(date_ranges): + new_downloads = _anaconda_package_data_by_day( + year=iteration_datetime.year, + month=iteration_datetime.month, + day=iteration_datetime.day, + pkg_names=projects, + ) + if len(new_downloads) > 0: + # Keep only the newest data (on a per day basis) for all packages + previous = previous[previous[TIME_COLUMN].dt.date != iteration_datetime.date()] + previous = pd.concat([previous, new_downloads], ignore_index=True) + + previous = previous.sort_values(TIME_COLUMN) + LOGGER.info('Obtained %s new downloads', all_downloads_count - len(previous)) + + if verbose: + LOGGER.info(f'{PREVIOUS_ANACONDA_FILENAME} tail') + LOGGER.info(previous.tail(5).to_string()) + LOGGER.info(f'{PREVIOUS_ANACONDA_ORG_OVERALL_FILENAME} tail') + LOGGER.info(overall_df.tail(5).to_string()) + LOGGER.info(f'{PREVIOUS_ANACONDA_ORG_VERSION_FILENAME} tail') + LOGGER.info(version_downloads.tail(5).to_string()) + + if not dry_run: + gfolder_path = f'{output_folder}/{PREVIOUS_ANACONDA_FILENAME}' + create_csv(output_path=gfolder_path, data=previous) + + gfolder_path = f'{output_folder}/{PREVIOUS_ANACONDA_ORG_OVERALL_FILENAME}' + create_csv(output_path=gfolder_path, data=overall_df) + + gfolder_path = f'{output_folder}/{PREVIOUS_ANACONDA_ORG_VERSION_FILENAME}' + create_csv(output_path=gfolder_path, data=version_downloads) + + return None diff --git a/download_analytics/main.py b/download_analytics/main.py index 0f4cbaa..69f6f73 100644 --- a/download_analytics/main.py +++ b/download_analytics/main.py @@ -3,8 +3,9 @@ import logging from download_analytics.metrics import compute_metrics -from download_analytics.output import create_csv, get_path, load_csv +from download_analytics.output import create_csv, get_path from download_analytics.pypi import get_pypi_downloads +from download_analytics.summarize import get_previous_pypi_downloads LOGGER = logging.getLogger(__name__) @@ -47,10 +48,10 @@ def collect_downloads( if not projects: raise ValueError('No projects have been passed') - LOGGER.info(f'Collecting downloads for projects={projects}') + LOGGER.info(f'Collecting new downloads for projects={projects}') csv_path = get_path(output_folder, 'pypi.csv') - previous = load_csv(csv_path, dry_run=dry_run) + previous = get_previous_pypi_downloads(input_file=None, output_folder=output_folder) pypi_downloads = get_pypi_downloads( projects=projects, diff --git a/download_analytics/output.py b/download_analytics/output.py index 9c53952..a65c5f4 100644 --- a/download_analytics/output.py +++ b/download_analytics/output.py @@ -154,7 +154,7 @@ def load_spreadsheet(spreadsheet): return sheets -def load_csv(csv_path, dry_run=False): +def load_csv(csv_path, read_csv_kwargs=None): """Load a CSV previously created by download-analytics. 
Args: @@ -169,31 +169,17 @@ def load_csv(csv_path, dry_run=False): csv_path += '.csv' LOGGER.info('Trying to load CSV file %s', csv_path) + if not read_csv_kwargs: + read_csv_kwargs = {} try: - read_csv_kwargs = { - 'parse_dates': ['timestamp'], - 'dtype': { - 'country_code': pd.CategoricalDtype(), - 'project': pd.CategoricalDtype(), - 'version': pd.CategoricalDtype(), - 'type': pd.CategoricalDtype(), - 'installer_name': pd.CategoricalDtype(), - 'implementation_name': pd.CategoricalDtype(), - 'implementation_version': pd.CategoricalDtype(), - 'distro_name': pd.CategoricalDtype(), - 'distro_version': pd.CategoricalDtype(), - 'system_name': pd.CategoricalDtype(), - 'system_release': pd.CategoricalDtype(), - 'cpu': pd.CategoricalDtype(), - }, - } if drive.is_drive_path(csv_path): folder, filename = drive.split_drive_path(csv_path) stream = drive.download(folder, filename) data = pd.read_csv(stream, **read_csv_kwargs) else: data = pd.read_csv(csv_path, **read_csv_kwargs) - data['version'] = data['version'].apply(parse) + if 'version' in data.columns: + data['version'] = data['version'].apply(parse) except FileNotFoundError: LOGGER.info('Failed to load CSV file %s: not found', csv_path) return None @@ -201,3 +187,8 @@ def load_csv(csv_path, dry_run=False): LOGGER.info('Loaded CSV %s', csv_path) return data + + +def append_row(df, row): + """Append a dictionary as a row to a DataFrame.""" + return pd.concat([df, pd.DataFrame(data=row)], ignore_index=True) diff --git a/download_analytics/summarize.py b/download_analytics/summarize.py index a86491a..c2c12c7 100644 --- a/download_analytics/summarize.py +++ b/download_analytics/summarize.py @@ -6,7 +6,7 @@ import pandas as pd from packaging.version import Version -from download_analytics.output import create_spreadsheet, get_path, load_csv +from download_analytics.output import append_row, create_spreadsheet, get_path, load_csv from download_analytics.time_utils import get_current_year, get_min_max_dt_in_year TOTAL_COLUMN_NAME = 'Total Since Beginning' @@ -76,19 +76,27 @@ def _sum_counts(base_count, dep_to_count, parent_to_count): return base_count + sum(parent_to_count.values()) + sum(dep_to_count.values()) -def append_row(df, row): - """Append a dictionary as a row to a DataFrame.""" - return pd.concat([df, pd.DataFrame(data=row)], ignore_index=True) - - -def get_downloads(input_file, output_folder, dry_run): +def get_previous_pypi_downloads(input_file, output_folder): """Read pypi.csv and return a DataFrame of the downloads.""" - if input_file: - downloads = load_csv(input_file, dry_run=dry_run) - else: - csv_path = get_path(output_folder, 'pypi.csv') - downloads = load_csv(csv_path, dry_run=dry_run) - return downloads + csv_path = input_file or get_path(output_folder, 'pypi.csv') + read_csv_kwargs = { + 'parse_dates': ['timestamp'], + 'dtype': { + 'country_code': pd.CategoricalDtype(), + 'project': pd.CategoricalDtype(), + 'version': pd.CategoricalDtype(), + 'type': pd.CategoricalDtype(), + 'installer_name': pd.CategoricalDtype(), + 'implementation_name': pd.CategoricalDtype(), + 'implementation_version': pd.CategoricalDtype(), + 'distro_name': pd.CategoricalDtype(), + 'distro_version': pd.CategoricalDtype(), + 'system_name': pd.CategoricalDtype(), + 'system_release': pd.CategoricalDtype(), + 'cpu': pd.CategoricalDtype(), + }, + } + return load_csv(csv_path, read_csv_kwargs=read_csv_kwargs) def _ecosystem_count_by_year(downloads, base_project, dependency_projects, parent_projects): @@ -204,7 +212,7 @@ def summarize_downloads( 
`gdrive://{folder_id}`. """ - downloads = get_downloads(input_file, output_folder, dry_run) + downloads = get_previous_pypi_downloads(input_file, output_folder) vendor_df = pd.DataFrame.from_records(vendors) all_df = _create_all_df() diff --git a/download_analytics/time_utils.py b/download_analytics/time_utils.py index dea0ad1..7e0b1e5 100644 --- a/download_analytics/time_utils.py +++ b/download_analytics/time_utils.py @@ -2,6 +2,9 @@ from datetime import datetime +import pandas as pd +from pandas.api.types import is_datetime64_any_dtype + def get_current_year(tz=None): """Get the current year.""" @@ -25,3 +28,41 @@ def get_min_max_dt_in_year(year): min_datetime = get_first_datetime_in_year(year) max_datetime = get_last_datetime_in_year(year) return min_datetime, max_datetime + + +def drop_duplicates_by_date(df, time_column, group_by_columns): + """Keep only the latest record for each day within each group. + + For each unique combination of date and group, retain only the row with the + latest timestamp. This is useful for deduplicating time series data where + multiple records may exist for the same day. + + Args: + df (pd.DataFrame): Input DataFrame containing the data to deduplicate. + time_column (str): Name of the column containing timestamp data. + group_by_columns (list[str]): Name of the column to group by when determining duplicates. + + """ + df_copy = df.copy() + date_column = _create_unique_name('date', df_copy.columns.tolist()) + original_dtype = None + if not is_datetime64_any_dtype(df_copy[time_column].dtype): + original_dtype = df_copy[time_column].dtype + df_copy[time_column] = pd.to_datetime(df_copy[time_column], utc=True) + df_copy[date_column] = df_copy[time_column].dt.date + columns = [date_column] + group_by_columns + df_copy = df_copy.loc[df_copy.groupby(columns)[time_column].idxmax()] + df_copy = df_copy.drop(columns=date_column) + if original_dtype: + df_copy[time_column] = df[time_column].astype(original_dtype) + return df_copy + + +def _create_unique_name(name, list_names): + # Copied from https://github.com/sdv-dev/SDV/blob/dcc95725af4f249fc8e9015c6d2617184de95041/sdv/_utils.py#L184 + """Modify the ``name`` parameter if it already exists in the list of names.""" + result = name + while result in list_names: + result += '_' + + return result diff --git a/pyproject.toml b/pyproject.toml index d94144c..9d6836e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,6 +24,10 @@ dependencies = [ "google-cloud-bigquery-storage", "db-dtypes", "httplib2==0.15.0", # https://stackoverflow.com/questions/59815620/gcloud-upload-httplib2-redirectmissinglocation-redirected-but-the-response-is-m + 'pyarrow', + 's3fs', + 'boto3', + "tqdm", ] [project.urls] diff --git a/tests/unit/test_time_utils.py b/tests/unit/test_time_utils.py index 74298f3..fd6f413 100644 --- a/tests/unit/test_time_utils.py +++ b/tests/unit/test_time_utils.py @@ -1,6 +1,9 @@ from datetime import datetime +import pandas as pd + from download_analytics.time_utils import ( + drop_duplicates_by_date, get_current_year, get_first_datetime_in_year, get_last_datetime_in_year, @@ -51,3 +54,60 @@ def test_get_min_max_dt_in_year(): assert max_dt.month == 12 assert min_dt.microsecond == 0 assert max_dt.microsecond == 999999 + + +def test_drop_duplicates_by_date_basic(): + # Setup + df = pd.DataFrame({ + 'timestamp': ['2023-01-01 10:00:00', '2023-01-01 15:00:00', '2023-01-02 12:00:00'], + 'pkg_name': ['sdv', 'sdv', 'sdv'], + 'counts': [1, 2, 3], + }) + + # Run + result = drop_duplicates_by_date(df, 'timestamp', 
['pkg_name']) + + # Assert + assert len(result) == 2 + assert result.iloc[0]['counts'] == 2 + assert result.iloc[1]['counts'] == 3 + + +def test_drop_duplicates_by_date_multiple_groups(): + # Setup + df = pd.DataFrame({ + 'timestamp': [ + '2023-01-01 09:00:00', + '2023-01-01 14:00:00', + '2023-01-01 11:00:00', + '2023-01-01 16:00:00', + ], + 'pkg_name': ['sdv', 'sdv', 'mostlyai', 'mostlyai'], + 'counts': [1, 2, 3, 4], + }) + + # Run + result = drop_duplicates_by_date(df, 'timestamp', ['pkg_name']) + + # Assert + assert len(result) == 2 + sdv_row = result[result['pkg_name'] == 'sdv'].iloc[0] + mostlyai_row = result[result['pkg_name'] == 'mostlyai'].iloc[0] + assert sdv_row['counts'] == 2 + assert mostlyai_row['counts'] == 4 + + +def test_drop_duplicates_by_date_no_duplicates(): + # Setup + df = pd.DataFrame({ + 'timestamp': ['2023-01-01 10:00:00', '2023-01-02 11:00:00', '2023-01-03 12:00:00'], + 'pkg_name': ['sdv', 'sdv', 'mostlyai'], + 'counts': [1, 2, 3], + }) + + # Run + result = drop_duplicates_by_date(df, 'timestamp', ['pkg_name']) + + # Assert + assert len(result) == 3 + pd.testing.assert_frame_equal(result.reset_index(drop=True), df.reset_index(drop=True))
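
A minimal standalone sketch of the core of the new Anaconda collection path, useful for verifying bucket access locally. It mirrors `_read_anaconda_parquet` / `_anaconda_package_data_by_day` from `download_analytics/anaconda.py` above, but the date, package list, and final printout are illustrative only; it assumes `pandas`, `pyarrow`, and `s3fs` (the dependencies added to `pyproject.toml` in this patch) are installed.

```python
# Sketch only -- not part of the patch. Fetch one day of Anaconda download data
# from the public anaconda-package-data bucket and filter it to a package list,
# the same way _anaconda_package_data_by_day does.
import pandas as pd

year, month, day = 2024, 1, 15   # illustrative date
pkg_names = {'sdv', 'ctgan'}     # illustrative packages

url = (
    's3://anaconda-package-data/conda/hourly/'
    f'{year:04d}/{month:02d}/{year:04d}-{month:02d}-{day:02d}.parquet'
)

try:
    # Anonymous S3 access via s3fs, as in _read_anaconda_parquet.
    df = pd.read_parquet(url, storage_options={'anon': True}, engine='pyarrow')
except FileNotFoundError:
    # The bucket has no file for days that have not been published yet.
    df = pd.DataFrame()

if not df.empty:
    # 'time' and 'pkg_name' are the TIME_COLUMN / PKG_COLUMN constants used above.
    df['time'] = pd.to_datetime(df['time'])
    df = df[df['pkg_name'].isin(pkg_names)]
    print(df.head())
    print(f'{len(df)} hourly rows for {sorted(pkg_names)} on {year}-{month:02d}-{day:02d}')
```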