diff --git a/api/server/middleware/validateMessageReq.js b/api/server/middleware/validateMessageReq.js index 15967cdc52..77f5fe620d 100644 --- a/api/server/middleware/validateMessageReq.js +++ b/api/server/middleware/validateMessageReq.js @@ -20,7 +20,16 @@ async function canReadActiveJobConversation(req, conversationId) { return false; } - if (!job || job.status !== 'running') { + // A job paused for human review is still active (consistent with /chat/status + // and /chat/active), so a new-conversation run that pauses before its final + // save can still recover the prompt — but only while it has a live, + // resolvable prompt (missing/malformed or past-expiry reads as inactive). + const pendingAction = job?.metadata?.pendingAction; + const pendingLive = + !!pendingAction && (pendingAction.expiresAt == null || pendingAction.expiresAt > Date.now()); + const isActive = + !!job && (job.status === 'running' || (job.status === 'requires_action' && pendingLive)); + if (!isActive) { return false; } diff --git a/api/server/routes/agents/index.js b/api/server/routes/agents/index.js index 7cc11c15bf..f634da0d16 100644 --- a/api/server/routes/agents/index.js +++ b/api/server/routes/agents/index.js @@ -203,13 +203,13 @@ router.get('/chat/status/:conversationId', async (req, res) => { // Avoid calling both getStreamInfo and getResumeState (both fetch content) const resumeState = await GenerationJobManager.getResumeState(conversationId); // A job paused for human review is still active (consistent with /chat/active), - // so the client resumes/subscribes rather than treating it as finished — unless - // its prompt has already expired (cleanup/expiry will finalize it). - const pendingExpired = - job.metadata.pendingAction?.expiresAt != null && - job.metadata.pendingAction.expiresAt <= Date.now(); - const isActive = - job.status === 'running' || (job.status === 'requires_action' && !pendingExpired); + // so the client resumes/subscribes rather than treating it as finished — but + // only while it has a live, resolvable prompt: a missing/malformed or + // past-expiry pendingAction reads as inactive (cleanup/expiry will finalize it). + const pendingAction = job.metadata.pendingAction; + const pendingLive = + !!pendingAction && (pendingAction.expiresAt == null || pendingAction.expiresAt > Date.now()); + const isActive = job.status === 'running' || (job.status === 'requires_action' && pendingLive); res.json({ active: isActive, diff --git a/packages/api/src/stream/ApprovalLifecycle.ts b/packages/api/src/stream/ApprovalLifecycle.ts index 859493da67..1daff23c85 100644 --- a/packages/api/src/stream/ApprovalLifecycle.ts +++ b/packages/api/src/stream/ApprovalLifecycle.ts @@ -82,7 +82,10 @@ export class ApprovalLifecycle { job.pendingAction && this.isExpired(job.pendingAction) ) { - await this.expire(streamId, expectedActionId); + // Target the exact record observed as expired. If the caller didn't pin an + // actionId, fall back to the one just read — otherwise a concurrent + // resume + re-pause for a new action could let this expire abort it. + await this.expire(streamId, expectedActionId ?? job.pendingAction.actionId); return false; } return this.store.transitionStatus(streamId, { diff --git a/packages/api/src/stream/implementations/InMemoryJobStore.ts b/packages/api/src/stream/implementations/InMemoryJobStore.ts index 8c52ee829f..702b58ca40 100644 --- a/packages/api/src/stream/implementations/InMemoryJobStore.ts +++ b/packages/api/src/stream/implementations/InMemoryJobStore.ts @@ -8,7 +8,7 @@ import type { JobStatus, JobStatusTransition, } from '~/stream/interfaces/IJobStore'; -import { isPendingActionExpired } from '~/stream/interfaces/IJobStore'; +import { isPendingActionStale } from '~/stream/interfaces/IJobStore'; /** * Content state for a job - volatile, in-memory only. @@ -137,6 +137,17 @@ export class InMemoryJobStore implements IJobStore { return; } Object.assign(job, updates); + // Mirror the guarded transitionStatus path so a pause/resume via this + // generic update behaves identically (parity with RedisJobStore): + // - mirror the flat pendingActionId the stale-decision guard compares; + // - on resume to running, refresh lastActiveAt and drop the flat id. + if (updates.pendingAction) { + job.pendingActionId = updates.pendingAction.actionId; + } + if (updates.status === 'running') { + job.lastActiveAt = updates.lastActiveAt ?? Date.now(); + delete job.pendingActionId; + } } /** @@ -207,11 +218,11 @@ export class InMemoryJobStore implements IJobStore { if (this.ttlAfterComplete === 0 || now - job.completedAt > this.ttlAfterComplete) { toDelete.push(streamId); } - } else if (job.status === 'requires_action' && isPendingActionExpired(job)) { - // Past-due approval: finalize it (aborted) so it stops occupying the - // user slot and its content state is reclaimed, mirroring - // ApprovalLifecycle.expire(). Skipping it (active-list filter) alone - // would leave the job resident indefinitely. + } else if (job.status === 'requires_action' && isPendingActionStale(job)) { + // Stale approval (expired, or missing/malformed pendingAction): + // finalize it (aborted) so it stops occupying the user slot and its + // content state is reclaimed, mirroring ApprovalLifecycle.expire(). + // Skipping it (active-list filter) alone would leave it resident. job.status = 'aborted'; job.completedAt = now; job.error = 'Approval expired before a decision was made'; @@ -344,7 +355,7 @@ export class InMemoryJobStore implements IJobStore { // only while its prompt is live: a past-`expiresAt` approval no longer // counts as active (cleanup/expiry will finalize it). if (job && (job.status === 'running' || job.status === 'requires_action')) { - if (job.status === 'requires_action' && isPendingActionExpired(job)) { + if (job.status === 'requires_action' && isPendingActionStale(job)) { continue; } activeIds.push(streamId); diff --git a/packages/api/src/stream/implementations/RedisJobStore.ts b/packages/api/src/stream/implementations/RedisJobStore.ts index d96b835a9e..b82f3ab393 100644 --- a/packages/api/src/stream/implementations/RedisJobStore.ts +++ b/packages/api/src/stream/implementations/RedisJobStore.ts @@ -10,7 +10,7 @@ import type { JobStatus, JobStatusTransition, } from '~/stream/interfaces/IJobStore'; -import { isPendingActionExpired } from '~/stream/interfaces/IJobStore'; +import { isPendingActionStale } from '~/stream/interfaces/IJobStore'; /** * Atomic compare-and-set on the job hash — the single-winner decision for a @@ -266,7 +266,12 @@ export class RedisJobStore implements IJobStore { const fields = Object.entries(serialized).flat(); if (updates.status === 'requires_action') { - await this.transitionToRequiresAction(key, streamId, fields); + await this.transitionToRequiresAction( + key, + streamId, + fields, + this.pauseTtlSeconds(updates.pendingAction), + ); return; } @@ -345,6 +350,22 @@ export class RedisJobStore implements IJobStore { } } + /** + * Live-key TTL (seconds) for a paused job. Defaults to the running TTL but + * extends to cover a pendingAction whose `expiresAt` is farther out, plus a + * grace margin so a decision arriving right at the deadline can still resume. + * Without this, a long approval window (e.g. 1h) on the default 20m stream + * TTL would let Redis evict the paused job mid-window. + */ + private pauseTtlSeconds(pendingAction?: Agents.PendingAction): number { + const exp = pendingAction?.expiresAt; + if (exp == null) { + return this.ttl.running; + } + const secondsUntilExpiry = Math.ceil((exp - Date.now()) / 1000) + 60; + return Math.max(this.ttl.running, secondsUntilExpiry); + } + /** The membership set a status belongs to; terminal statuses have none. */ private statusSetKey(status: JobStatus): string | null { if (status === 'running') { @@ -370,7 +391,13 @@ export class RedisJobStore implements IJobStore { const remSet = this.statusSetKey(from); const addSet = this.statusSetKey(to); const terminal = addSet === null; - const ttl = terminal ? this.ttl.completed : this.ttl.running; + let ttl = terminal ? this.ttl.completed : this.ttl.running; + if (to === 'requires_action') { + // A paused job must outlive its approval window, even when that window is + // longer than the running TTL — otherwise Redis evicts it before a + // decision can resume it. + ttl = this.pauseTtlSeconds(patch?.pendingAction); + } // 1) Single-winner decision: an atomic CAS on the single-slot job hash. // Works identically on cluster and single-node, so two concurrent @@ -436,9 +463,12 @@ export class RedisJobStore implements IJobStore { key: string, streamId: string, fields: string[], + ttlSeconds: number = this.ttl.running, ): Promise { // Job paused for human review — non-terminal. Keep the user-active set - // untouched so resume can rebuild state from the persisted job. + // untouched so resume can rebuild state from the persisted job. The live + // TTL covers the approval window (see pauseTtlSeconds) so a long-pending + // job isn't evicted before a decision arrives. if (this.isCluster) { const exists = await this.redis.exists(key); if (exists !== 1) { @@ -447,7 +477,9 @@ export class RedisJobStore implements IJobStore { await this.redis.srem(KEYS.runningJobs, streamId); await this.redis.sadd(KEYS.requiresActionJobs, streamId); await this.updateExistingJobHash(key, fields); - await this.refreshLiveJobTtls(key, streamId); + await this.redis.expire(key, ttlSeconds); + await this.redis.expire(KEYS.chunks(streamId), ttlSeconds); + await this.redis.expire(KEYS.runSteps(streamId), ttlSeconds); return; } @@ -460,7 +492,7 @@ export class RedisJobStore implements IJobStore { KEYS.chunks(streamId), KEYS.runSteps(streamId), streamId, - String(this.ttl.running), + String(ttlSeconds), ...fields, ); } @@ -675,11 +707,12 @@ export class RedisJobStore implements IJobStore { return 1; } - // Past-due approval: finalize it (aborted) so it stops occupying the - // slot and its stream contents are reclaimed, mirroring - // ApprovalLifecycle.expire(). transitionStatus runs the terminal - // content cleanup (sets, chunks, run-steps, userJobs, completed TTL). - if (isPendingActionExpired(job)) { + // Stale approval (expired, or missing/malformed pendingAction): + // finalize it (aborted) so it stops occupying the slot and its stream + // contents are reclaimed, mirroring ApprovalLifecycle.expire(). + // transitionStatus runs the terminal content cleanup (sets, chunks, + // run-steps, userJobs, completed TTL). + if (isPendingActionStale(job)) { await this.transitionStatus(streamId, { from: 'requires_action', to: 'aborted', @@ -779,7 +812,7 @@ export class RedisJobStore implements IJobStore { // counts as active (cleanup/expiry will finalize it), so the client stops // polling and can complete. if (job && (job.status === 'running' || job.status === 'requires_action')) { - if (job.status === 'requires_action' && isPendingActionExpired(job)) { + if (job.status === 'requires_action' && isPendingActionStale(job)) { continue; } activeIds.push(streamId); diff --git a/packages/api/src/stream/interfaces/IJobStore.ts b/packages/api/src/stream/interfaces/IJobStore.ts index da807ed4dd..ddd67026ab 100644 --- a/packages/api/src/stream/interfaces/IJobStore.ts +++ b/packages/api/src/stream/interfaces/IJobStore.ts @@ -97,6 +97,16 @@ export function isPendingActionExpired(job: Pick): boolean { + return !job.pendingAction || isPendingActionExpired(job); +} + /** * Arguments for an atomic {@link IJobStore.transitionStatus} compare-and-set. */