map实现并行

在Python中有个两个库包含了map函数:multiprocessing和它鲜为人知的子库dummy。它是multiprocessing模块的完整克隆,唯一的不同在于multiprocessing作用于进程,而dummy模块作用于线程(因此也包括了Python所有常见的多线程限制)。所以选择使用这两个库异常容易。你可以针对IO密集型任务和CPU密集型任务来选择不同的库。

导入

from multiprocessing import Pool
from multiprocessing.dummy import Pool as ThreadPool

实例化

Pool 对象有一些参数,这里我所需要关注的只是它的第一个参数:processes. 这一参数用于设定线程池中的线程数。其默认值为当前机器 CPU 的核数。

一般来说,执行 CPU 密集型任务时,调用越多的核速度就越快。但是当处理网络密集型任务时,事情有有些难以预计了,通过实验来确定线程池的大小才是明智的。

pool = ThreadPool(4) # Sets the pool size to 4

线程数过多时,切换线程所消耗的时间甚至会超过实际工作时间。对于不同的工作,通过尝试来找到线程池大小的最优值。
创建好 Pool 对象后,并行化的程序

多线程例子

import time
import urllib.request
from multiprocessing.dummy import Pool as ThreadPool

urls = [
'http://www.python.org',
'http://www.python.org/about/',
'http://www.python.org/doc/',
'http://www.python.org/download/',
'http://www.python.org/getit/',
'http://www.python.org/community/',
'https://wiki.python.org/moin/',
'http://planet.python.org/',
'https://wiki.python.org/moin/LocalUserGroups',
'http://www.python.org/psf/',
'http://docs.python.org/devguide/',
'http://www.python.org/community/awards/'
# etc..
]
pool_num = 4
start_time = time.time()
# 实例化线程池,并规定线程池大小
pool = ThreadPool(pool_num)
# 打开每个网站在线程中
results = pool.map(urllib.request.urlopen, urls)
pool.close()
pool.join() # 等待进程结束
end_time = time.time()
print("pool=%s总用时:" % pool_num, end_time - start_time, "s")

为了找到最优的线程池大小,通过修改pool_num大小得到的总时间对比,当然这种方法比较笨,需要每次修改,完全可以写进循环for i in range(min=0,max=8):,再将 i 赋值给pool_num
,当然这里就不改了,也比较简单。

pool=2总用时: 7.664671182632446 s
pool=3总用时: 5.713357210159302 s
pool=4总用时: 5.139662504196167 s
pool=5总用时: 7.917045593261719 s
pool=6总用时: 23.377182006835938 s

从数据上验证了,上面的说法,并不是线程池越大,用时最短。

多进程例子

生成图片的缩略图,这是一个 CPU 密集型的任务,并且十分适合进行并行化。

import os, time
from multiprocessing import Pool
from PIL import Image

SIZE = (75, 75)
SAVE_DIRECTORY = 'thumbs'


def get_image_paths(folder):
return (os.path.join(folder, f)
for f in os.listdir(folder)
if 'jpg' in f)


def create_thumbnail(filename):
im = Image.open(filename)
im.thumbnail(SIZE, Image.ANTIALIAS)
base,fname = os.path.split(filename)
print(fname)
save_path = os.path.join(base, SAVE_DIRECTORY, fname)
im.save(save_path)


if __name__ == '__main__':
pool_num = 4
start_time = time.time()
folder = os.path.realpath(r'C:\Users\Liu\Pictures\最近')
try: # 查看是否已存在相同文件夹
os.mkdir(os.path.join(folder, SAVE_DIRECTORY))
except FileExistsError:
pass
# 注意这里返回的不再是列表,而是迭代器
images = get_image_paths(folder)
# 列表化
images = list(images)
print(images)
try:
pool = Pool(pool_num)
pool.map(create_thumbnail, images)
pool.close()
pool.join()
except:
pass
end_time = time.time()
print("pool=%s总用时:" % pool_num, end_time - start_time, "s")

这里同上面的例子一样可以进行寻找最优的pool_num,这里就不找了,想要实验的可以直接把代码copy过去,改个路径,python3直接可以运行。

总结

在生产环境中,我们可以为 CPU 密集型任务和 IO 密集型任务分别选择多进程和多线程库来进一步提高执行速度——这也是解决死锁问题的良方。此外,由于 map 函数并不支持手动线程管理,反而使得相关的 debug 工作也变得异常简单。

声明

本篇文章来自这里(点击这里查看原文),并根据python3与python2的区别对代码做了适当修改,python2的代码请参考原文。
还有一点要注意的是python3中大多数原来返回列表的已经改为返回迭代器,查看网上资料的时候,一不小心就会被坑一下。

-------------本文结束感谢您的阅读-------------