diff --git a/rdagent/app/data_science/conf.py b/rdagent/app/data_science/conf.py
index 2c1cb4a50..3bce7676b 100644
--- a/rdagent/app/data_science/conf.py
+++ b/rdagent/app/data_science/conf.py
@@ -179,6 +179,16 @@ class DataScienceBasePropSetting(KaggleBasePropSetting):
     ensemble_time_upper_bound: bool = False
 
+    ### Graph related
+    enable_node_restart: bool = True
+    """Enable node restart for failed nodes in the graph"""
+
+    enable_node_a_restart: bool = True
+    """Enable node A restart for failed nodes in the graph"""
+
+    enable_node_b_restart: bool = True
+    """Enable node B restart for failed nodes in the graph"""
+
 
 DS_RD_SETTING = DataScienceBasePropSetting()
 
 # enable_cross_trace_diversity and llm_select_hypothesis should not be true at the same time
diff --git a/rdagent/scenarios/data_science/proposal/exp_gen/proposal.py b/rdagent/scenarios/data_science/proposal/exp_gen/proposal.py
index 46514a0ec..388e0ddf0 100644
--- a/rdagent/scenarios/data_science/proposal/exp_gen/proposal.py
+++ b/rdagent/scenarios/data_science/proposal/exp_gen/proposal.py
@@ -39,6 +39,7 @@
 from rdagent.utils.agent.tpl import T
 from rdagent.utils.repo.diff import generate_diff_from_dict
 from rdagent.utils.workflow import wait_retry
+from collections import OrderedDict
 
 _COMPONENT_META: Dict[str, Dict[str, Any]] = {
     "DataLoadSpec": {
@@ -1271,6 +1272,196 @@ def get_all_hypotheses(self, problem_dict: dict, hypothesis_dict: dict) -> list[
                 )
             )
         return result
+
+    def _get_exp_index(self, trace: DSTrace) -> int:
+        """Pick the leaf whose SOTA score is closest to the submitted SOTA score."""
+        leaves: list[int] = trace.get_leaves()
+        if trace.sota_exp_to_submit is not None:
+            sota_submit_value = trace.sota_exp_to_submit.result.loc["ensemble"].iloc[0]
+            trace_scores = []
+            for i, leaf in enumerate(leaves):
+                if leaf == trace.current_selection[0]:
+                    continue
+                fb = trace.sota_experiment_fb(selection=(leaf,))
+                if fb is None:
+                    continue
+                final_score = fb[0].result.loc["ensemble"].iloc[0]
+                trace_scores.append((i, abs(final_score - sota_submit_value)))
+            if trace_scores:
+                return min(trace_scores, key=lambda item: item[1])[0]
+        return next(i for i, leaf in enumerate(leaves) if leaf != trace.current_selection[0])
+
+    @wait_retry(retry_n=5)
+    def merge_node_gen(
+        self,
+        trace: DSTrace,
+        component_desc: str,
+        sota_exp_desc: str,
+        enable_idea_pool: bool,
+        pipeline: bool = True,
+        exp_feedback_list_desc: str = "",
+        scenario_desc: str = "",
+        problems: dict | None = None,
+    ) -> Dict:
+        sota_exp_fb = trace.sota_experiment_fb(selection=trace.current_selection)
+        if sota_exp_fb:
+            sota_exp_desc = T("scenarios.data_science.share:describe.exp").r(
+                exp=sota_exp_fb[0],
+                heading="Best previous exploration of the scenario",
+            )
+            eda_output = sota_exp_fb[0].experiment_workspace.file_dict.get("EDA.md", None)
+        else:
+            sota_exp_desc = ""
+            eda_output = None
+
+        trace_fbs: list[tuple[DSExperiment, ExperimentFeedback]] = []
+        # Find the best experiments to merge across all leaves except the current one.
+        leaves: list[int] = trace.get_leaves()
+        max_sota_retrieved_num_per_trace = max(DS_RD_SETTING.max_sota_retrieved_num * 2 // len(leaves), 4)
+        for leaf in leaves:
+            if leaf == trace.current_selection[0]:
+                continue
+            trace_fbs.extend(
+                trace.experiment_and_feedback_list_after_init(
+                    return_type="sota",
+                    search_type="ancestors",
+                    selection=(leaf,),
+                    max_retrieve_num=max_sota_retrieved_num_per_trace,
+                )
+            )
+
+        # Deduplicate while preserving retrieval order.
+        success_fb_list = list(OrderedDict.fromkeys(trace_fbs))
+        logger.info(
+            f"Merge Hypothesis: select {len(success_fb_list)} from {len(trace_fbs)} SOTA experiments found in {len(leaves)} traces"
+        )
+
+        if len(success_fb_list) > 0:
T("scenarios.data_science.proposal.exp_gen.merge:trace").r( + exp_and_feedback_list=success_fb_list, + type="success", + heading="Successful iterations:", + success_trial_desc="These trials are the steps or changes that led to the success of the solution to be merged", + pipeline=DS_RD_SETTING.coder_on_whole_pipeline, + ) + else: + exp_index = self._get_exp_index(trace) + exp_to_merge_fb = trace.sota_experiment_fb(selection=(exp_index,)) + if exp_to_merge_fb is None: + exp_to_merge_fb = trace.hist[exp_index] + + exp_to_merge_fb_desc = T("scenarios.data_science.share:describe.feedback").r( + exp_and_feedback=exp_to_merge_fb, + heading="The feedback for the solution to be merged", + ) + + component_desc = T("scenarios.data_science.share:component_description_in_pipeline").r() + + sys_prompt = T(".restart:hypothesis_gen.system").r( + component_desc=component_desc, + hypothesis_output_format=T(".prompts_v2:output_format.hypothesis").r( + pipeline=pipeline, enable_idea_pool=enable_idea_pool + ), + pipeline=pipeline, + ) + user_prompt = T(".restart:hypothesis_gen.user").r( + exp_and_feedback_list_desc=exp_to_merge_fb_desc, + sota_exp_desc=sota_exp_desc, + ) + response = APIBackend().build_messages_and_create_chat_completion( + user_prompt=user_prompt, + system_prompt=sys_prompt, + json_mode=True, + json_target_type=Dict[str, Dict[str, str | Dict[str, str | int]]], + ) + resp_dict = json.loads(response) + return resp_dict + + + def _get_scores(self,trace,loop_id2idx, loop_id_list,root_id) -> list: + id_and_scores = [] + for loop_id in loop_id_list: + if trace.hist[loop_id2idx[loop_id]][1].decision == True: + id_and_scores.append( + (root_id,loop_id,trace.hist[loop_id2idx[loop_id]][0].result.loc["ensemble"].iloc[0].round(3)) + ) + else: + id_and_scores.append((root_id,loop_id,-1)) + return id_and_scores + + + def identify_current_node_type(self, trace: DSTrace) -> str: + + competition = trace.scen.competition + + root_nodes = {} + parent_nodes = {} + for node in range(len(trace.hist)): + parents = trace.get_parents(node) + root_nodes[node] = parents[0] + parent_nodes[node] = parents[-2] if len(parents) > 1 else None + if hasattr(trace, "idx2loop_id"): + root_nodes = {trace.idx2loop_id[n]: trace.idx2loop_id[r] for n, r in root_nodes.items()} + parent_nodes = { + trace.idx2loop_id[n]: trace.idx2loop_id[r] if r is not None else r + for n, r in parent_nodes.items() + } + + + current_record_id = trace.current_selection[0] + current_loop_id = trace.idx2loop_id[current_record_id] + + + loop_id_list = self._get_path(current_loop_id, parent_nodes) + + loop_id2idx = {v: k for k, v in trace.idx2loop_id.items()} + + unique_roots = list(OrderedDict.fromkeys(root_nodes.values())) + + node_list_from_different_root = [ + [node for node, r in root_nodes.items() if r == root] + for root in unique_roots + ] + + score_list = [self._get_scores(trace,loop_id2idx ,l,root_id) + for l,root_id in zip(node_list_from_different_root, unique_roots)] + + all_nodes = [item for sublist in score_list for item in sublist] + + + current_parent_root = [ + root_id + for l in score_list + for root_id, loop_id, score in l + if loop_id == current_loop_id + ] + + mean_scores = [] + successful_rates = [] + for l in score_list: + valid_scores = [s for _, _, s in l if s != -1] + root = l[0][0] + mean_score = np.mean(valid_scores) if valid_scores else None + successful_rate = 100*len(valid_scores)/len(l) if l else 0.0 + successful_rates.append((root, successful_rate)) + mean_scores.append((root,mean_score)) + + + current_success_rate = 
+        current_success_rate = next(
+            (rate for root, rate in successful_rates if root == current_parent_root), 0.0
+        )
+        if current_success_rate > 50:
+            min_threshold = 0.7
+            all_scores = np.array([score for _, score in mean_scores if score is not None])
+            if all_scores.size == 0:
+                return "restart"
+            bigger_is_better = get_metric_direction(competition)
+            root_score = next((score for root, score in mean_scores if root == current_parent_root), None)
+
+            if bigger_is_better:
+                # Restart unless this root reaches the 75th percentile of per-root means.
+                dynamic_threshold = max(np.percentile(all_scores, 75), min_threshold)
+                if root_score is None or root_score < dynamic_threshold:
+                    return "restart"
+                return "explore"
+            else:
+                # Lower is better: restart unless this root stays at or below the threshold.
+                dynamic_threshold = min(np.percentile(all_scores, 75), 1 - min_threshold)
+                if root_score is None or root_score > dynamic_threshold:
+                    return "restart"
+                return "explore"
+        else:
+            return "restart"
 
     def gen(
         self,
@@ -1372,6 +1573,23 @@ def gen(
             is_new_tree=is_new_tree,
             sibling_exp=sibling_exp,
         )
+
+        node_type = "none"
+        if DS_RD_SETTING.enable_node_restart and len(trace.current_selection) != 0:
+            node_type = self.identify_current_node_type(trace)
+
+        if node_type == "restart":
+            hypothesis_dict = self.merge_node_gen(
+                trace=trace,
+                component_desc=component_desc,
+                sota_exp_desc=sota_exp_desc,
+                enable_idea_pool=DS_RD_SETTING.enable_knowledge_base,
+                pipeline=pipeline,
+                exp_feedback_list_desc=exp_feedback_list_desc,
+                scenario_desc=scenario_desc,
+                problems=all_problems,
+            )
+
         if not pipeline:
             sota_exp_model_file_count = len(
                 [
@@ -1388,6 +1603,7 @@ def gen(
             for name in pop_names:
                 hypothesis_dict.pop(name)
+
         # Step 2.1 & 2.2: Hypothesis Critique and Rewrite Stage (controlled by enable_hypo_critique_rewrite)
         if DS_RD_SETTING.enable_hypo_critique_rewrite and len(trace.hist) > 0:
             logger.info(f"Hypothesis critique and rewrite enabled - processing {len(hypothesis_dict)} hypotheses")
@@ -1424,7 +1641,7 @@ def gen(
             logger.info(f"Hypothesis critique and rewrite disabled - using original {len(hypothesis_dict)} hypotheses")
 
         # Step 3: Select the best hypothesis
-        if DS_RD_SETTING.llm_select_hypothesis:
+        if DS_RD_SETTING.llm_select_hypothesis and node_type != "restart":
             response_dict = self.hypothesis_select_with_llm(
                 scenario_desc=scenario_desc,
                 exp_feedback_list_desc=exp_feedback_list_desc,
@@ -1439,11 +1657,13 @@ def gen(
             )
             pickled_problem_name = None
         else:
+            all_problems = {}
             pickled_problem_name, new_hypothesis = self.hypothesis_rank(
                 hypothesis_dict=hypothesis_dict,
                 problem_dict=all_problems,
+                selected_idx=0,
             )
-
+
         # Step 3.5: Update knowledge base with the picked problem
         if DS_RD_SETTING.enable_knowledge_base:
             trace.knowledge_base.update_pickled_problem(all_problems, pickled_problem_name)
diff --git a/rdagent/scenarios/data_science/proposal/exp_gen/restart.yaml b/rdagent/scenarios/data_science/proposal/exp_gen/restart.yaml
new file mode 100644
index 000000000..24b7366dd
--- /dev/null
+++ b/rdagent/scenarios/data_science/proposal/exp_gen/restart.yaml
@@ -0,0 +1,47 @@
+hypothesis_gen:
+  system: |-
+    {% include "scenarios.data_science.share:scen.role" %}
+    The user is iteratively improving a Kaggle competition implementation through traces, where each new trace is modified from the current SOTA in the trace. If a new trace surpasses the current SOTA, it becomes the new SOTA; otherwise, it is recorded as a failed experiment.
+    You will be provided with:
+    1. A detailed competition scenario description;
+    2. Previous SOTA experiments and feedback, which are past SOTA experiments indexed from oldest to newest;
+    3. The current SOTA implementation and feedback, which is the latest SOTA experiment among the previous experiments;
+    4. Extra implementations from other users' experiments.
+    Your task is to:
+    1. **Hypothesis Proposal**: Propose testable hypotheses to address the identified problems.
+    2. **Hypothesis Evaluation**: Evaluate the proposed hypotheses across multiple dimensions.
+
+    # Task 1: Hypothesis Proposal
+    For each identified problem, propose a hypothesis to improve the current SOTA implementation.
+
+    ## Hypothesis Guidelines
+    Here are a few guidelines to help you formulate hypotheses:
+    1. Previous Experiments Analysis
+       - For previous SOTA experiments, analyze insights and implicit patterns that can be leveraged to improve the current SOTA implementation.
+       - For failed experiments, think about the persistent problems they face. If these experiments consistently failed due to time/memory constraints, prioritize changes that improve efficiency.
+    2. Note on Time/Memory Constraints
+       - If prior experiments failed due to time/memory limitations, assume your new hypothesis will face the same constraints. In this case, prioritize efficiency and **ONLY** respond to the problems related to time/memory constraints in your response dictionary.
+       - That said, do not compromise performance merely for efficiency when the current SOTA implementation does not hit these constraints. Balance efficiency and performance so that your new hypothesis can be executed successfully and still achieve satisfactory performance.
+
+    # Task 2: Hypothesis Evaluation
+    ## Evaluation Instruction
+    First, tag the hypothesis with one of the following components. If the hypothesis relates to multiple components, choose the most relevant one.
+    {{ component_desc }}
+    After proposing the hypothesis, your second task is to evaluate it along multiple dimensions.
+
+    Second, score the proposed hypothesis from 1 to 10 on each of the following dimensions (where 1 is lowest and 10 is highest):
+    1. Problem-Hypothesis Alignment: how well the hypothesis addresses the identified problem.
+    2. Expected Impact: the estimated improvement after applying the hypothesis to the current SOTA implementation.
+    3. Novelty: the degree of innovation compared to previous attempts. If the proposed hypothesis is similar to a previous experiment's hypothesis, assign a novelty score of one.
+    4. Feasibility: the ease of implementing the proposed hypothesis in the current SOTA implementation.
+    5. Risk-Reward Balance: the exploration-exploitation balance of the proposed hypothesis.
+
+    ## Final Output Format in JSON Schema:
+    {{ hypothesis_output_format }}
+
+  user: |-
+    # Extra Experiments and Feedback
+    {{ exp_and_feedback_list_desc }}
+
+    # Current SOTA Implementation
+    {{ sota_exp_desc }}
\ No newline at end of file
diff --git a/rdagent/scenarios/data_science/proposal/exp_gen/trace_scheduler.py b/rdagent/scenarios/data_science/proposal/exp_gen/trace_scheduler.py
index 771209b88..b99130020 100644
--- a/rdagent/scenarios/data_science/proposal/exp_gen/trace_scheduler.py
+++ b/rdagent/scenarios/data_science/proposal/exp_gen/trace_scheduler.py
@@ -109,6 +109,7 @@ def select(self, trace: DSTrace) -> tuple[int, ...] | None:
         return None
 
 
+
 # ======================================================================================
 # Probabilistic Scheduler and its potential functions
 # ======================================================================================
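
Review note (not part of the patch): the restart/explore rule in `identify_current_node_type` is easiest to sanity-check in isolation. The sketch below reproduces just the percentile-threshold decision, assuming the per-root aggregates have already been computed; `decide_node_type` and its inputs are illustrative names, not RD-Agent APIs.

```python
from __future__ import annotations

import numpy as np


def decide_node_type(
    mean_scores: list[tuple[int, float | None]],  # (root_id, mean score of its successful nodes)
    success_rates: list[tuple[int, float]],       # (root_id, % of its nodes that succeeded)
    current_root: int,
    bigger_is_better: bool,
    min_threshold: float = 0.7,
) -> str:
    # Roots where half or more of the nodes failed are restarted outright.
    rate = next((r for root, r in success_rates if root == current_root), 0.0)
    if rate <= 50:
        return "restart"

    scores = np.array([s for _, s in mean_scores if s is not None])
    root_score = next((s for root, s in mean_scores if root == current_root), None)
    if scores.size == 0 or root_score is None:
        return "restart"

    if bigger_is_better:
        # Keep exploring only if this root clears the 75th percentile of root means.
        threshold = max(np.percentile(scores, 75), min_threshold)
        return "explore" if root_score >= threshold else "restart"
    # Lower-is-better metrics mirror the comparison (the patch also uses the 75th percentile here).
    threshold = min(np.percentile(scores, 75), 1 - min_threshold)
    return "explore" if root_score <= threshold else "restart"


# Root 2 trails the percentile threshold, so its subtree would be restarted.
print(decide_node_type(
    mean_scores=[(0, 0.81), (1, 0.92), (2, 0.64)],
    success_rates=[(0, 80.0), (1, 90.0), (2, 75.0)],
    current_root=2,
    bigger_is_better=True,
))  # -> restart
```

In other words, with a higher-is-better metric a root keeps being explored only when at least half of its nodes succeed and its mean score clears max(75th percentile, 0.7); every other case falls back to "restart", which routes proposal generation through `merge_node_gen`.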