Skip to content

Commit 106c139

Browse files
committed
🤖 fix: avoid task timeouts while queued
Foreground task waits now start the agent_report timeout only after the child workspace transitions from queued → running, so queue time doesn’t count against execution time. Also start queued tasks only after a successful resumeStream and add a regression test for the queued-wait behavior. Signed-off-by: Thomas Kosiewski <tk@coder.com> --- _Generated with `codex cli` • Model: `gpt-5.2` • Thinking: `xhigh`_ <!-- mux-attribution: model=gpt-5.2 thinking=xhigh --> Change-Id: Iddcac5eb351ebc21e13fed886081f5fd4f1eaf90
1 parent 0e43cbb commit 106c139

File tree

2 files changed

+163
-10
lines changed

2 files changed

+163
-10
lines changed

‎src/node/services/taskService.test.ts‎

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -301,6 +301,84 @@ describe("TaskService", () => {
301301
expect(started?.taskStatus).toBe("running");
302302
}, 20_000);
303303

304+
test("waitForAgentReport does not time out while task is queued", async () => {
305+
const config = new Config(rootDir);
306+
307+
const projectPath = path.join(rootDir, "repo");
308+
const parentId = "parent-111";
309+
const childId = "child-222";
310+
311+
await config.saveConfig({
312+
projects: new Map([
313+
[
314+
projectPath,
315+
{
316+
workspaces: [
317+
{ path: path.join(projectPath, "parent"), id: parentId, name: "parent" },
318+
{
319+
path: path.join(projectPath, "child"),
320+
id: childId,
321+
name: "agent_explore_child",
322+
parentWorkspaceId: parentId,
323+
agentType: "explore",
324+
taskStatus: "queued",
325+
},
326+
],
327+
},
328+
],
329+
]),
330+
taskSettings: { maxParallelAgentTasks: 1, maxTaskNestingDepth: 3 },
331+
});
332+
333+
const historyService = new HistoryService(config);
334+
const partialService = new PartialService(config, historyService);
335+
336+
const aiService: AIService = {
337+
isStreaming: mock(() => false),
338+
on: mock(() => undefined),
339+
off: mock(() => undefined),
340+
} as unknown as AIService;
341+
342+
const workspaceService: WorkspaceService = {
343+
sendMessage: mock(() => Promise.resolve(Ok(undefined))),
344+
resumeStream: mock(() => Promise.resolve(Ok(undefined))),
345+
remove: mock(() => Promise.resolve(Ok(undefined))),
346+
emit: mock(() => true),
347+
} as unknown as WorkspaceService;
348+
349+
const initStateManager: InitStateManager = {
350+
startInit: mock(() => undefined),
351+
appendOutput: mock(() => undefined),
352+
endInit: mock(() => Promise.resolve()),
353+
} as unknown as InitStateManager;
354+
355+
const taskService = new TaskService(
356+
config,
357+
historyService,
358+
partialService,
359+
aiService,
360+
workspaceService,
361+
initStateManager
362+
);
363+
364+
// Timeout is short so the test would fail if the timer started while queued.
365+
const reportPromise = taskService.waitForAgentReport(childId, { timeoutMs: 50 });
366+
367+
// Wait longer than timeout while task is still queued.
368+
await new Promise((r) => setTimeout(r, 100));
369+
370+
const internal = taskService as unknown as {
371+
setTaskStatus: (workspaceId: string, status: "queued" | "running") => Promise<void>;
372+
resolveWaiters: (taskId: string, report: { reportMarkdown: string; title?: string }) => void;
373+
};
374+
375+
await internal.setTaskStatus(childId, "running");
376+
internal.resolveWaiters(childId, { reportMarkdown: "ok" });
377+
378+
const report = await reportPromise;
379+
expect(report.reportMarkdown).toBe("ok");
380+
});
381+
304382
test("rolls back created workspace when initial sendMessage fails", async () => {
305383
const config = new Config(rootDir);
306384
await fsPromises.mkdir(config.srcDir, { recursive: true });

‎src/node/services/taskService.ts‎

Lines changed: 85 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,12 @@ interface PendingTaskWaiter {
5353
cleanup: () => void;
5454
}
5555

56+
interface PendingTaskStartWaiter {
57+
createdAt: number;
58+
start: () => void;
59+
cleanup: () => void;
60+
}
61+
5662
function isToolCallEndEvent(value: unknown): value is ToolCallEndEvent {
5763
return (
5864
typeof value === "object" &&
@@ -113,6 +119,7 @@ function getIsoNow(): string {
113119
export class TaskService {
114120
private readonly mutex = new AsyncMutex();
115121
private readonly pendingWaitersByTaskId = new Map<string, PendingTaskWaiter[]>();
122+
private readonly pendingStartWaitersByTaskId = new Map<string, PendingTaskStartWaiter[]>();
116123
private readonly remindedAwaitingReport = new Set<string>();
117124

118125
constructor(
@@ -416,12 +423,33 @@ export class TaskService {
416423

417424
return new Promise<{ reportMarkdown: string; title?: string }>((resolve, reject) => {
418425
let timeout: ReturnType<typeof setTimeout> | null = null;
426+
let startWaiter: PendingTaskStartWaiter | null = null;
419427
let abortListener: (() => void) | null = null;
420428

429+
const startReportTimeout = () => {
430+
if (timeout) return;
431+
timeout = setTimeout(() => {
432+
entry.cleanup();
433+
reject(new Error("Timed out waiting for agent_report"));
434+
}, timeoutMs);
435+
};
436+
437+
const cleanupStartWaiter = () => {
438+
if (!startWaiter) return;
439+
startWaiter.cleanup();
440+
startWaiter = null;
441+
};
442+
421443
const entry: PendingTaskWaiter = {
422444
createdAt: Date.now(),
423-
resolve,
424-
reject,
445+
resolve: (report) => {
446+
entry.cleanup();
447+
resolve(report);
448+
},
449+
reject: (error) => {
450+
entry.cleanup();
451+
reject(error);
452+
},
425453
cleanup: () => {
426454
const current = this.pendingWaitersByTaskId.get(taskId);
427455
if (current) {
@@ -433,6 +461,8 @@ export class TaskService {
433461
}
434462
}
435463

464+
cleanupStartWaiter();
465+
436466
if (timeout) {
437467
clearTimeout(timeout);
438468
timeout = null;
@@ -449,10 +479,43 @@ export class TaskService {
449479
list.push(entry);
450480
this.pendingWaitersByTaskId.set(taskId, list);
451481

452-
timeout = setTimeout(() => {
453-
entry.cleanup();
454-
reject(new Error("Timed out waiting for agent_report"));
455-
}, timeoutMs);
482+
// Don't start the execution timeout while the task is still queued.
483+
// The timer starts once the child actually begins running (queued -> running).
484+
const cfg = this.config.loadConfigOrDefault();
485+
const taskEntry = this.findWorkspaceEntry(cfg, taskId);
486+
const initialStatus = taskEntry?.workspace.taskStatus;
487+
if (initialStatus === "queued") {
488+
const startWaiterEntry: PendingTaskStartWaiter = {
489+
createdAt: Date.now(),
490+
start: startReportTimeout,
491+
cleanup: () => {
492+
const currentStartWaiters = this.pendingStartWaitersByTaskId.get(taskId);
493+
if (currentStartWaiters) {
494+
const next = currentStartWaiters.filter((w) => w !== startWaiterEntry);
495+
if (next.length === 0) {
496+
this.pendingStartWaitersByTaskId.delete(taskId);
497+
} else {
498+
this.pendingStartWaitersByTaskId.set(taskId, next);
499+
}
500+
}
501+
},
502+
};
503+
startWaiter = startWaiterEntry;
504+
505+
const currentStartWaiters = this.pendingStartWaitersByTaskId.get(taskId) ?? [];
506+
currentStartWaiters.push(startWaiterEntry);
507+
this.pendingStartWaitersByTaskId.set(taskId, currentStartWaiters);
508+
509+
// Close the race where the task starts between the initial config read and registering the waiter.
510+
const cfgAfterRegister = this.config.loadConfigOrDefault();
511+
const afterEntry = this.findWorkspaceEntry(cfgAfterRegister, taskId);
512+
if (afterEntry?.workspace.taskStatus !== "queued") {
513+
cleanupStartWaiter();
514+
startReportTimeout();
515+
}
516+
} else {
517+
startReportTimeout();
518+
}
456519

457520
if (options?.abortSignal) {
458521
if (options.abortSignal.aborted) {
@@ -578,8 +641,6 @@ export class TaskService {
578641
for (const task of queued) {
579642
if (!task.id) continue;
580643

581-
await this.setTaskStatus(task.id, "running");
582-
583644
// Start by resuming from the queued prompt in history.
584645
const model = task.taskModelString ?? defaultModel;
585646
const resumeResult = await this.workspaceService.resumeStream(task.id, {
@@ -589,9 +650,10 @@ export class TaskService {
589650

590651
if (!resumeResult.success) {
591652
log.error("Failed to start queued task", { taskId: task.id, error: resumeResult.error });
592-
// Put it back in queued to retry later.
593-
await this.setTaskStatus(task.id, "queued");
653+
continue;
594654
}
655+
656+
await this.setTaskStatus(task.id, "running");
595657
}
596658
}
597659

@@ -612,6 +674,19 @@ export class TaskService {
612674
const allMetadata = await this.config.getAllWorkspaceMetadata();
613675
const metadata = allMetadata.find((m) => m.id === workspaceId) ?? null;
614676
this.workspaceService.emit("metadata", { workspaceId, metadata });
677+
678+
if (status === "running") {
679+
const waiters = this.pendingStartWaitersByTaskId.get(workspaceId);
680+
if (!waiters || waiters.length === 0) return;
681+
this.pendingStartWaitersByTaskId.delete(workspaceId);
682+
for (const waiter of waiters) {
683+
try {
684+
waiter.start();
685+
} catch (error: unknown) {
686+
log.error("Task start waiter callback failed", { workspaceId, error });
687+
}
688+
}
689+
}
615690
}
616691

617692
private async handleStreamEnd(event: StreamEndEvent): Promise<void> {

0 commit comments

Comments
 (0)