Source code for scystream.sdk.config.compute_block_utils
from typing import Union
from pydantic_core import PydanticUndefinedType
from scystream.sdk.config.models import ComputeBlock, Entrypoint, \
InputOutputModel, FILE_TYPE_IDENTIFIER, PG_TABLE_TYPE_IDENTIFIER, \
CUSTOM_TYPE_IDENTIFIER
from scystream.sdk.env.settings import InputSettings, \
OutputSettings, FileSettings, PostgresSettings
from scystream.sdk.config.entrypoints import get_registered_functions
def _get_pydantic_default_value_or_none(value):
if type(value.default) is PydanticUndefinedType:
return None
return value.default
def _get_io_type(subj: Union[InputSettings, OutputSettings]) -> str:
"""
Determines the type of input/output based on the subject class.
:param subject: The settings class
:return: The determined type ("file", "pg_table", "custom")
"""
if issubclass(subj, FileSettings):
return FILE_TYPE_IDENTIFIER
elif issubclass(subj, PostgresSettings):
return PG_TABLE_TYPE_IDENTIFIER
return CUSTOM_TYPE_IDENTIFIER
def _build_input_output_dict_from_class(
subject: Union[InputSettings, OutputSettings]
):
config_dict = {}
identifier = getattr(subject, "__identifier__", None)
for key, value in subject.model_fields.items():
default_value = _get_pydantic_default_value_or_none(value)
if identifier:
key = f"{identifier}_{key}"
config_dict[key] = default_value
return InputOutputModel(
type=_get_io_type(subject),
description="<to-be-set>",
config=config_dict
)
[docs]
def get_compute_block() -> ComputeBlock:
"""
Converts Entrypoints & Settings defined in the Code
to a ComputeBlock instance.
"""
entrypoints = {}
for entrypoint, func in get_registered_functions().items():
envs = {}
inputs = {}
outputs = {}
if func["settings"]:
entrypoint_settings_class = func["settings"]
for key, value in entrypoint_settings_class.model_fields.items():
if (
isinstance(value.default_factory, type) and
issubclass(value.default_factory, InputSettings)
):
inputs[key] = _build_input_output_dict_from_class(
value.default_factory
)
elif (
isinstance(value.default_factory, type) and
issubclass(value.default_factory, OutputSettings)
):
outputs[key] = _build_input_output_dict_from_class(
value.default_factory
)
else:
envs[key] = _get_pydantic_default_value_or_none(value)
entrypoints[entrypoint] = Entrypoint(
description="<tbd>",
envs=envs if envs != {} else None,
inputs=inputs if inputs != {} else None,
outputs=outputs if outputs != {} else None
)
return ComputeBlock(
name="<tbs>",
description="<tbs>",
author="<tbs>",
entrypoints=entrypoints,
docker_image="<tbs>"
)