Source code for stilt.execution.entrypoints

from __future__ import annotations

import time
from contextlib import AbstractContextManager
from typing import TYPE_CHECKING, Protocol, runtime_checkable

from stilt.errors import ConfigValidationError

from .execute import execute_batch, execute_task
from .tasks import plan_simulation_task

if TYPE_CHECKING:
    from stilt import Model
    from stilt.execution.tasks import SimulationResult


class _Claim(Protocol):
    """Private execution-only claim surface used by pull workers."""

    sim_id: str

    def record(self, result: SimulationResult) -> None: ...


@runtime_checkable
class _ClaimCapableIndex(Protocol):
    """Private execution-only capability for pull-worker indexes."""

    def claim_one(self) -> AbstractContextManager[_Claim | None]: ...


[docs] def push_simulations( model: Model, sim_ids: list[str], n_cores: int = 1, *, skip_existing: bool | None = None, ) -> None: """Execute an assigned list of simulations without claim polling.""" if sim_ids: execute_batch( [ plan_simulation_task( model, sim_id, skip_existing=skip_existing, ) for sim_id in sim_ids ], n_cores=n_cores, )
[docs] def pull_simulations( model: Model, follow: bool = False, poll_interval: float = 10.0, *, skip_existing: bool | None = None, ) -> None: """Drain pending simulations through atomic index claims.""" if not isinstance(model.index, _ClaimCapableIndex): raise ConfigValidationError( "Pull-mode workers require a claim-capable index backend. " "Configure PostgreSQL via PYSTILT_DB_URL." ) idle_sleep = max(poll_interval, 0.1) max_idle_sleep = min(60.0, max(idle_sleep, poll_interval * 8)) while True: with model.index.claim_one() as claim: if claim is None: if follow: time.sleep(idle_sleep) idle_sleep = min(idle_sleep * 2.0, max_idle_sleep) continue return idle_sleep = max(poll_interval, 0.1) result = execute_task( plan_simulation_task( model, claim.sim_id, skip_existing=skip_existing, ) ) claim.record(result)