Python多线程抓取
将前几天初玩Python时写的一小段代码改装成了多线程处理的结构:N个抓取线程加上一个入库进程。基本上注释里已经写得比较清楚了,感觉Python对多线程的支持确实不错,经测试下面的代码在使用100线程抓取时仅占用8MB左右内存和6%左右的CPU:
#encoding=utf-8
import threading
import random
import time
import MySQLdb
import httplib
import sys
from Queue import Queue
reload(sys)
sys.setdefaultencoding('utf-8')
false="false"
true="true"
mysqlconn1 = MySQLdb.connect(host='localhost', user='root', passwd='justyourpasswd',db='python',charset='utf8')
mysqlconn2 = MySQLdb.connect(host='localhost', user='root', passwd='justyourpasswd',db='python',charset='utf8') cur1=mysqlconn1.cursor()
cur2=mysqlconn2.cursor()
class GetData(threading.Thread):
def __init__(self, threadname, queue,start,end):
global mutex
self.sharedata = queue #队列
self.start=start
self.end=end
#print "This is ", self.getName(),self.start
for yphone in range(self.start,self.end,1):
#print "线程", threadname,"正在抓取",yphone,"\r"
yphone=str(yphone)
globals()["conn"+str(threadname)].request('GET', '/ec3/getAccountFee.do?flag=1&mobileId='+yphone+'&partnerId=M79101&provCode=791&servId=00')
#print globals()["conn"+str(threadname)].getresponse().read()
comeback=eval(globals()["conn"+str(threadname)].getresponse().read())
#print comeback.has_key('minFee')
if comeback['moPreFee']!='' and comeback.has_key('minFee')==True:#不支持号段处理问题
#print "Num:",yphone,"Name:",comeback['customName'].replace(" ",""),"Fee:",comeback['moPreFee']
runsql="select * from test where phone="+yphone+" and date=\'"+time.strftime('%Y-%m-%d',time.localtime(time.time()))+"\'"
if comeback['moPreFee']!='':
fee=comeback['moPreFee']
else:
fee='-'+comeback['minFee'] #欠费的处理
mutex.acquire() #获得锁防止Mysql出问题
if cur1.execute(runsql)!=1 :
runsql="insert into test (phone,name,cash,date) values ("+yphone+",\'"+comeback['customName'].replace(" ","").decode('utf-8')+"\' ,"+fee+" , \'"+ time.strftime('%Y-%m-%d',time.localtime(time.time()))+"\')"
self.sharedata.put(runsql)
#print yphone,"已获取"
#else:
#print yphone,"被丢弃"
mutex.release() #释放锁
class AddData(threading.Thread):
def __init__(self, threadname, queue):
global shutdown
threading.Thread.__init__(self, name = threadname)
self.sharedata = queue #获取队列
def run(self):
while "True":
if self.sharedata.qsize()>0: #判断队列长度
runsql=self.sharedata.get()
state=cur2.execute(runsql)
if state==1:
print '队列中有',self.sharedata.qsize(),'条数据'
state=0
time.sleep(0.1)
else:
print "等待数据.."
time.sleep(3.0) #休息一下
if shutdown==1 and self.sharedata.qsize()==0:
break
def main(worker):
global mutex,shutdown
shutdown=0
threads = [] #线程池
queue = Queue() #创建一个队列
mutex = threading.Lock() #创建一个锁
for x in xrange(worker):
threads.append(threading.Thread(target=GetData, args=(str(x),queue,15870090000+x*1000,15870090000+(x+1)*1000))) #创建需要个数的抓取线程
AddDataer = AddData('AddData', queue) #创建接收入库线程
print '启动线程 ...'
for t in threads:
t.start() #启动抓取线程
AddDataer.setDaemon("True") #将接受入库线程设置为Daemon,让其随主进程结束而消亡
AddDataer.start() #启动入库进程
for t in threads:
t.join() #等待所有的抓取线程结束
print '抓取线程已经结束'
shutdown=1
AddDataer.join(120) #120秒后写入进程超时退出
print '所有线程已经结束'
if __name__ == '__main__':
worker=input('设置抓取线程数 ')
if worker < 0 :
worker=1
for c in xrange(worker):
globals()["conn"+str(c)]= httplib.HTTPConnection("payment.umpay.com") #关于globals()和locals() 创建需要个数的httplib实例
main(worker)
记录一下用到的知识点:
关于xrange()和range()的区别,简单的来说range会返回一个list对象而xrange生成的相应的单个数字,所以用xrange做循环的性能比range好,尤其是循环量很大的时候。
关于setDaemon(),线程对象的setDaemon()可用参数参数为bool型。当设置为True时就表示当主线程退出时,子线程不能拖后腿也要一同退出;设置为False的时候,主线程允许子线程“将在外军命有所不受”不要一同退出。需要注意的是setDaemon()方法必须在线程对象没有调用start()方法之前调用,否则没效果。
关于globals()和locals(),locals 和globals是Python的两个内置函数,它们提供了基于字典的访问局部和全局变量的方式,可以随意读取甚至修改变量是其强大之处,其中locals 是只读的而globals 是可读可写的,两者在查找变量时都按照局部、全局、内置的顺序。