🛡️ fix: Address Codex review on the HITL approval lifecycle

Seven findings on the lifecycle deepening (089ba09f9), all valid:

- F3 actionId guard: resolve/expire take an expectedActionId; pause records a
  flat `pendingActionId` the atomic CAS guards on, so a stale decision can't
  resume a job that has since paused for a different action.
- F4 cluster single-winner: transitionStatus now decides the winner with an
  atomic CAS on the single-slot job hash (one Lua, cluster-safe), then
  reconciles cross-slot membership sets — two concurrent resolves can no
  longer both win on Redis Cluster.
- F1 resume reaping: resolve refreshes `lastActiveAt`; both stores' stale-
  running failsafes key off it, so a long-paused approval isn't reaped right
  after resuming.
- F2 expire completedAt: expire writes completedAt so terminal cleanup
  reclaims the job (InMemory only cleans terminal jobs with completedAt set).
- F5 facade: buildJobFacade copies pendingAction into metadata so status/
  resume routes can render the prompt.
- F6 resume metadata: PendingAction + buildPendingAction carry the SDK
  interruptId/threadId needed to rebuild Command({ resume }) cross-process.
- F7 mirror: data-provider AskUserQuestionRequest gains optional description.

Tests added at the interface: stale-actionId resolve rejected, expire sets
completedAt. tsc + lint clean; policy + type-contract specs pass.
This commit is contained in:
Danny Avila 2026-06-16 13:49:53 -04:00
parent 089ba09f98
commit abf4b86291
8 changed files with 157 additions and 62 deletions

View file

@ -117,6 +117,10 @@ export interface PendingActionContext {
ttlMs?: number;
/** Override actionId; defaults to a fresh uuid. */
actionId?: string;
/** SDK interrupt id (`RunInterruptResult.interruptId`) for cross-process resume. */
interruptId?: string;
/** LangGraph `thread_id` (`RunInterruptResult.threadId`) for cross-process resume. */
threadId?: string;
}
/**
@ -140,5 +144,7 @@ export function buildPendingAction(
payload,
createdAt,
expiresAt: typeof ctx.ttlMs === 'number' ? createdAt + ctx.ttlMs : undefined,
interruptId: ctx.interruptId,
threadId: ctx.threadId,
};
}

View file

@ -36,7 +36,8 @@ export class ApprovalLifecycle {
const ok = await this.store.transitionStatus(streamId, {
from: 'running',
to: 'requires_action',
patch: { pendingAction },
// pendingActionId is the flat mirror the atomic resolve/expire guard on.
patch: { pendingAction, pendingActionId: pendingAction.actionId },
});
if (ok) {
logger.debug(
@ -62,40 +63,54 @@ export class ApprovalLifecycle {
/**
* `requires_action → running`, atomically. Returns `true` to the single
* caller that won the transition; `false` if the job was not paused, was
* already resumed by a racing submit, or had expired in which case it is
* moved to a terminal state instead of resumed.
* already resumed by a racing submit, no longer matches `expectedActionId`,
* or had expired in which case it is moved to a terminal state instead of
* resumed.
*
* Pass `expectedActionId` (the id the user actually decided on, from the
* approval route) so a stale decision can't resume a job that has since
* paused for a *different* action. Omit it only for callers with no specific
* action in hand.
*
* The caller MUST treat `false` as "do not drive the run": only the `true`
* winner may re-enter the agent.
*/
async resolve(streamId: string): Promise<boolean> {
async resolve(streamId: string, expectedActionId?: string): Promise<boolean> {
const job = await this.store.getJob(streamId);
if (
job?.status === 'requires_action' &&
job.pendingAction &&
this.isExpired(job.pendingAction)
) {
await this.expire(streamId);
await this.expire(streamId, expectedActionId);
return false;
}
return this.store.transitionStatus(streamId, {
from: 'requires_action',
to: 'running',
clear: ['pendingAction'],
clear: ['pendingAction', 'pendingActionId'],
// Refresh the liveness basis so a long-paused run isn't reaped as stale
// immediately after resuming (cleanup keys off lastActiveAt).
patch: { lastActiveAt: Date.now() },
expectActionId: expectedActionId,
});
}
/**
* `requires_action → aborted`: the edge that fires when no decision arrives
* in time. Previously undefined; now an explicit, idempotent terminal
* transition. Returns `true` to the single caller that expired it.
* transition. Returns `true` to the single caller that expired it. Honors
* `expectedActionId` for the same stale-decision protection as `resolve`.
*/
async expire(streamId: string): Promise<boolean> {
async expire(streamId: string, expectedActionId?: string): Promise<boolean> {
const ok = await this.store.transitionStatus(streamId, {
from: 'requires_action',
to: 'aborted',
clear: ['pendingAction'],
patch: { error: 'Approval expired before a decision was made' },
clear: ['pendingAction', 'pendingActionId'],
// completedAt lets the stores' terminal-cleanup reclaim the job; without
// it an expired approval lingers in the in-memory map indefinitely.
patch: { error: 'Approval expired before a decision was made', completedAt: Date.now() },
expectActionId: expectedActionId,
});
if (ok) {
logger.debug(`[ApprovalLifecycle] expired pending review: ${streamId}`);

View file

@ -480,6 +480,9 @@ class GenerationJobManagerClass {
iconURL: jobData.iconURL,
model: jobData.model,
promptTokens: jobData.promptTokens,
// Surface the pending review so status/resume routes built on the
// facade can render the prompt for a `requires_action` job.
pendingAction: jobData.pendingAction,
},
readyPromise: runtime.readyPromise,
resolveReady: runtime.resolveReady,

View file

@ -124,6 +124,21 @@ describe('ApprovalLifecycle via GenerationJobManager.approvals (in-memory)', ()
expect(await manager.approvals.resolve(streamId)).toBe(false);
});
test('rejects a resolve whose actionId no longer matches (stale-decision guard)', async () => {
const streamId = 'stream-stale-action';
await manager.createJob(streamId, 'user-1');
const action = buildAction(streamId);
await manager.approvals.pause(streamId, action);
// A decision targeting a different action must not resume this one.
expect(await manager.approvals.resolve(streamId, 'some-other-action-id')).toBe(false);
expect(await manager.getJobStatus(streamId)).toBe('requires_action');
// The matching actionId resolves it.
expect(await manager.approvals.resolve(streamId, action.actionId)).toBe(true);
expect(await manager.getJobStatus(streamId)).toBe('running');
});
test('an expired pending action expires instead of resuming', async () => {
const streamId = 'stream-resolve-expired';
await manager.createJob(streamId, 'user-1');
@ -156,6 +171,17 @@ describe('ApprovalLifecycle via GenerationJobManager.approvals (in-memory)', ()
await manager.createJob(streamId, 'user-1');
expect(await manager.approvals.expire(streamId)).toBe(false);
});
test('sets completedAt so terminal cleanup can reclaim the job', async () => {
const streamId = 'stream-expire-completed';
await manager.createJob(streamId, 'user-1');
await manager.approvals.pause(streamId, buildAction(streamId));
expect(await manager.approvals.expire(streamId)).toBe(true);
const job = await manager.getJob(streamId);
expect(job?.status).toBe('aborted');
expect(job?.completedAt).toBeGreaterThan(0);
});
});
describe('facade integration', () => {

View file

@ -149,6 +149,9 @@ export class InMemoryJobStore implements IJobStore {
if (!job || job.status !== args.from) {
return false;
}
if (args.expectActionId != null && job.pendingActionId !== args.expectActionId) {
return false;
}
job.status = args.to;
if (args.patch) {
Object.assign(job, args.patch);
@ -210,7 +213,7 @@ export class InMemoryJobStore implements IJobStore {
// content state in memory until the process OOMs. Reaping keys off last
// activity (not creation time) so a long but live stream is never reaped,
// mirroring RedisJobStore refreshing the running TTL on each chunk.
const lastActive = this.lastActivity.get(streamId) ?? job.createdAt;
const lastActive = this.lastActivity.get(streamId) ?? job.lastActiveAt ?? job.createdAt;
if (now - lastActive > this.staleJobTimeout) {
toDelete.push(streamId);
staleRunning++;

View file

@ -12,31 +12,30 @@ import type {
} from '~/stream/interfaces/IJobStore';
/**
* Atomic compare-and-set status transition (single-node / sentinel Redis).
* Atomic compare-and-set on the job hash the single-winner decision for a
* status transition. Touches ONLY the job key, which lives on one hash slot, so
* it is atomic on both single-node and Redis Cluster (cross-slot membership
* sets are reconciled by the caller AFTER this decides the winner).
*
* Guards on the current `status` field, then in one indivisible step
* removes `clear` fields, writes the `status`+patch pairs, reconciles the
* membership sets, and refreshes TTLs. Returns 1 if it fired, 0 if the job
* was missing or no longer in the expected `from` status.
* Guards on the current `status` and, when ARGV[2] is non-empty, on the flat
* `pendingActionId` field so a stale decision targeting a different action
* loses. On success: removes `clear` fields, writes `status`+patch pairs,
* refreshes the job-hash TTL. Returns 1 if it fired, 0 otherwise.
*
* KEYS: [job, remSet | "", addSet | "", chunks, runSteps]
* ARGV: [from, member, ttl, refreshLive(0|1), hdelCount, ...hdel, ...hsetPairs]
* KEYS: [job]
* ARGV: [from, expectActionId | "", ttl, hdelCount, ...hdelFields, ...hsetPairs]
*/
const TRANSITION_STATUS_LUA =
const JOB_CAS_LUA =
'if redis.call("HGET", KEYS[1], "status") ~= ARGV[1] then return 0 end ' +
'local member = ARGV[2] ' +
'if ARGV[2] ~= "" and redis.call("HGET", KEYS[1], "pendingActionId") ~= ARGV[2] then return 0 end ' +
'local ttl = tonumber(ARGV[3]) ' +
'local refreshLive = ARGV[4] ' +
'local hdelCount = tonumber(ARGV[5]) ' +
'local idx = 6 ' +
'local hdelCount = tonumber(ARGV[4]) ' +
'local idx = 5 ' +
'for i = 1, hdelCount do redis.call("HDEL", KEYS[1], ARGV[idx]) idx = idx + 1 end ' +
'local hset = {} ' +
'for i = idx, #ARGV do hset[#hset + 1] = ARGV[i] end ' +
'if #hset > 0 then redis.call("HSET", KEYS[1], unpack(hset)) end ' +
'if KEYS[2] ~= "" then redis.call("SREM", KEYS[2], member) end ' +
'if KEYS[3] ~= "" then redis.call("SADD", KEYS[3], member) end ' +
'redis.call("EXPIRE", KEYS[1], ttl) ' +
'if refreshLive == "1" then redis.call("EXPIRE", KEYS[4], ttl) redis.call("EXPIRE", KEYS[5], ttl) end ' +
'return 1';
/** Decision kinds the SDK can emit, used to sanity-check persisted records. */
@ -320,7 +319,7 @@ export class RedisJobStore implements IJobStore {
}
async transitionStatus(streamId: string, args: JobStatusTransition): Promise<boolean> {
const { from, to, patch, clear } = args;
const { from, to, patch, clear, expectActionId } = args;
const key = KEYS.job(streamId);
// status + patch become HSET pairs; serializeJob skips undefined, so
@ -335,51 +334,53 @@ export class RedisJobStore implements IJobStore {
const terminal = addSet === null;
const ttl = terminal ? this.ttl.completed : this.ttl.running;
// 1) Single-winner decision: an atomic CAS on the single-slot job hash.
// Works identically on cluster and single-node, so two concurrent
// resolves can never both win (and drive the run twice).
const won = await this.redis.eval(
JOB_CAS_LUA,
1,
key,
from,
expectActionId ?? '',
String(ttl),
String(clearFields.length),
...clearFields,
...fields,
);
if (won !== 1) {
return false;
}
// 2) Reconcile membership sets + live-key TTLs. Only the winner reaches
// here; set membership is derived state that periodic cleanup
// self-heals, so this non-atomic cross-slot step is safe.
if (this.isCluster) {
// Membership sets live on a different slot from the job hash, so a single
// cross-slot script isn't possible. Guard best-effort (read status, then
// apply) — matches the existing cluster posture for status writes.
const current = await this.redis.hget(key, 'status');
if (current !== from) {
return false;
}
if (clearFields.length > 0) {
await this.redis.hdel(key, ...clearFields);
}
if (fields.length > 0) {
await this.updateExistingJobHash(key, fields);
}
if (remSet) {
await this.redis.srem(remSet, streamId);
}
if (addSet) {
await this.redis.sadd(addSet, streamId);
}
await this.redis.expire(key, ttl);
if (!terminal) {
await this.redis.expire(KEYS.chunks(streamId), ttl);
await this.redis.expire(KEYS.runSteps(streamId), ttl);
}
return true;
} else {
const pipeline = this.redis.pipeline();
if (remSet) {
pipeline.srem(remSet, streamId);
}
if (addSet) {
pipeline.sadd(addSet, streamId);
}
if (!terminal) {
pipeline.expire(KEYS.chunks(streamId), ttl);
pipeline.expire(KEYS.runSteps(streamId), ttl);
}
await pipeline.exec();
}
const result = await this.redis.eval(
TRANSITION_STATUS_LUA,
5,
key,
remSet ?? '',
addSet ?? '',
KEYS.chunks(streamId),
KEYS.runSteps(streamId),
from,
streamId,
String(ttl),
terminal ? '0' : '1',
String(clearFields.length),
...clearFields,
...fields,
);
return result === 1;
return true;
}
private async updateExistingJobHash(key: string, fields: string[]): Promise<boolean> {
@ -576,8 +577,11 @@ export class RedisJobStore implements IJobStore {
return 1;
}
// Stale running job (failsafe - running for > configured TTL)
if (now - job.createdAt > this.ttl.running * 1000) {
// Stale running job (failsafe - running for > configured TTL).
// Keys off `lastActiveAt` when present so a just-resumed approval
// isn't reaped on the basis of its original creation time.
const liveSince = job.lastActiveAt ?? job.createdAt;
if (now - liveSince > this.ttl.running * 1000) {
logger.warn(`[RedisJobStore] Cleaning up stale job: ${streamId}`);
const userJobsKey = job.userId ? KEYS.userJobs(job.userId, job.tenantId) : null;
await this.deleteJobInternal(streamId, userJobsKey);
@ -1211,6 +1215,8 @@ export class RedisJobStore implements IJobStore {
contextUsage: data.contextUsage || undefined,
tokenUsage: data.tokenUsage || undefined,
pendingAction: this.parsePendingAction(data.pendingAction),
pendingActionId: data.pendingActionId || undefined,
lastActiveAt: data.lastActiveAt ? parseInt(data.lastActiveAt, 10) : undefined,
};
}

View file

@ -69,6 +69,22 @@ export interface SerializableJobData {
* run is waiting on. Cleared by the resume path before the job returns to `running`.
*/
pendingAction?: Agents.PendingAction;
/**
* Flat mirror of `pendingAction.actionId`, kept as a top-level field so an
* atomic status transition can guard on it (a nested JSON field can't be
* compared inside a Redis Lua CAS). Lets `resolve`/`expire` reject a stale
* decision that targets a different action than the one currently pending.
*/
pendingActionId?: string;
/**
* Liveness basis for the stale-running failsafe, refreshed when a paused job
* is resumed. Without it, cleanup keys off `createdAt`, so an approval that
* sat in `requires_action` past the running window would be reaped on the
* next tick right after resuming. Falls back to `createdAt` when unset.
*/
lastActiveAt?: number;
}
/**
@ -83,6 +99,12 @@ export interface JobStatusTransition {
patch?: Partial<SerializableJobData>;
/** Field names removed in the same atomic step (e.g. `pendingAction`). */
clear?: Array<keyof SerializableJobData & string>;
/**
* Additional guard: only fire if the job's `pendingActionId` equals this.
* Checked atomically alongside the `from` status so a stale decision can't
* resolve a job that has since paused for a different action.
*/
expectActionId?: string;
}
/**

View file

@ -350,6 +350,8 @@ export namespace Agents {
/** The question itself: free-form prompt with optional curated answers. */
export interface AskUserQuestionRequest {
question: string;
/** Optional descriptive context for the prompt; mirrors the SDK field. */
description?: string;
options?: AskUserQuestionOption[];
}
@ -383,6 +385,18 @@ export namespace Agents {
createdAt: number;
/** Optional expiry; clients should treat past `expiresAt` as stale */
expiresAt?: number;
/**
* SDK interrupt id (`RunInterruptResult.interruptId`). Persisted so a
* cross-process resume can correlate the decision with the LangGraph
* interrupt after the original `Run` object is gone.
*/
interruptId?: string;
/**
* LangGraph `thread_id` the run was bound to (`RunInterruptResult.threadId`).
* Required, with the checkpointer, to rebuild `Command({ resume })` on a
* worker that didn't originate the run.
*/
threadId?: string;
}
/**