xfeng

xfeng

Sporting | Reading | Technology | Recording
github
bilibili

Python多线程

image

1. 多线程模块#

python 中主要用 threading 模块提供对线程的支持

threading 模块#

threading 模块常用函数

  • threading.current_thread (): 返回当前的线程对象。
  • threading.enumerate (): 返回一个包含正在运行的线程的 list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
  • threading.active_count (): 返回正在运行的线程数量,与 len (threading.enumerate ()) 有相同的结果。

Thread 类#

通过 threading.Thread () 创建线程对象

主要参数:

threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)

  • group 默认为 None,为了日后扩展 ThreadGroup 类实现而保留。
  • target 是用于 run () 方法调用的可调用对象。默认是 None,表示不需要调用任何方法。
  • name 是线程名称。默认情况下,由 "Thread-N" 格式构成一个唯一的名称,其中 N 是小的十进制数
  • args 是用于调用目标函数的参数元组。默认是 ()
  • kwargs 是用于调用目标函数的关键字参数字典。默认是 {}。
  • daemon表示线程是不是守护线程。

Thread 类常用方法与属性

  • run(): 用以表示线程活动的方法。
  • start(): 启动线程活动。
  • join(*timeout=None*): 等待至线程中止。这阻塞调用线程直至线程的 join () 方法被调用中止 - 正常退出或者抛出未处理的异常 - 或者是可选的超时发生。
  • isAlive(): 返回线程是否活动的。
  • getName(): 返回线程名。
  • setName(): 设置线程名。
  • name:线程对象名字
  • setDaemon():设置是否为守护线程

2. GIL 锁#

GIL, 全局解释器锁(Global Interpreter Lock),是 CPython 解释器特有一个玩意,让一个进程中在同一个时刻只能有一个线程可以被 CPU 调用

image

如果程序想利用 计算机的多核优势,让 CPU 同时处理一些任务,适合用多进程开发(即使资源开销大)。

image

如果程序不利用 计算机的多核优势,适合用多线程开发。

image

常见的程序开发中,计算操作需要使用 CPU 多核优势,IO 操作不需要利用 CPU 的多核优势,所以,就有这一句话:

  • 计算密集型,用多进程,例如:大量的数据计算【累加计算示例】。
  • IO 密集型,用多线程,例如:文件读写、网络数据传输【下载抖音视频示例】。

串行处理:

import time

start = time.time()

result = 0
for i in range(100000000):
    result += i
print(result)

end = time.time()

print("耗时:", end - start)

多线程处理:

import time
import multiprocessing

def task(start, end, queue):
    result = 0
    for i in range(start, end):
        result += i
    queue.put(result)

if __name__ == '__main__':
    queue = multiprocessing.Queue()

    start_time = time.time()

    p1 = multiprocessing.Process(target=task, args=(0, 50000000, queue))
    p1.start()

    p2 = multiprocessing.Process(target=task, args=(50000000, 100000000, queue))
    p2.start()

    v1 = queue.get(block=True) #阻塞
    v2 = queue.get(block=True) #阻塞
    print(v1 + v2)

    end_time = time.time()

    print("耗时:", end_time - start_time)

多线程与多进程结合使用:

import multiprocessing
import threading


def thread_task():
    pass


def task(start, end):
    t1 = threading.Thread(target=thread_task)
    t1.start()

    t2 = threading.Thread(target=thread_task)
    t2.start()

    t3 = threading.Thread(target=thread_task)
    t3.start()


if __name__ == '__main__':
    p1 = multiprocessing.Process(target=task, args=(0, 50000000))
    p1.start()

    p2 = multiprocessing.Process(target=task, args=(50000000, 100000000))
    p2.start()

3. 多线程开发#

import threading

def task(arg):
	pass


# 创建一个Thread对象(线程),并封装线程被CPU调度时应该执行的任务和相关参数。
t = threading.Thread(target=task,args=('xxx',))
# 线程准备就绪(等待CPU调度),代码继续向下执行。
t.start()

print("继续执行...") # 主线程执行完所有代码,不结束(等待子线程)

线程的常用方法#

  • t.start(),当前线程准备就绪(等待 CPU 调度,具体时间是由 CPU 来决定)
import threading

loop = 10000000
number = 0

def _add(count):
    global number
    for i in range(count):
        number += 1

t = threading.Thread(target=_add,args=(loop,))
t.start()

print(number)
  • t.join(),等待当前线程的任务执行完毕后再向下继续执行
import threading

number = 0

def _add():
    global number
    for i in range(10000000):
        number += 1

t = threading.Thread(target=_add)
t.start()

t.join() # 主线程等待中...

print(number)
import threading

number = 0


def _add():
    global number
    for i in range(10000000):
        number += 1


def _sub():
    global number
    for i in range(10000000):
        number -= 1


t1 = threading.Thread(target=_add)
t2 = threading.Thread(target=_sub)
t1.start()
t1.join()  # t1线程执行完毕,才继续往后走
t2.start()
t2.join()  # t2线程执行完毕,才继续往后走

print(number)
import threading

loop = 10000000
number = 0


def _add(count):
    global number
    for i in range(count):
        number += 1


def _sub(count):
    global number
    for i in range(count):
        number -= 1


t1 = threading.Thread(target=_add, args=(loop,))
t2 = threading.Thread(target=_sub, args=(loop,))
t1.start()
t2.start()

t1.join()  # t1线程执行完毕,才继续往后走
t2.join()  # t2线程执行完毕,才继续往后走

print(number)
  • t.setDaemon(布尔值) ,守护线程(必须放在 start 之前)
    • t.setDaemon(True),设置为守护线程,主线程执行完毕后,子线程也自动关闭。
    • t.setDaemon(False),设置为非守护线程,主线程等待子线程,子线程执行完毕后,主线程才结束。(默认)
import threading
import time

def task(arg):
    time.sleep(5)
    print('任务')

t = threading.Thread(target=task, args=(11,))
t.setDaemon(True) # True/False
t.start()

print('END')
  • 线程名称的设置和获取
import threading


def task(arg):
    # 获取当前执行此代码的线程
    name = threading.current_thread().getName()
    print(name)


for i in range(10):
    t = threading.Thread(target=task, args=(11,))
    t.setName('日魔-{}'.format(i))
    t.start()
  • 自定义线程类,直接将线程需要做的事写到 run 方法中
import threading

class MyThread(threading.Thread):
    def run(self):
        print('执行此线程', self._args)


t = MyThread(args=(100,))
t.start()

4. 线程锁#

多个线程操作全局变量的时候,如果一个线程对全局变量操作分几个步骤,当还没有得到最后结果时,这个线程就被撤下 CPU,使用其他线程继续上 CPU 操作,最后就会搞乱全局变量数据

import time, threading

balance = 0

def change_it(n):
    # 先存后取,结果应该为0:
    global balance
    balance = balance + n
    balance = balance - n

def run_thread(n):
    for i in range(1000):
        change_it(n)

#这里重复实验100次
for i in range(100):
    t1 = threading.Thread(target=run_thread, args=(5,))
    t2 = threading.Thread(target=run_thread, args=(8,))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print(balance)
import time, threading

balance = 0
lock = threading.Lock()

def change_it(n):
    # 先存后取,结果应该为0:
    global balance
    #获取锁,用于线程同步
    lock.acquire()
    balance = balance + n
    balance = balance - n
    # 释放锁,开启下一个线程
    lock.release()

def run_thread(n):
    for i in range(1000):
        change_it(n)

#这里重复实验100次
for i in range(100):
    t1 = threading.Thread(target=run_thread, args=(5,))
    t2 = threading.Thread(target=run_thread, args=(8,))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print(balance)

在程序中如果想要自己手动加锁,一般有两种:Lock 和 RLock

  • Lock,同步锁。(不支持锁的嵌套)
import threading

num = 0
lock_object = threading.Lock()


def task():
    print("开始")
    lock_object.acquire()  # 第1个抵达的线程进入并上锁,其他线程就需要再此等待。
    global num
    for i in range(1000000):
        num += 1
    lock_object.release()  # 线程出去,并解开锁,其他线程就可以进入并执行了
    
    print(num)


for i in range(2):
    t = threading.Thread(target=task)
    t.start()

  • RLock,递归锁。(支持锁的嵌套)
import threading

num = 0
lock_object = threading.RLock()


def task():
    print("开始")
    lock_object.acquire()  # 第1个抵达的线程进入并上锁,其他线程就需要再此等待。
    global num
    for i in range(1000000):
        num += 1
    lock_object.release()  # 线程出去,并解开锁,其他线程就可以进入并执行了
    print(num)


for i in range(2):
    t = threading.Thread(target=task)
    t.start()

RLock 支持多次申请锁和多次释放;Lock 不支持

import threading
lock = threading.RLock()

# 程序员A开发了一个函数,函数可以被其他开发者调用,内部需要基于锁保证数据安全。
def func():
	with lock:
		pass
        
# 程序员B开发了一个函数,可以直接调用这个函数。
def run():
    print("其他功能")
    func() # 调用程序员A写的func函数,内部用到了锁。
    print("其他功能")
    
# 程序员C开发了一个函数,自己需要加锁,同时也需要调用func函数。
def process():
    with lock:
		print("其他功能")
        func() # ----------------> 此时就会出现多次锁的情况,只有RLock支持(Lock不支持)。
		print("其他功能")

5. 队列(Queue)#

队列可以看作事件同步与条件同步的简单版,线程 A 可以往队列里面存数据,线程 B 则可以从队列里面取,不用关心同步以及锁的问题,主要方法有:

  • put (): 向队列中添加一个项
  • get (): 从队列中删除并返回一个项
  • task_done (): 当某一项任务完成时调用,表示前面排队的任务已经被完成
  • join (): 阻塞直到所有的项目都被处理完
import threading
import queue
import random
import time

class Producer(threading.Thread):

    def __init__(self, queue):
        super(Producer,self).__init__()
        self.queue = queue

    def run(self):
        while True:
            integer = random.randint(0, 1000)
            self.queue.put(integer)
            print('%d put to queue by %s' % (integer, self.name))
            time.sleep(1)

class Consumer(threading.Thread):

    def __init__(self, queue):
        super(Consumer, self).__init__()
        self.queue = queue

    def run(self):
        while True:
            integer = self.queue.get()
            print('%d popped from list by %s' % (integer, self.name))
            self.queue.task_done()

if __name__ == '__main__':

    queue = queue.Queue()
    t1 = Producer(queue)
    t2 = Consumer(queue)
    t1.start()
    t2.start()
    t1.join()
    t2.join()
加载中...
此文章数据所有权由区块链加密技术和智能合约保障仅归创作者所有。