
events is a typed event bus library for local dispatch and distributed pub/sub.
What events is
events is a typed event bus for Go and handles event publication and fan-out. Durable background work such as retries and worker queues belongs in queue.
It lets applications publish and subscribe to events using normal Go types, with delivery handled either in-process or through distributed backends like NATS, Redis, Kafka, or Google Pub/Sub.
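To make "publish and subscribe using normal Go types" concrete, here is a minimal in-process sketch of type-keyed dispatch. It uses only the standard library and is not the events implementation: `miniBus`, `subscribe`, `publish`, and `deliveredTo` are hypothetical names introduced for illustration, and the real bus may resolve handlers differently.

```go
package main

import (
	"context"
	"fmt"
	"reflect"
)

// handler is the type-erased form in which typed subscribers are stored.
type handler func(ctx context.Context, event any) error

// miniBus is a hypothetical in-process bus, not the events implementation.
type miniBus struct {
	handlers map[reflect.Type][]handler
}

func newMiniBus() *miniBus {
	return &miniBus{handlers: make(map[reflect.Type][]handler)}
}

// subscribe registers fn for events whose dynamic type is exactly T.
func subscribe[T any](b *miniBus, fn func(ctx context.Context, event T) error) {
	key := reflect.TypeOf((*T)(nil)).Elem()
	b.handlers[key] = append(b.handlers[key], func(ctx context.Context, event any) error {
		return fn(ctx, event.(T))
	})
}

// publish calls every handler registered for the event's type, in
// registration order, and stops at the first error.
func (b *miniBus) publish(ctx context.Context, event any) error {
	for _, h := range b.handlers[reflect.TypeOf(event)] {
		if err := h(ctx, event); err != nil {
			return err
		}
	}
	return nil
}

type UserCreated struct {
	ID string `json:"id"`
}

type OrderPlaced struct {
	ID string `json:"id"`
}

// deliveredTo publishes two events of different types and returns what
// the two UserCreated subscribers observed.
func deliveredTo() []string {
	bus := newMiniBus()
	var seen []string
	subscribe(bus, func(_ context.Context, e UserCreated) error {
		seen = append(seen, "audit:"+e.ID)
		return nil
	})
	subscribe(bus, func(_ context.Context, e UserCreated) error {
		seen = append(seen, "email:"+e.ID)
		return nil
	})
	_ = bus.publish(context.Background(), UserCreated{ID: "123"})
	// OrderPlaced has no subscribers, so nothing is recorded for it.
	_ = bus.publish(context.Background(), OrderPlaced{ID: "9"})
	return seen
}

func main() {
	fmt.Println(deliveredTo()) // [audit:123 email:123]
}
```

Keying handlers by `reflect.Type` is one way to let the event's Go type act as the routing key, so callers never pass a topic string for local dispatch.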
Installation
go get github.com/goforj/events
Quick Start
package main
import (
	"context"
	"fmt"
	"github.com/goforj/events"
)
type UserCreated struct {
	ID string `json:"id"`
}
func main() {
	// Errors are discarded for brevity; check them in real code.
	bus, _ := events.NewSync()
	_, _ = bus.Subscribe(func(ctx context.Context, event UserCreated) error {
		fmt.Println("received", event.ID, ctx != nil)
		return nil
	})
	_ = bus.Publish(UserCreated{ID: "123"})
}
Topic Override
type UserCreated struct {
	ID string `json:"id"`
}
func (UserCreated) Topic() string { return "users.created" }
Drivers
Optional distributed backends are separate modules. Keep dependencies lean and install only what you use:
go get github.com/goforj/events/driver/natsevents
go get github.com/goforj/events/driver/redisevents
go get github.com/goforj/events/driver/kafkaevents
go get github.com/goforj/events/driver/natsjetstreamevents
go get github.com/goforj/events/driver/gcppubsubevents
go get github.com/goforj/events/driver/snsevents
Drivers
| Driver / Backend | Mode | Fan-out | Durable | Queue Semantics | Notes |
|---|---|---|---|---|---|
| sync | In-process | ✓ | x | x | Root-backed synchronous dispatch in the caller path. |
| null | Drop-only | x | x | x | Root-backed no-op transport for disabled eventing and tests. |
| nats | Distributed pub/sub | ✓ | x | x | Subject-based transport with live integration coverage. |
| natsjetstream | Distributed stream | ✓ | Partial | x | Ephemeral JetStream consumers preserve subscribe/close semantics while adding durable stream storage. |
| redis | Distributed pub/sub | ✓ | x | x | Redis pub/sub transport; Streams are intentionally deferred. |
| kafka | Distributed topic/log | ✓ | Partial | x | Current driver validates topic-based fan-out compatibility, not full consumer-group semantics. |
| sns | Distributed topic plus queue | ✓ | Partial | x | SNS fan-out with per-subscription SQS queues to preserve bus-style delivery semantics. |
| gcppubsub | Distributed topic/subscription | ✓ | Partial | x | Emulator-backed Google Pub/Sub integration with per-subscription fan-out mapping. |
|  | Queue target | Planned | ✓ | ✓ | Deferred until a separate async capability surface is intentionally introduced. |
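The Fan-out and Queue Semantics columns describe two different delivery models. The sketch below contrasts them with plain slices and maps; it involves no broker and none of the drivers above. `fanOut` and `queueDeliver` are hypothetical helpers, and the round-robin choice of worker is an assumption made for determinism (real queue backends pick consumers in their own ways).

```go
package main

import "fmt"

// fanOut models bus-style delivery: every subscriber receives every event.
func fanOut(subscribers, events []string) map[string][]string {
	out := make(map[string][]string)
	for _, e := range events {
		for _, s := range subscribers {
			out[s] = append(out[s], e)
		}
	}
	return out
}

// queueDeliver models queue semantics: each event goes to exactly one worker.
// Round-robin selection is an illustrative assumption.
func queueDeliver(workers, events []string) map[string][]string {
	out := make(map[string][]string)
	if len(workers) == 0 {
		return out
	}
	for i, e := range events {
		w := workers[i%len(workers)]
		out[w] = append(out[w], e)
	}
	return out
}

func main() {
	consumers := []string{"billing", "email"}
	events := []string{"e1", "e2", "e3", "e4"}

	fmt.Println(fanOut(consumers, events))
	// map[billing:[e1 e2 e3 e4] email:[e1 e2 e3 e4]]

	fmt.Println(queueDeliver(consumers, events))
	// map[billing:[e1 e3] email:[e2 e4]]
}
```

With fan-out, adding a consumer adds a full copy of the stream; with queue semantics, adding a consumer splits the same stream across more workers.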
Driver Constructor Quick Examples
Use root constructors for local backends, and driver-module constructors for distributed backends. Driver backends live in separate modules so applications only import/link the optional dependencies they actually use.
package main
import (
	"context"
	"github.com/goforj/events"
	"github.com/goforj/events/driver/gcppubsubevents"
	"github.com/goforj/events/driver/kafkaevents"
	"github.com/goforj/events/driver/natsjetstreamevents"
	"github.com/goforj/events/driver/natsevents"
	"github.com/goforj/events/driver/redisevents"
	"github.com/goforj/events/driver/snsevents"
)
func main() {
	// Return values are discarded for brevity; check them in real code.
	ctx := context.Background()
	// Local backends: root constructors.
	events.NewSync()
	events.NewNull()
	// Distributed backends: driver-module constructors.
	natsevents.New(natsevents.Config{URL: "nats://127.0.0.1:4222"})
	natsjetstreamevents.New(natsjetstreamevents.Config{URL: "nats://127.0.0.1:4222"})
	redisevents.New(redisevents.Config{Addr: "127.0.0.1:6379"})
	kafkaevents.New(kafkaevents.Config{Brokers: []string{"127.0.0.1:9092"}})
	gcppubsubevents.New(ctx, gcppubsubevents.Config{
		ProjectID: "events-project",
		URI:       "127.0.0.1:8085",
	})
	snsevents.New(snsevents.Config{
		Region:   "us-east-1",
		Endpoint: "http://127.0.0.1:4566",
	})
}
Benchmarks
To refresh the live benchmark snapshot and regenerate the charts:
sh scripts/refresh-bench-snapshot.sh
These charts compare one publish-plus-delivery round trip for sync and each enabled distributed driver fixture.
Note: sns and gcppubsub run through local emulators in this repo, so read those results as development approximations rather than direct managed-service latency comparisons.
These checks are for obvious regression detection, not for noisy micro-optimism or hard CI performance gates.
API Index
| Group | Functions |
|---|---|
| Bus | Driver Ready ReadyContext |
| Config | Config gcppubsubevents.Config kafkaevents.Config natsevents.Config natsjetstreamevents.Config redisevents.Config snsevents.Config |
| Construction | New NewNull NewSync |
| Driver Constructors | gcppubsubevents.New kafkaevents.New natsevents.New natsjetstreamevents.New redisevents.New snsevents.New |
| Lifecycle | Close |
| Options | Option WithCodec |
| Publish | Publish PublishContext TopicEvent |
| Subscribe | Subscribe SubscribeContext Subscription |
| Testing | Fake Fake.Bus Fake.Count Fake.Records Fake.Reset NewFake Record |
Bus
Driver
Driver reports the active backend.
bus, _ := events.NewSync()
fmt.Println(bus.Driver())
// Output: sync
Ready
Ready reports whether the bus is ready.
bus, _ := events.NewSync()
fmt.Println(bus.Ready() == nil)
// Output: true
ReadyContext
ReadyContext reports whether the bus is ready.
bus, _ := events.NewSync()
fmt.Println(bus.ReadyContext(context.Background()) == nil)
// Output: true
Config
Config
Config configures root bus construction.
Example: define bus construction config
cfg := events.Config{Driver: eventscore.DriverSync}
Example: define bus construction config with all fields
cfg := events.Config{
	Driver:    eventscore.DriverSync, // default: "sync" when empty and no Transport is provided
	Codec:     nil, // default: nil uses the built-in JSON codec
	Transport: nil, // default: nil keeps dispatch in-process
}
gcppubsubevents.Config
Config configures Google Pub/Sub transport construction.
Example: define Google Pub/Sub driver config
cfg := gcppubsubevents.Config{
	ProjectID: "events-project",
	URI:       "127.0.0.1:8085",
}
Example: define Google Pub/Sub driver config with all fields
cfg := gcppubsubevents.Config{
	ProjectID: "events-project",
	URI:       "127.0.0.1:8085", // default: "" is invalid unless Client is provided
	Client:    nil, // default: nil creates a client from ProjectID and URI
}
kafkaevents.Config
Config configures Kafka transport construction.
Example: define Kafka driver config
cfg := kafkaevents.Config{Brokers: []string{"127.0.0.1:9092"}}
Example: define Kafka driver config with all fields
cfg := kafkaevents.Config{
	Brokers: []string{"127.0.0.1:9092"},
	Dialer:  nil, // default: nil uses a zero-value kafka.Dialer
	Writer:  nil, // default: nil builds a writer with single-message, auto-topic defaults
}
natsevents.Config
Config configures NATS transport construction.
Example: define NATS driver config
cfg := natsevents.Config{URL: "nats://127.0.0.1:4222"}
Example: define NATS driver config with all fields
cfg := natsevents.Config{
	URL:  "nats://127.0.0.1:4222",
	Conn: nil, // default: nil dials URL instead of reusing an existing connection
}
natsjetstreamevents.Config
Config configures NATS JetStream transport construction.
Example: define NATS JetStream driver config
cfg := natsjetstreamevents.Config{URL: "nats://127.0.0.1:4222"}
Example: define NATS JetStream driver config with all fields
cfg := natsjetstreamevents.Config{
	URL:               "nats://127.0.0.1:4222",
	Conn:              nil, // default: nil dials URL instead of reusing an existing connection
	SubjectPrefix:     "events.", // default: "events."
	StreamNamePrefix:  "EVENTS_", // default: "EVENTS_"
	InactiveThreshold: 30 * time.Second, // default: 30s
	AckWait:           30 * time.Second, // default: 30s
	FetchMaxWait:      250 * time.Millisecond, // default: 250ms
	Storage:           jetstream.MemoryStorage, // default: MemoryStorage
}
redisevents.Config
Config configures Redis transport construction.
Example: define Redis driver config
cfg := redisevents.Config{Addr: "127.0.0.1:6379"}
Example: define Redis driver config with all fields
cfg := redisevents.Config{
	Addr:   "127.0.0.1:6379",
	Client: nil, // default: nil constructs a client from Addr
}
snsevents.Config
Config configures SNS transport construction.
Example: define SNS driver config
cfg := snsevents.Config{
	Region:   "us-east-1",
	Endpoint: "http://127.0.0.1:4566",
}
Example: define SNS driver config with all fields
cfg := snsevents.Config{
	Region:            "us-east-1",
	Endpoint:          "http://127.0.0.1:4566", // default: "" uses normal AWS resolution
	SNSClient:         nil, // default: nil creates a client from Region and Endpoint
	SQSClient:         nil, // default: nil creates a client from Region and Endpoint
	TopicNamePrefix:   "events-", // default: ""
	QueueNamePrefix:   "events-", // default: ""
	WaitTimeSeconds:   1, // default: 1
	VisibilityTimeout: 30, // default: 30
}
Construction
New
New constructs a root bus for the requested driver.
bus, _ := events.New(events.Config{Driver: "sync"})
fmt.Println(bus.Driver())
// Output: sync
NewNull
NewNull constructs the root null bus.
bus, _ := events.NewNull()
fmt.Println(bus.Driver())
// Output: null
NewSync
NewSync constructs the root sync bus.
bus, _ := events.NewSync()
fmt.Println(bus.Driver())
// Output: sync
Driver Constructors
gcppubsubevents.New
New constructs a Google Pub/Sub-backed driver.
driver, _ := gcppubsubevents.New(context.Background(), gcppubsubevents.Config{
	ProjectID: "events-project",
	URI:       "127.0.0.1:8085",
})
kafkaevents.New
New constructs a Kafka-backed driver.
driver, _ := kafkaevents.New(kafkaevents.Config{Brokers: []string{"127.0.0.1:9092"}})
natsevents.New
New connects a NATS-backed driver from config.
driver, _ := natsevents.New(natsevents.Config{URL: "nats://127.0.0.1:4222"})
natsjetstreamevents.New
New connects a NATS JetStream-backed driver from config.
driver, _ := natsjetstreamevents.New(natsjetstreamevents.Config{URL: "nats://127.0.0.1:4222"})
redisevents.New
New constructs a Redis pub/sub-backed driver.
driver, _ := redisevents.New(redisevents.Config{Addr: "127.0.0.1:6379"})
snsevents.New
New constructs an SNS-backed driver.
driver, _ := snsevents.New(snsevents.Config{
	Region:   "us-east-1",
	Endpoint: "http://127.0.0.1:4566",
})
Lifecycle
Close
Close closes the underlying Pub/Sub client.
driver, _ := redisevents.New(redisevents.Config{Addr: "127.0.0.1:6379"})
Options
Option
Option configures root bus behavior.
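As a rough illustration of how an option of this kind can adjust construction, the sketch below uses the functional-options pattern with only the standard library. It is not the events implementation: `codec`, `settings`, `option`, `withCodec`, `prefixCodec`, and `encode` are hypothetical names. The one behavior borrowed from this README is that a nil codec falls back to JSON.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// codec is a hypothetical encoding interface; the real codec type may differ.
type codec interface {
	Marshal(v any) ([]byte, error)
}

// jsonCodec stands in for a built-in JSON default.
type jsonCodec struct{}

func (jsonCodec) Marshal(v any) ([]byte, error) { return json.Marshal(v) }

// prefixCodec is a toy replacement codec that tags JSON payloads with "v1:".
type prefixCodec struct{}

func (prefixCodec) Marshal(v any) ([]byte, error) {
	b, err := json.Marshal(v)
	if err != nil {
		return nil, err
	}
	return append([]byte("v1:"), b...), nil
}

// settings collects everything options are allowed to change.
type settings struct {
	codec codec
}

// option mutates settings during construction.
type option func(*settings)

// withCodec replaces the default codec; nil keeps the JSON default.
func withCodec(c codec) option {
	return func(s *settings) {
		if c != nil {
			s.codec = c
		}
	}
}

// encode applies the options over the defaults and marshals v.
func encode(v any, opts ...option) (string, error) {
	s := settings{codec: jsonCodec{}}
	for _, opt := range opts {
		opt(&s)
	}
	b, err := s.codec.Marshal(v)
	return string(b), err
}

type UserCreated struct {
	ID string `json:"id"`
}

func main() {
	plain, _ := encode(UserCreated{ID: "123"})
	tagged, _ := encode(UserCreated{ID: "123"}, withCodec(prefixCodec{}))
	fallback, _ := encode(UserCreated{ID: "123"}, withCodec(nil))
	fmt.Println(plain)    // {"id":"123"}
	fmt.Println(tagged)   // v1:{"id":"123"}
	fmt.Println(fallback) // {"id":"123"}
}
```

Options applied in order over a defaulted struct let a constructor gain new knobs without changing its signature.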
WithCodec
WithCodec overrides the default event codec.
bus, _ := events.NewSync(events.WithCodec(nil))
fmt.Println(bus.Driver())
// Output: sync
Publish
Publish
Publish publishes an event using the background context.
type UserCreated struct {
	ID string `json:"id"`
}
bus, _ := events.NewSync()
_, _ = bus.Subscribe(func(event UserCreated) {
	fmt.Println(event.ID)
})
_ = bus.Publish(UserCreated{ID: "123"})
// Output: 123
PublishContext
PublishContext publishes an event using the configured codec and dispatch flow.
type UserCreated struct {
	ID string `json:"id"`
}
bus, _ := events.NewSync()
_, _ = bus.Subscribe(func(ctx context.Context, event UserCreated) error {
	fmt.Println(event.ID, ctx != nil)
	return nil
})
_ = bus.PublishContext(context.Background(), UserCreated{ID: "123"})
// Output: 123 true
TopicEvent
TopicEvent overrides the derived topic for an event.
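The sketch below shows one way a topic override can be detected, based on the `Topic() string` method shown in the Topic Override section. It is standard-library only and not the library's code: `topicEvent` and `topicFor` are hypothetical names, and falling back to the bare Go type name is an assumption, since this README does not state how events derives a default topic.

```go
package main

import (
	"fmt"
	"reflect"
)

// topicEvent mirrors the method shape from the Topic Override section.
type topicEvent interface {
	Topic() string
}

// topicFor returns the event's own topic when it implements topicEvent.
// Otherwise it falls back to the Go type name (an assumed default).
func topicFor(event any) string {
	if event == nil {
		return ""
	}
	if te, ok := event.(topicEvent); ok {
		return te.Topic()
	}
	return reflect.TypeOf(event).Name()
}

type UserCreated struct {
	ID string `json:"id"`
}

// Topic overrides the derived topic for UserCreated.
func (UserCreated) Topic() string { return "users.created" }

// OrderPlaced has no Topic method, so the fallback applies.
type OrderPlaced struct {
	ID string `json:"id"`
}

func main() {
	fmt.Println(topicFor(UserCreated{ID: "123"})) // users.created
	fmt.Println(topicFor(OrderPlaced{ID: "9"}))   // OrderPlaced
}
```

An explicit topic keeps the wire name stable if the Go type is later renamed or moved to another package.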
Subscribe
Subscribe
Subscribe registers a handler using the background context.
type UserCreated struct {
	ID string `json:"id"`
}
bus, _ := events.NewSync()
sub, _ := bus.Subscribe(func(ctx context.Context, event UserCreated) error {
	fmt.Println(event.ID)
	return nil
})
defer sub.Close()
_ = bus.Publish(UserCreated{ID: "123"})
// Output: 123
SubscribeContext
SubscribeContext registers a typed handler.
type UserCreated struct {
	ID string `json:"id"`
}
bus, _ := events.NewSync()
sub, _ := bus.SubscribeContext(context.Background(), func(ctx context.Context, event UserCreated) error {
	fmt.Println(event.ID, ctx != nil)
	return nil
})
defer sub.Close()
_ = bus.PublishContext(context.Background(), UserCreated{ID: "123"})
// Output: 123 true
Subscription
Subscription releases a subscription when closed.
type UserCreated struct {
	ID string `json:"id"`
}
bus, _ := events.NewSync()
sub, _ := bus.Subscribe(func(event UserCreated) {
	fmt.Println("received", event.ID)
})
_ = bus.Publish(UserCreated{ID: "123"})
_ = sub.Close()
_ = bus.Publish(UserCreated{ID: "456"})
// Output: received 123
Testing
Fake
Fake provides a root-package testing helper that records published events.
fake := events.NewFake()
fmt.Println(fake.Count())
// Output: 0
Fake.Bus
Bus returns the wrapped API to inject into code under test.
fake := events.NewFake()
bus := fake.Bus()
fmt.Println(bus.Ready() == nil)
// Output: true
Fake.Count
Count returns the total number of recorded publishes.
type UserCreated struct {
	ID string `json:"id"`
}
fake := events.NewFake()
_ = fake.Bus().Publish(UserCreated{ID: "123"})
fmt.Println(fake.Count())
// Output: 1
Fake.Records
Records returns a copy of recorded publishes.
type UserCreated struct {
	ID string `json:"id"`
}
fake := events.NewFake()
_ = fake.Bus().Publish(UserCreated{ID: "123"})
fmt.Println(len(fake.Records()))
// Output: 1
Fake.Reset
Reset clears recorded publishes.
type UserCreated struct {
	ID string `json:"id"`
}
fake := events.NewFake()
_ = fake.Bus().Publish(UserCreated{ID: "123"})
fake.Reset()
fmt.Println(fake.Count())
// Output: 0
NewFake
NewFake creates a new fake event harness backed by the root sync bus.
fake := events.NewFake()
fmt.Println(fake.Count())
// Output: 0
Record
Record captures one published event observed by a Fake bus.
type UserCreated struct {
	ID string `json:"id"`
}
record := events.Record{Event: UserCreated{ID: "123"}}
fmt.Printf("%T\n", record.Event)
// Output: main.UserCreated
Docs Tooling
The repository includes lightweight docs tooling under docs/.
Run the watcher to auto-regenerate docs on file changes:
sh docs/watcher.sh