跳转至

Python 多线程处理列表

约 34 个字 198 行代码 预计阅读时间 3 分钟

写装饰器

from concurrent.futures import ThreadPoolExecutor, as_completed
from functools import wraps
from typing import Callable, TypeVar, Collection

from tqdm import tqdm


T = TypeVar('T')
R = TypeVar('R')

def threaded_map(desc="Processing", unit="item(s)", max_workers=None):
    """
    创建一个装饰器,用于并行处理项目列表并在处理时显示进度条。不保证顺序。

    :param desc: 进度条显示的描述文本,默认为"Processing"
    :param unit: 进度条上显示的单位,默认为"item(s)"
    :param max_workers: 线程池中的最大线程数,默认为None(由ThreadPoolExecutor决定)

    :return function: 一个装饰器函数,可用于包装需要并行处理的函数
    """
    def decorator(func):
        @wraps(func)
        def wrapper(items, *args, **kwargs):
            with ThreadPoolExecutor(max_workers=max_workers) as executor:
                # 提交任务
                futures = {executor.submit(func, item, *args, **kwargs): item
                        for item in items}

                # 进度条处理
                for future in tqdm(as_completed(futures),
                                total=len(items),
                                desc=desc,
                                unit=unit):
                    future.result()  # 可获取返回值或处理异常

        return wrapper

    return decorator

def threaded_map_list(
    desc: str = "Processing...",
    unit: str = 'item(s)',
    ensure_order: bool = True,
    max_workers: int | None = None
) -> Callable[[Callable[[T], R]], Callable[[Collection[T]], list[R]]]:
    """
    通过多线程执行方式,对集合中的每个元素应用给定的函数,并返回结果列表。

    使用方式:

    @threaded_map_list()
    def test_func(list_item):
        return list_item * 2

    l = range(100000)
    print(test_func(l)[0])

    :param desc: 进度条的描述文本。
    :param unit: 进度条中处理单位的字符串表示。
    :param ensure_order: 是否保持结果顺序与输入集合顺序一致。
    :param max_workers: 最大工作线程数,如果为 None,则使用默认值。
    :return 一个装饰器,用于装饰接受单个参数并返回结果的函数。装饰后的函数将接受一个集合作为参数,并返回一个包含每个元素处理结果的列表。
    """
    def decorator(func: Callable[[T], R]) -> Callable[[Collection[T]], list[R]]:
        @wraps(func)
        def wrapper(in_list: Collection[T]) -> list[R]:
            out_list = [None] * len(in_list)
            with ThreadPoolExecutor(max_workers=max_workers) as executor:
                if ensure_order:
                    futures = {
                        executor.submit(func, item): idx
                        for idx, item in enumerate(in_list)
                    }
                else:
                    futures = {
                        executor.submit(func, item): item
                        for item in in_list
                    }

                for future in tqdm(as_completed(futures), total=len(futures), desc=desc, unit=unit):
                    result = future.result()
                    out_list[futures[future]] = result  # 按索引存储结果

            return out_list
        return wrapper
    return decorator
from concurrent.futures import ThreadPoolExecutor, as_completed
from functools import wraps
from typing import Callable, TypeVar, Collection


T = TypeVar('T')
R = TypeVar('R')

def threaded_map_simple(max_workers=None):
    """
    创建一个简单的装饰器,用于并行处理项目列表。不保证顺序。

    :param max_workers: 线程池中的最大线程数,默认为 None(由ThreadPoolExecutor决定)

    :return function: 一个装饰器函数,可用于包装需要并行处理的函数
    """
    def decorator(func):
        @wraps(func)
        def wrapper(items, *args, **kwargs):
            with ThreadPoolExecutor(max_workers=max_workers) as executor:
                # 提交任务
                futures = {executor.submit(func, item, *args, **kwargs): item
                        for item in items}

                # 简单处理,不显示进度条
                for future in as_completed(futures):
                    future.result()  # 可获取返回值或处理异常

        return wrapper

    return decorator


def threaded_map_list_simple(
    ensure_order: bool = True,
    max_workers: int | None = None
) -> Callable[[Callable[[T], R]], Callable[[Collection[T]], list[R]]]:
    """
    通过多线程执行方式,对集合中的每个元素应用给定的函数,并返回结果列表。
    不显示进度条。

    使用方式:

    @threaded_map_list_simple()
    def test_func(list_item):
        return list_item * 2

    l = range(100000)
    print(test_func(l)[0])

    :param ensure_order: 是否保持结果顺序与输入集合顺序一致。
    :param max_workers: 最大工作线程数,如果为None,则使用默认值。
    :return 一个装饰器,用于装饰接受单个参数并返回结果的函数。装饰后的函数将接受一个集合作为参数,并返回一个包含每个元素处理结果的列表。
    """
    def decorator(func: Callable[[T], R]) -> Callable[[Collection[T]], list[R]]:
        @wraps(func)
        def wrapper(in_list: Collection[T]) -> list[R]:
            out_list = [None] * len(in_list)
            with ThreadPoolExecutor(max_workers=max_workers) as executor:
                if ensure_order:
                    futures = {
                        executor.submit(func, item): idx
                        for idx, item in enumerate(in_list)
                    }
                else:
                    futures = {
                        executor.submit(func, item): item
                        for item in in_list
                    }

                # 不使用进度条,直接处理完成的任务
                for future in as_completed(futures):
                    result = future.result()
                    out_list[futures[future]] = result  # 按索引存储结果

            return out_list
        return wrapper
    return decorator

使用方法

in_list: list[in_item_type] = ...

@threaded_map(desc="Task", unit='item(s)')
def task_map(in_item: in_item_type):
    # in_item 为列表子项
    # 不输出值,但可以操作 in_list 内各项

task_map(in_list)


@threaded_map_list(desc="Load results", unit='item(s)')
def map_list(in_item: in_item_type) -> out_item_type:
    # in_item 为列表子项
    # ...
    return out_item

out_list: list[out_item_type] = map_list(in_list)
in_list: list[in_item_type] = ...

@threaded_map_simple()
def task_map(in_item: in_item_type):
    # in_item 为列表子项
    # 不输出值,但可以操作 in_list 内各项

task_map(in_list)


@threaded_map_simple()
def map_list(in_item: in_item_type) -> out_item_type:
    # in_item 为列表子项
    # ...
    return out_item

out_list: list[out_item_type] = map_list(in_list)