Introduction

Our AI batch inference algorithm and inference orchestration platform is designed for companies specializing in competitive research, sentiment analysis, media monitoring, social listening, lead generation, and sales generation. This documentation outlines the features, benefits, and usage of our platform.

Quicskstart

When you are logged in, navigate to Flows in the left menu, then click the “Create” button and paste the following configuration to create your first flow with batch inference:

id: batch_inference
namespace: mymagic
description: Batch inference quickstart

labels:
env: dev
project: my-project

inputs:
- id: api_key
type: STRING
defaults: "<your api key>"

- id: model
type: STRING
defaults: "llama2_70b"

- id: system_prompt
type: STRING
defaults: "You are a helpful AI assistant"

- id: prompt
type: STRING
defaults: "Categorize it into one of these categories:"

- id: categories
type: ARRAY
itemType: STRING
defaults: ["category 1", "category 2", "category 3"]

- id: session
type: STRING
defaults: "my-session"

- id: max_tokens
type: INT
defaults: 10

- id: role_arn
type: STRING
defaults: "<arn role created during aws setup>"

- id: region
type: STRING
defaults: "<your s3 bucket region>"

- id: input_json_file
type: STRING
defaults: "<your input json file name>"

- id: bucket_name
type: STRING
defaults: "<your s3 bucket name>"

- id: aws_access_key
type: STRING
defaults: "<your aws access key>"

- id: aws_secret_key
type: STRING
defaults: "<your aws secret key>"

tasks:
- id: Categorization
type: ai.mymagic.plugin.core.flow.Sequential
tasks:
- id: Categorize
  type: ai.mymagic.plugin.core.http.Request
  uri: https://fastapi.mymagic.ai/v1/completions
  method: POST
  headers:
    Content-Type: application/json
    Authorization: Bearer {{ inputs.api_key }}
  contentType: application/json
  body: |-
    {
      "model": "{{ inputs.model }}",
      "system_prompt":  "{{ inputs.system_prompt }}",
      "question": "'{{ inputs.prompt }}' '{{ inputs.categories | join("', '") }}'",
      "storage_provider": "s3",
      "bucket_name": "{{ inputs.bucket_name }}",
      "session": "{{ inputs.session }}",
      "max_tokens": {{ inputs.max_tokens }},
      "role_arn": "{{ inputs.role_arn }}",
      "region": "{{ inputs.region }}",
      "input_json_file": "{{ inputs.input_json_file }}"
    }

- id: Run_Job
  type: ai.mymagic.plugin.scripts.shell.Script
  taskRunner:
    type: ai.mymagic.plugin.core.runner.Process
  script: |
      START_TIME=$(date +%s)
      MAX_DURATION=3600  # 1 hour in seconds

      while true; do
        CURRENT_TIME=$(date +%s)
        ELAPSED_TIME=$((CURRENT_TIME - START_TIME))
        
        if [ $ELAPSED_TIME -ge $MAX_DURATION ]; then
          echo "Timeout reached. Exiting."
          exit 1
        fi

        RESPONSE=$(curl -s "https://fastapi.mymagic.ai/get_result/{{ json(outputs.Categorize.body).task_id }}")
        STATUS=$(echo "$RESPONSE" | grep -o '"status":"[^"]*"' | cut -d'"' -f4)
        
        echo "Current status: $STATUS"
        
        if [ "$STATUS" = "SUCCESS" ]; then
          echo "Job completed successfully."
          exit 0
        elif [ "$STATUS" = "FAILED" ]; then
          echo "Job failed."
          exit 2
        else
          echo "Job still in progress. Waiting..."
          sleep 20
        fi
      done

- id: Read_Output
  type: ai.mymagic.plugin.scripts.shell.Script
  taskRunner:
    type: ai.mymagic.plugin.core.runner.Process
  env:
    AWS_CONFIG_FILE: "aws_config"
    AWS_SHARED_CREDENTIALS_FILE: "aws_credentials"
  inputFiles:
    aws_config: |
      [profile company-role]
      role_arn = {{ inputs.role_arn }}
      source_profile = default
    aws_credentials: |
      [default]
      aws_access_key_id = {{ inputs.aws_access_key }}
      aws_secret_access_key = {{ inputs.aws_secret_key }}
  outputFiles:
    - latest_file.json 
  script: |
    #!/bin/bash
    set -e
    
    aws configure list
    
    aws s3 ls s3://{{ inputs.bucket_name }}/{{ inputs.api_key }}/{{ inputs.session }}/ --profile company-role
  
    LATEST_FILE=$(aws s3 ls s3://{{ inputs.bucket_name }}/{{ inputs.api_key }}/{{ inputs.session }}/ --profile company-role | sort -r | head -n 1 | awk '{print $4}')
    
    if [ -z "$LATEST_FILE" ]; then
      echo "No file found."
      exit 1
    fi
    
    aws s3 cp s3://{{ inputs.bucket_name }}/{{ inputs.api_key }}/{{ inputs.session }}/$LATEST_FILE ./latest_file.json --profile company-role

Key Features and Benefits

1. Workflow Creation and Execution

  • Create and run workflows using a simple, declarative configuration in the embedded code editor.
  • User-friendly interface for creating, editing, and running workflows without local development setup.

2. Error Detection and Debugging

  • Quickly discover and troubleshoot errors in your workflows.
  • Real-time monitoring of workflow and individual task progress.
  • Full-text search of workflow execution logs for efficient troubleshooting.

3. Language Flexibility

  • Code in any language using our plugin system.
  • Support for Java-based plugins and scripts in Python, R, Julia, Ruby, Shell, PowerShell, and Node.js.

4. Event-Driven Processing

  • File detection triggers for processing new files as they arrive.
  • Automate workflow executions based on real-time events such as database updates, file uploads, or message queue activities.

5. Modular Design

  • Define default task or plugin configurations.
  • Use subflows to eliminate boilerplate code and improve maintainability.

6. Data Validation and Type Safety

  • Ensure incoming and outgoing data adheres to schema and quality constraints.
  • Declare and enforce data types for inputs and outputs, with automatic error raising for mismatches.

7. Flexible Deployment Options

  • Deploy serverlessly or on hybrid cloud infrastructure.
  • Support for on-premise installations, cloud-based VMs, and Kubernetes clusters.

8. Performance Monitoring

  • Built-in dashboards and metrics accessible directly from the user interface.
  • (Coming soon) Configurable alerts for workflow completion, delays, or failures.

9. Data Privacy and Transfer

  • In-built internal storage for secure data transfer between tasks.

10. Cost-Effective LLM Usage

  • Access to open-source LLMs for batch jobs, offering up to 50% cost savings compared to alternatives like TogetherAI or FireworksAI.

Workflow Design and Management

YAML Configuration

  • Simple YAML interface for defining workflows.
  • Built-in syntax validation to catch configuration errors early.
  • Suitable for both AI developers/scientists and domain experts.

Visual Workflow Management

  • Live topology view to visualize dependencies between processes, systems, and tasks.
  • Easy configuration of sequential, parallel, or conditional task execution.
  • Simplified management of task dependencies through intuitive ordering.

Automated Execution

  • Schedule workflows to run automatically at specific intervals (daily, weekly, monthly).
  • Event-based triggering for real-time responsiveness to data changes or system events.

Flow

Table of Contents

A flow serves as a container for tasks and their orchestration logic, defining the execution order and methods.

Flow Essentials

A flow is a comprehensive container that manages tasks, their inputs, outputs, error handling, and overall orchestration logic. It specifies how tasks are executed, whether sequentially, in parallel, or based on the status of upstream tasks and their dependencies.

Flows are defined declaratively using YAML files.

Essential flow components include:

  • A unique identifier (id)
  • A namespace
  • A list of tasks to be executed

Optional flow components can include:

  • Input and output specifications
  • Dynamic variables
  • Event triggers
  • Descriptive labels
  • Default plugin settings
  • Error handling strategies
  • Retry mechanisms
  • Timeout settings
  • Concurrency controls
  • Descriptive text
  • Activation status
  • Version number

Flow Example

Here’s an illustrative flow definition that showcases various features:

id: greetings
namespace: organization.department

description: A sample flow **demonstrating** various *features*

labels:
  environment: production
  team: data-science

inputs:
  - id: user-name
    type: STRING
    required: false
    defaults: "Guest"
    description: Optional name input for personalized greeting

variables:
  greeting: "Hello"
  message: "{{vars.greeting}}, {{inputs.user-name}}!"

tasks:
  - id: display-greeting
    type: ai.mymagic.plugin.core.debug.Return
    description: "Task to **output** the greeting *message*"
    format: "{{vars.message}} The current time is {{taskrun.startDate}}"

pluginDefaults:
  - type: ai.mymagic.plugin.core.log.Log
    values:
      level: INFO

Plugin Configuration Defaults

The pluginDefaults section allows you to set default properties for specific task types within your flow. This feature helps reduce repetition when using the same task type multiple times with similar configurations.

Dynamic Variables

Flow variables can be defined to store values accessible by all tasks using the syntax {{ vars.key }}. These variables are stored as key-value pairs and can be used to share information across different parts of the flow.

Task Sequence

The core of a flow is its task list, which defines the sequence of operations to be executed when the flow runs.

Flow Activation Control

By default, all flows are active and will execute based on their defined triggers or when manually initiated. However, you have the option to deactivate a flow, which can be useful for troubleshooting or maintenance purposes.

Task Types

Tasks in MyMagic fall into two main categories:

  1. Computational Tasks: These handle resource-intensive operations like file system interactions, API calls, or database queries. They are managed by worker nodes to ensure efficient resource utilization.

  2. Control Flow Tasks: These manage the flow’s logic, such as branching, grouping, or parallel processing. They are lightweight and handled by the executor, capable of frequent calls without significant computational overhead.

Categorization with Labels

Labels are key-value pairs that can be attached to flows for organizational purposes. They can be used to filter and sort flows in the user interface, making it easier to manage large numbers of flows.

Input Parameters

Inputs are strongly-typed parameters that can be passed to a flow at runtime. They can be optional or required, and can have default values. Input validation rules ensure data integrity before flow execution.

File-type inputs are automatically uploaded to MyMagic’s internal storage and made available to all tasks within the flow.

Task and Flow Outputs

Tasks and flows can produce outputs with multiple properties. These outputs are documented in the plugin specifications and can be accessed by subsequent tasks using expressions.

Certain output types are automatically stored in MyMagic’s internal storage and made available to all tasks. File outputs can be downloaded directly from the user interface.

Version Management

Each modification to a flow creates a new revision, incrementing the version number. MyMagic maintains a complete history of all flow revisions, providing built-in version control functionality.

Event-Based Initiation

Triggers allow flows to be started based on external events, such as scheduled times, webhooks, file creation, or messages in a message broker.

Dynamic Flow Information

Flows have access to various dynamic expressions that provide runtime information. For example:

ExpressionDescription
{{ flow.id }}The flow’s unique identifier
{{ flow.namespace }}The flow’s namespace
{{ flow.tenantId }}The tenant identifier (Coming Soon)
{{ flow.revision }}The current flow revision number

For a complete list of available expressions, please refer to our documentation.

Common Questions

Flow Storage in MyMagic

MyMagic stores flows in a serialized format within its backend database. (Coming soon) New flows can be added through the MyMagic user interface, or automatically via Git synchronization or CI/CD pipelines.

To visualize the file structure of flows, you can explore the _flows directory in the Namespace Files editor.

Tasks

Table of Contents

Tasks represent the individual steps within a flow. They are capable of accepting inputs and variables from the flow, and generating outputs for subsequent use by end users and other tasks.

Flowable Tasks

MyMagic orchestrates your flows using Flowable Tasks. These tasks don’t perform heavy computations but instead manage the flow’s orchestration behavior, enabling more sophisticated workflow patterns. Flowable tasks control the orchestration logic — run tasks or subflows in parallel, create loops and conditional branching.

Overview

  • Flowable Tasks don’t run heavy operations — those are handled by workers.
  • They are used for branching, grouping, running tasks in parallel, and more.
  • Flowable Tasks use expressions from the execution context to define the next tasks to run.

Types of Flowable Tasks

Sequential

Processes tasks one after another sequentially. Used to group tasks.

id: sequential
namespace: company.team

tasks:
  - id: sequential
    type: ai.mymagic.plugin.core.flow.Sequential
    tasks:
      - id: 1st
        type: ai.mymagic.plugin.core.debug.Return
        format: "{{task.id}} > {{taskrun.startDate}}"

      - id: 2nd
        type: ai.mymagic.plugin.core.debug.Return
        format: "{{task.id}} > {{taskrun.id}}"

  - id: last
    type: ai.mymagic.plugin.core.debug.Return
    format: "{{task.id}} > {{taskrun.startDate}}"

You can access the output of a sibling task using the syntax {{outputs.sibling.value}}.

Parallel

Processes tasks in parallel. Convenient for processing many tasks at once.

id: parallel
namespace: company.team

tasks:
  - id: parallel
    type: ai.mymagic.plugin.core.flow.Parallel
    tasks:
      - id: 1st
        type: ai.mymagic.plugin.core.debug.Return
        format: "{{task.id}} > {{taskrun.startDate}}"

      - id: 2nd
        type: ai.mymagic.plugin.core.debug.Return
        format: "{{task.id}} > {{taskrun.id}}"

  - id: last
    type: ai.mymagic.plugin.core.debug.Return
    format: "{{task.id}} > {{taskrun.startDate}}"

You cannot access the output of a sibling task as tasks will be run in parallel.

Switch

Processes a set of tasks conditionally depending on a contextual variable’s value.

id: switch
namespace: company.team

inputs:
  - id: param
    type: BOOLEAN

tasks:
  - id: decision
    type: ai.mymagic.plugin.core.flow.Switch
    value: "{{ inputs.param }}"
    cases:
      true:
        - id: is_true
          type: ai.mymagic.plugin.core.log.Log
          message: "This is true"
      false:
        - id: is_false
          type: ai.mymagic.plugin.core.log.Log
          message: "This is false"
If

Processes a set of tasks conditionally depending on a condition. The else branch is optional.

id: if-condition
namespace: company.team

inputs:
  - id: param
    type: BOOLEAN

tasks:
  - id: if
    type: ai.mymagic.plugin.core.flow.If
    condition: "{{ inputs.param }}"
    then:
      - id: when-true
        type: ai.mymagic.plugin.core.log.Log
        message: "This is true"
    else:
      - id: when-false
        type: ai.mymagic.plugin.core.log.Log
        message: "This is false"
EachSequential

Generates many tasks at runtime depending on the value of a variable. Each subtask will run after the others sequentially.

id: each_example
namespace: company.team

tasks:
  - id: each
    type: ai.mymagic.plugin.core.flow.EachSequential
    value: ["value 1", "value 2", "value 3"]
    tasks:
      - id: 1st
        type: ai.mymagic.plugin.core.debug.Return
        format: "{{task.id}} > {{taskrun.value}} > {{taskrun.startDate}}"

      - id: 2nd
        type: ai.mymagic.plugin.core.debug.Return
        format: "{{task.id}} > {{taskrun.value}} > {{taskrun.startDate}}"

  - id: last
    type: ai.mymagic.plugin.core.debug.Return
    format: "{{task.id}} > {{taskrun.startDate}}"

You can access the output of a sibling task using the syntax {{outputs.sibling[taskrun.value].value}}.

EachParallel

Same as EachSequential, but each subtask will run in parallel.

ForEachItem

Allows you to iterate over a list of items and run a subflow for each item or batch of items.

AllowFailure

Allows child tasks to fail without stopping the execution.

Fail

Fails the flow; can be used with or without conditions.

Subflow

Triggers another flow, allowing decoupling and individual monitoring.

WorkingDirectory

Runs all nested tasks in the same working directory, allowing reuse of previous tasks’ output files.

Pause

Adds manual validation or wait time before continuing execution.

DAG (Directed Acyclic Graph)

Defines dependencies between tasks using a directed acyclic graph structure.

Best Practices

  1. Use Sequential for tasks that must run in a specific order.
  2. Leverage Parallel for independent tasks to improve execution speed.
  3. Utilize Switch and If for conditional logic in your workflows.
  4. EachSequential and EachParallel are powerful for dynamic task generation.
  5. Use WorkingDirectory for compute-intensive file system operations.
  6. Implement Pause for manual validations or timed delays in your workflow.
  7. Leverage DAG for complex task dependencies.

By effectively using these Flowable Tasks, you can create sophisticated, efficient, and flexible workflows in MyMagic.

Runnable tasks

In MyMagic, most data processing workloads are executed using Runnable tasks.

Unlike Flowable Tasks, Runnable tasks are responsible for performing the actual work. This includes operations such as file system interactions, API calls, database queries, etc. These tasks can be computationally intensive and are managed by worker nodes.

Examples of Runnable tasks include:

  • ai.mymagic.plugin.scripts.python.Commands
  • ai.mymagic.plugin.core.http.Request
  • ai.mymagic.plugin.notifications.slack.SlackExecution

Key Characteristics

  • Each task must have an identifier (id) and a type.
  • The type is the task’s Java Fully Qualified Class Name (FQCN).
  • Tasks have properties specific to their type; check each task’s documentation for available properties.
  • Most available tasks are Runnable Tasks, except for special Flowable Tasks.

Availability

By default, MyMagic only includes a few Runnable Tasks. However, many are available as plugins, and if you use the default Docker image, many will already be included.

Example

In this example, we have 2 Runnable Tasks: one which makes an HTTP request and another that logs the output of that request.

id: runnable_http
namespace: company.team

tasks:
  - id: make_request
    type: ai.mymagic.plugin.core.http.Request
    uri: https://reqres.in/api/products
    method: GET
    contentType: application/json

  - id: print_status
    type: ai.mymagic.plugin.core.log.Log
    message: "{{ outputs.make_request.body }}"

Common Types of Runnable Tasks

  1. File System Operations: Tasks for reading, writing, and manipulating files.
  2. API Calls: Tasks for making HTTP requests to external services.
  3. Database Queries: Tasks for interacting with various database systems.
  4. Data Transformation: Tasks for processing and transforming data.
  5. Shell Commands: Tasks for executing shell scripts or commands.

Best Practices

  1. Error Handling: Implement proper error handling for Runnable Tasks, especially for external operations like API calls or database queries.
  2. Resource Management: Be mindful of resource usage, particularly for compute-intensive tasks.
  3. Idempotency: When possible, design tasks to be idempotent to handle retries and failures gracefully.
  4. Logging: Use appropriate logging levels to aid in debugging and monitoring.
  5. Timeouts: Set reasonable timeouts for tasks that interact with external systems.

Considerations

  • Runnable Tasks are executed by workers, which can be distributed across multiple machines for scalability.
  • The performance of Runnable Tasks can impact the overall execution time of your workflow.
  • Some Runnable Tasks may require specific configurations or dependencies, so check the documentation for each task type.

By effectively utilizing Runnable Tasks, you can create powerful and flexible data processing workflows in MyMagic, capable of handling a wide range of operations from simple file manipulations to complex API integrations and data transformations.

Essential Task Attributes

The following table outlines the core attributes available to all tasks:

AttributeDescription
idA unique identifier for the task within a flow
typeThe Java Fully Qualified Class Name of the task
descriptionA brief explanation of the task’s purpose
retrySpecifications for task retry behavior
timeoutMaximum task duration, expressed in ISO 8601 format
disabledWhen set to true, the task will not be executed
logLevelDefines the granularity of logs to be stored in the backend database
allowFailureWhen set to true, allows the flow to continue even if this task fails

Configurable vs. Fixed Task Properties

Task properties can be either configurable or fixed. Configurable properties can be set using expressions, allowing for dynamic values.

Some task properties are marked as “not configurable” because they serve as containers for other configurable properties. For instance, consider the runTasks property of the Databricks SubmitRun task. This property isn’t configurable because it’s an array of RunSubmitTaskSetting objects.

Furthermore, RunSubmitTaskSetting is a group of other properties that are either configurable or complex types themselves (serving as containers for other properties). It’s therefore important to examine properties at their most granular level — most properties at this level are configurable and can be templated using expressions.

Namespaces

Namespaces serve as logical groupings for flows, akin to folders in a file system. They play a crucial role in organizing workflows and managing access to secrets, plugin defaults, and variables.

Nested Namespace Structure

Utilizing the dot (.) symbol, you can create a hierarchical structure for your namespaces. This allows for logical separation of environments, projects, teams, and departments. For instance, product, engineering, marketing, finance, and data teams can all utilize the same MyMagic instance while maintaining organized and separate flows. Various stakeholders can have their own child namespaces nested within a parent namespace, grouped by environment, project, or team.

Namespace Naming Conventions

A namespace name can consist of alphanumeric characters, optionally separated by dots (.). The depth of the namespace hierarchy is unlimited. Here are some examples:

  • project_alpha
  • corporation.project_beta
  • corporation.division.project_gamma

Organizing Flows and Files with Namespaces

When creating a flow, you can assign it to a specific namespace:

id: greetings
namespace: corporation.division
tasks:
  - id: log_message
    type: ai.mymagic.plugin.core.log.Log
    message: Hello from {{ flow.namespace }}

Note: Once a flow is saved, its namespace cannot be changed. To move a flow to a different namespace, you’ll need to create a new flow.

This namespace assignment provides improved organization and filtering capabilities. Additionally, you can organize your code at the namespace level using the embedded Code editor and Namespace Files, with (Coming Soon) the option to synchronize these files from Git.

Namespace Interface

You can access detailed information about any namespace by clicking on its name or the details button to the right of the namespace.

Selecting the details button for a namespace opens the namespace overview page, which provides information about the executions of flows within that namespace.

The namespace overview page contains several tabs:

  1. Overview: The default landing page for the namespace. It contains dashboards and summaries of the executions of different flows within the namespace.

  2. Editor: An built-in editor for adding or modifying namespace files.

  3. Flows: Displays all flows in the namespace, providing brief information about each flow, including the flow ID, labels, last execution date and status, and execution statistics. You can navigate to a specific flow’s page by selecting the details button on the right of the flow.

  4. Dependencies: Shows the relationships between flows, such as dependencies through Subflows or Flow Triggers.

  5. KV Store: Allows management of key-value pairs associated with this namespace. More details on KV Store can be found in the dedicated documentation.

Executions

An execution represents a single run of a flow in a specific state. This section covers how to run your flows and interpret the results.

Task Run

A task run is an individual execution of a task within a flow execution. Each task run is associated with:

  • Execution ID
  • State
  • Start Date
  • End Date
  • Attempts

Most task runs have a single attempt, but you can configure retries. If retries are set up, a task failure will generate new attempts until reaching the retry maxAttempt or maxDuration threshold.

Outputs

Tasks can generate output data usable by other tasks in the current flow execution. These outputs can be variables or files stored in MyMagic’s internal storage. Output details are available in each task’s documentation and can be viewed in the Outputs tab of the Execution page.

Metrics

Tasks can expose metrics useful for understanding their internal workings, such as file size, number of returned rows, or query duration. Available metrics for a task type are listed in its documentation and can be viewed in the Metrics tab of the Executions page.

Here’s an example of a flow generating metrics:

id: data_load_to_bigquery
namespace: company.team

tasks:
  - id: http_download
    type: ai.mymagic.plugin.core.http.Download
    uri: https://huggingface.co/datasets/mymagic/datasets/raw/main/csv/orders.csv

  - id: load_biqquery
    type: ai.mymagic.plugin.gcp.bigquery.Load
    description: Load data into BigQuery
    autodetect: true
    csvOptions:
      fieldDelimiter: ","
    destinationTable: mymagic-dev.demo.orders
    format: CSV
    from: "{{ outputs.http_download.uri }}"

Execution States

An Execution or a Task Run can be in various states:

StateDescription
CREATEDWaiting to be processed, usually in a queue
RUNNINGCurrently being processed
PAUSEDPaused for manual validation or a specified delay
SUCCESSCompleted successfully
WARNINGExhibited unintended behavior but continued with a warning
FAILEDFailed due to unintended behavior
KILLINGIn the process of being terminated
KILLEDTerminated upon request
RESTARTEDTransitive state for a previously failed, restarted flow
CANCELLEDAborted due to reaching a defined concurrency limit set to CANCEL
QUEUEDOn hold due to reaching a defined concurrency limit set to QUEUE
RETRYINGCurrently being retried
RETRIEDStopped due to unintended behavior and created a new execution based on flow-level retry policy

Execution Expressions

You can use various execution expressions in your flow:

ParameterDescription
{{ execution.id }}Unique ID for each execution
{{ execution.startDate }}Start date of the current execution
{{ execution.originalId }}Original execution ID, unchanged even after replays

Executing a Flow

From the UI

Manually trigger a flow by clicking the Execute button on the flow’s page in the MyMagic UI.

Using Automatic Triggers

  • Add a Schedule trigger for regular time-based executions
  • Use a Flow trigger to launch based on another flow’s completion
  • Implement a Webhook trigger for HTTP request-based executions

Via API Call

Trigger a flow execution by calling the API directly:

curl -X POST \
http://localhost:8080/api/v1/executions/company.team/hello_world

You can also execute a specific revision or include inputs:

curl -X POST \
http://localhost:8080/api/v1/executions/company.team/hello_world?revision=2 \
-F greeting="hey there"

Using Python

Use the mymagic Python package to execute flows:

from mymagic import Flow
flow = Flow()
flow.execute('company.team', 'hello_world', {'greeting': 'hello from Python'})

ForEachItem Execution

The ForEachItem task allows iteration over a list of items, running a subflow for each item or batch:

id: each_example
namespace: company.team
tasks:
  - id: each
    type: ai.mymagic.plugin.core.flow.ForEachItem
    items: "{{ inputs.file }}"
    inputs:
      file: "{{ taskrun.items }}"
    batch:
      rows: 4
      bytes: "1024"
      partitions: 2
    namespace: company.team
    flowId: subflow
    revision: 1
    wait: true
    transmitFailed: true
    labels:
      key: value

This powerful feature enables parallel processing of large datasets or API payloads efficiently.

Variables

Variables are key-value pairs that facilitate value reuse across tasks. They can also be stored at the namespace level for use across multiple flows within a given namespace.

Configuring Variables

Here’s an example of how to configure variables in your flow:

id: greetings
namespace: company.team

variables:
  myvar: hello
  numeric_variable: 42

tasks:
  - id: log
    type: ai.mymagic.plugin.core.debug.Return
    format: "{{ vars.myvar }} world {{ vars.numeric_variable }}"

To use variables, employ the syntax {{ vars.variable_name }}.

Variable Rendering

Variables can be used in any task property documented as dynamic. They are rendered using the Pebble templating engine, which allows processing of various expressions with filters and functions. For more details on variable processing, refer to the Expressions documentation.

Variables are not rendered recursively.

Frequently Asked Questions

Escaping Pebble Syntax

To prevent a block from being parsed by Pebble, use the {% raw %} and {% endraw %} tags:

{% raw %}{{ myvar }}{% endraw %}

Resolution Order of Inputs and Variables

  1. Inputs are resolved first, even before execution starts.
  2. Triggers are handled similarly to inputs.
  3. Expressions are rendered recursively.

You can use inputs within variables, but not vice versa. Trigger variables can be used within variables.

Examples

Using Inputs, Triggers, and Execution Variables

id: s3_upload
namespace: company.team

inputs:
  - id: bucket
    type: STRING
    defaults: declarative-data-orchestration

tasks:
  - id: get_zip_file
    type: ai.mymagic.plugin.core.http.Download
    uri: https://wri-dataportal-prod.s3.amazonaws.com/manual/global_power_plant_database_v_1_3.zip

  - id: unzip
    type: ai.mymagic.plugin.compress.ArchiveDecompress
    algorithm: ZIP
    from: "{{outputs.get_zip_file.uri}}"

  - id: csv_upload
    type: ai.mymagic.plugin.aws.s3.Upload
    from: "{{ outputs.unzip.files['global_power_plant_database.csv'] }}"
    bucket: "{{ inputs.bucket }}"
    key: "powerplant/{{ trigger.date ?? execution.startDate | date('yyyy_MM_dd__HH_mm_ss') }}.csv"

triggers:
  - id: hourly
    type: ai.mymagic.plugin.core.trigger.Schedule
    cron: "@hourly"

Conditional Branching Based on Input

id: conditional_branching
namespace: company.team

inputs:
  - id: parameter
    type: STRING
    required: false

tasks:
  - id: if
    type: ai.mymagic.plugin.core.flow.If
    condition: "{{inputs.customInput ?? false }}"
    then:
      - id: if_not_null
        type: ai.mymagic.plugin.core.log.Log
        message: Received input {{inputs.parameter}}
    else:
      - id: if_null
        type: ai.mymagic.plugin.core.log.Log
        message: No input provided

Using Trigger Variables Within Triggers

id: backfill_past_mondays
namespace: company.team

tasks:
  - id: log_trigger_or_execution_date
    type: ai.mymagic.plugin.core.log.Log
    message: "{{ trigger.date ?? execution.startDate }}"

triggers:
  - id: first_monday_of_the_month
    type: ai.mymagic.plugin.core.trigger.Schedule
    timezone: Europe/Berlin
    backfill:
      start: 2023-11-11T00:00:00Z
    cron: "0 11 * * MON"
    conditions:
      - type: ai.mymagic.plugin.core.condition.DayWeekInMonthCondition
        date: "{{ trigger.date }}"
        dayOfWeek: "MONDAY"
        dayInMonth: "FIRST"

Transforming Variables with Pebble Expressions

MyMagic uses Pebble Templates along with the execution context to render dynamic properties. This allows for the use of Pebble expressions to transform inputs and variables.

Example:

id: variables_demo
namespace: company.team

variables:
  DATE_FORMAT: "yyyy-MM-dd"

tasks:
  - id: seconds_of_day
    type: ai.mymagic.plugin.core.debug.Return
    format: '{{60 * 60 * 24}}'

  - id: start_date
    type: ai.mymagic.plugin.core.debug.Return
    format: "{{ execution.startDate | date(vars.DATE_FORMAT) }}"

  - id: curr_date_unix
    type: ai.mymagic.plugin.core.debug.Return
    format: "{{ now() | date(vars.DATE_FORMAT) | timestamp() }}"

  - id: next_date
    type: ai.mymagic.plugin.core.debug.Return
    format: "{{ now() | dateAdd(1, 'DAYS') | date(vars.DATE_FORMAT) }}"

  - id: next_date_unix
    type: ai.mymagic.plugin.core.debug.Return
    format: "{{ now() | dateAdd(1, 'DAYS') | date(vars.DATE_FORMAT) | timestamp() }}"

  - id: pass_downstream
    type: ai.mymagic.plugin.scripts.shell.Commands
    taskRunner:
      type: ai.mymagic.plugin.core.runner.Process
    commands:
      - echo {{outputs.next_date_unix.value}}

Using Nested Variables

Nested variables are supported, but may require wrapping the root variable in a json() function to access specific keys, depending on the task.

Example using a list of maps as a variable:

id: vars
namespace: company.myteam

variables:
  servers:
    - fqn: server01.mydomain.io
      user: root
    - fqn: server02.mydomain.io
      user: guest
    - fqn: server03.mydomain.io
      user: rick

tasks:
  - id: parallel
    type: ai.mymagic.plugin.core.flow.EachParallel
    value: "{{ vars.servers }}"
    tasks:
      - id: log
        type: ai.mymagic.plugin.core.log.Log
        message:
           - "{{ taskrun.value }}"
           - "{{ json(taskrun.value).fqn }}"
           - "{{ json(taskrun.value).user }}"

Inputs

Inputs are dynamic values passed to a flow at runtime, allowing for parameterized executions.

Understanding Inputs

Flows can be parameterized using inputs, enabling multiple executions with different values. Input values are stored as variables within the flow execution context and can be accessed using the syntax {{ inputs.parameter_name }}.

Inputs make tasks more dynamic, such as defining file paths for processing. Input values can be inspected in the Overview tab of the Execution page.

Declaring Inputs

Flows can have multiple inputs, either required or optional. For required inputs, it’s recommended to use the defaults property to set default values. Flows cannot start if required inputs are missing during Execution creation.

All inputs are parsed during execution creation, with invalid inputs preventing the execution from starting. If an execution isn’t created due to invalid or missing inputs, it won’t appear in the execution list.

Example flow using various inputs:

id: inputs_demo
namespace: company.team

inputs:
  - id: string_input
    type: STRING
    defaults: "Hello MyMagic!"

  - id: optional_input
    type: STRING
    required: false

  - id: int_input
    type: INT
    defaults: 100

  - id: float_input
    type: FLOAT
    defaults: 100.12

  - id: bool_input
    type: BOOLEAN
    defaults: true

  - id: enum_input
    type: ENUM
    defaults: VALUE_1
    values:
      - VALUE_1
      - VALUE_2
      - VALUE_3

  - id: datetime_input
    type: DATETIME
    defaults: "2013-08-09T14:19:00Z"

  - id: file_input
    type: FILE

  - id: json_input
    type: JSON
    defaults: |
      [{"name": "mymagic", "rating": "best in class"}]

  - id: uri_input
    type: URI
    defaults: "https://example.com/datasets/orders.csv"

  - id: secret_input
    type: SECRET

  - id: nested.string_input
    type: STRING
    defaults: "Hello Nested World!"

Note: FILE type inputs don’t currently support defaults.

Input Types and Validation

MyMagic inputs are strongly typed and validated before flow execution. Supported data types include STRING, INT, FLOAT, ENUM, BOOLEAN, DATETIME, DATE, TIME, DURATION, FILE, JSON, URI, SECRET, and ARRAY.

Additional validation rules can be applied to certain input types:

  • STRING: Custom regex validation
  • INT/FLOAT: Minimum and maximum value ranges
  • DURATION/DATE/TIME/DATETIME: Minimum and maximum ranges

Example flow with input validation:

id: validated_inputs
namespace: company.team

inputs:
  - id: age
    type: INT
    defaults: 42
    required: false
    min: 18
    max: 64

  - id: username
    type: STRING
    defaults: student
    required: false
    validator: ^student(\d+)?$

  - id: duration
    type: DURATION
    min: "PT5M6S"
    max: "PT12H58M46S"

tasks:
  - id: validator
    type: ai.mymagic.plugin.core.log.Log
    message: User {{ inputs.username }}, age {{ inputs.age }}

Nested and Array Inputs

Nested inputs can be created using a dot (.) in the input name. Array inputs allow passing lists of values, with the itemType property specifying the type of array items.

Using Input Values in Flows

Input values are accessible using {{ inputs.name }} or {{ inputs['name'] }}. For input IDs with special characters, use the bracket notation.

Setting Input Values

Input values can be set through the web UI, API calls, or programmatically in languages like Python or Java.

Example API call using curl:

curl -v "http://localhost:8080/api/v1/executions/example/mymagic-inputs" \
    -H "Transfer-Encoding:chunked" \
    -H "Content-Type:multipart/form-data" \
    -F string="a string"  \
    -F int=1  \
    -F float=1.255  \
    -F instant="2023-12-24T23:00:00.000Z" \
    -F "files=@/tmp/example.txt;filename=file"

Inputs vs Variables

While similar, inputs and variables serve different purposes:

  • Variables are constant values defined before execution.
  • Inputs can be set at execution time and are more suitable for changing values.

Dynamic Inputs

Inputs are not rendered recursively. To use dynamic expressions in inputs, use STRING type and the {{ render() }} function:

id: dynamic_input_demo
namespace: company.team

inputs:
  - id: date
    type: STRING
    defaults: "{{ now() }}"

tasks:
  - id: print_date
    type: ai.mymagic.plugin.core.log.Log
    message: hello on {{ render(inputs.date) }}

This approach improves security by preventing arbitrary code execution within expressions.

Outputs

Outputs facilitate data transfer between tasks and flows within MyMagic.

Understanding Outputs

Workflow executions can generate outputs, which are stored in the flow’s execution context (in memory) and are accessible to all downstream tasks and flows. Outputs can have multiple attributes, as specified in each task’s documentation.

Important: Avoid using outputs for sensitive data (passwords, secrets, API tokens).

Using Outputs

Here’s an example of how to use the output of one task in another:

id: task_outputs_demo
namespace: company.team
tasks:
  - id: produce_output
    type: ai.mymagic.plugin.core.debug.Return
    format: my output {{ execution.id }}

  - id: use_output
    type: ai.mymagic.plugin.core.log.Log
    message: The previous task output is {{ outputs.produce_output.value }}

In this example, the first task produces an output, which is then used in the second task’s message property.

Internal Storage

MyMagic can store task data in its internal storage. If an output attribute is stored internally, it will contain a URI pointing to the stored file. Here’s an example:

id: output_sample
namespace: company.team
tasks:
  - id: output_from_query
    type: ai.mymagic.plugin.gcp.bigquery.Query
    sql: |
      SELECT * FROM `bigquery-public-data.wikipedia.pageviews_2023`
      WHERE DATE(datehour) = current_date()
      ORDER BY datehour desc, views desc
      LIMIT 10
    store: true

  - id: write_to_csv
    type: ai.mymagic.plugin.serdes.csv.IonToCsv
    from: "{{ outputs.output_from_query.uri }}"

Dynamic Variables in Each Tasks

For dynamic flows using “Each” loops, you can access the current taskrun value with {{ taskrun.value }}:

id: taskrun_value_demo
namespace: company.team
tasks:
  - id: each
    type: ai.mymagic.plugin.core.flow.EachSequential
    value: ["value 1", "value 2", "value 3"]
    tasks:
      - id: inner
        type: ai.mymagic.plugin.core.debug.Return
        format: "{{ task.id }} > {{ taskrun.value }} > {{ taskrun.startDate }}"

Looping Over JSON Objects

When looping over JSON objects, use the json function to access properties:

id: loop_json_objects
namespace: company.team
tasks:
  - id: each
    type: ai.mymagic.plugin.core.flow.EachSequential
    value:
      - {"key": "my-key", "value": "my-value"}
      - {"key": "my-complex", "value": {"sub": 1, "bool": true}}
    tasks:
      - id: inner
        type: ai.mymagic.plugin.core.debug.Return
        format: "{{ json({taskrun.value).key }} > {{ json({taskrun.value).value }}"

Accessing Specific Outputs in Dynamic Tasks

For dynamic tasks like EachSequential, you can access specific iteration outputs:

id: dynamic_output_demo
namespace: company.team
tasks:
  - id: each
    type: ai.mymagic.plugin.core.flow.EachSequential
    value: ["s1", "s2", "s3"]
    tasks:
      - id: sub
        type: ai.mymagic.plugin.core.debug.Return
        format: "{{ task.id }} > {{ taskrun.value }} > {{ taskrun.startDate }}"

  - id: use
    type: ai.mymagic.plugin.core.debug.Return
    format: "Previous task produced output: {{ outputs.sub.s1.value }}"

Sibling Task Outputs

Accessing outputs from sibling tasks depends on whether the task tree is static or dynamic:

id: sibling_outputs_demo
namespace: company.team
tasks:
  - id: each
    type: ai.mymagic.plugin.core.flow.EachSequential
    value: ["value 1", "value 2", "value 3"]
    tasks:
      - id: first
        type: ai.mymagic.plugin.core.debug.Return
        format: "{{ task.id }}"

      - id: second
        type: ai.mymagic.plugin.core.debug.Return
        format: "{{ outputs.first[taskrun.value].value }}"

  - id: end
    type: ai.mymagic.plugin.core.debug.Return
    format: "{{ task.id }} > {{ outputs.second['value 1'].value }}"

For complex nested structures, use the currentEachOutput function.

Flow Outputs

Flows can produce strongly typed outputs:

id: flow_outputs_demo
namespace: company.team

tasks:
  - id: mytask
    type: ai.mymagic.plugin.core.debug.Return
    format: this is a task output used as a final flow output

outputs:
  - id: final
    type: STRING
    value: "{{ outputs.mytask.value }}"

Accessing flow outputs in a parent flow:

id: parent_flow_demo
namespace: company.team

tasks:
  - id: subflow
    type: ai.mymagic.plugin.core.flow.Subflow
    flowId: flow_outputs_demo
    namespace: company.team
    wait: true

  - id: log_subflow_output
    type: ai.mymagic.plugin.core.log.Log
    message: "{{ outputs.subflow.outputs.final }}"

Output Preview and Debugging

MyMagic provides preview options for output files stored in its internal storage. You can view file contents in a tabular format without downloading.

The Debug Outputs functionality in the Outputs tab allows you to evaluate outputs further using Pebble expressions. For example:

id: json_values_demo
namespace: company.team
tasks:
- id: sample_json
  type: ai.mymagic.plugin.core.debug.Return
  format: '{"data": [1, 2, 3]}'

You can then use the Debug Outputs feature to analyze this JSON data with various expressions.

Triggers

Schedule Type

The Schedule trigger in MyMagic generates new executions on a regular cadence based on Cron expressions or custom scheduling conditions.

type: "ai.mymagic.plugin.core.trigger.Schedule"

MyMagic can trigger flows based on a Schedule (time-based). This is useful when you need to run flows at specific intervals or when you can’t use event-driven mechanisms.

MyMagic can optionally handle schedule backfills for missed executions. Refer to the Schedule task documentation for a complete list of task properties and outputs.

Example: Quarterly Schedule

A schedule that runs every quarter of an hour:

triggers:
  - id: schedule
    type: ai.mymagic.plugin.core.trigger.Schedule
    cron: "*/15 * * * *"

Example: Monthly Schedule

A schedule that runs only the first Monday of every month at 11 AM:

triggers:
  - id: schedule
    type: ai.mymagic.plugin.core.trigger.Schedule
    cron: "0 11 * * 1"
    conditions:
      - type: ai.mymagic.plugin.core.condition.DayWeekInMonthCondition
        date: "{{ trigger.date }}"
        dayOfWeek: "MONDAY"
        dayInMonth: "FIRST"

Example: Daily Schedule in Specific Timezone

A schedule that runs daily at midnight US Eastern time:

triggers:
  - id: daily
    type: ai.mymagic.plugin.core.trigger.Schedule
    cron: "@daily"
    timezone: America/New_York

Important Notes on Scheduling

  • Schedules cannot overlap. If a previous schedule is still running when the next one is due to start, MyMagic will wait for the previous one to finish.
  • To make your flow executable both via schedule and manually, use this expression: {{ trigger.date ?? execution.startDate | date("yyyy-MM-dd") }}.

Schedule Conditions

When cron expressions are insufficient, you can use conditions for more complex scheduling logic. Use the {{ trigger.date }} expression on the date property of the current schedule.

Available core conditions include:

  • DateTimeBetweenCondition
  • DayWeekCondition
  • DayWeekInMonthCondition
  • NotCondition
  • OrCondition
  • WeekendCondition
  • PublicHolidayCondition
  • TimeBetweenCondition
Example: Conditional Schedule
id: conditions_demo
namespace: company.team

tasks:
  - id: hello
    type: ai.mymagic.plugin.core.log.Log
    message: This will execute only on Thursday!

triggers:
  - id: schedule
    type: ai.mymagic.plugin.core.trigger.Schedule
    cron: "@hourly"
    conditions:
      - type: ai.mymagic.plugin.core.condition.DayWeekCondition
        dayOfWeek: "THURSDAY"

Recovering Missed Schedules

Automatic Recovery

MyMagic can automatically recover missed schedules. This behavior can be configured globally or per-flow.

Global configuration:

mymagic:
  plugins:
    configurations:
      - type: ai.mymagic.plugin.core.trigger.Schedule
        values:
          recoverMissedSchedules: NONE  # Options: ALL | NONE | LAST

Per-flow configuration:

triggers:
  - id: schedule
    type: ai.mymagic.plugin.core.trigger.Schedule
    cron: "*/15 * * * *"
    recoverMissedSchedules: NONE
Using Backfill

Backfills allow you to replay missed schedule intervals between defined start and end dates. Access this feature from the Triggers tab on the Flow’s detail page.

Disabling Triggers

You can disable a trigger by adding disabled: true to your YAML or by toggling it off on the Triggers page. This is useful when you need time to decide on scheduling changes before the next run.

Flow Type

Flow triggers in MyMagic allow you to initiate a flow after the execution of another flow, enabling event-driven patterns.

type: "ai.mymagic.plugin.core.trigger.Flow"

MyMagic can trigger one flow after another, allowing for the chaining of flows without modifying the base flows. This feature enables breaking down responsibilities between different flows and teams.

For a comprehensive list of all properties, refer to the Flow trigger documentation.

Conditions

You can specify conditions to determine when your Flow should be executed. In addition to the core trigger conditions, you can use the following:

  • ExecutionFlowCondition
  • ExecutionNamespaceCondition
  • ExecutionLabelsCondition
  • ExecutionStatusCondition
  • ExecutionOutputsCondition
  • ExpressionCondition

Example: Basic Flow Trigger

This flow will be triggered after each successful execution of the flow ai.mymagic.tests.trigger-flow and forward the uri output of the my-task task.

id: trigger_flow_listener
namespace: company.team

inputs:
  - id: fromParent
    type: STRING

tasks:
  - id: onlyNoInput
    type: ai.mymagic.plugin.core.debug.Return
    format: "v1: {{ trigger.executionId }}"

triggers:
  - id: listenFlow
    type: ai.mymagic.plugin.core.trigger.Flow
    inputs:
      fromParent: '{{ outputs.myTask.uri }}'
    conditions:
      - type: ai.mymagic.plugin.core.condition.ExecutionFlowCondition
        namespace: company.team
        flowId: trigger_flow
      - type: ai.mymagic.plugin.core.condition.ExecutionStatusCondition
        in:
          - SUCCESS

Parent flow:

id: trigger_flow
namespace: company.team

tasks:
  - id: myTask
    type: ai.mymagic.plugin.core.http.Download
    uri: https://dummyjson.com/products

Example: Multiple Condition Trigger

This flow will be triggered after the successful execution of both flows flow-a and flow-b during the current day. When the conditions are met, the counter is reset and can be re-triggered during the same day.

id: trigger-multiplecondition-listener
namespace: company.team

tasks:
  - id: onlyListener
    type: ai.mymagic.plugin.core.debug.Return
    format: "let's go "

triggers:
  - id: multipleListenFlow
    type: ai.mymagic.plugin.core.trigger.Flow
    conditions:
      - id: multiple
        type: ai.mymagic.plugin.core.condition.MultipleCondition
        window: P1D
        windowAdvance: P0D
        conditions:
          flow-a:
            type: ai.mymagic.plugin.core.condition.ExecutionFlowCondition
            namespace: company.team
            flowId: trigger-multiplecondition-flow-a
          flow-b:
            type: ai.mymagic.plugin.core.condition.ExecutionFlowCondition
            namespace: company.team
            flowId: trigger-multiplecondition-flow-b

To trigger trigger-multiplecondition-listener, simply execute these two flows:

id: trigger-multiplecondition-flow-a
namespace: company.team

tasks:
  - id: hello
    type: ai.mymagic.plugin.core.log.Log
    message: Trigger A
id: trigger-multiplecondition-flow-b
namespace: company.team

tasks:
  - id: hello
    type: ai.mymagic.plugin.core.log.Log
    message: Trigger B

These examples demonstrate how to use Flow triggers in MyMagic to create complex, event-driven workflows that respond to the execution of other flows.

Webhook Type

Webhook triggers in MyMagic generate a unique URL that you can use to automatically create new executions based on events in external applications such as GitHub or Amazon EventBridge.

type: "ai.mymagic.plugin.core.trigger.Webhook"

How Webhook Triggers Work

A Webhook trigger allows you to initiate a flow from a webhook URL. When creating a trigger, you must set a secret key that will be used to secure your webhook URL. This key is incorporated into the URL that triggers the flow:

/api/v1/executions/webhook/{namespace}/{flowId}/{key}

It’s recommended to use a complex, generated sequence of characters for the key to enhance security. MyMagic accepts GET, POST, and PUT requests on this URL. The entire request body and headers will be available as variables within your flow.

Example Webhook Trigger

Here’s an example of how to set up a Webhook trigger in your flow:

id: trigger
namespace: company.team

tasks:
  - id: hello
    type: ai.mymagic.plugin.core.log.Log
    message: "Hello World! 🚀"

triggers:
  - id: webhook
    type: ai.mymagic.plugin.core.trigger.Webhook
    key: 4wjtkzwVGBM9yKnjm3yv8r

Executing the Webhook-Triggered Flow

After the trigger is created, you can execute the flow using the following URL structure:

https://{mymagic_domain}/api/v1/executions/webhook/{namespace}/{flowId}/4wjtkzwVGBM9yKnjm3yv8r

Make sure to replace mymagic_domain, namespace, and flowId with your specific values.

Additional Information

For a comprehensive list of task properties and outputs related to Webhook triggers, please refer to the Webhook task documentation in MyMagic.

Security Considerations

When using Webhook triggers, keep the following security considerations in mind:

  1. Use a complex, randomly generated key to secure your webhook URL.
  2. Keep your webhook URL and key confidential, sharing them only with trusted systems and personnel.
  3. Consider implementing additional authentication mechanisms if your use case requires heightened security.
  4. Regularly rotate your webhook keys as part of your security best practices.

Use Cases

Webhook triggers are particularly useful for:

  1. Integrating with external services that support webhooks (e.g., GitHub, GitLab, Stripe).
  2. Creating event-driven workflows that respond to real-time events.
  3. Allowing external systems to initiate processes within your MyMagic workflows.
  4. Building automated pipelines that react to changes in other parts of your infrastructure.

By leveraging Webhook triggers, you can create highly responsive and interconnected workflows in MyMagic, enhancing your ability to automate complex processes across your entire technology stack.

Polling Type

Polling triggers in MyMagic are a type of trigger provided by plugins. They allow for the periodic checking of external systems for the presence of data. When data is ready to be processed, a flow execution is initiated.

Overview of Polling Triggers

MyMagic offers polling triggers for a wide variety of external systems, including:

  • Databases
  • Message brokers
  • FTP servers
  • And many more…

These triggers poll the external system at a fixed interval defined by the interval property. When triggered, the flow will have access to the outputs of the polling trigger via the trigger variable.

Example: PostgreSQL Polling Trigger

The following example demonstrates a flow that is triggered when rows are available in the my_table PostgreSQL table. When triggered, it deletes the rows (to avoid reprocessing) and logs them.

id: jdbc-trigger
namespace: company.team

inputs:
  - id: db_url
    type: STRING

tasks:
- id: update
  type: ai.mymagic.plugin.jdbc.postgresql.Query
  url: "{{ inputs.db_url }}"
  sql: DELETE * FROM my_table

- id: log
  type: ai.mymagic.plugin.core.log.Log
  message: "{{ trigger.rows }}"

triggers:
  - id: watch
    type: ai.mymagic.plugin.jdbc.postgresql.Trigger
    url: myurl
    interval: "PT5M"
    sql: "SELECT * FROM my_table"

Key Components of the Example

  1. Trigger Definition:

    • Type: ai.mymagic.plugin.jdbc.postgresql.Trigger
    • Interval: Set to “PT5M” (poll every 5 minutes)
    • SQL: Selects all rows from my_table
  2. Tasks:

    • Update task: Deletes processed rows from the table
    • Log task: Logs the rows that were retrieved by the trigger
  3. Data Flow:

    • The trigger polls the database every 5 minutes
    • If rows are found, a flow execution is started
    • The flow deletes the rows and logs their content

Benefits of Polling Triggers

  1. Flexibility: Can integrate with various external systems
  2. Reliability: Ensures data is processed even if it arrives when MyMagic is offline
  3. Customization: Interval and query can be tailored to specific needs
  4. Decoupling: Allows for separation between data production and processing

Considerations

  • Choose an appropriate polling interval to balance responsiveness and system load
  • Ensure your processing logic handles duplicate data in case of failures
  • Monitor the performance impact on the polled system, especially for frequent polls

Best Practices

  1. Use idempotent operations in your flow to handle potential reprocessing
  2. Implement error handling to manage issues with the external system
  3. Consider using transactional operations when removing or marking processed data
  4. Monitor the latency between data availability and processing to optimize your polling interval

By leveraging polling triggers, MyMagic enables you to create robust, event-driven workflows that can reliably process data from a wide range of external systems.

Realtime Type

While MyMagic triggers typically poll external systems for new events at regular intervals, business-critical workflows often require immediate reactions to events. This is where Realtime Triggers come into play, offering millisecond-level latency.

What are Realtime Triggers?

Realtime triggers in MyMagic listen to events in real time and instantaneously start a workflow execution when:

  • A new message is published to a Kafka topic
  • A new message is published to a Pulsar topic
  • A new message is published to an AMQP queue
  • A new message is published to an MQTT queue
  • A new message is published to an AWS SQS queue
  • A new message is published to Google Pub/Sub
  • A new message is published to Azure Event Hubs
  • A new message is published to a NATS subject
  • A new item is added to a Redis list
  • A new row is added, modified or deleted in Postgres, MySQL, or SQL Server

How Realtime Triggers Work

When you add a Realtime Trigger to your workflow, MyMagic initiates an always-on thread that continuously listens to the external system for new events. As soon as a new event occurs, MyMagic starts a workflow execution to process it.

Use Cases

Realtime Triggers enable the orchestration of business-critical processes and microservices in real time, suitable for scenarios such as:

  • Fraud and anomaly detection
  • Order processing
  • Realtime predictions or recommendations
  • Reacting to stock price changes
  • Shipping and delivery notifications
  • Change Data Capture for data orchestration

Triggers vs. Realtime Triggers: A Comparison

CriteriaTriggerRealtime Trigger
ImplementationMicro-batchRealtime
Event ProcessingBatch-process events until poll interval elapsesProcess each event immediately
LatencySecond(s) or minute(s)Millisecond(s)
Execution ModelEach execution processes one or many eventsEach execution processes exactly one event
Data HandlingStore all received events in a fileStore each event in raw format
Output formatURI of a file in internal storageRaw data of the event payload and related metadata
ApplicationData applications processing data in batchBusiness-critical operations reacting to events in real time
Use casesData orchestration for analytics and building data productsProcess and microservice orchestration

How to Use Realtime Triggers

To implement a Realtime Trigger, simply choose the RealtimeTrigger as the trigger type for your desired service. Here’s an example using the RealtimeTrigger to listen for new messages in an AWS SQS queue:

id: sqs
namespace: company.team

tasks:
  - id: log
    type: ai.mymagic.plugin.core.log.Log
    message: "{{ trigger }}"

triggers:
  - id: realtime_trigger
    type: ai.mymagic.plugin.aws.sqs.RealtimeTrigger
    region: eu-north-1
    accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID')}}"
    secretKeyId: "{{ secret('AWS_SECRET_ACCESS_KEY') }}"
    queueUrl: https://sqs.eu-north-1.amazonaws.com/123456789/MyQueue

Comparison with Realtime Data Processing Engines

It’s important to note that MyMagic’s Realtime Triggers are not intended to replace real-time data processing engines like Apache Flink, Apache Beam, or Google Dataflow. These engines excel at stateful streaming applications and complex SQL transformations over real-time data streams.

In contrast, MyMagic’s Realtime Triggers are stateless, triggering one workflow execution per event. They are primarily designed to react to events in real time for orchestrating business-critical processes.

Best Practices

  1. Use Realtime Triggers for scenarios where immediate reaction to events is crucial.
  2. Ensure your workflows are designed to handle the potential high frequency of executions.
  3. Implement proper error handling and retry mechanisms in your workflows.
  4. Monitor the performance and resource usage of your Realtime Trigger workflows.
  5. Consider using batching techniques within your workflows if processing each event individually becomes too resource-intensive.

By leveraging Realtime Triggers, MyMagic enables you to create highly responsive, event-driven workflows that can handle critical business processes with minimal latency.

Labels

Labels are key-value pairs used to organize flows and executions in MyMagic. They provide a flexible way to categorize and search for your workflows based on various criteria such as project, maintainer, or any other relevant dimension.

The Purpose of Labels

Labels serve to organize and filter flows and their executions. By adding labels to your flows, you can sort executions across multiple dimensions.

Here’s a simple example of a flow with labels:

id: flow_with_labels
namespace: company.team

labels:
  song: never_gonna_give_you_up
  artist: rick_astley

tasks:
  - id: hello
    type: ai.mymagic.plugin.core.log.Log
    message: hello from a flow with labels

Benefits of Labels

Labels offer several advantages in managing your workflows:

  1. Observability: Labels set during execution aid in monitoring and troubleshooting.
  2. Filtering: Labels facilitate finding specific executions, useful for tracking ML experiments, API responses, or labeling executions based on runtime-specific flow inputs.
  3. Organization: Labels help manage workflow executions at scale, especially in complex environments. Custom dashboards can be created based on labels, e.g., http://localhost:8080/ui/executions?labels=team:finance.

Execution Labels

Propagation from Flow Labels

When you execute a flow with labels, these labels are automatically propagated to the created executions.

Setting Labels from the UI

  • When manually executing flows, you can override and define new labels in the Advanced configuration section.
  • Labels can be set from the UI after execution completion, useful for collaboration and troubleshooting.
  • Labels can be set for multiple executions simultaneously, helpful for bulk operations.

Dynamic Label Setting

Labels can be set dynamically using a dedicated Labels task. This feature enhances observability, debugging, and failure monitoring.

Using a Map (Key-Value Pairs)

Ideal when the key is static and the value is dynamic:

id: labels_override
namespace: company.team

labels:
  song: never_gonna_give_you_up

tasks:
  - id: get
    type: ai.mymagic.plugin.core.debug.Return
    format: never_gonna_stop

  - id: update_labels
    type: ai.mymagic.plugin.core.execution.Labels
    labels:
      song: "{{ outputs.get.value }}"
      artist: rick_astley # new label

Using a List of Key-Value Pairs

Useful when both key and value are dynamic:

id: labels
namespace: company.team

inputs:
  - id: user
    type: STRING
    defaults: Rick Astley

  - id: url
    type: STRING
    defaults: song_url

tasks:
  - id: update_labels_with_map
    type: ai.mymagic.plugin.core.execution.Labels
    labels:
      customerId: "{{ inputs.user }}"

  - id: get
    type: ai.mymagic.plugin.core.debug.Return
    format: https://t.ly/Vemr0

  - id: update_labels_with_list
    type: ai.mymagic.plugin.core.execution.Labels
    labels:
      - key: "{{ inputs.url }}"
        value: "{{ outputs.get.value }}"

Overriding Flow Labels at Runtime

You can set default labels at the flow level and override them dynamically during execution:

id: flow_with_labels
namespace: company.team

labels:
  song: never_gonna_give_you_up
  artist: rick-astley
  genre: pop

tasks:
  - id: get
    type: ai.mymagic.plugin.core.debug.Return
    format: never_gonna_stop

  - id: update-list
    type: ai.mymagic.plugin.core.execution.Labels
    labels:
      song: "{{ outputs.get.value }}"

In this example, the default song label is overridden by the output of the get task.

Best Practices

  1. Use consistent naming conventions for your labels.
  2. Avoid creating too many unique labels, which can lead to clutter.
  3. Regularly review and clean up unused labels.
  4. Use labels in combination with MyMagic’s search functionality for powerful filtering.
  5. Document the meaning and intended use of labels within your team.

By effectively using labels, you can significantly enhance the organization, searchability, and manageability of your MyMagic workflows and executions.

Plugin Defaults

Plugin defaults are a list of default values applied to each task of a certain type within your flow(s). They function similarly to default function arguments, helping to avoid repetition when a given task or plugin is often called with the same values.

Plugin Defaults at the Flow Level

You can add plugin defaults to avoid repeating task properties on multiple occurrences of the same task using the pluginDefaults property. Here’s an example:

id: api_python_sql
namespace: company.team

tasks:
  - id: api
    type: ai.mymagic.plugin.core.http.Request
    uri: https://dummyjson.com/products

  - id: hello
    type: ai.mymagic.plugin.scripts.python.Script
    script: |
      print("Hello World!")

  - id: python
    type: ai.mymagic.plugin.scripts.python.Script
    beforeCommands:
      - pip install polars
    warningOnStdErr: false
    outputFiles:
      - "products.csv"
    script: |
      import polars as pl
      data = {{outputs.api.body | jq('.products') | first}}
      df = pl.from_dicts(data)
      df.glimpse()
      df.select(["brand", "price"]).write_csv("products.csv")

  - id: sql_query
    type: ai.mymagic.plugin.jdbc.duckdb.Query
    inputFiles:
      in.csv: "{{ outputs.python.outputFiles['products.csv'] }}"
    sql: |
      SELECT brand, round(avg(price), 2) as avg_price
      FROM read_csv_auto('{{workingDir}}/in.csv', header=True)
      GROUP BY brand
      ORDER BY avg_price DESC;
    store: true

pluginDefaults:
  - type: ai.mymagic.plugin.scripts.python.Script
    values:
      taskRunner:
        type: ai.mymagic.plugin.scripts.runner.docker.Docker
        pullPolicy: ALWAYS # set it to NEVER to use a local image
      containerImage: python:slim

In this example, Docker and Python configurations are set within the pluginDefaults property, streamlining the configuration process and reducing the chances of errors caused by inconsistent settings across different tasks.

Note: When moving required task attributes into the pluginDefaults property, the code editor within the UI may show warnings about missing required arguments. These warnings can be ignored as long as pluginDefaults contains the relevant arguments, as they are resolved at runtime.

Plugin Defaults in Global Configuration

You can set plugin defaults in your global MyMagic configuration to apply the same defaults across multiple flows. For example, to centrally manage default values for the ai.mymagic.plugin.aws plugin:

mymagic:
  plugins:
    defaults:
      - type: ai.mymagic.plugin.aws
        values:
          accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
          secretKeyId: "{{ secret('AWS_SECRET_ACCESS_KEY') }}"
          region: "us-east-1"

To set defaults for a specific task:

mymagic:
  plugins:
    defaults:
      - type: ai.mymagic.plugin.aws.s3.Upload
        values:
          accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
          secretKeyId: "{{ secret('AWS_SECRET_ACCESS_KEY') }}"
          region: "us-east-1"

Best Practices

  1. Use plugin defaults for configurations that are consistent across multiple tasks or flows.
  2. Keep sensitive information like credentials in secrets and reference them in plugin defaults.
  3. Regularly review and update plugin defaults to ensure they align with your current requirements.
  4. Document your use of plugin defaults to help team members understand the configuration.
  5. Be cautious when overriding plugin defaults in individual tasks to maintain consistency.

By effectively using plugin defaults, you can significantly reduce configuration overhead and improve the maintainability of your MyMagic workflows.

Subflows

Subflows allow you to build modular and reusable workflow components. They function similarly to calling functions, where a subflow execution is created when you call a flow from another flow.

Why Use Subflows?

Subflows enable the creation of modular, reusable components that can be utilized across multiple flows. For instance, you might have a subflow dedicated to alerting errors to Slack and email. By using a subflow, you can reuse these tasks together for all flows that require error notifications, eliminating the need to copy individual tasks for every flow.

Declaring a Subflow

To call a flow from another flow, use the ai.mymagic.plugin.core.flow.Subflow task. In this task, specify the flowId and namespace of the subflow you want to execute. Optionally, you can specify custom input values, similar to passing arguments in a function call.

The optional properties wait and transmitFailed control the execution behavior. By default, if wait is not set or set to false, the parent flow continues execution without waiting for the subflow’s completion. The transmitFailed property determines whether a failure in the subflow execution should cause the parent flow to fail.

Practical Example

Consider a subflow that encapsulates critical business logic:

id: critical_service
namespace: company.team

tasks:
  - id: return_data
    type: ai.mymagic.plugin.jdbc.duckdb.Query
    sql: |
      INSTALL httpfs;
      LOAD httpfs;
      SELECT sum(total) as total, avg(quantity) as avg_quantity
      FROM read_csv_auto('https://huggingface.co/datasets/mymagic/datasets/raw/main/csv/orders.csv', header=True);
    store: true

outputs:
  - id: some_output
    type: STRING
    value: "{{ outputs.return_data.uri }}"

This subflow can be called from a parent flow:

id: parent_service
namespace: company.team

tasks:
  - id: subflow_call
    type: ai.mymagic.plugin.core.flow.Subflow
    namespace: company.team
    flowId: critical_service
    wait: true
    transmitFailed: true

  - id: log_subflow_output
    type: ai.mymagic.plugin.scripts.shell.Commands
    taskRunner:
      type: ai.mymagic.plugin.core.runner.Process
    commands:
      - cat "{{ outputs.subflow_call.outputs.some_output }}"

Subflow Properties

The ai.mymagic.plugin.core.flow.Subflow task has several properties, including:

  • flowId: The subflow’s identifier.
  • namespace: The namespace where the subflow is located.
  • inheritLabels: Determines if the subflow inherits labels from the parent (default: false).
  • inputs: Inputs passed to the subflow.
  • labels: Labels assigned to the subflow.
  • revision: The subflow revision to execute (defaults to the latest).
  • wait: If true, parent flow waits for subflow completion (default: false).
  • transmitFailed: If true, parent flow fails on subflow failure (requires wait to be true).

Passing Data Between Parent and Child Flows

Flows can emit outputs that can be accessed by the parent flow. For example:

id: flow_outputs
namespace: company.team

tasks:
  - id: mytask
    type: ai.mymagic.plugin.core.debug.Return
    format: this is a task output used as a final flow output

outputs:
  - id: final
    type: STRING
    value: "{{ outputs.mytask.value }}"

These outputs can be accessed from a parent task:

id: parent_flow
namespace: company.team

tasks:
  - id: subflow
    type: ai.mymagic.plugin.core.flow.Subflow
    flowId: flow_outputs
    namespace: company.team
    wait: true

  - id: log_subflow_output
    type: ai.mymagic.plugin.core.log.Log
    message: "{{ outputs.subflow.outputs.final }}"

Passing Inputs to a Subflow

You can pass inputs to a Subflow task. Here’s an example:

Subflow:

id: subflow_example
namespace: company.team

inputs:
  - id: http_uri
    type: STRING

tasks:
  - id: download
    type: ai.mymagic.plugin.core.http.Request
    uri: "{{ inputs.http_uri }}"

  - id: log
    type: ai.mymagic.plugin.core.log.Log
    message: "{{ outputs.download.body }}"

outputs:
  - id: data
    type: STRING
    value: "{{ outputs.download.body }}"

Parent flow:

id: inputs_subflow
namespace: company.team

inputs:
  - id: url
    type: STRING

tasks:
  - id: subflow
    type: ai.mymagic.plugin.core.flow.Subflow
    flowId: subflow_example
    namespace: company.team
    inputs:
      http_uri: "{{ inputs.url }}"
    wait: true

  - id: hello
    type: ai.mymagic.plugin.core.log.Log
    message: "{{ outputs.subflow.outputs.data }}"

Nested Inputs

You can pass nested inputs to subflows, which is useful for handling complex data structures:

id: extract_json
namespace: company.team

tasks:
  - id: api
    type: ai.mymagic.plugin.core.http.Request
    uri: https://dummyjson.com/users

  - id: subflow
    type: ai.mymagic.plugin.core.flow.Subflow
    namespace: company.team
    flowId: subflow
    inputs:
      users.firstName: "{{ outputs.api.body | jq('.users') | first | first | jq('.firstName') | first }}"
      users.lastName: "{{ outputs.api.body | jq('.users') | first | first | jq('.lastName') | first }}"
    wait: true
    transmitFailed: true

The subflow can then use these nested inputs:

id: subflow
namespace: company.team

inputs:
  - id: users.firstName
    type: STRING
    defaults: Rick

  - id: users.lastName
    type: STRING
    defaults: Astley

tasks:
  - id: process_user_data
    type: ai.mymagic.plugin.core.log.Log
    message: hello {{ inputs.users }}

By leveraging subflows, you can create more maintainable, modular, and reusable workflows in MyMagic.

​Errors (Coming Soon)

Retries

Retries are a mechanism to handle transient failures in your workflows. They are defined at the task level and can be configured to retry a task a certain number of times, or with a specific delay between each retry.

Understanding Retries

MyMagic provides task retry functionality, allowing you to add retry behavior for any failed task run based on configurations in the flow description. A retry on a task run will create a new task run attempt.

Example

Here’s an example that defines a retry for the retry-sample task with a maximum of 5 attempts every 15 minutes:

- id: retry_sample
  type: ai.mymagic.plugin.core.log.Log
  message: my output for task {{task.id}}
  timeout: PT10M
  retry:
    type: constant
    maxAttempt: 5
    interval: PT15M

In this next example, the flow will be retried four times every 0.25 ms, with each attempt executing for a maximum of 1 minute before being failed:

id: retry
namespace: company.team
description: This flow will retry 4 times and will succeed at the 5th attempt

tasks:
- id: failed
  type: ai.mymagic.plugin.scripts.shell.Commands
  taskRunner:
    type: ai.mymagic.plugin.core.runner.Process
  commands:
  - 'if [ "{{taskrun.attemptsCount}}" -eq 4 ]; then exit 0; else exit 1; fi'
  retry:
    type: constant
    interval: PT0.25S
    maxAttempt: 5
    maxDuration: PT1M
    warningOnRetry: true

errors:
  - id: never-happen
    type: ai.mymagic.plugin.core.debug.Return
    format: Never happened {{task.id}}

Retry Options

All retry types share these common options:

  • type: Retry behavior to apply (constant, exponential, random)
  • maxAttempt: Number of retries before stopping
  • maxDuration: Maximum delay for retries
  • warningOnRetry: Flag the execution as warning if any retry was done

Retry Types

  1. Constant: Establishes constant retry times
  2. Exponential: Waits longer between each retry
  3. Random: Retries with a random delay between minimum and maximum limits

Configuring Retries Globally

You can configure retries globally for all tasks in a flow:

mymagic:
  plugins:
    configurations:
      - type: ai.mymagic
        values:
          retry:
            type: constant
            maxAttempt: 3
            interval: PT30S

Flow-level Retries

You can set a flow-level retry policy to restart the execution if any task fails:

id: flow_level_retry
namespace: company.team

retry:
  maxAttempt: 3
  behavior: CREATE_NEW_EXECUTION # or RETRY_FAILED_TASK
  type: constant
  interval: PT1S

tasks:
  - id: fail_1
    type: ai.mymagic.plugin.core.execution.Fail
    allowFailure: true

  - id: fail_2
    type: ai.mymagic.plugin.core.execution.Fail
    allowFailure: false

Retry vs. Restart vs. Replay

Automatic vs. Manual

Retries are automatic, while Restart and Replay are manual operations initiated from the UI.

Restart vs. Replay

  • Restart: Reruns failed tasks within the current Execution
  • Replay: Creates a new Execution with a different ID

Summary: Retries vs. Restart vs. Replay

ConceptFlow or task levelAutomatic or manualCreates new execution?
RetryTask levelAutomaticNo
RestartFlow levelManualNo
ReplayEitherManualYes

Best Practices

  1. Use retries for transient failures that might resolve on their own.
  2. Configure appropriate intervals and max attempts based on the task’s nature.
  3. Use flow-level retries for scenarios where the entire flow should be retried.
  4. Utilize Restart for manual intervention after unexpected failures.
  5. Use Replay when you need to create a new execution from a specific point.

By effectively using retries, restarts, and replays, you can create more resilient and fault-tolerant workflows in MyMagic.

Timeouts

Timeout allows you to set a maximum duration for a task run.

What is Timeout

If a task run exceeds the specified duration, MyMagic will automatically stop the task run and mark it as failed. This is particularly useful for tasks that may hang and run indefinitely.

Timeout is often used as a cost control mechanism for cloud-based workflows. For instance, a Snowflake query or an AWS Batch job running for hours could lead to unexpected costs. By setting a timeout, you can ensure that the task run will not exceed a certain duration.

Format

Similar to durations in retries, timeouts use the ISO 8601 Durations format. Here are some examples:

NameDescription
PT0.250S250 milliseconds delay
PT2S2 seconds delay
PT1M1 minute delay
PT3.5H3 hours and a half delay

Example

In this example, the costly_query task will sleep for 10 seconds, but the timeout is set to 5 seconds, leading to a failed task run:

id: timeout
namespace: company.team
description: This flow will always fail because of a timeout.

tasks:
  - id: costly_query
    type: ai.mymagic.plugin.scripts.shell.Commands
    taskRunner:
      type: ai.mymagic.plugin.core.runner.Process
    commands:
      - sleep 10
    timeout: PT5S

Best Practices

  1. Set reasonable timeouts based on the expected duration of your tasks.
  2. Use timeouts for tasks that interact with external services or resources to prevent hanging in case of connectivity issues.
  3. Consider setting different timeouts for development and production environments.
  4. Regularly review and adjust your timeouts based on task performance and changing requirements.
  5. Use timeouts in conjunction with retries for more robust error handling.

Considerations

  • Timeouts should be set carefully to avoid interrupting long-running but valid tasks.
  • When a task times out, ensure you have proper error handling and notification mechanisms in place.
  • For tasks with variable execution times, consider setting a generous timeout and using other monitoring mechanisms for more granular control.

By effectively using timeouts, you can create more reliable and cost-effective workflows, preventing runaway processes and ensuring timely completion of your tasks.

Concurrency Control

Control concurrent executions of a given flow using the flow-level concurrency property.

Overview

The concurrency property allows you to set a limit on the number of concurrent executions for a specific flow. This global concurrency limit applies to all executions of the flow, regardless of how they are triggered (automatically via trigger, webhook, API call, or manually from the UI).

Basic Usage

To set a concurrency limit, use the limit key within the concurrency property:

id: concurrency_example
namespace: company.team

concurrency:
  limit: 2

tasks:
  - id: wait
    type: ai.mymagic.plugin.scripts.shell.Commands
    commands:
      - sleep 10

In this example, only two executions of the flow will be allowed to run simultaneously. Any additional executions will be queued until one of the running executions completes.

Behavior Property

You can customize how the system responds when the concurrency limit is reached using the behavior property. This property accepts three possible values:

  • QUEUE: Queues additional executions (default behavior)
  • CANCEL: Immediately cancels additional executions
  • FAIL: Immediately fails additional executions

Example with custom behavior:

id: concurrency_limited_flow
namespace: company.team

concurrency:
  behavior: FAIL # QUEUE, CANCEL or FAIL
  limit: 2 # can be any integer >= 1

tasks:
  - id: wait
    type: ai.mymagic.plugin.scripts.shell.Commands
    commands:
      - sleep 10

In this case, when the concurrency limit of 2 is reached, any additional execution attempts will immediately fail without running any tasks.

Visual Feedback

The user interface provides visual feedback on the concurrency status:

  • For QUEUE behavior: Additional executions will show as queued while the first two finish.
  • For CANCEL or FAIL behavior: The third execution’s state will immediately be set to CANCELLED or FAILED respectively.

Best Practices

  1. Set appropriate concurrency limits based on your system’s resources and the flow’s requirements.
  2. Use the QUEUE behavior for tasks that can be delayed but must eventually run.
  3. Use CANCEL or FAIL for time-sensitive tasks where running later is not acceptable.
  4. Monitor queued executions to ensure they’re not building up excessively.
  5. Combine concurrency control with timeouts for more robust flow management.

Considerations

  • Concurrency limits apply per flow, not globally across all flows.
  • When using QUEUE, ensure your system can handle the potential backlog of executions.
  • CANCEL and FAIL behaviors can help prevent resource overload but may require additional error handling in your overall workflow.

By effectively using concurrency control, you can manage system resources, prevent overload, and ensure smooth execution of your workflows.

Descriptions

Add descriptions to your flows, inputs, outputs, tasks, and triggers using the description property.

Overview

The description property is a string that supports Markdown syntax, allowing for rich, formatted documentation within your workflow definitions. These descriptions are rendered in the UI, providing clear and accessible information about various components of your flows.

Supported Components

You can add a description property to:

  • Flows
  • Inputs
  • Outputs
  • Tasks
  • Triggers

Example

Here’s an example flow demonstrating the use of descriptions in different components:

id: myflow
namespace: company.team
description: |
  This is the **Flow Description**.
  You can look at `input description`, `task description`, `output description` and `trigger description` as well in this example. 

labels:
  env: dev
  project: myproject

inputs:
  - id: payload
    type: JSON
    description: JSON request payload to the API # Input description
    defaults: |
      [{"name": "mymagic", "rating": "best in class"}]

tasks:
  - id: send_data
    type: ai.mymagic.plugin.core.http.Request
    description: Task for sending POST API request to https://reqres.in/api/products # Task description
    uri: https://reqres.in/api/products
    method: POST
    contentType: application/json
    body: "{{ inputs.payload }}"

  - id: print_status
    type: ai.mymagic.plugin.core.debug.Return
    description: Task printing the API request date # Task description
    format: hello on {{ outputs.send_data.headers.date | first }}

outputs:
  - id: final
    type: STRING
    description: This is a task output used as a final flow output
    value: "{{ outputs.print_status.value }}"

triggers:
  - id: daily
    type: ai.mymagic.plugin.core.trigger.Schedule
    description: Trigger the flow at 09:00am every day # Trigger description
    cron: "0 9 * * *"

Best Practices

  1. Use clear and concise language in your descriptions.
  2. Leverage Markdown syntax for better readability (e.g., headers, lists, code blocks).
  3. Include relevant details such as purpose, expected inputs/outputs, and any important considerations.
  4. Keep descriptions up-to-date as your flows evolve.
  5. Use consistent formatting across your descriptions for a unified documentation style.

Tips for Effective Documentation

  • Flow Description: Provide an overview of the flow’s purpose and any important context.
  • Input Description: Explain the expected format, constraints, and significance of each input.
  • Task Description: Detail what the task does, any prerequisites, and its role in the overall flow.
  • Output Description: Clarify what the output represents and how it might be used downstream.
  • Trigger Description: Specify when and why the trigger activates the flow.

Benefits

  • Improves workflow understanding for team members and stakeholders.
  • Facilitates easier maintenance and troubleshooting.
  • Enhances collaboration by providing clear context for each component.
  • Serves as built-in documentation that stays in sync with your workflow definition.

By effectively using the description property throughout your flows, you create self-documenting workflows that are easier to understand, maintain, and collaborate on.

Disabled Flag

The disabled flag is a boolean property that allows you to skip a flow, task, or trigger.

Overview

This feature is helpful when debugging or testing specific parts of your flow without removing existing logic. Instead of deleting parts of your YAML, you can add the disabled property.

Disabled Flow

When a flow is disabled, it will not be executed, even if a trigger is set. Any active triggers on a disabled flow will be ignored automatically.

Example of a disabled flow:

id: disabled_flow
namespace: company.team
disabled: true

tasks:
  - id: hello
    type: ai.mymagic.plugin.core.log.Log
    message: MyMagic team wishes you a great day! 👋

triggers:
  - id: fail_every_minute
    type: ai.mymagic.plugin.core.trigger.Schedule
    cron: "*/1 * * * *"

Behavior with Subflows

When trying to execute a disabled flow from a subflow:

id: parent_runs_disabled_flow
namespace: company.team
tasks:
  - id: disabled_subflow
    type: ai.mymagic.plugin.core.flow.Subflow
    flowId: disabled_flow
    namespace: company.team

Executing the parent flow will immediately fail with the error message: Cannot execute a flow which is disabled.

API Behavior

When triggering a disabled flow via an API call:

curl -X POST http://localhost:8080/api/v1/executions/trigger/example/parent_runs_disabled_flow

The API call itself will be successful, but the execution will be immediately marked as failed with the error message: Cannot execute a flow which is disabled.

Disabled Trigger

You can temporarily disable a trigger by setting the disabled flag to true:

id: myflow
namespace: company.team

tasks:
  - id: hello
    type: ai.mymagic.plugin.core.log.Log
    message: hello from a scheduled flow

triggers:
  - id: daily
    type: ai.mymagic.plugin.core.trigger.Schedule
    cron: "0 9 * * *"
    disabled: true

No scheduled executions will be created for this flow while the trigger is disabled.

Disabled Task

You can disable a single task within a flow:

id: myflow
namespace: company.team

tasks:
  - id: enabled
    type: ai.mymagic.plugin.core.log.Log
    message: this task will run

  - id: disabled
    type: ai.mymagic.plugin.core.debug.Return
    format: this task will be skipped
    disabled: true

Disabled tasks are visually greyed out in the UI.

Best Practices

  1. Use the disabled flag for temporary changes during development or debugging.
  2. Document why a component is disabled, especially if committing the change.
  3. Regularly review disabled components to ensure they’re not forgotten.
  4. Use version control to track changes in disabled status.

Considerations

  • Disabling a flow affects all its triggers and tasks.
  • Disabling a trigger only affects that specific trigger, not the entire flow.
  • Disabled tasks are skipped during execution but remain in the flow definition.

By effectively using the disabled flag, you can streamline your development and debugging processes without altering the structure of your workflows.

States

States control the status of your workflow execution.

Overview

An execution is a single run of a flow in a specific state. Each state represents a particular point in the workflow where MyMagic’s orchestration system determines the next steps based on the control flow logic defined in the flow.

Execution States

Each MyMagic execution can transition through several states during its lifecycle. Here’s a brief description of each state:

  • CREATED: Execution created but not yet started. A transient state indicating the execution is waiting to be processed.
  • QUEUED: Execution waiting for a free slot to start running. Used when the flow has concurrency limits and all available slots are taken.
  • RUNNING: Execution currently in progress.
  • SUCCESS: Execution completed successfully. All tasks finished without errors (or were allowed to fail).
  • WARNING: Execution completed successfully, but one or more tasks emitted warnings.
  • FAILED: One or more tasks failed and will not be retried. Error tasks (if defined) will be executed before ending the execution.
  • RETRYING: Execution is currently retrying one or more failed task runs.
  • RETRIED: Execution has been retried according to the flow-level retry policy set to CREATE_NEW_EXECUTION behavior.
  • PAUSED: Execution awaiting manual approval or paused for a fixed duration.
  • RESTARTED: Equivalent to CREATED, but for a failed execution that has been restarted.
  • CANCELLED: Execution automatically cancelled by the system, usually due to concurrency limits.
  • KILLING: User has issued a command to kill the execution, system is terminating task runs.
  • KILLED: Execution has been killed upon user request. No more tasks will run.

Task Run States

Task run states represent the status of a single task run within an execution:

  • CREATED: Task run created but not yet started.
  • RUNNING: Task run currently in progress.
  • SUCCESS: Task run completed successfully.
  • WARNING: Task run completed successfully but with warnings.
  • FAILED: Task run has failed.
  • RETRYING: Task run is currently being retried.
  • RETRIED: Task run has been retried.
  • KILLING: Task run is in the process of being killed.
  • KILLED: Task run has been killed upon user request.

Note: There are no QUEUED, CANCELLED, PAUSED, or RESTARTED states for task runs.

Key Differences

  • CANCELLED vs KILLED: CANCELLED is used when the system automatically cancels an execution due to concurrency limits. KILLED is used when the execution has been terminated upon user request.
  • Execution States vs Task Run States: Execution states represent the overall status of the workflow, while task run states represent the status of individual tasks within the execution.

Best Practices

  1. Monitor executions in CREATED state to ensure they’re not stuck.
  2. Use the PAUSED state for manual approvals or timed delays in workflows.
  3. Implement proper error handling for FAILED states.
  4. Utilize the WARNING state to flag executions that completed but require attention.
  5. Be cautious when using KILLING state, as it may leave resources in an inconsistent state.

Considerations

  • State transitions are managed by MyMagic’s orchestration system.
  • Some states are transient (e.g., CREATED, RUNNING) while others are terminal (e.g., SUCCESS, FAILED).
  • The flow of states can be influenced by retry policies, concurrency settings, and manual interventions.

Understanding these states is crucial for effective monitoring, troubleshooting, and optimization of your MyMagic workflows.

Revisions

Manage versions of flows efficiently with built-in versioning capabilities.

Automatic Versioning

Flows are versioned by default. Each time you make changes to your flows, a new revision is automatically created. This feature provides a robust mechanism for tracking changes and enables easy rollback to previous versions when necessary.

Viewing Revisions

To access the revision history of a flow:

  1. Navigate to the specific flow.
  2. Go to the Revisions tab.
  3. You will see a list of all revisions for that flow.

Revision Comparison

The revision system allows you to compare different versions of your flow:

  • Compare two revisions side-by-side.
  • Easily identify changes between versions.
  • Understand the evolution of your flow over time.

Rollback Capability

If you need to revert to a previous version of your flow:

  1. Navigate to the Revisions tab.
  2. Select the desired previous revision.
  3. Use the rollback feature to restore that version.

This capability ensures that you can quickly recover from unintended changes or return to a known good state of your flow.

Best Practices

  1. Regular Reviews: Periodically review the revision history to understand how your flows have evolved.
  2. Meaningful Changes: Try to make meaningful, atomic changes between revisions to make the history more understandable.
  3. Comments: If your system supports it, add comments or descriptions to revisions to explain significant changes.
  4. Testing: Always test your flow after rolling back to ensure it still functions as expected in the current environment.

Considerations

  • Revisions are typically immutable once created.
  • Rolling back doesn’t delete newer revisions; it creates a new revision based on the older state.
  • Be cautious when rolling back flows that interact with external systems or data stores, as the current state may have changed since the previous revision.

By leveraging the flow revision system effectively, you can maintain a clear history of your workflow changes, easily track modifications, and ensure the ability to recover from unintended alterations quickly.

KV Store

Build stateful workflows with the Key-Value (KV) Store.

Overview

MyMagic’s workflows are designed to be stateless, with isolated executions and task runs to avoid unintended side effects. However, the KV Store allows you to share data beyond passing outputs between tasks, enabling data persistence across executions or different workflows.

Architecture Integration

The KV Store is built on top of MyMagic’s internal storage, ensuring that:

  • All values are stored in your private cloud storage bucket.
  • MyMagic’s database only contains metadata about the objects.
  • You have full control and privacy over your data.

Keys and Values

  • Keys are arbitrary strings that can contain uppercase and lowercase characters and standard ASCII characters.
  • Values are stored as ION files in MyMagic’s internal storage and can be of types: string, number, boolean, datetime, date, duration, or JSON.
  • Time to Live (TTL) can be set for each KV pair to manage storage efficiently.

Namespace Binding

KV pairs are defined at a namespace level, accessible from the namespace page in the UI under the KV Store tab. Cross-namespace access is possible for allowed namespaces.

Management Methods

KV pairs can be managed through various methods:

  1. MyMagic UI
  2. Tasks in a flow
  3. MyMagic’s API
  4. MyMagic’s Terraform provider
  5. Pebble function
  6. GitHub Actions

UI Management

Create New KV Pairs

  1. Navigate to the Namespaces page and select the desired namespace.
  2. Go to the KV Store tab.
  3. Click on “New Key-Value” button.
  4. Enter key name, select value type, enter value, and optionally set TTL.
  5. Save the changes.

Update and Delete KV Pairs

Use the Edit button next to each KV pair to modify or delete it.

Code Management

Create a KV Pair in a Flow

Use the ai.mymagic.plugin.core.kv.Set task:

id: add_kv_pair
namespace: company.team

tasks:
  - id: set_kv
    type: ai.mymagic.plugin.core.kv.Set
    key: my_key
    value: "{{ outputs.download.uri }}"
    namespace: company.team
    overwrite: true
    ttl: P30D

Read KV Pairs with Pebble

Use the {{ kv('YOUR_KEY') }} Pebble function:

id: read_kv_pair
namespace: company.team
tasks:
  - id: log_key
    type: ai.mymagic.plugin.core.log.Log
    message: "{{ kv('my_key') }}"

Read KV Pairs with Get Task

Use the ai.mymagic.plugin.core.kv.Get task:

id: get_kv_pair
namespace: company.team

tasks:
  - id: get
    type: ai.mymagic.plugin.core.kv.Get
    key: my_key
    namespace: company.team
    errorOnMissing: false

Delete a KV Pair

Use the ai.mymagic.plugin.core.kv.Delete task:

id: delete_kv_pair
namespace: company.team

tasks:
  - id: kv
    type: ai.mymagic.plugin.core.kv.Delete
    key: my_key
    namespace: company.team
    errorOnMissing: false

API Management

Create a KV Pair

curl -X PUT -H "Content-Type: application/json" http://localhost:8080/api/v1/namespaces/company.team/kv/my_key -d '"Hello World"'

Read a KV Pair

curl -X GET -H "Content-Type: application/json" http://localhost:8080/api/v1/namespaces/company.team/kv/my_key

Delete a KV Pair

curl -X DELETE -H "Content-Type: application/json" http://localhost:8080/api/v1/namespaces/company.team/kv/my_key

Terraform Management

Create a KV Pair

resource "mymagic_kv" "my_key" {
  namespace = "company.team"
  key       = "my_key"
  value     = "Hello World"
  type      = "STRING"
}

Read a KV Pair

data "mymagic_kv" "new" {
  namespace = "company.team"
  key       = "my_key"
}

By leveraging these methods, you can effectively create, read, update, and delete KV pairs in MyMagic, enabling stateful workflows and data sharing across executions and workflows.

Pebble Templating

Dynamically render variables, inputs, and outputs using Pebble templating engine.

Overview

MyMagic uses Pebble, a Java templating engine inspired by Twig and similar to Python’s Jinja Template Engine, to dynamically render variables, inputs, and outputs within the execution context.

Reading Inputs

Access input values in your tasks using the inputs variable:

id: input_string
namespace: company.team

inputs:
  - id: name
    type: STRING

tasks:
  - id: say_hello
    type: ai.mymagic.plugin.core.log.Log
    message: "Hello 👋, my name is {{ inputs.name }}"

Reading Task Outputs

Access outputs from previous tasks using outputs.<task_name>.<output_name>:

id: input_string
namespace: company.team

inputs:
  - id: name
    type: STRING

tasks:
  - id: say_hello
    type: ai.mymagic.plugin.core.debug.Return
    format: "Hello 👋, my name is {{ inputs.name }}"

  - id: can_you_repeat
    type: ai.mymagic.plugin.core.log.Log
    message: '{{ outputs.say_hello.value }}'

TemplatedTask for Dynamic Rendering

Use the TemplatedTask to dynamically render all task properties:

id: templated_databricks_job
namespace: company.team

inputs:
  - id: host
    type: STRING
  - id: clusterId
    type: STRING
  # ... other inputs ...

tasks:
  - id: templated_spark_job
    type: ai.mymagic.plugin.core.templating.TemplatedTask
    spec: |
      type: ai.mymagic.plugin.databricks.job.CreateJob
      authentication:
        token: "{{ secret('DATABRICKS_API_TOKEN') }}"
      host: "{{ inputs.host }}"
      jobTasks:
        - existingClusterId: "{{ inputs.clusterId }}"
          # ... other properties ...

Date Formatting

Use the date filter for date formatting:

'{{ inputs.my_date | date("yyyyMMdd") }}'

Conditional Date Usage

Use the coalesce operator ?? to conditionally use trigger or execution date:

id: pebble_date_trigger
namespace: company.team

tasks:
  - id: return_date
    type: ai.mymagic.plugin.core.debug.Return
    format: '{{ trigger.date ?? execution.startDate | date("yyyy-MM-dd")}}'

triggers:
  - id: schedule
    type: ai.mymagic.plugin.core.trigger.Schedule
    cron: "* * * * *"

Parsing Objects & Lists with jq

Use jq to parse nested objects or lists:

id: object_example
namespace: company.team

inputs:
  - id: data
    type: JSON
    defaults: '{"value": [1, 2, 3]}'

tasks:
  - id: hello
    type: ai.mymagic.plugin.core.log.Log
    message: "{{ inputs.data.value | jq('.[1]') | first }}"

Using Conditions in Pebble

Apply conditions in tasks like If or Switch:

id: test-object
namespace: company.team

inputs:
  - id: data
    type: JSON
    defaults: '{"value": [1, 2, 3]}'

tasks:
  - id: if
    type: ai.mymagic.plugin.core.flow.If
    condition: '{{ inputs.data.value | jq(".[2]") | first == 3}}'
    then:
      - id: when_true
        type: ai.mymagic.plugin.core.log.Log
        message: 'Condition was true'
    else:
      - id: when_false
        type: ai.mymagic.plugin.core.log.Log
        message: 'Condition was false'

Best Practices

  1. Use the Debug Outputs feature in the UI to troubleshoot complex Pebble expressions.
  2. Leverage jq for parsing complex JSON structures.
  3. Use the coalesce operator ?? for handling both triggered and manual executions.
  4. Utilize the TemplatedTask for full dynamic rendering of task properties.

By mastering Pebble templating in MyMagic, you can create more dynamic and flexible workflows, easily adapting to various inputs and conditions.

Templates (Coming Soon)

Plugins (Coming Soon)

Upcoming Features

We plan to integrate LLM technology to enable natural language commands for workflow creation, offering an alternative to YAML configuration for users who prefer a more intuitive interface.

(More To Come)