🏛️ refactor: Deepen HITL approval lifecycle into one race-safe seam

Architecture-review candidate #1 (+ #4). The requires_action lifecycle was
three shallow pass-throughs over updateJob with the legal transitions
smeared across JSDoc, the JobStatus union, and each store adapter — and the
resume transition was NOT race-safe: the Redis lua checked existence, not
status, so two concurrent approval submits both drove the run (re-executing
tools / double-billing).

- IJobStore.transitionStatus: atomic compare-and-set status transition that
  only fires if the job is currently `from`. InMemory: sync compare. Redis:
  single-node lua with a status guard (cluster best-effort, matching the
  existing posture); reconciles membership sets + TTLs to `to`.
- New ApprovalLifecycle module: pause / peek / resolve / expire — guarded,
  race-safe transitions behind one interface. resolve() returns true to
  exactly one concurrent caller; the previously-undefined
  requires_action → aborted expiry edge is now explicit; peek treats
  past-expiresAt as gone (lazy expiry).
- GenerationJobManager exposes `approvals` and delegates; the three shallow
  methods (mark/get/clearPendingAction) are removed — callers cross the deep
  interface.
- #4: typeContract.spec asserts the SDK <-> data-provider HITL types stay
  compatible (fails the build on drift); RedisJobStore validates the
  pendingAction shape on deserialize instead of a bare JSON.parse (defends
  the cold-resume path against malformed/stale records).
- Tests rewritten at the deep interface: double-resolve wins once,
  pause-on-terminal rejected, explicit expiry, lazy-expiry peek.

No Slice B wiring — this deepens the existing scaffolding so the future
resume route and run seam are born crossing one race-safe interface.
This commit is contained in:
Danny Avila 2026-06-16 11:27:18 -04:00
parent 0629d60bf4
commit 089ba09f98
7 changed files with 491 additions and 95 deletions

View file

@ -0,0 +1,44 @@
import type {
HumanInterruptPayload as SdkHumanInterruptPayload,
ToolApprovalRequest as SdkToolApprovalRequest,
ToolApprovalDecisionType as SdkToolApprovalDecisionType,
} from '@librechat/agents';
import type { Agents } from 'librechat-data-provider';
/**
* Compile-time contract between the SDK's HITL wire types and LibreChat's
* `Agents.*` mirror in `librechat-data-provider`. The mirror is hand-maintained
* (data-provider can't depend on `@librechat/agents`), so these assignability
* checks are the seam that fails the build when the two drift.
*
* The assertions live inside the function signatures: each `accept*` function's
* parameter type forces TypeScript to prove assignability at compile time. If
* the SDK adds a field the mirror lacks (or a decision literal changes), this
* file stops compiling caught here instead of silently dropped on the Redis
* round-trip. The runtime `expect`s exist only so Jest sees real tests.
*/
describe('HITL type contract: @librechat/agents ↔ librechat-data-provider', () => {
test('the SDK interrupt payload is persistable as the LC mirror', () => {
// Direction that matters most: `Run.getInterrupt()` returns the SDK payload,
// which `approvals.pause()` persists as `Agents.PendingAction.payload`.
// Losing a field here = silent data loss across the pause/resume boundary.
const acceptLcPayload = (p: Agents.HumanInterruptPayload): Agents.HumanInterruptType => p.type;
const fromSdk = (p: SdkHumanInterruptPayload) => acceptLcPayload(p);
expect(typeof fromSdk).toBe('function');
});
test('the SDK action request is persistable as the LC mirror', () => {
const acceptLcRequest = (r: Agents.ToolApprovalRequest): string => r.tool_call_id;
const fromSdk = (r: SdkToolApprovalRequest) => acceptLcRequest(r);
expect(typeof fromSdk).toBe('function');
});
test('decision-type literals match in both directions (resume input contract)', () => {
// What an approval route sends to `run.resume()` must be a valid SDK
// decision, and the LC mirror must enumerate exactly the SDK's literals.
const lcToSdk = (d: Agents.ToolApprovalDecisionType): SdkToolApprovalDecisionType => d;
const sdkToLc = (d: SdkToolApprovalDecisionType): Agents.ToolApprovalDecisionType => d;
expect(typeof lcToSdk).toBe('function');
expect(typeof sdkToLc).toBe('function');
});
});

View file

@ -0,0 +1,109 @@
import { logger } from '@librechat/data-schemas';
import type { Agents } from 'librechat-data-provider';
import type { IJobStore } from '~/stream/interfaces/IJobStore';
/**
* The guarded lifecycle of a run paused for human review (`requires_action`).
*
* Owns the legal transitions pause, resolve, expire behind one interface,
* on top of the store's atomic {@link IJobStore.transitionStatus}. Callers
* (approval routes, the status endpoint, the run seam) cross this seam instead
* of re-implementing the "is this transition legal from the current state, and
* is it safe under a concurrent second submit" logic at each site.
*
* Race-safety is the point. Two approval clicks racing to resume the same job
* must not both drive the run a double-drive re-executes tools and
* double-bills. {@link resolve} returns `true` to exactly one caller; the loser
* gets `false`. The same guard protects {@link pause} (don't pause a job that
* was aborted between the interrupt firing and the mark) and {@link expire}.
*
* State machine:
* ```
* running pause(pendingAction) requires_action
* requires_action resolve() running
* requires_action expire() aborted (the edge that was undefined)
* ```
*/
export class ApprovalLifecycle {
constructor(private readonly store: IJobStore) {}
/**
* `running → requires_action`, attaching the pending review record.
* Returns `false` when the job was not running (aborted mid-flight, gone),
* so a late interrupt is dropped rather than pausing a dead job.
*/
async pause(streamId: string, pendingAction: Agents.PendingAction): Promise<boolean> {
const ok = await this.store.transitionStatus(streamId, {
from: 'running',
to: 'requires_action',
patch: { pendingAction },
});
if (ok) {
logger.debug(
`[ApprovalLifecycle] paused for review: ${streamId} action=${pendingAction.actionId}`,
);
}
return ok;
}
/**
* The pending review record, or `null` when the job isn't awaiting review.
* A past-`expiresAt` record reads as `null` (lazy expiry) so a stale prompt
* is never surfaced to a UI or fed to a resume.
*/
async peek(streamId: string): Promise<Agents.PendingAction | null> {
const job = await this.store.getJob(streamId);
if (!job || job.status !== 'requires_action' || !job.pendingAction) {
return null;
}
return this.isExpired(job.pendingAction) ? null : job.pendingAction;
}
/**
* `requires_action → running`, atomically. Returns `true` to the single
* caller that won the transition; `false` if the job was not paused, was
* already resumed by a racing submit, or had expired in which case it is
* moved to a terminal state instead of resumed.
*
* The caller MUST treat `false` as "do not drive the run": only the `true`
* winner may re-enter the agent.
*/
async resolve(streamId: string): Promise<boolean> {
const job = await this.store.getJob(streamId);
if (
job?.status === 'requires_action' &&
job.pendingAction &&
this.isExpired(job.pendingAction)
) {
await this.expire(streamId);
return false;
}
return this.store.transitionStatus(streamId, {
from: 'requires_action',
to: 'running',
clear: ['pendingAction'],
});
}
/**
* `requires_action → aborted`: the edge that fires when no decision arrives
* in time. Previously undefined; now an explicit, idempotent terminal
* transition. Returns `true` to the single caller that expired it.
*/
async expire(streamId: string): Promise<boolean> {
const ok = await this.store.transitionStatus(streamId, {
from: 'requires_action',
to: 'aborted',
clear: ['pendingAction'],
patch: { error: 'Approval expired before a decision was made' },
});
if (ok) {
logger.debug(`[ApprovalLifecycle] expired pending review: ${streamId}`);
}
return ok;
}
private isExpired(pendingAction: Agents.PendingAction): boolean {
return pendingAction.expiresAt != null && pendingAction.expiresAt <= Date.now();
}
}

View file

@ -20,6 +20,7 @@ import {
import { InMemoryEventTransport } from './implementations/InMemoryEventTransport';
import { InMemoryJobStore } from './implementations/InMemoryJobStore';
import { filterPersistableAbortContent } from './abortContent';
import { ApprovalLifecycle } from './ApprovalLifecycle';
/** Error surfaced to any client still attached when a stale/hung job is reaped. */
const REAPED_JOB_ERROR = 'Generation timed out';
@ -165,6 +166,8 @@ interface RuntimeJobState {
class GenerationJobManagerClass {
/** Job metadata + content state storage - swappable for Redis, etc. */
private jobStore: IJobStore;
/** Guarded human-review lifecycle (pause / resolve / expire) over the store. */
private _approvals: ApprovalLifecycle;
/** Event pub/sub transport - swappable for Redis Pub/Sub, etc. */
private eventTransport: IEventTransport;
@ -191,6 +194,7 @@ class GenerationJobManagerClass {
constructor(options?: GenerationJobManagerOptions) {
this.jobStore =
options?.jobStore ?? new InMemoryJobStore({ ttlAfterComplete: 0, maxJobs: 1000 });
this._approvals = new ApprovalLifecycle(this.jobStore);
this.eventTransport = options?.eventTransport ?? new InMemoryEventTransport();
this._cleanupOnComplete = options?.cleanupOnComplete ?? true;
}
@ -249,6 +253,7 @@ class GenerationJobManagerClass {
setGenerationJobsInFlight(previousStore, 0);
this.jobStore = services.jobStore;
this._approvals = new ApprovalLifecycle(this.jobStore);
this.eventTransport = services.eventTransport;
this._isRedis = services.isRedis ?? false;
this._cleanupOnComplete = services.cleanupOnComplete ?? true;
@ -1386,51 +1391,18 @@ class GenerationJobManagerClass {
}
/**
* Transition a job to `requires_action` and persist the pending review record.
* The guarded human-review lifecycle for paused runs:
* `approvals.pause()` / `peek()` / `resolve()` / `expire()`.
*
* The job is NOT cleaned up: chunks, run steps, and user-active-set membership
* remain so the resume path can rebuild context. The Redis job-hash TTL is
* refreshed by the store to give the user the full TTL window to respond.
*
* @param streamId - The stream identifier
* @param pendingAction - The pending review record (tool approval, etc.)
* This is the seam approval routes, the status endpoint, and the run wiring
* cross it owns the legal `requires_action` transitions and is race-safe
* against concurrent resumes (a double-resolve would otherwise drive the run
* twice). The job's chunks, run steps, and user-active-set membership are
* preserved across a pause so the resume path can rebuild context; the store
* refreshes the job-hash TTL to give the user the full window to respond.
*/
async markRequiresAction(streamId: string, pendingAction: Agents.PendingAction): Promise<void> {
await this.jobStore.updateJob(streamId, {
status: 'requires_action',
pendingAction,
});
logger.debug(
`[GenerationJobManager] Job awaiting human review: ${streamId} action=${pendingAction.actionId}`,
);
}
/**
* Read the pending review record for a job.
*
* Returns null when the job doesn't exist, isn't in `requires_action`,
* or has no recorded pending action. Callers (status endpoint, approval routes)
* should treat null as "nothing to approve."
*/
async getPendingAction(streamId: string): Promise<Agents.PendingAction | null> {
const jobData = await this.jobStore.getJob(streamId);
if (!jobData || jobData.status !== 'requires_action') {
return null;
}
return jobData.pendingAction ?? null;
}
/**
* Clear the pending review record and return the job to `running`.
* Called by the resume path after a user approval/rejection has been accepted
* and the run is about to be re-driven.
*/
async clearPendingAction(streamId: string): Promise<void> {
await this.jobStore.updateJob(streamId, {
status: 'running',
pendingAction: undefined,
});
logger.debug(`[GenerationJobManager] Cleared pending action: ${streamId}`);
get approvals(): ApprovalLifecycle {
return this._approvals;
}
/**

View file

@ -6,7 +6,7 @@ import { GenerationJobManagerClass } from '~/stream/GenerationJobManager';
jest.spyOn(console, 'log').mockImplementation();
describe('GenerationJobManager pending-action lifecycle (in-memory)', () => {
describe('ApprovalLifecycle via GenerationJobManager.approvals (in-memory)', () => {
let manager: GenerationJobManagerClass;
beforeEach(() => {
@ -37,76 +37,156 @@ describe('GenerationJobManager pending-action lifecycle (in-memory)', () => {
return { ...action, ...overrides };
}
test('markRequiresAction persists the pending action and transitions status', async () => {
const streamId = 'stream-mark';
await manager.createJob(streamId, 'user-1');
describe('pause', () => {
test('running → requires_action, persisting the pending record', async () => {
const streamId = 'stream-pause';
await manager.createJob(streamId, 'user-1');
const action = buildAction(streamId);
await manager.markRequiresAction(streamId, action);
const action = buildAction(streamId);
expect(await manager.approvals.pause(streamId, action)).toBe(true);
const status = await manager.getJobStatus(streamId);
expect(status).toBe('requires_action');
expect(await manager.getJobStatus(streamId)).toBe('requires_action');
const pending = await manager.approvals.peek(streamId);
expect(pending?.actionId).toBe(action.actionId);
expect(pending?.payload.type).toBe('tool_approval');
if (pending?.payload.type === 'tool_approval') {
expect(pending.payload.action_requests[0].name).toBe('shell');
}
});
const pending = await manager.getPendingAction(streamId);
expect(pending).not.toBeNull();
expect(pending?.actionId).toBe(action.actionId);
expect(pending?.payload.type).toBe('tool_approval');
if (pending?.payload.type === 'tool_approval') {
expect(pending.payload.action_requests[0].name).toBe('shell');
}
test('returns false when the job is already terminal', async () => {
const streamId = 'stream-pause-dead';
await manager.createJob(streamId, 'user-1');
await manager.completeJob(streamId, 'terminated mid-flight');
expect(await manager.approvals.pause(streamId, buildAction(streamId))).toBe(false);
// a late interrupt must NOT resurrect a terminal job into requires_action
expect(await manager.getJobStatus(streamId)).not.toBe('requires_action');
});
test('returns false when the job does not exist', async () => {
expect(await manager.approvals.pause('nonexistent', buildAction('nonexistent'))).toBe(false);
});
});
test('getPendingAction returns null for jobs not in requires_action', async () => {
const streamId = 'stream-running';
await manager.createJob(streamId, 'user-1');
expect(await manager.getPendingAction(streamId)).toBeNull();
describe('peek', () => {
test('returns null for jobs not in requires_action', async () => {
const streamId = 'stream-running';
await manager.createJob(streamId, 'user-1');
expect(await manager.approvals.peek(streamId)).toBeNull();
});
test('returns null when the job does not exist', async () => {
expect(await manager.approvals.peek('nonexistent')).toBeNull();
});
test('treats a past-expiresAt record as gone (lazy expiry)', async () => {
const streamId = 'stream-expired-peek';
await manager.createJob(streamId, 'user-1');
await manager.approvals.pause(
streamId,
buildAction(streamId, { expiresAt: Date.now() - 1000 }),
);
expect(await manager.approvals.peek(streamId)).toBeNull();
});
});
test('getPendingAction returns null when the job does not exist', async () => {
expect(await manager.getPendingAction('nonexistent')).toBeNull();
describe('resolve', () => {
test('requires_action → running, clearing the record, returns true once', async () => {
const streamId = 'stream-resolve';
await manager.createJob(streamId, 'user-1');
await manager.approvals.pause(streamId, buildAction(streamId));
expect(await manager.approvals.resolve(streamId)).toBe(true);
expect(await manager.getJobStatus(streamId)).toBe('running');
expect(await manager.approvals.peek(streamId)).toBeNull();
});
test('a concurrent double-resolve wins exactly once (race-safe)', async () => {
const streamId = 'stream-double-resolve';
await manager.createJob(streamId, 'user-1');
await manager.approvals.pause(streamId, buildAction(streamId));
const results = await Promise.all([
manager.approvals.resolve(streamId),
manager.approvals.resolve(streamId),
]);
// Exactly one caller may drive the run — the other must be rejected.
expect(results.filter(Boolean)).toHaveLength(1);
expect(await manager.getJobStatus(streamId)).toBe('running');
});
test('returns false when the job is not paused', async () => {
const streamId = 'stream-resolve-running';
await manager.createJob(streamId, 'user-1');
expect(await manager.approvals.resolve(streamId)).toBe(false);
});
test('an expired pending action expires instead of resuming', async () => {
const streamId = 'stream-resolve-expired';
await manager.createJob(streamId, 'user-1');
await manager.approvals.pause(
streamId,
buildAction(streamId, { expiresAt: Date.now() - 1000 }),
);
expect(await manager.approvals.resolve(streamId)).toBe(false);
expect(await manager.getJobStatus(streamId)).toBe('aborted');
});
});
test('clearPendingAction returns the job to running and removes the pending record', async () => {
const streamId = 'stream-clear';
await manager.createJob(streamId, 'user-1');
describe('expire', () => {
test('requires_action → aborted, clearing the record, returns true once', async () => {
const streamId = 'stream-expire';
await manager.createJob(streamId, 'user-1');
await manager.approvals.pause(streamId, buildAction(streamId));
await manager.markRequiresAction(streamId, buildAction(streamId));
expect(await manager.getJobStatus(streamId)).toBe('requires_action');
expect(await manager.approvals.expire(streamId)).toBe(true);
expect(await manager.getJobStatus(streamId)).toBe('aborted');
expect(await manager.approvals.peek(streamId)).toBeNull();
await manager.clearPendingAction(streamId);
// idempotent — a second expire does not fire again
expect(await manager.approvals.expire(streamId)).toBe(false);
});
expect(await manager.getJobStatus(streamId)).toBe('running');
expect(await manager.getPendingAction(streamId)).toBeNull();
test('returns false when the job is not paused', async () => {
const streamId = 'stream-expire-running';
await manager.createJob(streamId, 'user-1');
expect(await manager.approvals.expire(streamId)).toBe(false);
});
});
test('requires_action drops the running count but keeps the user-active set', async () => {
const streamId = 'stream-counts';
await manager.createJob(streamId, 'user-counts');
describe('facade integration', () => {
test('requires_action drops the running count but keeps the user-active set', async () => {
const streamId = 'stream-counts';
await manager.createJob(streamId, 'user-counts');
const beforeCounts = await manager.getJobCountByStatus();
expect(beforeCounts.running).toBe(1);
expect(beforeCounts.requires_action).toBe(0);
const before = await manager.getJobCountByStatus();
expect(before.running).toBe(1);
expect(before.requires_action).toBe(0);
await manager.markRequiresAction(streamId, buildAction(streamId));
await manager.approvals.pause(streamId, buildAction(streamId));
const afterCounts = await manager.getJobCountByStatus();
expect(afterCounts.running).toBe(0);
expect(afterCounts.requires_action).toBe(1);
const after = await manager.getJobCountByStatus();
expect(after.running).toBe(0);
expect(after.requires_action).toBe(1);
// Pending-approval jobs still occupy the user's conversation slot.
const active = await manager.getActiveJobIdsForUser('user-counts');
expect(active).toContain(streamId);
});
// Pending-approval jobs still occupy the user's conversation slot.
expect(await manager.getActiveJobIdsForUser('user-counts')).toContain(streamId);
});
test('getActiveJobIdsForUser excludes terminal jobs but includes requires_action', async () => {
await manager.createJob('s-running', 'user-mix');
await manager.createJob('s-paused', 'user-mix');
await manager.createJob('s-done', 'user-mix');
test('getActiveJobIdsForUser excludes terminal jobs but includes requires_action', async () => {
await manager.createJob('s-running', 'user-mix');
await manager.createJob('s-paused', 'user-mix');
await manager.createJob('s-done', 'user-mix');
await manager.markRequiresAction('s-paused', buildAction('s-paused'));
await manager.completeJob('s-done');
await manager.approvals.pause('s-paused', buildAction('s-paused'));
await manager.completeJob('s-done');
const active = await manager.getActiveJobIdsForUser('user-mix');
expect(active.sort()).toEqual(['s-paused', 's-running']);
const active = await manager.getActiveJobIdsForUser('user-mix');
expect(active.sort()).toEqual(['s-paused', 's-running']);
});
});
});

View file

@ -6,6 +6,7 @@ import type {
UsageMetadata,
IJobStore,
JobStatus,
JobStatusTransition,
} from '~/stream/interfaces/IJobStore';
/**
@ -137,6 +138,27 @@ export class InMemoryJobStore implements IJobStore {
Object.assign(job, updates);
}
/**
* Atomic in-memory: the single-threaded event loop makes the
* read-check-write sequence indivisible, so the status guard is exact.
* Membership/counts derive from `job.status` directly, so there are no
* sets to reconcile here.
*/
async transitionStatus(streamId: string, args: JobStatusTransition): Promise<boolean> {
const job = this.jobs.get(streamId);
if (!job || job.status !== args.from) {
return false;
}
job.status = args.to;
if (args.patch) {
Object.assign(job, args.patch);
}
for (const field of args.clear ?? []) {
delete job[field];
}
return true;
}
async deleteJob(streamId: string): Promise<void> {
this.jobs.delete(streamId);
this.contentState.delete(streamId);

View file

@ -8,8 +8,40 @@ import type {
UsageMetadata,
IJobStore,
JobStatus,
JobStatusTransition,
} from '~/stream/interfaces/IJobStore';
/**
* Atomic compare-and-set status transition (single-node / sentinel Redis).
*
* Guards on the current `status` field, then in one indivisible step
* removes `clear` fields, writes the `status`+patch pairs, reconciles the
* membership sets, and refreshes TTLs. Returns 1 if it fired, 0 if the job
* was missing or no longer in the expected `from` status.
*
* KEYS: [job, remSet | "", addSet | "", chunks, runSteps]
* ARGV: [from, member, ttl, refreshLive(0|1), hdelCount, ...hdel, ...hsetPairs]
*/
const TRANSITION_STATUS_LUA =
'if redis.call("HGET", KEYS[1], "status") ~= ARGV[1] then return 0 end ' +
'local member = ARGV[2] ' +
'local ttl = tonumber(ARGV[3]) ' +
'local refreshLive = ARGV[4] ' +
'local hdelCount = tonumber(ARGV[5]) ' +
'local idx = 6 ' +
'for i = 1, hdelCount do redis.call("HDEL", KEYS[1], ARGV[idx]) idx = idx + 1 end ' +
'local hset = {} ' +
'for i = idx, #ARGV do hset[#hset + 1] = ARGV[i] end ' +
'if #hset > 0 then redis.call("HSET", KEYS[1], unpack(hset)) end ' +
'if KEYS[2] ~= "" then redis.call("SREM", KEYS[2], member) end ' +
'if KEYS[3] ~= "" then redis.call("SADD", KEYS[3], member) end ' +
'redis.call("EXPIRE", KEYS[1], ttl) ' +
'if refreshLive == "1" then redis.call("EXPIRE", KEYS[4], ttl) redis.call("EXPIRE", KEYS[5], ttl) end ' +
'return 1';
/** Decision kinds the SDK can emit, used to sanity-check persisted records. */
const KNOWN_INTERRUPT_TYPES = new Set(['tool_approval', 'ask_user_question']);
/**
* Key prefixes for Redis storage.
* All keys include the streamId for easy cleanup.
@ -276,6 +308,80 @@ export class RedisJobStore implements IJobStore {
}
}
/** The membership set a status belongs to; terminal statuses have none. */
private statusSetKey(status: JobStatus): string | null {
if (status === 'running') {
return KEYS.runningJobs;
}
if (status === 'requires_action') {
return KEYS.requiresActionJobs;
}
return null;
}
async transitionStatus(streamId: string, args: JobStatusTransition): Promise<boolean> {
const { from, to, patch, clear } = args;
const key = KEYS.job(streamId);
// status + patch become HSET pairs; serializeJob skips undefined, so
// cleared fields go through HDEL (`clear`) instead.
const fields = Object.entries(
this.serializeJob({ status: to, ...(patch ?? {}) } as SerializableJobData),
).flat();
const clearFields = (clear ?? []).map(String);
const remSet = this.statusSetKey(from);
const addSet = this.statusSetKey(to);
const terminal = addSet === null;
const ttl = terminal ? this.ttl.completed : this.ttl.running;
if (this.isCluster) {
// Membership sets live on a different slot from the job hash, so a single
// cross-slot script isn't possible. Guard best-effort (read status, then
// apply) — matches the existing cluster posture for status writes.
const current = await this.redis.hget(key, 'status');
if (current !== from) {
return false;
}
if (clearFields.length > 0) {
await this.redis.hdel(key, ...clearFields);
}
if (fields.length > 0) {
await this.updateExistingJobHash(key, fields);
}
if (remSet) {
await this.redis.srem(remSet, streamId);
}
if (addSet) {
await this.redis.sadd(addSet, streamId);
}
await this.redis.expire(key, ttl);
if (!terminal) {
await this.redis.expire(KEYS.chunks(streamId), ttl);
await this.redis.expire(KEYS.runSteps(streamId), ttl);
}
return true;
}
const result = await this.redis.eval(
TRANSITION_STATUS_LUA,
5,
key,
remSet ?? '',
addSet ?? '',
KEYS.chunks(streamId),
KEYS.runSteps(streamId),
from,
streamId,
String(ttl),
terminal ? '0' : '1',
String(clearFields.length),
...clearFields,
...fields,
);
return result === 1;
}
private async updateExistingJobHash(key: string, fields: string[]): Promise<boolean> {
const updated = await this.redis.eval(
'if redis.call("EXISTS", KEYS[1]) == 1 then redis.call("HSET", KEYS[1], unpack(ARGV)) return 1 else return 0 end',
@ -1104,7 +1210,34 @@ export class RedisJobStore implements IJobStore {
replayEvents: data.replayEvents || undefined,
contextUsage: data.contextUsage || undefined,
tokenUsage: data.tokenUsage || undefined,
pendingAction: data.pendingAction ? JSON.parse(data.pendingAction) : undefined,
pendingAction: this.parsePendingAction(data.pendingAction),
};
}
/**
* Parse a persisted `pendingAction`, defending the cold-resume path against
* malformed or stale records: a corrupt JSON blob or a payload whose shape
* predates the current SDK contract is dropped (logged) rather than crashing
* the resume or feeding a bad record to an approval route. Returns undefined
* when absent/invalid.
*/
private parsePendingAction(raw: string | undefined): Agents.PendingAction | undefined {
if (!raw) {
return undefined;
}
try {
const parsed = JSON.parse(raw) as Agents.PendingAction;
const typeOk =
typeof parsed?.actionId === 'string' &&
KNOWN_INTERRUPT_TYPES.has(parsed?.payload?.type as string);
if (!typeOk) {
logger.warn('[RedisJobStore] Dropping malformed pendingAction record');
return undefined;
}
return parsed;
} catch {
logger.warn('[RedisJobStore] Dropping unparseable pendingAction record');
return undefined;
}
}
}

View file

@ -71,6 +71,20 @@ export interface SerializableJobData {
pendingAction?: Agents.PendingAction;
}
/**
* Arguments for an atomic {@link IJobStore.transitionStatus} compare-and-set.
*/
export interface JobStatusTransition {
/** Only fire the transition if the job is currently in this status. */
from: JobStatus;
/** Status to move to when the `from` guard holds. */
to: JobStatus;
/** Fields written in the same atomic step as the status change. */
patch?: Partial<SerializableJobData>;
/** Field names removed in the same atomic step (e.g. `pendingAction`). */
clear?: Array<keyof SerializableJobData & string>;
}
/**
* Usage metadata for token spending across different LLM providers.
*
@ -211,6 +225,28 @@ export interface IJobStore {
/** Update job data */
updateJob(streamId: string, updates: Partial<SerializableJobData>): Promise<void>;
/**
* Atomically transition a job's status, **only if** it is currently `from`.
* Returns `true` when the transition fired, `false` when the job was missing
* or no longer in `from` (lost a race / illegal transition).
*
* `patch` fields are written and `clear` fields removed in the same atomic
* step, and the running / requires_action membership sets plus live-key TTLs
* are reconciled to match `to`. This is the race-safe primitive behind the
* approval lifecycle it prevents two concurrent resumes from both driving a
* paused run (a double-drive would re-execute tools / double-bill).
*
* Distinct from {@link updateJob}, which writes status unconditionally for
* callers that don't know the prior state. Reach for `transitionStatus`
* whenever the legal prior state is known.
*
* Atomicity: fully atomic on in-memory and single-node / sentinel Redis
* (Lua). On Redis Cluster the status guard is best-effort the membership
* sets live on a different hash slot from the job hash matching the store's
* existing cluster posture for status writes.
*/
transitionStatus(streamId: string, args: JobStatusTransition): Promise<boolean>;
/** Delete a job */
deleteJob(streamId: string): Promise<void>;