Source code for stilt.execution.entrypoints
from __future__ import annotations
import time
from typing import TYPE_CHECKING
from stilt.errors import ConfigValidationError
from .execute import execute_batch, execute_task
from .tasks import plan_simulation_task
if TYPE_CHECKING:
from stilt import Model
[docs]
def push_simulations(
    model: Model,
    sim_ids: list[str],
    n_cores: int = 1,
    *,
    skip_existing: bool | None = None,
) -> None:
    """Run an explicitly assigned batch of simulations, with no claim polling.

    One task is planned per entry of ``sim_ids`` with
    ``plan_simulation_task`` and the resulting list is handed to
    ``execute_batch`` in a single call. An empty ``sim_ids`` is a no-op.

    Args:
        model: Model the simulation tasks are planned against.
        sim_ids: Identifiers of the simulations to execute.
        n_cores: Forwarded unchanged to ``execute_batch``.
        skip_existing: Forwarded unchanged to ``plan_simulation_task``.
    """
    # Nothing assigned: return before planning or dispatching anything.
    if not sim_ids:
        return

    planned_tasks = [
        plan_simulation_task(model, identifier, skip_existing=skip_existing)
        for identifier in sim_ids
    ]
    execute_batch(planned_tasks, n_cores=n_cores)
[docs]
def pull_simulations(
    model: Model,
    follow: bool = False,
    poll_interval: float = 10.0,
    *,
    skip_existing: bool | None = None,
) -> None:
    """Drain the work queue through atomic claims.

    Repeatedly claims one simulation from ``model.queue``, plans and executes
    it, and records the result on the claim. When no claim is available the
    function either returns (``follow=False``) or sleeps and polls again
    (``follow=True``), doubling the sleep after each consecutive empty poll
    up to a ceiling and resetting it as soon as a claim is obtained.

    Args:
        model: Model whose ``queue`` attribute supplies the claims.
        follow: If true, keep polling after the queue is empty instead of
            returning; the loop then never returns on its own.
        poll_interval: Base idle sleep in seconds (floored at 0.1). The
            back-off ceiling is ``min(60, 8 * poll_interval)`` but never
            lower than the base sleep itself.
        skip_existing: Forwarded unchanged to ``plan_simulation_task``.

    Raises:
        ConfigValidationError: If ``model.queue`` is ``None``.
    """
    queue = model.queue
    if queue is None:
        raise ConfigValidationError(
            "Pull-mode workers require a Postgres work queue. "
            "Configure it via PYSTILT_DB_URL."
        )

    base_sleep = max(poll_interval, 0.1)
    # Cap the back-off at 60 s, but never below the base interval: otherwise a
    # poll_interval above 60 s would make "backing off" poll *more* often than
    # the caller asked for.
    max_idle_sleep = max(base_sleep, min(60.0, poll_interval * 8))
    idle_sleep = base_sleep

    while True:
        with queue.claim_one() as claim:
            if claim is None:
                if not follow:
                    return
                time.sleep(idle_sleep)
                idle_sleep = min(idle_sleep * 2.0, max_idle_sleep)
                continue

            # Work was found: restart the back-off from the base interval.
            idle_sleep = base_sleep
            result = execute_task(
                plan_simulation_task(
                    model,
                    claim.sim_id,
                    skip_existing=skip_existing,
                )
            )
            claim.record(result)