agent-factory: generate agent auto-parse-email-content-and-extrac

This commit is contained in:
2026-04-07 20:54:30 +00:00
parent 9ac8be2c02
commit 58e98ad062
18 changed files with 549 additions and 0 deletions

0
app/__init__.py Normal file
View File

148
app/agent.py Normal file
View File

@@ -0,0 +1,148 @@
"""
AutoParseEmailContentAndExtrac Agent — auto-generated by Agent Factory.
"""
import asyncio
import logging
import os
import click
import httpx
import uvicorn
from contextlib import asynccontextmanager
from dotenv import load_dotenv
from langchain_core.rate_limiters import InMemoryRateLimiter
from langchain_openai import AzureChatOpenAI
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import JSONResponse
from starlette.routing import Route
from app.config import (
AGENT_SELF_URL,
AZURE_OPENAI_API_KEY,
AZURE_OPENAI_API_VERSION,
AZURE_OPENAI_DEPLOYMENT,
AZURE_OPENAI_ENDPOINT,
LOG_LEVEL,
REGISTRY_URL,
)
from app.skills import AGENT_CONFIG, AUTO_PARSE_EMAIL_CONTENT_AND_EXTRAC_SKILLS
from app.workflows.auto_parse_email_content_and_extrac_workflow import create_auto_parse_email_content_and_extrac_workflow
load_dotenv()
# ── Logging ──────────────────────────────────────────────────────────────
logging.basicConfig(
level=getattr(logging, LOG_LEVEL.upper(), logging.INFO),
format="%(asctime)s %(name)s %(levelname)s %(message)s",
)
logger = logging.getLogger(__name__)
# ── LLM ──────────────────────────────────────────────────────────────────
rate_limiter = InMemoryRateLimiter(
requests_per_second=10 / 60,
check_every_n_seconds=0.1,
max_bucket_size=10,
)
llm = AzureChatOpenAI(
temperature=0,
azure_deployment=AZURE_OPENAI_DEPLOYMENT,
api_version=AZURE_OPENAI_API_VERSION,
azure_endpoint=AZURE_OPENAI_ENDPOINT or "",
api_key=AZURE_OPENAI_API_KEY or "",
max_retries=5,
timeout=120,
rate_limiter=rate_limiter,
)
workflow = create_auto_parse_email_content_and_extrac_workflow(llm)
# ── Endpoints ────────────────────────────────────────────────────────────
async def health_check(request: Request) -> JSONResponse:
return JSONResponse({"status": "healthy", "agent": 'AutoParseEmailContentAndExtrac'})
async def agent_manifest(request: Request) -> JSONResponse:
"""GET /.well-known/agent.json"""
return JSONResponse({
"name": AGENT_CONFIG["name"],
"version": AGENT_CONFIG["version"],
"description": AGENT_CONFIG["description"],
"url": "/",
"skills": [
{
"id": s.id,
"name": s.name,
"description": s.description,
"tags": s.tags,
"inputSchema": AGENT_CONFIG.get("input_schema"),
"outputSchema": AGENT_CONFIG.get("output_schema"),
} for s in AUTO_PARSE_EMAIL_CONTENT_AND_EXTRAC_SKILLS
],
"capabilities": AGENT_CONFIG["capabilities"],
})
async def process_endpoint(request: Request) -> JSONResponse:
"""POST /process — run the agent workflow."""
try:
body = await request.json()
result = await workflow.ainvoke(body)
return JSONResponse(result)
except Exception as exc:
logger.error("Processing failed: %s", exc, exc_info=True)
return JSONResponse({"error": str(exc)}, status_code=500)
# ── Self-registration ────────────────────────────────────────────────────
async def _register_with_registry():
if not REGISTRY_URL:
logger.info("REGISTRY_URL not set — skipping self-registration")
return
await asyncio.sleep(2)
url = f"{REGISTRY_URL.rstrip('/')}/agents/register-url"
for attempt in range(3):
try:
async with httpx.AsyncClient(timeout=10.0) as client:
resp = await client.post(url, json={"endpoint": AGENT_SELF_URL})
if resp.status_code in (200, 201):
logger.info("Self-registered with agent-registry at %s", REGISTRY_URL)
return
logger.warning("Registration attempt %d: HTTP %d", attempt + 1, resp.status_code)
except Exception as exc:
logger.warning("Registration attempt %d failed: %s", attempt + 1, exc)
await asyncio.sleep(5)
logger.error("Failed to self-register after 3 attempts")
@asynccontextmanager
async def lifespan(app):
task = asyncio.create_task(_register_with_registry())
yield
task.cancel()
app = Starlette(
routes=[
Route("/health", methods=["GET"], endpoint=health_check),
Route("/.well-known/agent.json", methods=["GET"], endpoint=agent_manifest),
Route("/process", methods=["POST"], endpoint=process_endpoint),
],
lifespan=lifespan,
)
@click.command()
@click.option("--host", default="0.0.0.0")
@click.option("--port", default=8080, type=int)
def main(host: str, port: int):
uvicorn.run(app, host=host, port=port, log_level=LOG_LEVEL.lower())
if __name__ == "__main__":
main()

21
app/config.py Normal file
View File

@@ -0,0 +1,21 @@
"""
Configuration for the AutoParseEmailContentAndExtrac agent.
"""
import os
# ── Azure OpenAI ─────────────────────────────────────────────────────────
AZURE_OPENAI_ENDPOINT = os.getenv("AZURE_OPENAI_ENDPOINT")
AZURE_OPENAI_API_KEY = os.getenv("AZURE_OPENAI_API_KEY")
AZURE_OPENAI_API_VERSION = os.getenv("AZURE_OPENAI_API_VERSION", "2024-08-01-preview")
AZURE_OPENAI_DEPLOYMENT = os.getenv("AZURE_OPENAI_DEPLOYMENT", "gpt-4o")
# ── Agent Registry ───────────────────────────────────────────────────────
REGISTRY_URL = os.getenv(
"REGISTRY_URL", "http://agent-gateway.agents.svc.cluster.local"
)
AGENT_SELF_URL = os.getenv(
"AGENT_SELF_URL", "http://auto-parse-email-content-and-extrac.agents.svc.cluster.local"
)
# ── Logging ──────────────────────────────────────────────────────────────
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO")

0
app/nodes/__init__.py Normal file
View File

41
app/nodes/core_node.py Normal file
View File

@@ -0,0 +1,41 @@
"""
This module defines a LangGraph node function that parses raw email content
and extracts the sender's email address, the email subject, and the email body.
The function uses an LLM to perform the extraction and returns the parsed
information in a structured format.
"""
async def process(state: dict) -> dict:
from app.agent import llm
from langchain_core.messages import SystemMessage, HumanMessage
try:
email_raw = state.get("email_raw", "")
if not email_raw:
return {"error": "Missing 'email_raw' input", "phase": "failed"}
messages = [
SystemMessage(content="You are an expert in parsing email content. Extract the sender's email address, the subject, and the body from the provided raw email text."),
HumanMessage(content=email_raw),
]
response = await llm.ainvoke(messages)
# Assuming the LLM returns a structured response in the format:
# "Sender: <email>\nSubject: <subject>\nBody: <body>"
try:
lines = response.content.split("\n")
sender_email = next((line.split(": ", 1)[1] for line in lines if line.startswith("Sender:")), "").strip()
email_subject = next((line.split(": ", 1)[1] for line in lines if line.startswith("Subject:")), "").strip()
email_body = next((line.split(": ", 1)[1] for line in lines if line.startswith("Body:")), "").strip()
return {
"sender_email": sender_email,
"email_subject": email_subject,
"email_body": email_body,
"phase": "complete"
}
except Exception as parse_exc:
return {"error": f"Failed to parse LLM response: {str(parse_exc)}", "phase": "failed"}
except Exception as exc:
return {"error": str(exc), "phase": "failed"}

29
app/skills.py Normal file
View File

@@ -0,0 +1,29 @@
"""
A2A skill declarations for AutoParseEmailContentAndExtrac.
"""
from a2a.types import AgentSkill
AUTO_PARSE_EMAIL_CONTENT_AND_EXTRAC_SKILLS = [
AgentSkill(
id="auto_parse_email_content_and_extrac_skill",
name="AutoParseEmailContentAndExtrac",
description="Parse email content and extract sender, subject, and body",
tags=["auto-generated"],
examples=[],
),
]
AGENT_CONFIG = {
"name": "AutoParseEmailContentAndExtrac",
"description": "Extract relevant fields from incoming support email",
"version": "1.0.0",
"framework": "LangGraph + Starlette",
"capabilities": {
"streaming": False,
"async": True,
},
"input_schema": {"email_raw": "string"},
"output_schema": {"sender_email": "string", "email_subject": "string", "email_body": "string"},
}

0
app/states/__init__.py Normal file
View File

View File

@@ -0,0 +1,17 @@
"""
State definitions for AutoParseEmailContentAndExtrac agent.
"""
from __future__ import annotations
from typing import Any, Dict, List, Optional
from langgraph.graph import MessagesState
class AutoParseEmailContentAndExtracState(MessagesState):
"""Workflow state for AutoParseEmailContentAndExtrac."""
# ── Input fields ─────────────────────────────────────────────────────
pass
# ── Output fields ────────────────────────────────────────────────────
pass
# ── Internal ─────────────────────────────────────────────────────────
error: Optional[str]
phase: str

View File

View File

@@ -0,0 +1,19 @@
"""
LangGraph workflow for AutoParseEmailContentAndExtrac agent.
"""
from langgraph.graph import StateGraph, END
from app.states.auto_parse_email_content_and_extrac_state import AutoParseEmailContentAndExtracState
from app.nodes.core_node import process
def create_auto_parse_email_content_and_extrac_workflow(llm):
"""Build and compile the AutoParseEmailContentAndExtrac workflow graph."""
graph = StateGraph(AutoParseEmailContentAndExtracState)
graph.add_node("process", process)
graph.set_entry_point("process")
graph.add_edge("process", END)
return graph.compile()