#| code_folding: [0]
# more imports
import json
import logging
import numpy as np
import os
import tensorflow as tf
from tensorflow.keras.layers import Conv2D, Conv2DTranspose, Dense
from tensorflow.keras.layers import Flatten, Layer, Reshape, InputLayer
from tensorflow.keras.regularizers import l1
from alibi_detect.od import OutlierVAE
from alibi_detect.utils.fetching import fetch_detector
from alibi_detect.utils.perturbation import apply_mask
from alibi_detect.saving import save_detector, load_detector
from alibi_detect.utils.visualize import plot_instance_score, plot_feature_outlier_image
logger = tf.get_logger()
logger.setLevel(logging.ERROR)
Load detector or train from scratch
The pretrained outlier and adversarial detectors used in the notebook can be found here. You can use the built-in fetch_detector function which saves the pre-trained models in a local directory filepath and loads the detector. Alternatively, you can train a detector from scratch:
load_pretrained = True
#| scrolled: true
filepath = os.path.join(os.getcwd(), 'outlier')
detector_type = 'outlier'
dataset = 'cifar10'
detector_name = 'OutlierVAE'
filepath = os.path.join(filepath, detector_name)
if load_pretrained:  # load pre-trained detector
    od = fetch_detector(filepath, detector_type, dataset, detector_name)
else:  # define model, initialize, train and save outlier detector
    # define encoder and decoder networks
    latent_dim = 1024
    encoder_net = tf.keras.Sequential(
        [
            InputLayer(input_shape=(32, 32, 3)),
            Conv2D(64, 4, strides=2, padding='same', activation=tf.nn.relu),
            Conv2D(128, 4, strides=2, padding='same', activation=tf.nn.relu),
            Conv2D(512, 4, strides=2, padding='same', activation=tf.nn.relu)
        ]
    )
    decoder_net = tf.keras.Sequential(
        [
            InputLayer(input_shape=(latent_dim,)),
            Dense(4*4*128),
            Reshape(target_shape=(4, 4, 128)),
            Conv2DTranspose(256, 4, strides=2, padding='same', activation=tf.nn.relu),
            Conv2DTranspose(64, 4, strides=2, padding='same', activation=tf.nn.relu),
            Conv2DTranspose(3, 4, strides=2, padding='same', activation='sigmoid')
        ]
    )

    # initialize outlier detector
    od = OutlierVAE(
        threshold=.015,           # threshold for outlier score
        encoder_net=encoder_net,  # can also pass VAE model instead
        decoder_net=decoder_net,  # of separate encoder and decoder
        latent_dim=latent_dim
    )

    # train
    od.fit(X_train, epochs=50, verbose=False)

    # save the trained outlier detector
    save_detector(od, filepath)
Let's check whether the model manages to reconstruct the in-distribution training data:
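A minimal sketch of such a check, assuming X_train is the CIFAR-10 training set scaled to [0, 1] and using the detector's underlying VAE (od.vae) to reconstruct a few instances:
# reconstruct a few training instances with the detector's VAE
# (assumes X_train is available and scaled to [0, 1])
idx = np.arange(4)
X_sample = X_train[idx]
X_recon_sample = od.vae(X_sample).numpy()  # od.vae is the underlying Keras VAE model
print(X_sample.shape, X_recon_sample.shape)  # compare originals and reconstructions visually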
Finding good threshold values can be tricky since they are typically not easy to interpret. The infer_threshold method helps find a sensible value. We need to pass a batch of instances X and specify what percentage of those we consider to be normal via threshold_perc.
print('Current threshold: {}'.format(od.threshold))
od.infer_threshold(X_train, threshold_perc=99, batch_size=128) # assume 1% of the training data are outliers
print('New threshold: {}'.format(od.threshold))
Create and detect outliers
We can create some outliers by applying a random noise mask to the original instances:
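A sketch of this step, assuming X_train is in memory; the mask size and noise settings passed to apply_mask are illustrative, and the perturbed instance is stacked before the original so the indexing matches the cell below:
# perturb one training instance with a random noise mask (illustrative parameters)
np.random.seed(0)
x = X_train[0].reshape(1, 32, 32, 3)
x_mask, mask = apply_mask(
    x,
    mask_size=(10, 10),
    n_masks=1,
    channels=[0, 1, 2],
    mask_type='normal',
    noise_distr=(0, 1),
    clip_rng=(0, 1)
)
# perturbed instance first, original second (matches the indexing used below)
sample = np.concatenate([x_mask, x])
preds = od.predict(sample, outlier_type='instance',
                   return_feature_score=True, return_instance_score=True)
x_recon = od.vae(sample).numpy()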
#| code_folding: [0]
# check if outlier and visualize outlier scores
labels = ['No!', 'Yes!']
print(f"Is original outlier? {labels[preds['data']['is_outlier'][1]]}")
print(f"Is perturbed outlier? {labels[preds['data']['is_outlier'][0]]}")
plot_feature_outlier_image(preds, sample, x_recon, max_instances=1)
Deploy the detector
For this example we use the open source deployment platform Seldon Core and the eventing-based project Knative, which allows serverless components to be connected to event streams. The Seldon Core payload logger sends events containing model requests to the Knative broker, which can farm these out to serverless components such as the outlier, drift or adversarial detection modules. Further eventing components can be added to consume the events produced by these modules and forward them to, for example, alerting or storage services. All of this happens asynchronously.
We already configured a cluster on DigitalOcean with Seldon Core installed. The configuration steps to set everything up from scratch are detailed in this example notebook.
First we get the IP address of the Istio Ingress Gateway. This assumes Istio is installed with a LoadBalancer.
CLUSTER_IPS=!(kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.status.loadBalancer.ingress[0].ip}')
CLUSTER_IP = CLUSTER_IPS[0]
print(CLUSTER_IP)
Let's check the message dumper for the output of the outlier detector:
res=!kubectl logs $(kubectl get pod -l serving.knative.dev/configuration=message-dumper -o jsonpath='{.items[0].metadata.name}') user-container
data = []
for i in range(0, len(res)):
    if res[i] == 'Data,':
        data.append(res[i+1])
j = json.loads(json.loads(data[0]))
print("Outlier?", labels[j["data"]["is_outlier"] == [1]])
We then make a prediction on the perturbed instance:
show(x_mask)
predict(x_mask)
Although the prediction is still correct, the instance is clearly an outlier:
res=!kubectl logs $(kubectl get pod -l serving.knative.dev/configuration=message-dumper -o jsonpath='{.items[0].metadata.name}') user-container
data = []
for i in range(0, len(res)):
    if res[i] == 'Data,':
        data.append(res[i+1])
j = json.loads(json.loads(data[1]))
print("Outlier?", labels[j["data"]["is_outlier"] == [1]])
3. Adversarial detection by matching prediction probabilities
Method
The adversarial detector is based on Adversarial Detection and Correction by Matching Prediction Distributions. Usually, autoencoders are trained to find a transformation $T$ that reconstructs the input instance $x$ as accurately as possible, using loss functions suited to capture the similarity between $x$ and $x'$, such as the mean squared reconstruction error. The novelty of the adversarial autoencoder (AE) detector lies in the use of a classification-model-dependent loss function, based on a distance metric in the output space of the model, to train the autoencoder network. Given a classification model $M$, we optimise the weights of the autoencoder such that the KL-divergence between the model predictions on $x$ and on $x'$ is minimised. Without a reconstruction loss term, $x'$ simply tries to make sure that the prediction probabilities $M(x')$ and $M(x)$ match, without caring about the proximity of $x'$ to $x$. As a result, $x'$ is allowed to live in different areas of the input feature space than $x$, with different decision boundary shapes with respect to the model $M$. The carefully crafted adversarial perturbation which is effective around $x$ does not transfer to the new location of $x'$ in the feature space, and the attack is therefore neutralised. Training of the autoencoder is unsupervised since we only need access to the model prediction probabilities and the normal training instances. We do not require any knowledge about the underlying adversarial attack, and the classifier weights are frozen during training.
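Concretely, writing the autoencoder as $\mathrm{AE}_\theta$ so that $x' = \mathrm{AE}_\theta(x)$, the objective described above amounts to (ignoring the optional extra terms mentioned later):
$$\min_\theta \; \mathbb{E}_{x}\left[ D_{\mathrm{KL}}\big(M(x) \,\|\, M(\mathrm{AE}_\theta(x))\big) \right]$$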
The detector can be used as follows:
An adversarial score $S$ is computed. $S$ equals the K-L divergence between the model predictions on $x$ and $x'$.
If $S$ is above a threshold (explicitly defined or inferred from training data), the instance is flagged as adversarial.
For adversarial instances, the model $M$ uses the reconstructed instance $x'$ to make a prediction. If the adversarial score is below the threshold, the model makes a prediction on the original instance $x$.
This procedure is illustrated in the diagram below:
The method is very flexible and can also be used to detect common data corruptions and perturbations which negatively impact the model performance.
#| code_folding: [0]
# more imports
from sklearn.metrics import roc_curve, auc
from alibi_detect.ad import AdversarialAE
from alibi_detect.datasets import fetch_attack
from alibi_detect.utils.fetching import fetch_tf_model
from alibi_detect.utils.tensorflow import predict_batch
We investigate both Carlini-Wagner (C&W) and SLIDE attacks. You can simply load previously found adversarial instances for the pretrained ResNet-56 model. The attacks were generated using Foolbox:
The detector first reconstructs the input instances which can be adversarial. The reconstructed input is then fed to the classifier to compute the adversarial score. If the score is above a threshold, the instance is classified as adversarial and the detector tries to correct the attack. Let's investigate what happens when we reconstruct attacked instances and make predictions on them:
The detector restores the accuracy after the attacks from almost $0$% to well over $80$%! We can compute the adversarial scores and inspect some of the reconstructed instances:
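A minimal sketch of the scoring step, assuming the fitted detector is named ad and the clean test set X_test and the C&W adversarial instances X_adv_cw are in memory (the batch size is illustrative):
# adversarial scores on clean and attacked test instances
# (assumes ad, X_test and X_adv_cw are available; batch_size is illustrative)
score_clean = ad.score(X_test, batch_size=256)
score_cw = ad.score(X_adv_cw, batch_size=256)
print(score_clean.mean(), score_cw.mean())  # attacked instances should score much higher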
The threshold for the adversarial score can be set via infer_threshold. We need to pass a batch of instances $X$ and specify what percentage of those we consider to be normal via threshold_perc. Even if we only pass normal instances, some of them will have been misclassified by the model, which leads to a higher score if the reconstruction picks up features from the correct class, and some instances might simply look adversarial in the first place. As a result, we set our threshold at $95$%:
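A sketch of what that call could look like, assuming the detector is named ad and X_train holds normal training instances:
# infer the adversarial score threshold from normal instances
# (assumes ad is the fitted AdversarialAE detector and X_train is available)
ad.infer_threshold(X_train, threshold_perc=95, batch_size=256)
print('Adversarial threshold:', ad.threshold)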
The correct method of the detector executes the diagram in Figure 1. First the adversarial scores are computed. For instances where the score is above the threshold, the classifier prediction on the reconstructed instance is returned; otherwise the original prediction is kept. The method returns a dictionary containing the metadata of the detector, whether the instances in the batch are adversarial (above the threshold) or not, the classifier predictions using the correction mechanism, and both the original and reconstructed predictions. Let's illustrate this on a batch containing some adversarial (C&W) and original test set instances:
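A sketch of that call, again assuming ad, X_adv_cw and X_test are available; the exact keys of the returned dictionary follow the description above and may differ slightly between library versions:
# mix some adversarial (C&W) and clean test instances and run the correction mechanism
# (assumes ad, X_adv_cw and X_test are available)
n = 16
X_batch = np.concatenate([X_adv_cw[:n], X_test[:n]])
preds_corrected = ad.correct(X_batch, batch_size=256)
print(preds_corrected['data'].keys())  # adversarial flags and (corrected) predictions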
There are a few other tricks highlighted in the paper (temperature scaling and hidden layer K-L divergence) and implemented in Alibi Detect which can further boost the adversarial detector's performance. Check this example notebook for more details.
4. Drift detection with Kolmogorov-Smirnov
Method
The drift detector applies feature-wise two-sample Kolmogorov-Smirnov (K-S) tests. For multivariate data, the obtained p-values for each feature are aggregated either via the Bonferroni or the False Discovery Rate (FDR) correction. The Bonferroni correction is more conservative and controls for the probability of at least one false positive. The FDR correction on the other hand allows for an expected fraction of false positives to occur.
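To make the aggregation step concrete, here is a small, self-contained sketch (not alibi-detect code) of how feature-wise p-values could be combined under each correction:
# toy aggregation of feature-wise K-S p-values
def drift_bonferroni(p_vals: np.ndarray, p_val: float = .05) -> bool:
    # flag drift if any feature-wise p-value falls below the corrected threshold
    return bool((p_vals < p_val / len(p_vals)).any())

def drift_fdr(p_vals: np.ndarray, p_val: float = .05) -> bool:
    # Benjamini-Hochberg: compare sorted p-values against an increasing threshold
    n = len(p_vals)
    p_sorted = np.sort(p_vals)
    thresholds = p_val * np.arange(1, n + 1) / n
    return bool((p_sorted <= thresholds).any())

p_vals = np.array([.001, .2, .4, .6])
print(drift_bonferroni(p_vals), drift_fdr(p_vals))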
For high-dimensional data, we typically want to reduce the dimensionality before computing the feature-wise univariate K-S tests and aggregating those via the chosen correction method. Following suggestions in Failing Loudly: An Empirical Study of Methods for Detecting Dataset Shift, we incorporate Untrained AutoEncoders (UAE), black-box shift detection using the classifier's softmax outputs (BBSDs) and PCA as out-of-the-box preprocessing methods. Preprocessing methods which do not rely on the classifier will usually pick up drift in the input data, while BBSDs focuses on label shift. The adversarial detector which is part of the library can also be turned into a drift detector that picks up drift which reduces the performance of the classification model. We can therefore combine different preprocessing techniques to figure out whether there is drift which hurts the model performance, and whether this drift can be classified as input drift or label shift.
We will use the CIFAR-10-C dataset (Hendrycks & Dietterich, 2019) to evaluate the drift detector. The instances in CIFAR-10-C come from the test set in CIFAR-10 but have been corrupted and perturbed by various types of noise, blur, brightness etc. at different levels of severity, leading to a gradual decline in the classification model performance. We also check for drift against the original test set with class imbalances.
#| code_folding: [0]
# yet again import stuff
from alibi_detect.cd import KSDrift
from alibi_detect.cd.preprocess import UAE, HiddenOutput
from alibi_detect.datasets import fetch_cifar10c, corruption_types_cifar10c
We can select from the following corruption types at 5 severity levels:
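A sketch of how the corruption types and corrupted data could be retrieved; the selected corruptions are illustrative:
# list the available corruption types and fetch a few at severity level 5
# (the selected corruptions are illustrative)
print(corruption_types_cifar10c())
corruption = ['gaussian_noise', 'motion_blur', 'brightness', 'pixelate']
X_corr, y_corr = fetch_cifar10c(corruption=corruption, severity=5, return_X_y=True)
X_corr = X_corr.astype('float32') / 255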
We split the original test set into a reference dataset and a dataset which should not be rejected under the H0 of the K-S test. We also split the corrupted data by corruption type:
Given the drop in performance, it is important that we detect the harmful data drift!
Detect drift
We are trying to detect data drift on high-dimensional (32x32x3) data using an aggregation of univariate K-S tests. It therefore makes sense to apply dimensionality reduction first. Some dimensionality reduction methods also used in Failing Loudly: An Empirical Study of Methods for Detecting Dataset Shift are readily available: UAE (Untrained AutoEncoder), BBSDs (black-box shift detection using the classifier's softmax outputs) and PCA (using scikit-learn).
Untrained AutoEncoder
First we try UAE:
tf.random.set_seed(0)
# define encoder
encoding_dim = 32
encoder_net = tf.keras.Sequential(
[
InputLayer(input_shape=(32, 32, 3)),
Conv2D(64, 4, strides=2, padding='same', activation=tf.nn.relu),
Conv2D(128, 4, strides=2, padding='same', activation=tf.nn.relu),
Conv2D(512, 4, strides=2, padding='same', activation=tf.nn.relu),
Flatten(),
Dense(encoding_dim,)
]
)
uae = UAE(encoder_net=encoder_net)
preprocess_kwargs = {'model': uae, 'batch_size': 128}
# initialise drift detector
p_val = .05
cd = KSDrift(
p_val=p_val, # p-value for K-S test
X_ref=X_ref, # test against original test set
preprocess_kwargs=preprocess_kwargs
)
Let's check whether the detector thinks drift occurred within the original test set:
As expected, no drift occurred. We can also inspect the feature-wise K-S statistics, threshold value and p-values for each univariate K-S test by (encoded) feature before the multivariate correction. Most of them are well above the $0.05$ threshold:
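A sketch covering both checks, assuming X_h0 is the held-out split of the original test set; the exact output keys may vary slightly between library versions:
# drift prediction on the held-out original test set
# (assumes X_h0 and the labels list defined earlier are available)
preds_h0 = cd.predict(X_h0, return_p_val=True)
print('Drift? {}'.format(labels[preds_h0['data']['is_drift']]))
# feature-wise K-S p-values and the threshold used before the correction
print('Threshold:', preds_h0['data']['threshold'])
print('p-values:', preds_h0['data']['p_val'])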
Here we use the output of the softmax layer to detect the drift, but other hidden layers can be extracted as well by setting 'layer' to the index of the desired hidden layer in the model:
# use output softmax layer
preprocess_kwargs = {'model': HiddenOutput(model=clf, layer=-1), 'batch_size': 128}
cd = KSDrift(
p_val=p_val,
X_ref=X_ref,
preprocess_kwargs=preprocess_kwargs
)
There is again no drift on the original held out test set:
#| code_folding: []
for x, c in zip(X_c, corruption):
    preds = cd.predict(x)
    print(f'Corruption type: {c}')
    print('Drift? {}'.format(labels[preds['data']['is_drift']]))
    print('Feature-wise p-values:')
    print(preds['data']['p_val'])
    print('')
For more functionality and examples, such as updating the reference data with reservoir sampling or picking another multivariate correction mechanism, check out this example notebook.
Leveraging the adversarial detector for malicious drift detection
While monitoring covariate and predicted label shift is all very interesting and exciting, at the end of the day we are mainly interested in whether the drift actually hurts the model performance significantly. To this end, we can leverage the adversarial detector and measure univariate drift on the adversarial scores!
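One way to set this up, as a sketch only: it assumes ad is the fitted AdversarialAE detector and that KSDrift accepts a custom preprocessing function via preprocess_fn/preprocess_kwargs, which may differ from the exact setup used in the original notebook:
# use the adversarial scores as a 1D representation for the K-S drift test
# (assumes the fitted AdversarialAE detector ad; preprocess_fn support is assumed)
def adv_score(X: np.ndarray, ad, batch_size: int = 128) -> np.ndarray:
    return ad.score(X, batch_size=batch_size).reshape(-1, 1)

cd = KSDrift(
    p_val=.05,
    X_ref=X_ref,
    preprocess_fn=adv_score,
    preprocess_kwargs={'ad': ad, 'batch_size': 128}
)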
Make drift predictions on the original test set and corrupted data:
#| code_folding: [0]
# evaluate classifier on different datasets
clf_accuracy['h0'] = clf.evaluate(X_h0, y_h0, batch_size=128, verbose=0)[1]
preds_h0 = cd.predict(X_h0)
print('H0: Accuracy {:.4f} -- Drift? {}'.format(
    clf_accuracy['h0'], labels[preds_h0['data']['is_drift']]))
for x, c in zip(X_c, corruption):
    preds = cd.predict(x)
    print('{}: Accuracy {:.4f} -- Drift? {}'.format(
        c, clf_accuracy[c], labels[preds['data']['is_drift']]))
We can therefore use the scores of the detector itself to quantify the harmfulness of the drift! We can generalise this to all the corruptions at each severity level in CIFAR-10-C.
The plot below shows the mean adversarial scores with their standard deviations (left axis) and the ResNet-32 accuracies (right axis) for increasing data corruption severity levels. Level 0 corresponds to the original test set. Harmful scores come from instances which were flipped from a correct to an incorrect prediction because of the corruption, while not harmful means the prediction was unchanged after the corruption. The chart can be reproduced in this notebook.
Deploy
We can deploy the drift detector in a similar fashion as the outlier detector. For a more detailed step-by-step overview of the deployment process, check this notebook.
The deployed drift detector accumulates requests until a predefined drift_batch_size is reached, in our case $5000$, which is defined in the yaml for the deployment and set in the drift detector wrapper. Once $5000$ instances have been collected, the batch is cleared and starts filling up again.
from tqdm.notebook import tqdm

drift_batch_size = 5000

# accumulate batches
for i in tqdm(range(0, drift_batch_size, 100)):
    x = X_h0[i:i+100]
    predict(x)

# check message dumper
res=!kubectl logs $(kubectl get pod -l serving.knative.dev/configuration=message-dumper-drift -o jsonpath='{.items[0].metadata.name}') user-container
data = []
for i in range(0, len(res)):
    if res[i] == 'Data,':
        data.append(res[i+1])
j = json.loads(json.loads(data[0]))
print("Drift?", labels[j["data"]["is_drift"] == 1])
We now run the same test on some corrupted data:
c = 0
print(f'Corruption: {corruption[c]}')

# accumulate batches
for i in tqdm(range(0, drift_batch_size, 100)):
    x = X_c[c][i:i+100]
    predict(x)

# check message dumper
res=!kubectl logs $(kubectl get pod -l serving.knative.dev/configuration=message-dumper-drift -o jsonpath='{.items[0].metadata.name}') user-container
data = []
for i in range(0, len(res)):
    if res[i] == 'Data,':
        data.append(res[i+1])
j = json.loads(json.loads(data[1]))
print("Drift?", labels[j["data"]["is_drift"] == 1])