Compare commits

..

1 Commits

Author SHA1 Message Date
binwiederhier
e55d1cee6b Attachment fixes to address inconsistencies between DB and backend store 2026-03-24 17:37:50 -04:00
6 changed files with 69 additions and 36 deletions

View File

@@ -246,7 +246,7 @@ func (c *Cache) MessagesDue() ([]*model.Message, error) {
return readMessages(rows)
}
// MessagesExpired returns a list of IDs for messages that have expired (should be deleted)
// MessagesExpired returns a list of message IDs that have expired and should be deleted
func (c *Cache) MessagesExpired() ([]string, error) {
rows, err := c.db.Query(c.queries.selectMessagesExpired, time.Now().Unix())
if err != nil {
@@ -262,10 +262,10 @@ func (c *Cache) Message(id string) (*model.Message, error) {
if err != nil {
return nil, err
}
defer rows.Close()
if !rows.Next() {
return nil, model.ErrMessageNotFound
}
defer rows.Close()
return readMessage(rows)
}

View File

@@ -5,6 +5,7 @@ import (
"fmt"
"heckel.io/ntfy/v2/db"
"heckel.io/ntfy/v2/log"
)
// Initial PostgreSQL schema
@@ -41,6 +42,7 @@ const (
CREATE INDEX IF NOT EXISTS idx_message_sequence_id ON message (sequence_id);
CREATE INDEX IF NOT EXISTS idx_message_topic_published_time ON message (topic, published, time, id);
CREATE INDEX IF NOT EXISTS idx_message_published_expires ON message (published, expires);
CREATE INDEX IF NOT EXISTS idx_message_attachment_expires ON message (attachment_expires) WHERE attachment_deleted = FALSE;
CREATE INDEX IF NOT EXISTS idx_message_sender_attachment_expires ON message (sender, attachment_expires) WHERE user_id = '';
CREATE INDEX IF NOT EXISTS idx_message_user_id_attachment_expires ON message (user_id, attachment_expires);
CREATE TABLE IF NOT EXISTS message_stats (
@@ -57,21 +59,57 @@ const (
// PostgreSQL schema management queries
const (
postgresCurrentSchemaVersion = 14
postgresCurrentSchemaVersion = 15
postgresInsertSchemaVersionQuery = `INSERT INTO schema_version (store, version) VALUES ('message', $1)`
postgresUpdateSchemaVersionQuery = `UPDATE schema_version SET version = $1 WHERE store = 'message'`
postgresSelectSchemaVersionQuery = `SELECT version FROM schema_version WHERE store = 'message'`
)
func setupPostgres(db *sql.DB) error {
// PostgreSQL schema migrations
const (
// 14 -> 15
postgresMigrate14To15CreateIndexQuery = `
CREATE INDEX IF NOT EXISTS idx_message_attachment_expires ON message (attachment_expires) WHERE attachment_deleted = FALSE;
`
)
var postgresMigrations = map[int]func(db *sql.DB) error{
14: postgresMigrateFrom14,
}
func setupPostgres(sqlDB *sql.DB) error {
var schemaVersion int
if err := db.QueryRow(postgresSelectSchemaVersionQuery).Scan(&schemaVersion); err != nil {
return setupNewPostgresDB(db)
if err := sqlDB.QueryRow(postgresSelectSchemaVersionQuery).Scan(&schemaVersion); err != nil {
return setupNewPostgresDB(sqlDB)
} else if schemaVersion == postgresCurrentSchemaVersion {
return nil
} else if schemaVersion > postgresCurrentSchemaVersion {
return fmt.Errorf("unexpected schema version: version %d is higher than current version %d", schemaVersion, postgresCurrentSchemaVersion)
}
for i := schemaVersion; i < postgresCurrentSchemaVersion; i++ {
fn, ok := postgresMigrations[i]
if !ok {
return fmt.Errorf("cannot find migration step from schema version %d to %d", i, i+1)
} else if err := fn(sqlDB); err != nil {
return err
}
}
return nil
}
func postgresMigrateFrom14(sqlDB *sql.DB) error {
log.Tag(tagMessageCache).Info("Migrating message cache database schema: from 14 to 15")
return db.ExecTx(sqlDB, func(tx *sql.Tx) error {
if _, err := tx.Exec(postgresMigrate14To15CreateIndexQuery); err != nil {
return err
}
if _, err := tx.Exec(postgresUpdateSchemaVersionQuery, 15); err != nil {
return err
}
return nil
})
}
func setupNewPostgresDB(sqlDB *sql.DB) error {
return db.ExecTx(sqlDB, func(tx *sql.Tx) error {
if _, err := tx.Exec(postgresCreateTablesQuery); err != nil {

View File

@@ -57,7 +57,7 @@ const (
// Schema version management for SQLite
const (
sqliteCurrentSchemaVersion = 14
sqliteCurrentSchemaVersion = 15
sqliteCreateSchemaVersionTableQuery = `
CREATE TABLE IF NOT EXISTS schemaVersion (
id INT PRIMARY KEY,
@@ -208,6 +208,7 @@ var (
11: sqliteMigrateFrom11,
12: sqliteMigrateFrom12,
13: sqliteMigrateFrom13,
14: sqliteMigrateFrom14,
}
)
@@ -451,3 +452,15 @@ func sqliteMigrateFrom13(sqlDB *sql.DB, _ time.Duration) error {
return nil
})
}
// sqliteMigrateFrom14 is a no-op; the corresponding Postgres migration adds
// idx_message_attachment_expires, which SQLite already has from the initial schema.
func sqliteMigrateFrom14(sqlDB *sql.DB, _ time.Duration) error {
log.Tag(tagMessageCache).Info("Migrating cache database schema: from 14 to 15")
return db.ExecTx(sqlDB, func(tx *sql.Tx) error {
if _, err := tx.Exec(sqliteUpdateSchemaVersionQuery, 15); err != nil {
return err
}
return nil
})
}

View File

@@ -1426,6 +1426,9 @@ func (s *Server) handleBodyAsAttachment(r *http.Request, v *visitor, m *model.Me
return err
}
attachmentExpiry := time.Now().Add(vinfo.Limits.AttachmentExpiryDuration).Unix()
if m.Expires > 0 && attachmentExpiry > m.Expires {
attachmentExpiry = m.Expires // Attachment must never outlive the message
}
if m.Time > attachmentExpiry {
return errHTTPBadRequestAttachmentsExpiryBeforeDelivery.With(m)
}

View File

@@ -3,7 +3,6 @@ package server
import (
"heckel.io/ntfy/v2/log"
"heckel.io/ntfy/v2/util"
"strings"
)
func (s *Server) execManager() {
@@ -151,11 +150,11 @@ func (s *Server) pruneAttachments() {
log.Tag(tagManager).Err(err).Warn("Error retrieving expired attachments")
} else if len(ids) > 0 {
if log.Tag(tagManager).IsDebug() {
log.Tag(tagManager).Debug("Deleting attachments %s", strings.Join(ids, ", "))
}
if err := s.attachment.Remove(ids...); err != nil {
log.Tag(tagManager).Err(err).Warn("Error deleting attachments")
log.Tag(tagManager).Debug("Marking %d expired attachment(s) as deleted", len(ids))
}
// Only mark as deleted in DB. The actual storage files are cleaned up
// by the attachment store's sync() loop, which periodically reconciles
// storage with the database and removes orphaned files.
if err := s.messageCache.MarkAttachmentsDeleted(ids...); err != nil {
log.Tag(tagManager).Err(err).Warn("Error marking attachments deleted")
}
@@ -174,13 +173,11 @@ func (s *Server) pruneMessages() {
if err != nil {
log.Tag(tagManager).Err(err).Warn("Error retrieving expired messages")
} else if len(expiredMessageIDs) > 0 {
if s.attachment != nil {
if err := s.attachment.Remove(expiredMessageIDs...); err != nil {
log.Tag(tagManager).Err(err).Warn("Error deleting attachments for expired messages")
}
}
// Only delete DB rows. Attachment storage files are cleaned up by the
// attachment store's sync() loop, which periodically reconciles storage
// with the database and removes orphaned files.
if err := s.messageCache.DeleteMessages(expiredMessageIDs...); err != nil {
log.Tag(tagManager).Err(err).Warn("Error marking attachments deleted")
log.Tag(tagManager).Err(err).Warn("Error deleting expired messages")
}
} else {
log.Tag(tagManager).Debug("No expired messages to delete")

18
web/package-lock.json generated
View File

@@ -9514,24 +9514,6 @@
"dev": true,
"license": "ISC"
},
"node_modules/yaml": {
"version": "2.8.3",
"resolved": "https://registry.npmjs.org/yaml/-/yaml-2.8.3.tgz",
"integrity": "sha512-AvbaCLOO2Otw/lW5bmh9d/WEdcDFdQp2Z2ZUH3pX9U2ihyUY0nvLv7J6TrWowklRGPYbB/IuIMfYgxaCPg5Bpg==",
"dev": true,
"license": "ISC",
"optional": true,
"peer": true,
"bin": {
"yaml": "bin.mjs"
},
"engines": {
"node": ">= 14.6"
},
"funding": {
"url": "https://github.com/sponsors/eemeli"
}
},
"node_modules/yocto-queue": {
"version": "0.1.0",
"resolved": "https://registry.npmjs.org/yocto-queue/-/yocto-queue-0.1.0.tgz",