One more trigger use case, or how to implement asynchronous data load in PostgreSQL

By
Anastasia Gorbatenko
July 25, 2017

Intro

Through experience, I have learned to avoid using triggers in my development projects where possible, focusing on their disadvantages rather than on the benefits that come from careful use. This is mainly because triggers are hidden from the developer's sight, their behaviour can be hard to predict, and they are difficult to trace, especially if you are trying to implement business logic or work with legacy code.
However, I found triggers to be an indispensable part of a recent project.
As I mentioned in my last blog, "Why PostgreSQL might be a good choice for your project?", a Postgres RDS instance running in AWS was our prime choice for the project: a config-driven data storage.
As Postgres RDS does not include support for the pgAgent extension (a basic scheduling agent), the options for implementing an event-based scheduler were either to develop an external scheduling mechanism or to use triggers to invoke task execution inside the database. The first option appeared to be time-consuming. The second one sounded promising.
Below I will cover the approach to asynchronous loading that I implemented with the help of triggers.

Triggers in Postgres

A trigger is executed automatically when a specified database event occurs. In Postgres, triggers can be created on both tables and views and fired BEFORE, AFTER or INSTEAD OF a DML operation, either once for each row or once for each statement. INSTEAD OF triggers apply only to views, and DDL events are handled by a separate mechanism, event triggers.
However, unlike in other RDBMSs, the Postgres implementation has its own peculiarities.
First, you need to create a trigger function that takes no parameters and has the special return type trigger. Arguments given in the CREATE TRIGGER statement are passed as an array of text values, TG_ARGV, which is accessible inside the function. Simple.
Having done the above, you are ready to bind the function to a table or view in a CREATE TRIGGER statement.
And voila!
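As a minimal sketch of those two steps, the example below creates a trigger function and binds it to a table, passing one argument through TG_ARGV. The table demo_source, the function demo_notify and the trigger demo_source_tgi are hypothetical names used only for illustration; they are not part of the project described here.

```sql
-- Hypothetical demo table, used only for this illustration.
CREATE TABLE IF NOT EXISTS demo_source (
    id      integer PRIMARY KEY,
    payload text
);

-- Step 1: a trigger function takes no declared parameters and returns trigger.
CREATE OR REPLACE FUNCTION demo_notify () RETURNS trigger
LANGUAGE plpgsql
AS $$
BEGIN
    -- TG_ARGV is a zero-based text array holding the CREATE TRIGGER arguments.
    RAISE NOTICE '% on %.%: label=%',
        TG_OP, TG_TABLE_SCHEMA, TG_TABLE_NAME, TG_ARGV[0];
    -- The return value of an AFTER row trigger is ignored.
    RETURN NEW;
END;
$$;

-- Step 2: bind the function to the table and pass an argument.
DROP TRIGGER IF EXISTS demo_source_tgi ON demo_source;
CREATE TRIGGER demo_source_tgi
AFTER INSERT ON demo_source
FOR EACH ROW EXECUTE PROCEDURE demo_notify('source-load');

-- Each inserted row now raises a NOTICE from the trigger function.
INSERT INTO demo_source (id, payload) VALUES (1, 'hello');
```

EXECUTE PROCEDURE is used to match the syntax in the rest of this post; newer Postgres versions also accept EXECUTE FUNCTION.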
To learn more about PostgreSQL triggers, visit Trigger procedures and Overview of trigger behaviour.

Design

Assumptions

An external loader puts data into a source table. The data should then be brought through to the storage, being cleaned and transformed on the way.
A config-driven data engine assumes that load and transformation tasks are split into reusable steps.

Definitions

Below are definitions of the terms I use to explain the process.
The schedule is used to plan the process of data transformation and loading for a particular table, and consists of tasks. Each task reflects an operation that needs to be performed on the data. The operation might be either a stored function or a block of code, and is stored in a config DB. As transformation can be a continuous process, it might consist of several steps: one operation, one step. Tasks for a particular schedule are ordered, grouped in a task hierarchy and executed one by one.
A job is a single act of loading a table; it executes the tasks of a schedule for that one load.
The queue contains the list of tasks to be executed for a particular schedule job. During execution, a task changes its state from QUEUED to PROCESSING and then to DONE or FAILED.
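To make these definitions more concrete, here is one possible way to lay them out as tables. This is only a sketch: the post does not show the real schema, so every table and column name below except queue, job_id, schedule_key and schedule_task_key (which appear in the trigger function later on) is an assumption, as is the job_id_seq sequence.

```sql
-- One schedule per target table (hypothetical layout).
CREATE TABLE IF NOT EXISTS schedule (
    schedule_key integer PRIMARY KEY,
    table_schema character varying NOT NULL,
    table_name   character varying NOT NULL,
    UNIQUE (table_schema, table_name)
);

-- Tasks of a schedule; parent_task_key orders them into a hierarchy.
-- The root task (parent_task_key IS NULL) stands for the source load.
CREATE TABLE IF NOT EXISTS schedule_task (
    schedule_task_key integer PRIMARY KEY,
    schedule_key      integer NOT NULL REFERENCES schedule (schedule_key),
    parent_task_key   integer REFERENCES schedule_task (schedule_task_key),
    operation         text  -- function call or block of code to run
);

-- Source of job identifiers, one per load.
CREATE SEQUENCE IF NOT EXISTS job_id_seq;

-- Tasks waiting for, or going through, execution for a given job.
CREATE TABLE IF NOT EXISTS queue (
    job_id            integer NOT NULL,
    schedule_key      integer NOT NULL REFERENCES schedule (schedule_key),
    schedule_task_key integer NOT NULL REFERENCES schedule_task (schedule_task_key),
    status            text NOT NULL DEFAULT 'QUEUED'
        CHECK (status IN ('QUEUED', 'PROCESSING', 'DONE', 'FAILED')),
    PRIMARY KEY (job_id, schedule_task_key)
);

-- Finished jobs are moved here once every task is DONE.
CREATE TABLE IF NOT EXISTS queue_history (
    LIKE queue,
    archived_at timestamptz NOT NULL DEFAULT now()
);
```

A CHECK constraint is used for the task states rather than an enum type simply to keep the sketch re-runnable; an enum would work equally well.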

How does all this work together?

A procedure to initiate the DB load is called externally after a source table load has finished. It queues valid tasks for execution and updates the status of the first task to DONE. By definition, the very first task on the schedule is always a source load task, and marking it DONE triggers the consecutive execution of the other queued tasks. After all the tasks have completed successfully, the queue for the particular job is cleaned and its tasks are moved to the queue history.
If one of the tasks fails during execution, the job stops. You won't be able to proceed with other loads (for that particular schedule) until you resolve the issues with the failing task.
You can even work with frequently arriving data this way, as all tasks of subsequent load jobs for a particular schedule stay queued until the previous jobs have finished.
So, from a technical point of view, I implemented the described process with the following steps.
Creating a trigger on the queue table and a related trigger function to initiate a job schedule run.

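CREATE FUNCTION init_schedule_run () RETURNS trigger
LANGUAGE plpgsql
AS $$
DECLARE
 result_v INT;
BEGIN
 -- Run the task that follows the queue row that has just been updated.
 SELECT run_next_schedule_task(new.job_id, new.schedule_key, new.schedule_task_key)
 INTO result_v;
 -- An AFTER row trigger ignores the return value, but the function must still return.
 RETURN NULL;
END;
$$;

CREATE TRIGGER queue_tgu
AFTER UPDATE ON queue
FOR EACH ROW EXECUTE PROCEDURE init_schedule_run ();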
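
The trigger above fires on every update of a queue row. If only the transition to DONE should start the next task, one option is to narrow it with a WHEN clause. The variant below is a sketch under the assumption that the queue table has a status column holding the task states described earlier; the column name is not shown in the original code.

```sql
-- Replace the trigger with a variant that fires only when a task becomes DONE.
-- Assumes a hypothetical queue.status column with the values
-- QUEUED, PROCESSING, DONE and FAILED.
DROP TRIGGER IF EXISTS queue_tgu ON queue;

CREATE TRIGGER queue_tgu
AFTER UPDATE OF status ON queue
FOR EACH ROW
WHEN (NEW.status = 'DONE' AND OLD.status IS DISTINCT FROM NEW.status)
EXECUTE PROCEDURE init_schedule_run ();
```

With this condition, updates that move a task to PROCESSING or FAILED no longer call the trigger function at all, so the function does not have to filter them out itself.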

Scripting a load_status_update procedure to queue tasks for a schedule.

CREATE FUNCTION load_status_update (table_schema_in character varying, table_name_in character varying) RETURNS void
LANGUAGE plpgsql
AS $$
BEGIN
 -- <<logic>>
END;
$$;
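
The body of this function is not shown here, so the following is only a guess at its general shape: find the schedule for the loaded table, queue its tasks under a new job and mark the source load task as DONE. The tables schedule and schedule_task, the status column and the job_id_seq sequence are all hypothetical. The sketch queues every task of the schedule rather than deciding which ones are valid, and it does not hold a new job back while an earlier one is still running.

```sql
-- Assumed (hypothetical) objects, not taken from the original code:
--   schedule(schedule_key, table_schema, table_name)
--   schedule_task(schedule_task_key, schedule_key, parent_task_key, operation)
--   queue(job_id, schedule_key, schedule_task_key, status)
--   sequence job_id_seq
CREATE OR REPLACE FUNCTION load_status_update (
    table_schema_in character varying,
    table_name_in   character varying
) RETURNS void
LANGUAGE plpgsql
AS $$
DECLARE
    schedule_key_v integer;
    job_id_v       integer;
BEGIN
    -- Find the schedule that belongs to the table that has just been loaded.
    SELECT s.schedule_key
    INTO schedule_key_v
    FROM schedule s
    WHERE s.table_schema = table_schema_in
      AND s.table_name = table_name_in;

    IF NOT FOUND THEN
        RAISE EXCEPTION 'No schedule found for %.%', table_schema_in, table_name_in;
    END IF;

    -- One job per load.
    job_id_v := nextval('job_id_seq')::integer;

    -- Queue every task of the schedule for this job.
    INSERT INTO queue (job_id, schedule_key, schedule_task_key, status)
    SELECT job_id_v, t.schedule_key, t.schedule_task_key, 'QUEUED'
    FROM schedule_task t
    WHERE t.schedule_key = schedule_key_v;

    -- The root task stands for the source load, which has already finished.
    -- This UPDATE is what fires the trigger on the queue table.
    UPDATE queue q
    SET status = 'DONE'
    FROM schedule_task t
    WHERE q.job_id = job_id_v
      AND t.schedule_task_key = q.schedule_task_key
      AND t.parent_task_key IS NULL;
END;
$$;
```

An external loader would then finish with a call such as SELECT load_status_update('staging', 'customer'); where the schema and table names are again placeholders.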

Crafting one more function to identify and execute the next task from the schedule.

CREATE FUNCTION run_next_schedule_task (job_id integer, schedule_key integer, parent_task_key integer) RETURNS integer
LANGUAGE plpgsql
AS $$
BEGIN
 -- <<logic>> (must finish by returning an integer)
END;
$$;
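
Again, the real logic is not shown, so the block below is a sketch of how such a function might look rather than the code used in the project. It assumes the hypothetical tables listed in its header comment, a linear chain of tasks (each task has at most one child), and return codes of 1, 0 and -1 that are invented for this example. Moving finished jobs to the queue history is left out.

```sql
-- Assumed (hypothetical) tables, not taken from the original code:
--   schedule_task(schedule_task_key, schedule_key, parent_task_key, operation text)
--   queue(job_id, schedule_key, schedule_task_key, status text)
CREATE OR REPLACE FUNCTION run_next_schedule_task (
    job_id integer, schedule_key integer, parent_task_key integer
) RETURNS integer
LANGUAGE plpgsql
AS $$
DECLARE
    task_key_v  integer;
    operation_v text;
    status_v    text;
BEGIN
    -- The parameters share their names with columns, so they are
    -- qualified with the function name to avoid ambiguity.

    -- Do nothing unless the parent task has actually finished.
    PERFORM 1
    FROM queue q
    WHERE q.job_id = run_next_schedule_task.job_id
      AND q.schedule_task_key = run_next_schedule_task.parent_task_key
      AND q.status = 'DONE';
    IF NOT FOUND THEN
        RETURN 0;
    END IF;

    -- Pick the queued child of the finished task.
    SELECT q.schedule_task_key, t.operation
    INTO task_key_v, operation_v
    FROM queue q
    JOIN schedule_task t ON t.schedule_task_key = q.schedule_task_key
    WHERE q.job_id = run_next_schedule_task.job_id
      AND q.schedule_key = run_next_schedule_task.schedule_key
      AND t.parent_task_key = run_next_schedule_task.parent_task_key
      AND q.status = 'QUEUED'
    ORDER BY q.schedule_task_key
    LIMIT 1;
    IF NOT FOUND THEN
        RETURN 0;  -- nothing left to run
    END IF;

    UPDATE queue q
    SET status = 'PROCESSING'
    WHERE q.job_id = run_next_schedule_task.job_id
      AND q.schedule_task_key = task_key_v;

    -- Run the stored operation; a failure is caught and recorded.
    BEGIN
        EXECUTE operation_v;
        status_v := 'DONE';
    EXCEPTION WHEN OTHERS THEN
        RAISE WARNING 'Task % failed: %', task_key_v, SQLERRM;
        status_v := 'FAILED';
    END;

    -- Setting DONE fires the queue trigger again and so starts the next task.
    UPDATE queue q
    SET status = status_v
    WHERE q.job_id = run_next_schedule_task.job_id
      AND q.schedule_task_key = task_key_v;

    RETURN CASE WHEN status_v = 'DONE' THEN 1 ELSE -1 END;
END;
$$;
```

The parent status check at the top matters because the trigger shown earlier fires on every update of the queue, including the one that sets a task to PROCESSING. Without the check, a child task could start before its parent has finished.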

I have found this way of doing things to be stable, and it provides a good degree of parallelism when processing data. This approach saved a lot of time that would otherwise have gone into developing an external scheduling tool. However, there is always room for improvement, and the next step will be working on schedule restarts and developing a proper logging system that uses the dblink extension to imitate sub-transactions.
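As a rough illustration of the dblink idea, the sketch below writes a log row through a second connection, so the row is committed independently of the calling transaction and survives its rollback. The task_log table and the log_task_message function are hypothetical, and the connection string is an assumption: on RDS it would typically also need host, user and password settings.

```sql
CREATE EXTENSION IF NOT EXISTS dblink;

-- Hypothetical log table. It must already be committed before the
-- function below is called, because dblink uses a separate session.
CREATE TABLE IF NOT EXISTS task_log (
    logged_at         timestamptz NOT NULL DEFAULT clock_timestamp(),
    job_id            integer,
    schedule_task_key integer,
    message           text
);

CREATE OR REPLACE FUNCTION log_task_message (
    job_id_in            integer,
    schedule_task_key_in integer,
    message_in           text
) RETURNS void
LANGUAGE plpgsql
AS $$
BEGIN
    -- dblink_exec runs the statement in its own connection and transaction.
    PERFORM dblink_exec(
        'dbname=' || current_database(),  -- assumed connection string
        format(
            'INSERT INTO task_log (job_id, schedule_task_key, message) VALUES (%L, %L, %L)',
            job_id_in, schedule_task_key_in, message_in
        )
    );
END;
$$;
```

Opening a connection for every message has a cost, so a real logging system would probably reuse a named dblink connection.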
Happy scheduling!
Anastasia

Anastasia blogs about Data Vault and Business Intelligence for both technical and non-technical people.

Copyright © 2019 OptimalBI LTD.