User Tools

Site Tools


python_cookbook

This is an old revision of the document!


Table of Contents

Basic Language Features

"Everything is an Object"

And by everything they mean _everything_. For example it is legal to do the following

#define a function (which actually creates a function object)
def function():
    function.count += 1 # count is an attribute of the function object (defined below)
    print "Function called %d times" % function.count
 
function.count=0 #add an attribute to the function ob
function()  # --> Function called 1 times
function()  # --> Function called 2 times
function()  # --> Function called 3 times

Yes that is legal python code. Swallow it.

Global Variables

If a variable name is read in python all namespaces are searched in order, until a variable with that name is found. However if a variable is changed and it does not exist in the local namespace it is created thus shadowing a global variable with the same name.

Note that:

  • this only applies to assignment
  • Modifiying a mutable global variable (eg. adding elements to a list) is possible without declaring the variable as global

To reassign a global variable in a function it needs to be declared as global before it is used:

myGlobalVar = 23
def test():
   global myGlobalVar
   myGlobalVar=42

Some Global Weirdness

Its also interesting to note that (because of the above)

x = 3
def test():
  print x
 
#-->Works
 
 
x=3
def test():
  print x
  x = 7
 
#-->Gives an **error** (variable used before defined).

Therefore if you want to use a global variable its best to simply put the “global myVar” line at the start of the function.

Modules

Modules can be nested in a package structure similar to java. Eg:

  - src
  |    main.py
  |----test
  |        |__init__.py
  |--------|muh
               |---- __init__.py
               |MyMuh.py

The init.py files need to exist in every directory, altough they are allowed to be empty.

In order to import such a nested module the root of the directory structure (in this example the directory src) must be included in PYTHONPATH:

in main.py you have to write

import test.muh.MyMuh as muhModule 
muhModule.doMuh()

Namespaces

(from diveintopython.org]:)

Namespace order # local namespace - specific to the current function or class method # global namespace - specific to the current module # built-in namespace - global to all modules

Accessing locals with the function locals() returns a copy of that namespace. Accessing globals with globals() returns the actual namespace

Sequences

Sorting a Sequence

l=[3,2,15,3,2,1,4]
lSorted = sorted(l)

Default sorting order is ascending

Descending Sort

l = [3,2,15,3,2,1,4]
lReverseSorted = sorted(l, reverse=True)

Sorting a list by a certain element of a list item

Having a sequence consisting of tuples(or sequences) like

mylist=((1,'one'),(0,'zero'),(4,'four'))

you can easily sort them by one element by using the itemgetter function:

from operator import itemgetter
mysorted = sorted(mylist,key=itemgetter(0))

(for the first part of the tuples also.)

If the list item is a class, then a lambda function has to be used:

mySorted = sorted(mylist, key = lambda element: element.myKeyAttribute)

Sorting a list of lists by length

tmp = [[[1,2,3],[3],[6,7]]]
sorted(tmp, lambda x,y: len(x)-len(y))

Sorting values in a Dict

import operator
items = sorted(my_dict.items(), key=operator.itemgetter(1)) # sort by value, itemgetter(0) to sort by key

List intersection/union/difference

Union A∪B

union=A+filter(lambda x:x not in A,B)

Intersection A∩B

intersection=filter(lambda x:x in A,B)

Difference A\B

difference=filter(lambda x:x not in B,A)

Symmetrical difference AΔB

symdifference=filter(lambda x:x not in B,A)+filter(lambda x:x not in A,B)

List comprehension

An easy way to define lists

noprimes = [j for i in range(2, 8) for j in range(i*2, 50, i)]
primes = [x for x in range(2, 50) if x not in noprimes]

Subtracting two lists

To element-wise subtract two lists from each other

# define lists x=[5,6,7]
y=[3,4,5]
# subtract them element-wise
import operator
map(operator.sub, x, y)
>>> [2,2,2]

Flattening lists

l = [[[1,2,3],[4,5,6], [7], [8,9]]]
[item for sublist in l for item in sublist]

Iterating

The docs of itertools have lots of useful recipes.

Favourites:

from itertools import tee, izip
 
def pairwise(iterable):
    "s -> (s0,s1), (s1,s2), (s2, s3), ..."
    a, b = tee(iterable)
    next(b, None)
    return izip(a, b)
 
l = range(4)
for i in pairwise(l):
    print i

Dict comprehension

dict([(i, chr(65+i)) for i in range(4)])

Merging Sequences

def merge(*input):
    return reduce(list.add, input, list())

Using:

a=[1,3,4]
b=[5,6]
c=[[a","b","c]]
merge(a,b,c) # [1, 3, 4, 5, 6, 'a', 'b', 'c']

Tuples

"Modifying" a Tuple

Tuples are immutable and can not be modified. In order to change one element in a tuple, a new tuple has to be constructed. For example to add 36 to the 4th element of the tuple:

t=(1,2,3,4,5,6,7,8,9) 
t1 = (t[:3] + (t[3]+36,) + t[4:])

Or simply create a list from the tuple

t=(1,2,3,4,5,6,7,8,9) 
t1 = list (t) 
t1[3] += 36

Lambda functions

Lambda functions are inline functions with a simplified syntax. They are best used for functionality which is not really reusable in other parts of the code. See diveintopython.org for more.

g = lambda x: x*2
# g(3) will return 6

and or

Those two keywords act boolean but they return one of the values they compare. Evaluation is from left to right. 0, //, [], (), {}, and None are false in a boolean context; everything else is true. As can be expected, for and: if every value is true, the last true one is returned. If every value is false, the first false one is returned. For or it is the other way round.

Ternary Operator

In Python 2.5 there is a built in ternary operator (if?then:else ):

x = 12 if (y>0) else -12

Ternary Operator pre Python 2.5

In older Python versions and/or can be used to emulate the ternary operator

1 and "first" or "second" # returns first

However since “” is false in a boolean context

1 and "" or "second" # returns second

so be careful!

A safe way to emulate the ternary operator if?then:else is

result =  ((c>3) and [a] or [b])[0] # if c>3 return a else return b

See diveintopython.org] for more.

Reloading modules

import qgisfleettools as q
q.doStuff()
reload(q)

String Formatting

Use str.format(). Fields can be replaced by index or by name:

print '{1} and {0}'.format('spam', 'eggs') # by index
print 'This {food} is {adjective}.'.format(food='spam', adjective='absolutely horrible') #  by name
print 'The story of {0}, {1}, and {other}.'.format('Bill', 'Manfred', other='Georg') # both

Old approach: % operator

* By Position

" %s , %d , %f "  % (a_string,an_int, a_float)

* By name

" %(name1)s %(name2)d  %(name1)s"  % {"name1":value1, "name2":value2}  #note that name1 is used several times in the template!

* The “locals trick” The function locals() creates a dict from all variables in the namespace. This can be used for easier “by-name” variable substitution:

y=7
print ("value =  %(y)s" % locals() )

Formatting Numbers

i = 4
"%d" % (i,)  # --> "4"
"%4d"  % (i,)  # --> "   4"
"%04d" % (i,)  # --> "0004"
j = 1.3 "%.2f" % (j)  # --> "1.30" ''

Templates

String templates provide a simpler string way for substitutions. Instead of the normal “%”-based substitutions, Templates support “$”-based substitution

from string import Template
s = Template('$who likes $what')
s.substitute(who='tim', what='kung pao')

see http://www.python.org/doc/2.5.2/lib/node40.html for details

Rot 13

"muh".encode('rot13')

Classes

Automatically set parameters of initializer as member variables

This may go against the zen of python (explicit is better than implicit) but it is extremely convenient when dealing with a lot of arguments in the initializer

class MyClassWithLongInitializer:
    def __init__(self, a,b,c,d,e,f,g, x, y=244):
        self.__dict__.update(**locals()) # instead of self.a=a;self.b=b;self.c=c ...

Taking care of boilerplate code

attrs helps with boilerplate code a lot, e.g. initialisation, string representation, comparison.

import attr
@attr.s
class Point3D(object):
    x = attr.ib()
    y = attr.ib()
    z = attr.ib()

CGI Scripting

Content Type

Is set simply by printing the corresponding information at the start of the script:

import cgi
print "Content-Type: text/html\n" # or "Content-Type: image/png\n"  or somesuch

Get Request Parameters

import cgi
sectionId = cgi.FieldStorage()['sectionid'].value

see http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/81547 for a script to create a dict from the fieldStorage

Show errors in output page

The cgitb module provides a special exception handler for Python scripts. (Its name is a bit misleading. It was originally designed to display extensive traceback information in HTML for CGI scripts. It was later generalized to also display this information in plain text.) After this module is activated, if an uncaught exception occurs, a detailed, formatted report will be displayed. The report includes a traceback showing excerpts of the source code for each level, as well as the values of the arguments and local variables to currently running functions, to help you debug the problem. ( text shamelessly stolen from http://docs.python.org/lib/module-cgitb.html )

import cgitb
cgitb.enable()

Commandline Parameters

Simple

import sys
print sys.argv[1]

Advanced

I have found several builtin modules to deal with commandline parameters. The most flexible and object oriented seems to be OptionParser.

Option Parser

from optparse import OptionParser
parser= OptionParser("usage: %prog [options] INPUT_FILE")
parser.add_option("-f", "--file", dest="infile", help="input file")
parser.add_option("-d", "--direction", choices=[[0,1]], dest="direction", help="direction of the road")
parser.add_option("-v", "--verbose",  dest="verbosity", action="count",default=0, help="Increase Verbosity of debugging output: -v -vv -vvv ")
parser.add_option("-s", "--show-invalid-lines",  dest="showInvalidLines", action="store_true",default=False, help="shows...")
(options,args) = parser.parse_args("test -v --file out.txt".split())
if (options.showInvalidLines):
  print "Showing Invalid Lines"
if (options.verbosity > 1):
  print "Very Verbose"

However, optparse is deprecated since 2.7, the (very similar) replacement is argparse: http://docs.python.org/2/library/argparse.html#module-argparse

Compressed Data

Gzip

Reading

import gzip
 
f = gzip.open('file.txt.gz')
file_content = f.read()
f.close()

Writing

import gzip
 
f_out = gzip.open('file.txt.gz', 'wb')
f_out.write(data)
f_out.close()

Config Files

There appear to be several ways to read in config files. This example uses the ConfigParser class.

Config File Layout

Here is an example config file

[DEFAULT]
text:"Das ist ein text"

[General]
muh:"Die Kuh macht muh"
times:3

The DEFAULT Section is special, the names of other Sections can be whatever you like.

Reading the config file

from ConfigParser import ConfigParser
config = ConfigParser()
config.read("test.cfg")
print config.get("General","muh")
config.getint("General", "times")
print config.get("General", "text") # text not defined in General section, but a DEFAULT definition exists
myBool = config.getboolean("General", "myBoolean")

Writing a config file

from ConfigParser import ConfigParser
config = ConfigParser()
config.add_section("Test")
config.set("Test", "Muh", 123)
 
with open('example.cfg', 'wb') as configfile:
    config.write(configfile)

Cryptography

Hashes

import hashlib
m = hashlib.sha512()
m.update("text")
m.update("more text")
print m.hexdigest()

See http://www.python.org/doc/current/lib/module-hashlib.html for details and a list of available hash algorithms.

CSV Files

Reading CSV Files

A CSV file with a header can be read like so:

import csv
for row in csv.DictReader(file(file_name), delimiter=";"):
     print row["id"]

Writing CSV Files

CSVs with headers can also be written with the csv library:

import csv
 
with open('names.csv', 'w') as csvfile:
    fieldnames = ['first_name', 'last_name']
    writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
 
    writer.writeheader()
    writer.writerow({'first_name': 'Baked', 'last_name': 'Beans'})
    writer.writerow({'first_name': 'Lovely', 'last_name': 'Spam'})
    writer.writerow({'first_name': 'Wonderful', 'last_name': 'Spam'})

Database

Server-side cursors

Database cursors are a construct to traverse over records.

When data is selected, it usually gets transferred to the client process first - the cursor is on the client side.

For large result sets this poses an obvious problem if the client lacks the required resources. Some drivers support server-side cursors. Using those, the client can control how much data it wants to receive at once, thus being able to handle even very large datasets. For recipes see

Postgres

sudo apt-get install python-psycopg2

Selecting

import psycopg2
conn = psycopg2.connect(host="localhost", database="mydb", user="soma", password="xxx")
cursor = conn.cursor()
cursor.execute("select * from timeseries limit 10")
for row in  cursor:
  print row

Using column names to index rows:

import psycopg2
import psycopg2.extras
conn = psycopg2.connect(host="localhost", database="mydb", user="soma", password="xxx")
dict_cur = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
dict_cur.execute("SELECT a,b,c FROM table")
for r in cursor:
  print r["a"]

PostgreSQL server-side cursor

In order to use server-side cursors with PostgreSQL/psycopg2 one just needs to use psycopg2's named cursor and set an adequate itersize

cursor = conn.cursor(name='a_cursor')
cursor.itersize = 10000
 
for row in cursor:
    pass

For details see e.g. psycopg2's server-side doc on cursors and the PostgreSQL cursor doc

Batch insert

import psycopg2
conn = psycopg2.connect(host="localhost", database="mydb", user="soma", password="xxx")
cursor = conn.cursor()
INSERT = "INSERT INTO mytable (a,b,c) values (%s,%s,%s)"
data = [(12,'meas',14),(14,'est',11)]
cursor.executemany(INSERT, data)

Select into numpy Array

import numpy
import psycopg2 as pgdb
 
...
cursor = conn.cursor()
cursor.execute('SELECT a, b FROM demo')
result = numpy.fromiter((tuple (row) for row in cursor), dtype=[('a',float), ('b', float)], count = cursor.rowcount)

I had some trouble getting this to work with datetime. Here is a workaround (not efficient, but working)

import dateutil
import numpy
 
...
 
cursor.execute('SELECT time, value FROM table') # query
 
data = [(dateutil.parser.parse(time),value) for (time,value) in cur] # convert time column to datetime objects
result = numpy.array(data, dtype=[('time',object), ('value', float)]) # convert to numpy array

Inserting

import psycopg2
conn = psycopg2.connect(host="localhost", database="mydb", user="soma", password="xxx")
cursor = conn.cursor()
try:
  cursor.execute( "INSERT INTO timeseries_valuetype (name, unit) VALUES ( %(name)s, %(unit)s )", {"name":"mph", "unit":"mp/h"});
  conn.commit()
except pgdb.DatabaseError, details:
  print "Got a DatabaseError, details are: " + str(details) 
  conn.rollback()

Note:The parameter (%(name)s ) for this style of placeholder variable is a dictionary that maps the names of the placeholders in the first part to values. The s indicates type string.

Copy

The psycopg2 driver offers superior batch inserting performance using the copy_from method:

import psycopg2
import cStringIO
 
# create data to copy (basically a csv file)
data = cStringIO.StringIO() # use cStringIO for best performance
data.write(u"1,1000\n")
data.write(u"2,2000\n")
 
data.seek(0) # jump to beginning of "file"
 
conn = psycopg2.connect(host="localhost", database="mydb", user="soma", password="xxx")
cursor.copy_from(data, 'my_table', sep=",") # copy data to my_table
conn.commit()

Tutorials

SQLite

sudo apt-get install  python-pysqlite2 sqlite3

Note: python-pysqlite2 creates sqlite3 databases. SQLite2 and SQLite3 are not compatible!

Creating a DB Connection

from pysqlite2 import dbapi2 as sqlite
connection = sqlite.connect("test.db") # or use :memory: to create an in-memory database
cursor = connection.cursor()

Attaching a second DB

cursor.execute("attach '/var/fleet/output/trips.db' as trips")

Selecting

cursor.execute("SELECT * from names")
print cursor.fetchall()
#
# or
#
for row in cursor:
  print row[0]
connection.close()

Inserting

cursor.execute ('CREATE TABLE names (id INTEGER PRIMARY KEY, name VARCHAR(50), email VARCHAR(50))') 
cursor.execute('INSERT INTO names VALUES (null, "John Doe", " jdoe@jdoe.zz ")')
print cursor.lastrowid
connection.commit()

Batch insert

cursor.execute( 'create table test(roadid INTEGER, name TEXT)')
values = [(1,'one'),(2,'two')] # anything iterable will do
cursor.executemany("INSERT INTO test (roadid, name) VALUES (?,?) ", values)

Creating user defined functions

Aggregate Function

from pysqlite2 import dbapi2 as sqlite
 
class MySum:
  def __init__(self):
    self.count = 0
  def step(self, value):
    self.count += value
  def finalize(self):
    return self.count
 
con = sqlite.connect(":memory:")
con.create_aggregate("mysum", 1, MySum)
cur = con.cursor()
cur.execute("create table test(i)")
cur.execute("insert into test(i) values (1)")
cur.execute("insert into test(i) values (2)")
cur.execute("select mysum(i) from test")
print cur.fetchone()[0]

load_extension

According to documentation] it is necessary to explicitly enable load_extension

DB = sqlite.connect( './html_content.db' )
DB.enable_load_extension(True)
DB.execute( "SELECT load_extension('/usr/lib/libspatialite.so.2.0.3')" )

MSSQL

The best way I found so far is to use http://pymssql.sourceforge.net/ pymssql. Unfortunately there is currently no ubuntu package for pymssql so it needs to be installed by hand. Luckily this is quite trivial:

Install pymssql

Start by installing the required dependencies

sudo aptitude install python2.5-dev freetds-dev

Download and the latest pymssql release from https://sourceforge.net/project/showfiles.php?group_id=40059 pymssql at sourceforge and install it:

tar -xvzf pymssql-0.8.0.tar.gz cd pymssql-0.8.0 python setup.py install

Using pymssql

A short usage example stolen from http://john.parnefjord.se/node/43 here

import _mssql
mssql=_mssql.connect('mssql.server.com','databaseuser','password')
mssql.select_db('Northwind')
query="select firstname,lastname,birthdate from dbo.Employees;"
if mssql.query(query):
  rows=mssql.fetch_array()
  rowNumbers = rows[0][1]
  print "Number of rows fetched: " + str(rowNumbers)
  for row in rows:
    for i in range(rowNumbers):
      print str(i) + "\t" + row[2][i][0] + "\t" + row[2][i][1] + "\t" + str(row[2][i][2])
else:
  print mssql.errmsg() print mssql.stdmsg()
  mssql.close()

MYSQL

A good introduction to mysql-python can be found at http://mysql-python.sourceforge.net/MySQLdb.html#mysqldb

Install

sudo apt-get install python-mysqldb

Select

import MySQLdb
connection = MySQLdb.connect("10.101.21.25", "user","pass","database")
cur = connection.cursor()
cursor.execute("SELECT * from timeseries")
result = cur.fetchall()
for r in result:
  print r
  cur.close()
  connection.close()

Insert

import MySQLdb
connection = MySQLdb.connect("10.101.21.25", "user","pass","database")
cur = connection.cursor()
cur.execute(""" INSERT INTO timeseries(day,roadid,laneid,speed) values ('2008-01-01', 1,1,60.0)  """) 
connection.commit()

Batch Insert

c.executemany( """INSERT INTO breakfast (name, spam, eggs, sausage, price) VALUES (%s, %s, %s, %s, %s)""",[
  ("Spam and Sausage Lover's Plate", 5, 1, 8, 7.95 ),
  ("Not So Much Spam Plate", 3, 2, 0, 3.95 ),
  ("Don't Wany ANY SPAM! Plate", 0, 4, 3, 5.95 )
] )

Notes: * there is a mix of types in the values array (strings, ints, floats) but we still only use %s in the format string (otherwise you will get an error!) * executemany() tries to throw the whole values array at MySQL at once. If you try to insert many thousand records, this may exceed MySQL's standard buffer size, and wil give you an exception:

_mysql_exceptions.OperationalError: (1153, "Got a packet bigger than 'max_allowed_packet' bytes") 

To prevent this you need to manually split your value-list into smaller batches like this:

batch_size=20000 # you might have to experiment to find optimal batch_size for your data
while values: # repeat until all records in values have been inserted ''
  batch, values = values[:batch_size], values[batch_size:] #split values into the current batch and the remaining records
  cur.executemany("INSERT INTO timeseries(day,roadid,laneid,intrvl,speed,stddev,count) VALUES (%s,%s,%s,%s, %s, %s, %s)", batch ) #insert current batch ''

Date and time

There are (at least) two separate ways to deal with dates:

  • 'time' - represents a timestamp as a tuple of at least 9 values and may be deprecated by now. (see here )
  • 'datetime' - represents a timestamp as a datetime object ( documentation )

Get Current Date/Time

import datetime
current_time = datetime.datetime.now()

Parse dates

Python provides a dateutil library which can be used to parse many common date formats:

import dateutil.parser
dateutil.parser.parse("2011-05-18 12:30:00")

If you need to parse a custom format, use the strptime function of the datetime library (check here for directives and their meanings):

import datetime
d = datetime.datetime.strptime("20071031T235958","%Y%m%dT%H%M%S")
year = d.year # access fields of datetime

the other is to use the time library (which seems a little less intuitive)

import time
year,month,day = time.strptime("20071003","%Y%m%d")[0:3] # values returned as tuple of ints print year,month,day h,m,s=time.strptime("04:12:02", "%H:%M:%S")[3:6]

To handle ISO 8601 timestamps like 20071031T235958 and get a datetime object:

datetime.datetime.strptime("20071031T235958","%Y%m%dT%H%M%S")

Get Unix timestamp from Python datetime:

calendar.timegm(tuple)

Get Python time from Unix timestamp:

time.gmtime(unixtimestamp)

Date formatting

Print datetime object

dat= datetime.datetime.strptime('2008-04-21 11:00:00', '%Y-%m-%d %H:%M:%S')
dat.strftime('%Y%m%dT%H%M%S')

To print a time object prettily

time.strftime("%Y-%m-%d %H:%M:%S",timeStamp)

Manipulating Dates

import datetime
d = datetime.datetime(2007,11,21,12,33)
d += datetime.timedelta(days=4,hours=2,minutes=40,seconds=20,milliseconds=300);
x=datetime.now()
x.replace(minute=20)

Difference of 2 Dates

Getting the difference between 2 datetime-objects is easy:

testTime1=datetime.datetime(2001,1,1,0,0,0)
testTime2=datetime.datetime(2009,4,23,12,34,45)
difference=testTime2-testTime1
print difference #the effect of these few lines: 3034 days, 12:34:45

But you also can split this difference into weeks,days,minutes,hours,…

weeks, days = divmod(difference.days, 7)
minutes, seconds = divmod(difference.seconds, 60)
hours, minutes = divmod(minutes, 60)

Timezones

As far as I know datetime.strptime ignores the timezone information, and always creates naive (timezone-unaware) datetimes. Often this is not what you want. In order to attach a timezone to a naive timestamp, and then convert it to a local time use the following:

sudo aptitude install python-tz

Attach Timezone to a "dumb" timestamp

import pytz
import datetime
 
ts = datetime.datetime.now() # create a dumb timestamp
tz_vienna = pytz.timezone("Europe/Vienna")
localized_ts = tz_vienna.localize(ts) # this just assumes that the timestamp is "right" and attaches the timezone. But it DOES correctly handle daylight savings time

Convert a Time

import datetime
import pytz
 
# create naive timestamp
naive_time = datetime.datetime.strptime("24.11.11 12:46:25", "%d.%m.%y %H:%M:%S")
 
# attach timezone
tz_vienna = pytz.timezone("Europe/Vienna")
local_time =  tz_vienna.localize(naive_time)
 
# convert to utc
utc_time = local_time.astimezone(pytz.utc)

External Commands

To simply run an external command.

import os
exitValue = os.system("ls")

Note that this returns the commands exit code shifted by 8 bits! (dont ask.. read http://blog.tsul.net/2008/04/ossystem-and-its-return-value.html)

If you want the programs exit code it is probably easier to do

import subprocess
exitCode = subprocess.call([[ls","-a]])

Passing a list with pieces of commandline is often not very handy. Using call like so

import subprocess
exitCode = subprocess.call("ls -a", shell=True)

means (surprise!):

“the specified command will be executed through the shell. This can be useful if you are using Python primarily for the enhanced control flow it offers over most system shells and still want access to other shell features such as filename wildcards, shell pipes and environment variable expansion.”

It also means a SECURITY HAZARD if the input to the command comes from untrustable sources! (from http://docs.python.org/dev/library/subprocess.html#frequently-used-arguments ) So use with care.

To execute an external command and get the output use

import commands
output = commands.getoutput("grep 'muh' input.txt")

File handling

Reading

Reading file line per line

(from here] : Remember that line is a string even if it looks like a number)

infile = open( infilename, "r" )
for line in infile:
  # Do stuff with line. # e.g. num = int( line )
infile.close()

Filter input lines in finite time

Reading in a whole file and filtering afterwards is rather slow. For now it's using grep because I have not found out how speed optimization can be done otherwise. Giving grep the -s flag prevents us from having grep's error message in our lines list. We can thus assume that we either have the expected lines or none at all.

import commands
lines =  commands.getoutput("grep -s " + myregex + " " + myfile).strip().split("\n") # we have a list of the lines that matched the regular expressions now
for line in lines:
  if(len(line)>0):
    #do something

Writing

f = file("test.txt","w")
f.write("muh")
f.close()

The mode string can be “w” (write) or “a”(append) , if binary data needs to be written use “wb” or “ab”

Files with Umlauts

One way to handle nasty Umlauts: (check here to find the correct codec)

import codecs
inputfile = codecs.open("something.csv", "r","latin1")
for i, line in enumerate(inputfile):
  print line
inputfile.close()
 
outputfile = codecs.open("somethingelse.csv","w","latin1")
outputfile.write("Mäh, öh, blüb!")
outputfile.close() ''

Another way:

fileencoding = "iso-8859-1"
for raw in file("Taxistandplaetze(Sektoren).csv"):
  print raw.decode(fileencoding)

Convert string to another encoding:

import codecs
 
inputfile = codecs.open("something.csv", "r","latin1")
line = inputfile.next()
ascii_line = line.encode("ascii","ignore") 
# if a character can not be encoded in this encoding python would normally raise an exception
# 'ignore' tells the encoder to ignore such errors. other options are 'replace' , 'xmlcharrefreplace', 'backslashreplace'
# see http://docs.python.org/library/codecs.html

File number support

(Found on a mailing list)

from itertools import izip, count
 
def enumerate(iterable, start=0):
  return izip(count(start), iterable) # redefine enumerate
 
for i, line in enumerate(infile):
  print "line number: " + str(i) + ": " + line.rstrip()

Replace lines in-line

import fileinput
for line in fileinput.input(onefilename, inplace=1):
  print line.replace(old,new)

Checking if a File/Directory Exists

import os
os.path.exists("/path/to/some/where")

Get filename from absolute path

import os
 
path,filename = os.path.split(absolute_path)

Creating Directories

import os
if not os.path.isdir(dir):
  os.makedirs(dir) # creates all non-existent directories in the path of dir

Working with the Working-Directory

(yes its a bad pun, I know)

import os
current_work_dir = os.getcwd()  #get current working dir
os.chdir(new_work_dir)          #change working dir ''

If your script needs to change the working directory I would strongly suggest not to rely on relative paths. Use os.getcwd() to store the script's base path and construct absolute paths for every os.chdir() you are going to do!

Deleting a Directory

Deleting an empty directory can be done with os.rmdir() but most of the time your directory will not be empty. If you want to do the equivalent of an rm -rf dir_to_delete use

import shutil
shutil.rmtree("dir_to_delete")

Listing Directory Contents

If you want to use wildcards try:

import glob
dir_contents = glob.glob("/home/soma/*.txt")

An alternative would be to use os.listdir. The following script lists all files in the current directory (it filters out subdirectories)

import os
dirContents = os.listdir(".")
for aFile in (c for c in dirContents if not os.path.isdir(c)):
  print "a file: %s", aFile

Temporary Files/Directories

import tempfile
tempfile.mkstemp() # create temporary file
tempfile.mkdtemp(".tmp", "backup_") #create a temp directory with prefix and suffix. eg: /tmp/backup_rt5JWA.tmp

JSON-RPC

For a json-rpc library take a look at http://json-rpc.org/ . But be warned right now the ServiceProxy does not set the content-type and might not work with some servers.

Client

Using the python json library it is very easy to create a simple ServiceProxy client (code mostly stolen from json-rpc.org but added content-type. I am posting this here because the client is so simple that installing an extra library might be overkill).

# Code mostly from http://json-rpc.org/
 
import urllib2
import json
 
# Define helper classes
 
class JSONRPCException(Exception):
    def __init__(self, rpcError):
        Exception.__init__(self)
        self.error = rpcError
 
class ServiceProxy(object):
    def __init__(self, serviceURL, serviceName=None):
        self.__serviceURL = serviceURL
        self.__serviceName = serviceName
 
    def __getattr__(self, name):
        if self.__serviceName != None:
            name = "%s.%s" % (self.__serviceName, name)
        return ServiceProxy(self.__serviceURL, name)
 
    def __call__(self, *args):
         postdata = json.dumps({"method": self.__serviceName, 'params': args, 'id':'jsonrpc'})
 
         req = urllib2.Request(url=self.__serviceURL, data=postdata)
         req.add_header("Content-Type", "application/json")
         respdata = urllib2.urlopen(req).read()
 
         resp = json.loads(respdata)
         if resp['error'] != None:
             raise JSONRPCException(resp['error'])
         else:
             return resp['result']
 
 
# Usage Example
 
sp = ServiceProxy('http://some-jsonrpc-service.com/service')
result  = sp.someMethod(1,"two",3)

Logging

Logging to Console

# Initialize the Logger
import logging
logging.basicConfig(level=logging.DEBUG, format="%(asctime)-15s\t%(name)-5s\t%(levelname)-8s\t%(message)s")
logging.info("Logging Initialized")

Logging to File

import logging
logging.basicConfig(filename="mylog.log", level=20, format="%(asctime)-15s %(levelname)s\t(%(filename)s:%(lineno)d) -  %(message)s")
logging.info("Logging Initialized.")

see also:

Mathematics

Rounding

import math
print math.floor(2.4)# --> 2.0
print math.ceil(2.4) #  --> 3.0
print round(2.4)     # --> 2
print int(2.4)       # --> 2
print round(2.6)     # --> 3
print int(2.6)       # --> 2 Conversion to int **truncates** the comma, the round function rounds!
# rounding errors
round(26.9403314917,2) # --> 26.940000000000001
myvar = round(26.9403314917,2)
print myvar            # --> gives 26.94, which is a good enough estimation in most cases

int to bin

From here:

count=24
n=286476
print  "".join([str((n >> y) & 1) for y in range(count-1, -1, -1)])

Transpose of a matrix

Entries are stored row by row in list of lists (or tuples)

>>> x = [[1, 2, 3], [4, 5, 6]] 
>>> zip(*x) [(1, 4), (2, 5), (3, 6)]

Machine Learning

k-means clustering

Attention: Make sure your data does not contain NANs!

import pylab
import numpy
import scipy.cluster.vq
 
# the data to cluster
timeseries = numpy.array([numpy.random.rand(10) for i in range(10)])
 
# find 5 centroids
centroids, x = scipy.cluster.vq.kmeans(timeseries, 5)
 
# assign each timeseries to a centroid
idx,_ = scipy.cluster.vq.vq(timeseries,centroids)
 
# plot centroids and corresponding timeseries
pylab.figure()
for i in range(5):
    pylab.subplot(510+i)
    cluster = i
    pylab.plot(timeseries[idx==cluster].T, color='b')
    pylab.plot(centroids[cluster].T, color='r')

Hierarchical clustering

Attention: Make sure your data does not contain NANs!

import pylab
import numpy
import scipy.cluster.hierarchy as hcluster
 
timeseries = numpy.array([numpy.random.rand(10) for i in range(50)])
 
# get a cluster-id for every timeseries
idx = hcluster.fclusterdata(timeseries, 4.0, criterion='maxclust', method='complete')
 
# calculate the centroid for every cluster
centroids={}
for i in set(idx):
    centroid = pylab.mean(timeseries[idx==i],0)
    centroids[i] = centroid
 
# plot centroids and corresponding timeseries
fig = pylab.figure()
for i in range(0, len(centroids)):
    subplot = len(centroids)*100 + 10 + i+1
    pylab.subplot(subplot)
    cluster = i+1
    pylab.plot(centroids[cluster], color='r', linewidth=2)
    pylab.plot(timeseries[idx==cluster].T, color='b')
    pylab.axis(ymin=0)

PCA

import pylab
import numpy
import mlpy
 
# 2 dimensional data
data = numpy.array([[1,1], [2,2.3], [3, 3.4]])
pylab.scatter(data[:,0], data[:,1], label="orig_data")
 
pca = mlpy.PCA()
pca.learn(data) 
 
# plot principal components
coef = pca.coeff() # column1=pc1 , column2=pc2
pylab.plot([0,coef[0,0]] ,[0, coef[0,1]], '-r', label="first PC" )
pylab.plot([0,coef[1,0]] ,[0, coef[1,1]], '-b', label="second PC")
 
# dimensionality reduction / reconstruction
z = pca.transform(data,k=1) # reduce data to the first principal component
rec = pca.transform_inv(z)  # reconstruct the original data from the first principal component
 
# plot reconstructed data
pylab.plot(rec[:,0], rec[:,1], '+r', label="reconstructed data")
pylab.legend()

Mixture Models

Try PyMix

Networking

Simple HTTP Download

import urllib
url = urllib.urlopen("http://10.101.21.115:8080/display/servlet/graph?chart=avg&day=20061018&sectionid=0303-0304")
f = file("out.png", "w")
f.write(url.read()) 
f.close()

Very simple HTTP Servers

With SimpleHTTPServer contents of the current directory can be served via HTTP.

# serve current directory @ port 8000
python -m SimpleHTTPServer

The port can be supplied as first argument. For ports < 1024 root privileges are required (which is not recommended due to security problems on StackOverflow)

sudo python -m SimpleHTTPServer 80

Simple HTTP Servers

from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
from urlparse import urlparse, parse_qs
 
 
class MyServer(BaseHTTPRequestHandler):
 
  def do_GET(self):
    params = parse_qs(urlparse(self.path).query) # get request parameters    
    self.send_message("hello " + params['name'][0])
 
  def do_POST(self):
    content_len = int(self.headers.getheader('content-length'))
    post_body = self.rfile.read(content_len)
 
    self.send_message("Ok. Got Message " + post_body)
 
  def send_message(self, message):
    self.send_response(200, 'OK')
    self.send_header('Content-type', 'text/html')
    self.end_headers()
 
    self.wfile.write(message)
 
  @staticmethod
  def serve_forever(port):
    HTTPServer(('', port), MyServer).serve_forever()
 
if __name__ == "__main__":
  MyServer.serve_forever(8080)

see http://www.doughellmann.com/PyMOTW/BaseHTTPServer/index.html for a lot of interesting remarks

Socket Server

import SocketServer
 
class EchoRequestHandler(SocketServer.BaseRequestHandler ):
    def setup(self):
        print self.client_address, 'connected!'
        self.request.send('hi ' + str(self.client_address) + '\n')
 
    def handle(self):
        data = 'dummy'
        while data:
            data = self.request.recv(1024)
            self.request.send(data)
            if data.strip() == 'bye':
                return
 
    def finish(self):
        print self.client_address, 'disconnected!'
        self.request.send('bye ' + str(self.client_address) + '\n')
 
#server host is a tuple ('host', port)
server = SocketServer.ThreadingTCPServer(('', 50008), EchoRequestHandler)
server.serve_forever()

Using FTP

FTP Download

from ftplib import FTP 
 
ftp = FTP()
ftp.connect("127.0.0.1", port=21)
ftp.login("username", "password") 
 
ftp.cwd('dir/subdir/subdir')
 
local_file = open("local_file.xml", 'wb')
cmd = 'RETR ' + "file.xml"
ftp.retrbinary(cmd, local_file.write)
 
f.close()
ftp.quit()

FTP Upload

from ftplib import FTP 
 
ftp = FTP("ftp.theftp.org")
ftp.login("username", "password") 
 
ftp.cwd('subdirectory/subdirectory')
 
f = open(filepath, 'rb')
cmd = 'STOR ' + fileName 
ftp.storbinary(cmd, f)
 
f.close()
ftp.quit()

Numpy

After having to reduce the memory footprint for one of my scripts I realized how much more efficient numpy arrays are compared to the default python lists. If you have large arrays of data you should strongly consider using numpy.

External Documentation

Creating Arrays

import numpy
arr =  numpy.zeros((96,4)) # create two dimensional array with values initialized to 0

Additional views on arrays

Use views on arrays to add new functionality, reshape arrays, etc. - all without copying the actual data

import numpy as np
 
x = np.array([(1, 2),(3,4)], dtype=[('a', np.int8), ('b', np.int8)]) # access via z["a"]
z = x.view(np.recarray) 
print z.a # recarrays allow access with attributes

Read a CSV File into a numpy ndarray

Easy way with numpy.genfromtxt

import numpy
data= numpy.genfromtxt("data.csv", delimiter=";", names=True)
 
#accessing data
column_x = data["x"] # access a column
row1 = data[1] # access a row
row1_x = data[1]["x"] # access column "x" of row 1

Another, more verbose, method ( TODO: remove )

import numpy
 
lines = file("cv_in.txt").readlines()
rows = len(lines)
cols = len(lines[0].strip().split("\t"))
values = numpy.zeros( (rows,cols))
 
count = 0
for line in lines:
  values[count,] = [ float(x) for x in line.strip().split("\t") ]
  count += 1

Optimisation

Cython

A nice introduction can be found here http://www.perrygeo.net/wordpress/?p=116

Pandas

Parallel Computing

Threading vs. Multiprocessing

Python < 2.6 supports threads trough the “threading” module. An important note however: CPython currently has something called a Global Interpreter Lock (http://en.wikipedia.org/wiki/Global_Interpreter_Lock). In short this means that only one thread is allowed to use the Python interpreter. As a consequence this highly limits the concurrency of a single process with multiple threads.

What this means is that in Python < 2.6 you will not gain performance from running a python numbercrunching application with multiple threads! You will only gain performance if your threads are strongly IO bound!

Starting with Python 2.6 the multiprocessing module is made available. This module circumvents the GIL by using subprocesses and is the preferred option to parallelize python calculations (see http://docs.python.org/library/multiprocessing.html )

Multiprocessing

If you want true parallelism (without GIL restrictions) use the multiprocessing library. Here is a simple example using queues and Worker Processes:

import multiprocessing as mp
import Queue  # for catching Queue.Empty
 
import random
import time
 
 
class Result(object):
    def __init__(self, v):
        self.v = v
 
    def value(self):
        return self.v
 
 
def iterate(queue):
    """helper to iterate over a queue until it is empty"""
    while True:
        try:
            yield queue.get_nowait()
        except Queue.Empty:
            break
 
 
class Worker(mp.Process):
    def __init__(self, i, qin, qout):
        super(Worker, self).__init__()
        self.id = i
        self.qin = qin
        self.qout = qout
 
    def run(self):
        for data in iterate(self.qin):
            print "worker %d has data: %d" % (self.id, data)
            time.sleep(random.randint(0, 1))
            self.qout.put(Result(data * 2))
 
        print "Ending Worker %d" % self.id
 
 
 
mgr = mp.Manager()  # creating Queues without a Manager will lead to strange behaviour
 
q_in = mgr.Queue()
q_out = mgr.Queue()
 
# create data
for i in range(250):
    q_in.put(i)
 
# create workers
workers = []
for i in range(20):
    w = Worker(i, q_in, q_out)
    workers.append(w)
    w.start()
 
# wait for the workers to finish
for w in workers:
    w.join()
 
# process results
while not q_out.empty():
    result =  q_out.get()
    print result.value()

Threading

Create a Worker Class

import threading
 
class Worker(threading.Thread):
  def run(self):
    doSomeWork()
 
for i in range(3):
   worker = Worker()
   worker.start()

Execute a method as a thread

def worker_method():
  doSomething()
 
t = Thread(target=worker_method)
t.start()

Queues

Queues can be used to pass data around in a thread-safe manner. See http://www.python.org/doc/2.5.2/lib/QueueObjects.html for details.

from threading import Thread
from Queue import Queue
 
def worker():
  while True:
    item = q.get()  # this call BLOCKS if the queue is empty.
                    # Use get_nowait()  if you would rather like an Exception.
    do_work(item)
    q.task_done()  # THIS is important!
 
q = Queue()
for i in range(num_worker_threads):
  t = Thread(target=worker)
  t.setDaemon(True)
  t.start()
  for item in items_to_process:
    q.put(item)
 
  q.join()       # blocks until all itmes are processed

Note' the 'q.task_done() call in the worker. This tells the queue that processing one item has finished. If your workers for some reason dont call this ( eg. because of an Exception) then the call q.join() will NEVER UNBLOCK!

External Documentation

Pylab recipes

Regular Expressions

Find out if something does (not) match

import re
pattern = re.compile("^\d")
match = pattern.match("my text")
if match:
  print "line starts with number"
else:
  print "line does not start with number"

Split a String

To split a string by a simple delimiter just use string.split(). For a more complex splitting operation:

import re
s = "a 1 and 2 and 3 and 4"
a = re.split("\d", s) # every number is a delimiter

Extract Data Using Subgroups

Single Match

Either use a compiled pattern:

import re
p = re.compile("x=(\d+?).*?y=(\d+?)")
match = p.search("blah x=3 y=4 and ")
(x,y) = match.groups()
print x,y

Or use the package function:

import re
matches= re.search("x=(\d+?)[ ]*?y=(\d+?)[ ]","blah x=3 y=4 and ")
(x,y) = matches.groups()
print x,y

Multiple Matches

Either use a compiled pattern:

import re
p = re.compile("x=(\d+?).*?y=(\d+?)")
mItr = p.finditer("x=3 y=4 and then x=5, y=7  and x=8, y=9") # return iterator<tuple>
for m in mItr :
  (x,y) = m.groups()
  #OR: matches = p.findall("x=3 y=4 and then x=5, y=7  and x=8, y=9") # return array of tuples

Or use the package function

import re
matches = re.findall("x=(\d+?).*?y=(\d+?)","x=3 y=4 and then x=5, y=7  and x=8, y=9") # or re.finditer

Selecting Groups From a Match

The .groups() method can optionally take a sequence that indicates which groups should be returned (group names or indices). eg.

(x,y) = match.groups([1,4]) #only get first and fourth group from the match

Sending Mail

import smtplib
msg="Subject: Subject \n\nBlablablablabla"
smtp = smtplib.SMTP("localhost")
smtp.sendmail("root", reciever, msg)
smtp.quit()

Scipy

Installing

sudo aptitude install python-scipy

Interpolation

Interpolation is the process of using a set of data values for a function to determine the missing values of that function. Scipy provides a lot of functionality for this. See here]

Simple Example:

import numpy
import scipy.interpolate
 
orig_data = numpy.array([1,2,3,0,5,0,5,0,7]) # data to smooth
 
# find x,y positions between which to interpolate
x_data = [i for i in range(len(orig_data)) if orig_data[i] > 0] # indices where orig_data is valid
y_data = orig_data[x_data]  # the valid data in orig_data at the corresponding indices
spline = scipy.interpolate.splrep(x_data,y_data,s=200) # calculate the spline
smoothed_data = scipy.interpolate.splev(range(len(orig_data-1)), spline) # calculate a complete series
 
# optional: plot data
import pylab
pylab.plot(orig_data)
pylab.plot(smoothed_data)

Regression

Linear Regression

import numpy
import scipy
 
# data to fit
x = numpy.arange(0,9)
y = [19, 20, 20.5, 21.5, 22, 23, 23, 25.5, 24]
 
# calculate regression parameters ( y_fitted = a + bx ) 
[a,b] = scipy.polyfit(x,y,1)
 
# calculate fit
y_fitted = scipy.polyval([a,b],x)

Ordinary Linear Least Squares Fit using mlpy

import numpy
import pylab
import mlpy
 
#  data to learn
x = numpy.random.normal(1, 5, 50)
x = x.reshape(-1,1) # need to transform x, the features of each datapoint must be in one row
y = numpy.random.normal(2,2, 50)
 
# Ordinary least squares fit
ols = mlpy.OLS()
ols.learn(x,y)
 
# predict data using the learned regression
x1 = numpy.arange(-20,20,0.5).reshape(-1,1) # features of one input-point must be in a row
y1 = ols.pred(x1)
 
# plot
pylab.scatter(x,y)
pylab.plot(x1,y1)

Smoothing Data

A smoothing function that does exactly the same as the Matlab function “smooth” (from http://www.scipy.org/Cookbook/SignalSmooth )

def smooth(x,window_len=5,window='flat'):
    if x.ndim != 1:
        raise ValueError, "smooth only accepts 1 dimension arrays."
    if x.size < window_len:
        raise ValueError, "Input vector needs to be bigger than window size."
    if window_len<3:
        return x
    if not window in ['flat', 'hanning', 'hamming', 'bartlett', 'blackman']:
        raise ValueError, "Window has to be one of 'flat', 'hanning', 'hamming', 'bartlett', 'blackman'"
 
    s=numpy.r_[2*x[0]-x[window_len-1::-1],x,2*x[-1]-x[-1:-window_len:-1]]
    if window == 'flat': #moving average
        w=numpy.ones(window_len,'d')
    else:
        w=eval('numpy.'+window+'(window_len)')
 
    y=numpy.convolve(w/w.sum(),s,mode='same')
    return y[window_len:-window_len+1]

Spline Polynoms

import numpy
import scipy.signal
y = numpy.array([41.621207814814809, 42.328298238095236, 45.881729878787887, 43.800834224999996])
y_smoothed = scipy.signal.cspline1d(y)

t-test

from scipy import stats
import numpy
import statistics
 
# http://www.biostathandbook.com/onesamplettest.html
data = [120.6, 116.4,117.2,118.1,114.1,116.9,113.3,121.1,116.9,117.0]
 
m = sum(data)/len(data)
 
null_hypothesis = 120
 
t_value, p_value = stats.ttest_1samp(data, null_hypothesis)
 
print(statistics.stdev(data))
print(numpy.std(data, ddof=1))
 
print(t_value, p_value)

Statistical Functions

Pylab

Pylab provides a number of simple statistical functions:

  • pylab.mean
  • pyalb.median
  • pylab.var

Python-Statlib

The google project Python-Statlib is the most complete statistical library for python I have found so far.

Installation

Unfortunately there is currently no ubuntu package, so you will have to download the latest .tar.gz from http://code.google.com/p/python-statlib/downloads/list and install it by extracting it and running:

sudo python setup.py install

Example

from statlib import stats
mean = stats.mean([1,2,3,4,5])

A complete list of the supported statistical functions can be found at http://code.google.com/p/python-statlib/wiki/StatsDoc

Running Median

Normally it is advisable to use pylab.median …. but if you have so many values that they don't fit into memory anymore, there is a trick that can give you a rough estimate for the median:

def running_median(v,step_size=0.01):
    """ Estimate median from v. Warning: this will be inaccurate unless there are MANY values in v! """
    median = v[0]
    for i in range(len(v)):
        inc  =  step_size if v[i] > median else -step_size
        median += inc
    return median
 
 
# test accurracy of the approach        
errors=[]
for i in range(500):
    print i
    v = numpy.random.uniform(0,100,size=50000)
    errors.append(running_median(v) - pylab.median(v))
 
pylab.hist(errors,bins=50)

(found in comments here - damn I hate paywalls. I'd love to read the piglet tracking paper!)

Testing

unittest

Basics

import unittest
class CalculationTest(unittest.TestCase):
	# before each test
	def setUp(self):
		self.x = 23
 
	# tests = methods whose name starts with 'test', executed in order of their function name
	def testGetArithAvg(self):
		self.assertEquals(23, self.x)
 
# execute all tests
if __name__ == '__main__':
	unittest.main()

classwide Setup and teardown

There are two class methods that are called before/after tests in an individual class run. setUpClass and tearDownClass are called with the class as the only argument and must be decorated as a classmethod():

import unittest
 
class MyTest(unittest.case.TestCase):
 
    @classmethod
    def setUpClass(cls):
        cls.msg = "I am upset!"         
 
 
 
    def test_hello(self):
        print self.msg
 
    @classmethod
    def tearDownClass(cls):
        cls.msg = None
 
if __name__ == '__main__':
    unittest.main()
 

Test for exceptions

Since python 2.7 this is best done by using the context manager returned by unittest.assertRaises()

with self.assertRaises(SomeException):
    test_something_that_raises_exception()

For older python versions see this SO question.

Web Services

There are several python frameworks available.

ZSI

ZSI is a framework which supports webservice servers and clients. And has support for wsdl2python as well as dynamic webservice calls via a ServiceProxy. See http://pywebsvcs.sourceforge.net/holger.pdf

sudo aptitude install python-zsi

Accessing a Webservice with ZSI

import sys
from ZSI.ServiceProxy import ServiceProxy
wsdlUrl='http://www2.meteomedia.at/wetter_verkehr/weather_data.php?wsdl'
service = ServiceProxy(wsdlUrl, tracefile=sys.stdout)
service.getLastCalculationTime()

ServiceProxy and caching

A very important note: ZSI ServiceProxy creates a cache where it puts all the python classes generated from a WSDL. AND DOES NOT REFRESH THAT CACHE for you. So if you are wondering why changes made to a WSDL you are fetching are not reflected in the Python classes you try to use in your client, have a look at ~/.zsi_service_proxy_dir (ZSI 2.1) or ./.service_proxy_dir (ZSI 2.0) and clean up!

It might also be useful to explicitly control which directory is used by:

service = ServiceProxy(wsdlUrl,cachedir='/tmp/zsi_test/', tracefile=sys.stdout)

Datetimes

Datetimes are tricky (see http://pywebsvcs.sourceforge.net/zsi.html#SECTION007600000000000000000 and http://pywebsvcs.sourceforge.net/cookbook.pdf for details): basically ZSI does not expect a datetime to be a string in the standard xs:datetime format, or a python datetime. Instead it expects a python timetuple in UTC, which means timezones are not supported.

dt = datetime.datetime.now().timetuple() # valid parameter for a webservice request

Soappy

Soappy is DEPRECATED and should no longer be used

sudo aptitude install python-soappy

See http://www.ebi.ac.uk/Tools/webservices/tutorials/python for tutorials

Accessing a Webservice with Soappy

from SOAPpy import WSDL
wsdlUrl = 'http://1.2.3.4:8000/dynamicroutermodule?wsdl'
service =  WSDL.Proxy(wsdlUrl)
request={}
request[[fromRoadId]]=10400000586647
request[[fromRoadDirection]]=1
request[[toRoadId]]=10400002879088
request[[toRoadDirection]]=1
request[[routingType]]=0
route = service.getRoute(arg0=request)

Writing a report file

docx

rtf

pyth comes with conversion tools but offers no image support.

pyrtf-ng

XML Processing

ElementTree

import xml.etree.ElementTree
 
#
# Parse
#
 
# parse xml from file
root = xml.etree.ElementTree.parse(filename).getroot() # parse() yields an ElementTree object so we need to explicitly call getroot()
 
# parse xml from string
root = xml.etree.ElementTree.fromstring(xml_str) # fromstring() directly yields the root element
 
 
 
#
# Search
#
 
# find a tag via xpath
gisroute = root.find("GisRes/GisRoute")
 
# find multiple tags via xpath
connections = root.findall("ConnectionList/Connection")
 
#
# Access
#
 
# access attributes
route_id = gisroute.attrib["id"]
 
# access text
txt = gisroute.text

DOM

Parsing a Document

import xml.dom.minidom
from xml.dom.minidom import Node
doc = xml.dom.minidom.parse("maps.xml")
for node in doc.getElementsByTagName("Placemark"):
    #do something to node

Getting an Attribute

Extract the “muh” from <node att=“muh” />

    att = node.getAttribute("att")

Getting text from content of node

Extract the “muh” from <parent>MUH</parent>

for node in parent.childNodes:
    if node.nodeType == Node.TEXT_NODE:
        print node.data

Creating a XML doc

from xml.dom.minidom import Document
 
# Create the minidom document
doc = Document()
 
# Create the <wml> base element
wml = doc.createElement("wml")
doc.appendChild(wml)
 
# Create the main <card> element
maincard = doc.createElement("card")
maincard.setAttribute("id", "main")
wml.appendChild(maincard)
 
# Create a <p> element
paragraph1 = doc.createElement("p")
maincard.appendChild(paragraph1)
 
# Give the <p> elemenet some text
ptext = doc.createTextNode("This is a test!")
paragraph1.appendChild(ptext)
 
# save
out=file('out.xml','w')
doc.writexml(out)
out.close()

(example from http://www.postneo.com/projects/pyxml/)

SAX

Design Patterns

Singleton

class Singleton:
    __shared_state = {}
    def __init__(self):
        self.__dict__ = self.__shared_state
 
s1 = Singleton()
s1.x = 1
s2 = Singleton()
s2.x # 1

Iterator

def datetimeIterator(from_date=datetime.now(), to_date=None, delta=timedelta(days=1) ):
    while to_date is None or from_date <= to_date:
        yield from_date from_date = from_date + delta
    return
 
for d in datetimeIterator(datetime.strptime("20090101","%Y%m%d"),datetime.strptime("20090610","%Y%m%d")):
    print datetime.strptime("20090104","%Y%m%d")==d
python_cookbook.1551775847.txt.gz · Last modified: 2019/03/05 09:50 by mantis