Ir al contenido

Concurrency & Parallelism

Objetivo: Manejar miles de requests concurrentes sin bloquear el event loop.

El GIL es un mutex que protege el acceso a objetos Python, previniendo que múltiples threads ejecuten bytecode Python simultáneamente.

Cuándo se libera el GIL:

  • I/O operations (requests, aiohttp, file operations)
  • time.sleep() y asyncio.sleep()
  • Operaciones NumPy y computaciones C-level
  • Llamadas a bibliotecas externas (C/C++/Rust)

Implicación:

  • Multithreading NO escala en tareas CPU-bound
  • Multithreading SÍ ayuda en tareas I/O-bound
  • Multiprocessing para CPU-bound (pero tiene coste de IPC)
import asyncio
# Event Loop - El corazón de asyncio
async def main():
# Coroutine - función que puede pausarse
result = await fetch_data() # await = pausar aquí, dar control al loop
# Task - coroutine siendo ejecutada
task = asyncio.create_task(fetch_data()) # fire and forget
result = await task # esperar resultado

Diferencias clave:

  • await: Pausa la coroutine actual, bloquea hasta que complete
  • create_task(): Inicia ejecución en background, no bloquea
# ❌ VULNERABLE
counter = 0
async def increment():
global counter
temp = counter
await asyncio.sleep(0) # Context switch!
counter = temp + 1
# ✅ SEGURO
lock = asyncio.Lock()
async def increment_safe():
global counter
async with lock:
temp = counter
await asyncio.sleep(0)
counter = temp + 1

“Diseñas un servicio de inferencia. El endpoint recibe una imagen, hace pre-procesado con OpenCV (CPU), llama a API externa de IA (I/O), y guarda en DB (I/O). Diseña la estrategia de concurrencia perfecta.”

Respuesta esperada:

  1. Pre-procesado (CPU-bound): ThreadPoolExecutor o ProcessPoolExecutor
  2. API call (I/O-bound): asyncio con aiohttp
  3. DB write (I/O-bound): asyncio con driver async (e.g., asyncpg)
  4. Arquitectura: FastAPI async endpoint que:
    • Recibe imagen (async)
    • Envía pre-procesado a thread pool
    • Await resultado, hace API call con aiohttp
    • Await resultado, escribe a DB con asyncpg

🧪 Lab Práctico: “The Async Rate-Limiter”

Sección titulada «🧪 Lab Práctico: “The Async Rate-Limiter”»

Escribir un sistema de requests concurrentes con rate limiting usando solo asyncio.

  1. Simular 500 requests HTTP (usa asyncio.sleep(random.uniform(0.1, 0.5)))
  2. Limitar concurrencia a 10 con asyncio.Semaphore
  3. Producer-Consumer pattern con asyncio.Queue:
    • Producer: genera requests
    • Workers (3): consumen de la queue y “procesan”
  4. Graceful Shutdown: Manejar KeyboardInterrupt
import asyncio
import random
from typing import List
async def fetch_url(url: str, semaphore: asyncio.Semaphore) -> dict:
"""Simula request HTTP con rate limiting."""
async with semaphore:
print(f"🔄 Fetching {url}")
await asyncio.sleep(random.uniform(0.1, 0.5))
return {"url": url, "status": 200, "data": f"Response from {url}"}
async def producer(queue: asyncio.Queue, urls: List[str]):
"""Produce requests en la queue."""
# TODO: Implementar
pass
async def worker(name: str, queue: asyncio.Queue):
"""Consume y procesa requests."""
# TODO: Implementar
pass
async def main():
# TODO: Implementar orquestación
pass
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print("\n⚠️ Shutting down gracefully...")
  • 500 requests completadas
  • Nunca más de 10 concurrentes (verifica con logs)
  • 3 workers procesando
  • Graceful shutdown funciona
  • Tiempo total < 15 segundos

1.2 Advanced Typing & Validation