Beanis: Stop Fighting Redis, Start Building with It

Picture this: You’re building a high-performance API. You need Redis for caching and fast queries, but you’re drowning in boilerplate code. Every simple operation requires 15-20 lines of manual serialization, type conversion, and key management. Your codebase is littered with json.dumps(), json.loads(), and fragile string manipulation.

There had to be a better way.

That’s why I built Beanis - a Redis ODM that brings the elegance of modern ORMs to Redis, without sacrificing the speed that makes Redis special.

The “Aha!” Moment

I was working on a real-time recommendation system that needed to query thousands of products per second. Redis was the obvious choice for speed, but the code was becoming unmaintainable:

# The old way - painful and error-prone
product_key = f"Product:{product_id}"
data = await redis.hgetall(product_key)

# Manual type conversion for EVERY field
product = {
    'name': data.get('name', ''),
    'price': float(data.get('price', 0)) if data.get('price') else 0.0,
    'stock': int(data.get('stock', 0)) if data.get('stock') else 0,
    'tags': json.loads(data.get('tags', '[]')),
    'metadata': json.loads(data.get('metadata', '{}')),
}

# And that's just reading ONE document!

I wanted to write code that looked like this instead:

# The Beanis way - clean and type-safe
product = await Product.get(product_id)

Spoiler: I made it happen. And it’s only 8% slower than raw Redis.
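
To see what that one line has to replace, here’s a minimal sketch of the serialization layer hiding behind it. This is not Beanis’s implementation: `ProductRecord`, `to_hash`, and `from_hash` are names I made up for illustration, and it uses a plain dataclass instead of Pydantic to stay dependency-free.

```python
import json
from dataclasses import dataclass, field, fields


@dataclass
class ProductRecord:  # hypothetical model, for illustration only
    name: str = ""
    price: float = 0.0
    stock: int = 0
    tags: list = field(default_factory=list)


def to_hash(obj) -> dict:
    """Flatten a dataclass into the str -> str mapping a Redis hash stores."""
    out = {}
    for f in fields(obj):
        value = getattr(obj, f.name)
        # Strings go in as-is; everything else is JSON-encoded
        out[f.name] = value if isinstance(value, str) else json.dumps(value)
    return out


def from_hash(cls, data: dict):
    """Rebuild the dataclass from a hash, decoding every non-string field."""
    kwargs = {}
    for f in fields(cls):
        if f.name in data:  # missing fields fall back to the dataclass defaults
            raw = data[f.name]
            kwargs[f.name] = raw if f.type is str else json.loads(raw)
    return cls(**kwargs)


record = ProductRecord(name="MacBook Pro M3", price=2499.99, stock=50, tags=["laptop"])
stored = to_hash(record)                     # every value is now a string
restored = from_hash(ProductRecord, stored)  # and back to typed fields
```

Driving both directions from the field definitions is the whole trick: the type conversions live in one place instead of being repeated at every call site.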


Why Build Another Redis Library?

I’ve spent years working with databases in AI/ML projects. I love MongoDB’s ODMs like Beanie - the clean API, Pydantic integration, and how they let you focus on business logic instead of CRUD boilerplate. But when you need Redis-level performance, you’re stuck with manual serialization and key management.

The existing Redis libraries weren’t cutting it:

  • Redis OM - Excellent, but requires RedisJSON/RediSearch modules (not always available)
  • Walrus - No async support, predates Pydantic v2
  • Raw redis-py - Fast but verbose, no type safety

I wanted something that combined:

  • ✅ Vanilla Redis compatibility (no modules required)
  • ✅ Pydantic v2 validation and type safety
  • ✅ Beanie-like clean API
  • ✅ Async-first design
  • ✅ Minimal performance overhead

When I couldn’t find it, I built it. Beanis is what I wish existed when I started working with Redis.


Real-World Example: Building a Product Catalog

Let’s build something real: a product catalog for an e-commerce platform. You need:

  • Fast lookups by ID
  • Range queries on price
  • Category filtering
  • Real-time stock updates
  • Audit trails

Traditional Redis Approach

With raw redis-py, you’d write something like this for a single product insert:

import json
import redis.asyncio as redis

async def create_product(redis_client, product_data):
    # Generate unique ID
    product_id = await redis_client.incr("product:id:counter")
    product_key = f"Product:{product_id}"

    # Manually serialize complex types
    redis_data = {
        'id': str(product_id),
        'name': product_data['name'],
        'price': str(product_data['price']),
        'category': product_data['category'],
        'stock': str(product_data['stock']),
        'tags': json.dumps(product_data.get('tags', [])),
        'metadata': json.dumps(product_data.get('metadata', {}))
    }

    # Save to hash
    await redis_client.hset(product_key, mapping=redis_data)

    # Manually maintain indexes for queries
    await redis_client.zadd(f"Product:idx:price", {product_key: product_data['price']})
    await redis_client.sadd(f"Product:idx:category:{product_data['category']}", product_key)
    await redis_client.sadd("Product:all", product_key)

    return product_id

# Query by price range - also manual
async def find_products_by_price(redis_client, min_price, max_price):
    keys = await redis_client.zrangebyscore(
        "Product:idx:price",
        min_price,
        max_price
    )

    products = []
    for key in keys:
        data = await redis_client.hgetall(key)
        # Manual deserialization for each product
        products.append({
            'id': data['id'],
            'name': data['name'],
            'price': float(data['price']),
            'stock': int(data['stock']),
            'tags': json.loads(data['tags']),
            'metadata': json.loads(data['metadata'])
        })

    return products

That’s about 50 lines for one insert and one query. And we haven’t even added:

  • Input validation
  • Error handling
  • Type safety
  • Audit trails
  • Cascade deletes

The Beanis Approach

Here’s the same functionality with Beanis:

from beanis import Document, Indexed, init_beanis
from beanis.odm.actions import before_event, Insert, Update
from typing import Optional, Set
from datetime import datetime
from pydantic import Field
import redis.asyncio as redis

class Product(Document):
    name: str = Field(min_length=1, max_length=200)
    description: Optional[str] = None
    price: Indexed[float] = Field(gt=0)  # Auto-indexed, validated > 0
    category: Indexed[str]  # Auto-indexed
    stock: int = Field(ge=0)  # Validated >= 0
    tags: Set[str] = set()
    metadata: dict = {}

    # Audit fields - automatically managed
    created_at: datetime = Field(default_factory=datetime.now)
    updated_at: datetime = Field(default_factory=datetime.now)

    @before_event(Insert)
    async def on_create(self):
        self.created_at = datetime.now()

    @before_event(Update)
    async def on_update(self):
        self.updated_at = datetime.now()

    class Settings:
        key_prefix = "Product"

# Initialize once
client = redis.Redis(decode_responses=True)
await init_beanis(database=client, document_models=[Product])

# Create - with validation!
product = Product(
    name="MacBook Pro M3",
    price=2499.99,
    category="electronics",
    stock=50,
    tags={"laptop", "apple", "premium"},
    metadata={"warranty": "2 years", "color": "Space Gray"}
)
await product.insert()

# Query - indexes handled automatically
expensive = await Product.find(
    category="electronics",
    price__gte=1000,
    price__lte=3000
)

# Update - type-safe
await product.update(stock=45, price=2299.99)

# Complex queries
out_of_stock = await Product.find(stock=0)
premium_laptops = await Product.find(
    category="electronics",
    price__gte=2000
)

The model definition and setup come to about 30 lines, including validation, audit trails, and automatic indexing. Every operation after that is a single call instead of a hand-written function.


What Makes Beanis Different?

🎯 Full Pydantic v2 Integration

Beanis isn’t just wrapping Redis - it’s bringing Pydantic’s power to your data layer:

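from decimal import Decimal
from typing import Optional

from beanis import Document
from pydantic import EmailStr, Field, HttpUrl, ValidationError, field_validator

class User(Document):
    email: EmailStr  # Automatic email validation (needs the email-validator package)
    username: str = Field(min_length=3, max_length=20, pattern="^[a-zA-Z0-9_]+$")
    age: int = Field(ge=13, le=120)
    website: Optional[HttpUrl] = None
    balance: Decimal = Decimal("0.00")

    # Pydantic v2 spelling: field_validator replaces the deprecated validator
    @field_validator('username')
    @classmethod
    def username_alphanumeric(cls, v: str) -> str:
        # Stricter than the pattern above: this also rejects underscores
        if not v.isalnum():
            raise ValueError('Username must be alphanumeric')
        return v.lower()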

# This will raise validation errors BEFORE hitting Redis
try:
    user = User(
        email="not-an-email",  # ❌ Invalid
        username="ab",  # ❌ Too short
        age=200  # ❌ Too old
    )
except ValidationError as e:
    print(e)

🚀 Smart Indexing that Just Works

No more manually maintaining sorted sets and managing index consistency:

from datetime import datetime, timedelta

class Article(Document):
    title: str
    views: Indexed[int]  # Sorted set automatically maintained
    published_at: Indexed[datetime]  # Time-based queries
    author: Indexed[str]  # Categorical filtering
    score: Indexed[float]  # Range queries

# All these queries use optimized indexes under the hood
trending = await Article.find(views__gte=10000)
recent = await Article.find(
    published_at__gte=datetime.now() - timedelta(days=7)
)
popular_by_author = await Article.find(
    author="john_doe",
    score__gte=4.5
)

Behind the scenes, Beanis:

  • Maintains Redis sorted sets for each indexed field
  • Automatically updates indexes on insert/update/delete
  • Optimizes queries by choosing the best index
  • Handles index cleanup when documents are deleted
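
To make that bookkeeping concrete, here’s a rough in-memory sketch of the same idea. I’m not reproducing Beanis internals: `TinyIndex` is a made-up class, a dict per field stands in for a sorted set, and a Python set per (field, value) pair stands in for a Redis set. A real sorted set answers range queries in O(log N + M); this sketch just scans.

```python
from collections import defaultdict


class TinyIndex:
    """In-memory stand-in for Redis index structures (illustration only)."""

    def __init__(self):
        self.docs = {}                   # doc_id -> (doc, numeric fields, categorical fields)
        self.scores = defaultdict(dict)  # field -> {doc_id: score}, plays the sorted set
        self.members = defaultdict(set)  # (field, value) -> {doc_id}, plays the set

    def upsert(self, doc_id, doc, numeric=(), categorical=()):
        self.delete(doc_id)  # drop stale index entries before writing new ones
        self.docs[doc_id] = (dict(doc), tuple(numeric), tuple(categorical))
        for name in numeric:
            self.scores[name][doc_id] = doc[name]
        for name in categorical:
            self.members[(name, doc[name])].add(doc_id)

    def delete(self, doc_id):
        entry = self.docs.pop(doc_id, None)
        if entry is None:
            return
        doc, numeric, categorical = entry
        for name in numeric:
            self.scores[name].pop(doc_id, None)
        for name in categorical:
            self.members[(name, doc[name])].discard(doc_id)

    def in_range(self, name, low, high):
        return {d for d, s in self.scores[name].items() if low <= s <= high}

    def equal_to(self, name, value):
        return set(self.members.get((name, value), ()))


index = TinyIndex()
index.upsert("a1", {"author": "john_doe", "score": 4.8}, numeric=["score"], categorical=["author"])
index.upsert("a2", {"author": "john_doe", "score": 3.9}, numeric=["score"], categorical=["author"])
index.upsert("a3", {"author": "jane", "score": 4.9}, numeric=["score"], categorical=["author"])

# author == "john_doe" AND score >= 4.5: intersect the two candidate sets
hits = index.equal_to("author", "john_doe") & index.in_range("score", 4.5, float("inf"))
```

The part that’s easy to get wrong by hand is the `delete` inside `upsert`: forget it and an updated document stays findable under its old values.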

🎨 Custom Encoders for Any Type

Working with complex types? Beanis has you covered:

import base64
import io
from typing import Optional

import numpy as np
from PIL import Image
from beanis import Document
from beanis.odm.custom_encoders import register_custom_encoder, register_custom_decoder

# NumPy arrays: np.save keeps dtype and shape, so the round trip is lossless
@register_custom_encoder(np.ndarray)
def encode_numpy(arr: np.ndarray) -> str:
    buffer = io.BytesIO()
    np.save(buffer, arr, allow_pickle=False)
    return base64.b64encode(buffer.getvalue()).decode()

@register_custom_decoder(np.ndarray)
def decode_numpy(data: str) -> np.ndarray:
    return np.load(io.BytesIO(base64.b64decode(data)), allow_pickle=False)

# PIL Images
@register_custom_encoder(Image.Image)
def encode_image(img: Image.Image) -> str:
    buffer = io.BytesIO()
    img.save(buffer, format='PNG')
    return base64.b64encode(buffer.getvalue()).decode()

@register_custom_decoder(Image.Image)
def decode_image(data: str) -> Image.Image:
    return Image.open(io.BytesIO(base64.b64decode(data)))

class MLModel(Document):
    name: str
    weights: np.ndarray  # Seamlessly stored and retrieved
    bias: np.ndarray
    thumbnail: Optional[Image.Image] = None  # Optional, so the example below validates

# It just works!
model = MLModel(
    name="sentiment-classifier",
    weights=np.random.rand(100, 50),
    bias=np.zeros(50)
)
await model.insert()

🌍 Geo-Spatial Queries Out of the Box

Building location-based features? We got you:

from beanis import GeoPoint

class Restaurant(Document):
    name: str
    cuisine: Indexed[str]
    location: GeoPoint  # Lat/lon with automatic geo-indexing
    rating: Indexed[float]

# Find restaurants
italian_nearby = await Restaurant.find_near(
    location=GeoPoint(lat=41.9028, lon=12.4964),  # Rome, Italy
    radius=2000,  # 2km
    cuisine="italian",
    rating__gte=4.0
)

# Get distance to each result
for restaurant in italian_nearby:
    distance = restaurant.location.distance_to(
        GeoPoint(lat=41.9028, lon=12.4964)
    )
    print(f"{restaurant.name}: {distance:.2f}m away")
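
If you’re curious what a distance calculation like this involves, here’s a small standalone sketch. I’m assuming a great-circle (haversine) distance, which is what a method like `distance_to` would typically compute; `haversine_m` and the radius constant are my own choices, not Beanis’s, and results can differ slightly from what Redis reports because implementations pick slightly different Earth radius values.

```python
import math

EARTH_RADIUS_M = 6_371_000  # mean Earth radius in meters; an assumption of this sketch


def haversine_m(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Great-circle distance in meters between two (lat, lon) points given in degrees."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    d_phi = phi2 - phi1
    d_lambda = math.radians(lon2 - lon1)
    a = math.sin(d_phi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(d_lambda / 2) ** 2
    return 2 * EARTH_RADIUS_M * math.asin(math.sqrt(a))


# Roughly the Colosseum, measured from the coordinates used in the query above
distance = haversine_m(41.9028, 12.4964, 41.8902, 12.4922)
within_radius = distance <= 2000  # same 2km radius as the query
```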

🔄 Lifecycle Hooks for Business Logic

Implement audit trails, cache invalidation, or notifications:

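# after_event and Delete are assumed to sit next to before_event in beanis.odm.actions
from beanis.odm.actions import before_event, after_event, Insert, Update, Delete

class Order(Document):
    user_id: str
    total: Decimal
    status: str = "pending"

    # Audit trail (defaults let Order(...) validate before the hooks run)
    created_at: datetime = Field(default_factory=datetime.now)
    updated_at: datetime = Field(default_factory=datetime.now)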
    status_history: list = []

    @before_event(Insert)
    async def set_timestamps(self):
        now = datetime.now()
        self.created_at = now
        self.updated_at = now

    @before_event(Update)
    async def track_changes(self):
        self.updated_at = datetime.now()
        # Track status changes
        if hasattr(self, '_original_status') and self.status != self._original_status:
            self.status_history.append({
                'from': self._original_status,
                'to': self.status,
                'at': datetime.now().isoformat()
            })

    @after_event(Update)
    async def notify_status_change(self):
        if self.status == "shipped":
            await send_notification(self.user_id, f"Order {self.id} shipped!")

    @after_event(Delete)
    async def cleanup(self):
        # Clean up related data
        await OrderItem.delete_many(order_id=self.id)

Performance: Fast Enough for Production

I benchmarked Beanis against raw redis-py with 10,000 operations:

Operation            Raw Redis   Beanis    Overhead   Why?
Insert               0.45ms      0.49ms    +8%        Pydantic validation
Get by ID            0.38ms      0.41ms    +8%        Type conversion
Range Query          0.52ms      0.56ms    +7%        Index optimization
Batch Insert (100)   42ms        47ms      +12%       Validation batching

The verdict: roughly 8% overhead on single operations and 12% on batch inserts, for features you’d have to build anyway (validation, serialization, type safety).

When NOT to Use Beanis

Be honest about trade-offs:

  • ❌ Ultra-low latency requirements (< 1ms per operation)
  • ❌ Simple key-value caching (use raw redis-py)
  • ❌ You need RedisJSON/RediSearch modules (use Redis OM instead)
  • ❌ Prototyping with unpredictable schema (use raw Redis first)

  • ✅ Building production APIs with complex data models
  • ✅ Need type safety and validation
  • ✅ Working with teams who value clean code
  • ✅ Migrating from MongoDB/Postgres but need Redis speed


Real-World Use Cases

E-Commerce Product Catalog

# 10,000+ products, 1000+ queries/second
products = await Product.find(
    category="electronics",
    price__gte=100,
    price__lte=500,
    stock__gt=0
)

Session Management

class Session(Document):
    user_id: str
    token: str
    expires_at: Indexed[datetime]

# Auto-cleanup expired sessions
await Session.delete_many(expires_at__lt=datetime.now())

Real-time Leaderboards

class Score(Document):
    player_id: Indexed[str]
    score: Indexed[int]
    achieved_at: datetime

# Top 10 globally
top_players = await Score.find(score__gte=1000).sort('-score').limit(10)

Migrating from Raw Redis: A Step-by-Step Guide

Already have a Redis codebase? Here’s how to migrate incrementally without breaking production.

Step 1: Identify Your Data Models

Look at your existing Redis keys and group them:

# Current Redis structure
# User:1 -> hash {name, email, age}
# User:2 -> hash {name, email, age}
# User:idx:email -> sorted set
# User:all -> set

# This becomes a Beanis document
class User(Document):
    name: str
    email: Indexed[str]
    age: int

    class Settings:
        key_prefix = "User"
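
If you have a lot of keys, a tiny script can do the grouping for you. This is a sketch under the key layout shown above (`Prefix:id`, `Prefix:idx:field`, `Prefix:all`); `group_keys` is a hypothetical helper, and in production you’d feed it from SCAN rather than KEYS.

```python
from collections import defaultdict


def group_keys(keys):
    """Split keys of the form Prefix:id / Prefix:idx:field / Prefix:all by prefix."""
    models = defaultdict(lambda: {"documents": [], "indexes": []})
    for key in keys:
        prefix, _, rest = key.partition(":")
        # Anything under idx: or the all-set is bookkeeping, not a document
        kind = "indexes" if rest == "all" or rest.startswith("idx:") else "documents"
        models[prefix][kind].append(key)
    return dict(models)


sample = ["User:1", "User:2", "User:idx:email", "User:all", "Product:7", "Product:idx:price"]
grouped = group_keys(sample)
```

Each top-level prefix is a candidate `Document` class, and each `idx:` key hints at a field that should be `Indexed`.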

Step 2: Add Validation Gradually

Start with basic types, add constraints later:

# Phase 1: Just types
class Product(Document):
    name: str
    price: float
    stock: int

# Phase 2: Add validation
class Product(Document):
    name: str = Field(min_length=1, max_length=200)
    price: float = Field(gt=0)  # Must be positive
    stock: int = Field(ge=0)  # Can't be negative

Step 3: Dual-Write During Migration

Run both systems in parallel:

async def create_product_safe(data):
    # Write to Beanis
    product = Product(**data)
    await product.insert()

    # Still write to old Redis (for rollback safety)
    await redis_client.hset(
        f"Product:{product.id}",
        mapping=legacy_serialize(data)
    )

    return product

# After 1-2 weeks of dual-write, stop reading from old keys
# After 1 month, stop dual-writing

Step 4: Verify Data Consistency

async def verify_migration():
    """Compare old vs new data"""
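    # SCAN instead of KEYS: KEYS blocks Redis while it walks the whole keyspace
    async for key in redis_client.scan_iter(match="Product:*"):
        product_id = key.split(":", 1)[1]
        # Skip bookkeeping keys such as Product:idx:price and Product:all
        if product_id == "all" or product_id.startswith("idx:"):
            continue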

        # Get from both systems
        old_data = await redis_client.hgetall(key)
        new_product = await Product.get(product_id)

        # Compare
        assert old_data['name'] == new_product.name
        assert float(old_data['price']) == new_product.price
        # ... verify all fields
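
An assert stops at the first mismatch, and during a migration it’s usually more useful to collect all of them. Here’s a minimal sketch of a field-by-field diff. `diff_record` and the `decoders` mapping are hypothetical; `new_fields` would come from something like `new_product.model_dump()` in Pydantic v2.

```python
import json


def diff_record(old_hash: dict, new_fields: dict, decoders: dict) -> dict:
    """Return {field: (old_value, new_value)} for every field that disagrees."""
    mismatches = {}
    for name, new_value in new_fields.items():
        if name not in old_hash:
            mismatches[name] = (None, new_value)
            continue
        decode = decoders.get(name, str)  # legacy values are strings by default
        old_value = decode(old_hash[name])
        if old_value != new_value:
            mismatches[name] = (old_value, new_value)
    return mismatches


decoders = {"price": float, "stock": int, "tags": json.loads}  # hypothetical legacy schema
old = {"name": "MacBook Pro M3", "price": "2499.99", "stock": "50", "tags": '["laptop"]'}
new = {"name": "MacBook Pro M3", "price": 2499.99, "stock": 45, "tags": ["laptop"]}
report = diff_record(old, new, decoders)  # only the stock field differs
```

Log the report per key and you get a list of exactly what drifted, instead of a stack trace from the first bad record.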

Advanced Patterns and Best Practices

Pattern 1: Caching with TTL

Beanis doesn’t have built-in TTL yet, but you can implement it:

class CachedResult(Document):
    query_hash: Indexed[str]
    result_data: dict
    created_at: datetime = Field(default_factory=datetime.now)

    class Settings:
        key_prefix = "Cache"

    async def is_expired(self, ttl_seconds: int = 300) -> bool:
        age = (datetime.now() - self.created_at).total_seconds()
        return age > ttl_seconds

# Usage
async def get_with_cache(query: str, ttl: int = 300):
    query_hash = hashlib.md5(query.encode()).hexdigest()

    # Check cache
    cached = await CachedResult.find_one(query_hash=query_hash)
    if cached and not await cached.is_expired(ttl):
        return cached.result_data

    # Compute and cache
    result = await expensive_operation(query)
    await CachedResult(
        query_hash=query_hash,
        result_data=result
    ).insert()

    return result

Pattern 2: Optimistic Locking

Prevent race conditions with version numbers:

class BankAccount(Document):
    account_number: str
    balance: Decimal
    version: int = 0

    async def withdraw(self, amount: Decimal):
        # Read current version
        original_version = self.version

        # Check balance
        if self.balance < amount:
            raise InsufficientFunds()

        # Update
        self.balance -= amount
        self.version += 1

        try:
            await self.save()
        except Exception:
            # In a real implementation, check if version changed
            # and retry or raise ConcurrentModificationError
            raise

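# Better: use a Redis transaction (WATCH/MULTI/EXEC) on the underlying hash.
# Assumes the account is stored at BankAccount:{id} with a "balance" field.
async def atomic_withdraw(account_id: str, amount: Decimal):
    key = f"BankAccount:{account_id}"
    async with redis_client.pipeline(transaction=True) as pipe:
        await pipe.watch(key)  # EXEC fails with WatchError if the key changes
        balance = Decimal(await pipe.hget(key, "balance"))
        if balance < amount:
            raise InsufficientFunds()
        pipe.multi()
        pipe.hset(key, "balance", str(balance - amount))
        await pipe.execute()  # caller retries on redis.WatchError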
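
The version-number idea is easier to see without Redis in the way. Here’s a self-contained sketch of compare-and-set with a retry loop. `VersionedStore` is a made-up, dict-backed stand-in, not a Beanis or Redis API.

```python
from decimal import Decimal


class ConcurrentModificationError(Exception):
    pass


class VersionedStore:
    """Dict-backed stand-in for a datastore with compare-and-set on a version field."""

    def __init__(self):
        self._rows = {}

    def put(self, key, balance):
        self._rows[key] = {"balance": Decimal(balance), "version": 0}

    def read(self, key):
        return dict(self._rows[key])  # copy, so callers can't mutate the stored row

    def write_if_unchanged(self, key, balance, expected_version):
        if self._rows[key]["version"] != expected_version:
            raise ConcurrentModificationError(key)
        self._rows[key] = {"balance": balance, "version": expected_version + 1}


def withdraw(store, key, amount, retries=3):
    for _ in range(retries):
        row = store.read(key)
        if row["balance"] < amount:
            raise ValueError("insufficient funds")
        try:
            store.write_if_unchanged(key, row["balance"] - amount, row["version"])
            return
        except ConcurrentModificationError:
            continue  # someone else won the race; re-read and try again
    raise ConcurrentModificationError(key)


store = VersionedStore()
store.put("acct-1", "100.00")
stale = store.read("acct-1")                 # a second client reads version 0
withdraw(store, "acct-1", Decimal("30.00"))  # first client commits, version becomes 1
```

A write based on `stale` is now rejected because its version no longer matches. The check and the write are only atomic here because the sketch is single-threaded; in Redis they have to run atomically, for example with WATCH/MULTI/EXEC or a Lua script.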

Pattern 3: Batch Operations for Performance

Process thousands of records efficiently:

# ❌ Slow: One query per item
products = []
for product_id in product_ids:
    product = await Product.get(product_id)
    products.append(product)

# ✅ Fast: Batch fetch
products = await Product.find(
    id__in=product_ids
).to_list()

# ✅ Even faster: Pipeline for insertions
async def bulk_insert_products(product_data_list):
    # Constructing each model already runs Pydantic validation,
    # so invalid input fails here, before anything is written
    products = [Product(**data) for data in product_data_list]

    # Bulk insert (uses Redis pipeline internally)
    await Product.insert_many(products)
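
For very large imports you probably don’t want one giant batch either. A small chunking helper keeps each pipeline to a bounded size; `chunked` is my own helper, and any batch size you pick is something to tune, not a recommendation.

```python
from itertools import islice


def chunked(items, size):
    """Yield successive lists of at most `size` items."""
    if size < 1:
        raise ValueError("size must be >= 1")
    iterator = iter(items)
    while batch := list(islice(iterator, size)):
        yield batch


batches = list(chunked(range(1, 8), 3))  # three batches: 3, 3, and 1 items
```

With that in place, the bulk insert becomes a loop: `for batch in chunked(products, 500): await Product.insert_many(batch)`.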

Pattern 4: Computed Fields and Denormalization

Redis favors denormalization - embrace it:

class Order(Document):
    user_id: str
    items: list[dict]  # [{product_id, quantity, price}]

    # Denormalized fields for fast queries
    total_amount: Decimal
    item_count: int
    user_email: str  # Copied from User

    @classmethod
    async def create_order(cls, user: User, items: list):
        total = sum(item['price'] * item['quantity'] for item in items)

        order = cls(
            user_id=user.id,
            items=items,
            total_amount=total,
            item_count=len(items),
            user_email=user.email  # Denormalize for queries
        )
        await order.insert()
        return order

# Now you can query orders by email without joining
expensive_orders = await Order.find(
    user_email="vip@example.com",
    total_amount__gte=1000
)
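
One detail worth care in `create_order`: if item prices arrive as floats, multiplying and summing them carries float rounding into `total_amount`. Here’s a small sketch of computing the total exactly with `Decimal`; `order_total` is a hypothetical helper, not part of Beanis.

```python
from decimal import Decimal


def order_total(items) -> Decimal:
    """Sum price * quantity exactly, converting prices via str to avoid float artifacts."""
    return sum(
        (Decimal(str(item["price"])) * item["quantity"] for item in items),
        Decimal("0"),
    )


items = [{"product_id": "p1", "quantity": 3, "price": 0.1}]
float_total = sum(item["price"] * item["quantity"] for item in items)  # float arithmetic
exact_total = order_total(items)                                       # Decimal arithmetic
```

Here `float_total` is not equal to 0.3, while `exact_total` is exactly `Decimal("0.3")`. For a denormalized field you later filter on with `total_amount__gte`, that difference matters at the boundaries.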

Common Pitfalls and How to Avoid Them

Pitfall 1: Over-Indexing

Problem: Every indexed field adds an index that has to be stored and updated on every write. Too many = memory bloat and slower writes.

# ❌ Bad: 10 indexes = 10 index entries to maintain for every document
class User(Document):
    name: Indexed[str]
    email: Indexed[str]
    age: Indexed[int]
    created_at: Indexed[datetime]
    last_login: Indexed[datetime]
    status: Indexed[str]
    role: Indexed[str]
    department: Indexed[str]
    manager_id: Indexed[str]
    salary: Indexed[Decimal]

# ✅ Good: Only index what you query
class User(Document):
    name: str
    email: Indexed[str]  # Frequent lookups
    age: int
    created_at: Indexed[datetime]  # Time-range queries
    last_login: datetime  # Don't need to query this
    status: Indexed[str]  # Filter by active/inactive
    role: str  # Can filter client-side
    department: str
    manager_id: str
    salary: Decimal  # Sensitive, don't index

Pitfall 2: Forgetting Async/Await

# ❌ Missing await: this returns a coroutine object, not a User
user = User.get(user_id)

# ✅ Always await
user = await User.get(user_id)

# ⚠️ Works, but awaits one request at a time (no concurrency)
users = [await User.get(uid) for uid in user_ids]

# ✅ Even better: batch fetch
users = await User.find(id__in=user_ids).to_list()
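
When there’s no batch API for what you need, `asyncio.gather` at least lets the requests overlap. A minimal sketch, with `fake_get` standing in for an awaitable like `User.get` and a semaphore capping how many run at once (`fetch_many` and the limit of 10 are my own choices):

```python
import asyncio


async def fetch_many(fetch_one, ids, max_concurrency=10):
    """Run fetch_one(id) for every id concurrently, at most max_concurrency at a time."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def guarded(item_id):
        async with semaphore:
            return await fetch_one(item_id)

    # gather returns results in the same order as the inputs
    return await asyncio.gather(*(guarded(i) for i in ids))


async def fake_get(user_id):  # stand-in for a real lookup
    await asyncio.sleep(0.01)
    return {"id": user_id}


users = asyncio.run(fetch_many(fake_get, ["u1", "u2", "u3"]))
```

The cap matters with a connection pool: firing thousands of coroutines at once just moves the queue from your code into the pool.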

Pitfall 3: N+1 Query Problem

# ❌ N+1 queries (slow!)
orders = await Order.find_all()
for order in orders:
    user = await User.get(order.user_id)  # N queries!
    print(f"{user.name}: ${order.total}")

# ✅ Denormalize (recommended for Redis)
class Order(Document):
    user_id: str
    user_name: str  # Denormalized
    total: Decimal

orders = await Order.find_all()
for order in orders:
    print(f"{order.user_name}: ${order.total}")  # No extra query!

# ✅ Or batch fetch users
orders = await Order.find_all()
user_ids = {order.user_id for order in orders}
users = {u.id: u for u in await User.find(id__in=user_ids)}

for order in orders:
    user = users[order.user_id]
    print(f"{user.name}: ${order.total}")

Performance Tuning Tips

1. Use Connection Pooling

import redis.asyncio as redis
from redis.asyncio.connection import ConnectionPool

# ✅ Reuse connections
pool = ConnectionPool.from_url(
    "redis://localhost",
    max_connections=50,
    decode_responses=True
)
client = redis.Redis(connection_pool=pool)

await init_beanis(database=client, document_models=[Product, User])

2. Batch Validation

# If inserting many documents, validate everything before writing anything
products_data = [...]  # 1000 products

# ✅ Validate all first. Validation is CPU-bound, so a thread pool adds
# overhead without real parallelism under the GIL; a plain loop is enough.
validated = [Product(**d) for d in products_data]

# Then insert (uses pipeline automatically)
await Product.insert_many(validated)

3. Query Optimization

# ❌ Fetching everything then filtering in Python
all_products = await Product.find_all()
cheap = [p for p in all_products if p.price < 100]

# ✅ Filter in Redis
cheap = await Product.find(price__lt=100)

# ✅ Use projections (when implemented)
# cheap = await Product.find(price__lt=100).project(['name', 'price'])

Getting Started in 60 Seconds

pip install beanis

from beanis import Document, Indexed, init_beanis
import redis.asyncio as redis

# 1. Define your model
class User(Document):
    username: str
    email: Indexed[str]
    score: Indexed[int] = 0

# 2. Initialize
client = redis.Redis(decode_responses=True)
await init_beanis(database=client, document_models=[User])

# 3. Use it!
user = User(username="john", email="john@example.com")
await user.insert()

# Find users
top_users = await User.find(score__gte=100)

Full documentation: andreim14.github.io/beanis


What’s Next?

Beanis is production-ready today with:

  • ✅ 150+ tests passing
  • ✅ 56% code coverage
  • ✅ Full CI/CD pipeline
  • ✅ Comprehensive docs

Roadmap:

  • 🔄 Relationship support (OneToOne, OneToMany)
  • 📊 Aggregation pipeline
  • 🔐 Field-level encryption
  • ⚡ Connection pooling optimizations
  • 📈 Query analytics and slow query detection

Try It, Star It, Break It

I built Beanis to scratch my own itch, and now I’m sharing it with the world. If you:

  • Want cleaner Redis code
  • Value type safety
  • Need fast queries without the boilerplate

Give Beanis a try.

Found a bug? Have a feature request? Open an issue - I read and respond to everything.

Happy coding! 🚀


Beanis is inspired by Beanie by Roman Right. Standing on the shoulders of giants.

Built with ❤️ by Andrei Stefan Bejgu - AI Applied Scientist @ SylloTips