Python Concurrency Deep Dive

Author

Andres Monge

Published

February 10, 2025

Performance analysis of Python’s threading, asyncio, multiprocessing, and concurrent.futures concurrency models on CPU-bound problems (prime number generation, matrix multiplication) and an I/O-heavy one (image processing).

Comparing efficiency with and without memoization, using timeit (one timed run per configuration) for benchmarking.

Memoization

Memoization is a technique used to improve the performance of a function by storing the results of previously calculated values and returning them when the same input is encountered again. This can help avoid redundant calculations and improve the efficiency of the function.

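Using lru_cache is the easiest way to implement memoization in Python, and it can be used with the other concurrency models as well. Note, however, that the cache only pays off from the second run onward, because the first run is what fills it. The cache also lives inside a single process: threads and asyncio tasks share it, but multiprocessing workers do not send their cache entries back to the parent. Calling .cache_clear() effectively resets the cache, and with it the memoization.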

Common libraries and helper methods

We need to patch asyncio to allow nested use of asyncio.run and loop.run_until_complete inside our Quarto/Jupyter notebooks.

pip install nest_asyncio
Code
import nest_asyncio
nest_asyncio.apply()  # Patches asyncio for Jupyter compatibility
# Terminator passed as ``end=`` by the print helpers (print_benchmark, title):
# every printed line is followed by this marker instead of a newline.
# NOTE(review): "[BR" looks like a garbled line-break marker (e.g. "\n" or
# "<br>" in the original notebook) — confirm the intended value.
LINE_BREAK = "[BR"

Add some helper functions that use coloring to make the results easier to read (console only).

pip install colorama  (timeit is part of the Python standard library and needs no installation)
Code
import timeit
import inspect
from colorama import Fore, Style

def benchmark(func, *args) -> tuple[float, object]:
    """Run ``func(*args)`` exactly once and return ``(elapsed_seconds, result)``.

    The call is timed with ``timeit.Timer`` (high-resolution clock, garbage
    collection disabled while timing). The return value is captured from
    that same timed call. Calling ``func`` a second time just to obtain the
    result would double the cost of every benchmark and repeat its side
    effects (for example, writing every processed image twice).
    """
    captured: list[object] = []
    timer = timeit.Timer(lambda: captured.append(func(*args)))
    time_taken = timer.timeit(number=1)
    return time_taken, captured[0]


def _describe_arg(param_name: str, arg: object) -> str:
    """Return a short ``name=value`` label for one benchmark argument.

    Anything with a ``shape`` attribute (e.g. NumPy arrays) is shown by its
    shape, and lists/tuples by their length. Large inputs therefore do not
    flood the output with element values.
    """
    shape = getattr(arg, "shape", None)
    if shape is not None:
        return f"{param_name}={tuple(shape)}"
    if isinstance(arg, (list, tuple)):
        return f"{param_name}={len(arg)}"
    return f"{param_name}={arg}"


def print_benchmark(results, method, name, *args) -> None:
    """Time ``method(*args)``, store its result and print one colored line.

    Args:
        results: Dict that receives the return value under
            ``method.__name__``. A later run of the same method overwrites
            an earlier one.
        method: Callable to benchmark.
        name: Label shown in the first (24-character) column.
        *args: Positional arguments forwarded to ``method``.
    """
    elapsed, result = benchmark(method, *args)
    results[method.__name__] = result

    # Label each positional argument with the matching parameter name from
    # the method's signature; arguments beyond the named ones become argN.
    params = list(inspect.signature(method).parameters)[: len(args)]
    arg_descriptions = [
        _describe_arg(params[i] if i < len(params) else f"arg{i + 1}", arg)
        for i, arg in enumerate(args)
    ]

    # Keep the argument column at most 30 characters wide.
    joined_args = ", ".join(arg_descriptions)
    if len(joined_args) > 30:
        joined_args = joined_args[:27] + "..."

    output = (
        f"{Fore.BLUE}{name:<24}{Style.RESET_ALL} "
        f"({Fore.GREEN}{joined_args}{Style.RESET_ALL}):"
        f"{Fore.LIGHTBLUE_EX}\t{elapsed:.4f}s{Style.RESET_ALL}"
    )

    print(output, end=LINE_BREAK)

def title(text: str) -> None:
    """Print ``text`` as a light-magenta section heading followed by LINE_BREAK."""
    colored_heading = f"{Fore.LIGHTMAGENTA_EX}{text}{Style.RESET_ALL}"
    print(colored_heading, end=LINE_BREAK)

import multiprocessing
import os

# Worker counts used by the benchmarks below. os.cpu_count() returns None
# when the count cannot be determined, hence the ``or 1`` fallback.
# (multiprocessing.cpu_count() raises instead of returning None, so a fallback
# on it is dead code.) Both counts share one value so that thread- and
# process-based runs are sized consistently.
num_threads = os.cpu_count() or 1
num_processes = num_threads

Import the main libraries

Code
import threading
import asyncio
import multiprocessing
import concurrent.futures
import functools
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

Test 1: Prime Number Generation (CPU-bound)

This challenge involves generating prime numbers up to a specified limit.
This is a CPU-bound task.

Code
from functools import lru_cache
import os


@lru_cache(maxsize=None)
def is_prime(n: int, divisor: int = 2) -> bool:
    """Return True if ``n`` is prime, memoized with ``functools.lru_cache``.

    Uses trial division by every integer from ``divisor`` up to ``sqrt(n)``.
    ``divisor`` is kept for backward compatibility: the result is True when
    ``n >= 2`` and no integer in ``[divisor, sqrt(n)]`` divides ``n``.

    The search is a loop rather than one recursive call per divisor. Large
    ``n`` (around 10**6 and above, where sqrt(n) approaches Python's default
    recursion limit of 1000) therefore no longer raises ``RecursionError``.
    The cache also holds one entry per distinct call instead of one per
    (n, divisor) step. Call ``is_prime.cache_clear()`` to reset the memoization.
    """
    if n < 2:
        return False
    while divisor * divisor <= n:
        if n % divisor == 0:
            return False
        divisor += 1
    return True


def prime_number_generator(limit: int):
    """Lazily yield each candidate integer from 2 up to and including ``limit``."""
    for candidate in range(2, limit + 1):
        yield candidate


def generate_primes_sequential(limit: int) -> list[int]:
    """Baseline: test every candidate in order on the calling thread."""
    return list(filter(is_prime, prime_number_generator(limit)))


def generate_primes_threading(limit: int) -> list[int]:
    """Split 2..limit into ``num_threads`` contiguous ranges, one thread each.

    Every thread collects its primes locally and appends them to the shared
    list under a lock. The combined list is returned in ascending order.
    """
    found: list[int] = []
    guard = threading.Lock()

    def scan(lo: int, hi: int) -> None:
        """Collect the primes in [lo, hi] and merge them into ``found``."""
        hits = list(filter(is_prime, range(lo, hi + 1)))
        with guard:
            found.extend(hits)

    span = (limit - 1) // num_threads
    last = num_threads - 1
    workers: list[threading.Thread] = []
    for idx in range(num_threads):
        lo = 2 + idx * span
        # The final thread also covers any remainder up to ``limit``.
        hi = limit if idx == last else lo + span - 1
        t = threading.Thread(target=scan, args=(lo, hi))
        t.start()
        workers.append(t)

    for t in workers:
        t.join()

    return sorted(found)


def generate_primes_asyncio(limit: int) -> list[int]:
    """Using asyncio.

    Each candidate is wrapped in a coroutine and all of them are gathered.
    The coroutines never ``await``, so they run one after another on the
    event loop thread. This measures asyncio overhead, not a parallel speed-up.
    """

    async def is_prime_async(num: int) -> bool:
        return is_prime(num)

    async def _async_gather_primes(limit: int) -> list[int]:
        candidates = list(prime_number_generator(limit))
        flags = await asyncio.gather(*(is_prime_async(num) for num in candidates))
        # Pair each flag with its own candidate rather than re-deriving the
        # number from the result index.
        return [num for num, prime in zip(candidates, flags) if prime]

    # asyncio.run creates (and closes) the loop itself. Calling
    # asyncio.get_event_loop() outside a running loop is deprecated.
    return asyncio.run(_async_gather_primes(limit))


def generate_primes_multiprocessing(limit: int) -> list[int]:
    """Using multiprocessing.

    ``is_prime`` is mapped over the candidates 2..limit in a process pool with
    one worker per CPU. Cache entries filled in by the worker processes stay
    in those processes and are discarded with the pool, so a second run gains
    nothing from the first.
    """
    candidates = list(prime_number_generator(limit))
    with multiprocessing.Pool(processes=os.cpu_count() or 1) as pool:
        flags = pool.map(is_prime, candidates)
    # Pair flags with their candidates: candidates start at 2, so numbering
    # the results from 0 would shift every reported prime by 2.
    return [num for num, prime in zip(candidates, flags) if prime]


def generate_primes_concurrent_futures(limit: int, chunksize: int = 1) -> list[int]:
    """Using concurrent.futures.

    Args:
        limit: Largest number to test (candidates are 2..limit).
        chunksize: Number of candidates handed to a worker per task. The
            default of 1 matches ``ProcessPoolExecutor.map``'s own default
            and submits every candidate as a separate task. Larger values
            reduce that per-task overhead.

    Returns:
        The primes in ascending order.
    """
    candidates = list(prime_number_generator(limit))
    with concurrent.futures.ProcessPoolExecutor() as executor:
        flags = list(executor.map(is_prime, candidates, chunksize=chunksize))
    # Pair flags with their candidates: candidates start at 2, so numbering
    # the results from 0 would shift every reported prime by 2.
    return [num for num, prime in zip(candidates, flags) if prime]

def gen_primes_benchmark():
    """Benchmark every prime generator twice: cold cache, then warm cache.

    The ``is_prime`` cache is cleared before each new strategy, so each
    strategy starts cold. It is not cleared between the two runs of one
    strategy; that second run is what the "Memoized" rows measure. The
    Futures rows use a third of the limit (labelled "//3").
    """
    title("Primes Generator Benchmark")
    limit = int(3e5)
    results = {}

    strategies = [
        (generate_primes_sequential, "Sequential", limit),
        (generate_primes_threading, "Threading", limit),
        (generate_primes_asyncio, "Asyncio", limit),
        (generate_primes_multiprocessing, "Multiprocessing", limit),
        (generate_primes_concurrent_futures, "Futures //3", limit // 3),
    ]
    for index, (strategy, label, bound) in enumerate(strategies):
        if index > 0:
            is_prime.cache_clear()
        print_benchmark(results, strategy, label, bound)
        print_benchmark(results, strategy, f"{label} Memoized", bound)

if __name__ == "__main__":
    # Guarded so that multiprocessing workers started with "spawn" or
    # "forkserver" (which re-import the main module) don't re-run the benchmark.
    gen_primes_benchmark()

Primes Generator Benchmark[BRSequential  (limit=300000): 4.5257s[BRSequential Memoized  (limit=300000): 0.0338s[BRThreading  (limit=300000): 4.4717s[BRThreading Memoized  (limit=300000): 0.0565s[BRAsyncio  (limit=300000): 6.4361s[BRAsyncio Memoized  (limit=300000): 1.7747s[BRMultiprocessing  (limit=300000): 1.3501s[BRMultiprocessing Memoized (limit=300000): 1.4462s[BRFutures //3  (limit=100000): 8.4532s[BRFutures //3 Memoized  (limit=100000): 8.7006s[BR:::

Test 2: Image Processing (I/O-bound)

You’ll need to install the Pillow library

pip install Pillow

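This challenge involves processing a set of images; here each image is resized to 128×128 pixels. It is treated as an I/O-bound task, although it mixes file I/O (reading and writing the images) with some CPU work (decoding and resizing).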

Download sample images

Code
import os
import requests
import tempfile

# Directory to save downloaded images.
# A dedicated subdirectory of the system temp dir: remove_sample_images()
# deletes files in this directory, so it must never be the shared temp dir
# itself, where unrelated files from other programs live.
output_dir = os.path.join(tempfile.gettempdir(), "concurrency_benchmark_images")
os.makedirs(output_dir, exist_ok=True)

# URLs of sample images (replace with your own URLs or use these defaults)
image_urls = [
    "https://picsum.photos/1280/720?random",
    "https://picsum.photos/1920/1080?random",
    "https://picsum.photos/3840/2160?random",
    "https://picsum.photos/3840/2160?random",
]

def download_image(url: str, save_path: str, timeout: float = 30.0) -> None:
    """Download an image from ``url`` and stream it to ``save_path``.

    Args:
        url: Image URL.
        save_path: Destination file; it is overwritten if present.
        timeout: Seconds to wait for the server before giving up, so a
            stalled connection cannot hang the notebook forever.

    Raises:
        requests.HTTPError: If the server answers with an error status. The
            failure is reported here rather than silently leaving the file
            missing for the benchmark to trip over later.
    """
    # Using the response as a context manager releases the connection even
    # if writing the file fails.
    with requests.get(url, stream=True, timeout=timeout) as response:
        response.raise_for_status()
        with open(save_path, "wb") as file:
            for chunk in response.iter_content(chunk_size=8192):
                file.write(chunk)

def download_sample_images(urls: list[str], output_dir: str) -> None:
    """Fetch each URL into ``output_dir`` as image1.jpg, image2.jpg, ... in order."""
    for number, url in enumerate(urls, start=1):
        destination = os.path.join(output_dir, f"image{number}.jpg")
        download_image(url, destination)

def remove_sample_images(output_dir: str) -> None:
    """Remove previously downloaded sample images from ``output_dir``.

    Only regular files named like the ones ``download_sample_images`` writes
    (``image<N>.jpg``) are deleted. Pointing this at a shared directory,
    such as the system temp dir, therefore cannot wipe unrelated files.
    """
    for file in os.listdir(output_dir):
        stem, ext = os.path.splitext(file)
        if ext != ".jpg" or not stem.startswith("image"):
            continue
        if not stem[len("image"):].isdigit():
            continue
        file_path = os.path.join(output_dir, file)
        if os.path.isfile(file_path):
            os.remove(file_path)

if __name__ == "__main__":
    # Guarded so that multiprocessing workers started with "spawn" or
    # "forkserver" (which re-import the main module) don't delete and
    # re-download the sample images.
    remove_sample_images(output_dir)
    download_sample_images(image_urls, output_dir)
Code
from PIL import Image
from pathlib import Path
import tempfile

def process_image(image_path: str, output_dir: str) -> None:
    """Save a 128x128 resized copy of ``image_path`` into ``output_dir``.

    The copy keeps the source file's base name.
    """
    target = os.path.join(output_dir, os.path.basename(image_path))
    with Image.open(image_path) as source:
        thumbnail = source.resize((128, 128))
        thumbnail.save(target)

def process_images_sequential(image_paths: list[str], output_dir: str) -> None:
    """Baseline: resize the images one at a time on the calling thread."""
    for path in image_paths:
        process_image(path, output_dir)

def process_images_threading(image_paths: list[str], output_dir: str) -> None:
    """Process images using threading.

    Images are resized in a thread pool with one worker per CPU. The map
    results are drained so that an exception raised while processing any
    image is re-raised here instead of being silently discarded.
    """
    with ThreadPoolExecutor(max_workers=os.cpu_count() or 1) as executor:
        list(executor.map(lambda path: process_image(path, output_dir), image_paths))

def process_images_asyncio(image_paths: list[str], output_dir: str) -> None:
    """Process images using asyncio.

    Each image is handed to the event loop's default executor (a thread
    pool) with ``run_in_executor``, and all of them are awaited together.
    """

    async def process_image_async(image_path: str) -> None:
        # Inside a coroutine the running loop is always available.
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, process_image, image_path, output_dir)

    async def _async_gather_images(image_paths: list[str]) -> None:
        await asyncio.gather(*(process_image_async(path) for path in image_paths))

    # asyncio.run creates (and closes) the loop itself. Calling
    # asyncio.get_event_loop() outside a running loop is deprecated.
    return asyncio.run(_async_gather_images(image_paths))

def process_images_multiprocessing(image_paths: list[str], output_dir: str) -> None:
    """Resize every image in a process pool with one worker per CPU.

    ``functools.partial`` pins ``output_dir`` so each pool task only receives
    an image path. A partial of a module-level function can be pickled for
    the workers, unlike a lambda.
    """
    resize_into_output = functools.partial(process_image, output_dir=output_dir)
    worker_count = os.cpu_count() or 1
    with multiprocessing.Pool(processes=worker_count) as pool:
        pool.map(resize_into_output, image_paths)

def benchmark_image_processing():
    """Time each image-processing strategy on the four downloaded samples.

    Raises:
        FileNotFoundError: If any sample image is missing. This check runs
            before any timing starts.
    """
    sample_dir = Path(output_dir)
    image_paths = [
        sample_dir / file_name
        for file_name in ("image1.jpg", "image2.jpg", "image3.jpg", "image4.jpg")
    ]

    missing = next((path for path in image_paths if not path.exists()), None)
    if missing is not None:
        raise FileNotFoundError(f"Sample image {missing} not found.")

    results = {}
    title("Image Processing Benchmark")

    strategies = (
        (process_images_sequential, "Sequential"),
        (process_images_threading, "Threading"),
        (process_images_asyncio, "Asyncio"),
        (process_images_multiprocessing, "Multiprocessing"),
    )
    # All strategies write into one throw-away directory that is removed afterwards.
    with tempfile.TemporaryDirectory() as temp_dir:
        for strategy, label in strategies:
            print_benchmark(results, strategy, label, image_paths, temp_dir)

if __name__ == "__main__":
    # Guarded so that multiprocessing workers started with "spawn" or
    # "forkserver" (which re-import the main module) don't re-run the benchmark.
    benchmark_image_processing()

Image Processing Benchmark[BRSequential  (image_paths=4, output_dir=/…): 0.1406s[BRThreading  (image_paths=4, output_dir=/…): 0.0817s[BRAsyncio  (image_paths=4, output_dir=/…): 0.0698s[BRMultiprocessing  (image_paths=4, output_dir=/…): 0.2791s[BR:::

Test 3: Matrix Multiplication (CPU-bound, Larger Scale)

You’ll need to install NumPy

pip install numpy

This challenge involves multiplying two large matrices.
This is a CPU-bound task.

Code
import numpy as np

def matrix_multiply_sequential(matrix1: np.ndarray, matrix2: np.ndarray) -> np.ndarray:
    """Baseline: a single ``np.dot`` call, which the other strategies split up."""
    product = np.dot(matrix1, matrix2)
    return product

def matrix_multiply_threading(
    matrix1: np.ndarray, matrix2: np.ndarray, num_threads: int
) -> np.ndarray:
    """Threaded matrix multiplication: each thread computes a band of rows.

    The rows of ``matrix1`` are split into contiguous bands, one per thread.
    Each band is multiplied by the whole ``matrix2`` with ``np.dot`` and
    written into a shared result buffer.

    Args:
        matrix1: Left operand; its rows are split across the threads.
        matrix2: Right operand, shared by every thread.
        num_threads: Requested number of threads. It is clamped to
            ``[1, number of rows]``, so no thread starts without rows to compute.

    Returns:
        The product, allocated with the dtype NumPy would give
        ``np.dot(matrix1, matrix2)`` (integer and complex inputs keep
        their type).

    Raises:
        Exception: The first exception raised in any worker thread (for
            example, ``ValueError`` for mismatched shapes) is re-raised here.
    """
    rows = matrix1.shape[0]
    workers = max(1, min(num_threads, rows))
    result = np.empty(
        (rows, matrix2.shape[1]), dtype=np.result_type(matrix1, matrix2)
    )
    errors: list[Exception] = []

    def worker(start: int, end: int) -> None:
        """Compute rows ``[start, end)`` of the product into ``result``."""
        try:
            result[start:end] = np.dot(matrix1[start:end], matrix2)
        except Exception as exc:  # re-raised by the caller after join()
            errors.append(exc)

    # Divide the work into chunks; the last thread also takes the remainder.
    chunk_size = rows // workers
    threads = []
    for i in range(workers):
        start = i * chunk_size
        end = (i + 1) * chunk_size if i < workers - 1 else rows
        thread = threading.Thread(target=worker, args=(start, end))
        threads.append(thread)
        thread.start()

    for thread in threads:
        thread.join()

    if errors:
        raise errors[0]
    return result

def matrix_multiply_asyncio(
    matrix1: np.ndarray, matrix2: np.ndarray, num_workers: "int | None" = None
) -> np.ndarray:
    """Asyncio-based matrix multiplication (using synchronous NumPy).

    The rows of ``matrix1`` are split into contiguous bands, and each band is
    multiplied by ``matrix2`` in its own task. The tasks never ``await``, so
    they run one after another on the event loop. This measures asyncio
    scheduling overhead rather than a parallel speed-up.

    Args:
        matrix1: Left operand; its rows are split across the tasks.
        matrix2: Right operand, shared by every task.
        num_workers: Number of tasks. It defaults to the module-level
            ``num_threads`` and is clamped to ``[1, number of rows]``.

    Returns:
        The product, allocated with the dtype NumPy would give
        ``np.dot(matrix1, matrix2)`` (integer and complex inputs keep
        their type).
    """
    requested = num_threads if num_workers is None else num_workers
    workers = max(1, min(requested, matrix1.shape[0]))

    async def _inner_matrix_multiply_asyncio(
        matrix1: np.ndarray, matrix2: np.ndarray
    ) -> np.ndarray:
        rows = matrix1.shape[0]
        result = np.empty(
            (rows, matrix2.shape[1]), dtype=np.result_type(matrix1, matrix2)
        )

        async def worker(start: int, end: int) -> None:
            """Compute rows ``[start, end)`` of the product into ``result``."""
            result[start:end] = np.dot(matrix1[start:end], matrix2)

        # Divide the work into chunks; the last task also takes the remainder.
        chunk_size = rows // workers
        tasks = [
            asyncio.create_task(
                worker(
                    i * chunk_size,
                    (i + 1) * chunk_size if i < workers - 1 else rows,
                )
            )
            for i in range(workers)
        ]
        # gather() propagates the first exception raised by any task.
        await asyncio.gather(*tasks)
        return result

    return asyncio.run(_inner_matrix_multiply_asyncio(matrix1, matrix2))

def matrix_multiply_multiprocessing(
    matrix1: np.ndarray, matrix2: np.ndarray
) -> np.ndarray:
    """Multiply row bands of ``matrix1`` by ``matrix2`` in a process pool.

    The rows are cut into ``num_processes`` bands; the last band also takes
    the remainder. Each band is sent to a pool worker together with
    ``matrix2`` and multiplied with ``np.dot``. The partial products are
    stacked back together in their original order.
    """
    with multiprocessing.Pool(processes=os.cpu_count() or 1) as pool:
        total_rows = matrix1.shape[0]
        band = total_rows // num_processes
        last = num_processes - 1
        pending = [
            pool.apply_async(
                np.dot,
                (matrix1[k * band : total_rows if k == last else (k + 1) * band], matrix2),
            )
            for k in range(num_processes)
        ]
        pool.close()
        pool.join()
        return np.vstack([job.get() for job in pending])

def benchmark_matrix_multiplication():
    """Time each matrix-multiplication strategy on two random square matrices."""
    title("Matrix Multiplication Benchmark")
    side = int(3e3)  # 3e4 slow :)
    matrix1 = np.random.rand(side, side)
    matrix2 = np.random.rand(side, side)
    results = {}

    operands = (matrix1, matrix2)
    runs = (
        (matrix_multiply_sequential, "Sequential", ()),
        (matrix_multiply_threading, "Threading", (num_threads,)),
        (matrix_multiply_asyncio, "Asyncio", ()),
        (matrix_multiply_multiprocessing, "Multiprocessing", ()),
    )
    for strategy, label, extra_args in runs:
        print_benchmark(results, strategy, label, *operands, *extra_args)

if __name__ == "__main__":
    # Guarded so that multiprocessing workers started with "spawn" or
    # "forkserver" (which re-import the main module) don't re-run the benchmark.
    benchmark_matrix_multiplication()

Matrix Multiplication Benchmark[BRSequential  (matrix1=[[0.93479083 0.9286…): 0.2084s[BRThreading  (matrix1=[[0.93479083 0.9286…): 1.6144s[BRAsyncio  (matrix1=[[0.93479083 0.9286…): 0.6622s[BRMultiprocessing  (matrix1=[[0.93479083 0.9286…): 4.7708s[BR:::

Conclusion

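  • CPU-Bound: Multiprocessing gave the best result for the pure-Python prime generation thanks to true parallelism, while threading and asyncio gained nothing because of the GIL. Memoization drastically speeds up repeated prime generation within a single process, but not across multiprocessing/Futures runs, whose worker processes keep their own caches. For matrix multiplication the picture is reversed. NumPy's np.dot already runs in optimized native code, so the sequential version was the fastest and multiprocessing the slowest, most likely because the matrices must be pickled and copied to the worker processes.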

  • I/O-Bound: Threading and asyncio perform well for I/O-bound image processing, efficiently managing concurrent I/O operations.
    Multiprocessing adds overhead here.

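  • Key Takeaway: Choose the concurrency model based on the task. Use multiprocessing for CPU-bound pure-Python work, threading/asyncio for I/O-bound work, and plain vectorized NumPy when the heavy lifting already happens in native code. Memoize repeated computations.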

Final Summary

These benchmarks highlight the importance of selecting the right concurrency model in Python. Multiprocessing maximizes CPU utilization for computationally intensive tasks, while threading and asyncio shine in I/O-bound scenarios.

Memoization is a powerful optimization technique for functions with repeated calculations, especially recursive functions.