From 2aae3577cb5238a303eec3c8dde82a2c744181b3 Mon Sep 17 00:00:00 2001 From: Hunter Kehoe Date: Fri, 17 Oct 2025 17:39:55 -0600 Subject: [PATCH] add sid, mtime, and deleted to message_cache --- server/message_cache.go | 79 +++++++++++++++++++++++++++--------- server/message_cache_test.go | 29 +++++++++++-- server/types.go | 13 +++++- 3 files changed, 98 insertions(+), 23 deletions(-) diff --git a/server/message_cache.go b/server/message_cache.go index 03cb4969..2523d66b 100644 --- a/server/message_cache.go +++ b/server/message_cache.go @@ -28,7 +28,9 @@ const ( CREATE TABLE IF NOT EXISTS messages ( id INTEGER PRIMARY KEY AUTOINCREMENT, mid TEXT NOT NULL, + sid TEXT NOT NULL, time INT NOT NULL, + mtime INT NOT NULL, expires INT NOT NULL, topic TEXT NOT NULL, message TEXT NOT NULL, @@ -48,10 +50,13 @@ const ( user TEXT NOT NULL, content_type TEXT NOT NULL, encoding TEXT NOT NULL, - published INT NOT NULL + published INT NOT NULL, + deleted INT NOT NULL ); CREATE INDEX IF NOT EXISTS idx_mid ON messages (mid); + CREATE INDEX IF NOT EXISTS idx_sid ON messages (sid); CREATE INDEX IF NOT EXISTS idx_time ON messages (time); + CREATE INDEX IF NOT EXISTS idx_mtime ON messages (mtime); CREATE INDEX IF NOT EXISTS idx_topic ON messages (topic); CREATE INDEX IF NOT EXISTS idx_expires ON messages (expires); CREATE INDEX IF NOT EXISTS idx_sender ON messages (sender); @@ -65,56 +70,57 @@ const ( COMMIT; ` insertMessageQuery = ` - INSERT INTO messages (mid, time, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_deleted, sender, user, content_type, encoding, published) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + INSERT INTO messages (mid, sid, time, mtime, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_deleted, sender, user, content_type, encoding, published, deleted) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ` deleteMessageQuery = `DELETE FROM messages WHERE mid = ?` updateMessagesForTopicExpiryQuery = `UPDATE messages SET expires = ? WHERE topic = ?` selectRowIDFromMessageID = `SELECT id FROM messages WHERE mid = ?` // Do not include topic, see #336 and TestServer_PollSinceID_MultipleTopics selectMessagesByIDQuery = ` - SELECT mid, time, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, sender, user, content_type, encoding + SELECT mid, sid, time, mtime, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, sender, user, content_type, encoding, deleted FROM messages WHERE mid = ? ` selectMessagesSinceTimeQuery = ` - SELECT mid, time, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, sender, user, content_type, encoding + SELECT mid, sid, time, mtime, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, sender, user, content_type, encoding, deleted FROM messages WHERE topic = ? AND time >= ? AND published = 1 - ORDER BY time, id + ORDER BY mtime, id ` selectMessagesSinceTimeIncludeScheduledQuery = ` - SELECT mid, time, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, sender, user, content_type, encoding + SELECT mid, sid, time, mtime, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, sender, user, content_type, encoding, deleted FROM messages WHERE topic = ? AND time >= ? - ORDER BY time, id + ORDER BY mtime, id ` selectMessagesSinceIDQuery = ` - SELECT mid, time, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, sender, user, content_type, encoding + SELECT mid, sid, time, mtime, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, sender, user, content_type, encoding, deleted FROM messages WHERE topic = ? AND id > ? AND published = 1 - ORDER BY time, id + ORDER BY mtime, id ` selectMessagesSinceIDIncludeScheduledQuery = ` - SELECT mid, time, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, sender, user, content_type, encoding + SELECT mid, sid, time, mtime, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, sender, user, content_type, encoding, deleted FROM messages WHERE topic = ? AND (id > ? OR published = 0) - ORDER BY time, id + ORDER BY mtime, id ` selectMessagesLatestQuery = ` - SELECT mid, time, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, sender, user, content_type, encoding + SELECT mid, sid, time, mtime, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, sender, user, content_type, encoding, deleted FROM messages WHERE topic = ? AND published = 1 ORDER BY time DESC, id DESC LIMIT 1 ` selectMessagesDueQuery = ` - SELECT mid, time, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, sender, user, content_type, encoding + SELECT mid, sid, time, mtime, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, sender, user, content_type, encoding, deleted FROM messages WHERE time <= ? AND published = 0 - ORDER BY time, id + ORDER BY mtime, id ` selectMessagesExpiredQuery = `SELECT mid FROM messages WHERE expires <= ? AND published = 1` updateMessagePublishedQuery = `UPDATE messages SET published = 1 WHERE mid = ?` + updateMessageDeletedQuery = `UPDATE messages SET deleted = 1 WHERE mid = ?` selectMessagesCountQuery = `SELECT COUNT(*) FROM messages` selectMessageCountPerTopicQuery = `SELECT topic, COUNT(*) FROM messages GROUP BY topic` selectTopicsQuery = `SELECT topic FROM messages GROUP BY topic` @@ -130,7 +136,7 @@ const ( // Schema management queries const ( - currentSchemaVersion = 13 + currentSchemaVersion = 14 createSchemaVersionTableQuery = ` CREATE TABLE IF NOT EXISTS schemaVersion ( id INT PRIMARY KEY, @@ -259,6 +265,15 @@ const ( migrate12To13AlterMessagesTableQuery = ` CREATE INDEX IF NOT EXISTS idx_topic ON messages (topic); ` + + //13 -> 14 + migrate13To14AlterMessagesTableQuery = ` + ALTER TABLE messages ADD COLUMN sid TEXT NOT NULL DEFAULT(''); + ALTER TABLE messages ADD COLUMN mtime INT NOT NULL DEFAULT('0'); + ALTER TABLE messages ADD COLUMN deleted INT NOT NULL DEFAULT('0'); + CREATE INDEX IF NOT EXISTS idx_sid ON messages (sid); + CREATE INDEX IF NOT EXISTS idx_mtime ON messages (mtime); + ` ) var ( @@ -276,6 +291,7 @@ var ( 10: migrateFrom10, 11: migrateFrom11, 12: migrateFrom12, + 13: migrateFrom13, } ) @@ -393,7 +409,9 @@ func (c *messageCache) addMessages(ms []*message) error { } _, err := stmt.Exec( m.ID, + m.SID, m.Time, + m.MTime, m.Expires, m.Topic, m.Message, @@ -414,6 +432,7 @@ func (c *messageCache) addMessages(ms []*message) error { m.ContentType, m.Encoding, published, + 0, ) if err != nil { return err @@ -692,12 +711,14 @@ func readMessages(rows *sql.Rows) ([]*message, error) { } func readMessage(rows *sql.Rows) (*message, error) { - var timestamp, expires, attachmentSize, attachmentExpires int64 - var priority int - var id, topic, msg, title, tagsStr, click, icon, actionsStr, attachmentName, attachmentType, attachmentURL, sender, user, contentType, encoding string + var timestamp, mtimestamp, expires, attachmentSize, attachmentExpires int64 + var priority, deleted int + var id, sid, topic, msg, title, tagsStr, click, icon, actionsStr, attachmentName, attachmentType, attachmentURL, sender, user, contentType, encoding string err := rows.Scan( &id, + &sid, ×tamp, + &mtimestamp, &expires, &topic, &msg, @@ -716,6 +737,7 @@ func readMessage(rows *sql.Rows) (*message, error) { &user, &contentType, &encoding, + &deleted, ) if err != nil { return nil, err @@ -746,7 +768,9 @@ func readMessage(rows *sql.Rows) (*message, error) { } return &message{ ID: id, + SID: sid, Time: timestamp, + MTime: mtimestamp, Expires: expires, Event: messageEvent, Topic: topic, @@ -762,6 +786,7 @@ func readMessage(rows *sql.Rows) (*message, error) { User: user, ContentType: contentType, Encoding: encoding, + Deleted: deleted, }, nil } @@ -1016,3 +1041,19 @@ func migrateFrom12(db *sql.DB, _ time.Duration) error { } return tx.Commit() } + +func migrateFrom13(db *sql.DB, _ time.Duration) error { + log.Tag(tagMessageCache).Info("Migrating cache database schema: from 13 to 14") + tx, err := db.Begin() + if err != nil { + return err + } + defer tx.Rollback() + if _, err := tx.Exec(migrate13To14AlterMessagesTableQuery); err != nil { + return err + } + if _, err := tx.Exec(updateSchemaVersion, 14); err != nil { + return err + } + return tx.Commit() +} diff --git a/server/message_cache_test.go b/server/message_cache_test.go index 778f28fe..6878d78d 100644 --- a/server/message_cache_test.go +++ b/server/message_cache_test.go @@ -22,9 +22,11 @@ func TestMemCache_Messages(t *testing.T) { func testCacheMessages(t *testing.T, c *messageCache) { m1 := newDefaultMessage("mytopic", "my message") m1.Time = 1 + m1.MTime = 1000 m2 := newDefaultMessage("mytopic", "my other message") m2.Time = 2 + m2.MTime = 2000 require.Nil(t, c.AddMessage(m1)) require.Nil(t, c.AddMessage(newDefaultMessage("example", "my example message"))) @@ -102,10 +104,13 @@ func testCacheMessagesScheduled(t *testing.T, c *messageCache) { m1 := newDefaultMessage("mytopic", "message 1") m2 := newDefaultMessage("mytopic", "message 2") m2.Time = time.Now().Add(time.Hour).Unix() + m2.MTime = time.Now().Add(time.Hour).UnixMilli() m3 := newDefaultMessage("mytopic", "message 3") - m3.Time = time.Now().Add(time.Minute).Unix() // earlier than m2! + m3.Time = time.Now().Add(time.Minute).Unix() // earlier than m2! + m3.MTime = time.Now().Add(time.Minute).UnixMilli() // earlier than m2! m4 := newDefaultMessage("mytopic2", "message 4") m4.Time = time.Now().Add(time.Minute).Unix() + m4.MTime = time.Now().Add(time.Minute).UnixMilli() require.Nil(t, c.AddMessage(m1)) require.Nil(t, c.AddMessage(m2)) require.Nil(t, c.AddMessage(m3)) @@ -179,18 +184,25 @@ func TestMemCache_MessagesSinceID(t *testing.T) { func testCacheMessagesSinceID(t *testing.T, c *messageCache) { m1 := newDefaultMessage("mytopic", "message 1") m1.Time = 100 + m1.MTime = 100000 m2 := newDefaultMessage("mytopic", "message 2") m2.Time = 200 + m2.MTime = 200000 m3 := newDefaultMessage("mytopic", "message 3") - m3.Time = time.Now().Add(time.Hour).Unix() // Scheduled, in the future, later than m7 and m5 + m3.Time = time.Now().Add(time.Hour).Unix() // Scheduled, in the future, later than m7 and m5 + m3.MTime = time.Now().Add(time.Hour).UnixMilli() // Scheduled, in the future, later than m7 and m5 m4 := newDefaultMessage("mytopic", "message 4") m4.Time = 400 + m4.MTime = 400000 m5 := newDefaultMessage("mytopic", "message 5") - m5.Time = time.Now().Add(time.Minute).Unix() // Scheduled, in the future, later than m7 + m5.Time = time.Now().Add(time.Minute).Unix() // Scheduled, in the future, later than m7 + m5.MTime = time.Now().Add(time.Minute).UnixMilli() // Scheduled, in the future, later than m7 m6 := newDefaultMessage("mytopic", "message 6") m6.Time = 600 + m6.MTime = 600000 m7 := newDefaultMessage("mytopic", "message 7") m7.Time = 700 + m7.MTime = 700000 require.Nil(t, c.AddMessage(m1)) require.Nil(t, c.AddMessage(m2)) @@ -251,14 +263,17 @@ func testCachePrune(t *testing.T, c *messageCache) { m1 := newDefaultMessage("mytopic", "my message") m1.Time = now - 10 + m1.MTime = (now - 10) * 1000 m1.Expires = now - 5 m2 := newDefaultMessage("mytopic", "my other message") m2.Time = now - 5 + m2.MTime = (now - 5) * 1000 m2.Expires = now + 5 // In the future m3 := newDefaultMessage("another_topic", "and another one") m3.Time = now - 12 + m3.MTime = (now - 12) * 1000 m3.Expires = now - 2 require.Nil(t, c.AddMessage(m1)) @@ -297,6 +312,7 @@ func testCacheAttachments(t *testing.T, c *messageCache) { expires1 := time.Now().Add(-4 * time.Hour).Unix() // Expired m := newDefaultMessage("mytopic", "flower for you") m.ID = "m1" + m.SID = "m1" m.Sender = netip.MustParseAddr("1.2.3.4") m.Attachment = &attachment{ Name: "flower.jpg", @@ -310,6 +326,7 @@ func testCacheAttachments(t *testing.T, c *messageCache) { expires2 := time.Now().Add(2 * time.Hour).Unix() // Future m = newDefaultMessage("mytopic", "sending you a car") m.ID = "m2" + m.SID = "m2" m.Sender = netip.MustParseAddr("1.2.3.4") m.Attachment = &attachment{ Name: "car.jpg", @@ -323,6 +340,7 @@ func testCacheAttachments(t *testing.T, c *messageCache) { expires3 := time.Now().Add(1 * time.Hour).Unix() // Future m = newDefaultMessage("another-topic", "sending you another car") m.ID = "m3" + m.SID = "m3" m.User = "u_BAsbaAa" m.Sender = netip.MustParseAddr("5.6.7.8") m.Attachment = &attachment{ @@ -378,11 +396,13 @@ func TestMemCache_Attachments_Expired(t *testing.T) { func testCacheAttachmentsExpired(t *testing.T, c *messageCache) { m := newDefaultMessage("mytopic", "flower for you") m.ID = "m1" + m.SID = "m1" m.Expires = time.Now().Add(time.Hour).Unix() require.Nil(t, c.AddMessage(m)) m = newDefaultMessage("mytopic", "message with attachment") m.ID = "m2" + m.SID = "m2" m.Expires = time.Now().Add(2 * time.Hour).Unix() m.Attachment = &attachment{ Name: "car.jpg", @@ -395,6 +415,7 @@ func testCacheAttachmentsExpired(t *testing.T, c *messageCache) { m = newDefaultMessage("mytopic", "message with external attachment") m.ID = "m3" + m.SID = "m3" m.Expires = time.Now().Add(2 * time.Hour).Unix() m.Attachment = &attachment{ Name: "car.jpg", @@ -406,6 +427,7 @@ func testCacheAttachmentsExpired(t *testing.T, c *messageCache) { m = newDefaultMessage("mytopic2", "message with expired attachment") m.ID = "m4" + m.SID = "m4" m.Expires = time.Now().Add(2 * time.Hour).Unix() m.Attachment = &attachment{ Name: "expired-car.jpg", @@ -502,6 +524,7 @@ func TestSqliteCache_Migration_From1(t *testing.T) { // Add delayed message delayedMessage := newDefaultMessage("mytopic", "some delayed message") delayedMessage.Time = time.Now().Add(time.Minute).Unix() + delayedMessage.MTime = time.Now().Add(time.Minute).UnixMilli() require.Nil(t, c.AddMessage(delayedMessage)) // 10, not 11! diff --git a/server/types.go b/server/types.go index 65492e46..9e65045f 100644 --- a/server/types.go +++ b/server/types.go @@ -25,7 +25,9 @@ const ( // message represents a message published to a topic type message struct { ID string `json:"id"` // Random message ID + SID string `json:"sid"` // Message sequence ID for updating message contents Time int64 `json:"time"` // Unix time in seconds + MTime int64 `json:"mtime"` // Unix time in milliseconds Expires int64 `json:"expires,omitempty"` // Unix time in seconds (not required for open/keepalive) Event string `json:"event"` // One of the above Topic string `json:"topic"` @@ -42,13 +44,16 @@ type message struct { Encoding string `json:"encoding,omitempty"` // empty for raw UTF-8, or "base64" for encoded bytes Sender netip.Addr `json:"-"` // IP address of uploader, used for rate limiting User string `json:"-"` // UserID of the uploader, used to associated attachments + Deleted int `json:"deleted,omitempty"` } func (m *message) Context() log.Context { fields := map[string]any{ "topic": m.Topic, "message_id": m.ID, + "message_sid": m.SID, "message_time": m.Time, + "message_mtime": m.MTime, "message_event": m.Event, "message_body_size": len(m.Message), } @@ -92,6 +97,7 @@ func newAction() *action { // publishMessage is used as input when publishing as JSON type publishMessage struct { Topic string `json:"topic"` + SID string `json:"sid"` Title string `json:"title"` Message string `json:"message"` Priority int `json:"priority"` @@ -117,6 +123,7 @@ func newMessage(event, topic, msg string) *message { return &message{ ID: util.RandomString(messageIDLength), Time: time.Now().Unix(), + MTime: time.Now().UnixMilli(), Event: event, Topic: topic, Message: msg, @@ -155,7 +162,11 @@ type sinceMarker struct { } func newSinceTime(timestamp int64) sinceMarker { - return sinceMarker{time.Unix(timestamp, 0), ""} + return newSinceMTime(timestamp * 1000) +} + +func newSinceMTime(mtimestamp int64) sinceMarker { + return sinceMarker{time.UnixMilli(mtimestamp), ""} } func newSinceID(id string) sinceMarker {