The world of Machine Learning (ML) is constantly evolving, and as models become more complex and data volumes grow, managing the entire ML lifecycle efficiently becomes paramount. This is where MLOps, and specifically ML Pipelines, come into play. If you've been looking to streamline your ML workflows, automate repetitive tasks, and ensure reproducibility, then you're in the right place!
Welcome to the ultimate guide on how to create a Vertex AI Pipeline! By the end of this comprehensive post, you'll not only understand the "why" behind pipelines but also have a clear, step-by-step roadmap to build your own, robust ML workflows on Google Cloud's Vertex AI.
Are you ready to transform your scattered ML scripts into a well-oiled, automated machine? Let's dive in!
What are Vertex AI Pipelines?
Vertex AI Pipelines is a serverless MLOps service on Google Cloud that helps you automate, monitor, and govern your machine learning systems. It allows you to orchestrate your ML workflows using pipelines defined with the Kubeflow Pipelines (KFP) SDK or TensorFlow Extended (TFX) framework. Essentially, it helps you break down your complex ML tasks (like data preprocessing, model training, evaluation, and deployment) into smaller, reusable components that can be chained together.
Key Benefits of Vertex AI Pipelines:
Automation: Automate the entire ML lifecycle, reducing manual effort and potential for errors.
Reproducibility: Easily reproduce past experiments and model versions, crucial for auditing and debugging.
Scalability: Leverage Google Cloud's infrastructure to scale your ML workloads as needed, without managing servers.
Collaboration: Share and collaborate on ML workflows with your team using standardized pipeline definitions.
Monitoring and Governance: Track lineage of artifacts, monitor pipeline runs, and gain insights into your ML system's performance.
Cost Optimization: Pay only for the resources consumed during pipeline execution, with serverless capabilities.
Step 1: Setting Up Your Google Cloud Environment
Before we start coding our pipeline, we need to ensure our Google Cloud environment is properly configured.
Sub-heading 1.1: Enable Necessary APIs
First things first, let's make sure your Google Cloud project has the required APIs enabled.
Navigate to the Google Cloud Console.
In the search bar, type "Vertex AI" and go to the Vertex AI Dashboard.
If prompted, click "Enable All Recommended APIs". This typically includes APIs like Vertex AI API, Cloud Storage API, and others essential for ML workflows.
Alternatively, you can enable them manually by going to Navigation menu > APIs & Services > Enabled APIs & Services and searching for and enabling:
Vertex AI API
Cloud Storage API
Artifact Registry API (if you plan to store pipeline templates)
Sub-heading 1.2: Create a Cloud Storage Bucket
Vertex AI Pipelines often use Cloud Storage for storing pipeline artifacts, datasets, and model outputs.
In the Google Cloud Console, navigate to Cloud Storage > Buckets.
Click "CREATE BUCKET".
Give your bucket a unique name (e.g., your-project-id-vertex-ai-pipelines).
Choose a region close to where you'll be running your pipeline (e.g., us-central1, asia-southeast1). This helps with latency and data residency.
Select a default storage class (Standard is usually fine for general use).
Click "CREATE".
Important: Make a note of your bucket URI (e.g., gs://your-project-id-vertex-ai-pipelines). You'll need it later as your pipeline root.
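If you prefer to script this step, the bucket can also be created with the google-cloud-storage Python client. This is a minimal sketch; the project ID, bucket name, and region are placeholders to replace with your own values.

from google.cloud import storage

# Placeholder project ID, bucket name, and region: replace with your own.
client = storage.Client(project="your-gcp-project-id")
bucket = client.create_bucket("your-project-id-vertex-ai-pipelines", location="us-central1")
print(f"Created bucket: gs://{bucket.name}")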
Sub-heading 1.3: Configure Service Account Permissions
Your Vertex AI Pipeline will run under a service account. By default, it uses the Compute Engine default service account. For production environments, it's highly recommended to create a dedicated service account with the principle of least privilege.
In the Google Cloud Console, navigate to IAM & Admin > Service Accounts.
Click "+ CREATE SERVICE ACCOUNT".
Give it a Service account name (e.g., vertex-ai-pipeline-sa).
Click "CREATE AND CONTINUE".
Grant the following roles to this service account:
Vertex AI User
Storage Object Admin (for reading/writing to your Cloud Storage bucket)
You might need additional roles depending on the specific services your pipeline interacts with (e.g., BigQuery User, Cloud Functions Developer).
Click "DONE".
Step 2: Choosing Your Pipeline Framework and Setting Up Your Environment
Vertex AI Pipelines supports both the Kubeflow Pipelines (KFP) SDK and TensorFlow Extended (TFX). For this guide, we'll focus on KFP, which is widely used for general-purpose ML orchestration.
Sub-heading 2.1: Local Development Environment Setup
To write and test your pipeline code, you'll need a Python environment.
Install Python: Ensure you have a recent Python 3 version installed (3.9 or higher is a safe choice and matches the base image used later).
Create a Virtual Environment: It's good practice to create a virtual environment for your project to manage dependencies.
python3 -m venv venv
source venv/bin/activate  # On Windows: .\venv\Scripts\activate
Install Kubeflow Pipelines SDK and Google Cloud Pipeline Components:
pip install kfp google-cloud-aiplatform google-cloud-pipeline-components
kfp is the Kubeflow Pipelines SDK.
google-cloud-aiplatform is the Vertex AI Python client library.
google-cloud-pipeline-components provides pre-built components for interacting with various Vertex AI services.
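As a quick sanity check that the packages installed correctly (the version numbers will vary in your environment):

from importlib.metadata import version

# Print the installed version of each package used in this guide
for pkg in ("kfp", "google-cloud-aiplatform", "google-cloud-pipeline-components"):
    print(pkg, version(pkg))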
Sub-heading 2.2: Vertex AI Workbench (Optional but Recommended)
For a fully integrated development experience within Google Cloud, Vertex AI Workbench notebooks are an excellent choice. They come with many necessary libraries pre-installed and are integrated with your GCP project.
In the Google Cloud Console, navigate to Vertex AI > Workbench.
Click "USER-MANAGED NOTEBOOKS".
Click "CREATE NEW".
Choose a TensorFlow Enterprise or Python 3 instance.
Configure the instance name, region, and machine type as needed.
Click "CREATE".
Once the instance is running, click "OPEN JUPYTERLAB" to launch your development environment. You can then upload or clone your pipeline code here.
Step 3: Defining Your Pipeline Components
The core of any ML pipeline is its components. Each component performs a specific task in your ML workflow. You can create custom components or use pre-built ones provided by Google.
Sub-heading 3.1: Understanding Component Types
Python Function-based Components: These are the easiest to create. You write a Python function, and the KFP SDK packages it into a component. This is great for simple, self-contained tasks.
Container-based Components: For more complex tasks, or if your component needs specific libraries or environments, you can define it using a Docker container image. This offers maximum flexibility.
Google Cloud Pipeline Components (GCPC): These are pre-built, optimized components for common Vertex AI tasks like dataset creation, model training (AutoML or Custom), model deployment, and batch prediction. They significantly simplify pipeline development by abstracting away much of the underlying infrastructure.
Sub-heading 3.2: Creating a Simple Python Function-Based Component
Let's start with a very basic custom component. This component will simply preprocess some data.
Create a Python file (e.g., my_pipeline.py) and add the following:
from kfp import dsl
from kfp.dsl import component, Input, Output, Dataset
@component(
    packages_to_install=["pandas", "scikit-learn"],
    base_image="python:3.9"
)
def preprocess_data(
    raw_data_path: str,
    processed_data: Output[Dataset]
):
    """
    A component that simulates data preprocessing.
    Reads data, performs a simple transformation, and saves it.
    """
    import pandas as pd

    print(f"Reading raw data from: {raw_data_path}")

    # Simulate reading data (e.g., from a CSV in GCS).
    # In a real scenario, you'd read from raw_data_path with a GCS client
    # or with pandas (reading a gs:// URI requires gcsfs).
    data = pd.DataFrame({
        'feature1': [1, 2, 3, 4, 5],
        'feature2': [10, 20, 30, 40, 50],
        'target': [0, 1, 0, 1, 0]
    })

    # Simulate a simple preprocessing step (e.g., creating a new feature)
    data['new_feature'] = data['feature1'] * data['feature2']

    # Save the processed data to the artifact's local path.
    # KFP exposes the output artifact as a local file path inside the container
    # and uploads whatever you write there to your GCS pipeline root.
    data.to_csv(processed_data.path, index=False)
    print(f"Processed data saved to: {processed_data.path}")

    # You can also add metadata to your output artifact
    processed_data.metadata["rows"] = len(data)
    processed_data.metadata["columns"] = len(data.columns)
    print(f"Metadata added: {processed_data.metadata}")
Key takeaways from this component:
@component: This decorator tells KFP that this Python function is a component.
packages_to_install: Specifies the Python packages the component needs. KFP installs them in the container.
base_image: The Docker image used as the base for this component's execution environment.
raw_data_path: str: A parameter input. Parameters are passed by value (strings, integers, and so on).
processed_data: Output[Dataset]: An artifact output. Artifacts represent larger data objects (datasets, models, metrics) and are stored in Cloud Storage. KFP provides a path attribute on the artifact object, which is a local path inside the container where you write your output. Vertex AI then moves it to your GCS pipeline root.
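For contrast with artifact outputs, here is a minimal, hypothetical companion component (not used later in the pipeline) that consumes the dataset artifact as an input and returns a plain value as a parameter output:

@component(
    packages_to_install=["pandas"],
    base_image="python:3.9"
)
def count_rows(processed_data: Input[Dataset]) -> int:
    """Returns the number of rows as a parameter output (passed by value)."""
    import pandas as pd

    # Input artifacts are downloaded for you; read the local copy at .path
    data = pd.read_csv(processed_data.path)
    return len(data)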
Sub-heading 3.3: Using Google Cloud Pipeline Components (GCPC)
GCPC simplifies interaction with Vertex AI services. Let's add the imports for the AutoML training and deployment components, plus a custom component that creates a Vertex AI dataset for them to consume.
Add to my_pipeline.py:
from google_cloud_pipeline_components.v1.automl.training_job import AutoMLTabularTrainingJobRunOp
from google_cloud_pipeline_components.v1.model import ModelUploadOp
from google_cloud_pipeline_components.v1.endpoint import EndpointCreateOp, ModelDeployOp
from google_cloud_pipeline_components.types import artifact_types
# Define your project ID and GCS bucket
PROJECT_ID = "your-gcp-project-id" # REPLACE WITH YOUR PROJECT ID
GCS_BUCKET_URI = "gs://your-gcp-project-id-vertex-ai-pipelines" # REPLACE WITH YOUR BUCKET URI
PIPELINE_ROOT = f"{GCS_BUCKET_URI}/pipeline_root"
@component(
    packages_to_install=["google-cloud-aiplatform", "google-cloud-pipeline-components"],
    base_image="python:3.9"
)
def create_dataset_from_gcs_uri(
    project: str,
    processed_data: Input[Dataset],
    dataset_display_name: str,
    vertex_dataset: Output[artifact_types.VertexDataset],
):
    """
    Creates a Vertex AI Tabular Dataset from the processed data artifact in GCS.
    """
    from google.cloud import aiplatform

    aiplatform.init(project=project, location="us-central1")  # Adjust location as needed

    # Check if the dataset already exists to avoid re-creation errors in demonstrations
    existing_datasets = aiplatform.TabularDataset.list(filter=f'display_name="{dataset_display_name}"')
    if existing_datasets:
        print(f"Dataset '{dataset_display_name}' already exists. Using existing dataset.")
        dataset = existing_datasets[0]
    else:
        print(f"Creating Vertex AI Tabular Dataset: {dataset_display_name} from {processed_data.uri}")
        dataset = aiplatform.TabularDataset.create(
            display_name=dataset_display_name,
            gcs_source=[processed_data.uri]
        )
    print(f"Dataset resource name: {dataset.resource_name}")

    # Populate the output artifact so downstream Google Cloud Pipeline Components
    # (such as the AutoML training op) can resolve the dataset resource.
    vertex_dataset.metadata["resourceName"] = dataset.resource_name
    vertex_dataset.uri = f"https://us-central1-aiplatform.googleapis.com/v1/{dataset.resource_name}"
Note: The create_dataset_from_gcs_uri component above is a custom component that leverages the google-cloud-aiplatform SDK. While GCPC offers many pre-built components, sometimes you need to wrap aiplatform SDK calls in custom components for specific interactions or custom logic. Its output is declared as a google.VertexDataset artifact (from google_cloud_pipeline_components.types.artifact_types) so that downstream GCPC components, such as the AutoML training op, can consume it.
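For comparison, this step can often be handled by a pre-built GCPC component instead of a custom one. A minimal sketch, assuming the v1 dataset module of google-cloud-pipeline-components; check the parameter names against your installed version:

from google_cloud_pipeline_components.v1.dataset import TabularDatasetCreateOp

# Inside a pipeline definition, this would replace the custom component above:
# create_dataset_task = TabularDatasetCreateOp(
#     project=PROJECT_ID,
#     display_name="my_pipeline_dataset",
#     gcs_source="gs://your-bucket/path/to/processed_data.csv",
# )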
Step 4: Defining Your Pipeline Workflow
Now, let's stitch our components together to form a pipeline. A pipeline is essentially a Python function annotated with @kfp.dsl.pipeline.
Add to my_pipeline.py:
from kfp import dsl
from kfp.dsl import pipeline
@pipeline(
    name="my-first-vertex-ai-pipeline",
    description="A simple pipeline to preprocess data and train an AutoML model.",
    pipeline_root=PIPELINE_ROOT  # This is where pipeline artifacts will be stored
)
def my_ml_pipeline(
    raw_data_gcs_uri: str,
    dataset_display_name: str = "my_pipeline_dataset",
    model_display_name: str = "my_pipeline_automl_model",
    target_column: str = "target"
):
    """
    Defines the end-to-end ML pipeline.
    """
    # Step 1: Preprocess data using our custom component
    preprocess_task = preprocess_data(
        raw_data_path=raw_data_gcs_uri
    )

    # Step 2: Create a Vertex AI Dataset from the processed data.
    # The output artifact of the previous task is passed as input to this one.
    create_dataset_task = create_dataset_from_gcs_uri(
        project=PROJECT_ID,
        processed_data=preprocess_task.outputs["processed_data"],
        dataset_display_name=dataset_display_name
    )

    # Step 3: Train an AutoML Tabular Classification model on the dataset
    # created in the previous step. This is a Google Cloud Pipeline Component.
    automl_training_task = AutoMLTabularTrainingJobRunOp(
        project=PROJECT_ID,
        display_name=f"{model_display_name}-training",
        dataset=create_dataset_task.outputs["vertex_dataset"],
        optimization_prediction_type="classification",
        target_column=target_column,
        model_display_name=model_display_name
    )

    # Step 4: (Optional) Create an Endpoint and Deploy the Model.
    # These are also Google Cloud Pipeline Components.
    create_endpoint_task = EndpointCreateOp(
        project=PROJECT_ID,
        display_name=f"{model_display_name}-endpoint"
    )

    ModelDeployOp(
        endpoint=create_endpoint_task.outputs["endpoint"],
        model=automl_training_task.outputs["model"],
        deployed_model_display_name=f"{model_display_name}-deployed",
        dedicated_resources_machine_type="n1-standard-2",  # Machine type for online prediction
        dedicated_resources_min_replica_count=1,
        dedicated_resources_max_replica_count=1
    )
Explanation of the pipeline definition:
@pipeline: Decorator that defines a KFP pipeline.
name and description: Metadata for your pipeline.
pipeline_root: The GCS location where all pipeline run artifacts will be stored. This is crucial.
raw_data_gcs_uri, dataset_display_name, model_display_name, target_column: Pipeline parameters you can set when you trigger a pipeline run.
Chaining components: Notice how create_dataset_task takes preprocess_task.outputs["processed_data"] as its processed_data input. This is how you define dependencies and pass data between pipeline steps; KFP automatically manages the lineage and movement of these artifacts.
Because AutoMLTabularTrainingJobRunOp consumes the dataset artifact, it only starts after the dataset step completes, and the deployment steps in turn wait for the trained model.
ModelDeployOp defines how the model is deployed to the endpoint, including the machine type and replica counts.
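The data dependencies above determine execution order automatically. When no data flows between two steps, you can still force an order with .after(); a small self-contained sketch (the component and pipeline names here are hypothetical):

from kfp import dsl

@dsl.component(base_image="python:3.9")
def say(msg: str):
    print(msg)

@dsl.pipeline(name="explicit-ordering-demo")
def ordering_demo():
    first = say(msg="train")
    second = say(msg="cleanup")
    # No data flows between the two tasks, so declare the ordering explicitly:
    second.after(first)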
Step 5: Compiling Your Pipeline
Once your pipeline Python function is defined, you need to compile it into a YAML file. This YAML file is the portable representation of your pipeline that Vertex AI understands.
Add to my_pipeline.py:
from kfp import compiler

if __name__ == "__main__":
    # Compile the pipeline. KFP 2.x writes the pipeline spec as YAML by default;
    # a .json package path produces the same spec as JSON, and Vertex AI accepts both.
    compiled_pipeline_path = "my_ml_pipeline.json"
    compiler.Compiler().compile(
        pipeline_func=my_ml_pipeline,
        package_path=compiled_pipeline_path
    )
    print(f"Pipeline compiled to: {compiled_pipeline_path}")
Run this Python script:
python my_pipeline.py
This will generate a my_ml_pipeline.json (or .yaml) file in your current directory.
Step 6: Running Your Vertex AI Pipeline
There are several ways to run your compiled pipeline on Vertex AI. We'll cover the two most common: via the Google Cloud Console and programmatically using the Vertex AI SDK for Python.
Sub-heading 6.1: Running from Google Cloud Console
This is a great way to visually inspect and manage your pipeline runs.
Navigate to the Vertex AI Dashboard in the Google Cloud Console.
Go to Pipelines.
Click "CREATE RUN".
Under "Run details", select "Upload file".
Click "Browse" and upload the
my_ml_pipeline.json
(or.yaml
) file you compiled.Provide a Run name (e.g.,
my-first-pipeline-run-jul-8
).In the "Runtime configuration" section, expand "Advanced options" to set the Service account you configured earlier.
Crucially, under "Pipeline parameters", you'll need to provide the
raw_data_gcs_uri
. For a quick test, you can upload a dummy CSV to your bucket (e.g.,gs://your-bucket/data/dummy_data.csv
).Example dummy CSV content (dummy_data.csv):
Code snippetfeature1,feature2,target 10,100,0 20,200,1 30,300,0 40,400,1
Set
raw_data_gcs_uri
togs://your-bucket/data/dummy_data.csv
.
Click "SUBMIT".
You'll see your pipeline run appear in the list on the Pipelines page. Click on the run name to view its graph, logs, and artifact lineage. This is where you'll observe the magical orchestration!
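If you'd rather script the dummy-file upload than use the console, a minimal sketch with the google-cloud-storage client; the project and bucket names are the placeholders used above:

from google.cloud import storage

dummy_csv = "feature1,feature2,target\n10,100,0\n20,200,1\n30,300,0\n40,400,1\n"

client = storage.Client(project="your-gcp-project-id")  # placeholder project ID
bucket = client.bucket("your-project-id-vertex-ai-pipelines")  # placeholder bucket name
bucket.blob("data/dummy_data.csv").upload_from_string(dummy_csv, content_type="text/csv")
print("Uploaded gs://your-project-id-vertex-ai-pipelines/data/dummy_data.csv")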
Sub-heading 6.2: Running Programmatically with Vertex AI SDK for Python
For automation and integration into CI/CD workflows, running pipelines programmatically is ideal.
Add to my_pipeline.py (after the compilation step):
from google.cloud import aiplatform

if __name__ == "__main__":
    # ... (previous compilation code) ...

    # Initialize the Vertex AI SDK
    aiplatform.init(
        project=PROJECT_ID,
        location="us-central1",  # Adjust to your chosen region
        staging_bucket=GCS_BUCKET_URI  # Where temporary pipeline files will be uploaded
    )

    # Create a pipeline job
    job = aiplatform.PipelineJob(
        display_name="my-first-pipeline-run-programmatic",
        template_path=compiled_pipeline_path,
        pipeline_root=PIPELINE_ROOT,
        parameter_values={
            "raw_data_gcs_uri": f"{GCS_BUCKET_URI}/data/dummy_data.csv",
            "dataset_display_name": "my_programmatic_dataset",
            "model_display_name": "my_programmatic_automl_model",
            "target_column": "target"
        },
        enable_caching=False  # Set to True for faster re-runs if inputs haven't changed
    )

    print("Submitting pipeline job...")
    # The service account is passed at submission time, not to the PipelineJob constructor.
    job.submit(
        service_account=f"vertex-ai-pipeline-sa@{PROJECT_ID}.iam.gserviceaccount.com"  # Replace with your service account email
    )
    print("Pipeline job submitted. Check the Vertex AI Pipelines dashboard for progress.")

    # You can also block until the job completes:
    # job.wait()
    # print(f"Pipeline job completed with state: {job.state}")
Make sure you replace PROJECT_ID, GCS_BUCKET_URI, and the service account email with your actual values.
Run the script again: python my_pipeline.py
This will submit the pipeline job to Vertex AI, and you can monitor its progress in the Google Cloud Console.
Step 7: Monitoring and Debugging Your Pipeline
Pipelines, especially complex ones, often require debugging. Vertex AI provides excellent tools for this.
Sub-heading 7.1: Visualizing Pipeline Runs
In the Google Cloud Console, navigate to Vertex AI > Pipelines.
Click on the name of your pipeline run.
You'll see a graphical representation of your pipeline, showing each component as a node.
Green nodes indicate success, red nodes indicate failure, and blue nodes indicate running tasks.
Click on any component node to see its details, including:
Inputs and Outputs: What data went in and came out.
Logs: Crucial for debugging errors. Always check the logs first when a component fails!
Artifacts: Links to the data and models generated by that step in Cloud Storage.
Execution Details: Machine type, resource usage, etc.
Sub-heading 7.2: Interpreting Logs
When a pipeline component fails, the logs are your best friend.
From the component details page, click on the "Logs" tab.
Look for ERROR or FAILURE messages. These will often point to issues in your component code, missing dependencies, or incorrect input parameters.
Sometimes the error is in the underlying Google Cloud service the component interacts with (e.g., an AutoML training job failure). The logs usually provide enough context to investigate further in that service's own logs.
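You can also inspect a run programmatically with the Vertex AI SDK, which is handy in CI jobs. A minimal sketch; the resource name is a placeholder you would copy from the console or from job.resource_name:

from google.cloud import aiplatform

aiplatform.init(project="your-gcp-project-id", location="us-central1")

# Placeholder pipeline job resource name
job = aiplatform.PipelineJob.get(
    "projects/your-gcp-project-id/locations/us-central1/pipelineJobs/my-first-vertex-ai-pipeline-20240101000000"
)
print("Run state:", job.state)

# Per-task status, useful for spotting which step failed
for task in job.task_details:
    print(task.task_name, task.state)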
Sub-heading 7.3: Retrying and Rerunning
If a pipeline run fails, you can often retry the entire run or specific failed tasks directly from the UI.
For persistent issues, you'll modify your code, recompile, and then create a new run.
Step 8: Advanced Concepts and Best Practices
As you become more comfortable, explore these advanced capabilities.
Sub-heading 8.1: Conditional Logic
Pipelines can include conditional execution, allowing different branches of your workflow to run based on the output of a preceding step. For example, deploying a model only if its evaluation metrics meet a certain threshold.
from kfp.dsl import Condition

# ... (component definitions) ...

@pipeline(...)
def my_conditional_pipeline(...):
    # ... (previous steps) ...

    # Assume an evaluation component outputs a metric 'accuracy'
    evaluation_result = evaluate_model(...)

    with Condition(evaluation_result.outputs["accuracy"] > 0.8, name="deploy-if-accurate"):
        ModelDeployOp(...)
Sub-heading 8.2: Parameterized Runs
Make your pipelines flexible by exposing parameters that can be changed at runtime (as shown in Step 6.1 and 6.2). This is essential for experimentation and deployment to different environments.
Sub-heading 8.3: Caching
Vertex AI Pipelines can cache component outputs. If a component is run with the exact same inputs as a previous run, it can skip execution and use the cached output, significantly speeding up development and debugging. Enable this with enable_caching=True when submitting the PipelineJob.
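Caching can also be controlled per task inside the pipeline definition, for example when a training step should always re-run. A small sketch against the pipeline defined earlier, assuming set_caching_options is available in your KFP version:

# Inside my_ml_pipeline(), after creating the training task:
# always re-run training, while other steps remain cacheable.
automl_training_task.set_caching_options(False)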
Sub-heading 8.4: Version Control
Store your pipeline definitions (the Python code and the compiled YAML/JSON) in a version control system like Git. This ensures traceability and enables collaborative development.
Sub-heading 8.5: Custom Containers
For components with complex dependencies or non-Python code, build your own Docker containers.
Write a Dockerfile that sets up your environment and includes your component code.
Build the Docker image and push it to Google Cloud Artifact Registry (or Container Registry).
In your @component decorator, specify base_image="your-artifact-registry-path/your-image:tag".
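Once the image is pushed, KFP also lets you declare a container-based component directly in Python. A minimal sketch, with a hypothetical image path and training script:

from kfp import dsl

@dsl.container_component
def train_in_custom_container(input_path: str, model_dir: dsl.OutputPath(str)):
    # Runs your own image; command and args execute inside the container.
    return dsl.ContainerSpec(
        image="us-central1-docker.pkg.dev/your-project/your-repo/trainer:latest",  # hypothetical image
        command=["python", "train.py"],  # hypothetical entrypoint inside the image
        args=["--input", input_path, "--model-dir", model_dir],
    )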
Conclusion
Congratulations! You've just embarked on the journey of building robust and automated ML workflows with Vertex AI Pipelines. From setting up your environment and defining individual components to orchestrating them into a cohesive pipeline and running it on Google Cloud, you now have a solid foundation.
Remember, MLOps is an iterative process. Start simple, iterate, and leverage the powerful capabilities of Vertex AI to build scalable, reproducible, and efficient machine learning systems. Happy pipelining!
Frequently Asked Questions (FAQs)
How to install Kubeflow Pipelines SDK?
To install the Kubeflow Pipelines SDK, use pip: pip install kfp. It's recommended to do this within a Python virtual environment.
How to define inputs and outputs for a pipeline component?
Inputs for a component are defined as function parameters. Outputs are defined using kfp.dsl.Output types (like Output[Dataset] or Output[Model]) or by returning values for parameter outputs. KFP handles the transfer of artifacts and values between components.
How to compile a Kubeflow Pipeline?
You compile a Kubeflow Pipeline by calling kfp.compiler.Compiler().compile(pipeline_func=your_pipeline_function, package_path="pipeline.json") on your pipeline Python function. This generates a YAML or JSON file.
How to run a compiled Vertex AI Pipeline?
You can run a compiled Vertex AI Pipeline through the Google Cloud Console by uploading the compiled YAML/JSON file, or programmatically using the Vertex AI SDK for Python's aiplatform.PipelineJob class.
How to monitor a Vertex AI Pipeline run?
You monitor a Vertex AI Pipeline run directly in the Google Cloud Console under the "Vertex AI > Pipelines" section. The UI provides a visual graph of the run, detailed logs for each component, and artifact lineage.
How to troubleshoot a failed Vertex AI Pipeline component?
The primary way to troubleshoot a failed component is to examine its logs in the Google Cloud Console. Look for error messages, tracebacks, and any print statements from your component code.
How to pass parameters to a Vertex AI Pipeline?
Parameters are passed to a Vertex AI Pipeline when you create a run. In the Google Cloud Console, there's a "Pipeline parameters" section. Programmatically, you pass a dictionary of parameter_values to the aiplatform.PipelineJob constructor.
How to use pre-built Google Cloud Pipeline Components?
Import them from google_cloud_pipeline_components.v1 (or other versions as needed), instantiate them within your pipeline function, and provide the required inputs. Examples include AutoMLTabularTrainingJobRunOp, ModelUploadOp, and EndpointCreateOp.
How to enable caching for Vertex AI Pipeline runs?
To enable caching for faster re-runs, set enable_caching=True when creating your aiplatform.PipelineJob. This instructs Vertex AI to reuse previous component outputs if their inputs haven't changed.
How to deploy a model from a Vertex AI Pipeline?
After model training, you typically use ModelUploadOp to register a custom-trained model in the Vertex AI Model Registry (AutoML training registers its model automatically), followed by EndpointCreateOp to create an endpoint and ModelDeployOp to deploy the model to that endpoint within your pipeline.