diff --git a/osbenchmark/resources/sdg-config.yml b/osbenchmark/resources/sdg-config.yml
index 8c2c09fca..8fa9aa004 100644
--- a/osbenchmark/resources/sdg-config.yml
+++ b/osbenchmark/resources/sdg-config.yml
@@ -2,6 +2,13 @@ settings:
   workers: 8
   max_file_size_gb: 40
   docs_per_chunk: 10000
+  filename_suffix_begins_at: 0
+  timeseries_enabled:
+    timeseries_field: "@timestamp"
+    timeseries_start_date: "1/1/2024"
+    timeseries_end_date: "1/31/2024"
+    timeseries_frequency: "ms"
+    timeseries_format: "epoch_ms"

 CustomGenerationValues:
   # For users who want to generate data via a custom Python module
diff --git a/osbenchmark/synthetic_data_generator/models.py b/osbenchmark/synthetic_data_generator/models.py
index 1075d13e1..2e95104d1 100644
--- a/osbenchmark/synthetic_data_generator/models.py
+++ b/osbenchmark/synthetic_data_generator/models.py
@@ -12,12 +12,58 @@
+import re
+
 from pydantic import BaseModel, Field, field_validator

+from osbenchmark.synthetic_data_generator.timeseries_partitioner import TimeSeriesPartitioner
+
 GB_TO_BYTES = 1024 ** 3

+class TimeSeriesConfig(BaseModel):
+    timeseries_field: str
+    timeseries_start_date: str
+    timeseries_end_date: str
+    timeseries_frequency: str
+    timeseries_format: str
+
+    # pylint: disable = no-self-argument
+    @field_validator('timeseries_start_date', 'timeseries_end_date', 'timeseries_frequency', 'timeseries_format')
+    def validate_string_fields(cls, v, info):
+        """Validate that timeseries configuration fields are strings"""
+        if not isinstance(v, str):
+            field_name = info.field_name.replace('_', ' ').title()
+            raise ValueError(f"{field_name} requires a string value. Value {v} is not valid.")
+
+        # Additional validation for frequency and format fields
+        if info.field_name == 'timeseries_frequency':
+            if v not in TimeSeriesPartitioner.AVAILABLE_FREQUENCIES:
+                raise ValueError(f"Timeseries frequency {v} is not a valid value. Valid values are {TimeSeriesPartitioner.AVAILABLE_FREQUENCIES}")
+
+        if info.field_name == 'timeseries_format':
+            if v not in TimeSeriesPartitioner.VALID_DATETIMESTAMPS_FORMATS:
+                raise ValueError(f"Timeseries format {v} is not a valid value. Valid values are {TimeSeriesPartitioner.VALID_DATETIMESTAMPS_FORMATS}")
+
+        return v
+
+    # pylint: disable = no-self-argument
+    @field_validator('timeseries_field')
+    def validate_timeseries_field(cls, v):
+        if not v or not v.strip():
+            raise ValueError("timeseries_field cannot be empty")
+
+        # Validate field name format. Field names must start with a letter or '@'
+        # (e.g. the conventional @timestamp used in the sample config) and contain
+        # only alphanumeric characters, underscores, and periods
+        if not re.match(r'^[a-zA-Z@][a-zA-Z0-9_.]*$', v):
+            raise ValueError(
+                f"Invalid timeseries_field '{v}'. Field names must start with a letter or '@' "
+                "and contain only alphanumeric characters, underscores, and periods."
+            )
+
+        return v
+
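For reviewers, a minimal sketch of how the new `TimeSeriesConfig` validators behave when the settings block above is loaded. The valid values mirror the sample config; the failing frequency is invented:

```python
from pydantic import ValidationError

from osbenchmark.synthetic_data_generator.models import TimeSeriesConfig

# Mirrors the sample block in sdg-config.yml above
config = TimeSeriesConfig(
    timeseries_field="@timestamp",
    timeseries_start_date="1/1/2024",
    timeseries_end_date="1/31/2024",
    timeseries_frequency="ms",
    timeseries_format="epoch_ms",
)

# An unsupported frequency fails at config-load time rather than mid-generation
try:
    TimeSeriesConfig(
        timeseries_field="@timestamp",
        timeseries_start_date="1/1/2024",
        timeseries_end_date="1/31/2024",
        timeseries_frequency="fortnight",  # hypothetical invalid value
        timeseries_format="epoch_ms",
    )
except ValidationError as e:
    print(e)
```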
 class SettingsConfig(BaseModel):
     workers: Optional[int] = Field(default_factory=os.cpu_count) # Number of workers recommended to not exceed CPU count
-    max_file_size_gb: Optional[int] = 40 # Default because some CloudProviders limit the size of files stored
-    docs_per_chunk: Optional[int] = 10000 # Default based on testing
+    max_file_size_gb: Optional[int] = 40 # Default because some CloudProviders limit the size of files stored
+    docs_per_chunk: Optional[int] = 10000 # Default based on testing
+    filename_suffix_begins_at: Optional[int] = 0 # Start at suffix 0
+    timeseries_enabled: Optional[TimeSeriesConfig] = None

     # pylint: disable = no-self-argument
     @field_validator('workers', 'max_file_size_gb', 'docs_per_chunk')
diff --git a/osbenchmark/synthetic_data_generator/strategies/custom_module_strategy.py b/osbenchmark/synthetic_data_generator/strategies/custom_module_strategy.py
index 1f790267f..cfe8c7670 100644
--- a/osbenchmark/synthetic_data_generator/strategies/custom_module_strategy.py
+++ b/osbenchmark/synthetic_data_generator/strategies/custom_module_strategy.py
@@ -9,6 +9,7 @@
 import logging
 from types import ModuleType
 from typing import Optional, Callable
+from typing import Generator

 from dask.distributed import Client
 from mimesis import Generic
@@ -19,6 +20,7 @@
 from osbenchmark import exceptions
 from osbenchmark.synthetic_data_generator.strategies import DataGenerationStrategy
 from osbenchmark.synthetic_data_generator.models import SyntheticDataGeneratorMetadata, SDGConfig
+from osbenchmark.synthetic_data_generator.timeseries_partitioner import TimeSeriesPartitioner

 class CustomModuleStrategy(DataGenerationStrategy):
     def __init__(self, sdg_metadata: SyntheticDataGeneratorMetadata, sdg_config: SDGConfig, custom_module: ModuleType) -> None:
@@ -50,18 +52,36 @@
     # pylint: disable=arguments-differ
-    def generate_data_chunks_across_workers(self, dask_client: Client, docs_per_chunk: int, seeds: list ) -> list:
+    def generate_data_chunks_across_workers(self, dask_client: Client, docs_per_chunk: int, seeds: list, timeseries_enabled: dict = None, timeseries_windows: list = None) -> list:
         """
         Submits workers to generate data chunks and returns Dask futures

         Returns: list of Dask Futures
         """
-        return [dask_client.submit(
-            self.generate_data_chunk_from_worker, self.custom_module.generate_synthetic_document,
-            docs_per_chunk, seed) for seed in seeds]
+        self.logger.info("Generating data across workers")
+        if timeseries_enabled and timeseries_windows:
+            futures = []
+            for seed, window in zip(seeds, timeseries_windows):
+                future = dask_client.submit(
+                    self.generate_data_chunk_from_worker, self.custom_module.generate_synthetic_document,
+                    docs_per_chunk, seed, timeseries_enabled, window
+                )
+                futures.append(future)
+
+            return futures
+        else:
+            # If not using timeseries approach
+            return [dask_client.submit(
+                self.generate_data_chunk_from_worker, self.custom_module.generate_synthetic_document,
+                docs_per_chunk, seed) for seed in seeds]
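A sketch (seed and window values invented) of the one-to-one pairing that `generate_data_chunks_across_workers` now performs: each Dask worker receives one seed and one disjoint time window, so chunks can later be re-ordered by their first timestamp:

```python
import pandas as pd

# Hypothetical values: three workers, three seeds, three contiguous windows
seeds = [1774033677, 2908381, 41650324]
windows = [
    (pd.Timestamp("2024-01-01 00:00"), pd.Timestamp("2024-01-01 00:10")),
    (pd.Timestamp("2024-01-01 00:10"), pd.Timestamp("2024-01-01 00:20")),
    (pd.Timestamp("2024-01-01 00:20"), pd.Timestamp("2024-01-01 00:30")),
]

for seed, window in zip(seeds, windows):
    # In the strategy this becomes:
    # dask_client.submit(self.generate_data_chunk_from_worker,
    #                    self.custom_module.generate_synthetic_document,
    #                    docs_per_chunk, seed, timeseries_enabled, window)
    print(f"seed={seed} generates docs for {window[0]} -> {window[1]}")
```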
-    # pylint: disable=arguments-renamed
-    def generate_data_chunk_from_worker(self, generate_synthetic_document: Callable, docs_per_chunk: int, seed: Optional[int]) -> list:
+    def generate_data_chunk_from_worker(self, generate_synthetic_document: Callable, docs_per_chunk: int, seed: Optional[int],
+                                        timeseries_enabled: dict = None, timeseries_window: tuple = None) -> list:
         """
         This method is submitted to Dask worker and can be thought of as the worker performing a job,
         which is calling the custom module's generate_synthetic_document() function to generate documents.
@@ -80,19 +100,44 @@
         providers = self._instantiate_all_providers(self.custom_providers)
         seeded_providers = self._seed_providers(providers, seed)

-        return [generate_synthetic_document(providers=seeded_providers, **self.custom_lists) for _ in range(docs_per_chunk)]
+        if timeseries_enabled and timeseries_enabled.timeseries_field:
+            synthetic_docs = []
+            datetimestamps: Generator = TimeSeriesPartitioner.generate_datetimestamps_from_window(
+                window=timeseries_window, frequency=timeseries_enabled.timeseries_frequency, format=timeseries_enabled.timeseries_format
+            )
+            for datetimestamp in datetimestamps:
+                document = generate_synthetic_document(providers=seeded_providers, **self.custom_lists)
+                try:
+                    document[timeseries_enabled.timeseries_field] = datetimestamp
+                    synthetic_docs.append(document)
+                except Exception as e:
+                    raise exceptions.DataError(f"Encountered problem when inserting datetimestamps for timeseries data being generated: {e}")
+
+            return synthetic_docs
+        else:
+            return [generate_synthetic_document(providers=seeded_providers, **self.custom_lists) for _ in range(docs_per_chunk)]

-    def generate_test_document(self):
+    def generate_test_document(self, timeseries_enabled: dict = None, timeseries_window: tuple = None):
         providers = self._instantiate_all_providers(self.custom_providers)
         providers = self._seed_providers(providers)
         try:
             document = self.custom_module.generate_synthetic_document(providers=providers, **self.custom_lists)
+            if timeseries_enabled and timeseries_enabled.timeseries_field:
+                datetimestamps: Generator = TimeSeriesPartitioner.generate_datetimestamps_from_window(
+                    window=timeseries_window, frequency=timeseries_enabled.timeseries_frequency, format=timeseries_enabled.timeseries_format
+                )
+                # Only one timestamp is needed for a test document
+                document[timeseries_enabled.timeseries_field] = next(iter(datetimestamps))
         except AttributeError as e:
             msg = "Encountered AttributeError when setting up custom_providers and custom_lists. " + \
                   "It seems that your module might be using custom_lists and custom_providers. " + \
                   f"Please ensure you have provided a custom config with custom_providers and custom_lists: {e}"
             raise exceptions.ConfigError(msg)

         return document

     def _instantiate_all_providers(self, custom_providers):
diff --git a/osbenchmark/synthetic_data_generator/strategies/mapping_strategy.py b/osbenchmark/synthetic_data_generator/strategies/mapping_strategy.py
index 00bfb5711..44f05c9bd 100644
--- a/osbenchmark/synthetic_data_generator/strategies/mapping_strategy.py
+++ b/osbenchmark/synthetic_data_generator/strategies/mapping_strategy.py
@@ -7,7 +7,7 @@
 # GitHub history for details.

 import logging
-from typing import Optional, Callable, Dict, Any
+from typing import Optional, Callable, Dict, Any, Generator
 import random
 import datetime
 import uuid
@@ -17,9 +17,10 @@
 from mimesis.locales import Locale
 from mimesis.random import Random

-from osbenchmark.exceptions import ConfigError, MappingsError
+from osbenchmark import exceptions
 from osbenchmark.synthetic_data_generator.strategies import DataGenerationStrategy
 from osbenchmark.synthetic_data_generator.models import SyntheticDataGeneratorMetadata, SDGConfig, MappingGenerationValuesConfig
+from osbenchmark.synthetic_data_generator.timeseries_partitioner import TimeSeriesPartitioner

 class MappingStrategy(DataGenerationStrategy):
     def __init__(self, sdg_metadata: SyntheticDataGeneratorMetadata, sdg_config: SDGConfig, index_mapping: dict) -> None:
@@ -30,18 +31,32 @@
         self.logger = logging.getLogger(__name__)

-    def generate_data_chunks_across_workers(self, dask_client: Client, docs_per_chunk: int, seeds: list ) -> list:
+    def generate_data_chunks_across_workers(self, dask_client: Client, docs_per_chunk: int, seeds: list, timeseries_enabled: dict = None, timeseries_windows: list = None) -> list:
         """
         Submits workers to generate data chunks and returns Dask futures

         Returns: list of Dask Futures
         """
-        futures = [dask_client.submit(self.generate_data_chunk_from_worker, docs_per_chunk, seed) for seed in seeds]
+        if timeseries_enabled and timeseries_windows:
+            futures = []
+            for seed, window in zip(seeds, timeseries_windows):
+                future = dask_client.submit(
+                    self.generate_data_chunk_from_worker,
+                    docs_per_chunk, seed, timeseries_enabled, window
+                )
+                futures.append(future)
+        else:
+            futures = [dask_client.submit(self.generate_data_chunk_from_worker, docs_per_chunk, seed) for seed in seeds]

         return futures

     # pylint: disable=arguments-differ
-    def generate_data_chunk_from_worker(self, docs_per_chunk: int, seed: Optional[int]) -> list:
+    def generate_data_chunk_from_worker(self, docs_per_chunk: int, seed: Optional[int], timeseries_enabled: dict = None, timeseries_window: tuple = None) -> list:
         """
         This method is submitted to Dask worker and can be thought of as the worker performing a job, which is calling the
         MappingConverter's static method generate_synthetic_document() function to generate documents.
@@ -57,15 +72,41 @@
         mapping_generator_logic = MappingConverter(self.mapping_generation_values, seed)
         mappings_with_generators = mapping_generator_logic.transform_mapping_to_generators(self.index_mapping)

+        if timeseries_enabled and timeseries_enabled.timeseries_field:
+            synthetic_docs = []
+            datetimestamps: Generator = TimeSeriesPartitioner.generate_datetimestamps_from_window(
+                window=timeseries_window, frequency=timeseries_enabled.timeseries_frequency, format=timeseries_enabled.timeseries_format
+            )
+            for datetimestamp in datetimestamps:
+                document = MappingConverter.generate_synthetic_document(mappings_with_generators)
+                try:
+                    document[timeseries_enabled.timeseries_field] = datetimestamp
+                    synthetic_docs.append(document)
+                except Exception as e:
+                    raise exceptions.DataError(f"Encountered problem when inserting datetimestamps for timeseries data being generated: {e}")
+
+            return synthetic_docs
+
         documents = [MappingConverter.generate_synthetic_document(mappings_with_generators) for _ in range(docs_per_chunk)]

         return documents

-    def generate_test_document(self):
+    def generate_test_document(self, timeseries_enabled: dict = None, timeseries_window: tuple = None):
         mapping_converter = MappingConverter(self.mapping_generation_values)
         converted_mappings = mapping_converter.transform_mapping_to_generators(self.index_mapping)

-        return MappingConverter.generate_synthetic_document(transformed_mapping=converted_mappings)
+        document = MappingConverter.generate_synthetic_document(transformed_mapping=converted_mappings)
+        if timeseries_enabled and timeseries_enabled.timeseries_field:
+            datetimestamps: Generator = TimeSeriesPartitioner.generate_datetimestamps_from_window(
+                window=timeseries_window, frequency=timeseries_enabled.timeseries_frequency, format=timeseries_enabled.timeseries_format
+            )
+            # Only one timestamp is needed for a test document
+            document[timeseries_enabled.timeseries_field] = next(iter(datetimestamps))
+
+        return document

 class MappingConverter:
     def __init__(self, mapping_generation_values=None, seed=1):
@@ -140,7 +181,9 @@
         return f"key_{uuid.uuid4().hex[:8]}"

     def generate_long(self, field_def: Dict[str, Any], **params) -> int:
-        return random.randint(-9223372036854775808, 9223372036854775807)
+        min = params.get('min', -(2**63))
+        max = params.get('max', (2**63 - 1))
+        return random.randint(min, max)

     def generate_integer(self, field_def: Dict[str, Any], **params) -> int:
         min = params.get('min', -2147483648)
@@ -149,13 +192,19 @@
         return random.randint(min, max)

     def generate_short(self, field_def: Dict[str, Any], **params) -> int:
-        return random.randint(-32768, 32767)
+        min = params.get('min', -32768)
+        max = params.get('max', 32767)
+        return random.randint(min, max)

     def generate_byte(self, field_def: Dict[str, Any], **params) -> int:
-        return random.randint(-128, 127)
+        min = params.get('min', -128)
+        max = params.get('max', 127)
+        return random.randint(min, max)

     def generate_double(self, field_def: Dict[str, Any], **params) -> float:
-        return random.uniform(-1000000, 1000000)
+        min = params.get('min', -1e9)
+        max = params.get('max', 1e9)
+        return random.uniform(min, max)

     def generate_float(self, field_def: Dict[str, Any], **params) -> float:
         min = params.get('min', 0)
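A short standalone sketch (override values invented) of how the new min/max params thread through: `generator_overrides` from sdg-config.yml arrive as `**params`, so `generate_long` and friends respect user bounds instead of always using the full type range:

```python
import random
from typing import Any, Dict

def generate_long(field_def: Dict[str, Any], **params) -> int:
    # Same pattern as the PR: user override wins, otherwise the full signed 64-bit range
    lower = params.get('min', -(2**63))
    upper = params.get('max', 2**63 - 1)
    return random.randint(lower, upper)

# With 'long': {'min': 0, 'max': 1000} under generator_overrides in sdg-config.yml:
print(generate_long({}, **{'min': 0, 'max': 1000}))  # always within [0, 1000]
print(generate_long({}))                             # falls back to the full long range
```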
@@ -239,7 +288,7 @@
             else:
                 properties = mapping_dict.get("properties", mapping_dict)
         except KeyError:
-            raise MappingsError("OpenSearch mappings provided are invalid. Please ensure it includes 'mappings' or 'properties' fields.")
+            raise exceptions.MappingsError("OpenSearch mappings provided are invalid. Please ensure it includes 'mappings' or 'properties' fields.")

         # Iterate through all the properties in the index mapping
         for field_name, field_def in properties.items():
@@ -272,7 +321,7 @@
                 else:
                     self.logger.info("Issue with sdg-config.yml: override for field [%s] specifies non-existent data generator [%s]", current_field_path, gen_name)
                     msg = f"Issue with sdg-config.yml: override for field [{current_field_path}] specifies non-existent data generator [{gen_name}]"
-                    raise ConfigError(msg)
+                    raise exceptions.ConfigError(msg)
             else:
                 # Check if default_generators has overrides for all instances of a type of generator
                 generator_override_params = generator_overrides.get(field_type, {})
diff --git a/osbenchmark/synthetic_data_generator/strategies/strategy.py b/osbenchmark/synthetic_data_generator/strategies/strategy.py
index 7176c4c6f..b0e39573f 100644
--- a/osbenchmark/synthetic_data_generator/strategies/strategy.py
+++ b/osbenchmark/synthetic_data_generator/strategies/strategy.py
@@ -14,7 +14,7 @@
 class DataGenerationStrategy(ABC):

     @abstractmethod
-    def generate_data_chunks_across_workers(self, dask_client: Client, docs_per_chunk: int, seeds: list ) -> list:
+    def generate_data_chunks_across_workers(self, dask_client: Client, docs_per_chunk: int, seeds: list, timeseries_enabled: dict = None, timeseries_windows: list = None) -> list:
         """
         Submit requests to generate data chunks across Dask workers

@@ -22,7 +22,8 @@
         """

     @abstractmethod
-    def generate_data_chunk_from_worker(self, logic_function: Callable, docs_per_chunk: int, seed: Optional[int]) -> list:
+    def generate_data_chunk_from_worker(self, generate_synthetic_document: Callable, docs_per_chunk: int,
+                                        seed: Optional[int], timeseries_enabled: dict = None, timeseries_window: tuple = None) -> list:
         """
         Generate chunk of docs with data generation logic for Dask worker

@@ -30,5 +31,5 @@
         """

     @abstractmethod
-    def generate_test_document(self) -> dict:
+    def generate_test_document(self, timeseries_enabled: dict = None, timeseries_window: tuple = None) -> dict:
         """Generate test document from data generation logic"""
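For reviewers, a minimal sketch of a strategy satisfying the widened interface. The class name and document shape are invented; it only shows that the timeseries parameters are optional and that a worker stamps one timestamp per document when a window is supplied:

```python
from dask.distributed import Client

from osbenchmark.synthetic_data_generator.strategies import DataGenerationStrategy
from osbenchmark.synthetic_data_generator.timeseries_partitioner import TimeSeriesPartitioner

class ConstantDocStrategy(DataGenerationStrategy):
    """Illustrative only: emits near-identical documents, stamping timestamps when a window is supplied."""

    def generate_data_chunks_across_workers(self, dask_client: Client, docs_per_chunk: int, seeds: list,
                                            timeseries_enabled: dict = None, timeseries_windows: list = None) -> list:
        windows = timeseries_windows if timeseries_windows else [None] * len(seeds)
        return [dask_client.submit(self.generate_data_chunk_from_worker, None, docs_per_chunk,
                                   seed, timeseries_enabled, window)
                for seed, window in zip(seeds, windows)]

    def generate_data_chunk_from_worker(self, generate_synthetic_document, docs_per_chunk: int, seed,
                                        timeseries_enabled: dict = None, timeseries_window: tuple = None) -> list:
        docs = [{"value": seed} for _ in range(docs_per_chunk)]
        if timeseries_enabled and timeseries_window:
            timestamps = TimeSeriesPartitioner.generate_datetimestamps_from_window(
                window=timeseries_window,
                frequency=timeseries_enabled.timeseries_frequency,
                format=timeseries_enabled.timeseries_format)
            # One timestamp per document, in window order
            for doc, ts in zip(docs, timestamps):
                doc[timeseries_enabled.timeseries_field] = ts
        return docs

    def generate_test_document(self, timeseries_enabled: dict = None, timeseries_window: tuple = None) -> dict:
        return {"value": 0}
```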
diff --git a/osbenchmark/synthetic_data_generator/synthetic_data_generator.py b/osbenchmark/synthetic_data_generator/synthetic_data_generator.py
index bc6b211c9..095332a46 100644
--- a/osbenchmark/synthetic_data_generator/synthetic_data_generator.py
+++ b/osbenchmark/synthetic_data_generator/synthetic_data_generator.py
@@ -10,6 +10,7 @@
 import logging
 import time
 import hashlib
+from typing import Generator

 from dask.distributed import Client, get_client, as_completed
 from tqdm import tqdm
@@ -18,6 +19,7 @@
 from osbenchmark.synthetic_data_generator import helpers
 from osbenchmark.synthetic_data_generator.strategies import DataGenerationStrategy
 from osbenchmark.synthetic_data_generator.models import SyntheticDataGeneratorMetadata, SDGConfig, GB_TO_BYTES
+from osbenchmark.synthetic_data_generator.timeseries_partitioner import TimeSeriesPartitioner

 class SyntheticDataGenerator:
     def __init__(self, sdg_metadata: SyntheticDataGeneratorMetadata,
                  sdg_config: SDGConfig, strategy: DataGenerationStrategy) -> None:
@@ -29,6 +31,7 @@

     def generate_seeds_for_workers(self, regenerate=False):
         # This adds latency so might consider deprecating this
+        seed_generation_start_time = time.time()
         client = get_client()
         workers = client.scheduler_info()['workers']

@@ -46,10 +49,41 @@
             seed = int(hash_hex[:8], 16)
             seeds.append(seed)

+        seed_generation_end_time = time.time()
+        self.logger.info("Seed generation took %s seconds", seed_generation_end_time - seed_generation_start_time)
         return seeds

+    def setup_timeseries_window(self, timeseries_enabled_settings: dict, workers: int, docs_per_chunk: int, avg_document_size: int, total_size_bytes: int):
+        self.logger.info("User is using timeseries enabled settings: %s", timeseries_enabled_settings)
+        # Generate timeseries windows
+        timeseries_partitioner = TimeSeriesPartitioner(
+            timeseries_enabled=timeseries_enabled_settings,
+            workers=workers,
+            docs_per_chunk=docs_per_chunk,
+            avg_document_size=avg_document_size,
+            total_size_bytes=total_size_bytes
+        )
+        timeseries_window = timeseries_partitioner.create_window_generator()
+        if timeseries_enabled_settings.timeseries_frequency != timeseries_partitioner.frequency:
+            timeseries_enabled_settings = timeseries_partitioner.get_updated_settings(timeseries_settings=timeseries_enabled_settings)
+        self.logger.info("TimeSeries Windows Generator: %s", timeseries_window)
+
+        return timeseries_enabled_settings, timeseries_window
+
     def generate_test_document(self):
-        return self.strategy.generate_test_document()
+        total_size_bytes: int = self.sdg_metadata.total_size_gb * GB_TO_BYTES
+        timeseries_enabled_settings: dict = self.sdg_config.settings.timeseries_enabled
+        if timeseries_enabled_settings:
+            # Use dummy values for workers, docs_per_chunk, and avg_document_size
+            timeseries_enabled_settings, timeseries_window = self.setup_timeseries_window(
+                timeseries_enabled_settings=timeseries_enabled_settings, workers=1, docs_per_chunk=1,
+                avg_document_size=123, total_size_bytes=total_size_bytes
+            )
+            window_for_test_document = next(timeseries_window)  # Just need one window for the test document
+
+            return self.strategy.generate_test_document(timeseries_enabled_settings, window_for_test_document)
+        else:
+            return self.strategy.generate_test_document()

     def generate_dataset(self):
         """
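One design note worth calling out, with a toy sketch (invented numeric windows): `setup_timeseries_window` returns a stateful generator, so each batch's `[next(...) for _ in range(workers)]` call in `generate_dataset` picks up where the previous batch stopped, keeping windows contiguous across batches:

```python
def window_generator():
    # Toy stand-in for TimeSeriesPartitioner.generate_datetimestamp_window()
    for start in range(0, 60, 10):
        yield (start, start + 10)

gen = window_generator()
workers = 3
batch_1 = [next(gen) for _ in range(workers)]  # [(0, 10), (10, 20), (20, 30)]
batch_2 = [next(gen) for _ in range(workers)]  # [(30, 40), (40, 50), (50, 60)], continues where batch_1 stopped
```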
@@ -58,18 +92,26 @@
         max_file_size_bytes: int = self.sdg_config.settings.max_file_size_gb * GB_TO_BYTES
         total_size_bytes: int = self.sdg_metadata.total_size_gb * GB_TO_BYTES
         docs_per_chunk: int = self.sdg_config.settings.docs_per_chunk
+        timeseries_enabled_settings: dict = self.sdg_config.settings.timeseries_enabled
         avg_document_size = helpers.calculate_avg_doc_size(strategy=self.strategy)

         current_size = 0
         docs_written = 0
-        file_counter = 0
+        file_counter = self.sdg_config.settings.filename_suffix_begins_at if self.sdg_config.settings.filename_suffix_begins_at else 0
         generated_dataset_details = []

         helpers.check_for_existing_files(self.sdg_metadata.output_path, self.sdg_metadata.index_name)

+        timeseries_window: Generator = None
         workers: int = self.sdg_config.settings.workers
+        if timeseries_enabled_settings:
+            timeseries_enabled_settings, timeseries_window = self.setup_timeseries_window(
+                timeseries_enabled_settings=timeseries_enabled_settings, workers=workers, docs_per_chunk=docs_per_chunk,
+                avg_document_size=avg_document_size, total_size_bytes=total_size_bytes
+            )
+
         dask_client = Client(n_workers=workers, threads_per_worker=1) # We keep it to 1 thread because generating random data is CPU intensive
         self.logger.info("Number of workers to use: [%s]", workers)
@@ -106,16 +148,41 @@
             seeds = self.generate_seeds_for_workers(regenerate=True)
             self.logger.info("Using seeds: %s", seeds)

-            futures = self.strategy.generate_data_chunks_across_workers(dask_client, docs_per_chunk, seeds)
-
-            writing_start_time = time.time()
-            for _, data in as_completed(futures, with_results=True):
-                self.logger.info("Future [%s] completed.", _)
-                docs_written_from_chunk, written_bytes = helpers.write_chunk(data, file_path)
-                docs_written += docs_written_from_chunk
-                current_size += written_bytes
-                progress_bar.update(written_bytes)
-            writing_end_time = time.time()
+            if timeseries_window and timeseries_enabled_settings:
+                windows_for_workers = [next(timeseries_window) for _ in range(workers)]
+                self.logger.info("Windows for workers: %s", windows_for_workers)
+                mp_generation_start_time = time.time()
+                futures = self.strategy.generate_data_chunks_across_workers(
+                    dask_client, docs_per_chunk, seeds,
+                    timeseries_enabled_settings, windows_for_workers
+                )
+                results = dask_client.gather(futures)
+                mp_generation_end_time = time.time()
+                mp_generation_took_time = mp_generation_end_time - mp_generation_start_time
+                self.logger.info("Futures for generated docs with timestamp took [%s] seconds", mp_generation_took_time)
+
+                ordered_results = TimeSeriesPartitioner.sort_results_by_datetimestamps(results, timeseries_enabled_settings.timeseries_field)
+
+                writing_start_time = time.time()
+                for i, res in enumerate(ordered_results):
+                    self.logger.info("Writing results [%s/%s]", i+1, len(ordered_results))
+                    docs_written_from_chunk, written_bytes = helpers.write_chunk(res, file_path)
+                    docs_written += docs_written_from_chunk
+                    current_size += written_bytes
+                    progress_bar.update(written_bytes)
+                writing_end_time = time.time()
+
+            else:
+                futures = self.strategy.generate_data_chunks_across_workers(dask_client, docs_per_chunk, seeds)
+
+                writing_start_time = time.time()
+                for _, data in as_completed(futures, with_results=True):
+                    self.logger.info("Future [%s] completed.", _)
+                    docs_written_from_chunk, written_bytes = helpers.write_chunk(data, file_path)
+                    docs_written += docs_written_from_chunk
+                    current_size += written_bytes
+                    progress_bar.update(written_bytes)
+                writing_end_time = time.time()

             generating_took_time = writing_start_time - generation_start_time
             writing_took_time = writing_end_time - writing_start_time
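The timeseries path deliberately trades streaming for ordering: `dask_client.gather` blocks until every chunk in the batch is back so chunks can be re-sorted chronologically before writing, whereas the original `as_completed` loop writes in completion order. A toy sketch of the re-ordering step (chunk contents invented):

```python
chunks_in_completion_order = [
    [{"@timestamp": 1706000000}, {"@timestamp": 1706000060}],  # later window, finished first
    [{"@timestamp": 1705000000}, {"@timestamp": 1705000060}],  # earlier window, finished second
]

# Same idea as TimeSeriesPartitioner.sort_results_by_datetimestamps:
# order whole chunks by the first timestamp each one contains.
ordered = sorted(chunks_in_completion_order, key=lambda chunk: chunk[0]["@timestamp"])
assert ordered[0][0]["@timestamp"] == 1705000000
```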
diff --git a/osbenchmark/synthetic_data_generator/timeseries_partitioner.py b/osbenchmark/synthetic_data_generator/timeseries_partitioner.py
new file mode 100644
index 000000000..ff279a8f8
--- /dev/null
+++ b/osbenchmark/synthetic_data_generator/timeseries_partitioner.py
@@ -0,0 +1,225 @@
+# SPDX-License-Identifier: Apache-2.0
+#
+# The OpenSearch Contributors require contributions made to
+# this file be licensed under the Apache-2.0 license or a
+# compatible open source license.
+# Modifications Copyright OpenSearch Contributors. See
+# GitHub history for details.
+
+import logging
+import math
+from collections import deque
+import sys
+from typing import Generator
+import time
+
+import pandas as pd
+
+import osbenchmark.exceptions as exceptions
+
+class TimeSeriesPartitioner:
+
+    # TODO: Change this into a dictionary that points to which frequencies can have which formats
+    VALID_DATETIMESTAMPS_FORMATS = [
+        "%Y-%m-%d",              # 2023-05-20
+        "%Y-%m-%dT%H:%M:%S",     # 2023-05-20T15:30:45
+        "%Y-%m-%dT%H:%M:%S.%f",  # 2023-05-20T15:30:45.123456
+        "%Y-%m-%d %H:%M:%S",     # 2023-05-20 15:30:45
+        "%Y-%m-%d %H:%M:%S.%f",  # 2023-05-20 15:30:45.123456
+        "%d/%m/%Y",              # 20/05/2023
+        "%m/%d/%Y",              # 05/20/2023
+        "%d-%m-%Y",              # 20-05-2023
+        "%m-%d-%Y",              # 05-20-2023
+        "%d.%m.%Y",              # 20.05.2023
+        "%Y%m%d",                # 20230520
+        "%B %d, %Y",             # May 20, 2023
+        "%b %d, %Y",             # May 20, 2023
+        "%d %B %Y",              # 20 May 2023
+        "%d %b %Y",              # 20 May 2023
+        "%Y %B %d",              # 2023 May 20
+        "%d/%m/%Y %H:%M",        # 20/05/2023 15:30
+        "%d/%m/%Y %H:%M:%S",     # 20/05/2023 15:30:45
+        "%Y-%m-%d %I:%M %p",     # 2023-05-20 03:30 PM
+        "%d.%m.%Y %H:%M",        # 20.05.2023 15:30
+        "%H:%M",                 # 15:30
+        "%H:%M:%S",              # 15:30:45
+        "%I:%M %p",              # 03:30 PM
+        "%I:%M:%S %p",           # 03:30:45 PM
+        "%a, %d %b %Y %H:%M:%S", # Sat, 20 May 2023 15:30:45
+        "%Y/%m/%d",              # 2023/05/20
+        "%Y/%m/%d %H:%M:%S",     # 2023/05/20 15:30:45
+        "%Y%m%d%H%M%S",          # 20230520153045
+        "epoch_s",               # Epoch time in seconds format
+        "epoch_ms"               # Epoch time in ms format
+    ]
+
+    # TODO: Let's make this a hashmap so that we can ensure the invalid formats are not used
+    # (e.g. frequency is updated to ms and format is still seconds)
+    # These frequencies are based on what is supported in the Pandas library
+    AVAILABLE_FREQUENCIES = ['B', 'C', 'D', 'h', 'bh', 'cbh', 'min', 's', 'ms']
+
+    def __init__(self, timeseries_enabled: dict, workers: int, docs_per_chunk: int, avg_document_size: int, total_size_bytes: int):
+        self.timeseries_enabled = timeseries_enabled
+        self.workers = workers
+        self.docs_per_chunk = docs_per_chunk
+        self.avg_document_size = avg_document_size
+        self.total_size_bytes = total_size_bytes
+
+        self.timeseries_field = self.timeseries_enabled.timeseries_field
+        self.start_date = self.timeseries_enabled.timeseries_start_date if self.timeseries_enabled.timeseries_start_date else "1/01/2019"
+        self.end_date = self.timeseries_enabled.timeseries_end_date if self.timeseries_enabled.timeseries_end_date else "12/31/2019"
+        self.frequency = self.timeseries_enabled.timeseries_frequency if self.timeseries_enabled.timeseries_frequency else "min"
+        self.format = self.timeseries_enabled.timeseries_format if self.timeseries_enabled.timeseries_format else "%Y-%m-%dT%H:%M:%S"
+        self.logger = logging.getLogger(__name__)
+
+        if self.frequency not in TimeSeriesPartitioner.AVAILABLE_FREQUENCIES:
+            msg = f"Frequency {self.frequency} not found in available frequencies {TimeSeriesPartitioner.AVAILABLE_FREQUENCIES}"
+            raise exceptions.ConfigError(msg)
+
+        if self.format not in TimeSeriesPartitioner.VALID_DATETIMESTAMPS_FORMATS:
+            msg = f"Format {self.format} not found in available formats {TimeSeriesPartitioner.VALID_DATETIMESTAMPS_FORMATS}"
+            raise exceptions.ConfigError(msg)
+
+    def get_updated_settings(self, timeseries_settings):
+        # Pydantic models have no model_update(); model_copy(update=...) returns an updated copy
+        return timeseries_settings.model_copy(update={
+            'timeseries_field': self.timeseries_field,
+            'timeseries_start_date': self.start_date,
+            'timeseries_end_date': self.end_date,
+            'timeseries_frequency': self.frequency,
+            'timeseries_format': self.format
+        })
+
+    def create_window_generator(self) -> Generator:
+        '''
+        returns: a generator of timestamp pairs where each pair is a tuple containing a start datetime and an end datetime
+        '''
+        # Determine optimal time settings
+        # Check if the number of docs generated will fit in the timestamps. Adjust frequency as needed
+        expected_number_of_docs = self.total_size_bytes // self.avg_document_size
+        expected_number_of_docs_with_buffer = math.ceil((expected_number_of_docs * 0.1) + expected_number_of_docs)
+
+        # Get number of timestamps with dates and frequencies
+        number_of_timestamps = self._count_timestamps(frequency=self.frequency)
+
+        if number_of_timestamps < expected_number_of_docs_with_buffer:
+            self.logger.info("Number of timestamps generated is less than expected docs generated. Trying to find the optimal frequency")
+            # ms is the smallest unit of time SDG can generate
+            if self.frequency == 'ms':
+                msg = "No finer time frequencies available to try than \"ms\". Please expand dates and frequency accordingly."
+                self.logger.error(msg)
+                raise exceptions.ConfigError(msg)
+
+            # TODO: Update the timeseries enabled settings too so downstream isn't confused
+            optimal_frequency = self._try_other_frequencies(expected_number_of_docs_with_buffer)
+            if not self._does_user_want_optimal_frequency(user_frequency=self.frequency, optimal_frequency=optimal_frequency):
+                self.logger.info("User does not want to use optimal frequency and will cancel generation.")
+                sys.exit(1)
+
+            self.frequency = optimal_frequency
+            self.logger.info("Updated frequency to use [%s]", self.frequency)
+
+        # After validating everything, let's return the window generator
+        return self.generate_datetimestamp_window()
+
+    def generate_datetimestamp_window(self):
+        current = pd.Timestamp(self.start_date)
+        end = pd.Timestamp(self.end_date)
+        freq = pd.Timedelta(f"{self.docs_per_chunk-1}{self.frequency}") # Subtract one so the window's start timestamp is included in the count
+
+        while current < end:
+            window_end = min(current + freq, end)
+            yield (current, window_end)
+            current += freq
+
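A runnable standalone sketch (small invented numbers) of what `generate_datetimestamp_window` yields: windows sized so that `docs_per_chunk` timestamps at the given frequency fit inside each one, inclusive of both endpoints:

```python
import pandas as pd

docs_per_chunk = 5
frequency = "min"
start, end = pd.Timestamp("2024-01-01 00:00"), pd.Timestamp("2024-01-01 00:12")

window_size = pd.Timedelta(f"{docs_per_chunk - 1}{frequency}")  # 4 minutes -> 5 inclusive timestamps
current = start
while current < end:
    window_end = min(current + window_size, end)
    # Expanding one window back into timestamps, as the workers later do
    print((current, window_end), list(pd.date_range(current, window_end, freq=frequency)))
    current += window_size
```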
+    @staticmethod
+    def generate_datetimestamps_from_window(window: tuple, frequency: str = "min", format: str = "%Y-%m-%dT%H:%M:%S") -> Generator:
+        if frequency not in TimeSeriesPartitioner.AVAILABLE_FREQUENCIES:
+            msg = f"Frequency {frequency} not found in available frequencies {TimeSeriesPartitioner.AVAILABLE_FREQUENCIES}"
+            raise exceptions.ConfigError(msg)
+
+        if format not in TimeSeriesPartitioner.VALID_DATETIMESTAMPS_FORMATS:
+            msg = f"Format {format} not found in available formats {TimeSeriesPartitioner.VALID_DATETIMESTAMPS_FORMATS}"
+            raise exceptions.ConfigError(msg)
+
+        try:
+            start_datetimestamp = window[0]
+            end_datetimestamp = window[1]
+            generated_datetimestamps: pd.DatetimeIndex = pd.date_range(start_datetimestamp, end_datetimestamp, freq=frequency)
+            # TODO: Handle formatting after generating iterator?
+            # format was already validated above, so apply it directly
+            if format == "epoch_s":
+                generated_datetimestamps = generated_datetimestamps.map(lambda x: int(x.timestamp()))
+            elif format == "epoch_ms":
+                generated_datetimestamps = generated_datetimestamps.map(lambda x: int(x.timestamp() * 1000))
+            else:
+                generated_datetimestamps = generated_datetimestamps.strftime(date_format=format)
+
+            return generated_datetimestamps
+
+        except IndexError:
+            raise exceptions.SystemSetupError("IndexError encountered with accessing datetimestamp from window.")
+        except Exception as e:
+            raise exceptions.SystemSetupError(f"Unknown error encountered when generating datetimestamps from window: {e}")
+
+    @staticmethod
+    def sort_results_by_datetimestamps(results: list, timeseries_field: str) -> list:
+        logger = logging.getLogger(__name__)
+        logger.info("Length of results: %s", len(results))
+        logger.info("Docs in each result: %s", [len(result) for result in results])
+
+        start_time = time.time()
+        sorted_results = sorted(results, key=lambda chunk: chunk[0][timeseries_field])
+        end_time = time.time()
+        logger.info("Time it took to sort: %s secs", end_time - start_time)
+        logger.info("First timestamp from all chunks: %s", [result[0][timeseries_field] for result in sorted_results])
+
+        return sorted_results
+
+    def _count_timestamps(self, frequency: str) -> int:
+        if frequency in ["B", "C", "bh", "cbh"]:
+            try:
+                return len(pd.date_range(start=self.start_date, end=self.end_date, freq=frequency))
+            except Exception as e:
+                msg = f"Had issues when generating and counting datetimestamps: {e}"
+                raise exceptions.SystemSetupError(msg)
+        else:
+            # Arithmetically calculate rather than load into memory
+            start_datetimestamp = pd.Timestamp(self.start_date)
+            end_datetimestamp = pd.Timestamp(self.end_date)
+
+            offset = pd.tseries.frequencies.to_offset(freq=frequency)
+            delta = end_datetimestamp - start_datetimestamp
+            count = int(delta / offset) + 1
+            return count
+
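The arithmetic branch avoids materializing a DatetimeIndex for large ranges. A quick standalone check (dates borrowed from the sample config) that it matches pandas for a fixed frequency, assuming pandas supports dividing a Timedelta by a fixed (Tick) offset, which holds for the fixed frequencies in AVAILABLE_FREQUENCIES:

```python
import pandas as pd

start, end = pd.Timestamp("1/1/2024"), pd.Timestamp("1/31/2024")

materialized = len(pd.date_range(start=start, end=end, freq="min"))  # what the B/C/bh/cbh branch does
offset = pd.tseries.frequencies.to_offset("min")
arithmetic = int((end - start) / offset) + 1                         # what the fixed-frequency branch does

assert materialized == arithmetic == 43201  # 30 days * 1440 minutes + 1 inclusive endpoint
```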
+    def _try_other_frequencies(self, expected_number_of_docs_with_buffer: int) -> str:
+        frequencies_to_try = deque(TimeSeriesPartitioner.AVAILABLE_FREQUENCIES[TimeSeriesPartitioner.AVAILABLE_FREQUENCIES.index(self.frequency)+1:])
+
+        frequency = ""
+        while frequencies_to_try:
+            frequency = frequencies_to_try.popleft()
+            number_of_timestamps = self._count_timestamps(frequency=frequency)
+            self.logger.info("Expected docs (with buffer): %s, frequency: %s, timestamps available: %s",
+                             expected_number_of_docs_with_buffer, frequency, number_of_timestamps)
+            if number_of_timestamps > expected_number_of_docs_with_buffer:
+                self.logger.info("Using [%s] frequency as this resulted in more timestamps", frequency)
+                return frequency
+            self.logger.info("Using [%s] frequency did not result in more timestamps", frequency)
+
+        # Fall back to the finest frequency tried if none produced enough timestamps
+        return frequency
+
+    def _does_user_want_optimal_frequency(self, user_frequency: str, optimal_frequency: str) -> bool:
+        valid_responses = ['y', 'yes', 'n', 'no']
+        msg = f"The frequency [{optimal_frequency}] is a better option for the number of docs you are trying to generate " + \
+              "because the current frequency you've selected does not have enough timestamps to allocate to docs generated. " + \
+              f"If you prefer your current frequency [{user_frequency}], please extend the time frame. " + \
+              f"Would you like to use [{optimal_frequency}] as the frequency? (y/n): "
+        requested_input = input(msg)
+        while requested_input.lower() not in valid_responses:
+            msg = f"Please enter y or n. The frequency [{optimal_frequency}] is a better option for the number of docs you are trying to generate. " + \
+                  f"If you prefer your current frequency [{user_frequency}], please extend the time frame. " + \
+                  f"Would you like to use [{optimal_frequency}] as the frequency? (y/n): "
+            requested_input = input(msg)
+
+        return requested_input.lower() in ['y', 'yes']
diff --git a/osbenchmark/synthetic_data_generator/types.py b/osbenchmark/synthetic_data_generator/types.py
new file mode 100644
index 000000000..99ebbbea1
--- /dev/null
+++ b/osbenchmark/synthetic_data_generator/types.py
@@ -0,0 +1,34 @@
+# SPDX-License-Identifier: Apache-2.0
+#
+# The OpenSearch Contributors require contributions made to
+# this file be licensed under the Apache-2.0 license or a
+# compatible open source license.
+# Modifications Copyright OpenSearch Contributors. See
+# GitHub history for details.
+
+import os
+from dataclasses import dataclass, field
+from typing import Optional
+
+GB_TO_BYTES = 1024 ** 3
+
+DEFAULT_GENERATION_SETTINGS = {
+    "workers": os.cpu_count(),
+    "max_file_size_gb": 40,
+    "docs_per_chunk": 10000,
+    "filename_suffix_begins_at": 0,
+    "timeseries_enabled": {}
+}
+
+@dataclass
+class SyntheticDataGeneratorMetadata:
+    index_name: Optional[str] = None
+    index_mappings_path: Optional[str] = None
+    custom_module_path: Optional[str] = None
+    custom_config_path: Optional[str] = None
+    output_path: Optional[str] = None
+    total_size_gb: Optional[int] = None
+    mode: Optional[str] = None
+    checkpoint: Optional[str] = None
+    blueprint: Optional[dict] = None
+    generators: dict = field(default_factory=dict)
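A small sketch of the intended use of `DEFAULT_GENERATION_SETTINGS`; the PR does not show a consumer, so the overlay pattern and `user_settings` values here are assumptions for illustration:

```python
from osbenchmark.synthetic_data_generator.types import DEFAULT_GENERATION_SETTINGS

user_settings = {"workers": 4, "filename_suffix_begins_at": 100}  # hypothetical user input

# Defaults first, user-provided settings overlay them
effective_settings = {**DEFAULT_GENERATION_SETTINGS, **user_settings}
assert effective_settings["workers"] == 4
assert effective_settings["max_file_size_gb"] == 40  # untouched default
```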
diff --git a/tests/synthetic_data_generator/strategies_test.py b/tests/synthetic_data_generator/strategies_test.py
index ee65822de..5401820f9 100644
--- a/tests/synthetic_data_generator/strategies_test.py
+++ b/tests/synthetic_data_generator/strategies_test.py
@@ -145,7 +145,7 @@
         assert isinstance(avg_doc_size, int)

     def test_generate_data_chunks_across_workers(self, dask_client, custom_module_strategy):
-        futures_across_workers = custom_module_strategy.generate_data_chunks_across_workers(dask_client, 3, [1,2,3])
+        futures_across_workers = custom_module_strategy.generate_data_chunks_across_workers(dask_client, 3, [1,2,3], None, None)
         docs = [future.result() for future in futures_across_workers]

         assert len(docs) == 3
@@ -919,7 +919,7 @@

     def test_generate_data_chunks_across_workers(self, dask_client, mapping_strategy_with_basic_mappings):
         fields = ["title", "description", "price", "created_at", "is_available", "category_id", "tags"]
-        futures_across_workers = mapping_strategy_with_basic_mappings.generate_data_chunks_across_workers(dask_client, 3, [1,2,3])
+        futures_across_workers = mapping_strategy_with_basic_mappings.generate_data_chunks_across_workers(dask_client, 3, [1,2,3], None, None)
         docs = [future.result() for future in futures_across_workers]

         assert len(docs) == 3
diff --git a/tests/synthetic_data_generator/synthetic_data_generator_test.py b/tests/synthetic_data_generator/synthetic_data_generator_test.py
index 9fc3e8cd9..3ab97af1d 100644
--- a/tests/synthetic_data_generator/synthetic_data_generator_test.py
+++ b/tests/synthetic_data_generator/synthetic_data_generator_test.py
@@ -10,7 +10,7 @@
 import pytest

 from osbenchmark.synthetic_data_generator.synthetic_data_generator import SyntheticDataGenerator
-from osbenchmark.synthetic_data_generator.models import SyntheticDataGeneratorMetadata
+from osbenchmark.synthetic_data_generator.models import SyntheticDataGeneratorMetadata, SDGConfig

 class TestSyntheticDataGeneratorWithCustomStrategy:

@@ -27,10 +27,25 @@

     @pytest.fixture
     def mock_sdg_config(self):
-        return {
-            'providers': {},
-            'lists': {}
-        }
+        loaded_sdg_config = {
+            'settings': {'workers': 8, 'max_file_size_gb': 1, 'docs_per_chunk': 10000},
+            'CustomGenerationValues': {
+                'custom_lists': {'dog_names': ['Hana', 'Youpie', 'Charlie', 'Lucy', 'Cooper', 'Luna', 'Rocky', 'Daisy', 'Buddy', 'Molly'],
+                                 'dog_breeds': ['Jindo', 'Labrador', 'German Shepherd', 'Golden Retriever', 'Bulldog',
+                                                'Poodle', 'Beagle', 'Rottweiler', 'Boxer', 'Dachshund', 'Chihuahua'],
+                                 'treats': ['cookies', 'pup_cup', 'jerky'], 'license_plates': ['WOOF101', 'BARKATAMZN'],
+                                 'tips': ['biscuits', 'cash'],
+                                 'skills': ['sniffing', 'squirrel_chasing', 'bite_tail', 'smile'],
+                                 'vehicle_types': ['sedan', 'suv', 'truck'], 'vehicle_makes': ['toyota', 'honda', 'nissan'],
+                                 'vehicle_models': ['rav4', 'accord', 'murano'], 'vehicle_years': [2012, 2015, 2019],
+                                 'vehicle_colors': ['white', 'red', 'blue', 'black', 'silver'], 'account_status': ['active', 'inactive']},
+                'custom_providers': ['NumericString', 'MultipleChoices']
+            }
+        }
+
+        sdg_config = SDGConfig(**loaded_sdg_config)
+        return sdg_config

     @pytest.fixture
     def mock_custom_module(self):
@@ -104,10 +119,32 @@

     @pytest.fixture
     def mock_sdg_config(self):
-        return {
-            'providers': {},
-            'lists': {}
-        }
+        loaded_sdg_config = {
+            'settings': {'workers': 8, 'max_file_size_gb': 1, 'docs_per_chunk': 10000},
+            'MappingGenerationValues': {
+                'generator_overrides': {
+                    'integer': {'min': 0, 'max': 20},
+                    'long': {'min': 0, 'max': 1000},
+                    'float': {'min': 0.0, 'max': 1.0},
+                    'double': {'min': 0.0, 'max': 2000.0},
+                    'date': {'start_date': '2020-01-01', 'end_date': '2023-01-01', 'format': 'yyyy-mm-dd'},
+                    'text': {'must_include': ['lorem', 'ipsum']},
+                    'keyword': {'choices': ['naruto', 'sakura', 'sasuke']}
+                },
+                'field_overrides': {
+                    'id': {'generator': 'generate_keyword',
+                           'params': {'choices': ['Helly R', 'Mark S', 'Irving B']}},
+                    'promo_codes': {'generator': 'generate_keyword', 'params': {'choices': ['HOT_SUMMER', 'TREATSYUM!']}},
+                    'preferences.language': {'generator': 'generate_keyword', 'params': {'choices': ['Python', 'English']}},
+                    'payment_methods.type': {'generator': 'generate_keyword', 'params': {'choices': ['Visa', 'Mastercard', 'Cash', 'Venmo']}},
+                    'preferences.allergies': {'generator': 'generate_keyword', 'params': {'choices': ['Squirrels', 'Cats']}},
+                    'favorite_locations.name': {'generator': 'generate_keyword', 'params': {'choices': ['Austin', 'NYC', 'Miami']}}
+                }
+            }
+        }
+
+        sdg_config = SDGConfig(**loaded_sdg_config)
+        return sdg_config

     @pytest.fixture
     def mock_custom_module(self):