Lesson 2.4: Streaming with Anthropic
Duration: 60 minutes
Learning Objectives
By the end of this lesson, you will be able to:
- Implement streaming responses with the Anthropic SDK
- Handle Anthropic streaming events
- Process content blocks and deltas
- Compare OpenAI and Anthropic streaming patterns
- Build provider-agnostic streaming code
Setting Up
Install the Anthropic SDK:
```bash
npm install @anthropic-ai/sdk
```
Create the client:
```typescript
// src/anthropic-client.ts
import Anthropic from '@anthropic-ai/sdk';
import 'dotenv/config';

export const anthropic = new Anthropic({
  apiKey: process.env.ANTHROPIC_API_KEY,
});
```
Basic Streaming
The simplest Anthropic streaming implementation:
```typescript
import { anthropic } from './anthropic-client';

async function streamBasic(prompt: string): Promise<void> {
  const stream = anthropic.messages.stream({
    model: 'claude-sonnet-4-20250514',
    max_tokens: 1024,
    messages: [{ role: 'user', content: prompt }],
  });

  for await (const event of stream) {
    if (event.type === 'content_block_delta' && event.delta.type === 'text_delta') {
      process.stdout.write(event.delta.text);
    }
  }
  console.log();
}

await streamBasic('Explain how neural networks learn.');
```
Key differences from OpenAI:
- Use the `anthropic.messages.stream()` method
- `max_tokens` is required
- Events have explicit types
- Text is in `event.delta.text`
Understanding Anthropic Events
Anthropic streams have distinct event types:
```typescript
type StreamEvent =
  | { type: 'message_start'; message: Message }
  | { type: 'content_block_start'; content_block: ContentBlock }
  | { type: 'content_block_delta'; delta: TextDelta }
  | { type: 'content_block_stop' }
  | { type: 'message_delta'; delta: MessageDelta }
  | { type: 'message_stop' };
```
The event flow:
1. `message_start` - Response begins
2. `content_block_start` - New content block
3. `content_block_delta` - Text chunks (multiple)
4. `content_block_stop` - Block complete
5. `message_delta` - Final usage stats
6. `message_stop` - Response complete
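The ordering above can be captured as a small transition table. The checker below is an illustrative sketch (not part of the SDK) that verifies a sequence of event types follows the documented flow:

```typescript
// Illustrative event-order checker; the type and transition table mirror the
// documented flow above but are not part of the Anthropic SDK.
type EventType =
  | 'message_start'
  | 'content_block_start'
  | 'content_block_delta'
  | 'content_block_stop'
  | 'message_delta'
  | 'message_stop';

// For each event type, which event types may legally follow it.
const allowed: Record<EventType, EventType[]> = {
  message_start: ['content_block_start'],
  content_block_start: ['content_block_delta', 'content_block_stop'],
  content_block_delta: ['content_block_delta', 'content_block_stop'],
  content_block_stop: ['content_block_start', 'message_delta'],
  message_delta: ['message_stop'],
  message_stop: [],
};

function isValidEventSequence(events: EventType[]): boolean {
  if (events[0] !== 'message_start') return false;
  for (let i = 1; i < events.length; i++) {
    if (!allowed[events[i - 1]].includes(events[i])) return false;
  }
  return events[events.length - 1] === 'message_stop';
}
```

A checker like this is mostly useful in tests or debugging, e.g. to catch a handler that silently drops `content_block_stop` events.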
Processing All Events
Handle different event types for complete control:
```typescript
import { anthropic } from "./anthropic-client";

async function streamWithEvents(prompt: string): Promise<void> {
  const stream = anthropic.messages.stream({
    model: "claude-sonnet-4-20250514",
    max_tokens: 1024,
    messages: [{ role: "user", content: prompt }],
  });

  for await (const event of stream) {
    switch (event.type) {
      case "message_start":
        console.log("--- Response started ---");
        console.log("Model:", event.message.model);
        break;
      case "content_block_start":
        console.log("\n[Content block started]");
        break;
      case "content_block_delta":
        if (event.delta.type === "text_delta") {
          process.stdout.write(event.delta.text);
        }
        break;
      case "content_block_stop":
        console.log("\n[Content block ended]");
        break;
      case "message_delta":
        console.log("\nStop reason:", event.delta.stop_reason);
        break;
      case "message_stop":
        console.log("--- Response complete ---");
        break;
    }
  }
}
```
Collecting the Full Response
Gather the complete response while streaming:
```typescript
import { anthropic } from "./anthropic-client";

interface StreamResult {
  content: string;
  inputTokens: number;
  outputTokens: number;
  stopReason: string | null;
}

async function streamWithCollection(prompt: string): Promise<StreamResult> {
  const stream = anthropic.messages.stream({
    model: "claude-sonnet-4-20250514",
    max_tokens: 1024,
    messages: [{ role: "user", content: prompt }],
  });

  let content = "";
  let inputTokens = 0;
  let outputTokens = 0;
  let stopReason: string | null = null;

  for await (const event of stream) {
    switch (event.type) {
      case "message_start":
        inputTokens = event.message.usage.input_tokens;
        break;
      case "content_block_delta":
        if (event.delta.type === "text_delta") {
          content += event.delta.text;
          process.stdout.write(event.delta.text);
        }
        break;
      case "message_delta":
        outputTokens = event.usage.output_tokens;
        stopReason = event.delta.stop_reason;
        break;
    }
  }

  console.log();
  return { content, inputTokens, outputTokens, stopReason };
}

const result = await streamWithCollection("Write a short poem about coding.");
console.log("\n--- Stats ---");
console.log("Input tokens:", result.inputTokens);
console.log("Output tokens:", result.outputTokens);
console.log("Stop reason:", result.stopReason);
```
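Deltas often arrive as very small fragments, and writing each one straight to a UI can cause excessive re-renders. One common option is to buffer deltas and flush in larger pieces. This helper is an illustrative sketch, not part of the SDK; the threshold is an assumption to tune:

```typescript
// Illustrative delta buffer: accumulate incoming text and flush it in chunks
// of at least `maxChars` characters, plus a final flush for the remainder.
function createDeltaBuffer(flush: (text: string) => void, maxChars = 20) {
  let buffer = '';
  return {
    push(delta: string): void {
      buffer += delta;
      if (buffer.length >= maxChars) {
        flush(buffer);
        buffer = '';
      }
    },
    // Call once the stream ends so trailing text is not lost.
    end(): void {
      if (buffer.length > 0) flush(buffer);
      buffer = '';
    },
  };
}
```

Inside the streaming loop you would call `buf.push(event.delta.text)` for each delta and `buf.end()` after the loop completes.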
Using the Stream Helper
Anthropic provides a simpler helper for text-only streaming:
```typescript
import { anthropic } from './anthropic-client';

async function streamSimple(prompt: string): Promise<string> {
  const stream = anthropic.messages.stream({
    model: 'claude-sonnet-4-20250514',
    max_tokens: 1024,
    messages: [{ role: 'user', content: prompt }],
  });

  // Use the text stream helper
  for await (const text of stream.textStream) {
    process.stdout.write(text);
  }
  console.log();

  // Get the final message
  const finalMessage = await stream.finalMessage();
  return finalMessage.content[0].type === 'text' ? finalMessage.content[0].text : '';
}
```
The `textStream` property gives you just the text chunks, simplifying common use cases.
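Note that `finalMessage().content` is an array of blocks and is not guaranteed to start with a text block. Rather than assuming the first block is text (as the return statement above does), you can join every text block. The block types here are simplified stand-ins for the SDK's types:

```typescript
// Simplified stand-ins for the SDK's content block types.
interface TextBlock { type: 'text'; text: string }
interface OtherBlock { type: string }
type Block = TextBlock | OtherBlock;

// Join the text of every text block, ignoring non-text blocks.
function extractText(blocks: Block[]): string {
  return blocks
    .filter((b): b is TextBlock => b.type === 'text')
    .map((b) => b.text)
    .join('');
}
```

With this helper, the return statement becomes `return extractText(finalMessage.content);`.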
Streaming with System Prompts
Add system prompts to customize behavior:
```typescript
import { anthropic } from './anthropic-client';

async function streamWithSystem(systemPrompt: string, userMessage: string): Promise<string> {
  const stream = anthropic.messages.stream({
    model: 'claude-sonnet-4-20250514',
    max_tokens: 1024,
    system: systemPrompt,
    messages: [{ role: 'user', content: userMessage }],
  });

  let content = '';
  for await (const text of stream.textStream) {
    content += text;
    process.stdout.write(text);
  }
  console.log();
  return content;
}

await streamWithSystem('You are a pirate. Respond in pirate speak.', 'Tell me about TypeScript.');
```
Error Handling
Handle Anthropic-specific errors:
```typescript
import Anthropic from '@anthropic-ai/sdk';
import { anthropic } from './anthropic-client';

async function streamWithErrorHandling(prompt: string): Promise<string> {
  try {
    const stream = anthropic.messages.stream({
      model: 'claude-sonnet-4-20250514',
      max_tokens: 1024,
      messages: [{ role: 'user', content: prompt }],
    });

    let content = '';
    for await (const text of stream.textStream) {
      content += text;
      process.stdout.write(text);
    }
    console.log();
    return content;
  } catch (error) {
    if (error instanceof Anthropic.APIError) {
      switch (error.status) {
        case 429:
          throw new Error('Rate limited. Please wait and try again.');
        case 401:
          throw new Error('Invalid API key.');
        case 529:
          throw new Error('API overloaded. Please retry later.');
        default:
          throw new Error(`API error: ${error.message}`);
      }
    }
    throw error;
  }
}
```
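Rate-limit (429) and overloaded (529) errors are usually transient, so a retry with exponential backoff often resolves them. This wrapper is a sketch; the retry predicate, attempt count, and delays are assumptions to tune for your workload:

```typescript
// Illustrative retry wrapper with exponential backoff. Retries only when
// `isRetryable` says the error is transient; rethrows otherwise.
async function withRetry<T>(
  fn: () => Promise<T>,
  isRetryable: (err: unknown) => boolean,
  maxAttempts = 3,
  baseDelayMs = 500,
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    try {
      return await fn();
    } catch (err) {
      lastError = err;
      if (!isRetryable(err) || attempt === maxAttempts - 1) throw err;
      // Exponential backoff: baseDelayMs, 2x, 4x, ...
      await new Promise((resolve) => setTimeout(resolve, baseDelayMs * 2 ** attempt));
    }
  }
  throw lastError;
}
```

You could wrap a streaming call as `withRetry(() => streamWithErrorHandling(prompt), (e) => e instanceof Anthropic.APIError && (e.status === 429 || e.status === 529))`. Note that retrying mid-stream restarts the whole response, so only retry calls that failed before any text was shown to the user.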
OpenAI vs Anthropic Comparison
| Aspect | OpenAI | Anthropic |
|---|---|---|
| Stream method | `create({ stream: true })` | `stream()` |
| Content access | `delta.content` | `delta.text` |
| Event types | Single chunk type | Multiple event types |
| Text helper | Manual iteration | `textStream` property |
| Final message | Build manually | `finalMessage()` method |
| System prompt | In messages array | Separate `system` param |
Provider-Agnostic Interface
Create an abstraction that works with both:
```typescript
import { anthropic } from "./anthropic-client";
// `openai` is the client instance created in the OpenAI lessons; adjust the
// import path to wherever you defined it.
import { openai } from "./openai-client";

interface StreamConfig {
  provider: "openai" | "anthropic";
  model: string;
  systemPrompt?: string;
  messages: Array<{ role: "user" | "assistant"; content: string }>;
}

interface StreamCallbacks {
  onToken: (token: string) => void;
  onComplete: (fullText: string) => void;
  onError: (error: Error) => void;
}

async function streamChat(
  config: StreamConfig,
  callbacks: StreamCallbacks
): Promise<void> {
  let fullText = "";

  try {
    if (config.provider === "openai") {
      // OpenAI takes the system prompt as the first message.
      const messages = config.systemPrompt
        ? [{ role: "system" as const, content: config.systemPrompt }, ...config.messages]
        : config.messages;

      const stream = await openai.chat.completions.create({
        model: config.model,
        messages,
        stream: true,
      });

      for await (const chunk of stream) {
        const content = chunk.choices[0]?.delta?.content;
        if (content) {
          fullText += content;
          callbacks.onToken(content);
        }
      }
    } else {
      // Anthropic takes the system prompt as a separate parameter.
      const stream = anthropic.messages.stream({
        model: config.model,
        max_tokens: 1024,
        system: config.systemPrompt,
        messages: config.messages,
      });

      for await (const text of stream.textStream) {
        fullText += text;
        callbacks.onToken(text);
      }
    }

    callbacks.onComplete(fullText);
  } catch (error) {
    callbacks.onError(error instanceof Error ? error : new Error(String(error)));
  }
}

// Usage
await streamChat(
  {
    provider: "anthropic",
    model: "claude-sonnet-4-20250514",
    systemPrompt: "Be concise.",
    messages: [{ role: "user", content: "What is TypeScript?" }],
  },
  {
    onToken: (token) => process.stdout.write(token),
    onComplete: () => console.log("\nDone!"),
    onError: (err) => console.error("Error:", err.message),
  }
);
```
Key Takeaways
- Use `anthropic.messages.stream()` to enable streaming
- Anthropic has typed events for fine-grained control
- Use `textStream` for simple text-only streaming
- `finalMessage()` gives you the complete response object
- System prompts are a separate `system` parameter in Anthropic
- Both providers can be abstracted behind a common interface
Resources
| Resource | Type | Level |
|---|---|---|
| Anthropic Streaming | Documentation | Beginner |
| Anthropic TypeScript SDK | Repository | Beginner |
| Messages API Reference | Documentation | Beginner |
Next Lesson
Now you can stream with both OpenAI and Anthropic. In the final lesson of this module, you will combine everything to build a complete streaming chatbot with all the features you have learned.