What is Smartflow?

Smartflow is an enterprise AI orchestration layer that sits between your application and AI providers (OpenAI, Anthropic, Google, and others). It provides:

            YOUR APPLICATION
                   │  pip install smartflow-sdk
          ┌────────▼────────┐
          │  Smartflow SDK  │
          └────────┬────────┘
                   │
┌──────────────────▼────────────────┐
│          SMARTFLOW PROXY          │
│     MetaCache · ML Compliance     │
│     Routing · VAS Logging         │
│     MCP Gateway · A2A Gateway     │
└──────────────────┬────────────────┘
                   │
     ┌──────┬──────┼──────┬───────┐
     ▼      ▼      ▼      ▼       ▼
  OpenAI Anthropic Gemini Cohere Local

Installation

pip install smartflow-sdk

From the wheel distributed by your LangSmart account team:

pip install smartflow_sdk-0.3.1-py3-none-any.whl

Optional — sync client in async environments:

pip install nest_asyncio

Quick Start

Async (Recommended)

import asyncio
from smartflow import SmartflowClient

async def main():
    async with SmartflowClient("http://your-smartflow:7775") as sf:
        # Automatic caching, compliance, failover, and audit on every call.
        response = await sf.chat("Explain quantum computing in simple terms")
        print(response)

asyncio.run(main())

Synchronous — Scripts and Notebooks

from smartflow import SyncSmartflowClient

sf = SyncSmartflowClient("http://your-smartflow:7775")

response = sf.chat("What is machine learning?")
print(response)

stats = sf.get_cache_stats()
print(f"Cache hit rate: {stats.hit_rate:.1%}")
print(f"Tokens saved: {stats.tokens_saved:,}")

sf.close()

For Jupyter notebooks with an existing event loop:

import nest_asyncio
nest_asyncio.apply()

OpenAI Drop-in Replacement

Zero code changes required — just update the base URL:

from openai import OpenAI

# Before: client = OpenAI()
# After: through Smartflow — caching, compliance, logging apply transparently

client = OpenAI(
    base_url="http://your-smartflow:7775/v1",
    api_key="sk-sf-your-virtual-key"
)

response = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Hello!"}]
)

SmartflowClient

Primary async client.

SmartflowClient(
    base_url: str,                    # Proxy URL, e.g. "http://smartflow:7775"
    api_key: Optional[str] = None,    # Virtual key (sk-sf-...)
    timeout: float = 30.0,            # Request timeout in seconds
    management_port: int = 7778,      # Management API port
    compliance_port: int = 7777,      # Compliance API port
    bridge_port: int = 3500,          # Hybrid bridge port
)

# As a context manager (recommended)
async with SmartflowClient("http://smartflow:7775", api_key="sk-sf-...") as sf:
    ...

# Manual lifecycle
sf = SmartflowClient("http://smartflow:7775")
await sf._ensure_client()
# ... use sf ...
await sf.close()

chat()

Send a message, receive the reply as a plain string.

chat(message, model="gpt-4o", system_prompt=None, temperature=0.7, max_tokens=None, **kwargs) → str
async with SmartflowClient("http://smartflow:7775") as sf:

    response = await sf.chat("Explain Docker containers")

    response = await sf.chat(
        message="Write a Python function to sort a list",
        model="gpt-4o",
        system_prompt="You are an expert Python developer.",
        temperature=0.3,
        max_tokens=1000,
    )

chat_completions()

Full OpenAI-compatible completions. Returns a structured AIResponse.

chat_completions(messages, model="gpt-4o", temperature=0.7, max_tokens=None, stream=False, **kwargs) → AIResponse
response = await sf.chat_completions(
    messages=[
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": "What is REST API?"},
    ],
    model="gpt-4o",
)

print(response.content)
print(f"Tokens used: {response.usage.total_tokens}")
print(f"Cached: {response.cached}")  # True if served from MetaCache

stream_chat()

Async generator that yields text delta strings as they stream.

stream_chat(message, model="gpt-4o", system_prompt=None, temperature=0.7, max_tokens=None, **kwargs) → AsyncIterator[str]
async for chunk in sf.stream_chat("Tell me a story about a robot"):
    print(chunk, end="", flush=True)
print()

claude_message()

Send a message to Claude using the native Anthropic Messages API path. The proxy injects the API key automatically.

claude_message(message, model="claude-sonnet-4-6", max_tokens=1024, system=None, anthropic_key=None) → str
response = await sf.claude_message(
    message="Analyze this code for security vulnerabilities",
    model="claude-sonnet-4-6",
    max_tokens=2000,
    system="You are a senior security engineer.",
)

Routes to /anthropic/v1/messages. For multi-turn or multimodal use, call chat_completions() with the Claude model name.

embeddings()

Generate vector embeddings. Supports batch input and provider routing.

embeddings(input, model="text-embedding-3-small", encoding_format="float", dimensions=None, input_type=None, **kwargs) → Dict
# Single text
result = await sf.embeddings("Hello, world!")
vector = result["data"][0]["embedding"]

# Batch
result = await sf.embeddings(["First document", "Second document", "Third document"])
vectors = [item["embedding"] for item in result["data"]]

# Cohere with input_type
result = await sf.embeddings(
    ["search query", "document text"],
    model="cohere/embed-english-v3.0",
    input_type="search_document",
)

# Reduce dimensions (OpenAI text-embedding-3+)
result = await sf.embeddings("Hello", model="text-embedding-3-large", dimensions=256)

image_generation()

image_generation(prompt, model="dall-e-3", n=1, size="1024x1024", quality=None, response_format="url", style=None, **kwargs) → Dict
result = await sf.image_generation(
    "A futuristic city at sunrise",
    model="dall-e-3",
    size="1792x1024",
    quality="hd",
    style="vivid",
)
print(result["data"][0]["url"])

audio_transcription()

Transcribe audio. Accepts a file-like object.

audio_transcription(file, model="whisper-1", language=None, prompt=None, response_format="json", temperature=0.0, filename="audio.mp3", **kwargs) → Dict
with open("recording.mp3", "rb") as f:
    result = await sf.audio_transcription(f, model="whisper-1")
print(result["text"])

# Groq (faster)
with open("recording.mp3", "rb") as f:
    result = await sf.audio_transcription(f, model="groq/whisper-large-v3")

text_to_speech()

Synthesize speech. Returns raw audio bytes.

text_to_speech(input, model="tts-1", voice="alloy", response_format="mp3", speed=1.0, **kwargs) → bytes
audio = await sf.text_to_speech("Hello, this is Smartflow.", voice="nova")
with open("output.mp3", "wb") as f:
    f.write(audio)

rerank()

Rerank documents by relevance to a query.

rerank(query, documents, model="rerank-english-v3.0", top_n=None, **kwargs) → Dict
result = await sf.rerank(
    "What is the return policy?",
    ["We accept returns within 30 days.", "Contact support@example.com."],
    top_n=1,
)

list_models()

models = await sf.list_models()
for m in models:
    print(m["id"])

chatbot_query()

Query Smartflow's built-in system chatbot for operational information using natural language.

chatbot_query(query: str) → Dict
result = await sf.chatbot_query("show me today's cache stats")
print(result["response"])

result = await sf.chatbot_query("which provider had the most errors this week?")
result = await sf.chatbot_query("what did we spend on OpenAI yesterday?")

Provider Prefix Reference

All methods that accept a model parameter support provider prefix routing. The three primary providers (OpenAI, Anthropic, Google Gemini) are auto-detected from the model name; all others require an explicit prefix.

Automatic Detection (no prefix needed)

# OpenAI — gpt-*, o1-*, o3-*, chatgpt-*, whisper-*, dall-e-*
reply = await sf.chat("Hello", model="gpt-4o")
reply = await sf.chat("Hello", model="gpt-4o-mini")
reply = await sf.chat("Hello", model="o3-mini")

# Anthropic — claude-*
reply = await sf.chat("Hello", model="claude-sonnet-4-6")
reply = await sf.chat("Hello", model="claude-3-opus-20240229")

# Google Gemini — gemini-*
reply = await sf.chat("Hello", model="gemini-1.5-pro")
reply = await sf.chat("Hello", model="gemini-2.0-flash")

Explicit Prefix Required

reply = await sf.chat("Hello", model="xai/grok-2-latest")
reply = await sf.chat("Hello", model="mistral/mistral-large-latest")
reply = await sf.chat("Hello", model="cohere/command-r-plus")
reply = await sf.chat("Hello", model="groq/llama-3.1-70b-versatile")
reply = await sf.chat("Hello", model="openrouter/meta-llama/llama-3.1-405b")
reply = await sf.chat("Hello", model="ollama/llama3.2")
reply = await sf.chat("Hello", model="azure/my-gpt4o-deployment")

| Prefix | Provider | API Key Env Var |
|---|---|---|
| (none) | OpenAI | OPENAI_API_KEY |
| anthropic/ | Anthropic | ANTHROPIC_API_KEY |
| xai/ | xAI (Grok) | XAI_API_KEY |
| gemini/ | Google Gemini | GEMINI_API_KEY |
| vertex_ai/ | Google Vertex AI | VERTEXAI_API_KEY |
| openrouter/ | OpenRouter | OPENROUTER_API_KEY |
| azure/ | Azure OpenAI | AZURE_API_KEY + AZURE_API_BASE |
| mistral/ | Mistral AI | MISTRAL_API_KEY |
| cohere/ | Cohere | COHERE_API_KEY |
| nvidia_nim/ | NVIDIA NIM | NVIDIA_NIM_API_KEY |
| huggingface/ | HuggingFace | HUGGINGFACE_API_KEY |
| groq/ | Groq | GROQ_API_KEY |
| deepgram/ | Deepgram | DEEPGRAM_API_KEY |
| fireworks/ | Fireworks AI | FIREWORKS_API_KEY |
| novita/ | Novita AI | NOVITA_API_KEY |
| together/ | Together AI | TOGETHER_API_KEY |
| perplexity/ | Perplexity AI | PERPLEXITY_API_KEY |
| replicate/ | Replicate | REPLICATE_API_KEY |
| vercel_ai_gateway/ | Vercel AI Gateway | VERCEL_AI_GATEWAY_API_KEY |
| ollama/ | Ollama (local) | none required |
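The prefix rules above can be sketched as a small resolver. This is purely illustrative: the proxy performs this resolution server-side, and `resolve_provider` is not part of the SDK.

```python
# Sketch of provider-prefix routing as described above (illustrative only;
# the proxy's real resolution logic is internal to Smartflow).
AUTO_DETECT = {
    "gpt-": "openai", "o1-": "openai", "o3-": "openai",
    "chatgpt-": "openai", "whisper-": "openai", "dall-e-": "openai",
    "claude-": "anthropic",
    "gemini-": "gemini",
}

def resolve_provider(model: str) -> tuple[str, str]:
    """Return (provider, bare_model) for a model string."""
    if "/" in model:
        prefix, bare = model.split("/", 1)  # explicit prefix always wins
        return prefix, bare
    for stem, provider in AUTO_DETECT.items():
        if model.startswith(stem):
            return provider, model
    return "openai", model  # default provider per the table above

print(resolve_provider("xai/grok-2-latest"))  # ('xai', 'grok-2-latest')
print(resolve_provider("claude-sonnet-4-6"))  # ('anthropic', 'claude-sonnet-4-6')
```

Note that splitting on the first `/` only means nested model paths such as `openrouter/meta-llama/llama-3.1-405b` route to `openrouter` with the rest passed through intact.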

Intelligent Compliance Engine

Smartflow's ML-powered compliance engine goes beyond regex. It learns and adapts based on user behavior and organizational baselines.

┌─────────────────────────────────────────────────────────────┐
│                   INTELLIGENT COMPLIANCE                    │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  ┌──────────────┐    ┌──────────────┐    ┌──────────────┐   │
│  │   LAYER 1    │    │   LAYER 2    │    │   LAYER 3    │   │
│  │ Regex/Rules  │ →  │ ML Embeddings│ →  │  Behavioral  │   │
│  │              │    │              │    │   Analysis   │   │
│  │ SSN          │    │ Semantic     │    │ User         │   │
│  │ Credit Card  │    │  similarity  │    │  patterns    │   │
│  │ Email        │    │ Context      │    │ Org          │   │
│  │ Phone        │    │  awareness   │    │  baselines   │   │
│  │ MRN          │    │ Learned      │    │ Anomaly      │   │
│  │ Passport     │    │  patterns    │    │  detection   │   │
│  └──────────────┘    └──────────────┘    └──────────────┘   │
│                             │                               │
│                             ▼                               │
│                   ┌──────────────────┐                      │
│                   │   CORRELATION    │                      │
│                   │      ENGINE      │                      │
│                   │  Composite Risk  │                      │
│                   │  Score + Action  │                      │
│                   └──────────────────┘                      │
│                             ▼                               │
│           Allow | AllowAndLog | Review | Block              │
└─────────────────────────────────────────────────────────────┘

intelligent_scan()

intelligent_scan(content, user_id=None, org_id=None, context=None) → IntelligentScanResult
result = await sf.intelligent_scan(
    content="Please send payment to card 4111-1111-1111-1111",
)

print(f"Has violations: {result.has_violations}")
print(f"Risk score: {result.risk_score:.2f}")            # 0.0 – 1.0
print(f"Action: {result.recommended_action}")            # Allow/AllowAndLog/Block/Review
print(f"Explanation: {result.explanation}")

for v in result.regex_violations:
    print(f"  - {v['violation_type']}: {v['severity']}")

Enable behavioral analysis:

result = await sf.intelligent_scan(
    content="Customer email: john.doe@example.com",
    user_id="support_agent_42",    # Track individual behavior
    org_id="acme_corporation",     # Compare against org baseline
    context="customer_support",
)

check_compliance()

check_compliance(content, policy="enterprise_standard") → ComplianceResult
result = await sf.check_compliance("My SSN is 123-45-6789")
if result.has_violations:
    print(f"Risk: {result.risk_level}")
    print(f"PII: {result.pii_detected}")
    print(f"Safe text: {result.redacted_content}")

redact_pii()

safe = await sf.redact_pii("Call me at 555-867-5309, email john@example.com")
# "Call me at [PHONE], email [EMAIL]"

submit_compliance_feedback()

Submit a true/false-positive correction to retrain the ML model.

submit_compliance_feedback(scan_id, is_false_positive, user_id=None, notes=None) → Dict
# Store the scan response dict to get the scan_id
response = await sf._post(
    f"{sf.compliance_url}/api/compliance/intelligent/scan",
    {"content": "Call me at 555-0100"}
)
scan_id = response.get("scan_id")

if scan_id:
    await sf.submit_compliance_feedback(
        scan_id=scan_id,
        is_false_positive=True,
        user_id="admin_user",
        notes="555-0100 is a known test number, not real PII",
    )

Learning & ML Methods

# Organization-wide learning progress
summary = await sf.get_learning_summary()
print(f"Total users tracked: {summary.total_users}")
print(f"Users with complete baselines: {summary.users_learning_complete}")
print(f"Learning period: {summary.config_learning_days} days")

# Per-user learning status
status = await sf.get_learning_status("user-alice")
print(f"Progress: {status.progress_percent}%")
print(f"Complete: {status.learning_complete}")

# ML engine statistics
ml_stats = await sf.get_ml_stats()
print(f"Total patterns: {ml_stats.total_patterns}")
print(f"Learned patterns: {ml_stats.learned_patterns}")
print(f"Average confidence: {ml_stats.average_confidence:.2f}")

# Org baseline (for anomaly detection)
baseline = await sf.get_org_baseline("acme-corp")
print(f"Violation rate: {baseline.violation_rate:.2%}")

| Method | Returns | Description |
|---|---|---|
| get_org_summary() | Dict | Organization-level aggregate compliance stats |
| get_org_baseline(org_id) | OrgBaseline | Behavioral baseline used for anomaly detection |
| get_persistence_stats() | PersistenceStats | Redis persistence stats for compliance data |
| save_compliance_data() | Dict | Trigger manual flush of compliance data to Redis |
| get_intelligent_health() | Dict | Health status of ML engine and embedding service |

MCP Tool Invocation

MCP tool calls are direct HTTP requests to the proxy. Use httpx or any HTTP client.

Calling a Tool

import httpx

async with httpx.AsyncClient() as client:
    response = await client.post(
        "http://smartflow:7775/github-tools/mcp/",
        headers={
            "Authorization": "Bearer sk-sf-...",
            "Content-Type": "application/json",
        },
        json={
            "jsonrpc": "2.0",
            "id": 1,
            "method": "tools/call",
            "params": {
                "name": "create_issue",
                "arguments": {
                    "repo": "my-org/my-repo",
                    "title": "Bug: login fails on mobile",
                    "body": "Steps to reproduce...",
                }
            }
        }
    )
    result = response.json()
    print(result["result"]["content"])

Searching the Tool Catalog

async with httpx.AsyncClient() as client:
    r = await client.get(
        "http://smartflow:7775/api/mcp/tools/search",
        params={"q": "create github issue", "k": 3},
        headers={"Authorization": "Bearer sk-sf-..."},
    )
    for tool in r.json()["results"]:
        print(f"{tool['server_id']}.{tool['name']}: {tool['description']}")

MCP Usage and Cost

async with httpx.AsyncClient() as client:
    r = await client.get(
        "http://smartflow:7775/api/mcp/usage",
        headers={"Authorization": "Bearer sk-sf-..."},
    )

A2A Agent Invocation

A2A tasks are HTTP POST requests to the proxy. The proxy forwards to the registered agent, logs the exchange, and returns the result.

Sending a Task

import httpx

async with httpx.AsyncClient() as client:
    response = await client.post(
        "http://smartflow:7775/a2a/summarizer-agent",
        headers={
            "Authorization": "Bearer sk-sf-...",
            "Content-Type": "application/json",
            "x-a2a-trace-id": "trace-abc-123",  # correlate across agents
        },
        json={
            "id": "task-uuid-001",
            "message": {
                "role": "user",
                "parts": [{"type": "text", "text": "Summarise the Q4 earnings report."}]
            }
        }
    )
    result = response.json()
    print(result["result"]["parts"][0]["text"])

Agent Capability Card

async with httpx.AsyncClient() as client:
    r = await client.get(
        "http://smartflow:7775/a2a/summarizer-agent/.well-known/agent.json",
        headers={"Authorization": "Bearer sk-sf-..."},
    )
    card = r.json()
    print(card["name"], card["capabilities"])

The x-a2a-trace-id header is passed through all hops in a multi-agent chain so every agent's logs can be correlated by a single trace ID.

Cache Performance

stats = await sf.get_cache_stats()

print(f"Hit Rate:      {stats.hit_rate:.1%}")
print(f"L1 (Memory):   {stats.l1_hits:,} hits")
print(f"L2 (Semantic): {stats.l2_hits:,} hits")
print(f"L3 (Exact):    {stats.l3_hits:,} hits")
print(f"Tokens saved:  {stats.tokens_saved:,}")
print(f"Cost saved:    ${stats.cost_saved_cents / 100:.2f}")

Provider Health

providers = await sf.get_provider_health()
for p in providers:
    print(f"{p.provider}")
    print(f"  Status:   {p.status}")
    print(f"  Latency:  {p.latency_ms:.0f}ms")
    print(f"  Success:  {p.success_rate:.1%}")
    print(f"  Requests: {p.requests_total:,}")

System Health

# Full system health
health = await sf.health_comprehensive()
print(health.status)          # "healthy" | "degraded" | "unhealthy"
print(health.uptime_seconds)
print(health.version)

# Quick liveness check
status = await sf.health()
assert status["status"] == "ok"

Audit Logs (VAS)

get_logs(limit=50, provider=None) → List[VASLog]
logs = await sf.get_logs(limit=10, provider="openai")
for log in logs:
    cached_badge = "[CACHED]" if log.cached else ""
    print(f"[{log.timestamp}] {log.provider}/{log.model} {cached_badge}")
    print(f"  Tokens: {log.tokens_used} | Latency: {log.latency_ms:.0f}ms")

get_logs_hybrid()

Unified audit log across all Smartflow instances via the hybrid bridge.

all_logs = await sf.get_logs_hybrid(limit=500)
for log in all_logs:
    print(f"{log['timestamp']} | {log['provider']} | {log['model']}")

Analytics

get_analytics(start_date=None, end_date=None) → Dict
data = await sf.get_analytics(
    start_date="2026-02-01",
    end_date="2026-02-19",
)

Routing Control

# Current routing state
status = await sf.get_routing_status()

# Force to a specific provider (e.g. during an outage)
await sf.force_provider("openai", duration_seconds=600)

SmartflowAgent

Higher-level agent with conversation memory, compliance scanning, and tool support.

from smartflow import SmartflowClient, SmartflowAgent

async with SmartflowClient("http://smartflow:7775") as sf:

    agent = SmartflowAgent(
        client=sf,
        name="TechSupport",
        model="gpt-4o",
        system_prompt="""You are a senior technical support engineer.
Guidelines:
- Be patient and thorough
- Ask clarifying questions when needed
- Provide step-by-step solutions
- Never ask for or repeat sensitive information""",
        temperature=0.7,
        compliance_policy="enterprise_standard",
        enable_compliance_scan=True,
        user_id="support_session_123",
        org_id="tech_company",
    )

    print(await agent.chat("My application keeps crashing"))
    print(await agent.chat("It's a Python web app using Flask"))
    print(await agent.chat("Here's the error: MemoryError"))

    print(f"Messages exchanged: {agent.message_count}")
    agent.clear_history()

| Method / Property | Description |
|---|---|
| chat(message, scan_input=True, scan_output=True) | Send message; raises ComplianceError if blocked |
| clear_history() | Reset conversation, preserve system prompt |
| get_history() | Return copy of message history |
| message_count | Number of messages in history |

SmartflowWorkflow

Chain AI operations with branching logic.

from smartflow import SmartflowClient, SmartflowWorkflow

async with SmartflowClient("http://smartflow:7775") as sf:

    workflow = SmartflowWorkflow(sf, name="ContentPipeline")

    workflow.add_step(
        name="analyze",
        action="chat",
        config={"prompt": "Analyze the tone of: {input}", "model": "gpt-4o-mini"},
        next_steps=["compliance_check"],
    )

    workflow.add_step(
        name="compliance_check",
        action="compliance_check",
        config={"content": "{input}"},
        next_steps=["route"],
    )

    workflow.add_step(
        name="route",
        action="condition",
        config={
            "field": "output",
            "cases": {"positive": "enhance", "negative": "review", "neutral": "publish"},
        },
    )

    result = await workflow.execute({"input": "This product exceeded my expectations!"})

    print(f"Success: {result.success}")
    print(f"Path: {' -> '.join(result.steps_executed)}")
    print(f"Time: {result.execution_time_ms:.0f}ms")
    print(f"Tokens: {result.total_tokens}")

| Step Action | Config fields | Description |
|---|---|---|
| "chat" | prompt, model, temperature | Chat completion; {input} / {output} are template variables |
| "compliance_check" | content | Rule-based compliance scan |
| "condition" | field, cases, default | Branch on a context value |

SyncSmartflowClient

Synchronous wrapper. Every async method is available without await.

from smartflow import SyncSmartflowClient

sf = SyncSmartflowClient("http://smartflow:7775", api_key="sk-sf-...")

reply      = sf.chat("Hello!")
emb        = sf.embeddings("Hello", model="text-embedding-3-small")
img        = sf.image_generation("A sunset", model="dall-e-3")
transcript = sf.audio_transcription(open("audio.mp3", "rb"), model="whisper-1")
audio      = sf.text_to_speech("Hello!", voice="nova")
ranked     = sf.rerank("What is the return policy?", ["doc1", "doc2"])
stats      = sf.get_cache_stats()
logs       = sf.get_logs(limit=20)

sf.close()

Configuration

Client Options

sf = SmartflowClient(
    base_url="http://smartflow:7775",   # Proxy endpoint
    api_key="sk-sf-...",                # Virtual key for authentication
    timeout=30.0,                       # Request timeout in seconds
    management_port=7778,               # Health, metrics, routing API
    compliance_port=7777,               # Compliance API
    bridge_port=3500,                   # Hybrid bridge (cross-instance logs)
)

From Environment Variables

import os
from smartflow import SmartflowClient

sf = SmartflowClient(
    base_url=os.environ["SMARTFLOW_URL"],
    api_key=os.environ.get("SMARTFLOW_API_KEY"),
)

Error Handling

from smartflow import (
    SmartflowClient,
    SmartflowError,
    ConnectionError,
    ComplianceError,
    RateLimitError,
    TimeoutError,
)
import asyncio

try:
    async with SmartflowClient("http://smartflow:7775") as sf:
        response = await sf.chat("Hello!")

except ConnectionError:
    print("Cannot connect to Smartflow proxy")

except ComplianceError as e:
    print(f"Blocked by compliance policy: {e}")

except RateLimitError:
    print("Rate limited — backing off")
    await asyncio.sleep(60)

except TimeoutError:
    print("Request timed out")

except SmartflowError as e:
    print(f"Smartflow error: {e}")

| Exception | Condition |
|---|---|
| SmartflowError | Base class for all SDK errors |
| ConnectionError | Cannot connect to proxy |
| AuthenticationError | 401: invalid or missing key |
| RateLimitError | 429: rate limit hit |
| ComplianceError | 403: request blocked by compliance policy |
| ProviderError | Upstream provider error |
| TimeoutError | Request timeout |
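A common pattern is to retry on RateLimitError with exponential backoff. A standalone sketch, using a local stand-in exception so it runs without the SDK (in real code, import RateLimitError from smartflow and pass `sf.chat` as `send`):

```python
import asyncio
import random

class RateLimitError(Exception):
    """Local stand-in for smartflow.RateLimitError (illustrative only)."""

async def chat_with_retry(send, message: str, max_attempts: int = 4,
                          base: float = 1.0) -> str:
    """Retry `send(message)` on rate limits with exponential backoff and jitter."""
    for attempt in range(max_attempts):
        try:
            return await send(message)
        except RateLimitError:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the error
            await asyncio.sleep(base * (2 ** attempt) + random.random() * base)
    raise RuntimeError("unreachable")

# Usage (assuming an open client `sf`):
#     reply = await chat_with_retry(sf.chat, "Hello!")
```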

Response Types

AIResponse

| Field | Type | Description |
|---|---|---|
| content | str | First choice text |
| choices | list | Full choices array |
| usage | Usage | Token usage (prompt_tokens, completion_tokens, total_tokens) |
| model | str | Model used |
| id | str | Response ID |
| cached | bool | True if served from MetaCache |
| cache_hit_type | str | "exact", "semantic", or None |
| provider | str | Provider that served the request |

CacheStats

| Field | Type |
|---|---|
| hit_rate | float |
| hits / misses | int |
| l1_hits / l2_hits / l3_hits | int |
| tokens_saved | int |
| cost_saved_cents | int |
| entries | int |

ComplianceResult

| Field | Type | Notes |
|---|---|---|
| has_violations | bool | |
| compliance_score | float | |
| violations | list[str] | |
| pii_detected | list[str] | |
| risk_level | str | "low" / "medium" / "high" / "critical" |
| recommendations | list[str] | |
| redacted_content | str \| None | |

IntelligentScanResult

| Field | Type | Notes |
|---|---|---|
| has_violations | bool | |
| risk_score | float | 0.0 to 1.0 |
| recommended_action | str | "Allow" / "AllowAndLog" / "Review" / "Block" |
| explanation | str | |
| regex_violations | list | |
| ml_violations | list | |
| behavior_deviations | list | |
| processing_time_us | int | |

VASLog

| Field | Type |
|---|---|
| request_id | str |
| timestamp | str |
| provider | str |
| model | str |
| tokens_used | int |
| latency_ms | float |
| cached | bool |
| compliance | ComplianceResult \| None |

ProviderHealth

| Field | Type | Notes |
|---|---|---|
| provider | str | |
| status | str | "healthy" / "degraded" / "unhealthy" |
| latency_ms | float | |
| success_rate | float | |
| error_rate | float | |
| requests_total | int | |
| last_updated | str | |

SystemHealth

| Field | Type |
|---|---|
| status | str |
| uptime_seconds | int |
| version | str |
| providers | dict |
| cache | dict |
| timestamp | str |

WorkflowResult

| Field | Type |
|---|---|
| success | bool |
| output | str |
| steps_executed | list[str] |
| errors | list |
| total_tokens | int |
| total_cost_cents | int |
| execution_time_ms | float |

Use Case 1: Secure Customer Support Bot

Challenge: Build a customer support chatbot that handles sensitive information while maintaining PCI-DSS and GDPR compliance.

import asyncio
from smartflow import SmartflowClient, SmartflowAgent

class SecureCustomerSupportBot:
    def __init__(self, smartflow_url: str):
        self.sf_url = smartflow_url

    async def handle_customer_session(self, customer_id: str, organization: str):
        async with SmartflowClient(self.sf_url) as sf:
            agent = SmartflowAgent(
                client=sf,
                name="SecureSupport",
                model="gpt-4o",
                system_prompt="""You are a helpful customer support agent.

CRITICAL RULES:
1. NEVER ask customers for full credit card numbers, SSNs, or passwords
2. If a customer shares sensitive info, acknowledge receipt but do not repeat it
3. For account verification, use last 4 digits only
4. Always offer secure channels for sensitive transactions""",
                compliance_policy="pci_dss_strict",
                enable_compliance_scan=True,
                user_id=f"customer_{customer_id}",
                org_id=organization,
            )

            while True:
                user_input = input("Customer: ")
                if user_input.lower() == "quit":
                    break
                try:
                    response = await agent.chat(user_input)
                    print(f"Support: {response}")
                except Exception as e:
                    if "compliance" in str(e).lower():
                        print("Support: For your protection, please use our secure verification process.")

# asyncio.run(SecureCustomerSupportBot("http://smartflow:7775").handle_customer_session("12345", "fintech_corp"))

What this demonstrates:
- PII detection blocks sensitive data before it reaches the AI provider
- Behavioral tracking learns normal patterns per customer
- Complete audit trail for compliance audits
- Graceful handling of compliance violations

Use Case 2: Cost-Optimized Content Generation Pipeline

Challenge: Generate thousands of product descriptions daily while minimizing API costs.

import asyncio
from dataclasses import dataclass
from typing import List
from smartflow import SmartflowClient

@dataclass
class Product:
    id: str
    name: str
    category: str
    features: List[str]
    price: float

class ContentGenerationPipeline:
    def __init__(self, smartflow_url: str):
        self.sf_url = smartflow_url

    async def generate_description(self, sf, product: Product) -> dict:
        # Structured prompt to maximize semantic cache hits across similar products
        prompt = f"""Write a compelling product description.

Category: {product.category}
Product: {product.name}
Key Features: {', '.join(product.features)}
Price: ${product.price:.2f}

Requirements: 2-3 sentences, highlight key benefits, call-to-action, professional tone"""

        response = await sf.chat_completions(
            messages=[{"role": "user", "content": prompt}],
            model="gpt-4o-mini",
            temperature=0.7,
        )
        return {"product_id": product.id, "description": response.content,
                "cached": response.cached, "tokens": response.usage.total_tokens}

    async def process_catalog(self, products: List[Product]) -> dict:
        async with SmartflowClient(self.sf_url) as sf:
            initial_stats = await sf.get_cache_stats()
            results = []
            cached_count = 0
            total_tokens = 0

            for i, product in enumerate(products):
                result = await self.generate_description(sf, product)
                results.append(result)
                if result["cached"]:
                    cached_count += 1
                total_tokens += result["tokens"]

            final_stats = await sf.get_cache_stats()
            cost_saved = final_stats.cost_saved_cents - initial_stats.cost_saved_cents
            cache_hit_rate = cached_count / len(products) if products else 0

            return {
                "results": results,
                "summary": {
                    "total_products": len(products),
                    "cache_hit_rate": f"{cache_hit_rate:.1%}",
                    "tokens_used": total_tokens,
                    "cost_saved": f"${cost_saved / 100:.2f}",
                },
            }

# asyncio.run(ContentGenerationPipeline("http://smartflow:7775").process_catalog([...]))

What this demonstrates:
- Semantic caching recognizes similar products and reuses responses
- Structured prompts maximize cache hit potential
- Real-time cost tracking via cost_saved_cents

Use Case 3: Multi-Agent Research and Report Generation

Challenge: Coordinate multiple specialized AI agents to produce a polished, auditable research report.

import asyncio
from datetime import datetime
from smartflow import SmartflowClient, SmartflowAgent

class ResearchOrchestrator:
    """Agents: Researcher → Analyst → Writer → Editor (deep mode)"""

    def __init__(self, smartflow_url: str):
        self.sf_url = smartflow_url

    async def research_topic(self, topic: str, depth: str = "standard") -> dict:
        async with SmartflowClient(self.sf_url) as sf:
            timestamp = datetime.now().isoformat()

            researcher = SmartflowAgent(client=sf, name="Researcher", model="gpt-4o",
                system_prompt="You are a thorough research analyst. Provide structured findings.",
                user_id="research_system", org_id="analytics_dept")

            research_data = await researcher.chat(f"Research this topic: {topic}")

            if depth == "quick":
                return {"topic": topic, "timestamp": timestamp,
                        "report": research_data, "agents_used": ["Researcher"]}

            analyst = SmartflowAgent(client=sf, name="Analyst", model="gpt-4o",
                system_prompt="You are a strategic analyst. Identify patterns, risks, opportunities.",
                user_id="research_system", org_id="analytics_dept")

            analysis = await analyst.chat(f"Analyze and provide strategic insights:\n\n{research_data}")

            writer = SmartflowAgent(client=sf, name="Writer", model="gpt-4o",
                system_prompt="You are an expert business writer. Synthesize research for executives.",
                user_id="research_system", org_id="analytics_dept")

            report = await writer.chat(
                f"Write an executive report from this research and analysis:\n\n"
                f"RESEARCH:\n{research_data}\n\nANALYSIS:\n{analysis}"
            )

            if depth == "deep":
                editor = SmartflowAgent(client=sf, name="Editor", model="gpt-4o",
                    system_prompt="You are a senior editor. Review for accuracy, clarity, and flow.",
                    temperature=0.3, user_id="research_system", org_id="analytics_dept")
                report = await editor.chat(
                    f"Polish this report:\n\n{report}\n\nSource research:\n{research_data}")

            logs = await sf.get_logs(limit=10)
            return {
                "topic": topic, "timestamp": timestamp, "report": report,
                "agents_used": ["Researcher", "Analyst", "Writer"]
                    + (["Editor"] if depth == "deep" else []),
                "audit_trail": [{"timestamp": l.timestamp, "model": l.model,
                    "tokens": l.tokens_used, "cached": l.cached} for l in logs]
            }

# asyncio.run(ResearchOrchestrator("http://smartflow:7775").research_topic("AI agents in enterprise 2026", depth="deep"))

What this demonstrates:
- Coordinated multi-agent workflows with specialized roles
- Progressive refinement through an agent chain
- Complete per-request audit trail
- Organizational context tracking for behavioral analysis

Summary

| Feature | Benefit |
|---|---|
| Semantic Cache (3-tier) | 60–80% cost reduction, no external vector DB |
| ML Compliance Engine | Real-time PII protection with adaptive learning |
| Smart Routing | Latency, cost, or priority-based provider selection |
| Full Audit Trail (VAS) | Complete compliance visibility across every request |
| MCP Tool Gateway | Register and invoke external tools with shared auth and budgeting |
| A2A Agent Orchestration | Route tasks across agents with full traceability |
| Agent Builder | Production-ready conversational AI with memory and compliance |
| Workflow Orchestration | Multi-step AI pipelines with branching and error handling |

Resources & Support

Changelog

v0.3.1 — 2026-03-05

v0.3.0

v0.2.0

v0.1.0


© 2026 LangSmart, Inc. All rights reserved. Smartflow is a trademark of LangSmart, Inc.