首页 > 开发 > Python > 正文

Python爬虫开发(三-续):快速线程池爬虫

2016-04-07 09:14:50  来源:极客头条

            0×00 简介
  0×01 功能定义
  0×02 总体流程
  0×03 线程池任务迭代
  0×04 具体实现
  0×05 测试使用
  0×06 结语
0×00 简介   本文算是填前面的一个坑,有朋友和我将我前面写了这么多,真正没看到什么特别突出的实战,给了应对各种情况的方案。多线程那里讲的也是坑。忽然想想,说的也对,为读者考虑我确实应该把多线程这里的坑补完。
  然后决定再以一篇文章的形式讲一下这个轻型线程池爬虫,同时也为大家提供一个思路。代码都是经过调试的,并且留了相对友好的用户接口。可以很容易得添加各种各样增强型的功能。
0×01 功能定义   1.  可选择的单页面爬虫与多页面线程池爬虫
  2.  可定制对HTML的处理
  3.  可定制获取HTML的方式(应对动态页面)
  4.  当设置为非单页面爬虫时,自动启动对当前域名下所有的页面进行深度优先爬取
  5.  自定义线程数
0×02 总体流程   1.png
0×03 线程池任务迭代实现   虽然在上面图中写出了线程池的影子,但是我们还是需要单独拿出来写一下线程池的到底是怎么样工作的,以方便读者更好地理解源代码的内容。
  2.png
0×04 具体实现   到这里相信读者知道用线程池来完成我们需要完成的爬虫了吧。关于具体内容的实现,是接下来我们要讲的。
  1.    依赖:
  我们需要用到这五个模块,我相信大家都很熟悉,那么就不多介绍了,如果有朋友不熟悉的话可以翻到前面的文章重新复习一下。
  threading
  Queue
  urlparse
  requests
  bs4
  2.    类的声明:
  ScraperWorkerBase这个类是完全可以复写的,只要和原有的接口保持一致,可以满足用户的各种各样的需求,例如,定义页面扫描函数需要复写parse方法(当然这些我是在后面会有实例给大家展示)
  那么我们还需要介绍一下其他的接口:
  Execute方法中控制主逻辑,用最简洁的语言和代码表现逻辑,如果有需要自定义自己的逻辑控制方法,那么务必保持第一个返回值仍然是inpage_url
  __get_html_data控制获取html数据的方法,你可以自己定制headers,cookies,post_data
  __get_soup这个方法是以bs4模块来解析html文档
  __get_all_url与__get_url_inpage不建议大家修改,如果修改了的话可能会影响主爬虫控制器的运行
  然后我在这里做一张ScraperWorkerBase的流程图大家可以参考一下
  图片1.png

class ScraperWorkerBase(object):
    """
    No needs to learn how is work,
    rewrite parse_page using self.soup(Beautiful), and return result,
    you can get the result by using
    
        (inpage_urls, your_own_result) urlscraper.execute()
    
    But this class is default for scraper to use,
    To enhance its function , you can completement this class 
    like:
    
    class MyWorker(ScraperWorkerBase):
    
        def parse_page(self):
            all_tags = self.soup.find_all('img')
            for i in all_tags:
                print i
    
    """
    def __init__(self, url = ''):
        
        self.target_url = url
        self.netloc = urlparse.urlparse(self.target_url)[1]
        
        
        self.response = None
        self.soup = None
        
        self.url_in_site = []
        self.url_out_site = []
        

    """override this method to get html data via any way you want or need"""    
    def __get_html_data(self):
        try:
            self.response = requests.get(self.target_url, timeout = 5)
        except:
            return ""
        
        print "[_] Got response"
        return self.response.text

    def __get_soup(self):
        text = self.__get_html_data()
        if text == '':
            return []
        
        return bs4.BeautifulSoup(text)

    def __get_all_url(self):
        url_lists = []
        
        self.soup = self.__get_soup()
        if isinstance(self.soup, type(None)):
            return []
        
        all_tags = self.soup.findAll("a")
        for a in all_tags:
            try:
                #print a['href']
                url_lists.append(a["href"])
            except:
                pass
            
        return url_lists

    
    def get_urls_inpage(self):
        ret_list = self.__get_all_url()
        
        if ret_list == []:
            return ([],[])
        else:
            for url in ret_list:
                o = urlparse.urlparse(url)
                #
                #print url
                if self.netloc in o[1]:
                    self.url_in_site.append(o.geturl())
                else:
                    self.url_out_site.append(o.geturl())
                    
        inurlset = set(self.url_in_site)
            
        outurlset = set(self.url_out_site)
            
        return inurlset, outurlset


    def execute(self):
        inpage_url = self.get_urls_inpage()
        undefined_result = self.parse_page()

        return inpage_url, undefined_result
    
    

    """You can override this method to define your own needs"""
    def parse_page(self):
        pass

                 这个类定义了处理HTML页面的基本方法,如果需要仅仅是获取页面所有的超链接的话,那么最基础的Worker类已经替大家实现了,但是如果需要对某类网站特定元素进行处理,那么完全可以只复写parse_page
  例如:
  3.png
  如果要绕开网站的限制进行爬取数据,就需要复写:
  4.png
  但是如果需要对特定url进行限制,最好不要去复写__get_all_url方法,而应该去复写get_urls_inpage方法
  关于Scraper的类说明:
  这个类显然没有前面的那么好理解,但是如果使用过HTMLParser或者是SGMLParser的读者,肯定是记得那个feed方法的。这与我们要介绍的这个类有一些相似的地方。
  在这个类中,我们建立Scraper对象的时候,需要传入的参数直接决定了我们的线程池爬虫的类型:究竟要不要启动多线程,启动多少个线程,使用哪个处理函数来除了web页面?这些都是我们要考虑的问题。所以接下来我们对这些部分进行一些说明
  这些设定很好理解,在__init__中输入是否是单页面爬虫模式,设定线程数,设定爬虫解析的具体类。然后对应初始化线程池:初始化的时候要生成多个_worker方法,循环工作,然后在_worker方法的工作时完成对传入的实际进行解析的ScraperWorkerBase类进行调用,然后收集结果填入任务队列。
  通过feed的方法来添加目标url,可以输入list,也可以直接输入str对象。
  当不想让Scraper再工作的时候,调用kill_workers就可以停止所有的worker线程。
  但是仅仅是明白这个只是可能仅仅会使用而已,既然是开发我们肯定是要清楚地讲这个Scraper是怎么样被组织起来的,他是怎么样工作的。
  首先第一个概念就是任务队列:我们feed进的数据实际就是把任务添加到任务队列中,然后任务分配的时候,每个爬虫都要get到属于自己的任务,然后各司其职的去做,互不干扰。
  第二个类似的概念就是结果队列:结果队列毫无疑问就是用于存储结果的,在外部获取这个Scraper的结果队列以后,需要去获取结果队列中的元素,由于队列的性质,当结果被抽走的时候,被获取的结果就会被删除。
  在大家明确了这两个概念以后,这个Scraper的工作原理接回很容易被理解了:
  图片2.png
  当然这个图我在做的时候是有点小偷懒的,本来应该做两种类型的Scraper,因为实际在使用的过程中,Scraper在一开始要被指定为单页还是多页,但是为了避免大量的重复所以在作图的时候我就在最后做了一个逻辑判断来表明类型,来帮助大家理解这个解析过程。我相信一个visio流程图比长篇大论的文字解释要直观的多对吧?
  那么接下来我们看一下爬虫的实体怎么写:
  

class Scraper(object):
      def __init__(self, single_page = True,  workers_num = 8, worker_class = ScraperWorkerBase):
          self.count = 0
          self.workers_num = workers_num
          """get worker_class"""
          self.worker_class = worker_class
          """check if the workers should die"""
          self.all_dead = False
          """store the visited pages"""
          self.visited = set()
          """by ScraperWorkerBase 's extension result queue"""
          self.result_urls_queue = Queue.Queue()
          self.result_elements_queue = Queue.Queue()
          """
          if single_page == True, 
          the task_queue should store the tasks (unhandled)
          """
          self.task_queue = Queue.Queue()
          self.single_page = single_page
          if self.single_page == False:
              self.__init_workers()
          else:
              self.__init_single_worker()
      def __check_single_page(self):
          if self.single_page == True:
              raise StandardError('[!] Single page won\'t allow you use many workers')
      """init worker(s)"""
      def __init_single_worker(self):
          ret = threading.Thread(target=self._single_worker)
          ret.start()
      def __init_workers(self):
          self.__check_single_page()
          for _ in range(self.workers_num):
              ret = threading.Thread(target=self._worker)
              ret.start()
      """return results"""
      def get_result_urls_queue(self):
          return self.result_urls_queue
      def get_result_elements_queue(self):
          return self.result_elements_queue
      """woker function"""
      def _single_worker(self):
          if self.all_dead != False:
              self.all_dead = False
          scraper = None
          while not self.all_dead:
              try:
                  url = self.task_queue.get(block=True)
                  print 'Workding', url
                  try:
                      if url[:url.index('#')] in self.visited:
                          continue
                  except:
                      pass
                  if url in self.visited:
                      continue
                  else:
                      pass
                  self.count = self.count+ 1
                  print 'Having process', self.count , 'Pages'
                  scraper = self.worker_class(url)
                  self.visited.add(url)
                  urlset, result_entity = scraper.execute()
                  for i in urlset[0]:
                      #self.task_queue.put(i)
                      self.result_urls_queue.put(i)
                  if result_entity != None:
                      pass
                  else:
                      self.result_elements_queue.put(result_entity)
              except:
                  pass            
              finally:
                  pass        
      def _worker(self):
          if self.all_dead != False:
              self.all_dead = False
          scraper = None
          while not self.all_dead:
              try:
                  url = self.task_queue.get(block=True)
                  print 'Workding', url
                  try:
                      if url[:url.index('#')] in self.visited:
                          continue
                  except:
                      pass
                  if url in self.visited:
                      continue
                  else:
                      pass
                  self.count = self.count + 1
                  print 'Having process', self.count , 'Pages'
                  scraper = self.worker_class(url)
                  self.visited.add(url)
                  urlset, result_entity = scraper.execute()
                  for i in urlset[0]:
                      if i in self.visited:
                          continue
                      else:
                          pass
                      self.task_queue.put(i)
                      self.result_urls_queue.put(i)
                  if result_entity != None:
                      pass
                  else:
                      self.result_elements_queue.put(result_entity)
              except:
                  pass            
              finally:
                  pass
      """scraper interface"""
      def kill_workers(self):
          if self.all_dead == False:
              self.all_dead = True
          else:
              pass
      def feed(self, target_urls = []):
          if isinstance(target_urls, list):
              for target_url in target_urls:
                  self.task_queue.put(target_url)
          elif isinstance(target_urls, str):
              self.task_queue.put(target_urls)
          else:
              pass
          #return url result
          return (self.get_result_urls_queue(), self.get_result_elements_queue() )


  这些设定很好理解,在__init__中输入是否是单页面爬虫模式,设定线程数,设定爬虫解析的具体类。然后对应初始化线程池:初始化的时候要生成多个_worker方法,循环工作,然后在_worker方法的工作时完成对传入的实际进行解析的ScraperWorkerBase类进行调用,然后收集结果填入任务队列。
  通过feed的方法来添加目标url,可以输入list,也可以直接输入str对象。
  当不想让Scraper再工作的时候,调用kill_workers就可以停止所有的worker线程。
0×04 使用实例   下面是几个相对完整的使用实例:
  单页面爬虫使用实例
  

#encoding:utf-8
  from scraper import *
  import Queue
  import time
  import sys
  import bs4
  test_obj = Scraper(single_page=True, workers_num=15)
  test_obj.feed(['http://freebuf.com'])
  time.sleep(5)
  z = test_obj.get_result_urls_queue()
  while True:
      try :
          print z.get(timeout=4)
      except:
          pass
  线程池爬虫实例:
  寻找一个网站下所有的url
  #encoding:utf-8
  from scraper import *
  import Queue
  import time
  import sys
  import bs4
  test_obj = Scraper(single_page=False, workers_num=15)
  test_obj.feed(['http://freebuf.com'])
  time.sleep(5)
  z = test_obj.get_result_urls_queue()
  while True:
      try :
          print z.get(timeout=4)
      except:
          pass


  我们发现和上面的单页面爬虫只是一个参数的区别。实际的效果还是不错的。
  下面是自定义爬取方案的应用
  11.png
  这样的示例代码基本把这个爬虫的目的和接口完整的展示出来了,用户可以在MyWorker中定义自己的处理函数。
0×05 测试使用   在实际的使用中,这个小型爬虫的效果还是相当不错的,灵活,简单,可扩展性高。有兴趣的朋友可以给它配置更多的功能型组件,比如数据库,爬取特定关键元素,针对某一个页面的数据处理。比如在实际的使用中,这个模块作为我自己正在编写的一个xss_fuzz工具的一个部分而存在。
  下面给出一些测试数据供大家参考(在普通网络状况):
  8.png
  这个结果是在本机上测试的结果,在不同的电脑商测试结果均不同,8线程是比较小的线程数目,有兴趣的朋友可以采用16线程或者是更多的线程测试,效果可能更加明显,如果为了防止页面卡死,可以在worker中设置超时时间,一旦有那个页面一时间很难打开也能很快转换到新的页面,同样也能提高效率。
0×06 结语   关于爬虫的开发,我相信到现在,大家都已经没有什么问题了,如果要问网站爬行时候什么的页面权重怎么处理,简单无非是在爬虫过程中计算某个页面被多少页面所指(当然这个算法没有这么简单),并不是什么很高深的技术,如果有兴趣的小伙伴仍然可以去深入学习,大家都知道搜索引擎的核心也是爬虫技术。
  项目Github主页:https://github.com/VillanCh/simple_scraper
  * 作者:VillanCh,本文属FreeBuf原创奖励计划文章,未经许可禁止转载
       

上一篇:第一页
下一篇:Python 异常处理