When we talk about concurrency, two concepts usually come up: IO-bound and CPU-bound.
CPU-bound tasks are compute tasks that make intensive use of the CPU: mathematical operations, scientific computing, etc.
They can be optimized with parallelism (tasks executed simultaneously on different CPU cores).
IO-bound tasks spend their time on input/output operations. We will focus on these.
This happens, for example, with network requests, file reads, or database queries.
While an I/O operation is in progress, the CPU just sits there waiting.
This scenario can be optimized with concurrency: the tasks can run concurrently.
import requests

def fetch(session, url):
    response = session.get(url)
    print(f"[{response.status_code}] {url}")
urls = [
    f"https://jsonplaceholder.typicode.com/posts/{i}"
    for i in range(1, 20)
]
urls
['https://jsonplaceholder.typicode.com/posts/1', 'https://jsonplaceholder.typicode.com/posts/2', 'https://jsonplaceholder.typicode.com/posts/3', 'https://jsonplaceholder.typicode.com/posts/4', 'https://jsonplaceholder.typicode.com/posts/5', 'https://jsonplaceholder.typicode.com/posts/6', 'https://jsonplaceholder.typicode.com/posts/7', 'https://jsonplaceholder.typicode.com/posts/8', 'https://jsonplaceholder.typicode.com/posts/9', 'https://jsonplaceholder.typicode.com/posts/10', 'https://jsonplaceholder.typicode.com/posts/11', 'https://jsonplaceholder.typicode.com/posts/12', 'https://jsonplaceholder.typicode.com/posts/13', 'https://jsonplaceholder.typicode.com/posts/14', 'https://jsonplaceholder.typicode.com/posts/15', 'https://jsonplaceholder.typicode.com/posts/16', 'https://jsonplaceholder.typicode.com/posts/17', 'https://jsonplaceholder.typicode.com/posts/18', 'https://jsonplaceholder.typicode.com/posts/19']
def fetch_urls():
    session = requests.Session()
    for url in urls:
        fetch(session, url)

%time fetch_urls()
[200] https://jsonplaceholder.typicode.com/posts/1
[200] https://jsonplaceholder.typicode.com/posts/2
[200] https://jsonplaceholder.typicode.com/posts/3
[200] https://jsonplaceholder.typicode.com/posts/4
[200] https://jsonplaceholder.typicode.com/posts/5
[200] https://jsonplaceholder.typicode.com/posts/6
[200] https://jsonplaceholder.typicode.com/posts/7
[200] https://jsonplaceholder.typicode.com/posts/8
[200] https://jsonplaceholder.typicode.com/posts/9
[200] https://jsonplaceholder.typicode.com/posts/10
[200] https://jsonplaceholder.typicode.com/posts/11
[200] https://jsonplaceholder.typicode.com/posts/12
[200] https://jsonplaceholder.typicode.com/posts/13
[200] https://jsonplaceholder.typicode.com/posts/14
[200] https://jsonplaceholder.typicode.com/posts/15
[200] https://jsonplaceholder.typicode.com/posts/16
[200] https://jsonplaceholder.typicode.com/posts/17
[200] https://jsonplaceholder.typicode.com/posts/18
[200] https://jsonplaceholder.typicode.com/posts/19
CPU times: user 358 ms, sys: 21.5 ms, total: 379 ms
Wall time: 4.09 s
import concurrent.futures

def fetch_urls_with_threads(limit=None):
    session = requests.Session()
    with concurrent.futures.ThreadPoolExecutor(max_workers=limit) as executor:
        tasks = [
            executor.submit(fetch, session, url)
            for url in urls
        ]
        concurrent.futures.wait(tasks)
ThreadPoolExecutor gives us a context manager that abstracts away the creation and cleanup (start, join, shutdown) of the threads.
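Under the hood this is roughly what managing the threads by hand looks like. A minimal sketch using the threading module directly (the work function and results list are made up for illustration):

```python
import threading

results = []

def work(n):
    # stand-in for an IO-bound task
    results.append(n * n)

threads = [threading.Thread(target=work, args=(i,)) for i in range(4)]
for t in threads:
    t.start()  # the executor does this when tasks are submitted
for t in threads:
    t.join()   # the executor's context manager waits like this on exit

print(sorted(results))  # [0, 1, 4, 9]
```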
%time fetch_urls_with_threads(3)
[200] https://jsonplaceholder.typicode.com/posts/1
[200] https://jsonplaceholder.typicode.com/posts/2
[200] https://jsonplaceholder.typicode.com/posts/3
[200] https://jsonplaceholder.typicode.com/posts/4
[200] https://jsonplaceholder.typicode.com/posts/5
[200] https://jsonplaceholder.typicode.com/posts/6
[200] https://jsonplaceholder.typicode.com/posts/7
[200] https://jsonplaceholder.typicode.com/posts/8
[200] https://jsonplaceholder.typicode.com/posts/9
[200] https://jsonplaceholder.typicode.com/posts/10
[200] https://jsonplaceholder.typicode.com/posts/11
[200] https://jsonplaceholder.typicode.com/posts/12
[200] https://jsonplaceholder.typicode.com/posts/13
[200] https://jsonplaceholder.typicode.com/posts/14
[200] https://jsonplaceholder.typicode.com/posts/15
[200] https://jsonplaceholder.typicode.com/posts/16
[200] https://jsonplaceholder.typicode.com/posts/17
[200] https://jsonplaceholder.typicode.com/posts/18
[200] https://jsonplaceholder.typicode.com/posts/19
CPU times: user 470 ms, sys: 34 ms, total: 504 ms
Wall time: 1.71 s
Pros:
Things to keep in mind:
Cons: threads do not help with CPU-bound work; for that, use multiprocessing or concurrent.futures.ProcessPoolExecutor.
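As a sketch of the CPU-bound alternative, ProcessPoolExecutor spreads work across cores in separate processes (the sum_squares function is a made-up example):

```python
import concurrent.futures

def sum_squares(n):
    # CPU-bound: pure computation, no I/O, so threads would not help here
    return sum(i * i for i in range(n))

if __name__ == "__main__":
    # each call runs in a separate process, on a separate CPU core
    with concurrent.futures.ProcessPoolExecutor() as executor:
        results = list(executor.map(sum_squares, [1000, 2000, 3000]))
    print(results)
```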
import asyncio
import httpx
import time

async def fetch(session, url):
    response = await session.get(url)
    print(f"[{response.status_code}] {url}")

async def fetch_urls_with_asyncio():
    async with httpx.AsyncClient() as session:
        tasks = [
            fetch(session, url)
            for url in urls
        ]
        await asyncio.gather(*tasks)
start = time.time()
await fetch_urls_with_asyncio()
end = time.time()
print(f"Wall time: {(end - start):.2f}s")
[200] https://jsonplaceholder.typicode.com/posts/13
[200] https://jsonplaceholder.typicode.com/posts/16
[200] https://jsonplaceholder.typicode.com/posts/14
[200] https://jsonplaceholder.typicode.com/posts/19
[200] https://jsonplaceholder.typicode.com/posts/2
[200] https://jsonplaceholder.typicode.com/posts/12
[200] https://jsonplaceholder.typicode.com/posts/5
[200] https://jsonplaceholder.typicode.com/posts/9
[200] https://jsonplaceholder.typicode.com/posts/11
[200] https://jsonplaceholder.typicode.com/posts/1
[200] https://jsonplaceholder.typicode.com/posts/18
[200] https://jsonplaceholder.typicode.com/posts/10
[200] https://jsonplaceholder.typicode.com/posts/7
[200] https://jsonplaceholder.typicode.com/posts/6
[200] https://jsonplaceholder.typicode.com/posts/15
[200] https://jsonplaceholder.typicode.com/posts/3
[200] https://jsonplaceholder.typicode.com/posts/4
[200] https://jsonplaceholder.typicode.com/posts/8
[200] https://jsonplaceholder.typicode.com/posts/17
Wall time: 0.95s
Pros:
Things to keep in mind:
sync | async |
---|---|
redis | aioredis |
requests | httpx / aiohttp |
sleep | asyncio.sleep |
mysqldb | aiomysql |
psycopg | aiopg |
...etc
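The sleep row is easy to demonstrate: a plain time.sleep would block the whole event loop, while asyncio.sleep yields to it, so concurrent sleeps overlap (a minimal sketch):

```python
import asyncio
import time

async def main():
    start = time.monotonic()
    # both sleeps run concurrently because asyncio.sleep yields to the loop
    await asyncio.gather(asyncio.sleep(0.2), asyncio.sleep(0.2))
    return time.monotonic() - start

elapsed = asyncio.run(main())
print(f"Wall time: {elapsed:.2f}s")  # ~0.2s, not 0.4s
```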
Unlike threads, with asyncio, when a task/coroutine is left waiting on IO it yields control to the asyncio event loop so that another task/coroutine can run.
The event loop is nothing more than a Python object that manages a queue of tasks.
This is usually called cooperative multitasking.
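That hand-off can be made visible with a toy example: awaiting asyncio.sleep(0) forces a coroutine to yield to the event loop, so two workers interleave instead of running back to back (the worker names are made up):

```python
import asyncio

order = []

async def worker(name):
    order.append(f"{name} start")
    await asyncio.sleep(0)  # yield control back to the event loop
    order.append(f"{name} end")

async def main():
    await asyncio.gather(worker("a"), worker("b"))

asyncio.run(main())
print(order)  # ['a start', 'b start', 'a end', 'b end']
```

Both workers reach their "start" line before either finishes: each await hands control back to the loop, which resumes the next queued task.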