Skip to main content
Sifr’s concurrency modules are a native structured-concurrency substrate that compiles to Rust’s async runtime. Unlike CPython’s asyncio, there is no exposed event-loop object, no global task registry, and no fire-and-forget detachment. Every task belongs to a scope, every value that crosses a task boundary must be owned and sendable, and every error is a typed value you handle explicitly. This page is the stdlib reference for concurrency. For an introduction to the concepts — ownership at task boundaries, the structured-scope model, and how Sifr’s concurrency compares to CPython’s — see the Language / Concurrency guide.

sifr.task — Structured Tasks

sifr.task is the surface for creating, scoping, and coordinating async tasks.

Scoped Spawn

Spawn work inside a task.scope() block. The scope keeps ownership of all spawned handles and guarantees cleanup before the block exits — even under cancellation:
async def main() -> None:
    async with task.scope() as scope:
        handle_a = scope.spawn(compute_a())
        handle_b = scope.spawn(compute_b())
        result_a = await handle_a
        result_b = await handle_b
    # Both tasks have completed or been cancelled before reaching here
TaskHandle[T, E] is a linear ownership value. Awaiting or joining a handle consumes it; the compiler rejects any attempt to use the handle again.

Timeout and Deadline

Bound the runtime of any awaitable expression using task.timeout (duration-based) or task.deadline (absolute time):
from sifr.task import TaskGroup

async def fetch_with_timeout(url: str) -> bytes:
    try:
        async with task.timeout(5.0):
            return await fetch(url)
    except TimeoutError:
        return b""

TaskGroup

TaskGroup collects a set of tasks and waits for all of them, propagating the first error:
async def run_all() -> None:
    group: TaskGroup = TaskGroup()
    group.spawn(job_one())
    group.spawn(job_two())
    await group.join()

Context Propagation

Pass typed context values across task boundaries using ContextKey[T]:
from sifr.task import ContextKey, current_context, empty_context

request_id_key: ContextKey[str] = ContextKey("request_id")

async def handle() -> None:
    ctx = current_context()
    req_id: str = ctx.get(request_id_key)
    print(req_id)
CPython’s asyncio.get_event_loop(), loop.run_until_complete(), asyncio.ensure_future(), and contextvars global mutation are not available in Sifr. Use task.scope() and explicit ContextKey[T] propagation instead.

sifr.sync — Channels and Locks

sifr.sync provides same-process communication and synchronization primitives. All values that cross a task boundary must satisfy Sifr’s sendability requirements — the compiler enforces this at the call site.

Shared State

Shared[T] holds an immutable value that can be read from any task without a lock:
from sifr.sync import Shared

shared: Shared[int] = Shared(41)
print(shared.get())   # 41
For mutable shared state, wrap the value in Lock[T] and access it through a guard:
from sifr.sync import Lock

lock: Lock[int] = Lock(10)
guard = lock.lock()
print(guard.get())    # 10
# guard is released when it goes out of scope
RwLock[T] allows multiple simultaneous readers or one exclusive writer:
from sifr.sync import RwLock

readers: RwLock[int] = RwLock(20)
r1 = readers.read()
r2 = readers.read()    # multiple readers are fine
w  = readers.write()   # exclusive writer
Lock guards and semaphore permits are scoped resources. Holding a guard across an await point or returning one from a scope boundary is rejected by ownership diagnostics.

Channels

Channels are the primary way to move ownership of values between tasks. channel() creates an unbounded channel; bounded_channel(n) creates one that applies backpressure at capacity n:
from sifr.sync import ChannelSender, ChannelReceiver, ClosedError, channel, bounded_channel

# Unbounded channel
endpoints: tuple[ChannelSender[int], ChannelReceiver[int]] = channel()
sender, receiver = endpoints

sent: Result[None, ClosedError] = await sender.send(7)
received: Result[int, ClosedError] = await receiver.receive()
print(str(received))   # Ok(7)

# Clone a sender to share write access
cloned: ChannelSender[int] = sender.clone()
await cloned.send(99)
Closing a sender signals to receivers that no more values will arrive. Queued values drain normally; subsequent sends return Err(ClosedError):
await sender.send(30)
sender.close()

value: Result[int, ClosedError] = await receiver.receive()   # Ok(30)
rejected: Result[None, ClosedError] = await sender.send(40)  # Err(ClosedError)

Semaphore and Notify

Semaphore limits concurrent access to a resource. Notify is a lightweight one-shot or broadcast signal:
from sifr.sync import Semaphore, Notify

semaphore: Semaphore = Semaphore(1)
permit = await semaphore.acquire()
# ... critical section ...
# permit released when it goes out of scope

notify: Notify = Notify()
notify.notify_one()
await notify.notified()
notify.notify_all()

Full Channel Demo

The following is the complete sync-channel demo from the Sifr repository, showing unbounded channels, bounded channels with backpressure, sender close semantics, and cancellation safety:
from sifr.sync import (
    ChannelReceiver,
    ChannelSender,
    ClosedError,
    Lock,
    Notify,
    RwLock,
    Semaphore,
    Shared,
    bounded_channel,
    channel,
)


async def drain_two(own mut receiver: ChannelReceiver[int]) -> Result[int, ClosedError]:
    first: Result[int, ClosedError] = await receiver.receive()
    await task.sleep(0.0)
    second: Result[int, ClosedError] = await receiver.receive()
    assert str(first) == "Ok(1)"
    assert str(second) == "Ok(2)"
    return 2


def shared_and_guard_demo() -> None:
    shared: Shared[int] = Shared(41)
    assert shared.get() == 41

    lock: Lock[int] = Lock(10)
    guard = lock.lock()
    assert guard.get() == 10

    readers: RwLock[int] = RwLock(20)
    first_reader = readers.read()
    second_reader = readers.read()
    writer = readers.write()
    assert first_reader.get() == 20
    assert second_reader.get() == 20
    assert writer.get() == 20
    return None


async def main() -> Result[None, Error]:
    shared_and_guard_demo()

    semaphore: Semaphore = Semaphore(1)
    permit = await semaphore.acquire()

    notify: Notify = Notify()
    notify.notify_one()
    await notify.notified()
    notify.notify_all()

    # Unbounded channel
    endpoints: tuple[ChannelSender[int], ChannelReceiver[int]] = channel()
    sender, receiver = endpoints
    cloned_sender: ChannelSender[int] = sender.clone()
    sent: Result[None, ClosedError] = await cloned_sender.send(7)
    received: Result[int, ClosedError] = await receiver.receive()
    assert str(sent) == "Ok(())"
    assert str(received) == "Ok(7)"

    # Bounded channel with backpressure
    bounded: tuple[ChannelSender[int], ChannelReceiver[int]] = bounded_channel(1)
    bounded_sender, bounded_receiver = bounded
    async with task.scope() as scope:
        first_send: Result[None, ClosedError] = await bounded_sender.send(1)
        drain = scope.spawn(drain_two(bounded_receiver))
        second_send: Result[None, ClosedError] = await bounded_sender.send(2)
        drained = await drain
        assert str(first_send) == "Ok(())"
        assert str(second_send) == "Ok(())"
        assert str(drained) == "Ok(2)"

    # Sender close semantics
    closing: tuple[ChannelSender[int], ChannelReceiver[int]] = channel()
    closing_sender, closing_receiver = closing
    queued: Result[None, ClosedError] = await closing_sender.send(30)
    closing_sender.close()
    drained_value: Result[int, ClosedError] = await closing_receiver.receive()
    rejected: Result[None, ClosedError] = await closing_sender.send(40)
    assert str(queued) == "Ok(())"
    assert str(drained_value) == "Ok(30)"
    assert str(rejected) == "Err(ClosedError)"

    return None

sifr.parallel — CPU Parallelism

sifr.parallel runs CPU-heavy work across native worker threads. Use it for compute-intensive maps over large datasets when async concurrency alone is not enough:
from sifr.parallel import map, try_map, Pool, PoolConfig, WorkerError

# Parallel map — preserves output order
results: list[int] = map([1, 2, 3, 4, 5], lambda x: x * x)
# [1, 4, 9, 16, 25]

# try_map — collects typed errors per item
outcomes: list[Result[int, WorkerError]] = try_map(
    [1, 0, 3],
    lambda x: 10 // x
)
For repeated parallel work over a long-lived process, configure a Pool explicitly:
config: PoolConfig = PoolConfig(workers=4)
pool: Pool = Pool(config)

results: list[str] = pool.map(["a", "b", "c"], str.upper)
pool.close()
Values passed into and returned from pool.map must satisfy worker-boundary sendability. Non-send resources (lock guards, task handles, borrowed values) are rejected by the compiler at the call site.

sifr.signal — Shutdown Signals

React to OS signals with structured, awaitable values rather than global handler mutation:
from sifr.signal import ctrl_c, shutdown_stream, SIGINT, SIGTERM, strsignal

# Await Ctrl-C once
async def main() -> None:
    print("Running. Press Ctrl-C to stop.")
    await ctrl_c()
    print("Shutting down cleanly.")

# React to a stream of shutdown signals
async def server_main() -> None:
    stream = shutdown_stream()
    signal = await stream.next()
    print("Received:", strsignal(signal))
signal.signal(), set_wakeup_fd(), and arbitrary handler registration from CPython are not available. Signal delivery is represented as typed values or SignalError, never as process-global mutation.

sifr.process — Subprocesses

Spawn and manage child processes through sifr.process. All handles are owned resources:
from sifr.process import run, output, output_text, ProcessError

# Run and wait for exit status
try:
    status = run(["echo", "hello"])
    print(status.code)   # 0
except ProcessError as e:
    print(e.message)

# Capture stdout as bytes
try:
    out = output(["ls", "-la"])
    print(out.stdout)
except ProcessError as e:
    print(e.message)

# Capture stdout as decoded text
try:
    text_out = output_text(["cat", "/etc/hostname"])
    print(text_out)
except ProcessError as e:
    print(e.message)
subprocess.Popen is not available. Shell execution is an explicit opt-in through run_shell and output_shell, not the default path.

sifr.runtime — Runtime Diagnostics

sifr.runtime provides structured diagnostic events for observability around task, sync, process, signal, and IPC surfaces. Diagnostic emission is an explicit, typed operation — not a global logging side channel.
from sifr.runtime import DiagnosticLevel, DiagnosticEvent, diagnostic_event, emit_diagnostic, INFO, WARN, ERROR

event: DiagnosticEvent = diagnostic_event(INFO, "sifr.task", "task_spawned", "new task started")
result = emit_diagnostic(event)
emit_diagnostic returns Result[None, DiagnosticError] — call sites handle the result explicitly. Payload bytes, process command lines, environment values, and decoded IPC payloads must not be used as diagnostic messages unless an explicit redaction rule applies.
sifr.runtime diagnostics are not CPython warnings or logging global handler mutation. There is no basicConfig, no getLogger, and no handler registration. All diagnostic state is explicit and scoped.

sifr.resource — Deterministic Cleanup

sifr.resource provides nullcontext — an owned-value context manager for deterministic cleanup in structured resource patterns:
from sifr.resource import NullContext, nullcontext

# Use nullcontext to hold an optional resource in a uniform with-block pattern
ctx: NullContext[str] = nullcontext("my-value")
async with ctx as value:
    print(value)   # my-value
Language-level cleanup under task cancellation is part of Sifr’s structured runtime contract: cleanup runs before cancellation evidence is observed by the caller.
ExitStack, AsyncExitStack, closing, and aclosing are unsupported. Cleanup helpers do not provide a dynamic stack of arbitrary callbacks; all resource lifetimes are visible in the ownership graph.