Smartflow — API and SDK Reference
Smartflow is an enterprise AI gateway that proxies requests to multiple LLM providers, enforces compliance policy, caches semantically, and orchestrates MCP tools and A2A agents. This document covers every API surface the platform exposes: the LLM proxy, management APIs, MCP gateway, A2A gateway, vector store, RAG pipeline, and the Python SDK.
Architecture Overview
Smartflow runs as five cooperating services:
| Service | Default Port | Purpose |
|---|---|---|
| smartflow (proxy) | 7775 | LLM proxy, MCP gateway, A2A gateway, semantic caching, pre/post-call compliance hooks |
| api_server (management) | 7778 | Virtual keys, routing chains, audit logs, analytics |
| compliance_api_server | 7777 | ML content scanning, PII redaction, adaptive learning, intelligent scan |
| policy_perfect_api | 7782 | Policy and preset CRUD, AI document-to-policy generation, assignment management |
| smartflow-hybrid-bridge | 3500 | Cross-datacenter Redis log aggregation |
All five services share one Redis instance for cross-service state: routing tables, semantic cache, VAS logs, provider latency metrics, virtual key budgets, and the MCP server registry. The Policy Perfect API additionally requires PostgreSQL for durable policy and preset storage. In production the proxy sits behind a TLS-terminating reverse proxy (Caddy or nginx). Management, compliance, and policy APIs are backend surfaces.
Authentication
Virtual Keys
The primary credential for clients. Issue sk-sf-{48-hex} tokens via the management API (POST /api/enterprise/vkeys). Each key is stored in Redis and carries optional USD budget caps, period resets, and rate limits. The raw token is returned exactly once at creation — it cannot be retrieved again.
# OpenAI-compatible and all /v1/* routes
Authorization: Bearer sk-sf-a1b2c3d4...
# Anthropic native /anthropic/v1/* routes
x-api-key: sk-sf-a1b2c3d4...
On every request the proxy:
- Extracts the sk-sf-* token from Authorization: Bearer or x-api-key
- Looks up the key in Redis — returns 429 if not found or revoked
- Checks USD budget against accumulated spend — returns 429 with X-Smartflow-Budget-Exceeded: true if exceeded
- Forwards the request to the provider using the server-side provider API key
- Records actual spend (cost_usd) back to the key's spend counter after completion
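The credential headers above can be assembled programmatically before handing the request to any HTTP client. A minimal sketch (the helper names are illustrative, not part of the SDK):

```python
def auth_headers(route: str, vkey: str) -> dict:
    """Choose the credential header Smartflow expects for a given route family."""
    if route.startswith("/anthropic/"):
        return {"x-api-key": vkey}              # Anthropic-native routes
    return {"Authorization": f"Bearer {vkey}"}  # OpenAI-compatible /v1/* routes

def chat_body(model: str, prompt: str, **params) -> dict:
    """Build a minimal OpenAI-format chat completions body."""
    return {
        "model": model,
        "messages": [{"role": "user", "content": prompt}],
        **params,
    }
```

Pass the resulting headers and body to POST /v1/chat/completions with the HTTP client of your choice.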
Provider API Keys
Stored server-side in Redis (smartflow:api_keys:{provider}_api_key). Clients never send raw provider credentials. The proxy resolves the correct key from the key store when forwarding to each provider.
Anthropic Native Passthrough
For /anthropic/* routes, clients send their Smartflow virtual key as x-api-key. The proxy validates budget, then replaces it with the real ANTHROPIC_API_KEY before forwarding.
JWT (SafeChat / Dashboard)
The SafeChat web app and admin dashboard use a smartflow_token cookie-based JWT for browser sessions. JWT validation occurs at the application layer, not in the proxy core.
LLM Proxy Endpoints
All proxy endpoints are on port 7775 by default.
/v1/chat/completions
OpenAI-compatible chat completions. Accepts any OpenAI-format request body. Provider and model are resolved from the model name or an explicit prefix.
{
"model": "gpt-4o",
"messages": [
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": "What is the capital of France?"}
],
"temperature": 0.7,
"max_tokens": 256,
"stream": false
}
Model-prefix routing:
| Prefix / Pattern | Provider |
|---|---|
| gpt-*, o1-*, o3-*, chatgpt-* | OpenAI |
| claude-* | Anthropic |
| gemini-* | Google Gemini |
| grok-* | xAI |
| mistral-*, mixtral-* | Mistral AI |
| command-*, c4ai-* | Cohere |
| llama-*, groq/* | Groq |
| openrouter/* | OpenRouter |
| ollama/* | Local Ollama |
| azure/* | Azure OpenAI |
No prefix is required for the primary supported providers — the model-name heuristic detects gemini-*, claude-*, gpt-*, and similar names automatically. An explicit provider/model prefix always takes precedence.
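For illustration, the table and heuristic above behave roughly like the following lookup (a hypothetical re-implementation; the proxy's internal provider identifiers may differ):

```python
# Explicit provider prefixes, resolved before any heuristic.
EXPLICIT_PREFIXES = {
    "groq": "groq", "openrouter": "openrouter", "ollama": "ollama",
    "azure": "azure", "xai": "xai", "mistral": "mistral", "cohere": "cohere",
}
# Model-name patterns checked when no explicit prefix is present.
HEURISTICS = [
    (("gpt-", "o1-", "o3-", "chatgpt-"), "openai"),
    (("claude-",), "anthropic"),
    (("gemini-",), "google"),
    (("grok-",), "xai"),
    (("mistral-", "mixtral-"), "mistral"),
    (("command-", "c4ai-"), "cohere"),
    (("llama-",), "groq"),
]

def resolve_provider(model: str) -> str:
    # An explicit provider/model prefix always wins.
    if "/" in model:
        prefix = model.split("/", 1)[0]
        if prefix in EXPLICIT_PREFIXES:
            return EXPLICIT_PREFIXES[prefix]
    # Otherwise fall back to the model-name heuristic.
    for patterns, provider in HEURISTICS:
        if model.startswith(patterns):
            return provider
    raise ValueError(f"no provider for model {model!r}")
```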
Multimodal — Image
{
"model": "gpt-4o",
"messages": [{
"role": "user",
"content": [
{"type": "text", "text": "What is in this image?"},
{"type": "image_url", "image_url": {"url": "data:image/png;base64,..."}}
]
}]
}
Multimodal — Audio (gpt-4o-audio-preview)
{
"type": "input_audio",
"input_audio": {"data": "<base64>", "format": "mp3"}
}
Response
{
"id": "chatcmpl-...",
"object": "chat.completion",
"model": "gpt-4o",
"choices": [{
"index": 0,
"message": {"role": "assistant", "content": "Paris."},
"finish_reason": "stop"
}],
"usage": {"prompt_tokens": 24, "completion_tokens": 3, "total_tokens": 27}
}
/anthropic/v1/messages
Native Anthropic Messages API passthrough. The proxy injects the API key from the server key store. The full Anthropic request and response format is preserved with no translation. Also accessible as /cursor/v1/messages for Cursor IDE passthrough. The [1m] suffix that Claude Code appends to model names is stripped automatically.
{
"model": "claude-sonnet-4-6",
"max_tokens": 1024,
"system": "You are a helpful assistant.",
"messages": [
{"role": "user", "content": "Hello, Claude."}
]
}
Multimodal — Image (native Anthropic)
{
"type": "image",
"source": {"type": "base64", "media_type": "image/png", "data": "..."}
}
Multimodal — PDF Document (native Anthropic)
{
"type": "document",
"source": {"type": "base64", "media_type": "application/pdf", "data": "..."}
}
/v1/embeddings
Generate vector embeddings. Supports multi-provider routing via model prefix.
{"model": "text-embedding-3-small", "input": "Your text here"}
Response follows the OpenAI embeddings format with data[].embedding float arrays.
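The returned vectors are plain float arrays, so downstream similarity scoring needs no extra tooling. A sketch of cosine similarity, the measure used by the semantic cache and vector store:

```python
import math

def cosine(a: list, b: list) -> float:
    """Cosine similarity between two embedding vectors of equal length."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)
```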
/v1/audio/transcriptions
Transcribe audio. Multipart form upload. Routes to OpenAI Whisper by default. Use groq/whisper-large-v3 for Groq, deepgram/nova-2 for Deepgram.
Content-Type: multipart/form-data
file=@audio.mp3
model=whisper-1
/v1/audio/speech
Text-to-speech synthesis. Returns raw audio bytes.
{
"model": "tts-1", "input": "Hello, world.",
"voice": "nova", "response_format": "mp3"
}
/v1/images/generations
{
"model": "dall-e-3", "prompt": "A futuristic city at sunrise",
"n": 1, "size": "1024x1024", "quality": "hd",
"style": "vivid", "response_format": "url"
}
/v1/rerank
Document reranking. Compatible with Cohere's rerank API.
{
"model": "rerank-english-v3.0",
"query": "What is the return policy?",
"documents": ["Document one text.", "Document two text."],
"top_n": 3
}
/v1/models
List available models across all enabled providers.
/v1/completions
Legacy text completions. Forwarded to the configured provider.
Routing and Provider Selection
Automatic Model-Name Heuristic
For requests to /v1/chat/completions with no explicit provider prefix, the proxy infers the provider from the model name. An explicit provider/model prefix always takes precedence over heuristic detection.
| Pattern | Inferred Provider |
|---|---|
| gpt-*, o1-*, o3-*, o4-*, chatgpt-*, whisper-*, tts-*, dall-e-* | OpenAI |
| claude-* | Anthropic |
| gemini-* | Google Gemini |
| grok-* | xAI |
| mistral-*, mixtral-* | Mistral |
| command-* | Cohere |
| llama-* | Groq |
Routing Strategies
Configured per fallback chain via the management API:
| Strategy | Behavior |
|---|---|
| round_robin | Distribute requests across targets in order |
| weighted | Traffic proportional to assigned weights |
| least_connections | Send to provider with fewest in-flight requests |
| random | Random selection among healthy providers |
| priority | Try targets in order; fall back only on failure |
| latency | Route to provider with lowest p95 rolling EMA latency (tracked in Redis) |
| cost | Route to provider with lowest per-token cost; skip providers over daily budget cap |
Fallback Chains
Named ordered provider lists with retry logic. Configured at POST /api/routing/fallback-chains.
{
"name": "production-chain",
"targets": [
{"provider": "openai", "model": "gpt-4o", "weight": 1},
{"provider": "anthropic", "model": "claude-sonnet-4-6", "weight": 1},
{"provider": "google", "model": "gemini-1.5-pro", "weight": 1}
],
"retry_on": ["429", "500", "502", "503"],
"max_retries": 2,
"backoff_ms": 500
}
On a 429 or 5xx response the proxy retries against the next target with exponential backoff. Non-retryable 4xx errors bypass retry. Providers that have exceeded their daily budget cap are excluded from selection automatically.
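The retry behavior can be pictured as a loop over the chain's targets. A simplified sketch, assuming a send(target) callable that performs one provider call and returns (status, body); the real proxy additionally applies the routing strategy and budget exclusions:

```python
import time

def call_with_fallback(targets, send, max_retries=2, backoff_ms=500,
                       retry_on=(429, 500, 502, 503)):
    """Try each target in order; back off exponentially on retryable errors."""
    delay = backoff_ms / 1000.0
    attempts = 0
    for target in targets:
        status, body = send(target)
        if status == 200:
            return body
        if status not in retry_on:
            # Non-retryable 4xx: surface immediately, no fallback.
            raise RuntimeError(f"non-retryable {status} from {target['provider']}")
        attempts += 1
        if attempts > max_retries:
            break
        time.sleep(delay)
        delay *= 2  # exponential backoff
    raise RuntimeError("all targets exhausted")
```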
MetaCache — Semantic Caching
The MetaCache intercepts every /v1/chat/completions request before any provider call is made.
How It Works
The incoming query is embedded and its cosine similarity is computed against stored request embeddings. If similarity exceeds the configured threshold, the cached response is returned. Otherwise the request is forwarded to the provider and the response is stored. Responses are semantically compressed before storage to reduce Redis footprint while preserving meaning.
Three tiers operate in sequence: L1 in-process memory, L2 Redis semantic similarity, L3 Redis exact match. A lookup checks each tier in order and forwards to the provider only when all three miss.
Per-Request Cache Controls
| Header | Effect |
|---|---|
| Cache-Control: no-cache | Bypass cache read; always query the provider |
| Cache-Control: no-store | Bypass cache write; do not cache this response |
| x-smartflow-cache-ttl: 3600 | Override TTL in seconds for this response |
| x-smartflow-cache-namespace: <ns> | Scope cache to a logical partition |
Cached responses return x-smartflow-cache-hit: true and x-smartflow-cache-key for client-side correlation.
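A small helper can assemble these headers consistently (the function is illustrative, not part of the SDK):

```python
def cache_headers(no_cache=False, no_store=False, ttl=None, namespace=None):
    """Build the per-request MetaCache control headers described above."""
    headers = {}
    directives = [d for d, on in (("no-cache", no_cache), ("no-store", no_store)) if on]
    if directives:
        headers["Cache-Control"] = ", ".join(directives)
    if ttl is not None:
        headers["x-smartflow-cache-ttl"] = str(ttl)        # TTL override, seconds
    if namespace:
        headers["x-smartflow-cache-namespace"] = namespace  # logical partition
    return headers
```

Check x-smartflow-cache-hit on the response to see whether the answer came from cache.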
MCP Gateway
Smartflow implements the Model Context Protocol (MCP) gateway. Register external MCP servers and invoke their tools through the proxy with shared authentication, budgeting, and audit logging.
Server Registry
List registered MCP servers.
Register an MCP server.
{
"id": "github-tools",
"name": "GitHub MCP Server",
"base_url": "https://mcp.github.example.com",
"auth_type": "bearer",
"allowed_tools": ["list_repos", "create_issue"],
"disallowed_tools": [],
"cost_info": {"per_call_usd": 0.001},
"guardrail_mode": "strict"
}
Tool Invocation
The proxy authenticates the request, applies per-tool access controls, records cost, and forwards to the server.
Catalog and Search
Browse the tool catalog across all registered servers.
Semantic search over the tool catalog. Returns the top k tools matching the natural-language query.
Full indexed tool list with embedding metadata.
Access Control
Per-server configuration fields for access control:
| Field | Type | Description |
|---|---|---|
| allowed_tools | string[] | If non-empty, only these tools may be called |
| disallowed_tools | string[] | These tools are always blocked |
| allowed_params | object | Per-tool parameter allowlists |
| guardrail_mode | string | "strict" — block on policy violation; "log" — flag and continue |
| available_on_public_internet | bool | If false, only accessible from approved network segments |
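The interaction between allowed_tools and disallowed_tools can be sketched as follows (a hypothetical reading of the table's rules, not the gateway's actual code):

```python
def tool_allowed(server: dict, tool: str) -> bool:
    """Evaluate per-server tool access control as described in the table."""
    if tool in server.get("disallowed_tools", []):
        return False  # disallowed_tools always blocks
    allowed = server.get("allowed_tools", [])
    # An empty allowlist permits every tool; a non-empty one is exhaustive.
    return not allowed or tool in allowed
```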
Access Request Flow
OAuth Flow
Usage and Logs
Aggregated cost and call counts per server and tool.
Per-invocation audit logs.
API Generation from OpenAPI Spec
Auto-generate an MCP server adapter from an OpenAPI specification.
{
"spec": "<OpenAPI JSON or YAML string>",
"server_id": "my-api",
"server_name": "My REST API",
"base_url": "https://api.example.com",
"include_methods": ["GET", "POST"]
}
A2A Agent Gateway
Smartflow implements the A2A (Agent-to-Agent) protocol for inter-agent communication. Register external agents and invoke them with full logging and routing.
Agent Card
Returns the agent's machine-readable capability card: name, capabilities, supported task types, and authentication requirements.
Task Invocation
Send a task to a registered agent. The proxy forwards the request, captures the response, and logs both.
{
"id": "task-uuid",
"message": {
"role": "user",
"parts": [{"type": "text", "text": "Summarize the latest earnings report."}]
}
}
Supports synchronous JSON responses and SSE streaming for long-running tasks. Include x-a2a-trace-id to correlate task invocations across agents in distributed workflows.
Vector Store API
Built-in vector store backed by Redis. No external vector database required. All endpoints are on the proxy at port 7775.
Create a vector store.
{
"name": "product-documentation",
"description": "Internal product docs",
"metadata": {"team": "engineering"}
}
Response includes id, name, description, file_count, created_at.
List all vector stores.
Get a specific vector store.
Delete a vector store and all its files.
Add a text document. The document is chunked and embedded automatically.
{
"content": "Full document text...",
"filename": "architecture.md",
"metadata": {"version": "3.0"}
}
List files in a vector store.
Semantic search over stored documents.
{
"query": "How does the caching layer work?",
"max_results": 5,
"score_threshold": 0.7
}
// Response
{
"results": [
{"file_id": "vf_xyz", "filename": "architecture.md", "content": "...chunk...", "score": 0.91}
],
"total": 1
}
RAG Pipeline API
Built on top of the vector store. Ingest documents with automatic chunking, then retrieve context for LLM augmentation.
Chunk a document, embed each chunk, and store in a named vector store.
{
"content": "Full document text...",
"vector_store_id": "vs_abc123",
"filename": "report-q4.txt",
"chunk_size": 512,
"chunk_overlap": 64,
"metadata": {"source": "internal"}
}
| Field | Type | Default | Description |
|---|---|---|---|
| content | string | required | Full document text |
| vector_store_id | string | required | Target store (must already exist) |
| filename | string | "" | Display name for the file |
| chunk_size | int | 512 | Characters per chunk |
| chunk_overlap | int | 64 | Overlap between consecutive chunks |
| metadata | object | {} | Arbitrary key-value metadata |
Response: { "store_id", "file_id", "chunks_created", "status": "completed" }
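The exact chunker is internal to the pipeline, but a character-window split consistent with the chunk_size and chunk_overlap semantics above looks like this:

```python
def chunk_text(content: str, chunk_size: int = 512, chunk_overlap: int = 64) -> list:
    """Split text into fixed-size character windows with overlap between neighbors."""
    if chunk_overlap >= chunk_size:
        raise ValueError("chunk_overlap must be smaller than chunk_size")
    step = chunk_size - chunk_overlap  # advance per window
    return [content[i:i + chunk_size] for i in range(0, len(content), step)]
```

With this sketch's defaults, a 1,000-character document yields three overlapping chunks; the pipeline's actual chunks_created may differ if it splits on token or sentence boundaries.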
Embed a question, retrieve matching chunks, and optionally assemble a context string for injection into an LLM system prompt.
{
"query": "What were the Q4 revenue figures?",
"vector_store_id": "vs_abc123",
"max_results": 5,
"score_threshold": 0.0,
"include_context": true
}
| Field | Default | Description |
|---|---|---|
| query | required | Natural language question |
| vector_store_id | required | Store to search |
| max_results | 5 | Maximum chunks to return |
| score_threshold | 0.0 | Minimum cosine similarity (0 = return all) |
| include_context | true | Concatenate chunks into a context string field |
Response includes chunks[], context (concatenated string for prompt injection), and total.
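When include_context is true, the context string is ready for prompt injection. A sketch of assembling the augmented messages (the helper name is illustrative):

```python
def build_rag_prompt(context: str, question: str) -> list:
    """Assemble chat messages that inject retrieved context into the system prompt."""
    system = (
        "Answer using only the context below. "
        "If the context does not contain the answer, say so.\n\n"
        f"Context:\n{context}"
    )
    return [
        {"role": "system", "content": system},
        {"role": "user", "content": question},
    ]
```

The resulting list drops straight into the messages field of /v1/chat/completions.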
Management API
Management API runs on port 7778.
Virtual Keys
All virtual key endpoints are on the management port (7778 / /api/enterprise/vkeys). Requires admin-level auth.
Create a virtual key. The token is returned once only — store it immediately. All fields except name and owner_id are optional.
// Request
{
"name": "team-alpha-prod", // human label
"owner_id": "alice@example.com", // user or team id
"provider": null, // null = any provider; or "openai","anthropic"…
"budget_usd": 100.00, // USD ceiling per period; null = unlimited
"period": "monthly", // daily | weekly | monthly | lifetime
"tpm_limit": 100000, // tokens per minute; null = unlimited
"rpm_limit": 60, // requests per minute; null = unlimited
"tags": { "team": "alpha", "env": "prod" }
}
// Response — token shown ONCE
{
"success": true,
"token": "sk-sf-a1b2c3d4e5f6...", // 48-hex — deliver to your client now
"key_id": "sk-sf-a1b2c3d4e5f6...",
"message": "Virtual key created. Store the token securely — it will not be shown again."
}
List all virtual keys. Tokens are never returned here — only metadata (name, owner, budget, active status).
{
"success": true,
"total": 3,
"keys": [
{
"key_id": "sk-sf-a1b2...",
"name": "team-alpha-prod",
"owner_id": "alice@example.com",
"active": true,
"budget_usd": 100.00,
"period": "monthly",
"period_start": "2026-03-01T00:00:00Z",
"tpm_limit": 100000,
"rpm_limit": 60,
"created_at": "2026-02-15T10:30:00Z",
"last_used": "2026-03-10T08:22:00Z",
"tags": { "team": "alpha" }
}
]
}
List all keys owned by a specific user or team id.
Get current period spend and remaining budget for a key.
{
"success": true,
"budget": {
"key_id": "sk-sf-a1b2...",
"name": "team-alpha-prod",
"active": true,
"budget_usd": 100.00,
"period": "monthly",
"spent_usd": 23.47,
"remaining_usd": 76.53,
"percent_used": 23.47,
"last_used": "2026-03-10T08:22:00Z"
}
}
Reset the spend counter to zero and restart the budget period. No request body needed.
{ "success": true, "message": "Budget reset for key 'sk-sf-a1b2...'" }
Deactivate a key. Revoked keys return 429 with the message "Virtual key is revoked" on all subsequent requests. The key metadata is retained in Redis for audit purposes.
{ "success": true, "message": "Virtual key 'sk-sf-a1b2...' revoked" }
Budget Exceeded Response
When a request is blocked by budget enforcement, the proxy returns:
HTTP/1.1 429 Too Many Requests
X-Smartflow-Budget-Exceeded: true
X-Smartflow-Budget-Remaining: 0.0000
{
"error": {
"message": "Budget limit exceeded: $100.00 spent of $100.00 (monthly budget)",
"type": "budget_exceeded",
"code": "insufficient_quota"
},
"spent_usd": 100.04,
"remaining_usd": 0.0
}
Quick Start — cURL
# Create a key with a $50/month budget
curl -s https://YOUR_SMARTFLOW_HOST/api/enterprise/vkeys \
-H "Authorization: Bearer YOUR_ADMIN_KEY" \
-H "Content-Type: application/json" \
-d '{
"name": "my-app-prod",
"owner_id": "alice@example.com",
"budget_usd": 50.0,
"period": "monthly"
}'
# Check spend
curl -s https://YOUR_SMARTFLOW_HOST/api/enterprise/vkeys/sk-sf-xxx/budget \
-H "Authorization: Bearer YOUR_ADMIN_KEY"
# Revoke
curl -s -X POST https://YOUR_SMARTFLOW_HOST/api/enterprise/vkeys/sk-sf-xxx/revoke \
-H "Authorization: Bearer YOUR_ADMIN_KEY"
Routing API
Current routing state: active provider, fallback chain, last failure.
{"provider": "openai", "duration_seconds": 600}
Audit Logs (VAS)
Retrieve VAS audit logs. Every request proxied through Smartflow produces a log entry including: timestamp, provider, model, prompt tokens, completion tokens, cost in USD, cache hit flag, compliance flags, user context, and latency.
Retrieve logs aggregated across multiple Smartflow instances via the hybrid bridge.
Analytics
Usage analytics: request volume, cost by provider, cache hit rate, top models, top users.
Compliance API
The Compliance API runs on port 7777. It provides ML-based content scanning, PII detection and redaction, and an adaptive learning loop that improves over time based on human feedback. The proxy integrates with this service on every request when pre/post-call scanning is enabled.
Rule-based compliance scan against configured policies.
{
"content": "Text to scan",
"policy": "enterprise_standard",
"user_id": "user-123",
"org_id": "acme"
}
// Response
{
"has_violations": false,
"compliance_score": 0.97,
"risk_level": "low",
"recommended_action": "Allow",
"violations": [],
"pii_detected": []
}
Maestro ML policy engine. Evaluates intent against your organization's policy documents — not keyword matching.
Response includes risk_score (0–1), risk_level, recommended_action (Allow / Flag / Block), violations, explanation.
Submit a correction to improve the ML model's future predictions.
{
"scan_id": "scan-xyz",
"correct_action": "Allow",
"correct_risk_level": "low",
"notes": "False positive — internal terminology"
}
Detect and redact PII from content. Returns the redacted string.
Policy Perfect API
The Policy Perfect API runs on port 7782. It manages the organization's compliance policy library — the source documents the Maestro ML engine reads when evaluating requests. Backed by PostgreSQL for durable policy storage.
Liveness check for the Policy Perfect service.
Aggregate counts for the current state of the policy library.
{
"total_policies": 42,
"total_presets": 8,
"total_applications": 1204,
"compliance_violations": 3
}
Policies
Policies are named compliance rules attached to scopes. The Maestro engine evaluates all active policies on every request.
Policy types:
| Type | Description |
|---|---|
| compliance | Regulatory rules — HIPAA, GDPR, SOC 2, PCI-DSS, etc. |
| brand | Brand voice and communication standards |
| format | Output format constraints |
| role | Role-based access and behavior restrictions |
| industry | Industry-specific usage rules |
| legal | Legal department rules and disclaimers |
| security | Security guardrails and data handling policies |
List all active policies.
Create a policy.
{
"name": "HIPAA PHI Protection",
"description": "Prevent transmission of protected health information",
"policy_type": "compliance",
"content": "Do not include patient names, diagnoses, medical record numbers, or any PHI in AI responses.",
"priority": 90,
"applicable_providers": ["all"],
"applicable_models": ["all"],
"regulatory_framework": "HIPAA",
"severity": "critical",
"metadata": {
"departments": ["clinical", "billing"],
"ad_groups": ["clinicians", "admins"]
}
}
| Field | Type | Description |
|---|---|---|
| name | string | Policy display name |
| policy_type | string | One of the seven policy types above |
| content | string | Policy text read by the Maestro ML engine |
| priority | int | Evaluation order (0–100); higher values evaluated first |
| applicable_providers | string[] | Providers this policy applies to; ["all"] for universal |
| applicable_models | string[] | Models this policy applies to; ["all"] for universal |
| regulatory_framework | string | HIPAA, GDPR, SOC2, PCI-DSS, etc. |
| severity | string | critical, high, medium, low |
| metadata | object | Layer 2/3 targeting: source_ips, ad_groups, departments, applications |
Get a policy by ID.
Update a policy. All fields optional; only supplied fields are changed. Set "is_active": false to deactivate without deleting.
Delete a policy permanently.
Presets
Presets are named, ordered collections of policies. Assign a preset to a team, role, or virtual key instead of managing individual policies per scope.
List all presets. Each entry includes the preset metadata and its ordered policy list.
Create a preset.
{
"name": "Healthcare Standard",
"description": "Default policy set for all clinical staff",
"use_case": "Clinical AI assistant",
"policy_ids": ["pol_hipaa_phi", "pol_brand_tone", "pol_no_diagnosis"]
}
Policy order in policy_ids determines evaluation priority.
Get a preset and its full ordered policy list.
AI Document-to-Policy Generation
Upload a compliance document (PDF, DOCX, TXT — up to 50 MB). The service uses GPT-4o to extract structured policy suggestions automatically. Processing is asynchronous; poll for progress with the returned job ID.
Multipart form upload. Field name: file.
Content-Type: multipart/form-data
file=@hipaa-policy-handbook.pdf
Immediate response:
{
"success": true,
"job_id": "550e8400-e29b-41d4-a716-446655440000",
"message": "Document processing started."
}
Poll for processing status. Status values: pending, processing, completed, failed.
{
"success": true,
"job": {
"id": "550e8400-...",
"filename": "hipaa-policy-handbook.pdf",
"status": "processing",
"progress_pct": 62,
"created_at": "2026-02-19T10:00:00Z"
}
}
Retrieve suggested policies once status is completed. Each suggestion includes a confidence score (0–1). Review suggestions and create live policies via POST /api/policies.
{
"success": true,
"job_id": "550e8400-...",
"filename": "hipaa-policy-handbook.pdf",
"total_policies": 7,
"suggested_policies": [
{
"id": "sugg_abc",
"name": "Minimum Necessary Standard",
"type": "compliance",
"content": "Limit PHI access and disclosure to the minimum necessary...",
"priority": 85,
"regulatory_framework": "HIPAA",
"confidence": 0.94
}
]
}
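A client-side polling loop for the job lifecycle above might look like this (fetch_status stands in for the GET-by-job-id call and is injected so the loop stays transport-agnostic):

```python
import time

def wait_for_job(fetch_status, poll_interval=2.0, timeout=300.0):
    """Poll a document-processing job until it reaches a terminal status."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        job = fetch_status()  # returns the job dict from the status endpoint
        if job["status"] in ("completed", "failed"):
            return job
        time.sleep(poll_interval)
    raise TimeoutError("document processing did not finish in time")
```

Once the returned job reports completed, fetch the suggested policies and create the ones you accept via POST /api/policies.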
Alerting
Smartflow fires HTTP POST webhooks when threshold events occur. Configuration is via environment variables on the proxy server.
| Alert Type | Trigger |
|---|---|
| BudgetThreshold | Provider or virtual key spend exceeds configured cap |
| ProviderFailure | Error rate for a provider exceeds spike threshold |
| SlowRequest | Request latency exceeds the slow-request threshold |
| Custom | Programmatic alerts from the management API |
Configure any combination of webhook destinations:
SLACK_WEBHOOK_URL=https://hooks.slack.com/services/...
TEAMS_WEBHOOK_URL=https://outlook.office.com/webhook/...
DISCORD_WEBHOOK_URL=https://discord.com/api/webhooks/...
SMARTFLOW_ALERTS_ENABLED=true
Alerts are fire-and-forget — they do not block the request that triggered them.
Observability
Returns 200 OK with {"status":"ok"} when the proxy process is running.
Returns 200 OK when Redis is connected and providers are reachable.
Prometheus-compatible metrics. Exposed metrics:
| Metric | Description |
|---|---|
| smartflow_requests_total | Request counter by provider, model, status |
| smartflow_request_latency_seconds | Request latency histogram |
| smartflow_cache_hits_total | Cache hit counter by tier (L1/L2/L3) |
| smartflow_cache_misses_total | Cache miss counter |
| smartflow_provider_errors_total | Upstream error counter by provider and status |
| smartflow_tokens_total | Token usage by provider and direction |
| smartflow_cost_usd_total | Cumulative cost by provider |
| smartflow_mcp_calls_total | MCP tool invocation counter by server and tool |
| smartflow_vkey_spend_usd | Per-virtual-key spend gauge |
Python SDK
Installation
pip install smartflow-sdk
# or from source
pip install git+https://github.com/SRAGroupTX/SmartflowV3.git#subdirectory=sdk/python
Requirements: Python 3.10+, httpx >= 0.24
SmartflowClient
The primary async client.
class SmartflowClient(
base_url: str,
api_key: Optional[str] = None,
timeout: float = 30.0,
management_port: int = 7778,
compliance_port: int = 7777,
bridge_port: int = 3500,
)
| Parameter | Type | Default | Description |
|---|---|---|---|
| base_url | str | — | Proxy URL, e.g. "https://smartflow.example.com" |
| api_key | str | None | Virtual key sent as Authorization: Bearer |
| timeout | float | 30.0 | Request timeout in seconds |
| management_port | int | 7778 | Management API port |
| compliance_port | int | 7777 | Compliance API port |
| bridge_port | int | 3500 | Hybrid bridge port |
from smartflow import SmartflowClient
async with SmartflowClient("https://smartflow.example.com", api_key="sk-sf-...") as sf:
reply = await sf.chat("What is the capital of France?")
print(reply)
Core AI Methods
chat()
async def chat(
message: str,
model: str = "gpt-4o",
system_prompt: Optional[str] = None,
temperature: float = 0.7,
max_tokens: Optional[int] = None,
**kwargs,
) -> str
Send a message, receive the reply as a plain string.
reply = await sf.chat("Summarise this in one sentence.", model="claude-sonnet-4-6")
chat_completions()
async def chat_completions(
messages: List[Dict[str, str]],
model: str = "gpt-4o",
temperature: float = 0.7,
max_tokens: Optional[int] = None,
stream: bool = False,
**kwargs,
) -> AIResponse
Full OpenAI-compatible completions. Returns an AIResponse object.
response = await sf.chat_completions(
messages=[
{"role": "system", "content": "You are a concise assistant."},
{"role": "user", "content": "What is 2 + 2?"},
],
model="gpt-4o-mini", max_tokens=50,
)
print(response.content)
print(response.usage.total_tokens)
stream_chat()
async def stream_chat(message: str, model: str = "gpt-4o", ...) -> AsyncIterator[str]
Async generator that yields text delta strings as they stream.
async for chunk in sf.stream_chat("Tell me a story about a robot"):
print(chunk, end="", flush=True)
embeddings()
async def embeddings(
input: Union[str, List[str]],
model: str = "text-embedding-3-small",
encoding_format: str = "float",
dimensions: Optional[int] = None,
input_type: Optional[str] = None,
**kwargs,
) -> Dict[str, Any]
result = await sf.embeddings("Hello world")
vector = result["data"][0]["embedding"]
# Cohere with input_type
result = await sf.embeddings(
["doc one", "doc two"],
model="cohere/embed-english-v3.0",
input_type="search_document",
)
# Reduce dimensions
result = await sf.embeddings("Hello", model="text-embedding-3-large", dimensions=256)
rerank()
result = await sf.rerank(
"What is the return policy?",
["We accept returns within 30 days.", "Contact support@example.com."],
top_n=1,
)
claude_message()
Send a message to Claude via the native Anthropic Messages API path (/anthropic/v1/messages). The proxy injects the API key automatically.
async def claude_message(
message: str,
model: str = "claude-sonnet-4-6",
max_tokens: int = 1024,
system: Optional[str] = None,
anthropic_key: Optional[str] = None,
) -> str
reply = await sf.claude_message(
"Summarise this contract in three bullet points.",
model="claude-sonnet-4-6",
system="You are a legal assistant.",
max_tokens=512,
)
chatbot_query()
Query Smartflow's built-in system chatbot for natural-language operational queries about logs, cache stats, cost analysis, and system health.
result = await sf.chatbot_query("show me today's cache stats")
print(result["response"])
result = await sf.chatbot_query("which provider had the most errors this week?")
Provider Routing Examples
All chat and completion methods accept a model argument that determines which provider is used. No additional client-side configuration is required.
# OpenAI
reply = await sf.chat("Hello", model="gpt-4o")
reply = await sf.chat("Hello", model="gpt-4o-mini")
reply = await sf.chat("Hello", model="o3-mini")
# Anthropic — model-name heuristic, no prefix needed
reply = await sf.chat("Hello", model="claude-sonnet-4-6")
reply = await sf.chat("Hello", model="claude-3-opus-20240229")
# Google Gemini — model-name heuristic
reply = await sf.chat("Hello", model="gemini-1.5-pro")
reply = await sf.chat("Hello", model="gemini-2.0-flash")
# xAI Grok — explicit prefix
reply = await sf.chat("Hello", model="xai/grok-2-latest")
# Mistral
reply = await sf.chat("Hello", model="mistral/mistral-large-latest")
# Cohere
reply = await sf.chat("Hello", model="cohere/command-r-plus")
# Groq (fast Llama inference)
reply = await sf.chat("Hello", model="groq/llama-3.1-70b-versatile")
# OpenRouter (200+ models through one key)
reply = await sf.chat("Hello", model="openrouter/meta-llama/llama-3.1-405b")
# Local Ollama
reply = await sf.chat("Hello", model="ollama/llama3.2")
# Azure OpenAI — deployment name as suffix
reply = await sf.chat("Hello", model="azure/my-gpt4o-deployment")
# Native Anthropic path (uses /anthropic/v1/messages directly)
reply = await sf.claude_message("Hello", model="claude-sonnet-4-6")
MCP Tool Invocation via SDK
The Python SDK does not expose dedicated MCP methods. MCP tool calls are made as direct HTTP requests to the proxy. Use httpx or any HTTP client. All requests go through the proxy's authentication, access control, and cost tracking.
import httpx
async with httpx.AsyncClient() as client:
response = await client.post(
"https://smartflow.example.com/github-tools/mcp/",
headers={
"Authorization": "Bearer sk-sf-...",
"Content-Type": "application/json",
},
json={
"jsonrpc": "2.0", "id": 1, "method": "tools/call",
"params": {
"name": "create_issue",
"arguments": {
"repo": "my-org/my-repo",
"title": "Bug: login fails on mobile",
"body": "Steps to reproduce..."
}
}
}
)
print(response.json()["result"]["content"])
Discover available tools with a natural-language search:
async with httpx.AsyncClient() as client:
r = await client.get(
"https://smartflow.example.com/api/mcp/tools/search",
params={"q": "create github issue", "k": 3},
headers={"Authorization": "Bearer sk-sf-..."},
)
for tool in r.json()["results"]:
print(f"{tool['server_id']}.{tool['name']}: {tool['description']}")
A2A Agent Invocation via SDK
A2A tasks are sent as HTTP POST requests to the proxy. The proxy forwards to the registered agent, logs the exchange, and returns the result. Include x-a2a-trace-id to correlate across multi-agent workflows.
import httpx
async with httpx.AsyncClient() as client:
    response = await client.post(
        "https://smartflow.example.com/a2a/summarizer-agent",
        headers={
            "Authorization": "Bearer sk-sf-...",
            "Content-Type": "application/json",
            "x-a2a-trace-id": "trace-abc-123",
        },
        json={
            "id": "task-uuid-001",
            "message": {
                "role": "user",
                "parts": [{"type": "text", "text": "Summarise the Q4 earnings report."}],
            },
        },
    )
    print(response.json()["result"]["parts"][0]["text"])

    # Retrieve the agent's capability card
    r = await client.get(
        "https://smartflow.example.com/a2a/summarizer-agent/.well-known/agent.json",
        headers={"Authorization": "Bearer sk-sf-..."},
    )
    print(r.json()["capabilities"])
Audio and Image Methods
audio_transcription()
with open("recording.mp3", "rb") as f:
    result = await sf.audio_transcription(f, model="whisper-1")
print(result["text"])
# Groq Whisper (faster, same format)
with open("recording.mp3", "rb") as f:
    result = await sf.audio_transcription(f, model="groq/whisper-large-v3")
text_to_speech()
audio = await sf.text_to_speech("Hello, this is Smartflow.", voice="nova")
with open("output.mp3", "wb") as f:
    f.write(audio)
image_generation()
result = await sf.image_generation(
    "A mountain landscape at dawn",
    model="dall-e-3", size="1792x1024", quality="hd",
)
print(result["data"][0]["url"])
Compliance Methods
check_compliance()
result = await sf.check_compliance("User message text", policy="hipaa")
if result.has_violations:
    print(result.violations)
intelligent_scan()
ML-based scan combining regex, embedding similarity, behavioral analysis, and organization baselines.
async def intelligent_scan(
    content: str,
    user_id: Optional[str] = None,
    org_id: Optional[str] = None,
    context: Optional[str] = None,
) -> IntelligentScanResult
result = await sf.intelligent_scan(
    "My SSN is 123-45-6789",
    user_id="user-123",
    org_id="acme-corp",
    context="customer_support",
)
print(f"{result.risk_level}: {result.recommended_action}")
# "high: Block"
redact_pii()
clean = await sf.redact_pii("My SSN is 123-45-6789, email me at john@example.com")
# "My SSN is [SSN], email me at [EMAIL]"
submit_compliance_feedback()
Submit a true/false-positive correction. Used to retrain the ML model.
async def submit_compliance_feedback(
    scan_id: str,
    is_false_positive: bool,
    user_id: Optional[str] = None,
    notes: Optional[str] = None,
) -> Dict[str, Any]
await sf.submit_compliance_feedback(
    scan_id="scan-xyz",
    is_false_positive=True,
    notes="This was a test phone number, not real PII",
)
ML Learning and Org Baseline Methods
| Method | Returns | Description |
|---|---|---|
| get_learning_status(user_id) | LearningStatus | Adaptive learning progress for a specific user |
| get_learning_summary() | LearningSummary | Organization-wide learning summary |
| get_ml_stats() | MLStats | Pattern counts, accuracy, and categories for the ML engine |
| get_org_summary() | Dict | Organization-level compliance summary |
| get_org_baseline(org_id) | OrgBaseline | Behavioral baseline used for anomaly detection |
| get_persistence_stats() | PersistenceStats | Redis persistence statistics for compliance data |
| save_compliance_data() | Dict | Trigger manual flush of compliance data to Redis |
| get_intelligent_health() | Dict | Health status of the ML engine and all sub-components |
Monitoring Methods
get_cache_stats()
stats = await sf.get_cache_stats()
print(f"Hit rate: {stats.hit_rate:.1%}")
print(f"Tokens saved: {stats.tokens_saved:,}")
print(f"Cost saved: ${stats.cost_saved_usd:.4f}")
print(f"L1/L2/L3: {stats.l1_hits} / {stats.l2_hits} / {stats.l3_hits}")
health_comprehensive()
h = await sf.health_comprehensive()
print(h.overall_status) # "healthy"
print(h.redis_connected) # True
print(h.providers_available) # ["openai", "anthropic", "google"]
Other monitoring methods
| Method | Returns |
|---|---|
| health() | Dict — basic liveness check |
| get_provider_health() | List[ProviderHealth] — latency + success rate per provider |
| get_logs(limit, provider) | List[VASLog] — audit log entries from local instance |
| get_logs_hybrid(limit) | List[Dict] — logs aggregated across all instances via hybrid bridge |
| get_analytics(start_date, end_date) | Dict — usage and cost analytics |
| get_routing_status() | Dict — current routing state |
| force_provider(provider, duration_seconds) | Dict — force routing for a duration |
SmartflowAgent
Stateful agent with conversation memory and per-message compliance scanning.
async with SmartflowClient("https://smartflow.example.com", api_key="sk-...") as sf:
    agent = SmartflowAgent(
        client=sf,
        name="SupportBot",
        model="gpt-4o",
        system_prompt="You are a helpful customer support agent.",
        user_id="user-123",
        org_id="acme",
    )
    r1 = await agent.chat("How do I reset my password?")
    r2 = await agent.chat("What if I forgot my email too?")
    print(agent.message_count)
    agent.clear_history()
| Method | Description |
|---|---|
| chat(message, scan_input=True, scan_output=True) | Send message; raises ComplianceError if blocked |
| clear_history() | Reset conversation, preserve system prompt |
| get_history() | Return copy of message history |
| message_count | Number of messages in history |
SmartflowWorkflow
Chain AI operations with branching and error handling.
workflow = SmartflowWorkflow(client, name="TicketFlow")
workflow \
    .add_step("classify", action="chat",
              config={"prompt": "Classify this ticket: {input}", "model": "gpt-4o-mini"}) \
    .add_step("check", action="compliance_check",
              config={"content": "{output}"}) \
    .add_step("route", action="condition",
              config={"field": "output", "cases": {"billing": "billing_step"}, "default": "general_step"})
result = await workflow.execute({"input": ticket_text})
print(result.output)
print(result.steps_executed)
print(result.execution_time_ms)
| Action | Config fields | Description |
|---|---|---|
| "chat" | prompt, model, temperature | Chat completion; {input} / {output} are template variables |
| "compliance_check" | content | Compliance scan |
| "condition" | field, cases, default | Branch on a context value |
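The {input} / {output} placeholders behave like simple string templates resolved against the workflow context. The function below is an illustrative sketch of that substitution, not Smartflow internals; the real engine's behavior may differ.

```python
def render_template(template: str, context: dict) -> str:
    # Sketch of resolving {input}/{output}-style placeholders in a step
    # config against the workflow context (illustrative only).
    class _Safe(dict):
        def __missing__(self, key):
            return "{" + key + "}"   # leave unknown placeholders intact
    return template.format_map(_Safe(context))
```

For example, `render_template("Classify this ticket: {input}", {"input": "Refund please"})` yields `"Classify this ticket: Refund please"`, while unknown placeholders pass through unchanged.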
SyncSmartflowClient
Synchronous wrapper for scripts and Jupyter notebooks. Every async method is available without await.
from smartflow import SyncSmartflowClient
sf = SyncSmartflowClient("https://smartflow.example.com", api_key="sk-...")
reply = sf.chat("Hello!")
emb = sf.embeddings("Hello", model="text-embedding-3-small")
img = sf.image_generation("A sunset", model="dall-e-3")
transcript = sf.audio_transcription(open("audio.mp3", "rb"))
audio = sf.text_to_speech("Hello!", voice="nova")
ranked = sf.rerank("What is the return policy?", ["doc1", "doc2"])
In Jupyter, where an event loop is already running, install nest_asyncio (pip install nest_asyncio) and call nest_asyncio.apply() once before using the synchronous client.
OpenAI Drop-in Replacement
Any code targeting the OpenAI API works by pointing base_url at Smartflow. MetaCache, compliance scanning, VAS logging, and routing apply transparently.
from openai import OpenAI
client = OpenAI(
    api_key="sk-sf-your-virtual-key",
    base_url="https://smartflow.example.com/v1",
)
response = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Hello!"}],
)
Response Types
AIResponse
| Field | Type | Description |
|---|---|---|
| content | str | First choice text |
| choices | list | Full choices array |
| usage | Usage | Token usage |
| model | str | Model used |
| id | str | Response ID |
CacheStats
| Field | Type |
|---|---|
| hit_rate | float |
| total_requests | int |
| tokens_saved | int |
| cost_saved_usd | float |
| l1_hits | int |
| l2_hits | int |
| l3_hits | int |
ComplianceResult
| Field | Type |
|---|---|
| has_violations | bool |
| compliance_score | float |
| violations | list[str] |
| pii_detected | list[str] |
| risk_level | str — "low" / "medium" / "high" / "critical" |
| recommendations | list[str] |
| redacted_content | str | None |
IntelligentScanResult
| Field | Type |
|---|---|
| risk_score | float — 0.0 to 1.0 |
| risk_level | str |
| recommended_action | str — "Allow" / "Flag" / "Block" |
| violations | list |
| explanation | str |
Response Headers
Every proxied response includes these headers:
| Header | Description |
|---|---|
| x-smartflow-provider | Provider that served the request |
| x-smartflow-model | Actual model used |
| x-smartflow-request-id | Unique request ID for log correlation |
| x-smartflow-cache-hit | true if response was served from MetaCache |
| x-smartflow-cache-key | Cache key when cache-hit is true |
| x-smartflow-latency-ms | Total proxy latency in milliseconds |
| x-smartflow-cost-usd | Estimated cost in USD for this request |
| x-smartflow-compliance-score | Compliance score (0–1) when pre-call scan is enabled |
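When instrumenting clients, it can help to pull these headers into typed values. The parser below is a sketch (the function name is ours); header names come from the table above, and HTTP header names are matched case-insensitively.

```python
def parse_smartflow_headers(headers: dict) -> dict:
    # Convert the Smartflow diagnostic headers above into typed values.
    # Missing headers yield None (or False for the cache-hit flag).
    h = {k.lower(): v for k, v in headers.items()}

    def _num(name):
        v = h.get(name)
        return float(v) if v is not None else None

    return {
        "provider": h.get("x-smartflow-provider"),
        "model": h.get("x-smartflow-model"),
        "request_id": h.get("x-smartflow-request-id"),
        "cache_hit": h.get("x-smartflow-cache-hit") == "true",
        "latency_ms": _num("x-smartflow-latency-ms"),
        "cost_usd": _num("x-smartflow-cost-usd"),
    }
```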
Environment Variables
Set on the Smartflow server; not used in client code.
Provider Keys
| Variable | Provider |
|---|---|
| OPENAI_API_KEY | OpenAI |
| ANTHROPIC_API_KEY | Anthropic |
| GEMINI_API_KEY | Google Gemini |
| XAI_API_KEY | xAI / Grok |
| OPENROUTER_API_KEY | OpenRouter |
| AZURE_API_KEY, AZURE_API_BASE, AZURE_API_VERSION | Azure OpenAI |
| MISTRAL_API_KEY | Mistral AI |
| COHERE_API_KEY | Cohere |
| GROQ_API_KEY | Groq |
| DEEPGRAM_API_KEY | Deepgram |
| FIREWORKS_API_KEY | Fireworks AI |
| NVIDIA_NIM_API_KEY, NVIDIA_NIM_API_BASE | NVIDIA NIM |
| HUGGINGFACE_API_KEY, HUGGINGFACE_API_BASE | HuggingFace |
| TOGETHER_API_KEY | Together AI |
| PERPLEXITY_API_KEY | Perplexity AI |
| REPLICATE_API_KEY | Replicate |
| VERTEXAI_API_KEY, VERTEXAI_PROJECT, VERTEXAI_LOCATION | Vertex AI |
| AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_REGION | AWS Bedrock |
| NOVITA_API_KEY | Novita AI |
| VERCEL_AI_GATEWAY_API_KEY | Vercel AI Gateway |
Feature Flags and Ports
| Variable | Default | Description |
|---|---|---|
| GEMINI_ENABLED | false | Enable Google Gemini in intelligent routing |
| SMARTFLOW_ALERTS_ENABLED | true | Enable webhook alerting |
| SLACK_WEBHOOK_URL | — | Slack incoming webhook |
| TEAMS_WEBHOOK_URL | — | Microsoft Teams webhook |
| DISCORD_WEBHOOK_URL | — | Discord webhook |
| PROXY_PORT | 7775 | LLM proxy port |
| MANAGEMENT_PORT | 7778 | Management API port |
| COMPLIANCE_PORT | 7777 | Compliance API port |
| BRIDGE_PORT | 3500 | Hybrid bridge port |
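A minimal server-side environment sketch using the variables above; all values are placeholders, and only the provider keys you actually use need to be set.

```shell
# Minimal Smartflow server environment (placeholder values).
export OPENAI_API_KEY="sk-..."
export ANTHROPIC_API_KEY="sk-ant-..."
export GEMINI_ENABLED=true
export SMARTFLOW_ALERTS_ENABLED=true
export SLACK_WEBHOOK_URL="https://hooks.slack.com/services/..."
export PROXY_PORT=7775
export MANAGEMENT_PORT=7778
```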
Error Reference
HTTP Status Codes
| Code | Meaning |
|---|---|
| 400 | Malformed request — check body format |
| 401 | Missing or invalid API key |
| 402 | Virtual key budget exceeded |
| 403 | Request blocked by compliance policy |
| 404 | Resource or route not found |
| 429 | Rate limit exceeded (RPM or TPM) |
| 500 | Proxy internal error |
| 502 | Upstream provider returned an error |
| 503 | No providers available — fallback chain exhausted |
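Of these, 429, 502, and 503 are transient and worth retrying with backoff, while 400/401/402/403 indicate request or policy problems that a retry cannot fix. The snippet below is a client-side sketch, not part of the Smartflow SDK.

```python
RETRYABLE = {429, 502, 503}  # rate limit, upstream error, fallback exhausted

def retry_delay(attempt: int, base: float = 1.0, cap: float = 60.0) -> float:
    # Capped exponential backoff for the retryable statuses above.
    return min(cap, base * (2 ** attempt))
```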
SDK Exceptions
| Exception | Condition |
|---|---|
| SmartflowError | Base class for all SDK errors |
| ConnectionError | Cannot connect to proxy |
| AuthenticationError | 401 — invalid or missing key |
| RateLimitError | 429 — rate limit hit |
| ComplianceError | 403 — request blocked by policy |
| ProviderError | Upstream provider error |
| TimeoutError | Request timeout |
from smartflow import ComplianceError, RateLimitError
import asyncio
try:
    result = await sf.chat("sensitive message")
except ComplianceError as e:
    print(f"Blocked by policy: {e}")
except RateLimitError:
    await asyncio.sleep(60)
    # retry
Changelog
v3.0 (proxy) / v0.3.0 (SDK) — 2026
New in the proxy:
- Vector Store API (/v1/vector_stores/*) — Redis-backed, no external vector database required
- RAG Pipeline API (/v1/rag/ingest, /v1/rag/query) — document chunking, embedding, context retrieval
- A2A Agent Gateway (/a2a/*) — A2A protocol for inter-agent orchestration
- Webhook alerting — Slack, Teams, Discord for budget, failure, and latency events
- Model-name heuristic routing — claude-*, gemini-*, gpt-* detected automatically
- Anthropic API key injection for /anthropic/* passthrough
- Cost-based and latency-based routing strategies
- Prometheus metrics endpoint (/metrics)
- MCP access control — allowed_tools, disallowed_tools, guardrail_mode per server
- MCP cost tracking via Redis HINCRBYFLOAT
New in the SDK:
- image_generation() — multi-provider image generation
- audio_transcription() — multipart audio, Groq/Deepgram/Fireworks routing
- text_to_speech() — returns raw audio bytes
- stream_chat() — async SSE iterator
- rerank() — Cohere-compatible document reranking
- Extended embeddings() with encoding_format, dimensions, input_type
v2.0 (proxy) / v0.2.0 (SDK)
- MCP gateway with server registry, catalog, OAuth flow
- SmartflowAgent with compliance scanning and conversation memory
- SmartflowWorkflow for multi-step AI pipelines
- Maestro ML policy engine (intelligent compliance)
v1.0 (proxy) / v0.1.0 (SDK)
- OpenAI-compatible proxy, virtual keys, 3-tier semantic cache
- Initial SDK: chat, chat_completions, embeddings
- VAS audit logging, SyncSmartflowClient