🛡️ fix: Address Codex round 2 on the HITL Redis adapter

Five P2 findings on abf4b86291, all valid Redis-adapter consequences of
round 1:

- G1 terminal cleanup on expiry: transitionStatus's terminal path now runs
  the same chunk/run-step/userJobs cleanup as updateJob (extracted into a
  shared applyTerminalContentCleanup). Expired approvals no longer leave
  Redis stream contents around for the full running TTL.
- G2 pause via updateJob mirrors pendingActionId, so a pause through the
  generic path carries the flat field the stale-decision guard compares.
- G3 resume via updateJob refreshes lastActiveAt (and clears pendingActionId),
  matching transitionStatus so a long-paused job isn't reaped post-resume.
- G4 getActiveJobIdsByUser excludes a requires_action job whose pendingAction
  is past expiry (both stores), via shared isPendingActionExpired — the client
  stops polling an expired prompt.
- G5 createJob clears stale pendingAction/pendingActionId/lastActiveAt on a
  reused streamId, so a fresh run never exposes a prior run's approval metadata
  and cleanup keys off the new createdAt.

Tests added: expired pending-approval excluded from the active set. tsc +
lint clean; policy + type-contract specs pass.
This commit is contained in:
Danny Avila 2026-06-16 14:14:40 -04:00
parent abf4b86291
commit 780833d908
4 changed files with 135 additions and 61 deletions

View file

@ -214,5 +214,17 @@ describe('ApprovalLifecycle via GenerationJobManager.approvals (in-memory)', ()
const active = await manager.getActiveJobIdsForUser('user-mix');
expect(active.sort()).toEqual(['s-paused', 's-running']);
});
test('excludes a pending-approval job whose prompt has expired', async () => {
  const userId = 'user-exp';
  const streamId = 'stream-expired-active';
  await manager.createJob(streamId, userId);

  // Pause the job on an approval whose `expiresAt` lapsed one second ago.
  const lapsedAction = buildAction(streamId, { expiresAt: Date.now() - 1000 });
  await manager.approvals.pause(streamId, lapsedAction);

  // The job was paused for approval, but with the prompt past expiry it must
  // not be reported among the user's active jobs.
  const activeIds = await manager.getActiveJobIdsForUser(userId);
  expect(activeIds).not.toContain(streamId);
});
});
});

View file

@ -8,6 +8,7 @@ import type {
JobStatus,
JobStatusTransition,
} from '~/stream/interfaces/IJobStore';
import { isPendingActionExpired } from '~/stream/interfaces/IJobStore';
/**
* Content state for a job - volatile, in-memory only.
@ -321,8 +322,13 @@ export class InMemoryJobStore implements IJobStore {
for (const streamId of trackedIds) {
const job = this.jobs.get(streamId);
// Include running jobs and jobs paused for human review (e.g. tool approval).
// A pending-approval job still occupies the user's conversation slot.
// A pending-approval job still occupies the user's conversation slot — but
// only while its prompt is live: a past-`expiresAt` approval no longer
// counts as active (cleanup/expiry will finalize it).
if (job && (job.status === 'running' || job.status === 'requires_action')) {
if (job.status === 'requires_action' && isPendingActionExpired(job)) {
continue;
}
activeIds.push(streamId);
} else {
// Self-healing: job completed/deleted but mapping wasn't cleaned - fix it now

View file

@ -10,6 +10,7 @@ import type {
JobStatus,
JobStatusTransition,
} from '~/stream/interfaces/IJobStore';
import { isPendingActionExpired } from '~/stream/interfaces/IJobStore';
/**
* Atomic compare-and-set on the job hash — the single-winner decision for a
@ -194,10 +195,21 @@ export class RedisJobStore implements IJobStore {
const key = KEYS.job(streamId);
const userJobsKey = KEYS.userJobs(userId, tenantId);
// A reused streamId overlays onto any existing hash, so paused-run fields
// from a prior generation could survive. Drop the HITL fields so the fresh
// running job never exposes stale approval metadata and cleanup keys off the
// new createdAt rather than a leftover lastActiveAt.
const staleHitlFields: Array<keyof SerializableJobData> = [
'pendingAction',
'pendingActionId',
'lastActiveAt',
];
// For cluster mode, we can't pipeline keys on different slots
// The job key uses hash tag {streamId}, runningJobs and userJobs are on different slots
if (this.isCluster) {
await this.redis.hset(key, this.serializeJob(job));
await this.redis.hdel(key, ...staleHitlFields);
await this.redis.expire(key, this.ttl.running);
await this.redis.sadd(KEYS.runningJobs, streamId);
await this.redis.srem(KEYS.requiresActionJobs, streamId);
@ -208,6 +220,7 @@ export class RedisJobStore implements IJobStore {
} else {
const pipeline = this.redis.pipeline();
pipeline.hset(key, this.serializeJob(job));
pipeline.hdel(key, ...staleHitlFields);
pipeline.expire(key, this.ttl.running);
pipeline.sadd(KEYS.runningJobs, streamId);
pipeline.srem(KEYS.requiresActionJobs, streamId);
@ -233,7 +246,19 @@ export class RedisJobStore implements IJobStore {
async updateJob(streamId: string, updates: Partial<SerializableJobData>): Promise<void> {
const key = KEYS.job(streamId);
const serialized = this.serializeJob(updates as SerializableJobData);
// Keep this generic path consistent with the guarded transitionStatus path:
// - mirror `pendingActionId` on any `pendingAction` write so a pause here
// still carries the flat field the stale-decision guard compares;
// - refresh `lastActiveAt` when resuming to `running` so a long-paused job
// isn't reaped by `createdAt` on the next cleanup tick.
let effective: Partial<SerializableJobData> = updates;
if (updates.status === 'running') {
effective = { ...updates, lastActiveAt: updates.lastActiveAt ?? Date.now() };
} else if (updates.pendingAction) {
effective = { ...updates, pendingActionId: updates.pendingAction.actionId };
}
const serialized = this.serializeJob(effective as SerializableJobData);
if (Object.keys(serialized).length === 0) {
return;
}
@ -256,54 +281,67 @@ export class RedisJobStore implements IJobStore {
}
if (updates.status && ['complete', 'error', 'aborted'].includes(updates.status)) {
// Proactively remove from user's job set (requires reading userId from the job hash)
const job = await this.getJob(streamId);
const userJobsKey = job?.userId ? KEYS.userJobs(job.userId, job.tenantId) : null;
await this.applyTerminalContentCleanup(streamId);
}
}
if (this.isCluster) {
await this.redis.expire(key, this.ttl.completed);
await this.redis.srem(KEYS.runningJobs, streamId);
await this.redis.srem(KEYS.requiresActionJobs, streamId);
/**
* Terminal cleanup shared by `updateJob` (complete/error/aborted) and the
* terminal path of `transitionStatus` (approval expiry aborted): drop the
* job from both membership sets and the user-active set, shorten the job-hash
* TTL to the completed window, and del/shorten the chunk + run-step keys per
* the configured after-complete TTLs. Without sharing this, an expired
* approval left Redis stream contents around for the full running TTL.
*/
private async applyTerminalContentCleanup(streamId: string): Promise<void> {
const key = KEYS.job(streamId);
// Proactively remove from user's job set (requires reading userId from the job hash)
const job = await this.getJob(streamId);
const userJobsKey = job?.userId ? KEYS.userJobs(job.userId, job.tenantId) : null;
if (this.ttl.chunksAfterComplete === 0) {
await this.redis.del(KEYS.chunks(streamId));
} else {
await this.redis.expire(KEYS.chunks(streamId), this.ttl.chunksAfterComplete);
}
if (this.isCluster) {
await this.redis.expire(key, this.ttl.completed);
await this.redis.srem(KEYS.runningJobs, streamId);
await this.redis.srem(KEYS.requiresActionJobs, streamId);
if (this.ttl.runStepsAfterComplete === 0) {
await this.redis.del(KEYS.runSteps(streamId));
} else {
await this.redis.expire(KEYS.runSteps(streamId), this.ttl.runStepsAfterComplete);
}
if (userJobsKey) {
await this.redis.srem(userJobsKey, streamId);
}
if (this.ttl.chunksAfterComplete === 0) {
await this.redis.del(KEYS.chunks(streamId));
} else {
const pipeline = this.redis.pipeline();
pipeline.expire(key, this.ttl.completed);
pipeline.srem(KEYS.runningJobs, streamId);
pipeline.srem(KEYS.requiresActionJobs, streamId);
if (this.ttl.chunksAfterComplete === 0) {
pipeline.del(KEYS.chunks(streamId));
} else {
pipeline.expire(KEYS.chunks(streamId), this.ttl.chunksAfterComplete);
}
if (this.ttl.runStepsAfterComplete === 0) {
pipeline.del(KEYS.runSteps(streamId));
} else {
pipeline.expire(KEYS.runSteps(streamId), this.ttl.runStepsAfterComplete);
}
if (userJobsKey) {
pipeline.srem(userJobsKey, streamId);
}
await pipeline.exec();
await this.redis.expire(KEYS.chunks(streamId), this.ttl.chunksAfterComplete);
}
if (this.ttl.runStepsAfterComplete === 0) {
await this.redis.del(KEYS.runSteps(streamId));
} else {
await this.redis.expire(KEYS.runSteps(streamId), this.ttl.runStepsAfterComplete);
}
if (userJobsKey) {
await this.redis.srem(userJobsKey, streamId);
}
} else {
const pipeline = this.redis.pipeline();
pipeline.expire(key, this.ttl.completed);
pipeline.srem(KEYS.runningJobs, streamId);
pipeline.srem(KEYS.requiresActionJobs, streamId);
if (this.ttl.chunksAfterComplete === 0) {
pipeline.del(KEYS.chunks(streamId));
} else {
pipeline.expire(KEYS.chunks(streamId), this.ttl.chunksAfterComplete);
}
if (this.ttl.runStepsAfterComplete === 0) {
pipeline.del(KEYS.runSteps(streamId));
} else {
pipeline.expire(KEYS.runSteps(streamId), this.ttl.runStepsAfterComplete);
}
if (userJobsKey) {
pipeline.srem(userJobsKey, streamId);
}
await pipeline.exec();
}
}
@ -352,9 +390,14 @@ export class RedisJobStore implements IJobStore {
return false;
}
// 2) Reconcile membership sets + live-key TTLs. Only the winner reaches
// here; set membership is derived state that periodic cleanup
// self-heals, so this non-atomic cross-slot step is safe.
// 2) Reconcile derived state. Only the winner reaches here; membership is
// self-healed by periodic cleanup, so this non-atomic cross-slot step is
// safe. A terminal target (e.g. approval expiry → aborted) gets the same
// content cleanup as updateJob's terminal path.
if (terminal) {
await this.applyTerminalContentCleanup(streamId);
return true;
}
if (this.isCluster) {
if (remSet) {
await this.redis.srem(remSet, streamId);
@ -362,10 +405,8 @@ export class RedisJobStore implements IJobStore {
if (addSet) {
await this.redis.sadd(addSet, streamId);
}
if (!terminal) {
await this.redis.expire(KEYS.chunks(streamId), ttl);
await this.redis.expire(KEYS.runSteps(streamId), ttl);
}
await this.redis.expire(KEYS.chunks(streamId), ttl);
await this.redis.expire(KEYS.runSteps(streamId), ttl);
} else {
const pipeline = this.redis.pipeline();
if (remSet) {
@ -374,10 +415,8 @@ export class RedisJobStore implements IJobStore {
if (addSet) {
pipeline.sadd(addSet, streamId);
}
if (!terminal) {
pipeline.expire(KEYS.chunks(streamId), ttl);
pipeline.expire(KEYS.runSteps(streamId), ttl);
}
pipeline.expire(KEYS.chunks(streamId), ttl);
pipeline.expire(KEYS.runSteps(streamId), ttl);
await pipeline.exec();
}
return true;
@ -431,14 +470,15 @@ export class RedisJobStore implements IJobStore {
streamId: string,
fields: string[],
): Promise<void> {
// Resume from requires_action and clear stale pendingAction. serializeJob skips
// `undefined`, so the hash field must be removed explicitly.
// Resume from requires_action and clear stale pendingAction + its flat
// pendingActionId mirror. serializeJob skips `undefined`, so the hash
// fields must be removed explicitly.
if (this.isCluster) {
const updated = await this.updateExistingJobHash(key, fields);
if (!updated) {
return;
}
await this.redis.hdel(key, 'pendingAction');
await this.redis.hdel(key, 'pendingAction', 'pendingActionId');
await this.refreshLiveJobTtls(key, streamId);
await this.redis.srem(KEYS.requiresActionJobs, streamId);
await this.redis.sadd(KEYS.runningJobs, streamId);
@ -446,7 +486,7 @@ export class RedisJobStore implements IJobStore {
}
await this.redis.eval(
'if redis.call("EXISTS", KEYS[1]) == 0 then return 0 end redis.call("HSET", KEYS[1], unpack(ARGV, 3)) redis.call("HDEL", KEYS[1], "pendingAction") redis.call("EXPIRE", KEYS[1], tonumber(ARGV[2])) redis.call("EXPIRE", KEYS[4], tonumber(ARGV[2])) redis.call("EXPIRE", KEYS[5], tonumber(ARGV[2])) redis.call("SREM", KEYS[2], ARGV[1]) redis.call("SADD", KEYS[3], ARGV[1]) return 1',
'if redis.call("EXISTS", KEYS[1]) == 0 then return 0 end redis.call("HSET", KEYS[1], unpack(ARGV, 3)) redis.call("HDEL", KEYS[1], "pendingAction", "pendingActionId") redis.call("EXPIRE", KEYS[1], tonumber(ARGV[2])) redis.call("EXPIRE", KEYS[4], tonumber(ARGV[2])) redis.call("EXPIRE", KEYS[5], tonumber(ARGV[2])) redis.call("SREM", KEYS[2], ARGV[1]) redis.call("SADD", KEYS[3], ARGV[1]) return 1',
5,
key,
KEYS.requiresActionJobs,
@ -717,8 +757,14 @@ export class RedisJobStore implements IJobStore {
for (const streamId of trackedIds) {
const job = await this.getJob(streamId);
// Include running jobs and jobs paused for human review (e.g. tool approval).
// A pending-approval job still occupies the user's conversation slot.
// A pending-approval job still occupies the user's conversation slot — but
// only while its prompt is live: a past-`expiresAt` approval no longer
// counts as active (cleanup/expiry will finalize it), so the client stops
// polling and can complete.
if (job && (job.status === 'running' || job.status === 'requires_action')) {
if (job.status === 'requires_action' && isPendingActionExpired(job)) {
continue;
}
activeIds.push(streamId);
} else {
// Self-healing: job completed/deleted but mapping wasn't cleaned - mark for removal

View file

@ -87,6 +87,16 @@ export interface SerializableJobData {
lastActiveAt?: number;
}
/**
 * Reports whether a job's pending action carries an `expiresAt` that is at or
 * before the current time (`Date.now()`).
 *
 * The job stores use this to leave an expired approval out of their
 * active-job listings.
 *
 * @param job - Any object exposing the job's optional `pendingAction`.
 * @returns `true` only when `pendingAction.expiresAt` is set and has been
 *   reached; `false` when there is no pending action or no `expiresAt`.
 */
export function isPendingActionExpired(job: Pick<SerializableJobData, 'pendingAction'>): boolean {
  const deadline = job.pendingAction?.expiresAt;
  // No pending action, or one without a deadline, never counts as expired.
  if (deadline === undefined || deadline === null) {
    return false;
  }
  return Date.now() >= deadline;
}
/**
* Arguments for an atomic {@link IJobStore.transitionStatus} compare-and-set.
*/