Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion rdagent/core/proposal.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,11 +127,29 @@ class Trace(Generic[ASpecificScen, ASpecificKB]):

def __init__(self, scen: ASpecificScen, knowledge_base: ASpecificKB | None = None) -> None:
self.scen: ASpecificScen = scen

# BEGIN: graph structure -------------------------
self.hist: list[Trace.NodeType] = (
[]
) # List of tuples containing experiments and their feedback, organized over time.
self.dag_parent: list[tuple[int, ...]] = [] # List of tuples representing parent indices in the DAG structure.
# (,) represents no parent; (1,) presents one parent; (1, 2) represents two parents.
# Definition:
# - () represents no parent (the root node of a tree);
# - (1,) represents one parent;
# - (1, 2) represents two parents (multiple parents are not implemented yet).
# Syntax sugar for the parent relationship:
# - Only for selection:
# - (-1,) selects the most recently recorded node as the parent.

# NOTE: hist and dag_parent are ordered by when each experiment is recorded,
# which may differ from the order of the loop_id.
# We therefore need an extra mapping from the enqueue id back to the loop id.
self.idx2loop_id: dict[int, int] = {}

# Design discussion:
# - If we unify the loop_id and the enqueue id, there is less cognitive burden.
# - If we use different ids for loop and enqueue, we don't have to handle the placeholder logic.
# END: graph structure -------------------------

# TODO: self.hist is 2-tuple now, remove hypothesis from it, change old code for this later.
self.knowledge_base: ASpecificKB | None = knowledge_base
Expand Down Expand Up @@ -227,9 +245,13 @@ def get_selection(self, trace: Trace) -> tuple[int, ...] | None:
checkpoint_idx represents the place where we want to create a new node.
the return value should be the idx of target node (the parent of the new generating node).
- `(-1, )` represents starting from the latest trial in the trace - default value

- NOTE: we discourage using this option; it is confusing when there are multiple traces.

- `(idx, )` represents starting from the `idx`-th trial in the trace.
- `None` represents starting from scratch (start a new trace)


- More advanced selection strategies in `select.py`
"""

Expand Down
15 changes: 12 additions & 3 deletions rdagent/log/ui/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -606,13 +606,22 @@ def trace_figure(trace: Trace):
for i in range(len(trace.dag_parent)):
levels[i] = len(trace.get_parents(i))

def get_display_name(idx: int):
    """
    Build the node label for the record at position `idx` of the trace.

    When the trace carries an `idx2loop_id` mapping, the label shows the mapped
    loop id instead of the queue (enqueue) position, which is easier to follow.
    """
    # FIXME: the attribute check exists only for compatibility with traces
    # that lack `idx2loop_id`; drop the fallback once the mapping is stable.
    if not hasattr(trace, "idx2loop_id"):
        return f"L{idx}"
    loop_id = trace.idx2loop_id[idx]
    return f"L{loop_id}"

# Add nodes and edges
edges = []
for i, parents in enumerate(trace.dag_parent):
for parent in parents:
edges.append((f"L{parent}", f"L{i}"))
edges.append((get_display_name(parent), get_display_name(i)))
if len(parents) == 0:
G.add_node(f"L{i}")
G.add_node(get_display_name(i))
G.add_edges_from(edges)

# Check if G is a path (a single line)
Expand Down Expand Up @@ -643,7 +652,7 @@ def trace_figure(trace: Trace):
# Group nodes by number of ancestors, fewer ancestors are higher up
layer_nodes = {}
for idx, lvl in levels.items():
layer_nodes.setdefault(lvl, []).append(f"L{idx}")
layer_nodes.setdefault(lvl, []).append(get_display_name(idx))

# Layout by level: y axis is -lvl, x axis is evenly distributed
pos = {}
Expand Down
8 changes: 6 additions & 2 deletions rdagent/scenarios/data_science/loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,8 @@ def record(self, prev_out: dict[str, Any]):

exp: DSExperiment = None

cur_loop_id = prev_out[self.LOOP_IDX_KEY]

e = prev_out.get(self.EXCEPTION_KEY, None)
if e is None:
exp = prev_out["running"]
Expand All @@ -210,18 +212,20 @@ def record(self, prev_out: dict[str, Any]):
# set the local selection to the trace as global selection, then set the DAG parent for the trace
if exp.local_selection is not None:
self.trace.set_current_selection(exp.local_selection)
self.trace.sync_dag_parent_and_hist((exp, prev_out["feedback"]))
self.trace.sync_dag_parent_and_hist((exp, prev_out["feedback"]), cur_loop_id)
else:
exp: DSExperiment = prev_out["direct_exp_gen"] if isinstance(e, CoderError) else prev_out["coding"]

# set the local selection to the trace as global selection, then set the DAG parent for the trace
if exp.local_selection is not None:
self.trace.set_current_selection(exp.local_selection)

self.trace.sync_dag_parent_and_hist(
(
exp,
ExperimentFeedback.from_exception(e),
)
),
cur_loop_id,
)

if self.trace.sota_experiment() is None:
Expand Down
16 changes: 5 additions & 11 deletions rdagent/scenarios/data_science/proposal/exp_gen/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,18 +49,10 @@ def __str__(self) -> str:

class DSTrace(Trace[DataScienceScen, KnowledgeBase]):
def __init__(self, scen: DataScienceScen, knowledge_base: KnowledgeBase | None = None) -> None:
self.scen: DataScienceScen = scen
self.hist: list[tuple[DSExperiment, ExperimentFeedback]] = []
"""
The dag_parent is a list of tuples, each tuple is the parent index of the current node.
The first element of the tuple is the parent index, the rest are the parent indexes of the parent (not implemented yet).
If the current node is the root node without parent, the tuple is empty.
"""
self.dag_parent: list[tuple[int, ...]] = [] # List of tuples representing parent indices in the DAG structure.
# () represents no parent; (1,) presents one parent; (1, 2) represents two parents.
super().__init__(scen, knowledge_base)

self.knowledge_base = knowledge_base
self.current_selection: tuple[int, ...] = (-1,)
# NOTE: this line is just for linting.
self.hist: list[tuple[DSExperiment, ExperimentFeedback] | None] = []

self.sota_exp_to_submit: DSExperiment | None = None # grab the global best exp to submit

Expand Down Expand Up @@ -95,6 +87,7 @@ def get_leaves(self) -> list[int, ...]:
def sync_dag_parent_and_hist(
self,
exp_and_fb: tuple[Experiment, ExperimentFeedback],
cur_loop_id: int,
) -> None:
"""
Adding corresponding parent index to the dag_parent when the hist is going to be changed.
Expand All @@ -114,6 +107,7 @@ def sync_dag_parent_and_hist(

self.dag_parent.append((current_node_idx,))
self.hist.append(exp_and_fb)
self.idx2loop_id[len(self.hist) - 1] = cur_loop_id

def retrieve_search_list(
self,
Expand Down
8 changes: 7 additions & 1 deletion rdagent/utils/workflow/loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ class LoopBase:
] = () # you can define a list of error that will withdraw current loop

EXCEPTION_KEY = "_EXCEPTION"
LOOP_IDX_KEY = "_LOOP_IDX"
SENTINEL = -1

_pbar: tqdm # progress bar instance
Expand All @@ -113,7 +114,9 @@ def __init__(self) -> None:
self.step_idx: defaultdict[int, int] = defaultdict(int) # dict from loop index to next step index
self.queue: asyncio.Queue[Any] = asyncio.Queue()

# Store step results for all loops in a nested dictionary: loop_prev_out[loop_index][step_name]
# Store step results for all loops in a nested dictionary. The following information is stored:
# - loop_prev_out[loop_index][step_name]: the output of the step function
# - loop_prev_out[loop_index][<special keys like LOOP_IDX_KEY or EXCEPTION_KEY>]: the special keys
self.loop_prev_out: dict[int, dict[str, Any]] = defaultdict(dict)
self.loop_trace = defaultdict(list[LoopTrace]) # the key is the number of loop
self.session_folder = Path(LOG_SETTINGS.trace_path) / "__session__"
Expand Down Expand Up @@ -213,6 +216,9 @@ async def _run_step(self, li: int, force_subproc: bool = False) -> None:

next_step_idx = si + 1
step_forward = True
# NOTE: every step is aware of the current loop index.
# It is very important to set it before calling the step function!
self.loop_prev_out[li][self.LOOP_IDX_KEY] = li
try:
# Call function with current loop's output, await if coroutine or use ProcessPoolExecutor for sync if required
if force_subproc:
Expand Down