Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,11 @@ pre-commit:

# Auto lint with black.
auto-black:
$(PIPRUN) python -m black . --extend-exclude test/scripts --extend-exclude git_ignore_folder -l 120
$(PIPRUN) python -m black . --extend-exclude test/scripts --extend-exclude git_ignore_folder --extend-exclude .venv -l 120

# Auto lint with isort.
auto-isort:
$(PIPRUN) python -m isort . -s git_ignore_folder -s test/scripts
$(PIPRUN) python -m isort . -s git_ignore_folder -s test/scripts -s .venv

# Auto lint with toml-sort.
auto-toml-sort:
Expand Down
2 changes: 1 addition & 1 deletion rdagent/log/utils/folder.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def get_first_session_file_after_duration(log_folder: str | Path, duration: str
session_obj: LoopBase = pickle.load(f)
timer = session_obj.timer
all_duration = timer.all_duration
remain_time_duration = timer.remain_time_duration
remain_time_duration = timer.remain_time()
if all_duration is None or remain_time_duration is None:
msg = "Timer is not configured"
raise ValueError(msg)
Expand Down
9 changes: 2 additions & 7 deletions rdagent/scenarios/data_science/loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,19 +228,14 @@ def record(self, prev_out: dict[str, Any]):
# set the local selection to the trace as global selection, then set the DAG parent for the trace
if exp.local_selection is not None:
self.trace.set_current_selection(exp.local_selection)
self.trace.sync_dag_parent_and_hist()

self.trace.hist.append((exp, prev_out["feedback"]))

self.trace.sync_dag_parent_and_hist((exp, prev_out["feedback"]))
else:
exp: DSExperiment = prev_out["direct_exp_gen"] if isinstance(e, CoderError) else prev_out["coding"]

# set the local selection to the trace as global selection, then set the DAG parent for the trace
if exp.local_selection is not None:
self.trace.set_current_selection(exp.local_selection)
self.trace.sync_dag_parent_and_hist()

self.trace.hist.append(
self.trace.sync_dag_parent_and_hist(
(
exp,
ExperimentFeedback.from_exception(e),
Expand Down
5 changes: 4 additions & 1 deletion rdagent/scenarios/data_science/proposal/exp_gen/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from rdagent.app.data_science.conf import DS_RD_SETTING
from rdagent.core.evolving_framework import KnowledgeBase
from rdagent.core.experiment import Experiment
from rdagent.core.proposal import ExperimentFeedback, Hypothesis, Trace
from rdagent.scenarios.data_science.experiment.experiment import COMPONENT, DSExperiment
from rdagent.scenarios.data_science.scen import DataScienceScen
Expand Down Expand Up @@ -93,6 +94,7 @@ def get_leaves(self) -> list[int, ...]:

def sync_dag_parent_and_hist(
self,
exp_and_fb: tuple[Experiment, ExperimentFeedback],
) -> None:
"""
Adding corresponding parent index to the dag_parent when the hist is going to be changed.
Expand All @@ -111,6 +113,7 @@ def sync_dag_parent_and_hist(
current_node_idx = len(self.hist) - 1

self.dag_parent.append((current_node_idx,))
self.hist.append(exp_and_fb)

def retrieve_search_list(
self,
Expand Down Expand Up @@ -171,7 +174,7 @@ def has_component(
def experiment_and_feedback_list_after_init(
self,
return_type: Literal["sota", "failed", "all"],
search_type: Literal["all", "ancestors"] = "all",
search_type: Literal["all", "ancestors"] = "ancestors",
selection: tuple[int, ...] | None = None,
max_retrieve_num: int | None = None,
) -> list[tuple[DSExperiment, ExperimentFeedback]]:
Expand Down
11 changes: 2 additions & 9 deletions rdagent/scenarios/data_science/proposal/exp_gen/parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ def __init__(self, *args, **kwargs):
"rdagent.scenarios.data_science.proposal.exp_gen.DSExpGen", self.scen
)
self.merge_exp_gen = ExpGen2Hypothesis(self.scen)
self.trace_scheduler: TraceScheduler = RoundRobinScheduler()
self.max_trace_num = DS_RD_SETTING.max_trace_num
self.trace_scheduler: TraceScheduler = RoundRobinScheduler(DS_RD_SETTING.max_trace_num)

def gen(self, trace: "DSTrace") -> "Experiment":
raise NotImplementedError(
Expand Down Expand Up @@ -69,15 +68,9 @@ async def async_gen(self, trace: DSTrace, loop: LoopBase) -> DSExperiment:
else:
# set the knowledge base option back to False for the other traces
DS_RD_SETTING.enable_knowledge_base = False
# step 1: select the parent trace to expand
# Policy: if we have fewer traces than our target, start a new one.
if trace.sub_trace_count < self.max_trace_num:
local_selection = trace.NEW_ROOT
else:
# Otherwise, use the scheduler to pick an existing trace to expand.
local_selection = await self.trace_scheduler.select_trace(trace)

if loop.get_unfinished_loop_cnt(loop.loop_idx) < RD_AGENT_SETTINGS.get_max_parallel():
local_selection = await self.trace_scheduler.next(trace)

# set the local selection as the global current selection for the trace
trace.set_current_selection(local_selection)
Expand Down
56 changes: 31 additions & 25 deletions rdagent/scenarios/data_science/proposal/exp_gen/trace_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import asyncio
from abc import ABC, abstractmethod
from collections import defaultdict
from typing import TYPE_CHECKING

if TYPE_CHECKING:
Expand All @@ -15,11 +16,14 @@ class TraceScheduler(ABC):
"""

@abstractmethod
async def select_trace(self, trace: DSTrace) -> tuple[int, ...]:
async def next(self, trace: DSTrace) -> tuple[int, ...]:
"""
Selects the next trace to expand.

This method must be async to allow for safe concurrent access.
For proposing selections, we have to follow the rules
- Suggest selection: suggest a selection that is suitable for the current trace.
- A suggested selection should be guaranteed to be recorded in the end!!!
- If no suitable selection is found, the function should async wait!!!!

Args:
trace: The DSTrace object containing the full experiment history.
Expand All @@ -39,31 +43,33 @@ class RoundRobinScheduler(TraceScheduler):
NOTE: we don't need to use asyncio.Lock here as the kickoff_loop ensures the ExpGen is always sequential, instead of parallel.
"""

def __init__(self):
def __init__(self, max_trace_num: int):
self.max_trace_num = max_trace_num
self._last_selected_leaf_id = -1
self.rec_commit_idx = 0 # the node before rec_idx is already committed.
self.uncommited_rec_status = defaultdict(int) # the uncommited record status

async def select_trace(self, trace: DSTrace) -> tuple[int, ...]:
async def next(self, trace: DSTrace) -> tuple[int, ...]:
"""
Atomically selects the next leaf node from the trace in order.
"""

leaves = trace.get_leaves()
if not leaves:
# This is the very first experiment in a new tree.
return trace.NEW_ROOT

# Find the index of the last selected leaf in the current list of leaves
try:
current_position = leaves.index(self._last_selected_leaf_id)
# Move to the next position, wrapping around if necessary
next_position = (current_position + 1) % len(leaves)
except ValueError:
# This can happen if the last selected leaf is no longer a leaf
# (it has been expanded) or if this is the first selection.
# In either case, start from the beginning.
next_position = 0

selected_leaf = leaves[next_position]
self._last_selected_leaf_id = selected_leaf

return (selected_leaf,)
while True:
# step 0: Commit the pending selections
for i in range(self.rec_commit_idx, len(trace.dag_parent)):
for p in trace.dag_parent[i]:
Copy link
Collaborator

@xuangu-fang xuangu-fang Jul 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

trace.dag_parent[i] is a single-element tuple now; is the for-loop here prepared for a multi-parent case in the future? If so, what if a parent has multiple children? We would then perform duplicate operations on it.

self.uncommited_rec_status[p] -= 1
self.rec_commit_idx = len(trace.hist)

# step 1: select the parent trace to expand
# Policy: if we have fewer traces than our target, start a new one.
if trace.sub_trace_count + self.uncommited_rec_status[trace.NEW_ROOT] < self.max_trace_num:
self.uncommited_rec_status[trace.NEW_ROOT] += 1
return trace.NEW_ROOT

# Step 2: suggest a leaf that is not currently being expanded
leaves = trace.get_leaves()
for leaf in leaves:
if self.uncommited_rec_status[leaf] == 0:
self.uncommited_rec_status[leaf] += 1
return (leaf,)
await asyncio.sleep(1)