GoEventBus

A blazing‑fast, in‑memory, lock‑free event bus for Go—ideal for low‑latency pipelines, microservices, and game loops.

Features

Why GoEventBus?

Modern Go apps demand fast, non‑blocking communication, but channels can become a bottleneck and external brokers add latency, complexity, and ops overhead. GoEventBus is an in‑process, lock‑free alternative.

Whether you’re building real‑time analytics, high‑throughput microservices, or game engines, GoEventBus keeps your events moving at Go‑speed.

Installation

go get github.com/Raezil/GoEventBus

Quick Start

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/Raezil/GoEventBus"
)

// Define a typed projection as a struct
type HouseWasSold struct{}

func main() {
	// Create a dispatcher mapping projections (string or struct) to handlers
	dispatcher := GoEventBus.Dispatcher{
		"user_created": func(ctx context.Context, args map[string]any) (GoEventBus.Result, error) {
			userID := args["id"].(string)
			fmt.Println("User created with ID:", userID)
			return GoEventBus.Result{Message: "handled user_created"}, nil
		},
		HouseWasSold{}: func(ctx context.Context, args map[string]any) (GoEventBus.Result, error) {
			address := args["address"].(string)
			price := args["price"].(int)
			fmt.Printf("House sold at %s for $%d\n", address, price)
			return GoEventBus.Result{Message: "handled HouseWasSold"}, nil
		},
	}

	// Initialise an EventStore with a 64K ring buffer and DropOldest overrun policy
	store := GoEventBus.NewEventStore(&dispatcher, 1<<16, GoEventBus.DropOldest)

	// Enable asynchronous processing
	store.Async = true

	// Enqueue a string-based event
	_ = store.Subscribe(context.Background(), GoEventBus.Event{
		ID:         "evt1",
		Projection: "user_created",
		Args:       map[string]any{"id": "12345"},
	})

	// Enqueue a struct-based event
	_ = store.Subscribe(context.Background(), GoEventBus.Event{
		ID:         "evt2",
		Projection: HouseWasSold{},
		Args:       map[string]any{"address": "123 Main St", "price": 500000},
	})

	// Process pending events
	store.Publish()

	// Wait for all async handlers to finish
	if err := store.Drain(context.Background()); err != nil {
		log.Fatalf("Failed to drain EventStore: %v", err)
	}

	// Retrieve metrics
	published, processed, errors := store.Metrics()
	fmt.Printf("published=%d processed=%d errors=%d\n", published, processed, errors)
}
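
The handlers above use unchecked type assertions such as args["id"].(string), which panic if a key is missing or holds a different type. A minimal sketch of a checked alternative follows. It uses only the standard library, and stringArg is a hypothetical helper name, not part of GoEventBus.

```go
package main

import "fmt"

// stringArg is a hypothetical helper (not part of GoEventBus) that reads a
// string from an event's argument map and returns an error instead of
// panicking when the key is absent or has another type.
func stringArg(args map[string]any, key string) (string, error) {
	v, ok := args[key]
	if !ok {
		return "", fmt.Errorf("missing argument %q", key)
	}
	s, ok := v.(string)
	if !ok {
		return "", fmt.Errorf("argument %q has type %T, want string", key, v)
	}
	return s, nil
}

func main() {
	args := map[string]any{"id": "12345", "price": 500000}

	id, err := stringArg(args, "id")
	fmt.Println(id, err)

	// "price" holds an int, so this reports an error rather than panicking.
	_, err = stringArg(args, "price")
	fmt.Println(err)
}
```

A handler could return such an error to the bus instead of crashing the goroutine it runs in.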

Transactions

GoEventBus now supports atomic transactions, allowing you to group multiple events and commit them together. This ensures that either all events are successfully published and handled, or none are.

package main

import (
	"context"
	"log"

	"github.com/Raezil/GoEventBus"
)

func main() {
	// Create a dispatcher mapping projections to handlers
	dispatcher := GoEventBus.Dispatcher{
		"user_created": func(ctx context.Context, args map[string]any) (GoEventBus.Result, error) {
			return GoEventBus.Result{Message: "handled user_created"}, nil
		},
		"send_email": func(ctx context.Context, args map[string]any) (GoEventBus.Result, error) {
			log.Println("Hello")
			return GoEventBus.Result{Message: "handled send_email"}, nil
		},
	}

	// Initialise an EventStore with a 64K ring buffer and DropOldest policy
	store := GoEventBus.NewEventStore(&dispatcher, 1<<16, GoEventBus.DropOldest)

	// Begin a new transaction on the EventStore
	tx := store.BeginTransaction()

	// Buffer multiple events
	tx.Publish(GoEventBus.Event{
		ID:         "evtA",
		Projection: "user_created",
		Args:       map[string]any{"id": "12345"},
	})
	tx.Publish(GoEventBus.Event{
		ID:         "evtB",
		Projection: "send_email",
		Args:       map[string]any{"template": "welcome", "userID": "12345"},
	})

	// Commit the transaction atomically; roll back the buffered events on failure
	if err := tx.Commit(context.Background()); err != nil {
		tx.Rollback()
		log.Fatalf("transaction failed: %v", err)
	}
}
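
The buffer-then-commit pattern described in this section can be illustrated without the library. The sketch below is not GoEventBus's implementation: the tx type, its sink field, and the string payloads are hypothetical stand-ins that only show why nothing is delivered before Commit and why Rollback leaves nothing to commit.

```go
package main

import "fmt"

// tx is a hypothetical stand-in for a transaction: it buffers items and
// hands the whole batch to sink only when Commit is called.
type tx struct {
	pending []string
	sink    func(batch []string) error
}

// Publish only buffers the item; nothing is delivered yet.
func (t *tx) Publish(item string) { t.pending = append(t.pending, item) }

// Rollback discards everything buffered so far.
func (t *tx) Rollback() { t.pending = nil }

// Commit passes the batch to sink in a single call and clears the buffer
// only if sink succeeds.
func (t *tx) Commit() error {
	if err := t.sink(t.pending); err != nil {
		return err
	}
	t.pending = nil
	return nil
}

func main() {
	var delivered []string
	t1 := &tx{sink: func(batch []string) error {
		delivered = append(delivered, batch...)
		return nil
	}}

	t1.Publish("user_created")
	t1.Publish("send_email")
	fmt.Println("before commit:", len(delivered))

	if err := t1.Commit(); err != nil {
		fmt.Println("commit failed:", err)
	}
	fmt.Println("after commit:", len(delivered))
}
```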

API Reference

type Result

type Result struct {
    Message string // Outcome message from handler
}

type Dispatcher

type Dispatcher map[interface{}]func(context.Context, map[string]any) (Result, error)

A map from projection keys to handler functions. Handlers receive a context.Context and an argument map, and return a Result and an error.
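
Because the key type is interface{}, string keys and struct keys can live in the same map, provided the key values are comparable. A minimal sketch of the lookup follows. Result and Dispatcher are redeclared locally so the snippet compiles without the library, and dispatch, handle, newDispatcher, and errNoHandler are hypothetical names introduced for illustration only.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Result and Dispatcher are local copies of the documented types.
type Result struct {
	Message string
}

type Dispatcher map[interface{}]func(context.Context, map[string]any) (Result, error)

// HouseWasSold is an empty struct used as a typed projection key.
type HouseWasSold struct{}

// errNoHandler is a hypothetical error used only in this sketch.
var errNoHandler = errors.New("no handler registered for projection")

// dispatch looks up the handler registered for projection and invokes it.
func dispatch(ctx context.Context, d Dispatcher, projection interface{}, args map[string]any) (Result, error) {
	h, ok := d[projection]
	if !ok {
		return Result{}, fmt.Errorf("%w: %v", errNoHandler, projection)
	}
	return h(ctx, args)
}

// newDispatcher registers one string key and one struct key.
func newDispatcher() Dispatcher {
	return Dispatcher{
		"user_created": func(ctx context.Context, args map[string]any) (Result, error) {
			return Result{Message: "handled user_created"}, nil
		},
		HouseWasSold{}: func(ctx context.Context, args map[string]any) (Result, error) {
			return Result{Message: "handled HouseWasSold"}, nil
		},
	}
}

// handle is a demo helper that dispatches with a background context.
func handle(projection interface{}) (Result, error) {
	return dispatch(context.Background(), newDispatcher(), projection, nil)
}

func main() {
	fmt.Println(handle("user_created"))
	fmt.Println(handle(HouseWasSold{}))
	fmt.Println(handle("unknown"))
}
```

Note that Go panics at run time if a non-comparable value, such as a slice, is used as an interface-typed map key, so projections should be strings, empty structs, or other comparable values.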

type Event

type Event struct {
    ID         string          // Unique identifier for the event
    Projection interface{}     // Key to look up the handler in the dispatcher
    Args       map[string]any  // Payload data for the event
}

type Transaction

type Transaction struct {
    store     *EventStore
    events    []Event
    startHead uint64 // head position when transaction began
}

type OverrunPolicy

type OverrunPolicy int

const (
    DropOldest  OverrunPolicy = iota // Default: discard oldest events
    Block                            // Block until space is available
    ReturnError                      // Fail fast with ErrBufferFull
)

type EventStore

type EventStore struct {
    dispatcher     *Dispatcher      // Pointer to the dispatcher map
    size           uint64           // Buffer size (power of two)
    buf            []unsafe.Pointer // Ring buffer of event pointers
    events         []Event          // Backing slice for Event data
    head           uint64           // Atomic write index
    tail           uint64           // Atomic read index

    // Config flags
    Async          bool
    OverrunPolicy  OverrunPolicy

    // Counters
    publishedCount uint64
    processedCount uint64
    errorCount     uint64
}

NewEventStore

func NewEventStore(dispatcher *Dispatcher, bufferSize uint64, policy OverrunPolicy) *EventStore

Creates a new EventStore with the provided dispatcher, ring buffer size, and overrun policy.
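
The EventStore struct documents its size as a power of two. A common reason for that constraint in ring buffers is that an ever-increasing index can be mapped to a slot with a bit mask instead of a modulo. Whether GoEventBus does exactly this internally is an assumption; the helper names isPowerOfTwo and slot below are hypothetical.

```go
package main

import "fmt"

// isPowerOfTwo reports whether n is a non-zero power of two.
func isPowerOfTwo(n uint64) bool {
	return n != 0 && n&(n-1) == 0
}

// slot maps a monotonically increasing index onto a ring of the given size.
// It assumes size is a power of two, in which case the mask is equivalent
// to index % size.
func slot(index, size uint64) uint64 {
	return index & (size - 1)
}

func main() {
	const size = 1 << 16 // the 64K buffer used in the Quick Start

	fmt.Println(isPowerOfTwo(size)) // true
	fmt.Println(isPowerOfTwo(1000)) // false
	fmt.Println(slot(size+5, size)) // 5: the index has wrapped around once
}
```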

Subscribe

func (es *EventStore) Subscribe(ctx context.Context, e Event) error

Atomically enqueues an Event for later publication, applying back‑pressure according to OverrunPolicy. If OverrunPolicy is ReturnError and the buffer is full, the function returns ErrBufferFull.

Publish

func (es *EventStore) Publish()

Dispatches all events from the last published position to the current head. If Async is true, handlers run in separate goroutines; otherwise they run in the caller's goroutine.
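
The difference between the two modes can be sketched with the standard library alone. This is an illustration under stated assumptions, not the library's code: runAll and countRuns are hypothetical names, and the sketch starts one goroutine per handler, whereas GoEventBus may schedule asynchronous work differently.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// runAll invokes every handler. With async set, each handler runs in its own
// goroutine and wg tracks completion; otherwise handlers run inline in the
// caller's goroutine.
func runAll(handlers []func(), async bool, wg *sync.WaitGroup) {
	for _, h := range handlers {
		if !async {
			h()
			continue
		}
		wg.Add(1)
		go func(h func()) {
			defer wg.Done()
			h()
		}(h)
	}
}

// countRuns is a demo helper: it runs n trivial handlers and returns how
// many of them completed.
func countRuns(n int, async bool) int64 {
	var done int64
	handlers := make([]func(), n)
	for i := range handlers {
		handlers[i] = func() { atomic.AddInt64(&done, 1) }
	}

	var wg sync.WaitGroup
	runAll(handlers, async, &wg)
	wg.Wait() // a no-op in sync mode, required in async mode
	return atomic.LoadInt64(&done)
}

func main() {
	fmt.Println("sync:", countRuns(100, false))
	fmt.Println("async:", countRuns(100, true))
}
```

In asynchronous mode the caller returns before the handlers finish, which is why the Quick Start waits with Drain before reading metrics.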

Drain

func (es *EventStore) Drain(ctx context.Context) error

Blocks until all in-flight asynchronous handlers complete, then stops the worker pool. Returns an error if the provided context.Context is canceled or its deadline is exceeded.

Close

func (es *EventStore) Close(ctx context.Context) error

Drains all pending asynchronous handlers and shuts down the EventStore. Blocks until all in-flight handlers complete or the provided context.Context is canceled. Returns an error if the context’s deadline is exceeded or it is otherwise canceled.
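
Both Drain and Close accept a context so the caller can bound how long it waits. A minimal sketch of that pattern with the standard library is shown below; waitWithContext and drainDemo are hypothetical names, and the sketch makes no claim about how GoEventBus tracks its in-flight handlers.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// waitWithContext blocks until wg finishes or ctx is done, whichever comes
// first. On timeout the helper goroutine keeps waiting in the background
// until the outstanding work completes.
func waitWithContext(ctx context.Context, wg *sync.WaitGroup) error {
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()
	select {
	case <-done:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// drainDemo starts one simulated handler that runs for workMs milliseconds
// and waits for it for at most timeoutMs milliseconds.
func drainDemo(workMs, timeoutMs int) error {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		time.Sleep(time.Duration(workMs) * time.Millisecond)
	}()

	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeoutMs)*time.Millisecond)
	defer cancel()
	return waitWithContext(ctx, &wg)
}

func main() {
	fmt.Println(drainDemo(10, 1000)) // handler finishes well within the deadline
	fmt.Println(drainDemo(500, 10))  // deadline expires first
}
```

Passing context.Background(), as the Quick Start does, means waiting without a time limit.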

Metrics

func (es *EventStore) Metrics() (published, processed, errors uint64)

Returns the total count of published, processed, and errored events.
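
Counters like these are typically updated with atomic operations so that concurrent handlers can increment them without a lock. The sketch below is a hypothetical stand-in, not the EventStore itself. It assumes Go 1.19 or newer for atomic.Uint64, and it assumes that "processed" counts only handlers that returned no error, which the library may define differently.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

// counters is a hypothetical stand-in for the three EventStore counters.
type counters struct {
	published atomic.Uint64
	processed atomic.Uint64
	failed    atomic.Uint64
}

// record counts one event and the outcome of its handler.
func (c *counters) record(err error) {
	c.published.Add(1)
	if err != nil {
		c.failed.Add(1)
		return
	}
	c.processed.Add(1)
}

// snapshot returns the current values of all three counters.
func (c *counters) snapshot() (published, processed, failed uint64) {
	return c.published.Load(), c.processed.Load(), c.failed.Load()
}

// simulate records n outcomes from n goroutines; every failEvery-th fails.
func simulate(n, failEvery int) (published, processed, failed uint64) {
	var c counters
	var wg sync.WaitGroup
	for i := 1; i <= n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			var err error
			if i%failEvery == 0 {
				err = errors.New("handler failed")
			}
			c.record(err)
		}(i)
	}
	wg.Wait()
	return c.snapshot()
}

func main() {
	published, processed, failed := simulate(1000, 10)
	fmt.Printf("published=%d processed=%d errors=%d\n", published, processed, failed)
}
```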

Back-pressure and Overrun Policies

GoEventBus provides three strategies for handling a saturated ring buffer:

| Policy | Behaviour | When to use |
|---|---|---|
| DropOldest | Atomically advances the read index, discarding the oldest event to make room for the new one. | Low‑latency scenarios where the newest data is most valuable and occasional loss is acceptable. |
| Block | Causes Subscribe to block (respecting its context) until space becomes available. | Workloads that must not lose events but can tolerate the latency of back‑pressure. |
| ReturnError | Subscribe returns ErrBufferFull immediately, allowing the caller to decide what to do. | Pipelines where upstream logic controls retries and failures explicitly. |

DropOldest is the default behaviour and matches releases prior to April 2025.
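
The three policies can be compared side by side in a small stand-alone sketch. It models the bounded buffer as a buffered channel, which is an assumption made for brevity and not how the lock‑free ring buffer works. OverrunPolicy and ErrBufferFull are redeclared locally, and enqueue, fill, and blockOnFull are hypothetical helpers.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// OverrunPolicy and ErrBufferFull mirror the documented names but are
// declared locally so the sketch compiles on its own.
type OverrunPolicy int

const (
	DropOldest OverrunPolicy = iota
	Block
	ReturnError
)

var ErrBufferFull = errors.New("buffer full")

// enqueue adds v to q, a buffered channel standing in for the ring buffer.
func enqueue(ctx context.Context, q chan int, v int, p OverrunPolicy) error {
	switch p {
	case Block:
		select {
		case q <- v:
			return nil
		case <-ctx.Done(): // give up when the caller's context ends
			return ctx.Err()
		}
	case ReturnError:
		select {
		case q <- v:
			return nil
		default: // full: fail fast and let the caller decide
			return ErrBufferFull
		}
	default: // DropOldest
		for {
			select {
			case q <- v:
				return nil
			default:
			}
			select {
			case <-q: // full: discard the oldest value, then retry
			default: // a consumer drained the queue first; retry
			}
		}
	}
}

// fill enqueues 1..n into a queue of the given capacity and returns what is
// left in the queue plus the first error seen. Do not call it with Block:
// nothing consumes from the queue here, so a blocked send would never finish.
func fill(capacity, n int, p OverrunPolicy) ([]int, error) {
	q := make(chan int, capacity)
	var firstErr error
	for i := 1; i <= n; i++ {
		if err := enqueue(context.Background(), q, i, p); err != nil && firstErr == nil {
			firstErr = err
		}
	}
	close(q)
	var left []int
	for v := range q {
		left = append(left, v)
	}
	return left, firstErr
}

// blockOnFull shows Block returning once its context is cancelled.
func blockOnFull() error {
	q := make(chan int, 1)
	q <- 1 // the queue is now full
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	return enqueue(ctx, q, 2, Block)
}

func main() {
	fmt.Println(fill(3, 5, DropOldest))
	fmt.Println(fill(3, 5, ReturnError))
	fmt.Println(blockOnFull())
}
```

With a capacity of 3 and five values, DropOldest leaves 3, 4, 5 in the queue, while ReturnError keeps 1, 2, 3 and reports ErrBufferFull.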

💡 Use Cases

GoEventBus is ideal for scenarios that need low‑latency, high‑throughput, non‑blocking event dispatching, such as real‑time analytics, high‑throughput microservices, and game loops.

Benchmarks

All benchmarks were run with Go’s testing harness (go test -bench .) with GOMAXPROCS set to 8, which is what the -8 suffix on each benchmark name denotes. Numbers below are from the April 2025 release.

| Benchmark | Iterations | ns/op |
|---|---|---|
| BenchmarkSubscribe-8 | 27,080,376 | 40.37 |
| BenchmarkSubscribeParallel-8 | 26,418,999 | 38.42 |
| BenchmarkPublish-8 | 295,661,464 | 3.910 |
| BenchmarkPublishAfterPrefill-8 | 252,943,526 | 4.585 |
| BenchmarkSubscribeLargePayload-8 | 1,613,017 | 771.5 |
| BenchmarkPublishLargePayload-8 | 296,434,225 | 3.910 |
| BenchmarkEventStore_Async-8 | 2,816,988 | 436.5 |
| BenchmarkEventStore_Sync-8 | 2,638,519 | 428.5 |
| BenchmarkFastHTTPSync-8 | 6,275,112 | 163.8 |
| BenchmarkFastHTTPAsync-8 | 1,954,884 | 662.0 |
| BenchmarkFastHTTPParallel-8 | 4,489,274 | 262.3 |

Contributing

Contributions, issues, and feature requests are welcome! Feel free to check the issues page.

License

Distributed under the MIT License. See LICENSE for more information.
