Apache Liminal: when MLOps meets GitOps

Apache Liminal is an open-source project that offers a solution for deploying end-to-end Machine Learning pipelines. It centralizes all the steps needed to build Machine Learning models, from data cleaning to model deployment.

The solution takes a declarative approach to MLOps projects. The pipeline that encapsulates the different steps for preparing, training, and deploying your Machine Learning model is written in YAML.

This file, and the Python scripts it points to, are easily versioned with tools like Git, opening the door to GitOps practices. GitOps describes an architecture in which the system is reproducible from the state stored in a Git repository. Data engineers and data scientists can then collaborate to improve the model.

Apache Liminal leverages Apache Airflow, Docker, and Kubernetes in order to create and deploy our pipeline.

Installation

To reproduce all the commands found in this article, Apache Liminal requires Docker and Kubernetes to be installed on your machine. Kubernetes can be installed with minikube.

If you are on macOS with Docker already installed, the easiest approach is to enable Kubernetes by ticking the box labeled “Deploy Docker Stacks to Kubernetes by default” in Docker Desktop.


[Image: Docker Desktop Kubernetes settings]

Next, you can install Apache Liminal using pip.

pip install git+https://github.com/apache/incubator-liminal.git

Creation of a Liminal pipeline

Creation of the Python scripts

Let’s start by creating a folder at the root of our project directory to gather all the Python scripts needed by our pipeline.

Inside it, we first create a requirements.txt file for dependency management. Apache Liminal uses this file to install all the listed Python packages needed for our scripts to run properly on the Docker images. In our example we will use the following packages:

urllib3
pandas
numpy
tensorflow
scikit-learn

In our use case, the data preparation step essentially comes down to downloading the dataset. We will use the wine-quality.csv file to train our model. As we will see later, this data will be directly accessible from the pods.

We create a file named download.py that contains all the logic to download the file and clean the data:



import os

import urllib3
import pandas as pd
import numpy as np

PATH = "/mnt/data/"
file_path = str(PATH) + "file.csv"

# The dataset URL is provided by the Liminal task through an environment variable.
http = urllib3.PoolManager()
url = os.environ['url']

r = http.request('GET', url)

# Remove any previous download so the file can be recreated from scratch.
if os.path.exists(file_path):
    os.remove(file_path)
else:
    print("file does not exist")

with open(file_path, 'xb') as f:
    f.write(r.data)

dataset = pd.read_csv(file_path)

# Replace spaces with underscores in numeric column names so they can be used
# as TensorFlow feature names later on.
for field in dataset.columns:
    if type(dataset[field][0]) == np.int64:
        new_field = field.replace(' ', '_')
        dataset = dataset.rename(columns={field: new_field})
        print('i - field = ' + str(new_field))
    elif type(dataset[field][0]) == np.float64:
        new_field = field.replace(' ', '_')
        dataset = dataset.rename(columns={field: new_field})
        print('f - field = ' + str(new_field))

dataset.to_csv(file_path, index=False)
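As a quick sanity check (a hypothetical snippet, not part of the pipeline, assuming it is run where /mnt/data is mounted, e.g. inside the task container), you can verify that the renamed numeric columns no longer contain spaces:

# Hypothetical sanity check: run where /mnt/data is mounted.
import pandas as pd

df = pd.read_csv("/mnt/data/file.csv")
print(df.columns.tolist())  # numeric columns should now use underscores, e.g. 'fixed_acidity'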

Here we fetch the file using an environment variable named url, which is defined in our YAML file as follows:

env_vars:
  url: "https://raw.githubusercontent.com/mlflow/mlflow/master/examples/sklearn_elasticnet_wine/wine-quality.csv"

Next, we create a Python script named wine_linear_regression.py to train our model:



import os
import sys

import numpy as np
import pandas as pd
import tensorflow as tf

from six.moves import urllib
from sklearn.model_selection import train_test_split


PATH = "/mnt/data/"

path = str(PATH) + "file.csv"

dataset = pd.read_csv(path)

# The wine quality score is the label; the remaining columns are the features.
labels = dataset['quality'].tolist()

dataset = dataset.drop(["quality"], axis=1)

x_train, x_test, y_train, y_test = train_test_split(dataset,
                                                    labels,
                                                    train_size=0.9)

NUMERIC_COLUMNS = ['alcohol', 'chlorides', 'citric_acid', 'density', 'fixed_acidity',
                   'free_sulfur_dioxide', 'pH', 'residual_sugar', 'sulphates', 'total_sulfur_dioxide',
                   'volatile_acidity']

CATEGORICAL_COLUMNS = ['quality']

feature_columns = []

for feature_name in NUMERIC_COLUMNS:
    feature_columns.append(tf.feature_column.numeric_column(feature_name, dtype=tf.float32))

def make_input_fn(data_df, label_df, num_epochs=10, shuffle=True, batch_size=32):
    def input_function():
        ds = tf.data.Dataset.from_tensor_slices((dict(data_df), label_df))
        if shuffle:
            ds = ds.shuffle(1000)
        ds = ds.batch(batch_size).repeat(num_epochs)
        return ds
    return input_function

train_input_fn = make_input_fn(x_train, y_train)
eval_input_fn = make_input_fn(x_test, y_test, num_epochs=1, shuffle=False)

# Train a linear regression estimator and keep its checkpoints under /mnt/data/train.
linear_est = tf.estimator.LinearRegressor(
    feature_columns=feature_columns,
    model_dir=str(PATH) + "train"
)

linear_est.train(train_input_fn)

result = linear_est.evaluate(eval_input_fn)

print("--> OUTPUT = " + str(result))

def serving_input_receiver_fn():
    # One placeholder per feature column, used as the serving signature.
    inputs = {}
    for feat in feature_columns:
        inputs[feat.name] = tf.compat.v1.placeholder(shape=[None], dtype=feat.dtype)

    print("--> INPUTS = " + str(inputs))
    return tf.estimator.export.ServingInputReceiver(inputs, inputs)

# Export a SavedModel under /mnt/data/model for later validation and serving.
linear_est.export_saved_model(export_dir_base=str(PATH) + "model", serving_input_receiver_fn=serving_input_receiver_fn)
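To check what export_saved_model produced, here is a minimal sketch (assuming the same TensorFlow version and export path as above) that loads the newest export and lists its serving signatures; validation.py relies on the same 'predict' signature:

# Minimal sketch: inspect the most recent SavedModel exported above.
from pathlib import Path
import tensorflow as tf

export_base = "/mnt/data/model"
subdirs = [p for p in Path(export_base).iterdir() if p.is_dir() and 'temp' not in str(p)]
latest_export = str(sorted(subdirs)[-1])

loaded = tf.saved_model.load(latest_export)
print(list(loaded.signatures.keys()))  # should include 'predict'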

Finally, we create a Python script that compares the performance of the newly trained model with the model running in production, so that we always keep the best model in production. All of this code goes into a file named validation.py:

import os
import random
import sys
from pathlib import Path

import numpy as np
import pandas
import tensorflow as tf


PATH = "/mnt/data/"


# Find the most recent SavedModel produced by the training task.
model_dir = str(PATH) + "model"
subdirs = [x for x in Path(model_dir).iterdir()
           if x.is_dir() and 'temp' not in str(x)]
latest = str(sorted(subdirs)[-1])

print("--> LATEST = " + str(latest))


# Find the model currently in production, if any.
model_prod_dir = str(PATH) + "model_prod"
if not os.path.exists(model_prod_dir):
    os.makedirs(model_prod_dir)
subdirs_prod = [x for x in Path(model_prod_dir).iterdir()
                if x.is_dir() and 'temp' not in str(x)]

# No production model yet: promote the newly trained model and exit.
if not subdirs_prod:
    os.rename(latest, model_prod_dir + "/" + latest.split("/")[-1])
    sys.exit(0)

latest_prod = str(sorted(subdirs_prod)[-1])
print("--> PROD = " + str(latest_prod))


# Sample roughly 10% of the rows to compare the two models.
randomlist = []

df = pandas.read_csv(str(PATH) + 'file.csv')
nb_raw = len(df)
for i in range(0, int(nb_raw / 10)):
    n = random.randint(0, nb_raw)
    if n < nb_raw and n >= 0:
        randomlist.append(n)
    else:
        print(" _BAD_RANDOM_ ")


def build_predict(df, model):
    # Call the 'predict' signature with one tensor per feature.
    res = model(chlorides=tf.constant(df['chlorides'], dtype=tf.float32, shape=1),
                alcohol=tf.constant(df['alcohol'], dtype=tf.float32, shape=1),
                citric_acid=tf.constant(df['citric_acid'], dtype=tf.float32, shape=1),
                residual_sugar=tf.constant(df['residual_sugar'], dtype=tf.float32, shape=1),
                total_sulfur_dioxide=tf.constant(df['total_sulfur_dioxide'], dtype=tf.float32, shape=1),
                free_sulfur_dioxide=tf.constant(df['free_sulfur_dioxide'], dtype=tf.float32, shape=1),
                pH=tf.constant(df['pH'], dtype=tf.float32, shape=1),
                fixed_acidity=tf.constant(df['fixed_acidity'], dtype=tf.float32, shape=1),
                sulphates=tf.constant(df['sulphates'], dtype=tf.float32, shape=1),
                density=tf.constant(df['density'], dtype=tf.float32, shape=1),
                volatile_acidity=tf.constant(df['volatile_acidity'], dtype=tf.float32, shape=1))
    return res

model = tf.saved_model.load(export_dir=str(latest)).signatures['predict']

model_prod = tf.saved_model.load(export_dir=str(latest_prod)).signatures['predict']

score_train = 0
score_prod = 0

# Count how often each model predicts the exact quality score on the sample.
for x in randomlist:
    value = df.drop(["quality"], axis=1).iloc[x]
    real = df['quality'].iloc[x]
    pred_train = round(np.array(build_predict(value, model)['predictions'])[0][0])
    if real == pred_train:
        score_train += 1
    pred_prod = round(np.array(build_predict(value, model_prod)['predictions'])[0][0])
    if real == pred_prod:
        score_prod += 1

print("score_train : " + str(score_train))
print("score_prod : " + str(score_prod))


# If the new model beats the production model, archive the old one and promote the new one.
if score_train > score_prod:
    model_old_dir = str(PATH) + "model_old"
    if not os.path.exists(model_old_dir):
        os.makedirs(model_old_dir)
    os.rename(latest_prod, str(PATH) + "model_old/" + latest_prod.split("/")[-1])
    os.rename(latest, model_prod_dir + "/" + latest.split("/")[-1])

Creation of the pipeline

Now we create a YAML file named liminal.yml at the root of our project directory. First, let’s declare our mounted volumes. For that we create a Kubernetes volume named data, linked to the directory where our liminal.yml file is located.

name: GettingStartedPipeline
volumes:
  - volume: data
    local:
      path: .

Next, we structure and declare the ordering of our pipeline using a tasks section. A tasks section consists of one or more task entries, each characterized by:

  • task: the name of the task (be careful, each task must have a unique name)
  • type: the kind of script to be run, in our case Python scripts
  • description: describes the purpose of the task
  • image: the Docker image the script will be associated with
  • source: the path where the script is located
  • cmd: the command used to execute the script
  • mounts: mounts the volume defined above into a folder inside the container
  • env_vars: the environment variables we want to provision to our images.

Each task is run by an Airflow DAG in a distinct pod. In our case, all of them share the same Docker image, declared in the image field, and the same volume, specified in the mounts field.

name: GettingStartedPipeline
volumes:
  - volume: data
    local:
      path: .
pipelines:
  - pipeline: getting_started_pipeline
    owner: Aargan
    start_date: 1970-01-01
    timeout_minutes: 10
    schedule: 0 * 1 * *
    default_array_loaded: [2, 3, 4]
    default_object_loaded:
      key1: val1
      key2: val2
    metrics:
      namespace: TestNamespace
      backends: [ ]
    tasks:
      - task: load_data
        type: python
        description: Load Dataset
        image: python_hello_world_example_image
        source: pythonscript
        mounts:
          - mount: mymount
            volume: data
            path: /mnt/data
        cmd: python -u download.py
        env_vars:
          url: "https://raw.githubusercontent.com/mlflow/mlflow/master/examples/sklearn_elasticnet_wine/wine-quality.csv"
      - task: training_model
        type: python
        description: training model
        image: python_hello_world_example_image
        source: pythonscript
        mounts:
          - mount: mymount
            volume: data
            path: /mnt/data
        cmd: python -u wine_linear_regression.py
      - task: validation_model
        type: python
        description: validation model
        image: python_hello_world_example_image
        source: pythonscript
        mounts:
          - mount: mymount
            volume: data
            path: /mnt/data
        cmd: python -u validation.py

Run Apache Liminal

Now, let’s deploy our pipeline using the following commands:

liminal build
liminal deploy --clean
liminal start

Apache Liminal is now started. The Apache Airflow UI is accessible at the following address: http://127.0.0.1:8080


[Image: Apache Airflow UI]

Simply activate the DAG and the pipeline will be triggered automatically.


[Image: activating the DAG in the Airflow UI]

We can follow our DAG and access the logs through the Tree View (see our article Introducing Apache Airflow on AWS if you wish to get a better understanding of Apache Airflow's functionalities).


[Image: DAG Tree View and task logs]

Once the pipeline has fully executed and terminated, we stop our Liminal server using the command:
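liminal stop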

Conclusion

Apache Liminal aims to simplify the creation of end-to-end Machine Learning pipelines. We think the initiative is a success: a single YAML file lets you coherently describe the execution of your different Machine Learning pipelines.

Moreover, leveraging Kubernetes lets users deploy their pipelines on remote clusters. You connect to your remote cluster using the command:


kubectl config set-context <your remote kubernetes cluster>

Finally, using declarative YAML files offers the advantage of automating your Machine Learning pipelines within your CI/CD pipelines, making it possible to version, publish, and operate your models.

References