Database Integration Patterns

Module 16: MCP & Tool Integration | Expansion Guide

The Problem

You want Claude to query your database. "Show me all users who signed up last week." "Which orders are still pending?" Simple questions with simple SQL answers.

But you're terrified. What if Claude generates DROP TABLE users;? What if it exposes sensitive data? What if a prompt injection tricks it into bypassing restrictions? Database access is powerful, which means it's dangerous.

Most developers either ban AI database access entirely (losing the productivity win) or YOLO it with full credentials (disaster waiting to happen). There's a middle path: safety patterns that give AI useful access without the existential risk.

The Core Insight

Database access for AI needs three layers: technical restrictions, approval workflows, and sandboxing. Pick the right layer for your risk tolerance.

Think of it like giving someone keys to your house: read-only access is a window they can look through but never open; approval-required means you meet them at the door and unlock it yourself each time; a sandbox is a replica house where nothing they break matters.

The pattern you choose depends on your context:

| Pattern | Use Case | Risk Level | Complexity |
| --- | --- | --- | --- |
| Read-Only | Analytics, reporting, debugging | Low | Easy |
| Approval-Required | Data updates, schema changes | Medium | Medium |
| Sandbox | Experimentation, testing | Very Low | High |

The Walkthrough

Pattern 1: Read-Only Access (Start Here)

Create a database user with SELECT-only permissions. Even if Claude tries to write, the database refuses.

-- PostgreSQL example
CREATE ROLE ai_readonly WITH LOGIN PASSWORD 'secure_password';
GRANT CONNECT ON DATABASE myapp TO ai_readonly;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO ai_readonly;
GRANT SELECT ON ALL SEQUENCES IN SCHEMA public TO ai_readonly;

-- Ensure tables created in the future are also read-only for this role
ALTER DEFAULT PRIVILEGES IN SCHEMA public
GRANT SELECT ON TABLES TO ai_readonly;

Now build an MCP server using these credentials:

import asyncpg
from mcp.server import Server
from mcp.types import Tool, TextContent
import mcp.server.stdio

server = Server("postgres-readonly")

# Connection pool with read-only user
pool = None

async def init_db():
    global pool
    pool = await asyncpg.create_pool(
        host="localhost",
        database="myapp",
        user="ai_readonly",
        password="secure_password",
        min_size=1,
        max_size=5
    )

@server.list_tools()
async def list_tools():
    return [
        Tool(
            name="query_database",
            description="Execute a SELECT query against the database. Read-only access. Returns up to 100 rows.",
            inputSchema={
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": "SELECT query to execute"
                    }
                },
                "required": ["query"]
            }
        )
    ]

@server.call_tool()
async def call_tool(name: str, arguments: dict):
    if name != "query_database":
        raise ValueError(f"Unknown tool: {name}")

    query = arguments["query"].strip()

    # Validation: ensure it's a single SELECT statement
    if ";" in query.rstrip(";"):
        raise ValueError("Only a single statement is allowed")
    if not query.upper().startswith("SELECT"):
        raise ValueError("Only SELECT queries are allowed")

    # Additional safety: reject write keywords on word boundaries
    # (a plain substring check would also reject columns like created_at)
    import re  # local import keeps this snippet self-contained
    forbidden = r"\b(DROP|DELETE|UPDATE|INSERT|ALTER|CREATE|TRUNCATE)\b"
    match = re.search(forbidden, query, re.IGNORECASE)
    if match:
        raise ValueError(f"Query contains forbidden keyword: {match.group(1)}")

    # Execute query
    async with pool.acquire() as conn:
        try:
            rows = await conn.fetch(query, timeout=10.0)
        except asyncpg.PostgresError as e:
            return [TextContent(type="text", text=f"Database error: {str(e)}")]

    # Format results
    if not rows:
        result = "Query returned 0 rows"
    else:
        # Limit to 100 rows
        rows = rows[:100]
        headers = list(rows[0].keys())
        result = f"Returned {len(rows)} rows:\n\n"
        result += " | ".join(headers) + "\n"
        result += "-" * (len(headers) * 15) + "\n"
        for row in rows:
            result += " | ".join(str(row[h]) for h in headers) + "\n"

    return [TextContent(type="text", text=result)]

async def main():
    await init_db()
    async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
        await server.run(read_stream, write_stream, server.create_initialization_options())

if __name__ == "__main__":
    import asyncio
    asyncio.run(main())

Defense in Depth

Notice we validate queries in code AND use database permissions. If your validation has a bug, the DB user still can't write. Never rely on a single layer of protection.

Pattern 2: Approval-Required Writes

For write operations, generate the query but require human approval before execution:

@server.list_tools()
async def list_tools():
    return [
        # ... existing query_database tool ...
        Tool(
            name="propose_update",
            description="Generate an UPDATE or INSERT query for review. Does NOT execute - returns SQL for human approval.",
            inputSchema={
                "type": "object",
                "properties": {
                    "intent": {
                        "type": "string",
                        "description": "What you want to change (e.g., 'Update user email')"
                    },
                    "table": {
                        "type": "string",
                        "description": "Table to modify"
                    }
                },
                "required": ["intent", "table"]
            }
        )
    ]

@server.call_tool()
async def call_tool(name: str, arguments: dict):
    if name == "query_database":
        ...  # existing query_database handler from Pattern 1

    elif name == "propose_update":
        intent = arguments["intent"]
        table = arguments["table"]

        # Use Claude to generate the SQL
        # (In real implementation, you'd call the LLM API here)
        result = f"""
PROPOSED UPDATE - REQUIRES APPROVAL

Intent: {intent}
Table: {table}

Generated SQL:
-- Review this carefully before executing
-- [SQL would be generated here]

To execute:
1. Review the SQL above
2. Test on staging database first
3. Run manually if approved

This query has NOT been executed.
        """

        return [TextContent(type="text", text=result)]

The tool generates SQL but doesn't run it. Human stays in the loop for dangerous operations.
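
The approval loop can be made concrete with a small in-memory queue: each proposal gets an ID, and the SQL is only released for execution after an explicit approve call. A minimal sketch (the class and its methods are our own, not part of the MCP SDK):

```python
import uuid

class ApprovalQueue:
    """Holds proposed write queries until a human approves or rejects them."""

    def __init__(self):
        self.pending = {}

    def propose(self, sql: str, intent: str) -> str:
        """Register a proposed query; returns an ID for the human reviewer."""
        proposal_id = uuid.uuid4().hex[:8]
        self.pending[proposal_id] = {"sql": sql, "intent": intent, "status": "pending"}
        return proposal_id

    def approve(self, proposal_id: str) -> str:
        """Release the SQL for execution; raises if unknown or already decided."""
        entry = self.pending.get(proposal_id)
        if entry is None or entry["status"] != "pending":
            raise KeyError(f"No pending proposal: {proposal_id}")
        entry["status"] = "approved"
        return entry["sql"]

    def reject(self, proposal_id: str) -> None:
        """Mark a proposal as rejected; its SQL is never executed."""
        entry = self.pending.get(proposal_id)
        if entry is not None:
            entry["status"] = "rejected"

queue = ApprovalQueue()
pid = queue.propose("UPDATE users SET email = $1 WHERE id = $2", "Update user email")
# ... human reviews the SQL, then:
approved_sql = queue.approve(pid)
```

The write connection (using a separate, privileged database user) only ever sees SQL that came out of approve.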

Pattern 3: Sandbox Database

For experimentation, point the AI at a copy of production data:

#!/bin/bash
# Refresh the sandbox database from production (run periodically, e.g. via cron)

# Dump production (use a read replica to avoid loading the primary)
pg_dump production_db -h prod-replica -U admin > /tmp/prod_dump.sql

# Restore to sandbox
dropdb sandbox_db --if-exists
createdb sandbox_db
psql sandbox_db < /tmp/prod_dump.sql

# Grant AI full access to sandbox
psql sandbox_db -c "GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO ai_user;"

echo "Sandbox refreshed from production at $(date)" >> /var/log/sandbox_refresh.log

Now the AI can DROP TABLE all it wants - it's just a sandbox. Worst case, you refresh it.

PII in Sandbox

If your production data contains PII, anonymize it before copying to sandbox. The AI shouldn't see real user emails, names, or sensitive data even in a sandbox environment.
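
A minimal anonymization pass can run between dump and restore, masking identifying columns with stable placeholders so joins on those values still work. A sketch (the column list is illustrative; adapt it to your schema):

```python
import hashlib

PII_COLUMNS = {"email", "name", "phone"}  # adjust to your schema

def anonymize_row(row: dict) -> dict:
    """Replace PII values with stable, non-reversible placeholders."""
    masked = {}
    for column, value in row.items():
        if column in PII_COLUMNS and value is not None:
            # The same input always maps to the same placeholder,
            # so relationships between rows are preserved
            digest = hashlib.sha256(str(value).encode()).hexdigest()[:10]
            if column == "email":
                masked[column] = f"user_{digest}@example.com"
            else:
                masked[column] = f"anon_{digest}"
        else:
            masked[column] = value
    return masked

row = anonymize_row({"id": 7, "email": "alice@real.example", "name": "Alice"})
# "id" passes through untouched; "email" and "name" are masked
```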

Query Validation Strategies

Validating AI-generated SQL is hard. Here's a layered approach:

Layer 1: Keyword Blacklist

FORBIDDEN_KEYWORDS = [
    "DROP", "DELETE", "UPDATE", "INSERT", "TRUNCATE",
    "ALTER", "CREATE", "GRANT", "REVOKE", "EXECUTE"
]

def is_read_only(query: str) -> bool:
    query_upper = query.upper()
    return all(keyword not in query_upper for keyword in FORBIDDEN_KEYWORDS)

Limitation: Doesn't catch SQL injection or nested queries.
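
It also misfires in the other direction: substring matching flags harmless identifiers. A quick demonstration, plus a word-boundary variant that avoids the false positive (`is_read_only_v2` is our own name):

```python
import re

FORBIDDEN_KEYWORDS = [
    "DROP", "DELETE", "UPDATE", "INSERT", "TRUNCATE",
    "ALTER", "CREATE", "GRANT", "REVOKE", "EXECUTE",
]

def is_read_only(query: str) -> bool:
    # Substring version from above
    query_upper = query.upper()
    return all(keyword not in query_upper for keyword in FORBIDDEN_KEYWORDS)

def is_read_only_v2(query: str) -> bool:
    # Word boundaries: CREATE matches, but created_at does not
    pattern = r"\b(" + "|".join(FORBIDDEN_KEYWORDS) + r")\b"
    return re.search(pattern, query, re.IGNORECASE) is None

# The substring check wrongly rejects a harmless column name:
assert is_read_only("SELECT created_at FROM users") is False
# The word-boundary check gets both cases right:
assert is_read_only_v2("SELECT created_at FROM users") is True
assert is_read_only_v2("DROP TABLE users") is False
```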

Layer 2: SQL Parser

import sqlparse

def validate_query(query: str) -> tuple[bool, str]:
    try:
        parsed = sqlparse.parse(query)
        if not parsed:
            return False, "Empty query"

        # Check every statement, not just the first;
        # otherwise "SELECT 1; DROP TABLE users" would slip through
        for statement in parsed:
            stmt_type = statement.get_type()
            if stmt_type != "SELECT":
                return False, f"Only SELECT allowed, got {stmt_type}"

        return True, "Valid"
    except Exception as e:
        return False, f"Parse error: {e}"

Better: Understands SQL structure, not just keywords.

Layer 3: Database Transaction Isolation

async with pool.acquire() as conn:
    # Start read-only transaction
    async with conn.transaction(readonly=True):
        rows = await conn.fetch(query)
    # Transaction auto-commits (or rolls back if error)

Best: Database enforces read-only at transaction level. Even if validation fails, writes are impossible.

Failure Patterns

1. Credential Leak

Symptom: Database credentials exposed in error messages or logs.

Fix: Never log connection strings. Use environment variables, not hardcoded passwords:

import os
pool = await asyncpg.create_pool(
    host=os.getenv("DB_HOST"),
    user=os.getenv("DB_USER"),
    password=os.getenv("DB_PASSWORD")  # Never log this
)
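
A small helper makes missing configuration fail fast at startup with a clear message, instead of failing later inside a confusing connection error (`require_env` is our own helper name):

```python
import os

def require_env(name: str) -> str:
    """Return a required environment variable, failing loudly if unset."""
    value = os.getenv(name)
    if not value:
        raise RuntimeError(f"Missing required environment variable: {name}")
    return value  # never log or print this when it holds a secret

# At startup, before creating the pool:
os.environ["DB_HOST"] = "localhost"  # for demonstration only
host = require_env("DB_HOST")
```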

2. Unbounded Result Sets

Symptom: Claude asks for SELECT * FROM large_table, server runs out of memory.

Fix: Always add LIMIT to user queries:

def add_limit(query: str, max_rows: int = 100) -> str:
    query = query.rstrip(";").strip()
    if "LIMIT" not in query.upper():
        query += f" LIMIT {max_rows}"
    return query
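
The substring check above has a blind spot: a LIMIT inside a subquery suppresses the outer limit entirely. A stricter variant only trusts a LIMIT clause at the end of the query (a sketch; `add_limit_strict` is our own name):

```python
import re

def add_limit_strict(query: str, max_rows: int = 100) -> str:
    """Append a LIMIT unless the query already ends with one."""
    query = query.rstrip(";").strip()
    if re.search(r"\bLIMIT\s+\d+\s*$", query, re.IGNORECASE) is None:
        query += f" LIMIT {max_rows}"
    return query

# The subquery's LIMIT 5 does not bound the outer result set,
# so the strict version still appends an outer LIMIT:
q = add_limit_strict("SELECT * FROM (SELECT id FROM t LIMIT 5) AS sub")
```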

3. Slow Query DoS

Symptom: AI generates expensive joins that lock the database.

Fix: Set query timeout and use EXPLAIN to estimate cost:

async with conn.transaction():
    # Set statement timeout to 5 seconds;
    # SET LOCAL scopes it to this transaction only
    await conn.execute("SET LOCAL statement_timeout = 5000")

    # Optionally: check query cost first
    explain = await conn.fetch(f"EXPLAIN {query}")
    # Parse explain output, reject if estimated cost too high

    rows = await conn.fetch(query)
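
The top line of EXPLAIN's text output embeds the planner's estimate, e.g. `Seq Scan on users  (cost=0.00..15234.00 rows=1000000 width=8)`. Pulling the total cost out with a regex is enough for a coarse gate (the threshold here is arbitrary):

```python
import re

MAX_COST = 10_000.0  # tune for your workload

def estimated_cost(explain_line: str) -> float:
    """Extract the planner's total cost estimate from an EXPLAIN plan line."""
    match = re.search(r"cost=\d+\.\d+\.\.(\d+\.\d+)", explain_line)
    if match is None:
        raise ValueError("No cost estimate found in EXPLAIN output")
    return float(match.group(1))

line = "Seq Scan on users  (cost=0.00..15234.00 rows=1000000 width=8)"
cost = estimated_cost(line)
too_expensive = cost > MAX_COST  # True here: 15234.0 > 10000.0
```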

4. Schema Confusion

Symptom: AI queries wrong tables or misunderstands schema.

Fix: Provide schema as an MCP resource:

# Add Resource to the imports at the top of the file:
# from mcp.types import Resource

@server.list_resources()
async def list_resources():
    return [
        Resource(
            uri="schema://database/tables",
            name="Database Schema",
            description="List of tables and columns",
            mimeType="text/plain"
        )
    ]

@server.read_resource()
async def read_resource(uri: str):
    if uri == "schema://database/tables":
        # Generate schema documentation from information_schema
        async with pool.acquire() as conn:
            tables = await conn.fetch("""
                SELECT table_name, column_name, data_type
                FROM information_schema.columns
                WHERE table_schema = 'public'
                ORDER BY table_name, ordinal_position
            """)

        # Format as a markdown document, one section per table
        schema_doc = "# Database Schema\n"
        current_table = None
        for row in tables:
            if row["table_name"] != current_table:
                current_table = row["table_name"]
                schema_doc += f"\n## {current_table}\n"
            schema_doc += f"- {row['column_name']}: {row['data_type']}\n"
        return schema_doc  # read_resource handlers return the content directly

    raise ValueError(f"Unknown resource: {uri}")

Now Claude can reference the schema before generating queries.

Quick Reference

Safety Checklist:

- Dedicated database user with SELECT-only permissions
- Query validation in code (a parser beats a keyword blacklist)
- Statement timeout on every query
- Results capped with LIMIT
- Credentials from environment variables, never logged
- Schema published as an MCP resource so the AI queries the right tables
- PII anonymized before it reaches any sandbox

Pattern Selection:

- Read-Only: analytics, reporting, debugging (start here)
- Approval-Required: data updates and schema changes that need a human in the loop
- Sandbox: experimentation and testing where breakage is acceptable

Production Hardening:

- Dump from a read replica, never the primary
- Bound the connection pool (min_size/max_size)
- Gate expensive queries with EXPLAIN cost checks
- Keep protection layered: code validation AND database permissions