An Application For Monitoring Database Table Updates Written with Postgres and Python


Scott Langley

SCHARP / Fred Hutchinson Cancer Research Center



Why this application?


  At SCHARP (Statistical Center for HIV/AIDS Research & Prevention) our Finance Department wanted the ability to be notified when changes are made related to the employee, project, and grant tables so that they can know what new information they should take action on and when.

These tables are modified by groups outside of the Finance Department that don't always communicate what they've changed in an accurate and timely manner.  Some of the data tables are imported nightly from a Microsoft SQL Server that we at SCHARP have little or no control over.


Project Proposal - Send notifications to RSS feeds when certain database changes are detected

Scott Langley of Xapps is working on this project for Ben Robinson and Doug Lowe of Finance.

The more general and powerful solution using an ActiveDatabase or emulating the functionality of the TriggerMan program has been shelved as being too time consuming.

What is now being implemented is a smaller-scale solution customized to the particular needs of the Finance department. It is being implemented in two stages:

  1. Monitoring any changes made to selected database tables.

  2. Monitoring higher-level changes that answer specific questions that involve relationships between two or more tables





Components of the System

  1. Our PostgreSQL 9.0 Database
  2. A set of application database tables for holding information about what tables are being monitored and what changes have been detected.
  3. Server-side functions written in PL/Perl and PL/pgSQL.
  4. Python scripts for calling the server-side functions, updating the application tables, and notifying users of data updates.
  5. RSS (Really Simple Syndication)
    How our Finance Department wished to be notified about data updates
  6. Generated HTML pages
    To display the changes in a handy and readable format.
  7. Email-to-RSS Perl script.
    An existing system we had for updating RSS feeds with the contents of specially formatted emails.
  8. Cron scripts to:    
    Make nightly snapshots of the data tables to be monitored.  
    Execute the main Python script of the application.




Postgres





Implementation Choices for PostgreSQL Event Generation






Materialized Views




Application Tables



DbVisualizer Diagram

Diagram with RSS tables, too




Server-Side Functions

New type:    polling_record_differences_result

setup_change_polling_for_table_pg()
change_polling_record_differences_perl()





Python Scripts



Why Python?





Components used:


    Python 2.6.3
        http://www.python.org

    Python plugin for Eclipse - PyDev
        Supports Python editing and debugging in the Eclipse IDE
        http://pydev.org/

    pg_foundation
        A python add-on package utilized for:
            Standard PostgreSQL optparse options
            .pgpass parser
        http://python.projects.postgresql.org/old/project/pg_foundation.html

    Python DB API 2.0.
        The standard Python Database Application Programming Inferface
        http://wiki.python.org/moin/UsingDbApiWithPostgres
        http://www.python.org/dev/peps/pep-0249/

    psycopg2
        A Python Database Driver - implementing the Python DB API 2.0
        Mostly written in C and wraps the Postgres libpq library.
        http://initd.org/psycopg/
        http://initd.org/psycopg/features/



Configure the Database Connection to Postgres


1. Get any environment  variables

    Such as the current user.

    import os
            environ = os.environ


2. Modify some environment variables

            environ['PGHOST'] = config.databaseHostName            
            environ['PGDATABASE'] = config.databaseName
            environ['PGPORT'] = config.databasePort

3. Look up the password from a .pgpass file.

import postgresql.utility.client.option as pg_opt
import postgresql.utility.client.config as cl_config
# From the  pg_foundation module

defaultParser = pg_opt.DefaultParser()

commandlineOptions, extraCommandlineArguments = defaultParser.parse_args()

connectionConfigDict = client_config.create(commandlineOptions, environ)

myPassword = connectionConfigDict['password']

4. Open up a database connection

import psycopg2
import psycopg2.extras

self.db = psycopg2.connect("dbname=" + databaseName + " user=" + myUser + " host=" + databaseHostName + " password=" + myPassword)



Run a SELECT query and return one row


        dbQuery = "SELECT * FROM nextval('tools.change_polling_processing_log_run_instance_id_seq')"
        try:
            cursor = self.db.cursor()
            cursor.execute(dbQuery)
            results = cursor.fetchone()[0]
         
            self.db.commit()
            logger.debug("RunInstance="+repr(results))
            cursor.close()
        except psycopg2.DatabaseError:
            self.db.rollback()  
            raise     
        return results

Insert one row


def writeError(self, runInstance, monitoredTable, errMsg):
        dbQuery = "INSERT INTO tools.change_polling_error_log(\
            monitored_table_id, mat_views_id, run_instance_id, \
            event_time, error_message) VALUES ("\
            + str(monitoredTable.monitored_table_id) + ","\
            + str(monitoredTable.mat_views_id) + ","\
            + str(runInstance) + ","\
            + "clock_timestamp()" + ","\
            + escapeSql(str(errMsg)) + ")"

Update one row


def updateLastRunForNagios(self):
cursor.execute("UPDATE ist.replication_status_cron SET replication_time = now() \
  WHERE server_name = 'db.scharp.org' AND database_name = 'main' \
  AND schema_name = 'finance_database_change_monitoring'")  



Get Multiple Rows with a DictCursor


What is a Python Dictionary?

Like an associative array or map.

That is, it's an unordered set of key: value pairs
 
actors = {'John': 'Wayne', 'Humphrey': 'Bogart'}
print actors['John']
>>> Wayne

Use a psyco2g dictionary-like-cursor


    def getResults(self, dbQuery):
        try:
            cursor = self.db.cursor(cursor_factory=psycopg2.extras.DictCursor)
            cursor.execute(dbQuery)
            results = cursor.fetchall()
            self.db.commit()
            cursor.close()
        except psycopg2.DatabaseError:
            self.db.rollback()  
            raise  
        return results

Where fetchall() does:

    Fetch all (remaining) rows of a query result, returning them as a list of tuples

Use the results

for row in results:
     event_text = row['event_text']

Another example:

third_column_of_third_row = results[2][2]



RSS & HTML

Generated via Python scripts. 

Some samples:




Cron Jobs

Executed daily before 6 AM.

1. Call a script that executes this server-side Postgres function:
   
cs_refresh_mviews()  - which materializes a fresh view for each record in the cs_materialized_views table while keeping around a copy of the view materialized the day before.

2. Execute this script:

rss/pydbmonrss/pollingDataChangeRSSEmailer.py - Runs all the python operations to compare today's and yesterday's materialized view, record changes, and notify the appropriate users of the changes.




Actual Project Status Document

Future Enhancements


  1. Implement Phase 2 of the project, Database-to-RSS Notifications in response to Custom SQL queries / Complex-logic Triggers.

  2. Use a trigger mechanism instead of a poll-and-compare mechanism to detect data changes.

  3. Provide a user-interface for users to control more precisely what they want to monitor and when.

  4. Allow notifications to be delivered via other mechanisms besides RSS & HTML. (And get rid of the email-to-rss.pl script which has proven to be the thing that breaks the most often.)

  5. Display other data besides or in addition to the actual table row changes.

  6. Adapt to changing table definitions
 




Other Database Change Notification Software and Techniques


The Database Alert Problem

Database Change Notification Tools

TriggerMan and Vigilert (Database Alerting Software)




Other Features of Psycopg2


COPY Command Support
    copy_from() and copy_to() wrap the Postgres COPY command for moving database records to and from files.
    http://initd.org/psycopg/docs/usage.html#using-copy-to-and-copy-from

Server-side cursors
    For fetching query results in batches in order not to bring all results into memory at once
    http://initd.org/psycopg/docs/usage.html#server-side-cursors

Two-Phase Commit protocol support
    Useful for distributed transactions
    http://initd.org/psycopg/docs/usage.html#two-phase-commit-protocol-support

Asynchronous notifications
    Using the Postgres LISTEN and NOTIFY commands to receive notifications of activities in other database sessions.
    http://initd.org/psycopg/docs/advanced.html#asynchronous-notifications

Support for Postgres Large Objects
    Methods for reading and writing large objects to/from Postgres as strings or files.
    http://initd.org/psycopg/docs/usage.html#access-to-postgresql-large-objects



Other Python database packages

http://wiki.python.org/moin/UsingDbApiWithPostgres#python-interfaces-to-postgresql

pyPgSQL
http://pypgsql.sourceforge.net/
    Kind of stale.  Last release was in 2006

PyGreSQL
http://www.pygresql.org/
Another popular choice

py-postgresql
http://python.projects.postgresql.org/
Designed for Python 3 - the next generation of Python that's still rather new.
    Feature list:
    prepared statement driven interfaces;
    cluster tools for creating and controlling a cluster;
    support for most PostgreSQL types
    Reference stored procedures as Python functions
COPY Support    
    Use the convenient COPY interface to directly copy data from one connection to another.



ORM's and non-Postgres-specific Database Packages

http://wiki.python.org/moin/HigherLevelDatabaseProgramming

SQLAlchemy
http://www.sqlalchemy.org/
  Provides a SQL Expression language for Python
  And an Object-Relational Mapper (ORM)

Storm
https://storm.canonical.com/
Python ORM by Canonical - the makers of Ubuntu Linux

Python Database Objects(PDO)
http://pdo.neurokode.com/