from typing import Dict

from alien.adapters.base import Adapter, ExecutionPlan, ResourceProfile
from alien.adapters.loader import register_adapter


class InferenceAdapter(Adapter):
    """Service-oriented execution with auto-scaling."""

    def prepare(self, job_spec: Dict[str, object]) -> tuple[ResourceProfile, ExecutionPlan]:
        # Resource requirements: GPU count, VRAM floor, and the
        # interconnect/feature constraints placement must satisfy.
        profile = ResourceProfile(
            num_gpus=int(job_spec.get("num_gpus", 1)),
            min_vram_gb=int(job_spec.get("min_vram_gb", 16)),
            interconnect=tuple(job_spec.get("interconnect", ("pcie",))),
            scm_minutes=int(job_spec.get("scm_minutes", 60)),
            features=tuple(job_spec.get("features", ())),
        )
        # Execution plan: a long-running service with health probes and
        # optional autoscaling, not a run-to-completion batch job.
        plan = ExecutionPlan(
            image=job_spec.get("image", "ghcr.io/alien/inference:latest"),
            command=tuple(job_spec.get("command", ("python", "-m", "server"))),
            env={k: str(v) for k, v in job_spec.get("env", {}).items()},
            volumes={},
            strategy="service",
            rendezvous={"type": "none"},  # standalone service: no multi-node rendezvous
            io={"mode": "stream"},
            replicas=int(job_spec.get("replicas", 1)),
            service_type="ClusterIP",
            readiness_probe={"httpGet": {"path": "/health", "port": 8080}},
            liveness_probe={"httpGet": {"path": "/health", "port": 8080}},
            autoscaling=job_spec.get("autoscaling"),
            restart_policy="Always",
        )
        return profile, plan

    def map_metrics(self, raw: Dict[str, object]) -> Dict[str, object]:
        # Normalize raw backend metric names to the adapter's canonical keys.
        return {
            "latency_p95_ms": raw.get("latency_p95_ms", 0),
            "throughput_qps": raw.get("qps", 0),
        }


# Register the adapter globally so the loader can resolve it by name.
register_adapter("my-inference", InferenceAdapter)
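

# Usage sketch (the job-spec values below are illustrative, not part of the
# API): prepare() maps a plain dict into a typed (ResourceProfile,
# ExecutionPlan) pair, falling back to the defaults above for any omitted key.
if __name__ == "__main__":
    adapter = InferenceAdapter()
    profile, plan = adapter.prepare({
        "num_gpus": 2,
        "min_vram_gb": 24,
        "replicas": 3,
        "env": {"LOG_LEVEL": "info"},
        # The autoscaling payload is passed through verbatim; its shape is an
        # assumption here, not something this adapter validates.
        "autoscaling": {"min_replicas": 1, "max_replicas": 4},
    })
    print(profile)
    print(plan)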