diff --git a/Makefile b/Makefile index 15c38391e..2df576a7d 100644 --- a/Makefile +++ b/Makefile @@ -119,11 +119,11 @@ pre-commit: # Auto lint with black. auto-black: - $(PIPRUN) python -m black . --extend-exclude test/scripts --extend-exclude git_ignore_folder -l 120 + $(PIPRUN) python -m black . --extend-exclude test/scripts --extend-exclude git_ignore_folder --extend-exclude .venv -l 120 # Auto lint with isort. auto-isort: - $(PIPRUN) python -m isort . -s git_ignore_folder -s test/scripts + $(PIPRUN) python -m isort . -s git_ignore_folder -s test/scripts -s .venv # Auto lint with toml-sort. auto-toml-sort: diff --git a/rdagent/log/utils/folder.py b/rdagent/log/utils/folder.py index ea551223a..453fa2158 100644 --- a/rdagent/log/utils/folder.py +++ b/rdagent/log/utils/folder.py @@ -24,7 +24,7 @@ def get_first_session_file_after_duration(log_folder: str | Path, duration: str session_obj: LoopBase = pickle.load(f) timer = session_obj.timer all_duration = timer.all_duration - remain_time_duration = timer.remain_time_duration + remain_time_duration = timer.remain_time() if all_duration is None or remain_time_duration is None: msg = "Timer is not configured" raise ValueError(msg) diff --git a/rdagent/scenarios/data_science/loop.py b/rdagent/scenarios/data_science/loop.py index 49f0538d4..e02adbad5 100644 --- a/rdagent/scenarios/data_science/loop.py +++ b/rdagent/scenarios/data_science/loop.py @@ -228,19 +228,14 @@ def record(self, prev_out: dict[str, Any]): # set the local selection to the trace as global selection, then set the DAG parent for the trace if exp.local_selection is not None: self.trace.set_current_selection(exp.local_selection) - self.trace.sync_dag_parent_and_hist() - - self.trace.hist.append((exp, prev_out["feedback"])) - + self.trace.sync_dag_parent_and_hist((exp, prev_out["feedback"])) else: exp: DSExperiment = prev_out["direct_exp_gen"] if isinstance(e, CoderError) else prev_out["coding"] # set the local selection to the trace as global selection, then set the DAG parent for the trace if exp.local_selection is not None: self.trace.set_current_selection(exp.local_selection) - self.trace.sync_dag_parent_and_hist() - - self.trace.hist.append( + self.trace.sync_dag_parent_and_hist( ( exp, ExperimentFeedback.from_exception(e), diff --git a/rdagent/scenarios/data_science/proposal/exp_gen/base.py b/rdagent/scenarios/data_science/proposal/exp_gen/base.py index b8d396c5c..046877ba5 100644 --- a/rdagent/scenarios/data_science/proposal/exp_gen/base.py +++ b/rdagent/scenarios/data_science/proposal/exp_gen/base.py @@ -3,6 +3,7 @@ from rdagent.app.data_science.conf import DS_RD_SETTING from rdagent.core.evolving_framework import KnowledgeBase +from rdagent.core.experiment import Experiment from rdagent.core.proposal import ExperimentFeedback, Hypothesis, Trace from rdagent.scenarios.data_science.experiment.experiment import COMPONENT, DSExperiment from rdagent.scenarios.data_science.scen import DataScienceScen @@ -93,6 +94,7 @@ def get_leaves(self) -> list[int, ...]: def sync_dag_parent_and_hist( self, + exp_and_fb: tuple[Experiment, ExperimentFeedback], ) -> None: """ Adding corresponding parent index to the dag_parent when the hist is going to be changed. @@ -111,6 +113,7 @@ def sync_dag_parent_and_hist( current_node_idx = len(self.hist) - 1 self.dag_parent.append((current_node_idx,)) + self.hist.append(exp_and_fb) def retrieve_search_list( self, @@ -171,7 +174,7 @@ def has_component( def experiment_and_feedback_list_after_init( self, return_type: Literal["sota", "failed", "all"], - search_type: Literal["all", "ancestors"] = "all", + search_type: Literal["all", "ancestors"] = "ancestors", selection: tuple[int, ...] | None = None, max_retrieve_num: int | None = None, ) -> list[tuple[DSExperiment, ExperimentFeedback]]: diff --git a/rdagent/scenarios/data_science/proposal/exp_gen/parallel.py b/rdagent/scenarios/data_science/proposal/exp_gen/parallel.py index 6fb947cae..9d05c5ed2 100644 --- a/rdagent/scenarios/data_science/proposal/exp_gen/parallel.py +++ b/rdagent/scenarios/data_science/proposal/exp_gen/parallel.py @@ -38,8 +38,7 @@ def __init__(self, *args, **kwargs): "rdagent.scenarios.data_science.proposal.exp_gen.DSExpGen", self.scen ) self.merge_exp_gen = ExpGen2Hypothesis(self.scen) - self.trace_scheduler: TraceScheduler = RoundRobinScheduler() - self.max_trace_num = DS_RD_SETTING.max_trace_num + self.trace_scheduler: TraceScheduler = RoundRobinScheduler(DS_RD_SETTING.max_trace_num) def gen(self, trace: "DSTrace") -> "Experiment": raise NotImplementedError( @@ -69,15 +68,9 @@ async def async_gen(self, trace: DSTrace, loop: LoopBase) -> DSExperiment: else: # set the knowledge base option back to False for the other traces DS_RD_SETTING.enable_knowledge_base = False - # step 1: select the parant trace to expand - # Policy: if we have fewer traces than our target, start a new one. - if trace.sub_trace_count < self.max_trace_num: - local_selection = trace.NEW_ROOT - else: - # Otherwise, use the scheduler to pick an existing trace to expand. - local_selection = await self.trace_scheduler.select_trace(trace) if loop.get_unfinished_loop_cnt(loop.loop_idx) < RD_AGENT_SETTINGS.get_max_parallel(): + local_selection = await self.trace_scheduler.next(trace) # set the local selection as the global current selection for the trace trace.set_current_selection(local_selection) diff --git a/rdagent/scenarios/data_science/proposal/exp_gen/trace_scheduler.py b/rdagent/scenarios/data_science/proposal/exp_gen/trace_scheduler.py index df4186727..b28be59a5 100644 --- a/rdagent/scenarios/data_science/proposal/exp_gen/trace_scheduler.py +++ b/rdagent/scenarios/data_science/proposal/exp_gen/trace_scheduler.py @@ -2,6 +2,7 @@ import asyncio from abc import ABC, abstractmethod +from collections import defaultdict from typing import TYPE_CHECKING if TYPE_CHECKING: @@ -15,11 +16,14 @@ class TraceScheduler(ABC): """ @abstractmethod - async def select_trace(self, trace: DSTrace) -> tuple[int, ...]: + async def next(self, trace: DSTrace) -> tuple[int, ...]: """ Selects the next trace to expand. - This method must be async to allow for safe concurrent access. + For proposing selections, we have to follow the rules + - Suggest selection: suggest a selection that is suitable for the current trace. + - Suggested should be garenteed to be recorded at last!!! + - If no suitable selection is found, the function should async wait!!!! Args: trace: The DSTrace object containing the full experiment history. @@ -39,31 +43,33 @@ class RoundRobinScheduler(TraceScheduler): NOTE: we don't need to use asyncio.Lock here as the kickoff_loop ensures the ExpGen is always sequential, instead of parallel. """ - def __init__(self): + def __init__(self, max_trace_num: int): + self.max_trace_num = max_trace_num self._last_selected_leaf_id = -1 + self.rec_commit_idx = 0 # the node before rec_idx is already committed. + self.uncommited_rec_status = defaultdict(int) # the uncommited record status - async def select_trace(self, trace: DSTrace) -> tuple[int, ...]: + async def next(self, trace: DSTrace) -> tuple[int, ...]: """ Atomically selects the next leaf node from the trace in order. """ - - leaves = trace.get_leaves() - if not leaves: - # This is the very first experiment in a new tree. - return trace.NEW_ROOT - - # Find the index of the last selected leaf in the current list of leaves - try: - current_position = leaves.index(self._last_selected_leaf_id) - # Move to the next position, wrapping around if necessary - next_position = (current_position + 1) % len(leaves) - except ValueError: - # This can happen if the last selected leaf is no longer a leaf - # (it has been expanded) or if this is the first selection. - # In either case, start from the beginning. - next_position = 0 - - selected_leaf = leaves[next_position] - self._last_selected_leaf_id = selected_leaf - - return (selected_leaf,) + while True: + # step 0: Commit the pending selections + for i in range(self.rec_commit_idx, len(trace.dag_parent)): + for p in trace.dag_parent[i]: + self.uncommited_rec_status[p] -= 1 + self.rec_commit_idx = len(trace.hist) + + # step 1: select the parant trace to expand + # Policy: if we have fewer traces than our target, start a new one. + if trace.sub_trace_count + self.uncommited_rec_status[trace.NEW_ROOT] < self.max_trace_num: + self.uncommited_rec_status[trace.NEW_ROOT] += 1 + return trace.NEW_ROOT + + # Step2: suggest a selection to a not expanding leave + leaves = trace.get_leaves() + for leaf in leaves: + if self.uncommited_rec_status[leaf] == 0: + self.uncommited_rec_status[leaf] += 1 + return (leaf,) + await asyncio.sleep(1)