Apache Liminal: when MLOps meets GitOps

Apache Liminal is an open-source project that proposes a solution to deploy end-to-end Machine Learning pipelines. Indeed, it allows you to centralize all the steps needed to build Machine Learning models, from data cleaning to model deployment.
This solution takes a declarative approach to MLOps projects. The pipeline that encapsulates the different steps for the preparation, training and deployment of your Machine Learning model is written in YAML.
This file, and the Python scripts it points to, are easily versioned with tools like Git, opening the door to a GitOps practice. GitOps describes an architecture in which the system is reproducible from the state stored in a Git repository. Data engineers and data scientists can then collaborate to improve the model.
Apache Liminal leverages Apache Airflow, Docker and Kubernetes to create and deploy our pipeline.
Installation
To reproduce all the commands found in this article, Apache Liminal requires Docker and Kubernetes to be installed on your machine. Kubernetes can be installed with minikube.
If you are on macOS with Docker already installed, the easiest approach is to enable Kubernetes by ticking the box labeled “Deploy Docker Stacks to Kubernetes by default” in Docker Desktop.
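Otherwise, a minimal minikube setup could look like the sketch below; it assumes minikube and kubectl are already installed, and the Docker driver is just one common choice:
minikube start --driver=docker
kubectl config use-context minikube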
Next you can install Apache Liminal using pip:
pip install git+https://github.com/apache/incubator-liminal.git
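Although not strictly required, you may prefer to install Liminal inside a dedicated virtual environment. A minimal sketch using the standard venv module (the environment name is arbitrary):
python3 -m venv liminal-env
source liminal-env/bin/activate
pip install git+https://github.com/apache/incubator-liminal.git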
Creation of a Liminal pipeline
Creation of the Python scripts
Let’s start by creating a folder at the root of our project directory to gather all the Python scripts required by our pipeline.
Inside it we first create our requirements.txt file for dependency management. Apache Liminal uses this file to install all the listed Python packages needed for our scripts to run properly on the Docker images. In our example we are going to use the following packages:
urllib3
pandas
numpy
tensorflow
scikit-learn
In our use case the data preparation step mostly boils down to downloading the dataset. We are going to use the wine-quality.csv file to train our model. As we will see later, this data will be directly accessible from the pods through a mounted volume.
We are going to create a file named download.py that contains all the logic to download the file and clean the data:
import urllib3
import pandas as pd
import numpy as np
import os

PATH = "/mnt/data/"
file_path = str(PATH) + "file.csv"

# Download the dataset from the URL provided through the environment
http = urllib3.PoolManager()
url = os.environ['url']
r = http.request('GET', url)

# Remove any previous copy of the file before writing the new one
if os.path.exists(file_path):
    os.remove(file_path)
else:
    print("file does not exist")
with open(file_path, 'xb') as f:
    f.write(r.data)

# Normalize column names: replace spaces with underscores
dataset = pd.read_csv(file_path)
for field in dataset.columns:
    if type(dataset[field][0]) == np.int64:
        new_field = field.replace(' ', '_')
        dataset = dataset.rename(columns={field: new_field})
        print('i - field = ' + str(new_field))
    elif type(dataset[field][0]) == np.float64:
        new_field = field.replace(' ', '_')
        dataset = dataset.rename(columns={field: new_field})
        print('f - field = ' + str(new_field))
dataset.to_csv(file_path, index=False)
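If you want to sanity-check download.py outside of Liminal, a possible local run looks like this; it is only a sketch, assuming /mnt/data exists and is writable on your machine, and it reuses the same url environment variable the pipeline will provide:
mkdir -p /mnt/data
export url="https://raw.githubusercontent.com/mlflow/mlflow/master/examples/sklearn_elasticnet_wine/wine-quality.csv"
python -u download.py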
Here we fetch the file using an environment variable named url, which is defined in our YAML file as follows:
env_vars:
  url: "https://raw.githubusercontent.com/mlflow/mlflow/master/examples/sklearn_elasticnet_wine/wine-quality.csv"
Next we create a Python script named wine_linear_regression.py to train our model:
import os
import sys
import numpy as np
import pandas as pd
import tensorflow as tf
from six.moves import urllib
from sklearn.model_selection import train_test_split

PATH = "/mnt/data/"
path = str(PATH) + "file.csv"

# Load the dataset and split it into train and test sets
dataset = pd.read_csv(path)
labels = dataset['quality'].tolist()
dataset = dataset.drop(["quality"], axis=1)
x_train, x_test, y_train, y_test = train_test_split(dataset,
                                                    labels,
                                                    train_size=0.9)

NUMERIC_COLUMNS = ['alcohol', 'chlorides', 'citric_acid', 'density', 'fixed_acidity',
                   'free_sulfur_dioxide', 'pH', 'residual_sugar', 'sulphates', 'total_sulfur_dioxide',
                   'volatile_acidity']
CATEGORICAL_COLUMNS = ['quality']

feature_columns = []
for feature_name in NUMERIC_COLUMNS:
    feature_columns.append(tf.feature_column.numeric_column(feature_name, dtype=tf.float32))

def make_input_fn(data_df, label_df, num_epochs=10, shuffle=True, batch_size=32):
    def input_function():
        ds = tf.data.Dataset.from_tensor_slices((dict(data_df), label_df))
        if shuffle:
            ds = ds.shuffle(1000)
        ds = ds.batch(batch_size).repeat(num_epochs)
        return ds
    return input_function

train_input_fn = make_input_fn(x_train, y_train)
eval_input_fn = make_input_fn(x_test, y_test, num_epochs=1, shuffle=False)

# Train and evaluate a linear regression estimator
linear_est = tf.estimator.LinearRegressor(
    feature_columns=feature_columns,
    model_dir=str(PATH) + "train"
)
linear_est.train(train_input_fn)
result = linear_est.evaluate(eval_input_fn)
print("--> OUTPUT = " + str(result))

# Export the trained model in the SavedModel format
def serving_input_receiver_fn():
    inputs = {}
    for feat in feature_columns:
        inputs[feat.name] = tf.compat.v1.placeholder(shape=[None], dtype=feat.dtype)
    print("--> INPUTS = " + str(inputs))
    return tf.estimator.export.ServingInputReceiver(inputs, inputs)

linear_est.export_saved_model(export_dir_base=str(PATH) + "model", serving_input_receiver_fn=serving_input_receiver_fn)
Finally we create a Python script to compare the accuracy of the newly trained model with the model running in production, so as to always keep the best model running. All the code is written in a file named validation.py:
import pandas
import random
from pathlib import Path
import tensorflow as tf
import numpy as np
import sys
import os

PATH = "/mnt/data/"
model_dir = str(PATH) + "model"
subdirs = [x for x in Path(model_dir).iterdir()
           if x.is_dir() and 'temp' not in str(x)]
latest = str(sorted(subdirs)[-1])
print("--> LATEST = " + str(latest))

model_prod_dir = str(PATH) + "model_prod"
if not os.path.exists(model_prod_dir):
    os.makedirs(model_prod_dir)
subdirs_prod = [x for x in Path(model_prod_dir).iterdir()
                if x.is_dir() and 'temp' not in str(x)]

# If no model is in production yet, promote the latest trained model and exit
if not subdirs_prod:
    os.rename(latest, model_prod_dir + "/" + latest.split("/")[-1])
    sys.exit(0)

latest_prod = str(sorted(subdirs_prod)[-1])
print("--> PROD = " + str(latest_prod))

# Sample roughly 10% of the rows to compare the two models
randomlist = []
df = pandas.read_csv(str(PATH) + 'file.csv')
nb_raw = len(df)
for i in range(0, int(nb_raw / 10)):
    n = random.randint(0, nb_raw)
    if n < nb_raw and n >= 0:
        randomlist.append(n)
    else:
        print(" _BAD_RANDOM_ ")

def build_predict(df, model):
    res = model(chlorides=tf.constant(df['chlorides'], dtype=tf.float32, shape=1),
                alcohol=tf.constant(df['alcohol'], dtype=tf.float32, shape=1),
                citric_acid=tf.constant(df['citric_acid'], dtype=tf.float32, shape=1),
                residual_sugar=tf.constant(df['residual_sugar'], dtype=tf.float32, shape=1),
                total_sulfur_dioxide=tf.constant(df['total_sulfur_dioxide'], dtype=tf.float32, shape=1),
                free_sulfur_dioxide=tf.constant(df['free_sulfur_dioxide'], dtype=tf.float32, shape=1),
                pH=tf.constant(df['pH'], dtype=tf.float32, shape=1),
                fixed_acidity=tf.constant(df['fixed_acidity'], dtype=tf.float32, shape=1),
                sulphates=tf.constant(df['sulphates'], dtype=tf.float32, shape=1),
                density=tf.constant(df['density'], dtype=tf.float32, shape=1),
                volatile_acidity=tf.constant(df['volatile_acidity'], dtype=tf.float32, shape=1))
    return res

model = tf.saved_model.load(export_dir=str(latest)).signatures['predict']
model_prod = tf.saved_model.load(export_dir=str(latest_prod)).signatures['predict']

pred = []
pred_prod = []
score_train = 0
score_prod = 0
for x in randomlist:
    value = df.drop(["quality"], axis=1).iloc[x]
    real = df['quality'].iloc[x]
    pred_train = round(np.array(build_predict(value, model)['predictions'])[0][0])
    if real == pred_train:
        score_train += 1
    pred_prod = round(np.array(build_predict(value, model_prod)['predictions'])[0][0])
    if real == pred_prod:
        score_prod += 1

print("score_train : " + str(score_train))
print("score_prod : " + str(score_prod))

# Promote the newly trained model only if it scores better than the production model
if score_train > score_prod:
    model_old_dir = str(PATH) + "model_old"
    if not os.path.exists(model_old_dir):
        os.makedirs(model_old_dir)
    os.rename(latest_prod, str(PATH) + "model_old/" + latest_prod.split("/")[-1])
    os.rename(latest, model_prod_dir + "/" + latest.split("/")[-1])
Creation of the pipeline
Now we are going to create a YAML file at the root of our project directory named liminal.yml. First let’s declare our mounted volumes. For that we create a Kubernetes volume named data linked to the directory where our liminal.yml file is located.
name: GettingStartedPipeline
volumes:
  - volume: data
    local:
      path: .
Next we will structure and declare the ordering of our pipeline using a tasks section. A tasks section consists of one or more task entries, each characterized by:
task: the name of the task (be careful, each task must have a unique name)
type: the type of script that will be run; in our case we are using Python scripts
description: describes the objective of the task
image: the Docker image the script will be associated with
source: the path where the script is located
cmd: the command that runs the script
mounts: mounts the volume defined above into a folder
env_vars: the environment variables we want to provision to our images
Each task is run by an Airflow DAG in a distinct pod. In our case they all share the same Docker image, declared in the image field, and the same volume, specified in the mounts field.
name: GettingStartedPipeline
volumes:
  - volume: data
    local:
      path: .
pipelines:
  - pipeline: getting_started_pipeline
    owner: Aargan
    start_date: 1970-01-01
    timeout_minutes: 10
    schedule: 0 * 1 * *
    default_array_loaded: [2, 3, 4]
    default_object_loaded:
      key1: val1
      key2: val2
    metrics:
      namespace: TestNamespace
      backends: [ ]
    tasks:
      - task: load_data
        type: python
        description: Load Dataset
        image: python_hello_world_example_image
        source: pythonscript
        mounts:
          - mount: mymount
            volume: data
            path: /mnt/data
        cmd: python -u download.py
        env_vars:
          url: "https://raw.githubusercontent.com/mlflow/mlflow/master/examples/sklearn_elasticnet_wine/wine-quality.csv"
      - task: training_model
        type: python
        description: training model
        image: python_hello_world_example_image
        source: pythonscript
        mounts:
          - mount: mymount
            volume: data
            path: /mnt/data
        cmd: python -u wine_linear_regression.py
      - task: validation_model
        type: python
        description: validation model
        image: python_hello_world_example_image
        source: pythonscript
        mounts:
          - mount: mymount
            volume: data
            path: /mnt/data
        cmd: python -u validation.py
Run Apache Liminal
Now, let’s deploy our pipeline using the following commands:
liminal build
liminal deploy --clean
liminal start
Apache Liminal is now started. The Apache Airflow UI is accessible at the following address: http://127.0.0.1:8080
Just activate the DAG and the pipeline will be triggered automatically.
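If you prefer the command line over the Airflow UI, the DAG can also be unpaused with the Airflow CLI. This is only a sketch: it assumes the generated DAG id matches the pipeline name, and the exact syntax depends on the Airflow version Liminal bundles (airflow unpause on 1.10.x, airflow dags unpause on 2.x):
airflow unpause getting_started_pipeline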
We follow our DAG and access the logs through the Tree View (see our article Introducing Apache Airflow on AWS if you wish to better understand Apache Airflow functionalities).
Once the pipeline has fully executed and terminated, we stop our Liminal server using the command:
liminal stop
Conclusion
Apache Liminal proposes to simplify the creation of end-to-end Machine Learning pipelines. We think the initiative is a success. Indeed, a single YAML file allows you to coherently describe the execution of your different Machine Learning pipelines.
Furthermore, leveraging Kubernetes lets users deploy their pipelines to remote clusters. You connect to your remote cluster using the command:
kubectl config set-context <your remote kubernetes cluster>
Finally, the use of declarative YAML files offers the advantage of automating your Machine Learning pipelines within your CI/CD pipelines in order to version, publish and operate your models.
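As an illustration, a CI job could rebuild and redeploy the pipeline on every push using the very same commands shown above; the steps below are a hypothetical sketch, not an official integration:
pip install git+https://github.com/apache/incubator-liminal.git
liminal build
liminal deploy --clean
liminal start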