hs.hsadmin.ng/sql/historization.sql
Michael Hoennig ad04faa21d cleanup-todos ()
Co-authored-by: Michael Hoennig <michael@hoennig.de>
Reviewed-on: 
Reviewed-by: Timotheus Pokorra <timotheus.pokorra@hostsharing.net>
2024-04-02 13:14:46 +02:00

167 lines
6.0 KiB
PL/PgSQL

-- ========================================================
-- Historization
-- --------------------------------------------------------
CREATE TABLE "tx_history" (
"tx_id" BIGINT NOT NULL UNIQUE,
"tx_timestamp" TIMESTAMP NOT NULL,
"user" VARCHAR(64) NOT NULL, -- references postgres user
"task" VARCHAR NOT NULL
);
CREATE TYPE "operation" AS ENUM ('INSERT', 'UPDATE', 'DELETE', 'TRUNCATE');
-- see https://www.postgresql.org/docs/current/plpgsql-trigger.html
CREATE OR REPLACE FUNCTION historicize()
RETURNS trigger
LANGUAGE plpgsql STRICT AS $$
DECLARE
currentUser VARCHAR(63);
currentTask VARCHAR(127);
"row" RECORD;
"alive" BOOLEAN;
"sql" varchar;
BEGIN
-- determine user_id
BEGIN
currentUser := current_setting('hsadminng.currentUser');
EXCEPTION WHEN OTHERS THEN
currentUser := NULL;
END;
IF (currentUser IS NULL OR currentUser = '') THEN
RAISE EXCEPTION 'hsadminng.currentUser must be defined, please use "SET LOCAL ...;"';
END IF;
RAISE NOTICE 'currentUser: %', currentUser;
-- determine task
currentTask = current_setting('hsadminng.currentTask');
assert currentTask IS NOT NULL AND length(currentTask) >= 12,
format('hsadminng.currentTask (%s) must be defined and min 12 characters long, please use "SET LOCAL ...;"', currentTask);
assert length(currentTask) <= 127,
format('hsadminng.currentTask (%s) must not be longer than 127 characters"', currentTask);
IF (TG_OP = 'INSERT') OR (TG_OP = 'UPDATE') THEN
"row" := NEW;
"alive" := TRUE;
ELSE -- DELETE or TRUNCATE
"row" := OLD;
"alive" := FALSE;
END IF;
sql := format('INSERT INTO tx_history VALUES (txid_current(), now(), %1L, %2L) ON CONFLICT DO NOTHING', currentUser, currentTask);
RAISE NOTICE 'sql: %', sql;
EXECUTE sql;
sql := format('INSERT INTO %3$I_versions VALUES (DEFAULT, txid_current(), %1$L, %2$L, $1.*)', TG_OP, alive, TG_TABLE_NAME);
RAISE NOTICE 'sql: %', sql;
EXECUTE sql USING "row";
RETURN "row";
END; $$;
CREATE OR REPLACE PROCEDURE create_historical_view(baseTable varchar)
LANGUAGE plpgsql AS $$
DECLARE
createTriggerSQL varchar;
viewName varchar;
versionsTable varchar;
createViewSQL varchar;
baseCols varchar;
BEGIN
viewName = quote_ident(format('%s_hv', baseTable));
versionsTable = quote_ident(format('%s_versions', baseTable));
baseCols = (SELECT string_agg(quote_ident(column_name), ', ')
FROM information_schema.columns
WHERE table_schema = 'public' AND table_name = baseTable);
createViewSQL = format(
'CREATE OR REPLACE VIEW %1$s AS' ||
'(' ||
' SELECT %2$s' ||
' FROM %3$s' ||
' WHERE alive = TRUE' ||
' AND version_id IN' ||
' (' ||
' SELECT max(vt.version_id) AS history_id' ||
' FROM %3$s AS vt' ||
' JOIN tx_history as txh ON vt.tx_id = txh.tx_id' ||
' WHERE txh.tx_timestamp <= current_setting(''hsadminng.timestamp'')::timestamp' ||
' GROUP BY id' ||
' )' ||
')',
viewName, baseCols, versionsTable
);
RAISE NOTICE 'sql: %', createViewSQL;
EXECUTE createViewSQL;
createTriggerSQL = 'CREATE TRIGGER ' || baseTable || '_historicize' ||
' AFTER INSERT OR DELETE OR UPDATE ON ' || baseTable ||
' FOR EACH ROW EXECUTE PROCEDURE historicize()';
RAISE NOTICE 'sql: %', createTriggerSQL;
EXECUTE createTriggerSQL;
END; $$;
CREATE OR REPLACE PROCEDURE create_historicization(baseTable varchar)
LANGUAGE plpgsql AS $$
DECLARE
createHistTableSql varchar;
createTriggerSQL varchar;
viewName varchar;
versionsTable varchar;
createViewSQL varchar;
baseCols varchar;
BEGIN
-- create the history table
createHistTableSql = '' ||
'CREATE TABLE ' || baseTable || '_versions (' ||
' version_id serial PRIMARY KEY,' ||
' tx_id bigint NOT NULL REFERENCES tx_history(tx_id),' ||
' trigger_op operation NOT NULL,' ||
' alive boolean not null,' ||
' LIKE ' || baseTable ||
' EXCLUDING CONSTRAINTS' ||
' EXCLUDING STATISTICS' ||
')';
RAISE NOTICE 'sql: %', createHistTableSql;
EXECUTE createHistTableSql;
-- create the historical view
viewName = quote_ident(format('%s_hv', baseTable));
versionsTable = quote_ident(format('%s_versions', baseTable));
baseCols = (SELECT string_agg(quote_ident(column_name), ', ')
FROM information_schema.columns
WHERE table_schema = 'public' AND table_name = baseTable);
createViewSQL = format(
'CREATE OR REPLACE VIEW %1$s AS' ||
'(' ||
' SELECT %2$s' ||
' FROM %3$s' ||
' WHERE alive = TRUE' ||
' AND version_id IN' ||
' (' ||
' SELECT max(vt.version_id) AS history_id' ||
' FROM %3$s AS vt' ||
' JOIN tx_history as txh ON vt.tx_id = txh.tx_id' ||
' WHERE txh.tx_timestamp <= current_setting(''hsadminng.timestamp'')::timestamp' ||
' GROUP BY id' ||
' )' ||
')',
viewName, baseCols, versionsTable
);
RAISE NOTICE 'sql: %', createViewSQL;
EXECUTE createViewSQL;
createTriggerSQL = 'CREATE TRIGGER ' || baseTable || '_historicize' ||
' AFTER INSERT OR DELETE OR UPDATE ON ' || baseTable ||
' FOR EACH ROW EXECUTE PROCEDURE historicize()';
RAISE NOTICE 'sql: %', createTriggerSQL;
EXECUTE createTriggerSQL;
END; $$;