Claude Skills Guide

Claude Code Kafka Consumer Producer Tutorial Guide

Apache Kafka has become the backbone of modern event-driven architectures. Whether you’re building real-time analytics pipelines, microservice communication systems, or streaming applications, understanding how to effectively implement Kafka consumers and producers is essential. This guide walks you through building robust Kafka integrations using Claude Code, with practical examples you can apply immediately to your projects.

Setting Up Your Kafka Environment

Before diving into code, ensure you have a working Kafka setup. For local development, the easiest approach is using Docker Compose. Create a docker-compose.yml file with Kafka and Zookeeper:

version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181"
  
  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

Start your containers with docker-compose up -d. Verify Kafka is running by creating a test topic:

docker exec kafka kafka-topics --create --topic test-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1

Building a Kafka Producer with Claude Code

A producer publishes messages to Kafka topics. The following example demonstrates a production-ready producer that handles serialization, error handling, and configuration management:

from kafka import KafkaProducer
import json
import logging
from typing import Any, Dict, Optional

class EventProducer:
    def __init__(self, bootstrap_servers: str = "localhost:9092"):
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            key_serializer=lambda k: k.encode('utf-8') if k else None,
            acks='all',
            retries=3,
            retry_backoff_ms=1000,
        )
        self.logger = logging.getLogger(__name__)
    
    def send_event(self, topic: str, key: Optional[str], value: Dict[str, Any]) -> bool:
        try:
            future = self.producer.send(topic, key=key, value=value)
            record_metadata = future.get(timeout=10)
            self.logger.info(f"Message sent to {record_metadata.topic} "
                           f"partition {record_metadata.partition} "
                           f"offset {record_metadata.offset}")
            return True
        except Exception as e:
            self.logger.error(f"Failed to send message: {e}")
            return False
    
    def close(self):
        self.producer.flush()
        self.producer.close()

# Usage example
producer = EventProducer()
event = {"user_id": "12345", "action": "purchase", "amount": 99.99}
producer.send_event("user-events", "12345", event)
producer.close()

Key producer configurations explained:

Building a Kafka Consumer with Claude Code

Consumers read messages from Kafka topics. A well-designed consumer handles partition assignment, offset management, and error recovery:

from kafka import KafkaConsumer
import json
import logging
from typing import Callable, Optional

class EventConsumer:
    def __init__(self, 
                 bootstrap_servers: str = "localhost:9092",
                 group_id: str = "my-consumer-group",
                 auto_offset_reset: str = "earliest"):
        self.consumer = KafkaConsumer(
            'user-events',
            bootstrap_servers=bootstrap_servers,
            group_id=group_id,
            auto_offset_reset=auto_offset_reset,
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            enable_auto_commit=True,
            auto_commit_interval_ms=5000,
        )
        self.logger = logging.getLogger(__name__)
    
    def process_messages(self, handler: Callable[[dict], None]):
        try:
            for message in self.consumer:
                self.logger.info(f"Received message: {message.value} "
                               f"from partition {message.partition} "
                               f"offset {message.offset}")
                try:
                    handler(message.value)
                except Exception as e:
                    self.logger.error(f"Error processing message: {e}")
        except KeyboardInterrupt:
            self.logger.info("Consumer stopped")
        finally:
            self.consumer.close()
    
    def close(self):
        self.consumer.close()

# Usage example
def handle_event(event: dict):
    print(f"Processing event: {event}")

consumer = EventConsumer(group_id="my-app-group")
consumer.process_messages(handle_event)

Consumer best practices:

Advanced Patterns for Production Systems

Exactly-Once Semantics

For critical applications requiring exactly-once delivery, implement idempotent producers and transactions:

from kafka import KafkaProducer
from kafka.errors import KafkaError

producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    enable_idempotence=True,
    transaction_id='my-transaction-id'
)

# All messages in a transaction are committed together
with producer.transaction():
    producer.send('topic1', value={'data': 'A'})
    producer.send('topic2', value={'data': 'B'})

Consumer Rebalance Handling

When Kafka reassigns partitions, your consumer must handle the transition gracefully:

from kafka import KafkaConsumer
from kafka.coordinator.assignors.round_robin import RoundRobinPartitionAssignor

consumer = KafkaConsumer(
    'events',
    bootstrap_servers='localhost:9092',
    group_id='my-group',
    partition_assignment_strategy=[RoundRobinPartitionAssignor],
    # Handle rebalance callbacks
    rebalance_timeout_ms=30000,
)

Message Schema Management

For robust systems, define message schemas using Avro or Protocol Buffers:

from confluent_kafka import Producer, Consumer
from avro.schema import parse_schema

schema = parse_schema({
    "type": "record",
    "name": "UserEvent",
    "fields": [
        {"name": "user_id", "type": "string"},
        {"name": "action", "type": "string"},
        {"name": "timestamp", "type": {"type": "long", "logicalType": "timestamp-millis"}}
    ]
})

Actionable Advice for Claude Code Projects

  1. Use environment variables for configuration: Never hardcode bootstrap servers or credentials. Use .env files with libraries like python-dotenv.

  2. Implement dead letter queues: When message processing fails after retries, send to a separate topic for investigation rather than losing data.

  3. Monitor consumer lag: Use tools like kafka-consumer-groups to track how far behind your consumers are. Lag indicates processing issues or insufficient consumer capacity.

  4. Test with realistic workloads: Use tools like kafka-producer-perf-test to benchmark your producers and identify bottlenecks before production.

  5. Graceful shutdown: Ensure your applications handle SIGTERM properly, committing offsets before exit to prevent message loss.

Conclusion

Building reliable Kafka consumers and producers requires attention to error handling, configuration, and monitoring. The patterns and examples in this guide provide a solid foundation for integrating Kafka into your Claude Code projects. Start with the basic producer and consumer implementations, then add exactly-once semantics, schema validation, and comprehensive monitoring as your requirements evolve.

Kafka’s distributed nature provides scalability and durability, but the benefits only materialize when your implementations follow best practices. Apply these patterns to your projects, and you’ll have robust message processing that scales with your application’s demands.

Step-by-Step Guide: Deploying Kafka Consumers and Producers

Here is a concrete approach to going from local Kafka development to a production-ready integration.

Step 1 — Set up local Kafka with Docker Compose. Before writing application code, get a Kafka cluster running locally. Claude Code generates a Docker Compose file with a single-node Kafka broker and Zookeeper, health checks that wait for Kafka to be fully ready, and a topic initialization script that creates your topics with the appropriate partition count and replication factor.

Step 2 — Write a smoke test for your producer and consumer. Before implementing business logic, write a test that produces a known message and verifies the consumer receives it. Claude Code generates the smoke test using pytest-kafka or a simple script that starts producer and consumer in separate threads, sends one message, and asserts receipt within a timeout.

Step 3 — Implement your message schema with Avro. Define your message schema in Avro IDL or JSON Schema before writing the producer code. Claude Code generates the schema file, the Python dataclass or TypeScript interface derived from it, and the Confluent Schema Registry client configuration that validates messages on every produce and consume call.

Step 4 — Add consumer group offset monitoring. Set up a monitoring dashboard that tracks consumer group lag. Claude Code generates the Prometheus metrics exporter that queries Kafka’s consumer group API and exposes lag as a gauge metric, plus a Grafana dashboard definition that alerts when lag exceeds your acceptable threshold.

Step 5 — Implement graceful shutdown for consumers. Kafka consumers killed without committing offsets will reprocess messages from the last committed offset on restart. Claude Code generates the signal handler that calls consumer.close() on SIGTERM and SIGINT, ensuring the consumer commits its current offset before exiting.

Common Pitfalls

Committing offsets before processing completes. Auto-commit offsets every 5 seconds (the default) means your consumer might commit an offset before the message has been fully processed. If the consumer crashes after the commit but before the database write, the message is lost. Claude Code generates the manual commit pattern that only commits after the database write is confirmed.

Using a single partition for high-throughput topics. A Kafka topic with one partition can only be consumed by one consumer in a group, limiting throughput to a single machine. Size your partition count based on your target throughput divided by the throughput of a single consumer. Claude Code generates the partition sizing calculator.

Not configuring max.poll.interval.ms appropriately. If your consumer’s message processing takes longer than max.poll.interval.ms (default 5 minutes), Kafka considers the consumer dead and reassigns its partitions. Long-running processing jobs need this value increased. Claude Code generates the consumer configuration with recommended values for different processing latency profiles.

Producing messages without error callbacks. The Kafka producer is asynchronous. Calling producer.send() without waiting for confirmation or registering an error callback means failed messages are silently dropped. Claude Code generates the producer with delivery reports enabled and a dead letter queue writer that captures failed messages.

Not handling deserialization errors. If a producer sends a malformed message, the consumer’s deserializer will throw an exception. Without handling this exception, the consumer gets stuck and cannot advance past the bad message. Claude Code generates the deserialization error handler that logs the raw bytes to a dead letter queue and advances the offset.

Best Practices

Use idempotent producers for exactly-once delivery. Enable idempotent producers to prevent duplicate messages from network retries. Combined with exactly-once transactions, you can build pipelines where each message is processed exactly once even under failure conditions. Claude Code generates the idempotent producer configuration.

Monitor broker metrics alongside application metrics. Consumer lag is only part of the picture. Monitor broker disk usage, network throughput, and request latency to catch infrastructure bottlenecks before they affect your application. Claude Code generates the JMX metric collection configuration.

Version your message schemas with backward compatibility. When you need to change a message schema, use Avro’s schema evolution rules to add optional fields with defaults rather than removing or changing existing fields. Claude Code generates the schema evolution validation script.

Test with realistic message rates. A consumer that handles 10 messages per second in a unit test may fail at 10,000 messages per second in production. Claude Code generates the load test script that ramps up message rate and measures consumer lag growth.

Integration Patterns

Faust stream processing. For Python services that need stream processing capabilities such as windowed aggregations and stateful transformations, Faust provides a higher-level API built on top of Kafka. Claude Code generates the Faust app definition with agents for your consumer topics.

Kafka Connect for database integration. For reading from or writing to databases without writing consumer or producer code, Kafka Connect provides managed connectors. Claude Code generates the connector configuration JSON for common databases including PostgreSQL CDC via Debezium, Elasticsearch sink, and S3 sink.

Dead letter queue patterns. For messages that fail processing after all retries, a dead letter queue topic captures the failed message with metadata about the failure. Claude Code generates the DLQ producer wrapper and the monitoring alert that notifies your team when DLQ depth exceeds a threshold.

Schema Evolution and Compatibility

As your application evolves, message schemas change. Adding new fields, removing deprecated fields, or changing field types all require careful management to avoid breaking consumers that run on older code versions. The Confluent Schema Registry supports three compatibility modes: backward (new schema can read old messages), forward (old schema can read new messages), and full (both directions).

Claude Code generates the schema evolution test that validates a proposed schema change against your registered compatibility mode before you publish it. The test fetches the current schema from the registry, applies your proposed change, and runs the compatibility check locally without making any registry updates. This prevents accidentally registering an incompatible schema that would break live consumers.

For teams that manage schemas in files rather than through the registry directly, Claude Code generates the CI workflow that registers new schema versions from your schemas directory, enforces your compatibility mode, and fails the CI job if a proposed schema change violates compatibility.