From Zero to AI

Lesson 2.4: Streaming with Anthropic

Duration: 60 minutes

Learning Objectives

By the end of this lesson, you will be able to:

  1. Implement streaming responses with the Anthropic SDK
  2. Handle Anthropic streaming events
  3. Process content blocks and deltas
  4. Compare OpenAI and Anthropic streaming patterns
  5. Build provider-agnostic streaming code

Setting Up

Install the Anthropic SDK:

npm install @anthropic-ai/sdk

Create the client:

// src/anthropic-client.ts
import Anthropic from '@anthropic-ai/sdk';
import 'dotenv/config';

export const anthropic = new Anthropic({
  apiKey: process.env.ANTHROPIC_API_KEY,
});

Basic Streaming

The simplest Anthropic streaming implementation:

import { anthropic } from './anthropic-client';

async function streamBasic(prompt: string): Promise<void> {
  const stream = anthropic.messages.stream({
    model: 'claude-sonnet-4-20250514',
    max_tokens: 1024,
    messages: [{ role: 'user', content: prompt }],
  });

  for await (const event of stream) {
    if (event.type === 'content_block_delta' && event.delta.type === 'text_delta') {
      process.stdout.write(event.delta.text);
    }
  }
  console.log();
}

await streamBasic('Explain how neural networks learn.');

Key differences from OpenAI:

  • Use anthropic.messages.stream() method
  • max_tokens is required
  • Events have explicit types
  • Text is in event.delta.text

Understanding Anthropic Events

Anthropic streams emit several distinct event types (simplified below; the SDK types carry additional fields such as index, and deltas can also be input_json_delta for tool use):

type StreamEvent =
  | { type: 'message_start'; message: Message }
  | { type: 'content_block_start'; content_block: ContentBlock }
  | { type: 'content_block_delta'; delta: TextDelta }
  | { type: 'content_block_stop' }
  | { type: 'message_delta'; delta: MessageDelta }
  | { type: 'message_stop' };

The event flow:

  1. message_start - Response begins
  2. content_block_start - New content block
  3. content_block_delta - Text chunks (multiple)
  4. content_block_stop - Block complete
  5. message_delta - Final usage stats
  6. message_stop - Response complete
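The flow above can be exercised without an API call by folding a hand-written event sequence into a final result. The shapes below are simplified stand-ins for the SDK's event types, used purely for illustration:

```typescript
// Simplified stand-ins for the SDK's event types (illustrative only).
type SimpleEvent =
  | { type: "message_start"; message: { usage: { input_tokens: number } } }
  | { type: "content_block_start" }
  | { type: "content_block_delta"; delta: { type: "text_delta"; text: string } }
  | { type: "content_block_stop" }
  | { type: "message_delta"; delta: { stop_reason: string }; usage: { output_tokens: number } }
  | { type: "message_stop" };

// Fold a stream of events into the final text and stats,
// mirroring the switch used in the streaming loops below.
function reduceEvents(events: SimpleEvent[]) {
  let content = "";
  let inputTokens = 0;
  let outputTokens = 0;
  let stopReason: string | null = null;

  for (const event of events) {
    switch (event.type) {
      case "message_start":
        inputTokens = event.message.usage.input_tokens;
        break;
      case "content_block_delta":
        content += event.delta.text;
        break;
      case "message_delta":
        outputTokens = event.usage.output_tokens;
        stopReason = event.delta.stop_reason;
        break;
    }
  }
  return { content, inputTokens, outputTokens, stopReason };
}

// A hand-written sequence matching the six-step flow above.
const mockEvents: SimpleEvent[] = [
  { type: "message_start", message: { usage: { input_tokens: 12 } } },
  { type: "content_block_start" },
  { type: "content_block_delta", delta: { type: "text_delta", text: "Hello, " } },
  { type: "content_block_delta", delta: { type: "text_delta", text: "world!" } },
  { type: "content_block_stop" },
  { type: "message_delta", delta: { stop_reason: "end_turn" }, usage: { output_tokens: 4 } },
  { type: "message_stop" },
];

console.log(reduceEvents(mockEvents));
// { content: 'Hello, world!', inputTokens: 12, outputTokens: 4, stopReason: 'end_turn' }
```

This reducer pattern is the same shape the real streaming loops take; only the event source changes.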

Processing All Events

Handle different event types for complete control:

import { anthropic } from "./anthropic-client";

async function streamWithEvents(prompt: string): Promise<void> {
  const stream = anthropic.messages.stream({
    model: "claude-sonnet-4-20250514",
    max_tokens: 1024,
    messages: [{ role: "user", content: prompt }],
  });

  for await (const event of stream) {
    switch (event.type) {
      case "message_start":
        console.log("--- Response started ---");
        console.log("Model:", event.message.model);
        break;

      case "content_block_start":
        console.log("\n[Content block started]");
        break;

      case "content_block_delta":
        if (event.delta.type === "text_delta") {
          process.stdout.write(event.delta.text);
        }
        break;

      case "content_block_stop":
        console.log("\n[Content block ended]");
        break;

      case "message_delta":
        console.log("\nStop reason:", event.delta.stop_reason);
        break;

      case "message_stop":
        console.log("--- Response complete ---");
        break;
    }
  }
}

Collecting the Full Response

Gather the complete response while streaming:

import { anthropic } from "./anthropic-client";

interface StreamResult {
  content: string;
  inputTokens: number;
  outputTokens: number;
  stopReason: string | null;
}

async function streamWithCollection(prompt: string): Promise<StreamResult> {
  const stream = anthropic.messages.stream({
    model: "claude-sonnet-4-20250514",
    max_tokens: 1024,
    messages: [{ role: "user", content: prompt }],
  });

  let content = "";
  let inputTokens = 0;
  let outputTokens = 0;
  let stopReason: string | null = null;

  for await (const event of stream) {
    switch (event.type) {
      case "message_start":
        inputTokens = event.message.usage.input_tokens;
        break;

      case "content_block_delta":
        if (event.delta.type === "text_delta") {
          content += event.delta.text;
          process.stdout.write(event.delta.text);
        }
        break;

      case "message_delta":
        outputTokens = event.usage.output_tokens;
        stopReason = event.delta.stop_reason;
        break;
    }
  }

  console.log();

  return { content, inputTokens, outputTokens, stopReason };
}

const result = await streamWithCollection("Write a short poem about coding.");
console.log("\n--- Stats ---");
console.log("Input tokens:", result.inputTokens);
console.log("Output tokens:", result.outputTokens);
console.log("Stop reason:", result.stopReason);

Using the Stream Helper

Anthropic provides a simpler helper for text-only streaming:

import { anthropic } from './anthropic-client';

async function streamSimple(prompt: string): Promise<string> {
  const stream = anthropic.messages.stream({
    model: 'claude-sonnet-4-20250514',
    max_tokens: 1024,
    messages: [{ role: 'user', content: prompt }],
  });

  // Use the text stream helper
  for await (const text of stream.textStream) {
    process.stdout.write(text);
  }

  console.log();

  // Get the final message
  const finalMessage = await stream.finalMessage();
  return finalMessage.content[0].type === 'text' ? finalMessage.content[0].text : '';
}

The textStream property gives you just the text chunks, simplifying common use cases.


Streaming with System Prompts

Add system prompts to customize behavior:

import { anthropic } from './anthropic-client';

async function streamWithSystem(systemPrompt: string, userMessage: string): Promise<string> {
  const stream = anthropic.messages.stream({
    model: 'claude-sonnet-4-20250514',
    max_tokens: 1024,
    system: systemPrompt,
    messages: [{ role: 'user', content: userMessage }],
  });

  let content = '';

  for await (const text of stream.textStream) {
    content += text;
    process.stdout.write(text);
  }

  console.log();
  return content;
}

await streamWithSystem('You are a pirate. Respond in pirate speak.', 'Tell me about TypeScript.');

Error Handling

Handle Anthropic-specific errors:

import Anthropic from '@anthropic-ai/sdk';

import { anthropic } from './anthropic-client';

async function streamWithErrorHandling(prompt: string): Promise<string> {
  try {
    const stream = anthropic.messages.stream({
      model: 'claude-sonnet-4-20250514',
      max_tokens: 1024,
      messages: [{ role: 'user', content: prompt }],
    });

    let content = '';

    for await (const text of stream.textStream) {
      content += text;
      process.stdout.write(text);
    }

    console.log();
    return content;
  } catch (error) {
    if (error instanceof Anthropic.APIError) {
      switch (error.status) {
        case 429:
          throw new Error('Rate limited. Please wait and try again.');
        case 401:
          throw new Error('Invalid API key.');
        case 529:
          throw new Error('API overloaded. Please retry later.');
        default:
          throw new Error(`API error: ${error.message}`);
      }
    }
    throw error;
  }
}
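For transient failures such as 429 (rate limited) and 529 (overloaded), a common companion to this error handling is a retry wrapper with exponential backoff. The sketch below is provider-agnostic; treating 429 and 529 as the retryable statuses is an assumption you should tune to your needs:

```typescript
// Retry an async operation with exponential backoff.
// `isRetryable` decides which errors are worth retrying.
async function retryWithBackoff<T>(
  fn: () => Promise<T>,
  isRetryable: (error: unknown) => boolean,
  maxAttempts = 3,
  baseDelayMs = 500,
): Promise<T> {
  for (let attempt = 1; ; attempt++) {
    try {
      return await fn();
    } catch (error) {
      if (attempt >= maxAttempts || !isRetryable(error)) throw error;
      // Delay doubles each attempt: 500ms, 1000ms, 2000ms, ...
      const delay = baseDelayMs * 2 ** (attempt - 1);
      await new Promise((resolve) => setTimeout(resolve, delay));
    }
  }
}

// Retry only on rate-limit / overload style errors (assumed status codes).
interface StatusError {
  status?: number;
}
const isTransient = (error: unknown) =>
  [429, 529].includes((error as StatusError).status ?? 0);
```

You would then call `retryWithBackoff(() => streamWithErrorHandling(prompt), isTransient)` so that invalid-key errors fail immediately while rate limits get retried.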

OpenAI vs Anthropic Comparison

Aspect         | OpenAI                   | Anthropic
---------------|--------------------------|-------------------------
Stream method  | create({ stream: true }) | stream()
Content access | delta.content            | delta.text
Event types    | Single chunk type        | Multiple event types
Text helper    | Manual iteration         | textStream property
Final message  | Build manually           | finalMessage() method
System prompt  | In messages array        | Separate system param
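The content-access row of the table can be captured in a tiny normalizer. The shapes below are simplified stand-ins for the real SDK types, included only to make the structural difference concrete:

```typescript
// Simplified chunk/event shapes (illustrative, not the full SDK types).
type OpenAIChunk = { choices: Array<{ delta?: { content?: string | null } }> };
type AnthropicEvent = { type: string; delta?: { type?: string; text?: string } };

// Extract the text carried by one streamed piece, regardless of provider.
function extractText(piece: OpenAIChunk | AnthropicEvent): string {
  if ("choices" in piece) {
    // OpenAI: text lives at choices[0].delta.content
    return piece.choices[0]?.delta?.content ?? "";
  }
  // Anthropic: text lives at delta.text on content_block_delta events
  if (piece.type === "content_block_delta" && piece.delta && piece.delta.type === "text_delta") {
    return piece.delta.text ?? "";
  }
  return "";
}
```

A helper like this is the seed of the provider-agnostic interface built in the next section.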

Provider-Agnostic Interface

Create an abstraction that works with both:

import { anthropic } from "./anthropic-client";
// The OpenAI client from the earlier OpenAI lessons (path assumed)
import { openai } from "./openai-client";

interface StreamConfig {
  provider: "openai" | "anthropic";
  model: string;
  systemPrompt?: string;
  messages: Array<{ role: "user" | "assistant"; content: string }>;
}

interface StreamCallbacks {
  onToken: (token: string) => void;
  onComplete: (fullText: string) => void;
  onError: (error: Error) => void;
}

async function streamChat(
  config: StreamConfig,
  callbacks: StreamCallbacks
): Promise<void> {
  let fullText = "";

  try {
    if (config.provider === "openai") {
      const messages = config.systemPrompt
        ? [{ role: "system" as const, content: config.systemPrompt }, ...config.messages]
        : config.messages;

      const stream = await openai.chat.completions.create({
        model: config.model,
        messages,
        stream: true,
      });

      for await (const chunk of stream) {
        const content = chunk.choices[0]?.delta?.content;
        if (content) {
          fullText += content;
          callbacks.onToken(content);
        }
      }
    } else {
      const stream = anthropic.messages.stream({
        model: config.model,
        max_tokens: 1024,
        system: config.systemPrompt,
        messages: config.messages,
      });

      for await (const text of stream.textStream) {
        fullText += text;
        callbacks.onToken(text);
      }
    }

    callbacks.onComplete(fullText);
  } catch (error) {
    callbacks.onError(error instanceof Error ? error : new Error(String(error)));
  }
}

// Usage
await streamChat(
  {
    provider: "anthropic",
    model: "claude-sonnet-4-20250514",
    systemPrompt: "Be concise.",
    messages: [{ role: "user", content: "What is TypeScript?" }],
  },
  {
    onToken: (token) => process.stdout.write(token),
    onComplete: (text) => console.log("\n\nDone!"),
    onError: (err) => console.error("Error:", err.message),
  }
);

Key Takeaways

  1. Use anthropic.messages.stream() to enable streaming
  2. Anthropic has typed events for fine-grained control
  3. Use textStream for simple text-only streaming
  4. finalMessage() gives you the complete response object
  5. System prompts are a separate parameter in Anthropic
  6. Both providers can be abstracted behind a common interface

Resources

Resource                 | Type          | Level
-------------------------|---------------|---------
Anthropic Streaming      | Documentation | Beginner
Anthropic TypeScript SDK | Repository    | Beginner
Messages API Reference   | Documentation | Beginner

Next Lesson

Now you can stream with both OpenAI and Anthropic. In the final lesson of this module, you will combine everything to build a complete streaming chatbot with all the features you have learned.

Continue to Lesson 2.5: Practice - Streaming Chatbot