Compare commits

..

6 Commits

Author SHA1 Message Date
binwiederhier
e55d1cee6b Attachment fixes to address inconsistencies between DB and backend store 2026-03-24 17:37:50 -04:00
binwiederhier
dd760f16f7 Merge branch 'main' of github.com:binwiederhier/ntfy 2026-03-23 13:05:09 -04:00
binwiederhier
d00277107a Release notes 2026-03-23 13:04:47 -04:00
binwiederhier
b95efe8dd3 Tighten message IDs in attachments 2026-03-23 12:54:13 -04:00
binwiederhier
075f2ffa15 Refine sync() to properly update sizes 2026-03-23 12:44:40 -04:00
Philipp C. Heckel
2f6a044c34 Merge pull request #1656 from binwiederhier/attachment-s3
S3 attachment storage
2026-03-22 21:33:07 -04:00
12 changed files with 177 additions and 115 deletions

View File

@@ -4,7 +4,6 @@ import (
"errors"
"fmt"
"io"
"regexp"
"sync"
"time"
@@ -20,52 +19,59 @@ const (
orphanGracePeriod = time.Hour // Don't delete orphaned objects younger than this to avoid races with in-flight uploads
)
var (
fileIDRegex = regexp.MustCompile(fmt.Sprintf(`^[-_A-Za-z0-9]{%d}$`, model.MessageIDLength))
errInvalidFileID = errors.New("invalid file ID")
)
var errInvalidFileID = errors.New("invalid file ID")
// Store manages attachment storage with shared logic for size tracking, limiting,
// ID validation, and background sync to reconcile storage with the database.
type Store struct {
backend backend
limit int64 // Defined limit of the store in bytes
size int64 // Current size of the store in bytes
sizes map[string]int64 // File ID -> size, for subtracting on Remove
localIDs func() ([]string, error) // Returns file IDs that should exist locally, used for sync()
closeChan chan struct{}
mu sync.RWMutex // Protects size and sizes
backend backend
limit int64 // Defined limit of the store in bytes
size int64 // Current size of the store in bytes
sizes map[string]int64 // File ID -> size, for subtracting on Remove
attachmentsWithSizes func() (map[string]int64, error) // Returns file ID -> size for active attachments
closeChan chan struct{}
mu sync.RWMutex // Protects size and sizes
}
// NewFileStore creates a new file-system backed attachment cache
func NewFileStore(dir string, totalSizeLimit int64, localIDsFn func() ([]string, error)) (*Store, error) {
func NewFileStore(dir string, totalSizeLimit int64, attachmentsWithSizes func() (map[string]int64, error)) (*Store, error) {
b, err := newFileBackend(dir)
if err != nil {
return nil, err
}
return newStore(b, totalSizeLimit, localIDsFn)
return newStore(b, totalSizeLimit, attachmentsWithSizes)
}
// NewS3Store creates a new S3-backed attachment cache. The s3URL must be in the format:
//
// s3://ACCESS_KEY:SECRET_KEY@BUCKET[/PREFIX]?region=REGION[&endpoint=ENDPOINT]
func NewS3Store(s3URL string, totalSizeLimit int64, localIDs func() ([]string, error)) (*Store, error) {
func NewS3Store(s3URL string, totalSizeLimit int64, attachmentsWithSizes func() (map[string]int64, error)) (*Store, error) {
config, err := s3.ParseURL(s3URL)
if err != nil {
return nil, err
}
return newStore(newS3Backend(s3.New(config)), totalSizeLimit, localIDs)
return newStore(newS3Backend(s3.New(config)), totalSizeLimit, attachmentsWithSizes)
}
func newStore(backend backend, totalSizeLimit int64, localIDs func() ([]string, error)) (*Store, error) {
func newStore(backend backend, totalSizeLimit int64, attachmentsWithSizes func() (map[string]int64, error)) (*Store, error) {
c := &Store{
backend: backend,
limit: totalSizeLimit,
sizes: make(map[string]int64),
localIDs: localIDs,
closeChan: make(chan struct{}),
backend: backend,
limit: totalSizeLimit,
sizes: make(map[string]int64),
attachmentsWithSizes: attachmentsWithSizes,
closeChan: make(chan struct{}),
}
if localIDs != nil {
// Hydrate sizes from the database immediately so that Size()/Remaining()/Remove()
// are accurate from the start, without waiting for the first sync() call.
if attachmentsWithSizes != nil {
attachments, err := attachmentsWithSizes()
if err != nil {
return nil, fmt.Errorf("attachment store: failed to load existing attachments: %w", err)
}
for id, size := range attachments {
c.sizes[id] = size
c.size += size
}
go c.syncLoop()
}
return c, nil
@@ -76,7 +82,7 @@ func newStore(backend backend, totalSizeLimit int64, localIDs func() ([]string,
// from the client's Content-Length header; backends may use it to optimize uploads (e.g.
// streaming directly to S3 without buffering).
func (c *Store) Write(id string, reader io.Reader, untrustedLength int64, limiters ...util.Limiter) (int64, error) {
if !fileIDRegex.MatchString(id) {
if !model.ValidMessageID(id) {
return 0, errInvalidFileID
}
log.Tag(tagStore).Field("message_id", id).Debug("Writing attachment")
@@ -97,7 +103,7 @@ func (c *Store) Write(id string, reader io.Reader, untrustedLength int64, limite
// Read retrieves an attachment file by ID
func (c *Store) Read(id string) (io.ReadCloser, int64, error) {
if !fileIDRegex.MatchString(id) {
if !model.ValidMessageID(id) {
return nil, 0, errInvalidFileID
}
return c.backend.Get(id)
@@ -108,7 +114,7 @@ func (c *Store) Read(id string) (io.ReadCloser, int64, error) {
// started and before the first sync) are corrected by the next sync() call.
func (c *Store) Remove(ids ...string) error {
for _, id := range ids {
if !fileIDRegex.MatchString(id) {
if !model.ValidMessageID(id) {
return errInvalidFileID
}
}
@@ -136,18 +142,14 @@ func (c *Store) Remove(ids ...string) error {
// sync reconciles the backend storage with the database. It lists all objects,
// deletes orphans (not in the valid ID set and older than 1 hour), and recomputes
// the total size from the remaining objects.
// the total size from the existing attachments in the database.
func (c *Store) sync() error {
if c.localIDs == nil {
if c.attachmentsWithSizes == nil {
return nil
}
localIDs, err := c.localIDs()
attachmentsWithSizes, err := c.attachmentsWithSizes()
if err != nil {
return fmt.Errorf("attachment sync: failed to get valid IDs: %w", err)
}
localIDMap := make(map[string]struct{}, len(localIDs))
for _, id := range localIDs {
localIDMap[id] = struct{}{}
return fmt.Errorf("attachment sync: failed to get existing attachments: %w", err)
}
remoteObjects, err := c.backend.List()
if err != nil {
@@ -157,23 +159,23 @@ func (c *Store) sync() error {
// than the grace period to account for races, and skipping objects with invalid IDs.
cutoff := time.Now().Add(-orphanGracePeriod)
var orphanIDs []string
var count, size int64
var count, totalSize int64
sizes := make(map[string]int64, len(remoteObjects))
for _, obj := range remoteObjects {
if !fileIDRegex.MatchString(obj.ID) {
if !model.ValidMessageID(obj.ID) {
continue
}
if _, ok := localIDMap[obj.ID]; !ok && obj.LastModified.Before(cutoff) {
if _, ok := attachmentsWithSizes[obj.ID]; !ok && obj.LastModified.Before(cutoff) {
orphanIDs = append(orphanIDs, obj.ID)
} else {
count++
size += obj.Size
sizes[obj.ID] = obj.Size
totalSize += attachmentsWithSizes[obj.ID]
sizes[obj.ID] = attachmentsWithSizes[obj.ID]
}
}
log.Tag(tagStore).Debug("Attachment store updated: %d attachment(s), %s", count, util.FormatSizeHuman(size))
log.Tag(tagStore).Debug("Attachment store updated: %d attachment(s), %s", count, util.FormatSizeHuman(totalSize))
c.mu.Lock()
c.size = size
c.size = totalSize
c.sizes = sizes
c.mu.Unlock()
// Delete orphaned attachments

View File

@@ -162,9 +162,9 @@ func TestStore_SyncRecomputesSize(t *testing.T) {
s.mu.Unlock()
require.Equal(t, int64(999), s.Size())
// Set localIDs to include both files so nothing gets deleted
s.localIDs = func() ([]string, error) {
return []string{"abcdefghijk0", "abcdefghijk1"}, nil
// Set attachmentsWithSizes to include both files so nothing gets deleted
s.attachmentsWithSizes = func() (map[string]int64, error) {
return map[string]int64{"abcdefghijk0": 100, "abcdefghijk1": 200}, nil
}
// Sync should recompute size from the backend
@@ -280,8 +280,8 @@ func TestStore_Sync(t *testing.T) {
require.Equal(t, int64(15), s.Size())
// Set the attachments provider to only know about files 0 and 2
s.localIDs = func() ([]string, error) {
return []string{"abcdefghijk0", "abcdefghijk2"}, nil
s.attachmentsWithSizes = func() (map[string]int64, error) {
return map[string]int64{"abcdefghijk0": 5, "abcdefghijk2": 5}, nil
}
// Make file 1 old enough to be cleaned up
@@ -314,8 +314,8 @@ func TestStore_Sync_SkipsRecentFiles(t *testing.T) {
require.Nil(t, err)
// Set the attachments provider to return an empty map (no active attachments)
s.localIDs = func() ([]string, error) {
return []string{}, nil
s.attachmentsWithSizes = func() (map[string]int64, error) {
return map[string]int64{}, nil
}
// File was just created, so it should NOT be deleted (< 1 hour old)

View File

@@ -12,7 +12,7 @@ and the [ntfy Android app](https://github.com/binwiederhier/ntfy-android/release
Please check out the release notes for [upcoming releases](#not-released-yet) below.
### ntfy server v2.19.2
## ntfy server v2.19.2
Released March 16, 2026
This is another small bugfix release for PostgreSQL, avoiding races between primary and read replica, as well as to
@@ -1800,6 +1800,19 @@ and the [ntfy Android app](https://github.com/binwiederhier/ntfy-android/release
### ntfy server v2.20.x (UNRELEASED)
This release is another step towards making it possible to help scale ntfy up and out 🔥! With this release, you can store
attachments in an S3-compatible object store as an alternative to the local attachment cache directory. See [attachment store](config.md#attachments)
for details.
!!! warning
With this release, ntfy will take full control over the attachment directory or S3 bucket. Files/objects in the configured `attachment-cache-dir`
that match the message ID format (12 characters, matching `^[-_A-Za-z0-9]{12}$`) and have no entry in the message database will be deleted.
**Do not use a directory or S3 bucket as `attachment-cache-dir` that is also used for something else.**
This is a small behavioral change that was necessary because the old logic often left attachments behind and would not clean them
up. Unless you have re-used the attachment directory for anything else (which is hopefully never done), this should not affect
you at all.
**Features:**
* Add S3-compatible object storage as an alternative [attachment store](config.md#attachments) via `attachment-cache-dir` config option

View File

@@ -43,10 +43,10 @@ type queries struct {
selectAttachmentsExpired string
selectAttachmentsSizeBySender string
selectAttachmentsSizeByUserID string
selectAttachmentsWithSizes string
selectStats string
updateStats string
updateMessageTime string
selectAttachmentIDs string
}
// Cache stores published messages
@@ -246,7 +246,7 @@ func (c *Cache) MessagesDue() ([]*model.Message, error) {
return readMessages(rows)
}
// MessagesExpired returns a list of IDs for messages that have expired (should be deleted)
// MessagesExpired returns a list of message IDs that have expired and should be deleted
func (c *Cache) MessagesExpired() ([]string, error) {
rows, err := c.db.Query(c.queries.selectMessagesExpired, time.Now().Unix())
if err != nil {
@@ -262,10 +262,10 @@ func (c *Cache) Message(id string) (*model.Message, error) {
if err != nil {
return nil, err
}
defer rows.Close()
if !rows.Next() {
return nil, model.ErrMessageNotFound
}
defer rows.Close()
return readMessage(rows)
}
@@ -363,16 +363,6 @@ func (c *Cache) ExpireMessages(topics ...string) error {
})
}
// AttachmentIDs returns message IDs with active (non-expired, non-deleted) attachments
func (c *Cache) AttachmentIDs() ([]string, error) {
rows, err := c.db.ReadOnly().Query(c.queries.selectAttachmentIDs, time.Now().Unix())
if err != nil {
return nil, err
}
defer rows.Close()
return readStrings(rows)
}
// AttachmentsExpired returns message IDs with expired attachments that have not been deleted
func (c *Cache) AttachmentsExpired() ([]string, error) {
rows, err := c.db.Query(c.queries.selectAttachmentsExpired, time.Now().Unix())
@@ -415,6 +405,30 @@ func (c *Cache) AttachmentBytesUsedByUser(userID string) (int64, error) {
return c.readAttachmentBytesUsed(rows)
}
// AttachmentsWithSizes returns a map of message ID to attachment size for all active
// (non-expired, non-deleted) attachments. This is used to hydrate the attachment store's
// size tracking on startup and during periodic sync.
func (c *Cache) AttachmentsWithSizes() (map[string]int64, error) {
rows, err := c.db.ReadOnly().Query(c.queries.selectAttachmentsWithSizes, time.Now().Unix())
if err != nil {
return nil, err
}
defer rows.Close()
attachments := make(map[string]int64)
for rows.Next() {
var id string
var size int64
if err := rows.Scan(&id, &size); err != nil {
return nil, err
}
attachments[id] = size
}
if err := rows.Err(); err != nil {
return nil, err
}
return attachments, nil
}
func (c *Cache) readAttachmentBytesUsed(rows *sql.Rows) (int64, error) {
defer rows.Close()
var size int64

View File

@@ -70,12 +70,11 @@ const (
postgresSelectAttachmentsExpiredQuery = `SELECT mid FROM message WHERE attachment_expires > 0 AND attachment_expires <= $1 AND attachment_deleted = FALSE`
postgresSelectAttachmentsSizeBySenderQuery = `SELECT COALESCE(SUM(attachment_size), 0) FROM message WHERE user_id = '' AND sender = $1 AND attachment_expires >= $2`
postgresSelectAttachmentsSizeByUserIDQuery = `SELECT COALESCE(SUM(attachment_size), 0) FROM message WHERE user_id = $1 AND attachment_expires >= $2`
postgresSelectAttachmentsWithSizesQuery = `SELECT mid, attachment_size FROM message WHERE attachment_expires > $1 AND attachment_deleted = FALSE`
postgresSelectStatsQuery = `SELECT value FROM message_stats WHERE key = 'messages'`
postgresUpdateStatsQuery = `UPDATE message_stats SET value = $1 WHERE key = 'messages'`
postgresUpdateMessageTimeQuery = `UPDATE message SET time = $1 WHERE mid = $2`
postgresSelectAttachmentIDsQuery = `SELECT mid FROM message WHERE attachment_expires > $1 AND attachment_deleted = FALSE`
)
var postgresQueries = queries{
@@ -99,10 +98,10 @@ var postgresQueries = queries{
selectAttachmentsExpired: postgresSelectAttachmentsExpiredQuery,
selectAttachmentsSizeBySender: postgresSelectAttachmentsSizeBySenderQuery,
selectAttachmentsSizeByUserID: postgresSelectAttachmentsSizeByUserIDQuery,
selectAttachmentsWithSizes: postgresSelectAttachmentsWithSizesQuery,
selectStats: postgresSelectStatsQuery,
updateStats: postgresUpdateStatsQuery,
updateMessageTime: postgresUpdateMessageTimeQuery,
selectAttachmentIDs: postgresSelectAttachmentIDsQuery,
}
// NewPostgresStore creates a new PostgreSQL-backed message cache store using an existing database connection pool.

View File

@@ -5,6 +5,7 @@ import (
"fmt"
"heckel.io/ntfy/v2/db"
"heckel.io/ntfy/v2/log"
)
// Initial PostgreSQL schema
@@ -41,6 +42,7 @@ const (
CREATE INDEX IF NOT EXISTS idx_message_sequence_id ON message (sequence_id);
CREATE INDEX IF NOT EXISTS idx_message_topic_published_time ON message (topic, published, time, id);
CREATE INDEX IF NOT EXISTS idx_message_published_expires ON message (published, expires);
CREATE INDEX IF NOT EXISTS idx_message_attachment_expires ON message (attachment_expires) WHERE attachment_deleted = FALSE;
CREATE INDEX IF NOT EXISTS idx_message_sender_attachment_expires ON message (sender, attachment_expires) WHERE user_id = '';
CREATE INDEX IF NOT EXISTS idx_message_user_id_attachment_expires ON message (user_id, attachment_expires);
CREATE TABLE IF NOT EXISTS message_stats (
@@ -57,21 +59,57 @@ const (
// PostgreSQL schema management queries
const (
postgresCurrentSchemaVersion = 14
postgresCurrentSchemaVersion = 15
postgresInsertSchemaVersionQuery = `INSERT INTO schema_version (store, version) VALUES ('message', $1)`
postgresUpdateSchemaVersionQuery = `UPDATE schema_version SET version = $1 WHERE store = 'message'`
postgresSelectSchemaVersionQuery = `SELECT version FROM schema_version WHERE store = 'message'`
)
func setupPostgres(db *sql.DB) error {
// PostgreSQL schema migrations
const (
// 14 -> 15
postgresMigrate14To15CreateIndexQuery = `
CREATE INDEX IF NOT EXISTS idx_message_attachment_expires ON message (attachment_expires) WHERE attachment_deleted = FALSE;
`
)
var postgresMigrations = map[int]func(db *sql.DB) error{
14: postgresMigrateFrom14,
}
func setupPostgres(sqlDB *sql.DB) error {
var schemaVersion int
if err := db.QueryRow(postgresSelectSchemaVersionQuery).Scan(&schemaVersion); err != nil {
return setupNewPostgresDB(db)
if err := sqlDB.QueryRow(postgresSelectSchemaVersionQuery).Scan(&schemaVersion); err != nil {
return setupNewPostgresDB(sqlDB)
} else if schemaVersion == postgresCurrentSchemaVersion {
return nil
} else if schemaVersion > postgresCurrentSchemaVersion {
return fmt.Errorf("unexpected schema version: version %d is higher than current version %d", schemaVersion, postgresCurrentSchemaVersion)
}
for i := schemaVersion; i < postgresCurrentSchemaVersion; i++ {
fn, ok := postgresMigrations[i]
if !ok {
return fmt.Errorf("cannot find migration step from schema version %d to %d", i, i+1)
} else if err := fn(sqlDB); err != nil {
return err
}
}
return nil
}
func postgresMigrateFrom14(sqlDB *sql.DB) error {
log.Tag(tagMessageCache).Info("Migrating message cache database schema: from 14 to 15")
return db.ExecTx(sqlDB, func(tx *sql.Tx) error {
if _, err := tx.Exec(postgresMigrate14To15CreateIndexQuery); err != nil {
return err
}
if _, err := tx.Exec(postgresUpdateSchemaVersionQuery, 15); err != nil {
return err
}
return nil
})
}
func setupNewPostgresDB(sqlDB *sql.DB) error {
return db.ExecTx(sqlDB, func(tx *sql.Tx) error {
if _, err := tx.Exec(postgresCreateTablesQuery); err != nil {

View File

@@ -73,12 +73,11 @@ const (
sqliteSelectAttachmentsExpiredQuery = `SELECT mid FROM messages WHERE attachment_expires > 0 AND attachment_expires <= ? AND attachment_deleted = 0`
sqliteSelectAttachmentsSizeBySenderQuery = `SELECT IFNULL(SUM(attachment_size), 0) FROM messages WHERE user = '' AND sender = ? AND attachment_expires >= ?`
sqliteSelectAttachmentsSizeByUserIDQuery = `SELECT IFNULL(SUM(attachment_size), 0) FROM messages WHERE user = ? AND attachment_expires >= ?`
sqliteSelectAttachmentsWithSizesQuery = `SELECT mid, attachment_size FROM messages WHERE attachment_expires > ? AND attachment_deleted = 0`
sqliteSelectStatsQuery = `SELECT value FROM stats WHERE key = 'messages'`
sqliteUpdateStatsQuery = `UPDATE stats SET value = ? WHERE key = 'messages'`
sqliteUpdateMessageTimeQuery = `UPDATE messages SET time = ? WHERE mid = ?`
sqliteSelectAttachmentIDsQuery = `SELECT mid FROM messages WHERE attachment_expires > ? AND attachment_deleted = 0`
)
var sqliteQueries = queries{
@@ -102,10 +101,10 @@ var sqliteQueries = queries{
selectAttachmentsExpired: sqliteSelectAttachmentsExpiredQuery,
selectAttachmentsSizeBySender: sqliteSelectAttachmentsSizeBySenderQuery,
selectAttachmentsSizeByUserID: sqliteSelectAttachmentsSizeByUserIDQuery,
selectAttachmentsWithSizes: sqliteSelectAttachmentsWithSizesQuery,
selectStats: sqliteSelectStatsQuery,
updateStats: sqliteUpdateStatsQuery,
updateMessageTime: sqliteUpdateMessageTimeQuery,
selectAttachmentIDs: sqliteSelectAttachmentIDsQuery,
}
// NewSQLiteStore creates a SQLite file-backed cache

View File

@@ -57,7 +57,7 @@ const (
// Schema version management for SQLite
const (
sqliteCurrentSchemaVersion = 14
sqliteCurrentSchemaVersion = 15
sqliteCreateSchemaVersionTableQuery = `
CREATE TABLE IF NOT EXISTS schemaVersion (
id INT PRIMARY KEY,
@@ -208,6 +208,7 @@ var (
11: sqliteMigrateFrom11,
12: sqliteMigrateFrom12,
13: sqliteMigrateFrom13,
14: sqliteMigrateFrom14,
}
)
@@ -451,3 +452,15 @@ func sqliteMigrateFrom13(sqlDB *sql.DB, _ time.Duration) error {
return nil
})
}
// sqliteMigrateFrom14 only bumps the schema version to 15; the corresponding Postgres
// migration adds idx_message_attachment_expires, which SQLite already has from the initial schema.
func sqliteMigrateFrom14(sqlDB *sql.DB, _ time.Duration) error {
log.Tag(tagMessageCache).Info("Migrating cache database schema: from 14 to 15")
return db.ExecTx(sqlDB, func(tx *sql.Tx) error {
if _, err := tx.Exec(sqliteUpdateSchemaVersionQuery, 15); err != nil {
return err
}
return nil
})
}

View File

@@ -19,8 +19,8 @@ const (
PollRequestEvent = "poll_request"
)
// MessageIDLength is the length of a randomly generated message ID
const MessageIDLength = 12
// messageIDLength is the length of a randomly generated message ID
const messageIDLength = 12
// Errors for message operations
var (
@@ -133,10 +133,20 @@ func NewAction() *Action {
}
}
// GenerateMessageID creates a new random message ID
func GenerateMessageID() string {
return util.RandomString(messageIDLength)
}
// ValidMessageID returns true if the given string is a valid message ID
func ValidMessageID(s string) bool {
return util.ValidRandomString(s, messageIDLength)
}
// NewMessage creates a new message with the current timestamp
func NewMessage(event, topic, msg string) *Message {
return &Message{
ID: util.RandomString(MessageIDLength),
ID: GenerateMessageID(),
Time: time.Now().Unix(),
Event: event,
Topic: topic,
@@ -173,11 +183,6 @@ func NewPollRequestMessage(topic, pollID string) *Message {
return m
}
// ValidMessageID returns true if the given string is a valid message ID
func ValidMessageID(s string) bool {
return util.ValidRandomString(s, MessageIDLength)
}
// SinceMarker represents a point in time or message ID from which to retrieve messages
type SinceMarker struct {
time time.Time

View File

@@ -301,13 +301,10 @@ func createMessageCache(conf *Config, pool *db.DB) (*message.Cache, error) {
}
func createAttachmentStore(conf *Config, messageCache *message.Cache) (*attachment.Store, error) {
attachmentIDs := func() ([]string, error) {
return messageCache.AttachmentIDs()
}
if strings.HasPrefix(conf.AttachmentCacheDir, "s3://") {
return attachment.NewS3Store(conf.AttachmentCacheDir, conf.AttachmentTotalSizeLimit, attachmentIDs)
return attachment.NewS3Store(conf.AttachmentCacheDir, conf.AttachmentTotalSizeLimit, messageCache.AttachmentsWithSizes)
} else if conf.AttachmentCacheDir != "" {
return attachment.NewFileStore(conf.AttachmentCacheDir, conf.AttachmentTotalSizeLimit, attachmentIDs)
return attachment.NewFileStore(conf.AttachmentCacheDir, conf.AttachmentTotalSizeLimit, messageCache.AttachmentsWithSizes)
}
return nil, nil
}
@@ -1429,6 +1426,9 @@ func (s *Server) handleBodyAsAttachment(r *http.Request, v *visitor, m *model.Me
return err
}
attachmentExpiry := time.Now().Add(vinfo.Limits.AttachmentExpiryDuration).Unix()
if m.Expires > 0 && attachmentExpiry > m.Expires {
attachmentExpiry = m.Expires // Attachment must never outlive the message
}
if m.Time > attachmentExpiry {
return errHTTPBadRequestAttachmentsExpiryBeforeDelivery.With(m)
}

View File

@@ -3,7 +3,6 @@ package server
import (
"heckel.io/ntfy/v2/log"
"heckel.io/ntfy/v2/util"
"strings"
)
func (s *Server) execManager() {
@@ -151,11 +150,11 @@ func (s *Server) pruneAttachments() {
log.Tag(tagManager).Err(err).Warn("Error retrieving expired attachments")
} else if len(ids) > 0 {
if log.Tag(tagManager).IsDebug() {
log.Tag(tagManager).Debug("Deleting attachments %s", strings.Join(ids, ", "))
}
if err := s.attachment.Remove(ids...); err != nil {
log.Tag(tagManager).Err(err).Warn("Error deleting attachments")
log.Tag(tagManager).Debug("Marking %d expired attachment(s) as deleted", len(ids))
}
// Only mark as deleted in DB. The actual storage files are cleaned up
// by the attachment store's sync() loop, which periodically reconciles
// storage with the database and removes orphaned files.
if err := s.messageCache.MarkAttachmentsDeleted(ids...); err != nil {
log.Tag(tagManager).Err(err).Warn("Error marking attachments deleted")
}
@@ -174,13 +173,11 @@ func (s *Server) pruneMessages() {
if err != nil {
log.Tag(tagManager).Err(err).Warn("Error retrieving expired messages")
} else if len(expiredMessageIDs) > 0 {
if s.attachment != nil {
if err := s.attachment.Remove(expiredMessageIDs...); err != nil {
log.Tag(tagManager).Err(err).Warn("Error deleting attachments for expired messages")
}
}
// Only delete DB rows. Attachment storage files are cleaned up by the
// attachment store's sync() loop, which periodically reconciles storage
// with the database and removes orphaned files.
if err := s.messageCache.DeleteMessages(expiredMessageIDs...); err != nil {
log.Tag(tagManager).Err(err).Warn("Error marking attachments deleted")
log.Tag(tagManager).Err(err).Warn("Error deleting expired messages")
}
} else {
log.Tag(tagManager).Debug("No expired messages to delete")

18
web/package-lock.json generated
View File

@@ -9514,24 +9514,6 @@
"dev": true,
"license": "ISC"
},
"node_modules/yaml": {
"version": "2.8.3",
"resolved": "https://registry.npmjs.org/yaml/-/yaml-2.8.3.tgz",
"integrity": "sha512-AvbaCLOO2Otw/lW5bmh9d/WEdcDFdQp2Z2ZUH3pX9U2ihyUY0nvLv7J6TrWowklRGPYbB/IuIMfYgxaCPg5Bpg==",
"dev": true,
"license": "ISC",
"optional": true,
"peer": true,
"bin": {
"yaml": "bin.mjs"
},
"engines": {
"node": ">= 14.6"
},
"funding": {
"url": "https://github.com/sponsors/eemeli"
}
},
"node_modules/yocto-queue": {
"version": "0.1.0",
"resolved": "https://registry.npmjs.org/yocto-queue/-/yocto-queue-0.1.0.tgz",