This commit is contained in:
binwiederhier
2026-03-11 11:58:40 -04:00
parent f1865749d7
commit ab33ac7ae5
8 changed files with 107 additions and 55 deletions

View File

@@ -4,6 +4,8 @@ import (
"database/sql" "database/sql"
"sync/atomic" "sync/atomic"
"time" "time"
"heckel.io/ntfy/v2/log"
) )
const ( const (
@@ -46,6 +48,12 @@ func NewDB(primary *sql.DB, replicas []*sql.DB) *DB {
return d return d
} }
// Primary returns the underlying primary *sql.DB. This is only intended for
// one-time schema setup during store initialization, not for regular queries.
func (d *DB) Primary() *sql.DB {
return d.primary
}
// Query delegates to the primary database. // Query delegates to the primary database.
func (d *DB) Query(query string, args ...any) (*sql.Rows, error) { func (d *DB) Query(query string, args ...any) (*sql.Rows, error) {
return d.primary.Query(query, args...) return d.primary.Query(query, args...)
@@ -79,12 +87,6 @@ func (d *DB) Close() error {
return d.primary.Close() return d.primary.Close()
} }
// SetupPrimary returns the underlying primary *sql.DB. This is only intended for
// one-time schema setup during store initialization, not for regular queries.
func (d *DB) SetupPrimary() *sql.DB {
return d.primary
}
// ReadOnly returns a *sql.DB suitable for read-only queries. It round-robins across healthy // ReadOnly returns a *sql.DB suitable for read-only queries. It round-robins across healthy
// replicas. If a replica's health status is stale (older than replicaHealthCheckInterval), it // replicas. If a replica's health status is stale (older than replicaHealthCheckInterval), it
// is re-checked with a ping. If all replicas are unhealthy or none are configured, the primary // is re-checked with a ping. If all replicas are unhealthy or none are configured, the primary
@@ -111,46 +113,20 @@ func (d *DB) isHealthy(r *replica) bool {
lastChecked := r.lastChecked.Load() lastChecked := r.lastChecked.Load()
if now-lastChecked >= int64(replicaHealthCheckInterval.Seconds()) { if now-lastChecked >= int64(replicaHealthCheckInterval.Seconds()) {
if r.lastChecked.CompareAndSwap(lastChecked, now) { if r.lastChecked.CompareAndSwap(lastChecked, now) {
wasHealthy := r.healthy.Load()
if err := r.db.Ping(); err != nil { if err := r.db.Ping(); err != nil {
r.healthy.Store(false) r.healthy.Store(false)
if wasHealthy {
log.Error("Database replica is now unhealthy: %s", err)
}
return false return false
} }
r.healthy.Store(true) r.healthy.Store(true)
if !wasHealthy {
log.Info("Database replica is now healthy again")
}
return true return true
} }
} }
return r.healthy.Load() return r.healthy.Load()
} }
// ExecTx executes a function within a database transaction. If the function returns an error,
// the transaction is rolled back. Otherwise, the transaction is committed.
func ExecTx(db Beginner, f func(tx *sql.Tx) error) error {
tx, err := db.Begin()
if err != nil {
return err
}
defer tx.Rollback()
if err := f(tx); err != nil {
return err
}
return tx.Commit()
}
// QueryTx executes a function within a database transaction and returns the result. If the function
// returns an error, the transaction is rolled back. Otherwise, the transaction is committed.
func QueryTx[T any](db Beginner, f func(tx *sql.Tx) (T, error)) (T, error) {
tx, err := db.Begin()
if err != nil {
var zero T
return zero, err
}
defer tx.Rollback()
t, err := f(tx)
if err != nil {
return t, err
}
if err := tx.Commit(); err != nil {
return t, err
}
return t, nil
}

36
db/util.go Normal file
View File

@@ -0,0 +1,36 @@
package db
import "database/sql"
// ExecTx executes a function within a database transaction. If the function returns an error,
// the transaction is rolled back. Otherwise, the transaction is committed.
func ExecTx(db Beginner, f func(tx *sql.Tx) error) error {
tx, err := db.Begin()
if err != nil {
return err
}
defer tx.Rollback()
if err := f(tx); err != nil {
return err
}
return tx.Commit()
}
// QueryTx executes a function within a database transaction and returns the result. If the function
// returns an error, the transaction is rolled back. Otherwise, the transaction is committed.
func QueryTx[T any](db Beginner, f func(tx *sql.Tx) (T, error)) (T, error) {
tx, err := db.Begin()
if err != nil {
var zero T
return zero, err
}
defer tx.Rollback()
t, err := f(tx)
if err != nil {
return t, err
}
if err := tx.Commit(); err != nil {
return t, err
}
return t, nil
}

View File

@@ -149,11 +149,7 @@ no external dependencies:
### PostgreSQL (EXPERIMENTAL) ### PostgreSQL (EXPERIMENTAL)
As an alternative, you can configure ntfy to use PostgreSQL for **all** database-backed stores by setting the As an alternative, you can configure ntfy to use PostgreSQL for **all** database-backed stores by setting the
`database-url` option to a PostgreSQL connection string: `database-url` option to a PostgreSQL connection string.
```yaml
database-url: "postgres://user:pass@host:5432/ntfy"
```
When `database-url` is set, ntfy will use PostgreSQL for the [message cache](#message-cache), When `database-url` is set, ntfy will use PostgreSQL for the [message cache](#message-cache),
[access control](#access-control), and [web push](#web-push) subscriptions instead of SQLite. The `cache-file`, [access control](#access-control), and [web push](#web-push) subscriptions instead of SQLite. The `cache-file`,
@@ -165,11 +161,44 @@ topics. To restrict access, set `auth-default-access` to `deny-all` (see [access
You can also set this via the environment variable `NTFY_DATABASE_URL` or the command line flag `--database-url`. You can also set this via the environment variable `NTFY_DATABASE_URL` or the command line flag `--database-url`.
To offload read-heavy queries from the primary database, you can optionally configure one or more read replicas
using the `database-replica-urls` option. When configured, non-critical read-only queries (e.g. fetching messages, checking access permissions, etc)
are distributed across the replicas using round-robin, while all writes and correctness-critical reads continue to go
to the primary. If a replica becomes unhealthy, ntfy automatically falls back to the primary until the replica recovers.
You can also set this via the environment variable `NTFY_DATABASE_REPLICA_URLS` (comma-separated) or the command line
flag `--database-replica-urls`.
Examples:
=== "Simple"
```yaml
database-url: "postgres://user:pass@host:5432/ntfy"
```
=== "With SSL and pool tuning"
```yaml
database-url: "postgres://user:pass@host:5432/ntfy?sslmode=require&pool_max_conns=50&pool_conn_max_idle_time=5m"
```
=== "With CA certificate"
```yaml
database-url: "postgres://user:pass@host:25060/ntfy?sslmode=require&sslrootcert=/etc/ntfy/db-ca-cert.pem&pool_max_conns=30"
```
=== "With read replicas"
```yaml
database-url: "postgres://user:pass@primary:5432/ntfy?sslmode=require&sslrootcert=/etc/ntfy/db-ca-cert.pem&pool_max_conns=30"
database-replica-urls:
- "postgres://user:pass@replica1:5432/ntfy?sslmode=require&sslrootcert=/etc/ntfy/db-ca-cert.pem&pool_max_conns=30"
- "postgres://user:pass@replica2:5432/ntfy?sslmode=require&sslrootcert=/etc/ntfy/db-ca-cert.pem&pool_max_conns=30"
```
The database URL supports the standard [PostgreSQL connection parameters](https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-PARAMKEYWORDS) The database URL supports the standard [PostgreSQL connection parameters](https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-PARAMKEYWORDS)
as query parameters, such as `sslmode`, `connect_timeout`, `sslcert`, `sslkey`, `sslrootcert`, and `application_name`. as query parameters, such as `sslmode`, `connect_timeout`, `sslcert`, `sslkey`, `sslrootcert`, and `application_name`.
See the [pgx driver documentation](https://pkg.go.dev/github.com/jackc/pgx/v5) for the full list of supported parameters. See the [pgx driver documentation](https://pkg.go.dev/github.com/jackc/pgx/v5) for the full list of supported parameters.
In addition, ntfy supports the following custom query parameters to tune the connection pool: In addition, ntfy supports the following custom query parameters to tune the connection pool (these apply to both
the primary and replica URLs):
| Parameter | Default | Description | | Parameter | Default | Description |
|---------------------------|---------|----------------------------------------------------------------------------------| |---------------------------|---------|----------------------------------------------------------------------------------|
@@ -178,11 +207,6 @@ In addition, ntfy supports the following custom query parameters to tune the con
| `pool_conn_max_lifetime` | - | Maximum amount of time a connection may be reused (Go duration, e.g. `5m`, `1h`) | | `pool_conn_max_lifetime` | - | Maximum amount of time a connection may be reused (Go duration, e.g. `5m`, `1h`) |
| `pool_conn_max_idle_time` | - | Maximum amount of time a connection may be idle (Go duration, e.g. `30s`, `5m`) | | `pool_conn_max_idle_time` | - | Maximum amount of time a connection may be idle (Go duration, e.g. `30s`, `5m`) |
Example:
```yaml
database-url: "postgres://user:pass@host:5432/ntfy?sslmode=require&pool_max_conns=50&pool_conn_max_idle_time=5m"
```
## Message cache ## Message cache
If desired, ntfy can temporarily keep notifications in an in-memory or an on-disk cache. Caching messages for a short period If desired, ntfy can temporarily keep notifications in an in-memory or an on-disk cache. Caching messages for a short period
@@ -1819,6 +1843,7 @@ variable before running the `ntfy` command (e.g. `export NTFY_LISTEN_HTTP=:80`).
| `cert-file` | `NTFY_CERT_FILE` | *filename* | - | HTTPS/TLS certificate file, only used if `listen-https` is set. | | `cert-file` | `NTFY_CERT_FILE` | *filename* | - | HTTPS/TLS certificate file, only used if `listen-https` is set. |
| `firebase-key-file` | `NTFY_FIREBASE_KEY_FILE` | *filename* | - | If set, also publish messages to a Firebase Cloud Messaging (FCM) topic for your app. This is optional and only required to save battery when using the Android app. See [Firebase (FCM)](#firebase-fcm). | | `firebase-key-file` | `NTFY_FIREBASE_KEY_FILE` | *filename* | - | If set, also publish messages to a Firebase Cloud Messaging (FCM) topic for your app. This is optional and only required to save battery when using the Android app. See [Firebase (FCM)](#firebase-fcm). |
| `database-url` | `NTFY_DATABASE_URL` | *string (connection URL)* | - | PostgreSQL connection string (e.g. `postgres://user:pass@host:5432/ntfy`). If set, uses PostgreSQL for all database-backed stores (message cache, user manager, web push) instead of SQLite. See [database options](#database-options). | | `database-url` | `NTFY_DATABASE_URL` | *string (connection URL)* | - | PostgreSQL connection string (e.g. `postgres://user:pass@host:5432/ntfy`). If set, uses PostgreSQL for all database-backed stores (message cache, user manager, web push) instead of SQLite. See [database options](#database-options). |
| `database-replica-urls` | `NTFY_DATABASE_REPLICA_URLS` | *list of strings (connection URLs)* | - | PostgreSQL read replica connection strings. Non-critical read-only queries are distributed across replicas (round-robin) with automatic fallback to primary. Requires `database-url`. See [read replicas](#read-replicas). |
| `cache-file` | `NTFY_CACHE_FILE` | *filename* | - | If set, messages are cached in a local SQLite database instead of only in-memory. This allows for service restarts without losing messages in support of the since= parameter. See [message cache](#message-cache). | | `cache-file` | `NTFY_CACHE_FILE` | *filename* | - | If set, messages are cached in a local SQLite database instead of only in-memory. This allows for service restarts without losing messages in support of the since= parameter. See [message cache](#message-cache). |
| `cache-duration` | `NTFY_CACHE_DURATION` | *duration* | 12h | Duration for which messages will be buffered before they are deleted. This is required to support the `since=...` and `poll=1` parameter. Set this to `0` to disable the cache entirely. | | `cache-duration` | `NTFY_CACHE_DURATION` | *duration* | 12h | Duration for which messages will be buffered before they are deleted. This is required to support the `since=...` and `poll=1` parameter. Set this to `0` to disable the cache entirely. |
| `cache-startup-queries` | `NTFY_CACHE_STARTUP_QUERIES` | *string (SQL queries)* | - | SQL queries to run during database startup; this is useful for tuning and [enabling WAL mode](#message-cache) | | `cache-startup-queries` | `NTFY_CACHE_STARTUP_QUERIES` | *string (SQL queries)* | - | SQL queries to run during database startup; this is useful for tuning and [enabling WAL mode](#message-cache) |

View File

@@ -1757,6 +1757,10 @@ and the [ntfy Android app](https://github.com/binwiederhier/ntfy-android/release
### ntfy server v2.19.x (UNRELEASED) ### ntfy server v2.19.x (UNRELEASED)
**Features:**
* Support PostgreSQL read replicas for offloading non-critical read queries via `database-replica-urls` config option
**Bug fixes + maintenance:** **Bug fixes + maintenance:**
* Web: Throttle notification sound in web app to play at most once every 2 seconds (similar to [#1550](https://github.com/binwiederhier/ntfy/issues/1550), thanks to [@jlaffaye](https://github.com/jlaffaye) for reporting) * Web: Throttle notification sound in web app to play at most once every 2 seconds (similar to [#1550](https://github.com/binwiederhier/ntfy/issues/1550), thanks to [@jlaffaye](https://github.com/jlaffaye) for reporting)

View File

@@ -104,7 +104,7 @@ var postgresQueries = queries{
// NewPostgresStore creates a new PostgreSQL-backed message cache store using an existing database connection pool. // NewPostgresStore creates a new PostgreSQL-backed message cache store using an existing database connection pool.
func NewPostgresStore(d *db.DB, batchSize int, batchTimeout time.Duration) (*Cache, error) { func NewPostgresStore(d *db.DB, batchSize int, batchTimeout time.Duration) (*Cache, error) {
if err := setupPostgres(d.SetupPrimary()); err != nil { if err := setupPostgres(d.Primary()); err != nil {
return nil, err return nil, err
} }
return newCache(d, postgresQueries, nil, batchSize, batchTimeout, false), nil return newCache(d, postgresQueries, nil, batchSize, batchTimeout, false), nil

View File

@@ -3,7 +3,7 @@ package test
import ( import (
"fmt" "fmt"
"heckel.io/ntfy/v2/server" "heckel.io/ntfy/v2/server"
"math/rand" "net"
"net/http" "net/http"
"path/filepath" "path/filepath"
"testing" "testing"
@@ -16,7 +16,7 @@ func StartServer(t *testing.T) (*server.Server, int) {
// StartServerWithConfig starts a server.Server with a random port and waits for the server to be up // StartServerWithConfig starts a server.Server with a random port and waits for the server to be up
func StartServerWithConfig(t *testing.T, conf *server.Config) (*server.Server, int) { func StartServerWithConfig(t *testing.T, conf *server.Config) (*server.Server, int) {
port := 10000 + rand.Intn(30000) port := findAvailablePort(t)
conf.ListenHTTP = fmt.Sprintf(":%d", port) conf.ListenHTTP = fmt.Sprintf(":%d", port)
conf.AttachmentCacheDir = t.TempDir() conf.AttachmentCacheDir = t.TempDir()
conf.CacheFile = filepath.Join(t.TempDir(), "cache.db") conf.CacheFile = filepath.Join(t.TempDir(), "cache.db")
@@ -33,6 +33,17 @@ func StartServerWithConfig(t *testing.T, conf *server.Config) (*server.Server, i
return s, port return s, port
} }
// findAvailablePort asks the OS for a free port by binding to :0
func findAvailablePort(t *testing.T) int {
listener, err := net.Listen("tcp", ":0")
if err != nil {
t.Fatal(err)
}
port := listener.Addr().(*net.TCPAddr).Port
listener.Close()
return port
}
// StopServer stops the test server and waits for the port to be down // StopServer stops the test server and waits for the port to be down
func StopServer(t *testing.T, s *server.Server, port int) { func StopServer(t *testing.T, s *server.Server, port int) {
s.Stop() s.Stop()

View File

@@ -279,7 +279,7 @@ var postgresQueries = queries{
// NewPostgresManager creates a new Manager backed by a PostgreSQL database // NewPostgresManager creates a new Manager backed by a PostgreSQL database
func NewPostgresManager(d *db.DB, config *Config) (*Manager, error) { func NewPostgresManager(d *db.DB, config *Config) (*Manager, error) {
if err := setupPostgres(d.SetupPrimary()); err != nil { if err := setupPostgres(d.Primary()); err != nil {
return nil, err return nil, err
} }
return newManager(d, postgresQueries, config) return newManager(d, postgresQueries, config)

View File

@@ -74,7 +74,7 @@ const (
// NewPostgresStore creates a new PostgreSQL-backed web push store using an existing database connection pool. // NewPostgresStore creates a new PostgreSQL-backed web push store using an existing database connection pool.
func NewPostgresStore(d *ntfydb.DB) (*Store, error) { func NewPostgresStore(d *ntfydb.DB) (*Store, error) {
if err := setupPostgres(d.SetupPrimary()); err != nil { if err := setupPostgres(d.Primary()); err != nil {
return nil, err return nil, err
} }
return &Store{ return &Store{