From ab33ac7ae5c8036e1fff3afb4d68ad244ab49bd2 Mon Sep 17 00:00:00 2001 From: binwiederhier Date: Wed, 11 Mar 2026 11:58:40 -0400 Subject: [PATCH] Refine --- db/db.go | 54 +++++++++++---------------------------- db/util.go | 36 ++++++++++++++++++++++++++ docs/config.md | 47 ++++++++++++++++++++++++++-------- docs/releases.md | 4 +++ message/cache_postgres.go | 2 +- test/server.go | 15 +++++++++-- user/manager_postgres.go | 2 +- webpush/store_postgres.go | 2 +- 8 files changed, 107 insertions(+), 55 deletions(-) create mode 100644 db/util.go diff --git a/db/db.go b/db/db.go index f2a539e5..f27632c6 100644 --- a/db/db.go +++ b/db/db.go @@ -4,6 +4,8 @@ import ( "database/sql" "sync/atomic" "time" + + "heckel.io/ntfy/v2/log" ) const ( @@ -46,6 +48,12 @@ func NewDB(primary *sql.DB, replicas []*sql.DB) *DB { return d } +// Primary returns the underlying primary *sql.DB. This is only intended for +// one-time schema setup during store initialization, not for regular queries. +func (d *DB) Primary() *sql.DB { + return d.primary +} + // Query delegates to the primary database. func (d *DB) Query(query string, args ...any) (*sql.Rows, error) { return d.primary.Query(query, args...) @@ -79,12 +87,6 @@ func (d *DB) Close() error { return d.primary.Close() } -// SetupPrimary returns the underlying primary *sql.DB. This is only intended for -// one-time schema setup during store initialization, not for regular queries. -func (d *DB) SetupPrimary() *sql.DB { - return d.primary -} - // ReadOnly returns a *sql.DB suitable for read-only queries. It round-robins across healthy // replicas. If a replica's health status is stale (older than replicaHealthCheckInterval), it // is re-checked with a ping. If all replicas are unhealthy or none are configured, the primary @@ -111,46 +113,20 @@ func (d *DB) isHealthy(r *replica) bool { lastChecked := r.lastChecked.Load() if now-lastChecked >= int64(replicaHealthCheckInterval.Seconds()) { if r.lastChecked.CompareAndSwap(lastChecked, now) { + wasHealthy := r.healthy.Load() if err := r.db.Ping(); err != nil { r.healthy.Store(false) + if wasHealthy { + log.Error("Database replica is now unhealthy: %s", err) + } return false } r.healthy.Store(true) + if !wasHealthy { + log.Info("Database replica is now healthy again") + } return true } } return r.healthy.Load() } - -// ExecTx executes a function within a database transaction. If the function returns an error, -// the transaction is rolled back. Otherwise, the transaction is committed. -func ExecTx(db Beginner, f func(tx *sql.Tx) error) error { - tx, err := db.Begin() - if err != nil { - return err - } - defer tx.Rollback() - if err := f(tx); err != nil { - return err - } - return tx.Commit() -} - -// QueryTx executes a function within a database transaction and returns the result. If the function -// returns an error, the transaction is rolled back. Otherwise, the transaction is committed. -func QueryTx[T any](db Beginner, f func(tx *sql.Tx) (T, error)) (T, error) { - tx, err := db.Begin() - if err != nil { - var zero T - return zero, err - } - defer tx.Rollback() - t, err := f(tx) - if err != nil { - return t, err - } - if err := tx.Commit(); err != nil { - return t, err - } - return t, nil -} diff --git a/db/util.go b/db/util.go new file mode 100644 index 00000000..4621cb38 --- /dev/null +++ b/db/util.go @@ -0,0 +1,36 @@ +package db + +import "database/sql" + +// ExecTx executes a function within a database transaction. If the function returns an error, +// the transaction is rolled back. Otherwise, the transaction is committed. +func ExecTx(db Beginner, f func(tx *sql.Tx) error) error { + tx, err := db.Begin() + if err != nil { + return err + } + defer tx.Rollback() + if err := f(tx); err != nil { + return err + } + return tx.Commit() +} + +// QueryTx executes a function within a database transaction and returns the result. If the function +// returns an error, the transaction is rolled back. Otherwise, the transaction is committed. +func QueryTx[T any](db Beginner, f func(tx *sql.Tx) (T, error)) (T, error) { + tx, err := db.Begin() + if err != nil { + var zero T + return zero, err + } + defer tx.Rollback() + t, err := f(tx) + if err != nil { + return t, err + } + if err := tx.Commit(); err != nil { + return t, err + } + return t, nil +} diff --git a/docs/config.md b/docs/config.md index a202cd95..8a2f965e 100644 --- a/docs/config.md +++ b/docs/config.md @@ -149,11 +149,7 @@ no external dependencies: ### PostgreSQL (EXPERIMENTAL) As an alternative, you can configure ntfy to use PostgreSQL for **all** database-backed stores by setting the -`database-url` option to a PostgreSQL connection string: - -```yaml -database-url: "postgres://user:pass@host:5432/ntfy" -``` +`database-url` option to a PostgreSQL connection string. When `database-url` is set, ntfy will use PostgreSQL for the [message cache](#message-cache), [access control](#access-control), and [web push](#web-push) subscriptions instead of SQLite. The `cache-file`, @@ -165,11 +161,44 @@ topics. To restrict access, set `auth-default-access` to `deny-all` (see [access You can also set this via the environment variable `NTFY_DATABASE_URL` or the command line flag `--database-url`. +To offload read-heavy queries from the primary database, you can optionally configure one or more read replicas +using the `database-replica-urls` option. When configured, non-critical read-only queries (e.g. fetching messages, checking access permissions, etc) +are distributed across the replicas using round-robin, while all writes and correctness-critical reads continue to go +to the primary. If a replica becomes unhealthy, ntfy automatically falls back to the primary until the replica recovers. +You can also set this via the environment variable `NTFY_DATABASE_REPLICA_URLS` (comma-separated) or the command line +flag `--database-replica-urls`. + +Examples: + +=== "Simple" + ```yaml + database-url: "postgres://user:pass@host:5432/ntfy" + ``` + +=== "With SSL and pool tuning" + ```yaml + database-url: "postgres://user:pass@host:5432/ntfy?sslmode=require&pool_max_conns=50&pool_conn_max_idle_time=5m" + ``` + +=== "With CA certificate" + ```yaml + database-url: "postgres://user:pass@host:25060/ntfy?sslmode=require&sslrootcert=/etc/ntfy/db-ca-cert.pem&pool_max_conns=30" + ``` + +=== "With read replicas" + ```yaml + database-url: "postgres://user:pass@primary:5432/ntfy?sslmode=require&sslrootcert=/etc/ntfy/db-ca-cert.pem&pool_max_conns=30" + database-replica-urls: + - "postgres://user:pass@replica1:5432/ntfy?sslmode=require&sslrootcert=/etc/ntfy/db-ca-cert.pem&pool_max_conns=30" + - "postgres://user:pass@replica2:5432/ntfy?sslmode=require&sslrootcert=/etc/ntfy/db-ca-cert.pem&pool_max_conns=30" + ``` + The database URL supports the standard [PostgreSQL connection parameters](https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-PARAMKEYWORDS) as query parameters, such as `sslmode`, `connect_timeout`, `sslcert`, `sslkey`, `sslrootcert`, and `application_name`. See the [pgx driver documentation](https://pkg.go.dev/github.com/jackc/pgx/v5) for the full list of supported parameters. -In addition, ntfy supports the following custom query parameters to tune the connection pool: +In addition, ntfy supports the following custom query parameters to tune the connection pool (these apply to both +the primary and replica URLs): | Parameter | Default | Description | |---------------------------|---------|----------------------------------------------------------------------------------| @@ -178,11 +207,6 @@ In addition, ntfy supports the following custom query parameters to tune the con | `pool_conn_max_lifetime` | - | Maximum amount of time a connection may be reused (Go duration, e.g. `5m`, `1h`) | | `pool_conn_max_idle_time` | - | Maximum amount of time a connection may be idle (Go duration, e.g. `30s`, `5m`) | -Example: - -```yaml -database-url: "postgres://user:pass@host:5432/ntfy?sslmode=require&pool_max_conns=50&pool_conn_max_idle_time=5m" -``` ## Message cache If desired, ntfy can temporarily keep notifications in an in-memory or an on-disk cache. Caching messages for a short period @@ -1819,6 +1843,7 @@ variable before running the `ntfy` command (e.g. `export NTFY_LISTEN_HTTP=:80`). | `cert-file` | `NTFY_CERT_FILE` | *filename* | - | HTTPS/TLS certificate file, only used if `listen-https` is set. | | `firebase-key-file` | `NTFY_FIREBASE_KEY_FILE` | *filename* | - | If set, also publish messages to a Firebase Cloud Messaging (FCM) topic for your app. This is optional and only required to save battery when using the Android app. See [Firebase (FCM)](#firebase-fcm). | | `database-url` | `NTFY_DATABASE_URL` | *string (connection URL)* | - | PostgreSQL connection string (e.g. `postgres://user:pass@host:5432/ntfy`). If set, uses PostgreSQL for all database-backed stores (message cache, user manager, web push) instead of SQLite. See [database options](#database-options). | +| `database-replica-urls` | `NTFY_DATABASE_REPLICA_URLS` | *list of strings (connection URLs)* | - | PostgreSQL read replica connection strings. Non-critical read-only queries are distributed across replicas (round-robin) with automatic fallback to primary. Requires `database-url`. See [read replicas](#read-replicas). | | `cache-file` | `NTFY_CACHE_FILE` | *filename* | - | If set, messages are cached in a local SQLite database instead of only in-memory. This allows for service restarts without losing messages in support of the since= parameter. See [message cache](#message-cache). | | `cache-duration` | `NTFY_CACHE_DURATION` | *duration* | 12h | Duration for which messages will be buffered before they are deleted. This is required to support the `since=...` and `poll=1` parameter. Set this to `0` to disable the cache entirely. | | `cache-startup-queries` | `NTFY_CACHE_STARTUP_QUERIES` | *string (SQL queries)* | - | SQL queries to run during database startup; this is useful for tuning and [enabling WAL mode](#message-cache) | diff --git a/docs/releases.md b/docs/releases.md index 15018d88..6ca2b86f 100644 --- a/docs/releases.md +++ b/docs/releases.md @@ -1757,6 +1757,10 @@ and the [ntfy Android app](https://github.com/binwiederhier/ntfy-android/release ### ntfy server v2.19.x (UNRELEASED) +**Features:** + +* Support PostgreSQL read replicas for offloading non-critical read queries via `database-replica-urls` config option + **Bug fixes + maintenance:** * Web: Throttle notification sound in web app to play at most once every 2 seconds (similar to [#1550](https://github.com/binwiederhier/ntfy/issues/1550), thanks to [@jlaffaye](https://github.com/jlaffaye) for reporting) diff --git a/message/cache_postgres.go b/message/cache_postgres.go index 5b7a0293..ba162da2 100644 --- a/message/cache_postgres.go +++ b/message/cache_postgres.go @@ -104,7 +104,7 @@ var postgresQueries = queries{ // NewPostgresStore creates a new PostgreSQL-backed message cache store using an existing database connection pool. func NewPostgresStore(d *db.DB, batchSize int, batchTimeout time.Duration) (*Cache, error) { - if err := setupPostgres(d.SetupPrimary()); err != nil { + if err := setupPostgres(d.Primary()); err != nil { return nil, err } return newCache(d, postgresQueries, nil, batchSize, batchTimeout, false), nil diff --git a/test/server.go b/test/server.go index 5398cf9e..21e3af78 100644 --- a/test/server.go +++ b/test/server.go @@ -3,7 +3,7 @@ package test import ( "fmt" "heckel.io/ntfy/v2/server" - "math/rand" + "net" "net/http" "path/filepath" "testing" @@ -16,7 +16,7 @@ func StartServer(t *testing.T) (*server.Server, int) { // StartServerWithConfig starts a server.Server with a random port and waits for the server to be up func StartServerWithConfig(t *testing.T, conf *server.Config) (*server.Server, int) { - port := 10000 + rand.Intn(30000) + port := findAvailablePort(t) conf.ListenHTTP = fmt.Sprintf(":%d", port) conf.AttachmentCacheDir = t.TempDir() conf.CacheFile = filepath.Join(t.TempDir(), "cache.db") @@ -33,6 +33,17 @@ func StartServerWithConfig(t *testing.T, conf *server.Config) (*server.Server, i return s, port } +// findAvailablePort asks the OS for a free port by binding to :0 +func findAvailablePort(t *testing.T) int { + listener, err := net.Listen("tcp", ":0") + if err != nil { + t.Fatal(err) + } + port := listener.Addr().(*net.TCPAddr).Port + listener.Close() + return port +} + // StopServer stops the test server and waits for the port to be down func StopServer(t *testing.T, s *server.Server, port int) { s.Stop() diff --git a/user/manager_postgres.go b/user/manager_postgres.go index 7a332070..77c35ece 100644 --- a/user/manager_postgres.go +++ b/user/manager_postgres.go @@ -279,7 +279,7 @@ var postgresQueries = queries{ // NewPostgresManager creates a new Manager backed by a PostgreSQL database func NewPostgresManager(d *db.DB, config *Config) (*Manager, error) { - if err := setupPostgres(d.SetupPrimary()); err != nil { + if err := setupPostgres(d.Primary()); err != nil { return nil, err } return newManager(d, postgresQueries, config) diff --git a/webpush/store_postgres.go b/webpush/store_postgres.go index cce9ec73..a8404af3 100644 --- a/webpush/store_postgres.go +++ b/webpush/store_postgres.go @@ -74,7 +74,7 @@ const ( // NewPostgresStore creates a new PostgreSQL-backed web push store using an existing database connection pool. func NewPostgresStore(d *ntfydb.DB) (*Store, error) { - if err := setupPostgres(d.SetupPrimary()); err != nil { + if err := setupPostgres(d.Primary()); err != nil { return nil, err } return &Store{