{ "cells": [ { "cell_type": "markdown", "metadata": { "toc": true }, "source": [ "

Table of Contents

\n", "
" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# 01 - Introduction to joblib\n", "\n", "This notebook shows how to use joblib to execute jobs on multiple threads or processes." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "You need to install two packages:\n", "\n", "    pip install contexttimer\n", "\n", "    conda install joblib" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "from IPython.display import Image\n", "import contexttimer\n", "import time\n", "import math\n", "from numba import jit\n", "import multiprocessing\n", "import threading\n", "from joblib import Parallel\n", "import logging" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Creating a process or thread pool with joblib\n", "\n", "[joblib](https://pythonhosted.org/joblib/index.html) provides a simple way to run naively parallel jobs on multiple threads or processes in Python.\n", "\n", "* It integrates seamlessly with [dask](http://distributed.readthedocs.io/en/latest/joblib.html)\n", " and [scikit-learn](http://scikit-learn.org/stable/modules/model_persistence.html)\n", " \n", "* It is undergoing rapid development: e.g. 
[loky](https://github.com/tomMoral/loky)\n", "\n", "* To use it, create a `Parallel` object and pass it a list of jobs, where each job is a tuple `(function, args, kwargs)` that specifies the function, its arguments and its keywords (if any)\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Running a threadsafe function " ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "@jit('float64(int64)', nopython=True, nogil=True) #release the GIL!\n", "def wait_loop_nogil(n):\n", "    \"\"\"\n", "    Function under test.\n", "    \"\"\"\n", "    for m in range(n):\n", "        for l in range(m):\n", "            for j in range(l):\n", "                for i in range(j):\n", "                    i=i+4\n", "                    out=math.sqrt(i)\n", "                    out=out**2.\n", "    return out" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "@jit('float64(int64)', nopython=True, nogil=False) #hold the GIL\n", "def wait_loop_withgil(n):\n", "    \"\"\"\n", "    Function under test.\n", "    \"\"\"\n", "    for m in range(n):\n", "        for l in range(m):\n", "            for j in range(l):\n", "                for i in range(j):\n", "                    i=i+4\n", "                    out=math.sqrt(i)\n", "                    out=out**2.\n", "    return out" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Set up logging so we know which process and thread we are running in" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [], "source": [ "logging.basicConfig(level=logging.DEBUG,\n", "                    format='%(message)s %(threadName)s %(processName)s',\n", "                    )\n", "\n", "def find_ids():\n", "    logging.debug('debug logging: ')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create two job lists, one to print thread and process ids, and one to run the wait loop\n", "\n", "* Important point -- the logging module is **threadsafe**\n", "\n", "* Submit 12 jobs queued on 3 workers" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[(CPUDispatcher(), [1250], {}), 
(CPUDispatcher(), [1250], {}), (CPUDispatcher(), [1250], {}), (CPUDispatcher(), [1250], {}), (CPUDispatcher(), [1250], {}), (CPUDispatcher(), [1250], {}), (CPUDispatcher(), [1250], {}), (CPUDispatcher(), [1250], {}), (CPUDispatcher(), [1250], {}), (CPUDispatcher(), [1250], {}), (CPUDispatcher(), [1250], {}), (CPUDispatcher(), [1250], {})]\n" ] } ], "source": [ "njobs=12\n", "nprocs=3\n", "thread_id_jobs =[(find_ids,[],{}) for i in range(nprocs)]\n", "nloops=1250\n", "calc_jobs=[(wait_loop_nogil,[nloops],{}) for i in range(njobs)]\n", "print(calc_jobs)" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[1250.0, 1250.0, 1250.0, 1250.0, 1250.0, 1250.0, 1250.0, 1250.0, 1250.0, 1250.0, 1250.0, 1250.0]\n", "wall time 4.717013486020733 and cpu time 13.340068\n" ] } ], "source": [ "with contexttimer.Timer(time.perf_counter) as wall:\n", "    with contexttimer.Timer(time.process_time) as cpu:\n", "        with Parallel(n_jobs=nprocs,backend='threading') as parallel:\n", "            #parallel(thread_id_jobs)\n", "            results=parallel(calc_jobs)\n", "        print(results)\n", "print(f'wall time {wall.elapsed} and cpu time {cpu.elapsed}')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "* The 12 jobs were spread across 3 threads, all in the same process\n", "\n", "* Note that the cpu time (about 13.3 s) is larger than the wall time (about 4.7 s), confirming that we've released the GIL: close to three cores were busy at once.\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Now repeat this holding the GIL\n", "\n", "Create a new set of jobs that hold the GIL" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[1250.0, 1250.0, 1250.0, 1250.0, 1250.0, 1250.0, 1250.0, 1250.0, 1250.0, 1250.0, 1250.0, 1250.0]\n", "wall time 9.180190951010445 and cpu time 9.024569\n" ] } ], "source": [ "calc_jobs=[(wait_loop_withgil,[nloops],{}) for i in range(njobs)]\n", "with 
contexttimer.Timer(time.perf_counter) as wall:\n", "    with contexttimer.Timer(time.process_time) as cpu:\n", "        with Parallel(n_jobs=nprocs,backend='threading') as parallel:\n", "            #parallel(thread_id_jobs)\n", "            results=parallel(calc_jobs)\n", "        print(results)\n", "print(f'wall time {wall.elapsed} and cpu time {cpu.elapsed}')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**Note that the wall time (about 9.2 s) now roughly equals the cpu time (about 9.0 s): the speed is the same as if we ran on a single CPU**" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Now repeat with processes instead of threads" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[1250.0, 1250.0, 1250.0, 1250.0, 1250.0, 1250.0, 1250.0, 1250.0, 1250.0, 1250.0, 1250.0, 1250.0]\n", "wall time 5.1078909049974754 and cpu time 0.0879790000000007\n" ] } ], "source": [ "calc_jobs=[(wait_loop_withgil,[nloops],{}) for i in range(njobs)]\n", "with contexttimer.Timer(time.perf_counter) as wall:\n", "    with contexttimer.Timer(time.process_time) as cpu:\n", "        with Parallel(n_jobs=nprocs,backend='multiprocessing') as parallel:\n", "            #parallel(thread_id_jobs)\n", "            results=parallel(calc_jobs)\n", "        print(results)\n", "print(f'wall time {wall.elapsed} and cpu time {cpu.elapsed}')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**How do you explain the tiny cpu time?**" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Summary\n", "\n", "1. For simple functions that Numba can compile in nopython mode, with no calls back into the Python interpreter, Numba can release the GIL and you can get the benefit of multiple threads\n", "\n", "1. The joblib library can be used to queue dozens of jobs onto a specified number of processes or threads\n", "\n", "1. A process pool can execute pure Python routines, but all data has to be copied to and from each process."
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.4" }, "toc": { "nav_menu": {}, "number_sections": true, "sideBar": false, "skip_h1_title": false, "title_cell": "Table of Contents", "title_sidebar": "Contents", "toc_cell": true, "toc_position": {}, "toc_section_display": "block", "toc_window_display": false } }, "nbformat": 4, "nbformat_minor": 2 }