From 33b19814c71f7a5e66849b5928a1645f9085c194 Mon Sep 17 00:00:00 2001 From: binwiederhier Date: Tue, 3 Mar 2026 14:42:36 -0500 Subject: [PATCH] Transactions --- Makefile | 6 +- message/cache_sqlite_schema.go | 200 +++++++++++++++++---------------- user/manager_sqlite_schema.go | 189 ++++++++++++++----------------- 3 files changed, 195 insertions(+), 200 deletions(-) diff --git a/Makefile b/Makefile index 997ea54c..3805a19a 100644 --- a/Makefile +++ b/Makefile @@ -267,13 +267,13 @@ check: test web-fmt-check fmt-check vet web-lint lint staticcheck checkv: testv web-fmt-check fmt-check vet web-lint lint staticcheck test: .PHONY - go test -parallel 3 $(shell go list ./... | grep -vE 'ntfy/(test|examples|tools)') + go test -parallel 3 $(shell go list ./... | grep -vE 'ntfy/v2/(test|examples|tools|web)') testv: .PHONY - go test -v -parallel 3 $(shell go list ./... | grep -vE 'ntfy/(test|examples|tools)') + go test -v -parallel 3 $(shell go list ./... | grep -vE 'ntfy/v2/(test|examples|tools|web)') race: .PHONY - go test -v -race $(shell go list ./... | grep -vE 'ntfy/(test|examples|tools)') + go test -v -race $(shell go list ./... | grep -vE 'ntfy/v2/(test|examples|tools|web)') coverage: mkdir -p build/coverage diff --git a/message/cache_sqlite_schema.go b/message/cache_sqlite_schema.go index da744887..368594f6 100644 --- a/message/cache_sqlite_schema.go +++ b/message/cache_sqlite_schema.go @@ -12,7 +12,6 @@ import ( // Initial SQLite schema const ( sqliteCreateTablesQuery = ` - BEGIN; CREATE TABLE IF NOT EXISTS messages ( id INTEGER PRIMARY KEY AUTOINCREMENT, mid TEXT NOT NULL, @@ -53,7 +52,6 @@ const ( value INT ); INSERT INTO stats (key, value) VALUES ('messages', 0); - COMMIT; ` ) @@ -75,11 +73,9 @@ const ( const ( // 0 -> 1 sqliteMigrate0To1AlterMessagesTableQuery = ` - BEGIN; ALTER TABLE messages ADD COLUMN title TEXT NOT NULL DEFAULT(''); ALTER TABLE messages ADD COLUMN priority INT NOT NULL DEFAULT(0); ALTER TABLE messages ADD COLUMN tags TEXT NOT NULL DEFAULT(''); - COMMIT; ` // 1 -> 2 @@ -89,7 +85,6 @@ const ( // 2 -> 3 sqliteMigrate2To3AlterMessagesTableQuery = ` - BEGIN; ALTER TABLE messages ADD COLUMN click TEXT NOT NULL DEFAULT(''); ALTER TABLE messages ADD COLUMN attachment_name TEXT NOT NULL DEFAULT(''); ALTER TABLE messages ADD COLUMN attachment_type TEXT NOT NULL DEFAULT(''); @@ -97,7 +92,6 @@ const ( ALTER TABLE messages ADD COLUMN attachment_expires INT NOT NULL DEFAULT('0'); ALTER TABLE messages ADD COLUMN attachment_owner TEXT NOT NULL DEFAULT(''); ALTER TABLE messages ADD COLUMN attachment_url TEXT NOT NULL DEFAULT(''); - COMMIT; ` // 3 -> 4 sqliteMigrate3To4AlterMessagesTableQuery = ` @@ -106,7 +100,6 @@ const ( // 4 -> 5 sqliteMigrate4To5AlterMessagesTableQuery = ` - BEGIN; CREATE TABLE IF NOT EXISTS messages_new ( id INTEGER PRIMARY KEY AUTOINCREMENT, mid TEXT NOT NULL, @@ -138,7 +131,6 @@ const ( FROM messages; DROP TABLE messages; ALTER TABLE messages_new RENAME TO messages; - COMMIT; ` // 5 -> 6 @@ -259,17 +251,19 @@ func setupSQLite(db *sql.DB, startupQueries string, cacheDuration time.Duration) return nil } -func setupNewSQLite(db *sql.DB) error { - if _, err := db.Exec(sqliteCreateTablesQuery); err != nil { - return err - } - if _, err := db.Exec(sqliteCreateSchemaVersionTableQuery); err != nil { - return err - } - if _, err := db.Exec(sqliteInsertSchemaVersionQuery, sqliteCurrentSchemaVersion); err != nil { - return err - } - return nil +func setupNewSQLite(sqlDB *sql.DB) error { + return db.ExecTx(sqlDB, func(tx *sql.Tx) error { + if _, err := tx.Exec(sqliteCreateTablesQuery); err != nil { + return err + } + if _, err := tx.Exec(sqliteCreateSchemaVersionTableQuery); err != nil { + return err + } + if _, err := tx.Exec(sqliteInsertSchemaVersionQuery, sqliteCurrentSchemaVersion); err != nil { + return err + } + return nil + }) } func runSQLiteStartupQueries(db *sql.DB, startupQueries string) error { @@ -281,106 +275,124 @@ func runSQLiteStartupQueries(db *sql.DB, startupQueries string) error { return nil } -func sqliteMigrateFrom0(db *sql.DB, _ time.Duration) error { +func sqliteMigrateFrom0(sqlDB *sql.DB, _ time.Duration) error { log.Tag(tagMessageCache).Info("Migrating cache database schema: from 0 to 1") - if _, err := db.Exec(sqliteMigrate0To1AlterMessagesTableQuery); err != nil { - return err - } - if _, err := db.Exec(sqliteCreateSchemaVersionTableQuery); err != nil { - return err - } - if _, err := db.Exec(sqliteInsertSchemaVersionQuery, 1); err != nil { - return err - } - return nil + return db.ExecTx(sqlDB, func(tx *sql.Tx) error { + if _, err := tx.Exec(sqliteMigrate0To1AlterMessagesTableQuery); err != nil { + return err + } + if _, err := tx.Exec(sqliteCreateSchemaVersionTableQuery); err != nil { + return err + } + if _, err := tx.Exec(sqliteInsertSchemaVersionQuery, 1); err != nil { + return err + } + return nil + }) } -func sqliteMigrateFrom1(db *sql.DB, _ time.Duration) error { +func sqliteMigrateFrom1(sqlDB *sql.DB, _ time.Duration) error { log.Tag(tagMessageCache).Info("Migrating cache database schema: from 1 to 2") - if _, err := db.Exec(sqliteMigrate1To2AlterMessagesTableQuery); err != nil { - return err - } - if _, err := db.Exec(sqliteUpdateSchemaVersionQuery, 2); err != nil { - return err - } - return nil + return db.ExecTx(sqlDB, func(tx *sql.Tx) error { + if _, err := tx.Exec(sqliteMigrate1To2AlterMessagesTableQuery); err != nil { + return err + } + if _, err := tx.Exec(sqliteUpdateSchemaVersionQuery, 2); err != nil { + return err + } + return nil + }) } -func sqliteMigrateFrom2(db *sql.DB, _ time.Duration) error { +func sqliteMigrateFrom2(sqlDB *sql.DB, _ time.Duration) error { log.Tag(tagMessageCache).Info("Migrating cache database schema: from 2 to 3") - if _, err := db.Exec(sqliteMigrate2To3AlterMessagesTableQuery); err != nil { - return err - } - if _, err := db.Exec(sqliteUpdateSchemaVersionQuery, 3); err != nil { - return err - } - return nil + return db.ExecTx(sqlDB, func(tx *sql.Tx) error { + if _, err := tx.Exec(sqliteMigrate2To3AlterMessagesTableQuery); err != nil { + return err + } + if _, err := tx.Exec(sqliteUpdateSchemaVersionQuery, 3); err != nil { + return err + } + return nil + }) } -func sqliteMigrateFrom3(db *sql.DB, _ time.Duration) error { +func sqliteMigrateFrom3(sqlDB *sql.DB, _ time.Duration) error { log.Tag(tagMessageCache).Info("Migrating cache database schema: from 3 to 4") - if _, err := db.Exec(sqliteMigrate3To4AlterMessagesTableQuery); err != nil { - return err - } - if _, err := db.Exec(sqliteUpdateSchemaVersionQuery, 4); err != nil { - return err - } - return nil + return db.ExecTx(sqlDB, func(tx *sql.Tx) error { + if _, err := tx.Exec(sqliteMigrate3To4AlterMessagesTableQuery); err != nil { + return err + } + if _, err := tx.Exec(sqliteUpdateSchemaVersionQuery, 4); err != nil { + return err + } + return nil + }) } -func sqliteMigrateFrom4(db *sql.DB, _ time.Duration) error { +func sqliteMigrateFrom4(sqlDB *sql.DB, _ time.Duration) error { log.Tag(tagMessageCache).Info("Migrating cache database schema: from 4 to 5") - if _, err := db.Exec(sqliteMigrate4To5AlterMessagesTableQuery); err != nil { - return err - } - if _, err := db.Exec(sqliteUpdateSchemaVersionQuery, 5); err != nil { - return err - } - return nil + return db.ExecTx(sqlDB, func(tx *sql.Tx) error { + if _, err := tx.Exec(sqliteMigrate4To5AlterMessagesTableQuery); err != nil { + return err + } + if _, err := tx.Exec(sqliteUpdateSchemaVersionQuery, 5); err != nil { + return err + } + return nil + }) } -func sqliteMigrateFrom5(db *sql.DB, _ time.Duration) error { +func sqliteMigrateFrom5(sqlDB *sql.DB, _ time.Duration) error { log.Tag(tagMessageCache).Info("Migrating cache database schema: from 5 to 6") - if _, err := db.Exec(sqliteMigrate5To6AlterMessagesTableQuery); err != nil { - return err - } - if _, err := db.Exec(sqliteUpdateSchemaVersionQuery, 6); err != nil { - return err - } - return nil + return db.ExecTx(sqlDB, func(tx *sql.Tx) error { + if _, err := tx.Exec(sqliteMigrate5To6AlterMessagesTableQuery); err != nil { + return err + } + if _, err := tx.Exec(sqliteUpdateSchemaVersionQuery, 6); err != nil { + return err + } + return nil + }) } -func sqliteMigrateFrom6(db *sql.DB, _ time.Duration) error { +func sqliteMigrateFrom6(sqlDB *sql.DB, _ time.Duration) error { log.Tag(tagMessageCache).Info("Migrating cache database schema: from 6 to 7") - if _, err := db.Exec(sqliteMigrate6To7AlterMessagesTableQuery); err != nil { - return err - } - if _, err := db.Exec(sqliteUpdateSchemaVersionQuery, 7); err != nil { - return err - } - return nil + return db.ExecTx(sqlDB, func(tx *sql.Tx) error { + if _, err := tx.Exec(sqliteMigrate6To7AlterMessagesTableQuery); err != nil { + return err + } + if _, err := tx.Exec(sqliteUpdateSchemaVersionQuery, 7); err != nil { + return err + } + return nil + }) } -func sqliteMigrateFrom7(db *sql.DB, _ time.Duration) error { +func sqliteMigrateFrom7(sqlDB *sql.DB, _ time.Duration) error { log.Tag(tagMessageCache).Info("Migrating cache database schema: from 7 to 8") - if _, err := db.Exec(sqliteMigrate7To8AlterMessagesTableQuery); err != nil { - return err - } - if _, err := db.Exec(sqliteUpdateSchemaVersionQuery, 8); err != nil { - return err - } - return nil + return db.ExecTx(sqlDB, func(tx *sql.Tx) error { + if _, err := tx.Exec(sqliteMigrate7To8AlterMessagesTableQuery); err != nil { + return err + } + if _, err := tx.Exec(sqliteUpdateSchemaVersionQuery, 8); err != nil { + return err + } + return nil + }) } -func sqliteMigrateFrom8(db *sql.DB, _ time.Duration) error { +func sqliteMigrateFrom8(sqlDB *sql.DB, _ time.Duration) error { log.Tag(tagMessageCache).Info("Migrating cache database schema: from 8 to 9") - if _, err := db.Exec(sqliteMigrate8To9AlterMessagesTableQuery); err != nil { - return err - } - if _, err := db.Exec(sqliteUpdateSchemaVersionQuery, 9); err != nil { - return err - } - return nil + return db.ExecTx(sqlDB, func(tx *sql.Tx) error { + if _, err := tx.Exec(sqliteMigrate8To9AlterMessagesTableQuery); err != nil { + return err + } + if _, err := tx.Exec(sqliteUpdateSchemaVersionQuery, 9); err != nil { + return err + } + return nil + }) } func sqliteMigrateFrom9(sqlDB *sql.DB, cacheDuration time.Duration) error { diff --git a/user/manager_sqlite_schema.go b/user/manager_sqlite_schema.go index e6c84e01..01942163 100644 --- a/user/manager_sqlite_schema.go +++ b/user/manager_sqlite_schema.go @@ -4,6 +4,7 @@ import ( "database/sql" "fmt" + "heckel.io/ntfy/v2/db" "heckel.io/ntfy/v2/log" "heckel.io/ntfy/v2/util" ) @@ -11,7 +12,6 @@ import ( // Initial SQLite schema const ( sqliteCreateTablesQueries = ` - BEGIN; CREATE TABLE IF NOT EXISTS tier ( id TEXT PRIMARY KEY, code TEXT NOT NULL, @@ -92,7 +92,6 @@ const ( INSERT INTO user (id, user, pass, role, sync_topic, provisioned, created) VALUES ('` + everyoneID + `', '*', '', 'anonymous', '', false, UNIXEPOCH()) ON CONFLICT (id) DO NOTHING; - COMMIT; ` ) @@ -347,14 +346,16 @@ func setupSQLite(db *sql.DB) error { return nil } -func setupNewSQLite(db *sql.DB) error { - if _, err := db.Exec(sqliteCreateTablesQueries); err != nil { - return err - } - if _, err := db.Exec(sqliteInsertSchemaVersionQuery, sqliteCurrentSchemaVersion); err != nil { - return err - } - return nil +func setupNewSQLite(sqlDB *sql.DB) error { + return db.ExecTx(sqlDB, func(tx *sql.Tx) error { + if _, err := tx.Exec(sqliteCreateTablesQueries); err != nil { + return err + } + if _, err := tx.Exec(sqliteInsertSchemaVersionQuery, sqliteCurrentSchemaVersion); err != nil { + return err + } + return nil + }) } func runSQLiteStartupQueries(db *sql.DB, startupQueries string) error { @@ -369,114 +370,96 @@ func runSQLiteStartupQueries(db *sql.DB, startupQueries string) error { return nil } -func sqliteMigrateFrom1(db *sql.DB) error { +func sqliteMigrateFrom1(sqlDB *sql.DB) error { log.Tag(tag).Info("Migrating user database schema: from 1 to 2") - tx, err := db.Begin() - if err != nil { - return err - } - defer tx.Rollback() - // Rename user -> user_old, and create new tables - if _, err := tx.Exec(sqliteMigrate1To2CreateTablesQueries); err != nil { - return err - } - // Insert users from user_old into new user table, with ID and sync_topic - rows, err := tx.Query(sqliteMigrate1To2SelectAllOldUsernamesNoTxQuery) - if err != nil { - return err - } - defer rows.Close() - usernames := make([]string, 0) - for rows.Next() { - var username string - if err := rows.Scan(&username); err != nil { + return db.ExecTx(sqlDB, func(tx *sql.Tx) error { + // Rename user -> user_old, and create new tables + if _, err := tx.Exec(sqliteMigrate1To2CreateTablesQueries); err != nil { return err } - usernames = append(usernames, username) - } - if err := rows.Close(); err != nil { - return err - } - for _, username := range usernames { - userID := util.RandomStringPrefix(userIDPrefix, userIDLength) - syncTopic := util.RandomStringPrefix(syncTopicPrefix, syncTopicLength) - if _, err := tx.Exec(sqliteMigrate1To2InsertUserNoTxQuery, userID, syncTopic, username); err != nil { + // Insert users from user_old into new user table, with ID and sync_topic + rows, err := tx.Query(sqliteMigrate1To2SelectAllOldUsernamesNoTxQuery) + if err != nil { return err } - } - // Migrate old "access" table to "user_access" and drop "access" and "user_old" - if _, err := tx.Exec(sqliteMigrate1To2InsertFromOldTablesAndDropNoTxQuery); err != nil { - return err - } - if _, err := tx.Exec(sqliteUpdateSchemaVersionQuery, 2); err != nil { - return err - } - if err := tx.Commit(); err != nil { - return err - } - return nil + defer rows.Close() + usernames := make([]string, 0) + for rows.Next() { + var username string + if err := rows.Scan(&username); err != nil { + return err + } + usernames = append(usernames, username) + } + if err := rows.Close(); err != nil { + return err + } + for _, username := range usernames { + userID := util.RandomStringPrefix(userIDPrefix, userIDLength) + syncTopic := util.RandomStringPrefix(syncTopicPrefix, syncTopicLength) + if _, err := tx.Exec(sqliteMigrate1To2InsertUserNoTxQuery, userID, syncTopic, username); err != nil { + return err + } + } + // Migrate old "access" table to "user_access" and drop "access" and "user_old" + if _, err := tx.Exec(sqliteMigrate1To2InsertFromOldTablesAndDropNoTxQuery); err != nil { + return err + } + if _, err := tx.Exec(sqliteUpdateSchemaVersionQuery, 2); err != nil { + return err + } + return nil + }) } -func sqliteMigrateFrom2(db *sql.DB) error { +func sqliteMigrateFrom2(sqlDB *sql.DB) error { log.Tag(tag).Info("Migrating user database schema: from 2 to 3") - tx, err := db.Begin() - if err != nil { - return err - } - defer tx.Rollback() - if _, err := tx.Exec(sqliteMigrate2To3UpdateQueries); err != nil { - return err - } - if _, err := tx.Exec(sqliteUpdateSchemaVersionQuery, 3); err != nil { - return err - } - return tx.Commit() + return db.ExecTx(sqlDB, func(tx *sql.Tx) error { + if _, err := tx.Exec(sqliteMigrate2To3UpdateQueries); err != nil { + return err + } + if _, err := tx.Exec(sqliteUpdateSchemaVersionQuery, 3); err != nil { + return err + } + return nil + }) } -func sqliteMigrateFrom3(db *sql.DB) error { +func sqliteMigrateFrom3(sqlDB *sql.DB) error { log.Tag(tag).Info("Migrating user database schema: from 3 to 4") - tx, err := db.Begin() - if err != nil { - return err - } - defer tx.Rollback() - if _, err := tx.Exec(sqliteMigrate3To4UpdateQueries); err != nil { - return err - } - if _, err := tx.Exec(sqliteUpdateSchemaVersionQuery, 4); err != nil { - return err - } - return tx.Commit() + return db.ExecTx(sqlDB, func(tx *sql.Tx) error { + if _, err := tx.Exec(sqliteMigrate3To4UpdateQueries); err != nil { + return err + } + if _, err := tx.Exec(sqliteUpdateSchemaVersionQuery, 4); err != nil { + return err + } + return nil + }) } -func sqliteMigrateFrom4(db *sql.DB) error { +func sqliteMigrateFrom4(sqlDB *sql.DB) error { log.Tag(tag).Info("Migrating user database schema: from 4 to 5") - tx, err := db.Begin() - if err != nil { - return err - } - defer tx.Rollback() - if _, err := tx.Exec(sqliteMigrate4To5UpdateQueries); err != nil { - return err - } - if _, err := tx.Exec(sqliteUpdateSchemaVersionQuery, 5); err != nil { - return err - } - return tx.Commit() + return db.ExecTx(sqlDB, func(tx *sql.Tx) error { + if _, err := tx.Exec(sqliteMigrate4To5UpdateQueries); err != nil { + return err + } + if _, err := tx.Exec(sqliteUpdateSchemaVersionQuery, 5); err != nil { + return err + } + return nil + }) } -func sqliteMigrateFrom5(db *sql.DB) error { +func sqliteMigrateFrom5(sqlDB *sql.DB) error { log.Tag(tag).Info("Migrating user database schema: from 5 to 6") - tx, err := db.Begin() - if err != nil { - return err - } - defer tx.Rollback() - if _, err := tx.Exec(sqliteMigrate5To6UpdateQueries); err != nil { - return err - } - if _, err := tx.Exec(sqliteUpdateSchemaVersionQuery, 6); err != nil { - return err - } - return tx.Commit() + return db.ExecTx(sqlDB, func(tx *sql.Tx) error { + if _, err := tx.Exec(sqliteMigrate5To6UpdateQueries); err != nil { + return err + } + if _, err := tx.Exec(sqliteUpdateSchemaVersionQuery, 6); err != nil { + return err + } + return nil + }) }