Skip to content

Commit 759f295

Browse files
feat: add pipeline coder (#742)
* init commit * limit problem numbers * ensemble lower case * add runtime and spec to coder * submission check notice * sub EDA in sample execution * avoid lightgbm * add time limit to scenario * rephrase the submission check * give positive feedback when facing warning in check * ENABLE FEEDBACK * fix feedback bug --------- Co-authored-by: Xu Yang <[email protected]>
1 parent 7200682 commit 759f295

File tree

17 files changed

+650
-39
lines changed

17 files changed

+650
-39
lines changed

rdagent/app/data_science/conf.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ class DataScienceBasePropSetting(KaggleBasePropSetting):
2525
spec_enabled: bool = True
2626

2727
proposal_version: str = "v1"
28+
coder_on_whole_pipeline: bool = False
2829

2930
coder_max_loop: int = 10
3031
runner_max_loop: int = 3

rdagent/app/data_science/loop.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
from rdagent.components.coder.data_science.feature.exp import FeatureTask
1111
from rdagent.components.coder.data_science.model import ModelCoSTEER
1212
from rdagent.components.coder.data_science.model.exp import ModelTask
13+
from rdagent.components.coder.data_science.pipeline import PipelineCoSTEER
14+
from rdagent.components.coder.data_science.pipeline.exp import PipelineTask
1315
from rdagent.components.coder.data_science.raw_data_loader import DataLoaderCoSTEER
1416
from rdagent.components.coder.data_science.raw_data_loader.exp import DataLoaderTask
1517
from rdagent.components.coder.data_science.workflow import WorkflowCoSTEER
@@ -54,6 +56,8 @@ def __init__(self, PROP_SETTING: BasePropSetting):
5456
self.ensemble_coder = EnsembleCoSTEER(scen)
5557
self.workflow_coder = WorkflowCoSTEER(scen)
5658

59+
self.pipeline_coder = PipelineCoSTEER(scen)
60+
5761
self.runner = DSCoSTEERRunner(scen)
5862
# self.summarizer: Experiment2Feedback = import_class(PROP_SETTING.summarizer)(scen)
5963
# logger.log_object(self.summarizer, tag="summarizer")
@@ -86,6 +90,8 @@ def coding(self, prev_out: dict[str, Any]):
8690
exp = self.ensemble_coder.develop(exp)
8791
elif isinstance(exp.sub_tasks[0], WorkflowTask):
8892
exp = self.workflow_coder.develop(exp)
93+
elif isinstance(exp.sub_tasks[0], PipelineTask):
94+
exp = self.pipeline_coder.develop(exp)
8995
else:
9096
raise NotImplementedError(f"Unsupported component in DataScienceRDLoop: {exp.hypothesis.component}")
9197
exp.sub_tasks = []
@@ -106,7 +112,7 @@ def feedback(self, prev_out: dict[str, Any]) -> ExperimentFeedback:
106112
- If we come to feedback phase, the previous development steps are successful.
107113
"""
108114
exp: DSExperiment = prev_out["running"]
109-
if self.trace.next_incomplete_component() is None:
115+
if self.trace.next_incomplete_component() is None or DS_RD_SETTING.coder_on_whole_pipeline:
110116
# we have alreadly completed components in previous trace. So current loop is focusing on a new proposed idea.
111117
# So we need feedback for the proposal.
112118
feedback = self.summarizer.generate_feedback(exp, self.trace)
@@ -130,7 +136,11 @@ def record(self, prev_out: dict[str, Any]):
130136
ExperimentFeedback.from_exception(e),
131137
)
132138
)
133-
if self.trace.sota_experiment() is None and len(self.trace.hist) >= DS_RD_SETTING.consecutive_errors:
139+
if (
140+
self.trace.sota_experiment() is None
141+
and len(self.trace.hist) >= DS_RD_SETTING.consecutive_errors
142+
and not DS_RD_SETTING.coder_on_whole_pipeline
143+
):
134144
# if {in inital/drafting stage} and {tried enough times}
135145
for _, fb in self.trace.hist[-DS_RD_SETTING.consecutive_errors :]:
136146
if fb:
Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
"""
2+
3+
The outer loop should not change much; exceptions are listed below:
4+
- Action Choice[current data loader & spec]
5+
- other should share
6+
- Propose[choice] => Task[Choice] => CoSTEER =>
7+
-
8+
9+
Extra feature:
10+
- cache
11+
12+
13+
File structure
14+
- ___init__.py: the entrance/agent of coder
15+
- evaluator.py
16+
- conf.py
17+
- exp.py: everything under the experiment, e.g.
18+
- Task
19+
- Experiment
20+
- Workspace
21+
- test.py
22+
- Each coder could be tested.
23+
"""
24+
25+
import json
26+
import re
27+
from pathlib import Path
28+
from typing import Dict
29+
30+
from rdagent.app.data_science.conf import DS_RD_SETTING
31+
from rdagent.components.coder.CoSTEER import CoSTEER
32+
from rdagent.components.coder.CoSTEER.evaluators import (
33+
CoSTEERMultiEvaluator,
34+
CoSTEERSingleFeedback,
35+
)
36+
from rdagent.components.coder.CoSTEER.evolving_strategy import (
37+
MultiProcessEvolvingStrategy,
38+
)
39+
from rdagent.components.coder.CoSTEER.knowledge_management import (
40+
CoSTEERQueriedKnowledge,
41+
)
42+
from rdagent.components.coder.data_science.conf import (
43+
DSCoderCoSTEERSettings,
44+
get_ds_env,
45+
)
46+
from rdagent.components.coder.data_science.pipeline.eval import PipelineCoSTEEREvaluator
47+
from rdagent.components.coder.data_science.raw_data_loader.eval import (
48+
DataLoaderCoSTEEREvaluator,
49+
)
50+
from rdagent.components.coder.data_science.raw_data_loader.exp import DataLoaderTask
51+
from rdagent.core.exception import CoderError
52+
from rdagent.core.experiment import FBWorkspace
53+
from rdagent.core.scenario import Scenario
54+
from rdagent.oai.llm_utils import APIBackend
55+
from rdagent.utils.agent.ret import PythonAgentOut
56+
from rdagent.utils.agent.tpl import T
57+
58+
DIRNAME = Path(__file__).absolute().resolve().parent
59+
60+
61+
class PipelineMultiProcessEvolvingStrategy(MultiProcessEvolvingStrategy):
    """Evolving strategy that asks the LLM to (re)write the whole pipeline as a single ``main.py``."""

    def implement_one_task(
        self,
        target_task: DataLoaderTask,
        queried_knowledge: CoSTEERQueriedKnowledge | None = None,
        workspace: FBWorkspace | None = None,
        prev_task_feedback: CoSTEERSingleFeedback | None = None,
    ) -> dict[str, str]:
        """Generate a new pipeline implementation for *target_task*.

        Parameters
        ----------
        target_task : the task whose description drives the prompt.
        queried_knowledge : similar successful / former failed traces, if any.
        workspace : workspace holding the current ``main.py``, or None on the first attempt.
        prev_task_feedback : feedback from the previous evaluation round, if any.

        Returns
        -------
        dict mapping filename to code; only ``"main.py"`` is produced.

        Raises
        ------
        CoderError : if the LLM keeps reproducing the current code after 5 attempts.
        """
        competition_info = self.scen.get_scenario_all_desc()
        runtime_environment = self.scen.get_runtime_environment()
        data_folder_info = self.scen.processed_data_folder_description
        pipeline_task_info = target_task.get_task_information()

        # Current pipeline code (if any). Computed once and reused for the prompt and
        # for duplicate detection below; also guards against `workspace is None`,
        # which the signature explicitly allows.
        latest_code = workspace.file_dict.get("main.py") if workspace is not None else None

        queried_similar_successful_knowledge = (
            queried_knowledge.task_to_similar_task_successful_knowledge[pipeline_task_info]
            if queried_knowledge is not None
            else []
        )
        queried_former_failed_knowledge = (
            queried_knowledge.task_to_former_failed_traces[pipeline_task_info] if queried_knowledge is not None else []
        )
        # Drop failed traces whose code is identical to the current code — they carry no
        # new signal for the model. Guard the empty case: indexing `[0]` on the empty
        # list (e.g. when queried_knowledge is None) would raise IndexError.
        if queried_former_failed_knowledge:
            queried_former_failed_knowledge = (
                [
                    knowledge
                    for knowledge in queried_former_failed_knowledge[0]
                    if knowledge.implementation.file_dict.get("main.py") != latest_code
                ],
                queried_former_failed_knowledge[1],
            )
        else:
            queried_former_failed_knowledge = ([], [])

        system_prompt = T(".prompts:pipeline_coder.system").r(
            task_desc=pipeline_task_info,
            queried_similar_successful_knowledge=queried_similar_successful_knowledge,
            queried_former_failed_knowledge=queried_former_failed_knowledge[0],
            out_spec=PythonAgentOut.get_spec(),
            runtime_environment=runtime_environment,
            spec=T("scenarios.data_science.share:component_spec.Pipeline").r(),
        )
        user_prompt = T(".prompts:pipeline_coder.user").r(
            competition_info=competition_info,
            folder_spec=data_folder_info,
            latest_code=latest_code,
            latest_code_feedback=prev_task_feedback,
        )

        # Retry up to 5 times when the model regenerates code identical to the current one;
        # the for-else raises only if every attempt produced a duplicate.
        for _ in range(5):
            pipeline_code = PythonAgentOut.extract_output(
                APIBackend().build_messages_and_create_chat_completion(
                    user_prompt=user_prompt,
                    system_prompt=system_prompt,
                )
            )
            if pipeline_code != latest_code:
                break
            else:
                user_prompt = user_prompt + "\nPlease avoid generating same code to former code!"
        else:
            raise CoderError("Failed to generate a new pipeline code.")

        return {
            "main.py": pipeline_code,
        }

    def assign_code_list_to_evo(self, code_list: list[dict[str, str]], evo):
        """
        Assign the code list to the evolving item.

        The code list is aligned with the evolving item's sub-tasks.
        If a task is not implemented, put a None in the list.
        """
        for index in range(len(evo.sub_tasks)):
            if code_list[index] is None:
                continue
            if evo.sub_workspace_list[index] is None:
                # The pipeline coder works directly on the experiment workspace rather than
                # a dedicated per-task sub-workspace.
                # evo.sub_workspace_list[index] = FBWorkspace(target_task=evo.sub_tasks[index])
                evo.sub_workspace_list[index] = evo.experiment_workspace
            evo.sub_workspace_list[index].inject_files(**code_list[index])
        return evo
139+
140+
141+
class PipelineCoSTEER(CoSTEER):
    """CoSTEER agent that develops the whole data-science pipeline as one unit."""

    def __init__(
        self,
        scen: Scenario,
        *args,
        **kwargs,
    ) -> None:
        conf = DSCoderCoSTEERSettings()
        # Please specify whether you agree running your eva in parallel or not
        evaluator = CoSTEERMultiEvaluator(PipelineCoSTEEREvaluator(scen=scen), scen=scen)
        strategy = PipelineMultiProcessEvolvingStrategy(scen=scen, settings=conf)
        super().__init__(
            *args,
            settings=conf,
            eva=evaluator,
            es=strategy,
            evolving_version=2,
            scen=scen,
            max_loop=DS_RD_SETTING.coder_max_loop,
            **kwargs,
        )
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
# test successfully running.
2+
# (GPT) if it aligns with the spec & rationality of the spec.
3+
import json
4+
import re
5+
from pathlib import Path
6+
7+
import pandas as pd
8+
9+
from rdagent.app.data_science.conf import DS_RD_SETTING
10+
from rdagent.components.coder.CoSTEER import CoSTEERMultiFeedback
11+
from rdagent.components.coder.CoSTEER.evaluators import (
12+
CoSTEEREvaluator,
13+
CoSTEERSingleFeedback,
14+
)
15+
from rdagent.components.coder.CoSTEER.knowledge_management import (
16+
CoSTEERQueriedKnowledgeV2,
17+
)
18+
from rdagent.components.coder.data_science.conf import get_ds_env
19+
from rdagent.core.experiment import FBWorkspace, Task
20+
from rdagent.utils.agent.tpl import T
21+
from rdagent.utils.agent.workflow import build_cls_from_json_with_retry
22+
23+
DIRNAME = Path(__file__).absolute().resolve().parent
24+
25+
PipelineSingleFeedback = CoSTEERSingleFeedback
26+
PipelineMultiFeedback = CoSTEERMultiFeedback
27+
28+
29+
class PipelineCoSTEEREvaluator(CoSTEEREvaluator):
    """Evaluate a whole-pipeline implementation by executing it and checking its outputs."""

    def evaluate(
        self,
        target_task: Task,
        implementation: FBWorkspace,
        gt_implementation: FBWorkspace,
        queried_knowledge: CoSTEERQueriedKnowledgeV2 = None,
        **kwargs,
    ) -> PipelineSingleFeedback:
        """Run ``main.py`` in the scenario environment and return structured feedback.

        Short-circuits on cached knowledge: returns the stored feedback for a task
        already known to succeed, or a canned failure for a task that has failed
        too many times. Otherwise executes the pipeline, validates ``scores.csv``
        and the submission file, asks the LLM for a judgement, and overrides the
        LLM's ``final_decision`` to False when either file check failed.
        """

        # Cached outcomes: skip execution entirely when the task is already resolved.
        target_task_information = target_task.get_task_information()
        if (
            queried_knowledge is not None
            and target_task_information in queried_knowledge.success_task_to_knowledge_dict
        ):
            return queried_knowledge.success_task_to_knowledge_dict[target_task_information].feedback
        elif queried_knowledge is not None and target_task_information in queried_knowledge.failed_task_info_set:
            return PipelineSingleFeedback(
                execution="This task has failed too many times, skip implementation.",
                return_checking="This task has failed too many times, skip implementation.",
                code="This task has failed too many times, skip implementation.",
                final_decision=False,
            )

        # Mount the sampled competition data into the container at /kaggle/input.
        env = get_ds_env()
        env.conf.extra_volumes = {f"{DS_RD_SETTING.local_data_path}/sample/{self.scen.competition}": "/kaggle/input"}

        # Clean the scores.csv & submission.csv.
        # (removes stale outputs so the checks below reflect this run only)
        implementation.execute(env=env, entry=f"rm submission.csv scores.csv")
        stdout = implementation.execute(env=env, entry=f"python main.py")
        # Strip the (potentially long) EDA section from stdout before feeding it to the LLM.
        stdout = re.sub(r"=== Start of EDA part ===(.*)=== End of EDA part ===", "", stdout)

        # --- scores.csv validation: must exist, contain an "ensemble" row, and use the
        # scenario's metric as its single column. score_ret_code != 0 marks failure. ---
        score_fp = implementation.workspace_path / "scores.csv"
        score_ret_code = 0
        score_check_text = ""
        if not score_fp.exists():
            score_check_text = "[Error] Metrics file (scores.csv) is not generated!"
            score_ret_code = 1
        else:
            try:
                score_df = pd.read_csv(score_fp, index_col=0)
                model_set_in_scores = set(score_df.index)

                # Check model names (index)
                if "ensemble" not in model_set_in_scores:
                    score_check_text += (
                        f"\n[Error] The score dataframe doesn't contain the ensemble model.\nscore_df is:\n{score_df}"
                    )
                    score_ret_code = 1

                # Check metric name (columns)
                if score_df.columns.tolist() != [self.scen.metric_name]:
                    score_check_text += f"\n[Error] The scores dataframe does not contain the correct column names.\nCorrect columns is: ['{self.scen.metric_name}']\nBut got: {score_df.columns.tolist()}"
                    score_ret_code = 1

            except Exception as e:
                # Any parse error (malformed CSV etc.) also counts as a failed score check.
                score_check_text += f"\n[Error] in checking the scores.csv file: {e}\nscores.csv's content:\n-----\n{score_fp.read_text()}\n-----"
                score_ret_code = 1

        # Check submission file
        # Inject the canned format-check script into the workspace and run it in the container.
        base_check_code = (DIRNAME / "eval_tests" / "submission_format_test.txt").read_text()
        implementation.inject_files(**{"test/submission_format_test.py": base_check_code})
        # stdout += "----Submission Check 1-----\n"
        submission_check_out, submission_ret_code = implementation.execute_ret_code(
            env=env, entry="python test/submission_format_test.py"
        )
        stdout += "\n" + submission_check_out

        # Ask the LLM to judge execution output + code against the scenario/task/spec.
        system_prompt = T(".prompts:pipeline_eval.system").r(
            scenario=self.scen.get_scenario_all_desc(),
            task_desc=target_task.get_task_information(),
            spec=T("scenarios.data_science.share:component_spec.Pipeline").r(),
        )
        user_prompt = T(".prompts:pipeline_eval.user").r(
            stdout=stdout.strip(),
            code=implementation.file_dict["main.py"],
        )
        wfb = build_cls_from_json_with_retry(
            PipelineSingleFeedback,
            system_prompt=system_prompt,
            user_prompt=user_prompt,
            init_kwargs_update_func=PipelineSingleFeedback.val_and_update_init_dict,
        )
        # Hard file checks override the LLM's verdict: a missing/invalid scores.csv or a
        # failing submission-format test always forces a negative final decision.
        if score_ret_code != 0:
            wfb.final_decision = False
            wfb.return_checking += "\n" + score_check_text
        if submission_ret_code != 0:
            wfb.final_decision = False
            wfb.return_checking += "\nSubmission file check failed."
        return wfb

0 commit comments

Comments
 (0)