AI Tools Compared

Database schema diffing is more than ALTER TABLE generation. A good schema diff tool identifies breaking changes, flags unsafe migrations (locking issues, large table rewrites), and generates rollback scripts. AI adds a layer on top: natural language explanation of what changed and why it might be dangerous. This guide covers Atlas, pgcmp, and building a Claude-powered schema diff pipeline.

What AI Adds to Schema Diffing

Traditional schema diff tools tell you what changed. AI tells you:

Atlas (ariga.io)

Atlas is an open-source schema management tool with a built-in diff engine and an AI explanation layer (Atlas Cloud).

Basic schema diff:

# Compare current DB schema to desired state
atlas schema diff \
  --from "postgres://user:pass@localhost/prod_db" \
  --to "file://schema.hcl" \
  --dev-url "docker://postgres/15/dev"

Atlas HCL schema definition:

# schema.hcl
table "users" {
  schema = schema.public
  column "id" {
    type = bigserial
  }
  column "email" {
    type = varchar(255)
    null = false
  }
  column "email_verified_at" {
    type = timestamptz
    null = true  # New: add email verification
  }
  primary_key {
    columns = [column.id]
  }
  index "users_email_unique" {
    columns  = [column.email]
    unique   = true
  }
}

Atlas diff output:

-- Planned Changes:
-- Add column "email_verified_at" to table: "users"
ALTER TABLE "users" ADD COLUMN "email_verified_at" timestamptz NULL;

Atlas marks this as safe — adding a nullable column is a non-blocking operation. It flags unsafe operations like adding a NOT NULL column without a default on a large table.

Atlas Cloud AI explanation (for paid tier):

This migration adds an email_verified_at column to the users table.
Risk: LOW — Adding a nullable column in PostgreSQL takes an exclusive lock
briefly but does not rewrite the table. Safe for online deployment.

If your users table has > 10M rows, consider monitoring lock wait time.
No application queries will break unless you add NOT NULL to this column
in a future migration without a default value.

pgcmp

pgcmp is a PostgreSQL-specific schema comparison tool that produces detailed diff reports. It compares two database schemas and outputs differences in tables, columns, indexes, constraints, and functions.

# Install and compare two Postgres schemas
npm install -g pgcmp

pgcmp \
  "postgres://user:pass@localhost/db_v1" \
  "postgres://user:pass@localhost/db_v2" \
  --output diff-report.json

pgcmp output (JSON excerpt):

{
  "tables": {
    "added": ["audit_logs"],
    "removed": [],
    "modified": {
      "orders": {
        "columns": {
          "added": [
            { "name": "cancelled_reason", "type": "text", "nullable": true }
          ],
          "modified": [
            {
              "name": "status",
              "from": "varchar(20)",
              "to": "varchar(50)",
              "breaking": false
            }
          ]
        },
        "indexes": {
          "added": [
            "idx_orders_status_created_at"
          ]
        }
      }
    }
  }
}

pgcmp correctly flags varchar(20) → varchar(50) as non-breaking (increasing varchar length is safe in Postgres). It flags varchar(50) → varchar(20) as breaking.

Building a Claude-Powered Schema Diff Analyzer

For teams without dedicated schema tooling, use Claude to analyze schema diffs and provide migration safety assessments:

# schema_diff_analyzer.py
import anthropic
import subprocess
import json
from typing import NamedTuple

client = anthropic.Anthropic()


class SchemaDiffResult(NamedTuple):
    raw_diff: str
    safety_assessment: str
    migration_sql: str
    rollback_sql: str


def get_schema_dump(db_url: str) -> str:
    """Dump schema using pg_dump."""
    result = subprocess.run(
        ["pg_dump", "--schema-only", "--no-owner", "--no-acl", db_url],
        capture_output=True, text=True, check=True
    )
    return result.stdout


def analyze_schema_diff(
    old_schema: str,
    new_schema: str,
    table_row_counts: dict[str, int] | None = None
) -> SchemaDiffResult:
    """Use Claude to analyze schema differences and generate migration plan."""

    row_count_context = ""
    if table_row_counts:
        row_count_context = "Table row counts:\n" + "\n".join(
            f"  {table}: {count:,} rows"
            for table, count in table_row_counts.items()
        )

    message = client.messages.create(
        model="claude-opus-4-6",
        max_tokens=4096,
        messages=[{
            "role": "user",
            "content": f"""Analyze the differences between these two PostgreSQL schemas.

OLD SCHEMA:
{old_schema[:8000]}

NEW SCHEMA:
{new_schema[:8000]}

{row_count_context}

Provide:
1. SAFETY ASSESSMENT: List each change with risk level (LOW/MEDIUM/HIGH) and reason
2. MIGRATION SQL: The ALTER statements to migrate from old to new schema, in correct order
3. ROLLBACK SQL: The statements to undo the migration
4. ONLINE SAFE: Which changes can run without a maintenance window
5. WARNINGS: Any potential breaking changes for application code

Format your response as JSON with keys: safety_assessment, migration_sql, rollback_sql, warnings"""
        }]
    )

    response_text = message.content[0].text

    # Parse JSON response
    try:
        # Claude sometimes wraps in ```json
 if "```json" in response_text:
            json_str = response_text.split("```json")[1].split("```")[0]
        else:
            json_str = response_text

        data = json.loads(json_str)
        return SchemaDiffResult(
            raw_diff=f"Diff between provided schemas",
            safety_assessment=data.get("safety_assessment", ""),
            migration_sql=data.get("migration_sql", ""),
            rollback_sql=data.get("rollback_sql", ""),
        )
    except json.JSONDecodeError:
        # Fall back to raw text
        return SchemaDiffResult(
            raw_diff="Parse error",
            safety_assessment=response_text,
            migration_sql="",
            rollback_sql="",
        )


def check_migration_safety(migration_file: str, db_url: str) -> dict:
    """Check a migration file for unsafe operations."""
    migration_sql = open(migration_file).read()

    # Get approximate row counts for affected tables
    affected_tables = extract_affected_tables(migration_sql)
    row_counts = get_table_row_counts(db_url, affected_tables)

    message = client.messages.create(
        model="claude-opus-4-6",
        max_tokens=2048,
        messages=[{
            "role": "user",
            "content": f"""Review this PostgreSQL migration for safety issues.

Migration SQL:
{migration_sql}

Table sizes:
{json.dumps(row_counts, indent=2)}

Identify:
1. Operations that require full table locks (AccessExclusiveLock)
2. Operations that rewrite the table (sequential scan + write)
3. Estimated lock duration for large tables
4. Safer alternatives (e.g., CREATE INDEX CONCURRENTLY, batched updates)
5. Overall risk: LOW / MEDIUM / HIGH

Be specific about PostgreSQL internals."""
        }]
    )

    return {"analysis": message.content[0].text, "row_counts": row_counts}


def extract_affected_tables(sql: str) -> list[str]:
    """Extract table names from SQL statements."""
    import re
    tables = re.findall(
        r'(?:ALTER TABLE|UPDATE|DROP TABLE|CREATE INDEX ON)\s+"?(\w+)"?',
        sql, re.IGNORECASE
    )
    return list(set(tables))


def get_table_row_counts(db_url: str, tables: list[str]) -> dict[str, int]:
    """Get approximate row counts from pg_stat_user_tables."""
    import psycopg2
    counts = {}
    if not tables:
        return counts
    try:
        conn = psycopg2.connect(db_url)
        cur = conn.cursor()
        placeholders = ','.join(['%s'] * len(tables))
        cur.execute(
            f"SELECT relname, n_live_tup FROM pg_stat_user_tables "
            f"WHERE relname IN ({placeholders})",
            tables
        )
        counts = dict(cur.fetchall())
        conn.close()
    except Exception:
        pass
    return counts

Claude Analysis Example

Input migration:

-- Add NOT NULL column with no default to large table
ALTER TABLE orders ADD COLUMN priority_level integer NOT NULL;

-- Change column type
ALTER TABLE products ALTER COLUMN price TYPE numeric(12,4);

-- Add index concurrently (safe)
CREATE INDEX CONCURRENTLY idx_orders_priority ON orders(priority_level);

Claude’s safety assessment:

{
  "safety_assessment": [
    {
      "operation": "ALTER TABLE orders ADD COLUMN priority_level integer NOT NULL",
      "risk": "HIGH",
      "reason": "Adding a NOT NULL column without DEFAULT rewrites the entire table in PostgreSQL < 11. In PG 11+, columns with constant defaults avoid table rewrites, but NOT NULL without DEFAULT still requires a full table scan to verify. With 8M rows, this could lock the table for 30-60 seconds.",
      "recommendation": "Add column as nullable first, backfill in batches, then add NOT NULL constraint."
    },
    {
      "operation": "ALTER TABLE products ALTER COLUMN price TYPE numeric(12,4)",
      "risk": "MEDIUM",
      "reason": "Type change from numeric to numeric with higher precision rewrites the table.",
      "recommendation": "Use a background migration pattern if the products table is large."
    },
    {
      "operation": "CREATE INDEX CONCURRENTLY idx_orders_priority ON orders(priority_level)",
      "risk": "LOW",
      "reason": "CONCURRENTLY builds the index without blocking reads or writes.",
      "recommendation": "Safe to run online. Cannot run inside a transaction block."
    }
  ]
}

Tool Comparison

Feature Atlas pgcmp Claude DIY
Schema diff generation Excellent Good Via prompt
Safety analysis Cloud tier Manual Excellent
Migration SQL generation Excellent No Good
Rollback generation Good No Good
Lock duration estimates No No Yes
Custom rules Via providers No Via prompt
Cost Open source + paid Cloud Free API costs

Built by theluckystrike — More at zovo.one