
Automating BigQuery Data Preprocessing and AutoML with Vertex AI Pipelines

Published: 12/6/2024
Categories: machinelearning, ai, googlecloud, beginners
Author: suzuki0430

I previously used BigQuery data updated daily for monthly model training. However, the manual processes of data preprocessing, labeling, and training were prone to frequent errors. To improve operational efficiency, I automated these processes using Google Cloud's Vertex AI Pipelines.

What Is Vertex AI Pipelines?

Vertex AI Pipelines is a service for building and managing machine learning pipelines on Google Cloud. Using the Kubeflow Pipelines (KFP) SDK or the TFX Pipeline DSL, you can run pipelines in a serverless environment without managing Kubernetes clusters yourself.

It is highly compatible with the Google Cloud ecosystem and provides the following benefits:

  • Prebuilt components for BigQuery and AutoML
  • Scheduled execution without relying on Cloud Scheduler
  • Serverless and scalable
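
To give a sense of what the KFP SDK looks like, here is a minimal sketch of a component and a pipeline (assuming the KFP v2 SDK; the names are illustrative only, not part of this project):

from kfp import dsl

@dsl.component(base_image="python:3.12")
def say_hello(name: str) -> str:
    # Lightweight Python component: runs in its own container at pipeline runtime.
    return f"Hello, {name}!"

@dsl.pipeline(name="hello-pipeline")
def hello_pipeline(name: str = "Vertex AI"):
    # The pipeline function wires components together; it is compiled to YAML
    # and submitted to Vertex AI Pipelines for execution.
    say_hello(name=name)

The full pipeline below follows exactly this pattern, just with Google Cloud's prebuilt components in place of the toy component.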

Implementation Steps

File Structure

The files necessary for defining and running the pipeline are organized as follows:

- sql_queries/
  - add_reaction_label.sql
  - training_pre_processing.sql
  - undersampling.sql
- pipeline_definition.py
- pipeline_notebook.ipynb
  • sql_queries: Stores SQL scripts for preprocessing
  • pipeline_definition.py: Contains the pipeline definition
  • pipeline_notebook.ipynb: Jupyter notebook for compiling and running the pipeline

Pipeline Definition

Below is the pipeline definition in pipeline_definition.py:

import datetime

from google.cloud import aiplatform
from kfp import dsl
from google_cloud_pipeline_components.v1.bigquery import BigqueryQueryJobOp
from google_cloud_pipeline_components.v1.automl.training_job import AutoMLTabularTrainingJobRunOp
from google_cloud_pipeline_components.v1.dataset import TabularDatasetCreateOp
from google_cloud_pipeline_components.v1.vertex_notification_email import VertexNotificationEmailOp

PROJECT_ID = "your-project-id"
RECIPIENTS_LIST = ["[email protected]"]  # NOTE: You can register up to three email addresses

# PROJECT_ID must be defined before it is used here.
aiplatform.init(project=PROJECT_ID, location="asia-northeast1")

def load_sql(file_path):
    with open(file_path, "r") as file:
        return file.read()

@dsl.component(base_image='python:3.12', packages_to_install=['google-cloud-bigquery'])
def create_bigquery_op(project_id: str, dataset_name: str, location: str) -> str:
    """Creates a BigQuery dataset if it does not exist."""
    # The component runs in its own container, so module-level globals such as
    # PROJECT_ID are not available here; the project ID is passed in instead.
    from google.cloud import bigquery
    client = bigquery.Client(project=project_id)
    dataset_id = f"{project_id}.{dataset_name}"
    dataset = bigquery.Dataset(dataset_id)
    dataset.location = location
    dataset = client.create_dataset(dataset, exists_ok=True)
    print(f"Dataset {dataset_id} created.")
    return dataset_id

@dsl.pipeline(name="data-preprocessing-and-training-pipeline")
def my_pipeline():    
    today = datetime.date.today().strftime('%Y%m%d')
    dest_dataset = f"pre_processed_dataset_{today}"
    create_dataset_op = create_bigquery_op(
        project_id=PROJECT_ID, dataset_name=dest_dataset, location="asia-northeast1")

    label_sql = load_sql("sql_queries/add_reaction_label.sql")
    preprocess_sql = load_sql("sql_queries/training_pre_processing.sql")
    undersampling_sql = load_sql("sql_queries/undersampling.sql")

    formatted_label_sql = label_sql.format(
        dataset=dest_dataset, table="add_reaction_label")
    formatted_preprocess_sql = preprocess_sql.format(
        dataset=dest_dataset, table="training_preprocessed")
    formatted_undersampling_sql = undersampling_sql.format(
        dataset=dest_dataset, table="summary_all_processed_undersampling")

    notify_email_op = VertexNotificationEmailOp(recipients=RECIPIENTS_LIST)

    with dsl.ExitHandler(notify_email_op):
        label_sql_op = BigqueryQueryJobOp(
            query=formatted_label_sql,
            location="asia-northeast1",
            project="{{$.pipeline_google_cloud_project_id}}"
        ).after(create_dataset_op)

        preprocess_sql_op = BigqueryQueryJobOp(
            query=formatted_preprocess_sql,
            location="asia-northeast1",
            project="{{$.pipeline_google_cloud_project_id}}"
        ).after(label_sql_op)

        undersampling_sql_op = BigqueryQueryJobOp(
            query=formatted_undersampling_sql,
            location="asia-northeast1",
            project="{{$.pipeline_google_cloud_project_id}}"
        ).after(preprocess_sql_op)

        dataset_create_op = TabularDatasetCreateOp(
            display_name=f"tabular_dataset_from_bigquery_{today}",
            bq_source=f"bq://{PROJECT_ID}.{dest_dataset}.summary_all_processed_undersampling",
            project="{{$.pipeline_google_cloud_project_id}}",
            location="asia-northeast1"
        ).after(undersampling_sql_op)

        model_training_op = AutoMLTabularTrainingJobRunOp(
            display_name=f"visitor_prediction_model_{today}",
            dataset=dataset_create_op.outputs["dataset"],
            target_column="reaction_score",
            training_fraction_split=0.8,
            validation_fraction_split=0.1,
            test_fraction_split=0.1,
            budget_milli_node_hours=72000,
            project="{{$.pipeline_google_cloud_project_id}}",
            optimization_prediction_type="classification",
            location="asia-northeast1"
        ).after(dataset_create_op)

Adding Execution Date to Dataset Names

To distinguish the datasets produced by each pipeline run, the execution date is included in the dataset name, and the dataset itself is created by the create_bigquery_op component defined above.

today = datetime.date.today().strftime('%Y%m%d')
dest_dataset = f"pre_processed_dataset_{today}"
create_dataset_op = create_bigquery_op(
    project_id=PROJECT_ID, dataset_name=dest_dataset, location="asia-northeast1")

Dynamically Loading SQL Files with Variables

To reuse the same pipeline with different datasets or table names, dynamic placeholders are used in SQL scripts. SQL files are loaded using the load_sql function, and placeholders like {dataset} and {table} are dynamically replaced.

def load_sql(file_path):
    with open(file_path, "r") as file:
        return file.read()

undersampling_sql = load_sql("sql_queries/undersampling.sql")

formatted_undersampling_sql = undersampling_sql.format(
    dataset=dest_dataset, table="summary_all_processed_undersampling")

Example of undersampling.sql:

CREATE OR REPLACE TABLE `{dataset}.{table}` AS
WITH class_counts AS (
  SELECT reaction_score, COUNT(*) as count
  FROM `{dataset}.training_preprocessed`
  GROUP BY reaction_score
),
median_count AS (
  SELECT APPROX_QUANTILES(count, 2)[OFFSET(1)] as target_count
  FROM class_counts
)
SELECT data.*
FROM `{dataset}.training_preprocessed` data
JOIN class_counts
ON data.reaction_score = class_counts.reaction_score
JOIN median_count
ON TRUE
WHERE RAND() < (median_count.target_count / class_counts.count);

Using Google Cloud Pipeline Components

Google Cloud Pipeline Components simplify interactions with various Google Cloud services. The components used in this pipeline include:

  • BigqueryQueryJobOp: Runs SQL on BigQuery
  • TabularDatasetCreateOp: Registers a BigQuery table as a dataset in Vertex AI
  • AutoMLTabularTrainingJobRunOp: Runs AutoML training jobs for tabular data
  • VertexNotificationEmailOp: Sends notifications to specified email addresses

Controlling Execution Order

Execution dependencies between components are controlled using the .after() method. For example:

label_sql_op = BigqueryQueryJobOp(
    query=formatted_label_sql,
    location="asia-northeast1",
    project="{{$.pipeline_google_cloud_project_id}}"
).after(create_dataset_op)

Sending Email Notifications at Pipeline Completion

To send notifications upon pipeline completion, the dsl.ExitHandler is used. It wraps the pipeline steps and ensures that notifications are sent regardless of success or failure.

notify_email_op = VertexNotificationEmailOp(recipients=RECIPIENTS_LIST)

with dsl.ExitHandler(notify_email_op):
    label_sql_op = BigqueryQueryJobOp(
        query=formatted_label_sql,
        location="asia-northeast1",
        project="{{$.pipeline_google_cloud_project_id}}"
    ).after(create_dataset_op)

Sample email notification:

(Screenshot: sample notification email)

Executing the Pipeline

To run Vertex AI Pipelines, you first compile the pipeline and then execute it either on-demand or on a schedule. For this example, Vertex AI Workbench was used as the execution environment.

(Screenshot: Vertex AI Workbench execution environment)

On-Demand Execution

For on-demand execution, the pipeline definition is first compiled into a YAML file. Then, the compiled YAML is used to execute the pipeline.

The following code compiles the pipeline definition from pipeline_definition.py and saves it as a compiled_pipeline.yaml file:

# pipeline_notebook.ipynb
from google.cloud import aiplatform
from kfp import compiler
import pipeline_definition

aiplatform.init(project="your-project-id", location="asia-northeast1")

compiler.Compiler().compile(
    pipeline_func=pipeline_definition.my_pipeline,
    package_path="compiled_pipeline.yaml"
)

Using the compiled compiled_pipeline.yaml, the pipeline can be executed with the following code:

# pipeline_notebook.ipynb
aiplatform.PipelineJob(
    display_name="data-preprocessing-and-training-pipeline",
    template_path="compiled_pipeline.yaml",
    parameter_values={},
    # enable_caching=False
).submit()

When the execution succeeds, the results are displayed in the Vertex AI console, allowing you to track the progress and status of each step.

(Screenshot: pipeline run results in the Vertex AI console)
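
Besides the console, the status of recent runs can also be checked from the SDK. A minimal sketch, assuming the same aiplatform session as above and using the PipelineJob.list method of the google-cloud-aiplatform SDK:

# pipeline_notebook.ipynb
# List recent runs of this pipeline and print their states.
for run in aiplatform.PipelineJob.list(
    filter='display_name="data-preprocessing-and-training-pipeline"',
    order_by="create_time desc",
):
    print(run.resource_name, run.state)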

Scheduled Execution

To execute the pipeline on a regular schedule, use the PipelineJob.create_schedule method. The following example creates a schedule to run the pipeline at 9:00 AM JST on the first day of every month:

# pipeline_notebook.ipynb
pipeline_job = aiplatform.PipelineJob(
    display_name="data-preprocessing-and-training-pipeline",
    template_path="compiled_pipeline.yaml",
    parameter_values={},
    # enable_caching=False
)

pipeline_job_schedule = pipeline_job.create_schedule(
    display_name="monthly-data-preprocessing-and-training",
    cron="TZ=Asia/Tokyo 0 9 1 * *",  # 9:00 AM JST on the 1st of every month
    max_concurrent_run_count=1,
    max_run_count=None
)

Once the schedule is registered, you can verify it in the Vertex AI schedule tab, where the following screen will be displayed:

(Screenshot: registered schedule in the Vertex AI Schedules tab)
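
The schedule can also be inspected and controlled from the SDK. A minimal sketch, assuming the PipelineJobSchedule API available in recent google-cloud-aiplatform releases:

# pipeline_notebook.ipynb
# List registered schedules by display name.
for schedule in aiplatform.PipelineJobSchedule.list(
    filter='display_name="monthly-data-preprocessing-and-training"',
):
    print(schedule.resource_name)

# Temporarily stop scheduled runs, and resume them later.
pipeline_job_schedule.pause()
pipeline_job_schedule.resume()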

Conclusion

By leveraging Vertex AI Pipelines, I successfully automated the previously error-prone manual processes of data preprocessing, labeling, and model training. This solution not only improved operational efficiency but also provided scalability and reliability for handling growing data volumes. Vertex AI’s seamless integration with BigQuery and AutoML components made it straightforward to build a robust end-to-end pipeline, which can now be executed both on-demand and on a schedule with minimal manual intervention.
