Keyboard shortcuts

Press ← or → to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Streaming Responses and Concurrency Handling

Section Goal: Master streaming output and high-concurrency handling techniques for Agent services.


Why Streaming Responses?

An LLM may take 5–15 seconds to generate a complete reply, and making users wait in silence for that long is a poor experience. Streaming responses work like a typewriter — each token is sent as soon as it is generated, so users watch the answer appear in real time instead of staring at a blank screen.

Streaming SSE Response + Concurrency Control Architecture


LLM Streaming Output

from langchain_openai import ChatOpenAI
import asyncio

async def stream_agent_response(question: str):
    """Stream the Agent's reply to ``question`` and return the full text.

    Each non-empty chunk produced by ``llm.astream`` is printed right away
    (no trailing newline, flushed) so the reply appears as it is generated.
    The same pieces are collected and returned as a single string.
    """
    llm = ChatOpenAI(model="gpt-4o", streaming=True)

    # Method 1: Use astream (recommended)
    pieces = []
    async for chunk in llm.astream(question):
        token = chunk.content
        if not token:
            continue  # nothing to show for this chunk
        pieces.append(token)
        print(token, end="", flush=True)  # Real-time output

    return "".join(pieces)

Implementing Streaming SSE in FastAPI

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from langchain_openai import ChatOpenAI
import json

app = FastAPI()

@app.post("/chat/stream")
async def chat_stream(question: str):
    """Answer ``question`` as a ``text/event-stream`` response.

    The stream is a sequence of ``data:`` frames, each carrying one JSON
    object: first ``{"type": "thinking"}``, then one
    ``{"type": "token", "content": ...}`` per non-empty chunk from the
    model, and finally ``{"type": "done"}``.
    """

    def sse_frame(payload: dict) -> str:
        # One event: a "data:" line followed by a blank line.
        return f"data: {json.dumps(payload)}\n\n"

    async def event_source():
        model = ChatOpenAI(model="gpt-4o", streaming=True)

        # Status frame emitted before the model is asked for any tokens.
        yield sse_frame({'type': 'thinking'})

        # Forward every chunk that actually carries content.
        async for piece in model.astream(question):
            if piece.content:
                yield sse_frame({'type': 'token', 'content': piece.content})

        # Completion signal.
        yield sse_frame({'type': 'done'})

    return StreamingResponse(event_source(), media_type="text/event-stream")

Concurrency Handling

Async Agent Execution

import asyncio
from langchain_openai import ChatOpenAI

class AsyncAgentService:
    """Async Agent service — supports high concurrency.

    A shared ``asyncio.Semaphore`` caps how many LLM calls run at the same
    time, and every call is bounded by a per-request timeout.
    """

    def __init__(self, max_concurrent: int = 50, timeout: float = 30.0):
        """Create the service.

        Args:
            max_concurrent: Maximum number of requests allowed inside the
                semaphore at once.
            timeout: Seconds to wait for a single LLM call before giving up.
        """
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.timeout = timeout
        self.llm = ChatOpenAI(model="gpt-4o")

    async def handle_request(self, question: str) -> str:
        """Handle a single request (with concurrency control).

        Args:
            question: Prompt passed to ``self.llm.ainvoke``.

        Returns:
            The ``content`` of the LLM response, or a fixed apology message
            when the call does not finish within ``self.timeout`` seconds.

        Raises:
            Any exception raised by the LLM call other than a timeout.
        """
        async with self.semaphore:  # Limit number of concurrent requests
            try:
                response = await asyncio.wait_for(
                    self.llm.ainvoke(question),
                    timeout=self.timeout,
                )
            except asyncio.TimeoutError:
                return "Sorry, request timed out. Please try again later."
            return response.content

    async def batch_process(
        self,
        questions: list[str]
    ) -> list[str]:
        """Batch process multiple requests.

        All questions are submitted concurrently through ``handle_request``
        (so the semaphore still applies). A failure in one request does not
        abort the others.

        Args:
            questions: Prompts to process.

        Returns:
            One entry per question, in input order: the request's result on
            success, or ``"Error: <exception>"`` when that request raised.
        """
        results = await asyncio.gather(
            *(self.handle_request(q) for q in questions),
            return_exceptions=True,
        )

        # With return_exceptions=True, failures arrive as exception objects.
        # Test for those directly: checking ``isinstance(r, str)`` would
        # mislabel a successful non-string result as an error.
        return [
            f"Error: {r}" if isinstance(r, BaseException) else r
            for r in results
        ]

Request Queue (Handling Traffic Spikes)

import asyncio
from collections import deque
from dataclasses import dataclass, field
from typing import Any

@dataclass
class QueuedRequest:
    """Request in queue.

    Instances must be created while an event loop is running (for example
    inside a coroutine). The default ``future`` is created on that running
    loop, so it can be awaited by the code that created the request.
    Creating one with no running loop raises ``RuntimeError``.
    """
    # Text of the question to be processed.
    question: str
    # Completed with the processing result (or exception) for this request.
    # get_running_loop() is used instead of get_event_loop() so the future is
    # always bound to the loop that is actually executing the caller.
    future: asyncio.Future = field(
        default_factory=lambda: asyncio.get_running_loop().create_future()
    )

class RequestQueue:
    """Request queue — traffic peak shaving.

    Requests are placed on a bounded ``asyncio.Queue`` and consumed by a
    fixed number of worker coroutines. When the queue is full, new
    submissions are rejected immediately.
    """

    def __init__(self, max_queue_size: int = 1000, workers: int = 10):
        """Create the queue.

        Args:
            max_queue_size: Maximum number of requests waiting in the queue.
            workers: Number of worker coroutines launched by ``start``.
        """
        self.queue = asyncio.Queue(maxsize=max_queue_size)
        self.workers = workers
        # Keep strong references to the worker tasks: the event loop itself
        # only holds weak references, so an unreferenced task may be
        # garbage-collected while it is still running.
        self._worker_tasks: list[asyncio.Task] = []

    async def start(self, agent_service: AsyncAgentService):
        """Start worker coroutines.

        Args:
            agent_service: Service whose ``handle_request`` processes each
                queued question.

        Returns:
            The list of tasks created by this call.
        """
        worker_tasks = [
            asyncio.create_task(self._worker(agent_service))
            for _ in range(self.workers)
        ]
        self._worker_tasks.extend(worker_tasks)
        return worker_tasks

    async def _worker(self, agent_service: AsyncAgentService):
        """Worker coroutine: fetch requests from queue and process."""
        while True:
            request: QueuedRequest = await self.queue.get()
            try:
                result = await agent_service.handle_request(
                    request.question
                )
            except Exception as e:
                # The submitter may already have given up (its future was
                # cancelled). Completing a finished future raises
                # InvalidStateError, which would kill this worker.
                if not request.future.done():
                    request.future.set_exception(e)
            else:
                if not request.future.done():
                    request.future.set_result(result)
            finally:
                self.queue.task_done()

    async def submit(self, question: str) -> str:
        """Submit request and wait for result.

        Args:
            question: Question to enqueue.

        Returns:
            The result a worker stored on the request's future.

        Raises:
            Exception: If the queue is full ("Service busy, ..."), or
                whatever exception the worker recorded for this request.
        """
        request = QueuedRequest(question=question)

        try:
            self.queue.put_nowait(request)
        except asyncio.QueueFull as exc:
            raise Exception("Service busy, please try again later") from exc

        return await request.future

Connection Pool Management

Avoid creating new HTTP connections for every request:

import os

import httpx

class LLMConnectionPool:
    """LLM API connection pool.

    Wraps one shared ``httpx.AsyncClient`` so that requests reuse
    connections instead of opening a new one each time. Can be used as an
    async context manager, which closes the client on exit::

        async with LLMConnectionPool() as pool:
            reply = await pool.call_llm(messages)
    """

    def __init__(self, max_connections: int = 100):
        """Create the shared client.

        Args:
            max_connections: Upper bound on simultaneous connections.
        """
        self.client = httpx.AsyncClient(
            limits=httpx.Limits(
                max_connections=max_connections,
                max_keepalive_connections=50,
            ),
            timeout=httpx.Timeout(30.0),
        )

    async def __aenter__(self):
        """Return the pool itself for use inside ``async with``."""
        return self

    async def __aexit__(self, exc_type, exc, tb):
        """Close the client when leaving the ``async with`` block."""
        await self.close()

    async def call_llm(self, messages: list[dict], model: str = "gpt-4o") -> str:
        """Call LLM using connection pool.

        Args:
            messages: Chat messages sent as the request's ``messages`` field.
            model: Model name sent in the request body.

        Returns:
            The ``content`` of the first choice's message in the response.

        Raises:
            RuntimeError: If the ``OPENAI_API_KEY`` environment variable is
                not set.
            httpx.HTTPStatusError: If the API responds with an error status.
        """
        api_key = os.getenv("OPENAI_API_KEY")
        if not api_key:
            # Fail fast rather than sending "Bearer None" to the API.
            raise RuntimeError("OPENAI_API_KEY environment variable is not set")

        response = await self.client.post(
            "https://api.openai.com/v1/chat/completions",
            json={
                "model": model,
                "messages": messages,
            },
            headers={
                "Authorization": f"Bearer {api_key}",
            },
        )
        response.raise_for_status()
        return response.json()["choices"][0]["message"]["content"]

    async def close(self):
        """Close connection pool."""
        await self.client.aclose()

Summary

| Technology | Problem Solved | Key Point |
| --- | --- | --- |
| Streaming response | Reduces user wait time | SSE + LLM streaming |
| Async processing | Improves concurrency | asyncio + ainvoke |
| Semaphore | Prevents resource exhaustion | Semaphore limits concurrency |
| Request queue | Handles traffic spikes | Peak shaving |
| Connection pool | Reduces connection overhead | httpx connection reuse |

Next Section Preview: Finally, we'll integrate everything and deploy a complete production-grade Agent service.


Next: 18.5 Practice: Deploying a Production-Grade Agent Service →