← Back to Home
Real-world integration patterns
Web API
FastAPI file-upload endpoint
from fastapi import FastAPI, UploadFile, HTTPException
from io import BytesIO
from TextSpitter import TextSpitter

app = FastAPI()

# File types this endpoint accepts, keyed by lowercase extension.
ALLOWED = {".pdf", ".docx", ".txt", ".csv"}


@app.post("/extract")
async def extract_text(file: UploadFile) -> dict:
    """Extract text from an uploaded file and return it with basic metadata.

    Raises HTTPException(400) for a missing/extension-less filename or an
    unsupported extension.
    """
    # UploadFile.filename is optional in FastAPI; guard before parsing the
    # extension so a missing name yields a clean 400 instead of a 500.
    if not file.filename or "." not in file.filename:
        raise HTTPException(400, "Missing or extension-less filename")
    ext = "." + file.filename.rsplit(".", 1)[-1].lower()
    if ext not in ALLOWED:
        raise HTTPException(400, f"Unsupported type: {ext}")
    data = await file.read()
    text = TextSpitter(file_obj=BytesIO(data), filename=file.filename)
    return {"filename": file.filename, "chars": len(text), "text": text}

Django / DRF
from io import BytesIO
from rest_framework.decorators import api_view
from rest_framework.response import Response
from TextSpitter import TextSpitter
@api_view(["POST"])
def extract(request):
f = request.FILES.get("file")
if not f:
return Response({"error": "No file"}, status=400)
text = TextSpitter(file_obj=BytesIO(f.read()), filename=f.name)
return Response({"text": text})Cloud storage (AWS S3)
Extract directly from S3
import boto3
from io import BytesIO
from TextSpitter import TextSpitter
s3 = boto3.client("s3")
def extract_from_s3(bucket: str, key: str) -> str:
obj = s3.get_object(Bucket=bucket, Key=key)
data = obj["Body"].read()
return TextSpitter(file_obj=BytesIO(data), filename=key.split("/")[-1])Batch-process an S3 prefix
import boto3
from io import BytesIO
from TextSpitter import TextSpitter
s3 = boto3.client("s3")
def extract_prefix(bucket: str, prefix: str) -> dict:
results = {}
pager = s3.get_paginator("list_objects_v2")
for page in pager.paginate(Bucket=bucket, Prefix=prefix):
for obj in page.get("Contents", []):
key = obj["Key"]
data = s3.get_object(Bucket=bucket, Key=key)["Body"].read()
try:
results[key] = TextSpitter(
file_obj=BytesIO(data), filename=key.rsplit("/", 1)[-1]
)
except Exception as exc:
results[key] = f"[ERROR: {exc}]"
return resultsLLM / RAG pipelines
LangChain document loader
from langchain.schema import Document
from TextSpitter import TextSpitter
def load_documents(paths: list) -> list:
return [
Document(page_content=TextSpitter(filename=p), metadata={"source": p})
for p in paths
if TextSpitter(filename=p)
]OpenAI embedding pipeline
import openai
from TextSpitter import TextSpitter
def embed_file(path: str) -> list:
text = TextSpitter(filename=path)[:8000] # respect token limit
resp = openai.embeddings.create(input=text, model="text-embedding-3-small")
return resp.data[0].embeddingBatch processing
Directory tree
from pathlib import Path
from TextSpitter import TextSpitter
def extract_all(root: str, exts: set = None) -> dict:
exts = exts or {".pdf", ".docx", ".txt", ".csv"}
out = {}
for p in Path(root).rglob("*"):
if p.suffix.lower() in exts:
try:
out[str(p)] = TextSpitter(filename=str(p))
except Exception as e:
out[str(p)] = f"[ERROR: {e}]"
return outParallel with ThreadPoolExecutor
from concurrent.futures import ThreadPoolExecutor, as_completed
from TextSpitter import TextSpitter


def extract_parallel(paths: list, workers: int = 8) -> dict:
    """Extract text from many files concurrently.

    Returns {path: text}, with "[ERROR: ...]" strings for files that fail.
    """

    def _one(path):
        try:
            return path, TextSpitter(filename=path)
        except Exception as exc:
            return path, f"[ERROR: {exc}]"

    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = [pool.submit(_one, path) for path in paths]
        return dict(future.result() for future in as_completed(futures))

Logging
Enable loguru
pip install "textspitter[logging]"

from loguru import logger
# Route loguru output (WARNING and above) to a rotating 10 MB log file.
logger.add("textspitter.log", rotation="10 MB", level="WARNING")
from TextSpitter import TextSpitter
# Extract as usual; presumably TextSpitter's log records go to the sink above.
text = TextSpitter(filename="document.pdf")
import logging
# NOTE(review): this assumes TextSpitter logs under the "textspitter" logger
# name — confirm against the package source.
logging.getLogger("textspitter").setLevel(logging.DEBUG)
from TextSpitter import TextSpitter
text = TextSpitter(filename="document.pdf")