@@ -221,6 +221,7 @@ async def _run_step(self, li: int, force_subproc: bool = False) -> None:
221221 # NOTE: each step are aware are of current loop index
222222 # It is very important to set it before calling the step function!
223223 self .loop_prev_out [li ][self .LOOP_IDX_KEY ] = li
224+
224225 try :
225226 # Call function with current loop's output, await if coroutine or use ProcessPoolExecutor for sync if required
226227 if force_subproc :
@@ -236,9 +237,6 @@ async def _run_step(self, li: int, force_subproc: bool = False) -> None:
236237 result = func (self .loop_prev_out [li ])
237238 # Store result in the nested dictionary
238239 self .loop_prev_out [li ][name ] = result
239-
240- # Save snapshot after completing the step
241- self .dump (self .session_folder / f"{ li } " / f"{ si } _{ name } " )
242240 except Exception as e :
243241 if isinstance (e , self .skip_loop_error ):
244242 logger .warning (f"Skip loop { li } due to { e } " )
@@ -256,6 +254,8 @@ async def _run_step(self, li: int, force_subproc: bool = False) -> None:
256254 else :
257255 raise # re-raise unhandled exceptions
258256 finally :
257+ # No matter the execution succeed or not, we have to finish the following steps
258+
259259 # Record the trace
260260 end = datetime .now (timezone .utc )
261261 self .loop_trace [li ].append (LoopTrace (start , end , step_idx = si ))
@@ -279,6 +279,12 @@ async def _run_step(self, li: int, force_subproc: bool = False) -> None:
279279 step_index = next_step ,
280280 step_name = self .steps [next_step ],
281281 )
282+
283+ # Save snapshot after completing the step;
284+ # 1) It has to be after the step_idx is updated, so loading the snapshot will be on the right step.
285+ # 2) Only save it when the step forward, withdraw does not worth saving.
286+ self .dump (self .session_folder / f"{ li } " / f"{ si } _{ name } " )
287+
282288 self ._check_exit_conditions_on_step (loop_id = li , step_id = si )
283289 else :
284290 logger .warning (f"Step forward { si } of loop { li } is skipped." )
0 commit comments