Source code for scystream.sdk.scheduler

from scystream.sdk.config.entrypoints import get_registered_functions
from scystream.sdk.config import validate_config_with_code


[docs] class Scheduler:
[docs] @staticmethod def list_entrypoints(): """List all registered entrypoint functions.""" functions = get_registered_functions() for name in functions: print(f"'{name}' is available as an entrypoint.")
[docs] @staticmethod def execute_function(name, *args, **kwargs): """ Validate the in code defined entrypoints with the settings defined in the cfg file """ validate_config_with_code(entrypoint_name=name) functions = get_registered_functions() if name in functions: return functions[name]["function"](*args, **kwargs) else: raise Exception(f"No entrypoint found with the name: {name}")