Code
import asyncio
import concurrent.futures
import functools
import multiprocessing
import os
import threading
from concurrent.futures import ThreadPoolExecutor
from functools import lru_cache
@lru_cache(maxsize=None)
def is_prime(n: int, divisor: int = 2) -> bool:
    """Check whether ``n`` is prime by recursive trial division.

    The recursion passes ``divisor`` along, so the lru_cache memoizes
    ``(n, divisor)`` pairs — this is deliberate: the article benchmarks the
    memoized vs. cache-cleared variants of the same function.
    """
    if n < 2:
        return False
    # No divisor up to sqrt(n) divided n, so n is prime.
    if divisor**2 > n:
        return True
    return False if n % divisor == 0 else is_prime(n, divisor + 1)
def prime_number_generator(limit: int):
    """Yield every candidate integer from 2 through ``limit`` inclusive."""
    yield from range(2, limit + 1)
def generate_primes_sequential(limit: int) -> list[int]:
    """Generate all primes up to ``limit`` with a plain sequential scan."""
    return [num for num in prime_number_generator(limit) if is_prime(num)]
def generate_primes_threading(limit: int) -> list[int]:
    """Generate primes up to ``limit`` using one thread per available CPU.

    NOTE(review): ``is_prime`` is pure-Python CPU work, so the GIL largely
    serializes these threads — this variant exists for benchmark comparison.
    """
    # The original referenced an undefined global ``num_threads``; derive it
    # here from the CPU count, as the docstring promised.
    num_threads = os.cpu_count() or 1
    primes: list[int] = []
    lock = threading.Lock()

    def worker(start: int, end: int) -> None:
        """Collect primes in [start, end] and merge them under the lock."""
        local_primes = [num for num in range(start, end + 1) if is_prime(num)]
        with lock:
            primes.extend(local_primes)

    # Divide the range [2, limit] into contiguous chunks, one per thread.
    chunk_size = (limit - 1) // num_threads
    threads: list[threading.Thread] = []
    for i in range(num_threads):
        start = 2 + i * chunk_size
        # The last thread absorbs any remainder so the full range is covered.
        end = start + chunk_size - 1 if i < num_threads - 1 else limit
        thread = threading.Thread(target=worker, args=(start, end))
        threads.append(thread)
        thread.start()

    for thread in threads:
        thread.join()

    primes.sort()  # chunks may finish (and extend) out of order
    return primes
def generate_primes_asyncio(limit: int) -> list[int]:
    """Generate primes up to ``limit`` using asyncio.

    The primality checks remain synchronous CPU work, so this mostly
    measures event-loop and task-scheduling overhead.
    """

    async def is_prime_async(num: int) -> bool:
        # Thin async wrapper; the check itself runs synchronously.
        return is_prime(num)

    async def _async_gather_primes(limit: int) -> list[int]:
        tasks = [is_prime_async(num) for num in prime_number_generator(limit)]
        results = await asyncio.gather(*tasks)
        # results[i] is the verdict for the number i + 2, hence the start=2.
        return [num for num, prime in enumerate(results, 2) if prime]

    # asyncio.run creates and tears down a fresh loop; the original's
    # get_event_loop()/run_until_complete pair is deprecated for this use.
    return asyncio.run(_async_gather_primes(limit))
def generate_primes_multiprocessing(limit: int) -> list[int]:
    """Generate primes up to ``limit`` with a process pool (true parallelism)."""
    with multiprocessing.Pool(processes=os.cpu_count() or 1) as pool:
        results = pool.map(is_prime, prime_number_generator(limit))
    # BUGFIX: results[i] is the verdict for the number i + 2 because the
    # generator starts at 2. The original enumerate started at 0, shifting
    # every reported "prime" down by two (cf. the asyncio variant's start=2).
    return [num for num, prime in enumerate(results, 2) if prime]
def generate_primes_concurrent_futures(limit: int) -> list[int]:
    """Generate primes up to ``limit`` using concurrent.futures processes."""
    with concurrent.futures.ProcessPoolExecutor() as executor:
        results = list(executor.map(is_prime, prime_number_generator(limit)))
    # BUGFIX: enumerate must start at 2 — results[i] corresponds to the
    # number i + 2, matching the generator's starting point.
    return [num for num, prime in enumerate(results, 2) if prime]
def gen_primes_benchmark():
    """Run the prime-generation benchmark across all concurrency models.

    ``title`` and ``print_benchmark`` are helpers defined earlier in the
    article (not shown in this chunk). Each model is timed twice — cold and
    memoized — with the is_prime cache cleared between models.
    """
    title("Primes Generator Benchmark")
    limit = int(3e5)
    results = {}

    print_benchmark(results, generate_primes_sequential, "Sequential", limit)
    print_benchmark(results, generate_primes_sequential, "Sequential Memoized", limit)
    is_prime.cache_clear()
    print_benchmark(results, generate_primes_threading, "Threading", limit)
    print_benchmark(results, generate_primes_threading, "Threading Memoized", limit)
    is_prime.cache_clear()
    print_benchmark(results, generate_primes_asyncio, "Asyncio", limit)
    print_benchmark(results, generate_primes_asyncio, "Asyncio Memoized", limit)
    is_prime.cache_clear()
    print_benchmark(results, generate_primes_multiprocessing, "Multiprocessing", limit)
    print_benchmark(results, generate_primes_multiprocessing, "Multiprocessing Memoized", limit)
    is_prime.cache_clear()
    # Futures are run on a third of the range: process startup/pickling
    # overhead makes the full range very slow (see the timings below).
    print_benchmark(results, generate_primes_concurrent_futures, "Futures //3", limit // 3)
    print_benchmark(results, generate_primes_concurrent_futures, "Futures //3 Memoized", limit // 3)


gen_primes_benchmark()
[95mPrimes Generator Benchmark[0m[BR[34mSequential [0m ([32mlimit=300000[0m):[94m 4.5257s[0m[BR[34mSequential Memoized [0m ([32mlimit=300000[0m):[94m 0.0338s[0m[BR[34mThreading [0m ([32mlimit=300000[0m):[94m 4.4717s[0m[BR[34mThreading Memoized [0m ([32mlimit=300000[0m):[94m 0.0565s[0m[BR[34mAsyncio [0m ([32mlimit=300000[0m):[94m 6.4361s[0m[BR[34mAsyncio Memoized [0m ([32mlimit=300000[0m):[94m 1.7747s[0m[BR[34mMultiprocessing [0m ([32mlimit=300000[0m):[94m 1.3501s[0m[BR[34mMultiprocessing Memoized[0m ([32mlimit=300000[0m):[94m 1.4462s[0m[BR[34mFutures //3 [0m ([32mlimit=100000[0m):[94m 8.4532s[0m[BR[34mFutures //3 Memoized [0m ([32mlimit=100000[0m):[94m 8.7006s[0m[BR:::
Test 2: Image Processing (I/O-bound)
You’ll need to install the Pillow library
pip install Pillow
This challenge involves processing a set of images (resizing, filtering, etc.). This is an I/O-bound task.
Download sample images
Code
import os
import requests
import tempfile
# Directory to save downloaded images.
output_dir = tempfile.gettempdir()
os.makedirs(output_dir, exist_ok=True)

# URLs of sample images (replace with your own URLs or use these defaults).
image_urls = [
    "https://picsum.photos/1280/720?random",
    "https://picsum.photos/1920/1080?random",
    "https://picsum.photos/3840/2160?random",
    "https://picsum.photos/3840/2160?random",
]
def download_image(url: str, save_path: str, timeout: float = 30.0) -> None:
    """Download an image from *url* and stream it to *save_path*.

    Non-200 responses are silently skipped, preserving the original
    best-effort behaviour. The added ``timeout`` (backward-compatible
    default) prevents a stalled server from hanging the download forever.
    """
    response = requests.get(url, stream=True, timeout=timeout)
    if response.status_code == 200:
        with open(save_path, "wb") as file:
            # Stream in 8 KiB chunks to avoid holding the image in memory.
            for chunk in response.iter_content(chunk_size=8192):
                file.write(chunk)
def download_sample_images(urls: list[str], output_dir: str) -> None:
    """Download every URL in *urls*, saving them as image1.jpg, image2.jpg, ..."""
    for i, url in enumerate(urls):
        image_name = f"image{i + 1}.jpg"
        save_path = os.path.join(output_dir, image_name)
        download_image(url, save_path)
def remove_sample_images(output_dir: str) -> None:
    """Remove all regular files (the sample images) from *output_dir*.

    Subdirectories are left untouched. Renamed the loop variable from
    ``file`` to avoid shadowing the builtin.
    """
    for entry in os.listdir(output_dir):
        file_path = os.path.join(output_dir, entry)
        if os.path.isfile(file_path):
            os.remove(file_path)
remove_sample_images(output_dir) download_sample_images(image_urls, output_dir)
Code
from PIL import Image
from pathlib import Path
import tempfile
def process_image(image_path: str, output_dir: str) -> None:
    """Resize one image to 128x128 and save it under the same basename in *output_dir*."""
    with Image.open(image_path) as img:
        resized = img.resize((128, 128))
        output_path = os.path.join(output_dir, os.path.basename(image_path))
        resized.save(output_path)
def process_images_sequential(image_paths: list[str], output_dir: str) -> None:
    """Process the images one after another (baseline)."""
    for image_path in image_paths:
        process_image(image_path, output_dir)
def process_images_threading(image_paths: list[str], output_dir: str) -> None:
    """Process images concurrently with a thread pool (suits I/O-bound work)."""
    with ThreadPoolExecutor(max_workers=os.cpu_count() or 1) as executor:
        # Drain the iterator: Executor.map is lazy about results, and
        # consuming them re-raises any worker exception instead of hiding it.
        list(executor.map(lambda path: process_image(path, output_dir), image_paths))
def process_images_asyncio(image_paths: list[str], output_dir: str) -> None:
    """Process images via asyncio, offloading the blocking work to a thread pool."""

    async def process_image_async(image_path: str) -> None:
        # run_in_executor(None, ...) uses the loop's default thread pool.
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, process_image, image_path, output_dir)

    async def _async_gather_images(paths: list[str]) -> None:
        await asyncio.gather(*(process_image_async(p) for p in paths))

    # asyncio.run replaces the deprecated get_event_loop()/run_until_complete pair.
    asyncio.run(_async_gather_images(image_paths))
def process_images_multiprocessing(image_paths: list[str], output_dir: str) -> None:
    """Process images in worker processes."""
    # Partially apply output_dir so the pool passes only image_path to each worker.
    process_image_partial = functools.partial(process_image, output_dir=output_dir)
    with multiprocessing.Pool(processes=os.cpu_count() or 1) as pool:
        pool.map(process_image_partial, image_paths)
def benchmark_image_processing():
    """Benchmark image processing using different concurrency models.

    Relies on the module-level ``output_dir`` and on ``title`` /
    ``print_benchmark`` helpers defined earlier in the article.
    """
    image_paths = [
        Path(output_dir) / name
        for name in ["image1.jpg", "image2.jpg", "image3.jpg", "image4.jpg"]
    ]

    # Fail fast if the sample images were not downloaded beforehand.
    for image_path in image_paths:
        if not image_path.exists():
            raise FileNotFoundError(f"Sample image {image_path} not found.")

    results = {}
    title("Image Processing Benchmark")

    # Write the resized copies into a throwaway directory that cleans itself up.
    with tempfile.TemporaryDirectory() as temp_dir:
        print_benchmark(results, process_images_sequential, "Sequential", image_paths, temp_dir)
        print_benchmark(results, process_images_threading, "Threading", image_paths, temp_dir)
        print_benchmark(results, process_images_asyncio, "Asyncio", image_paths, temp_dir)
        print_benchmark(results, process_images_multiprocessing, "Multiprocessing", image_paths, temp_dir)


benchmark_image_processing()
[95mImage Processing Benchmark[0m[BR[34mSequential [0m ([32mimage_paths=4, output_dir=/…[0m):[94m 0.1406s[0m[BR[34mThreading [0m ([32mimage_paths=4, output_dir=/…[0m):[94m 0.0817s[0m[BR[34mAsyncio [0m ([32mimage_paths=4, output_dir=/…[0m):[94m 0.0698s[0m[BR[34mMultiprocessing [0m ([32mimage_paths=4, output_dir=/…[0m):[94m 0.2791s[0m[BR:::
Test 3: Matrix Multiplication (CPU-bound, Larger Scale)
You’ll need to install NumPy
pip install numpy
This challenge involves multiplying two large matrices.
This is a CPU-bound task.
Code
import numpy as np
def matrix_multiply_sequential(matrix1: np.ndarray, matrix2: np.ndarray) -> np.ndarray:
    """Sequential matrix multiplication using NumPy (the baseline)."""
    return np.dot(matrix1, matrix2)
def matrix_multiply_threading(
    matrix1: np.ndarray, matrix2: np.ndarray, num_threads: int
) -> np.ndarray:
    """Threaded matrix multiplication: each thread computes one band of rows."""

    def worker(start: int, end: int, result: np.ndarray) -> None:
        # Each thread writes a disjoint row slice, so no locking is needed.
        result[start:end] = np.dot(matrix1[start:end], matrix2)

    # Divide the rows into contiguous chunks, one per thread.
    chunk_size = matrix1.shape[0] // num_threads
    threads: list[threading.Thread] = []
    result = np.zeros((matrix1.shape[0], matrix2.shape[1]))

    for i in range(num_threads):
        start = i * chunk_size
        # The last thread takes any leftover rows.
        end = (i + 1) * chunk_size if i < num_threads - 1 else matrix1.shape[0]
        thread = threading.Thread(target=worker, args=(start, end, result))
        threads.append(thread)
        thread.start()

    for thread in threads:
        thread.join()

    return result
def matrix_multiply_asyncio(matrix1: np.ndarray, matrix2: np.ndarray) -> np.ndarray:
    """Asyncio-based matrix multiplication (the NumPy calls stay synchronous)."""

    async def _inner_matrix_multiply_asyncio(
        matrix1: np.ndarray, matrix2: np.ndarray
    ) -> np.ndarray:
        async def worker(start: int, end: int, result: np.ndarray) -> None:
            # Each task fills a disjoint band of rows.
            result[start:end] = np.dot(matrix1[start:end], matrix2)

        # The original read an undefined global ``num_threads``; derive the
        # worker count locally from the CPU count instead.
        num_workers = os.cpu_count() or 1
        chunk_size = matrix1.shape[0] // num_workers
        tasks = []
        result = np.zeros((matrix1.shape[0], matrix2.shape[1]))

        for i in range(num_workers):
            start = i * chunk_size
            # The last task takes any leftover rows.
            end = (i + 1) * chunk_size if i < num_workers - 1 else matrix1.shape[0]
            tasks.append(asyncio.create_task(worker(start, end, result)))

        await asyncio.gather(*tasks)
        return result

    return asyncio.run(_inner_matrix_multiply_asyncio(matrix1, matrix2))
def matrix_multiply_multiprocessing(
    matrix1: np.ndarray, matrix2: np.ndarray
) -> np.ndarray:
    """Multiprocessing matrix multiplication: each process multiplies a row band."""
    # The original read an undefined global ``num_processes``; derive it locally.
    num_processes = os.cpu_count() or 1
    with multiprocessing.Pool(processes=num_processes) as pool:
        async_results = []
        chunk_size = matrix1.shape[0] // num_processes
        for i in range(num_processes):
            start = i * chunk_size
            # The last process takes any leftover rows.
            end = (i + 1) * chunk_size if i < num_processes - 1 else matrix1.shape[0]
            # Row bands are pickled to the workers; results come back in order.
            async_results.append(pool.apply_async(np.dot, (matrix1[start:end], matrix2)))
        # Stack the bands back into the full product matrix (inside the pool
        # context so the workers are still alive while we collect results).
        return np.vstack([r.get() for r in async_results])
def benchmark_matrix_multiplication():
    """Benchmark matrix multiplication functions.

    ``title`` and ``print_benchmark`` are helpers defined earlier in the
    article (not shown in this chunk).
    """
    title("Matrix Multiplication Benchmark")
    matrix_size = int(3e3)  # 3e4 slow :)
    matrix1 = np.random.rand(matrix_size, matrix_size)
    matrix2 = np.random.rand(matrix_size, matrix_size)
    results = {}

    print_benchmark(results, matrix_multiply_sequential, "Sequential", matrix1, matrix2)
    print_benchmark(
        results,
        matrix_multiply_threading,
        "Threading",
        matrix1,
        matrix2,
        num_threads=os.cpu_count() or 1,  # original referenced an undefined name
    )
    print_benchmark(results, matrix_multiply_asyncio, "Asyncio", matrix1, matrix2)
    print_benchmark(results, matrix_multiply_multiprocessing, "Multiprocessing", matrix1, matrix2)


benchmark_matrix_multiplication()
[95mMatrix Multiplication Benchmark[0m[BR[34mSequential [0m ([32mmatrix1=[[0.93479083 0.9286…[0m):[94m 0.2084s[0m[BR[34mThreading [0m ([32mmatrix1=[[0.93479083 0.9286…[0m):[94m 1.6144s[0m[BR[34mAsyncio [0m ([32mmatrix1=[[0.93479083 0.9286…[0m):[94m 0.6622s[0m[BR[34mMultiprocessing [0m ([32mmatrix1=[[0.93479083 0.9286…[0m):[94m 4.7708s[0m[BR:::
Conclusion
CPU-Bound: Multiprocessing excels for CPU-bound tasks like primes and matrix multiplication due to true parallelism. Threading/asyncio suffer from the GIL. Memoization drastically improves prime generation.
I/O-Bound: Threading and asyncio perform well for I/O-bound image processing, efficiently managing concurrent I/O operations.
Multiprocessing adds overhead here. Key Takeaway: Choose the concurrency model based on the task: multiprocessing for CPU-bound work, threading/asyncio for I/O-bound work. Memoize repeated computations.
Final Summary
These benchmarks highlight the importance of selecting the right concurrency model in Python. Multiprocessing maximizes CPU utilization for computationally intensive tasks, while threading and asyncio shine in I/O-bound scenarios.
Memoization is a powerful optimization technique for functions with repeated calculations, and especially for recursive functions.