Complex Filtering and Search

Implement advanced search with multiple filter conditions

Tags: Filtering, Search, Querying

Problem

You need to implement a search feature that supports multiple filters, date ranges, text search, and sorting.

Solution

Advanced product search with multiple filters
from sqlmodel import Session, Field, SQLModel
from sqlmodel_crud_utils import get_rows
from typing import Optional
from datetime import datetime

class Product(SQLModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    name: str = Field(index=True)
    description: str
    price: float
    category: str = Field(index=True)
    stock: int
    created_at: datetime = Field(default_factory=datetime.utcnow, index=True)
    is_active: bool = Field(default=True)

def search_products(
    session: Session,
    search_term: Optional[str] = None,
    category: Optional[str] = None,
    min_price: Optional[float] = None,
    max_price: Optional[float] = None,
    in_stock_only: bool = False,
    created_after: Optional[datetime] = None,
    created_before: Optional[datetime] = None,
    page: int = 1,
    page_size: int = 20,
    sort_by: str = "created_at",
    sort_desc: bool = True
):
    """
    Advanced product search with multiple filter options
    """
    # Build filters dynamically
    filters = {"is_active": True}

    # Text search (partial match)
    if search_term:
        filters["name__like"] = f"%{search_term}%"

    # Category filter (exact match)
    if category:
        filters["category"] = category

    # Price range filters
    if min_price is not None:
        filters["price__gte"] = min_price
    if max_price is not None:
        filters["price__lte"] = max_price

    # Stock filter
    if in_stock_only:
        filters["stock__gt"] = 0

    # Date range filters
    if created_after:
        filters["created_at__gte"] = created_after
    if created_before:
        filters["created_at__lte"] = created_before

    # Calculate pagination
    offset = (page - 1) * page_size

    # Execute query
    success, products = get_rows(
        session_inst=session,
        model=Product,
        offset=offset,
        limit=page_size,
        sort_field=sort_by,
        sort_desc=sort_desc,
        **filters
    )

    return {
        "products": products if success else [],
        "page": page,
        "page_size": page_size,
        "total": len(products) if success else 0
    }

# Usage examples
with Session(engine) as session:
    # Search for electronics under $100
    results = search_products(
        session,
        category="electronics",
        max_price=100,
        in_stock_only=True
    )

    # Search for products created this month
    from datetime import datetime, timedelta
    thirty_days_ago = datetime.utcnow() - timedelta(days=30)
    recent = search_products(
        session,
        created_after=thirty_days_ago,
        sort_by="created_at",
        sort_desc=True
    )

    # Text search with pagination
    page_2 = search_products(
        session,
        search_term="laptop",
        page=2,
        page_size=10
    )

Explanation

This recipe demonstrates how to:

  • Use filter suffixes (__like, __gte, __lte, __gt) for flexible filtering
  • Build filters dynamically based on user input
  • Combine multiple filter conditions
  • Implement pagination with calculated offsets
  • Support custom sorting
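
If the response should report the true number of matching rows rather than just the size of the current page (the recipe's "total" field), a separate count query is needed. A minimal sketch using SQLModel's select with SQLAlchemy's func.count, reusing the Product model from above and showing only the is_active and category filters:

from sqlmodel import Session, select
from sqlalchemy import func
from typing import Optional

def count_products(session: Session, category: Optional[str] = None) -> int:
    """Count matching rows so pagination metadata can report a real total."""
    stmt = select(func.count()).select_from(Product).where(Product.is_active == True)
    if category:
        stmt = stmt.where(Product.category == category)
    return session.exec(stmt).one()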

Key Points

  • Filters are combined with AND logic by default
  • Use __like for partial text matching with wildcards
  • Index frequently filtered fields for better performance
  • Always validate user inputs before building filters (see the sketch below)
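
A minimal sketch of that validation step (the whitelist below is only an example): reject unknown sort fields and clamp pagination values before they are passed to get_rows.

from typing import Tuple

ALLOWED_SORT_FIELDS = {"created_at", "price", "name"}  # example whitelist
MAX_PAGE_SIZE = 100

def validate_search_params(sort_by: str, page: int, page_size: int) -> Tuple[str, int, int]:
    """Normalize user-supplied parameters before building filters."""
    if sort_by not in ALLOWED_SORT_FIELDS:
        sort_by = "created_at"  # fall back to a safe default
    page = max(page, 1)  # pages start at 1
    page_size = min(max(page_size, 1), MAX_PAGE_SIZE)
    return sort_by, page, page_size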

Cursor-Based Pagination

Implement efficient pagination for real-time data

Tags: Pagination, Performance, Scalability

Problem

Offset-based pagination becomes slow with large datasets and can miss records when data is inserted during pagination. You need cursor-based pagination for better performance and consistency.

Solution

Cursor-based pagination implementation
from sqlmodel import Session, Field, SQLModel
from sqlmodel_crud_utils import get_rows
from typing import Optional, List
from datetime import datetime

class Post(SQLModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    title: str
    content: str
    created_at: datetime = Field(default_factory=datetime.utcnow, index=True)
    is_published: bool = Field(default=True)

def get_posts_cursor_paginated(
    session: Session,
    cursor: Optional[int] = None,
    limit: int = 20,
    is_published: bool = True
) -> dict:
    """
    Get posts using cursor-based pagination
    Cursor is the ID of the last post from the previous page
    """
    filters = {"is_published": is_published}

    # If cursor provided, get posts with ID greater than cursor
    if cursor:
        filters["id__gt"] = cursor

    success, posts = get_rows(
        session_inst=session,
        model=Post,
        limit=limit,
        sort_field="id",
        sort_desc=False,  # Important: consistent sort order
        **filters
    )

    posts = posts if success else []

    # Generate next cursor
    next_cursor = posts[-1].id if posts else None

    return {
        "posts": posts,
        "next_cursor": next_cursor,
        "has_more": len(posts) == limit
    }

def get_posts_time_based_cursor(
    session: Session,
    cursor_time: Optional[datetime] = None,
    cursor_id: Optional[int] = None,
    limit: int = 20
) -> dict:
    """
    Time-based cursor pagination (for chronological feeds)
    Uses created_at + id for stable pagination
    """
    filters = {"is_published": True}

    if cursor_time and cursor_id:
        # Ideally: posts created before cursor_time, OR created at cursor_time with a higher id.
        # Expressing that OR needs a custom query, so this simplified version ANDs both
        # filters and can skip rows that share the cursor timestamp (see the keyset sketch
        # at the end of this recipe).
        filters["created_at__lte"] = cursor_time
        filters["id__gt"] = cursor_id

    success, posts = get_rows(
        session_inst=session,
        model=Post,
        limit=limit,
        sort_field="created_at",
        sort_desc=True,
        **filters
    )

    posts = posts if success else []

    # Generate next cursor
    if posts:
        last_post = posts[-1]
        next_cursor = {
            "time": last_post.created_at.isoformat(),
            "id": last_post.id
        }
    else:
        next_cursor = None

    return {
        "posts": posts,
        "next_cursor": next_cursor,
        "has_more": len(posts) == limit
    }

# Usage in FastAPI (get_session is assumed to be your app's session dependency)
from fastapi import APIRouter, Depends, Query

router = APIRouter()

@router.get("/posts")
async def list_posts(
    cursor: Optional[int] = Query(None),
    limit: int = Query(20, le=100),
    session: Session = Depends(get_session)
):
    """Get posts with cursor pagination"""
    return get_posts_cursor_paginated(session, cursor, limit)

# Client usage example
def fetch_all_posts(session: Session):
    """Fetch all posts using cursor pagination"""
    all_posts = []
    cursor = None

    while True:
        result = get_posts_cursor_paginated(session, cursor=cursor, limit=100)
        all_posts.extend(result["posts"])

        if not result["has_more"]:
            break

        cursor = result["next_cursor"]

    return all_posts

Explanation

Cursor-based pagination uses a unique identifier (like an ID or timestamp) as a marker instead of a numeric offset:

  • ID-based cursor: Simple and efficient for most cases
  • Time-based cursor: Better for chronological feeds
  • Consistent ordering: Critical for stable pagination
  • No OFFSET scan: the database does not read and discard skipped rows, so performance stays stable on large datasets
Pro Tip: For reverse chronological feeds (newest first), use id__lt instead of id__gt and sort descending.
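
The simplified time-based cursor above ANDs the two boundary filters; a full keyset query expresses the boundary as an OR condition instead. A minimal sketch with SQLModel/SQLAlchemy directly, newest first (so the cursor row is the oldest item on the previous page), reusing the Post model from this recipe:

from sqlmodel import Session, select
from sqlalchemy import and_, or_
from typing import Optional
from datetime import datetime

def get_posts_keyset(
    session: Session,
    cursor_time: Optional[datetime] = None,
    cursor_id: Optional[int] = None,
    limit: int = 20,
):
    """Reverse-chronological keyset pagination: strictly 'older than the cursor row'."""
    stmt = select(Post).where(Post.is_published == True)
    if cursor_time is not None and cursor_id is not None:
        stmt = stmt.where(
            or_(
                Post.created_at < cursor_time,
                and_(Post.created_at == cursor_time, Post.id < cursor_id),
            )
        )
    stmt = stmt.order_by(Post.created_at.desc(), Post.id.desc()).limit(limit)
    return session.exec(stmt).all()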

Batch Processing with Error Handling

Process large datasets efficiently with graceful error handling

Tags: Batch Operations, Error Handling, Performance

Problem

You need to process thousands of records without letting a single failure stop the entire batch, and you need proper error tracking plus the ability to retry failed records.

Solution

Batch processing with error recovery
from sqlmodel import Session, Field, SQLModel
from sqlmodel_crud_utils import (
    insert_data_rows, write_row, get_rows, update_row,
    transaction, BulkOperationError
)
from typing import List, Dict, Any, Optional
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class ImportRecord(SQLModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    external_id: str = Field(index=True, unique=True)
    data: str
    status: str = Field(default="pending")  # pending, processing, success, failed
    error_message: Optional[str] = None
    retry_count: int = Field(default=0)

def process_batch(
    session: Session,
    records: List[ImportRecord],
    batch_size: int = 100
) -> Dict[str, Any]:
    """
    Process records in batches with error handling
    """
    total_success = 0
    total_failed = 0
    failed_records = []

    # Process in batches
    for i in range(0, len(records), batch_size):
        batch = records[i:i + batch_size]
        logger.info(f"Processing batch {i // batch_size + 1}: {len(batch)} records")

        try:
            with transaction(session) as tx:
                # Try bulk insert first
                success = insert_data_rows(batch, tx)
                if success:
                    total_success += len(batch)
                    logger.info(f"Batch inserted successfully: {len(batch)} records")
                else:
                    # Bulk insert failed, try individual inserts
                    logger.warning("Bulk insert failed, trying individual inserts")
                    for record in batch:
                        try:
                            write_row(record, tx)
                            total_success += 1
                        except Exception as e:
                            total_failed += 1
                            failed_records.append({
                                "record": record,
                                "error": str(e)
                            })
                            logger.error(f"Failed to insert record {record.external_id}: {e}")
        except BulkOperationError as e:
            logger.error(f"Batch operation error: {e}")
            total_failed += len(batch)
            failed_records.extend([
                {"record": r, "error": str(e)}
                for r in batch
            ])

    return {
        "total_processed": len(records),
        "total_success": total_success,
        "total_failed": total_failed,
        "failed_records": failed_records
    }

def process_with_retry(
    session: Session,
    max_retries: int = 3
) -> Dict[str, Any]:
    """
    Process pending records with retry logic
    """
    # Get pending records
    success, pending = get_rows(
        session_inst=session,
        model=ImportRecord,
        status="pending",
        retry_count__lt=max_retries,
        limit=1000
    )

    if not success or not pending:
        return {"message": "No pending records"}

    results = {"processed": 0, "succeeded": 0, "failed": 0, "max_retries_exceeded": 0}

    for record in pending:
        # Mark as processing
        update_row(
            id_str=record.id,
            data_dict={"status": "processing"},
            model=ImportRecord,
            session_inst=session
        )

        try:
            # Simulate processing
            # In real scenario, this would be your business logic
            process_record(record)

            # Mark as success
            update_row(
                id_str=record.id,
                data_dict={"status": "success", "error_message": None},
                model=ImportRecord,
                session_inst=session
            )
            results["succeeded"] += 1

        except Exception as e:
            logger.error(f"Error processing record {record.id}: {e}")

            # Increment retry count
            new_retry_count = record.retry_count + 1

            if new_retry_count >= max_retries:
                # Max retries exceeded
                update_row(
                    id_str=record.id,
                    data_dict={
                        "status": "failed",
                        "retry_count": new_retry_count,
                        "error_message": f"Max retries exceeded: {str(e)}"
                    },
                    model=ImportRecord,
                    session_inst=session
                )
                results["max_retries_exceeded"] += 1
            else:
                # Mark for retry
                update_row(
                    id_str=record.id,
                    data_dict={
                        "status": "pending",
                        "retry_count": new_retry_count,
                        "error_message": str(e)
                    },
                    model=ImportRecord,
                    session_inst=session
                )
                results["failed"] += 1

        results["processed"] += 1

    return results

def process_record(record: ImportRecord):
    """
    Simulate record processing
    Replace with actual business logic
    """
    # Your processing logic here
    import json
    data = json.loads(record.data)
    # ... process data ...

# Usage
with Session(engine) as session:
    # Create sample records
    records = [
        ImportRecord(external_id=f"ext_{i}", data='{"value": 123}')
        for i in range(1000)
    ]

    # Process in batches
    results = process_batch(session, records, batch_size=100)
    print(f"Success: {results['total_success']}, Failed: {results['total_failed']}")

    # Process with retry logic
    retry_results = process_with_retry(session, max_retries=3)
    print(f"Retry results: {retry_results}")

Explanation

This recipe shows how to:

  • Process large datasets in manageable batches
  • Use bulk operations with fallback to individual processing
  • Track processing status in the database
  • Implement retry logic with a per-record retry limit (a backoff delay can be added; see the sketch below)
  • Log errors without stopping the entire process
  • Report detailed results with success/failure counts
Warning: Always use transactions for batch operations to ensure data consistency. If a batch fails, all changes in that batch are rolled back.
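
The retry loop above only counts attempts across runs of process_with_retry; if you also want a delay between attempts within a single run, a small backoff wrapper around process_record is enough. A sketch, with arbitrary delay values:

import time

def process_with_backoff(record: ImportRecord, attempts: int = 3, base_delay: float = 0.5):
    """Retry transient failures with an exponentially growing delay."""
    for attempt in range(attempts):
        try:
            return process_record(record)
        except Exception:
            if attempt == attempts - 1:
                raise  # let the caller mark the record as failed
            time.sleep(base_delay * (2 ** attempt))  # 0.5s, 1s, 2s, ...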

Optimistic Locking for Concurrent Updates

Prevent lost updates in multi-user environments

Tags: Concurrency, Race Conditions, Data Integrity

Problem

Multiple users can update the same record simultaneously, leading to lost updates. You need to detect and handle concurrent modifications.

Solution

Optimistic locking with version field
from sqlmodel import Session, Field, SQLModel
from sqlmodel_crud_utils import get_row, update_row, write_row, transaction, AuditMixin
from typing import Optional
from datetime import datetime

class BankAccount(SQLModel, AuditMixin, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    account_number: str = Field(index=True, unique=True)
    balance: float
    version: int = Field(default=0)  # Optimistic locking version

class ConcurrentUpdateError(Exception):
    """Raised when concurrent update is detected"""
    pass

def update_balance_safe(
    session: Session,
    account_id: int,
    amount: float,
    expected_version: int,
    user: str
) -> BankAccount:
    """
    Update account balance with optimistic locking
    """
    # Get current account state
    success, account = get_row(
        id_str=account_id,
        session_inst=session,
        model=BankAccount
    )

    if not success:
        raise ValueError(f"Account {account_id} not found")

    # Check version matches (no concurrent updates)
    if account.version != expected_version:
        raise ConcurrentUpdateError(
            f"Account was modified by another user. "
            f"Expected version {expected_version}, got {account.version}"
        )

    # Calculate new balance
    new_balance = account.balance + amount
    if new_balance < 0:
        raise ValueError("Insufficient funds")

    # Update with version increment
    success, updated = update_row(
        id_str=account_id,
        data_dict={
            "balance": new_balance,
            "version": account.version + 1,
            "updated_by": user
        },
        model=BankAccount,
        session_inst=session
    )

    if not success:
        raise RuntimeError("Update failed")

    return updated

def transfer_with_optimistic_locking(
    session: Session,
    from_account_id: int,
    to_account_id: int,
    amount: float,
    user: str,
    max_retries: int = 3
) -> dict:
    """
    Transfer money between accounts with retry on concurrent updates
    """
    for attempt in range(max_retries):
        try:
            # Get current versions
            _, from_account = get_row(
                id_str=from_account_id,
                session_inst=session,
                model=BankAccount
            )
            _, to_account = get_row(
                id_str=to_account_id,
                session_inst=session,
                model=BankAccount
            )

            from_version = from_account.version
            to_version = to_account.version

            # Perform transfer
            with transaction(session) as tx:
                # Debit from source
                update_balance_safe(
                    tx, from_account_id, -amount, from_version, user
                )
                # Credit to destination
                update_balance_safe(
                    tx, to_account_id, amount, to_version, user
                )

            return {
                "success": True,
                "attempts": attempt + 1,
                "message": f"Transferred {amount} successfully"
            }

        except ConcurrentUpdateError as e:
            if attempt == max_retries - 1:
                return {
                    "success": False,
                    "attempts": attempt + 1,
                    "error": f"Max retries exceeded: {str(e)}"
                }
            # Wait briefly before retry (exponential backoff)
            import time
            time.sleep(0.1 * (2 ** attempt))
            continue

    return {"success": False, "error": "Unexpected error"}

# Usage example
with Session(engine) as session:
    # Create accounts
    acc1 = BankAccount(account_number="ACC001", balance=1000.0, created_by="system")
    acc2 = BankAccount(account_number="ACC002", balance=500.0, created_by="system")

    write_row(acc1, session)
    write_row(acc2, session)

    # Transfer with optimistic locking
    result = transfer_with_optimistic_locking(
        session,
        from_account_id=acc1.id,
        to_account_id=acc2.id,
        amount=200.0,
        user="admin"
    )

    print(f"Transfer result: {result}")

# FastAPI endpoint with optimistic locking
from fastapi import Depends, HTTPException  # app and get_session come from your application setup

@app.post("/accounts/{account_id}/deposit")
async def deposit(
    account_id: int,
    amount: float,
    version: int,
    session: Session = Depends(get_session)
):
    """Deposit money with optimistic locking"""
    try:
        updated_account = update_balance_safe(
            session,
            account_id,
            amount,
            expected_version=version,
            user="api"
        )
        return {
            "balance": updated_account.balance,
            "version": updated_account.version
        }
    except ConcurrentUpdateError as e:
        raise HTTPException(status_code=409, detail=str(e))
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))

Explanation

Optimistic locking prevents lost updates by:

  • Adding a version field that increments on each update
  • Checking the version before updating (ideally enforced atomically in the UPDATE itself; see the sketch below)
  • Raising an error if the version changed (concurrent update detected)
  • Implementing retry logic with exponential backoff
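
Note that update_balance_safe checks the version in Python between a read and a write, so two writers that read the same version can still race in that window. A common hardening step is to push the version check into the UPDATE statement itself so the database enforces it atomically. This is a plain SQLAlchemy sketch, not a sqlmodel_crud_utils call:

from sqlmodel import Session
from sqlalchemy import update

def update_balance_atomic(session: Session, account_id: int, amount: float, expected_version: int) -> bool:
    """Conditional UPDATE: only succeeds if the row still has the version we read."""
    stmt = (
        update(BankAccount)
        .where(BankAccount.id == account_id, BankAccount.version == expected_version)
        .values(balance=BankAccount.balance + amount, version=expected_version + 1)
    )
    result = session.execute(stmt)
    session.commit()
    # rowcount == 0 means another writer changed the row first; raise/retry as in the recipe
    return result.rowcount == 1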

When to Use Optimistic Locking

  • Financial transactions (accounts, payments)
  • Inventory management
  • Document editing with multiple users
  • Any scenario where concurrent updates could cause data loss

Cascading Soft Deletes

Safely delete parent records and all related children

Tags: Soft Delete, Relationships, Data Integrity

Problem

When soft-deleting a parent record, you need to also soft-delete all related child records to maintain referential integrity and prevent orphaned records.

Solution

Cascading soft deletes with relationships
from sqlmodel import Session, Field, SQLModel, Relationship
from sqlmodel_crud_utils import SoftDeleteMixin, get_row, get_rows, write_row, transaction
from typing import Optional, List

class Organization(SQLModel, SoftDeleteMixin, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    name: str
    # Relationship to teams
    teams: List["Team"] = Relationship(back_populates="organization")

class Team(SQLModel, SoftDeleteMixin, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    name: str
    organization_id: int = Field(foreign_key="organization.id")
    # Relationships
    organization: Organization = Relationship(back_populates="teams")
    members: List["TeamMember"] = Relationship(back_populates="team")

class TeamMember(SQLModel, SoftDeleteMixin, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    name: str
    email: str
    team_id: int = Field(foreign_key="team.id")
    # Relationship
    team: Team = Relationship(back_populates="members")

def soft_delete_organization_cascade(
    session: Session,
    org_id: int,
    deleted_by: str
) -> dict:
    """
    Soft delete an organization and all its teams and members
    """
    # Get organization with relationships
    success, org = get_row(
        id_str=org_id,
        session_inst=session,
        model=Organization,
        selectinload_list=["teams"]
    )

    if not success or org.is_deleted:
        return {"success": False, "error": "Organization not found"}

    stats = {
        "organizations": 0,
        "teams": 0,
        "members": 0
    }

    with transaction(session) as tx:
        # Get all teams for this organization
        _, teams = get_rows(
            session_inst=tx,
            model=Team,
            organization_id=org_id,
            is_deleted=False
        )

        # Soft delete all members in all teams
        for team in teams:
            _, members = get_rows(
                session_inst=tx,
                model=TeamMember,
                team_id=team.id,
                is_deleted=False
            )

            for member in members:
                member.soft_delete(user=deleted_by)
                tx.add(member)
                stats["members"] += 1

            # Soft delete the team
            team.soft_delete(user=deleted_by)
            tx.add(team)
            stats["teams"] += 1

        # Soft delete the organization
        org.soft_delete(user=deleted_by)
        tx.add(org)
        stats["organizations"] += 1

    return {
        "success": True,
        "deleted": stats,
        "message": f"Deleted organization and {stats['teams']} teams with {stats['members']} members"
    }

def restore_organization_cascade(
    session: Session,
    org_id: int
) -> dict:
    """
    Restore a soft-deleted organization and all its teams and members
    """
    # Get organization (including soft-deleted)
    success, org = get_row(
        id_str=org_id,
        session_inst=session,
        model=Organization
    )

    if not success or not org.is_deleted:
        return {"success": False, "error": "Organization not found or not deleted"}

    stats = {"organizations": 0, "teams": 0, "members": 0}

    with transaction(session) as tx:
        # Restore organization
        org.restore()
        tx.add(org)
        stats["organizations"] += 1

        # Get all soft-deleted teams for this organization
        _, teams = get_rows(
            session_inst=tx,
            model=Team,
            organization_id=org_id,
            is_deleted=True
        )

        for team in teams:
            # Restore team
            team.restore()
            tx.add(team)
            stats["teams"] += 1

            # Get all soft-deleted members for this team
            _, members = get_rows(
                session_inst=tx,
                model=TeamMember,
                team_id=team.id,
                is_deleted=True
            )

            for member in members:
                member.restore()
                tx.add(member)
                stats["members"] += 1

    return {
        "success": True,
        "restored": stats,
        "message": f"Restored organization with {stats['teams']} teams and {stats['members']} members"
    }

# Usage
with Session(engine) as session:
    # Create organization structure
    org = Organization(name="Acme Corp")
    write_row(org, session)

    team = Team(name="Engineering", organization_id=org.id)
    write_row(team, session)

    member = TeamMember(name="John Doe", email="john@acme.com", team_id=team.id)
    write_row(member, session)

    # Cascading soft delete
    result = soft_delete_organization_cascade(session, org.id, deleted_by="admin")
    print(result)
    # Output: {'success': True, 'deleted': {'organizations': 1, 'teams': 1, 'members': 1}, ...}

    # Restore everything
    restore_result = restore_organization_cascade(session, org.id)
    print(restore_result)

Explanation

This recipe demonstrates:

  • Recursively soft-deleting related records
  • Maintaining referential integrity with soft deletes
  • Using transactions to ensure all-or-nothing deletion
  • Tracking deletion statistics
  • Implementing restore functionality
Pro Tip: Consider adding a deleted_cascade_id field to track which parent deletion caused a child to be deleted. This makes it easier to restore related records together.
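
A sketch of that Pro Tip (the deleted_cascade_id column and helper below are hypothetical, not part of SoftDeleteMixin): stamp each child with the id of the parent whose deletion cascaded to it, then restore only those children.

# Extra column added to Team and TeamMember alongside the SoftDeleteMixin fields:
#     deleted_cascade_id: Optional[int] = Field(default=None, index=True)

def cascade_delete_member(tx, member: TeamMember, org_id: int, deleted_by: str):
    """Soft delete a child and remember which parent deletion caused it."""
    member.soft_delete(user=deleted_by)
    member.deleted_cascade_id = org_id
    tx.add(member)

# On restore, select only the children removed by this particular cascade:
#     _, members = get_rows(session_inst=tx, model=TeamMember, team_id=team.id,
#                           is_deleted=True, deleted_cascade_id=org_id)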

Performance Optimization Tips

Make your database queries faster and more efficient

Tags: Performance, Optimization, Best Practices

Problem

Your application is slow when querying large datasets or dealing with complex relationships. You need to optimize database operations without major refactoring.

Solution

Performance optimization techniques
from sqlmodel import Session, Field, SQLModel, Relationship
from sqlmodel_crud_utils import get_rows, get_row
from typing import Optional, List
from datetime import datetime

# 1. Use indexes on frequently queried fields
class User(SQLModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    email: str = Field(index=True, unique=True)  # Index for lookups
    username: str = Field(index=True)  # Index for searches
    created_at: datetime = Field(index=True)  # Index for date filtering
    status: str = Field(index=True)  # Index for status filtering
    posts: List["Post"] = Relationship(back_populates="author")

class Post(SQLModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    title: str = Field(index=True)
    author_id: int = Field(foreign_key="user.id", index=True)  # Foreign key index
    created_at: datetime = Field(index=True)
    author: User = Relationship(back_populates="posts")

# 2. Eager load relationships to avoid N+1 queries
def get_user_with_posts_efficient(session: Session, user_id: int):
    """Load user and all posts in single query"""
    success, user = get_row(
        id_str=user_id,
        session_inst=session,
        model=User,
        selectinload_list=["posts"]  # Eager load posts
    )
    return user

# BAD: N+1 query problem
def get_users_with_post_count_slow(session: Session):
    """This creates N+1 queries - AVOID THIS"""
    _, users = get_rows(session_inst=session, model=User, limit=100)
    result = []
    for user in users:
        # Each iteration queries the database again!
        _, posts = get_rows(
            session_inst=session,
            model=Post,
            author_id=user.id
        )
        result.append({
            "user": user,
            "post_count": len(posts)
        })
    return result

# GOOD: Single query with relationship loading
def get_users_with_post_count_fast(session: Session):
    """Load all data in one query"""
    _, users = get_rows(
        session_inst=session,
        model=User,
        selectinload_list=["posts"],  # Load all posts for all users
        limit=100
    )
    return [
        {
            "user": user,
            "post_count": len(user.posts)
        }
        for user in users
    ]

# 3. Limit the fields you select (if possible with custom queries)
# Note: sqlmodel_crud_utils loads full models, but you can optimize
# by only loading necessary relationships

# 4. Use pagination for large datasets
def get_all_users_paginated(session: Session, batch_size: int = 100):
    """Process large datasets in batches"""
    offset = 0
    while True:
        _, batch = get_rows(
            session_inst=session,
            model=User,
            offset=offset,
            limit=batch_size,
            sort_field="id"
        )

        if not batch:
            break

        yield batch
        offset += batch_size

# Usage (assumes an open session and a process_users handler defined elsewhere)
for user_batch in get_all_users_paginated(session):
    process_users(user_batch)

# 5. Batch operations for inserts/updates
def bulk_update_efficient(session: Session, user_ids: List[int], updates: dict):
    """More efficient than individual updates"""
    from sqlmodel_crud_utils import bulk_upsert_mappings

    mappings = [
        {"id": user_id, **updates}
        for user_id in user_ids
    ]

    bulk_upsert_mappings(
        session_inst=session,
        model=User,
        data_mappings=mappings
    )

# 6. Use connection pooling for concurrent access
from sqlmodel import create_engine

# Configure connection pool
engine = create_engine(
    "postgresql://localhost/mydb",
    pool_size=10,  # Number of persistent connections
    max_overflow=20,  # Additional connections if needed
    pool_pre_ping=True,  # Verify connections before use
    pool_recycle=3600  # Recycle connections after 1 hour
)

# 7. Add composite indexes for common filter combinations
# In your Alembic migration:
"""
def upgrade():
    op.create_index(
        'ix_user_status_created',
        'user',
        ['status', 'created_at']
    )
"""

# Then query efficiently:
def get_active_users_recent(session: Session):
    """Uses composite index on (status, created_at)"""
    from datetime import datetime, timedelta
    thirty_days_ago = datetime.utcnow() - timedelta(days=30)

    _, users = get_rows(
        session_inst=session,
        model=User,
        status="active",
        created_at__gte=thirty_days_ago,
        sort_field="created_at",
        limit=1000
    )
    return users

Key Optimization Techniques

  • Indexes: Add indexes to fields used in WHERE, ORDER BY, and JOIN clauses
  • Eager Loading: Use selectinload_list to prevent N+1 queries
  • Pagination: Always limit query results, use pagination for large datasets
  • Batch Operations: Use bulk insert/update instead of individual operations
  • Connection Pooling: Reuse database connections efficiently
  • Composite Indexes: Create indexes for common filter combinations

Performance Checklist

  • ✓ Index all foreign keys
  • ✓ Index fields used in filters and sorting
  • ✓ Use eager loading for relationships
  • ✓ Implement pagination (limit + offset or cursor)
  • ✓ Use bulk operations for multiple records
  • ✓ Configure appropriate connection pool size
  • ✓ Monitor slow queries and add indexes (see the sketch below)
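
A minimal monitoring sketch using SQLAlchemy's cursor-execute events, attached to the engine configured above (the 0.5 second threshold is arbitrary): time every statement and log the slow ones so you know where an index is missing.

import logging
import time

from sqlalchemy import event

SLOW_QUERY_SECONDS = 0.5  # arbitrary threshold
slow_log = logging.getLogger("slow_query")

@event.listens_for(engine, "before_cursor_execute")
def _start_timer(conn, cursor, statement, parameters, context, executemany):
    conn.info.setdefault("query_start", []).append(time.perf_counter())

@event.listens_for(engine, "after_cursor_execute")
def _log_if_slow(conn, cursor, statement, parameters, context, executemany):
    elapsed = time.perf_counter() - conn.info["query_start"].pop()
    if elapsed > SLOW_QUERY_SECONDS:
        slow_log.warning("%.3fs: %s", elapsed, statement)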

Testing CRUD Operations

Write effective tests for database operations

Tags: Testing, Pytest, Best Practices

Problem

You need to test CRUD operations thoroughly, with proper fixtures and cleanup, without touching your production database.

Solution

Test fixtures and patterns with pytest
# tests/conftest.py
import pytest
from sqlmodel import Session, create_engine, SQLModel
from sqlalchemy.pool import StaticPool

@pytest.fixture(name="engine")
def engine_fixture():
    """Create in-memory SQLite database for testing"""
    engine = create_engine(
        "sqlite:///:memory:",
        connect_args={"check_same_thread": False},
        poolclass=StaticPool,
    )
    SQLModel.metadata.create_all(engine)
    yield engine
    SQLModel.metadata.drop_all(engine)

@pytest.fixture(name="session")
def session_fixture(engine):
    """Create database session for tests"""
    with Session(engine) as session:
        yield session
        session.rollback()  # Clean up after each test

# tests/test_user_crud.py
from sqlmodel_crud_utils import (
    get_row, get_rows, write_row, update_row, delete_row,
    RecordNotFoundError
)
from models import User

def test_create_user(session):
    """Test creating a user"""
    user = User(
        username="testuser",
        email="test@example.com",
        created_by="test"
    )

    created_user = write_row(user, session)

    assert created_user.id is not None
    assert created_user.username == "testuser"
    assert created_user.email == "test@example.com"

def test_get_user_by_id(session):
    """Test retrieving a user by ID"""
    # Arrange
    user = write_row(
        User(username="testuser", email="test@example.com"),
        session
    )

    # Act
    success, retrieved_user = get_row(
        id_str=user.id,
        session_inst=session,
        model=User
    )

    # Assert
    assert success is True
    assert retrieved_user.id == user.id
    assert retrieved_user.username == "testuser"

def test_get_nonexistent_user(session):
    """Test retrieving a non-existent user"""
    success, user = get_row(
        id_str=999,
        session_inst=session,
        model=User
    )

    assert success is False
    assert user is None

def test_filter_users(session):
    """Test filtering users"""
    # Arrange - create multiple users
    users = [
        User(username=f"user{i}", email=f"user{i}@example.com", is_active=i % 2 == 0)
        for i in range(10)
    ]
    for user in users:
        write_row(user, session)

    # Act - filter active users
    success, active_users = get_rows(
        session_inst=session,
        model=User,
        is_active=True
    )

    # Assert
    assert success is True
    assert len(active_users) == 5
    assert all(u.is_active for u in active_users)

def test_update_user(session):
    """Test updating a user"""
    # Arrange
    user = write_row(
        User(username="oldname", email="old@example.com"),
        session
    )

    # Act
    success, updated_user = update_row(
        id_str=user.id,
        data_dict={"username": "newname", "email": "new@example.com"},
        model=User,
        session_inst=session
    )

    # Assert
    assert success is True
    assert updated_user.username == "newname"
    assert updated_user.email == "new@example.com"

def test_soft_delete_user(session):
    """Test soft deleting a user"""
    # Arrange
    user = write_row(
        User(username="testuser", email="test@example.com"),
        session
    )

    # Act
    user.soft_delete(user="test")
    session.add(user)
    session.commit()
    session.refresh(user)

    # Assert
    assert user.is_deleted is True
    assert user.deleted_at is not None
    assert user.deleted_by == "test"

def test_pagination(session):
    """Test pagination"""
    # Arrange - create 50 users
    for i in range(50):
        write_row(
            User(username=f"user{i}", email=f"user{i}@example.com"),
            session
        )

    # Act - get page 2 (users 10-19)
    success, page2_users = get_rows(
        session_inst=session,
        model=User,
        offset=10,
        limit=10,
        sort_field="id"
    )

    # Assert
    assert success is True
    assert len(page2_users) == 10

# Test factories for complex scenarios
class UserFactory:
    """Factory for creating test users"""

    @staticmethod
    def create_user(session, **kwargs):
        defaults = {
            "username": "testuser",
            "email": "test@example.com",
            "is_active": True,
            "created_by": "test"
        }
        defaults.update(kwargs)
        user = User(**defaults)
        return write_row(user, session)

    @staticmethod
    def create_batch(session, count=5, **kwargs):
        return [
            UserFactory.create_user(
                session,
                username=f"user{i}",
                email=f"user{i}@example.com",
                **kwargs
            )
            for i in range(count)
        ]

def test_with_factory(session):
    """Test using factory"""
    users = UserFactory.create_batch(session, count=10, is_active=True)

    assert len(users) == 10
    assert all(u.is_active for u in users)

Testing Best Practices

  • Use in-memory SQLite for fast, isolated tests
  • Create fixtures for database setup and teardown
  • Use factories to create test data easily
  • Follow Arrange-Act-Assert pattern
  • Test both success and failure cases
  • Use transactions to rollback after each test
Pro Tip: Run tests with pytest -v --cov=sqlmodel_crud_utils to see test coverage and identify untested code paths.
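
If your endpoints take the session via Depends(get_session), as in the FastAPI examples earlier, the same in-memory fixtures can drive endpoint tests by overriding that dependency. A minimal sketch (the import locations of app and get_session are assumptions about your project layout):

# tests/test_api.py
import pytest
from fastapi.testclient import TestClient

from main import app, get_session  # assumed locations of your app and session dependency

@pytest.fixture(name="client")
def client_fixture(session):
    """Route every request through the test session instead of the real database."""
    app.dependency_overrides[get_session] = lambda: session
    yield TestClient(app)
    app.dependency_overrides.clear()

def test_list_posts(client):
    response = client.get("/posts")
    assert response.status_code == 200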