Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

DB Middleware

The maniflex/middleware/db package wraps the DB step with row-level scoping, request budgeting, post-write hooks, and result caching.

Row-level scoping

ForceFilter

Injects a filter on every list, read, update, and delete, regardless of what the client requested. Used to enforce invariants the client cannot override:

import "github.com/xaleel/maniflex/middleware/db"

server.Pipeline.DB.Register(
    db.ForceFilter("org_id", func(ctx *maniflex.ServerContext) any {
        return ctx.Auth.Claims["org_id"]
    }),
)

Tenancy

A specialised ForceFilter for the common multi-tenant case. Reads the tenant id from ctx.Auth and pins every query to it:

server.Pipeline.DB.Register(
    db.Tenancy("org_id", func(ctx *maniflex.ServerContext) string {
        return ctx.Auth.Claims["org_id"].(string)
    }),
)

Tenancy also rewrites the org_id field on creates and updates so a tenant cannot place rows into another tenant’s bucket.

Request budgeting

Paginate

Caps the maximum ?limit= accepted on list responses. Per-model overrides are common for tables whose rows are expensive to render:

server.Pipeline.DB.Register(db.Paginate(50), maniflex.ForModel("AuditLog"))

RateLimit

A token-bucket rate limiter scoped by IP or by authenticated user:

server.Pipeline.DB.Register(
    db.RateLimit(db.RateLimitConfig{
        RequestsPerMinute: 10,
        Key: func(ctx *maniflex.ServerContext) string {
            if ctx.Auth != nil {
                return ctx.Auth.UserID
            }
            return ctx.Request.RemoteAddr
        },
    }),
    maniflex.ForModel("PasswordReset"),
)

Rejected requests receive 429 RATE_LIMITED.

Post-write hooks

These run at maniflex.After position so they only fire when the database write succeeded.

AuditLog

Writes one audit record per mutating operation to a configured sink:

server.Pipeline.DB.Register(
    db.AuditLog(mySink),
    maniflex.ForOperation(maniflex.OpCreate, maniflex.OpUpdate, maniflex.OpDelete),
    maniflex.AtPosition(maniflex.After),
)

mySink is anything implementing the audit interface — a logger, a database table, an external SIEM. The record carries the operation, model name, actor (from ctx.Auth), and a JSON diff of the affected row.

Invalidate

Invalidates cache keys when a row changes. The key list is computed per request:

server.Pipeline.DB.Register(
    db.Invalidate(redisCache, func(ctx *maniflex.ServerContext) []string {
        return []string{
            "posts:list",
            fmt.Sprintf("post:%s", ctx.ResourceID),
        }
    }),
    maniflex.ForModel("Post"),
    maniflex.AtPosition(maniflex.After),
)

CacheQuery

Memoises read results (OpRead, OpList) in a CacheStore — the read-side complement to Invalidate. On a cache hit it sets ctx.DBResult and the adapter read is skipped; the Response step renders the cached result. On a miss it runs the read and stores the result for TTL. Pair it with Invalidate on writes to evict stale entries.

cache := maniflex.NewMemoryCache() // or a Redis-backed CacheStore

server.Pipeline.DB.Register(
    db.CacheQuery(cache, db.CacheConfig{
        TTL:     5 * time.Minute,
        KeyFunc: func(ctx *maniflex.ServerContext) string {
            // Only cache the common, high-traffic query shapes. Requests that
            // filter on `name` or carry a ?q= search are typically long-tail,
            // ad-hoc lookups — caching them floods the store with low-value
            // entries that are rarely read back, so skip them by returning "".
            if q := ctx.Query; q != nil {
                if q.Search != "" {
                    return ""
                }
                for _, f := range q.Filters {
                    if f.Field == "name" {
                        return ""
                    }
                }
            }
            return "products:list:" + ctx.Request.URL.RawQuery
        },
    }),
    maniflex.ForModel("Product"),
    maniflex.ForOperation(maniflex.OpList, maniflex.OpRead),
)
server.Pipeline.DB.Register(
    db.Invalidate(cache, func(*maniflex.ServerContext) []string {
        return []string{"products:list:..."}
    }),
    maniflex.ForModel("Product"),
    maniflex.ForOperation(maniflex.OpCreate, maniflex.OpUpdate, maniflex.OpDelete),
    maniflex.AtPosition(maniflex.After),
)

KeyFunc must capture every input that changes the result (model, tenant, filters, sort, pagination, includes); returning "" skips the cache for that request. The value stored is ctx.DBResult, so a distributed CacheStore must round-trip a *maniflex.ListResult for lists — a store that decodes list entries into a bare map is treated as a miss rather than panicking. Avoid caching mfx:"encrypted" models, since the decrypted result would live in the cache.

Ordering

Row-level scopers (ForceFilter, Tenancy) must run Before the default DB step — the default position — so the filter is in place when the SELECT or UPDATE runs. Post-write hooks must run After, so they observe the result. The package’s defaults follow this; only change them if you know why.