1. Multithreading Module
The main support for threads in Python is provided by the threading module.
threading Module
Common functions in the threading module:
- threading.current_thread(): Returns the current thread object.
- threading.enumerate(): Returns a list of currently running threads. "Running" refers to threads that have started but not yet finished, excluding those that have not started or have terminated.
- threading.active_count(): Returns the number of currently running threads, which is the same as len(threading.enumerate()).
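The sketch below calls the three functions above. The worker function and the threading.Event that keeps the workers alive while the main thread inspects them are illustrative choices, not part of the original notes.

```python
import threading

# Event used only to keep the worker threads alive while the main thread inspects them
release = threading.Event()

def worker():
    # Each worker blocks here until the main thread sets the event
    release.wait()

workers = [threading.Thread(target=worker, name=f"worker-{i}") for i in range(3)]
for t in workers:
    t.start()

# Called from the main thread, current_thread() returns the main thread object
main_name = threading.current_thread().name
print(main_name)  # MainThread

# enumerate() lists every alive thread: the main thread plus the three blocked workers
alive_names = [t.name for t in threading.enumerate()]
print(alive_names)

# active_count() reports the number of alive threads, i.e. len(threading.enumerate())
count = threading.active_count()
print(count)

# Let the workers finish and wait for them
release.set()
for t in workers:
    t.join()
```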
Thread Class
Create a thread object using threading.Thread().
Main parameters:
threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
- group: should be None; reserved for a future extension when a ThreadGroup class is implemented.
- target: the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.
- name: the thread name. By default, a unique name is constructed in the form "Thread-N", where N is a small decimal number (since Python 3.10, "Thread-N (target)" when a target is given).
- args: a tuple of positional arguments for the target call. Defaults to ().
- kwargs: a dictionary of keyword arguments for the target call. Defaults to {}.
- daemon: if not None, explicitly sets whether the thread is a daemon thread. If None (the default), the daemon property is inherited from the current thread.
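A minimal sketch that passes each constructor parameter explicitly. The greet function and the results dictionary are hypothetical. Note that args must be a tuple, so a single argument needs a trailing comma, as in ('xxx',).

```python
import threading

results = {}

def greet(greeting, name, punctuation="!"):
    # Store the formatted string under the name of the thread running this code
    results[threading.current_thread().name] = f"{greeting}, {name}{punctuation}"

t = threading.Thread(
    target=greet,                 # callable invoked by run()
    name="greeter",               # custom name instead of the default "Thread-N"
    args=("Hello", "world"),      # positional arguments, always a tuple
    kwargs={"punctuation": "?"},  # keyword arguments
    daemon=False,                 # explicitly a non-daemon thread
)
t.start()
t.join()
print(results)  # {'greeter': 'Hello, world?'}
```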
Common Methods and Attributes of the Thread Class
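- run(): the method representing the thread's activity; by default it calls target(*args, **kwargs). Subclasses may override it.
- start(): starts the thread's activity by arranging for run() to be invoked in a separate thread. It may be called at most once per thread object.
- join(timeout=None): waits for the thread to terminate. It blocks the calling thread until the thread whose join() method is called terminates (either normally or through an unhandled exception) or until the optional timeout, in seconds, expires.
- is_alive(): returns whether the thread is alive. (The old camelCase isAlive() was removed in Python 3.9.)
- getName(): returns the thread name (deprecated since Python 3.10; read the name attribute instead).
- setName(): sets the thread name (deprecated since Python 3.10; assign to the name attribute instead).
- name: the name of the thread object, a string used only for identification.
- setDaemon(): sets whether the thread is a daemon thread (deprecated since Python 3.10; assign to the daemon attribute before start() instead).
- daemon: whether the thread is a daemon thread.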
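The following sketch uses the non-deprecated spellings (is_alive() and the name and daemon attributes) and shows one detail of join(timeout): join() always returns None, so is_alive() has to be checked afterwards to tell whether the wait timed out. The slow_task function and the sleep durations are illustrative assumptions.

```python
import threading
import time

def slow_task():
    time.sleep(0.5)  # stands in for real work

t = threading.Thread(target=slow_task)
t.name = "slow-worker"  # instead of the deprecated setName()
t.daemon = False        # instead of the deprecated setDaemon(); must be set before start()
t.start()

print(t.is_alive())     # True: the thread is still sleeping

t.join(timeout=0.1)     # gives up after about 0.1 s; join() itself always returns None
still_running = t.is_alive()
print(still_running)    # True: the wait timed out before the 0.5 s task finished

t.join()                # no timeout: wait until the thread has really ended
print(t.is_alive())     # False
```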
2. GIL Lock
GIL, the Global Interpreter Lock, is a mechanism of the CPython interpreter that allows only one thread in a process to execute Python bytecode at any given time, even on a multi-core machine.
If a program needs to use multiple CPU cores so that several computations really run at the same time, use multiprocessing, even though each process costs more resources than a thread.
If a program does not need multiple cores, multithreading is sufficient.
In everyday development, computation benefits from multiple cores, whereas I/O operations spend most of their time waiting on disks or the network and gain little from extra cores. Hence the common rule of thumb:
- For compute-intensive tasks, use multiprocessing, e.g., large numeric calculations [cumulative calculation example].
- For I/O-intensive tasks, use multithreading, e.g., file reading and writing and network data transmission [downloading Douyin videos example].
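To illustrate the I/O-bound case, here is a minimal sketch that simulates downloads with time.sleep(), which, like blocking file and network I/O, releases the GIL while waiting. The fake_download function and the example.com URLs are hypothetical placeholders; no real network access happens.

```python
import threading
import time

def fake_download(url, results):
    # Hypothetical stand-in for network I/O; time.sleep releases the GIL while waiting
    time.sleep(0.3)
    results.append(url)  # list.append is safe to call from several threads in CPython

urls = [f"https://example.com/video/{i}" for i in range(5)]  # placeholder URLs

# Serial: each "download" starts only after the previous one has finished
start = time.perf_counter()
serial_results = []
for url in urls:
    fake_download(url, serial_results)
serial_time = time.perf_counter() - start

# Threaded: the five waits overlap
start = time.perf_counter()
threaded_results = []
threads = [threading.Thread(target=fake_download, args=(url, threaded_results)) for url in urls]
for t in threads:
    t.start()
for t in threads:
    t.join()
threaded_time = time.perf_counter() - start

print(f"serial:   {serial_time:.2f} s")    # roughly 5 x 0.3 s
print(f"threaded: {threaded_time:.2f} s")  # roughly 0.3 s
```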
Serial processing:
```python
import time

start = time.time()
result = 0
for i in range(100000000):
    result += i
print(result)
end = time.time()
print("Time taken:", end - start)
```
Multiprocessing:
```python
import time
import multiprocessing

def task(start, end, queue):
    result = 0
    for i in range(start, end):
        result += i
    queue.put(result)

if __name__ == '__main__':
    queue = multiprocessing.Queue()
    start_time = time.time()
    p1 = multiprocessing.Process(target=task, args=(0, 50000000, queue))
    p1.start()
    p2 = multiprocessing.Process(target=task, args=(50000000, 100000000, queue))
    p2.start()
    v1 = queue.get(block=True)  # blocks until one process has put its partial sum
    v2 = queue.get(block=True)  # blocks until the other process has put its partial sum
    p1.join()  # reap the child processes after their results have been read
    p2.join()
    print(v1 + v2)
    end_time = time.time()
    print("Time taken:", end_time - start_time)
```
Combining multithreading and multiprocessing:
```python
import multiprocessing
import threading

def thread_task():
    pass  # placeholder for the work each thread does

def task(start, end):
    # Runs inside each child process: start three threads within that process
    t1 = threading.Thread(target=thread_task)
    t1.start()
    t2 = threading.Thread(target=thread_task)
    t2.start()
    t3 = threading.Thread(target=thread_task)
    t3.start()

if __name__ == '__main__':
    # Two processes (which can run on different cores), each with its own threads
    p1 = multiprocessing.Process(target=task, args=(0, 50000000))
    p1.start()
    p2 = multiprocessing.Process(target=task, args=(50000000, 100000000))
    p2.start()
```
3. Multithreading Development
```python
import threading

def task(arg):
    pass

# Create a Thread object and bind it to the task (and its arguments) that runs once the thread is scheduled by the CPU.
t = threading.Thread(target=task, args=('xxx',))

# The thread becomes ready (waiting for CPU scheduling); the main thread does not wait and keeps executing.
t.start()

print("Continuing execution...")  # The main thread finishes its own code, but the program waits for the non-daemon child thread before exiting
```
Common Methods of Threads
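- t.start(): puts the thread into the ready state (waiting for CPU scheduling); exactly when it begins running is decided by the operating system's scheduler. start() may be called at most once per thread object.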
```python
import threading

loop = 10000000
number = 0

def _add(count):
    global number
    for i in range(count):
        number += 1

t = threading.Thread(target=_add, args=(loop,))
t.start()
# start() does not wait for the thread, so this may print a value smaller than loop
print(number)
```
- t.join(): blocks the calling thread until thread t has finished, then continues execution.
```python
import threading

number = 0

def _add():
    global number
    for i in range(10000000):
        number += 1

t = threading.Thread(target=_add)
t.start()
t.join()  # The main thread is waiting...
print(number)
```
```python
import threading

number = 0

def _add():
    global number
    for i in range(10000000):
        number += 1

def _sub():
    global number
    for i in range(10000000):
        number -= 1

t1 = threading.Thread(target=_add)
t2 = threading.Thread(target=_sub)
t1.start()
t1.join()  # t1 thread completes before continuing
t2.start()
t2.join()  # t2 thread completes before continuing
print(number)
```
```python
import threading

loop = 10000000
number = 0

def _add(count):
    global number
    for i in range(count):
        number += 1

def _sub(count):
    global number
    for i in range(count):
        number -= 1

t1 = threading.Thread(target=_add, args=(loop,))
t2 = threading.Thread(target=_sub, args=(loop,))
t1.start()
t2.start()  # t1 and t2 now run concurrently
t1.join()  # wait for t1 to finish
t2.join()  # wait for t2 to finish
# number += 1 and number -= 1 are not atomic, so without a lock the result is not guaranteed to be 0
print(number)
```
- t.setDaemon(boolean): sets whether the thread is a daemon thread; it must be called before start(). Since Python 3.10, setDaemon() is deprecated in favor of assigning t.daemon = True/False.
- t.setDaemon(True): daemon thread. When the main thread finishes and only daemon threads remain, the program exits and the daemon threads are stopped abruptly.
- t.setDaemon(False): non-daemon thread (the default for threads created by the main thread). The program waits for the thread to finish before exiting.
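```python
import threading
import time

def task(arg):
    time.sleep(5)
    print('Task')

t = threading.Thread(target=task, args=(11,))
t.daemon = True  # True/False; replaces the deprecated t.setDaemon(True)
t.start()
print('END')  # with daemon=True, 'Task' is never printed: the program exits right after 'END'
```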
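The daemon flag can also be passed directly to the constructor, as in this sketch where an endless heartbeat loop (a hypothetical background job) would otherwise prevent the program from ever exiting. The threading documentation warns that daemon threads are stopped abruptly at shutdown, so resources such as open files may not be released properly.

```python
import threading
import time

def heartbeat():
    # Endless background loop: as a non-daemon thread it would keep the program alive forever
    while True:
        time.sleep(0.1)

# daemon=True in the constructor is equivalent to setting t.daemon = True before start()
t = threading.Thread(target=heartbeat, daemon=True)
t.start()
print(t.daemon)  # True
print('END')     # the interpreter can exit now; the daemon thread is stopped abruptly
```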
- Setting and getting thread names
```python
import threading

def task(arg):
    # Get the thread that is executing this code
    name = threading.current_thread().name  # replaces the deprecated getName()
    print(name)

for i in range(10):
    t = threading.Thread(target=task, args=(11,))
    t.name = 'Day Demon-{}'.format(i)  # replaces the deprecated t.setName(...)
    t.start()
```
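Instead of assigning the name after construction, the name can be passed to the Thread constructor. This sketch records the name seen inside each thread in a list guarded by a lock, so the names can be checked after join(); the list and the lock are illustrative additions.

```python
import threading

seen = []
seen_lock = threading.Lock()

def task(arg):
    # current_thread().name is the name given to the constructor below
    name = threading.current_thread().name
    with seen_lock:
        seen.append(name)

threads = [
    threading.Thread(target=task, args=(11,), name='Day Demon-{}'.format(i))
    for i in range(10)
]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(sorted(seen))
```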
- Custom thread class: subclass threading.Thread and write the thread's work directly in the run() method.
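```python
import threading

class MyThread(threading.Thread):
    def run(self):
        # self._args holds the args tuple passed to the constructor (an internal, undocumented attribute)
        print('Executing this thread', self._args)

t = MyThread(args=(100,))
t.start()
```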
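The MyThread example reads self._args, which is an internal attribute of Thread. A common alternative, sketched here under the assumption that the thread should also hand back a result, is to override __init__ and store the inputs and the result on the instance. The class name SumThread and its attributes are hypothetical. Per the threading documentation, a subclass that overrides the constructor must call Thread.__init__() before doing anything else to the thread.

```python
import threading

class SumThread(threading.Thread):
    """Hypothetical Thread subclass that sums a range and keeps the result."""

    def __init__(self, start_value, end_value):
        super().__init__()  # must run before anything else touches the thread
        self.start_value = start_value  # avoid naming this self.start: it would shadow start()
        self.end_value = end_value
        self.result = None

    def run(self):
        # Executes in the new thread once start() is called
        self.result = sum(range(self.start_value, self.end_value))

t = SumThread(0, 100)
t.start()
t.join()
print(t.result)  # 4950
```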
4. Thread Locks
When multiple threads operate on a shared global variable, one thread may be switched off the CPU partway through a multi-step update, before it has written its final result. Another thread then runs and modifies the same variable, and the interleaved updates can corrupt the data.
```python
import threading

balance = 0

def change_it(n):
    # Deposit first, then withdraw: the result should be 0
    global balance
    balance = balance + n
    balance = balance - n

def run_thread(n):
    for i in range(1000):
        change_it(n)

# Repeat the experiment 100 times
for i in range(100):
    t1 = threading.Thread(target=run_thread, args=(5,))
    t2 = threading.Thread(target=run_thread, args=(8,))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print(balance)  # without a lock, this is not guaranteed to stay 0
```
The same experiment with a lock protecting the updates:

```python
import threading

balance = 0
lock = threading.Lock()

def change_it(n):
    # Deposit first, then withdraw: the result should be 0
    global balance
    # Acquire the lock so only one thread at a time runs the two updates below
    lock.acquire()
    balance = balance + n
    balance = balance - n
    # Release the lock, allowing the next thread to proceed
    lock.release()

def run_thread(n):
    for i in range(1000):
        change_it(n)

# Repeat the experiment 100 times
for i in range(100):
    t1 = threading.Thread(target=run_thread, args=(5,))
    t2 = threading.Thread(target=run_thread, args=(8,))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print(balance)
```
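The acquire()/release() pair can also be written as a with statement, which releases the lock even if the protected code raises an exception. A minimal sketch of the deposit/withdraw experiment in that form, run once rather than 100 times:

```python
import threading

balance = 0
lock = threading.Lock()

def change_it(n):
    global balance
    # "with lock" acquires the lock on entry and always releases it on exit,
    # even if an exception is raised inside the block
    with lock:
        balance = balance + n
        balance = balance - n

def run_thread(n):
    for _ in range(1000):
        change_it(n)

threads = [threading.Thread(target=run_thread, args=(n,)) for n in (5, 8)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(balance)  # 0
```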
If you want to add locking manually, the threading module offers two main kinds of lock: Lock and RLock.
- Lock: a synchronization lock. It does not support nesting: a thread that already holds it cannot acquire it again.
```python
import threading

num = 0
lock_object = threading.Lock()

def task():
    print("Starting")
    lock_object.acquire()  # The first thread to arrive enters and locks, other threads must wait.
    global num
    for i in range(1000000):
        num += 1
    lock_object.release()  # The thread exits and unlocks, allowing other threads to enter and execute.
    print(num)

for i in range(2):
    t = threading.Thread(target=task)
    t.start()
```
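- RLock: a recursive (reentrant) lock. It supports nesting: the thread that holds it can acquire it again, and must release it once per acquisition.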
```python
import threading

num = 0
lock_object = threading.RLock()

def task():
    print("Starting")
    lock_object.acquire()  # The first thread to arrive enters and locks, other threads must wait.
    global num
    for i in range(1000000):
        num += 1
    lock_object.release()  # The thread exits and unlocks, allowing other threads to enter and execute.
    print(num)

for i in range(2):
    t = threading.Thread(target=task)
    t.start()
```
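An RLock can be acquired multiple times by the thread that holds it (and must be released the same number of times); a Lock cannot. This matters when lock-protected functions call one another: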
```python
import threading

lock = threading.RLock()

# Developer A wrote a function that other developers can call; it uses the lock to keep data safe.
def func():
    with lock:
        pass

# Developer B wrote a function that simply calls func().
def run():
    print("Other functionality")
    func()  # Calls Developer A's function, which takes the lock internally.
    print("Other functionality")

# Developer C wrote a function that takes the lock itself and also calls func().
def process():
    with lock:
        print("Other functionality")
        func()  # The lock is acquired again while already held: fine with RLock, a deadlock with Lock.

process()
```
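The difference can be checked without actually deadlocking by using acquire(blocking=False), which returns True if the lock was obtained and False otherwise instead of waiting. The helper try_reacquire is hypothetical.

```python
import threading

def try_reacquire(lock):
    """Acquire the lock, then try to acquire it again from the same thread without blocking."""
    lock.acquire()
    try:
        # blocking=False returns immediately: True if the lock was acquired, False otherwise
        second = lock.acquire(blocking=False)
        if second:
            lock.release()  # undo the second acquisition
        return second
    finally:
        lock.release()  # undo the first acquisition

print(try_reacquire(threading.Lock()))   # False: a blocking second acquire would deadlock
print(try_reacquire(threading.RLock()))  # True: the owning thread may re-acquire an RLock
```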
5. Queue
A queue (queue.Queue) can be seen as a simpler alternative to event and condition synchronization: thread A puts data into the queue and thread B takes data out, and the queue handles the locking internally, so neither thread has to. The main methods are:
- put(): adds an item to the queue (blocking if a bounded queue is full).
- get(): removes and returns an item from the queue, blocking by default until one is available.
- task_done(): called by a consumer after it finishes processing an item obtained with get(), to tell the queue that this item is complete.
- join(): blocks until every item that has been put() into the queue has been marked complete with task_done().
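A minimal sketch of how task_done() and join() cooperate, assuming a single worker thread that doubles each number; the worker function and the processed list are illustrative.

```python
import queue
import threading

q = queue.Queue()
processed = []

def worker():
    while True:
        item = q.get()              # blocks until an item is available
        processed.append(item * 2)
        q.task_done()               # mark this item as finished

# daemon=True so the endless worker loop does not keep the program alive
threading.Thread(target=worker, daemon=True).start()

for i in range(5):
    q.put(i)

q.join()           # returns only after task_done() has been called once per put()
print(processed)   # [0, 2, 4, 6, 8]
```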
```python
import threading
import queue
import random
import time

class Producer(threading.Thread):
    def __init__(self, q):
        super().__init__()
        self.queue = q

    def run(self):
        while True:
            integer = random.randint(0, 1000)
            self.queue.put(integer)
            print('%d put to queue by %s' % (integer, self.name))
            time.sleep(1)

class Consumer(threading.Thread):
    def __init__(self, q):
        super().__init__()
        self.queue = q

    def run(self):
        while True:
            integer = self.queue.get()
            print('%d popped from queue by %s' % (integer, self.name))
            self.queue.task_done()

if __name__ == '__main__':
    q = queue.Queue()  # named q so it does not shadow the queue module
    t1 = Producer(q)
    t2 = Consumer(q)
    t1.start()
    t2.start()
    # Both run() loops never end, so these join() calls never return
    t1.join()
    t2.join()
```
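The producer/consumer example above never terminates. One common way to stop it is sketched here under stated assumptions: the producer sends a fixed number of items followed by a sentinel value (None, assumed never to be a real item), and the queue is bounded with maxsize so a fast producer cannot run arbitrarily far ahead of the consumer. The function names are hypothetical.

```python
import queue
import random
import threading

SENTINEL = None  # assumption: None is never a real item, so it can mean "no more data"

def producer(q, count):
    for _ in range(count):
        q.put(random.randint(0, 1000))  # blocks while the bounded queue is full
    q.put(SENTINEL)                     # tell the consumer to stop

def consumer(q, out):
    while True:
        item = q.get()
        if item is SENTINEL:
            q.task_done()
            break
        out.append(item)
        q.task_done()

q = queue.Queue(maxsize=10)  # bounded: put() blocks when 10 items are waiting
received = []
p = threading.Thread(target=producer, args=(q, 20))
c = threading.Thread(target=consumer, args=(q, received))
p.start()
c.start()
p.join()
c.join()
print(len(received))  # 20
```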