REST API Backend with FastAPI

Building a production-ready REST API with clean CRUD operations

The Challenge

You're building a FastAPI application that needs to handle CRUD operations for multiple resources. You want clean, maintainable code with proper error handling, pagination, filtering, and transaction support without writing repetitive boilerplate for each endpoint.

The Solution

Use sqlmodel-crud-utils to handle all database operations with minimal code:

app/models.py - Define your models with audit trails
from sqlmodel import SQLModel, Field
from sqlmodel_crud_utils import AuditMixin, SoftDeleteMixin
from typing import Optional

class User(SQLModel, AuditMixin, SoftDeleteMixin, table=True):
    """Application user account with audit trail and soft-delete support."""
    id: Optional[int] = Field(default=None, primary_key=True)  # assigned by the DB on insert
    username: str = Field(index=True, unique=True)  # unique login handle
    email: str = Field(index=True)
    full_name: str
    is_active: bool = Field(default=True)  # "enabled" flag, distinct from soft delete
    # AuditMixin adds: created_at, updated_at, created_by, updated_by
    # SoftDeleteMixin adds: is_deleted, deleted_at, deleted_by

class Post(SQLModel, AuditMixin, SoftDeleteMixin, table=True):
    """Post authored by a User; audited and soft-deletable like User."""
    id: Optional[int] = Field(default=None, primary_key=True)
    title: str = Field(index=True)
    content: str
    author_id: int = Field(foreign_key="user.id")  # FK to User.id
    published: bool = Field(default=False)  # new posts start as drafts
    views: int = Field(default=0)  # view counter, starts at zero
app/api/users.py - Clean API endpoints
from fastapi import APIRouter, HTTPException, Depends, Query
from sqlmodel import Session
from sqlmodel_crud_utils import (
    get_rows, get_row, write_row, update_row, delete_row,
    RecordNotFoundError, ValidationError, transaction
)
from app.database import get_session
from app.models import User
from typing import Optional

router = APIRouter()

@router.get("/users")
async def list_users(
    skip: int = Query(0, ge=0),
    limit: int = Query(100, le=100),
    is_active: Optional[bool] = None,
    search: Optional[str] = None,
    session: Session = Depends(get_session)
):
    """List users with pagination and filtering.

    Query params:
        skip: offset into the result set (>= 0).
        limit: page size (capped at 100).
        is_active: optional filter on the active flag.
        search: optional substring match on username.

    Returns a dict with the current page of users; note that "total"
    is the size of the returned page, not the overall row count.
    """
    # Soft-deleted rows are always excluded from the listing.
    filters = {"is_deleted": False}
    if is_active is not None:
        filters["is_active"] = is_active
    if search:
        filters["username__like"] = f"%{search}%"

    success, users = get_rows(
        session_inst=session,
        model=User,
        offset=skip,
        limit=limit,
        sort_field="created_at",
        sort_desc=True,
        **filters
    )

    # Bug fix: the original ignored `success`, so a failed query could
    # crash on len() / serialization. Normalize to an empty page instead.
    if not success:
        users = []

    return {
        "total": len(users),
        "users": users
    }

@router.get("/users/{user_id}")
async def get_user(
    user_id: int,
    session: Session = Depends(get_session)
):
    """Fetch one user by primary key; responds 404 for missing or soft-deleted rows."""
    try:
        found, record = get_row(
            id_str=user_id,
            session_inst=session,
            model=User
        )
        # A soft-deleted row is treated exactly like a missing one.
        if not found or record.is_deleted:
            raise RecordNotFoundError(model=User, id_value=user_id)
        return record
    except RecordNotFoundError as exc:
        raise HTTPException(status_code=404, detail=str(exc))

@router.post("/users")
async def create_user(
    user_data: dict,
    session: Session = Depends(get_session)
):
    """Create a new user.

    The audit field ``created_by`` is stamped server-side. Bug fix: the
    original ``User(**user_data, created_by="api")`` raised a duplicate
    keyword TypeError whenever the client supplied ``created_by``; merging
    dicts instead silently overrides the client value.

    Responds 422 when the payload fails model validation.
    """
    try:
        with transaction(session) as tx:
            user = User(**{**user_data, "created_by": "api"})
            new_user = write_row(user, tx)
            return new_user
    except ValidationError as e:
        raise HTTPException(status_code=422, detail=str(e))

@router.patch("/users/{user_id}")
async def update_user(
    user_id: int,
    updates: dict,
    session: Session = Depends(get_session)
):
    """Apply a partial update to a user; 404 when missing, 422 on bad data."""
    try:
        updates["updated_by"] = "api"  # stamp the audit trail
        ok, changed = update_row(
            id_str=user_id,
            data_dict=updates,
            model=User,
            session_inst=session
        )
        if not ok:
            raise RecordNotFoundError(model=User, id_value=user_id)
        return changed
    except RecordNotFoundError as exc:
        raise HTTPException(status_code=404, detail=str(exc))
    except ValidationError as exc:
        raise HTTPException(status_code=422, detail=str(exc))

@router.delete("/users/{user_id}")
async def delete_user(
    user_id: int,
    hard_delete: bool = Query(False),
    session: Session = Depends(get_session)
):
    """Remove a user.

    Soft-deletes by default; pass ``hard_delete=true`` to physically remove
    the row. Responds 404 when the user does not exist.
    """
    try:
        found, record = get_row(id_str=user_id, session_inst=session, model=User)
        if not found:
            raise RecordNotFoundError(model=User, id_value=user_id)

        if hard_delete:
            # Physical removal — bypasses the soft-delete machinery entirely.
            delete_row(id_str=user_id, session_inst=session, model=User)
        else:
            # Mark as deleted and persist; the row remains queryable by admins.
            record.soft_delete(user="api")
            session.add(record)
            session.commit()

        return {"message": "User deleted successfully"}
    except RecordNotFoundError as exc:
        raise HTTPException(status_code=404, detail=str(exc))

Benefits

  • Reduced boilerplate code - endpoints are clean and focused on business logic
  • Built-in pagination and filtering with minimal configuration
  • Automatic audit trails track who created/modified records
  • Soft deletes prevent data loss while maintaining clean queries
  • Transaction support ensures data consistency
  • Proper error handling with meaningful exceptions
  • Type-safe operations with full IDE support

Microservices Data Layer

Building a consistent data access pattern across multiple services

The Challenge

You have multiple microservices that need to interact with their own databases. You want a consistent data access pattern across all services, with support for both synchronous and asynchronous operations, while maintaining service independence.

The Solution

Create a shared data layer package using sqlmodel-crud-utils:

shared_data/repository.py - Generic repository pattern
from typing import Generic, TypeVar, Type, Optional, List, Dict, Any
from sqlmodel import Session, SQLModel
from sqlmodel.ext.asyncio.session import AsyncSession
from sqlmodel_crud_utils import (
    get_row, get_rows, write_row, update_row, delete_row,
    a_get_row, a_get_rows, a_write_row, a_update_row, a_delete_row,
    transaction, a_transaction,
    RecordNotFoundError
)

T = TypeVar("T", bound=SQLModel)

class Repository(Generic[T]):
    """Generic synchronous repository wrapping the sqlmodel-crud-utils helpers."""

    def __init__(self, model: Type[T]):
        self.model = model

    def get_by_id(self, session: Session, id: int) -> Optional[T]:
        """Return the record with the given primary key, or None when absent."""
        found, row = get_row(id_str=id, session_inst=session, model=self.model)
        if not found:
            return None
        return row

    def get_all(
        self,
        session: Session,
        offset: int = 0,
        limit: int = 100,
        **filters
    ) -> List[T]:
        """Return one page of records matching the given filters."""
        found, rows = get_rows(
            session_inst=session,
            model=self.model,
            offset=offset,
            limit=limit,
            **filters
        )
        if not found:
            return []
        return rows

    def create(self, session: Session, data: T) -> T:
        """Persist a new record inside a transaction and return it."""
        with transaction(session) as tx:
            return write_row(data, tx)

    def update(self, session: Session, id: int, updates: Dict[str, Any]) -> Optional[T]:
        """Apply a partial update; None when the record is missing."""
        found, row = update_row(
            id_str=id,
            data_dict=updates,
            model=self.model,
            session_inst=session
        )
        return row if found else None

    def delete(self, session: Session, id: int) -> bool:
        """Delete the record and report whether the operation succeeded."""
        return delete_row(id_str=id, session_inst=session, model=self.model)

class AsyncRepository(Generic[T]):
    """Async counterpart of Repository, using the a_* helper functions."""

    def __init__(self, model: Type[T]):
        self.model = model

    async def get_by_id(self, session: AsyncSession, id: int) -> Optional[T]:
        """Return the record with the given primary key, or None when absent."""
        success, record = await a_get_row(
            id_str=id,
            session_inst=session,
            model=self.model
        )
        return record if success else None

    async def get_all(
        self,
        session: AsyncSession,
        offset: int = 0,
        limit: int = 100,
        **filters
    ) -> List[T]:
        """Return one page of records matching the given filters."""
        success, records = await a_get_rows(
            session_inst=session,
            model=self.model,
            offset=offset,
            limit=limit,
            **filters
        )
        return records if success else []

    async def create(self, session: AsyncSession, data: T) -> T:
        """Persist a new record inside an async transaction and return it."""
        async with a_transaction(session) as tx:
            return await a_write_row(data, tx)

    async def update(
        self,
        session: AsyncSession,
        id: int,
        updates: Dict[str, Any]
    ) -> Optional[T]:
        """Apply a partial update; None when the record is missing."""
        success, updated = await a_update_row(
            id_str=id,
            data_dict=updates,
            model=self.model,
            session_inst=session
        )
        return updated if success else None

    async def delete(self, session: AsyncSession, id: int) -> bool:
        """Delete the record and report success.

        Added for parity with the synchronous Repository, which exposes
        delete(); a_delete_row is already part of this module's imports.
        """
        return await a_delete_row(
            id_str=id,
            session_inst=session,
            model=self.model
        )
order_service/repositories.py - Service-specific repositories
from shared_data.repository import Repository, AsyncRepository
from .models import Order, OrderItem

class OrderRepository(Repository[Order]):
    """Repository for orders, extended with domain-specific queries."""

    def get_by_customer(self, session, customer_id: int, limit: int = 50):
        """Return the most recent orders for one customer, newest first."""
        return self.get_all(
            session,
            limit=limit,
            customer_id=customer_id,
            sort_field="created_at",
            sort_desc=True,
        )

    def get_pending_orders(self, session):
        """Return every non-deleted order still in the 'pending' state."""
        return self.get_all(session, status="pending", is_deleted=False)

# Use in service: one repository per aggregate. The plain generic
# Repository suffices for entities without custom query needs.
order_repo = OrderRepository(Order)
item_repo = Repository(OrderItem)

Benefits

  • Consistent data access pattern across all microservices
  • Both sync and async support for different service requirements
  • Easy to extend with service-specific methods
  • Reduces duplication and maintenance burden
  • Type-safe generic repositories
  • Transaction support built-in
  • Simple to test and mock

Data Migration and ETL Scripts

Efficiently migrating and transforming large datasets

The Challenge

You need to migrate data from a legacy system to your new SQLModel-based application. The migration involves transforming data, handling errors gracefully, and processing millions of records efficiently with proper logging and rollback capabilities.

The Solution

Use bulk operations and transactions for efficient, safe migrations:

migrations/migrate_users.py
from sqlmodel import Session, create_engine
from sqlmodel_crud_utils import (
    insert_data_rows, bulk_upsert_mappings,
    transaction, BulkOperationError
)
from new_system.models import User
from legacy_system import get_legacy_users
import logging

# Module-level logger; INFO level makes per-batch progress visible.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def transform_user(legacy_user: dict) -> User:
    """Map one legacy user record onto the new User model.

    Expects the legacy keys: user_id, login, email_address, first_name,
    last_name, status.
    """
    first = legacy_user["first_name"]
    last = legacy_user["last_name"]
    return User(
        id=legacy_user["user_id"],
        username=legacy_user["login"],
        email=legacy_user["email_address"],
        full_name=f"{first} {last}",
        is_active=legacy_user["status"] == "active",
        created_by="migration_script",
    )

def migrate_users_batch(engine, batch_size: int = 1000):
    """Migrate legacy users into the new system in fixed-size batches.

    Args:
        engine: SQLModel/SQLAlchemy engine for the target database.
        batch_size: number of legacy rows fetched and inserted per batch.

    Returns:
        (total_migrated, errors) where ``errors`` lists every failed batch
        (offset, message, failed count) so a re-run can target them.
    """
    offset = 0
    total_migrated = 0
    errors = []

    while True:
        # Get batch from legacy system
        legacy_users = get_legacy_users(offset=offset, limit=batch_size)
        if not legacy_users:
            break  # legacy source exhausted

        logger.info(f"Processing batch: {offset} to {offset + len(legacy_users)}")

        # Transform data
        new_users = [transform_user(u) for u in legacy_users]

        with Session(engine) as session:
            try:
                with transaction(session) as tx:
                    # insert_data_rows handles the fallback to individual
                    # inserts internally; its boolean result says whether
                    # the batch ultimately landed.
                    success = insert_data_rows(new_users, tx)
                    if success:
                        total_migrated += len(new_users)
                        logger.info(f"Migrated {len(new_users)} users successfully")
                    else:
                        # Bug fix: a False result was previously only logged,
                        # so failed batches never reached the error report
                        # and could not be retried.
                        logger.warning(f"Batch insert at offset {offset} reported failure")
                        errors.append({
                            "offset": offset,
                            "error": "insert_data_rows returned False",
                            "failed_count": len(new_users)
                        })
            except BulkOperationError as e:
                logger.error(f"Error in batch {offset}: {e}")
                logger.error(f"Failed records: {e.failed_count}")
                errors.append({
                    "offset": offset,
                    "error": str(e),
                    "failed_count": e.failed_count
                })

        offset += batch_size

    logger.info(f"Migration complete. Total migrated: {total_migrated}")
    if errors:
        logger.error(f"Encountered {len(errors)} batch errors")
        for error in errors:
            logger.error(f"  Batch at offset {error['offset']}: {error['error']}")

    return total_migrated, errors

def upsert_users_from_csv(engine, csv_path: str):
    """Idempotently load users from CSV (insert new rows, update existing).

    Expected columns: id, username, email, full_name, is_active.
    """
    import csv

    # Bug fix: the csv module requires newline="" on the file object to
    # handle quoted embedded newlines correctly, and an explicit encoding
    # avoids platform-dependent defaults.
    with open(csv_path, "r", newline="", encoding="utf-8") as f:
        reader = csv.DictReader(f)
        user_mappings = [
            {
                "id": int(row["id"]),
                "username": row["username"],
                "email": row["email"],
                "full_name": row["full_name"],
                "is_active": row["is_active"].lower() == "true"
            }
            for row in reader
        ]

    with Session(engine) as session:
        try:
            # Bulk upsert - insert new records, update existing ones
            bulk_upsert_mappings(
                session_inst=session,
                model=User,
                data_mappings=user_mappings
            )
            logger.info(f"Upserted {len(user_mappings)} users from CSV")
        except Exception as e:
            logger.error(f"Upsert failed: {e}")
            raise

if __name__ == "__main__":
    # Script entry point: point the engine at the target database and run
    # the batched migration (CSV upsert is invoked separately as needed).
    engine = create_engine("postgresql://localhost/new_system")
    migrate_users_batch(engine)

Benefits

  • Efficient bulk operations for large datasets
  • Automatic fallback to individual inserts on bulk failures
  • Transaction support ensures all-or-nothing batch processing
  • Detailed error reporting with failed record counts
  • Upsert functionality for idempotent migrations
  • Easy to resume failed migrations
  • Clean separation of transformation and persistence logic

Admin Dashboard with Complex Queries

Building powerful admin interfaces with advanced filtering and search

The Challenge

You're building an admin dashboard that needs to support complex filtering, sorting, pagination, and search across multiple fields. Admins need to perform bulk operations, view audit trails, and manage soft-deleted records.

The Solution

Leverage flexible filtering and audit mixins for powerful admin features:

admin/user_management.py
from sqlmodel import Session
# Bug fix: get_row was missing here but is used by
# UserManagementService.get_user_audit_trail and restore_deleted_user,
# which raised NameError at call time.
from sqlmodel_crud_utils import get_row, get_rows, update_row, transaction
from models import User
from typing import Optional, List

class UserManagementService:
    """Service layer for admin-side user management."""

    def search_users(
        self,
        session: Session,
        search_term: Optional[str] = None,
        is_active: Optional[bool] = None,
        created_after: Optional[str] = None,
        created_by: Optional[str] = None,
        include_deleted: bool = False,
        page: int = 1,
        page_size: int = 50,
        sort_by: str = "created_at",
        sort_desc: bool = True
    ) -> tuple[List[User], int]:
        """Advanced user search with multiple AND-combined filters.

        Returns (users, count) where ``count`` is the number of rows in
        the returned page, not the overall match count.
        """
        # Build dynamic filters
        filters = {}

        if search_term:
            # Note: For OR conditions across several fields, you'd use
            # get_result_from_query; this example shows AND filtering.
            filters["username__like"] = f"%{search_term}%"

        if is_active is not None:
            filters["is_active"] = is_active

        if created_after:
            filters["created_at__gte"] = created_after

        if created_by:
            filters["created_by"] = created_by

        if not include_deleted:
            filters["is_deleted"] = False

        # Calculate offset
        offset = (page - 1) * page_size

        # Execute query
        success, users = get_rows(
            session_inst=session,
            model=User,
            offset=offset,
            limit=page_size,
            sort_field=sort_by,
            sort_desc=sort_desc,
            **filters
        )

        # Bug fix: the original `return users if success else [], len(users)`
        # parsed as a 2-tuple whose first element was the conditional, and
        # it called len() on the raw result even when the query had failed.
        if not success:
            users = []
        return users, len(users)

    def bulk_activate_users(
        self,
        session: Session,
        user_ids: List[int],
        admin_user: str
    ) -> dict:
        """Activate many users inside one transaction.

        Returns {"success": [ids...], "failed": [ids...]}.
        """
        results = {"success": [], "failed": []}

        with transaction(session) as tx:
            for user_id in user_ids:
                success, user = update_row(
                    id_str=user_id,
                    data_dict={
                        "is_active": True,
                        "updated_by": admin_user
                    },
                    model=User,
                    session_inst=tx
                )

                if success:
                    results["success"].append(user_id)
                else:
                    results["failed"].append(user_id)

        return results

    def get_user_audit_trail(
        self,
        session: Session,
        user_id: int
    ) -> dict:
        """Return the audit fields for one user, or None when not found."""
        success, user = get_row(
            id_str=user_id,
            session_inst=session,
            model=User
        )

        if not success:
            return None

        return {
            "user_id": user.id,
            "created_at": user.created_at,
            "created_by": user.created_by,
            "updated_at": user.updated_at,
            "updated_by": user.updated_by,
            "is_deleted": user.is_deleted,
            "deleted_at": user.deleted_at,
            "deleted_by": user.deleted_by
        }

    def restore_deleted_user(
        self,
        session: Session,
        user_id: int,
        admin_user: str
    ) -> bool:
        """Undo a soft delete; False when the user is missing or not deleted."""
        success, user = get_row(
            id_str=user_id,
            session_inst=session,
            model=User
        )

        if not success or not user.is_deleted:
            return False

        user.restore()
        user.updated_by = admin_user
        session.add(user)
        session.commit()

        return True

Benefits

  • Flexible filtering with comparison operators (__like, __gte, etc.)
  • Built-in pagination for large datasets
  • Audit trail tracking for compliance and debugging
  • Soft delete management with restore capabilities
  • Bulk operations with transaction safety
  • Clean service layer separation
  • Easy to extend with additional filters

Multi-Tenant SaaS Applications

Building secure, isolated data access for multiple tenants

The Challenge

You're building a SaaS application where each customer (tenant) needs isolated data. Every database query must be automatically scoped to the current tenant, with no risk of data leakage between tenants. You also need per-tenant analytics and reporting.

The Solution

Create a tenant-aware data access layer:

app/tenant_repository.py
from sqlmodel import Session, SQLModel
from sqlmodel_crud_utils import (
    get_rows, get_row, write_row, update_row,
    transaction, RecordNotFoundError
)
from typing import TypeVar, Type, Generic, Optional, List, Dict, Any
from contextlib import contextmanager

T = TypeVar("T", bound=SQLModel)

class TenantRepository(Generic[T]):
    """Repository that automatically scopes all queries to one tenant.

    Every read injects a tenant_id filter and every write stamps tenant_id,
    so callers cannot accidentally touch another tenant's rows.
    """

    def __init__(self, model: Type[T], tenant_id: int):
        self.model = model
        self.tenant_id = tenant_id

    def _ensure_tenant_field(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Return a copy of ``data`` with tenant_id forced to this tenant.

        Bug fix: the original mutated the caller's dict in place.
        """
        return {**data, "tenant_id": self.tenant_id}

    def get_all(
        self,
        session: Session,
        offset: int = 0,
        limit: int = 100,
        **filters
    ) -> List[T]:
        """Get all non-deleted records for this tenant."""
        # Injected unconditionally so caller-supplied values cannot widen
        # the query to other tenants.
        filters["tenant_id"] = self.tenant_id
        filters["is_deleted"] = False

        success, records = get_rows(
            session_inst=session,
            model=self.model,
            offset=offset,
            limit=limit,
            **filters
        )
        return records if success else []

    def get_by_id(self, session: Session, id: int) -> Optional[T]:
        """Get a record by ID; None unless it belongs to this tenant."""
        success, record = get_row(
            id_str=id,
            session_inst=session,
            model=self.model
        )

        # Ownership check: a row from another tenant is treated as missing.
        if success and hasattr(record, "tenant_id") and record.tenant_id == self.tenant_id:
            return record
        return None

    def create(self, session: Session, data: Dict[str, Any]) -> T:
        """Create a record stamped with this tenant's ID."""
        data = self._ensure_tenant_field(data)

        with transaction(session) as tx:
            instance = self.model(**data)
            return write_row(instance, tx)

    def update(
        self,
        session: Session,
        id: int,
        updates: Dict[str, Any]
    ) -> Optional[T]:
        """Update a record after verifying tenant ownership.

        Returns None when the record is missing or owned by another tenant.
        """
        # First verify tenant ownership
        record = self.get_by_id(session, id)
        if not record:
            return None

        # Strip tenant_id via a copy. Bug fix: the original popped it from
        # the caller's dict, a surprising side effect on the caller.
        updates = {k: v for k, v in updates.items() if k != "tenant_id"}

        success, updated = update_row(
            id_str=id,
            data_dict=updates,
            model=self.model,
            session_inst=session
        )
        return updated if success else None

    def get_tenant_stats(self, session: Session) -> Dict[str, Any]:
        """Summary counts for this tenant.

        NOTE: loads up to 10000 rows into memory, so totals above that cap
        are under-reported — fine for dashboards, not exact accounting.
        """
        all_records = self.get_all(session, limit=10000)

        return {
            "tenant_id": self.tenant_id,
            "total_records": len(all_records),
            "active_records": len([r for r in all_records if getattr(r, "is_active", True)])
        }

# Usage in FastAPI
from fastapi import Depends, Header, HTTPException

def get_current_tenant_id(x_tenant_id: str = Header(...)) -> int:
    """Resolve the tenant from the X-Tenant-ID request header.

    Raises HTTP 400 when the header value is not an integer.
    """
    try:
        tenant = int(x_tenant_id)
    except ValueError:
        raise HTTPException(status_code=400, detail="Invalid tenant ID")
    return tenant

@app.get("/projects")
async def list_projects(
    tenant_id: int = Depends(get_current_tenant_id),
    session: Session = Depends(get_session)
):
    """Return the active projects belonging to the caller's tenant."""
    tenant_repo = TenantRepository(Project, tenant_id)
    return {"projects": tenant_repo.get_all(session, is_active=True)}

@app.post("/projects")
async def create_project(
    project_data: dict,
    tenant_id: int = Depends(get_current_tenant_id),
    session: Session = Depends(get_session)
):
    """Create a project owned by the caller's tenant."""
    tenant_repo = TenantRepository(Project, tenant_id)
    return tenant_repo.create(session, project_data)

Benefits

  • Automatic tenant isolation on all queries
  • No risk of cross-tenant data leakage
  • Clean, reusable pattern across all entities
  • Tenant-scoped statistics and analytics
  • Prevents accidental tenant_id modifications
  • Easy to audit and test tenant isolation
  • Works seamlessly with existing CRUD utilities

Event Sourcing and Audit Logging

Tracking all changes with complete audit trails

The Challenge

You need to maintain a complete history of all changes to critical entities for compliance, debugging, and audit purposes. Every create, update, and delete operation must be logged with timestamp, user, and change details.

The Solution

Combine AuditMixin with event logging:

app/audited_repository.py
from sqlmodel import Session, SQLModel, Field
# Bug fix: get_row and get_rows were missing here but are used by
# AuditedRepository.update and get_audit_trail, raising NameError at
# call time.
from sqlmodel_crud_utils import (
    get_row, get_rows, write_row, update_row, AuditMixin,
    transaction
)
from datetime import datetime
from typing import Optional, Dict, Any
import json

class AuditLog(SQLModel, table=True):
    """Append-only record of every audited CREATE/UPDATE/DELETE event."""
    id: Optional[int] = Field(default=None, primary_key=True)
    entity_type: str = Field(index=True)  # model class name, e.g. "User"
    entity_id: int = Field(index=True)  # primary key of the affected row
    operation: str  # CREATE, UPDATE, DELETE
    changes: str  # JSON of changes
    performed_by: str = Field(index=True)
    # NOTE(review): datetime.utcnow is naive and deprecated since Python
    # 3.12; consider datetime.now(timezone.utc) — confirm whether this
    # column is meant to store tz-aware timestamps before changing it.
    performed_at: datetime = Field(default_factory=datetime.utcnow)
    ip_address: Optional[str] = None  # optional source address of the request

class AuditedRepository:
    """Repository that writes an AuditLog row alongside every change."""

    def __init__(self, model: type, user: str, ip_address: Optional[str] = None):
        self.model = model
        self.user = user  # recorded as performed_by on every event
        self.ip_address = ip_address

    def _log_event(
        self,
        session: Session,
        entity_id: int,
        operation: str,
        changes: Dict[str, Any]
    ):
        """Persist one audit event in the same session/transaction."""
        log_entry = AuditLog(
            entity_type=self.model.__name__,
            entity_id=entity_id,
            operation=operation,
            changes=json.dumps(changes),
            performed_by=self.user,
            ip_address=self.ip_address
        )
        write_row(log_entry, session)

    def create(self, session: Session, data: Dict[str, Any]) -> Any:
        """Create an entity and its CREATE audit event atomically."""
        # Bug fix: copy before adding the audit field so the caller's
        # dict is not mutated.
        data = {**data, "created_by": self.user}

        with transaction(session) as tx:
            # Create entity
            instance = self.model(**data)
            new_record = write_row(instance, tx)

            # Log creation in the same transaction
            self._log_event(
                tx,
                new_record.id,
                "CREATE",
                {"new_values": data}
            )

            return new_record

    def update(
        self,
        session: Session,
        entity_id: int,
        updates: Dict[str, Any]
    ) -> Optional[Any]:
        """Update an entity, logging old and new values; None when missing."""
        # Capture the current state before mutating anything
        success, old_record = get_row(
            id_str=entity_id,
            session_inst=session,
            model=self.model
        )
        if not success:
            return None

        old_values = {
            k: getattr(old_record, k)
            for k in updates.keys()
            if hasattr(old_record, k)
        }

        # Bug fix: copy before adding the audit field so the caller's
        # dict is not mutated.
        updates = {**updates, "updated_by": self.user}

        with transaction(session) as tx:
            success, updated_record = update_row(
                id_str=entity_id,
                data_dict=updates,
                model=self.model,
                session_inst=tx
            )

            if success:
                # Log the update with both states for later diffing
                self._log_event(
                    tx,
                    entity_id,
                    "UPDATE",
                    {
                        "old_values": old_values,
                        "new_values": updates
                    }
                )

            return updated_record if success else None

    def get_audit_trail(
        self,
        session: Session,
        entity_id: int
    ) -> list[AuditLog]:
        """Return all audit events for one entity, newest first."""
        success, logs = get_rows(
            session_inst=session,
            model=AuditLog,
            entity_type=self.model.__name__,
            entity_id=entity_id,
            sort_field="performed_at",
            sort_desc=True
        )
        return logs if success else []

# Usage
# NOTE(review): this snippet assumes `engine`, `Session`, and the `User`
# model are already in scope — confirm against the application setup.
repo = AuditedRepository(User, user="admin@example.com", ip_address="192.168.1.1")

with Session(engine) as session:
    # Create with automatic audit logging
    user = repo.create(session, {
        "username": "jdoe",
        "email": "jdoe@example.com"
    })

    # Update with automatic audit logging
    repo.update(session, user.id, {"email": "john.doe@example.com"})

    # View audit trail
    trail = repo.get_audit_trail(session, user.id)
    for event in trail:
        print(f"{event.performed_at}: {event.operation} by {event.performed_by}")
        print(f"  Changes: {event.changes}")

Benefits

  • Complete audit trail for compliance
  • Track who changed what and when
  • Store old and new values for every change
  • Easy to query audit logs by entity or user
  • Automatic timestamp and user tracking with AuditMixin
  • Transaction safety ensures audit logs are always in sync
  • Useful for debugging and rollback scenarios
Want more examples? Check out the Recipes page for practical code patterns and the API Reference for detailed function documentation.