xfeng

xfeng

Sporting | Reading | Technology | Recording
github
bilibili

Pythonのマルチスレッド

image

1. マルチスレッドモジュール#

Python では、主に threading モジュールがスレッドのサポートを提供します。

threading モジュール#

threading モジュールの一般的な関数

  • threading.current_thread (): 現在のスレッドオブジェクトを返します。
  • threading.enumerate (): 実行中のスレッドを含むリストを返します。実行中とは、スレッドが開始された後、終了する前を指し、開始前や終了後のスレッドは含まれません。
  • threading.active_count (): 実行中のスレッドの数を返します。len (threading.enumerate ()) と同じ結果です。

Thread クラス#

threading.Thread () を使用してスレッドオブジェクトを作成します。

主なパラメータ:

threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)

  • group デフォルトは None で、将来的に ThreadGroup クラスを実装するために保持されています。
  • target run () メソッドで呼び出される呼び出し可能オブジェクトです。デフォルトは None で、何も呼び出す必要がないことを示します。
  • name スレッドの名前です。デフォルトでは、"Thread-N" 形式で一意の名前が構成され、N は小さな 10 進数です。
  • args 目標関数を呼び出すための引数のタプルです。デフォルトは () です。
  • kwargs 目標関数を呼び出すためのキーワード引数の辞書です。デフォルトは {} です。
  • daemon スレッドがデーモンスレッドかどうかを示します。

Thread クラスの一般的なメソッドと属性

  • run(): スレッドの活動を示すメソッドです。
  • start(): スレッドの活動を開始します。
  • join(*timeout=None*): スレッドが終了するまで待機します。これは、スレッドの join () メソッドが呼び出されて終了するまで、またはオプションのタイムアウトが発生するまで、呼び出しスレッドをブロックします。
  • isAlive(): スレッドが活動中かどうかを返します。
  • getName(): スレッド名を返します。
  • setName(): スレッド名を設定します。
  • name:スレッドオブジェクトの名前
  • setDaemon():デーモンスレッドかどうかを設定します。

2. GIL ロック#

GIL、グローバルインタプリタロック(Global Interpreter Lock)は、CPython インタプリタ特有のもので、1 つのプロセス内で同時に CPU によって呼び出されるスレッドは 1 つだけです。

image

プログラムがコンピュータのマルチコアの利点を利用して、CPU が同時にいくつかのタスクを処理する場合は、マルチプロセス開発(リソースのオーバーヘッドが大きくても)を使用するのが適しています。

image

プログラムがコンピュータのマルチコアの利点を利用しない場合は、マルチスレッド開発が適しています。

image

一般的なプログラム開発では、計算操作は CPU のマルチコアの利点を利用する必要があり、IO 操作は CPU のマルチコアの利点を利用する必要がないため、次のようなことが言えます:

  • 計算集約型はマルチプロセスを使用します。例:大量のデータ計算【累積計算の例】。
  • IO 集約型はマルチスレッドを使用します。例:ファイルの読み書き、ネットワークデータ転送【TikTok 動画のダウンロード例】。

直列処理:

import time

start = time.time()

result = 0
for i in range(100000000):
    result += i
print(result)

end = time.time()

print("時間:", end - start)

マルチスレッド処理:

import time
import multiprocessing

def task(start, end, queue):
    result = 0
    for i in range(start, end):
        result += i
    queue.put(result)

if __name__ == '__main__':
    queue = multiprocessing.Queue()

    start_time = time.time()

    p1 = multiprocessing.Process(target=task, args=(0, 50000000, queue))
    p1.start()

    p2 = multiprocessing.Process(target=task, args=(50000000, 100000000, queue))
    p2.start()

    v1 = queue.get(block=True) #ブロック
    v2 = queue.get(block=True) #ブロック
    print(v1 + v2)

    end_time = time.time()

    print("時間:", end_time - start_time)

マルチスレッドとマルチプロセスの組み合わせ使用:

import multiprocessing
import threading


def thread_task():
    pass


def task(start, end):
    t1 = threading.Thread(target=thread_task)
    t1.start()

    t2 = threading.Thread(target=thread_task)
    t2.start()

    t3 = threading.Thread(target=thread_task)
    t3.start()


if __name__ == '__main__':
    p1 = multiprocessing.Process(target=task, args=(0, 50000000))
    p1.start()

    p2 = multiprocessing.Process(target=task, args=(50000000, 100000000))
    p2.start()

3. マルチスレッド開発#

import threading

def task(arg):
	pass


# Threadオブジェクト(スレッド)を作成し、CPUスケジュール時に実行すべきタスクと関連パラメータをカプセル化します。
t = threading.Thread(target=task,args=('xxx',))
# スレッドが準備完了(CPUスケジュール待機)、コードは続行します。
t.start()

print("続行中...") # メインスレッドがすべてのコードを実行し終わるまで終了しません(サブスレッドを待機)

スレッドの一般的なメソッド#

  • t.start()、現在のスレッドが準備完了(CPU スケジュール待機、具体的な時間は CPU によって決定されます)
import threading

loop = 10000000
number = 0

def _add(count):
    global number
    for i in range(count):
        number += 1

t = threading.Thread(target=_add,args=(loop,))
t.start()

print(number)
  • t.join()、現在のスレッドのタスクが完了するまで待機してから続行します
import threading

number = 0

def _add():
    global number
    for i in range(10000000):
        number += 1

t = threading.Thread(target=_add)
t.start()

t.join() # メインスレッドが待機中...

print(number)
import threading

number = 0


def _add():
    global number
    for i in range(10000000):
        number += 1


def _sub():
    global number
    for i in range(10000000):
        number -= 1


t1 = threading.Thread(target=_add)
t2 = threading.Thread(target=_sub)
t1.start()
t1.join()  # t1スレッドが完了するまで、次に進みません
t2.start()
t2.join()  # t2スレッドが完了するまで、次に進みません

print(number)
import threading

loop = 10000000
number = 0


def _add(count):
    global number
    for i in range(count):
        number += 1


def _sub(count):
    global number
    for i in range(count):
        number -= 1


t1 = threading.Thread(target=_add, args=(loop,))
t2 = threading.Thread(target=_sub, args=(loop,))
t1.start()
t2.start()

t1.join()  # t1スレッドが完了するまで、次に進みません
t2.join()  # t2スレッドが完了するまで、次に進みません

print(number)
  • t.setDaemon(ブール値) 、デーモンスレッド(start の前に設定する必要があります)
    • t.setDaemon(True)、デーモンスレッドに設定し、メインスレッドが完了すると、サブスレッドも自動的に終了します。
    • t.setDaemon(False)、非デーモンスレッドに設定し、メインスレッドがサブスレッドを待機し、サブスレッドが完了した後にメインスレッドが終了します。(デフォルト)
import threading
import time

def task(arg):
    time.sleep(5)
    print('タスク')

t = threading.Thread(target=task, args=(11,))
t.setDaemon(True) # True/False
t.start()

print('END')
  • スレッド名の設定と取得
import threading


def task(arg):
    # 現在このコードを実行しているスレッドを取得
    name = threading.current_thread().getName()
    print(name)


for i in range(10):
    t = threading.Thread(target=task, args=(11,))
    t.setName('スレッド-{}'.format(i))
    t.start()
  • カスタムスレッドクラス、スレッドが行うべきことを run メソッドに直接書きます
import threading

class MyThread(threading.Thread):
    def run(self):
        print('このスレッドを実行中', self._args)


t = MyThread(args=(100,))
t.start()

4. スレッドロック#

複数のスレッドがグローバル変数を操作する場合、1 つのスレッドがグローバル変数を操作するのに複数のステップを要する場合、最終結果を得る前にそのスレッドが CPU から撤去され、他のスレッドが CPU を使用し続けると、最終的にグローバル変数のデータが混乱することがあります。

import time, threading

balance = 0

def change_it(n):
    # 先に入金し、後に引き出す。結果は0であるべきです:
    global balance
    balance = balance + n
    balance = balance - n

def run_thread(n):
    for i in range(1000):
        change_it(n)

#ここで100回繰り返し実験します
for i in range(100):
    t1 = threading.Thread(target=run_thread, args=(5,))
    t2 = threading.Thread(target=run_thread, args=(8,))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print(balance)
import time, threading

balance = 0
lock = threading.Lock()

def change_it(n):
    # 先に入金し、後に引き出す。結果は0であるべきです:
    global balance
    #ロックを取得し、スレッドの同期を行います
    lock.acquire()
    balance = balance + n
    balance = balance - n
    # ロックを解放し、次のスレッドを開始します
    lock.release()

def run_thread(n):
    for i in range(1000):
        change_it(n)

#ここで100回繰り返し実験します
for i in range(100):
    t1 = threading.Thread(target=run_thread, args=(5,))
    t2 = threading.Thread(target=run_thread, args=(8,))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print(balance)

プログラム内で手動でロックを加えたい場合、一般的に 2 つの方法があります:Lock と RLock

  • Lock、同期ロック。(ネストされたロックはサポートされていません)
import threading

num = 0
lock_object = threading.Lock()


def task():
    print("開始")
    lock_object.acquire()  # 最初に到達したスレッドが入ってロックし、他のスレッドはここで待機する必要があります。
    global num
    for i in range(1000000):
        num += 1
    lock_object.release()  # スレッドが出て、ロックを解放し、他のスレッドが入って実行できるようになります
    
    print(num)


for i in range(2):
    t = threading.Thread(target=task)
    t.start()

  • RLock、再帰ロック。(ネストされたロックをサポートします)
import threading

num = 0
lock_object = threading.RLock()


def task():
    print("開始")
    lock_object.acquire()  # 最初に到達したスレッドが入ってロックし、他のスレッドはここで待機する必要があります。
    global num
    for i in range(1000000):
        num += 1
    lock_object.release()  # スレッドが出て、ロックを解放し、他のスレッドが入って実行できるようになります
    print(num)


for i in range(2):
    t = threading.Thread(target=task)
    t.start()

RLock は複数回のロック取得と解放をサポートしますが、Lock はサポートしません。

import threading
lock = threading.RLock()

# プログラマーAが関数を開発し、他の開発者が呼び出すことができ、内部でロックを使用してデータの安全性を保証します。
def func():
	with lock:
		pass
        
# プログラマーBが関数を開発し、この関数を直接呼び出すことができます。
def run():
    print("他の機能")
    func() # プログラマーAが書いたfunc関数を呼び出し、内部でロックを使用します。
    print("他の機能")
    
# プログラマーCが関数を開発し、自分でロックを加え、同時にfunc関数を呼び出す必要があります。
def process():
    with lock:
		print("他の機能")
        func() # ----------------> この時、複数回のロックが発生し、RLockのみがサポートします(Lockはサポートしません)。

5. キュー(Queue)#

キューはイベント同期と条件同期の簡易版と見なすことができ、スレッド A はキューにデータを格納し、スレッド B はキューからデータを取得することができ、同期やロックの問題を気にする必要がありません。主なメソッドは次の通りです:

  • put (): キューに項目を追加します
  • get (): キューから項目を削除して返します
  • task_done (): 特定のタスクが完了したときに呼び出し、前のタスクが完了したことを示します
  • join (): すべての項目が処理されるまでブロックします
import threading
import queue
import random
import time

class Producer(threading.Thread):

    def __init__(self, queue):
        super(Producer,self).__init__()
        self.queue = queue

    def run(self):
        while True:
            integer = random.randint(0, 1000)
            self.queue.put(integer)
            print('%d をキューに追加しました %s' % (integer, self.name))
            time.sleep(1)

class Consumer(threading.Thread):

    def __init__(self, queue):
        super(Consumer, self).__init__()
        self.queue = queue

    def run(self):
        while True:
            integer = self.queue.get()
            print('%d をリストから取り出しました %s' % (integer, self.name))
            self.queue.task_done()

if __name__ == '__main__':

    queue = queue.Queue()
    t1 = Producer(queue)
    t2 = Consumer(queue)
    t1.start()
    t2.start()
    t1.join()
    t2.join()
読み込み中...
文章は、創作者によって署名され、ブロックチェーンに安全に保存されています。