Source code for scystream.sdk.config.config_loader

import yaml
from typing import Union
from pydantic import ValidationError
from pathlib import Path
from scystream.sdk.config.models import ComputeBlock, Entrypoint, \
    InputOutputModel
from scystream.sdk.config.compute_block_utils import get_compute_block

# File name that `load_config` looks for in the current working directory
# when it is called without an explicit path.
CBC_CONFIG_DEFAULT_IDENTIFIER = "cbc.yaml"
# Default value of the `app_name` argument of `SDKConfig`.
UNNAMED_APP_NAME = "unnamed_compute_block"
# Default value of the `cb_spark_master` argument of `SDKConfig`.
# In production, the ComputeBlock must be within the same docker network
# as the spark-master & workers!
COMPUTE_BLOCK_SPARK_DEFAULT_MASTER = "spark://spark-master:7077"


class SDKConfig:
    """
    Singleton class that holds the SDK configuration.

    The single instance stores the application name and the Spark master
    URL used for ComputeBlock communication.

    The constructor arguments are only applied the first time the class is
    instantiated; every later call returns the already existing instance
    and ignores the arguments it was given.

    :param app_name: The name of the application
        (default: 'unnamed_compute_block').
    :param cb_spark_master: The URL of the Spark master
        (default: 'spark://spark-master:7077').
    """

    # The one shared instance; created lazily by the first `SDKConfig()` call.
    _instance = None

    def __new__(
        cls,
        app_name: str = UNNAMED_APP_NAME,
        cb_spark_master: str = COMPUTE_BLOCK_SPARK_DEFAULT_MASTER
    ):
        """
        Creates or returns the singleton instance of SDKConfig.

        :param app_name: The name of the application. Only used when the
            instance is created for the first time.
        :param cb_spark_master: The URL of the Spark master. Only used when
            the instance is created for the first time.
        :return: The singleton SDKConfig instance.
        """
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            # Attributes are initialised here (not in __init__) so that
            # repeated SDKConfig(...) calls never overwrite existing values.
            cls._instance.app_name = app_name
            cls._instance.cb_spark_master = cb_spark_master
        return cls._instance

    def get_cb_spark_master(self) -> str:
        """
        Get the Spark master URL.

        :return: The Spark master URL.
        """
        return self.cb_spark_master

    def set_cb_spark_master(self, spark_master: str) -> None:
        """
        Set the Spark master URL.

        :param spark_master: The spark master URL with this schema:
            spark://url:port
        """
        self.cb_spark_master = spark_master
def _compare_configs(
    c1: Union[ComputeBlock, Entrypoint, InputOutputModel],
    c2: Union[ComputeBlock, Entrypoint, InputOutputModel],
    name="block"
):
    """
    Compares two configurations and raises a ValueError if they don't match.

    The comparison is a plain ``!=`` between the two objects.

    :param c1: The configuration loaded from the YAML file.
    :param c2: The configuration generated from the code.
    :param name: A descriptive name for the configuration, inserted into
        the error message (default: "block").
    :raises ValueError: If the configurations do not match.
    """
    if c1 != c2:
        # Implicit string concatenation keeps the message a single clean
        # sentence; a backslash continuation inside the literal would embed
        # the source indentation into the text shown to the user.
        raise ValueError(
            f"The {name} configs (envs, inputs, outputs) defined "
            "in your config YAML do not match the settings defined "
            "in your code."
        )
def validate_config_with_code(
    entrypoint_name: str | None = None,
    config_path: str | None = None
):
    """
    Validates that the configuration loaded from the YAML file matches the
    code-defined configuration for the ComputeBlock.

    :param entrypoint_name: Optional name of an entrypoint to validate.
        If provided (and non-empty), only that entrypoint is compared
        instead of the entire ComputeBlock configuration.
    :param config_path: Optional path to the configuration YAML file. It is
        passed unchanged to `load_config`, which falls back to its default
        location when no path is given.
    :raises ValueError: If the configurations do not match, or if
        `load_config` rejects the YAML file.
    :raises FileNotFoundError: If `load_config` cannot find the file.

    NOTE(review): an `entrypoint_name` that is missing from either
    configuration is not translated into a ValueError; whatever error the
    ``entrypoints[...]`` lookup raises propagates to the caller.
    """
    block_from_cfg = load_config(config_path)
    block_from_code = get_compute_block()

    if entrypoint_name:
        _compare_configs(
            block_from_cfg.entrypoints[entrypoint_name],
            block_from_code.entrypoints[entrypoint_name],
            # Name the entrypoint so the error does not claim that the
            # whole "block" was compared.
            name=f"entrypoint '{entrypoint_name}'"
        )
    else:
        _compare_configs(block_from_cfg, block_from_code)
def load_config(path_to_cfg: str | None) -> ComputeBlock:
    """
    Loads and validates configuration from a YAML file.

    If no path is provided, attempts to load from the default location
    (`CBC_CONFIG_DEFAULT_IDENTIFIER`) in the current working directory.

    :param path_to_cfg: Optional path to the configuration YAML file.
    :return: A ComputeBlock instance if the YAML is valid.
    :raises FileNotFoundError: If the config file does not exist.
    :raises ValueError: If the file is not valid YAML, does not contain a
        mapping at the top level (e.g. it is empty), or fails the
        ComputeBlock validation.
    """
    if path_to_cfg:
        path = Path(path_to_cfg)
    else:
        path = Path.cwd() / CBC_CONFIG_DEFAULT_IDENTIFIER

    if not path.is_file():
        raise FileNotFoundError(f"Configuration file '{path}' not found.")

    try:
        # YAML streams are UTF-8/UTF-16 encoded; do not depend on the
        # locale's default encoding.
        with path.open("r", encoding="utf-8") as f:
            config_data = yaml.safe_load(f)
    except yaml.YAMLError as e:
        raise ValueError(f"Error parsing YAML configuration: {e}") from e

    # An empty file yields None and a top-level list/scalar yields a
    # non-mapping; unpacking those with ** would raise a bare TypeError.
    if not isinstance(config_data, dict):
        raise ValueError(
            "Configuration validation error: expected a mapping at the "
            f"top level of '{path}', got {type(config_data).__name__}."
        )

    try:
        return ComputeBlock(**config_data)
    except ValidationError as e:
        raise ValueError(f"Configuration validation error: {e}") from e
def generate_config_from_compute_block(
    compute_block: ComputeBlock,
    output_path: Path
):
    """
    Writes a ComputeBlock instance to a YAML configuration file.

    The block is converted with ``model_dump()`` and emitted in block style
    (``default_flow_style=False``). The generated YAML is a starting point:
    make sure to edit it accordingly.

    :param compute_block: The ComputeBlock instance to serialize.
    :param output_path: Destination of the YAML file; it is opened in
        write mode, so existing content is replaced.
    """
    with output_path.open("w") as stream:
        serializable = compute_block.model_dump()
        yaml.dump(serializable, stream, default_flow_style=False)