The Principal Dev – Masterclass for Tech Leads

The Principal Dev – Masterclass for Tech Leads28-29 May

Join

GoEventBus

A high-performance, in-memory, lock-free event bus for Go — built on a cache-line-padded atomic ring buffer with fan-out dispatch, middleware, lifecycle hooks, and back-pressure policies.

Go Report Card

Table of Contents


Features


Installation

go get github.com/Protocol-Lattice/GoEventBus

Requires Go 1.21+.


Quick Start

package main

import (
    "context"
    "fmt"
    "log"

    "github.com/Protocol-Lattice/GoEventBus"
)

type UserCreatedPayload struct{ ID string }
type HouseWasSold struct{}
type HouseSoldPayload struct {
    Address string
    Price   int
}

func main() {
    disp := GoEventBus.Dispatcher{}

    disp.Register("user_created", func(ctx context.Context, ev GoEventBus.Event) (GoEventBus.Result, error) {
        p := ev.Data.(UserCreatedPayload)
        fmt.Println("User created:", p.ID)
        return GoEventBus.Result{Message: "ok"}, nil
    })

    disp.Register(HouseWasSold{}, func(ctx context.Context, ev GoEventBus.Event) (GoEventBus.Result, error) {
        p := ev.Data.(HouseSoldPayload)
        fmt.Printf("House sold at %s for $%d\n", p.Address, p.Price)
        return GoEventBus.Result{Message: "ok"}, nil
    })

    store := GoEventBus.NewEventStore(&disp, 1<<16, GoEventBus.DropOldest)
    store.Async = true

    _ = store.Subscribe(context.Background(), GoEventBus.Event{
        ID:         "evt1",
        Projection: "user_created",
        Data:       UserCreatedPayload{ID: "u-42"},
    })
    _ = store.Subscribe(context.Background(), GoEventBus.Event{
        ID:         "evt2",
        Projection: HouseWasSold{},
        Data:       HouseSoldPayload{Address: "1 Main St", Price: 500_000},
    })

    store.Publish()

    if err := store.Drain(context.Background()); err != nil {
        log.Fatal(err)
    }

    published, processed, errors := store.Metrics()
    fmt.Printf("published=%d processed=%d errors=%d\n", published, processed, errors)
}

Key points:


Fan-out

Register accumulates handlers. Every handler registered for a projection is called in order for each dispatched event. Handlers are fully independent — an error in one does not prevent the others from running.

disp := GoEventBus.Dispatcher{}

// Register as many handlers as you need for the same projection.
disp.Register("order.placed",
    auditLogger,
    inventoryReducer,
    notificationSender,
)

// Register can also be called incrementally across different packages.
disp.Register("order.placed", analyticsTracker)

In async mode each handler invocation becomes its own work item, so the four handlers above may run in parallel across the worker pool.


Batch Handlers

RegisterBatch collects all events for a projection that accumulate between Publish calls and delivers them to the handler as a slice — one call per chunk of up to size events. This eliminates per-event overhead for bulk operations like database inserts or HTTP batch APIs.

store := GoEventBus.NewEventStore(&disp, 1<<16, GoEventBus.DropOldest)

store.RegisterBatch("metrics.recorded", 50, func(ctx context.Context, evs []GoEventBus.Event) ([]GoEventBus.Result, error) {
    rows := make([]MetricRow, len(evs))
    for i, ev := range evs {
        rows[i] = ev.Data.(MetricRow)
    }
    return nil, db.BulkInsert(ctx, rows)
})

Chunking

If more events arrive than the configured size, the handler is called multiple times — once per full chunk, then once for the remainder.

// 7 events, size=3 → called with [3 events], [3 events], [1 event]
store.RegisterBatch("tick", 3, handler)

Per-event results

Return a []Result aligned with the input slice to pass per-event results to OnAfter hooks. A nil or shorter slice is fine — missing positions are treated as zero Result values.

store.RegisterBatch("order.placed", 100, func(ctx context.Context, evs []GoEventBus.Event) ([]GoEventBus.Result, error) {
    results := make([]GoEventBus.Result, len(evs))
    for i, ev := range evs {
        results[i] = GoEventBus.Result{Message: "processed"}
    }
    return results, nil
})

Fan-out with batch handlers

Multiple batch handlers can be registered for the same projection. Each receives the full chunk independently.

store.RegisterBatch("order.placed", 50, writeToDatabase)
store.RegisterBatch("order.placed", 50, pushToAnalytics)

Regular per-event handlers and batch handlers can coexist on the same projection. Both fire on each Publish call.

disp.Register("order.placed", auditLogger)       // called once per event
store.RegisterBatch("order.placed", 50, bulkWriter) // called once per chunk

Error handling and the DLQ

A batch handler returns a single error for the whole chunk. On error, every event in the failing chunk is sent to the DLQ and OnError fires once per event. Chunks that succeeded in the same Publish cycle are unaffected.

store.DLQ = GoEventBus.NewDeadLetterQueue()

store.RegisterBatch("write", 25, func(ctx context.Context, evs []GoEventBus.Event) ([]GoEventBus.Result, error) {
    if err := db.BulkInsert(ctx, evs); err != nil {
        return nil, err // all 25 events land in the DLQ
    }
    return nil, nil
})

Panics

Panics in a batch handler are recovered and treated identically to a returned error — the chunk lands in the DLQ, OnError fires, and the worker pool continues running.

Lifecycle hooks

OnBefore, OnAfter, and OnError fire once per event even for batch handlers, keeping observability consistent regardless of handler type. Middleware is not applied to batch handlers (the signatures are incompatible); use hooks for cross-cutting concerns instead.

Async mode

Batch handlers participate in the async worker pool when store.Async = true. Each chunk is dispatched as one work item.

store.Async = true

store.RegisterBatch("events", 100, bulkWriter)

for i := 0; i < 300; i++ {
    _ = store.Subscribe(ctx, GoEventBus.Event{Projection: "events", Data: rows[i]})
}

store.Publish()
_ = store.Drain(context.Background()) // wait for all chunks to complete

Middleware

Middleware wraps the handler chain and is applied in the order it is registered. Use it for logging, tracing, timeout injection, or retries.

// Logging middleware
store.Use(func(next GoEventBus.HandlerFunc) GoEventBus.HandlerFunc {
    return func(ctx context.Context, ev GoEventBus.Event) (GoEventBus.Result, error) {
        log.Printf("handling %v", ev.Projection)
        res, err := next(ctx, ev)
        log.Printf("done %v err=%v", ev.Projection, err)
        return res, err
    }
})

// Timeout middleware
store.Use(func(next GoEventBus.HandlerFunc) GoEventBus.HandlerFunc {
    return func(ctx context.Context, ev GoEventBus.Event) (GoEventBus.Result, error) {
        ctx, cancel := context.WithTimeout(ctx, 500*time.Millisecond)
        defer cancel()
        return next(ctx, ev)
    }
})

Each handler in a fan-out gets its own independent copy of the middleware chain.


Lifecycle Hooks

Hooks fire outside the middleware chain and are useful for metrics, structured logging, and alerting without polluting handler code. For batch handlers, hooks fire once per event in the chunk rather than once per chunk.

store.OnBefore(func(ctx context.Context, ev GoEventBus.Event) {
    // fires before each handler invocation
})

store.OnAfter(func(ctx context.Context, ev GoEventBus.Event, res GoEventBus.Result, err error) {
    // fires after each handler invocation, whether it succeeded or failed
})

store.OnError(func(ctx context.Context, ev GoEventBus.Event, err error) {
    // fires only when a handler returns a non-nil error
    log.Printf("handler error for %v: %v", ev.Projection, err)
})

Back-pressure Policies

Choose how Subscribe behaves when the ring buffer is full.

Policy Behaviour Best for
DropOldest Discards the oldest event to make room Low-latency pipelines where fresh data matters more than completeness
Block Blocks the caller (respecting its context) until space is available Workloads that cannot afford to lose events
ReturnError Returns ErrBufferFull immediately Callers that manage their own retry or back-pressure logic
store := GoEventBus.NewEventStore(&disp, 1<<14, GoEventBus.Block)

ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
defer cancel()

if err := store.Subscribe(ctx, ev); errors.Is(err, context.DeadlineExceeded) {
    // buffer stayed full for 50 ms — handle accordingly
}

Async Mode

Set store.Async = true before the first Publish call. GoEventBus starts a worker pool sized to runtime.NumCPU() and routes every handler invocation through it.

store := GoEventBus.NewEventStore(&disp, 1<<16, GoEventBus.DropOldest)
store.Async = true

store.Publish() // dispatches handlers to the worker pool

// Wait for all in-flight handlers before shutting down.
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

if err := store.Drain(ctx); err != nil {
    log.Printf("drain timeout: %v", err)
}

Drain (or equivalently Close) must be called once when the store is no longer needed so worker goroutines are cleaned up.


Dead Letter Queue

Attach a DeadLetterQueue to an EventStore and every event whose handler returns an error or panics is captured there instead of being silently swallowed into the error counter. Panics are wrapped as errors with full errors.Is / errors.As support when the original panic value was itself an error.

store := GoEventBus.NewEventStore(&disp, 1<<16, GoEventBus.DropOldest)
store.DLQ = GoEventBus.NewDeadLetterQueue()

Inspecting failures

store.Publish()

for _, dl := range store.DLQ.Entries() {
    fmt.Printf("event=%s err=%v attempts=%d failed=%s\n",
        dl.Event.ID, dl.Err, dl.Attempts, dl.FailedAt.Format(time.RFC3339))
}

Entries() returns a snapshot copy — mutations to the slice do not affect the queue.

Draining

failed := store.DLQ.Drain() // empties the queue and returns all entries

Replaying

Replay re-subscribes all entries and calls Publish once after. Each entry has its Attempts counter incremented. Entries that fail to re-enqueue (e.g. buffer full) are kept in the queue and the first Subscribe error is returned.

if err := store.DLQ.Replay(ctx, store); err != nil {
    log.Printf("replay partially failed: %v", err)
}

A common pattern is to gate replays on Attempts to avoid infinite retry loops:

const maxAttempts = 3

for _, dl := range store.DLQ.Drain() {
    if dl.Attempts >= maxAttempts {
        log.Printf("dropping %s after %d attempts: %v", dl.Event.ID, dl.Attempts, dl.Err)
        continue
    }
    _ = store.Subscribe(ctx, dl.Event)
}
store.Publish()

Fan-out and the DLQ

Each handler in a fan-out is independent. If handler A fails and handler B succeeds, only A's invocation produces a dead letter — B is unaffected. The same rule applies to batch handlers: only the failing chunk's events land in the DLQ.

disp.Register("order.placed",
    auditLogger,     // fails -> one dead letter per event
    invoiceHandler,  // succeeds -> no dead letters
)

Panic recovery

The DLQ also catches handler panics. In sync and async modes alike, the panic is converted to an error, routed to the DLQ and error hooks, and execution continues. The worker pool is never killed by a misbehaving handler.

disp.Register("risky", func(ctx context.Context, ev GoEventBus.Event) (GoEventBus.Result, error) {
    panic("something went wrong") // caught — not fatal
})

store.DLQ = GoEventBus.NewDeadLetterQueue()
store.Publish()

dl := store.DLQ.Entries()[0]
fmt.Println(dl.Err) // "handler panic: something went wrong"

Transactions

Group multiple events into a single commit. If any handler returns an error, Commit stops and returns that error. Call Rollback to discard buffered events without publishing.

type OrderPayload struct{ OrderID string }
type InvoicePayload struct{ OrderID string }

disp := GoEventBus.Dispatcher{}
disp.Register("order.created", handleOrder)
disp.Register("invoice.issued", handleInvoice)

store := GoEventBus.NewEventStore(&disp, 1<<16, GoEventBus.DropOldest)

tx := store.BeginTransaction()

tx.Publish(GoEventBus.Event{
    ID:         "ev-1",
    Projection: "order.created",
    Data:       OrderPayload{OrderID: "ord-99"},
})
tx.Publish(GoEventBus.Event{
    ID:         "ev-2",
    Projection: "invoice.issued",
    Data:       InvoicePayload{OrderID: "ord-99"},
})

if err := tx.Commit(context.Background()); err != nil {
    tx.Rollback()
    log.Fatal("transaction failed:", err)
}

Scheduling

Fire an event at a specific point in time or after a duration. The returned *time.Timer can be stopped to cancel the event before it fires.

// Fire at an absolute time.
timer := store.Schedule(ctx, time.Now().Add(10*time.Second), ev)
if timer != nil {
    timer.Stop() // cancel before it fires
}

// Fire after a duration.
timer = store.ScheduleAfter(ctx, 30*time.Second, ev)

If the target time is in the past, or the duration is zero or negative, the event is executed synchronously and immediately without entering the ring buffer, and nil is returned.


API Reference

Dispatcher

type Dispatcher map[interface{}][]HandlerFunc

func (d Dispatcher) Register(projection interface{}, handlers ...HandlerFunc)

Register appends one or more handlers for the given projection. Calling it multiple times on the same key accumulates handlers in call order.

Event

type Event struct {
    ID         string
    Projection interface{}    // key used to look up handlers
    Data       any            // type-safe payload (preferred)
    Args       map[string]any // legacy payload (deprecated)
}

HandlerFunc / Middleware

type HandlerFunc func(context.Context, Event) (Result, error)
type Middleware  func(HandlerFunc) HandlerFunc

BatchHandlerFunc

type BatchHandlerFunc func(context.Context, []Event) ([]Result, error)

Receives a slice of events collected during a Publish cycle. Returns one Result per input event (may be nil or shorter — missing entries default to zero Result) and a single error covering the whole batch. Middleware is not applied; use lifecycle hooks for cross-cutting concerns.

Result

type Result struct {
    Message string
}

OverrunPolicy

const (
    DropOldest  OverrunPolicy = iota
    Block
    ReturnError
)

var ErrBufferFull = errors.New("goeventbus: buffer is full")

EventStore

Constructor

func NewEventStore(dispatcher *Dispatcher, bufferSize uint64, policy OverrunPolicy) *EventStore

Panics if dispatcher is nil or bufferSize is not a non-zero power of two.

Fields

Field Type Description
Async bool Enable async worker pool dispatch
DLQ *DeadLetterQueue Optional dead letter queue; nil by default

Methods

Method Description
Subscribe(ctx, Event) error Enqueue an event; applies back-pressure per OverrunPolicy
Publish() Dispatch all pending events to their handlers
RegisterBatch(projection, size, BatchHandlerFunc) Register a batch handler; events are delivered in chunks of up to size
Use(Middleware) Append middleware to the chain (applied to per-event handlers only)
OnBefore(BeforeHook) Register a hook that runs before each handler invocation
OnAfter(AfterHook) Register a hook that runs after each handler invocation
OnError(ErrorHook) Register a hook that runs when a handler errors
Drain(ctx) error Wait for all async handlers; shuts down the worker pool
Close(ctx) error Alias for Drain
Metrics() (published, processed, errors uint64) Snapshot event counters; processed counts individual events, not batch calls
Schedule(ctx, time.Time, Event) *time.Timer Fire an event at a time
ScheduleAfter(ctx, time.Duration, Event) *time.Timer Fire an event after a duration
BeginTransaction() *Transaction Returns a *Transaction scoped to this store

DeadLetter

type DeadLetter struct {
    Event    Event
    Err      error     // handler error, or a wrapped panic value
    FailedAt time.Time
    Attempts int       // 1 on first failure; incremented on each Replay call
}

DeadLetterQueue

func NewDeadLetterQueue() *DeadLetterQueue
Method Description
Len() int Number of entries currently in the queue
Entries() []DeadLetter Snapshot copy — safe to iterate without holding a lock
Drain() []DeadLetter Remove and return all entries, leaving the queue empty
Replay(ctx, store) error Re-subscribe all entries, call Publish, keep failures in queue

Attach to a store via store.DLQ = GoEventBus.NewDeadLetterQueue(). When DLQ is nil (default) behaviour is unchanged.

Transaction

Method Description
Publish(Event) Buffer an event within the transaction
Commit(ctx) error Enqueue and process all buffered events; stops on first error
Rollback() Discard all buffered events

Benchmarks

Run on Apple M-series with go test -bench . -benchtime=3s:

Benchmark Iterations ns/op
BenchmarkSubscribe 27,080,376 40.37
BenchmarkSubscribeParallel 26,418,999 38.42
BenchmarkPublish 295,661,464 3.91
BenchmarkPublishAfterPrefill 252,943,526 4.59
BenchmarkSubscribeLargePayload 1,613,017 771.5
BenchmarkPublishLargePayload 296,434,225 3.91
BenchmarkEventStore_Async 2,816,988 436.5
BenchmarkEventStore_Sync 2,638,519 428.5
BenchmarkFastHTTPSync 6,275,112 163.8
BenchmarkFastHTTPAsync 1,954,884 662.0
BenchmarkFastHTTPParallel 4,489,274 262.3

Contributing

Contributions, issues, and feature requests are welcome. See the issues page to get started.

License

Distributed under the MIT License. See LICENSE for details.

Join libs.tech

...and unlock some superpowers

GitHub

We won't share your data with anyone else.