# Source code for stilt.execution.backends.kubernetes

"""Kubernetes execution backend."""

from __future__ import annotations

from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
    from .protocol import DispatchMode

from stilt.service.kubernetes import (
    service_name,
    worker_job_manifest,
)


class KubernetesHandle:
    """Handle for one Kubernetes Job created by :class:`KubernetesExecutor`.

    Parameters
    ----------
    name:
        Name of the Job resource (``metadata.name``).
    namespace:
        Namespace the Job was created in.
    """

    def __init__(self, name: str, namespace: str) -> None:
        self._name = name
        self._namespace = namespace
        # Set once wait() observes a terminal Job state, so later wait()
        # calls return immediately without contacting the API server.
        self._completed = False

    @property
    def job_id(self) -> str:
        """Return the Kubernetes resource identifier for the launched workers."""
        return f"job/{self._name}"

    @staticmethod
    def _is_finished(job: Any) -> bool:
        """Return True when *job* (a ``V1Job``-like object) is in a terminal state.

        The Job controller records a ``Complete`` or ``Failed`` condition with
        status ``"True"`` once the Job is done. Summing ``succeeded + failed``
        pods is not a reliable substitute. A failed pod that is about to be
        retried (``backoffLimit``) would end the wait early. A Job that fails
        before enough pods have terminated (backoff limit or
        ``activeDeadlineSeconds`` exceeded) would never satisfy the count.
        """
        status = getattr(job, "status", None)
        if status is None:
            return False
        for cond in getattr(status, "conditions", None) or ():
            if cond.type in ("Complete", "Failed") and cond.status == "True":
                return True
        # Fallback when no terminal condition is reported yet: a Job with an
        # explicit completion count is done once that many pods succeeded.
        completions = getattr(getattr(job, "spec", None), "completions", None)
        return completions is not None and (status.succeeded or 0) >= completions

    def wait(self, timeout: float | None = None, poll_interval: float = 15.0) -> None:
        """Block until the Job reaches a terminal (``Complete``/``Failed``) state.

        A ``Failed`` Job also ends the wait; this method does not raise on Job
        failure.

        Parameters
        ----------
        timeout:
            Maximum number of seconds to wait. ``None`` (default) waits forever.
        poll_interval:
            Seconds between status polls.

        Raises
        ------
        ImportError
            If the ``kubernetes`` package is not installed.
        TimeoutError
            If *timeout* elapses before the Job finishes.
        """
        if self._completed:
            return
        import time

        try:
            from kubernetes import client as k8s_client
            from kubernetes import config as k8s_cfg
        except ImportError as exc:
            raise ImportError(
                "KubernetesExecutor requires the kubernetes package. "
                "Install it with: pip install 'pystilt[cloud]'"
            ) from exc

        # Prefer in-cluster service-account credentials; fall back to the
        # local kubeconfig when running outside a cluster.
        try:
            k8s_cfg.load_incluster_config()
        except k8s_cfg.ConfigException:
            k8s_cfg.load_kube_config()

        batch_v1 = k8s_client.BatchV1Api()
        deadline = None if timeout is None else time.monotonic() + timeout
        while True:
            job = batch_v1.read_namespaced_job(self._name, self._namespace)
            if self._is_finished(job):
                break
            delay = poll_interval
            if deadline is not None:
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    raise TimeoutError(
                        f"Timed out after {timeout}s waiting for {self.job_id} "
                        f"in namespace {self._namespace!r}"
                    )
                # Don't sleep past the deadline.
                delay = min(delay, remaining)
            time.sleep(delay)
        self._completed = True


class KubernetesExecutor:
    """Deploy batch-mode STILT pull workers as Kubernetes Jobs.

    Parameters
    ----------
    image:
        Container image for the worker pods.
    namespace:
        Namespace in which Jobs are created.
    n_workers:
        Default worker count; can be overridden per :meth:`start` call.
    db_secret:
        Secret name forwarded to ``worker_job_manifest`` as ``db_secret``
        (presumably database credentials — confirm in that helper).
    **pod_spec:
        Extra options forwarded to ``worker_job_manifest`` as ``pod_spec``.
    """

    dispatch: DispatchMode = "pull"

    def __init__(
        self,
        image: str,
        namespace: str = "default",
        n_workers: int = 1,
        db_secret: str = "pystilt-db",
        **pod_spec: Any,
    ) -> None:
        self._image = image
        self._namespace = namespace
        self._n_workers = n_workers
        self._db_secret = db_secret
        self._pod_spec = pod_spec

    @property
    def n_workers(self) -> int:
        """Return the default Job parallelism for this executor."""
        return self._n_workers

    @classmethod
    def from_config(cls, config: dict[str, Any]) -> KubernetesExecutor:
        """Build a Kubernetes executor from ``ModelConfig.execution`` values.

        ``image`` is required. ``namespace``, ``n_workers`` and ``db_secret``
        are optional, and the ``backend`` key is ignored. Every remaining key
        is passed through as a ``pod_spec`` option. *config* itself is not
        modified.

        Raises
        ------
        KeyError
            If ``image`` is missing from *config*.
        """
        cfg = dict(config)  # work on a copy; keys are popped below
        cfg.pop("backend", None)
        return cls(
            image=cfg.pop("image"),
            namespace=cfg.pop("namespace", "default"),
            n_workers=cfg.pop("n_workers", 1),
            db_secret=cfg.pop("db_secret", "pystilt-db"),
            **cfg,
        )

    def _apply(self, manifest: dict) -> None:
        """Create one Job manifest, tolerating already-exists conflicts.

        The target namespace is read from ``manifest["metadata"]["namespace"]``.

        Raises
        ------
        ImportError
            If the ``kubernetes`` package is not installed.
        kubernetes.client.rest.ApiException
            For any API error other than HTTP 409 (Conflict).
        """
        try:
            from kubernetes import client as k8s_client
            from kubernetes import config as k8s_cfg
            from kubernetes.client.rest import ApiException
        except ImportError as exc:
            raise ImportError(
                "KubernetesExecutor requires the kubernetes package. "
                "Install it with: pip install 'pystilt[cloud]'"
            ) from exc

        # Prefer in-cluster service-account credentials; fall back to the
        # local kubeconfig when running outside a cluster.
        try:
            k8s_cfg.load_incluster_config()
        except k8s_cfg.ConfigException:
            k8s_cfg.load_kube_config()

        ns = manifest["metadata"]["namespace"]
        try:
            k8s_client.BatchV1Api().create_namespaced_job(ns, manifest)
        except ApiException as exc:
            # 409: a Job with this name already exists and is reused as-is.
            # NOTE(review): start() derives the name from service_name(project),
            # so a *finished* Job left from an earlier run is reused too, and
            # waiting on it returns immediately — confirm old Jobs are cleaned
            # up (e.g. via ttlSecondsAfterFinished).
            if exc.status != 409:
                raise

    @staticmethod
    def _job_metadata(manifest: dict, name: str, namespace: str) -> dict:
        """Return *manifest*'s metadata with ``name``/``namespace`` set.

        Existing metadata entries (labels, annotations, ...) are kept. The
        input manifest is not modified.
        """
        metadata = dict(manifest.get("metadata") or {})
        metadata.update(name=name, namespace=namespace)
        return metadata

    def start(
        self,
        pending: list[str],
        *,
        project: str,
        n_workers: int | None = None,
        output_dir: str | None = None,
        compute_root: str | None = None,
        skip_existing: bool | None = None,
    ) -> KubernetesHandle:
        """Create the worker Job and return its handle.

        Parameters
        ----------
        pending:
            Work items awaiting execution. Not forwarded to the Job; with
            ``"pull"`` dispatch the workers presumably claim work themselves
            (NOTE(review): confirm against the worker entry point).
        project:
            Project name; the Job name is derived from it via ``service_name``.
        n_workers:
            Worker count for this launch. Defaults to the executor's value.
        output_dir, compute_root:
            Forwarded to ``worker_job_manifest``.
        skip_existing:
            Accepted for interface compatibility; not used by this backend.

        Raises
        ------
        ValueError
            If the effective worker count is less than 1. Such a Job cannot
            make progress, so waiting on its handle would never end.
        """
        n = n_workers if n_workers is not None else self._n_workers
        if n < 1:
            raise ValueError(f"n_workers must be >= 1, got {n}")
        name = service_name(project)
        manifest = worker_job_manifest(
            project,
            image=self._image,
            n_workers=n,
            namespace=self._namespace,
            output_dir=output_dir,
            compute_root=compute_root,
            db_secret=self._db_secret,
            pod_spec=self._pod_spec,
        )
        # Merge rather than replace, so any labels/annotations produced by
        # worker_job_manifest survive.
        manifest["metadata"] = self._job_metadata(manifest, name, self._namespace)
        self._apply(manifest)
        return KubernetesHandle(name, self._namespace)