mirror of
https://github.com/binwiederhier/ntfy.git
synced 2026-03-25 08:41:01 +01:00
Compare commits
6 Commits
attachment
...
attachment
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e55d1cee6b | ||
|
|
dd760f16f7 | ||
|
|
d00277107a | ||
|
|
b95efe8dd3 | ||
|
|
075f2ffa15 | ||
|
|
2f6a044c34 |
@@ -4,7 +4,6 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"regexp"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@@ -20,52 +19,59 @@ const (
|
||||
orphanGracePeriod = time.Hour // Don't delete orphaned objects younger than this to avoid races with in-flight uploads
|
||||
)
|
||||
|
||||
var (
|
||||
fileIDRegex = regexp.MustCompile(fmt.Sprintf(`^[-_A-Za-z0-9]{%d}$`, model.MessageIDLength))
|
||||
errInvalidFileID = errors.New("invalid file ID")
|
||||
)
|
||||
var errInvalidFileID = errors.New("invalid file ID")
|
||||
|
||||
// Store manages attachment storage with shared logic for size tracking, limiting,
|
||||
// ID validation, and background sync to reconcile storage with the database.
|
||||
type Store struct {
|
||||
backend backend
|
||||
limit int64 // Defined limit of the store in bytes
|
||||
size int64 // Current size of the store in bytes
|
||||
sizes map[string]int64 // File ID -> size, for subtracting on Remove
|
||||
localIDs func() ([]string, error) // Returns file IDs that should exist locally, used for sync()
|
||||
closeChan chan struct{}
|
||||
mu sync.RWMutex // Protects size and sizes
|
||||
backend backend
|
||||
limit int64 // Defined limit of the store in bytes
|
||||
size int64 // Current size of the store in bytes
|
||||
sizes map[string]int64 // File ID -> size, for subtracting on Remove
|
||||
attachmentsWithSizes func() (map[string]int64, error) // Returns file ID -> size for active attachments
|
||||
closeChan chan struct{}
|
||||
mu sync.RWMutex // Protects size and sizes
|
||||
}
|
||||
|
||||
// NewFileStore creates a new file-system backed attachment cache
|
||||
func NewFileStore(dir string, totalSizeLimit int64, localIDsFn func() ([]string, error)) (*Store, error) {
|
||||
func NewFileStore(dir string, totalSizeLimit int64, attachmentsWithSizes func() (map[string]int64, error)) (*Store, error) {
|
||||
b, err := newFileBackend(dir)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return newStore(b, totalSizeLimit, localIDsFn)
|
||||
return newStore(b, totalSizeLimit, attachmentsWithSizes)
|
||||
}
|
||||
|
||||
// NewS3Store creates a new S3-backed attachment cache. The s3URL must be in the format:
|
||||
//
|
||||
// s3://ACCESS_KEY:SECRET_KEY@BUCKET[/PREFIX]?region=REGION[&endpoint=ENDPOINT]
|
||||
func NewS3Store(s3URL string, totalSizeLimit int64, localIDs func() ([]string, error)) (*Store, error) {
|
||||
func NewS3Store(s3URL string, totalSizeLimit int64, attachmentsWithSizes func() (map[string]int64, error)) (*Store, error) {
|
||||
config, err := s3.ParseURL(s3URL)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return newStore(newS3Backend(s3.New(config)), totalSizeLimit, localIDs)
|
||||
return newStore(newS3Backend(s3.New(config)), totalSizeLimit, attachmentsWithSizes)
|
||||
}
|
||||
|
||||
func newStore(backend backend, totalSizeLimit int64, localIDs func() ([]string, error)) (*Store, error) {
|
||||
func newStore(backend backend, totalSizeLimit int64, attachmentsWithSizes func() (map[string]int64, error)) (*Store, error) {
|
||||
c := &Store{
|
||||
backend: backend,
|
||||
limit: totalSizeLimit,
|
||||
sizes: make(map[string]int64),
|
||||
localIDs: localIDs,
|
||||
closeChan: make(chan struct{}),
|
||||
backend: backend,
|
||||
limit: totalSizeLimit,
|
||||
sizes: make(map[string]int64),
|
||||
attachmentsWithSizes: attachmentsWithSizes,
|
||||
closeChan: make(chan struct{}),
|
||||
}
|
||||
if localIDs != nil {
|
||||
// Hydrate sizes from the database immediately so that Size()/Remaining()/Remove()
|
||||
// are accurate from the start, without waiting for the first sync() call.
|
||||
if attachmentsWithSizes != nil {
|
||||
attachments, err := attachmentsWithSizes()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("attachment store: failed to load existing attachments: %w", err)
|
||||
}
|
||||
for id, size := range attachments {
|
||||
c.sizes[id] = size
|
||||
c.size += size
|
||||
}
|
||||
go c.syncLoop()
|
||||
}
|
||||
return c, nil
|
||||
@@ -76,7 +82,7 @@ func newStore(backend backend, totalSizeLimit int64, localIDs func() ([]string,
|
||||
// from the client's Content-Length header; backends may use it to optimize uploads (e.g.
|
||||
// streaming directly to S3 without buffering).
|
||||
func (c *Store) Write(id string, reader io.Reader, untrustedLength int64, limiters ...util.Limiter) (int64, error) {
|
||||
if !fileIDRegex.MatchString(id) {
|
||||
if !model.ValidMessageID(id) {
|
||||
return 0, errInvalidFileID
|
||||
}
|
||||
log.Tag(tagStore).Field("message_id", id).Debug("Writing attachment")
|
||||
@@ -97,7 +103,7 @@ func (c *Store) Write(id string, reader io.Reader, untrustedLength int64, limite
|
||||
|
||||
// Read retrieves an attachment file by ID
|
||||
func (c *Store) Read(id string) (io.ReadCloser, int64, error) {
|
||||
if !fileIDRegex.MatchString(id) {
|
||||
if !model.ValidMessageID(id) {
|
||||
return nil, 0, errInvalidFileID
|
||||
}
|
||||
return c.backend.Get(id)
|
||||
@@ -108,7 +114,7 @@ func (c *Store) Read(id string) (io.ReadCloser, int64, error) {
|
||||
// started and before the first sync) are corrected by the next sync() call.
|
||||
func (c *Store) Remove(ids ...string) error {
|
||||
for _, id := range ids {
|
||||
if !fileIDRegex.MatchString(id) {
|
||||
if !model.ValidMessageID(id) {
|
||||
return errInvalidFileID
|
||||
}
|
||||
}
|
||||
@@ -136,18 +142,14 @@ func (c *Store) Remove(ids ...string) error {
|
||||
|
||||
// sync reconciles the backend storage with the database. It lists all objects,
|
||||
// deletes orphans (not in the valid ID set and older than 1 hour), and recomputes
|
||||
// the total size from the remaining objects.
|
||||
// the total size from the existing attachments in the database.
|
||||
func (c *Store) sync() error {
|
||||
if c.localIDs == nil {
|
||||
if c.attachmentsWithSizes == nil {
|
||||
return nil
|
||||
}
|
||||
localIDs, err := c.localIDs()
|
||||
attachmentsWithSizes, err := c.attachmentsWithSizes()
|
||||
if err != nil {
|
||||
return fmt.Errorf("attachment sync: failed to get valid IDs: %w", err)
|
||||
}
|
||||
localIDMap := make(map[string]struct{}, len(localIDs))
|
||||
for _, id := range localIDs {
|
||||
localIDMap[id] = struct{}{}
|
||||
return fmt.Errorf("attachment sync: failed to get existing attachments: %w", err)
|
||||
}
|
||||
remoteObjects, err := c.backend.List()
|
||||
if err != nil {
|
||||
@@ -157,23 +159,23 @@ func (c *Store) sync() error {
|
||||
// than the grace period to account for races, and skipping objects with invalid IDs.
|
||||
cutoff := time.Now().Add(-orphanGracePeriod)
|
||||
var orphanIDs []string
|
||||
var count, size int64
|
||||
var count, totalSize int64
|
||||
sizes := make(map[string]int64, len(remoteObjects))
|
||||
for _, obj := range remoteObjects {
|
||||
if !fileIDRegex.MatchString(obj.ID) {
|
||||
if !model.ValidMessageID(obj.ID) {
|
||||
continue
|
||||
}
|
||||
if _, ok := localIDMap[obj.ID]; !ok && obj.LastModified.Before(cutoff) {
|
||||
if _, ok := attachmentsWithSizes[obj.ID]; !ok && obj.LastModified.Before(cutoff) {
|
||||
orphanIDs = append(orphanIDs, obj.ID)
|
||||
} else {
|
||||
count++
|
||||
size += obj.Size
|
||||
sizes[obj.ID] = obj.Size
|
||||
totalSize += attachmentsWithSizes[obj.ID]
|
||||
sizes[obj.ID] = attachmentsWithSizes[obj.ID]
|
||||
}
|
||||
}
|
||||
log.Tag(tagStore).Debug("Attachment store updated: %d attachment(s), %s", count, util.FormatSizeHuman(size))
|
||||
log.Tag(tagStore).Debug("Attachment store updated: %d attachment(s), %s", count, util.FormatSizeHuman(totalSize))
|
||||
c.mu.Lock()
|
||||
c.size = size
|
||||
c.size = totalSize
|
||||
c.sizes = sizes
|
||||
c.mu.Unlock()
|
||||
// Delete orphaned attachments
|
||||
|
||||
@@ -162,9 +162,9 @@ func TestStore_SyncRecomputesSize(t *testing.T) {
|
||||
s.mu.Unlock()
|
||||
require.Equal(t, int64(999), s.Size())
|
||||
|
||||
// Set localIDs to include both files so nothing gets deleted
|
||||
s.localIDs = func() ([]string, error) {
|
||||
return []string{"abcdefghijk0", "abcdefghijk1"}, nil
|
||||
// Set attachmentsWithSizes to include both files so nothing gets deleted
|
||||
s.attachmentsWithSizes = func() (map[string]int64, error) {
|
||||
return map[string]int64{"abcdefghijk0": 100, "abcdefghijk1": 200}, nil
|
||||
}
|
||||
|
||||
// Sync should recompute size from the backend
|
||||
@@ -280,8 +280,8 @@ func TestStore_Sync(t *testing.T) {
|
||||
require.Equal(t, int64(15), s.Size())
|
||||
|
||||
// Set the ID provider to only know about file 0 and 2
|
||||
s.localIDs = func() ([]string, error) {
|
||||
return []string{"abcdefghijk0", "abcdefghijk2"}, nil
|
||||
s.attachmentsWithSizes = func() (map[string]int64, error) {
|
||||
return map[string]int64{"abcdefghijk0": 5, "abcdefghijk2": 5}, nil
|
||||
}
|
||||
|
||||
// Make file 1 old enough to be cleaned up
|
||||
@@ -314,8 +314,8 @@ func TestStore_Sync_SkipsRecentFiles(t *testing.T) {
|
||||
require.Nil(t, err)
|
||||
|
||||
// Set the ID provider to return empty (no valid IDs)
|
||||
s.localIDs = func() ([]string, error) {
|
||||
return []string{}, nil
|
||||
s.attachmentsWithSizes = func() (map[string]int64, error) {
|
||||
return map[string]int64{}, nil
|
||||
}
|
||||
|
||||
// File was just created, so it should NOT be deleted (< 1 hour old)
|
||||
|
||||
@@ -12,7 +12,7 @@ and the [ntfy Android app](https://github.com/binwiederhier/ntfy-android/release
|
||||
|
||||
Please check out the release notes for [upcoming releases](#not-released-yet) below.
|
||||
|
||||
### ntfy server v2.19.2
|
||||
## ntfy server v2.19.2
|
||||
Released March 16, 2026
|
||||
|
||||
This is another small bugfix release for PostgreSQL, avoiding races between primary and read replica, as well as to
|
||||
@@ -1800,6 +1800,19 @@ and the [ntfy Android app](https://github.com/binwiederhier/ntfy-android/release
|
||||
|
||||
### ntfy server v2.20.x (UNRELEASED)
|
||||
|
||||
This release is another step towards making it possible to help scale ntfy up and out 🔥! With this release, you can store
|
||||
attachments in an S3-compatible object store as an alterative to the directory. See [attachment store](config.md#attachments)
|
||||
for details.
|
||||
|
||||
!!! warning
|
||||
With this release, ntfy will take full control over the attachment directory or S3 bucket. Files/objects in the configured `attachment-cache-dir`
|
||||
that match the message ID format (12 chars, matching `^[A-Za-z0-9]{12}$`), and have no entries in the message database will be deleted.
|
||||
**Do not use a directory or S3 bucket as `attachment-cache-dir` that is also used for something else.**
|
||||
|
||||
This is a small behavioral change that was necessary because the old logic often left attachments behind and would not clean them
|
||||
up. Unless you have re-used the attachment directory for anything else (which is hopefully never done), this should not affect
|
||||
you at all.
|
||||
|
||||
**Features:**
|
||||
|
||||
* Add S3-compatible object storage as an alternative [attachment store](config.md#attachments) via `attachment-cache-dir` config option
|
||||
|
||||
@@ -43,10 +43,10 @@ type queries struct {
|
||||
selectAttachmentsExpired string
|
||||
selectAttachmentsSizeBySender string
|
||||
selectAttachmentsSizeByUserID string
|
||||
selectAttachmentsWithSizes string
|
||||
selectStats string
|
||||
updateStats string
|
||||
updateMessageTime string
|
||||
selectAttachmentIDs string
|
||||
}
|
||||
|
||||
// Cache stores published messages
|
||||
@@ -246,7 +246,7 @@ func (c *Cache) MessagesDue() ([]*model.Message, error) {
|
||||
return readMessages(rows)
|
||||
}
|
||||
|
||||
// MessagesExpired returns a list of IDs for messages that have expired (should be deleted)
|
||||
// MessagesExpired returns a list of message IDs that have expired and should be deleted
|
||||
func (c *Cache) MessagesExpired() ([]string, error) {
|
||||
rows, err := c.db.Query(c.queries.selectMessagesExpired, time.Now().Unix())
|
||||
if err != nil {
|
||||
@@ -262,10 +262,10 @@ func (c *Cache) Message(id string) (*model.Message, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
if !rows.Next() {
|
||||
return nil, model.ErrMessageNotFound
|
||||
}
|
||||
defer rows.Close()
|
||||
return readMessage(rows)
|
||||
}
|
||||
|
||||
@@ -363,16 +363,6 @@ func (c *Cache) ExpireMessages(topics ...string) error {
|
||||
})
|
||||
}
|
||||
|
||||
// AttachmentIDs returns message IDs with active (non-expired, non-deleted) attachments
|
||||
func (c *Cache) AttachmentIDs() ([]string, error) {
|
||||
rows, err := c.db.ReadOnly().Query(c.queries.selectAttachmentIDs, time.Now().Unix())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
return readStrings(rows)
|
||||
}
|
||||
|
||||
// AttachmentsExpired returns message IDs with expired attachments that have not been deleted
|
||||
func (c *Cache) AttachmentsExpired() ([]string, error) {
|
||||
rows, err := c.db.Query(c.queries.selectAttachmentsExpired, time.Now().Unix())
|
||||
@@ -415,6 +405,30 @@ func (c *Cache) AttachmentBytesUsedByUser(userID string) (int64, error) {
|
||||
return c.readAttachmentBytesUsed(rows)
|
||||
}
|
||||
|
||||
// AttachmentsWithSizes returns a map of message ID to attachment size for all active
|
||||
// (non-expired, non-deleted) attachments. This is used to hydrate the attachment store's
|
||||
// size tracking on startup and during periodic sync.
|
||||
func (c *Cache) AttachmentsWithSizes() (map[string]int64, error) {
|
||||
rows, err := c.db.ReadOnly().Query(c.queries.selectAttachmentsWithSizes, time.Now().Unix())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
attachments := make(map[string]int64)
|
||||
for rows.Next() {
|
||||
var id string
|
||||
var size int64
|
||||
if err := rows.Scan(&id, &size); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
attachments[id] = size
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return attachments, nil
|
||||
}
|
||||
|
||||
func (c *Cache) readAttachmentBytesUsed(rows *sql.Rows) (int64, error) {
|
||||
defer rows.Close()
|
||||
var size int64
|
||||
|
||||
@@ -70,12 +70,11 @@ const (
|
||||
postgresSelectAttachmentsExpiredQuery = `SELECT mid FROM message WHERE attachment_expires > 0 AND attachment_expires <= $1 AND attachment_deleted = FALSE`
|
||||
postgresSelectAttachmentsSizeBySenderQuery = `SELECT COALESCE(SUM(attachment_size), 0) FROM message WHERE user_id = '' AND sender = $1 AND attachment_expires >= $2`
|
||||
postgresSelectAttachmentsSizeByUserIDQuery = `SELECT COALESCE(SUM(attachment_size), 0) FROM message WHERE user_id = $1 AND attachment_expires >= $2`
|
||||
postgresSelectAttachmentsWithSizesQuery = `SELECT mid, attachment_size FROM message WHERE attachment_expires > $1 AND attachment_deleted = FALSE`
|
||||
|
||||
postgresSelectStatsQuery = `SELECT value FROM message_stats WHERE key = 'messages'`
|
||||
postgresUpdateStatsQuery = `UPDATE message_stats SET value = $1 WHERE key = 'messages'`
|
||||
postgresUpdateMessageTimeQuery = `UPDATE message SET time = $1 WHERE mid = $2`
|
||||
|
||||
postgresSelectAttachmentIDsQuery = `SELECT mid FROM message WHERE attachment_expires > $1 AND attachment_deleted = FALSE`
|
||||
)
|
||||
|
||||
var postgresQueries = queries{
|
||||
@@ -99,10 +98,10 @@ var postgresQueries = queries{
|
||||
selectAttachmentsExpired: postgresSelectAttachmentsExpiredQuery,
|
||||
selectAttachmentsSizeBySender: postgresSelectAttachmentsSizeBySenderQuery,
|
||||
selectAttachmentsSizeByUserID: postgresSelectAttachmentsSizeByUserIDQuery,
|
||||
selectAttachmentsWithSizes: postgresSelectAttachmentsWithSizesQuery,
|
||||
selectStats: postgresSelectStatsQuery,
|
||||
updateStats: postgresUpdateStatsQuery,
|
||||
updateMessageTime: postgresUpdateMessageTimeQuery,
|
||||
selectAttachmentIDs: postgresSelectAttachmentIDsQuery,
|
||||
}
|
||||
|
||||
// NewPostgresStore creates a new PostgreSQL-backed message cache store using an existing database connection pool.
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"fmt"
|
||||
|
||||
"heckel.io/ntfy/v2/db"
|
||||
"heckel.io/ntfy/v2/log"
|
||||
)
|
||||
|
||||
// Initial PostgreSQL schema
|
||||
@@ -41,6 +42,7 @@ const (
|
||||
CREATE INDEX IF NOT EXISTS idx_message_sequence_id ON message (sequence_id);
|
||||
CREATE INDEX IF NOT EXISTS idx_message_topic_published_time ON message (topic, published, time, id);
|
||||
CREATE INDEX IF NOT EXISTS idx_message_published_expires ON message (published, expires);
|
||||
CREATE INDEX IF NOT EXISTS idx_message_attachment_expires ON message (attachment_expires) WHERE attachment_deleted = FALSE;
|
||||
CREATE INDEX IF NOT EXISTS idx_message_sender_attachment_expires ON message (sender, attachment_expires) WHERE user_id = '';
|
||||
CREATE INDEX IF NOT EXISTS idx_message_user_id_attachment_expires ON message (user_id, attachment_expires);
|
||||
CREATE TABLE IF NOT EXISTS message_stats (
|
||||
@@ -57,21 +59,57 @@ const (
|
||||
|
||||
// PostgreSQL schema management queries
|
||||
const (
|
||||
postgresCurrentSchemaVersion = 14
|
||||
postgresCurrentSchemaVersion = 15
|
||||
postgresInsertSchemaVersionQuery = `INSERT INTO schema_version (store, version) VALUES ('message', $1)`
|
||||
postgresUpdateSchemaVersionQuery = `UPDATE schema_version SET version = $1 WHERE store = 'message'`
|
||||
postgresSelectSchemaVersionQuery = `SELECT version FROM schema_version WHERE store = 'message'`
|
||||
)
|
||||
|
||||
func setupPostgres(db *sql.DB) error {
|
||||
// PostgreSQL schema migrations
|
||||
const (
|
||||
// 14 -> 15
|
||||
postgresMigrate14To15CreateIndexQuery = `
|
||||
CREATE INDEX IF NOT EXISTS idx_message_attachment_expires ON message (attachment_expires) WHERE attachment_deleted = FALSE;
|
||||
`
|
||||
)
|
||||
|
||||
var postgresMigrations = map[int]func(db *sql.DB) error{
|
||||
14: postgresMigrateFrom14,
|
||||
}
|
||||
|
||||
func setupPostgres(sqlDB *sql.DB) error {
|
||||
var schemaVersion int
|
||||
if err := db.QueryRow(postgresSelectSchemaVersionQuery).Scan(&schemaVersion); err != nil {
|
||||
return setupNewPostgresDB(db)
|
||||
if err := sqlDB.QueryRow(postgresSelectSchemaVersionQuery).Scan(&schemaVersion); err != nil {
|
||||
return setupNewPostgresDB(sqlDB)
|
||||
} else if schemaVersion == postgresCurrentSchemaVersion {
|
||||
return nil
|
||||
} else if schemaVersion > postgresCurrentSchemaVersion {
|
||||
return fmt.Errorf("unexpected schema version: version %d is higher than current version %d", schemaVersion, postgresCurrentSchemaVersion)
|
||||
}
|
||||
for i := schemaVersion; i < postgresCurrentSchemaVersion; i++ {
|
||||
fn, ok := postgresMigrations[i]
|
||||
if !ok {
|
||||
return fmt.Errorf("cannot find migration step from schema version %d to %d", i, i+1)
|
||||
} else if err := fn(sqlDB); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func postgresMigrateFrom14(sqlDB *sql.DB) error {
|
||||
log.Tag(tagMessageCache).Info("Migrating message cache database schema: from 14 to 15")
|
||||
return db.ExecTx(sqlDB, func(tx *sql.Tx) error {
|
||||
if _, err := tx.Exec(postgresMigrate14To15CreateIndexQuery); err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := tx.Exec(postgresUpdateSchemaVersionQuery, 15); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func setupNewPostgresDB(sqlDB *sql.DB) error {
|
||||
return db.ExecTx(sqlDB, func(tx *sql.Tx) error {
|
||||
if _, err := tx.Exec(postgresCreateTablesQuery); err != nil {
|
||||
|
||||
@@ -73,12 +73,11 @@ const (
|
||||
sqliteSelectAttachmentsExpiredQuery = `SELECT mid FROM messages WHERE attachment_expires > 0 AND attachment_expires <= ? AND attachment_deleted = 0`
|
||||
sqliteSelectAttachmentsSizeBySenderQuery = `SELECT IFNULL(SUM(attachment_size), 0) FROM messages WHERE user = '' AND sender = ? AND attachment_expires >= ?`
|
||||
sqliteSelectAttachmentsSizeByUserIDQuery = `SELECT IFNULL(SUM(attachment_size), 0) FROM messages WHERE user = ? AND attachment_expires >= ?`
|
||||
sqliteSelectAttachmentsWithSizesQuery = `SELECT mid, attachment_size FROM messages WHERE attachment_expires > ? AND attachment_deleted = 0`
|
||||
|
||||
sqliteSelectStatsQuery = `SELECT value FROM stats WHERE key = 'messages'`
|
||||
sqliteUpdateStatsQuery = `UPDATE stats SET value = ? WHERE key = 'messages'`
|
||||
sqliteUpdateMessageTimeQuery = `UPDATE messages SET time = ? WHERE mid = ?`
|
||||
|
||||
sqliteSelectAttachmentIDsQuery = `SELECT mid FROM messages WHERE attachment_expires > ? AND attachment_deleted = 0`
|
||||
)
|
||||
|
||||
var sqliteQueries = queries{
|
||||
@@ -102,10 +101,10 @@ var sqliteQueries = queries{
|
||||
selectAttachmentsExpired: sqliteSelectAttachmentsExpiredQuery,
|
||||
selectAttachmentsSizeBySender: sqliteSelectAttachmentsSizeBySenderQuery,
|
||||
selectAttachmentsSizeByUserID: sqliteSelectAttachmentsSizeByUserIDQuery,
|
||||
selectAttachmentsWithSizes: sqliteSelectAttachmentsWithSizesQuery,
|
||||
selectStats: sqliteSelectStatsQuery,
|
||||
updateStats: sqliteUpdateStatsQuery,
|
||||
updateMessageTime: sqliteUpdateMessageTimeQuery,
|
||||
selectAttachmentIDs: sqliteSelectAttachmentIDsQuery,
|
||||
}
|
||||
|
||||
// NewSQLiteStore creates a SQLite file-backed cache
|
||||
|
||||
@@ -57,7 +57,7 @@ const (
|
||||
|
||||
// Schema version management for SQLite
|
||||
const (
|
||||
sqliteCurrentSchemaVersion = 14
|
||||
sqliteCurrentSchemaVersion = 15
|
||||
sqliteCreateSchemaVersionTableQuery = `
|
||||
CREATE TABLE IF NOT EXISTS schemaVersion (
|
||||
id INT PRIMARY KEY,
|
||||
@@ -208,6 +208,7 @@ var (
|
||||
11: sqliteMigrateFrom11,
|
||||
12: sqliteMigrateFrom12,
|
||||
13: sqliteMigrateFrom13,
|
||||
14: sqliteMigrateFrom14,
|
||||
}
|
||||
)
|
||||
|
||||
@@ -451,3 +452,15 @@ func sqliteMigrateFrom13(sqlDB *sql.DB, _ time.Duration) error {
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// sqliteMigrateFrom14 is a no-op; the corresponding Postgres migration adds
|
||||
// idx_message_attachment_expires, which SQLite already has from the initial schema.
|
||||
func sqliteMigrateFrom14(sqlDB *sql.DB, _ time.Duration) error {
|
||||
log.Tag(tagMessageCache).Info("Migrating cache database schema: from 14 to 15")
|
||||
return db.ExecTx(sqlDB, func(tx *sql.Tx) error {
|
||||
if _, err := tx.Exec(sqliteUpdateSchemaVersionQuery, 15); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
@@ -19,8 +19,8 @@ const (
|
||||
PollRequestEvent = "poll_request"
|
||||
)
|
||||
|
||||
// MessageIDLength is the length of a randomly generated message ID
|
||||
const MessageIDLength = 12
|
||||
// messageIDLength is the length of a randomly generated message ID
|
||||
const messageIDLength = 12
|
||||
|
||||
// Errors for message operations
|
||||
var (
|
||||
@@ -133,10 +133,20 @@ func NewAction() *Action {
|
||||
}
|
||||
}
|
||||
|
||||
// GenerateMessageID creates a new random message ID
|
||||
func GenerateMessageID() string {
|
||||
return util.RandomString(messageIDLength)
|
||||
}
|
||||
|
||||
// ValidMessageID returns true if the given string is a valid message ID
|
||||
func ValidMessageID(s string) bool {
|
||||
return util.ValidRandomString(s, messageIDLength)
|
||||
}
|
||||
|
||||
// NewMessage creates a new message with the current timestamp
|
||||
func NewMessage(event, topic, msg string) *Message {
|
||||
return &Message{
|
||||
ID: util.RandomString(MessageIDLength),
|
||||
ID: GenerateMessageID(),
|
||||
Time: time.Now().Unix(),
|
||||
Event: event,
|
||||
Topic: topic,
|
||||
@@ -173,11 +183,6 @@ func NewPollRequestMessage(topic, pollID string) *Message {
|
||||
return m
|
||||
}
|
||||
|
||||
// ValidMessageID returns true if the given string is a valid message ID
|
||||
func ValidMessageID(s string) bool {
|
||||
return util.ValidRandomString(s, MessageIDLength)
|
||||
}
|
||||
|
||||
// SinceMarker represents a point in time or message ID from which to retrieve messages
|
||||
type SinceMarker struct {
|
||||
time time.Time
|
||||
|
||||
@@ -301,13 +301,10 @@ func createMessageCache(conf *Config, pool *db.DB) (*message.Cache, error) {
|
||||
}
|
||||
|
||||
func createAttachmentStore(conf *Config, messageCache *message.Cache) (*attachment.Store, error) {
|
||||
attachmentIDs := func() ([]string, error) {
|
||||
return messageCache.AttachmentIDs()
|
||||
}
|
||||
if strings.HasPrefix(conf.AttachmentCacheDir, "s3://") {
|
||||
return attachment.NewS3Store(conf.AttachmentCacheDir, conf.AttachmentTotalSizeLimit, attachmentIDs)
|
||||
return attachment.NewS3Store(conf.AttachmentCacheDir, conf.AttachmentTotalSizeLimit, messageCache.AttachmentsWithSizes)
|
||||
} else if conf.AttachmentCacheDir != "" {
|
||||
return attachment.NewFileStore(conf.AttachmentCacheDir, conf.AttachmentTotalSizeLimit, attachmentIDs)
|
||||
return attachment.NewFileStore(conf.AttachmentCacheDir, conf.AttachmentTotalSizeLimit, messageCache.AttachmentsWithSizes)
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
@@ -1429,6 +1426,9 @@ func (s *Server) handleBodyAsAttachment(r *http.Request, v *visitor, m *model.Me
|
||||
return err
|
||||
}
|
||||
attachmentExpiry := time.Now().Add(vinfo.Limits.AttachmentExpiryDuration).Unix()
|
||||
if m.Expires > 0 && attachmentExpiry > m.Expires {
|
||||
attachmentExpiry = m.Expires // Attachment must never outlive the message
|
||||
}
|
||||
if m.Time > attachmentExpiry {
|
||||
return errHTTPBadRequestAttachmentsExpiryBeforeDelivery.With(m)
|
||||
}
|
||||
|
||||
@@ -3,7 +3,6 @@ package server
|
||||
import (
|
||||
"heckel.io/ntfy/v2/log"
|
||||
"heckel.io/ntfy/v2/util"
|
||||
"strings"
|
||||
)
|
||||
|
||||
func (s *Server) execManager() {
|
||||
@@ -151,11 +150,11 @@ func (s *Server) pruneAttachments() {
|
||||
log.Tag(tagManager).Err(err).Warn("Error retrieving expired attachments")
|
||||
} else if len(ids) > 0 {
|
||||
if log.Tag(tagManager).IsDebug() {
|
||||
log.Tag(tagManager).Debug("Deleting attachments %s", strings.Join(ids, ", "))
|
||||
}
|
||||
if err := s.attachment.Remove(ids...); err != nil {
|
||||
log.Tag(tagManager).Err(err).Warn("Error deleting attachments")
|
||||
log.Tag(tagManager).Debug("Marking %d expired attachment(s) as deleted", len(ids))
|
||||
}
|
||||
// Only mark as deleted in DB. The actual storage files are cleaned up
|
||||
// by the attachment store's sync() loop, which periodically reconciles
|
||||
// storage with the database and removes orphaned files.
|
||||
if err := s.messageCache.MarkAttachmentsDeleted(ids...); err != nil {
|
||||
log.Tag(tagManager).Err(err).Warn("Error marking attachments deleted")
|
||||
}
|
||||
@@ -174,13 +173,11 @@ func (s *Server) pruneMessages() {
|
||||
if err != nil {
|
||||
log.Tag(tagManager).Err(err).Warn("Error retrieving expired messages")
|
||||
} else if len(expiredMessageIDs) > 0 {
|
||||
if s.attachment != nil {
|
||||
if err := s.attachment.Remove(expiredMessageIDs...); err != nil {
|
||||
log.Tag(tagManager).Err(err).Warn("Error deleting attachments for expired messages")
|
||||
}
|
||||
}
|
||||
// Only delete DB rows. Attachment storage files are cleaned up by the
|
||||
// attachment store's sync() loop, which periodically reconciles storage
|
||||
// with the database and removes orphaned files.
|
||||
if err := s.messageCache.DeleteMessages(expiredMessageIDs...); err != nil {
|
||||
log.Tag(tagManager).Err(err).Warn("Error marking attachments deleted")
|
||||
log.Tag(tagManager).Err(err).Warn("Error deleting expired messages")
|
||||
}
|
||||
} else {
|
||||
log.Tag(tagManager).Debug("No expired messages to delete")
|
||||
|
||||
18
web/package-lock.json
generated
18
web/package-lock.json
generated
@@ -9514,24 +9514,6 @@
|
||||
"dev": true,
|
||||
"license": "ISC"
|
||||
},
|
||||
"node_modules/yaml": {
|
||||
"version": "2.8.3",
|
||||
"resolved": "https://registry.npmjs.org/yaml/-/yaml-2.8.3.tgz",
|
||||
"integrity": "sha512-AvbaCLOO2Otw/lW5bmh9d/WEdcDFdQp2Z2ZUH3pX9U2ihyUY0nvLv7J6TrWowklRGPYbB/IuIMfYgxaCPg5Bpg==",
|
||||
"dev": true,
|
||||
"license": "ISC",
|
||||
"optional": true,
|
||||
"peer": true,
|
||||
"bin": {
|
||||
"yaml": "bin.mjs"
|
||||
},
|
||||
"engines": {
|
||||
"node": ">= 14.6"
|
||||
},
|
||||
"funding": {
|
||||
"url": "https://github.com/sponsors/eemeli"
|
||||
}
|
||||
},
|
||||
"node_modules/yocto-queue": {
|
||||
"version": "0.1.0",
|
||||
"resolved": "https://registry.npmjs.org/yocto-queue/-/yocto-queue-0.1.0.tgz",
|
||||
|
||||
Reference in New Issue
Block a user