const { Pool } = require("pg");
const pgp = require("pg-promise")();
const { update_query: update_query_map } = require("./models/bulk_insert_update_handler");

const _POSTGRES_USER = process.env.POSTGRES_USER;
const _POSTGRES_PASSWORD = process.env.POSTGRES_PASSWORD;
const _POSTGRES_IP = process.env.POSTGRES_IP;
const _POSTGRES_PORT = process.env.POSTGRES_PORT;
const _POSTGRES_DATABASE = process.env.POSTGRES_DB || "jfstat";
const _POSTGRES_SSL_REJECT_UNAUTHORIZED =
  process.env.POSTGRES_SSL_REJECT_UNAUTHORIZED === undefined
    ? true
    : process.env.POSTGRES_SSL_REJECT_UNAUTHORIZED === "true";

if ([_POSTGRES_USER, _POSTGRES_PASSWORD, _POSTGRES_IP, _POSTGRES_PORT].includes(undefined)) {
  console.log("Error: Postgres details not defined");
  return;
}

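/**
 * Connection pool shared by every query in this module.
 * Connection details come from the POSTGRES_* environment variables above,
 * e.g. (illustrative values only):
 *   POSTGRES_USER=postgres
 *   POSTGRES_PASSWORD=secret
 *   POSTGRES_IP=127.0.0.1
 *   POSTGRES_PORT=5432
 *   POSTGRES_DB=jfstat
 * SSL is only enabled when POSTGRES_SSL_ENABLED=true; certificate validation
 * can be relaxed with POSTGRES_SSL_REJECT_UNAUTHORIZED=false.
 */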
const pool = new Pool({
  user: _POSTGRES_USER,
  host: _POSTGRES_IP,
  database: _POSTGRES_DATABASE,
  password: _POSTGRES_PASSWORD,
  port: _POSTGRES_PORT,
  max: 20, // Maximum number of connections in the pool
  idleTimeoutMillis: 30000, // Close idle clients after 30 seconds
  connectionTimeoutMillis: 2000, // Return an error after 2 seconds if a connection could not be established
  ...(process.env.POSTGRES_SSL_ENABLED === "true"
    ? { ssl: { rejectUnauthorized: _POSTGRES_SSL_REJECT_UNAUTHORIZED } }
    : {}),
});

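// node-postgres emits "error" on idle clients when the backend connection is
// lost; without a handler that event would crash the process.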
pool.on("error", (err, client) => {
|
|
console.error("Unexpected error on idle client", err);
|
|
return;
|
|
//process.exit(-1);
|
|
});
|
|
|
|
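/**
 * Deletes every row of `table_name` whose `pkName` column (default "Id") matches
 * a value in `data`, inside a single transaction. Deletes on jf_playback_activity
 * also trigger a refresh of the materialized views.
 * Example (illustrative values): await deleteBulk("jf_playback_activity", ["abc", "def"]);
 * @returns {Promise<{Result: string, message: string}>}
 */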
async function deleteBulk(table_name, data, pkName) {
  const client = await pool.connect();
  let result = "SUCCESS";
  let message = "";
  try {
    await client.query("BEGIN");

    if (data && data.length !== 0) {
      const deleteQuery = {
        text: `DELETE FROM ${table_name} WHERE "${pkName ?? "Id"}" IN (${pgp.as.csv(data)})`,
      };
      // console.log(deleteQuery);
      await client.query(deleteQuery);
    }

    await client.query("COMMIT");
    message = (data?.length ?? 0) + " Rows removed.";

    if (table_name === "jf_playback_activity") {
      for (const view of materializedViews) {
        refreshMaterializedView(view);
      }
    }
  } catch (error) {
    await client.query("ROLLBACK");
    message = "Bulk delete error: " + error;
    result = "ERROR";
  } finally {
    client.release();
  }
  return { Result: result, message: "" + message };
}

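/**
 * Stringifies numbers before they are handed to pgp.as.csv (presumably so numeric
 * keys are quoted like text and compare cleanly against text-typed key columns);
 * other values pass through unchanged.
 */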
function formatForCsv(value) {
  if (typeof value === "number") {
    return value.toString();
  }
  return value;
}

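/**
 * Sets `field_name` to `new_value` on every row of `table_name` whose `where_field`
 * column (default "Id") matches a value in `data`, inside a single transaction.
 * The table, column names and `new_value` are interpolated directly into the SQL
 * text, so only trusted, internally generated values should be passed here.
 * Example (illustrative values): await updateSingleFieldBulk("jf_library_items", ["item1", "item2"], "archived", true, "Id");
 */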
async function updateSingleFieldBulk(table_name, data, field_name, new_value, where_field) {
  const client = await pool.connect();
  let result = "SUCCESS";
  let message = "";
  if (where_field === undefined || where_field === null || where_field === "") {
    where_field = "Id";
  }
  try {
    await client.query("BEGIN");

    if (data && data.length !== 0) {
      data = data.map((item) => formatForCsv(item));
      const updateQuery = {
        text: `UPDATE ${table_name} SET "${field_name}"='${new_value}' WHERE "${where_field}" IN (${pgp.as.csv(data)})`,
      };
      message = updateQuery.text;
      console.log(updateQuery.text);
      await client.query(updateQuery);
    }

    await client.query("COMMIT");
    message = (data?.length ?? 0) + " Rows updated.";
  } catch (error) {
    error.query = message;
    console.log(error);
    await client.query("ROLLBACK");
    message = "Bulk update error: " + error;
    result = "ERROR";
  } finally {
    client.release();
  }
  return { Result: result, message: "" + message };
}

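// Materialized views that are refreshed after playback-activity changes
// (see deleteBulk, insertBulk and query).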
const materializedViews = ["js_latest_playback_activity", "js_library_stats_overview"];

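/**
 * Runs REFRESH MATERIALIZED VIEW for `view_name` inside its own transaction and
 * reports the outcome as { Result, message } instead of throwing.
 */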
async function refreshMaterializedView(view_name) {
  const client = await pool.connect();
  let result = "SUCCESS";
  let message = "";
  try {
    await client.query("BEGIN");

    const refreshQuery = {
      text: `REFRESH MATERIALIZED VIEW ${view_name}`,
    };
    await client.query(refreshQuery);

    await client.query("COMMIT");
    message = view_name + " refreshed.";
  } catch (error) {
    await client.query("ROLLBACK");
    message = "Refresh materialized view error: " + error;
    result = "ERROR";
  } finally {
    client.release();
  }
  return { Result: result, message: "" + message };
}

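/**
 * Bulk insert: de-duplicates `data` by Id (falling back to rowid), builds a
 * multi-row INSERT for `columns` with pg-promise helpers and appends the
 * table-specific update_query suffix from ./models/bulk_insert_update_handler.
 * Inserts into jf_playback_activity also trigger a refresh of the materialized views.
 * Example (illustrative values): await insertBulk("jf_libraries", rows, ["Id", "Name"]);
 */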
async function insertBulk(table_name, data, columns) {
  // De-dupe incoming rows by Id, falling back to rowid when Id is absent
  if (Array.isArray(data)) {
    data = data.reduce((accumulator, currentItem) => {
      const isNotDuplicate = !accumulator.some((item) =>
        currentItem.Id ? item.Id === currentItem.Id : item.rowid === currentItem.rowid
      );

      if (isNotDuplicate) {
        accumulator.push(currentItem);
      }

      return accumulator;
    }, []);
  }

  const client = await pool.connect();
  let result = "SUCCESS";
  let message = "";
  try {
    await client.query("BEGIN");
    const update_query = update_query_map.find((query) => query.table === table_name).query;
    const cs = new pgp.helpers.ColumnSet(columns, { table: table_name });
    const query = pgp.helpers.insert(data, cs) + update_query; // multi-row INSERT plus the table-specific update clause
    await client.query(query);
    await client.query("COMMIT");

    if (table_name === "jf_playback_activity") {
      for (const view of materializedViews) {
        refreshMaterializedView(view);
      }
    }
  } catch (error) {
    await client.query("ROLLBACK");
    message = "" + error;
    result = "ERROR";
  } finally {
    client.release();
  }
  return {
    Result: result,
    message: message ? "Bulk insert error: " + message : "",
  };
}

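// True when `value` is a number or a numeric string (both isNaN checks pass).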
function isNumeric(value) {
  return !isNaN(value) && !isNaN(parseFloat(value));
}

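/**
 * Thin wrapper around pool.query. Optionally refreshes the materialized views,
 * then casts numeric-string values in every returned row to integers, except for
 * the name/id columns listed in skippedColumns. Connection and auth failures are
 * logged with a friendly message and an empty result ([]) is returned instead of
 * the error being rethrown.
 */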
async function query(text, params, refreshViews = false) {
  try {
    const result = await pool.query(text, params);

    if (refreshViews) {
      for (const view of materializedViews) {
        refreshMaterializedView(view);
      }
    }

    const skippedColumns = [
      "Name",
      "NowPlayingItemName",
      "SeriesName",
      "SeasonName",
      "Id",
      "NowPlayingItemId",
      "ParentId",
      "SeriesId",
      "SeasonId",
      "EpisodeId",
      "ServerId",
    ];
    // Convert integer fields in the result rows
    const convertedRows = result.rows.map((row) => {
      return Object.keys(row).reduce((acc, key) => {
        const value = row[key];
        if (skippedColumns.includes(key)) {
          acc[key] = value; // Keep the original value for name/id columns
          return acc;
        }

        // Convert numeric strings to integers if applicable
        acc[key] = isNumeric(value) ? parseInt(value, 10) : value;
        return acc;
      }, {});
    });

    result.rows = convertedRows;
    return result;
  } catch (error) {
    if (error?.routine === "auth_failed") {
      console.log("Error 401: Unable to Authenticate with Postgres DB");
    } else if (error?.code === "ENOTFOUND") {
      console.log("Error: Unable to Connect to Postgres DB");
    } else if (error?.code === "ERR_SOCKET_BAD_PORT") {
      console.log("Error: Invalid Postgres DB Port Range. Port should be >= 0 and < 65536.");
    } else if (error?.code === "ECONNREFUSED") {
      console.log("Error: Postgres DB Connection refused at " + error.address + ":" + error.port);
    } else if (error.message && !error.message.includes('database "' + _POSTGRES_DATABASE + '" does not exist')) {
      console.error("[JELLYSTAT]: Error occurred while executing query:", error.message);
    }
    return [];
    // throw error;
  }
}

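/**
 * Convenience wrapper around query(); resolves with the first returned row, or
 * null when the query produced no rows (or failed).
 */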
async function querySingle(sql, params) {
  // query() resolves to an empty result on failure, so default to no rows
  const { rows: results = [] } = await query(sql, params);
  if (results.length > 0) {
    return results[0];
  }
  return null;
}

module.exports = {
  pool: pool,
  query: query,
  deleteBulk: deleteBulk,
  insertBulk: insertBulk,
  updateSingleFieldBulk: updateSingleFieldBulk,
  querySingle: querySingle,
  refreshMaterializedView: refreshMaterializedView,
  materializedViews: materializedViews,

  // initDB: initDB,
};