Smartflow SDK for Python
What is Smartflow?
Smartflow is an enterprise AI orchestration layer that sits between your application and AI providers (OpenAI, Anthropic, Google, and others). It provides:
- Intelligent Routing — Route requests to the best provider based on cost, latency, and availability
- 3-Layer Semantic Cache — 60–80% cost reduction with L1/L2/L3 caching and semantic similarity matching
- ML-Powered Compliance — Real-time PII detection with adaptive learning and behavioral analysis
- Complete Audit Trail — Every AI interaction logged for compliance, debugging, and analytics
- Automatic Failover — Zero-downtime provider switching when issues occur
- MCP Tool Gateway — Register and invoke external MCP tools with shared auth, budgeting, and audit
- A2A Agent Orchestration — Route tasks to registered A2A agents with full traceability
Installation
```
pip install smartflow-sdk
```

From the wheel distributed by your LangSmart account team:

```
pip install smartflow_sdk-0.3.1-py3-none-any.whl
```

Optional — sync client in async environments:

```
pip install nest_asyncio
```
Quick Start
Async (Recommended)
```python
import asyncio

from smartflow import SmartflowClient

async def main():
    async with SmartflowClient("http://your-smartflow:7775") as sf:
        # Automatic caching, compliance, failover, and audit on every call.
        response = await sf.chat("Explain quantum computing in simple terms")
        print(response)

asyncio.run(main())
```
Synchronous — Scripts and Notebooks
```python
from smartflow import SyncSmartflowClient

sf = SyncSmartflowClient("http://your-smartflow:7775")

response = sf.chat("What is machine learning?")
print(response)

stats = sf.get_cache_stats()
print(f"Cache hit rate: {stats.hit_rate:.1%}")
print(f"Tokens saved: {stats.tokens_saved:,}")

sf.close()
```
For Jupyter notebooks with an existing event loop:
```python
import nest_asyncio
nest_asyncio.apply()
```
OpenAI Drop-in Replacement
Zero code changes required — just update the base URL:
```python
from openai import OpenAI

# Before: client = OpenAI()
# After: through Smartflow — caching, compliance, logging apply transparently
client = OpenAI(
    base_url="http://your-smartflow:7775/v1",
    api_key="sk-sf-your-virtual-key",
)

response = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Hello!"}],
)
```
SmartflowClient
Primary async client.
```python
# As a context manager (recommended)
async with SmartflowClient("http://smartflow:7775", api_key="sk-sf-...") as sf:
    ...

# Manual lifecycle
sf = SmartflowClient("http://smartflow:7775")
await sf._ensure_client()
# ... use sf ...
await sf.close()
```
chat()
Send a message, receive the reply as a plain string.
```python
async with SmartflowClient("http://smartflow:7775") as sf:
    response = await sf.chat("Explain Docker containers")

    response = await sf.chat(
        message="Write a Python function to sort a list",
        model="gpt-4o",
        system_prompt="You are an expert Python developer.",
        temperature=0.3,
        max_tokens=1000,
    )
```
chat_completions()
Full OpenAI-compatible completions. Returns a structured AIResponse.
```python
response = await sf.chat_completions(
    messages=[
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": "What is REST API?"},
    ],
    model="gpt-4o",
)

print(response.content)
print(f"Tokens used: {response.usage.total_tokens}")
print(f"Cached: {response.cached}")  # True if served from MetaCache
```
stream_chat()
Async generator that yields text delta strings as they stream.
```python
async for chunk in sf.stream_chat("Tell me a story about a robot"):
    print(chunk, end="", flush=True)
print()
```
claude_message()
Send a message to Claude using the native Anthropic Messages API path. The proxy injects the API key automatically.
```python
response = await sf.claude_message(
    message="Analyze this code for security vulnerabilities",
    model="claude-sonnet-4-6",
    max_tokens=2000,
    system="You are a senior security engineer.",
)
```
Routes to `/anthropic/v1/messages`. For multi-turn or multimodal use, call `chat_completions()` with the Claude model name.
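As a minimal sketch of the multi-turn approach, a helper can resend the accumulated history through `chat_completions()` on each turn. The `claude_followup` function and its history format are illustrative, not part of the SDK; `sf` is an open `SmartflowClient`:

```python
# Hypothetical helper: appends a user turn, sends the full history to a
# Claude model via the OpenAI-compatible path, and records the reply.
async def claude_followup(sf, history, question, model="claude-sonnet-4-6"):
    messages = history + [{"role": "user", "content": question}]
    response = await sf.chat_completions(messages=messages, model=model)
    return messages + [{"role": "assistant", "content": response.content}]
```

Because the full history is resent on every call, each turn passes through caching, compliance, and audit like any other request.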
embeddings()
Generate vector embeddings. Supports batch input and provider routing.
```python
# Single text
result = await sf.embeddings("Hello, world!")
vector = result["data"][0]["embedding"]

# Batch
result = await sf.embeddings(["First document", "Second document", "Third document"])
vectors = [item["embedding"] for item in result["data"]]

# Cohere with input_type
result = await sf.embeddings(
    ["search query", "document text"],
    model="cohere/embed-english-v3.0",
    input_type="search_document",
)

# Reduce dimensions (OpenAI text-embedding-3+)
result = await sf.embeddings("Hello", model="text-embedding-3-large", dimensions=256)
```
image_generation()
```python
result = await sf.image_generation(
    "A futuristic city at sunrise",
    model="dall-e-3",
    size="1792x1024",
    quality="hd",
    style="vivid",
)
print(result["data"][0]["url"])
```
audio_transcription()
Transcribe audio. Accepts a file-like object.
```python
with open("recording.mp3", "rb") as f:
    result = await sf.audio_transcription(f, model="whisper-1")
print(result["text"])

# Groq (faster)
with open("recording.mp3", "rb") as f:
    result = await sf.audio_transcription(f, model="groq/whisper-large-v3")
```
text_to_speech()
Synthesize speech. Returns raw audio bytes.
```python
audio = await sf.text_to_speech("Hello, this is Smartflow.", voice="nova")
with open("output.mp3", "wb") as f:
    f.write(audio)
```
rerank()
Rerank documents by relevance to a query.
```python
result = await sf.rerank(
    "What is the return policy?",
    ["We accept returns within 30 days.", "Contact support@example.com."],
    top_n=1,
)
```
list_models()
```python
models = await sf.list_models()
for m in models:
    print(m["id"])
```
chatbot_query()
Query Smartflow's built-in system chatbot for operational information using natural language.
```python
result = await sf.chatbot_query("show me today's cache stats")
print(result["response"])

result = await sf.chatbot_query("which provider had the most errors this week?")
result = await sf.chatbot_query("what did we spend on OpenAI yesterday?")
```
Provider Prefix Reference
All methods that accept a `model` parameter support provider-prefix routing. The three primary providers are auto-detected from the model name.
Automatic Detection (no prefix needed)
```python
# OpenAI — gpt-*, o1-*, o3-*, chatgpt-*, whisper-*, dall-e-*
reply = await sf.chat("Hello", model="gpt-4o")
reply = await sf.chat("Hello", model="gpt-4o-mini")
reply = await sf.chat("Hello", model="o3-mini")

# Anthropic — claude-*
reply = await sf.chat("Hello", model="claude-sonnet-4-6")
reply = await sf.chat("Hello", model="claude-3-opus-20240229")

# Google Gemini — gemini-*
reply = await sf.chat("Hello", model="gemini-1.5-pro")
reply = await sf.chat("Hello", model="gemini-2.0-flash")
```
Explicit Prefix Required
```python
reply = await sf.chat("Hello", model="xai/grok-2-latest")
reply = await sf.chat("Hello", model="mistral/mistral-large-latest")
reply = await sf.chat("Hello", model="cohere/command-r-plus")
reply = await sf.chat("Hello", model="groq/llama-3.1-70b-versatile")
reply = await sf.chat("Hello", model="openrouter/meta-llama/llama-3.1-405b")
reply = await sf.chat("Hello", model="ollama/llama3.2")
reply = await sf.chat("Hello", model="azure/my-gpt4o-deployment")
```
| Prefix | Provider | API Key Env Var |
|---|---|---|
| (none) | OpenAI | `OPENAI_API_KEY` |
| `anthropic/` | Anthropic | `ANTHROPIC_API_KEY` |
| `xai/` | xAI (Grok) | `XAI_API_KEY` |
| `gemini/` | Google Gemini | `GEMINI_API_KEY` |
| `vertex_ai/` | Google Vertex AI | `VERTEXAI_API_KEY` |
| `openrouter/` | OpenRouter | `OPENROUTER_API_KEY` |
| `azure/` | Azure OpenAI | `AZURE_API_KEY` + `AZURE_API_BASE` |
| `mistral/` | Mistral AI | `MISTRAL_API_KEY` |
| `cohere/` | Cohere | `COHERE_API_KEY` |
| `nvidia_nim/` | NVIDIA NIM | `NVIDIA_NIM_API_KEY` |
| `huggingface/` | HuggingFace | `HUGGINGFACE_API_KEY` |
| `groq/` | Groq | `GROQ_API_KEY` |
| `deepgram/` | Deepgram | `DEEPGRAM_API_KEY` |
| `fireworks/` | Fireworks AI | `FIREWORKS_API_KEY` |
| `novita/` | Novita AI | `NOVITA_API_KEY` |
| `together/` | Together AI | `TOGETHER_API_KEY` |
| `perplexity/` | Perplexity AI | `PERPLEXITY_API_KEY` |
| `replicate/` | Replicate | `REPLICATE_API_KEY` |
| `vercel_ai_gateway/` | Vercel AI Gateway | `VERCEL_AI_GATEWAY_API_KEY` |
| `ollama/` | Ollama (local) | none required |
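These variables are read on the host that runs the proxy, so a deployment script can verify them before startup. A minimal sketch, where `REQUIRED_KEYS` (a partial mapping based on the table above) and `missing_provider_keys` are illustrative helpers, not part of the SDK:

```python
import os

# Illustrative subset of the prefix → env var table above.
REQUIRED_KEYS = {
    "mistral/": "MISTRAL_API_KEY",
    "groq/": "GROQ_API_KEY",
    "ollama/": None,  # local model server, no key required
}

def missing_provider_keys(prefixes, env=os.environ):
    """Return the env vars that are required but unset for the given prefixes."""
    missing = []
    for prefix in prefixes:
        var = REQUIRED_KEYS.get(prefix)
        if var and var not in env:
            missing.append(var)
    return missing
```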
Intelligent Compliance Engine
Smartflow's ML-powered compliance engine goes beyond regex. It learns and adapts based on user behavior and organizational baselines.
intelligent_scan()
```python
result = await sf.intelligent_scan(
    content="Please send payment to card 4111-1111-1111-1111",
)

print(f"Has violations: {result.has_violations}")
print(f"Risk score: {result.risk_score:.2f}")      # 0.0 – 1.0
print(f"Action: {result.recommended_action}")      # Allow/AllowAndLog/Block/Review
print(f"Explanation: {result.explanation}")

for v in result.regex_violations:
    print(f"  - {v['violation_type']}: {v['severity']}")
```
Enable behavioral analysis:
```python
result = await sf.intelligent_scan(
    content="Customer email: john.doe@example.com",
    user_id="support_agent_42",    # Track individual behavior
    org_id="acme_corporation",     # Compare against org baseline
    context="customer_support",
)
```
check_compliance()
```python
result = await sf.check_compliance("My SSN is 123-45-6789")

if result.has_violations:
    print(f"Risk: {result.risk_level}")
    print(f"PII: {result.pii_detected}")
    print(f"Safe text: {result.redacted_content}")
```
redact_pii()
```python
safe = await sf.redact_pii("Call me at 555-867-5309, email john@example.com")
# "Call me at [PHONE], email [EMAIL]"
```
submit_compliance_feedback()
Submit a true/false-positive correction to retrain the ML model.
```python
# Store the scan response dict to get the scan_id
response = await sf._post(
    f"{sf.compliance_url}/api/compliance/intelligent/scan",
    {"content": "Call me at 555-0100"},
)
scan_id = response.get("scan_id")

if scan_id:
    await sf.submit_compliance_feedback(
        scan_id=scan_id,
        is_false_positive=True,
        user_id="admin_user",
        notes="555-0100 is a known test number, not real PII",
    )
```
Learning & ML Methods
```python
# Organization-wide learning progress
summary = await sf.get_learning_summary()
print(f"Total users tracked: {summary.total_users}")
print(f"Users with complete baselines: {summary.users_learning_complete}")
print(f"Learning period: {summary.config_learning_days} days")

# Per-user learning status
status = await sf.get_learning_status("user-alice")
print(f"Progress: {status.progress_percent}%")
print(f"Complete: {status.learning_complete}")

# ML engine statistics
ml_stats = await sf.get_ml_stats()
print(f"Total patterns: {ml_stats.total_patterns}")
print(f"Learned patterns: {ml_stats.learned_patterns}")
print(f"Average confidence: {ml_stats.average_confidence:.2f}")

# Org baseline (for anomaly detection)
baseline = await sf.get_org_baseline("acme-corp")
print(f"Violation rate: {baseline.violation_rate:.2%}")
```
| Method | Returns | Description |
|---|---|---|
| `get_org_summary()` | `Dict` | Organization-level aggregate compliance stats |
| `get_org_baseline(org_id)` | `OrgBaseline` | Behavioral baseline used for anomaly detection |
| `get_persistence_stats()` | `PersistenceStats` | Redis persistence stats for compliance data |
| `save_compliance_data()` | `Dict` | Trigger a manual flush of compliance data to Redis |
| `get_intelligent_health()` | `Dict` | Health status of the ML engine and embedding service |
MCP Tool Invocation
MCP tool calls are direct HTTP requests to the proxy. Use `httpx` or any HTTP client.
Calling a Tool
```python
import httpx

async with httpx.AsyncClient() as client:
    response = await client.post(
        "http://smartflow:7775/github-tools/mcp/",
        headers={
            "Authorization": "Bearer sk-sf-...",
            "Content-Type": "application/json",
        },
        json={
            "jsonrpc": "2.0",
            "id": 1,
            "method": "tools/call",
            "params": {
                "name": "create_issue",
                "arguments": {
                    "repo": "my-org/my-repo",
                    "title": "Bug: login fails on mobile",
                    "body": "Steps to reproduce...",
                },
            },
        },
    )

result = response.json()
print(result["result"]["content"])
```
Searching the Tool Catalog
```python
async with httpx.AsyncClient() as client:
    r = await client.get(
        "http://smartflow:7775/api/mcp/tools/search",
        params={"q": "create github issue", "k": 3},
        headers={"Authorization": "Bearer sk-sf-..."},
    )

for tool in r.json()["results"]:
    print(f"{tool['server_id']}.{tool['name']}: {tool['description']}")
```
MCP Usage and Cost
```python
async with httpx.AsyncClient() as client:
    r = await client.get(
        "http://smartflow:7775/api/mcp/usage",
        headers={"Authorization": "Bearer sk-sf-..."},
    )
print(r.json())
```
A2A Agent Invocation
A2A tasks are HTTP POST requests to the proxy. The proxy forwards to the registered agent, logs the exchange, and returns the result.
Sending a Task
```python
import httpx

async with httpx.AsyncClient() as client:
    response = await client.post(
        "http://smartflow:7775/a2a/summarizer-agent",
        headers={
            "Authorization": "Bearer sk-sf-...",
            "Content-Type": "application/json",
            "x-a2a-trace-id": "trace-abc-123",  # correlate across agents
        },
        json={
            "id": "task-uuid-001",
            "message": {
                "role": "user",
                "parts": [{"type": "text", "text": "Summarise the Q4 earnings report."}],
            },
        },
    )

result = response.json()
print(result["result"]["parts"][0]["text"])
```
Agent Capability Card
```python
async with httpx.AsyncClient() as client:
    r = await client.get(
        "http://smartflow:7775/a2a/summarizer-agent/.well-known/agent.json",
        headers={"Authorization": "Bearer sk-sf-..."},
    )

card = r.json()
print(card["name"], card["capabilities"])
```
The `x-a2a-trace-id` header is passed through all hops in a multi-agent chain so every agent's logs can be correlated by a single trace ID.
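A small helper can reuse an upstream trace ID when one is present and mint a new one otherwise. This is an illustrative sketch (`a2a_headers` is not an SDK function):

```python
import uuid

TRACE_HEADER = "x-a2a-trace-id"

def a2a_headers(api_key, incoming_headers=None):
    """Build A2A request headers, reusing the caller's trace ID when present
    so all hops in the chain share one ID; otherwise mint a fresh one."""
    incoming = incoming_headers or {}
    trace_id = incoming.get(TRACE_HEADER) or f"trace-{uuid.uuid4()}"
    return {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
        TRACE_HEADER: trace_id,
    }
```

An agent that receives a task and delegates to another agent would pass its own request headers as `incoming_headers`, preserving the original trace ID end to end.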
Cache Performance
```python
stats = await sf.get_cache_stats()

print(f"Hit Rate: {stats.hit_rate:.1%}")
print(f"L1 (Memory): {stats.l1_hits:,} hits")
print(f"L2 (Semantic): {stats.l2_hits:,} hits")
print(f"L3 (Exact): {stats.l3_hits:,} hits")
print(f"Tokens saved: {stats.tokens_saved:,}")
print(f"Cost saved: ${stats.cost_saved_cents / 100:.2f}")
```
Provider Health
```python
providers = await sf.get_provider_health()
for p in providers:
    print(f"{p.provider}")
    print(f"  Status: {p.status}")
    print(f"  Latency: {p.latency_ms:.0f}ms")
    print(f"  Success: {p.success_rate:.1%}")
    print(f"  Requests: {p.requests_total:,}")
```
System Health
```python
# Full system health
health = await sf.health_comprehensive()
print(health.status)  # "healthy" | "degraded" | "unhealthy"
print(health.uptime_seconds)
print(health.version)

# Quick liveness check
status = await sf.health()
assert status["status"] == "ok"
```
Audit Logs (VAS)
```python
logs = await sf.get_logs(limit=10, provider="openai")
for log in logs:
    cached_badge = "[CACHED]" if log.cached else ""
    print(f"[{log.timestamp}] {log.provider}/{log.model} {cached_badge}")
    print(f"  Tokens: {log.tokens_used} | Latency: {log.latency_ms:.0f}ms")
```
get_logs_hybrid()
Unified audit log across all Smartflow instances via the hybrid bridge.
```python
all_logs = await sf.get_logs_hybrid(limit=500)
for log in all_logs:
    print(f"{log['timestamp']} | {log['provider']} | {log['model']}")
```
Analytics
```python
data = await sf.get_analytics(
    start_date="2026-02-01",
    end_date="2026-02-19",
)
```
Routing Control
```python
# Current routing state
status = await sf.get_routing_status()

# Force to a specific provider (e.g. during an outage)
await sf.force_provider("openai", duration_seconds=600)
```
SmartflowAgent
Higher-level agent with conversation memory, compliance scanning, and tool support.
```python
from smartflow import SmartflowClient, SmartflowAgent

async with SmartflowClient("http://smartflow:7775") as sf:
    agent = SmartflowAgent(
        client=sf,
        name="TechSupport",
        model="gpt-4o",
        system_prompt="""You are a senior technical support engineer.

Guidelines:
- Be patient and thorough
- Ask clarifying questions when needed
- Provide step-by-step solutions
- Never ask for or repeat sensitive information""",
        temperature=0.7,
        compliance_policy="enterprise_standard",
        enable_compliance_scan=True,
        user_id="support_session_123",
        org_id="tech_company",
    )

    print(await agent.chat("My application keeps crashing"))
    print(await agent.chat("It's a Python web app using Flask"))
    print(await agent.chat("Here's the error: MemoryError"))

    print(f"Messages exchanged: {agent.message_count}")
    agent.clear_history()
```
| Method / Property | Description |
|---|---|
| `chat(message, scan_input=True, scan_output=True)` | Send a message; raises `ComplianceError` if blocked |
| `clear_history()` | Reset the conversation, preserving the system prompt |
| `get_history()` | Return a copy of the message history |
| `message_count` | Number of messages in the history |
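When a blocked message should degrade gracefully instead of aborting the session, `agent.chat()` can be wrapped. A minimal sketch — `safe_chat` is illustrative, not an SDK helper, and `blocked_exc` stands in for `smartflow.ComplianceError` only so the sketch stays import-free:

```python
async def safe_chat(agent, message, fallback, blocked_exc):
    """Send one message through the agent; return `fallback` if the compliance
    scan blocks it. Pass smartflow.ComplianceError as blocked_exc in real use."""
    try:
        return await agent.chat(message)
    except blocked_exc:
        return fallback
```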
SmartflowWorkflow
Chain AI operations with branching logic.
```python
from smartflow import SmartflowClient, SmartflowWorkflow

async with SmartflowClient("http://smartflow:7775") as sf:
    workflow = SmartflowWorkflow(sf, name="ContentPipeline")

    workflow.add_step(
        name="analyze",
        action="chat",
        config={"prompt": "Analyze the tone of: {input}", "model": "gpt-4o-mini"},
        next_steps=["compliance_check"],
    )
    workflow.add_step(
        name="compliance_check",
        action="compliance_check",
        config={"content": "{input}"},
        next_steps=["route"],
    )
    workflow.add_step(
        name="route",
        action="condition",
        config={
            "field": "output",
            "cases": {"positive": "enhance", "negative": "review", "neutral": "publish"},
        },
    )

    result = await workflow.execute({"input": "This product exceeded my expectations!"})

    print(f"Success: {result.success}")
    print(f"Path: {' -> '.join(result.steps_executed)}")
    print(f"Time: {result.execution_time_ms:.0f}ms")
    print(f"Tokens: {result.total_tokens}")
```
| Step Action | Config fields | Description |
|---|---|---|
| `"chat"` | `prompt`, `model`, `temperature` | Chat completion; `{input}` / `{output}` are template variables |
| `"compliance_check"` | `content` | Rule-based compliance scan |
| `"condition"` | `field`, `cases`, `default` | Branch on a context value |
SyncSmartflowClient
Synchronous wrapper. Every async method is available without await.
```python
from smartflow import SyncSmartflowClient

sf = SyncSmartflowClient("http://smartflow:7775", api_key="sk-sf-...")

reply = sf.chat("Hello!")
emb = sf.embeddings("Hello", model="text-embedding-3-small")
img = sf.image_generation("A sunset", model="dall-e-3")

with open("audio.mp3", "rb") as f:
    transcript = sf.audio_transcription(f, model="whisper-1")

audio = sf.text_to_speech("Hello!", voice="nova")
ranked = sf.rerank("What is the return policy?", ["doc1", "doc2"])

stats = sf.get_cache_stats()
logs = sf.get_logs(limit=20)

sf.close()
```
Configuration
Client Options
```python
sf = SmartflowClient(
    base_url="http://smartflow:7775",  # Proxy endpoint
    api_key="sk-sf-...",               # Virtual key for authentication
    timeout=30.0,                      # Request timeout in seconds
    management_port=7778,              # Health, metrics, routing API
    compliance_port=7777,              # Compliance API
    bridge_port=3500,                  # Hybrid bridge (cross-instance logs)
)
```
From Environment Variables
```python
import os

from smartflow import SmartflowClient

sf = SmartflowClient(
    base_url=os.environ["SMARTFLOW_URL"],
    api_key=os.environ.get("SMARTFLOW_API_KEY"),
)
```
Error Handling
```python
import asyncio

from smartflow import (
    SmartflowClient,
    SmartflowError,
    ConnectionError,
    ComplianceError,
    RateLimitError,
    TimeoutError,
)

async def main():
    try:
        async with SmartflowClient("http://smartflow:7775") as sf:
            response = await sf.chat("Hello!")
    except ConnectionError:
        print("Cannot connect to Smartflow proxy")
    except ComplianceError as e:
        print(f"Blocked by compliance policy: {e}")
    except RateLimitError:
        print("Rate limited — backing off")
        await asyncio.sleep(60)
    except TimeoutError:
        print("Request timed out")
    except SmartflowError as e:
        print(f"Smartflow error: {e}")

asyncio.run(main())
```
| Exception | Condition |
|---|---|
| `SmartflowError` | Base class for all SDK errors |
| `ConnectionError` | Cannot connect to the proxy |
| `AuthenticationError` | 401 — invalid or missing key |
| `RateLimitError` | 429 — rate limit hit |
| `ComplianceError` | 403 — request blocked by compliance policy |
| `ProviderError` | Upstream provider error |
| `TimeoutError` | Request timeout |
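For `RateLimitError`, the flat 60-second sleep shown above works, but exponential backoff with jitter usually recovers faster under sustained throttling. A minimal sketch — `with_backoff` is illustrative, not an SDK helper, and `retry_exc` stands in for `smartflow.RateLimitError` so the sketch stays self-contained:

```python
import asyncio
import random

async def with_backoff(call, retry_exc, attempts=4, base_delay=1.0):
    """Retry an async call on rate limits, doubling the delay each attempt
    and adding random jitter to avoid thundering-herd retries."""
    for attempt in range(attempts):
        try:
            return await call()
        except retry_exc:
            if attempt == attempts - 1:
                raise  # out of retries; surface the error
            delay = base_delay * (2 ** attempt)
            await asyncio.sleep(delay * (1 + random.random()))
```

Usage with the SDK would look like `reply = await with_backoff(lambda: sf.chat("Hello!"), RateLimitError)`.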
Response Types
AIResponse
| Field | Type | Description |
|---|---|---|
| `content` | `str` | First choice text |
| `choices` | `list` | Full choices array |
| `usage` | `Usage` | Token usage (`prompt_tokens`, `completion_tokens`, `total_tokens`) |
| `model` | `str` | Model used |
| `id` | `str` | Response ID |
| `cached` | `bool` | `True` if served from MetaCache |
| `cache_hit_type` | `str` | `"exact"`, `"semantic"`, or `None` |
| `provider` | `str` | Provider that served the request |
CacheStats
| Field | Type |
|---|---|
| `hit_rate` | `float` |
| `hits` / `misses` | `int` |
| `l1_hits` / `l2_hits` / `l3_hits` | `int` |
| `tokens_saved` | `int` |
| `cost_saved_cents` | `int` |
| `entries` | `int` |
ComplianceResult
| Field | Type |
|---|---|
| `has_violations` | `bool` |
| `compliance_score` | `float` |
| `violations` | `list[str]` |
| `pii_detected` | `list[str]` |
| `risk_level` | `str` — `"low"` / `"medium"` / `"high"` / `"critical"` |
| `recommendations` | `list[str]` |
| `redacted_content` | `str \| None` |
IntelligentScanResult
| Field | Type |
|---|---|
| `has_violations` | `bool` |
| `risk_score` | `float` — 0.0 to 1.0 |
| `recommended_action` | `str` — `"Allow"` / `"AllowAndLog"` / `"Review"` / `"Block"` |
| `explanation` | `str` |
| `regex_violations` | `list` |
| `ml_violations` | `list` |
| `behavior_deviations` | `list` |
| `processing_time_us` | `int` |
VASLog
| Field | Type |
|---|---|
| `request_id` | `str` |
| `timestamp` | `str` |
| `provider` | `str` |
| `model` | `str` |
| `tokens_used` | `int` |
| `latency_ms` | `float` |
| `cached` | `bool` |
| `compliance` | `ComplianceResult \| None` |
ProviderHealth
| Field | Type |
|---|---|
| `provider` | `str` |
| `status` | `str` — `"healthy"` / `"degraded"` / `"unhealthy"` |
| `latency_ms` | `float` |
| `success_rate` | `float` |
| `error_rate` | `float` |
| `requests_total` | `int` |
| `last_updated` | `str` |
SystemHealth
| Field | Type |
|---|---|
| `status` | `str` |
| `uptime_seconds` | `int` |
| `version` | `str` |
| `providers` | `dict` |
| `cache` | `dict` |
| `timestamp` | `str` |
WorkflowResult
| Field | Type |
|---|---|
| `success` | `bool` |
| `output` | `str` |
| `steps_executed` | `list[str]` |
| `errors` | `list` |
| `total_tokens` | `int` |
| `total_cost_cents` | `int` |
| `execution_time_ms` | `float` |
Use Case 1: Secure Customer Support Bot
```python
import asyncio

from smartflow import SmartflowClient, SmartflowAgent

class SecureCustomerSupportBot:
    def __init__(self, smartflow_url: str):
        self.sf_url = smartflow_url

    async def handle_customer_session(self, customer_id: str, organization: str):
        async with SmartflowClient(self.sf_url) as sf:
            agent = SmartflowAgent(
                client=sf,
                name="SecureSupport",
                model="gpt-4o",
                system_prompt="""You are a helpful customer support agent.

CRITICAL RULES:
1. NEVER ask customers for full credit card numbers, SSNs, or passwords
2. If a customer shares sensitive info, acknowledge receipt but do not repeat it
3. For account verification, use last 4 digits only
4. Always offer secure channels for sensitive transactions""",
                compliance_policy="pci_dss_strict",
                enable_compliance_scan=True,
                user_id=f"customer_{customer_id}",
                org_id=organization,
            )

            while True:
                user_input = input("Customer: ")
                if user_input.lower() == "quit":
                    break
                try:
                    response = await agent.chat(user_input)
                    print(f"Support: {response}")
                except Exception as e:
                    if "compliance" in str(e).lower():
                        print("Support: For your protection, please use our secure verification process.")
                    else:
                        raise  # don't swallow unrelated errors

# asyncio.run(SecureCustomerSupportBot("http://smartflow:7775").handle_customer_session("12345", "fintech_corp"))
```
What this demonstrates:
- PII detection blocks sensitive data before it reaches the AI provider
- Behavioral tracking learns normal patterns per customer
- Complete audit trail for compliance audits
- Graceful handling of compliance violations
Use Case 2: Cost-Optimized Content Generation Pipeline
```python
import asyncio
from dataclasses import dataclass
from typing import List

from smartflow import SmartflowClient

@dataclass
class Product:
    id: str
    name: str
    category: str
    features: List[str]
    price: float

class ContentGenerationPipeline:
    def __init__(self, smartflow_url: str):
        self.sf_url = smartflow_url

    async def generate_description(self, sf, product: Product) -> dict:
        # Structured prompt to maximize semantic cache hits across similar products
        prompt = f"""Write a compelling product description.

Category: {product.category}
Product: {product.name}
Key Features: {', '.join(product.features)}
Price: ${product.price:.2f}

Requirements: 2-3 sentences, highlight key benefits, call-to-action, professional tone"""

        response = await sf.chat_completions(
            messages=[{"role": "user", "content": prompt}],
            model="gpt-4o-mini",
            temperature=0.7,
        )
        return {
            "product_id": product.id,
            "description": response.content,
            "cached": response.cached,
            "tokens": response.usage.total_tokens,
        }

    async def process_catalog(self, products: List[Product]) -> dict:
        async with SmartflowClient(self.sf_url) as sf:
            initial_stats = await sf.get_cache_stats()
            results = []
            cached_count = 0
            total_tokens = 0

            for product in products:
                result = await self.generate_description(sf, product)
                results.append(result)
                if result["cached"]:
                    cached_count += 1
                total_tokens += result["tokens"]

            final_stats = await sf.get_cache_stats()
            cost_saved = final_stats.cost_saved_cents - initial_stats.cost_saved_cents
            cache_hit_rate = cached_count / len(products) if products else 0

            return {
                "results": results,
                "summary": {
                    "total_products": len(products),
                    "cache_hit_rate": f"{cache_hit_rate:.1%}",
                    "tokens_used": total_tokens,
                    "cost_saved": f"${cost_saved / 100:.2f}",
                },
            }

# asyncio.run(ContentGenerationPipeline("http://smartflow:7775").process_catalog([...]))
```
What this demonstrates:
- Semantic caching recognizes similar products and reuses responses
- Structured prompts maximize cache hit potential
- Real-time cost tracking via `cost_saved_cents`
Use Case 3: Multi-Agent Research and Report Generation
```python
import asyncio
from datetime import datetime

from smartflow import SmartflowClient, SmartflowAgent

class ResearchOrchestrator:
    """Agents: Researcher → Analyst → Writer → Editor (deep mode)."""

    def __init__(self, smartflow_url: str):
        self.sf_url = smartflow_url

    async def research_topic(self, topic: str, depth: str = "standard") -> dict:
        async with SmartflowClient(self.sf_url) as sf:
            timestamp = datetime.now().isoformat()

            researcher = SmartflowAgent(
                client=sf, name="Researcher", model="gpt-4o",
                system_prompt="You are a thorough research analyst. Provide structured findings.",
                user_id="research_system", org_id="analytics_dept",
            )
            research_data = await researcher.chat(f"Research this topic: {topic}")

            if depth == "quick":
                return {"topic": topic, "timestamp": timestamp,
                        "report": research_data, "agents_used": ["Researcher"]}

            analyst = SmartflowAgent(
                client=sf, name="Analyst", model="gpt-4o",
                system_prompt="You are a strategic analyst. Identify patterns, risks, opportunities.",
                user_id="research_system", org_id="analytics_dept",
            )
            analysis = await analyst.chat(f"Analyze and provide strategic insights:\n\n{research_data}")

            writer = SmartflowAgent(
                client=sf, name="Writer", model="gpt-4o",
                system_prompt="You are an expert business writer. Synthesize research for executives.",
                user_id="research_system", org_id="analytics_dept",
            )
            report = await writer.chat(
                f"Write an executive report from this research and analysis:\n\n"
                f"RESEARCH:\n{research_data}\n\nANALYSIS:\n{analysis}"
            )

            if depth == "deep":
                editor = SmartflowAgent(
                    client=sf, name="Editor", model="gpt-4o",
                    system_prompt="You are a senior editor. Review for accuracy, clarity, and flow.",
                    temperature=0.3, user_id="research_system", org_id="analytics_dept",
                )
                report = await editor.chat(
                    f"Polish this report:\n\n{report}\n\nSource research:\n{research_data}"
                )

            logs = await sf.get_logs(limit=10)
            return {
                "topic": topic,
                "timestamp": timestamp,
                "report": report,
                "agents_used": ["Researcher", "Analyst", "Writer"]
                + (["Editor"] if depth == "deep" else []),
                "audit_trail": [
                    {"timestamp": l.timestamp, "model": l.model,
                     "tokens": l.tokens_used, "cached": l.cached}
                    for l in logs
                ],
            }

# asyncio.run(ResearchOrchestrator("http://smartflow:7775").research_topic("AI agents in enterprise 2026", depth="deep"))
```
What this demonstrates:
- Coordinated multi-agent workflows with specialized roles
- Progressive refinement through an agent chain
- Complete per-request audit trail
- Organizational context tracking for behavioral analysis
Summary
| Feature | Benefit |
|---|---|
| Semantic Cache (3-tier) | 60–80% cost reduction, no external vector DB |
| ML Compliance Engine | Real-time PII protection with adaptive learning |
| Smart Routing | Latency, cost, or priority-based provider selection |
| Full Audit Trail (VAS) | Complete compliance visibility across every request |
| MCP Tool Gateway | Register and invoke external tools with shared auth and budgeting |
| A2A Agent Orchestration | Route tasks across agents with full traceability |
| Agent Builder | Production-ready conversational AI with memory and compliance |
| Workflow Orchestration | Multi-step AI pipelines with branching and error handling |
Resources & Support
- Support portal: support.langsmart.app
- Email: support@langsmart.ai
- Website: langsmart.ai
Changelog
v0.3.1 — 2026-03-05
- Fixed `claude_message()` — now routes to `/anthropic/v1/messages` (native Anthropic path) instead of the OpenAI-compat `/v1/messages`
- Fixed `claude_message()` default model to `claude-sonnet-4-6`
- Fixed `_host` URL parsing to use `urlparse`, correctly handling domain-only HTTPS URLs and URLs with a path component
- Exported `ValidationError` and `CacheError` from the top-level package
- Removed all references to private internal repositories; project URLs now point to langsmart.ai
- Bumped `pyproject.toml` and `__init__.py` to 0.3.1; wheel rebuilt

v0.3.0
- Added `image_generation()`, `audio_transcription()`, `text_to_speech()`, `stream_chat()`, `rerank()`
- Extended `embeddings()` with `encoding_format`, `dimensions`, `input_type`
- New providers: Groq, Deepgram, Fireworks AI

v0.2.0
- Added `SmartflowAgent` with compliance scanning and conversation memory
- Added `SmartflowWorkflow` for multi-step AI pipelines
- Added `intelligent_scan()`, `submit_compliance_feedback()`
- Added `get_provider_health()`, `get_cache_stats()`, `health_comprehensive()`

v0.1.0
- Initial release: `chat()`, `chat_completions()`, `embeddings()`, `claude_message()`
- VAS audit logging, `SyncSmartflowClient`
© 2026 LangSmart, Inc. All rights reserved. Smartflow is a trademark of LangSmart, Inc.