多线程

虽然Python多线程有缺陷,总被人说成是鸡肋,但也不是一无用处,它很适合用在IO密集型任务中。I/O密集型执行期间大部分是时间都用在I/O上,如数据库I/O,较少时间用在CPU计算上。因此该应用场景可以使用Python多线程,当一个任务阻塞在IO操作上时,我们可以立即切换执行其他线程上执行其他IO操作请求。

导入

import threading

Python中使用线程有两种方式:函数或者用类来包装线程对象。

基于函数线程

调用 threading 模块中的Thread()函数来产生新线程。语法如下:

threading.Thread( function, args[, kwargs] )

参数说明:

参数 描述
function 线程函数。
args 传递给线程函数的参数,他必须是个tuple类型。
kwargs 可选参数。

下面来看一个实例:

import threading, time


def test(s):
print("线程%d开始" % s)
start_time = time.time()
time.sleep(3)
print("线程%d结束" % s)
end_time = time.time()
print("线程%d花费的时间为:" % s, end_time - start_time)


if __name__ == '__main__':
threads_list = [] # 定义一个线程空列表
start = time.time() # 总的开始时间

for i in range(0, 4): # 生成四个线程实例,并将他们存到列表里
t = threading.Thread(target=test, args=(i,))
threads_list.append(t)

for t in threads_list: # 循环遍历列表,启动每一个线程
t.start()

for t in threads_list: # 循环遍历列表,等待每一个线程结束
t.join()
end = time.time() # 总的结束时间
print("总共花费的时间:", end - start)

运行结果:

线程0开始
线程1开始
线程2开始
线程3开始
线程1结束
线程1花费的时间为: 3.000976085662842
线程0结束
线程2结束
线程0花费的时间为: 3.000976085662842
线程2花费的时间为: 3.000976085662842
线程3结束
线程3花费的时间为: 3.0009756088256836
总共花费的时间: 3.0027308464050293

四个线程,每个线程单独完成的时间基本上一样,但是经过多线程的处理,总共的运行时间也才3秒多一点,可见多线程IO阻塞情况下还是可以大大提高我们的运行效率的。

基于类的线程

将上面的例子,改为基于类实现,方便对比:

class Test(threading.Thread):
def __init__(self, name=0):
# 继承父类的两种方式
# super().__init__()
threading.Thread.__init__(self)
self.name = name

def run(self):
print("线程%s开始" % self.name)
start_time = time.time()
time.sleep(3)
print("线程%s结束" % self.name)
end_time = time.time()
print("线程%s花费的时间为:" % self.name, end_time - start_time)


if __name__ == '__main__':
threads_list = [] # 定义一个线程空列表
start = time.time() # 总的开始时间
for i in range(0, 4): # 生成四个线程实例,并将他们存到列表里
t = Test(name=i)
threads_list.append(t)

for t in threads_list: # 循环遍历列表,启动每一个线程
t.start()

for t in threads_list: # 循环遍历列表,等待每一个线程结束
t.join()
end = time.time() # 总的结束时间
print("总共花费的时间:", end - start)

两者实现的效果相同,条条大路通罗马,只是走的路不同而已。

线程模块中其他方法

方法名 描述
run() 用以表示线程活动的方法。
start() 启动线程活动。
join([time]) 等待至线程中止。这阻塞调用线程直至线程的join() 方法被调用中止-正常退出或者抛出未处理的异常-或者是可选的超时发生。
isAlive() 返回线程是否活动的。
getName() 返回线程名。
setName() 设置线程名。

线程锁

如果多个线程共同对某个数据修改,则可能出现不可预料的结果,为了保证数据的正确性,需要保证每次只有一个线程对其修改,这就需要用到我们的线程锁

Thread 对象的 Lock 和 Rlock 可以实现这样的功能,这两个对象都有 acquire 方法和 release 方法,对于那些需要每次只允许一个线程操作的数据,可以将其操作放到 acquire 和 release 方法之间。

看个例子,使用十个线程对同一个全局变量进行修改:

import threading, time

global_num = 0
lock = threading.RLock() # 获得锁

def show():
lock.acquire() # 上锁
global global_num
time.sleep(1)
global_num += 1
print(global_num)
lock.release() # 解锁

if __name__ == '__main__':
for i in range(10): # 启动十个线程
t = threading.Thread(target=show)
t.start()

上面说了Lock和RLock方法都可以获得锁,那么两者的区别是什么呢?

Lock和RLock的区别

RLock允许在同一线程中被多次acquire。而Lock却不允许这种情况。注意:如果使用RLock,那么acquire和release必须成对出现,即调用了n次acquire,必须调用n次的release才能真正释放所占用的琐。

Condition方法

在threading中除了Lock和RLock,还可以使用Condition(条件)方法达到想要的效果。可以把Condiftion理解为一把高级的琐,它提供了比Lock, RLock更高级的功能,允许我们能够控制复杂的线程同步问题。threadiong.Condition在内部维护一个琐对象(默认是RLock),可以在创建Condigtion对象的时候把琐对象作为参数传入。
Condition也提供了acquire, release方法,其含义与琐的acquire, release方法一致,其实它只是简单的调用内部琐对象的对应的方法而已。

Condition还提供了如下方法(注意:这些方法只有在占用琐(acquire)之后才能调用,否则将会报RuntimeError异常。):

Condition.wait([timeout]):

  wait方法释放内部所占用的琐,同时线程被挂起,直至接收到通知被唤醒或超时(如果提供了timeout参数的话)。当线程被唤醒并重新占有琐的时候,程序才会继续执行下去。

Condition.notify():

  唤醒一个挂起的线程(如果存在挂起的线程)。注意:notify()方法不会释放所占用的琐。

Condition.notifyAll():

  唤醒所有挂起的线程(如果存在挂起的线程)。注意:这些方法不会释放所占用的琐。

处于waiting状态的线程只能通过notify方法唤醒,所以notifyAll的作用在于防止有线程永远处于沉默状态。

附上经典的消费者与生产者模型:2个生成者生产products ,而接下来的10个消费者将会消耗products。

import threading, time

condition = threading.Condition()
products = 0


class Producer(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)

def run(self):
global condition, products
while True:
if condition.acquire():
if products < 10:
products += 1
print("Producer(%s):deliver one, now products:%s" % (self.name, products))
condition.notify()
else:
print("Producer(%s):already 10, stop deliver, now products:%s" % (self.name, products))
condition.wait();
condition.release()
time.sleep(2)


class Consumer(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)

def run(self):
global condition, products
while True:
if condition.acquire():
if products > 1:
products -= 1
print( "Consumer(%s):consume one, now products:%s" % (self.name, products))
condition.notify()
else:
print("Consumer(%s):only 1, stop consume, products:%s" % (self.name, products))
condition.wait()
condition.release()
time.sleep(2)


if __name__ == "__main__":
for p in range(0, 2):
p = Producer()
p.start()

for c in range(0, 10):
c = Consumer()
c.start()

Thread.Local方法

当不想将变量共享给其他线程时,可以使用局部变量,但在函数中定义局部变量会使得在函数之间传递特别麻烦。ThreadLocal解决了全局变量需要枷锁,局部变量传递麻烦的两个问题。通过在线程中定义:

local_school = threading.local()

此时这个local_school就变成了一个全局变量,但这个全局变量只在该线程中为全局变量,对于其他线程来说是局部变量,别的线程不可更改。

def process_thread(name):# 绑定ThreadLocal的student: local_school.student = name

这个student属性只有本线程可以修改,别的线程不可以。

local = threading.local()
def func(name):
print('current thread:%s' % threading.currentThread().name)
local.name = name
print("%s in %s" % (local.name,threading.currentThread().name))
t1 = threading.Thread(target=func,args=('haibo',))
t2 = threading.Thread(target=func,args=('lina',))
t1.start()
t2.start()
-------------本文结束感谢您的阅读-------------