Getting started#
Install the scystream-sdk:
pip install scystream-sdk
Introduction#
One of the central concepts of scystream is the so-called Compute Block.
A Compute Block describes an independent program that acts as a worker and is scheduled by the scystream-core application. This worker executes a task (e.g. an NLP task or a crawling task).
This SDK provides helper functions and everything else needed to implement a custom Compute Block.
Each worker can have multiple entrypoints, each aimed at solving one task. These entrypoints can be configured externally using Settings, which are essentially environment variables parsed and validated using pydantic.
You can set “global” Settings (for the entrypoint) using the envs block, or set “input/output-related” Settings using the config block of each input/output.
There are three types of input/output Settings that can be used: FileSettings, PostgresSettings or CustomSettings. FileSettings and PostgresSettings come with predefined settings, such as PG_USER, PG_PASS or PG_HOST.
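For illustration, here is a small excerpt in the style of the cbc.yaml described further below (the keys are taken from the full example there): LANGUAGE is a “global” Setting in the envs block, while the prefixed Postgres keys sit in the config block of an input.
envs:
  LANGUAGE: "de"
inputs:
  pg_input:
    type: "pg_table"
    config:
      my_first_pg_PG_USER: null
      my_first_pg_PG_PASS: null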
Basic Usage of the SDK#
from scystream.sdk.core import entrypoint
from scystream.sdk.scheduler import Scheduler

@entrypoint()
def example_task():
    print("Executing example_task")

@entrypoint()
def another_task(task_name):
    print(f"Executing another_task with task name: {task_name}")

def main():
    """
    The Scheduler functions are primarily used by the Scheduler in scystream-core to trigger
    the execution of entrypoints. However, you can also use them to trigger your functions manually.
    """
    Scheduler.list_entrypoints()
    Scheduler.execute_function("example_task")
    Scheduler.execute_function("another_task", "Scheduled Task")

if __name__ == "__main__":
    main()
How to Define and Use Settings#
Earlier, we already talked about Settings. Each Input & Output can be configured using these Settings.
There are also Global Settings, referred to as envs in the cbc.yaml.
Types of Inputs and Outputs#
We provide predefined setting types that include standardized environment variable keys for common use cases. These settings are designed to simplify configuration and ensure consistency across your project.
File Settings (FileSettings)#
Use the FileSettings class for configurations related to file-based inputs and outputs, such as S3 file storage. It includes the following standardized environment variable keys:
S3_HOST: The host address for the S3 service.
S3_PORT: The port for the S3 service.
S3_ACCESS_KEY: The access key for authenticating with the S3 service.
S3_SECRET_KEY: The secret key for authenticating with the S3 service.
BUCKET_NAME: The name of the S3 bucket.
FILE_PATH: The path to the file within the bucket.
FILE_NAME: The name of the file.
PostgreSQL Settings (PostgresSettings)#
Use the PostgresSettings class for configurations related to PostgreSQL database inputs and outputs. It includes the following standardized environment variable keys:
PG_USER: The username for accessing the PostgreSQL database.
PG_PASS: The password for accessing the PostgreSQL database.
PG_HOST: The host address for the PostgreSQL database.
PG_PORT: The port for the PostgreSQL database.
DB_TABLE: The name of the database table.
Usage Instructions#
To use these predefined settings, simply include them in your configuration as shown in the examples below.
Important Notes#
__identifier__ Requirement:
- When using FileSettings or PostgresSettings, you must define an __identifier__ attribute in your input/output class.
- The __identifier__ is used to prefix the environment variable keys, ensuring that they do not conflict when multiple inputs or outputs of the same type are defined.
- Make sure that the __identifier__ is unique across your project!

Example:

class MyFileInput(FileSettings, InputSettings):
    __identifier__ = "my_file_input"  # Prefixes env vars with `my_file_input_`

Optional but Recommended:
- While you are not required to use these predefined settings, we strongly recommend them for file or PostgreSQL-based inputs and outputs to maintain consistency and avoid configuration errors.
Example Configuration#
Here’s an example of how to define and use these settings in your project:
Below you can find a simple example of how to extend the previously created entrypoints with Settings. To do so, use the EnvSettings class.
from scystream.sdk.core import entrypoint
from scystream.sdk.env.settings import EnvSettings, InputSettings, OutputSettings, FileSettings, PostgresSettings

# Assuming the Input of your Task is a database table.
class ExampleTaskDBInput(PostgresSettings, InputSettings):
    __identifier__ = "my_first_pg"

# Assuming the Output of your Task is a File.
class ExampleTaskFileOutput(FileSettings, OutputSettings):
    __identifier__ = "my_first_file"

class CustomOutputConfigurable(OutputSettings):
    FB_USER: str = "RWTH"
    FB_PASS: str  # this variable has to be set in the envs, otherwise validation will fail

# The "global" settings for the Entrypoint
class ExampleTaskSettings(EnvSettings):
    LANGUAGE: str = "de"

    pg_input: ExampleTaskDBInput
    file_output: ExampleTaskFileOutput
    custom_output: CustomOutputConfigurable

# Pass the settings into the Entrypoint here
@entrypoint(ExampleTaskSettings)
def example_task(settings):
    print("You can use your variables now in your entrypoint.")
    print(f"Look at this: {settings.pg_input.PG_USER}")
    print(f"Or this: {settings.file_output.FILE_NAME}")
    print("Executing example_task")
Configure the SDK#
You can configure the following aspects of the SDK:
- The app_name (which will be shown in the Apache Spark Control Plane)
- The cb_spark_master (which defines the externally reachable URL of the Spark Master)
You can configure it like the following:
from scystream.sdk.config import SDKConfig
SDKConfig(
    app_name="test_app",
    cb_spark_master="local[*]"
)
print("The rest of your code.")
Compute Block Config File#
Every repository that will be used within the scystream application must contain a Compute Block Config File, the cbc.yaml, in its root directory.
This cbc.yaml defines the entrypoints as well as the inputs & outputs each Compute Block offers, which the scystream-core application needs in order to understand the block.
ATTENTION: When executing entrypoints, the cbc.yaml and the defined Settings will be cross-validated! So make sure the cbc.yaml is always up-to-date with your code!
For the code we previously wrote, this is an example cbc.yaml:
name: "Example Compute Block"
description: "Contains examples"
author: "John Doe"
docker_image: "https://ghcr.io/nlp-toolbox"
entrypoints:
  example_task:
    description: "Run example"
    envs:
      LANGUAGE: "de"
    inputs:
      pg_input:
        description: "Postgres input example"
        type: "pg_table"
        config:
          my_first_pg_PG_USER: null
          my_first_pg_PG_PASS: null
          my_first_pg_PG_HOST: null
          my_first_pg_PG_PORT: null
          my_first_pg_DB_TABLE: null
    outputs:
      file_output:
        type: "file"
        config:
          my_first_file_BUCKET_NAME: null
          my_first_file_FILE_NAME: null
          my_first_file_FILE_PATH: null
          my_first_file_S3_ACCESS_KEY: null
          my_first_file_S3_HOST: null
          my_first_file_S3_PORT: null
          my_first_file_S3_SECRET_KEY: null
      custom_output:
        description: "custom description"
        type: "custom"
        config:
          FB_USER: "RWTH"
          FB_PASS: null
Validating the Config#
You can validate your config like this:
from scystream.sdk.core import entrypoint
from scystream.sdk.config import validate_config_with_code

@entrypoint()
def example_entrypoint():
    print("Example")

if __name__ == "__main__":
    validate_config_with_code()
Generating the Config#
If you did not write the cbc.yaml yourself and already have some entrypoints implemented, you can also generate the cbc.yaml automatically.
from scystream.sdk.core import entrypoint
from scystream.sdk.config import generate_config_from_compute_block, get_compute_block
from pathlib import Path

@entrypoint()
def example_entrypoint():
    print("Example...")

if __name__ == "__main__":
    compute_block = get_compute_block()
    generate_config_from_compute_block(compute_block, Path("cbc.yaml"))
Using a Database#
The SDK has utilities to read from & write to a Postgres database. For this, it uses Apache Spark.
To interact with a database you have to do the following:
1. Create a Spark connection using scystream.sdk.spark_manager.SparkManager
2. Configure your Postgres connection using scystream.sdk.database_handling.postgres_manager.PostgresConfig (Note: you can also use scystream.sdk.env.settings.PostgresSettings)
3. Set up Postgres in your Spark session using scystream.sdk.spark_manager.SparkManager.setup_pg()
4. Interact with the database using scystream.sdk.database_handling.postgres_manager!
See a simple example here:
from pyspark.sql import Row

from scystream.sdk.core import entrypoint
from scystream.sdk.spark_manager import SparkManager
from scystream.sdk.database_handling.postgres_manager import PostgresConfig

@entrypoint()
def test():
    manager = SparkManager()

    database_conf = PostgresConfig(
        pg_user="postgres",
        pg_pass="postgres",
        pg_host="postgres",
        pg_port=5432
    )
    db_conn = manager.setup_pg(database_conf)

    # Use Spark DataFrames
    spark_df = manager.session.createDataFrame([
        Row(id=1, name="test"),
        Row(id=2, name="test")
    ])

    # Write to the database
    db_conn.write(
        database_name="postgres",
        dataframe=spark_df,
        table="test",
        mode="overwrite"
    )

    # Read from the database
    read_df = db_conn.read(
        database_name="postgres",
        query="SELECT id FROM test WHERE id > 1"
    )
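Assuming db_conn.read() returns a regular Spark DataFrame (as it is used in the example above), you can continue working with the result using the usual DataFrame operations:

    # Inspect the rows read from the "test" table
    read_df.show()
    print(read_df.count())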
Using a S3 Bucket#
The SDK has utilities to upload files to & download files from an S3 Bucket. Currently, it does NOT use Apache Spark for that.
To interact with an S3 Bucket you have to do the following:
1. Configure the S3 connection using scystream.sdk.file_handling.s3_manager.S3Config (Note: you can also use scystream.sdk.env.settings.FileSettings)
2. Set up the S3 connection using scystream.sdk.file_handling.s3_manager.S3Operations
3. Use the operations
See a simple example here:
from scystream.sdk.core import entrypoint
from scystream.sdk.file_handling.s3_manager import S3Config, S3Operations

@entrypoint()
def test():
    s3_conf = S3Config(
        access_key="access",
        secret_key="secret",
        endpoint="http://localhost",
        port=9000
    )
    s3_conn = S3Operations(s3_conf)

    s3_conn.upload_file(
        path_to_file="path/test.txt",
        bucket_name="Example",
        target_name="target_file_name.txt"
    )

    s3_conn.download_file(
        bucket_name="Example",
        s3_object_name="target_file_name.txt",
        local_file_path="download.txt"
    )