mirror of
https://github.com/danny-avila/LibreChat.git
synced 2026-05-13 16:07:30 +00:00
* 🧱 refactor: typed CodeEnvRef + kind discriminator + tenant-aware sandbox cache

Final cutover for the LibreChat ↔ codeapi sandbox file identity. Replaces the magic string `${session_id}/${file_id}?entity_id=...` with a typed, discriminated `CodeEnvRef`. Pre-release lockstep deploy with codeapi #1455 and agents #148; no legacy aliases retained.

## Final shape

```ts
type CodeEnvRef =
  | { kind: 'skill'; id: string; storage_session_id: string; file_id: string; version: number }
  | { kind: 'agent'; id: string; storage_session_id: string; file_id: string }
  | { kind: 'user'; id: string; storage_session_id: string; file_id: string };
```

`kind` drives codeapi's sessionKey: `<tenant>:<kind>:<id>[✌️<version>]` for shared kinds, `<tenant>:user:<userId>` for user-private (the auth context provides `userId`). `version` is statically required for `kind: 'skill'` and forbidden otherwise via the discriminated union — the constraint holds at compile time for every consumer, not just codeapi's runtime validator. `id` is sessionKey-meaningful for `'skill'` / `'agent'`; informational only for `'user'` (codeapi resolves user identity from the auth context).

## What changed

- `packages/data-provider/src/codeEnvRef.ts` — discriminated union + `CODE_ENV_KINDS` const-tuple keeps the runtime list and the TS union locked together.
- Schemas: `metadata.codeEnvRef` and `SkillFile.codeEnvRef` enums tightened to `['skill', 'agent', 'user']`.
- `primeSkillFiles` writes `kind: 'skill'`, `id: skill._id`, `version: skill.version`. The cache-hit path reads `codeEnvRef` directly. Bumping `skill.version` on edit naturally invalidates the prior cache entry under the new sessionKey.
- `processCodeOutput` writes `kind: 'user'`, `id: req.user.id`. The output bucket is always user-scoped, regardless of which skill the execution invoked. A new regression test pins the asymmetry.
- `primeFiles` reupload preserves `kind`/`id`/`version?` from the existing ref, so a skill-cache-miss reupload doesn't silently demote to the user bucket.
- `crud.js` upload functions (`uploadCodeEnvFile` / `batchUploadCodeEnvFiles`) thread `kind`/`id`/`version?` to the multipart form (codeapi #1455 option α). Without these on the wire, codeapi falls back to user bucketing and skill-cache invalidation never fires. Client-side validation mirrors codeapi's validator.
- `Files/process.js` — chat attachments use `kind: 'user'`; agent setup files use `kind: 'agent'`.
- Drops `entity_id` everywhere (struct, schema sub-docs, write paths, upload form fields). Drops `'system'` from the kind enum (no emitter ever existed).

## Test plan

- [x] `cd packages/data-provider && npx jest src/codeEnvRef.spec` — 4/4
- [x] `cd packages/data-schemas && npx jest` — 1447/1447
- [x] `cd packages/api && npx jest src/agents` — 81/81 in skillFiles + handlers + resources
- [x] `cd api && npx jest server/services/Files server/controllers/agents` — 436/436
- [x] `cd api && npx jest server/services/Files/Code` — 98/98 (incl. the new "outputs are user-scoped regardless of which skill the execution invoked" regression and "reupload forwards kind/id/version from existing ref")
- [x] `npx tsc --noEmit -p packages/data-{provider,schemas}/tsconfig.json && npx tsc --noEmit -p packages/api/tsconfig.json` — clean (only pre-existing, unrelated dev errors in storage/balance, untouched here)

## Deploy notes

- **24h cache-miss burst** on first deploy: inputs (skill caches re-prime under the new sessionKey shape) and outputs (any pre-Phase C skill-output cached files become unreadable). Bounded by codeapi's 24h TTL.
- **Lockstep with codeapi #1455 and agents #148.** Either repo can land first since there are no aliases to drain, but the three deploys must overlap within the same maintenance window.
- **`@librechat/agents` bump to `3.1.79-dev.0`** required after agents #148 lands and is published.
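The discriminated union and its sessionKey rule can be exercised in a short sketch. `buildSessionKey` and the `:v<version>` separator here are illustrative stand-ins (the commit text shows a different separator glyph); only the union shape and the compile-time `version` rule come from the description above:

```typescript
// Mirrors the CodeEnvRef union from the commit; buildSessionKey is hypothetical.
type CodeEnvRef =
  | { kind: 'skill'; id: string; storage_session_id: string; file_id: string; version: number }
  | { kind: 'agent'; id: string; storage_session_id: string; file_id: string }
  | { kind: 'user'; id: string; storage_session_id: string; file_id: string };

function buildSessionKey(tenant: string, userId: string, ref: CodeEnvRef): string {
  switch (ref.kind) {
    case 'skill':
      // `version` exists only on the 'skill' arm, so this access is compile-checked.
      return `${tenant}:skill:${ref.id}:v${ref.version}`;
    case 'agent':
      return `${tenant}:agent:${ref.id}`;
    case 'user':
      // User-private bucket: identity comes from the auth context, not ref.id.
      return `${tenant}:user:${userId}`;
  }
}
```

The exhaustive `switch` is what the commit means by "constraint holds at compile time on every consumer": adding a fourth `kind` makes every such switch a type error until it is handled.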
## What this enables

Auth bridge work (JWT-based tenant/user identity between LC and codeapi) — codeapi now derives the sessionKey purely from `req.codeApiAuthContext.{ tenantId, userId }`, so the next chapter is replacing the header-asserted user identity with a verified-claim path.

* 🩹 fix: persist execute_code uploads under codeEnvRef metadata key

Codex review P1 (chatgpt-codex-connector). `Files/process.js` was storing the upload result under `metadata.fileIdentifier` even though:

- `uploadCodeEnvFile` now returns `{ storage_session_id, file_id }`, not the legacy magic string.
- The post-cutover schema (`File.metadata.codeEnvRef`) only declares `codeEnvRef` — mongoose strict mode silently strips unknown keys.
- All readers (`primeFiles`, `getCodeFilesByIds`, `categorizeFileForToolResources`, controller filtering) check `metadata.codeEnvRef`.

Net effect of the bug: chat-attached and agent-setup execute_code files would lose their sandbox reference on save, and `primeFiles` would skip them on subsequent code-execution turns — the file blob would still be available locally but never re-mounted in the sandbox.

Fix: construct the full `CodeEnvRef` (`{ kind, id, storage_session_id, file_id }`) at the write site and persist it under `metadata.codeEnvRef`. `BaseClient`'s "is this a code-env file" presence check accepts the new shape alongside the legacy `fileIdentifier` for back-compat with any pre-cutover records still in the database. Mirrors the same change in `processAttachments.spec.ts` (which re-implements the BaseClient logic for testability).

New regression tests in `process.spec.js` cover three cases:

- chat attachments (`messageAttachment=true`) → `kind: 'user'`
- agent setup (`messageAttachment=false`) → `kind: 'agent'`
- the legacy `fileIdentifier` key is NOT persisted (it would be schema-stripped)

* 🩹 fix: read storage_session_id on primed file refs (Codex P1)

Codex review (chatgpt-codex-connector).
After Phase B's per-file `session_id` → `storage_session_id` rename, `primeFiles` emits the new field — but `seedCodeFilesIntoSessions` was still reading `files[0].session_id` for the representative session and `f.session_id` for the dedupe key. In runs with only primed attachments (no skill seed), `representativeSessionId` was `undefined`, the function returned the unchanged map, and `seedCodeFilesIntoSessions` silently dropped the entire batch. The first `execute_code` call then started without `_injected_files` and the agent couldn't see prior-turn artifacts.

Fix:

- `codeFilesSession.ts`: read `f.storage_session_id` for both the dedupe key and the representative session id. JSDoc updated to match the new field name.
- `callbacks.js`: the two output-file persistence paths read `file.session_id` to pass to `processCodeOutput` — switch to `file.storage_session_id`. The original comment explicitly says this should be the STORAGE session, which is exactly the field Phase B renamed.
- `codeFilesSession.spec.ts`: the fixture builder uses `storage_session_id` and `kind: 'user'` to match the post-cutover `CodeEnvFile` shape.

Lockstep coordination: this matches the post-bump shape of `@librechat/agents` 3.1.79+. CI tsc errors against the currently-pinned 3.1.78 are expected and resolve when the dep bumps in this PR before merge.

* 📦 chore: Bump `@librechat/agents` to version 3.1.80-dev.0 in package-lock and package.json files

* 🪪 fix: thread kind/id/version through codeapi /download URLs (Phase C α)

Symmetric fix for the upload-side wire change in 537725a. Codeapi's `sessionAuth` middleware now requires `kind`/`id`/`version?` on every download/freshness URL — without them it 400s with "kind must be one of: skill, agent, user" before serving the file.

Three sites construct codeapi-side URLs that go through `sessionAuth`:

- `processCodeOutput` (`Files/Code/process.js`): `/download/<sess>/<id>` for freshly generated sandbox outputs. Always `kind: 'user'` + `id: req.user.id` — code-output files are always user-private, regardless of which skill the run invoked.
- `getSessionInfo` (`Files/Code/process.js`): `/sessions/<sess>/objects/<id>` for the 23h freshness check. Pulls kind/id/version straight off the `codeEnvRef` already in scope — skill files stay skill-bucketed, user files stay user-bucketed.
- the `/code/download/:session_id/:fileId` LC route (`routes/files/files.js`): proxies to codeapi for manual downloads. Only code-output files travel this route, so `kind: 'user'` + `id: req.user.id`.

The `getCodeOutputDownloadStream` helper in `crud.js` now takes an `identity` param, validated by a `buildCodeEnvDownloadQuery` helper that mirrors `appendCodeEnvFileIdentity`'s shape rules: kind required from the closed `{skill, agent, user}` set, version required for 'skill' and forbidden otherwise. Bad callers fail fast on the client instead of round-tripping a 400.

Also cleans up two log-noise sources reported alongside the 400:

- `logAxiosError` in `packages/api/src/utils/axios.ts` was dumping `error.response.data` raw. With `responseType: 'arraybuffer'` that's a `Buffer` (~4 chars per byte after JSON serialization); with `responseType: 'stream'` it's a `Readable` whose internal state serializes the entire ring buffer + socket. The new `renderResponseData` decodes small buffers as UTF-8 (truncated past 2KB) and stubs streams as `'[stream]'`. Diagnostics stay useful; log lines stop being megabytes.
- the `/code/download` route's catch was a bare `logger.error('...', error)`, bypassing the redactor. Switched to `logAxiosError` so it benefits from the same buffer/stream handling.

Tests updated to match the new contract:

- crud.spec: `getCodeOutputDownloadStream` fixtures pass `userIdentity`; new cases cover skill identity (with version), bad-kind rejection, and skill-without-version rejection.
- process.spec: the `getSessionInfo` test passes a full `codeEnvRef` object.
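The buffer/stream redaction described above can be sketched as follows. `renderResponseData` is named in the commit, but this body is a minimal reconstruction under the stated rules (UTF-8 decode, 2KB truncation, `'[stream]'` stub), not the actual `packages/api` implementation:

```typescript
import { Readable } from 'node:stream';

// Hypothetical reconstruction of the renderResponseData idea; the 2KB cap
// and the '[stream]' placeholder come from the commit text above.
const MAX_BYTES = 2048;

function renderResponseData(data: unknown): unknown {
  if (data instanceof Readable) {
    // A Readable's internal state would serialize its ring buffer + socket.
    return '[stream]';
  }
  if (Buffer.isBuffer(data)) {
    const text = data.subarray(0, MAX_BYTES).toString('utf8');
    return data.length > MAX_BYTES ? `${text}… [truncated ${data.length} bytes]` : text;
  }
  return data; // plain JSON bodies pass through untouched
}
```

Routing every error log through one such renderer is what lets both `logAxiosError` and the `/code/download` catch benefit from the same handling.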
* ♻️ refactor: extract codeEnv identity helpers into packages/api

Per the project convention that new backend code lives in TypeScript under `packages/api`, this moves `appendCodeEnvFileIdentity` and `buildCodeEnvDownloadQuery` from `api/server/services/Files/Code/crud.js` into a new `packages/api/src/files/code/identity.ts` module. Both helpers are pure validators that mirror codeapi's `parseUploadSessionKeyInput` server-side rules (closed kind set, `version` required for `'skill'` and forbidden otherwise) — they deserve TS support and a dedicated spec rather than living as JSDoc-typed helpers in the legacy `/api` workspace.

The new module:

- Exports a `CodeEnvIdentity` interface using the `librechat-data-provider` `CodeEnvKind` discriminated union.
- Adds 13 unit tests in `identity.spec.ts` covering the validation matrix (skill+version, agent, user, and every rejection path) plus URL encoding for the download query.
- Is re-exported from `packages/api/src/files/code/index.ts` alongside `classify`, `extract`, and `form`.

Consumer updates:

- `api/server/services/Files/Code/crud.js`: drops the local helpers and imports them from `@librechat/api`. Net -64 lines.
- `api/server/services/Files/Code/process.js`: same.
- Test mocks for `@librechat/api` in three spec files now stub the helpers' validation behavior locally rather than pulling them through `requireActual` (which would drag in provider-config init-time side effects). The package's `exports` field only surfaces the root barrel, so leaf imports aren't reachable from the legacy `/api` test setup.

No runtime behavior change. Identity validation rules and emitted form/query shapes are byte-for-byte identical pre/post.

* 🪪 fix: emit resource_id alongside id on _injected_files (skill 403 fix)

Companion to the codeapi #1455 fix and agents 3.1.80-dev.1 — the wire shape for shared-kind files now requires `resource_id` distinct from the storage `id`. Without this LC change, codeapi's sessionKey re-derivation on every shared-kind /exec rejects with 403 session_key_mismatch:

```
cached:  legacy:skill:69dcf561...✌️59 (signed at upload, skill _id)
derived: legacy:skill:ysPwEURuPk-...✌️59 (storage nanoid)
```

Emit sites updated:

- `primeInvokedSkills` cache-hit path: `resource_id: ref.id` (the persisted skill `_id` from `codeEnvRef.id`); `id: ref.file_id` unchanged (storage uuid).
- `primeInvokedSkills` fresh-upload path: `resource_id: skill._id.toString()` on every primed file (the `allPrimedFiles` builder type now carries the field).
- `processCodeOutput`'s `pushFile` (Code/process.js): `resource_id: ref.id` — for `kind: 'user'` this is informational (codeapi derives the sessionKey from the auth context) but emitted for shape uniformity with shared kinds.

Bumps `@librechat/agents` to `^3.1.80-dev.1` (the version that ships the matching `CodeEnvFile.resource_id` field).

## Test plan

- [x] `cd packages/api && npx jest src/agents` — 67/67 pass (skillFiles fixtures updated to assert `resource_id` on the emitted CodeSessionContext.files).
- [x] `cd api && npx jest server/services/Files server/controllers/agents` — 445/445 pass (process.spec fixtures updated for the reupload + cache-hit emission).
- [x] `npx tsc --noEmit -p packages/api/tsconfig.json` — clean.

* fix(skill-tool-call): carry resource_id through primeSkillFiles → artifact

Codeapi was 400ing every /exec following a `handle_skill` tool call with `resource_id is invalid` (`type: 'undefined'`). Both code paths in `primeSkillFiles` (cache-hit + fresh-upload) returned files without `resource_id`/`kind`/`version`, and the artifact in `handlers.ts` forwarded the stripped shape into `tc.codeSessionContext.files` → `_injected_files`. `primeInvokedSkills` (the NL-detected loader) had already been fixed end-to-end; this commit aligns the tool-invoked path with the same contract: `resource_id` = `skill._id.toString()`, `kind: 'skill'`, `version` = the skill's monotonic counter.
Tests added to `skillFiles.spec.ts` lock the contract on `primeSkillFiles` directly, so future refactors can't silently drop the resource identity again.

* fix(handlers.spec): align session_id → storage_session_id rename + kind discriminator

Pre-existing TS errors against the post-rename `CodeEnvFile` shape: the test file still used `session_id` on per-file objects (renamed to `storage_session_id` in agents Phase B/C) and was missing the `kind` discriminator the discriminated union requires. Both the inputs and the matching `expect.toEqual(...)` mirrors are updated together so the runtime equality check still holds. Lines 723-732 stay as-is — they sit behind `as unknown as ToolCallRequest`, and TS already skipped them.

* chore: fix `@librechat/agents`, correct version to 3.1.80-dev.0 in package.json files

* chore: bump `@librechat/agents` to version 3.1.80-dev.1 in package.json and package-lock.json

* chore: bump `@librechat/agents` to version 3.1.80-dev.2

* feat(observability): trace file priming chain from primeCodeFiles to _injected_files

Diagnosing the user-upload "files=[] on first /exec" bug requires seeing where in the LC chain a file ref disappears. Prior to this patch the chain (primeCodeFiles → primedCodeFiles → initialSessions → CodeSessionContext → _injected_files) was opaque end-to-end:

- primeCodeFiles silently dropped files without `metadata.codeEnvRef`
- reuploadFile catches all errors and continues with no signal
- the handlers.ts handoff to codeapi never logged what it was sending

After this patch, a single grep on `[primeCodeFiles]` plus `[code-env:inject]` shows the full per-file path:

```
[primeCodeFiles] in: file_ids=N resourceFiles=M
[primeCodeFiles] file=<id> path=skip reason=no-codeenvref filename=...
[primeCodeFiles] file=<id> path=cache-hit-by-session storage_session_id=...
[primeCodeFiles] file=<id> path=reupload reason=no-uploadtime ...
[primeCodeFiles] file=<id> path=reupload reason=stale ...
[primeCodeFiles] file=<id> path=reupload-success oldSession=... newSession=... newFileId=...
[primeCodeFiles] file=<id> path=reupload-failed session=...
[primeCodeFiles] file=<id> path=fresh-active storage_session_id=...
[primeCodeFiles] out: returned=N skippedNoRef=M reuploadFailures=K
[code-env:inject] tool=<name> files=N missingResourceId=K (debug)
[code-env:inject] M/N files missing resource_id ... (warn)
[code-env:inject] tool=<name> _injected_files=0 ... (warn)
```

The boundary log warns when LC sends zero injected files on a code-execution tool call — that's the user's actual symptom showing up on the LC side, instead of having to correlate against codeapi's `Request received { files: [] }`.

The tag was chosen as `[code-env:inject]` rather than `[handoff:exec]` to avoid collision with the app-level "handoff" semantic (the subagent handoff workflow).

Structural cleanup in primeFiles: replaced the `if (ref) { ... }` nesting with an early `if (!ref) continue`, so the per-path instrumentation hooks land at top-level scope instead of indented inside a conditional. Behavior unchanged; pushFile / reuploadFile are identical.

Spec fixtures (handlers.spec.ts, codeFilesSession.spec.ts) updated to include `resource_id` on `CodeEnvFile` literals — required by the post-3.1.80-dev.2 type now installed.
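A minimal sketch of the early-continue instrumentation pattern described above. The `PrimedFile` type, the log sink, and `traceFiles` itself are simplified stand-ins for the real `primeCodeFiles`; only the tag format and the skip/summary counters follow the commit text:

```typescript
// Hypothetical shape: just enough structure to show the tracing pattern.
interface PrimedFile {
  file_id: string;
  metadata?: { codeEnvRef?: { storage_session_id: string } };
}

function traceFiles(files: PrimedFile[], log: (line: string) => void): number {
  let skippedNoRef = 0;
  log(`[primeCodeFiles] in: file_ids=${files.length}`);
  for (const f of files) {
    const ref = f.metadata?.codeEnvRef;
    if (!ref) {
      // Early continue keeps each per-path log at top-level loop scope.
      skippedNoRef++;
      log(`[primeCodeFiles] file=${f.file_id} path=skip reason=no-codeenvref`);
      continue;
    }
    log(`[primeCodeFiles] file=${f.file_id} path=cache-hit-by-session storage_session_id=${ref.storage_session_id}`);
  }
  log(`[primeCodeFiles] out: returned=${files.length - skippedNoRef} skippedNoRef=${skippedNoRef}`);
  return skippedNoRef;
}
```

The payoff of the flat structure is that every branch emits exactly one greppable line, so the in/out counters can be reconciled per request.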
## Test plan

- [x] `cd packages/api && npx jest src/agents/handlers.spec.ts src/agents/codeFilesSession.spec.ts src/agents/skillFiles.spec.ts` — 69/69 pass
- [x] `cd api && npx jest server/services/Files/Code/process.spec.js` — 84/84 pass
- [x] `npx tsc --noEmit -p packages/api` — clean
- [x] `npx eslint` on all four touched files — clean

* chore: add CONSOLE_JSON_STRING_LENGTH to .env.example for JSON log string length configuration

* fix(files): align codeapi upload filename with LC's sanitized DB filename

User-attached files for code execution were uploading to codeapi under `file.originalname` (the raw upload filename, which may contain spaces / special chars), while LC's DB record stored the sanitized form (`sanitizeFilename(file.originalname)`, underscores). Codeapi preserves whatever filename the upload sent, so the sandbox saw `/mnt/data/<originalname>` while LC's `primeFiles` toolContext text + `_injected_files.name` referenced `file.filename` (sanitized).

Visible failure: the agent gets a system prompt saying

```
/mnt/data/librechat_code_api_-_active_customer_-_2025-11-05.xlsx
```

…tries that path, hits `FileNotFoundError`, then notices the sandbox's actual `Available files` line says

```
/mnt/data/librechat code api - active customer - 2025-11-05.xlsx
```

…retries with spaces, and succeeds. This wastes a tool call per upload and leaks raw filenames into model context.

Fix: sanitize once and use the sanitized form in both the codeapi upload AND the LC DB record. Sandbox path = LC toolContext text = in-memory ref name. No drift.

The reupload path (`Code/process.js` line 867, `filename: file.filename`) already uses the sanitized DB name, so it stays consistent with the fresh-upload path after this change.

## Test plan

- [x] `cd api && npx jest server/services/Files/process` — 32/32 pass
- [x] `npx eslint` on the touched file — clean

* chore: bump `@librechat/agents` to version 3.1.80-dev.3 in package.json and package-lock.json
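The sanitize-once invariant from the filename commit can be sketched like this. `sanitizeFilename` here is a hypothetical stand-in for LC's helper, written only to reproduce the spaces-to-underscores behavior visible in the commit's example paths:

```typescript
// Illustrative stand-in: replaces runs of characters outside [A-Za-z0-9_.-]
// with a single underscore, matching the sanitized path shown in the commit.
function sanitizeFilename(name: string): string {
  return name.replace(/[^\w.-]+/g, '_');
}

// Sanitize ONCE, then use the same value for the codeapi upload, the DB
// record, and the in-memory ref name — so the sandbox path, the toolContext
// text, and `_injected_files.name` can never drift apart.
const originalname = 'librechat code api - active customer - 2025-11-05.xlsx';
const filename = sanitizeFilename(originalname);
```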
1377 lines
45 KiB
JavaScript
const crypto = require('crypto');
const fetch = require('node-fetch');
const { logger } = require('@librechat/data-schemas');
const {
  countTokens,
  checkBalance,
  getBalanceConfig,
  buildMessageFiles,
  extractFileContext,
  encodeAndFormatAudios,
  encodeAndFormatVideos,
  encodeAndFormatDocuments,
} = require('@librechat/api');
const {
  Constants,
  FileSources,
  ContentTypes,
  excludedKeys,
  EModelEndpoint,
  mergeFileConfig,
  isParamEndpoint,
  isAgentsEndpoint,
  isEphemeralAgentId,
  supportsBalanceCheck,
  isBedrockDocumentType,
  getEndpointFileConfig,
} = require('librechat-data-provider');
const { getStrategyFunctions } = require('~/server/services/Files/strategies');
const { logViolation } = require('~/cache');
const TextStream = require('./TextStream');
const db = require('~/models');

class BaseClient {
  constructor(apiKey, options = {}) {
    this.apiKey = apiKey;
    this.sender = options.sender ?? 'AI';
    this.currentDateString = new Date().toLocaleDateString('en-us', {
      year: 'numeric',
      month: 'long',
      day: 'numeric',
    });
    /** @type {boolean} */
    this.skipSaveConvo = false;
    /** @type {boolean} */
    this.skipSaveUserMessage = false;
    /** @type {string} */
    this.user;
    /** @type {string} */
    this.conversationId;
    /** @type {string} */
    this.responseMessageId;
    /** @type {string} */
    this.parentMessageId;
    /** @type {TAttachment[]} */
    this.attachments;
    /** The key for the usage object's input tokens
     * @type {string} */
    this.inputTokensKey = 'prompt_tokens';
    /** The key for the usage object's output tokens
     * @type {string} */
    this.outputTokensKey = 'completion_tokens';
    /** @type {Set<string>} */
    this.savedMessageIds = new Set();
    /**
     * Flag to determine if the client re-submitted the latest assistant message.
     * @type {boolean | undefined} */
    this.continued;
    /**
     * Flag to determine if the client has already fetched the conversation while saving new messages.
     * @type {boolean | undefined} */
    this.fetchedConvo;
    /** @type {TMessage[]} */
    this.currentMessages = [];
    /** @type {import('librechat-data-provider').VisionModes | undefined} */
    this.visionMode;
    /** @type {import('librechat-data-provider').FileConfig | undefined} */
    this._mergedFileConfig;
    /** @type {import('librechat-data-provider').EndpointFileConfig | undefined} */
    this._endpointFileConfig;
  }
  setOptions() {
    throw new Error("Method 'setOptions' must be implemented.");
  }

  async getCompletion() {
    throw new Error("Method 'getCompletion' must be implemented.");
  }

  /** @type {sendCompletion} */
  async sendCompletion() {
    throw new Error("Method 'sendCompletion' must be implemented.");
  }

  getSaveOptions() {
    throw new Error('Subclasses must implement getSaveOptions');
  }

  async buildMessages() {
    throw new Error('Subclasses must implement buildMessages');
  }

  async summarizeMessages() {
    throw new Error('Subclasses attempted to call summarizeMessages without implementing it');
  }

  /**
   * @returns {string}
   */
  getResponseModel() {
    if (isAgentsEndpoint(this.options.endpoint) && this.options.agent && this.options.agent.id) {
      return this.options.agent.id;
    }

    return this.modelOptions?.model ?? this.model;
  }

  /**
   * Abstract method to get the token count for a message. Subclasses must implement this method.
   * @param {TMessage} responseMessage
   * @returns {number}
   */
  getTokenCountForResponse(responseMessage) {
    logger.debug('[BaseClient] `getTokenCountForResponse` not implemented.', {
      messageId: responseMessage?.messageId,
    });
  }

  /**
   * Abstract method to record token usage. Subclasses must implement this method.
   * If a correction to the token usage is needed, the method should return an object with the corrected token counts.
   * Should only be used if `recordCollectedUsage` was not used instead.
   * @param {string} [model]
   * @param {AppConfig['balance']} [balance]
   * @param {number} promptTokens
   * @param {number} completionTokens
   * @param {string} [messageId]
   * @returns {Promise<void>}
   */
  async recordTokenUsage({ model, balance, promptTokens, completionTokens, messageId }) {
    logger.debug('[BaseClient] `recordTokenUsage` not implemented.', {
      model,
      balance,
      messageId,
      promptTokens,
      completionTokens,
    });
  }

  /**
   * Makes an HTTP request and logs the process.
   *
   * @param {RequestInfo} url - The URL to make the request to. Can be a string or a Request object.
   * @param {RequestInit} [init] - Optional init options for the request.
   * @returns {Promise<Response>} - A promise that resolves to the response of the fetch request.
   */
  async fetch(_url, init) {
    let url = _url;
    if (this.options.directEndpoint) {
      url = this.options.reverseProxyUrl;
    }
    logger.debug(`Making request to ${url}`);
    return await fetch(url, init);
  }

  getBuildMessagesOptions() {
    throw new Error('Subclasses must implement getBuildMessagesOptions');
  }

  async generateTextStream(text, onProgress, options = {}) {
    const stream = new TextStream(text, options);
    await stream.processTextStream(onProgress);
  }
  /**
   * @returns {[string|undefined, string|undefined]}
   */
  processOverideIds() {
    /** @type {Record<string, string | undefined>} */
    let { overrideConvoId, overrideUserMessageId } = this.options?.req?.body ?? {};
    if (overrideConvoId) {
      const [conversationId, index] = overrideConvoId.split(Constants.COMMON_DIVIDER);
      overrideConvoId = conversationId;
      if (index !== '0') {
        this.skipSaveConvo = true;
      }
    }
    if (overrideUserMessageId) {
      const [userMessageId, index] = overrideUserMessageId.split(Constants.COMMON_DIVIDER);
      overrideUserMessageId = userMessageId;
      if (index !== '0') {
        this.skipSaveUserMessage = true;
      }
    }

    return [overrideConvoId, overrideUserMessageId];
  }

  async setMessageOptions(opts = {}) {
    if (opts && opts.replaceOptions) {
      this.setOptions(opts);
    }

    const [overrideConvoId, overrideUserMessageId] = this.processOverideIds();
    const { isEdited, isContinued } = opts;
    const user = opts.user ?? null;
    this.user = user;
    const saveOptions = this.getSaveOptions();
    this.abortController = opts.abortController ?? new AbortController();
    const requestConvoId = overrideConvoId ?? opts.conversationId;
    const conversationId = requestConvoId ?? crypto.randomUUID();
    const parentMessageId = opts.parentMessageId ?? Constants.NO_PARENT;
    const userMessageId =
      overrideUserMessageId ?? opts.overrideParentMessageId ?? crypto.randomUUID();
    let responseMessageId = opts.responseMessageId ?? crypto.randomUUID();
    let head = isEdited ? responseMessageId : parentMessageId;
    this.currentMessages = (await this.loadHistory(conversationId, head)) ?? [];
    this.conversationId = conversationId;

    if (isEdited && !isContinued) {
      responseMessageId = crypto.randomUUID();
      head = responseMessageId;
      this.currentMessages[this.currentMessages.length - 1].messageId = head;
    }

    if (opts.isRegenerate && responseMessageId.endsWith('_')) {
      responseMessageId = crypto.randomUUID();
    }

    this.responseMessageId = responseMessageId;

    return {
      ...opts,
      user,
      head,
      saveOptions,
      userMessageId,
      requestConvoId,
      conversationId,
      parentMessageId,
      responseMessageId,
    };
  }
createUserMessage({ messageId, parentMessageId, conversationId, text }) {
|
|
return {
|
|
messageId,
|
|
parentMessageId,
|
|
conversationId,
|
|
sender: 'User',
|
|
text,
|
|
isCreatedByUser: true,
|
|
};
|
|
}
|
|
|
|
async handleStartMethods(message, opts) {
|
|
const {
|
|
user,
|
|
head,
|
|
saveOptions,
|
|
userMessageId,
|
|
requestConvoId,
|
|
conversationId,
|
|
parentMessageId,
|
|
responseMessageId,
|
|
} = await this.setMessageOptions(opts);
|
|
|
|
const userMessage = opts.isEdited
|
|
? this.currentMessages[this.currentMessages.length - 2]
|
|
: this.createUserMessage({
|
|
messageId: userMessageId,
|
|
parentMessageId,
|
|
conversationId,
|
|
text: message,
|
|
});
|
|
|
|
if (typeof opts?.getReqData === 'function') {
|
|
opts.getReqData({
|
|
userMessage,
|
|
conversationId,
|
|
responseMessageId,
|
|
sender: this.sender,
|
|
});
|
|
}
|
|
|
|
if (typeof opts?.onStart === 'function') {
|
|
const isNewConvo = !requestConvoId && parentMessageId === Constants.NO_PARENT;
|
|
opts.onStart(userMessage, responseMessageId, isNewConvo);
|
|
}
|
|
|
|
return {
|
|
...opts,
|
|
user,
|
|
head,
|
|
conversationId,
|
|
responseMessageId,
|
|
saveOptions,
|
|
userMessage,
|
|
};
|
|
}
|
|
|
|
/**
|
|
* Adds instructions to the messages array. If the instructions object is empty or undefined,
|
|
* the original messages array is returned. Otherwise, the instructions are added to the messages
|
|
* array either at the beginning (default) or preserving the last message at the end.
|
|
*
|
|
* @param {Array} messages - An array of messages.
|
|
* @param {Object} instructions - An object containing instructions to be added to the messages.
|
|
* @param {boolean} [beforeLast=false] - If true, adds instructions before the last message; if false, adds at the beginning.
|
|
* @returns {Array} An array containing messages and instructions, or the original messages if instructions are empty.
|
|
*/
|
|
addInstructions(messages, instructions, beforeLast = false) {
|
|
if (!instructions || Object.keys(instructions).length === 0) {
|
|
return messages;
|
|
}
|
|
|
|
if (!beforeLast) {
|
|
return [instructions, ...messages];
|
|
}
|
|
|
|
// Legacy behavior: add instructions before the last message
|
|
const payload = [];
|
|
if (messages.length > 1) {
|
|
payload.push(...messages.slice(0, -1));
|
|
}
|
|
|
|
payload.push(instructions);
|
|
|
|
if (messages.length > 0) {
|
|
payload.push(messages[messages.length - 1]);
|
|
}
|
|
|
|
return payload;
|
|
}
|
|
|
|
concatenateMessages(messages) {
|
|
return messages.reduce((acc, message) => {
|
|
const nameOrRole = message.name ?? message.role;
|
|
return acc + `${nameOrRole}:\n${message.content}\n\n`;
|
|
}, '');
|
|
}
|
|
|
|
/**
|
|
* This method processes an array of messages and returns a context of messages that fit within a specified token limit.
|
|
* It iterates over the messages from newest to oldest, adding them to the context until the token limit is reached.
|
|
* If the token limit would be exceeded by adding a message, that message is not added to the context and remains in the original array.
|
|
* The method uses `push` and `pop` operations for efficient array manipulation, and reverses the context array at the end to maintain the original order of the messages.
|
|
*
|
|
* @param {Object} params
|
|
* @param {TMessage[]} params.messages - An array of messages, each with a `tokenCount` property. The messages should be ordered from oldest to newest.
|
|
* @param {number} [params.maxContextTokens] - The max number of tokens allowed in the context. If not provided, defaults to `this.maxContextTokens`.
|
|
* @param {{ role: 'system', content: text, tokenCount: number }} [params.instructions] - Instructions already added to the context at index 0.
|
|
* @returns {Promise<{
|
|
* context: TMessage[],
|
|
* remainingContextTokens: number,
|
|
* messagesToRefine: TMessage[],
|
|
* }>} An object with three properties: `context`, `remainingContextTokens`, and `messagesToRefine`.
|
|
* `context` is an array of messages that fit within the token limit.
|
|
* `remainingContextTokens` is the number of tokens remaining within the limit after adding the messages to the context.
|
|
* `messagesToRefine` is an array of messages that were not added to the context because they would have exceeded the token limit.
|
|
*/
|
|
  async getMessagesWithinTokenLimit({ messages: _messages, maxContextTokens, instructions }) {
    // Every reply is primed with <|start|>assistant<|message|>, so we
    // start with 3 tokens for the label after all messages have been counted.
    let currentTokenCount = 3;
    const instructionsTokenCount = instructions?.tokenCount ?? 0;
    let remainingContextTokens =
      (maxContextTokens ?? this.maxContextTokens) - instructionsTokenCount;
    const messages = [..._messages];

    const context = [];

    if (currentTokenCount < remainingContextTokens) {
      while (messages.length > 0 && currentTokenCount < remainingContextTokens) {
        if (messages.length === 1 && instructions) {
          break;
        }
        const poppedMessage = messages.pop();
        if (!poppedMessage) {
          break;
        }
        const { tokenCount } = poppedMessage;

        if (currentTokenCount + tokenCount <= remainingContextTokens) {
          context.push(poppedMessage);
          currentTokenCount += tokenCount;
        } else {
          messages.push(poppedMessage);
          break;
        }
      }
    }

    if (instructions) {
      context.push(_messages[0]);
      messages.shift();
    }

    const prunedMemory = messages;
    remainingContextTokens -= currentTokenCount;

    return {
      context: context.reverse(),
      remainingContextTokens,
      messagesToRefine: prunedMemory,
    };
  }

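  /*
   * Illustrative sketch of getMessagesWithinTokenLimit (not executed; the
   * token counts here are hypothetical). Given maxContextTokens = 20 and
   * messages with tokenCounts [8, 8, 8] ordered oldest to newest, the loop
   * pops from the newest end: 3 (assistant label priming) + 8 + 8 = 19 fits,
   * but adding the oldest message's 8 would exceed 20, so it is pushed back
   * and returned in `messagesToRefine`. The result is:
   *   context: the two newest messages (re-reversed to oldest-first order),
   *   remainingContextTokens: 20 - 19 = 1,
   *   messagesToRefine: [the oldest message].
   */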
  async sendMessage(message, opts = {}) {
    const appConfig = this.options.req?.config;
    /** @type {Promise<TMessage>} */
    let userMessagePromise;
    const { user, head, isEdited, conversationId, responseMessageId, saveOptions, userMessage } =
      await this.handleStartMethods(message, opts);

    if (opts.progressCallback) {
      opts.onProgress = opts.progressCallback.call(null, {
        ...(opts.progressOptions ?? {}),
        parentMessageId: userMessage.messageId,
        messageId: responseMessageId,
      });
    }

    const { editedContent } = opts;

    // Pushing to currentMessages may be unnecessary depending on how the subclass handles messages.
    // When this is an edit, all messages (both user and response) are already in currentMessages.
    if (isEdited) {
      let latestMessage = this.currentMessages[this.currentMessages.length - 1];
      if (!latestMessage) {
        latestMessage = {
          messageId: responseMessageId,
          conversationId,
          parentMessageId: userMessage.messageId,
          isCreatedByUser: false,
          model: this.modelOptions?.model ?? this.model,
          sender: this.sender,
        };
        this.currentMessages.push(userMessage, latestMessage);
      } else if (editedContent != null) {
        // Handle editedContent for content parts
        if (editedContent && latestMessage.content && Array.isArray(latestMessage.content)) {
          const { index, text, type } = editedContent;
          if (index >= 0 && index < latestMessage.content.length) {
            const contentPart = latestMessage.content[index];
            if (type === ContentTypes.THINK && contentPart.type === ContentTypes.THINK) {
              contentPart[ContentTypes.THINK] = text;
            } else if (type === ContentTypes.TEXT && contentPart.type === ContentTypes.TEXT) {
              contentPart[ContentTypes.TEXT] = text;
            }
          }
        }
      }
      this.continued = true;
    } else {
      this.currentMessages.push(userMessage);
    }

    /**
     * When the userMessage is pushed to currentMessages, the parentMessageId is the userMessage's messageId.
     * This only matters when buildMessages utilizes the parentMessageId, and may vary by implementation.
     */
    const parentMessageId = isEdited ? head : userMessage.messageId;
    this.parentMessageId = parentMessageId;
    let {
      prompt: payload,
      tokenCountMap,
      promptTokens,
    } = await this.buildMessages(
      this.currentMessages,
      parentMessageId,
      this.getBuildMessagesOptions(opts),
      opts,
    );

    if (tokenCountMap && tokenCountMap[userMessage.messageId]) {
      userMessage.tokenCount = tokenCountMap[userMessage.messageId];
      logger.debug('[BaseClient] userMessage', {
        messageId: userMessage.messageId,
        tokenCount: userMessage.tokenCount,
        conversationId: userMessage.conversationId,
      });
    }

    if (!isEdited && !this.skipSaveUserMessage) {
      const reqFiles = this.options.req?.body?.files;
      if (reqFiles && Array.isArray(this.options.attachments)) {
        const files = buildMessageFiles(reqFiles, this.options.attachments);
        if (files.length > 0) {
          userMessage.files = files;
        }
        delete userMessage.image_urls;
      }
      /**
       * Persist the user's manual skill picks onto the user message so the
       * frontend `SkillPills` component can render them in history
       * after reload. UI-only metadata — the runtime skill resolution
       * pipeline reads the top-level `req.body.manualSkills` separately.
       * Filter is defense-in-depth on top of Mongoose schema validation:
       * keeps the DB row free of empty/non-string entries even if a
       * crafted payload slips past schema checks upstream.
       */
      const rawManualSkills = this.options.req?.body?.manualSkills;
      if (Array.isArray(rawManualSkills) && rawManualSkills.length > 0) {
        const skills = rawManualSkills.filter((s) => typeof s === 'string' && s.length > 0);
        if (skills.length > 0) {
          userMessage.manualSkills = skills;
        }
      }
      /**
       * Persist the names of skills auto-primed this turn via `always-apply`
       * frontmatter so `SkillPills` can render pinned-variant badges
       * on the user bubble that survive reload and history render. Frozen
       * at turn time (not reconstructed from `Skill.alwaysApply` at render
       * time) because the flag is mutable — historical turns must keep
       * their audit trail even if an admin flips `alwaysApply` off later.
       */
      const alwaysApplySkillPrimes = this.options.agent?.alwaysApplySkillPrimes;
      if (Array.isArray(alwaysApplySkillPrimes) && alwaysApplySkillPrimes.length > 0) {
        const names = alwaysApplySkillPrimes
          .map((p) => p?.name)
          .filter((n) => typeof n === 'string' && n.length > 0);
        if (names.length > 0) {
          userMessage.alwaysAppliedSkills = names;
        }
      }
      userMessagePromise = this.saveMessageToDatabase(userMessage, saveOptions, user).catch(
        (err) => {
          logger.error('[BaseClient] Failed to save user message:', err);
          return {};
        },
      );
      this.savedMessageIds.add(userMessage.messageId);
      if (typeof opts?.getReqData === 'function') {
        opts.getReqData({
          userMessagePromise,
        });
      }
    }

    const balanceConfig = getBalanceConfig(appConfig);
    if (
      balanceConfig?.enabled &&
      supportsBalanceCheck[this.options.endpointType ?? this.options.endpoint]
    ) {
      await checkBalance(
        {
          req: this.options.req,
          res: this.options.res,
          txData: {
            user: this.user,
            tokenType: 'prompt',
            amount: promptTokens,
            endpoint: this.options.endpoint,
            model: this.modelOptions?.model ?? this.model,
            endpointTokenConfig: this.options.endpointTokenConfig,
          },
        },
        {
          logViolation,
          getMultiplier: db.getMultiplier,
          findBalanceByUser: db.findBalanceByUser,
          createAutoRefillTransaction: db.createAutoRefillTransaction,
          balanceConfig,
          upsertBalanceFields: db.upsertBalanceFields,
        },
      );
    }

    const { completion, metadata } = await this.sendCompletion(payload, opts);
    if (this.abortController) {
      this.abortController.requestCompleted = true;
    }

    /** @type {TMessage} */
    const responseMessage = {
      messageId: responseMessageId,
      conversationId,
      parentMessageId: userMessage.messageId,
      isCreatedByUser: false,
      isEdited,
      model: this.getResponseModel(),
      sender: this.sender,
      promptTokens,
      iconURL: this.options.iconURL,
      endpoint: this.options.endpoint,
      ...(this.metadata ?? {}),
      metadata: Object.keys(metadata ?? {}).length > 0 ? metadata : undefined,
    };

    if (typeof completion === 'string') {
      responseMessage.text = completion;
    } else if (
      Array.isArray(completion) &&
      (this.clientName === EModelEndpoint.agents ||
        isParamEndpoint(this.options.endpoint, this.options.endpointType))
    ) {
      responseMessage.text = '';

      if (!opts.editedContent || this.currentMessages.length === 0) {
        responseMessage.content = completion;
      } else {
        const latestMessage = this.currentMessages[this.currentMessages.length - 1];
        if (!latestMessage?.content) {
          responseMessage.content = completion;
        } else {
          const existingContent = [...latestMessage.content];
          const { type: editedType } = opts.editedContent;
          responseMessage.content = this.mergeEditedContent(
            existingContent,
            completion,
            editedType,
          );
        }
      }
    } else if (Array.isArray(completion)) {
      responseMessage.text = completion.join('');
    }

    if (tokenCountMap && this.recordTokenUsage && this.getTokenCountForResponse) {
      let completionTokens;

      /**
       * Metadata about input/output costs for the current message. The client
       * should provide a function to get the current stream usage metadata; if not,
       * use the legacy token estimations.
       * @type {StreamUsage | null} */
      const usage = this.getStreamUsage != null ? this.getStreamUsage() : null;

      if (usage != null && Number(usage[this.outputTokensKey]) > 0) {
        responseMessage.tokenCount = usage[this.outputTokensKey];
        completionTokens = responseMessage.tokenCount;
      } else {
        responseMessage.tokenCount = this.getTokenCountForResponse(responseMessage);
        completionTokens = responseMessage.tokenCount;
        await this.recordTokenUsage({
          usage,
          promptTokens,
          completionTokens,
          balance: balanceConfig,
          /** Note: When using agents, responseMessage.model is the agent ID, not the model */
          model: this.model,
          messageId: this.responseMessageId,
        });
      }

      logger.debug('[BaseClient] Response token usage', {
        messageId: responseMessage.messageId,
        model: responseMessage.model,
        promptTokens,
        completionTokens,
      });
    }

    if (userMessagePromise) {
      await userMessagePromise;
    }

    if (
      this.contextMeta?.calibrationRatio > 0 &&
      this.contextMeta.calibrationRatio !== 1 &&
      userMessage.tokenCount > 0
    ) {
      const calibrated = Math.round(userMessage.tokenCount * this.contextMeta.calibrationRatio);
      if (calibrated !== userMessage.tokenCount) {
        logger.debug('[BaseClient] Calibrated user message tokenCount', {
          messageId: userMessage.messageId,
          raw: userMessage.tokenCount,
          calibrated,
          ratio: this.contextMeta.calibrationRatio,
        });
        userMessage.tokenCount = calibrated;
        await this.updateMessageInDatabase({
          messageId: userMessage.messageId,
          tokenCount: calibrated,
        });
      }
    }

    if (this.artifactPromises) {
      responseMessage.attachments = (await Promise.all(this.artifactPromises)).filter((a) => a);
    }

    if (this.options.attachments) {
      try {
        saveOptions.files = this.options.attachments.map((attachment) => attachment.file_id);
      } catch (error) {
        logger.error('[BaseClient] Error mapping attachments for conversation', error);
      }
    }

    if (this.contextMeta) {
      responseMessage.contextMeta = this.contextMeta;
    }

    responseMessage.databasePromise = this.saveMessageToDatabase(
      responseMessage,
      saveOptions,
      user,
    );
    this.savedMessageIds.add(responseMessage.messageId);
    delete responseMessage.tokenCount;
    return responseMessage;
  }

  async loadHistory(conversationId, parentMessageId = null) {
    logger.debug('[BaseClient] Loading history:', { conversationId, parentMessageId });

    const messages = (await db.getMessages({ conversationId })) ?? [];

    if (messages.length === 0) {
      return [];
    }

    let mapMethod = null;
    if (this.getMessageMapMethod) {
      mapMethod = this.getMessageMapMethod();
    }

    let _messages = this.constructor.getMessagesForConversation({
      messages,
      parentMessageId,
      mapMethod,
    });

    _messages = await this.addPreviousAttachments(_messages);

    if (!this.shouldSummarize) {
      return _messages;
    }

    for (let i = _messages.length - 1; i >= 0; i--) {
      const msg = _messages[i];
      if (!msg) {
        continue;
      }

      const summaryBlock = BaseClient.findSummaryContentBlock(msg);
      if (summaryBlock) {
        this.previous_summary = {
          ...msg,
          summary: BaseClient.getSummaryText(summaryBlock),
          summaryTokenCount: summaryBlock.tokenCount,
        };
        break;
      }

      if (msg.summary) {
        this.previous_summary = msg;
        break;
      }
    }

    if (this.previous_summary) {
      const { messageId, summary, tokenCount, summaryTokenCount } = this.previous_summary;
      logger.debug('[BaseClient] Previous summary:', {
        messageId,
        summary,
        tokenCount,
        summaryTokenCount,
      });
    }

    return _messages;
  }

  /**
   * Save a message to the database.
   * @param {TMessage} message
   * @param {Partial<TConversation>} endpointOptions
   * @param {string | null} user
   */
  async saveMessageToDatabase(message, endpointOptions, user = null) {
    // Snapshot options before any await; disposeClient may set client.options = null
    // while this method is suspended at an I/O boundary, but the local reference
    // remains valid (disposeClient nulls the property, not the object itself).
    const options = this.options;
    if (!options) {
      logger.error('[BaseClient] saveMessageToDatabase: client disposed before save, skipping');
      return {};
    }

    if (this.user && user !== this.user) {
      throw new Error('User mismatch.');
    }

    const hasAddedConvo = options?.req?.body?.addedConvo != null;
    const reqCtx = {
      userId: options?.req?.user?.id,
      isTemporary: options?.req?.body?.isTemporary,
      interfaceConfig: options?.req?.config?.interfaceConfig,
    };
    const savedMessage = await db.saveMessage(
      reqCtx,
      {
        ...message,
        endpoint: options.endpoint,
        unfinished: false,
        user,
        ...(hasAddedConvo && { addedConvo: true }),
      },
      { context: 'api/app/clients/BaseClient.js - saveMessageToDatabase #saveMessage' },
    );

    if (this.skipSaveConvo) {
      return { message: savedMessage };
    }

    const fieldsToKeep = {
      conversationId: message.conversationId,
      endpoint: options.endpoint,
      endpointType: options.endpointType,
      ...endpointOptions,
    };
    const conversationCreatedAt = options?.req?.conversationCreatedAt;
    const createdAtOnInsert =
      conversationCreatedAt != null ? new Date(conversationCreatedAt) : undefined;
    const validCreatedAtOnInsert =
      createdAtOnInsert && !Number.isNaN(createdAtOnInsert.getTime())
        ? createdAtOnInsert
        : undefined;

    const req = options?.req;
    const skippedExistingConvoLookup = this.fetchedConvo === true;
    const hasResolvedConversation =
      req != null && Object.prototype.hasOwnProperty.call(req, 'resolvedConversation');
    let existingConvo = null;
    if (!skippedExistingConvoLookup && hasResolvedConversation) {
      existingConvo = req.resolvedConversation;
    } else if (!skippedExistingConvoLookup) {
      existingConvo = await db.getConvo(req?.user?.id, message.conversationId);
    }
    if (hasResolvedConversation) {
      delete req.resolvedConversation;
    }
    const shouldSetCreatedAtOnInsert = !skippedExistingConvoLookup && existingConvo == null;

    const unsetFields = {};
    const exceptions = new Set(['spec', 'iconURL']);
    const hasNonEphemeralAgent =
      isAgentsEndpoint(options.endpoint) &&
      endpointOptions?.agent_id &&
      !isEphemeralAgentId(endpointOptions.agent_id);
    if (hasNonEphemeralAgent) {
      exceptions.add('model');
    }
    if (existingConvo != null) {
      this.fetchedConvo = true;
      for (const key in existingConvo) {
        if (!key) {
          continue;
        }
        if (excludedKeys.has(key) && !exceptions.has(key)) {
          continue;
        }

        if (endpointOptions?.[key] === undefined) {
          unsetFields[key] = 1;
        }
      }
    }

    const conversation = await db.saveConvo(reqCtx, fieldsToKeep, {
      context: 'api/app/clients/BaseClient.js - saveMessageToDatabase #saveConvo',
      unsetFields,
      createdAtOnInsert: shouldSetCreatedAtOnInsert ? validCreatedAtOnInsert : undefined,
    });

    return { message: savedMessage, conversation };
  }

  /**
   * Update a message in the database.
   * @param {Partial<TMessage>} message
   */
  async updateMessageInDatabase(message) {
    await db.updateMessage(this.options?.req?.user?.id, message);
  }

  /** Extracts text from a summary block (handles both the legacy `text` field and the new `content` array format). */
  static getSummaryText(summaryBlock) {
    if (Array.isArray(summaryBlock.content)) {
      return summaryBlock.content.map((b) => b.text ?? '').join('');
    }
    if (typeof summaryBlock.content === 'string') {
      return summaryBlock.content;
    }
    return summaryBlock.text ?? '';
  }

  /** Finds the last summary content block in a message's content array (last-summary-wins). */
  static findSummaryContentBlock(message) {
    if (!Array.isArray(message?.content)) {
      return null;
    }
    let lastSummary = null;
    for (const part of message.content) {
      if (
        part?.type === ContentTypes.SUMMARY &&
        BaseClient.getSummaryText(part).trim().length > 0
      ) {
        lastSummary = part;
      }
    }
    return lastSummary;
  }

  /**
   * Iterate through messages, building an array based on the parentMessageId.
   *
   * This function constructs a conversation thread by traversing messages from a given parentMessageId up to the root message.
   * It handles cyclic references by ensuring that a message is not processed more than once.
   * If the 'summary' option is set to true and a message has a 'summary' property:
   * - The message's 'role' is set to 'system'.
   * - The message's 'text' is set to its 'summary'.
   * - If the message has a 'summaryTokenCount', the message's 'tokenCount' is set to 'summaryTokenCount'.
   * The traversal stops at the message with the 'summary' property.
   *
   * Each message object should have an 'id' or 'messageId' property and may have a 'parentMessageId' property.
   * The 'parentMessageId' is the ID of the message that the current message is a reply to.
   * If 'parentMessageId' is not present, null, or is Constants.NO_PARENT,
   * the message is considered a root message.
   *
   * @param {Object} options - The options for the function.
   * @param {TMessage[]} options.messages - An array of message objects. Each object should have either an 'id' or 'messageId' property, and may have a 'parentMessageId' property.
   * @param {string} options.parentMessageId - The ID of the parent message to start the traversal from.
   * @param {Function} [options.mapMethod] - An optional function to map over the ordered messages. Applied conditionally based on mapCondition.
   * @param {(message: TMessage) => boolean} [options.mapCondition] - An optional function to determine whether mapMethod should be applied to a given message. If not provided and mapMethod is set, mapMethod applies to all messages.
   * @param {boolean} [options.summary=false] - If set to true, the traversal modifies messages with 'summary' and 'summaryTokenCount' properties and stops at the message with a 'summary' property.
   * @returns {TMessage[]} An array containing the messages in the order they should be displayed, starting with the most recent message with a 'summary' property if the 'summary' option is true, and ending with the message identified by 'parentMessageId'.
   */
  static getMessagesForConversation({
    messages,
    parentMessageId,
    mapMethod = null,
    mapCondition = null,
    summary = false,
  }) {
    if (!messages || messages.length === 0) {
      return [];
    }

    const orderedMessages = [];
    let currentMessageId = parentMessageId;
    const visitedMessageIds = new Set();

    while (currentMessageId) {
      if (visitedMessageIds.has(currentMessageId)) {
        break;
      }
      const message = messages.find((msg) => {
        const messageId = msg.messageId ?? msg.id;
        return messageId === currentMessageId;
      });

      visitedMessageIds.add(currentMessageId);

      if (!message) {
        break;
      }

      let resolved = message;
      let hasSummary = false;
      if (summary) {
        const summaryBlock = BaseClient.findSummaryContentBlock(message);
        if (summaryBlock) {
          const summaryText = BaseClient.getSummaryText(summaryBlock);
          resolved = {
            ...message,
            role: 'system',
            content: [{ type: ContentTypes.TEXT, text: summaryText }],
            tokenCount: summaryBlock.tokenCount,
          };
          hasSummary = true;
        } else if (message.summary) {
          resolved = {
            ...message,
            role: 'system',
            content: [{ type: ContentTypes.TEXT, text: message.summary }],
            tokenCount: message.summaryTokenCount ?? message.tokenCount,
          };
          hasSummary = true;
        }
      }

      const shouldMap = mapMethod != null && (mapCondition != null ? mapCondition(resolved) : true);
      const processedMessage = shouldMap ? mapMethod(resolved) : resolved;
      orderedMessages.push(processedMessage);

      if (hasSummary) {
        break;
      }

      currentMessageId =
        message.parentMessageId === Constants.NO_PARENT ? null : message.parentMessageId;
    }

    orderedMessages.reverse();
    return orderedMessages;
  }

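  /*
   * Illustrative sketch of getMessagesForConversation (not executed; the
   * message IDs here are hypothetical). For a branched thread
   *   A (root) -- B -- C
   *          \-- D
   * calling getMessagesForConversation({ messages, parentMessageId: 'C' })
   * walks C -> B -> A via parentMessageId and returns [A, B, C] after the
   * final reverse. Message D is excluded because it sits on a sibling branch,
   * and the visitedMessageIds set guards against cyclic parent references.
   */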
  /**
   * Algorithm adapted from "6. Counting tokens for chat API calls" of
   * https://github.com/openai/openai-cookbook/blob/main/examples/How_to_count_tokens_with_tiktoken.ipynb
   *
   * An additional 3 tokens need to be added for assistant label priming after all messages have been counted.
   * In our implementation, this is accounted for in the getMessagesWithinTokenLimit method.
   *
   * The content parts example was adapted from the following example:
   * https://github.com/openai/openai-cookbook/pull/881/files
   *
   * Note: image token calculation is to be done elsewhere where we have access to the image metadata
   *
   * @param {Object} message
   */
  getTokenCountForMessage(message) {
    // Note: gpt-3.5-turbo and gpt-4 may update over time. Use default for these as well as for unknown models
    let tokensPerMessage = 3;
    let tokensPerName = 1;
    const model = this.modelOptions?.model ?? this.model;

    if (model === 'gpt-3.5-turbo-0301') {
      tokensPerMessage = 4;
      tokensPerName = -1;
    }

    const processValue = (value) => {
      if (Array.isArray(value)) {
        for (let item of value) {
          if (
            !item ||
            !item.type ||
            item.type === ContentTypes.THINK ||
            item.type === ContentTypes.ERROR ||
            item.type === ContentTypes.IMAGE_URL
          ) {
            continue;
          }

          if (item.type === ContentTypes.TOOL_CALL && item.tool_call != null) {
            const toolName = item.tool_call?.name;
            if (typeof toolName === 'string' && toolName.length > 0) {
              numTokens += this.getTokenCount(toolName);
            }

            const args = item.tool_call?.args;
            if (typeof args === 'string' && args.length > 0) {
              numTokens += this.getTokenCount(args);
            }

            const output = item.tool_call?.output;
            if (typeof output === 'string' && output.length > 0) {
              numTokens += this.getTokenCount(output);
            }
            continue;
          }

          const nestedValue = item[item.type];

          if (!nestedValue) {
            continue;
          }

          processValue(nestedValue);
        }
      } else if (typeof value === 'string') {
        numTokens += this.getTokenCount(value);
      } else if (typeof value === 'number') {
        numTokens += this.getTokenCount(value.toString());
      } else if (typeof value === 'boolean') {
        numTokens += this.getTokenCount(value.toString());
      }
    };

    let numTokens = tokensPerMessage;
    for (let [key, value] of Object.entries(message)) {
      processValue(value);

      if (key === 'name') {
        numTokens += tokensPerName;
      }
    }
    return numTokens;
  }

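  /*
   * Illustrative sketch of getTokenCountForMessage (not executed; the message
   * shape here is hypothetical). With the default tokensPerMessage = 3 and
   * tokensPerName = 1, a message like
   *   { role: 'user', content: 'hi', name: 'alice' }
   * counts as 3 + tokens('user') + tokens('hi') + tokens('alice') + 1,
   * since every string value is tokenized and the 'name' key adds the
   * per-name overhead. Actual totals depend on the tokenizer behind
   * this.getTokenCount.
   */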
  /**
   * Merges completion content with existing content when editing TEXT or THINK types
   * @param {Array} existingContent - The existing content array
   * @param {Array} newCompletion - The new completion content
   * @param {string} editedType - The type of content being edited
   * @returns {Array} The merged content array
   */
  mergeEditedContent(existingContent, newCompletion, editedType) {
    if (!newCompletion.length) {
      return existingContent.concat(newCompletion);
    }

    if (editedType !== ContentTypes.TEXT && editedType !== ContentTypes.THINK) {
      return existingContent.concat(newCompletion);
    }

    const lastIndex = existingContent.length - 1;
    const lastExisting = existingContent[lastIndex];
    const firstNew = newCompletion[0];

    if (lastExisting?.type !== firstNew?.type || firstNew?.type !== editedType) {
      return existingContent.concat(newCompletion);
    }

    // editedType is TEXT or THINK here; both merge by concatenating the boundary parts
    const mergedContent = [...existingContent];
    mergedContent[lastIndex] = {
      ...mergedContent[lastIndex],
      [editedType]: (mergedContent[lastIndex][editedType] || '') + (firstNew[editedType] || ''),
    };

    // Add remaining completion items
    return mergedContent.concat(newCompletion.slice(1));
  }

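  /*
   * Illustrative sketch of mergeEditedContent (not executed; the part values
   * here are hypothetical). With existingContent ending in
   *   { type: 'text', text: 'Hello' }
   * and newCompletion starting with
   *   { type: 'text', text: ' world' },
   * editing type 'text' merges the boundary parts into
   *   { type: 'text', text: 'Hello world' }
   * and appends any remaining completion parts. If the boundary types do not
   * match the edited type, the method falls back to a plain concat.
   */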
  async sendPayload(payload, opts = {}) {
    if (opts && typeof opts === 'object') {
      this.setOptions(opts);
    }

    return await this.sendCompletion(payload, opts);
  }

  async addDocuments(message, attachments) {
    const documentResult = await encodeAndFormatDocuments(
      this.options.req,
      attachments,
      {
        provider: this.options.agent?.provider ?? this.options.endpoint,
        endpoint: this.options.agent?.endpoint ?? this.options.endpoint,
        useResponsesApi: this.options.agent?.model_parameters?.useResponsesApi,
        model: this.modelOptions?.model ?? this.model,
      },
      getStrategyFunctions,
    );
    message.documents =
      documentResult.documents && documentResult.documents.length
        ? documentResult.documents
        : undefined;
    return documentResult.files;
  }

  async addVideos(message, attachments) {
    const videoResult = await encodeAndFormatVideos(
      this.options.req,
      attachments,
      {
        provider: this.options.agent?.provider ?? this.options.endpoint,
        endpoint: this.options.agent?.endpoint ?? this.options.endpoint,
      },
      getStrategyFunctions,
    );
    message.videos =
      videoResult.videos && videoResult.videos.length ? videoResult.videos : undefined;
    return videoResult.files;
  }

  async addAudios(message, attachments) {
    const audioResult = await encodeAndFormatAudios(
      this.options.req,
      attachments,
      {
        provider: this.options.agent?.provider ?? this.options.endpoint,
        endpoint: this.options.agent?.endpoint ?? this.options.endpoint,
      },
      getStrategyFunctions,
    );
    message.audios =
      audioResult.audios && audioResult.audios.length ? audioResult.audios : undefined;
    return audioResult.files;
  }

  /**
   * Extracts text context from attachments and sets it on the message.
   * This handles text that was already extracted from files (OCR, transcriptions, document text, etc.)
   * @param {TMessage} message - The message to add context to
   * @param {MongoFile[]} attachments - Array of file attachments
   * @returns {Promise<void>}
   */
  async addFileContextToMessage(message, attachments) {
    const fileContext = await extractFileContext({
      attachments,
      req: this.options?.req,
      tokenCountFn: (text) => countTokens(text),
    });

    if (fileContext) {
      message.fileContext = fileContext;
    }
  }

  async processAttachments(message, attachments) {
    const categorizedAttachments = {
      images: [],
      videos: [],
      audios: [],
      documents: [],
    };

    const allFiles = [];

    const provider = this.options.agent?.provider ?? this.options.endpoint;
    const isBedrock = provider === EModelEndpoint.bedrock;

    if (!this._mergedFileConfig && this.options.req?.config?.fileConfig) {
      this._mergedFileConfig = mergeFileConfig(this.options.req.config.fileConfig);
      const endpoint = this.options.agent?.endpoint ?? this.options.endpoint;
      this._endpointFileConfig = getEndpointFileConfig({
        fileConfig: this._mergedFileConfig,
        endpoint,
        endpointType: this.options.endpointType,
      });
    }

    for (const file of attachments) {
      /** @type {FileSources} */
      const source = file.source ?? FileSources.local;
      if (source === FileSources.text) {
        allFiles.push(file);
        continue;
      }
      if (
        file.embedded === true ||
        file.metadata?.codeEnvRef != null ||
        file.metadata?.fileIdentifier != null
      ) {
        allFiles.push(file);
        continue;
      }

      if (file.type?.startsWith('image/')) {
        categorizedAttachments.images.push(file);
      } else if (file.type === 'application/pdf') {
        categorizedAttachments.documents.push(file);
        allFiles.push(file);
      } else if (isBedrock && isBedrockDocumentType(file.type)) {
        categorizedAttachments.documents.push(file);
        allFiles.push(file);
      } else if (file.type?.startsWith('video/')) {
        categorizedAttachments.videos.push(file);
        allFiles.push(file);
      } else if (file.type?.startsWith('audio/')) {
        categorizedAttachments.audios.push(file);
        allFiles.push(file);
      } else if (
        file.type &&
        this._mergedFileConfig &&
        this._endpointFileConfig?.supportedMimeTypes &&
        this._mergedFileConfig.checkType(file.type, this._endpointFileConfig.supportedMimeTypes)
      ) {
        categorizedAttachments.documents.push(file);
        allFiles.push(file);
      }
    }

    const [imageFiles] = await Promise.all([
      categorizedAttachments.images.length > 0
        ? this.addImageURLs(message, categorizedAttachments.images)
        : Promise.resolve([]),
      categorizedAttachments.documents.length > 0
        ? this.addDocuments(message, categorizedAttachments.documents)
        : Promise.resolve([]),
      categorizedAttachments.videos.length > 0
        ? this.addVideos(message, categorizedAttachments.videos)
        : Promise.resolve([]),
      categorizedAttachments.audios.length > 0
        ? this.addAudios(message, categorizedAttachments.audios)
        : Promise.resolve([]),
    ]);

    allFiles.push(...imageFiles);

    const seenFileIds = new Set();
    const uniqueFiles = [];

    for (const file of allFiles) {
      if (file.file_id && !seenFileIds.has(file.file_id)) {
        seenFileIds.add(file.file_id);
        uniqueFiles.push(file);
      } else if (!file.file_id) {
        uniqueFiles.push(file);
      }
    }

    return uniqueFiles;
  }

  /**
   * @param {TMessage[]} _messages
   * @returns {Promise<TMessage[]>}
   */
  async addPreviousAttachments(_messages) {
    if (!this.options.resendFiles) {
      return _messages;
    }

    const seen = new Set();
    const attachmentsProcessed =
      this.options.attachments && !(this.options.attachments instanceof Promise);
    if (attachmentsProcessed) {
      for (const attachment of this.options.attachments) {
        seen.add(attachment.file_id);
      }
    }

    /**
     *
     * @param {TMessage} message
     */
    const processMessage = async (message) => {
      if (!this.message_file_map) {
        /** @type {Record<string, MongoFile[]>} */
        this.message_file_map = {};
      }

      const fileIds = [];
      for (const file of message.files) {
        if (seen.has(file.file_id)) {
          continue;
        }
        fileIds.push(file.file_id);
        seen.add(file.file_id);
      }

      if (fileIds.length === 0) {
        return message;
      }

      const files = await db.getFiles(
        {
          file_id: { $in: fileIds },
        },
        {},
        {},
      );

      await this.addFileContextToMessage(message, files);
      await this.processAttachments(message, files);

      this.message_file_map[message.messageId] = files;
      return message;
    };

    const promises = [];

    for (const message of _messages) {
      if (!message.files) {
        promises.push(message);
        continue;
      }

      promises.push(processMessage(message));
    }

    const messages = await Promise.all(promises);

    this.checkVisionRequest(Object.values(this.message_file_map ?? {}).flat());
    return messages;
  }
}

module.exports = BaseClient;