Improving openemm's pickdist.py performance (with patch)

liam821
Posts: 2
Joined: Wed Apr 28, 2010 8:19 am

Improving openemm's pickdist.py performance (with patch)

Post by liam821 »

** I take no responsibility for anything below. It works for us, might not for you. :)

We have a rather large mailing list (in the millions), so I've rewritten pickdist.py to improve our sending performance.

I thought I'd share...

We generate our email queue first, then run a job to sort the mail queue into subqueues by major ISP (Gmail, AOL, MSN, Yahoo, etc.) and one big catchall for everything else. This allows us to throttle each queue based on how fast email is being accepted.
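The domain-to-queue mapping boils down to something like this (a minimal sketch; the names and domain list here are illustrative, not our actual sorter):

```python
# Hypothetical carrier map: recipient domain -> subqueue name.
CARRIER_DOMAINS = {
    'gmail.com': 'gmail',
    'googlemail.com': 'gmail',
    'aol.com': 'aol',
    'msn.com': 'msn',
    'hotmail.com': 'msn',
    'yahoo.com': 'yahoo',
}

def queue_for(address):
    """Return the subqueue name for an address; unknown domains go to 'catchall'."""
    domain = address.rsplit('@', 1)[-1].lower()
    return CARRIER_DOMAINS.get(domain, 'catchall')
```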

Our problem was that pickdist.py was taking days to complete the full mail queue generation. The pickdist.py process would just max out one CPU core for hours on end, and it could only generate about 140k emails an hour.

So I hacked the Python multiprocessing module (requires Python 2.6+) into pickdist.py and added forking support. Because each worker is a separate process, this gets around Python's global interpreter lock, and we can finally see huge performance gains from multi-CPU machines.

By having pickdist.py fork off 12 worker processes (we have an 8-core server) and always returning True from the queueIsFree call, we went from generating about 140k emails an hour to over 4 million an hour!! Holy moly! I've included my pickdist.py patch below.
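The fork/join pattern in the patch reduces to this (a self-contained sketch in modern Python; the real worker calls blk.unpack()/blk.moveTo() instead of the placeholder here):

```python
from multiprocessing import Process, Queue

def worker(n, out):
    # Stand-in for the real work (unpacking one XML block into the mail queue).
    out.put(n * n)

def run_batch(items, numthread=12):
    """Spawn up to numthread worker processes, then wait for them all."""
    out = Queue()
    procs = []
    for n in items[:numthread]:
        p = Process(target=worker, args=(n, out))
        p.start()
        procs.append(p)
    # Drain results before joining, so we never block on the queue's feeder thread.
    results = sorted(out.get() for _ in procs)
    for p in procs:
        p.join()
    return results
```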

Hope this helps some people out!

We're on CentOS 5.4 64-bit, Python 2.6.5, and OpenEMM 5.4.0. You need Python 2.6+ for the multiprocessing library.

Happy Hacking!
liam

You can download our pickdist.py patch here: http://dropzone.slacker.com/~liam/pickd ... cker.patch

A note on the patch: we run OpenEMM 5.4.0 and I don't know if pickdist.py has changed in later releases. It's a pretty small change, though, so it should be easy to patch into later versions.

If you want to put the queue rate limiting back in (I removed it), remove the "return True" line and uncomment the "< 5000" line in queueIsFree.

Here is the patch in case the above URL stops working.

Code:

--- pickdist.py.ORIG	2010-04-28 01:55:09.000000000 -0700
+++ pickdist.py	2010-04-28 01:57:15.000000000 -0700
@@ -20,16 +20,34 @@
 * Reserved.
 * 
 * Contributor(s): AGNITAS AG. 
+*
+*
+* Added multiprocessing support (forking) to improve performance via the py2.6
+* multiprocessing library.  Modify numthread below to specify how many
+* processes to fork out.
+*
+* Liam Slusser liam@slacker.com
+* April 26th 2010
+*
 **********************************************************************************
 """
 #
 import	os, time, signal
 import	shutil
 import	agn
+# import multiprocessing goodness
+from multiprocessing import Process
 agn.require ('1.5.3')
 agn.loglevel = agn.LV_INFO
 if agn.iswin:
 	import	subprocess
+	
+# the number of forks to spawn to do work
+numthread = 12
+
+# some debugging messages if you want
+debug = 0
+
 #
 class Block:
 	def __init__ (self, path):
@@ -159,7 +177,9 @@ class Pickdist:
 
 # adjust queue size to be processed
 	def queueIsFree (self):
-		return len ([_f for _f in os.listdir (self.queue) if _f[:2] == 'qf']) < 5000
+		#return len ([_f for _f in os.listdir (self.queue) if _f[:2] == 'qf']) < 5000
+		# Why wait, just keep going
+		return True
 	
 	def hasData (self):
 		return len (self.data) > 0
@@ -177,42 +197,68 @@ def handler (sig, stack):
 	global	term
 	
 	term = True
-signal.signal (signal.SIGINT, handler)
-signal.signal (signal.SIGTERM, handler)
 
-if not agn.iswin:
-	signal.signal (signal.SIGHUP, signal.SIG_IGN)
-	signal.signal (signal.SIGPIPE, signal.SIG_IGN)
-#
-agn.lock ()
-agn.log (agn.LV_INFO, 'main', 'Starting up')
-#
-pd = Pickdist ()
-while not term:
-	time.sleep (1)
-	agn.mark (agn.LV_INFO, 'loop', 180)
-	if pd.scanForData () == 0:
-		delay = 30
-		agn.log (agn.LV_VERBOSE, 'loop', 'No ready to send data file found')
+def doPackMove(pd,blk,agn):
+	
+	if blk.unpack (pd.queue):
+		blk.moveTo (agn.mkArchiveDirectory (pd.archive))
 	else:
-		delay = 0
-		while not term and pd.hasData ():
-			if not pd.queueIsFree ():
-				agn.log (agn.LV_INFO, 'loop', 'Queue is already filled up')
-				delay = 180
-				break
-			blk = pd.getNextBlock ()
-			if blk.unpack (pd.queue):
-				blk.moveTo (agn.mkArchiveDirectory (pd.archive))
-			else:
-				blk.moveTo (pd.recover)
-	while not term and delay > 0:
-
-		if agn.iswin and agn.winstop ():
-			term = True
-			break
+		blk.moveTo (pd.recover)
+	
+if __name__ == '__main__':
+	signal.signal (signal.SIGINT, handler)
+	signal.signal (signal.SIGTERM, handler)
+	
+	if not agn.iswin:
+		signal.signal (signal.SIGHUP, signal.SIG_IGN)
+		signal.signal (signal.SIGPIPE, signal.SIG_IGN)
+	#
+	agn.lock ()
+	agn.log (agn.LV_INFO, 'main', 'Starting up')
+
+	#
+	pd = Pickdist ()
+	while not term:
 		time.sleep (1)
-		delay -= 1
-#
-agn.log (agn.LV_INFO, 'main', 'Going down')
-agn.unlock ()
+		agn.mark (agn.LV_INFO, 'loop', 180)
+		if pd.scanForData () == 0:
+			delay = 30
+			agn.log (agn.LV_VERBOSE, 'loop', 'No ready to send data file found')
+		else:
+			delay = 0
+			while not term and pd.hasData ():
+				if not pd.queueIsFree ():
+					agn.log (agn.LV_INFO, 'loop', 'Queue is already filled up')
+					delay = 180
+					break   
+				
+				p = {}
+				# spawn a whole bunch of workers!
+				for proc in range(0,numthread):
+					if not pd.hasData():
+					    break
+					else:
+						if debug:
+							print "%s spawning process" % (proc)
+						blk = pd.getNextBlock ()
+						p[proc] = Process(target=doPackMove, args=(pd,blk,agn))
+						p[proc].start()
+						
+				# wait for workers to finish
+				for proc in range(0,numthread):
+					if p.has_key(proc):
+						if debug:
+							print "%s joining process" % (proc)
+						p[proc].join()
+					
+					
+		while not term and delay > 0:
+	
+			if agn.iswin and agn.winstop ():
+				term = True
+				break
+			time.sleep (1)
+			delay -= 1
+	#
+	agn.log (agn.LV_INFO, 'main', 'Going down')
+	agn.unlock ()
Shuro
Posts: 20
Joined: Mon Oct 26, 2009 1:26 pm
Location: Germany, Pr. Oldendorf

Re: Improving openemm's pickdist.py performance (with patch)

Post by Shuro »

liam821 wrote:** I take no responsibility for anything below. It works for us, might not for you. :)

We have a rather large mailing list (in the millions) so i've re-wrote pickdist.py to help improve our sending performance.
Sounds good, I will try it with OpenEMM 6.1.
liam821 wrote:We generate our email queue first then run a job to sort the mail queue into subqueues by major ISP (gmail, aol, msn, yahoo etc) and one big catchall for everything else. This allows us to throttle each queue based on how fast email is being accepted.
Can you also tell us more about that? My company wants to use OpenEMM with a large number of recipients, but my Python knowledge is not so good.
liam821
Posts: 2
Joined: Wed Apr 28, 2010 8:19 am

Re: Improving openemm's pickdist.py performance (with patch)

Post by liam821 »

Shuro wrote:
liam821 wrote:We generate our email queue first then run a job to sort the mail queue into subqueues by major ISP (gmail, aol, msn, yahoo etc) and one big catchall for everything else. This allows us to throttle each queue based on how fast email is being accepted.
Can you also tell more about that? My company want to use openEMM with a large amount of recipients, but my Python knowledge is not so good.
Well, the sorting isn't done in the Python program. All pickdist.py does is call an external application which converts the XML to sendmail queue files. I do the sorting after all of that has finished.

So basically we do this...

a) disable sendmail (so it won't send anything)
b) do the mailing and generate the whole queue folder
c) sort the QUEUE folder for the major carriers and move those files into a different directory.
d) start sendmail, including starting one (or multiple) on each carrier queue.

The sort program I wrote is just in Perl; it opens each file, figures out where it's going, and moves it into the appropriate queue.
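A rough Python equivalent of that sorter, for anyone who doesn't want to touch Perl. Two assumptions worth flagging: sendmail qf control files carry recipients on lines starting with 'R' (e.g. 'RPFD:user@example.com'), and the carrier map here is illustrative:

```python
import os
import shutil

# Illustrative carrier map; extend to taste.
CARRIERS = {'gmail.com': 'gmail', 'aol.com': 'aol',
            'msn.com': 'msn', 'yahoo.com': 'yahoo'}

def recipient_domain(qf_path):
    # sendmail qf control files list recipients on 'R' lines,
    # e.g. 'RPFD:user@example.com' (format assumed here).
    with open(qf_path) as fd:
        for line in fd:
            if line.startswith('R') and '@' in line:
                return line.rstrip().rsplit('@', 1)[-1].lower()
    return None

def sort_queue(queue_dir, dest_root):
    """Move each qf/df pair into a per-carrier subqueue, else 'catchall'."""
    for name in os.listdir(queue_dir):
        if not name.startswith('qf'):
            continue
        domain = recipient_domain(os.path.join(queue_dir, name))
        target = os.path.join(dest_root, CARRIERS.get(domain, 'catchall'))
        os.makedirs(target, exist_ok=True)
        # Move both the control file (qf*) and its data file (df*) together.
        for prefix in ('qf', 'df'):
            src = os.path.join(queue_dir, prefix + name[2:])
            if os.path.exists(src):
                shutil.move(src, target)
```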

liam
gettingoutside
Posts: 2
Joined: Sat May 22, 2010 9:35 pm
Contact:

Anyone able to get this to work in V 6.x?

Post by gettingoutside »

Having some trouble getting a large list (1 million+) to send in a reasonable time. This is our first attempt, and we are learning our lessons along the way.
Best regards,
Don Neske
http://www.GettingOutside.com
Follow GettingOutside on Twitter: http://twitter.com/gettingoutside
maschoff
Site Admin
Posts: 2597
Joined: Thu Aug 03, 2006 10:20 am
Location: Munich, Germany
Contact:

Post by maschoff »

Those changes speed up the unpacking of the XML packages, but not the actual delivery (the OpenEMM send statistic shows mails handed off to the sending mailserver, not successfully delivered mails, because delivery can take up to 5 days depending on the receiving mailserver).

Actually, our guess is that sending of big mailings will slow down, because if you fill up the queue at maximum speed, the queue gets very long and processing it eats up more and more CPU time.

However, this is just our opinion and we are eager to learn your tips to speed up mail delivery of OpenEMM!
OpenEMM Maintainer
gettingoutside
Posts: 2
Joined: Sat May 22, 2010 9:35 pm
Contact:

Campaign is Stuck

Post by gettingoutside »

Well, for us only 60,000 emails have been delivered after several days. It appears stalled. Any ideas?