# Lesson 2.3: Streaming with OpenAI

**Duration:** 60 minutes
## Learning Objectives
By the end of this lesson, you will be able to:
- Implement streaming responses with the OpenAI SDK
- Handle stream chunks and build complete responses
- Process streaming events and metadata
- Implement proper error handling for streams
- Measure and optimize streaming performance
## Setting Up

First, ensure you have the OpenAI SDK installed:

```bash
npm install openai
```

Create a basic setup file:

```typescript
// src/openai-client.ts
import 'dotenv/config';
import OpenAI from 'openai';

export const openai = new OpenAI({
  apiKey: process.env.OPENAI_API_KEY,
});
```
## Basic Streaming

The simplest streaming implementation:

```typescript
import { openai } from './openai-client';

async function streamBasic(prompt: string): Promise<void> {
  const stream = await openai.chat.completions.create({
    model: 'gpt-4o',
    messages: [{ role: 'user', content: prompt }],
    stream: true,
  });

  for await (const chunk of stream) {
    const content = chunk.choices[0]?.delta?.content;
    if (content) {
      process.stdout.write(content);
    }
  }
  console.log(); // Final newline
}

await streamBasic('Explain quantum computing in simple terms.');
```
Key points:

- Set `stream: true` in the request options
- Use `for await...of` to iterate over chunks
- Access content via `chunk.choices[0].delta.content`
- Delta contains only the *new* content for this chunk
## Understanding Stream Chunks

Each chunk has this structure:

```typescript
interface ChatCompletionChunk {
  id: string;
  object: 'chat.completion.chunk';
  created: number;
  model: string;
  choices: Array<{
    index: number;
    delta: {
      role?: 'assistant';
      content?: string;
    };
    // Simplified; the SDK also allows values such as 'tool_calls'
    // and 'content_filter'
    finish_reason: 'stop' | 'length' | null;
  }>;
}
```
The first chunk typically contains the role, subsequent chunks contain content, and the last chunk has `finish_reason: 'stop'`.
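To see this shape for yourself, log the raw chunks instead of printing their content. A minimal sketch (the prompt is arbitrary):

```typescript
import { openai } from './openai-client';

// Log each raw choice to inspect the structure described above.
const stream = await openai.chat.completions.create({
  model: 'gpt-4o',
  messages: [{ role: 'user', content: 'Say hello.' }],
  stream: true,
});

for await (const chunk of stream) {
  console.log(JSON.stringify(chunk.choices[0], null, 2));
}
```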
## Collecting the Full Response

Often you need both streaming output AND the complete response:

```typescript
import { openai } from "./openai-client";

interface StreamResult {
  content: string;
  finishReason: string | null;
}

async function streamWithCollection(prompt: string): Promise<StreamResult> {
  const stream = await openai.chat.completions.create({
    model: "gpt-4o",
    messages: [{ role: "user", content: prompt }],
    stream: true,
  });

  let fullContent = "";
  let finishReason: string | null = null;

  for await (const chunk of stream) {
    const choice = chunk.choices[0];
    if (choice?.delta?.content) {
      const content = choice.delta.content;
      fullContent += content;
      process.stdout.write(content);
    }
    if (choice?.finish_reason) {
      finishReason = choice.finish_reason;
    }
  }
  console.log();

  return { content: fullContent, finishReason };
}

const result = await streamWithCollection("Write a haiku about programming.");
console.log("\n--- Complete Response ---");
console.log(result.content);
console.log("Finish reason:", result.finishReason);
```
## Streaming with Conversation History

For chatbots, maintain message history while streaming:

```typescript
import type { ChatCompletionMessageParam } from 'openai/resources/chat/completions';
import { openai } from './openai-client';

async function chatWithHistory(messages: ChatCompletionMessageParam[]): Promise<string> {
  const stream = await openai.chat.completions.create({
    model: 'gpt-4o',
    messages,
    stream: true,
  });

  let response = '';
  for await (const chunk of stream) {
    const content = chunk.choices[0]?.delta?.content;
    if (content) {
      response += content;
      process.stdout.write(content);
    }
  }
  console.log();
  return response;
}

// Usage
const history: ChatCompletionMessageParam[] = [
  { role: 'system', content: 'You are a helpful assistant.' },
  { role: 'user', content: 'What is TypeScript?' },
];

const response1 = await chatWithHistory(history);
history.push({ role: 'assistant', content: response1 });

history.push({ role: 'user', content: 'How is it different from JavaScript?' });
const response2 = await chatWithHistory(history);
```
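To turn this into an interactive chat, wrap `chatWithHistory` in an input loop. A minimal sketch using Node's built-in `readline/promises`; the loop structure and the `exit` command are illustrative choices, not part of the lesson's code:

```typescript
import readline from 'node:readline/promises';
import type { ChatCompletionMessageParam } from 'openai/resources/chat/completions';

// Hypothetical interactive loop around chatWithHistory (defined above).
const rl = readline.createInterface({ input: process.stdin, output: process.stdout });
const messages: ChatCompletionMessageParam[] = [
  { role: 'system', content: 'You are a helpful assistant.' },
];

while (true) {
  const input = await rl.question('You: ');
  if (input === 'exit') break;
  messages.push({ role: 'user', content: input });
  const reply = await chatWithHistory(messages);
  messages.push({ role: 'assistant', content: reply });
}
rl.close();
```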
## Error Handling

Streams can fail mid-response. Handle errors gracefully:

```typescript
import OpenAI from "openai";
import { openai } from "./openai-client";

async function streamWithErrorHandling(prompt: string): Promise<string> {
  try {
    const stream = await openai.chat.completions.create({
      model: "gpt-4o",
      messages: [{ role: "user", content: prompt }],
      stream: true,
    });

    let content = "";
    try {
      for await (const chunk of stream) {
        const delta = chunk.choices[0]?.delta?.content;
        if (delta) {
          content += delta;
          process.stdout.write(delta);
        }
      }
    } catch (streamError) {
      // The connection dropped mid-stream; return the partial content
      console.error("\nStream interrupted:", streamError);
      return content + "\n[Response interrupted]";
    }
    console.log();
    return content;
  } catch (error) {
    // Request-level failures surface as APIError with an HTTP status
    if (error instanceof OpenAI.APIError) {
      if (error.status === 429) {
        throw new Error("Rate limited. Please wait and try again.");
      }
      if (error.status === 401) {
        throw new Error("Invalid API key.");
      }
    }
    throw error;
  }
}
```
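Rate-limit errors are usually transient, so a common pattern is to retry the whole request with exponential backoff before giving up. A sketch of such a wrapper; the attempt count and delays are arbitrary choices, not SDK defaults:

```typescript
// Hypothetical retry wrapper; reuses streamWithErrorHandling from above.
async function streamWithRetry(prompt: string, maxAttempts = 3): Promise<string> {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await streamWithErrorHandling(prompt);
    } catch (error) {
      if (attempt === maxAttempts) throw error;
      const delayMs = 1000 * 2 ** (attempt - 1); // 1s, 2s, 4s, ...
      console.error(`Attempt ${attempt} failed, retrying in ${delayMs}ms`);
      await new Promise((resolve) => setTimeout(resolve, delayMs));
    }
  }
  throw new Error("unreachable");
}
```

Note that a retry restarts the response from scratch, so any partial output already printed will be repeated.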
## Aborting Streams

Allow users to cancel long responses:

```typescript
import { openai } from "./openai-client";

async function streamWithAbort(
  prompt: string,
  signal?: AbortSignal
): Promise<string> {
  const stream = await openai.chat.completions.create({
    model: "gpt-4o",
    messages: [{ role: "user", content: prompt }],
    stream: true,
  });

  let content = "";
  for await (const chunk of stream) {
    // Check for abort; breaking out of the loop also cancels the
    // underlying HTTP request
    if (signal?.aborted) {
      console.log("\n[Aborted by user]");
      break;
    }
    const delta = chunk.choices[0]?.delta?.content;
    if (delta) {
      content += delta;
      process.stdout.write(delta);
    }
  }
  console.log();
  return content;
}

// Usage with timeout
const controller = new AbortController();
setTimeout(() => controller.abort(), 5000); // Abort after 5 seconds
await streamWithAbort("Write a long story.", controller.signal);
```
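Rather than polling the signal inside the loop, you can also hand it to the SDK: `create` accepts per-request options as a second argument, including an `AbortSignal`, so the request itself is cancelled on abort. A sketch of that variant (openai-node v4):

```typescript
import { openai } from "./openai-client";

const controller = new AbortController();
setTimeout(() => controller.abort(), 5000);

// Pass the signal in the per-request options; aborting rejects the
// pending request and terminates the stream.
const stream = await openai.chat.completions.create(
  {
    model: "gpt-4o",
    messages: [{ role: "user", content: "Write a long story." }],
    stream: true,
  },
  { signal: controller.signal }
);

try {
  for await (const chunk of stream) {
    process.stdout.write(chunk.choices[0]?.delta?.content ?? "");
  }
} catch (error) {
  // An abort surfaces here as an error rather than a silent break
  console.log("\n[Aborted]");
}
```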
## Measuring Performance

Track streaming metrics:

```typescript
import { openai } from "./openai-client";

interface StreamMetrics {
  content: string;
  timeToFirstToken: number;
  totalTime: number;
  tokenCount: number;
  tokensPerSecond: number;
}

async function streamWithMetrics(prompt: string): Promise<StreamMetrics> {
  const startTime = Date.now();
  let firstTokenTime: number | null = null;
  let tokenCount = 0;
  let content = "";

  const stream = await openai.chat.completions.create({
    model: "gpt-4o",
    messages: [{ role: "user", content: prompt }],
    stream: true,
  });

  for await (const chunk of stream) {
    const delta = chunk.choices[0]?.delta?.content;
    if (delta) {
      if (firstTokenTime === null) {
        firstTokenTime = Date.now();
      }
      // Each content chunk carries roughly one token, so this is an
      // approximation, not an exact token count
      tokenCount++;
      content += delta;
      process.stdout.write(delta);
    }
  }
  console.log();

  const endTime = Date.now();
  const totalTime = endTime - startTime;

  return {
    content,
    timeToFirstToken: firstTokenTime ? firstTokenTime - startTime : 0,
    totalTime,
    tokenCount,
    tokensPerSecond: tokenCount / (totalTime / 1000),
  };
}

const metrics = await streamWithMetrics("Explain machine learning.");
console.log("\n--- Metrics ---");
console.log(`Time to first token: ${metrics.timeToFirstToken}ms`);
console.log(`Total time: ${metrics.totalTime}ms`);
console.log(`Tokens: ${metrics.tokenCount}`);
console.log(`Speed: ${metrics.tokensPerSecond.toFixed(1)} tokens/sec`);
```
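If you need exact token counts rather than a chunk-based estimate, the Chat Completions API can report usage on the stream itself: set `stream_options: { include_usage: true }` and the final chunk carries a `usage` object (with an empty `choices` array). A sketch:

```typescript
import { openai } from "./openai-client";

const stream = await openai.chat.completions.create({
  model: "gpt-4o",
  messages: [{ role: "user", content: "Explain machine learning." }],
  stream: true,
  stream_options: { include_usage: true },
});

for await (const chunk of stream) {
  const delta = chunk.choices[0]?.delta?.content;
  if (delta) process.stdout.write(delta);
  // Only the final chunk has usage populated
  if (chunk.usage) {
    console.log(`\nCompletion tokens: ${chunk.usage.completion_tokens}`);
  }
}
```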
## Key Takeaways

- Enable streaming with `stream: true` in the API request
- Use `for await...of` to iterate over chunks
- Access new content via `delta.content` on each chunk
- Collect the full response by concatenating chunks
- Handle errors at two levels: request errors and stream errors
- Support cancellation with `AbortController` for better UX
## Resources
| Resource | Type | Level |
|---|---|---|
| OpenAI Streaming Guide | Documentation | Beginner |
| OpenAI Node.js SDK | Repository | Beginner |
| Chat Completions API | Documentation | Beginner |
## Next Lesson
You have learned how to stream with OpenAI. In the next lesson, you will implement the same functionality with Anthropic Claude, and see how the two approaches compare.