
queue is a queue and workflow library with pluggable backends and runtime extensions.


Installation

bash
go get github.com/goforj/queue

Quick Start

go
import (
	"context"
	"fmt"

	"github.com/goforj/queue"
)

func main() {
	q, _ := queue.NewWorkerpool(
		queue.WithWorkers(2), // optional; default: runtime.NumCPU() (min 1)
	)
	type EmailPayload struct {
		To string `json:"to"`
	}

	q.Register("emails:send", func(ctx context.Context, m queue.Message) error {
		var payload EmailPayload
		_ = m.Bind(&payload)
		fmt.Println("send to", payload.To)
		return nil
	})

	_ = q.StartWorkers(context.Background())
	defer q.Shutdown(context.Background())

	_, _ = q.Dispatch(
		queue.NewJob("emails:send").
			Payload(EmailPayload{To: "user@example.com"}),
	)
}

Drivers

Driver / Backend | Mode | Notes
Null | Drop-only | Discards dispatched jobs; useful for disabled queue modes and smoke tests.
Sync | Inline (caller) | Deterministic local execution with no external infra.
Workerpool | In-process pool | Local async behavior without external broker/database.
MySQL | SQL durable queue | MySQL driver module (driver/mysqlqueue) built on shared SQL queue core.
Postgres | SQL durable queue | Postgres driver module (driver/postgresqueue) built on shared SQL queue core.
SQLite | SQL durable queue | SQLite driver module (driver/sqlitequeue) built on shared SQL queue core.
Redis | Redis/Asynq | Production Redis backend (Asynq semantics).
NATS | Broker target | NATS transport with queue-subject routing.
SQS | Broker target | AWS SQS transport with endpoint overrides for localstack/testing.
RabbitMQ | Broker target | RabbitMQ transport and worker consumption.

SQL-backed queues (sqlite, mysql, postgres) are durable and convenient, but they trade throughput for operational simplicity. They default to 1 worker, and increasing concurrency may require DB tuning (indexes, connection pool, lock contention). Prefer broker-backed drivers for higher-throughput workloads.

Driver constructor quick examples

Use root constructors for in-process backends, and driver-module constructors for external backends. See the Driver Constructors API section below for full constructor shapes (New(...) and NewWithConfig(...)). Driver backends live in separate packages so applications only import/link the optional backend dependencies they actually use (smaller builds, less dependency overhead, cleaner deploys).

go
package main

import (
	"github.com/goforj/queue"
	"github.com/goforj/queue/driver/mysqlqueue"
	"github.com/goforj/queue/driver/natsqueue"
	"github.com/goforj/queue/driver/postgresqueue"
	"github.com/goforj/queue/driver/rabbitmqqueue"
	"github.com/goforj/queue/driver/redisqueue"
	"github.com/goforj/queue/driver/sqlitequeue"
	"github.com/goforj/queue/driver/sqsqueue"
)

func main() {
	queue.NewSync()       // in-process sync
	queue.NewWorkerpool() // in-process worker pool
	queue.NewNull()       // drop-only / disabled mode

	sqlitequeue.New("file:queue.db?_busy_timeout=5000") // SQL durable queue (SQLite)
	mysqlqueue.New("user:pass@tcp(127.0.0.1:3306)/app") // SQL durable queue (MySQL)
	postgresqueue.New("postgres://user:pass@127.0.0.1:5432/app?sslmode=disable") // SQL durable queue (Postgres)

	redisqueue.New("127.0.0.1:6379") // Redis/Asynq
	natsqueue.New("nats://127.0.0.1:4222") // NATS
	sqsqueue.New("us-east-1") // SQS
	rabbitmqqueue.New("amqp://guest:guest@127.0.0.1:5672/") // RabbitMQ
}

Quick Start (Advanced: Workflows)

go
import (
	"context"

	"github.com/goforj/queue"
)

type ReportPayload struct {
	ID int `json:"id"`
}

func main() {
	q, _ := queue.NewWorkerpool()

	q.Register("reports:generate", func(ctx context.Context, m queue.Message) error {
		return nil
	})
	q.Register("reports:upload", func(ctx context.Context, m queue.Message) error {
		var payload ReportPayload
		if err := m.Bind(&payload); err != nil {
			return err
		}
		return nil
	})
	q.Register("users:notify_report_ready", func(ctx context.Context, m queue.Message) error {
		return nil
	})

	_ = q.StartWorkers(context.Background())
	defer q.Shutdown(context.Background())

	chainID, _ := q.Chain(
		// 1) generate report data
		queue.NewJob("reports:generate").Payload(map[string]any{"report_id": "rpt_123"}),
		// 2) upload report artifact after generate succeeds
		queue.NewJob("reports:upload").Payload(ReportPayload{ID: 123}),
		// 3) notify user only after upload succeeds
		queue.NewJob("users:notify_report_ready").Payload(map[string]any{"user_id": 123}),
	).OnQueue("critical").Dispatch(context.Background())
	_ = chainID
}

Run as a Worker Service

Use Run(ctx) for long-lived workers: it starts processing, waits for shutdown signals, and performs graceful termination.

go
import (
	"context"
	"log"
	"os/signal"
	"syscall"

	"github.com/goforj/queue"
)

func main() {
	q, _ := queue.NewWorkerpool()

	// Register handlers before starting workers.
	q.Register("emails:send", func(ctx context.Context, m queue.Message) error {
		return nil
	})

	// Create a context that is canceled on SIGINT/SIGTERM (Ctrl+C, container stop).
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	// Run starts workers, blocks until ctx is canceled, then gracefully shuts down.
	if err := q.Run(ctx); err != nil {
		log.Fatal(err)
	}
}

Core Concepts

Job: Typed work unit for app handlers.

go
_, _ = q.Dispatch(
	queue.NewJob("emails:send").Payload(EmailPayload{To: "user@example.com"}),
)

Chain: Ordered workflow (A then B then C).

go
_, _ = q.Chain(
	queue.NewJob("reports:generate"),
	queue.NewJob("reports:upload"),
	queue.NewJob("users:notify_report_ready"),
).Dispatch(context.Background())

Batch: Parallel workflow with callbacks.

go
_, _ = q.Batch(
	queue.NewJob("emails:send"),
	queue.NewJob("sms:send"),
).Then(queue.NewJob("notifications:done")).Dispatch(context.Background())

Middleware: Cross-cutting execution policy.

go
q, _ := queue.New(
	queue.Config{Driver: queue.DriverWorkerpool},
	queue.WithMiddleware(audit, skipMaintenance, fatalValidation),
)

Events: Lifecycle hooks and observability.

go
q, _ := queue.New(
	queue.Config{Driver: queue.DriverWorkerpool, Observer: queue.NewStatsCollector()},
)

Backends: Driver/runtime transport selection.

go
q, _ := queue.NewWorkerpool()
rq, _ := redisqueue.New("127.0.0.1:6379")
_, _ = q, rq

Job builder options

go
// Define a struct for your job payload.
type EmailPayload struct {
	ID int `json:"id"`
	To string `json:"to"`
}

// Fluent builder pattern for job options.
job := queue.NewJob("emails:send").
	// Payload can be bytes, structs, maps, or JSON-marshalable values.
	// Default payload is empty.
	Payload(EmailPayload{ID: 123, To: "user@example.com"}).
	// OnQueue sets the queue name.
	// Default is empty; broker-style drivers expect an explicit queue.
	OnQueue("default").
	// Timeout sets per-job execution timeout.
	// Default is unset; some drivers may apply driver/runtime defaults.
	Timeout(20 * time.Second).
	// Retry sets max retries.
	// Default is 0, which means one total attempt.
	Retry(3).
	// Backoff sets retry delay.
	// Default is unset; Redis dispatch returns ErrBackoffUnsupported.
	Backoff(500 * time.Millisecond).
	// Delay schedules first execution in the future.
	// Default is 0 (run immediately).
	Delay(2 * time.Second).
	// UniqueFor deduplicates Type+Payload for a TTL window.
	// Default is 0 (no dedupe).
	UniqueFor(45 * time.Second)

// Dispatch the job to the queue.
_, _ = q.Dispatch(job)

// In handlers, use Bind to decode payload into a struct.
q.Register("emails:send", func(ctx context.Context, m queue.Message) error {
	var payload EmailPayload
	if err := m.Bind(&payload); err != nil {
		return err
	}
	return nil
})

Benchmarks

Run local + integration-backed benchmarks (requires Docker/testcontainers):

bash
cd docs && GOWORK=off INTEGRATION_BACKEND=all GOCACHE=/tmp/queue-gocache go test -tags=benchrender ./bench -run '^TestRenderBenchmarks$'

Latency (ns/op)

Queue benchmark latency chart

Throughput (ops/s)

Queue benchmark throughput chart

Allocated Bytes (B/op)

Queue benchmark bytes chart

Allocations (allocs/op)

Queue benchmark allocations chart

Tables

Class | Driver | ns/op | ops/s | B/op | allocs/op
External | nats | 774 | 1291823 | 1258 | 13
External | redis | 95295 | 10494 | 2113 | 33
External | rabbitmq | 165780 | 6032 | 1882 | 57
External | sqlite | 202380 | 4941 | 1931 | 47
External | postgres | 1056731 | 946 | 3809 | 78
External | sqs | 1873911 | 534 | 94784 | 1082
External | mysql | 2286406 | 437 | 3303 | 62
Local | null | 37 | 26673780 | 128 | 1
Local | sync | 282 | 3539823 | 408 | 6
Local | workerpool | 650 | 1538462 | 456 | 7

Middleware

Use queue.WithMiddleware(...) to apply cross-cutting behavior to workflow job execution (logging, filtering, and error policy).

Common patterns:

  • wrap handler execution (before/after logging, timing, tracing)
  • skip jobs conditionally (maintenance mode, feature flags)
  • convert matched errors into terminal failures (no retry)
go
var errValidation = errors.New("validation failed")
maintenanceMode := false

audit := queue.MiddlewareFunc(func(ctx context.Context, m queue.Message, next queue.Next) error {
	log.Printf("start job=%s", m.JobType)
	err := next(ctx, m)
	log.Printf("done job=%s err=%v", m.JobType, err)
	return err
})

skipMaintenance := queue.SkipWhen{
	Predicate: func(context.Context, queue.Message) bool {
		return maintenanceMode
	},
}

fatalValidation := queue.FailOnError{
	When: func(err error) bool {
		return errors.Is(err, errValidation)
	},
}

q, _ := queue.New(
	queue.Config{Driver: queue.DriverWorkerpool},
	queue.WithMiddleware(audit, skipMaintenance, fatalValidation),
)
_ = q

Observability

Use queue.Observer implementations to capture normalized runtime events across drivers.

go
collector := queue.NewStatsCollector()
observer := queue.MultiObserver(
    collector,
    queue.ObserverFunc(func(event queue.Event) {
        _ = event.Kind
    }),
)

q, _ := queue.New(queue.Config{
    Driver:   queue.DriverWorkerpool,
    Observer: observer,
})
_ = q

Distributed counters and source of truth

  • StatsCollector counters are process-local and event-driven.
  • In multi-process deployments, aggregate metrics externally (OTel/Prometheus/etc.).
  • Prefer backend-native stats when available.
  • queue.SupportsNativeStats(q) indicates native driver snapshot support.
  • queue.Snapshot(ctx, q, collector) merges native + collector where possible.

Compose observers

go
events := make(chan queue.Event, 100)
collector := queue.NewStatsCollector()
observer := queue.MultiObserver(
    collector,
    queue.ChannelObserver{
        Events:     events,
        DropIfFull: true,
    },
    queue.ObserverFunc(func(e queue.Event) {
        _ = e
    }),
)

q, _ := queue.New(queue.Config{
    Driver:   queue.DriverWorkerpool,
    Observer: observer,
})
_ = q

Kitchen sink event logging (runtime + workflow)

Runnable example: examples/observeall/main.go

go
logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
runtimeObserver := queue.ObserverFunc(func(event queue.Event) {
	attemptInfo := fmt.Sprintf("attempt=%d/%d", event.Attempt, event.MaxRetry+1)
	jobInfo := fmt.Sprintf("job=%s key=%s queue=%s driver=%s", event.JobType, event.JobKey, event.Queue, event.Driver)

	switch event.Kind {
	case queue.EventEnqueueAccepted:
		logger.Info("Accepted dispatch", "msg", fmt.Sprintf("Accepted %s", jobInfo), "scheduled", event.Scheduled, "at", event.Time.Format(time.RFC3339Nano))
	case queue.EventEnqueueRejected:
		logger.Error("Dispatch failed", "msg", fmt.Sprintf("Rejected %s", jobInfo), "error", event.Err)
	case queue.EventEnqueueDuplicate:
		logger.Warn("Skipped duplicate job", "msg", fmt.Sprintf("Duplicate %s", jobInfo))
	case queue.EventEnqueueCanceled:
		logger.Warn("Canceled dispatch", "msg", fmt.Sprintf("Canceled %s", jobInfo), "error", event.Err)
	case queue.EventProcessStarted:
		logger.Info("Started processing job", "msg", fmt.Sprintf("Started %s (%s)", jobInfo, attemptInfo), "at", event.Time.Format(time.RFC3339Nano))
	case queue.EventProcessSucceeded:
		logger.Info("Processed job", "msg", fmt.Sprintf("Processed %s in %s (%s)", jobInfo, event.Duration, attemptInfo))
	case queue.EventProcessFailed:
		logger.Error("Processing failed", "msg", fmt.Sprintf("Failed %s after %s (%s)", jobInfo, event.Duration, attemptInfo), "error", event.Err)
	case queue.EventProcessRetried:
		logger.Warn("Retrying job", "msg", fmt.Sprintf("Retry scheduled for %s (%s)", jobInfo, attemptInfo), "error", event.Err)
	case queue.EventProcessArchived:
		logger.Error("Archived failed job", "msg", fmt.Sprintf("Archived %s after final failure (%s)", jobInfo, attemptInfo), "error", event.Err)
	case queue.EventQueuePaused:
		logger.Info("Paused queue", "msg", fmt.Sprintf("Paused queue=%s driver=%s", event.Queue, event.Driver))
	case queue.EventQueueResumed:
		logger.Info("Resumed queue", "msg", fmt.Sprintf("Resumed queue=%s driver=%s", event.Queue, event.Driver))
	default:
		logger.Info("Queue event", "msg", fmt.Sprintf("kind=%s %s", event.Kind, jobInfo))
	}
})
workflowObserver := queue.WorkflowObserverFunc(func(event queue.WorkflowEvent) {
	logger.Info("workflow event",
		"kind", event.Kind,
		"dispatch_id", event.DispatchID,
		"job_id", event.JobID,
		"chain_id", event.ChainID,
		"batch_id", event.BatchID,
		"job_type", event.JobType,
		"queue", event.Queue,
		"attempt", event.Attempt,
		"duration", event.Duration,
		"err", event.Err,
	)
})

q, _ := queue.New(
	queue.Config{
		Driver:   queue.DriverSync,
		Observer: runtimeObserver,
	},
	queue.WithObserver(workflowObserver),
)
_ = q

Events reference

Type | Event Kind | Meaning
queue | enqueue_accepted | Job accepted by driver for enqueue.
queue | enqueue_rejected | Job enqueue failed.
queue | enqueue_duplicate | Duplicate job rejected due to uniqueness key.
queue | enqueue_canceled | Context cancellation prevented enqueue.
queue | process_started | Worker began processing job.
queue | process_succeeded | Handler returned success.
queue | process_failed | Handler returned error.
queue | process_retried | Driver scheduled retry attempt.
queue | process_archived | Job moved to terminal failure state.
queue | queue_paused | Queue was paused (driver supports pause).
queue | queue_resumed | Queue was resumed.
workflow | dispatch_started | Workflow runtime accepted a dispatch request and created a dispatch record.
workflow | dispatch_succeeded | Dispatch was successfully enqueued to the underlying queue runtime.
workflow | dispatch_failed | Dispatch failed before job execution could start.
workflow | job_started | A workflow job handler started execution.
workflow | job_succeeded | A workflow job handler completed successfully.
workflow | job_failed | A workflow job handler returned an error.
workflow | chain_started | A chain workflow was created and started.
workflow | chain_advanced | Chain progressed from one node to the next node.
workflow | chain_completed | Chain reached terminal success.
workflow | chain_failed | Chain reached terminal failure.
workflow | batch_started | A batch workflow was created and started.
workflow | batch_progressed | Batch state changed as jobs completed/failed.
workflow | batch_completed | Batch reached terminal success (or allowed-failure completion).
workflow | batch_failed | Batch reached terminal failure.
workflow | batch_cancelled | Batch was cancelled before normal completion.
workflow | callback_started | Chain/batch callback execution started.
workflow | callback_succeeded | Chain/batch callback completed successfully.
workflow | callback_failed | Chain/batch callback returned an error.

Examples

Runnable examples live in the separate examples module (./examples). They are not included when applications import github.com/goforj/queue, which keeps dependency graphs and build/link overhead smaller.

API reference

The API section below is autogenerated; do not edit between the markers.

API Index

Group | Functions
Constructors | New NewNull NewStatsCollector NewSync NewWorkerpool
Job | Backoff Bind Delay NewJob OnQueue Payload PayloadBytes PayloadJSON Retry Timeout UniqueFor
Observability | Active Archived Failed MultiObserver ChannelObserver.Observe Observer.Observe ObserverFunc.Observe StatsCollector.Observe Pause Paused Pending Processed Queue Queues Resume RetryCount SafeObserve Scheduled Snapshot StatsCollector.Snapshot SupportsNativeStats SupportsPause Throughput
Queue | Batch Chain Dispatch DispatchCtx Driver FindBatch FindChain Pause Prune Register Resume Run Shutdown StartWorkers Stats WithClock WithMiddleware WithObserver WithStore WithWorkers Queue.WithWorkers
Driver Constructors | mysqlqueue.New mysqlqueue.NewWithConfig natsqueue.New natsqueue.NewWithConfig postgresqueue.New postgresqueue.NewWithConfig rabbitmqqueue.New rabbitmqqueue.NewWithConfig redisqueue.New redisqueue.NewWithConfig sqlitequeue.New sqlitequeue.NewWithConfig sqsqueue.New sqsqueue.NewWithConfig
Testing | AssertBatchCount AssertBatched AssertChained AssertCount AssertDispatched AssertDispatchedOn AssertDispatchedTimes AssertNotDispatched AssertNothingBatched AssertNothingDispatched AssertNothingWorkflowDispatched AssertWorkflowDispatched AssertWorkflowDispatchedOn AssertWorkflowDispatchedTimes AssertWorkflowNotDispatched Count CountJob CountOn New Queue Records Reset Workflow

API

Constructors

queue.New

New creates the high-level Queue API based on Config.Driver.

go
q, err := queue.New(queue.Config{Driver: queue.DriverWorkerpool})
if err != nil {
	return
}
type EmailPayload struct {
	ID int `json:"id"`
}
q.Register("emails:send", func(ctx context.Context, m queue.Message) error {
	var payload EmailPayload
	if err := m.Bind(&payload); err != nil {
		return err
	}
	_ = payload
	return nil
})
_ = q.WithWorkers(1).StartWorkers(context.Background()) // optional; default: runtime.NumCPU() (min 1)
defer q.Shutdown(context.Background())
_, _ = q.Dispatch(
	queue.NewJob("emails:send").
		Payload(EmailPayload{ID: 1}).
		OnQueue("default"),
)

NewNull

NewNull creates a Queue on the null backend.

go
q, err := queue.NewNull()
if err != nil {
	return
}

NewStatsCollector

NewStatsCollector creates an event collector for queue counters.

go
collector := queue.NewStatsCollector()

NewSync

NewSync creates a Queue on the synchronous in-process backend.

go
q, err := queue.NewSync()
if err != nil {
	return
}

NewWorkerpool

NewWorkerpool creates a Queue on the in-process workerpool backend.

go
q, err := queue.NewWorkerpool()
if err != nil {
	return
}

Job

Backoff

Backoff sets delay between retries.

go
job := queue.NewJob("emails:send").Backoff(500 * time.Millisecond)

Bind

Bind unmarshals job payload JSON into dst.

go
type EmailPayload struct {
	ID int    `json:"id"`
	To string `json:"to"`
}
job := queue.NewJob("emails:send").Payload(EmailPayload{
	ID: 1,
	To: "user@example.com",
})
var payload EmailPayload
if err := job.Bind(&payload); err != nil {
	return
}
_ = payload.To

Delay

Delay defers execution by duration.

go
job := queue.NewJob("emails:send").Delay(300 * time.Millisecond)

NewJob

NewJob creates a job value with a required job type.

go
job := queue.NewJob("emails:send")

OnQueue

OnQueue sets the target queue name.

go
job := queue.NewJob("emails:send").OnQueue("critical")

Payload

Payload sets job payload from common value types.

Example: payload bytes

go
jobBytes := queue.NewJob("emails:send").Payload([]byte(`{"id":1}`))

Example: payload struct

go
type Meta struct {
	Nested bool `json:"nested"`
}
type EmailPayload struct {
	ID   int    `json:"id"`
	To   string `json:"to"`
	Meta Meta   `json:"meta"`
}
jobStruct := queue.NewJob("emails:send").Payload(EmailPayload{
	ID:   1,
	To:   "user@example.com",
	Meta: Meta{Nested: true},
})

Example: payload map

go
jobMap := queue.NewJob("emails:send").Payload(map[string]any{
	"id":  1,
	"to":  "user@example.com",
	"meta": map[string]any{"nested": true},
})

PayloadBytes

PayloadBytes returns a copy of job payload bytes.

go
job := queue.NewJob("emails:send").Payload([]byte(`{"id":1}`))
payload := job.PayloadBytes()

PayloadJSON

PayloadJSON marshals payload as JSON.

go
job := queue.NewJob("emails:send").PayloadJSON(map[string]int{"id": 1})

Retry

Retry sets max retry attempts.

go
job := queue.NewJob("emails:send").Retry(4)

Timeout

Timeout sets per-job execution timeout.

go
job := queue.NewJob("emails:send").Timeout(10 * time.Second)

UniqueFor

UniqueFor enables uniqueness dedupe within the given TTL.

go
job := queue.NewJob("emails:send").UniqueFor(45 * time.Second)

Observability

Active

Active returns active count for a queue.

go
snapshot := queue.StatsSnapshot{
	ByQueue: map[string]queue.QueueCounters{
		"default": {Active: 2},
	},
}
fmt.Println(snapshot.Active("default"))
// Output: 2

Archived

Archived returns archived count for a queue.

go
snapshot := queue.StatsSnapshot{
	ByQueue: map[string]queue.QueueCounters{
		"default": {Archived: 7},
	},
}
fmt.Println(snapshot.Archived("default"))
// Output: 7

Failed

Failed returns failed count for a queue.

go
snapshot := queue.StatsSnapshot{
	ByQueue: map[string]queue.QueueCounters{
		"default": {Failed: 2},
	},
}
fmt.Println(snapshot.Failed("default"))
// Output: 2

MultiObserver

MultiObserver fans out events to multiple observers.

go
events := make(chan queue.Event, 2)
observer := queue.MultiObserver(
	queue.ChannelObserver{Events: events},
	queue.ObserverFunc(func(queue.Event) {}),
)
observer.Observe(queue.Event{Kind: queue.EventEnqueueAccepted})
fmt.Println(len(events))
// Output: 1

ChannelObserver.Observe

Observe forwards an event to the configured channel.

go
ch := make(chan queue.Event, 1)
observer := queue.ChannelObserver{Events: ch}
observer.Observe(queue.Event{Kind: queue.EventProcessStarted, Queue: "default"})
event := <-ch

Observer.Observe

Observe handles a queue runtime event.

go
var observer queue.Observer
observer.Observe(queue.Event{
	Kind:   queue.EventEnqueueAccepted,
	Driver: queue.DriverSync,
	Queue:  "default",
})

ObserverFunc.Observe

Observe calls the wrapped function.

go
logger := slog.New(slog.NewTextHandler(os.Stdout, nil))
observer := queue.ObserverFunc(func(event queue.Event) {
	logger.Info("queue event",
		"kind", event.Kind,
		"driver", event.Driver,
		"queue", event.Queue,
		"job_type", event.JobType,
		"attempt", event.Attempt,
		"max_retry", event.MaxRetry,
		"duration", event.Duration,
		"err", event.Err,
	)
})
observer.Observe(queue.Event{
	Kind:     queue.EventProcessSucceeded,
	Driver:   queue.DriverSync,
	Queue:    "default",
	JobType: "emails:send",
})

StatsCollector.Observe

Observe records an event and updates normalized counters.

go
collector := queue.NewStatsCollector()
collector.Observe(queue.Event{
	Kind:   queue.EventEnqueueAccepted,
	Driver: queue.DriverSync,
	Queue:  "default",
	Time:   time.Now(),
})

Pause

Pause pauses queue consumption for drivers that support it.

go
q, _ := queue.NewSync()
_ = queue.Pause(context.Background(), q, "default")
snapshot, _ := queue.Snapshot(context.Background(), q, nil)
fmt.Println(snapshot.Paused("default"))
// Output: 1

Paused

Paused returns paused count for a queue.

go
collector := queue.NewStatsCollector()
collector.Observe(queue.Event{
	Kind:   queue.EventQueuePaused,
	Driver: queue.DriverSync,
	Queue:  "default",
	Time:   time.Now(),
})
snapshot := collector.Snapshot()
fmt.Println(snapshot.Paused("default"))
// Output: 1

Pending

Pending returns pending count for a queue.

go
snapshot := queue.StatsSnapshot{
	ByQueue: map[string]queue.QueueCounters{
		"default": {Pending: 3},
	},
}
fmt.Println(snapshot.Pending("default"))
// Output: 3

Processed

Processed returns processed count for a queue.

go
snapshot := queue.StatsSnapshot{
	ByQueue: map[string]queue.QueueCounters{
		"default": {Processed: 11},
	},
}
fmt.Println(snapshot.Processed("default"))
// Output: 11

StatsSnapshot.Queue

Queue returns queue counters for a queue name.

go
collector := queue.NewStatsCollector()
collector.Observe(queue.Event{
	Kind:   queue.EventEnqueueAccepted,
	Driver: queue.DriverSync,
	Queue:  "default",
	Time:   time.Now(),
})
snapshot := collector.Snapshot()
counters, ok := snapshot.Queue("default")
fmt.Println(ok, counters.Pending)
// Output: true 1

Queues

Queues returns sorted queue names present in the snapshot.

go
collector := queue.NewStatsCollector()
collector.Observe(queue.Event{
	Kind:   queue.EventEnqueueAccepted,
	Driver: queue.DriverSync,
	Queue:  "critical",
	Time:   time.Now(),
})
snapshot := collector.Snapshot()
names := snapshot.Queues()
fmt.Println(len(names), names[0])
// Output: 1 critical

Resume

Resume resumes queue consumption for drivers that support it.

go
q, _ := queue.NewSync()
_ = queue.Pause(context.Background(), q, "default")
_ = queue.Resume(context.Background(), q, "default")
snapshot, _ := queue.Snapshot(context.Background(), q, nil)
fmt.Println(snapshot.Paused("default"))
// Output: 0

RetryCount

RetryCount returns retry count for a queue.

go
snapshot := queue.StatsSnapshot{
	ByQueue: map[string]queue.QueueCounters{
		"default": {Retry: 1},
	},
}
fmt.Println(snapshot.RetryCount("default"))
// Output: 1

SafeObserve

SafeObserve delivers an event to an observer and recovers observer panics.

This is an advanced helper intended for driver-module implementations.

Scheduled

Scheduled returns scheduled count for a queue.

go
snapshot := queue.StatsSnapshot{
	ByQueue: map[string]queue.QueueCounters{
		"default": {Scheduled: 4},
	},
}
fmt.Println(snapshot.Scheduled("default"))
// Output: 4

Snapshot

Snapshot returns driver-native stats, falling back to collector data.

go
q, _ := queue.NewSync()
snapshot, _ := q.Stats(context.Background())
_, ok := snapshot.Queue("default")
fmt.Println(ok)
// Output: true

StatsCollector.Snapshot

Snapshot returns a copy of collected counters.

go
collector := queue.NewStatsCollector()
collector.Observe(queue.Event{
	Kind:   queue.EventEnqueueAccepted,
	Driver: queue.DriverSync,
	Queue:  "default",
	Time:   time.Now(),
})
collector.Observe(queue.Event{
	Kind:   queue.EventProcessStarted,
	Driver: queue.DriverSync,
	Queue:  "default",
	JobKey: "job-1",
	Time:   time.Now(),
})
collector.Observe(queue.Event{
	Kind:     queue.EventProcessSucceeded,
	Driver:   queue.DriverSync,
	Queue:    "default",
	JobKey:  "job-1",
	Duration: 12 * time.Millisecond,
	Time:     time.Now(),
})
snapshot := collector.Snapshot()
counters, _ := snapshot.Queue("default")
throughput, _ := snapshot.Throughput("default")
fmt.Printf("queues=%v\n", snapshot.Queues())
fmt.Printf("counters=%+v\n", counters)
fmt.Printf("hour=%+v\n", throughput.Hour)
// Output:
// queues=[default]
// counters={Pending:0 Active:0 Scheduled:0 Retry:0 Archived:0 Processed:1 Failed:0 Paused:0 AvgWait:0s AvgRun:12ms}
// hour={Processed:1 Failed:0}

SupportsNativeStats

SupportsNativeStats reports whether a queue runtime exposes native stats snapshots.

go
q, _ := queue.NewSync()
fmt.Println(queue.SupportsNativeStats(q))
// Output: true

SupportsPause

SupportsPause reports whether a queue runtime supports Pause/Resume.

go
q, _ := queue.NewSync()
fmt.Println(queue.SupportsPause(q))
// Output: true

Throughput

Throughput returns rolling throughput windows for a queue name.

go
collector := queue.NewStatsCollector()
collector.Observe(queue.Event{
	Kind:   queue.EventProcessSucceeded,
	Driver: queue.DriverSync,
	Queue:  "default",
	Time:   time.Now(),
})
snapshot := collector.Snapshot()
throughput, ok := snapshot.Throughput("default")
fmt.Printf("ok=%v hour=%+v day=%+v week=%+v\n", ok, throughput.Hour, throughput.Day, throughput.Week)
// Output: ok=true hour={Processed:1 Failed:0} day={Processed:1 Failed:0} week={Processed:1 Failed:0}

Queue

Batch

Batch creates a batch builder for fan-out workflow execution.

go
q, err := queue.NewSync()
if err != nil {
	return
}
q.Register("emails:send", func(ctx context.Context, m queue.Message) error { return nil })
_, _ = q.Batch(
	queue.NewJob("emails:send").Payload(map[string]any{"id": 1}),
	queue.NewJob("emails:send").Payload(map[string]any{"id": 2}),
).Name("send-emails").OnQueue("default").Dispatch(context.Background())

Chain

Chain creates a chain builder for sequential workflow execution.

go
q, err := queue.NewSync()
if err != nil {
	return
}
q.Register("first", func(ctx context.Context, m queue.Message) error { return nil })
q.Register("second", func(ctx context.Context, m queue.Message) error { return nil })
_, _ = q.Chain(
	queue.NewJob("first"),
	queue.NewJob("second"),
).OnQueue("default").Dispatch(context.Background())

Queue.Dispatch

Dispatch enqueues a high-level job using context.Background.

go
q, err := queue.NewSync()
if err != nil {
	return
}
q.Register("emails:send", func(ctx context.Context, m queue.Message) error { return nil })
job := queue.NewJob("emails:send").Payload(map[string]any{"id": 1}).OnQueue("default")
_, _ = q.Dispatch(job)

Queue.DispatchCtx

DispatchCtx enqueues a high-level job using the provided context.

Queue.Driver

Driver reports the configured backend driver for the underlying queue runtime.

go
q, err := queue.NewSync()
if err != nil {
	return
}
fmt.Println(q.Driver())
// Output: sync

FindBatch

FindBatch returns current batch state by ID.

go
q, err := queue.NewSync()
if err != nil {
	return
}
q.Register("emails:send", func(ctx context.Context, m queue.Message) error { return nil })
batchID, err := q.Batch(queue.NewJob("emails:send")).Dispatch(context.Background())
if err != nil {
	return
}
_, _ = q.FindBatch(context.Background(), batchID)

FindChain

FindChain returns current chain state by ID.

go
q, err := queue.NewSync()
if err != nil {
	return
}
q.Register("first", func(ctx context.Context, m queue.Message) error { return nil })
chainID, err := q.Chain(queue.NewJob("first")).Dispatch(context.Background())
if err != nil {
	return
}
_, _ = q.FindChain(context.Background(), chainID)

Queue.Pause

Pause pauses consumption for a queue when supported by the underlying driver. See the README "Queue Backends" table for Pause/Resume support and docs/backend-guarantees.md (Capability Matrix) for broader backend differences.

go
q, err := queue.NewSync()
if err != nil {
	return
}
if queue.SupportsPause(q) {
	_ = q.Pause(context.Background(), "default")
}

Prune

Prune deletes old workflow state records.

go
q, err := queue.NewSync()
if err != nil {
	return
}
_ = q.Prune(context.Background(), time.Now().Add(-24*time.Hour))

Queue.Register

Register binds a handler for a high-level job type.

go
q, err := queue.NewSync()
if err != nil {
	return
}
type EmailPayload struct {
	ID int `json:"id"`
}
q.Register("emails:send", func(ctx context.Context, m queue.Message) error {
	var payload EmailPayload
	if err := m.Bind(&payload); err != nil {
		return err
	}
	_ = payload
	return nil
})

Queue.Resume

Resume resumes consumption for a queue when supported by the underlying driver.

go
q, err := queue.NewSync()
if err != nil {
	return
}
if queue.SupportsPause(q) {
	_ = q.Resume(context.Background(), "default")
}

Run

Run starts worker processing, blocks until ctx is canceled, then gracefully shuts down.

go
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
q, err := queue.NewWorkerpool()
if err != nil {
	return
}
q.Register("emails:send", func(ctx context.Context, m queue.Message) error { return nil })
go func() {
	time.Sleep(100 * time.Millisecond)
	cancel()
}()
_ = q.Run(ctx)

Queue.Shutdown

Shutdown drains workers and closes underlying resources.

go
q, err := queue.NewWorkerpool()
if err != nil {
	return
}
_ = q.StartWorkers(context.Background())
_ = q.Shutdown(context.Background())

Queue.StartWorkers

StartWorkers starts worker processing.

go
q, err := queue.NewWorkerpool()
if err != nil {
	return
}
_ = q.StartWorkers(context.Background())

Stats

Stats returns a normalized snapshot when supported by the underlying driver.

go
q, err := queue.NewSync()
if err != nil {
	return
}
if queue.SupportsNativeStats(q) {
	_, _ = q.Stats(context.Background())
}

WithClock

WithClock overrides the workflow runtime clock.

go
q, err := queue.New(
	queue.Config{Driver: queue.DriverSync},
	queue.WithClock(func() time.Time { return time.Unix(0, 0) }),
)
if err != nil {
	return
}

WithMiddleware

WithMiddleware appends queue workflow middleware.

go
mw := queue.MiddlewareFunc(func(ctx context.Context, m queue.Message, next queue.Next) error {
	return next(ctx, m)
})
q, err := queue.New(queue.Config{Driver: queue.DriverSync}, queue.WithMiddleware(mw))
if err != nil {
	return
}

WithObserver

WithObserver installs a workflow lifecycle observer.

go
observer := queue.WorkflowObserverFunc(func(event queue.WorkflowEvent) {
	_ = event.Kind
})
q, err := queue.New(queue.Config{Driver: queue.DriverSync}, queue.WithObserver(observer))
if err != nil {
	return
}

WithStore

WithStore overrides the workflow orchestration store.

go
var store queue.WorkflowStore
q, err := queue.New(queue.Config{Driver: queue.DriverSync}, queue.WithStore(store))
if err != nil {
	return
}

WithWorkers

WithWorkers sets desired worker concurrency before StartWorkers. It applies to high-level queue constructors (for example NewWorkerpool/New/NewSync).

go
q, err := queue.NewWorkerpool(
	queue.WithWorkers(4), // optional; default: runtime.NumCPU() (min 1)
)
if err != nil {
	return
}

Queue.WithWorkers

WithWorkers sets desired worker concurrency before StartWorkers.

go
q, err := queue.NewWorkerpool()
if err != nil {
	return
}
q.WithWorkers(4) // optional; default: runtime.NumCPU() (min 1)

Testing

FakeQueue.AssertCount

AssertCount fails when the recorded dispatch count does not match the expected value.

go
fake := queue.NewFake()
_ = fake.Dispatch(queue.NewJob("emails:send"))
fake.AssertCount(t, 1)

FakeQueue.AssertDispatched

AssertDispatched fails when jobType was not dispatched.

go
fake := queue.NewFake()
_ = fake.Dispatch(queue.NewJob("emails:send"))
fake.AssertDispatched(t, "emails:send")

FakeQueue.AssertDispatchedOn

AssertDispatchedOn fails when jobType was not dispatched on queueName.

go
fake := queue.NewFake()
_ = fake.Dispatch(
	queue.NewJob("emails:send").
		OnQueue("critical"),
)
fake.AssertDispatchedOn(t, "critical", "emails:send")

FakeQueue.AssertDispatchedTimes

AssertDispatchedTimes fails when the dispatch count for jobType does not match the expected count.

go
fake := queue.NewFake()
_ = fake.Dispatch(queue.NewJob("emails:send"))
_ = fake.Dispatch(queue.NewJob("emails:send"))
fake.AssertDispatchedTimes(t, "emails:send", 2)

FakeQueue.AssertNotDispatched

AssertNotDispatched fails when jobType was dispatched.

go
fake := queue.NewFake()
_ = fake.Dispatch(queue.NewJob("emails:send"))
fake.AssertNotDispatched(t, "emails:cancel")

FakeQueue.AssertNothingDispatched

AssertNothingDispatched fails when any dispatch was recorded.

go
fake := queue.NewFake()
fake.AssertNothingDispatched(t)

FakeQueue.Dispatch

Dispatch records a typed job payload in memory on the fake's default queue.

go
fake := queue.NewFake()
err := fake.Dispatch(queue.NewJob("emails:send").OnQueue("default"))
fmt.Println(err == nil)
// Output: true

FakeQueue.DispatchCtx

DispatchCtx submits a typed job payload using the provided context.

go
fake := queue.NewFake()
ctx := context.Background()
err := fake.DispatchCtx(ctx, queue.NewJob("emails:send").OnQueue("default"))
fmt.Println(err == nil)
// Output: true

FakeQueue.Driver

Driver returns the active queue driver.

go
fake := queue.NewFake()
driver := fake.Driver()
_ = driver

NewFake

NewFake creates a queue fake that records dispatches and provides assertions.

go
fake := queue.NewFake()
_ = fake.Dispatch(
	queue.NewJob("emails:send").
		Payload(map[string]any{"id": 1}).
		OnQueue("critical"),
)
records := fake.Records()
fmt.Println(len(records), records[0].Queue, records[0].Job.Type)
// Output: 1 critical emails:send

FakeQueue.Records

Records returns a copy of all dispatch records.

go
fake := queue.NewFake()
_ = fake.Dispatch(queue.NewJob("emails:send").OnQueue("default"))
records := fake.Records()
fmt.Println(len(records), records[0].Job.Type)
// Output: 1 emails:send

FakeQueue.Register

Register associates a handler with a job type.

go
fake := queue.NewFake()
fake.Register("emails:send", func(context.Context, queue.Job) error { return nil })

FakeQueue.Reset

Reset clears all recorded dispatches.

go
fake := queue.NewFake()
_ = fake.Dispatch(queue.NewJob("emails:send").OnQueue("default"))
fmt.Println(len(fake.Records()))
fake.Reset()
fmt.Println(len(fake.Records()))
// Output:
// 1
// 0

FakeQueue.Shutdown

Shutdown drains running work and releases resources.

go
fake := queue.NewFake()
_ = fake.Shutdown(context.Background())

FakeQueue.StartWorkers

StartWorkers starts worker execution.

go
fake := queue.NewFake()
_ = fake.StartWorkers(context.Background())

FakeQueue.Workers

Workers sets desired worker concurrency before StartWorkers.

go
fake := queue.NewFake()
q := fake.Workers(4)
fmt.Println(q != nil)
// Output: true

Driver Constructors

mysqlqueue

mysqlqueue.New

New creates a high-level Queue using the MySQL SQL backend.

go
q, err := mysqlqueue.New(
	"user:pass@tcp(127.0.0.1:3306)/queue?parseTime=true",
	queue.WithWorkers(4), // optional; default: 1 worker
)
if err != nil {
	return
}

mysqlqueue.NewWithConfig

NewWithConfig creates a high-level Queue using an explicit MySQL SQL driver config.

go
q, err := mysqlqueue.NewWithConfig(
	mysqlqueue.Config{
		DriverBaseConfig: queueconfig.DriverBaseConfig{
			DefaultQueue: "critical", // default if empty: "default"
			Observer:     nil,        // default: nil
		},
		DB: nil, // optional; provide *sql.DB instead of DSN
		DSN: "user:pass@tcp(127.0.0.1:3306)/queue?parseTime=true", // optional if DB is set
		ProcessingRecoveryGrace:  2 * time.Second, // default if <=0: 2s
		ProcessingLeaseNoTimeout: 5 * time.Minute, // default if <=0: 5m
	},
	queue.WithWorkers(4), // optional; default: 1 worker
)
if err != nil {
	return
}

natsqueue

natsqueue.New

New creates a high-level Queue using the NATS backend.

go
q, err := natsqueue.New(
	"nats://127.0.0.1:4222",
	queue.WithWorkers(4), // optional; default: runtime.NumCPU() (min 1)
)
if err != nil {
	return
}

natsqueue.NewWithConfig

NewWithConfig creates a high-level Queue using an explicit NATS driver config.

go
q, err := natsqueue.NewWithConfig(
	natsqueue.Config{
		DriverBaseConfig: queueconfig.DriverBaseConfig{
			DefaultQueue: "critical", // default if empty: "default"
			Observer:     nil,        // default: nil
		},
		URL: "nats://127.0.0.1:4222", // required
	},
	queue.WithWorkers(4), // optional; default: runtime.NumCPU() (min 1)
)
if err != nil {
	return
}

postgresqueue

postgresqueue.New

New creates a high-level Queue using the Postgres SQL backend.

go
q, err := postgresqueue.New(
	"postgres://user:pass@127.0.0.1:5432/queue?sslmode=disable",
	queue.WithWorkers(4), // optional; default: 1 worker
)
if err != nil {
	return
}

postgresqueue.NewWithConfig

NewWithConfig creates a high-level Queue using an explicit Postgres SQL driver config.

go
q, err := postgresqueue.NewWithConfig(
	postgresqueue.Config{
		DriverBaseConfig: queueconfig.DriverBaseConfig{
			DefaultQueue: "critical", // default if empty: "default"
			Observer:     nil,        // default: nil
		},
		DB: nil, // optional; provide *sql.DB instead of DSN
		DSN: "postgres://user:pass@127.0.0.1:5432/queue?sslmode=disable", // optional if DB is set
		ProcessingRecoveryGrace:  2 * time.Second, // default if <=0: 2s
		ProcessingLeaseNoTimeout: 5 * time.Minute, // default if <=0: 5m
	},
	queue.WithWorkers(4), // optional; default: 1 worker
)
if err != nil {
	return
}

rabbitmqqueue

rabbitmqqueue.New

New creates a high-level Queue using the RabbitMQ backend.

go
q, err := rabbitmqqueue.New(
	"amqp://guest:guest@127.0.0.1:5672/",
	queue.WithWorkers(4), // optional; default: runtime.NumCPU() (min 1)
)
if err != nil {
	return
}

rabbitmqqueue.NewWithConfig

NewWithConfig creates a high-level Queue using an explicit RabbitMQ driver config.

go
q, err := rabbitmqqueue.NewWithConfig(
	rabbitmqqueue.Config{
		DriverBaseConfig: queueconfig.DriverBaseConfig{
			DefaultQueue: "critical", // default if empty: "default"
			Observer:     nil,        // default: nil
		},
		URL: "amqp://guest:guest@127.0.0.1:5672/", // required
	},
	queue.WithWorkers(4), // optional; default: runtime.NumCPU() (min 1)
)
if err != nil {
	return
}

redisqueue

redisqueue.New

New creates a high-level Queue using the Redis backend.

go
q, err := redisqueue.New(
	"127.0.0.1:6379",
	queue.WithWorkers(4), // optional; default: runtime.NumCPU() (min 1)
)
if err != nil {
	return
}

redisqueue.NewWithConfig

NewWithConfig creates a high-level Queue using an explicit Redis driver config.

go
q, err := redisqueue.NewWithConfig(
	redisqueue.Config{
		DriverBaseConfig: queueconfig.DriverBaseConfig{
			DefaultQueue: "critical", // default if empty: "default"
			Observer:     nil,        // default: nil
		},
		Addr: "127.0.0.1:6379", // required
		Password: "",           // optional; default empty
		DB: 0,                  // optional; default 0
	},
	queue.WithWorkers(4), // optional; default: runtime.NumCPU() (min 1)
)
if err != nil {
	return
}

sqlitequeue

sqlitequeue.New

New creates a high-level Queue using the SQLite SQL backend.

go
q, err := sqlitequeue.New(
	"file:queue.db?_busy_timeout=5000",
	queue.WithWorkers(4), // optional; default: 1 worker
)
if err != nil {
	return
}

sqlitequeue.NewWithConfig

NewWithConfig creates a high-level Queue using an explicit SQLite SQL driver config.

go
q, err := sqlitequeue.NewWithConfig(
	sqlitequeue.Config{
		DriverBaseConfig: queueconfig.DriverBaseConfig{
			DefaultQueue: "critical", // default if empty: "default"
			Observer:     nil,        // default: nil
		},
		DB: nil, // optional; provide *sql.DB instead of DSN
		DSN: "file:queue.db?_busy_timeout=5000", // optional if DB is set
		ProcessingRecoveryGrace:  2 * time.Second, // default if <=0: 2s
		ProcessingLeaseNoTimeout: 5 * time.Minute, // default if <=0: 5m
	},
	queue.WithWorkers(4), // optional; default: 1 worker
)
if err != nil {
	return
}

sqsqueue

sqsqueue.New

New creates a high-level Queue using the SQS backend.

go
q, err := sqsqueue.New(
	"us-east-1",
	queue.WithWorkers(4), // optional; default: runtime.NumCPU() (min 1)
)
if err != nil {
	return
}

sqsqueue.NewWithConfig

NewWithConfig creates a high-level Queue using an explicit SQS driver config.

go
q, err := sqsqueue.NewWithConfig(
	sqsqueue.Config{
		DriverBaseConfig: queueconfig.DriverBaseConfig{
			DefaultQueue: "critical", // default if empty: "default"
			Observer:     nil,        // default: nil
		},
		Region: "us-east-1", // default if empty: "us-east-1"
		Endpoint: "",        // optional; set for LocalStack/custom endpoint
		AccessKey: "",       // optional; static credentials
		SecretKey: "",       // optional; static credentials
	},
	queue.WithWorkers(4), // optional; default: runtime.NumCPU() (min 1)
)
if err != nil {
	return
}

Testing API

Examples in this section assume they run inside a test function where t is a *testing.T (or testing.TB).

Fake.AssertBatchCount

AssertBatchCount fails if the total recorded workflow batch count does not match n.

go
f := queuefake.New()
_, _ = f.Workflow().Batch(bus.NewJob("a", nil)).Dispatch(context.Background())
f.AssertBatchCount(t, 1)

Fake.AssertBatched

AssertBatched fails unless at least one recorded workflow batch matches predicate.

go
f := queuefake.New()
_, _ = f.Workflow().Batch(bus.NewJob("a", nil), bus.NewJob("b", nil)).Dispatch(context.Background())
f.AssertBatched(t, func(spec bus.BatchSpec) bool { return len(spec.JobTypes) == 2 })

Fake.AssertChained

AssertChained fails if no recorded workflow chain matches expected job type order.

go
f := queuefake.New()
_, _ = f.Workflow().Chain(bus.NewJob("a", nil), bus.NewJob("b", nil)).Dispatch(context.Background())
f.AssertChained(t, []string{"a", "b"})

Fake.AssertCount

AssertCount fails when the total dispatch count does not match the expected value.

go
f := queuefake.New()
q := f.Queue()
_ = q.Dispatch(queue.NewJob("a"))
_ = q.Dispatch(queue.NewJob("b"))
f.AssertCount(t, 2)

Fake.AssertDispatched

AssertDispatched fails when jobType was not dispatched.

go
f := queuefake.New()
_ = f.Queue().Dispatch(queue.NewJob("emails:send"))
f.AssertDispatched(t, "emails:send")

Fake.AssertDispatchedOn

AssertDispatchedOn fails when jobType was not dispatched on queueName.

go
f := queuefake.New()
_ = f.Queue().Dispatch(queue.NewJob("emails:send").OnQueue("critical"))
f.AssertDispatchedOn(t, "critical", "emails:send")

Fake.AssertDispatchedTimes

AssertDispatchedTimes fails when the dispatch count for jobType does not match the expected count.

go
f := queuefake.New()
q := f.Queue()
_ = q.Dispatch(queue.NewJob("emails:send"))
_ = q.Dispatch(queue.NewJob("emails:send"))
f.AssertDispatchedTimes(t, "emails:send", 2)

Fake.AssertNotDispatched

AssertNotDispatched fails when jobType was dispatched.

go
f := queuefake.New()
f.AssertNotDispatched(t, "emails:send")

Fake.AssertNothingBatched

AssertNothingBatched fails if any workflow batch was recorded.

go
f := queuefake.New()
f.AssertNothingBatched(t)

Fake.AssertNothingDispatched

AssertNothingDispatched fails when any dispatch was recorded.

go
f := queuefake.New()
f.AssertNothingDispatched(t)

Fake.AssertNothingWorkflowDispatched

AssertNothingWorkflowDispatched fails when any workflow dispatch was recorded.

go
f := queuefake.New()
f.AssertNothingWorkflowDispatched(t)

Fake.AssertWorkflowDispatched

AssertWorkflowDispatched fails when jobType was not workflow-dispatched.

go
f := queuefake.New()
_, _ = f.Workflow().Chain(bus.NewJob("a", nil)).Dispatch(context.Background())
f.AssertWorkflowDispatched(t, "a")

Fake.AssertWorkflowDispatchedOn

AssertWorkflowDispatchedOn fails when jobType was not workflow-dispatched on queueName.

go
f := queuefake.New()
_, _ = f.Workflow().Chain(bus.NewJob("a", nil)).OnQueue("critical").Dispatch(context.Background())
f.AssertWorkflowDispatchedOn(t, "critical", "a")

Fake.AssertWorkflowDispatchedTimes

AssertWorkflowDispatchedTimes fails when the workflow dispatch count for jobType does not match the expected count.

go
f := queuefake.New()
wf := f.Workflow()
_, _ = wf.Chain(bus.NewJob("a", nil)).Dispatch(context.Background())
_, _ = wf.Chain(bus.NewJob("a", nil)).Dispatch(context.Background())
f.AssertWorkflowDispatchedTimes(t, "a", 2)

Fake.AssertWorkflowNotDispatched

AssertWorkflowNotDispatched fails when jobType was workflow-dispatched.

go
f := queuefake.New()
f.AssertWorkflowNotDispatched(t, "emails:send")

Fake.Count

Count returns the total number of recorded dispatches.

go
f := queuefake.New()
q := f.Queue()
_ = q.Dispatch(queue.NewJob("a"))
_ = q.Dispatch(queue.NewJob("b"))
_ = f.Count()

Fake.CountJob

CountJob returns how many times a job type was dispatched.

go
f := queuefake.New()
q := f.Queue()
_ = q.Dispatch(queue.NewJob("emails:send"))
_ = q.Dispatch(queue.NewJob("emails:send"))
_ = f.CountJob("emails:send")

Fake.CountOn

CountOn returns how many times a job type was dispatched on a queue.

go
f := queuefake.New()
q := f.Queue()
_ = q.Dispatch(queue.NewJob("emails:send").OnQueue("critical"))
_ = f.CountOn("critical", "emails:send")

queuefake.New

New creates a fake queue harness backed by queue.NewFake().

go
f := queuefake.New()
q := f.Queue()
_ = q.Dispatch(queue.NewJob("emails:send").OnQueue("default"))
f.AssertDispatched(t, "emails:send")
f.AssertCount(t, 1)

Fake.Queue

Queue returns the queue fake to inject into code under test.

go
f := queuefake.New()
q := f.Queue()
_ = q.Dispatch(queue.NewJob("emails:send").OnQueue("default"))

Fake.Records

Records returns a copy of recorded dispatches.

go
f := queuefake.New()
_ = f.Queue().Dispatch(queue.NewJob("emails:send"))
records := f.Records()
fmt.Println(len(records))
// Output: 1

Fake.Reset

Reset clears recorded dispatches.

go
f := queuefake.New()
q := f.Queue()
_ = q.Dispatch(queue.NewJob("emails:send"))
f.Reset()
f.AssertNothingDispatched(t)

Fake.Workflow

Workflow returns the workflow/orchestration fake for chain/batch assertions.

go
f := queuefake.New()
wf := f.Workflow()
_, _ = wf.Chain(
	bus.NewJob("a", nil),
	bus.NewJob("b", nil),
).Dispatch(context.Background())
f.AssertChained(t, []string{"a", "b"})

Contributing

Testing

Unit tests (root module):

bash
go test ./...

Integration tests (separate integration module):

bash
go test -tags=integration ./integration/...

Select specific backends with INTEGRATION_BACKEND (comma-separated), for example:

bash
INTEGRATION_BACKEND=sqlite go test -tags=integration ./integration/...
INTEGRATION_BACKEND=redis,rabbitmq go test -tags=integration ./integration/... -count=1
INTEGRATION_BACKEND=all go test -tags=integration ./integration/... -count=1

Matrix status and backend integration notes are tracked in docs/integration-scenarios.md.