Files
ntfy/db/db.go
binwiederhier 9eaadd74cf Log
2026-03-11 22:09:00 -04:00

151 lines
3.9 KiB
Go

package db
import (
"context"
"database/sql"
"sync/atomic"
"time"
"heckel.io/ntfy/v2/log"
)
// Settings for logging and for the background replica health check.
const (
// tag is the log tag passed to log.Tag for messages emitted by this package.
tag = "db"
// replicaHealthCheckInitialDelay is how long healthCheckLoop waits before the first replica check.
replicaHealthCheckInitialDelay = 5 * time.Second
// replicaHealthCheckInterval is how long healthCheckLoop waits between subsequent replica checks.
replicaHealthCheckInterval = 30 * time.Second
// replicaHealthCheckTimeout bounds each individual replica ping in checkReplicas.
replicaHealthCheckTimeout = 10 * time.Second
)
// Beginner is an interface for types that can begin a database transaction.
// Both *sql.DB and *DB implement this, so code that only needs to start a
// transaction can accept either one.
type Beginner interface {
// Begin starts a new transaction.
Begin() (*sql.Tx, error)
}
// DB wraps a primary *sql.DB and optional read replicas. All standard query/exec methods
// (Query, QueryRow, Exec, Begin, Ping) delegate to the primary. The ReadOnly() method returns
// a *sql.DB from a healthy replica (round-robin), falling back to the primary if no replicas
// are configured or all are unhealthy.
type DB struct {
// primary is the host that all delegating methods operate on.
primary *Host
// replicas are the optional read replicas handed out by ReadOnly.
replicas []*Host
// counter is advanced by ReadOnly to choose the round-robin starting replica.
counter atomic.Uint64
// cancel cancels the context given to the health-check goroutine; Close calls it.
cancel context.CancelFunc
}
// Host pairs a *sql.DB with the host:port it was opened against.
type Host struct {
Addr string // "host:port"
DB *sql.DB
// healthy records the outcome of the most recent health-check ping. Its zero value is
// false, so a replica is skipped by ReadOnly until checkReplicas has pinged it successfully.
healthy atomic.Bool
}
// New builds a DB around the given primary and an optional set of read replicas.
//
// With no replicas (nil or empty slice) no background work is started and
// ReadOnly() always hands out the primary. Otherwise a health-check goroutine
// is launched; replicas are treated as unhealthy until that goroutine has
// pinged them successfully, and the first check runs only after
// replicaHealthCheckInitialDelay has elapsed. The goroutine runs until Close
// is called.
func New(primary *Host, replicas []*Host) *DB {
	ctx, cancel := context.WithCancel(context.Background())
	d := &DB{primary: primary, replicas: replicas, cancel: cancel}
	if len(replicas) == 0 {
		// Nothing to monitor; cancel is still stored so Close can call it.
		return d
	}
	go d.healthCheckLoop(ctx)
	return d
}
// Primary exposes the wrapped primary *sql.DB directly. It exists for one-time
// schema setup during store initialization; regular queries should go through
// the methods on DB instead.
func (d *DB) Primary() *sql.DB {
	host := d.primary
	return host.DB
}
// Query runs a rows-returning statement against the primary database and
// passes the primary's result through unchanged.
func (d *DB) Query(query string, args ...any) (*sql.Rows, error) {
	primary := d.primary.DB
	return primary.Query(query, args...)
}
// QueryRow runs a statement expected to yield at most one row against the
// primary database. Any error is deferred until Scan is called on the result.
func (d *DB) QueryRow(query string, args ...any) *sql.Row {
	primary := d.primary.DB
	row := primary.QueryRow(query, args...)
	return row
}
// Exec runs a statement that returns no rows against the primary database and
// passes the primary's result through unchanged.
func (d *DB) Exec(query string, args ...any) (sql.Result, error) {
	primary := d.primary.DB
	return primary.Exec(query, args...)
}
// Begin opens a transaction on the primary database. Together with
// (*sql.DB).Begin this is what lets *DB satisfy Beginner.
func (d *DB) Begin() (*sql.Tx, error) {
	primary := d.primary.DB
	return primary.Begin()
}
// Ping verifies connectivity to the primary database only; replicas are not
// contacted by this call.
func (d *DB) Ping() error {
	primary := d.primary.DB
	return primary.Ping()
}
// Close stops the health-check goroutine, then closes every replica and the
// primary. All databases are closed even if closing one of them fails.
//
// If closing the primary fails, that error is returned. Otherwise the first
// error encountered while closing a replica is returned, or nil if everything
// closed cleanly.
func (d *DB) Close() error {
	// Cancel first so an in-flight health check is not started against a
	// database that is about to be closed.
	d.cancel()
	var replicaErr error
	for _, r := range d.replicas {
		if err := r.DB.Close(); err != nil && replicaErr == nil {
			replicaErr = err
		}
	}
	if err := d.primary.DB.Close(); err != nil {
		return err
	}
	return replicaErr
}
// ReadOnly returns a *sql.DB suitable for read-only queries. It round-robins across healthy
// replicas: each call advances a shared counter to pick a starting replica and returns the
// first healthy one at or after that position. If all replicas are unhealthy or none are
// configured, the primary is returned.
func (d *DB) ReadOnly() *sql.DB {
	n := uint64(len(d.replicas))
	if n == 0 {
		return d.primary.DB
	}
	// Reduce the counter modulo n while it is still unsigned. Converting the raw
	// counter to int first would yield a negative index (and a panic) once the
	// counter exceeds the range of int, which takes only 2^31 calls on 32-bit builds.
	start := (d.counter.Add(1) - 1) % n
	for i := uint64(0); i < n; i++ {
		if r := d.replicas[(start+i)%n]; r.healthy.Load() {
			return r.DB
		}
	}
	return d.primary.DB
}
// healthCheckLoop runs the replica health checks until ctx is cancelled. The
// first check happens after replicaHealthCheckInitialDelay; each subsequent
// check starts replicaHealthCheckInterval after the previous one finished.
func (d *DB) healthCheckLoop(ctx context.Context) {
	// A single reusable timer avoids allocating a new one per iteration, and
	// stopping it on return releases it promptly when the loop is cancelled.
	timer := time.NewTimer(replicaHealthCheckInitialDelay)
	defer timer.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-timer.C:
			d.checkReplicas(ctx)
			// The channel was just drained, so Reset is safe here. Re-arming
			// after the check keeps the interval relative to its completion.
			timer.Reset(replicaHealthCheckInterval)
		}
	}
}
// checkReplicas pings each replica in turn, bounding every ping by
// replicaHealthCheckTimeout, and records the outcome in the replica's healthy
// flag. A failed ping is logged on every check; a successful ping is logged
// only when the replica was previously unhealthy.
//
// If ctx itself has been cancelled (the DB is shutting down), the check stops
// without changing any health flag or logging, because the ping failure then
// reflects the cancellation rather than the state of the replica.
func (d *DB) checkReplicas(ctx context.Context) {
	for _, r := range d.replicas {
		wasHealthy := r.healthy.Load()
		pingCtx, cancel := context.WithTimeout(ctx, replicaHealthCheckTimeout)
		err := r.DB.PingContext(pingCtx)
		cancel()
		if err != nil {
			// Only the parent context is consulted: a ping that merely hit its
			// own timeout must still mark the replica unhealthy.
			if ctx.Err() != nil {
				return
			}
			r.healthy.Store(false)
			log.Tag(tag).Error("Database replica %s is unhealthy: %s", r.Addr, err)
			continue
		}
		r.healthy.Store(true)
		if !wasHealthy {
			log.Tag(tag).Info("Database replica %s is healthy", r.Addr)
		}
	}
}