Orchestrating AI Inference with MyMagic
Introduction
Our AI batch inference algorithm and inference orchestration platform are designed for companies specializing in competitive research, sentiment analysis, media monitoring, social listening, lead generation, and sales generation. This documentation outlines the features, benefits, and usage of the platform.
Quickstart
When you are logged in, navigate to Flows in the left menu, then click the “Create” button and paste the following configuration to create your first flow with batch inference:
id: batch_inference
namespace: mymagic
description: Batch inference quickstart
labels:
env: dev
project: my-project
inputs:
- id: api_key
type: STRING
defaults: "<your api key>"
- id: model
type: STRING
defaults: "llama2_70b"
- id: system_prompt
type: STRING
defaults: "You are a helpful AI assistant"
- id: prompt
type: STRING
defaults: "Categorize it into one of these categories:"
- id: categories
type: ARRAY
itemType: STRING
defaults: ["category 1", "category 2", "category 3"]
- id: session
type: STRING
defaults: "my-session"
- id: max_tokens
type: INT
defaults: 10
- id: role_arn
type: STRING
defaults: "<arn role created during aws setup>"
- id: region
type: STRING
defaults: "<your s3 bucket region>"
- id: input_json_file
type: STRING
defaults: "<your input json file name>"
- id: bucket_name
type: STRING
defaults: "<your s3 bucket name>"
- id: aws_access_key
type: STRING
defaults: "<your aws access key>"
- id: aws_secret_key
type: STRING
defaults: "<your aws secret key>"
tasks:
- id: Categorization
type: ai.mymagic.plugin.core.flow.Sequential
tasks:
- id: Categorize
type: ai.mymagic.plugin.core.http.Request
uri: https://fastapi.mymagic.ai/v1/completions
method: POST
headers:
Content-Type: application/json
Authorization: Bearer {{ inputs.api_key }}
contentType: application/json
body: |-
{
"model": "{{ inputs.model }}",
"system_prompt": "{{ inputs.system_prompt }}",
"question": "'{{ inputs.prompt }}' '{{ inputs.categories | join("', '") }}'",
"storage_provider": "s3",
"bucket_name": "{{ inputs.bucket_name }}",
"session": "{{ inputs.session }}",
"max_tokens": {{ inputs.max_tokens }},
"role_arn": "{{ inputs.role_arn }}",
"region": "{{ inputs.region }}",
"input_json_file": "{{ inputs.input_json_file }}"
}
- id: Run_Job
type: ai.mymagic.plugin.scripts.shell.Script
taskRunner:
type: ai.mymagic.plugin.core.runner.Process
script: |
START_TIME=$(date +%s)
MAX_DURATION=3600 # 1 hour in seconds
while true; do
CURRENT_TIME=$(date +%s)
ELAPSED_TIME=$((CURRENT_TIME - START_TIME))
if [ $ELAPSED_TIME -ge $MAX_DURATION ]; then
echo "Timeout reached. Exiting."
exit 1
fi
RESPONSE=$(curl -s "https://fastapi.mymagic.ai/get_result/{{ json(outputs.Categorize.body).task_id }}")
STATUS=$(echo "$RESPONSE" | grep -o '"status":"[^"]*"' | cut -d'"' -f4)
echo "Current status: $STATUS"
if [ "$STATUS" = "SUCCESS" ]; then
echo "Job completed successfully."
exit 0
elif [ "$STATUS" = "FAILED" ]; then
echo "Job failed."
exit 2
else
echo "Job still in progress. Waiting..."
sleep 20
fi
done
- id: Read_Output
type: ai.mymagic.plugin.scripts.shell.Script
taskRunner:
type: ai.mymagic.plugin.core.runner.Process
env:
AWS_CONFIG_FILE: "aws_config"
AWS_SHARED_CREDENTIALS_FILE: "aws_credentials"
inputFiles:
aws_config: |
[profile company-role]
role_arn = {{ inputs.role_arn }}
source_profile = default
aws_credentials: |
[default]
aws_access_key_id = {{ inputs.aws_access_key }}
aws_secret_access_key = {{ inputs.aws_secret_key }}
outputFiles:
- latest_file.json
script: |
#!/bin/bash
set -e
aws configure list
aws s3 ls s3://{{ inputs.bucket_name }}/{{ inputs.api_key }}/{{ inputs.session }}/ --profile company-role
LATEST_FILE=$(aws s3 ls s3://{{ inputs.bucket_name }}/{{ inputs.api_key }}/{{ inputs.session }}/ --profile company-role | sort -r | head -n 1 | awk '{print $4}')
if [ -z "$LATEST_FILE" ]; then
echo "No file found."
exit 1
fi
aws s3 cp s3://{{ inputs.bucket_name }}/{{ inputs.api_key }}/{{ inputs.session }}/$LATEST_FILE ./latest_file.json --profile company-role
Key Features and Benefits
1. Workflow Creation and Execution
- Create and run workflows using a simple, declarative configuration in the embedded code editor.
- User-friendly interface for creating, editing, and running workflows without local development setup.
2. Error Detection and Debugging
- Quickly discover and troubleshoot errors in your workflows.
- Real-time monitoring of workflow and individual task progress.
- Full-text search of workflow execution logs for efficient troubleshooting.
3. Language Flexibility
- Code in any language using our plugin system.
- Support for Java-based plugins and scripts in Python, R, Julia, Ruby, Shell, PowerShell, and Node.js.
4. Event-Driven Processing
- File detection triggers for processing new files as they arrive.
- Automate workflow executions based on real-time events such as database updates, file uploads, or message queue activities.
5. Modular Design
- Define default task or plugin configurations.
- Use subflows to eliminate boilerplate code and improve maintainability.
6. Data Validation and Type Safety
- Ensure incoming and outgoing data adheres to schema and quality constraints.
- Declare and enforce data types for inputs and outputs, with automatic error raising for mismatches.
7. Flexible Deployment Options
- Deploy serverlessly or on hybrid cloud infrastructure.
- Support for on-premise installations, cloud-based VMs, and Kubernetes clusters.
8. Performance Monitoring
- Built-in dashboards and metrics accessible directly from the user interface.
- (Coming soon) Configurable alerts for workflow completion, delays, or failures.
9. Data Privacy and Transfer
- Built-in internal storage for secure data transfer between tasks.
10. Cost-Effective LLM Usage
- Access to open-source LLMs for batch jobs, offering up to 50% cost savings compared to alternatives like TogetherAI or FireworksAI.
Workflow Design and Management
YAML Configuration
- Simple YAML interface for defining workflows.
- Built-in syntax validation to catch configuration errors early.
- Suitable for both AI developers/scientists and domain experts.
Visual Workflow Management
- Live topology view to visualize dependencies between processes, systems, and tasks.
- Easy configuration of sequential, parallel, or conditional task execution.
- Simplified management of task dependencies through intuitive ordering.
Automated Execution
- Schedule workflows to run automatically at specific intervals (daily, weekly, monthly).
- Event-based triggering for real-time responsiveness to data changes or system events.
Flow
Table of Contents
- Flow Essentials
- Flow Example
- Plugin Configuration Defaults
- Dynamic Variables
- Task Sequence
- Flow Activation Control
- Task Types
- Categorization with Labels
- Input Parameters
- Task and Flow Outputs
- Version Management
- Event-Based Initiation
- Dynamic Flow Information
- Common Questions
A flow serves as a container for tasks and their orchestration logic, defining the execution order and methods.
Flow Essentials
A flow is a comprehensive container that manages tasks, their inputs, outputs, error handling, and overall orchestration logic. It specifies how tasks are executed, whether sequentially, in parallel, or based on the status of upstream tasks and their dependencies.
Flows are defined declaratively using YAML files.
Essential flow components include:
- A unique identifier (id)
- A namespace
- A list of tasks to be executed
Optional flow components can include:
- Input and output specifications
- Dynamic variables
- Event triggers
- Descriptive labels
- Default plugin settings
- Error handling strategies
- Retry mechanisms
- Timeout settings
- Concurrency controls
- Descriptive text
- Activation status
- Version number
Flow Example
Here’s an illustrative flow definition that showcases various features:
id: greetings
namespace: organization.department
description: A sample flow **demonstrating** various *features*
labels:
environment: production
team: data-science
inputs:
- id: user-name
type: STRING
required: false
defaults: "Guest"
description: Optional name input for personalized greeting
variables:
greeting: "Hello"
message: "{{vars.greeting}}, {{inputs.user-name}}!"
tasks:
- id: display-greeting
type: ai.mymagic.plugin.core.debug.Return
description: "Task to **output** the greeting *message*"
format: "{{vars.message}} The current time is {{taskrun.startDate}}"
pluginDefaults:
- type: ai.mymagic.plugin.core.log.Log
values:
level: INFO
Plugin Configuration Defaults
The pluginDefaults section allows you to set default properties for specific task types within your flow. This feature helps reduce repetition when using the same task type multiple times with similar configurations.
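For example, here is a sketch of a flow (the ids and values are illustrative) that sets a default log level once instead of repeating it on every Log task:
id: plugin_defaults_demo
namespace: company.team
tasks:
  - id: first_log
    type: ai.mymagic.plugin.core.log.Log
    message: first message
  - id: second_log
    type: ai.mymagic.plugin.core.log.Log
    message: second message
pluginDefaults:
  - type: ai.mymagic.plugin.core.log.Log
    values:
      level: DEBUG
Both Log tasks inherit level: DEBUG without declaring it themselves.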
Dynamic Variables
Flow variables can be defined to store values accessible by all tasks using the syntax {{ vars.key }}. These variables are stored as key-value pairs and can be used to share information across different parts of the flow.
Task Sequence
The core of a flow is its task list, which defines the sequence of operations to be executed when the flow runs.
Flow Activation Control
By default, all flows are active and will execute based on their defined triggers or when manually initiated. However, you have the option to deactivate a flow, which can be useful for troubleshooting or maintenance purposes.
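Activation status is listed among the optional flow components above; assuming it is expressed as a top-level disabled property (mirroring the task-level attribute described later), a deactivated flow might look like this sketch:
id: maintenance_flow
namespace: company.team
disabled: true
tasks:
  - id: hello
    type: ai.mymagic.plugin.core.log.Log
    message: this will not run while the flow is deactivated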
Task Types
Tasks in MyMagic fall into two main categories:
- Computational Tasks: These handle resource-intensive operations like file system interactions, API calls, or database queries. They are managed by worker nodes to ensure efficient resource utilization.
- Control Flow Tasks: These manage the flow’s logic, such as branching, grouping, or parallel processing. They are lightweight and handled by the executor, capable of frequent calls without significant computational overhead.
Categorization with Labels
Labels are key-value pairs that can be attached to flows for organizational purposes. They can be used to filter and sort flows in the user interface, making it easier to manage large numbers of flows.
Input Parameters
Inputs are strongly-typed parameters that can be passed to a flow at runtime. They can be optional or required, and can have default values. Input validation rules ensure data integrity before flow execution.
File-type inputs are automatically uploaded to MyMagic’s internal storage and made available to all tasks within the flow.
Task and Flow Outputs
Tasks and flows can produce outputs with multiple properties. These outputs are documented in the plugin specifications and can be accessed by subsequent tasks using expressions.
Certain output types are automatically stored in MyMagic’s internal storage and made available to all tasks. File outputs can be downloaded directly from the user interface.
Version Management
Each modification to a flow creates a new revision, incrementing the version number. MyMagic maintains a complete history of all flow revisions, providing built-in version control functionality.
Event-Based Initiation
Triggers allow flows to be started based on external events, such as scheduled times, webhooks, file creation, or messages in a message broker.
Dynamic Flow Information
Flows have access to various dynamic expressions that provide runtime information. For example:
Expression | Description |
---|---|
{{ flow.id }} | The flow’s unique identifier |
{{ flow.namespace }} | The flow’s namespace |
{{ flow.tenantId }} | The tenant identifier (Coming Soon) |
{{ flow.revision }} | The current flow revision number |
For a complete list of available expressions, please refer to our documentation.
Common Questions
Flow Storage in MyMagic
MyMagic stores flows in a serialized format within its backend database. (Coming soon) New flows can be added through the MyMagic user interface, or automatically via Git synchronization or CI/CD pipelines.
To visualize the file structure of flows, you can explore the _flows directory in the Namespace Files editor.
Tasks
Tasks represent the individual steps within a flow. They are capable of accepting inputs and variables from the flow, and generating outputs for subsequent use by end users and other tasks.
Flowable Tasks
MyMagic orchestrates your flows using Flowable Tasks. These tasks don’t perform heavy computations but instead manage the flow’s orchestration behavior, enabling more sophisticated workflow patterns. Flowable tasks control the orchestration logic — run tasks or subflows in parallel, create loops and conditional branching.
Overview
- Flowable Tasks don’t run heavy operations — those are handled by workers.
- They are used for branching, grouping, running tasks in parallel, and more.
- Flowable Tasks use expressions from the execution context to define the next tasks to run.
Types of Flowable Tasks
Sequential
Processes tasks one after another sequentially. Used to group tasks.
id: sequential
namespace: company.team
tasks:
- id: sequential
type: ai.mymagic.plugin.core.flow.Sequential
tasks:
- id: 1st
type: ai.mymagic.plugin.core.debug.Return
format: "{{task.id}} > {{taskrun.startDate}}"
- id: 2nd
type: ai.mymagic.plugin.core.debug.Return
format: "{{task.id}} > {{taskrun.id}}"
- id: last
type: ai.mymagic.plugin.core.debug.Return
format: "{{task.id}} > {{taskrun.startDate}}"
You can access the output of a sibling task using the syntax {{ outputs.sibling.value }}.
Parallel
Processes tasks in parallel. Convenient for processing many tasks at once.
id: parallel
namespace: company.team
tasks:
- id: parallel
type: ai.mymagic.plugin.core.flow.Parallel
tasks:
- id: 1st
type: ai.mymagic.plugin.core.debug.Return
format: "{{task.id}} > {{taskrun.startDate}}"
- id: 2nd
type: ai.mymagic.plugin.core.debug.Return
format: "{{task.id}} > {{taskrun.id}}"
- id: last
type: ai.mymagic.plugin.core.debug.Return
format: "{{task.id}} > {{taskrun.startDate}}"
You cannot access the output of a sibling task as tasks will be run in parallel.
Switch
Processes a set of tasks conditionally depending on a contextual variable’s value.
id: switch
namespace: company.team
inputs:
- id: param
type: BOOLEAN
tasks:
- id: decision
type: ai.mymagic.plugin.core.flow.Switch
value: "{{ inputs.param }}"
cases:
true:
- id: is_true
type: ai.mymagic.plugin.core.log.Log
message: "This is true"
false:
- id: is_false
type: ai.mymagic.plugin.core.log.Log
message: "This is false"
If
Processes a set of tasks conditionally depending on a condition. The else branch is optional.
id: if-condition
namespace: company.team
inputs:
- id: param
type: BOOLEAN
tasks:
- id: if
type: ai.mymagic.plugin.core.flow.If
condition: "{{ inputs.param }}"
then:
- id: when-true
type: ai.mymagic.plugin.core.log.Log
message: "This is true"
else:
- id: when-false
type: ai.mymagic.plugin.core.log.Log
message: "This is false"
EachSequential
Generates many tasks at runtime depending on the value of a variable. Each subtask will run after the others sequentially.
id: each_example
namespace: company.team
tasks:
- id: each
type: ai.mymagic.plugin.core.flow.EachSequential
value: ["value 1", "value 2", "value 3"]
tasks:
- id: 1st
type: ai.mymagic.plugin.core.debug.Return
format: "{{task.id}} > {{taskrun.value}} > {{taskrun.startDate}}"
- id: 2nd
type: ai.mymagic.plugin.core.debug.Return
format: "{{task.id}} > {{taskrun.value}} > {{taskrun.startDate}}"
- id: last
type: ai.mymagic.plugin.core.debug.Return
format: "{{task.id}} > {{taskrun.startDate}}"
You can access the output of a sibling task using the syntax {{ outputs.sibling[taskrun.value].value }}.
EachParallel
Same as EachSequential, but each subtask will run in parallel.
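Since EachParallel mirrors EachSequential, a minimal sketch only swaps the task type:
id: each_parallel_example
namespace: company.team
tasks:
  - id: each
    type: ai.mymagic.plugin.core.flow.EachParallel
    value: ["value 1", "value 2", "value 3"]
    tasks:
      - id: inner
        type: ai.mymagic.plugin.core.debug.Return
        format: "{{ task.id }} > {{ taskrun.value }} > {{ taskrun.startDate }}"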
ForEachItem
Allows you to iterate over a list of items and run a subflow for each item or batch of items.
AllowFailure
Allows child tasks to fail without stopping the execution.
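A minimal sketch, assuming the type follows the ai.mymagic.plugin.core.flow naming pattern used by the other flowable tasks in this section:
id: allow_failure_example
namespace: company.team
tasks:
  - id: allow_failure
    type: ai.mymagic.plugin.core.flow.AllowFailure
    tasks:
      - id: may_fail
        type: ai.mymagic.plugin.scripts.shell.Commands
        taskRunner:
          type: ai.mymagic.plugin.core.runner.Process
        commands:
          - exit 1 # simulate a failing child task
  - id: continues_anyway
    type: ai.mymagic.plugin.core.log.Log
    message: execution continued despite the failure above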
Fail
Fails the flow; can be used with or without conditions.
Subflow
Triggers another flow, allowing decoupling and individual monitoring.
WorkingDirectory
Runs all nested tasks in the same working directory, allowing reuse of previous tasks’ output files.
Pause
Adds manual validation or wait time before continuing execution.
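A sketch assuming a Pause task type in the same flow package and a delay property expressed as an ISO 8601 duration (the same format the timeout attribute uses); omitting the delay would presumably wait for manual validation instead:
id: pause_example
namespace: company.team
tasks:
  - id: pause
    type: ai.mymagic.plugin.core.flow.Pause
    delay: PT5M # wait five minutes before continuing
  - id: after_pause
    type: ai.mymagic.plugin.core.log.Log
    message: continuing after the pause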
DAG (Directed Acyclic Graph)
Defines dependencies between tasks using a directed acyclic graph structure.
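A sketch assuming the DAG task accepts a list of wrapped subtasks with dependsOn references (the task and dependsOn property names are assumptions):
id: dag_example
namespace: company.team
tasks:
  - id: dag
    type: ai.mymagic.plugin.core.flow.Dag
    tasks:
      - task:
          id: extract
          type: ai.mymagic.plugin.core.debug.Return
          format: extracted
      - task:
          id: load
          type: ai.mymagic.plugin.core.debug.Return
          format: loaded
        dependsOn:
          - extract # load only runs after extract succeeds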
Best Practices
- Use Sequential for tasks that must run in a specific order.
- Leverage Parallel for independent tasks to improve execution speed.
- Utilize Switch and If for conditional logic in your workflows.
- EachSequential and EachParallel are powerful for dynamic task generation.
- Use WorkingDirectory for compute-intensive file system operations.
- Implement Pause for manual validations or timed delays in your workflow.
- Leverage DAG for complex task dependencies.
By effectively using these Flowable Tasks, you can create sophisticated, efficient, and flexible workflows in MyMagic.
Runnable Tasks
In MyMagic, most data processing workloads are executed using Runnable tasks.
Unlike Flowable Tasks, Runnable tasks are responsible for performing the actual work. This includes operations such as file system interactions, API calls, database queries, etc. These tasks can be computationally intensive and are managed by worker nodes.
Examples of Runnable tasks include:
ai.mymagic.plugin.scripts.python.Commands
ai.mymagic.plugin.core.http.Request
ai.mymagic.plugin.notifications.slack.SlackExecution
Key Characteristics
- Each task must have an identifier (id) and a type.
- The type is the task’s Java Fully Qualified Class Name (FQCN).
- Tasks have properties specific to their type; check each task’s documentation for available properties.
- Most available tasks are Runnable Tasks, except for special Flowable Tasks.
Availability
By default, MyMagic only includes a few Runnable Tasks. However, many are available as plugins, and if you use the default Docker image, many will already be included.
Example
In this example, we have two Runnable Tasks: one that makes an HTTP request and another that logs the body of the response.
id: runnable_http
namespace: company.team
tasks:
- id: make_request
type: ai.mymagic.plugin.core.http.Request
uri: https://reqres.in/api/products
method: GET
contentType: application/json
- id: print_status
type: ai.mymagic.plugin.core.log.Log
message: "{{ outputs.make_request.body }}"
Common Types of Runnable Tasks
- File System Operations: Tasks for reading, writing, and manipulating files.
- API Calls: Tasks for making HTTP requests to external services.
- Database Queries: Tasks for interacting with various database systems.
- Data Transformation: Tasks for processing and transforming data.
- Shell Commands: Tasks for executing shell scripts or commands.
Best Practices
- Error Handling: Implement proper error handling for Runnable Tasks, especially for external operations like API calls or database queries.
- Resource Management: Be mindful of resource usage, particularly for compute-intensive tasks.
- Idempotency: When possible, design tasks to be idempotent to handle retries and failures gracefully.
- Logging: Use appropriate logging levels to aid in debugging and monitoring.
- Timeouts: Set reasonable timeouts for tasks that interact with external systems.
Considerations
- Runnable Tasks are executed by workers, which can be distributed across multiple machines for scalability.
- The performance of Runnable Tasks can impact the overall execution time of your workflow.
- Some Runnable Tasks may require specific configurations or dependencies, so check the documentation for each task type.
By effectively utilizing Runnable Tasks, you can create powerful and flexible data processing workflows in MyMagic, capable of handling a wide range of operations from simple file manipulations to complex API integrations and data transformations.
Essential Task Attributes
The following table outlines the core attributes available to all tasks:
Attribute | Description |
---|---|
id | A unique identifier for the task within a flow |
type | The Java Fully Qualified Class Name of the task |
description | A brief explanation of the task’s purpose |
retry | Specifications for task retry behavior |
timeout | Maximum task duration, expressed in ISO 8601 format |
disabled | When set to true, the task will not be executed |
logLevel | Defines the granularity of logs to be stored in the backend database |
allowFailure | When set to true, allows the flow to continue even if this task fails |
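As an illustration, here is a sketch combining several of these attributes on a single task (the URI is a placeholder):
tasks:
  - id: slow_request
    type: ai.mymagic.plugin.core.http.Request
    uri: https://example.com/api # placeholder endpoint
    timeout: PT5M # abort the task if it runs longer than 5 minutes
    logLevel: DEBUG # store verbose logs for this task
    allowFailure: true # let the flow continue even if this task fails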
Configurable vs. Fixed Task Properties
Task properties can be either configurable or fixed. Configurable properties can be set using expressions, allowing for dynamic values.
Some task properties are marked as “not configurable” because they serve as containers for other configurable properties. For instance, consider the runTasks property of the Databricks SubmitRun task. This property isn’t configurable because it’s an array of RunSubmitTaskSetting objects.
Furthermore, RunSubmitTaskSetting is a group of other properties that are either configurable or complex types themselves (serving as containers for other properties). It’s therefore important to examine properties at their most granular level; most properties at this level are configurable and can be templated using expressions.
Namespaces
Namespaces serve as logical groupings for flows, akin to folders in a file system. They play a crucial role in organizing workflows and managing access to secrets, plugin defaults, and variables.
Nested Namespace Structure
Utilizing the dot (.) symbol, you can create a hierarchical structure for your namespaces. This allows for logical separation of environments, projects, teams, and departments. For instance, product, engineering, marketing, finance, and data teams can all utilize the same MyMagic instance while maintaining organized and separate flows. Various stakeholders can have their own child namespaces nested within a parent namespace, grouped by environment, project, or team.
Namespace Naming Conventions
A namespace name can consist of alphanumeric characters, optionally separated by dots (.). The depth of the namespace hierarchy is unlimited. Here are some examples:
project_alpha
corporation.project_beta
corporation.division.project_gamma
Organizing Flows and Files with Namespaces
When creating a flow, you can assign it to a specific namespace:
id: greetings
namespace: corporation.division
tasks:
- id: log_message
type: ai.mymagic.plugin.core.log.Log
message: Hello from {{ flow.namespace }}
Note: Once a flow is saved, its namespace cannot be changed. To move a flow to a different namespace, you’ll need to create a new flow.
This namespace assignment provides improved organization and filtering capabilities. Additionally, you can organize your code at the namespace level using the embedded Code editor and Namespace Files, with (Coming Soon) the option to synchronize these files from Git.
Namespace Interface
You can access detailed information about any namespace by clicking on its name or the details button to the right of the namespace.
Selecting the details button for a namespace opens the namespace overview page, which provides information about the executions of flows within that namespace.
The namespace overview page contains several tabs:
- Overview: The default landing page for the namespace. It contains dashboards and summaries of the executions of different flows within the namespace.
- Editor: A built-in editor for adding or modifying namespace files.
- Flows: Displays all flows in the namespace, providing brief information about each flow, including the flow ID, labels, last execution date and status, and execution statistics. You can navigate to a specific flow’s page by selecting the details button on the right of the flow.
- Dependencies: Shows the relationships between flows, such as dependencies through Subflows or Flow Triggers.
- KV Store: Allows management of key-value pairs associated with this namespace. More details on the KV Store can be found in the dedicated documentation.
Executions
An execution represents a single run of a flow in a specific state. This section covers how to run your flows and interpret the results.
Task Run
A task run is an individual execution of a task within a flow execution. Each task run is associated with:
- Execution ID
- State
- Start Date
- End Date
- Attempts
Most task runs have a single attempt, but you can configure retries. If retries are set up, a task failure will generate new attempts until reaching the retry maxAttempt or maxDuration threshold.
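As a sketch, a retry policy capped by those two thresholds might look like the following (the constant retry type and interval property are assumptions; check the retry documentation for the exact schema):
tasks:
  - id: flaky_request
    type: ai.mymagic.plugin.core.http.Request
    uri: https://example.com/api # placeholder endpoint
    retry:
      type: constant # assumed retry policy name
      interval: PT30S # wait 30 seconds between attempts
      maxAttempt: 3 # stop after 3 attempts...
      maxDuration: PT10M # ...or after 10 minutes, whichever comes first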
Outputs
Tasks can generate output data usable by other tasks in the current flow execution. These outputs can be variables or files stored in MyMagic’s internal storage. Output details are available in each task’s documentation and can be viewed in the Outputs tab of the Execution page.
Metrics
Tasks can expose metrics useful for understanding their internal workings, such as file size, number of returned rows, or query duration. Available metrics for a task type are listed in its documentation and can be viewed in the Metrics tab of the Executions page.
Here’s an example of a flow generating metrics:
id: data_load_to_bigquery
namespace: company.team
tasks:
- id: http_download
type: ai.mymagic.plugin.core.http.Download
uri: https://huggingface.co/datasets/mymagic/datasets/raw/main/csv/orders.csv
  - id: load_bigquery
type: ai.mymagic.plugin.gcp.bigquery.Load
description: Load data into BigQuery
autodetect: true
csvOptions:
fieldDelimiter: ","
destinationTable: mymagic-dev.demo.orders
format: CSV
from: "{{ outputs.http_download.uri }}"
Execution States
An Execution or a Task Run can be in various states:
State | Description |
---|---|
CREATED | Waiting to be processed, usually in a queue |
RUNNING | Currently being processed |
PAUSED | Paused for manual validation or a specified delay |
SUCCESS | Completed successfully |
WARNING | Exhibited unintended behavior but continued with a warning |
FAILED | Failed due to unintended behavior |
KILLING | In the process of being terminated |
KILLED | Terminated upon request |
RESTARTED | Transitive state for a previously failed, restarted flow |
CANCELLED | Aborted due to reaching a defined concurrency limit set to CANCEL |
QUEUED | On hold due to reaching a defined concurrency limit set to QUEUE |
RETRYING | Currently being retried |
RETRIED | Stopped due to unintended behavior and created a new execution based on flow-level retry policy |
Execution Expressions
You can use various execution expressions in your flow:
Parameter | Description |
---|---|
{{ execution.id }} | Unique ID for each execution |
{{ execution.startDate }} | Start date of the current execution |
{{ execution.originalId }} | Original execution ID, unchanged even after replays |
Executing a Flow
From the UI
Manually trigger a flow by clicking the Execute button on the flow’s page in the MyMagic UI.
Using Automatic Triggers
- Add a Schedule trigger for regular time-based executions
- Use a Flow trigger to launch based on another flow’s completion
- Implement a Webhook trigger for HTTP request-based executions
Via API Call
Trigger a flow execution by calling the API directly:
curl -X POST \
http://localhost:8080/api/v1/executions/company.team/hello_world
You can also execute a specific revision or include inputs:
curl -X POST \
http://localhost:8080/api/v1/executions/company.team/hello_world?revision=2 \
-F greeting="hey there"
Using Python
Use the mymagic Python package to execute flows:
from mymagic import Flow
flow = Flow()
flow.execute('company.team', 'hello_world', {'greeting': 'hello from Python'})
ForEachItem Execution
The ForEachItem task allows iteration over a list of items, running a subflow for each item or batch:
id: each_example
namespace: company.team
inputs:
  - id: file
    type: FILE
tasks:
- id: each
type: ai.mymagic.plugin.core.flow.ForEachItem
items: "{{ inputs.file }}"
inputs:
file: "{{ taskrun.items }}"
batch:
rows: 4
bytes: "1024"
partitions: 2
namespace: company.team
flowId: subflow
revision: 1
wait: true
transmitFailed: true
labels:
key: value
This powerful feature enables parallel processing of large datasets or API payloads efficiently.
Variables
Variables are key-value pairs that facilitate value reuse across tasks. They can also be stored at the namespace level for use across multiple flows within a given namespace.
Configuring Variables
Here’s an example of how to configure variables in your flow:
id: greetings
namespace: company.team
variables:
myvar: hello
numeric_variable: 42
tasks:
- id: log
type: ai.mymagic.plugin.core.debug.Return
format: "{{ vars.myvar }} world {{ vars.numeric_variable }}"
To use variables, employ the syntax {{ vars.variable_name }}.
Variable Rendering
Variables can be used in any task property documented as dynamic. They are rendered using the Pebble templating engine, which allows processing of various expressions with filters and functions. For more details on variable processing, refer to the Expressions documentation.
Variables are not rendered recursively.
Frequently Asked Questions
Escaping Pebble Syntax
To prevent a block from being parsed by Pebble, use the {% raw %} and {% endraw %} tags:
{% raw %}{{ myvar }}{% endraw %}
Resolution Order of Inputs and Variables
- Inputs are resolved first, even before execution starts.
- Triggers are handled similarly to inputs.
- Expressions are not rendered recursively; use the render() function when recursive rendering is needed.
You can use inputs within variables, but not vice versa. Trigger variables can be used within variables.
Examples
Using Inputs, Triggers, and Execution Variables
id: s3_upload
namespace: company.team
inputs:
- id: bucket
type: STRING
defaults: declarative-data-orchestration
tasks:
- id: get_zip_file
type: ai.mymagic.plugin.core.http.Download
uri: https://wri-dataportal-prod.s3.amazonaws.com/manual/global_power_plant_database_v_1_3.zip
- id: unzip
type: ai.mymagic.plugin.compress.ArchiveDecompress
algorithm: ZIP
from: "{{outputs.get_zip_file.uri}}"
- id: csv_upload
type: ai.mymagic.plugin.aws.s3.Upload
from: "{{ outputs.unzip.files['global_power_plant_database.csv'] }}"
bucket: "{{ inputs.bucket }}"
key: "powerplant/{{ trigger.date ?? execution.startDate | date('yyyy_MM_dd__HH_mm_ss') }}.csv"
triggers:
- id: hourly
type: ai.mymagic.plugin.core.trigger.Schedule
cron: "@hourly"
Conditional Branching Based on Input
id: conditional_branching
namespace: company.team
inputs:
- id: parameter
type: STRING
required: false
tasks:
- id: if
type: ai.mymagic.plugin.core.flow.If
condition: "{{inputs.customInput ?? false }}"
then:
- id: if_not_null
type: ai.mymagic.plugin.core.log.Log
message: Received input {{inputs.parameter}}
else:
- id: if_null
type: ai.mymagic.plugin.core.log.Log
message: No input provided
Using Trigger Variables Within Triggers
id: backfill_past_mondays
namespace: company.team
tasks:
- id: log_trigger_or_execution_date
type: ai.mymagic.plugin.core.log.Log
message: "{{ trigger.date ?? execution.startDate }}"
triggers:
- id: first_monday_of_the_month
type: ai.mymagic.plugin.core.trigger.Schedule
timezone: Europe/Berlin
backfill:
start: 2023-11-11T00:00:00Z
cron: "0 11 * * MON"
conditions:
- type: ai.mymagic.plugin.core.condition.DayWeekInMonthCondition
date: "{{ trigger.date }}"
dayOfWeek: "MONDAY"
dayInMonth: "FIRST"
Transforming Variables with Pebble Expressions
MyMagic uses Pebble Templates along with the execution context to render dynamic properties. This allows for the use of Pebble expressions to transform inputs and variables.
Example:
id: variables_demo
namespace: company.team
variables:
DATE_FORMAT: "yyyy-MM-dd"
tasks:
- id: seconds_of_day
type: ai.mymagic.plugin.core.debug.Return
format: '{{60 * 60 * 24}}'
- id: start_date
type: ai.mymagic.plugin.core.debug.Return
format: "{{ execution.startDate | date(vars.DATE_FORMAT) }}"
- id: curr_date_unix
type: ai.mymagic.plugin.core.debug.Return
format: "{{ now() | date(vars.DATE_FORMAT) | timestamp() }}"
- id: next_date
type: ai.mymagic.plugin.core.debug.Return
format: "{{ now() | dateAdd(1, 'DAYS') | date(vars.DATE_FORMAT) }}"
- id: next_date_unix
type: ai.mymagic.plugin.core.debug.Return
format: "{{ now() | dateAdd(1, 'DAYS') | date(vars.DATE_FORMAT) | timestamp() }}"
- id: pass_downstream
type: ai.mymagic.plugin.scripts.shell.Commands
taskRunner:
type: ai.mymagic.plugin.core.runner.Process
commands:
- echo {{outputs.next_date_unix.value}}
Using Nested Variables
Nested variables are supported, but may require wrapping the root variable in a json() function to access specific keys, depending on the task.
Example using a list of maps as a variable:
id: vars
namespace: company.myteam
variables:
servers:
- fqn: server01.mydomain.io
user: root
- fqn: server02.mydomain.io
user: guest
- fqn: server03.mydomain.io
user: rick
tasks:
- id: parallel
type: ai.mymagic.plugin.core.flow.EachParallel
value: "{{ vars.servers }}"
tasks:
- id: log
type: ai.mymagic.plugin.core.log.Log
message:
- "{{ taskrun.value }}"
- "{{ json(taskrun.value).fqn }}"
- "{{ json(taskrun.value).user }}"
Inputs
Inputs are dynamic values passed to a flow at runtime, allowing for parameterized executions.
Understanding Inputs
Flows can be parameterized using inputs, enabling multiple executions with different values. Input values are stored as variables within the flow execution context and can be accessed using the syntax {{ inputs.parameter_name }}
.
Inputs make tasks more dynamic, such as defining file paths for processing. Input values can be inspected in the Overview tab of the Execution page.
Declaring Inputs
Flows can have multiple inputs, either required or optional. For required inputs, it’s recommended to use the defaults property to set default values. Flows cannot start if required inputs are missing during execution creation.
All inputs are parsed during execution creation, with invalid inputs preventing the execution from starting. If an execution isn’t created due to invalid or missing inputs, it won’t appear in the execution list.
Example flow using various inputs:
id: inputs_demo
namespace: company.team
inputs:
- id: string_input
type: STRING
defaults: "Hello MyMagic!"
- id: optional_input
type: STRING
required: false
- id: int_input
type: INT
defaults: 100
- id: float_input
type: FLOAT
defaults: 100.12
- id: bool_input
type: BOOLEAN
defaults: true
- id: enum_input
type: ENUM
defaults: VALUE_1
values:
- VALUE_1
- VALUE_2
- VALUE_3
- id: datetime_input
type: DATETIME
defaults: "2013-08-09T14:19:00Z"
- id: file_input
type: FILE
- id: json_input
type: JSON
defaults: |
[{"name": "mymagic", "rating": "best in class"}]
- id: uri_input
type: URI
defaults: "https://example.com/datasets/orders.csv"
- id: secret_input
type: SECRET
- id: nested.string_input
type: STRING
defaults: "Hello Nested World!"
Note: FILE type inputs don’t currently support defaults.
Input Types and Validation
MyMagic inputs are strongly typed and validated before flow execution. Supported data types include STRING, INT, FLOAT, ENUM, BOOLEAN, DATETIME, DATE, TIME, DURATION, FILE, JSON, URI, SECRET, and ARRAY.
Additional validation rules can be applied to certain input types:
- STRING: Custom regex validation
- INT/FLOAT: Minimum and maximum value ranges
- DURATION/DATE/TIME/DATETIME: Minimum and maximum ranges
Example flow with input validation:
id: validated_inputs
namespace: company.team
inputs:
- id: age
type: INT
defaults: 42
required: false
min: 18
max: 64
- id: username
type: STRING
defaults: student
required: false
validator: ^student(\d+)?$
- id: duration
type: DURATION
min: "PT5M6S"
max: "PT12H58M46S"
tasks:
- id: validator
type: ai.mymagic.plugin.core.log.Log
message: User {{ inputs.username }}, age {{ inputs.age }}
Nested and Array Inputs
Nested inputs can be created using a dot (.) in the input name. Array inputs allow passing lists of values, with the itemType property specifying the type of array items.
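Both patterns already appear in earlier examples; as a compact reminder:
inputs:
  - id: nested.user # dot creates a nested input
    type: STRING
    defaults: "Guest"
  - id: categories # array input, each item validated as a STRING
    type: ARRAY
    itemType: STRING
    defaults: ["category 1", "category 2"]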
Using Input Values in Flows
Input values are accessible using {{ inputs.name }} or {{ inputs['name'] }}. For input IDs with special characters, use the bracket notation.
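For example, the user-name input from the greetings flow earlier contains a hyphen, so it must be read with brackets:
tasks:
  - id: greet
    type: ai.mymagic.plugin.core.log.Log
    message: "Hello, {{ inputs['user-name'] }}!"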
Setting Input Values
Input values can be set through the web UI, API calls, or programmatically in languages like Python or Java.
Example API call using curl:
curl -v "http://localhost:8080/api/v1/executions/example/mymagic-inputs" \
-H "Transfer-Encoding:chunked" \
-H "Content-Type:multipart/form-data" \
-F string="a string" \
-F int=1 \
-F float=1.255 \
-F instant="2023-12-24T23:00:00.000Z" \
-F "files=@/tmp/example.txt;filename=file"
Inputs vs Variables
While similar, inputs and variables serve different purposes:
- Variables are constant values defined before execution.
- Inputs can be set at execution time and are more suitable for changing values.
Dynamic Inputs
Inputs are not rendered recursively. To use dynamic expressions in inputs, use the STRING type and the {{ render() }} function:
id: dynamic_input_demo
namespace: company.team
inputs:
- id: date
type: STRING
defaults: "{{ now() }}"
tasks:
- id: print_date
type: ai.mymagic.plugin.core.log.Log
message: hello on {{ render(inputs.date) }}
This approach improves security by preventing arbitrary code execution within expressions.
Outputs
Outputs facilitate data transfer between tasks and flows within MyMagic.
Understanding Outputs
Workflow executions can generate outputs, which are stored in the flow’s execution context (in memory) and are accessible to all downstream tasks and flows. Outputs can have multiple attributes, as specified in each task’s documentation.
Important: Avoid using outputs for sensitive data (passwords, secrets, API tokens).
Using Outputs
Here’s an example of how to use the output of one task in another:
id: task_outputs_demo
namespace: company.team
tasks:
- id: produce_output
type: ai.mymagic.plugin.core.debug.Return
format: my output {{ execution.id }}
- id: use_output
type: ai.mymagic.plugin.core.log.Log
message: The previous task output is {{ outputs.produce_output.value }}
In this example, the first task produces an output, which is then used in the second task’s message property.
Internal Storage
MyMagic can store task data in its internal storage. If an output attribute is stored internally, it will contain a URI pointing to the stored file. Here’s an example:
id: output_sample
namespace: company.team
tasks:
- id: output_from_query
type: ai.mymagic.plugin.gcp.bigquery.Query
sql: |
SELECT * FROM `bigquery-public-data.wikipedia.pageviews_2023`
WHERE DATE(datehour) = current_date()
ORDER BY datehour desc, views desc
LIMIT 10
store: true
- id: write_to_csv
type: ai.mymagic.plugin.serdes.csv.IonToCsv
from: "{{ outputs.output_from_query.uri }}"
Dynamic Variables in Each Tasks
For dynamic flows using “Each” loops, you can access the current taskrun value with {{ taskrun.value }}:
id: taskrun_value_demo
namespace: company.team
tasks:
- id: each
type: ai.mymagic.plugin.core.flow.EachSequential
value: ["value 1", "value 2", "value 3"]
tasks:
- id: inner
type: ai.mymagic.plugin.core.debug.Return
format: "{{ task.id }} > {{ taskrun.value }} > {{ taskrun.startDate }}"
Looping Over JSON Objects
When looping over JSON objects, use the json function to access properties:
id: loop_json_objects
namespace: company.team
tasks:
- id: each
type: ai.mymagic.plugin.core.flow.EachSequential
value:
- {"key": "my-key", "value": "my-value"}
- {"key": "my-complex", "value": {"sub": 1, "bool": true}}
tasks:
- id: inner
type: ai.mymagic.plugin.core.debug.Return
format: "{{ json({taskrun.value).key }} > {{ json({taskrun.value).value }}"
Accessing Specific Outputs in Dynamic Tasks
For dynamic tasks like EachSequential, you can access specific iteration outputs:
id: dynamic_output_demo
namespace: company.team
tasks:
- id: each
type: ai.mymagic.plugin.core.flow.EachSequential
value: ["s1", "s2", "s3"]
tasks:
- id: sub
type: ai.mymagic.plugin.core.debug.Return
format: "{{ task.id }} > {{ taskrun.value }} > {{ taskrun.startDate }}"
- id: use
type: ai.mymagic.plugin.core.debug.Return
format: "Previous task produced output: {{ outputs.sub.s1.value }}"
Sibling Task Outputs
Accessing outputs from sibling tasks depends on whether the task tree is static or dynamic:
id: sibling_outputs_demo
namespace: company.team
tasks:
- id: each
type: ai.mymagic.plugin.core.flow.EachSequential
value: ["value 1", "value 2", "value 3"]
tasks:
- id: first
type: ai.mymagic.plugin.core.debug.Return
format: "{{ task.id }}"
- id: second
type: ai.mymagic.plugin.core.debug.Return
format: "{{ outputs.first[taskrun.value].value }}"
- id: end
type: ai.mymagic.plugin.core.debug.Return
format: "{{ task.id }} > {{ outputs.second['value 1'].value }}"
For complex nested structures, use the currentEachOutput function.
Flow Outputs
Flows can produce strongly typed outputs:
id: flow_outputs_demo
namespace: company.team
tasks:
- id: mytask
type: ai.mymagic.plugin.core.debug.Return
format: this is a task output used as a final flow output
outputs:
- id: final
type: STRING
value: "{{ outputs.mytask.value }}"
Accessing flow outputs in a parent flow:
id: parent_flow_demo
namespace: company.team
tasks:
- id: subflow
type: ai.mymagic.plugin.core.flow.Subflow
flowId: flow_outputs_demo
namespace: company.team
wait: true
- id: log_subflow_output
type: ai.mymagic.plugin.core.log.Log
message: "{{ outputs.subflow.outputs.final }}"
Output Preview and Debugging
MyMagic provides preview options for output files stored in its internal storage. You can view file contents in a tabular format without downloading.
The Debug Outputs functionality in the Outputs tab allows you to evaluate outputs further using Pebble expressions. For example:
id: json_values_demo
namespace: company.team
tasks:
- id: sample_json
type: ai.mymagic.plugin.core.debug.Return
format: '{"data": [1, 2, 3]}'
You can then use the Debug Outputs feature to analyze this JSON data with various expressions.
Triggers
Schedule Type
The Schedule trigger in MyMagic generates new executions on a regular cadence based on Cron expressions or custom scheduling conditions.
type: "ai.mymagic.plugin.core.trigger.Schedule"
MyMagic can trigger flows based on a Schedule (time-based). This is useful when you need to run flows at specific intervals or when you can’t use event-driven mechanisms.
MyMagic can optionally handle schedule backfills for missed executions. Refer to the Schedule task documentation for a complete list of task properties and outputs.
Example: Quarter-Hourly Schedule
A schedule that runs every quarter of an hour:
triggers:
- id: schedule
type: ai.mymagic.plugin.core.trigger.Schedule
cron: "*/15 * * * *"
Example: Monthly Schedule
A schedule that runs only the first Monday of every month at 11 AM:
triggers:
- id: schedule
type: ai.mymagic.plugin.core.trigger.Schedule
cron: "0 11 * * 1"
conditions:
- type: ai.mymagic.plugin.core.condition.DayWeekInMonthCondition
date: "{{ trigger.date }}"
dayOfWeek: "MONDAY"
dayInMonth: "FIRST"
Example: Daily Schedule in Specific Timezone
A schedule that runs daily at midnight US Eastern time:
triggers:
- id: daily
type: ai.mymagic.plugin.core.trigger.Schedule
cron: "@daily"
timezone: America/New_York
Important Notes on Scheduling
- Schedules cannot overlap. If a previous schedule is still running when the next one is due to start, MyMagic will wait for the previous one to finish.
- To make your flow executable both via schedule and manually, use this expression: {{ trigger.date ?? execution.startDate | date("yyyy-MM-dd") }}.
Schedule Conditions
When cron expressions are insufficient, you can use conditions for more complex scheduling logic. Use the {{ trigger.date }} expression on the date property of the current schedule.
Available core conditions include:
- DateTimeBetweenCondition
- DayWeekCondition
- DayWeekInMonthCondition
- NotCondition
- OrCondition
- WeekendCondition
- PublicHolidayCondition
- TimeBetweenCondition
Example: Conditional Schedule
id: conditions_demo
namespace: company.team
tasks:
- id: hello
type: ai.mymagic.plugin.core.log.Log
message: This will execute only on Thursday!
triggers:
- id: schedule
type: ai.mymagic.plugin.core.trigger.Schedule
cron: "@hourly"
conditions:
- type: ai.mymagic.plugin.core.condition.DayWeekCondition
dayOfWeek: "THURSDAY"
Recovering Missed Schedules
Automatic Recovery
MyMagic can automatically recover missed schedules. This behavior can be configured globally or per-flow.
Global configuration:
mymagic:
plugins:
configurations:
- type: ai.mymagic.plugin.core.trigger.Schedule
values:
recoverMissedSchedules: NONE # Options: ALL | NONE | LAST
Per-flow configuration:
triggers:
- id: schedule
type: ai.mymagic.plugin.core.trigger.Schedule
cron: "*/15 * * * *"
recoverMissedSchedules: NONE
Using Backfill
Backfills allow you to replay missed schedule intervals between defined start and end dates. Access this feature from the Triggers tab on the Flow’s detail page.
Disabling Triggers
You can disable a trigger by adding disabled: true to your YAML or by toggling it off on the Triggers page. This is useful when you need time to decide on scheduling changes before the next run.
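For example, a daily schedule that is temporarily paused:
triggers:
  - id: daily
    type: ai.mymagic.plugin.core.trigger.Schedule
    cron: "@daily"
    disabled: true # re-enable once the new schedule is decided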
Flow Type
Flow triggers in MyMagic allow you to initiate a flow after the execution of another flow, enabling event-driven patterns.
type: "ai.mymagic.plugin.core.trigger.Flow"
MyMagic can trigger one flow after another, allowing for the chaining of flows without modifying the base flows. This feature enables breaking down responsibilities between different flows and teams.
For a comprehensive list of all properties, refer to the Flow trigger documentation.
Conditions
You can specify conditions to determine when your Flow should be executed. In addition to the core trigger conditions, you can use the following:
- ExecutionFlowCondition
- ExecutionNamespaceCondition
- ExecutionLabelsCondition
- ExecutionStatusCondition
- ExecutionOutputsCondition
- ExpressionCondition
Example: Basic Flow Trigger
This flow will be triggered after each successful execution of the flow trigger_flow in the company.team namespace, forwarding the uri output of its myTask task.
id: trigger_flow_listener
namespace: company.team
inputs:
- id: fromParent
type: STRING
tasks:
- id: onlyNoInput
type: ai.mymagic.plugin.core.debug.Return
format: "v1: {{ trigger.executionId }}"
triggers:
- id: listenFlow
type: ai.mymagic.plugin.core.trigger.Flow
inputs:
fromParent: '{{ outputs.myTask.uri }}'
conditions:
- type: ai.mymagic.plugin.core.condition.ExecutionFlowCondition
namespace: company.team
flowId: trigger_flow
- type: ai.mymagic.plugin.core.condition.ExecutionStatusCondition
in:
- SUCCESS
Parent flow:
id: trigger_flow
namespace: company.team
tasks:
- id: myTask
type: ai.mymagic.plugin.core.http.Download
uri: https://dummyjson.com/products
Example: Multiple Condition Trigger
This flow will be triggered after the successful execution of both flows flow-a and flow-b during the current day. When the conditions are met, the counter is reset and can be re-triggered during the same day.
id: trigger-multiplecondition-listener
namespace: company.team
tasks:
- id: onlyListener
type: ai.mymagic.plugin.core.debug.Return
format: "let's go "
triggers:
- id: multipleListenFlow
type: ai.mymagic.plugin.core.trigger.Flow
conditions:
- id: multiple
type: ai.mymagic.plugin.core.condition.MultipleCondition
window: P1D
windowAdvance: P0D
conditions:
flow-a:
type: ai.mymagic.plugin.core.condition.ExecutionFlowCondition
namespace: company.team
flowId: trigger-multiplecondition-flow-a
flow-b:
type: ai.mymagic.plugin.core.condition.ExecutionFlowCondition
namespace: company.team
flowId: trigger-multiplecondition-flow-b
To trigger trigger-multiplecondition-listener, simply execute these two flows:
id: trigger-multiplecondition-flow-a
namespace: company.team
tasks:
- id: hello
type: ai.mymagic.plugin.core.log.Log
message: Trigger A
id: trigger-multiplecondition-flow-b
namespace: company.team
tasks:
- id: hello
type: ai.mymagic.plugin.core.log.Log
message: Trigger B
These examples demonstrate how to use Flow triggers in MyMagic to create complex, event-driven workflows that respond to the execution of other flows.
Webhook Type
Webhook triggers in MyMagic generate a unique URL that you can use to automatically create new executions based on events in external applications such as GitHub or Amazon EventBridge.
type: "ai.mymagic.plugin.core.trigger.Webhook"
How Webhook Triggers Work
A Webhook trigger allows you to initiate a flow from a webhook URL. When creating a trigger, you must set a secret key that will be used to secure your webhook URL. This key is incorporated into the URL that triggers the flow:
/api/v1/executions/webhook/{namespace}/{flowId}/{key}
It’s recommended to use a complex, generated sequence of characters for the key to enhance security. MyMagic accepts GET, POST, and PUT requests on this URL. The entire request body and headers will be available as variables within your flow.
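The example below shows the trigger setup; to consume the event data, a task could reference the payload like this sketch (assuming the body and headers are exposed on the trigger variable, mirroring how polling triggers expose their outputs):
tasks:
  - id: log_payload
    type: ai.mymagic.plugin.core.log.Log
    message:
      - "body: {{ trigger.body }}" # assumed variable for the request body
      - "headers: {{ trigger.headers }}" # assumed variable for the request headers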
Example Webhook Trigger
Here’s an example of how to set up a Webhook trigger in your flow:
id: trigger
namespace: company.team
tasks:
- id: hello
type: ai.mymagic.plugin.core.log.Log
message: "Hello World! 🚀"
triggers:
- id: webhook
type: ai.mymagic.plugin.core.trigger.Webhook
key: 4wjtkzwVGBM9yKnjm3yv8r
Executing the Webhook-Triggered Flow
After the trigger is created, you can execute the flow using the following URL structure:
https://{mymagic_domain}/api/v1/executions/webhook/{namespace}/{flowId}/4wjtkzwVGBM9yKnjm3yv8r
Make sure to replace mymagic_domain, namespace, and flowId with your specific values.
Additional Information
For a comprehensive list of task properties and outputs related to Webhook triggers, please refer to the Webhook task documentation in MyMagic.
Security Considerations
When using Webhook triggers, keep the following security considerations in mind:
- Use a complex, randomly generated key to secure your webhook URL.
- Keep your webhook URL and key confidential, sharing them only with trusted systems and personnel.
- Consider implementing additional authentication mechanisms if your use case requires heightened security.
- Regularly rotate your webhook keys as part of your security best practices.
Use Cases
Webhook triggers are particularly useful for:
- Integrating with external services that support webhooks (e.g., GitHub, GitLab, Stripe).
- Creating event-driven workflows that respond to real-time events.
- Allowing external systems to initiate processes within your MyMagic workflows.
- Building automated pipelines that react to changes in other parts of your infrastructure.
By leveraging Webhook triggers, you can create highly responsive and interconnected workflows in MyMagic, enhancing your ability to automate complex processes across your entire technology stack.
Polling Type
Polling triggers in MyMagic are a type of trigger provided by plugins. They allow for the periodic checking of external systems for the presence of data. When data is ready to be processed, a flow execution is initiated.
Overview of Polling Triggers
MyMagic offers polling triggers for a wide variety of external systems, including:
- Databases
- Message brokers
- FTP servers
- And many more…
These triggers poll the external system at a fixed interval defined by the interval property. When triggered, the flow will have access to the outputs of the polling trigger via the trigger variable.
Example: PostgreSQL Polling Trigger
The following example demonstrates a flow that is triggered when rows are available in the my_table PostgreSQL table. When triggered, it deletes the rows (to avoid reprocessing) and logs them.
id: jdbc-trigger
namespace: company.team
inputs:
- id: db_url
type: STRING
tasks:
- id: update
type: ai.mymagic.plugin.jdbc.postgresql.Query
url: "{{ inputs.db_url }}"
    sql: DELETE FROM my_table
- id: log
type: ai.mymagic.plugin.core.log.Log
message: "{{ trigger.rows }}"
triggers:
- id: watch
type: ai.mymagic.plugin.jdbc.postgresql.Trigger
url: myurl
interval: "PT5M"
sql: "SELECT * FROM my_table"
Key Components of the Example
- Trigger Definition:
  - Type: ai.mymagic.plugin.jdbc.postgresql.Trigger
  - Interval: Set to "PT5M" (poll every 5 minutes)
  - SQL: Selects all rows from my_table
- Tasks:
  - Update task: Deletes processed rows from the table
  - Log task: Logs the rows that were retrieved by the trigger
- Data Flow:
  - The trigger polls the database every 5 minutes
  - If rows are found, a flow execution is started
  - The flow deletes the rows and logs their content
Benefits of Polling Triggers
- Flexibility: Can integrate with various external systems
- Reliability: Ensures data is processed even if it arrives when MyMagic is offline
- Customization: Interval and query can be tailored to specific needs
- Decoupling: Allows for separation between data production and processing
Considerations
- Choose an appropriate polling interval to balance responsiveness and system load
- Ensure your processing logic handles duplicate data in case of failures
- Monitor the performance impact on the polled system, especially for frequent polls
Best Practices
- Use idempotent operations in your flow to handle potential reprocessing
- Implement error handling to manage issues with the external system
- Consider using transactional operations when removing or marking processed data
- Monitor the latency between data availability and processing to optimize your polling interval
By leveraging polling triggers, MyMagic enables you to create robust, event-driven workflows that can reliably process data from a wide range of external systems.
Realtime Type
While MyMagic triggers typically poll external systems for new events at regular intervals, business-critical workflows often require immediate reactions to events. This is where Realtime Triggers come into play, offering millisecond-level latency.
What are Realtime Triggers?
Realtime triggers in MyMagic listen to events in real time and instantaneously start a workflow execution when:
- A new message is published to a Kafka topic
- A new message is published to a Pulsar topic
- A new message is published to an AMQP queue
- A new message is published to an MQTT queue
- A new message is published to an AWS SQS queue
- A new message is published to Google Pub/Sub
- A new message is published to Azure Event Hubs
- A new message is published to a NATS subject
- A new item is added to a Redis list
- A new row is added, modified or deleted in Postgres, MySQL, or SQL Server
How Realtime Triggers Work
When you add a Realtime Trigger to your workflow, MyMagic initiates an always-on thread that continuously listens to the external system for new events. As soon as a new event occurs, MyMagic starts a workflow execution to process it.
Use Cases
Realtime Triggers enable the orchestration of business-critical processes and microservices in real time, suitable for scenarios such as:
- Fraud and anomaly detection
- Order processing
- Realtime predictions or recommendations
- Reacting to stock price changes
- Shipping and delivery notifications
- Change Data Capture for data orchestration
Triggers vs. Realtime Triggers: A Comparison
Criteria | Trigger | Realtime Trigger |
---|---|---|
Implementation | Micro-batch | Realtime |
Event Processing | Batch-process events until poll interval elapses | Process each event immediately |
Latency | Second(s) or minute(s) | Millisecond(s) |
Execution Model | Each execution processes one or many events | Each execution processes exactly one event |
Data Handling | Store all received events in a file | Store each event in raw format |
Output format | URI of a file in internal storage | Raw data of the event payload and related metadata |
Application | Data applications processing data in batch | Business-critical operations reacting to events in real time |
Use cases | Data orchestration for analytics and building data products | Process and microservice orchestration |
How to Use Realtime Triggers
To implement a Realtime Trigger, simply choose the RealtimeTrigger as the trigger type for your desired service. Here’s an example using the RealtimeTrigger to listen for new messages in an AWS SQS queue:
id: sqs
namespace: company.team
tasks:
- id: log
type: ai.mymagic.plugin.core.log.Log
message: "{{ trigger }}"
triggers:
- id: realtime_trigger
type: ai.mymagic.plugin.aws.sqs.RealtimeTrigger
region: eu-north-1
accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID')}}"
secretKeyId: "{{ secret('AWS_SECRET_ACCESS_KEY') }}"
queueUrl: https://sqs.eu-north-1.amazonaws.com/123456789/MyQueue
Comparison with Realtime Data Processing Engines
It’s important to note that MyMagic’s Realtime Triggers are not intended to replace real-time data processing engines like Apache Flink, Apache Beam, or Google Dataflow. These engines excel at stateful streaming applications and complex SQL transformations over real-time data streams.
In contrast, MyMagic’s Realtime Triggers are stateless, triggering one workflow execution per event. They are primarily designed to react to events in real time for orchestrating business-critical processes.
Best Practices
- Use Realtime Triggers for scenarios where immediate reaction to events is crucial.
- Ensure your workflows are designed to handle the potential high frequency of executions.
- Implement proper error handling and retry mechanisms in your workflows.
- Monitor the performance and resource usage of your Realtime Trigger workflows.
- Consider using batching techniques within your workflows if processing each event individually becomes too resource-intensive.
By leveraging Realtime Triggers, MyMagic enables you to create highly responsive, event-driven workflows that can handle critical business processes with minimal latency.
Labels
Labels are key-value pairs used to organize flows and executions in MyMagic. They provide a flexible way to categorize and search for your workflows based on various criteria such as project, maintainer, or any other relevant dimension.
The Purpose of Labels
Labels serve to organize and filter flows and their executions. By adding labels to your flows, you can sort executions across multiple dimensions.
Here’s a simple example of a flow with labels:
id: flow_with_labels
namespace: company.team
labels:
  song: never_gonna_give_you_up
  artist: rick_astley
tasks:
  - id: hello
    type: ai.mymagic.plugin.core.log.Log
    message: hello from a flow with labels
Benefits of Labels
Labels offer several advantages in managing your workflows:
- Observability: Labels set during execution aid in monitoring and troubleshooting.
- Filtering: Labels facilitate finding specific executions, useful for tracking ML experiments, API responses, or labeling executions based on runtime-specific flow inputs.
- Organization: Labels help manage workflow executions at scale, especially in complex environments. Custom dashboards can be created based on labels, e.g., `http://localhost:8080/ui/executions?labels=team:finance`.
Execution Labels
Propagation from Flow Labels
When you execute a flow with labels, these labels are automatically propagated to the created executions.
Setting Labels from the UI
- When manually executing flows, you can override and define new labels in the Advanced configuration section.
- Labels can be set from the UI after execution completion, useful for collaboration and troubleshooting.
- Labels can be set for multiple executions simultaneously, helpful for bulk operations.
Dynamic Label Setting
Labels can be set dynamically using a dedicated Labels task. This feature enhances observability, debugging, and failure monitoring.
Using a Map (Key-Value Pairs)
Ideal when the key is static and the value is dynamic:
id: labels_override
namespace: company.team
labels:
  song: never_gonna_give_you_up
tasks:
  - id: get
    type: ai.mymagic.plugin.core.debug.Return
    format: never_gonna_stop
  - id: update_labels
    type: ai.mymagic.plugin.core.execution.Labels
    labels:
      song: "{{ outputs.get.value }}"
      artist: rick_astley # new label
Using a List of Key-Value Pairs
Useful when both key and value are dynamic:
id: labels
namespace: company.team
inputs:
  - id: user
    type: STRING
    defaults: Rick Astley
  - id: url
    type: STRING
    defaults: song_url
tasks:
  - id: update_labels_with_map
    type: ai.mymagic.plugin.core.execution.Labels
    labels:
      customerId: "{{ inputs.user }}"
  - id: get
    type: ai.mymagic.plugin.core.debug.Return
    format: https://t.ly/Vemr0
  - id: update_labels_with_list
    type: ai.mymagic.plugin.core.execution.Labels
    labels:
      - key: "{{ inputs.url }}"
        value: "{{ outputs.get.value }}"
Overriding Flow Labels at Runtime
You can set default labels at the flow level and override them dynamically during execution:
id: flow_with_labels
namespace: company.team
labels:
  song: never_gonna_give_you_up
  artist: rick-astley
  genre: pop
tasks:
  - id: get
    type: ai.mymagic.plugin.core.debug.Return
    format: never_gonna_stop
  - id: update-list
    type: ai.mymagic.plugin.core.execution.Labels
    labels:
      song: "{{ outputs.get.value }}"
In this example, the default `song` label is overridden by the output of the `get` task.
Best Practices
- Use consistent naming conventions for your labels.
- Avoid creating too many unique labels, which can lead to clutter.
- Regularly review and clean up unused labels.
- Use labels in combination with MyMagic’s search functionality for powerful filtering.
- Document the meaning and intended use of labels within your team.
By effectively using labels, you can significantly enhance the organization, searchability, and manageability of your MyMagic workflows and executions.
Plugin Defaults
Plugin defaults are a list of default values applied to each task of a certain type within your flow(s). They function similarly to default function arguments, helping to avoid repetition when a given task or plugin is often called with the same values.
Plugin Defaults at the Flow Level
You can add plugin defaults to avoid repeating task properties on multiple occurrences of the same task using the `pluginDefaults` property. Here's an example:
id: api_python_sql
namespace: company.team
tasks:
  - id: api
    type: ai.mymagic.plugin.core.http.Request
    uri: https://dummyjson.com/products
  - id: hello
    type: ai.mymagic.plugin.scripts.python.Script
    script: |
      print("Hello World!")
  - id: python
    type: ai.mymagic.plugin.scripts.python.Script
    beforeCommands:
      - pip install polars
    warningOnStdErr: false
    outputFiles:
      - "products.csv"
    script: |
      import polars as pl
      data = {{outputs.api.body | jq('.products') | first}}
      df = pl.from_dicts(data)
      df.glimpse()
      df.select(["brand", "price"]).write_csv("products.csv")
  - id: sql_query
    type: ai.mymagic.plugin.jdbc.duckdb.Query
    inputFiles:
      in.csv: "{{ outputs.python.outputFiles['products.csv'] }}"
    sql: |
      SELECT brand, round(avg(price), 2) as avg_price
      FROM read_csv_auto('{{workingDir}}/in.csv', header=True)
      GROUP BY brand
      ORDER BY avg_price DESC;
    store: true
pluginDefaults:
  - type: ai.mymagic.plugin.scripts.python.Script
    values:
      taskRunner:
        type: ai.mymagic.plugin.scripts.runner.docker.Docker
        pullPolicy: ALWAYS # set it to NEVER to use a local image
      containerImage: python:slim
In this example, the Docker and Python configurations are set within the `pluginDefaults` property, streamlining the configuration process and reducing the chance of errors caused by inconsistent settings across different tasks.
Note: When moving required task attributes into the `pluginDefaults` property, the code editor within the UI may show warnings about missing required arguments. These warnings can be ignored as long as `pluginDefaults` contains the relevant arguments, as they are resolved at runtime.
Plugin Defaults in Global Configuration
You can set plugin defaults in your global MyMagic configuration to apply the same defaults across multiple flows. For example, to centrally manage default values for the `ai.mymagic.plugin.aws` plugin:
mymagic:
  plugins:
    defaults:
      - type: ai.mymagic.plugin.aws
        values:
          accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
          secretKeyId: "{{ secret('AWS_SECRET_ACCESS_KEY') }}"
          region: "us-east-1"
To set defaults for a specific task:
mymagic:
  plugins:
    defaults:
      - type: ai.mymagic.plugin.aws.s3.Upload
        values:
          accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
          secretKeyId: "{{ secret('AWS_SECRET_ACCESS_KEY') }}"
          region: "us-east-1"
Best Practices
- Use plugin defaults for configurations that are consistent across multiple tasks or flows.
- Keep sensitive information like credentials in secrets and reference them in plugin defaults.
- Regularly review and update plugin defaults to ensure they align with your current requirements.
- Document your use of plugin defaults to help team members understand the configuration.
- Be cautious when overriding plugin defaults in individual tasks to maintain consistency.
By effectively using plugin defaults, you can significantly reduce configuration overhead and improve the maintainability of your MyMagic workflows.
Subflows
Subflows allow you to build modular and reusable workflow components. They function similarly to calling functions, where a subflow execution is created when you call a flow from another flow.
Why Use Subflows?
Subflows enable the creation of modular, reusable components that can be utilized across multiple flows. For instance, you might have a subflow dedicated to alerting errors to Slack and email. By using a subflow, you can reuse these tasks together for all flows that require error notifications, eliminating the need to copy individual tasks for every flow.
Declaring a Subflow
To call a flow from another flow, use the `ai.mymagic.plugin.core.flow.Subflow` task. In this task, specify the `flowId` and `namespace` of the subflow you want to execute. Optionally, you can specify custom input values, similar to passing arguments in a function call.
The optional properties `wait` and `transmitFailed` control the execution behavior. By default, if `wait` is not set or set to false, the parent flow continues execution without waiting for the subflow's completion. The `transmitFailed` property determines whether a failure in the subflow execution should cause the parent flow to fail.
Practical Example
Consider a subflow that encapsulates critical business logic:
id: critical_service
namespace: company.team
tasks:
  - id: return_data
    type: ai.mymagic.plugin.jdbc.duckdb.Query
    sql: |
      INSTALL httpfs;
      LOAD httpfs;
      SELECT sum(total) as total, avg(quantity) as avg_quantity
      FROM read_csv_auto('https://huggingface.co/datasets/mymagic/datasets/raw/main/csv/orders.csv', header=True);
    store: true
outputs:
  - id: some_output
    type: STRING
    value: "{{ outputs.return_data.uri }}"
This subflow can be called from a parent flow:
id: parent_service
namespace: company.team
tasks:
  - id: subflow_call
    type: ai.mymagic.plugin.core.flow.Subflow
    namespace: company.team
    flowId: critical_service
    wait: true
    transmitFailed: true
  - id: log_subflow_output
    type: ai.mymagic.plugin.scripts.shell.Commands
    taskRunner:
      type: ai.mymagic.plugin.core.runner.Process
    commands:
      - cat "{{ outputs.subflow_call.outputs.some_output }}"
Subflow Properties
The `ai.mymagic.plugin.core.flow.Subflow` task has several properties, including:
- `flowId`: The subflow's identifier.
- `namespace`: The namespace where the subflow is located.
- `inheritLabels`: Determines whether the subflow inherits labels from the parent (default: false).
- `inputs`: Inputs passed to the subflow.
- `labels`: Labels assigned to the subflow.
- `revision`: The subflow revision to execute (defaults to the latest).
- `wait`: If true, the parent flow waits for subflow completion (default: false).
- `transmitFailed`: If true, the parent flow fails on subflow failure (requires `wait` to be true).
Passing Data Between Parent and Child Flows
Flows can emit outputs that can be accessed by the parent flow. For example:
id: flow_outputs
namespace: company.team
tasks:
  - id: mytask
    type: ai.mymagic.plugin.core.debug.Return
    format: this is a task output used as a final flow output
outputs:
  - id: final
    type: STRING
    value: "{{ outputs.mytask.value }}"
These outputs can be accessed from a parent task:
id: parent_flow
namespace: company.team
tasks:
  - id: subflow
    type: ai.mymagic.plugin.core.flow.Subflow
    flowId: flow_outputs
    namespace: company.team
    wait: true
  - id: log_subflow_output
    type: ai.mymagic.plugin.core.log.Log
    message: "{{ outputs.subflow.outputs.final }}"
Passing Inputs to a Subflow
You can pass inputs to a Subflow task. Here’s an example:
Subflow:
id: subflow_example
namespace: company.team
inputs:
  - id: http_uri
    type: STRING
tasks:
  - id: download
    type: ai.mymagic.plugin.core.http.Request
    uri: "{{ inputs.http_uri }}"
  - id: log
    type: ai.mymagic.plugin.core.log.Log
    message: "{{ outputs.download.body }}"
outputs:
  - id: data
    type: STRING
    value: "{{ outputs.download.body }}"
Parent flow:
id: inputs_subflow
namespace: company.team
inputs:
  - id: url
    type: STRING
tasks:
  - id: subflow
    type: ai.mymagic.plugin.core.flow.Subflow
    flowId: subflow_example
    namespace: company.team
    inputs:
      http_uri: "{{ inputs.url }}"
    wait: true
  - id: hello
    type: ai.mymagic.plugin.core.log.Log
    message: "{{ outputs.subflow.outputs.data }}"
Nested Inputs
You can pass nested inputs to subflows, which is useful for handling complex data structures:
id: extract_json
namespace: company.team
tasks:
  - id: api
    type: ai.mymagic.plugin.core.http.Request
    uri: https://dummyjson.com/users
  - id: subflow
    type: ai.mymagic.plugin.core.flow.Subflow
    namespace: company.team
    flowId: subflow
    inputs:
      users.firstName: "{{ outputs.api.body | jq('.users') | first | first | jq('.firstName') | first }}"
      users.lastName: "{{ outputs.api.body | jq('.users') | first | first | jq('.lastName') | first }}"
    wait: true
    transmitFailed: true
The subflow can then use these nested inputs:
id: subflow
namespace: company.team
inputs:
  - id: users.firstName
    type: STRING
    defaults: Rick
  - id: users.lastName
    type: STRING
    defaults: Astley
tasks:
  - id: process_user_data
    type: ai.mymagic.plugin.core.log.Log
    message: hello {{ inputs.users }}
By leveraging subflows, you can create more maintainable, modular, and reusable workflows in MyMagic.
Errors (Coming Soon)
Retries
Retries are a mechanism for handling transient failures in your workflows. They are defined at the task level and can be configured with a maximum number of attempts and a delay between each retry.
Understanding Retries
MyMagic provides task retry functionality, allowing you to add retry behavior for any failed task run based on configurations in the flow description. A retry on a task run will create a new task run attempt.
Example
Here's an example that defines a retry for the `retry_sample` task with a maximum of 5 attempts every 15 minutes:
- id: retry_sample
  type: ai.mymagic.plugin.core.log.Log
  message: my output for task {{task.id}}
  timeout: PT10M
  retry:
    type: constant
    maxAttempt: 5
    interval: PT15M
In this next example, the task will be retried up to four times, every 250 milliseconds (PT0.25S), with the total time spent retrying capped at 1 minute by `maxDuration`:
id: retry
namespace: company.team
description: This flow will retry 4 times and will succeed at the 5th attempt
tasks:
  - id: failed
    type: ai.mymagic.plugin.scripts.shell.Commands
    taskRunner:
      type: ai.mymagic.plugin.core.runner.Process
    commands:
      - 'if [ "{{taskrun.attemptsCount}}" -eq 4 ]; then exit 0; else exit 1; fi'
    retry:
      type: constant
      interval: PT0.25S
      maxAttempt: 5
      maxDuration: PT1M
      warningOnRetry: true
errors:
  - id: never-happen
    type: ai.mymagic.plugin.core.debug.Return
    format: Never happened {{task.id}}
Retry Options
All retry types share these common options:
- `type`: Retry behavior to apply (constant, exponential, or random)
- `maxAttempt`: Number of retries before stopping
- `maxDuration`: Maximum overall duration for retries before stopping
- `warningOnRetry`: Flags the execution as WARNING if any retry occurred
Retry Types
- Constant: retries at a constant interval between attempts
- Exponential: waits increasingly longer between each retry (see the sketch after this list)
- Random: retries with a random delay between minimum and maximum limits
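Here's a sketch of an exponential retry policy. The `maxInterval` and `delayFactor` property names are illustrative assumptions; check the plugin reference for the exact names in your version:
- id: flaky_api_call
  type: ai.mymagic.plugin.core.http.Request
  uri: https://dummyjson.com/products
  retry:
    type: exponential
    interval: PT1S # wait 1 second before the first retry
    maxInterval: PT1M # assumed property: cap the wait between attempts
    delayFactor: 2 # assumed property: double the wait after each attempt
    maxAttempt: 5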
Configuring Retries Globally
You can configure retries globally for all tasks in a flow:
mymagic:
  plugins:
    configurations:
      - type: ai.mymagic
        values:
          retry:
            type: constant
            maxAttempt: 3
            interval: PT30S
Flow-level Retries
You can set a flow-level retry policy to restart the execution if any task fails:
id: flow_level_retry
namespace: company.team
retry:
  maxAttempt: 3
  behavior: CREATE_NEW_EXECUTION # or RETRY_FAILED_TASK
  type: constant
  interval: PT1S
tasks:
  - id: fail_1
    type: ai.mymagic.plugin.core.execution.Fail
    allowFailure: true
  - id: fail_2
    type: ai.mymagic.plugin.core.execution.Fail
    allowFailure: false
Retry vs. Restart vs. Replay
Automatic vs. Manual
Retries are automatic, while Restart and Replay are manual operations initiated from the UI.
Restart vs. Replay
- Restart: Reruns failed tasks within the current Execution
- Replay: Creates a new Execution with a different ID
Summary: Retries vs. Restart vs. Replay
Concept | Flow or task level | Automatic or manual | Creates new execution? |
---|---|---|---|
Retry | Task level | Automatic | No |
Restart | Flow level | Manual | No |
Replay | Either | Manual | Yes |
Best Practices
- Use retries for transient failures that might resolve on their own.
- Configure appropriate intervals and max attempts based on the task’s nature.
- Use flow-level retries for scenarios where the entire flow should be retried.
- Utilize Restart for manual intervention after unexpected failures.
- Use Replay when you need to create a new execution from a specific point.
By effectively using retries, restarts, and replays, you can create more resilient and fault-tolerant workflows in MyMagic.
Timeouts
Timeout allows you to set a maximum duration for a task run.
What is Timeout
If a task run exceeds the specified duration, MyMagic will automatically stop the task run and mark it as failed. This is particularly useful for tasks that may hang and run indefinitely.
Timeout is often used as a cost control mechanism for cloud-based workflows. For instance, a Snowflake query or an AWS Batch job running for hours could lead to unexpected costs. By setting a timeout, you can ensure that the task run will not exceed a certain duration.
Format
Similar to durations in retries, timeouts use the ISO 8601 Durations format. Here are some examples:
Name | Description |
---|---|
PT0.250S | 250 milliseconds delay |
PT2S | 2 seconds delay |
PT1M | 1 minute delay |
PT3.5H | 3.5 hours (3 hours and 30 minutes) delay |
Example
In this example, the `costly_query` task will sleep for 10 seconds, but the timeout is set to 5 seconds, leading to a failed task run:
id: timeout
namespace: company.team
description: This flow will always fail because of a timeout.
tasks:
  - id: costly_query
    type: ai.mymagic.plugin.scripts.shell.Commands
    taskRunner:
      type: ai.mymagic.plugin.core.runner.Process
    commands:
      - sleep 10
    timeout: PT5S
Best Practices
- Set reasonable timeouts based on the expected duration of your tasks.
- Use timeouts for tasks that interact with external services or resources to prevent hanging in case of connectivity issues.
- Consider setting different timeouts for development and production environments.
- Regularly review and adjust your timeouts based on task performance and changing requirements.
- Use timeouts in conjunction with retries for more robust error handling, as sketched below.
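For example, a task calling an external service can combine both properties; this sketch uses only properties shown earlier in this documentation:
- id: resilient_call
  type: ai.mymagic.plugin.scripts.shell.Commands
  taskRunner:
    type: ai.mymagic.plugin.core.runner.Process
  commands:
    - curl -s https://dummyjson.com/products > /dev/null # any call that may hang
  timeout: PT30S # fail any single attempt running longer than 30 seconds
  retry:
    type: constant
    interval: PT10S
    maxAttempt: 3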
Considerations
- Timeouts should be set carefully to avoid interrupting long-running but valid tasks.
- When a task times out, ensure you have proper error handling and notification mechanisms in place.
- For tasks with variable execution times, consider setting a generous timeout and using other monitoring mechanisms for more granular control.
By effectively using timeouts, you can create more reliable and cost-effective workflows, preventing runaway processes and ensuring timely completion of your tasks.
Concurrency Control
Control concurrent executions of a given flow using the flow-level `concurrency` property.
Overview
The `concurrency` property allows you to set a limit on the number of concurrent executions for a specific flow. This global concurrency limit applies to all executions of the flow, regardless of how they are triggered (automatically via trigger, webhook, or API call, or manually from the UI).
Basic Usage
To set a concurrency limit, use the `limit` key within the `concurrency` property:
id: concurrency_example
namespace: company.team
concurrency:
  limit: 2
tasks:
  - id: wait
    type: ai.mymagic.plugin.scripts.shell.Commands
    commands:
      - sleep 10
In this example, only two executions of the flow will be allowed to run simultaneously. Any additional executions will be queued until one of the running executions completes.
Behavior Property
You can customize how the system responds when the concurrency limit is reached using the `behavior` property. This property accepts three possible values:
- `QUEUE`: queues additional executions (default behavior)
- `CANCEL`: immediately cancels additional executions
- `FAIL`: immediately fails additional executions
Example with custom behavior:
id: concurrency_limited_flow
namespace: company.team
concurrency:
  behavior: FAIL # QUEUE, CANCEL or FAIL
  limit: 2 # can be any integer >= 1
tasks:
  - id: wait
    type: ai.mymagic.plugin.scripts.shell.Commands
    commands:
      - sleep 10
In this case, when the concurrency limit of 2 is reached, any additional execution attempts will immediately fail without running any tasks.
Visual Feedback
The user interface provides visual feedback on the concurrency status:
- For `QUEUE` behavior: additional executions will show as queued while the first two finish.
- For `CANCEL` or `FAIL` behavior: the third execution's state will immediately be set to `CANCELLED` or `FAILED`, respectively.
Best Practices
- Set appropriate concurrency limits based on your system’s resources and the flow’s requirements.
- Use the `QUEUE` behavior for tasks that can be delayed but must eventually run.
- Use `CANCEL` or `FAIL` for time-sensitive tasks where running later is not acceptable.
- Monitor queued executions to ensure they're not building up excessively.
- Combine concurrency control with timeouts for more robust flow management, as sketched below.
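A minimal sketch combining both controls, using only properties shown earlier in this documentation:
id: concurrency_with_timeout
namespace: company.team
concurrency:
  behavior: QUEUE
  limit: 2
tasks:
  - id: wait
    type: ai.mymagic.plugin.scripts.shell.Commands
    commands:
      - sleep 10
    timeout: PT1M # prevents a stuck execution from blocking queued ones indefinitely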
Considerations
- Concurrency limits apply per flow, not globally across all flows.
- When using `QUEUE`, ensure your system can handle the potential backlog of executions.
- `CANCEL` and `FAIL` behaviors can help prevent resource overload but may require additional error handling in your overall workflow.
By effectively using concurrency control, you can manage system resources, prevent overload, and ensure smooth execution of your workflows.
Descriptions
Add descriptions to your flows, inputs, outputs, tasks, and triggers using the `description` property.
Overview
The `description` property is a string that supports Markdown syntax, allowing for rich, formatted documentation within your workflow definitions. These descriptions are rendered in the UI, providing clear and accessible information about various components of your flows.
Supported Components
You can add a `description` property to:
- Flows
- Inputs
- Outputs
- Tasks
- Triggers
Example
Here’s an example flow demonstrating the use of descriptions in different components:
id: myflow
namespace: company.team
description: |
  This is the **Flow Description**.
  You can look at `input description`, `task description`, `output description` and `trigger description` as well in this example.
labels:
  env: dev
  project: myproject
inputs:
  - id: payload
    type: JSON
    description: JSON request payload to the API # Input description
    defaults: |
      [{"name": "mymagic", "rating": "best in class"}]
tasks:
  - id: send_data
    type: ai.mymagic.plugin.core.http.Request
    description: Task for sending POST API request to https://reqres.in/api/products # Task description
    uri: https://reqres.in/api/products
    method: POST
    contentType: application/json
    body: "{{ inputs.payload }}"
  - id: print_status
    type: ai.mymagic.plugin.core.debug.Return
    description: Task printing the API request date # Task description
    format: hello on {{ outputs.send_data.headers.date | first }}
outputs:
  - id: final
    type: STRING
    description: This is a task output used as a final flow output
    value: "{{ outputs.print_status.value }}"
triggers:
  - id: daily
    type: ai.mymagic.plugin.core.trigger.Schedule
    description: Trigger the flow at 09:00am every day # Trigger description
    cron: "0 9 * * *"
Best Practices
- Use clear and concise language in your descriptions.
- Leverage Markdown syntax for better readability (e.g., headers, lists, code blocks).
- Include relevant details such as purpose, expected inputs/outputs, and any important considerations.
- Keep descriptions up-to-date as your flows evolve.
- Use consistent formatting across your descriptions for a unified documentation style.
Tips for Effective Documentation
- Flow Description: Provide an overview of the flow’s purpose and any important context.
- Input Description: Explain the expected format, constraints, and significance of each input.
- Task Description: Detail what the task does, any prerequisites, and its role in the overall flow.
- Output Description: Clarify what the output represents and how it might be used downstream.
- Trigger Description: Specify when and why the trigger activates the flow.
Benefits
- Improves workflow understanding for team members and stakeholders.
- Facilitates easier maintenance and troubleshooting.
- Enhances collaboration by providing clear context for each component.
- Serves as built-in documentation that stays in sync with your workflow definition.
By effectively using the `description` property throughout your flows, you create self-documenting workflows that are easier to understand, maintain, and collaborate on.
Disabled Flag
The `disabled` flag is a boolean property that allows you to skip a flow, task, or trigger.
Overview
This feature is helpful when debugging or testing specific parts of your flow without removing existing logic. Instead of deleting parts of your YAML, you can add the `disabled` property.
Disabled Flow
When a flow is disabled, it will not be executed, even if a trigger is set. Any active triggers on a disabled flow will be ignored automatically.
Example of a disabled flow:
id: disabled_flow
namespace: company.team
disabled: true
tasks:
  - id: hello
    type: ai.mymagic.plugin.core.log.Log
    message: MyMagic team wishes you a great day! 👋
triggers:
  - id: fail_every_minute
    type: ai.mymagic.plugin.core.trigger.Schedule
    cron: "*/1 * * * *"
Behavior with Subflows
When trying to execute a disabled flow from a subflow:
id: parent_runs_disabled_flow
namespace: company.team
tasks:
  - id: disabled_subflow
    type: ai.mymagic.plugin.core.flow.Subflow
    flowId: disabled_flow
    namespace: company.team
Executing the parent flow will immediately fail with the error message: `Cannot execute a flow which is disabled`.
API Behavior
When triggering a disabled flow via an API call:
curl -X POST http://localhost:8080/api/v1/executions/trigger/example/parent_runs_disabled_flow
The API call itself will be successful, but the execution will be immediately marked as failed with the error message: `Cannot execute a flow which is disabled`.
Disabled Trigger
You can temporarily disable a trigger by setting the `disabled` flag to `true`:
id: myflow
namespace: company.team
tasks:
  - id: hello
    type: ai.mymagic.plugin.core.log.Log
    message: hello from a scheduled flow
triggers:
  - id: daily
    type: ai.mymagic.plugin.core.trigger.Schedule
    cron: "0 9 * * *"
    disabled: true
No scheduled executions will be created for this flow while the trigger is disabled.
Disabled Task
You can disable a single task within a flow:
id: myflow
namespace: company.team
tasks:
  - id: enabled
    type: ai.mymagic.plugin.core.log.Log
    message: this task will run
  - id: disabled
    type: ai.mymagic.plugin.core.debug.Return
    format: this task will be skipped
    disabled: true
Disabled tasks are visually greyed out in the UI.
Best Practices
- Use the `disabled` flag for temporary changes during development or debugging.
- Document why a component is disabled, especially if committing the change.
- Regularly review disabled components to ensure they’re not forgotten.
- Use version control to track changes in disabled status.
Considerations
- Disabling a flow affects all its triggers and tasks.
- Disabling a trigger only affects that specific trigger, not the entire flow.
- Disabled tasks are skipped during execution but remain in the flow definition.
By effectively using the `disabled` flag, you can streamline your development and debugging processes without altering the structure of your workflows.
States
States control the status of your workflow execution.
Overview
An execution is a single run of a flow in a specific state. Each state represents a particular point in the workflow where MyMagic’s orchestration system determines the next steps based on the control flow logic defined in the flow.
Execution States
Each MyMagic execution can transition through several states during its lifecycle. Here’s a brief description of each state:
- CREATED: Execution created but not yet started. A transient state indicating the execution is waiting to be processed.
- QUEUED: Execution waiting for a free slot to start running. Used when the flow has concurrency limits and all available slots are taken.
- RUNNING: Execution currently in progress.
- SUCCESS: Execution completed successfully. All tasks finished without errors (or were allowed to fail).
- WARNING: Execution completed successfully, but one or more tasks emitted warnings.
- FAILED: One or more tasks failed and will not be retried. Error tasks (if defined) will be executed before ending the execution.
- RETRYING: Execution is currently retrying one or more failed task runs.
- RETRIED: Execution has been retried according to the flow-level retry policy set to CREATE_NEW_EXECUTION behavior.
- PAUSED: Execution awaiting manual approval or paused for a fixed duration.
- RESTARTED: Equivalent to CREATED, but for a failed execution that has been restarted.
- CANCELLED: Execution automatically cancelled by the system, usually due to concurrency limits.
- KILLING: User has issued a command to kill the execution; the system is terminating its task runs.
- KILLED: Execution has been killed upon user request. No more tasks will run.
Task Run States
Task run states represent the status of a single task run within an execution:
- CREATED: Task run created but not yet started.
- RUNNING: Task run currently in progress.
- SUCCESS: Task run completed successfully.
- WARNING: Task run completed successfully but with warnings.
- FAILED: Task run has failed.
- RETRYING: Task run is currently being retried.
- RETRIED: Task run has been retried.
- KILLING: Task run is in the process of being killed.
- KILLED: Task run has been killed upon user request.
Note: There are no QUEUED, CANCELLED, PAUSED, or RESTARTED states for task runs.
Key Differences
- CANCELLED vs KILLED: CANCELLED is used when the system automatically cancels an execution due to concurrency limits. KILLED is used when the execution has been terminated upon user request.
- Execution States vs Task Run States: Execution states represent the overall status of the workflow, while task run states represent the status of individual tasks within the execution.
Best Practices
- Monitor executions in CREATED state to ensure they’re not stuck.
- Use the PAUSED state for manual approvals or timed delays in workflows (see the sketch after this list).
- Implement proper error handling for FAILED states.
- Utilize the WARNING state to flag executions that completed but require attention.
- Be cautious when using KILLING state, as it may leave resources in an inconsistent state.
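For instance, a timed delay can be introduced with a Pause task. In this sketch, the `ai.mymagic.plugin.core.flow.Pause` task type and its `delay` property are illustrative assumptions rather than confirmed API:
id: paused_flow
namespace: company.team
tasks:
  - id: wait_before_continuing
    type: ai.mymagic.plugin.core.flow.Pause # assumed task type
    delay: PT5M # assumed property: execution stays PAUSED, then resumes after 5 minutes
  - id: after_pause
    type: ai.mymagic.plugin.core.log.Log
    message: resumed after the pause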
Considerations
- State transitions are managed by MyMagic’s orchestration system.
- Some states are transient (e.g., CREATED, RUNNING) while others are terminal (e.g., SUCCESS, FAILED).
- The flow of states can be influenced by retry policies, concurrency settings, and manual interventions.
Understanding these states is crucial for effective monitoring, troubleshooting, and optimization of your MyMagic workflows.
Revisions
Manage versions of flows efficiently with built-in versioning capabilities.
Automatic Versioning
Flows are versioned by default. Each time you make changes to your flows, a new revision is automatically created. This feature provides a robust mechanism for tracking changes and enables easy rollback to previous versions when necessary.
Viewing Revisions
To access the revision history of a flow:
- Navigate to the specific flow.
- Go to the Revisions tab.
- You will see a list of all revisions for that flow.
Revision Comparison
The revision system allows you to compare different versions of your flow:
- Compare two revisions side-by-side.
- Easily identify changes between versions.
- Understand the evolution of your flow over time.
Rollback Capability
If you need to revert to a previous version of your flow:
- Navigate to the Revisions tab.
- Select the desired previous revision.
- Use the rollback feature to restore that version.
This capability ensures that you can quickly recover from unintended changes or return to a known good state of your flow.
Best Practices
- Regular Reviews: Periodically review the revision history to understand how your flows have evolved.
- Meaningful Changes: Try to make meaningful, atomic changes between revisions to make the history more understandable.
- Comments: If your system supports it, add comments or descriptions to revisions to explain significant changes.
- Testing: Always test your flow after rolling back to ensure it still functions as expected in the current environment.
Considerations
- Revisions are typically immutable once created.
- Rolling back doesn’t delete newer revisions; it creates a new revision based on the older state.
- Be cautious when rolling back flows that interact with external systems or data stores, as the current state may have changed since the previous revision.
By leveraging the flow revision system effectively, you can maintain a clear history of your workflow changes, easily track modifications, and ensure the ability to recover from unintended alterations quickly.
KV Store
Build stateful workflows with the Key-Value (KV) Store.
Overview
MyMagic’s workflows are designed to be stateless, with isolated executions and task runs to avoid unintended side effects. However, the KV Store allows you to share data beyond passing outputs between tasks, enabling data persistence across executions or different workflows.
Architecture Integration
The KV Store is built on top of MyMagic’s internal storage, ensuring that:
- All values are stored in your private cloud storage bucket.
- MyMagic’s database only contains metadata about the objects.
- You have full control and privacy over your data.
Keys and Values
- Keys are arbitrary strings composed of standard ASCII characters, both uppercase and lowercase.
- Values are stored as ION files in MyMagic’s internal storage and can be of types: string, number, boolean, datetime, date, duration, or JSON.
- Time to Live (TTL) can be set for each KV pair to manage storage efficiently.
Namespace Binding
KV pairs are defined at a namespace level, accessible from the namespace page in the UI under the KV Store tab. Cross-namespace access is possible for allowed namespaces.
Management Methods
KV pairs can be managed through various methods:
- MyMagic UI
- Tasks in a flow
- MyMagic’s API
- MyMagic’s Terraform provider
- Pebble function
- GitHub Actions
UI Management
Create New KV Pairs
- Navigate to the Namespaces page and select the desired namespace.
- Go to the KV Store tab.
- Click on “New Key-Value” button.
- Enter key name, select value type, enter value, and optionally set TTL.
- Save the changes.
Update and Delete KV Pairs
Use the Edit button next to each KV pair to modify or delete it.
Code Management
Create a KV Pair in a Flow
Use the `ai.mymagic.plugin.core.kv.Set` task:
id: add_kv_pair
namespace: company.team
tasks:
  - id: set_kv
    type: ai.mymagic.plugin.core.kv.Set
    key: my_key
    value: "{{ outputs.download.uri }}" # assumes an earlier task with the id "download"
    namespace: company.team
    overwrite: true
    ttl: P30D
Read KV Pairs with Pebble
Use the `{{ kv('YOUR_KEY') }}` Pebble function:
id: read_kv_pair
namespace: company.team
tasks:
  - id: log_key
    type: ai.mymagic.plugin.core.log.Log
    message: "{{ kv('my_key') }}"
Read KV Pairs with Get Task
Use the `ai.mymagic.plugin.core.kv.Get` task:
id: get_kv_pair
namespace: company.team
tasks:
  - id: get
    type: ai.mymagic.plugin.core.kv.Get
    key: my_key
    namespace: company.team
    errorOnMissing: false
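The retrieved value can then be referenced from downstream tasks. A minimal sketch appending a task to the flow above, assuming the Get task exposes the retrieved value as `outputs.<task_id>.value`:
  - id: log_value
    type: ai.mymagic.plugin.core.log.Log
    message: "{{ outputs.get.value }}" # assumed output name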
Delete a KV Pair
Use the `ai.mymagic.plugin.core.kv.Delete` task:
id: delete_kv_pair
namespace: company.team
tasks:
  - id: kv
    type: ai.mymagic.plugin.core.kv.Delete
    key: my_key
    namespace: company.team
    errorOnMissing: false
API Management
Create a KV Pair
curl -X PUT -H "Content-Type: application/json" http://localhost:8080/api/v1/namespaces/company.team/kv/my_key -d '"Hello World"'
Read a KV Pair
curl -X GET -H "Content-Type: application/json" http://localhost:8080/api/v1/namespaces/company.team/kv/my_key
Delete a KV Pair
curl -X DELETE -H "Content-Type: application/json" http://localhost:8080/api/v1/namespaces/company.team/kv/my_key
Terraform Management
Create a KV Pair
resource "mymagic_kv" "my_key" {
namespace = "company.team"
key = "my_key"
value = "Hello World"
type = "STRING"
}
Read a KV Pair
data "mymagic_kv" "new" {
namespace = "company.team"
key = "my_key"
}
By leveraging these methods, you can effectively create, read, update, and delete KV pairs in MyMagic, enabling stateful workflows and data sharing across executions and workflows.
Pebble Templating
Dynamically render variables, inputs, and outputs using Pebble templating engine.
Overview
MyMagic uses Pebble, a Java templating engine inspired by Twig and similar to Python’s Jinja Template Engine, to dynamically render variables, inputs, and outputs within the execution context.
Reading Inputs
Access input values in your tasks using the `inputs` variable:
id: input_string
namespace: company.team
inputs:
  - id: name
    type: STRING
tasks:
  - id: say_hello
    type: ai.mymagic.plugin.core.log.Log
    message: "Hello 👋, my name is {{ inputs.name }}"
Reading Task Outputs
Access outputs from previous tasks using `outputs.<task_name>.<output_name>`:
id: input_string
namespace: company.team
inputs:
  - id: name
    type: STRING
tasks:
  - id: say_hello
    type: ai.mymagic.plugin.core.debug.Return
    format: "Hello 👋, my name is {{ inputs.name }}"
  - id: can_you_repeat
    type: ai.mymagic.plugin.core.log.Log
    message: '{{ outputs.say_hello.value }}'
TemplatedTask for Dynamic Rendering
Use the `TemplatedTask` to dynamically render all task properties:
id: templated_databricks_job
namespace: company.team
inputs:
  - id: host
    type: STRING
  - id: clusterId
    type: STRING
  # ... other inputs ...
tasks:
  - id: templated_spark_job
    type: ai.mymagic.plugin.core.templating.TemplatedTask
    spec: |
      type: ai.mymagic.plugin.databricks.job.CreateJob
      authentication:
        token: "{{ secret('DATABRICKS_API_TOKEN') }}"
      host: "{{ inputs.host }}"
      jobTasks:
        - existingClusterId: "{{ inputs.clusterId }}"
      # ... other properties ...
Date Formatting
Use the `date` filter for date formatting:
'{{ inputs.my_date | date("yyyyMMdd") }}'
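For instance, here is a small flow applying the filter to a datetime input (the `DATETIME` input type and the default value are illustrative assumptions):
id: format_date
namespace: company.team
inputs:
  - id: my_date
    type: DATETIME
    defaults: "2024-12-24T09:00:00Z"
tasks:
  - id: print_date
    type: ai.mymagic.plugin.core.log.Log
    message: '{{ inputs.my_date | date("yyyyMMdd") }}'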
Conditional Date Usage
Use the coalesce operator `??` to conditionally use the trigger or execution date:
id: pebble_date_trigger
namespace: company.team
tasks:
  - id: return_date
    type: ai.mymagic.plugin.core.debug.Return
    format: '{{ trigger.date ?? execution.startDate | date("yyyy-MM-dd") }}'
triggers:
  - id: schedule
    type: ai.mymagic.plugin.core.trigger.Schedule
    cron: "* * * * *"
Parsing Objects & Lists with jq
Use `jq` to parse nested objects or lists:
id: object_example
namespace: company.team
inputs:
  - id: data
    type: JSON
    defaults: '{"value": [1, 2, 3]}'
tasks:
  - id: hello
    type: ai.mymagic.plugin.core.log.Log
    message: "{{ inputs.data.value | jq('.[1]') | first }}"
Using Conditions in Pebble
Apply conditions in tasks like `If` or `Switch`:
id: test-object
namespace: company.team
inputs:
  - id: data
    type: JSON
    defaults: '{"value": [1, 2, 3]}'
tasks:
  - id: if
    type: ai.mymagic.plugin.core.flow.If
    condition: '{{ inputs.data.value | jq(".[2]") | first == 3 }}'
    then:
      - id: when_true
        type: ai.mymagic.plugin.core.log.Log
        message: 'Condition was true'
    else:
      - id: when_false
        type: ai.mymagic.plugin.core.log.Log
        message: 'Condition was false'
Best Practices
- Use the Debug Outputs feature in the UI to troubleshoot complex Pebble expressions.
- Leverage `jq` for parsing complex JSON structures.
- Use the coalesce operator `??` to handle both triggered and manual executions.
- Utilize the `TemplatedTask` for fully dynamic rendering of task properties.
By mastering Pebble templating in MyMagic, you can create more dynamic and flexible workflows, easily adapting to various inputs and conditions.
Templates (Coming Soon)
Plugins (Coming Soon)
Upcoming Features
We plan to integrate LLM technology to enable natural language commands for workflow creation, offering an alternative to YAML configuration for users who prefer a more intuitive interface.
(More To Come)