Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,11 @@ export async function createSession(c: HonoContext) {
try {
const server = c.get('server');
const sessionFactory = server.getSessionFactory();
const sessionManager = server.getSessionPool();
const sessionPool = server.getSessionPool();

const { session, sessionInfo, storageUnsubscribe } = await sessionFactory.createSession(c);
const { session, events, sessionInfo, storageUnsubscribe } = await sessionFactory.createSession(c);

sessionManager.set(session.id, session);
sessionPool.set(session.id, session);

// Save unsubscribe function for cleanup
if (storageUnsubscribe) {
Expand All @@ -62,6 +62,7 @@ export async function createSession(c: HonoContext) {
{
sessionId: session.id,
session: sessionInfo,
events
},
201,
);
Expand Down
1 change: 0 additions & 1 deletion multimodal/tarko/agent-server-next/src/routes/sessions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ export function createSessionRoutes(): Hono<{ Variables: ContextVariables }> {
router.use('/api/v1/sessions/generate-summary', sessionRestoreMiddleware);
router.use('/api/v1/sessions/share', sessionRestoreMiddleware);
router.use('/api/v1/sessions/workspace/*', sessionRestoreMiddleware);
router.use('/api/v1/sessions/runtime-settings', sessionRestoreMiddleware);

// Session-specific routes
router.get('/api/v1/sessions/details', sessionsController.getSessionDetails);
Expand Down
4 changes: 4 additions & 0 deletions multimodal/tarko/agent-server-next/src/routes/system.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,17 @@
import { Hono } from 'hono';
import * as systemController from '../controllers/system';
import type { ContextVariables } from '../types';
import { sessionRestoreMiddleware } from '../middlewares';

/**
* Create system information routes
*/
export function createSystemRoutes(): Hono<{ Variables: ContextVariables }> {
const router = new Hono<{ Variables: ContextVariables }>();

router.use('/api/v1/runtime-settings', sessionRestoreMiddleware);


// Health check endpoint
router.get('/api/v1/health', systemController.healthCheck);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,18 +73,62 @@ export class AgentSession {
private agioProviderConstructor?: AgioProviderConstructor;
public sessionInfo?: SessionInfo;
private logger: ILogger;
private storageUnsubscribeMap = new WeakMap<IAgent, () => void>();
private pendingEventSaves = new Set<Promise<void>>();


constructor(
private server: AgentServer,
sessionId: string,
agioProviderImpl?: AgioProviderConstructor,
sessionInfo?: SessionInfo,
private agentOptions?: Record<string, any>, // One-time agent initialization options
) {
this.id = sessionId;
this.eventBridge = new EventStreamBridge();
this.sessionInfo = sessionInfo;
this.agioProviderConstructor = agioProviderImpl;
this.logger = getLogger('AgentSession');

// Agent will be created and initialized in initialize() method
this.agent = null as any; // Temporary placeholder
}


async initialize() {
// Create and initialize agent with all wrappers
// Event streams are now set up within createAndInitializeAgent before agent.initialize()
this.agent = await this.createAndInitializeAgent(this.sessionInfo);

// Extract the storage unsubscribe function from our WeakMap
const storageUnsubscribe = this.storageUnsubscribeMap.get(this.agent);

// Clean up the WeakMap entry
this.storageUnsubscribeMap.delete(this.agent);

// Notify client that session is ready
this.eventBridge.emit('ready', { sessionId: this.id });

return { storageUnsubscribe };
}


/**
* Create event handler for storage and AGIO processing
*/
private createEventHandler() {
return async (event: AgentEventStream.Event) => {
// Save to storage if available and event should be stored
if (shouldStoreEvent(event)) {
try {
await this.server.daoFactory.saveEvent(this.id, event);
} catch (error) {
console.error(`Failed to save event to storage: ${error}`);
}
const savePromise = this.server.daoFactory.saveEvent(this.id, event)
.catch(error => {
console.error(`Failed to save event to storage: ${error}`);
})
.finally(() => {
this.pendingEventSaves.delete(savePromise);
});

this.pendingEventSaves.add(savePromise);
}

// Process AGIO events if collector is configured
Expand All @@ -98,23 +142,14 @@ export class AgentSession {
};
}

/**
* Setup event stream connections for storage and client communication
/**
* Wait for all pending event saves to complete
* This ensures that all events emitted during initialization are persisted before querying storage
*/
private setupEventStreams() {
const agentEventStream = this.agent.getEventStream();
const handleEvent = this.createEventHandler();

// Subscribe to events for storage and AGIO processing
const storageUnsubscribe = agentEventStream.subscribe(handleEvent);

// Connect to event bridge for client communication
if (this.unsubscribe) {
this.unsubscribe();
async waitForEventSavesToComplete(): Promise<void> {
if (this.pendingEventSaves.size > 0) {
await Promise.all(Array.from(this.pendingEventSaves));
}
this.unsubscribe = this.eventBridge.connectToAgentEventStream(agentEventStream);

return { storageUnsubscribe };
}

/**
Expand All @@ -138,7 +173,7 @@ export class AgentSession {
name: this.server.getCurrentAgentName(),
model: this.resolveModelConfig(sessionInfo),
sandboxUrl: sessionInfo?.metadata?.sandboxUrl,
initialEvents: storedEvents, // 🎯 Pass initial events directly to agent
initialEvents: storedEvents, // Pass initial events directly to agent
};

// Apply runtime settings transformation if available
Expand Down Expand Up @@ -166,7 +201,23 @@ export class AgentSession {
// Apply snapshot wrapper if enabled
const wrappedAgent = this.createAgentWithSnapshot(baseAgent, this.id);


// 🎯 Setup event stream connections BEFORE agent initialization
// This ensures that any events emitted during initialize() are properly persisted
const agentEventStream = wrappedAgent.getEventStream();
const handleEvent = this.createEventHandler();

// Subscribe to events for storage and AGIO processing before initialization
const storageUnsubscribe = agentEventStream.subscribe(handleEvent);

// Connect to event bridge for client communication before initialization
if (this.unsubscribe) {
this.unsubscribe();
}
this.unsubscribe = this.eventBridge.connectToAgentEventStream(agentEventStream);

// Initialize the agent (this will automatically restore events)
// Now any events emitted during initialize() will be properly persisted
await wrappedAgent.initialize();

// Initialize AGIO collector if provider URL is configured
Expand Down Expand Up @@ -265,22 +316,6 @@ export class AgentSession {
return baseAgent;
}

constructor(
private server: AgentServer,
sessionId: string,
agioProviderImpl?: AgioProviderConstructor,
sessionInfo?: SessionInfo,
private agentOptions?: Record<string, any>, // One-time agent initialization options
) {
this.id = sessionId;
this.eventBridge = new EventStreamBridge();
this.sessionInfo = sessionInfo;
this.agioProviderConstructor = agioProviderImpl;
this.logger = getLogger('AgentSession');

// Agent will be created and initialized in initialize() method
this.agent = null as any; // Temporary placeholder
}

/**
* Get the current processing status of the agent
Expand All @@ -290,19 +325,6 @@ export class AgentSession {
return this.agent.status() === AgentStatus.EXECUTING;
}

async initialize() {
// Create and initialize agent with all wrappers
this.agent = await this.createAndInitializeAgent(this.sessionInfo);

// Setup event stream connections
const { storageUnsubscribe } = this.setupEventStreams();

// Notify client that session is ready
this.eventBridge.emit('ready', { sessionId: this.id });

return { storageUnsubscribe };
}

/**
* Run a query and return a strongly-typed response
* This version captures errors and returns structured response objects
Expand Down Expand Up @@ -522,9 +544,6 @@ export class AgentSession {

// Create and initialize new agent with updated session info
this.agent = await this.createAndInitializeAgent(sessionInfo);

// Reconnect event streams
this.setupEventStreams();
} catch (error) {
console.error('Failed to recreate agent for session', { sessionId: this.id, error });
throw error;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,15 @@ export class AgentSessionFactory {
session: AgentSession;
sessionInfo?: SessionInfo;
storageUnsubscribe?: () => void;
events: any[]
}> {
const sessionId = nanoid();
const user = getCurrentUser(c);
const server = c.get('server');

// Get runtimeSettings and agentOptions from request body
const body = await c.req.json().catch(() => ({}));

const { runtimeSettings, agentOptions } = body as {
runtimeSettings?: Record<string, any>;
agentOptions?: Record<string, any>;
Expand Down Expand Up @@ -129,10 +131,20 @@ export class AgentSessionFactory {
}
}

// Wait a short time to ensure all initialization events are persisted
// This handles the async nature of event storage during agent initialization
await session.waitForEventSavesToComplete();

// Get events that were created during agent initialization
let initializationEvents = await server.daoFactory.getSessionEvents(sessionId);

console.log('Return initializationEvents', initializationEvents);

return {
session,
sessionInfo: savedSessionInfo,
storageUnsubscribe,
events: initializationEvents,
};
}

Expand Down
65 changes: 35 additions & 30 deletions multimodal/tarko/agent-server/src/core/AgentSession.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,41 @@
private storageUnsubscribeMap = new WeakMap<IAgent, () => void>();
private pendingEventSaves = new Set<Promise<void>>();


constructor(
private server: AgentServer,
sessionId: string,
agioProviderImpl?: AgioProviderConstructor,
sessionInfo?: SessionInfo,
private agentOptions?: Record<string, any>, // One-time agent initialization options
) {
this.id = sessionId;
this.eventBridge = new EventStreamBridge();
this.sessionInfo = sessionInfo;
this.agioProviderConstructor = agioProviderImpl;

// Agent will be created and initialized in initialize() method
this.agent = null as any; // Temporary placeholder
}


async initialize() {
// Create and initialize agent with all wrappers
// Event streams are now set up within createAndInitializeAgent before agent.initialize()
this.agent = await this.createAndInitializeAgent(this.sessionInfo);

// Extract the storage unsubscribe function from our WeakMap
const storageUnsubscribe = this.storageUnsubscribeMap.get(this.agent);

// Clean up the WeakMap entry
this.storageUnsubscribeMap.delete(this.agent);

// Notify client that session is ready
this.eventBridge.emit('ready', { sessionId: this.id });

return { storageUnsubscribe };
}

/**
* Create event handler for storage and AGIO processing
*/
Expand Down Expand Up @@ -200,7 +235,7 @@
console.debug('AGIO collector initialized', { provider: agentOptions.agio.provider });
}

console.info('Agent Config', JSON.stringify(wrappedAgent.getOptions(), null, 2));

Check failure on line 238 in multimodal/tarko/agent-server/src/core/AgentSession.ts

View workflow job for this annotation

GitHub Actions / Agent TARS Build

tests/core/agent-session-restore.test.ts > AgentSession - Context Restore (Simplest Implementation) > should handle empty storage gracefully

TypeError: wrappedAgent.getOptions is not a function ❯ AgentSession.createAndInitializeAgent src/core/AgentSession.ts:238:62 ❯ AgentSession.initialize src/core/AgentSession.ts:97:18 ❯ tests/core/agent-session-restore.test.ts:180:5

Check failure on line 238 in multimodal/tarko/agent-server/src/core/AgentSession.ts

View workflow job for this annotation

GitHub Actions / Agent TARS Build

tests/core/agent-session-restore.test.ts > AgentSession - Context Restore (Simplest Implementation) > should not duplicate events when updating model config

TypeError: wrappedAgent.getOptions is not a function ❯ AgentSession.createAndInitializeAgent src/core/AgentSession.ts:238:62 ❯ AgentSession.initialize src/core/AgentSession.ts:97:18 ❯ tests/core/agent-session-restore.test.ts:125:5

Check failure on line 238 in multimodal/tarko/agent-server/src/core/AgentSession.ts

View workflow job for this annotation

GitHub Actions / Agent TARS Build

tests/core/agent-session-restore.test.ts > AgentSession - Context Restore (Simplest Implementation) > should restore events from storage when recreating agent

TypeError: wrappedAgent.getOptions is not a function ❯ AgentSession.createAndInitializeAgent src/core/AgentSession.ts:238:62 ❯ AgentSession.initialize src/core/AgentSession.ts:97:18 ❯ tests/core/agent-session-restore.test.ts:87:5

// Store the storage unsubscribe function for later cleanup
// We'll use a WeakMap to avoid polluting the agent object
Expand Down Expand Up @@ -262,21 +297,7 @@
return baseAgent;
}

constructor(
private server: AgentServer,
sessionId: string,
agioProviderImpl?: AgioProviderConstructor,
sessionInfo?: SessionInfo,
private agentOptions?: Record<string, any>, // One-time agent initialization options
) {
this.id = sessionId;
this.eventBridge = new EventStreamBridge();
this.sessionInfo = sessionInfo;
this.agioProviderConstructor = agioProviderImpl;

// Agent will be created and initialized in initialize() method
this.agent = null as any; // Temporary placeholder
}

/**
* Get the current processing status of the agent
Expand All @@ -286,22 +307,6 @@
return this.agent.status() === AgentStatus.EXECUTING;
}

async initialize() {
// Create and initialize agent with all wrappers
// Event streams are now set up within createAndInitializeAgent before agent.initialize()
this.agent = await this.createAndInitializeAgent(this.sessionInfo);

// Extract the storage unsubscribe function from our WeakMap
const storageUnsubscribe = this.storageUnsubscribeMap.get(this.agent);

// Clean up the WeakMap entry
this.storageUnsubscribeMap.delete(this.agent);

// Notify client that session is ready
this.eventBridge.emit('ready', { sessionId: this.id });

return { storageUnsubscribe };
}

/**
* Run a query and return a strongly-typed response
Expand Down
Loading