diff --git a/codeflash/optimization/function_optimizer.py b/codeflash/optimization/function_optimizer.py
index 9933751b7..0791b94aa 100644
--- a/codeflash/optimization/function_optimizer.py
+++ b/codeflash/optimization/function_optimizer.py
@@ -151,6 +151,9 @@ def __init__(
         self.valid_optimizations: list[BestOptimization] = (
             list()  # TODO: Figure out the dataclass type for this # noqa: C408
         )
+        self.executor = concurrent.futures.ThreadPoolExecutor(
+            max_workers=N_TESTS_TO_GENERATE + 2 if self.experiment_id is None else N_TESTS_TO_GENERATE + 3
+        )

     def can_be_optimized(self) -> Result[tuple[bool, CodeOptimizationContext, dict[Path, str]], str]:
         should_run_experiment = self.experiment_id is not None
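
Note: the worker budget added here appears to match the largest batch of tasks the class submits at once (the generated-test tasks plus the candidate-generation and concolic-test requests, plus one more in experiment mode), so nothing queues behind anything else. Below is a minimal, self-contained sketch of keeping one long-lived pool on the instance instead of a fresh `with ThreadPoolExecutor(...)` block per method; `Optimizer` and the assumed `N_TESTS_TO_GENERATE = 2` are illustrative only, not codeflash code.

```python
import concurrent.futures

N_TESTS_TO_GENERATE = 2  # assumed value, purely for the example


class Optimizer:
    def __init__(self, experiment_id=None):
        self.experiment_id = experiment_id
        # One pool for the whole optimization run, sized so the biggest
        # fan-out of submissions never waits for a free worker.
        self.executor = concurrent.futures.ThreadPoolExecutor(
            max_workers=N_TESTS_TO_GENERATE + 2 if self.experiment_id is None else N_TESTS_TO_GENERATE + 3
        )


opt = Optimizer()
futures = [opt.executor.submit(str.upper, s) for s in ("a", "b", "c", "d")]
print([f.result() for f in futures])  # ['A', 'B', 'C', 'D']
opt.executor.shutdown(wait=True)  # the pool now outlives any single call, so someone must close it
```

Because the pool is no longer scoped to a `with` block, its shutdown has to happen explicitly, which is what the optimizer.py change at the end of this patch adds.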
@@ -376,199 +379,192 @@ def determine_best_candidate(
         console.rule()
         candidates = deque(candidates)
         refinement_done = False
+        future_all_refinements: list[concurrent.futures.Future] = []
         # Start a new thread for AI service request, start loop in main thread
         # check if aiservice request is complete, when it is complete, append result to the candidates list
-        with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
-            ai_service_client = self.aiservice_client if exp_type == "EXP0" else self.local_aiservice_client
-            future_line_profile_results = executor.submit(
-                ai_service_client.optimize_python_code_line_profiler,
-                source_code=code_context.read_writable_code,
-                dependency_code=code_context.read_only_context_code,
-                trace_id=self.function_trace_id[:-4] + exp_type if self.experiment_id else self.function_trace_id,
-                line_profiler_results=original_code_baseline.line_profile_results["str_out"],
-                num_candidates=10,
-                experiment_metadata=ExperimentMetadata(
-                    id=self.experiment_id, group="control" if exp_type == "EXP0" else "experiment"
-                )
-                if self.experiment_id
-                else None,
-            )
-            try:
-                candidate_index = 0
-                original_len = len(candidates)
-                while candidates:
-                    candidate_index += 1
-                    line_profiler_done = (
-                        True if future_line_profile_results is None else future_line_profile_results.done()
-                    )
-                    if line_profiler_done and (future_line_profile_results is not None):
-                        line_profile_results = future_line_profile_results.result()
-                        candidates.extend(line_profile_results)
-                        original_len += len(line_profile_results)
-                        logger.info(
-                            f"Added results from line profiler to candidates, total candidates now: {original_len}"
-                        )
-                        future_line_profile_results = None
-                    candidate = candidates.popleft()
-                    get_run_tmp_file(Path(f"test_return_values_{candidate_index}.bin")).unlink(missing_ok=True)
-                    get_run_tmp_file(Path(f"test_return_values_{candidate_index}.sqlite")).unlink(missing_ok=True)
-                    logger.info(f"Optimization candidate {candidate_index}/{original_len}:")
-                    code_print(candidate.source_code)
-                    try:
-                        did_update = self.replace_function_and_helpers_with_optimized_code(
-                            code_context=code_context,
-                            optimized_code=candidate.source_code,
-                            original_helper_code=original_helper_code,
-                        )
-                        if not did_update:
-                            logger.warning(
-                                "No functions were replaced in the optimized code. Skipping optimization candidate."
-                            )
-                            console.rule()
-                            continue
-                    except (ValueError, SyntaxError, cst.ParserSyntaxError, AttributeError) as e:
-                        logger.error(e)
-                        self.write_code_and_helpers(
-                            self.function_to_optimize_source_code,
-                            original_helper_code,
-                            self.function_to_optimize.file_path,
-                        )
-                        continue
-
-                    run_results = self.run_optimized_candidate(
-                        optimization_candidate_index=candidate_index,
-                        baseline_results=original_code_baseline,
-                        original_helper_code=original_helper_code,
-                        file_path_to_helper_classes=file_path_to_helper_classes,
-                    )
-                    console.rule()
-
-                    if not is_successful(run_results):
-                        optimized_runtimes[candidate.optimization_id] = None
-                        is_correct[candidate.optimization_id] = False
-                        speedup_ratios[candidate.optimization_id] = None
-                    else:
-                        candidate_result: OptimizedCandidateResult = run_results.unwrap()
-                        best_test_runtime = candidate_result.best_test_runtime
-                        optimized_runtimes[candidate.optimization_id] = best_test_runtime
-                        is_correct[candidate.optimization_id] = True
-                        perf_gain = performance_gain(
-                            original_runtime_ns=original_code_baseline.runtime, optimized_runtime_ns=best_test_runtime
-                        )
-                        speedup_ratios[candidate.optimization_id] = perf_gain
-
-                        tree = Tree(f"Candidate #{candidate_index} - Runtime Information")
-                        benchmark_tree = None
-                        if speedup_critic(
-                            candidate_result, original_code_baseline.runtime, best_runtime_until_now=None
-                        ) and quantity_of_tests_critic(candidate_result):
-                            tree.add(
-                                "This candidate is faster than the original code. 🚀"
-                            )  # TODO: Change this description
-                            tree.add(f"Original summed runtime: {humanize_runtime(original_code_baseline.runtime)}")
-                            tree.add(
-                                f"Best summed runtime: {humanize_runtime(candidate_result.best_test_runtime)} "
-                                f"(measured over {candidate_result.max_loop_count} "
-                                f"loop{'s' if candidate_result.max_loop_count > 1 else ''})"
-                            )
-                            tree.add(f"Speedup percentage: {perf_gain * 100:.1f}%")
-                            tree.add(f"Speedup ratio: {perf_gain + 1:.3f}X")
-                            line_profile_test_results = self.line_profiler_step(
-                                code_context=code_context,
-                                original_helper_code=original_helper_code,
-                                candidate_index=candidate_index,
-                            )
-                            optimized_line_profiler_results[candidate.optimization_id] = line_profile_test_results[
-                                "str_out"
-                            ]
-                            replay_perf_gain = {}
-                            if self.args.benchmark:
-                                test_results_by_benchmark = (
-                                    candidate_result.benchmarking_test_results.group_by_benchmarks(
-                                        self.total_benchmark_timings.keys(), self.replay_tests_dir, self.project_root
-                                    )
-                                )
-                                if len(test_results_by_benchmark) > 0:
-                                    benchmark_tree = Tree("Speedup percentage on benchmarks:")
-                                for benchmark_key, candidate_test_results in test_results_by_benchmark.items():
-                                    original_code_replay_runtime = (
-                                        original_code_baseline.replay_benchmarking_test_results[
-                                            benchmark_key
-                                        ].total_passed_runtime()
-                                    )
-                                    candidate_replay_runtime = candidate_test_results.total_passed_runtime()
-                                    replay_perf_gain[benchmark_key] = performance_gain(
-                                        original_runtime_ns=original_code_replay_runtime,
-                                        optimized_runtime_ns=candidate_replay_runtime,
-                                    )
-                                    benchmark_tree.add(f"{benchmark_key}: {replay_perf_gain[benchmark_key] * 100:.1f}%")
-
-                            best_optimization = BestOptimization(
-                                candidate=candidate,
-                                helper_functions=code_context.helper_functions,
-                                code_context=code_context,
-                                runtime=best_test_runtime,
-                                line_profiler_test_results=line_profile_test_results,
-                                winning_behavior_test_results=candidate_result.behavior_test_results,
-                                replay_performance_gain=replay_perf_gain if self.args.benchmark else None,
-                                winning_benchmarking_test_results=candidate_result.benchmarking_test_results,
-                                winning_replay_benchmarking_test_results=candidate_result.benchmarking_test_results,
-                            )
-                            self.valid_optimizations.append(best_optimization)
-                        else:
-                            tree.add(
-                                f"Summed runtime: {humanize_runtime(best_test_runtime)} "
-                                f"(measured over {candidate_result.max_loop_count} "
-                                f"loop{'s' if candidate_result.max_loop_count > 1 else ''})"
-                            )
-                            tree.add(f"Speedup percentage: {perf_gain * 100:.1f}%")
-                            tree.add(f"Speedup ratio: {perf_gain + 1:.3f}X")
-                        console.print(tree)
-                        if self.args.benchmark and benchmark_tree:
-                            console.print(benchmark_tree)
-                        console.rule()
-
-                    self.write_code_and_helpers(
-                        self.function_to_optimize_source_code, original_helper_code, self.function_to_optimize.file_path
-                    )
-
-                    if (not len(candidates)) and (
-                        not line_profiler_done
-                    ):  # all original candidates processed but lp results haven't been processed
-                        concurrent.futures.wait([future_line_profile_results])
-                        line_profile_results = future_line_profile_results.result()
-                        candidates.extend(line_profile_results)
-                        original_len += len(line_profile_results)
-                        logger.info(
-                            f"Added results from line profiler to candidates, total candidates now: {original_len}"
-                        )
-                        future_line_profile_results = None
-
-                    if len(candidates) == 0 and len(self.valid_optimizations) > 0 and not refinement_done:
-                        # TODO: Instead of doing it all at once at the end, do it one by one as the optimizations
-                        # are found. This way we can hide the time waiting for the LLM results.
-                        trace_id = self.function_trace_id
-                        if trace_id.endswith(("EXP0", "EXP1")):
-                            trace_id = trace_id[:-4] + exp_type
-                        # refinement_response is a dataclass with optimization_id, code and explanation
-                        refinement_response = self.refine_optimizations(
-                            valid_optimizations=self.valid_optimizations,
-                            original_code_baseline=original_code_baseline,
-                            code_context=code_context,
-                            trace_id=trace_id,
-                            ai_service_client=ai_service_client,
-                            executor=executor,
-                        )
-                        candidates.extend(refinement_response)
-                        print("Added candidates from refinement")
-                        original_len += len(refinement_response)
-                        refinement_done = True
-            except KeyboardInterrupt as e:
-                self.write_code_and_helpers(
-                    self.function_to_optimize_source_code, original_helper_code, self.function_to_optimize.file_path
-                )
-                logger.exception(f"Optimization interrupted: {e}")
-                raise
+        ai_service_client = self.aiservice_client if exp_type == "EXP0" else self.local_aiservice_client
+        future_line_profile_results = self.executor.submit(
+            ai_service_client.optimize_python_code_line_profiler,
+            source_code=code_context.read_writable_code,
+            dependency_code=code_context.read_only_context_code,
+            trace_id=self.function_trace_id[:-4] + exp_type if self.experiment_id else self.function_trace_id,
+            line_profiler_results=original_code_baseline.line_profile_results["str_out"],
+            num_candidates=10,
+            experiment_metadata=ExperimentMetadata(
+                id=self.experiment_id, group="control" if exp_type == "EXP0" else "experiment"
+            )
+            if self.experiment_id
+            else None,
+        )
+        try:
+            candidate_index = 0
+            original_len = len(candidates)
+            while candidates:
+                candidate_index += 1
+                line_profiler_done = True if future_line_profile_results is None else future_line_profile_results.done()
+                if line_profiler_done and (future_line_profile_results is not None):
+                    line_profile_results = future_line_profile_results.result()
+                    candidates.extend(line_profile_results)
+                    original_len += len(line_profile_results)
+                    logger.info(f"Added results from line profiler to candidates, total candidates now: {original_len}")
+                    future_line_profile_results = None
+                candidate = candidates.popleft()
+                get_run_tmp_file(Path(f"test_return_values_{candidate_index}.bin")).unlink(missing_ok=True)
+                get_run_tmp_file(Path(f"test_return_values_{candidate_index}.sqlite")).unlink(missing_ok=True)
+                logger.info(f"Optimization candidate {candidate_index}/{original_len}:")
+                code_print(candidate.source_code)
+                try:
+                    did_update = self.replace_function_and_helpers_with_optimized_code(
+                        code_context=code_context,
+                        optimized_code=candidate.source_code,
+                        original_helper_code=original_helper_code,
+                    )
+                    if not did_update:
+                        logger.warning(
+                            "No functions were replaced in the optimized code. Skipping optimization candidate."
+                        )
+                        console.rule()
+                        continue
+                except (ValueError, SyntaxError, cst.ParserSyntaxError, AttributeError) as e:
+                    logger.error(e)
+                    self.write_code_and_helpers(
+                        self.function_to_optimize_source_code, original_helper_code, self.function_to_optimize.file_path
+                    )
+                    continue
+
+                run_results = self.run_optimized_candidate(
+                    optimization_candidate_index=candidate_index,
+                    baseline_results=original_code_baseline,
+                    original_helper_code=original_helper_code,
+                    file_path_to_helper_classes=file_path_to_helper_classes,
+                )
+                console.rule()
+
+                if not is_successful(run_results):
+                    optimized_runtimes[candidate.optimization_id] = None
+                    is_correct[candidate.optimization_id] = False
+                    speedup_ratios[candidate.optimization_id] = None
+                else:
+                    candidate_result: OptimizedCandidateResult = run_results.unwrap()
+                    best_test_runtime = candidate_result.best_test_runtime
+                    optimized_runtimes[candidate.optimization_id] = best_test_runtime
+                    is_correct[candidate.optimization_id] = True
+                    perf_gain = performance_gain(
+                        original_runtime_ns=original_code_baseline.runtime, optimized_runtime_ns=best_test_runtime
+                    )
+                    speedup_ratios[candidate.optimization_id] = perf_gain
+
+                    tree = Tree(f"Candidate #{candidate_index} - Runtime Information")
+                    benchmark_tree = None
+                    if speedup_critic(
+                        candidate_result, original_code_baseline.runtime, best_runtime_until_now=None
+                    ) and quantity_of_tests_critic(candidate_result):
+                        tree.add("This candidate is faster than the original code. 🚀")  # TODO: Change this description
+                        tree.add(f"Original summed runtime: {humanize_runtime(original_code_baseline.runtime)}")
+                        tree.add(
+                            f"Best summed runtime: {humanize_runtime(candidate_result.best_test_runtime)} "
+                            f"(measured over {candidate_result.max_loop_count} "
+                            f"loop{'s' if candidate_result.max_loop_count > 1 else ''})"
+                        )
+                        tree.add(f"Speedup percentage: {perf_gain * 100:.1f}%")
+                        tree.add(f"Speedup ratio: {perf_gain + 1:.3f}X")
+                        line_profile_test_results = self.line_profiler_step(
+                            code_context=code_context,
+                            original_helper_code=original_helper_code,
+                            candidate_index=candidate_index,
+                        )
+                        optimized_line_profiler_results[candidate.optimization_id] = line_profile_test_results[
+                            "str_out"
+                        ]
+                        replay_perf_gain = {}
+                        if self.args.benchmark:
+                            test_results_by_benchmark = candidate_result.benchmarking_test_results.group_by_benchmarks(
+                                self.total_benchmark_timings.keys(), self.replay_tests_dir, self.project_root
+                            )
+                            if len(test_results_by_benchmark) > 0:
+                                benchmark_tree = Tree("Speedup percentage on benchmarks:")
+                            for benchmark_key, candidate_test_results in test_results_by_benchmark.items():
+                                original_code_replay_runtime = original_code_baseline.replay_benchmarking_test_results[
+                                    benchmark_key
+                                ].total_passed_runtime()
+                                candidate_replay_runtime = candidate_test_results.total_passed_runtime()
+                                replay_perf_gain[benchmark_key] = performance_gain(
+                                    original_runtime_ns=original_code_replay_runtime,
+                                    optimized_runtime_ns=candidate_replay_runtime,
+                                )
+                                benchmark_tree.add(f"{benchmark_key}: {replay_perf_gain[benchmark_key] * 100:.1f}%")
+
+                        best_optimization = BestOptimization(
+                            candidate=candidate,
+                            helper_functions=code_context.helper_functions,
+                            code_context=code_context,
+                            runtime=best_test_runtime,
+                            line_profiler_test_results=line_profile_test_results,
+                            winning_behavior_test_results=candidate_result.behavior_test_results,
+                            replay_performance_gain=replay_perf_gain if self.args.benchmark else None,
+                            winning_benchmarking_test_results=candidate_result.benchmarking_test_results,
+                            winning_replay_benchmarking_test_results=candidate_result.benchmarking_test_results,
+                        )
+                        self.valid_optimizations.append(best_optimization)
+                        # queue corresponding refined optimization for best optimization
+                        if not candidate.optimization_id.endswith("refi"):
+                            future_all_refinements.append(
+                                self.refine_optimizations(
+                                    valid_optimizations=[best_optimization],
+                                    original_code_baseline=original_code_baseline,
+                                    code_context=code_context,
+                                    trace_id=self.function_trace_id[:-4] + exp_type
+                                    if self.experiment_id
+                                    else self.function_trace_id,
+                                    ai_service_client=ai_service_client,
+                                    executor=self.executor,
+                                )
+                            )
+                    else:
+                        tree.add(
+                            f"Summed runtime: {humanize_runtime(best_test_runtime)} "
+                            f"(measured over {candidate_result.max_loop_count} "
+                            f"loop{'s' if candidate_result.max_loop_count > 1 else ''})"
+                        )
+                        tree.add(f"Speedup percentage: {perf_gain * 100:.1f}%")
+                        tree.add(f"Speedup ratio: {perf_gain + 1:.3f}X")
+                    console.print(tree)
+                    if self.args.benchmark and benchmark_tree:
+                        console.print(benchmark_tree)
+                    console.rule()
+
+                self.write_code_and_helpers(
+                    self.function_to_optimize_source_code, original_helper_code, self.function_to_optimize.file_path
+                )
+
+                if (
+                    (not len(candidates)) and (not line_profiler_done)
+                ):  # all original candidates processed but lp results haven't been processed, doesn't matter at the moment if we're done refining or not
+                    concurrent.futures.wait([future_line_profile_results])
+                    line_profile_results = future_line_profile_results.result()
+                    candidates.extend(line_profile_results)
+                    original_len += len(line_profile_results)
+                    logger.info(f"Added results from line profiler to candidates, total candidates now: {original_len}")
+                    future_line_profile_results = None
+                # all original candidates and lp candidates processed, collect refinement candidates and append to candidate list
+                if (not len(candidates)) and line_profiler_done and not refinement_done:
+                    # waiting just in case not all calls are finished, nothing else to do
+                    concurrent.futures.wait(future_all_refinements)
+                    refinement_response = []
+                    for future_refinement in future_all_refinements:
+                        possible_refinement = future_refinement.result()
+                        if len(possible_refinement) > 0:  # if the api returns a valid response
+                            refinement_response.append(possible_refinement[0])
+                    candidates.extend(refinement_response)
+                    logger.info(f"Added {len(refinement_response)} candidates from refinement")
+                    original_len += len(refinement_response)
+                    refinement_done = True
+        except KeyboardInterrupt as e:
+            self.write_code_and_helpers(
+                self.function_to_optimize_source_code, original_helper_code, self.function_to_optimize.file_path
+            )
+            logger.exception(f"Optimization interrupted: {e}")
+            raise

         if not len(self.valid_optimizations):
             return None
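
The hunk above changes when refinement work is requested: instead of one batched call after every candidate has been evaluated, a refinement Future is submitted the moment a candidate passes the critics, and those Futures are only waited on once the candidate deque is empty, after which the refined candidates flow through the same evaluation path. A minimal sketch of that shape, with a toy `refine` stand-in for the AI call (none of these names are codeflash APIs):

```python
import concurrent.futures
from collections import deque


def refine(value: int) -> list[int]:
    return [value * 10]  # stand-in for the AI refinement call


executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)
candidates = deque([3, 7, 2])
pending_refinements: list[concurrent.futures.Future] = []
refinement_done = False
accepted: list[int] = []

while candidates:
    value = candidates.popleft()
    if 2 < value < 10:  # stand-in for the speedup/test critics
        accepted.append(value)
        # queue the follow-up immediately instead of batching it at the end
        pending_refinements.append(executor.submit(refine, value))
    if not candidates and not refinement_done:
        # deque drained: gather every refinement and feed it back into the loop
        concurrent.futures.wait(pending_refinements)
        for fut in pending_refinements:
            candidates.extend(fut.result())
        refinement_done = True  # refined candidates are not refined again

executor.shutdown(wait=True)
print(accepted)  # [3, 7]
```

Because refinements start while later candidates are still being benchmarked, the time spent waiting on the refinement service is largely hidden behind work the loop would be doing anyway.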
@@ -605,7 +601,7 @@ def refine_optimizations(
         trace_id: str,
         ai_service_client: AiServiceClient,
         executor: concurrent.futures.ThreadPoolExecutor,
-    ) -> list[OptimizedCandidate]:
+    ) -> concurrent.futures.Future:
         request = [
             AIServiceRefinerRequest(
                 optimization_id=opt.candidate.optimization_id,
@@ -621,10 +617,8 @@ def refine_optimizations(
                 optimized_line_profiler_results=opt.line_profiler_test_results["str_out"],
             )
             for opt in valid_optimizations
-        ]  # TODO: multiple workers for this?
-        future_refinement_results = executor.submit(ai_service_client.optimize_python_code_refinement, request=request)
-        concurrent.futures.wait([future_refinement_results])
-        return future_refinement_results.result()
+        ]
+        return executor.submit(ai_service_client.optimize_python_code_refinement, request=request)

     def log_successful_optimization(
         self, explanation: Explanation, generated_tests: GeneratedTestsList, exp_type: str
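
With the return type changed to a `Future`, `refine_optimizations` no longer blocks on the AI service itself; the caller decides when, or whether, to wait. A short sketch of that caller-side contract under assumed stand-in names (the client call here is hypothetical, not the real codeflash endpoint):

```python
import concurrent.futures


def call_refinement_service(request: str) -> str:
    return f"refined:{request}"  # stand-in for the refinement endpoint


def submit_refinement(executor: concurrent.futures.ThreadPoolExecutor, request: str) -> concurrent.futures.Future:
    # Returns immediately; the caller collects the result later with .result()
    # or concurrent.futures.wait(), so several refinements can be in flight at once.
    return executor.submit(call_refinement_service, request)


with concurrent.futures.ThreadPoolExecutor(max_workers=2) as pool:
    futures = [submit_refinement(pool, f"candidate-{i}") for i in range(3)]
    concurrent.futures.wait(futures)
    print([f.result() for f in futures])
```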
@@ -854,85 +848,79 @@ def generate_tests_and_optimizations(
         run_experiment: bool = False,  # noqa: FBT001, FBT002
     ) -> Result[tuple[GeneratedTestsList, dict[str, set[FunctionCalledInTest]], OptimizationSet], str]:
         assert len(generated_test_paths) == N_TESTS_TO_GENERATE
-        max_workers = N_TESTS_TO_GENERATE + 2 if not run_experiment else N_TESTS_TO_GENERATE + 3
         console.rule()
-        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
-            # Submit the test generation task as future
-            future_tests = self.submit_test_generation_tasks(
-                executor,
-                testgen_context_code,
-                [definition.fully_qualified_name for definition in helper_functions],
-                generated_test_paths,
-                generated_perf_test_paths,
-            )
-            future_optimization_candidates = executor.submit(
-                self.aiservice_client.optimize_python_code,
-                read_writable_code,
-                read_only_context_code,
-                self.function_trace_id[:-4] + "EXP0" if run_experiment else self.function_trace_id,
-                N_CANDIDATES,
-                ExperimentMetadata(id=self.experiment_id, group="control") if run_experiment else None,
-            )
-            future_candidates_exp = None
-
-            future_concolic_tests = executor.submit(
-                generate_concolic_tests,
-                self.test_cfg,
-                self.args,
-                self.function_to_optimize,
-                self.function_to_optimize_ast,
-            )
-            futures = [*future_tests, future_optimization_candidates, future_concolic_tests]
-            if run_experiment:
-                future_candidates_exp = executor.submit(
-                    self.local_aiservice_client.optimize_python_code,
-                    read_writable_code,
-                    read_only_context_code,
-                    self.function_trace_id[:-4] + "EXP1",
-                    N_CANDIDATES,
-                    ExperimentMetadata(id=self.experiment_id, group="experiment"),
-                )
-                futures.append(future_candidates_exp)
-
-            # Wait for all futures to complete
-            concurrent.futures.wait(futures)
-
-            # Retrieve results
-            candidates: list[OptimizedCandidate] = future_optimization_candidates.result()
-            if not candidates:
-                return Failure(f"/!\\ NO OPTIMIZATIONS GENERATED for {self.function_to_optimize.function_name}")
-
-            candidates_experiment = future_candidates_exp.result() if future_candidates_exp else None
-
-            # Process test generation results
-
-            tests: list[GeneratedTests] = []
-            for future in future_tests:
-                res = future.result()
-                if res:
-                    (
-                        generated_test_source,
-                        instrumented_behavior_test_source,
-                        instrumented_perf_test_source,
-                        test_behavior_path,
-                        test_perf_path,
-                    ) = res
-                    tests.append(
-                        GeneratedTests(
-                            generated_original_test_source=generated_test_source,
-                            instrumented_behavior_test_source=instrumented_behavior_test_source,
-                            instrumented_perf_test_source=instrumented_perf_test_source,
-                            behavior_file_path=test_behavior_path,
-                            perf_file_path=test_perf_path,
-                        )
-                    )
-            if not tests:
-                logger.warning(f"Failed to generate and instrument tests for {self.function_to_optimize.function_name}")
-                return Failure(f"/!\\ NO TESTS GENERATED for {self.function_to_optimize.function_name}")
-            function_to_concolic_tests, concolic_test_str = future_concolic_tests.result()
-            logger.info(f"Generated {len(tests)} tests for {self.function_to_optimize.function_name}")
-            console.rule()
-            generated_tests = GeneratedTestsList(generated_tests=tests)
+        # Submit the test generation task as future
+        future_tests = self.submit_test_generation_tasks(
+            self.executor,
+            testgen_context_code,
+            [definition.fully_qualified_name for definition in helper_functions],
+            generated_test_paths,
+            generated_perf_test_paths,
+        )
+        future_optimization_candidates = self.executor.submit(
+            self.aiservice_client.optimize_python_code,
+            read_writable_code,
+            read_only_context_code,
+            self.function_trace_id[:-4] + "EXP0" if run_experiment else self.function_trace_id,
+            N_CANDIDATES,
+            ExperimentMetadata(id=self.experiment_id, group="control") if run_experiment else None,
+        )
+        future_candidates_exp = None
+
+        future_concolic_tests = self.executor.submit(
+            generate_concolic_tests, self.test_cfg, self.args, self.function_to_optimize, self.function_to_optimize_ast
+        )
+        futures = [*future_tests, future_optimization_candidates, future_concolic_tests]
+        if run_experiment:
+            future_candidates_exp = self.executor.submit(
+                self.local_aiservice_client.optimize_python_code,
+                read_writable_code,
+                read_only_context_code,
+                self.function_trace_id[:-4] + "EXP1",
+                N_CANDIDATES,
+                ExperimentMetadata(id=self.experiment_id, group="experiment"),
+            )
+            futures.append(future_candidates_exp)
+
+        # Wait for all futures to complete
+        concurrent.futures.wait(futures)
+
+        # Retrieve results
+        candidates: list[OptimizedCandidate] = future_optimization_candidates.result()
+        if not candidates:
+            return Failure(f"/!\\ NO OPTIMIZATIONS GENERATED for {self.function_to_optimize.function_name}")
+
+        candidates_experiment = future_candidates_exp.result() if future_candidates_exp else None
+
+        # Process test generation results
+
+        tests: list[GeneratedTests] = []
+        for future in future_tests:
+            res = future.result()
+            if res:
+                (
+                    generated_test_source,
+                    instrumented_behavior_test_source,
+                    instrumented_perf_test_source,
+                    test_behavior_path,
+                    test_perf_path,
+                ) = res
+                tests.append(
+                    GeneratedTests(
+                        generated_original_test_source=generated_test_source,
+                        instrumented_behavior_test_source=instrumented_behavior_test_source,
+                        instrumented_perf_test_source=instrumented_perf_test_source,
+                        behavior_file_path=test_behavior_path,
+                        perf_file_path=test_perf_path,
+                    )
+                )
+        if not tests:
+            logger.warning(f"Failed to generate and instrument tests for {self.function_to_optimize.function_name}")
+            return Failure(f"/!\\ NO TESTS GENERATED for {self.function_to_optimize.function_name}")
+        function_to_concolic_tests, concolic_test_str = future_concolic_tests.result()
+        logger.info(f"Generated {len(tests)} tests for {self.function_to_optimize.function_name}")
+        console.rule()
+        generated_tests = GeneratedTestsList(generated_tests=tests)
         result = (
             generated_tests,
             function_to_concolic_tests,
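
The method now fans all of its work out on the shared pool and blocks exactly once before reading the results back future by future. A self-contained sketch of that submit-everything-then-`wait` shape, with stand-in tasks whose names are illustrative only:

```python
import concurrent.futures


def make_test(i: int) -> str:
    return f"test-{i}"  # stand-in for one generated test


def make_candidates() -> list[str]:
    return ["candidate-a", "candidate-b"]  # stand-in for the optimization request


def make_concolic() -> str:
    return "concolic-test"


with concurrent.futures.ThreadPoolExecutor(max_workers=4) as pool:
    future_tests = [pool.submit(make_test, i) for i in range(2)]
    future_candidates = pool.submit(make_candidates)
    future_concolic = pool.submit(make_concolic)

    # one wait() over the whole fan-out, then results are read per future
    futures = [*future_tests, future_candidates, future_concolic]
    concurrent.futures.wait(futures)

    print([f.result() for f in future_tests], future_candidates.result(), future_concolic.result())
```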
diff --git a/codeflash/optimization/optimizer.py b/codeflash/optimization/optimizer.py
index d66c3fcf0..83073d6d9 100644
--- a/codeflash/optimization/optimizer.py
+++ b/codeflash/optimization/optimizer.py
@@ -333,6 +333,7 @@ def run(self) -> None:
                     continue
                 finally:
                     if function_optimizer is not None:
+                        function_optimizer.executor.shutdown(wait=True)
                         function_optimizer.cleanup_generated_files()

         ph("cli-optimize-run-finished", {"optimizations_found": optimizations_found})
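
Since the pool is no longer closed by a `with` block inside the optimizer's methods, this `finally` clause has to close it. A short sketch of what `shutdown(wait=True)` guarantees:

```python
import concurrent.futures
import time

pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
slow = pool.submit(time.sleep, 0.2)

pool.shutdown(wait=True)  # returns only after `slow` has finished
print(slow.done())  # True

try:
    pool.submit(time.sleep, 0)  # new work is refused once the pool is shut down
except RuntimeError as exc:
    print(f"rejected: {exc}")
```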