xfeng

xfeng

Sporting | Reading | Technology | Recording
github
bilibili

Python多執行緒

image

1. 多線程模塊#

python 中主要用 threading 模塊提供對線程的支持

threading 模塊#

threading 模塊常用函數

  • threading.current_thread (): 返回當前的線程對象。
  • threading.enumerate (): 返回一個包含正在運行的線程的 list。正在運行指線程啟動後、結束前,不包括啟動前和終止後的線程。
  • threading.active_count (): 返回正在運行的線程數量,與 len (threading.enumerate ()) 有相同的結果。

Thread 類#

通過 threading.Thread () 創建線程對象

主要參數:

threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)

  • group 默認為 None,為了日後擴展 ThreadGroup 類實現而保留。
  • target 是用於 run () 方法調用的可調用對象。默認是 None,表示不需要調用任何方法。
  • name 是線程名稱。默認情況下,由 "Thread-N" 格式構成一個唯一的名稱,其中 N 是小的十進制數
  • args 是用於調用目標函數的參數元組。默認是 ()
  • kwargs 是用於調用目標函數的關鍵字參數字典。默認是 {}。
  • daemon表示線程是不是守護線程。

Thread 類常用方法與屬性

  • run(): 用以表示線程活動的方法。
  • start(): 啟動線程活動。
  • join(*timeout=None*): 等待至線程中止。這阻塞調用線程直至線程的 join () 方法被調用中止 - 正常退出或者拋出未處理的異常 - 或者是可選的超時發生。
  • isAlive(): 返回線程是否活動的。
  • getName(): 返回線程名。
  • setName(): 設置線程名。
  • name:線程對象名字
  • setDaemon():設置是否為守護線程

2. GIL 鎖#

GIL, 全局解釋器鎖(Global Interpreter Lock),是 CPython 解釋器特有一個玩意,讓一個進程中在同一個時刻只能有一個線程可以被 CPU 調用

image

如果程序想利用 計算機的多核優勢,讓 CPU 同時處理一些任務,適合用多進程開發(即使資源開銷大)。

image

如果程序不利用 計算機的多核優勢,適合用多線程開發。

image

常見的程序開發中,計算操作需要使用 CPU 多核優勢,IO 操作不需要利用 CPU 的多核優勢,所以,就有這一句話:

  • 計算密集型,用多進程,例如:大量的數據計算【累加計算示例】。
  • IO 密集型,用多線程,例如:文件讀寫、網絡數據傳輸【下載抖音視頻示例】。

串行處理:

import time

start = time.time()

result = 0
for i in range(100000000):
    result += i
print(result)

end = time.time()

print("耗時:", end - start)

多線程處理:

import time
import multiprocessing

def task(start, end, queue):
    result = 0
    for i in range(start, end):
        result += i
    queue.put(result)

if __name__ == '__main__':
    queue = multiprocessing.Queue()

    start_time = time.time()

    p1 = multiprocessing.Process(target=task, args=(0, 50000000, queue))
    p1.start()

    p2 = multiprocessing.Process(target=task, args=(50000000, 100000000, queue))
    p2.start()

    v1 = queue.get(block=True) #阻塞
    v2 = queue.get(block=True) #阻塞
    print(v1 + v2)

    end_time = time.time()

    print("耗時:", end_time - start_time)

多線程與多進程結合使用:

import multiprocessing
import threading


def thread_task():
    pass


def task(start, end):
    t1 = threading.Thread(target=thread_task)
    t1.start()

    t2 = threading.Thread(target=thread_task)
    t2.start()

    t3 = threading.Thread(target=thread_task)
    t3.start()


if __name__ == '__main__':
    p1 = multiprocessing.Process(target=task, args=(0, 50000000))
    p1.start()

    p2 = multiprocessing.Process(target=task, args=(50000000, 100000000))
    p2.start()

3. 多線程開發#

import threading

def task(arg):
	pass


# 創建一個Thread對象(線程),並封裝線程被CPU調度時應該執行的任務和相關參數。
t = threading.Thread(target=task,args=('xxx',))
# 線程準備就緒(等待CPU調度),代碼繼續向下執行。
t.start()

print("繼續執行...") # 主線程執行完所有代碼,不結束(等待子線程)

線程的常用方法#

  • t.start(),當前線程準備就緒(等待 CPU 調度,具體時間是由 CPU 來決定)
import threading

loop = 10000000
number = 0

def _add(count):
    global number
    for i in range(count):
        number += 1

t = threading.Thread(target=_add,args=(loop,))
t.start()

print(number)
  • t.join(),等待當前線程的任務執行完畢後再向下繼續執行
import threading

number = 0

def _add():
    global number
    for i in range(10000000):
        number += 1

t = threading.Thread(target=_add)
t.start()

t.join() # 主線程等待中...

print(number)
import threading

number = 0


def _add():
    global number
    for i in range(10000000):
        number += 1


def _sub():
    global number
    for i in range(10000000):
        number -= 1


t1 = threading.Thread(target=_add)
t2 = threading.Thread(target=_sub)
t1.start()
t1.join()  # t1線程執行完畢,才繼續往後走
t2.start()
t2.join()  # t2線程執行完畢,才繼續往後走

print(number)
import threading

loop = 10000000
number = 0


def _add(count):
    global number
    for i in range(count):
        number += 1


def _sub(count):
    global number
    for i in range(count):
        number -= 1


t1 = threading.Thread(target=_add, args=(loop,))
t2 = threading.Thread(target=_sub, args=(loop,))
t1.start()
t2.start()

t1.join()  # t1線程執行完畢,才繼續往後走
t2.join()  # t2線程執行完畢,才繼續往後走

print(number)
  • t.setDaemon(布爾值) ,守護線程(必須放在 start 之前)
    • t.setDaemon(True),設置為守護線程,主線程執行完畢後,子線程也自動關閉。
    • t.setDaemon(False),設置為非守護線程,主線程等待子線程,子線程執行完畢後,主線程才結束。(默認)
import threading
import time

def task(arg):
    time.sleep(5)
    print('任務')

t = threading.Thread(target=task, args=(11,))
t.setDaemon(True) # True/False
t.start()

print('END')
  • 線程名稱的設置和獲取
import threading


def task(arg):
    # 獲取當前執行此代碼的線程
    name = threading.current_thread().getName()
    print(name)


for i in range(10):
    t = threading.Thread(target=task, args=(11,))
    t.setName('日魔-{}'.format(i))
    t.start()
  • 自定義線程類,直接將線程需要做的事寫到 run 方法中
import threading

class MyThread(threading.Thread):
    def run(self):
        print('執行此線程', self._args)


t = MyThread(args=(100,))
t.start()

4. 線程鎖#

多個線程操作全局變量的時候,如果一個線程對全局變量操作分幾個步驟,當還沒有得到最後結果時,這個線程就被撤下 CPU,使用其他線程繼續上 CPU 操作,最後就會搞亂全局變量數據

import time, threading

balance = 0

def change_it(n):
    # 先存後取,結果應該為0:
    global balance
    balance = balance + n
    balance = balance - n

def run_thread(n):
    for i in range(1000):
        change_it(n)

#這裡重複實驗100次
for i in range(100):
    t1 = threading.Thread(target=run_thread, args=(5,))
    t2 = threading.Thread(target=run_thread, args=(8,))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print(balance)
import time, threading

balance = 0
lock = threading.Lock()

def change_it(n):
    # 先存後取,結果應該為0:
    global balance
    #獲取鎖,用於線程同步
    lock.acquire()
    balance = balance + n
    balance = balance - n
    # 釋放鎖,開啟下一個線程
    lock.release()

def run_thread(n):
    for i in range(1000):
        change_it(n)

#這裡重複實驗100次
for i in range(100):
    t1 = threading.Thread(target=run_thread, args=(5,))
    t2 = threading.Thread(target=run_thread, args=(8,))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print(balance)

在程序中如果想要自己手動加鎖,一般有兩種:Lock 和 RLock

  • Lock,同步鎖。(不支持鎖的嵌套)
import threading

num = 0
lock_object = threading.Lock()


def task():
    print("開始")
    lock_object.acquire()  # 第1個抵達的線程進入並上鎖,其他線程就需要再此等待。
    global num
    for i in range(1000000):
        num += 1
    lock_object.release()  # 線程出去,並解開鎖,其他線程就可以進入並執行了
    
    print(num)


for i in range(2):
    t = threading.Thread(target=task)
    t.start()

  • RLock,遞歸鎖。(支持鎖的嵌套)
import threading

num = 0
lock_object = threading.RLock()


def task():
    print("開始")
    lock_object.acquire()  # 第1個抵達的線程進入並上鎖,其他線程就需要再此等待。
    global num
    for i in range(1000000):
        num += 1
    lock_object.release()  # 線程出去,並解開鎖,其他線程就可以進入並執行了
    print(num)


for i in range(2):
    t = threading.Thread(target=task)
    t.start()

RLock 支持多次申請鎖和多次釋放;Lock 不支持

import threading
lock = threading.RLock()

# 程序員A開發了一個函數,函數可以被其他開發者調用,內部需要基於鎖保證數據安全。
def func():
	with lock:
		pass
        
# 程序員B開發了一個函數,可以直接調用這個函數。
def run():
    print("其他功能")
    func() # 調用程序員A寫的func函數,內部用到了鎖。
    print("其他功能")
    
# 程序員C開發了一個函數,自己需要加鎖,同時也需要調用func函數。
def process():
    with lock:
		print("其他功能")
        func() # ----------------> 此時就會出現多次鎖的情況,只有RLock支持(Lock不支持)。

5. 隊列(Queue)#

隊列可以看作事件同步與條件同步的簡單版,線程 A 可以往隊列裡面存數據,線程 B 則可以從隊列裡面取,不用關心同步以及鎖的問題,主要方法有:

  • put (): 向隊列中添加一個項
  • get (): 從隊列中刪除並返回一個項
  • task_done (): 當某一項任務完成時調用,表示前面排隊的任務已經被完成
  • join (): 阻塞直到所有的項目都被處理完
import threading
import queue
import random
import time

class Producer(threading.Thread):

    def __init__(self, queue):
        super(Producer,self).__init__()
        self.queue = queue

    def run(self):
        while True:
            integer = random.randint(0, 1000)
            self.queue.put(integer)
            print('%d put to queue by %s' % (integer, self.name))
            time.sleep(1)

class Consumer(threading.Thread):

    def __init__(self, queue):
        super(Consumer, self).__init__()
        self.queue = queue

    def run(self):
        while True:
            integer = self.queue.get()
            print('%d popped from list by %s' % (integer, self.name))
            self.queue.task_done()

if __name__ == '__main__':

    queue = queue.Queue()
    t1 = Producer(queue)
    t2 = Consumer(queue)
    t1.start()
    t2.start()
    t1.join()
    t2.join()
載入中......
此文章數據所有權由區塊鏈加密技術和智能合約保障僅歸創作者所有。