Realtime / WebSockets

maniflex is a synchronous request/response framework, but the realtime package ships a first-class event hub that pushes domain events to browsers over WebSocket and Server-Sent Events. It is a pure consumer of the event bus: producers publish through events.Emit exactly as they would for any other subscriber, and the hub fans those events out to connected clients.

Nothing about realtime leaks into a CRUD-only app — the hub is mounted by your own code outside server.Handler(), so a blog that never imports realtime pays no websocket dependency, goroutine, or shutdown phase.

The shape of it

import (
    "log"
    "net/http"

    "github.com/go-chi/chi/v5" // router used below; import path and version assumed
    "github.com/xaleel/maniflex"
    "github.com/xaleel/maniflex/events"
    "github.com/xaleel/maniflex/events/inproc"
    "github.com/xaleel/maniflex/realtime"
)

bus := inproc.New() // or events/redis, events/nats, … for multi-replica

// Producer: every create/update/delete publishes a domain event.
server.Pipeline.DB.Register(
    events.Emit(bus, events.EmitConfig{Source: "billing"}),
    maniflex.ForOperation(maniflex.OpCreate, maniflex.OpUpdate, maniflex.OpDelete),
    maniflex.AtPosition(maniflex.After),
)

// Consumer: the hub fans those events out to clients.
hub, err := realtime.NewHub(realtime.HubConfig{Bus: bus})
if err != nil {
    log.Fatal(err)
}

r := chi.NewRouter()
r.Mount("/api", server.Handler())
r.Handle("/ws", hub.Handler())     // WebSocket upgrade
r.Handle("/sse", hub.SSEHandler()) // Server-Sent Events fallback
log.Fatal(http.ListenAndServe(":8080", r)) // surface bind/listen errors instead of dropping them

Removing realtime is a small, local revert: drop the realtime.NewHub call, the two r.Handle lines, and the events.Emit registration.

Topics

Events are addressed by their CloudEvents type — a dotted string like invoice.created or queue.position_changed. Clients subscribe with glob patterns (the same matcher the event bus uses):

Pattern      Matches
invoice.*    invoice.created, invoice.updated, …
*.created    any ….created event
*            every event

HubConfig.AllowPatterns is an optional whitelist of subscribable patterns; an empty list allows any. A client that asks for a forbidden pattern gets a FORBIDDEN_PATTERN error (WS) or a 403 (SSE).
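To make the table concrete, here is a minimal sketch of glob matching over event types. It assumes path.Match from the Go standard library behaves closely enough to the bus's own matcher for these patterns, which the text does not confirm. The allowed helper and its verbatim pattern comparison are likewise hypothetical, not the hub's AllowPatterns implementation.

```go
package main

import (
	"fmt"
	"path"
)

// matches reports whether an event type satisfies a subscription pattern.
// Assumption: path.Match stands in for the bus's own glob matcher.
func matches(pattern, eventType string) bool {
	ok, err := path.Match(pattern, eventType)
	return err == nil && ok
}

// allowed mimics an allow-list check: an empty list permits any pattern.
// Assumption: requested patterns are compared verbatim against the list.
func allowed(allowList []string, requested string) bool {
	if len(allowList) == 0 {
		return true
	}
	for _, p := range allowList {
		if p == requested {
			return true
		}
	}
	return false
}

func main() {
	for _, p := range []string{"invoice.*", "*.created", "*"} {
		fmt.Println(p, matches(p, "invoice.created"), matches(p, "queue.position_changed"))
	}
	fmt.Println(allowed([]string{"invoice.*"}, "*"))
}
```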

WebSocket protocol

The client speaks a tiny JSON protocol over the socket:

client → server                              server → client
{"op":"subscribe","patterns":["invoice.*"]}  {"op":"ack","subId":"s_1"}
{"op":"unsubscribe","subId":"s_1"}           {"op":"event","subId":"s_1","data":<event>}
{"op":"ping"}                                {"op":"pong"}
                                             {"op":"error","code":"…","msg":"…"}

The data field is the full CloudEvents JSON document, so a browser can parse it with any CloudEvents SDK.
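A non-browser client (a CLI or a test harness, say) can model these frames with two small structs. The sketch below covers only the JSON shapes from the table above; the type and function names are illustrative, and the event payload in main is a trimmed, made-up example rather than a full CloudEvents document.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// clientMsg models the client-to-server frames.
type clientMsg struct {
	Op       string   `json:"op"`
	Patterns []string `json:"patterns,omitempty"`
	SubID    string   `json:"subId,omitempty"`
}

// serverMsg models the server-to-client frames. Data stays raw so it can be
// handed to a CloudEvents decoder unchanged.
type serverMsg struct {
	Op    string          `json:"op"`
	SubID string          `json:"subId,omitempty"`
	Data  json.RawMessage `json:"data,omitempty"`
	Code  string          `json:"code,omitempty"`
	Msg   string          `json:"msg,omitempty"`
}

// encodeSubscribe builds a subscribe frame for the given patterns.
func encodeSubscribe(patterns ...string) ([]byte, error) {
	return json.Marshal(clientMsg{Op: "subscribe", Patterns: patterns})
}

// decodeServer parses one frame received from the hub.
func decodeServer(frame []byte) (serverMsg, error) {
	var m serverMsg
	err := json.Unmarshal(frame, &m)
	return m, err
}

func main() {
	out, err := encodeSubscribe("invoice.*")
	fmt.Println(string(out), err)

	// Hypothetical, trimmed event payload for illustration only.
	m, err := decodeServer([]byte(`{"op":"event","subId":"s_1","data":{"type":"invoice.created"}}`))
	fmt.Println(m.Op, m.SubID, string(m.Data), err)
}
```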

SSE protocol

SSE is push-only and subscribes via query parameters — ideal for corporate networks that break WebSockets:

GET /sse?subscribe=invoice.*&subscribe=queue.position_changed

Each event arrives as a standard data: frame whose body is the same CloudEvents JSON.
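For a Go consumer of the SSE endpoint, the following sketch builds the subscription URL and parses a stream into frames. It is a simplified reading of the SSE wire format (comment lines skipped, a frame emitted only when it carries data), not a complete implementation of the specification, and all names are illustrative.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"net/url"
	"strings"
)

// sseURL adds one subscribe query parameter per pattern.
func sseURL(base string, patterns ...string) string {
	q := url.Values{}
	for _, p := range patterns {
		q.Add("subscribe", p)
	}
	return base + "?" + q.Encode()
}

// frame is one parsed SSE message.
type frame struct {
	ID, Event, Data string
}

// readFrames collects fields until a blank line ends the frame. Lines that
// start with ':' are comments (such as keepalives) and are skipped.
func readFrames(r io.Reader) ([]frame, error) {
	var out []frame
	var cur frame
	var data []string
	sc := bufio.NewScanner(r)
	for sc.Scan() {
		line := sc.Text()
		switch {
		case line == "":
			if len(data) > 0 {
				cur.Data = strings.Join(data, "\n")
				out = append(out, cur)
			}
			cur, data = frame{}, nil
		case strings.HasPrefix(line, ":"):
			// comment line, nothing to do
		default:
			name, value, _ := strings.Cut(line, ":")
			value = strings.TrimPrefix(value, " ")
			switch name {
			case "id":
				cur.ID = value
			case "event":
				cur.Event = value
			case "data":
				data = append(data, value)
			}
		}
	}
	return out, sc.Err()
}

// parseString is a convenience wrapper for in-memory streams.
func parseString(stream string) ([]frame, error) {
	return readFrames(strings.NewReader(stream))
}

func main() {
	fmt.Println(sseURL("/sse", "invoice.*", "queue.position_changed"))
	frames, err := parseString(": keepalive\n\nid: 7\ndata: {\"type\":\"invoice.created\"}\n\n")
	fmt.Println(frames, err)
}
```

Note that bufio.Scanner caps a line at 64 KiB by default, so a client expecting larger events would need to raise that limit with Scanner.Buffer.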

Authentication

Connections are authenticated once, on connect (never per message). Supply an Authenticator; the default AnonymousOnly{} accepts everyone.

hub, _ := realtime.NewHub(realtime.HubConfig{
    Bus: bus,
    Authenticator: realtime.BearerToken(func(tok string) (*realtime.Principal, error) {
        claims, err := verifyMyJWT(tok)
        if err != nil {
            return nil, err
        }
        return &realtime.Principal{UserID: claims.Sub, TenantID: claims.Tenant, Roles: claims.Roles}, nil
    }),
})

BearerToken pulls the token from the Authorization: Bearer … header, the ?access_token= query parameter (the browser WebSocket constructor can’t set custom request headers), or the Sec-WebSocket-Protocol: access_token.<token> subprotocol. Composite tries several authenticators in order.
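The three token locations can be illustrated with a standalone lookup. This is a sketch only: it assumes the order in which the text lists the sources is also their precedence, which the text does not state, and extractToken and tokenFromRequest are hypothetical helpers, not part of the realtime package.

```go
package main

import (
	"fmt"
	"net/http"
	"strings"
)

// extractToken checks the header, then the query parameter, then the
// offered subprotocols. Assumption: this precedence mirrors the listing
// order in the text; the real BearerToken helper may differ.
func extractToken(authHeader, queryToken string, subprotocols []string) (string, bool) {
	if strings.HasPrefix(authHeader, "Bearer ") {
		if tok := strings.TrimSpace(strings.TrimPrefix(authHeader, "Bearer ")); tok != "" {
			return tok, true
		}
	}
	if queryToken != "" {
		return queryToken, true
	}
	for _, proto := range subprotocols {
		proto = strings.TrimSpace(proto)
		if strings.HasPrefix(proto, "access_token.") {
			if tok := strings.TrimPrefix(proto, "access_token."); tok != "" {
				return tok, true
			}
		}
	}
	return "", false
}

// tokenFromRequest gathers the three inputs from an incoming request.
func tokenFromRequest(r *http.Request) (string, bool) {
	var protos []string
	for _, h := range r.Header.Values("Sec-WebSocket-Protocol") {
		protos = append(protos, strings.Split(h, ",")...)
	}
	return extractToken(r.Header.Get("Authorization"), r.URL.Query().Get("access_token"), protos)
}

func main() {
	r, err := http.NewRequest(http.MethodGet, "http://localhost:8080/ws?access_token=abc", nil)
	if err != nil {
		panic(err)
	}
	tok, ok := tokenFromRequest(r)
	fmt.Println(tok, ok)
}
```

Tokens passed in the query string tend to end up in access logs, so short-lived tokens are a sensible pairing with that option.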

Per-event authorisation

AllowPatterns controls which topics a client may subscribe to; Visibility controls which individual events it actually receives. The hook runs once per (event, client) pair and can also redact the payload:

HubConfig{
    Visibility: func(p *realtime.Principal, e events.Event) (bool, *events.Event) {
        if e.TenantID != p.TenantID {
            return false, nil // suppress cross-tenant events
        }
        return true, nil
    },
}

Return (true, &copy) to deliver a transformed event — the hub clones before mutation so each client sees its own view.
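The redaction path might look like the sketch below. Principal and Event here are local stand-ins so the example compiles on its own; the real types live in the realtime and events packages and may have different fields. The "billing-admin" role and the cardLast4 field are invented for illustration.

```go
package main

import "fmt"

// Principal and Event are local stand-ins, not the library's types.
type Principal struct {
	TenantID string
	Roles    []string
}

type Event struct {
	Type     string
	TenantID string
	Data     map[string]any
}

func hasRole(p *Principal, role string) bool {
	for _, r := range p.Roles {
		if r == role {
			return true
		}
	}
	return false
}

// visibility suppresses cross-tenant events, delivers the original to
// admins, and gives everyone else a copy without the sensitive field.
func visibility(p *Principal, e Event) (bool, *Event) {
	if e.TenantID != p.TenantID {
		return false, nil
	}
	if hasRole(p, "billing-admin") { // hypothetical role name
		return true, nil
	}
	redacted := e
	redacted.Data = make(map[string]any, len(e.Data))
	for k, v := range e.Data {
		if k != "cardLast4" { // hypothetical sensitive field
			redacted.Data[k] = v
		}
	}
	return true, &redacted
}

func main() {
	e := Event{
		Type:     "payment.received",
		TenantID: "t1",
		Data:     map[string]any{"amount": 100, "cardLast4": "4242"},
	}
	ok, view := visibility(&Principal{TenantID: "t1"}, e)
	fmt.Println(ok, view.Data, e.Data)
}
```

Building a fresh map rather than deleting keys from e.Data keeps the original event intact for other recipients, whatever cloning the hub does on its side.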

Heartbeat

Idle connections are kept alive automatically so L7 proxies (ALB, NGINX, with their typical 30–60s idle timeouts) don’t drop them:

  • WebSocket — the server sends a ping frame every PingInterval (default 30s); compliant clients answer with a pong.
  • SSE — the server emits a : keepalive comment on the same interval.
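For intuition, an SSE keepalive loop of the kind described above can be sketched in a few lines. This is not the hub's code: the keepalive function and the sample helper are hypothetical, and the intervals are shortened so the example finishes quickly.

```go
package main

import (
	"context"
	"fmt"
	"io"
	"net/http"
	"strings"
	"time"
)

// keepalive writes an SSE comment every interval until ctx ends, flushing
// when the writer supports it so proxies see traffic immediately.
func keepalive(ctx context.Context, w io.Writer, interval time.Duration) error {
	t := time.NewTicker(interval)
	defer t.Stop()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-t.C:
			if _, err := io.WriteString(w, ": keepalive\n\n"); err != nil {
				return err
			}
			if f, ok := w.(http.Flusher); ok {
				f.Flush()
			}
		}
	}
}

// sample runs the loop briefly against an in-memory writer.
func sample() string {
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()
	var sb strings.Builder
	_ = keepalive(ctx, &sb, 10*time.Millisecond)
	return sb.String()
}

func main() {
	fmt.Printf("%q\n", sample())
}
```

Because the line starts with a colon, EventSource treats it as a comment and never surfaces it to application code.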

Resumable streams (lastEventId)

By default delivery is ephemeral: a client that disconnects misses whatever was published while it was away. Enable resume to give clients a replay buffer.

hub, _ := realtime.NewHub(realtime.HubConfig{
    Bus:          bus,
    ResumeBuffer: 1024, // retain the most recent 1024 events for replay
})

With resume enabled, every delivered event carries a cursor:

  • SSE — the cursor is the standard id: line. On reconnect the browser’s EventSource automatically sends Last-Event-ID, and the hub replays everything after it before resuming the live stream. (You can also pass ?lastEventId=<cursor> explicitly.)
  • WebSocket — events include a "cursor" field; resume by adding it to your subscribe message: {"op":"subscribe","patterns":["invoice.*"],"after":"<cursor>"}.

If the cursor is older than the retained buffer (or the hub restarted), the client receives a resync signal — event: resync on SSE, {"op":"resync"} on WebSocket — telling it to refetch current state instead of silently missing events. Across the reconnect seam delivery is at-least-once; because cursors are monotonic, clients drop anything at or below their last applied cursor.

ResumeBuffer installs an in-process ring buffer, so resume works when the client reconnects to the same replica (WebSocket affinity). For cross-replica resume, supply your own ResumeStore (e.g. backed by a Redis stream) via HubConfig.ResumeStore.
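The replay and resync rules can be sketched with a small bounded buffer. Everything here is an assumption made for illustration: cursors are modelled as increasing integers (the text only says they are monotonic), and replayBuffer is a hypothetical type, not the library's ResumeStore.

```go
package main

import "fmt"

type entry struct {
	Cursor  uint64
	Payload string
}

// replayBuffer keeps the most recent size entries. It shifts on overwrite
// for brevity; a real ring buffer would avoid the copy.
type replayBuffer struct {
	size int
	buf  []entry
	last uint64 // cursor of the newest entry, 0 when nothing was added
}

func newReplayBuffer(size int) *replayBuffer {
	if size < 1 {
		size = 1
	}
	return &replayBuffer{size: size}
}

// add stores a payload and returns its cursor.
func (r *replayBuffer) add(payload string) uint64 {
	r.last++
	e := entry{Cursor: r.last, Payload: payload}
	if len(r.buf) < r.size {
		r.buf = append(r.buf, e)
	} else {
		copy(r.buf, r.buf[1:])
		r.buf[len(r.buf)-1] = e
	}
	return r.last
}

// after returns the entries newer than cursor. ok is false when the cursor
// is unknown or older than the retained window, meaning the client should
// be told to resync.
func (r *replayBuffer) after(cursor uint64) (entries []entry, ok bool) {
	if cursor > r.last {
		return nil, false
	}
	oldest := r.last - uint64(len(r.buf)) + 1
	if cursor+1 < oldest {
		return nil, false
	}
	return append([]entry(nil), r.buf[cursor+1-oldest:]...), true
}

func main() {
	r := newReplayBuffer(3)
	for _, p := range []string{"a", "b", "c", "d", "e"} {
		r.add(p)
	}
	fmt.Println(r.after(3)) // cursor still inside the window
	fmt.Println(r.after(1)) // cursor fell out of the window
}
```

On the client side, the matching rule from the text is to remember the highest applied cursor and ignore any replayed event at or below it.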

Schema-emitting events (AsyncAPI)

Just as /openapi.json lets clients codegen typed REST clients, the hub’s event catalogue can be published as an AsyncAPI 2.6 document so clients codegen typed event payloads. Declare it once:

server.RealtimeDoc(maniflex.AsyncAPIConfig{
    Title:   "Billing events",
    Servers: []maniflex.AsyncAPIServerConfig{
        {Name: "ws", URL: "ws://localhost:8080/ws", Protocol: "ws"},
    },
    // Derive invoice.created|updated|deleted channels from registered models:
    AutoModelEvents: true,
    // …and/or declare custom events with a Go struct payload:
    Events: []maniflex.EventDoc{
        {Type: "payment.received", Title: "Payment received", Payload: PaymentReceived{}},
    },
})

This mounts GET {PathPrefix}/asyncapi.json. The payload struct is reflected with the same json + mfx tags as models and actions (see Actions). The endpoint is opt-in: apps that never call RealtimeDoc get no new route.

Backpressure & slow clients

Each connection has a bounded outbound queue (SendBuffer, default 64). If a client can’t keep up within SendTimeout (default 5s) it is kicked: a WebSocket client receives close code 1013 (Try Again Later), and an SSE client is disconnected, which triggers EventSource reconnection. An inbound frame larger than MaxMessageSize (default 64 KiB) is rejected with close code 1009. Hub.Stats() exposes the live connection count and cumulative kick count for monitoring.
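A bounded queue with a send timeout is straightforward to express with a buffered channel. The sketch below shows the general technique only; the client type, enqueue method, and acceptedBeforeKick helper are hypothetical and say nothing about how the hub is actually implemented.

```go
package main

import (
	"fmt"
	"time"
)

// client owns a bounded outbound queue.
type client struct {
	out    chan []byte
	kicked bool
}

func newClient(sendBuffer int) *client {
	return &client{out: make(chan []byte, sendBuffer)}
}

// enqueue tries to queue msg, waiting at most timeout for room. It returns
// false and marks the client as kicked if the queue stays full.
func (c *client) enqueue(msg []byte, timeout time.Duration) bool {
	select {
	case c.out <- msg: // fast path: room in the queue
		return true
	default:
	}
	t := time.NewTimer(timeout)
	defer t.Stop()
	select {
	case c.out <- msg:
		return true
	case <-t.C:
		c.kicked = true
		return false
	}
}

// acceptedBeforeKick simulates a client that never reads and reports how
// many messages were queued before it was kicked.
func acceptedBeforeKick(sendBuffer int) int {
	c := newClient(sendBuffer)
	n := 0
	for c.enqueue([]byte("event"), 10*time.Millisecond) {
		n++
	}
	return n
}

func main() {
	fmt.Println(acceptedBeforeKick(2))
}
```

Kicking rather than blocking keeps one stalled connection from holding up delivery to every other client.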

Scaling out

The hub is single-process by design; cross-replica fan-out is the bus’s job:

  • inproc (single binary) — one hub, all clients local.
  • redis / nats / kafka — every replica subscribes to the bus, so an event published anywhere reaches local clients on every replica. Pair with a sticky load balancer so each client stays on one replica (WebSocket affinity).

The hub does not create a consumer group per connection — per-client filtering happens server-side, downstream of one shared bus subscription, so broker load doesn’t scale with connection count.
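The one-subscription, many-clients shape can be sketched as a single loop that reads from the bus and filters per subscriber. This is an illustration under assumptions: a Go channel stands in for the bus subscription, path.Match stands in for the real matcher, and a full subscriber queue simply drops the event here instead of applying the kick policy.

```go
package main

import (
	"fmt"
	"path"
)

type event struct{ Type string }

type subscriber struct {
	pattern string
	out     chan event
}

// fanOut reads the single shared stream and copies each event to every
// subscriber whose pattern matches. Broker load is one subscription no
// matter how many subscribers exist.
func fanOut(in <-chan event, subs []*subscriber) {
	for ev := range in {
		for _, s := range subs {
			if ok, _ := path.Match(s.pattern, ev.Type); !ok {
				continue
			}
			select {
			case s.out <- ev:
			default: // queue full: dropped in this sketch
			}
		}
	}
	for _, s := range subs {
		close(s.out)
	}
}

// deliver pushes the given event types through fanOut and returns what each
// subscriber received, in pattern order.
func deliver(types, patterns []string) [][]string {
	in := make(chan event, len(types))
	subs := make([]*subscriber, len(patterns))
	for i, p := range patterns {
		subs[i] = &subscriber{pattern: p, out: make(chan event, len(types))}
	}
	for _, t := range types {
		in <- event{Type: t}
	}
	close(in)
	fanOut(in, subs) // returns once the closed input is drained
	got := make([][]string, len(subs))
	for i, s := range subs {
		for ev := range s.out {
			got[i] = append(got[i], ev.Type)
		}
	}
	return got
}

func main() {
	fmt.Println(deliver(
		[]string{"invoice.created", "queue.position_changed"},
		[]string{"invoice.*", "*"},
	))
}
```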

Graceful shutdown

Hub.Shutdown(ctx) stops accepting connections, sends a 1001 Going Away close to every client, drains in-flight writes until the deadline, then cancels the bus subscription. Call it alongside *http.Server.Shutdown from the same signal handler — the hub is mounted by your code, so it isn’t part of server.Shutdown.
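One way to wire this up is a helper that shuts several components down under a shared deadline. The sketch assumes only that the hub exposes Shutdown(ctx) error, as the text describes; the shutdowner interface, shutdownAll, and fakeHub are illustrative names. The components are stopped concurrently here because http.Server.Shutdown waits for active (non-hijacked) connections, which would include any long-lived streaming responses still open.

```go
package main

import (
	"context"
	"fmt"
	"net/http"
	"sync"
	"time"
)

// shutdowner is satisfied by *http.Server and, per the text, by the hub.
type shutdowner interface {
	Shutdown(ctx context.Context) error
}

// shutdownAll stops every part concurrently under one deadline and returns
// the first error encountered, if any.
func shutdownAll(timeout time.Duration, parts ...shutdowner) error {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	var (
		wg    sync.WaitGroup
		mu    sync.Mutex
		first error
	)
	for _, p := range parts {
		wg.Add(1)
		go func(p shutdowner) {
			defer wg.Done()
			if err := p.Shutdown(ctx); err != nil {
				mu.Lock()
				if first == nil {
					first = err
				}
				mu.Unlock()
			}
		}(p)
	}
	wg.Wait()
	return first
}

// fakeHub stands in for the realtime hub so the sketch runs on its own.
type fakeHub struct{ calls int }

func (f *fakeHub) Shutdown(ctx context.Context) error {
	f.calls++
	return nil
}

// demo shuts down an unstarted HTTP server together with the fake hub.
func demo() (int, error) {
	hub := &fakeHub{}
	srv := &http.Server{}
	err := shutdownAll(2*time.Second, srv, hub)
	return hub.calls, err
}

func main() {
	// In a real program, wait for a signal first, for example:
	//   ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	//   defer stop()
	//   <-ctx.Done()
	calls, err := demo()
	fmt.Println(calls, err)
}
```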

HubConfig reference

Field            Default            Purpose
Bus              none (required)    the events.Bus the hub consumes
Authenticator    AnonymousOnly{}    connection auth
Visibility       allow-all          per-event authorisation / redaction
AllowPatterns    allow-all          subscribable topic whitelist
ResumeStore      nil (disabled)     replay buffer for lastEventId resume
ResumeBuffer     0 (disabled)       shortcut: install an in-memory store of this size
PingInterval     30s                WS ping / SSE keepalive cadence
SendBuffer       64                 per-client outbound queue depth
SendTimeout      5s                 slow-client kick threshold
MaxMessageSize   64 KiB             inbound frame size limit
Origins          allow-all          allowed Origin values for the WS upgrade