Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 21 additions & 32 deletions src/lib/BetaMessageStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -603,83 +603,72 @@ export class BetaMessageStream<ParsedT = null> implements AsyncIterable<BetaMess

return snapshot;
case 'content_block_start':
snapshot.content.push(event.content_block);
snapshot.content.push({ ...event.content_block });
return snapshot;
case 'content_block_delta': {
const snapshotContent = snapshot.content.at(event.index);

switch (event.delta.type) {
case 'text_delta': {
if (snapshotContent?.type === 'text') {
snapshot.content[event.index] = {
...snapshotContent,
text: (snapshotContent.text || '') + event.delta.text,
};
snapshotContent.text = (snapshotContent.text || '') + event.delta.text;
}
break;
}
case 'citations_delta': {
if (snapshotContent?.type === 'text') {
snapshot.content[event.index] = {
...snapshotContent,
citations: [...(snapshotContent.citations ?? []), event.delta.citation],
};
if (!snapshotContent.citations) {
snapshotContent.citations = [];
}
snapshotContent.citations.push(event.delta.citation);
}
break;
}
case 'input_json_delta': {
if (snapshotContent && tracksToolInput(snapshotContent)) {
// we need to keep track of the raw JSON string as well so that we can
// re-parse it for each delta, for now we just store it as an untyped
// non-enumerable property on the snapshot
// Track the raw JSON string as a non-enumerable property so it's
// hidden from JSON.stringify/serialization but available for re-parsing
let jsonBuf = (snapshotContent as any)[JSON_BUF_PROPERTY] || '';
jsonBuf += event.delta.partial_json;

const newContent = { ...snapshotContent };
Object.defineProperty(newContent, JSON_BUF_PROPERTY, {
value: jsonBuf,
enumerable: false,
writable: true,
});
if (!(JSON_BUF_PROPERTY in snapshotContent)) {
Object.defineProperty(snapshotContent, JSON_BUF_PROPERTY, {
value: jsonBuf,
enumerable: false,
writable: true,
});
} else {
(snapshotContent as any)[JSON_BUF_PROPERTY] = jsonBuf;
}

if (jsonBuf) {
try {
newContent.input = partialParse(jsonBuf);
(snapshotContent as any).input = partialParse(jsonBuf);
} catch (err) {
const error = new AnthropicError(
`Unable to parse tool parameter JSON from model. Please retry your request or adjust your prompt. Error: ${err}. JSON: ${jsonBuf}`,
);
this.#handleError(error);
}
}
snapshot.content[event.index] = newContent;
}
break;
}
case 'thinking_delta': {
if (snapshotContent?.type === 'thinking') {
snapshot.content[event.index] = {
...snapshotContent,
thinking: snapshotContent.thinking + event.delta.thinking,
};
snapshotContent.thinking = snapshotContent.thinking + event.delta.thinking;
}
break;
}
case 'signature_delta': {
if (snapshotContent?.type === 'thinking') {
snapshot.content[event.index] = {
...snapshotContent,
signature: event.delta.signature,
};
snapshotContent.signature = event.delta.signature;
}
break;
}
case 'compaction_delta': {
if (snapshotContent?.type === 'compaction') {
snapshot.content[event.index] = {
...snapshotContent,
content: (snapshotContent.content || '') + event.delta.content,
};
snapshotContent.content = (snapshotContent.content || '') + event.delta.content;
}
break;
}
Expand Down
46 changes: 19 additions & 27 deletions src/lib/MessageStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -605,59 +605,51 @@ export class MessageStream<ParsedT = null> implements AsyncIterable<MessageStrea
switch (event.delta.type) {
case 'text_delta': {
if (snapshotContent?.type === 'text') {
snapshot.content[event.index] = {
...snapshotContent,
text: (snapshotContent.text || '') + event.delta.text,
};
snapshotContent.text = (snapshotContent.text || '') + event.delta.text;
}
break;
}
case 'citations_delta': {
if (snapshotContent?.type === 'text') {
snapshot.content[event.index] = {
...snapshotContent,
citations: [...(snapshotContent.citations ?? []), event.delta.citation],
};
if (!snapshotContent.citations) {
snapshotContent.citations = [];
}
snapshotContent.citations.push(event.delta.citation);
}
break;
}
case 'input_json_delta': {
if (snapshotContent && tracksToolInput(snapshotContent)) {
// we need to keep track of the raw JSON string as well so that we can
// re-parse it for each delta, for now we just store it as an untyped
// non-enumerable property on the snapshot
// Track the raw JSON string as a non-enumerable property so it's
// hidden from JSON.stringify/serialization but available for re-parsing
let jsonBuf = (snapshotContent as any)[JSON_BUF_PROPERTY] || '';
jsonBuf += event.delta.partial_json;

const newContent = { ...snapshotContent };
Object.defineProperty(newContent, JSON_BUF_PROPERTY, {
value: jsonBuf,
enumerable: false,
writable: true,
});
if (!(JSON_BUF_PROPERTY in snapshotContent)) {
Object.defineProperty(snapshotContent, JSON_BUF_PROPERTY, {
value: jsonBuf,
enumerable: false,
writable: true,
});
} else {
(snapshotContent as any)[JSON_BUF_PROPERTY] = jsonBuf;
}

if (jsonBuf) {
newContent.input = partialParse(jsonBuf);
(snapshotContent as any).input = partialParse(jsonBuf);
}
snapshot.content[event.index] = newContent;
}
break;
}
case 'thinking_delta': {
if (snapshotContent?.type === 'thinking') {
snapshot.content[event.index] = {
...snapshotContent,
thinking: snapshotContent.thinking + event.delta.thinking,
};
snapshotContent.thinking = snapshotContent.thinking + event.delta.thinking;
}
break;
}
case 'signature_delta': {
if (snapshotContent?.type === 'thinking') {
snapshot.content[event.index] = {
...snapshotContent,
signature: event.delta.signature,
};
snapshotContent.signature = event.delta.signature;
}
break;
}
Expand Down
63 changes: 63 additions & 0 deletions tests/api-resources/MessageStream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,69 @@ describe('MessageStream class', () => {
expect(finalText).toBe("I'll check the current weather in Paris for you.");
});

it('accumulates large text responses in linear time', async () => {
const { fetch, handleStreamEvents } = mockFetch();

const anthropic = new Anthropic({ apiKey: '...', fetch });

const NUM_DELTAS = 5000;
const CHUNK = 'abcdefghij'; // 10 chars per delta

// Build a synthetic stream with many text deltas
const events: any[] = [
{
type: 'message_start',
message: {
id: 'msg_perf_test',
type: 'message',
role: 'assistant',
content: [],
model: 'claude-opus-4-20250514',
stop_reason: null,
stop_sequence: null,
usage: { input_tokens: 10, output_tokens: 1 },
},
},
{
type: 'content_block_start',
index: 0,
content_block: { type: 'text', text: '' },
},
];

for (let i = 0; i < NUM_DELTAS; i++) {
events.push({
type: 'content_block_delta',
index: 0,
delta: { type: 'text_delta', text: CHUNK },
});
}

events.push(
{ type: 'content_block_stop', index: 0 },
{ type: 'message_delta', delta: { stop_reason: 'end_turn', stop_sequence: null }, usage: { output_tokens: NUM_DELTAS } },
{ type: 'message_stop' },
);

handleStreamEvents(events);

const stream = anthropic.messages.stream({
max_tokens: 1024,
model: 'claude-opus-4-20250514',
messages: [{ role: 'user', content: 'test' }],
});

const finalMessage = await stream.finalMessage();
const finalText = await stream.finalText();

// Verify correctness
expect(finalText).toBe(CHUNK.repeat(NUM_DELTAS));
expect(finalMessage.content[0]!.type).toBe('text');
if (finalMessage.content[0]!.type === 'text') {
expect(finalMessage.content[0]!.text).toBe(CHUNK.repeat(NUM_DELTAS));
}
});

it('does not throw unhandled rejection with withResponse()', async () => {
const { fetch, handleRequest } = mockFetch();
const anthropic = new Anthropic({
Expand Down