Skip to content

Asynchronous Sessions

Overview

canfar supports asynchronous sessions using the AsyncSession class while maintaining 1-to-1 compatibility with the Session class.

Bases: HTTPClient

Asynchronous CANFAR Session Management Client.

This class provides methods to manage sessions in the system, including fetching session details, creating new sessions, retrieving logs, and destroying existing sessions.

This class is a subclass of the HTTPClient class and inherits its attributes and methods.

Examples:

>>> from canfar.session import AsyncSession
>>> session = AsyncSession(
        server="https://something.example.com",
        version="v1",
        token="token",
        timeout=30,
        concurrency=100,
        loglevel=40,
    )
Source code in canfar/sessions.py
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
class AsyncSession(HTTPClient):
    """Asynchronous CANFAR Session Management Client.

    This class provides methods to manage sessions in the system,
    including fetching session details, creating new sessions,
    retrieving logs, and destroying existing sessions.

    This class is a subclass of the `HTTPClient` class and inherits its
    attributes and methods.

    Examples:
        >>> from canfar.session import AsyncSession
        >>> session = AsyncSession(
                server="https://something.example.com",
                version="v1",
                token="token",
                timeout=30,
                concurrency=100,
                loglevel=40,
            )
    """

    async def fetch(
        self,
        kind: Kind | None = None,
        status: Status | None = None,
        view: View | None = None,
    ) -> list[dict[str, str]]:
        """List open sessions for the user.

        Args:
            kind (Kind | None, optional): Session kind. Defaults to None.
            status (Status | None, optional): Session status. Defaults to None.
            view (View | None, optional): Session view level. Defaults to None.

        Notes:
            By default, only the calling user's sessions are listed. If views is
            set to 'all', all user sessions are listed (with limited information).

        Returns:
            list: Sessions information.

        Examples:
            >>> from canfar.session import AsyncSession
            >>> session = AsyncSession()
            >>> await session.fetch(kind="notebook")
            [{'id': 'vl91sfzz',
            'userid': 'brars',
            'runAsUID': '166169204',
            'runAsGID': '166169204',
            'supplementalGroups': [34241,
            34337,
            35124,
            36227,
            1902365706,
            1454823273,
            1025424273],
            'appid': '<none>',
            'image': 'image-server/repo/image:version',
            'type': 'notebook',
            'status': 'Running',
            'name': 'notebook1',
            'startTime': '2025-03-05T21:48:29Z',
            'expiryTime': '2025-03-09T21:48:29Z',
            'connectURL': 'https://canfar.net/session/notebook/some/url',
            'requestedRAM': '8G',
            'requestedCPUCores': '2',
            'requestedGPUCores': '0',
            'ramInUse': '<none>',
            'gpuRAMInUse': '<none>',
            'cpuCoresInUse': '<none>',
            'gpuUtilization': '<none>'}]
        """
        parameters: dict[str, Any] = build.fetch_parameters(kind, status, view)
        response: Response = await self.asynclient.get(url="session", params=parameters)
        data: list[dict[str, str]] = response.json()
        log.debug(data)
        return data

    async def stats(self) -> dict[str, Any]:
        """Get statistics for the canfar cluster.

        Returns:
            Dict[str, Any]: Cluster statistics.

        Examples:
            >>> from canfar.session import AsyncSession
            >>> session = AsyncSession()
            >>> await session.stats()
            {'instances': {
             'session': 88, 'desktopApp': 30, 'headless': 0, 'total': 118},
             'cores': {'requestedCPUCores': 377,
             'coresAvailable': 960,
             'maxCores': {'cores': 32, 'withRam': '147Gi'}},
             'ram': {'maxRAM': {'ram': '226Gi', 'withCores': 32}}}
        """
        parameters = {"view": "stats"}
        response: Response = await self.asynclient.get("session", params=parameters)
        data: dict[str, Any] = response.json()
        return data

    async def info(self, ids: list[str] | str) -> list[dict[str, Any]]:
        """Get information about session[s].

        Args:
            ids (Union[List[str], str]): Session ID[s].

        Returns:
            Dict[str, Any]: Session information.

        Examples:
            >>> from canfar.session import AsyncSession
            >>> session = AsyncSession()
            >>> await session.info(session_id="hjko98yghj")
            >>> await session.info(id=["hjko98yghj", "ikvp1jtp"])
        """
        # Convert id to list if it is a string
        if isinstance(ids, str):
            ids = [ids]
        results: list[dict[str, Any]] = []
        tasks: list[Any] = []
        semaphore: asyncio.Semaphore = asyncio.Semaphore(self.concurrency)

        async def bounded(value: str) -> dict[str, Any]:
            async with semaphore:
                response = await self.asynclient.get(url=f"session/{value}")
                data: dict[str, Any] = response.json()
                return data

        tasks = [bounded(value) for value in ids]
        responses = await asyncio.gather(*tasks, return_exceptions=True)
        for reply in responses:
            if isinstance(reply, Exception):
                log.error(reply)
            elif isinstance(reply, dict):
                results.append(reply)
        return results

    async def logs(
        self,
        ids: list[str] | str,
        verbose: bool = False,
    ) -> dict[str, str] | None:
        """Get logs from a session[s].

        Args:
            ids (Union[List[str], str]): Session ID[s].
            verbose (bool, optional): Print logs to stdout. Defaults to False.

        Returns:
            Dict[str, str]: Logs in text/plain format.

        Examples:
            >>> from canfar.session import AsyncSession
            >>> session = AsyncSession()
            >>> await session.logs(id="hjko98yghj")
            >>> await session.logs(id=["hjko98yghj", "ikvp1jtp"])
        """
        if isinstance(ids, str):
            ids = [ids]
        parameters: dict[str, str] = {"view": "logs"}
        results: dict[str, str] = {}

        semaphore: asyncio.Semaphore = asyncio.Semaphore(self.concurrency)
        tasks: list[Any] = []

        async def bounded(value: str) -> tuple[str, str]:
            async with semaphore:
                response = await self.asynclient.get(
                    url=f"session/{value}",
                    params=parameters,
                )
                return value, response.text

        tasks = [bounded(value) for value in ids]
        responses = await asyncio.gather(*tasks, return_exceptions=True)
        for reply in responses:
            if isinstance(reply, Exception):
                log.error(reply)
            elif isinstance(reply, tuple):
                results[reply[0]] = reply[1]

        # Print logs to stdout if verbose is set to True
        if verbose:
            for key, value in results.items():
                log.info("Session ID: %s\n", key)
                log.info(value)
            return None
        return results

    async def create(
        self,
        name: str,
        image: str,
        cores: int | None = None,
        ram: int | None = None,
        kind: Kind = "headless",
        gpu: int | None = None,
        cmd: str | None = None,
        args: str | None = None,
        env: dict[str, Any] | None = None,
        replicas: int = 1,
    ) -> list[str]:
        """Launch a canfar session.

        Args:
            name (str): A unique name for the session.
            image (str): Container image to use for the session.
            cores (int, optional): Number of cores.
                Defaults to None, i.e. flexible mode.
            ram (int, optional): Amount of RAM (GB).
                Defaults to None, i.e. flexible mode.
            kind (str, optional): Type of canfar session. Defaults to "headless".
            gpu (Optional[int], optional): Number of GPUs. Defaults to None.
            cmd (Optional[str], optional): Command to run. Defaults to None.
            args (Optional[str], optional): Arguments to the command. Defaults to None.
            env (Optional[Dict[str, Any]], optional): Environment variables to inject.
                Defaults to None.
            replicas (int, optional): Number of sessions to launch. Defaults to 1.

        Notes:
            - If cores and ram are not specified, the session will be created with
              flexible resource allocation of upto 8 cores and 32GB of RAM.
            - The name of the session suffixed with the replica number. eg. test-42
              when replicas > 1.
            - Each container will have the following environment variables injected:
                * REPLICA_ID - The replica number
                * REPLICA_COUNT - The total number of replicas

        Returns:
            List[str]: A list of session IDs for the launched sessions.

        Examples:
            >>> from canfar.session import AsyncSession
            >>> session = AsyncSession()
            >>> session.create(
                    name="test",
                    image='images.canfar.net/skaha/terminal:1.1.1',
                    cores=2,
                    ram=8,
                    gpu=1,
                    kind="headless",
                    cmd="env",
                    env={"TEST": "test"},
                    replicas=2,
                )
            >>> ["hjko98yghj", "ikvp1jtp"]
        """
        payloads: list[list[tuple[str, Any]]] = build.create_parameters(
            name,
            image,
            cores,
            ram,
            kind,
            gpu,
            cmd,
            args,
            env,
            replicas,
        )
        results: list[str] = []
        tasks: list[Any] = []
        semaphore: asyncio.Semaphore = asyncio.Semaphore(self.concurrency)

        async def bounded(parameters: list[tuple[str, Any]]) -> Any:
            async with semaphore:
                log.debug("HTTP Request Parameters: %s", parameters)
                response = await self.asynclient.post(url="session", params=parameters)
                return response.text.rstrip("\r\n")

        tasks = [bounded(payload) for payload in payloads]
        msg = f"Creating {replicas} {kind} session[s]."
        log.debug(msg)
        responses = await asyncio.gather(*tasks, return_exceptions=True)
        for reply in responses:
            if isinstance(reply, Exception):
                log.error(reply)
            elif isinstance(reply, str):
                results.append(reply)
        return results

    async def events(
        self,
        ids: str | list[str],
        verbose: bool = False,
    ) -> list[dict[str, str]] | None:
        """Get deployment events for a session[s].

        Args:
            ids (Union[str, List[str]]): Session ID[s].
            verbose (bool, optional): Print events to stdout. Defaults to False.

        Returns:
            Optional[List[Dict[str, str]]]: A list of events for the session[s].

        Notes:
            When verbose is True, the events will be printed to stdout only.

        Examples:
            >>> from canfar.session import AsyncSession
            >>> session = AsyncSession()
            >>> await session.events(id="hjko98yghj")
            >>> await session.events(id=["hjko98yghj", "ikvp1jtp"])
        """
        if isinstance(ids, str):
            ids = [ids]
        results: list[dict[str, str]] = []
        parameters: dict[str, str] = {"view": "events"}
        tasks: list[Any] = []
        semaphore: asyncio.Semaphore = asyncio.Semaphore(self.concurrency)

        async def bounded(value: str) -> dict[str, str]:
            async with semaphore:
                response = await self.asynclient.get(
                    url=f"session/{value}",
                    params=parameters,
                )
                return {value: response.text}

        tasks = [bounded(value) for value in ids]
        responses = await asyncio.gather(*tasks, return_exceptions=True)
        for reply in responses:
            if isinstance(reply, Exception):
                log.error(reply)
            elif isinstance(reply, dict):
                results.append(dict(reply))

        if verbose and results:
            for result in results:
                for key, value in result.items():
                    log.info("Session ID: %s", key)
                    log.info(value)
        return results if not verbose else None

    async def destroy(self, ids: str | list[str]) -> dict[str, bool]:
        """Destroy session[s].

        Args:
            ids (Union[str, List[str]]): Session ID[s].

        Returns:
            Dict[str, bool]: A dictionary of session IDs
            and a bool indicating if the session was destroyed.

        Examples:
            >>> from canfar.session import AsyncSession
            >>> session = AsyncSession()
            >>> await session.destroy(id="hjko98yghj")
            >>> await session.destroy(id=["hjko98yghj", "ikvp1jtp"])
        """
        if isinstance(ids, str):
            ids = [ids]
        results: dict[str, bool] = {}
        semaphore: asyncio.Semaphore = asyncio.Semaphore(self.concurrency)
        tasks: list[Any] = []

        async def bounded(value: str) -> tuple[str, bool]:
            async with semaphore:
                try:
                    await self.asynclient.delete(url=f"session/{value}")
                except HTTPError as err:
                    msg = f"Failed to destroy session {value}: {err}"
                    log.exception(msg)
                    return value, False
                else:
                    return value, True

        tasks = [bounded(value) for value in ids]
        responses = await asyncio.gather(*tasks, return_exceptions=True)
        for reply in responses:
            if isinstance(reply, tuple):
                results[reply[0]] = reply[1]
        return results

    async def destroy_with(
        self,
        prefix: str,
        kind: Kind = "headless",
        status: Status = "Completed",
    ) -> dict[str, bool]:
        """Destroy session[s] matching search criteria.

        Args:
            prefix (str): Prefix to match in the session name.
            kind (Kind): Type of session. Defaults to "headless".
            status (Status): Status of the session. Defaults to "Completed".


        Returns:
            Dict[str, bool]: A dictionary of session IDs
            and a bool indicating if the session was destroyed.

        Notes:
            The prefix is case-sensitive.
            This method is useful for destroying multiple sessions at once.

        Examples:
            >>> from canfar.session import AsyncSession
            >>> session = AsyncSession()
            >>> await session.destroy_with(prefix="test")
            >>> await session.destroy_with(prefix="test", kind="desktop")
            >>> await session.destroy_with(prefix="car", kind="carta", status="Running")

        """
        ids: list[str] = [
            session["id"]
            for session in await self.fetch(kind=kind, status=status)
            if session["name"].startswith(prefix)
        ]
        return await self.destroy(ids)

    async def connect(self, ids: list[str] | str) -> None:
        """Connect to a session[s] in a web browser.

        Args:
            ids (Union[List[str], str]): Session ID[s].

        Examples:
            >>> from canfar.sessions import AsyncSession
            >>> session = AsyncSession()
            >>> await session.connect(id="hjko98yghj")
            >>> await session.connect(id=["hjko98yghj", "ikvp1jtp"])
        """
        if isinstance(ids, str):
            ids = [ids]
        info = await self.info(ids)
        for session in info:
            connect_url = session.get("connectURL")
            if connect_url:
                open_new_tab(connect_url)

connect async

connect(ids)

Connect to a session[s] in a web browser.

PARAMETER DESCRIPTION
ids

Session ID[s].

TYPE: Union[List[str], str]

Examples:

>>> from canfar.sessions import AsyncSession
>>> session = AsyncSession()
>>> await session.connect(id="hjko98yghj")
>>> await session.connect(id=["hjko98yghj", "ikvp1jtp"])
Source code in canfar/sessions.py
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
async def connect(self, ids: list[str] | str) -> None:
    """Connect to a session[s] in a web browser.

    Args:
        ids (Union[List[str], str]): Session ID[s].

    Examples:
        >>> from canfar.sessions import AsyncSession
        >>> session = AsyncSession()
        >>> await session.connect(id="hjko98yghj")
        >>> await session.connect(id=["hjko98yghj", "ikvp1jtp"])
    """
    if isinstance(ids, str):
        ids = [ids]
    info = await self.info(ids)
    for session in info:
        connect_url = session.get("connectURL")
        if connect_url:
            open_new_tab(connect_url)

create async

create(
    name,
    image,
    cores=None,
    ram=None,
    kind="headless",
    gpu=None,
    cmd=None,
    args=None,
    env=None,
    replicas=1,
)

Launch a canfar session.

PARAMETER DESCRIPTION
name

A unique name for the session.

TYPE: str

image

Container image to use for the session.

TYPE: str

cores

Number of cores. Defaults to None, i.e. flexible mode.

TYPE: int DEFAULT: None

ram

Amount of RAM (GB). Defaults to None, i.e. flexible mode.

TYPE: int DEFAULT: None

kind

Type of canfar session. Defaults to "headless".

TYPE: str DEFAULT: 'headless'

gpu

Number of GPUs. Defaults to None.

TYPE: Optional[int] DEFAULT: None

cmd

Command to run. Defaults to None.

TYPE: Optional[str] DEFAULT: None

args

Arguments to the command. Defaults to None.

TYPE: Optional[str] DEFAULT: None

env

Environment variables to inject. Defaults to None.

TYPE: Optional[Dict[str, Any]] DEFAULT: None

replicas

Number of sessions to launch. Defaults to 1.

TYPE: int DEFAULT: 1

Notes
  • If cores and ram are not specified, the session will be created with flexible resource allocation of upto 8 cores and 32GB of RAM.
  • The name of the session suffixed with the replica number. eg. test-42 when replicas > 1.
  • Each container will have the following environment variables injected:
    • REPLICA_ID - The replica number
    • REPLICA_COUNT - The total number of replicas
RETURNS DESCRIPTION
list[str]

List[str]: A list of session IDs for the launched sessions.

Examples:

>>> from canfar.session import AsyncSession
>>> session = AsyncSession()
>>> session.create(
        name="test",
        image='images.canfar.net/skaha/terminal:1.1.1',
        cores=2,
        ram=8,
        gpu=1,
        kind="headless",
        cmd="env",
        env={"TEST": "test"},
        replicas=2,
    )
>>> ["hjko98yghj", "ikvp1jtp"]
Source code in canfar/sessions.py
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
async def create(
    self,
    name: str,
    image: str,
    cores: int | None = None,
    ram: int | None = None,
    kind: Kind = "headless",
    gpu: int | None = None,
    cmd: str | None = None,
    args: str | None = None,
    env: dict[str, Any] | None = None,
    replicas: int = 1,
) -> list[str]:
    """Launch a canfar session.

    Args:
        name (str): A unique name for the session.
        image (str): Container image to use for the session.
        cores (int, optional): Number of cores.
            Defaults to None, i.e. flexible mode.
        ram (int, optional): Amount of RAM (GB).
            Defaults to None, i.e. flexible mode.
        kind (str, optional): Type of canfar session. Defaults to "headless".
        gpu (Optional[int], optional): Number of GPUs. Defaults to None.
        cmd (Optional[str], optional): Command to run. Defaults to None.
        args (Optional[str], optional): Arguments to the command. Defaults to None.
        env (Optional[Dict[str, Any]], optional): Environment variables to inject.
            Defaults to None.
        replicas (int, optional): Number of sessions to launch. Defaults to 1.

    Notes:
        - If cores and ram are not specified, the session will be created with
          flexible resource allocation of upto 8 cores and 32GB of RAM.
        - The name of the session suffixed with the replica number. eg. test-42
          when replicas > 1.
        - Each container will have the following environment variables injected:
            * REPLICA_ID - The replica number
            * REPLICA_COUNT - The total number of replicas

    Returns:
        List[str]: A list of session IDs for the launched sessions.

    Examples:
        >>> from canfar.session import AsyncSession
        >>> session = AsyncSession()
        >>> session.create(
                name="test",
                image='images.canfar.net/skaha/terminal:1.1.1',
                cores=2,
                ram=8,
                gpu=1,
                kind="headless",
                cmd="env",
                env={"TEST": "test"},
                replicas=2,
            )
        >>> ["hjko98yghj", "ikvp1jtp"]
    """
    payloads: list[list[tuple[str, Any]]] = build.create_parameters(
        name,
        image,
        cores,
        ram,
        kind,
        gpu,
        cmd,
        args,
        env,
        replicas,
    )
    results: list[str] = []
    tasks: list[Any] = []
    semaphore: asyncio.Semaphore = asyncio.Semaphore(self.concurrency)

    async def bounded(parameters: list[tuple[str, Any]]) -> Any:
        async with semaphore:
            log.debug("HTTP Request Parameters: %s", parameters)
            response = await self.asynclient.post(url="session", params=parameters)
            return response.text.rstrip("\r\n")

    tasks = [bounded(payload) for payload in payloads]
    msg = f"Creating {replicas} {kind} session[s]."
    log.debug(msg)
    responses = await asyncio.gather(*tasks, return_exceptions=True)
    for reply in responses:
        if isinstance(reply, Exception):
            log.error(reply)
        elif isinstance(reply, str):
            results.append(reply)
    return results

destroy async

destroy(ids)

Destroy session[s].

PARAMETER DESCRIPTION
ids

Session ID[s].

TYPE: Union[str, List[str]]

RETURNS DESCRIPTION
dict[str, bool]

Dict[str, bool]: A dictionary of session IDs

dict[str, bool]

and a bool indicating if the session was destroyed.

Examples:

>>> from canfar.session import AsyncSession
>>> session = AsyncSession()
>>> await session.destroy(id="hjko98yghj")
>>> await session.destroy(id=["hjko98yghj", "ikvp1jtp"])
Source code in canfar/sessions.py
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
async def destroy(self, ids: str | list[str]) -> dict[str, bool]:
    """Destroy session[s].

    Args:
        ids (Union[str, List[str]]): Session ID[s].

    Returns:
        Dict[str, bool]: A dictionary of session IDs
        and a bool indicating if the session was destroyed.

    Examples:
        >>> from canfar.session import AsyncSession
        >>> session = AsyncSession()
        >>> await session.destroy(id="hjko98yghj")
        >>> await session.destroy(id=["hjko98yghj", "ikvp1jtp"])
    """
    if isinstance(ids, str):
        ids = [ids]
    results: dict[str, bool] = {}
    semaphore: asyncio.Semaphore = asyncio.Semaphore(self.concurrency)
    tasks: list[Any] = []

    async def bounded(value: str) -> tuple[str, bool]:
        async with semaphore:
            try:
                await self.asynclient.delete(url=f"session/{value}")
            except HTTPError as err:
                msg = f"Failed to destroy session {value}: {err}"
                log.exception(msg)
                return value, False
            else:
                return value, True

    tasks = [bounded(value) for value in ids]
    responses = await asyncio.gather(*tasks, return_exceptions=True)
    for reply in responses:
        if isinstance(reply, tuple):
            results[reply[0]] = reply[1]
    return results

destroy_with async

destroy_with(prefix, kind='headless', status='Completed')

Destroy session[s] matching search criteria.

PARAMETER DESCRIPTION
prefix

Prefix to match in the session name.

TYPE: str

kind

Type of session. Defaults to "headless".

TYPE: Kind DEFAULT: 'headless'

status

Status of the session. Defaults to "Completed".

TYPE: Status DEFAULT: 'Completed'

RETURNS DESCRIPTION
dict[str, bool]

Dict[str, bool]: A dictionary of session IDs

dict[str, bool]

and a bool indicating if the session was destroyed.

Notes

The prefix is case-sensitive. This method is useful for destroying multiple sessions at once.

Examples:

>>> from canfar.session import AsyncSession
>>> session = AsyncSession()
>>> await session.destroy_with(prefix="test")
>>> await session.destroy_with(prefix="test", kind="desktop")
>>> await session.destroy_with(prefix="car", kind="carta", status="Running")
Source code in canfar/sessions.py
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
async def destroy_with(
    self,
    prefix: str,
    kind: Kind = "headless",
    status: Status = "Completed",
) -> dict[str, bool]:
    """Destroy session[s] matching search criteria.

    Args:
        prefix (str): Prefix to match in the session name.
        kind (Kind): Type of session. Defaults to "headless".
        status (Status): Status of the session. Defaults to "Completed".


    Returns:
        Dict[str, bool]: A dictionary of session IDs
        and a bool indicating if the session was destroyed.

    Notes:
        The prefix is case-sensitive.
        This method is useful for destroying multiple sessions at once.

    Examples:
        >>> from canfar.session import AsyncSession
        >>> session = AsyncSession()
        >>> await session.destroy_with(prefix="test")
        >>> await session.destroy_with(prefix="test", kind="desktop")
        >>> await session.destroy_with(prefix="car", kind="carta", status="Running")

    """
    ids: list[str] = [
        session["id"]
        for session in await self.fetch(kind=kind, status=status)
        if session["name"].startswith(prefix)
    ]
    return await self.destroy(ids)

events async

events(ids, verbose=False)

Get deployment events for a session[s].

PARAMETER DESCRIPTION
ids

Session ID[s].

TYPE: Union[str, List[str]]

verbose

Print events to stdout. Defaults to False.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
list[dict[str, str]] | None

Optional[List[Dict[str, str]]]: A list of events for the session[s].

Notes

When verbose is True, the events will be printed to stdout only.

Examples:

>>> from canfar.session import AsyncSession
>>> session = AsyncSession()
>>> await session.events(id="hjko98yghj")
>>> await session.events(id=["hjko98yghj", "ikvp1jtp"])
Source code in canfar/sessions.py
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
async def events(
    self,
    ids: str | list[str],
    verbose: bool = False,
) -> list[dict[str, str]] | None:
    """Get deployment events for a session[s].

    Args:
        ids (Union[str, List[str]]): Session ID[s].
        verbose (bool, optional): Print events to stdout. Defaults to False.

    Returns:
        Optional[List[Dict[str, str]]]: A list of events for the session[s].

    Notes:
        When verbose is True, the events will be printed to stdout only.

    Examples:
        >>> from canfar.session import AsyncSession
        >>> session = AsyncSession()
        >>> await session.events(id="hjko98yghj")
        >>> await session.events(id=["hjko98yghj", "ikvp1jtp"])
    """
    if isinstance(ids, str):
        ids = [ids]
    results: list[dict[str, str]] = []
    parameters: dict[str, str] = {"view": "events"}
    tasks: list[Any] = []
    semaphore: asyncio.Semaphore = asyncio.Semaphore(self.concurrency)

    async def bounded(value: str) -> dict[str, str]:
        async with semaphore:
            response = await self.asynclient.get(
                url=f"session/{value}",
                params=parameters,
            )
            return {value: response.text}

    tasks = [bounded(value) for value in ids]
    responses = await asyncio.gather(*tasks, return_exceptions=True)
    for reply in responses:
        if isinstance(reply, Exception):
            log.error(reply)
        elif isinstance(reply, dict):
            results.append(dict(reply))

    if verbose and results:
        for result in results:
            for key, value in result.items():
                log.info("Session ID: %s", key)
                log.info(value)
    return results if not verbose else None

fetch async

fetch(kind=None, status=None, view=None)

List open sessions for the user.

PARAMETER DESCRIPTION
kind

Session kind. Defaults to None.

TYPE: Kind | None DEFAULT: None

status

Session status. Defaults to None.

TYPE: Status | None DEFAULT: None

view

Session view level. Defaults to None.

TYPE: View | None DEFAULT: None

Notes

By default, only the calling user's sessions are listed. If views is set to 'all', all user sessions are listed (with limited information).

RETURNS DESCRIPTION
list

Sessions information.

TYPE: list[dict[str, str]]

Examples:

>>> from canfar.session import AsyncSession
>>> session = AsyncSession()
>>> await session.fetch(kind="notebook")
[{'id': 'vl91sfzz',
'userid': 'brars',
'runAsUID': '166169204',
'runAsGID': '166169204',
'supplementalGroups': [34241,
34337,
35124,
36227,
1902365706,
1454823273,
1025424273],
'appid': '<none>',
'image': 'image-server/repo/image:version',
'type': 'notebook',
'status': 'Running',
'name': 'notebook1',
'startTime': '2025-03-05T21:48:29Z',
'expiryTime': '2025-03-09T21:48:29Z',
'connectURL': 'https://canfar.net/session/notebook/some/url',
'requestedRAM': '8G',
'requestedCPUCores': '2',
'requestedGPUCores': '0',
'ramInUse': '<none>',
'gpuRAMInUse': '<none>',
'cpuCoresInUse': '<none>',
'gpuUtilization': '<none>'}]
Source code in canfar/sessions.py
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
async def fetch(
    self,
    kind: Kind | None = None,
    status: Status | None = None,
    view: View | None = None,
) -> list[dict[str, str]]:
    """List open sessions for the user.

    Args:
        kind (Kind | None, optional): Session kind. Defaults to None.
        status (Status | None, optional): Session status. Defaults to None.
        view (View | None, optional): Session view level. Defaults to None.

    Notes:
        By default, only the calling user's sessions are listed. If views is
        set to 'all', all user sessions are listed (with limited information).

    Returns:
        list: Sessions information.

    Examples:
        >>> from canfar.session import AsyncSession
        >>> session = AsyncSession()
        >>> await session.fetch(kind="notebook")
        [{'id': 'vl91sfzz',
        'userid': 'brars',
        'runAsUID': '166169204',
        'runAsGID': '166169204',
        'supplementalGroups': [34241,
        34337,
        35124,
        36227,
        1902365706,
        1454823273,
        1025424273],
        'appid': '<none>',
        'image': 'image-server/repo/image:version',
        'type': 'notebook',
        'status': 'Running',
        'name': 'notebook1',
        'startTime': '2025-03-05T21:48:29Z',
        'expiryTime': '2025-03-09T21:48:29Z',
        'connectURL': 'https://canfar.net/session/notebook/some/url',
        'requestedRAM': '8G',
        'requestedCPUCores': '2',
        'requestedGPUCores': '0',
        'ramInUse': '<none>',
        'gpuRAMInUse': '<none>',
        'cpuCoresInUse': '<none>',
        'gpuUtilization': '<none>'}]
    """
    parameters: dict[str, Any] = build.fetch_parameters(kind, status, view)
    response: Response = await self.asynclient.get(url="session", params=parameters)
    data: list[dict[str, str]] = response.json()
    log.debug(data)
    return data

info async

info(ids)

Get information about session[s].

PARAMETER DESCRIPTION
ids

Session ID[s].

TYPE: Union[List[str], str]

RETURNS DESCRIPTION
list[dict[str, Any]]

Dict[str, Any]: Session information.

Examples:

>>> from canfar.session import AsyncSession
>>> session = AsyncSession()
>>> await session.info(session_id="hjko98yghj")
>>> await session.info(id=["hjko98yghj", "ikvp1jtp"])
Source code in canfar/sessions.py
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
async def info(self, ids: list[str] | str) -> list[dict[str, Any]]:
    """Get information about session[s].

    Args:
        ids (Union[List[str], str]): Session ID[s].

    Returns:
        Dict[str, Any]: Session information.

    Examples:
        >>> from canfar.session import AsyncSession
        >>> session = AsyncSession()
        >>> await session.info(session_id="hjko98yghj")
        >>> await session.info(id=["hjko98yghj", "ikvp1jtp"])
    """
    # Convert id to list if it is a string
    if isinstance(ids, str):
        ids = [ids]
    results: list[dict[str, Any]] = []
    tasks: list[Any] = []
    semaphore: asyncio.Semaphore = asyncio.Semaphore(self.concurrency)

    async def bounded(value: str) -> dict[str, Any]:
        async with semaphore:
            response = await self.asynclient.get(url=f"session/{value}")
            data: dict[str, Any] = response.json()
            return data

    tasks = [bounded(value) for value in ids]
    responses = await asyncio.gather(*tasks, return_exceptions=True)
    for reply in responses:
        if isinstance(reply, Exception):
            log.error(reply)
        elif isinstance(reply, dict):
            results.append(reply)
    return results

logs async

logs(ids, verbose=False)

Get logs from a session[s].

PARAMETER DESCRIPTION
ids

Session ID[s].

TYPE: Union[List[str], str]

verbose

Print logs to stdout. Defaults to False.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
dict[str, str] | None

Dict[str, str]: Logs in text/plain format.

Examples:

>>> from canfar.session import AsyncSession
>>> session = AsyncSession()
>>> await session.logs(id="hjko98yghj")
>>> await session.logs(id=["hjko98yghj", "ikvp1jtp"])
Source code in canfar/sessions.py
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
async def logs(
    self,
    ids: list[str] | str,
    verbose: bool = False,
) -> dict[str, str] | None:
    """Get logs from a session[s].

    Args:
        ids (Union[List[str], str]): Session ID[s].
        verbose (bool, optional): Print logs to stdout. Defaults to False.

    Returns:
        Dict[str, str]: Logs in text/plain format.

    Examples:
        >>> from canfar.session import AsyncSession
        >>> session = AsyncSession()
        >>> await session.logs(id="hjko98yghj")
        >>> await session.logs(id=["hjko98yghj", "ikvp1jtp"])
    """
    if isinstance(ids, str):
        ids = [ids]
    parameters: dict[str, str] = {"view": "logs"}
    results: dict[str, str] = {}

    semaphore: asyncio.Semaphore = asyncio.Semaphore(self.concurrency)
    tasks: list[Any] = []

    async def bounded(value: str) -> tuple[str, str]:
        async with semaphore:
            response = await self.asynclient.get(
                url=f"session/{value}",
                params=parameters,
            )
            return value, response.text

    tasks = [bounded(value) for value in ids]
    responses = await asyncio.gather(*tasks, return_exceptions=True)
    for reply in responses:
        if isinstance(reply, Exception):
            log.error(reply)
        elif isinstance(reply, tuple):
            results[reply[0]] = reply[1]

    # Print logs to stdout if verbose is set to True
    if verbose:
        for key, value in results.items():
            log.info("Session ID: %s\n", key)
            log.info(value)
        return None
    return results

stats async

stats()

Get statistics for the canfar cluster.

RETURNS DESCRIPTION
dict[str, Any]

Dict[str, Any]: Cluster statistics.

Examples:

>>> from canfar.session import AsyncSession
>>> session = AsyncSession()
>>> await session.stats()
{'instances': {
 'session': 88, 'desktopApp': 30, 'headless': 0, 'total': 118},
 'cores': {'requestedCPUCores': 377,
 'coresAvailable': 960,
 'maxCores': {'cores': 32, 'withRam': '147Gi'}},
 'ram': {'maxRAM': {'ram': '226Gi', 'withCores': 32}}}
Source code in canfar/sessions.py
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
async def stats(self) -> dict[str, Any]:
    """Get statistics for the canfar cluster.

    Returns:
        Dict[str, Any]: Cluster statistics.

    Examples:
        >>> from canfar.session import AsyncSession
        >>> session = AsyncSession()
        >>> await session.stats()
        {'instances': {
         'session': 88, 'desktopApp': 30, 'headless': 0, 'total': 118},
         'cores': {'requestedCPUCores': 377,
         'coresAvailable': 960,
         'maxCores': {'cores': 32, 'withRam': '147Gi'}},
         'ram': {'maxRAM': {'ram': '226Gi', 'withCores': 32}}}
    """
    parameters = {"view": "stats"}
    response: Response = await self.asynclient.get("session", params=parameters)
    data: dict[str, Any] = response.json()
    return data