Data pipeline failures are quiet — you don’t get a stack trace, you get wrong numbers in a dashboard three days later. Automated testing is the fix, but writing data quality tests is tedious. AI tools can generate test suites from sample data, SQL schemas, and business rules in minutes instead of hours.
This guide covers three approaches: AI-generated dbt tests, Great Expectations suite generation, and LLM-based anomaly checks on pipeline outputs.
Table of Contents
- Why Pipeline Testing Is Underinvested
- Setting Up the Stack
- Approach 1: AI-Generated dbt Tests
- Approach 2: Great Expectations Suite Generation
- Approach 3: LLM-Based Output Anomaly Detection
- Integrating Anomaly Detection into Airflow
- CI Integration for dbt
- Tool Comparison
- Related Reading
Why Pipeline Testing Is Underinvested
Application code has decades of testing culture: unit tests, integration tests, CI pipelines. Data pipelines don’t. The reasons are practical: data quality tests require domain knowledge about acceptable value ranges, valid states, and business rules. A generic not_null check doesn’t encode that an order with status = 'shipped' must have a non-null shipped_at. Writing that test requires someone who understands both the SQL and the business logic.
AI tools narrow this gap by inferring likely business rules from data patterns and model structure. Show Claude a model that filters WHERE created_at >= '2024-01-01' along with sample rows in which every status = 'shipped' order has a shipped_at value and pending orders do not, and it can propose the constraint test for you. You review a draft instead of starting from a blank file.
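To make the shipped_at rule concrete, here is a minimal sketch of the same check in plain Python. It assumes rows arrive as dictionaries with the status and shipped_at keys used in this guide; the function name find_unshipped_violations and the row ord_003 are made up for illustration.

```python
def find_unshipped_violations(rows: list[dict]) -> list[dict]:
    """Return rows that claim to be shipped but carry no shipped_at value."""
    return [
        row for row in rows
        if row.get("status") == "shipped" and row.get("shipped_at") is None
    ]


# Illustrative rows only; ord_003 is invented to show a violation
rows = [
    {"order_id": "ord_001", "status": "shipped", "shipped_at": "2026-01-17"},
    {"order_id": "ord_002", "status": "pending", "shipped_at": None},
    {"order_id": "ord_003", "status": "shipped", "shipped_at": None},
]

violations = find_unshipped_violations(rows)
print([row["order_id"] for row in violations])  # ['ord_003']
```

A generic not_null test on shipped_at would fail on ord_002 as well, which is why the rule has to be conditional on status.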
Setting Up the Stack
Install the dependencies for all three approaches:
pip install anthropic dbt-postgres great-expectations pandas psycopg2-binary pyyaml
# For dbt project initialization (if you don't have one)
dbt init my_project
cd my_project
# Configure your profile in ~/.dbt/profiles.yml
# Then test connection
dbt debug
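For Great Expectations, pin a 0.x release and initialize a project. The great_expectations CLI used here belongs to the legacy 0.x line, which matches the 0.18.0 suite format written later in this guide:
pip install "great-expectations>=0.18,<1.0"
great_expectations init
# Creates great_expectations/ directory with context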
Approach 1: AI-Generated dbt Tests
dbt’s built-in tests (not_null, unique, accepted_values) cover basics. The hard part is writing custom SQL tests for business rules. Claude handles this well.
# generate_dbt_tests.py
import json

import yaml
from anthropic import Anthropic

client = Anthropic()  # reads ANTHROPIC_API_KEY from the environment


def generate_dbt_tests(model_sql: str, model_name: str, sample_data: list[dict]) -> dict:
    """Generate dbt test YAML for a model given its SQL and sample data."""
    # Serialize as real JSON (None becomes null); default=str covers dates and decimals
    sample_json = json.dumps(sample_data[:10], default=str)

    response = client.messages.create(
        model="claude-opus-4-6",
        max_tokens=2000,
        messages=[{
            "role": "user",
            "content": f"""Generate comprehensive dbt tests for this model.

Model name: {model_name}
Sample data (up to 10 rows): {sample_json}

Model SQL:
{model_sql}

Generate YAML with:
1. Column-level tests: not_null, unique, accepted_values (based on actual values in sample)
2. At least 2 custom relationship tests if foreign keys are apparent
3. At least 1 custom SQL test for business rule validation
4. Source freshness tests if there's a created_at/updated_at column

Return valid dbt YAML schema format only."""
        }]
    )

    # Strip a Markdown code fence if the model wrapped its answer in one, then parse
    yaml_text = response.content[0].text.strip()
    if yaml_text.startswith("```"):
        yaml_text = "\n".join(yaml_text.split("\n")[1:-1])
    # safe_load raises yaml.YAMLError on malformed YAML; it does not check dbt semantics
    return yaml.safe_load(yaml_text)


# Example usage
model_sql = """
SELECT
    order_id,
    user_id,
    status,
    total_amount,
    currency,
    created_at,
    shipped_at
FROM {{ source('ecommerce', 'orders') }}
WHERE created_at >= '2024-01-01'
"""

sample_data = [
    {"order_id": "ord_001", "user_id": "usr_123", "status": "shipped",
     "total_amount": 4999, "currency": "USD", "created_at": "2026-01-15",
     "shipped_at": "2026-01-17"},
    {"order_id": "ord_002", "user_id": "usr_456", "status": "pending",
     "total_amount": 1299, "currency": "USD", "created_at": "2026-01-16",
     "shipped_at": None},
]

tests = generate_dbt_tests(model_sql, "orders", sample_data)
print(yaml.dump(tests, default_flow_style=False))
Generated dbt YAML:
version: 2

models:
  - name: orders
    columns:
      - name: order_id
        tests:
          - not_null
          - unique
      - name: user_id
        tests:
          - not_null
          - relationships:
              to: ref('users')
              field: user_id
      - name: status
        tests:
          - not_null
          - accepted_values:
              values: ['pending', 'processing', 'shipped', 'delivered', 'cancelled', 'refunded']
      - name: total_amount
        tests:
          - not_null
          - dbt_utils.expression_is_true:
              expression: ">= 0"
      - name: currency
        tests:
          - not_null
          - accepted_values:
              values: ['USD', 'EUR', 'GBP']
      - name: created_at
        tests:
          - not_null
    tests:
      # Business rule: shipped orders must have a shipped_at date
      - dbt_utils.expression_is_true:
          expression: "status != 'shipped' OR shipped_at IS NOT NULL"
          name: shipped_orders_have_shipped_at
      # Business rule: shipped_at cannot be before created_at
      - dbt_utils.expression_is_true:
          expression: "shipped_at IS NULL OR shipped_at >= created_at"
          name: shipped_at_not_before_created_at
    freshness:
      warn_after: {count: 6, period: hour}
      error_after: {count: 24, period: hour}
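The two model-level expression_is_true tests are the valuable ones. not_null and unique are boilerplate; the business rule tests are what actually catch data quality problems in production. Treat the output as a draft, though. The accepted_values lists include statuses and currencies that never appear in the two sample rows, so confirm them against the real data. dbt configures freshness on a source (together with loaded_at_field) rather than on a model, so that block belongs in the sources definition for ecommerce.orders. The dbt_utils tests also require the dbt_utils package, installed with dbt deps.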
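One review step is easy to automate: flagging accepted_values entries that the model proposed but that never occur in the sample it was shown. The sketch below assumes the parsed YAML has the shape returned by yaml.safe_load for the output above; the function name unobserved_accepted_values is hypothetical, and the schema literal is a trimmed copy of the generated YAML.

```python
def unobserved_accepted_values(schema: dict, sample_rows: list[dict]) -> dict[str, list[str]]:
    """Map column name -> accepted_values entries that never occur in sample_rows."""
    flagged: dict[str, list[str]] = {}
    for model in schema.get("models", []):
        for column in model.get("columns", []):
            name = column["name"]
            observed = {row.get(name) for row in sample_rows}
            for test in column.get("tests", []):
                # Tests with arguments parse as single-key dicts; bare tests are strings
                if isinstance(test, dict) and "accepted_values" in test:
                    values = test["accepted_values"].get("values", [])
                    extra = [v for v in values if v not in observed]
                    if extra:
                        flagged[name] = extra
    return flagged


schema = {
    "models": [{
        "name": "orders",
        "columns": [
            {"name": "order_id", "tests": ["not_null", "unique"]},
            {"name": "status", "tests": ["not_null", {"accepted_values": {
                "values": ["pending", "processing", "shipped",
                           "delivered", "cancelled", "refunded"]}}]},
            {"name": "currency", "tests": ["not_null", {"accepted_values": {
                "values": ["USD", "EUR", "GBP"]}}]},
        ],
    }]
}
sample_rows = [
    {"order_id": "ord_001", "status": "shipped", "currency": "USD"},
    {"order_id": "ord_002", "status": "pending", "currency": "USD"},
]

print(unobserved_accepted_values(schema, sample_rows))
# {'status': ['processing', 'delivered', 'cancelled', 'refunded'], 'currency': ['EUR', 'GBP']}
```

A flagged value is not necessarily wrong (the model may have guessed a plausible status), but each one is a guess that a person should confirm before the test reaches CI.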
Approach 2: Great Expectations Suite Generation
Great Expectations needs a test suite for each dataset. Generating one manually takes hours. AI can generate it from a sample DataFrame:
# generate_ge_suite.py
import json
from pathlib import Path

import pandas as pd
from anthropic import Anthropic

client = Anthropic()  # reads ANTHROPIC_API_KEY from the environment


def profile_dataframe(df: pd.DataFrame) -> dict:
    """Generate a statistical profile for AI to analyze."""
    profile = {}
    for col in df.columns:
        series = df[col]
        col_profile = {
            "dtype": str(series.dtype),
            "null_count": int(series.isna().sum()),
            "null_pct": round(float(series.isna().mean()) * 100, 1),
            "unique_count": int(series.nunique()),
        }
        # is_numeric_dtype also covers int32, float32, and nullable numeric dtypes;
        # booleans are excluded because quantiles are not meaningful for them
        is_numeric = (pd.api.types.is_numeric_dtype(series)
                      and not pd.api.types.is_bool_dtype(series))
        if is_numeric:
            col_profile.update({
                "min": float(series.min()),
                "max": float(series.max()),
                "mean": round(float(series.mean()), 2),
                "p5": float(series.quantile(0.05)),
                "p95": float(series.quantile(0.95)),
            })
        elif series.dtype == "object":
            # Note: these are raw values from the data, not statistics
            col_profile["sample_values"] = series.dropna().unique()[:10].tolist()
        profile[col] = col_profile
    return profile


def generate_ge_expectations(df: pd.DataFrame, dataset_name: str) -> list[dict]:
    """Generate Great Expectations expectations for a DataFrame."""
    profile = profile_dataframe(df)
    row_count = len(df)

    response = client.messages.create(
        model="claude-opus-4-6",
        max_tokens=2500,
        messages=[{
            "role": "user",
            "content": f"""Generate Great Expectations expectations for this dataset.

Dataset: {dataset_name}
Row count: {row_count}

Column profiles:
{json.dumps(profile, indent=2, default=str)}

Return a JSON array of expectation objects using Great Expectations API format.
Include:
- expect_column_values_to_not_be_null for non-null columns
- expect_column_values_to_be_between for numeric columns (use p5/p95 with buffer)
- expect_column_values_to_be_in_set for low-cardinality string columns
- expect_column_to_exist for all columns
- expect_table_row_count_to_be_between (50% to 200% of current count)

Return only valid JSON, no explanation."""
        }]
    )

    # Strip a Markdown code fence if present, then parse
    text = response.content[0].text.strip()
    if text.startswith("```"):
        text = "\n".join(text.split("\n")[1:-1])
    return json.loads(text)


def save_ge_suite(expectations: list[dict], suite_name: str,
                  output_dir: str = "great_expectations/expectations"):
    """Save expectations to a GE suite file."""
    suite = {
        "data_asset_type": None,
        "expectation_suite_name": suite_name,
        "expectations": expectations,
        "meta": {"great_expectations_version": "0.18.0"}
    }
    output_path = Path(output_dir) / f"{suite_name}.json"
    # Create the directory if great_expectations init has not been run here
    output_path.parent.mkdir(parents=True, exist_ok=True)
    with open(output_path, "w") as f:
        json.dump(suite, f, indent=2)
    print(f"Saved {len(expectations)} expectations to {output_path}")
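The JSON that comes back is parsed but never checked, so a malformed or invented expectation would be written straight into the suite. A minimal shape check could run between generate_ge_expectations and save_ge_suite. This sketch assumes the 0.x suite format, in which each expectation is an object with expectation_type and kwargs keys; check_expectations and ALLOWED_TYPES are hypothetical names, and the allowlist simply mirrors the five expectation types requested in the prompt.

```python
ALLOWED_TYPES = {
    "expect_column_values_to_not_be_null",
    "expect_column_values_to_be_between",
    "expect_column_values_to_be_in_set",
    "expect_column_to_exist",
    "expect_table_row_count_to_be_between",
}


def check_expectations(expectations: list[dict], columns: set[str]) -> list[str]:
    """Return human-readable problems; an empty list means the shape looks sane."""
    problems = []
    for i, exp in enumerate(expectations):
        exp_type = exp.get("expectation_type")
        kwargs = exp.get("kwargs")
        if exp_type not in ALLOWED_TYPES:
            problems.append(f"#{i}: unexpected expectation_type {exp_type!r}")
        if not isinstance(kwargs, dict):
            problems.append(f"#{i}: kwargs missing or not an object")
            continue
        # Table-level expectations have no column kwarg, so only check when present
        column = kwargs.get("column")
        if column is not None and column not in columns:
            problems.append(f"#{i}: unknown column {column!r}")
    return problems


generated = [
    {"expectation_type": "expect_column_to_exist", "kwargs": {"column": "order_id"}},
    {"expectation_type": "expect_column_values_to_be_between",
     "kwargs": {"column": "totl_amount", "min_value": 0}},  # misspelled on purpose
]
print(check_expectations(generated, {"order_id", "total_amount"}))
# ["#1: unknown column 'totl_amount'"]
```

Passing set(df.columns) as the second argument ties the check to the DataFrame that was profiled.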
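The advantage of sending a statistical profile rather than raw rows is that far less sensitive data leaves your environment: the API sees distributions, null rates, and cardinalities instead of full order or user records. The profile is not entirely free of raw data, though. sample_values carries up to 10 literal values for each string column, so exclude or mask columns such as emails and names before sending. For regulated data environments this distinction matters.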
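A small filter on the profile can remove those literal values before the API call. The sketch below is one possible approach, not part of the original script: redact_profile, the sensitive set, and the max_cardinality threshold of 20 are all assumptions to tune for your data.

```python
import copy


def redact_profile(profile: dict, sensitive: set[str], max_cardinality: int = 20) -> dict:
    """Return a copy of the profile without sample_values for risky columns.

    A column is treated as risky if it is named in `sensitive` or has more
    distinct values than `max_cardinality` (likely free text or identifiers).
    """
    cleaned = copy.deepcopy(profile)
    for name, col in cleaned.items():
        risky = name in sensitive or col.get("unique_count", 0) > max_cardinality
        if risky:
            col.pop("sample_values", None)
    return cleaned


# Illustrative profile in the shape produced by profile_dataframe
profile = {
    "status": {"dtype": "object", "unique_count": 2,
               "sample_values": ["shipped", "pending"]},
    "email": {"dtype": "object", "unique_count": 1000,
              "sample_values": ["a@example.com", "b@example.com"]},
}

safe = redact_profile(profile, sensitive={"email"})
print(sorted(safe["status"]))  # ['dtype', 'sample_values', 'unique_count']
print(sorted(safe["email"]))   # ['dtype', 'unique_count']
```

Low-cardinality columns such as status keep their sample values, which is what the expect_column_values_to_be_in_set expectations are built from.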
Approach 3: LLM-Based Output Anomaly Detection
For pipelines where the data structure changes between runs, use Claude to analyze output distributions:
# pipeline_output_checker.py
import json
import re

import pandas as pd
from anthropic import Anthropic

client = Anthropic()  # reads ANTHROPIC_API_KEY from the environment


def compare_pipeline_runs(
    current_df: pd.DataFrame,
    baseline_df: pd.DataFrame,
    pipeline_name: str
) -> dict:
    """Use Claude to identify significant differences between two pipeline outputs."""

    def summarize_df(df: pd.DataFrame) -> dict:
        return {
            "row_count": len(df),
            "columns": list(df.columns),
            "numeric_stats": df.describe().round(2).to_dict(),
            # Cast to plain ints so the summary is JSON-serializable
            "null_counts": {col: int(n) for col, n in df.isna().sum().items()},
        }

    current_summary = summarize_df(current_df)
    baseline_summary = summarize_df(baseline_df)

    response = client.messages.create(
        model="claude-opus-4-6",
        max_tokens=1000,
        messages=[{
            "role": "user",
            "content": f"""Compare these two pipeline run outputs and identify anomalies.

Pipeline: {pipeline_name}

Baseline run:
{json.dumps(baseline_summary, indent=2, default=str)}

Current run:
{json.dumps(current_summary, indent=2, default=str)}

Identify:
1. ROW_COUNT_CHANGE: Is the change in row count expected or suspicious?
2. DISTRIBUTION_SHIFTS: Are any numeric distributions significantly different?
3. NULL_CHANGES: Have null rates changed significantly (>5% absolute change)?
4. MISSING_COLUMNS: Any columns present in baseline but missing from current?
5. VERDICT: PASS, WARN, or FAIL. Should this pipeline output be trusted?

Be specific with numbers. End with a final line of the form VERDICT: <PASS|WARN|FAIL>."""
        }]
    )

    text = response.content[0].text

    # Parse the verdict, tolerating brackets or Markdown emphasis around it.
    # The last match wins; default to WARN if no verdict can be found.
    matches = re.findall(r"VERDICT\W{0,10}(PASS|WARN|FAIL)", text.upper())
    verdict = matches[-1] if matches else "WARN"

    return {
        "verdict": verdict,
        "analysis": text,
        "pipeline": pipeline_name
    }
The strength of this approach is catching shifts that fixed thresholds miss. A total_amount column where the mean drops from $85 to $12 between runs is suspicious — a fixed not-null check won’t catch it, but the LLM comparison will flag it immediately as a distribution shift worth investigating.
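The arithmetic behind that example is simple enough to compute locally, which gives a deterministic cross-check on whatever the LLM reports. The sketch below works through the mean drop from 85 to 12 and the 5-point null-rate threshold named in the prompt; the helper names are hypothetical, and the null counts in the usage lines are invented for illustration.

```python
def relative_change(baseline: float, current: float) -> float:
    """Signed fractional change from baseline to current."""
    if baseline == 0:
        raise ValueError("baseline is zero; relative change is undefined")
    return (current - baseline) / baseline


def null_rate_shift(baseline_nulls: int, baseline_rows: int,
                    current_nulls: int, current_rows: int) -> float:
    """Absolute change in null rate, in percentage points."""
    if baseline_rows == 0 or current_rows == 0:
        raise ValueError("row counts must be non-zero")
    return 100 * (current_nulls / current_rows - baseline_nulls / baseline_rows)


# Mean of total_amount falls from 85 to 12: (12 - 85) / 85
print(f"{relative_change(85.0, 12.0):.1%}")  # -85.9%

# Illustrative counts: nulls rise from 10 to 80 out of 1000 rows
shift = null_rate_shift(10, 1000, 80, 1000)
print(shift > 5)  # True, above the 5-point threshold used in the prompt
```

Numbers like these can also be added to the prompt so the model reasons from precomputed deltas rather than doing the subtraction itself.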
Integrating Anomaly Detection into Airflow
# airflow_dag_with_checks.py
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator

from pipeline_output_checker import compare_pipeline_runs
# run_etl_pipeline and load_baseline_from_s3 are placeholders for your own code;
# import them from wherever your project defines them.


def run_pipeline_with_check(**context):
    """Run pipeline and validate output before marking success."""
    # Run your actual pipeline
    current_df = run_etl_pipeline()

    # Load the baseline (the previous run's output); the helper decides how
    # the logical date in context["ds"] maps to the stored baseline
    baseline_df = load_baseline_from_s3(
        date=context["ds"],
        pipeline="orders_daily"
    )

    result = compare_pipeline_runs(current_df, baseline_df, "orders_daily")

    if result["verdict"] == "FAIL":
        raise ValueError(f"Pipeline output failed quality check:\n{result['analysis']}")
    elif result["verdict"] == "WARN":
        # Log warning but don't fail the DAG
        context["task_instance"].xcom_push(
            key="quality_warning",
            value=result["analysis"]
        )

    return result["verdict"]


with DAG(
    "orders_daily_pipeline",
    default_args={"retries": 1, "retry_delay": timedelta(minutes=5)},
    schedule_interval="0 6 * * *",
    start_date=datetime(2026, 1, 1),
) as dag:
    # Airflow 2 passes the context to the callable automatically,
    # so the deprecated provide_context argument is not needed
    run_and_check = PythonOperator(
        task_id="run_pipeline_with_quality_check",
        python_callable=run_pipeline_with_check,
    )
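In the task above the baseline is described as the previous run, yet the lookup is keyed on context["ds"], which Airflow sets to the run's own logical date as a YYYY-MM-DD string. If load_baseline_from_s3 (a placeholder in this guide) expects the date of the run it should fetch, a small helper can derive it. This is a sketch assuming a daily schedule; previous_ds is a hypothetical name.

```python
from datetime import date, timedelta


def previous_ds(ds: str) -> str:
    """Given a logical date as YYYY-MM-DD, return the previous calendar day."""
    return (date.fromisoformat(ds) - timedelta(days=1)).isoformat()


print(previous_ds("2026-03-01"))  # 2026-02-28
```

With this helper the call would become load_baseline_from_s3(date=previous_ds(context["ds"]), ...), assuming baselines are stored under the logical date of the run that produced them.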
CI Integration for dbt
# .github/workflows/dbt-test-gen.yml
name: Generate dbt Tests

on:
  pull_request:
    paths:
      - 'models/**/*.sql'

jobs:
  generate-tests:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Setup dbt
        run: |
          pip install dbt-postgres anthropic pyyaml
          dbt deps  # installs packages such as dbt_utils, which the generated tests use

      - name: Generate tests for new models
        env:
          ANTHROPIC_API_KEY: ${{ secrets.ANTHROPIC_API_KEY }}
          DBT_PROFILES_DIR: .
        run: |
          python scripts/generate_dbt_tests.py \
            --models-dir models/ \
            --output-dir models/ \
            --only-missing-tests

      - name: Run generated tests
        env:
          DBT_PROFILES_DIR: .
        # state:modified+ also needs --state pointing at a manifest from a previous run
        run: dbt test --select state:modified+
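This workflow triggers on any PR that modifies SQL models. For models with no test YAML, it generates one. With --only-missing-tests, models that already have tests are left alone, so hand-crafted tests are never overwritten by AI-generated ones; drop the flag if you want tests regenerated whenever a model's SQL changes. Two caveats apply. The flags assume a command-line wrapper around generate_dbt_tests that is not shown in this guide, and the state:modified+ selector only works when dbt test is also given --state pointing at the manifest from a previous run.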
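The selection logic behind a flag like --only-missing-tests could be as small as the sketch below. It assumes a one-YAML-per-model convention, where models/orders.sql is considered covered if models/orders.yml (or .yaml) sits beside it; projects that keep tests in a shared schema.yml would need to parse those files instead. The function name models_missing_tests is hypothetical.

```python
from pathlib import Path


def models_missing_tests(models_dir: str) -> list[Path]:
    """List .sql models that have no same-named .yml or .yaml file beside them."""
    missing = []
    for sql_file in sorted(Path(models_dir).rglob("*.sql")):
        has_yaml = any(
            sql_file.with_suffix(ext).exists() for ext in (".yml", ".yaml")
        )
        if not has_yaml:
            missing.append(sql_file)
    return missing


if __name__ == "__main__":
    # Print the models that would be sent to the generator
    for path in models_missing_tests("models"):
        print(path)
```

Only the returned paths would be passed to generate_dbt_tests, which keeps API usage proportional to the number of untested models in the PR.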
Tool Comparison
| Tool | dbt Tests | GE Suite | Distribution Checks | Custom Rules |
|---|---|---|---|---|
| Claude (API) | Excellent | Excellent | Via code | Natural language |
| GPT-4 | Good | Good | Via code | Natural language |
| dbt-osmosis | Auto-docs only | No | No | No |
| re_data | Anomaly detection | No | Yes | Limited |
| Elementary | Monitoring | No | Yes | No |
Claude’s edge over GPT-4 for this use case is consistency: it reliably returns valid YAML and JSON rather than prose explanations mixed with code. For automated pipelines where you’re parsing the output programmatically, that matters.