Claude Skills Guide

Claude Code MongoDB Aggregation Pipeline Workflow Guide

MongoDB’s aggregation framework is one of the most powerful tools for data analysis and transformation in the NoSQL world. When combined with Claude Code’s capabilities, you can build sophisticated data pipelines that handle complex transformations, analytics, and real-time processing. This guide walks you through practical workflows for working with MongoDB aggregation pipelines using Claude Code.

Understanding Aggregation Pipelines

Aggregation pipelines process documents through a series of stages, where each stage transforms the data and passes the results to the next. This approach is similar to a production line where each worker performs a specific task before passing the work to the next person.

Basic Pipeline Structure

An aggregation pipeline is an array of stage documents, and each stage is named by an operator that begins with $:

db.collection.aggregate([
  { $match: { status: "active" } },      // Filter documents
  { $group: {                             // Group by field
      _id: "$category",
      total: { $sum: "$amount" }
    }
  },
  { $sort: { total: -1 } }                // Sort results
]);

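Each stage in the pipeline performs one operation. $match keeps only documents whose status is "active", $group collects the remaining documents by category and adds their amount values into total, and $sort orders the resulting groups by total, highest first.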
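To make the stage semantics concrete, the following plain JavaScript sketch reproduces what the basic pipeline computes over an in-memory array. It is an illustration only, not how the server executes a pipeline; the function name simulateBasicPipeline and the sample documents are hypothetical.

```javascript
// Illustration only: mimics $match -> $group -> $sort from the basic example
// on an in-memory array (simulateBasicPipeline is a hypothetical name).
function simulateBasicPipeline(docs) {
  // $match: { status: "active" }
  const matched = docs.filter((doc) => doc.status === "active");

  // $group: { _id: "$category", total: { $sum: "$amount" } }
  const totals = new Map();
  for (const doc of matched) {
    const key = doc.category ?? null; // a missing group field groups under null
    // $sum ignores non-numeric values, so treat them as 0 here
    const amount = typeof doc.amount === "number" ? doc.amount : 0;
    totals.set(key, (totals.get(key) ?? 0) + amount);
  }
  const grouped = [...totals].map(([category, total]) => ({ _id: category, total }));

  // $sort: { total: -1 }
  return grouped.sort((a, b) => b.total - a.total);
}

// Hypothetical sample documents
const sample = [
  { status: "active", category: "books", amount: 20 },
  { status: "active", category: "games", amount: 50 },
  { status: "inactive", category: "books", amount: 999 },
  { status: "active", category: "books", amount: 15 }
];

console.log(simulateBasicPipeline(sample));
// -> games with total 50, then books with total 35
```

The inactive document never reaches the grouping step, which is why filtering with $match first keeps every later stage cheaper.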

Setting Up Your MongoDB Connection

Before building aggregation workflows, establish a connection to your MongoDB instance. The examples in this guide use the official MongoDB Node.js driver:

// Using the official MongoDB driver (ES module syntax)
import { MongoClient } from "mongodb";

// Connection configuration
const mongoConfig = {
  uri: process.env.MONGODB_URI || "mongodb://localhost:27017",
  database: "analytics",
  options: {
    maxPoolSize: 10,
    serverSelectionTimeoutMS: 5000,
    socketTimeoutMS: 45000
  }
};

async function connectToMongoDB() {
  const client = new MongoClient(mongoConfig.uri, mongoConfig.options);
  await client.connect();
  console.log("Connected to MongoDB");
  // Return the client as well, so the caller can call client.close() on shutdown
  return { client, db: client.db(mongoConfig.database) };
}

// Usage: const { client, db } = await connectToMongoDB();

Building Complex Aggregation Workflows

Financial Analytics Pipeline

The following pipeline summarizes completed transactions in a date range by year, quarter, and category, then ranks the groups by revenue:

async function runFinancialAnalytics(db, startDate, endDate) {
  const pipeline = [
    // Stage 1: Date range filter
    {
      $match: {
        transactionDate: {
          $gte: new Date(startDate),
          $lte: new Date(endDate)
        },
        status: "completed"
      }
    },
    
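    // Stage 2: Add computed date fields.
    // MongoDB has no $quarter operator, so derive the quarter as ceil(month / 3)
    {
      $addFields: {
        month: { $month: "$transactionDate" },
        year: { $year: "$transactionDate" },
        quarter: {
          $ceil: { $divide: [{ $month: "$transactionDate" }, 3] }
        }
      }
    },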
    
    // Stage 3: Group by multiple dimensions
    {
      $group: {
        _id: {
          year: "$year",
          quarter: "$quarter",
          category: "$category"
        },
        totalRevenue: { $sum: "$amount" },
        transactionCount: { $sum: 1 },
        avgTransaction: { $avg: "$amount" },
        maxTransaction: { $max: "$amount" },
        minTransaction: { $min: "$amount" },
        uniqueCustomers: { $addToSet: "$customerId" }
      }
    },
    
    // Stage 4: Calculate derived metrics
    {
      $addFields: {
        uniqueCustomerCount: { $size: "$uniqueCustomers" },
        avgPerCustomer: {
          $divide: ["$totalRevenue", { $size: "$uniqueCustomers" }]
        }
      }
    },
    
    // Stage 5: Sort by revenue descending
    {
      $sort: { "totalRevenue": -1 }
    },
    
    // Stage 6: Limit to top results
    {
      $limit: 100
    }
  ];
  
  const results = await db.collection("transactions").aggregate(pipeline).toArray();
  return results;
}
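Stage 4 divides by the number of distinct customers. If a group can end up with an empty uniqueCustomers array, $divide raises a divide-by-zero error and the whole aggregation fails. A defensive variant wraps the division in $cond; safeDivide is a hypothetical helper, not part of the driver:

```javascript
// Hypothetical helper: builds an aggregation expression that divides two
// expressions, returning `fallback` instead of failing when the denominator is 0.
// $cond evaluates only the branch it selects, so $divide never sees a zero.
function safeDivide(numerator, denominator, fallback = null) {
  return {
    $cond: {
      if: { $eq: [denominator, 0] },
      then: fallback,
      else: { $divide: [numerator, denominator] }
    }
  };
}

// Stage 4 of runFinancialAnalytics, rewritten with the guard
const derivedMetricsStage = {
  $addFields: {
    uniqueCustomerCount: { $size: "$uniqueCustomers" },
    avgPerCustomer: safeDivide("$totalRevenue", { $size: "$uniqueCustomers" })
  }
};

console.log(JSON.stringify(derivedMetricsStage, null, 2));
```

Returning null rather than 0 keeps "no customers" distinguishable from "zero revenue per customer" in the results.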

Real-Time Analytics with $facet

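The $facet stage runs several sub-pipelines over the same set of input documents and returns a single document with one array field per sub-pipeline. The $facet stage and its sub-pipelines cannot use indexes, so put any filtering $match before the $facet stage: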

async function getComprehensiveAnalytics(db) {
  const pipeline = [
    {
      $facet: {
        // Revenue by category
        "byCategory": [
          { $group: { _id: "$category", revenue: { $sum: "$amount" } } },
          { $sort: { revenue: -1 } }
        ],
        
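        // Revenue by year and month over the last 365 days; grouping on month alone
        // would merge the same month from two different years
        "monthlyTrend": [
          { $match: { date: { $gte: new Date(Date.now() - 365 * 24 * 60 * 60 * 1000) } } },
          {
            $group: {
              _id: { year: { $year: "$date" }, month: { $month: "$date" } },
              revenue: { $sum: "$amount" }
            }
          },
          { $sort: { "_id.year": 1, "_id.month": 1 } }
        ],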
        
        // Top customers
        "topCustomers": [
          { $group: { _id: "$customerId", totalSpent: { $sum: "$amount" } } },
          { $sort: { totalSpent: -1 } },
          { $limit: 10 }
        ],
        
        // Statistics
        "statistics": [
          {
            $group: {
              _id: null,
              totalRevenue: { $sum: "$amount" },
              avgTransaction: { $avg: "$amount" },
              totalTransactions: { $sum: 1 }
            }
          }
        ]
      }
    }
  ];
  
  return await db.collection("transactions").aggregate(pipeline).next();
}
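$facet returns every result as an array, even for the single-row statistics facet, and a $group with _id: null produces no document at all when nothing matches. A small post-processing step keeps calling code simple; summarizeFacetResult and its output shape are hypothetical:

```javascript
// Hypothetical helper: reshapes the document returned by getComprehensiveAnalytics.
// Each facet is an array; "statistics" holds at most one document.
function summarizeFacetResult(facetDoc) {
  const [stats] = facetDoc?.statistics ?? []; // undefined when no documents matched
  return {
    byCategory: facetDoc?.byCategory ?? [],
    monthlyTrend: facetDoc?.monthlyTrend ?? [],
    topCustomers: facetDoc?.topCustomers ?? [],
    totalRevenue: stats?.totalRevenue ?? 0,
    avgTransaction: stats?.avgTransaction ?? null,
    totalTransactions: stats?.totalTransactions ?? 0
  };
}

// Usage (inside an async function):
// const summary = summarizeFacetResult(await getComprehensiveAnalytics(db));
```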

Data Transformation Patterns

Unwinding and Reshaping Data

Transform arrays into separate documents for analysis:

async function analyzeOrderItems(db) {
  const pipeline = [
    // Unwind the items array
    { $unwind: "$items" },
    
    // Enrich with product information
    {
      $lookup: {
        from: "products",
        localField: "items.productId",
        foreignField: "_id",
        as: "productInfo"
      }
    },
    
    // Flatten the lookup result
    { $unwind: "$productInfo" },
    
    // Calculate item-level revenue
    {
      $addFields: {
        "items.revenue": {
          $multiply: ["$items.quantity", "$items.price"]
        },
        "items.productName": "$productInfo.name",
        "items.category": "$productInfo.category"
      }
    },
    
    // Group back by order
    {
      $group: {
        _id: "$_id",
        orderDate: { $first: "$orderDate" },
        customerId: { $first: "$customerId" },
        items: { $push: "$items" },
        totalOrderValue: { $sum: "$items.revenue" }
      }
    }
  ];
  
  return await db.collection("orders").aggregate(pipeline).toArray();
}
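Because $unwind drops documents whose array is empty, the pipeline above silently discards order items whose productId has no match in products. If you would rather keep those items, $unwind accepts preserveNullAndEmptyArrays: true. The hypothetical lookupOne helper below packages the $lookup and $unwind pair with that option:

```javascript
// Hypothetical helper: a $lookup followed by an $unwind of the joined array.
// keepUnmatched: true keeps documents whose lookup matched nothing,
// instead of letting $unwind drop them.
function lookupOne({ from, localField, foreignField, as, keepUnmatched = false }) {
  return [
    { $lookup: { from, localField, foreignField, as } },
    keepUnmatched
      ? { $unwind: { path: `$${as}`, preserveNullAndEmptyArrays: true } }
      : { $unwind: `$${as}` }
  ];
}

// The first stages of analyzeOrderItems, keeping items with unknown products
const orderItemStages = [
  { $unwind: "$items" },
  ...lookupOne({
    from: "products",
    localField: "items.productId",
    foreignField: "_id",
    as: "productInfo",
    keepUnmatched: true
  })
];
```

Kept items simply lack product details, so later stages such as the item-level $addFields should tolerate a missing productInfo.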

Working with Time Series Data

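Date operators such as $year, $month, $dayOfMonth, and $hour make it straightforward to bucket time series data. They evaluate dates in UTC unless you pass a timezone, so the hourly buckets below are UTC hours: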

async function analyzeTimeSeriesData(db, timeRange = "7d") {
  // Calculate start date based on time range
  const ranges = {
    "7d": 7 * 24 * 60 * 60 * 1000,
    "30d": 30 * 24 * 60 * 60 * 1000,
    "90d": 90 * 24 * 60 * 60 * 1000
  };
  
  const startDate = new Date(Date.now() - (ranges[timeRange] || ranges["7d"]));
  
  const pipeline = [
    // Filter by time range
    { $match: { timestamp: { $gte: startDate } } },
    
    // Group by hour
    {
      $group: {
        _id: {
          year: { $year: "$timestamp" },
          month: { $month: "$timestamp" },
          day: { $dayOfMonth: "$timestamp" },
          hour: { $hour: "$timestamp" }
        },
        count: { $sum: 1 },
        avgValue: { $avg: "$value" },
        minValue: { $min: "$value" },
        maxValue: { $max: "$value" }
      }
    },
    
    // Sort chronologically
    { $sort: { "_id.year": 1, "_id.month": 1, "_id.day": 1, "_id.hour": 1 } },
    
    // Format the output
    {
      $project: {
        _id: 0,
        datetime: {
          $dateFromParts: {
            year: "$_id.year",
            month: "$_id.month",
            day: "$_id.day",
            hour: "$_id.hour"
          }
        },
        count: 1,
        avgValue: { $round: ["$avgValue", 2] },
        minValue: 1,
        maxValue: 1
      }
    }
  ];
  
  return await db.collection("metrics").aggregate(pipeline).toArray();
}
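On MongoDB 5.0 or later, $dateTrunc can replace the year/month/day/hour group key: it truncates each timestamp to the start of its hour and returns a Date, so the $dateFromParts step disappears and the timezone becomes an explicit parameter. A sketch assuming that server version; buildHourlyPipeline is a hypothetical name:

```javascript
// Sketch for MongoDB 5.0+: group by the hour each timestamp falls in.
// buildHourlyPipeline is a hypothetical helper; pass an IANA zone name to bucket by local hours.
function buildHourlyPipeline(startDate, timezone = "UTC") {
  return [
    { $match: { timestamp: { $gte: startDate } } },
    {
      $group: {
        // $dateTrunc returns the start of the hour as a Date
        _id: { $dateTrunc: { date: "$timestamp", unit: "hour", timezone } },
        count: { $sum: 1 },
        avgValue: { $avg: "$value" },
        minValue: { $min: "$value" },
        maxValue: { $max: "$value" }
      }
    },
    // Date keys sort chronologically, so a single-field sort is enough
    { $sort: { _id: 1 } },
    {
      $project: {
        _id: 0,
        datetime: "$_id",
        count: 1,
        avgValue: { $round: ["$avgValue", 2] },
        minValue: 1,
        maxValue: 1
      }
    }
  ];
}

// Usage: db.collection("metrics").aggregate(buildHourlyPipeline(startDate)).toArray()
```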

Optimization and Performance

Using Indexes Effectively

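Indexes help only at the start of a pipeline: a leading $match or $sort (including one the optimizer moves forward) can use an index, while later stages operate on the documents the earlier stages produce. Create indexes that support those leading stages (these commands use mongosh syntax):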

// Compound index for the financial analytics $match:
// equality field (status) first, then the range field (transactionDate)
db.transactions.createIndex(
  { status: 1, transactionDate: 1 },
  { name: "transaction_analytics_idx" }
);

// Text index for $text search; a $match that uses $text must be the first pipeline stage
db.products.createIndex(
  { name: "text", description: "text", tags: "text" },
  { name: "product_text_idx" }
);

Pipeline Optimization Tips

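  1. Place $match early: Filter documents as early as possible to reduce the dataset
  2. Use $project to shape the output, usually as the last stage: The optimizer already passes along only the fields later stages need, so an early $project rarely reduces memory use
  3. Use $limit strategically: Apply limits before expensive operations when possible, as long as limiting first does not change the answer you need
  4. Consider $sample for large datasets: Use { $sample: { size: 100 } } for random sampling when an approximate answer is acceptable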
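
// Optimized pipeline example
const lastMonth = new Date(Date.now() - 30 * 24 * 60 * 60 * 1000); // about 30 days ago

const optimizedPipeline = [
  // Most restrictive filter first; it can use an index such as { status: 1, date: 1 }
  { $match: { status: "active", date: { $gte: lastMonth } } },

  // A $sort directly followed by $limit lets the server keep only the top 100 while sorting
  { $sort: { amount: -1 } },
  { $limit: 100 },

  // Later stages see at most 100 documents.
  // Note: these totals cover only the 100 largest transactions, not every match
  { $group: { _id: "$category", total: { $sum: "$amount" } } },

  // Shape the output last
  { $project: { _id: 0, category: "$_id", total: 1 } }
];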
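The $sort plus $limit pairing pays off because the optimizer coalesces a $limit into a preceding $sort (when no stage between them changes the number of documents), so the sort only keeps the top n documents as it goes instead of holding the whole input. The idea in simplified plain JavaScript; topN is a hypothetical illustration, not the server's implementation:

```javascript
// Simplified illustration of bounded top-n selection: stream through the input
// once and never hold more than n documents (topN is a hypothetical name).
function topN(docs, key, n) {
  if (n <= 0) return [];
  const top = []; // kept sorted by key, descending
  for (const doc of docs) {
    // Skip documents that cannot enter an already full top-n list
    if (top.length === n && doc[key] <= top[n - 1][key]) continue;
    let i = top.findIndex((t) => doc[key] > t[key]);
    if (i === -1) i = top.length;
    top.splice(i, 0, doc);
    if (top.length > n) top.pop(); // drop the smallest to stay at n
  }
  return top;
}

const amounts = [5, 1, 9, 3, 7].map((amount) => ({ amount }));
console.log(topN(amounts, "amount", 3).map((d) => d.amount)); // [ 9, 7, 5 ]
```

Memory stays proportional to n rather than to the number of matched documents, which is why a small $limit right after $sort is cheap.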

Integration with Claude Code Workflows

Building Reusable Pipeline Templates

Create a library of reusable aggregation patterns:

// aggregation-library.js

// Common pipeline builders
export const PipelineBuilders = {
  // Time-based filtering over the half-open range [startDate, endDate),
  // so a date-only endDate such as "2026-01-01" still covers all of 2025-12-31 (UTC)
  timeFilter: (field, startDate, endDate) => ({
    $match: {
      [field]: {
        $gte: new Date(startDate),
        $lt: new Date(endDate)
      }
    }
  }),
  
  // Basic grouping
  groupByField: (groupField, sumField) => ({
    $group: {
      _id: `$${groupField}`,
      total: { $sum: `$${sumField}` },
      count: { $sum: 1 }
    }
  }),
  
  // Pagination
  paginate: (page = 1, limit = 20) => [
    { $skip: (page - 1) * limit },
    { $limit: limit }
  ],
  
  // Sort helper
  sortBy: (field, order = -1) => ({ $sort: { [field]: order } })
};

// Usage example: 2025 totals by category, first page of 10 results
async function runAnalysis(db) {
  const { timeFilter, groupByField, paginate, sortBy } = PipelineBuilders;
  
  const pipeline = [
    timeFilter("createdAt", "2025-01-01", "2026-01-01"),
    groupByField("category", "amount"),
    sortBy("total", -1),
    ...paginate(1, 10)
  ];
  
  return await db.collection("transactions").aggregate(pipeline).toArray();
}
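paginate returns one page but not the total number of matches, which most paging interfaces also need. A hypothetical paginateWithTotal builder in the same style gets both in one query by running the page and a $count side by side inside $facet. Like any $facet, it processes every document that reaches it, so keep filtering stages before it:

```javascript
// Hypothetical builder in the style of PipelineBuilders: returns one page of
// results plus the total number of documents reaching this point.
function paginateWithTotal(page = 1, limit = 20) {
  return [
    {
      $facet: {
        data: [{ $skip: (page - 1) * limit }, { $limit: limit }],
        totalCount: [{ $count: "count" }]
      }
    },
    {
      $project: {
        data: 1,
        // totalCount is [] when nothing matched, so default the total to 0
        total: { $ifNull: [{ $arrayElemAt: ["$totalCount.count", 0] }, 0] }
      }
    }
  ];
}

// Usage: spread ...paginateWithTotal(1, 10) in place of ...paginate(1, 10);
// the aggregation then yields a single document shaped like { data: [...], total: N }
```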

Error Handling and Debugging

Pipeline Validation and Testing

Before running a pipeline against production data, inspect its execution plan with explain() and give it a server-side time limit:

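async function validateAndRunPipeline(db, collectionName, pipeline) {
  const collection = db.collection(collectionName);
  try {
    // Explain the pipeline to inspect the execution plan without running it
    const explanation = await collection.aggregate(pipeline).explain("queryPlanner");

    // Where the plan appears depends on the pipeline and deployment: top level when the
    // whole pipeline runs in the query layer, otherwise typically under stages[0].$cursor
    // (sharded clusters report per-shard plans instead)
    const queryPlanner =
      explanation.queryPlanner ?? explanation.stages?.[0]?.$cursor?.queryPlanner;
    console.log("Execution Plan:", JSON.stringify(queryPlanner ?? explanation, null, 2));

    // Run the pipeline with a 30-second server-side time limit
    return await collection.aggregate(pipeline, { maxTimeMS: 30000 }).toArray();
  } catch (error) {
    if (error.code === 50) {
      // MaxTimeMSExpired
      console.error("Pipeline exceeded maxTimeMS - consider adding indexes or filtering earlier");
    } else if (error.code === 40324) {
      // Unrecognized pipeline stage name
      console.error("Invalid pipeline stage:", error.message);
    } else {
      console.error(`Aggregation failed (${error.codeName ?? error.code}):`, error.message);
    }
    throw error;
  }
}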
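Reading raw explain() output by eye gets tedious. A rough check is to walk the winning plan and look for an IXSCAN (index scan) or COLLSCAN (collection scan) stage. The sketch assumes the classic plan layout, where child stages hang off inputStage or inputStages; on newer servers the tree may sit under a queryPlan field, which it also follows. collectPlanStages and describeIndexUse are hypothetical names:

```javascript
// Hypothetical helpers for inspecting a winningPlan from explain("queryPlanner").
// Recursively collects stage names from the plan tree.
function collectPlanStages(plan, stages = []) {
  if (!plan) return stages;
  if (plan.stage) stages.push(plan.stage);
  collectPlanStages(plan.queryPlan, stages); // assumed newer (slot-based engine) layout
  collectPlanStages(plan.inputStage, stages); // single child stage
  for (const child of plan.inputStages ?? []) collectPlanStages(child, stages); // several children
  return stages;
}

// Rough heuristic: any COLLSCAN is worth flagging, even alongside an index scan
function describeIndexUse(winningPlan) {
  const stages = collectPlanStages(winningPlan);
  if (stages.includes("COLLSCAN")) return "collection scan";
  if (stages.includes("IXSCAN")) return "index scan";
  return "unknown";
}

// Usage: describeIndexUse(queryPlanner.winningPlan), where queryPlanner comes from the explain output
```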

Conclusion

Combining MongoDB aggregation pipelines with Claude Code gives you a practical way to build data analytics and transformation workflows. Success depends on understanding the available stages, ordering them well, and backing the leading stages with indexes. Start with simple pipelines and add complexity as you become comfortable with the framework.

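Remember these best practices: filter with $match as early as possible, support the leading stages with indexes, pair $sort with $limit when you only need the top results, and check pipelines with explain() and a maxTimeMS limit before running them in production.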

With these patterns and practices, you’re well-equipped to build robust MongoDB aggregation workflows that scale with your application’s needs.


Built by theluckystrike — More at https://zovo.one