sqlmodel_crud_utils

SQLModel CRUD Utilities

A set of CRUD utilities to expedite operations with SQLModel, providing both synchronous and asynchronous support for common database operations.

New in v0.2.0: a custom exception hierarchy for better error handling; transaction context managers for safe operations; audit trail mixins (created_at, updated_at tracking); soft delete support (is_deleted flag); and public API exports for easier imports.

  1"""
  2SQLModel CRUD Utilities
  3
  4A set of CRUD utilities to expedite operations with SQLModel, providing both
  5synchronous and asynchronous support for common database operations.
  6
  7New in v0.2.0:
  8    - Custom exception hierarchy for better error handling
  9    - Transaction context managers for safe operations
 10    - Audit trail mixins (created_at, updated_at tracking)
 11    - Soft delete support (is_deleted flag)
 12    - Public API exports for easier imports
 13"""
 14
 15__version__ = "0.2.0"
 16
 17# Import asynchronous functions with a_ prefix
 18from sqlmodel_crud_utils.a_sync import (
 19    bulk_upsert_mappings as a_bulk_upsert_mappings,
 20)
 21from sqlmodel_crud_utils.a_sync import delete_row as a_delete_row
 22from sqlmodel_crud_utils.a_sync import get_one_or_create as a_get_one_or_create
 23from sqlmodel_crud_utils.a_sync import (
 24    get_result_from_query as a_get_result_from_query,
 25)
 26from sqlmodel_crud_utils.a_sync import get_row as a_get_row
 27from sqlmodel_crud_utils.a_sync import get_rows as a_get_rows
 28from sqlmodel_crud_utils.a_sync import (
 29    get_rows_within_id_list as a_get_rows_within_id_list,
 30)
 31from sqlmodel_crud_utils.a_sync import insert_data_rows as a_insert_data_rows
 32from sqlmodel_crud_utils.a_sync import update_row as a_update_row
 33from sqlmodel_crud_utils.a_sync import write_row as a_write_row
 34
 35# Import exceptions
 36from sqlmodel_crud_utils.exceptions import (
 37    BulkOperationError,
 38    MultipleRecordsError,
 39    RecordNotFoundError,
 40    SQLModelCRUDError,
 41    TransactionError,
 42    ValidationError,
 43)
 44
 45# Import mixins
 46from sqlmodel_crud_utils.mixins import AuditMixin, SoftDeleteMixin
 47
 48# Import synchronous functions
 49from sqlmodel_crud_utils.sync import (
 50    bulk_upsert_mappings,
 51    delete_row,
 52    get_one_or_create,
 53    get_result_from_query,
 54    get_row,
 55    get_rows,
 56    get_rows_within_id_list,
 57    insert_data_rows,
 58    update_row,
 59    write_row,
 60)
 61
 62# Import transaction managers
 63from sqlmodel_crud_utils.transactions import a_transaction, transaction
 64
 65__all__ = [
 66    # Version
 67    "__version__",
 68    # Synchronous functions
 69    "bulk_upsert_mappings",
 70    "delete_row",
 71    "get_one_or_create",
 72    "get_result_from_query",
 73    "get_row",
 74    "get_rows",
 75    "get_rows_within_id_list",
 76    "insert_data_rows",
 77    "update_row",
 78    "write_row",
 79    # Asynchronous functions
 80    "a_bulk_upsert_mappings",
 81    "a_delete_row",
 82    "a_get_one_or_create",
 83    "a_get_result_from_query",
 84    "a_get_row",
 85    "a_get_rows",
 86    "a_get_rows_within_id_list",
 87    "a_insert_data_rows",
 88    "a_update_row",
 89    "a_write_row",
 90    # Exceptions
 91    "SQLModelCRUDError",
 92    "RecordNotFoundError",
 93    "MultipleRecordsError",
 94    "ValidationError",
 95    "BulkOperationError",
 96    "TransactionError",
 97    # Transaction managers
 98    "transaction",
 99    "a_transaction",
100    # Mixins
101    "AuditMixin",
102    "SoftDeleteMixin",
103]
__version__ = '0.2.0'
def bulk_upsert_mappings( payload: list, session_inst: sqlmodel.orm.session.Session, model: type[sqlmodel.main.SQLModel], pk_fields: list[str] | None = None):
def bulk_upsert_mappings(
    payload: list,
    session_inst: Session,
    model: type[SQLModel],
    pk_fields: list[str] | None = None,
):
    """
    Insert the mappings in ``payload`` and overwrite rows that conflict on
    ``pk_fields``, then commit.

    :param payload: list of mappings (column name -> value). The keys of the
        first mapping decide which columns are overwritten on conflict.
    :param session_inst: Session used to execute and commit the statement.
    :param model: SQLModel class that is the target of the upsert.
    :param pk_fields: names of the model attributes used as the conflict
        target; defaults to ``["id"]`` when empty or None.
    :return: Tuple[bool, list]: ``True`` and the rows returned by the
        statement's RETURNING clause; ``(True, [])`` for an empty payload.
    """
    if not payload:
        # Nothing to write; payload[0] below would raise IndexError.
        return True, []
    if not pk_fields:
        pk_fields = ["id"]
    stmnt = upsert(model).values(payload)
    stmnt = stmnt.on_conflict_do_update(
        index_elements=[getattr(model, x) for x in pk_fields],
        set_={k: getattr(stmnt.excluded, k) for k in payload[0].keys()},
    )

    # Execute the upsert exactly once, with RETURNING, so the write and the
    # fetch of the affected rows share a single statement execution.
    results = session_inst.scalars(
        stmnt.returning(model), execution_options={"populate_existing": True}
    )

    session_inst.commit()

    return True, results.all()
Parameters
  • payload:
  • session_inst:
  • model:
  • pk_fields:
Returns
def delete_row( id_str: str | int, session_inst: sqlmodel.orm.session.Session, model: type[sqlmodel.main.SQLModel], pk_field: str = 'id'):
def delete_row(
    id_str: str | int,
    session_inst: Session,
    model: type[SQLModel],
    pk_field: str = "id",
):
    """
    Delete the row of ``model`` whose ``pk_field`` equals ``id_str``.

    :param id_str: value to match against ``pk_field``.
    :param session_inst: Session used for the lookup, delete and commit.
    :param model: SQLModel class to delete from.
    :param pk_field: name of the model attribute to match (default "id").
    :return: bool: True when a row was found and the delete was committed;
        False when no row matched or the delete failed (the session is
        rolled back and the error is logged in that case).
    """
    lookup = select(model).where(getattr(model, pk_field) == id_str)
    target = session_inst.exec(lookup).one_or_none()

    # Nothing matched: report failure without touching the session.
    if not target:
        return False

    try:
        session_inst.delete(target)
        session_inst.commit()
    except Exception as e:
        logger.error(
            f"Failed to delete data row. Please see error messages here: "
            f"{type(e), e, e.args}"
        )
        session_inst.rollback()
        return False

    return True
Parameters
  • id_str:
  • session_inst:
  • model:
  • pk_field:
Returns
def get_one_or_create( session_inst: sqlmodel.orm.session.Session, model: type[sqlmodel.main.SQLModel], create_method_kwargs: dict | None = None, selectin: bool = False, select_in_key: str | None = None, **kwargs):
def get_one_or_create(
    session_inst: Session,
    model: type[SQLModel],
    create_method_kwargs: dict | None = None,
    selectin: bool = False,
    select_in_key: str | None = None,
    **kwargs,
):
    """
    Return the row of ``model`` matching ``kwargs``, or create and commit a
    new one when no row matches.

    :param session_inst: Session used for the lookup and the insert.
    :param model: SQLModel class to query / instantiate.
    :param create_method_kwargs: extra attribute values applied only when a
        new row is created; they override ``kwargs`` on key collisions.
    :param selectin: when True (and ``select_in_key`` is set) the lookup
        eagerly loads that relationship with ``selectinload``.
    :param select_in_key: name of the model attribute to eager-load.
    :param kwargs: exact-match filters for the lookup; also used as
        attribute values for a newly created row.
    :return: Tuple[row, bool]: ``(existing_row, True)`` when a row was
        found, ``(created_row, False)`` when a new row was inserted.
    :raises Exception: whatever the commit raises; the session is rolled
        back before the exception is re-raised.
    """
    stmnt = select(model).filter_by(**kwargs)
    if selectin and select_in_key:
        # Attach the loader option up front so one query both finds the row
        # and loads the relationship.
        stmnt = stmnt.options(selectinload(getattr(model, select_in_key)))

    existing = get_result_from_query(query=stmnt, session=session_inst)
    if existing:
        return existing, True

    kwargs.update(create_method_kwargs or {})
    created = model()
    for field, value in kwargs.items():
        setattr(created, field, value)
    session_inst.add(created)
    try:
        session_inst.commit()
    except Exception:
        # Leave the session usable for the caller, then surface the error.
        session_inst.rollback()
        raise
    return created, False

This function either returns an existing data row from the database or creates a new instance and saves it to the DB.

Parameters
  • session_inst: Session
  • model: SQLModel ORM
  • create_method_kwargs: dict
  • selectin: bool
  • select_in_key: str | None
  • kwargs: keyword args
Returns

Tuple[Row, bool]

def get_result_from_query( query: sqlmodel.sql._expression_select_cls.SelectOfScalar, session: sqlmodel.orm.session.Session):
def get_result_from_query(query: SelectOfScalar, session: Session):
    """
    Execute ``query`` and return a single result.

    If the query yields several rows only the first one is returned; if it
    yields none, ``None`` is returned.

    :param query: SelectOfScalar statement to execute.
    :param session: Session used to execute the statement.

    :return: the first row of the result, or None when there are no rows.
    """
    # first() already covers all three cases (zero, one, many rows) without
    # raising MultipleResultsFound, so the query is executed only once.
    return session.exec(query).first()

Processes an SQLModel query object and returns a singular result from the return payload. If more than one row is returned, then only the first row is returned. If no rows are available, then a null value is returned.

Parameters
  • query: SelectOfScalar
  • session: Session
Returns

Row

def get_row( id_str: str | int, session_inst: sqlmodel.orm.session.Session, model: type[sqlmodel.main.SQLModel], selectin: bool = False, lazy: bool = False, lazy_load_keys: list[str] | None = None, select_in_keys: list[str] | None = None, pk_field: str = 'id'):
def get_row(
    id_str: str | int,
    session_inst: Session,
    model: type[SQLModel],
    selectin: bool = False,
    lazy: bool = False,
    lazy_load_keys: list[str] | None = None,
    select_in_keys: list[str] | None = None,
    pk_field: str = "id",
):
    """
    Fetch the single row of ``model`` whose ``pk_field`` equals ``id_str``.

    :param id_str: value to match against ``pk_field``.
    :param session_inst: Session used to execute the query.
    :param model: SQLModel class to query.
    :param selectin: enable ``selectinload`` for ``select_in_keys``.
    :param lazy: enable ``lazyload`` for ``lazy_load_keys``.
    :param lazy_load_keys: attribute names to lazy-load; a non-list value is
        wrapped in a one-element list.
    :param select_in_keys: attribute names to eager-load; a non-list value
        is wrapped in a one-element list.
    :param pk_field: name of the model attribute to match (default "id").
    :return: Tuple[bool, row | None]: truthiness of the row, and the row
        itself (None when nothing matched).
    """
    query = select(model).where(getattr(model, pk_field) == id_str)

    if selectin and select_in_keys:
        eager_keys = (
            select_in_keys
            if isinstance(select_in_keys, list)
            else [select_in_keys]
        )
        for name in eager_keys:
            query = query.options(selectinload(getattr(model, name)))

    if lazy and lazy_load_keys:
        deferred_keys = (
            lazy_load_keys
            if isinstance(lazy_load_keys, list)
            else [lazy_load_keys]
        )
        for name in deferred_keys:
            query = query.options(lazyload(getattr(model, name)))

    row = session_inst.exec(query).one_or_none()
    return bool(row), row
Parameters
  • id_str:
  • session_inst:
  • model:
  • selectin:
  • lazy:
  • lazy_load_keys:
  • select_in_keys:
  • pk_field:
Returns
def get_rows( session_inst: sqlmodel.orm.session.Session, model: type[sqlmodel.main.SQLModel], selectin: bool = False, select_in_keys: list[str] | None = None, lazy: bool = False, lazy_load_keys: list[str] | None = None, page_size: int = 100, page: int = 1, text_field: str | None = None, stmnt: sqlmodel.sql._expression_select_cls.SelectOfScalar | None = None, **kwargs):
# Comparison suffixes mapped to the column method that builds the clause.
# Matching is by substring, so "__gte"/"__lte" must be tested before
# "__gt"/"__lt".
_COMPARISON_SUFFIXES = (
    ("__gte", "__ge__"),
    ("__lte", "__le__"),
    ("__gt", "__gt__"),
    ("__lt", "__lt__"),
)


def _coerce_comparison_value(key, val):
    """Parse a string filter value into a date or an int where possible."""
    if isinstance(val, str):
        # Date parsing is attempted only for filter keys mentioning "date".
        if "date" in key and is_date(val, fuzzy=False):
            return date_parse(val)
        if val.isdigit():
            return int(val)
    return val


def _relationship_attr(model, key):
    """Return ``model.key`` if it looks like a relationship, else None."""
    attr = getattr(model, key, None)
    if (
        attr is not None
        and hasattr(attr, "property")
        and hasattr(attr.property, "mapper")
    ):
        return attr
    return None


def get_rows(
    session_inst: Session,
    model: type[SQLModel],
    selectin: bool = False,
    select_in_keys: list[str] | None = None,
    lazy: bool = False,
    lazy_load_keys: list[str] | None = None,
    page_size: int = 100,
    page: int = 1,
    text_field: str | None = None,
    stmnt: SelectOfScalar | None = None,
    **kwargs,
):
    """
    Fetch one page of rows of ``model``, optionally filtered and sorted.

    When ``stmnt`` is None a ``select(model)`` is built from ``kwargs``:

    * ``<field>__like=v`` -> ``field LIKE '%v%'``
    * ``<field>__gte`` / ``__lte`` / ``__gt`` / ``__lt`` -> comparison;
      string values are parsed to a date (keys containing "date") or to an
      int (digit-only strings) first
    * ``<field>__in=[...]`` -> ``IN``; non-list values are skipped with a
      warning
    * ``sort_field`` / ``sort_desc`` -> ``ORDER BY`` (descending if truthy)
    * the key named by ``text_field`` -> ``.match()`` text search
    * every other key -> exact match via ``filter_by``

    When ``stmnt`` is supplied it is used as-is: ``kwargs`` and the loader
    options below are not applied, only pagination is.

    :param session_inst: Session used to execute the query.
    :param model: SQLModel class to query.
    :param selectin: enable ``selectinload`` for ``select_in_keys``.
    :param select_in_keys: relationship attribute names to eager-load;
        non-relationship names are skipped with a warning.
    :param lazy: enable ``lazyload`` for ``lazy_load_keys``.
    :param lazy_load_keys: relationship attribute names to lazy-load;
        non-relationship names are skipped with a warning.
    :param page_size: maximum number of rows returned (LIMIT).
    :param page: 1-based page number; offset is ``(page - 1) * page_size``.
    :param text_field: name of the kwarg / model attribute used for text
        search.
    :param stmnt: pre-built select statement to paginate and execute.
    :param kwargs: filter and sort arguments as described above.
    :return: Tuple[bool, list]: True when at least one row was returned,
        and the list of rows.
    """
    if stmnt is None:
        stmnt = select(model)
        if kwargs:
            exact_match_kwargs = {}

            for key, val in kwargs.items():
                if "__like" in key:
                    column = getattr(model, key.replace("__like", ""))
                    stmnt = stmnt.filter(column.like(f"%{val}%"))
                    continue

                comparison = next(
                    (
                        (suffix, method)
                        for suffix, method in _COMPARISON_SUFFIXES
                        if suffix in key
                    ),
                    None,
                )
                if comparison is not None:
                    suffix, method = comparison
                    column = getattr(model, key.replace(suffix, ""))
                    stmnt = stmnt.filter(
                        getattr(column, method)(
                            _coerce_comparison_value(key, val)
                        )
                    )
                    continue

                if "__in" in key:
                    column = getattr(model, key.replace("__in", ""))
                    if isinstance(val, list):
                        stmnt = stmnt.filter(column.in_(val))
                    else:
                        logger.warning(
                            f"Value for __in filter '{key}' is not a list, "
                            f"skipping."
                        )
                    continue

                # Sort controls and the text-search key are handled below;
                # everything else is an exact match.
                if key not in ("sort_desc", "sort_field") and (
                    not text_field or key != text_field
                ):
                    exact_match_kwargs[key] = val

            sort_field = kwargs.get("sort_field")
            if sort_field:
                sort_attr = getattr(model, sort_field)
                stmnt = stmnt.order_by(
                    sort_attr.desc() if kwargs.get("sort_desc") else sort_attr
                )

            if text_field and text_field in kwargs:
                stmnt = stmnt.where(
                    getattr(model, text_field).match(kwargs[text_field])
                )

            if exact_match_kwargs:
                stmnt = stmnt.filter_by(**exact_match_kwargs)

        # Loader options are attached only to statements built here.
        if selectin and select_in_keys:
            for key in select_in_keys:
                attr = _relationship_attr(model, key)
                if attr is not None:
                    stmnt = stmnt.options(selectinload(attr))
                else:
                    logger.warning(
                        f"Skipping selectinload for non-relationship "
                        f"attribute '{key}' on model {model.__name__}"
                    )

        if lazy and lazy_load_keys:
            for key in lazy_load_keys:
                attr = _relationship_attr(model, key)
                if attr is not None:
                    stmnt = stmnt.options(lazyload(attr))
                else:
                    logger.warning(
                        f"Skipping lazyload for non-relationship attribute  "
                        f"'{key}' on model {model.__name__}"
                    )

    # Pagination applies to both built and caller-supplied statements.
    stmnt = stmnt.offset((page - 1) * page_size).limit(page_size)

    results = session_inst.exec(stmnt).all()

    return bool(results), results
Parameters
  • session_inst:
  • model:
  • selectin:
  • select_in_keys:
  • lazy:
  • lazy_load_keys:
  • page_size:
  • page:
  • text_field:
  • stmnt:
  • kwargs:
Returns
def get_rows_within_id_list( id_str_list: list[str | int], session_inst: sqlmodel.orm.session.Session, model: type[sqlmodel.main.SQLModel], pk_field: str = 'id'):
def get_rows_within_id_list(
    id_str_list: list[str | int],
    session_inst: Session,
    model: type[SQLModel],
    pk_field: str = "id",
):
    """
    Fetch every row of ``model`` whose ``pk_field`` value appears in
    ``id_str_list``.

    :param id_str_list: primary key values to look up.
    :param session_inst: Session used to execute the query.
    :param model: SQLModel class representing the table.
    :param pk_field: name of the primary key attribute (default: "id").
    :return: Tuple[bool, list[SQLModel]]: ``(True, rows)`` when at least one
             row matched, ``(False, [])`` when none matched or the input
             list was empty.
    """
    # An empty id list short-circuits without querying.
    if not id_str_list:
        return False, []

    pk_column = getattr(model, pk_field)
    query = select(model).where(pk_column.in_(id_str_list))
    matches = session_inst.exec(query).all()

    return bool(matches), matches

Retrieves rows from the database whose primary key is within the provided list.

Parameters
  • id_str_list: List of primary key values to fetch.
  • session_inst: SQLAlchemy Session instance.
  • model: SQLModel class representing the table.
  • pk_field: Name of the primary key field (default: "id").
Returns

Tuple[bool, list[SQLModel]]: A tuple containing a success flag (True if rows were found, False otherwise) and a list of the found model instances.

def insert_data_rows(data_rows, session_inst: sqlmodel.orm.session.Session):
def insert_data_rows(data_rows, session_inst: Session):
    """
    Write ``data_rows`` in one commit, falling back to row-by-row writes if
    the bulk commit fails.

    :param data_rows: iterable of model instances to persist.
    :param session_inst: Session used for the writes.
    :return: ``(True, data_rows)`` when the bulk commit succeeds. After a
        fallback: ``(status, {"success": [...], "failed": [...]})`` where
        ``status`` is True if at least one row was written and False if
        every row failed.
    """
    try:
        session_inst.add_all(data_rows)
        session_inst.commit()

        return True, data_rows

    except Exception as e:
        logger.error(
            f"Writing data rows to table failed. See error message: "
            f"{type(e), e, e.args}"
        )
        logger.info(
            "Attempting to write individual entries. This can be a "
            "bit taxing, so please consider your payload to the DB"
        )

        session_inst.rollback()
        processed_rows, failed_rows = [], []
        for row in data_rows:
            success, _ = write_row(row, session_inst=session_inst)
            if success:
                processed_rows.append(row)
            else:
                failed_rows.append(row)

        # Plain bool: a one-element tuple such as (False,) is truthy and
        # would make a total failure look like success to the caller.
        status = bool(processed_rows)
        return status, {"success": processed_rows, "failed": failed_rows}
Parameters
  • data_rows:
  • session_inst:
Returns
def update_row( id_str: int | str, data: dict, session_inst: sqlmodel.orm.session.Session, model: type[sqlmodel.main.SQLModel], pk_field: str = 'id'):
def update_row(
    id_str: int | str,
    data: dict,
    session_inst: Session,
    model: type[SQLModel],
    pk_field: str = "id",
):
    """
    Apply ``data`` to the row of ``model`` whose ``pk_field`` equals
    ``id_str`` and commit the change.

    :param id_str: value to match against ``pk_field``.
    :param data: mapping of attribute name -> new value, set on the row.
    :param session_inst: Session used for the lookup and commit.
    :param model: SQLModel class to update.
    :param pk_field: name of the model attribute to match (default "id").
    :return: Tuple[bool, row | None]: ``(True, row)`` on success,
        ``(False, row)`` when the commit failed (session rolled back, error
        logged), ``(False, None)`` when no row matched.
    """
    query = select(model).where(getattr(model, pk_field) == id_str)
    target = session_inst.exec(query).one_or_none()

    if not target:
        return False, None

    for field, value in data.items():
        setattr(target, field, value)

    try:
        session_inst.add(target)
        session_inst.commit()
    except Exception as e:
        session_inst.rollback()
        logger.error(
            f"Updating the data row failed. See error messages: "
            f"{type(e), e, e.args}"
        )
        return False, target

    return True, target
Parameters
  • id_str:
  • data:
  • session_inst:
  • model:
  • pk_field:
Returns
def write_row( data_row: sqlmodel.main.SQLModel, session_inst: sqlmodel.orm.session.Session):
 93def write_row(data_row: SQLModel, session_inst: Session):
 94    """
 95    Writes a new instance of an SQLModel ORM model to the database, with an
 96    exception catch that rolls back the session in the event of failure.
 97
 98    :param data_row: SQLModel
 99    :param session_inst: Session
100    :return: Tuple[bool, ScalarResult]
101    """
102    try:
103        session_inst.add(data_row)
104        session_inst.commit()
105
106        return True, data_row
107    except Exception as e:
108        session_inst.rollback()
109        logger.error(
110            f"Writing data row to table failed. See error message: "
111            f"{type(e), e, e.args}"
112        )
113
114        return False, None

Writes a new instance of an SQLModel ORM model to the database, with an exception catch that rolls back the session in the event of failure.

Parameters
  • data_row: SQLModel
  • session_inst: Session
Returns

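Tuple[bool, SQLModel | None]: (True, data_row) when the commit succeeds, (False, None) when it fails and the session is rolled back.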

async def a_bulk_upsert_mappings( payload: list, session_inst: sqlmodel.ext.asyncio.session.AsyncSession, model: type[sqlmodel.main.SQLModel], pk_fields: list[str] | None = None):
async def bulk_upsert_mappings(
    payload: list,
    session_inst: AsyncSession,
    model: type[SQLModel],
    pk_fields: list[str] | None = None,
):
    """
    Insert the mappings in ``payload`` and overwrite rows that conflict on
    ``pk_fields``, then commit.

    :param payload: list of mappings (column name -> value). The keys of the
        first mapping decide which columns are overwritten on conflict.
    :param session_inst: AsyncSession used to execute and commit the
        statement.
    :param model: SQLModel class that is the target of the upsert.
    :param pk_fields: names of the model attributes used as the conflict
        target; defaults to ``["id"]`` when empty or None.
    :return: Tuple[bool, list]: ``True`` and the rows returned by the
        statement's RETURNING clause; ``(True, [])`` for an empty payload.
    """
    if not payload:
        # Nothing to write; payload[0] below would raise IndexError.
        return True, []
    if not pk_fields:
        pk_fields = ["id"]
    stmnt = upsert(model).values(payload)
    stmnt = stmnt.on_conflict_do_update(
        index_elements=[getattr(model, x) for x in pk_fields],
        set_={k: getattr(stmnt.excluded, k) for k in payload[0].keys()},
    )

    # Execute the upsert exactly once, with RETURNING, so the write and the
    # fetch of the affected rows share a single statement execution.
    results = await session_inst.scalars(
        stmnt.returning(model), execution_options={"populate_existing": True}
    )

    await session_inst.commit()

    return True, results.all()
Parameters
  • payload:
  • session_inst:
  • model:
  • pk_fields:
Returns
async def a_delete_row( id_str: str | int, session_inst: sqlmodel.ext.asyncio.session.AsyncSession, model: type[sqlmodel.main.SQLModel], pk_field: str = 'id'):
async def delete_row(
    id_str: str | int,
    session_inst: AsyncSession,
    model: type[SQLModel],
    pk_field: str = "id",
):
    """
    Delete the row of ``model`` whose ``pk_field`` equals ``id_str``.

    :param id_str: value to match against ``pk_field``.
    :param session_inst: AsyncSession used for the lookup, delete and
        commit.
    :param model: SQLModel class to delete from.
    :param pk_field: name of the model attribute to match (default "id").
    :return: bool: True when a row was found and the delete was committed;
        False when no row matched or the delete failed (the session is
        rolled back and the error is logged in that case).
    """
    lookup = select(model).where(getattr(model, pk_field) == id_str)
    target = (await session_inst.exec(lookup)).one_or_none()

    # Nothing matched: report failure without touching the session.
    if not target:
        return False

    try:
        await session_inst.delete(target)
        await session_inst.commit()
    except Exception as e:
        logger.error(
            f"Failed to delete data row. Please see error messages here: "
            f"{type(e), e, e.args}"
        )
        await session_inst.rollback()
        return False

    return True
Parameters
  • id_str:
  • session_inst:
  • model:
  • pk_field:
Returns
async def a_get_one_or_create( session_inst: sqlmodel.ext.asyncio.session.AsyncSession, model: type[sqlmodel.main.SQLModel], create_method_kwargs: dict | None = None, selectin: bool = False, select_in_key: str | None = None, **kwargs):
async def get_one_or_create(
    session_inst: AsyncSession,
    model: type[SQLModel],
    create_method_kwargs: dict | None = None,
    selectin: bool = False,
    select_in_key: str | None = None,
    **kwargs,
):
    """
    Return the row of ``model`` matching ``kwargs``, or create and commit a
    new one when no row matches.

    :param session_inst: AsyncSession used for the lookup and the insert.
    :param model: SQLModel class to query / instantiate.
    :param create_method_kwargs: extra attribute values applied only when a
        new row is created; they override ``kwargs`` on key collisions.
    :param selectin: when True (and ``select_in_key`` is set) the lookup
        eagerly loads that relationship with ``selectinload``.
    :param select_in_key: name of the model attribute to eager-load.
    :param kwargs: exact-match filters for the lookup; also used as
        attribute values for a newly created row.
    :return: Tuple[row, bool]: ``(existing_row, True)`` when a row was
        found, ``(created_row, False)`` when a new row was inserted.
    :raises Exception: whatever the commit raises; the session is rolled
        back before the exception is re-raised.
    """
    stmnt = select(model).filter_by(**kwargs)
    if selectin and select_in_key:
        # Attach the loader option up front so one query both finds the row
        # and loads the relationship.
        stmnt = stmnt.options(selectinload(getattr(model, select_in_key)))

    existing = await get_result_from_query(query=stmnt, session=session_inst)
    if existing:
        return existing, True

    kwargs.update(create_method_kwargs or {})
    created = model()
    for field, value in kwargs.items():
        setattr(created, field, value)
    session_inst.add(created)
    try:
        await session_inst.commit()
    except Exception:
        # Leave the session usable for the caller, then surface the error.
        await session_inst.rollback()
        raise
    return created, False

This function either returns an existing data row from the database or creates a new instance and saves it to the DB.

Parameters
  • session_inst: AsyncSession
  • model: SQLModel ORM
  • create_method_kwargs: dict
  • selectin: bool
  • select_in_key: str | None
  • kwargs: keyword args
Returns

Tuple[Row, bool]

async def a_get_result_from_query( query: sqlmodel.sql._expression_select_cls.SelectOfScalar, session: sqlmodel.ext.asyncio.session.AsyncSession):
async def get_result_from_query(query: SelectOfScalar, session: AsyncSession):
    """
    Execute ``query`` and return a single result.

    If the query yields several rows only the first one is returned; if it
    yields none, ``None`` is returned.

    :param query: SelectOfScalar statement to execute.
    :param session: AsyncSession used to execute the statement.

    :return: the first row of the result, or None when there are no rows.
    """
    # first() already covers all three cases (zero, one, many rows) without
    # raising MultipleResultsFound, so the query is executed only once.
    results = await session.exec(query)
    return results.first()

Processes an SQLModel query object and returns a singular result from the return payload. If more than one row is returned, then only the first row is returned. If no rows are available, then a null value is returned.

Parameters
  • query: SelectOfScalar
  • session: AsyncSession
Returns

Row

async def a_get_row( id_str: str | int, session_inst: sqlmodel.ext.asyncio.session.AsyncSession, model: type[sqlmodel.main.SQLModel], selectin: bool = False, select_in_keys: list[str] | None = None, lazy: bool = False, lazy_load_keys: list[str] | None = None, pk_field: str = 'id'):
async def get_row(
    id_str: str | int,
    session_inst: AsyncSession,
    model: type[SQLModel],
    selectin: bool = False,
    select_in_keys: list[str] | None = None,
    lazy: bool = False,
    lazy_load_keys: list[str] | None = None,
    pk_field: str = "id",
):
    """
    Fetch the single row of ``model`` whose ``pk_field`` equals ``id_str``.

    :param id_str: value to match against ``pk_field``.
    :param session_inst: AsyncSession used to execute the query.
    :param model: SQLModel class to query.
    :param selectin: enable ``selectinload`` for ``select_in_keys``.
    :param select_in_keys: attribute names to eager-load; a non-list value
        is wrapped in a one-element list.
    :param lazy: enable ``lazyload`` for ``lazy_load_keys``.
    :param lazy_load_keys: attribute names to lazy-load; a non-list value is
        wrapped in a one-element list.
    :param pk_field: name of the model attribute to match (default "id").
    :return: Tuple[bool, row | None]: truthiness of the row, and the row
        itself (None when nothing matched).
    """
    query = select(model).where(getattr(model, pk_field) == id_str)

    if selectin and select_in_keys:
        eager_keys = (
            select_in_keys
            if isinstance(select_in_keys, list)
            else [select_in_keys]
        )
        for name in eager_keys:
            query = query.options(selectinload(getattr(model, name)))

    if lazy and lazy_load_keys:
        deferred_keys = (
            lazy_load_keys
            if isinstance(lazy_load_keys, list)
            else [lazy_load_keys]
        )
        for name in deferred_keys:
            query = query.options(lazyload(getattr(model, name)))

    row = (await session_inst.exec(query)).one_or_none()
    return bool(row), row
Parameters
  • id_str:
  • session_inst:
  • model:
  • selectin:
  • select_in_keys:
  • lazy:
  • lazy_load_keys:
  • pk_field:
Returns
async def a_get_rows( session_inst: sqlmodel.ext.asyncio.session.AsyncSession, model: type[sqlmodel.main.SQLModel], selectin: bool = False, select_in_keys: list[str] | None = None, lazy: bool = False, lazy_load_keys: list[str] | None = None, page_size: int = 100, page: int = 1, text_field: str | None = None, stmnt: sqlmodel.sql._expression_select_cls.SelectOfScalar | None = None, **kwargs):
# Filter suffixes recognised at the END of a keyword-argument name, in the
# order they are tested.
_FILTER_SUFFIXES = ("__like", "__gte", "__lte", "__gt", "__lt", "__in")

# Comparison suffix -> name of the column operator method it invokes.
_COMPARISON_METHODS = {
    "__gte": "__ge__",
    "__lte": "__le__",
    "__gt": "__gt__",
    "__lt": "__lt__",
}


def _split_filter_key(key):
    """Split ``field__op`` into ``(field, "__op")``; ``(key, None)`` when
    the key does not end with a recognised suffix."""
    for suffix in _FILTER_SUFFIXES:
        if key.endswith(suffix):
            return key[: -len(suffix)], suffix
    return key, None


def _coerce_filter_value(key, val):
    """Convert a string comparison value: date strings (for keys containing
    ``"date"``) via ``date_parse``, all-digit strings via ``int``."""
    if not isinstance(val, str):
        return val
    if "date" in key and is_date(val, fuzzy=False):
        return date_parse(val)
    if val.isdigit():
        return int(val)
    return val


def _apply_loader_options(stmnt, model, keys, loader, loader_name):
    """Add ``loader(attr)`` for each key that looks like a relationship;
    log a warning and skip the others."""
    for key in keys:
        attr = getattr(model, key, None)
        # Simplified relationship check: a mapped relationship attribute
        # exposes ``.property.mapper``.
        if (
            attr is not None
            and hasattr(attr, "property")
            and hasattr(attr.property, "mapper")
        ):
            stmnt = stmnt.options(loader(attr))
        else:
            logger.warning(
                f"Skipping {loader_name} for non-relationship "
                f"attribute '{key}' on model {model.__name__}"
            )
    return stmnt


async def get_rows(
    session_inst: AsyncSession,
    model: type[SQLModel],
    selectin: bool = False,
    select_in_keys: list[str] | None = None,
    lazy: bool = False,
    lazy_load_keys: list[str] | None = None,
    page_size: int = 100,
    page: int = 1,
    text_field: str | None = None,
    stmnt: SelectOfScalar | None = None,
    **kwargs,
):
    """
    Run a paginated select against ``model`` and return the matching rows.

    When ``stmnt`` is not supplied, a ``select(model)`` statement is built
    from ``kwargs``:

    * ``<field>__like`` -> ``LIKE '%value%'``
    * ``<field>__gte`` / ``__lte`` / ``__gt`` / ``__lt`` -> comparison;
      string values are parsed as dates when the key contains ``"date"``
      and ``is_date`` accepts them, and all-digit strings become ``int``
    * ``<field>__in`` -> ``IN (...)``; applied only for ``list`` values,
      anything else is skipped with a warning
    * ``sort_field`` / ``sort_desc`` -> ``ORDER BY``
    * the key named by ``text_field`` -> ``.match()`` on that column
    * every other key -> exact match through ``filter_by``

    A suffix is only recognised at the end of the key, so a field whose
    name merely contains an operator token is treated as an exact match.

    :param session_inst: AsyncSession used to execute the statement.
    :param model: SQLModel class to select from.
    :param selectin: Enable ``selectinload`` for ``select_in_keys``.
    :param select_in_keys: Relationship attribute names to load eagerly.
    :param lazy: Enable ``lazyload`` for ``lazy_load_keys``.
    :param lazy_load_keys: Relationship attribute names to load lazily.
    :param page_size: Maximum number of rows returned (``LIMIT``).
    :param page: 1-based page number; offset is ``(page - 1) * page_size``.
    :param text_field: Name of the kwarg/column used for text matching.
    :param stmnt: Pre-built statement. When given, ``kwargs`` filters and
        the loader options are NOT applied; only pagination is added.
    :param kwargs: Filter and sort arguments described above.
    :return: ``(success, results)`` where ``results`` is the list from
        ``.all()`` and ``success`` is ``True`` when it is non-empty.
    """
    if stmnt is None:
        stmnt = select(model)
        if kwargs:
            exact_match_kwargs = {}
            special_clauses = []

            for key, val in kwargs.items():
                field_name, suffix = _split_filter_key(key)
                if suffix == "__like":
                    special_clauses.append(
                        getattr(model, field_name).like(f"%{val}%")
                    )
                elif suffix in _COMPARISON_METHODS:
                    parsed_val = _coerce_filter_value(key, val)
                    compare = getattr(
                        getattr(model, field_name),
                        _COMPARISON_METHODS[suffix],
                    )
                    special_clauses.append(compare(parsed_val))
                elif suffix == "__in":
                    if isinstance(val, list):
                        special_clauses.append(
                            getattr(model, field_name).in_(val)
                        )
                    else:
                        logger.warning(
                            f"Value for __in filter '{key}' is not a list, "
                            f"skipping."
                        )
                elif key not in ("sort_desc", "sort_field") and (
                    not text_field or key != text_field
                ):
                    # Exact-match keys, excluding sort/text-search keys.
                    exact_match_kwargs[key] = val

            # Apply special filters using filter()
            for clause in special_clauses:
                stmnt = stmnt.filter(clause)

            # Apply sorting
            sort_desc = kwargs.get("sort_desc")
            sort_field = kwargs.get("sort_field")
            if sort_field:
                sort_attr = getattr(model, sort_field)
                stmnt = stmnt.order_by(
                    sort_attr.desc() if sort_desc else sort_attr
                )

            # Apply text search if applicable (assuming .match() is correct)
            if text_field and text_field in kwargs:
                stmnt = stmnt.where(
                    getattr(model, text_field).match(kwargs[text_field])
                )

            # Apply exact matches using filter_by()
            if exact_match_kwargs:
                stmnt = stmnt.filter_by(**exact_match_kwargs)

        # Loader options are only attached to statements built here.
        if selectin and select_in_keys:
            stmnt = _apply_loader_options(
                stmnt, model, select_in_keys, selectinload, "selectinload"
            )

        if lazy and lazy_load_keys:
            stmnt = _apply_loader_options(
                stmnt, model, lazy_load_keys, lazyload, "lazyload"
            )

    # Apply pagination
    stmnt = stmnt.offset((page - 1) * page_size).limit(page_size)
    _result = await session_inst.exec(stmnt)
    results = _result.all()

    return bool(results), results
Parameters
  • session_inst:
  • model:
  • selectin:
  • select_in_keys:
  • lazy:
  • lazy_load_keys:
  • page_size:
  • page:
  • text_field:
  • stmnt:
  • kwargs:
Returns
async def a_get_rows_within_id_list( id_str_list: list[str | int], session_inst: sqlmodel.ext.asyncio.session.AsyncSession, model: type[sqlmodel.main.SQLModel], pk_field: str = 'id'):
async def get_rows_within_id_list(
    id_str_list: list[str | int],
    session_inst: AsyncSession,
    model: type[SQLModel],
    pk_field: str = "id",
):
    """
    Select the rows of ``model`` whose ``pk_field`` is in ``id_str_list``.

    :param id_str_list: Values passed to the column's ``in_`` operator.
    :param session_inst: AsyncSession used to execute the statement.
    :param model: SQLModel class to select from.
    :param pk_field: Name of the attribute on ``model`` to match against.
    :return: ``(success, results)`` where ``results`` is the object returned
        by ``session_inst.exec`` (it is not materialised here) and
        ``success`` is the truthiness of that object.
    """
    pk_column = getattr(model, pk_field)
    query = select(model).where(pk_column.in_(id_str_list))
    results = await session_inst.exec(query)

    # NOTE(review): ``success`` reflects the truthiness of the result object
    # itself, which presumably does not depend on whether any row matched --
    # confirm before relying on it to detect an empty result.
    return bool(results), results
Parameters
  • id_str_list:
  • session_inst:
  • model:
  • pk_field:
Returns
async def a_insert_data_rows(data_rows, session_inst: sqlmodel.ext.asyncio.session.AsyncSession):
async def insert_data_rows(data_rows, session_inst: AsyncSession):
    """
    Insert several rows at once, falling back to row-by-row writes.

    The rows are first added and committed together. If that raises, the
    session is rolled back and each row is written individually through
    ``write_row`` so that valid rows are still persisted.

    :param data_rows: Model instances to insert. Iterated a second time on
        the fallback path.
    :param session_inst: AsyncSession used for the writes.
    :return: ``(True, data_rows)`` when the bulk commit succeeds. Otherwise
        ``(status, {"success": [...], "failed": [...]})`` where ``status``
        is ``True`` if at least one row was written individually and
        ``False`` if none were.
    """
    try:
        session_inst.add_all(data_rows)
        await session_inst.commit()

        return True, data_rows

    except Exception as e:
        logger.error(
            f"Writing data rows to table failed. See error message: "
            f"{type(e), e, e.args}"
        )
        logger.info(
            "Attempting to write individual entries. This can be a "
            "bit taxing, so please consider your payload to the DB"
        )

        await session_inst.rollback()
        processed_rows, failed_rows = [], []
        for row in data_rows:
            success, _ = await write_row(row, session_inst=session_inst)
            if success:
                processed_rows.append(row)
            else:
                failed_rows.append(row)

        # A plain bool: the previous ``(False,)`` tuple was truthy, so a
        # total failure looked like success to ``if status:`` checks.
        status = bool(processed_rows)
        return status, {"success": processed_rows, "failed": failed_rows}
Parameters
  • data_rows:
  • session_inst:
Returns
async def a_update_row( id_str: int | str, data: dict, session_inst: sqlmodel.ext.asyncio.session.AsyncSession, model: type[sqlmodel.main.SQLModel], pk_field: str = 'id'):
async def update_row(
    id_str: int | str,
    data: dict,
    session_inst: AsyncSession,
    model: type[SQLModel],
    pk_field: str = "id",
):
    """
    Load one row by ``pk_field`` and overwrite its attributes from ``data``.

    :param id_str: Value compared for equality with the ``pk_field`` column.
    :param data: Mapping of attribute name to new value; every item is
        applied with ``setattr`` before the commit is attempted.
    :param session_inst: AsyncSession used for the lookup and the commit.
    :param model: SQLModel class to select from.
    :param pk_field: Name of the attribute on ``model`` to match against.
    :return: ``(False, None)`` when no row matched; ``(True, row)`` after a
        successful commit; ``(False, row)`` when the commit raised (the
        session is rolled back and the error is logged).
    """
    lookup = select(model).where(getattr(model, pk_field) == id_str)
    found = await session_inst.exec(lookup)
    row = found.one_or_none()

    if not row:
        return False, None

    for field_name, new_value in data.items():
        setattr(row, field_name, new_value)

    try:
        session_inst.add(row)
        await session_inst.commit()
    except Exception as e:
        await session_inst.rollback()
        logger.error(
            f"Updating the data row failed. See error messages: "
            f"{type(e), e, e.args}"
        )
        return False, row

    return True, row
Parameters
  • id_str:
  • data:
  • session_inst:
  • model:
  • pk_field:
Returns
async def a_write_row( data_row: sqlmodel.main.SQLModel, session_inst: sqlmodel.ext.asyncio.session.AsyncSession):
 94async def write_row(data_row: SQLModel, session_inst: AsyncSession):
 95    """
 96    Writes a new instance of an SQLModel ORM model to the database, with an
 97    exception catch that rolls back the session in the event of failure.
 98
 99    :param data_row: SQLModel
100    :param session_inst: AsyncSession
101    :return: Tuple[bool, ScalarResult]
102    """
103    try:
104        session_inst.add(data_row)
105        await session_inst.commit()
106
107        return True, data_row
108    except Exception as e:
109        await session_inst.rollback()
110        logger.error(
111            f"Writing data row to table failed. See error message: "
112            f"{type(e), e, e.args}"
113        )
114
115        return False, None

Writes a new instance of an SQLModel ORM model to the database, with an exception catch that rolls back the session in the event of failure.

Parameters
  • data_row: SQLModel
  • session_inst: AsyncSession
Returns

Tuple[bool, SQLModel | None]: (True, data_row) on success, (False, None) on failure

class SQLModelCRUDError(builtins.Exception):
class SQLModelCRUDError(Exception):
    """
    Root of the sqlmodel-crud-utils exception hierarchy.

    The library's other exceptions derive from this class, so a single
    ``except SQLModelCRUDError`` clause catches any library-specific error.

    Example:
        >>> try:
        ...     # Any library operation
        ...     pass
        ... except SQLModelCRUDError as e:
        ...     # Handle any library error
        ...     logger.error(f"CRUD operation failed: {e}")
    """

Base exception for all sqlmodel-crud-utils errors.

This is the root exception class for the library. All other exceptions inherit from this class, allowing users to catch all library-specific errors with a single except clause.

Example:

>>> try:
...     # Any library operation
...     pass
... except SQLModelCRUDError as e:
...     # Handle any library error
...     logger.error(f"CRUD operation failed: {e}")

class RecordNotFoundError(sqlmodel_crud_utils.SQLModelCRUDError):
 54class RecordNotFoundError(SQLModelCRUDError):
 55    """
 56    Raised when a requested database record is not found.
 57
 58    This exception is raised when attempting to retrieve, update, or delete
 59    a record that doesn't exist in the database. It stores contextual
 60    information about the model, ID value, and primary key field for
 61    better debugging.
 62
 63    Attributes:
 64        model: The SQLModel class that was being queried.
 65        id_value: The primary key value that was searched for.
 66        pk_field: The name of the primary key field (default: "id").
 67
 68    Example:
 69        >>> from models import User
 70        >>> raise RecordNotFoundError(
 71        ...     model=User,
 72        ...     id_value=123,
 73        ...     pk_field="user_id"
 74        ... )
 75        RecordNotFoundError: User with user_id=123 not found
 76    """
 77
 78    def __init__(
 79        self,
 80        model: type,
 81        id_value: Any,
 82        pk_field: str = "id",
 83    ):
 84        """
 85        Initialize RecordNotFoundError.
 86
 87        Args:
 88            model: The SQLModel class being queried.
 89            id_value: The primary key value that wasn't found.
 90            pk_field: Name of the primary key field (default: "id").
 91        """
 92        self.model = model
 93        self.id_value = id_value
 94        self.pk_field = pk_field
 95
 96        # Create a helpful error message
 97        model_name = (
 98            model.__name__ if hasattr(model, "__name__") else str(model)
 99        )
100        super().__init__(f"{model_name} with {pk_field}={id_value!r} not found")

Raised when a requested database record is not found.

This exception is raised when attempting to retrieve, update, or delete a record that doesn't exist in the database. It stores contextual information about the model, ID value, and primary key field for better debugging.

Attributes: model: The SQLModel class that was being queried. id_value: The primary key value that was searched for. pk_field: The name of the primary key field (default: "id").

Example:

>>> from models import User
>>> raise RecordNotFoundError(
...     model=User,
...     id_value=123,
...     pk_field="user_id"
... )
RecordNotFoundError: User with user_id=123 not found

RecordNotFoundError(model: type, id_value: Any, pk_field: str = 'id')
 78    def __init__(
 79        self,
 80        model: type,
 81        id_value: Any,
 82        pk_field: str = "id",
 83    ):
 84        """
 85        Initialize RecordNotFoundError.
 86
 87        Args:
 88            model: The SQLModel class being queried.
 89            id_value: The primary key value that wasn't found.
 90            pk_field: Name of the primary key field (default: "id").
 91        """
 92        self.model = model
 93        self.id_value = id_value
 94        self.pk_field = pk_field
 95
 96        # Create a helpful error message
 97        model_name = (
 98            model.__name__ if hasattr(model, "__name__") else str(model)
 99        )
100        super().__init__(f"{model_name} with {pk_field}={id_value!r} not found")

Initialize RecordNotFoundError.

Args: model: The SQLModel class being queried. id_value: The primary key value that wasn't found. pk_field: Name of the primary key field (default: "id").

model
id_value
pk_field
class MultipleRecordsError(sqlmodel_crud_utils.SQLModelCRUDError):
class MultipleRecordsError(SQLModelCRUDError):
    """
    Signals that a query expected to match one record matched several.

    This typically points at a data integrity issue or an incorrect query
    filter.

    Attributes:
        model: The SQLModel class that was being queried.
        count: The number of records found.
        filters: Optional dictionary of filters that were applied.

    Example:
        >>> from models import User
        >>> raise MultipleRecordsError(
        ...     model=User,
        ...     count=3,
        ...     filters={"email": "test@example.com"}
        ... )
        MultipleRecordsError: Expected 1 User, found 3
            (filters: {'email': 'test@example.com'})
    """

    def __init__(
        self,
        model: type,
        count: int,
        filters: dict[str, Any] | None = None,
    ):
        """
        Store the query context and build the error message.

        Args:
            model: The SQLModel class being queried.
            count: Number of records found.
            filters: Optional dictionary of filters applied to the query.
        """
        self.model = model
        self.count = count
        self.filters = filters

        # Fall back to str(model) for objects without a __name__.
        try:
            label = model.__name__
        except AttributeError:
            label = str(model)

        # The filter suffix is added only for a non-empty mapping.
        suffix = f" (filters: {filters})" if filters else ""

        super().__init__(f"Expected 1 {label}, found {count}" + suffix)

Raised when multiple records are found where exactly one was expected.

This exception occurs when a query expected to return a single record returns multiple results instead. This typically indicates a data integrity issue or an incorrect query filter.

Attributes: model: The SQLModel class that was being queried. count: The number of records found. filters: Optional dictionary of filters that were applied.

Example:

>>> from models import User
>>> raise MultipleRecordsError(
...     model=User,
...     count=3,
...     filters={"email": "test@example.com"}
... )
MultipleRecordsError: Expected 1 User, found 3 (filters: {'email': 'test@example.com'})

MultipleRecordsError( model: type, count: int, filters: dict[str, typing.Any] | None = None)
127    def __init__(
128        self,
129        model: type,
130        count: int,
131        filters: dict[str, Any] | None = None,
132    ):
133        """
134        Initialize MultipleRecordsError.
135
136        Args:
137            model: The SQLModel class being queried.
138            count: Number of records found.
139            filters: Optional dictionary of filters applied to the query.
140        """
141        self.model = model
142        self.count = count
143        self.filters = filters
144
145        # Create error message
146        model_name = (
147            model.__name__ if hasattr(model, "__name__") else str(model)
148        )
149        message = f"Expected 1 {model_name}, found {count}"
150
151        if filters:
152            message += f" (filters: {filters})"
153
154        super().__init__(message)

Initialize MultipleRecordsError.

Args: model: The SQLModel class being queried. count: Number of records found. filters: Optional dictionary of filters applied to the query.

model
count
filters
class ValidationError(sqlmodel_crud_utils.SQLModelCRUDError):
class ValidationError(SQLModelCRUDError):
    """
    Raised when data validation fails before or after a database operation.

    This exception is used for various validation failures, including:
    - Invalid field values
    - Missing required fields
    - Type mismatches
    - Business logic validation failures

    Attributes:
        field: Optional name of the field that failed validation.
        value: Optional value that failed validation.
        message: Detailed error message, exactly as passed in (the text
            returned by ``str()`` additionally carries the field, value
            and per-field error details).
        errors: Optional dictionary mapping field names to error messages.

    Example:
        >>> raise ValidationError(
        ...     field="age",
        ...     value=-5,
        ...     message="Age must be a positive integer"
        ... )
        ValidationError: Validation failed for field 'age':
            Age must be a positive integer (value: -5)

        Multiple field errors:
        >>> raise ValidationError(
        ...     message="Multiple validation errors",
        ...     errors={
        ...         "email": "Invalid email format",
        ...         "age": "Must be 18 or older"
        ...     }
        ... )
    """

    def __init__(
        self,
        message: str,
        field: str | None = None,
        value: Any | None = None,
        errors: dict[str, str] | None = None,
    ):
        """
        Initialize ValidationError.

        Args:
            message: Detailed error message.
            field: Optional name of the field that failed.
            value: Optional value that caused the validation failure.
            errors: Optional dict mapping field names to error messages.
        """
        # ``message`` is a documented attribute, so it must be stored.
        self.message = message
        self.field = field
        self.value = value
        self.errors = errors

        # Build comprehensive error message
        if field:
            full_message = f"Validation failed for field '{field}': {message}"
            # ``None`` means "no value supplied"; other falsy values such
            # as 0 or "" are still reported.
            if value is not None:
                full_message += f" (value: {value!r})"
        else:
            full_message = f"Validation failed: {message}"

        if errors:
            # Distinct loop names avoid shadowing the ``field`` parameter.
            error_details = "\n".join(
                f"  - {name}: {detail}" for name, detail in errors.items()
            )
            full_message += f"\n{error_details}"

        super().__init__(full_message)

Raised when data validation fails before or after a database operation.

This exception is used for various validation failures, including:

  • Invalid field values
  • Missing required fields
  • Type mismatches
  • Business logic validation failures

Attributes: field: Optional name of the field that failed validation. value: Optional value that failed validation. message: Detailed error message. errors: Optional dictionary mapping field names to error messages.

Example:

>>> raise ValidationError(
...     field="age",
...     value=-5,
...     message="Age must be a positive integer"
... )
ValidationError: Validation failed for field 'age': Age must be a positive integer (value: -5)

Multiple field errors:
>>> raise ValidationError(
...     message="Multiple validation errors",
...     errors={
...         "email": "Invalid email format",
...         "age": "Must be 18 or older"
...     }
... )
ValidationError( message: str, field: str | None = None, value: typing.Any | None = None, errors: dict[str, str] | None = None)
192    def __init__(
193        self,
194        message: str,
195        field: str | None = None,
196        value: Any | None = None,
197        errors: dict[str, str] | None = None,
198    ):
199        """
200        Initialize ValidationError.
201
202        Args:
203            message: Detailed error message.
204            field: Optional name of the field that failed.
205            value: Optional value that caused the validation failure.
206            errors: Optional dict mapping field names to error messages.
207        """
208        self.field = field
209        self.value = value
210        self.errors = errors
211
212        # Build comprehensive error message
213        if field:
214            full_message = f"Validation failed for field '{field}': {message}"
215            if value is not None:
216                full_message += f" (value: {value!r})"
217        else:
218            full_message = f"Validation failed: {message}"
219
220        if errors:
221            error_details = "\n".join(
222                f"  - {field}: {error}" for field, error in errors.items()
223            )
224            full_message += f"\n{error_details}"
225
226        super().__init__(full_message)

Initialize ValidationError.

Args: message: Detailed error message. field: Optional name of the field that failed. value: Optional value that caused the validation failure. errors: Optional dict mapping field names to error messages.

field
value
errors
class BulkOperationError(sqlmodel_crud_utils.SQLModelCRUDError):
class BulkOperationError(SQLModelCRUDError):
    """
    Signals that a bulk database operation failed partially or completely.

    The exception records how many records were involved, how many failed,
    and the individual errors, and can render them as a readable summary.

    Attributes:
        total: Total number of records in the bulk operation.
        failed: Number of records that failed to process.
        errors: List of error messages or exception details for failed records.
        successful: Optional list of successfully processed record IDs.
        failed_records: Optional list of records that failed processing.

    Example:
        >>> raise BulkOperationError(
        ...     total=100,
        ...     failed=5,
        ...     errors=[
        ...         "Record 23: Duplicate key violation",
        ...         "Record 45: Invalid foreign key",
        ...         "Record 67: Missing required field",
        ...         "Record 89: Data too long for column",
        ...         "Record 91: Constraint violation"
        ...     ]
        ... )
        BulkOperationError: Bulk operation failed: 5/100 records failed
            (95.0% success rate)
    """

    def __init__(
        self,
        total: int,
        failed: int,
        errors: list[str | Exception],
        successful: list[Any] | None = None,
        failed_records: list[Any] | None = None,
    ):
        """
        Store the operation counts and details and build the message.

        Args:
            total: Total number of records in the operation.
            failed: Number of records that failed.
            errors: List of error messages or exceptions for failures.
            successful: Optional list of successfully processed items.
            failed_records: Optional list of records that failed.
        """
        self.total = total
        self.failed = failed
        self.errors = errors
        self.successful = successful
        self.failed_records = failed_records

        # Guard the division: a non-positive total reports a 0 rate.
        if total > 0:
            success_rate = (total - failed) / total * 100
        else:
            success_rate = 0

        super().__init__(
            f"Bulk operation failed: {failed}/{total} records failed "
            f"({success_rate:.1f}% success rate)"
        )

    def get_error_summary(self) -> str:
        """
        Render the exception message followed by a numbered error list.

        Returns:
            A multi-line string: the message, an "Error details:" heading,
            then one numbered line per entry in ``errors``. Exception
            entries are prefixed with their class name.

        Example:
            >>> try:
            ...     bulk_operation()
            ... except BulkOperationError as e:
            ...     print(e.get_error_summary())
        """
        lines = [str(self), "\nError details:"]

        for position, error in enumerate(self.errors, 1):
            if isinstance(error, Exception):
                detail = f"{type(error).__name__}: {error}"
            else:
                detail = error
            lines.append(f"  {position}. {detail}")

        return "\n".join(lines)

Raised when bulk database operations partially or completely fail.

This exception is used when processing multiple records in a batch operation, and some or all of the records fail to process. It provides detailed information about the failure rate and individual errors.

Attributes: total: Total number of records in the bulk operation. failed: Number of records that failed to process. errors: List of error messages or exception details for failed records. successful: Optional list of successfully processed record IDs. failed_records: Optional list of records that failed processing.

Example:

>>> raise BulkOperationError(
...     total=100,
...     failed=5,
...     errors=[
...         "Record 23: Duplicate key violation",
...         "Record 45: Invalid foreign key",
...         "Record 67: Missing required field",
...         "Record 89: Data too long for column",
...         "Record 91: Constraint violation"
...     ]
... )
BulkOperationError: Bulk operation failed: 5/100 records failed (95.0% success rate)

BulkOperationError( total: int, failed: int, errors: list[str | Exception], successful: list[typing.Any] | None = None, failed_records: list[typing.Any] | None = None)
259    def __init__(
260        self,
261        total: int,
262        failed: int,
263        errors: list[str | Exception],
264        successful: list[Any] | None = None,
265        failed_records: list[Any] | None = None,
266    ):
267        """
268        Initialize BulkOperationError.
269
270        Args:
271            total: Total number of records in the operation.
272            failed: Number of records that failed.
273            errors: List of error messages or exceptions for failures.
274            successful: Optional list of successfully processed items.
275            failed_records: Optional list of records that failed.
276        """
277        self.total = total
278        self.failed = failed
279        self.errors = errors
280        self.successful = successful
281        self.failed_records = failed_records
282
283        # Calculate success rate for message
284        success_rate = ((total - failed) / total * 100) if total > 0 else 0
285
286        message = (
287            f"Bulk operation failed: {failed}/{total} records failed "
288            f"({success_rate:.1f}% success rate)"
289        )
290
291        super().__init__(message)

Initialize BulkOperationError.

Args: total: Total number of records in the operation. failed: Number of records that failed. errors: List of error messages or exceptions for failures. successful: Optional list of successfully processed items. failed_records: Optional list of records that failed.

total
failed
errors
successful
failed_records
def get_error_summary(self) -> str:
293    def get_error_summary(self) -> str:
294        """
295        Get a formatted summary of all errors.
296
297        Returns:
298            A multi-line string with formatted error details.
299
300        Example:
301            >>> try:
302            ...     bulk_operation()
303            ... except BulkOperationError as e:
304            ...     print(e.get_error_summary())
305        """
306        summary = [str(self)]
307        summary.append("\nError details:")
308
309        for i, error in enumerate(self.errors, 1):
310            if isinstance(error, Exception):
311                summary.append(f"  {i}. {type(error).__name__}: {error}")
312            else:
313                summary.append(f"  {i}. {error}")
314
315        return "\n".join(summary)

Get a formatted summary of all errors.

Returns: A multi-line string with formatted error details.

Example:

try: ... bulk_operation() ... except BulkOperationError as e: ... print(e.get_error_summary())

class TransactionError(sqlmodel_crud_utils.SQLModelCRUDError):
class TransactionError(SQLModelCRUDError):
    """
    Signals that a database transaction operation did not succeed.

    Typical situations in which this exception is used:
    - Failed commits
    - Rollback errors
    - Nested transaction issues
    - Deadlock scenarios
    - Connection issues during transaction

    Attributes:
        operation: The transaction operation that failed
            (e.g., "commit", "rollback").
        original_error: The underlying exception that caused the
            failure.

    Example:
        >>> try:
        ...     session.commit()
        ... except Exception as e:
        ...     raise TransactionError(
        ...         operation="commit",
        ...         original_error=e
        ...     )
        TransactionError: Transaction operation 'commit' failed:
            IntegrityError(...)
    """

    def __init__(
        self,
        message: str | None = None,
        operation: str | None = None,
        original_error: Exception | None = None,
    ):
        """
        Store the failure context and derive the exception message.

        Args:
            message: Optional custom error message; when given (and
                non-empty) it is used verbatim.
            operation: The transaction operation that failed.
            original_error: The underlying exception that caused the failure.
        """
        self.operation = operation
        self.original_error = original_error
        super().__init__(
            self._compose_message(message, operation, original_error)
        )

    @staticmethod
    def _compose_message(
        message: str | None,
        operation: str | None,
        original_error: Exception | None,
    ) -> str:
        """Pick the most specific message the supplied details allow."""
        if message:
            return message
        if operation and original_error:
            error_name = type(original_error).__name__
            return (
                f"Transaction operation '{operation}' failed: "
                f"{error_name}({original_error})"
            )
        if operation:
            return f"Transaction operation '{operation}' failed"
        if original_error:
            return f"Transaction failed: {original_error}"
        return "Transaction operation failed"

Raised when database transaction operations fail.

This exception is used for transaction-related failures, including:

  • Failed commits
  • Rollback errors
  • Nested transaction issues
  • Deadlock scenarios
  • Connection issues during transaction

Attributes: operation: The transaction operation that failed (e.g., "commit", "rollback"). original_error: The underlying exception that caused the failure.

Example:

try: ... session.commit() ... except Exception as e: ... raise TransactionError( ... operation="commit", ... original_error=e ... ) TransactionError: Transaction operation 'commit' failed: IntegrityError(...)

TransactionError( message: str | None = None, operation: str | None = None, original_error: Exception | None = None)
347    def __init__(
348        self,
349        message: str | None = None,
350        operation: str | None = None,
351        original_error: Exception | None = None,
352    ):
353        """
354        Initialize TransactionError.
355
356        Args:
357            message: Optional custom error message.
358            operation: The transaction operation that failed.
359            original_error: The underlying exception that caused the failure.
360        """
361        self.operation = operation
362        self.original_error = original_error
363
364        # Build error message
365        if message:
366            full_message = message
367        elif operation and original_error:
368            error_name = type(original_error).__name__
369            full_message = (
370                f"Transaction operation '{operation}' failed: "
371                f"{error_name}({original_error})"
372            )
373        elif operation:
374            full_message = f"Transaction operation '{operation}' failed"
375        elif original_error:
376            full_message = f"Transaction failed: {original_error}"
377        else:
378            full_message = "Transaction operation failed"
379
380        super().__init__(full_message)

Initialize TransactionError.

Args: message: Optional custom error message. operation: The transaction operation that failed. original_error: The underlying exception that caused the failure.

operation
original_error
@contextmanager
def transaction( session: sqlmodel.orm.session.Session) -> Generator[sqlmodel.orm.session.Session, NoneType, NoneType]:
@contextmanager
def transaction(session: Session) -> Generator[Session, None, None]:
    """Context manager for synchronous database transactions.

    Commits the session when the managed block finishes without raising,
    and rolls it back when the block (or the commit itself) raises. The
    failure is re-raised as a TransactionError chained to the original
    exception.

    Args:
        session: An active SQLModel Session instance

    Yields:
        Session: The same session instance for use within the context

    Raises:
        TransactionError: Wraps any exception that occurs during
            the transaction, with the original exception available
            via ``__cause__`` and ``original_error``. ``operation`` is
            "commit" when the commit failed, "rollback" when the
            rollback itself failed, and None when the managed block
            raised.

    Example:
        Basic usage with automatic commit:

        >>> from sqlmodel import Session, create_engine
        >>> from sqlmodel_crud_utils import write_row, update_row
        >>> from sqlmodel_crud_utils.transactions import transaction
        >>>
        >>> engine = create_engine("sqlite:///database.db")
        >>> with Session(engine) as session:
        ...     with transaction(session) as tx:
        ...         user = write_row(User(name="Alice"), tx)
        ...         update_row(
        ...             user.id, {"email": "alice@example.com"}, User, tx
        ...         )
        ...         # Automatically commits here if no exceptions

        Error handling with automatic rollback:

        >>> from sqlmodel import Session
        >>> from sqlmodel_crud_utils.transactions import transaction
        >>> from sqlmodel_crud_utils.exceptions import TransactionError
        >>>
        >>> with Session(engine) as session:
        ...     try:
        ...         with transaction(session) as tx:
        ...             user = write_row(User(name="Bob"), tx)
        ...             # Some operation that fails
        ...             raise ValueError("Invalid email format")
        ...     except TransactionError as e:
        ...         print(f"Transaction failed: {e}")
        ...         print(f"Original error: {e.__cause__}")
        ...         # Database automatically rolled back

        Multiple operations in a single transaction:

        >>> with Session(engine) as session:
        ...     with transaction(session) as tx:
        ...         # All operations succeed together or all are rolled back
        ...         user = write_row(User(name="Charlie"), tx)
        ...         profile = write_row(Profile(user_id=user.id), tx)
        ...         update_row(user.id, {"active": True}, User, tx)

    Notes:
        - The session is committed only if no exceptions occur
        - Any exception triggers an automatic rollback before re-raising
        - The session remains usable after the context exits
        - Nested transactions are not supported; use savepoints instead
        - The session must not be in an existing transaction when entering
    """
    # Tracks whether the failure came from the commit or from the caller's
    # block, so the raised error can say which operation failed.
    committing = False
    try:
        yield session
        committing = True
        session.commit()
    except Exception as exc:
        try:
            session.rollback()
        except Exception as rollback_exc:
            # Keep the documented contract (always a TransactionError chained
            # to the original failure) even when the rollback itself fails.
            raise TransactionError(
                operation="rollback",
                original_error=rollback_exc,
            ) from exc
        raise TransactionError(
            f"Transaction failed: {exc}",
            operation="commit" if committing else None,
            original_error=exc,
        ) from exc

Context manager for synchronous database transactions.

Automatically commits the transaction on successful completion or rolls back on any exception. The original exception is wrapped in a TransactionError to provide additional context while preserving the exception chain.

Args: session: An active SQLModel Session instance

Yields: Session: The same session instance for use within the context

Raises: TransactionError: Wraps any exception that occurs during the transaction, with the original exception available via __cause__

Example: Basic usage with automatic commit:

>>> from sqlmodel import Session, create_engine
>>> from sqlmodel_crud_utils import write_row, update_row
>>> from sqlmodel_crud_utils.transactions import transaction
>>>
>>> engine = create_engine("sqlite:///database.db")
>>> with Session(engine) as session:
...     with transaction(session) as tx:
...         user = write_row(User(name="Alice"), tx)
...         update_row(
...             user.id, {"email": "alice@example.com"}, User, tx
...         )
...         # Automatically commits here if no exceptions

Error handling with automatic rollback:

>>> from sqlmodel import Session
>>> from sqlmodel_crud_utils.transactions import transaction
>>> from sqlmodel_crud_utils.exceptions import TransactionError
>>>
>>> with Session(engine) as session:
...     try:
...         with transaction(session) as tx:
...             user = write_row(User(name="Bob"), tx)
...             # Some operation that fails
...             raise ValueError("Invalid email format")
...     except TransactionError as e:
...         print(f"Transaction failed: {e}")
...         print(f"Original error: {e.__cause__}")
...         # Database automatically rolled back

Multiple operations in a single transaction:

>>> with Session(engine) as session:
...     with transaction(session) as tx:
...         # All operations succeed together or all are rolled back
...         user = write_row(User(name="Charlie"), tx)
...         profile = write_row(Profile(user_id=user.id), tx)
...         update_row(user.id, {"active": True}, User, tx)

Notes: - The session is committed only if no exceptions occur - Any exception triggers an automatic rollback before re-raising - The session remains usable after the context exits - Nested transactions are not supported; use savepoints instead - The session must not be in an existing transaction when entering

@asynccontextmanager
async def a_transaction( session: sqlmodel.ext.asyncio.session.AsyncSession) -> AsyncGenerator[sqlmodel.ext.asyncio.session.AsyncSession, NoneType]:
 95@asynccontextmanager
 96async def a_transaction(
 97    session: AsyncSession,
 98) -> AsyncGenerator[AsyncSession, None]:
 99    """Context manager for asynchronous database transactions.
100
101    Asynchronous version of the transaction() context manager. Provides
102    the same automatic commit/rollback behavior for async database
103    operations.
104
105    Args:
106        session: An active AsyncSession instance from
107            sqlmodel.ext.asyncio.session
108
109    Yields:
110        AsyncSession: The same session instance for use within the
111            async context
112
113    Raises:
114        TransactionError: Wraps any exception that occurs during
115            the transaction, with the original exception available
116            via __cause__
117
118    Example:
119        Basic async usage with automatic commit:
120
121        >>> from sqlmodel.ext.asyncio.session import AsyncSession
122        >>> from sqlalchemy.ext.asyncio import create_async_engine
123        >>> from sqlmodel_crud_utils import a_write_row, a_update_row
124        >>> from sqlmodel_crud_utils.transactions import a_transaction
125        >>>
126        >>> engine = create_async_engine("sqlite+aiosqlite:///database.db")
127        >>> async with AsyncSession(engine) as session:
128        ...     async with a_transaction(session) as tx:
129        ...         user = await a_write_row(
130        ...             User(name="Alice"), tx
131        ...         )
132        ...         await a_update_row(
133        ...             user.id, {"email": "alice@example.com"},
134        ...             User, tx
135        ...         )
136        ...         # Automatically commits here if no exceptions
137
138        Error handling with automatic rollback:
139
140        >>> from sqlmodel.ext.asyncio.session import AsyncSession
141        >>> from sqlmodel_crud_utils.transactions import a_transaction
142        >>> from sqlmodel_crud_utils.exceptions import TransactionError
143        >>>
144        >>> async with AsyncSession(engine) as session:
145        ...     try:
146        ...         async with a_transaction(session) as tx:
147        ...             user = await a_write_row(User(name="Bob"), tx)
148        ...             # Some async operation that fails
149        ...             raise ValueError("Invalid email format")
150        ...     except TransactionError as e:
151        ...         print(f"Transaction failed: {e}")
152        ...         print(f"Original error: {e.__cause__}")
153        ...         # Database automatically rolled back
154
155        Multiple async operations in a single transaction:
156
157        >>> async with AsyncSession(engine) as session:
158        ...     async with a_transaction(session) as tx:
159        ...         # All operations succeed together or all are rolled back
160        ...         user = await a_write_row(User(name="Charlie"), tx)
161        ...         profile = await a_write_row(Profile(user_id=user.id), tx)
162        ...         await a_update_row(user.id, {"active": True}, User, tx)
163
164        Using with FastAPI dependency injection:
165
166        >>> from fastapi import Depends, FastAPI
167        >>> from sqlmodel.ext.asyncio.session import AsyncSession
168        >>> from sqlmodel_crud_utils.transactions import a_transaction
169        >>>
170        >>> app = FastAPI()
171        >>>
172        >>> async def get_session():
173        ...     async with AsyncSession(engine) as session:
174        ...         yield session
175        >>>
176        >>> @app.post("/users")
177        >>> async def create_user(
178        ...     user_data: dict,
179        ...     session: AsyncSession = Depends(get_session)
180        ... ):
181        ...     async with a_transaction(session) as tx:
182        ...         user = await a_write_row(User(**user_data), tx)
183        ...         await a_write_row(AuditLog(action="user_created"), tx)
184        ...         return user
185
186    Notes:
187        - The session is committed only if no exceptions occur
188        - Any exception triggers an automatic rollback before re-raising
189        - The session remains usable after the context exits
190        - Nested transactions are not supported; use savepoints instead
191        - The session must not be in an existing transaction when entering
192        - Works seamlessly with asyncio, FastAPI, and other async frameworks
193    """
194    try:
195        yield session
196        await session.commit()
197    except Exception as e:
198        await session.rollback()
199        raise TransactionError(f"Transaction failed: {e}") from e

Context manager for asynchronous database transactions.

Asynchronous version of the transaction() context manager. Provides the same automatic commit/rollback behavior for async database operations.

Args: session: An active AsyncSession instance from sqlmodel.ext.asyncio.session

Yields: AsyncSession: The same session instance for use within the async context

Raises: TransactionError: Wraps any exception that occurs during the transaction, with the original exception available via __cause__

Example: Basic async usage with automatic commit:

>>> from sqlmodel.ext.asyncio.session import AsyncSession
>>> from sqlalchemy.ext.asyncio import create_async_engine
>>> from sqlmodel_crud_utils import a_write_row, a_update_row
>>> from sqlmodel_crud_utils.transactions import a_transaction
>>>
>>> engine = create_async_engine("sqlite+aiosqlite:///database.db")
>>> async with AsyncSession(engine) as session:
...     async with a_transaction(session) as tx:
...         user = await a_write_row(
...             User(name="Alice"), tx
...         )
...         await a_update_row(
...             user.id, {"email": "alice@example.com"},
...             User, tx
...         )
...         # Automatically commits here if no exceptions

Error handling with automatic rollback:

>>> from sqlmodel.ext.asyncio.session import AsyncSession
>>> from sqlmodel_crud_utils.transactions import a_transaction
>>> from sqlmodel_crud_utils.exceptions import TransactionError
>>>
>>> async with AsyncSession(engine) as session:
...     try:
...         async with a_transaction(session) as tx:
...             user = await a_write_row(User(name="Bob"), tx)
...             # Some async operation that fails
...             raise ValueError("Invalid email format")
...     except TransactionError as e:
...         print(f"Transaction failed: {e}")
...         print(f"Original error: {e.__cause__}")
...         # Database automatically rolled back

Multiple async operations in a single transaction:

>>> async with AsyncSession(engine) as session:
...     async with a_transaction(session) as tx:
...         # All operations succeed together or all are rolled back
...         user = await a_write_row(User(name="Charlie"), tx)
...         profile = await a_write_row(Profile(user_id=user.id), tx)
...         await a_update_row(user.id, {"active": True}, User, tx)

Using with FastAPI dependency injection:

>>> from fastapi import Depends, FastAPI
>>> from sqlmodel.ext.asyncio.session import AsyncSession
>>> from sqlmodel_crud_utils.transactions import a_transaction
>>>
>>> app = FastAPI()
>>>
>>> async def get_session():
...     async with AsyncSession(engine) as session:
...         yield session
>>>
>>> @app.post("/users")
... async def create_user(
...     user_data: dict,
...     session: AsyncSession = Depends(get_session)
... ):
...     async with a_transaction(session) as tx:
...         user = await a_write_row(User(**user_data), tx)
...         await a_write_row(AuditLog(action="user_created"), tx)
...         return user

Notes: - The session is committed only if no exceptions occur - Any exception triggers an automatic rollback before re-raising - The session remains usable after the context exits - Nested transactions are not supported; use savepoints instead - The session must not be in an existing transaction when entering - Works seamlessly with asyncio, FastAPI, and other async frameworks

class AuditMixin:
class AuditMixin:
    """Mixin for automatic audit trail tracking.

    Adds timestamp fields to track when records are created and updated,
    along with optional user tracking fields.

    Attributes:
        created_at: Timestamp when the record was created (auto-set)
        updated_at: Timestamp when the record was last updated
            (auto-set on update)
        created_by: Optional username/ID of creator
        updated_by: Optional username/ID of last updater

    Example:
        Basic usage:

        >>> from typing import Optional
        >>> from sqlmodel import SQLModel, Field
        >>> from sqlmodel_crud_utils.mixins import AuditMixin
        >>>
        >>> class User(SQLModel, AuditMixin, table=True):
        ...     id: Optional[int] = Field(default=None, primary_key=True)
        ...     name: str
        ...     email: str
        >>>
        >>> # When you create a User, created_at is automatically set
        >>> user = User(name="Alice", email="alice@example.com")
        >>> # user.created_at will be set to current UTC time

        With user tracking:

        >>> user = User(
        ...     name="Bob",
        ...     email="bob@example.com",
        ...     created_by="admin"
        ... )
        >>> # Later when updating
        >>> user.name = "Robert"
        >>> user.updated_by = "admin"
        >>> # updated_at will be automatically set on commit

    Note:
        The `updated_at` field uses SQLAlchemy's `onupdate` parameter to
        automatically update the timestamp when the record is modified.
    """

    # NOTE(review): this mixin is a plain class rather than a SQLModel
    # subclass - confirm the fields below are collected when it is mixed
    # into a table model as shown in the example.

    # Filled in by the `_utc_now` factory when no value is supplied;
    # declared non-nullable and indexed.
    created_at: datetime = Field(
        default_factory=_utc_now,
        nullable=False,
        index=True,
        description="Timestamp when the record was created",
    )
    # Starts as None; `_utc_now` is passed to the column as its `onupdate`
    # callable via `sa_column_kwargs`.
    updated_at: Optional[datetime] = Field(
        default=None,
        sa_column_kwargs={"onupdate": _utc_now},
        description="Timestamp when the record was last updated",
    )
    # Optional creator identifier, limited to 100 characters.
    created_by: Optional[str] = Field(
        default=None,
        max_length=100,
        description="Username or ID of the user who created this record",
    )
    # Optional last-updater identifier, limited to 100 characters; nothing
    # in this class sets it automatically.
    updated_by: Optional[str] = Field(
        default=None,
        max_length=100,
        description="Username or ID of the user who last updated this record",
    )

Mixin for automatic audit trail tracking.

Adds timestamp fields to track when records are created and updated, along with optional user tracking fields.

Attributes: created_at: Timestamp when the record was created (auto-set) updated_at: Timestamp when the record was last updated (auto-set on update) created_by: Optional username/ID of creator updated_by: Optional username/ID of last updater

Example: Basic usage:

>>> from typing import Optional
>>> from sqlmodel import SQLModel, Field
>>> from sqlmodel_crud_utils.mixins import AuditMixin
>>>
>>> class User(SQLModel, AuditMixin, table=True):
...     id: Optional[int] = Field(default=None, primary_key=True)
...     name: str
...     email: str
>>>
>>> # When you create a User, created_at is automatically set
>>> user = User(name="Alice", email="alice@example.com")
>>> # user.created_at will be set to current UTC time

With user tracking:

>>> user = User(
...     name="Bob",
...     email="bob@example.com",
...     created_by="admin"
... )
>>> # Later when updating
>>> user.name = "Robert"
>>> user.updated_by = "admin"
>>> # updated_at will be automatically set on commit

Note: The updated_at field uses SQLAlchemy's onupdate parameter to automatically update the timestamp when the record is modified.

created_at: datetime.datetime = FieldInfo(annotation=NoneType, required=False, default_factory=_utc_now, description='Timestamp when the record was created')
updated_at: Optional[datetime.datetime] = FieldInfo(annotation=NoneType, required=False, default=None, description='Timestamp when the record was last updated')
created_by: Optional[str] = FieldInfo(annotation=NoneType, required=False, default=None, description='Username or ID of the user who created this record', metadata=[MaxLen(max_length=100)])
updated_by: Optional[str] = FieldInfo(annotation=NoneType, required=False, default=None, description='Username or ID of the user who last updated this record', metadata=[MaxLen(max_length=100)])
class SoftDeleteMixin:
class SoftDeleteMixin:
    """Mixin for soft delete functionality.

    Adds fields and methods to support soft deletion, where records are marked
    as deleted but not actually removed from the database. This allows for
    recovery and maintains referential integrity.

    Attributes:
        deleted_at: Timestamp when the record was soft-deleted
        deleted_by: Optional username/ID of user who deleted the record
        is_deleted: Boolean flag indicating if record is deleted

    Methods:
        soft_delete: Mark the record as deleted
        restore: Restore a soft-deleted record

    Example:
        Basic usage:

        >>> from typing import Optional
        >>> from sqlmodel import SQLModel, Field
        >>> from sqlmodel_crud_utils.mixins import SoftDeleteMixin
        >>>
        >>> class Product(SQLModel, SoftDeleteMixin, table=True):
        ...     id: Optional[int] = Field(default=None, primary_key=True)
        ...     name: str
        ...     price: float
        >>>
        >>> product = Product(name="Widget", price=9.99)
        >>> # Later, soft delete it
        >>> product.soft_delete(user="admin")
        >>> # product.is_deleted is now True
        >>> # product.deleted_at is set to current time
        >>> # product.deleted_by is "admin"

        Restoring a deleted record:

        >>> product.restore()
        >>> # product.is_deleted is now False
        >>> # product.deleted_at is None
        >>> # product.deleted_by is None

        Combining with AuditMixin:

        >>> class Order(SQLModel, AuditMixin, SoftDeleteMixin, table=True):
        ...     id: Optional[int] = Field(default=None, primary_key=True)
        ...     total: float
        >>>
        >>> # Now Order has both audit trail and soft delete support

    Note:
        When querying, you'll typically want to filter out soft-deleted records:
        `select(Product).where(Product.is_deleted == False)`

        The `get_rows` function in this library can be extended to automatically
        exclude soft-deleted records by default.
    """

    # NOTE(review): this mixin is a plain class rather than a SQLModel
    # subclass - confirm the fields below are collected when it is mixed
    # into a table model as shown in the example.

    # None until soft_delete() is called; indexed.
    deleted_at: Optional[datetime] = Field(
        default=None,
        index=True,
        description="Timestamp when the record was soft-deleted",
    )
    # Optional identifier of whoever deleted the record, limited to
    # 100 characters.
    deleted_by: Optional[str] = Field(
        default=None,
        max_length=100,
        description="Username or ID of the user who deleted this record",
    )
    # Defaults to False (record is live); indexed.
    is_deleted: bool = Field(
        default=False,
        index=True,
        description="Flag indicating if the record is soft-deleted",
    )

    def soft_delete(self, user: Optional[str] = None) -> None:
        """Mark this record as deleted.

        Sets the is_deleted flag to True, records the deletion
        timestamp, and optionally tracks which user performed the
        deletion.

        Args:
            user: Optional username or user ID of the person
                performing the deletion

        Example:
            >>> product.soft_delete(user="admin")
            >>> assert product.is_deleted is True
            >>> assert product.deleted_at is not None
            >>> assert product.deleted_by == "admin"

        Note:
            This method only modifies the object in memory. You must still
            commit the session to persist the changes to the database.
            Calling it again overwrites the stored timestamp and user.
        """
        self.is_deleted = True
        self.deleted_at = _utc_now()
        self.deleted_by = user

    def restore(self) -> None:
        """Restore a soft-deleted record.

        Clears the deletion flag and resets all deletion-related fields
        to their default values.

        Example:
            >>> product.restore()
            >>> assert product.is_deleted is False
            >>> assert product.deleted_at is None
            >>> assert product.deleted_by is None

        Note:
            This method only modifies the object in memory. You must still
            commit the session to persist the changes to the database.
        """
        self.is_deleted = False
        self.deleted_at = None
        self.deleted_by = None

Mixin for soft delete functionality.

Adds fields and methods to support soft deletion, where records are marked as deleted but not actually removed from the database. This allows for recovery and maintains referential integrity.

Attributes: deleted_at: Timestamp when the record was soft-deleted deleted_by: Optional username/ID of user who deleted the record is_deleted: Boolean flag indicating if record is deleted

Methods: soft_delete: Mark the record as deleted restore: Restore a soft-deleted record

Example: Basic usage:

>>> from typing import Optional
>>> from sqlmodel import SQLModel, Field
>>> from sqlmodel_crud_utils.mixins import SoftDeleteMixin
>>>
>>> class Product(SQLModel, SoftDeleteMixin, table=True):
...     id: Optional[int] = Field(default=None, primary_key=True)
...     name: str
...     price: float
>>>
>>> product = Product(name="Widget", price=9.99)
>>> # Later, soft delete it
>>> product.soft_delete(user="admin")
>>> # product.is_deleted is now True
>>> # product.deleted_at is set to current time
>>> # product.deleted_by is "admin"

Restoring a deleted record:

>>> product.restore()
>>> # product.is_deleted is now False
>>> # product.deleted_at is None
>>> # product.deleted_by is None

Combining with AuditMixin:

>>> class Order(SQLModel, AuditMixin, SoftDeleteMixin, table=True):
...     id: Optional[int] = Field(default=None, primary_key=True)
...     total: float
>>>
>>> # Now Order has both audit trail and soft delete support

Note: When querying, you'll typically want to filter out soft-deleted records: select(Product).where(Product.is_deleted == False)

The `get_rows` function in this library can be extended to automatically
exclude soft-deleted records by default.
deleted_at: Optional[datetime.datetime] = FieldInfo(annotation=NoneType, required=False, default=None, description='Timestamp when the record was soft-deleted')
deleted_by: Optional[str] = FieldInfo(annotation=NoneType, required=False, default=None, description='Username or ID of the user who deleted this record', metadata=[MaxLen(max_length=100)])
is_deleted: bool = FieldInfo(annotation=NoneType, required=False, default=False, description='Flag indicating if the record is soft-deleted')
def soft_delete(self, user: Optional[str] = None) -> None:
160    def soft_delete(self, user: Optional[str] = None) -> None:
161        """Mark this record as deleted.
162
163        Sets the is_deleted flag to True, records the deletion
164        timestamp, and optionally tracks which user performed the
165        deletion.
166
167        Args:
168            user: Optional username or user ID of the person
169                performing the deletion
170
171        Example:
172            >>> product.soft_delete(user="admin")
173            >>> assert product.is_deleted is True
174            >>> assert product.deleted_at is not None
175            >>> assert product.deleted_by == "admin"
176
177        Note:
178            This method only modifies the object in memory. You must still
179            commit the session to persist the changes to the database.
180        """
181        self.is_deleted = True
182        self.deleted_at = _utc_now()
183        self.deleted_by = user

Mark this record as deleted.

Sets the is_deleted flag to True, records the deletion timestamp, and optionally tracks which user performed the deletion.

Args: user: Optional username or user ID of the person performing the deletion

Example:

product.soft_delete(user="admin") assert product.is_deleted is True assert product.deleted_at is not None assert product.deleted_by == "admin"

Note: This method only modifies the object in memory. You must still commit the session to persist the changes to the database.

def restore(self) -> None:
185    def restore(self) -> None:
186        """Restore a soft-deleted record.
187
188        Clears the deletion flag and resets all deletion-related fields
189        to their default values.
190
191        Example:
192            >>> product.restore()
193            >>> assert product.is_deleted is False
194            >>> assert product.deleted_at is None
195            >>> assert product.deleted_by is None
196
197        Note:
198            This method only modifies the object in memory. You must still
199            commit the session to persist the changes to the database.
200        """
201        self.is_deleted = False
202        self.deleted_at = None
203        self.deleted_by = None

Restore a soft-deleted record.

Clears the deletion flag and resets all deletion-related fields to their default values.

Example:

product.restore() assert product.is_deleted is False assert product.deleted_at is None assert product.deleted_by is None

Note: This method only modifies the object in memory. You must still commit the session to persist the changes to the database.