7 changes: 7 additions & 0 deletions osbenchmark/resources/sdg-config.yml
@@ -2,6 +2,13 @@ settings:
workers: 8
max_file_size_gb: 40
docs_per_chunk: 10000
filename_suffix_begins_at: 0
timeseries_enabled:
timeseries_field: "@timestamp"
timeseries_start_date: "1/1/2024"
timeseries_end_date: "1/31/2024"
timeseries_frequency: "ms"
timeseries_format: "epoch_ms"

CustomGenerationValues:
# For users who want to generate data via a custom Python module
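The new settings above describe one timestamp per generated document, walked from timeseries_start_date to timeseries_end_date at timeseries_frequency and rendered in timeseries_format. A rough sketch of what "epoch_ms" output looks like (illustrative only; the TimeSeriesPartitioner internals are not shown in this diff, and a daily frequency is used here to keep the output small):

import pandas as pd

# Illustration, not code from this PR: walk the configured date range and
# render each timestamp as milliseconds since the Unix epoch ("epoch_ms").
stamps = pd.date_range(start="1/1/2024", end="1/31/2024", freq="D")
epoch_ms = [int(ts.timestamp() * 1000) for ts in stamps]
print(epoch_ms[0], epoch_ms[-1])  # 1704067200000 1706659200000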
50 changes: 48 additions & 2 deletions osbenchmark/synthetic_data_generator/models.py
@@ -12,12 +12,58 @@

import re  # required by validate_timeseries_field below; assumed not already imported in the elided header

from pydantic import BaseModel, Field, field_validator

from osbenchmark.synthetic_data_generator.timeseries_partitioner import TimeSeriesPartitioner

GB_TO_BYTES = 1024 ** 3

class TimeSeriesConfig(BaseModel):
timeseries_field: str
timeseries_start_date: str
timeseries_end_date: str
timeseries_frequency: str
timeseries_format: str

# pylint: disable = no-self-argument
@field_validator('timeseries_start_date', 'timeseries_end_date', 'timeseries_frequency', 'timeseries_format')
def validate_string_fields(cls, v, info):
"""Validate that timeseries configuration fields are strings"""
if not isinstance(v, str):
field_name = info.field_name.replace('_', ' ').title()
raise ValueError(f"{field_name} requires a string value. Value {v} is not valid.")

# Additional validation for frequency and format fields
if info.field_name == 'timeseries_frequency':
if v not in TimeSeriesPartitioner.AVAILABLE_FREQUENCIES:
raise ValueError(f"Timeseries frequency {v} is not a valid value. Valid values are {TimeSeriesPartitioner.AVAILABLE_FREQUENCIES}")

if info.field_name == 'timeseries_format':
if v not in TimeSeriesPartitioner.VALID_DATETIMESTAMPS_FORMATS:
raise ValueError(f"Timeseries format {v} is not a valid value. Valid values are {TimeSeriesPartitioner.VALID_DATETIMESTAMPS_FORMATS}")

return v

# pylint: disable = no-self-argument
@field_validator('timeseries_field')
def validate_timeseries_field(cls, v):
if not v or not v.strip():
raise ValueError("timeseries_field cannot be empty")

# Validate field name format
# OpenSearch field names must start with a letter and contain only alphanumeric, underscores, and periods
if not re.match(r'^[a-zA-Z][a-zA-Z0-9_.]*$', v):
raise ValueError(
f"Invalid timeseries_field '{v}'. Field names must start with a letter "
"and contain only alphanumeric characters, underscores, and periods."
)

return v
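As a quick illustration of the validators above (an editorial sketch, not part of this change; it assumes "ms" and "epoch_ms" appear in TimeSeriesPartitioner's valid frequency and format sets, as the sample sdg-config.yml suggests):

from pydantic import ValidationError

try:
    # "9bad" violates the start-with-a-letter rule in validate_timeseries_field,
    # so pydantic rejects the config before any data generation starts.
    TimeSeriesConfig(
        timeseries_field="9bad",
        timeseries_start_date="1/1/2024",
        timeseries_end_date="1/31/2024",
        timeseries_frequency="ms",
        timeseries_format="epoch_ms",
    )
except ValidationError as err:
    print(err)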

class SettingsConfig(BaseModel):
workers: Optional[int] = Field(default_factory=os.cpu_count) # Recommended not to exceed the CPU count
max_file_size_gb: Optional[int] = 40 # Default because some CloudProviders limit the size of files stored
docs_per_chunk: Optional[int] = 10000 # Default based on testing
filename_suffix_begins_at: Optional[int] = 0 # Start at suffix 0
timeseries_enabled: Optional[TimeSeriesConfig] = None

# pylint: disable = no-self-argument
@field_validator('workers', 'max_file_size_gb', 'docs_per_chunk')
@@ -9,6 +9,7 @@
import logging
from types import ModuleType
from typing import Optional, Callable, Generator

from dask.distributed import Client
from mimesis import Generic
@@ -19,6 +20,7 @@
from osbenchmark import exceptions
from osbenchmark.synthetic_data_generator.strategies import DataGenerationStrategy
from osbenchmark.synthetic_data_generator.models import SyntheticDataGeneratorMetadata, SDGConfig
from osbenchmark.synthetic_data_generator.timeseries_partitioner import TimeSeriesPartitioner

class CustomModuleStrategy(DataGenerationStrategy):
def __init__(self, sdg_metadata: SyntheticDataGeneratorMetadata, sdg_config: SDGConfig, custom_module: ModuleType) -> None:
@@ -50,18 +52,36 @@ def __init__(self, sdg_metadata: SyntheticDataGeneratorMetadata, sdg_config: SD


# pylint: disable=arguments-differ
def generate_data_chunks_across_workers(self, dask_client: Client, docs_per_chunk: int, seeds: list, timeseries_enabled: dict = None, timeseries_windows: list = None) -> list:
"""
Submits workers to generate data chunks and returns Dask futures

Returns: list of Dask Futures
"""
self.logger.info("Generating data across workers")
if timeseries_enabled and timeseries_windows:
futures = []
for seed, window in zip(seeds, timeseries_windows):
future = dask_client.submit(
self.generate_data_chunk_from_worker, self.custom_module.generate_synthetic_document,
docs_per_chunk, seed, timeseries_enabled, window
)
futures.append(future)

return futures
else:
# If not using timeseries approach
return [dask_client.submit(
self.generate_data_chunk_from_worker, self.custom_module.generate_synthetic_document,
docs_per_chunk, seed) for seed in seeds]
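To make the seed/window pairing concrete: each Dask worker receives one seed and one time window, so workers generate disjoint slices of the overall time range in parallel. A toy sketch (the (start, end) window shape is an assumption; the diff does not pin it down):

# Hypothetical pairing of worker seeds with time windows
seeds = [101, 102, 103]
windows = [("2024-01-01", "2024-01-11"),
           ("2024-01-11", "2024-01-21"),
           ("2024-01-21", "2024-01-31")]
for seed, (start, end) in zip(seeds, windows):
    print(f"worker seed={seed} covers {start}..{end}")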


# pylint: disable=arguments-renamed
def generate_data_chunk_from_worker(self, generate_synthetic_document: Callable, docs_per_chunk: int, seed: Optional[int],
timeseries_enabled: dict = None, timeseries_window: set = None) -> list:
"""
This method is submitted to a Dask worker and can be thought of as the worker performing a job: calling the
custom module's generate_synthetic_document() function to generate documents.
@@ -80,19 +100,44 @@ def generate_data_chunk_from_worker(self, generate_synthetic_document: Callable,
providers = self._instantiate_all_providers(self.custom_providers)
seeded_providers = self._seed_providers(providers, seed)

if timeseries_enabled and timeseries_enabled.timeseries_field:
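# In the timeseries branch the chunk size is driven by the window: one document is emitted per generated timestamp, so docs_per_chunk is not consulted here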
synthetic_docs = []
datetimestamps: Generator = TimeSeriesPartitioner.generate_datetimestamps_from_window(
window=timeseries_window, frequency=timeseries_enabled.timeseries_frequency, format=timeseries_enabled.timeseries_format
)
for datetimestamp in datetimestamps:
document = generate_synthetic_document(providers=seeded_providers, **self.custom_lists)
try:
document[timeseries_enabled.timeseries_field] = datetimestamp
synthetic_docs.append(document)
except Exception as e:
raise exceptions.DataError(f"Encountered a problem when inserting datetimestamps into generated timeseries documents: {e}") from e

return synthetic_docs

else:
return [generate_synthetic_document(providers=seeded_providers, **self.custom_lists) for _ in range(docs_per_chunk)]

def generate_test_document(self, timeseries_enabled: dict = None, timeseries_window: set = None):
providers = self._instantiate_all_providers(self.custom_providers)
providers = self._seed_providers(providers)

try:
document = self.custom_module.generate_synthetic_document(providers=providers, **self.custom_lists)
if timeseries_enabled and timeseries_enabled.timeseries_field:
datetimestamps: Generator = TimeSeriesPartitioner.generate_datetimestamps_from_window(
window=timeseries_window, frequency=timeseries_enabled.timeseries_frequency, format=timeseries_enabled.timeseries_format
)
for datetimestamp in datetimestamps:
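# Each pass overwrites the same field, so the test document ends up with the window's last timestamp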
document[timeseries_enabled.timeseries_field] = datetimestamp

except AttributeError as e:
msg = "Encountered AttributeError when setting up custom_providers and custom_lists. " + \
"It seems that your module might be using custom_lists and custom_providers." + \
f"Please ensure you have provided a custom config with custom_providers and custom_lists: {e}"
raise exceptions.ConfigError(msg)

return document
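For orientation, the custom-module path assumes a user-supplied generate_synthetic_document with roughly this shape (a hypothetical sketch, not code from this PR; note that the timeseries field is injected afterwards by the strategy, one timestamp per document):

def generate_synthetic_document(providers, **custom_lists):
    # `providers` are the seeded provider instances set up by the strategy;
    # this toy example ignores them and returns a static document.
    return {"message": "synthetic record", "level": "info"}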

def _instantiate_all_providers(self, custom_providers):
@@ -7,7 +7,7 @@
# GitHub history for details.

import logging
from typing import Optional, Callable, Dict, Any, Generator
import random
import datetime
import uuid
@@ -17,9 +17,10 @@
from mimesis.locales import Locale
from mimesis.random import Random

from osbenchmark import exceptions
from osbenchmark.synthetic_data_generator.strategies import DataGenerationStrategy
from osbenchmark.synthetic_data_generator.models import SyntheticDataGeneratorMetadata, SDGConfig, MappingGenerationValuesConfig
from osbenchmark.synthetic_data_generator.timeseries_partitioner import TimeSeriesPartitioner

class MappingStrategy(DataGenerationStrategy):
def __init__(self, sdg_metadata: SyntheticDataGeneratorMetadata, sdg_config: SDGConfig, index_mapping: dict) -> None:
@@ -30,18 +31,32 @@ def __init__(self, sdg_metadata: SyntheticDataGeneratorMetadata, sdg_config: SD

self.logger = logging.getLogger(__name__)

def generate_data_chunks_across_workers(self, dask_client: Client, docs_per_chunk: int, seeds: list, timeseries_enabled: dict, timeseries_windows: list) -> list:
"""
Submits workers to generate data chunks and returns Dask futures

Returns: list of Dask Futures
"""
if timeseries_enabled and timeseries_windows:
futures = []
for seed, window in zip(seeds, timeseries_windows):
future = dask_client.submit(
self.generate_data_chunk_from_worker,
docs_per_chunk, seed, timeseries_enabled, window
)
futures.append(future)

else:
futures = [dask_client.submit(self.generate_data_chunk_from_worker, docs_per_chunk, seed) for seed in seeds]

return futures

# pylint: disable=arguments-differ
def generate_data_chunk_from_worker(self, docs_per_chunk: int, seed: Optional[int], timeseries_enabled: dict = None, timeseries_window: set = None) -> list:
"""
This method is submitted to a Dask worker and can be thought of as the worker performing a job: calling the
MappingConverter's static generate_synthetic_document() method to generate documents.
@@ -57,15 +72,41 @@ def generate_data_chunk_from_worker(self, docs_per_chunk: int, seed: Optional[in
mapping_generator_logic = MappingConverter(self.mapping_generation_values, seed)
mappings_with_generators = mapping_generator_logic.transform_mapping_to_generators(self.index_mapping)

if timeseries_enabled and timeseries_enabled.timeseries_field:
synthetic_docs = []
datetimestamps: Generator = TimeSeriesPartitioner.generate_datetimestamps_from_window(
window=timeseries_window, frequency=timeseries_enabled.timeseries_frequency, format=timeseries_enabled.timeseries_format
)
for datetimestamp in datetimestamps:
document = MappingConverter.generate_synthetic_document(mappings_with_generators)
try:
document[timeseries_enabled.timeseries_field] = datetimestamp
synthetic_docs.append(document)
except Exception as e:
raise exceptions.DataError(f"Encountered a problem when inserting datetimestamps into generated timeseries documents: {e}") from e

return synthetic_docs


documents = [MappingConverter.generate_synthetic_document(mappings_with_generators) for _ in range(docs_per_chunk)]

return documents

def generate_test_document(self, timeseries_enabled: dict = None, timeseries_window: set = None):
mapping_converter = MappingConverter(self.mapping_generation_values)
converted_mappings = mapping_converter.transform_mapping_to_generators(self.index_mapping)

document = MappingConverter.generate_synthetic_document(transformed_mapping=converted_mappings)
if timeseries_enabled and timeseries_enabled.timeseries_field:
datetimestamps: Generator = TimeSeriesPartitioner.generate_datetimestamps_from_window(
window=timeseries_window, frequency=timeseries_enabled.timeseries_frequency, format=timeseries_enabled.timeseries_format
)
for datetimestamp in datetimestamps:
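# Each pass overwrites the same field, so the test document ends up with the window's last timestamp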
document[timeseries_enabled.timeseries_field] = datetimestamp

return document
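For context before the converter itself: MappingConverter walks an OpenSearch index mapping and pairs each field type with a matching generate_* method. A toy mapping that would exercise the generators below (illustrative only):

# Hypothetical input: transform_mapping_to_generators() walks "properties"
# and maps keyword -> generate_keyword, long -> generate_long, and so on;
# generate_synthetic_document() then calls one generator per field.
index_mapping = {
    "mappings": {
        "properties": {
            "user_id": {"type": "keyword"},
            "latency": {"type": "long"},
            "score": {"type": "double"},
        }
    }
}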


class MappingConverter:
def __init__(self, mapping_generation_values=None, seed=1):
@@ -140,7 +181,9 @@ def generate_keyword(self, field_def: Dict[str, Any], **params) -> str:
return f"key_{uuid.uuid4().hex[:8]}"

def generate_long(self, field_def: Dict[str, Any], **params) -> int:
min = params.get('min', -(2**63 - 1))
max = params.get('max', (2**63 - 1))
return random.randint(min, max)

def generate_integer(self, field_def: Dict[str, Any], **params) -> int:
min = params.get('min', -2147483648)
@@ -149,13 +192,19 @@ def generate_integer(self, field_def: Dict[str, Any], **params) -> int:
return random.randint(min, max)

def generate_short(self, field_def: Dict[str, Any], **params) -> int:
min = params.get('min', -32768)
max = params.get('max', 32767)
return random.randint(min, max)

def generate_byte(self, field_def: Dict[str, Any], **params) -> int:
min = params.get('min', -128)
max = params.get('max', 127)
return random.randint(min, max)

def generate_double(self, field_def: Dict[str, Any], **params) -> float:
min = params.get('min', -1e9)
max = params.get('max', 1e9)
return random.uniform(min, max)
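The numeric generators now honor per-field min/max overrides passed in through params (sourced from the generator overrides in sdg-config.yml). A usage sketch (the empty dict stands in for field_def, which these generators do not inspect):

converter = MappingConverter()
latency = converter.generate_long({}, min=0, max=10_000)   # bounded long
ratio = converter.generate_double({}, min=0.0, max=1.0)    # bounded double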

def generate_float(self, field_def: Dict[str, Any], **params) -> float:
min = params.get('min', 0)
@@ -239,7 +288,7 @@ def transform_mapping_to_generators(self, mapping_dict: Dict[str, Any], field_pa
else:
properties = mapping_dict.get("properties", mapping_dict)
except KeyError:
raise exceptions.MappingsError("OpenSearch mappings provided are invalid. Please ensure it includes 'mappings' or 'properties' fields.")

# Iterate through all the properties in the index mapping
for field_name, field_def in properties.items():
@@ -272,7 +321,7 @@ def transform_mapping_to_generators(self, mapping_dict: Dict[str, Any], field_pa
else:
self.logger.info("Issue with sdg-config.yml: override for field [%s] specifies non-existent data generator [%s]", current_field_path, gen_name)
msg = f"Issue with sdg-config.yml: override for field [{current_field_path}] specifies non-existent data generator [{gen_name}]"
raise exceptions.ConfigError(msg)
else:
# Check if default_generators has overrides for all instances of a type of generator
generator_override_params = generator_overrides.get(field_type, {})
Expand Down
7 changes: 4 additions & 3 deletions osbenchmark/synthetic_data_generator/strategies/strategy.py
@@ -14,21 +14,22 @@
class DataGenerationStrategy(ABC):

@abstractmethod
def generate_data_chunks_across_workers(self, dask_client: Client, docs_per_chunk: int, seeds: list, timeseries_enabled: dict, timeseries_windows: list) -> list:
"""
Submit requests to generate data chunks across Dask workers

returns: Dask Futures
"""

@abstractmethod
def generate_data_chunk_from_worker(self, generate_synthetic_document: Callable, docs_per_chunk: int,
seed: Optional[int], timeseries_enabled: dict = None, timeseries_window: set = None) -> list:
"""
Generate chunk of docs with data generation logic for Dask worker

returns: list of documents
"""

@abstractmethod
def generate_test_document(self, timeseries_enabled: dict = None, timeseries_window: set = None) -> dict:
"""Generate test document from data generation logic"""