mapper - Parallel processing implementations

- BaseMapper
- MPIMapper
- MPMapper
- SerialMapper
- ThreadPoolMapper: Thread-based parallel mapper using concurrent.futures.ThreadPoolExecutor.
- can_pickle: Returns True if problem can be pickled.
- cpu_id: Return the processor id for the currently running process.
- nice
- pool_size: Get the number of cpus available for processing, or use the number provided.
- setpriority: Set the priority of a Windows process.
- show_performance: timestamps is a series of pairs (tstart, tstop) before and after the synchronous map call, with times in nanoseconds returned from time.perf_counter_ns().
- using_mpi

Parallel and serial mapper implementations.

The API is a bit crufty because interprocess communication has evolved since the original implementation. The names are also misleading.

Available mappers:

- SerialMapper: Single-threaded execution
- MPMapper: Multi-process execution using multiprocessing
- ThreadPoolMapper: Multi-threaded execution using ThreadPoolExecutor
- MPIMapper: MPI-based distributed execution across cluster nodes

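Usage:

```python
# Mapper stands for one of the mapper classes below (e.g. MPMapper);
# problem, cpus and points are supplied by the caller.
Mapper.start_worker(problem)  # for MPIMapper, worker processes never return from this call
mapper = Mapper.start_mapper(problem, None, cpus)
result = mapper(points)
...
# A later fit calls start_mapper again before mapping new points.
mapper = Mapper.start_mapper(problem, None, cpus)
result = mapper(points)
Mapper.stop_mapper()
```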
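To make the calling sequence above concrete, here is a small self-contained sketch. It assumes, as the ThreadPoolMapper notes imply, that the callable returned by start_mapper evaluates problem.nllf at each point and returns the values in order. ToyProblem and ToySerialMapper are hypothetical stand-ins, not bumps classes.

```python
class ToyProblem:
    """Hypothetical problem: nllf is a simple sum of squares of the parameters."""

    def nllf(self, point):
        return sum(x * x for x in point)


class ToySerialMapper:
    """Hypothetical mapper following the start_worker/start_mapper/stop_mapper protocol."""

    @staticmethod
    def start_worker(problem):
        # A serial mapper has no separate workers, so there is nothing to set up.
        pass

    @staticmethod
    def start_mapper(problem, modelargs=None, cpus=0):
        # Return a callable that maps a batch of points to their nllf values.
        return lambda points: [problem.nllf(p) for p in points]

    @staticmethod
    def stop_mapper(mapper=None):
        # Nothing to release for in-process evaluation.
        pass


problem = ToyProblem()
ToySerialMapper.start_worker(problem)
mapper = ToySerialMapper.start_mapper(problem, None, cpus=1)
result = mapper([[1.0, 2.0], [0.0, 3.0]])
ToySerialMapper.stop_mapper()
print(result)  # [5.0, 9.0]
```

The real mappers differ mainly in where the nllf calls run (same thread, thread pool, process pool or MPI ranks), not in this calling sequence.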

class bumps.mapper.BaseMapper

Bases: object

has_problem = False
static start_mapper(problem, modelargs=None, cpus=0)

Called with the problem on a new fit.

static start_worker(problem)

Called with the problem to initialize the worker.

static stop_mapper(mapper=None)

class bumps.mapper.MPIMapper

Bases: BaseMapper

has_problem = True

For MPIMapper, only the worker is initialized with the fit problem.

static start_mapper(problem, modelargs=None, cpus=0)

Called with the problem on a new fit.

static start_worker(problem)

Start the worker process.

For the main process this does nothing and returns immediately. The worker processes never return.

Each worker sits in a loop waiting for the next batch of points for the problem, or for the next problem. If the problem is set to None, the worker exits the process and never returns.
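The worker loop described above can be sketched with a queue.Queue and a thread standing in for MPI messaging; this is a swapped-in technique for illustration only, since MPIMapper itself communicates over MPI. The message format is hypothetical: ("problem", obj) installs a new problem, ("points", batch) evaluates a batch, and a None problem ends the loop. Unlike a real MPI worker, which exits the process, this sketch simply returns.

```python
import queue
import threading


class ToyProblem:
    """Hypothetical problem used only for illustration."""

    def __init__(self, offset):
        self.offset = offset

    def nllf(self, point):
        return sum(point) + self.offset


def worker_loop(inbox, outbox):
    """Wait for the next problem or the next batch of points until told to stop."""
    problem = None
    while True:
        kind, payload = inbox.get()
        if kind == "problem":
            if payload is None:
                # A None problem is the shutdown signal; a real MPI worker would exit here.
                return
            problem = payload
        elif kind == "points":
            outbox.put([problem.nllf(p) for p in payload])


inbox, outbox = queue.Queue(), queue.Queue()
worker = threading.Thread(target=worker_loop, args=(inbox, outbox))
worker.start()

inbox.put(("problem", ToyProblem(offset=10)))
inbox.put(("points", [[1, 2], [3, 4]]))
first = outbox.get()  # [13, 17]
inbox.put(("problem", ToyProblem(offset=0)))  # next fit, new problem
inbox.put(("points", [[1, 2]]))
second = outbox.get()  # [3]
inbox.put(("problem", None))  # shut the worker down
worker.join()
```

Because the worker keeps the most recent problem between batches, only the points need to be sent for each map call within a fit.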

static stop_mapper(mapper=None)
timestamps = []

class bumps.mapper.MPMapper

Bases: BaseMapper

has_problem = False
manager = None
pool = None
problem = None
problem_id = 0
shared_pickled_problem = None
static start_mapper(problem, modelargs=None, cpus=0)

Called with the problem on a new fit.

static start_worker(problem)

Called with the problem to initialize the worker.

static stop_mapper(mapper=None)
timestamps = []

class bumps.mapper.SerialMapper

Bases: BaseMapper

has_problem = False
static start_mapper(problem, modelargs=None, cpus=0)

Called with the problem on a new fit.

static start_worker(problem)

Called with the problem to initialize the worker.

static stop_mapper(mapper=None)
timestamps = []

class bumps.mapper.ThreadPoolMapper

Bases: BaseMapper

Thread-based parallel mapper using concurrent.futures.ThreadPoolExecutor.

Each thread maintains its own copy of the problem object for independent calculations of nllf.

This mapper is only efficient with a free-threaded Python interpreter; otherwise the GIL prevents true parallelism.
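A minimal sketch of the per-thread copy idea, assuming the copies are made with copy.deepcopy and cached in threading.local(); the actual ThreadPoolMapper may manage its copies differently. ToyProblem, thread_map and _thread_nllf are hypothetical names. The toy nllf mutates the problem to show why each thread needs its own copy.

```python
import copy
import threading
from concurrent.futures import ThreadPoolExecutor


class ToyProblem:
    """Hypothetical problem with mutable state that must not be shared across threads."""

    def __init__(self):
        self.last_point = None

    def nllf(self, point):
        self.last_point = point  # safe because each thread works on its own copy
        return sum(x * x for x in point)


_local = threading.local()


def _thread_nllf(problem, point):
    # Make a private copy of the problem the first time this thread sees it.
    if getattr(_local, "source", None) is not problem:
        _local.source = problem
        _local.copy = copy.deepcopy(problem)
    return _local.copy.nllf(point)


def thread_map(problem, points, cpus=4):
    """Evaluate nllf for each point on a thread pool, preserving input order."""
    with ThreadPoolExecutor(max_workers=cpus) as pool:
        return list(pool.map(lambda p: _thread_nllf(problem, p), points))


problem = ToyProblem()
values = thread_map(problem, [[1, 2], [3], [0, 0, 2]])
print(values)  # [5, 9, 4]
```

The original problem object is never touched by the worker threads, so its state stays consistent for the caller. On a GIL build this runs correctly but without a speedup for pure-Python nllf code.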

has_problem = False
pool = None
problem_id = 0
static start_mapper(problem, modelargs=None, cpus=0)

Called with the problem on a new fit.

static start_worker(problem)

Called with the problem to initialize the worker.

static stop_mapper(mapper=None)
timestamps = []

bumps.mapper.can_pickle(problem, check=False)

Returns True if problem can be pickled.

If this method returns False then MPMapper cannot be used and SerialMapper should be used instead.

If check is True then call nllf() on the duplicated object as a “smoke test” to verify that the function will run after copying. This is not foolproof. For example, access to a database may work in the duplicated object because the connection is open and available in the current process, but it will fail when trying to run on a remote machine.
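The check described above can be sketched as a pickle round trip followed by an optional nllf() smoke test. This is an assumption about how such a check might work, not the bumps source; can_pickle_sketch is a hypothetical name, and the stand-in problems are built from types.SimpleNamespace so the example needs no custom classes.

```python
import pickle
from types import SimpleNamespace


def can_pickle_sketch(problem, check=False):
    """Return True if problem survives a pickle round trip (and, optionally, a smoke test)."""
    try:
        clone = pickle.loads(pickle.dumps(problem))
    except Exception:
        # Lambdas, open files, locks, etc. typically fail here.
        return False
    if check:
        try:
            # Smoke test: the duplicated object must still be able to compute nllf().
            clone.nllf()
        except Exception:
            return False
    return True


# Hypothetical stand-in problems.
good = SimpleNamespace(nllf=float)           # float() returns 0.0 and pickles by reference
closure = SimpleNamespace(nllf=lambda: 0.0)  # lambdas cannot be pickled
broken = SimpleNamespace(nllf=len)           # pickles fine, but len() raises TypeError

print(can_pickle_sketch(good, check=True))     # True
print(can_pickle_sketch(closure))              # False
print(can_pickle_sketch(broken))               # True
print(can_pickle_sketch(broken, check=True))   # False
```

The last two calls show why the smoke test is useful: an object can pickle cleanly and still fail once evaluated after copying.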

bumps.mapper.cpu_id(num_sockets=2)

Return the processor id for the currently running process.

bumps.mapper.nice()

bumps.mapper.pool_size(cpus=0)

Get the number of cpus available for processing, or use the number provided.

On Linux, use os.sched_getaffinity to count the number of cpus allocated to the process rather than multiprocessing.cpu_count, which returns all processors on the system. This restricts the amount of parallelism to the number of cpus allocated by Slurm when running on a compute cluster with a partial node.
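A minimal sketch of the selection logic described above, assuming a positive cpus argument is used as given; pool_size_sketch is a hypothetical name and the real function may handle other values differently. os.sched_getaffinity only exists on some platforms (notably Linux), so the sketch falls back to multiprocessing.cpu_count elsewhere.

```python
import multiprocessing
import os


def pool_size_sketch(cpus=0):
    """Return cpus if positive, else the number of CPUs this process may run on."""
    if cpus > 0:
        return cpus
    if hasattr(os, "sched_getaffinity"):
        # Linux: count only the CPUs in this process's affinity mask,
        # e.g. the subset of a node allocated by Slurm.
        return len(os.sched_getaffinity(0))
    # Other platforms: fall back to the total number of processors on the system.
    return multiprocessing.cpu_count()


print(pool_size_sketch(3))  # 3
print(pool_size_sketch())   # depends on the machine and any CPU affinity limits
```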

bumps.mapper.setpriority(pid=None, priority=1)

Set the priority of a Windows process. Priority is a value from 0 to 5, where 2 is normal priority and 5 is the maximum. By default it sets the priority of the current Python process, but any valid process ID can be given.

bumps.mapper.show_performance(timestamps)

timestamps is a series of pairs (tstart, tstop) recorded before and after each synchronous map call, with times in nanoseconds returned from time.perf_counter_ns(). Display the median time within (tstop[k] - tstart[k]) and between (tstart[k+1] - tstop[k]) map calls.
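The two medians described above can be computed as in the following sketch. It assumes timestamps is a list of (tstart, tstop) tuples in nanoseconds; performance_summary is a hypothetical name, and unlike show_performance it returns the medians in seconds rather than displaying them.

```python
import time
from statistics import median


def performance_summary(timestamps):
    """Return (median time within map calls, median time between map calls) in seconds."""
    within = [tstop - tstart for tstart, tstop in timestamps]
    # Gap between the end of call k and the start of call k+1.
    between = [timestamps[k + 1][0] - timestamps[k][1] for k in range(len(timestamps) - 1)]
    return (
        median(within) / 1e9 if within else None,
        median(between) / 1e9 if between else None,
    )


# Hypothetical timestamps (ns): map calls of 2, 3 and 4 ms separated by 1 ms gaps.
ts = [(0, 2_000_000), (3_000_000, 6_000_000), (7_000_000, 11_000_000)]
within_s, between_s = performance_summary(ts)  # roughly 0.003 s within, 0.001 s between

# Recording real timestamps around a stand-in for a synchronous map call.
timestamps = []
for _ in range(3):
    tstart = time.perf_counter_ns()
    sum(x * x for x in range(10_000))  # placeholder work
    timestamps.append((tstart, time.perf_counter_ns()))
live_within, live_between = performance_summary(timestamps)
```

A large "between" median relative to "within" suggests the optimizer itself, rather than nllf evaluation, dominates the run time.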

bumps.mapper.using_mpi()