Python多线程抓取

注意:本文最后更新于 2596 天前,有关的内容可能已经发生变化,请参考使用。

将前几天初玩Python时写的一小段代码改装成了多线程处理的结构:N个抓取线程加上一个入库进程。基本上注释里已经写得比较清楚了,感觉Python对多线程的支持确实不错,经测试下面的代码在使用100线程抓取时仅占用8MB左右内存和6%左右的CPU:

    
    #encoding=utf-8
    import threading
    import random
    import time
    import MySQLdb
    import httplib
    import sys
    from Queue import Queue
    
    reload(sys)
    sys.setdefaultencoding('utf-8')
    false="false"
    true="true"
    mysqlconn1 = MySQLdb.connect(host='localhost', user='root', passwd='justyourpasswd',db='python',charset='utf8')
    mysqlconn2 = MySQLdb.connect(host='localhost', user='root', passwd='justyourpasswd',db='python',charset='utf8')  cur1=mysqlconn1.cursor()
    cur2=mysqlconn2.cursor()
    
    
    class GetData(threading.Thread):
        def __init__(self, threadname, queue,start,end):
            global mutex
            self.sharedata = queue #队列
            self.start=start
            self.end=end
            #print "This is ", self.getName(),self.start 
            for yphone in range(self.start,self.end,1):
              #print "线程", threadname,"正在抓取",yphone,"\r"
              yphone=str(yphone)
              globals()["conn"+str(threadname)].request('GET', '/ec3/getAccountFee.do?flag=1&mobileId='+yphone+'&partnerId=M79101&provCode=791&servId=00')
              #print globals()["conn"+str(threadname)].getresponse().read()
              comeback=eval(globals()["conn"+str(threadname)].getresponse().read())
              #print comeback.has_key('minFee')
              if comeback['moPreFee']!='' and comeback.has_key('minFee')==True:#不支持号段处理问题
                #print "Num:",yphone,"Name:",comeback['customName'].replace(" ",""),"Fee:",comeback['moPreFee']
                runsql="select * from test where phone="+yphone+" and date=\'"+time.strftime('%Y-%m-%d',time.localtime(time.time()))+"\'"
                if comeback['moPreFee']!='':
                    fee=comeback['moPreFee']
                else:
                    fee='-'+comeback['minFee']  #欠费的处理           
                mutex.acquire()  #获得锁防止Mysql出问题
                if cur1.execute(runsql)!=1 :
                  runsql="insert  into test (phone,name,cash,date) values ("+yphone+",\'"+comeback['customName'].replace(" ","").decode('utf-8')+"\' ,"+fee+" , \'"+ time.strftime('%Y-%m-%d',time.localtime(time.time()))+"\')" 
                  self.sharedata.put(runsql)
                  #print yphone,"已获取"
                #else:
                  #print yphone,"被丢弃"
                mutex.release()  #释放锁
    
     
      
    class AddData(threading.Thread):
        def __init__(self, threadname, queue):
            global shutdown
            threading.Thread.__init__(self, name = threadname)
            self.sharedata = queue  #获取队列
    
        def run(self):
            while "True":
              if self.sharedata.qsize()>0: #判断队列长度
                runsql=self.sharedata.get()
                state=cur2.execute(runsql)
                if state==1:
                  print '队列中有',self.sharedata.qsize(),'条数据'
                  state=0
                  time.sleep(0.1)
              else:
                  print "等待数据.."
                  time.sleep(3.0)    #休息一下
              if shutdown==1 and self.sharedata.qsize()==0:
                break
            
    
    
    def main(worker):
        global mutex,shutdown
        shutdown=0
        threads = []    #线程池
        queue = Queue() #创建一个队列
        mutex = threading.Lock()  #创建一个锁
        
        for x in xrange(worker):
          threads.append(threading.Thread(target=GetData, args=(str(x),queue,15870090000+x*1000,15870090000+(x+1)*1000)))  #创建需要个数的抓取线程
    
        AddDataer = AddData('AddData', queue)  #创建接收入库线程
        print '启动线程 ...'
        for t in threads:
            t.start()                 #启动抓取线程
        AddDataer.setDaemon("True")   #将接受入库线程设置为Daemon,让其随主进程结束而消亡
        AddDataer.start()             #启动入库进程
        for t in threads:
            t.join()                  #等待所有的抓取线程结束
        print '抓取线程已经结束'
        shutdown=1
        AddDataer.join(120)           #120秒后写入进程超时退出
        print '所有线程已经结束'
        
        
    if __name__ == '__main__':
        worker=input('设置抓取线程数 ')  
        if worker < 0 :
          worker=1
        for c in xrange(worker):
          globals()["conn"+str(c)]= httplib.HTTPConnection("payment.umpay.com")  #关于globals()和locals()  创建需要个数的httplib实例
        main(worker)

记录一下用到的知识点:

关于xrange()和range()的区别,简单的来说range会返回一个list对象而xrange生成的相应的单个数字,所以用xrange做循环的性能比range好,尤其是循环量很大的时候。

关于setDaemon(),线程对象的setDaemon()可用参数参数为bool型。当设置为True时就表示当主线程退出时,子线程不能拖后腿也要一同退出;设置为False的时候,主线程允许子线程“将在外军命有所不受”不要一同退出。需要注意的是setDaemon()方法必须在线程对象没有调用start()方法之前调用,否则没效果。

关于globals()和locals(),locals 和globals是Python的两个内置函数,它们提供了基于字典的访问局部和全局变量的方式,可以随意读取甚至修改变量是其强大之处,其中locals 是只读的而globals 是可读可写的,两者在查找变量时都按照局部、全局、内置的顺序。


「倘若有所帮助,不妨酌情赞赏!」

Holmesian

感谢您的支持!

使用微信扫描二维码完成支付


相关文章

发表新评论