Add webhook management and scheduling functionality

This commit is contained in:
2025-04-25 10:53:00 +02:00
parent 5f8e9dd589
commit 56039bd3b9
7 changed files with 665 additions and 17 deletions

View File

@@ -0,0 +1,345 @@
const axios = require('axios');
const dbInstance = require('../db');
const EventEmitter = require('events');
class WebhookManager {
constructor() {
if (WebhookManager.instance) {
return WebhookManager.instance;
}
this.eventEmitter = new EventEmitter();
this.setupEventListeners();
WebhookManager.instance = this;
}
setupEventListeners() {
// Adding event listeners for different events
this.eventEmitter.on('playback_started', async (data) => {
await this.triggerEventWebhooks('playback_started', data);
});
this.eventEmitter.on('user_login', async (data) => {
await this.triggerEventWebhooks('user_login', data);
});
// If needed, add more event listeners here
}
async getWebhooksByEventType(eventType) {
return await dbInstance.query(
'SELECT * FROM webhooks WHERE trigger_type = $1 AND event_type = $2 AND enabled = true',
['event', eventType]
).then(res => res.rows);
}
async getScheduledWebhooks() {
return await dbInstance.query(
'SELECT * FROM webhooks WHERE trigger_type = $1 AND enabled = true',
['scheduled']
).then(res => res.rows);
}
async triggerEventWebhooks(eventType, data) {
const webhooks = await this.getWebhooksByEventType(eventType);
for (const webhook of webhooks) {
await this.executeWebhook(webhook, data);
}
}
async executeWebhook(webhook, data = {}) {
try {
let headers = {};
let payload = {};
const isDiscordWebhook = webhook.url.includes('discord.com/api/webhooks');
try {
headers = typeof webhook.headers === 'string'
? JSON.parse(webhook.headers || '{}')
: (webhook.headers || {});
payload = typeof webhook.payload === 'string'
? JSON.parse(webhook.payload || '{}')
: (webhook.payload || {});
} catch (e) {
console.error("[WEBHOOK] Error while parsing:", e);
return false;
}
if (isDiscordWebhook) {
console.log("[WEBHOOK] Webhook Discord detected");
await axios({
method: webhook.method || 'POST',
url: webhook.url,
headers: { 'Content-Type': 'application/json' },
data: payload,
timeout: 10000
});
console.log(`[WEBHOOK] Discord webhook ${webhook.name} send successfully`);
} else {
const compiledPayload = this.compileTemplate(payload, data);
await axios({
method: webhook.method || 'POST',
url: webhook.url,
headers,
data: compiledPayload,
timeout: 10000
});
console.log(`[WEBHOOK] Webhook ${webhook.name} send successfully`);
}
//Update the last triggered timestamp
await dbInstance.query(
'UPDATE webhooks SET last_triggered = NOW() WHERE id = $1',
[webhook.id]
);
return true;
} catch (error) {
console.error(`[WEBHOOK] Error triggering webhook ${webhook.name}:`, error.message);
if (error.response) {
console.error(`[WEBHOOK] Response status: ${error.response.status}`);
console.error(`[WEBHOOK] Response data:`, error.response.data);
}
return false;
}
}
compileTemplate(template, data) {
if (typeof template === 'object') {
return Object.keys(template).reduce((result, key) => {
result[key] = this.compileTemplate(template[key], data);
return result;
}, {});
} else if (typeof template === 'string') {
// Replace {{variable}} with the corresponding value from data
return template.replace(/\{\{([^}]+)\}\}/g, (match, path) => {
const keys = path.trim().split('.');
let value = data;
for (const key of keys) {
if (value === undefined) return match;
value = value[key];
}
return value !== undefined ? value : match;
});
}
return template;
}
emitEvent(eventType, data) {
this.eventEmitter.emit(eventType, data);
}
async getTopWatchedContent(contentType, period = 'month', limit = 5) {
// Calculate period start date
const today = new Date();
let startDate;
if (period === 'month') {
startDate = new Date(today.getFullYear(), today.getMonth() - 1, 1);
} else if (period === 'week') {
const day = today.getDay();
startDate = new Date(today.getFullYear(), today.getMonth(), today.getDate() - day - 7);
} else {
startDate = new Date(today.getFullYear(), today.getMonth() - 1, 1);
}
const formattedStartDate = startDate.toISOString().split('T')[0];
// SQL query to get top watched content
let query;
if (contentType === 'movie') {
query = `
SELECT
"NowPlayingItemName" as title,
COUNT(DISTINCT "UserId") as unique_viewers,
SUM("PlaybackDuration") / 60000 as total_minutes
FROM jf_playback_activity
WHERE "ActivityDateInserted" >= $1
AND "NowPlayingItemName" IS NOT NULL
AND "SeriesName" IS NULL
GROUP BY "NowPlayingItemName", "NowPlayingItemId"
ORDER BY total_minutes DESC
LIMIT $2
`;
} else if (contentType === 'series') {
query = `
SELECT
"SeriesName" as title,
COUNT(DISTINCT "UserId") as unique_viewers,
SUM("PlaybackDuration") / 60000 as total_minutes
FROM jf_playback_activity
WHERE "ActivityDateInserted" >= $1
AND "SeriesName" IS NOT NULL
GROUP BY "SeriesName"
ORDER BY total_minutes DESC
LIMIT $2
`;
}
try {
const result = await dbInstance.query(query, [formattedStartDate, limit]);
return result.rows || [];
} catch (error) {
console.error(`[WEBHOOK] Erreur SQL (${contentType}):`, error.message);
return [];
}
}
async getMonthlySummaryData() {
try {
// Get the top watched movies and series
const topMovies = await this.getTopWatchedContent('movie', 'month', 5);
const topSeries = await this.getTopWatchedContent('series', 'month', 5);
const prevMonth = new Date();
prevMonth.setMonth(prevMonth.getMonth() - 1);
const prevMonthStart = new Date(prevMonth.getFullYear(), prevMonth.getMonth(), 1);
const prevMonthEnd = new Date(prevMonth.getFullYear(), prevMonth.getMonth() + 1, 0);
const formattedStart = prevMonthStart.toISOString().split('T')[0];
const formattedEnd = prevMonthEnd.toISOString().split('T')[0];
// Get general statistics
const statsQuery = `
SELECT
COUNT(DISTINCT "UserId") as active_users,
COUNT(*) as total_plays,
SUM("PlaybackDuration") / 3600000 as total_hours
FROM jf_playback_activity
WHERE "ActivityDateInserted" BETWEEN $1 AND $2
`;
const statsResult = await dbInstance.query(statsQuery, [formattedStart, formattedEnd]);
const generalStats = statsResult.rows[0] || {
active_users: 0,
total_plays: 0,
total_hours: 0
};
return {
period: {
start: formattedStart,
end: formattedEnd,
name: prevMonth.toLocaleString('fr-FR', { month: 'long', year: 'numeric' })
},
topMovies,
topSeries,
stats: generalStats
};
} catch (error) {
console.error("[WEBHOOK] Erreur récupération données:", error.message);
throw error;
}
}
async triggerMonthlySummaryWebhook(webhookId) {
try {
// Get the webhook details
const result = await dbInstance.query(
'SELECT * FROM webhooks WHERE id = $1 AND enabled = true',
[webhookId]
);
if (result.rows.length === 0) {
console.error(`[WEBHOOK] Webhook ID ${webhookId} non trouvé ou désactivé`);
return false;
}
const webhook = result.rows[0];
// Generate the monthly summary data
try {
const data = await this.getMonthlySummaryData();
const moviesFields = data.topMovies.map((movie, index) => ({
name: `${index + 1}. ${movie.title}`,
value: `${Math.round(movie.total_minutes)} minutes • ${movie.unique_viewers} spectateurs`,
inline: false
}));
const seriesFields = data.topSeries.map((series, index) => ({
name: `${index + 1}. ${series.title}`,
value: `${Math.round(series.total_minutes)} minutes • ${series.unique_viewers} spectateurs`,
inline: false
}));
const monthlyPayload = {
content: `📊 **Rapport mensuel - ${data.period.name}**`,
embeds: [
{
title: "🎬 Most Watched Movies",
color: 15844367, // Orange
fields: moviesFields.length > 0 ? moviesFields : [{ name: "Aucune donnée", value: "Pas de films regardés ce mois-ci" }]
},
{
title: "📺 Most Watched Series",
color: 5793266, // Bleu
fields: seriesFields.length > 0 ? seriesFields : [{ name: "Aucune donnée", value: "Pas de séries regardées ce mois-ci" }]
},
{
title: "📈 General Statistics",
color: 5763719, // Vert
fields: [
{
name: "Utilisateurs actifs",
value: `${data.stats.active_users || 0}`,
inline: true
},
{
name: "Lectures totales",
value: `${data.stats.total_plays || 0}`,
inline: true
},
{
name: "Heures visionnées",
value: `${Math.round(data.stats.total_hours || 0)}`,
inline: true
}
],
footer: {
text: `Période: du ${new Date(data.period.start).toLocaleDateString('fr-FR')} au ${new Date(data.period.end).toLocaleDateString('fr-FR')}`
}
}
]
};
// Send the webhook
await axios({
method: webhook.method || 'POST',
url: webhook.url,
headers: { 'Content-Type': 'application/json' },
data: monthlyPayload,
timeout: 10000
});
console.log(`[WEBHOOK] Rapport mensuel envoyé avec succès via ${webhook.name}`);
// Update the last triggered timestamp
await dbInstance.query(
'UPDATE webhooks SET last_triggered = NOW() WHERE id = $1',
[webhook.id]
);
return true;
} catch (dataError) {
console.error(`[WEBHOOK] Erreur préparation données:`, dataError.message);
return false;
}
} catch (error) {
console.error(`[WEBHOOK] Erreur lors de l'envoi du rapport mensuel:`, error.message);
return false;
}
}
}
module.exports = WebhookManager;

View File

@@ -0,0 +1,53 @@
const cron = require('node-cron');
const WebhookManager = require('./webhook-manager');
const dbInstance = require('../db');
class WebhookScheduler {
constructor() {
this.webhookManager = new WebhookManager();
this.cronJobs = {};
this.loadScheduledWebhooks();
}
async loadScheduledWebhooks() {
try {
const webhooks = await this.webhookManager.getScheduledWebhooks();
// Clean existing tasks
Object.values(this.cronJobs).forEach(job => job.stop());
this.cronJobs = {};
// Create new tasks
webhooks.forEach(webhook => {
if (webhook.schedule && cron.validate(webhook.schedule)) {
this.scheduleWebhook(webhook);
} else {
console.error(`[WEBHOOK] Invalid cron schedule for webhook ${webhook.id}: ${webhook.schedule}`);
}
});
console.log(`[WEBHOOK] Scheduled ${Object.keys(this.cronJobs).length} webhooks`);
} catch (error) {
console.error('[WEBHOOK] Failed to load scheduled webhooks:', error);
}
}
scheduleWebhook(webhook) {
try {
this.cronJobs[webhook.id] = cron.schedule(webhook.schedule, async () => {
console.log(`[WEBHOOK] Executing scheduled webhook: ${webhook.name}`);
await this.webhookManager.executeWebhook(webhook);
});
console.log(`[WEBHOOK] Webhook ${webhook.name} scheduled with cron: ${webhook.schedule}`);
} catch (error) {
console.error(`[WEBHOOK] Error scheduling webhook ${webhook.id}:`, error);
}
}
async refreshSchedule() {
await this.loadScheduledWebhooks();
}
}
module.exports = WebhookScheduler;

View File

@@ -0,0 +1,23 @@
exports.up = function(knex) {
return knex.schema.createTable('webhooks', table => {
table.increments('id').primary();
table.string('name').notNullable();
table.string('url').notNullable();
table.text('headers').defaultTo('{}');
table.text('payload').defaultTo('{}');
table.string('method').defaultTo('POST');
table.string('trigger_type').notNullable();
table.string('webhook_type').defaultTo('generic');
table.string('schedule').nullable();
table.string('event_type').nullable();
table.boolean('enabled').defaultTo(true);
table.timestamp('last_triggered').nullable();
table.boolean('retry_on_failure').defaultTo(false);
table.integer('max_retries').defaultTo(3);
table.timestamps(true, true);
});
};
exports.down = function(knex) {
return knex.schema.dropTable('webhooks');
};

214
backend/routes/webhooks.js Normal file
View File

@@ -0,0 +1,214 @@
const express = require('express');
const router = express.Router();
const dbInstance = require('../db');
const WebhookManager = require('../classes/webhook-manager');
const WebhookScheduler = require('../classes/webhook-scheduler');
const webhookScheduler = new WebhookScheduler();
const webhookManager = new WebhookManager();
// Get all webhooks
router.get('/', async (req, res) => {
try {
const result = await dbInstance.query('SELECT * FROM webhooks ORDER BY id DESC');
res.json(result.rows);
} catch (error) {
console.error('Error fetching webhooks:', error);
res.status(500).json({ error: 'Failed to fetch webhooks' });
}
});
// Get a specific webhook by ID
router.get('/:id', async (req, res) => {
try {
const { id } = req.params;
const result = await dbInstance.query('SELECT * FROM webhooks WHERE id = $1', [id]);
if (result.rows.length === 0) {
return res.status(404).json({ error: 'Webhook not found' });
}
res.json(result.rows[0]);
} catch (error) {
console.error('Error fetching webhook:', error);
res.status(500).json({ error: 'Failed to fetch webhook' });
}
});
// Create a new webhook
router.post('/', async (req, res) => {
try {
const {
name,
url,
headers,
payload,
method,
trigger_type,
schedule,
event_type,
enabled,
retry_on_failure,
max_retries
} = req.body;
if (!name || !url || !trigger_type) {
return res.status(400).json({ error: 'Name, URL and trigger type are required' });
}
if (trigger_type === 'scheduled' && !schedule) {
return res.status(400).json({ error: 'Schedule is required for scheduled webhooks' });
}
if (trigger_type === 'event' && !event_type) {
return res.status(400).json({ error: 'Event type is required for event webhooks' });
}
const result = await dbInstance.query(
`INSERT INTO webhooks (name, url, headers, payload, method, trigger_type, schedule, event_type, enabled, retry_on_failure, max_retries)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
RETURNING *`,
[
name,
url,
JSON.stringify(headers || {}),
JSON.stringify(payload || {}),
method || 'POST',
trigger_type,
schedule,
event_type,
enabled !== undefined ? enabled : true,
retry_on_failure || false,
max_retries || 3
]
);
// Refresh the schedule if the webhook is scheduled
if (trigger_type === 'scheduled' && enabled) {
await webhookScheduler.refreshSchedule();
}
res.status(201).json(result.rows[0]);
} catch (error) {
console.error('Error creating webhook:', error);
res.status(500).json({ error: 'Failed to create webhook' });
}
});
// Update a webhook
router.put('/:id', async (req, res) => {
try {
const { id } = req.params;
const {
name,
url,
headers,
payload,
method,
trigger_type,
schedule,
event_type,
enabled,
retry_on_failure,
max_retries
} = req.body;
if (!name || !url || !trigger_type) {
return res.status(400).json({ error: 'Name, URL and trigger type are required' });
}
const result = await dbInstance.query(
`UPDATE webhooks
SET name = $1, url = $2, headers = $3, payload = $4, method = $5,
trigger_type = $6, schedule = $7, event_type = $8, enabled = $9,
retry_on_failure = $10, max_retries = $11
WHERE id = $12
RETURNING *`,
[
name,
url,
JSON.stringify(headers || {}),
JSON.stringify(payload || {}),
method || 'POST',
trigger_type,
schedule,
event_type,
enabled !== undefined ? enabled : true,
retry_on_failure || false,
max_retries || 3,
id
]
);
if (result.rows.length === 0) {
return res.status(404).json({ error: 'Webhook not found' });
}
// Refresh the schedule if the webhook is scheduled
await webhookScheduler.refreshSchedule();
res.json(result.rows[0]);
} catch (error) {
console.error('Error updating webhook:', error);
res.status(500).json({ error: 'Failed to update webhook' });
}
});
// Delete a webhook
router.delete('/:id', async (req, res) => {
try {
const { id } = req.params;
const result = await dbInstance.query('DELETE FROM webhooks WHERE id = $1 RETURNING *', [id]);
if (result.rows.length === 0) {
return res.status(404).json({ error: 'Webhook not found' });
}
// Refresh the schedule if the webhook was scheduled
await webhookScheduler.refreshSchedule();
res.json({ message: 'Webhook deleted successfully', webhook: result.rows[0] });
} catch (error) {
console.error('Error deleting webhook:', error);
res.status(500).json({ error: 'Failed to delete webhook' });
}
});
// Test a webhook
router.post('/:id/test', async (req, res) => {
try {
const { id } = req.params;
const result = await dbInstance.query('SELECT * FROM webhooks WHERE id = $1', [id]);
if (result.rows.length === 0) {
return res.status(404).json({ error: 'Webhook not found' });
}
const webhook = result.rows[0];
const testData = req.body || {};
const success = await webhookManager.executeWebhook(webhook, testData);
if (success) {
res.json({ message: 'Webhook executed successfully' });
} else {
res.status(500).json({ error: 'Webhook execution failed' });
}
} catch (error) {
console.error('Error testing webhook:', error);
res.status(500).json({ error: 'Failed to test webhook' });
}
});
router.post('/:id/trigger-monthly', async (req, res) => {
const webhookManager = new WebhookManager();
const success = await webhookManager.triggerMonthlySummaryWebhook(req.params.id);
if (success) {
res.status(200).json({ message: "Rapport mensuel envoyé avec succès" });
} else {
res.status(500).json({ message: "Échec de l'envoi du rapport mensuel" });
}
});
module.exports = router;

View File

@@ -25,11 +25,13 @@ const statsRouter = require("./routes/stats");
const backupRouter = require("./routes/backup");
const logRouter = require("./routes/logging");
const utilsRouter = require("./routes/utils");
const webhooksRouter = require('./routes/webhooks');
// tasks
const ActivityMonitor = require("./tasks/ActivityMonitor");
const TaskManager = require("./classes/task-manager-singleton");
const TaskScheduler = require("./classes/task-scheduler-singleton");
const WebhookScheduler = require('./classes/webhook-scheduler');
// const tasks = require("./tasks/tasks");
// websocket
@@ -165,6 +167,9 @@ app.use("/logs", authenticate, logRouter, () => {
app.use("/utils", authenticate, utilsRouter, () => {
/* #swagger.tags = ['Utils']*/
}); // mount the API router at /utils, with JWT middleware
app.use("/webhooks", authenticate, webhooksRouter, () => {
/* #swagger.tags = ['Webhooks']*/
}); // mount the API router at /webhooks, with JWT middleware
// Swagger
app.use("/swagger", swaggerUi.serve, swaggerUi.setup(swaggerDocument));
@@ -243,6 +248,7 @@ try {
ActivityMonitor.ActivityMonitor(1000);
new TaskManager();
new TaskScheduler();
new WebhookScheduler();
});
});
});

40
package-lock.json generated
View File

@@ -6,7 +6,7 @@
"packages": {
"": {
"name": "jfstat",
"version": "1.1.4",
"version": "1.1.5",
"dependencies": {
"@emotion/react": "^11.14.0",
"@emotion/styled": "^11.14.0",
@@ -46,6 +46,7 @@
"memoizee": "^0.4.17",
"moment": "^2.29.4",
"multer": "^1.4.5-lts.1",
"node-cron": "^3.0.3",
"passport": "^0.6.0",
"passport-jwt": "^4.0.1",
"pg": "^8.9.0",
@@ -15353,6 +15354,18 @@
"tslib": "^2.0.3"
}
},
"node_modules/node-cron": {
"version": "3.0.3",
"resolved": "https://registry.npmjs.org/node-cron/-/node-cron-3.0.3.tgz",
"integrity": "sha512-dOal67//nohNgYWb+nWmg5dkFdIwDm8EpeGYMekPMrngV3637lqnX0lbUcCtgibHTz6SEz7DAIjKvKDFYCnO1A==",
"license": "ISC",
"dependencies": {
"uuid": "8.3.2"
},
"engines": {
"node": ">=6.0.0"
}
},
"node_modules/node-fetch": {
"version": "2.7.0",
"resolved": "https://registry.npmjs.org/node-fetch/-/node-fetch-2.7.0.tgz",
@@ -19460,14 +19473,6 @@
"resolved": "https://registry.npmjs.org/ms/-/ms-2.1.2.tgz",
"integrity": "sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w=="
},
"node_modules/sequelize/node_modules/uuid": {
"version": "8.3.2",
"resolved": "https://registry.npmjs.org/uuid/-/uuid-8.3.2.tgz",
"integrity": "sha512-+NYs2QeMWy+GWFOEm9xnn6HCDp0l7QBD7ml8zLUmJ+93Q5NF0NocErnwkTkXVFNiX3/fpC6afS8Dhb/gz7R7eg==",
"bin": {
"uuid": "dist/bin/uuid"
}
},
"node_modules/serialize-javascript": {
"version": "6.0.1",
"resolved": "https://registry.npmjs.org/serialize-javascript/-/serialize-javascript-6.0.1.tgz",
@@ -19793,14 +19798,6 @@
"websocket-driver": "^0.7.4"
}
},
"node_modules/sockjs/node_modules/uuid": {
"version": "8.3.2",
"resolved": "https://registry.npmjs.org/uuid/-/uuid-8.3.2.tgz",
"integrity": "sha512-+NYs2QeMWy+GWFOEm9xnn6HCDp0l7QBD7ml8zLUmJ+93Q5NF0NocErnwkTkXVFNiX3/fpC6afS8Dhb/gz7R7eg==",
"bin": {
"uuid": "dist/bin/uuid"
}
},
"node_modules/source-list-map": {
"version": "2.0.1",
"resolved": "https://registry.npmjs.org/source-list-map/-/source-list-map-2.0.1.tgz",
@@ -21514,6 +21511,15 @@
"node": ">= 0.4.0"
}
},
"node_modules/uuid": {
"version": "8.3.2",
"resolved": "https://registry.npmjs.org/uuid/-/uuid-8.3.2.tgz",
"integrity": "sha512-+NYs2QeMWy+GWFOEm9xnn6HCDp0l7QBD7ml8zLUmJ+93Q5NF0NocErnwkTkXVFNiX3/fpC6afS8Dhb/gz7R7eg==",
"license": "MIT",
"bin": {
"uuid": "dist/bin/uuid"
}
},
"node_modules/v8-to-istanbul": {
"version": "8.1.1",
"resolved": "https://registry.npmjs.org/v8-to-istanbul/-/v8-to-istanbul-8.1.1.tgz",

View File

@@ -53,6 +53,7 @@
"memoizee": "^0.4.17",
"moment": "^2.29.4",
"multer": "^1.4.5-lts.1",
"node-cron": "^3.0.3",
"passport": "^0.6.0",
"passport-jwt": "^4.0.1",
"pg": "^8.9.0",