diff --git a/examples/09_Warmup_Example/README.md b/examples/09_Warmup_Example/README.md new file mode 100644 index 00000000..3c9c2497 --- /dev/null +++ b/examples/09_Warmup_Example/README.md @@ -0,0 +1,47 @@ +# Warmup Example + +This directory shows how to add a warmup phase to offline and online benchmark +configs for [Qwen/Qwen2.5-0.5B-Instruct](https://huggingface.co/Qwen/Qwen2.5-0.5B-Instruct). + +Warmup sends randomly generated requests before the timed run to prime the +endpoint. Those samples complete before `TEST_STARTED`, so they are excluded +from reported throughput and latency metrics. + +## Files + +- `warmup_offline.yaml`: offline max-throughput example +- `warmup_online.yaml`: online Poisson-QPS example + +Both configs use the same `warmup` block: + +```yaml +warmup: + num_samples: 64 + input_seq_length: 256 + output_seq_length: 64 + input_range_ratio: 0.9 + random_seed: 42 +``` + +Warmup data is generated at runtime from random token IDs using the model +tokenizer, so no separate warmup dataset is needed. + +## Run Locally + +With the built-in echo server: + +```bash +python -m inference_endpoint.testing.echo_server --port 8000 +inference-endpoint benchmark from-config -c examples/09_Warmup_Example/warmup_offline.yaml +inference-endpoint benchmark from-config -c examples/09_Warmup_Example/warmup_online.yaml +``` + +Against a real endpoint, point `endpoint_config.endpoints` in the YAML at that +server and run the same commands. + +## Tuning + +- `num_samples`: use enough requests to reach a steady state +- `input_seq_length`: match the typical prompt length of the workload +- `output_seq_length`: match the expected response length +- `input_range_ratio`: use values below `1.0` to add light ISL variation diff --git a/examples/09_Warmup_Example/warmup_offline.yaml b/examples/09_Warmup_Example/warmup_offline.yaml new file mode 100644 index 00000000..8ea56c1d --- /dev/null +++ b/examples/09_Warmup_Example/warmup_offline.yaml @@ -0,0 +1,62 @@ +# Offline Throughput Benchmark with Warmup Phase +# +# The warmup phase issues randomly generated requests before the timed +# performance window starts. This primes the endpoint by: +# - Establishing and reusing TCP connections +# - Filling KV caches to steady-state +# - Triggering JIT compilation in the inference runtime +# +# Warmup samples are excluded from all reported metrics. +name: "warmup-offline-qwen2.5-0.5b" +version: "1.0" +type: "offline" + +# Warmup configuration: runs before the timed performance test. +# Uses randomly generated token sequences; no real dataset required. +warmup: + num_samples: 64 # Number of warmup requests to issue + input_seq_length: 256 # ISL: target input sequence length in tokens + output_seq_length: 64 # OSL: max_new_tokens for warmup requests + input_range_ratio: 0.9 # ISL variance: generates ISL in [256*0.9, 256] + random_seed: 42 + +model_params: + name: "Qwen/Qwen2.5-0.5B-Instruct" + temperature: 0.0 + top_p: 1.0 + max_new_tokens: 128 + +datasets: + - name: cnn_dailymail::llama3_8b + type: "performance" + samples: 18 + parser: + input: prompt + +settings: + runtime: + min_duration_ms: 60000 # 1 minute + max_duration_ms: 360000 # 6 minutes + scheduler_random_seed: 137 + dataloader_random_seed: 111 + n_samples_to_issue: 4 + + load_pattern: + type: "max_throughput" + + client: + workers: 4 + +metrics: + collect: + - "throughput" + - "latency" + - "ttft" + - "tpot" + +endpoint_config: + endpoints: + - "http://localhost:8000" + api_key: null + +report_dir: logs/warmup_offline_fixed diff --git a/examples/09_Warmup_Example/warmup_online.yaml b/examples/09_Warmup_Example/warmup_online.yaml new file mode 100644 index 00000000..a270273a --- /dev/null +++ b/examples/09_Warmup_Example/warmup_online.yaml @@ -0,0 +1,63 @@ +# Online (Sustained QPS) Benchmark with Warmup Phase +# +# The warmup phase issues randomly generated requests before the timed +# performance window starts. This primes the endpoint by: +# - Establishing and reusing TCP connections +# - Filling KV caches to steady-state +# - Triggering JIT compilation in the inference runtime +# +# Warmup samples are excluded from all reported metrics. +name: "warmup-online-qwen2.5-0.5b" +version: "1.0" +type: "online" + +# Warmup configuration: runs before the timed performance test. +# Uses randomly generated token sequences; no real dataset required. +warmup: + num_samples: 32 # Number of warmup requests to issue + input_seq_length: 128 # ISL: target input sequence length in tokens + output_seq_length: 32 # OSL: max_new_tokens for warmup requests + input_range_ratio: 0.8 # ISL variance: generates ISL in [128*0.8, 128] + random_seed: 42 + +model_params: + name: "Qwen/Qwen2.5-0.5B-Instruct" + temperature: 0.0 + top_p: 1.0 + max_new_tokens: 128 + streaming: "on" + +datasets: + - name: cnn_dailymail::llama3_8b + type: "performance" + samples: 13368 + parser: + input: prompt + +settings: + runtime: + min_duration_ms: 60000 # 1 minute + max_duration_ms: 360000 # 6 minutes + scheduler_random_seed: 137 + dataloader_random_seed: 111 + + load_pattern: + type: "poisson" + target_qps: 10.0 + + client: + workers: 4 + +metrics: + collect: + - "throughput" + - "latency" + - "ttft" + - "tpot" + +endpoint_config: + endpoints: + - "http://localhost:8000" + api_key: null + +report_dir: logs/warmup_online diff --git a/src/inference_endpoint/commands/benchmark.py b/src/inference_endpoint/commands/benchmark.py index 25cdc2c9..3aa6c0b7 100644 --- a/src/inference_endpoint/commands/benchmark.py +++ b/src/inference_endpoint/commands/benchmark.py @@ -63,6 +63,7 @@ from inference_endpoint.core.types import QueryResult from inference_endpoint.dataset_manager.dataset import Dataset from inference_endpoint.dataset_manager.factory import DataLoaderFactory +from inference_endpoint.dataset_manager.predefined.random import RandomDataset from inference_endpoint.endpoint_client.config import HTTPClientConfig from inference_endpoint.endpoint_client.cpu_affinity import ( pin_loadgen, @@ -458,6 +459,43 @@ def _run_benchmark( logger.info("Streaming: disabled (auto, offline mode)") config.model_params.streaming = StreamingMode.OFF + # Build warmup dataset if configured + warmup_dataset = None + if config.warmup is not None: + if tokenizer is None: + raise InputValidationError( + "A tokenizer is required to generate the warmup dataset. Ensure model_params.name is set." + ) + + warmup_cfg = config.warmup + try: + warmup_df = RandomDataset.generate( + datasets_dir=None, + force=False, + num_sequences=warmup_cfg.num_samples, + input_seq_length=warmup_cfg.input_seq_length, + input_range_ratio=warmup_cfg.input_range_ratio, + random_seed=warmup_cfg.random_seed, + tokenizer=tokenizer, + ) + except (ValueError, TypeError) as e: + raise InputValidationError( + f"Failed to generate warmup dataset from warmup config: {e}" + ) from e + warmup_dataset = RandomDataset(warmup_df) + # Create a new model params object with the warmup output sequence length + warmup_model_params = config.model_params.model_copy( + update={"max_new_tokens": warmup_cfg.output_seq_length} + ) + warmup_dataset.load( + api_type=config.endpoint_config.api_type, + model_params=warmup_model_params, + ) + logger.info( + f"Warmup dataset ready: {warmup_dataset.num_samples()} samples " + f"(ISL={warmup_cfg.input_seq_length}, OSL={warmup_cfg.output_seq_length})" + ) + # Get dataset - from CLI or from config # TODO: Dataset Logic is not yet fully implemented @@ -543,6 +581,9 @@ def _run_benchmark( total_samples += sum( [dataset.num_samples() * dataset.repeats for dataset in accuracy_datasets] ) + if warmup_dataset is not None: + total_samples += warmup_dataset.num_samples() + duration_s = rt_settings.min_duration_ms / 1000 logger.info( @@ -626,6 +667,7 @@ def _run_benchmark( dataloader, sample_issuer, scheduler, + warmup_dataset=warmup_dataset, name=f"cli_benchmark_{uuid.uuid4().hex[0:8]}", report_dir=report_dir, tokenizer_override=tokenizer, diff --git a/src/inference_endpoint/config/schema.py b/src/inference_endpoint/config/schema.py index 195d9cbb..8c21121e 100644 --- a/src/inference_endpoint/config/schema.py +++ b/src/inference_endpoint/config/schema.py @@ -234,6 +234,29 @@ class AccuracyConfig(BaseModel): num_repeats: int = 1 +class WarmupConfig(BaseModel): + """Configuration for the warmup phase using randomly generated data. + + The warmup phase runs before the timed performance test to prime the + endpoint (warm TCP connections, fill KV caches, trigger JIT compilation). + Uses randomly generated token sequences with configurable ISL and OSL. + + Fields: + num_samples: Number of warmup queries to issue. + input_seq_length: Target input sequence length in tokens (ISL). + output_seq_length: Max output tokens for warmup requests (OSL). + input_range_ratio: ISL variance factor in [0.0, 1.0]. Generates ISL in + the range [input_seq_length * input_range_ratio, input_seq_length]. + random_seed: Seed for reproducible warmup data generation. + """ + + num_samples: int = Field(100, gt=0) + input_seq_length: int = Field(512, gt=0) + output_seq_length: int = Field(128, gt=0) + input_range_ratio: float = Field(1.0, ge=0.0, le=1.0) + random_seed: int = 42 + + class RuntimeConfig(BaseModel): """Runtime configuration from YAML (user-facing). @@ -408,6 +431,7 @@ class BenchmarkConfig(BaseModel): # - True = auto (compute optimal NUMA-aware plan) # - False = disabled (no CPU pinning) enable_cpu_affinity: bool = True + warmup: WarmupConfig | None = None @classmethod def from_yaml_file(cls, path: Path) -> BenchmarkConfig: diff --git a/src/inference_endpoint/dataset_manager/predefined/random/__init__.py b/src/inference_endpoint/dataset_manager/predefined/random/__init__.py index 45e096b0..f2e86a97 100644 --- a/src/inference_endpoint/dataset_manager/predefined/random/__init__.py +++ b/src/inference_endpoint/dataset_manager/predefined/random/__init__.py @@ -37,7 +37,7 @@ def generate( *, num_sequences: int = 1024, input_seq_length: int = 1024, - range_ratio: float = 1.0, + input_range_ratio: float = 1.0, random_seed: int = 42, save_tokenized_data: bool = False, tokenizer: str | PreTrainedTokenizer, @@ -49,7 +49,7 @@ def generate( data = [] # Generate the input sequence lengths given the range ratio input_seq_lengths = rng.integers( - int(input_seq_length * range_ratio), + int(input_seq_length * input_range_ratio), input_seq_length + 1, num_sequences, ) diff --git a/src/inference_endpoint/load_generator/__init__.py b/src/inference_endpoint/load_generator/__init__.py index 94b6309f..65f5ebd5 100644 --- a/src/inference_endpoint/load_generator/__init__.py +++ b/src/inference_endpoint/load_generator/__init__.py @@ -29,6 +29,7 @@ PoissonDistributionScheduler, SampleOrder, Scheduler, + SequentialSampleOrder, WithoutReplacementSampleOrder, WithReplacementSampleOrder, ) @@ -46,6 +47,7 @@ "MaxThroughputScheduler", "PoissonDistributionScheduler", "SampleOrder", + "SequentialSampleOrder", "WithReplacementSampleOrder", "WithoutReplacementSampleOrder", "LoadGenerator", diff --git a/src/inference_endpoint/load_generator/scheduler.py b/src/inference_endpoint/load_generator/scheduler.py index ae691d09..1dc1709e 100644 --- a/src/inference_endpoint/load_generator/scheduler.py +++ b/src/inference_endpoint/load_generator/scheduler.py @@ -168,6 +168,24 @@ def next_sample_index(self) -> int: return self.rng.randint(0, self.n_samples_in_dataset - 1) +class SequentialSampleOrder(SampleOrder): + """Sample ordering without randomness. + + Issues dataset rows in their natural order and wraps around if more samples are + requested than the dataset contains. + """ + + def next_sample_index(self) -> int: + """Get next sample index in dataset order. + + Returns: + Sample index from dataset. + """ + if self.n_samples_in_dataset <= 0: + raise IndexError("Cannot issue samples from an empty dataset") + return self._issued_samples % self.n_samples_in_dataset + + def uniform_delay_fn( max_delay_ns: int = 0, rng: random.Random | None = None ) -> Callable[[], float]: diff --git a/src/inference_endpoint/load_generator/session.py b/src/inference_endpoint/load_generator/session.py index da5faf84..a0c768d7 100644 --- a/src/inference_endpoint/load_generator/session.py +++ b/src/inference_endpoint/load_generator/session.py @@ -26,13 +26,14 @@ from transformers import AutoTokenizer from ..config.runtime_settings import RuntimeSettings +from ..config.schema import LoadPattern, LoadPatternType from ..dataset_manager.dataset import Dataset from ..metrics.recorder import EventRecorder from ..metrics.reporter import MetricsReporter from ..utils.version import get_version_info from .events import SessionEvent from .load_generator import LoadGenerator, SampleIssuer, SchedulerBasedLoadGenerator -from .scheduler import Scheduler, WithoutReplacementSampleOrder +from .scheduler import Scheduler, SequentialSampleOrder, WithoutReplacementSampleOrder logger = logging.getLogger(__name__) @@ -65,6 +66,9 @@ def __init__( self.sample_uuid_map: dict[str, dict[str, int]] | None = None + # Wall-clock nanoseconds spent in the warmup phase; set by _run_warmup. + self.warmup_duration_ns: int | None = None + @property def is_running(self): return self.thread is not None and self.thread.is_alive() @@ -75,6 +79,56 @@ def stop(self) -> None: # wakeup _run_test if needed, short-circuit SHUTDOWN_POLL_INTERVAL_S self.end_event.set() + def _run_warmup( + self, + warmup_generator: LoadGenerator, + max_shutdown_timeout_s: float | None = 300.0, + ) -> None: + """Issue warmup samples and drain responses using the already-open EventRecorder. + + Warmup events land in the shared DB with timestamps before TEST_STARTED, so they + are excluded from all perf metrics. Wall-clock duration is stored on + ``self.warmup_duration_ns``. + + Args: + warmup_generator: Pre-configured load generator for the warmup dataset. + max_shutdown_timeout_s: Drain timeout in seconds. None means wait forever. + """ + warmup_start_ns = time.monotonic_ns() + + self.logger.info("Warmup: issuing samples...") + for _ in warmup_generator: + pass + self.logger.info("Warmup samples issued, waiting for responses to drain...") + + # Enable idle signalling so EventRecorder wakes end_event when all warmup + # responses are received, then reset it for the perf test. + self.event_recorder.should_check_idle = True + drain_start = time.monotonic() + while self.event_recorder.n_inflight_samples != 0: + if ( + max_shutdown_timeout_s is not None + and time.monotonic() - drain_start > max_shutdown_timeout_s + ): + raise TimeoutError( + f"Warmup drain timeout exceeded: " + f"{self.event_recorder.n_inflight_samples} warmup requests " + f"still in-flight after {max_shutdown_timeout_s}s" + ) + if self.stop_requested: + self.logger.info( + f"Early stop requested (pending={self.event_recorder.n_inflight_samples}), shutting down warmup..." + ) + return + self.end_event.wait(timeout=SHUTDOWN_POLL_INTERVAL_S) + + # Reset so end_event is not pre-fired for the performance drain below. + self.event_recorder.should_check_idle = False + self.end_event.clear() + + self.warmup_duration_ns = time.monotonic_ns() - warmup_start_ns + self.logger.info("Warmup complete") + def _run_test( self, perf_test_generator: LoadGenerator, @@ -84,7 +138,9 @@ def _run_test( tokenizer_override: AutoTokenizer | None = None, dump_events_log: bool = False, ): - with self.event_recorder: + # The EventRecorder was opened in start() before warmup ran, so both the warmup + # phase and the perf test share the same DB. We are responsible for closing it. + try: try: EventRecorder.record_event( SessionEvent.TEST_STARTED, @@ -161,7 +217,9 @@ def _run_test( f"Error loading tokenizer for model {model}: {e}" ) tokenizer = None - report = reporter.create_report(tokenizer) + report = reporter.create_report( + tokenizer, warmup_duration_ns=self.warmup_duration_ns + ) # Store report on session so external callers can use it self.report = report @@ -234,6 +292,8 @@ def _run_test( with open(report_path, "w") as f: report.display(fn=f.write, summary_only=False, newline="\n") logger.info(f"Report saved to {report_path}") + finally: + self.event_recorder.close() def wait_for_test_end(self, timeout: float | None = None) -> bool: """ @@ -258,6 +318,7 @@ def start( sample_issuer: SampleIssuer, scheduler: Scheduler, *args, + warmup_dataset: Dataset | None = None, accuracy_datasets: list[Dataset] | None = None, load_generator_cls: type[LoadGenerator] = SchedulerBasedLoadGenerator, name: str | None = None, @@ -273,15 +334,17 @@ def start( dataset: The dataset to use for the performance test. sample_issuer: The sample issuer to use for the session. scheduler: The scheduler to use for the session. + *args: Additional arguments to pass to the load generator constructor. + warmup_dataset: The dataset to use for the warmup test. If None, no warmup test will be run. accuracy_datasets: The datasets to use for the accuracy tests. If None, no accuracy tests will be run. load_generator_cls: The load generator class to use for the session. name: The name of the session. max_shutdown_timeout_s: The maximum timeout to wait for the test to complete after all samples have been issued. - If None, wait indefinitely. (Default: 300.0 seconds) + If None, wait indefinitely. Warmup drain defaults to 300.0 seconds if not specified. report_dir: The path to save the report to. If None, no report will be saved. tokenizer_override: The tokenizer to use for the session. If None, a tokenizer will be automatically selected based on the model name in the runtime settings. - dump_events_csv: Whether to dump the events to a CSV file. Only use for debugging + dump_events_log: Whether to dump the events to a JSONL file. Only use for debugging purposes, as the events database can get quite large. Returns: @@ -290,6 +353,33 @@ def start( session = cls(runtime_settings, session_id=name) load_generator = load_generator_cls(sample_issuer, dataset, scheduler, *args) # type: ignore[arg-type] + # Create warmup generator (max-throughput, issues all samples at t=0) + warmup_generator = None + if warmup_dataset is not None: + warmup_n = warmup_dataset.num_samples() + warmup_rt = RuntimeSettings( + metric_target=runtime_settings.metric_target, + reported_metrics=runtime_settings.reported_metrics, + min_duration_ms=0, + max_duration_ms=None, + n_samples_from_dataset=warmup_n, + n_samples_to_issue=warmup_n, + min_sample_count=warmup_n, + rng_sched=runtime_settings.rng_sched, + rng_sample_index=runtime_settings.rng_sample_index, + load_pattern=LoadPattern(type=LoadPatternType.MAX_THROUGHPUT), + ) + warmup_sched_cls = Scheduler.get_implementation( + LoadPatternType.MAX_THROUGHPUT + ) + warmup_sched = warmup_sched_cls(warmup_rt, SequentialSampleOrder) + warmup_generator = load_generator_cls( + sample_issuer, + warmup_dataset, + warmup_sched, # type: ignore[arg-type] + *args, + ) + # Create accuracy test generators accuracy_test_generators = None if accuracy_datasets: @@ -324,16 +414,32 @@ def start( *args, ) - session.thread = threading.Thread( - target=session._run_test, - args=(load_generator,), - kwargs={ - "accuracy_test_generators": accuracy_test_generators, - "max_shutdown_timeout_s": max_shutdown_timeout_s, - "report_dir": report_dir, - "tokenizer_override": tokenizer_override, - "dump_events_log": dump_events_log, - }, - ) - session.thread.start() + # Open the recorder before warmup so both phases share the same DB. + # _run_test is responsible for closing it when the perf test finishes. + session.event_recorder.__enter__() + thread_started = False + try: + if warmup_generator is not None: + session._run_warmup(warmup_generator, max_shutdown_timeout_s or 300.0) + + if not session.stop_requested: + session.thread = threading.Thread( + target=session._run_test, + args=(load_generator,), + kwargs={ + "accuracy_test_generators": accuracy_test_generators, + "max_shutdown_timeout_s": max_shutdown_timeout_s, + "report_dir": report_dir, + "tokenizer_override": tokenizer_override, + "dump_events_log": dump_events_log, + }, + ) + session.thread.start() + thread_started = True + finally: + # If the thread did not start (warmup stopped early or an exception was + # raised), we own the recorder and must close it here. + if not thread_started: + session.event_recorder.close() + return session diff --git a/src/inference_endpoint/metrics/reporter.py b/src/inference_endpoint/metrics/reporter.py index 5b2471aa..1c6e426a 100644 --- a/src/inference_endpoint/metrics/reporter.py +++ b/src/inference_endpoint/metrics/reporter.py @@ -362,6 +362,7 @@ class Report: tpot: dict[str, float] latency: dict[str, float] output_sequence_lengths: dict[str, int] + warmup_duration_ns: int | None = None tpot_reporting_mode: TPOTReportingMode = TPOTReportingMode.REQUEST_WEIGHTED @functools.cached_property @@ -497,6 +498,8 @@ def display( fn(f"Total samples issued: {self.n_samples_issued}{newline}") fn(f"Total samples completed: {self.n_samples_completed}{newline}") fn(f"Total samples failed: {self.n_samples_failed}{newline}") + if self.warmup_duration_ns is not None: + fn(f"Warmup duration: {self.warmup_duration_ns / 1e9:.2f} seconds{newline}") if self.duration_ns is not None: fn(f"Duration: {self.duration_ns / 1e9:.2f} seconds{newline}") else: @@ -677,6 +680,22 @@ def init_connection(self): raise ValueError(f"Invalid client type: {self.client_type}") self.is_closed = False + @functools.cached_property + def test_started_timestamp_ns(self) -> int: + """Returns the timestamp_ns of the TEST_STARTED event. + + Returns 0 if the event is not found, which acts as an open lower bound + (all samples issued after epoch 0 are included). This keeps the queries + correct for event logs that pre-date warmup support. + """ + result = self.cur_.execute(f""" + SELECT timestamp_ns FROM events + WHERE event_type = '{SessionEvent.TEST_STARTED.value}' + ORDER BY timestamp_ns ASC + LIMIT 1 + """).fetchone() + return int(result[0]) if result else 0 + @functools.cached_property def stop_performance_tracking_timestamp_ns(self) -> float: """Returns the timestamp_ns of the STOP_PERFORMANCE_TRACKING event. @@ -710,16 +729,19 @@ def derive_metric(self, query: str, metric_type: str) -> RollupQueryTable: def derive_TTFT(self) -> RollupQueryTable: stop_ts = self.stop_performance_tracking_timestamp_ns + start_ts = self.test_started_timestamp_ns # Build the HAVING clause conditionally to handle infinity if stop_ts != float("inf"): before_stop_ts_clause = f""" HAVING COUNT(DISTINCT event_type) = 2 + AND MAX(CASE WHEN event_type = '{SessionEvent.LOADGEN_ISSUE_CALLED.value}' THEN timestamp_ns END) >= {start_ts} AND MAX(CASE WHEN event_type = '{SessionEvent.LOADGEN_ISSUE_CALLED.value}' THEN timestamp_ns END) < {stop_ts} """ else: - before_stop_ts_clause = """ + before_stop_ts_clause = f""" HAVING COUNT(DISTINCT event_type) = 2 + AND MAX(CASE WHEN event_type = '{SessionEvent.LOADGEN_ISSUE_CALLED.value}' THEN timestamp_ns END) >= {start_ts} """ return self.derive_metric( @@ -807,6 +829,7 @@ def derive_duration(self, check_malformed: bool = True) -> float | None: # Check if STOP_PERFORMANCE_TRACKING event exists stop_ts = self.stop_performance_tracking_timestamp_ns + start_ts = self.test_started_timestamp_ns if stop_ts != float("inf"): # Build list of sample_uuids with LOADGEN_ISSUE_CALLED before stop_ts @@ -819,6 +842,7 @@ def derive_duration(self, check_malformed: bool = True) -> float | None: FROM events WHERE event_type = '{SessionEvent.LOADGEN_ISSUE_CALLED.value}' AND timestamp_ns < {stop_ts} + AND timestamp_ns >= {start_ts} ) AND event_type = '{SampleEvent.COMPLETE.value}' """).fetchone() @@ -844,6 +868,7 @@ def derive_duration(self, check_malformed: bool = True) -> float | None: SELECT COUNT(*) AS n_ends, MAX(timestamp_ns) AS end_ts FROM events WHERE event_type = '{SessionEvent.TEST_ENDED.value}' + AND timestamp_ns >= {start_ts} """).fetchone() n_test_ended = test_ended_result[0] @@ -872,16 +897,19 @@ def derive_sample_latency(self) -> RollupQueryTable: RollupQueryTable: A table containing per-sample latencies in nanoseconds. """ stop_ts = self.stop_performance_tracking_timestamp_ns + start_ts = self.test_started_timestamp_ns # HAVING clause is different if there is a STOP_PERFORMANCE_TRACKING event if stop_ts != float("inf"): before_stop_ts_clause = f""" HAVING COUNT(DISTINCT event_type) = 2 + AND MAX(CASE WHEN event_type = '{SessionEvent.LOADGEN_ISSUE_CALLED.value}' THEN timestamp_ns END) >= {start_ts} AND MAX(CASE WHEN event_type = '{SessionEvent.LOADGEN_ISSUE_CALLED.value}' THEN timestamp_ns END) < {stop_ts} """ else: - before_stop_ts_clause = """ + before_stop_ts_clause = f""" HAVING COUNT(DISTINCT event_type) = 2 + AND MAX(CASE WHEN event_type = '{SessionEvent.LOADGEN_ISSUE_CALLED.value}' THEN timestamp_ns END) >= {start_ts} """ return self.derive_metric( @@ -906,17 +934,26 @@ def get_sample_statuses(self) -> dict[int, str]: - "in_flight" (int): The number of samples in flight """ stop_ts = self.stop_performance_tracking_timestamp_ns + start_ts = self.test_started_timestamp_ns - # Build WHERE clause to filter samples issued before stop_ts - where_clause = "" + # Build WHERE clause to filter samples in the performance window if stop_ts != float("inf"): where_clause = f""" WHERE sample_uuid IN ( SELECT sample_uuid FROM events WHERE event_type = '{SessionEvent.LOADGEN_ISSUE_CALLED.value}' + AND timestamp_ns >= {start_ts} AND timestamp_ns < {stop_ts} ) """ + else: + where_clause = f""" + WHERE sample_uuid IN ( + SELECT sample_uuid FROM events + WHERE event_type = '{SessionEvent.LOADGEN_ISSUE_CALLED.value}' + AND timestamp_ns >= {start_ts} + ) + """ statuses = self.cur_.execute(f""" SELECT @@ -943,17 +980,22 @@ def get_error_count(self) -> int: Returns: int: The number of distinct failed sample UUIDs. """ + start_ts = self.test_started_timestamp_ns stop_ts = self.stop_performance_tracking_timestamp_ns - where_clause = "" + where_clause = f""" + AND sample_uuid IN ( + SELECT DISTINCT sample_uuid FROM events + WHERE event_type = '{SessionEvent.LOADGEN_ISSUE_CALLED.value}' + AND timestamp_ns >= {start_ts} + """ if stop_ts != float("inf"): - where_clause = f""" - AND sample_uuid IN ( - SELECT DISTINCT sample_uuid FROM events - WHERE event_type = '{SessionEvent.LOADGEN_ISSUE_CALLED.value}' + where_clause += f""" AND timestamp_ns < {stop_ts} - ) """ + where_clause += """ + ) + """ return self.cur_.execute(f""" SELECT @@ -977,16 +1019,26 @@ def get_sample_outputs( Returns an empty list if no COMPLETE events are found. """ stop_ts = self.stop_performance_tracking_timestamp_ns + start_ts = self.test_started_timestamp_ns - # Build WHERE clause to filter samples issued before STOP_PERFORMANCE_TRACKING + # Build WHERE clause to filter samples in the performance window if performance_only and stop_ts != float("inf"): before_stop_ts_clause = f""" AND sample_uuid IN ( SELECT sample_uuid FROM events WHERE event_type = '{SessionEvent.LOADGEN_ISSUE_CALLED.value}' + AND timestamp_ns >= {start_ts} AND timestamp_ns < {stop_ts} ) """ + elif performance_only: + before_stop_ts_clause = f""" + AND sample_uuid IN ( + SELECT sample_uuid FROM events + WHERE event_type = '{SessionEvent.LOADGEN_ISSUE_CALLED.value}' + AND timestamp_ns >= {start_ts} + ) + """ else: before_stop_ts_clause = "" @@ -1275,20 +1327,14 @@ def get_test_started_at(self) -> int | None: Returns: int|None: The timestamp of the TEST_STARTED event in nanoseconds, or None if not found. """ - query = f""" - SELECT timestamp_ns FROM events - WHERE event_type = '{SessionEvent.TEST_STARTED.value}' - ORDER BY timestamp_ns ASC - LIMIT 1""" - result = self.cur_.execute(query).fetchone() - if result and result[0]: - return result[0] - return None + ts = self.test_started_timestamp_ns + return ts if ts != 0 else None def create_report( self, tokenizer: Tokenizer | None = None, tpot_reporting_mode: TPOTReportingMode = TPOTReportingMode.REQUEST_WEIGHTED, + warmup_duration_ns: int | None = None, ) -> Report: """Creates a Report object from the metrics. @@ -1340,6 +1386,7 @@ def create_report( n_samples_completed=sample_statuses["completed"], n_samples_failed=self.get_error_count(), duration_ns=self.derive_duration(), + warmup_duration_ns=warmup_duration_ns, ttft=ttft_summary, tpot=tpot_summary, latency=sample_latency_rollup.summarize(), diff --git a/tests/integration/commands/test_warmup.py b/tests/integration/commands/test_warmup.py new file mode 100644 index 00000000..ed17af90 --- /dev/null +++ b/tests/integration/commands/test_warmup.py @@ -0,0 +1,183 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Integration tests for warmup phase against echo server.""" + +import argparse +import textwrap +from unittest.mock import MagicMock, patch + +import pytest +from inference_endpoint.commands.benchmark import run_benchmark_command + + +@pytest.fixture +def mock_tokenizer(): + tok = MagicMock() + tok.vocab_size = 1000 + tok.decode.side_effect = lambda ids, **kwargs: " ".join(str(i) for i in ids) + tok.tokenize.side_effect = lambda text, **kwargs: text.split() + return tok + + +@pytest.mark.integration +@pytest.mark.asyncio +async def test_warmup_offline_with_echo_server( + mock_http_echo_server, ds_pickle_dataset_path, tmp_path, caplog, mock_tokenizer +): + """Warmup phase runs and completes before the performance test starts.""" + config_yaml = textwrap.dedent(f""" + name: "warmup-test-offline" + version: "1.0" + type: "offline" + + warmup: + num_samples: 4 + input_seq_length: 16 + output_seq_length: 8 + input_range_ratio: 0.9 + random_seed: 42 + + model_params: + name: "Qwen/Qwen2.5-0.5B-Instruct" + temperature: 0.0 + max_new_tokens: 16 + + datasets: + - name: custom + type: "performance" + path: "{ds_pickle_dataset_path}" + parser: + input: prompt + + settings: + runtime: + min_duration_ms: 0 + max_duration_ms: 30000 + n_samples_to_issue: 10 + load_pattern: + type: "max_throughput" + client: + workers: 1 + warmup_connections: 0 + + endpoint_config: + endpoints: + - "{mock_http_echo_server.url}" + api_key: null + + report_dir: "{tmp_path}" + """).strip() + + config_file = tmp_path / "warmup_test.yaml" + config_file.write_text(config_yaml) + + args = argparse.Namespace( + benchmark_mode="from-config", + config=str(config_file), + output=None, + mode=None, + verbose=1, + ) + + with ( + caplog.at_level("INFO"), + patch( + "inference_endpoint.commands.benchmark.AutoTokenizer.from_pretrained", + return_value=mock_tokenizer, + ), + ): + await run_benchmark_command(args) + + log_text = caplog.text + assert "Warmup: issuing samples" in log_text, "Warmup did not start" + assert "Warmup complete" in log_text, "Warmup did not complete" + assert "Estimated QPS:" in log_text, "Performance test did not run after warmup" + + +@pytest.mark.integration +@pytest.mark.asyncio +async def test_warmup_online_with_echo_server( + mock_http_echo_server, ds_pickle_dataset_path, tmp_path, caplog, mock_tokenizer +): + """Warmup phase runs before the online (Poisson) performance test.""" + config_yaml = textwrap.dedent(f""" + name: "warmup-test-online" + version: "1.0" + type: "online" + + warmup: + num_samples: 4 + input_seq_length: 16 + output_seq_length: 8 + input_range_ratio: 0.8 + random_seed: 42 + + model_params: + name: "Qwen/Qwen2.5-0.5B-Instruct" + temperature: 0.0 + max_new_tokens: 16 + streaming: "on" + + datasets: + - name: custom + type: "performance" + path: "{ds_pickle_dataset_path}" + parser: + input: prompt + + settings: + runtime: + min_duration_ms: 0 + max_duration_ms: 5000 + load_pattern: + type: "poisson" + target_qps: 50.0 + client: + workers: 1 + warmup_connections: 0 + + endpoint_config: + endpoints: + - "{mock_http_echo_server.url}" + api_key: null + + report_dir: "{tmp_path}" + """).strip() + + config_file = tmp_path / "warmup_online_test.yaml" + config_file.write_text(config_yaml) + + args = argparse.Namespace( + benchmark_mode="from-config", + config=str(config_file), + output=None, + mode=None, + verbose=1, + ) + + with ( + caplog.at_level("INFO"), + patch( + "inference_endpoint.commands.benchmark.AutoTokenizer.from_pretrained", + return_value=mock_tokenizer, + ), + ): + await run_benchmark_command(args) + + log_text = caplog.text + assert "Warmup: issuing samples" in log_text, "Warmup did not start" + assert "Warmup complete" in log_text, "Warmup did not complete" + assert "Estimated QPS:" in log_text, "Performance test did not run after warmup" diff --git a/tests/unit/dataset_manager/test_data_loader.py b/tests/unit/dataset_manager/test_data_loader.py index 4e08bdf9..73862966 100644 --- a/tests/unit/dataset_manager/test_data_loader.py +++ b/tests/unit/dataset_manager/test_data_loader.py @@ -50,8 +50,8 @@ def test_hf_squad_dataset(hf_squad_dataset): @pytest.mark.slow -@pytest.mark.parametrize("range_ratio", [0.5, 0.8, 1.0]) -def test_random_data_loader(range_ratio): +@pytest.mark.parametrize("input_range_ratio", [0.5, 0.8, 1.0]) +def test_random_data_loader(input_range_ratio): num_sequences = 1024 input_seq_length = 1024 random_seed = 42 @@ -60,7 +60,7 @@ def test_random_data_loader(range_ratio): random_data_loader = RandomDataset.get_dataloader( num_sequences=num_sequences, input_seq_length=input_seq_length, - range_ratio=range_ratio, + input_range_ratio=input_range_ratio, random_seed=random_seed, tokenizer=tokenizer, save_tokenized_data=save_tokenized_data, @@ -83,11 +83,11 @@ def test_random_data_loader(range_ratio): # the decoding-encoding which may coalesce some sequences to newer tokens. We use a 0.8 factor to allow for this. # And we allow for a 20% overhead due to the decoding-encoding. assert ( - len(sample["input_tokens"]) > input_seq_length * range_ratio * 0.8 + len(sample["input_tokens"]) > input_seq_length * input_range_ratio * 0.8 and len(sample["input_tokens"]) <= input_seq_length * 1.2 - ), f"Expected {input_seq_length*range_ratio*0.8} to {input_seq_length*0.2} input tokens, got {len(sample["input_tokens"])}" + ), f"Expected {input_seq_length*input_range_ratio*0.8} to {input_seq_length*1.2} input tokens, got {len(sample['input_tokens'])}" assert ( - len(sample["prompt"]) >= 1024 * range_ratio * 0.5 + len(sample["prompt"]) >= 1024 * input_range_ratio * 0.5 and len(sample["prompt"]) <= 7 * 1024 - ), f"Expected length between 1024*range_ratio*0.5 and 1024, got {len(sample["prompt"])}" + ), f"Expected length between 1024*input_range_ratio*0.5 and 7*1024, got {len(sample['prompt'])}" diff --git a/tests/unit/load_generator/test_session.py b/tests/unit/load_generator/test_session.py index afcd9a4b..68fa4c6e 100644 --- a/tests/unit/load_generator/test_session.py +++ b/tests/unit/load_generator/test_session.py @@ -15,7 +15,7 @@ import random from pathlib import Path -from unittest.mock import patch +from unittest.mock import MagicMock, patch import inference_endpoint.metrics as metrics import pytest @@ -157,3 +157,64 @@ def set_pbar(self, pbar: tqdm): assert stats["total_sent"] == 10_000 assert stats["completed"] == 10_000 assert stats["in_flight"] == 0 + + +@pytest.mark.unit +def test_warmup_uses_sequential_sample_order(): + rt_settings = RuntimeSettings( + metrics.Throughput(10), + [metrics.Throughput(10)], + min_duration_ms=1000, + max_duration_ms=10_000, + n_samples_from_dataset=8, + n_samples_to_issue=8, + min_sample_count=1, + rng_sched=random.Random(1234), + rng_sample_index=random.Random(5678), + load_pattern=LoadPattern(type=LoadPatternType.MAX_THROUGHPUT), + ) + + dl = DummyDataLoader(n_samples=8) + sample_issuer = PooledSampleIssuer(lambda n: [str(n)]) + sched = MaxThroughputScheduler(rt_settings, WithoutReplacementSampleOrder) + warmup_dataset = DummyDataLoader(n_samples=4) + + created_schedulers = [] + + class RecordingLoadGenerator: + def __init__(self, sample_issuer, dataloader, scheduler, *args): + self.sample_issuer = sample_issuer + self.dataloader = dataloader + self.scheduler = scheduler + self.args = args + self.uuid_to_index_map = {} + self.name = None + created_schedulers.append(scheduler) + + mock_recorder = MagicMock() + mock_recorder.__enter__ = MagicMock(return_value=mock_recorder) + mock_recorder.close = MagicMock() + + with patch( + "inference_endpoint.load_generator.session.Scheduler.get_implementation", + return_value=MaxThroughputScheduler, + ): + with patch.object(BenchmarkSession, "_run_test", return_value=None): + with patch.object(BenchmarkSession, "_run_warmup", return_value=None): + with patch( + "inference_endpoint.load_generator.session.EventRecorder", + return_value=mock_recorder, + ): + BenchmarkSession.start( + rt_settings, + dl, + sample_issuer, + sched, + warmup_dataset=warmup_dataset, + load_generator_cls=RecordingLoadGenerator, + max_shutdown_timeout_s=0.01, + ) + + assert len(created_schedulers) >= 2 + warmup_scheduler = created_schedulers[1] + assert [next(warmup_scheduler.sample_order) for _ in range(4)] == [0, 1, 2, 3] diff --git a/tests/unit/metrics/test_recorder.py b/tests/unit/metrics/test_recorder.py index 058ee50a..5d2b7757 100644 --- a/tests/unit/metrics/test_recorder.py +++ b/tests/unit/metrics/test_recorder.py @@ -342,7 +342,7 @@ def test_shm_usage(sample_uuids): rec.wait_for_writes() events_created_ev.set() - worker_proc.join(timeout=10) + worker_proc.join(timeout=60) if worker_proc.is_alive(): worker_proc.terminate() worker_proc.join(timeout=1) diff --git a/tests/unit/metrics/test_reporter.py b/tests/unit/metrics/test_reporter.py index db6d97fa..23752697 100644 --- a/tests/unit/metrics/test_reporter.py +++ b/tests/unit/metrics/test_reporter.py @@ -43,6 +43,52 @@ def test_error_counting(events_db): assert reporter.get_error_count() == 1 +@pytest.mark.unit +def test_error_counting_excludes_warmup_errors(tmp_path, sample_uuids): + test_db = str(tmp_path / "test_error_counting_excludes_warmup_errors.db") + warmup_uuid = sample_uuids(1) + perf_error_uuid = sample_uuids(2) + perf_success_uuid = sample_uuids(3) + + with sqlite3_cursor(test_db) as (cursor, conn): + cursor.execute( + "CREATE TABLE IF NOT EXISTS events (sample_uuid VARCHAR(32), event_type VARCHAR(32), timestamp_ns INTEGER, data BLOB)" + ) + cursor.executemany( + "INSERT INTO events (sample_uuid, event_type, timestamp_ns, data) VALUES (?, ?, ?, ?)", + [ + ( + warmup_uuid, + SessionEvent.LOADGEN_ISSUE_CALLED.value, + 1000, + b"", + ), + (warmup_uuid, SessionEvent.ERROR.value, 1010, b"warmup failed"), + ("", SessionEvent.TEST_STARTED.value, 5000, b""), + ( + perf_error_uuid, + SessionEvent.LOADGEN_ISSUE_CALLED.value, + 6000, + b"", + ), + (perf_error_uuid, SessionEvent.ERROR.value, 6010, b"perf failed"), + ( + perf_success_uuid, + SessionEvent.LOADGEN_ISSUE_CALLED.value, + 7000, + b"", + ), + (perf_success_uuid, SampleEvent.COMPLETE.value, 7010, b""), + ("", SessionEvent.STOP_PERFORMANCE_TRACKING.value, 8000, b""), + ("", SessionEvent.TEST_ENDED.value, 9000, b""), + ], + ) + conn.commit() + + with MetricsReporter(test_db) as reporter: + assert reporter.get_error_count() == 1 + + def test_derive_ttft(events_db, sample_uuids): uuid1 = sample_uuids(1) uuid2 = sample_uuids(2) @@ -442,6 +488,7 @@ def test_reporter_json(events_db): "latency", "output_sequence_lengths", "tpot_reporting_mode", + "warmup_duration_ns", "qps", "tps", "test_started_at",