Helpers API

Overview

The Helpers API provides utility functions to partition work across replicas of a CANFAR session. Containers receive REPLICA_ID and REPLICA_COUNT environment variables, and these helpers make using them simple and correct.

Practical Examples

Stripe: take every Nth item with an offset

from canfar.helpers import distributed

# Assume REPLICA_ID=2 and REPLICA_COUNT=4
# Replica 2 (1-based) will see indices 1, 5, 9, ...
items = list(range(12))
shard = list(distributed.stripe(items, replica=2, total=4))
print(shard)  # [1, 5, 9]
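To make the striping rule concrete, here is a minimal sketch built on itertools.islice. The name stripe_sketch is hypothetical and this is not the library's implementation; it only reproduces the behavior shown in the example above (1-based replica, offset of replica - 1, step of total).

```python
from itertools import islice
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")


def stripe_sketch(iterable: Iterable[T], replica: int, total: int) -> Iterator[T]:
    """Hypothetical stand-in: every total-th item, starting at index replica - 1."""
    return islice(iterable, replica - 1, None, total)


items = list(range(12))
# Build the shard each of the 4 replicas would see.
shards = {r: list(stripe_sketch(items, r, 4)) for r in range(1, 5)}
print(shards[2])  # [1, 5, 9]
```

Under this rule the shards are disjoint and together cover every item, which is the property that makes striping safe for splitting work.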

Chunk: contiguous chunks of roughly equal size

from canfar.helpers import distributed

# Assume 10 items, 4 replicas
items = list(range(10))
# Replicas 1-3 get two items each: [0, 1], [2, 3], [4, 5]
# Replica 4 gets [6, 7, 8, 9] (the last replica takes the remainder)
print(list(distributed.chunk(items, replica=1, total=4)))  # [0, 1]
print(list(distributed.chunk(items, replica=4, total=4)))  # [6, 7, 8, 9]

Sparse distribution

When there are fewer items than replicas, chunk assigns exactly one item to each of the first len(items) replicas, and the remaining replicas receive nothing. No item is processed twice.
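The standard and sparse rules can be sketched together as follows. The name chunk_sketch is hypothetical, and the sketch materializes the input with list() to learn its length, which is an assumption and may differ from what the library does. It is meant only to illustrate the documented behavior.

```python
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def chunk_sketch(iterable: Iterable[T], replica: int, total: int) -> Iterator[T]:
    """Hypothetical re-implementation of the documented chunk rules."""
    items: List[T] = list(iterable)  # assumption: materialize to learn the length
    count = len(items)
    if count < total:
        # Sparse case: replica r takes item r - 1 if it exists, otherwise nothing.
        if replica <= count:
            yield items[replica - 1]
        return
    size = count // total  # base chunk size
    start = (replica - 1) * size
    # The last replica also takes any remainder.
    stop = count if replica == total else start + size
    yield from items[start:stop]


# Standard case: 10 items, 4 replicas.
standard = [list(chunk_sketch(range(10), r, 4)) for r in range(1, 5)]
print(standard)  # [[0, 1], [2, 3], [4, 5], [6, 7, 8, 9]]

# Sparse case: 3 items, 5 replicas.
sparse = [list(chunk_sketch(["a", "b", "c"], r, 5)) for r in range(1, 6)]
print(sparse)  # [['a'], ['b'], ['c'], [], []]
```

In both cases every item lands on exactly one replica, so the union of all replicas' output is the original input.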

Using container-provided environment variables

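# Inside CANFAR container replicas, you can omit replica/total;
# they default to the REPLICA_ID and REPLICA_COUNT environment variables.
from canfar.helpers import distributed

work = list(range(1000))
for item in distributed.chunk(work):
    process(item)  # process() stands in for your own per-item function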
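When testing outside a container, or when you prefer to be explicit, the same values can be read by hand and passed as replica and total. The helper name replica_from_env below is hypothetical; only the variable names REPLICA_ID and REPLICA_COUNT and the fallback of "1" come from this page.

```python
import os
from typing import Mapping, Tuple


def replica_from_env(environ: Mapping[str, str] = os.environ) -> Tuple[int, int]:
    """Hypothetical helper: read (replica, total) at call time, defaulting to (1, 1)."""
    replica = int(environ.get("REPLICA_ID", "1"))
    total = int(environ.get("REPLICA_COUNT", "1"))
    return replica, total


# Simulate replica 2 of 4 without touching the real environment.
fake_env = {"REPLICA_ID": "2", "REPLICA_COUNT": "4"}
replica, total = replica_from_env(fake_env)
print(replica, total)  # 2 4
```

The resulting pair can then be passed explicitly, for example distributed.chunk(work, replica=replica, total=total).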

Validation and errors

  • replica must be >= 1 and <= total (replicas are numbered from 1)
  • total must be > 0

chunk raises ValueError when either rule is violated.
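The rules above amount to a few comparisons. The function validate_replica below is a hypothetical illustration of such checks, not the library's code, and its error messages are invented for the example.

```python
def validate_replica(replica: int, total: int) -> None:
    """Hypothetical check mirroring the documented rules."""
    if total <= 0:
        raise ValueError(f"total must be > 0, got {total}")
    if replica < 1:
        raise ValueError(f"replica must be >= 1 (1-based), got {replica}")
    if replica > total:
        raise ValueError(f"replica {replica} cannot exceed total {total}")


validate_replica(2, 4)  # valid input: returns None

# Each of these violates one rule and is reported instead of crashing.
for bad in [(0, 4), (5, 4), (1, 0)]:
    try:
        validate_replica(*bad)
    except ValueError as err:
        print(f"{bad}: {err}")
```

Running a check like this before starting a long job surfaces a misconfigured replica immediately rather than after partial work.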

API Reference

canfar.helpers.distributed

Helper functions for distributed computing.

stripe

stripe(
    iterable: Iterable[T],
    replica: int = int(os.environ.get("REPLICA_ID", "1")),
    total: int = int(os.environ.get("REPLICA_COUNT", "1")),
) -> Iterator[T]

Yields every total-th item of the iterable, starting at index replica - 1, so replica 1 starts with the first item.

PARAMETER DESCRIPTION
iterable

The iterable to partition.

TYPE: Iterable[T]

replica

The 1-based replica number. Defaults to int(os.environ.get("REPLICA_ID", "1")).

TYPE: int DEFAULT: int(os.environ.get("REPLICA_ID", "1"))

total

The total number of replicas. Defaults to int(os.environ.get("REPLICA_COUNT", "1")).

TYPE: int DEFAULT: int(os.environ.get("REPLICA_COUNT", "1"))

Examples:

>>> from canfar.helpers import distributed
>>> dataset = range(100)
>>> list(distributed.stripe(dataset, 1, 10))
[0, 10, 20, 30, 40, 50, 60, 70, 80, 90]

YIELDS DESCRIPTION
T

Iterator[T]: The replica-th partition of the iterable.
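One Python detail is worth keeping in mind about the defaults in the signature above. If they are written literally in the library source (an assumption; the rendered signature suggests this but does not confirm it), Python evaluates them once, when the function is defined at import time. Changing REPLICA_ID or REPLICA_COUNT afterwards would then not affect the defaults. The sketch below shows this general behavior with a hypothetical function and a hypothetical variable name, DEMO_REPLICA_ID.

```python
import os

os.environ["DEMO_REPLICA_ID"] = "1"


def current_replica(replica: int = int(os.environ.get("DEMO_REPLICA_ID", "1"))) -> int:
    """Hypothetical function whose default is computed once, when `def` runs."""
    return replica


os.environ["DEMO_REPLICA_ID"] = "3"  # changed after the function was defined

print(current_replica())  # 1: the default was captured at definition time
print(current_replica(int(os.environ["DEMO_REPLICA_ID"])))  # 3: read explicitly
```

Inside a container this rarely matters, because the variables are set before the process starts. In tests that modify the environment, passing replica and total explicitly avoids the issue.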

chunk

chunk(
    iterable: Iterable[T],
    replica: int = int(os.environ.get("REPLICA_ID", "1")),
    total: int = int(os.environ.get("REPLICA_COUNT", "1")),
) -> Iterator[T]

Returns the replica-th chunk of the iterable split into total chunks.

This function distributes items from an iterable across multiple replicas, using the environment variables that CANFAR provides to each container.

Distribution Behavior:

  • Standard Distribution (items >= replicas): Items are divided into roughly equal chunks, with the last replica receiving any remainder.
  • Sparse Distribution (items < replicas): Each of the first N replicas gets exactly one item (where N is the number of items); the remaining replicas get empty results.

PARAMETER DESCRIPTION
iterable

The iterable to distribute across replicas.

TYPE: Iterable[T]

replica

The replica number using 1-based indexing. Must be >= 1 and <= total. Defaults to REPLICA_ID environment variable.

TYPE: int DEFAULT: int(os.environ.get("REPLICA_ID", "1"))

total

The total number of replicas. Must be > 0. Defaults to REPLICA_COUNT environment variable.

TYPE: int DEFAULT: int(os.environ.get("REPLICA_COUNT", "1"))

RETURNS DESCRIPTION
Iterator[T]

Iterator[T]: An iterator yielding items assigned to this replica.

RAISES DESCRIPTION
ValueError

If replica < 1 (1-based indexing expected).

ValueError

If replica > total (replica cannot exceed total replicas).

ValueError

If total <= 0 (must have at least one replica).

Note

This function is designed for CANFAR container environments, where the REPLICA_ID and REPLICA_COUNT environment variables are set automatically. The 1-based indexing matches the values the container provides.

For optimal performance with large datasets, consider using this function with iterators rather than converting large datasets to lists beforehand.

When items < replicas, the sparse distribution ensures that each item goes to exactly one replica. Excess replicas receive empty results rather than duplicated data.