Skip to content

Commit 2602b10

Browse files
authored
feat(tarko-agent-next): sync code from agent-server (#1663)
1 parent 008124a commit 2602b10

File tree

6 files changed

+127
-87
lines changed

6 files changed

+127
-87
lines changed

multimodal/tarko/agent-server-next/src/controllers/sessions.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,11 +47,11 @@ export async function createSession(c: HonoContext) {
4747
try {
4848
const server = c.get('server');
4949
const sessionFactory = server.getSessionFactory();
50-
const sessionManager = server.getSessionPool();
50+
const sessionPool = server.getSessionPool();
5151

52-
const { session, sessionInfo, storageUnsubscribe } = await sessionFactory.createSession(c);
52+
const { session, events, sessionInfo, storageUnsubscribe } = await sessionFactory.createSession(c);
5353

54-
sessionManager.set(session.id, session);
54+
sessionPool.set(session.id, session);
5555

5656
// Save unsubscribe function for cleanup
5757
if (storageUnsubscribe) {
@@ -62,6 +62,7 @@ export async function createSession(c: HonoContext) {
6262
{
6363
sessionId: session.id,
6464
session: sessionInfo,
65+
events
6566
},
6667
201,
6768
);

multimodal/tarko/agent-server-next/src/routes/sessions.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ export function createSessionRoutes(): Hono<{ Variables: ContextVariables }> {
2828
router.use('/api/v1/sessions/generate-summary', sessionRestoreMiddleware);
2929
router.use('/api/v1/sessions/share', sessionRestoreMiddleware);
3030
router.use('/api/v1/sessions/workspace/*', sessionRestoreMiddleware);
31-
router.use('/api/v1/sessions/runtime-settings', sessionRestoreMiddleware);
3231

3332
// Session-specific routes
3433
router.get('/api/v1/sessions/details', sessionsController.getSessionDetails);

multimodal/tarko/agent-server-next/src/routes/system.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,17 @@
66
import { Hono } from 'hono';
77
import * as systemController from '../controllers/system';
88
import type { ContextVariables } from '../types';
9+
import { sessionRestoreMiddleware } from '../middlewares';
910

1011
/**
1112
* Create system information routes
1213
*/
1314
export function createSystemRoutes(): Hono<{ Variables: ContextVariables }> {
1415
const router = new Hono<{ Variables: ContextVariables }>();
1516

17+
router.use('/api/v1/runtime-settings', sessionRestoreMiddleware);
18+
19+
1620
// Health check endpoint
1721
router.get('/api/v1/health', systemController.healthCheck);
1822

multimodal/tarko/agent-server-next/src/services/session/AgentSession.ts

Lines changed: 72 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -73,18 +73,62 @@ export class AgentSession {
7373
private agioProviderConstructor?: AgioProviderConstructor;
7474
public sessionInfo?: SessionInfo;
7575
private logger: ILogger;
76+
private storageUnsubscribeMap = new WeakMap<IAgent, () => void>();
77+
private pendingEventSaves = new Set<Promise<void>>();
78+
79+
80+
constructor(
81+
private server: AgentServer,
82+
sessionId: string,
83+
agioProviderImpl?: AgioProviderConstructor,
84+
sessionInfo?: SessionInfo,
85+
private agentOptions?: Record<string, any>, // One-time agent initialization options
86+
) {
87+
this.id = sessionId;
88+
this.eventBridge = new EventStreamBridge();
89+
this.sessionInfo = sessionInfo;
90+
this.agioProviderConstructor = agioProviderImpl;
91+
this.logger = getLogger('AgentSession');
92+
93+
// Agent will be created and initialized in initialize() method
94+
this.agent = null as any; // Temporary placeholder
95+
}
96+
97+
98+
async initialize() {
99+
// Create and initialize agent with all wrappers
100+
// Event streams are now set up within createAndInitializeAgent before agent.initialize()
101+
this.agent = await this.createAndInitializeAgent(this.sessionInfo);
102+
103+
// Extract the storage unsubscribe function from our WeakMap
104+
const storageUnsubscribe = this.storageUnsubscribeMap.get(this.agent);
105+
106+
// Clean up the WeakMap entry
107+
this.storageUnsubscribeMap.delete(this.agent);
108+
109+
// Notify client that session is ready
110+
this.eventBridge.emit('ready', { sessionId: this.id });
111+
112+
return { storageUnsubscribe };
113+
}
114+
115+
76116
/**
77117
* Create event handler for storage and AGIO processing
78118
*/
79119
private createEventHandler() {
80120
return async (event: AgentEventStream.Event) => {
81121
// Save to storage if available and event should be stored
82122
if (shouldStoreEvent(event)) {
83-
try {
84-
await this.server.daoFactory.saveEvent(this.id, event);
85-
} catch (error) {
86-
console.error(`Failed to save event to storage: ${error}`);
87-
}
123+
const savePromise = this.server.daoFactory.saveEvent(this.id, event)
124+
.catch(error => {
125+
console.error(`Failed to save event to storage: ${error}`);
126+
})
127+
.finally(() => {
128+
this.pendingEventSaves.delete(savePromise);
129+
});
130+
131+
this.pendingEventSaves.add(savePromise);
88132
}
89133

90134
// Process AGIO events if collector is configured
@@ -98,23 +142,14 @@ export class AgentSession {
98142
};
99143
}
100144

101-
/**
102-
* Setup event stream connections for storage and client communication
145+
/**
146+
* Wait for all pending event saves to complete
147+
* This ensures that all events emitted during initialization are persisted before querying storage
103148
*/
104-
private setupEventStreams() {
105-
const agentEventStream = this.agent.getEventStream();
106-
const handleEvent = this.createEventHandler();
107-
108-
// Subscribe to events for storage and AGIO processing
109-
const storageUnsubscribe = agentEventStream.subscribe(handleEvent);
110-
111-
// Connect to event bridge for client communication
112-
if (this.unsubscribe) {
113-
this.unsubscribe();
149+
async waitForEventSavesToComplete(): Promise<void> {
150+
if (this.pendingEventSaves.size > 0) {
151+
await Promise.all(Array.from(this.pendingEventSaves));
114152
}
115-
this.unsubscribe = this.eventBridge.connectToAgentEventStream(agentEventStream);
116-
117-
return { storageUnsubscribe };
118153
}
119154

120155
/**
@@ -138,7 +173,7 @@ export class AgentSession {
138173
name: this.server.getCurrentAgentName(),
139174
model: this.resolveModelConfig(sessionInfo),
140175
sandboxUrl: sessionInfo?.metadata?.sandboxUrl,
141-
initialEvents: storedEvents, // 🎯 Pass initial events directly to agent
176+
initialEvents: storedEvents, // Pass initial events directly to agent
142177
};
143178

144179
// Apply runtime settings transformation if available
@@ -166,7 +201,23 @@ export class AgentSession {
166201
// Apply snapshot wrapper if enabled
167202
const wrappedAgent = this.createAgentWithSnapshot(baseAgent, this.id);
168203

204+
205+
// 🎯 Setup event stream connections BEFORE agent initialization
206+
// This ensures that any events emitted during initialize() are properly persisted
207+
const agentEventStream = wrappedAgent.getEventStream();
208+
const handleEvent = this.createEventHandler();
209+
210+
// Subscribe to events for storage and AGIO processing before initialization
211+
const storageUnsubscribe = agentEventStream.subscribe(handleEvent);
212+
213+
// Connect to event bridge for client communication before initialization
214+
if (this.unsubscribe) {
215+
this.unsubscribe();
216+
}
217+
this.unsubscribe = this.eventBridge.connectToAgentEventStream(agentEventStream);
218+
169219
// Initialize the agent (this will automatically restore events)
220+
// Now any events emitted during initialize() will be properly persisted
170221
await wrappedAgent.initialize();
171222

172223
// Initialize AGIO collector if provider URL is configured
@@ -265,22 +316,6 @@ export class AgentSession {
265316
return baseAgent;
266317
}
267318

268-
constructor(
269-
private server: AgentServer,
270-
sessionId: string,
271-
agioProviderImpl?: AgioProviderConstructor,
272-
sessionInfo?: SessionInfo,
273-
private agentOptions?: Record<string, any>, // One-time agent initialization options
274-
) {
275-
this.id = sessionId;
276-
this.eventBridge = new EventStreamBridge();
277-
this.sessionInfo = sessionInfo;
278-
this.agioProviderConstructor = agioProviderImpl;
279-
this.logger = getLogger('AgentSession');
280-
281-
// Agent will be created and initialized in initialize() method
282-
this.agent = null as any; // Temporary placeholder
283-
}
284319

285320
/**
286321
* Get the current processing status of the agent
@@ -290,19 +325,6 @@ export class AgentSession {
290325
return this.agent.status() === AgentStatus.EXECUTING;
291326
}
292327

293-
async initialize() {
294-
// Create and initialize agent with all wrappers
295-
this.agent = await this.createAndInitializeAgent(this.sessionInfo);
296-
297-
// Setup event stream connections
298-
const { storageUnsubscribe } = this.setupEventStreams();
299-
300-
// Notify client that session is ready
301-
this.eventBridge.emit('ready', { sessionId: this.id });
302-
303-
return { storageUnsubscribe };
304-
}
305-
306328
/**
307329
* Run a query and return a strongly-typed response
308330
* This version captures errors and returns structured response objects
@@ -522,9 +544,6 @@ export class AgentSession {
522544

523545
// Create and initialize new agent with updated session info
524546
this.agent = await this.createAndInitializeAgent(sessionInfo);
525-
526-
// Reconnect event streams
527-
this.setupEventStreams();
528547
} catch (error) {
529548
console.error('Failed to recreate agent for session', { sessionId: this.id, error });
530549
throw error;

multimodal/tarko/agent-server-next/src/services/session/AgentSessionFactory.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,13 +46,15 @@ export class AgentSessionFactory {
4646
session: AgentSession;
4747
sessionInfo?: SessionInfo;
4848
storageUnsubscribe?: () => void;
49+
events: any[]
4950
}> {
5051
const sessionId = nanoid();
5152
const user = getCurrentUser(c);
5253
const server = c.get('server');
5354

5455
// Get runtimeSettings and agentOptions from request body
5556
const body = await c.req.json().catch(() => ({}));
57+
5658
const { runtimeSettings, agentOptions } = body as {
5759
runtimeSettings?: Record<string, any>;
5860
agentOptions?: Record<string, any>;
@@ -129,10 +131,20 @@ export class AgentSessionFactory {
129131
}
130132
}
131133

134+
// Wait a short time to ensure all initialization events are persisted
135+
// This handles the async nature of event storage during agent initialization
136+
await session.waitForEventSavesToComplete();
137+
138+
// Get events that were created during agent initialization
139+
let initializationEvents = await server.daoFactory.getSessionEvents(sessionId);
140+
141+
console.log('Return initializationEvents', initializationEvents);
142+
132143
return {
133144
session,
134145
sessionInfo: savedSessionInfo,
135146
storageUnsubscribe,
147+
events: initializationEvents,
136148
};
137149
}
138150

multimodal/tarko/agent-server/src/core/AgentSession.ts

Lines changed: 35 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,41 @@ export class AgentSession {
7373
private storageUnsubscribeMap = new WeakMap<IAgent, () => void>();
7474
private pendingEventSaves = new Set<Promise<void>>();
7575

76+
77+
constructor(
78+
private server: AgentServer,
79+
sessionId: string,
80+
agioProviderImpl?: AgioProviderConstructor,
81+
sessionInfo?: SessionInfo,
82+
private agentOptions?: Record<string, any>, // One-time agent initialization options
83+
) {
84+
this.id = sessionId;
85+
this.eventBridge = new EventStreamBridge();
86+
this.sessionInfo = sessionInfo;
87+
this.agioProviderConstructor = agioProviderImpl;
88+
89+
// Agent will be created and initialized in initialize() method
90+
this.agent = null as any; // Temporary placeholder
91+
}
92+
93+
94+
async initialize() {
95+
// Create and initialize agent with all wrappers
96+
// Event streams are now set up within createAndInitializeAgent before agent.initialize()
97+
this.agent = await this.createAndInitializeAgent(this.sessionInfo);
98+
99+
// Extract the storage unsubscribe function from our WeakMap
100+
const storageUnsubscribe = this.storageUnsubscribeMap.get(this.agent);
101+
102+
// Clean up the WeakMap entry
103+
this.storageUnsubscribeMap.delete(this.agent);
104+
105+
// Notify client that session is ready
106+
this.eventBridge.emit('ready', { sessionId: this.id });
107+
108+
return { storageUnsubscribe };
109+
}
110+
76111
/**
77112
* Create event handler for storage and AGIO processing
78113
*/
@@ -262,21 +297,7 @@ export class AgentSession {
262297
return baseAgent;
263298
}
264299

265-
constructor(
266-
private server: AgentServer,
267-
sessionId: string,
268-
agioProviderImpl?: AgioProviderConstructor,
269-
sessionInfo?: SessionInfo,
270-
private agentOptions?: Record<string, any>, // One-time agent initialization options
271-
) {
272-
this.id = sessionId;
273-
this.eventBridge = new EventStreamBridge();
274-
this.sessionInfo = sessionInfo;
275-
this.agioProviderConstructor = agioProviderImpl;
276300

277-
// Agent will be created and initialized in initialize() method
278-
this.agent = null as any; // Temporary placeholder
279-
}
280301

281302
/**
282303
* Get the current processing status of the agent
@@ -286,22 +307,6 @@ export class AgentSession {
286307
return this.agent.status() === AgentStatus.EXECUTING;
287308
}
288309

289-
async initialize() {
290-
// Create and initialize agent with all wrappers
291-
// Event streams are now set up within createAndInitializeAgent before agent.initialize()
292-
this.agent = await this.createAndInitializeAgent(this.sessionInfo);
293-
294-
// Extract the storage unsubscribe function from our WeakMap
295-
const storageUnsubscribe = this.storageUnsubscribeMap.get(this.agent);
296-
297-
// Clean up the WeakMap entry
298-
this.storageUnsubscribeMap.delete(this.agent);
299-
300-
// Notify client that session is ready
301-
this.eventBridge.emit('ready', { sessionId: this.id });
302-
303-
return { storageUnsubscribe };
304-
}
305310

306311
/**
307312
* Run a query and return a strongly-typed response

0 commit comments

Comments
 (0)