Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 11 additions & 34 deletions multimodal/tarko/agent-server-next/src/controllers/sessions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,6 @@ export async function getAllSessions(c: HonoContext) {
try {
const server = c.get('server');

if (!server.storageProvider) {
throw new Error('no storage provider!');
}

let sessions: SessionInfo[];

// In multi-tenant mode, only get user's sessions
Expand All @@ -29,10 +25,10 @@ export async function getAllSessions(c: HonoContext) {
return c.json({ error: 'Authentication required' }, 401);
}

sessions = await server.storageProvider.getUserSessions(userId);
sessions = await server.daoFactory.getUserSessions(userId);
} else {
// Single tenant mode: get all sessions
sessions = await server.storageProvider.getAllSessions();
sessions = await server.daoFactory.getAllSessions();
}

filterSessionModel(sessions);
Expand Down Expand Up @@ -88,8 +84,8 @@ export async function getSessionDetails(c: HonoContext) {
return c.json({ error: 'Session not found' }, 404);
}

if (server.storageProvider && sessionId) {
const sessionInfo = await server.storageProvider.getSessionInfo(sessionId);
if (sessionId) {
const sessionInfo = await server.daoFactory.getSessionInfo(sessionId);

sessionInfo && filterSessionModel([sessionInfo]);

Expand Down Expand Up @@ -124,11 +120,7 @@ export async function getSessionEvents(c: HonoContext) {
return c.json({ error: 'Session ID is required' }, 400);
}

if (!server.storageProvider) {
return c.json({ error: 'Storage not configured' }, 503);
}

const events = await server.storageProvider.getSessionEvents(sessionId);
const events = await server.daoFactory.getSessionEvents(sessionId);

return c.json({ events }, 200);
} catch (error) {
Expand All @@ -149,11 +141,7 @@ export async function getLatestSessionEvents(c: HonoContext) {
return c.json({ error: 'Session ID is required' }, 400);
}

if (!server.storageProvider) {
return c.json({ error: 'Storage not configured' }, 503);
}

const events = await server.storageProvider.getSessionEvents(sessionId);
const events = await server.daoFactory.getSessionEvents(sessionId);

return c.json({ events }, 200);
} catch (error) {
Expand Down Expand Up @@ -209,17 +197,13 @@ export async function updateSession(c: HonoContext) {
if (!session) {
return c.json({ error: 'Session not found' }, 404);
}

if (!server.storageProvider) {
return c.json({ error: 'Storage not configured' }, 503);
}

const sessionInfo = await server.storageProvider.getSessionInfo(sessionId);

const sessionInfo = await server.daoFactory.getSessionInfo(sessionId);
if (!sessionInfo) {
return c.json({ error: 'Session not found' }, 404);
}

const updatedMetadata = await server.storageProvider.updateSessionInfo(sessionId, {
const updatedMetadata = await server.daoFactory.updateSessionInfo(sessionId, {
metadata: {
...sessionInfo.metadata,
...metadataUpdates,
Expand Down Expand Up @@ -258,14 +242,7 @@ export async function deleteSession(c: HonoContext) {
delete server.storageUnsubscribes[sessionId];
}

// Delete from storage if available
if (server.storageProvider) {
try {
await server.storageProvider.deleteSession(sessionId);
} catch (error) {
console.warn(`Failed to delete session ${sessionId} from storage:`, error);
}
}
await server.daoFactory.deleteSession(sessionId);

return c.json({ success: true, message: 'Session deleted successfully' }, 200);
} catch (error) {
Expand Down Expand Up @@ -331,7 +308,7 @@ export async function shareSession(c: HonoContext) {

try {
const server = c.get('server');
const shareService = new ShareService(server.appConfig, server.storageProvider, server);
const shareService = new ShareService(server.appConfig, server.daoFactory, server);

// Get agent instance if session is active (for slug generation)
const agent = server.getSessionPool().get(sessionId)?.agent;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,6 @@ import type { HonoContext } from '../types';
*/
export function getShareConfig(c: HonoContext) {
const server = c.get('server');
const shareService = new ShareService(server.appConfig, server.storageProvider, server);
const shareService = new ShareService(server.appConfig, server.daoFactory, server);
return c.json(shareService.getShareConfig(), 200);
}
4 changes: 2 additions & 2 deletions multimodal/tarko/agent-server-next/src/controllers/system.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,13 @@ export async function updateSessionModel(c: HonoContext) {

try {
// Get current session metadata
const currentSessionInfo = await server.storageProvider.getSessionInfo(sessionId);
const currentSessionInfo = await server.daoFactory.getSessionInfo(sessionId);
if (!currentSessionInfo) {
return c.json({ error: 'Session not found' }, 404);
}

// Update metadata with new model config
const updatedSessionInfo = await server.storageProvider.updateSessionInfo(sessionId, {
const updatedSessionInfo = await server.daoFactory.updateSessionInfo(sessionId, {
metadata: {
...currentSessionInfo.metadata,
modelConfig: model,
Expand Down
35 changes: 30 additions & 5 deletions multimodal/tarko/agent-server-next/src/dao/factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,50 @@
* SPDX-License-Identifier: Apache-2.0
*/

import { MongoDBAgentStorageImplementation, SqliteAgentStorageImplementation } from '@tarko/interface';
import { AgentStorageImplementation, MongoDBAgentStorageImplementation, SqliteAgentStorageImplementation } from '@tarko/interface';
import { IDAOFactory, StorageBackend } from './interfaces/IDAOFactory';
import { MongoDAOFactory } from './mongodb/MongoDAOFactory';
import { SQLiteDAOFactory } from './sqlite/SQLiteDAOFactory';

/**
* Factory function to create appropriate DAO factory based on configuration
* @deprecated Use createDAOFactory(config) instead
*/
export function createDAOFactory(
backend: StorageBackend,
config: MongoDBAgentStorageImplementation | SqliteAgentStorageImplementation,
): IDAOFactory;

/**
* Create a DAO factory based on configuration (replaces createStorageProvider)
*/
export function createDAOFactory(config: AgentStorageImplementation): IDAOFactory;

export function createDAOFactory(
configOrBackend: AgentStorageImplementation | StorageBackend,
config?: MongoDBAgentStorageImplementation | SqliteAgentStorageImplementation,
): IDAOFactory {
switch (backend) {
// Handle old signature for backward compatibility
if (typeof configOrBackend === 'string' && config) {
switch (configOrBackend) {
case 'mongodb':
return new MongoDAOFactory(config as MongoDBAgentStorageImplementation);
case 'sqlite':
return new SQLiteDAOFactory(config as SqliteAgentStorageImplementation);
default:
throw new Error(`Unsupported storage backend: ${configOrBackend}`);
}
}

// Handle new signature
const storageConfig = configOrBackend as AgentStorageImplementation;
switch (storageConfig.type) {
case 'mongodb':
return new MongoDAOFactory(config as MongoDBAgentStorageImplementation);
return new MongoDAOFactory(storageConfig as MongoDBAgentStorageImplementation);
case 'sqlite':
return new SQLiteDAOFactory(config as SqliteAgentStorageImplementation);
return new SQLiteDAOFactory(storageConfig as SqliteAgentStorageImplementation);
default:
throw new Error(`Unsupported storage backend: ${backend}`);
throw new Error(`Unsupported storage type: ${(storageConfig as any).type}`);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,26 @@
* SPDX-License-Identifier: Apache-2.0
*/

import { AgentEventStream, SessionInfo } from '@tarko/interface';
import { IUserConfigDAO } from './IUserConfigDAO';
import { ISessionDAO } from './ISessionDAO';
import { IEventDAO } from './IEventDAO';
import { ISandboxAllocationDAO } from './ISandboxAllocationDAO';


/**
* DAO Factory interface
* Provides abstraction for creating and managing DAO instances
* Supports different storage backends (MongoDB, SQLite, etc.)
*
* Now includes StorageProvider functionality for unified data access
*/
export interface IDAOFactory {
/**
* DB path (for SQLite backends)
*/
dbPath?: string;

/**
* Initialize the DAO factory and underlying connections
*/
Expand All @@ -28,12 +37,66 @@ export interface IDAOFactory {
* Close all connections and cleanup resources
*/
close(): Promise<void>;

// DAO factory methods
getUserConfigDAO(): IUserConfigDAO;
getSessionDAO(): ISessionDAO;
getEventDAO(): IEventDAO;
getSandboxAllocationDAO(): ISandboxAllocationDAO;
healthCheck(): Promise<{ healthy: boolean; message?: string; [key: string]: any }>;

// StorageProvider methods - high-level data operations
/**
* Create a new session with metadata
* @param metadata Session metadata
*/
createSession(metadata: SessionInfo): Promise<SessionInfo>;

/**
* Update session metadata
* @param sessionId Session ID
* @param sessionInfo Partial session info data to update
*/
updateSessionInfo(
sessionId: string,
sessionInfo: Partial<Omit<SessionInfo, 'id'>>,
): Promise<SessionInfo>;

/**
* Get session metadata
* @param sessionId Session ID
*/
getSessionInfo(sessionId: string): Promise<SessionInfo | null>;

/**
* Get all sessions metadata
*/
getAllSessions(): Promise<SessionInfo[]>;

/**
* Get all sessions for a specific user (multi-tenant)
* @param userId User ID
*/
getUserSessions(userId: string): Promise<SessionInfo[]>;

/**
* Delete a session and all its events
* @param sessionId Session ID
*/
deleteSession(sessionId: string): Promise<boolean>;

/**
* Save an event to a session
* @param sessionId Session ID
* @param event Event to save
*/
saveEvent(sessionId: string, event: AgentEventStream.Event): Promise<void>;

/**
* Get all events for a session
* @param sessionId Session ID
*/
getSessionEvents(sessionId: string): Promise<AgentEventStream.Event[]>;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import { Connection, Model } from 'mongoose';
import { AgentEventStream } from '@tarko/interface';
import { IEventDAO } from '../interfaces/IEventDAO';
import { EventDocument } from '../../storage/MongoDBStorageProvider/MongoDBSchemas';
import { EventDocument } from './MongoDBSchemas';
import { getLogger } from '../../utils/logger';

const logger = getLogger('EventDAO');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
*/

import mongoose, { Connection, ConnectOptions } from 'mongoose';
import { MongoDBAgentStorageImplementation } from '@tarko/interface';
import { AgentEventStream, MongoDBAgentStorageImplementation, SessionInfo } from '@tarko/interface';
import {
IDAOFactory,
IUserConfigDAO,
Expand All @@ -21,7 +21,7 @@ import {
EventModel,
UserConfigModel,
SandboxAllocationModel,
} from '../../storage/MongoDBStorageProvider/MongoDBSchemas';
} from './MongoDBSchemas';
import { getLogger } from '../../utils/logger';

const logger = getLogger('MongoDAOFactory');
Expand Down Expand Up @@ -180,6 +180,54 @@ export class MongoDAOFactory implements IDAOFactory {
}
}

// StorageProvider methods - delegate to DAOs
async createSession(metadata: SessionInfo): Promise<SessionInfo> {
return this.getSessionDAO().createSession(metadata);
}

async updateSessionInfo(
sessionId: string,
sessionInfo: Partial<Omit<SessionInfo, 'id'>>,
): Promise<SessionInfo> {
return this.getSessionDAO().updateSessionInfo(sessionId, sessionInfo);
}

async getSessionInfo(sessionId: string): Promise<SessionInfo | null> {
return this.getSessionDAO().getSessionInfo(sessionId);
}

async getAllSessions(): Promise<SessionInfo[]> {
return this.getSessionDAO().getAllSessions();
}

async getUserSessions(userId: string): Promise<SessionInfo[]> {
return this.getSessionDAO().getUserSessions(userId);
}

async deleteSession(sessionId: string): Promise<boolean> {
// Delete events first, then session
await this.getEventDAO().deleteSessionEvents(sessionId);
return this.getSessionDAO().deleteSession(sessionId);
}

async saveEvent(sessionId: string, event: AgentEventStream.Event): Promise<void> {
// Check if session exists first
const sessionExists = await this.getSessionDAO().sessionExists(sessionId);
if (!sessionExists) {
throw new Error(`Session not found: ${sessionId}`);
}

// Save the event
await this.getEventDAO().saveEvent(sessionId, event);

// Update session timestamp
await this.getSessionDAO().updateSessionTimestamp(sessionId);
}

async getSessionEvents(sessionId: string): Promise<AgentEventStream.Event[]> {
return this.getEventDAO().getSessionEvents(sessionId);
}

private ensureInitialized(): void {
if (!this.initialized || !this.connection) {
throw new Error('MongoDB DAO Factory not initialized. Call initialize() first.');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import { Connection, Model } from 'mongoose';
import { ISandboxAllocationDAO, SandboxAllocation } from '../interfaces/ISandboxAllocationDAO';
import { SandboxAllocationDocument } from '../../storage/MongoDBStorageProvider/MongoDBSchemas';
import { SandboxAllocationDocument } from './MongoDBSchemas';
import { getLogger } from '../../utils/logger';
import { ILogger } from '../../types';

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import { Connection, Model } from 'mongoose';
import { SessionInfo } from '@tarko/interface';
import { ISessionDAO } from '../interfaces/ISessionDAO';
import { SessionDocument } from '../../storage/MongoDBStorageProvider/MongoDBSchemas';
import { SessionDocument } from './MongoDBSchemas';
import { getLogger } from '../../utils/logger';
import { ILogger } from '../../types';

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import { Connection, Model } from 'mongoose';
import { IUserConfigDAO, UserConfig, UserConfigInfo } from '../interfaces/IUserConfigDAO';
import { UserConfigDocument } from '../../storage/MongoDBStorageProvider/MongoDBSchemas';
import { UserConfigDocument } from './MongoDBSchemas';
import { getLogger } from '../../utils/logger';

const logger = getLogger('UserConfigDAO');
Expand Down
Loading