线程池控制
Python助手,用于限制科学计算和数据科学常用的原生库(如BLAS和OpenMP)中基于线程池的线程数量。
在涉及嵌套并行的工作负载中,对底层线程池大小的精细控制可以有效缓解过度订阅问题。
安装
-
对于用户,从PyPI安装最新发布版本:
pip install threadpoolctl
-
对于贡献者,以开发模式从源代码仓库安装:
pip install -r dev-requirements.txt flit install --symlink
然后您可以使用pytest运行测试:
pytest
使用方法
命令行界面
获取导入Python包(如numpy或scipy)时初始化的线程池的JSON描述:
python -m threadpoolctl -i numpy scipy.linalg
[
{
"filepath": "/home/ogrisel/miniconda3/envs/tmp/lib/libmkl_rt.so",
"prefix": "libmkl_rt",
"user_api": "blas",
"internal_api": "mkl",
"version": "2019.0.4",
"num_threads": 2,
"threading_layer": "intel"
},
{
"filepath": "/home/ogrisel/miniconda3/envs/tmp/lib/libiomp5.so",
"prefix": "libiomp",
"user_api": "openmp",
"internal_api": "openmp",
"version": null,
"num_threads": 4
}
]
JSON信息将写入STDOUT。如果缺少某些包,将在STDERR上显示警告消息。
Python运行时程序化检查
检查导入Python包时加载的支持线程池的运行时库的当前状态:
>>> from threadpoolctl import threadpool_info
>>> from pprint import pprint
>>> pprint(threadpool_info())
[]
>>> import numpy
>>> pprint(threadpool_info())
[{'filepath': '/home/ogrisel/miniconda3/envs/tmp/lib/libmkl_rt.so',
'internal_api': 'mkl',
'num_threads': 2,
'prefix': 'libmkl_rt',
'threading_layer': 'intel',
'user_api': 'blas',
'version': '2019.0.4'},
{'filepath': '/home/ogrisel/miniconda3/envs/tmp/lib/libiomp5.so',
'internal_api': 'openmp',
'num_threads': 4,
'prefix': 'libiomp',
'user_api': 'openmp',
'version': None}]
>>> import xgboost
>>> pprint(threadpool_info())
[{'filepath': '/home/ogrisel/miniconda3/envs/tmp/lib/libmkl_rt.so',
'internal_api': 'mkl',
'num_threads': 2,
'prefix': 'libmkl_rt',
'threading_layer': 'intel',
'user_api': 'blas',
'version': '2019.0.4'},
{'filepath': '/home/ogrisel/miniconda3/envs/tmp/lib/libiomp5.so',
'internal_api': 'openmp',
'num_threads': 4,
'prefix': 'libiomp',
'user_api': 'openmp',
'version': None},
{'filepath': '/home/ogrisel/miniconda3/envs/tmp/lib/libgomp.so.1.0.0',
'internal_api': 'openmp',
'num_threads': 4,
'prefix': 'libgomp',
'user_api': 'openmp',
'version': None}]
在上面的例子中,numpy
是从默认的anaconda通道安装的,它带有MKL及其Intel OpenMP(libiomp5
)实现,而xgboost
是从pypi.org安装的,它链接到GNU OpenMP(libgomp
),因此这两个OpenMP运行时都加载在同一个Python程序中。
这些库的状态也可以通过面向对象的API访问:
>>> from threadpoolctl import ThreadpoolController, threadpool_info
>>> from pprint import pprint
>>> import numpy
>>> controller = ThreadpoolController()
>>> pprint(controller.info())
[{'architecture': 'Haswell',
'filepath': '/home/jeremie/miniconda/envs/dev/lib/libopenblasp-r0.3.17.so',
'internal_api': 'openblas',
'num_threads': 4,
'prefix': 'libopenblas',
'threading_layer': 'pthreads',
'user_api': 'blas',
'version': '0.3.17'}]
>>> controller.info() == threadpool_info()
True
设置线程池的最大大小
控制Python程序特定部分中底层运行时库使用的线程数:
>>> from threadpoolctl import threadpool_limits
>>> import numpy as np
>>> with threadpool_limits(limits=1, user_api='blas'):
... # 在此块中,对BLAS实现(如openblas或MKL)的调用
... # 将被限制为仅使用一个线程。因此它们可以与线程并行性一起使用。
... a = np.random.randn(1000, 1000)
... a_squared = a @ a
线程池也可以通过面向对象的API进行控制,这在避免每次都搜索所有已加载的共享库时特别有用。但是,它不会作用于ThreadpoolController
实例化后加载的库:
>>> from threadpoolctl import ThreadpoolController
>>> import numpy as np
>>> controller = ThreadpoolController()
>>> with controller.limit(limits=1, user_api='blas'):
... a = np.random.randn(1000, 1000)
... a_squared = a @ a
将限制限定在函数作用域内
threadpool_limits
和ThreadpoolController
也可以用作装饰器,以在函数级别设置受支持库使用的最大线程数。这些装饰器可以通过它们的wrap
方法访问:
>>> from threadpoolctl import ThreadpoolController, threadpool_limits
>>> import numpy as np
>>> controller = ThreadpoolController()
>>> @controller.wrap(limits=1, user_api='blas')
... # 或 @threadpool_limits.wrap(limits=1, user_api='blas')
... def my_func():
... # 在此函数内,对BLAS实现(如openblas或MKL)的调用
... # 将被限制为仅使用一个线程。
... a = np.random.randn(1000, 1000)
... a_squared = a @ a
...
切换FlexiBLAS后端
FlexiBLAS
是一个BLAS包装器,可以在运行时切换BLAS后端。threadpoolctl
为此功能提供了Python绑定。以下是一个示例,但请注意,API的这部分是实验性的,可能会在没有弃用警告的情况下发生变化:
>>> from threadpoolctl import ThreadpoolController
>>> import numpy as np
>>> controller = ThreadpoolController()
>>> controller.info()
[{'user_api': 'blas',
'internal_api': 'flexiblas',
'num_threads': 1,
'prefix': 'libflexiblas',
'filepath': '/usr/local/lib/libflexiblas.so.3.3',
'version': '3.3.1',
'available_backends': ['NETLIB', 'OPENBLASPTHREAD', 'ATLAS'],
'loaded_backends': ['NETLIB'],
'current_backend': 'NETLIB'}]
# 获取flexiblas控制器
>>> flexiblas_ct = controller.select(internal_api="flexiblas").lib_controllers[0]
# 切换到构建时预定义的后端(列在"available_backends"中)
>>> flexiblas_ct.switch_backend("OPENBLASPTHREAD")
>>> controller.info()
[{'user_api': 'blas',
'internal_api': 'flexiblas',
'num_threads': 4,
'prefix': 'libflexiblas',
'filepath': '/usr/local/lib/libflexiblas.so.3.3',
'version': '3.3.1',
'available_backends': ['NETLIB', 'OPENBLASPTHREAD', 'ATLAS'],
'loaded_backends': ['NETLIB', 'OPENBLASPTHREAD'],
'current_backend': 'OPENBLASPTHREAD'},
{'user_api': 'blas',
'internal_api': 'openblas',
'num_threads': 4,
'prefix': 'libopenblas',
'filepath': '/usr/lib/x86_64-linux-gnu/openblas-pthread/libopenblasp-r0.3.8.so',
'version': '0.3.8',
'threading_layer': 'pthreads',
'architecture': 'Haswell'}]
还可以直接提供共享库的路径
flexiblas_controller.switch_backend("/home/jeremie/miniforge/envs/flexiblas_threadpoolctl/lib/libmkl_rt.so") controller.info() [{'user_api': 'blas', 'internal_api': 'flexiblas', 'num_threads': 2, 'prefix': 'libflexiblas', 'filepath': '/usr/local/lib/libflexiblas.so.3.3', 'version': '3.3.1', 'available_backends': ['NETLIB', 'OPENBLASPTHREAD', 'ATLAS'], 'loaded_backends': ['NETLIB', 'OPENBLASPTHREAD', '/home/jeremie/miniforge/envs/flexiblas_threadpoolctl/lib/libmkl_rt.so'], 'current_backend': '/home/jeremie/miniforge/envs/flexiblas_threadpoolctl/lib/libmkl_rt.so'}, {'user_api': 'openmp', 'internal_api': 'openmp', 'num_threads': 4, 'prefix': 'libomp', 'filepath': '/home/jeremie/miniforge/envs/flexiblas_threadpoolctl/lib/libomp.so', 'version': None}, {'user_api': 'blas', 'internal_api': 'openblas', 'num_threads': 4, 'prefix': 'libopenblas', 'filepath': '/usr/lib/x86_64-linux-gnu/openblas-pthread/libopenblasp-r0.3.8.so', 'version': '0.3.8', 'threading_layer': 'pthreads', 'architecture': 'Haswell'}, {'user_api': 'blas', 'internal_api': 'mkl', 'num_threads': 2, 'prefix': 'libmkl_rt', 'filepath': '/home/jeremie/miniforge/envs/flexiblas_threadpoolctl/lib/libmkl_rt.so.2', 'version': '2024.0-Product', 'threading_layer': 'gnu'}]
你可以观察到,之前链接的OpenBLAS共享对象会一直被Python程序加载,但FlexiBLAS自身不再将BLAS调用委托给OpenBLAS,这一点由`current_backend`属性显示。
### 编写自定义库控制器
目前,`threadpoolctl`支持`OpenMP`和主要的`BLAS`库。然而,它也可以用于控制其他原生库的线程池,只要这些库暴露了获取和设置线程数限制的API。为此,必须为这个库实现一个控制器并将其注册到`threadpoolctl`。
自定义控制器必须是`LibController`类的子类,并实现`LibController`文档字符串中描述的属性和方法。然后,必须使用`threadpoolctl.register`函数注册这个新的控制器类。完整示例可以在[这里](https://github.com/joblib/threadpoolctl/blob/master/tests/_pyMylib/__init__.py)找到。
### OpenMP并行区域内的顺序BLAS
当想在OpenMP并行区域内进行顺序BLAS调用时,设置`limits="sequential_blas_under_openmp"`会更安全,因为设置`limits=1`和`user_api="blas"`在某些配置下可能不会导致预期的行为(例如,使用OpenMP线程层的OpenBLAS https://github.com/xianyi/OpenBLAS/issues/2985)。
### 已知限制
- 当嵌套由不同OpenMP运行时实现管理的并行循环时(例如GCC的libgomp和clang/llvm的libomp或ICC的libiomp),`threadpool_limits`可能无法限制内部线程的数量。
参见[tests/test_threadpoolctl.py](https://github.com/joblib/threadpoolctl/blob/master/tests/test_threadpoolctl.py)中的`test_openmp_nesting`函数以获取示例。更多信息可以在以下网址找到:
https://github.com/jeremiedbb/Nested_OpenMP
但是请注意,当`threadpool_limits`用于限制BLAS调用内部使用的线程数时,这个问题不会发生,即使在OpenMP并行循环下嵌套BLAS调用。`threadpool_limits`能按预期工作,即使内部BLAS实现依赖于不同的OpenMP实现。
- 在同一Python程序中使用Intel OpenMP(ICC)和LLVM OpenMP(clang)在Linux下已知会导致问题。有关更多详细信息和解决方法,请参阅以下指南:
https://github.com/joblib/threadpoolctl/blob/master/multiple_openmp.md
- 设置OpenMP和BLAS库的最大线程数具有全局效果,会影响整个Python进程。没有线程级别的隔离,因为这些库不提供线程本地API来配置嵌套并行调用中使用的线程数。
## 维护者
要进行发布:
- 在`threadpoolctl.py`中更新版本号(`__version__`),并在`CHANGES.md`中更新发布日期。
- 构建分发档案:
```bash
pip install flit
flit build
并检查dist/
的内容。
- 如果一切正常,为发布创建一个提交,打上标签并将标签推送到github:
git tag -a X.Y.Z
git push git@github.com:joblib/threadpoolctl.git X.Y.Z
- 使用flit将轮子和源分发上传到PyPI。由于PyPI不再允许密码认证,用户名需要更改为通用名称
__token__
:
FLIT_USERNAME=__token__ flit publish
并且需要传递PyPI令牌来代替密码。
-
为发布在conda-forge feedstock上创建一个PR(或等待机器人来做)。
-
在github上发布该版本。
致谢
初始动态库检查代码由@anton-malakhov为smp包编写,可在https://github.com/IntelPython/smp 获取。
threadpoolctl将其扩展到其他操作系统。与smp不同,threadpoolctl不尝试限制Python多进程池(线程或进程)的大小或设置操作系统级CPU亲和性约束:threadpoolctl仅通过原生库的公共运行时API与它们交互。