mirror of
https://github.com/danny-avila/LibreChat.git
synced 2026-07-03 12:54:01 +00:00
🎣 fix: Surface Resumable Stream Start Errors (#14072)
* fix: surface resumable stream start errors * style: format stream start error check * fix: handle crlf stream start errors * fix: parse only stream error event data
This commit is contained in:
parent
8f0756ed9e
commit
f1ea4159af
2 changed files with 179 additions and 19 deletions
|
|
@ -999,6 +999,92 @@ describe('useResumableSSE - 404 error path', () => {
|
|||
unmount();
|
||||
});
|
||||
|
||||
it('surfaces SSE error bodies returned while starting generation', async () => {
|
||||
(request.post as jest.Mock).mockResolvedValueOnce(
|
||||
'event: error\ndata: {"text":"No model spec selected"}\n\n',
|
||||
);
|
||||
const submission = buildSubmission();
|
||||
const chatHelpers = buildChatHelpers();
|
||||
|
||||
const { unmount } = renderHook(() => useResumableSSE(submission, chatHelpers));
|
||||
|
||||
await waitFor(() => {
|
||||
expect(mockSetSubmission).toHaveBeenCalledWith(null);
|
||||
});
|
||||
|
||||
expect(mockSSEInstances).toHaveLength(0);
|
||||
expect(mockErrorHandler).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
data: {
|
||||
text: 'No model spec selected',
|
||||
metadata: { streamStartFailed: true },
|
||||
},
|
||||
submission,
|
||||
}),
|
||||
);
|
||||
expect(mockSetIsSubmitting).toHaveBeenCalledWith(false);
|
||||
expect(mockSetShowStopButton).toHaveBeenCalledWith(false);
|
||||
unmount();
|
||||
});
|
||||
|
||||
it('surfaces CRLF SSE error bodies returned while starting generation', async () => {
|
||||
(request.post as jest.Mock).mockResolvedValueOnce(
|
||||
'event: error\r\ndata: {"text":"No model spec selected"}\r\n\r\n',
|
||||
);
|
||||
const submission = buildSubmission();
|
||||
const chatHelpers = buildChatHelpers();
|
||||
|
||||
const { unmount } = renderHook(() => useResumableSSE(submission, chatHelpers));
|
||||
|
||||
await waitFor(() => {
|
||||
expect(mockSetSubmission).toHaveBeenCalledWith(null);
|
||||
});
|
||||
|
||||
expect(mockErrorHandler).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
data: {
|
||||
text: 'No model spec selected',
|
||||
metadata: { streamStartFailed: true },
|
||||
},
|
||||
submission,
|
||||
}),
|
||||
);
|
||||
unmount();
|
||||
});
|
||||
|
||||
it('uses only the error event data from multi-event SSE start failures', async () => {
|
||||
(request.post as jest.Mock).mockResolvedValueOnce(
|
||||
[
|
||||
'event: message',
|
||||
'data: {"created":true,"message":{"messageId":"msg-1"}}',
|
||||
'',
|
||||
'event: error',
|
||||
'data: {"text":"Request was blocked"}',
|
||||
'',
|
||||
'',
|
||||
].join('\n'),
|
||||
);
|
||||
const submission = buildSubmission();
|
||||
const chatHelpers = buildChatHelpers();
|
||||
|
||||
const { unmount } = renderHook(() => useResumableSSE(submission, chatHelpers));
|
||||
|
||||
await waitFor(() => {
|
||||
expect(mockSetSubmission).toHaveBeenCalledWith(null);
|
||||
});
|
||||
|
||||
expect(mockErrorHandler).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
data: {
|
||||
text: 'Request was blocked',
|
||||
metadata: { streamStartFailed: true },
|
||||
},
|
||||
submission,
|
||||
}),
|
||||
);
|
||||
unmount();
|
||||
});
|
||||
|
||||
it('replays title events from resume state sync', async () => {
|
||||
const submission = buildSubmission();
|
||||
const chatHelpers = buildChatHelpers();
|
||||
|
|
|
|||
|
|
@ -54,14 +54,6 @@ type ChatHelpers = Pick<
|
|||
'setMessages' | 'getMessages' | 'setConversation' | 'setIsSubmitting' | 'newConversation'
|
||||
>;
|
||||
|
||||
const getStreamStartFailureData = (errorData?: Record<string, unknown>): TResData =>
|
||||
({
|
||||
text: errorData
|
||||
? JSON.stringify(errorData)
|
||||
: 'Error connecting to server, try refreshing the page.',
|
||||
metadata: markStreamStartFailedMetadata(),
|
||||
}) as unknown as TResData;
|
||||
|
||||
const MAX_RETRIES = 5;
|
||||
const START_GENERATION_NETWORK_RETRIES = 3;
|
||||
const START_GENERATION_READINESS_TIMEOUT_MS = 120000;
|
||||
|
|
@ -71,9 +63,7 @@ type StartGenerationError = {
|
|||
code?: string;
|
||||
response?: {
|
||||
status?: number;
|
||||
data?: {
|
||||
code?: string;
|
||||
};
|
||||
data?: unknown;
|
||||
headers?: Record<string, string | number | string[] | undefined>;
|
||||
};
|
||||
};
|
||||
|
|
@ -81,6 +71,82 @@ type StartGenerationError = {
|
|||
const toStartGenerationError = (error: unknown): StartGenerationError | undefined =>
|
||||
error != null && typeof error === 'object' ? (error as StartGenerationError) : undefined;
|
||||
|
||||
const getStartGenerationStreamId = (data: unknown): string | null => {
|
||||
if (data == null || typeof data !== 'object' || !('streamId' in data)) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const streamId = (data as { streamId?: unknown }).streamId;
|
||||
return typeof streamId === 'string' && streamId.length > 0 ? streamId : null;
|
||||
};
|
||||
|
||||
const parseSSEErrorData = (body: string): unknown | null => {
|
||||
const blocks = body.split(/\r?\n\r?\n/);
|
||||
for (const block of blocks) {
|
||||
const lines = block.split(/\r?\n/);
|
||||
const event = lines
|
||||
.find((line) => line.startsWith('event:'))
|
||||
?.slice('event:'.length)
|
||||
.trim();
|
||||
if (event !== 'error') {
|
||||
continue;
|
||||
}
|
||||
|
||||
const data = lines
|
||||
.filter((line) => line.startsWith('data:'))
|
||||
.map((line) => line.slice('data:'.length).trimStart())
|
||||
.join('\n')
|
||||
.trim();
|
||||
|
||||
if (!data) {
|
||||
return null;
|
||||
}
|
||||
|
||||
try {
|
||||
return JSON.parse(data);
|
||||
} catch {
|
||||
return data;
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
};
|
||||
|
||||
const getSSEErrorText = (payload: unknown): string | null => {
|
||||
if (typeof payload === 'string') {
|
||||
return payload;
|
||||
}
|
||||
|
||||
if (payload == null || typeof payload !== 'object') {
|
||||
return null;
|
||||
}
|
||||
|
||||
const record = payload as Record<string, unknown>;
|
||||
const text = record.text ?? record.message ?? record.error;
|
||||
return typeof text === 'string' && text.length > 0 ? text : null;
|
||||
};
|
||||
|
||||
const getStreamStartFailureText = (errorData?: unknown): string => {
|
||||
if (typeof errorData === 'string') {
|
||||
const sseErrorData = parseSSEErrorData(errorData);
|
||||
if (sseErrorData != null) {
|
||||
return getSSEErrorText(sseErrorData) ?? JSON.stringify(sseErrorData);
|
||||
}
|
||||
|
||||
return errorData || 'Error connecting to server, try refreshing the page.';
|
||||
}
|
||||
|
||||
return errorData
|
||||
? JSON.stringify(errorData)
|
||||
: 'Error connecting to server, try refreshing the page.';
|
||||
};
|
||||
|
||||
const getStreamStartFailureData = (errorData?: unknown): TResData =>
|
||||
({
|
||||
text: getStreamStartFailureText(errorData),
|
||||
metadata: markStreamStartFailedMetadata(),
|
||||
}) as unknown as TResData;
|
||||
|
||||
const isRetryableNetworkError = (error: unknown) => {
|
||||
if (!(error instanceof Error)) {
|
||||
return false;
|
||||
|
|
@ -92,9 +158,12 @@ const isRetryableNetworkError = (error: unknown) => {
|
|||
|
||||
const isServerNotReadyError = (error: unknown) => {
|
||||
const candidate = toStartGenerationError(error);
|
||||
return (
|
||||
candidate?.response?.status === 503 && candidate.response?.data?.code === SERVER_NOT_READY_CODE
|
||||
);
|
||||
const data = candidate?.response?.data;
|
||||
const code =
|
||||
data != null && typeof data === 'object' && 'code' in data
|
||||
? (data as { code?: unknown }).code
|
||||
: undefined;
|
||||
return candidate?.response?.status === 503 && code === SERVER_NOT_READY_CODE;
|
||||
};
|
||||
|
||||
const getRetryAfterDelay = (error: unknown, fallbackDelay: number) => {
|
||||
|
|
@ -1216,12 +1285,18 @@ export default function useResumableSSE(
|
|||
requestAttempts += 1;
|
||||
try {
|
||||
// Use request.post which handles auth token refresh via axios interceptors
|
||||
const data = (await request.post(url, payload)) as { streamId: string };
|
||||
const data = await request.post(url, payload);
|
||||
if (signal?.aborted) {
|
||||
return null;
|
||||
}
|
||||
logger.log('ResumableSSE', 'Generation started:', { streamId: data.streamId });
|
||||
return data.streamId;
|
||||
const streamId = getStartGenerationStreamId(data);
|
||||
if (streamId) {
|
||||
logger.log('ResumableSSE', 'Generation started:', { streamId });
|
||||
return streamId;
|
||||
}
|
||||
|
||||
lastError = { response: { data } };
|
||||
break;
|
||||
} catch (error) {
|
||||
if (signal?.aborted) {
|
||||
return null;
|
||||
|
|
@ -1269,8 +1344,7 @@ export default function useResumableSSE(
|
|||
|
||||
logger.error('ResumableSSE', 'Error starting generation:', lastError);
|
||||
|
||||
const axiosError = lastError as { response?: { data?: Record<string, unknown> } };
|
||||
const errorData = axiosError?.response?.data;
|
||||
const errorData = toStartGenerationError(lastError)?.response?.data;
|
||||
errorHandler({
|
||||
data: getStreamStartFailureData(errorData),
|
||||
submission: currentSubmission as EventSubmission,
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue