Common Use Cases

← Back to Home

Real-world integration patterns

Web API

FastAPI file-upload endpoint
"""FastAPI endpoint: accept an uploaded file and return its extracted text."""
from io import BytesIO

from fastapi import FastAPI, HTTPException, UploadFile
from TextSpitter import TextSpitter

app = FastAPI()

# Extensions the endpoint will accept.
ALLOWED = {".pdf", ".docx", ".txt", ".csv"}


@app.post("/extract")
async def extract_text(file: UploadFile) -> dict:
    """Validate the upload's extension, extract its text, and return both
    the text and its character count."""
    ext = "." + file.filename.rsplit(".", 1)[-1].lower()
    if ext not in ALLOWED:
        raise HTTPException(400, f"Unsupported type: {ext}")
    payload = await file.read()
    extracted = TextSpitter(file_obj=BytesIO(payload), filename=file.filename)
    return {"filename": file.filename, "chars": len(extracted), "text": extracted}
Django / DRF
"""Django REST Framework view that extracts text from an uploaded file."""
from io import BytesIO

from rest_framework.decorators import api_view
from rest_framework.response import Response
from TextSpitter import TextSpitter


@api_view(["POST"])
def extract(request):
    """Read the multipart 'file' field and return its extracted text."""
    upload = request.FILES.get("file")
    if not upload:
        return Response({"error": "No file"}, status=400)
    extracted = TextSpitter(file_obj=BytesIO(upload.read()), filename=upload.name)
    return Response({"text": extracted})

Cloud storage (AWS S3)

Extract directly from S3
"""Extract text from a single S3 object without touching local disk."""
from io import BytesIO

import boto3
from TextSpitter import TextSpitter

s3 = boto3.client("s3")


def extract_from_s3(bucket: str, key: str) -> str:
    """Download s3://<bucket>/<key> into memory and return its text.

    The basename of the key is passed as the filename so TextSpitter can
    pick the right extractor from the extension.
    """
    body = s3.get_object(Bucket=bucket, Key=key)["Body"].read()
    return TextSpitter(file_obj=BytesIO(body), filename=key.split("/")[-1])
Batch-process an S3 prefix
"""Batch-extract text from every object under an S3 prefix."""
from io import BytesIO

import boto3
from TextSpitter import TextSpitter

s3 = boto3.client("s3")


def extract_prefix(bucket: str, prefix: str) -> dict:
    """Return a mapping of object key -> extracted text for the prefix.

    Extraction failures are recorded as "[ERROR: ...]" strings so one bad
    object does not abort the batch.
    """
    extracted = {}
    paginator = s3.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for entry in page.get("Contents", []):
            key = entry["Key"]
            blob = s3.get_object(Bucket=bucket, Key=key)["Body"].read()
            try:
                extracted[key] = TextSpitter(
                    file_obj=BytesIO(blob), filename=key.rsplit("/", 1)[-1]
                )
            except Exception as exc:
                extracted[key] = f"[ERROR: {exc}]"
    return extracted

LLM / RAG pipelines

LangChain document loader
"""Load local files as LangChain Documents."""
from langchain.schema import Document
from TextSpitter import TextSpitter


def load_documents(paths: list) -> list:
    """Build a LangChain Document per file, skipping empty extractions.

    Args:
        paths: File paths to extract text from.

    Returns:
        A list of Documents with the extracted text as page_content and
        the source path recorded in metadata.

    Note: the original pattern called TextSpitter twice per path — once in
    the comprehension's filter and once for page_content — extracting every
    file twice. This version extracts each file exactly once.
    """
    documents = []
    for path in paths:
        text = TextSpitter(filename=path)
        if text:  # skip files that yield no text
            documents.append(Document(page_content=text, metadata={"source": path}))
    return documents
OpenAI embedding pipeline
"""Embed a file's extracted text with the OpenAI embeddings API."""
import openai
from TextSpitter import TextSpitter


def embed_file(path: str) -> list:
    """Extract the file's text and return its embedding vector."""
    # Crude guard against oversized inputs: 8000 CHARS, not tokens.
    text = TextSpitter(filename=path)[:8000]
    response = openai.embeddings.create(input=text, model="text-embedding-3-small")
    return response.data[0].embedding

Batch processing

Directory tree
"""Recursively extract text from every matching file under a directory."""
from pathlib import Path

from TextSpitter import TextSpitter


def extract_all(root: str, exts: set = None) -> dict:
    """Walk *root* recursively and extract text from matching files.

    Args:
        root: Directory to walk.
        exts: File suffixes (lowercase, with dot) to include. Defaults to
            {".pdf", ".docx", ".txt", ".csv"} when None.

    Returns:
        Mapping of file path -> extracted text; failures are captured as
        "[ERROR: ...]" strings instead of raising.

    Fix: the original used `exts = exts or {...}`, which silently replaced
    an explicitly passed EMPTY set with the defaults. Only None now falls
    back to the defaults.
    """
    if exts is None:
        exts = {".pdf", ".docx", ".txt", ".csv"}
    results = {}
    for candidate in Path(root).rglob("*"):
        if candidate.suffix.lower() in exts:
            try:
                results[str(candidate)] = TextSpitter(filename=str(candidate))
            except Exception as err:
                results[str(candidate)] = f"[ERROR: {err}]"
    return results
Parallel with ThreadPoolExecutor
"""Extract text from many files concurrently using a thread pool."""
from concurrent.futures import ThreadPoolExecutor, as_completed

from TextSpitter import TextSpitter


def extract_parallel(paths: list, workers: int = 8) -> dict:
    """Return a mapping of path -> extracted text, built in parallel.

    Extraction errors are captured as "[ERROR: ...]" strings so a single
    bad file cannot sink the whole batch.
    """

    def _extract_one(path):
        # Worker never raises: errors become result strings.
        try:
            return path, TextSpitter(filename=path)
        except Exception as err:
            return path, f"[ERROR: {err}]"

    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = [pool.submit(_extract_one, p) for p in paths]
        return {key: text for key, text in (f.result() for f in as_completed(futures))}

Logging

Enable loguru
pip install "textspitter[logging]"
"""Enable TextSpitter's loguru-based logging with a rotating file sink."""
from loguru import logger

# Rotate the log at 10 MB; record warnings and above.
logger.add("textspitter.log", rotation="10 MB", level="WARNING")

# Import AFTER configuring the sink (original order preserved —
# presumably the library binds to loguru at import time; confirm in docs).
from TextSpitter import TextSpitter

text = TextSpitter(filename="document.pdf")
Stdlib logging (no loguru)
"""Raise TextSpitter's stdlib-logging verbosity without installing loguru."""
import logging

# Configure the library's logger BEFORE importing it (original order kept).
logging.getLogger("textspitter").setLevel(logging.DEBUG)

from TextSpitter import TextSpitter

text = TextSpitter(filename="document.pdf")