mirror of
https://github.com/danny-avila/LibreChat.git
synced 2026-06-28 02:11:30 +00:00
🛡️ fix: Address Codex review on the HITL approval lifecycle
Seven findings on the lifecycle deepening (089ba09f9), all valid:
- F3 actionId guard: resolve/expire take an expectedActionId; pause records a
flat `pendingActionId` the atomic CAS guards on, so a stale decision can't
resume a job that has since paused for a different action.
- F4 cluster single-winner: transitionStatus now decides the winner with an
atomic CAS on the single-slot job hash (one Lua, cluster-safe), then
reconciles cross-slot membership sets — two concurrent resolves can no
longer both win on Redis Cluster.
- F1 resume reaping: resolve refreshes `lastActiveAt`; both stores' stale-
running failsafes key off it, so a long-paused approval isn't reaped right
after resuming.
- F2 expire completedAt: expire writes completedAt so terminal cleanup
reclaims the job (InMemory only cleans terminal jobs with completedAt set).
- F5 facade: buildJobFacade copies pendingAction into metadata so status/
resume routes can render the prompt.
- F6 resume metadata: PendingAction + buildPendingAction carry the SDK
interruptId/threadId needed to rebuild Command({ resume }) cross-process.
- F7 mirror: data-provider AskUserQuestionRequest gains optional description.
Tests added at the interface: stale-actionId resolve rejected, expire sets
completedAt. tsc + lint clean; policy + type-contract specs pass.
This commit is contained in:
parent
089ba09f98
commit
abf4b86291
8 changed files with 157 additions and 62 deletions
|
|
@ -117,6 +117,10 @@ export interface PendingActionContext {
|
|||
ttlMs?: number;
|
||||
/** Override actionId; defaults to a fresh uuid. */
|
||||
actionId?: string;
|
||||
/** SDK interrupt id (`RunInterruptResult.interruptId`) for cross-process resume. */
|
||||
interruptId?: string;
|
||||
/** LangGraph `thread_id` (`RunInterruptResult.threadId`) for cross-process resume. */
|
||||
threadId?: string;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -140,5 +144,7 @@ export function buildPendingAction(
|
|||
payload,
|
||||
createdAt,
|
||||
expiresAt: typeof ctx.ttlMs === 'number' ? createdAt + ctx.ttlMs : undefined,
|
||||
interruptId: ctx.interruptId,
|
||||
threadId: ctx.threadId,
|
||||
};
|
||||
}
|
||||
|
|
|
|||
|
|
@ -36,7 +36,8 @@ export class ApprovalLifecycle {
|
|||
const ok = await this.store.transitionStatus(streamId, {
|
||||
from: 'running',
|
||||
to: 'requires_action',
|
||||
patch: { pendingAction },
|
||||
// pendingActionId is the flat mirror the atomic resolve/expire guard on.
|
||||
patch: { pendingAction, pendingActionId: pendingAction.actionId },
|
||||
});
|
||||
if (ok) {
|
||||
logger.debug(
|
||||
|
|
@ -62,40 +63,54 @@ export class ApprovalLifecycle {
|
|||
/**
|
||||
* `requires_action → running`, atomically. Returns `true` to the single
|
||||
* caller that won the transition; `false` if the job was not paused, was
|
||||
* already resumed by a racing submit, or had expired — in which case it is
|
||||
* moved to a terminal state instead of resumed.
|
||||
* already resumed by a racing submit, no longer matches `expectedActionId`,
|
||||
* or had expired — in which case it is moved to a terminal state instead of
|
||||
* resumed.
|
||||
*
|
||||
* Pass `expectedActionId` (the id the user actually decided on, from the
|
||||
* approval route) so a stale decision can't resume a job that has since
|
||||
* paused for a *different* action. Omit it only for callers with no specific
|
||||
* action in hand.
|
||||
*
|
||||
* The caller MUST treat `false` as "do not drive the run": only the `true`
|
||||
* winner may re-enter the agent.
|
||||
*/
|
||||
async resolve(streamId: string): Promise<boolean> {
|
||||
async resolve(streamId: string, expectedActionId?: string): Promise<boolean> {
|
||||
const job = await this.store.getJob(streamId);
|
||||
if (
|
||||
job?.status === 'requires_action' &&
|
||||
job.pendingAction &&
|
||||
this.isExpired(job.pendingAction)
|
||||
) {
|
||||
await this.expire(streamId);
|
||||
await this.expire(streamId, expectedActionId);
|
||||
return false;
|
||||
}
|
||||
return this.store.transitionStatus(streamId, {
|
||||
from: 'requires_action',
|
||||
to: 'running',
|
||||
clear: ['pendingAction'],
|
||||
clear: ['pendingAction', 'pendingActionId'],
|
||||
// Refresh the liveness basis so a long-paused run isn't reaped as stale
|
||||
// immediately after resuming (cleanup keys off lastActiveAt).
|
||||
patch: { lastActiveAt: Date.now() },
|
||||
expectActionId: expectedActionId,
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* `requires_action → aborted`: the edge that fires when no decision arrives
|
||||
* in time. Previously undefined; now an explicit, idempotent terminal
|
||||
* transition. Returns `true` to the single caller that expired it.
|
||||
* transition. Returns `true` to the single caller that expired it. Honors
|
||||
* `expectedActionId` for the same stale-decision protection as `resolve`.
|
||||
*/
|
||||
async expire(streamId: string): Promise<boolean> {
|
||||
async expire(streamId: string, expectedActionId?: string): Promise<boolean> {
|
||||
const ok = await this.store.transitionStatus(streamId, {
|
||||
from: 'requires_action',
|
||||
to: 'aborted',
|
||||
clear: ['pendingAction'],
|
||||
patch: { error: 'Approval expired before a decision was made' },
|
||||
clear: ['pendingAction', 'pendingActionId'],
|
||||
// completedAt lets the stores' terminal-cleanup reclaim the job; without
|
||||
// it an expired approval lingers in the in-memory map indefinitely.
|
||||
patch: { error: 'Approval expired before a decision was made', completedAt: Date.now() },
|
||||
expectActionId: expectedActionId,
|
||||
});
|
||||
if (ok) {
|
||||
logger.debug(`[ApprovalLifecycle] expired pending review: ${streamId}`);
|
||||
|
|
|
|||
|
|
@ -480,6 +480,9 @@ class GenerationJobManagerClass {
|
|||
iconURL: jobData.iconURL,
|
||||
model: jobData.model,
|
||||
promptTokens: jobData.promptTokens,
|
||||
// Surface the pending review so status/resume routes built on the
|
||||
// facade can render the prompt for a `requires_action` job.
|
||||
pendingAction: jobData.pendingAction,
|
||||
},
|
||||
readyPromise: runtime.readyPromise,
|
||||
resolveReady: runtime.resolveReady,
|
||||
|
|
|
|||
|
|
@ -124,6 +124,21 @@ describe('ApprovalLifecycle via GenerationJobManager.approvals (in-memory)', ()
|
|||
expect(await manager.approvals.resolve(streamId)).toBe(false);
|
||||
});
|
||||
|
||||
test('rejects a resolve whose actionId no longer matches (stale-decision guard)', async () => {
|
||||
const streamId = 'stream-stale-action';
|
||||
await manager.createJob(streamId, 'user-1');
|
||||
const action = buildAction(streamId);
|
||||
await manager.approvals.pause(streamId, action);
|
||||
|
||||
// A decision targeting a different action must not resume this one.
|
||||
expect(await manager.approvals.resolve(streamId, 'some-other-action-id')).toBe(false);
|
||||
expect(await manager.getJobStatus(streamId)).toBe('requires_action');
|
||||
|
||||
// The matching actionId resolves it.
|
||||
expect(await manager.approvals.resolve(streamId, action.actionId)).toBe(true);
|
||||
expect(await manager.getJobStatus(streamId)).toBe('running');
|
||||
});
|
||||
|
||||
test('an expired pending action expires instead of resuming', async () => {
|
||||
const streamId = 'stream-resolve-expired';
|
||||
await manager.createJob(streamId, 'user-1');
|
||||
|
|
@ -156,6 +171,17 @@ describe('ApprovalLifecycle via GenerationJobManager.approvals (in-memory)', ()
|
|||
await manager.createJob(streamId, 'user-1');
|
||||
expect(await manager.approvals.expire(streamId)).toBe(false);
|
||||
});
|
||||
|
||||
test('sets completedAt so terminal cleanup can reclaim the job', async () => {
|
||||
const streamId = 'stream-expire-completed';
|
||||
await manager.createJob(streamId, 'user-1');
|
||||
await manager.approvals.pause(streamId, buildAction(streamId));
|
||||
|
||||
expect(await manager.approvals.expire(streamId)).toBe(true);
|
||||
const job = await manager.getJob(streamId);
|
||||
expect(job?.status).toBe('aborted');
|
||||
expect(job?.completedAt).toBeGreaterThan(0);
|
||||
});
|
||||
});
|
||||
|
||||
describe('facade integration', () => {
|
||||
|
|
|
|||
|
|
@ -149,6 +149,9 @@ export class InMemoryJobStore implements IJobStore {
|
|||
if (!job || job.status !== args.from) {
|
||||
return false;
|
||||
}
|
||||
if (args.expectActionId != null && job.pendingActionId !== args.expectActionId) {
|
||||
return false;
|
||||
}
|
||||
job.status = args.to;
|
||||
if (args.patch) {
|
||||
Object.assign(job, args.patch);
|
||||
|
|
@ -210,7 +213,7 @@ export class InMemoryJobStore implements IJobStore {
|
|||
// content state in memory until the process OOMs. Reaping keys off last
|
||||
// activity (not creation time) so a long but live stream is never reaped,
|
||||
// mirroring RedisJobStore refreshing the running TTL on each chunk.
|
||||
const lastActive = this.lastActivity.get(streamId) ?? job.createdAt;
|
||||
const lastActive = this.lastActivity.get(streamId) ?? job.lastActiveAt ?? job.createdAt;
|
||||
if (now - lastActive > this.staleJobTimeout) {
|
||||
toDelete.push(streamId);
|
||||
staleRunning++;
|
||||
|
|
|
|||
|
|
@ -12,31 +12,30 @@ import type {
|
|||
} from '~/stream/interfaces/IJobStore';
|
||||
|
||||
/**
|
||||
* Atomic compare-and-set status transition (single-node / sentinel Redis).
|
||||
* Atomic compare-and-set on the job hash — the single-winner decision for a
|
||||
* status transition. Touches ONLY the job key, which lives on one hash slot, so
|
||||
* it is atomic on both single-node and Redis Cluster (cross-slot membership
|
||||
* sets are reconciled by the caller AFTER this decides the winner).
|
||||
*
|
||||
* Guards on the current `status` field, then — in one indivisible step —
|
||||
* removes `clear` fields, writes the `status`+patch pairs, reconciles the
|
||||
* membership sets, and refreshes TTLs. Returns 1 if it fired, 0 if the job
|
||||
* was missing or no longer in the expected `from` status.
|
||||
* Guards on the current `status` and, when ARGV[2] is non-empty, on the flat
|
||||
* `pendingActionId` field — so a stale decision targeting a different action
|
||||
* loses. On success: removes `clear` fields, writes `status`+patch pairs,
|
||||
* refreshes the job-hash TTL. Returns 1 if it fired, 0 otherwise.
|
||||
*
|
||||
* KEYS: [job, remSet | "", addSet | "", chunks, runSteps]
|
||||
* ARGV: [from, member, ttl, refreshLive(0|1), hdelCount, ...hdel, ...hsetPairs]
|
||||
* KEYS: [job]
|
||||
* ARGV: [from, expectActionId | "", ttl, hdelCount, ...hdelFields, ...hsetPairs]
|
||||
*/
|
||||
const TRANSITION_STATUS_LUA =
|
||||
const JOB_CAS_LUA =
|
||||
'if redis.call("HGET", KEYS[1], "status") ~= ARGV[1] then return 0 end ' +
|
||||
'local member = ARGV[2] ' +
|
||||
'if ARGV[2] ~= "" and redis.call("HGET", KEYS[1], "pendingActionId") ~= ARGV[2] then return 0 end ' +
|
||||
'local ttl = tonumber(ARGV[3]) ' +
|
||||
'local refreshLive = ARGV[4] ' +
|
||||
'local hdelCount = tonumber(ARGV[5]) ' +
|
||||
'local idx = 6 ' +
|
||||
'local hdelCount = tonumber(ARGV[4]) ' +
|
||||
'local idx = 5 ' +
|
||||
'for i = 1, hdelCount do redis.call("HDEL", KEYS[1], ARGV[idx]) idx = idx + 1 end ' +
|
||||
'local hset = {} ' +
|
||||
'for i = idx, #ARGV do hset[#hset + 1] = ARGV[i] end ' +
|
||||
'if #hset > 0 then redis.call("HSET", KEYS[1], unpack(hset)) end ' +
|
||||
'if KEYS[2] ~= "" then redis.call("SREM", KEYS[2], member) end ' +
|
||||
'if KEYS[3] ~= "" then redis.call("SADD", KEYS[3], member) end ' +
|
||||
'redis.call("EXPIRE", KEYS[1], ttl) ' +
|
||||
'if refreshLive == "1" then redis.call("EXPIRE", KEYS[4], ttl) redis.call("EXPIRE", KEYS[5], ttl) end ' +
|
||||
'return 1';
|
||||
|
||||
/** Decision kinds the SDK can emit, used to sanity-check persisted records. */
|
||||
|
|
@ -320,7 +319,7 @@ export class RedisJobStore implements IJobStore {
|
|||
}
|
||||
|
||||
async transitionStatus(streamId: string, args: JobStatusTransition): Promise<boolean> {
|
||||
const { from, to, patch, clear } = args;
|
||||
const { from, to, patch, clear, expectActionId } = args;
|
||||
const key = KEYS.job(streamId);
|
||||
|
||||
// status + patch become HSET pairs; serializeJob skips undefined, so
|
||||
|
|
@ -335,51 +334,53 @@ export class RedisJobStore implements IJobStore {
|
|||
const terminal = addSet === null;
|
||||
const ttl = terminal ? this.ttl.completed : this.ttl.running;
|
||||
|
||||
// 1) Single-winner decision: an atomic CAS on the single-slot job hash.
|
||||
// Works identically on cluster and single-node, so two concurrent
|
||||
// resolves can never both win (and drive the run twice).
|
||||
const won = await this.redis.eval(
|
||||
JOB_CAS_LUA,
|
||||
1,
|
||||
key,
|
||||
from,
|
||||
expectActionId ?? '',
|
||||
String(ttl),
|
||||
String(clearFields.length),
|
||||
...clearFields,
|
||||
...fields,
|
||||
);
|
||||
if (won !== 1) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// 2) Reconcile membership sets + live-key TTLs. Only the winner reaches
|
||||
// here; set membership is derived state that periodic cleanup
|
||||
// self-heals, so this non-atomic cross-slot step is safe.
|
||||
if (this.isCluster) {
|
||||
// Membership sets live on a different slot from the job hash, so a single
|
||||
// cross-slot script isn't possible. Guard best-effort (read status, then
|
||||
// apply) — matches the existing cluster posture for status writes.
|
||||
const current = await this.redis.hget(key, 'status');
|
||||
if (current !== from) {
|
||||
return false;
|
||||
}
|
||||
if (clearFields.length > 0) {
|
||||
await this.redis.hdel(key, ...clearFields);
|
||||
}
|
||||
if (fields.length > 0) {
|
||||
await this.updateExistingJobHash(key, fields);
|
||||
}
|
||||
if (remSet) {
|
||||
await this.redis.srem(remSet, streamId);
|
||||
}
|
||||
if (addSet) {
|
||||
await this.redis.sadd(addSet, streamId);
|
||||
}
|
||||
await this.redis.expire(key, ttl);
|
||||
if (!terminal) {
|
||||
await this.redis.expire(KEYS.chunks(streamId), ttl);
|
||||
await this.redis.expire(KEYS.runSteps(streamId), ttl);
|
||||
}
|
||||
return true;
|
||||
} else {
|
||||
const pipeline = this.redis.pipeline();
|
||||
if (remSet) {
|
||||
pipeline.srem(remSet, streamId);
|
||||
}
|
||||
if (addSet) {
|
||||
pipeline.sadd(addSet, streamId);
|
||||
}
|
||||
if (!terminal) {
|
||||
pipeline.expire(KEYS.chunks(streamId), ttl);
|
||||
pipeline.expire(KEYS.runSteps(streamId), ttl);
|
||||
}
|
||||
await pipeline.exec();
|
||||
}
|
||||
|
||||
const result = await this.redis.eval(
|
||||
TRANSITION_STATUS_LUA,
|
||||
5,
|
||||
key,
|
||||
remSet ?? '',
|
||||
addSet ?? '',
|
||||
KEYS.chunks(streamId),
|
||||
KEYS.runSteps(streamId),
|
||||
from,
|
||||
streamId,
|
||||
String(ttl),
|
||||
terminal ? '0' : '1',
|
||||
String(clearFields.length),
|
||||
...clearFields,
|
||||
...fields,
|
||||
);
|
||||
return result === 1;
|
||||
return true;
|
||||
}
|
||||
|
||||
private async updateExistingJobHash(key: string, fields: string[]): Promise<boolean> {
|
||||
|
|
@ -576,8 +577,11 @@ export class RedisJobStore implements IJobStore {
|
|||
return 1;
|
||||
}
|
||||
|
||||
// Stale running job (failsafe - running for > configured TTL)
|
||||
if (now - job.createdAt > this.ttl.running * 1000) {
|
||||
// Stale running job (failsafe - running for > configured TTL).
|
||||
// Keys off `lastActiveAt` when present so a just-resumed approval
|
||||
// isn't reaped on the basis of its original creation time.
|
||||
const liveSince = job.lastActiveAt ?? job.createdAt;
|
||||
if (now - liveSince > this.ttl.running * 1000) {
|
||||
logger.warn(`[RedisJobStore] Cleaning up stale job: ${streamId}`);
|
||||
const userJobsKey = job.userId ? KEYS.userJobs(job.userId, job.tenantId) : null;
|
||||
await this.deleteJobInternal(streamId, userJobsKey);
|
||||
|
|
@ -1211,6 +1215,8 @@ export class RedisJobStore implements IJobStore {
|
|||
contextUsage: data.contextUsage || undefined,
|
||||
tokenUsage: data.tokenUsage || undefined,
|
||||
pendingAction: this.parsePendingAction(data.pendingAction),
|
||||
pendingActionId: data.pendingActionId || undefined,
|
||||
lastActiveAt: data.lastActiveAt ? parseInt(data.lastActiveAt, 10) : undefined,
|
||||
};
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -69,6 +69,22 @@ export interface SerializableJobData {
|
|||
* run is waiting on. Cleared by the resume path before the job returns to `running`.
|
||||
*/
|
||||
pendingAction?: Agents.PendingAction;
|
||||
|
||||
/**
|
||||
* Flat mirror of `pendingAction.actionId`, kept as a top-level field so an
|
||||
* atomic status transition can guard on it (a nested JSON field can't be
|
||||
* compared inside a Redis Lua CAS). Lets `resolve`/`expire` reject a stale
|
||||
* decision that targets a different action than the one currently pending.
|
||||
*/
|
||||
pendingActionId?: string;
|
||||
|
||||
/**
|
||||
* Liveness basis for the stale-running failsafe, refreshed when a paused job
|
||||
* is resumed. Without it, cleanup keys off `createdAt`, so an approval that
|
||||
* sat in `requires_action` past the running window would be reaped on the
|
||||
* next tick right after resuming. Falls back to `createdAt` when unset.
|
||||
*/
|
||||
lastActiveAt?: number;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -83,6 +99,12 @@ export interface JobStatusTransition {
|
|||
patch?: Partial<SerializableJobData>;
|
||||
/** Field names removed in the same atomic step (e.g. `pendingAction`). */
|
||||
clear?: Array<keyof SerializableJobData & string>;
|
||||
/**
|
||||
* Additional guard: only fire if the job's `pendingActionId` equals this.
|
||||
* Checked atomically alongside the `from` status so a stale decision can't
|
||||
* resolve a job that has since paused for a different action.
|
||||
*/
|
||||
expectActionId?: string;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -350,6 +350,8 @@ export namespace Agents {
|
|||
/** The question itself: free-form prompt with optional curated answers. */
|
||||
export interface AskUserQuestionRequest {
|
||||
question: string;
|
||||
/** Optional descriptive context for the prompt; mirrors the SDK field. */
|
||||
description?: string;
|
||||
options?: AskUserQuestionOption[];
|
||||
}
|
||||
|
||||
|
|
@ -383,6 +385,18 @@ export namespace Agents {
|
|||
createdAt: number;
|
||||
/** Optional expiry; clients should treat past `expiresAt` as stale */
|
||||
expiresAt?: number;
|
||||
/**
|
||||
* SDK interrupt id (`RunInterruptResult.interruptId`). Persisted so a
|
||||
* cross-process resume can correlate the decision with the LangGraph
|
||||
* interrupt after the original `Run` object is gone.
|
||||
*/
|
||||
interruptId?: string;
|
||||
/**
|
||||
* LangGraph `thread_id` the run was bound to (`RunInterruptResult.threadId`).
|
||||
* Required, with the checkpointer, to rebuild `Command({ resume })` on a
|
||||
* worker that didn't originate the run.
|
||||
*/
|
||||
threadId?: string;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue