Skip to content

Scheduler API Reference

Auto-generated API documentation for the scheduler module.

scheduler

Zero-dependency in-process task scheduler with cron support.

Part of zerodep: https://github.com/Oaklight/zerodep Copyright (c) 2026 Peng Ding. MIT License.

Provides a lightweight, pure-Python task scheduler that runs in a background daemon thread. Supports cron expressions, fixed-interval scheduling, one-shot tasks, per-job callbacks, and async jobs.

Quick start::

from scheduler import Scheduler, every

sched = Scheduler()

@sched.scheduled_job(every(10, "seconds"))
def heartbeat():
    print("alive")

sched.start()

Cron usage::

from scheduler import Scheduler, cron

sched = Scheduler()
sched.add_job(cleanup, cron("0 3 * * *"))  # daily at 03:00
sched.start()

Context-manager usage::

with Scheduler() as sched:
    sched.add_job(tick, every(1, "seconds"))
    time.sleep(5)

Requires Python 3.10+.

SchedulerError

Bases: Exception

Base exception for scheduler errors.

Source code in scheduler/scheduler.py
class SchedulerError(Exception):
    """Base exception for scheduler errors.

    The scheduler-specific exceptions in this module (for example
    ``JobNotFound`` and ``InvalidCronExpression``) derive from this class,
    so ``except SchedulerError`` catches all of them.
    """

SchedulerAlreadyRunning

Bases: SchedulerError

Raised when start() is called on an already-running scheduler.

Source code in scheduler/scheduler.py
class SchedulerAlreadyRunning(SchedulerError):
    """Raised when start() is called on an already-running scheduler.

    ``Scheduler.start`` raises this when its running flag is already set.
    """

SchedulerNotRunning

Bases: SchedulerError

Raised when an operation requires a running scheduler.

Source code in scheduler/scheduler.py
class SchedulerNotRunning(SchedulerError):
    """Raised when an operation requires a running scheduler.

    NOTE(review): no raise site for this exception is visible in this part
    of the module -- confirm where it is used.
    """

JobNotFound

Bases: SchedulerError

Raised when a job ID is not found.

Source code in scheduler/scheduler.py
class JobNotFound(SchedulerError):
    """Signal that no job is registered under the requested ID.

    Attributes:
        job_id: The ID that could not be resolved.
    """

    def __init__(self, job_id: str) -> None:
        message = f"Job not found: {job_id!r}"
        super().__init__(message)
        # Keep the raw ID so handlers need not parse it out of the message.
        self.job_id = job_id

InvalidCronExpression

Bases: SchedulerError

Raised when a cron expression cannot be parsed.

Source code in scheduler/scheduler.py
class InvalidCronExpression(SchedulerError):
    """Signal that a cron expression could not be parsed.

    Attributes:
        expr: The offending expression, exactly as supplied.
    """

    def __init__(self, expr: str, reason: str = "") -> None:
        headline = f"Invalid cron expression: {expr!r}"
        # The parenthesised reason is appended only when one is given.
        suffix = f" ({reason})" if reason else ""
        super().__init__(headline + suffix)
        self.expr = expr

CronSpec dataclass

Parsed 5-field cron specification.

Attributes:

Name Type Description
minutes frozenset[int]

Allowed minute values (0-59).

hours frozenset[int]

Allowed hour values (0-23).

days frozenset[int]

Allowed day-of-month values (1-31).

months frozenset[int]

Allowed month values (1-12).

weekdays frozenset[int]

Allowed day-of-week values (0-6, 0=Sunday).

expression str

The original expression string.

Source code in scheduler/scheduler.py
@dataclasses.dataclass(frozen=True, slots=True)
class CronSpec:
    """Parsed 5-field cron specification.

    Attributes:
        minutes: Allowed minute values (0-59).
        hours: Allowed hour values (0-23).
        days: Allowed day-of-month values (1-31).
        months: Allowed month values (1-12).
        weekdays: Allowed day-of-week values (0-6, 0=Sunday).
        expression: The original expression string.
    """

    minutes: frozenset[int]
    hours: frozenset[int]
    days: frozenset[int]
    months: frozenset[int]
    weekdays: frozenset[int]
    expression: str = ""

    def __repr__(self) -> str:
        return f"CronSpec({self.expression!r})"

IntervalTrigger

Bases: _BaseTrigger

Fire at fixed time intervals.

Parameters:

Name Type Description Default
seconds float

Interval in seconds.

required
start_time datetime | None

When to start (default: now).

None

Example::

IntervalTrigger(seconds=60)  # every minute
Source code in scheduler/scheduler.py
class IntervalTrigger(_BaseTrigger):
    """Fire at fixed time intervals.

    Fire times lie on a grid ``start_time + k * seconds`` (``k >= 0``).

    Args:
        seconds: Interval in seconds; must be greater than zero.
        start_time: Anchor of the interval grid.  When omitted, the
            ``now`` passed to the first ``next_fire_time`` call becomes
            the anchor.

    Raises:
        ValueError: If *seconds* is not positive.

    Example::

        IntervalTrigger(seconds=60)  # every minute
    """

    __slots__ = ("_seconds", "_start_time")

    def __init__(self, seconds: float, *, start_time: datetime | None = None) -> None:
        if seconds <= 0:
            raise ValueError("Interval must be positive")
        self._seconds = seconds
        self._start_time = start_time

    @property
    def seconds(self) -> float:
        """The interval length in seconds."""
        return self._seconds

    def next_fire_time(self, now: datetime) -> datetime:
        """Return the next grid point after *now*.

        If the anchor is still in the future, the anchor itself is
        returned.  Otherwise the result is the first multiple of the
        interval (counted from the anchor) that lies after *now*.

        Args:
            now: Reference time.

        Returns:
            The next fire time.
        """
        if self._start_time is None:
            # Lazily anchor the grid at the first reference time seen.
            self._start_time = now
        if now < self._start_time:
            return self._start_time
        elapsed = (now - self._start_time).total_seconds()
        periods = int(elapsed / self._seconds) + 1
        candidate = self._start_time + timedelta(seconds=periods * self._seconds)
        if candidate <= now:
            # Float division can land just below an exact multiple
            # (e.g. 0.3 / 0.1 == 2.9999999999999996), which would yield a
            # fire time equal to ``now`` and make the job fire twice.
            # Advance one more period in that case.
            candidate = self._start_time + timedelta(
                seconds=(periods + 1) * self._seconds
            )
        return candidate

    def __repr__(self) -> str:
        return f"IntervalTrigger(seconds={self._seconds})"

CronTrigger

Bases: _BaseTrigger

Fire on a cron schedule.

Parameters:

Name Type Description Default
expression str

Standard 5-field cron expression.

required
tz timezone | None

Timezone for evaluation (default: local time via datetime.now()).

None

Example::

CronTrigger("30 9 * * 1-5")  # 9:30 AM weekdays
Source code in scheduler/scheduler.py
class CronTrigger(_BaseTrigger):
    """Trigger driven by a cron expression.

    Args:
        expression: Standard 5-field cron expression; it is parsed once
            at construction time via ``parse_cron``.
        tz: Timezone in which the schedule is evaluated.  When ``None``
            the reference time is used as given (the scheduler supplies
            local time via ``datetime.now()``).

    Example::

        CronTrigger("30 9 * * 1-5")  # 9:30 AM weekdays
    """

    __slots__ = ("_spec", "_tz")

    def __init__(self, expression: str, *, tz: timezone | None = None) -> None:
        self._tz = tz
        self._spec = parse_cron(expression)

    @property
    def spec(self) -> CronSpec:
        """The parsed cron specification."""
        return self._spec

    def next_fire_time(self, now: datetime) -> datetime:
        """Return the next time the cron schedule matches after *now*."""
        # Convert the reference time only when a timezone was configured.
        reference = now if self._tz is None else now.astimezone(self._tz)
        return _cron_next_fire_time(self._spec, reference)

    def __repr__(self) -> str:
        return "CronTrigger({!r})".format(self._spec.expression)

OnceTrigger

Bases: _BaseTrigger

Fire exactly once at a given time.

Parameters:

Name Type Description Default
run_time datetime

When to fire.

required

Example::

OnceTrigger(datetime(2026, 4, 1, 9, 0))
Source code in scheduler/scheduler.py
class OnceTrigger(_BaseTrigger):
    """One-shot trigger that fires at a single point in time.

    Args:
        run_time: The moment at which the job should run.

    Example::

        OnceTrigger(datetime(2026, 4, 1, 9, 0))
    """

    __slots__ = ("_run_time", "_fired")

    def __init__(self, run_time: datetime) -> None:
        self._fired = False
        self._run_time = run_time

    @property
    def run_time(self) -> datetime:
        """The configured fire time."""
        return self._run_time

    def next_fire_time(self, now: datetime) -> datetime | None:
        """Return the fire time while it is still ahead, else ``None``.

        ``None`` is returned once the trigger has been marked as fired
        or when *now* has reached or passed the configured time.
        """
        still_pending = not self._fired and now < self._run_time
        return self._run_time if still_pending else None

    def mark_fired(self) -> None:
        """Record that the trigger has fired so it never fires again."""
        self._fired = True

    def __repr__(self) -> str:
        return "OnceTrigger({!r})".format(self._run_time)

JobStatus

Bases: Enum

Status of a scheduled job.

Source code in scheduler/scheduler.py
class JobStatus(enum.Enum):
    """Status of a scheduled job.

    Each member's value is its own name as a string.
    """

    # Waiting for its next run; the only state the scheduler's due-job
    # scan selects for dispatch.
    pending = "pending"
    # Set by the scheduler for the duration of a job's execution.
    running = "running"
    # Set by ``Scheduler.pause_job``; cleared by ``Scheduler.resume_job``.
    paused = "paused"

EventType

Bases: Enum

Types of scheduler events.

Source code in scheduler/scheduler.py
class EventType(enum.Enum):
    """Types of scheduler events.

    Each member's value is its own name as a string.
    """

    # The job function returned without raising.
    job_executed = "job_executed"
    # The job function raised an exception.
    job_error = "job_error"
    # A due job was skipped because it was later than its misfire grace time.
    job_missed = "job_missed"
    # A job was registered via ``Scheduler.add_job``.
    job_added = "job_added"
    # A job was deleted via ``Scheduler.remove_job``.
    job_removed = "job_removed"

JobEvent dataclass

Event emitted by the scheduler.

Attributes:

Name Type Description
event_type EventType

What happened.

job_id str

The job that triggered the event.

scheduled_time datetime | None

When the job was supposed to run.

run_time datetime | None

When the job actually ran (if applicable).

return_value Any

Return value of the job function (on success).

exception BaseException | None

Exception raised (on error).

Source code in scheduler/scheduler.py
@dataclasses.dataclass
class JobEvent:
    """Event emitted by the scheduler.

    Only ``event_type`` and ``job_id`` are always populated; the other
    fields default to ``None`` and are filled in depending on the event.

    Attributes:
        event_type: What happened.
        job_id: The job that triggered the event.
        scheduled_time: When the job was supposed to run.
        run_time: When the job actually ran (if applicable).
        return_value: Return value of the job function (on success).
        exception: Exception raised (on error).
    """

    event_type: EventType
    job_id: str
    # Set on executed / error / missed events.
    scheduled_time: datetime | None = None
    # Set on executed / error events.
    run_time: datetime | None = None
    # Set on executed events only.
    return_value: Any = None
    # Set on error events only.
    exception: BaseException | None = None

Job dataclass

A scheduled job.

Attributes:

Name Type Description
id str

Unique job identifier.

name str

Human-readable name.

fn Callable[..., Any]

The callable to invoke.

trigger _BaseTrigger

When to invoke the callable.

args tuple[Any, ...]

Positional arguments for fn.

kwargs dict[str, Any]

Keyword arguments for fn.

next_run_time datetime | None

Next scheduled execution time.

status JobStatus

Current job status.

misfire_grace_time float

Seconds after next_run_time within which a late execution is still accepted.

on_success Callable[[Any], None] | None

Callback invoked with the return value on success.

on_error Callable[[BaseException], None] | None

Callback invoked with the exception on failure.

Source code in scheduler/scheduler.py
@dataclasses.dataclass
class Job:
    """Record describing one job registered with the scheduler.

    Attributes:
        id: Identifier under which the job is stored.
        name: Human-readable label.
        fn: The callable that is invoked when the job fires.
        trigger: Decides when the job fires.
        args: Positional arguments passed to *fn*.
        kwargs: Keyword arguments passed to *fn*.
        next_run_time: When the job is next due, or ``None`` if it is
            not scheduled to run again.
        status: Current job status.
        misfire_grace_time: How many seconds past ``next_run_time`` a
            late run is still accepted.
        on_success: Optional callback receiving the return value of a
            successful run.
        on_error: Optional callback receiving the exception of a failed
            run.
    """

    id: str
    name: str
    fn: Callable[..., Any]
    trigger: _BaseTrigger
    args: tuple[Any, ...] = ()
    kwargs: dict[str, Any] = dataclasses.field(default_factory=dict)
    next_run_time: datetime | None = None
    status: JobStatus = JobStatus.pending
    misfire_grace_time: float = DEFAULT_MISFIRE_GRACE_TIME
    on_success: Callable[[Any], None] | None = None
    on_error: Callable[[BaseException], None] | None = None

    def __repr__(self) -> str:
        # Compact summary; omits the callable, its arguments and callbacks.
        pieces = (
            f"<Job id={self.id!r} name={self.name!r} ",
            f"trigger={self.trigger!r} status={self.status.value!r}>",
        )
        return "".join(pieces)

Scheduler

In-process task scheduler with background thread execution.

Jobs are checked and dispatched by a daemon thread that wakes at tick_interval resolution. Supports sync and async callables.

Concurrency Model

The scheduler runs a single background daemon thread that periodically checks for due jobs via _run_loop.

  • Sync jobs: executed directly in the scheduler thread. This means long-running sync jobs block the scheduler tick.
  • Async jobs: executed via a temporary asyncio event loop created per invocation (asyncio.new_event_loop()), then closed in a finally block. The scheduler thread blocks until the coroutine completes.

Threading guarantees:

  • self._jobs dict access is protected by self._lock.
  • _get_due_jobs() snapshots due jobs under the lock.
  • Job execution (_execute_job) runs outside the lock.
  • Job status transitions (pending → running → pending) are performed under the lock. _execute_job atomically checks and sets status = running, preventing concurrent double-execution of the same job.
  • _reschedule mutates next_run_time under the lock.
  • _process_job reads next_run_time under the lock.
  • Listener dispatch (_emit) runs outside the lock, so listeners may observe intermediate job states.

Shutdown semantics:

  • self._running is set to False and self._event is signaled.
  • The background thread finishes its current tick iteration (including any in-flight job) before exiting the loop.
  • _thread.join() blocks the caller until the thread exits (unless wait=False).
  • There is no pre-emption: a running sync job will complete; a running async job's event loop will run to completion.
  • Listeners receive events for jobs that complete during the final tick, but no synthetic "shutdown" event is emitted.

Parameters:

Name Type Description Default
tick_interval float

How often (seconds) the scheduler checks for due jobs. Lower values give better timing accuracy at the cost of CPU.

DEFAULT_TICK_INTERVAL
daemon bool

If True (default), the background thread is a daemon and will not prevent process exit.

True

Example::

sched = Scheduler()
sched.add_job(my_func, every(10, "seconds"))
sched.start()
# ... later ...
sched.shutdown()
Source code in scheduler/scheduler.py
 751
 752
 753
 754
 755
 756
 757
 758
 759
 760
 761
 762
 763
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
class Scheduler:
    """In-process task scheduler with background thread execution.

    Jobs are checked and dispatched by a daemon thread that wakes
    at ``tick_interval`` resolution.  Supports sync and async callables.

    Concurrency Model:
        The scheduler runs a single background daemon thread that
        periodically checks for due jobs via ``_run_loop``.

        - **Sync jobs**: executed directly in the scheduler thread.
          This means long-running sync jobs block the scheduler tick.
        - **Async jobs**: executed via a temporary ``asyncio`` event loop
          created per invocation (``asyncio.new_event_loop()``), then
          closed in a ``finally`` block.  The scheduler thread blocks
          until the coroutine completes.

        Threading guarantees:
        - ``self._jobs`` dict access is protected by ``self._lock``.
        - ``_get_due_jobs()`` snapshots due jobs under the lock.
        - Job execution (``_execute_job``) runs **outside** the lock.
        - Job status transitions are performed under the lock.
          ``_execute_job`` atomically checks and sets
          ``status = running``, preventing concurrent double-execution
          of the same job, and afterwards restores the status the job
          had before the run (``pending`` for scheduled runs; a job
          that was ``paused`` when run manually stays ``paused``).
        - ``_reschedule`` mutates ``next_run_time`` under the lock.
        - ``_process_job`` reads ``next_run_time`` under the lock.
        - Listener dispatch (``_emit``) runs outside the lock, so
          listeners may observe intermediate job states.  Dispatch
          iterates over a snapshot of the listener list, so listeners
          may add or remove listeners while an event is delivered.

        Shutdown semantics:
        - ``self._running`` set to ``False``; ``self._event`` signaled.
        - Background thread finishes its current tick iteration
          (including any in-flight job) before exiting the loop.
        - ``_thread.join()`` blocks the caller until the thread exits
          (unless ``wait=False``).
        - There is no pre-emption: a running sync job will complete;
          a running async job's event loop will run to completion.
        - Listeners receive events for jobs that complete during the
          final tick, but no synthetic "shutdown" event is emitted.

    Args:
        tick_interval: How often (seconds) the scheduler checks for
            due jobs.  Lower values give better timing accuracy at the
            cost of CPU.
        daemon: If ``True`` (default), the background thread is a daemon
            and will not prevent process exit.

    Example::

        sched = Scheduler()
        sched.add_job(my_func, every(10, "seconds"))
        sched.start()
        # ... later ...
        sched.shutdown()
    """

    def __init__(
        self,
        *,
        tick_interval: float = DEFAULT_TICK_INTERVAL,
        daemon: bool = True,
    ) -> None:
        self._tick_interval = tick_interval
        self._daemon = daemon
        self._jobs: dict[str, Job] = {}
        self._lock = threading.Lock()
        # Wakes the run loop early: set by shutdown() and wakeup().
        self._event = threading.Event()
        self._thread: threading.Thread | None = None
        self._running = False
        self._listeners: list[Callable[[JobEvent], None]] = []

    # ── Context manager ──

    def __enter__(self) -> Scheduler:
        """Start the scheduler and return it."""
        self.start()
        return self

    def __exit__(self, *args: Any) -> None:
        """Shut the scheduler down, waiting for the background thread."""
        self.shutdown()

    # ── Job management ──

    def add_job(
        self,
        fn: Callable[..., Any],
        trigger: _BaseTrigger,
        *,
        id: str | None = None,
        name: str | None = None,
        args: tuple[Any, ...] = (),
        kwargs: dict[str, Any] | None = None,
        misfire_grace_time: float = DEFAULT_MISFIRE_GRACE_TIME,
        on_success: Callable[[Any], None] | None = None,
        on_error: Callable[[BaseException], None] | None = None,
    ) -> Job:
        """Add a job to the scheduler.

        The job's first ``next_run_time`` is computed from *trigger*
        relative to the current local time.  A job already registered
        under the same ID is replaced.  A ``job_added`` event is emitted
        after the job has been stored.

        Args:
            fn: Callable to invoke.
            trigger: Trigger that determines when the job fires.
            id: Unique ID (auto-generated if omitted).
            name: Human-readable name (defaults to ``fn.__name__``).
            args: Positional arguments for *fn*.
            kwargs: Keyword arguments for *fn*.
            misfire_grace_time: Seconds of late execution tolerance.
            on_success: Called with the return value on success.
            on_error: Called with the exception on failure.

        Returns:
            The created ``Job``.
        """
        # Auto-generated IDs are the first 8 characters of a UUID4.
        job_id = id or str(uuid.uuid4())[:8]
        job_name = name or getattr(fn, "__name__", repr(fn))
        now = datetime.now()

        job = Job(
            id=job_id,
            name=job_name,
            fn=fn,
            trigger=trigger,
            args=args,
            kwargs=kwargs if kwargs is not None else {},
            next_run_time=trigger.next_fire_time(now),
            misfire_grace_time=misfire_grace_time,
            on_success=on_success,
            on_error=on_error,
        )

        with self._lock:
            self._jobs[job_id] = job

        self._emit(JobEvent(EventType.job_added, job_id))
        return job

    def scheduled_job(
        self,
        trigger: _BaseTrigger,
        **kwargs: Any,
    ) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
        """Decorator to register a function as a scheduled job.

        Args:
            trigger: Trigger for the job.
            **kwargs: Passed to ``add_job``.

        Returns:
            Decorator that registers the function and returns it unchanged.

        Example::

            @sched.scheduled_job(every(60, "seconds"))
            def periodic_task():
                ...
        """

        def decorator(fn: Callable[..., Any]) -> Callable[..., Any]:
            self.add_job(fn, trigger, **kwargs)
            return fn

        return decorator

    def remove_job(self, job_id: str) -> None:
        """Remove a job by ID and emit a ``job_removed`` event.

        Args:
            job_id: The job to remove.

        Raises:
            JobNotFound: If *job_id* doesn't exist.
        """
        with self._lock:
            if job_id not in self._jobs:
                raise JobNotFound(job_id)
            del self._jobs[job_id]
        self._emit(JobEvent(EventType.job_removed, job_id))

    def get_job(self, job_id: str) -> Job | None:
        """Get a job by ID, or ``None`` if not found."""
        with self._lock:
            return self._jobs.get(job_id)

    def get_jobs(self) -> list[Job]:
        """Return a list of all jobs."""
        with self._lock:
            return list(self._jobs.values())

    def pause_job(self, job_id: str) -> None:
        """Pause a job so it won't fire until resumed.

        Args:
            job_id: The job to pause.

        Raises:
            JobNotFound: If *job_id* doesn't exist.
        """
        with self._lock:
            job = self._jobs.get(job_id)
            if job is None:
                raise JobNotFound(job_id)
            job.status = JobStatus.paused

    def resume_job(self, job_id: str) -> None:
        """Resume a paused job.

        The job becomes ``pending`` again and its ``next_run_time`` is
        recomputed from the trigger relative to the current time.

        Args:
            job_id: The job to resume.

        Raises:
            JobNotFound: If *job_id* doesn't exist.
        """
        with self._lock:
            job = self._jobs.get(job_id)
            if job is None:
                raise JobNotFound(job_id)
            job.status = JobStatus.pending
            job.next_run_time = job.trigger.next_fire_time(datetime.now())

    def run_job(self, job_id: str) -> Any:
        """Execute a job immediately, bypassing its trigger.

        This does **not** update ``next_run_time`` — the trigger's
        schedule continues independently.  The job's status is restored
        afterwards, so running a paused job leaves it paused.

        Double-execution is prevented by the status guard in
        ``_execute_job``, which atomically checks and sets
        ``job.status = running`` under the lock.

        Args:
            job_id: The job to run.

        Returns:
            The return value of the job function, or ``None`` if the
            job is already running.

        Raises:
            JobNotFound: If *job_id* doesn't exist.
            Exception: Whatever the job function raises is re-raised
                after ``job_error`` handling.
        """
        with self._lock:
            job = self._jobs.get(job_id)
            if job is None:
                raise JobNotFound(job_id)

        return self._execute_job(job, datetime.now())

    # ── Event system (listeners, emission) ──

    def add_listener(self, callback: Callable[[JobEvent], None]) -> None:
        """Register a global event listener.

        Args:
            callback: Called with a ``JobEvent`` whenever an event occurs.
        """
        self._listeners.append(callback)

    def remove_listener(self, callback: Callable[[JobEvent], None]) -> None:
        """Unregister a global event listener.

        Raises:
            ValueError: If *callback* is not currently registered.
        """
        self._listeners.remove(callback)

    def _emit(self, event: JobEvent) -> None:
        """Dispatch an event to all listeners.

        Runs outside the lock, so listeners may observe intermediate
        job states (e.g., ``status=running``).
        """
        # Iterate over a snapshot: a listener that adds or removes
        # listeners during dispatch would otherwise shift the live list
        # under the loop and cause other listeners to be skipped.
        # Note: self._listeners itself is mutated without locking; this
        # relies on list operations being atomic under CPython's GIL.
        for listener in tuple(self._listeners):
            # Tier 2: best-effort observable — listener errors logged
            try:
                listener(event)
            except Exception:
                logger.exception("Error in event listener")

    # ── Lifecycle (start, main loop, shutdown) ──

    @property
    def running(self) -> bool:
        """Whether the scheduler is currently running."""
        return self._running

    def start(self) -> None:
        """Start the background scheduler thread.

        Raises:
            SchedulerAlreadyRunning: If already started.
        """
        if self._running:
            raise SchedulerAlreadyRunning("Scheduler is already running")
        self._running = True
        self._event.clear()
        self._thread = threading.Thread(
            target=self._run_loop, name="zerodep-scheduler", daemon=self._daemon
        )
        self._thread.start()

    def shutdown(self, wait: bool = True) -> None:
        """Stop the scheduler.

        Args:
            wait: If ``True``, block until the background thread exits.

        Shutdown behavior:
        - Currently running jobs: the scheduler thread finishes the
          current ``_process_job`` call (including ``_execute_job``)
          before checking ``self._running`` again.  There is no
          pre-emption — sync jobs run to completion, and async jobs'
          event loops run to completion.
        - Listener events: listeners receive ``job_executed`` /
          ``job_error`` events for any job that completes during the
          final tick.  No synthetic "scheduler_stopped" event is emitted.
        - Async job cleanup: the per-invocation event loop is always
          closed in ``_run_async_job``'s ``finally`` block, regardless
          of shutdown timing.
        """
        # Signal the run loop to stop after its current iteration
        self._running = False
        # Wake the thread from Event.wait() so it exits promptly
        self._event.set()
        if wait and self._thread is not None and self._thread.is_alive():
            self._thread.join()
        self._thread = None

    def _run_loop(self) -> None:
        """Background thread main loop.

        Each iteration dispatches the jobs that are due, then sleeps
        until the next tick or until the event is set (by ``wakeup`` or
        ``shutdown``), whichever comes first.

        Note: ``self._running`` is read without a lock.  This is safe
        because it is a simple boolean flag written by the main thread
        (in ``shutdown``) and read here — worst case is one extra tick.
        """
        while self._running:
            now = datetime.now()
            for job in self._get_due_jobs(now):
                self._process_job(job, now)
            if self._event.wait(timeout=self._tick_interval):
                # Woken explicitly.  Consume the signal so later waits
                # sleep for the full tick again; without this a single
                # wakeup() would leave the event set and turn the loop
                # into a busy spin.  A shutdown is still honored because
                # it clears ``_running`` before setting the event.
                self._event.clear()

    def _get_due_jobs(self, now: datetime) -> list[Job]:
        """Return jobs whose next_run_time <= now and are not paused."""
        with self._lock:
            return [
                job
                for job in self._jobs.values()
                if (
                    job.status == JobStatus.pending
                    and job.next_run_time is not None
                    and job.next_run_time <= now
                )
            ]

    def _process_job(self, job: Job, now: datetime) -> None:
        """Check misfire and execute or skip a due job.

        A job that is later than its ``misfire_grace_time`` is skipped
        with a ``job_missed`` event; otherwise it is executed.  Either
        way the job is rescheduled afterwards.

        Note: *job* was snapshotted from ``_get_due_jobs`` under lock,
        but is processed here without the lock held.  This is intentional
        to avoid holding the lock during potentially long job execution.
        """
        with self._lock:
            scheduled = job.next_run_time
        if scheduled is None:
            return

        late = (now - scheduled).total_seconds()
        if late > job.misfire_grace_time:
            # Missed — skip and reschedule
            self._emit(JobEvent(EventType.job_missed, job.id, scheduled_time=scheduled))
            logger.debug(
                "Job %r missed (late by %.1fs > grace %.1fs)",
                job.id,
                late,
                job.misfire_grace_time,
            )
        else:
            try:
                self._execute_job(job, now)
            except Exception:
                # Deliberately swallowed: _execute_job has already emitted
                # job_error and invoked on_error, and one failing job must
                # not stop the scheduler thread.
                pass

        # Reschedule
        self._reschedule(job, now)

    def _execute_job(self, job: Job, now: datetime) -> Any:
        """Run a job function, handling sync and async callables.

        Acquires the lock to atomically check and set job status to
        ``running``.  If the job is already running (concurrent
        ``run_job`` or scheduler execution), the call is skipped.
        The lock is released before job execution and re-acquired
        briefly in the ``finally`` block to restore the status the job
        had before this run.

        Args:
            job: The job to run.
            now: Timestamp reported as ``run_time`` in emitted events.

        Returns:
            The job function's return value, or ``None`` when the run
            was skipped because the job was already running.

        Raises:
            Exception: Re-raises whatever the job function raised, after
                emitting ``job_error`` and calling ``on_error``.
        """
        with self._lock:
            if job.status == JobStatus.running:
                logger.debug("Job %r already running, skipping", job.id)
                return None
            # Remember the prior status so that a manual run of a paused
            # job does not silently resume it afterwards.
            previous_status = job.status
            job.status = JobStatus.running
            scheduled = job.next_run_time

        try:
            if inspect.iscoroutinefunction(job.fn):
                result = self._run_async_job(job)
            else:
                result = job.fn(*job.args, **job.kwargs)

            self._emit(
                JobEvent(
                    EventType.job_executed,
                    job.id,
                    scheduled_time=scheduled,
                    run_time=now,
                    return_value=result,
                )
            )
            if job.on_success is not None:
                # Tier 2: best-effort observable — callback error logged
                try:
                    job.on_success(result)
                except Exception:
                    logger.exception("Error in on_success callback for job %r", job.id)

            return result

        except Exception as exc:
            self._emit(
                JobEvent(
                    EventType.job_error,
                    job.id,
                    scheduled_time=scheduled,
                    run_time=now,
                    exception=exc,
                )
            )
            if job.on_error is not None:
                # Tier 2: best-effort observable — callback error logged
                try:
                    job.on_error(exc)
                except Exception:
                    logger.exception("Error in on_error callback for job %r", job.id)
            raise

        finally:
            # Tier 1: must-succeed — job status must reset.  Only touch
            # it if nobody changed it meanwhile (e.g. pause_job during
            # the run must win).
            with self._lock:
                if job.status == JobStatus.running:
                    job.status = previous_status

    def _run_async_job(self, job: Job) -> Any:
        """Execute an async job function in a new event loop.

        A fresh event loop is created per invocation so that the
        scheduler thread (which has no running loop) can drive async
        jobs.  The loop is always closed in the ``finally`` block,
        ensuring no resource leak even on exception.
        """
        loop = asyncio.new_event_loop()
        try:
            return loop.run_until_complete(job.fn(*job.args, **job.kwargs))
        finally:
            # Tier 1: must-succeed — event loop must close
            loop.close()

    def _reschedule(self, job: Job, now: datetime) -> None:
        """Update a job's next_run_time from its trigger.

        Called from the scheduler thread after job execution.
        Trigger computation runs outside the lock; only the state
        mutation is protected.
        """
        if isinstance(job.trigger, OnceTrigger):
            # One-shot jobs never run again: mark the trigger as fired
            # and clear the schedule.
            with self._lock:
                job.trigger.mark_fired()
                job.next_run_time = None
            return

        nxt = job.trigger.next_fire_time(now)
        with self._lock:
            job.next_run_time = nxt

    # ── Wakeup on job add ──

    def wakeup(self) -> None:
        """Signal the scheduler thread to check for due jobs immediately.

        The signal is consumed by the run loop, so a single call causes
        at most one early check.
        """
        self._event.set()

running property

Whether the scheduler is currently running.

add_job(fn, trigger, *, id=None, name=None, args=(), kwargs=None, misfire_grace_time=DEFAULT_MISFIRE_GRACE_TIME, on_success=None, on_error=None)

Add a job to the scheduler.

Parameters:

Name Type Description Default
fn Callable[..., Any]

Callable to invoke.

required
trigger _BaseTrigger

Trigger that determines when the job fires.

required
id str | None

Unique ID (auto-generated if omitted).

None
name str | None

Human-readable name (defaults to fn.__name__).

None
args tuple[Any, ...]

Positional arguments for fn.

()
kwargs dict[str, Any] | None

Keyword arguments for fn.

None
misfire_grace_time float

Seconds of late execution tolerance.

DEFAULT_MISFIRE_GRACE_TIME
on_success Callable[[Any], None] | None

Called with the return value on success.

None
on_error Callable[[BaseException], None] | None

Called with the exception on failure.

None

Returns:

Type Description
Job

The created Job.

Source code in scheduler/scheduler.py
def add_job(
    self,
    fn: Callable[..., Any],
    trigger: _BaseTrigger,
    *,
    id: str | None = None,
    name: str | None = None,
    args: tuple[Any, ...] = (),
    kwargs: dict[str, Any] | None = None,
    misfire_grace_time: float = DEFAULT_MISFIRE_GRACE_TIME,
    on_success: Callable[[Any], None] | None = None,
    on_error: Callable[[BaseException], None] | None = None,
) -> Job:
    """Register a new job with the scheduler.

    The job's first ``next_run_time`` is taken from the trigger,
    evaluated against the current local time.  The job is stored under
    its ID and a ``job_added`` event is emitted afterwards.

    Args:
        fn: Callable to invoke.
        trigger: Trigger that determines when the job fires.
        id: Unique ID; a short UUID-derived ID is generated when omitted
            or empty.
        name: Human-readable name (defaults to ``fn.__name__``, or
            ``repr(fn)`` when the callable has no ``__name__``).
        args: Positional arguments for *fn*.
        kwargs: Keyword arguments for *fn* (an empty dict when ``None``).
        misfire_grace_time: Seconds of late execution tolerance.
        on_success: Called with the return value on success.
        on_error: Called with the exception on failure.

    Returns:
        The created ``Job``.
    """
    job_id = id if id else str(uuid.uuid4())[:8]
    if name:
        job_name = name
    else:
        job_name = getattr(fn, "__name__", repr(fn))
    created_at = datetime.now()
    call_kwargs = {} if kwargs is None else kwargs
    first_run = trigger.next_fire_time(created_at)

    new_job = Job(
        id=job_id,
        name=job_name,
        fn=fn,
        trigger=trigger,
        args=args,
        kwargs=call_kwargs,
        next_run_time=first_run,
        misfire_grace_time=misfire_grace_time,
        on_success=on_success,
        on_error=on_error,
    )

    # Only the registry mutation needs the lock; the event goes out after.
    with self._lock:
        self._jobs[job_id] = new_job

    added = JobEvent(EventType.job_added, job_id)
    self._emit(added)
    return new_job

scheduled_job(trigger, **kwargs)

Decorator to register a function as a scheduled job.

Parameters:

Name Type Description Default
trigger _BaseTrigger

Trigger for the job.

required
**kwargs Any

Passed to add_job.

{}

Returns:

Type Description
Callable[[Callable[..., Any]], Callable[..., Any]]

Decorator that registers the function and returns it unchanged.

Example::

@sched.scheduled_job(every(60, "seconds"))
def periodic_task():
    ...
Source code in scheduler/scheduler.py
def scheduled_job(
    self,
    trigger: _BaseTrigger,
    **kwargs: Any,
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Build a decorator that schedules the decorated function.

    Registration happens when the decorator is applied, not when
    ``scheduled_job`` itself is called.

    Args:
        trigger: Trigger for the job.
        **kwargs: Passed to ``add_job``.

    Returns:
        Decorator that registers the function and returns it unchanged.

    Example::

        @sched.scheduled_job(every(60, "seconds"))
        def periodic_task():
            ...
    """

    def register(func: Callable[..., Any]) -> Callable[..., Any]:
        # Hand the function to add_job, then give it back untouched so the
        # decorated name still refers to the plain callable.
        self.add_job(func, trigger, **kwargs)
        return func

    return register

remove_job(job_id)

Remove a job by ID.

Parameters:

Name Type Description Default
job_id str

The job to remove.

required

Raises:

Type Description
JobNotFound

If job_id doesn't exist.

Source code in scheduler/scheduler.py
def remove_job(self, job_id: str) -> None:
    """Delete the job registered under *job_id*.

    A ``job_removed`` event is emitted after the job has been dropped
    from the registry.

    Args:
        job_id: The job to remove.

    Raises:
        JobNotFound: If *job_id* doesn't exist.
    """
    with self._lock:
        present = job_id in self._jobs
        if present:
            del self._jobs[job_id]
    if not present:
        raise JobNotFound(job_id)
    removed = JobEvent(EventType.job_removed, job_id)
    self._emit(removed)

get_job(job_id)

Get a job by ID, or None if not found.

Source code in scheduler/scheduler.py
def get_job(self, job_id: str) -> Job | None:
    """Look up a job by ID.

    Returns:
        The matching ``Job``, or ``None`` if no job has that ID.
    """
    with self._lock:
        found = self._jobs.get(job_id)
    return found

get_jobs()

Return a list of all jobs.

Source code in scheduler/scheduler.py
def get_jobs(self) -> list[Job]:
    """Return a snapshot list of every registered job.

    The list is a fresh copy taken under the lock, so mutating it does
    not affect the scheduler's registry.
    """
    with self._lock:
        snapshot = [*self._jobs.values()]
    return snapshot

pause_job(job_id)

Pause a job so it won't fire until resumed.

Parameters:

Name Type Description Default
job_id str

The job to pause.

required

Raises:

Type Description
JobNotFound

If job_id doesn't exist.

Source code in scheduler/scheduler.py
def pause_job(self, job_id: str) -> None:
    """Mark a job as paused so it won't fire until resumed.

    Args:
        job_id: The job to pause.

    Raises:
        JobNotFound: If *job_id* doesn't exist.
    """
    with self._lock:
        target = self._jobs.get(job_id)
        if target is not None:
            target.status = JobStatus.paused
            return
    raise JobNotFound(job_id)

resume_job(job_id)

Resume a paused job.

Parameters:

Name Type Description Default
job_id str

The job to resume.

required

Raises:

Type Description
JobNotFound

If job_id doesn't exist.

Source code in scheduler/scheduler.py
def resume_job(self, job_id: str) -> None:
    """Resume a paused job.

    The job's ``next_run_time`` is recomputed from its trigger relative
    to the current time.  A job whose status is ``running`` keeps that
    status: the execution path resets it to ``pending`` once the run
    finishes, and overwriting it here would defeat the status guard
    that prevents the same job from executing twice at once.

    Args:
        job_id: The job to resume.

    Raises:
        JobNotFound: If *job_id* doesn't exist.
    """
    with self._lock:
        job = self._jobs.get(job_id)
        if job is None:
            raise JobNotFound(job_id)
        # Flipping an in-flight job back to pending would let a second
        # execution start while the first one is still running.
        if job.status != JobStatus.running:
            job.status = JobStatus.pending
        job.next_run_time = job.trigger.next_fire_time(datetime.now())

run_job(job_id)

Execute a job immediately, bypassing its trigger.

This does not update next_run_time — the trigger's schedule continues independently.

Double-execution is prevented by the status guard in _execute_job, which atomically checks and sets job.status = running under the lock.

Parameters:

Name Type Description Default
job_id str

The job to run.

required

Returns:

Type Description
Any

The return value of the job function, or None if the job is already running.

Raises:

Type Description
JobNotFound

If job_id doesn't exist.

Source code in scheduler/scheduler.py
def run_job(self, job_id: str) -> Any:
    """Run a job right now, regardless of its trigger.

    ``next_run_time`` is **not** touched — the trigger's schedule
    continues independently.

    Double-execution is prevented by the status guard in
    ``_execute_job``, which atomically checks and sets
    ``job.status = running`` under the lock.

    Args:
        job_id: The job to run.

    Returns:
        The return value of the job function, or ``None`` if the
            job is already running.

    Raises:
        JobNotFound: If *job_id* doesn't exist.
    """
    # Hold the lock only for the lookup; the job itself runs outside it.
    with self._lock:
        job = self._jobs.get(job_id)
    if job is None:
        raise JobNotFound(job_id)

    started_at = datetime.now()
    return self._execute_job(job, started_at)

add_listener(callback)

Register a global event listener.

Parameters:

Name Type Description Default
callback Callable[[JobEvent], None]

Called with a JobEvent whenever an event occurs.

required
Source code in scheduler/scheduler.py
def add_listener(self, callback: Callable[[JobEvent], None]) -> None:
    """Subscribe a callback to all scheduler events.

    Args:
        callback: Called with a ``JobEvent`` whenever an event occurs.
    """
    listeners = self._listeners
    listeners.append(callback)

remove_listener(callback)

Unregister a global event listener.

Source code in scheduler/scheduler.py
def remove_listener(self, callback: Callable[[JobEvent], None]) -> None:
    """Unsubscribe a previously registered global event listener.

    Args:
        callback: The listener to drop.  NOTE(review): assuming
            ``_listeners`` is a plain list, removing a callback that was
            never registered raises ``ValueError``.
    """
    listeners = self._listeners
    listeners.remove(callback)

start()

Start the background scheduler thread.

Raises:

Type Description
SchedulerAlreadyRunning

If already started.

Source code in scheduler/scheduler.py
def start(self) -> None:
    """Start the background scheduler thread.

    The running flag is raised and the wake-up event cleared before the
    thread is created.  If creating or starting the thread fails, the
    flag and thread reference are rolled back so the scheduler is not
    left marked as running with no thread behind it, and the original
    exception propagates.

    Raises:
        SchedulerAlreadyRunning: If already started.
    """
    if self._running:
        raise SchedulerAlreadyRunning("Scheduler is already running")
    self._running = True
    self._event.clear()
    try:
        self._thread = threading.Thread(
            target=self._run_loop, name="zerodep-scheduler", daemon=self._daemon
        )
        self._thread.start()
    except Exception:
        # Roll back: without this a failed start would make every later
        # start() raise SchedulerAlreadyRunning although nothing runs.
        self._running = False
        self._thread = None
        raise

shutdown(wait=True)

Stop the scheduler.

Parameters:

Name Type Description Default
wait bool

If True, block until the background thread exits.

True

Shutdown behavior:

- Currently running jobs: the scheduler thread finishes the current _process_job call (including _execute_job) before checking self._running again. There is no pre-emption — sync jobs run to completion, and async jobs' event loops run to completion.
- Listener events: listeners receive job_executed / job_error events for any job that completes during the final tick. No synthetic "scheduler_stopped" event is emitted.
- Async job cleanup: the per-invocation event loop is always closed in _run_async_job's finally block, regardless of shutdown timing.

Source code in scheduler/scheduler.py
def shutdown(self, wait: bool = True) -> None:
    """Stop the scheduler.

    Args:
        wait: If ``True``, block until the background thread exits.
            The join is skipped when ``shutdown`` is called from the
            scheduler thread itself (for example from inside a job),
            because a thread cannot join itself; the run loop then
            stops after its current iteration.

    Shutdown behavior:

    - Currently running jobs: the scheduler thread finishes the
      current ``_process_job`` call (including ``_execute_job``)
      before checking ``self._running`` again.  There is no
      pre-emption — sync jobs run to completion, and async jobs'
      event loops run to completion.
    - Listener events: listeners receive ``job_executed`` /
      ``job_error`` events for any job that completes during the
      final tick.  No synthetic "scheduler_stopped" event is emitted.
    - Async job cleanup: the per-invocation event loop is always
      closed in ``_run_async_job``'s ``finally`` block, regardless
      of shutdown timing.
    """
    # Signal the run loop to stop after its current iteration
    self._running = False
    # Wake the thread from Event.wait() so it exits promptly
    self._event.set()
    thread = self._thread
    if (
        wait
        and thread is not None
        # Joining the current thread raises RuntimeError, so never wait
        # on ourselves.
        and thread is not threading.current_thread()
        and thread.is_alive()
    ):
        thread.join()
    self._thread = None

wakeup()

Signal the scheduler thread to check for due jobs immediately.

Source code in scheduler/scheduler.py
def wakeup(self) -> None:
    """Signal the scheduler thread to check for due jobs immediately.

    Only sets the scheduler's internal event flag; no job state is
    touched.
    """
    self._event.set()

parse_cron(expr)

Parse a 5-field cron expression.

Supports standard cron syntax::

┌───────────── minute (0-59)
│ ┌───────────── hour (0-23)
│ │ ┌───────────── day of month (1-31)
│ │ │ ┌───────────── month (1-12 or jan-dec)
│ │ │ │ ┌───────────── day of week (0-6, Sun=0, or sun-sat)
│ │ │ │ │
* * * * *

Parameters:

Name Type Description Default
expr str

Cron expression string (5 space-separated fields).

required

Returns:

Type Description
CronSpec

A CronSpec with parsed field sets.

Raises:

Type Description
InvalidCronExpression

On invalid syntax.

Source code in scheduler/scheduler.py
def parse_cron(expr: str) -> CronSpec:
    """Turn a 5-field cron expression into a ``CronSpec``.

    Supports standard cron syntax::

        ┌───────────── minute (0-59)
        │ ┌───────────── hour (0-23)
        │ │ ┌───────────── day of month (1-31)
        │ │ │ ┌───────────── month (1-12 or jan-dec)
        │ │ │ │ ┌───────────── day of week (0-6, Sun=0, or sun-sat)
        │ │ │ │ │
        * * * * *

    Args:
        expr: Cron expression string (5 space-separated fields).

    Returns:
        A ``CronSpec`` with parsed field sets; the original *expr* is
        stored unchanged as its ``expression``.

    Raises:
        InvalidCronExpression: On invalid syntax.
    """
    tokens = expr.strip().split()
    token_count = len(tokens)
    if token_count != 5:
        raise InvalidCronExpression(expr, f"expected 5 fields, got {token_count}")

    # Each token is parsed against the (low, high) bounds of its position.
    field_sets = [
        _parse_cron_field(token, low, high)
        for token, (low, high) in zip(tokens, _CRON_FIELD_RANGES)
    ]

    return CronSpec(
        minutes=field_sets[0],
        hours=field_sets[1],
        days=field_sets[2],
        months=field_sets[3],
        weekdays=field_sets[4],
        expression=expr,
    )

every(interval, unit='seconds')

Create an interval trigger with human-friendly units.

Parameters:

Name Type Description Default
interval float

Numeric interval value.

required
unit str

One of "seconds", "minutes", "hours".

'seconds'

Returns:

Type Description
IntervalTrigger

An IntervalTrigger.

Example::

every(30, "seconds")
every(5, "minutes")
every(1, "hours")
Source code in scheduler/scheduler.py
def every(interval: float, unit: str = "seconds") -> IntervalTrigger:
    """Build an interval trigger from a value and a time unit.

    Args:
        interval: Numeric interval value.
        unit: One of ``"seconds"``, ``"minutes"``, ``"hours"``.  Singular
            forms and any letter case are accepted as well.

    Returns:
        An ``IntervalTrigger`` whose period is *interval* converted to
        seconds.

    Raises:
        ValueError: If *unit* is not a recognised unit name.

    Example::

        every(30, "seconds")
        every(5, "minutes")
        every(1, "hours")
    """
    seconds_per_unit = {
        "second": 1,
        "seconds": 1,
        "minute": 60,
        "minutes": 60,
        "hour": 3600,
        "hours": 3600,
    }
    factor = seconds_per_unit.get(unit.lower())
    if factor is None:
        raise ValueError(
            f"Unknown unit: {unit!r}. Expected one of: {', '.join(sorted(seconds_per_unit))}"
        )
    return IntervalTrigger(interval * factor)

cron(expression, **kwargs)

Create a cron trigger from an expression string.

Shorthand for CronTrigger(expression, **kwargs).

Parameters:

Name Type Description Default
expression str

5-field cron expression.

required
**kwargs Any

Passed to CronTrigger.

{}

Returns:

Type Description
CronTrigger

A CronTrigger.

Source code in scheduler/scheduler.py
def cron(expression: str, **kwargs: Any) -> CronTrigger:
    """Build a ``CronTrigger`` from a cron expression string.

    Convenience wrapper equivalent to ``CronTrigger(expression, **kwargs)``.

    Args:
        expression: 5-field cron expression.
        **kwargs: Passed to ``CronTrigger``.

    Returns:
        A ``CronTrigger``.
    """
    trigger = CronTrigger(expression, **kwargs)
    return trigger

once(run_time)

Create a one-shot trigger.

Shorthand for OnceTrigger(run_time).

Parameters:

Name Type Description Default
run_time datetime

When to fire.

required

Returns:

Type Description
OnceTrigger

An OnceTrigger.

Source code in scheduler/scheduler.py
def once(run_time: datetime) -> OnceTrigger:
    """Build a trigger that fires a single time.

    Convenience wrapper equivalent to ``OnceTrigger(run_time)``.

    Args:
        run_time: When to fire.

    Returns:
        An ``OnceTrigger``.
    """
    trigger = OnceTrigger(run_time)
    return trigger