Python/psycopg2/PostgreSQL: script for bulk inserts using COPY with progress indicator

Whoa, what a title. :) I needed a script for bulk-inserting data into a PostgreSQL database. Actually, I already had one, written in Perl, but it was so slow that I needed a faster replacement. As I am gradually replacing all my Bash/Perl scripts with Python equivalents, I aimed to do the same here.

I decided to use psycopg2 as the Python-PostgreSQL binding. Its copy_from method proved to be very fast; exactly what I needed. BUT I also needed a progress indicator. And while I found a few people out there looking for exactly the same thing, I couldn’t find a solution. So here’s my script for doing this:

#!/usr/bin/python
import psycopg2
import sys
import os

class ReadFileProgress:

  def __init__(self, filename):
    self.datafile = open(filename)
    self.totalRecords = 0
    self.totalBytes = os.stat(filename).st_size
    self.readBytes = 0

    # skip header line
    self.datafile.readline()
    # count records (also works for a file that contains only the header)
    self.totalRecords = sum(1 for line in self.datafile)
    sys.stderr.write("Number of records: %d\n" % (self.totalRecords))
    # rewind
    self.datafile.seek(0)
    # skip header line
    self.datafile.readline()
    # start progress
    self.perc5 = self.totalBytes / 20.0
    self.perc5count = 0
    self.lastPerc5 = 0
    sys.stderr.write("Writing records: 0%")

  # count bytes and display progress while doing so
  def countBytes(self, size=0):
    self.readBytes += size
    if (self.readBytes - self.lastPerc5 >= self.perc5):
      self.lastPerc5 = self.readBytes

      if (int(self.readBytes / self.perc5) == 5):
        sys.stderr.write("25%")
      elif (int(self.readBytes / self.perc5) == 10):
        sys.stderr.write("50%")
      elif (int(self.readBytes / self.perc5) == 15):
        sys.stderr.write("75%")
      else:
        sys.stderr.write(".")

      sys.stderr.flush()

  def readline(self, size=-1):
    # count what was actually read, not what was requested
    data = self.datafile.readline(size)
    self.countBytes(len(data))
    return data

  def read(self, size=-1):
    data = self.datafile.read(size)
    self.countBytes(len(data))
    return data

  def close(self):
    sys.stderr.write("100%\n")
    self.datafile.close()

def main():
  config = dict()
  config['tablename']="tablename"
  config['filename']="filename"
  config['connstring'] = "host='?' dbname='?' user='?' password='?'"
  config['droptable'] = False
  config['createtable'] = False
  config['rowsdef'] = "id serial PRIMARY KEY, number integer NOT NULL"
  config['filecolumns'] = ['id','number']

  try:
    # get a connection, if a connect cannot be made an exception will be raised here
    conn = psycopg2.connect(config['connstring'])
    # conn.cursor will return a cursor object, you can use this cursor to perform queries
    cursor = conn.cursor()

    # check whether the table exists; remember the result, because
    # cursor.rowcount is overwritten by every later execute()
    cursor.execute("SELECT * FROM information_schema.tables WHERE table_name=%s", (config['tablename'],))
    tableExists = bool(cursor.rowcount)

    # drop table if requested (and it exists)
    if (config['droptable'] and tableExists):
      cursor.execute("DROP TABLE "+config['tablename']+";")
      tableExists = False

    # create the table if requested (and it does not exist)
    if (config['createtable'] and not tableExists):
      cursor.execute("CREATE TABLE "+config['tablename']+" ("+config['rowsdef']+");")

    # create a fileprogress object and copy the data to the database
    datafile=ReadFileProgress(config['filename'])
    # '\\N' is a literal backslash followed by N, PostgreSQL's default NULL marker
    cursor.copy_from(file=datafile, table=config['tablename'], sep='\t', null='\\N', size=8192, columns=config['filecolumns'])
    datafile.close()

    # commit and close
    conn.commit()
    cursor.close()
    conn.close()

    sys.stdout.write("Transaction finished successfully.\n")

  except Exception as e:
    # the failure may come from the connection, the SQL or the input file
    sys.exit("Bulk insert failed!\n ->%s" % (e))


if __name__ == "__main__":
  sys.exit(main())

Basically, the progress counter is a wrapper around the file object: copy_from reads the data through it, and the wrapper simply outputs the percentage of bytes read so far. The main program is very straightforward. The original script parsed an experiment-specific config file, which I do not include here. Thus, you have to find your own way to set the config variables at the beginning of the main program.
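
One possible way to fill the config variables, sketched here as an assumption rather than how the original script did it, is to read them from the command line with argparse. The function name parse_config and all flag names (--table, --file, --connstring, ...) are made up for illustration; only the dictionary keys are taken from the script above.

```python
import argparse


def parse_config(argv=None):
    """Build the config dict expected by main() from command-line flags.

    The flag names are hypothetical; the dict keys match the script above.
    """
    parser = argparse.ArgumentParser(description="Bulk-load a tab-separated file with COPY")
    parser.add_argument("--table", required=True, help="target table name")
    parser.add_argument("--file", required=True, help="tab-separated input file")
    parser.add_argument("--connstring", required=True, help="connection string for psycopg2.connect")
    parser.add_argument("--drop", action="store_true", help="drop the table first if it exists")
    parser.add_argument("--create", action="store_true", help="create the table if it is missing")
    parser.add_argument("--rowsdef", default="", help="column definitions for CREATE TABLE")
    parser.add_argument("--columns", default="", help="comma-separated column names in the file")
    args = parser.parse_args(argv)

    return {
        "tablename": args.table,
        "filename": args.file,
        "connstring": args.connstring,
        "droptable": args.drop,
        "createtable": args.create,
        "rowsdef": args.rowsdef,
        # split "id,number" into ['id', 'number'], ignoring empty entries
        "filecolumns": [c.strip() for c in args.columns.split(",") if c.strip()],
    }


# Example call with an explicit argument list (values are placeholders)
config = parse_config([
    "--table", "measurements",
    "--file", "data.tsv",
    "--connstring", "dbname=test",
    "--columns", "id,number",
])
```

In main(), the hard-coded assignments could then be replaced by config = parse_config(), which reads sys.argv. Keep in mind that a password passed inside --connstring is visible in the process list, so an environment variable or a ~/.pgpass file may be the safer place for it.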
