多进程+协程 (multiprocessing + gevent)

前面讲了为什么Python里推荐用多进程而不是多线程,但是多进程也有其自己的限制:相比线程更加笨重、切换耗时更长,并且在python的多进程下,进程数量不推荐超过CPU核心数(一个进程只有一个GIL,所以一个进程只能跑满一个CPU),因为一个进程占用一个CPU时能充分利用机器的性能,但是进程多了就会出现频繁的进程切换,反而得不偿失。

不过特殊情况(特指IO密集型任务)下,多线程是比多进程好用的。

举个例子:给你200W条url,需要你把每个url对应的页面抓取保存起来,这种时候,单单使用多进程,效果肯定是很差的。为什么呢?

例如每次请求的等待时间是2秒,那么如下(忽略cpu计算时间):

1、单进程+单线程:需要2秒*200W=400W秒==1111.11个小时==46.3天,这个速度明显是不能接受的

2、单进程+多线程:例如我们在这个进程中开了10个线程,比1中能够提升10倍速度,也就是约4.63天能够完成200W条抓取。
请注意,这里的实际执行是:线程1遇见了阻塞,CPU切换到线程2去执行,遇见阻塞又切换到线程3等等,
10个线程都阻塞后,这个进程就阻塞了,而直到某个线程阻塞完成后,这个进程才能继续执行,
所以速度上提升大约能到10倍(这里忽略了线程切换带来的开销,实际上的提升应该是不能达到10倍的),
但是需要考虑的是线程的切换也是有开销的,所以不能无限的启动多线程(开200W个线程肯定是不靠谱的)

3、多进程+多线程:这里就厉害了,一般来说很多人就是使用这个方法,多进程下,每个进程都能占一个cpu,
而多线程从一定程度上绕过了阻塞的等待,所以比单进程下的多线程又更好使了,例如我们开10个进程,
每个进程里开20W个线程,执行的速度理论上是比单进程开200W个线程快10倍以上的(为什么是10倍以上而不是10倍,
主要是cpu切换200W个线程的消耗肯定比切换20W个进程大,考虑到这部分开销,所以是10倍以上)。

还有更好的方法吗?答案是肯定的,它就是:

4、协程,使用它之前我们先讲讲what/why/how(它是什么/为什么用它/怎么使用它)

what:

协程可以视作一种用户级的轻量级线程。协程拥有自己的寄存器上下文和栈。
协程调度切换时,将寄存器上下文和栈保存到其他地方,
在切回来的时候,恢复先前保存的寄存器上下文和栈。因此,协程能保留上一次调用时的状态。

每次过程重入时,就相当于进入上一次调用的状态,换种说法:进入上一次离开时所处逻辑流的位置。
在并发编程中,协程与线程类似,每个协程表示一个执行单元,有自己的本地数据,与其它协程共享全局数据和其它资源。

why:

目前主流语言基本上都选择了多线程作为并发设施,与线程相关的概念是抢占式多任务(Preemptive multitasking),
而与协程相关的是协作式多任务。

不管是进程还是线程,每次阻塞、切换都需要陷入系统调用(system call),先让CPU跑操作系统的调度程序,
然后再由调度程序决定该跑哪一个进程(线程)。
而且由于抢占式调度执行顺序无法确定的特点,使用线程时需要非常小心地处理同步问题,
而协程完全不存在这个问题(事件驱动和异步程序也有同样的优点)。
因为协程是用户自己来编写调度逻辑的,对CPU来说,协程其实是单线程,所以CPU不用去考虑怎么调度、切换上下文,
这就省去了CPU的切换开销,所以协程在一定程度上又好于多线程。

how:

python里面怎么使用协程?答案是使用gevent,使用方法可自行搜索:python gevent库
使用协程,可以不受线程开销的限制,我尝试过一次把20W条url放在单进程的协程里执行,完全没问题。
所以最推荐的方法,是多进程+协程(可以看作是每个进程里都是单线程,而这个单线程是协程化的)
多进程+协程下,避开了CPU切换的开销,又能把多个CPU充分利用起来,
这种方式对于数据量较大的爬虫还有文件读写之类的效率提升是巨大的。


所以,如何通过多进程+协程 做到快速抓取200W条url呢?

示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
#-*- coding=utf-8 -*-
import requests
from multiprocessing import Process
import gevent
from gevent import monkey; monkey.patch_all()
import sys
reload(sys)
sys.setdefaultencoding('utf8')
def fetch(url):
try:
s = requests.Session()
r = s.get(url,timeout=1)#在这里抓取页面
#在这里做解析之类的其他操作
except Exception,e:
print e
return ''
def process_start(url_list):
tasks = []
for url in url_list:
tasks.append(gevent.spawn(fetch,url))
gevent.joinall(tasks)#使用协程来执行
def task_start(filepath,count = 100000):#每10W条url启动一个进程
with open(filepath,'r') as reader:#从给定的文件中读取url
url_list = []#这个list用于存放协程任务
url = reader.readline().strip()
while url!='':
url_list.append(url)#每次读取出url,将url添加到队列
if len(url_list) == count:#一定数量的url就启动一个进程并执行
p = Process(target=process_start,args=(url_list,))
p.start()
url_list = [] #重置url队列
url = reader.readline().strip()
if url_list not []:#若退出循环后任务队列里还有url剩余
p = Process(target=process_start,args=(url_list,))#把剩余的url都放到最后这个进程来执行
p.start()
if __name__ == '__main__':
task_start('./testData.txt')#读取指定文件,这个文件里是多个要抓取的目标url,每行一个