Disabled external gits
This commit is contained in:
24
cs440-acg/ext/tbb/python/TBB.py
Normal file
24
cs440-acg/ext/tbb/python/TBB.py
Normal file
@@ -0,0 +1,24 @@
|
||||
#!/usr/bin/env python
|
||||
#
|
||||
# Copyright (c) 2016-2020 Intel Corporation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
|
||||
from tbb import *
|
||||
from tbb import __all__, __doc__
|
||||
|
||||
if __name__ == "__main__":
|
||||
from tbb import _main
|
||||
import sys
|
||||
sys.exit(_main())
|
84
cs440-acg/ext/tbb/python/index.html
Normal file
84
cs440-acg/ext/tbb/python/index.html
Normal file
@@ -0,0 +1,84 @@
|
||||
<HTML>
|
||||
<BODY>
|
||||
<H2>Python* API for Intel® Threading Building Blocks (Intel® TBB).
|
||||
</H2>
|
||||
|
||||
<H2>Overview</H2>
|
||||
It is a preview Python* module which unlocks opportunities for additional performance in multi-threaded and multiprocess Python programs by enabling threading composability
|
||||
between two or more thread-enabled libraries like Numpy, Scipy, Sklearn, Dask, Joblib, and etc.
|
||||
<p></p>
|
||||
The biggest improvement can be achieved when a task pool like the ThreadPool or Pool from the Python standard library or libraries like Dask or Joblib (used either in multi-threading or multi-processing mode)
|
||||
execute tasks calling compute-intensive functions of Numpy/Scipy/Sklearn/PyDAAL which in turn are parallelized using Intel® Math Kernel Library or/and Intel® TBB.
|
||||
<p></p>
|
||||
The module implements Pool class with the standard interface using Intel® TBB which can be used to replace Python's ThreadPool.
|
||||
Thanks to the monkey-patching technique implemented in class Monkey, no source code change is needed in order to enable threading composability in Python programs.
|
||||
<p></p>
|
||||
For more information and examples, please refer to <A HREF="http://software.intel.com/en-us/blogs/2016/04/04/unleash-parallel-performance-of-python-programs">online blog</A>.
|
||||
|
||||
<H2>Directories</H2>
|
||||
<DL>
|
||||
<DT><A HREF="rml">rml</A>
|
||||
<DD>The folder contains sources for building the plugin with cross-process dynamic thread scheduler implementation.
|
||||
<DT><A HREF="tbb">tbb</A>
|
||||
<DD>The folder contains Python module sources.
|
||||
</DL>
|
||||
|
||||
<H2>Files</H2>
|
||||
<DL>
|
||||
<DT><A HREF="setup.py">setup.py</A>
|
||||
<DD>Standard Python setup script.
|
||||
<DT><A HREF="Makefile">Makefile</A>
|
||||
<DD>Internal Makefile for building, installing, and testing. See below.
|
||||
<DT><A HREF="TBB.py">TBB.py</A>
|
||||
<DD>Alternative entry point for Python module.
|
||||
</DL>
|
||||
|
||||
<A NAME=build><H2>Build and install (source package only)</H2></A>
|
||||
For accessing targets defined in python/Makefile, please use
|
||||
<A HREF="../src/index.html">src/Makefile</A>
|
||||
instead and build runtime libraries before working with Python.
|
||||
<DL>
|
||||
<DT><TT>make -C ../src python_all</TT>
|
||||
<DD>Install and test as described below.
|
||||
<DT><TT>make -C ../src python_install</TT>
|
||||
<DD>Install module into Python environment.
|
||||
<DT><TT>make -C ../src python_test</TT>
|
||||
<DD>Test installed Intel® TBB module for Python.
|
||||
<DT><TT>make -C ../src python_release</TT>
|
||||
<DD>Recompile Python module. Result is located in Intel® TBB build directory.
|
||||
<DT><TT>make -C ../src python_clean</TT>
|
||||
<DD>Remove any intermediate files produced by the commands above. Does not remove installed module.
|
||||
</DL>
|
||||
|
||||
<H2>Command-line interface</H2>
|
||||
<DL>
|
||||
<DT><TT>python -m tbb -h</TT>
|
||||
<DD>Print documentation on command-line interface</DD>
|
||||
<DT><TT>pydoc tbb</TT>
|
||||
<DD>Read built-in documentation for Python interfaces.</DD>
|
||||
<DT><TT>python-tbb your_script.py</TT>
|
||||
<DT><TT>python -m tbb your_script.py</TT>
|
||||
<DD>Run your_script.py in context of `with tbb.Monkey():` when Intel® TBB is enabled. By default only multi-threading will be covered.</DD>
|
||||
<DT><TT>python -m tbb --ipc your_script.py</TT>
|
||||
<DD>Run your_script.py in context of `with tbb.Monkey():` when Intel® TBB enabled in both multi-threading and multi-processing modes.</DD>
|
||||
</DL>
|
||||
|
||||
<H2>System Requirements</H2>
|
||||
The Python module was not tested on older versions of Python thus we require at least Python versions 2.7 and 3.5 or higher.<BR>
|
||||
SWIG must be of version 3.0.6 or higher<BR>
|
||||
OS versions:
|
||||
Microsoft* Windows* Server 2012,
|
||||
Microsoft* Windows* 10,
|
||||
Ubuntu* 14.04 LTS,
|
||||
Red Hat* Enterprise Linux* 7.
|
||||
<HR>
|
||||
<A href="../index.html">Up to parent directory</A>
|
||||
<p></p>
|
||||
Copyright © 2016-2020 Intel Corporation. All Rights Reserved.
|
||||
<P></P>
|
||||
Intel is a registered trademark or trademark of Intel Corporation
|
||||
or its subsidiaries in the United States and other countries.
|
||||
<p></p>
|
||||
* Other names and brands may be claimed as the property of others.
|
||||
</BODY>
|
||||
</HTML>
|
1115
cs440-acg/ext/tbb/python/rml/ipc_server.cpp
Normal file
1115
cs440-acg/ext/tbb/python/rml/ipc_server.cpp
Normal file
File diff suppressed because it is too large
Load Diff
140
cs440-acg/ext/tbb/python/rml/ipc_utils.cpp
Normal file
140
cs440-acg/ext/tbb/python/rml/ipc_utils.cpp
Normal file
@@ -0,0 +1,140 @@
|
||||
/*
|
||||
Copyright (c) 2017-2020 Intel Corporation
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
#include "ipc_utils.h"
|
||||
|
||||
#include <stdlib.h>
|
||||
#include <stdio.h>
|
||||
#include <limits.h>
|
||||
#include <string.h>
|
||||
#include <unistd.h>
|
||||
|
||||
namespace tbb {
|
||||
namespace internal {
|
||||
namespace rml {
|
||||
|
||||
#define MAX_STR_LEN 255
|
||||
#define STARTTIME_ITEM_ID 21
|
||||
|
||||
static char* get_stat_item(char* line, int item_id) {
|
||||
int id = 0, i = 0;
|
||||
|
||||
while( id!=item_id ) {
|
||||
while( line[i]!='(' && line[i]!=' ' && line[i]!='\0' ) {
|
||||
++i;
|
||||
}
|
||||
if( line[i]==' ' ) {
|
||||
++id;
|
||||
++i;
|
||||
} else if( line[i]=='(' ) {
|
||||
while( line[i]!=')' && line[i]!='\0' ) {
|
||||
++i;
|
||||
}
|
||||
if( line[i]==')' ) {
|
||||
++i;
|
||||
} else {
|
||||
return NULL;
|
||||
}
|
||||
} else {
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
||||
return line + i;
|
||||
}
|
||||
|
||||
unsigned long long get_start_time(int pid) {
|
||||
const char* stat_file_path_template = "/proc/%d/stat";
|
||||
char stat_file_path[MAX_STR_LEN + 1];
|
||||
sprintf( stat_file_path, stat_file_path_template, pid );
|
||||
|
||||
FILE* stat_file = fopen( stat_file_path, "rt" );
|
||||
if( stat_file==NULL ) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
char stat_line[MAX_STR_LEN + 1];
|
||||
char* line = fgets( stat_line, MAX_STR_LEN, stat_file );
|
||||
if( line==NULL ) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
char* starttime_str = get_stat_item( stat_line, STARTTIME_ITEM_ID );
|
||||
if( starttime_str==NULL ) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
unsigned long long starttime = strtoull( starttime_str, NULL, 10 );
|
||||
if( starttime==ULLONG_MAX ) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
return starttime;
|
||||
}
|
||||
|
||||
char* get_shared_name(const char* prefix, int pid, unsigned long long time) {
|
||||
const char* name_template = "%s_%d_%llu";
|
||||
const int digits_in_int = 10;
|
||||
const int digits_in_long = 20;
|
||||
|
||||
int len = strlen( name_template ) + strlen( prefix ) + digits_in_int + digits_in_long + 1;
|
||||
char* name = new char[len];
|
||||
sprintf( name, name_template, prefix, pid, time );
|
||||
|
||||
return name;
|
||||
}
|
||||
|
||||
char* get_shared_name(const char* prefix) {
|
||||
int pid = getpgrp();
|
||||
unsigned long long time = get_start_time( pid );
|
||||
return get_shared_name( prefix, pid, time );
|
||||
}
|
||||
|
||||
int get_num_threads(const char* env_var) {
|
||||
if( env_var==NULL ) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
char* value = getenv( env_var );
|
||||
if( value==NULL ) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
int num_threads = (int)strtol( value, NULL, 10 );
|
||||
return num_threads;
|
||||
}
|
||||
|
||||
bool get_enable_flag(const char* env_var) {
|
||||
if( env_var==NULL ) {
|
||||
return false;
|
||||
}
|
||||
|
||||
char* value = getenv( env_var );
|
||||
if( value==NULL ) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if( strcmp( value, "0" ) == 0 ||
|
||||
strcmp( value, "false" ) == 0 ||
|
||||
strcmp( value, "False" ) == 0 ||
|
||||
strcmp( value, "FALSE" ) == 0 ) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
}}} //tbb::internal::rml
|
30
cs440-acg/ext/tbb/python/rml/ipc_utils.h
Normal file
30
cs440-acg/ext/tbb/python/rml/ipc_utils.h
Normal file
@@ -0,0 +1,30 @@
|
||||
/*
|
||||
Copyright (c) 2017-2020 Intel Corporation
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
#ifndef __IPC_UTILS_H
|
||||
#define __IPC_UTILS_H
|
||||
|
||||
namespace tbb {
|
||||
namespace internal {
|
||||
namespace rml {
|
||||
|
||||
char* get_shared_name(const char* prefix);
|
||||
int get_num_threads(const char* env_var);
|
||||
bool get_enable_flag(const char* env_var);
|
||||
|
||||
}}} //tbb::internal::rml
|
||||
|
||||
#endif
|
120
cs440-acg/ext/tbb/python/setup.py
Normal file
120
cs440-acg/ext/tbb/python/setup.py
Normal file
@@ -0,0 +1,120 @@
|
||||
#!/usr/bin/env python
|
||||
#
|
||||
# Copyright (c) 2016-2020 Intel Corporation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
|
||||
# System imports
|
||||
from __future__ import print_function
|
||||
from glob import glob
|
||||
import platform
|
||||
import os
|
||||
|
||||
from distutils.core import *
|
||||
from distutils.command.build import build
|
||||
|
||||
rundir = os.getcwd()
|
||||
os.chdir(os.path.abspath(os.path.dirname(__file__)))
|
||||
|
||||
if any(i in os.environ for i in ["CC", "CXX"]):
|
||||
if "CC" not in os.environ:
|
||||
os.environ['CC'] = os.environ['CXX']
|
||||
if "CXX" not in os.environ:
|
||||
os.environ['CXX'] = os.environ['CC']
|
||||
if platform.system() == 'Linux':
|
||||
os.environ['LDSHARED'] = os.environ['CXX'] + " -shared"
|
||||
print("Environment specifies CC=%s CXX=%s"%(os.environ['CC'], os.environ['CXX']))
|
||||
|
||||
intel_compiler = os.getenv('CC', '') in ['icl', 'icpc', 'icc']
|
||||
try:
|
||||
tbb_root = os.environ['TBBROOT']
|
||||
print("Using TBBROOT=", tbb_root)
|
||||
except:
|
||||
tbb_root = '..'
|
||||
if not intel_compiler:
|
||||
print("Warning: TBBROOT env var is not set and Intel's compiler is not used. It might lead\n"
|
||||
" !!!: to compile/link problems. Source tbbvars.sh/.csh file to set environment")
|
||||
use_compiler_tbb = intel_compiler and tbb_root == '..'
|
||||
if use_compiler_tbb:
|
||||
print("Using Intel TBB from Intel's compiler")
|
||||
if platform.system() == 'Windows':
|
||||
if intel_compiler:
|
||||
os.environ['DISTUTILS_USE_SDK'] = '1' # Enable environment settings in distutils
|
||||
os.environ['MSSdk'] = '1'
|
||||
print("Using compiler settings from environment")
|
||||
tbb_flag = ['/Qtbb'] if use_compiler_tbb else []
|
||||
tbb_flag += ['/EHsc'] # for Python 2
|
||||
compile_flags = ['/Qstd=c++11'] if intel_compiler else []
|
||||
else:
|
||||
tbb_flag = ['-tbb'] if use_compiler_tbb else []
|
||||
compile_flags = ['-std=c++11', '-Wno-unused-variable']
|
||||
|
||||
_tbb = Extension("tbb._api", ["tbb/api.i"],
|
||||
include_dirs=[os.path.join(tbb_root, 'include')] if not use_compiler_tbb else [],
|
||||
swig_opts =['-c++', '-O', '-threads'] + ( # add '-builtin' later
|
||||
['-I' + os.path.join(tbb_root, 'include')] if not use_compiler_tbb else []),
|
||||
extra_compile_args=compile_flags + tbb_flag,
|
||||
extra_link_args=tbb_flag,
|
||||
libraries =(['tbb'] if not use_compiler_tbb else []) +
|
||||
(['irml'] if platform.system() == "Linux" else []), # TODO: why do we need this?
|
||||
library_dirs=[ rundir, # for custom-builds
|
||||
os.path.join(tbb_root, 'lib', 'intel64', 'gcc4.8'), # for Linux
|
||||
os.path.join(tbb_root, 'lib'), # for MacOS
|
||||
os.path.join(tbb_root, 'lib', 'intel64', 'vc_mt'), # for Windows
|
||||
] if not use_compiler_tbb else [],
|
||||
language ='c++',
|
||||
)
|
||||
|
||||
|
||||
class TBBBuild(build):
|
||||
sub_commands = [ # define build order
|
||||
('build_ext', build.has_ext_modules),
|
||||
('build_py', build.has_pure_modules),
|
||||
]
|
||||
|
||||
|
||||
setup( name ="TBB",
|
||||
description ="Python API for Intel TBB",
|
||||
long_description="Python API to Intel(R) Threading Building Blocks library (Intel(R) TBB) "
|
||||
"extended with standard Pool implementation and monkey-patching",
|
||||
url ="https://software.intel.com/en-us/intel-tbb",
|
||||
author ="Intel Corporation",
|
||||
author_email="inteltbbdevelopers@intel.com",
|
||||
license ="Dual license: Apache or Proprietary",
|
||||
version ="0.1",
|
||||
classifiers =[
|
||||
'Development Status :: 4 - Beta',
|
||||
'Environment :: Console',
|
||||
'Environment :: Plugins',
|
||||
'Intended Audience :: Developers',
|
||||
'Intended Audience :: System Administrators',
|
||||
'Intended Audience :: Other Audience',
|
||||
'Intended Audience :: Science/Research',
|
||||
'License :: OSI Approved :: Apache Software License',
|
||||
'Operating System :: MacOS :: MacOS X',
|
||||
'Operating System :: Microsoft :: Windows',
|
||||
'Operating System :: POSIX :: Linux',
|
||||
'Programming Language :: Python',
|
||||
'Programming Language :: Python :: 2',
|
||||
'Programming Language :: Python :: 3',
|
||||
'Programming Language :: C++',
|
||||
'Topic :: System :: Hardware :: Symmetric Multi-processing',
|
||||
'Topic :: Software Development :: Libraries',
|
||||
],
|
||||
keywords='TBB multiprocessing multithreading composable parallelism',
|
||||
ext_modules=[_tbb],
|
||||
packages=['tbb'],
|
||||
py_modules=['TBB'],
|
||||
cmdclass={'build': TBBBuild}
|
||||
)
|
325
cs440-acg/ext/tbb/python/tbb/__init__.py
Normal file
325
cs440-acg/ext/tbb/python/tbb/__init__.py
Normal file
@@ -0,0 +1,325 @@
|
||||
#!/usr/bin/env python
|
||||
#
|
||||
# Copyright (c) 2016-2020 Intel Corporation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
|
||||
from __future__ import print_function
|
||||
|
||||
import multiprocessing.pool
|
||||
import ctypes
|
||||
import atexit
|
||||
import sys
|
||||
import os
|
||||
|
||||
from .api import *
|
||||
from .api import __all__ as api__all
|
||||
from .pool import *
|
||||
from .pool import __all__ as pool__all
|
||||
|
||||
__all__ = ["Monkey", "is_active"] + api__all + pool__all
|
||||
|
||||
__doc__ = """
|
||||
Python API for Intel(R) Threading Building Blocks library (Intel(R) TBB)
|
||||
extended with standard Python's pools implementation and monkey-patching.
|
||||
|
||||
Command-line interface example:
|
||||
$ python -m tbb $your_script.py
|
||||
Runs your_script.py in context of tbb.Monkey
|
||||
"""
|
||||
|
||||
is_active = False
|
||||
""" Indicates whether TBB context is activated """
|
||||
|
||||
ipc_enabled = False
|
||||
""" Indicates whether IPC mode is enabled """
|
||||
|
||||
libirml = "libirml.so.1"
|
||||
|
||||
|
||||
def _test(arg=None):
|
||||
"""Some tests"""
|
||||
import platform
|
||||
if platform.system() == "Linux":
|
||||
ctypes.CDLL(libirml)
|
||||
assert 256 == os.system("ldd "+_api.__file__+"| grep -E 'libimf|libsvml|libintlc'")
|
||||
from .test import test
|
||||
test(arg)
|
||||
print("done")
|
||||
|
||||
|
||||
def tbb_process_pool_worker27(inqueue, outqueue, initializer=None, initargs=(),
|
||||
maxtasks=None):
|
||||
from multiprocessing.pool import worker
|
||||
worker(inqueue, outqueue, initializer, initargs, maxtasks)
|
||||
if ipc_enabled:
|
||||
try:
|
||||
librml = ctypes.CDLL(libirml)
|
||||
librml.release_resources()
|
||||
except:
|
||||
print("Warning: Can not load ", libirml, file=sys.stderr)
|
||||
|
||||
|
||||
class TBBProcessPool27(multiprocessing.pool.Pool):
|
||||
def _repopulate_pool(self):
|
||||
"""Bring the number of pool processes up to the specified number,
|
||||
for use after reaping workers which have exited.
|
||||
"""
|
||||
from multiprocessing.util import debug
|
||||
|
||||
for i in range(self._processes - len(self._pool)):
|
||||
w = self.Process(target=tbb_process_pool_worker27,
|
||||
args=(self._inqueue, self._outqueue,
|
||||
self._initializer,
|
||||
self._initargs, self._maxtasksperchild)
|
||||
)
|
||||
self._pool.append(w)
|
||||
w.name = w.name.replace('Process', 'PoolWorker')
|
||||
w.daemon = True
|
||||
w.start()
|
||||
debug('added worker')
|
||||
|
||||
def __del__(self):
|
||||
self.close()
|
||||
for p in self._pool:
|
||||
p.join()
|
||||
|
||||
def __exit__(self, *args):
|
||||
self.close()
|
||||
for p in self._pool:
|
||||
p.join()
|
||||
|
||||
|
||||
def tbb_process_pool_worker3(inqueue, outqueue, initializer=None, initargs=(),
|
||||
maxtasks=None, wrap_exception=False):
|
||||
from multiprocessing.pool import worker
|
||||
worker(inqueue, outqueue, initializer, initargs, maxtasks, wrap_exception)
|
||||
if ipc_enabled:
|
||||
try:
|
||||
librml = ctypes.CDLL(libirml)
|
||||
librml.release_resources()
|
||||
except:
|
||||
print("Warning: Can not load ", libirml, file=sys.stderr)
|
||||
|
||||
|
||||
class TBBProcessPool3(multiprocessing.pool.Pool):
|
||||
def _repopulate_pool(self):
|
||||
"""Bring the number of pool processes up to the specified number,
|
||||
for use after reaping workers which have exited.
|
||||
"""
|
||||
from multiprocessing.util import debug
|
||||
|
||||
for i in range(self._processes - len(self._pool)):
|
||||
w = self.Process(target=tbb_process_pool_worker3,
|
||||
args=(self._inqueue, self._outqueue,
|
||||
self._initializer,
|
||||
self._initargs, self._maxtasksperchild,
|
||||
self._wrap_exception)
|
||||
)
|
||||
self._pool.append(w)
|
||||
w.name = w.name.replace('Process', 'PoolWorker')
|
||||
w.daemon = True
|
||||
w.start()
|
||||
debug('added worker')
|
||||
|
||||
def __del__(self):
|
||||
self.close()
|
||||
for p in self._pool:
|
||||
p.join()
|
||||
|
||||
def __exit__(self, *args):
|
||||
self.close()
|
||||
for p in self._pool:
|
||||
p.join()
|
||||
|
||||
|
||||
class Monkey:
|
||||
"""
|
||||
Context manager which replaces standard multiprocessing.pool
|
||||
implementations with tbb.pool using monkey-patching. It also enables TBB
|
||||
threading for Intel(R) Math Kernel Library (Intel(R) MKL). For example:
|
||||
|
||||
with tbb.Monkey():
|
||||
run_my_numpy_code()
|
||||
|
||||
It allows multiple parallel tasks to be executed on the same thread pool
|
||||
and coordinate number of threads across multiple processes thus avoiding
|
||||
overheads from oversubscription.
|
||||
"""
|
||||
_items = {}
|
||||
_modules = {}
|
||||
|
||||
def __init__(self, max_num_threads=None, benchmark=False):
|
||||
"""
|
||||
Create context manager for running under TBB scheduler.
|
||||
:param max_num_threads: if specified, limits maximal number of threads
|
||||
:param benchmark: if specified, blocks in initialization until requested number of threads are ready
|
||||
"""
|
||||
if max_num_threads:
|
||||
self.ctl = global_control(global_control.max_allowed_parallelism, int(max_num_threads))
|
||||
if benchmark:
|
||||
if not max_num_threads:
|
||||
max_num_threads = default_num_threads()
|
||||
from .api import _concurrency_barrier
|
||||
_concurrency_barrier(int(max_num_threads))
|
||||
|
||||
def _patch(self, class_name, module_name, obj):
|
||||
m = self._modules[class_name] = __import__(module_name, globals(),
|
||||
locals(), [class_name])
|
||||
if m == None:
|
||||
return
|
||||
oldattr = getattr(m, class_name, None)
|
||||
if oldattr == None:
|
||||
self._modules[class_name] = None
|
||||
return
|
||||
self._items[class_name] = oldattr
|
||||
setattr(m, class_name, obj)
|
||||
|
||||
def __enter__(self):
|
||||
global is_active
|
||||
assert is_active == False, "tbb.Monkey does not support nesting yet"
|
||||
is_active = True
|
||||
self.env_mkl = os.getenv('MKL_THREADING_LAYER')
|
||||
os.environ['MKL_THREADING_LAYER'] = 'TBB'
|
||||
self.env_numba = os.getenv('NUMBA_THREADING_LAYER')
|
||||
os.environ['NUMBA_THREADING_LAYER'] = 'TBB'
|
||||
|
||||
if ipc_enabled:
|
||||
if sys.version_info.major == 2 and sys.version_info.minor >= 7:
|
||||
self._patch("Pool", "multiprocessing.pool", TBBProcessPool27)
|
||||
elif sys.version_info.major == 3 and sys.version_info.minor >= 5:
|
||||
self._patch("Pool", "multiprocessing.pool", TBBProcessPool3)
|
||||
self._patch("ThreadPool", "multiprocessing.pool", Pool)
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_value, traceback):
|
||||
global is_active
|
||||
assert is_active == True, "modified?"
|
||||
is_active = False
|
||||
if self.env_mkl is None:
|
||||
del os.environ['MKL_THREADING_LAYER']
|
||||
else:
|
||||
os.environ['MKL_THREADING_LAYER'] = self.env_mkl
|
||||
if self.env_numba is None:
|
||||
del os.environ['NUMBA_THREADING_LAYER']
|
||||
else:
|
||||
os.environ['NUMBA_THREADING_LAYER'] = self.env_numba
|
||||
for name in self._items.keys():
|
||||
setattr(self._modules[name], name, self._items[name])
|
||||
|
||||
|
||||
def init_sem_name():
|
||||
try:
|
||||
librml = ctypes.CDLL(libirml)
|
||||
librml.set_active_sem_name()
|
||||
librml.set_stop_sem_name()
|
||||
except Exception as e:
|
||||
print("Warning: Can not initialize name of shared semaphores:", e,
|
||||
file=sys.stderr)
|
||||
|
||||
|
||||
def tbb_atexit():
|
||||
if ipc_enabled:
|
||||
try:
|
||||
librml = ctypes.CDLL(libirml)
|
||||
librml.release_semaphores()
|
||||
except:
|
||||
print("Warning: Can not release shared semaphores",
|
||||
file=sys.stderr)
|
||||
|
||||
|
||||
def _main():
|
||||
# Run the module specified as the next command line argument
|
||||
# python -m TBB user_app.py
|
||||
global ipc_enabled
|
||||
|
||||
import platform
|
||||
import argparse
|
||||
parser = argparse.ArgumentParser(prog="python -m tbb", description="""
|
||||
Run your Python script in context of tbb.Monkey, which
|
||||
replaces standard Python pools and threading layer of
|
||||
Intel(R) Math Kernel Library by implementation based on
|
||||
Intel(R) Threading Building Blocks. It enables multiple parallel
|
||||
tasks to be executed on the same thread pool and coordinate
|
||||
number of threads across multiple processes thus avoiding
|
||||
overheads from oversubscription.
|
||||
""", formatter_class=argparse.ArgumentDefaultsHelpFormatter)
|
||||
if platform.system() == "Linux":
|
||||
parser.add_argument('--ipc', action='store_true',
|
||||
help="Enable inter-process (IPC) coordination between Intel TBB schedulers")
|
||||
parser.add_argument('-a', '--allocator', action='store_true',
|
||||
help="Enable Intel TBB scalable allocator as a replacement for standard memory allocator")
|
||||
parser.add_argument('--allocator-huge-pages', action='store_true',
|
||||
help="Enable huge pages for Intel TBB allocator (implies: -a)")
|
||||
parser.add_argument('-p', '--max-num-threads', default=default_num_threads(), type=int,
|
||||
help="Initialize Intel TBB with P max number of threads per process", metavar='P')
|
||||
parser.add_argument('-b', '--benchmark', action='store_true',
|
||||
help="Block Intel TBB initialization until all the threads are created before continue the script. "
|
||||
"This is necessary for performance benchmarks that want to exclude lazy scheduler initialization effects from the measurements")
|
||||
parser.add_argument('-v', '--verbose', action='store_true',
|
||||
help="Request verbose and version information")
|
||||
parser.add_argument('-m', action='store_true', dest='module',
|
||||
help="Executes following as a module")
|
||||
parser.add_argument('name', help="Script or module name")
|
||||
parser.add_argument('args', nargs=argparse.REMAINDER,
|
||||
help="Command line arguments")
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.verbose:
|
||||
os.environ["TBB_VERSION"] = "1"
|
||||
if platform.system() == "Linux":
|
||||
if args.allocator_huge_pages:
|
||||
args.allocator = True
|
||||
if args.allocator and not os.environ.get("_TBB_MALLOC_PRELOAD"):
|
||||
libtbbmalloc_lib = 'libtbbmalloc_proxy.so.2'
|
||||
ld_preload = 'LD_PRELOAD'
|
||||
os.environ["_TBB_MALLOC_PRELOAD"] = "1"
|
||||
preload_list = filter(None, os.environ.get(ld_preload, "").split(':'))
|
||||
if libtbbmalloc_lib in preload_list:
|
||||
print('Info:', ld_preload, "contains", libtbbmalloc_lib, "already\n")
|
||||
else:
|
||||
os.environ[ld_preload] = ':'.join([libtbbmalloc_lib] + list(preload_list))
|
||||
|
||||
if args.allocator_huge_pages:
|
||||
assert platform.system() == "Linux"
|
||||
try:
|
||||
with open('/proc/sys/vm/nr_hugepages', 'r') as f:
|
||||
pages = int(f.read())
|
||||
if pages == 0:
|
||||
print("TBB: Pre-allocated huge pages are not currently reserved in the system. To reserve, run e.g.:\n"
|
||||
"\tsudo sh -c 'echo 2000 > /proc/sys/vm/nr_hugepages'")
|
||||
os.environ["TBB_MALLOC_USE_HUGE_PAGES"] = "1"
|
||||
except:
|
||||
print("TBB: Failed to read number of pages from /proc/sys/vm/nr_hugepages\n"
|
||||
"\tIs the Linux kernel configured with the huge pages feature?")
|
||||
sys.exit(1)
|
||||
|
||||
os.execl(sys.executable, sys.executable, '-m', 'tbb', *sys.argv[1:])
|
||||
assert False, "Re-execution failed"
|
||||
|
||||
sys.argv = [args.name] + args.args
|
||||
ipc_enabled = platform.system() == "Linux" and args.ipc
|
||||
os.environ["IPC_ENABLE"] = "1" if ipc_enabled else "0"
|
||||
if ipc_enabled:
|
||||
atexit.register(tbb_atexit)
|
||||
init_sem_name()
|
||||
if not os.environ.get("KMP_BLOCKTIME"): # TODO move
|
||||
os.environ["KMP_BLOCKTIME"] = "0"
|
||||
if '_' + args.name in globals():
|
||||
return globals()['_' + args.name](*args.args)
|
||||
else:
|
||||
import runpy
|
||||
runf = runpy.run_module if args.module else runpy.run_path
|
||||
with Monkey(max_num_threads=args.max_num_threads, benchmark=args.benchmark):
|
||||
runf(args.name, run_name='__main__')
|
20
cs440-acg/ext/tbb/python/tbb/__main__.py
Normal file
20
cs440-acg/ext/tbb/python/tbb/__main__.py
Normal file
@@ -0,0 +1,20 @@
|
||||
#!/usr/bin/env python
|
||||
#
|
||||
# Copyright (c) 2016-2020 Intel Corporation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
|
||||
from . import _main
|
||||
from sys import exit
|
||||
exit(_main())
|
175
cs440-acg/ext/tbb/python/tbb/api.i
Normal file
175
cs440-acg/ext/tbb/python/tbb/api.i
Normal file
@@ -0,0 +1,175 @@
|
||||
%pythonbegin %{
|
||||
#
|
||||
# Copyright (c) 2016-2020 Intel Corporation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
|
||||
__all__ = ["task_arena", "task_group", "task_scheduler_init", "global_control", "default_num_threads"]
|
||||
%}
|
||||
%begin %{
|
||||
/* Defines Python wrappers for Intel(R) Threading Building Blocks (Intel TBB).*/
|
||||
%}
|
||||
%module api
|
||||
|
||||
#if SWIG_VERSION < 0x030001
|
||||
#error SWIG version 3.0.6 or newer is required for correct functioning
|
||||
#endif
|
||||
|
||||
%{
|
||||
#define TBB_PREVIEW_WAITING_FOR_WORKERS 1
|
||||
#include <tbb/tbb.h>
|
||||
#include <tbb/compat/condition_variable>
|
||||
#if TBB_IMPLEMENT_CPP0X
|
||||
namespace std { using tbb::mutex; }
|
||||
#define unique_ptr auto_ptr
|
||||
#else
|
||||
#include <condition_variable>
|
||||
#include <mutex>
|
||||
#include <memory>
|
||||
#endif
|
||||
using namespace tbb;
|
||||
|
||||
class PyCaller : public swig::SwigPtr_PyObject {
|
||||
public:
|
||||
// icpc 2013 does not support simple using SwigPtr_PyObject::SwigPtr_PyObject;
|
||||
PyCaller(const PyCaller& s) : SwigPtr_PyObject(s) {}
|
||||
PyCaller(PyObject *p, bool initial = true) : SwigPtr_PyObject(p, initial) {}
|
||||
|
||||
void operator()() const {
|
||||
SWIG_PYTHON_THREAD_BEGIN_BLOCK;
|
||||
PyObject* r = PyObject_CallFunctionObjArgs((PyObject*)*this, NULL);
|
||||
if(r) Py_DECREF(r);
|
||||
SWIG_PYTHON_THREAD_END_BLOCK;
|
||||
}
|
||||
};
|
||||
|
||||
struct ArenaPyCaller {
|
||||
task_arena *my_arena;
|
||||
PyObject *my_callable;
|
||||
ArenaPyCaller(task_arena *a, PyObject *c) : my_arena(a), my_callable(c) {
|
||||
SWIG_PYTHON_THREAD_BEGIN_BLOCK;
|
||||
Py_XINCREF(c);
|
||||
SWIG_PYTHON_THREAD_END_BLOCK;
|
||||
}
|
||||
void operator()() const {
|
||||
my_arena->execute(PyCaller(my_callable, false));
|
||||
}
|
||||
};
|
||||
|
||||
struct barrier_data {
|
||||
std::condition_variable event;
|
||||
std::mutex m;
|
||||
int worker_threads, full_threads;
|
||||
};
|
||||
|
||||
class barrier_task : public tbb::task {
|
||||
barrier_data &b;
|
||||
public:
|
||||
barrier_task(barrier_data &d) : b(d) {}
|
||||
/*override*/ tbb::task *execute() {
|
||||
std::unique_lock<std::mutex> lock(b.m);
|
||||
if(++b.worker_threads >= b.full_threads)
|
||||
b.event.notify_all();
|
||||
else while(b.worker_threads < b.full_threads)
|
||||
b.event.wait(lock);
|
||||
return NULL;
|
||||
}
|
||||
};
|
||||
|
||||
void _concurrency_barrier(int threads = tbb::task_scheduler_init::automatic) {
|
||||
if(threads == task_scheduler_init::automatic)
|
||||
threads = task_scheduler_init::default_num_threads();
|
||||
if(threads < 2)
|
||||
return;
|
||||
std::unique_ptr<global_control> g(
|
||||
(global_control::active_value(global_control::max_allowed_parallelism) < unsigned(threads))?
|
||||
new global_control(global_control::max_allowed_parallelism, threads) : NULL);
|
||||
barrier_data b;
|
||||
b.worker_threads = 0;
|
||||
b.full_threads = threads-1;
|
||||
for(int i = 0; i < b.full_threads; i++)
|
||||
tbb::task::enqueue( *new( tbb::task::allocate_root() ) barrier_task(b) );
|
||||
std::unique_lock<std::mutex> lock(b.m);
|
||||
b.event.wait(lock);
|
||||
};
|
||||
|
||||
%}
|
||||
|
||||
void _concurrency_barrier(int threads = tbb::task_scheduler_init::automatic);
|
||||
|
||||
namespace tbb {
|
||||
class task_scheduler_init {
|
||||
public:
|
||||
//! Typedef for number of threads that is automatic.
|
||||
static const int automatic = -1;
|
||||
//! Argument to initialize() or constructor that causes initialization to be deferred.
|
||||
static const int deferred = -2;
|
||||
task_scheduler_init( int max_threads=automatic,
|
||||
size_t thread_stack_size=0 );
|
||||
~task_scheduler_init();
|
||||
void initialize( int max_threads=automatic );
|
||||
void terminate();
|
||||
static int default_num_threads();
|
||||
bool is_active() const;
|
||||
void blocking_terminate();
|
||||
};
|
||||
|
||||
class task_arena {
|
||||
public:
|
||||
static const int automatic = -1;
|
||||
static int current_thread_index();
|
||||
task_arena(int max_concurrency = automatic, unsigned reserved_for_masters = 1);
|
||||
task_arena(const task_arena &s);
|
||||
~task_arena();
|
||||
void initialize();
|
||||
void initialize(int max_concurrency, unsigned reserved_for_masters = 1);
|
||||
void terminate();
|
||||
bool is_active();
|
||||
%extend {
|
||||
void enqueue( PyObject *c ) { $self->enqueue(PyCaller(c)); }
|
||||
void execute( PyObject *c ) { $self->execute(PyCaller(c)); }
|
||||
};
|
||||
};
|
||||
|
||||
class task_group {
|
||||
public:
|
||||
task_group();
|
||||
~task_group();
|
||||
void wait();
|
||||
bool is_canceling();
|
||||
void cancel();
|
||||
%extend {
|
||||
void run( PyObject *c ) { $self->run(PyCaller(c)); }
|
||||
void run( PyObject *c, task_arena *a ) { $self->run(ArenaPyCaller(a, c)); }
|
||||
};
|
||||
};
|
||||
|
||||
class global_control {
|
||||
public:
|
||||
enum parameter {
|
||||
max_allowed_parallelism,
|
||||
thread_stack_size,
|
||||
parameter_max // insert new parameters above this point
|
||||
};
|
||||
global_control(parameter param, size_t value);
|
||||
~global_control();
|
||||
static size_t active_value(parameter param);
|
||||
};
|
||||
|
||||
} // tbb
|
||||
|
||||
// Additional definitions for Python part of the module
|
||||
%pythoncode %{
|
||||
default_num_threads = task_scheduler_init_default_num_threads
|
||||
%}
|
631
cs440-acg/ext/tbb/python/tbb/pool.py
Normal file
631
cs440-acg/ext/tbb/python/tbb/pool.py
Normal file
@@ -0,0 +1,631 @@
|
||||
#!/usr/bin/env python
|
||||
#
|
||||
# Copyright (c) 2016-2020 Intel Corporation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
# Based on the software developed by:
|
||||
# Copyright (c) 2008,2016 david decotigny (Pool of threads)
|
||||
# Copyright (c) 2006-2008, R Oudkerk (multiprocessing.Pool)
|
||||
# All rights reserved.
|
||||
#
|
||||
# Redistribution and use in source and binary forms, with or without
|
||||
# modification, are permitted provided that the following conditions
|
||||
# are met:
|
||||
#
|
||||
# 1. Redistributions of source code must retain the above copyright
|
||||
# notice, this list of conditions and the following disclaimer.
|
||||
# 2. Redistributions in binary form must reproduce the above copyright
|
||||
# notice, this list of conditions and the following disclaimer in the
|
||||
# documentation and/or other materials provided with the distribution.
|
||||
# 3. Neither the name of author nor the names of any contributors may be
|
||||
# used to endorse or promote products derived from this software
|
||||
# without specific prior written permission.
|
||||
#
|
||||
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
|
||||
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
||||
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
|
||||
# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
|
||||
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
|
||||
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
|
||||
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
|
||||
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
|
||||
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
|
||||
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
|
||||
# SUCH DAMAGE.
|
||||
#
|
||||
|
||||
# @brief Python Pool implementation based on TBB with monkey-patching
|
||||
#
|
||||
# See http://docs.python.org/dev/library/multiprocessing.html
|
||||
# Differences: added imap_async and imap_unordered_async, and terminate()
|
||||
# has to be called explicitly (it's not registered by atexit).
|
||||
#
|
||||
# The general idea is that we submit works to a workqueue, either as
|
||||
# single Jobs (one function to call), or JobSequences (batch of
|
||||
# Jobs). Each Job is associated with an ApplyResult object which has 2
|
||||
# states: waiting for the Job to complete, or Ready. Instead of
|
||||
# waiting for the jobs to finish, we wait for their ApplyResult object
|
||||
# to become ready: an event mechanism is used for that.
|
||||
# When we apply a function to several arguments in "parallel", we need
|
||||
# a way to wait for all/part of the Jobs to be processed: that's what
|
||||
# "collectors" are for; they group and wait for a set of ApplyResult
|
||||
# objects. Once a collector is ready to be used, we can use a
|
||||
# CollectorIterator to iterate over the result values it's collecting.
|
||||
#
|
||||
# The methods of a Pool object use all these concepts and expose
|
||||
# them to their caller in a very simple way.
|
||||
|
||||
import sys
|
||||
import threading
|
||||
import traceback
|
||||
from .api import *
|
||||
|
||||
__all__ = ["Pool", "TimeoutError"]
|
||||
__doc__ = """
|
||||
Standard Python Pool implementation based on Python API
|
||||
for Intel(R) Threading Building Blocks library (Intel(R) TBB)
|
||||
"""
|
||||
|
||||
|
||||
class TimeoutError(Exception):
|
||||
"""Raised when a result is not available within the given timeout"""
|
||||
pass
|
||||
|
||||
|
||||
class Pool(object):
|
||||
"""
|
||||
The Pool class provides standard multiprocessing.Pool interface
|
||||
which is mapped onto Intel(R) TBB tasks executing in its thread pool
|
||||
"""
|
||||
|
||||
def __init__(self, nworkers=0, name="Pool"):
|
||||
"""
|
||||
\param nworkers (integer) number of worker threads to start
|
||||
\param name (string) prefix for the worker threads' name
|
||||
"""
|
||||
self._closed = False
|
||||
self._tasks = task_group()
|
||||
self._pool = [None,]*default_num_threads() # Dask asks for len(_pool)
|
||||
|
||||
def apply(self, func, args=(), kwds=dict()):
|
||||
"""Equivalent of the apply() builtin function. It blocks till
|
||||
the result is ready."""
|
||||
return self.apply_async(func, args, kwds).get()
|
||||
|
||||
def map(self, func, iterable, chunksize=None):
|
||||
"""A parallel equivalent of the map() builtin function. It
|
||||
blocks till the result is ready.
|
||||
|
||||
This method chops the iterable into a number of chunks which
|
||||
it submits to the process pool as separate tasks. The
|
||||
(approximate) size of these chunks can be specified by setting
|
||||
chunksize to a positive integer."""
|
||||
return self.map_async(func, iterable, chunksize).get()
|
||||
|
||||
def imap(self, func, iterable, chunksize=1):
|
||||
"""
|
||||
An equivalent of itertools.imap().
|
||||
|
||||
The chunksize argument is the same as the one used by the
|
||||
map() method. For very long iterables using a large value for
|
||||
chunksize can make the job complete much faster than
|
||||
using the default value of 1.
|
||||
|
||||
Also if chunksize is 1 then the next() method of the iterator
|
||||
returned by the imap() method has an optional timeout
|
||||
parameter: next(timeout) will raise processing.TimeoutError if
|
||||
the result cannot be returned within timeout seconds.
|
||||
"""
|
||||
collector = OrderedResultCollector(as_iterator=True)
|
||||
self._create_sequences(func, iterable, chunksize, collector)
|
||||
return iter(collector)
|
||||
|
||||
def imap_unordered(self, func, iterable, chunksize=1):
|
||||
"""The same as imap() except that the ordering of the results
|
||||
from the returned iterator should be considered
|
||||
arbitrary. (Only when there is only one worker process is the
|
||||
order guaranteed to be "correct".)"""
|
||||
collector = UnorderedResultCollector()
|
||||
self._create_sequences(func, iterable, chunksize, collector)
|
||||
return iter(collector)
|
||||
|
||||
def apply_async(self, func, args=(), kwds=dict(), callback=None):
|
||||
"""A variant of the apply() method which returns an
|
||||
ApplyResult object.
|
||||
|
||||
If callback is specified then it should be a callable which
|
||||
accepts a single argument. When the result becomes ready,
|
||||
callback is applied to it (unless the call failed). callback
|
||||
should complete immediately since otherwise the thread which
|
||||
handles the results will get blocked."""
|
||||
assert not self._closed # No lock here. We assume it's atomic...
|
||||
apply_result = ApplyResult(callback=callback)
|
||||
job = Job(func, args, kwds, apply_result)
|
||||
self._tasks.run(job)
|
||||
return apply_result
|
||||
|
||||
def map_async(self, func, iterable, chunksize=None, callback=None):
|
||||
"""A variant of the map() method which returns a ApplyResult
|
||||
object.
|
||||
|
||||
If callback is specified then it should be a callable which
|
||||
accepts a single argument. When the result becomes ready
|
||||
callback is applied to it (unless the call failed). callback
|
||||
should complete immediately since otherwise the thread which
|
||||
handles the results will get blocked."""
|
||||
apply_result = ApplyResult(callback=callback)
|
||||
collector = OrderedResultCollector(apply_result, as_iterator=False)
|
||||
if not self._create_sequences(func, iterable, chunksize, collector):
|
||||
apply_result._set_value([])
|
||||
return apply_result
|
||||
|
||||
def imap_async(self, func, iterable, chunksize=None, callback=None):
|
||||
"""A variant of the imap() method which returns an ApplyResult
|
||||
object that provides an iterator (next method(timeout)
|
||||
available).
|
||||
|
||||
If callback is specified then it should be a callable which
|
||||
accepts a single argument. When the resulting iterator becomes
|
||||
ready, callback is applied to it (unless the call
|
||||
failed). callback should complete immediately since otherwise
|
||||
the thread which handles the results will get blocked."""
|
||||
apply_result = ApplyResult(callback=callback)
|
||||
collector = OrderedResultCollector(apply_result, as_iterator=True)
|
||||
if not self._create_sequences(func, iterable, chunksize, collector):
|
||||
apply_result._set_value(iter([]))
|
||||
return apply_result
|
||||
|
||||
def imap_unordered_async(self, func, iterable, chunksize=None,
|
||||
callback=None):
|
||||
"""A variant of the imap_unordered() method which returns an
|
||||
ApplyResult object that provides an iterator (next
|
||||
method(timeout) available).
|
||||
|
||||
If callback is specified then it should be a callable which
|
||||
accepts a single argument. When the resulting iterator becomes
|
||||
ready, callback is applied to it (unless the call
|
||||
failed). callback should complete immediately since otherwise
|
||||
the thread which handles the results will get blocked."""
|
||||
apply_result = ApplyResult(callback=callback)
|
||||
collector = UnorderedResultCollector(apply_result)
|
||||
if not self._create_sequences(func, iterable, chunksize, collector):
|
||||
apply_result._set_value(iter([]))
|
||||
return apply_result
|
||||
|
||||
def close(self):
|
||||
"""Prevents any more tasks from being submitted to the
|
||||
pool. Once all the tasks have been completed the worker
|
||||
processes will exit."""
|
||||
# No lock here. We assume it's sufficiently atomic...
|
||||
self._closed = True
|
||||
|
||||
def terminate(self):
|
||||
"""Stops the worker processes immediately without completing
|
||||
outstanding work. When the pool object is garbage collected
|
||||
terminate() will be called immediately."""
|
||||
self.close()
|
||||
self._tasks.cancel()
|
||||
|
||||
def join(self):
|
||||
"""Wait for the worker processes to exit. One must call
|
||||
close() or terminate() before using join()."""
|
||||
self._tasks.wait()
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_value, traceback):
|
||||
self.join()
|
||||
|
||||
def __del__(self):
|
||||
self.terminate()
|
||||
self.join()
|
||||
|
||||
def _create_sequences(self, func, iterable, chunksize, collector):
|
||||
"""
|
||||
Create callable objects to process and pushes them on the
|
||||
work queue. Each work unit is meant to process a slice of
|
||||
iterable of size chunksize. If collector is specified, then
|
||||
the ApplyResult objects associated with the jobs will notify
|
||||
collector when their result becomes ready.
|
||||
|
||||
\return the list callable objects (basically: JobSequences)
|
||||
pushed onto the work queue
|
||||
"""
|
||||
assert not self._closed # No lock here. We assume it's atomic...
|
||||
it_ = iter(iterable)
|
||||
exit_loop = False
|
||||
sequences = []
|
||||
while not exit_loop:
|
||||
seq = []
|
||||
for _ in range(chunksize or 1):
|
||||
try:
|
||||
arg = next(it_)
|
||||
except StopIteration:
|
||||
exit_loop = True
|
||||
break
|
||||
apply_result = ApplyResult(collector)
|
||||
job = Job(func, (arg,), {}, apply_result)
|
||||
seq.append(job)
|
||||
if seq:
|
||||
sequences.append(JobSequence(seq))
|
||||
for t in sequences:
|
||||
self._tasks.run(t)
|
||||
return sequences
|
||||
|
||||
|
||||
class Job:
|
||||
"""A work unit that corresponds to the execution of a single function"""
|
||||
|
||||
def __init__(self, func, args, kwds, apply_result):
|
||||
"""
|
||||
\param func/args/kwds used to call the function
|
||||
\param apply_result ApplyResult object that holds the result
|
||||
of the function call
|
||||
"""
|
||||
self._func = func
|
||||
self._args = args
|
||||
self._kwds = kwds
|
||||
self._result = apply_result
|
||||
|
||||
def __call__(self):
|
||||
"""
|
||||
Call the function with the args/kwds and tell the ApplyResult
|
||||
that its result is ready. Correctly handles the exceptions
|
||||
happening during the execution of the function
|
||||
"""
|
||||
try:
|
||||
result = self._func(*self._args, **self._kwds)
|
||||
except:
|
||||
self._result._set_exception()
|
||||
else:
|
||||
self._result._set_value(result)
|
||||
|
||||
|
||||
class JobSequence:
|
||||
"""A work unit that corresponds to the processing of a continuous
|
||||
sequence of Job objects"""
|
||||
|
||||
def __init__(self, jobs):
|
||||
self._jobs = jobs
|
||||
|
||||
def __call__(self):
|
||||
"""
|
||||
Call all the Job objects that have been specified
|
||||
"""
|
||||
for job in self._jobs:
|
||||
job()
|
||||
|
||||
|
||||
class ApplyResult(object):
|
||||
"""An object associated with a Job object that holds its result:
|
||||
it's available during the whole life the Job and after, even when
|
||||
the Job didn't process yet. It's possible to use this object to
|
||||
wait for the result/exception of the job to be available.
|
||||
|
||||
The result objects returns by the Pool::*_async() methods are of
|
||||
this type"""
|
||||
|
||||
def __init__(self, collector=None, callback=None):
|
||||
"""
|
||||
\param collector when not None, the notify_ready() method of
|
||||
the collector will be called when the result from the Job is
|
||||
ready
|
||||
\param callback when not None, function to call when the
|
||||
result becomes available (this is the parameter passed to the
|
||||
Pool::*_async() methods.
|
||||
"""
|
||||
self._success = False
|
||||
self._event = threading.Event()
|
||||
self._data = None
|
||||
self._collector = None
|
||||
self._callback = callback
|
||||
|
||||
if collector is not None:
|
||||
collector.register_result(self)
|
||||
self._collector = collector
|
||||
|
||||
def get(self, timeout=None):
|
||||
"""
|
||||
Returns the result when it arrives. If timeout is not None and
|
||||
the result does not arrive within timeout seconds then
|
||||
TimeoutError is raised. If the remote call raised an exception
|
||||
then that exception will be reraised by get().
|
||||
"""
|
||||
if not self.wait(timeout):
|
||||
raise TimeoutError("Result not available within %fs" % timeout)
|
||||
if self._success:
|
||||
return self._data
|
||||
if sys.version_info[0] == 3:
|
||||
raise self._data[0](self._data[1]).with_traceback(self._data[2])
|
||||
else:
|
||||
exec("raise self._data[0], self._data[1], self._data[2]")
|
||||
|
||||
def wait(self, timeout=None):
|
||||
"""Waits until the result is available or until timeout
|
||||
seconds pass."""
|
||||
self._event.wait(timeout)
|
||||
return self._event.isSet()
|
||||
|
||||
def ready(self):
|
||||
"""Returns whether the call has completed."""
|
||||
return self._event.isSet()
|
||||
|
||||
def successful(self):
|
||||
"""Returns whether the call completed without raising an
|
||||
exception. Will raise AssertionError if the result is not
|
||||
ready."""
|
||||
assert self.ready()
|
||||
return self._success
|
||||
|
||||
def _set_value(self, value):
|
||||
"""Called by a Job object to tell the result is ready, and
|
||||
provides the value of this result. The object will become
|
||||
ready and successful. The collector's notify_ready() method
|
||||
will be called, and the callback method too"""
|
||||
assert not self.ready()
|
||||
self._data = value
|
||||
self._success = True
|
||||
self._event.set()
|
||||
if self._collector is not None:
|
||||
self._collector.notify_ready(self)
|
||||
if self._callback is not None:
|
||||
try:
|
||||
self._callback(value)
|
||||
except:
|
||||
traceback.print_exc()
|
||||
|
||||
def _set_exception(self):
|
||||
"""Called by a Job object to tell that an exception occurred
|
||||
during the processing of the function. The object will become
|
||||
ready but not successful. The collector's notify_ready()
|
||||
method will be called, but NOT the callback method"""
|
||||
# traceback.print_exc()
|
||||
assert not self.ready()
|
||||
self._data = sys.exc_info()
|
||||
self._success = False
|
||||
self._event.set()
|
||||
if self._collector is not None:
|
||||
self._collector.notify_ready(self)
|
||||
|
||||
|
||||
class AbstractResultCollector(object):
|
||||
"""ABC to define the interface of a ResultCollector object. It is
|
||||
basically an object which knows whuich results it's waiting for,
|
||||
and which is able to get notify when they get available. It is
|
||||
also able to provide an iterator over the results when they are
|
||||
available"""
|
||||
|
||||
def __init__(self, to_notify):
|
||||
"""
|
||||
\param to_notify ApplyResult object to notify when all the
|
||||
results we're waiting for become available. Can be None.
|
||||
"""
|
||||
self._to_notify = to_notify
|
||||
|
||||
def register_result(self, apply_result):
|
||||
"""Used to identify which results we're waiting for. Will
|
||||
always be called BEFORE the Jobs get submitted to the work
|
||||
queue, and BEFORE the __iter__ and _get_result() methods can
|
||||
be called
|
||||
\param apply_result ApplyResult object to add in our collection
|
||||
"""
|
||||
raise NotImplementedError("Children classes must implement it")
|
||||
|
||||
def notify_ready(self, apply_result):
|
||||
"""Called by the ApplyResult object (already registered via
|
||||
register_result()) that it is now ready (ie. the Job's result
|
||||
is available or an exception has been raised).
|
||||
\param apply_result ApplyResult object telling us that the job
|
||||
has been processed
|
||||
"""
|
||||
raise NotImplementedError("Children classes must implement it")
|
||||
|
||||
def _get_result(self, idx, timeout=None):
|
||||
"""Called by the CollectorIterator object to retrieve the
|
||||
result's values one after another (order defined by the
|
||||
implementation)
|
||||
\param idx The index of the result we want, wrt collector's order
|
||||
\param timeout integer telling how long to wait (in seconds)
|
||||
for the result at index idx to be available, or None (wait
|
||||
forever)
|
||||
"""
|
||||
raise NotImplementedError("Children classes must implement it")
|
||||
|
||||
def __iter__(self):
|
||||
"""Return a new CollectorIterator object for this collector"""
|
||||
return CollectorIterator(self)
|
||||
|
||||
|
||||
class CollectorIterator(object):
|
||||
"""An iterator that allows to iterate over the result values
|
||||
available in the given collector object. Equipped with an extended
|
||||
next() method accepting a timeout argument. Created by the
|
||||
AbstractResultCollector::__iter__() method"""
|
||||
|
||||
def __init__(self, collector):
|
||||
"""\param AbstractResultCollector instance"""
|
||||
self._collector = collector
|
||||
self._idx = 0
|
||||
|
||||
def __iter__(self):
|
||||
return self
|
||||
|
||||
def next(self, timeout=None):
|
||||
"""Return the next result value in the sequence. Raise
|
||||
StopIteration at the end. Can raise the exception raised by
|
||||
the Job"""
|
||||
try:
|
||||
apply_result = self._collector._get_result(self._idx, timeout)
|
||||
except IndexError:
|
||||
# Reset for next time
|
||||
self._idx = 0
|
||||
raise StopIteration
|
||||
except:
|
||||
self._idx = 0
|
||||
raise
|
||||
self._idx += 1
|
||||
assert apply_result.ready()
|
||||
return apply_result.get(0)
|
||||
|
||||
def __next__(self):
|
||||
return self.next()
|
||||
|
||||
|
||||
class UnorderedResultCollector(AbstractResultCollector):
|
||||
"""An AbstractResultCollector implementation that collects the
|
||||
values of the ApplyResult objects in the order they become ready. The
|
||||
CollectorIterator object returned by __iter__() will iterate over
|
||||
them in the order they become ready"""
|
||||
|
||||
def __init__(self, to_notify=None):
|
||||
"""
|
||||
\param to_notify ApplyResult object to notify when all the
|
||||
results we're waiting for become available. Can be None.
|
||||
"""
|
||||
AbstractResultCollector.__init__(self, to_notify)
|
||||
self._cond = threading.Condition()
|
||||
self._collection = []
|
||||
self._expected = 0
|
||||
|
||||
def register_result(self, apply_result):
|
||||
"""Used to identify which results we're waiting for. Will
|
||||
always be called BEFORE the Jobs get submitted to the work
|
||||
queue, and BEFORE the __iter__ and _get_result() methods can
|
||||
be called
|
||||
\param apply_result ApplyResult object to add in our collection
|
||||
"""
|
||||
self._expected += 1
|
||||
|
||||
def _get_result(self, idx, timeout=None):
|
||||
"""Called by the CollectorIterator object to retrieve the
|
||||
result's values one after another, in the order the results have
|
||||
become available.
|
||||
\param idx The index of the result we want, wrt collector's order
|
||||
\param timeout integer telling how long to wait (in seconds)
|
||||
for the result at index idx to be available, or None (wait
|
||||
forever)
|
||||
"""
|
||||
self._cond.acquire()
|
||||
try:
|
||||
if idx >= self._expected:
|
||||
raise IndexError
|
||||
elif idx < len(self._collection):
|
||||
return self._collection[idx]
|
||||
elif idx != len(self._collection):
|
||||
# Violation of the sequence protocol
|
||||
raise IndexError()
|
||||
else:
|
||||
self._cond.wait(timeout=timeout)
|
||||
try:
|
||||
return self._collection[idx]
|
||||
except IndexError:
|
||||
# Still not added !
|
||||
raise TimeoutError("Timeout while waiting for results")
|
||||
finally:
|
||||
self._cond.release()
|
||||
|
||||
def notify_ready(self, apply_result=None):
|
||||
"""Called by the ApplyResult object (already registered via
|
||||
register_result()) that it is now ready (ie. the Job's result
|
||||
is available or an exception has been raised).
|
||||
\param apply_result ApplyResult object telling us that the job
|
||||
has been processed
|
||||
"""
|
||||
first_item = False
|
||||
self._cond.acquire()
|
||||
try:
|
||||
self._collection.append(apply_result)
|
||||
first_item = (len(self._collection) == 1)
|
||||
|
||||
self._cond.notifyAll()
|
||||
finally:
|
||||
self._cond.release()
|
||||
|
||||
if first_item and self._to_notify is not None:
|
||||
self._to_notify._set_value(iter(self))
|
||||
|
||||
|
||||
class OrderedResultCollector(AbstractResultCollector):
|
||||
"""An AbstractResultCollector implementation that collects the
|
||||
values of the ApplyResult objects in the order they have been
|
||||
submitted. The CollectorIterator object returned by __iter__()
|
||||
will iterate over them in the order they have been submitted"""
|
||||
|
||||
def __init__(self, to_notify=None, as_iterator=True):
|
||||
"""
|
||||
\param to_notify ApplyResult object to notify when all the
|
||||
results we're waiting for become available. Can be None.
|
||||
\param as_iterator boolean telling whether the result value
|
||||
set on to_notify should be an iterator (available as soon as 1
|
||||
result arrived) or a list (available only after the last
|
||||
result arrived)
|
||||
"""
|
||||
AbstractResultCollector.__init__(self, to_notify)
|
||||
self._results = []
|
||||
self._lock = threading.Lock()
|
||||
self._remaining = 0
|
||||
self._as_iterator = as_iterator
|
||||
|
||||
def register_result(self, apply_result):
|
||||
"""Used to identify which results we're waiting for. Will
|
||||
always be called BEFORE the Jobs get submitted to the work
|
||||
queue, and BEFORE the __iter__ and _get_result() methods can
|
||||
be called
|
||||
\param apply_result ApplyResult object to add in our collection
|
||||
"""
|
||||
self._results.append(apply_result)
|
||||
self._remaining += 1
|
||||
|
||||
def _get_result(self, idx, timeout=None):
|
||||
"""Called by the CollectorIterator object to retrieve the
|
||||
result's values one after another (order defined by the
|
||||
implementation)
|
||||
\param idx The index of the result we want, wrt collector's order
|
||||
\param timeout integer telling how long to wait (in seconds)
|
||||
for the result at index idx to be available, or None (wait
|
||||
forever)
|
||||
"""
|
||||
res = self._results[idx]
|
||||
res.wait(timeout)
|
||||
return res
|
||||
|
||||
def notify_ready(self, apply_result):
|
||||
"""Called by the ApplyResult object (already registered via
|
||||
register_result()) that it is now ready (ie. the Job's result
|
||||
is available or an exception has been raised).
|
||||
\param apply_result ApplyResult object telling us that the job
|
||||
has been processed
|
||||
"""
|
||||
got_first = False
|
||||
got_last = False
|
||||
self._lock.acquire()
|
||||
try:
|
||||
assert self._remaining > 0
|
||||
got_first = (len(self._results) == self._remaining)
|
||||
self._remaining -= 1
|
||||
got_last = (self._remaining == 0)
|
||||
finally:
|
||||
self._lock.release()
|
||||
|
||||
if self._to_notify is not None:
|
||||
if self._as_iterator and got_first:
|
||||
self._to_notify._set_value(iter(self))
|
||||
elif not self._as_iterator and got_last:
|
||||
try:
|
||||
lst = [r.get(0) for r in self._results]
|
||||
except:
|
||||
self._to_notify._set_exception()
|
||||
else:
|
||||
self._to_notify._set_value(lst)
|
195
cs440-acg/ext/tbb/python/tbb/test.py
Normal file
195
cs440-acg/ext/tbb/python/tbb/test.py
Normal file
@@ -0,0 +1,195 @@
|
||||
#!/usr/bin/env python
|
||||
#
|
||||
# Copyright (c) 2016-2020 Intel Corporation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
# Based on the software developed by:
|
||||
# Copyright (c) 2008,2016 david decotigny (Pool of threads)
|
||||
# Copyright (c) 2006-2008, R Oudkerk (multiprocessing.Pool)
|
||||
# All rights reserved.
|
||||
#
|
||||
# Redistribution and use in source and binary forms, with or without
|
||||
# modification, are permitted provided that the following conditions
|
||||
# are met:
|
||||
#
|
||||
# 1. Redistributions of source code must retain the above copyright
|
||||
# notice, this list of conditions and the following disclaimer.
|
||||
# 2. Redistributions in binary form must reproduce the above copyright
|
||||
# notice, this list of conditions and the following disclaimer in the
|
||||
# documentation and/or other materials provided with the distribution.
|
||||
# 3. Neither the name of author nor the names of any contributors may be
|
||||
# used to endorse or promote products derived from this software
|
||||
# without specific prior written permission.
|
||||
#
|
||||
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
|
||||
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
||||
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
|
||||
# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
|
||||
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
|
||||
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
|
||||
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
|
||||
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
|
||||
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
|
||||
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
|
||||
# SUCH DAMAGE.
|
||||
#
|
||||
|
||||
from __future__ import print_function
|
||||
import time
|
||||
import threading
|
||||
|
||||
from .api import *
|
||||
from .pool import *
|
||||
|
||||
|
||||
def test(arg=None):
|
||||
if arg == "-v":
|
||||
def say(*x):
|
||||
print(*x)
|
||||
else:
|
||||
def say(*x):
|
||||
pass
|
||||
say("Start Pool testing")
|
||||
|
||||
get_tid = lambda: threading.current_thread().ident
|
||||
|
||||
def return42():
|
||||
return 42
|
||||
|
||||
def f(x):
|
||||
return x * x
|
||||
|
||||
def work(mseconds):
|
||||
res = str(mseconds)
|
||||
if mseconds < 0:
|
||||
mseconds = -mseconds
|
||||
say("[%d] Start to work for %fms..." % (get_tid(), mseconds*10))
|
||||
time.sleep(mseconds/100.)
|
||||
say("[%d] Work done (%fms)." % (get_tid(), mseconds*10))
|
||||
return res
|
||||
|
||||
### Test copy/pasted from multiprocessing
|
||||
pool = Pool(4) # start worker threads
|
||||
|
||||
# edge cases
|
||||
assert pool.map(return42, []) == []
|
||||
assert pool.apply_async(return42, []).get() == 42
|
||||
assert pool.apply(return42, []) == 42
|
||||
assert list(pool.imap(return42, iter([]))) == []
|
||||
assert list(pool.imap_unordered(return42, iter([]))) == []
|
||||
assert pool.map_async(return42, []).get() == []
|
||||
assert list(pool.imap_async(return42, iter([])).get()) == []
|
||||
assert list(pool.imap_unordered_async(return42, iter([])).get()) == []
|
||||
|
||||
# basic tests
|
||||
result = pool.apply_async(f, (10,)) # evaluate "f(10)" asynchronously
|
||||
assert result.get(timeout=1) == 100 # ... unless slow computer
|
||||
assert list(pool.map(f, range(10))) == list(map(f, range(10)))
|
||||
it = pool.imap(f, range(10))
|
||||
assert next(it) == 0
|
||||
assert next(it) == 1
|
||||
assert next(it) == 4
|
||||
|
||||
# Test apply_sync exceptions
|
||||
result = pool.apply_async(time.sleep, (3,))
|
||||
try:
|
||||
say(result.get(timeout=1)) # raises `TimeoutError`
|
||||
except TimeoutError:
|
||||
say("Good. Got expected timeout exception.")
|
||||
else:
|
||||
assert False, "Expected exception !"
|
||||
assert result.get() is None # sleep() returns None
|
||||
|
||||
def cb(s):
|
||||
say("Result ready: %s" % s)
|
||||
|
||||
# Test imap()
|
||||
assert list(pool.imap(work, range(10, 3, -1), chunksize=4)) == list(map(
|
||||
str, range(10, 3, -1)))
|
||||
|
||||
# Test imap_unordered()
|
||||
assert sorted(pool.imap_unordered(work, range(10, 3, -1))) == sorted(map(
|
||||
str, range(10, 3, -1)))
|
||||
|
||||
# Test map_async()
|
||||
result = pool.map_async(work, range(10), callback=cb)
|
||||
try:
|
||||
result.get(timeout=0.01) # raises `TimeoutError`
|
||||
except TimeoutError:
|
||||
say("Good. Got expected timeout exception.")
|
||||
else:
|
||||
assert False, "Expected exception !"
|
||||
say(result.get())
|
||||
|
||||
# Test imap_async()
|
||||
result = pool.imap_async(work, range(3, 10), callback=cb)
|
||||
try:
|
||||
result.get(timeout=0.01) # raises `TimeoutError`
|
||||
except TimeoutError:
|
||||
say("Good. Got expected timeout exception.")
|
||||
else:
|
||||
assert False, "Expected exception !"
|
||||
for i in result.get():
|
||||
say("Item:", i)
|
||||
say("### Loop again:")
|
||||
for i in result.get():
|
||||
say("Item2:", i)
|
||||
|
||||
# Test imap_unordered_async()
|
||||
result = pool.imap_unordered_async(work, range(10, 3, -1), callback=cb)
|
||||
try:
|
||||
say(result.get(timeout=0.01)) # raises `TimeoutError`
|
||||
except TimeoutError:
|
||||
say("Good. Got expected timeout exception.")
|
||||
else:
|
||||
assert False, "Expected exception !"
|
||||
for i in result.get():
|
||||
say("Item1:", i)
|
||||
for i in result.get():
|
||||
say("Item2:", i)
|
||||
r = result.get()
|
||||
for i in r:
|
||||
say("Item3:", i)
|
||||
for i in r:
|
||||
say("Item4:", i)
|
||||
for i in r:
|
||||
say("Item5:", i)
|
||||
|
||||
#
|
||||
# The case for the exceptions
|
||||
#
|
||||
|
||||
# Exceptions in imap_unordered_async()
|
||||
result = pool.imap_unordered_async(work, range(2, -10, -1), callback=cb)
|
||||
time.sleep(3)
|
||||
try:
|
||||
for i in result.get():
|
||||
say("Got item:", i)
|
||||
except (IOError, ValueError):
|
||||
say("Good. Got expected exception")
|
||||
|
||||
# Exceptions in imap_async()
|
||||
result = pool.imap_async(work, range(2, -10, -1), callback=cb)
|
||||
time.sleep(3)
|
||||
try:
|
||||
for i in result.get():
|
||||
say("Got item:", i)
|
||||
except (IOError, ValueError):
|
||||
say("Good. Got expected exception")
|
||||
|
||||
# Stop the test: need to stop the pool !!!
|
||||
pool.terminate()
|
||||
pool.join()
|
||||
|
||||
|
Reference in New Issue
Block a user