mirror of
https://github.com/danny-avila/LibreChat.git
synced 2026-06-29 10:51:34 +00:00
* 🧠 feat: Memory Agent Capability with Inline Tools and Ephemeral Badge
Add `AgentCapabilities.memory`, which expands into the inline set_memory/delete_memory tool pair (mirroring the execute_code expansion via registerMemoryTools) when a run-level memoryAvailable gate holds: capability enabled, memory configured, MEMORIES.USE permission, and personalization not opted out. Surfaces the memory artifact as an attachment in the agents tool-end callback.
Adds the ephemeral path (TEphemeralAgent.memory, load/added agent tool injection), a fully-gated memory badge plus tools-dropdown entry, the agent-builder Memory toggle with form round-trip, and a mock e2e test asserting the badge reaches the request payload. Additive to and independent of the existing post-turn memory extraction agent.
* 🩹 fix: Address Codex review on memory capability (gating, validKeys, usage guard)
- Strip the memory capability from the served agents capabilities when memory is not configured/enabled, so the badge, tools dropdown, agent-builder toggle, and backend capability gate stay consistent instead of exposing an inert toggle on default installs (where MEMORIES.USE defaults true).
- Surface configured memory.validKeys in the inline tool definitions so the model is told the allowed keys up front, matching the runtime createMemoryTool schema.
- Append a strict explicit-request usage guard to the agent instructions when inline memory tools are registered, preserving the memory-agent's privacy behavior.
- Add AppService tests covering memory-capability stripping.
* ✅ test: Update AppService capability snapshots for memory strip
AppService now strips the memory capability from the served agents defaults when no memory block is configured; update the spec's expected capability lists to defaultAgentCapabilitiesWithoutMemory for the no-memory-config cases.
* 🛡️ fix: Address Codex re-review on memory capability (round 2)
- Strip the memory capability from the FINAL served agents config, not just defaults; loadEndpoints reparses any endpoints.agents block, so memory was still exposed in that common shape (packages/data-schemas/src/app/service.ts) + regression test.
- Re-check the full memory gate (config, opt-out, MEMORIES.USE) inside handleTools before constructing set_memory/delete_memory, so an unsolicited tool call from a model/custom endpoint can't bypass the runtime gates (api/app/clients/tools/util/handleTools.js).
- Restore the persisted memory toggle for model-spec conversations via applyModelSpecEphemeralAgent (client/src/utils/endpoints.ts).
- Clear LAST_MEMORY_TOGGLE_ on logout and clear-all-chats so a stale memory preference can't leak across users on a shared browser (client/src/utils/localStorage.ts).
* 🧠 fix: Address Codex re-review on memory capability (round 3)
- Serialize set_memory writes and advance a running token total inside createMemoryTool, so parallel batched calls in one event-driven turn can't each pass the limit check against a stale total and collectively exceed memory.tokenLimit (packages/api/src/agents/memory.ts) + tests.
- Inject the keyed memory context (withKeys) instead of withoutKeys when the running agent has the inline memory capability, so delete_memory has a visible key to target (api/server/controllers/agents/client.js).
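The serialization described in this round can be pictured as a promise chain: each write waits for the previous one, then checks the limit against a running total that advances only after a successful write. The sketch below is illustrative only. `createSerializedWriter`, `persist`, and the precomputed `tokens` argument are hypothetical names chosen for the example, not the actual `createMemoryTool` internals.

```javascript
// Hypothetical sketch: serialize writes so parallel calls cannot all pass
// the limit check against the same stale total.
function createSerializedWriter({ tokenLimit, initialTotal, persist }) {
  let runningTotal = initialTotal;
  let queue = Promise.resolve();

  return function write(key, value, tokens) {
    // Chain onto the previous write so the check below sees its result.
    const task = queue.then(async () => {
      if (runningTotal + tokens > tokenLimit) {
        return { ok: false, reason: 'token limit exceeded' };
      }
      await persist(key, value);
      // Advance the total only after the write has succeeded.
      runningTotal += tokens;
      return { ok: true, total: runningTotal };
    });
    // Keep the chain alive even if this write rejects.
    queue = task.catch(() => undefined);
    return task;
  };
}
```

With a limit of 100 tokens, two parallel 60-token writes resolve in call order: the first succeeds and the second is rejected, because it is checked against the advanced total rather than the starting one.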
* 🔐 fix: Address Codex re-review on memory capability (round 4)
- Detect inline memory by tool NAME (set_memory/delete_memory) across an initialized agent's tools + toolDefinitions, since the 'memory' marker is expanded at init and the prior string check never matched; inject the keyed memory context for any primary OR sub-agent that carries the inline memory tools (api/server/controllers/agents/client.js).
- Enforce memory WRITE permissions in the inline tool gate: set_memory requires CREATE+UPDATE and delete_memory requires UPDATE (matching the REST memory routes), so a USE-only role can't mutate/delete memories via agent tool calls (api/app/clients/tools/util/handleTools.js).
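A rough sketch of the write-permission gate described above, assuming permissions arrive as a `Set` of strings. The permission strings, `REQUIRED_PERMISSIONS`, and `canCallMemoryTool` are hypothetical, and requiring `USE` for both tools is an assumption of this sketch rather than something the commit text spells out for the runtime gate.

```javascript
// Hypothetical mapping from inline memory tool name to required permissions.
const REQUIRED_PERMISSIONS = new Map([
  ['set_memory', ['USE', 'CREATE', 'UPDATE']],
  ['delete_memory', ['USE', 'UPDATE']],
]);

/**
 * @param {string} toolName
 * @param {Set<string>} granted - permissions held by the caller's role
 * @returns {boolean}
 */
function canCallMemoryTool(toolName, granted) {
  const required = REQUIRED_PERMISSIONS.get(toolName);
  if (!required) {
    // Unknown tool names are refused (fail closed).
    return false;
  }
  return required.every((permission) => granted.has(permission));
}
```

Under these assumptions, a role holding only `USE` is refused for both tools, while a role with `USE` and `UPDATE` can delete but not set.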
* 🔒 fix: Address Codex re-review on memory capability (round 5)
- Gate inline memory registration (memoryAvailable) on the memory WRITE permissions (USE+CREATE+UPDATE), so a read-only-memory role no longer has set_memory/delete_memory shown to the model only for the runtime loader to refuse them (api/server/services/Endpoints/agents/initialize.js).
- Enforce the per-agent memory opt-in at execution: handleTools now refuses to construct set_memory/delete_memory unless the agent actually declared them (toolDefinitions/tools), blocking hallucinated/undeclared memory tool calls from mutating memory.
- Fail closed when getFormattedMemories errors with a configured tokenLimit, instead of writing as if storage were empty and bypassing the cap (api/app/clients/tools/util/handleTools.js).
* 🩹 fix: Address Codex re-review on memory capability (round 6)
- Fix a P1 regression from the prior round: the execution-context agent keeps the raw 'memory' capability marker (not the expanded set_memory/delete_memory names), so the opt-in check now matches the marker. This restores memory writes/deletes AND avoids hijacking an MCP tool that merely shares the set_memory/delete_memory name (api/app/clients/tools/util/handleTools.js).
- Count repeated set_memory writes to the same key as replacements, not additions, against tokenLimit — set_memory upserts, so a same-key rewrite swaps its prior token contribution instead of double-counting (packages/api/src/agents/memory.ts) + test.
- Gate the memory badge, tools dropdown, and agent-builder toggle on the full memory write permissions (USE+CREATE+UPDATE) via a shared useHasMemoryAccess hook, so a read-only-memory role no longer sees an enabled Memory control the backend would refuse to wire up.
* 🧷 fix: Address Codex re-review on memory capability (round 7)
- Recognize inline memory across both execution-context agent shapes: initializeAgent now sets a LibreChat-only memoryToolsRegistered flag on the InitializedAgent, and the opt-in/detection checks accept that flag OR the raw 'memory' marker. Fixes memory failing for processAddedConvo agents (which store the initialized config, marker already expanded) while staying MCP-name-collision-safe (api/app/clients/tools/util/handleTools.js, packages/api/src/agents/initialize.ts, api/server/controllers/agents/client.js).
- Scope keyed memory context to memory-enabled agents only: useMemory now returns both keyed and unkeyed contexts, and buildMessages injects the keyed one (memory keys + token metadata) only to agents that can call delete_memory, while the primary/post-turn path keeps the unkeyed values — so a primary without memory tools no longer sees memory keys it doesn't need.
* 🔏 fix: Address Codex re-review on memory capability (round 8)
- Enforce memory size limits on inline writes: createMemoryTool now rejects keys over 1000 chars and values over memory.charLimit, matching the REST memory routes, so an inline-memory agent can't persist blobs the memory UI/API would reject (packages/api/src/agents/memory.ts, api/app/clients/tools/util/handleTools.js) + test.
- Recheck the agents 'memory' endpoint capability at execution time, so a stale/hallucinated set_memory/delete_memory call can't mutate memory after an admin removes the capability while the agent document still carries the marker (api/app/clients/tools/util/handleTools.js).
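The size checks in this round amount to a validation step that runs before anything is persisted. A minimal sketch follows, assuming string keys and values. `validateMemoryWrite` and its return shape are hypothetical; only the 1000-character key cap and the use of a configured `charLimit` come from the commit text.

```javascript
// Key cap taken from the commit text; the value cap comes from configuration.
const MAX_KEY_LENGTH = 1000;

/**
 * Hypothetical validator, not the actual createMemoryTool code.
 * @param {string} key
 * @param {string} value
 * @param {number | undefined} charLimit - configured value cap, if any
 * @returns {{ ok: boolean, error?: string }}
 */
function validateMemoryWrite(key, value, charLimit) {
  if (typeof key !== 'string' || key.length === 0) {
    return { ok: false, error: 'key must be a non-empty string' };
  }
  if (key.length > MAX_KEY_LENGTH) {
    return { ok: false, error: `key exceeds ${MAX_KEY_LENGTH} characters` };
  }
  if (typeof value !== 'string') {
    return { ok: false, error: 'value must be a string' };
  }
  if (charLimit != null && value.length > charLimit) {
    return { ok: false, error: `value exceeds ${charLimit} characters` };
  }
  return { ok: true };
}
```

Rejecting before the write keeps the inline tool consistent with whatever the REST routes would accept, so the two paths cannot disagree about what is storable.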
* ♻️ refactor: Move inline-memory backend logic into packages/api + share memory load
Workspace boundary: the inline-memory gating/detection logic that had crept into /api now lives in packages/api/src/agents/memory.ts (TS), with /api kept as thin wrappers.
- Add agentHasInlineMemoryTools, isMemoryToolAllowed, and buildInlineMemoryTool to packages/api; handleTools.js now calls buildInlineMemoryTool instead of constructing/gating the tools inline, and client.js imports agentHasInlineMemoryTools instead of redefining it.
- Optimize repeated memory loads: getRequestMemories memoizes getFormattedMemories per request (WeakMap keyed by req), so the run's memory-context load and every memory-enabled agent's set_memory token-usage load share a single DB fetch instead of one per agent.
* 🧠 fix: Invalidate request memory cache after inline writes
Inline set_memory/delete_memory now invalidate the request-scoped
getFormattedMemories cache on a successful write, so a later tool round
in the same response is seeded with the post-write usage total instead
of the stale pre-write one (multi-round writes no longer collectively
exceed tokenLimit, and a set after a delete is not over-counted). The
within-round sharing across multiple memory-enabled agents is preserved.
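The request-scoped cache and its invalidation can be sketched with a `WeakMap` keyed by the request object. The helper names `loadMemoriesOnce` and `invalidateMemories` and the `loadFromDb` callback are hypothetical; the real `getRequestMemories` signature is not shown in this commit message.

```javascript
// Entries disappear with the request object, so no manual cleanup is
// needed when the request ends.
const memoriesByRequest = new WeakMap();

/**
 * Share one in-flight load per request (hypothetical helper).
 * Note: in this sketch a rejected load stays cached until invalidated.
 */
function loadMemoriesOnce(req, loadFromDb) {
  let pending = memoriesByRequest.get(req);
  if (!pending) {
    pending = Promise.resolve(loadFromDb(req));
    memoriesByRequest.set(req, pending);
  }
  return pending;
}

/** Call after a successful write so the next tool round reloads usage. */
function invalidateMemories(req) {
  memoriesByRequest.delete(req);
}
```

Callers within one tool round receive the same promise, so several memory-enabled agents share a single fetch. After a successful write the entry is dropped and the next round triggers a fresh load.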
* 🧠 fix: Persist memory capability on saved agents; honor registration flag
- Add Tools.memory to the v1 systemTools allowlist so filterAuthorizedTools
no longer silently drops the memory marker when an agent with the Memory
capability is created/updated/duplicated through the builder (previously
the capability only worked for ephemeral chats, not persisted agents).
- agentHasInlineMemoryTools now honors an explicit memoryToolsRegistered
boolean before falling back to the raw `memory` marker, so an initialized
config whose registration was denied (memoryAvailable false) is not given
keyed memory context just because the marker survives in tools.
* 🧩 fix: Bring memory tool to parity with other ephemeral tools
- Add `memory` to the model-spec schema/type and honor `modelSpec.memory`
in both ephemeral paths (load.ts, added.ts) and the frontend spec
application, so admins can pre-enable Memory from a model spec exactly
like webSearch/fileSearch/executeCode.
- Add LAST_MEMORY_TOGGLE_ to the timestamped-storage cleanup list so stale
per-conversation memory toggles are purged on startup like the others.
- Hide the agent-builder Memory toggle for users who disabled memory in
personalization (memories === false), mirroring the chat badge's opt-out
gate, so the setting isn't shown as inert/misleading.
* ✅ test: Cover memory in applyModelSpecEphemeralAgent spec defaults
Update the exact-object assertions to include the new `memory` field and
add positive coverage that `modelSpec.memory` maps to the ephemeral
agent's `memory` flag. Fixes the shard 2/4 failure from 672a03b05.
1246 lines
46 KiB
JavaScript
const { nanoid } = require('nanoid');
const { logger } = require('@librechat/data-schemas');
const {
  Tools,
  StepTypes,
  FileContext,
  ErrorTypes,
  UsageEvents,
} = require('librechat-data-provider');
const {
  GraphEvents,
  GraphNodeKeys,
  ToolEndHandler,
  createContentAggregator,
} = require('@librechat/agents');
const {
  sendEvent,
  computeUsageCostUSD,
  GenerationJobManager,
  writeAttachmentEvent,
  createToolExecuteHandler,
  HOST_FILE_AUTHORING_ARTIFACT_KEY,
  isCodeSessionToolName,
} = require('@librechat/api');
const { processFileCitations } = require('~/server/services/Files/Citations');
const { processCodeOutput, runPreviewFinalize } = require('~/server/services/Files/Code/process');
const { saveBase64Image } = require('~/server/services/Files/process');

function isHostFileAuthoringArtifact(artifact) {
  return artifact?.[HOST_FILE_AUTHORING_ARTIFACT_KEY] === true;
}

function isCodeArtifactToolOutput(output) {
  return isCodeSessionToolName(output.name) || isHostFileAuthoringArtifact(output.artifact);
}

class ModelEndHandler {
  /**
   * @param {Array<UsageMetadata>} collectedUsage
   * @param {Record<string, string> | null} [collectedThoughtSignatures] Map of
   *   `tool_call_id → thoughtSignature` accumulated across `chat_model_end`
   *   events. Used to persist Vertex Gemini 3 thought signatures across DB
   *   round-trips so resumed conversations don't 400 on the next API call.
   *   Each `model_end` may emit multiple tool calls (one per LLM cycle in a
   *   tool-using turn); per-id storage preserves the mapping so each tool
   *   call's signature can be restored onto the right reconstructed
   *   AIMessage rather than being concentrated on the last one.
   *   Optional; when `null`, the handler is a no-op for signatures. Non-Vertex
   *   providers don't emit `additional_kwargs.signatures`, so capture is also
   *   a no-op for them even when the map is provided.
   * @param {(data: Record<string, unknown>) => Promise<void> | void} [emitUsage] Optional
   *   callback to stream per-call token usage to the client.
   */
  constructor(collectedUsage, collectedThoughtSignatures = null, emitUsage = null) {
    if (!Array.isArray(collectedUsage)) {
      throw new Error('collectedUsage must be an array');
    }
    this.collectedUsage = collectedUsage;
    this.collectedThoughtSignatures = collectedThoughtSignatures;
    this.emitUsage = emitUsage;
  }

  finalize(errorMessage) {
    if (!errorMessage) {
      return;
    }
    throw new Error(errorMessage);
  }

  /**
   * @param {string} event
   * @param {ModelEndData | undefined} data
   * @param {Record<string, unknown> | undefined} metadata
   * @param {StandardGraph} graph
   * @returns {Promise<void>}
   */
  async handle(event, data, metadata, graph) {
    if (!graph || !metadata) {
      console.warn(`Graph or metadata not found in ${event} event`);
      return;
    }

    /** @type {string | undefined} */
    let errorMessage;
    try {
      const agentContext = graph.getAgentContext(metadata);
      if (data?.output?.additional_kwargs?.stop_reason === 'refusal') {
        const info = { ...data.output.additional_kwargs };
        errorMessage = JSON.stringify({
          type: ErrorTypes.REFUSAL,
          info,
        });
        logger.debug(`[ModelEndHandler] Model refused to respond`, {
          ...info,
          userId: metadata.user_id,
          messageId: metadata.run_id,
          conversationId: metadata.thread_id,
        });
      }

      const usage = data?.output?.usage_metadata;
      if (!usage) {
        return this.finalize(errorMessage);
      }
      const modelName = metadata?.ls_model_name || agentContext.clientOptions?.model;
      if (modelName) {
        usage.model = modelName;
      }
      if (agentContext.provider) {
        usage.provider = agentContext.provider;
      }
      /** Tag the producing agent so multi-endpoint graphs can price each call
       * with its own endpoint token config (recordCollectedUsage resolver). */
      if (agentContext.agentId) {
        usage.agentId = agentContext.agentId;
      }

      let taggedUsage = markSummarizationUsage(usage, metadata);
      /** Hidden intermediate sequential-agent calls are billed but never shown.
       * Tag them non-primary on the COLLECTED usage too (not just the emit) so
       * recordCollectedUsage excludes their output from the parent's tokenCount
       * and the client folds them into cost/totals only — not the live gauge. */
      if (
        taggedUsage.usage_type == null &&
        !checkIfLastAgent(metadata?.last_agent_id, metadata?.langgraph_node) &&
        metadata?.hide_sequential_outputs === true
      ) {
        taggedUsage = { ...taggedUsage, usage_type: 'sequential' };
      }

      this.collectedUsage.push(taggedUsage);

      if (this.emitUsage) {
        /** Normalize Anthropic/Bedrock-style top-level cache fields into details */
        const cache_creation =
          taggedUsage.input_token_details?.cache_creation ??
          taggedUsage.cache_creation_input_tokens;
        const cache_read =
          taggedUsage.input_token_details?.cache_read ?? taggedUsage.cache_read_input_tokens;
        try {
          await this.emitUsage({
            input_tokens: taggedUsage.input_tokens,
            output_tokens: taggedUsage.output_tokens,
            total_tokens: taggedUsage.total_tokens,
            input_token_details:
              cache_creation != null || cache_read != null
                ? { cache_creation, cache_read }
                : undefined,
            model: taggedUsage.model,
            provider: taggedUsage.provider,
            usage_type: taggedUsage.usage_type,
            /** Producing agent for per-endpoint pricing; consumed by the emit
             * cost resolver and not included in the emitted/persisted payload. */
            agentId: taggedUsage.agentId,
            runId: metadata?.run_id,
            /** Per-run sequence so identical payloads from distinct calls
             * stay distinguishable during resume dedupe */
            seq: this.collectedUsage.length,
          });
        } catch (err) {
          /** Best-effort telemetry: a failed emit (closed SSE, Redis publish
           * error) must not abort the handler before the thought-signature
           * capture below, or resumed tool-call requests lose that metadata */
          logger.warn('[ModelEndHandler] Failed to emit token usage', err);
        }
      }

      /**
       * `additional_kwargs.signatures` is a flat array indexed by response
       * part position (text + functionCall interleaved). `tool_calls` is
       * just the function calls in their original order. Non-empty
       * signatures correspond 1:1 with `tool_calls` in order — see
       * `partsToSignatures` in `@langchain/google-common`. Walk both in a
       * single pass to map each signature onto the right `tool_call.id`.
       */
      const signatures = data?.output?.additional_kwargs?.signatures;
      const toolCalls = data?.output?.tool_calls;
      if (
        this.collectedThoughtSignatures &&
        Array.isArray(signatures) &&
        Array.isArray(toolCalls)
      ) {
        let toolIdx = 0;
        for (const sig of signatures) {
          if (typeof sig !== 'string' || sig.length === 0) continue;
          if (toolIdx >= toolCalls.length) break;
          const id = toolCalls[toolIdx++]?.id;
          if (id) this.collectedThoughtSignatures[id] = sig;
        }
      }
    } catch (error) {
      logger.error('Error handling model end event:', error);
      return this.finalize(errorMessage);
    }
  }
}

/**
 * @deprecated Agent Chain helper
 * @param {string | undefined} [last_agent_id]
 * @param {string | undefined} [langgraph_node]
 * @returns {boolean}
 */
function checkIfLastAgent(last_agent_id, langgraph_node) {
  if (!last_agent_id || !langgraph_node) {
    return false;
  }
  return langgraph_node?.endsWith(last_agent_id);
}

/**
 * Helper to emit events either to res (standard mode) or to job emitter (resumable mode).
 * In Redis mode, awaits the emit to guarantee event ordering (critical for streaming deltas).
 * @param {ServerResponse} res - The server response object
 * @param {string | null} streamId - The stream ID for resumable mode, or null for standard mode
 * @param {Object} eventData - The event data to send
 * @returns {Promise<void>}
 */
async function emitEvent(res, streamId, eventData) {
  if (streamId) {
    await GenerationJobManager.emitChunk(streamId, eventData);
  } else {
    sendEvent(res, eventData);
  }
}

/**
 * Maps a {@link SubagentUpdateEvent} phase to the corresponding
 * {@link GraphEvents} name that the SDK's `createContentAggregator`
 * knows how to consume. Phases that don't carry content (`start`, `stop`,
 * `error`) or whose payload doesn't match a handled event (`run_step`
 * with an `ON_TOOL_EXECUTE`-shaped batch request rather than a RunStep)
 * return `null` so the caller skips them.
 * @param {SubagentUpdateEvent} event
 * @returns {string | null}
 */
function subagentPhaseToGraphEvent(event) {
  switch (event?.phase) {
    case 'run_step':
      /** `ON_RUN_STEP` and `ON_TOOL_EXECUTE` both forward with phase
       * `run_step`; only the former matches the aggregator's RunStep
       * schema. Detect by presence of `stepDetails`. */
      return event.data?.stepDetails ? GraphEvents.ON_RUN_STEP : null;
    case 'run_step_delta':
      return GraphEvents.ON_RUN_STEP_DELTA;
    case 'run_step_completed':
      return GraphEvents.ON_RUN_STEP_COMPLETED;
    case 'message_delta':
      return GraphEvents.ON_MESSAGE_DELTA;
    case 'reasoning_delta':
      return GraphEvents.ON_REASONING_DELTA;
    default:
      return null;
  }
}

/**
 * Folds a single {@link SubagentUpdateEvent} into the given content
 * aggregator. Silent no-op for phases outside the aggregator's domain.
 * @param {{ aggregateContent: Function }} aggregator
 * @param {SubagentUpdateEvent} event
 */
function feedSubagentAggregator(aggregator, event) {
  const graphEvent = subagentPhaseToGraphEvent(event);
  if (!graphEvent) return;
  aggregator.aggregateContent({ event: graphEvent, data: event.data });
}

/**
 * @typedef {Object} ToolExecuteOptions
 * @property {(toolNames: string[]) => Promise<{loadedTools: StructuredTool[]}>} loadTools - Function to load tools by name
 * @property {Object} configurable - Configurable context for tool invocation
 */

/**
 * Get default handlers for stream events.
 * @param {Object} options - The options object.
 * @param {ServerResponse} options.res - The server response object.
 * @param {ContentAggregator} options.aggregateContent - Content aggregator function.
 * @param {ToolEndCallback} options.toolEndCallback - Callback to use when tool ends.
 * @param {Array<UsageMetadata>} options.collectedUsage - The list of collected usage metadata.
 * @param {string | null} [options.streamId] - The stream ID for resumable mode, or null for standard mode.
 * @param {ToolExecuteOptions} [options.toolExecuteOptions] - Options for event-driven tool execution.
 * @param {UsageCostDeps} [options.usageCost] - Pricing context for authoritative per-event cost.
 * @param {{ latest: TContextUsageEvent | null, count: number }} [options.contextUsageSink] - Mutable
 *   holder for the latest visible context snapshot + a count of visible snapshots (model calls),
 *   used to persist the breakdown only when the final call emitted usage.
 * @param {Array<TTokenUsageEvent>} [options.usageEmitSink] - Array collecting each emitted
 *   `on_token_usage` payload (incl. cost) so the response's usage rollup can be persisted.
 * @returns {Record<string, t.EventHandler>} The default handlers.
 * @throws {Error} If `res` or `aggregateContent` is missing.
 */
function getDefaultHandlers({
  res,
  aggregateContent,
  toolEndCallback,
  collectedUsage,
  collectedThoughtSignatures = null,
  streamId = null,
  toolExecuteOptions = null,
  summarizationOptions = null,
  subagentAggregatorsByToolCallId = null,
  usageCost = null,
  contextUsageSink = null,
  usageEmitSink = null,
}) {
  if (!res || !aggregateContent) {
    throw new Error(
      `[getDefaultHandlers] Missing required options: res: ${!res}, aggregateContent: ${!aggregateContent}`,
    );
  }
  /**
   * Emit a token-usage event, attaching the authoritative per-event USD cost
   * when cost display is enabled. The backend is the single source of truth
   * for pricing (premium tiers, cache rates) — the client sums these instead
   * of re-deriving from base rates.
   * @param {Record<string, unknown>} data
   */
  const emitTokenUsage = ({ agentId, ...data }) => {
    let payload = data;
    if (usageCost?.enabled === true && usageCost.pricing) {
      try {
        /** Price with the producing agent's config (multi-endpoint graphs) so
         * the streamed/persisted cost matches the per-agent balance transaction;
         * `agentId` is resolved here, not forwarded to the client or rollup. */
        const endpointTokenConfig = usageCost.resolveEndpointTokenConfig
          ? usageCost.resolveEndpointTokenConfig({ agentId })
          : usageCost.endpointTokenConfig;
        payload = {
          ...data,
          cost: computeUsageCostUSD(data, usageCost.pricing, endpointTokenConfig),
        };
      } catch (err) {
        logger.warn('[getDefaultHandlers] Failed to compute usage cost', err);
      }
    }
    /** Collect the same payload the client folds so the response's usage rollup
     * persisted on `metadata.usage` reproduces the live branch/total + cost. */
    if (usageEmitSink) {
      usageEmitSink.push(payload);
    }
    return emitEvent(res, streamId, { event: UsageEvents.ON_TOKEN_USAGE, data: payload });
  };
  const handlers = {
    [GraphEvents.CHAT_MODEL_END]: new ModelEndHandler(
      collectedUsage,
      collectedThoughtSignatures,
      emitTokenUsage,
    ),
    [GraphEvents.TOOL_END]: new ToolEndHandler(toolEndCallback, logger),
    [GraphEvents.ON_RUN_STEP]: {
      /**
       * Handle ON_RUN_STEP event.
       * @param {string} event - The event name.
       * @param {StreamEventData} data - The event data.
       * @param {GraphRunnableConfig['configurable']} [metadata] The runnable metadata.
       */
      handle: async (event, data, metadata) => {
        aggregateContent({ event, data });
        if (data?.stepDetails.type === StepTypes.TOOL_CALLS) {
          await emitEvent(res, streamId, { event, data });
        } else if (checkIfLastAgent(metadata?.last_agent_id, metadata?.langgraph_node)) {
          await emitEvent(res, streamId, { event, data });
        } else if (!metadata?.hide_sequential_outputs) {
          await emitEvent(res, streamId, { event, data });
        } else {
          const agentName = metadata?.name ?? 'Agent';
          const isToolCall = data?.stepDetails.type === StepTypes.TOOL_CALLS;
          const action = isToolCall ? 'performing a task...' : 'thinking...';
          await emitEvent(res, streamId, {
            event: 'on_agent_update',
            data: {
              runId: metadata?.run_id,
              message: `${agentName} is ${action}`,
            },
          });
        }
      },
    },
    [GraphEvents.ON_RUN_STEP_DELTA]: {
      /**
       * Handle ON_RUN_STEP_DELTA event.
       * @param {string} event - The event name.
       * @param {StreamEventData} data - The event data.
       * @param {GraphRunnableConfig['configurable']} [metadata] The runnable metadata.
       */
      handle: async (event, data, metadata) => {
        aggregateContent({ event, data });
        if (data?.delta.type === StepTypes.TOOL_CALLS) {
          await emitEvent(res, streamId, { event, data });
        } else if (checkIfLastAgent(metadata?.last_agent_id, metadata?.langgraph_node)) {
          await emitEvent(res, streamId, { event, data });
        } else if (!metadata?.hide_sequential_outputs) {
          await emitEvent(res, streamId, { event, data });
        }
      },
    },
    [GraphEvents.ON_RUN_STEP_COMPLETED]: {
      /**
       * Handle ON_RUN_STEP_COMPLETED event.
       * @param {string} event - The event name.
       * @param {StreamEventData & { result: ToolEndData }} data - The event data.
       * @param {GraphRunnableConfig['configurable']} [metadata] The runnable metadata.
       */
      handle: async (event, data, metadata) => {
        aggregateContent({ event, data });
        if (data?.result != null) {
          await emitEvent(res, streamId, { event, data });
        } else if (checkIfLastAgent(metadata?.last_agent_id, metadata?.langgraph_node)) {
          await emitEvent(res, streamId, { event, data });
        } else if (!metadata?.hide_sequential_outputs) {
          await emitEvent(res, streamId, { event, data });
        }
      },
    },
    [GraphEvents.ON_MESSAGE_DELTA]: {
      /**
       * Handle ON_MESSAGE_DELTA event.
       * @param {string} event - The event name.
       * @param {StreamEventData} data - The event data.
       * @param {GraphRunnableConfig['configurable']} [metadata] The runnable metadata.
       */
      handle: async (event, data, metadata) => {
        aggregateContent({ event, data });
        if (checkIfLastAgent(metadata?.last_agent_id, metadata?.langgraph_node)) {
          await emitEvent(res, streamId, { event, data });
        } else if (!metadata?.hide_sequential_outputs) {
          await emitEvent(res, streamId, { event, data });
        }
      },
    },
    [GraphEvents.ON_REASONING_DELTA]: {
      /**
       * Handle ON_REASONING_DELTA event.
       * @param {string} event - The event name.
       * @param {StreamEventData} data - The event data.
       * @param {GraphRunnableConfig['configurable']} [metadata] The runnable metadata.
       */
      handle: async (event, data, metadata) => {
        aggregateContent({ event, data });
        if (checkIfLastAgent(metadata?.last_agent_id, metadata?.langgraph_node)) {
          await emitEvent(res, streamId, { event, data });
        } else if (!metadata?.hide_sequential_outputs) {
          await emitEvent(res, streamId, { event, data });
        }
      },
    },
  };

  if (toolExecuteOptions) {
    handlers[GraphEvents.ON_TOOL_EXECUTE] = createToolExecuteHandler(toolExecuteOptions);
  }

  handlers[GraphEvents.ON_SUBAGENT_UPDATE] = {
    /**
     * Forwards subagent progress envelopes to the client stream, and
     * (when a caller-owned aggregator map is provided) also folds each
     * event into a per-tool-call `createContentAggregator`. The
     * resulting `contentParts` are attached to the parent's `subagent`
     * tool_call at message-save time so the child's reasoning / tool
     * calls / final text survive a page refresh — in-memory Recoil
     * atoms alone wouldn't persist that.
     *
     * Both aggregation and the SSE forward respect
     * `hide_sequential_outputs` the same way `ON_RUN_STEP`,
     * `ON_MESSAGE_DELTA`, etc. do, so intermediate agents in a
     * sequential chain don't leak their subagent activity, live or
     * after a refresh, when the chain is configured to suppress
     * intermediates.
     */
    handle: async (event, data, metadata) => {
      const isLastAgent = checkIfLastAgent(metadata?.last_agent_id, metadata?.langgraph_node);
      const visible = isLastAgent || !metadata?.hide_sequential_outputs;
      /**
       * Gate BOTH aggregation (persistence) AND streaming on the same
       * visibility rule. If we aggregated for a hidden intermediate
       * agent, `finalizeSubagentContent` would still attach its
       * child's reasoning / tool output to the saved message — so a
       * page refresh would reveal activity that was intentionally
       * suppressed live. Treat hide_sequential_outputs as a
       * consistent "don't record" rule for subagent traces.
       */
      if (!visible) return;
      if (subagentAggregatorsByToolCallId && data?.parentToolCallId) {
        const key = data.parentToolCallId;
        let aggregator = subagentAggregatorsByToolCallId.get(key);
        if (!aggregator) {
          aggregator = createContentAggregator();
          subagentAggregatorsByToolCallId.set(key, aggregator);
        }
        try {
          feedSubagentAggregator(aggregator, data);
        } catch (err) {
          logger.warn(
            `[ON_SUBAGENT_UPDATE] Failed to aggregate phase "${data?.phase}" for tool_call ${key}: ${err?.message ?? err}`,
          );
        }
      }
      await emitEvent(res, streamId, { event, data });
    },
  };

  if (summarizationOptions?.enabled !== false) {
    handlers[GraphEvents.ON_SUMMARIZE_START] = {
      handle: async (_event, data) => {
        await emitEvent(res, streamId, {
          event: GraphEvents.ON_SUMMARIZE_START,
          data,
        });
      },
    };
    handlers[GraphEvents.ON_SUMMARIZE_DELTA] = {
      handle: async (_event, data) => {
        aggregateContent({ event: GraphEvents.ON_SUMMARIZE_DELTA, data });
        await emitEvent(res, streamId, {
          event: GraphEvents.ON_SUMMARIZE_DELTA,
          data,
        });
      },
    };
    handlers[GraphEvents.ON_SUMMARIZE_COMPLETE] = {
      handle: async (_event, data) => {
        aggregateContent({ event: GraphEvents.ON_SUMMARIZE_COMPLETE, data });
        await emitEvent(res, streamId, {
          event: GraphEvents.ON_SUMMARIZE_COMPLETE,
          data,
        });
      },
    };
  }

  handlers[GraphEvents.ON_AGENT_LOG] = { handle: agentLogHandler };

  /** Guarded: no-op when the installed @librechat/agents predates the event */
  if (GraphEvents.ON_CONTEXT_USAGE) {
    handlers[GraphEvents.ON_CONTEXT_USAGE] = {
      /**
       * Forward per-model-call context usage snapshots to the client,
       * honoring the same sequential-agent visibility gate as deltas.
       * @param {string} event - The event name.
       * @param {StreamEventData} data - The event data.
       * @param {GraphRunnableConfig['configurable']} [metadata] The runnable metadata.
       */
      handle: async (event, data, metadata) => {
        if (
          checkIfLastAgent(metadata?.last_agent_id, metadata?.langgraph_node) ||
          !metadata?.hide_sequential_outputs
        ) {
          /** Capture the latest visible snapshot (last-wins) and how many usage
           * events preceded it BEFORE awaiting the emit. `emitEvent` can yield
           * (resumable SSE / Redis publish); with parallel runs active this
           * call's own primary usage could land in `usageEmitSink` during that
           * yield, pushing `latestUsageIndex` past the very event that proves the
           * snapshot completed — the save path would then slice it away and drop
           * a valid breakdown. The recorded index lets the save path persist only
           * when a PRIMARY usage follows this snapshot (the snapshot's call
           * actually invoked the model); a summarization detour emits a snapshot
           * whose only following usage is tagged `summarization`, which a plain
           * snapshot-count would over-count and wrongly drop. */
          if (contextUsageSink) {
            contextUsageSink.latest = data;
            contextUsageSink.count = (contextUsageSink.count ?? 0) + 1;
            contextUsageSink.latestUsageIndex = usageEmitSink?.length ?? 0;
          }
          await emitEvent(res, streamId, { event, data });
        }
      },
    };
  }

  return handlers;
}

/**
 * Helper to write attachment events either to res or to job emitter.
 * Note: Attachments are not order-sensitive like deltas, so fire-and-forget is acceptable.
 * @param {ServerResponse} res - The server response object
 * @param {string | null} streamId - The stream ID for resumable mode, or null for standard mode
 * @param {Object} attachment - The attachment data
 */
function writeAttachment(res, streamId, attachment) {
  if (streamId) {
    GenerationJobManager.emitChunk(streamId, { event: 'attachment', data: attachment });
  } else {
    res.write(`event: attachment\ndata: ${JSON.stringify(attachment)}\n\n`);
  }
}

/**
 * Predicate: is it safe to push an SSE write to the caller right now?
 *
 * In `streamId` (resumable) mode, writes go to the job emitter and the
 * `res` state is irrelevant — always writable.
 *
 * In standard mode, the caller's `res` must have headers sent (the
 * stream has been opened) and not yet be `writableEnded` (the response
 * hasn't closed). Writing to a closed stream raises
 * `ERR_STREAM_WRITE_AFTER_END`.
 *
 * Used by deferred preview emits in both `createToolEndCallback`
 * (chat-completions) and `createResponsesToolEndCallback` (Open
 * Responses) so the gate logic stays in one place. (Comprehensive
 * review #3 on PR #12957.)
 */
function isStreamWritable(res, streamId) {
  if (streamId) {
    return true;
  }
  return !!res && res.headersSent && !res.writableEnded;
}

/**
 * Emit an update for an attachment that was previously sent with
 * `status: 'pending'`. Fire-and-forget: if the response stream has
 * already closed (the agent finished generating before the deferred
 * preview resolved) the frontend's React Query polling on
 * `/api/files/:file_id/preview` picks up the resolved record on its
 * next tick. Skipping the write in that case avoids
 * `ERR_STREAM_WRITE_AFTER_END`.
 *
 * Reuses the `attachment` SSE event name with a discriminated payload:
 * the frontend's `useAttachmentHandler` upserts by `file_id`, so a
 * second event with the same id and `status: 'ready' | 'failed'`
 * overwrites the pending placeholder in place. No new event type, no
 * new client listener.
 *
 * @param {ServerResponse} res
 * @param {string | null} streamId
 * @param {Object} attachment - Updated attachment payload (must carry `file_id`).
 */
function writeAttachmentUpdate(res, streamId, attachment) {
  if (!isStreamWritable(res, streamId)) {
    return;
  }
  writeAttachment(res, streamId, attachment);
}

/**
 * Creates the tool end callback for the chat-completions path.
 * Artifacts are emitted to the client as `attachment` events.
 *
 * @param {Object} params
 * @param {ServerRequest} params.req
 * @param {ServerResponse} params.res
 * @param {Promise<MongoFile | { filename: string; filepath: string; expires: number;} | null>[]} params.artifactPromises
 * @param {string | null} [params.streamId] - The stream ID for resumable mode, or null for standard mode.
 * @returns {ToolEndCallback} The tool end callback.
 */
function createToolEndCallback({ req, res, artifactPromises, streamId = null }) {
  /**
   * @type {ToolEndCallback}
   */
  return async (data, metadata) => {
    const output = data?.output;
    if (!output) {
      return;
    }

    if (!output.artifact) {
      return;
    }

    if (output.artifact[Tools.file_search]) {
      artifactPromises.push(
        (async () => {
          const user = req.user;
          const attachment = await processFileCitations({
            user,
            metadata,
            appConfig: req.config,
            toolArtifact: output.artifact,
            toolCallId: output.tool_call_id,
          });
          if (!attachment) {
            return null;
          }
          if (!streamId && !res.headersSent) {
            return attachment;
          }
          writeAttachment(res, streamId, attachment);
          return attachment;
        })().catch((error) => {
          logger.error('Error processing file citations:', error);
          return null;
        }),
      );
    }

    if (output.artifact[Tools.ui_resources]) {
      artifactPromises.push(
        (async () => {
          const attachment = {
            type: Tools.ui_resources,
            messageId: metadata.run_id,
            toolCallId: output.tool_call_id,
            conversationId: metadata.thread_id,
            [Tools.ui_resources]: output.artifact[Tools.ui_resources].data,
          };
          if (!streamId && !res.headersSent) {
            return attachment;
          }
          writeAttachment(res, streamId, attachment);
          return attachment;
        })().catch((error) => {
          logger.error('Error processing artifact content:', error);
          return null;
        }),
      );
    }

    if (output.artifact[Tools.web_search]) {
      artifactPromises.push(
        (async () => {
          const attachment = {
            type: Tools.web_search,
            messageId: metadata.run_id,
            toolCallId: output.tool_call_id,
            conversationId: metadata.thread_id,
            [Tools.web_search]: { ...output.artifact[Tools.web_search] },
          };
          if (!streamId && !res.headersSent) {
            return attachment;
          }
          writeAttachment(res, streamId, attachment);
          return attachment;
        })().catch((error) => {
          logger.error('Error processing artifact content:', error);
          return null;
        }),
      );
    }

    if (output.artifact[Tools.memory]) {
      artifactPromises.push(
        (async () => {
          const attachment = {
            type: Tools.memory,
            messageId: metadata.run_id,
            toolCallId: output.tool_call_id,
            conversationId: metadata.thread_id,
            [Tools.memory]: output.artifact[Tools.memory],
          };
          if (!streamId && !res.headersSent) {
            return attachment;
          }
          writeAttachment(res, streamId, attachment);
          return attachment;
        })().catch((error) => {
          logger.error('Error processing memory artifact content:', error);
          return null;
        }),
      );
    }

    if (output.artifact.content) {
      /** @type {FormattedContent[]} */
      const content = output.artifact.content;
      for (let i = 0; i < content.length; i++) {
        const part = content[i];
        if (!part) {
          continue;
        }
        if (part.type !== 'image_url') {
          continue;
        }
        const { url } = part.image_url;
        artifactPromises.push(
          (async () => {
            const filename = `${output.name}_img_${nanoid()}`;
            const file_id = output.artifact.file_ids?.[i];
            const file = await saveBase64Image(url, {
              req,
              file_id,
              filename,
              endpoint: metadata.provider,
              context: FileContext.image_generation,
            });
            const fileMetadata = Object.assign(file, {
              messageId: metadata.run_id,
              toolCallId: output.tool_call_id,
              conversationId: metadata.thread_id,
            });
            if (!streamId && !res.headersSent) {
              return fileMetadata;
            }

            if (!fileMetadata) {
              return null;
            }

            writeAttachment(res, streamId, fileMetadata);
            return fileMetadata;
          })().catch((error) => {
            logger.error('Error processing artifact content:', error);
            return null;
          }),
        );
      }
      return;
    }

    if (!isCodeArtifactToolOutput(output)) {
      return;
    }

    if (!output.artifact.files) {
      return;
    }

    for (const file of output.artifact.files) {
      /* `inherited` files are unchanged passthroughs of inputs the caller
       * already owns (skill files, prior session inputs, inherited
       * .dirkeep markers). Skip post-processing: re-downloading with the
       * user's session key 403s when the file is entity-scoped, and the
       * input is already persisted at its origin. They remain available
       * to subsequent calls via primeInvokedSkills / session inheritance. */
      if (file.inherited) {
        continue;
      }
      const { id, name } = file;
      const toolCallId = output.tool_call_id;
      artifactPromises.push(
        (async () => {
          const result = await processCodeOutput({
            req,
            id,
            name,
            messageId: metadata.run_id,
            toolCallId,
            conversationId: metadata.thread_id,
            /**
             * Use the FILE's `storage_session_id` (storage session),
             * not the top-level artifact `session_id` (exec session).
             * The codeapi worker reports two distinct ids on a tool
             * result:
             *   - `artifact.session_id` is the EXEC session — the
             *     sandbox VM that ran the bash command. Files don't
             *     live there; it's torn down post-execution.
             *   - `file.storage_session_id` is the STORAGE session —
             *     the file-server bucket prefix where artifacts
             *     actually live and are served from.
             * `processCodeOutput` builds `/download/{session_id}/{id}`,
             * so passing the exec id resolves to a path the file-server
             * doesn't know about and 404s. Fall back to artifact-level
             * for older worker payloads that may not populate per-file
             * ids.
             */
            session_id: file.storage_session_id ?? output.artifact.session_id,
          });
          const fileMetadata = result?.file ?? null;
          const finalize = result?.finalize;
          if (!fileMetadata) {
            return null;
          }
          /* Initial emit: ship the attachment to the client immediately
           * (carries `status: 'pending'` for office buckets so the UI
           * shows "preparing preview…"). The agent's response stops
           * blocking on extraction here.
           *
           * Use the shared `isStreamWritable` predicate rather than the
           * narrower `streamId || res.headersSent` check that lived
           * here before — a client disconnect mid-stream
           * (`res.writableEnded`) would otherwise hit `res.write` and
           * raise `ERR_STREAM_WRITE_AFTER_END` (caught by the outer
           * IIFE catch but logged as noise). Same gate the Responses
           * path uses below. */
          if (isStreamWritable(res, streamId)) {
            writeAttachment(res, streamId, fileMetadata);
          }
          /* Deferred preview rendering: extraction continues running
           * even after the HTTP response closes. If the stream is still
           * open when the preview resolves, push an `attachment`
           * update event so the UI patches in place; otherwise React
           * Query polling on `/api/files/:file_id/preview` picks it up.
           *
           * Spread the full updated record (mirroring the initial emit
           * shape) and overlay `messageId`/`toolCallId` from the
           * current run. The DB record preserves the original
           * `messageId` across cross-turn filename reuse so
           * `getCodeGeneratedFiles` can trace the file back to its
           * original assistant message; routing the update SSE by the
           * persisted id would land the patch on a stale message
           * slot — turn-N's pending placeholder would stay stuck while
           * turn-1's already-resolved attachment got re-merged.
           * (Codex P1 review on PR #12957.) */
          runPreviewFinalize({
            finalize,
            fileId: fileMetadata.file_id,
            previewRevision: result?.previewRevision,
            onResolved: (updated) => {
              writeAttachmentUpdate(res, streamId, {
                ...updated,
                messageId: metadata.run_id,
                toolCallId,
              });
            },
          });
          return fileMetadata;
        })().catch((error) => {
          logger.error('Error processing code output:', error);
          return null;
        }),
      );
    }
  };
}

/**
 * Helper to write attachment events in Open Responses format (librechat:attachment)
 * @param {ServerResponse} res - The server response object
 * @param {Object} tracker - The response tracker with sequence number
 * @param {Object} attachment - The attachment data
 * @param {Object} metadata - Additional metadata (messageId, conversationId)
 */
function writeResponsesAttachment(res, tracker, attachment, metadata) {
  const sequenceNumber = tracker.nextSequence();
  writeAttachmentEvent(res, sequenceNumber, attachment, {
    messageId: metadata.run_id,
    conversationId: metadata.thread_id,
  });
}

/**
 * Creates a tool end callback specifically for the Responses API.
 * Emits attachments as `librechat:attachment` events per the Open Responses extension spec.
 *
 * @param {Object} params
 * @param {ServerRequest} params.req
 * @param {ServerResponse} params.res
 * @param {Object} params.tracker - Response tracker with sequence number
 * @param {Promise<MongoFile | { filename: string; filepath: string; expires: number;} | null>[]} params.artifactPromises
 * @returns {ToolEndCallback} The tool end callback.
 */
function createResponsesToolEndCallback({ req, res, tracker, artifactPromises }) {
  /**
   * @type {ToolEndCallback}
   */
  return async (data, metadata) => {
    const output = data?.output;
    if (!output) {
      return;
    }

    if (!output.artifact) {
      return;
    }

    if (output.artifact[Tools.file_search]) {
      artifactPromises.push(
        (async () => {
          const user = req.user;
          const attachment = await processFileCitations({
            user,
            metadata,
            appConfig: req.config,
            toolArtifact: output.artifact,
            toolCallId: output.tool_call_id,
          });
          if (!attachment) {
            return null;
          }
          // For Responses API, emit attachment during streaming
          if (res.headersSent && !res.writableEnded) {
            writeResponsesAttachment(res, tracker, attachment, metadata);
          }
          return attachment;
        })().catch((error) => {
          logger.error('Error processing file citations:', error);
          return null;
        }),
      );
    }

    if (output.artifact[Tools.ui_resources]) {
      artifactPromises.push(
        (async () => {
          const attachment = {
            type: Tools.ui_resources,
            toolCallId: output.tool_call_id,
            [Tools.ui_resources]: output.artifact[Tools.ui_resources].data,
          };
          // For Responses API, always emit attachment during streaming
          if (res.headersSent && !res.writableEnded) {
            writeResponsesAttachment(res, tracker, attachment, metadata);
          }
          return attachment;
        })().catch((error) => {
          logger.error('Error processing artifact content:', error);
          return null;
        }),
      );
    }

    if (output.artifact[Tools.web_search]) {
      artifactPromises.push(
        (async () => {
          const attachment = {
            type: Tools.web_search,
            toolCallId: output.tool_call_id,
            [Tools.web_search]: { ...output.artifact[Tools.web_search] },
          };
          // For Responses API, always emit attachment during streaming
          if (res.headersSent && !res.writableEnded) {
            writeResponsesAttachment(res, tracker, attachment, metadata);
          }
          return attachment;
        })().catch((error) => {
          logger.error('Error processing artifact content:', error);
          return null;
        }),
      );
    }

    if (output.artifact.content) {
      /** @type {FormattedContent[]} */
      const content = output.artifact.content;
      for (let i = 0; i < content.length; i++) {
        const part = content[i];
        if (!part) {
          continue;
        }
        if (part.type !== 'image_url') {
          continue;
        }
        const { url } = part.image_url;
        artifactPromises.push(
          (async () => {
            const filename = `${output.name}_img_${nanoid()}`;
            const file_id = output.artifact.file_ids?.[i];
            const file = await saveBase64Image(url, {
              req,
              file_id,
              filename,
              endpoint: metadata.provider,
              context: FileContext.image_generation,
            });
            const fileMetadata = Object.assign(file, {
              toolCallId: output.tool_call_id,
            });

            if (!fileMetadata) {
              return null;
            }

            // For Responses API, emit attachment during streaming
            if (res.headersSent && !res.writableEnded) {
              const attachment = {
                file_id: fileMetadata.file_id,
                filename: fileMetadata.filename,
                type: fileMetadata.type,
                url: fileMetadata.filepath,
                width: fileMetadata.width,
                height: fileMetadata.height,
                tool_call_id: output.tool_call_id,
              };
              writeResponsesAttachment(res, tracker, attachment, metadata);
            }

            return fileMetadata;
          })().catch((error) => {
            logger.error('Error processing artifact content:', error);
            return null;
          }),
        );
      }
      return;
    }

    if (!isCodeArtifactToolOutput(output)) {
      return;
    }

    if (!output.artifact.files) {
      return;
    }

    for (const file of output.artifact.files) {
      /* `inherited` files are unchanged passthroughs of inputs the caller
       * already owns (skill files, prior session inputs, inherited
       * .dirkeep markers). Skip post-processing: re-downloading with the
       * user's session key 403s when the file is entity-scoped, and the
       * input is already persisted at its origin. They remain available
       * to subsequent calls via primeInvokedSkills / session inheritance. */
      if (file.inherited) {
        continue;
      }
      const { id, name } = file;
      const toolCallId = output.tool_call_id;
      artifactPromises.push(
        (async () => {
          const result = await processCodeOutput({
            req,
            id,
            name,
            messageId: metadata.run_id,
            toolCallId,
            conversationId: metadata.thread_id,
            /**
             * Use the FILE's `storage_session_id` (storage session),
             * not the top-level artifact `session_id` (exec session).
             * The codeapi worker reports two distinct ids on a tool
             * result:
             *   - `artifact.session_id` is the EXEC session — the
             *     sandbox VM that ran the bash command. Files don't
             *     live there; it's torn down post-execution.
             *   - `file.storage_session_id` is the STORAGE session —
             *     the file-server bucket prefix where artifacts
             *     actually live and are served from.
             * `processCodeOutput` builds `/download/{session_id}/{id}`,
             * so passing the exec id resolves to a path the file-server
             * doesn't know about and 404s. Fall back to artifact-level
             * for older worker payloads that may not populate per-file
             * ids.
             */
            session_id: file.storage_session_id ?? output.artifact.session_id,
          });
          const fileMetadata = result?.file ?? null;
          const finalize = result?.finalize;
          if (!fileMetadata) {
            return null;
          }

          /* Initial emit (Open Responses extension format). The agent's
           * response no longer blocks on extraction. */
          if (isStreamWritable(res, null)) {
            writeResponsesAttachment(
              res,
              tracker,
              buildResponsesAttachment(fileMetadata, toolCallId),
              metadata,
            );
          }

          /* Deferred preview rendering: extract HTML in the background
           * and emit a follow-up `librechat:attachment` with the same
           * `file_id` so the client merges the resolved record over the
           * pending placeholder. Fire-and-forget — survives response
           * close; polling covers the post-close gap. */
          runPreviewFinalize({
            finalize,
            fileId: fileMetadata.file_id,
            previewRevision: result?.previewRevision,
            onResolved: (updated) => {
              if (!isStreamWritable(res, null)) {
                return;
              }
              writeResponsesAttachment(
                res,
                tracker,
                buildResponsesAttachment(updated, toolCallId),
                metadata,
              );
            },
          });

          return fileMetadata;
        })().catch((error) => {
          logger.error('Error processing code output:', error);
          return null;
        }),
      );
    }
  };
}

/**
 * Project a file metadata record into the Open Responses attachment
 * shape. Mirrors the legacy inline projection but adds `status` and
 * `previewError` so deferred preview updates carry the lifecycle
 * signal the client uses to upsert by `file_id`.
 */
function buildResponsesAttachment(fileMetadata, toolCallId) {
  return {
    file_id: fileMetadata.file_id,
    filename: fileMetadata.filename,
    type: fileMetadata.type,
    url: fileMetadata.filepath,
    width: fileMetadata.width,
    height: fileMetadata.height,
    tool_call_id: toolCallId,
    text: fileMetadata.text ?? null,
    textFormat: fileMetadata.textFormat ?? null,
    status: fileMetadata.status,
    previewError: fileMetadata.previewError,
  };
}

const ALLOWED_LOG_LEVELS = new Set(['debug', 'info', 'warn', 'error']);

function agentLogHandler(_event, data) {
  if (!data) {
    return;
  }
  const logFn = ALLOWED_LOG_LEVELS.has(data.level) ? logger[data.level] : logger.debug;
  const meta = typeof data.data === 'object' && data.data != null ? data.data : {};
  logFn(`[agents:${data.scope ?? 'unknown'}] ${data.message ?? ''}`, {
    ...meta,
    runId: data.runId,
    agentId: data.agentId,
  });
}

function markSummarizationUsage(usage, metadata) {
  const node = metadata?.langgraph_node;
  if (typeof node === 'string' && node.startsWith(GraphNodeKeys.SUMMARIZE)) {
    return { ...usage, usage_type: 'summarization' };
  }
  return usage;
}

const agentLogHandlerObj = { handle: agentLogHandler };

/**
 * Builds the three summarization SSE event handlers.
 * In streaming mode, each event is forwarded to the client via `res.write`.
 * In non-streaming mode, the handlers are no-ops.
 * @param {{ isStreaming: boolean, res: import('express').Response }} opts
 */
function buildSummarizationHandlers({ isStreaming, res }) {
  if (!isStreaming) {
    const noop = { handle: () => {} };
    return { on_summarize_start: noop, on_summarize_delta: noop, on_summarize_complete: noop };
  }
  const writeEvent = (name) => ({
    handle: async (_event, data) => {
      if (!res.writableEnded) {
        res.write(`event: ${name}\ndata: ${JSON.stringify(data)}\n\n`);
      }
    },
  });
  return {
    on_summarize_start: writeEvent('on_summarize_start'),
    on_summarize_delta: writeEvent('on_summarize_delta'),
    on_summarize_complete: writeEvent('on_summarize_complete'),
  };
}

module.exports = {
  ModelEndHandler,
  agentLogHandler,
  agentLogHandlerObj,
  getDefaultHandlers,
  createToolEndCallback,
  isStreamWritable,
  markSummarizationUsage,
  buildSummarizationHandlers,
  createResponsesToolEndCallback,
};