GadaaLabs
Python Mastery — From Zero to AI Engineering
Lesson 13

Concurrency — Threading, Multiprocessing & asyncio

28 min

The Concurrency Problem

Modern programs wait constantly — for disk, network, databases, external APIs. A sequential program wastes CPU during every wait. Concurrency lets you overlap that waiting.

Python offers three distinct models, each solving a different problem:

| Model | Best for | Parallelism? | Overhead |
|---|---|---|---|
| threading | I/O-bound work | Concurrent, not parallel | Low |
| multiprocessing | CPU-bound work | True parallel | High |
| asyncio | High-throughput I/O | Concurrent, not parallel | Very low |

Understanding which to use is 90% of the battle.

The GIL — What It Is and Why It Matters

The Global Interpreter Lock (GIL) is a mutex in CPython that ensures only one thread executes Python bytecode at a time. This means:

  • Multiple threads cannot run Python code in parallel on multiple CPU cores
  • The GIL is released during I/O operations — so threads can overlap I/O even without true parallelism
  • C extensions (NumPy, Pandas) often release the GIL during computation — so they can be parallelized with threads
python
# This looks parallel but isn't — GIL prevents simultaneous bytecode execution
import threading

def cpu_work(n):
    return sum(i*i for i in range(n))

# Two threads competing for the GIL — no faster than sequential, often slower
t1 = threading.Thread(target=cpu_work, args=(10_000_000,))
t2 = threading.Thread(target=cpu_work, args=(10_000_000,))
t1.start(); t2.start()
t1.join(); t2.join()

For CPU-bound Python code: use multiprocessing (separate processes = separate GILs). For I/O-bound code: threading or asyncio both work fine.

Threading — I/O-Bound Work

The Core Primitives

python
import threading
import time

def worker(name, delay):
    print(f"[{name}] starting")
    time.sleep(delay)   # Simulates I/O — GIL is released here
    print(f"[{name}] done after {delay}s")

# Create and start threads
t1 = threading.Thread(target=worker, args=("A", 2))
t2 = threading.Thread(target=worker, args=("B", 1))

t1.start()
t2.start()

t1.join()  # Wait for t1 to finish
t2.join()  # Wait for t2 to finish
# Total time: ~2s (not 3s) — they ran concurrently

Synchronization Primitives

Lock prevents race conditions when multiple threads share mutable state:

python
import threading

counter = 0
lock = threading.Lock()

def increment(n):
    global counter
    for _ in range(n):
        with lock:          # Acquire lock, execute, release
            counter += 1   # Now thread-safe

threads = [threading.Thread(target=increment, args=(10000,)) for _ in range(10)]
for t in threads: t.start()
for t in threads: t.join()
print(counter)  # Always 100000
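To see why the lock matters, here is the same counter with the lock removed — a minimal sketch; the final value is nondeterministic, and with enough iterations it typically falls short because interleaved read-modify-write cycles lose increments:

```python
import threading

counter = 0  # shared mutable state, intentionally unprotected

def unsafe_increment(n):
    global counter
    for _ in range(n):
        counter += 1   # read-modify-write: NOT atomic, threads can interleave

threads = [threading.Thread(target=unsafe_increment, args=(1_000_000,))
           for _ in range(4)]
for t in threads: t.start()
for t in threads: t.join()

# Without the lock, the result is usually LESS than 4_000_000 — updates were lost
print(counter)
```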

RLock (Reentrant Lock) — the same thread can acquire it multiple times:

python
rlock = threading.RLock()

def recursive_task(depth):
    with rlock:                      # Works even if called recursively
        if depth > 0:
            recursive_task(depth - 1)

Semaphore — limits concurrent access to a resource:

python
# Limit to 3 concurrent database connections
db_semaphore = threading.Semaphore(3)

def query_database(query_id):
    with db_semaphore:
        print(f"Query {query_id} executing")
        time.sleep(0.5)
        print(f"Query {query_id} complete")

Event — thread signaling:

python
ready = threading.Event()

def producer():
    time.sleep(1)
    print("Data ready!")
    ready.set()   # Signal all waiting threads

def consumer():
    ready.wait()  # Block until event is set
    print("Consumer received signal")

Producer-Consumer with Queue

queue.Queue is thread-safe — the canonical pattern for thread communication:

Producer-Consumer Pattern
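In place of the interactive widget, here is a minimal sketch of the pattern. The names `producer`/`consumer` and the sentinel convention are illustrative choices, not the only way to signal shutdown:

```python
import queue
import threading

q = queue.Queue(maxsize=5)   # bounded queue applies backpressure to the producer
SENTINEL = None              # signals the consumer that no more items are coming

def producer(n):
    for i in range(n):
        q.put(i)             # blocks if the queue is full
    q.put(SENTINEL)

def consumer(results):
    while True:
        item = q.get()
        if item is SENTINEL:
            q.task_done()
            break
        results.append(item * 2)   # "process" the item
        q.task_done()

results = []
p = threading.Thread(target=producer, args=(10,))
c = threading.Thread(target=consumer, args=(results,))
p.start(); c.start()
p.join(); c.join()
print(results)   # [0, 2, 4, ..., 18] — FIFO order is preserved
```

Because `queue.Queue` handles its own locking, the threads never touch shared state directly — all communication flows through the queue.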

ThreadPoolExecutor — The Right Way

Manual thread management is error-prone. Use concurrent.futures.ThreadPoolExecutor:

ThreadPoolExecutor Speedup
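A minimal sketch of the speedup the widget demonstrates — the `fetch` function and URL strings are simulated stand-ins for real network calls:

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fetch(url):
    time.sleep(0.2)                  # simulated I/O wait — the GIL is released
    return f"response from {url}"

urls = [f"api/endpoint/{i}" for i in range(8)]

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=8) as executor:
    results = list(executor.map(fetch, urls))   # preserves input order
elapsed = time.perf_counter() - start

# 8 overlapping 0.2s waits finish in roughly 0.2s, not 1.6s
print(f"{len(results)} requests in {elapsed:.2f}s")
```

The pool handles thread creation, reuse, and cleanup; the `with` block guarantees shutdown even if a task raises.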

Multiprocessing — CPU-Bound Work

Each Process has its own Python interpreter and memory space — no GIL contention:

python
from multiprocessing import Process, Pool, Queue
import os

def cpu_intensive(n):
    """Pure CPU work — benefits from multiprocessing."""
    return sum(i * i for i in range(n))

if __name__ == "__main__":   # REQUIRED on Windows/macOS
    with Pool(processes=4) as pool:
        results = pool.map(cpu_intensive, [1_000_000] * 8)
    print(results)

ProcessPoolExecutor

python
from concurrent.futures import ProcessPoolExecutor
import math

def factorize(n):
    """CPU-bound: find prime factors."""
    factors = []
    d = 2
    while d * d <= n:
        while n % d == 0:
            factors.append(d)
            n //= d
        d += 1
    if n > 1:
        factors.append(n)
    return factors

numbers = [999983, 1000003, 999979, 1000033]

if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(factorize, numbers))
    for n, factors in zip(numbers, results):
        print(f"{n}: {factors}")

Shared Memory and Manager

python
from multiprocessing import Process, Value, Array, Manager

def increment_shared(val, lock):
    for _ in range(10000):
        with lock:
            val.value += 1

# Value: a single shared value with a lock
from multiprocessing import Lock
counter = Value('i', 0)   # 'i' = C int
lock = Lock()

processes = [Process(target=increment_shared, args=(counter, lock))
             for _ in range(4)]
for p in processes: p.start()
for p in processes: p.join()
print(counter.value)   # 40000

concurrent.futures: Unified Interface

Both ThreadPoolExecutor and ProcessPoolExecutor share the same API:

python
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED
import time

def fetch(url):
    time.sleep(1)   # Simulate network request
    return f"Response from {url}"

urls = ["api.example.com", "data.service.io", "cdn.fast.net"]

with ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(fetch, url) for url in urls]

    # Wait for first to complete
    done, pending = wait(futures, return_when=FIRST_COMPLETED)
    for f in done:
        print(f"First result: {f.result()}")

    # Cancel pending if needed
    for f in pending:
        f.cancel()

asyncio — High-Throughput Async I/O

asyncio runs on a single thread with an event loop. Because there is no per-task OS thread overhead, one event loop can handle thousands of concurrent connections:

python
import asyncio

async def fetch_data(url: str, delay: float) -> str:
    """async def makes this a coroutine."""
    print(f"Starting: {url}")
    await asyncio.sleep(delay)    # Yield control to event loop
    print(f"Done: {url}")
    return f"data from {url}"

async def main():
    # Run concurrently with asyncio.gather
    results = await asyncio.gather(
        fetch_data("api/users", 1.0),
        fetch_data("api/orders", 0.5),
        fetch_data("api/products", 0.8),
    )
    for r in results:
        print(r)

asyncio.run(main())

asyncio Patterns

create_task — schedule a coroutine on the event loop without awaiting it immediately:

python
async def main():
    task1 = asyncio.create_task(fetch_data("api/a", 1.0))
    task2 = asyncio.create_task(fetch_data("api/b", 0.5))

    # Do other work here while tasks run concurrently
    print("Tasks started, doing other work...")
    await asyncio.sleep(0.1)

    # Now collect results
    result1 = await task1
    result2 = await task2

Semaphore for rate limiting:

python
async def fetch_with_limit(session, url, semaphore):
    async with semaphore:      # Max 10 concurrent requests
        await asyncio.sleep(0.1)   # Simulated request
        return f"result from {url}"

async def main():
    semaphore = asyncio.Semaphore(10)
    urls = [f"api/item/{i}" for i in range(100)]
    tasks = [fetch_with_limit(None, url, semaphore) for url in urls]
    results = await asyncio.gather(*tasks)
    print(f"Fetched {len(results)} items")

asyncio.wait_for for timeouts:

python
async def slow_operation():
    await asyncio.sleep(10)
    return "done"

async def main():
    try:
        result = await asyncio.wait_for(slow_operation(), timeout=2.0)
    except asyncio.TimeoutError:
        print("Operation timed out after 2s")

asyncio Queue — Producer/Consumer

asyncio Producer-Consumer Queue
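A minimal sketch of the async version of the pattern the widget demonstrates — same sentinel idea as the threaded one, but `put`/`get` are awaitable, so a full or empty queue suspends the coroutine instead of blocking a thread:

```python
import asyncio

async def producer(q, n):
    for i in range(n):
        await q.put(i)       # suspends if the queue is full
    await q.put(None)        # sentinel: no more work

async def consumer(q, results):
    while True:
        item = await q.get()
        if item is None:
            break
        results.append(item * item)

async def main():
    q = asyncio.Queue(maxsize=3)   # bounded: producer can't run far ahead
    results = []
    await asyncio.gather(producer(q, 5), consumer(q, results))
    return results

results = asyncio.run(main())
print(results)   # [0, 1, 4, 9, 16]
```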

When to Use Which: Decision Matrix

Is your bottleneck I/O (network, disk, database)?
  YES → Is your concurrency level very high (100+ simultaneous)?
          YES → asyncio
          NO  → threading or ThreadPoolExecutor
  NO  → Is it pure Python CPU work?
          YES → multiprocessing / ProcessPoolExecutor
          NO  → (NumPy/Pandas/C extensions) → threading may work due to GIL release

Key rules:

  • asyncio requires your entire stack to be async-aware (libraries must support it)
  • threading works with any existing synchronous code
  • multiprocessing has high startup overhead — worthwhile for tasks taking >100ms
  • Never share mutable state between threads without synchronization primitives

PROJECT: Parallel Data Processor

Parallel Data Processor

PROJECT: Async Task Manager

Async Task Manager with Priority Queue
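Again in place of the widget, a minimal sketch of the core mechanism: `asyncio.PriorityQueue` pops the entry with the smallest priority number first. A single worker and the task names used here are illustrative simplifications:

```python
import asyncio

async def worker(pq, completed):
    while True:
        priority, task_name = await pq.get()
        if task_name is None:        # sentinel: shut down
            pq.task_done()
            break
        await asyncio.sleep(0.01)    # simulated async work
        completed.append((priority, task_name))
        pq.task_done()

async def main():
    pq = asyncio.PriorityQueue()
    for entry in [(3, "cleanup"), (1, "critical-fix"), (2, "report")]:
        pq.put_nowait(entry)
    pq.put_nowait((99, None))        # lowest priority: processed last

    completed = []
    await worker(pq, completed)
    return completed

completed = asyncio.run(main())
print(completed)   # [(1, 'critical-fix'), (2, 'report'), (3, 'cleanup')]
```

With multiple workers you would spawn several `worker` tasks via `asyncio.create_task` and push one sentinel per worker.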

Key Takeaways

  • The GIL prevents parallel bytecode execution in threads — use multiprocessing for CPU-bound Python code
  • Threading is ideal for I/O-bound work: the GIL is released during I/O, enabling true concurrency
  • ThreadPoolExecutor and ProcessPoolExecutor share the concurrent.futures API — switching between them is one line
  • asyncio uses cooperative multitasking: a coroutine yields control with await, not OS thread switches
  • asyncio.gather() runs coroutines concurrently; asyncio.create_task() schedules them without waiting
  • Use asyncio.Semaphore to cap concurrent connections; asyncio.wait_for to enforce timeouts
  • queue.Queue (threading) and asyncio.Queue are the canonical inter-worker communication patterns
  • Profile before choosing: the "best" model depends on your bottleneck, not on what sounds sophisticated