Refactored Activity monitor code to be easily debugged and to fix issue #234

Added pkName  option to deleteBulk query for Primary keys that are not called "Id"
This commit is contained in:
CyferShepard
2024-08-08 23:15:38 +02:00
parent 159679377b
commit f77a440b9d
2 changed files with 119 additions and 79 deletions

View File

@@ -28,7 +28,7 @@ pool.on("error", (err, client) => {
//process.exit(-1);
});
async function deleteBulk(table_name, data) {
async function deleteBulk(table_name, data, pkName) {
const client = await pool.connect();
let result = "SUCCESS";
let message = "";
@@ -37,7 +37,7 @@ async function deleteBulk(table_name, data) {
if (data && data.length !== 0) {
const deleteQuery = {
text: `DELETE FROM ${table_name} WHERE "Id" IN (${pgp.as.csv(data)})`,
text: `DELETE FROM ${table_name} WHERE "${pkName ?? "Id"}" IN (${pgp.as.csv(data)})`,
};
// console.log(deleteQuery);
await client.query(deleteQuery);

View File

@@ -1,15 +1,115 @@
const db = require("../db");
const pgp = require("pg-promise")();
const moment = require("moment");
const { columnsPlayback, mappingPlayback } = require("../models/jf_playback_activity");
const { columnsPlayback } = require("../models/jf_playback_activity");
const { jf_activity_watchdog_columns, jf_activity_watchdog_mapping } = require("../models/jf_activity_watchdog");
const configClass = require("../classes/config");
const API = require("../classes/api-loader");
const { sendUpdate } = require("../ws");
async function ActivityMonitor(interval) {
async function getSessionsInWatchDog(SessionData, WatchdogData) {
let existingData = await WatchdogData.filter((wdData) => {
return SessionData.some((sessionData) => {
let NowPlayingItemId = sessionData.NowPlayingItem.SeriesId || sessionData.NowPlayingItem.Id;
let matchesEpisodeId =
sessionData.NowPlayingItem.SeriesId != undefined ? wdData.EpisodeId === sessionData.NowPlayingItem.Id : true;
let matchingSessionFound =
wdData.Id === sessionData.Id &&
wdData.UserId === sessionData.UserId &&
wdData.DeviceId === sessionData.DeviceId &&
wdData.NowPlayingItemId === NowPlayingItemId &&
matchesEpisodeId;
if (matchingSessionFound && wdData.IsPaused != sessionData.PlayState.IsPaused) {
wdData.IsPaused = sessionData.PlayState.IsPaused;
//if the playstate was paused, calculate the difference in seconds and add to the playback duration
if (sessionData.PlayState.IsPaused == true) {
let startTime = moment(wdData.ActivityDateInserted, "YYYY-MM-DD HH:mm:ss.SSSZ");
let lastPausedDate = moment(sessionData.LastPausedDate);
let diffInSeconds = lastPausedDate.diff(startTime, "seconds");
wdData.PlaybackDuration = parseInt(wdData.PlaybackDuration) + diffInSeconds;
wdData.ActivityDateInserted = `${lastPausedDate.format("YYYY-MM-DD HH:mm:ss.SSSZ")}`;
}
return true;
}
return false; // we return false if playstate didnt change to reduce db writes
});
});
return existingData;
}
async function getSessionsNotInWatchDog(SessionData, WatchdogData) {
let newData = await SessionData.filter((sessionData) => {
if (WatchdogData.length === 0) return true;
return WatchdogData.some((wdData) => {
let NowPlayingItemId = sessionData.NowPlayingItem.SeriesId || sessionData.NowPlayingItem.Id;
let matchesEpisodeId =
sessionData.NowPlayingItem.SeriesId != undefined ? wdData.EpisodeId === sessionData.NowPlayingItem.Id : true;
let matchingSessionFound =
wdData.Id === sessionData.Id &&
wdData.UserId === sessionData.UserId &&
wdData.DeviceId === sessionData.DeviceId &&
wdData.NowPlayingItemId === NowPlayingItemId &&
matchesEpisodeId;
return !matchingSessionFound;
});
}).map(jf_activity_watchdog_mapping);
return newData;
}
function getWatchDogNotInSessions(SessionData, WatchdogData) {
let removedData = WatchdogData.filter((wdData) => {
if (SessionData.length === 0) return true;
return SessionData.some((sessionData) => {
let NowPlayingItemId = sessionData.NowPlayingItem.SeriesId || sessionData.NowPlayingItem.Id;
let matchesEpisodeId =
sessionData.NowPlayingItem.SeriesId != undefined ? wdData.EpisodeId === sessionData.NowPlayingItem.Id : true;
let noMatchingSessionFound = !(
wdData.Id === sessionData.Id &&
wdData.UserId === sessionData.UserId &&
wdData.DeviceId === sessionData.DeviceId &&
wdData.NowPlayingItemId === NowPlayingItemId &&
matchesEpisodeId
);
return noMatchingSessionFound;
});
});
//this is to update the playback duration for the removed items where it was playing before stopped as duration is only updated on pause
removedData.map((obj) => {
obj.Id = obj.ActivityId;
let startTime = moment(obj.ActivityDateInserted, "YYYY-MM-DD HH:mm:ss.SSSZ");
let endTime = moment();
let diffInSeconds = endTime.diff(startTime, "seconds");
if (obj.IsPaused == false) {
obj.PlaybackDuration = parseInt(obj.PlaybackDuration) + diffInSeconds;
}
obj.ActivityDateInserted = endTime.format("YYYY-MM-DD HH:mm:ss.SSSZ");
const { ...rest } = obj;
return { ...rest };
});
return removedData;
}
async function ActivityMonitor(interval) {
// console.log("Activity Interval: " + interval);
setInterval(async () => {
@@ -30,64 +130,21 @@ async function ActivityMonitor(interval) {
if (SessionData.length === 0 && WatchdogData.length === 0) {
return;
}
// New Code
// //compare to sessiondata
let WatchdogDataToInsert = await getSessionsNotInWatchDog(SessionData, WatchdogData);
let WatchdogDataToUpdate = await getSessionsInWatchDog(SessionData, WatchdogData);
let dataToRemove = await getWatchDogNotInSessions(SessionData, WatchdogData);
console.clear();
let WatchdogDataToInsert = [];
let WatchdogDataToUpdate = [];
// console.log("New Data: ", WatchdogDataToInsert.length);
// console.log("Existing Data: ", WatchdogDataToUpdate.length);
// console.log("Removed Data: ", dataToRemove.length);
/////////////////
//filter fix if table is empty
if (WatchdogData.length === 0) {
// if there are no existing Ids in the table, map all items in the data array to the expected format
WatchdogDataToInsert = await SessionData.map(jf_activity_watchdog_mapping);
} else {
// otherwise, filter only new data to insert
WatchdogDataToInsert = await SessionData.filter((sessionData) => {
return !WatchdogData.some(
(wdData) =>
wdData.Id === sessionData.Id &&
wdData.UserId === sessionData.UserId &&
wdData.DeviceId === sessionData.DeviceId &&
(sessionData.NowPlayingItem.SeriesId != undefined
? wdData.NowPlayingItemId === sessionData.NowPlayingItem.SeriesId
: wdData.NowPlayingItemId === sessionData.NowPlayingItem.Id) &&
(sessionData.NowPlayingItem.SeriesId != undefined ? wdData.EpisodeId === sessionData.NowPlayingItem.Id : true)
);
}).map(jf_activity_watchdog_mapping);
WatchdogDataToUpdate = WatchdogData.filter((wdData) => {
const session = SessionData.find(
(sessionData) =>
wdData.Id === sessionData.Id &&
wdData.UserId === sessionData.UserId &&
wdData.DeviceId === sessionData.DeviceId &&
(sessionData.NowPlayingItem.SeriesId != undefined
? wdData.NowPlayingItemId === sessionData.NowPlayingItem.SeriesId
: wdData.NowPlayingItemId === sessionData.NowPlayingItem.Id) &&
(sessionData.NowPlayingItem.SeriesId != undefined ? wdData.EpisodeId === sessionData.NowPlayingItem.Id : true)
);
if (session && session.PlayState) {
if (wdData.IsPaused != session.PlayState.IsPaused) {
wdData.IsPaused = session.PlayState.IsPaused;
if (session.PlayState.IsPaused == true) {
let startTime = moment(wdData.ActivityDateInserted, "YYYY-MM-DD HH:mm:ss.SSSZ");
let lastPausedDate = moment(session.LastPausedDate);
let diffInSeconds = lastPausedDate.diff(startTime, "seconds");
wdData.PlaybackDuration = parseInt(wdData.PlaybackDuration) + diffInSeconds;
wdData.ActivityDateInserted = `${lastPausedDate.format("YYYY-MM-DD HH:mm:ss.SSSZ")}`;
}
return true;
}
}
return false;
});
}
if (WatchdogDataToInsert.length > 0) {
//insert new rows where not existing items
// console.log("Inserted " + WatchdogDataToInsert.length + " wd playback records");
@@ -102,26 +159,9 @@ async function ActivityMonitor(interval) {
//delete from db no longer in session data and insert into stats db
//Bulk delete from db thats no longer on api
const toDeleteIds = WatchdogData.filter((id) => !SessionData.some((row) => row.Id === id.Id)).map((row) => row.Id);
const toDeleteIds = dataToRemove.map((row) => row.ActivityId);
const playbackData = WatchdogData.filter((id) => !SessionData.some((row) => row.Id === id.Id));
let playbackToInsert = playbackData.map((obj) => {
obj.Id = obj.ActivityId;
let startTime = moment(obj.ActivityDateInserted, "YYYY-MM-DD HH:mm:ss.SSSZ");
let endTime = moment();
let diffInSeconds = endTime.diff(startTime, "seconds");
if (obj.IsPaused == false) {
obj.PlaybackDuration = parseInt(obj.PlaybackDuration) + diffInSeconds;
}
obj.ActivityDateInserted = endTime.format("YYYY-MM-DD HH:mm:ss.SSSZ");
const { ...rest } = obj;
return { ...rest };
});
let playbackToInsert = dataToRemove;
if (playbackToInsert.length == 0 && toDeleteIds.length == 0) {
return;
@@ -176,7 +216,7 @@ async function ActivityMonitor(interval) {
ExistingDataToUpdate = ExistingDataToUpdate.filter((pb) => pb.PlaybackDuration > 0);
if (toDeleteIds.length > 0) {
await db.deleteBulk("jf_activity_watchdog", toDeleteIds);
await db.deleteBulk("jf_activity_watchdog", toDeleteIds, "ActivityId");
}
if (playbackToInsert.length > 0) {
await db.insertBulk("jf_playback_activity", playbackToInsert, columnsPlayback);
@@ -191,7 +231,7 @@ async function ActivityMonitor(interval) {
///////////////////////////
} catch (error) {
if (error?.code === "ECONNREFUSED") {
console.error("Error: Unable to connect to API");//TO-DO Change this to correct API name
console.error("Error: Unable to connect to API"); //TO-DO Change this to correct API name
} else if (error?.code === "ERR_BAD_RESPONSE") {
console.warn(error.response?.data);
} else {