1. 多线程模块#
python 中主要用 threading 模块提供对线程的支持
threading 模块#
threading 模块常用函数
- threading.current_thread (): 返回当前的线程对象。
- threading.enumerate (): 返回一个包含正在运行的线程的 list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
- threading.active_count (): 返回正在运行的线程数量,与 len (threading.enumerate ()) 有相同的结果。
Thread 类#
通过 threading.Thread () 创建线程对象
主要参数:
threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
- group 默认为 None,为了日后扩展 ThreadGroup 类实现而保留。
- target 是用于 run () 方法调用的可调用对象。默认是 None,表示不需要调用任何方法。
- name 是线程名称。默认情况下,由 "Thread-N" 格式构成一个唯一的名称,其中 N 是小的十进制数
- args 是用于调用目标函数的参数元组。默认是 ()
- kwargs 是用于调用目标函数的关键字参数字典。默认是 {}。
- daemon表示线程是不是守护线程。
Thread 类常用方法与属性
- run(): 用以表示线程活动的方法。
- start(): 启动线程活动。
- join(*timeout=None*): 等待至线程中止。这阻塞调用线程直至线程的 join () 方法被调用中止 - 正常退出或者抛出未处理的异常 - 或者是可选的超时发生。
- isAlive(): 返回线程是否活动的。
- getName(): 返回线程名。
- setName(): 设置线程名。
- name:线程对象名字
- setDaemon():设置是否为守护线程
2. GIL 锁#
GIL, 全局解释器锁(Global Interpreter Lock),是 CPython 解释器特有一个玩意,让一个进程中在同一个时刻只能有一个线程可以被 CPU 调用。
如果程序想利用 计算机的多核优势,让 CPU 同时处理一些任务,适合用多进程开发(即使资源开销大)。
如果程序不利用 计算机的多核优势,适合用多线程开发。
常见的程序开发中,计算操作需要使用 CPU 多核优势,IO 操作不需要利用 CPU 的多核优势,所以,就有这一句话:
- 计算密集型,用多进程,例如:大量的数据计算【累加计算示例】。
- IO 密集型,用多线程,例如:文件读写、网络数据传输【下载抖音视频示例】。
串行处理:
import time
start = time.time()
result = 0
for i in range(100000000):
result += i
print(result)
end = time.time()
print("耗时:", end - start)
多线程处理:
import time
import multiprocessing
def task(start, end, queue):
result = 0
for i in range(start, end):
result += i
queue.put(result)
if __name__ == '__main__':
queue = multiprocessing.Queue()
start_time = time.time()
p1 = multiprocessing.Process(target=task, args=(0, 50000000, queue))
p1.start()
p2 = multiprocessing.Process(target=task, args=(50000000, 100000000, queue))
p2.start()
v1 = queue.get(block=True) #阻塞
v2 = queue.get(block=True) #阻塞
print(v1 + v2)
end_time = time.time()
print("耗时:", end_time - start_time)
多线程与多进程结合使用:
import multiprocessing
import threading
def thread_task():
pass
def task(start, end):
t1 = threading.Thread(target=thread_task)
t1.start()
t2 = threading.Thread(target=thread_task)
t2.start()
t3 = threading.Thread(target=thread_task)
t3.start()
if __name__ == '__main__':
p1 = multiprocessing.Process(target=task, args=(0, 50000000))
p1.start()
p2 = multiprocessing.Process(target=task, args=(50000000, 100000000))
p2.start()
3. 多线程开发#
import threading
def task(arg):
pass
# 创建一个Thread对象(线程),并封装线程被CPU调度时应该执行的任务和相关参数。
t = threading.Thread(target=task,args=('xxx',))
# 线程准备就绪(等待CPU调度),代码继续向下执行。
t.start()
print("继续执行...") # 主线程执行完所有代码,不结束(等待子线程)
线程的常用方法#
t.start()
,当前线程准备就绪(等待 CPU 调度,具体时间是由 CPU 来决定)
import threading
loop = 10000000
number = 0
def _add(count):
global number
for i in range(count):
number += 1
t = threading.Thread(target=_add,args=(loop,))
t.start()
print(number)
t.join()
,等待当前线程的任务执行完毕后再向下继续执行
import threading
number = 0
def _add():
global number
for i in range(10000000):
number += 1
t = threading.Thread(target=_add)
t.start()
t.join() # 主线程等待中...
print(number)
import threading
number = 0
def _add():
global number
for i in range(10000000):
number += 1
def _sub():
global number
for i in range(10000000):
number -= 1
t1 = threading.Thread(target=_add)
t2 = threading.Thread(target=_sub)
t1.start()
t1.join() # t1线程执行完毕,才继续往后走
t2.start()
t2.join() # t2线程执行完毕,才继续往后走
print(number)
import threading
loop = 10000000
number = 0
def _add(count):
global number
for i in range(count):
number += 1
def _sub(count):
global number
for i in range(count):
number -= 1
t1 = threading.Thread(target=_add, args=(loop,))
t2 = threading.Thread(target=_sub, args=(loop,))
t1.start()
t2.start()
t1.join() # t1线程执行完毕,才继续往后走
t2.join() # t2线程执行完毕,才继续往后走
print(number)
t.setDaemon(布尔值)
,守护线程(必须放在 start 之前)t.setDaemon(True)
,设置为守护线程,主线程执行完毕后,子线程也自动关闭。t.setDaemon(False)
,设置为非守护线程,主线程等待子线程,子线程执行完毕后,主线程才结束。(默认)
import threading
import time
def task(arg):
time.sleep(5)
print('任务')
t = threading.Thread(target=task, args=(11,))
t.setDaemon(True) # True/False
t.start()
print('END')
- 线程名称的设置和获取
import threading
def task(arg):
# 获取当前执行此代码的线程
name = threading.current_thread().getName()
print(name)
for i in range(10):
t = threading.Thread(target=task, args=(11,))
t.setName('日魔-{}'.format(i))
t.start()
- 自定义线程类,直接将线程需要做的事写到 run 方法中
import threading
class MyThread(threading.Thread):
def run(self):
print('执行此线程', self._args)
t = MyThread(args=(100,))
t.start()
4. 线程锁#
多个线程操作全局变量的时候,如果一个线程对全局变量操作分几个步骤,当还没有得到最后结果时,这个线程就被撤下 CPU,使用其他线程继续上 CPU 操作,最后就会搞乱全局变量数据
import time, threading
balance = 0
def change_it(n):
# 先存后取,结果应该为0:
global balance
balance = balance + n
balance = balance - n
def run_thread(n):
for i in range(1000):
change_it(n)
#这里重复实验100次
for i in range(100):
t1 = threading.Thread(target=run_thread, args=(5,))
t2 = threading.Thread(target=run_thread, args=(8,))
t1.start()
t2.start()
t1.join()
t2.join()
print(balance)
import time, threading
balance = 0
lock = threading.Lock()
def change_it(n):
# 先存后取,结果应该为0:
global balance
#获取锁,用于线程同步
lock.acquire()
balance = balance + n
balance = balance - n
# 释放锁,开启下一个线程
lock.release()
def run_thread(n):
for i in range(1000):
change_it(n)
#这里重复实验100次
for i in range(100):
t1 = threading.Thread(target=run_thread, args=(5,))
t2 = threading.Thread(target=run_thread, args=(8,))
t1.start()
t2.start()
t1.join()
t2.join()
print(balance)
在程序中如果想要自己手动加锁,一般有两种:Lock 和 RLock
- Lock,同步锁。(不支持锁的嵌套)
import threading
num = 0
lock_object = threading.Lock()
def task():
print("开始")
lock_object.acquire() # 第1个抵达的线程进入并上锁,其他线程就需要再此等待。
global num
for i in range(1000000):
num += 1
lock_object.release() # 线程出去,并解开锁,其他线程就可以进入并执行了
print(num)
for i in range(2):
t = threading.Thread(target=task)
t.start()
- RLock,递归锁。(支持锁的嵌套)
import threading
num = 0
lock_object = threading.RLock()
def task():
print("开始")
lock_object.acquire() # 第1个抵达的线程进入并上锁,其他线程就需要再此等待。
global num
for i in range(1000000):
num += 1
lock_object.release() # 线程出去,并解开锁,其他线程就可以进入并执行了
print(num)
for i in range(2):
t = threading.Thread(target=task)
t.start()
RLock 支持多次申请锁和多次释放;Lock 不支持
import threading
lock = threading.RLock()
# 程序员A开发了一个函数,函数可以被其他开发者调用,内部需要基于锁保证数据安全。
def func():
with lock:
pass
# 程序员B开发了一个函数,可以直接调用这个函数。
def run():
print("其他功能")
func() # 调用程序员A写的func函数,内部用到了锁。
print("其他功能")
# 程序员C开发了一个函数,自己需要加锁,同时也需要调用func函数。
def process():
with lock:
print("其他功能")
func() # ----------------> 此时就会出现多次锁的情况,只有RLock支持(Lock不支持)。
print("其他功能")
5. 队列(Queue)#
队列可以看作事件同步与条件同步的简单版,线程 A 可以往队列里面存数据,线程 B 则可以从队列里面取,不用关心同步以及锁的问题,主要方法有:
- put (): 向队列中添加一个项
- get (): 从队列中删除并返回一个项
- task_done (): 当某一项任务完成时调用,表示前面排队的任务已经被完成
- join (): 阻塞直到所有的项目都被处理完
import threading
import queue
import random
import time
class Producer(threading.Thread):
def __init__(self, queue):
super(Producer,self).__init__()
self.queue = queue
def run(self):
while True:
integer = random.randint(0, 1000)
self.queue.put(integer)
print('%d put to queue by %s' % (integer, self.name))
time.sleep(1)
class Consumer(threading.Thread):
def __init__(self, queue):
super(Consumer, self).__init__()
self.queue = queue
def run(self):
while True:
integer = self.queue.get()
print('%d popped from list by %s' % (integer, self.name))
self.queue.task_done()
if __name__ == '__main__':
queue = queue.Queue()
t1 = Producer(queue)
t2 = Consumer(queue)
t1.start()
t2.start()
t1.join()
t2.join()