Claude Skills Guide

Claude Code for Batch Processing Optimization Workflow

Batch processing is a fundamental pattern in software development—when you need to handle thousands of records, generate reports, process images, or run data transformations, efficient batch workflows save hours of development time and compute resources. Claude Code offers powerful capabilities for building, optimizing, and debugging batch processing workflows. This guide walks you through practical strategies for creating high-performance batch processing systems.

Understanding Batch Processing Fundamentals

Batch processing involves breaking large workloads into manageable chunks, processing each chunk, and aggregating results. The key challenges include maximizing throughput, handling failures gracefully, keeping memory usage bounded on large datasets, and maintaining visibility into long-running jobs. This guide addresses each in turn.

Claude Code can assist at every stage—from designing the batch architecture to implementing the processing logic and optimizing performance.

Designing Your Batch Processing Workflow

The first step is structuring your data for efficient processing. Consider this example of processing a large CSV file:

import pandas as pd
from concurrent.futures import ThreadPoolExecutor
import os

def process_batch(records):
    """Process a batch of records with error tracking"""
    results = []
    errors = []
    
    for record in records:
        try:
            processed = transform_record(record)
            results.append(processed)
        except Exception as e:
            errors.append({"record": record, "error": str(e)})
    
    return {"results": results, "errors": errors}

def transform_record(record):
    # Your transformation logic here
    return record

This basic structure shows the essential pattern: iterate through records, handle each one, track successes and failures, and return both for downstream processing.
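A minimal usage sketch, with two toy records standing in for real data:

records = [{"id": 1, "value": "a"}, {"id": 2, "value": "b"}]
outcome = process_batch(records)
print(f"{len(outcome['results'])} succeeded, {len(outcome['errors'])} failed")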

Parallel Execution Strategies

One of the most impactful optimizations is parallelizing your batch processing. Claude Code can help you implement several strategies:

ThreadPoolExecutor for I/O-Bound Tasks

When your processing involves network calls, database queries, or file operations, threading provides significant speedups:

def parallel_process(records, max_workers=10, batch_size=100):
    """Process records in parallel with controlled concurrency"""
    
    # Split into batches
    batches = [records[i:i + batch_size] 
               for i in range(0, len(records), batch_size)]
    
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(process_batch, batch) 
                   for batch in batches]
        
        all_results = []
        all_errors = []
        
        for future in futures:
            result = future.result()
            all_results.extend(result["results"])
            all_errors.extend(result["errors"])
    
    return {"results": all_results, "errors": all_errors}

The max_workers parameter controls concurrency—start conservative (5-10 workers) and tune based on your system’s capabilities and the nature of your I/O operations.
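A quick way to find that sweet spot is to benchmark a representative slice of your workload at several worker counts and watch for diminishing returns. A rough sketch, where sample_records stands in for a few thousand real records:

import time

# Candidate worker counts are illustrative; the right range depends on your I/O.
for workers in (2, 5, 10, 20):
    start = time.time()
    parallel_process(sample_records, max_workers=workers)
    print(f"{workers} workers: {time.time() - start:.2f}s")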

Multiprocessing for CPU-Intensive Tasks

For CPU-bound transformations like image processing or cryptographic operations, use multiprocessing to bypass Python’s GIL:

from multiprocessing import Pool, cpu_count

def cpu_intensive_transform(record):
    # Heavy computation here
    return compute_intensive_operation(record)

def parallel_cpu_process(records):
    num_workers = max(1, cpu_count() - 1)  # Leave one core free, but never drop below 1
    
    with Pool(num_workers) as pool:
        results = pool.map(cpu_intensive_transform, records)
    
    return results
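On platforms that spawn worker processes rather than forking (Windows, and macOS by default since Python 3.8), multiprocessing code must be launched from under a main guard so workers can safely re-import the module. A usage sketch, where load_records is a hypothetical loader for your data:

if __name__ == "__main__":
    # Required on spawn-based platforms so worker processes
    # can re-import this module without re-running the pool setup.
    records = load_records()  # hypothetical: however you source your records
    results = parallel_cpu_process(records)
    print(f"Processed {len(results)} records")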

Error Handling and Recovery Patterns

Robust batch processing requires comprehensive error handling. Here are battle-tested patterns:

Checkpointing for Long-Running Jobs

Save progress periodically to recover from interruptions:

import json

class BatchProcessor:
    def __init__(self, checkpoint_file="checkpoint.json"):
        self.checkpoint_file = checkpoint_file
        self.processed_ids = self._load_checkpoint()
    
    def _load_checkpoint(self):
        if os.path.exists(self.checkpoint_file):
            with open(self.checkpoint_file, 'r') as f:
                return set(json.load(f))
        return set()
    
    def _save_checkpoint(self):
        with open(self.checkpoint_file, 'w') as f:
            json.dump(list(self.processed_ids), f)
    
    def process_with_checkpoint(self, records):
        for record in records:
            if record["id"] in self.processed_ids:
                continue
            
            # process_single stands in for your per-record handler
            result = process_single(record)
            self.processed_ids.add(record["id"])
            
            # Save checkpoint every 100 records
            if len(self.processed_ids) % 100 == 0:
                self._save_checkpoint()
        
        self._save_checkpoint()
        return "Completed"

Dead Letter Queues for Failed Records

Always isolate failures rather than letting them halt entire batches:

from datetime import datetime

class ProcessingError(Exception):
    """Raised when a record fails processing (placeholder for your domain error)."""

def process_with_dead_letter(records):
    successful = []
    dead_letter = []
    
    for record in records:
        try:
            result = process_record(record)  # process_record is your per-record handler
            successful.append(result)
        except ProcessingError as e:
            dead_letter.append({
                "record": record,
                "error": str(e),
                "timestamp": datetime.now().isoformat()
            })
    
    # Write failed records for later inspection
    with open("dead_letter.json", 'w') as f:
        json.dump(dead_letter, f, indent=2)
    
    return {"successful": successful, "failed": len(dead_letter)}

Memory Management for Large Datasets

Processing millions of records can exhaust memory. Here are essential techniques:

Chunked Processing

Never load entire datasets into memory:

def process_large_file(filepath, chunk_size=10000):
    """Process large files in chunks to avoid memory issues"""
    
    for chunk in pd.read_csv(filepath, chunksize=chunk_size):
        # transform_dataframe stands in for your chunk-level logic
        processed_chunk = transform_dataframe(chunk)
        
        # Write immediately to output; index=False avoids appending
        # a stray index column with every chunk
        processed_chunk.to_csv(
            "output.csv", 
            mode='a', 
            header=not os.path.exists("output.csv"),
            index=False
        )
        
        # Explicitly free memory
        del chunk
        del processed_chunk
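Chunk size is only half the lever: pandas can also shrink each chunk at read time by loading only the columns you need and using narrower dtypes. A sketch with a hypothetical schema (the id, amount, and category columns are assumptions, not part of the example above):

def read_lean_chunks(filepath, chunk_size=10000):
    """Chunked reader that also trims columns and dtypes."""
    # usecols and dtype below are illustrative; substitute your real schema.
    return pd.read_csv(
        filepath,
        chunksize=chunk_size,
        usecols=["id", "amount", "category"],
        dtype={"id": "int32", "amount": "float32", "category": "category"},
    )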

Generator Patterns for Streaming

Use generators to process data lazily:

import csv

def record_generator(filepath):
    """Stream records one at a time"""
    with open(filepath, 'r') as f:
        reader = csv.DictReader(f)
        for row in reader:
            yield row

def stream_process(filepath):
    """Process records as they come in"""
    results = []
    for record in record_generator(filepath):
        processed = transform_record(record)
        results.append(processed)
        
        # Batch write every 1000 records (write_batch is your output sink)
        if len(results) >= 1000:
            write_batch(results)
            results = []
    
    # Don't forget final batch
    if results:
        write_batch(results)
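If you prefer to keep the streaming and batching concerns separate, a small helper can group any iterable into fixed-size batches (Python 3.12 ships itertools.batched with similar behavior). A sketch:

from itertools import islice

def batched(iterable, n):
    """Yield successive lists of up to n items from any iterable."""
    it = iter(iterable)
    while batch := list(islice(it, n)):
        yield batch

# Usage: stream batches straight from the generator
# for batch in batched(record_generator("large_file.csv"), 1000):
#     write_batch([transform_record(r) for r in batch])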

Monitoring and Optimization Tips

Finally, add observability to your batch jobs:

import time
from functools import wraps

def monitor_batch(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)
        elapsed = time.time() - start_time
        count = len(result.get('results', []))
        
        print(f"Processed {count} items in {elapsed:.2f} seconds")
        # Guard against division by zero on near-instant runs
        print(f"Throughput: {count / max(elapsed, 1e-9):.2f} items/sec")
        
        if result.get("errors"):
            print(f"Errors: {len(result['errors'])}")
        
        return result
    return wrapper
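Since monitor_batch only inspects the returned dict, it composes with any of the earlier functions that return the results/errors shape. For example:

@monitor_batch
def nightly_job(records):
    """Example wrapper: any function returning the results/errors dict works."""
    return parallel_process(records, max_workers=5)

# Or wrap an existing function at call time without modifying it:
# timed = monitor_batch(parallel_process)
# timed(records)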

Conclusion

Optimizing batch processing workflows requires balancing throughput, reliability, and resource efficiency. Claude Code can help you implement these patterns quickly, debug issues, and iterate on your implementation. Start with the basic sequential approach, add parallelization where it matters most, implement checkpointing for long jobs, and always monitor your performance metrics.

The key is to measure first, optimize second—use the monitoring patterns shown here to identify bottlenecks before diving into complex optimizations. With these patterns in your toolkit, you’ll be equipped to handle batch processing challenges at any scale.