Files

149 lines
5.1 KiB
Python

"""
AutoRetrieveFilesFromAReposito Agent — auto-generated by Agent Factory.
"""
import asyncio
import logging
import os
import click
import httpx
import uvicorn
from contextlib import asynccontextmanager
from dotenv import load_dotenv
from langchain_core.rate_limiters import InMemoryRateLimiter
from langchain_openai import AzureChatOpenAI
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import JSONResponse
from starlette.routing import Route
from app.config import (
AGENT_SELF_URL,
AZURE_OPENAI_API_KEY,
AZURE_OPENAI_API_VERSION,
AZURE_OPENAI_DEPLOYMENT,
AZURE_OPENAI_ENDPOINT,
LOG_LEVEL,
REGISTRY_URL,
)
from app.skills import AGENT_CONFIG, AUTO_RETRIEVE_FILES_FROM_A_REPOSITO_SKILLS
from app.workflows.auto_retrieve_files_from_a_reposito_workflow import create_auto_retrieve_files_from_a_reposito_workflow
# Load variables from a local .env file into the process environment.
# NOTE(review): this runs after `app.config` has already been imported above;
# if that module reads os.environ at import time, values from .env would
# arrive too late for it — confirm against app/config.py.
load_dotenv()

# ── Logging ──────────────────────────────────────────────────────────────
# LOG_LEVEL is matched case-insensitively against the `logging` module's
# attributes; a name that is not found there falls back to INFO.
logging.basicConfig(
    format="%(asctime)s %(name)s %(levelname)s %(message)s",
    level=getattr(logging, LOG_LEVEL.upper(), logging.INFO),
)
logger = logging.getLogger(__name__)
# ── LLM ──────────────────────────────────────────────────────────────────
# Client-side throttle for model calls: an average of 10 requests per minute,
# a bucket capped at 10, re-checked every 0.1 s while waiting.
rate_limiter = InMemoryRateLimiter(
    max_bucket_size=10,
    check_every_n_seconds=0.1,
    requests_per_second=10 / 60,
)

# Shared Azure OpenAI chat model (temperature 0). An unset endpoint or API key
# is passed through as an empty string rather than None.
llm = AzureChatOpenAI(
    azure_endpoint=AZURE_OPENAI_ENDPOINT or "",
    azure_deployment=AZURE_OPENAI_DEPLOYMENT,
    api_version=AZURE_OPENAI_API_VERSION,
    api_key=AZURE_OPENAI_API_KEY or "",
    temperature=0,
    timeout=120,
    max_retries=5,
    rate_limiter=rate_limiter,
)

# The workflow is built once at import time and reused by every request.
workflow = create_auto_retrieve_files_from_a_reposito_workflow(llm)
# ── Endpoints ────────────────────────────────────────────────────────────
async def health_check(request: Request) -> JSONResponse:
    """GET /health — return a fixed payload showing the agent is running."""
    payload = {"status": "healthy", "agent": "AutoRetrieveFilesFromAReposito"}
    return JSONResponse(payload)
async def agent_manifest(request: Request) -> JSONResponse:
    """GET /.well-known/agent.json — describe this agent and its skills.

    The manifest is assembled from AGENT_CONFIG and the registered skill
    list; every skill entry carries the same agent-level input/output schema.
    """
    manifest = {
        "name": AGENT_CONFIG["name"],
        "version": AGENT_CONFIG["version"],
        "description": AGENT_CONFIG["description"],
        "url": "/",
    }

    skill_entries = []
    for skill in AUTO_RETRIEVE_FILES_FROM_A_REPOSITO_SKILLS:
        skill_entries.append(
            {
                "id": skill.id,
                "name": skill.name,
                "description": skill.description,
                "tags": skill.tags,
                # Schemas are optional in AGENT_CONFIG; missing ones become null.
                "inputSchema": AGENT_CONFIG.get("input_schema"),
                "outputSchema": AGENT_CONFIG.get("output_schema"),
            }
        )

    manifest["skills"] = skill_entries
    manifest["capabilities"] = AGENT_CONFIG["capabilities"]
    return JSONResponse(manifest)
async def process_endpoint(request: Request) -> JSONResponse:
    """POST /process — run the agent workflow on the JSON request body.

    Returns:
        * 200 with the workflow result when processing succeeds.
        * 400 with ``{"error": ...}`` when the body is not valid JSON.
        * 500 with ``{"error": ...}`` for any other failure (workflow error,
          result that cannot be serialized, ...).
    """
    try:
        try:
            body = await request.json()
        except ValueError as exc:
            # json.JSONDecodeError (and UnicodeDecodeError) are ValueErrors.
            # A malformed body is a client error, so answer 400 instead of
            # reporting it as a server-side failure.
            logger.warning("Rejected request with invalid JSON body: %s", exc)
            return JSONResponse(
                {"error": f"Invalid JSON body: {exc}"},
                status_code=400,
            )

        result = await workflow.ainvoke(body)
        # Kept inside the try: JSONResponse serializes on construction, so a
        # non-serializable result is reported as a 500 as well.
        return JSONResponse(result)
    except Exception as exc:
        logger.error("Processing failed: %s", exc, exc_info=True)
        return JSONResponse({"error": str(exc)}, status_code=500)
# ── Self-registration ────────────────────────────────────────────────────
async def _register_with_registry():
    """Announce this agent's URL to the agent registry, retrying on failure.

    Does nothing when REGISTRY_URL is unset. Otherwise waits two seconds, then
    POSTs ``{"endpoint": AGENT_SELF_URL}`` to
    ``<REGISTRY_URL>/agents/register-url`` up to three times. A 200 or 201
    response counts as success; any other status or any exception raised by
    the request is logged as a warning and retried after a pause. If every
    attempt fails, an error is logged and the function returns normally.
    """
    if not REGISTRY_URL:
        logger.info("REGISTRY_URL not set — skipping self-registration")
        return

    max_attempts = 3
    retry_delay = 5  # seconds to wait between two attempts

    # Initial grace period before the first attempt.
    # NOTE(review): presumably lets the HTTP server start listening before the
    # registry calls back — confirm.
    await asyncio.sleep(2)

    url = f"{REGISTRY_URL.rstrip('/')}/agents/register-url"
    for attempt in range(1, max_attempts + 1):
        try:
            async with httpx.AsyncClient(timeout=10.0) as client:
                resp = await client.post(url, json={"endpoint": AGENT_SELF_URL})
            if resp.status_code in (200, 201):
                logger.info("Self-registered with agent-registry at %s", REGISTRY_URL)
                return
            logger.warning("Registration attempt %d: HTTP %d", attempt, resp.status_code)
        except Exception as exc:
            logger.warning("Registration attempt %d failed: %s", attempt, exc)

        # Pause only when another attempt follows; sleeping after the final
        # failure would just delay the error report below.
        if attempt < max_attempts:
            await asyncio.sleep(retry_delay)

    logger.error("Failed to self-register after %d attempts", max_attempts)
@asynccontextmanager
async def lifespan(app):
    """Run self-registration in the background for the application's lifetime.

    The registration task is started on startup. On shutdown it is cancelled
    and awaited, so it has actually finished before the event loop goes away.
    """
    task = asyncio.create_task(_register_with_registry())
    try:
        yield
    finally:
        # `finally` ensures cleanup also runs when an exception is thrown into
        # the context manager at the yield point.
        task.cancel()
        # gather(return_exceptions=True) waits for the task and hands back its
        # CancelledError (or any other error) as a value instead of raising.
        (outcome,) = await asyncio.gather(task, return_exceptions=True)
        if isinstance(outcome, Exception):
            logger.warning("Self-registration task ended with an error: %s", outcome)
# ASGI application: health probe, agent manifest and the processing endpoint.
# `lifespan` starts the background self-registration when the app starts.
app = Starlette(
    lifespan=lifespan,
    routes=[
        Route("/health", health_check, methods=["GET"]),
        Route("/.well-known/agent.json", agent_manifest, methods=["GET"]),
        Route("/process", process_endpoint, methods=["POST"]),
    ],
)
# Command-line entry point: serve `app` with uvicorn on the given host/port,
# using the lower-cased LOG_LEVEL as uvicorn's log level.
# (Kept as a comment rather than a docstring so the `--help` text is unchanged.)
@click.command()
@click.option("--host", default="0.0.0.0")
@click.option("--port", type=int, default=8080)
def main(host: str, port: int):
    uvicorn_level = LOG_LEVEL.lower()
    uvicorn.run(app, host=host, port=port, log_level=uvicorn_level)


if __name__ == "__main__":
    main()