Lesson 5.5: Practice - Data Pipeline
Duration: 60 minutes
Learning Objectives
By the end of this lesson, you will be able to:
- Design and implement a complete data processing pipeline
- Combine fetching, validation, transformation, and error handling
- Process data from multiple API endpoints
- Build a reusable pipeline architecture
- Handle real-world data processing scenarios
Introduction
In this practice lesson, we will build a complete data processing pipeline that fetches data from an API, validates it, transforms it, and produces a clean output. This combines everything we learned in this module: JSON parsing, data transformation, Zod validation, and error handling.
Our goal: Build a User Activity Report that combines each user's profile data with their posts and the comments those posts received, using the JSONPlaceholder API.
Project Overview
Data Sources
We will use three endpoints from JSONPlaceholder:
- GET /users - List of users
- GET /posts - All posts (linked to users via userId)
- GET /comments - All comments (linked to posts via postId)
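To make the links between the three endpoints concrete, here is a small sketch that follows the chain from a comment to its post and then to the post's author. The sample records are made up and only mirror the linking fields; `findAuthorOfComment` is a hypothetical helper for illustration, not part of the pipeline we build below.

```typescript
interface SampleUser { id: number; name: string }
interface SamplePost { id: number; userId: number; title: string }
interface SampleComment { id: number; postId: number; body: string }

// Made-up records that mirror the API's linking fields (userId, postId).
const sampleUsers: SampleUser[] = [
  { id: 1, name: 'Ada' },
  { id: 2, name: 'Grace' },
];
const linkedPosts: SamplePost[] = [
  { id: 10, userId: 1, title: 'First post' },
  { id: 11, userId: 2, title: 'Second post' },
];
const linkedComments: SampleComment[] = [
  { id: 100, postId: 11, body: 'Nice!' },
  { id: 101, postId: 99, body: 'Comment on a missing post' },
];

// Follow comment.postId -> post.userId -> user.
// Returns undefined when any link in the chain is broken.
function findAuthorOfComment(commentId: number): SampleUser | undefined {
  const comment = linkedComments.find((c) => c.id === commentId);
  if (!comment) return undefined;
  const post = linkedPosts.find((p) => p.id === comment.postId);
  if (!post) return undefined;
  return sampleUsers.find((u) => u.id === post.userId);
}

console.log(findAuthorOfComment(100)?.name); // Grace
console.log(findAuthorOfComment(101)?.name); // undefined (no post with id 99)
```

Note that a comment never points at a user directly, so counting "comments per user" always goes through the user's posts.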
Final Output
interface UserActivityReport {
generatedAt: Date;
totalUsers: number;
totalPosts: number;
totalComments: number;
users: UserActivity[];
}
interface UserActivity {
id: number;
name: string;
email: string;
username: string;
postCount: number;
commentCount: number;
posts: PostSummary[];
}
interface PostSummary {
id: number;
title: string;
commentCount: number;
}
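It can help to see one value of this shape before writing any pipeline code. The sketch below repeats the interfaces in compact form so it runs on its own, then builds a report with invented sample values (real values would come from the API) and checks that the derived counts agree with the nested data.

```typescript
interface PostSummary { id: number; title: string; commentCount: number }
interface UserActivity {
  id: number; name: string; email: string; username: string;
  postCount: number; commentCount: number; posts: PostSummary[];
}
interface UserActivityReport {
  generatedAt: Date; totalUsers: number; totalPosts: number;
  totalComments: number; users: UserActivity[];
}

// Invented sample values, for illustration only.
const sampleReport: UserActivityReport = {
  generatedAt: new Date('2024-01-01T00:00:00Z'),
  totalUsers: 1,
  totalPosts: 2,
  totalComments: 5,
  users: [
    {
      id: 1,
      name: 'Ada Example',
      email: 'ada@example.com',
      username: 'ada',
      postCount: 2,
      commentCount: 5, // 3 + 2, summed over the posts below
      posts: [
        { id: 10, title: 'First post', commentCount: 3 },
        { id: 11, title: 'Second post', commentCount: 2 },
      ],
    },
  ],
};

// postCount and commentCount are derived, so they should match the posts array.
const countsAreConsistent = sampleReport.users.every(
  (u) =>
    u.postCount === u.posts.length &&
    u.commentCount === u.posts.reduce((sum, p) => sum + p.commentCount, 0)
);
console.log(countsAreConsistent); // true
```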
Step 1: Define Schemas
First, let us define Zod schemas for the API responses.
import { z } from 'zod';
// User schema (from API)
const ApiUserSchema = z.object({
id: z.number(),
name: z.string(),
username: z.string(),
email: z.string().email(),
address: z.object({
street: z.string(),
suite: z.string(),
city: z.string(),
zipcode: z.string(),
geo: z.object({
lat: z.string(),
lng: z.string(),
}),
}),
phone: z.string(),
website: z.string(),
company: z.object({
name: z.string(),
catchPhrase: z.string(),
bs: z.string(),
}),
});
// Post schema (from API)
const ApiPostSchema = z.object({
userId: z.number(),
id: z.number(),
title: z.string(),
body: z.string(),
});
// Comment schema (from API)
const ApiCommentSchema = z.object({
postId: z.number(),
id: z.number(),
name: z.string(),
email: z.string().email(),
body: z.string(),
});
// Array schemas
const ApiUsersSchema = z.array(ApiUserSchema);
const ApiPostsSchema = z.array(ApiPostSchema);
const ApiCommentsSchema = z.array(ApiCommentSchema);
// Infer types
type ApiUser = z.infer<typeof ApiUserSchema>;
type ApiPost = z.infer<typeof ApiPostSchema>;
type ApiComment = z.infer<typeof ApiCommentSchema>;
Step 2: Create Fetch Utilities
Build reusable fetch functions with error handling.
// Result type for error handling
type Result<T, E = Error> = { success: true; data: T } | { success: false; error: E };
interface FetchError {
type: 'network' | 'http' | 'validation';
message: string;
details?: unknown;
}
function ok<T>(data: T): Result<T, never> {
return { success: true, data };
}
function err<E>(error: E): Result<never, E> {
return { success: false, error };
}
// Generic fetch with validation
async function fetchAndValidate<T>(
url: string,
schema: z.ZodSchema<T>,
retries: number = 3
): Promise<Result<T, FetchError>> {
let lastError: FetchError | undefined;
for (let attempt = 1; attempt <= retries; attempt++) {
try {
const response = await fetch(url);
if (!response.ok) {
lastError = {
type: 'http',
message: `HTTP ${response.status}: ${response.statusText}`,
};
// Retry on server errors
if (response.status >= 500 && attempt < retries) {
await sleep(1000 * attempt);
continue;
}
return err(lastError);
}
const data = await response.json();
const parsed = schema.safeParse(data);
if (!parsed.success) {
return err({
type: 'validation',
message: 'Invalid response data',
details: parsed.error.issues,
});
}
return ok(parsed.data);
} catch (error) {
lastError = {
type: 'network',
message: error instanceof Error ? error.message : 'Network error',
};
if (attempt < retries) {
await sleep(1000 * attempt);
continue;
}
}
}
return err(lastError || { type: 'network', message: 'Unknown error' });
}
function sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
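The retry loop above waits `1000 * attempt` milliseconds between attempts, so the delay grows linearly. The sketch below isolates that idea in a hypothetical `retryWithBackoff` helper. Two things differ from the lesson's code and are assumptions made for the example: the wait function is injected so the example runs instantly, and the helper throws after the last attempt instead of returning a Result.

```typescript
// Hypothetical helper: retries an async operation with linearly growing delays.
async function retryWithBackoff<T>(
  operation: (attempt: number) => Promise<T>,
  retries: number,
  wait: (ms: number) => Promise<void>
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 1; attempt <= retries; attempt++) {
    try {
      return await operation(attempt);
    } catch (error) {
      lastError = error;
      if (attempt < retries) {
        await wait(1000 * attempt); // 1000 ms, then 2000 ms, ...
      }
    }
  }
  throw lastError;
}

// Record the requested delays instead of actually sleeping.
const recordedDelays: number[] = [];
const fakeWait = async (ms: number): Promise<void> => {
  recordedDelays.push(ms);
};

// An operation that fails twice and succeeds on the third attempt.
const flakyOperation = async (attempt: number): Promise<string> => {
  if (attempt < 3) throw new Error(`attempt ${attempt} failed`);
  return 'ok';
};

const retryDemo = retryWithBackoff(flakyOperation, 3, fakeWait).then((value) => {
  console.log(value, recordedDelays); // ok [ 1000, 2000 ]
  return value;
});
```

No delay is requested after the final attempt, which matches the `attempt < retries` check in `fetchAndValidate`.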
Step 3: Fetch All Data
Create functions to fetch all required data in parallel.
const API_BASE = 'https://jsonplaceholder.typicode.com';
interface RawData {
users: ApiUser[];
posts: ApiPost[];
comments: ApiComment[];
}
async function fetchAllData(): Promise<Result<RawData, FetchError>> {
console.log('Fetching data from API...');
// Fetch all data in parallel
const [usersResult, postsResult, commentsResult] = await Promise.all([
fetchAndValidate(`${API_BASE}/users`, ApiUsersSchema),
fetchAndValidate(`${API_BASE}/posts`, ApiPostsSchema),
fetchAndValidate(`${API_BASE}/comments`, ApiCommentsSchema),
]);
// Check for errors
if (!usersResult.success) {
return err({ ...usersResult.error, message: `Users: ${usersResult.error.message}` });
}
if (!postsResult.success) {
return err({ ...postsResult.error, message: `Posts: ${postsResult.error.message}` });
}
if (!commentsResult.success) {
return err({ ...commentsResult.error, message: `Comments: ${commentsResult.error.message}` });
}
console.log(
`Fetched: ${usersResult.data.length} users, ${postsResult.data.length} posts, ${commentsResult.data.length} comments`
);
return ok({
users: usersResult.data,
posts: postsResult.data,
comments: commentsResult.data,
});
}
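Two properties of `Promise.all` matter for `fetchAllData`: every request starts before any of them finishes, and the results come back in input order regardless of which request finishes first. The sketch below shows both with a hypothetical `fakeRequest` that uses timers in place of real network calls.

```typescript
const eventLog: string[] = [];

// Simulated request: logs when it starts and ends, and resolves after delayMs.
function fakeRequest(label: string, delayMs: number): Promise<string> {
  eventLog.push(`start ${label}`);
  return new Promise((resolve) => {
    setTimeout(() => {
      eventLog.push(`end ${label}`);
      resolve(label);
    }, delayMs);
  });
}

function loadInParallel(): Promise<string[]> {
  // All three requests are started here, before any of them has finished.
  return Promise.all([
    fakeRequest('users', 30),
    fakeRequest('posts', 10),
    fakeRequest('comments', 20),
  ]);
}

const parallelDemo = loadInParallel().then((labels) => {
  // Results follow input order, not completion order.
  console.log(labels); // [ 'users', 'posts', 'comments' ]
  console.log(eventLog.slice(0, 3)); // [ 'start users', 'start posts', 'start comments' ]
  return labels;
});
```

This ordering guarantee is why the array destructuring in `fetchAllData` can safely assume the first result is users, the second posts, and the third comments.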
Step 4: Transform Data
Transform the raw API data into our desired report format.
interface PostSummary {
id: number;
title: string;
commentCount: number;
}
interface UserActivity {
id: number;
name: string;
email: string;
username: string;
postCount: number;
commentCount: number;
posts: PostSummary[];
}
interface UserActivityReport {
generatedAt: Date;
totalUsers: number;
totalPosts: number;
totalComments: number;
users: UserActivity[];
}
function transformData(raw: RawData): UserActivityReport {
console.log('Transforming data...');
// Group posts by userId
const postsByUser = raw.posts.reduce(
(acc, post) => {
if (!acc[post.userId]) {
acc[post.userId] = [];
}
acc[post.userId].push(post);
return acc;
},
{} as Record<number, ApiPost[]>
);
// Count comments per post
const commentsByPost = raw.comments.reduce(
(acc, comment) => {
acc[comment.postId] = (acc[comment.postId] || 0) + 1;
return acc;
},
{} as Record<number, number>
);
// Transform users
const users: UserActivity[] = raw.users.map((user) => {
const userPosts = postsByUser[user.id] || [];
// Create post summaries with comment counts
const posts: PostSummary[] = userPosts.map((post) => ({
id: post.id,
title: post.title,
commentCount: commentsByPost[post.id] || 0,
}));
// Total comments received across all of this user's posts
const commentCount = posts.reduce((sum, post) => sum + post.commentCount, 0);
return {
id: user.id,
name: user.name,
email: user.email,
username: user.username,
postCount: userPosts.length,
commentCount,
posts,
};
});
// Sort users by activity (posts + comments)
users.sort((a, b) => {
const activityA = a.postCount + a.commentCount;
const activityB = b.postCount + b.commentCount;
return activityB - activityA;
});
return {
generatedAt: new Date(),
totalUsers: raw.users.length,
totalPosts: raw.posts.length,
totalComments: raw.comments.length,
users,
};
}
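The grouping step is the heart of this transformation, so it is worth seeing in isolation. As an alternative to the `reduce` calls above, the sketch below uses a hypothetical generic `groupBy` helper built on `Map`. The sample posts are made up and carry only the fields needed for grouping.

```typescript
// Hypothetical generic helper: groups items by a key derived from each item.
function groupBy<T, K>(items: T[], keyOf: (item: T) => K): Map<K, T[]> {
  const groups = new Map<K, T[]>();
  for (const item of items) {
    const key = keyOf(item);
    const group = groups.get(key);
    if (group) {
      group.push(item);
    } else {
      groups.set(key, [item]);
    }
  }
  return groups;
}

// Made-up posts with only the fields needed here.
const samplePosts = [
  { id: 1, userId: 1 },
  { id: 2, userId: 2 },
  { id: 3, userId: 1 },
];

const postsByUserId = groupBy(samplePosts, (post) => post.userId);
console.log(postsByUserId.get(1)?.map((p) => p.id)); // [ 1, 3 ]
console.log(postsByUserId.get(3) ?? []); // [] (a user without posts)
```

Either approach builds the lookup in a single pass, so attaching posts to each user afterwards does not require rescanning the full posts array.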
Step 5: Add Filtering and Statistics
Add utility functions to analyze the report.
interface ReportStatistics {
averagePostsPerUser: number;
averageCommentsPerPost: number;
mostActiveUser: UserActivity | null;
leastActiveUser: UserActivity | null;
usersWithNoPosts: number;
topPosters: UserActivity[];
}
function calculateStatistics(report: UserActivityReport): ReportStatistics {
const { users, totalPosts, totalComments, totalUsers } = report;
const averagePostsPerUser = totalUsers > 0 ? totalPosts / totalUsers : 0;
const averageCommentsPerPost = totalPosts > 0 ? totalComments / totalPosts : 0;
// Users are already sorted by activity
const mostActiveUser = users.length > 0 ? users[0] : null;
const leastActiveUser = users.length > 0 ? users[users.length - 1] : null;
const usersWithNoPosts = users.filter((u) => u.postCount === 0).length;
// Top five by overall activity (posts + comments), because that is how users are sorted
const topPosters = users.slice(0, 5);
return {
averagePostsPerUser,
averageCommentsPerPost,
mostActiveUser,
leastActiveUser,
usersWithNoPosts,
topPosters,
};
}
function filterUsersByActivity(
report: UserActivityReport,
minPosts: number = 0,
minComments: number = 0
): UserActivity[] {
return report.users.filter(
(user) => user.postCount >= minPosts && user.commentCount >= minComments
);
}
function searchUsers(report: UserActivityReport, query: string): UserActivity[] {
const lowerQuery = query.toLowerCase();
return report.users.filter(
(user) =>
user.name.toLowerCase().includes(lowerQuery) ||
user.email.toLowerCase().includes(lowerQuery) ||
user.username.toLowerCase().includes(lowerQuery)
);
}
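The `totalUsers > 0` and `totalPosts > 0` checks in `calculateStatistics` are easy to overlook. A short sketch shows what they prevent. `safeAverage` is a hypothetical name for the same guard, pulled out so it can be run on its own.

```typescript
// Hypothetical helper: an average that returns 0 for an empty denominator
// instead of NaN (0 / 0) or Infinity (n / 0).
function safeAverage(total: number, count: number): number {
  return count > 0 ? total / count : 0;
}

console.log(0 / 0); // NaN
console.log((0 / 0).toFixed(1)); // NaN (this is what an unguarded report would print)
console.log(safeAverage(0, 0)); // 0
console.log(safeAverage(500, 100).toFixed(1)); // 5.0
```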
Step 6: Build the Pipeline
Combine everything into a single pipeline function.
interface PipelineOptions {
minPosts?: number;
minComments?: number;
searchQuery?: string;
includeStatistics?: boolean;
}
interface PipelineResult {
report: UserActivityReport;
filteredUsers: UserActivity[];
statistics?: ReportStatistics;
}
async function runPipeline(
options: PipelineOptions = {}
): Promise<Result<PipelineResult, FetchError>> {
const { minPosts = 0, minComments = 0, searchQuery, includeStatistics = true } = options;
// Step 1: Fetch data
const dataResult = await fetchAllData();
if (!dataResult.success) {
return dataResult;
}
// Step 2: Transform
const report = transformData(dataResult.data);
// Step 3: Filter
let filteredUsers = filterUsersByActivity(report, minPosts, minComments);
if (searchQuery) {
// Lowercase the query once instead of once per field per user
const lowerQuery = searchQuery.toLowerCase();
filteredUsers = filteredUsers.filter(
(user) =>
user.name.toLowerCase().includes(lowerQuery) ||
user.email.toLowerCase().includes(lowerQuery) ||
user.username.toLowerCase().includes(lowerQuery)
);
}
// Step 4: Statistics (optional)
const statistics = includeStatistics ? calculateStatistics(report) : undefined;
return ok({
report,
filteredUsers,
statistics,
});
}
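`runPipeline` stops at the first failed stage by checking `success` and returning early. The same short-circuit behaviour can be expressed with a small combinator. The sketch below is one possible shape, not the lesson's implementation: `Outcome` mirrors the Result type under a different name so the example is self-contained, and `andThen`, `parseCount`, and `requirePositive` are hypothetical.

```typescript
type Outcome<T, E> = { success: true; data: T } | { success: false; error: E };

// Hypothetical helper: run the next stage only if the previous one succeeded.
function andThen<T, U, E>(
  previous: Outcome<T, E>,
  next: (data: T) => Outcome<U, E>
): Outcome<U, E> {
  return previous.success ? next(previous.data) : previous;
}

// Two made-up stages: parse a number, then require it to be positive.
const parseCount = (text: string): Outcome<number, string> => {
  const value = Number(text);
  return Number.isFinite(value)
    ? { success: true, data: value }
    : { success: false, error: `not a number: ${text}` };
};
const requirePositive = (value: number): Outcome<number, string> =>
  value > 0 ? { success: true, data: value } : { success: false, error: 'must be positive' };

const goodOutcome = andThen(parseCount('5'), requirePositive);
const badOutcome = andThen(parseCount('abc'), requirePositive); // second stage is skipped

console.log(goodOutcome); // { success: true, data: 5 }
console.log(badOutcome); // { success: false, error: 'not a number: abc' }
```

With only one fallible stage, as in our pipeline, the explicit early return is just as readable. A combinator starts to pay off when several stages can fail.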
Step 7: Display Results
Create functions to format and display the results.
function formatReport(result: PipelineResult): string {
const { report, filteredUsers, statistics } = result;
const lines: string[] = [
'='.repeat(60),
'USER ACTIVITY REPORT',
'='.repeat(60),
'',
`Generated: ${report.generatedAt.toISOString()}`,
'',
'SUMMARY',
'-'.repeat(40),
`Total Users: ${report.totalUsers}`,
`Total Posts: ${report.totalPosts}`,
`Total Comments: ${report.totalComments}`,
'',
];
if (statistics) {
lines.push(
'STATISTICS',
'-'.repeat(40),
`Average Posts per User: ${statistics.averagePostsPerUser.toFixed(1)}`,
`Average Comments per Post: ${statistics.averageCommentsPerPost.toFixed(1)}`,
`Users with No Posts: ${statistics.usersWithNoPosts}`,
''
);
if (statistics.mostActiveUser) {
lines.push(
`Most Active User: ${statistics.mostActiveUser.name}`,
` Posts: ${statistics.mostActiveUser.postCount}, Comments: ${statistics.mostActiveUser.commentCount}`,
''
);
}
lines.push('TOP 5 POSTERS', '-'.repeat(40));
statistics.topPosters.forEach((user, index) => {
lines.push(
`${index + 1}. ${user.name} (@${user.username})`,
` Posts: ${user.postCount}, Comments received: ${user.commentCount}`
);
});
lines.push('');
}
lines.push(
'FILTERED USERS',
'-'.repeat(40),
`Showing ${filteredUsers.length} of ${report.totalUsers} users`,
''
);
filteredUsers.forEach((user) => {
lines.push(
`${user.name} (@${user.username})`,
` Email: ${user.email}`,
` Posts: ${user.postCount}, Comments received: ${user.commentCount}`
);
if (user.posts.length > 0) {
lines.push(' Recent posts:');
user.posts.slice(0, 3).forEach((post) => {
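// Append an ellipsis only when the title was actually shortened
const title = post.title.length > 50 ? `${post.title.substring(0, 50)}...` : post.title;
lines.push(` - ${title} (${post.commentCount} comments)`);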
});
}
lines.push('');
});
lines.push('='.repeat(60));
return lines.join('\n');
}
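Post titles in the report are limited to 50 characters. Keeping that rule in a small helper makes it easy to test on its own. `truncate` is a hypothetical name, and this sketch appends an ellipsis only when text was actually cut off.

```typescript
// Hypothetical helper: shorten text to maxLength characters and
// append an ellipsis only when something was actually removed.
function truncate(text: string, maxLength: number): string {
  return text.length > maxLength ? `${text.slice(0, maxLength)}...` : text;
}

console.log(truncate('short title', 50)); // short title
console.log(truncate('a'.repeat(60), 50).length); // 53 (50 characters plus '...')
```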
Step 8: Main Function
Put it all together.
async function main(): Promise<void> {
console.log('Starting User Activity Report Pipeline...\n');
const result = await runPipeline({
minPosts: 5, // Only users with 5+ posts
includeStatistics: true,
});
if (!result.success) {
console.error('Pipeline failed!');
console.error(`Error type: ${result.error.type}`);
console.error(`Message: ${result.error.message}`);
if (result.error.details) {
console.error('Details:', JSON.stringify(result.error.details, null, 2));
}
process.exit(1);
}
const output = formatReport(result.data);
console.log(output);
// Export as JSON
const jsonOutput = JSON.stringify(
{
generatedAt: result.data.report.generatedAt,
statistics: result.data.statistics,
users: result.data.filteredUsers,
},
null,
2
);
console.log('\nJSON Output (first 500 chars):');
console.log(jsonOutput.substring(0, 500) + '...');
}
main().catch(console.error);
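One detail of the JSON export is worth knowing: `generatedAt` is a `Date` in memory, but `JSON.stringify` writes it as an ISO 8601 string, and `JSON.parse` does not turn it back into a `Date`. The sketch below demonstrates the round trip with a made-up object and shows one way to restore the date with a reviver function.

```typescript
const snapshot = { generatedAt: new Date('2024-01-01T12:00:00.000Z'), totalUsers: 10 };

// JSON.stringify calls Date.prototype.toJSON, which produces an ISO 8601 string.
const snapshotJson = JSON.stringify(snapshot);
console.log(snapshotJson); // {"generatedAt":"2024-01-01T12:00:00.000Z","totalUsers":10}

// After parsing, the value is a plain string, not a Date.
const parsedPlain = JSON.parse(snapshotJson) as { generatedAt: unknown };
console.log(typeof parsedPlain.generatedAt); // string

// A reviver can convert the field back into a Date while parsing.
const revived = JSON.parse(snapshotJson, (key: string, value: unknown) =>
  key === 'generatedAt' && typeof value === 'string' ? new Date(value) : value
) as { generatedAt: Date; totalUsers: number };
console.log(revived.generatedAt instanceof Date); // true
```

Anyone reading the exported JSON back in should therefore validate and convert `generatedAt` rather than assume it is already a `Date`.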
Complete Code
Here is the complete pipeline code in one file:
import { z } from 'zod';
// ============================================
// SCHEMAS
// ============================================
const ApiUserSchema = z.object({
id: z.number(),
name: z.string(),
username: z.string(),
email: z.string().email(),
address: z.object({
street: z.string(),
suite: z.string(),
city: z.string(),
zipcode: z.string(),
geo: z.object({
lat: z.string(),
lng: z.string(),
}),
}),
phone: z.string(),
website: z.string(),
company: z.object({
name: z.string(),
catchPhrase: z.string(),
bs: z.string(),
}),
});
const ApiPostSchema = z.object({
userId: z.number(),
id: z.number(),
title: z.string(),
body: z.string(),
});
const ApiCommentSchema = z.object({
postId: z.number(),
id: z.number(),
name: z.string(),
email: z.string().email(),
body: z.string(),
});
const ApiUsersSchema = z.array(ApiUserSchema);
const ApiPostsSchema = z.array(ApiPostSchema);
const ApiCommentsSchema = z.array(ApiCommentSchema);
type ApiUser = z.infer<typeof ApiUserSchema>;
type ApiPost = z.infer<typeof ApiPostSchema>;
type ApiComment = z.infer<typeof ApiCommentSchema>;
// ============================================
// TYPES
// ============================================
type Result<T, E = Error> = { success: true; data: T } | { success: false; error: E };
interface FetchError {
type: 'network' | 'http' | 'validation';
message: string;
details?: unknown;
}
interface RawData {
users: ApiUser[];
posts: ApiPost[];
comments: ApiComment[];
}
interface PostSummary {
id: number;
title: string;
commentCount: number;
}
interface UserActivity {
id: number;
name: string;
email: string;
username: string;
postCount: number;
commentCount: number;
posts: PostSummary[];
}
interface UserActivityReport {
generatedAt: Date;
totalUsers: number;
totalPosts: number;
totalComments: number;
users: UserActivity[];
}
interface ReportStatistics {
averagePostsPerUser: number;
averageCommentsPerPost: number;
mostActiveUser: UserActivity | null;
leastActiveUser: UserActivity | null;
usersWithNoPosts: number;
topPosters: UserActivity[];
}
interface PipelineOptions {
minPosts?: number;
minComments?: number;
searchQuery?: string;
includeStatistics?: boolean;
}
interface PipelineResult {
report: UserActivityReport;
filteredUsers: UserActivity[];
statistics?: ReportStatistics;
}
// ============================================
// UTILITIES
// ============================================
function ok<T>(data: T): Result<T, never> {
return { success: true, data };
}
function err<E>(error: E): Result<never, E> {
return { success: false, error };
}
function sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
// ============================================
// FETCH FUNCTIONS
// ============================================
const API_BASE = 'https://jsonplaceholder.typicode.com';
async function fetchAndValidate<T>(
url: string,
schema: z.ZodSchema<T>,
retries: number = 3
): Promise<Result<T, FetchError>> {
let lastError: FetchError | undefined;
for (let attempt = 1; attempt <= retries; attempt++) {
try {
const response = await fetch(url);
if (!response.ok) {
lastError = {
type: 'http',
message: `HTTP ${response.status}: ${response.statusText}`,
};
if (response.status >= 500 && attempt < retries) {
await sleep(1000 * attempt);
continue;
}
return err(lastError);
}
const data = await response.json();
const parsed = schema.safeParse(data);
if (!parsed.success) {
return err({
type: 'validation',
message: 'Invalid response data',
details: parsed.error.issues,
});
}
return ok(parsed.data);
} catch (error) {
lastError = {
type: 'network',
message: error instanceof Error ? error.message : 'Network error',
};
if (attempt < retries) {
await sleep(1000 * attempt);
continue;
}
}
}
return err(lastError || { type: 'network', message: 'Unknown error' });
}
async function fetchAllData(): Promise<Result<RawData, FetchError>> {
console.log('Fetching data from API...');
const [usersResult, postsResult, commentsResult] = await Promise.all([
fetchAndValidate(`${API_BASE}/users`, ApiUsersSchema),
fetchAndValidate(`${API_BASE}/posts`, ApiPostsSchema),
fetchAndValidate(`${API_BASE}/comments`, ApiCommentsSchema),
]);
if (!usersResult.success) {
return err({ ...usersResult.error, message: `Users: ${usersResult.error.message}` });
}
if (!postsResult.success) {
return err({ ...postsResult.error, message: `Posts: ${postsResult.error.message}` });
}
if (!commentsResult.success) {
return err({ ...commentsResult.error, message: `Comments: ${commentsResult.error.message}` });
}
console.log(
`Fetched: ${usersResult.data.length} users, ${postsResult.data.length} posts, ${commentsResult.data.length} comments`
);
return ok({
users: usersResult.data,
posts: postsResult.data,
comments: commentsResult.data,
});
}
// ============================================
// TRANSFORMATION
// ============================================
function transformData(raw: RawData): UserActivityReport {
console.log('Transforming data...');
const postsByUser = raw.posts.reduce(
(acc, post) => {
if (!acc[post.userId]) {
acc[post.userId] = [];
}
acc[post.userId].push(post);
return acc;
},
{} as Record<number, ApiPost[]>
);
const commentsByPost = raw.comments.reduce(
(acc, comment) => {
acc[comment.postId] = (acc[comment.postId] || 0) + 1;
return acc;
},
{} as Record<number, number>
);
const users: UserActivity[] = raw.users.map((user) => {
const userPosts = postsByUser[user.id] || [];
const posts: PostSummary[] = userPosts.map((post) => ({
id: post.id,
title: post.title,
commentCount: commentsByPost[post.id] || 0,
}));
const commentCount = posts.reduce((sum, post) => sum + post.commentCount, 0);
return {
id: user.id,
name: user.name,
email: user.email,
username: user.username,
postCount: userPosts.length,
commentCount,
posts,
};
});
users.sort((a, b) => {
const activityA = a.postCount + a.commentCount;
const activityB = b.postCount + b.commentCount;
return activityB - activityA;
});
return {
generatedAt: new Date(),
totalUsers: raw.users.length,
totalPosts: raw.posts.length,
totalComments: raw.comments.length,
users,
};
}
// ============================================
// ANALYSIS
// ============================================
function calculateStatistics(report: UserActivityReport): ReportStatistics {
const { users, totalPosts, totalComments, totalUsers } = report;
return {
averagePostsPerUser: totalUsers > 0 ? totalPosts / totalUsers : 0,
averageCommentsPerPost: totalPosts > 0 ? totalComments / totalPosts : 0,
mostActiveUser: users.length > 0 ? users[0] : null,
leastActiveUser: users.length > 0 ? users[users.length - 1] : null,
usersWithNoPosts: users.filter((u) => u.postCount === 0).length,
topPosters: users.slice(0, 5),
};
}
function filterUsersByActivity(
report: UserActivityReport,
minPosts: number = 0,
minComments: number = 0
): UserActivity[] {
return report.users.filter(
(user) => user.postCount >= minPosts && user.commentCount >= minComments
);
}
// ============================================
// PIPELINE
// ============================================
async function runPipeline(
options: PipelineOptions = {}
): Promise<Result<PipelineResult, FetchError>> {
const { minPosts = 0, minComments = 0, searchQuery, includeStatistics = true } = options;
const dataResult = await fetchAllData();
if (!dataResult.success) {
return dataResult;
}
const report = transformData(dataResult.data);
let filteredUsers = filterUsersByActivity(report, minPosts, minComments);
if (searchQuery) {
const lowerQuery = searchQuery.toLowerCase();
filteredUsers = filteredUsers.filter(
(user) =>
user.name.toLowerCase().includes(lowerQuery) ||
user.email.toLowerCase().includes(lowerQuery) ||
user.username.toLowerCase().includes(lowerQuery)
);
}
const statistics = includeStatistics ? calculateStatistics(report) : undefined;
return ok({
report,
filteredUsers,
statistics,
});
}
// ============================================
// MAIN
// ============================================
async function main(): Promise<void> {
console.log('Starting User Activity Report Pipeline...\n');
const result = await runPipeline({
minPosts: 5,
includeStatistics: true,
});
if (!result.success) {
console.error('Pipeline failed!');
console.error(`Error type: ${result.error.type}`);
console.error(`Message: ${result.error.message}`);
process.exit(1);
}
const { report, filteredUsers, statistics } = result.data;
console.log('='.repeat(60));
console.log('USER ACTIVITY REPORT');
console.log('='.repeat(60));
console.log(`\nGenerated: ${report.generatedAt.toISOString()}`);
console.log(`\nTotal Users: ${report.totalUsers}`);
console.log(`Total Posts: ${report.totalPosts}`);
console.log(`Total Comments: ${report.totalComments}`);
if (statistics) {
console.log(`\nAverage Posts per User: ${statistics.averagePostsPerUser.toFixed(1)}`);
console.log(`Average Comments per Post: ${statistics.averageCommentsPerPost.toFixed(1)}`);
console.log('\nTop 5 Posters:');
statistics.topPosters.forEach((user, i) => {
console.log(
` ${i + 1}. ${user.name} - ${user.postCount} posts, ${user.commentCount} comments`
);
});
}
console.log(`\nFiltered Users (${filteredUsers.length}):`);
filteredUsers.forEach((user) => {
console.log(` - ${user.name} (@${user.username}): ${user.postCount} posts`);
});
console.log('\n' + '='.repeat(60));
}
main().catch(console.error);
Exercises
Exercise 1: Add Caching
Modify the pipeline to cache API responses to avoid repeated fetches:
// Implement a simple cache
interface Cache<T> {
get(key: string): T | undefined;
set(key: string, value: T, ttlMs: number): void;
}
// Modify fetchAndValidate to use the cache
Solution
interface CacheEntry<T> {
value: T;
expiresAt: number;
}
class SimpleCache<T> implements Cache<T> {
private cache = new Map<string, CacheEntry<T>>();
get(key: string): T | undefined {
const entry = this.cache.get(key);
if (!entry) {
return undefined;
}
if (Date.now() > entry.expiresAt) {
this.cache.delete(key);
return undefined;
}
return entry.value;
}
set(key: string, value: T, ttlMs: number): void {
this.cache.set(key, {
value,
expiresAt: Date.now() + ttlMs,
});
}
clear(): void {
this.cache.clear();
}
}
const cache = new SimpleCache<unknown>();
const CACHE_TTL = 5 * 60 * 1000; // 5 minutes
async function fetchAndValidateWithCache<T>(
url: string,
schema: z.ZodSchema<T>,
retries: number = 3
): Promise<Result<T, FetchError>> {
// Check cache first
const cached = cache.get(url) as T | undefined;
if (cached !== undefined) {
console.log(`Cache hit: ${url}`);
return ok(cached);
}
console.log(`Cache miss: ${url}`);
const result = await fetchAndValidate(url, schema, retries);
if (result.success) {
cache.set(url, result.data, CACHE_TTL);
}
return result;
}
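Waiting five minutes to see whether an entry expires is impractical. One common approach, sketched here under the assumption that the cache may take a clock function as a constructor argument, is to inject the clock so expiry can be checked instantly. `TtlCache` and its `now` parameter are hypothetical and are not part of the solution above.

```typescript
// A TTL cache whose clock is injected so that expiry can be tested without waiting.
class TtlCache<T> {
  private readonly entries = new Map<string, { value: T; expiresAt: number }>();
  private readonly now: () => number;

  constructor(now: () => number = Date.now) {
    this.now = now;
  }

  get(key: string): T | undefined {
    const entry = this.entries.get(key);
    if (!entry) return undefined;
    if (this.now() > entry.expiresAt) {
      this.entries.delete(key); // evict expired entries lazily, on read
      return undefined;
    }
    return entry.value;
  }

  set(key: string, value: T, ttlMs: number): void {
    this.entries.set(key, { value, expiresAt: this.now() + ttlMs });
  }
}

// A fake clock that the example advances by hand.
let fakeTime = 0;
const ttlCache = new TtlCache<string>(() => fakeTime);

ttlCache.set('/users', 'cached users', 1000);
fakeTime = 1000;
console.log(ttlCache.get('/users')); // cached users (not yet past expiresAt)
fakeTime = 1001;
console.log(ttlCache.get('/users')); // undefined (expired and evicted)
```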
Exercise 2: Add Progress Reporting
Add progress callbacks to the pipeline:
interface ProgressCallback {
(stage: string, progress: number, total: number): void;
}
async function runPipelineWithProgress(
options: PipelineOptions,
onProgress: ProgressCallback
): Promise<Result<PipelineResult, FetchError>> {
// Report progress at each stage
}
Solution
interface ProgressCallback {
(stage: string, progress: number, total: number): void;
}
async function runPipelineWithProgress(
options: PipelineOptions = {},
onProgress?: ProgressCallback
): Promise<Result<PipelineResult, FetchError>> {
const report = (stage: string, progress: number, total: number) => {
if (onProgress) {
onProgress(stage, progress, total);
}
};
// Stage 1: Fetch users, posts, and comments one at a time so progress can be reported after each request
report('Fetching', 0, 3);
const usersResult = await fetchAndValidate(`${API_BASE}/users`, ApiUsersSchema);
if (!usersResult.success) return err(usersResult.error);
report('Fetching', 1, 3);
const postsResult = await fetchAndValidate(`${API_BASE}/posts`, ApiPostsSchema);
if (!postsResult.success) return err(postsResult.error);
report('Fetching', 2, 3);
const commentsResult = await fetchAndValidate(`${API_BASE}/comments`, ApiCommentsSchema);
if (!commentsResult.success) return err(commentsResult.error);
report('Fetching', 3, 3);
// Stage 2: Transform
report('Transforming', 0, 1);
const rawData = {
users: usersResult.data,
posts: postsResult.data,
comments: commentsResult.data,
};
const reportData = transformData(rawData);
report('Transforming', 1, 1);
// Stage 3: Filter
report('Filtering', 0, 1);
let filteredUsers = filterUsersByActivity(
reportData,
options.minPosts || 0,
options.minComments || 0
);
if (options.searchQuery) {
const lowerQuery = options.searchQuery.toLowerCase();
filteredUsers = filteredUsers.filter(
(user) =>
user.name.toLowerCase().includes(lowerQuery) ||
user.email.toLowerCase().includes(lowerQuery) ||
user.username.toLowerCase().includes(lowerQuery)
);
}
report('Filtering', 1, 1);
// Stage 4: Statistics
report('Calculating', 0, 1);
const statistics =
options.includeStatistics !== false ? calculateStatistics(reportData) : undefined;
report('Calculating', 1, 1);
return ok({
report: reportData,
filteredUsers,
statistics,
});
}
// Usage
const result = await runPipelineWithProgress({ minPosts: 5 }, (stage, progress, total) => {
const percent = Math.round((progress / total) * 100);
console.log(`[${stage}] ${progress}/${total} (${percent}%)`);
});
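The solution above fetches sequentially, which makes progress easy to report but gives up the parallel fetching from Step 3. A possible middle ground is to keep `Promise.all` and bump a counter as each task resolves. The sketch below uses a hypothetical `allWithProgress` helper and simple stand-in tasks instead of real API requests.

```typescript
type ProgressListener = (done: number, total: number) => void;

// Hypothetical helper: runs tasks in parallel and reports after each one resolves.
async function allWithProgress<T>(
  tasks: Array<() => Promise<T>>,
  onProgress: ProgressListener
): Promise<T[]> {
  let done = 0;
  onProgress(done, tasks.length);
  return Promise.all(
    tasks.map(async (task) => {
      const value = await task();
      done += 1;
      onProgress(done, tasks.length);
      return value;
    })
  );
}

// Stand-ins for the three API requests.
const progressLog: string[] = [];
const progressDemo = allWithProgress(
  [async () => 'users', async () => 'posts', async () => 'comments'],
  (done, total) => {
    progressLog.push(`${done}/${total}`);
  }
).then((values) => {
  console.log(progressLog); // [ '0/3', '1/3', '2/3', '3/3' ]
  console.log(values); // [ 'users', 'posts', 'comments' ]
  return values;
});
```

The counter reports how many tasks have finished, not which ones, because parallel tasks can complete in any order.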
Exercise 3: Export to Different Formats
Add functions to export the report in different formats:
function exportToCSV(report: UserActivityReport): string;
function exportToMarkdown(report: UserActivityReport): string;
Solution
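// Quote a text field and double any embedded quotes so commas and quotes stay inside one cell
function csvField(value: string): string {
return `"${value.replace(/"/g, '""')}"`;
}
function exportToCSV(report: UserActivityReport): string {
const headers = ['ID', 'Name', 'Username', 'Email', 'Posts', 'Comments'];
const rows = report.users.map((user) => [
user.id.toString(),
csvField(user.name),
csvField(user.username),
csvField(user.email),
user.postCount.toString(),
user.commentCount.toString(),
]);
return [headers.join(','), ...rows.map((row) => row.join(','))].join('\n');
}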
function exportToMarkdown(report: UserActivityReport): string {
const lines: string[] = [
'# User Activity Report',
'',
`Generated: ${report.generatedAt.toISOString()}`,
'',
'## Summary',
'',
`- **Total Users**: ${report.totalUsers}`,
`- **Total Posts**: ${report.totalPosts}`,
`- **Total Comments**: ${report.totalComments}`,
'',
'## Users',
'',
'| Name | Username | Email | Posts | Comments |',
'|------|----------|-------|-------|----------|',
];
report.users.forEach((user) => {
lines.push(
`| ${user.name} | @${user.username} | ${user.email} | ${user.postCount} | ${user.commentCount} |`
);
});
return lines.join('\n');
}
// Usage (assumes `report` is a UserActivityReport, for example result.data.report)
const csvOutput = exportToCSV(report);
const mdOutput = exportToMarkdown(report);
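Markdown tables have their own escaping concern: a `|` inside a value would be read as a column separator. The sketch below shows a hypothetical `escapeMarkdownCell` helper that could be applied to each text value before it is placed in a table row. It assumes a renderer that accepts `\|` inside table cells, as GitHub Flavored Markdown does.

```typescript
// Hypothetical helper: make a value safe to place inside a Markdown table cell.
function escapeMarkdownCell(value: string): string {
  return value
    .replace(/\\/g, '\\\\') // escape backslashes first so later escapes stay intact
    .replace(/\|/g, '\\|') // a bare pipe would start a new column
    .replace(/\r?\n/g, ' '); // a table row must stay on one line
}

console.log(escapeMarkdownCell('Ada | Grace')); // Ada \| Grace
console.log(`| ${escapeMarkdownCell('a|b')} | 3 |`); // | a\|b | 3 |
```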
Key Takeaways
- Structure your pipeline in clear stages: fetch, validate, transform, analyze
- Use schemas to validate all external data at boundaries
- Return Results instead of throwing to make error handling explicit
- Fetch in parallel when data sources are independent
- Transform data to match your application's needs, not API structure
- Separate concerns: fetching, validation, transformation, and display
- Make pipelines configurable with options objects
- Add observability with progress reporting and logging
Resources
| Resource | Type | Level |
|---|---|---|
| JSONPlaceholder | API | Beginner |
| Zod Documentation | Documentation | Intermediate |
| TypeScript Handbook | Documentation | Beginner |
| Node.js Streams | Documentation | Advanced |
Module Complete
Congratulations! You have completed Module 5: Working with Data. You now know how to:
- Parse and generate JSON safely
- Transform data using map, filter, and reduce
- Validate external data with Zod schemas
- Handle errors gracefully with custom errors and the Result pattern
- Build complete data processing pipelines
Continue your learning journey with the next module.