mirror of
https://github.com/danny-avila/LibreChat.git
synced 2026-06-28 02:11:30 +00:00
🛡️ fix: Address Codex round 2 on the HITL Redis adapter
Five P2 findings on abf4b86291, all valid Redis-adapter consequences of
round 1:
- G1 terminal cleanup on expiry: transitionStatus's terminal path now runs
the same chunk/run-step/userJobs cleanup as updateJob (extracted into a
shared applyTerminalContentCleanup). Expired approvals no longer leave
Redis stream contents around for the full running TTL.
- G2 pause via updateJob mirrors pendingActionId, so a pause through the
generic path carries the flat field the stale-decision guard compares.
- G3 resume via updateJob refreshes lastActiveAt (and clears pendingActionId),
matching transitionStatus so a long-paused job isn't reaped post-resume.
- G4 getActiveJobIdsByUser excludes a requires_action job whose pendingAction
is past expiry (both stores), via shared isPendingActionExpired — the client
stops polling an expired prompt.
- G5 createJob clears stale pendingAction/pendingActionId/lastActiveAt on a
reused streamId, so a fresh run never exposes a prior run's approval metadata
and cleanup keys off the new createdAt.
Tests added: expired pending-approval excluded from the active set. tsc +
lint clean; policy + type-contract specs pass.
This commit is contained in:
parent
abf4b86291
commit
780833d908
4 changed files with 135 additions and 61 deletions
|
|
@ -214,5 +214,17 @@ describe('ApprovalLifecycle via GenerationJobManager.approvals (in-memory)', ()
|
|||
const active = await manager.getActiveJobIdsForUser('user-mix');
|
||||
expect(active.sort()).toEqual(['s-paused', 's-running']);
|
||||
});
|
||||
|
||||
test('excludes a pending-approval job whose prompt has expired', async () => {
|
||||
const streamId = 'stream-expired-active';
|
||||
await manager.createJob(streamId, 'user-exp');
|
||||
await manager.approvals.pause(
|
||||
streamId,
|
||||
buildAction(streamId, { expiresAt: Date.now() - 1000 }),
|
||||
);
|
||||
|
||||
// Still requires_action, but the prompt is past expiry → no longer active.
|
||||
expect(await manager.getActiveJobIdsForUser('user-exp')).not.toContain(streamId);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
|
|||
|
|
@ -8,6 +8,7 @@ import type {
|
|||
JobStatus,
|
||||
JobStatusTransition,
|
||||
} from '~/stream/interfaces/IJobStore';
|
||||
import { isPendingActionExpired } from '~/stream/interfaces/IJobStore';
|
||||
|
||||
/**
|
||||
* Content state for a job - volatile, in-memory only.
|
||||
|
|
@ -321,8 +322,13 @@ export class InMemoryJobStore implements IJobStore {
|
|||
for (const streamId of trackedIds) {
|
||||
const job = this.jobs.get(streamId);
|
||||
// Include running jobs and jobs paused for human review (e.g. tool approval).
|
||||
// A pending-approval job still occupies the user's conversation slot.
|
||||
// A pending-approval job still occupies the user's conversation slot — but
|
||||
// only while its prompt is live: a past-`expiresAt` approval no longer
|
||||
// counts as active (cleanup/expiry will finalize it).
|
||||
if (job && (job.status === 'running' || job.status === 'requires_action')) {
|
||||
if (job.status === 'requires_action' && isPendingActionExpired(job)) {
|
||||
continue;
|
||||
}
|
||||
activeIds.push(streamId);
|
||||
} else {
|
||||
// Self-healing: job completed/deleted but mapping wasn't cleaned - fix it now
|
||||
|
|
|
|||
|
|
@ -10,6 +10,7 @@ import type {
|
|||
JobStatus,
|
||||
JobStatusTransition,
|
||||
} from '~/stream/interfaces/IJobStore';
|
||||
import { isPendingActionExpired } from '~/stream/interfaces/IJobStore';
|
||||
|
||||
/**
|
||||
* Atomic compare-and-set on the job hash — the single-winner decision for a
|
||||
|
|
@ -194,10 +195,21 @@ export class RedisJobStore implements IJobStore {
|
|||
const key = KEYS.job(streamId);
|
||||
const userJobsKey = KEYS.userJobs(userId, tenantId);
|
||||
|
||||
// A reused streamId overlays onto any existing hash, so paused-run fields
|
||||
// from a prior generation could survive. Drop the HITL fields so the fresh
|
||||
// running job never exposes stale approval metadata and cleanup keys off the
|
||||
// new createdAt rather than a leftover lastActiveAt.
|
||||
const staleHitlFields: Array<keyof SerializableJobData> = [
|
||||
'pendingAction',
|
||||
'pendingActionId',
|
||||
'lastActiveAt',
|
||||
];
|
||||
|
||||
// For cluster mode, we can't pipeline keys on different slots
|
||||
// The job key uses hash tag {streamId}, runningJobs and userJobs are on different slots
|
||||
if (this.isCluster) {
|
||||
await this.redis.hset(key, this.serializeJob(job));
|
||||
await this.redis.hdel(key, ...staleHitlFields);
|
||||
await this.redis.expire(key, this.ttl.running);
|
||||
await this.redis.sadd(KEYS.runningJobs, streamId);
|
||||
await this.redis.srem(KEYS.requiresActionJobs, streamId);
|
||||
|
|
@ -208,6 +220,7 @@ export class RedisJobStore implements IJobStore {
|
|||
} else {
|
||||
const pipeline = this.redis.pipeline();
|
||||
pipeline.hset(key, this.serializeJob(job));
|
||||
pipeline.hdel(key, ...staleHitlFields);
|
||||
pipeline.expire(key, this.ttl.running);
|
||||
pipeline.sadd(KEYS.runningJobs, streamId);
|
||||
pipeline.srem(KEYS.requiresActionJobs, streamId);
|
||||
|
|
@ -233,7 +246,19 @@ export class RedisJobStore implements IJobStore {
|
|||
async updateJob(streamId: string, updates: Partial<SerializableJobData>): Promise<void> {
|
||||
const key = KEYS.job(streamId);
|
||||
|
||||
const serialized = this.serializeJob(updates as SerializableJobData);
|
||||
// Keep this generic path consistent with the guarded transitionStatus path:
|
||||
// - mirror `pendingActionId` on any `pendingAction` write so a pause here
|
||||
// still carries the flat field the stale-decision guard compares;
|
||||
// - refresh `lastActiveAt` when resuming to `running` so a long-paused job
|
||||
// isn't reaped by `createdAt` on the next cleanup tick.
|
||||
let effective: Partial<SerializableJobData> = updates;
|
||||
if (updates.status === 'running') {
|
||||
effective = { ...updates, lastActiveAt: updates.lastActiveAt ?? Date.now() };
|
||||
} else if (updates.pendingAction) {
|
||||
effective = { ...updates, pendingActionId: updates.pendingAction.actionId };
|
||||
}
|
||||
|
||||
const serialized = this.serializeJob(effective as SerializableJobData);
|
||||
if (Object.keys(serialized).length === 0) {
|
||||
return;
|
||||
}
|
||||
|
|
@ -256,54 +281,67 @@ export class RedisJobStore implements IJobStore {
|
|||
}
|
||||
|
||||
if (updates.status && ['complete', 'error', 'aborted'].includes(updates.status)) {
|
||||
// Proactively remove from user's job set (requires reading userId from the job hash)
|
||||
const job = await this.getJob(streamId);
|
||||
const userJobsKey = job?.userId ? KEYS.userJobs(job.userId, job.tenantId) : null;
|
||||
await this.applyTerminalContentCleanup(streamId);
|
||||
}
|
||||
}
|
||||
|
||||
if (this.isCluster) {
|
||||
await this.redis.expire(key, this.ttl.completed);
|
||||
await this.redis.srem(KEYS.runningJobs, streamId);
|
||||
await this.redis.srem(KEYS.requiresActionJobs, streamId);
|
||||
/**
|
||||
* Terminal cleanup shared by `updateJob` (complete/error/aborted) and the
|
||||
* terminal path of `transitionStatus` (approval expiry → aborted): drop the
|
||||
* job from both membership sets and the user-active set, shorten the job-hash
|
||||
* TTL to the completed window, and del/shorten the chunk + run-step keys per
|
||||
* the configured after-complete TTLs. Without sharing this, an expired
|
||||
* approval left Redis stream contents around for the full running TTL.
|
||||
*/
|
||||
private async applyTerminalContentCleanup(streamId: string): Promise<void> {
|
||||
const key = KEYS.job(streamId);
|
||||
// Proactively remove from user's job set (requires reading userId from the job hash)
|
||||
const job = await this.getJob(streamId);
|
||||
const userJobsKey = job?.userId ? KEYS.userJobs(job.userId, job.tenantId) : null;
|
||||
|
||||
if (this.ttl.chunksAfterComplete === 0) {
|
||||
await this.redis.del(KEYS.chunks(streamId));
|
||||
} else {
|
||||
await this.redis.expire(KEYS.chunks(streamId), this.ttl.chunksAfterComplete);
|
||||
}
|
||||
if (this.isCluster) {
|
||||
await this.redis.expire(key, this.ttl.completed);
|
||||
await this.redis.srem(KEYS.runningJobs, streamId);
|
||||
await this.redis.srem(KEYS.requiresActionJobs, streamId);
|
||||
|
||||
if (this.ttl.runStepsAfterComplete === 0) {
|
||||
await this.redis.del(KEYS.runSteps(streamId));
|
||||
} else {
|
||||
await this.redis.expire(KEYS.runSteps(streamId), this.ttl.runStepsAfterComplete);
|
||||
}
|
||||
|
||||
if (userJobsKey) {
|
||||
await this.redis.srem(userJobsKey, streamId);
|
||||
}
|
||||
if (this.ttl.chunksAfterComplete === 0) {
|
||||
await this.redis.del(KEYS.chunks(streamId));
|
||||
} else {
|
||||
const pipeline = this.redis.pipeline();
|
||||
pipeline.expire(key, this.ttl.completed);
|
||||
pipeline.srem(KEYS.runningJobs, streamId);
|
||||
pipeline.srem(KEYS.requiresActionJobs, streamId);
|
||||
|
||||
if (this.ttl.chunksAfterComplete === 0) {
|
||||
pipeline.del(KEYS.chunks(streamId));
|
||||
} else {
|
||||
pipeline.expire(KEYS.chunks(streamId), this.ttl.chunksAfterComplete);
|
||||
}
|
||||
|
||||
if (this.ttl.runStepsAfterComplete === 0) {
|
||||
pipeline.del(KEYS.runSteps(streamId));
|
||||
} else {
|
||||
pipeline.expire(KEYS.runSteps(streamId), this.ttl.runStepsAfterComplete);
|
||||
}
|
||||
|
||||
if (userJobsKey) {
|
||||
pipeline.srem(userJobsKey, streamId);
|
||||
}
|
||||
|
||||
await pipeline.exec();
|
||||
await this.redis.expire(KEYS.chunks(streamId), this.ttl.chunksAfterComplete);
|
||||
}
|
||||
|
||||
if (this.ttl.runStepsAfterComplete === 0) {
|
||||
await this.redis.del(KEYS.runSteps(streamId));
|
||||
} else {
|
||||
await this.redis.expire(KEYS.runSteps(streamId), this.ttl.runStepsAfterComplete);
|
||||
}
|
||||
|
||||
if (userJobsKey) {
|
||||
await this.redis.srem(userJobsKey, streamId);
|
||||
}
|
||||
} else {
|
||||
const pipeline = this.redis.pipeline();
|
||||
pipeline.expire(key, this.ttl.completed);
|
||||
pipeline.srem(KEYS.runningJobs, streamId);
|
||||
pipeline.srem(KEYS.requiresActionJobs, streamId);
|
||||
|
||||
if (this.ttl.chunksAfterComplete === 0) {
|
||||
pipeline.del(KEYS.chunks(streamId));
|
||||
} else {
|
||||
pipeline.expire(KEYS.chunks(streamId), this.ttl.chunksAfterComplete);
|
||||
}
|
||||
|
||||
if (this.ttl.runStepsAfterComplete === 0) {
|
||||
pipeline.del(KEYS.runSteps(streamId));
|
||||
} else {
|
||||
pipeline.expire(KEYS.runSteps(streamId), this.ttl.runStepsAfterComplete);
|
||||
}
|
||||
|
||||
if (userJobsKey) {
|
||||
pipeline.srem(userJobsKey, streamId);
|
||||
}
|
||||
|
||||
await pipeline.exec();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -352,9 +390,14 @@ export class RedisJobStore implements IJobStore {
|
|||
return false;
|
||||
}
|
||||
|
||||
// 2) Reconcile membership sets + live-key TTLs. Only the winner reaches
|
||||
// here; set membership is derived state that periodic cleanup
|
||||
// self-heals, so this non-atomic cross-slot step is safe.
|
||||
// 2) Reconcile derived state. Only the winner reaches here; membership is
|
||||
// self-healed by periodic cleanup, so this non-atomic cross-slot step is
|
||||
// safe. A terminal target (e.g. approval expiry → aborted) gets the same
|
||||
// content cleanup as updateJob's terminal path.
|
||||
if (terminal) {
|
||||
await this.applyTerminalContentCleanup(streamId);
|
||||
return true;
|
||||
}
|
||||
if (this.isCluster) {
|
||||
if (remSet) {
|
||||
await this.redis.srem(remSet, streamId);
|
||||
|
|
@ -362,10 +405,8 @@ export class RedisJobStore implements IJobStore {
|
|||
if (addSet) {
|
||||
await this.redis.sadd(addSet, streamId);
|
||||
}
|
||||
if (!terminal) {
|
||||
await this.redis.expire(KEYS.chunks(streamId), ttl);
|
||||
await this.redis.expire(KEYS.runSteps(streamId), ttl);
|
||||
}
|
||||
await this.redis.expire(KEYS.chunks(streamId), ttl);
|
||||
await this.redis.expire(KEYS.runSteps(streamId), ttl);
|
||||
} else {
|
||||
const pipeline = this.redis.pipeline();
|
||||
if (remSet) {
|
||||
|
|
@ -374,10 +415,8 @@ export class RedisJobStore implements IJobStore {
|
|||
if (addSet) {
|
||||
pipeline.sadd(addSet, streamId);
|
||||
}
|
||||
if (!terminal) {
|
||||
pipeline.expire(KEYS.chunks(streamId), ttl);
|
||||
pipeline.expire(KEYS.runSteps(streamId), ttl);
|
||||
}
|
||||
pipeline.expire(KEYS.chunks(streamId), ttl);
|
||||
pipeline.expire(KEYS.runSteps(streamId), ttl);
|
||||
await pipeline.exec();
|
||||
}
|
||||
return true;
|
||||
|
|
@ -431,14 +470,15 @@ export class RedisJobStore implements IJobStore {
|
|||
streamId: string,
|
||||
fields: string[],
|
||||
): Promise<void> {
|
||||
// Resume from requires_action and clear stale pendingAction. serializeJob skips
|
||||
// `undefined`, so the hash field must be removed explicitly.
|
||||
// Resume from requires_action and clear stale pendingAction + its flat
|
||||
// pendingActionId mirror. serializeJob skips `undefined`, so the hash
|
||||
// fields must be removed explicitly.
|
||||
if (this.isCluster) {
|
||||
const updated = await this.updateExistingJobHash(key, fields);
|
||||
if (!updated) {
|
||||
return;
|
||||
}
|
||||
await this.redis.hdel(key, 'pendingAction');
|
||||
await this.redis.hdel(key, 'pendingAction', 'pendingActionId');
|
||||
await this.refreshLiveJobTtls(key, streamId);
|
||||
await this.redis.srem(KEYS.requiresActionJobs, streamId);
|
||||
await this.redis.sadd(KEYS.runningJobs, streamId);
|
||||
|
|
@ -446,7 +486,7 @@ export class RedisJobStore implements IJobStore {
|
|||
}
|
||||
|
||||
await this.redis.eval(
|
||||
'if redis.call("EXISTS", KEYS[1]) == 0 then return 0 end redis.call("HSET", KEYS[1], unpack(ARGV, 3)) redis.call("HDEL", KEYS[1], "pendingAction") redis.call("EXPIRE", KEYS[1], tonumber(ARGV[2])) redis.call("EXPIRE", KEYS[4], tonumber(ARGV[2])) redis.call("EXPIRE", KEYS[5], tonumber(ARGV[2])) redis.call("SREM", KEYS[2], ARGV[1]) redis.call("SADD", KEYS[3], ARGV[1]) return 1',
|
||||
'if redis.call("EXISTS", KEYS[1]) == 0 then return 0 end redis.call("HSET", KEYS[1], unpack(ARGV, 3)) redis.call("HDEL", KEYS[1], "pendingAction", "pendingActionId") redis.call("EXPIRE", KEYS[1], tonumber(ARGV[2])) redis.call("EXPIRE", KEYS[4], tonumber(ARGV[2])) redis.call("EXPIRE", KEYS[5], tonumber(ARGV[2])) redis.call("SREM", KEYS[2], ARGV[1]) redis.call("SADD", KEYS[3], ARGV[1]) return 1',
|
||||
5,
|
||||
key,
|
||||
KEYS.requiresActionJobs,
|
||||
|
|
@ -717,8 +757,14 @@ export class RedisJobStore implements IJobStore {
|
|||
for (const streamId of trackedIds) {
|
||||
const job = await this.getJob(streamId);
|
||||
// Include running jobs and jobs paused for human review (e.g. tool approval).
|
||||
// A pending-approval job still occupies the user's conversation slot.
|
||||
// A pending-approval job still occupies the user's conversation slot — but
|
||||
// only while its prompt is live: a past-`expiresAt` approval no longer
|
||||
// counts as active (cleanup/expiry will finalize it), so the client stops
|
||||
// polling and can complete.
|
||||
if (job && (job.status === 'running' || job.status === 'requires_action')) {
|
||||
if (job.status === 'requires_action' && isPendingActionExpired(job)) {
|
||||
continue;
|
||||
}
|
||||
activeIds.push(streamId);
|
||||
} else {
|
||||
// Self-healing: job completed/deleted but mapping wasn't cleaned - mark for removal
|
||||
|
|
|
|||
|
|
@ -87,6 +87,16 @@ export interface SerializableJobData {
|
|||
lastActiveAt?: number;
|
||||
}
|
||||
|
||||
/**
|
||||
* Whether a job's pending review has passed its `expiresAt`. Shared by the
|
||||
* stores so an expired approval is kept out of active-job listings (the client
|
||||
* stops polling; cleanup/expiry finalizes it).
|
||||
*/
|
||||
export function isPendingActionExpired(job: Pick<SerializableJobData, 'pendingAction'>): boolean {
|
||||
const exp = job.pendingAction?.expiresAt;
|
||||
return exp != null && exp <= Date.now();
|
||||
}
|
||||
|
||||
/**
|
||||
* Arguments for an atomic {@link IJobStore.transitionStatus} compare-and-set.
|
||||
*/
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue