Edit

Share via


Create and run machine learning pipelines by using components with the Machine Learning SDK v2

APPLIES TO: Python SDK azure-ai-ml v2 (current)

In this article, you learn how to build an Azure Machine Learning pipeline by using the Azure Machine Learning Python SDK v2 to complete an image classification task that contains three steps: prepare data, train an image classification model, and score the model. Machine Learning pipelines optimize your workflow with speed, portability, and reuse, so you can focus on machine learning instead of infrastructure and automation.

The example pipeline trains a small Keras convolutional neural network to classify images in the Fashion MNIST dataset. The pipeline looks like this:

Screenshot showing a pipeline graph of the image classification example.

In this article, you complete the following tasks:

  • Prepare input data for the pipeline job.
  • Create three components to prepare the data, train an image, and score the model.
  • Build a pipeline from the components.
  • Get access to a workspace that has compute.
  • Submit the pipeline job.
  • Review the output of the components and the trained neural network.
  • (Optional) Register the component for further reuse and sharing within the workspace.

If you don't have an Azure subscription, create a free account before you begin. Try the free or paid version of Azure Machine Learning today.

Prerequisites

  • An Azure Machine Learning workspace. If you don't have one, complete the Create resources tutorial.

  • A Python environment in which you've installed Azure Machine Learning Python SDK v2. For installation instructions, see Getting started. This environment is for defining and controlling your Azure Machine Learning resources and is separate from the environment that's used at runtime for training.

  • A clone of the examples repository.

    To run the training examples, first clone the examples repository and go to the sdk directory:

    git clone --depth 1 https://github.com/Azure/azureml-examples
    cd azureml-examples/sdk
    

Start an interactive Python session

This article uses the Azure Machine Learning Python SDK to create and control an Azure Machine Learning pipeline. The article is written based on the assumption that you'll be running the code snippets interactively in either a Python REPL environment or a Jupyter notebook.

This article is based on the image_classification_keras_minist_convnet.ipynb notebook, which you can find in the sdk/python/jobs/pipelines/2e_image_classification_keras_minist_convnet directory of the Azure Machine Learning examples repository.

Import required libraries

Import all the Azure Machine Learning libraries that you need for this article:

# import required libraries
from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential

from azure.ai.ml import MLClient
from azure.ai.ml.dsl import pipeline
from azure.ai.ml import load_component

Prepare input data for your pipeline job

You need to prepare the input data for the image classification pipeline.

Fashion MNIST is a dataset of fashion images that's divided into 10 classes. Each image is a 28 x 28 grayscale image. There are 60,000 training images and 10,000 test images. As an image classification problem, Fashion MNIST is more challenging than the classic MNIST handwritten digit database. It's distributed in the same compressed binary form as the original handwritten digit database.

By defining an Input, you create a reference to the data source ___location. The data remains in its existing ___location, so no extra storage cost is incurred.

Create components for building the pipeline

The image classification task can be split into three steps: prepare data, train the model, and score the model.

An Azure Machine Learning component is a self-contained piece of code that completes one step in a machine learning pipeline. In this article, you create three components for the image classification task:

  • Prepare data for training and test it.
  • Train a neural network for image classification by using training data.
  • Score the model by using test data.

For each component, you need to complete these steps:

  1. Prepare the Python script that contains the execution logic.

  2. Define the interface of the component.

  3. Add other metadata of the component, including the runtime environment and the command to run the component.

The next section shows how to create the components in two ways. For the first two components, you use a Python function. For the third component, you use YAML definition.

Create the data preparation component

The first component in this pipeline converts the compressed data files of fashion_ds into two .csv files, one for training and the other for scoring. You use a Python function to define this component.

If you're following along with the example in the Azure Machine Learning examples repo, the source files are already available in the prep folder. This folder contains two files to construct the component: prep_component.py, which defines the component, and conda.yaml, which defines the runtime environment of the component.

Define component by using a Python function

By using the command_component() function as a decorator, you can easily define the component's interface, its metadata, and the code to run from a Python function. Each decorated Python function will be transformed into a single static specification (YAML) that the pipeline service can process.

# Converts MNIST-formatted files at the passed-in input path to training data output path and test data output path
import os
from pathlib import Path
from mldesigner import command_component, Input, Output


@command_component(
    name="prep_data",
    version="1",
    display_name="Prep Data",
    description="Convert data to CSV file, and split to training and test data",
    environment=dict(
        conda_file=Path(__file__).parent / "conda.yaml",
        image="mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04",
    ),
)
def prepare_data_component(
    input_data: Input(type="uri_folder"),
    training_data: Output(type="uri_folder"),
    test_data: Output(type="uri_folder"),
):
    convert(
        os.path.join(input_data, "train-images-idx3-ubyte"),
        os.path.join(input_data, "train-labels-idx1-ubyte"),
        os.path.join(training_data, "mnist_train.csv"),
        60000,
    )
    convert(
        os.path.join(input_data, "t10k-images-idx3-ubyte"),
        os.path.join(input_data, "t10k-labels-idx1-ubyte"),
        os.path.join(test_data, "mnist_test.csv"),
        10000,
    )


def convert(imgf, labelf, outf, n):
    f = open(imgf, "rb")
    l = open(labelf, "rb")
    o = open(outf, "w")

    f.read(16)
    l.read(8)
    images = []

    for i in range(n):
        image = [ord(l.read(1))]
        for j in range(28 * 28):
            image.append(ord(f.read(1)))
        images.append(image)

    for image in images:
        o.write(",".join(str(pix) for pix in image) + "\n")
    f.close()
    o.close()
    l.close()

The preceding code defines a component with display name Prep Data by using the @command_component decorator:

  • name is the unique identifier of the component.

  • version is the current version of the component. A component can have multiple versions.

  • display_name is a friendly display name of the component for UI. It isn't unique.

  • description usually describes the task the component can complete.

  • environment specifies the runtime environment for the component. The environment of this component specifies a Docker image and refers to the conda.yaml file.

    The conda.yaml file contains all packages used for the component:

    name: imagekeras_prep_conda_env
    channels:
      - defaults
    dependencies:
      - python=3.7.11
      - pip=20.0
      - pip:
        - mldesigner==0.1.0b4
    
  • The prepare_data_component function defines one input for input_data and two outputs for training_data and test_data. input_data is the input data path. training_data and test_data are output data paths for training data and test data.

  • The component converts the data from input_data into a training_data .csv to train data and a test_data .csv to test data.

This is what a component looks like in the studio UI:

  • A component is a block in a pipeline graph.
  • input_data, training_data, and test_data are ports of the component, which connect to other components for data streaming.

Screenshot of the Prep Data component in the UI and code.

You've now prepared all source files for the Prep Data component.

Create the model training component

In this section, you'll create a component for training the image classification model in a Python function, as you did with the Prep Data component.

Because the training logic is more complicated, you'll put the training code in a separate Python file.

The source files for this component are in the train folder in the Azure Machine Learning examples repo. This folder contains three files to construct the component:

  • train.py contains the logic to train the model.
  • train_component.py defines the interface of the component and imports the function that's in train.py.
  • conda.yaml defines the runtime environment of the component.

Get a script that contains the logic

The train.py file contains a normal Python function that performs the logic for training a Keras neural network for image classification. To view the code, see the train.py file on GitHub.

Define the component by using a Python function

After you define the training function, you can use @command_component in the Azure Machine Learning SDK v2 to wrap your function as a component that can be used in Azure Machine Learning pipelines:

import os
from pathlib import Path
from mldesigner import command_component, Input, Output


@command_component(
    name="train_image_classification_keras",
    version="1",
    display_name="Train Image Classification Keras",
    description="train image classification with keras",
    environment=dict(
        conda_file=Path(__file__).parent / "conda.yaml",
        image="mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04",
    ),
)
def keras_train_component(
    input_data: Input(type="uri_folder"),
    output_model: Output(type="uri_folder"),
    epochs=10,
):
    # avoid dependency issue, execution logic is in train() func in train.py file
    from train import train

    train(input_data, output_model, epochs)

The preceding code defines a component that has display name Train Image Classification Keras by using @command_component.

  • The keras_train_component function defines one input, input_data, for source training data, one input, epochs, which specifies the number of epochs to use during training, and one output, output_model, which specifies the output path for the model file. The default value of epochs is 10. The logic of this component is from the train() function in train.py.

The train model component has a slightly more complex configuration than the prepare data component. The conda.yaml looks like this:

name: imagekeras_train_conda_env
channels:
  - defaults
dependencies:
  - python=3.8
  - pip=20.2
  - pip:
    - mldesigner==0.1.0b12
    - azureml-mlflow==1.50.0
    - tensorflow==2.7.0
    - numpy==1.21.4
    - scikit-learn==1.0.1
    - pandas==1.3.4
    - matplotlib==3.2.2
    - protobuf==3.20.0

You've now prepared all the source files for the Train Image Classification Keras component.

Create the model scoring component

In this section, you create a component to score the trained model via YAML specification and script.

If you're following along with the example in the Azure Machine Learning examples repo, the source files are already available in the score folder. This folder contains three files to construct the component:

  • score.py contains the source code of the component.
  • score.yaml defines the interface and other details of the component.
  • conda.yaml defines the runtime environment of the component.

Get a script that contains the logic

The score.py file contains a normal Python function that performs the training model logic:

from tensorflow import keras
from keras.models import Sequential
from keras.layers import Dense, Dropout, Flatten
from keras.layers import Conv2D, MaxPooling2D
from tensorflow.keras.layers import BatchNormalization
from tensorflow.keras.utils import to_categorical
from keras.callbacks import Callback
from keras.models import load_model

import argparse
from pathlib import Path
import numpy as np
import pandas as pd
import os
import matplotlib.pyplot as plt
import mlflow


def get_file(f):

    f = Path(f)
    if f.is_file():
        return f
    else:
        files = list(f.iterdir())
        if len(files) == 1:
            return files[0]
        else:
            raise Exception("********This path contains more than one file*******")


def parse_args():
    # setup argparse
    parser = argparse.ArgumentParser()

    # add arguments
    parser.add_argument(
        "--input_data", type=str, help="path containing data for scoring"
    )
    parser.add_argument(
        "--input_model", type=str, default="./", help="input path for model"
    )

    parser.add_argument(
        "--output_result", type=str, default="./", help="output path for model"
    )

    # parse args
    args = parser.parse_args()

    # return args
    return args


def score(input_data, input_model, output_result):

    test_file = get_file(input_data)
    data_test = pd.read_csv(test_file, header=None)

    img_rows, img_cols = 28, 28
    input_shape = (img_rows, img_cols, 1)

    # Read test data
    X_test = np.array(data_test.iloc[:, 1:])
    y_test = to_categorical(np.array(data_test.iloc[:, 0]))
    X_test = (
        X_test.reshape(X_test.shape[0], img_rows, img_cols, 1).astype("float32") / 255
    )

    # Load model
    files = [f for f in os.listdir(input_model) if f.endswith(".h5")]
    model = load_model(input_model + "/" + files[0])

    # Log metrics of the model
    eval = model.evaluate(X_test, y_test, verbose=0)

    mlflow.log_metric("Final test loss", eval[0])
    print("Test loss:", eval[0])

    mlflow.log_metric("Final test accuracy", eval[1])
    print("Test accuracy:", eval[1])

    # Score model using test data
    y_predict = model.predict(X_test)
    y_result = np.argmax(y_predict, axis=1)

    # Output result
    np.savetxt(output_result + "/predict_result.csv", y_result, delimiter=",")


def main(args):
    score(args.input_data, args.input_model, args.output_result)


# run script
if __name__ == "__main__":
    # parse args
    args = parse_args()

    # call main function
    main(args)

The code in score.py takes three command-line arguments: input_data, input_model, and output_result. The program scores the input model by using input data and then outputs the result.

Define the component via YAML

In this section, you'll learn how to create a component specification in the valid YAML component specification format. This file specifies the following information:

  • Metadata. Name, display name, version, type, and so on.
  • Interface. Inputs and outputs.
  • Command, code, and environment. The command, code, and environment used to run the component.
$schema: https://azuremlschemas.azureedge.net/latest/commandComponent.schema.json
type: command

name: score_image_classification_keras
display_name: Score Image Classification Keras
inputs:
  input_data: 
    type: uri_folder
  input_model:
    type: uri_folder
outputs:
  output_result:
    type: uri_folder
code: ./
command: python score.py --input_data ${{inputs.input_data}} --input_model ${{inputs.input_model}} --output_result ${{outputs.output_result}}
environment:
  conda_file: ./conda.yaml
  image: mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04
  • name is the unique identifier of the component. Its display name is Score Image Classification Keras.
  • This component has two inputs and one output.
  • The source code path is defined in the code section. When the component is run in the cloud, all files from that path will be uploaded as the snapshot of the component.
  • The command section specifies the command to execute when the component runs.
  • The environment section contains a Docker image and a conda YAML file. The source file is in the sample repository.

You now have all the source files for the model scoring component.

Load the components to build a pipeline

You can import the data preparation component and the model training component, which are defined by Python functions, just like normal Python functions.

The following code imports the prepare_data_component() and keras_train_component() functions from the prep_component.py file in the prep folder and the train_component file in the train folder, respectively.

%load_ext autoreload
%autoreload 2

# load component function from component python file
from prep.prep_component import prepare_data_component
from train.train_component import keras_train_component

# print hint of components
help(prepare_data_component)
help(keras_train_component)

You can use the load_component() function to load the score component, which is defined by YAML.

# load component function from yaml
keras_score_component = load_component(source="./score/score.yaml")

Build your pipeline

You've created and loaded all the components and input data to build the pipeline. You can now compose them into a pipeline:

Note

To use serverless compute, add from azure.ai.ml.entities import ResourceConfiguration to the top of the file. Then replace:

  • default_compute=cpu_compute_target with default_compute="serverless".
  • train_node.compute = gpu_compute_target with train_node.resources = "ResourceConfiguration(instance_type="Standard_NC6s_v3",instance_count=2).
# define a pipeline containing 3 nodes: Prepare data node, train node, and score node
@pipeline(
    default_compute=cpu_compute_target,
)
def image_classification_keras_minist_convnet(pipeline_input_data):
    """E2E image classification pipeline with keras using python sdk."""
    prepare_data_node = prepare_data_component(input_data=pipeline_input_data)

    train_node = keras_train_component(
        input_data=prepare_data_node.outputs.training_data
    )
    train_node.compute = gpu_compute_target

    score_node = keras_score_component(
        input_data=prepare_data_node.outputs.test_data,
        input_model=train_node.outputs.output_model,
    )


# create a pipeline
pipeline_job = image_classification_keras_minist_convnet(pipeline_input_data=mnist_ds)

The pipeline has a default compute cpu_compute_target. If you don't specify compute for a specific node, that node will run on the default compute.

The pipeline has a pipeline-level input, pipeline_input_data. You can assign a value to pipeline input when you submit a pipeline job.

The pipeline contains three nodes: prepare_data_node, train_node, and score_node.

  • The input_data of prepare_data_node uses the value of pipeline_input_data.

  • The input_data of train_node is the training_data output of prepare_data_node.

  • The input_data of score_node is the test_data output of prepare_data_node, and the input_model is the output_model of train_node.

  • Because train_node trains a CNN model, you can specify its compute as the gpu_compute_target. Doing so can improve the training performance.

Submit your pipeline job

Now that you constructed the pipeline, you can submit the job to your workspace. To submit a job, you first need to connect to a workspace.

Get access to your workspace

Configure credentials

You'll use DefaultAzureCredential to get access to the workspace. DefaultAzureCredential should be capable of handling most Azure SDK authentication scenarios.

If DefaultAzureCredential doesn't work for you, see this configure credential example and identity Package.

try:
    credential = DefaultAzureCredential()
    # Check if given credential can get token successfully.
    credential.get_token("https://management.azure.com/.default")
except Exception as ex:
    # Fall back to InteractiveBrowserCredential in case DefaultAzureCredential not work
    credential = InteractiveBrowserCredential()

Get a handle to a workspace that has compute

Create an MLClient object to manage Azure Machine Learning services. If you use serverless compute, you don't need to create these computes.

# Get a handle to workspace
ml_client = MLClient.from_config(credential=credential)

# Retrieve an already attached Azure Machine Learning Compute.
cpu_compute_target = "cpu-cluster"
print(ml_client.compute.get(cpu_compute_target))
gpu_compute_target = "gpu-cluster"
print(ml_client.compute.get(gpu_compute_target))

Important

This code snippet expects the workspace configuration JSON file to be saved in the current directory or its parent. For more information on creating a workspace, see Create workspace resources. For more information on saving the configuration to a file, see Create a workspace configuration file.

Submit the pipeline job to the workspace

Now that you have a handle to your workspace, you can submit your pipeline job:

pipeline_job = ml_client.jobs.create_or_update(
    pipeline_job, experiment_name="pipeline_samples"
)
pipeline_job

The preceding code submits this image classification pipeline job to an experiment called pipeline_samples. It automatically creates the experiment if it doesn't exist. pipeline_input_data uses fashion_ds.

The call to submit the experiment completes quickly. It produces output similar to this:

Experiment Name Type Status Details page
pipeline_samples sharp_pipe_4gvqx6h1fb pipeline Preparing Link to Azure Machine Learning studio.

You can monitor the pipeline run by selecting the link. Or you can block it until it completes by running this code:

# wait until the job completes
ml_client.jobs.stream(pipeline_job.name)

Important

The first pipeline run takes about 15 minutes. All dependencies are downloaded, a Docker image is created, and the Python environment is provisioned and created. Running the pipeline again takes significantly less time because those resources are reused instead of created. However, total runtime for the pipeline depends on the workload of your scripts and the processes that run in each pipeline step.

Check outputs and debug your pipeline in the UI

You can select the Link to Azure Machine Learning studio, which is the job detail page of your pipeline. You'll see the pipeline graph:

Screenshot of the pipeline job detail page.

You can check the logs and outputs of each component by right-clicking the component, or select the component to open its detail pane. To learn more about how to debug your pipeline in the UI, see Use Azure Machine Learning studio to debug pipeline failures.

(Optional) Register components to the workspace

In the previous section, you built a pipeline by using three components to complete an image classification task. You can also register components to your workspace so that they can be shared and reused within the workspace. The following example shows how to register the data preparation component:

try:
    # try get back the component
    prep = ml_client.components.get(name="prep_data", version="1")
except:
    # if not exists, register component using following code
    prep = ml_client.components.create_or_update(prepare_data_component)

# list all components registered in workspace
for c in ml_client.components.list():
    print(c)

You can use ml_client.components.get() to get a registered component by name and version. You can use ml_client.components.create_or_update() to register a component that was previously loaded from a Python function or YAML.

Next steps