Claude Code MongoDB Aggregation Pipeline Workflow Guide
MongoDB’s aggregation framework is one of the most powerful tools for data analysis and transformation in the NoSQL world. Paired with Claude Code, it lets you build sophisticated data pipelines that handle complex transformations, analytics, and real-time processing. This guide walks through practical workflows for working with MongoDB aggregation pipelines using Claude Code.
Understanding Aggregation Pipelines
Aggregation pipelines process documents through a series of stages, where each stage transforms the data and passes the results to the next. This approach is similar to a production line where each worker performs a specific task before passing the work to the next person.
Basic Pipeline Structure
The aggregation pipeline is an array of stages, each beginning with a $ operator:
db.collection.aggregate([
  { $match: { status: "active" } },  // Filter documents
  {
    $group: {                        // Group by field
      _id: "$category",
      total: { $sum: "$amount" }
    }
  },
  { $sort: { total: -1 } }           // Sort results
]);
Each stage in the pipeline performs a specific operation:
- $match: Filters documents (like WHERE in SQL)
- $project: Reshapes documents, selecting or excluding fields
- $group: Groups documents by a key and performs aggregations
- $sort: Orders documents by specified fields
- $limit: Limits the number of documents
- $skip: Skips a number of documents
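To build intuition for how these stages compose, here is a plain-JavaScript analogy of the pipeline above: each stage is a function that takes an array of documents and returns a new array. The sample documents are illustrative stand-ins for a MongoDB collection.

```javascript
// Sample documents standing in for a MongoDB collection
const docs = [
  { status: "active", category: "books", amount: 10 },
  { status: "active", category: "books", amount: 5 },
  { status: "active", category: "toys", amount: 20 },
  { status: "inactive", category: "toys", amount: 99 }
];

// $match: keep only active documents
const matched = docs.filter(d => d.status === "active");

// $group: sum amounts per category
const grouped = Object.values(
  matched.reduce((acc, d) => {
    acc[d.category] = acc[d.category] || { _id: d.category, total: 0 };
    acc[d.category].total += d.amount;
    return acc;
  }, {})
);

// $sort: total descending
const sorted = grouped.sort((a, b) => b.total - a.total);

console.log(sorted);
// [ { _id: 'toys', total: 20 }, { _id: 'books', total: 15 } ]
```

The real aggregation runs server-side and can use indexes, but the data flow is the same: each stage receives the previous stage's output.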
Setting Up Your MongoDB Connection
Before building aggregation workflows, establish a reliable connection to your MongoDB instance using Claude Code:
// Connection configuration
const mongoConfig = {
  uri: process.env.MONGODB_URI || "mongodb://localhost:27017",
  database: "analytics",
  options: {
    maxPoolSize: 10,
    serverSelectionTimeoutMS: 5000,
    socketTimeoutMS: 45000
  }
};

// Using the official MongoDB driver
import { MongoClient } from "mongodb";

async function connectToMongoDB() {
  const client = new MongoClient(mongoConfig.uri, mongoConfig.options);
  await client.connect();
  console.log("Connected to MongoDB");
  return client.db(mongoConfig.database);
}
Building Complex Aggregation Workflows
Financial Analytics Pipeline
Let’s build a comprehensive aggregation pipeline for financial analytics:
async function runFinancialAnalytics(db, startDate, endDate) {
  const pipeline = [
    // Stage 1: Date range filter
    {
      $match: {
        transactionDate: {
          $gte: new Date(startDate),
          $lte: new Date(endDate)
        },
        status: "completed"
      }
    },
    // Stage 2: Add computed fields
    {
      $addFields: {
        month: { $month: "$transactionDate" },
        year: { $year: "$transactionDate" },
        quarter: { $quarter: "$transactionDate" }
      }
    },
    // Stage 3: Group by multiple dimensions
    {
      $group: {
        _id: {
          year: "$year",
          quarter: "$quarter",
          category: "$category"
        },
        totalRevenue: { $sum: "$amount" },
        transactionCount: { $sum: 1 },
        avgTransaction: { $avg: "$amount" },
        maxTransaction: { $max: "$amount" },
        minTransaction: { $min: "$amount" },
        uniqueCustomers: { $addToSet: "$customerId" }
      }
    },
    // Stage 4: Calculate derived metrics
    {
      $addFields: {
        uniqueCustomerCount: { $size: "$uniqueCustomers" },
        avgPerCustomer: {
          $divide: ["$totalRevenue", { $size: "$uniqueCustomers" }]
        }
      }
    },
    // Stage 5: Sort by revenue descending
    {
      $sort: { totalRevenue: -1 }
    },
    // Stage 6: Limit to top results
    {
      $limit: 100
    }
  ];
  const results = await db.collection("transactions").aggregate(pipeline).toArray();
  return results;
}
Real-Time Analytics with $facet
The $facet operator allows you to run multiple aggregation pipelines in a single stage:
async function getComprehensiveAnalytics(db) {
  const pipeline = [
    {
      $facet: {
        // Revenue by category
        "byCategory": [
          { $group: { _id: "$category", revenue: { $sum: "$amount" } } },
          { $sort: { revenue: -1 } }
        ],
        // Revenue by month (last 12 months)
        "monthlyTrend": [
          { $match: { date: { $gte: new Date(Date.now() - 365 * 24 * 60 * 60 * 1000) } } },
          // Group by year AND month so the boundary month of the
          // 12-month window isn't merged across two different years
          {
            $group: {
              _id: { year: { $year: "$date" }, month: { $month: "$date" } },
              revenue: { $sum: "$amount" }
            }
          },
          { $sort: { "_id.year": 1, "_id.month": 1 } }
        ],
        // Top customers
        "topCustomers": [
          { $group: { _id: "$customerId", totalSpent: { $sum: "$amount" } } },
          { $sort: { totalSpent: -1 } },
          { $limit: 10 }
        ],
        // Statistics
        "statistics": [
          {
            $group: {
              _id: null,
              totalRevenue: { $sum: "$amount" },
              avgTransaction: { $avg: "$amount" },
              totalTransactions: { $sum: 1 }
            }
          }
        ]
      }
    }
  ];
  return await db.collection("transactions").aggregate(pipeline).next();
}
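Because $facet emits a single document whose keys hold each sub-pipeline's result array, consuming it is a matter of destructuring. A minimal sketch, using an illustrative result shaped like the output of the function above (the sample values are made up):

```javascript
// A $facet stage returns one document; each key holds the array
// produced by that sub-pipeline. Sample result for illustration:
const facetResult = {
  byCategory: [{ _id: "books", revenue: 1500 }],
  monthlyTrend: [{ _id: { year: 2025, month: 1 }, revenue: 300 }],
  topCustomers: [{ _id: "c42", totalSpent: 900 }],
  statistics: [
    { _id: null, totalRevenue: 1500, avgTransaction: 50, totalTransactions: 30 }
  ]
};

// The statistics facet has at most one element; guard the empty case
const stats = facetResult.statistics[0] ?? { totalRevenue: 0, totalTransactions: 0 };
const topCategory = facetResult.byCategory[0]?._id ?? null;

console.log(topCategory, stats.totalRevenue); // books 1500
```

Guarding with `??` matters because on an empty collection every facet array comes back empty, and indexing into it yields `undefined`.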
Data Transformation Patterns
Unwinding and Reshaping Data
Transform arrays into separate documents for analysis:
async function analyzeOrderItems(db) {
  const pipeline = [
    // Unwind the items array
    { $unwind: "$items" },
    // Enrich with product information
    {
      $lookup: {
        from: "products",
        localField: "items.productId",
        foreignField: "_id",
        as: "productInfo"
      }
    },
    // Flatten the lookup result
    { $unwind: "$productInfo" },
    // Calculate item-level revenue
    {
      $addFields: {
        "items.revenue": {
          $multiply: ["$items.quantity", "$items.price"]
        },
        "items.productName": "$productInfo.name",
        "items.category": "$productInfo.category"
      }
    },
    // Group back by order
    {
      $group: {
        _id: "$_id",
        orderDate: { $first: "$orderDate" },
        customerId: { $first: "$customerId" },
        items: { $push: "$items" },
        totalOrderValue: { $sum: "$items.revenue" }
      }
    }
  ];
  return await db.collection("orders").aggregate(pipeline).toArray();
}
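If $unwind's semantics feel unfamiliar, this plain-JavaScript analogy captures it: one output document per array element, with the array field replaced by that element. The sample order document is illustrative.

```javascript
// Plain-JS analogy of $unwind: one output document per array element
const order = {
  _id: 1,
  items: [{ sku: "a", qty: 2 }, { sku: "b", qty: 1 }]
};

const unwound = [order].flatMap(doc =>
  doc.items.map(item => ({ ...doc, items: item }))
);

console.log(unwound.length); // 2
console.log(unwound[0].items.sku); // a
```

Note that by default $unwind drops documents whose array is missing or empty; set `preserveNullAndEmptyArrays: true` in the stage if those orders should survive.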
Working with Time Series Data
MongoDB aggregation provides powerful date operators for time series analysis:
async function analyzeTimeSeriesData(db, timeRange = "7d") {
  // Calculate start date based on time range
  const ranges = {
    "7d": 7 * 24 * 60 * 60 * 1000,
    "30d": 30 * 24 * 60 * 60 * 1000,
    "90d": 90 * 24 * 60 * 60 * 1000
  };
  const startDate = new Date(Date.now() - (ranges[timeRange] || ranges["7d"]));
  const pipeline = [
    // Filter by time range
    { $match: { timestamp: { $gte: startDate } } },
    // Group by hour
    {
      $group: {
        _id: {
          year: { $year: "$timestamp" },
          month: { $month: "$timestamp" },
          day: { $dayOfMonth: "$timestamp" },
          hour: { $hour: "$timestamp" }
        },
        count: { $sum: 1 },
        avgValue: { $avg: "$value" },
        minValue: { $min: "$value" },
        maxValue: { $max: "$value" }
      }
    },
    // Sort chronologically
    { $sort: { "_id.year": 1, "_id.month": 1, "_id.day": 1, "_id.hour": 1 } },
    // Format the output
    {
      $project: {
        _id: 0,
        datetime: {
          $dateFromParts: {
            year: "$_id.year",
            month: "$_id.month",
            day: "$_id.day",
            hour: "$_id.hour"
          }
        },
        count: 1,
        avgValue: { $round: ["$avgValue", 2] },
        minValue: 1,
        maxValue: 1
      }
    }
  ];
  return await db.collection("metrics").aggregate(pipeline).toArray();
}
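On MongoDB 5.0 and later, the $dateTrunc operator can collapse the year/month/day/hour group key and the $dateFromParts reassembly into a single expression. A sketch of the equivalent group stage, assuming a 5.0+ server:

```javascript
// $dateTrunc (MongoDB 5.0+) truncates a date to the start of its hour,
// replacing the multi-field _id plus $dateFromParts reconstruction
const hourlyGroup = {
  $group: {
    _id: { $dateTrunc: { date: "$timestamp", unit: "hour" } },
    count: { $sum: 1 },
    avgValue: { $avg: "$value" },
    minValue: { $min: "$value" },
    maxValue: { $max: "$value" }
  }
};
```

The `_id` is then already a proper Date, so a simple `{ $sort: { _id: 1 } }` sorts chronologically with no projection step.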
Optimization and Performance
Using Indexes Effectively
Ensure your aggregation pipelines are optimized with proper indexes:
// Create compound index for common query patterns
db.transactions.createIndex(
  { transactionDate: 1, status: 1, category: 1 },
  { name: "transaction_analytics_idx" }
);

// Create index for text search in aggregations
db.products.createIndex(
  { name: "text", description: "text", tags: "text" },
  { name: "product_text_idx" }
);
Pipeline Optimization Tips
- Place $match early: Filter documents as early as possible to reduce the dataset
- Limit fields with $project: Only include necessary fields to reduce memory usage
- Use $limit strategically: Apply limits before expensive operations when possible
- Consider $sample for large datasets: Use { $sample: { size: 100 } } for random sampling
// Optimized pipeline example
const optimizedPipeline = [
  // Most restrictive filter first
  { $match: { status: "active", date: { $gte: lastMonth } } },
  // Project only needed fields
  { $project: { userId: 1, amount: 1, category: 1 } },
  // Sort after filtering
  { $sort: { amount: -1 } },
  // Limit early if you only need top results
  { $limit: 100 },
  // Additional processing on the reduced dataset: these totals cover
  // only the top 100 transactions; move $group before $limit
  // if you need collection-wide totals per category
  { $group: { _id: "$category", total: { $sum: "$amount" } } }
];
Integration with Claude Code Workflows
Building Reusable Pipeline Templates
Create a library of reusable aggregation patterns:
// aggregation-library.js
// Common pipeline builders
export const PipelineBuilders = {
  // Time-based filtering
  timeFilter: (field, startDate, endDate) => ({
    $match: {
      [field]: {
        $gte: new Date(startDate),
        $lte: new Date(endDate)
      }
    }
  }),
  // Basic grouping
  groupByField: (groupField, sumField) => ({
    $group: {
      _id: `$${groupField}`,
      total: { $sum: `$${sumField}` },
      count: { $sum: 1 }
    }
  }),
  // Pagination
  paginate: (page = 1, limit = 20) => [
    { $skip: (page - 1) * limit },
    { $limit: limit }
  ],
  // Sort helper
  sortBy: (field, order = -1) => ({ $sort: { [field]: order } })
};

// Usage with Claude Code
async function runAnalysis(db) {
  const { timeFilter, groupByField, paginate, sortBy } = PipelineBuilders;
  const pipeline = [
    timeFilter("createdAt", "2025-01-01", "2025-12-31"),
    groupByField("category", "amount"),
    sortBy("total", -1),
    ...paginate(1, 10)
  ];
  return await db.collection("transactions").aggregate(pipeline).toArray();
}
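Because the builders are plain functions returning stage objects, they can be unit-tested without a database connection. A quick check of the pagination math (repeating `paginate` here so the snippet runs standalone):

```javascript
// paginate as defined in the library above, repeated for a
// self-contained snippet
const paginate = (page = 1, limit = 20) => [
  { $skip: (page - 1) * limit },
  { $limit: limit }
];

// Page 3 at 10 per page skips the first 20 documents
console.log(paginate(3, 10));
// [ { '$skip': 20 }, { '$limit': 10 } ]
```

Testing the generated stage objects this way catches off-by-one pagination bugs long before they reach a live collection.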
Error Handling and Debugging
Pipeline Validation and Testing
Always validate your aggregation pipelines before running in production:
async function validateAndRunPipeline(db, collectionName, pipeline) {
  try {
    // Explain the pipeline to inspect the execution plan. The exact
    // shape of the explain output varies by server version and
    // pipeline: look for a top-level queryPlanner or a stages array.
    const explanation = await db.collection(collectionName)
      .aggregate(pipeline)
      .explain("queryPlanner");
    console.log("Execution plan:", JSON.stringify(explanation, null, 2));

    // Run the pipeline with a timeout
    const result = await db.collection(collectionName)
      .aggregate(pipeline)
      .maxTimeMS(30000) // 30 second timeout
      .toArray();
    return result;
  } catch (error) {
    // Error code 50 is MaxTimeMSExpired; code 2 is BadValue
    if (error.code === 50 || error.codeName === "MaxTimeMSExpired") {
      console.error("Pipeline timeout - consider optimizing with indexes");
    } else if (error.code === 2) {
      console.error("Invalid pipeline syntax:", error.message);
    }
    throw error;
  }
}
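Some structural mistakes can be caught before the pipeline ever reaches the server. A minimal offline sanity check, written for this guide (the rule it enforces, one $-operator key per stage, is how every stage in this article is shaped):

```javascript
// Minimal offline lint: every pipeline stage must be an object with
// exactly one $-prefixed key. Returns a list of human-readable errors.
function lintPipeline(pipeline) {
  const errors = [];
  pipeline.forEach((stage, i) => {
    const keys = Object.keys(stage);
    if (keys.length !== 1 || !keys[0].startsWith("$")) {
      errors.push(
        `stage ${i}: expected a single $-operator key, got [${keys.join(", ")}]`
      );
    }
  });
  return errors;
}

// A typo like { limit: 5 } instead of { $limit: 5 } is flagged
console.log(lintPipeline([{ $match: {} }, { limit: 5 }]));
```

This catches only the outermost shape; the server's own validation (surfaced via `.explain()` or the error codes above) remains the authority on operator semantics.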
Conclusion
MongoDB aggregation pipelines and Claude Code make a powerful combination for building data analytics and transformation workflows. The key to success lies in understanding the available stages, optimizing pipeline order, and leveraging indexes effectively. Start with simple pipelines and gradually add complexity as you become more comfortable with the framework.
Remember these best practices:
- Place $match stages early to reduce data volume
- Use $project to limit fields and reshape documents
- Leverage $facet for multi-dimensional analysis
- Always test with .explain() before production deployment
- Set appropriate timeouts for long-running pipelines
With these patterns and practices, you’re well-equipped to build robust MongoDB aggregation workflows that scale with your application’s needs.
Related Reading
- Claude Code for Beginners: Complete Getting Started Guide
- Best Claude Skills for Developers in 2026
- Claude Skills Guides Hub
Built by theluckystrike — More at https://zovo.one