Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions src/node/runtime/LocalBaseRuntime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,19 @@ export abstract class LocalBaseRuntime implements Runtime {
}
}

async ensureDir(dirPath: string): Promise<void> {
const expandedPath = expandTilde(dirPath);
try {
await fsPromises.mkdir(expandedPath, { recursive: true });
} catch (err) {
throw new RuntimeErrorClass(
`Failed to create directory ${dirPath}: ${err instanceof Error ? err.message : String(err)}`,
"file_io",
err instanceof Error ? err : undefined
);
}
}

resolvePath(filePath: string): Promise<string> {
// Expand tilde to actual home directory path
const expanded = expandTilde(filePath);
Expand Down
15 changes: 15 additions & 0 deletions src/node/runtime/LocalRuntime.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,21 @@ describe("LocalRuntime", () => {
// Note: exec, stat, resolvePath, normalizePath are tested in the shared Runtime
// interface tests (tests/runtime/runtime.test.ts matrix)

describe("ensureDir", () => {
it("creates directories recursively", async () => {
const runtime = new LocalRuntime(testDir);

const dirPath = path.join(testDir, "ensure-dir", "a", "b", "c");
await runtime.ensureDir(dirPath);

const stat = await fs.stat(dirPath);
expect(stat.isDirectory()).toBe(true);

// Should be idempotent
await runtime.ensureDir(dirPath);
});
});

describe("tilde expansion in file operations", () => {
it("stat expands tilde paths", async () => {
const runtime = new LocalRuntime(testDir);
Expand Down
26 changes: 26 additions & 0 deletions src/node/runtime/RemoteRuntime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,32 @@ export abstract class RemoteRuntime implements Runtime {
return `mkdir -p $(dirname ${quotedPath}) && cat > ${quotedTempPath} && mv ${quotedTempPath} ${quotedPath}`;
}

/**
* Ensure a directory exists (mkdir -p semantics).
*/
async ensureDir(dirPath: string): Promise<void> {
const stream = await this.exec(`mkdir -p ${this.quoteForRemote(dirPath)}`, {
cwd: "/",
timeout: 10,
});

await stream.stdin.close();

const [stdout, stderr, exitCode] = await Promise.all([
streamToString(stream.stdout),
streamToString(stream.stderr),
stream.exitCode,
]);

if (exitCode !== 0) {
const extra = stderr.trim() || stdout.trim();
throw new RuntimeError(
`Failed to create directory ${dirPath}: exit code ${exitCode}${extra ? `: ${extra}` : ""}`,
"file_io"
);
}
}

/**
* Get file statistics via exec.
*/
Expand Down
8 changes: 8 additions & 0 deletions src/node/runtime/Runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,14 @@ export interface Runtime {
*/
stat(path: string, abortSignal?: AbortSignal): Promise<FileStat>;

/**
* Ensure a directory exists (mkdir -p semantics).
*
* This intentionally lives on the Runtime abstraction so local runtimes can use
* Node fs APIs (Windows-safe) while remote runtimes can use shell commands.
*/
ensureDir(path: string): Promise<void>;

/**
* Resolve a path to its absolute, canonical form (expanding tildes, resolving symlinks, etc.).
* This is used at workspace creation time to normalize srcBaseDir paths in config.
Expand Down
14 changes: 6 additions & 8 deletions src/node/services/backgroundProcessExecutor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import {
buildTerminateCommand,
shellQuote,
} from "@/node/runtime/backgroundCommands";
import { execBuffered } from "@/node/utils/runtime/helpers";
import { execBuffered, writeFileString } from "@/node/utils/runtime/helpers";
import { NON_INTERACTIVE_ENV_VARS } from "@/common/constants/env";
import { toPosixPath } from "@/node/utils/paths";

Expand Down Expand Up @@ -140,15 +140,13 @@ export async function spawnProcess(
);

// Create output directory and empty file
const mkdirResult = await execBuffered(
runtime,
`mkdir -p ${quotePath(outputDir)} && touch ${quotePath(outputPath)}`,
{ cwd: FALLBACK_CWD, timeout: 30 }
);
if (mkdirResult.exitCode !== 0) {
try {
await runtime.ensureDir(outputDir);
await writeFileString(runtime, outputPath, "");
} catch (error) {
return {
success: false,
error: `Failed to create output directory: ${mkdirResult.stderr}`,
error: `Failed to create output directory: ${errorMsg(error)}`,
};
}

Expand Down
46 changes: 46 additions & 0 deletions src/node/services/streamManager.test.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import { describe, test, expect, beforeEach, mock } from "bun:test";
import * as fs from "node:fs/promises";

import { KNOWN_MODELS } from "@/common/constants/knownModels";
import { StreamManager } from "./streamManager";
import { APICallError } from "ai";
import type { HistoryService } from "./historyService";
import type { PartialService } from "./partialService";
import { createAnthropic } from "@ai-sdk/anthropic";
import { shouldRunIntegrationTests, validateApiKeys } from "../../../tests/testUtils";
import { DisposableTempDir } from "@/node/services/tempDir";
import { createRuntime } from "@/node/runtime/runtimeFactory";

// Skip integration tests if TEST_INTEGRATION is not set
Expand Down Expand Up @@ -37,6 +40,49 @@ const createMockPartialService = (): PartialService => {
} as unknown as PartialService;
};

describe("StreamManager - createTempDirForStream", () => {
test("creates ~/.mux-tmp/<token> under the runtime's home", async () => {
using home = new DisposableTempDir("stream-home");

const prevHome = process.env.HOME;
const prevUserProfile = process.env.USERPROFILE;

process.env.HOME = home.path;
process.env.USERPROFILE = home.path;

try {
const streamManager = new StreamManager(
createMockHistoryService(),
createMockPartialService()
);
const runtime = createRuntime({ type: "local", srcBaseDir: "/tmp" });

const token = streamManager.generateStreamToken();
const resolved = await streamManager.createTempDirForStream(token, runtime);

// StreamManager normalizes Windows paths to forward slashes.
const normalizedHomePath = home.path.replace(/\\/g, "/");
expect(resolved.startsWith(normalizedHomePath)).toBe(true);
expect(resolved).toContain(`/.mux-tmp/${token}`);

const stat = await fs.stat(resolved);
expect(stat.isDirectory()).toBe(true);
} finally {
if (prevHome === undefined) {
delete process.env.HOME;
} else {
process.env.HOME = prevHome;
}

if (prevUserProfile === undefined) {
delete process.env.USERPROFILE;
} else {
process.env.USERPROFILE = prevUserProfile;
}
}
});
});

describe("StreamManager - Concurrent Stream Prevention", () => {
let streamManager: StreamManager;
let mockHistoryService: HistoryService;
Expand Down
17 changes: 5 additions & 12 deletions src/node/services/streamManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,7 @@ import { AsyncMutex } from "@/node/utils/concurrency/asyncMutex";
import { stripInternalToolResultFields } from "@/common/utils/tools/internalToolResultFields";
import type { ToolPolicy } from "@/common/utils/tools/toolPolicy";
import { StreamingTokenTracker } from "@/node/utils/main/StreamingTokenTracker";
import { shescape } from "@/node/runtime/streamUtils";
import type { Runtime } from "@/node/runtime/Runtime";
import { execBuffered } from "@/node/utils/runtime/helpers";
import {
createCachedSystemMessage,
applyCacheControlToTools,
Expand Down Expand Up @@ -307,16 +305,11 @@ export class StreamManager extends EventEmitter {
resolvedPath = resolvedPath.replace(/\\/g, "/");
}

// Create directory on target runtime (local/SSH/Docker)
const result = await execBuffered(runtime, `mkdir -p ${shescape.quote(resolvedPath)}`, {
cwd: "/",
timeout: 10,
});

if (result.exitCode !== 0) {
throw new Error(
`Failed to create temp directory ${resolvedPath}: exit code ${result.exitCode}`
);
try {
await runtime.ensureDir(resolvedPath);
} catch (err) {
const msg = err instanceof Error ? err.message : String(err);
throw new Error(`Failed to create temp directory ${resolvedPath}: ${msg}`);
}

return resolvedPath;
Expand Down
19 changes: 10 additions & 9 deletions src/node/services/workspaceMcpOverridesService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -228,15 +228,16 @@ export class WorkspaceMcpOverridesService {

private async ensureOverridesDir(
runtime: ReturnType<typeof createRuntime>,
workspacePath: string
workspacePath: string,
runtimeConfig: RuntimeConfig | undefined
): Promise<void> {
const result = await execBuffered(runtime, `mkdir -p "${MCP_OVERRIDES_DIR}"`, {
cwd: workspacePath,
timeout: 10,
});
const overridesDirPath = joinForRuntime(runtimeConfig, workspacePath, MCP_OVERRIDES_DIR);

if (result.exitCode !== 0) {
throw new Error(`Failed to create ${MCP_OVERRIDES_DIR} directory: ${result.stderr}`);
try {
await runtime.ensureDir(overridesDirPath);
} catch (err) {
const msg = err instanceof Error ? err.message : String(err);
throw new Error(`Failed to create ${MCP_OVERRIDES_DIR} directory: ${msg}`);
}
}

Expand Down Expand Up @@ -361,7 +362,7 @@ export class WorkspaceMcpOverridesService {
}

try {
await this.ensureOverridesDir(runtime, workspacePath);
await this.ensureOverridesDir(runtime, workspacePath, metadata.runtimeConfig);
await writeFileString(runtime, jsoncPath, JSON.stringify(normalizedLegacy, null, 2) + "\n");
await this.ensureOverridesGitignored(runtime, workspacePath, metadata.runtimeConfig);
await this.clearLegacyOverridesInConfig(workspaceId);
Expand Down Expand Up @@ -404,7 +405,7 @@ export class WorkspaceMcpOverridesService {
return;
}

await this.ensureOverridesDir(runtime, workspacePath);
await this.ensureOverridesDir(runtime, workspacePath, metadata.runtimeConfig);
await writeFileString(runtime, jsoncPath, JSON.stringify(normalized, null, 2) + "\n");
await this.ensureOverridesGitignored(runtime, workspacePath, metadata.runtimeConfig);
}
Expand Down
Loading