Copy-paste ready code patterns for common database operations
Implement advanced search with multiple filter conditions
You need to implement a search feature that supports multiple filters, date ranges, text search, and sorting.
from sqlmodel import Session, Field, SQLModel
from sqlmodel_crud_utils import get_rows
from typing import Optional
from datetime import datetime
class Product(SQLModel, table=True):
id: Optional[int] = Field(default=None, primary_key=True)
name: str = Field(index=True)
description: str
price: float
category: str = Field(index=True)
stock: int
created_at: datetime = Field(default_factory=datetime.utcnow, index=True)
is_active: bool = Field(default=True)
def search_products(
session: Session,
search_term: Optional[str] = None,
category: Optional[str] = None,
min_price: Optional[float] = None,
max_price: Optional[float] = None,
in_stock_only: bool = False,
created_after: Optional[datetime] = None,
created_before: Optional[datetime] = None,
page: int = 1,
page_size: int = 20,
sort_by: str = "created_at",
sort_desc: bool = True
):
"""
Advanced product search with multiple filter options
"""
# Build filters dynamically
filters = {"is_active": True}
# Text search (partial match)
if search_term:
filters["name__like"] = f"%{search_term}%"
# Category filter (exact match)
if category:
filters["category"] = category
# Price range filters
if min_price is not None:
filters["price__gte"] = min_price
if max_price is not None:
filters["price__lte"] = max_price
# Stock filter
if in_stock_only:
filters["stock__gt"] = 0
# Date range filters
if created_after:
filters["created_at__gte"] = created_after
if created_before:
filters["created_at__lte"] = created_before
# Calculate pagination
offset = (page - 1) * page_size
# Execute query
success, products = get_rows(
session_inst=session,
model=Product,
offset=offset,
limit=page_size,
sort_field=sort_by,
sort_desc=sort_desc,
**filters
)
return {
"products": products if success else [],
"page": page,
"page_size": page_size,
"total": len(products) if success else 0
}
# Usage examples
with Session(engine) as session:
# Search for electronics under $100
results = search_products(
session,
category="electronics",
max_price=100,
in_stock_only=True
)
# Search for products created this month
from datetime import datetime, timedelta
thirty_days_ago = datetime.utcnow() - timedelta(days=30)
recent = search_products(
session,
created_after=thirty_days_ago,
sort_by="created_at",
sort_desc=True
)
# Text search with pagination
page_2 = search_products(
session,
search_term="laptop",
page=2,
page_size=10
)
This recipe demonstrates how to:
__like, __gte, __lte, __gt) for flexible filtering__like for partial text matching with wildcardsImplement efficient pagination for real-time data
Offset-based pagination becomes slow with large datasets and can miss records when data is inserted during pagination. You need cursor-based pagination for better performance and consistency.
from sqlmodel import Session, Field, SQLModel
from sqlmodel_crud_utils import get_rows
from typing import Optional, List
from datetime import datetime
class Post(SQLModel, table=True):
id: Optional[int] = Field(default=None, primary_key=True)
title: str
content: str
created_at: datetime = Field(default_factory=datetime.utcnow, index=True)
is_published: bool = Field(default=True)
def get_posts_cursor_paginated(
session: Session,
cursor: Optional[int] = None,
limit: int = 20,
is_published: bool = True
) -> dict:
"""
Get posts using cursor-based pagination
Cursor is the ID of the last post from the previous page
"""
filters = {"is_published": is_published}
# If cursor provided, get posts with ID greater than cursor
if cursor:
filters["id__gt"] = cursor
success, posts = get_rows(
session_inst=session,
model=Post,
limit=limit,
sort_field="id",
sort_desc=False, # Important: consistent sort order
**filters
)
posts = posts if success else []
# Generate next cursor
next_cursor = posts[-1].id if posts else None
return {
"posts": posts,
"next_cursor": next_cursor,
"has_more": len(posts) == limit
}
def get_posts_time_based_cursor(
session: Session,
cursor_time: Optional[datetime] = None,
cursor_id: Optional[int] = None,
limit: int = 20
) -> dict:
"""
Time-based cursor pagination (for chronological feeds)
Uses created_at + id for stable pagination
"""
filters = {"is_published": True}
if cursor_time and cursor_id:
# Get posts created before cursor_time
# OR posts created at cursor_time but with id > cursor_id
# This requires custom query - here's a simplified version
filters["created_at__lte"] = cursor_time
filters["id__gt"] = cursor_id
success, posts = get_rows(
session_inst=session,
model=Post,
limit=limit,
sort_field="created_at",
sort_desc=True,
**filters
)
posts = posts if success else []
# Generate next cursor
if posts:
last_post = posts[-1]
next_cursor = {
"time": last_post.created_at.isoformat(),
"id": last_post.id
}
else:
next_cursor = None
return {
"posts": posts,
"next_cursor": next_cursor,
"has_more": len(posts) == limit
}
# Usage in FastAPI
from fastapi import APIRouter, Query
router = APIRouter()
@router.get("/posts")
async def list_posts(
cursor: Optional[int] = Query(None),
limit: int = Query(20, le=100),
session: Session = Depends(get_session)
):
"""Get posts with cursor pagination"""
return get_posts_cursor_paginated(session, cursor, limit)
# Client usage example
def fetch_all_posts(session: Session):
"""Fetch all posts using cursor pagination"""
all_posts = []
cursor = None
while True:
result = get_posts_cursor_paginated(session, cursor=cursor, limit=100)
all_posts.extend(result["posts"])
if not result["has_more"]:
break
cursor = result["next_cursor"]
return all_posts
Cursor-based pagination uses a unique identifier (like ID or timestamp) as a marker instead of offset:
id__lt instead of id__gt and sort descending.
Process large datasets efficiently with graceful error handling
You need to process thousands of records, but don't want a single failure to stop the entire batch. You need proper error tracking and the ability to retry failed records.
from sqlmodel import Session, Field, SQLModel
from sqlmodel_crud_utils import (
insert_data_rows, get_rows, update_row,
transaction, BulkOperationError
)
from typing import List, Dict, Any, Optional
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class ImportRecord(SQLModel, table=True):
id: Optional[int] = Field(default=None, primary_key=True)
external_id: str = Field(index=True, unique=True)
data: str
status: str = Field(default="pending") # pending, processing, success, failed
error_message: Optional[str] = None
retry_count: int = Field(default=0)
def process_batch(
session: Session,
records: List[ImportRecord],
batch_size: int = 100
) -> Dict[str, Any]:
"""
Process records in batches with error handling
"""
total_success = 0
total_failed = 0
failed_records = []
# Process in batches
for i in range(0, len(records), batch_size):
batch = records[i:i + batch_size]
logger.info(f"Processing batch {i // batch_size + 1}: {len(batch)} records")
try:
with transaction(session) as tx:
# Try bulk insert first
success = insert_data_rows(batch, tx)
if success:
total_success += len(batch)
logger.info(f"Batch inserted successfully: {len(batch)} records")
else:
# Bulk insert failed, try individual inserts
logger.warning("Bulk insert failed, trying individual inserts")
for record in batch:
try:
write_row(record, tx)
total_success += 1
except Exception as e:
total_failed += 1
failed_records.append({
"record": record,
"error": str(e)
})
logger.error(f"Failed to insert record {record.external_id}: {e}")
except BulkOperationError as e:
logger.error(f"Batch operation error: {e}")
total_failed += len(batch)
failed_records.extend([
{"record": r, "error": str(e)}
for r in batch
])
return {
"total_processed": len(records),
"total_success": total_success,
"total_failed": total_failed,
"failed_records": failed_records
}
def process_with_retry(
session: Session,
max_retries: int = 3
) -> Dict[str, Any]:
"""
Process pending records with retry logic
"""
# Get pending records
success, pending = get_rows(
session_inst=session,
model=ImportRecord,
status="pending",
retry_count__lt=max_retries,
limit=1000
)
if not success or not pending:
return {"message": "No pending records"}
results = {"processed": 0, "succeeded": 0, "failed": 0, "max_retries_exceeded": 0}
for record in pending:
# Mark as processing
update_row(
id_str=record.id,
data_dict={"status": "processing"},
model=ImportRecord,
session_inst=session
)
try:
# Simulate processing
# In real scenario, this would be your business logic
process_record(record)
# Mark as success
update_row(
id_str=record.id,
data_dict={"status": "success", "error_message": None},
model=ImportRecord,
session_inst=session
)
results["succeeded"] += 1
except Exception as e:
logger.error(f"Error processing record {record.id}: {e}")
# Increment retry count
new_retry_count = record.retry_count + 1
if new_retry_count >= max_retries:
# Max retries exceeded
update_row(
id_str=record.id,
data_dict={
"status": "failed",
"retry_count": new_retry_count,
"error_message": f"Max retries exceeded: {str(e)}"
},
model=ImportRecord,
session_inst=session
)
results["max_retries_exceeded"] += 1
else:
# Mark for retry
update_row(
id_str=record.id,
data_dict={
"status": "pending",
"retry_count": new_retry_count,
"error_message": str(e)
},
model=ImportRecord,
session_inst=session
)
results["failed"] += 1
results["processed"] += 1
return results
def process_record(record: ImportRecord):
"""
Simulate record processing
Replace with actual business logic
"""
# Your processing logic here
import json
data = json.loads(record.data)
# ... process data ...
pass
# Usage
with Session(engine) as session:
# Create sample records
records = [
ImportRecord(external_id=f"ext_{i}", data='{"value": 123}')
for i in range(1000)
]
# Process in batches
results = process_batch(session, records, batch_size=100)
print(f"Success: {results['total_success']}, Failed: {results['total_failed']}")
# Process with retry logic
retry_results = process_with_retry(session, max_retries=3)
print(f"Retry results: {retry_results}")
This recipe shows how to:
Prevent lost updates in multi-user environments
Multiple users can update the same record simultaneously, leading to lost updates. You need to detect and handle concurrent modifications.
from sqlmodel import Session, Field, SQLModel
from sqlmodel_crud_utils import get_row, update_row, AuditMixin
from typing import Optional
from datetime import datetime
class BankAccount(SQLModel, AuditMixin, table=True):
id: Optional[int] = Field(default=None, primary_key=True)
account_number: str = Field(index=True, unique=True)
balance: float
version: int = Field(default=0) # Optimistic locking version
class ConcurrentUpdateError(Exception):
"""Raised when concurrent update is detected"""
pass
def update_balance_safe(
session: Session,
account_id: int,
amount: float,
expected_version: int,
user: str
) -> BankAccount:
"""
Update account balance with optimistic locking
"""
# Get current account state
success, account = get_row(
id_str=account_id,
session_inst=session,
model=BankAccount
)
if not success:
raise ValueError(f"Account {account_id} not found")
# Check version matches (no concurrent updates)
if account.version != expected_version:
raise ConcurrentUpdateError(
f"Account was modified by another user. "
f"Expected version {expected_version}, got {account.version}"
)
# Calculate new balance
new_balance = account.balance + amount
if new_balance < 0:
raise ValueError("Insufficient funds")
# Update with version increment
success, updated = update_row(
id_str=account_id,
data_dict={
"balance": new_balance,
"version": account.version + 1,
"updated_by": user
},
model=BankAccount,
session_inst=session
)
if not success:
raise RuntimeError("Update failed")
return updated
def transfer_with_optimistic_locking(
session: Session,
from_account_id: int,
to_account_id: int,
amount: float,
user: str,
max_retries: int = 3
) -> dict:
"""
Transfer money between accounts with retry on concurrent updates
"""
for attempt in range(max_retries):
try:
# Get current versions
_, from_account = get_row(
id_str=from_account_id,
session_inst=session,
model=BankAccount
)
_, to_account = get_row(
id_str=to_account_id,
session_inst=session,
model=BankAccount
)
from_version = from_account.version
to_version = to_account.version
# Perform transfer
with transaction(session) as tx:
# Debit from source
update_balance_safe(
tx, from_account_id, -amount, from_version, user
)
# Credit to destination
update_balance_safe(
tx, to_account_id, amount, to_version, user
)
return {
"success": True,
"attempts": attempt + 1,
"message": f"Transferred {amount} successfully"
}
except ConcurrentUpdateError as e:
if attempt == max_retries - 1:
return {
"success": False,
"attempts": attempt + 1,
"error": f"Max retries exceeded: {str(e)}"
}
# Wait briefly before retry (exponential backoff)
import time
time.sleep(0.1 * (2 ** attempt))
continue
return {"success": False, "error": "Unexpected error"}
# Usage example
with Session(engine) as session:
# Create accounts
acc1 = BankAccount(account_number="ACC001", balance=1000.0, created_by="system")
acc2 = BankAccount(account_number="ACC002", balance=500.0, created_by="system")
write_row(acc1, session)
write_row(acc2, session)
# Transfer with optimistic locking
result = transfer_with_optimistic_locking(
session,
from_account_id=acc1.id,
to_account_id=acc2.id,
amount=200.0,
user="admin"
)
print(f"Transfer result: {result}")
# FastAPI endpoint with optimistic locking
from fastapi import HTTPException
@app.post("/accounts/{account_id}/deposit")
async def deposit(
account_id: int,
amount: float,
version: int,
session: Session = Depends(get_session)
):
"""Deposit money with optimistic locking"""
try:
updated_account = update_balance_safe(
session,
account_id,
amount,
expected_version=version,
user="api"
)
return {
"balance": updated_account.balance,
"version": updated_account.version
}
except ConcurrentUpdateError as e:
raise HTTPException(status_code=409, detail=str(e))
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
Optimistic locking prevents lost updates by:
version field that increments on each updateSafely delete parent records and all related children
When soft-deleting a parent record, you need to also soft-delete all related child records to maintain referential integrity and prevent orphaned records.
from sqlmodel import Session, Field, SQLModel, Relationship
from sqlmodel_crud_utils import SoftDeleteMixin, get_rows, transaction
from typing import Optional, List
class Organization(SQLModel, SoftDeleteMixin, table=True):
id: Optional[int] = Field(default=None, primary_key=True)
name: str
# Relationship to teams
teams: List["Team"] = Relationship(back_populates="organization")
class Team(SQLModel, SoftDeleteMixin, table=True):
id: Optional[int] = Field(default=None, primary_key=True)
name: str
organization_id: int = Field(foreign_key="organization.id")
# Relationships
organization: Organization = Relationship(back_populates="teams")
members: List["TeamMember"] = Relationship(back_populates="team")
class TeamMember(SQLModel, SoftDeleteMixin, table=True):
id: Optional[int] = Field(default=None, primary_key=True)
name: str
email: str
team_id: int = Field(foreign_key="team.id")
# Relationship
team: Team = Relationship(back_populates="members")
def soft_delete_organization_cascade(
session: Session,
org_id: int,
deleted_by: str
) -> dict:
"""
Soft delete an organization and all its teams and members
"""
# Get organization with relationships
success, org = get_row(
id_str=org_id,
session_inst=session,
model=Organization,
selectinload_list=["teams"]
)
if not success or org.is_deleted:
return {"success": False, "error": "Organization not found"}
stats = {
"organizations": 0,
"teams": 0,
"members": 0
}
with transaction(session) as tx:
# Get all teams for this organization
_, teams = get_rows(
session_inst=tx,
model=Team,
organization_id=org_id,
is_deleted=False
)
# Soft delete all members in all teams
for team in teams:
_, members = get_rows(
session_inst=tx,
model=TeamMember,
team_id=team.id,
is_deleted=False
)
for member in members:
member.soft_delete(user=deleted_by)
tx.add(member)
stats["members"] += 1
# Soft delete the team
team.soft_delete(user=deleted_by)
tx.add(team)
stats["teams"] += 1
# Soft delete the organization
org.soft_delete(user=deleted_by)
tx.add(org)
stats["organizations"] += 1
return {
"success": True,
"deleted": stats,
"message": f"Deleted organization and {stats['teams']} teams with {stats['members']} members"
}
def restore_organization_cascade(
session: Session,
org_id: int
) -> dict:
"""
Restore a soft-deleted organization and all its teams and members
"""
# Get organization (including soft-deleted)
success, org = get_row(
id_str=org_id,
session_inst=session,
model=Organization
)
if not success or not org.is_deleted:
return {"success": False, "error": "Organization not found or not deleted"}
stats = {"organizations": 0, "teams": 0, "members": 0}
with transaction(session) as tx:
# Restore organization
org.restore()
tx.add(org)
stats["organizations"] += 1
# Get all soft-deleted teams for this organization
_, teams = get_rows(
session_inst=tx,
model=Team,
organization_id=org_id,
is_deleted=True
)
for team in teams:
# Restore team
team.restore()
tx.add(team)
stats["teams"] += 1
# Get all soft-deleted members for this team
_, members = get_rows(
session_inst=tx,
model=TeamMember,
team_id=team.id,
is_deleted=True
)
for member in members:
member.restore()
tx.add(member)
stats["members"] += 1
return {
"success": True,
"restored": stats,
"message": f"Restored organization with {stats['teams']} teams and {stats['members']} members"
}
# Usage
with Session(engine) as session:
# Create organization structure
org = Organization(name="Acme Corp")
write_row(org, session)
team = Team(name="Engineering", organization_id=org.id)
write_row(team, session)
member = TeamMember(name="John Doe", email="john@acme.com", team_id=team.id)
write_row(member, session)
# Cascading soft delete
result = soft_delete_organization_cascade(session, org.id, deleted_by="admin")
print(result)
# Output: {'success': True, 'deleted': {'organizations': 1, 'teams': 1, 'members': 1}, ...}
# Restore everything
restore_result = restore_organization_cascade(session, org.id)
print(restore_result)
This recipe demonstrates:
deleted_cascade_id field to track which parent deletion caused a child to be deleted. This makes it easier to restore related records together.
Make your database queries faster and more efficient
Your application is slow when querying large datasets or dealing with complex relationships. You need to optimize database operations without major refactoring.
from sqlmodel import Session, Field, SQLModel, Relationship
from sqlmodel_crud_utils import get_rows, get_row
from typing import Optional, List
# 1. Use indexes on frequently queried fields
class User(SQLModel, table=True):
id: Optional[int] = Field(default=None, primary_key=True)
email: str = Field(index=True, unique=True) # Index for lookups
username: str = Field(index=True) # Index for searches
created_at: datetime = Field(index=True) # Index for date filtering
status: str = Field(index=True) # Index for status filtering
posts: List["Post"] = Relationship(back_populates="author")
class Post(SQLModel, table=True):
id: Optional[int] = Field(default=None, primary_key=True)
title: str = Field(index=True)
author_id: int = Field(foreign_key="user.id", index=True) # Foreign key index
created_at: datetime = Field(index=True)
author: User = Relationship(back_populates="posts")
# 2. Eager load relationships to avoid N+1 queries
def get_user_with_posts_efficient(session: Session, user_id: int):
"""Load user and all posts in single query"""
success, user = get_row(
id_str=user_id,
session_inst=session,
model=User,
selectinload_list=["posts"] # Eager load posts
)
return user
# BAD: N+1 query problem
def get_users_with_post_count_slow(session: Session):
"""This creates N+1 queries - AVOID THIS"""
_, users = get_rows(session_inst=session, model=User, limit=100)
result = []
for user in users:
# Each iteration queries the database again!
_, posts = get_rows(
session_inst=session,
model=Post,
author_id=user.id
)
result.append({
"user": user,
"post_count": len(posts)
})
return result
# GOOD: Single query with relationship loading
def get_users_with_post_count_fast(session: Session):
"""Load all data in one query"""
_, users = get_rows(
session_inst=session,
model=User,
selectinload_list=["posts"], # Load all posts for all users
limit=100
)
return [
{
"user": user,
"post_count": len(user.posts)
}
for user in users
]
# 3. Limit the fields you select (if possible with custom queries)
# Note: sqlmodel_crud_utils loads full models, but you can optimize
# by only loading necessary relationships
# 4. Use pagination for large datasets
def get_all_users_paginated(session: Session, batch_size: int = 100):
"""Process large datasets in batches"""
offset = 0
while True:
_, batch = get_rows(
session_inst=session,
model=User,
offset=offset,
limit=batch_size,
sort_field="id"
)
if not batch:
break
yield batch
offset += batch_size
# Usage
for user_batch in get_all_users_paginated(session):
process_users(user_batch)
# 5. Batch operations for inserts/updates
def bulk_update_efficient(session: Session, user_ids: List[int], updates: dict):
"""More efficient than individual updates"""
from sqlmodel_crud_utils import bulk_upsert_mappings
mappings = [
{"id": user_id, **updates}
for user_id in user_ids
]
bulk_upsert_mappings(
session_inst=session,
model=User,
data_mappings=mappings
)
# 6. Use connection pooling for concurrent access
from sqlmodel import create_engine
# Configure connection pool
engine = create_engine(
"postgresql://localhost/mydb",
pool_size=10, # Number of persistent connections
max_overflow=20, # Additional connections if needed
pool_pre_ping=True, # Verify connections before use
pool_recycle=3600 # Recycle connections after 1 hour
)
# 7. Add composite indexes for common filter combinations
# In your Alembic migration:
"""
def upgrade():
op.create_index(
'ix_user_status_created',
'user',
['status', 'created_at']
)
"""
# Then query efficiently:
def get_active_users_recent(session: Session):
"""Uses composite index on (status, created_at)"""
from datetime import datetime, timedelta
thirty_days_ago = datetime.utcnow() - timedelta(days=30)
_, users = get_rows(
session_inst=session,
model=User,
status="active",
created_at__gte=thirty_days_ago,
sort_field="created_at",
limit=1000
)
return users
selectinload_list to prevent N+1 queriesWrite effective tests for database operations
You need to test CRUD operations thoroughly without polluting your production database, with proper fixtures and cleanup.
# tests/conftest.py
import pytest
from sqlmodel import Session, create_engine, SQLModel
from sqlalchemy.pool import StaticPool
@pytest.fixture(name="engine")
def engine_fixture():
"""Create in-memory SQLite database for testing"""
engine = create_engine(
"sqlite:///:memory:",
connect_args={"check_same_thread": False},
poolclass=StaticPool,
)
SQLModel.metadata.create_all(engine)
yield engine
SQLModel.metadata.drop_all(engine)
@pytest.fixture(name="session")
def session_fixture(engine):
"""Create database session for tests"""
with Session(engine) as session:
yield session
session.rollback() # Clean up after each test
# tests/test_user_crud.py
from sqlmodel_crud_utils import (
get_row, get_rows, write_row, update_row, delete_row,
RecordNotFoundError
)
from models import User
def test_create_user(session):
"""Test creating a user"""
user = User(
username="testuser",
email="test@example.com",
created_by="test"
)
created_user = write_row(user, session)
assert created_user.id is not None
assert created_user.username == "testuser"
assert created_user.email == "test@example.com"
def test_get_user_by_id(session):
"""Test retrieving a user by ID"""
# Arrange
user = write_row(
User(username="testuser", email="test@example.com"),
session
)
# Act
success, retrieved_user = get_row(
id_str=user.id,
session_inst=session,
model=User
)
# Assert
assert success is True
assert retrieved_user.id == user.id
assert retrieved_user.username == "testuser"
def test_get_nonexistent_user(session):
"""Test retrieving a non-existent user"""
success, user = get_row(
id_str=999,
session_inst=session,
model=User
)
assert success is False
assert user is None
def test_filter_users(session):
"""Test filtering users"""
# Arrange - create multiple users
users = [
User(username=f"user{i}", email=f"user{i}@example.com", is_active=i % 2 == 0)
for i in range(10)
]
for user in users:
write_row(user, session)
# Act - filter active users
success, active_users = get_rows(
session_inst=session,
model=User,
is_active=True
)
# Assert
assert success is True
assert len(active_users) == 5
assert all(u.is_active for u in active_users)
def test_update_user(session):
"""Test updating a user"""
# Arrange
user = write_row(
User(username="oldname", email="old@example.com"),
session
)
# Act
success, updated_user = update_row(
id_str=user.id,
data_dict={"username": "newname", "email": "new@example.com"},
model=User,
session_inst=session
)
# Assert
assert success is True
assert updated_user.username == "newname"
assert updated_user.email == "new@example.com"
def test_soft_delete_user(session):
"""Test soft deleting a user"""
# Arrange
user = write_row(
User(username="testuser", email="test@example.com"),
session
)
# Act
user.soft_delete(user="test")
session.add(user)
session.commit()
session.refresh(user)
# Assert
assert user.is_deleted is True
assert user.deleted_at is not None
assert user.deleted_by == "test"
def test_pagination(session):
"""Test pagination"""
# Arrange - create 50 users
for i in range(50):
write_row(
User(username=f"user{i}", email=f"user{i}@example.com"),
session
)
# Act - get page 2 (users 10-19)
success, page2_users = get_rows(
session_inst=session,
model=User,
offset=10,
limit=10,
sort_field="id"
)
# Assert
assert success is True
assert len(page2_users) == 10
# Test factories for complex scenarios
class UserFactory:
"""Factory for creating test users"""
@staticmethod
def create_user(session, **kwargs):
defaults = {
"username": "testuser",
"email": "test@example.com",
"is_active": True,
"created_by": "test"
}
defaults.update(kwargs)
user = User(**defaults)
return write_row(user, session)
@staticmethod
def create_batch(session, count=5, **kwargs):
return [
UserFactory.create_user(
session,
username=f"user{i}",
email=f"user{i}@example.com",
**kwargs
)
for i in range(count)
]
def test_with_factory(session):
"""Test using factory"""
users = UserFactory.create_batch(session, count=10, is_active=True)
assert len(users) == 10
assert all(u.is_active for u in users)
pytest -v --cov=sqlmodel_crud_utils to see test coverage and identify untested code paths.