Users Created Event
Verified Scenario
We test this scenario against the current GoForj templates, including the generated files, wiring changes, commands, and verification steps.
This scenario extends the user profile flow with a POST /api/v1/users endpoint that publishes a typed users.created event after a user is saved.
The event announces that something happened. The subscriber reacts to it. Durable background work, retries, and worker lifecycle belong in a job and queue, which the next scenario introduces.
What You Will Build
- EVENTS_* config selects the local event driver.
- internal/events.UserCreated defines the typed event.
- UserService publishes through a small application boundary.
- notifications.Subscribers reacts to users.created.
- internal/app/lifecycle_registry.go registers and closes the subscription.
- A service test proves the event is published without starting the App runtime.
Prerequisites
Complete Cached User Profile first.
The generated App should have event support enabled. Verify that the generated event package exists:
internal/events
Golden Path State
Before this scenario, user profile behavior is synchronous: HTTP calls the controller, the controller calls the service, and the service reads application state through the cached repository.
After this scenario, creating a user also publishes a typed users.created fact. A subscriber reacts to the event through lifecycle-registered wiring, but durable work is still left for the next job scenario.
Files
This scenario edits or creates:
Configuration
.env
Events
internal/events/user_created_event.go
Users feature
internal/users/repository.go
internal/users/events.go
internal/users/service.go
internal/users/service_test.go
internal/users/controller.go
Notifications
internal/notifications/service.go
internal/notifications/subscribers.go
Lifecycle and wiring
internal/app/lifecycle_registry.go
wire/inject_app_services.go
The event generator may update generated event manager files.
internal/events/accessors_gen.go
internal/events/manager_gen.go
Do not edit generated event files by hand.
Step 1: Configure Events
Use the in-process event driver for local development. The generated App exposes app.Bus() and app.Events().
Update .env so it includes:
EVENTS_INPROC_WORKERS=0
forj build
Step 2: Add The Event
Create internal/events/user_created_event.go.
Events should be small typed facts. They should not carry full domain objects, request payloads, or driver-specific metadata.
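To make "small typed fact" concrete, the sketch below encodes an event with the same two fields and JSON tags as the event defined in this step. It uses only the Go standard library. The encodeUserCreated helper is a hypothetical name for illustration, and the assumption that a driver would carry the event as JSON is not confirmed by this scenario.

```go
package main

import "encoding/json"

// UserCreated mirrors the event defined in this step: identifiers only,
// no full domain object and no request payload.
type UserCreated struct {
	UserID string `json:"user_id"`
	Email  string `json:"email"`
}

// encodeUserCreated is a hypothetical helper that shows the JSON shape of
// the event. GoForj drivers may encode events differently.
func encodeUserCreated(event UserCreated) (string, error) {
	payload, err := json.Marshal(event)
	if err != nil {
		return "", err
	}
	return string(payload), nil
}
```

Encoding UserCreated{UserID: "43", Email: "grace@example.test"} yields {"user_id":"43","email":"grace@example.test"}. Keeping the payload this small means a subscriber that needs more data has to load it by ID.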
Create or replace internal/events/user_created_event.go:
package events
const UserCreatedTopic = "users.created"
type UserCreated struct {
UserID string `json:"user_id"`
Email string `json:"email"`
}
func (UserCreated) Topic() string {
return UserCreatedTopic
}
Step 3: Extend The Repository
Replace internal/users/repository.go.
The repository still owns source-of-truth storage and cache-aside reads. This example uses memory so the scenario stays local and runnable.
Create or replace internal/users/repository.go:
package users
import (
"context"
"fmt"
"strconv"
"sync"
"time"
"github.com/goforj/cache"
)
const profileCacheTTL = 5 * time.Minute
type UserRepository interface {
Find(ctx context.Context, id string) (User, error)
Save(ctx context.Context, user User) (User, error)
}
type MemoryUserRepository struct {
mu sync.Mutex
nextID int
users map[string]User
}
func NewMemoryUserRepository() *MemoryUserRepository {
return &MemoryUserRepository{
nextID: 43,
users: map[string]User{
"42": {
ID: "42",
Name: "Ada Lovelace",
Email: "ada@example.test",
},
},
}
}
func (r *MemoryUserRepository) Find(ctx context.Context, id string) (User, error) {
r.mu.Lock()
defer r.mu.Unlock()
user, ok := r.users[id]
if !ok {
return User{}, ErrUserNotFound
}
return user, nil
}
func (r *MemoryUserRepository) Save(ctx context.Context, user User) (User, error) {
r.mu.Lock()
defer r.mu.Unlock()
if user.ID == "" {
user.ID = strconv.Itoa(r.nextID)
r.nextID++
}
r.users[user.ID] = user
return user, nil
}
type CachedUserRepository struct {
source UserRepository
profileCache *cache.Cache
}
func NewCachedUserRepository(source UserRepository, profileCache *cache.Cache) *CachedUserRepository {
return &CachedUserRepository{
source: source,
profileCache: profileCache,
}
}
func (r *CachedUserRepository) Find(ctx context.Context, id string) (User, error) {
key := profileCacheKey(id)
user, ok, err := cache.Get[User](r.profileCache.WithContext(ctx), key)
if err != nil {
return User{}, fmt.Errorf("read user profile cache: %w", err)
}
if ok {
return user, nil
}
user, err = r.source.Find(ctx, id)
if err != nil {
return User{}, err
}
if err := cache.Set(r.profileCache.WithContext(ctx), key, user, profileCacheTTL); err != nil {
return User{}, fmt.Errorf("write user profile cache: %w", err)
}
return user, nil
}
func (r *CachedUserRepository) Save(ctx context.Context, user User) (User, error) {
user, err := r.source.Save(ctx, user)
if err != nil {
return User{}, err
}
if err := cache.Set(r.profileCache.WithContext(ctx), profileCacheKey(user.ID), user, profileCacheTTL); err != nil {
return User{}, fmt.Errorf("write user profile cache: %w", err)
}
return user, nil
}
func profileCacheKey(id string) string {
return "users:" + id + ":profile"
}
Step 4: Add An Event Publisher Boundary
Create internal/users/events.go.
The service will depend on UserEvents, not on global App state. That keeps tests direct and keeps the event boundary visible.
Create or replace internal/users/events.go:
package users
import (
"context"
"your/module/internal/events"
)
type UserEvents interface {
PublishCreated(ctx context.Context, user User) error
}
type UserEventPublisher struct {
bus events.Bus
}
func NewUserEventPublisher(bus events.Bus) *UserEventPublisher {
return &UserEventPublisher{bus: bus}
}
func (p *UserEventPublisher) PublishCreated(ctx context.Context, user User) error {
return p.bus.WithContext(ctx).Publish(events.UserCreated{
UserID: user.ID,
Email: user.Email,
})
}
Step 5: Publish From The Service
Replace internal/users/service.go.
The service owns the application workflow: validate, save, publish. The repository owns cache-aside access. The controller still owns HTTP concerns.
Create or replace internal/users/service.go:
package users
import (
"context"
"errors"
"fmt"
"strings"
)
var (
ErrUserNotFound = errors.New("user not found")
ErrNameRequired = errors.New("name is required")
ErrEmailRequired = errors.New("email is required")
)
type User struct {
ID string `json:"id"`
Name string `json:"name"`
Email string `json:"email"`
}
type CreateUserInput struct {
Name string
Email string
}
type Service struct {
repo UserRepository
userEvents UserEvents
}
func NewService(repo UserRepository, userEvents UserEvents) *Service {
return &Service{
repo: repo,
userEvents: userEvents,
}
}
func (s *Service) Find(ctx context.Context, id string) (User, error) {
if id == "" {
return User{}, ErrUserNotFound
}
return s.repo.Find(ctx, id)
}
func (s *Service) Create(ctx context.Context, input CreateUserInput) (User, error) {
name := strings.TrimSpace(input.Name)
if name == "" {
return User{}, ErrNameRequired
}
email := strings.TrimSpace(input.Email)
if email == "" {
return User{}, ErrEmailRequired
}
user, err := s.repo.Save(ctx, User{
Name: name,
Email: email,
})
if err != nil {
return User{}, fmt.Errorf("save user: %w", err)
}
if err := s.userEvents.PublishCreated(ctx, user); err != nil {
return User{}, fmt.Errorf("publish user created event: %w", err)
}
return user, nil
}
Step 6: Add The Subscriber
Create internal/notifications/service.go and internal/notifications/subscribers.go.
The subscriber reacts to a typed fact. It should not become a hidden replacement for explicit service orchestration.
Create or replace internal/notifications/service.go:
package notifications
import "context"
type Service struct{}
func NewService() *Service {
return &Service{}
}
func (s *Service) SendWelcome(ctx context.Context, userID string, email string) error {
return nil
}
Step 7: Add Subscriber Registration
Subscribers.Register subscribes to the typed users.created event.
Create or replace internal/notifications/subscribers.go:
package notifications
import (
"context"
"your/module/internal/events"
)
type Subscribers struct {
service *Service
}
func NewSubscribers(service *Service) *Subscribers {
return &Subscribers{service: service}
}
func (s *Subscribers) Register(ctx context.Context, bus events.Bus) (events.Subscription, error) {
return bus.WithContext(ctx).Subscribe(func(ctx context.Context, event events.UserCreated) error {
return s.service.SendWelcome(ctx, event.UserID, event.Email)
})
}
Step 8: Register Subscribers In The Lifecycle
Update internal/app/lifecycle_registry.go.
This keeps subscriber registration in the App lifecycle, not in init, package globals, or controller constructors.
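The startup and shutdown pairing can be reduced to a small sketch. The registry, subscription, and errBusUnavailable names below are hypothetical stand-ins, not GoForj types. The point is only that the handle obtained at startup is stored so shutdown can close it, and that shutdown tolerates a startup that never succeeded.

```go
package main

import "errors"

// errBusUnavailable is a sample failure used to exercise the startup error path.
var errBusUnavailable = errors.New("bus unavailable")

// subscription is a hypothetical stand-in for a bus subscription handle.
type subscription struct{ closed bool }

func (s *subscription) Close() error {
	s.closed = true
	return nil
}

// registry keeps the handle returned at startup so shutdown can close it.
type registry struct {
	subscribe func() (*subscription, error)
	active    *subscription
}

func (r *registry) startup() error {
	sub, err := r.subscribe()
	if err != nil {
		return err
	}
	r.active = sub
	return nil
}

func (r *registry) shutdown() error {
	// Nothing to close if startup failed or never ran.
	if r.active == nil {
		return nil
	}
	return r.active.Close()
}
```

Holding the handle on the registry, rather than in a package-level variable, keeps the subscription's lifetime tied to one App instance.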
Create or replace internal/app/lifecycle_registry.go:
package app
import (
"context"
"your/module/internal/events"
"your/module/internal/notifications"
)
type LifecycleRegistry struct {
eventManager *events.Manager
notificationSubscribers *notifications.Subscribers
notificationSubscription events.Subscription
}
func NewLifecycleRegistry(
eventManager *events.Manager,
notificationSubscribers *notifications.Subscribers,
) *LifecycleRegistry {
return &LifecycleRegistry{
eventManager: eventManager,
notificationSubscribers: notificationSubscribers,
}
}
func (r *LifecycleRegistry) Register(lifecycle *Lifecycle) {
lifecycle.On(Startup, func(ctx context.Context) error {
subscription, err := r.notificationSubscribers.Register(ctx, r.eventManager.Default())
if err != nil {
return err
}
r.notificationSubscription = subscription
return nil
})
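lifecycle.On(Shutdown, func(ctx context.Context) error {
// Startup may have failed or never run, so there may be nothing to close.
if r.notificationSubscription == nil {
return nil
}
return r.notificationSubscription.Close()
})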
}
Step 9: Update The Controller
Update internal/users/controller.go so it supports both GET /users/:id and POST /users.
Create or replace internal/users/controller.go:
package users
import (
"errors"
"net/http"
"github.com/goforj/web"
)
type Controller struct {
service *Service
}
type CreateRequest struct {
Name string `json:"name"`
Email string `json:"email"`
}
func NewController(service *Service) *Controller {
return &Controller{service: service}
}
func (c *Controller) Routes() []web.Route {
return []web.Route{
web.NewRoute(http.MethodGet, "/users/:id", c.Show),
web.NewRoute(http.MethodPost, "/users", c.Store),
}
}
func (c *Controller) Show(ctx web.Context) error {
user, err := c.service.Find(ctx.Context(), ctx.Param("id"))
if errors.Is(err, ErrUserNotFound) {
return ctx.JSON(http.StatusNotFound, map[string]string{
"error": "user not found",
})
}
if err != nil {
return err
}
return ctx.JSON(http.StatusOK, user)
}
func (c *Controller) Store(ctx web.Context) error {
var req CreateRequest
if err := ctx.Bind(&req); err != nil {
return ctx.JSON(http.StatusBadRequest, map[string]string{
"error": "invalid payload",
})
}
user, err := c.service.Create(ctx.Context(), CreateUserInput{
Name: req.Name,
Email: req.Email,
})
if errors.Is(err, ErrNameRequired) || errors.Is(err, ErrEmailRequired) {
return ctx.JSON(http.StatusBadRequest, map[string]string{
"error": err.Error(),
})
}
if err != nil {
return err
}
return ctx.JSON(http.StatusCreated, user)
}
Step 10: Wire The Event Boundary And Subscriber
Add the event publisher and notification subscriber providers.
Update wire/inject_app_services.go so it includes:
"your/module/internal/makecmd"
"your/module/internal/notifications"
Step 11: Add Event Providers
provideEventBus exposes the generated default event bus to the application publisher.
Update wire/inject_app_services.go so it includes:
provideEventBus,
users.NewUserEventPublisher,
wire.Bind(new(users.UserEvents), new(*users.UserEventPublisher)),
users.NewService,
notifications.NewService,
notifications.NewSubscribers,
Step 12: Add The Event Bus Provider
Wire can now satisfy users.NewUserEventPublisher.
Update wire/inject_app_services.go so it includes:
func provideEventBus(manager *events.Manager) events.Bus {
return manager.Default()
}
func provideUserRepository(source *users.MemoryUserRepository, profileCache *cache.Cache) users.UserRepository {
Step 13: Update The Service Test
The test uses a small event boundary fake. It does not need to start the event bus or the App runtime.
Create or replace internal/users/service_test.go:
package users
import (
"context"
"testing"
"github.com/goforj/cache"
)
type recordingUserEvents struct {
created []User
}
func (r *recordingUserEvents) PublishCreated(ctx context.Context, user User) error {
r.created = append(r.created, user)
return nil
}
func TestServiceFindsUser(t *testing.T) {
ctx := context.Background()
service := NewService(
NewCachedUserRepository(NewMemoryUserRepository(), cache.NewCache(cache.NewMemoryStore(ctx))),
&recordingUserEvents{},
)
user, err := service.Find(ctx, "42")
if err != nil {
t.Fatalf("find user: %v", err)
}
if user.ID != "42" {
t.Fatalf("user id = %q, want %q", user.ID, "42")
}
}
func TestServicePublishesUserCreatedEvent(t *testing.T) {
ctx := context.Background()
events := &recordingUserEvents{}
service := NewService(
NewCachedUserRepository(NewMemoryUserRepository(), cache.NewCache(cache.NewMemoryStore(ctx))),
events,
)
user, err := service.Create(ctx, CreateUserInput{
Name: "Grace Hopper",
Email: "grace@example.test",
})
if err != nil {
t.Fatalf("create user: %v", err)
}
if user.ID == "" {
t.Fatal("expected saved user id")
}
if len(events.created) != 1 {
t.Fatalf("created events = %d, want 1", len(events.created))
}
if events.created[0].Email != "grace@example.test" {
t.Fatalf("created event email = %q", events.created[0].Email)
}
}
func TestServiceRejectsEmptyID(t *testing.T) {
ctx := context.Background()
service := NewService(
NewCachedUserRepository(NewMemoryUserRepository(), cache.NewCache(cache.NewMemoryStore(ctx))),
&recordingUserEvents{},
)
_, err := service.Find(ctx, "")
if err == nil {
t.Fatal("expected error")
}
}
Build And Verify
forj build
go test ./...
forj route:list
Expected output includes:
/api/v1/users
Try The Route
Run the HTTP server:
forj api
Create a user:
curl -X POST http://localhost:3000/api/v1/users \
-H 'Content-Type: application/json' \
-d '{"name":"Grace Hopper","email":"grace@example.test"}'
Expected response:
{"id":"43","name":"Grace Hopper","email":"grace@example.test"}
The service saved the user, published users.created, and the lifecycle-registered subscriber handled the event.
Operations
Operational notes:
- inproc is process-local and non-durable, which is useful for local same-process reactions.
- Use a distributed event driver when events must cross process boundaries.
- Use a queue and job when work needs retries, delay, timeout, worker lifecycle, or operational replay.
- Published events and subscriber deliveries can appear in metrics, inspect records, Lighthouse runtime views, and driver configuration.
Swap The Driver
To publish through Redis in production, compile Redis event support and select it:
EVENTS_SUPPORTED_DRIVERS=inproc,redis
EVENTS_DRIVER=redis
EVENTS_ADDR=redis:6379
Then run:
forj build
Business code does not change. The service still publishes through UserEvents, and the subscriber still registers against the generated bus.
Common Mistakes
- Do not use events as a substitute for durable background jobs.
- Do not register subscribers in package init functions.
- Do not publish from controllers when the service owns the workflow.
- Do not put full user records, tokens, or request payloads in events.
- Do not assume inproc events are visible across processes.
- Do not hide critical business workflows only inside subscribers.
Next Steps
- Next, dispatch durable work from an event subscriber with Reports Generate Job.
