python多进程操作实例

  

Python 多进程操作实例攻略

Python 多进程是一种常用的处理大量数据和计算密集型任务的方式,它可以充分利用 CPU 的多核心特性,提高程序的执行效率。本文将介绍如何使用 Python 实现多进程操作,并提供两个简单的示例说明。

使用 multiprocessing 模块

Python 提供了一个名为 multiprocessing 的内置模块,它可以帮助你快速实现多进程操作。使用 multiprocessing 可以创建新进程、管理进程、共享数据等等。

创建新进程

在 Python 中,可以使用 Process 类来创建新的进程。以下是一个简单的示例:

from multiprocessing import Process

def worker():
    """子进程要执行的任务"""
    print("This is a new process.")

if __name__ == '__main__':
    p = Process(target=worker)
    p.start()

上述代码中,我们首先导入了 Process 类,创建了一个函数 worker 作为子进程要执行的任务。然后在主进程中,我们创建了一个 Process 实例 p,指定了要执行的任务为 worker。最后,我们通过调用 p.start() 来启动新的进程。

管理进程

在使用 multiprocessing 时,可以使用 Pool 类来管理进程池。一个进程池中通常包含多个进程,可以在该进程池中同时运行多个子进程。

以下是一个示例,其中我们使用 Pool 类来处理并行任务:

from multiprocessing import Pool

def worker(num):
    """子进程要执行的任务"""
    return num**2

if __name__ == '__main__':
    # 创建进程池,拥有 4 个进程
    pool = Pool(processes=4)

    # 带入多个参数并行执行任务
    results = pool.map(worker, [1, 2, 3, 4, 5])
    print(results)

上述代码中,我们创建了一个 Pool 实例 pool,指定了该进程池中要运行 4 个进程。接着,我们使用 map 函数将任务分配给多个进程,其中 [1, 2, 3, 4, 5] 是带入函数 worker 的参数,它们会被依次传入子进程执行,最终返回的结果被保存在 results 中,打印输出即可看到结果。

共享数据

在使用多进程时,往往需要利用共享内存来保存数据。Python 提供了 ValueArray 两个类实现不同的数据类型的共享数据。

以下是一个使用 Value 类来共享数据的示例:

from multiprocessing import Process, Value

def worker(myvalue):
    """子进程要执行的任务"""
    myvalue.value += 1
    print(f"Process {myvalue.value}")

if __name__ == '__main__':
    myvalue = Value('i', 0) # 创建共享数值对象,初始值为 0
    processes = [Process(target=worker, args=(myvalue,)) for i in range(10)] # 创建多个进程
    for p in processes:
        p.start()
    for p in processes:
        p.join()

上述代码中,我们首先创建了一个 Value 类对象 myvalue,它的数据类型为整数,初始化为 0。然后我们创建了 10 个进程,每个进程都会对 myvalue 的值进行加 1 操作,并输出当前的数值。

示例1:计算质数

以下代码是一个简单的示例,使用多进程计算从 1 到 100 之间所有的质数。

import math
from multiprocessing import Process, Queue

def is_prime(num):
    """判断一个数是否为质数"""
    if num <= 1:
        return False
    for i in range(2, int(math.sqrt(num))+1):
        if num % i == 0:
            return False
    return True

def worker(start, end, queue):
    """子进程要执行的任务"""
    primes = [num for num in range(start, end+1) if is_prime(num)]
    queue.put(primes)

if __name__ == '__main__':
    queue = Queue() # 创建队列,用于存放结果
    processes = [Process(target=worker, args=(i*10+1, i*10+10, queue)) for i in range(10)] # 创建多个进程
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    results = []
    while not queue.empty():
        results.extend(queue.get())
    results.sort()
    print(results)

在此示例中,我们首先定义了一个函数 is_prime,用于判断一个数是否为质数。然后我们创建了多个子进程,每个子进程会计算一段范围内的质数,将结果存入一个队列中。最后我们使用主进程读取队列中的结果,并排序输出。

示例2:爬取多条路线公交车站信息

以下代码是一个简单的示例,使用多进程爬取北京多条路线公交车站信息,并将结果保存至本地文件。

import requests
from multiprocessing import Process, Pool

def get_data(url, filename):
    """获取网页数据"""
    response = requests.get(url)
    with open(filename, 'w') as outfile:
        outfile.write(response.text)
    print(f"{url} saved to file {filename}.")

def worker(line):
    """子进程要执行的任务"""
    url = f"http://www.bjbus.com/home/ajax_rtbus_data.php?act=busTime&selBLine={line}&selBDir=0&selBStop=0"
    filename = f"{line}.html"
    get_data(url, filename)

if __name__ == '__main__':
    pool = Pool(processes=4) # 创建进程池,拥有 4 个进程
    lines = ['101', '103', '105', '107', '111', '116', '118', '127', '132', '359', '363'] # 待爬行的公交路线
    pool.map(worker, lines) # 使用进程池并行执行多个任务

在此示例中,我们首先定义了一个函数 get_data,用于获取一个网页的数据,并将其保存至本地文件。然后我们创建了一个进程池,指定其拥有 4 个进程。接着我们定义了一个列表 lines,它包含了要爬取的多个公交路线。最后我们使用进程池的 map 方法,将待执行的任务全部提交给多个进程进行并行处理。每个子进程将负责爬取一条公交路线的数据,并将结果保存至本地。

相关文章