diff --git a/codeflash/api/aiservice.py b/codeflash/api/aiservice.py index d921469d1..db78a64df 100644 --- a/codeflash/api/aiservice.py +++ b/codeflash/api/aiservice.py @@ -360,6 +360,7 @@ def log_results( # noqa: D417 is_correct: dict[str, bool] | None, optimized_line_profiler_results: dict[str, str] | None, metadata: dict[str, Any] | None, + optimizations_post: dict[str, str] | None = None, ) -> None: """Log features to the database. @@ -372,6 +373,7 @@ def log_results( # noqa: D417 - is_correct (Optional[Dict[str, bool]]): Whether the optimized code is correct. - optimized_line_profiler_results: line_profiler results for every candidate mapped to their optimization_id - metadata: contains the best optimization id + - optimizations_post: dict mapping each optimization id to its post-processed code string """ payload = { @@ -383,6 +385,7 @@ def log_results( # noqa: D417 "codeflash_version": codeflash_version, "optimized_line_profiler_results": optimized_line_profiler_results, "metadata": metadata, + "optimizations_post": optimizations_post, } try: self.make_ai_service_request("/log_features", payload=payload, timeout=5) diff --git a/codeflash/optimization/function_optimizer.py b/codeflash/optimization/function_optimizer.py index 6905bf47c..d009c3b9c 100644 --- a/codeflash/optimization/function_optimizer.py +++ b/codeflash/optimization/function_optimizer.py @@ -67,6 +67,7 @@ GeneratedTests, GeneratedTestsList, OptimizationSet, + OptimizedCandidate, OptimizedCandidateResult, OriginalCodeBaseline, TestFile, @@ -99,7 +100,6 @@ CoverageData, FunctionCalledInTest, FunctionSource, - OptimizedCandidate, ) from codeflash.verification.verification_utils import TestConfig @@ -149,8 +149,8 @@ def __init__( self.generate_and_instrument_tests_results: ( tuple[GeneratedTestsList, dict[str, set[FunctionCalledInTest]], OptimizationSet] | None ) = None - self.valid_optimizations: list[BestOptimization] = ( - list() # TODO: Figure out the dataclass type for this # noqa: C408 + self.executor = concurrent.futures.ThreadPoolExecutor( + max_workers=N_TESTS_TO_GENERATE + 2 if self.experiment_id is None else N_TESTS_TO_GENERATE + 3 ) def can_be_optimized(self) -> Result[tuple[bool, CodeOptimizationContext, dict[Path, str]], str]: @@ -380,216 +380,261 @@ def determine_best_candidate( console.rule() candidates = deque(candidates) refinement_done = False + future_all_refinements: list[concurrent.futures.Future] = [] + ast_code_to_id = {} + valid_optimizations = [] + optimizations_post = {} # some candidates' code strings are overwritten later: a candidate itself may not be evaluated when a shorter/longer equivalent version is evaluated instead # Start a new thread for AI service request, start loop in main thread # check if aiservice request is complete, when it is complete, append result to the candidates list - with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor: - ai_service_client = self.aiservice_client if exp_type == "EXP0" else self.local_aiservice_client - future_line_profile_results = executor.submit( - ai_service_client.optimize_python_code_line_profiler, - source_code=code_context.read_writable_code.markdown, - dependency_code=code_context.read_only_context_code, - trace_id=self.function_trace_id[:-4] + exp_type if self.experiment_id else self.function_trace_id, - line_profiler_results=original_code_baseline.line_profile_results["str_out"], - num_candidates=10, - experiment_metadata=ExperimentMetadata( - id=self.experiment_id, group="control" if exp_type == "EXP0" else "experiment" - ) - if self.experiment_id -
else None, + ai_service_client = self.aiservice_client if exp_type == "EXP0" else self.local_aiservice_client + future_line_profile_results = self.executor.submit( + ai_service_client.optimize_python_code_line_profiler, + source_code=code_context.read_writable_code.markdown, + dependency_code=code_context.read_only_context_code, + trace_id=self.function_trace_id[:-4] + exp_type if self.experiment_id else self.function_trace_id, + line_profiler_results=original_code_baseline.line_profile_results["str_out"], + num_candidates=10, + experiment_metadata=ExperimentMetadata( + id=self.experiment_id, group="control" if exp_type == "EXP0" else "experiment" ) - try: - candidate_index = 0 - original_len = len(candidates) - while candidates: - candidate_index += 1 - line_profiler_done = ( - True if future_line_profile_results is None else future_line_profile_results.done() - ) - if line_profiler_done and (future_line_profile_results is not None): - line_profile_results = future_line_profile_results.result() - candidates.extend(line_profile_results) - original_len += len(line_profile_results) - logger.info( - f"Added results from line profiler to candidates, total candidates now: {original_len}" - ) - future_line_profile_results = None - candidate = candidates.popleft() - get_run_tmp_file(Path(f"test_return_values_{candidate_index}.bin")).unlink(missing_ok=True) - get_run_tmp_file(Path(f"test_return_values_{candidate_index}.sqlite")).unlink(missing_ok=True) - logger.info(f"Optimization candidate {candidate_index}/{original_len}:") - code_print(candidate.source_code.flat) - try: - did_update = self.replace_function_and_helpers_with_optimized_code( - code_context=code_context, - optimized_code=candidate.source_code, - original_helper_code=original_helper_code, - ) - if not did_update: - logger.warning( - "No functions were replaced in the optimized code. Skipping optimization candidate." 
- ) - console.rule() - continue - except (ValueError, SyntaxError, cst.ParserSyntaxError, AttributeError) as e: - logger.error(e) - self.write_code_and_helpers( - self.function_to_optimize_source_code, - original_helper_code, - self.function_to_optimize.file_path, - ) - continue - - run_results = self.run_optimized_candidate( - optimization_candidate_index=candidate_index, - baseline_results=original_code_baseline, + if self.experiment_id + else None, + ) + try: + candidate_index = 0 + original_len = len(candidates) + while candidates: + candidate_index += 1 + line_profiler_done = True if future_line_profile_results is None else future_line_profile_results.done() + if line_profiler_done and (future_line_profile_results is not None): + line_profile_results = future_line_profile_results.result() + candidates.extend(line_profile_results) + original_len += len(line_profile_results) + logger.info(f"Added results from line profiler to candidates, total candidates now: {original_len}") + future_line_profile_results = None + candidate = candidates.popleft() + get_run_tmp_file(Path(f"test_return_values_{candidate_index}.bin")).unlink(missing_ok=True) + get_run_tmp_file(Path(f"test_return_values_{candidate_index}.sqlite")).unlink(missing_ok=True) + logger.info(f"Optimization candidate {candidate_index}/{original_len}:") + code_print(candidate.source_code.flat) + # map ast normalized code to diff len, unnormalized code + # map opt id to the shortest unnormalized code + try: + did_update = self.replace_function_and_helpers_with_optimized_code( + code_context=code_context, + optimized_code=candidate.source_code, original_helper_code=original_helper_code, - file_path_to_helper_classes=file_path_to_helper_classes, ) - console.rule() - - if not is_successful(run_results): - optimized_runtimes[candidate.optimization_id] = None - is_correct[candidate.optimization_id] = False - speedup_ratios[candidate.optimization_id] = None - else: - candidate_result: OptimizedCandidateResult = run_results.unwrap() - best_test_runtime = candidate_result.best_test_runtime - optimized_runtimes[candidate.optimization_id] = best_test_runtime - is_correct[candidate.optimization_id] = True - perf_gain = performance_gain( - original_runtime_ns=original_code_baseline.runtime, optimized_runtime_ns=best_test_runtime + if not did_update: + logger.warning( + "No functions were replaced in the optimized code. Skipping optimization candidate." ) - speedup_ratios[candidate.optimization_id] = perf_gain - - tree = Tree(f"Candidate #{candidate_index} - Runtime Information") - benchmark_tree = None - if speedup_critic( - candidate_result, original_code_baseline.runtime, best_runtime_until_now=None - ) and quantity_of_tests_critic(candidate_result): - tree.add( - "This candidate is faster than the original code. 
🚀" - ) # TODO: Change this description - tree.add(f"Original summed runtime: {humanize_runtime(original_code_baseline.runtime)}") - tree.add( - f"Best summed runtime: {humanize_runtime(candidate_result.best_test_runtime)} " - f"(measured over {candidate_result.max_loop_count} " - f"loop{'s' if candidate_result.max_loop_count > 1 else ''})" - ) - tree.add(f"Speedup percentage: {perf_gain * 100:.1f}%") - tree.add(f"Speedup ratio: {perf_gain + 1:.3f}X") - line_profile_test_results = self.line_profiler_step( - code_context=code_context, - original_helper_code=original_helper_code, - candidate_index=candidate_index, - ) - optimized_line_profiler_results[candidate.optimization_id] = line_profile_test_results[ - "str_out" - ] - replay_perf_gain = {} - if self.args.benchmark: - test_results_by_benchmark = ( - candidate_result.benchmarking_test_results.group_by_benchmarks( - self.total_benchmark_timings.keys(), self.replay_tests_dir, self.project_root - ) - ) - if len(test_results_by_benchmark) > 0: - benchmark_tree = Tree("Speedup percentage on benchmarks:") - for benchmark_key, candidate_test_results in test_results_by_benchmark.items(): - original_code_replay_runtime = ( - original_code_baseline.replay_benchmarking_test_results[ - benchmark_key - ].total_passed_runtime() - ) - candidate_replay_runtime = candidate_test_results.total_passed_runtime() - replay_perf_gain[benchmark_key] = performance_gain( - original_runtime_ns=original_code_replay_runtime, - optimized_runtime_ns=candidate_replay_runtime, - ) - benchmark_tree.add(f"{benchmark_key}: {replay_perf_gain[benchmark_key] * 100:.1f}%") - - best_optimization = BestOptimization( - candidate=candidate, - helper_functions=code_context.helper_functions, - code_context=code_context, - runtime=best_test_runtime, - line_profiler_test_results=line_profile_test_results, - winning_behavior_test_results=candidate_result.behavior_test_results, - replay_performance_gain=replay_perf_gain if self.args.benchmark else None, - winning_benchmarking_test_results=candidate_result.benchmarking_test_results, - winning_replay_benchmarking_test_results=candidate_result.benchmarking_test_results, - ) - self.valid_optimizations.append(best_optimization) - else: - tree.add( - f"Summed runtime: {humanize_runtime(best_test_runtime)} " - f"(measured over {candidate_result.max_loop_count} " - f"loop{'s' if candidate_result.max_loop_count > 1 else ''})" - ) - tree.add(f"Speedup percentage: {perf_gain * 100:.1f}%") - tree.add(f"Speedup ratio: {perf_gain + 1:.3f}X") - console.print(tree) - if self.args.benchmark and benchmark_tree: - console.print(benchmark_tree) console.rule() - + continue + except (ValueError, SyntaxError, cst.ParserSyntaxError, AttributeError) as e: + logger.error(e) self.write_code_and_helpers( self.function_to_optimize_source_code, original_helper_code, self.function_to_optimize.file_path ) + continue + # check if this code has been evaluated before by checking the ast normalized code string + normalized_code = ast.unparse(ast.parse(candidate.source_code.flat.strip())) + if normalized_code in ast_code_to_id: + past_opt_id = ast_code_to_id[normalized_code]["optimization_id"] + # update speedup ratio, is_correct, optimizations_post, optimized_line_profiler_results, optimized_runtimes + speedup_ratios[candidate.optimization_id] = speedup_ratios[past_opt_id] + is_correct[candidate.optimization_id] = is_correct[past_opt_id] + optimized_runtimes[candidate.optimization_id] = optimized_runtimes[past_opt_id] + # line profiler results only available for successful 
runs + if past_opt_id in optimized_line_profiler_results: + optimized_line_profiler_results[candidate.optimization_id] = optimized_line_profiler_results[ + past_opt_id + ] + optimizations_post[candidate.optimization_id] = ast_code_to_id[normalized_code][ + "shorter_source_code" + ].markdown + optimizations_post[past_opt_id] = ast_code_to_id[normalized_code]["shorter_source_code"].markdown + new_diff_len = diff_length(candidate.source_code.flat, code_context.read_writable_code.flat) + if new_diff_len < ast_code_to_id[normalized_code]["diff_len"]: + ast_code_to_id[normalized_code]["shorter_source_code"] = candidate.source_code + ast_code_to_id[normalized_code]["diff_len"] = new_diff_len + continue + ast_code_to_id[normalized_code] = { + "optimization_id": candidate.optimization_id, + "shorter_source_code": candidate.source_code, + "diff_len": diff_length(candidate.source_code.flat, code_context.read_writable_code.flat), + } + run_results = self.run_optimized_candidate( + optimization_candidate_index=candidate_index, + baseline_results=original_code_baseline, + original_helper_code=original_helper_code, + file_path_to_helper_classes=file_path_to_helper_classes, + ) + console.rule() - if (not len(candidates)) and ( - not line_profiler_done - ): # all original candidates processed but lp results haven't been processed - concurrent.futures.wait([future_line_profile_results]) - line_profile_results = future_line_profile_results.result() - candidates.extend(line_profile_results) - original_len += len(line_profile_results) - logger.info( - f"Added results from line profiler to candidates, total candidates now: {original_len}" + if not is_successful(run_results): + optimized_runtimes[candidate.optimization_id] = None + is_correct[candidate.optimization_id] = False + speedup_ratios[candidate.optimization_id] = None + else: + candidate_result: OptimizedCandidateResult = run_results.unwrap() + best_test_runtime = candidate_result.best_test_runtime + optimized_runtimes[candidate.optimization_id] = best_test_runtime + is_correct[candidate.optimization_id] = True + perf_gain = performance_gain( + original_runtime_ns=original_code_baseline.runtime, optimized_runtime_ns=best_test_runtime + ) + speedup_ratios[candidate.optimization_id] = perf_gain + + tree = Tree(f"Candidate #{candidate_index} - Runtime Information") + benchmark_tree = None + if speedup_critic( + candidate_result, original_code_baseline.runtime, best_runtime_until_now=None + ) and quantity_of_tests_critic(candidate_result): + tree.add("This candidate is faster than the original code. 🚀") # TODO: Change this description + tree.add(f"Original summed runtime: {humanize_runtime(original_code_baseline.runtime)}") + tree.add( + f"Best summed runtime: {humanize_runtime(candidate_result.best_test_runtime)} " + f"(measured over {candidate_result.max_loop_count} " + f"loop{'s' if candidate_result.max_loop_count > 1 else ''})" + ) + tree.add(f"Speedup percentage: {perf_gain * 100:.1f}%") + tree.add(f"Speedup ratio: {perf_gain + 1:.3f}X") + line_profile_test_results = self.line_profiler_step( + code_context=code_context, + original_helper_code=original_helper_code, + candidate_index=candidate_index, ) - future_line_profile_results = None - - if len(candidates) == 0 and len(self.valid_optimizations) > 0 and not refinement_done: - # TODO: Instead of doing it all at once at the end, do it one by one as the optimizations - # are found. This way we can hide the time waiting for the LLM results. 
- trace_id = self.function_trace_id - if trace_id.endswith(("EXP0", "EXP1")): - trace_id = trace_id[:-4] + exp_type - # refinement_response is a dataclass with optimization_id, code and explanation - refinement_response = self.refine_optimizations( - valid_optimizations=self.valid_optimizations, - original_code_baseline=original_code_baseline, + optimized_line_profiler_results[candidate.optimization_id] = line_profile_test_results[ + "str_out" + ] + replay_perf_gain = {} + if self.args.benchmark: + test_results_by_benchmark = candidate_result.benchmarking_test_results.group_by_benchmarks( + self.total_benchmark_timings.keys(), self.replay_tests_dir, self.project_root + ) + if len(test_results_by_benchmark) > 0: + benchmark_tree = Tree("Speedup percentage on benchmarks:") + for benchmark_key, candidate_test_results in test_results_by_benchmark.items(): + original_code_replay_runtime = original_code_baseline.replay_benchmarking_test_results[ + benchmark_key + ].total_passed_runtime() + candidate_replay_runtime = candidate_test_results.total_passed_runtime() + replay_perf_gain[benchmark_key] = performance_gain( + original_runtime_ns=original_code_replay_runtime, + optimized_runtime_ns=candidate_replay_runtime, + ) + benchmark_tree.add(f"{benchmark_key}: {replay_perf_gain[benchmark_key] * 100:.1f}%") + + best_optimization = BestOptimization( + candidate=candidate, + helper_functions=code_context.helper_functions, code_context=code_context, - trace_id=trace_id, - ai_service_client=ai_service_client, - executor=executor, + runtime=best_test_runtime, + line_profiler_test_results=line_profile_test_results, + winning_behavior_test_results=candidate_result.behavior_test_results, + replay_performance_gain=replay_perf_gain if self.args.benchmark else None, + winning_benchmarking_test_results=candidate_result.benchmarking_test_results, + winning_replay_benchmarking_test_results=candidate_result.benchmarking_test_results, + ) + valid_optimizations.append(best_optimization) + # queue corresponding refined optimization for best optimization + if not candidate.optimization_id.endswith("refi"): + future_all_refinements.append( + self.refine_optimizations( + valid_optimizations=[best_optimization], + original_code_baseline=original_code_baseline, + code_context=code_context, + trace_id=self.function_trace_id[:-4] + exp_type + if self.experiment_id + else self.function_trace_id, + ai_service_client=ai_service_client, + executor=self.executor, + ) + ) + else: + tree.add( + f"Summed runtime: {humanize_runtime(best_test_runtime)} " + f"(measured over {candidate_result.max_loop_count} " + f"loop{'s' if candidate_result.max_loop_count > 1 else ''})" ) - candidates.extend(refinement_response) - print("Added candidates from refinement") - original_len += len(refinement_response) - refinement_done = True - except KeyboardInterrupt as e: + tree.add(f"Speedup percentage: {perf_gain * 100:.1f}%") + tree.add(f"Speedup ratio: {perf_gain + 1:.3f}X") + console.print(tree) + if self.args.benchmark and benchmark_tree: + console.print(benchmark_tree) + console.rule() + self.write_code_and_helpers( self.function_to_optimize_source_code, original_helper_code, self.function_to_optimize.file_path ) - logger.exception(f"Optimization interrupted: {e}") - raise - if not len(self.valid_optimizations): + if ( + (not len(candidates)) and (not line_profiler_done) + ): # all original candidates processed but lp results haven't been processed, doesn't matter at the moment if we're done refining or not + 
concurrent.futures.wait([future_line_profile_results]) + line_profile_results = future_line_profile_results.result() + candidates.extend(line_profile_results) + original_len += len(line_profile_results) + logger.info(f"Added results from line profiler to candidates, total candidates now: {original_len}") + future_line_profile_results = None + # all original candidates and lp candidates processed, collect refinement candidates and append to candidate list + if (not len(candidates)) and line_profiler_done and not refinement_done: + # waiting just in case not all calls are finished, nothing else to do + concurrent.futures.wait(future_all_refinements) + refinement_response = [] + for future_refinement in future_all_refinements: + possible_refinement = future_refinement.result() + if len(possible_refinement) > 0: # if the api returns a valid response + refinement_response.append(possible_refinement[0]) + candidates.extend(refinement_response) + logger.info(f"Added {len(refinement_response)} candidates from refinement") + original_len += len(refinement_response) + refinement_done = True + except KeyboardInterrupt as e: + self.write_code_and_helpers( + self.function_to_optimize_source_code, original_helper_code, self.function_to_optimize.file_path + ) + logger.exception(f"Optimization interrupted: {e}") + raise + + if not valid_optimizations: return None # need to figure out the best candidate here before we return best_optimization + # reassign the shorter code here + valid_candidates_with_shorter_code = [] diff_lens_list = [] # character level diff runtimes_list = [] - for valid_opt in self.valid_optimizations: + for valid_opt in valid_optimizations: + valid_opt_normalized_code = ast.unparse(ast.parse(valid_opt.candidate.source_code.flat.strip())) + new_candidate_with_shorter_code = OptimizedCandidate( + source_code=ast_code_to_id[valid_opt_normalized_code]["shorter_source_code"], + optimization_id=valid_opt.candidate.optimization_id, + explanation=valid_opt.candidate.explanation, + ) + new_best_opt = BestOptimization( + candidate=new_candidate_with_shorter_code, + helper_functions=valid_opt.helper_functions, + code_context=valid_opt.code_context, + runtime=valid_opt.runtime, + line_profiler_test_results=valid_opt.line_profiler_test_results, + winning_behavior_test_results=valid_opt.winning_behavior_test_results, + replay_performance_gain=valid_opt.replay_performance_gain, + winning_benchmarking_test_results=valid_opt.winning_benchmarking_test_results, + winning_replay_benchmarking_test_results=valid_opt.winning_replay_benchmarking_test_results, + ) + valid_candidates_with_shorter_code.append(new_best_opt) diff_lens_list.append( - diff_length(valid_opt.candidate.source_code.flat, code_context.read_writable_code.flat) + diff_length(new_best_opt.candidate.source_code.flat, code_context.read_writable_code.flat) ) # char level diff - runtimes_list.append(valid_opt.runtime) + runtimes_list.append(new_best_opt.runtime) diff_lens_ranking = create_rank_dictionary_compact(diff_lens_list) runtimes_ranking = create_rank_dictionary_compact(runtimes_list) # TODO: better way to resolve conflicts with same min ranking overall_ranking = {key: diff_lens_ranking[key] + runtimes_ranking[key] for key in diff_lens_ranking.keys()} # noqa: SIM118 min_key = min(overall_ranking, key=overall_ranking.get) - best_optimization = self.valid_optimizations[min_key] + best_optimization = valid_candidates_with_shorter_code[min_key] + # reassign code string which is the shortest ai_service_client.log_results( 
function_trace_id=self.function_trace_id[:-4] + exp_type if self.experiment_id else self.function_trace_id, speedup_ratio=speedup_ratios, @@ -597,6 +642,7 @@ def determine_best_candidate( optimized_runtime=optimized_runtimes, is_correct=is_correct, optimized_line_profiler_results=optimized_line_profiler_results, + optimizations_post=optimizations_post, metadata={"best_optimization_id": best_optimization.candidate.optimization_id}, ) return best_optimization @@ -609,7 +655,7 @@ def refine_optimizations( trace_id: str, ai_service_client: AiServiceClient, executor: concurrent.futures.ThreadPoolExecutor, - ) -> list[OptimizedCandidate]: + ) -> concurrent.futures.Future: request = [ AIServiceRefinerRequest( optimization_id=opt.candidate.optimization_id, @@ -625,10 +671,8 @@ def refine_optimizations( optimized_line_profiler_results=opt.line_profiler_test_results["str_out"], ) for opt in valid_optimizations - ] # TODO: multiple workers for this? - future_refinement_results = executor.submit(ai_service_client.optimize_python_code_refinement, request=request) - concurrent.futures.wait([future_refinement_results]) - return future_refinement_results.result() + ] + return executor.submit(ai_service_client.optimize_python_code_refinement, request=request) def log_successful_optimization( self, explanation: Explanation, generated_tests: GeneratedTestsList, exp_type: str @@ -867,85 +911,79 @@ def generate_tests_and_optimizations( run_experiment: bool = False, # noqa: FBT001, FBT002 ) -> Result[tuple[GeneratedTestsList, dict[str, set[FunctionCalledInTest]], OptimizationSet], str]: assert len(generated_test_paths) == N_TESTS_TO_GENERATE - max_workers = N_TESTS_TO_GENERATE + 2 if not run_experiment else N_TESTS_TO_GENERATE + 3 console.rule() - with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: - # Submit the test generation task as future - future_tests = self.submit_test_generation_tasks( - executor, - testgen_context_code, - [definition.fully_qualified_name for definition in helper_functions], - generated_test_paths, - generated_perf_test_paths, - ) - future_optimization_candidates = executor.submit( - self.aiservice_client.optimize_python_code, + # Submit the test generation task as future + future_tests = self.submit_test_generation_tasks( + self.executor, + testgen_context_code, + [definition.fully_qualified_name for definition in helper_functions], + generated_test_paths, + generated_perf_test_paths, + ) + future_optimization_candidates = self.executor.submit( + self.aiservice_client.optimize_python_code, + read_writable_code.markdown, + read_only_context_code, + self.function_trace_id[:-4] + "EXP0" if run_experiment else self.function_trace_id, + N_CANDIDATES, + ExperimentMetadata(id=self.experiment_id, group="control") if run_experiment else None, + ) + future_candidates_exp = None + + future_concolic_tests = self.executor.submit( + generate_concolic_tests, self.test_cfg, self.args, self.function_to_optimize, self.function_to_optimize_ast + ) + futures = [*future_tests, future_optimization_candidates, future_concolic_tests] + if run_experiment: + future_candidates_exp = self.executor.submit( + self.local_aiservice_client.optimize_python_code, read_writable_code.markdown, read_only_context_code, - self.function_trace_id[:-4] + "EXP0" if run_experiment else self.function_trace_id, + self.function_trace_id[:-4] + "EXP1", N_CANDIDATES, - ExperimentMetadata(id=self.experiment_id, group="control") if run_experiment else None, + ExperimentMetadata(id=self.experiment_id, 
group="experiment"), ) - future_candidates_exp = None + futures.append(future_candidates_exp) - future_concolic_tests = executor.submit( - generate_concolic_tests, - self.test_cfg, - self.args, - self.function_to_optimize, - self.function_to_optimize_ast, - ) - futures = [*future_tests, future_optimization_candidates, future_concolic_tests] - if run_experiment: - future_candidates_exp = executor.submit( - self.local_aiservice_client.optimize_python_code, - read_writable_code.markdown, - read_only_context_code, - self.function_trace_id[:-4] + "EXP1", - N_CANDIDATES, - ExperimentMetadata(id=self.experiment_id, group="experiment"), - ) - futures.append(future_candidates_exp) - - # Wait for all futures to complete - concurrent.futures.wait(futures) - - # Retrieve results - candidates: list[OptimizedCandidate] = future_optimization_candidates.result() - if not candidates: - return Failure(f"/!\\ NO OPTIMIZATIONS GENERATED for {self.function_to_optimize.function_name}") - - candidates_experiment = future_candidates_exp.result() if future_candidates_exp else None - - # Process test generation results - - tests: list[GeneratedTests] = [] - for future in future_tests: - res = future.result() - if res: - ( - generated_test_source, - instrumented_behavior_test_source, - instrumented_perf_test_source, - test_behavior_path, - test_perf_path, - ) = res - tests.append( - GeneratedTests( - generated_original_test_source=generated_test_source, - instrumented_behavior_test_source=instrumented_behavior_test_source, - instrumented_perf_test_source=instrumented_perf_test_source, - behavior_file_path=test_behavior_path, - perf_file_path=test_perf_path, - ) + # Wait for all futures to complete + concurrent.futures.wait(futures) + + # Retrieve results + candidates: list[OptimizedCandidate] = future_optimization_candidates.result() + if not candidates: + return Failure(f"/!\\ NO OPTIMIZATIONS GENERATED for {self.function_to_optimize.function_name}") + + candidates_experiment = future_candidates_exp.result() if future_candidates_exp else None + + # Process test generation results + + tests: list[GeneratedTests] = [] + for future in future_tests: + res = future.result() + if res: + ( + generated_test_source, + instrumented_behavior_test_source, + instrumented_perf_test_source, + test_behavior_path, + test_perf_path, + ) = res + tests.append( + GeneratedTests( + generated_original_test_source=generated_test_source, + instrumented_behavior_test_source=instrumented_behavior_test_source, + instrumented_perf_test_source=instrumented_perf_test_source, + behavior_file_path=test_behavior_path, + perf_file_path=test_perf_path, ) - if not tests: - logger.warning(f"Failed to generate and instrument tests for {self.function_to_optimize.function_name}") - return Failure(f"/!\\ NO TESTS GENERATED for {self.function_to_optimize.function_name}") - function_to_concolic_tests, concolic_test_str = future_concolic_tests.result() - logger.info(f"Generated {len(tests)} tests for {self.function_to_optimize.function_name}") - console.rule() - generated_tests = GeneratedTestsList(generated_tests=tests) + ) + if not tests: + logger.warning(f"Failed to generate and instrument tests for {self.function_to_optimize.function_name}") + return Failure(f"/!\\ NO TESTS GENERATED for {self.function_to_optimize.function_name}") + function_to_concolic_tests, concolic_test_str = future_concolic_tests.result() + logger.info(f"Generated {len(tests)} tests for {self.function_to_optimize.function_name}") + console.rule() + generated_tests = 
GeneratedTestsList(generated_tests=tests) result = ( generated_tests, function_to_concolic_tests, @@ -1041,7 +1079,6 @@ def find_and_process_best_optimization( if candidates is None: continue - self.valid_optimizations = [] # reset for each experiment best_optimization = self.determine_best_candidate( candidates=candidates, code_context=code_context, diff --git a/codeflash/optimization/optimizer.py b/codeflash/optimization/optimizer.py index d66c3fcf0..83073d6d9 100644 --- a/codeflash/optimization/optimizer.py +++ b/codeflash/optimization/optimizer.py @@ -333,6 +333,7 @@ def run(self) -> None: continue finally: if function_optimizer is not None: + function_optimizer.executor.shutdown(wait=True) function_optimizer.cleanup_generated_files() ph("cli-optimize-run-finished", {"optimizations_found": optimizations_found})
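
The central mechanism this patch adds to determine_best_candidate is deduplication of optimization candidates by their AST-normalized source: each candidate is keyed by ast.unparse(ast.parse(code)), duplicates reuse the earlier candidate's measured results, and the variant with the smallest character-level diff against the original code is the one kept (and later reported via optimizations_post). The sketch below is illustrative only and is not part of the patch: diff_length is a simplified stand-in for the project's character-level diff helper, and candidates are reduced to (optimization_id, source_code) pairs.

import ast
import difflib


def diff_length(a: str, b: str) -> int:
    # Rough character-level size of the diff between two code strings
    # (stand-in for the repository's own diff_length helper).
    return sum(len(line) for line in difflib.unified_diff(a.splitlines(), b.splitlines()))


def dedup_candidates(candidates: list[tuple[str, str]], original_code: str) -> dict[str, dict]:
    """Map AST-normalized source -> bookkeeping entry, keeping the shortest-diff variant."""
    ast_code_to_id: dict[str, dict] = {}
    for opt_id, source in candidates:
        # Normalizing through the AST makes the key insensitive to formatting and comments.
        normalized = ast.unparse(ast.parse(source.strip()))
        new_len = diff_length(source, original_code)
        entry = ast_code_to_id.get(normalized)
        if entry is None:
            # First occurrence of this code shape: record it (the real flow benchmarks it here).
            ast_code_to_id[normalized] = {
                "optimization_id": opt_id,
                "shorter_source_code": source,
                "diff_len": new_len,
            }
        elif new_len < entry["diff_len"]:
            # Duplicate shape: the real flow copies the earlier runtime/correctness results to
            # this id, then keeps whichever variant has the smaller diff against the original.
            entry["shorter_source_code"] = source
            entry["diff_len"] = new_len
    return ast_code_to_id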