mirror of
https://github.com/danny-avila/LibreChat.git
synced 2026-06-28 02:11:30 +00:00
🏛️ refactor: Deepen HITL approval lifecycle into one race-safe seam
Architecture-review candidate #1 (+ #4). The requires_action lifecycle was three shallow pass-throughs over updateJob with the legal transitions smeared across JSDoc, the JobStatus union, and each store adapter, and the resume transition was NOT race-safe: the Redis Lua script checked existence, not status, so two concurrent approval submits both drove the run (re-executing tools / double-billing).

- IJobStore.transitionStatus: atomic compare-and-set status transition that only fires if the job is currently `from`. InMemory: sync compare. Redis: single-node Lua with a status guard (cluster best-effort, matching the existing posture); reconciles membership sets + TTLs to `to`.
- New ApprovalLifecycle module: pause / peek / resolve / expire, guarded, race-safe transitions behind one interface. resolve() returns true to exactly one concurrent caller; the previously-undefined requires_action → aborted expiry edge is now explicit; peek treats past-expiresAt as gone (lazy expiry).
- GenerationJobManager exposes `approvals` and delegates; the three shallow methods (mark/get/clearPendingAction) are removed, so callers cross the deep interface.
- #4: typeContract.spec asserts the SDK <-> data-provider HITL types stay compatible (fails the build on drift); RedisJobStore validates the pendingAction shape on deserialize instead of a bare JSON.parse (defends the cold-resume path against malformed/stale records).
- Tests rewritten at the deep interface: double-resolve wins once, pause-on-terminal rejected, explicit expiry, lazy-expiry peek.

No Slice B wiring: this deepens the existing scaffolding so the future resume route and run seam are born crossing one race-safe interface.
parent
0629d60bf4
commit
089ba09f98
7 changed files with 491 additions and 95 deletions
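The race described in the commit message can be illustrated with a small in-memory sketch. It assumes nothing about the real stores: `DemoJob`, `resumeIfExists`, and `resumeIfPaused` are hypothetical names. The point is only that an existence guard lets two resume attempts both succeed, while a compare-and-set on the status lets exactly one through.

```typescript
// Hypothetical stand-ins for illustration; these are not the LibreChat types.
type DemoStatus = 'running' | 'requires_action' | 'aborted';
interface DemoJob {
  status: DemoStatus;
  pendingAction?: { actionId: string };
}

const demoJobs = new Map<string, DemoJob>();

/** Existence guard (the old posture): fires whenever the job exists. */
function resumeIfExists(id: string): boolean {
  const job = demoJobs.get(id);
  if (!job) {
    return false;
  }
  job.status = 'running';
  delete job.pendingAction;
  return true;
}

/** Status guard (compare-and-set): fires only if the job is still paused. */
function resumeIfPaused(id: string): boolean {
  const job = demoJobs.get(id);
  if (!job || job.status !== 'requires_action') {
    return false;
  }
  job.status = 'running';
  delete job.pendingAction;
  return true;
}

function pausedJob(): DemoJob {
  return { status: 'requires_action', pendingAction: { actionId: 'a1' } };
}

// Two approval submits against the existence guard: both report success.
demoJobs.set('s1', pausedJob());
const unguarded = [resumeIfExists('s1'), resumeIfExists('s1')];

// The same two submits against the status guard: only the first one wins.
demoJobs.set('s2', pausedJob());
const guarded = [resumeIfPaused('s2'), resumeIfPaused('s2')];

console.log(unguarded, guarded);
```

In the sketch the second guarded call sees `running` and backs off, which is the behaviour a caller relies on when deciding whether to drive the run.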
44
packages/api/src/agents/hitl/typeContract.spec.ts
Normal file
|
|
@ -0,0 +1,44 @@
|
|||
import type {
|
||||
HumanInterruptPayload as SdkHumanInterruptPayload,
|
||||
ToolApprovalRequest as SdkToolApprovalRequest,
|
||||
ToolApprovalDecisionType as SdkToolApprovalDecisionType,
|
||||
} from '@librechat/agents';
|
||||
import type { Agents } from 'librechat-data-provider';
|
||||
|
||||
/**
|
||||
* Compile-time contract between the SDK's HITL wire types and LibreChat's
|
||||
* `Agents.*` mirror in `librechat-data-provider`. The mirror is hand-maintained
|
||||
* (data-provider can't depend on `@librechat/agents`), so these assignability
|
||||
* checks are the seam that fails the build when the two drift.
|
||||
*
|
||||
* The assertions live inside the function signatures: each `accept*` function's
|
||||
* parameter type forces TypeScript to prove assignability at compile time. If
|
||||
* the SDK adds a field the mirror lacks (or a decision literal changes), this
|
||||
* file stops compiling — caught here instead of silently dropped on the Redis
|
||||
* round-trip. The runtime `expect`s exist only so Jest sees real tests.
|
||||
*/
|
||||
describe('HITL type contract: @librechat/agents ↔ librechat-data-provider', () => {
|
||||
test('the SDK interrupt payload is persistable as the LC mirror', () => {
|
||||
// Direction that matters most: `Run.getInterrupt()` returns the SDK payload,
|
||||
// which `approvals.pause()` persists as `Agents.PendingAction.payload`.
|
||||
// Losing a field here = silent data loss across the pause/resume boundary.
|
||||
const acceptLcPayload = (p: Agents.HumanInterruptPayload): Agents.HumanInterruptType => p.type;
|
||||
const fromSdk = (p: SdkHumanInterruptPayload) => acceptLcPayload(p);
|
||||
expect(typeof fromSdk).toBe('function');
|
||||
});
|
||||
|
||||
test('the SDK action request is persistable as the LC mirror', () => {
|
||||
const acceptLcRequest = (r: Agents.ToolApprovalRequest): string => r.tool_call_id;
|
||||
const fromSdk = (r: SdkToolApprovalRequest) => acceptLcRequest(r);
|
||||
expect(typeof fromSdk).toBe('function');
|
||||
});
|
||||
|
||||
test('decision-type literals match in both directions (resume input contract)', () => {
|
||||
// What an approval route sends to `run.resume()` must be a valid SDK
|
||||
// decision, and the LC mirror must enumerate exactly the SDK's literals.
|
||||
const lcToSdk = (d: Agents.ToolApprovalDecisionType): SdkToolApprovalDecisionType => d;
|
||||
const sdkToLc = (d: SdkToolApprovalDecisionType): Agents.ToolApprovalDecisionType => d;
|
||||
expect(typeof lcToSdk).toBe('function');
|
||||
expect(typeof sdkToLc).toBe('function');
|
||||
});
|
||||
});
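One caveat may be worth checking against the real types. In TypeScript, passing an SDK-typed value to a function that accepts the mirror type generally still compiles when the SDK type has extra fields, because excess-property checks apply only to fresh object literals. The assignability checks above should catch changed field types, new required mirror fields, and new union members or literals. If added fields also need to be caught, a key-level check is one option. The sketch below uses hypothetical stand-in types (`SdkRequest`, `MirrorRequest`) and hypothetical helpers (`MissingKeys`, `AssertNever`); none of them come from the SDK or from `librechat-data-provider`.

```typescript
// Hypothetical stand-ins: `SdkRequest` plays the SDK type, `MirrorRequest` the
// hand-maintained mirror. Neither is the real definition.
interface SdkRequest {
  tool_call_id: string;
  name: string;
  reason?: string; // imagine the SDK added this field later
}
interface MirrorRequest {
  tool_call_id: string;
  name: string;
}

// One-directional assignability, in the style of the spec above. This still
// compiles: structural typing accepts a value that has extra properties.
const acceptMirror = (r: MirrorRequest): string => r.tool_call_id;
const fromSdk = (r: SdkRequest): string => acceptMirror(r);

// Key-level drift check: keys present on `Source` but absent from `Mirror`.
type MissingKeys<Source, Mirror> = Exclude<keyof Source, keyof Mirror>;

// `MissingKeys<SdkRequest, MirrorRequest>` is the literal type 'reason', so
// this assignment type-checks only for that exact string.
const drifted: MissingKeys<SdkRequest, MirrorRequest> = 'reason';

// A `never` constraint rejects any non-empty key set at compile time.
type AssertNever<T extends never> = T;

// Mirror -> SDK has no missing keys, so this line compiles.
const inSync: Array<AssertNever<MissingKeys<MirrorRequest, SdkRequest>>> = [];

// Uncommenting the next line should fail to compile, because 'reason' is not `never`:
// type Drift = AssertNever<MissingKeys<SdkRequest, MirrorRequest>>;

const id = fromSdk({ tool_call_id: 't1', name: 'shell', reason: 'audit' });
console.log(id, drifted, inSync.length);
```

The key-level check only compares top-level property names; nested shapes would need the same treatment applied per level.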
|
||||
109
packages/api/src/stream/ApprovalLifecycle.ts
Normal file
|
|
@ -0,0 +1,109 @@
|
|||
import { logger } from '@librechat/data-schemas';
|
||||
import type { Agents } from 'librechat-data-provider';
|
||||
import type { IJobStore } from '~/stream/interfaces/IJobStore';
|
||||
|
||||
/**
|
||||
* The guarded lifecycle of a run paused for human review (`requires_action`).
|
||||
*
|
||||
* Owns the legal transitions — pause, resolve, expire — behind one interface,
|
||||
* on top of the store's atomic {@link IJobStore.transitionStatus}. Callers
|
||||
* (approval routes, the status endpoint, the run seam) cross this seam instead
|
||||
* of re-implementing the "is this transition legal from the current state, and
|
||||
* is it safe under a concurrent second submit" logic at each site.
|
||||
*
|
||||
* Race-safety is the point. Two approval clicks racing to resume the same job
|
||||
* must not both drive the run — a double-drive re-executes tools and
|
||||
* double-bills. {@link resolve} returns `true` to exactly one caller; the loser
|
||||
* gets `false`. The same guard protects {@link pause} (don't pause a job that
|
||||
* was aborted between the interrupt firing and the mark) and {@link expire}.
|
||||
*
|
||||
* State machine:
|
||||
* ```
|
||||
* running ──pause(pendingAction)──▶ requires_action
|
||||
* requires_action ──resolve()──────▶ running
|
||||
* requires_action ──expire()───────▶ aborted (the edge that was undefined)
|
||||
* ```
|
||||
*/
|
||||
export class ApprovalLifecycle {
|
||||
constructor(private readonly store: IJobStore) {}
|
||||
|
||||
/**
|
||||
* `running → requires_action`, attaching the pending review record.
|
||||
* Returns `false` when the job was not running (aborted mid-flight, gone),
|
||||
* so a late interrupt is dropped rather than pausing a dead job.
|
||||
*/
|
||||
async pause(streamId: string, pendingAction: Agents.PendingAction): Promise<boolean> {
|
||||
const ok = await this.store.transitionStatus(streamId, {
|
||||
from: 'running',
|
||||
to: 'requires_action',
|
||||
patch: { pendingAction },
|
||||
});
|
||||
if (ok) {
|
||||
logger.debug(
|
||||
`[ApprovalLifecycle] paused for review: ${streamId} action=${pendingAction.actionId}`,
|
||||
);
|
||||
}
|
||||
return ok;
|
||||
}
|
||||
|
||||
/**
|
||||
* The pending review record, or `null` when the job isn't awaiting review.
|
||||
* A past-`expiresAt` record reads as `null` (lazy expiry) so a stale prompt
|
||||
* is never surfaced to a UI or fed to a resume.
|
||||
*/
|
||||
async peek(streamId: string): Promise<Agents.PendingAction | null> {
|
||||
const job = await this.store.getJob(streamId);
|
||||
if (!job || job.status !== 'requires_action' || !job.pendingAction) {
|
||||
return null;
|
||||
}
|
||||
return this.isExpired(job.pendingAction) ? null : job.pendingAction;
|
||||
}
|
||||
|
||||
/**
|
||||
* `requires_action → running`, atomically. Returns `true` to the single
|
||||
* caller that won the transition; `false` if the job was not paused, was
|
||||
* already resumed by a racing submit, or had expired — in which case it is
|
||||
* moved to a terminal state instead of resumed.
|
||||
*
|
||||
* The caller MUST treat `false` as "do not drive the run": only the `true`
|
||||
* winner may re-enter the agent.
|
||||
*/
|
||||
async resolve(streamId: string): Promise<boolean> {
|
||||
const job = await this.store.getJob(streamId);
|
||||
if (
|
||||
job?.status === 'requires_action' &&
|
||||
job.pendingAction &&
|
||||
this.isExpired(job.pendingAction)
|
||||
) {
|
||||
await this.expire(streamId);
|
||||
return false;
|
||||
}
|
||||
return this.store.transitionStatus(streamId, {
|
||||
from: 'requires_action',
|
||||
to: 'running',
|
||||
clear: ['pendingAction'],
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* `requires_action → aborted`: the edge that fires when no decision arrives
|
||||
* in time. Previously undefined; now an explicit, idempotent terminal
|
||||
* transition. Returns `true` to the single caller that expired it.
|
||||
*/
|
||||
async expire(streamId: string): Promise<boolean> {
|
||||
const ok = await this.store.transitionStatus(streamId, {
|
||||
from: 'requires_action',
|
||||
to: 'aborted',
|
||||
clear: ['pendingAction'],
|
||||
patch: { error: 'Approval expired before a decision was made' },
|
||||
});
|
||||
if (ok) {
|
||||
logger.debug(`[ApprovalLifecycle] expired pending review: ${streamId}`);
|
||||
}
|
||||
return ok;
|
||||
}
|
||||
|
||||
private isExpired(pendingAction: Agents.PendingAction): boolean {
|
||||
return pendingAction.expiresAt != null && pendingAction.expiresAt <= Date.now();
|
||||
}
|
||||
}
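The lazy-expiry behaviour of `peek` and `resolve` can be traced with a synchronous stand-in. This is a sketch under stated assumptions, not the class above: the real lifecycle is async and delegates to `IJobStore.transitionStatus`, while `createLifecycleSketch`, the `Sketch*` types, and the injected `now` clock are hypothetical and exist only to make the example deterministic.

```typescript
// Synchronous, hypothetical stand-in for the lifecycle above.
type SketchStatus = 'running' | 'requires_action' | 'aborted';
interface SketchPending {
  actionId: string;
  expiresAt?: number; // epoch milliseconds
}
interface SketchJob {
  status: SketchStatus;
  pendingAction?: SketchPending;
  error?: string;
}

function createLifecycleSketch(jobs: Map<string, SketchJob>, now: () => number) {
  const isExpired = (p: SketchPending): boolean => p.expiresAt != null && p.expiresAt <= now();

  // Compare-and-set: only fires when the job is currently `from`.
  const transition = (id: string, from: SketchStatus, to: SketchStatus, error?: string): boolean => {
    const job = jobs.get(id);
    if (!job || job.status !== from) {
      return false;
    }
    job.status = to;
    delete job.pendingAction;
    if (error) {
      job.error = error;
    }
    return true;
  };

  const expire = (id: string): boolean =>
    transition(id, 'requires_action', 'aborted', 'Approval expired before a decision was made');

  // Read-only: a stale record reads as null, but the job is not mutated here.
  const peek = (id: string): SketchPending | null => {
    const job = jobs.get(id);
    if (!job || job.status !== 'requires_action' || !job.pendingAction) {
      return null;
    }
    return isExpired(job.pendingAction) ? null : job.pendingAction;
  };

  // A stale record is expired instead of resumed, and the caller gets `false`.
  const resolve = (id: string): boolean => {
    const job = jobs.get(id);
    if (job?.status === 'requires_action' && job.pendingAction && isExpired(job.pendingAction)) {
      expire(id);
      return false;
    }
    return transition(id, 'requires_action', 'running');
  };

  return { peek, resolve, expire };
}

let clock = 1000; // injectable clock so the example is deterministic
const sketchJobs = new Map<string, SketchJob>();
sketchJobs.set('s1', {
  status: 'requires_action',
  pendingAction: { actionId: 'a1', expiresAt: 2000 },
});
const approvals = createLifecycleSketch(sketchJobs, () => clock);

const fresh = approvals.peek('s1'); // before the deadline: the record
clock = 3000;
const stale = approvals.peek('s1'); // after the deadline: null, status untouched
const statusAfterPeek = sketchJobs.get('s1')?.status;
const resumed = approvals.resolve('s1'); // false, and the job moves to aborted
const finalStatus = sketchJobs.get('s1')?.status;

console.log(fresh?.actionId, stale, statusAfterPeek, resumed, finalStatus);
```

Note the split in the sketch: `peek` never writes, so the terminal `aborted` state only appears once something calls `resolve` or `expire`.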
|
||||
|
|
@ -20,6 +20,7 @@ import {
|
|||
import { InMemoryEventTransport } from './implementations/InMemoryEventTransport';
|
||||
import { InMemoryJobStore } from './implementations/InMemoryJobStore';
|
||||
import { filterPersistableAbortContent } from './abortContent';
|
||||
import { ApprovalLifecycle } from './ApprovalLifecycle';
|
||||
|
||||
/** Error surfaced to any client still attached when a stale/hung job is reaped. */
|
||||
const REAPED_JOB_ERROR = 'Generation timed out';
|
||||
|
|
@ -165,6 +166,8 @@ interface RuntimeJobState {
|
|||
class GenerationJobManagerClass {
|
||||
/** Job metadata + content state storage - swappable for Redis, etc. */
|
||||
private jobStore: IJobStore;
|
||||
/** Guarded human-review lifecycle (pause / resolve / expire) over the store. */
|
||||
private _approvals: ApprovalLifecycle;
|
||||
/** Event pub/sub transport - swappable for Redis Pub/Sub, etc. */
|
||||
private eventTransport: IEventTransport;
|
||||
|
||||
|
|
@ -191,6 +194,7 @@ class GenerationJobManagerClass {
|
|||
constructor(options?: GenerationJobManagerOptions) {
|
||||
this.jobStore =
|
||||
options?.jobStore ?? new InMemoryJobStore({ ttlAfterComplete: 0, maxJobs: 1000 });
|
||||
this._approvals = new ApprovalLifecycle(this.jobStore);
|
||||
this.eventTransport = options?.eventTransport ?? new InMemoryEventTransport();
|
||||
this._cleanupOnComplete = options?.cleanupOnComplete ?? true;
|
||||
}
|
||||
|
|
@ -249,6 +253,7 @@ class GenerationJobManagerClass {
|
|||
setGenerationJobsInFlight(previousStore, 0);
|
||||
|
||||
this.jobStore = services.jobStore;
|
||||
this._approvals = new ApprovalLifecycle(this.jobStore);
|
||||
this.eventTransport = services.eventTransport;
|
||||
this._isRedis = services.isRedis ?? false;
|
||||
this._cleanupOnComplete = services.cleanupOnComplete ?? true;
|
||||
|
|
@ -1386,51 +1391,18 @@ class GenerationJobManagerClass {
|
|||
   }
 
   /**
-   * Transition a job to `requires_action` and persist the pending review record.
+   * The guarded human-review lifecycle for paused runs:
+   * `approvals.pause()` / `peek()` / `resolve()` / `expire()`.
    *
-   * The job is NOT cleaned up: chunks, run steps, and user-active-set membership
-   * remain so the resume path can rebuild context. The Redis job-hash TTL is
-   * refreshed by the store to give the user the full TTL window to respond.
-   *
-   * @param streamId - The stream identifier
-   * @param pendingAction - The pending review record (tool approval, etc.)
+   * This is the seam approval routes, the status endpoint, and the run wiring
+   * cross — it owns the legal `requires_action` transitions and is race-safe
+   * against concurrent resumes (a double-resolve would otherwise drive the run
+   * twice). The job's chunks, run steps, and user-active-set membership are
+   * preserved across a pause so the resume path can rebuild context; the store
+   * refreshes the job-hash TTL to give the user the full window to respond.
    */
-  async markRequiresAction(streamId: string, pendingAction: Agents.PendingAction): Promise<void> {
-    await this.jobStore.updateJob(streamId, {
-      status: 'requires_action',
-      pendingAction,
-    });
-    logger.debug(
-      `[GenerationJobManager] Job awaiting human review: ${streamId} action=${pendingAction.actionId}`,
-    );
-  }
-
-  /**
-   * Read the pending review record for a job.
-   *
-   * Returns null when the job doesn't exist, isn't in `requires_action`,
-   * or has no recorded pending action. Callers (status endpoint, approval routes)
-   * should treat null as "nothing to approve."
-   */
-  async getPendingAction(streamId: string): Promise<Agents.PendingAction | null> {
-    const jobData = await this.jobStore.getJob(streamId);
-    if (!jobData || jobData.status !== 'requires_action') {
-      return null;
-    }
-    return jobData.pendingAction ?? null;
-  }
-
-  /**
-   * Clear the pending review record and return the job to `running`.
-   * Called by the resume path after a user approval/rejection has been accepted
-   * and the run is about to be re-driven.
-   */
-  async clearPendingAction(streamId: string): Promise<void> {
-    await this.jobStore.updateJob(streamId, {
-      status: 'running',
-      pendingAction: undefined,
-    });
-    logger.debug(`[GenerationJobManager] Cleared pending action: ${streamId}`);
+  get approvals(): ApprovalLifecycle {
+    return this._approvals;
   }
 
   /**
|
||||
|
|
|
|||
|
|
@ -6,7 +6,7 @@ import { GenerationJobManagerClass } from '~/stream/GenerationJobManager';
|
|||
|
||||
jest.spyOn(console, 'log').mockImplementation();
|
||||
|
||||
-describe('GenerationJobManager pending-action lifecycle (in-memory)', () => {
+describe('ApprovalLifecycle via GenerationJobManager.approvals (in-memory)', () => {
|
||||
let manager: GenerationJobManagerClass;
|
||||
|
||||
beforeEach(() => {
|
||||
|
|
@ -37,76 +37,156 @@ describe('GenerationJobManager pending-action lifecycle (in-memory)', () => {
|
|||
     return { ...action, ...overrides };
   }
 
-  test('markRequiresAction persists the pending action and transitions status', async () => {
-    const streamId = 'stream-mark';
-    await manager.createJob(streamId, 'user-1');
+  describe('pause', () => {
+    test('running → requires_action, persisting the pending record', async () => {
+      const streamId = 'stream-pause';
+      await manager.createJob(streamId, 'user-1');
 
-    const action = buildAction(streamId);
-    await manager.markRequiresAction(streamId, action);
+      const action = buildAction(streamId);
+      expect(await manager.approvals.pause(streamId, action)).toBe(true);
 
-    const status = await manager.getJobStatus(streamId);
-    expect(status).toBe('requires_action');
+      expect(await manager.getJobStatus(streamId)).toBe('requires_action');
+      const pending = await manager.approvals.peek(streamId);
+      expect(pending?.actionId).toBe(action.actionId);
+      expect(pending?.payload.type).toBe('tool_approval');
+      if (pending?.payload.type === 'tool_approval') {
+        expect(pending.payload.action_requests[0].name).toBe('shell');
+      }
+    });
 
-    const pending = await manager.getPendingAction(streamId);
-    expect(pending).not.toBeNull();
-    expect(pending?.actionId).toBe(action.actionId);
-    expect(pending?.payload.type).toBe('tool_approval');
-    if (pending?.payload.type === 'tool_approval') {
-      expect(pending.payload.action_requests[0].name).toBe('shell');
-    }
+    test('returns false when the job is already terminal', async () => {
+      const streamId = 'stream-pause-dead';
+      await manager.createJob(streamId, 'user-1');
+      await manager.completeJob(streamId, 'terminated mid-flight');
+
+      expect(await manager.approvals.pause(streamId, buildAction(streamId))).toBe(false);
+      // a late interrupt must NOT resurrect a terminal job into requires_action
+      expect(await manager.getJobStatus(streamId)).not.toBe('requires_action');
+    });
+
+    test('returns false when the job does not exist', async () => {
+      expect(await manager.approvals.pause('nonexistent', buildAction('nonexistent'))).toBe(false);
+    });
   });
 
-  test('getPendingAction returns null for jobs not in requires_action', async () => {
-    const streamId = 'stream-running';
-    await manager.createJob(streamId, 'user-1');
-    expect(await manager.getPendingAction(streamId)).toBeNull();
+  describe('peek', () => {
+    test('returns null for jobs not in requires_action', async () => {
+      const streamId = 'stream-running';
+      await manager.createJob(streamId, 'user-1');
+      expect(await manager.approvals.peek(streamId)).toBeNull();
+    });
+
+    test('returns null when the job does not exist', async () => {
+      expect(await manager.approvals.peek('nonexistent')).toBeNull();
+    });
+
+    test('treats a past-expiresAt record as gone (lazy expiry)', async () => {
+      const streamId = 'stream-expired-peek';
+      await manager.createJob(streamId, 'user-1');
+      await manager.approvals.pause(
+        streamId,
+        buildAction(streamId, { expiresAt: Date.now() - 1000 }),
+      );
+
+      expect(await manager.approvals.peek(streamId)).toBeNull();
+    });
   });
 
-  test('getPendingAction returns null when the job does not exist', async () => {
-    expect(await manager.getPendingAction('nonexistent')).toBeNull();
+  describe('resolve', () => {
+    test('requires_action → running, clearing the record, returns true once', async () => {
+      const streamId = 'stream-resolve';
+      await manager.createJob(streamId, 'user-1');
+      await manager.approvals.pause(streamId, buildAction(streamId));
+
+      expect(await manager.approvals.resolve(streamId)).toBe(true);
+      expect(await manager.getJobStatus(streamId)).toBe('running');
+      expect(await manager.approvals.peek(streamId)).toBeNull();
+    });
+
+    test('a concurrent double-resolve wins exactly once (race-safe)', async () => {
+      const streamId = 'stream-double-resolve';
+      await manager.createJob(streamId, 'user-1');
+      await manager.approvals.pause(streamId, buildAction(streamId));
+
+      const results = await Promise.all([
+        manager.approvals.resolve(streamId),
+        manager.approvals.resolve(streamId),
+      ]);
+
+      // Exactly one caller may drive the run — the other must be rejected.
+      expect(results.filter(Boolean)).toHaveLength(1);
+      expect(await manager.getJobStatus(streamId)).toBe('running');
+    });
+
+    test('returns false when the job is not paused', async () => {
+      const streamId = 'stream-resolve-running';
+      await manager.createJob(streamId, 'user-1');
+      expect(await manager.approvals.resolve(streamId)).toBe(false);
+    });
+
+    test('an expired pending action expires instead of resuming', async () => {
+      const streamId = 'stream-resolve-expired';
+      await manager.createJob(streamId, 'user-1');
+      await manager.approvals.pause(
+        streamId,
+        buildAction(streamId, { expiresAt: Date.now() - 1000 }),
+      );
+
+      expect(await manager.approvals.resolve(streamId)).toBe(false);
+      expect(await manager.getJobStatus(streamId)).toBe('aborted');
+    });
   });
 
-  test('clearPendingAction returns the job to running and removes the pending record', async () => {
-    const streamId = 'stream-clear';
-    await manager.createJob(streamId, 'user-1');
+  describe('expire', () => {
+    test('requires_action → aborted, clearing the record, returns true once', async () => {
+      const streamId = 'stream-expire';
+      await manager.createJob(streamId, 'user-1');
+      await manager.approvals.pause(streamId, buildAction(streamId));
 
-    await manager.markRequiresAction(streamId, buildAction(streamId));
-    expect(await manager.getJobStatus(streamId)).toBe('requires_action');
+      expect(await manager.approvals.expire(streamId)).toBe(true);
+      expect(await manager.getJobStatus(streamId)).toBe('aborted');
+      expect(await manager.approvals.peek(streamId)).toBeNull();
 
-    await manager.clearPendingAction(streamId);
+      // idempotent — a second expire does not fire again
+      expect(await manager.approvals.expire(streamId)).toBe(false);
+    });
 
-    expect(await manager.getJobStatus(streamId)).toBe('running');
-    expect(await manager.getPendingAction(streamId)).toBeNull();
+    test('returns false when the job is not paused', async () => {
+      const streamId = 'stream-expire-running';
+      await manager.createJob(streamId, 'user-1');
+      expect(await manager.approvals.expire(streamId)).toBe(false);
+    });
   });
 
-  test('requires_action drops the running count but keeps the user-active set', async () => {
-    const streamId = 'stream-counts';
-    await manager.createJob(streamId, 'user-counts');
+  describe('facade integration', () => {
+    test('requires_action drops the running count but keeps the user-active set', async () => {
+      const streamId = 'stream-counts';
+      await manager.createJob(streamId, 'user-counts');
 
-    const beforeCounts = await manager.getJobCountByStatus();
-    expect(beforeCounts.running).toBe(1);
-    expect(beforeCounts.requires_action).toBe(0);
+      const before = await manager.getJobCountByStatus();
+      expect(before.running).toBe(1);
+      expect(before.requires_action).toBe(0);
 
-    await manager.markRequiresAction(streamId, buildAction(streamId));
+      await manager.approvals.pause(streamId, buildAction(streamId));
 
-    const afterCounts = await manager.getJobCountByStatus();
-    expect(afterCounts.running).toBe(0);
-    expect(afterCounts.requires_action).toBe(1);
+      const after = await manager.getJobCountByStatus();
+      expect(after.running).toBe(0);
+      expect(after.requires_action).toBe(1);
 
-    // Pending-approval jobs still occupy the user's conversation slot.
-    const active = await manager.getActiveJobIdsForUser('user-counts');
-    expect(active).toContain(streamId);
-  });
+      // Pending-approval jobs still occupy the user's conversation slot.
+      expect(await manager.getActiveJobIdsForUser('user-counts')).toContain(streamId);
+    });
 
-  test('getActiveJobIdsForUser excludes terminal jobs but includes requires_action', async () => {
-    await manager.createJob('s-running', 'user-mix');
-    await manager.createJob('s-paused', 'user-mix');
-    await manager.createJob('s-done', 'user-mix');
+    test('getActiveJobIdsForUser excludes terminal jobs but includes requires_action', async () => {
+      await manager.createJob('s-running', 'user-mix');
+      await manager.createJob('s-paused', 'user-mix');
+      await manager.createJob('s-done', 'user-mix');
 
-    await manager.markRequiresAction('s-paused', buildAction('s-paused'));
-    await manager.completeJob('s-done');
+      await manager.approvals.pause('s-paused', buildAction('s-paused'));
+      await manager.completeJob('s-done');
 
-    const active = await manager.getActiveJobIdsForUser('user-mix');
-    expect(active.sort()).toEqual(['s-paused', 's-running']);
+      const active = await manager.getActiveJobIdsForUser('user-mix');
+      expect(active.sort()).toEqual(['s-paused', 's-running']);
+    });
   });
 });
|
||||
|
|
|
|||
|
|
@ -6,6 +6,7 @@ import type {
|
|||
UsageMetadata,
|
||||
IJobStore,
|
||||
JobStatus,
|
||||
JobStatusTransition,
|
||||
} from '~/stream/interfaces/IJobStore';
|
||||
|
||||
/**
|
||||
|
|
@ -137,6 +138,27 @@ export class InMemoryJobStore implements IJobStore {
|
|||
Object.assign(job, updates);
|
||||
}
|
||||
|
||||
/**
|
||||
* Atomic in-memory: the single-threaded event loop makes the
|
||||
* read-check-write sequence indivisible, so the status guard is exact.
|
||||
* Membership/counts derive from `job.status` directly, so there are no
|
||||
* sets to reconcile here.
|
||||
*/
|
||||
async transitionStatus(streamId: string, args: JobStatusTransition): Promise<boolean> {
|
||||
const job = this.jobs.get(streamId);
|
||||
if (!job || job.status !== args.from) {
|
||||
return false;
|
||||
}
|
||||
job.status = args.to;
|
||||
if (args.patch) {
|
||||
Object.assign(job, args.patch);
|
||||
}
|
||||
for (const field of args.clear ?? []) {
|
||||
delete job[field];
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
async deleteJob(streamId: string): Promise<void> {
|
||||
this.jobs.delete(streamId);
|
||||
this.contentState.delete(streamId);
|
||||
|
|
|
|||
|
|
@ -8,8 +8,40 @@ import type {
|
|||
UsageMetadata,
|
||||
IJobStore,
|
||||
JobStatus,
|
||||
JobStatusTransition,
|
||||
} from '~/stream/interfaces/IJobStore';
|
||||
|
||||
/**
|
||||
* Atomic compare-and-set status transition (single-node / sentinel Redis).
|
||||
*
|
||||
* Guards on the current `status` field, then — in one indivisible step —
|
||||
* removes `clear` fields, writes the `status`+patch pairs, reconciles the
|
||||
* membership sets, and refreshes TTLs. Returns 1 if it fired, 0 if the job
|
||||
* was missing or no longer in the expected `from` status.
|
||||
*
|
||||
* KEYS: [job, remSet | "", addSet | "", chunks, runSteps]
|
||||
* ARGV: [from, member, ttl, refreshLive(0|1), hdelCount, ...hdel, ...hsetPairs]
|
||||
*/
|
||||
const TRANSITION_STATUS_LUA =
|
||||
'if redis.call("HGET", KEYS[1], "status") ~= ARGV[1] then return 0 end ' +
|
||||
'local member = ARGV[2] ' +
|
||||
'local ttl = tonumber(ARGV[3]) ' +
|
||||
'local refreshLive = ARGV[4] ' +
|
||||
'local hdelCount = tonumber(ARGV[5]) ' +
|
||||
'local idx = 6 ' +
|
||||
'for i = 1, hdelCount do redis.call("HDEL", KEYS[1], ARGV[idx]) idx = idx + 1 end ' +
|
||||
'local hset = {} ' +
|
||||
'for i = idx, #ARGV do hset[#hset + 1] = ARGV[i] end ' +
|
||||
'if #hset > 0 then redis.call("HSET", KEYS[1], unpack(hset)) end ' +
|
||||
'if KEYS[2] ~= "" then redis.call("SREM", KEYS[2], member) end ' +
|
||||
'if KEYS[3] ~= "" then redis.call("SADD", KEYS[3], member) end ' +
|
||||
'redis.call("EXPIRE", KEYS[1], ttl) ' +
|
||||
'if refreshLive == "1" then redis.call("EXPIRE", KEYS[4], ttl) redis.call("EXPIRE", KEYS[5], ttl) end ' +
|
||||
'return 1';
|
||||
|
||||
/** Interrupt types the SDK can emit, used to sanity-check persisted records. */
|
||||
const KNOWN_INTERRUPT_TYPES = new Set(['tool_approval', 'ask_user_question']);
|
||||
|
||||
/**
|
||||
* Key prefixes for Redis storage.
|
||||
* All keys include the streamId for easy cleanup.
|
||||
|
|
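The ARGV layout documented for the transition script mixes fixed positions with two variable-length tails, which is why `hdelCount` is needed to find where the HDEL names stop and the HSET pairs start. The sketch below packs and unpacks that layout in TypeScript. `packArgv`, `unpackArgv`, and the `TransitionArgv` shape are hypothetical helpers for illustration, and the TTL of 1200 is an arbitrary example value, not a setting taken from the store.

```typescript
// Hypothetical helpers mirroring the documented ARGV layout:
// [from, member, ttl, refreshLive(0|1), hdelCount, ...hdel, ...hsetPairs]
interface TransitionArgv {
  from: string;
  member: string;
  ttl: number;
  refreshLive: boolean;
  hdel: string[];
  hsetPairs: string[]; // flat field/value pairs, e.g. ['status', 'running']
}

function packArgv(a: TransitionArgv): string[] {
  return [
    a.from,
    a.member,
    String(a.ttl),
    a.refreshLive ? '1' : '0',
    String(a.hdel.length), // marks where the HDEL names end
    ...a.hdel,
    ...a.hsetPairs,
  ];
}

// Walks the array in the same order as the script, using 0-based indices.
function unpackArgv(argv: string[]): TransitionArgv {
  const [from = '', member = '', ttl = '0', refreshLive = '0', hdelCount = '0', ...rest] = argv;
  const count = Number(hdelCount);
  return {
    from,
    member,
    ttl: Number(ttl),
    refreshLive: refreshLive === '1',
    hdel: rest.slice(0, count),
    hsetPairs: rest.slice(count),
  };
}

// A resume-style transition: clear `pendingAction`, set status to running.
const packed = packArgv({
  from: 'requires_action',
  member: 'stream-1',
  ttl: 1200, // arbitrary example TTL in seconds
  refreshLive: true,
  hdel: ['pendingAction'],
  hsetPairs: ['status', 'running'],
});
const roundTrip = unpackArgv(packed);

console.log(packed, roundTrip);
```

Every ARGV element is a string on the wire, so the numeric and boolean fields are converted at both ends.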
@ -276,6 +308,80 @@ export class RedisJobStore implements IJobStore {
|
|||
}
|
||||
}
|
||||
|
||||
/** The membership set a status belongs to; terminal statuses have none. */
|
||||
private statusSetKey(status: JobStatus): string | null {
|
||||
if (status === 'running') {
|
||||
return KEYS.runningJobs;
|
||||
}
|
||||
if (status === 'requires_action') {
|
||||
return KEYS.requiresActionJobs;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
async transitionStatus(streamId: string, args: JobStatusTransition): Promise<boolean> {
|
||||
const { from, to, patch, clear } = args;
|
||||
const key = KEYS.job(streamId);
|
||||
|
||||
// status + patch become HSET pairs; serializeJob skips undefined, so
|
||||
// cleared fields go through HDEL (`clear`) instead.
|
||||
const fields = Object.entries(
|
||||
this.serializeJob({ status: to, ...(patch ?? {}) } as SerializableJobData),
|
||||
).flat();
|
||||
const clearFields = (clear ?? []).map(String);
|
||||
|
||||
const remSet = this.statusSetKey(from);
|
||||
const addSet = this.statusSetKey(to);
|
||||
const terminal = addSet === null;
|
||||
const ttl = terminal ? this.ttl.completed : this.ttl.running;
|
||||
|
||||
if (this.isCluster) {
|
||||
// Membership sets live on a different slot from the job hash, so a single
|
||||
// cross-slot script isn't possible. Guard best-effort (read status, then
|
||||
// apply) — matches the existing cluster posture for status writes.
|
||||
const current = await this.redis.hget(key, 'status');
|
||||
if (current !== from) {
|
||||
return false;
|
||||
}
|
||||
if (clearFields.length > 0) {
|
||||
await this.redis.hdel(key, ...clearFields);
|
||||
}
|
||||
if (fields.length > 0) {
|
||||
await this.updateExistingJobHash(key, fields);
|
||||
}
|
||||
if (remSet) {
|
||||
await this.redis.srem(remSet, streamId);
|
||||
}
|
||||
if (addSet) {
|
||||
await this.redis.sadd(addSet, streamId);
|
||||
}
|
||||
await this.redis.expire(key, ttl);
|
||||
if (!terminal) {
|
||||
await this.redis.expire(KEYS.chunks(streamId), ttl);
|
||||
await this.redis.expire(KEYS.runSteps(streamId), ttl);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
const result = await this.redis.eval(
|
||||
TRANSITION_STATUS_LUA,
|
||||
5,
|
||||
key,
|
||||
remSet ?? '',
|
||||
addSet ?? '',
|
||||
KEYS.chunks(streamId),
|
||||
KEYS.runSteps(streamId),
|
||||
from,
|
||||
streamId,
|
||||
String(ttl),
|
||||
terminal ? '0' : '1',
|
||||
String(clearFields.length),
|
||||
...clearFields,
|
||||
...fields,
|
||||
);
|
||||
return result === 1;
|
||||
}
|
||||
|
||||
private async updateExistingJobHash(key: string, fields: string[]): Promise<boolean> {
|
||||
const updated = await this.redis.eval(
|
||||
'if redis.call("EXISTS", KEYS[1]) == 1 then redis.call("HSET", KEYS[1], unpack(ARGV)) return 1 else return 0 end',
|
||||
|
|
@ -1104,7 +1210,34 @@ export class RedisJobStore implements IJobStore {
|
|||
replayEvents: data.replayEvents || undefined,
|
||||
contextUsage: data.contextUsage || undefined,
|
||||
tokenUsage: data.tokenUsage || undefined,
|
||||
-      pendingAction: data.pendingAction ? JSON.parse(data.pendingAction) : undefined,
+      pendingAction: this.parsePendingAction(data.pendingAction),
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Parse a persisted `pendingAction`, defending the cold-resume path against
|
||||
* malformed or stale records: a corrupt JSON blob or a payload whose shape
|
||||
* predates the current SDK contract is dropped (logged) rather than crashing
|
||||
* the resume or feeding a bad record to an approval route. Returns undefined
|
||||
* when absent/invalid.
|
||||
*/
|
||||
private parsePendingAction(raw: string | undefined): Agents.PendingAction | undefined {
|
||||
if (!raw) {
|
||||
return undefined;
|
||||
}
|
||||
try {
|
||||
const parsed = JSON.parse(raw) as Agents.PendingAction;
|
||||
const typeOk =
|
||||
typeof parsed?.actionId === 'string' &&
|
||||
KNOWN_INTERRUPT_TYPES.has(parsed?.payload?.type as string);
|
||||
if (!typeOk) {
|
||||
logger.warn('[RedisJobStore] Dropping malformed pendingAction record');
|
||||
return undefined;
|
||||
}
|
||||
return parsed;
|
||||
} catch {
|
||||
logger.warn('[RedisJobStore] Dropping unparseable pendingAction record');
|
||||
return undefined;
|
||||
}
|
||||
}
|
||||
}
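The validate-on-deserialize idea in `parsePendingAction` can also be written with a type guard, so the parsed value is narrowed rather than cast. This is a minimal sketch assuming a trimmed-down record shape: `PendingActionSketch`, `isPendingActionSketch`, and `parsePendingActionSketch` are hypothetical names, and the real `Agents.PendingAction` carries more fields than are checked here.

```typescript
// Trimmed-down, hypothetical record shape for illustration only.
interface PendingActionSketch {
  actionId: string;
  payload: { type: string };
}

const KNOWN_TYPES = new Set<string>(['tool_approval', 'ask_user_question']);

function isObject(value: unknown): value is { [key: string]: unknown } {
  return typeof value === 'object' && value !== null;
}

// Narrows an unknown value instead of asserting its type after JSON.parse.
function isPendingActionSketch(value: unknown): value is PendingActionSketch {
  if (!isObject(value) || typeof value['actionId'] !== 'string') {
    return false;
  }
  const payload = value['payload'];
  if (!isObject(payload)) {
    return false;
  }
  const type = payload['type'];
  return typeof type === 'string' && KNOWN_TYPES.has(type);
}

function parsePendingActionSketch(raw: string | undefined): PendingActionSketch | undefined {
  if (!raw) {
    return undefined;
  }
  try {
    const parsed: unknown = JSON.parse(raw);
    return isPendingActionSketch(parsed) ? parsed : undefined;
  } catch {
    return undefined; // corrupt JSON is dropped rather than thrown
  }
}

const valid = parsePendingActionSketch('{"actionId":"a1","payload":{"type":"tool_approval"}}');
const unknownType = parsePendingActionSketch('{"actionId":"a1","payload":{"type":"legacy_kind"}}');
const corrupt = parsePendingActionSketch('{oops');
const absent = parsePendingActionSketch(undefined);

console.log(valid?.actionId, unknownType, corrupt, absent);
```

A guard like this checks only the fields it names; deeper payload validation would need its own checks or a schema library.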
|
||||
|
|
|
|||
|
|
@ -71,6 +71,20 @@ export interface SerializableJobData {
|
|||
pendingAction?: Agents.PendingAction;
|
||||
}
|
||||
|
||||
/**
|
||||
* Arguments for an atomic {@link IJobStore.transitionStatus} compare-and-set.
|
||||
*/
|
||||
export interface JobStatusTransition {
|
||||
/** Only fire the transition if the job is currently in this status. */
|
||||
from: JobStatus;
|
||||
/** Status to move to when the `from` guard holds. */
|
||||
to: JobStatus;
|
||||
/** Fields written in the same atomic step as the status change. */
|
||||
patch?: Partial<SerializableJobData>;
|
||||
/** Field names removed in the same atomic step (e.g. `pendingAction`). */
|
||||
clear?: Array<keyof SerializableJobData & string>;
|
||||
}
|
||||
|
||||
/**
|
||||
* Usage metadata for token spending across different LLM providers.
|
||||
*
|
||||
|
|
@ -211,6 +225,28 @@ export interface IJobStore {
|
|||
/** Update job data */
|
||||
updateJob(streamId: string, updates: Partial<SerializableJobData>): Promise<void>;
|
||||
|
||||
/**
|
||||
* Atomically transition a job's status, **only if** it is currently `from`.
|
||||
* Returns `true` when the transition fired, `false` when the job was missing
|
||||
* or no longer in `from` (lost a race / illegal transition).
|
||||
*
|
||||
* `patch` fields are written and `clear` fields removed in the same atomic
|
||||
* step, and the running / requires_action membership sets plus live-key TTLs
|
||||
* are reconciled to match `to`. This is the race-safe primitive behind the
|
||||
* approval lifecycle — it prevents two concurrent resumes from both driving a
|
||||
* paused run (a double-drive would re-execute tools / double-bill).
|
||||
*
|
||||
* Distinct from {@link updateJob}, which writes status unconditionally for
|
||||
* callers that don't know the prior state. Reach for `transitionStatus`
|
||||
* whenever the legal prior state is known.
|
||||
*
|
||||
* Atomicity: fully atomic on in-memory and single-node / sentinel Redis
|
||||
* (Lua). On Redis Cluster the status guard is best-effort — the membership
|
||||
* sets live on a different hash slot from the job hash — matching the store's
|
||||
* existing cluster posture for status writes.
|
||||
*/
|
||||
transitionStatus(streamId: string, args: JobStatusTransition): Promise<boolean>;
|
||||
|
||||
/** Delete a job */
|
||||
deleteJob(streamId: string): Promise<void>;
|
||||
|
||||
|
|
|
|||
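The `transitionStatus` contract says membership sets and live-key TTLs are reconciled to match `to`. One way to read that rule is as a pure planning step: pick the set to leave, the set to join, and the TTL class from the two statuses. The sketch below assumes a hypothetical status union, hypothetical set names, and arbitrary TTL values; only the "terminal statuses have no membership set" rule is taken from the diff.

```typescript
// Hypothetical status union and set names, for illustration only.
type PlanStatus = 'running' | 'requires_action' | 'complete' | 'aborted';

const SET_FOR: Partial<Record<PlanStatus, string>> = {
  running: 'jobs:running',
  requires_action: 'jobs:requires_action',
};

interface ReconcilePlan {
  remSet: string | null; // set the job leaves, if any
  addSet: string | null; // set the job joins, if any
  ttlSeconds: number;
  refreshLiveKeys: boolean; // whether chunk / run-step keys get the same TTL
}

function planReconcile(
  from: PlanStatus,
  to: PlanStatus,
  ttl: { running: number; completed: number },
): ReconcilePlan {
  const remSet = SET_FOR[from] ?? null;
  const addSet = SET_FOR[to] ?? null;
  // A destination with no membership set is treated as terminal.
  const terminal = addSet === null;
  return {
    remSet,
    addSet,
    ttlSeconds: terminal ? ttl.completed : ttl.running,
    refreshLiveKeys: !terminal,
  };
}

const ttl = { running: 1200, completed: 300 }; // arbitrary example values
const pausePlan = planReconcile('running', 'requires_action', ttl);
const expirePlan = planReconcile('requires_action', 'aborted', ttl);

console.log(pausePlan, expirePlan);
```

Keeping the plan pure makes it easy to unit-test the set and TTL choices separately from the Redis calls that apply them.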