
Claude Code for Redis Streams Workflow Guide

Redis Streams is a powerful data structure designed for building real-time message processing systems, event sourcing architectures, and distributed task queues. When combined with Claude Code, you can create intelligent, context-aware workflows that process stream data with the power of AI. This guide walks you through building robust Redis Streams workflows using Claude Code, from basic patterns to production-ready implementations.

Understanding Redis Streams and Claude Code

Redis Streams (introduced in Redis 5.0) provides a log-structured data type that excels at capturing ordered, immutable events. Unlike traditional pub/sub, streams support consumer groups, message acknowledgment, and persistent storage—making them ideal for building reliable, scalable workflows.
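
Every stream entry is assigned an ID of the form `<milliseconds>-<sequence>`, which is what makes the log strictly ordered. A small illustrative helper (not part of ioredis or any library) shows how that ordering works:

```javascript
// Illustrative helper: compares two Redis stream entry IDs ("<ms>-<seq>").
// Returns -1, 0, or 1, matching the order Redis itself assigns entries.
function compareStreamIds(a, b) {
  const [aMs, aSeq] = a.split('-').map(Number);
  const [bMs, bSeq] = b.split('-').map(Number);
  if (aMs !== bMs) return aMs < bMs ? -1 : 1;
  if (aSeq !== bSeq) return aSeq < bSeq ? -1 : 1;
  return 0;
}

console.log(compareStreamIds('1700000000000-0', '1700000000000-1')); // -1
```

Entries written in the same millisecond are disambiguated by the sequence number, so IDs are always unique and monotonically increasing.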

Claude Code extends these capabilities by adding intelligent processing logic. You can use Claude Code to analyze incoming events, classify and prioritize them by urgency or sentiment, and route entries to the appropriate downstream streams automatically.

Setting Up Your Redis Streams Environment

Before building workflows, ensure you have Redis running with streams support. Most managed Redis services (Redis Enterprise, Amazon ElastiCache, Redis Cloud) support streams out of the box.

Installing Required Dependencies

Create a new Claude Code project and install the necessary Redis client:

npm init -y
npm install ioredis uuid

Basic Redis Streams Connection

Here’s a practical connection setup for working with streams:

const Redis = require('ioredis');

const redis = new Redis({
  host: 'localhost',
  port: 6379,
  maxRetriesPerRequest: 3,
  // Back off linearly between reconnection attempts, capped at 2 seconds
  retryStrategy: (times) => Math.min(times * 100, 2000),
  enableReadyCheck: true,
  lazyConnect: true,
});

async function connect() {
  await redis.connect();
  console.log('Connected to Redis');
}

module.exports = { redis, connect };
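
ioredis calls `retryStrategy` with the number of reconnection attempts so far and waits the returned number of milliseconds before trying again. A minimal sketch of how the linear-with-cap schedule above behaves:

```javascript
// Sketch of an ioredis-style retryStrategy: given the attempt count,
// return the delay in milliseconds before the next reconnection.
// Linear backoff (100ms per attempt) capped at 2 seconds.
const retryStrategy = (times) => Math.min(times * 100, 2000);

console.log([1, 5, 50].map(retryStrategy)); // [100, 500, 2000]
```

Returning a non-number from `retryStrategy` tells ioredis to stop retrying entirely, which can be useful for fail-fast scripts.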

Creating Stream Producers

Producers are the foundation of any stream workflow. They generate entries that flow through your system. Here’s how to create a robust stream producer:

const { redis } = require('./redis-client');
const { v4: uuidv4 } = require('uuid');

async function addStreamEntry(streamName, data) {
  // XADD with '*' makes Redis generate the entry ID; return that real ID
  // rather than fabricating one client-side
  const entryId = await redis.xadd(
    streamName,
    '*',
    'data', JSON.stringify(data),
    'timestamp', Date.now().toString(),
    'source', 'claude-code-producer'
  );

  return entryId;
}

// Example: Adding user events to a stream
async function trackUserEvent(userId, eventType, properties) {
  return addStreamEntry('user-events', {
    userId,
    eventType,
    properties,
    sessionId: uuidv4(),
  });
}

This pattern creates entries with structured data that Claude Code can process intelligently.
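
XADD takes its payload as alternating field and value strings, which is why the producer spells out `'data', ..., 'timestamp', ...` by hand. A hypothetical convenience helper (not part of ioredis) makes that mapping explicit:

```javascript
// Illustrative helper: flattens a plain object into the alternating
// [field, value, field, value, ...] array that XADD expects.
// Non-string values are JSON-stringified, since Redis stores strings.
function toStreamFields(obj) {
  return Object.entries(obj).flatMap(([field, value]) => [
    field,
    typeof value === 'string' ? value : JSON.stringify(value),
  ]);
}

console.log(toStreamFields({ userId: 'u1', retries: 2 }));
// ['userId', 'u1', 'retries', '2']
```

With a helper like this, a producer call could be written as `redis.xadd(streamName, '*', ...toStreamFields(payload))`.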

Building Stream Consumers with Claude Code

Consumer groups enable multiple workers to process stream entries collaboratively. Here’s a production-ready consumer pattern:

const { redis } = require('./redis-client');

class StreamConsumer {
  constructor(groupName, consumerName, streamName) {
    this.group = groupName;
    this.consumer = consumerName;
    this.stream = streamName;
  }

  async initialize() {
    try {
      await redis.xgroup('CREATE', this.stream, this.group, '0', 'MKSTREAM');
    } catch (error) {
      if (!error.message.includes('BUSYGROUP')) {
        throw error;
      }
    }
  }

  async processEntries(processorFn) {
    const entries = await redis.xreadgroup(
      'GROUP', this.group, this.consumer,
      'COUNT', 10,
      'BLOCK', 5000,
      'STREAMS', this.stream, '>'
    );

    if (!entries) return [];

    const results = [];
    for (const [stream, messages] of entries) {
      for (const [id, fields] of messages) {
        const data = this.parseEntry(fields);
        
        try {
          const result = await processorFn(data);
          await redis.xack(this.stream, this.group, id);
          results.push({ id, status: 'processed', result });
        } catch (error) {
          console.error(`Failed to process ${id}:`, error.message);
          await this.handleFailure(id, data, error);
        }
      }
    }
    return results;
  }

  parseEntry(fields) {
    // Redis returns fields as a flat [key, value, key, value, ...] array
    const obj = {};
    for (let i = 0; i < fields.length; i += 2) {
      obj[fields[i]] = fields[i + 1];
    }
    if (obj.data) {
      try {
        obj.data = JSON.parse(obj.data);
      } catch {
        // Leave malformed payloads as raw strings rather than crashing
      }
    }
    return obj;
  }

  async handleFailure(id, data, error) {
    // Implement dead letter queue logic here
    await redis.xadd('stream-errors', '*',
      'originalStream', this.stream,
      'entryId', id,
      'error', error.message,
      'data', JSON.stringify(data)
    );
  }
}

Integrating Claude Code for Intelligent Processing

Now comes the powerful part—using Claude Code to process stream entries intelligently:

const { Anthropic } = require('@anthropic-ai/sdk');

const anthropic = new Anthropic({
  apiKey: process.env.ANTHROPIC_API_KEY,
});

class IntelligentStreamProcessor {
  constructor(consumer) {
    this.consumer = consumer;
  }

  async analyzeEntry(data) {
    const prompt = `Analyze this user event and determine the appropriate action:
    
Event: ${JSON.stringify(data, null, 2)}

Respond with a JSON object containing:
- priority: "high" | "medium" | "low"
- category: one of ["support", "billing", "feature_request", "bug", "general"]
- sentiment: "positive" | "neutral" | "negative"
- action_required: brief description`;

    const response = await anthropic.messages.create({
      model: 'claude-3-haiku-20240307',
      max_tokens: 200,
      messages: [{ role: 'user', content: prompt }],
    });

    return JSON.parse(response.content[0].text);
  }

  async process() {
    await this.consumer.processEntries(async (data) => {
      const analysis = await this.analyzeEntry(data.data);
      
      // Route to appropriate stream based on analysis
      await redis.xadd(
        `priority-${analysis.priority}`,
        '*',
        'data', JSON.stringify({ ...data.data, analysis })
      );

      return analysis;
    });
  }
}

This creates an intelligent routing system where Claude Code analyzes each event and directs it to the appropriate queue.
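
One fragility worth noting: the processor parses the model's reply with `JSON.parse` directly, which assumes the response is bare JSON. Models sometimes wrap their answer in a code fence or surrounding prose. A hypothetical safeguard (not from the Anthropic SDK) is to extract the first `{...}` span before parsing:

```javascript
// Hypothetical safeguard: extract the first {...} span from model output
// before parsing, tolerating code fences or surrounding prose.
function extractJson(text) {
  const start = text.indexOf('{');
  const end = text.lastIndexOf('}');
  if (start === -1 || end === -1 || end < start) {
    throw new Error('No JSON object found in model response');
  }
  return JSON.parse(text.slice(start, end + 1));
}

const reply = 'Here is the analysis:\n{"priority": "high", "category": "bug"}';
console.log(extractJson(reply)); // { priority: 'high', category: 'bug' }
```

Swapping `JSON.parse(response.content[0].text)` for `extractJson(response.content[0].text)` makes the routing step more tolerant of formatting drift in model output.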

Building a Complete Workflow

Here’s how all the pieces fit together in a production workflow:

async function main() {
  const streamName = 'user-events';
  const groupName = 'claude-processors';
  const consumerName = `worker-${process.env.HOSTNAME || 'local'}`;

  // Initialize consumer group
  const consumer = new StreamConsumer(groupName, consumerName, streamName);
  await consumer.initialize();

  // Create intelligent processor
  const processor = new IntelligentStreamProcessor(consumer);

  // Start processing loop
  console.log(`Starting ${consumerName}...`);
  
  // Run a sequential loop instead of setInterval: processEntries blocks
  // for up to 5 seconds, so overlapping timer callbacks would pile up
  while (true) {
    try {
      await processor.process();
    } catch (error) {
      console.error('Processing error:', error);
      await new Promise((resolve) => setTimeout(resolve, 1000));
    }
  }
}

main().catch(console.error);

Best Practices for Production Deployments

When deploying Redis Streams workflows with Claude Code in production, follow these essential practices:

1. Implement Proper Error Handling

Always wrap stream operations in try-catch blocks and maintain dead letter queues for failed entries. This prevents data loss and enables debugging.
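
Before dead-lettering an entry, it is often worth retrying it a few times with increasing delays. A minimal sketch of such a schedule (illustrative, not from the consumer code above) using exponential backoff with a cap:

```javascript
// Illustrative retry-delay schedule: exponential backoff with a cap, so a
// failing entry is retried at 1s, 2s, 4s, ... up to 30s before being
// moved to the dead letter stream.
function retryDelayMs(attempt, baseMs = 1000, capMs = 30000) {
  return Math.min(baseMs * 2 ** attempt, capMs);
}

console.log([0, 1, 2, 5, 10].map((a) => retryDelayMs(a)));
// [1000, 2000, 4000, 30000, 30000]
```

The attempt count per entry can be tracked via XPENDING's delivery counter, so no extra bookkeeping structure is needed.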

2. Use Consumer Group Patterns

Consumer groups distribute entries across workers and enable horizontal scaling. Note that Redis Streams provides at-least-once delivery, not exactly-once: an entry can be redelivered if a worker crashes before acknowledging it, so pair consumer groups with idempotent processing. Create groups during initialization and handle the BUSYGROUP error gracefully.

3. Monitor Stream Length

Periodically check stream length (XLEN, or XINFO STREAM for richer statistics) and implement cleanup policies:

async function trimStream(streamName, maxLength = 10000) {
  // XLEN returns the stream length directly (XINFO STREAM returns a flat
  // reply that would need extra parsing in ioredis)
  const length = await redis.xlen(streamName);
  if (length > maxLength) {
    // '~' trims approximately, which is much cheaper for Redis
    await redis.xtrim(streamName, 'MAXLEN', '~', maxLength);
  }
}

4. Set Up Idempotent Processing

Design your processors to handle duplicate deliveries gracefully by tracking processed entry IDs:

// In-memory dedup is a sketch: the set resets on restart and isn't shared
// across workers. For durable deduplication, record processed IDs in Redis
// itself (e.g. SET with NX and an expiry).
const processed = new Set();

async function processEntry(id, data) {
  if (processed.has(id)) {
    console.log(`Skipping duplicate: ${id}`);
    return;
  }
  
  // Process the entry
  await doProcessing(data);
  
  processed.add(id);
  // Bound memory growth; note this forgets history, so duplicates older
  // than the last clear can slip through
  if (processed.size > 10000) {
    processed.clear();
  }
}

Conclusion

Redis Streams combined with Claude Code creates powerful, intelligent workflow systems. By using streams for reliable message delivery and Claude Code for intelligent processing, you can build applications that handle complex routing, analysis, and decision-making automatically.

Start with the basic patterns in this guide, then extend them based on your specific requirements. The combination of Redis’s rock-solid streaming infrastructure with Claude Code’s AI capabilities opens up possibilities for building sophisticated, autonomous systems that can understand, categorize, and route data intelligently.

Remember to monitor your streams in production, implement proper error handling, and design for scale from the beginning. With these patterns in place, you’ll have a robust foundation for building event-driven, AI-powered applications.
