🛡️ fix: Address Codex round 4 — paused-job edge cases across the stack

Five P2 findings on 4324a4e776, all valid:

- I1 message validation: validateMessageReq's active-job read bypass now
  accepts a live requires_action job, so a new-conversation run that pauses
  before its final save can recover the prompt instead of 404ing.
- I2 expire targets the observed record: resolve()'s expired path passes
  `expectedActionId ?? job.pendingAction.actionId`, so a concurrent
  resume+re-pause can't let expire abort a different action.
- I3 stale/malformed prompts: new isPendingActionStale (missing OR expired)
  drives active-listing exclusion + cleanup expiry in both stores, and the
  status route + middleware require a live pendingAction — a requires_action
  job whose pendingAction was dropped on deserialize no longer reads active.
- I4 in-memory parity: InMemory updateJob mirrors pendingActionId on pause and
  clears it + refreshes lastActiveAt on resume (matching RedisJobStore), so a
  pause via the generic path is still resolvable by actionId.
- I5 long approval windows: paused-job live TTL (job/chunks/run-steps) now
  covers pendingAction.expiresAt + grace (pauseTtlSeconds), on both the
  transitionStatus and updateJob pause paths, so Redis can't evict a paused
  job before its decision window closes.

tsc + lint clean; policy + type-contract specs pass.
This commit is contained in:
Danny Avila 2026-06-16 14:51:49 -04:00
parent 4324a4e776
commit c8abd826e1
6 changed files with 94 additions and 28 deletions

View file

@ -20,7 +20,16 @@ async function canReadActiveJobConversation(req, conversationId) {
return false;
}
if (!job || job.status !== 'running') {
// A job paused for human review is still active (consistent with /chat/status
// and /chat/active), so a new-conversation run that pauses before its final
// save can still recover the prompt — but only while it has a live,
// resolvable prompt (missing/malformed or past-expiry reads as inactive).
const pendingAction = job?.metadata?.pendingAction;
const pendingLive =
!!pendingAction && (pendingAction.expiresAt == null || pendingAction.expiresAt > Date.now());
const isActive =
!!job && (job.status === 'running' || (job.status === 'requires_action' && pendingLive));
if (!isActive) {
return false;
}

View file

@ -203,13 +203,13 @@ router.get('/chat/status/:conversationId', async (req, res) => {
// Avoid calling both getStreamInfo and getResumeState (both fetch content)
const resumeState = await GenerationJobManager.getResumeState(conversationId);
// A job paused for human review is still active (consistent with /chat/active),
// so the client resumes/subscribes rather than treating it as finished — unless
// its prompt has already expired (cleanup/expiry will finalize it).
const pendingExpired =
job.metadata.pendingAction?.expiresAt != null &&
job.metadata.pendingAction.expiresAt <= Date.now();
const isActive =
job.status === 'running' || (job.status === 'requires_action' && !pendingExpired);
// so the client resumes/subscribes rather than treating it as finished — but
// only while it has a live, resolvable prompt: a missing/malformed or
// past-expiry pendingAction reads as inactive (cleanup/expiry will finalize it).
const pendingAction = job.metadata.pendingAction;
const pendingLive =
!!pendingAction && (pendingAction.expiresAt == null || pendingAction.expiresAt > Date.now());
const isActive = job.status === 'running' || (job.status === 'requires_action' && pendingLive);
res.json({
active: isActive,

View file

@ -82,7 +82,10 @@ export class ApprovalLifecycle {
job.pendingAction &&
this.isExpired(job.pendingAction)
) {
await this.expire(streamId, expectedActionId);
// Target the exact record observed as expired. If the caller didn't pin an
// actionId, fall back to the one just read — otherwise a concurrent
// resume + re-pause for a new action could let this expire abort it.
await this.expire(streamId, expectedActionId ?? job.pendingAction.actionId);
return false;
}
return this.store.transitionStatus(streamId, {

View file

@ -8,7 +8,7 @@ import type {
JobStatus,
JobStatusTransition,
} from '~/stream/interfaces/IJobStore';
import { isPendingActionExpired } from '~/stream/interfaces/IJobStore';
import { isPendingActionStale } from '~/stream/interfaces/IJobStore';
/**
* Content state for a job - volatile, in-memory only.
@ -137,6 +137,17 @@ export class InMemoryJobStore implements IJobStore {
return;
}
Object.assign(job, updates);
// Mirror the guarded transitionStatus path so a pause/resume via this
// generic update behaves identically (parity with RedisJobStore):
// - mirror the flat pendingActionId the stale-decision guard compares;
// - on resume to running, refresh lastActiveAt and drop the flat id.
if (updates.pendingAction) {
job.pendingActionId = updates.pendingAction.actionId;
}
if (updates.status === 'running') {
job.lastActiveAt = updates.lastActiveAt ?? Date.now();
delete job.pendingActionId;
}
}
/**
@ -207,11 +218,11 @@ export class InMemoryJobStore implements IJobStore {
if (this.ttlAfterComplete === 0 || now - job.completedAt > this.ttlAfterComplete) {
toDelete.push(streamId);
}
} else if (job.status === 'requires_action' && isPendingActionExpired(job)) {
// Past-due approval: finalize it (aborted) so it stops occupying the
// user slot and its content state is reclaimed, mirroring
// ApprovalLifecycle.expire(). Skipping it (active-list filter) alone
// would leave the job resident indefinitely.
} else if (job.status === 'requires_action' && isPendingActionStale(job)) {
// Stale approval (expired, or missing/malformed pendingAction):
// finalize it (aborted) so it stops occupying the user slot and its
// content state is reclaimed, mirroring ApprovalLifecycle.expire().
// Skipping it (active-list filter) alone would leave it resident.
job.status = 'aborted';
job.completedAt = now;
job.error = 'Approval expired before a decision was made';
@ -344,7 +355,7 @@ export class InMemoryJobStore implements IJobStore {
// only while its prompt is live: a past-`expiresAt` approval no longer
// counts as active (cleanup/expiry will finalize it).
if (job && (job.status === 'running' || job.status === 'requires_action')) {
if (job.status === 'requires_action' && isPendingActionExpired(job)) {
if (job.status === 'requires_action' && isPendingActionStale(job)) {
continue;
}
activeIds.push(streamId);

View file

@ -10,7 +10,7 @@ import type {
JobStatus,
JobStatusTransition,
} from '~/stream/interfaces/IJobStore';
import { isPendingActionExpired } from '~/stream/interfaces/IJobStore';
import { isPendingActionStale } from '~/stream/interfaces/IJobStore';
/**
 * Atomic compare-and-set on the job hash — the single-winner decision for a
@ -266,7 +266,12 @@ export class RedisJobStore implements IJobStore {
const fields = Object.entries(serialized).flat();
if (updates.status === 'requires_action') {
await this.transitionToRequiresAction(key, streamId, fields);
await this.transitionToRequiresAction(
key,
streamId,
fields,
this.pauseTtlSeconds(updates.pendingAction),
);
return;
}
@ -345,6 +350,22 @@ export class RedisJobStore implements IJobStore {
}
}
/**
 * Live-key TTL (seconds) for a paused job. Defaults to the running TTL but
 * extends to cover a pendingAction whose `expiresAt` is farther out, plus a
 * grace margin so a decision arriving right at the deadline can still resume.
 * Without this, a long approval window (e.g. 1h) on the default 20m stream
 * TTL would let Redis evict the paused job mid-window.
 *
 * @param pendingAction - The prompt the job is pausing on, if any. Its
 *   `expiresAt` is compared against `Date.now()`, i.e. treated as epoch ms.
 * @returns A TTL in seconds that is never below `this.ttl.running`.
 */
private pauseTtlSeconds(pendingAction?: Agents.PendingAction): number {
  const exp = pendingAction?.expiresAt;
  // No deadline — or one that is not a finite number (NaN/Infinity from a
  // malformed record) — falls back to the running TTL. Without the finiteness
  // check, Math.max would propagate NaN/Infinity as the TTL passed to EXPIRE.
  if (exp == null || !Number.isFinite(exp)) {
    return this.ttl.running;
  }
  // Remaining window rounded up to whole seconds, plus a 60s grace margin.
  const secondsUntilExpiry = Math.ceil((exp - Date.now()) / 1000) + 60;
  // An already-past or near deadline still gets at least the running TTL.
  return Math.max(this.ttl.running, secondsUntilExpiry);
}
/** The membership set a status belongs to; terminal statuses have none. */
private statusSetKey(status: JobStatus): string | null {
if (status === 'running') {
@ -370,7 +391,13 @@ export class RedisJobStore implements IJobStore {
const remSet = this.statusSetKey(from);
const addSet = this.statusSetKey(to);
const terminal = addSet === null;
const ttl = terminal ? this.ttl.completed : this.ttl.running;
let ttl = terminal ? this.ttl.completed : this.ttl.running;
if (to === 'requires_action') {
// A paused job must outlive its approval window, even when that window is
// longer than the running TTL — otherwise Redis evicts it before a
// decision can resume it.
ttl = this.pauseTtlSeconds(patch?.pendingAction);
}
// 1) Single-winner decision: an atomic CAS on the single-slot job hash.
// Works identically on cluster and single-node, so two concurrent
@ -436,9 +463,12 @@ export class RedisJobStore implements IJobStore {
key: string,
streamId: string,
fields: string[],
ttlSeconds: number = this.ttl.running,
): Promise<void> {
// Job paused for human review — non-terminal. Keep the user-active set
// untouched so resume can rebuild state from the persisted job.
// untouched so resume can rebuild state from the persisted job. The live
// TTL covers the approval window (see pauseTtlSeconds) so a long-pending
// job isn't evicted before a decision arrives.
if (this.isCluster) {
const exists = await this.redis.exists(key);
if (exists !== 1) {
@ -447,7 +477,9 @@ export class RedisJobStore implements IJobStore {
await this.redis.srem(KEYS.runningJobs, streamId);
await this.redis.sadd(KEYS.requiresActionJobs, streamId);
await this.updateExistingJobHash(key, fields);
await this.refreshLiveJobTtls(key, streamId);
await this.redis.expire(key, ttlSeconds);
await this.redis.expire(KEYS.chunks(streamId), ttlSeconds);
await this.redis.expire(KEYS.runSteps(streamId), ttlSeconds);
return;
}
@ -460,7 +492,7 @@ export class RedisJobStore implements IJobStore {
KEYS.chunks(streamId),
KEYS.runSteps(streamId),
streamId,
String(this.ttl.running),
String(ttlSeconds),
...fields,
);
}
@ -675,11 +707,12 @@ export class RedisJobStore implements IJobStore {
return 1;
}
// Past-due approval: finalize it (aborted) so it stops occupying the
// slot and its stream contents are reclaimed, mirroring
// ApprovalLifecycle.expire(). transitionStatus runs the terminal
// content cleanup (sets, chunks, run-steps, userJobs, completed TTL).
if (isPendingActionExpired(job)) {
// Stale approval (expired, or missing/malformed pendingAction):
// finalize it (aborted) so it stops occupying the slot and its stream
// contents are reclaimed, mirroring ApprovalLifecycle.expire().
// transitionStatus runs the terminal content cleanup (sets, chunks,
// run-steps, userJobs, completed TTL).
if (isPendingActionStale(job)) {
await this.transitionStatus(streamId, {
from: 'requires_action',
to: 'aborted',
@ -779,7 +812,7 @@ export class RedisJobStore implements IJobStore {
// counts as active (cleanup/expiry will finalize it), so the client stops
// polling and can complete.
if (job && (job.status === 'running' || job.status === 'requires_action')) {
if (job.status === 'requires_action' && isPendingActionExpired(job)) {
if (job.status === 'requires_action' && isPendingActionStale(job)) {
continue;
}
activeIds.push(streamId);

View file

@ -97,6 +97,16 @@ export function isPendingActionExpired(job: Pick<SerializableJobData, 'pendingAc
return exp != null && exp <= Date.now();
}
/**
 * Reports whether a `requires_action` job lacks a live, resolvable prompt.
 *
 * A job is stale when its `pendingAction` is absent (e.g. dropped on
 * deserialize) or when {@link isPendingActionExpired} reports it as past its
 * `expiresAt`. Such a job can't be rendered or resolved, so it must be kept
 * out of active listings and finalized by cleanup rather than left stuck
 * active.
 *
 * @param job - Any object carrying the job's `pendingAction` field.
 * @returns `true` when the prompt is missing or expired, otherwise `false`.
 */
export function isPendingActionStale(job: Pick<SerializableJobData, 'pendingAction'>): boolean {
  // A missing prompt is stale outright; no expiry check is needed.
  if (!job.pendingAction) {
    return true;
  }
  return isPendingActionExpired(job);
}
/**
* Arguments for an atomic {@link IJobStore.transitionStatus} compare-and-set.
*/