sqlmodel_crud_utils
SQLModel CRUD Utilities
A set of CRUD utilities to expedite operations with SQLModel, providing both synchronous and asynchronous support for common database operations.
New in v0.2.0:
- Custom exception hierarchy for better error handling
- Transaction context managers for safe operations
- Audit trail mixins (created_at, updated_at tracking)
- Soft delete support (is_deleted flag)
- Public API exports for easier imports
1""" 2SQLModel CRUD Utilities 3 4A set of CRUD utilities to expedite operations with SQLModel, providing both 5synchronous and asynchronous support for common database operations. 6 7New in v0.2.0: 8 - Custom exception hierarchy for better error handling 9 - Transaction context managers for safe operations 10 - Audit trail mixins (created_at, updated_at tracking) 11 - Soft delete support (is_deleted flag) 12 - Public API exports for easier imports 13""" 14 15__version__ = "0.2.0" 16 17# Import asynchronous functions with a_ prefix 18from sqlmodel_crud_utils.a_sync import ( 19 bulk_upsert_mappings as a_bulk_upsert_mappings, 20) 21from sqlmodel_crud_utils.a_sync import delete_row as a_delete_row 22from sqlmodel_crud_utils.a_sync import get_one_or_create as a_get_one_or_create 23from sqlmodel_crud_utils.a_sync import ( 24 get_result_from_query as a_get_result_from_query, 25) 26from sqlmodel_crud_utils.a_sync import get_row as a_get_row 27from sqlmodel_crud_utils.a_sync import get_rows as a_get_rows 28from sqlmodel_crud_utils.a_sync import ( 29 get_rows_within_id_list as a_get_rows_within_id_list, 30) 31from sqlmodel_crud_utils.a_sync import insert_data_rows as a_insert_data_rows 32from sqlmodel_crud_utils.a_sync import update_row as a_update_row 33from sqlmodel_crud_utils.a_sync import write_row as a_write_row 34 35# Import exceptions 36from sqlmodel_crud_utils.exceptions import ( 37 BulkOperationError, 38 MultipleRecordsError, 39 RecordNotFoundError, 40 SQLModelCRUDError, 41 TransactionError, 42 ValidationError, 43) 44 45# Import mixins 46from sqlmodel_crud_utils.mixins import AuditMixin, SoftDeleteMixin 47 48# Import synchronous functions 49from sqlmodel_crud_utils.sync import ( 50 bulk_upsert_mappings, 51 delete_row, 52 get_one_or_create, 53 get_result_from_query, 54 get_row, 55 get_rows, 56 get_rows_within_id_list, 57 insert_data_rows, 58 update_row, 59 write_row, 60) 61 62# Import transaction managers 63from sqlmodel_crud_utils.transactions import a_transaction, 
transaction 64 65__all__ = [ 66 # Version 67 "__version__", 68 # Synchronous functions 69 "bulk_upsert_mappings", 70 "delete_row", 71 "get_one_or_create", 72 "get_result_from_query", 73 "get_row", 74 "get_rows", 75 "get_rows_within_id_list", 76 "insert_data_rows", 77 "update_row", 78 "write_row", 79 # Asynchronous functions 80 "a_bulk_upsert_mappings", 81 "a_delete_row", 82 "a_get_one_or_create", 83 "a_get_result_from_query", 84 "a_get_row", 85 "a_get_rows", 86 "a_get_rows_within_id_list", 87 "a_insert_data_rows", 88 "a_update_row", 89 "a_write_row", 90 # Exceptions 91 "SQLModelCRUDError", 92 "RecordNotFoundError", 93 "MultipleRecordsError", 94 "ValidationError", 95 "BulkOperationError", 96 "TransactionError", 97 # Transaction managers 98 "transaction", 99 "a_transaction", 100 # Mixins 101 "AuditMixin", 102 "SoftDeleteMixin", 103]
def bulk_upsert_mappings(
    payload: list,
    session_inst: Session,
    model: type[SQLModel],
    pk_fields: list[str] | None = None,
):
    """
    Insert the rows in ``payload``, updating rows that conflict on the key.

    A single ``INSERT ... ON CONFLICT DO UPDATE`` statement is built for
    ``model``. On conflict, every column named in the first mapping of
    ``payload`` is overwritten with the incoming ("excluded") value.

    :param payload: list of mappings (column name -> value), one per row. The
        keys of the first mapping decide which columns are updated on
        conflict.
    :param session_inst: Session used to execute and commit the statement.
    :param model: SQLModel class to upsert into.
    :param pk_fields: names of the conflict-target columns; defaults to
        ``["id"]`` when omitted or empty.
    :return: ``(True, rows)`` where ``rows`` is the list produced by the
        statement's RETURNING clause, or ``(False, [])`` when ``payload`` is
        empty and nothing was executed.
    """
    if not payload:
        # Nothing to write. Without this guard payload[0] below raises
        # IndexError.
        return False, []
    if not pk_fields:
        pk_fields = ["id"]
    stmnt = upsert(model).values(payload)
    stmnt = stmnt.on_conflict_do_update(
        index_elements=[getattr(model, x) for x in pk_fields],
        set_={k: getattr(stmnt.excluded, k) for k in payload[0]},
    )

    # Execute the upsert exactly once, with RETURNING, instead of running it
    # a first time and then again to collect the rows.
    results = session_inst.scalars(
        stmnt.returning(model), execution_options={"populate_existing": True}
    )

    session_inst.commit()

    return True, results.all()
Parameters
- payload:
- session_inst:
- model:
- pk_fields:
Returns
def delete_row(
    id_str: str | int,
    session_inst: Session,
    model: type[SQLModel],
    pk_field: str = "id",
):
    """
    Delete the row of ``model`` whose ``pk_field`` equals ``id_str``.

    :param id_str: key value of the row to delete.
    :param session_inst: Session used to look up, delete and commit.
    :param model: SQLModel class to delete from.
    :param pk_field: name of the column compared against ``id_str``.
    :return: True when a row was found, deleted and committed; False when no
        row matched or the delete failed (the session is rolled back and the
        error is logged).
    """
    query = select(model).where(getattr(model, pk_field) == id_str)
    target = session_inst.exec(query).one_or_none()

    # Guard clause: nothing matched, so there is nothing to delete.
    if not target:
        return False

    try:
        session_inst.delete(target)
        session_inst.commit()
    except Exception as e:
        logger.error(
            f"Failed to delete data row. Please see error messages here: "
            f"{type(e), e, e.args}"
        )
        session_inst.rollback()
        return False

    return True
Parameters
- id_str:
- session_inst:
- model:
- pk_field:
Returns
def get_one_or_create(
    session_inst: Session,
    model: type[SQLModel],
    create_method_kwargs: dict | None = None,
    selectin: bool = False,
    select_in_key: str | None = None,
    **kwargs,
):
    """
    Return the row matching ``kwargs``, creating and committing it if absent.

    :param session_inst: Session used for the lookup and, if needed, the
        insert.
    :param model: SQLModel class to query / instantiate.
    :param create_method_kwargs: extra attribute values applied only when a
        new row is created (they are not part of the lookup).
    :param selectin: when True (and ``select_in_key`` is set) a found row is
        fetched again with a ``selectinload`` option for that attribute.
    :param select_in_key: name of the attribute to load with ``selectinload``.
    :param kwargs: exact-match filters for the lookup; also used as attribute
        values of a newly created row.
    :return: ``(row, True)`` for an existing row, ``(row, False)`` for a row
        that was just created.
    """
    lookup = select(model).filter_by(**kwargs)
    found = get_result_from_query(query=lookup, session=session_inst)

    if found and selectin and select_in_key:
        # Fetch the same row again, this time with the loader option.
        eager_lookup = lookup.options(
            selectinload(getattr(model, select_in_key))
        )
        found = get_result_from_query(
            query=eager_lookup, session=session_inst
        )

    if found:
        return found, True

    # No match: build a fresh instance from the filters plus the extras.
    kwargs.update(create_method_kwargs or {})
    created = model()
    for attr_name, attr_value in kwargs.items():
        setattr(created, attr_name, attr_value)
    session_inst.add(created)
    session_inst.commit()
    return created, False
This function either returns an existing data row from the database or creates a new instance and saves it to the DB.
Parameters
- session_inst: Session
- model: SQLModel ORM
- create_method_kwargs: dict
- selectin: bool
- select_in_key: str | None
- kwargs: keyword args
Returns
Tuple[Row, bool]
def get_result_from_query(query: SelectOfScalar, session: Session):
    """
    Execute ``query`` and return a single result.

    If the query matches several rows only the first one is returned; if it
    matches none, ``None`` is returned.

    :param query: SelectOfScalar statement to execute.
    :param session: Session used to execute the statement.

    :return: the first matching row, or ``None`` when there is no match.
    """
    # first() returns the first row, or None for an empty result, and never
    # raises MultipleResultsFound. The statement therefore runs once, rather
    # than being executed again after a failed one_or_none().
    return session.exec(query).first()
Processes an SQLModel query object and returns a singular result from the return payload. If more than one row is returned, then only the first row is returned. If no rows are available, then a null value is returned.
Parameters
- query: SelectOfScalar
- session: Session
Returns
Row
def get_row(
    id_str: str | int,
    session_inst: Session,
    model: type[SQLModel],
    selectin: bool = False,
    lazy: bool = False,
    lazy_load_keys: list[str] | None = None,
    select_in_keys: list[str] | None = None,
    pk_field: str = "id",
):
    """
    Fetch the single row of ``model`` whose ``pk_field`` equals ``id_str``.

    :param id_str: key value to look up.
    :param session_inst: Session used to execute the query.
    :param model: SQLModel class to query.
    :param selectin: apply ``selectinload`` to ``select_in_keys`` when True.
    :param lazy: apply ``lazyload`` to ``lazy_load_keys`` when True.
    :param lazy_load_keys: attribute name(s) to load lazily; a single
        non-list value is treated as a one-element list.
    :param select_in_keys: attribute name(s) to load with ``selectinload``; a
        single non-list value is treated as a one-element list.
    :param pk_field: name of the column compared against ``id_str``.
    :return: ``(True, row)`` when a row was found, ``(False, None)``
        otherwise.
    """
    query = select(model).where(getattr(model, pk_field) == id_str)

    if selectin and select_in_keys:
        if not isinstance(select_in_keys, list):
            select_in_keys = [select_in_keys]
        for attr_name in select_in_keys:
            query = query.options(selectinload(getattr(model, attr_name)))

    if lazy and lazy_load_keys:
        if not isinstance(lazy_load_keys, list):
            lazy_load_keys = [lazy_load_keys]
        for attr_name in lazy_load_keys:
            query = query.options(lazyload(getattr(model, attr_name)))

    row = session_inst.exec(query).one_or_none()

    return bool(row), row
Parameters
- id_str:
- session_inst:
- model:
- selectin:
- lazy:
- lazy_load_keys:
- select_in_keys:
- pk_field:
Returns
def get_rows(
    session_inst: Session,
    model: type[SQLModel],
    selectin: bool = False,
    select_in_keys: list[str] | None = None,
    lazy: bool = False,
    lazy_load_keys: list[str] | None = None,
    page_size: int = 100,
    page: int = 1,
    text_field: str | None = None,
    stmnt: SelectOfScalar | None = None,
    **kwargs,
):
    """
    Fetch one page of ``model`` rows, optionally filtered and sorted.

    When ``stmnt`` is None a ``select(model)`` statement is built and the
    keyword arguments are interpreted as follows (the suffix tests are
    substring checks, evaluated in this order):

    * ``<field>__like``: ``field.like("%value%")``
    * ``<field>__gte`` / ``__lte`` / ``__gt`` / ``__lt``: ``>=``, ``<=``,
      ``>``, ``<``. String values are converted first: parsed with
      ``date_parse`` when the key contains "date" and ``is_date`` accepts the
      value, otherwise turned into ``int`` when they consist of digits.
    * ``<field>__in``: ``field.in_(value)``; a non-list value is skipped with
      a warning.
    * ``sort_field`` / ``sort_desc``: column to order by, and whether the
      order is descending.
    * a key equal to ``text_field``: ``field.match(value)``.
    * anything else: exact match through ``filter_by``.

    :param session_inst: Session used to execute the statement.
    :param model: SQLModel class to query.
    :param selectin: apply ``selectinload`` to ``select_in_keys`` when True.
    :param select_in_keys: relationship attribute names for
        ``selectinload``; non-relationship names are skipped with a warning.
    :param lazy: apply ``lazyload`` to ``lazy_load_keys`` when True.
    :param lazy_load_keys: relationship attribute names for ``lazyload``;
        non-relationship names are skipped with a warning.
    :param page_size: number of rows per page (the LIMIT).
    :param page: 1-based page number; the offset is
        ``(page - 1) * page_size``.
    :param text_field: name of the keyword argument / column handled with
        ``.match()`` instead of an exact comparison.
    :param stmnt: optional pre-built statement; when given, no statement is
        built from ``model`` and ``kwargs``.
    :param kwargs: filters and sort options as described above.
    :return: ``(success, rows)`` where ``rows`` is the list of fetched rows
        and ``success`` is True when at least one row was returned.
    """
    # NOTE(review): the nesting below was rebuilt from a listing that had
    # lost its indentation. Confirm that filtering, loader options and
    # pagination are all meant to be skipped when a caller supplies ``stmnt``.
    if stmnt is None:
        stmnt = select(model)
        if kwargs:
            # Separate special filter keys from exact match keys
            exact_match_kwargs = {}
            special_filters = {}

            keys_to_process = list(kwargs.keys())  # Iterate over a copy

            for key in keys_to_process:
                val = kwargs[key]
                if "__like" in key:
                    model_key = key.replace("__like", "")
                    special_filters[key] = (
                        getattr(model, model_key).like,
                        f"%{val}%",
                    )  # Wrapped in % so the value matches anywhere
                # "__gte" must be tested before "__gt": the check is a
                # substring match and "__gt" is contained in "__gte".
                elif "__gte" in key:
                    model_key = key.replace("__gte", "")
                    parsed_val = (
                        date_parse(val)
                        if "date" in key
                        and isinstance(val, str)
                        and is_date(val, fuzzy=False)
                        else (
                            int(val)
                            if isinstance(val, str) and val.isdigit()
                            else val
                        )
                    )
                    special_filters[key] = (
                        getattr(model, model_key).__ge__,
                        parsed_val,
                    )
                # Likewise "__lte" is tested before "__lt".
                elif "__lte" in key:
                    model_key = key.replace("__lte", "")
                    parsed_val = (
                        date_parse(val)
                        if "date" in key
                        and isinstance(val, str)
                        and is_date(val, fuzzy=False)
                        else (
                            int(val)
                            if isinstance(val, str) and val.isdigit()
                            else val
                        )
                    )
                    special_filters[key] = (
                        getattr(model, model_key).__le__,
                        parsed_val,
                    )
                elif "__gt" in key:  # Strictly greater than
                    model_key = key.replace("__gt", "")
                    parsed_val = (
                        date_parse(val)
                        if "date" in key
                        and isinstance(val, str)
                        and is_date(val, fuzzy=False)
                        else (
                            int(val)
                            if isinstance(val, str) and val.isdigit()
                            else val
                        )
                    )
                    special_filters[key] = (
                        getattr(model, model_key).__gt__,
                        parsed_val,
                    )
                elif "__lt" in key:  # Strictly less than
                    model_key = key.replace("__lt", "")
                    parsed_val = (
                        date_parse(val)
                        if "date" in key
                        and isinstance(val, str)
                        and is_date(val, fuzzy=False)
                        else (
                            int(val)
                            if isinstance(val, str) and val.isdigit()
                            else val
                        )
                    )
                    special_filters[key] = (
                        getattr(model, model_key).__lt__,
                        parsed_val,
                    )
                elif "__in" in key:  # Membership in a list of values
                    model_key = key.replace("__in", "")
                    if isinstance(val, list):
                        special_filters[key] = (
                            getattr(model, model_key).in_,
                            val,
                        )
                    else:
                        logger.warning(
                            f"Value for __in filter '{key}' is not a list, "
                            f"skipping."
                        )
                elif key not in ("sort_desc", "sort_field") and (
                    not text_field or key != text_field
                ):
                    # Collect keys for filter_by, excluding sort/text search
                    # keys
                    exact_match_kwargs[key] = val

            # Apply special filters using filter()
            for _filter_key, (
                filter_method,
                filter_value,
            ) in special_filters.items():
                stmnt = stmnt.filter(filter_method(filter_value))

            # Apply sorting
            sort_desc = kwargs.get("sort_desc")
            sort_field = kwargs.get("sort_field")
            if sort_field:
                sort_attr = getattr(model, sort_field)
                stmnt = stmnt.order_by(
                    sort_attr.desc() if sort_desc else sort_attr
                )

            # Apply text search if applicable (assuming .match() is correct)
            if text_field and text_field in kwargs:
                search_val = kwargs[text_field]
                stmnt = stmnt.where(
                    getattr(model, text_field).match(search_val)
                )
                # Remove from exact_match_kwargs if it ended up there
                exact_match_kwargs.pop(text_field, None)

            # Apply exact matches using filter_by()
            if exact_match_kwargs:
                stmnt = stmnt.filter_by(**exact_match_kwargs)

        # Apply relationship loading options (Check if key is a relationship
        # first - simplified check)
        if selectin and select_in_keys:
            for key in select_in_keys:
                # Basic check: Does the attribute exist and is it likely a
                # relationship?
                # A more robust check might involve inspecting
                # model.__sqlmodel_relationships__
                attr = getattr(model, key, None)
                if (
                    attr is not None
                    and hasattr(attr, "property")
                    and hasattr(attr.property, "mapper")
                ):
                    stmnt = stmnt.options(selectinload(attr))
                else:
                    logger.warning(
                        f"Skipping selectinload for non-relationship "
                        f"attribute '{key}' on model {model.__name__}"
                    )

        if lazy and lazy_load_keys:
            for key in lazy_load_keys:
                attr = getattr(model, key, None)
                if (
                    attr is not None
                    and hasattr(attr, "property")
                    and hasattr(attr.property, "mapper")
                ):
                    stmnt = stmnt.options(lazyload(attr))
                else:
                    logger.warning(
                        f"Skipping lazyload for non-relationship attribute "
                        f"'{key}' on model {model.__name__}"
                    )

        # Apply pagination
        stmnt = stmnt.offset((page - 1) * page_size).limit(
            page_size
        )  # page is 1-based, so page 1 starts at offset 0

    _result = session_inst.exec(stmnt)
    results = _result.all()
    success = True if len(results) > 0 else False

    return success, results
Parameters
- session_inst:
- model:
- selectin:
- select_in_keys:
- lazy:
- lazy_load_keys:
- page_size:
- page:
- text_field:
- stmnt:
- kwargs:
Returns
def get_rows_within_id_list(
    id_str_list: list[str | int],
    session_inst: Session,
    model: type[SQLModel],
    pk_field: str = "id",
):
    """
    Fetch every row of ``model`` whose ``pk_field`` value is in the list.

    :param id_str_list: key values to fetch.
    :param session_inst: Session used to execute the query.
    :param model: SQLModel class to query.
    :param pk_field: name of the column matched against the list
        (default: "id").
    :return: ``(success, rows)`` where ``rows`` is the list of matching model
        instances and ``success`` is True only when that list is non-empty.
        An empty ``id_str_list`` yields ``(False, [])`` without querying.
    """
    if not id_str_list:
        return False, []

    pk_column = getattr(model, pk_field)
    query = select(model).where(pk_column.in_(id_str_list))
    rows = session_inst.exec(query).all()

    return bool(rows), rows
Retrieves rows from the database whose primary key is within the provided list.
Parameters
- id_str_list: List of primary key values to fetch.
- session_inst: SQLAlchemy Session instance.
- model: SQLModel class representing the table.
- pk_field: Name of the primary key field (default: "id").
Returns
Tuple[bool, list[SQLModel]]: A tuple containing a success flag (True if rows were found, False otherwise) and a list of the found model instances.
def insert_data_rows(data_rows, session_inst: Session):
    """
    Insert many rows at once, falling back to row-by-row writes on failure.

    The rows are first added and committed as one batch. If that raises, the
    session is rolled back and each row is written individually with
    ``write_row`` so that valid rows still get stored.

    :param data_rows: iterable of model instances to insert.
    :param session_inst: Session used to add and commit the rows.
    :return: ``(True, data_rows)`` when the batch commit succeeds. Otherwise
        ``(status, {"success": [...], "failed": [...]})`` where ``status`` is
        True when at least one row was written individually and False when
        none were.
    """
    try:
        session_inst.add_all(data_rows)
        session_inst.commit()

        return True, data_rows

    except Exception as e:
        logger.error(
            f"Writing data rows to table failed. See error message: "
            f"{type(e), e, e.args}"
        )
        logger.info(
            "Attempting to write individual entries. This can be a "
            "bit taxing, so please consider your payload to the DB"
        )

        session_inst.rollback()
        processed_rows, failed_rows = [], []
        for row in data_rows:
            success, _ = write_row(row, session_inst=session_inst)
            if success:
                processed_rows.append(row)
            else:
                failed_rows.append(row)

        # A plain bool: the former ``(False,)`` was a non-empty tuple and
        # therefore truthy, so a total failure looked like success to
        # callers testing ``if status:``.
        status = bool(processed_rows)
        return status, {"success": processed_rows, "failed": failed_rows}
Parameters
- data_rows:
- session_inst:
Returns
def update_row(
    id_str: int | str,
    data: dict,
    session_inst: Session,
    model: type[SQLModel],
    pk_field: str = "id",
):
    """
    Apply ``data`` to the row of ``model`` whose ``pk_field`` is ``id_str``.

    :param id_str: key value of the row to update.
    :param data: mapping of attribute name -> new value, set on the row with
        ``setattr``.
    :param session_inst: Session used to look up and commit the row.
    :param model: SQLModel class to update.
    :param pk_field: name of the column compared against ``id_str``.
    :return: ``(True, row)`` after a successful commit, ``(False, row)`` when
        the commit failed (the session is rolled back and the error logged),
        or ``(False, None)`` when no row matched.
    """
    query = select(model).where(getattr(model, pk_field) == id_str)
    row = session_inst.exec(query).one_or_none()

    if not row:
        return False, None

    for field_name, new_value in data.items():
        setattr(row, field_name, new_value)

    try:
        session_inst.add(row)
        session_inst.commit()
    except Exception as e:
        session_inst.rollback()
        logger.error(
            f"Updating the data row failed. See error messages: "
            f"{type(e), e, e.args}"
        )
        return False, row

    return True, row
Parameters
- id_str:
- data:
- session_inst:
- model:
- pk_field:
Returns
def write_row(data_row: SQLModel, session_inst: Session):
    """
    Add ``data_row`` to the session and commit it.

    Any exception raised while adding or committing is logged and the
    session is rolled back instead of propagating the error.

    :param data_row: SQLModel instance to persist.
    :param session_inst: Session used to add and commit the instance.
    :return: ``(True, data_row)`` on success, ``(False, None)`` on failure.
    """
    try:
        session_inst.add(data_row)
        session_inst.commit()
    except Exception as e:
        session_inst.rollback()
        logger.error(
            f"Writing data row to table failed. See error message: "
            f"{type(e), e, e.args}"
        )
        return False, None

    return True, data_row
Writes a new instance of an SQLModel ORM model to the database, with an exception catch that rolls back the session in the event of failure.
Parameters
- data_row: SQLModel
- session_inst: Session
Returns
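Tuple[bool, SQLModel | None]: (True, data_row) when the row was committed; (False, None) when the write failed and the session was rolled back.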
async def bulk_upsert_mappings(
    payload: list,
    session_inst: AsyncSession,
    model: type[SQLModel],
    pk_fields: list[str] | None = None,
):
    """
    Insert the rows in ``payload``, updating rows that conflict on the key.

    A single ``INSERT ... ON CONFLICT DO UPDATE`` statement is built for
    ``model``. On conflict, every column named in the first mapping of
    ``payload`` is overwritten with the incoming ("excluded") value.

    :param payload: list of mappings (column name -> value), one per row. The
        keys of the first mapping decide which columns are updated on
        conflict.
    :param session_inst: AsyncSession used to execute and commit the
        statement.
    :param model: SQLModel class to upsert into.
    :param pk_fields: names of the conflict-target columns; defaults to
        ``["id"]`` when omitted or empty.
    :return: ``(True, rows)`` where ``rows`` is the list produced by the
        statement's RETURNING clause, or ``(False, [])`` when ``payload`` is
        empty and nothing was executed.
    """
    if not payload:
        # Nothing to write. Without this guard payload[0] below raises
        # IndexError.
        return False, []
    if not pk_fields:
        pk_fields = ["id"]
    stmnt = upsert(model).values(payload)
    stmnt = stmnt.on_conflict_do_update(
        index_elements=[getattr(model, x) for x in pk_fields],
        set_={k: getattr(stmnt.excluded, k) for k in payload[0]},
    )

    # Execute the upsert exactly once, with RETURNING, instead of running it
    # a first time and then again to collect the rows.
    results = await session_inst.scalars(
        stmnt.returning(model), execution_options={"populate_existing": True}
    )

    await session_inst.commit()

    return True, results.all()
Parameters
- payload:
- session_inst:
- model:
- pk_fields:
Returns
async def delete_row(
    id_str: str | int,
    session_inst: AsyncSession,
    model: type[SQLModel],
    pk_field: str = "id",
):
    """
    Delete the row of ``model`` whose ``pk_field`` equals ``id_str``.

    :param id_str: key value of the row to delete.
    :param session_inst: AsyncSession used to look up, delete and commit.
    :param model: SQLModel class to delete from.
    :param pk_field: name of the column compared against ``id_str``.
    :return: True when a row was found, deleted and committed; False when no
        row matched or the delete failed (the session is rolled back and the
        error is logged).
    """
    query = select(model).where(getattr(model, pk_field) == id_str)
    lookup = await session_inst.exec(query)
    target = lookup.one_or_none()

    # Guard clause: nothing matched, so there is nothing to delete.
    if not target:
        return False

    try:
        await session_inst.delete(target)
        await session_inst.commit()
    except Exception as e:
        logger.error(
            f"Failed to delete data row. Please see error messages here: "
            f"{type(e), e, e.args}"
        )
        await session_inst.rollback()
        return False

    return True
Parameters
- id_str:
- session_inst:
- model:
- pk_field:
Returns
async def get_one_or_create(
    session_inst: AsyncSession,
    model: type[SQLModel],
    create_method_kwargs: dict | None = None,
    selectin: bool = False,
    select_in_key: str | None = None,
    **kwargs,
):
    """
    Return the row matching ``kwargs``, creating and committing it if absent.

    :param session_inst: AsyncSession used for the lookup and, if needed, the
        insert.
    :param model: SQLModel class to query / instantiate.
    :param create_method_kwargs: extra attribute values applied only when a
        new row is created (they are not part of the lookup).
    :param selectin: when True (and ``select_in_key`` is set) a found row is
        fetched again with a ``selectinload`` option for that attribute.
    :param select_in_key: name of the attribute to load with ``selectinload``.
    :param kwargs: exact-match filters for the lookup; also used as attribute
        values of a newly created row.
    :return: ``(row, True)`` for an existing row, ``(row, False)`` for a row
        that was just created.
    """
    lookup = select(model).filter_by(**kwargs)
    found = await get_result_from_query(query=lookup, session=session_inst)

    if found and selectin and select_in_key:
        # Fetch the same row again, this time with the loader option.
        eager_lookup = lookup.options(
            selectinload(getattr(model, select_in_key))
        )
        found = await get_result_from_query(
            query=eager_lookup, session=session_inst
        )

    if found:
        return found, True

    # No match: build a fresh instance from the filters plus the extras.
    kwargs.update(create_method_kwargs or {})
    created = model()
    for attr_name, attr_value in kwargs.items():
        setattr(created, attr_name, attr_value)
    session_inst.add(created)
    await session_inst.commit()
    return created, False
This function either returns an existing data row from the database or creates a new instance and saves it to the DB.
Parameters
- session_inst: AsyncSession
- model: SQLModel ORM
- create_method_kwargs: dict
- selectin: bool
- select_in_key: str | None
- kwargs: keyword args
Returns
Tuple[Row, bool]
async def get_result_from_query(query: SelectOfScalar, session: AsyncSession):
    """
    Execute ``query`` and return a single result.

    If the query matches several rows only the first one is returned; if it
    matches none, ``None`` is returned.

    :param query: SelectOfScalar statement to execute.
    :param session: AsyncSession used to execute the statement.

    :return: the first matching row, or ``None`` when there is no match.
    """
    results = await session.exec(query)
    # first() returns the first row, or None for an empty result, and never
    # raises MultipleResultsFound. The statement therefore runs once, rather
    # than being executed again after a failed one_or_none().
    return results.first()
Processes an SQLModel query object and returns a singular result from the return payload. If more than one row is returned, then only the first row is returned. If no rows are available, then a null value is returned.
Parameters
- query: SelectOfScalar
- session: AsyncSession
Returns
Row
async def get_row(
    id_str: str | int,
    session_inst: AsyncSession,
    model: type[SQLModel],
    selectin: bool = False,
    select_in_keys: list[str] | None = None,
    lazy: bool = False,
    lazy_load_keys: list[str] | None = None,
    pk_field: str = "id",
):
    """
    Fetch the single row of ``model`` whose ``pk_field`` equals ``id_str``.

    :param id_str: key value to look up.
    :param session_inst: AsyncSession used to execute the query.
    :param model: SQLModel class to query.
    :param selectin: apply ``selectinload`` to ``select_in_keys`` when True.
    :param select_in_keys: attribute name(s) to load with ``selectinload``; a
        single non-list value is treated as a one-element list.
    :param lazy: apply ``lazyload`` to ``lazy_load_keys`` when True.
    :param lazy_load_keys: attribute name(s) to load lazily; a single
        non-list value is treated as a one-element list.
    :param pk_field: name of the column compared against ``id_str``.
    :return: ``(True, row)`` when a row was found, ``(False, None)``
        otherwise.
    """
    query = select(model).where(getattr(model, pk_field) == id_str)

    if selectin and select_in_keys:
        if not isinstance(select_in_keys, list):
            select_in_keys = [select_in_keys]
        for attr_name in select_in_keys:
            query = query.options(selectinload(getattr(model, attr_name)))

    if lazy and lazy_load_keys:
        if not isinstance(lazy_load_keys, list):
            lazy_load_keys = [lazy_load_keys]
        for attr_name in lazy_load_keys:
            query = query.options(lazyload(getattr(model, attr_name)))

    lookup = await session_inst.exec(query)
    row = lookup.one_or_none()

    return bool(row), row
Parameters
- id_str:
- session_inst:
- model:
- selectin:
- select_in_keys:
- lazy:
- lazy_load_keys:
- pk_field:
Returns
async def get_rows(
    session_inst: AsyncSession,
    model: type[SQLModel],
    selectin: bool = False,
    select_in_keys: list[str] | None = None,
    lazy: bool = False,
    lazy_load_keys: list[str] | None = None,
    page_size: int = 100,
    page: int = 1,
    text_field: str | None = None,
    stmnt: SelectOfScalar | None = None,
    **kwargs,
):
    """
    Fetch one page of ``model`` rows, optionally filtered and sorted.

    When ``stmnt`` is None a ``select(model)`` statement is built and the
    keyword arguments are interpreted as follows (the suffix tests are
    substring checks, evaluated in this order):

    * ``<field>__like``: ``field.like("%value%")``
    * ``<field>__gte`` / ``__lte`` / ``__gt`` / ``__lt``: ``>=``, ``<=``,
      ``>``, ``<``. String values are converted first: parsed with
      ``date_parse`` when the key contains "date" and ``is_date`` accepts the
      value, otherwise turned into ``int`` when they consist of digits.
    * ``<field>__in``: ``field.in_(value)``; a non-list value is skipped with
      a warning.
    * ``sort_field`` / ``sort_desc``: column to order by, and whether the
      order is descending.
    * a key equal to ``text_field``: ``field.match(value)``.
    * anything else: exact match through ``filter_by``.

    :param session_inst: AsyncSession used to execute the statement.
    :param model: SQLModel class to query.
    :param selectin: apply ``selectinload`` to ``select_in_keys`` when True.
    :param select_in_keys: relationship attribute names for
        ``selectinload``; non-relationship names are skipped with a warning.
    :param lazy: apply ``lazyload`` to ``lazy_load_keys`` when True.
    :param lazy_load_keys: relationship attribute names for ``lazyload``;
        non-relationship names are skipped with a warning.
    :param page_size: number of rows per page (the LIMIT).
    :param page: 1-based page number; the offset is
        ``(page - 1) * page_size``.
    :param text_field: name of the keyword argument / column handled with
        ``.match()`` instead of an exact comparison.
    :param stmnt: optional pre-built statement; when given, no statement is
        built from ``model`` and ``kwargs``.
    :param kwargs: filters and sort options as described above.
    :return: ``(success, rows)`` where ``rows`` is the list of fetched rows
        and ``success`` is True when at least one row was returned.
    """
    # NOTE(review): the nesting below was rebuilt from a listing that had
    # lost its indentation. Confirm that filtering, loader options and
    # pagination are all meant to be skipped when a caller supplies ``stmnt``.
    if stmnt is None:
        stmnt = select(model)
        if kwargs:
            # Separate special filter keys from exact match keys
            exact_match_kwargs = {}
            special_filters = {}

            keys_to_process = list(kwargs.keys())  # Iterate over a copy

            for key in keys_to_process:
                val = kwargs[key]
                if "__like" in key:
                    model_key = key.replace("__like", "")
                    special_filters[key] = (
                        getattr(model, model_key).like,
                        f"%{val}%",
                    )  # Wrapped in % so the value matches anywhere
                # "__gte" must be tested before "__gt": the check is a
                # substring match and "__gt" is contained in "__gte".
                elif "__gte" in key:
                    model_key = key.replace("__gte", "")
                    parsed_val = (
                        date_parse(val)
                        if "date" in key
                        and isinstance(val, str)
                        and is_date(val, fuzzy=False)
                        else (
                            int(val)
                            if isinstance(val, str) and val.isdigit()
                            else val
                        )
                    )
                    special_filters[key] = (
                        getattr(model, model_key).__ge__,
                        parsed_val,
                    )
                # Likewise "__lte" is tested before "__lt".
                elif "__lte" in key:
                    model_key = key.replace("__lte", "")
                    parsed_val = (
                        date_parse(val)
                        if "date" in key
                        and isinstance(val, str)
                        and is_date(val, fuzzy=False)
                        else (
                            int(val)
                            if isinstance(val, str) and val.isdigit()
                            else val
                        )
                    )
                    special_filters[key] = (
                        getattr(model, model_key).__le__,
                        parsed_val,
                    )
                elif "__gt" in key:  # Strictly greater than
                    model_key = key.replace("__gt", "")
                    parsed_val = (
                        date_parse(val)
                        if "date" in key
                        and isinstance(val, str)
                        and is_date(val, fuzzy=False)
                        else (
                            int(val)
                            if isinstance(val, str) and val.isdigit()
                            else val
                        )
                    )
                    special_filters[key] = (
                        getattr(model, model_key).__gt__,
                        parsed_val,
                    )
                elif "__lt" in key:  # Strictly less than
                    model_key = key.replace("__lt", "")
                    parsed_val = (
                        date_parse(val)
                        if "date" in key
                        and isinstance(val, str)
                        and is_date(val, fuzzy=False)
                        else (
                            int(val)
                            if isinstance(val, str) and val.isdigit()
                            else val
                        )
                    )
                    special_filters[key] = (
                        getattr(model, model_key).__lt__,
                        parsed_val,
                    )
                elif "__in" in key:  # Membership in a list of values
                    model_key = key.replace("__in", "")
                    if isinstance(val, list):
                        special_filters[key] = (
                            getattr(model, model_key).in_,
                            val,
                        )
                    else:
                        logger.warning(
                            f"Value for __in filter '{key}' is not a list, "
                            f"skipping."
                        )
                elif key not in ("sort_desc", "sort_field") and (
                    not text_field or key != text_field
                ):
                    # Collect keys for filter_by, excluding sort/text search
                    # keys
                    exact_match_kwargs[key] = val

            # Apply special filters using filter()
            for _filter_key, (
                filter_method,
                filter_value,
            ) in special_filters.items():
                stmnt = stmnt.filter(filter_method(filter_value))

            # Apply sorting
            sort_desc = kwargs.get("sort_desc")
            sort_field = kwargs.get("sort_field")
            if sort_field:
                sort_attr = getattr(model, sort_field)
                stmnt = stmnt.order_by(
                    sort_attr.desc() if sort_desc else sort_attr
                )

            # Apply text search if applicable (assuming .match() is correct)
            if text_field and text_field in kwargs:
                search_val = kwargs[text_field]
                stmnt = stmnt.where(
                    getattr(model, text_field).match(search_val)
                )
                # Remove from exact_match_kwargs if it ended up there
                exact_match_kwargs.pop(text_field, None)

            # Apply exact matches using filter_by()
            if exact_match_kwargs:
                stmnt = stmnt.filter_by(**exact_match_kwargs)

        # Apply relationship loading options (Check if key is a relationship
        # first - simplified check)
        if selectin and select_in_keys:
            for key in select_in_keys:
                # Basic check: Does the attribute exist and is it likely a
                # relationship?
                # A more robust check might involve inspecting
                # model.__sqlmodel_relationships__
                attr = getattr(model, key, None)
                if (
                    attr is not None
                    and hasattr(attr, "property")
                    and hasattr(attr.property, "mapper")
                ):
                    stmnt = stmnt.options(selectinload(attr))
                else:
                    logger.warning(
                        f"Skipping selectinload for non-relationship "
                        f"attribute '{key}' on model {model.__name__}"
                    )

        if lazy and lazy_load_keys:
            for key in lazy_load_keys:
                attr = getattr(model, key, None)
                if (
                    attr is not None
                    and hasattr(attr, "property")
                    and hasattr(attr.property, "mapper")
                ):
                    stmnt = stmnt.options(lazyload(attr))
                else:
                    logger.warning(
                        f"Skipping lazyload for non-relationship attribute "
                        f"'{key}' on model {model.__name__}"
                    )

        # Apply pagination
        stmnt = stmnt.offset((page - 1) * page_size).limit(
            page_size
        )  # page is 1-based, so page 1 starts at offset 0
    _result = await session_inst.exec(stmnt)
    results = _result.all()
    success = True if len(results) > 0 else False

    return success, results
Parameters
- session_inst:
- model:
- selectin:
- select_in_keys:
- lazy:
- lazy_load_keys:
- page_size:
- page:
- text_field:
- stmnt:
- kwargs:
Returns
async def get_rows_within_id_list(
    id_str_list: list[str | int],
    session_inst: AsyncSession,
    model: type[SQLModel],
    pk_field: str = "id",
):
    """
    Fetch every row of ``model`` whose ``pk_field`` value is in the list.

    Mirrors the synchronous helper of the same name: the rows are fetched
    into a list and the success flag reflects whether any were found.

    :param id_str_list: key values to fetch.
    :param session_inst: AsyncSession used to execute the query.
    :param model: SQLModel class to query.
    :param pk_field: name of the column matched against the list
        (default: "id").
    :return: ``(success, rows)`` where ``rows`` is the list of matching model
        instances and ``success`` is True only when that list is non-empty.
        An empty ``id_str_list`` yields ``(False, [])`` without querying.
    """
    if not id_str_list:  # Nothing to look up; skip the round trip
        return False, []

    stmnt = select(model).where(getattr(model, pk_field).in_(id_str_list))
    # Materialise the rows: the success flag used to be computed from the
    # un-fetched result object, so it was True even when no row matched.
    results = (await session_inst.exec(stmnt)).all()

    success = len(results) > 0

    return success, results
Parameters
- id_str_list:
- session_inst:
- model:
- pk_field:
Returns
async def insert_data_rows(data_rows, session_inst: AsyncSession):
    """
    Insert many rows at once, falling back to row-by-row writes on failure.

    The rows are first added and committed as one batch. If that raises, the
    session is rolled back and each row is written individually with
    ``write_row`` so that valid rows still get stored.

    :param data_rows: iterable of model instances to insert.
    :param session_inst: AsyncSession used to add and commit the rows.
    :return: ``(True, data_rows)`` when the batch commit succeeds. Otherwise
        ``(status, {"success": [...], "failed": [...]})`` where ``status`` is
        True when at least one row was written individually and False when
        none were.
    """
    try:
        session_inst.add_all(data_rows)
        await session_inst.commit()

        return True, data_rows

    except Exception as e:
        logger.error(
            f"Writing data rows to table failed. See error message: "
            f"{type(e), e, e.args}"
        )
        logger.info(
            "Attempting to write individual entries. This can be a "
            "bit taxing, so please consider your payload to the DB"
        )

        await session_inst.rollback()
        processed_rows, failed_rows = [], []
        for row in data_rows:
            success, _ = await write_row(row, session_inst=session_inst)
            if success:
                processed_rows.append(row)
            else:
                failed_rows.append(row)

        # A plain bool: the former ``(False,)`` was a non-empty tuple and
        # therefore truthy, so a total failure looked like success to
        # callers testing ``if status:``.
        status = bool(processed_rows)
        return status, {"success": processed_rows, "failed": failed_rows}
Parameters
- data_rows:
- session_inst:
Returns
async def update_row(
    id_str: int | str,
    data: dict,
    session_inst: AsyncSession,
    model: type[SQLModel],
    pk_field: str = "id",
):
    """
    Apply ``data`` to the row of ``model`` whose ``pk_field`` is ``id_str``.

    :param id_str: key value of the row to update.
    :param data: mapping of attribute name -> new value, set on the row with
        ``setattr``.
    :param session_inst: AsyncSession used to look up and commit the row.
    :param model: SQLModel class to update.
    :param pk_field: name of the column compared against ``id_str``.
    :return: ``(True, row)`` after a successful commit, ``(False, row)`` when
        the commit failed (the session is rolled back and the error logged),
        or ``(False, None)`` when no row matched.
    """
    query = select(model).where(getattr(model, pk_field) == id_str)
    lookup = await session_inst.exec(query)
    row = lookup.one_or_none()

    if not row:
        return False, None

    for field_name, new_value in data.items():
        setattr(row, field_name, new_value)

    try:
        session_inst.add(row)
        await session_inst.commit()
    except Exception as e:
        await session_inst.rollback()
        logger.error(
            f"Updating the data row failed. See error messages: "
            f"{type(e), e, e.args}"
        )
        return False, row

    return True, row
Parameters
- id_str:
- data:
- session_inst:
- model:
- pk_field:
Returns
async def write_row(data_row: SQLModel, session_inst: AsyncSession):
    """
    Add ``data_row`` to the session and commit it.

    Any exception raised while adding or committing is logged and the
    session is rolled back instead of propagating the error.

    :param data_row: SQLModel instance to persist.
    :param session_inst: AsyncSession used to add and commit the instance.
    :return: ``(True, data_row)`` on success, ``(False, None)`` on failure.
    """
    try:
        session_inst.add(data_row)
        await session_inst.commit()
    except Exception as e:
        await session_inst.rollback()
        logger.error(
            f"Writing data row to table failed. See error message: "
            f"{type(e), e, e.args}"
        )
        return False, None

    return True, data_row
Writes a new instance of an SQLModel ORM model to the database, with an exception catch that rolls back the session in the event of failure.
Parameters
- data_row: SQLModel
- session_inst: AsyncSession
Returns
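Tuple[bool, SQLModel | None]: (True, data_row) when the row was committed; (False, None) when the write failed and the session was rolled back.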
class SQLModelCRUDError(Exception):
    """
    Root of the sqlmodel-crud-utils exception hierarchy.

    The library's more specific exceptions derive from this class, so a
    single ``except SQLModelCRUDError`` clause catches any of them.

    Example:
        >>> try:
        ...     # Any library operation
        ...     pass
        ... except SQLModelCRUDError as e:
        ...     # Handle any library error
        ...     logger.error(f"CRUD operation failed: {e}")
    """
Base exception for all sqlmodel-crud-utils errors.
This is the root exception class for the library. All other exceptions inherit from this class, allowing users to catch all library-specific errors with a single except clause.
Example:
>>> try:
...     # Any library operation
...     pass
... except SQLModelCRUDError as e:
...     # Handle any library error
...     logger.error(f"CRUD operation failed: {e}")
class RecordNotFoundError(SQLModelCRUDError):
    """
    Signals that a requested database record does not exist.

    The exception keeps the model, the key value that was searched for and
    the name of the key field, and renders them into its message to make
    debugging easier.

    Attributes:
        model: The SQLModel class that was being queried.
        id_value: The primary key value that was searched for.
        pk_field: The name of the primary key field (default: "id").

    Example:
        >>> from models import User
        >>> raise RecordNotFoundError(
        ...     model=User,
        ...     id_value=123,
        ...     pk_field="user_id"
        ... )
        RecordNotFoundError: User with user_id=123 not found
    """

    def __init__(
        self,
        model: type,
        id_value: Any,
        pk_field: str = "id",
    ):
        """
        Store the lookup context and build the error message.

        Args:
            model: The SQLModel class being queried.
            id_value: The primary key value that wasn't found.
            pk_field: Name of the primary key field (default: "id").
        """
        self.model = model
        self.id_value = id_value
        self.pk_field = pk_field

        # Prefer the class name; fall back to str() for objects without one.
        try:
            model_name = model.__name__
        except AttributeError:
            model_name = str(model)

        message = f"{model_name} with {pk_field}={id_value!r} not found"
        super().__init__(message)
Raised when a requested database record is not found.
This exception is raised when attempting to retrieve, update, or delete a record that doesn't exist in the database. It stores contextual information about the model, ID value, and primary key field for better debugging.
Attributes: model: The SQLModel class that was being queried. id_value: The primary key value that was searched for. pk_field: The name of the primary key field (default: "id").
Example:
from models import User raise RecordNotFoundError( ... model=User, ... id_value=123, ... pk_field="user_id" ... ) RecordNotFoundError: User with user_id=123 not found
def __init__(
    self,
    model: type,
    id_value: Any,
    pk_field: str = "id",
):
    """
    Store the lookup context and build the message.

    Args:
        model: The SQLModel class being queried.
        id_value: The primary key value that wasn't found.
        pk_field: Name of the primary key field (default: "id").
    """
    self.model, self.id_value, self.pk_field = model, id_value, pk_field

    # Objects without a __name__ are rendered through str() instead.
    try:
        label = model.__name__
    except AttributeError:
        label = str(model)

    super().__init__(f"{label} with {pk_field}={id_value!r} not found")
Initialize RecordNotFoundError.
Args: model: The SQLModel class being queried. id_value: The primary key value that wasn't found. pk_field: Name of the primary key field (default: "id").
class MultipleRecordsError(SQLModelCRUDError):
    """
    Signal that a query meant to match exactly one record matched several.

    The model, the number of matches and (optionally) the filters used are
    kept on the instance and summarised in the message.

    Attributes:
        model: The SQLModel class that was being queried.
        count: The number of records found.
        filters: Optional dictionary of filters that were applied.

    Example:
        >>> from models import User
        >>> raise MultipleRecordsError(
        ...     model=User,
        ...     count=3,
        ...     filters={"email": "test@example.com"}
        ... )
        MultipleRecordsError: Expected 1 User, found 3
        (filters: {'email': 'test@example.com'})
    """

    def __init__(
        self,
        model: type,
        count: int,
        filters: dict[str, Any] | None = None,
    ):
        """
        Store the query context and build the message.

        Args:
            model: The SQLModel class being queried.
            count: Number of records found.
            filters: Optional dictionary of filters applied to the query.
        """
        self.model, self.count, self.filters = model, count, filters

        # Objects without a __name__ are rendered through str() instead.
        try:
            label = model.__name__
        except AttributeError:
            label = str(model)

        # The filter suffix is only appended for a non-empty mapping.
        suffix = f" (filters: {filters})" if filters else ""
        super().__init__(f"Expected 1 {label}, found {count}" + suffix)
Raised when multiple records are found where exactly one was expected.
This exception occurs when a query expected to return a single record returns multiple results instead. This typically indicates a data integrity issue or an incorrect query filter.
Attributes: model: The SQLModel class that was being queried. count: The number of records found. filters: Optional dictionary of filters that were applied.
Example:
from models import User raise MultipleRecordsError( ... model=User, ... count=3, ... filters={"email": "test@example.com"} ... ) MultipleRecordsError: Expected 1 User, found 3 (filters: {'email': 'test@example.com'})
def __init__(
    self,
    model: type,
    count: int,
    filters: dict[str, Any] | None = None,
):
    """
    Store the query context and build the message.

    Args:
        model: The SQLModel class being queried.
        count: Number of records found.
        filters: Optional dictionary of filters applied to the query.
    """
    self.model, self.count, self.filters = model, count, filters

    # Objects without a __name__ are rendered through str() instead.
    try:
        label = model.__name__
    except AttributeError:
        label = str(model)

    # The filter suffix is only appended for a non-empty mapping.
    suffix = f" (filters: {filters})" if filters else ""
    super().__init__(f"Expected 1 {label}, found {count}" + suffix)
Initialize MultipleRecordsError.
Args: model: The SQLModel class being queried. count: Number of records found. filters: Optional dictionary of filters applied to the query.
class ValidationError(SQLModelCRUDError):
    """
    Raised when data validation fails before or after a database operation.

    This exception is used for various validation failures, including:
    - Invalid field values
    - Missing required fields
    - Type mismatches
    - Business logic validation failures

    Attributes:
        message: The caller-supplied error message, without the
            "Validation failed..." prefix or any appended details.
        field: Optional name of the field that failed validation.
        value: Optional value that failed validation.
        errors: Optional dictionary mapping field names to error messages.

    Example:
        >>> raise ValidationError(
        ...     field="age",
        ...     value=-5,
        ...     message="Age must be a positive integer"
        ... )
        ValidationError: Validation failed for field 'age':
        Age must be a positive integer (value: -5)

        Multiple field errors:
        >>> raise ValidationError(
        ...     message="Multiple validation errors",
        ...     errors={
        ...         "email": "Invalid email format",
        ...         "age": "Must be 18 or older"
        ...     }
        ... )
    """

    def __init__(
        self,
        message: str,
        field: str | None = None,
        value: Any | None = None,
        errors: dict[str, str] | None = None,
    ):
        """
        Initialize ValidationError.

        Args:
            message: Detailed error message.
            field: Optional name of the field that failed.
            value: Optional value that caused the validation failure.
            errors: Optional dict mapping field names to error messages.
        """
        # Keep the raw message so the documented ``message`` attribute exists.
        self.message = message
        self.field = field
        self.value = value
        self.errors = errors

        # Build comprehensive error message
        if field:
            full_message = f"Validation failed for field '{field}': {message}"
            # A value of None is indistinguishable from "not supplied" and is
            # therefore never echoed.
            if value is not None:
                full_message += f" (value: {value!r})"
        else:
            full_message = f"Validation failed: {message}"

        if errors:
            # Distinct loop names avoid reusing the ``field`` parameter name.
            error_details = "\n".join(
                f" - {name}: {detail}" for name, detail in errors.items()
            )
            full_message += f"\n{error_details}"

        super().__init__(full_message)
Raised when data validation fails before or after a database operation.
This exception is used for various validation failures, including:
- Invalid field values
- Missing required fields
- Type mismatches
- Business logic validation failures
Attributes: field: Optional name of the field that failed validation. value: Optional value that failed validation. message: Detailed error message. errors: Optional dictionary mapping field names to error messages.
Example:
raise ValidationError( ... field="age", ... value=-5, ... message="Age must be a positive integer" ... ) ValidationError: Validation failed for field 'age': Age must be a positive integer (value: -5)
Multiple field errors: >>> raise ValidationError( ... message="Multiple validation errors", ... errors={ ... "email": "Invalid email format", ... "age": "Must be 18 or older" ... } ... )
def __init__(
    self,
    message: str,
    field: str | None = None,
    value: Any | None = None,
    errors: dict[str, str] | None = None,
):
    """
    Initialize ValidationError.

    Args:
        message: Detailed error message.
        field: Optional name of the field that failed.
        value: Optional value that caused the validation failure.
        errors: Optional dict mapping field names to error messages.
    """
    # Keep the raw message so the documented ``message`` attribute exists.
    self.message = message
    self.field = field
    self.value = value
    self.errors = errors

    # Build comprehensive error message
    if field:
        full_message = f"Validation failed for field '{field}': {message}"
        # A value of None is indistinguishable from "not supplied" and is
        # therefore never echoed.
        if value is not None:
            full_message += f" (value: {value!r})"
    else:
        full_message = f"Validation failed: {message}"

    if errors:
        # Distinct loop names avoid reusing the ``field`` parameter name.
        error_details = "\n".join(
            f" - {name}: {detail}" for name, detail in errors.items()
        )
        full_message += f"\n{error_details}"

    super().__init__(full_message)
Initialize ValidationError.
Args: message: Detailed error message. field: Optional name of the field that failed. value: Optional value that caused the validation failure. errors: Optional dict mapping field names to error messages.
class BulkOperationError(SQLModelCRUDError):
    """
    Signal that a batch operation failed for some or all of its records.

    The instance keeps the counts and per-record errors it was given; the
    message reports the failed/total ratio together with the success rate.

    Attributes:
        total: Total number of records in the bulk operation.
        failed: Number of records that failed to process.
        errors: List of error messages or exception details for failed records.
        successful: Optional list of successfully processed record IDs.
        failed_records: Optional list of records that failed processing.

    Example:
        >>> raise BulkOperationError(
        ...     total=100,
        ...     failed=5,
        ...     errors=[
        ...         "Record 23: Duplicate key violation",
        ...         "Record 45: Invalid foreign key",
        ...         "Record 67: Missing required field",
        ...         "Record 89: Data too long for column",
        ...         "Record 91: Constraint violation"
        ...     ]
        ... )
        BulkOperationError: Bulk operation failed: 5/100 records failed
        (95.0% success rate)
    """

    def __init__(
        self,
        total: int,
        failed: int,
        errors: list[str | Exception],
        successful: list[Any] | None = None,
        failed_records: list[Any] | None = None,
    ):
        """
        Store the batch statistics and build the message.

        Args:
            total: Total number of records in the operation.
            failed: Number of records that failed.
            errors: List of error messages or exceptions for failures.
            successful: Optional list of successfully processed items.
            failed_records: Optional list of records that failed.
        """
        self.total, self.failed = total, failed
        self.errors = errors
        self.successful, self.failed_records = successful, failed_records

        # A non-positive total would divide by zero, so it reports 0%.
        if total > 0:
            pct_ok = (total - failed) / total * 100
        else:
            pct_ok = 0

        super().__init__(
            f"Bulk operation failed: {failed}/{total} records failed "
            f"({pct_ok:.1f}% success rate)"
        )

    def get_error_summary(self) -> str:
        """
        Render the exception message followed by a numbered list of errors.

        Exception entries are shown as "<TypeName>: <text>"; any other entry
        is shown via its string form.

        Returns:
            A multi-line string with formatted error details.

        Example:
            >>> try:
            ...     bulk_operation()
            ... except BulkOperationError as e:
            ...     print(e.get_error_summary())
        """
        lines = [str(self), "\nError details:"]
        lines.extend(
            f" {pos}. {type(err).__name__}: {err}"
            if isinstance(err, Exception)
            else f" {pos}. {err}"
            for pos, err in enumerate(self.errors, 1)
        )
        return "\n".join(lines)
Raised when bulk database operations partially or completely fail.
This exception is used when processing multiple records in a batch operation, and some or all of the records fail to process. It provides detailed information about the failure rate and individual errors.
Attributes: total: Total number of records in the bulk operation. failed: Number of records that failed to process. errors: List of error messages or exception details for failed records. successful: Optional list of successfully processed record IDs. failed_records: Optional list of records that failed processing.
Example:
raise BulkOperationError( ... total=100, ... failed=5, ... errors=[ ... "Record 23: Duplicate key violation", ... "Record 45: Invalid foreign key", ... "Record 67: Missing required field", ... "Record 89: Data too long for column", ... "Record 91: Constraint violation" ... ] ... ) BulkOperationError: Bulk operation failed: 5/100 records failed (95.0% success rate)
def __init__(
    self,
    total: int,
    failed: int,
    errors: list[str | Exception],
    successful: list[Any] | None = None,
    failed_records: list[Any] | None = None,
):
    """
    Store the batch statistics and build the message.

    Args:
        total: Total number of records in the operation.
        failed: Number of records that failed.
        errors: List of error messages or exceptions for failures.
        successful: Optional list of successfully processed items.
        failed_records: Optional list of records that failed.
    """
    self.total, self.failed = total, failed
    self.errors = errors
    self.successful, self.failed_records = successful, failed_records

    # A non-positive total would divide by zero, so it reports 0%.
    if total > 0:
        pct_ok = (total - failed) / total * 100
    else:
        pct_ok = 0

    super().__init__(
        f"Bulk operation failed: {failed}/{total} records failed "
        f"({pct_ok:.1f}% success rate)"
    )
Initialize BulkOperationError.
Args: total: Total number of records in the operation. failed: Number of records that failed. errors: List of error messages or exceptions for failures. successful: Optional list of successfully processed items. failed_records: Optional list of records that failed.
def get_error_summary(self) -> str:
    """
    Render the exception message followed by a numbered list of errors.

    Exception entries are shown as "<TypeName>: <text>"; any other entry
    is shown via its string form.

    Returns:
        A multi-line string with formatted error details.

    Example:
        >>> try:
        ...     bulk_operation()
        ... except BulkOperationError as e:
        ...     print(e.get_error_summary())
    """
    lines = [str(self), "\nError details:"]
    lines.extend(
        f" {pos}. {type(err).__name__}: {err}"
        if isinstance(err, Exception)
        else f" {pos}. {err}"
        for pos, err in enumerate(self.errors, 1)
    )
    return "\n".join(lines)
Get a formatted summary of all errors.
Returns: A multi-line string with formatted error details.
Example:
try: ... bulk_operation() ... except BulkOperationError as e: ... print(e.get_error_summary())
class TransactionError(SQLModelCRUDError):
    """
    Signal that a database transaction operation failed.

    This exception is used for transaction-related failures, including:
    - Failed commits
    - Rollback errors
    - Nested transaction issues
    - Deadlock scenarios
    - Connection issues during transaction

    Attributes:
        operation: The transaction operation that failed
            (e.g., "commit", "rollback").
        original_error: The underlying exception that caused the
            failure.

    Example:
        >>> try:
        ...     session.commit()
        ... except Exception as e:
        ...     raise TransactionError(
        ...         operation="commit",
        ...         original_error=e
        ...     )
        TransactionError: Transaction operation 'commit' failed:
        IntegrityError(...)
    """

    def __init__(
        self,
        message: str | None = None,
        operation: str | None = None,
        original_error: Exception | None = None,
    ):
        """
        Store the failure context and pick the most specific message.

        An explicit ``message`` always wins; otherwise the text is derived
        from whichever of ``operation`` / ``original_error`` is available.

        Args:
            message: Optional custom error message.
            operation: The transaction operation that failed.
            original_error: The underlying exception that caused the failure.
        """
        self.operation = operation
        self.original_error = original_error

        if message:
            text = message
        elif operation:
            text = f"Transaction operation '{operation}' failed"
            # Append the cause, rendered as TypeName(text), when it is known.
            if original_error:
                text += f": {type(original_error).__name__}({original_error})"
        elif original_error:
            text = f"Transaction failed: {original_error}"
        else:
            text = "Transaction operation failed"

        super().__init__(text)
Raised when database transaction operations fail.
This exception is used for transaction-related failures, including:
- Failed commits
- Rollback errors
- Nested transaction issues
- Deadlock scenarios
- Connection issues during transaction
Attributes: operation: The transaction operation that failed (e.g., "commit", "rollback"). original_error: The underlying exception that caused the failure.
Example:
try: ... session.commit() ... except Exception as e: ... raise TransactionError( ... operation="commit", ... original_error=e ... ) TransactionError: Transaction operation 'commit' failed: IntegrityError(...)
def __init__(
    self,
    message: str | None = None,
    operation: str | None = None,
    original_error: Exception | None = None,
):
    """
    Store the failure context and pick the most specific message.

    An explicit ``message`` always wins; otherwise the text is derived
    from whichever of ``operation`` / ``original_error`` is available.

    Args:
        message: Optional custom error message.
        operation: The transaction operation that failed.
        original_error: The underlying exception that caused the failure.
    """
    self.operation = operation
    self.original_error = original_error

    if message:
        text = message
    elif operation:
        text = f"Transaction operation '{operation}' failed"
        # Append the cause, rendered as TypeName(text), when it is known.
        if original_error:
            text += f": {type(original_error).__name__}({original_error})"
    elif original_error:
        text = f"Transaction failed: {original_error}"
    else:
        text = "Transaction operation failed"

    super().__init__(text)
Initialize TransactionError.
Args: message: Optional custom error message. operation: The transaction operation that failed. original_error: The underlying exception that caused the failure.
@contextmanager
def transaction(session: Session) -> Generator[Session, None, None]:
    """Context manager for synchronous database transactions.

    Commits the session when the ``with`` body finishes without raising.
    If the body or the commit raises an ``Exception``, the session is rolled
    back and a TransactionError is raised in its place; the original
    exception is available both as ``__cause__`` and as ``original_error``.

    Args:
        session: An active SQLModel Session instance

    Yields:
        Session: The same session instance for use within the context

    Raises:
        TransactionError: Wraps any ``Exception`` raised by the body or by
            the commit.

    Example:
        Basic usage with automatic commit:

        >>> from sqlmodel import Session, create_engine
        >>> from sqlmodel_crud_utils import write_row, update_row
        >>> from sqlmodel_crud_utils.transactions import transaction
        >>>
        >>> engine = create_engine("sqlite:///database.db")
        >>> with Session(engine) as session:
        ...     with transaction(session) as tx:
        ...         user = write_row(User(name="Alice"), tx)
        ...         update_row(
        ...             user.id, {"email": "alice@example.com"}, User, tx
        ...         )
        ...     # Automatically commits here if no exceptions

        Error handling with automatic rollback:

        >>> from sqlmodel_crud_utils.exceptions import TransactionError
        >>>
        >>> with Session(engine) as session:
        ...     try:
        ...         with transaction(session) as tx:
        ...             user = write_row(User(name="Bob"), tx)
        ...             raise ValueError("Invalid email format")
        ...     except TransactionError as e:
        ...         print(f"Transaction failed: {e}")
        ...         print(f"Original error: {e.original_error}")
        ...         # Database automatically rolled back

        Multiple operations in a single transaction:

        >>> with Session(engine) as session:
        ...     with transaction(session) as tx:
        ...         # All operations succeed together or all are rolled back
        ...         user = write_row(User(name="Charlie"), tx)
        ...         profile = write_row(Profile(user_id=user.id), tx)
        ...         update_row(user.id, {"active": True}, User, tx)

    Notes:
        - The session is committed only if no exceptions occur
        - Only ``Exception`` subclasses are intercepted; other
          ``BaseException`` types (e.g. KeyboardInterrupt) propagate
          unchanged and trigger neither commit nor rollback here
        - Nesting this context manager is not supported: an inner failure
          would be wrapped a second time by the outer level; use savepoints
          instead
    """
    try:
        yield session
        session.commit()
    except Exception as exc:
        # Undo whatever the body (or a partial commit) left pending.
        session.rollback()
        # Message text is unchanged; original_error makes the cause reachable
        # through the attribute TransactionError documents.
        raise TransactionError(
            f"Transaction failed: {exc}", original_error=exc
        ) from exc
Context manager for synchronous database transactions.
Automatically commits the transaction on successful completion or rolls back on any exception. The original exception is wrapped in a TransactionError to provide additional context while preserving the exception chain.
Args: session: An active SQLModel Session instance
Yields: Session: The same session instance for use within the context
Raises: TransactionError: Wraps any exception that occurs during the transaction, with the original exception available via __cause__
Example: Basic usage with automatic commit:
>>> from sqlmodel import Session, create_engine
>>> from sqlmodel_crud_utils import write_row, update_row
>>> from sqlmodel_crud_utils.transactions import transaction
>>>
>>> engine = create_engine("sqlite:///database.db")
>>> with Session(engine) as session:
... with transaction(session) as tx:
... user = write_row(User(name="Alice"), tx)
... update_row(
... user.id, {"email": "alice@example.com"}, User, tx
... )
... # Automatically commits here if no exceptions
Error handling with automatic rollback:
>>> from sqlmodel import Session
>>> from sqlmodel_crud_utils.transactions import transaction
>>> from sqlmodel_crud_utils.exceptions import TransactionError
>>>
>>> with Session(engine) as session:
... try:
... with transaction(session) as tx:
... user = write_row(User(name="Bob"), tx)
... # Some operation that fails
... raise ValueError("Invalid email format")
... except TransactionError as e:
... print(f"Transaction failed: {e}")
... print(f"Original error: {e.__cause__}")
... # Database automatically rolled back
Multiple operations in a single transaction:
>>> with Session(engine) as session:
... with transaction(session) as tx:
... # All operations succeed together or all are rolled back
... user = write_row(User(name="Charlie"), tx)
... profile = write_row(Profile(user_id=user.id), tx)
... update_row(user.id, {"active": True}, User, tx)
Notes: - The session is committed only if no exceptions occur - Any exception triggers an automatic rollback before re-raising - The session remains usable after the context exits - Nested transactions are not supported; use savepoints instead - The session must not be in an existing transaction when entering
@asynccontextmanager
async def a_transaction(
    session: AsyncSession,
) -> AsyncGenerator[AsyncSession, None]:
    """Context manager for asynchronous database transactions.

    Asynchronous counterpart of transaction(). Awaits ``session.commit()``
    when the ``async with`` body finishes without raising. If the body or
    the commit raises an ``Exception``, the session rollback is awaited and
    a TransactionError is raised in its place; the original exception is
    available both as ``__cause__`` and as ``original_error``.

    Args:
        session: An active AsyncSession instance from
            sqlmodel.ext.asyncio.session

    Yields:
        AsyncSession: The same session instance for use within the
            async context

    Raises:
        TransactionError: Wraps any ``Exception`` raised by the body or by
            the commit.

    Example:
        Basic async usage with automatic commit:

        >>> from sqlmodel.ext.asyncio.session import AsyncSession
        >>> from sqlalchemy.ext.asyncio import create_async_engine
        >>> from sqlmodel_crud_utils import a_write_row, a_update_row
        >>> from sqlmodel_crud_utils.transactions import a_transaction
        >>>
        >>> engine = create_async_engine("sqlite+aiosqlite:///database.db")
        >>> async with AsyncSession(engine) as session:
        ...     async with a_transaction(session) as tx:
        ...         user = await a_write_row(
        ...             User(name="Alice"), tx
        ...         )
        ...         await a_update_row(
        ...             user.id, {"email": "alice@example.com"},
        ...             User, tx
        ...         )
        ...     # Automatically commits here if no exceptions

        Error handling with automatic rollback:

        >>> from sqlmodel_crud_utils.exceptions import TransactionError
        >>>
        >>> async with AsyncSession(engine) as session:
        ...     try:
        ...         async with a_transaction(session) as tx:
        ...             user = await a_write_row(User(name="Bob"), tx)
        ...             raise ValueError("Invalid email format")
        ...     except TransactionError as e:
        ...         print(f"Transaction failed: {e}")
        ...         print(f"Original error: {e.original_error}")
        ...         # Database automatically rolled back

        Multiple async operations in a single transaction:

        >>> async with AsyncSession(engine) as session:
        ...     async with a_transaction(session) as tx:
        ...         # All operations succeed together or all are rolled back
        ...         user = await a_write_row(User(name="Charlie"), tx)
        ...         profile = await a_write_row(Profile(user_id=user.id), tx)
        ...         await a_update_row(user.id, {"active": True}, User, tx)

        Using with FastAPI dependency injection:

        >>> from fastapi import Depends, FastAPI
        >>>
        >>> app = FastAPI()
        >>>
        >>> async def get_session():
        ...     async with AsyncSession(engine) as session:
        ...         yield session
        >>>
        >>> @app.post("/users")
        ... async def create_user(
        ...     user_data: dict,
        ...     session: AsyncSession = Depends(get_session)
        ... ):
        ...     async with a_transaction(session) as tx:
        ...         user = await a_write_row(User(**user_data), tx)
        ...         await a_write_row(AuditLog(action="user_created"), tx)
        ...         return user

    Notes:
        - The session is committed only if no exceptions occur
        - Only ``Exception`` subclasses are intercepted; other
          ``BaseException`` types propagate unchanged and trigger neither
          commit nor rollback here
        - Nesting this context manager is not supported: an inner failure
          would be wrapped a second time by the outer level; use savepoints
          instead
    """
    try:
        yield session
        await session.commit()
    except Exception as exc:
        # Undo whatever the body (or a partial commit) left pending.
        await session.rollback()
        # Message text is unchanged; original_error makes the cause reachable
        # through the attribute TransactionError documents.
        raise TransactionError(
            f"Transaction failed: {exc}", original_error=exc
        ) from exc
Context manager for asynchronous database transactions.
Asynchronous version of the transaction() context manager. Provides the same automatic commit/rollback behavior for async database operations.
Args: session: An active AsyncSession instance from sqlmodel.ext.asyncio.session
Yields: AsyncSession: The same session instance for use within the async context
Raises: TransactionError: Wraps any exception that occurs during the transaction, with the original exception available via __cause__
Example: Basic async usage with automatic commit:
>>> from sqlmodel.ext.asyncio.session import AsyncSession
>>> from sqlalchemy.ext.asyncio import create_async_engine
>>> from sqlmodel_crud_utils import a_write_row, a_update_row
>>> from sqlmodel_crud_utils.transactions import a_transaction
>>>
>>> engine = create_async_engine("sqlite+aiosqlite:///database.db")
>>> async with AsyncSession(engine) as session:
... async with a_transaction(session) as tx:
... user = await a_write_row(
... User(name="Alice"), tx
... )
... await a_update_row(
... user.id, {"email": "alice@example.com"},
... User, tx
... )
... # Automatically commits here if no exceptions
Error handling with automatic rollback:
>>> from sqlmodel.ext.asyncio.session import AsyncSession
>>> from sqlmodel_crud_utils.transactions import a_transaction
>>> from sqlmodel_crud_utils.exceptions import TransactionError
>>>
>>> async with AsyncSession(engine) as session:
... try:
... async with a_transaction(session) as tx:
... user = await a_write_row(User(name="Bob"), tx)
... # Some async operation that fails
... raise ValueError("Invalid email format")
... except TransactionError as e:
... print(f"Transaction failed: {e}")
... print(f"Original error: {e.__cause__}")
... # Database automatically rolled back
Multiple async operations in a single transaction:
>>> async with AsyncSession(engine) as session:
... async with a_transaction(session) as tx:
... # All operations succeed together or all are rolled back
... user = await a_write_row(User(name="Charlie"), tx)
... profile = await a_write_row(Profile(user_id=user.id), tx)
... await a_update_row(user.id, {"active": True}, User, tx)
Using with FastAPI dependency injection:
>>> from fastapi import Depends, FastAPI
>>> from sqlmodel.ext.asyncio.session import AsyncSession
>>> from sqlmodel_crud_utils.transactions import a_transaction
>>>
>>> app = FastAPI()
>>>
>>> async def get_session():
... async with AsyncSession(engine) as session:
... yield session
>>>
>>> @app.post("/users")
... async def create_user(
... user_data: dict,
... session: AsyncSession = Depends(get_session)
... ):
... async with a_transaction(session) as tx:
... user = await a_write_row(User(**user_data), tx)
... await a_write_row(AuditLog(action="user_created"), tx)
... return user
Notes: - The session is committed only if no exceptions occur - Any exception triggers an automatic rollback before re-raising - The session remains usable after the context exits - Nested transactions are not supported; use savepoints instead - The session must not be in an existing transaction when entering - Works seamlessly with asyncio, FastAPI, and other async frameworks
class AuditMixin:
    """Mixin for automatic audit trail tracking.

    Declares four fields on the model it is mixed into: two timestamps
    (creation and last update) and two optional free-text user identifiers.

    Attributes:
        created_at: Creation timestamp; filled by the ``_utc_now`` default
            factory when no value is supplied.
        updated_at: Last-update timestamp; defaults to None and is handed to
            the column as an ``onupdate`` hook (``_utc_now``).
        created_by: Optional username/ID of creator (never set automatically)
        updated_by: Optional username/ID of last updater (never set
            automatically)

    Example:
        Basic usage:

        >>> from typing import Optional
        >>> from sqlmodel import SQLModel, Field
        >>> from sqlmodel_crud_utils.mixins import AuditMixin
        >>>
        >>> class User(SQLModel, AuditMixin, table=True):
        ...     id: Optional[int] = Field(default=None, primary_key=True)
        ...     name: str
        ...     email: str
        >>>
        >>> # When you create a User, created_at is automatically set
        >>> user = User(name="Alice", email="alice@example.com")

        With user tracking:

        >>> user = User(
        ...     name="Bob",
        ...     email="bob@example.com",
        ...     created_by="admin"
        ... )
        >>> # Later when updating
        >>> user.name = "Robert"
        >>> user.updated_by = "admin"
        >>> # updated_at is expected to be filled in when the UPDATE is issued

    Note:
        The `updated_at` field uses SQLAlchemy's `onupdate` parameter to
        automatically update the timestamp when the record is modified.
    """

    # Required, indexed column; the factory runs per instance, not at import.
    # NOTE(review): _utc_now is defined outside this block - presumably it
    # returns the current UTC time; confirm whether it is timezone-aware.
    created_at: datetime = Field(
        default_factory=_utc_now,
        nullable=False,
        index=True,
        description="Timestamp when the record was created",
    )
    # Stays None on a fresh instance; only the column-level onupdate hook
    # assigns a value.
    updated_at: Optional[datetime] = Field(
        default=None,
        sa_column_kwargs={"onupdate": _utc_now},
        description="Timestamp when the record was last updated",
    )
    # Caller-supplied identifiers, declared with max_length=100.
    created_by: Optional[str] = Field(
        default=None,
        max_length=100,
        description="Username or ID of the user who created this record",
    )
    updated_by: Optional[str] = Field(
        default=None,
        max_length=100,
        description="Username or ID of the user who last updated this record",
    )
Mixin for automatic audit trail tracking.
Adds timestamp fields to track when records are created and updated, along with optional user tracking fields.
Attributes: created_at: Timestamp when the record was created (auto-set) updated_at: Timestamp when the record was last updated (auto-set on update) created_by: Optional username/ID of creator updated_by: Optional username/ID of last updater
Example: Basic usage:
>>> from sqlmodel import SQLModel, Field
>>> from sqlmodel_crud_utils.mixins import AuditMixin
>>>
>>> class User(SQLModel, AuditMixin, table=True):
... id: Optional[int] = Field(default=None, primary_key=True)
... name: str
... email: str
>>>
>>> # When you create a User, created_at is automatically set
>>> user = User(name="Alice", email="alice@example.com")
>>> # user.created_at will be set to current UTC time
With user tracking:
>>> user = User(
... name="Bob",
... email="bob@example.com",
... created_by="admin"
... )
>>> # Later when updating
>>> user.name = "Robert"
>>> user.updated_by = "admin"
>>> # updated_at will be automatically set on commit
Note:
The updated_at field uses SQLAlchemy's onupdate parameter to
automatically update the timestamp when the record is modified.
class SoftDeleteMixin:
    """Soft-delete fields and helpers for SQLModel tables.

    A soft-deleted row stays in the database and is merely flagged as
    deleted, so it can be restored later and rows that reference it remain
    valid.

    Attributes:
        deleted_at: Timestamp of the soft delete, or ``None`` (indexed).
        deleted_by: Optional username/ID of whoever deleted the record
            (max length 100).
        is_deleted: ``True`` while the record is soft-deleted; defaults to
            ``False`` (indexed).

    Methods:
        soft_delete: Flag the record as deleted.
        restore: Undo a soft delete.

    Example:
        Basic usage:

        >>> from typing import Optional
        >>> from sqlmodel import SQLModel, Field
        >>> from sqlmodel_crud_utils.mixins import SoftDeleteMixin
        >>>
        >>> class Product(SQLModel, SoftDeleteMixin, table=True):
        ...     id: Optional[int] = Field(default=None, primary_key=True)
        ...     name: str
        ...     price: float
        >>>
        >>> product = Product(name="Widget", price=9.99)
        >>> product.soft_delete(user="admin")
        >>> # product.is_deleted is True, deleted_at is set,
        >>> # and deleted_by is "admin"

        Bringing the record back:

        >>> product.restore()
        >>> # is_deleted is False; deleted_at and deleted_by are None

        Together with AuditMixin:

        >>> class Order(SQLModel, AuditMixin, SoftDeleteMixin, table=True):
        ...     id: Optional[int] = Field(default=None, primary_key=True)
        ...     total: float

    Note:
        Queries do not skip soft-deleted rows on their own; filter them
        explicitly, e.g.
        ``select(Product).where(Product.is_deleted == False)``.

        The ``get_rows`` function in this library can be extended to
        exclude soft-deleted records by default.
    """

    deleted_at: Optional[datetime] = Field(
        description="Timestamp when the record was soft-deleted",
        default=None,
        index=True,
    )
    deleted_by: Optional[str] = Field(
        description="Username or ID of the user who deleted this record",
        default=None,
        max_length=100,
    )
    is_deleted: bool = Field(
        description="Flag indicating if the record is soft-deleted",
        default=False,
        index=True,
    )

    def soft_delete(self, user: Optional[str] = None) -> None:
        """Flag this record as deleted.

        Stores the deleting user (if given), stamps ``deleted_at`` with the
        value returned by ``_utc_now()``, and raises the ``is_deleted``
        flag.

        Args:
            user: Optional username or user ID of whoever performs the
                deletion; stored in ``deleted_by`` as given (``None`` when
                omitted).

        Example:
            >>> product.soft_delete(user="admin")
            >>> assert product.is_deleted is True
            >>> assert product.deleted_at is not None
            >>> assert product.deleted_by == "admin"

        Note:
            Only the in-memory object changes. Commit the session to
            persist the new state.
        """
        stamp = _utc_now()
        self.deleted_by = user
        self.deleted_at = stamp
        self.is_deleted = True

    def restore(self) -> None:
        """Undo a soft delete.

        Resets ``deleted_at`` and ``deleted_by`` to ``None`` and lowers the
        ``is_deleted`` flag.

        Example:
            >>> product.restore()
            >>> assert product.is_deleted is False
            >>> assert product.deleted_at is None
            >>> assert product.deleted_by is None

        Note:
            Only the in-memory object changes. Commit the session to
            persist the new state.
        """
        self.deleted_at = self.deleted_by = None
        self.is_deleted = False
Mixin for soft delete functionality.
Adds fields and methods to support soft deletion, where records are marked as deleted but not actually removed from the database. This allows for recovery and maintains referential integrity.
Attributes: deleted_at: Timestamp when the record was soft-deleted deleted_by: Optional username/ID of user who deleted the record is_deleted: Boolean flag indicating if record is deleted
Methods: soft_delete: Mark the record as deleted restore: Restore a soft-deleted record
Example: Basic usage:
>>> from typing import Optional
>>> from sqlmodel import SQLModel, Field
>>> from sqlmodel_crud_utils.mixins import SoftDeleteMixin
>>>
>>> class Product(SQLModel, SoftDeleteMixin, table=True):
... id: Optional[int] = Field(default=None, primary_key=True)
... name: str
... price: float
>>>
>>> product = Product(name="Widget", price=9.99)
>>> # Later, soft delete it
>>> product.soft_delete(user="admin")
>>> # product.is_deleted is now True
>>> # product.deleted_at is set to current time
>>> # product.deleted_by is "admin"
Restoring a deleted record:
>>> product.restore()
>>> # product.is_deleted is now False
>>> # product.deleted_at is None
>>> # product.deleted_by is None
Combining with AuditMixin:
>>> class Order(SQLModel, AuditMixin, SoftDeleteMixin, table=True):
... id: Optional[int] = Field(default=None, primary_key=True)
... total: float
>>>
>>> # Now Order has both audit trail and soft delete support
Note:
When querying, you'll typically want to filter out soft-deleted records:
select(Product).where(Product.is_deleted == False)
The `get_rows` function in this library can be extended to automatically
exclude soft-deleted records by default.
def soft_delete(self, user: Optional[str] = None) -> None:
    """Flag this record as deleted.

    Stores the deleting user (if given), stamps ``deleted_at`` with the
    value returned by ``_utc_now()``, and raises the ``is_deleted`` flag.

    Args:
        user: Optional username or user ID of whoever performs the
            deletion; stored in ``deleted_by`` as given (``None`` when
            omitted).

    Example:
        >>> product.soft_delete(user="admin")
        >>> assert product.is_deleted is True
        >>> assert product.deleted_at is not None
        >>> assert product.deleted_by == "admin"

    Note:
        Only the in-memory object changes. Commit the session to persist
        the new state.
    """
    stamp = _utc_now()
    self.deleted_by = user
    self.deleted_at = stamp
    self.is_deleted = True
Mark this record as deleted.
Sets the is_deleted flag to True, records the deletion timestamp, and optionally tracks which user performed the deletion.
Args: user: Optional username or user ID of the person performing the deletion
Example:
product.soft_delete(user="admin") assert product.is_deleted is True assert product.deleted_at is not None assert product.deleted_by == "admin"
Note: This method only modifies the object in memory. You must still commit the session to persist the changes to the database.
def restore(self) -> None:
    """Undo a soft delete.

    Resets ``deleted_at`` and ``deleted_by`` to ``None`` and lowers the
    ``is_deleted`` flag.

    Example:
        >>> product.restore()
        >>> assert product.is_deleted is False
        >>> assert product.deleted_at is None
        >>> assert product.deleted_by is None

    Note:
        Only the in-memory object changes. Commit the session to persist
        the new state.
    """
    self.deleted_at = self.deleted_by = None
    self.is_deleted = False
Restore a soft-deleted record.
Clears the deletion flag and resets all deletion-related fields to their default values.
Example:
product.restore() assert product.is_deleted is False assert product.deleted_at is None assert product.deleted_by is None
Note: This method only modifies the object in memory. You must still commit the session to persist the changes to the database.