
Automating Data Loading with Tasks

A task encapsulates specific SQL statements that are designed to be executed at predetermined intervals, triggered by specific events, or as part of a broader sequence of tasks. Tasks in Databend Cloud are commonly used to regularly capture data changes from streams, such as newly added records, and then synchronize the data with designated target destinations. Tasks also support Webhook and other messaging systems, so error messages and notifications can be delivered as needed.

Creating a Task

This topic breaks down the procedure of creating a task in Databend Cloud. You create a task using the CREATE TASK command. When creating a task, follow the steps below to design the workflow:


  1. Set a name for the task.

  2. Specify a warehouse to run the task. To create a warehouse, see Work with Warehouses.

  3. Determine how to trigger the task to run.

    • You can schedule the task to run by specifying the interval in minutes or seconds, or by using a CRON expression with an optional time zone for more precise scheduling.
    Examples:
    -- This task runs every 2 minutes
    CREATE TASK mytask
    WAREHOUSE = 'default'
    SCHEDULE = 2 MINUTE
    AS ...

    -- This task runs daily at midnight (local time) in the Asia/Tokyo timezone
    CREATE TASK mytask
    WAREHOUSE = 'default'
    SCHEDULE = USING CRON '0 0 0 * * *' 'Asia/Tokyo'
    AS ...
    • Alternatively, you can establish dependencies between tasks by setting the task as a child task in a Directed Acyclic Graph (DAG).
    Examples:
    -- This task runs after the 'task_root' task completes in the DAG
    CREATE TASK mytask
    WAREHOUSE = 'default'
    AFTER task_root
    AS ...
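    In a DAG, the root task carries the schedule and each child task is attached with AFTER. As a minimal sketch, assuming a hypothetical root task 'task_root' and a hypothetical 'raw_events' table:
    -- The root task carries the schedule for the entire DAG.
    CREATE TASK task_root
    WAREHOUSE = 'default'
    SCHEDULE = 5 MINUTE
    AS
    DELETE FROM raw_events WHERE processed = TRUE;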
  4. Optionally specify a condition, expressed as a boolean expression, that controls whether the task executes.

    Examples:
    -- This task runs every 2 minutes and executes the SQL after AS only if 'mystream' contains data changes
    CREATE TASK mytask
    WAREHOUSE = 'default'
    SCHEDULE = 2 MINUTE
    WHEN STREAM_STATUS('mystream') = TRUE
    AS ...
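    Combined with a stream, this condition keeps the warehouse idle when there is nothing to process. A minimal end-to-end sketch, assuming a hypothetical stream 'mystream' and a hypothetical target table 'target_table'; consuming the stream in the INSERT advances its offset so each change is processed once:
    -- Sync new rows from 'mystream' into 'target_table' every 2 minutes, but only when the stream contains changes.
    CREATE TASK sync_task
    WAREHOUSE = 'default'
    SCHEDULE = 2 MINUTE
    WHEN STREAM_STATUS('mystream') = TRUE
    AS
    INSERT INTO target_table SELECT * FROM mystream;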
  5. Specify what to do if the task results in an error, such as setting the number of consecutive failures after which the task is suspended, or specifying a notification integration for error notifications. For more information about setting up error notifications, see Configuring Notification Integrations.

    Examples:
    -- This task will be suspended after 3 consecutive failures
    CREATE TASK mytask
    WAREHOUSE = 'default'
    SUSPEND_TASK_AFTER_NUM_FAILURES = 3
    AS ...

    -- This task will utilize the 'my_webhook' integration for error notifications.
    CREATE TASK mytask
    WAREHOUSE = 'default'
    ERROR_INTEGRATION = 'my_webhook'
    AS ...
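    Both options can be combined in a single task. A sketch, assuming the 'my_webhook' integration already exists:
    -- Notify 'my_webhook' on each error and suspend after 3 consecutive failures.
    CREATE TASK mytask
    WAREHOUSE = 'default'
    SCHEDULE = 2 MINUTE
    SUSPEND_TASK_AFTER_NUM_FAILURES = 3
    ERROR_INTEGRATION = 'my_webhook'
    AS ...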
  6. Specify the SQL statement the task will execute.

    Examples:
    -- This task updates the 'age' column in the 'employees' table, incrementing it by 1 at midnight (UTC) on January 1 each year.
    CREATE TASK mytask
    WAREHOUSE = 'default'
    SCHEDULE = USING CRON '0 0 0 1 1 *' 'UTC'
    AS
    UPDATE employees
    SET age = age + 1;
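
Note that a newly created task does not start running on its schedule until you resume it, as the usage example later in this topic also does:

-- Activate the task after creation
ALTER TASK mytask RESUME;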

Configuring Notification Integrations

Databend Cloud allows you to configure error notifications for a task, automating the process of sending notifications when an error occurs during task execution. It currently supports webhook integrations, enabling error events to be communicated to external systems or services in real time.

Task Error Payload

A task error payload refers to the data or information that is sent as part of an error notification when a task encounters an error during its execution. This payload typically includes details about the error, such as error codes, error messages, timestamps, and potentially other relevant contextual information that can help in diagnosing and resolving the issue.

Task Error Payload Example:
{
  "version": "1.0",
  "messageId": "063e40ab-0b55-439e-9cd2-504c496e1566",
  "messageType": "TASK_FAILED",
  "timestamp": "2024-03-19T02:37:21.160705788Z",
  "tenantId": "tn78p61xz",
  "taskName": "my_task",
  "taskId": "15",
  "rootTaskName": "my_task",
  "rootTaskId": "15",
  "messages": [
    {
      "runId": "unknown",
      "scheduledTime": "2024-03-19T02:37:21.157169855Z",
      "queryStartTime": "2024-03-19T02:37:21.043090475Z",
      "completedTime": "2024-03-19T02:37:21.157169205Z",
      "queryId": "88bb9d5d-5d5e-4e52-92cc-b1953406245a",
      "errorKind": "UnexpectedError",
      "errorCode": "500",
      "errorMessage": "query sync failed: All attempts fail:\n#1: query error: code: 1006, message: divided by zero while evaluating function `divide(1, 0)`"
    }
  ]
}

Usage Examples

Before configuring error notifications for a task, you must create a notification integration with the CREATE NOTIFICATION INTEGRATION command. The following example creates a notification integration and configures a task to use it, using Webhook.site to simulate the messaging system that receives payloads from Databend Cloud.

  1. Open Webhook.site in your web browser and obtain your webhook URL.


  2. In Databend Cloud, create a notification integration, and then create a task that uses it:
-- Create a notification integration named 'my_webhook' for sending webhook notifications.
CREATE NOTIFICATION INTEGRATION my_webhook
TYPE = WEBHOOK
ENABLED = TRUE
WEBHOOK = (
    url = '<YOUR-WEBHOOK_URL>',
    method = 'POST'
);

-- Create a task named 'my_task' to run every minute, with error notifications sent to 'my_webhook'.
-- The task intentionally divides by zero to generate an error.
CREATE TASK my_task
WAREHOUSE = 'default'
SCHEDULE = 1 MINUTE
ERROR_INTEGRATION = 'my_webhook'
AS
SELECT 1 / 0;

-- Resume the task after creation
ALTER TASK my_task RESUME;
  3. Wait a moment, and you'll see your webhook start receiving payloads from the task.

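If you don't want to wait for the next scheduled run while testing, Databend also provides the EXECUTE TASK command to trigger a single run on demand:

-- Trigger one immediate run of the task
EXECUTE TASK my_task;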

See Also

See Example: Tracking and Transforming Data in Real-Time for a complete demo of how to capture data changes with a stream and sync them with a task.
