Files
Scaffolder d10e2ce029
All checks were successful
Build and Push to ACR / Build and Push (push) Successful in 1m43s
initial commit
Change-Id: I3184ea3bc23ecc769c9de5fad0a0a0229e353629
2026-03-23 23:19:03 +00:00

91 lines
2.4 KiB
Python

from __future__ import annotations
from threading import Lock
from typing import Optional
from fastapi import FastAPI, HTTPException
from prometheus_fastapi_instrumentator import Instrumentator
from pydantic import BaseModel
app = FastAPI(
title="test-alex-gitops-1",
description="test-alex-gitops-1",
version="0.1.0",
)
# Expose /metrics for Prometheus scraping
Instrumentator().instrument(app).expose(app)
# ---------------------------------------------------------------------------
# Domain model
# ---------------------------------------------------------------------------
class ItemRequest(BaseModel):
name: str
description: str = ""
class Item(BaseModel):
id: int
name: str
description: str
# ---------------------------------------------------------------------------
# In-memory store (thread-safe)
# ---------------------------------------------------------------------------
_store: dict[int, Item] = {}
_counter = 0
_lock = Lock()
def _next_id() -> int:
global _counter
with _lock:
_counter += 1
return _counter
# ---------------------------------------------------------------------------
# Routes
# ---------------------------------------------------------------------------
@app.get("/health", tags=["observability"])
def health() -> dict:
return {"status": "UP"}
@app.get("/api/items", response_model=list[Item], tags=["items"])
def list_items():
return list(_store.values())
@app.post("/api/items", response_model=Item, status_code=201, tags=["items"])
def create_item(body: ItemRequest):
item = Item(id=_next_id(), name=body.name, description=body.description)
_store[item.id] = item
return item
@app.get("/api/items/{item_id}", response_model=Item, tags=["items"])
def get_item(item_id: int):
item = _store.get(item_id)
if item is None:
raise HTTPException(status_code=404, detail="Item not found")
return item
@app.put("/api/items/{item_id}", response_model=Item, tags=["items"])
def update_item(item_id: int, body: ItemRequest):
item = _store.get(item_id)
if item is None:
raise HTTPException(status_code=404, detail="Item not found")
updated = Item(id=item_id, name=body.name, description=body.description)
_store[item_id] = updated
return updated
@app.delete("/api/items/{item_id}", tags=["items"])
def delete_item(item_id: int):
_store.pop(item_id, None)
return {"deleted": item_id}