diff --git a/docs/releases.md b/docs/releases.md index 1ee3f1d6..0acc5ac8 100644 --- a/docs/releases.md +++ b/docs/releases.md @@ -13,6 +13,7 @@ and the [ntfy Android app](https://github.com/binwiederhier/ntfy-android/release Please check out the release notes for [upcoming releases](#not-released-yet) below. ## ntfy server v2.19.0 +Released March 15, 2026 This is a fast-follow release that enables Postgres read replica support. diff --git a/tools/pgimport/main.go b/tools/pgimport/main.go index 3ba5273e..77c1d4cf 100644 --- a/tools/pgimport/main.go +++ b/tools/pgimport/main.go @@ -65,12 +65,12 @@ const ( key TEXT PRIMARY KEY, value BIGINT ); - INSERT INTO message_stats (key, value) VALUES ('messages', 0); + INSERT INTO message_stats (key, value) VALUES ('messages', 0) ON CONFLICT (key) DO NOTHING; CREATE TABLE IF NOT EXISTS schema_version ( store TEXT PRIMARY KEY, version INT NOT NULL ); - INSERT INTO schema_version (store, version) VALUES ('message', 14); + INSERT INTO schema_version (store, version) VALUES ('message', 14) ON CONFLICT (store) DO NOTHING; ` // Initial PostgreSQL schema for user store (from user/manager_postgres_schema.go) @@ -146,7 +146,7 @@ const ( INSERT INTO "user" (id, user_name, pass, role, sync_topic, provisioned, created) VALUES ('` + everyoneID + `', '*', '', 'anonymous', '', false, EXTRACT(EPOCH FROM NOW())::BIGINT) ON CONFLICT (id) DO NOTHING; - INSERT INTO schema_version (store, version) VALUES ('user', 6); + INSERT INTO schema_version (store, version) VALUES ('user', 6) ON CONFLICT (store) DO NOTHING; ` // Initial PostgreSQL schema for web push store (from webpush/store_postgres.go) @@ -174,7 +174,7 @@ const ( store TEXT PRIMARY KEY, version INT NOT NULL ); - INSERT INTO schema_version (store, version) VALUES ('webpush', 1); + INSERT INTO schema_version (store, version) VALUES ('webpush', 1) ON CONFLICT (store) DO NOTHING; ` ) @@ -185,6 +185,7 @@ var flags = []cli.Flag{ altsrc.NewStringFlag(&cli.StringFlag{Name: "auth-file", Aliases: []string{"auth_file"}, Usage: "SQLite user/auth database file path"}), altsrc.NewStringFlag(&cli.StringFlag{Name: "web-push-file", Aliases: []string{"web_push_file"}, Usage: "SQLite web push database file path"}), &cli.BoolFlag{Name: "create-schema", Usage: "create initial PostgreSQL schema before importing"}, + &cli.BoolFlag{Name: "pre-import", Usage: "pre-import messages while ntfy is still running (only imports messages)"}, } func main() { @@ -207,10 +208,17 @@ func execImport(c *cli.Context) error { cacheFile := c.String("cache-file") authFile := c.String("auth-file") webPushFile := c.String("web-push-file") + preImport := c.Bool("pre-import") if databaseURL == "" { return fmt.Errorf("database-url must be set (via --database-url or config file)") } + if preImport { + if cacheFile == "" { + return fmt.Errorf("--cache-file must be set when using --pre-import") + } + return execPreImport(c, databaseURL, cacheFile) + } if cacheFile == "" && authFile == "" && webPushFile == "" { return fmt.Errorf("at least one of --cache-file, --auth-file, or --web-push-file must be set") } @@ -261,7 +269,8 @@ func execImport(c *cli.Context) error { if err := verifySchemaVersion(pgDB, "message", expectedMessageSchemaVersion); err != nil { return err } - if err := importMessages(cacheFile, pgDB); err != nil { + sinceTime := maxMessageTime(pgDB) + if err := importMessages(cacheFile, pgDB, sinceTime); err != nil { return fmt.Errorf("cannot import messages: %w", err) } } @@ -300,6 +309,54 @@ func execImport(c *cli.Context) error { return nil } +func execPreImport(c *cli.Context, databaseURL, cacheFile string) error { + fmt.Println("pgimport - PRE-IMPORT mode (ntfy can keep running)") + fmt.Println() + fmt.Println("Source:") + printSource(" Cache file: ", cacheFile) + fmt.Println() + fmt.Println("Target:") + fmt.Printf(" Database URL: %s\n", maskPassword(databaseURL)) + fmt.Println() + fmt.Println("This will pre-import messages into PostgreSQL while ntfy is still running.") + fmt.Println("After this completes, stop ntfy and run pgimport again without --pre-import") + fmt.Println("to import remaining messages, users, and web push subscriptions.") + fmt.Print("Continue? (y/n): ") + + var answer string + fmt.Scanln(&answer) + if strings.TrimSpace(strings.ToLower(answer)) != "y" { + fmt.Println("Aborted.") + return nil + } + fmt.Println() + + pgHost, err := pg.Open(databaseURL) + if err != nil { + return fmt.Errorf("cannot connect to PostgreSQL: %w", err) + } + pgDB := pgHost.DB + defer pgDB.Close() + + if c.Bool("create-schema") { + if err := createSchema(pgDB, cacheFile, "", ""); err != nil { + return fmt.Errorf("cannot create schema: %w", err) + } + } + + if err := verifySchemaVersion(pgDB, "message", expectedMessageSchemaVersion); err != nil { + return err + } + if err := importMessages(cacheFile, pgDB, 0); err != nil { + return fmt.Errorf("cannot import messages: %w", err) + } + + fmt.Println() + fmt.Println("Pre-import complete. Now stop ntfy and run pgimport again without --pre-import") + fmt.Println("to import any remaining messages, users, and web push subscriptions.") + return nil +} + func createSchema(pgDB *sql.DB, cacheFile, authFile, webPushFile string) error { fmt.Println("Creating initial PostgreSQL schema ...") // User schema must be created before message schema, because message_stats and @@ -645,16 +702,41 @@ func importUserPhones(sqlDB, pgDB *sql.DB) (int, error) { // Message import -func importMessages(sqliteFile string, pgDB *sql.DB) error { +const preImportTimeDelta = 30 // seconds to subtract from max time to account for in-flight messages + +// maxMessageTime returns the maximum message time in PostgreSQL minus a small buffer, +// or 0 if there are no messages yet. This is used after a --pre-import run to only +// import messages that arrived since the pre-import. +func maxMessageTime(pgDB *sql.DB) int64 { + var maxTime sql.NullInt64 + if err := pgDB.QueryRow(`SELECT MAX(time) FROM message`).Scan(&maxTime); err != nil || !maxTime.Valid || maxTime.Int64 == 0 { + return 0 + } + sinceTime := maxTime.Int64 - preImportTimeDelta + if sinceTime < 0 { + return 0 + } + fmt.Printf("Pre-imported messages detected (max time: %d), importing delta (since time %d) ...\n", maxTime.Int64, sinceTime) + return sinceTime +} + +func importMessages(sqliteFile string, pgDB *sql.DB, sinceTime int64) error { sqlDB, err := openSQLite(sqliteFile) if err != nil { fmt.Printf("Skipping message import: %s\n", err) return nil } defer sqlDB.Close() - fmt.Printf("Importing messages from %s ...\n", sqliteFile) - rows, err := sqlDB.Query(`SELECT mid, sequence_id, time, event, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_deleted, sender, user, content_type, encoding, published FROM messages`) + query := `SELECT mid, sequence_id, time, event, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_deleted, sender, user, content_type, encoding, published FROM messages` + var rows *sql.Rows + if sinceTime > 0 { + fmt.Printf("Importing messages from %s (since time %d) ...\n", sqliteFile, sinceTime) + rows, err = sqlDB.Query(query+` WHERE time >= ?`, sinceTime) + } else { + fmt.Printf("Importing messages from %s ...\n", sqliteFile) + rows, err = sqlDB.Query(query) + } if err != nil { return fmt.Errorf("querying messages: %w", err) } @@ -837,7 +919,9 @@ func importWebPush(sqliteFile string, pgDB *sql.DB) error { } func toUTF8(s string) string { - return strings.ToValidUTF8(s, "\uFFFD") + s = strings.ToValidUTF8(s, "\uFFFD") + s = strings.ReplaceAll(s, "\x00", "") + return s } // Verification