diff --git a/attachment/store.go b/attachment/store.go new file mode 100644 index 00000000..c48a1e90 --- /dev/null +++ b/attachment/store.go @@ -0,0 +1,25 @@ +package attachment + +import ( + "errors" + "fmt" + "io" + "regexp" + + "heckel.io/ntfy/v2/model" + "heckel.io/ntfy/v2/util" +) + +// Store is an interface for storing and retrieving attachment files +type Store interface { + Write(id string, in io.Reader, limiters ...util.Limiter) (int64, error) + Read(id string) (io.ReadCloser, int64, error) + Remove(ids ...string) error + Size() int64 + Remaining() int64 +} + +var ( + fileIDRegex = regexp.MustCompile(fmt.Sprintf(`^[-_A-Za-z0-9]{%d}$`, model.MessageIDLength)) + errInvalidFileID = errors.New("invalid file ID") +) diff --git a/server/file_cache.go b/attachment/store_file.go similarity index 66% rename from server/file_cache.go rename to attachment/store_file.go index a1803724..b26e86f0 100644 --- a/server/file_cache.go +++ b/attachment/store_file.go @@ -1,32 +1,29 @@ -package server +package attachment import ( "errors" - "fmt" - "heckel.io/ntfy/v2/log" - "heckel.io/ntfy/v2/model" - "heckel.io/ntfy/v2/util" "io" "os" "path/filepath" - "regexp" "sync" + + "heckel.io/ntfy/v2/log" + "heckel.io/ntfy/v2/util" ) -var ( - fileIDRegex = regexp.MustCompile(fmt.Sprintf(`^[-_A-Za-z0-9]{%d}$`, model.MessageIDLength)) - errInvalidFileID = errors.New("invalid file ID") - errFileExists = errors.New("file exists") -) +const tagFileStore = "file_store" -type fileCache struct { +var errFileExists = errors.New("file exists") + +type fileStore struct { dir string totalSizeCurrent int64 totalSizeLimit int64 mu sync.Mutex } -func newFileCache(dir string, totalSizeLimit int64) (*fileCache, error) { +// NewFileStore creates a new file-system backed attachment store +func NewFileStore(dir string, totalSizeLimit int64) (Store, error) { if err := os.MkdirAll(dir, 0700); err != nil { return nil, err } @@ -34,18 +31,18 @@ func newFileCache(dir string, totalSizeLimit int64) (*fileCache, error) { if err != nil { return nil, err } - return &fileCache{ + return &fileStore{ dir: dir, totalSizeCurrent: size, totalSizeLimit: totalSizeLimit, }, nil } -func (c *fileCache) Write(id string, in io.Reader, limiters ...util.Limiter) (int64, error) { +func (c *fileStore) Write(id string, in io.Reader, limiters ...util.Limiter) (int64, error) { if !fileIDRegex.MatchString(id) { return 0, errInvalidFileID } - log.Tag(tagFileCache).Field("message_id", id).Debug("Writing attachment") + log.Tag(tagFileStore).Field("message_id", id).Debug("Writing attachment") file := filepath.Join(c.dir, id) if _, err := os.Stat(file); err == nil { return 0, errFileExists @@ -68,20 +65,35 @@ func (c *fileCache) Write(id string, in io.Reader, limiters ...util.Limiter) (in } c.mu.Lock() c.totalSizeCurrent += size - mset(metricAttachmentsTotalSize, c.totalSizeCurrent) c.mu.Unlock() return size, nil } -func (c *fileCache) Remove(ids ...string) error { +func (c *fileStore) Read(id string) (io.ReadCloser, int64, error) { + if !fileIDRegex.MatchString(id) { + return nil, 0, errInvalidFileID + } + file := filepath.Join(c.dir, id) + stat, err := os.Stat(file) + if err != nil { + return nil, 0, err + } + f, err := os.Open(file) + if err != nil { + return nil, 0, err + } + return f, stat.Size(), nil +} + +func (c *fileStore) Remove(ids ...string) error { for _, id := range ids { if !fileIDRegex.MatchString(id) { return errInvalidFileID } - log.Tag(tagFileCache).Field("message_id", id).Debug("Deleting attachment") + log.Tag(tagFileStore).Field("message_id", id).Debug("Deleting attachment") file := filepath.Join(c.dir, id) if err := os.Remove(file); err != nil { - log.Tag(tagFileCache).Field("message_id", id).Err(err).Debug("Error deleting attachment") + log.Tag(tagFileStore).Field("message_id", id).Err(err).Debug("Error deleting attachment") } } size, err := dirSize(c.dir) @@ -91,17 +103,16 @@ func (c *fileCache) Remove(ids ...string) error { c.mu.Lock() c.totalSizeCurrent = size c.mu.Unlock() - mset(metricAttachmentsTotalSize, size) return nil } -func (c *fileCache) Size() int64 { +func (c *fileStore) Size() int64 { c.mu.Lock() defer c.mu.Unlock() return c.totalSizeCurrent } -func (c *fileCache) Remaining() int64 { +func (c *fileStore) Remaining() int64 { c.mu.Lock() defer c.mu.Unlock() remaining := c.totalSizeLimit - c.totalSizeCurrent diff --git a/attachment/store_file_test.go b/attachment/store_file_test.go new file mode 100644 index 00000000..5cfe0db4 --- /dev/null +++ b/attachment/store_file_test.go @@ -0,0 +1,99 @@ +package attachment + +import ( + "bytes" + "fmt" + "io" + "os" + "strings" + "testing" + + "github.com/stretchr/testify/require" + "heckel.io/ntfy/v2/util" +) + +var ( + oneKilobyteArray = make([]byte, 1024) +) + +func TestFileStore_Write_Success(t *testing.T) { + dir, s := newTestFileStore(t) + size, err := s.Write("abcdefghijkl", strings.NewReader("normal file"), util.NewFixedLimiter(999)) + require.Nil(t, err) + require.Equal(t, int64(11), size) + require.Equal(t, "normal file", readFile(t, dir+"/abcdefghijkl")) + require.Equal(t, int64(11), s.Size()) + require.Equal(t, int64(10229), s.Remaining()) +} + +func TestFileStore_Write_Read_Success(t *testing.T) { + _, s := newTestFileStore(t) + size, err := s.Write("abcdefghijkl", strings.NewReader("hello world")) + require.Nil(t, err) + require.Equal(t, int64(11), size) + + reader, readSize, err := s.Read("abcdefghijkl") + require.Nil(t, err) + require.Equal(t, int64(11), readSize) + defer reader.Close() + data, err := io.ReadAll(reader) + require.Nil(t, err) + require.Equal(t, "hello world", string(data)) +} + +func TestFileStore_Write_Remove_Success(t *testing.T) { + dir, s := newTestFileStore(t) // max = 10k (10240), each = 1k (1024) + for i := 0; i < 10; i++ { // 10x999 = 9990 + size, err := s.Write(fmt.Sprintf("abcdefghijk%d", i), bytes.NewReader(make([]byte, 999))) + require.Nil(t, err) + require.Equal(t, int64(999), size) + } + require.Equal(t, int64(9990), s.Size()) + require.Equal(t, int64(250), s.Remaining()) + require.FileExists(t, dir+"/abcdefghijk1") + require.FileExists(t, dir+"/abcdefghijk5") + + require.Nil(t, s.Remove("abcdefghijk1", "abcdefghijk5")) + require.NoFileExists(t, dir+"/abcdefghijk1") + require.NoFileExists(t, dir+"/abcdefghijk5") + require.Equal(t, int64(7992), s.Size()) + require.Equal(t, int64(2248), s.Remaining()) +} + +func TestFileStore_Write_FailedTotalSizeLimit(t *testing.T) { + dir, s := newTestFileStore(t) + for i := 0; i < 10; i++ { + size, err := s.Write(fmt.Sprintf("abcdefghijk%d", i), bytes.NewReader(oneKilobyteArray)) + require.Nil(t, err) + require.Equal(t, int64(1024), size) + } + _, err := s.Write("abcdefghijkX", bytes.NewReader(oneKilobyteArray)) + require.Equal(t, util.ErrLimitReached, err) + require.NoFileExists(t, dir+"/abcdefghijkX") +} + +func TestFileStore_Write_FailedAdditionalLimiter(t *testing.T) { + dir, s := newTestFileStore(t) + _, err := s.Write("abcdefghijkl", bytes.NewReader(make([]byte, 1001)), util.NewFixedLimiter(1000)) + require.Equal(t, util.ErrLimitReached, err) + require.NoFileExists(t, dir+"/abcdefghijkl") +} + +func TestFileStore_Read_NotFound(t *testing.T) { + _, s := newTestFileStore(t) + _, _, err := s.Read("abcdefghijkl") + require.Error(t, err) +} + +func newTestFileStore(t *testing.T) (dir string, store Store) { + dir = t.TempDir() + store, err := NewFileStore(dir, 10*1024) + require.Nil(t, err) + return dir, store +} + +func readFile(t *testing.T, f string) string { + b, err := os.ReadFile(f) + require.Nil(t, err) + return string(b) +} diff --git a/attachment/store_s3.go b/attachment/store_s3.go new file mode 100644 index 00000000..118da4ce --- /dev/null +++ b/attachment/store_s3.go @@ -0,0 +1,255 @@ +package attachment + +import ( + "context" + "fmt" + "io" + "net/url" + "strings" + "sync" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/credentials" + "github.com/aws/aws-sdk-go-v2/service/s3" + s3types "github.com/aws/aws-sdk-go-v2/service/s3/types" + "heckel.io/ntfy/v2/log" + "heckel.io/ntfy/v2/util" +) + +const tagS3Store = "s3_store" + +type s3Store struct { + client *s3.Client + bucket string + prefix string + totalSizeCurrent int64 + totalSizeLimit int64 + mu sync.Mutex +} + +// NewS3Store creates a new S3-backed attachment store. The s3URL must be in the format: +// +// s3://ACCESS_KEY:SECRET_KEY@BUCKET[/PREFIX]?region=REGION[&endpoint=ENDPOINT] +func NewS3Store(s3URL string, totalSizeLimit int64) (Store, error) { + bucket, prefix, client, err := parseS3URL(s3URL) + if err != nil { + return nil, err + } + store := &s3Store{ + client: client, + bucket: bucket, + prefix: prefix, + totalSizeLimit: totalSizeLimit, + } + if totalSizeLimit > 0 { + size, err := store.computeSize() + if err != nil { + return nil, fmt.Errorf("s3 store: failed to compute initial size: %w", err) + } + store.totalSizeCurrent = size + } + return store, nil +} + +func parseS3URL(s3URL string) (bucket string, prefix string, client *s3.Client, err error) { + u, err := url.Parse(s3URL) + if err != nil { + return "", "", nil, fmt.Errorf("s3 store: invalid URL: %w", err) + } + if u.Scheme != "s3" { + return "", "", nil, fmt.Errorf("s3 store: URL scheme must be 's3', got '%s'", u.Scheme) + } + if u.Host == "" { + return "", "", nil, fmt.Errorf("s3 store: bucket name must be specified as host") + } + bucket = u.Host + prefix = strings.TrimPrefix(u.Path, "/") + + accessKey := u.User.Username() + secretKey, _ := u.User.Password() + if accessKey == "" || secretKey == "" { + return "", "", nil, fmt.Errorf("s3 store: access key and secret key must be specified in URL") + } + + region := u.Query().Get("region") + if region == "" { + return "", "", nil, fmt.Errorf("s3 store: region query parameter is required") + } + endpoint := u.Query().Get("endpoint") + + cfg := aws.Config{ + Region: region, + Credentials: credentials.NewStaticCredentialsProvider(accessKey, secretKey, ""), + } + var opts []func(*s3.Options) + if endpoint != "" { + opts = append(opts, func(o *s3.Options) { + o.BaseEndpoint = aws.String(endpoint) + o.UsePathStyle = true + }) + } + client = s3.NewFromConfig(cfg, opts...) + return bucket, prefix, client, nil +} + +func (c *s3Store) objectKey(id string) string { + if c.prefix != "" { + return c.prefix + "/" + id + } + return id +} + +func (c *s3Store) Write(id string, in io.Reader, limiters ...util.Limiter) (int64, error) { + if !fileIDRegex.MatchString(id) { + return 0, errInvalidFileID + } + log.Tag(tagS3Store).Field("message_id", id).Debug("Writing attachment to S3") + + // Use io.Pipe so we can apply limiters while streaming to S3 + pr, pw := io.Pipe() + var writeErr error + var size int64 + + limiters = append(limiters, util.NewFixedLimiter(c.Remaining())) + go func() { + limitWriter := util.NewLimitWriter(pw, limiters...) + size, writeErr = io.Copy(limitWriter, in) + if writeErr != nil { + pw.CloseWithError(writeErr) + } else { + pw.Close() + } + }() + + key := c.objectKey(id) + _, err := c.client.PutObject(context.Background(), &s3.PutObjectInput{ + Bucket: aws.String(c.bucket), + Key: aws.String(key), + Body: pr, + }) + if err != nil { + // If the limiter caused the error, return the original write error + if writeErr != nil { + return 0, writeErr + } + return 0, fmt.Errorf("s3 store: PutObject failed: %w", err) + } + if writeErr != nil { + // The write goroutine failed but PutObject somehow succeeded; clean up + _, _ = c.client.DeleteObject(context.Background(), &s3.DeleteObjectInput{ + Bucket: aws.String(c.bucket), + Key: aws.String(key), + }) + return 0, writeErr + } + + c.mu.Lock() + c.totalSizeCurrent += size + c.mu.Unlock() + return size, nil +} + +func (c *s3Store) Read(id string) (io.ReadCloser, int64, error) { + if !fileIDRegex.MatchString(id) { + return nil, 0, errInvalidFileID + } + key := c.objectKey(id) + resp, err := c.client.GetObject(context.Background(), &s3.GetObjectInput{ + Bucket: aws.String(c.bucket), + Key: aws.String(key), + }) + if err != nil { + return nil, 0, fmt.Errorf("s3 store: GetObject failed: %w", err) + } + var size int64 + if resp.ContentLength != nil { + size = *resp.ContentLength + } + return resp.Body, size, nil +} + +func (c *s3Store) Remove(ids ...string) error { + for _, id := range ids { + if !fileIDRegex.MatchString(id) { + return errInvalidFileID + } + } + // S3 DeleteObjects supports up to 1000 keys per call + for i := 0; i < len(ids); i += 1000 { + end := i + 1000 + if end > len(ids) { + end = len(ids) + } + batch := ids[i:end] + objects := make([]s3types.ObjectIdentifier, len(batch)) + for j, id := range batch { + log.Tag(tagS3Store).Field("message_id", id).Debug("Deleting attachment from S3") + key := c.objectKey(id) + objects[j] = s3types.ObjectIdentifier{ + Key: aws.String(key), + } + } + _, err := c.client.DeleteObjects(context.Background(), &s3.DeleteObjectsInput{ + Bucket: aws.String(c.bucket), + Delete: &s3types.Delete{ + Objects: objects, + Quiet: aws.Bool(true), + }, + }) + if err != nil { + return fmt.Errorf("s3 store: DeleteObjects failed: %w", err) + } + } + // Recalculate totalSizeCurrent via ListObjectsV2 (matches fileStore's dirSize rescan pattern) + size, err := c.computeSize() + if err != nil { + return fmt.Errorf("s3 store: failed to compute size after remove: %w", err) + } + c.mu.Lock() + c.totalSizeCurrent = size + c.mu.Unlock() + return nil +} + +func (c *s3Store) Size() int64 { + c.mu.Lock() + defer c.mu.Unlock() + return c.totalSizeCurrent +} + +func (c *s3Store) Remaining() int64 { + c.mu.Lock() + defer c.mu.Unlock() + remaining := c.totalSizeLimit - c.totalSizeCurrent + if remaining < 0 { + return 0 + } + return remaining +} + +func (c *s3Store) computeSize() (int64, error) { + var size int64 + paginator := s3.NewListObjectsV2Paginator(c.client, &s3.ListObjectsV2Input{ + Bucket: aws.String(c.bucket), + Prefix: aws.String(c.prefixForList()), + }) + for paginator.HasMorePages() { + page, err := paginator.NextPage(context.Background()) + if err != nil { + return 0, err + } + for _, obj := range page.Contents { + if obj.Size != nil { + size += *obj.Size + } + } + } + return size, nil +} + +func (c *s3Store) prefixForList() string { + if c.prefix != "" { + return c.prefix + "/" + } + return "" +} diff --git a/attachment/store_s3_test.go b/attachment/store_s3_test.go new file mode 100644 index 00000000..a1808d0c --- /dev/null +++ b/attachment/store_s3_test.go @@ -0,0 +1,76 @@ +package attachment + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestParseS3URL_Success(t *testing.T) { + bucket, prefix, client, err := parseS3URL("s3://AKID:SECRET@my-bucket/attachments?region=us-east-1") + require.Nil(t, err) + require.Equal(t, "my-bucket", bucket) + require.Equal(t, "attachments", prefix) + require.NotNil(t, client) +} + +func TestParseS3URL_NoPrefix(t *testing.T) { + bucket, prefix, client, err := parseS3URL("s3://AKID:SECRET@my-bucket?region=us-east-1") + require.Nil(t, err) + require.Equal(t, "my-bucket", bucket) + require.Equal(t, "", prefix) + require.NotNil(t, client) +} + +func TestParseS3URL_WithEndpoint(t *testing.T) { + bucket, prefix, client, err := parseS3URL("s3://AKID:SECRET@my-bucket/prefix?region=us-east-1&endpoint=https://s3.example.com") + require.Nil(t, err) + require.Equal(t, "my-bucket", bucket) + require.Equal(t, "prefix", prefix) + require.NotNil(t, client) +} + +func TestParseS3URL_NestedPrefix(t *testing.T) { + bucket, prefix, _, err := parseS3URL("s3://AKID:SECRET@my-bucket/a/b/c?region=us-east-1") + require.Nil(t, err) + require.Equal(t, "my-bucket", bucket) + require.Equal(t, "a/b/c", prefix) +} + +func TestParseS3URL_MissingRegion(t *testing.T) { + _, _, _, err := parseS3URL("s3://AKID:SECRET@my-bucket") + require.Error(t, err) + require.Contains(t, err.Error(), "region") +} + +func TestParseS3URL_MissingCredentials(t *testing.T) { + _, _, _, err := parseS3URL("s3://my-bucket?region=us-east-1") + require.Error(t, err) + require.Contains(t, err.Error(), "access key") +} + +func TestParseS3URL_MissingSecretKey(t *testing.T) { + _, _, _, err := parseS3URL("s3://AKID@my-bucket?region=us-east-1") + require.Error(t, err) + require.Contains(t, err.Error(), "secret key") +} + +func TestParseS3URL_WrongScheme(t *testing.T) { + _, _, _, err := parseS3URL("http://AKID:SECRET@my-bucket?region=us-east-1") + require.Error(t, err) + require.Contains(t, err.Error(), "scheme") +} + +func TestParseS3URL_EmptyBucket(t *testing.T) { + _, _, _, err := parseS3URL("s3://AKID:SECRET@?region=us-east-1") + require.Error(t, err) + require.Contains(t, err.Error(), "bucket") +} + +func TestS3Store_ObjectKey(t *testing.T) { + s := &s3Store{prefix: "attachments"} + require.Equal(t, "attachments/abcdefghijkl", s.objectKey("abcdefghijkl")) + + s2 := &s3Store{prefix: ""} + require.Equal(t, "abcdefghijkl", s2.objectKey("abcdefghijkl")) +} diff --git a/cmd/serve.go b/cmd/serve.go index 415868fc..c2ed210a 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -53,6 +53,7 @@ var flagsServe = append( altsrc.NewStringSliceFlag(&cli.StringSliceFlag{Name: "auth-access", Aliases: []string{"auth_access"}, EnvVars: []string{"NTFY_AUTH_ACCESS"}, Usage: "pre-provisioned declarative access control entries"}), altsrc.NewStringSliceFlag(&cli.StringSliceFlag{Name: "auth-tokens", Aliases: []string{"auth_tokens"}, EnvVars: []string{"NTFY_AUTH_TOKENS"}, Usage: "pre-provisioned declarative access tokens"}), altsrc.NewStringFlag(&cli.StringFlag{Name: "attachment-cache-dir", Aliases: []string{"attachment_cache_dir"}, EnvVars: []string{"NTFY_ATTACHMENT_CACHE_DIR"}, Usage: "cache directory for attached files"}), + altsrc.NewStringFlag(&cli.StringFlag{Name: "attachment-s3-url", Aliases: []string{"attachment_s3_url"}, EnvVars: []string{"NTFY_ATTACHMENT_S3_URL"}, Usage: "S3 URL for attachment storage (s3://ACCESS_KEY:SECRET_KEY@BUCKET[/PREFIX]?region=REGION)"}), altsrc.NewStringFlag(&cli.StringFlag{Name: "attachment-total-size-limit", Aliases: []string{"attachment_total_size_limit", "A"}, EnvVars: []string{"NTFY_ATTACHMENT_TOTAL_SIZE_LIMIT"}, Value: util.FormatSize(server.DefaultAttachmentTotalSizeLimit), Usage: "limit of the on-disk attachment cache"}), altsrc.NewStringFlag(&cli.StringFlag{Name: "attachment-file-size-limit", Aliases: []string{"attachment_file_size_limit", "Y"}, EnvVars: []string{"NTFY_ATTACHMENT_FILE_SIZE_LIMIT"}, Value: util.FormatSize(server.DefaultAttachmentFileSizeLimit), Usage: "per-file attachment size limit (e.g. 300k, 2M, 100M)"}), altsrc.NewStringFlag(&cli.StringFlag{Name: "attachment-expiry-duration", Aliases: []string{"attachment_expiry_duration", "X"}, EnvVars: []string{"NTFY_ATTACHMENT_EXPIRY_DURATION"}, Value: util.FormatDuration(server.DefaultAttachmentExpiryDuration), Usage: "duration after which uploaded attachments will be deleted (e.g. 3h, 20h)"}), @@ -166,6 +167,7 @@ func execServe(c *cli.Context) error { authAccessRaw := c.StringSlice("auth-access") authTokensRaw := c.StringSlice("auth-tokens") attachmentCacheDir := c.String("attachment-cache-dir") + attachmentS3URL := c.String("attachment-s3-url") attachmentTotalSizeLimitStr := c.String("attachment-total-size-limit") attachmentFileSizeLimitStr := c.String("attachment-file-size-limit") attachmentExpiryDurationStr := c.String("attachment-expiry-duration") @@ -314,6 +316,10 @@ func execServe(c *cli.Context) error { return errors.New("if smtp-server-listen is set, smtp-server-domain must also be set") } else if attachmentCacheDir != "" && baseURL == "" { return errors.New("if attachment-cache-dir is set, base-url must also be set") + } else if attachmentS3URL != "" && baseURL == "" { + return errors.New("if attachment-s3-url is set, base-url must also be set") + } else if attachmentS3URL != "" && attachmentCacheDir != "" { + return errors.New("attachment-cache-dir and attachment-s3-url are mutually exclusive") } else if baseURL != "" { u, err := url.Parse(baseURL) if err != nil { @@ -457,6 +463,7 @@ func execServe(c *cli.Context) error { conf.AuthAccess = authAccess conf.AuthTokens = authTokens conf.AttachmentCacheDir = attachmentCacheDir + conf.AttachmentS3URL = attachmentS3URL conf.AttachmentTotalSizeLimit = attachmentTotalSizeLimit conf.AttachmentFileSizeLimit = attachmentFileSizeLimit conf.AttachmentExpiryDuration = attachmentExpiryDuration diff --git a/docs/config.md b/docs/config.md index b9c8f07f..34484a51 100644 --- a/docs/config.md +++ b/docs/config.md @@ -489,20 +489,23 @@ Subscribers can retrieve cached messaging using the [`poll=1` parameter](subscri ## Attachments If desired, you may allow users to upload and [attach files to notifications](publish.md#attachments). To enable -this feature, you have to simply configure an attachment cache directory and a base URL (`attachment-cache-dir`, `base-url`). -Once these options are set and the directory is writable by the server user, you can upload attachments via PUT. +this feature, you have to configure an attachment storage backend and a base URL (`base-url`). Attachments can be stored +either on the local filesystem (`attachment-cache-dir`) or in an S3-compatible object store (`attachment-s3-url`). +Once configured, you can upload attachments via PUT. -By default, attachments are stored in the disk-cache **for only 3 hours**. The main reason for this is to avoid legal issues -and such when hosting user controlled content. Typically, this is more than enough time for the user (or the auto download +By default, attachments are stored **for only 3 hours**. The main reason for this is to avoid legal issues +and such when hosting user controlled content. Typically, this is more than enough time for the user (or the auto download feature) to download the file. The following config options are relevant to attachments: * `base-url` is the root URL for the ntfy server; this is needed for the generated attachment URLs -* `attachment-cache-dir` is the cache directory for attached files -* `attachment-total-size-limit` is the size limit of the on-disk attachment cache (default: 5G) +* `attachment-cache-dir` is the cache directory for attached files (mutually exclusive with `attachment-s3-url`) +* `attachment-s3-url` is the S3 URL for attachment storage (mutually exclusive with `attachment-cache-dir`) +* `attachment-total-size-limit` is the size limit of the attachment storage (default: 5G) * `attachment-file-size-limit` is the per-file attachment size limit (e.g. 300k, 2M, 100M, default: 15M) * `attachment-expiry-duration` is the duration after which uploaded attachments will be deleted (e.g. 3h, 20h, default: 3h) -Here's an example config using mostly the defaults (except for the cache directory, which is empty by default): +### Filesystem storage +Here's an example config using the local filesystem for attachment storage: === "/etc/ntfy/server.yml (minimal)" ``` yaml @@ -521,6 +524,30 @@ Here's an example config using mostly the defaults (except for the cache directo visitor-attachment-daily-bandwidth-limit: "500M" ``` +### S3 storage +As an alternative to the local filesystem, you can store attachments in an S3-compatible object store (e.g. AWS S3, +MinIO, DigitalOcean Spaces). This is useful for HA/cloud deployments where you don't want to rely on local disk storage. + +The `attachment-s3-url` option uses the following format: + +``` +s3://ACCESS_KEY:SECRET_KEY@BUCKET[/PREFIX]?region=REGION[&endpoint=ENDPOINT] +``` + +When `endpoint` is specified, path-style addressing is enabled automatically (useful for MinIO and other S3-compatible stores). + +=== "/etc/ntfy/server.yml (AWS S3)" + ``` yaml + base-url: "https://ntfy.sh" + attachment-s3-url: "s3://AKID:SECRET@my-bucket/attachments?region=us-east-1" + ``` + +=== "/etc/ntfy/server.yml (MinIO/custom endpoint)" + ``` yaml + base-url: "https://ntfy.sh" + attachment-s3-url: "s3://AKID:SECRET@my-bucket/attachments?region=us-east-1&endpoint=https://s3.example.com" + ``` + Please also refer to the [rate limiting](#rate-limiting) settings below, specifically `visitor-attachment-total-size-limit` and `visitor-attachment-daily-bandwidth-limit`. Setting these conservatively is necessary to avoid abuse. @@ -2116,7 +2143,8 @@ variable before running the `ntfy` command (e.g. `export NTFY_LISTEN_HTTP=:80`). | `behind-proxy` | `NTFY_BEHIND_PROXY` | *bool* | false | If set, use forwarded header (e.g. X-Forwarded-For, X-Client-IP) to determine visitor IP address (for rate limiting) | | `proxy-forwarded-header` | `NTFY_PROXY_FORWARDED_HEADER` | *string* | `X-Forwarded-For` | Use specified header to determine visitor IP address (for rate limiting) | | `proxy-trusted-hosts` | `NTFY_PROXY_TRUSTED_HOSTS` | *comma-separated host/IP/CIDR list* | - | Comma-separated list of trusted IP addresses, hosts, or CIDRs to remove from forwarded header | -| `attachment-cache-dir` | `NTFY_ATTACHMENT_CACHE_DIR` | *directory* | - | Cache directory for attached files. To enable attachments, this has to be set. | +| `attachment-cache-dir` | `NTFY_ATTACHMENT_CACHE_DIR` | *directory* | - | Cache directory for attached files. Mutually exclusive with `attachment-s3-url`. | +| `attachment-s3-url` | `NTFY_ATTACHMENT_S3_URL` | *URL* | - | S3 URL for attachment storage (format: `s3://KEY:SECRET@BUCKET[/PREFIX]?region=REGION`). Mutually exclusive with `attachment-cache-dir`. | | `attachment-total-size-limit` | `NTFY_ATTACHMENT_TOTAL_SIZE_LIMIT` | *size* | 5G | Limit of the on-disk attachment cache directory. If the limits is exceeded, new attachments will be rejected. | | `attachment-file-size-limit` | `NTFY_ATTACHMENT_FILE_SIZE_LIMIT` | *size* | 15M | Per-file attachment size limit (e.g. 300k, 2M, 100M). Larger attachment will be rejected. | | `attachment-expiry-duration` | `NTFY_ATTACHMENT_EXPIRY_DURATION` | *duration* | 3h | Duration after which uploaded attachments will be deleted (e.g. 3h, 20h). Strongly affects `visitor-attachment-total-size-limit`. | @@ -2219,6 +2247,7 @@ OPTIONS: --auth-startup-queries value, --auth_startup_queries value queries run when the auth database is initialized [$NTFY_AUTH_STARTUP_QUERIES] --auth-default-access value, --auth_default_access value, -p value default permissions if no matching entries in the auth database are found (default: "read-write") [$NTFY_AUTH_DEFAULT_ACCESS] --attachment-cache-dir value, --attachment_cache_dir value cache directory for attached files [$NTFY_ATTACHMENT_CACHE_DIR] + --attachment-s3-url value, --attachment_s3_url value S3 URL for attachment storage (s3://ACCESS_KEY:SECRET_KEY@BUCKET[/PREFIX]?region=REGION) [$NTFY_ATTACHMENT_S3_URL] --attachment-total-size-limit value, --attachment_total_size_limit value, -A value limit of the on-disk attachment cache (default: "5G") [$NTFY_ATTACHMENT_TOTAL_SIZE_LIMIT] --attachment-file-size-limit value, --attachment_file_size_limit value, -Y value per-file attachment size limit (e.g. 300k, 2M, 100M) (default: "15M") [$NTFY_ATTACHMENT_FILE_SIZE_LIMIT] --attachment-expiry-duration value, --attachment_expiry_duration value, -X value duration after which uploaded attachments will be deleted (e.g. 3h, 20h) (default: "3h") [$NTFY_ATTACHMENT_EXPIRY_DURATION] diff --git a/docs/releases.md b/docs/releases.md index 7a40e5c4..7f5d6b45 100644 --- a/docs/releases.md +++ b/docs/releases.md @@ -1761,6 +1761,7 @@ and the [ntfy Android app](https://github.com/binwiederhier/ntfy-android/release * Support PostgreSQL read replicas for offloading non-critical read queries via `database-replica-urls` config option * Add interactive [config generator](config.md#config-generator) to the documentation to help create server configuration files +* Add S3-compatible object storage as an alternative attachment backend via `attachment-s3-url` config option **Bug fixes + maintenance:** diff --git a/go.mod b/go.mod index c073d6aa..ef8564c2 100644 --- a/go.mod +++ b/go.mod @@ -52,6 +52,18 @@ require ( github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric v0.55.0 // indirect github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.55.0 // indirect github.com/MicahParks/keyfunc v1.9.0 // indirect + github.com/aws/aws-sdk-go-v2 v1.41.4 // indirect + github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.7 // indirect + github.com/aws/aws-sdk-go-v2/credentials v1.19.12 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.20 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.20 // indirect + github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.21 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.7 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.9.12 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.20 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.20 // indirect + github.com/aws/aws-sdk-go-v2/service/s3 v1.97.1 // indirect + github.com/aws/smithy-go v1.24.2 // indirect github.com/aymerick/douceur v0.2.0 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect diff --git a/go.sum b/go.sum index 1c6eada9..3f373614 100644 --- a/go.sum +++ b/go.sum @@ -40,6 +40,30 @@ github.com/MicahParks/keyfunc v1.9.0 h1:lhKd5xrFHLNOWrDc4Tyb/Q1AJ4LCzQ48GVJyVIID github.com/MicahParks/keyfunc v1.9.0/go.mod h1:IdnCilugA0O/99dW+/MkvlyrsX8+L8+x95xuVNtM5jw= github.com/SherClockHolmes/webpush-go v1.4.0 h1:ocnzNKWN23T9nvHi6IfyrQjkIc0oJWv1B1pULsf9i3s= github.com/SherClockHolmes/webpush-go v1.4.0/go.mod h1:XSq8pKX11vNV8MJEMwjrlTkxhAj1zKfxmyhdV7Pd6UA= +github.com/aws/aws-sdk-go-v2 v1.41.4 h1:10f50G7WyU02T56ox1wWXq+zTX9I1zxG46HYuG1hH/k= +github.com/aws/aws-sdk-go-v2 v1.41.4/go.mod h1:mwsPRE8ceUUpiTgF7QmQIJ7lgsKUPQOUl3o72QBrE1o= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.7 h1:3kGOqnh1pPeddVa/E37XNTaWJ8W6vrbYV9lJEkCnhuY= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.7/go.mod h1:lyw7GFp3qENLh7kwzf7iMzAxDn+NzjXEAGjKS2UOKqI= +github.com/aws/aws-sdk-go-v2/credentials v1.19.12 h1:oqtA6v+y5fZg//tcTWahyN9PEn5eDU/Wpvc2+kJ4aY8= +github.com/aws/aws-sdk-go-v2/credentials v1.19.12/go.mod h1:U3R1RtSHx6NB0DvEQFGyf/0sbrpJrluENHdPy1j/3TE= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.20 h1:CNXO7mvgThFGqOFgbNAP2nol2qAWBOGfqR/7tQlvLmc= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.20/go.mod h1:oydPDJKcfMhgfcgBUZaG+toBbwy8yPWubJXBVERtI4o= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.20 h1:tN6W/hg+pkM+tf9XDkWUbDEjGLb+raoBMFsTodcoYKw= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.20/go.mod h1:YJ898MhD067hSHA6xYCx5ts/jEd8BSOLtQDL3iZsvbc= +github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.21 h1:SwGMTMLIlvDNyhMteQ6r8IJSBPlRdXX5d4idhIGbkXA= +github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.21/go.mod h1:UUxgWxofmOdAMuqEsSppbDtGKLfR04HGsD0HXzvhI1k= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.7 h1:5EniKhLZe4xzL7a+fU3C2tfUN4nWIqlLesfrjkuPFTY= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.7/go.mod h1:x0nZssQ3qZSnIcePWLvcoFisRXJzcTVvYpAAdYX8+GI= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.9.12 h1:qtJZ70afD3ISKWnoX3xB0J2otEqu3LqicRcDBqsj0hQ= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.9.12/go.mod h1:v2pNpJbRNl4vEUWEh5ytQok0zACAKfdmKS51Hotc3pQ= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.20 h1:2HvVAIq+YqgGotK6EkMf+KIEqTISmTYh5zLpYyeTo1Y= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.20/go.mod h1:V4X406Y666khGa8ghKmphma/7C0DAtEQYhkq9z4vpbk= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.20 h1:siU1A6xjUZ2N8zjTHSXFhB9L/2OY8Dqs0xXiLjF30jA= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.20/go.mod h1:4TLZCmVJDM3FOu5P5TJP0zOlu9zWgDWU7aUxWbr+rcw= +github.com/aws/aws-sdk-go-v2/service/s3 v1.97.1 h1:csi9NLpFZXb9fxY7rS1xVzgPRGMt7MSNWeQ6eo247kE= +github.com/aws/aws-sdk-go-v2/service/s3 v1.97.1/go.mod h1:qXVal5H0ChqXP63t6jze5LmFalc7+ZE7wOdLtZ0LCP0= +github.com/aws/smithy-go v1.24.2 h1:FzA3bu/nt/vDvmnkg+R8Xl46gmzEDam6mZ1hzmwXFng= +github.com/aws/smithy-go v1.24.2/go.mod h1:YE2RhdIuDbA5E5bTdciG9KrW3+TiEONeUWCqxX9i1Fc= github.com/aymerick/douceur v0.2.0 h1:Mv+mAeH1Q+n9Fr+oyamOlAkUNPWPlA8PPGR0QAaYuPk= github.com/aymerick/douceur v0.2.0/go.mod h1:wlT5vV2O3h55X9m7iVYN0TBM0NH/MmbLnd30/FjWUq4= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= diff --git a/server/config.go b/server/config.go index 8ead312c..97f72a1c 100644 --- a/server/config.go +++ b/server/config.go @@ -112,6 +112,7 @@ type Config struct { AuthBcryptCost int AuthStatsQueueWriterInterval time.Duration AttachmentCacheDir string + AttachmentS3URL string AttachmentTotalSizeLimit int64 AttachmentFileSizeLimit int64 AttachmentExpiryDuration time.Duration diff --git a/server/file_cache_test.go b/server/file_cache_test.go deleted file mode 100644 index e7dee3b3..00000000 --- a/server/file_cache_test.go +++ /dev/null @@ -1,76 +0,0 @@ -package server - -import ( - "bytes" - "fmt" - "github.com/stretchr/testify/require" - "heckel.io/ntfy/v2/util" - "os" - "strings" - "testing" -) - -var ( - oneKilobyteArray = make([]byte, 1024) -) - -func TestFileCache_Write_Success(t *testing.T) { - dir, c := newTestFileCache(t) - size, err := c.Write("abcdefghijkl", strings.NewReader("normal file"), util.NewFixedLimiter(999)) - require.Nil(t, err) - require.Equal(t, int64(11), size) - require.Equal(t, "normal file", readFile(t, dir+"/abcdefghijkl")) - require.Equal(t, int64(11), c.Size()) - require.Equal(t, int64(10229), c.Remaining()) -} - -func TestFileCache_Write_Remove_Success(t *testing.T) { - dir, c := newTestFileCache(t) // max = 10k (10240), each = 1k (1024) - for i := 0; i < 10; i++ { // 10x999 = 9990 - size, err := c.Write(fmt.Sprintf("abcdefghijk%d", i), bytes.NewReader(make([]byte, 999))) - require.Nil(t, err) - require.Equal(t, int64(999), size) - } - require.Equal(t, int64(9990), c.Size()) - require.Equal(t, int64(250), c.Remaining()) - require.FileExists(t, dir+"/abcdefghijk1") - require.FileExists(t, dir+"/abcdefghijk5") - - require.Nil(t, c.Remove("abcdefghijk1", "abcdefghijk5")) - require.NoFileExists(t, dir+"/abcdefghijk1") - require.NoFileExists(t, dir+"/abcdefghijk5") - require.Equal(t, int64(7992), c.Size()) - require.Equal(t, int64(2248), c.Remaining()) -} - -func TestFileCache_Write_FailedTotalSizeLimit(t *testing.T) { - dir, c := newTestFileCache(t) - for i := 0; i < 10; i++ { - size, err := c.Write(fmt.Sprintf("abcdefghijk%d", i), bytes.NewReader(oneKilobyteArray)) - require.Nil(t, err) - require.Equal(t, int64(1024), size) - } - _, err := c.Write("abcdefghijkX", bytes.NewReader(oneKilobyteArray)) - require.Equal(t, util.ErrLimitReached, err) - require.NoFileExists(t, dir+"/abcdefghijkX") -} - -func TestFileCache_Write_FailedAdditionalLimiter(t *testing.T) { - dir, c := newTestFileCache(t) - _, err := c.Write("abcdefghijkl", bytes.NewReader(make([]byte, 1001)), util.NewFixedLimiter(1000)) - require.Equal(t, util.ErrLimitReached, err) - require.NoFileExists(t, dir+"/abcdefghijkl") -} - -func newTestFileCache(t *testing.T) (dir string, cache *fileCache) { - dir = t.TempDir() - cache, err := newFileCache(dir, 10*1024) - require.Nil(t, err) - return dir, cache -} - -func readFile(t *testing.T, f string) string { - b, err := os.ReadFile(f) - require.Nil(t, err) - return string(b) -} diff --git a/server/log.go b/server/log.go index 03600c0d..e4ddc178 100644 --- a/server/log.go +++ b/server/log.go @@ -24,7 +24,6 @@ const ( tagSMTP = "smtp" // Receive email tagEmail = "email" // Send email tagTwilio = "twilio" - tagFileCache = "file_cache" tagMessageCache = "message_cache" tagStripe = "stripe" tagAccount = "account" diff --git a/server/server.go b/server/server.go index 24c712bd..434f93af 100644 --- a/server/server.go +++ b/server/server.go @@ -32,6 +32,7 @@ import ( "github.com/prometheus/client_golang/prometheus/promhttp" "golang.org/x/sync/errgroup" "gopkg.in/yaml.v2" + "heckel.io/ntfy/v2/attachment" "heckel.io/ntfy/v2/db" "heckel.io/ntfy/v2/db/pg" "heckel.io/ntfy/v2/log" @@ -64,7 +65,7 @@ type Server struct { userManager *user.Manager // Might be nil! messageCache *message.Cache // Database that stores the messages webPush *webpush.Store // Database that stores web push subscriptions - fileCache *fileCache // File system based cache that stores attachments + fileCache attachment.Store // Attachment store (file system or S3) stripe stripeAPI // Stripe API, can be replaced with a mock priceCache *util.LookupCache[map[string]int64] // Stripe price ID -> price as cents (USD implied!) metricsHandler http.Handler // Handles /metrics if enable-metrics set, and listen-metrics-http not set @@ -227,12 +228,9 @@ func New(conf *Config) (*Server, error) { if err != nil { return nil, err } - var fileCache *fileCache - if conf.AttachmentCacheDir != "" { - fileCache, err = newFileCache(conf.AttachmentCacheDir, conf.AttachmentTotalSizeLimit) - if err != nil { - return nil, err - } + fileCache, err := createAttachmentStore(conf) + if err != nil { + return nil, err } var userManager *user.Manager if conf.AuthFile != "" || pool != nil { @@ -301,6 +299,15 @@ func createMessageCache(conf *Config, pool *db.DB) (*message.Cache, error) { return message.NewMemStore() } +func createAttachmentStore(conf *Config) (attachment.Store, error) { + if conf.AttachmentS3URL != "" { + return attachment.NewS3Store(conf.AttachmentS3URL, conf.AttachmentTotalSizeLimit) + } else if conf.AttachmentCacheDir != "" { + return attachment.NewFileStore(conf.AttachmentCacheDir, conf.AttachmentTotalSizeLimit) + } + return nil, nil +} + // Run executes the main server. It listens on HTTP (+ HTTPS, if configured), and starts // a manager go routine to print stats and prune messages. func (s *Server) Run() error { @@ -752,7 +759,7 @@ func (s *Server) handleStats(w http.ResponseWriter, _ *http.Request, _ *visitor) // Before streaming the file to a client, it locates uploader (m.Sender or m.User) in the message cache, so it // can associate the download bandwidth with the uploader. func (s *Server) handleFile(w http.ResponseWriter, r *http.Request, v *visitor) error { - if s.config.AttachmentCacheDir == "" { + if s.fileCache == nil { return errHTTPInternalError } matches := fileRegex.FindStringSubmatch(r.URL.Path) @@ -760,16 +767,16 @@ func (s *Server) handleFile(w http.ResponseWriter, r *http.Request, v *visitor) return errHTTPInternalErrorInvalidPath } messageID := matches[1] - file := filepath.Join(s.config.AttachmentCacheDir, messageID) - stat, err := os.Stat(file) + reader, size, err := s.fileCache.Read(messageID) if err != nil { return errHTTPNotFound.Fields(log.Context{ "message_id": messageID, - "error_context": "filesystem", + "error_context": "attachment_store", }) } + defer reader.Close() w.Header().Set("Access-Control-Allow-Origin", s.config.AccessControlAllowOrigin) // CORS, allow cross-origin requests - w.Header().Set("Content-Length", fmt.Sprintf("%d", stat.Size())) + w.Header().Set("Content-Length", fmt.Sprintf("%d", size)) if r.Method == http.MethodHead { return nil } @@ -805,19 +812,14 @@ func (s *Server) handleFile(w http.ResponseWriter, r *http.Request, v *visitor) } else if m.Sender.IsValid() { bandwidthVisitor = s.visitor(m.Sender, nil) } - if !bandwidthVisitor.BandwidthAllowed(stat.Size()) { + if !bandwidthVisitor.BandwidthAllowed(size) { return errHTTPTooManyRequestsLimitAttachmentBandwidth.With(m) } // Actually send file - f, err := os.Open(file) - if err != nil { - return err - } - defer f.Close() if m.Attachment.Name != "" { w.Header().Set("Content-Disposition", "attachment; filename="+strconv.Quote(m.Attachment.Name)) } - _, err = io.Copy(util.NewContentTypeWriter(w, r.URL.Path), f) + _, err = io.Copy(util.NewContentTypeWriter(w, r.URL.Path), reader) return err } @@ -1408,7 +1410,7 @@ func (s *Server) renderTemplate(name, tpl, source string) (string, error) { } func (s *Server) handleBodyAsAttachment(r *http.Request, v *visitor, m *model.Message, body *util.PeekedReadCloser) error { - if s.fileCache == nil || s.config.BaseURL == "" || s.config.AttachmentCacheDir == "" { + if s.fileCache == nil || s.config.BaseURL == "" { return errHTTPBadRequestAttachmentsDisallowed.With(m) } vinfo, err := v.Info() diff --git a/server/server.yml b/server/server.yml index 43cb5fb4..e6f7afee 100644 --- a/server/server.yml +++ b/server/server.yml @@ -159,6 +159,7 @@ # - attachment-expiry-duration is the duration after which uploaded attachments will be deleted (e.g. 3h, 20h) # # attachment-cache-dir: +# attachment-s3-url: "s3://ACCESS_KEY:SECRET_KEY@bucket/prefix?region=us-east-1" # attachment-total-size-limit: "5G" # attachment-file-size-limit: "15M" # attachment-expiry-duration: "3h" diff --git a/server/server_manager.go b/server/server_manager.go index afed7b33..5bf42924 100644 --- a/server/server_manager.go +++ b/server/server_manager.go @@ -99,6 +99,9 @@ func (s *Server) execManager() { mset(metricUsers, usersCount) mset(metricSubscribers, subscribers) mset(metricTopics, topicsCount) + if s.fileCache != nil { + mset(metricAttachmentsTotalSize, s.fileCache.Size()) + } } func (s *Server) pruneVisitors() {