mirror of
https://github.com/danny-avila/LibreChat.git
synced 2026-06-28 02:11:30 +00:00
🛡️ fix: Address Codex round 4 — paused-job edge cases across the stack
Five P2 findings on 4324a4e776, all valid:
- I1 message validation: validateMessageReq's active-job read bypass now
accepts a live requires_action job, so a new-conversation run that pauses
before its final save can recover the prompt instead of 404ing.
- I2 expire targets the observed record: resolve()'s expired path now passes
`expectedActionId ?? job.pendingAction.actionId` to expire(), so when the caller
did not pin an actionId, a concurrent resume + re-pause can't let that expire
abort a different (newer) action.
- I3 stale/malformed prompts: new isPendingActionStale (missing OR expired)
drives active-listing exclusion + cleanup expiry in both stores, and the
status route + middleware require a live pendingAction — a requires_action
job whose pendingAction was dropped on deserialize no longer reads active.
- I4 in-memory parity: InMemoryJobStore.updateJob now mirrors the flat
pendingActionId whenever an update carries a pendingAction (pause), and on
resume to `running` clears it and refreshes lastActiveAt (matching
RedisJobStore), so a pause via the generic update path is still resolvable by
actionId.
- I5 long approval windows: the paused-job live TTL (job/chunks/run-steps) is now
computed by pauseTtlSeconds — the larger of the running TTL and the time until
pendingAction.expiresAt plus a 60s grace margin. It applies on both the
transitionStatus and updateJob pause paths, so Redis can't evict a paused
job before its decision window closes.
tsc + lint clean; policy + type-contract specs pass.
This commit is contained in:
parent
4324a4e776
commit
c8abd826e1
6 changed files with 94 additions and 28 deletions
|
|
@ -20,7 +20,16 @@ async function canReadActiveJobConversation(req, conversationId) {
|
|||
return false;
|
||||
}
|
||||
|
||||
if (!job || job.status !== 'running') {
|
||||
// A job paused for human review is still active (consistent with /chat/status
|
||||
// and /chat/active), so a new-conversation run that pauses before its final
|
||||
// save can still recover the prompt — but only while it has a live,
|
||||
// resolvable prompt (missing/malformed or past-expiry reads as inactive).
|
||||
const pendingAction = job?.metadata?.pendingAction;
|
||||
const pendingLive =
|
||||
!!pendingAction && (pendingAction.expiresAt == null || pendingAction.expiresAt > Date.now());
|
||||
const isActive =
|
||||
!!job && (job.status === 'running' || (job.status === 'requires_action' && pendingLive));
|
||||
if (!isActive) {
|
||||
return false;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -203,13 +203,13 @@ router.get('/chat/status/:conversationId', async (req, res) => {
|
|||
// Avoid calling both getStreamInfo and getResumeState (both fetch content)
|
||||
const resumeState = await GenerationJobManager.getResumeState(conversationId);
|
||||
// A job paused for human review is still active (consistent with /chat/active),
|
||||
// so the client resumes/subscribes rather than treating it as finished — unless
|
||||
// its prompt has already expired (cleanup/expiry will finalize it).
|
||||
const pendingExpired =
|
||||
job.metadata.pendingAction?.expiresAt != null &&
|
||||
job.metadata.pendingAction.expiresAt <= Date.now();
|
||||
const isActive =
|
||||
job.status === 'running' || (job.status === 'requires_action' && !pendingExpired);
|
||||
// so the client resumes/subscribes rather than treating it as finished — but
|
||||
// only while it has a live, resolvable prompt: a missing/malformed or
|
||||
// past-expiry pendingAction reads as inactive (cleanup/expiry will finalize it).
|
||||
const pendingAction = job.metadata.pendingAction;
|
||||
const pendingLive =
|
||||
!!pendingAction && (pendingAction.expiresAt == null || pendingAction.expiresAt > Date.now());
|
||||
const isActive = job.status === 'running' || (job.status === 'requires_action' && pendingLive);
|
||||
|
||||
res.json({
|
||||
active: isActive,
|
||||
|
|
|
|||
|
|
@ -82,7 +82,10 @@ export class ApprovalLifecycle {
|
|||
job.pendingAction &&
|
||||
this.isExpired(job.pendingAction)
|
||||
) {
|
||||
await this.expire(streamId, expectedActionId);
|
||||
// Target the exact record observed as expired. If the caller didn't pin an
|
||||
// actionId, fall back to the one just read — otherwise a concurrent
|
||||
// resume + re-pause for a new action could let this expire abort it.
|
||||
await this.expire(streamId, expectedActionId ?? job.pendingAction.actionId);
|
||||
return false;
|
||||
}
|
||||
return this.store.transitionStatus(streamId, {
|
||||
|
|
|
|||
|
|
@ -8,7 +8,7 @@ import type {
|
|||
JobStatus,
|
||||
JobStatusTransition,
|
||||
} from '~/stream/interfaces/IJobStore';
|
||||
import { isPendingActionExpired } from '~/stream/interfaces/IJobStore';
|
||||
import { isPendingActionStale } from '~/stream/interfaces/IJobStore';
|
||||
|
||||
/**
|
||||
* Content state for a job - volatile, in-memory only.
|
||||
|
|
@ -137,6 +137,17 @@ export class InMemoryJobStore implements IJobStore {
|
|||
return;
|
||||
}
|
||||
Object.assign(job, updates);
|
||||
// Mirror the guarded transitionStatus path so a pause/resume via this
|
||||
// generic update behaves identically (parity with RedisJobStore):
|
||||
// - mirror the flat pendingActionId the stale-decision guard compares;
|
||||
// - on resume to running, refresh lastActiveAt and drop the flat id.
|
||||
if (updates.pendingAction) {
|
||||
job.pendingActionId = updates.pendingAction.actionId;
|
||||
}
|
||||
if (updates.status === 'running') {
|
||||
job.lastActiveAt = updates.lastActiveAt ?? Date.now();
|
||||
delete job.pendingActionId;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -207,11 +218,11 @@ export class InMemoryJobStore implements IJobStore {
|
|||
if (this.ttlAfterComplete === 0 || now - job.completedAt > this.ttlAfterComplete) {
|
||||
toDelete.push(streamId);
|
||||
}
|
||||
} else if (job.status === 'requires_action' && isPendingActionExpired(job)) {
|
||||
// Past-due approval: finalize it (aborted) so it stops occupying the
|
||||
// user slot and its content state is reclaimed, mirroring
|
||||
// ApprovalLifecycle.expire(). Skipping it (active-list filter) alone
|
||||
// would leave the job resident indefinitely.
|
||||
} else if (job.status === 'requires_action' && isPendingActionStale(job)) {
|
||||
// Stale approval (expired, or missing/malformed pendingAction):
|
||||
// finalize it (aborted) so it stops occupying the user slot and its
|
||||
// content state is reclaimed, mirroring ApprovalLifecycle.expire().
|
||||
// Skipping it (active-list filter) alone would leave it resident.
|
||||
job.status = 'aborted';
|
||||
job.completedAt = now;
|
||||
job.error = 'Approval expired before a decision was made';
|
||||
|
|
@ -344,7 +355,7 @@ export class InMemoryJobStore implements IJobStore {
|
|||
// only while its prompt is live: a past-`expiresAt` approval, or one whose
|
||||
// pendingAction is missing/malformed, no longer counts as active (cleanup/expiry will finalize it).
|
||||
if (job && (job.status === 'running' || job.status === 'requires_action')) {
|
||||
if (job.status === 'requires_action' && isPendingActionExpired(job)) {
|
||||
if (job.status === 'requires_action' && isPendingActionStale(job)) {
|
||||
continue;
|
||||
}
|
||||
activeIds.push(streamId);
|
||||
|
|
|
|||
|
|
@ -10,7 +10,7 @@ import type {
|
|||
JobStatus,
|
||||
JobStatusTransition,
|
||||
} from '~/stream/interfaces/IJobStore';
|
||||
import { isPendingActionExpired } from '~/stream/interfaces/IJobStore';
|
||||
import { isPendingActionStale } from '~/stream/interfaces/IJobStore';
|
||||
|
||||
/**
|
||||
* Atomic compare-and-set on the job hash — the single-winner decision for a
|
||||
|
|
@ -266,7 +266,12 @@ export class RedisJobStore implements IJobStore {
|
|||
const fields = Object.entries(serialized).flat();
|
||||
|
||||
if (updates.status === 'requires_action') {
|
||||
await this.transitionToRequiresAction(key, streamId, fields);
|
||||
await this.transitionToRequiresAction(
|
||||
key,
|
||||
streamId,
|
||||
fields,
|
||||
this.pauseTtlSeconds(updates.pendingAction),
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
@ -345,6 +350,22 @@ export class RedisJobStore implements IJobStore {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Live-key TTL (seconds) for a paused job. Defaults to the running TTL but
|
||||
* extends to cover a pendingAction whose `expiresAt` is farther out, plus a
|
||||
* grace margin so a decision arriving right at the deadline can still resume.
|
||||
* Without this, a long approval window (e.g. 1h) on the default 20m stream
|
||||
* TTL would let Redis evict the paused job mid-window.
*
* The result is never smaller than `this.ttl.running`, so an `expiresAt` that
* is already in the past (or only slightly ahead) still yields the running TTL.
*
* @param pendingAction - The pause prompt, if any; only its `expiresAt` is read.
* @returns TTL in seconds: `this.ttl.running` when there is no `expiresAt`,
*   otherwise the larger of `this.ttl.running` and seconds-until-expiry + 60.
|
||||
*/
|
||||
private pauseTtlSeconds(pendingAction?: Agents.PendingAction): number {
|
||||
const exp = pendingAction?.expiresAt;
|
||||
if (exp == null) {
|
||||
return this.ttl.running;
|
||||
}
|
||||
// `exp` is compared against Date.now(), i.e. handled as epoch milliseconds;
// convert the remaining time to whole seconds (rounded up) and add the 60s grace.
const secondsUntilExpiry = Math.ceil((exp - Date.now()) / 1000) + 60;
|
||||
return Math.max(this.ttl.running, secondsUntilExpiry);
|
||||
}
|
||||
|
||||
/** The membership set a status belongs to; terminal statuses have none. */
|
||||
private statusSetKey(status: JobStatus): string | null {
|
||||
if (status === 'running') {
|
||||
|
|
@ -370,7 +391,13 @@ export class RedisJobStore implements IJobStore {
|
|||
const remSet = this.statusSetKey(from);
|
||||
const addSet = this.statusSetKey(to);
|
||||
const terminal = addSet === null;
|
||||
const ttl = terminal ? this.ttl.completed : this.ttl.running;
|
||||
let ttl = terminal ? this.ttl.completed : this.ttl.running;
|
||||
if (to === 'requires_action') {
|
||||
// A paused job must outlive its approval window, even when that window is
|
||||
// longer than the running TTL — otherwise Redis evicts it before a
|
||||
// decision can resume it.
|
||||
ttl = this.pauseTtlSeconds(patch?.pendingAction);
|
||||
}
|
||||
|
||||
// 1) Single-winner decision: an atomic CAS on the single-slot job hash.
|
||||
// Works identically on cluster and single-node, so two concurrent
|
||||
|
|
@ -436,9 +463,12 @@ export class RedisJobStore implements IJobStore {
|
|||
key: string,
|
||||
streamId: string,
|
||||
fields: string[],
|
||||
ttlSeconds: number = this.ttl.running,
|
||||
): Promise<void> {
|
||||
// Job paused for human review — non-terminal. Keep the user-active set
|
||||
// untouched so resume can rebuild state from the persisted job.
|
||||
// untouched so resume can rebuild state from the persisted job. The live
|
||||
// TTL covers the approval window (see pauseTtlSeconds) so a long-pending
|
||||
// job isn't evicted before a decision arrives.
|
||||
if (this.isCluster) {
|
||||
const exists = await this.redis.exists(key);
|
||||
if (exists !== 1) {
|
||||
|
|
@ -447,7 +477,9 @@ export class RedisJobStore implements IJobStore {
|
|||
await this.redis.srem(KEYS.runningJobs, streamId);
|
||||
await this.redis.sadd(KEYS.requiresActionJobs, streamId);
|
||||
await this.updateExistingJobHash(key, fields);
|
||||
await this.refreshLiveJobTtls(key, streamId);
|
||||
await this.redis.expire(key, ttlSeconds);
|
||||
await this.redis.expire(KEYS.chunks(streamId), ttlSeconds);
|
||||
await this.redis.expire(KEYS.runSteps(streamId), ttlSeconds);
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
@ -460,7 +492,7 @@ export class RedisJobStore implements IJobStore {
|
|||
KEYS.chunks(streamId),
|
||||
KEYS.runSteps(streamId),
|
||||
streamId,
|
||||
String(this.ttl.running),
|
||||
String(ttlSeconds),
|
||||
...fields,
|
||||
);
|
||||
}
|
||||
|
|
@ -675,11 +707,12 @@ export class RedisJobStore implements IJobStore {
|
|||
return 1;
|
||||
}
|
||||
|
||||
// Past-due approval: finalize it (aborted) so it stops occupying the
|
||||
// slot and its stream contents are reclaimed, mirroring
|
||||
// ApprovalLifecycle.expire(). transitionStatus runs the terminal
|
||||
// content cleanup (sets, chunks, run-steps, userJobs, completed TTL).
|
||||
if (isPendingActionExpired(job)) {
|
||||
// Stale approval (expired, or missing/malformed pendingAction):
|
||||
// finalize it (aborted) so it stops occupying the slot and its stream
|
||||
// contents are reclaimed, mirroring ApprovalLifecycle.expire().
|
||||
// transitionStatus runs the terminal content cleanup (sets, chunks,
|
||||
// run-steps, userJobs, completed TTL).
|
||||
if (isPendingActionStale(job)) {
|
||||
await this.transitionStatus(streamId, {
|
||||
from: 'requires_action',
|
||||
to: 'aborted',
|
||||
|
|
@ -779,7 +812,7 @@ export class RedisJobStore implements IJobStore {
|
|||
// counts as active (cleanup/expiry will finalize it), so the client stops
|
||||
// polling and can complete.
|
||||
if (job && (job.status === 'running' || job.status === 'requires_action')) {
|
||||
if (job.status === 'requires_action' && isPendingActionExpired(job)) {
|
||||
if (job.status === 'requires_action' && isPendingActionStale(job)) {
|
||||
continue;
|
||||
}
|
||||
activeIds.push(streamId);
|
||||
|
|
|
|||
|
|
@ -97,6 +97,16 @@ export function isPendingActionExpired(job: Pick<SerializableJobData, 'pendingAc
|
|||
return exp != null && exp <= Date.now();
|
||||
}
|
||||
|
||||
/**
|
||||
* Whether a `requires_action` job has no live, resolvable prompt — either the
|
||||
* pendingAction is missing/malformed (e.g. dropped on deserialize) or past its
|
||||
* `expiresAt`. Such a job can't be rendered or resolved, so it must be kept out
|
||||
* of active listings and finalized by cleanup rather than left stuck active.
*
* This helper does not inspect the job's status itself; callers pair it with
* their own `status === 'requires_action'` check.
*
* @param job - Any object exposing the job's `pendingAction` field.
* @returns `true` when `pendingAction` is falsy or `isPendingActionExpired(job)`
*   reports it expired; `false` otherwise.
|
||||
*/
|
||||
export function isPendingActionStale(job: Pick<SerializableJobData, 'pendingAction'>): boolean {
|
||||
return !job.pendingAction || isPendingActionExpired(job);
|
||||
}
|
||||
|
||||
/**
|
||||
* Arguments for an atomic {@link IJobStore.transitionStatus} compare-and-set.
|
||||
*/
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue