Transactions

This commit is contained in:
binwiederhier
2026-03-03 14:42:36 -05:00
parent 66449bd19b
commit 33b19814c7
3 changed files with 195 additions and 200 deletions

View File

@@ -267,13 +267,13 @@ check: test web-fmt-check fmt-check vet web-lint lint staticcheck
checkv: testv web-fmt-check fmt-check vet web-lint lint staticcheck checkv: testv web-fmt-check fmt-check vet web-lint lint staticcheck
test: .PHONY test: .PHONY
go test -parallel 3 $(shell go list ./... | grep -vE 'ntfy/(test|examples|tools)') go test -parallel 3 $(shell go list ./... | grep -vE 'ntfy/v2/(test|examples|tools|web)')
testv: .PHONY testv: .PHONY
go test -v -parallel 3 $(shell go list ./... | grep -vE 'ntfy/(test|examples|tools)') go test -v -parallel 3 $(shell go list ./... | grep -vE 'ntfy/v2/(test|examples|tools|web)')
race: .PHONY race: .PHONY
go test -v -race $(shell go list ./... | grep -vE 'ntfy/(test|examples|tools)') go test -v -race $(shell go list ./... | grep -vE 'ntfy/v2/(test|examples|tools|web)')
coverage: coverage:
mkdir -p build/coverage mkdir -p build/coverage

View File

@@ -12,7 +12,6 @@ import (
// Initial SQLite schema // Initial SQLite schema
const ( const (
sqliteCreateTablesQuery = ` sqliteCreateTablesQuery = `
BEGIN;
CREATE TABLE IF NOT EXISTS messages ( CREATE TABLE IF NOT EXISTS messages (
id INTEGER PRIMARY KEY AUTOINCREMENT, id INTEGER PRIMARY KEY AUTOINCREMENT,
mid TEXT NOT NULL, mid TEXT NOT NULL,
@@ -53,7 +52,6 @@ const (
value INT value INT
); );
INSERT INTO stats (key, value) VALUES ('messages', 0); INSERT INTO stats (key, value) VALUES ('messages', 0);
COMMIT;
` `
) )
@@ -75,11 +73,9 @@ const (
const ( const (
// 0 -> 1 // 0 -> 1
sqliteMigrate0To1AlterMessagesTableQuery = ` sqliteMigrate0To1AlterMessagesTableQuery = `
BEGIN;
ALTER TABLE messages ADD COLUMN title TEXT NOT NULL DEFAULT(''); ALTER TABLE messages ADD COLUMN title TEXT NOT NULL DEFAULT('');
ALTER TABLE messages ADD COLUMN priority INT NOT NULL DEFAULT(0); ALTER TABLE messages ADD COLUMN priority INT NOT NULL DEFAULT(0);
ALTER TABLE messages ADD COLUMN tags TEXT NOT NULL DEFAULT(''); ALTER TABLE messages ADD COLUMN tags TEXT NOT NULL DEFAULT('');
COMMIT;
` `
// 1 -> 2 // 1 -> 2
@@ -89,7 +85,6 @@ const (
// 2 -> 3 // 2 -> 3
sqliteMigrate2To3AlterMessagesTableQuery = ` sqliteMigrate2To3AlterMessagesTableQuery = `
BEGIN;
ALTER TABLE messages ADD COLUMN click TEXT NOT NULL DEFAULT(''); ALTER TABLE messages ADD COLUMN click TEXT NOT NULL DEFAULT('');
ALTER TABLE messages ADD COLUMN attachment_name TEXT NOT NULL DEFAULT(''); ALTER TABLE messages ADD COLUMN attachment_name TEXT NOT NULL DEFAULT('');
ALTER TABLE messages ADD COLUMN attachment_type TEXT NOT NULL DEFAULT(''); ALTER TABLE messages ADD COLUMN attachment_type TEXT NOT NULL DEFAULT('');
@@ -97,7 +92,6 @@ const (
ALTER TABLE messages ADD COLUMN attachment_expires INT NOT NULL DEFAULT('0'); ALTER TABLE messages ADD COLUMN attachment_expires INT NOT NULL DEFAULT('0');
ALTER TABLE messages ADD COLUMN attachment_owner TEXT NOT NULL DEFAULT(''); ALTER TABLE messages ADD COLUMN attachment_owner TEXT NOT NULL DEFAULT('');
ALTER TABLE messages ADD COLUMN attachment_url TEXT NOT NULL DEFAULT(''); ALTER TABLE messages ADD COLUMN attachment_url TEXT NOT NULL DEFAULT('');
COMMIT;
` `
// 3 -> 4 // 3 -> 4
sqliteMigrate3To4AlterMessagesTableQuery = ` sqliteMigrate3To4AlterMessagesTableQuery = `
@@ -106,7 +100,6 @@ const (
// 4 -> 5 // 4 -> 5
sqliteMigrate4To5AlterMessagesTableQuery = ` sqliteMigrate4To5AlterMessagesTableQuery = `
BEGIN;
CREATE TABLE IF NOT EXISTS messages_new ( CREATE TABLE IF NOT EXISTS messages_new (
id INTEGER PRIMARY KEY AUTOINCREMENT, id INTEGER PRIMARY KEY AUTOINCREMENT,
mid TEXT NOT NULL, mid TEXT NOT NULL,
@@ -138,7 +131,6 @@ const (
FROM messages; FROM messages;
DROP TABLE messages; DROP TABLE messages;
ALTER TABLE messages_new RENAME TO messages; ALTER TABLE messages_new RENAME TO messages;
COMMIT;
` `
// 5 -> 6 // 5 -> 6
@@ -259,17 +251,19 @@ func setupSQLite(db *sql.DB, startupQueries string, cacheDuration time.Duration)
return nil return nil
} }
func setupNewSQLite(db *sql.DB) error { func setupNewSQLite(sqlDB *sql.DB) error {
if _, err := db.Exec(sqliteCreateTablesQuery); err != nil { return db.ExecTx(sqlDB, func(tx *sql.Tx) error {
return err if _, err := tx.Exec(sqliteCreateTablesQuery); err != nil {
} return err
if _, err := db.Exec(sqliteCreateSchemaVersionTableQuery); err != nil { }
return err if _, err := tx.Exec(sqliteCreateSchemaVersionTableQuery); err != nil {
} return err
if _, err := db.Exec(sqliteInsertSchemaVersionQuery, sqliteCurrentSchemaVersion); err != nil { }
return err if _, err := tx.Exec(sqliteInsertSchemaVersionQuery, sqliteCurrentSchemaVersion); err != nil {
} return err
return nil }
return nil
})
} }
func runSQLiteStartupQueries(db *sql.DB, startupQueries string) error { func runSQLiteStartupQueries(db *sql.DB, startupQueries string) error {
@@ -281,106 +275,124 @@ func runSQLiteStartupQueries(db *sql.DB, startupQueries string) error {
return nil return nil
} }
func sqliteMigrateFrom0(db *sql.DB, _ time.Duration) error { func sqliteMigrateFrom0(sqlDB *sql.DB, _ time.Duration) error {
log.Tag(tagMessageCache).Info("Migrating cache database schema: from 0 to 1") log.Tag(tagMessageCache).Info("Migrating cache database schema: from 0 to 1")
if _, err := db.Exec(sqliteMigrate0To1AlterMessagesTableQuery); err != nil { return db.ExecTx(sqlDB, func(tx *sql.Tx) error {
return err if _, err := tx.Exec(sqliteMigrate0To1AlterMessagesTableQuery); err != nil {
} return err
if _, err := db.Exec(sqliteCreateSchemaVersionTableQuery); err != nil { }
return err if _, err := tx.Exec(sqliteCreateSchemaVersionTableQuery); err != nil {
} return err
if _, err := db.Exec(sqliteInsertSchemaVersionQuery, 1); err != nil { }
return err if _, err := tx.Exec(sqliteInsertSchemaVersionQuery, 1); err != nil {
} return err
return nil }
return nil
})
} }
func sqliteMigrateFrom1(db *sql.DB, _ time.Duration) error { func sqliteMigrateFrom1(sqlDB *sql.DB, _ time.Duration) error {
log.Tag(tagMessageCache).Info("Migrating cache database schema: from 1 to 2") log.Tag(tagMessageCache).Info("Migrating cache database schema: from 1 to 2")
if _, err := db.Exec(sqliteMigrate1To2AlterMessagesTableQuery); err != nil { return db.ExecTx(sqlDB, func(tx *sql.Tx) error {
return err if _, err := tx.Exec(sqliteMigrate1To2AlterMessagesTableQuery); err != nil {
} return err
if _, err := db.Exec(sqliteUpdateSchemaVersionQuery, 2); err != nil { }
return err if _, err := tx.Exec(sqliteUpdateSchemaVersionQuery, 2); err != nil {
} return err
return nil }
return nil
})
} }
func sqliteMigrateFrom2(db *sql.DB, _ time.Duration) error { func sqliteMigrateFrom2(sqlDB *sql.DB, _ time.Duration) error {
log.Tag(tagMessageCache).Info("Migrating cache database schema: from 2 to 3") log.Tag(tagMessageCache).Info("Migrating cache database schema: from 2 to 3")
if _, err := db.Exec(sqliteMigrate2To3AlterMessagesTableQuery); err != nil { return db.ExecTx(sqlDB, func(tx *sql.Tx) error {
return err if _, err := tx.Exec(sqliteMigrate2To3AlterMessagesTableQuery); err != nil {
} return err
if _, err := db.Exec(sqliteUpdateSchemaVersionQuery, 3); err != nil { }
return err if _, err := tx.Exec(sqliteUpdateSchemaVersionQuery, 3); err != nil {
} return err
return nil }
return nil
})
} }
func sqliteMigrateFrom3(db *sql.DB, _ time.Duration) error { func sqliteMigrateFrom3(sqlDB *sql.DB, _ time.Duration) error {
log.Tag(tagMessageCache).Info("Migrating cache database schema: from 3 to 4") log.Tag(tagMessageCache).Info("Migrating cache database schema: from 3 to 4")
if _, err := db.Exec(sqliteMigrate3To4AlterMessagesTableQuery); err != nil { return db.ExecTx(sqlDB, func(tx *sql.Tx) error {
return err if _, err := tx.Exec(sqliteMigrate3To4AlterMessagesTableQuery); err != nil {
} return err
if _, err := db.Exec(sqliteUpdateSchemaVersionQuery, 4); err != nil { }
return err if _, err := tx.Exec(sqliteUpdateSchemaVersionQuery, 4); err != nil {
} return err
return nil }
return nil
})
} }
func sqliteMigrateFrom4(db *sql.DB, _ time.Duration) error { func sqliteMigrateFrom4(sqlDB *sql.DB, _ time.Duration) error {
log.Tag(tagMessageCache).Info("Migrating cache database schema: from 4 to 5") log.Tag(tagMessageCache).Info("Migrating cache database schema: from 4 to 5")
if _, err := db.Exec(sqliteMigrate4To5AlterMessagesTableQuery); err != nil { return db.ExecTx(sqlDB, func(tx *sql.Tx) error {
return err if _, err := tx.Exec(sqliteMigrate4To5AlterMessagesTableQuery); err != nil {
} return err
if _, err := db.Exec(sqliteUpdateSchemaVersionQuery, 5); err != nil { }
return err if _, err := tx.Exec(sqliteUpdateSchemaVersionQuery, 5); err != nil {
} return err
return nil }
return nil
})
} }
func sqliteMigrateFrom5(db *sql.DB, _ time.Duration) error { func sqliteMigrateFrom5(sqlDB *sql.DB, _ time.Duration) error {
log.Tag(tagMessageCache).Info("Migrating cache database schema: from 5 to 6") log.Tag(tagMessageCache).Info("Migrating cache database schema: from 5 to 6")
if _, err := db.Exec(sqliteMigrate5To6AlterMessagesTableQuery); err != nil { return db.ExecTx(sqlDB, func(tx *sql.Tx) error {
return err if _, err := tx.Exec(sqliteMigrate5To6AlterMessagesTableQuery); err != nil {
} return err
if _, err := db.Exec(sqliteUpdateSchemaVersionQuery, 6); err != nil { }
return err if _, err := tx.Exec(sqliteUpdateSchemaVersionQuery, 6); err != nil {
} return err
return nil }
return nil
})
} }
func sqliteMigrateFrom6(db *sql.DB, _ time.Duration) error { func sqliteMigrateFrom6(sqlDB *sql.DB, _ time.Duration) error {
log.Tag(tagMessageCache).Info("Migrating cache database schema: from 6 to 7") log.Tag(tagMessageCache).Info("Migrating cache database schema: from 6 to 7")
if _, err := db.Exec(sqliteMigrate6To7AlterMessagesTableQuery); err != nil { return db.ExecTx(sqlDB, func(tx *sql.Tx) error {
return err if _, err := tx.Exec(sqliteMigrate6To7AlterMessagesTableQuery); err != nil {
} return err
if _, err := db.Exec(sqliteUpdateSchemaVersionQuery, 7); err != nil { }
return err if _, err := tx.Exec(sqliteUpdateSchemaVersionQuery, 7); err != nil {
} return err
return nil }
return nil
})
} }
func sqliteMigrateFrom7(db *sql.DB, _ time.Duration) error { func sqliteMigrateFrom7(sqlDB *sql.DB, _ time.Duration) error {
log.Tag(tagMessageCache).Info("Migrating cache database schema: from 7 to 8") log.Tag(tagMessageCache).Info("Migrating cache database schema: from 7 to 8")
if _, err := db.Exec(sqliteMigrate7To8AlterMessagesTableQuery); err != nil { return db.ExecTx(sqlDB, func(tx *sql.Tx) error {
return err if _, err := tx.Exec(sqliteMigrate7To8AlterMessagesTableQuery); err != nil {
} return err
if _, err := db.Exec(sqliteUpdateSchemaVersionQuery, 8); err != nil { }
return err if _, err := tx.Exec(sqliteUpdateSchemaVersionQuery, 8); err != nil {
} return err
return nil }
return nil
})
} }
func sqliteMigrateFrom8(db *sql.DB, _ time.Duration) error { func sqliteMigrateFrom8(sqlDB *sql.DB, _ time.Duration) error {
log.Tag(tagMessageCache).Info("Migrating cache database schema: from 8 to 9") log.Tag(tagMessageCache).Info("Migrating cache database schema: from 8 to 9")
if _, err := db.Exec(sqliteMigrate8To9AlterMessagesTableQuery); err != nil { return db.ExecTx(sqlDB, func(tx *sql.Tx) error {
return err if _, err := tx.Exec(sqliteMigrate8To9AlterMessagesTableQuery); err != nil {
} return err
if _, err := db.Exec(sqliteUpdateSchemaVersionQuery, 9); err != nil { }
return err if _, err := tx.Exec(sqliteUpdateSchemaVersionQuery, 9); err != nil {
} return err
return nil }
return nil
})
} }
func sqliteMigrateFrom9(sqlDB *sql.DB, cacheDuration time.Duration) error { func sqliteMigrateFrom9(sqlDB *sql.DB, cacheDuration time.Duration) error {

View File

@@ -4,6 +4,7 @@ import (
"database/sql" "database/sql"
"fmt" "fmt"
"heckel.io/ntfy/v2/db"
"heckel.io/ntfy/v2/log" "heckel.io/ntfy/v2/log"
"heckel.io/ntfy/v2/util" "heckel.io/ntfy/v2/util"
) )
@@ -11,7 +12,6 @@ import (
// Initial SQLite schema // Initial SQLite schema
const ( const (
sqliteCreateTablesQueries = ` sqliteCreateTablesQueries = `
BEGIN;
CREATE TABLE IF NOT EXISTS tier ( CREATE TABLE IF NOT EXISTS tier (
id TEXT PRIMARY KEY, id TEXT PRIMARY KEY,
code TEXT NOT NULL, code TEXT NOT NULL,
@@ -92,7 +92,6 @@ const (
INSERT INTO user (id, user, pass, role, sync_topic, provisioned, created) INSERT INTO user (id, user, pass, role, sync_topic, provisioned, created)
VALUES ('` + everyoneID + `', '*', '', 'anonymous', '', false, UNIXEPOCH()) VALUES ('` + everyoneID + `', '*', '', 'anonymous', '', false, UNIXEPOCH())
ON CONFLICT (id) DO NOTHING; ON CONFLICT (id) DO NOTHING;
COMMIT;
` `
) )
@@ -347,14 +346,16 @@ func setupSQLite(db *sql.DB) error {
return nil return nil
} }
func setupNewSQLite(db *sql.DB) error { func setupNewSQLite(sqlDB *sql.DB) error {
if _, err := db.Exec(sqliteCreateTablesQueries); err != nil { return db.ExecTx(sqlDB, func(tx *sql.Tx) error {
return err if _, err := tx.Exec(sqliteCreateTablesQueries); err != nil {
} return err
if _, err := db.Exec(sqliteInsertSchemaVersionQuery, sqliteCurrentSchemaVersion); err != nil { }
return err if _, err := tx.Exec(sqliteInsertSchemaVersionQuery, sqliteCurrentSchemaVersion); err != nil {
} return err
return nil }
return nil
})
} }
func runSQLiteStartupQueries(db *sql.DB, startupQueries string) error { func runSQLiteStartupQueries(db *sql.DB, startupQueries string) error {
@@ -369,114 +370,96 @@ func runSQLiteStartupQueries(db *sql.DB, startupQueries string) error {
return nil return nil
} }
func sqliteMigrateFrom1(db *sql.DB) error { func sqliteMigrateFrom1(sqlDB *sql.DB) error {
log.Tag(tag).Info("Migrating user database schema: from 1 to 2") log.Tag(tag).Info("Migrating user database schema: from 1 to 2")
tx, err := db.Begin() return db.ExecTx(sqlDB, func(tx *sql.Tx) error {
if err != nil { // Rename user -> user_old, and create new tables
return err if _, err := tx.Exec(sqliteMigrate1To2CreateTablesQueries); err != nil {
}
defer tx.Rollback()
// Rename user -> user_old, and create new tables
if _, err := tx.Exec(sqliteMigrate1To2CreateTablesQueries); err != nil {
return err
}
// Insert users from user_old into new user table, with ID and sync_topic
rows, err := tx.Query(sqliteMigrate1To2SelectAllOldUsernamesNoTxQuery)
if err != nil {
return err
}
defer rows.Close()
usernames := make([]string, 0)
for rows.Next() {
var username string
if err := rows.Scan(&username); err != nil {
return err return err
} }
usernames = append(usernames, username) // Insert users from user_old into new user table, with ID and sync_topic
} rows, err := tx.Query(sqliteMigrate1To2SelectAllOldUsernamesNoTxQuery)
if err := rows.Close(); err != nil { if err != nil {
return err
}
for _, username := range usernames {
userID := util.RandomStringPrefix(userIDPrefix, userIDLength)
syncTopic := util.RandomStringPrefix(syncTopicPrefix, syncTopicLength)
if _, err := tx.Exec(sqliteMigrate1To2InsertUserNoTxQuery, userID, syncTopic, username); err != nil {
return err return err
} }
} defer rows.Close()
// Migrate old "access" table to "user_access" and drop "access" and "user_old" usernames := make([]string, 0)
if _, err := tx.Exec(sqliteMigrate1To2InsertFromOldTablesAndDropNoTxQuery); err != nil { for rows.Next() {
return err var username string
} if err := rows.Scan(&username); err != nil {
if _, err := tx.Exec(sqliteUpdateSchemaVersionQuery, 2); err != nil { return err
return err }
} usernames = append(usernames, username)
if err := tx.Commit(); err != nil { }
return err if err := rows.Close(); err != nil {
} return err
return nil }
for _, username := range usernames {
userID := util.RandomStringPrefix(userIDPrefix, userIDLength)
syncTopic := util.RandomStringPrefix(syncTopicPrefix, syncTopicLength)
if _, err := tx.Exec(sqliteMigrate1To2InsertUserNoTxQuery, userID, syncTopic, username); err != nil {
return err
}
}
// Migrate old "access" table to "user_access" and drop "access" and "user_old"
if _, err := tx.Exec(sqliteMigrate1To2InsertFromOldTablesAndDropNoTxQuery); err != nil {
return err
}
if _, err := tx.Exec(sqliteUpdateSchemaVersionQuery, 2); err != nil {
return err
}
return nil
})
} }
func sqliteMigrateFrom2(db *sql.DB) error { func sqliteMigrateFrom2(sqlDB *sql.DB) error {
log.Tag(tag).Info("Migrating user database schema: from 2 to 3") log.Tag(tag).Info("Migrating user database schema: from 2 to 3")
tx, err := db.Begin() return db.ExecTx(sqlDB, func(tx *sql.Tx) error {
if err != nil { if _, err := tx.Exec(sqliteMigrate2To3UpdateQueries); err != nil {
return err return err
} }
defer tx.Rollback() if _, err := tx.Exec(sqliteUpdateSchemaVersionQuery, 3); err != nil {
if _, err := tx.Exec(sqliteMigrate2To3UpdateQueries); err != nil { return err
return err }
} return nil
if _, err := tx.Exec(sqliteUpdateSchemaVersionQuery, 3); err != nil { })
return err
}
return tx.Commit()
} }
func sqliteMigrateFrom3(db *sql.DB) error { func sqliteMigrateFrom3(sqlDB *sql.DB) error {
log.Tag(tag).Info("Migrating user database schema: from 3 to 4") log.Tag(tag).Info("Migrating user database schema: from 3 to 4")
tx, err := db.Begin() return db.ExecTx(sqlDB, func(tx *sql.Tx) error {
if err != nil { if _, err := tx.Exec(sqliteMigrate3To4UpdateQueries); err != nil {
return err return err
} }
defer tx.Rollback() if _, err := tx.Exec(sqliteUpdateSchemaVersionQuery, 4); err != nil {
if _, err := tx.Exec(sqliteMigrate3To4UpdateQueries); err != nil { return err
return err }
} return nil
if _, err := tx.Exec(sqliteUpdateSchemaVersionQuery, 4); err != nil { })
return err
}
return tx.Commit()
} }
func sqliteMigrateFrom4(db *sql.DB) error { func sqliteMigrateFrom4(sqlDB *sql.DB) error {
log.Tag(tag).Info("Migrating user database schema: from 4 to 5") log.Tag(tag).Info("Migrating user database schema: from 4 to 5")
tx, err := db.Begin() return db.ExecTx(sqlDB, func(tx *sql.Tx) error {
if err != nil { if _, err := tx.Exec(sqliteMigrate4To5UpdateQueries); err != nil {
return err return err
} }
defer tx.Rollback() if _, err := tx.Exec(sqliteUpdateSchemaVersionQuery, 5); err != nil {
if _, err := tx.Exec(sqliteMigrate4To5UpdateQueries); err != nil { return err
return err }
} return nil
if _, err := tx.Exec(sqliteUpdateSchemaVersionQuery, 5); err != nil { })
return err
}
return tx.Commit()
} }
func sqliteMigrateFrom5(db *sql.DB) error { func sqliteMigrateFrom5(sqlDB *sql.DB) error {
log.Tag(tag).Info("Migrating user database schema: from 5 to 6") log.Tag(tag).Info("Migrating user database schema: from 5 to 6")
tx, err := db.Begin() return db.ExecTx(sqlDB, func(tx *sql.Tx) error {
if err != nil { if _, err := tx.Exec(sqliteMigrate5To6UpdateQueries); err != nil {
return err return err
} }
defer tx.Rollback() if _, err := tx.Exec(sqliteUpdateSchemaVersionQuery, 6); err != nil {
if _, err := tx.Exec(sqliteMigrate5To6UpdateQueries); err != nil { return err
return err }
} return nil
if _, err := tx.Exec(sqliteUpdateSchemaVersionQuery, 6); err != nil { })
return err
}
return tx.Commit()
} }