python 包之 multiprocessing 多进程

  

Python 包之 multiprocessing 多进程

multiprocessing 是 Python 标准库中提供的模块,可以方便地使用多进程进行并发编程。它提供了与 Python 标准库 threading 模块相同的接口,但是使用多进程编程可以充分利用多核 CPU 的优势,用于加速 CPU 密集型任务。

multiprocessing 模块的主要组件

  • Process:进程对象,用于创建新进程。

  • Queue:进程间通信(IPC)的队列对象,用于在多个进程之间安全地共享数据。

  • Pool:进程池对象,用于管理池中的多个 Worker 进程,执行一组对数据集的并行操作。

  • LockRLockSemaphore:进程锁,用于控制多个进程对共享资源的访问。

创建新进程

multiprocessing.Process 可以用于创建新的进程,常用的方式有如下两种:

方式一:函数形式

import multiprocessing

def work(name):
    print(f"Working on {name}")

if __name__ == '__main__':
    p = multiprocessing.Process(target=work, args=('Alice',))
    p.start()
    p.join()

在上面的代码中,我们通过 multiprocessing.Process() 函数创建了一个新进程,并将 work() 函数作为任务传递给了该进程。start() 方法用于启动该进程,并且 join() 方法用于等待该进程完成。在运行该代码时,可以看到类似于如下输出:

Working on Alice

方式二:面向对象式

import multiprocessing

class Worker(multiprocessing.Process):
    def __init__(self, name):
        super().__init__()
        self.name = name

    def run(self):
        print(f"Working on {self.name}")

if __name__ == '__main__':
    p = Worker('Bob')
    p.start()
    p.join()

在上面的代码中,我们通过继承 multiprocessing.Process 类创建了一个新的进程对象,并覆盖了 run() 方法,该方法内部执行我们的工作。在运行该代码时,可以看到类似于如下输出:

Working on Bob

进程间通信

多个进程之间需要共享数据或结果时,可以使用 multiprocessing.Queue 类来实现进程间通信(IPC)。以下是一个示例,演示了两个进程之间如何实现数据的共享。

import multiprocessing

def producer(queue):
    for i in range(5):
        print(f'Producing {i}')
        queue.put(i)

def consumer(queue):
    while True:
        data = queue.get()
        if data is None:
            break
        print(f'Consuming {data}')

if __name__ == '__main__':
    queue = multiprocessing.Queue()
    p1 = multiprocessing.Process(target=producer, args=(queue,))
    p2 = multiprocessing.Process(target=consumer, args=(queue,))
    p1.start()
    p2.start()
    p1.join()
    queue.put(None)
    p2.join()

在上面的代码中,我们创建了一个 multiprocessing.Queue 对象,并将其传递给了两个进程函数进行共享。producer 进程函数用于生成数据并将其放入队列中,consumer 进程函数负责不断地取出队列中的数据并进行消费。

在执行该代码时,可以看到类似于如下输出:

Producing 0
Consuming 0
Producing 1
Consuming 1
Producing 2
Consuming 2
Producing 3
Consuming 3
Producing 4
Consuming 4

进程池

multiprocessing.Pool 类可以用于创建一个进程池,执行一组对数据集的并行操作。以下是一个示例,演示了如何使用 Pool 实现对列表中数字的并行求平方。

import multiprocessing

def square(x):
    return x*x

if __name__ == '__main__':
    pool = multiprocessing.Pool()
    result = pool.map(square, [1, 2, 3, 4, 5])
    print(result)

在上面的代码中,我们创建了一个进程池,并使用 map() 方法实现对列表中数字的并行求平方。在运行该代码时,可以看到如下输出:

[1, 4, 9, 16, 25]

另外一个示例,演示了如何使用 Pool 实现对多个文件的并行读取和处理。

import multiprocessing
import os

def count_lines(filename):
    with open(filename, 'r') as f:
        lines = f.readlines()
    return len(lines)

if __name__ == '__main__':
    pool = multiprocessing.Pool()
    folder_path = './files'
    filenames = [os.path.join(folder_path, f) for f in os.listdir(folder_path)]
    results = pool.map(count_lines, filenames)
    total_lines = sum(results)
    print(f'Total number of lines in {len(filenames)} files: {total_lines}')

在上面的代码中,我们创建了一个进程池,并使用 map() 方法实现对多个文件的并行读取和处理。其中,count_lines() 函数用于读取文件并返回其行数。在运行该代码时,需要先准备好指定路径下的多个文件,然后可以看到如下输出:

Total number of lines in 3 files: 15

进程锁

在多个进程共享同一份数据时,可能会出现多个进程同时读写该数据的情况,如果没有加锁保护,可能会导致数据的错误和不可预期的结果。可以使用 multiprocessing.Lockmultiprocessing.RLockmultiprocessing.Semaphore 等类来实现进程锁的功能。

以下是一个示例,演示了如何使用 Lock 类来保证多个进程安全地访问同一份数据。

import multiprocessing

def deposit(balance, lock):
    for i in range(10000):
        lock.acquire()
        balance.value += 1
        lock.release()

def withdraw(balance, lock):
    for i in range(10000):
        lock.acquire()
        balance.value -= 1
        lock.release()

if __name__ == '__main__':
    balance = multiprocessing.Value('i', 0)
    lock = multiprocessing.Lock()

    d = multiprocessing.Process(target=deposit, args=(balance, lock))
    w = multiprocessing.Process(target=withdraw, args=(balance, lock))

    d.start()
    w.start()
    d.join()
    w.join()

    print(balance.value)

在上面的代码中,我们创建了一个共享变量 balance,并分别创建了存款和取款两个进程,它们会在 10000 次循环中反复对 balance 进行加减操作。为了避免多个进程同时访问同一份数据,我们使用 multiprocessing.Lock 类来对访问 balance 的进程进行加锁保护。在运行该代码时,可以看到如下输出:

0

可以看到,经过了 20000 次加减操作,最终 balance 的值保持不变,证明了我们的加锁保护起了作用。

总结

在本篇文章中,我们介绍了 Python 标准库 multiprocessing 模块的主要组件,包括创建新进程、进程间通信、进程池、进程锁等。同时,我们也给出了多个示例,演示了如何使用 multiprocessing 进行并发编程的实际应用。

相关文章