Table of Contents

  • 1  01 - Introduction to joblib

    • 1.1  Creating a process or thread pool with joblib

      • 1.1.1  Running a thread-safe function

      • 1.1.2  Set up logging so we know which process and thread we are running in

      • 1.1.3  Create two job lists: one to print thread and process IDs, one to run the wait loop

      • 1.1.4  Now repeat this holding the GIL

      • 1.1.5  Now repeat with processes instead of threads

      • 1.1.6  Summary

01 - Introduction to joblib

This notebook shows how to use joblib to execute jobs on multiple threads or processes.

You need to install two packages:

pip install contexttimer

conda install joblib
[1]:
from IPython.display import Image
import contexttimer
import time
import math
from numba import jit
import multiprocessing
import threading
from joblib import Parallel
import logging

Creating a process or thread pool with joblib

joblib provides one of the simplest ways to run embarrassingly parallel jobs on multiple threads or processes in Python.

  • It integrates seamlessly with Dask and scikit-learn
  • It is under active development: e.g. the new loky backend
  • To use it, create a Parallel object and call it with a list of jobs, where each job is a tuple of the function, its arguments, and its keywords (if any); see the sketch below
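
For example, here is a minimal sketch of two equivalent ways to build a job list (the math.sqrt jobs are illustrative only, not part of the timing runs below):

from joblib import Parallel, delayed
import math

# delayed(f)(*args, **kwargs) just packages the (function, args, kwargs)
# tuple that Parallel expects, so the two job lists below are equivalent
jobs = [delayed(math.sqrt)(i) for i in range(4)]
jobs_explicit = [(math.sqrt, [i], {}) for i in range(4)]

results = Parallel(n_jobs=2, backend='threading')(jobs)
print(results)  # [0.0, 1.0, 1.4142135623730951, 1.7320508075688772]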

Running a thread-safe function

[2]:
@jit('float64(int64)', nopython=True, nogil=True)  # release the GIL!
def wait_loop_nogil(n):
    """
    Function under test.
    """
    for m in range(n):
        for l in range(m):
            for j in range(l):
                for i in range(j):
                    i = i + 4
                    out = math.sqrt(i)
                    out = out ** 2.
    return out
[3]:
@jit('float64(int64)', nopython=True, nogil=False)  # hold the GIL
def wait_loop_withgil(n):
    """
    Function under test.
    """
    for m in range(n):
        for l in range(m):
            for j in range(l):
                for i in range(j):
                    i = i + 4
                    out = math.sqrt(i)
                    out = out ** 2.
    return out
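
As a point of reference before going parallel, a single call can be timed on its own (a hedged sketch using the imports above; the elapsed value is machine-dependent and was not captured in the original run):

# single-threaded baseline: one call on the main thread, no pool
with contexttimer.Timer(time.perf_counter) as single:
    wait_loop_nogil(1250)
print(f'one call took {single.elapsed:.2f} s')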

Set up logging so we know which process and thread we are running in

[4]:
logging.basicConfig(level=logging.DEBUG,
                    format='%(message)s %(threadName)s %(processName)s',
                    )

def find_ids():
    logging.debug('debug logging: ')
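
Calling it once in the main thread shows the message format (the output shown is what the basicConfig format above should produce, not captured output):

find_ids()   # expect something like: debug logging:  MainThread MainProcess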

Create two job lists: one to print thread and process IDs, one to run the wait loop

  • Important point: the logging module is thread-safe
  • Submit 12 jobs queued on 3 threads
[5]:
njobs=12
nprocs=3
thread_id_jobs =[(find_ids,[],{}) for i in range(nprocs)]
nloops=1250
calc_jobs=[(wait_loop_nogil,[nloops],{}) for i in range(njobs)]
print(calc_jobs)
[(CPUDispatcher(<function wait_loop_nogil at 0x10ba8b9d8>), [1250], {}), (CPUDispatcher(<function wait_loop_nogil at 0x10ba8b9d8>), [1250], {}), (CPUDispatcher(<function wait_loop_nogil at 0x10ba8b9d8>), [1250], {}), (CPUDispatcher(<function wait_loop_nogil at 0x10ba8b9d8>), [1250], {}), (CPUDispatcher(<function wait_loop_nogil at 0x10ba8b9d8>), [1250], {}), (CPUDispatcher(<function wait_loop_nogil at 0x10ba8b9d8>), [1250], {}), (CPUDispatcher(<function wait_loop_nogil at 0x10ba8b9d8>), [1250], {}), (CPUDispatcher(<function wait_loop_nogil at 0x10ba8b9d8>), [1250], {}), (CPUDispatcher(<function wait_loop_nogil at 0x10ba8b9d8>), [1250], {}), (CPUDispatcher(<function wait_loop_nogil at 0x10ba8b9d8>), [1250], {}), (CPUDispatcher(<function wait_loop_nogil at 0x10ba8b9d8>), [1250], {}), (CPUDispatcher(<function wait_loop_nogil at 0x10ba8b9d8>), [1250], {})]
[6]:
with contexttimer.Timer(time.perf_counter) as wall:
    with contexttimer.Timer(time.process_time) as cpu:
        with Parallel(n_jobs=nprocs,backend='threading') as parallel:
            #parallel(thread_id_jobs)
            results=parallel(calc_jobs)
        print(results)
print(f'wall time {wall.elapsed} and cpu time {cpu.elapsed}')
[1250.0, 1250.0, 1250.0, 1250.0, 1250.0, 1250.0, 1250.0, 1250.0, 1250.0, 1250.0, 1250.0, 1250.0]
wall time 4.717013486020733 and cpu time 13.340068
  • Each job ran on a different thread, but all in the same process
  • Note that the CPU time is almost three times the wall time, confirming that we've released the GIL and the three threads were computing simultaneously.
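
A quick arithmetic check on those numbers (using the timers still in scope from the cell above):

# cpu/wall ratio approximates how many threads were busy at once
print(cpu.elapsed / wall.elapsed)   # 13.34 / 4.72 ≈ 2.8, close to nprocs = 3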

Now repeat this holding the GIL

Create a new set of jobs that hold the GIL

[7]:
calc_jobs=[(wait_loop_withgil,[nloops],{}) for i in range(njobs)]
with contexttimer.Timer(time.perf_counter) as wall:
    with contexttimer.Timer(time.process_time) as cpu:
        with Parallel(n_jobs=nprocs,backend='threading') as parallel:
            #parallel(thread_id_jobs)
            results=parallel(calc_jobs)
        print(results)
print(f'wall time {wall.elapsed} and cpu time {cpu.elapsed}')
[1250.0, 1250.0, 1250.0, 1250.0, 1250.0, 1250.0, 1250.0, 1250.0, 1250.0, 1250.0, 1250.0, 1250.0]
wall time 9.180190951010445 and cpu time 9.024569

**Note that the wall time now matches the CPU time: with the GIL held, the threads run one at a time, so the speed is the same as if we ran on a single CPU.**

Now repeat with processes instead of threads

[8]:
calc_jobs=[(wait_loop_withgil,[nloops],{}) for i in range(njobs)]
with contexttimer.Timer(time.perf_counter) as wall:
    with contexttimer.Timer(time.process_time) as cpu:
        with Parallel(n_jobs=nprocs,backend='multiprocessing') as parallel:
            #parallel(thread_id_jobs)
            results=parallel(calc_jobs)
        print(results)
print(f'wall time {wall.elapsed} and cpu time {cpu.elapsed}')
[1250.0, 1250.0, 1250.0, 1250.0, 1250.0, 1250.0, 1250.0, 1250.0, 1250.0, 1250.0, 1250.0, 1250.0]
wall time 5.1078909049974754 and cpu time 0.0879790000000007

**How do you explain the tiny CPU time?** (Hint: time.process_time measures CPU time for the parent process only, and here the actual work ran in the child processes.)
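
One way to confirm where the CPU time went (a hedged sketch; the resource module is Unix-only and only counts children the parent has already waited on):

import resource

# CPU seconds consumed by terminated child processes
child = resource.getrusage(resource.RUSAGE_CHILDREN)
print(child.ru_utime + child.ru_stime)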

Summary

  1. For simple numerical functions that Numba can compile in nopython mode, passing nogil=True releases the GIL so you get the benefit of multiple threads
  2. The joblib library can be used to queue dozens of jobs onto a specified number of processes or threads
  3. A process pool can execute pure Python routines, but all data has to be copied to and from each process.