FastAPI — Genkit Python
Install
```bash
uv add genkit genkit-plugin-google-genai genkit-plugin-fastapi fastapi uvicorn
```
Streaming by default
The genkit_fastapi_handler decorator auto-streams when the client sends Accept: text/event-stream.
No extra setup — just add the header on the frontend and it works.
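This page doesn't spell out how the decorator detects the header. As an illustration only, `wants_event_stream` below is a hypothetical helper (not part of `genkit.plugins.fastapi`). It shows one way a server can make that decision while tolerating comma-separated lists and media-type parameters:

```python
from typing import Optional


def wants_event_stream(accept: Optional[str]) -> bool:
    """Return True if an Accept header asks for Server-Sent Events.

    Hypothetical helper for illustration; not the plugin's actual code.
    Splits the header on commas, drops parameters such as ";charset=utf-8"
    or ";q=0.9", and compares media types case-insensitively.
    """
    if not accept:
        return False
    for part in accept.split(','):
        media_type = part.split(';', 1)[0].strip().lower()
        if media_type == 'text/event-stream':
            return True
    return False
```

In a plain FastAPI route you would pass it `request.headers.get('accept')`. It deliberately ignores q-values, so `text/event-stream;q=0` would still count as a request to stream.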
Wire format (SSE):
```text
data: {"message": "<chunk text>"}   ← one per ctx.send_chunk() call
data: {"message": "<chunk text>"}
data: {"result": <final output>}    ← sent once when the flow completes
```
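The listing above shows one line per event. In the SSE format, each event is also terminated by a blank line. The following minimal sketch produces exactly this shape. The helper names `sse_event` and `encode_flow_stream` are hypothetical, not anything the plugin exports:

```python
import json
from typing import Any, Iterable, Iterator


def sse_event(payload: dict[str, Any]) -> str:
    """Serialize one payload as a single SSE event (hypothetical helper)."""
    # json.dumps escapes newlines inside strings, so one `data:` line suffices;
    # the trailing blank line ends the event.
    return f'data: {json.dumps(payload)}\n\n'


def encode_flow_stream(chunks: Iterable[str], result: Any) -> Iterator[str]:
    """Yield one {"message": ...} event per chunk, then a single {"result": ...} event."""
    for chunk in chunks:
        yield sse_event({'message': chunk})
    yield sse_event({'result': result})
```

If you ever need the same framing outside the decorator, you could hand a generator like this to FastAPI's `StreamingResponse(..., media_type='text/event-stream')`.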
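Frontend (fetch + ReadableStream; `EventSource` only issues GET requests, so it can't send this POST body):
```javascript
const res = await fetch('/flow/chat', {
  method: 'POST',
  headers: { 'Content-Type': 'application/json', 'Accept': 'text/event-stream' },
  body: JSON.stringify({ data: { topic: 'quantum computing' } }),
});
const reader = res.body.pipeThrough(new TextDecoderStream()).getReader();
let buf = '';
for (let r = await reader.read(); !r.done; r = await reader.read()) {
  const lines = (buf + r.value).split(/\r?\n/);
  buf = lines.pop(); // keep a trailing partial line for the next read
  for (const line of lines) {
    if (line.startsWith('data: ')) console.log(JSON.parse(line.slice(6))); // {message} or {result}
  }
}
```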
curl test:
```bash
curl -N -X POST http://localhost:8080/flow/chat \
  -H 'Content-Type: application/json' \
  -H 'Accept: text/event-stream' \
  -d '{"data": {"topic": "quantum computing"}}'
```
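For scripted checks from Python instead of curl, here is a stdlib-only sketch that reads the same stream. It assumes the server from this page is listening on `localhost:8080` and emits the `data:` lines described above. `iter_flow_events` and `stream_flow` are hypothetical helpers:

```python
import json
import urllib.request
from typing import Any, Iterable, Iterator


def iter_flow_events(body: Iterable[bytes]) -> Iterator[dict[str, Any]]:
    """Hypothetical helper: decode `data: {...}` lines, skipping blanks and other fields."""
    for raw in body:  # HTTP responses and binary files iterate line by line
        line = raw.decode('utf-8').rstrip('\r\n')
        if line.startswith('data:'):
            yield json.loads(line[len('data:'):].strip())


def stream_flow(url: str, data: Any) -> Iterator[dict[str, Any]]:
    """Hypothetical helper: POST {"data": ...} asking for SSE, yield each decoded event."""
    req = urllib.request.Request(
        url,
        data=json.dumps({'data': data}).encode('utf-8'),
        headers={'Content-Type': 'application/json', 'Accept': 'text/event-stream'},
        method='POST',
    )
    with urllib.request.urlopen(req) as resp:
        yield from iter_flow_events(resp)
```

Iterate `stream_flow('http://localhost:8080/flow/chat', {'topic': 'quantum computing'})` and branch on whether each event carries a `message` key (a chunk) or a `result` key (the final output).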
Minimal streaming FastAPI app
```python
import uvicorn
from fastapi import FastAPI
from pydantic import BaseModel

from genkit import ActionRunContext, Genkit
from genkit.plugins.fastapi import genkit_fastapi_handler
from genkit.plugins.google_genai import GoogleAI

ai = Genkit(plugins=[GoogleAI()], model='googleai/gemini-flash-latest')
app = FastAPI()


class ChatInput(BaseModel):
    topic: str


@app.post('/flow/chat', response_model=None)
@genkit_fastapi_handler(ai)
@ai.flow()
async def chat(input: ChatInput, ctx: ActionRunContext) -> str:
    sr = ai.generate_stream(prompt=f'Tell me about {input.topic}.')
    full = ''
    async for chunk in sr.stream:
        if chunk.text:
            ctx.send_chunk(chunk.text)  # each chunk → one SSE event on the wire
            full += chunk.text
    return full


if __name__ == '__main__':
    uvicorn.run(app, host='0.0.0.0', port=8080)
```
Key: the flow must accept `ctx: ActionRunContext` and call `ctx.send_chunk(text)` to emit SSE chunks.
Without `ctx.send_chunk`, the flow still runs but streams nothing, and the client receives nothing until the final result.
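The only streaming contract the flow body relies on is `ctx.send_chunk`, so you can exercise the forward-and-accumulate loop without a model or a server. This sketch uses hypothetical stand-ins that are not Genkit APIs. `RecordingContext` mimics just the `send_chunk` method, and `fake_stream` plays the role of `sr.stream` (yielding plain strings instead of chunk objects):

```python
import asyncio
from typing import AsyncIterator


class RecordingContext:
    """Hypothetical stand-in for ActionRunContext: records every send_chunk call."""

    def __init__(self) -> None:
        self.chunks: list[str] = []

    def send_chunk(self, chunk: str) -> None:
        self.chunks.append(chunk)


async def fake_stream(parts: list[str]) -> AsyncIterator[str]:
    """Hypothetical stand-in for a model stream: yields text pieces asynchronously."""
    for part in parts:
        await asyncio.sleep(0)  # hand control back to the loop, like a network read
        yield part


async def chat_body(parts: list[str], ctx: RecordingContext, forward: bool = True) -> str:
    """Same loop shape as the chat flow; forward=False models omitting send_chunk."""
    full = ''
    async for text in fake_stream(parts):
        if text:
            if forward:
                ctx.send_chunk(text)  # each call would become one SSE event
            full += text
    return full
```

With `forward=False`, the return value is identical but nothing is recorded. That is the "streams nothing" case described above.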
Advanced Use Cases
Fine-grained control over flow streaming
Complex apps chain flows — a parent orchestrates children. Chunks propagate upward by passing ctx to child flows.
```python
class ResearchInput(BaseModel):
    topic: str


@ai.flow()
async def research(input: ResearchInput, ctx: ActionRunContext) -> str:
    """Child flow: streams its generate_stream chunks to whoever called it."""
    sr = ai.generate_stream(prompt=f'Explain {input.topic} in depth.')
    full = ''
    async for chunk in sr.stream:
        if chunk.text:
            ctx.send_chunk(chunk.text)  # propagates up through the call stack
            full += chunk.text
    return full


class HeadlineInput(BaseModel):
    text: str


@ai.flow()
async def make_headline(input: HeadlineInput) -> str:
    """Child flow: non-streaming; returns once the whole (short) response is ready."""
    response = await ai.generate(prompt=f'One-line headline for: {input.text}')
    return response.text.strip()


class ReportInput(BaseModel):
    topic: str


@app.post('/flow/report', response_model=None)
@genkit_fastapi_handler(ai)
@ai.flow()
async def report(input: ReportInput, ctx: ActionRunContext) -> str:
    """Parent flow: calls the children and composes a streaming report."""
    # Step 1: fast non-streaming call
    headline = await make_headline(HeadlineInput(text=input.topic))
    ctx.send_chunk(f'# {headline}\n\n')  # send the headline immediately
    # Step 2: the child streams its own chunks; pass ctx so they flow up
    body = await research(ResearchInput(topic=input.topic), ctx)
    return f'# {headline}\n\n{body}'
```
Rules for nested streaming:
- Child flows that should stream must also accept `ctx: ActionRunContext`.
- Pass the parent's `ctx` when calling child flows: `await child(input, ctx)`.
- Non-streaming child flows don't need `ctx`; just `await` them normally.
- A child that doesn't call `ctx.send_chunk` contributes nothing to the stream (fine for parallel data fetching).
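You can simulate the chunk ordering these rules produce with plain asyncio. In this sketch, `ListContext`, `headline_child`, `research_child`, and `parent` are hypothetical stand-ins, not Genkit APIs. The parent emits its headline chunk first, then passes the same context object to the streaming child, so the child's chunks land in the same stream right after it:

```python
import asyncio


class ListContext:
    """Hypothetical stand-in for ActionRunContext: collects chunks in order."""

    def __init__(self) -> None:
        self.chunks: list[str] = []

    def send_chunk(self, chunk: str) -> None:
        self.chunks.append(chunk)


async def headline_child(topic: str) -> str:
    # Non-streaming child: takes no ctx, just returns a value.
    await asyncio.sleep(0)
    return f'All about {topic}'


async def research_child(topic: str, ctx: ListContext) -> str:
    # Streaming child: writes into whatever context its caller passed in.
    body = ''
    for piece in (f'{topic} ', 'explained.'):
        await asyncio.sleep(0)
        ctx.send_chunk(piece)
        body += piece
    return body


async def parent(topic: str, ctx: ListContext) -> str:
    headline = await headline_child(topic)
    ctx.send_chunk(f'# {headline}\n\n')      # the parent's own chunk goes out first
    body = await research_child(topic, ctx)  # same ctx, so the child's chunks follow it
    return f'# {headline}\n\n{body}'
```

Because the parent's final return value repeats the headline it already streamed, a client that concatenates every `message` chunk ends up with the same text as the `result` event.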
Executing flows in parallel
Use asyncio.gather to run multiple flows concurrently. Only makes sense when children don't need to stream.
```python
import asyncio


class AnalysisInput(BaseModel):
    text: str


class CheckResult(BaseModel):
    issues: list[str]


class CombinedAnalysis(BaseModel):
    issues: list[str]


def _split_issues(raw: str) -> list[str]:
    """Turn the model's 'a, b, c' (or 'none') reply into a list of issue strings."""
    parts = [s.strip() for s in raw.split(',')]
    return [s for s in parts if s and s.lower() != 'none']


@ai.flow()
async def check_security(input: AnalysisInput) -> CheckResult:
    # The model reviews the text; replace with your real prompt/schema as needed.
    r = await ai.generate(
        prompt=f'List security concerns as a short comma-separated line (or "none"): {input.text[:2000]}',
    )
    return CheckResult(issues=_split_issues(r.text or ''))


@ai.flow()
async def check_bugs(input: AnalysisInput) -> CheckResult:
    # The model lists possible bugs; tune the prompt for your codebase.
    r = await ai.generate(
        prompt=f'List likely bugs or correctness issues as a short comma-separated line (or "none"): {input.text[:2000]}',
    )
    return CheckResult(issues=_split_issues(r.text or ''))


@ai.flow()
async def check_style(input: AnalysisInput) -> CheckResult:
    # The model suggests style/clarity issues; optionally use output_schema for structured rows.
    r = await ai.generate(
        prompt=f'List style or clarity issues as a short comma-separated line (or "none"): {input.text[:2000]}',
    )
    return CheckResult(issues=_split_issues(r.text or ''))


@app.post('/flow/analyze', response_model=None)
@genkit_fastapi_handler(ai)
@ai.flow()
async def analyze(input: AnalysisInput) -> CombinedAnalysis:
    # Run the three checks concurrently.
    security, bugs, style = await asyncio.gather(
        check_security(input),
        check_bugs(input),
        check_style(input),
    )
    return CombinedAnalysis(issues=security.issues + bugs.issues + style.issues)
```
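`asyncio.gather` starts all its awaitables concurrently and returns their results in argument order, regardless of which one finishes first. This stdlib-only sketch uses sleeps as hypothetical stand-ins for the three model calls (`fake_check` and `analyze_all` are illustrative names):

```python
import asyncio
import time


async def fake_check(name: str, delay: float) -> list[str]:
    """Hypothetical stand-in for a check_* flow: waits `delay` seconds, returns one issue."""
    await asyncio.sleep(delay)
    return [f'{name} issue']


async def analyze_all() -> tuple[list[str], float]:
    start = time.perf_counter()
    # 'bugs' finishes first, but results still come back in argument order.
    security, bugs, style = await asyncio.gather(
        fake_check('security', 0.3),
        fake_check('bugs', 0.1),
        fake_check('style', 0.2),
    )
    elapsed = time.perf_counter() - start  # close to max(delays), not their sum
    return security + bugs + style, elapsed
```

By default, the first exception raised by any check propagates out of `gather`. The other checks keep running, but their results are lost. Passing `return_exceptions=True` returns exceptions in place of results instead, which lets you keep partial analyses.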
Structured output endpoint (non-streaming)
```python
class SentimentResult(BaseModel):
    sentiment: str     # positive / negative / neutral
    confidence: float  # 0.0–1.0
    key_phrases: list[str]


@app.post('/flow/sentiment', response_model=None)
@genkit_fastapi_handler(ai)
@ai.flow()
async def sentiment(input: AnalysisInput) -> SentimentResult:
    response = await ai.generate(
        prompt=f'Analyze sentiment: {input.text}',
        output_format='json',
        output_schema=SentimentResult,
    )
    return response.output
```
The client calls this without `Accept: text/event-stream` and gets a single `{"result": {...}}` JSON body back.
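The comments on `SentimentResult` (the allowed labels and the 0.0 to 1.0 range) document intent, but a plain `str`/`float` schema doesn't enforce them, so a caller may want to check them. Here is a stdlib-only client-side sketch. `Sentiment`, `ALLOWED_LABELS`, and `parse_sentiment_response` are hypothetical names that mirror the model above:

```python
import json
from dataclasses import dataclass

# Hypothetical: the labels listed in the SentimentResult comment.
ALLOWED_LABELS = {'positive', 'negative', 'neutral'}


@dataclass
class Sentiment:
    """Hypothetical client-side mirror of SentimentResult."""
    sentiment: str
    confidence: float
    key_phrases: list[str]


def parse_sentiment_response(body: str) -> Sentiment:
    """Unwrap {"result": {...}} and enforce the constraints the schema only documents."""
    result = json.loads(body)['result']
    s = Sentiment(
        sentiment=str(result['sentiment']).lower(),
        confidence=float(result['confidence']),
        key_phrases=[str(p) for p in result.get('key_phrases', [])],  # lenient if missing
    )
    if s.sentiment not in ALLOWED_LABELS:
        raise ValueError(f'unexpected sentiment label: {s.sentiment!r}')
    if not 0.0 <= s.confidence <= 1.0:
        raise ValueError(f'confidence out of range: {s.confidence}')
    return s
```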
Decorator order
Must be exactly, top to bottom: `@app.post` → `@genkit_fastapi_handler(ai)` → `@ai.flow()`
```python
@app.post('/flow/chat', response_model=None)  # 1. FastAPI route
@genkit_fastapi_handler(ai)                   # 2. Genkit wire format + streaming
@ai.flow()                                    # 3. Flow registration
async def chat(input: ChatInput, ctx: ActionRunContext) -> str:
    ...
```
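Python applies stacked decorators bottom-up. `@ai.flow()` wraps the raw function first, `@genkit_fastapi_handler(ai)` wraps the resulting flow, and `@app.post` registers the outermost wrapper as the route. The following stdlib-only demo shows that ordering with a hypothetical tracing decorator. `tag` and `applied` are illustrative names:

```python
from typing import Callable

applied: list[str] = []  # records the order in which decorators are applied


def tag(name: str) -> Callable[[Callable], Callable]:
    """Hypothetical decorator factory that logs when its decorator is applied."""
    def decorator(fn: Callable) -> Callable:
        applied.append(name)
        return fn
    return decorator


@tag('route')    # like @app.post: applied last, receives the outermost object
@tag('handler')  # like @genkit_fastapi_handler(ai)
@tag('flow')     # like @ai.flow(): applied first, to the plain function
async def chat() -> str:
    return 'ok'
```

The factory calls `tag('route')`, `tag('handler')`, and `tag('flow')` are evaluated top to bottom. The decorators they return are applied bottom to top, which is why `applied` ends up in flow, handler, route order.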
Run with Dev UI
```bash
GEMINI_API_KEY=your-key genkit start -- uv run src/main.py
```
Leave the process running and wait for the CLI to print something like:
```text
Genkit Developer UI: http://localhost:4000
```
Open that URL. Port may differ if 4000 is busy.