From 6fac8127a097abefd201d85d966e9b2a755269b0 Mon Sep 17 00:00:00 2001 From: CyferShepard Date: Sun, 28 Jul 2024 21:04:10 +0200 Subject: [PATCH] Sync rework, this overhauls the sync process to process by chuncking and processing data. This fixes an index out of range memory error due to large sets of libraries being fetched and kept in an array for final processing. Changes made to the api classes to allow for individual calls as they previously looped to fetch each chunck before returning the final large data set. Also added a fix to prevent a full sync or partial sync from triggering when the other one is already running --- backend/classes/emby-api.js | 20 +- backend/classes/jellyfin-api.js | 20 +- backend/nodemon.json | 3 + backend/routes/sync.js | 652 +++++++++++++------- backend/tasks/FullSyncTask.js | 168 +++-- backend/tasks/RecentlyAddedItemsSyncTask.js | 19 +- 6 files changed, 537 insertions(+), 345 deletions(-) create mode 100644 backend/nodemon.json diff --git a/backend/classes/emby-api.js b/backend/classes/emby-api.js index f975c9e..e12a423 100644 --- a/backend/classes/emby-api.js +++ b/backend/classes/emby-api.js @@ -192,10 +192,10 @@ class EmbyAPI { url += `&Ids=${itemid}`; } - let startIndex = params && params.startIndex ? params.startIndex : 0; - let increment = params && params.increment ? params.increment : 200; + let startIndex = params && params.startIndex !== undefined ? params.startIndex : 0; + let increment = params && params.increment !== undefined ? params.increment : 200; let recursive = params && params.recursive !== undefined ? params.recursive : true; - let total = 200; + let total = startIndex + increment; let AllItems = []; while (startIndex < total || total === undefined) { @@ -210,6 +210,8 @@ class EmbyAPI { limit: increment, isMissing: false, excludeLocationTypes: "Virtual", + sortBy: "DateCreated", + sortOrder: "Descending", }, }); @@ -220,11 +222,15 @@ class EmbyAPI { AllItems.push(...result); - if (response.data.TotalRecordCount === undefined) { - break; - } if (ws && syncTask && wsMessage) { - ws(syncTask.wsKey, { type: "Update", message: `${wsMessage} - ${((startIndex / total) * 100).toFixed(2)}%` }); + ws(syncTask.wsKey, { + type: "Update", + message: `${wsMessage} - ${((Math.min(startIndex, total) / total) * 100).toFixed(2)}%`, + }); + } + + if (response.data.TotalRecordCount === undefined || (params && params.startIndex !== undefined)) { + break; } await this.#delay(10); diff --git a/backend/classes/jellyfin-api.js b/backend/classes/jellyfin-api.js index 3dc4383..6f1b27a 100644 --- a/backend/classes/jellyfin-api.js +++ b/backend/classes/jellyfin-api.js @@ -190,10 +190,10 @@ class JellyfinAPI { url += `&Ids=${itemid}`; } - let startIndex = params && params.startIndex ? params.startIndex : 0; - let increment = params && params.increment ? params.increment : 200; + let startIndex = params && params.startIndex !== undefined ? params.startIndex : 0; + let increment = params && params.increment !== undefined ? params.increment : 200; let recursive = params && params.recursive !== undefined ? params.recursive : true; - let total = 200; + let total = startIndex + increment; let AllItems = []; while (startIndex < total || total === undefined) { @@ -208,6 +208,8 @@ class JellyfinAPI { limit: increment, isMissing: false, excludeLocationTypes: "Virtual", + sortBy: "DateCreated", + sortOrder: "Descending", }, }); @@ -218,11 +220,15 @@ class JellyfinAPI { AllItems.push(...result); - if (response.data.TotalRecordCount === undefined) { - break; - } if (ws && syncTask && wsMessage) { - ws(syncTask.wsKey, { type: "Update", message: `${wsMessage} - ${((startIndex / total) * 100).toFixed(2)}%` }); + ws(syncTask.wsKey, { + type: "Update", + message: `${wsMessage} - ${((Math.min(startIndex, total) / total) * 100).toFixed(2)}%`, + }); + } + + if (response.data.TotalRecordCount === undefined || (params && params.startIndex !== undefined)) { + break; } await this.#delay(10); diff --git a/backend/nodemon.json b/backend/nodemon.json new file mode 100644 index 0000000..fd998ed --- /dev/null +++ b/backend/nodemon.json @@ -0,0 +1,3 @@ +{ + "ignore": ["backend/backup-data", "*.json"] +} diff --git a/backend/routes/sync.js b/backend/routes/sync.js index 1603c48..8aaca31 100644 --- a/backend/routes/sync.js +++ b/backend/routes/sync.js @@ -14,7 +14,6 @@ const triggertype = require("../logging/triggertype"); const configClass = require("../classes/config"); const API = require("../classes/api-loader"); - const router = express.Router(); const { jf_libraries_columns, jf_libraries_mapping } = require("../models/jf_libraries"); @@ -47,7 +46,7 @@ class sync { async insertData(tablename, dataToInsert, column_mappings) { let result = await db.insertBulk(tablename, dataToInsert, column_mappings); if (result.Result === "SUCCESS") { - syncTask.loggedData.push({ color: "dodgerblue", Message: dataToInsert.length + " Rows Inserted." }); + // syncTask.loggedData.push({ color: "dodgerblue", Message: dataToInsert.length + " Rows Inserted." }); } else { syncTask.loggedData.push({ color: "red", @@ -81,7 +80,7 @@ class sync { async function syncUserData() { sendUpdate(syncTask.wsKey, { type: "Update", message: "Syncing User Data" }); - syncTask.loggedData.push({ color: "lawngreen", Message: "Syncing... 1/7" }); + syncTask.loggedData.push({ color: "lawngreen", Message: "Syncing... 1/4" }); syncTask.loggedData.push({ color: "yellow", Message: "Beginning User Sync" }); const _sync = new sync(); @@ -110,7 +109,7 @@ async function syncUserData() { async function syncLibraryFolders(data, existing_excluded_libraries) { sendUpdate(syncTask.wsKey, { type: "Update", message: "Syncing Library Folders" }); - syncTask.loggedData.push({ color: "lawngreen", Message: "Syncing... 2/7" }); + syncTask.loggedData.push({ color: "lawngreen", Message: "Syncing... 2/4" }); syncTask.loggedData.push({ color: "yellow", Message: "Beginning Library Sync" }); const _sync = new sync(); const existingIds = await db @@ -144,21 +143,16 @@ async function syncLibraryFolders(data, existing_excluded_libraries) { } await _sync.updateSingleFieldOnDB("jf_libraries", toArchiveLibraryIds, "archived", true); - - syncTask.loggedData.push({ color: "yellow", Message: "Library Sync Complete" }); } + syncTask.loggedData.push({ color: "yellow", Message: "Library Sync Complete" }); } async function syncLibraryItems(data) { const _sync = new sync(); const existingLibraryIds = await _sync.getExistingIDsforTable("jf_libraries"); // get existing library Ids from the db - syncTask.loggedData.push({ color: "lawngreen", Message: "Syncing... 3/7" }); - sendUpdate(syncTask.wsKey, { type: "Update", message: "Beginning Library Item Sync (3/7)" }); - syncTask.loggedData.push({ color: "yellow", Message: "Beginning Library Item Sync" }); - data = data.filter((row) => existingLibraryIds.includes(row.ParentId)); - const existingIds = await _sync.getExistingIDsforTable("jf_library_items where archived=false"); + const existingIds = await _sync.getExistingIDsforTable("jf_library_items"); let dataToInsert = await data.map(jf_library_items_mapping); dataToInsert = dataToInsert.filter((item) => item.Id !== undefined); @@ -171,144 +165,130 @@ async function syncLibraryItems(data) { await _sync.insertData("jf_library_items", dataToInsert, jf_library_items_columns); } - syncTask.loggedData.push({ - color: "dodgerblue", - Message: `${ - syncTask.taskName === taskName.partialsync ? dataToInsert.length : Math.max(dataToInsert.length - existingIds.length, 0) - } Rows Inserted. ${syncTask.taskName === taskName.partialsync ? 0 : existingIds.length} Rows Updated.`, - }); + return { + insertedItemsCount: + syncTask.taskName === taskName.partialsync ? dataToInsert.length : Math.max(dataToInsert.length - existingIds.length, 0), + updatedItemsCount: syncTask.taskName === taskName.partialsync ? 0 : existingIds.length, + }; +} - if (syncTask.taskName === taskName.fullsync) { - let toArchiveIds = existingIds.filter((id) => !data.some((row) => row.Id === id)); - - if (toArchiveIds.length > 0) { - await _sync.updateSingleFieldOnDB("jf_library_items", toArchiveIds, "archived", true); - } +async function archiveLibraryItems(fetchedData) { + const _sync = new sync(); + const existingIds = await _sync.getExistingIDsforTable("jf_library_items where archived=false"); + let toArchiveIds = existingIds.filter((id) => !fetchedData.some((row) => row === id)); + if (toArchiveIds.length > 0) { + await _sync.updateSingleFieldOnDB("jf_library_items", toArchiveIds, "archived", true); syncTask.loggedData.push({ color: "orange", Message: toArchiveIds.length + " Library Items Archived." }); } - - syncTask.loggedData.push({ color: "yellow", Message: "Item Sync Complete" }); } -async function syncShowItems(data) { - const _sync = new sync(); - syncTask.loggedData.push({ color: "lawngreen", Message: "Syncing... 4/7" }); - sendUpdate(syncTask.wsKey, { type: "Update", message: "Beginning Show Item Sync (4/7)" }); - syncTask.loggedData.push({ color: "yellow", Message: "Beginning Seasons and Episode sync" }); - - const { rows: shows } = await db.query(`SELECT * FROM public.jf_library_items where "Type"='Series'`); +async function syncSeasons(seasons) { + const shows = seasons.map((season) => season.SeriesId); let insertSeasonsCount = 0; - let insertEpisodeCount = 0; let updateSeasonsCount = 0; - let updateEpisodeCount = 0; - //loop for each show for (const show of shows) { - //get all seasons and episodes for this show from the data - const allSeasons = data.filter((item) => item.Type === "Season" && item.SeriesId === show.Id); - const allEpisodes = data.filter((item) => item.Type === "Episode" && item.SeriesId === show.Id); + const existingIdsSeasons = await db + .query(`SELECT * FROM public.jf_library_seasons where "SeriesId" = '${show}'`) + .then((res) => res.rows.map((row) => row.Id)); - if (allSeasons.length > 0 || allEpisodes.length > 0) { - const existingIdsSeasons = await db - .query(`SELECT * FROM public.jf_library_seasons where "SeriesId" = '${show.Id}'`) - .then((res) => res.rows.map((row) => row.Id)); - let existingIdsEpisodes = []; - if (existingIdsSeasons.length > 0) { - existingIdsEpisodes = await db - .query( - `SELECT * FROM public.jf_library_episodes WHERE "SeasonId" IN (${existingIdsSeasons - .filter((seasons) => seasons !== "") - .map((seasons) => pgp.as.value(seasons)) - .map((value) => "'" + value + "'") - .join(", ")})` - ) - .then((res) => res.rows.map((row) => row.EpisodeId)); - } + let seasonsToInsert = []; + seasonsToInsert = await seasons.filter((season) => season.SeriesId == show).map(jf_library_seasons_mapping); - let seasonsToInsert = []; - let episodesToInsert = []; + if (syncTask.taskName === taskName.partialsync) { + seasonsToInsert = seasonsToInsert.filter((season) => !existingIdsSeasons.some((id) => id === season.Id)); + } - seasonsToInsert = await allSeasons.map(jf_library_seasons_mapping); - episodesToInsert = await allEpisodes.map(jf_library_episodes_mapping); - - //for partial sync, dont overwrite existing data - if (syncTask.taskName === taskName.partialsync) { - seasonsToInsert = seasonsToInsert.filter((season) => !existingIdsSeasons.some((id) => id === season.Id)); - episodesToInsert = episodesToInsert.filter((episode) => !existingIdsEpisodes.some((id) => id === episode.EpisodeId)); - } - - //Bulkinsert new seasons not on db - if (seasonsToInsert.length !== 0) { - let result = await db.insertBulk("jf_library_seasons", seasonsToInsert, jf_library_seasons_columns); - if (result.Result === "SUCCESS") { - insertSeasonsCount += - syncTask.taskName === taskName.partialsync - ? seasonsToInsert.length - : Math.max(seasonsToInsert.length - existingIdsSeasons.length, 0); - updateSeasonsCount += syncTask.taskName === taskName.partialsync ? 0 : existingIdsSeasons.length; - } else { - syncTask.loggedData.push({ - color: "red", - Message: "Error performing bulk insert:" + result.message, - }); - await logging.updateLog(syncTask.uuid, syncTask.loggedData, taskstate.FAILED); - } - } - - //Bulkinsert new episodes not on db - if (episodesToInsert.length !== 0) { - let result = await db.insertBulk("jf_library_episodes", episodesToInsert, jf_library_episodes_columns); - if (result.Result === "SUCCESS") { - insertEpisodeCount += - syncTask.taskName === taskName.partialsync - ? episodesToInsert.length - : Math.max(episodesToInsert.length - existingIdsEpisodes.length, 0); - updateEpisodeCount += syncTask.taskName === taskName.partialsync ? 0 : existingIdsEpisodes.length; - } else { - syncTask.loggedData.push({ - color: "red", - Message: "Error performing bulk insert:" + result.message, - }); - await logging.updateLog(syncTask.uuid, syncTask.loggedData, taskstate.FAILED); - } - } - - if (syncTask.taskName === taskName.fullsync) { - let toArchiveSeasons = existingIdsSeasons.filter((id) => !seasonsToInsert.some((row) => row.Id === id)); - let toArchiveEpisodes = existingIdsEpisodes.filter( - (EpisodeId) => !episodesToInsert.some((row) => row.EpisodeId === EpisodeId) - ); - - if (toArchiveSeasons.length > 0) { - await _sync.updateSingleFieldOnDB("jf_library_seasons", toArchiveSeasons, "archived", true); - syncTask.loggedData.push({ color: "orange", Message: toArchiveSeasons.length + " Seasons Archived." }); - } - if (toArchiveEpisodes.length > 0) { - await _sync.updateSingleFieldOnDB("jf_library_episodes", toArchiveEpisodes, "archived", true, "EpisodeId"); - - syncTask.loggedData.push({ color: "orange", Message: toArchiveEpisodes.length + " Episodes Archived." }); - } + if (seasonsToInsert.length !== 0) { + let result = await db.insertBulk("jf_library_seasons", seasonsToInsert, jf_library_seasons_columns); + if (result.Result === "SUCCESS") { + insertSeasonsCount += + syncTask.taskName === taskName.partialsync + ? seasonsToInsert.length + : Math.max(seasonsToInsert.length - existingIdsSeasons.length, 0); + updateSeasonsCount += syncTask.taskName === taskName.partialsync ? 0 : existingIdsSeasons.length; + } else { + syncTask.loggedData.push({ + color: "red", + Message: "Error performing bulk insert:" + result.message, + }); + await logging.updateLog(syncTask.uuid, syncTask.loggedData, taskstate.FAILED); } } } - syncTask.loggedData.push({ - color: "dodgerblue", - Message: `Seasons: ${insertSeasonsCount} Rows Inserted. ${updateSeasonsCount} Rows Updated.`, - }); - syncTask.loggedData.push({ - color: "dodgerblue", - Message: `Episodes: ${insertEpisodeCount} Rows Inserted. ${updateEpisodeCount} Rows Updated.`, - }); - syncTask.loggedData.push({ color: "yellow", Message: "Sync Complete" }); + return { insertSeasonsCount: insertSeasonsCount, updateSeasonsCount: updateSeasonsCount }; } -async function syncItemInfo(seasons_and_episodes, library_items) { - syncTask.loggedData.push({ color: "lawngreen", Message: "Syncing... 5/7" }); - sendUpdate(syncTask.wsKey, { type: "Update", message: "Beginning Item Info Sync (5/7)" }); - syncTask.loggedData.push({ color: "yellow", Message: "Beginning File Info Sync" }); +async function syncEpisodes(episodes) { + const shows = episodes.map((episode) => episode.SeriesId); + let insertEpisodeCount = 0; + let updateEpisodeCount = 0; + + for (const show of shows) { + const existingIdsEpisodes = await db + .query(`SELECT "EpisodeId" FROM public.jf_library_episodes where "SeriesId" = '${show}'`) + .then((res) => res.rows.map((row) => row.EpisodeId)); + + let episodesToInsert = []; + episodesToInsert = await episodes.filter((episode) => episode.SeriesId == show).map(jf_library_episodes_mapping); + + if (syncTask.taskName === taskName.partialsync) { + episodesToInsert = episodesToInsert.filter( + (episode) => !existingIdsEpisodes.some((EpisodeId) => EpisodeId === episode.EpisodeId) + ); + } + + if (episodesToInsert.length !== 0) { + let result = await db.insertBulk("jf_library_episodes", episodesToInsert, jf_library_episodes_columns); + if (result.Result === "SUCCESS") { + insertEpisodeCount += + syncTask.taskName === taskName.partialsync + ? episodesToInsert.length + : Math.max(episodesToInsert.length - existingIdsEpisodes.length, 0); + updateEpisodeCount += syncTask.taskName === taskName.partialsync ? 0 : existingIdsEpisodes.length; + } else { + syncTask.loggedData.push({ + color: "red", + Message: "Error performing bulk insert:" + result.message, + }); + await logging.updateLog(syncTask.uuid, syncTask.loggedData, taskstate.FAILED); + } + } + } + + return { insertEpisodeCount: insertEpisodeCount, updateEpisodeCount: updateEpisodeCount }; +} + +async function archiveSeasonsAndEpisodes(fetchedSeasons, fetchedEpisodes) { + const _sync = new sync(); + const existingIdsSeasons = await db + .query(`SELECT * FROM public.jf_library_seasons where archived=false`) + .then((res) => res.rows.map((row) => row.Id)); + + const existingIdsEpisodes = await db + .query(`SELECT * FROM public.jf_library_episodes where archived=false`) + .then((res) => res.rows.map((row) => row.EpisodeId)); + + // if (syncTask.taskName === taskName.fullsync) { + let toArchiveSeasons = existingIdsSeasons.filter((id) => !fetchedSeasons.some((row) => row === id)); + let toArchiveEpisodes = existingIdsEpisodes.filter((EpisodeId) => !fetchedEpisodes.some((row) => row === EpisodeId)); + + if (toArchiveSeasons.length > 0) { + await _sync.updateSingleFieldOnDB("jf_library_seasons", toArchiveSeasons, "archived", true); + syncTask.loggedData.push({ color: "orange", Message: toArchiveSeasons.length + " Seasons Archived." }); + } + if (toArchiveEpisodes.length > 0) { + await _sync.updateSingleFieldOnDB("jf_library_episodes", toArchiveEpisodes, "archived", true, "EpisodeId"); + + syncTask.loggedData.push({ color: "orange", Message: toArchiveEpisodes.length + " Episodes Archived." }); + } + // } +} + +async function syncItemInfo(seasons_and_episodes, library_items) { let Items = library_items.filter((item) => item.Type !== "Series" && item.Type !== "Folder" && item.Id !== undefined); let Episodes = seasons_and_episodes.filter((item) => item.Type === "Episode" && item.Id !== undefined); @@ -317,16 +297,9 @@ async function syncItemInfo(seasons_and_episodes, library_items) { let updateItemInfoCount = 0; let updateEpisodeInfoCount = 0; - let current_item = 0; - let all_items = Items.length; let data_to_insert = []; //loop for each Movie for (const Item of Items) { - current_item++; - sendUpdate(syncTask.wsKey, { - type: "Update", - message: `Syncing Item Info ${((current_item / all_items) * 100).toFixed(2)}%`, - }); const existingItemInfo = await db .query(`SELECT * FROM public.jf_item_info where "Id" = '${Item.Id}'`) .then((res) => res.rows.map((row) => row.Id)); @@ -343,16 +316,8 @@ async function syncItemInfo(seasons_and_episodes, library_items) { } } - let current_episode = 0; - let all_episodes = Episodes.length; //loop for each Episode for (const Episode of Episodes) { - current_episode++; - sendUpdate(syncTask.wsKey, { - type: "Update", - message: `Syncing Episode Info ${((current_episode / all_episodes) * 100).toFixed(2)}%`, - }); - const existingEpisodeItemInfo = await db .query(`SELECT * FROM public.jf_item_info where "Id" = '${Episode.Id}'`) .then((res) => res.rows.map((row) => row.Id)); @@ -384,27 +349,18 @@ async function syncItemInfo(seasons_and_episodes, library_items) { } } - syncTask.loggedData.push({ - color: "dodgerblue", - Message: - (insertItemInfoCount > 0 ? insertItemInfoCount : 0) + " Item Info inserted. " + updateItemInfoCount + " Item Info Updated", - }); - syncTask.loggedData.push({ - color: "dodgerblue", - Message: - (insertEpisodeInfoCount > 0 ? insertEpisodeInfoCount : 0) + - " Episodes Info inserted. " + - updateEpisodeInfoCount + - " Episodes Info Updated", - }); - syncTask.loggedData.push({ color: "yellow", Message: "Info Sync Complete" }); - sendUpdate(syncTask.wsKey, { type: "Update", message: "Info Sync Complete" }); + return { + insertItemInfoCount: insertItemInfoCount, + updateItemInfoCount: updateItemInfoCount, + insertEpisodeInfoCount: insertEpisodeInfoCount, + updateEpisodeInfoCount: updateEpisodeInfoCount, + }; } async function removeOrphanedData() { const _sync = new sync(); - syncTask.loggedData.push({ color: "lawngreen", Message: "Syncing... 6/7" }); - sendUpdate(syncTask.wsKey, { type: "Update", message: "Cleaning up FileInfo/Episode/Season Records (6/7)" }); + syncTask.loggedData.push({ color: "lawngreen", Message: "Syncing... 4/4" }); + sendUpdate(syncTask.wsKey, { type: "Update", message: "Cleaning up FileInfo/Episode/Season Records (4/4)" }); syncTask.loggedData.push({ color: "yellow", Message: "Removing Orphaned FileInfo/Episode/Season Records" }); await db.query("CALL jd_remove_orphaned_data()"); @@ -423,14 +379,10 @@ async function removeOrphanedData() { and archived=false`); syncTask.loggedData.push({ color: "dodgerblue", Message: "Orphaned FileInfo/Episode/Season Removed." }); - - syncTask.loggedData.push({ color: "Yellow", Message: "Sync Complete" }); } async function migrateArchivedActivty() { - const _sync = new sync(); - syncTask.loggedData.push({ color: "lawngreen", Message: "Syncing... 7/7" }); - sendUpdate(syncTask.wsKey, { type: "Update", message: "Migrating Archived Activity to New Items (7/7)" }); + sendUpdate(syncTask.wsKey, { type: "Update", message: "Migrating Archived Activity to New Items" }); syncTask.loggedData.push({ color: "yellow", Message: "Migrating Archived Activity to New Items" }); //Movies @@ -486,8 +438,6 @@ async function migrateArchivedActivty() { } syncTask.loggedData.push({ color: "dodgerblue", Message: "Archived Activity Migrated to New Items Succesfully." }); - - syncTask.loggedData.push({ color: "Yellow", Message: "Sync Complete" }); } async function syncPlaybackPluginData() { @@ -497,7 +447,7 @@ async function syncPlaybackPluginData() { const installed_plugins = await API.getInstalledPlugins(); const hasPlaybackReportingPlugin = installed_plugins.filter( - (plugins) => plugins?.ConfigurationFileName === "Jellyfin.Plugin.PlaybackReporting.xml"//TO-DO Change this to the correct plugin name + (plugins) => plugins?.ConfigurationFileName === "Jellyfin.Plugin.PlaybackReporting.xml" //TO-DO Change this to the correct plugin name ); if (!hasPlaybackReportingPlugin || hasPlaybackReportingPlugin.length === 0) { @@ -589,6 +539,9 @@ async function fullSync(triggertype) { return; } + //syncUserData + await syncUserData(); + let libraries = await API.getLibraries(); if (libraries.length === 0) { syncTask.loggedData.push({ Message: "Error: No Libararies found to sync." }); @@ -602,15 +555,40 @@ async function fullSync(triggertype) { let filtered_libraries = libraries.filter((library) => !excluded_libraries.includes(library.Id)); let existing_excluded_libraries = libraries.filter((library) => excluded_libraries.includes(library.Id)); + //syncLibraryFolders + await syncLibraryFolders(filtered_libraries, existing_excluded_libraries); + + syncTask.loggedData.push({ color: "lawngreen", Message: "Syncing... 3/4" }); + syncTask.loggedData.push({ color: "yellow", Message: "Beginning Media Sync" }); + //clear data from memory as its no longer needed libraries = null; - let data = []; + let fetchedItemIds = []; + let fetchedSeasonIds = []; + let fetchedEpisodeIds = []; + + //item sync counters + let insertedItemsCount = 0; + let updatedItemsCount = 0; + let insertedSeasonsCount = 0; + let updatedSeasonsCount = 0; + let insertedEpisodeCount = 0; + let updatedEpisodeCount = 0; + + //item info sync counters + + let insertItemInfoCount = 0; + let insertEpisodeInfoCount = 0; + let updateItemInfoCount = 0; + let updateEpisodeInfoCount = 0; //for each item in library run get item using that id as the ParentId (This gets the children of the parent id) for (let i = 0; i < filtered_libraries.length; i++) { + let startIndex = 0; + let increment = 200; const item = filtered_libraries[i]; - const wsMessage = "Fetching Data for Library : " + item.Name + ` (${i + 1}/${filtered_libraries.length})`; + const wsMessage = "Syncing Library : " + item.Name + ` (${i + 1}/${filtered_libraries.length})`; sendUpdate(syncTask.wsKey, { type: "Update", message: wsMessage, @@ -621,49 +599,128 @@ async function fullSync(triggertype) { ws: sendUpdate, syncTask: syncTask, wsMessage: wsMessage, + params: { + startIndex: startIndex, + increment: increment, + }, }); - if (libraryItems.length === 0) { - syncTask.loggedData.push({ Message: "Error: No Items found for Library : " + item.Name }); + + while (libraryItems.length != 0) { + if (libraryItems.length === 0 && startIndex === 0) { + syncTask.loggedData.push({ Message: "Error: No Items found for Library : " + item.Name }); + break; + } + + const libraryItemsWithParent = libraryItems.map((items) => ({ + ...items, + ...{ ParentId: item.Id }, + })); + + let library_items = libraryItemsWithParent.filter((item) => ["Movie", "Audio", "Series"].includes(item.Type)); + + let seasons = libraryItemsWithParent.filter((item) => ["Season"].includes(item.Type)); + let episodes = libraryItemsWithParent.filter((item) => ["Episode"].includes(item.Type)); + + if (library_items.length > 0) { + //syncLibraryItems + fetchedItemIds.push(...library_items.map((item) => item.Id)); + let counts = await syncLibraryItems(library_items); + insertedItemsCount += Number(counts.insertedItemsCount); + updatedItemsCount += Number(counts.updatedItemsCount); + } + + if (seasons.length > 0) { + //syncSeasons + fetchedSeasonIds.push(...seasons.map((item) => item.Id)); + let count = await syncSeasons(seasons); + insertedSeasonsCount += Number(count.insertSeasonsCount); + updatedSeasonsCount += Number(count.updateSeasonsCount); + } + if (episodes.length > 0) { + //syncEpisodes + fetchedEpisodeIds.push(...episodes.map((item) => item.Id)); + let count = await syncEpisodes(episodes); + insertedEpisodeCount += Number(count.insertEpisodeCount); + updatedEpisodeCount += Number(count.updateEpisodeCount); + } + + //syncItemInfo + let infoCount = await syncItemInfo([...seasons, ...episodes], library_items); + + insertItemInfoCount += Number(infoCount.insertItemInfoCount); + updateItemInfoCount += Number(infoCount.updateItemInfoCount); + insertEpisodeInfoCount += Number(infoCount.insertEpisodeInfoCount); + updateEpisodeInfoCount += Number(infoCount.updateEpisodeInfoCount); + + //clear data from memory as its no longer needed + library_items = null; + seasons = null; + episodes = null; + + startIndex += increment; + + libraryItems = await API.getItemsFromParentId({ + id: item.Id, + ws: sendUpdate, + syncTask: syncTask, + wsMessage: wsMessage, + params: { + startIndex: startIndex, + increment: increment, + }, + }); } - sendUpdate(syncTask.wsKey, { type: "Update", message: "Mapping Data for Library : " + item.Name }); - - const libraryItemsWithParent = libraryItems.map((items) => ({ - ...items, - ...{ ParentId: item.Id }, - })); - data.push(...libraryItemsWithParent); sendUpdate(syncTask.wsKey, { type: "Update", message: "Data Fetched for Library : " + item.Name }); } - let library_items = data.filter((item) => ["Movie", "Audio", "Series"].includes(item.Type)); - let seasons_and_episodes = data.filter((item) => ["Season", "Episode"].includes(item.Type)); - //clear data from memory as its no longer needed - data = null; + syncTask.loggedData.push({ + color: "dodgerblue", + Message: (insertedItemsCount > 0 ? insertedItemsCount : 0) + " Items inserted. " + updatedItemsCount + " Item Info Updated", + }); + syncTask.loggedData.push({ + color: "dodgerblue", + Message: + (insertedSeasonsCount > 0 ? insertedSeasonsCount : 0) + " Seasons inserted. " + updatedSeasonsCount + " Seasons Updated", + }); + syncTask.loggedData.push({ + color: "dodgerblue", + Message: + (insertedEpisodeCount > 0 ? insertedEpisodeCount : 0) + + " Episodes inserted. " + + updatedEpisodeCount + + " Episodes Updated", + }); - //syncUserData - await syncUserData(); + syncTask.loggedData.push({ + color: "dodgerblue", + Message: + (insertItemInfoCount > 0 ? insertItemInfoCount : 0) + + " Item Info inserted. " + + updateItemInfoCount + + " Item Info Updated", + }); + syncTask.loggedData.push({ + color: "dodgerblue", + Message: + (insertEpisodeInfoCount > 0 ? insertEpisodeInfoCount : 0) + + " Episodes Info inserted. " + + updateEpisodeInfoCount + + " Episodes Info Updated", + }); - //syncLibraryFolders - await syncLibraryFolders(filtered_libraries, existing_excluded_libraries); + if (syncTask.taskName === taskName.fullsync) { + //archiveLibraryItems + await archiveLibraryItems(fetchedItemIds); + await archiveSeasonsAndEpisodes(fetchedSeasonIds, fetchedEpisodeIds); + } + + syncTask.loggedData.push({ color: "yellow", Message: "Media Sync Complete" }); //clear data from memory as its no longer needed filtered_libraries = null; existing_excluded_libraries = null; - //syncLibraryItems - await syncLibraryItems(library_items); - - //syncShowItems - await syncShowItems(seasons_and_episodes); - - //syncItemInfo - await syncItemInfo(seasons_and_episodes, library_items); - - //clear data from memory as its no longer needed - library_items = null; - seasons_and_episodes = null; - //removeOrphanedData await removeOrphanedData(); @@ -711,53 +768,168 @@ async function partialSync(triggertype) { const filtered_libraries = libraries.filter((library) => !excluded_libraries.includes(library.Id)); const existing_excluded_libraries = libraries.filter((library) => excluded_libraries.includes(library.Id)); - let data = []; - - //for each item in library run get item using that id as the ParentId (This gets the children of the parent id) - for (let i = 0; i < filtered_libraries.length; i++) { - const library = filtered_libraries[i]; - sendUpdate(syncTask.wsKey, { - type: "Update", - message: "Fetching Data for Library : " + library.Name + ` (${i + 1}/${filtered_libraries.length})`, - }); - let recentlyAddedForLibrary = await API.getRecentlyAdded({ libraryid: library.Id, limit: 10 }); - - sendUpdate(syncTask.wsKey, { type: "Update", message: "Mapping Data for Library : " + library.Name }); - const libraryItemsWithParent = recentlyAddedForLibrary.map((items) => ({ - ...items, - ...{ ParentId: library.Id }, - })); - data.push(...libraryItemsWithParent); - sendUpdate(syncTask.wsKey, { type: "Update", message: "Data Fetched for Library : " + library.Name }); - } - - const library_items = data.filter((item) => ["Movie", "Audio", "Series"].includes(item.Type)); - - for (const item of library_items.filter((item) => item.Type === "Series")) { - let dataForShow = await API.getItemsFromParentId({ id: item.Id }); - const seasons_and_episodes_for_show = dataForShow.filter((item) => ["Season", "Episode"].includes(item.Type)); - data.push(...seasons_and_episodes_for_show); - } - - const seasons_and_episodes = data.filter((item) => ["Season", "Episode"].includes(item.Type)); - - //clear data from memory as its no longer needed - data = null; - // //syncUserData await syncUserData(); // //syncLibraryFolders await syncLibraryFolders(filtered_libraries, existing_excluded_libraries); - //syncLibraryItems - await syncLibraryItems(library_items); + //item sync counters + let insertedItemsCount = 0; + let updatedItemsCount = 0; + let insertedSeasonsCount = 0; + let updatedSeasonsCount = 0; + let insertedEpisodeCount = 0; + let updatedEpisodeCount = 0; - //syncShowItems - await syncShowItems(seasons_and_episodes); + //item info sync counters - //syncItemInfo - await syncItemInfo(seasons_and_episodes, library_items); + let insertItemInfoCount = 0; + let insertEpisodeInfoCount = 0; + let updateItemInfoCount = 0; + let updateEpisodeInfoCount = 0; + + let lastSyncDate = moment().subtract(24, "hours"); + + const last_execution = await db + .query( + `SELECT "DateCreated" + FROM public.jf_library_items + ORDER BY "DateCreated" DESC + LIMIT 1` + ) + .then((res) => res.rows); + if (last_execution.length !== 0) { + lastSyncDate = moment(last_execution[0].DateCreated); + } + + //for each item in library run get item using that id as the ParentId (This gets the children of the parent id) + for (let i = 0; i < filtered_libraries.length; i++) { + let startIndex = 0; + let increment = 200; + + const library = filtered_libraries[i]; + sendUpdate(syncTask.wsKey, { + type: "Update", + message: "Fetching Data for Library : " + library.Name + ` (${i + 1}/${filtered_libraries.length})`, + }); + + const wsMessage = "Syncing Library : " + library.Name + ` (${i + 1}/${filtered_libraries.length})`; + + let libraryItems = await API.getItemsFromParentId({ + id: library.Id, + ws: sendUpdate, + syncTask: syncTask, + wsMessage: wsMessage, + params: { + startIndex: startIndex, + increment: increment, + }, + }); + + libraryItems = libraryItems.filter((item) => moment(item.DateCreated).isAfter(lastSyncDate)); + + while (libraryItems.length != 0) { + if (libraryItems.length === 0 && startIndex === 0) { + syncTask.loggedData.push({ Message: "No New Items found for Library : " + library.Name }); + break; + } + + const libraryItemsWithParent = libraryItems.map((items) => ({ + ...items, + ...{ ParentId: library.Id }, + })); + + let library_items = libraryItemsWithParent.filter((item) => ["Movie", "Audio", "Series"].includes(item.Type)); + + let seasons = libraryItemsWithParent.filter((item) => ["Season"].includes(item.Type)); + let episodes = libraryItemsWithParent.filter((item) => ["Episode"].includes(item.Type)); + + if (library_items.length > 0) { + //syncLibraryItems + + let counts = await syncLibraryItems(library_items); + insertedItemsCount += Number(counts.insertedItemsCount); + updatedItemsCount += Number(counts.updatedItemsCount); + } + + if (seasons.length > 0) { + //syncSeasons + let count = await syncSeasons(seasons); + insertedSeasonsCount += Number(count.insertSeasonsCount); + updatedSeasonsCount += Number(count.updateSeasonsCount); + } + if (episodes.length > 0) { + //syncEpisodes + let count = await syncEpisodes(episodes); + insertedEpisodeCount += Number(count.insertEpisodeCount); + updatedEpisodeCount += Number(count.updateEpisodeCount); + } + + //syncItemInfo + let infoCount = await syncItemInfo([...seasons, ...episodes], library_items); + + insertItemInfoCount += Number(infoCount.insertItemInfoCount); + updateItemInfoCount += Number(infoCount.updateItemInfoCount); + insertEpisodeInfoCount += Number(infoCount.insertEpisodeInfoCount); + updateEpisodeInfoCount += Number(infoCount.updateEpisodeInfoCount); + + //clear data from memory as its no longer needed + library_items = null; + seasons = null; + episodes = null; + + startIndex += increment; + + libraryItems = await API.getItemsFromParentId({ + id: library.Id, + ws: sendUpdate, + syncTask: syncTask, + wsMessage: wsMessage, + params: { + startIndex: startIndex, + increment: increment, + }, + }); + + libraryItems = libraryItems.filter((item) => moment(item.DateCreated).isAfter(lastSyncDate)); + } + } + + syncTask.loggedData.push({ + color: "dodgerblue", + Message: (insertedItemsCount > 0 ? insertedItemsCount : 0) + " Items inserted. " + updatedItemsCount + " Item Info Updated", + }); + syncTask.loggedData.push({ + color: "dodgerblue", + Message: + (insertedSeasonsCount > 0 ? insertedSeasonsCount : 0) + " Seasons inserted. " + updatedSeasonsCount + " Seasons Updated", + }); + syncTask.loggedData.push({ + color: "dodgerblue", + Message: + (insertedEpisodeCount > 0 ? insertedEpisodeCount : 0) + + " Episodes inserted. " + + updatedEpisodeCount + + " Episodes Updated", + }); + + syncTask.loggedData.push({ + color: "dodgerblue", + Message: + (insertItemInfoCount > 0 ? insertItemInfoCount : 0) + + " Item Info inserted. " + + updateItemInfoCount + + " Item Info Updated", + }); + syncTask.loggedData.push({ + color: "dodgerblue", + Message: + (insertEpisodeInfoCount > 0 ? insertEpisodeInfoCount : 0) + + " Episodes Info inserted. " + + updateEpisodeInfoCount + + " Episodes Info Updated", + }); //removeOrphanedData await removeOrphanedData(); diff --git a/backend/tasks/FullSyncTask.js b/backend/tasks/FullSyncTask.js index 06312cf..337bba5 100644 --- a/backend/tasks/FullSyncTask.js +++ b/backend/tasks/FullSyncTask.js @@ -1,123 +1,113 @@ const db = require("../db"); -const moment = require('moment'); +const moment = require("moment"); const sync = require("../routes/sync"); -const taskName=require('../logging/taskName'); +const taskName = require("../logging/taskName"); const taskstate = require("../logging/taskstate"); const triggertype = require("../logging/triggertype"); async function FullSyncTask() { - try{ - + try { await db.query( - `UPDATE jf_logging SET "Result"='${taskstate.FAILED}' WHERE "Name"='${taskName.fullsync}' AND "Result"='${taskstate.RUNNING}'` - ); - } - catch(error) - { - console.log('Error Cleaning up Sync Tasks: '+error); - } + `UPDATE jf_logging SET "Result"='${taskstate.FAILED}' WHERE "Name"='${taskName.fullsync}' AND "Result"='${taskstate.RUNNING}'` + ); + } catch (error) { + console.log("Error Cleaning up Sync Tasks: " + error); + } -let interval=10000; + let interval = 10000; -let taskDelay=1440; //in minutes + let taskDelay = 1440; //in minutes + async function fetchTaskSettings() { + try { + //get interval from db + const settingsjson = await db.query('SELECT settings FROM app_config where "ID"=1').then((res) => res.rows); + if (settingsjson.length > 0) { + const settings = settingsjson[0].settings || {}; -async function fetchTaskSettings() -{ - try{//get interval from db + let synctasksettings = settings.Tasks?.JellyfinSync || {}; - - const settingsjson = await db - .query('SELECT settings FROM app_config where "ID"=1') - .then((res) => res.rows); - - if (settingsjson.length > 0) { - const settings = settingsjson[0].settings || {}; - - let synctasksettings = settings.Tasks?.JellyfinSync || {}; - - if (synctasksettings.Interval) { - taskDelay=synctasksettings.Interval; - } else { - synctasksettings.Interval=taskDelay; - - if(!settings.Tasks) - { - settings.Tasks = {}; + if (synctasksettings.Interval) { + taskDelay = synctasksettings.Interval; + } else { + synctasksettings.Interval = taskDelay; + + if (!settings.Tasks) { + settings.Tasks = {}; + } + if (!settings.Tasks.JellyfinSync) { + settings.Tasks.JellyfinSync = {}; + } + settings.Tasks.JellyfinSync = synctasksettings; + + let query = 'UPDATE app_config SET settings=$1 where "ID"=1'; + + await db.query(query, [settings]); } - if(!settings.Tasks.JellyfinSync) - { - settings.Tasks.JellyfinSync = {}; - } - settings.Tasks.JellyfinSync = synctasksettings; - - - let query = 'UPDATE app_config SET settings=$1 where "ID"=1'; - - await db.query(query, [settings]); } - - + } catch (error) { + console.log("Sync Task Settings Error: " + error); } } - catch(error) - { - console.log('Sync Task Settings Error: '+error); - } -} + async function intervalCallback() { + clearInterval(intervalTask); + try { + let current_time = moment(); + const { rows: config } = await db.query('SELECT * FROM app_config where "ID"=1'); - -async function intervalCallback() { - clearInterval(intervalTask); - try{ - let current_time = moment(); - const { rows: config } = await db.query( - 'SELECT * FROM app_config where "ID"=1' - ); - - if (config.length===0 || config[0].JF_HOST === null || config[0].JF_API_KEY === null) - { + if (config.length === 0 || config[0].JF_HOST === null || config[0].JF_API_KEY === null) { return; - } - - - const last_execution=await db.query( `SELECT "TimeRun","Result" + } + + const last_execution = await db + .query( + `SELECT "TimeRun","Result" FROM public.jf_logging WHERE "Name"='${taskName.fullsync}' ORDER BY "TimeRun" DESC - LIMIT 1`).then((res) => res.rows); - if(last_execution.length!==0) - { + LIMIT 1` + ) + .then((res) => res.rows); + + const last_execution_partialSync = await db + .query( + `SELECT "TimeRun","Result" + FROM public.jf_logging + WHERE "Name"='${taskName.partialsync}' + AND "Result"='${taskstate.RUNNING}' + ORDER BY "TimeRun" DESC + LIMIT 1` + ) + .then((res) => res.rows); + if (last_execution.length !== 0) { await fetchTaskSettings(); - let last_execution_time = moment(last_execution[0].TimeRun).add(taskDelay, 'minutes'); + let last_execution_time = moment(last_execution[0].TimeRun).add(taskDelay, "minutes"); - if(!current_time.isAfter(last_execution_time) || last_execution[0].Result ===taskstate.RUNNING) - { - intervalTask = setInterval(intervalCallback, interval); - return; + if ( + !current_time.isAfter(last_execution_time) || + last_execution[0].Result === taskstate.RUNNING || + last_execution_partialSync.length > 0 + ) { + intervalTask = setInterval(intervalCallback, interval); + return; } + } + + console.log("Running Scheduled Sync"); + await sync.fullSync(triggertype.Automatic); + console.log("Scheduled Sync Complete"); + } catch (error) { + console.log(error); + return []; } - - console.log('Running Scheduled Sync'); - await sync.fullSync(triggertype.Automatic); - console.log('Scheduled Sync Complete'); - - } catch (error) - { - console.log(error); - return []; - } - - intervalTask = setInterval(intervalCallback, interval); + intervalTask = setInterval(intervalCallback, interval); } -let intervalTask = setInterval(intervalCallback, interval); - - + let intervalTask = setInterval(intervalCallback, interval); } module.exports = { diff --git a/backend/tasks/RecentlyAddedItemsSyncTask.js b/backend/tasks/RecentlyAddedItemsSyncTask.js index 6a626f5..4c83eee 100644 --- a/backend/tasks/RecentlyAddedItemsSyncTask.js +++ b/backend/tasks/RecentlyAddedItemsSyncTask.js @@ -14,7 +14,7 @@ async function RecentlyAddedItemsSyncTask() { console.log("Error Cleaning up Sync Tasks: " + error); } - let interval = 10000; + let interval = 11000; let taskDelay = 60; //in minutes @@ -71,11 +71,26 @@ async function RecentlyAddedItemsSyncTask() { LIMIT 1` ) .then((res) => res.rows); + + const last_execution_FullSync = await db + .query( + `SELECT "TimeRun","Result" + FROM public.jf_logging + WHERE "Name"='${taskName.fullsync}' + AND "Result"='${taskstate.RUNNING}' + ORDER BY "TimeRun" DESC + LIMIT 1` + ) + .then((res) => res.rows); if (last_execution.length !== 0) { await fetchTaskSettings(); let last_execution_time = moment(last_execution[0].TimeRun).add(taskDelay, "minutes"); - if (!current_time.isAfter(last_execution_time) || last_execution[0].Result === taskstate.RUNNING) { + if ( + !current_time.isAfter(last_execution_time) || + last_execution[0].Result === taskstate.RUNNING || + last_execution_FullSync.length > 0 + ) { intervalTask = setInterval(intervalCallback, interval); return; }