Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

8. Events & Background Jobs

Part 7 added a custom action for placing orders. This part defers the post-order work — sending a receipt email — to a background job so that a slow or failing mailer never blocks the purchase response or rolls back the order transaction.

Why a background worker

Running side-effects inside the request transaction creates two problems:

  • If the email fails, the whole order rolls back — a transient mail outage breaks the purchase flow entirely.
  • If the order rolls back after a successful email, the email can’t be unsent.

Decoupling fixes both: the transaction writes a small job description; a worker outside the transaction carries it out. If the worker crashes mid-task the job is retried automatically.

Wiring up the job queue

The jobs/ package family provides a durable queue with retries and REST-based status polling. For the bookstore we use jobs/sql, which enqueues inside the same database transaction as the business write.

Install the queue alongside the server setup in main.go:

import (
    "github.com/xaleel/maniflex"
    jobsmaniflex "github.com/xaleel/maniflex/jobs/maniflex"
    "github.com/xaleel/maniflex/jobs"
    jobssql "github.com/xaleel/maniflex/jobs/sql"
)

server := maniflex.New(maniflex.Config{ /* ... */ })
server.MustRegister(Order{}, User{} /* ... */)

db, _ := sqlite.Open(":memory:", server.Registry())
server.SetDB(db)

// Create the SQL-backed queue (uses the same DB as the server).
queue := jobssql.New(db)
jobssql.Migrate(ctx, db)

// Mount registers the StatusModel and returns a wrapped queue + sink.
// After this, GET /api/job_statuses/:id is available automatically.
sink, queue, err := jobsmaniflex.Mount(server, queue)
if err != nil { log.Fatal(err) }

// Wire up the worker.
w, _ := jobs.NewWorker(jobs.WorkerConfig{
    Source:  queue.(jobs.Source),
    Status:  sink,
    Handlers: map[string]jobs.Handler{
        "send_receipt": sendReceiptHandler(mailer),
    },
})

go w.Run(ctx)
log.Fatal(server.Start())

Enqueueing from the order action

Modify the order placement action from Part 7 to enqueue a job instead of calling the mailer directly:

server.Action(maniflex.ActionConfig{
    Method: "POST",
    Path:   "/orders",
    Handler: func(ctx *maniflex.ActionContext) error {
        // ... validate, insert order, etc. ...

        jobID, err := queue.Enqueue(ctx.Ctx, jobs.Job{
            Type:     "send_receipt",
            ActorID:  ctx.Auth.UserID,
            Payload:  mustJSON(map[string]any{"order_id": orderID}),
        })
        if err != nil {
            return err
        }

        ctx.Response = &maniflex.APIResponse{
            StatusCode: http.StatusAccepted,
            Data: map[string]any{
                "order_id": orderID,
                "job_id":   jobID,     // clients can poll /api/job_statuses/:job_id
            },
        }
        return nil
    },
})

The wrapped queue creates an enqueued status row before returning, so the client can poll immediately — no race between enqueue and the first GET.

The handler

func sendReceiptHandler(mailer Mailer) jobs.Handler {
    return func(ctx context.Context, j jobs.Job) (jobs.Result, error) {
        var p struct {
            OrderID string `json:"order_id"`
        }
        if err := json.Unmarshal(j.Payload, &p); err != nil {
            return jobs.Result{}, err
        }
        return jobs.Result{}, mailer.SendReceipt(ctx, p.OrderID)
    }
}

Handlers return (jobs.Result, error). On error the worker retries with exponential backoff (default up to 3 attempts). After all retries the job is marked dead and the status row records the final error message.

Polling for completion

The client receives job_id in the response and polls until done:

POST /api/orders
← 202 {"data": {"order_id": "xyz", "job_id": "01JABC..."}}

GET /api/job_statuses/01JABC...
← 200 {"data": {"status": "enqueued", ...}}

GET /api/job_statuses/01JABC...       (retry after a tick)
← 200 {"data": {"status": "succeeded", "completed_at": "2025-01-15T09:01:02Z"}}

No extra endpoint or custom table — the StatusModel is wired up automatically by Mount.

Emitting events from the pipeline

For lighter-weight fan-out — “notify other services every time an Order is created” — the service.Emit middleware is a simpler fit than the job queue:

import (
    "github.com/xaleel/maniflex/events/redis"
    "github.com/xaleel/maniflex/middleware/service"
)

bus := redis.New(redisClient)
server.Pipeline.DB.Register(
    service.Emit(bus),
    maniflex.ForModel("Order"),
    maniflex.AtPosition(maniflex.After),
)

Emit publishes order.created (and order.updated, order.deleted) to the bus on the DB-After step — only when the write succeeded. Subscribers in the same or other processes consume events independently. For WebSocket fan-out to connected clients, wire a realtime.Hub to the bus (see Realtime / WebSockets).

Webhooks

service.Webhook delivers events to external URLs with an HMAC signature — useful for one-off partner integrations:

server.Pipeline.DB.Register(
    service.Webhook(service.WebhookConfig{
        URL:    "https://partner.example.com/orders",
        Secret: os.Getenv("WEBHOOK_SECRET"),
    }),
    maniflex.ForModel("Order"),
    maniflex.AtPosition(maniflex.After),
)

What we built

CapabilityHow
Decoupled post-order emailjobs.Queue — enqueue in action, process in worker
Status pollingjobs/maniflex.MountGET /api/job_statuses/:id
Automatic retriesjobs.WorkerConfig.MaxRetry + exponential backoff
Transactional enqueuejobs/sql inserts the job row in the same DB transaction
Domain event fan-outservice.Emit on DB-After → event bus subscribers
External webhook deliveryservice.Webhook on DB-After

Next

In Part 9 — Testing the API we test the whole app end to end, including the job worker and the polling flow.