TKK_E32230176/Source-Code-Web/.agents/skills/developing-genkit-python/references/fastapi.md

248 lines
7.4 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# FastAPI — Genkit Python
## Install
```bash
uv add genkit-plugin-fastapi fastapi uvicorn
```
---
## Streaming by default
The `genkit_fastapi_handler` decorator auto-streams when the client sends `Accept: text/event-stream`.
No extra setup — just add the header on the frontend and it works.
**Wire format (SSE):**
```
data: {"message": "<chunk text>"} ← one per ctx.send_chunk() call
data: {"message": "<chunk text>"}
data: {"result": <final output>} ← sent once when flow completes
```
**Frontend (JS EventSource):**
```js
const res = await fetch('/flow/chat', {
method: 'POST',
headers: { 'Content-Type': 'application/json', 'Accept': 'text/event-stream' },
body: JSON.stringify({ data: { topic: 'quantum computing' } }),
});
const reader = res.body.getReader();
// decode and parse each `data: {...}` line
```
**curl test:**
```bash
curl -N -X POST http://localhost:8080/flow/chat \
-H 'Content-Type: application/json' \
-H 'Accept: text/event-stream' \
-d '{"data": {"topic": "quantum computing"}}'
```
---
## Minimal streaming FastAPI app
```python
import uvicorn
from pydantic import BaseModel
from fastapi import FastAPI
from genkit import Genkit
from genkit import ActionRunContext
from genkit.plugins.fastapi import genkit_fastapi_handler
from genkit.plugins.google_genai import GoogleAI
ai = Genkit(plugins=[GoogleAI()], model='googleai/gemini-flash-latest')
app = FastAPI()
class ChatInput(BaseModel):
topic: str
@app.post('/flow/chat', response_model=None)
@genkit_fastapi_handler(ai)
@ai.flow()
async def chat(input: ChatInput, ctx: ActionRunContext) -> str:
sr = ai.generate_stream(prompt=f'Tell me about {input.topic}.')
full = ''
async for chunk in sr.stream:
if chunk.text:
ctx.send_chunk(chunk.text) # each chunk → SSE event on the wire
full += chunk.text
return full
if __name__ == '__main__':
uvicorn.run(app, host='0.0.0.0', port=8080)
```
**Key:** flow must accept `ctx: ActionRunContext` and call `ctx.send_chunk(text)` to emit SSE chunks.
Without `ctx.send_chunk`, the flow runs but streams nothing — client waits for the final result.
---
## Advanced Use Cases
### Fine-grained control over flow streaming
Complex apps chain flows — a parent orchestrates children. Chunks propagate upward by **passing `ctx` to child flows**.
```python
class ResearchInput(BaseModel):
topic: str
@ai.flow()
async def research(input: ResearchInput, ctx: ActionRunContext) -> str:
"""Child flow — streams its generate_stream chunks to whoever called it."""
sr = ai.generate_stream(prompt=f'Explain {input.topic} in depth.')
full = ''
async for chunk in sr.stream:
if chunk.text:
ctx.send_chunk(chunk.text) # propagates up through the call stack
full += chunk.text
return full
class HeadlineInput(BaseModel):
text: str
@ai.flow()
async def make_headline(input: HeadlineInput) -> str:
"""Child flow — non-streaming, returns instantly."""
response = await ai.generate(prompt=f'One-line headline for: {input.text}')
return response.text.strip()
class ReportInput(BaseModel):
topic: str
@app.post('/flow/report', response_model=None)
@genkit_fastapi_handler(ai)
@ai.flow()
async def report(input: ReportInput, ctx: ActionRunContext) -> str:
"""Parent flow — calls children, composes a streaming report."""
# Step 1: fast non-streaming call
headline = await make_headline(HeadlineInput(text=input.topic))
ctx.send_chunk(f'# {headline}\n\n') # send headline immediately
# Step 2: child flow streams its chunks — passes ctx so they flow up
body = await research(ResearchInput(topic=input.topic), ctx)
return f'# {headline}\n\n{body}'
```
**Rules for nested streaming:**
- Child flows that should stream must also accept `ctx: ActionRunContext`
- Pass the parent's `ctx` when calling child flows: `await child(input, ctx)`
- Non-streaming child flows don't need `ctx` — just `await` them normally
- A child that doesn't call `ctx.send_chunk` contributes nothing to the stream (fine for parallel data fetching)
### Executing flows in parallel
Use `asyncio.gather` to run multiple flows concurrently. Only makes sense when children don't need to stream.
```python
import asyncio
class AnalysisInput(BaseModel):
text: str
class CheckResult(BaseModel):
issues: list[str]
class CombinedAnalysis(BaseModel):
issues: list[str]
@ai.flow()
async def check_security(input: AnalysisInput) -> CheckResult:
# Here the model reviews the text; replace with your real prompt/schema as needed.
r = await ai.generate(
prompt=f'List security concerns as a short comma-separated line (or "none"): {input.text[:2000]}',
)
raw = (r.text or '').strip()
issues = [s.strip() for s in raw.split(',') if s.strip() and s.strip().lower() != 'none']
return CheckResult(issues=issues)
@ai.flow()
async def check_bugs(input: AnalysisInput) -> CheckResult:
# Model lists possible bugs; tune prompt for your codebase.
r = await ai.generate(
prompt=f'List likely bugs or correctness issues as a short comma-separated line (or "none"): {input.text[:2000]}',
)
raw = (r.text or '').strip()
issues = [s.strip() for s in raw.split(',') if s.strip() and s.strip().lower() != 'none']
return CheckResult(issues=issues)
@ai.flow()
async def check_style(input: AnalysisInput) -> CheckResult:
# Model suggests style/clarity issues; optional: use output_schema for structured rows.
r = await ai.generate(
prompt=f'List style or clarity issues as a short comma-separated line (or "none"): {input.text[:2000]}',
)
raw = (r.text or '').strip()
issues = [s.strip() for s in raw.split(',') if s.strip() and s.strip().lower() != 'none']
return CheckResult(issues=issues)
@app.post('/flow/analyze', response_model=None)
@genkit_fastapi_handler(ai)
@ai.flow()
async def analyze(input: AnalysisInput) -> CombinedAnalysis:
security, bugs, style = await asyncio.gather(
check_security(input),
check_bugs(input),
check_style(input),
)
return CombinedAnalysis(issues=security.issues + bugs.issues + style.issues)
```
---
## Structured output endpoint (non-streaming)
```python
class SentimentResult(BaseModel):
sentiment: str # positive / negative / neutral
confidence: float # 0.01.0
key_phrases: list[str]
@app.post('/flow/sentiment', response_model=None)
@genkit_fastapi_handler(ai)
@ai.flow()
async def sentiment(input: AnalysisInput) -> SentimentResult:
response = await ai.generate(
prompt=f'Analyze sentiment: {input.text}',
output_format='json',
output_schema=SentimentResult,
)
return response.output
```
Client calls this without `Accept: text/event-stream` — gets `{"result": {...}}` back.
---
## Decorator order
Must be exactly: `@app.post``@genkit_fastapi_handler(ai)``@ai.flow()`
```python
@app.post('/flow/chat', response_model=None) # 1. FastAPI route
@genkit_fastapi_handler(ai) # 2. Genkit wire format + streaming
@ai.flow() # 3. Flow registration
async def chat(input: ChatInput, ctx: ActionRunContext) -> str:
...
```
---
## Run with Dev UI
```bash
GEMINI_API_KEY=your-key genkit start -- uv run src/main.py
```
Leave the process running until the CLI prints something like:
```
Genkit Developer UI: http://localhost:4000
```
Open that URL. Port may differ if 4000 is busy.