Skip to content

I/O Utilities

The io module provides a collection of utility functions for input/output operations, including file system interactions and working with different file formats.

copy_dir(src, dst, replace=False)

Copies a directory tree from the source to the destination.

Parameters:

Name Type Description Default
src PathLike

The source directory path.

required
dst PathLike

The destination directory path.

required
replace bool

If True, the destination directory will be deleted if it already exists. Defaults to False.

False

Raises:

Type Description
FileNotFoundError

If the source directory does not exist.

FileExistsError

If the destination directory already exists and replace is False.

Example

Copy a directory to a new destination:

# Assuming 'source_folder' exists
oc.io.copy_dir("source_folder", "destination_folder")


Replace an existing destination directory:

# Assuming 'source_folder' and 'destination_folder' exist
oc.io.copy_dir("source_folder", "destination_folder", replace=True)
Output:
Replacing directory destination_folder
Directory deleted: destination_folder

Source code in opencrate/core/utils/io/system.py
def copy_dir(src: PathLike, dst: PathLike, replace: bool = False) -> None:
    """Recursively copy the directory ``src`` to ``dst``.

    The source is validated first, then an already-existing destination is
    handled according to ``replace``, and finally the tree is copied with
    ``shutil.copytree``.

    Args:
        src (PathLike): Directory to copy from.
        dst (PathLike): Location the copy is written to.
        replace (bool): When True, an existing ``dst`` is deleted before the
            copy starts. Defaults to False.

    Raises:
        FileNotFoundError: If ``src`` is not an existing directory.
        FileExistsError: If ``dst`` already exists and ``replace`` is False.

    Example:
        Copy into a destination that does not exist yet:
        ```python
        # 'source_folder' must exist
        oc.io.copy_dir("source_folder", "destination_folder")
        ```
        ---
        Overwrite a destination that is already there:
        ```python
        # both 'source_folder' and 'destination_folder' exist
        oc.io.copy_dir("source_folder", "destination_folder", replace=True)
        ```
        Output:
        ```
        Replacing directory destination_folder
        Directory deleted: destination_folder
        ```
    """
    # Validate the source before touching the destination.
    source_dir = ensure_dir_exists(src)
    handle_replace(path=dst, replace=replace)
    shutil.copytree(source_dir, dst)

create_archive(output_filename, source_dir, format='zip')

Creates an archive from a directory.

Parameters:

Name Type Description Default
output_filename str

The name of the archive file (without extension).

required
source_dir PathLike

The path to the source directory.

required
format str

The archive format. Valid formats are: 'zip', 'tar', 'gztar', 'bztar', and 'xztar'. Defaults to 'zip'.

'zip'
Example

Create a zip archive:

# Assuming 'my_folder' exists
oc.io.create_archive("archive", "my_folder", format="zip")
Output:
Archive created: archive.zip


Create a gzipped tar archive for better compression:

oc.io.create_archive("archive", "my_folder", format="gztar")
Output:
Archive created: archive.tar.gz

Source code in opencrate/core/utils/io/system.py
def create_archive(output_filename: str, source_dir: PathLike, format: str = "zip") -> None:
    """Pack a directory into an archive file.

    The requested format is checked against the formats reported by
    ``shutil.get_archive_formats()`` before the source directory is looked at.

    Args:
        output_filename (str): Archive name without the extension; the
            extension is appended according to ``format``.
        source_dir (PathLike): Directory whose contents are archived.
        format (str): Archive format, e.g. 'zip', 'tar', 'gztar', 'bztar' or
            'xztar'. Defaults to 'zip'.

    Raises:
        ValueError: If ``format`` is not one of the available archive formats.
        FileNotFoundError: If ``source_dir`` is not an existing directory.

    Example:
        Build a zip archive:
        ```python
        # 'my_folder' must exist
        oc.io.create_archive("archive", "my_folder", format="zip")
        ```
        Output:
        ```
        Archive created: archive.zip
        ```
        ---
        Build a gzip-compressed tarball instead:
        ```python
        oc.io.create_archive("archive", "my_folder", format="gztar")
        ```
        Output:
        ```
        Archive created: archive.tar.gz
        ```
    """
    # Each registered format is a (name, description) pair; only names matter.
    known_formats = [entry[0] for entry in shutil.get_archive_formats()]
    if format not in known_formats:
        raise ValueError(f"Invalid archive format: {format}. Valid formats are: {', '.join(known_formats)}")

    directory = ensure_dir_exists(source_dir)
    created = shutil.make_archive(output_filename, format, directory)
    print(f"Archive created: {created}")

create_dir(path, replace=False)

Creates a directory at the specified path.

Parameters:

Name Type Description Default
path PathLike

The path to the directory to be created.

required
replace bool

If True, the directory will be deleted if it already exists. Defaults to False.

False

Raises:

Type Description
FileExistsError

If the directory already exists and replace is False.

Example

Create a new directory:

oc.io.create_dir("new_folder")
Output:
Directory created: new_folder


Replace an existing directory:

# Assuming 'existing_folder' already exists
oc.io.create_dir("existing_folder", replace=True)
Output:
Replacing directory existing_folder
Directory deleted: existing_folder
Directory created: existing_folder

Source code in opencrate/core/utils/io/system.py
def create_dir(path: PathLike, replace: bool = False) -> None:
    """Create a directory, including any missing parent directories.

    Args:
        path (PathLike): Location of the directory to create.
        replace (bool): When True, whatever already exists at ``path`` is
            deleted first. Defaults to False.

    Raises:
        FileExistsError: If ``path`` already exists and ``replace`` is False.

    Example:
        Make a fresh directory:
        ```python
        oc.io.create_dir("new_folder")
        ```
        Output:
        ```
        Directory created: new_folder
        ```
        ---
        Recreate a directory that is already present:
        ```python
        # 'existing_folder' already exists
        oc.io.create_dir("existing_folder", replace=True)
        ```
        Output:
        ```
        Replacing directory existing_folder
        Directory deleted: existing_folder
        Directory created: existing_folder
        ```
    """
    directory = Path(path)
    # Either clears the existing path or raises, so mkdir starts from scratch.
    handle_replace(path=directory, replace=replace)
    directory.mkdir(parents=True, exist_ok=True)
    print(f"Directory created: {directory}")

delete_dir(path)

Deletes a directory and all its contents recursively.

Parameters:

Name Type Description Default
path PathLike

The path to the directory to be deleted.

required

Raises:

Type Description
FileNotFoundError

If the directory does not exist.

Example

Delete a directory:

# Assuming 'old_folder' exists
oc.io.delete_dir("old_folder")
Output:
Directory deleted: old_folder


Raise an error if the directory does not exist:

oc.io.delete_dir("non_existent_folder")
Output:
FileNotFoundError:

Directory not found: non_existent_folder

Source code in opencrate/core/utils/io/system.py
def delete_dir(path: PathLike) -> None:
    """Remove a directory together with everything inside it.

    Args:
        path (PathLike): Directory to remove.

    Raises:
        FileNotFoundError: If ``path`` is not an existing directory.

    Example:
        Remove a directory:
        ```python
        # 'old_folder' must exist
        oc.io.delete_dir("old_folder")
        ```
        Output:
        ```
        Directory deleted: old_folder
        ```
        ---
        A missing directory is reported as an error:
        ```python
        oc.io.delete_dir("non_existent_folder")
        ```
        Output:
        ```
        FileNotFoundError:

        Directory not found: non_existent_folder

        ```
    """
    # Validation doubles as conversion to a Path object.
    directory = ensure_dir_exists(path)
    shutil.rmtree(directory)
    print(f"Directory deleted: {directory}")

delete_file(file_path)

Deletes a file at the specified path.

Parameters:

Name Type Description Default
file_path PathLike

The path to the file.

required

Raises:

Type Description
FileNotFoundError

If the file does not exist.

Example

Delete a file:

# Assuming 'example.txt' exists
oc.io.delete_file("example.txt")
Output:
File deleted: example.txt


Raise an error if the file does not exist:

oc.io.delete_file("non_existent_file.txt")
Output:
FileNotFoundError:

File not found: non_existent_file.txt

Source code in opencrate/core/utils/io/system.py
def delete_file(file_path: PathLike) -> None:
    """Remove a single file.

    Args:
        file_path (PathLike): File to remove.

    Raises:
        FileNotFoundError: If ``file_path`` is not an existing file.

    Example:
        Remove a file:
        ```python
        # 'example.txt' must exist
        oc.io.delete_file("example.txt")
        ```
        Output:
        ```
        File deleted: example.txt
        ```
        ---
        A missing file is reported as an error:
        ```python
        oc.io.delete_file("non_existent_file.txt")
        ```
        Output:
        ```
        FileNotFoundError:

        File not found: non_existent_file.txt

        ```
    """
    # Validation doubles as conversion to a Path object.
    target = ensure_file_exists(file_path)
    target.unlink()
    print(f"File deleted: {target}")

download_file(url, file_path, replace=False)

Downloads a file from a URL and saves it to the specified path.

Parameters:

Name Type Description Default
url str

The URL of the file to download.

required
file_path PathLike

The path to save the downloaded file.

required
replace bool

If True, the file will be deleted if it already exists. Defaults to False.

False

Raises:

Type Description
FileExistsError

If the file already exists and replace is False.

Example

Download a file:

oc.io.download_file("https://example.com/file.txt", "downloaded_file.txt")
Output:
File downloaded: downloaded_file.txt


Download and replace an existing file:

oc.io.download_file("https://example.com/file.txt", "downloaded_file.txt", replace=True)
Output:
Replacing file downloaded_file.txt
File deleted: downloaded_file.txt
File downloaded: downloaded_file.txt

Source code in opencrate/core/utils/io/system.py
def download_file(url: str, file_path: PathLike, replace: bool = False, timeout: float = 30.0) -> None:
    """Downloads a file from a URL and saves it to the specified path.

    The whole response body is fetched into memory and then written to
    ``file_path``. When ``replace`` is True, the existing file is removed only
    after the download has succeeded, so a failed request leaves it untouched.

    Args:
        url (str): The URL of the file to download.
        file_path (PathLike): The path to save the downloaded file.
        replace (bool): If True, the file will be deleted if it already exists.
            Defaults to False.
        timeout (float): Seconds to wait for the server (passed to
            ``requests.get``) before giving up. Defaults to 30.0.

    Raises:
        FileExistsError: If the file already exists and `replace` is False.
            Raised before any network request is made.
        requests.HTTPError: If the server answers with an error status.
        requests.Timeout: If the server does not respond within ``timeout``.

    Example:
        Download a file:
        ```python
        oc.io.download_file("https://example.com/file.txt", "downloaded_file.txt")
        ```
        Output:
        ```
        File downloaded: downloaded_file.txt
        ```
        ---
        Download and replace an existing file:
        ```python
        oc.io.download_file("https://example.com/file.txt", "downloaded_file.txt", replace=True)
        ```
        Output:
        ```
        Replacing file downloaded_file.txt
        File deleted: downloaded_file.txt
        File downloaded: downloaded_file.txt
        ```
    """
    p = Path(file_path)

    # Fail fast: without permission to replace, an existing path is an error
    # and there is no point in hitting the network first.
    if not replace:
        handle_replace(p, False)

    # Without a timeout, requests waits indefinitely on a stalled server.
    response = requests.get(url, timeout=timeout)
    response.raise_for_status()

    # Only discard the old file once the new content is safely in hand.
    if replace:
        handle_replace(p, True)

    p.write_bytes(response.content)
    print(f"File downloaded: {p}")

ensure_dir_exists(path)

Ensures that a directory exists.

Parameters:

Name Type Description Default
path PathLike

The path to the directory.

required

Raises:

Type Description
FileNotFoundError

If the path does not exist or is not a directory.

Returns:

Name Type Description
Path Path

The Path object of the directory.

Example

Check if a directory exists:

# Assuming 'my_folder' exists
oc.io.ensure_dir_exists("my_folder")


Raise an error if the directory does not exist:

oc.io.ensure_dir_exists("non_existent_folder")
Output:
FileNotFoundError:

Directory not found: non_existent_folder

Source code in opencrate/core/utils/io/system.py
def ensure_dir_exists(path: PathLike) -> Path:
    """Validate that ``path`` points at an existing directory.

    Args:
        path (PathLike): Location expected to be a directory.

    Raises:
        FileNotFoundError: If ``path`` is missing or is not a directory.

    Returns:
        Path: ``path`` converted to a ``Path`` object.

    Example:
        Validate a directory that is present:
        ```python
        # 'my_folder' must exist
        oc.io.ensure_dir_exists("my_folder")
        ```
        ---
        A missing directory is reported as an error:
        ```python
        oc.io.ensure_dir_exists("non_existent_folder")
        ```
        Output:
        ```
        FileNotFoundError:

        Directory not found: non_existent_folder

        ```
    """
    directory = Path(path)
    if directory.is_dir():
        return directory
    # The message echoes the caller's original argument, not the Path object.
    raise FileNotFoundError(f"\n\nDirectory not found: {path}\n")

ensure_file_exists(path)

Ensures that a file exists.

Parameters:

Name Type Description Default
path PathLike

The path to the file.

required

Raises:

Type Description
FileNotFoundError

If the path does not exist or is not a file.

Returns:

Name Type Description
Path Path

The Path object of the file.

Example

Check if a file exists:

# Assuming 'my_file.txt' exists
oc.io.ensure_file_exists("my_file.txt")


Raise an error if the file does not exist:

oc.io.ensure_file_exists("non_existent_file.txt")
Output:
FileNotFoundError:

File not found: non_existent_file.txt

Source code in opencrate/core/utils/io/system.py
def ensure_file_exists(path: PathLike) -> Path:
    """Validate that ``path`` points at an existing regular file.

    Args:
        path (PathLike): Location expected to be a file.

    Raises:
        FileNotFoundError: If ``path`` is missing or is not a file.

    Returns:
        Path: ``path`` converted to a ``Path`` object.

    Example:
        Validate a file that is present:
        ```python
        # 'my_file.txt' must exist
        oc.io.ensure_file_exists("my_file.txt")
        ```
        ---
        A missing file is reported as an error:
        ```python
        oc.io.ensure_file_exists("non_existent_file.txt")
        ```
        Output:
        ```
        FileNotFoundError:

        File not found: non_existent_file.txt

        ```
    """
    candidate = Path(path)
    if candidate.is_file():
        return candidate
    # The message echoes the caller's original argument, not the Path object.
    raise FileNotFoundError(f"\n\nFile not found: {path}\n")

extract_archive(archive_file, dest_dir, replace=True)

Extracts an archive to a directory.

Supports formats: zip, tar, gztar, bztar, and xztar.

Parameters:

Name Type Description Default
archive_file PathLike

The path to the archive.

required
dest_dir PathLike

The destination directory to extract the archive to.

required
replace bool

If True, the destination directory will be deleted if it already exists. Defaults to True.

True
Example

Extract an archive to a new directory:

# Assuming 'archive.zip' exists
oc.io.extract_archive("archive.zip", "extracted_folder")
Output:
Archive extracted: archive.zip


Extract and replace an existing directory:

# Assuming 'archive.zip' and 'extracted_folder' exist
oc.io.extract_archive("archive.zip", "extracted_folder", replace=True)
Output:
Replacing directory extracted_folder
Directory deleted: extracted_folder
Archive extracted: archive.zip

Source code in opencrate/core/utils/io/system.py
def extract_archive(archive_file: PathLike, dest_dir: PathLike, replace: bool = True) -> None:
    """Unpack an archive into a directory.

    Unpacking is delegated to ``shutil.unpack_archive``; the documented
    formats are zip, tar, gztar, bztar, and xztar.

    Args:
        archive_file (PathLike): Archive to unpack.
        dest_dir (PathLike): Directory the contents are extracted into.
        replace (bool): When True, an existing ``dest_dir`` is deleted before
            extraction. Defaults to True.

    Raises:
        FileNotFoundError: If ``archive_file`` is not an existing file.
        FileExistsError: If ``dest_dir`` already exists and ``replace`` is
            False.

    Example:
        Unpack into a directory that does not exist yet:
        ```python
        # 'archive.zip' must exist
        oc.io.extract_archive("archive.zip", "extracted_folder")
        ```
        Output:
        ```
        Archive extracted: archive.zip
        ```
        ---
        Unpack over a directory that is already there:
        ```python
        # both 'archive.zip' and 'extracted_folder' exist
        oc.io.extract_archive("archive.zip", "extracted_folder", replace=True)
        ```
        Output:
        ```
        Replacing directory extracted_folder
        Directory deleted: extracted_folder
        Archive extracted: archive.zip
        ```
    """
    # Validate the archive before the destination is possibly wiped.
    source_archive = ensure_file_exists(archive_file)
    target_dir = Path(dest_dir)
    handle_replace(path=target_dir, replace=replace)
    shutil.unpack_archive(source_archive, target_dir)
    print(f"Archive extracted: {source_archive}")

get_file_extension(file_path)

Returns the extension of a file from its path.

Parameters:

Name Type Description Default
file_path PathLike

The path to the file.

required

Returns:

Name Type Description
str str

The extension of the file (without the dot).

Example

Get the extension of a file:

ext = oc.io.get_file_extension("example.txt")
print(ext)
Output:
txt

Source code in opencrate/core/utils/io/system.py
def get_file_extension(file_path: PathLike) -> str:
    """Extract the final extension from a file path.

    Only the last extension is considered, so ``"archive.tar.gz"`` yields
    ``"gz"``. A path without an extension yields an empty string.

    Args:
        file_path (PathLike): Path whose extension is wanted.

    Returns:
        str: The extension without its leading dot, or ``""`` if there is none.

    Example:
        Read the extension of a file:
        ```python
        ext = oc.io.get_file_extension("example.txt")
        print(ext)
        ```
        Output:
        ```
        txt
        ```
    """
    _root, dotted_ext = os.path.splitext(file_path)
    # dotted_ext is either empty or starts with ".", so slicing drops the dot.
    return dotted_ext[1:]

get_file_name(file_path)

Returns the name of a file from its path.

Parameters:

Name Type Description Default
file_path PathLike

The path to the file.

required

Returns:

Name Type Description
str str

The name of the file.

Example

Get the name of a file:

name = oc.io.get_file_name("path/to/example.txt")
print(name)
Output:
example.txt

Source code in opencrate/core/utils/io/system.py
def get_file_name(file_path: PathLike) -> str:
    """Extract the final component (name plus extension) of a path.

    Args:
        file_path (PathLike): Path to take the name from.

    Returns:
        str: Everything after the last path separator; empty if the path
            ends with a separator.

    Example:
        Read the name of a file:
        ```python
        name = oc.io.get_file_name("path/to/example.txt")
        print(name)
        ```
        Output:
        ```
        example.txt
        ```
    """
    _head, tail = os.path.split(file_path)
    return tail

get_parent_dir(path)

Returns the parent directory of a file or directory.

Parameters:

Name Type Description Default
path PathLike

The path to the file or directory.

required

Returns:

Name Type Description
Path Path

The path to the parent directory.

Example

Get the parent directory:

parent = oc.io.get_parent_dir("path/to/example.txt")
print(parent)
Output:
path/to

Source code in opencrate/core/utils/io/system.py
def get_parent_dir(path: PathLike) -> Path:
    """Return the directory portion of a path.

    The computation is purely textual; the path does not have to exist.

    Args:
        path (PathLike): File or directory path.

    Returns:
        Path: Everything before the last path component, as a ``Path``.

    Example:
        Read the parent directory:
        ```python
        parent = oc.io.get_parent_dir("path/to/example.txt")
        print(parent)
        ```
        Output:
        ```
        path/to
        ```
    """
    head, _tail = os.path.split(path)
    return Path(head)

get_size(path, unit=None)

Returns the size of a file or directory in human-readable format.

This function calculates the total size of a file or directory (including all subdirectories and files recursively) and returns it in a human-readable format with appropriate units (Bytes, KB, MB, GB, TB). By default, it automatically selects the most suitable unit, but you can specify a particular unit if needed.

Parameters:

Name Type Description Default
path PathLike

The path to the file or directory.

required
unit Optional[str]

The unit to use for the size. Valid values are 'Bytes', 'KB', 'MB', 'GB', 'TB'. If None, automatically selects the most suitable unit. Defaults to None.

None

Returns:

Name Type Description
str str

The size in the specified or auto-selected unit, formatted with up to 2 decimal places (e.g., '323 Bytes', '12.52 GB', '53.45 MB').

Raises:

Type Description
FileNotFoundError

If the path does not exist.

ValueError

If an invalid unit is specified.

Example

Get the size with automatic unit selection:

size = oc.io.get_size("example.txt")
print(size)
Output:
1.23 KB


Get the size of a directory:

size = oc.io.get_size("my_folder")
print(size)
Output:
45.67 MB


Get the size in a specific unit:

size = oc.io.get_size("large_file.zip", unit="GB")
print(size)
Output:
2.34 GB


Get the size in bytes:

size = oc.io.get_size("small.txt", unit="Bytes")
print(size)
Output:
323 Bytes

Source code in opencrate/core/utils/io/system.py
def get_size(path: PathLike, unit: Optional[str] = None) -> str:
    """Report the size of a file or directory as a human-readable string.

    For a file the size comes from its ``stat`` result; for anything else the
    sizes of all files found recursively beneath ``path`` are summed. Entries
    that cannot be read are silently skipped, so the total is best-effort.
    Units are 1024-based (1 KB = 1024 Bytes).

    Args:
        path (PathLike): File or directory to measure.
        unit (Optional[str]): One of 'Bytes', 'KB', 'MB', 'GB', 'TB'. When
            None, the largest unit that keeps the value at or above 1 is
            chosen automatically. Defaults to None.

    Returns:
        str: ``"<value> <unit>"``. Byte counts are whole numbers; every other
            unit is shown with two decimal places (e.g. '323 Bytes',
            '12.52 GB', '53.45 MB').

    Raises:
        FileNotFoundError: If ``path`` does not exist.
        ValueError: If ``unit`` is not one of the supported units.

    Example:
        Let the unit be chosen automatically:
        ```python
        size = oc.io.get_size("example.txt")
        print(size)
        ```
        Output:
        ```
        1.23 KB
        ```
        ---
        Measure a whole directory:
        ```python
        size = oc.io.get_size("my_folder")
        print(size)
        ```
        Output:
        ```
        45.67 MB
        ```
        ---
        Force a particular unit:
        ```python
        size = oc.io.get_size("large_file.zip", unit="GB")
        print(size)
        ```
        Output:
        ```
        2.34 GB
        ```
        ---
        Report plain bytes:
        ```python
        size = oc.io.get_size("small.txt", unit="Bytes")
        print(size)
        ```
        Output:
        ```
        323 Bytes
        ```
    """
    target = Path(path)
    if not target.exists():
        raise FileNotFoundError(f"Path not found: {target}")

    # Ordered smallest to largest; each step up divides by 1024.
    units = ["Bytes", "KB", "MB", "GB", "TB"]
    if not (unit is None or unit in units):
        raise ValueError(f"Invalid unit: {unit}. Valid units are: {', '.join(units)}")

    # --- total size in bytes -------------------------------------------------
    if target.is_file():
        total_bytes = target.stat().st_size
    else:
        total_bytes = 0
        try:
            for entry in target.rglob("*"):
                if not entry.is_file():
                    continue
                try:
                    total_bytes += entry.stat().st_size
                except OSError:
                    # Best effort: an unreadable file does not abort the scan.
                    continue
        except OSError:
            # Best effort: keep whatever was summed before the walk failed.
            pass

    # --- scale to the requested / most suitable unit --------------------------
    amount = float(total_bytes)
    if unit is None:
        if total_bytes == 0:
            return "0 Bytes"
        step = 0
        while amount >= 1024 and step < len(units) - 1:
            amount /= 1024.0
            step += 1
        label = units[step]
    else:
        label = unit
        for _ in range(units.index(unit)):
            amount /= 1024.0

    # Bytes are always whole; larger units get two decimals.
    if label == "Bytes":
        return f"{int(amount)} {label}"
    return f"{amount:.2f} {label}"

handle_replace(path, replace)

Handles the replacement of a file or directory if it exists.

Parameters:

Name Type Description Default
path PathLike

The path to the file or directory.

required
replace bool

If True, the file or directory will be deleted if it already exists.

required

Raises:

Type Description
FileExistsError

If the file or directory already exists and replace is False.

Example

Replace an existing file:

# Assuming 'data.txt' is a file that exists
oc.io.handle_replace("data.txt", replace=True)
Output:
Replacing file data.txt
File deleted: data.txt


Replace an existing directory:

# Assuming 'my_folder' is a directory that exists
oc.io.handle_replace("my_folder", replace=True)
Output:
Replacing directory my_folder
Directory deleted: my_folder


Raise an error if the path already exists and replace is False:

oc.io.handle_replace("data.txt", replace=False)
Output:
FileExistsError:
Path already exists: data.txt.
Pass `replace=True` if you want to replace the existing file or directory.

Source code in opencrate/core/utils/io/system.py
def handle_replace(path: PathLike, replace: bool) -> None:
    """Clear the way for a new file or directory at ``path``.

    Nothing happens when ``path`` does not exist. When it does, it is either
    deleted (``replace`` is True) or reported as a conflict.

    Args:
        path (PathLike): File or directory that is about to be (re)created.
        replace (bool): Whether an existing ``path`` may be deleted.

    Raises:
        FileExistsError: If ``path`` already exists and ``replace`` is False.

    Example:
        Clear an existing file:
        ```python
        # 'data.txt' is an existing file
        oc.io.handle_replace("data.txt", replace=True)
        ```
        Output:
        ```
        Replacing file data.txt
        File deleted: data.txt
        ```
        ---
        Clear an existing directory:
        ```python
        # 'my_folder' is an existing directory
        oc.io.handle_replace("my_folder", replace=True)
        ```
        Output:
        ```
        Replacing directory my_folder
        Directory deleted: my_folder
        ```
        ---
        Refuse to touch an existing path:
        ```python
        oc.io.handle_replace("data.txt", replace=False)
        ```
        Output:
        ```
        FileExistsError:
        Path already exists: data.txt.
        Pass `replace=True` if you want to replace the existing file or directory.
        ```
    """
    target = Path(path)

    # Nothing in the way: no work to do.
    if not target.exists():
        return

    if not replace:
        raise FileExistsError(f"\nPath already exists: {target}.\nPass `replace=True` if you want to replace the existing file or directory.\n")

    if target.is_dir():
        print(f"Replacing directory {target}")
        delete_dir(target)
        return

    # Anything that exists but is not a directory is handled as a file.
    print(f"Replacing file {target}")
    delete_file(target)

list_dir(dir, extension=None, recursive=True)

Recursively lists all files in a directory tree.

Parameters:

Name Type Description Default
dir str

The path to the directory.

required
extension Optional[List[str] | str]

The file extension(s) to filter by. Can be a single extension as a string or a list of extensions. Defaults to None.

None

Returns:

Type Description
List[str]

List[str]: A list of file paths.

Raises:

Type Description
FileNotFoundError

If the directory does not exist.

Example

List all files:

files = oc.io.list_dir("my_folder")
print(files)
Output:
['my_folder/file1.txt', 'my_folder/subfolder/file2.log']


List only files with a specific extension:

files = oc.io.list_dir("my_folder", extension="txt")
print(files)
Output:
['my_folder/file1.txt']


List files with multiple extensions:

files = oc.io.list_dir("my_folder", extension=["txt", "log"])
print(files)
Output:
['my_folder/file1.txt', 'my_folder/subfolder/file2.log']

Source code in opencrate/core/utils/io/system.py
def list_dir(dir: str, extension: Optional[Union[List[str], str]] = None, recursive: bool = True) -> List[str]:
    """Lists the files in a directory, optionally filtered by extension.

    Every returned entry is a path built by joining the containing directory
    with the file name, so it starts with ``dir``. Only files are returned,
    never directories. The order of the result is whatever ``os.walk``
    yields and is not sorted.

    Args:
        dir (str): The path to the directory.
        extension (Optional[List[str] | str]): The file extension(s) to filter by.
            Can be a single extension as a string or a list of extensions,
            each with or without the leading dot ("txt" and ".txt" are
            equivalent). Matching is case-sensitive. None, an empty list or
            empty strings disable filtering. Defaults to None.
        recursive (bool): If True, descend into all subdirectories. If False,
            only files located directly in ``dir`` are listed; the same
            extension filter and path format apply. Defaults to True.

    Returns:
        List[str]: A list of file paths.

    Raises:
        FileNotFoundError: If the directory does not exist.

    Example:
        List all files:
        ```python
        files = oc.io.list_dir("my_folder")
        print(files)
        ```
        Output:
        ```
        Found 2 files in my_folder
        ['my_folder/file1.txt', 'my_folder/subfolder/file2.log']
        ```
        ---
        List only files with a specific extension:
        ```python
        files = oc.io.list_dir("my_folder", extension="txt")
        print(files)
        ```
        Output:
        ```
        Found 1 files in my_folder
        ['my_folder/file1.txt']
        ```
        ---
        List files with multiple extensions:
        ```python
        files = oc.io.list_dir("my_folder", extension=["txt", "log"])
        print(files)
        ```
        Output:
        ```
        Found 2 files in my_folder
        ['my_folder/file1.txt', 'my_folder/subfolder/file2.log']
        ```
        ---
        List only the files directly inside the directory:
        ```python
        files = oc.io.list_dir("my_folder", recursive=False)
        print(files)
        ```
        Output:
        ```
        Found 1 files in my_folder
        ['my_folder/file1.txt']
        ```
    """
    ensure_dir_exists(dir)

    # Convert the filter to a uniform list.
    if extension is None:
        requested: List[str] = []
    elif isinstance(extension, str):
        requested = [extension]
    else:
        requested = list(extension)

    # Normalise to dot-prefixed suffixes so "txt" matches "a.txt" but not a
    # name such as "notxt" that merely ends with the same letters. Blank
    # entries are dropped; an empty tuple means "no filtering".
    suffixes = tuple("." + ext.lstrip(".") for ext in requested if ext.strip("."))

    file_paths: List[str] = []
    for root, _, files in os.walk(dir):
        file_paths.extend(
            os.path.join(root, name)
            for name in files
            if not suffixes or name.endswith(suffixes)
        )
        if not recursive:
            # The first iteration of os.walk is the top-level directory.
            break

    print(f"Found {len(file_paths)} files in {dir}")
    return file_paths

move_dir(src, dst, replace=False)

Moves a directory from the source to the destination.

Parameters:

Name Type Description Default
src PathLike

The source directory path.

required
dst PathLike

The destination directory path.

required
replace bool

If True, the destination directory will be deleted if it already exists. Defaults to False.

False

Raises:

Type Description
FileNotFoundError

If the source directory does not exist.

FileExistsError

If the destination directory already exists and replace is False.

Example

Move a directory:

# Assuming 'old_folder' exists
oc.io.move_dir("old_folder", "new_folder")
Output:
Directory moved from old_folder to new_folder


Replace an existing destination:

# Assuming 'old_folder' and 'new_folder' exist
oc.io.move_dir("old_folder", "new_folder", replace=True)
Output:
Replacing directory new_folder
Directory deleted: new_folder
Directory moved from old_folder to new_folder

Source code in opencrate/core/utils/io/system.py
def move_dir(src: PathLike, dst: PathLike, replace: bool = False) -> None:
    """Relocates a directory tree to a new path.

    The source is validated with `ensure_dir_exists`, a possible clash at the
    destination is delegated to `handle_replace`, and the tree is then moved
    with `shutil.move`. A confirmation line is printed once the move is done.

    Args:
        src (PathLike): The source directory path.
        dst (PathLike): The destination directory path.
        replace (bool): If True, the destination directory will be deleted if
            it already exists. Defaults to False.

    Raises:
        FileNotFoundError: If the source directory does not exist.
        FileExistsError: If the destination directory already exists and
            `replace` is False.

    Example:
        Move a directory:
        ```python
        # Assuming 'old_folder' exists
        oc.io.move_dir("old_folder", "new_folder")
        ```
        Output:
        ```
        Directory moved from old_folder to new_folder
        ```
        ---
        Replace an existing destination:
        ```python
        # Assuming 'old_folder' and 'new_folder' exist
        oc.io.move_dir("old_folder", "new_folder", replace=True)
        ```
        Output:
        ```
        Replacing directory new_folder
        Directory deleted: new_folder
        Directory moved from old_folder to new_folder
        ```
    """
    source_dir = ensure_dir_exists(src)

    # Settle any clash at the destination before the tree is touched.
    handle_replace(dst, replace)

    # shutil.move is handed plain strings for both endpoints.
    origin, target = str(source_dir), str(dst)
    shutil.move(origin, target)

    print(f"Directory moved from {source_dir} to {dst}")

path_exists(path)

Checks if a path exists (file or directory).

Parameters:

Name Type Description Default
path PathLike

The path to check.

required

Returns:

Name Type Description
bool bool

True if the path exists, False otherwise.

Example

Check if a file exists:

exists = oc.io.path_exists("example.txt")
print(exists)
Output:
True


Check if a directory exists:

exists = oc.io.path_exists("my_folder")
print(exists)
Output:
True

Source code in opencrate/core/utils/io/system.py
def path_exists(path: PathLike) -> bool:
    """Reports whether a file or directory is present at `path`.

    Args:
        path (PathLike): The path to check.

    Returns:
        bool: True if the path exists, False otherwise.

    Example:
        Check if a file exists:
        ```python
        exists = oc.io.path_exists("example.txt")
        print(exists)
        ```
        Output:
        ```
        True
        ```
        ---
        Check if a directory exists:
        ```python
        exists = oc.io.path_exists("my_folder")
        print(exists)
        ```
        Output:
        ```
        True
        ```
    """
    candidate = Path(path)
    return candidate.exists()

rename(src, dst, replace=False)

Renames a file or directory.

Parameters:

Name Type Description Default
src PathLike

The current path to the file or directory.

required
dst PathLike

The new path for the file or directory.

required
replace bool

If True, the destination will be overwritten if it already exists. Defaults to False.

False

Raises:

Type Description
FileNotFoundError

If the source file or directory does not exist.

FileExistsError

If the destination already exists and replace is False.

Example

Rename a file:

# Assuming 'old_name.txt' exists
oc.io.rename("old_name.txt", "new_name.txt")
Output:
Renamed old_name.txt to new_name.txt


Replace an existing file with rename:

# Assuming 'old_name.txt' and 'new_name.txt' exist
oc.io.rename("old_name.txt", "new_name.txt", replace=True)
Output:
Replacing file new_name.txt
File deleted: new_name.txt
Renamed old_name.txt to new_name.txt

Source code in opencrate/core/utils/io/system.py
def rename(src: PathLike, dst: PathLike, replace: bool = False) -> None:
    """Gives a file or directory a new path.

    The source must already exist. A possible clash at the destination is
    delegated to `handle_replace` before `Path.rename` is invoked, and a
    confirmation line is printed afterwards.

    Args:
        src (PathLike): The current path to the file or directory.
        dst (PathLike): The new path for the file or directory.
        replace (bool): If True, the destination will be overwritten if it
            already exists. Defaults to False.

    Raises:
        FileNotFoundError: If the source file or directory does not exist.
        FileExistsError: If the destination already exists and `replace` is False.

    Example:
        Rename a file:
        ```python
        # Assuming 'old_name.txt' exists
        oc.io.rename("old_name.txt", "new_name.txt")
        ```
        Output:
        ```
        Renamed old_name.txt to new_name.txt
        ```
        ---
        Replace an existing file with rename:
        ```python
        # Assuming 'old_name.txt' and 'new_name.txt' exist
        oc.io.rename("old_name.txt", "new_name.txt", replace=True)
        ```
        Output:
        ```
        Replacing file new_name.txt
        File deleted: new_name.txt
        Renamed old_name.txt to new_name.txt
        ```
    """
    origin = Path(src)
    source_present = origin.exists()
    if not source_present:
        # Fail before the destination is even looked at.
        raise FileNotFoundError(f"Source path not found: {origin}")

    target = Path(dst)
    handle_replace(target, replace)
    origin.rename(target)

    print(f"Renamed {origin} to {target}")

show_files_in_dir(directory, extensions=None, depth=2, verbose=False)

Displays all files in a directory tree using Rich Tree structure.

Parameters:

Name Type Description Default
directory PathLike

The path to the directory.

required
extensions Optional[Union[str, List[str]]]

File extensions to filter by.

None
depth Optional[int]

Maximum depth to display. Defaults to 2.

2
verbose bool

If True, displays file modification time and size. Defaults to False.

False

Raises:

Type Description
FileNotFoundError

If the directory does not exist.

Example

Show all files with default depth:

oc.io.show_files_in_dir("my_folder")


Show only Python files with custom depth:

oc.io.show_files_in_dir("my_folder", extensions=".py", depth=3)


Show files with multiple extensions and unlimited depth:

oc.io.show_files_in_dir("my_folder", extensions=[".py", ".txt"], depth=None)


Show files with verbose information:

oc.io.show_files_in_dir("my_folder", verbose=True)

Source code in opencrate/core/utils/io/system.py
def show_files_in_dir(
    directory: PathLike,
    extensions: Optional[Union[str, List[str]]] = None,
    depth: Optional[int] = 2,
    verbose: bool = False,
) -> None:
    """Displays all files in a directory tree using Rich Tree structure.

    Directories are listed before files at every level, each group sorted
    case-insensitively by name. Sub-directories are always shown; only files
    are subject to the extension filter.

    Args:
        directory (PathLike): The path to the directory.
        extensions (Optional[Union[str, List[str]]]): File extensions to filter by.
            A leading dot is optional and matching is case-insensitive. Only the
            final suffix of a file name is compared (``archive.tar.gz`` matches
            ``.gz``). ``None`` or an empty value disables filtering.
        depth (Optional[int]): Maximum depth to display. ``None`` means
            unlimited. Defaults to 2.
        verbose (bool): If True, displays file modification time and size. Defaults to False.

    Raises:
        FileNotFoundError: If the directory does not exist.

    Example:
        Show all files with default depth:
        ```python
        oc.io.show_files_in_dir("my_folder")
        ```
        ---
        Show only Python files with custom depth:
        ```python
        oc.io.show_files_in_dir("my_folder", extensions=".py", depth=3)
        ```
        ---
        Show files with multiple extensions and unlimited depth:
        ```python
        oc.io.show_files_in_dir("my_folder", extensions=[".py", ".txt"], depth=None)
        ```
        ---
        Show files with verbose information:
        ```python
        oc.io.show_files_in_dir("my_folder", verbose=True)
        ```
    """
    console = Console()
    dir_path = ensure_dir_exists(directory)

    tree = Tree(f"{dir_path.name}")

    # Normalise the filter to a set of lower-case, dot-prefixed suffixes.
    # File suffixes are lower-cased before comparison, so the requested
    # extensions must be lower-cased too or ".PY" could never match.
    # An empty filter means "no filter", as in list_dir.
    if not extensions:
        extensions_list = None
    else:
        requested = [extensions] if isinstance(extensions, str) else list(extensions)
        extensions_list = {f".{ext.lstrip('.').lower()}" for ext in requested}

    def describe(item: Path) -> str:
        """Return the dimmed ' (mtime, size)' label suffix for *item*."""
        try:
            stat = item.stat()
            modified_time = datetime.datetime.fromtimestamp(stat.st_mtime).strftime("%Y-%m-%d %H:%M")
            size_str = get_size(item)
        except OSError:  # PermissionError is a subclass of OSError
            return " [dim](Permission denied)[/dim]"
        return f" [dim]({modified_time}, {size_str})[/dim]"

    def add_files_to_tree(current_path: Path, current_tree: Tree, current_level: int = 0):
        # Stop descending once the requested depth is reached (None = unlimited).
        if depth is not None and current_level >= depth:
            return

        try:
            # is_file() is False for directories, so they sort first.
            items = sorted(current_path.iterdir(), key=lambda p: (p.is_file(), p.name.lower()))

            for item in items:
                if item.is_dir():
                    dir_name = f"{item.name}/"
                    if verbose:
                        dir_name += describe(item)
                    branch = current_tree.add(dir_name)
                    add_files_to_tree(item, branch, current_level + 1)
                elif extensions_list is None or item.suffix.lower() in extensions_list:
                    file_name = f"{item.name}"
                    if verbose:
                        file_name += describe(item)
                    current_tree.add(file_name)
        except PermissionError:
            # The directory itself could not be listed; mark it and move on.
            current_tree.add("Permission denied")

    add_files_to_tree(dir_path, tree)
    console.print(tree)

audio

load(path, lib='librosa', **kwargs)

Loads an audio file and returns its data and metadata.

This function provides a unified interface for loading audio using different libraries, returning a standardized dictionary containing the audio waveform and key properties.

Parameters:

Name Type Description Default
path str

The file path to the audio file.

required
lib str

The library to use for loading. Supported: "librosa", "pydub", "scipy", "soundfile", "torchaudio". Defaults to "librosa".

'librosa'
**kwargs Any

Additional keyword arguments passed to the loading function of the selected library (e.g., sr=22050 for librosa).

{}

Returns:

Type Description
Dict[str, Any]

A dictionary with the following keys:

Dict[str, Any]
  • "data" (np.ndarray): The audio waveform as a NumPy array.
Dict[str, Any]
  • "sample_rate" (int): The sample rate of the audio.
Dict[str, Any]
  • "duration" (float): The duration of the audio in seconds.
Dict[str, Any]
  • "channels" (int): The number of audio channels.
Dict[str, Any]
  • "library_object" (Any): The original object loaded by the library.

Raises:

Type Description
FileNotFoundError

If the specified file path does not exist.

ValueError

If an unsupported library is specified.

ImportError

If the required audio library is not installed.

Examples:

Load an audio file using librosa (default):
import opencrate as oc
audio_info = oc.io.audio.load("speech.wav")
print(f"Sample Rate: {audio_info['sample_rate']}")
print(f"Duration: {audio_info['duration']:.2f}s")
Load an audio file using pydub:
import opencrate as oc
audio_info = oc.io.audio.load("music.mp3", lib="pydub")
# The returned data is always a NumPy array for consistency
print(f"Waveform shape: {audio_info['data'].shape}")
Load a WAV file using scipy (fast for .wav):
import opencrate as oc
audio_info = oc.io.audio.load("speech.wav", lib="scipy")
print(f"Loaded {audio_info['duration']:.2f}s of audio.")
Load an audio file using torchaudio:
import opencrate as oc
audio_info = oc.io.audio.load("music.wav", lib="torchaudio")
print(f"Channels: {audio_info['channels']}")
Source code in opencrate/core/utils/io/audio.py
def load(path: str, lib: str = "librosa", **kwargs: Any) -> Dict[str, Any]:
    """Loads an audio file and returns its data and metadata.

    This function provides a unified interface for loading audio using different
    libraries, returning a standardized dictionary containing the audio waveform
    and key properties.

    Args:
        path (str): The file path to the audio file.
        lib (str): The library to use for loading. Supported: "librosa", "pydub", "scipy", "soundfile", "torchaudio".
            Defaults to "librosa".
        **kwargs: Additional keyword arguments passed to the loading function of the
            selected library (e.g., `sr=22050` for librosa, `mmap=True` for scipy).

    Returns:
        A dictionary with the following keys:
        - "data" (np.ndarray): The audio waveform as a NumPy array.
        - "sample_rate" (int): The sample rate of the audio.
        - "duration" (float): The duration of the audio in seconds.
        - "channels" (int): The number of audio channels.
        - "library_object" (Any): The original object loaded by the library.

    Raises:
        FileNotFoundError: If the specified file path does not exist.
        ValueError: If an unsupported library is specified.
        ImportError: If the required audio library is not installed.

    Examples:
        Load an audio file using librosa (default):
        ---
        ```python
        import opencrate as oc
        audio_info = oc.io.audio.load("speech.wav")
        print(f"Sample Rate: {audio_info['sample_rate']}")
        print(f"Duration: {audio_info['duration']:.2f}s")
        ```

        Load an audio file using pydub:
        ---
        ```python
        import opencrate as oc
        audio_info = oc.io.audio.load("music.mp3", lib="pydub")
        # The returned data is always a NumPy array for consistency
        print(f"Waveform shape: {audio_info['data'].shape}")
        ```

        Load a WAV file using scipy (fast for .wav):
        ---
        ```python
        import opencrate as oc
        audio_info = oc.io.audio.load("speech.wav", lib="scipy")
        print(f"Loaded {audio_info['duration']:.2f}s of audio.")
        ```

        Load an audio file using torchaudio:
        ---
        ```python
        import opencrate as oc
        audio_info = oc.io.audio.load("music.wav", lib="torchaudio")
        print(f"Channels: {audio_info['channels']}")
        ```
    """

    # The existence check runs before any backend is imported, so a bad path
    # is reported even when the requested library is not installed.
    if not os.path.exists(path):
        raise FileNotFoundError(f"No such file or directory: '{path}'")

    if lib == "librosa":
        librosa = _lazy_import("librosa")
        y, sr = librosa.load(path, **kwargs)

        return {
            "data": y,
            "sample_rate": sr,
            "duration": librosa.get_duration(y=y, sr=sr),
            # A multi-dimensional result is treated as (channels, samples).
            "channels": y.shape[0] if y.ndim > 1 else 1,
            "library_object": (y, sr),
        }

    elif lib == "pydub":
        pydub = _lazy_import("pydub")
        audio_segment = pydub.AudioSegment.from_file(path, **kwargs)
        samples = audio_segment.get_array_of_samples()

        return {
            # NOTE(review): the sample array is converted as-is into a 1-D
            # array; for multi-channel audio pydub presumably interleaves the
            # channels here — confirm whether callers expect (samples, channels).
            "data": np.array(samples, dtype=np.float32),
            "sample_rate": audio_segment.frame_rate,
            "duration": audio_segment.duration_seconds,
            "channels": audio_segment.channels,
            "library_object": audio_segment,
        }

    elif lib == "scipy":
        scipy_io = _lazy_import("scipy.io.wavfile")
        # Forward kwargs as documented, like every other backend does.
        sample_rate, data = scipy_io.read(path, **kwargs)

        return {
            "data": data,
            "sample_rate": sample_rate,
            "duration": data.shape[0] / sample_rate,
            "channels": data.shape[1] if data.ndim > 1 else 1,
            "library_object": (sample_rate, data),
        }
    elif lib == "soundfile":
        soundfile = _lazy_import("soundfile")
        data, sample_rate = soundfile.read(path, **kwargs)
        return {
            "data": data,
            "sample_rate": sample_rate,
            "duration": data.shape[0] / sample_rate,
            "channels": data.shape[1] if data.ndim > 1 else 1,
            # NOTE(review): stored as (sample_rate, data), the reverse of the
            # (data, sample_rate) order unpacked from soundfile.read above;
            # left unchanged so existing callers that unpack it keep working.
            "library_object": (sample_rate, data),
        }

    elif lib == "torchaudio":
        torchaudio = _lazy_import("torchaudio")
        waveform, sample_rate = torchaudio.load(path, **kwargs)
        # Convert torch tensor to numpy array
        data = waveform.numpy()
        # torchaudio returns (channels, samples) format, transpose to (samples, channels) for consistency
        if data.ndim > 1:
            data = data.T

        return {
            "data": data,
            "sample_rate": sample_rate,
            "duration": data.shape[0] / sample_rate,
            "channels": waveform.shape[0],
            "library_object": (waveform, sample_rate),
        }

    else:
        raise ValueError(f"Unsupported library: '{lib}'. Supported libraries are 'librosa', 'pydub', 'scipy', 'soundfile', 'torchaudio'.")

save(data, path, sample_rate, lib='soundfile', **kwargs)

Saves a NumPy array as an audio file.

Parameters:

Name Type Description Default
data ndarray

The audio waveform to save. Must be a NumPy array.

required
path str

The destination file path for the audio file.

required
sample_rate int

The sample rate of the audio data.

required
lib str

The library to use for saving. Supported: "soundfile", "scipy", "librosa", "torchaudio". Defaults to "soundfile".

'soundfile'
**kwargs Any

Additional keyword arguments to pass to the saving function.

{}

Raises:

Type Description
ValueError

If the data is not a NumPy array or an unsupported library is specified.

ImportError

If the required audio library is not installed.

IOError

If there is an error writing the file.

Examples:

Generate a sine wave and save it as a WAV file:
import opencrate as oc
import numpy as np
sr = 22050
duration = 5
frequency = 440.0
t = np.linspace(0., duration, int(sr * duration))
amplitude = np.iinfo(np.int16).max * 0.5
data = (amplitude * np.sin(2. * np.pi * frequency * t)).astype(np.int16)

oc.io.audio.save(data, "sine_wave.wav", sr, lib="soundfile")
Save using torchaudio:
import opencrate as oc
import numpy as np

# Generate some audio data
data = np.random.randn(22050)  # 1 second of random audio
oc.io.audio.save(data, "output.wav", 22050, lib="torchaudio")
Source code in opencrate/core/utils/io/audio.py
def save(
    data,
    path: str,
    sample_rate: int,
    lib: str = "soundfile",
    **kwargs: Any,
) -> None:
    """Saves a NumPy array as an audio file.

    Arguments are validated before anything touches the file system. The
    parent directory of `path` is then created if needed and the waveform is
    written with the selected backend.

    Args:
        data (np.ndarray): The audio waveform to save. Must be a NumPy array.
        path (str): The destination file path for the audio file.
        sample_rate (int): The sample rate of the audio data.
        lib (str): The library to use for saving. Supported: "soundfile", "scipy", "librosa", "torchaudio".
            Defaults to "soundfile".
        **kwargs: Additional keyword arguments to pass to the saving function.
            They are not forwarded to the scipy backend.

    Raises:
        ValueError: If the data is not a NumPy array or an unsupported library is specified.
        ImportError: If the required audio library is not installed.
        IOError: If there is an error writing the file.

    Examples:
        Generate a sine wave and save it as a WAV file:
        ---
        ```python
        import opencrate as oc
        import numpy as np
        sr = 22050
        duration = 5
        frequency = 440.0
        t = np.linspace(0., duration, int(sr * duration))
        amplitude = np.iinfo(np.int16).max * 0.5
        data = (amplitude * np.sin(2. * np.pi * frequency * t)).astype(np.int16)

        oc.io.audio.save(data, "sine_wave.wav", sr, lib="soundfile")
        ```

        Save using torchaudio:
        ---
        ```python
        import opencrate as oc
        import numpy as np

        # Generate some audio data
        data = np.random.randn(22050)  # 1 second of random audio
        oc.io.audio.save(data, "output.wav", 22050, lib="torchaudio")
        ```
    """

    if not isinstance(data, np.ndarray):
        raise ValueError(f"Input data must be a NumPy array, but got {type(data)}.")

    # Reject unknown backends up front. Raising this inside the try block
    # below would let the blanket handler turn it into an OSError, which
    # contradicts the documented ValueError.
    if lib not in ("soundfile", "scipy", "librosa", "torchaudio"):
        raise ValueError(f"Unsupported library: '{lib}'. Supported libraries for saving are 'soundfile', 'scipy', 'librosa', 'torchaudio'.")

    # Only create directories once the request is known to be valid.
    output_dir = os.path.dirname(path)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    try:
        if lib in ("soundfile", "librosa"):
            # librosa uses soundfile for saving
            soundfile = _lazy_import("soundfile")
            soundfile.write(path, data, sample_rate, **kwargs)
        elif lib == "scipy":
            scipy_io = _lazy_import("scipy.io.wavfile")
            scipy_io.write(path, sample_rate, data)
        else:  # lib == "torchaudio"
            torchaudio = _lazy_import("torchaudio")
            torch = _lazy_import("torch")

            # Convert numpy to torch tensor
            tensor_data = torch.from_numpy(data)

            # Ensure proper shape: torchaudio expects (channels, samples)
            if tensor_data.ndim == 1:
                tensor_data = tensor_data.unsqueeze(0)  # Add channel dimension
            elif tensor_data.ndim == 2 and tensor_data.shape[0] > tensor_data.shape[1]:
                # Heuristic: audio has far more samples than channels, so a
                # longer first axis means (samples, channels); transpose it
                # to (channels, samples).
                tensor_data = tensor_data.T

            torchaudio.save(path, tensor_data, sample_rate, **kwargs)
    except ImportError:
        # A missing backend is a setup problem, not a write failure.
        raise
    except Exception as e:
        raise OSError(f"Failed to save audio to {path}: {e}") from e

checkpoint

load(path, **kwargs)

Loads a model, state dict, or pipeline, inferring the format.

This function acts as a universal loader, automatically selecting the correct loading mechanism based on the file extension. Required libraries are imported on-the-fly. For ONNX files, it returns an onnxruntime.InferenceSession ready for execution.

Parameters:

Name Type Description Default
path str

The source file path. The extension determines the format.

required
**kwargs Any

Additional keyword arguments to be passed to the underlying load function (e.g., map_location for torch.load).

{}

Returns:

Name Type Description
Any Any

The loaded object (e.g., a state_dict, Keras model, Scikit-Learn pipeline, or ONNX inference session).

Raises:

Type Description
ImportError

If the required library for the specified format is not installed.

ValueError

If the file extension is not a supported format.

FileNotFoundError

If the specified path does not exist.

Examples:

Loading PyTorch model checkpoint:

import torch.nn as nn
# First, save a checkpoint: save(model.state_dict(), "model.pt")
model = nn.Linear(10, 2)
state_dict = load("model.pt", map_location="cpu")
model.load_state_dict(state_dict)


Loading safetensors checkpoint:

import torch.nn as nn
# First, save a checkpoint: save(model.state_dict(), "model.safetensors")
model = nn.Linear(10, 2)
state_dict = load("model.safetensors", device="cpu")
model.load_state_dict(state_dict)


Loading Scikit-Learn pipeline checkpoint:

# First, save a checkpoint: save(fitted_pipe, "model.joblib")
loaded_pipeline = load("model.joblib")
# loaded_pipeline is now ready to .predict()


Loading TensorFlow/Keras model checkpoint:

# First, save a checkpoint: save(keras_model, "model.keras")
loaded_keras_model = load("model.keras")
# loaded_keras_model is now a compiled, ready-to-use model


Loading an ONNX model for inference:

import numpy as np
# First, export the model: save(pytorch_model, "model.onnx", args=...)
inference_session = load("model.onnx")
input_name = inference_session.get_inputs()[0].name
dummy_data = np.random.randn(1, 10).astype(np.float32)
result = inference_session.run(None, {input_name: dummy_data})

Source code in opencrate/core/utils/io/checkpoint.py
def load(path: str, **kwargs: Any) -> Any:
    """Loads a model, state dict, or pipeline, inferring the format.

    This function acts as a universal loader, automatically selecting the
    correct loading mechanism based on the file extension. Required libraries
    are imported on-the-fly. For ONNX files, it returns an
    `onnxruntime.InferenceSession` ready for execution.

    Note:
        `.pt`/`.pth` and `.joblib` files are handed to `torch.load` and
        `joblib.load`, which can unpickle arbitrary objects. Only load
        checkpoints that come from a trusted source.

    Args:
        path (str): The source file path. The extension determines the format
            and is matched case-insensitively.
        **kwargs (Any): Additional keyword arguments to be passed to the
            underlying load function (e.g., `map_location` for `torch.load`).

    Returns:
        Any: The loaded object (e.g., a `state_dict`, Keras model,
            Scikit-Learn pipeline, or ONNX inference session).

    Raises:
        ImportError: If the required library for the specified format is not
            installed.
        ValueError: If the file extension is not a supported format.
        FileNotFoundError: If the specified path does not exist.

    Examples:
        Loading PyTorch model checkpoint:
        ```python
        import torch.nn as nn
        # First, save a checkpoint: save(model.state_dict(), "model.pt")
        model = nn.Linear(10, 2)
        state_dict = load("model.pt", map_location="cpu")
        model.load_state_dict(state_dict)
        ```
        ---
        Loading safetensors checkpoint:
        ```python
        import torch.nn as nn
        # First, save a checkpoint: save(model.state_dict(), "model.safetensors")
        model = nn.Linear(10, 2)
        state_dict = load("model.safetensors", device="cpu")
        model.load_state_dict(state_dict)
        ```
        ---
        Loading Scikit-Learn pipeline checkpoint:
        ```python
        # First, save a checkpoint: save(fitted_pipe, "model.joblib")
        loaded_pipeline = load("model.joblib")
        # loaded_pipeline is now ready to .predict()
        ```
        ---
        Loading TensorFlow/Keras model checkpoint:
        ```python
        # First, save a checkpoint: save(keras_model, "model.keras")
        loaded_keras_model = load("model.keras")
        # loaded_keras_model is now a compiled, ready-to-use model
        ```
        ---
        Loading an ONNX model for inference:
        ```python
        import numpy as np
        # First, export the model: save(pytorch_model, "model.onnx", args=...)
        inference_session = load("model.onnx")
        input_name = inference_session.get_inputs()[0].name
        dummy_data = np.random.randn(1, 10).astype(np.float32)
        result = inference_session.run(None, {input_name: dummy_data})
        ```
    """
    if not os.path.exists(path):
        raise FileNotFoundError(f"No such file or directory: '{path}'")

    _, extension = os.path.splitext(path)
    # Dispatch on a lower-cased copy so "MODEL.PT" or "net.ONNX" is accepted;
    # the original spelling is kept for the error message.
    suffix = extension.lower()

    if suffix in (".pt", ".pth"):
        torch = _lazy_import("torch", "PyTorch not installed. Use 'pip install torch'.")
        return torch.load(path, **kwargs)

    elif suffix == ".safetensors":
        safetensors_torch = _lazy_import(
            "safetensors.torch",
            "Safetensors not installed. Use 'pip install safetensors'.",
        )
        return safetensors_torch.load_file(path, **kwargs)

    elif suffix in (".h5", ".keras"):
        tf = _lazy_import("tensorflow", "TensorFlow not installed. Use 'pip install tensorflow'.")
        return tf.keras.models.load_model(path, **kwargs)

    elif suffix == ".joblib":
        joblib = _lazy_import("joblib", "Joblib not installed. Use 'pip install joblib'.")
        return joblib.load(path, **kwargs)

    elif suffix == ".onnx":
        onnxruntime = _lazy_import("onnxruntime", "ONNX Runtime not installed. Use 'pip install onnxruntime'.")
        return onnxruntime.InferenceSession(path, **kwargs)

    else:
        raise ValueError(f"Unsupported file format: '{extension}'. Supported: .pt, .pth, .safetensors, .h5, .keras, .joblib, .onnx.")

save(obj, path, **kwargs)

Saves a model, state dict, or pipeline, inferring the format.

This function acts as a universal saver, automatically selecting the correct saving mechanism based on the file extension of the provided path. Required libraries are imported on-the-fly.

For ONNX export, you must provide a tuple of dummy inputs via the args keyword argument (e.g., args=(dummy_tensor,)).

Parameters:

Name Type Description Default
obj Any

The object to save (e.g., PyTorch model state_dict, Keras model, Scikit-Learn pipeline).

required
path str

The destination file path. The extension determines the saving format (e.g., .pt, .safetensors, .h5, .joblib, .onnx).

required
**kwargs Any

Additional keyword arguments to be passed to the underlying save function. For ONNX, this must include args.

{}

Raises:

Type Description
ImportError

If the required library for the specified format is not installed.

ValueError

If the file extension is not supported or if required arguments for a specific format (like args for ONNX) are missing.

Examples:

Saving PyTorch model checkpoint:

import torch.nn as nn
pytorch_model = nn.Linear(10, 2)
oc.io.save(pytorch_model.state_dict(), "model.pt")
Saving safetensors checkpoint:
import torch.nn as nn
pytorch_model = nn.Linear(10, 2)
oc.io.save(pytorch_model.state_dict(), "model.safetensors")


Saving Scikit-Learn pipeline checkpoint:

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
pipe = Pipeline([("scaler", StandardScaler()), ("svc", LogisticRegression())])
oc.io.save(pipe, "model.joblib")


Saving TensorFlow/Keras model checkpoint:

import tensorflow as tf
keras_model = tf.keras.Sequential([tf.keras.layers.Dense(5)])
oc.io.save(keras_model, "model.keras")


Saving PyTorch model to ONNX checkpoint:

import torch
import torch.nn as nn
model = nn.Linear(10, 2)
model.eval()
dummy_input = torch.randn(1, 10)
oc.io.save(
    model,
    "model.onnx",
    args=(dummy_input,),
    input_names=["input"],
    output_names=["output"],
    opset_version=11,
) # here you can add any other argument supported by torch.onnx.export

Source code in opencrate/core/utils/io/checkpoint.py
def save(obj: Any, path: str, **kwargs: Any) -> None:
    """Saves a model, state dict, or pipeline, inferring the format.

    This function acts as a universal saver, automatically selecting the
    correct saving mechanism based on the file extension of the provided path.
    Required libraries are imported on-the-fly. The parent directory of
    `path` is created if it does not exist yet.

    For ONNX export, you must provide a tuple of dummy inputs via the `args`
    keyword argument (e.g., `args=(dummy_tensor,)`).

    Args:
        obj (Any): The object to save (e.g., PyTorch model `state_dict`,
            Keras model, Scikit-Learn pipeline).
        path (str): The destination file path. The extension determines the
            saving format (`.pt`, `.pth`, `.safetensors`, `.h5`, `.keras`,
            `.joblib`, `.onnx`). The match is case-sensitive.
        **kwargs (Any): Additional keyword arguments to be passed to the
            underlying save function. For ONNX, this must include `args`.

    Raises:
        ImportError: If the required library for the specified format is not
            installed.
        TypeError: If `obj` is not a dict for `.safetensors`, or not a
            `torch.nn.Module` for `.onnx`.
        ValueError: If the file extension is not supported or if required
            arguments for a specific format (like `args` for ONNX) are missing.

    Examples:
        Saving PyTorch model checkpoint:
        ```python
        import torch.nn as nn
        pytorch_model = nn.Linear(10, 2)
        oc.io.save(pytorch_model.state_dict(), "model.pt")
        ```
        ---
        Saving safetensors checkpoint:
        ```python
        import torch.nn as nn
        pytorch_model = nn.Linear(10, 2)
        oc.io.save(pytorch_model.state_dict(), "model.safetensors")
        ```
        ---
        Saving Scikit-Learn pipeline checkpoint:
        ```python
        from sklearn.pipeline import Pipeline
        from sklearn.preprocessing import StandardScaler
        from sklearn.linear_model import LogisticRegression
        pipe = Pipeline([("scaler", StandardScaler()), ("svc", LogisticRegression())])
        oc.io.save(pipe, "model.joblib")
        ```
        ---
        Saving TensorFlow/Keras model checkpoint:
        ```python
        import tensorflow as tf
        keras_model = tf.keras.Sequential([tf.keras.layers.Dense(5)])
        oc.io.save(keras_model, "model.keras")
        ```
        ---
        Saving PyTorch model to ONNX checkpoint:
        ```python
        import torch
        import torch.nn as nn
        model = nn.Linear(10, 2)
        model.eval()
        dummy_input = torch.randn(1, 10)
        oc.io.save(
            model,
            "model.onnx",
            args=(dummy_input,),
            input_names=["input"],
            output_names=["output"],
            opset_version=11,
        ) # here you can add any other argument supported by torch.onnx.export
        ```
    """
    # Create the parent directory first; a bare filename has no parent to create.
    dir_name = os.path.dirname(path)
    if dir_name:
        os.makedirs(dir_name, exist_ok=True)

    _, extension = os.path.splitext(path)

    if extension in (".pt", ".pth"):
        torch = _lazy_import("torch", "PyTorch not installed. Use 'pip install torch'.")
        torch.save(obj, path, **kwargs)

    elif extension == ".safetensors":
        safetensors_torch = _lazy_import(
            "safetensors.torch",
            "Safetensors not installed. Use 'pip install safetensors'.",
        )
        if not isinstance(obj, dict):
            raise TypeError("For .safetensors, the object must be a state_dict (dict).")
        safetensors_torch.save_file(obj, path, **kwargs)

    elif extension in (".h5", ".keras"):
        # No library import here: the object is expected to expose its own
        # `save(path, **kwargs)` method (as Keras models do).
        obj.save(path, **kwargs)

    elif extension == ".joblib":
        joblib = _lazy_import("joblib", "Joblib not installed. Use 'pip install joblib'.")
        joblib.dump(obj, path, **kwargs)

    elif extension == ".onnx":
        torch = _lazy_import("torch", "PyTorch is required for ONNX export. Use 'pip install torch'.")
        # Imported only to verify that the `onnx` package is available.
        _lazy_import("onnx", "ONNX is required for ONNX export. Use 'pip install onnx'.")

        if not isinstance(obj, torch.nn.Module):
            raise TypeError("ONNX export is currently supported for PyTorch models (`torch.nn.Module`).")
        if "args" not in kwargs:
            raise ValueError("ONNX export requires a dummy input. Please provide it as a tuple via the 'args' keyword argument, e.g., save(model, path, args=(dummy_input,)).")

        # `args` is a positional argument of torch.onnx.export, so pull it out
        # of kwargs before forwarding the remaining options.
        dummy_args = kwargs.pop("args")
        torch.onnx.export(obj, dummy_args, path, **kwargs)
    else:
        raise ValueError(f"Unsupported file format: '{extension}'. Supported: .pt, .pth, .safetensors, .h5, .keras, .joblib, .onnx.")

csv

load(path, lib='pandas', **kwargs)

Loads data from a CSV file using different libraries.

Parameters:

Name Type Description Default
path str

The path to the CSV file.

required
lib str

The library to use for loading. Defaults to "pandas". - "csv": Uses Python's built-in csv module. Returns a list of lists. - "numpy": Uses NumPy to load data. Returns a NumPy array. - "pandas": Uses pandas to load data. Returns a pandas DataFrame.

'pandas'
**kwargs Any

Additional keyword arguments passed to the loading function.

{}

Returns:

Type Description
CsvDataType

Union[List[list], "np.ndarray", "pd.DataFrame"]: The loaded data.

Raises:

Type Description
FileNotFoundError

If the file does not exist.

ValueError

If an unsupported library is specified.

ImportError

If the required library (NumPy or pandas) is not installed.

IOError

If there is an issue reading the file.

Examples:

# Load with the built-in csv library
data_csv = load("data.csv", lib="csv")
# Returns: [['col1', 'col2'], ['1', '2'], ['3', '4']]

# Load with numpy
data_numpy = load("data.csv", lib="numpy", skiprows=1)
# Returns: numpy array with numeric data

# Load with pandas
data_pandas = load("data.csv", lib="pandas")
# Returns: pandas DataFrame
Source code in opencrate/core/utils/io/csv.py
def load(path: str, lib: str = "pandas", **kwargs: Any) -> CsvDataType:
    """
    Loads data from a CSV file using different libraries.

    Args:
        path (str): The path to the CSV file.
        lib (str, optional): The library to use for loading. Defaults to "pandas".
            - "csv": Uses Python's built-in csv module. Returns a list of lists.
            - "numpy": Uses `np.genfromtxt` with a comma delimiter. Returns a NumPy array.
            - "pandas": Uses `pd.read_csv`. Returns a pandas DataFrame.
        **kwargs: Additional keyword arguments passed to the loading function
            (`csv.reader`, `np.genfromtxt` or `pd.read_csv`).

    Returns:
        Union[List[list], "np.ndarray", "pd.DataFrame"]: The loaded data.

    Raises:
        FileNotFoundError: If the file does not exist.
        ValueError: If an unsupported library is specified.
        OSError: If the selected backend fails while reading or parsing the
            file. The original exception is attached as `__cause__`.

    Examples:
        ```python
        # Load with pandas (default)
        data_pandas = load("data.csv")
        # Returns: pandas DataFrame

        # Load with the built-in csv library
        data_csv = load("data.csv", lib="csv")
        # Returns: [['col1', 'col2'], ['1', '2'], ['3', '4']]

        # Load with numpy
        data_numpy = load("data.csv", lib="numpy", skiprows=1)
        # Returns: numpy array with numeric data
        ```
    """
    if not os.path.exists(path):
        raise FileNotFoundError(f"No such file or directory: '{path}'")

    # Validate the backend outside the try block: otherwise the blanket
    # handler below would re-wrap this ValueError as an OSError.
    if lib not in ("csv", "numpy", "pandas"):
        raise ValueError(f"Unsupported library: {lib}. Supported libraries are 'csv', 'numpy', and 'pandas'.")

    try:
        if lib == "csv":
            # newline="" lets the csv module handle embedded line breaks itself.
            with open(path, newline="", encoding="utf-8") as f:
                return list(csv.reader(f, **kwargs))

        if lib == "numpy":
            return np.genfromtxt(path, delimiter=",", **kwargs)

        return pd.read_csv(path, **kwargs)

    except Exception as e:
        # Keep the original failure reachable through the exception chain.
        raise OSError(f"Failed to load CSV from {path}: {e}") from e

save(data, path, lib=None, **kwargs)

Saves data to a CSV file using different libraries.

The library can be specified explicitly or inferred from the data type.

Parameters:

Name Type Description Default
path str

The path where the CSV file will be saved.

required
data CsvDataType

The data to save. Can be a list of lists, a NumPy array, or a pandas DataFrame.

required
lib str

The library to use for saving. If None, it's inferred from the data type. Defaults to None. - "csv": Saves a list of lists. - "numpy": Saves a NumPy array. - "pandas": Saves a pandas DataFrame.

None
**kwargs Any

Additional keyword arguments passed to the saving function.

{}

Raises:

Type Description
ValueError

If the library is not specified and cannot be inferred, or if an unsupported library is specified.

ImportError

If the required library (NumPy or pandas) is not installed.

IOError

If there is an issue writing the file.

Examples:

# 1. Save a list of lists using 'csv'
list_data = [["col1", "col2"], [1, 2], [3, 4]]
save(list_data, "list.csv")
print(os.path.exists("list.csv"))
# True

# 2. Save a NumPy array
numpy_data = np.array([[1, 2], [3, 4]])
save(numpy_data, "numpy.csv", lib="numpy", fmt="%d")
print(os.path.exists("numpy.csv"))
# True

# 3. Save a pandas DataFrame
df_data = pd.DataFrame({"A": [1, 2], "B": [3, 4]})
save(df_data, "pandas.csv", index=False)
print(os.path.exists("pandas.csv"))
# True
Source code in opencrate/core/utils/io/csv.py
def save(data: CsvDataType, path: str, lib: Optional[str] = None, **kwargs: Any) -> None:
    """
    Saves data to a CSV file using different libraries.

    The library can be specified explicitly or inferred from the data type.
    The parent directory of `path` is created if it does not exist.

    Args:
        data (CsvDataType): The data to save. Can be a list of lists,
            a NumPy array, or a pandas DataFrame.
        path (str): The path where the CSV file will be saved.
        lib (str, optional): The library to use for saving. If None, it's
            inferred from the data type. Defaults to None.
            - "csv": Saves a list of lists with `csv.writer`.
            - "numpy": Saves a NumPy array with `np.savetxt` (comma delimiter).
            - "pandas": Saves a pandas DataFrame with `DataFrame.to_csv`.
        **kwargs: Additional keyword arguments passed to the saving function.

    Raises:
        ValueError: If the library is not specified and cannot be inferred,
            or if an unsupported library is specified.
        OSError: If writing fails, including when `data` does not match the
            type required by the selected `lib`. The original exception is
            attached as `__cause__`.

    Examples:
        ```python

        # 1. Save a list of lists using 'csv'
        list_data = [["col1", "col2"], [1, 2], [3, 4]]
        save(list_data, "list.csv")
        print(os.path.exists("list.csv"))
        # True

        # 2. Save a NumPy array
        numpy_data = np.array([[1, 2], [3, 4]])
        save(numpy_data, "numpy.csv", lib="numpy", fmt="%d")
        print(os.path.exists("numpy.csv"))
        # True

        # 3. Save a pandas DataFrame
        df_data = pd.DataFrame({"A": [1, 2], "B": [3, 4]})
        save(df_data, "pandas.csv", index=False)
        print(os.path.exists("pandas.csv"))
        # True
        ```
    """
    if lib is None:
        # Infer library from data type if not provided
        if isinstance(data, list):
            lib = "csv"
        elif isinstance(data, np.ndarray):
            lib = "numpy"
        elif isinstance(data, pd.DataFrame):
            lib = "pandas"
        else:
            raise ValueError(f"Could not infer library from data type. Data type: {type(data)}, supported types are list, np.ndarray, pd.DataFrame.")
    elif lib not in ("csv", "numpy", "pandas"):
        # Checked before the try block so the documented ValueError is not
        # re-wrapped as OSError by the blanket handler below.
        raise ValueError(f"Unsupported library: {lib}. Supported libraries are 'csv', 'numpy', and 'pandas'.")

    # Ensure the output directory exists (a bare filename has none to create)
    output_dir = os.path.dirname(path)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    try:
        if lib == "csv":
            if not isinstance(data, list):
                raise TypeError("Data must be a list of lists for lib='csv'")
            # newline="" prevents the csv module's line terminators from being translated again.
            with open(path, mode="w", newline="", encoding="utf-8") as f:
                csv.writer(f, **kwargs).writerows(data)

        elif lib == "numpy":
            if not isinstance(data, np.ndarray):
                raise TypeError("Data must be a NumPy array for lib='numpy'")
            np.savetxt(path, data, delimiter=",", **kwargs)

        else:  # lib == "pandas"
            if not isinstance(data, pd.DataFrame):
                raise TypeError("Data must be a pandas DataFrame for lib='pandas'")
            data.to_csv(path, **kwargs)

    except Exception as e:
        # Keep the original failure reachable through the exception chain.
        raise OSError(f"Failed to save CSV to {path}: {e}") from e

gif

dir_to_gif(src_dir, output_path, fps=10)

Converts all images in a directory to a GIF.

Parameters:

Name Type Description Default
src_dir str

Directory containing image files

required
output_path str

Path where the GIF will be saved

required
fps int

Frames per second for the output GIF

10
Source code in opencrate/core/utils/io/gif.py
def dir_to_gif(src_dir: str, output_path: str, fps: int = 10) -> None:
    """
    Builds a GIF from the image files found in a directory.

    Files are taken in natural order (so "frame2" sorts before "frame10") and
    only names ending in .png, .jpg, .jpeg, .bmp or .tiff (case-insensitive)
    are used. The opened images are handed to `_to_gif`.

    Args:
        src_dir: Directory containing image files
        output_path: Path where the GIF will be saved
        fps: Frames per second for the output GIF

    Raises:
        ValueError: If the directory contains no file with a supported
            image extension.
    """
    image_suffixes = (".png", ".jpg", ".jpeg", ".bmp", ".tiff")

    def _natural_key(name):
        # Split into digit / non-digit runs so numeric parts compare as integers.
        chunks = re.split(r"(\d+)", str(name))
        return [int(chunk) if chunk.isdigit() else chunk.lower() for chunk in chunks]

    ordered_names = sorted(os.listdir(src_dir), key=_natural_key)
    frames = [
        Image.open(os.path.join(src_dir, name))
        for name in ordered_names
        if name.lower().endswith(image_suffixes)
    ]

    if not frames:
        raise ValueError(f"\n\nNo valid image files found in '{src_dir}' for creating the gif\n")

    # Assembling and writing the GIF is delegated to the shared helper.
    _to_gif(frames, output_path, fps)

images_to_gif(images, output_path, fps=10)

Converts a list of images to a GIF.

Parameters:

Name Type Description Default
images List[Union[NDArray[Any], Image]]

List of images (supports numpy arrays and PIL images)

required
output_path str

Path where the GIF will be saved

required
fps int

Frames per second for the output GIF

10
Source code in opencrate/core/utils/io/gif.py
def images_to_gif(images: List[Union[npt.NDArray[Any], Image.Image]], output_path: str, fps: int = 10) -> None:
    """
    Converts a list of images to a GIF.

    Public entry point that forwards its arguments unchanged to the
    `_to_gif` helper, which does the actual work.

    Args:
        images: List of images (supports numpy arrays and PIL images)
        output_path: Path where the GIF will be saved
        fps: Frames per second for the output GIF
    """

    # NOTE(review): input validation (e.g. an empty list) is presumably
    # handled inside `_to_gif` — confirm against that helper.
    _to_gif(images, output_path, fps)

image

load(path, lib='pil', **kwargs)

Load an image from a file path using either PIL or OpenCV.

This function provides a unified interface for loading images using different backends (PIL or OpenCV) while handling common image formats.

Parameters:

Name Type Description Default
path str

Path to the image file to load.

required
lib str

Library to use for loading. Defaults to "pil". - "pil": Use PIL/Pillow library - "cv2": Use OpenCV library

'pil'

Returns:

Type Description
Union[NDArray[Any], Image]

Union[np.ndarray, Image.Image]: Loaded image. - When using PIL: Returns PIL Image object - When using OpenCV: Returns numpy array

Raises:

Type Description
FileNotFoundError

If the specified file path does not exist.

ValueError

If an unsupported library is specified or image cannot be loaded.

IOError

If there's an error during the image loading process.

Examples:

Load an image using PIL (default):

>>> img = load("path/to/image.jpg")

Load an image using OpenCV:

>>> img = load("path/to/image.jpg", lib="cv2")
Source code in opencrate/core/utils/io/image.py
def load(path: str, lib: str = "pil", **kwargs) -> Union[NDArray[Any], Image.Image]:
    """
    Read an image file with the chosen backend (PIL or OpenCV).

    Args:
        path (str): Path to the image file to load.
        lib (str, optional): Backend used for loading. Defaults to "pil".
            - "pil": `Image.open` from PIL/Pillow
            - "cv2": `cv2.imread` from OpenCV
        **kwargs: Extra keyword arguments forwarded to the backend's loading
            function.

    Returns:
        Union[np.ndarray, Image.Image]: Loaded image.
            - With "pil": the object returned by `Image.open`
            - With "cv2": the array returned by `cv2.imread`

    Raises:
        FileNotFoundError: If the specified file path does not exist.
        ValueError: If an unsupported library is specified, or if OpenCV
            returns no image for the path.

    Examples:
        Load an image using PIL (default):
        >>> img = load("path/to/image.jpg")

        Load an image using OpenCV:
        >>> img = load("path/to/image.jpg", lib="cv2")
    """
    if not os.path.exists(path):
        raise FileNotFoundError(f"No such file or directory: '{path}'")

    if lib not in ("pil", "cv2"):
        raise ValueError(f"Unsupported library: {lib}. Supported libraries are 'pil' and 'cv2'.")

    if lib == "pil":
        return Image.open(path, **kwargs)

    # cv2.imread signals failure by returning None rather than raising.
    decoded = cv2.imread(path, **kwargs)
    if decoded is None:
        raise ValueError(f"Could not load image from {path}")
    return decoded

json

CustomJSONEncoder

Bases: JSONEncoder

Custom JSON encoder to handle additional data types. - datetime.datetime and datetime.date: converted to ISO 8601 strings. - pathlib.Path: converted to strings. - set: converted to lists.

Source code in opencrate/core/utils/io/json.py
class CustomJSONEncoder(json.JSONEncoder):
    """
    JSON encoder that understands a few extra Python types.

    - datetime.datetime and datetime.date: emitted as ISO 8601 strings.
    - pathlib.Path: emitted as its string form.
    - set: emitted as a list.

    Anything else is deferred to `json.JSONEncoder.default`, which raises
    `TypeError` for unsupported objects.
    """

    # (types, converter) pairs, checked in order by `default`.
    _CONVERTERS = (
        ((datetime.datetime, datetime.date), lambda value: value.isoformat()),
        ((Path,), str),
        ((set,), list),
    )

    def default(self, o):
        for handled_types, convert in self._CONVERTERS:
            if isinstance(o, handled_types):
                return convert(o)
        return super().default(o)

load(path, encoding='utf-8', **kwargs)

Loads data from a JSON file.

This function deserializes a JSON file into a Python object. It is a wrapper around the standard json.load function.

Note

This function does not automatically convert strings back into complex types like datetime or Path. If you need to deserialize these, you can pass a custom object_hook in **kwargs.

Parameters:

Name Type Description Default
path str or Path

The path to the JSON file to load.

required
encoding str

The file encoding to use. Defaults to "utf-8".

'utf-8'
**kwargs Any

Additional keyword arguments to pass to json.load(), such as object_hook for custom deserialization.

{}

Returns:

Name Type Description
Any Any

The deserialized Python object from the JSON file. This can be a dict, list, str, int, float, bool, or None depending on the JSON content.

Raises:

Type Description
FileNotFoundError

If the specified file path does not exist.

JSONDecodeError

If the file contains invalid JSON.

OSError

If there is an issue reading from the file path.

Example
Load a standard JSON file:
import opencrate as oc

# Assuming 'user.json' contains: {"name": "John Doe"}
user_data = oc.io.json.load("user.json")
print(user_data)
# Output: {'name': 'John Doe'}
Handle a file that does not exist:
import opencrate as oc
import json as json_lib

data = oc.io.json.load("non_existent_file.json")
Pass custom arguments, which are forwarded to json.load internally:
import opencrate as oc
from datetime import datetime
import re

def datetime_parser(dct):
    # A simple object_hook to find and convert ISO date strings
    for k, v in dct.items():
        if isinstance(v, str) and re.match(r'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+$', v):
            try:
                dct[k] = datetime.fromisoformat(v)
            except (ValueError, TypeError):
                pass  # Ignore if conversion fails
    return dct

report_data = oc.io.json.load("report.json", object_hook=datetime_parser)
print(type(report_data.get("timestamp")))
# Output: <class 'datetime.datetime'>
Source code in opencrate/core/utils/io/json.py
def load(path, encoding: str = "utf-8", **kwargs: Any) -> Any:
    """Reads a JSON file and returns the deserialized Python object.

    Thin wrapper around the standard `json.load`: the file is opened in text
    mode with the given encoding and parsed in one step. Errors are not
    translated; they propagate exactly as raised by `open` and `json.load`.

    Note:
        Strings are not converted back into richer types such as `datetime`
        or `Path`. Pass a custom `object_hook` through `**kwargs` if you need
        that.

    Args:
        path (str or Path): The path to the JSON file to load.
        encoding (str, optional): The file encoding to use. Defaults to "utf-8".
        **kwargs (Any): Additional keyword arguments forwarded to `json.load()`,
            such as `object_hook` for custom deserialization.

    Returns:
        Any: The deserialized Python object from the JSON file. This can be a
            dict, list, str, int, float, bool, or None depending on the JSON content.

    Raises:
        FileNotFoundError: If the specified file path does not exist.
        json.JSONDecodeError: If the file contains invalid JSON.
        OSError: If there is an issue reading from the file path.

    Example:
        Load a standard JSON file:
        ---
        ```python
        import opencrate as oc

        # Assuming 'user.json' contains: {"name": "John Doe"}
        user_data = oc.io.json.load("user.json")
        print(user_data)
        # Output: {'name': 'John Doe'}
        ```

        Handle a file that does not exist:
        ---
        ```python
        import opencrate as oc

        try:
            data = oc.io.json.load("non_existent_file.json")
        except FileNotFoundError:
            data = {}
        ```

        Pass custom arguments, which are forwarded to json.load internally:
        ---
        ```python
        import opencrate as oc
        from datetime import datetime
        import re

        def datetime_parser(dct):
            # A simple object_hook to find and convert ISO date strings
            for k, v in dct.items():
                if isinstance(v, str) and re.match(r'^\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d+$', v):
                    try:
                        dct[k] = datetime.fromisoformat(v)
                    except (ValueError, TypeError):
                        pass  # Ignore if conversion fails
            return dct

        report_data = oc.io.json.load("report.json", object_hook=datetime_parser)
        print(type(report_data.get("timestamp")))
        # Output: <class 'datetime.datetime'>
        ```
    """

    with open(path, encoding=encoding) as handle:
        parsed = json.load(handle, **kwargs)
    return parsed

save(data, path, encoder=None, **kwargs)

Saves data to a JSON file with extended support for additional types.

This function serializes a Python object to a JSON-formatted file. It extends the standard json.dump with a custom encoder that can handle datetime, pathlib.Path, and set objects.

Parameters:

Name Type Description Default
path str or Path

The file path where the JSON data will be saved. The directory will be created if it does not exist.

required
data Any

The Python object to serialize.

required
**kwargs Any

Additional keyword arguments to pass to json.dump(), such as indent for pretty-printing or sort_keys.

{}

Raises:

Type Description
TypeError

If the data contains an object that cannot be serialized.

OSError

If there is an issue writing to the file path.

Example
Save a simple dictionary:
import opencrate as oc

user_data = {"name": "John Doe", "email": "john.doe@example.com"}
oc.io.json.save(user_data, "user.json", indent=4)
Save data containing datetime and other types:
import opencrate as oc
from datetime import datetime
from pathlib import Path

complex_data = {
    "report_id": 123,
    "timestamp": datetime.now(),
    "source_files": {Path("/src/data.csv"), Path("/src/log.txt")},
    "status": "completed"
}
oc.io.json.save(complex_data, "report.json", indent=4, sort_keys=True)
Source code in opencrate/core/utils/io/json.py
def save(data, path, encoder: Optional[json.JSONEncoder] = None, **kwargs: Any):
    """Saves data to a JSON file with extended support for additional types.

    This function serializes a Python object to a JSON-formatted file. It extends
    the standard `json.dump` with a custom encoder that can handle `datetime`,
    `pathlib.Path`, and `set` objects.

    Args:
        data (Any): The Python object to serialize.
        path (str or Path): The file path where the JSON data will be saved.
            The parent directory is created if it does not exist; a bare
            filename is written to the current working directory.
        encoder (optional): A `json.JSONEncoder` subclass to use instead of
            `CustomJSONEncoder`. Ignored when `cls` is given in `**kwargs`.
        **kwargs (Any): Additional keyword arguments to pass to `json.dump()`, such as
            `indent` for pretty-printing or `sort_keys`.

    Raises:
        TypeError: If the data contains an object that cannot be serialized.
        OSError: If there is an issue writing to the file path.

    Example:
        Save a simple dictionary:
        ---
        ```python
        import opencrate as oc

        user_data = {"name": "John Doe", "email": "john.doe@example.com"}
        oc.io.json.save(user_data, "user.json", indent=4)
        ```

        Save data containing datetime and other types:
        ---
        ```python
        import opencrate as oc
        from datetime import datetime
        from pathlib import Path

        complex_data = {
            "report_id": 123,
            "timestamp": datetime.now(),
            "source_files": {Path("/src/data.csv"), Path("/src/log.txt")},
            "status": "completed"
        }
        oc.io.json.save(complex_data, "report.json", indent=4, sort_keys=True)
        ```
    """

    # os.makedirs("") raises FileNotFoundError, so only create the parent
    # directory when the path actually has one (e.g. not for "user.json").
    parent_dir = os.path.dirname(path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    # An explicit `cls` in kwargs wins; otherwise use `encoder`, falling back
    # to the custom encoder.
    if "cls" not in kwargs:
        if encoder is None:
            kwargs["cls"] = CustomJSONEncoder
        else:
            kwargs["cls"] = encoder

    with open(path, "w", encoding="utf-8") as file:
        json.dump(data, file, **kwargs)

text

load(path, encoding='utf-8', default=None)

Load text content from a file.

Parameters:

Name Type Description Default
path Union[str, Path]

Path to the input file

required
encoding str

File encoding (default: utf-8)

'utf-8'
default Optional[Any]

Default value if file doesn't exist or can't be read

None

Returns:

Type Description
str

The text content from the file, or default value if specified

Raises:

Type Description
FileNotFoundError

If file doesn't exist and no default provided

Examples:

>>> content = load("input.txt")
>>> content = load(Path("data/file.txt"))
>>> content = load("missing.txt", default="")
>>> content = load("file.txt", encoding="latin-1")
Source code in opencrate/core/utils/io/text.py
def load(path: Union[str, Path], encoding: str = "utf-8", default: Optional[Any] = None) -> str:
    """Read a whole text file and return its content.

    Args:
        path: Path to the input file
        encoding: File encoding (default: utf-8)
        default: Value returned instead of raising when the file does not
            exist. `None` (the default) means "no fallback".

    Returns:
        The text content from the file, or `default` when the file is
        missing and a non-None default was given

    Raises:
        FileNotFoundError: If file doesn't exist and no default provided

    Examples:
        >>> content = load("input.txt")
        >>> content = load(Path("data/file.txt"))
        >>> content = load("missing.txt", default="")
        >>> content = load("file.txt", encoding="latin-1")
    """
    try:
        with open(path, encoding=encoding) as handle:
            contents = handle.read()
    except FileNotFoundError:
        # Only a missing file falls back to the default; other errors propagate.
        if default is None:
            raise
        return default
    return contents

save(data, path, encoding='utf-8')

Save data content to a file.

Parameters:

Name Type Description Default
data Any

The data content to save

required
path Union[str, Path]

Path to the output file

required
encoding str

File encoding (default: utf-8)

'utf-8'

Examples:

>>> save("Hello world", "output.txt")
>>> save("Content", Path("data/file.txt"))
>>> save("UTF-8 data", "file.txt", encoding="utf-8")
Source code in opencrate/core/utils/io/text.py
def save(
    data: Any,
    path: Union[str, Path],
    encoding: str = "utf-8",
) -> None:
    """Save data content to a file.

    The data is converted with `str()` before writing, and an existing file
    at `path` is overwritten. The parent directory is created if it does not
    exist; a bare filename is written to the current working directory.

    Args:
        data: The data content to save
        path: Path to the output file
        encoding: File encoding (default: utf-8)

    Examples:
        >>> save("Hello world", "output.txt")
        >>> save("Content", Path("data/file.txt"))
        >>> save("UTF-8 data", "file.txt", encoding="utf-8")
    """

    # os.makedirs("") raises FileNotFoundError, so only create the parent
    # directory when the path actually has one (e.g. not for "output.txt").
    parent_dir = os.path.dirname(path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    with open(path, "w", encoding=encoding) as file:
        file.write(str(data))

video

load(path, lib='cv2', **kwargs)

Loads a video file and returns its frames, audio, and metadata.

This function provides a unified interface for loading videos using different libraries, returning a standardized dictionary.

Parameters:

Name Type Description Default
path str

The file path to the video file.

required
lib str

The library to use for loading. Supported: "cv2", "moviepy", "torchvision", "av". Defaults to "cv2".

'cv2'
**kwargs Any

Additional keyword arguments passed to the loading function of the selected library.

{}

Returns:

Type Description
Dict[str, Any]

A dictionary with the following keys:

Dict[str, Any]
  • "frames" (List[np.ndarray]): A list of video frames as NumPy arrays (H, W, C).
Dict[str, Any]
  • "audio" (np.ndarray | None): The audio waveform as a NumPy array, or None.
Dict[str, Any]
  • "fps" (float): Frames per second of the video.
Dict[str, Any]
  • "frame_count" (int): Total number of frames in the video.
Dict[str, Any]
  • "duration" (float): Duration of the video in seconds.
Dict[str, Any]
  • "width" (int): Width of the video frames.
Dict[str, Any]
  • "height" (int): Height of the video frames.
Dict[str, Any]
  • "audio_fps" (int | None): Sample rate of the audio, or None.
Dict[str, Any]
  • "object" (Any): The original object loaded by the library.

Raises:

Type Description
FileNotFoundError

If the specified file path does not exist.

ValueError

If an unsupported library is specified.

ImportError

If the required video library is not installed.

Examples:

Load a video using OpenCV (cv2):
import opencrate as oc
video_info = oc.io.video.load("my_video.mp4", lib="cv2")
print(f"Loaded {video_info['frame_count']} frames at {video_info['fps']:.2f} FPS.")
# Note: 'cv2' does not load audio.
Load a video with audio using moviepy:
import opencrate as oc
video_info = oc.io.video.load("my_video.mp4", lib="moviepy")
if video_info['audio'] is not None:
    print(f"Audio loaded with sample rate: {video_info['audio_fps']}")
Load a video using torchvision (efficient):
import opencrate as oc
video_info = oc.io.video.load("my_video.mp4", lib="torchvision")
print(f"Loaded video of size {video_info['width']}x{video_info['height']}.")
Source code in opencrate/core/utils/io/video.py
def load(path: str, lib: str = "cv2", **kwargs: Any) -> Dict[str, Any]:
    """Loads a video file and returns its frames, audio, and metadata.

    This function provides a unified interface for loading videos using different
    libraries, returning a standardized dictionary. All frames are decoded eagerly
    into memory.

    Args:
        path (str): The file path to the video file.
        lib (str): The library to use for loading. Supported: "cv2", "moviepy",
            "torchvision", "av". Defaults to "cv2".
        **kwargs: Additional keyword arguments passed to the loading function of the
            selected library ("moviepy": ``VideoFileClip``; "torchvision":
            ``read_video``). They are not forwarded by the "cv2" and "av" backends.

    Returns:
        A dictionary with the following keys:
        - "frames" (List[np.ndarray]): A list of video frames as NumPy arrays (H, W, C).
        - "audio" (np.ndarray | None): The audio waveform as a NumPy array, or None.
        - "fps" (float): Frames per second of the video.
        - "frame_count" (int): Total number of frames in the video.
        - "duration" (float): Duration of the video in seconds.
        - "width" (int): Width of the video frames.
        - "height" (int): Height of the video frames.
        - "audio_fps" (int | None): Sample rate of the audio, or None.
        - "object" (Any): The original object loaded by the library. For "cv2",
          "moviepy" and "av" the underlying handle has already been
          released/closed when this function returns.

    Raises:
        FileNotFoundError: If the specified file path does not exist.
        ValueError: If an unsupported library is specified.
        ImportError: If the required video library is not installed.
        OSError: If the "cv2" backend cannot open the file.

    Examples:
        Load a video using OpenCV (cv2):
        ---
        ```python
        import opencrate as oc
        video_info = oc.io.video.load("my_video.mp4", lib="cv2")
        print(f"Loaded {video_info['frame_count']} frames at {video_info['fps']:.2f} FPS.")
        # Note: 'cv2' does not load audio.
        ```

        Load a video with audio using moviepy:
        ---
        ```python
        import opencrate as oc
        video_info = oc.io.video.load("my_video.mp4", lib="moviepy")
        if video_info['audio'] is not None:
            print(f"Audio loaded with sample rate: {video_info['audio_fps']}")
        ```

        Load a video using torchvision (efficient):
        ---
        ```python
        import opencrate as oc
        video_info = oc.io.video.load("my_video.mp4", lib="torchvision")
        print(f"Loaded video of size {video_info['width']}x{video_info['height']}.")
        ```
    """
    if not os.path.exists(path):
        raise FileNotFoundError(f"No such file or directory: '{path}'")

    if lib == "cv2":
        cv2 = _lazy_import("cv2", package_name="opencv-python")
        cap = cv2.VideoCapture(path)
        # try/finally guarantees the capture handle is released even when
        # decoding or colour conversion fails half-way through.
        try:
            if not cap.isOpened():
                raise OSError(f"Could not open video file: {path}")

            frames = []
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                # OpenCV decodes to BGR; the unified contract is RGB.
                frames.append(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))

            width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            fps = cap.get(cv2.CAP_PROP_FPS)
            frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        finally:
            cap.release()

        # Some containers do not report a frame count in their header; fall
        # back to the number of frames that were actually decoded.
        if frame_count <= 0:
            frame_count = len(frames)

        return {
            "frames": frames,
            "audio": None,  # OpenCV does not handle audio
            "fps": fps,
            "frame_count": frame_count,
            "duration": frame_count / fps if fps > 0 else 0,
            "width": width,
            "height": height,
            "audio_fps": None,
            "object": cap,
        }

    elif lib == "moviepy":
        moviepy_editor = _lazy_import("moviepy.editor")
        clip = moviepy_editor.VideoFileClip(path, **kwargs)
        # Close the clip (and its reader subprocesses) even if decoding fails.
        try:
            frames = list(clip.iter_frames())
            audio_clip = clip.audio
            has_audio = audio_clip is not None
            audio_data = audio_clip.to_soundarray(fps=audio_clip.fps) if has_audio else None

            return {
                "frames": frames,
                "audio": audio_data,
                "fps": clip.fps,
                "frame_count": int(clip.duration * clip.fps),
                "duration": clip.duration,
                "width": clip.w,
                "height": clip.h,
                "audio_fps": audio_clip.fps if has_audio else None,
                "object": clip,
            }
        finally:
            clip.close()

    elif lib == "torchvision":
        torchvision = _lazy_import("torchvision")
        # Let callers override pts_unit without a duplicate-keyword TypeError.
        read_kwargs = {"pts_unit": "sec", **kwargs}
        vframes, aframes, info = torchvision.io.read_video(path, **read_kwargs)

        # read_video returns (T, H, W, C) unless output_format="TCHW" was
        # requested; only in that case do the axes need to be moved.
        if str(read_kwargs.get("output_format", "THWC")).upper() == "TCHW":
            frames_np = vframes.permute(0, 2, 3, 1).numpy()
        else:
            frames_np = vframes.numpy()
        # Audio comes back as (channels, samples); transpose to (samples, channels).
        audio_np = aframes.t().numpy() if aframes.numel() > 0 else None

        fps = info.get("video_fps")
        return {
            "frames": list(frames_np),
            "audio": audio_np,
            "fps": fps,
            "frame_count": len(frames_np),
            "duration": len(frames_np) / fps if fps else 0.0,
            "width": frames_np.shape[2],
            "height": frames_np.shape[1],
            "audio_fps": info.get("audio_fps"),
            "object": (vframes, aframes, info),
        }

    elif lib == "av":
        av = _lazy_import("av", package_name="av")
        with av.open(path) as container:
            video_stream = container.streams.video[0]
            audio_stream = container.streams.audio[0] if container.streams.audio else None

            # Decode video and audio in a single pass: once the container has
            # been demuxed to the end for video, a second decode() call would
            # find no packets left for the audio stream.
            selected = [video_stream] if audio_stream is None else [video_stream, audio_stream]
            frames = []
            audio_chunks = []
            for decoded in container.decode(*selected):
                if isinstance(decoded, av.VideoFrame):
                    frames.append(decoded.to_ndarray(format="rgb24"))
                else:
                    audio_chunks.append(decoded.to_ndarray())

            # Each chunk keeps the stream's native sample dtype; chunks are
            # joined along the sample axis (axis 1 of AudioFrame.to_ndarray()).
            audio_data = np.concatenate(audio_chunks, axis=1) if audio_chunks else None
            audio_fps = audio_stream.rate if audio_stream is not None else None

            # Stream-level rate/duration are optional in some containers.
            rate = video_stream.average_rate
            fps = float(rate) if rate else 0.0
            if video_stream.duration is not None and video_stream.time_base is not None:
                duration = float(video_stream.duration * video_stream.time_base)
            else:
                duration = len(frames) / fps if fps > 0 else 0.0

            return {
                "frames": frames,
                "audio": audio_data,
                "fps": fps,
                "frame_count": len(frames),
                "duration": duration,
                "width": video_stream.width,
                "height": video_stream.height,
                "audio_fps": audio_fps,
                "object": container,
            }

    else:
        raise ValueError(f"Unsupported library: '{lib}'. Supported are 'cv2', 'moviepy', 'torchvision', 'av'.")

save(frames, path, fps, lib='cv2', audio=None, audio_fps=None, **kwargs)

Saves a sequence of frames as a video file.

Parameters:

Name Type Description Default
frames Iterable[ndarray] or ndarray

An iterable of frames (H, W, C) or a single NumPy array of shape (T, H, W, C). Frames should be in RGB format.

required
path str

The destination file path for the video file.

required
fps float

The frames per second for the output video.

required
lib str

The library to use for saving. Supported: "cv2", "moviepy", "torchvision". Defaults to "cv2".

'cv2'
audio ndarray

An optional audio track to add to the video.

None
audio_fps int

The sample rate of the audio track. Required if audio is provided.

None
**kwargs Any

Additional keyword arguments passed to the saving function. For 'cv2', fourcc can be specified (e.g., fourcc='mp4v'). For 'moviepy', codec can be specified (e.g., codec='libx264').

{}

Raises:

Type Description
ValueError

If input parameters are invalid or an unsupported library is specified.

ImportError

If the required video library is not installed.

IOError

If there is an error writing the file.

Source code in opencrate/core/utils/io/video.py
def save(
    frames: Union[Iterable[NDArray[Any]], NDArray[Any]],
    path: str,
    fps: float,
    lib: str = "cv2",
    audio: Optional[NDArray[Any]] = None,
    audio_fps: Optional[int] = None,
    **kwargs: Any,
) -> None:
    """Saves a sequence of frames as a video file.

    Args:
        frames (Iterable[np.ndarray] or np.ndarray): An iterable of frames (H, W, C)
            or a single NumPy array of shape (T, H, W, C). Frames should be in RGB format.
        path (str): The destination file path for the video file. Missing parent
            directories are created.
        fps (float): The frames per second for the output video.
        lib (str): The library to use for saving. Supported: "cv2", "moviepy",
            "torchvision". Defaults to "cv2".
        audio (np.ndarray, optional): An optional audio track to add to the video.
            Only the "moviepy" backend writes it; the others print a warning.
        audio_fps (int, optional): The sample rate of the audio track. Required if
            `audio` is provided.
        **kwargs: Additional keyword arguments passed to the saving function.
            For 'cv2', `fourcc` can be specified (e.g., `fourcc='mp4v'`).
            For 'moviepy', `codec` can be specified (e.g., `codec='libx264'`).

    Raises:
        ValueError: If input parameters are invalid or an unsupported library is specified.
        ImportError: If the required video library is not installed.
        IOError: If there is an error writing the file.
    """
    # Validate the backend up front so the documented ValueError is not
    # converted into an OSError by the write-error handler below, and so no
    # output directory is created for a call that cannot succeed.
    if lib not in ("cv2", "moviepy", "torchvision"):
        raise ValueError(f"Unsupported library: '{lib}'. Supported are 'cv2', 'moviepy', 'torchvision'.")

    output_dir = os.path.dirname(path)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    # A (T, H, W, C) array iterates frame by frame just like any other iterable.
    frame_iter: Iterator[NDArray[Any]] = iter(frames)

    # Peek at the first frame to get dimensions
    try:
        first_frame = next(frame_iter)
    except StopIteration:
        raise ValueError("Cannot save an empty sequence of frames.") from None

    frame_shape = np.shape(first_frame)
    if len(frame_shape) != 3:
        raise ValueError(f"Each frame must have shape (H, W, C); got shape {frame_shape}.")
    height, width = frame_shape[0], frame_shape[1]
    # Chain the first frame back to the iterator
    frame_iter = chain([first_frame], frame_iter)

    # Work on a copy so backend-specific options can be popped without
    # mutating the caller's dict or being passed twice.
    options = dict(kwargs)

    try:
        if lib == "cv2":
            cv2 = _lazy_import("cv2", package_name="opencv-python")
            fourcc_map = {
                "mp4": "mp4v",
                "avi": "XVID",
            }
            ext = os.path.splitext(path)[1][1:].lower()
            fourcc_str = options.get("fourcc", fourcc_map.get(ext, "mp4v"))
            fourcc = cv2.VideoWriter_fourcc(*fourcc_str)
            writer = cv2.VideoWriter(path, fourcc, fps, (width, height))
            try:
                # An unopened writer silently drops every frame; fail loudly.
                if not writer.isOpened():
                    raise OSError(f"Could not open video writer for: {path}")
                for frame in frame_iter:
                    # OpenCV expects BGR channel order.
                    writer.write(cv2.cvtColor(frame, cv2.COLOR_RGB2BGR))
            finally:
                writer.release()

            if audio is not None:
                print("Warning: 'cv2' backend does not support writing audio. Audio track ignored.")

        elif lib == "moviepy":
            moviepy_editor = _lazy_import("moviepy.editor")
            # Pop `codec` so it is not passed both explicitly and via **options.
            codec = options.pop("codec", "libx264")
            clip = moviepy_editor.ImageSequenceClip(list(frame_iter), fps=fps)
            try:
                if audio is not None and audio_fps is not None:
                    # In-memory sample arrays are wrapped with AudioArrayClip.
                    audio_module = _lazy_import("moviepy.audio.AudioClip")
                    audio_clip = audio_module.AudioArrayClip(audio, fps=audio_fps)
                    clip = clip.set_audio(audio_clip)
                elif audio is not None:
                    print("Warning: 'audio_fps' is required to write audio. Audio track ignored.")

                clip.write_videofile(path, codec=codec, fps=fps, **options)
            finally:
                clip.close()

        elif lib == "torchvision":
            torchvision = _lazy_import("torchvision")
            torch = _lazy_import("torch")
            # write_video expects a single (T, H, W, C) tensor, which is
            # exactly the layout produced by stacking the input frames.
            video_tensor = torch.from_numpy(np.stack(list(frame_iter)))
            torchvision.io.write_video(path, video_tensor, fps, **options)

            if audio is not None:
                print("Warning: 'torchvision' backend does not support writing audio directly with video. Audio track ignored.")

    except ImportError:
        # Documented contract: a missing backend surfaces as ImportError.
        raise
    except Exception as e:
        raise OSError(f"Failed to save video to {path}: {e}") from e