diff --git a/.gitignore b/.gitignore
index 1f95da28..355f0a94 100644
--- a/.gitignore
+++ b/.gitignore
@@ -40,6 +40,8 @@ work/
node_modules/
knnIndices/
+async-profiler/
+
# emacs temporary files
*~
#*
diff --git a/README.md b/README.md
index e0b89e08..0d5e0767 100644
--- a/README.md
+++ b/README.md
@@ -194,4 +194,88 @@ python src/python/localrunFacets.py -source facetsWikimediumAll
```
Note that only comparison of taxonomy based facets is supported at the moment. We need to add SSDV facets support
-to the sandbox facets module, as well as add support for other facet types to this package.
\ No newline at end of file
+to the sandbox facets module, as well as add support for other facet types to this package.


# Profiling

Choose the profiler in **`src/python/localconstants.py`**:

```python
# 'JFR'   – default (-XX:StartFlightRecording)
# 'ASYNC' – async-profiler agent + flame-graph
PROFILER_TYPE = 'JFR'
```

---

#### Quick setup — one command

**Note:** you will generally have to set up proper permissions for async-profiler to run; see the Troubleshooting section below.

```bash
# from util/ directory
./gradlew setupAsyncProfiler
```

This creates:

```
util/
 └─ async-profiler/
     ├─ bin/  (profiler.sh, jfrconv, …)
     └─ lib/  (libasyncProfiler.so | libasyncProfiler.dylib)
```

---

#### Manual install

```bash
wget https://github.com/async-profiler/async-profiler/releases/download/v4.0/async-profiler-4.0-linux-x64.tar.gz
tar -xzf async-profiler-4.0-linux-x64.tar.gz
mv async-profiler $LUCENE_BENCH_HOME/util/
# or set ASYNC_PROFILER_HOME in localconstants.py
```

---

#### Running a benchmark with async-profiler

Enable the async profiler in **`src/python/localconstants.py`**:

```python
# enable the profiler
PROFILER_TYPE = 'ASYNC'
```

Then run any benchmark:

```bash
python src/python/localrun.py -source wikimedium10k
```

Output (in `logs/`):

* `*.jfr` — same format as JFR runs
* `*.html` — interactive flame-graph

---

#### Tuning (optional)

```python
ASYNC_PROFILER_OPTIONS = "interval=10ms,threads"  # extra agent args
```

Keys `event, alloc, delay, file, log, jfr` are reserved and cannot be
overridden.

---

#### Troubleshooting

| Symptom | Fix |
|------------------------------------|--------------------------------------------------------------------------------------------------------------------------|
| **Library not found** | Ensure `async-profiler/lib/libasyncProfiler.so` (Linux) or `.dylib` (macOS) exists and `ASYNC_PROFILER_HOME` is correct. |
| **perf permission errors (Linux)** | `sudo sysctl kernel.perf_event_paranoid=1`
`sudo sysctl kernel.kptr_restrict=0` | +| **No flame-graph generated** | Check console for warnings—usually `jfrconv` missing or agent failed to attach. | diff --git a/build.gradle b/build.gradle index bdf8c15a..51202799 100644 --- a/build.gradle +++ b/build.gradle @@ -41,4 +41,66 @@ allprojects { println "cmd: $commandLine" } } -} \ No newline at end of file +} + +task setupAsyncProfiler { + description = 'Download and unpack async-profiler unless its native library is already present.' + group = 'build setup' + + // ─────────── platform detection ─────────── + def version = '4.0' + def os = org.gradle.internal.os.OperatingSystem.current() + def classifier, libName + + if (os.isLinux()) { + classifier = 'linux-x64' + libName = 'libasyncProfiler.so' + } else if (os.isMacOsX()) { + classifier = System.getProperty('os.arch').contains('aarch') + ? 'macos-arm64' + : 'macos-x64' + libName = 'libasyncProfiler.dylib' + } else { + throw new GradleException("async-profiler: unsupported OS '${os.getName()}'") + } + + def downloadUrl = "https://github.com/async-profiler/async-profiler/releases/" + + "download/v${version}/async-profiler-${version}-${classifier}.tar.gz" + + // ─────────── up-to-date checks ─────────── + def targetDir = project.file("${project.projectDir}/async-profiler") + def targetLibFile = project.file("${targetDir}/lib/${libName}") + + outputs.file(targetLibFile) + onlyIf { !targetLibFile.exists() } // skip whole task once extracted + + doLast { + println "▶ Installing async-profiler ${version} (${classifier})" + + // 1) download + def dlFile = project.layout.buildDirectory + .file("tmp/async-profiler-${version}-${classifier}.tgz") + .get().asFile + dlFile.parentFile.mkdirs() + + ant.get(src: downloadUrl, dest: dlFile, verbose: true) + + // 2) extract (flatten leading dir) + copy { + from tarTree(resources.gzip(dlFile)) + into targetDir + includeEmptyDirs = false + eachFile { f -> + f.path = f.path.replaceFirst('^async-profiler-[^/]+/', '') + } + } + + // 3) cleanup + dlFile.delete() + + if (!targetLibFile.exists()) { + throw new GradleException("Extraction finished, but ${libName} not found in ${targetDir}") + } + println "✔ async-profiler ready – native lib at ${targetLibFile}" + } +} diff --git a/src/python/benchUtil.py b/src/python/benchUtil.py index 45949cde..2f67c62d 100644 --- a/src/python/benchUtil.py +++ b/src/python/benchUtil.py @@ -42,6 +42,65 @@ else: print(f"perf executable is {PERF_EXE}; will collect aggregate CPU profiling data") +# Helper function to generate flamegraph using async-profiler tools +def generate_flamegraph(jfr_output_path, async_profiler_dir): + """Generates a flamegraph HTML file from a JFR file.""" + if not constants.ASYNC_PROFILER_HOME: + return + + if not os.path.exists(jfr_output_path): + print(f"Warning: JFR file not found at {jfr_output_path}, cannot generate flamegraph.") + return + + async_profiler_dir = constants.ASYNC_PROFILER_HOME + + flamegraph_file = jfr_output_path.replace(".jfr", ".html") + print(f"Attempting to generate flamegraph: {flamegraph_file}") + + jfrconv_path = os.path.join(async_profiler_dir, "bin", "jfrconv") + + cmd_to_run = None + cmd_desc = "" + + if os.path.exists(jfrconv_path): + cmd_to_run = [jfrconv_path, jfr_output_path, flamegraph_file] + cmd_desc = "jfrconv" + else: + print(f"Error: Could not find jfrconv ('{jfrconv_path}') in {async_profiler_dir}") + print("Flamegraphs will not be generated.") + return + + try: + print(f" Using {cmd_desc}: {' '.join(cmd_to_run)}") + result = subprocess.run(cmd_to_run, check=True, 
capture_output=True, text=True, timeout=300) # Added timeout + # Print stdout/stderr for debugging + if result.stdout: + print(f" {cmd_desc} stdout:\n{result.stdout}") + if result.stderr: + print(f" {cmd_desc} stderr:\n{result.stderr}") + + if os.path.exists(flamegraph_file) and os.path.getsize(flamegraph_file) > 0: + print(f"✓ Flamegraph created: {flamegraph_file}") + else: + print(f"Error: Flamegraph file not created or is empty after running command: {flamegraph_file}") + if result.returncode == 0: + print(" Command executed successfully, but the output file is missing or empty.") + + except subprocess.CalledProcessError as e: + print(f"Error: Failed to generate flamegraph using {cmd_desc}.") + print(f" Command: {' '.join(e.cmd)}") + print(f" Return code: {e.returncode}") + if e.stdout: + print(f" Stdout:\n{e.stdout}") + if e.stderr: + print(f" Stderr:\n{e.stderr}") + except subprocess.TimeoutExpired: + print(f"Error: Flamegraph generation timed out after 300 seconds.") + print(f" Command: {' '.join(cmd_to_run)}") + except Exception as e: + print(f"Error: An unexpected error occurred during flamegraph generation: {e}") + traceback.print_exc() + PYTHON_MAJOR_VER = sys.version_info.major VMSTAT_PATH = shutil.which("vmstat") @@ -56,7 +115,8 @@ SLOW_SKIP_PCT = 10 # Disregard first N seconds of query tasks for computing avg QPS: -DISCARD_QPS_WARMUP_SEC = 5 +# Set to 0 for very fast benchmarks like wikimedium10k where total runtime < warmup +DISCARD_QPS_WARMUP_SEC = 0 # From the N times we run each task in a single JVM, how do we pick # the single QPS to represent those results: @@ -422,8 +482,9 @@ def collapseDups(hits): reOneGroup = re.compile("group=(.*?) totalHits=(.*?)(?: hits)? groupRelevance=(.*?)$", re.DOTALL) reHeap = re.compile("HEAP: ([0-9]+)$") reLatencyAndStartTime = re.compile(r"^([\d.]+) msec @ ([\d.]+) msec$") -reTasksWinddown = re.compile("^Start of tasks winddown: ([0-9.]+) msec$") -reAvgCPUCores = re.compile("^Average CPU cores used: (-?[0-9.]+)$") +# Allow for leading whitespace/newlines in these lines from Java's println +reTasksWinddown = re.compile(r"^\s*Start of tasks winddown: ([0-9.]+) msec$") +reAvgCPUCores = re.compile(r"^\s*Average CPU cores used: (-?[0-9.]+)$") def parse_times_line(task, line): @@ -435,8 +496,14 @@ def parse_times_line(task, line): def parseResults(resultsFiles): + if len(resultsFiles) == 0: + raise RuntimeError("No result files found") + taskIters = [] heaps = [] + tasksWindownMSs = [] + avgCPUCoresList = [] + for resultsFile in resultsFiles: tasks = [] @@ -469,6 +536,7 @@ def parseResults(resultsFiles): if line.startswith(b"HEAP: "): m = reHeap.match(decode(line)) heaps.append(int(m.group(1))) + continue if line.startswith(b"TASK: cat="): task = SearchTask() @@ -668,10 +736,10 @@ def parseResults(resultsFiles): if tasksWindownMS == -1: raise RuntimeError(f'did not find "Start of tasks winddown: " line in results file {resultsFile}') + tasksWindownMSs.append(tasksWindownMS) + avgCPUCoresList.append(avgCPUCores) - # TODO: why are we returning tasksWindownMS (which is per-result-file) here when - # we were given multiple results files? 
- return taskIters, heaps, tasksWindownMS, avgCPUCores
+ return taskIters, heaps, tasksWindownMSs, avgCPUCoresList


# Collect task latencies segregated by categories across all the runs of the task
@@ -892,6 +960,93 @@ def run(cmd, logFile=None, indent=" ", vmstatLogFile=None, topLogFile=None):

reCoreJar = re.compile("lucene-core-[0-9]+\\.[0-9]+\\.[0-9]+(?:-SNAPSHOT)?\\.jar")


+def add_async_profiler_args(command_list, jfr_output_path, base_log_path):
    """Appends async-profiler agent arguments to the command list."""

    if not sys.platform.startswith("linux") and not sys.platform.startswith("darwin"):
        print("WARNING: async-profiler currently supported only on Linux/macOS – skipping.")
        return

    def _merge_user_options(user_opts_raw: str,
                            fixed_map: dict[str, str],
                            reserved_flags: set[str]) -> str:
        """
        • `user_opts_raw` – raw string from ASYNC_PROFILER_OPTIONS ("foo=1,bar,baz=2")
        • `fixed_map` – key/value pairs that must never be overridden
        • `reserved_flags` – flag-only options (no =) that must never be overridden
        Returns a comma-joined string of SAFE user options, or "".
        """
        if not user_opts_raw:
            return ""

        safe_items = []
        seen_keys = set()

        for token in (t.strip() for t in user_opts_raw.split(',')):
            if not token:
                continue
            if '=' in token:
                key, value = token.split('=', 1)
                key = key.strip()
                if key in fixed_map:
                    print(f"WARNING: ignoring async-profiler option '{token}' – "
                          f"conflicts with reserved setting.")
                    continue
                if key in seen_keys:
                    continue  # ignore duplicate user keys
                seen_keys.add(key)
                safe_items.append(f"{key}={value.strip()}")
            else:
                flag = token
                if flag in reserved_flags:
                    print(f"WARNING: ignoring async-profiler flag '{flag}' – "
                          f"conflicts with reserved setting.")
                    continue
                if flag in seen_keys:
                    continue
                seen_keys.add(flag)
                safe_items.append(flag)

        return ",".join(safe_items)

    lib_name = {
        "linux": "libasyncProfiler.so",
        "darwin": "libasyncProfiler.dylib"
    }["darwin" if sys.platform.startswith("darwin") else "linux"]

    lib_path = os.path.join(constants.ASYNC_PROFILER_HOME, "lib", lib_name)

    if not os.path.exists(lib_path):
        print(f"WARNING: {lib_name} not found at {lib_path}; skipping async-profiler.")
        return

    profiler_log = f"{base_log_path}.profiler.log"

    # Reserved key/value settings; user options may never override these keys.
    fixed_kv = {
        "delay": f"{constants.ASYNC_PROFILER_DELAY_SEC}s",
        "event": "cpu",
        "alloc": "512k",
        "jfr": "true",
        "file": jfr_output_path,
        "log": profiler_log,
    }

    # 'start' is always added explicitly below, so never accept it from the user.
    reserved_flags = {"start"}

    user_part = _merge_user_options(constants.ASYNC_PROFILER_OPTIONS,
                                    fixed_kv,
                                    reserved_flags)

    all_opts = ",".join(
        [f"{k}={v}" for k, v in fixed_kv.items()]
        + ([user_part] if user_part else [])
    )

    command_list.append(f"-agentpath:{lib_path}=start,{all_opts}")
    print(f"  async-profiler log: {profiler_log}")

class RunAlgs:
    def __init__(self, javaCommand, verifyScores, verifyCounts):
        self.logCounter = 0
@@ -952,12 +1107,16 @@ def makeIndex(self, id, index, printCharts=False, profilerCount=30, profilerStac

        jfrOutput = f"{constants.LOGS_DIR}/bench-index-{id}-{index.getName()}.jfr"

-        # 77: always enable Java Flight Recorder profiling
-        w(
-            f"-XX:StartFlightRecording=dumponexit=true,maxsize=250M,settings={constants.BENCH_BASE_DIR}/src/python/profiling.jfc" + f",filename={jfrOutput}",
-            "-XX:+UnlockDiagnosticVMOptions",
-            "-XX:+DebugNonSafepoints",
-        )
+        fullLogFile = "%s/%s.%s.log" % (constants.LOGS_DIR, id, index.getName())
+
+        # Add profiling options
+        w("-XX:+UnlockDiagnosticVMOptions", 
"-XX:+DebugNonSafepoints") + + if constants.PROFILER_TYPE == 'ASYNC': + add_async_profiler_args(cmd, jfrOutput, fullLogFile) + else: + # Use JFR (default) + w(f"-XX:StartFlightRecording=dumponexit=true,maxsize=250M,settings={constants.BENCH_BASE_DIR}/src/python/profiling.jfc" + f",filename={jfrOutput}") w("perf.Indexer") w("-dirImpl", index.directory) @@ -1046,8 +1205,6 @@ def makeIndex(self, id, index, printCharts=False, profilerCount=30, profilerStac if index.quantizeKNNGraph: w("-quantizeKNNGraph") - fullLogFile = "%s/%s.%s.log" % (constants.LOGS_DIR, id, index.getName()) - print(" log %s" % fullLogFile) t0 = time.time() @@ -1078,6 +1235,8 @@ def makeIndex(self, id, index, printCharts=False, profilerCount=30, profilerStac shutil.rmtree(fullIndexPath) raise + generate_flamegraph(jfrOutput, constants.ASYNC_PROFILER_HOME) + profilerResults = profilerOutput(index.javaCommand, jfrOutput, checkoutToPath(index.checkout), profilerCount, profilerStackSize) return fullIndexPath, fullLogFile, profilerResults, jfrOutput @@ -1184,14 +1343,19 @@ def runSimpleSearchBench(self, iter, id, c, coldRun, seed, staticSeed, filter=No command += [PERF_EXE, "stat", "-dd"] command += c.javaCommand.split() - # 77: always enable Java Flight Recorder profiling - command += [ - f"-XX:StartFlightRecording=dumponexit=true,maxsize=250M,settings={constants.BENCH_BASE_DIR}/src/python/profiling.jfc" + f",filename={constants.LOGS_DIR}/bench-search-{id}-{c.name}-{iter}.jfr", - "-XX:+UnlockDiagnosticVMOptions", - "-XX:+DebugNonSafepoints", - # uncomment the line below to enable remote debugging - # '-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=localhost:7891' - ] + jfrOutput = f"{constants.LOGS_DIR}/bench-search-{id}-{c.name}-{iter}.jfr" + + command += ["-XX:+UnlockDiagnosticVMOptions", "-XX:+DebugNonSafepoints"] + + # Add profiling options conditionally + if constants.PROFILER_TYPE == 'ASYNC': + add_async_profiler_args(command, jfrOutput, logFile) + else: + # Use JFR (default) + command.append(f"-XX:StartFlightRecording=dumponexit=true,maxsize=250M,settings={constants.BENCH_BASE_DIR}/src/python/profiling.jfc,filename={jfrOutput}") + + # uncomment the line below to enable remote debugging + # command.append('-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=localhost:7891') w = lambda *xs: [command.append(str(x)) for x in xs] w("-classpath", cp) @@ -1291,13 +1455,20 @@ def runSimpleSearchBench(self, iter, id, c, coldRun, seed, staticSeed, filter=No print(line.rstrip()) raise RuntimeError("SearchPerfTest failed; see log %s.stdout" % logFile) + generate_flamegraph(jfrOutput, constants.ASYNC_PROFILER_HOME) + # run(command, logFile + '.stdout', indent=' ') print(" %.1f s" % (time.time() - t0)) # nocommit don't wastefully load/process here too!! 
- raw_results, heap_base, tasks_winddown_ms, avg_cpu_cores = parseResults([logFile]) + # parseResults returns: taskIters, heaps, durations, avgCPUCores + taskIters, heaps, tasks_winddown_ms_list, avg_cpu_cores_list = parseResults([logFile]) + # Since we only pass one log file, extract the first inner list + raw_results = taskIters[0] + # tasks_winddown_ms is also a list, take the first element + tasks_winddown_ms = tasks_winddown_ms_list[0] qpss = self.compute_qps(raw_results, tasks_winddown_ms) - print(" %.1f actual sustained QPS; %.1f CPU cores used" % (qpss[0], avg_cpu_cores)) + print(" %.1f actual sustained QPS; %.1f CPU cores used" % (qpss, avg_cpu_cores_list[0])) return logFile @@ -1336,57 +1507,77 @@ def computeTaskLatencies(self, inputList, catSet): return resultLatencyMetrics def compute_qps(self, raw_results, tasks_winddown_ms): - # one per JVM iteration - qpss = [] - for tasks in raw_results: - # make full copy -- don't mess up sort of incoming tasks - sorted_tasks = tasks[:] - sorted_tasks.sort(key=lambda x: x.startMsec) - # print(f'{len(sorted_tasks)} tasks:') - by_second = {} - count = 0 - # nocommit couldn't we dynamically infer warmup by looking for second-by-second QPS - keep_after_ms = sorted_tasks[0].startMsec + DISCARD_QPS_WARMUP_SEC * 1000 - for task in sorted_tasks: - # print(f' {task.startMsec} {task.msec}') - finish_time_ms = task.startMsec + task.msec - if finish_time_ms < keep_after_ms: - # print(f' task too early {finish_time_ms} vs {keep_after_ms}') - continue + # raw_results is the list of tasks from a single JVM iteration - if finish_time_ms > tasks_winddown_ms: - # print(f' task too late {finish_time_ms} vs {tasks_winddown_ms}') - continue + # make full copy -- don't mess up sort of incoming tasks + sorted_tasks = raw_results[:] # Use raw_results directly + sorted_tasks.sort(key=lambda x: x.startMsec) + # print(f'{len(sorted_tasks)} tasks:') + by_second = {} + count = 0 + # nocommit couldn't we dynamically infer warmup by looking for second-by-second QPS + if not sorted_tasks: # Handle empty results case + return 0.0 # Or perhaps raise an error or return None + + keep_after_ms = sorted_tasks[0].startMsec + DISCARD_QPS_WARMUP_SEC * 1000 + for task in sorted_tasks: + # print(f' {task.startMsec} {task.msec}') + finish_time_ms = task.startMsec + task.msec + if finish_time_ms < keep_after_ms: + # print(f' task too early {finish_time_ms} vs {keep_after_ms}') + continue - finish_time_sec = int(finish_time_ms / 1000) - by_second[finish_time_sec] = 1 + by_second.get(finish_time_sec, 0) - count += 1 + if finish_time_ms > tasks_winddown_ms: + # print(f' task too late {finish_time_ms} vs {tasks_winddown_ms}') + continue - qps = count / ((tasks_winddown_ms - keep_after_ms) / 1000.0) - qpss.append(qps) + finish_time_sec = int(finish_time_ms / 1000) + by_second[finish_time_sec] = 1 + by_second.get(finish_time_sec, 0) + count += 1 - if False: - # curious to see how QPS varies second by second...: - l = list(by_second.items()) - l.sort() - for sec, count in l: - print(f" {sec:3d}: qps={count}") + # Calculate duration for QPS calculation + duration_sec = (tasks_winddown_ms - keep_after_ms) / 1000.0 + if duration_sec <= 0: + # Avoid division by zero or negative duration + qps = 0.0 # Or handle as appropriate + else: + qps = count / duration_sec - return qpss + if False: + # curious to see how QPS varies second by second...: + l = list(by_second.items()) + l.sort() + for sec, count_per_sec in l: # Renamed count to count_per_sec for clarity + print(f" {sec:3d}: 
qps={count_per_sec}") + + return qps if False: def only_qps_report(self, baseLogFiles, cmpLogFiles): # nocommit must also validate tasks' results validity - baseRawResults, heapBase, baseTasksWindownMS, baseAvgCpuCores = parseResults(baseLogFiles) - cmpRawResults, heapCmp, cmpTasksWindownMS, cmpAvgCpuCores = parseResults(cmpLogFiles) + baseRawResults, heapBase, baseTasksWindownMSs, baseAvgCpuCores = parseResults(baseLogFiles) + cmpRawResults, heapCmp, cmpTasksWindownMSs, cmpAvgCpuCores = parseResults(cmpLogFiles) + + # Check if the number of results matches the number of durations + if len(baseRawResults) != len(baseTasksWindownMSs): + raise ValueError(f"Mismatch between number of base results ({len(baseRawResults)}) and durations ({len(baseTasksWindownMSs)})") + if len(cmpRawResults) != len(cmpTasksWindownMSs): + raise ValueError(f"Mismatch between number of comparison results ({len(cmpRawResults)}) and durations ({len(cmpTasksWindownMSs)})") + + base_qpss = [] + for tasks, duration_ms in zip(baseRawResults, baseTasksWindownMSs): + qps = self.compute_qps(tasks, duration_ms) + base_qpss.append(qps) - base_qpss = self.compute_qps(baseRawResults, baseTasksWindownMS) - cmp_qpss = self.compute_qps(cmpRawResults, cmpTasksWindownMS) + cmp_qpss = [] + for tasks, duration_ms in zip(cmpRawResults, cmpTasksWindownMSs): + qps = self.compute_qps(tasks, duration_ms) + cmp_qpss.append(qps) def simpleReport(self, baseLogFiles, cmpLogFiles, jira=False, html=False, baseDesc="Standard", cmpDesc=None, writer=sys.stdout.write): - baseRawResults, heapBase, ignore, baseAvgCpuCores = parseResults(baseLogFiles) - cmpRawResults, heapCmp, ignore, cmpAvgCpuCores = parseResults(cmpLogFiles) + baseRawResults, heapBase, ignore_durations, baseAvgCpuCores = parseResults(baseLogFiles) + cmpRawResults, heapCmp, ignore_durations, cmpAvgCpuCores = parseResults(cmpLogFiles) # make sure they got identical results cmpDiffs = compareHits(baseRawResults, cmpRawResults, self.verifyScores, self.verifyCounts) diff --git a/src/python/constants.py b/src/python/constants.py index 0d27d5e4..98897b44 100644 --- a/src/python/constants.py +++ b/src/python/constants.py @@ -142,6 +142,10 @@ SIMILARITY_DEFAULT = "BM25Similarity" MERGEPOLICY_DEFAULT = "LogDocMergePolicy" +# Profiler configuration +# Type of profiler to use: 'JFR' or 'ASYNC' +PROFILER_TYPE = "JFR" + TESTS_LINE_FILE = "/lucene/clean2.svn/lucene/test-framework/src/resources/org/apache/lucene/util/europarl.lines.txt" TESTS_LINE_FILE = "/lucenedata/from_hudson/hudson.enwiki.random.lines.txt" # TESTS_LINE_FILE = None diff --git a/src/python/initial_setup.py b/src/python/initial_setup.py index 08826836..4128b81a 100755 --- a/src/python/initial_setup.py +++ b/src/python/initial_setup.py @@ -39,12 +39,22 @@ Usage: python initial_setup.py [-download] Options: - -download downloads a 5GB linedoc file + -download downloads a 5GB linedoc file """ DEFAULT_LOCAL_CONST = """ BASE_DIR = '%(base_dir)s' BENCH_BASE_DIR = '%(base_dir)s/%(cwd)s' + +# Type of profiler to use: 'JFR' (default) or 'ASYNC' +PROFILER_TYPE = 'JFR' + +# Path to async-profiler installation directory (required if PROFILER_TYPE is 'ASYNC') +ASYNC_PROFILER_HOME = f"{BENCH_BASE_DIR}/async-profiler" + +# Additional arguments to pass to async-profiler (e.g., "interval=10ms,live") +ASYNC_PROFILER_OPTIONS = 'cstack=vm' + """ diff --git a/src/python/knnPerfTest.py b/src/python/knnPerfTest.py index 1e971aee..afd6faa6 100644 --- a/src/python/knnPerfTest.py +++ b/src/python/knnPerfTest.py @@ -8,11 +8,13 @@ # - report net concurrency 
utilized in the table import multiprocessing +import os import re import subprocess import sys import benchUtil +from benchUtil import add_async_profiler_args # Import the helper function import constants from common import getLuceneDirFromGradleProperties @@ -130,6 +132,8 @@ def run_knn_benchmark(checkout, values): # parentJoin_meta_file = f"{constants.BASE_DIR}/data/{'cohere-wikipedia'}-metadata.csv" jfr_output = f"{constants.LOGS_DIR}/knn-perf-test.jfr" + # Define a base log path for the profiler log, derived from jfr_output + profiler_base_log_path = f"{constants.LOGS_DIR}/knn-perf-test" cp = benchUtil.classPathToString(benchUtil.getClassPath(checkout) + (f"{constants.BENCH_BASE_DIR}/build",)) cmd = constants.JAVA_EXE.split(" ") + [ @@ -144,7 +148,11 @@ def run_knn_benchmark(checkout, values): ] if DO_PROFILING: - cmd += [f"-XX:StartFlightRecording=dumponexit=true,maxsize=250M,settings={constants.BENCH_BASE_DIR}/src/python/profiling.jfc" + f",filename={jfr_output}"] + if constants.PROFILER_TYPE == 'ASYNC': + add_async_profiler_args(cmd, jfr_output, profiler_base_log_path) + else: + # Use JFR (default) + cmd += [f"-XX:StartFlightRecording=dumponexit=true,maxsize=250M,settings={constants.BENCH_BASE_DIR}/src/python/profiling.jfc" + f",filename={jfr_output}"] cmd += ["knn.KnnGraphTester"]
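
To make the flow above concrete, here is a minimal, self-contained sketch of what the async-profiler path does end to end: build the `-agentpath:` option from the reserved settings plus any safe extras from `ASYNC_PROFILER_OPTIONS`, run the JVM, then convert the resulting `.jfr` into an HTML flame graph with `jfrconv`, as `generate_flamegraph` does. This is an illustration, not the patch's code; the `ASYNC_PROFILER_HOME` path, the `bench.jfr`/`bench.html` names, and the `MyBenchmark` main class are assumptions for the example.

```python
# Illustrative sketch only (not part of the patch): assemble the async-profiler
# agent option, profile a JVM run, and convert the recording to a flame graph.
import os
import subprocess

ASYNC_PROFILER_HOME = os.path.expanduser("~/util/async-profiler")  # assumed install dir
ASYNC_PROFILER_OPTIONS = "interval=10ms,threads"                   # optional extra agent args

# Keys the patch reserves; user options with these keys are dropped with a warning.
RESERVED_KEYS = {"event", "alloc", "delay", "file", "log", "jfr"}


def build_agent_arg(jfr_path: str, log_path: str) -> str:
    """Build the -agentpath option: fixed settings first, then safe user extras."""
    fixed = {
        "event": "cpu",
        "alloc": "512k",
        "jfr": "true",
        "file": jfr_path,
        "log": log_path,
    }
    extras = []
    for token in filter(None, (t.strip() for t in ASYNC_PROFILER_OPTIONS.split(","))):
        if token.split("=", 1)[0] in RESERVED_KEYS:
            print(f"ignoring reserved async-profiler option: {token}")
            continue
        extras.append(token)
    opts = ",".join([f"{k}={v}" for k, v in fixed.items()] + extras)
    # libasyncProfiler.dylib on macOS
    lib = os.path.join(ASYNC_PROFILER_HOME, "lib", "libasyncProfiler.so")
    return f"-agentpath:{lib}=start,{opts}"


def main() -> None:
    jfr = "bench.jfr"
    # MyBenchmark is a placeholder; the benchmark scripts pass their own main
    # classes (for example perf.Indexer or knn.KnnGraphTester) here.
    subprocess.run(["java", build_agent_arg(jfr, "bench.profiler.log"), "MyBenchmark"], check=True)

    # Same conversion step as generate_flamegraph(): jfrconv <in.jfr> <out.html>
    jfrconv = os.path.join(ASYNC_PROFILER_HOME, "bin", "jfrconv")
    subprocess.run([jfrconv, jfr, "bench.html"], check=True)


if __name__ == "__main__":
    main()
```

With `PROFILER_TYPE = 'ASYNC'` set in `localconstants.py`, the benchmark scripts perform these same steps automatically and write both the `.jfr` recording and the `.html` flame graph into `logs/`.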