Automating a product launch using the Prefect workflow orchestrator

Introduction

With the growing demand for task automation, modern workflow orchestration tools have gained popularity. In this tutorial, we'll explore Prefect, a powerful alternative to traditional tools like crontab. Prefect offers dynamic workflows, built-in observability, automatic retries, and scalability, making it an ideal choice for managing complex workflows.

What is Prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python. In this guide, we will use Prefect 3.0 and Azure Container Instances to automate a product launch process. To run our workflow using Prefect, we need to set up several key components:

  • Tasks: The fundamental building blocks that define individual units of work.
  • Flows: The orchestration layer that connects and manages tasks.
  • Variables: Configurable parameters that help make workflows dynamic and reusable.
  • Blocks: Securely stored configurations, such as API credentials, database connections, or storage locations.
  • Deployment: A packaged version of the workflow, enabling scheduling, parameterization, and execution via the Prefect API.
  • Work pool: Defines the execution environment, specifying where and how flows run (e.g., Docker, Kubernetes, local processes).

Work pools are a bridge between the Prefect orchestration layer and the infrastructure where flows are run. Prefect offers three types of work pools:

  • Pull work pools: These require workers to actively poll for flow runs to execute.
  • Push work pools: These submit runs directly to serverless infrastructure providers. This is the type of work pool we will use in this article.
  • Managed work pools: These are administered by Prefect and handle both submission and execution of code.

We will use Azure Container Instances to run the workflow in this guide, but you can use any supported platform. You can also run Prefect on-premises or use the managed Prefect Cloud service. For more details about work pools, see https://docs.prefect.io/v3/deploy/infrastructure-concepts/work-pools

Set up Prefect

To get started, we first need to install Prefect and set up the CLI. Since I'm running Prefect locally, I will configure the local API to manage workflows without relying on Prefect Cloud.

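# Install Prefect
pip install prefect
# Start the local server (it keeps running, so use a second terminal for the next command)
prefect server start
# Point the CLI at the local API
prefect config set PREFECT_API_URL=http://127.0.0.1:4200/api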

In this guide, we will use Azure Container Instances (ACI) as a push work pool. This will allow us to execute flows without needing to run a worker process to poll for flow runs. Let's configure Azure and create the required work pool.

On macOS, you can install and configure the Azure CLI by running the following commands:

brew update && brew install azure-cli
az login

To create a push work pool, run this command:

prefect work-pool create --type azure-container-instance:push --provision-infra workflows-guru

The --provision-infra flag automatically sets up your default Azure account to execute flows through Azure Container Instances. In your Azure account, this command creates a resource group, an app registration, and a service account with the necessary permissions, generates a secret for the app registration, and creates an Azure Container Registry (if they don't already exist). In your Prefect workspace, this command creates an AzureContainerInstanceCredentials block to store the client secret value from the generated secret. The image below shows the output after running the command:

Create a work pool with prefect CLI

Now that the Prefect CLI and the Azure infrastructure for running our flows are set up, let's create our workflow.

Build workflows using Prefect

Variables and Blocks are Prefect components used to store the parameters needed by our tasks. Before implementing the workflow, let's define the variables and blocks that we will use.

Use Variables and Blocks

  • Variables: Store non-sensitive, dynamic data such as the launch date, subscriber list, or social media platforms.
  • Blocks: Store credentials used to access external systems and reusable infrastructure configuration, e.g., email credentials or API keys.

You can use the UI to declare variables and blocks, or you can use the Prefect library. Here is how to add the required variables using Python code.

from prefect.variables import Variable

# Store a non-sensitive value under the name "platform"
Variable.set("platform", "bluesky")
# Read the value back
Variable.get("platform")

In this example, we set a platform variable and retrieve its value. To create a Block, you can use the following code:

from typing import Optional
from prefect.blocks.core import Block
from pydantic import SecretStr  # Prefect 3 uses Pydantic 2, so SecretStr is imported directly

# Custom block type holding AWS credentials; the secret key is obfuscated when displayed
class AwsCredentials(Block):
    aws_access_key_id: Optional[str] = None
    aws_secret_access_key: Optional[SecretStr] = None
    aws_session_token: Optional[str] = None
    profile_name: Optional[str] = None
    region_name: Optional[str] = None

# Placeholder values for demonstration only
aws_credentials_block = AwsCredentials(
    aws_access_key_id="AKIAJKLJKLJKLJKLJKLJK",
    aws_secret_access_key="secret_access_key"
)

# Save the block under a name, then load it back by that name
aws_credentials_block.save("aws-creds")
AwsCredentials.load("aws-creds")

Define Tasks

The notification system is composed of several tasks that perform automated actions before a product launch. Here is the list of actions that our workflow will implement:

  • Checking if the launch date is near.
  • Sending email notifications to subscribers.
  • Posting updates to social media.
  • Updating a dashboard with status and feedback data.

To define a Prefect task, you simply add the @task decorator to a function. The following code creates three tasks that will be used in our flow. They only print messages and are provided for demonstration purposes.

from prefect import task, flow

@task
def send_email_notifications(subscribers):
    # Placeholder: print instead of sending real emails
    for subscriber in subscribers:
        print(f"Sending email to {subscriber}")
    return len(subscribers)

@task
def post_to_social_media(platforms):
    # Placeholder: print instead of calling social media APIs
    for platform in platforms:
        print(f"Posting update to {platform}")
    return platforms

@task
def update_dashboard(email_count, platforms):
    # Placeholder: print a summary instead of updating a real dashboard
    print(f"Dashboard updated: {email_count} emails sent, updates posted to {platforms}")
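
The three tasks above do not cover the first action in the list, checking whether the launch date is near. The following is a minimal sketch of that check, not the article's own implementation. The function name is_launch_near, the ISO date format, and the seven-day window are all assumptions made for illustration. It is written as a plain function so that it can be tested without a Prefect server.

```python
from datetime import date, timedelta
from typing import Optional


def is_launch_near(
    launch_date: str,
    today: Optional[date] = None,
    window_days: int = 7,
) -> bool:
    """Return True when launch_date is today or within the next window_days days.

    launch_date is assumed to be an ISO string such as "2025-06-30".
    today can be injected so the check is deterministic in tests.
    """
    today = today or date.today()
    launch = date.fromisoformat(launch_date)
    # A launch in the past is not "near"; one inside the window is.
    return timedelta(0) <= launch - today <= timedelta(days=window_days)
```

For example, is_launch_near("2025-06-30", today=date(2025, 6, 25)) evaluates to True because the launch is five days away. To use this logic in a flow, the same body could be placed in a function decorated with @task.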

Define the Flow

After defining our tasks, here's how to define the flow. Similar to tasks, flows in Prefect are defined using the @flow decorator.

from datetime import datetime

@flow
def launch_notification_flow(launch_date, subscribers, platforms):
    # check_launch_date is not defined in this article; it is assumed to be
    # a task that returns True when the launch date is near
    is_near = check_launch_date(launch_date)
    if is_near:
        email_count = send_email_notifications(subscribers)
        posted_platforms = post_to_social_media(platforms)
        update_dashboard(email_count, posted_platforms)
    else:
        print("Launch date is not near. No notifications sent.")

Deploy and Run Prefect workflows

Deployments allow you to run flows on a schedule and trigger runs based on events. Deployments are server-side representations of flows. They store the crucial metadata for remote orchestration, including when, where, and how a workflow should run. In addition to manually triggering and managing flow runs, deploying a flow exposes an API and UI that allow you to:

  • trigger new runs, cancel active runs, pause scheduled runs, customize parameters, and more
  • remotely configure schedules and automation rules
  • dynamically provision infrastructure with work pools

Create a Deployment

To deploy your workflow, use the command below, assuming your flow is stored in launch_notification_flow.py. Prefect 3 removed the Prefect 2 commands prefect deployment build and prefect deployment apply. The prefect deploy command replaces them, and it may prompt for any details not passed as options:

prefect deploy launch_notification_flow.py:launch_notification_flow --name "Launch Notifications" --pool workflows-guru

Schedule and Monitor

Once the deployment is created, you can schedule a daily run to check the launch date and execute the workflow. In the UI, click on the deployment to add a schedule. Several schedule types are available, as shown in the following image:

Schedule Prefect Deployment with UI

Conclusion

In this article, we explored the fundamentals of Prefect and demonstrated its capabilities by implementing a basic workflow designed to automate product launches. Whether you're managing simple tasks or orchestrating intricate workflows, Prefect empowers you to achieve efficiency and reliability with ease. As you continue your journey with Prefect, you'll discover even more ways to optimize and enhance your workflows for seamless execution.