1- import hashlib
21import logging
32import shutil
43from typing import Any , cast
1615from futurehouse_client import FutureHouseClient
1716
1817from .notebook_env import NBEnvironment
19- from .utils import NBLanguage , MultipleChoiceQuestion , nb_to_html
18+ from .utils import NBLanguage , MultipleChoiceQuestion
2019from . import prompts
2120from . import config as cfg
2221
@@ -150,7 +149,7 @@ def export_frame(self) -> Frame:
150149 "done" : self .state .done ,
151150 "total_reward" : self .state .total_reward ,
152151 "nb_state" : self .state .nb ,
153- "nb_state_html" : nb_to_html (self .state .nb ),
152+ # "nb_state_html": nb_to_html(self.state.nb), # temporarily disabled
154153 "nb_runtime_errors" : self .state .notebook_runtime_errors ,
155154 },
156155 info = {
@@ -168,6 +167,8 @@ def from_task(
168167 cls ,
169168 task : str ,
170169 gcs_artifact_path : str | None = None ,
170+ trajectory_id : str | None = None ,
171+ user_id : str | None = None ,
171172 environment_config : dict [str , Any ] | None = None ,
172173 ) -> "DataAnalysisEnv" :
173174 """
@@ -178,9 +179,11 @@ def from_task(
178179 gcs_artifact_path: The path to the GCS artifact – required for evaluation on crow jobs
179180 environment_config: A JSON string of environment configuration
180181 """
181- logger .info ("User task: %s" , task [:100 ])
182+ logger .info ("User task: %s" , task [:50 ])
182183 logger .info ("GCS artifact path: %s" , gcs_artifact_path )
183184 logger .info ("environment_config: %s" , environment_config )
185+ logger .info ("trajectory_id: %s" , trajectory_id )
186+ logger .info ("user_id: %s" , user_id )
184187 # Track cost of running the environment
185188 enable_cost_tracking ()
186189 if (
@@ -190,6 +193,22 @@ def from_task(
190193 "Running crow jobs without gcs_artifact_path is not supported"
191194 )
192195
196+ if user_id is None :
197+ user_id = "default_user"
198+ if trajectory_id is None :
199+ trajectory_id = f"{ gcs_artifact_path } -{ time .time ()} "
200+
201+ # Always create a new directory for the trajectory
202+ trajectory_path = (
203+ cfg .DATA_STORAGE_PATH / "user_trajectories" / user_id / trajectory_id
204+ )
205+ logger .info ("Trajectory path: %s" , trajectory_path )
206+ trajectory_path .mkdir (parents = True , exist_ok = True )
207+ for item in (cfg .DATA_STORAGE_PATH / gcs_artifact_path ).iterdir ():
208+ if item .is_file ():
209+ shutil .copy2 (item , trajectory_path )
210+ elif item .is_dir ():
211+ shutil .copytree (item , trajectory_path / item .name , dirs_exist_ok = True )
193212 if environment_config :
194213 kwargs = {
195214 k : v
@@ -200,39 +219,27 @@ def from_task(
200219 kwargs = {}
201220 environment_config = {}
202221 logger .info ("Filtered kwargs: %s" , kwargs )
203- task_hash = hashlib .sha256 (task .encode ()).hexdigest ()
204- if environment_config .get ("eval" , False ):
205- logger .info ("Eval mode is True" )
206- # Create a temporary directory in GCP mounted storage volume
207- trajectory_path = cfg .DATA_STORAGE_PATH / f"{ task_hash } -{ time .time ()} "
208- trajectory_path .mkdir (parents = True , exist_ok = True )
209- for item in (cfg .DATA_STORAGE_PATH / gcs_artifact_path ).iterdir ():
210- if item .is_file ():
211- shutil .copy2 (item , trajectory_path )
212- elif item .is_dir ():
213- shutil .copytree (
214- item , trajectory_path / item .name , dirs_exist_ok = True
215- )
216- else :
217- logger .info ("Eval mode is False" )
218- # Use the GCP folder created when uploading the data via the platform
219- trajectory_path = cfg .DATA_STORAGE_PATH / gcs_artifact_path
220- # Augment incoming user query with CoT instructions
222+
223+ language = getattr (NBLanguage , environment_config .get ("language" , "PYTHON" ))
224+ # Overwrite the language in the kwargs with NBLanguage enum
225+ kwargs ["language" ] = language
226+ logger .info ("Language: %s" , language .name )
227+
228+ if not environment_config .get ("eval" , False ):
229+ logger .info (
230+ "Platform job detected, augmenting user query with CoT instructions"
231+ )
232+ # If running via the platform, augment incoming user query with CoT instructions
221233 task = (
222- f"Here is the user query to address:\n "
234+ f"{ prompts .CHAIN_OF_THOUGHT_AGNOSTIC .format (language = kwargs .get ('language' , 'PYTHON' ))} \n "
235+ f"{ prompts .GENERAL_NOTEBOOK_GUIDELINES .format (language = kwargs .get ('language' , 'PYTHON' ))} "
236+ f"Here is the research question to address:\n "
223237 f"<query>\n "
224238 f"{ task } \n "
225239 f"</query>\n "
226- f"{ prompts .CHAIN_OF_THOUGHT_AGNOSTIC .format (language = kwargs .get ('language' , 'PYTHON' ))} \n "
227- f"{ prompts .GENERAL_NOTEBOOK_GUIDELINES .format (language = kwargs .get ('language' , 'PYTHON' ))} "
228240 )
229- logger .info ("Trajectory path: %s" , trajectory_path )
230241 nb_path = trajectory_path / NBEnvironment .NOTEBOOK_NAME
231242 logger .info ("NB path: %s" , nb_path )
232- language = getattr (NBLanguage , environment_config .get ("language" , "PYTHON" ))
233- # Overwrite the language in the kwargs with NBLanguage enum
234- kwargs ["language" ] = language
235- logger .info ("Language: %s" , language .name )
236243
237244 if trajectory_path .exists ():
238245 files = list (trajectory_path .iterdir ())
@@ -245,12 +252,14 @@ def from_task(
245252 raise ValueError (f"Trajectory path does not exist: { trajectory_path } " )
246253
247254 return cls (
248- problem_id = f"data-analysis-task-{ task_hash } " ,
255+ problem_id = f"data-analysis-task-{ trajectory_id } " ,
249256 problem = task ,
250257 eval_mode = EvalAnswerMode .LLM ,
251258 nb_path = nb_path ,
252259 work_dir = trajectory_path ,
253- system_prompt = prompts .CAPSULE_SYSTEM_PROMPT_QUERY ,
260+ system_prompt = environment_config .get (
261+ "system_prompt" , prompts .CAPSULE_SYSTEM_PROMPT_QUERY
262+ ),
254263 use_tmp_work_dir = False ,
255264 ** kwargs ,
256265 )
0 commit comments