Backend workflows

Backend (BE) Workflows is a tool for creating backend workflows as Directed Acyclic Graphs (DAGs).

How Backend workflows work

You trigger a workflow with an API call, and the response is returned as JSON. By default, a BE Workflow runs in the foreground: it executes synchronously, and the client waits for the workflow to complete before receiving a response. For long-running workflows, or workflows that don't require immediate feedback, it can be more efficient to run the workflow in the background: it executes asynchronously, and the client receives an immediate response indicating that the workflow has started, not that it has completed.

Example use-case

A typical use case is streamlining a machine learning pipeline, from reading the dataset and preprocessing the data to training a model.

Directed Acyclic Graphs

In BE Workflows, workflows are represented as Directed Acyclic Graphs (DAGs). A DAG is a collection of tasks organized in a directed, non-cyclic structure where dependencies between tasks are represented by directed edges.
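To make the ordering constraint concrete, here is a minimal sketch that models the machine learning use case as a DAG with Python's standard graphlib module (Python 3.9+). The task names are hypothetical, and the sketch only illustrates how a DAG determines a valid execution order; it is not how BE Workflows stores or schedules graphs internally.

```python
from graphlib import CycleError, TopologicalSorter

# Hypothetical pipeline: each key maps a task to the tasks it depends on.
pipeline = {
    "read_dataset": set(),
    "preprocess": {"read_dataset"},
    "train_model": {"preprocess"},
}

# static_order() yields tasks so that every dependency precedes its dependents.
order = list(TopologicalSorter(pipeline).static_order())
print(order)

# Adding an edge from the last task back to the first creates a cycle,
# which is exactly what a DAG forbids.
cyclic = dict(pipeline, read_dataset={"train_model"})
try:
    list(TopologicalSorter(cyclic).static_order())
    has_cycle = False
except CycleError:
    has_cycle = True
print(has_cycle)
```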

Tasks

Tasks in BE Workflows are individual units of work that can be executed. They are written as Python functions and, through a task instance named "ti", have access to functions that let them retrieve request data, share data with other tasks, read shared data, and add data to the response. Each task in a workflow has the following properties:

  • Label: The visual alias for the task.
  • Name: A required task identifier starting with a letter or underscore, followed by any combination of letters, digits, or underscores. No special characters or spaces.
  • Description: A description of the task (Optional).
  • Code: The code to be executed, including the task function (Required).
  • Enabled: A flag indicating if the task is currently enabled and should be executed (Required). You can enable or disable it in the BE workflow settings.
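The naming rule for the Name property can be checked with a short regular expression. This is a minimal sketch, assuming that "letters" means ASCII letters; the helper name is_valid_task_name is hypothetical and not part of BE Workflows.

```python
import re

# ASCII letters, digits and underscores only; must not start with a digit.
TASK_NAME_PATTERN = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def is_valid_task_name(name: str) -> bool:
    """Return True if `name` satisfies the naming rule for task names."""
    return TASK_NAME_PATTERN.fullmatch(name) is not None


for candidate in ["node1", "_prep", "1node", "my task", "my-task"]:
    print(candidate, is_valid_task_name(candidate))
```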

Each task in a BE Workflow has a single function that is executed when the task runs. The name of the function must match the "Name" property defined in the task. The function takes a single argument, "ti", which represents the task instance.

Here's an example of task code:

def node_name(ti):
    # Code to be executed
    pass

Sharing Data between Tasks

Tasks share data with the ti.push and ti.pull functions. Because the workflow is a DAG, task order matters: a task can only pull a value that a task earlier in the graph has already pushed.

Example of using ti.push:

def node1(ti):
    x = "hello"
    ti.push("my_variable", x)

Example of using ti.pull:

def node2(ti):
    x = ti.pull("my_variable")

A unique key must be specified when pushing data. The same key is used to pull the data in other tasks. In the examples, the key "my_variable" is used.
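The push and pull pattern can be tried locally with a stand-in object. The sketch below is only an illustration: FakeTaskInstance is a hypothetical class that mimics ti.push and ti.pull with a plain dictionary, and the real task instance supplied by the platform may behave differently (for example, when a key is missing).

```python
class FakeTaskInstance:
    """Hypothetical stand-in for the platform's task instance (local use only)."""

    def __init__(self, store):
        self._store = store  # dict shared by all tasks of one workflow run

    def push(self, key, value):
        self._store[key] = value

    def pull(self, key):
        return self._store[key]


def producer(ti):
    ti.push("my_variable", "hello")


def consumer(ti):
    return ti.pull("my_variable").upper()


shared = {}
producer(FakeTaskInstance(shared))           # upstream task runs first
result = consumer(FakeTaskInstance(shared))  # downstream task reads the value
print(result)
```

Running consumer before producer would fail in this stand-in because the key does not exist yet, which is why the order of tasks in the graph matters.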

Background Workflow Specifics

Interceptor Function

An interceptor function can be attached to a background backend workflow. It runs before the workflow starts and can modify the request body, URL arguments, and form data, or validate the incoming data and raise an exception if it is invalid. This lets you extend a backend workflow with additional actions on each incoming request. For instance, an interceptor function can extract a file from the request form data, save it in the object store, and append the file ID to the request body. The backend workflow can then use this ID to fetch the file from the object store and perform the necessary operations.

To leverage an interceptor function with BE Workflows, a function named "intercept" should be defined. This function accepts a single argument, "ti" (representing the task instance), and updates URL arguments, the request body, and form data or validates received data as necessary. The following code illustrates the structure of the function:

def intercept(ti):
    # Interceptor code goes here
    # Modify the URL arguments, request body, and form data as needed
    ti.req_body['var'] = ti.req_body['var'] * 2
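An interceptor that validates before it modifies might look like the sketch below. It assumes ti.req_body behaves like a dictionary and that raising ValueError is an acceptable way to reject a request; the source only says that an exception is thrown, so the exception type is an assumption. The SimpleNamespace object is a local stand-in for the task instance.

```python
from types import SimpleNamespace


def intercept(ti):
    # Validate: reject requests without a numeric "var" field.
    value = ti.req_body.get("var")
    if isinstance(value, bool) or not isinstance(value, (int, float)):
        raise ValueError("'var' must be a number")
    # Modify: double the value before the workflow tasks see it.
    ti.req_body["var"] = value * 2


# Local stand-in for the task instance (hypothetical; the platform supplies the real one).
ti = SimpleNamespace(req_body={"var": 21}, url_params={}, req_form={})
intercept(ti)
print(ti.req_body["var"])
```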

SSE Events

The send_user_task_sse function dispatches server-sent events (SSE). Use it to send notifications to the end user, for example to keep them informed about the status of long-running tasks.

The following snippet presents a basic usage example of the send_user_task_sse function:

from common.framework.utils.events import send_user_task_sse

send_user_task_sse("Task is 50% complete")

In this example, a server-sent event tied to the current task is sent to the user, indicating that the task is halfway complete. The send_user_task_sse function accepts a message string as a parameter, which becomes the body of the update sent to the user.
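In a long-running task, the same call can be made repeatedly to report progress. The following is a sketch under stated assumptions: the task name process_in_batches and the batch count are hypothetical, and the fallback stub exists only so that the example also runs outside the platform, where the import is not available.

```python
try:
    # Available inside the platform, as in the basic usage example.
    from common.framework.utils.events import send_user_task_sse
except ImportError:
    # Local fallback (hypothetical) so the sketch can run outside the platform.
    sent_messages = []

    def send_user_task_sse(message):
        sent_messages.append(message)


TOTAL_BATCHES = 4  # hypothetical amount of work


def process_in_batches(ti):
    """Hypothetical long-running task that reports progress after each batch."""
    for done in range(1, TOTAL_BATCHES + 1):
        # ... real work for one batch would go here ...
        percent = done * 100 // TOTAL_BATCHES
        send_user_task_sse(f"Task is {percent}% complete")
```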

Triggering a workflow

The workflow can be triggered by making a REST API call to the specified endpoint:

https://<origin>/api/app/be_workflows/be_wf_graph/id/<workflow_id>/call

Specify the HTTP method of the request (GET, POST, PUT, or DELETE) when triggering the workflow. Depending on what the workflow requires, the API call can include query parameters, a request body, form data, or any combination of these.

Request

Workflow URL args

BE Workflows supports URL arguments (query parameters) in workflow requests. Define the schema for each URL argument using the following properties:

  • Label: Display label of the parameter
  • Name: Identifier of the parameter
  • Desc: Description of the parameter
  • Type: Type of the parameter (number, string, etc.)
  • Options: List of accepted values (if type is enum)
  • Default: Default value for the parameter (if not required)
  • Required: Specify if the param is required
  • Validation Regex: Validation regex for the parameter (if type is string)
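To illustrate what these properties mean, the sketch below applies a small schema to raw query-string values. It is not the platform's validator: the dictionary layout, the validate_params helper, the conversion of numbers to float, and the use of a full-string regex match are all assumptions made for the example.

```python
import re

# Hypothetical schema entries mirroring the properties listed above.
SCHEMA = [
    {"name": "age", "type": "number", "required": True},
    {"name": "color", "type": "enum", "options": ["red", "green", "blue"],
     "required": False, "default": "red"},
    {"name": "code", "type": "string", "required": False, "default": "A1",
     "regex": r"[A-Z]\d"},
]


def validate_params(raw, schema=SCHEMA):
    """Check raw string parameters against the schema and return cleaned values."""
    cleaned = {}
    for field in schema:
        name = field["name"]
        if name not in raw:
            if field["required"]:
                raise ValueError(f"missing required parameter: {name}")
            cleaned[name] = field.get("default")
            continue
        value = raw[name]
        if field["type"] == "number":
            value = float(value)  # raises ValueError on non-numeric input
        elif field["type"] == "enum" and value not in field["options"]:
            raise ValueError(f"{name} must be one of {field['options']}")
        elif field["type"] == "string" and "regex" in field:
            if re.fullmatch(field["regex"], value) is None:
                raise ValueError(f"{name} does not match {field['regex']}")
        cleaned[name] = value
    return cleaned


print(validate_params({"age": "30", "color": "red"}))
```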

In the API call, URL arguments follow the "?" in the URL:

https://<origin>/api/app/be_workflows/be_wf_graph/id/<workflow_id>/call?age=30&color=red

Then they can be obtained in the task code as follows:

def node_name(ti):
    age = ti.url_params['age']
    color = ti.url_params['color']

Workflow request body

A request body can be defined and used in workflows, just like URL arguments. Define its schema in the same way as the schema for URL arguments.

The request body can be accessed in a task function using ti.req_body:

def node_name(ti):
    age = ti.req_body['age']
    color = ti.req_body['color']

Here's an example API call that includes a request body:

https://<origin>/api/app/be_workflows/be_wf_graph/id/<workflow_id>/call
{
    "age": 30,
    "color": "red"
}
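From Python, such a call could be prepared with the standard library as sketched below. The helper build_workflow_request is hypothetical, "example.com" and "123" are placeholders for <origin> and <workflow_id>, and the JSON Content-Type header is an assumption. Authentication is not covered by this page and is left out.

```python
import json
from urllib.parse import urlencode
from urllib.request import Request


def build_workflow_request(origin, workflow_id, method="POST", url_args=None, body=None):
    """Build (but do not send) a request for the workflow call endpoint."""
    url = f"https://{origin}/api/app/be_workflows/be_wf_graph/id/{workflow_id}/call"
    if url_args:
        url += "?" + urlencode(url_args)
    data = json.dumps(body).encode("utf-8") if body is not None else None
    headers = {"Content-Type": "application/json"} if data is not None else {}
    return Request(url, data=data, headers=headers, method=method)


# Placeholders stand in for <origin> and <workflow_id>.
request = build_workflow_request("example.com", "123", body={"age": 30, "color": "red"})
print(request.get_method(), request.full_url)
# Sending it would be urllib.request.urlopen(request), plus whatever
# authentication your deployment requires.
```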

Workflow form data

Form data can be defined and used in workflows, just like URL arguments and the request body. Define the schema for the form data in the same way.

The form data can be accessed in a task function using ti.req_form:

def node_name(ti):
    color = ti.req_form['color']
    cut = ti.req_form['cut']

Response

The workflow response is composed of multiple response items, each containing two parts: "data" and "meta". The "data" part of each response item holds the response data, while the "meta" part contains metadata related to the returned data.

Response Item

Each response item within the response represents a specific set of data. It is uniquely identified by a key, which serves as a reference to access its corresponding "data" and "meta" parts. Users must define the schema for each response item to ensure the integrity of the data within.

The schema for each response item can be defined using the following properties:

  • Label: Display label of the parameter.
  • Name: Identifier of the parameter.
  • Desc: Description of the parameter.
  • Type: Type of the parameter (number, string, etc.).
  • Options: List of accepted values (if type is enum).
  • Required: Specify if the param is required.
  • Validation Regex: Validation regex for the parameter (if type is string).

Example of a workflow Response

{
    "data": {
        "response_element_key1": [{...}, {...}],
        "response_element_key2": [{...}, {...}, {...}]
    },
    "meta": {
        "response_element_key1": {...},
        "response_element_key2": {...}
    }
}

In the above example, the response contains two response items, identified by the keys "response_element_key1" and "response_element_key2". Each response item has its own "data" and "meta" parts, and its data must adhere to the predefined schema for that item.
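On the client side, a response with this layout can be read by pairing each key under "data" with the same key under "meta". The sketch below uses a made-up response body; the key "customers" and its fields are hypothetical.

```python
import json

# Hypothetical response body following the layout shown above.
raw = """
{
    "data": {
        "customers": [{"id": 1}, {"id": 2}]
    },
    "meta": {
        "customers": {"count": 2}
    }
}
"""

response = json.loads(raw)

# Pair each response item's data with its metadata by key.
items = {
    key: {"data": rows, "meta": response["meta"].get(key, {})}
    for key, rows in response["data"].items()
}
print(items["customers"]["meta"]["count"], len(items["customers"]["data"]))
```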

Adding Data and Metadata to the Response

To add data and metadata to the response, you can use the ti.response_data and ti.response_meta functions, respectively. Below is an example:

def node1(ti):
    # Placeholder rows; each object must respect the predefined
    # schema of "response_element_key1"
    data = [{"field": "value"}]
    # Adding data to response (must be a list of objects)
    ti.response_data("response_element_key1", data)
    # Adding meta to response
    ti.response_meta("response_element_key1", {"count": 100})

For each response element in the "data" section, the data should be provided as a list of objects, and each object must conform to the predefined schema of its corresponding response element.
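To see how calls to ti.response_data and ti.response_meta map onto the response layout, the sketch below uses a hypothetical stand-in class that collects both parts into one dictionary. The class, the task name list_products, and the sample rows are assumptions for illustration; the platform's own task instance also validates objects against the schema, which this stand-in does not do.

```python
class FakeTaskInstance:
    """Hypothetical stand-in that only mimics response_data and response_meta."""

    def __init__(self):
        self.response = {"data": {}, "meta": {}}

    def response_data(self, key, data):
        # Mirror the documented rule: data is a list of objects.
        if not isinstance(data, list) or not all(isinstance(obj, dict) for obj in data):
            raise TypeError("data must be a list of objects")
        self.response["data"][key] = data

    def response_meta(self, key, meta):
        self.response["meta"][key] = meta


def list_products(ti):
    products = [{"name": "lamp"}, {"name": "desk"}]  # made-up rows
    ti.response_data("products", products)
    ti.response_meta("products", {"count": len(products)})


ti = FakeTaskInstance()
list_products(ti)
print(ti.response)
```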