Pre-import

This commit is contained in:
binwiederhier
2026-03-15 20:25:22 -04:00
parent 888850d8bc
commit 66208e6f88
2 changed files with 94 additions and 9 deletions

View File

@@ -13,6 +13,7 @@ and the [ntfy Android app](https://github.com/binwiederhier/ntfy-android/release
Please check out the release notes for [upcoming releases](#not-released-yet) below.

## ntfy server v2.19.0

Released March 15, 2026

This is a fast-follow release that enables Postgres read replica support.

View File

@@ -65,12 +65,12 @@ const (
key TEXT PRIMARY KEY, key TEXT PRIMARY KEY,
value BIGINT value BIGINT
); );
INSERT INTO message_stats (key, value) VALUES ('messages', 0); INSERT INTO message_stats (key, value) VALUES ('messages', 0) ON CONFLICT (key) DO NOTHING;
CREATE TABLE IF NOT EXISTS schema_version ( CREATE TABLE IF NOT EXISTS schema_version (
store TEXT PRIMARY KEY, store TEXT PRIMARY KEY,
version INT NOT NULL version INT NOT NULL
); );
INSERT INTO schema_version (store, version) VALUES ('message', 14); INSERT INTO schema_version (store, version) VALUES ('message', 14) ON CONFLICT (store) DO NOTHING;
` `
// Initial PostgreSQL schema for user store (from user/manager_postgres_schema.go) // Initial PostgreSQL schema for user store (from user/manager_postgres_schema.go)
@@ -146,7 +146,7 @@ const (
INSERT INTO "user" (id, user_name, pass, role, sync_topic, provisioned, created) INSERT INTO "user" (id, user_name, pass, role, sync_topic, provisioned, created)
VALUES ('` + everyoneID + `', '*', '', 'anonymous', '', false, EXTRACT(EPOCH FROM NOW())::BIGINT) VALUES ('` + everyoneID + `', '*', '', 'anonymous', '', false, EXTRACT(EPOCH FROM NOW())::BIGINT)
ON CONFLICT (id) DO NOTHING; ON CONFLICT (id) DO NOTHING;
INSERT INTO schema_version (store, version) VALUES ('user', 6); INSERT INTO schema_version (store, version) VALUES ('user', 6) ON CONFLICT (store) DO NOTHING;
` `
// Initial PostgreSQL schema for web push store (from webpush/store_postgres.go) // Initial PostgreSQL schema for web push store (from webpush/store_postgres.go)
@@ -174,7 +174,7 @@ const (
store TEXT PRIMARY KEY, store TEXT PRIMARY KEY,
version INT NOT NULL version INT NOT NULL
); );
INSERT INTO schema_version (store, version) VALUES ('webpush', 1); INSERT INTO schema_version (store, version) VALUES ('webpush', 1) ON CONFLICT (store) DO NOTHING;
` `
) )
@@ -185,6 +185,7 @@ var flags = []cli.Flag{
altsrc.NewStringFlag(&cli.StringFlag{Name: "auth-file", Aliases: []string{"auth_file"}, Usage: "SQLite user/auth database file path"}), altsrc.NewStringFlag(&cli.StringFlag{Name: "auth-file", Aliases: []string{"auth_file"}, Usage: "SQLite user/auth database file path"}),
altsrc.NewStringFlag(&cli.StringFlag{Name: "web-push-file", Aliases: []string{"web_push_file"}, Usage: "SQLite web push database file path"}), altsrc.NewStringFlag(&cli.StringFlag{Name: "web-push-file", Aliases: []string{"web_push_file"}, Usage: "SQLite web push database file path"}),
&cli.BoolFlag{Name: "create-schema", Usage: "create initial PostgreSQL schema before importing"}, &cli.BoolFlag{Name: "create-schema", Usage: "create initial PostgreSQL schema before importing"},
&cli.BoolFlag{Name: "pre-import", Usage: "pre-import messages while ntfy is still running (only imports messages)"},
} }
func main() { func main() {
@@ -207,10 +208,17 @@ func execImport(c *cli.Context) error {
cacheFile := c.String("cache-file") cacheFile := c.String("cache-file")
authFile := c.String("auth-file") authFile := c.String("auth-file")
webPushFile := c.String("web-push-file") webPushFile := c.String("web-push-file")
preImport := c.Bool("pre-import")
if databaseURL == "" { if databaseURL == "" {
return fmt.Errorf("database-url must be set (via --database-url or config file)") return fmt.Errorf("database-url must be set (via --database-url or config file)")
} }
if preImport {
if cacheFile == "" {
return fmt.Errorf("--cache-file must be set when using --pre-import")
}
return execPreImport(c, databaseURL, cacheFile)
}
if cacheFile == "" && authFile == "" && webPushFile == "" { if cacheFile == "" && authFile == "" && webPushFile == "" {
return fmt.Errorf("at least one of --cache-file, --auth-file, or --web-push-file must be set") return fmt.Errorf("at least one of --cache-file, --auth-file, or --web-push-file must be set")
} }
@@ -261,7 +269,8 @@ func execImport(c *cli.Context) error {
if err := verifySchemaVersion(pgDB, "message", expectedMessageSchemaVersion); err != nil { if err := verifySchemaVersion(pgDB, "message", expectedMessageSchemaVersion); err != nil {
return err return err
} }
if err := importMessages(cacheFile, pgDB); err != nil { sinceTime := maxMessageTime(pgDB)
if err := importMessages(cacheFile, pgDB, sinceTime); err != nil {
return fmt.Errorf("cannot import messages: %w", err) return fmt.Errorf("cannot import messages: %w", err)
} }
} }
@@ -300,6 +309,54 @@ func execImport(c *cli.Context) error {
return nil return nil
} }
// execPreImport runs the --pre-import flow: it bulk-copies messages from the
// SQLite cache into PostgreSQL while the ntfy server may still be running, so
// that the later full import only has to catch up on the delta. The user is
// asked for confirmation before the target database is touched.
func execPreImport(c *cli.Context, databaseURL, cacheFile string) error {
	fmt.Println("pgimport - PRE-IMPORT mode (ntfy can keep running)")
	fmt.Println()
	fmt.Println("Source:")
	printSource(" Cache file: ", cacheFile)
	fmt.Println()
	fmt.Println("Target:")
	fmt.Printf(" Database URL: %s\n", maskPassword(databaseURL))
	fmt.Println()
	fmt.Println("This will pre-import messages into PostgreSQL while ntfy is still running.")
	fmt.Println("After this completes, stop ntfy and run pgimport again without --pre-import")
	fmt.Println("to import remaining messages, users, and web push subscriptions.")
	fmt.Print("Continue? (y/n): ")
	var reply string
	fmt.Scanln(&reply) // best-effort read; an empty or failed read simply fails the "y" check below
	if strings.TrimSpace(strings.ToLower(reply)) != "y" {
		fmt.Println("Aborted.")
		return nil
	}
	fmt.Println()
	pgConn, err := pg.Open(databaseURL)
	if err != nil {
		return fmt.Errorf("cannot connect to PostgreSQL: %w", err)
	}
	pgDB := pgConn.DB
	defer pgDB.Close()
	// Only the message/cache schema is relevant here; auth and web push files
	// are not part of a pre-import, hence the empty file arguments.
	if c.Bool("create-schema") {
		if err := createSchema(pgDB, cacheFile, "", ""); err != nil {
			return fmt.Errorf("cannot create schema: %w", err)
		}
	}
	if err := verifySchemaVersion(pgDB, "message", expectedMessageSchemaVersion); err != nil {
		return err
	}
	// sinceTime 0 = import everything; the delta logic only kicks in on the later full run.
	if err := importMessages(cacheFile, pgDB, 0); err != nil {
		return fmt.Errorf("cannot import messages: %w", err)
	}
	fmt.Println()
	fmt.Println("Pre-import complete. Now stop ntfy and run pgimport again without --pre-import")
	fmt.Println("to import any remaining messages, users, and web push subscriptions.")
	return nil
}
func createSchema(pgDB *sql.DB, cacheFile, authFile, webPushFile string) error { func createSchema(pgDB *sql.DB, cacheFile, authFile, webPushFile string) error {
fmt.Println("Creating initial PostgreSQL schema ...") fmt.Println("Creating initial PostgreSQL schema ...")
// User schema must be created before message schema, because message_stats and // User schema must be created before message schema, because message_stats and
@@ -645,16 +702,41 @@ func importUserPhones(sqlDB, pgDB *sql.DB) (int, error) {
// Message import // Message import
func importMessages(sqliteFile string, pgDB *sql.DB) error { const preImportTimeDelta = 30 // seconds to subtract from max time to account for in-flight messages
// maxMessageTime returns the maximum message time in PostgreSQL minus a small
// buffer (preImportTimeDelta seconds), or 0 if there are no messages yet. It is
// used after a --pre-import run so that the final import only picks up messages
// that arrived since the pre-import.
func maxMessageTime(pgDB *sql.DB) int64 {
	var newest sql.NullInt64
	err := pgDB.QueryRow(`SELECT MAX(time) FROM message`).Scan(&newest)
	if err != nil || !newest.Valid || newest.Int64 == 0 {
		// Query failure, empty table, or NULL max all mean the same thing
		// to the caller: no usable pre-import, so import everything.
		return 0
	}
	since := newest.Int64 - preImportTimeDelta
	if since < 0 {
		return 0
	}
	fmt.Printf("Pre-imported messages detected (max time: %d), importing delta (since time %d) ...\n", newest.Int64, since)
	return since
}
func importMessages(sqliteFile string, pgDB *sql.DB, sinceTime int64) error {
sqlDB, err := openSQLite(sqliteFile) sqlDB, err := openSQLite(sqliteFile)
if err != nil { if err != nil {
fmt.Printf("Skipping message import: %s\n", err) fmt.Printf("Skipping message import: %s\n", err)
return nil return nil
} }
defer sqlDB.Close() defer sqlDB.Close()
fmt.Printf("Importing messages from %s ...\n", sqliteFile)
rows, err := sqlDB.Query(`SELECT mid, sequence_id, time, event, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_deleted, sender, user, content_type, encoding, published FROM messages`) query := `SELECT mid, sequence_id, time, event, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_deleted, sender, user, content_type, encoding, published FROM messages`
var rows *sql.Rows
if sinceTime > 0 {
fmt.Printf("Importing messages from %s (since time %d) ...\n", sqliteFile, sinceTime)
rows, err = sqlDB.Query(query+` WHERE time >= ?`, sinceTime)
} else {
fmt.Printf("Importing messages from %s ...\n", sqliteFile)
rows, err = sqlDB.Query(query)
}
if err != nil { if err != nil {
return fmt.Errorf("querying messages: %w", err) return fmt.Errorf("querying messages: %w", err)
} }
@@ -837,7 +919,9 @@ func importWebPush(sqliteFile string, pgDB *sql.DB) error {
} }
// toUTF8 sanitizes s for insertion into PostgreSQL: any invalid UTF-8 sequence
// is replaced with U+FFFD, and NUL bytes are stripped (PostgreSQL text values
// cannot contain \x00).
func toUTF8(s string) string {
	return strings.ReplaceAll(strings.ToValidUTF8(s, "\uFFFD"), "\x00", "")
}
// Verification // Verification