From Zero to AI

Lesson 5.5: Practice - Data Pipeline

Duration: 60 minutes

Learning Objectives

By the end of this lesson, you will be able to:

  1. Design and implement a complete data processing pipeline
  2. Combine fetching, validation, transformation, and error handling
  3. Process data from multiple API endpoints
  4. Build a reusable pipeline architecture
  5. Handle real-world data processing scenarios

Introduction

In this practice lesson, we will build a complete data processing pipeline that fetches data from an API, validates it, transforms it, and produces a clean output. This combines everything we learned in this module: JSON parsing, data transformation, Zod validation, and error handling.

Our goal: Build a User Activity Report that combines each user's profile with the posts they wrote and the comments those posts received, using data from the JSONPlaceholder API.


Project Overview

Data Sources

We will use three endpoints from JSONPlaceholder:

  • GET /users - List of users
  • GET /posts - All posts (linked to users via userId)
  • GET /comments - All comments (linked to posts via postId)

Final Output

interface UserActivityReport {
  generatedAt: Date;
  totalUsers: number;
  totalPosts: number;
  totalComments: number;
  users: UserActivity[];
}

interface UserActivity {
  id: number;
  name: string;
  email: string;
  username: string;
  postCount: number;
  commentCount: number;
  posts: PostSummary[];
}

interface PostSummary {
  id: number;
  title: string;
  commentCount: number;
}

Step 1: Define Schemas

First, let us define Zod schemas for the API responses.

import { z } from 'zod';

// User schema (from API)
const ApiUserSchema = z.object({
  id: z.number(),
  name: z.string(),
  username: z.string(),
  email: z.string().email(),
  address: z.object({
    street: z.string(),
    suite: z.string(),
    city: z.string(),
    zipcode: z.string(),
    geo: z.object({
      lat: z.string(),
      lng: z.string(),
    }),
  }),
  phone: z.string(),
  website: z.string(),
  company: z.object({
    name: z.string(),
    catchPhrase: z.string(),
    bs: z.string(),
  }),
});

// Post schema (from API)
const ApiPostSchema = z.object({
  userId: z.number(),
  id: z.number(),
  title: z.string(),
  body: z.string(),
});

// Comment schema (from API)
const ApiCommentSchema = z.object({
  postId: z.number(),
  id: z.number(),
  name: z.string(),
  email: z.string().email(),
  body: z.string(),
});

// Array schemas
const ApiUsersSchema = z.array(ApiUserSchema);
const ApiPostsSchema = z.array(ApiPostSchema);
const ApiCommentsSchema = z.array(ApiCommentSchema);

// Infer types
type ApiUser = z.infer<typeof ApiUserSchema>;
type ApiPost = z.infer<typeof ApiPostSchema>;
type ApiComment = z.infer<typeof ApiCommentSchema>;

Step 2: Create Fetch Utilities

Build reusable fetch functions with error handling.

// Result type for error handling
type Result<T, E = Error> = { success: true; data: T } | { success: false; error: E };

interface FetchError {
  type: 'network' | 'http' | 'validation';
  message: string;
  details?: unknown;
}

function ok<T>(data: T): Result<T, never> {
  return { success: true, data };
}

function err<E>(error: E): Result<never, E> {
  return { success: false, error };
}

// Generic fetch with validation.
// Note: `retries` is the total number of attempts, including the first one.
async function fetchAndValidate<T>(
  url: string,
  schema: z.ZodSchema<T>,
  retries: number = 3
): Promise<Result<T, FetchError>> {
  let lastError: FetchError | undefined;

  for (let attempt = 1; attempt <= retries; attempt++) {
    try {
      const response = await fetch(url);

      if (!response.ok) {
        lastError = {
          type: 'http',
          message: `HTTP ${response.status}: ${response.statusText}`,
        };

        // Retry on server errors
        if (response.status >= 500 && attempt < retries) {
          await sleep(1000 * attempt);
          continue;
        }

        return err(lastError);
      }

      const data = await response.json();
      const parsed = schema.safeParse(data);

      if (!parsed.success) {
        return err({
          type: 'validation',
          message: 'Invalid response data',
          details: parsed.error.issues,
        });
      }

      return ok(parsed.data);
    } catch (error) {
      lastError = {
        type: 'network',
        message: error instanceof Error ? error.message : 'Network error',
      };

      if (attempt < retries) {
        await sleep(1000 * attempt);
        continue;
      }
    }
  }

  return err(lastError || { type: 'network', message: 'Unknown error' });
}

function sleep(ms: number): Promise<void> {
  return new Promise((resolve) => setTimeout(resolve, ms));
}
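
The retry schedule is easier to follow when it runs against a stand-in operation instead of the network. The following is a minimal sketch, not part of the lesson code: `retryWithBackoff`, its `baseDelayMs` parameter, and the `flaky` operation are hypothetical names introduced for illustration. It uses the same linear backoff (`baseDelayMs * attempt`) as `fetchAndValidate`.

```typescript
type Result<T, E> = { success: true; data: T } | { success: false; error: E };

function sleep(ms: number): Promise<void> {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

// Hypothetical helper: runs `operation` up to `attempts` times and waits
// baseDelayMs * attempt after each failure (linear backoff).
async function retryWithBackoff<T>(
  operation: (attempt: number) => Promise<T>,
  attempts: number = 3,
  baseDelayMs: number = 1000
): Promise<Result<T, string>> {
  let lastMessage = 'No attempts were made';

  for (let attempt = 1; attempt <= attempts; attempt++) {
    try {
      return { success: true, data: await operation(attempt) };
    } catch (error) {
      lastMessage = error instanceof Error ? error.message : String(error);
      // Only wait when another attempt will follow.
      if (attempt < attempts) {
        await sleep(baseDelayMs * attempt);
      }
    }
  }

  return { success: false, error: lastMessage };
}

// Stand-in for an unreliable endpoint: fails on attempts 1 and 2, then succeeds.
async function flaky(attempt: number): Promise<string> {
  if (attempt < 3) {
    throw new Error(`attempt ${attempt} failed`);
  }
  return 'payload';
}
```

Calling `retryWithBackoff(flaky, 3, 10)` waits 10 ms and then 20 ms before succeeding on the third attempt. With only two attempts allowed, it returns a failure that carries the last error message.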

Step 3: Fetch All Data

Create a function that fetches all required data in parallel.

const API_BASE = 'https://jsonplaceholder.typicode.com';

interface RawData {
  users: ApiUser[];
  posts: ApiPost[];
  comments: ApiComment[];
}

async function fetchAllData(): Promise<Result<RawData, FetchError>> {
  console.log('Fetching data from API...');

  // Fetch all data in parallel
  const [usersResult, postsResult, commentsResult] = await Promise.all([
    fetchAndValidate(`${API_BASE}/users`, ApiUsersSchema),
    fetchAndValidate(`${API_BASE}/posts`, ApiPostsSchema),
    fetchAndValidate(`${API_BASE}/comments`, ApiCommentsSchema),
  ]);

  // Check for errors
  if (!usersResult.success) {
    return err({ ...usersResult.error, message: `Users: ${usersResult.error.message}` });
  }

  if (!postsResult.success) {
    return err({ ...postsResult.error, message: `Posts: ${postsResult.error.message}` });
  }

  if (!commentsResult.success) {
    return err({ ...commentsResult.error, message: `Comments: ${commentsResult.error.message}` });
  }

  console.log(
    `Fetched: ${usersResult.data.length} users, ${postsResult.data.length} posts, ${commentsResult.data.length} comments`
  );

  return ok({
    users: usersResult.data,
    posts: postsResult.data,
    comments: commentsResult.data,
  });
}
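
The check-each-result pattern above can be generalized to any number of parallel tasks. The sketch below assumes a hypothetical `collectResults` helper and uses `fakeFetch` stand-ins in place of real requests, so it runs without network access.

```typescript
type Result<T, E> = { success: true; data: T } | { success: false; error: E };

// Hypothetical helper: collapses many Results into one. Returns the first
// failure in input order, or all the data when every task succeeded.
function collectResults<T, E>(results: Result<T, E>[]): Result<T[], E> {
  const data: T[] = [];
  for (const result of results) {
    if (!result.success) {
      return result;
    }
    data.push(result.data);
  }
  return { success: true, data };
}

// Stand-in for one endpoint call: resolves after `ms` milliseconds.
function fakeFetch(
  label: string,
  ms: number,
  fail: boolean = false
): Promise<Result<string, string>> {
  const result: Result<string, string> = fail
    ? { success: false, error: `${label}: HTTP 500` }
    : { success: true, data: label };
  return new Promise((resolve) => setTimeout(() => resolve(result), ms));
}

async function fetchAllFake(failPosts: boolean): Promise<Result<string[], string>> {
  // All three tasks start immediately, so the total wait is roughly the
  // slowest task (30 ms), not the sum (60 ms).
  const results = await Promise.all([
    fakeFetch('users', 30),
    fakeFetch('posts', 20, failPosts),
    fakeFetch('comments', 10),
  ]);
  return collectResults(results);
}
```

Because each task resolves to a Result instead of rejecting, `Promise.all` always waits for all of them, and the failure check happens afterwards in a fixed order.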

Step 4: Transform Data

Transform the raw API data into our desired report format.

interface PostSummary {
  id: number;
  title: string;
  commentCount: number;
}

interface UserActivity {
  id: number;
  name: string;
  email: string;
  username: string;
  postCount: number;
  commentCount: number;
  posts: PostSummary[];
}

interface UserActivityReport {
  generatedAt: Date;
  totalUsers: number;
  totalPosts: number;
  totalComments: number;
  users: UserActivity[];
}

function transformData(raw: RawData): UserActivityReport {
  console.log('Transforming data...');

  // Group posts by userId
  const postsByUser = raw.posts.reduce(
    (acc, post) => {
      if (!acc[post.userId]) {
        acc[post.userId] = [];
      }
      acc[post.userId].push(post);
      return acc;
    },
    {} as Record<number, ApiPost[]>
  );

  // Count comments per post
  const commentsByPost = raw.comments.reduce(
    (acc, comment) => {
      acc[comment.postId] = (acc[comment.postId] || 0) + 1;
      return acc;
    },
    {} as Record<number, number>
  );

  // Transform users
  const users: UserActivity[] = raw.users.map((user) => {
    const userPosts = postsByUser[user.id] || [];

    // Create post summaries with comment counts
    const posts: PostSummary[] = userPosts.map((post) => ({
      id: post.id,
      title: post.title,
      commentCount: commentsByPost[post.id] || 0,
    }));

    // Total comments received on this user's posts (not comments the user wrote)
    const commentCount = posts.reduce((sum, post) => sum + post.commentCount, 0);

    return {
      id: user.id,
      name: user.name,
      email: user.email,
      username: user.username,
      postCount: userPosts.length,
      commentCount,
      posts,
    };
  });

  // Sort users by activity (posts + comments)
  users.sort((a, b) => {
    const activityA = a.postCount + a.commentCount;
    const activityB = b.postCount + b.commentCount;
    return activityB - activityA;
  });

  return {
    generatedAt: new Date(),
    totalUsers: raw.users.length,
    totalPosts: raw.posts.length,
    totalComments: raw.comments.length,
    users,
  };
}
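
To make the grouping and counting steps concrete, here is a small worked example on hand-written data. The sample records and the `commentsReceived` helper are hypothetical; only the reduce technique mirrors `transformData`.

```typescript
interface SamplePost {
  userId: number;
  id: number;
  title: string;
}

interface SampleComment {
  postId: number;
  id: number;
}

// Hypothetical sample data, shaped like the API responses but much smaller.
const samplePosts: SamplePost[] = [
  { userId: 1, id: 10, title: 'First' },
  { userId: 1, id: 11, title: 'Second' },
  { userId: 2, id: 12, title: 'Third' },
];

const sampleComments: SampleComment[] = [
  { postId: 10, id: 100 },
  { postId: 10, id: 101 },
  { postId: 12, id: 102 },
];

// Group posts by author: user 1 -> posts 10 and 11, user 2 -> post 12.
const postsByUser = samplePosts.reduce<Record<number, SamplePost[]>>((acc, post) => {
  const list = acc[post.userId] ?? [];
  list.push(post);
  acc[post.userId] = list;
  return acc;
}, {});

// Count comments per post: post 10 -> 2, post 12 -> 1 (post 11 has none).
const commentsByPost = sampleComments.reduce<Record<number, number>>((acc, comment) => {
  acc[comment.postId] = (acc[comment.postId] ?? 0) + 1;
  return acc;
}, {});

// Comments received by a user = sum of the counts over that user's posts.
function commentsReceived(userId: number): number {
  return (postsByUser[userId] ?? []).reduce(
    (sum, post) => sum + (commentsByPost[post.id] ?? 0),
    0
  );
}

console.log(commentsReceived(1)); // 2
console.log(commentsReceived(2)); // 1
console.log(commentsReceived(3)); // 0 (unknown user, no posts)
```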

Step 5: Add Filtering and Statistics

Add utility functions to analyze the report.

interface ReportStatistics {
  averagePostsPerUser: number;
  averageCommentsPerPost: number;
  mostActiveUser: UserActivity | null;
  leastActiveUser: UserActivity | null;
  usersWithNoPosts: number;
  topPosters: UserActivity[];
}

function calculateStatistics(report: UserActivityReport): ReportStatistics {
  const { users, totalPosts, totalComments, totalUsers } = report;

  const averagePostsPerUser = totalUsers > 0 ? totalPosts / totalUsers : 0;
  const averageCommentsPerPost = totalPosts > 0 ? totalComments / totalPosts : 0;

  // Users are already sorted by activity
  const mostActiveUser = users.length > 0 ? users[0] : null;
  const leastActiveUser = users.length > 0 ? users[users.length - 1] : null;

  const usersWithNoPosts = users.filter((u) => u.postCount === 0).length;
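  // The list is sorted by combined activity (posts + comments),
  // so these are the five most active users overall
  const topPosters = users.slice(0, 5);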

  return {
    averagePostsPerUser,
    averageCommentsPerPost,
    mostActiveUser,
    leastActiveUser,
    usersWithNoPosts,
    topPosters,
  };
}

function filterUsersByActivity(
  report: UserActivityReport,
  minPosts: number = 0,
  minComments: number = 0
): UserActivity[] {
  return report.users.filter(
    (user) => user.postCount >= minPosts && user.commentCount >= minComments
  );
}

function searchUsers(report: UserActivityReport, query: string): UserActivity[] {
  const lowerQuery = query.toLowerCase();

  return report.users.filter(
    (user) =>
      user.name.toLowerCase().includes(lowerQuery) ||
      user.email.toLowerCase().includes(lowerQuery) ||
      user.username.toLowerCase().includes(lowerQuery)
  );
}
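
The ordering, filtering, and search rules can be checked by hand on a few records. This is a sketch with hypothetical sample users and shortened helper names (`atLeast`, `search`); the predicates follow the same rules as the functions above.

```typescript
interface SampleUser {
  name: string;
  username: string;
  email: string;
  postCount: number;
  commentCount: number;
}

// Hypothetical sample data.
const sample: SampleUser[] = [
  { name: 'Ada Park', username: 'ada', email: 'ada@example.com', postCount: 2, commentCount: 9 },
  { name: 'Ben Ode', username: 'benny', email: 'ben@example.com', postCount: 6, commentCount: 1 },
  { name: 'Cy Adams', username: 'cy', email: 'cy@example.com', postCount: 0, commentCount: 0 },
];

// Same ordering rule as the report: posts + comments, most active first.
const byActivity = [...sample].sort(
  (a, b) => b.postCount + b.commentCount - (a.postCount + a.commentCount)
);

// Same predicate as filterUsersByActivity: both thresholds must hold.
function atLeast(minPosts: number, minComments: number): SampleUser[] {
  return sample.filter((u) => u.postCount >= minPosts && u.commentCount >= minComments);
}

// Same matching rule as searchUsers: case-insensitive substring on three fields.
function search(query: string): SampleUser[] {
  const q = query.toLowerCase();
  return sample.filter((u) =>
    [u.name, u.username, u.email].some((field) => field.toLowerCase().includes(q))
  );
}

console.log(byActivity.map((u) => u.name)); // Ada Park (11), Ben Ode (7), Cy Adams (0)
console.log(atLeast(5, 0).map((u) => u.name)); // only Ben Ode has 5+ posts
console.log(search('AD').map((u) => u.name)); // Ada Park and Cy Adams
```

Note that Ada Park ranks first with only two posts, because the sort key is combined activity rather than post count alone.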

Step 6: Build the Pipeline

Combine everything into a single pipeline function.

interface PipelineOptions {
  minPosts?: number;
  minComments?: number;
  searchQuery?: string;
  includeStatistics?: boolean;
}

interface PipelineResult {
  report: UserActivityReport;
  filteredUsers: UserActivity[];
  statistics?: ReportStatistics;
}

async function runPipeline(
  options: PipelineOptions = {}
): Promise<Result<PipelineResult, FetchError>> {
  const { minPosts = 0, minComments = 0, searchQuery, includeStatistics = true } = options;

  // Step 1: Fetch data
  const dataResult = await fetchAllData();

  if (!dataResult.success) {
    return dataResult;
  }

  // Step 2: Transform
  const report = transformData(dataResult.data);

  // Step 3: Filter
  let filteredUsers = filterUsersByActivity(report, minPosts, minComments);

  if (searchQuery) {
    // Lower-case the query once instead of once per field per user
    const lowerQuery = searchQuery.toLowerCase();
    filteredUsers = filteredUsers.filter(
      (user) =>
        user.name.toLowerCase().includes(lowerQuery) ||
        user.email.toLowerCase().includes(lowerQuery) ||
        user.username.toLowerCase().includes(lowerQuery)
    );
  }

  // Step 4: Statistics (optional)
  const statistics = includeStatistics ? calculateStatistics(report) : undefined;

  return ok({
    report,
    filteredUsers,
    statistics,
  });
}
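
One way to make a pipeline like this easier to test is to pass the data source in as a parameter. The sketch below is a simplified, hypothetical variant (`runWith`, `Raw`, `Summary`, `fixedData`, and `offline` are illustration-only names) that runs the same fetch, transform, and filter stages against fixed data instead of the real API.

```typescript
type Result<T, E> = { success: true; data: T } | { success: false; error: E };

interface Raw {
  users: { id: number; name: string }[];
  posts: { userId: number }[];
}

interface Summary {
  name: string;
  postCount: number;
}

// Hypothetical variant of runPipeline: the loader is injected, so the stages
// can run against the real API or against fixed test data.
async function runWith(
  load: () => Promise<Result<Raw, string>>,
  minPosts: number = 0
): Promise<Result<Summary[], string>> {
  // Stage 1: load
  const loaded = await load();
  if (!loaded.success) {
    return loaded; // failures pass straight through
  }

  // Stage 2: transform
  const { users, posts } = loaded.data;
  const summaries: Summary[] = users.map((user) => ({
    name: user.name,
    postCount: posts.filter((post) => post.userId === user.id).length,
  }));

  // Stage 3: filter
  return { success: true, data: summaries.filter((s) => s.postCount >= minPosts) };
}

// Offline stand-ins for fetchAllData.
async function fixedData(): Promise<Result<Raw, string>> {
  return {
    success: true,
    data: {
      users: [
        { id: 1, name: 'Ada' },
        { id: 2, name: 'Ben' },
      ],
      posts: [{ userId: 1 }, { userId: 1 }, { userId: 2 }],
    },
  };
}

async function offline(): Promise<Result<Raw, string>> {
  return { success: false, error: 'network unreachable' };
}
```

With these stand-ins, `runWith(fixedData, 2)` yields only Ada (two posts), and `runWith(offline)` returns the loader's error unchanged.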

Step 7: Display Results

Create functions to format and display the results.

function formatReport(result: PipelineResult): string {
  const { report, filteredUsers, statistics } = result;

  const lines: string[] = [
    '='.repeat(60),
    'USER ACTIVITY REPORT',
    '='.repeat(60),
    '',
    `Generated: ${report.generatedAt.toISOString()}`,
    '',
    'SUMMARY',
    '-'.repeat(40),
    `Total Users: ${report.totalUsers}`,
    `Total Posts: ${report.totalPosts}`,
    `Total Comments: ${report.totalComments}`,
    '',
  ];

  if (statistics) {
    lines.push(
      'STATISTICS',
      '-'.repeat(40),
      `Average Posts per User: ${statistics.averagePostsPerUser.toFixed(1)}`,
      `Average Comments per Post: ${statistics.averageCommentsPerPost.toFixed(1)}`,
      `Users with No Posts: ${statistics.usersWithNoPosts}`,
      ''
    );

    if (statistics.mostActiveUser) {
      lines.push(
        `Most Active User: ${statistics.mostActiveUser.name}`,
        `  Posts: ${statistics.mostActiveUser.postCount}, Comments: ${statistics.mostActiveUser.commentCount}`,
        ''
      );
    }

    lines.push('TOP 5 POSTERS', '-'.repeat(40));

    statistics.topPosters.forEach((user, index) => {
      lines.push(
        `${index + 1}. ${user.name} (@${user.username})`,
        `   Posts: ${user.postCount}, Comments received: ${user.commentCount}`
      );
    });

    lines.push('');
  }

  lines.push(
    'FILTERED USERS',
    '-'.repeat(40),
    `Showing ${filteredUsers.length} of ${report.totalUsers} users`,
    ''
  );

  filteredUsers.forEach((user) => {
    lines.push(
      `${user.name} (@${user.username})`,
      `  Email: ${user.email}`,
      `  Posts: ${user.postCount}, Comments received: ${user.commentCount}`
    );

    if (user.posts.length > 0) {
      lines.push('  Recent posts:');
      user.posts.slice(0, 3).forEach((post) => {
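        // Add the ellipsis only when the title was actually shortened
        const title =
          post.title.length > 50 ? `${post.title.substring(0, 50)}...` : post.title;
        lines.push(`    - ${title} (${post.commentCount} comments)`);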
      });
    }

    lines.push('');
  });

  lines.push('='.repeat(60));

  return lines.join('\n');
}
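
The title preview used in the formatter can also be factored into a small helper. A minimal sketch, assuming a hypothetical `truncate` function that is not part of the lesson code:

```typescript
// Hypothetical helper: keeps the first `max` characters of `text` and appends
// "..." only when something was actually cut off.
function truncate(text: string, max: number): string {
  return text.length > max ? `${text.slice(0, max)}...` : text;
}

console.log(truncate('short title', 50)); // short title
console.log(truncate('abcdefghij', 4)); // abcd...
```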

Step 8: Main Function

Put it all together.

async function main(): Promise<void> {
  console.log('Starting User Activity Report Pipeline...\n');

  const result = await runPipeline({
    minPosts: 5, // Only users with 5+ posts
    includeStatistics: true,
  });

  if (!result.success) {
    console.error('Pipeline failed!');
    console.error(`Error type: ${result.error.type}`);
    console.error(`Message: ${result.error.message}`);

    if (result.error.details) {
      console.error('Details:', JSON.stringify(result.error.details, null, 2));
    }

    process.exit(1);
  }

  const output = formatReport(result.data);
  console.log(output);

  // Export as JSON
  const jsonOutput = JSON.stringify(
    {
      generatedAt: result.data.report.generatedAt,
      statistics: result.data.statistics,
      users: result.data.filteredUsers,
    },
    null,
    2
  );

  console.log('\nJSON Output (first 500 chars):');
  console.log(jsonOutput.substring(0, 500) + '...');
}

main().catch(console.error);
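
One detail of the JSON export is worth spelling out: `JSON.stringify` writes a `Date` as an ISO 8601 string, and `JSON.parse` does not turn it back into a `Date`. The sketch below shows the round trip on a small stand-in object and one possible way to restore the date with a reviver function. The fixed timestamp is an arbitrary example value.

```typescript
// A small stand-in for the exported report.
const exported = JSON.stringify({
  generatedAt: new Date('2024-01-15T10:30:00.000Z'),
  totalUsers: 10,
});
console.log(exported); // {"generatedAt":"2024-01-15T10:30:00.000Z","totalUsers":10}

// Parsing without a reviver leaves the timestamp as a plain string.
const plain = JSON.parse(exported) as { generatedAt: unknown };
console.log(typeof plain.generatedAt); // string

// A reviver can restore the Date for the one key known to hold a timestamp.
const revived = JSON.parse(exported, (key, value) =>
  key === 'generatedAt' && typeof value === 'string' ? new Date(value) : value
) as { generatedAt: Date; totalUsers: number };
console.log(revived.generatedAt instanceof Date); // true
```

The type assertions here are unchecked. For data that crosses a trust boundary, validating the parsed value with a schema is the safer route.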

Complete Code

Here is the complete pipeline code in one file:

import { z } from 'zod';

// ============================================
// SCHEMAS
// ============================================

const ApiUserSchema = z.object({
  id: z.number(),
  name: z.string(),
  username: z.string(),
  email: z.string().email(),
  address: z.object({
    street: z.string(),
    suite: z.string(),
    city: z.string(),
    zipcode: z.string(),
    geo: z.object({
      lat: z.string(),
      lng: z.string(),
    }),
  }),
  phone: z.string(),
  website: z.string(),
  company: z.object({
    name: z.string(),
    catchPhrase: z.string(),
    bs: z.string(),
  }),
});

const ApiPostSchema = z.object({
  userId: z.number(),
  id: z.number(),
  title: z.string(),
  body: z.string(),
});

const ApiCommentSchema = z.object({
  postId: z.number(),
  id: z.number(),
  name: z.string(),
  email: z.string().email(),
  body: z.string(),
});

const ApiUsersSchema = z.array(ApiUserSchema);
const ApiPostsSchema = z.array(ApiPostSchema);
const ApiCommentsSchema = z.array(ApiCommentSchema);

type ApiUser = z.infer<typeof ApiUserSchema>;
type ApiPost = z.infer<typeof ApiPostSchema>;
type ApiComment = z.infer<typeof ApiCommentSchema>;

// ============================================
// TYPES
// ============================================

type Result<T, E = Error> = { success: true; data: T } | { success: false; error: E };

interface FetchError {
  type: 'network' | 'http' | 'validation';
  message: string;
  details?: unknown;
}

interface RawData {
  users: ApiUser[];
  posts: ApiPost[];
  comments: ApiComment[];
}

interface PostSummary {
  id: number;
  title: string;
  commentCount: number;
}

interface UserActivity {
  id: number;
  name: string;
  email: string;
  username: string;
  postCount: number;
  commentCount: number;
  posts: PostSummary[];
}

interface UserActivityReport {
  generatedAt: Date;
  totalUsers: number;
  totalPosts: number;
  totalComments: number;
  users: UserActivity[];
}

interface ReportStatistics {
  averagePostsPerUser: number;
  averageCommentsPerPost: number;
  mostActiveUser: UserActivity | null;
  leastActiveUser: UserActivity | null;
  usersWithNoPosts: number;
  topPosters: UserActivity[];
}

interface PipelineOptions {
  minPosts?: number;
  minComments?: number;
  searchQuery?: string;
  includeStatistics?: boolean;
}

interface PipelineResult {
  report: UserActivityReport;
  filteredUsers: UserActivity[];
  statistics?: ReportStatistics;
}

// ============================================
// UTILITIES
// ============================================

function ok<T>(data: T): Result<T, never> {
  return { success: true, data };
}

function err<E>(error: E): Result<never, E> {
  return { success: false, error };
}

function sleep(ms: number): Promise<void> {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

// ============================================
// FETCH FUNCTIONS
// ============================================

const API_BASE = 'https://jsonplaceholder.typicode.com';

async function fetchAndValidate<T>(
  url: string,
  schema: z.ZodSchema<T>,
  retries: number = 3
): Promise<Result<T, FetchError>> {
  let lastError: FetchError | undefined;

  for (let attempt = 1; attempt <= retries; attempt++) {
    try {
      const response = await fetch(url);

      if (!response.ok) {
        lastError = {
          type: 'http',
          message: `HTTP ${response.status}: ${response.statusText}`,
        };

        if (response.status >= 500 && attempt < retries) {
          await sleep(1000 * attempt);
          continue;
        }

        return err(lastError);
      }

      const data = await response.json();
      const parsed = schema.safeParse(data);

      if (!parsed.success) {
        return err({
          type: 'validation',
          message: 'Invalid response data',
          details: parsed.error.issues,
        });
      }

      return ok(parsed.data);
    } catch (error) {
      lastError = {
        type: 'network',
        message: error instanceof Error ? error.message : 'Network error',
      };

      if (attempt < retries) {
        await sleep(1000 * attempt);
        continue;
      }
    }
  }

  return err(lastError || { type: 'network', message: 'Unknown error' });
}

async function fetchAllData(): Promise<Result<RawData, FetchError>> {
  console.log('Fetching data from API...');

  const [usersResult, postsResult, commentsResult] = await Promise.all([
    fetchAndValidate(`${API_BASE}/users`, ApiUsersSchema),
    fetchAndValidate(`${API_BASE}/posts`, ApiPostsSchema),
    fetchAndValidate(`${API_BASE}/comments`, ApiCommentsSchema),
  ]);

  if (!usersResult.success) {
    return err({ ...usersResult.error, message: `Users: ${usersResult.error.message}` });
  }

  if (!postsResult.success) {
    return err({ ...postsResult.error, message: `Posts: ${postsResult.error.message}` });
  }

  if (!commentsResult.success) {
    return err({ ...commentsResult.error, message: `Comments: ${commentsResult.error.message}` });
  }

  console.log(
    `Fetched: ${usersResult.data.length} users, ${postsResult.data.length} posts, ${commentsResult.data.length} comments`
  );

  return ok({
    users: usersResult.data,
    posts: postsResult.data,
    comments: commentsResult.data,
  });
}

// ============================================
// TRANSFORMATION
// ============================================

function transformData(raw: RawData): UserActivityReport {
  console.log('Transforming data...');

  const postsByUser = raw.posts.reduce(
    (acc, post) => {
      if (!acc[post.userId]) {
        acc[post.userId] = [];
      }
      acc[post.userId].push(post);
      return acc;
    },
    {} as Record<number, ApiPost[]>
  );

  const commentsByPost = raw.comments.reduce(
    (acc, comment) => {
      acc[comment.postId] = (acc[comment.postId] || 0) + 1;
      return acc;
    },
    {} as Record<number, number>
  );

  const users: UserActivity[] = raw.users.map((user) => {
    const userPosts = postsByUser[user.id] || [];

    const posts: PostSummary[] = userPosts.map((post) => ({
      id: post.id,
      title: post.title,
      commentCount: commentsByPost[post.id] || 0,
    }));

    const commentCount = posts.reduce((sum, post) => sum + post.commentCount, 0);

    return {
      id: user.id,
      name: user.name,
      email: user.email,
      username: user.username,
      postCount: userPosts.length,
      commentCount,
      posts,
    };
  });

  users.sort((a, b) => {
    const activityA = a.postCount + a.commentCount;
    const activityB = b.postCount + b.commentCount;
    return activityB - activityA;
  });

  return {
    generatedAt: new Date(),
    totalUsers: raw.users.length,
    totalPosts: raw.posts.length,
    totalComments: raw.comments.length,
    users,
  };
}

// ============================================
// ANALYSIS
// ============================================

function calculateStatistics(report: UserActivityReport): ReportStatistics {
  const { users, totalPosts, totalComments, totalUsers } = report;

  return {
    averagePostsPerUser: totalUsers > 0 ? totalPosts / totalUsers : 0,
    averageCommentsPerPost: totalPosts > 0 ? totalComments / totalPosts : 0,
    mostActiveUser: users.length > 0 ? users[0] : null,
    leastActiveUser: users.length > 0 ? users[users.length - 1] : null,
    usersWithNoPosts: users.filter((u) => u.postCount === 0).length,
    topPosters: users.slice(0, 5),
  };
}

function filterUsersByActivity(
  report: UserActivityReport,
  minPosts: number = 0,
  minComments: number = 0
): UserActivity[] {
  return report.users.filter(
    (user) => user.postCount >= minPosts && user.commentCount >= minComments
  );
}

// ============================================
// PIPELINE
// ============================================

async function runPipeline(
  options: PipelineOptions = {}
): Promise<Result<PipelineResult, FetchError>> {
  const { minPosts = 0, minComments = 0, searchQuery, includeStatistics = true } = options;

  const dataResult = await fetchAllData();

  if (!dataResult.success) {
    return dataResult;
  }

  const report = transformData(dataResult.data);

  let filteredUsers = filterUsersByActivity(report, minPosts, minComments);

  if (searchQuery) {
    const lowerQuery = searchQuery.toLowerCase();
    filteredUsers = filteredUsers.filter(
      (user) =>
        user.name.toLowerCase().includes(lowerQuery) ||
        user.email.toLowerCase().includes(lowerQuery) ||
        user.username.toLowerCase().includes(lowerQuery)
    );
  }

  const statistics = includeStatistics ? calculateStatistics(report) : undefined;

  return ok({
    report,
    filteredUsers,
    statistics,
  });
}

// ============================================
// MAIN
// ============================================

async function main(): Promise<void> {
  console.log('Starting User Activity Report Pipeline...\n');

  const result = await runPipeline({
    minPosts: 5,
    includeStatistics: true,
  });

  if (!result.success) {
    console.error('Pipeline failed!');
    console.error(`Error type: ${result.error.type}`);
    console.error(`Message: ${result.error.message}`);
    process.exit(1);
  }

  const { report, filteredUsers, statistics } = result.data;

  console.log('='.repeat(60));
  console.log('USER ACTIVITY REPORT');
  console.log('='.repeat(60));
  console.log(`\nGenerated: ${report.generatedAt.toISOString()}`);
  console.log(`\nTotal Users: ${report.totalUsers}`);
  console.log(`Total Posts: ${report.totalPosts}`);
  console.log(`Total Comments: ${report.totalComments}`);

  if (statistics) {
    console.log(`\nAverage Posts per User: ${statistics.averagePostsPerUser.toFixed(1)}`);
    console.log(`Average Comments per Post: ${statistics.averageCommentsPerPost.toFixed(1)}`);

    console.log('\nTop 5 Posters:');
    statistics.topPosters.forEach((user, i) => {
      console.log(
        `  ${i + 1}. ${user.name} - ${user.postCount} posts, ${user.commentCount} comments`
      );
    });
  }

  console.log(`\nFiltered Users (${filteredUsers.length}):`);
  filteredUsers.forEach((user) => {
    console.log(`  - ${user.name} (@${user.username}): ${user.postCount} posts`);
  });

  console.log('\n' + '='.repeat(60));
}

main().catch(console.error);

Exercises

Exercise 1: Add Caching

Modify the pipeline to cache API responses to avoid repeated fetches:

// Implement a simple cache
interface Cache<T> {
  get(key: string): T | undefined;
  set(key: string, value: T, ttlMs: number): void;
}

// Modify fetchAndValidate to use the cache

Solution

interface CacheEntry<T> {
  value: T;
  expiresAt: number;
}

class SimpleCache<T> implements Cache<T> {
  private cache = new Map<string, CacheEntry<T>>();

  get(key: string): T | undefined {
    const entry = this.cache.get(key);

    if (!entry) {
      return undefined;
    }

    if (Date.now() > entry.expiresAt) {
      this.cache.delete(key);
      return undefined;
    }

    return entry.value;
  }

  set(key: string, value: T, ttlMs: number): void {
    this.cache.set(key, {
      value,
      expiresAt: Date.now() + ttlMs,
    });
  }

  clear(): void {
    this.cache.clear();
  }
}

const cache = new SimpleCache<unknown>();
const CACHE_TTL = 5 * 60 * 1000; // 5 minutes

async function fetchAndValidateWithCache<T>(
  url: string,
  schema: z.ZodSchema<T>,
  retries: number = 3
): Promise<Result<T, FetchError>> {
  // Check cache first
  const cached = cache.get(url) as T | undefined;
  if (cached !== undefined) {
    console.log(`Cache hit: ${url}`);
    return ok(cached);
  }

  console.log(`Cache miss: ${url}`);

  const result = await fetchAndValidate(url, schema, retries);

  if (result.success) {
    cache.set(url, result.data, CACHE_TTL);
  }

  return result;
}
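
Expiry behaviour is hard to observe with a five-minute TTL. The sketch below is a hypothetical variant of the cache (`TtlCache`, with an injectable clock) that makes the expiry rule visible without waiting for real time to pass.

```typescript
interface CacheSlot<T> {
  value: T;
  expiresAt: number;
}

// Hypothetical variant of SimpleCache: the clock is passed in, so tests and
// demos can control the current time.
class TtlCache<T> {
  private entries = new Map<string, CacheSlot<T>>();
  private now: () => number;

  constructor(now: () => number = () => Date.now()) {
    this.now = now;
  }

  get(key: string): T | undefined {
    const entry = this.entries.get(key);
    if (!entry) {
      return undefined;
    }
    // Same rule as SimpleCache: expired only once the clock is past expiresAt.
    if (this.now() > entry.expiresAt) {
      this.entries.delete(key);
      return undefined;
    }
    return entry.value;
  }

  set(key: string, value: T, ttlMs: number): void {
    this.entries.set(key, { value, expiresAt: this.now() + ttlMs });
  }
}

// A fake clock that the example advances by hand.
let fakeTime = 0;
const demoCache = new TtlCache<string>(() => fakeTime);

demoCache.set('/users', 'cached users', 1000); // expires at t = 1000

fakeTime = 1000;
const atExpiry = demoCache.get('/users'); // still 'cached users': 1000 > 1000 is false

fakeTime = 1001;
const afterExpiry = demoCache.get('/users'); // undefined: the entry has expired
```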

Exercise 2: Add Progress Reporting

Add progress callbacks to the pipeline:

interface ProgressCallback {
  (stage: string, progress: number, total: number): void;
}

async function runPipelineWithProgress(
  options: PipelineOptions,
  onProgress: ProgressCallback
): Promise<Result<PipelineResult, FetchError>> {
  // Report progress at each stage
}

Solution

interface ProgressCallback {
  (stage: string, progress: number, total: number): void;
}

async function runPipelineWithProgress(
  options: PipelineOptions = {},
  onProgress?: ProgressCallback
): Promise<Result<PipelineResult, FetchError>> {
  const report = (stage: string, progress: number, total: number) => {
    if (onProgress) {
      onProgress(stage, progress, total);
    }
  };

  // Stage 1: Fetch each endpoint in turn so progress can be reported after every request
  report('Fetching', 0, 3);
  const usersResult = await fetchAndValidate(`${API_BASE}/users`, ApiUsersSchema);
  if (!usersResult.success) return err(usersResult.error);

  report('Fetching', 1, 3);
  const postsResult = await fetchAndValidate(`${API_BASE}/posts`, ApiPostsSchema);
  if (!postsResult.success) return err(postsResult.error);

  report('Fetching', 2, 3);
  const commentsResult = await fetchAndValidate(`${API_BASE}/comments`, ApiCommentsSchema);
  if (!commentsResult.success) return err(commentsResult.error);

  report('Fetching', 3, 3);

  // Stage 2: Transform
  report('Transforming', 0, 1);
  const rawData = {
    users: usersResult.data,
    posts: postsResult.data,
    comments: commentsResult.data,
  };
  const reportData = transformData(rawData);
  report('Transforming', 1, 1);

  // Stage 3: Filter
  report('Filtering', 0, 1);
  let filteredUsers = filterUsersByActivity(
    reportData,
    options.minPosts || 0,
    options.minComments || 0
  );

  if (options.searchQuery) {
    const lowerQuery = options.searchQuery.toLowerCase();
    filteredUsers = filteredUsers.filter(
      (user) =>
        user.name.toLowerCase().includes(lowerQuery) ||
        user.email.toLowerCase().includes(lowerQuery) ||
        user.username.toLowerCase().includes(lowerQuery)
    );
  }
  report('Filtering', 1, 1);

  // Stage 4: Statistics
  report('Calculating', 0, 1);
  const statistics =
    options.includeStatistics !== false ? calculateStatistics(reportData) : undefined;
  report('Calculating', 1, 1);

  return ok({
    report: reportData,
    filteredUsers,
    statistics,
  });
}

// Usage
const result = await runPipelineWithProgress({ minPosts: 5 }, (stage, progress, total) => {
  const percent = Math.round((progress / total) * 100);
  console.log(`[${stage}] ${progress}/${total} (${percent}%)`);
});
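
The solution above fetches the three endpoints one after another so that it can report progress between requests. If parallel fetching matters more, progress can be reported as each task finishes instead. This is a sketch under that assumption: `allWithProgress`, `ProgressFn`, and the `wait` stand-ins are hypothetical names, not part of the lesson code.

```typescript
type ProgressFn = (stage: string, done: number, total: number) => void;

// Hypothetical helper: starts every task at once and reports after each one
// resolves, so progress reporting does not force sequential fetching.
async function allWithProgress<T>(
  stage: string,
  tasks: Array<() => Promise<T>>,
  onProgress: ProgressFn
): Promise<T[]> {
  let done = 0;
  onProgress(stage, done, tasks.length);

  return Promise.all(
    tasks.map(async (task) => {
      const value = await task();
      done += 1;
      onProgress(stage, done, tasks.length);
      return value;
    })
  );
}

// Stand-in for an endpoint call that resolves with `label` after `ms` milliseconds.
function wait(label: string, ms: number): () => Promise<string> {
  return () => new Promise((resolve) => setTimeout(() => resolve(label), ms));
}

async function demo(): Promise<{ values: string[]; events: string[] }> {
  const events: string[] = [];
  const values = await allWithProgress(
    'Fetching',
    [wait('users', 30), wait('posts', 10), wait('comments', 20)],
    (stage, done, total) => {
      events.push(`${stage} ${done}/${total}`);
    }
  );
  return { values, events };
}
```

The returned values keep the input order (users, posts, comments) even though the tasks finish in a different order, and the callback sees the counts 0/3 through 3/3.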

Exercise 3: Export to Different Formats

Add functions to export the report in different formats:

function exportToCSV(report: UserActivityReport): string;
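function exportToMarkdown(report: UserActivityReport): string;

Solution

// Quote a field only when it contains a comma, quote, or line break;
// embedded quotes are doubled so CSV readers can parse them.
function csvField(value: string | number): string {
  const text = String(value);
  return /[",\r\n]/.test(text) ? `"${text.replace(/"/g, '""')}"` : text;
}

function exportToCSV(report: UserActivityReport): string {
  const headers = ['ID', 'Name', 'Username', 'Email', 'Posts', 'Comments'];
  const rows = report.users.map((user) =>
    [user.id, user.name, user.username, user.email, user.postCount, user.commentCount]
      .map(csvField)
      .join(',')
  );

  return [headers.join(','), ...rows].join('\n');
}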

function exportToMarkdown(report: UserActivityReport): string {
  const lines: string[] = [
    '# User Activity Report',
    '',
    `Generated: ${report.generatedAt.toISOString()}`,
    '',
    '## Summary',
    '',
    `- **Total Users**: ${report.totalUsers}`,
    `- **Total Posts**: ${report.totalPosts}`,
    `- **Total Comments**: ${report.totalComments}`,
    '',
    '## Users',
    '',
    '| Name | Username | Email | Posts | Comments |',
    '|------|----------|-------|-------|----------|',
  ];

  report.users.forEach((user) => {
    lines.push(
      `| ${user.name} | @${user.username} | ${user.email} | ${user.postCount} | ${user.commentCount} |`
    );
  });

  return lines.join('\n');
}

// Usage (`report` is the UserActivityReport from a successful run, e.g. result.data.report)
const csvOutput = exportToCSV(report);
const mdOutput = exportToMarkdown(report);
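
Markdown tables have an escaping concern of their own: a `|` inside a value starts a new column, and a line break ends the row. A minimal sketch of one way to guard against that, using hypothetical `escapeCell` and `toRow` helpers:

```typescript
// Hypothetical helper: makes a value safe to place inside one table cell by
// escaping pipes and flattening line breaks to spaces.
function escapeCell(value: string): string {
  return value.replace(/\|/g, '\\|').replace(/\r?\n/g, ' ');
}

// Builds one table row from already-stringified cell values.
function toRow(cells: string[]): string {
  return `| ${cells.map(escapeCell).join(' | ')} |`;
}

console.log(toRow(['Ada | Park', '@ada', '3']));
// | Ada \| Park | @ada | 3 |
```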

Key Takeaways

  1. Structure your pipeline in clear stages: fetch, validate, transform, analyze
  2. Use schemas to validate all external data at boundaries
  3. Return Results instead of throwing to make error handling explicit
  4. Fetch in parallel when data sources are independent
  5. Transform data to match your application's needs, not the API's structure
  6. Separate concerns: fetching, validation, transformation, and display
  7. Make pipelines configurable with options objects
  8. Add observability with progress reporting and logging
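
As a compact illustration of takeaway 3, the sketch below shows how a Result forces the caller to check for failure before touching the data. `parsePort` and `describePort` are hypothetical functions used only for this example.

```typescript
type Result<T, E = Error> = { success: true; data: T } | { success: false; error: E };

// Hypothetical parser: returns a Result instead of throwing on bad input.
function parsePort(input: string): Result<number, string> {
  const port = Number(input);
  if (!Number.isInteger(port) || port < 1 || port > 65535) {
    return { success: false, error: `Invalid port: "${input}"` };
  }
  return { success: true, data: port };
}

function describePort(input: string): string {
  const result = parsePort(input);
  // The compiler only allows `data` or `error` after `success` has been checked.
  return result.success ? `Listening on ${result.data}` : result.error;
}

console.log(describePort('8080')); // Listening on 8080
console.log(describePort('abc')); // Invalid port: "abc"
```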

Resources

Resource              Type           Level
JSONPlaceholder       API            Beginner
Zod Documentation     Documentation  Intermediate
TypeScript Handbook   Documentation  Beginner
Node.js Streams       Documentation  Advanced

Module Complete

Congratulations! You have completed Module 5: Working with Data. You now know how to:

  • Parse and generate JSON safely
  • Transform data using map, filter, and reduce
  • Validate external data with Zod schemas
  • Handle errors gracefully with custom errors and the Result pattern
  • Build complete data processing pipelines

Continue your learning journey with the next module:

Module 6: Authentication