# Anomaly Detection on Data Streams in a few lines of Bash

Let’s assume that we have a stream of data points, and we want to detect outliers i.e. points that deviate from the norm. This is interesting because usually such data are linked to faulty or malicious behavior.

The fact that our data points are available as a stream means that we cannot “look” at them more than once. Therefore we need an algorithm with complexity linear to the number of points of the stream.

In this article, we’ll build an algorithm for this task and apply it to detect anomalous behavior in the KDD Cup ‘99 dataset which contains data from network connections to build a network intrusion detector.

We will model our data using a simple Gaussian-distribution model.

I thought it would be fun to implement it in one line of Bash. It turned out that squeezing everything in a huge line is not so fun, so I expanded it in a few more lines 🙃.

### Gaussian model to detect anomalies

Without further ado, let’s begin with some theory. We start by assuming that our data are following the Gaussian distribution.

Therefore, given an instance $$x \sim \mathcal{N}(\mu, \Sigma), x \in \mathbb{R}^d, x=[x_1, x_2, \ldots, x_d]^T$$, its probability density function is:

$p(x; \mu, \Sigma)= \frac{1}{(2\pi)^{d/2}|\Sigma|^{\frac{1}{2}}}\exp\bigg(-\frac{1}{2}(x-\mu)^T\Sigma^{-1}(x-\mu)\bigg)$

where $$\mu \in \mathbb{R}^d$$ is the mean and $$\Sigma \in \mathbb{R}^{d \times d}$$ is the covariance matrix.

Now assuming that variables $$x_i \sim \mathcal{N}(\mu_i, \sigma_i^{2})$$ are all independent, we get:

$p(x) = p(x_1, x_2, \ldots, x_d) = p(x_1; \mu_1, \sigma_1^2)p(x_2; \mu_2, \sigma_2^2)\ldots p(x_d; \mu_d, \sigma_d^2) = \prod_{j=1}^{d} p(x_j; \mu_j, \sigma_j^2)$ where:

$p(x; \mu, \sigma^2)=\frac{1}{\sqrt{2\pi}\sigma} \exp\Big(- \frac{(x-\mu)^2}{2\sigma^2}\Big)$

To train the model (which basically consists of the values $$\mu_i, \sigma_i^2, \forall i \in [1, \ldots, d]$$), one needs to calculate the following parameters (MLE):

• $$\mu_i = \frac{1}{n} \sum_{j=1}^{n}{x_i^{(j)}}$$, where $$n$$ is the number of training examples (the size of the training dataset),
• $$\sigma_i^2 = \frac{1}{n}{\sum_{j=1}^{n}{(x_i^{(j)} - \mu_i)^2}}$$

Then, in the evaluation phase, given a new example $$x$$, we compute:

$p(x) = \prod_{i=1}^{d}{ \frac{1}{\sqrt{2\pi}\sigma_i} \exp\Big(- \frac{(x_i-\mu_i)^2}{2\sigma_i^2}\Big) }$ and we flag $$x$$ as anomaly if the value of $$p(x)$$ is smaller than a threshold value $$\epsilon$$ (hyperparameter).

Now here is the imporant part. How will we compute these values incrementally?

By having the tuples $$T_i = \big( \sum_{j=1}^{n}{x_i^{(j)}}, ~ \sum_{j=1}^{n}{{(x_i^{(j)})}^2}, ~ n \big)$$, where $$n$$ is the count of the instances, available at any point of the computation! This way, when training or evaluating, in batch or on stream, we have access to all parameters of the model $$\mu_i$$ and $$\sigma_i^2$$ at any time by calculating:

$\mu_i = \frac{\sum_{j=1}^{n}{x_i^{(j)}}}{n} = \frac{T_i[0]}{T_i[2]}, ~~~~~ \sigma_i^2 = \frac{\sum_{j=1}^{n}{{(x_i^{(j)})}^2}}{n} - \mu_i^2 = \frac{T_i[1]}{T_i[2]} - (\frac{T_i[0]}{T_i[2]})^2$

So let’s go ahead and fetch the dataset, then implement the algorithm with awk.

### Fetch and prepare the dataset

The dataset is available here. I will be using the 10 percent version, and only the HTTP protocol.

We’ll use curl for the GET request, pipe it into zcat (because it is gzipped), and then use awk to keep only the HTTP rows, and filter out most features, keeping only the 4 most important columns: duration, src_bytes and dst_bytes, plus the label. We also edit the label to be 1 in case of anomaly, or 0, in case of normal observation.

URL=http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz # 10 percent
OUTFILE=http.csv

curl $URL | zcat | awk 'BEGIN{ FS=","; OFS="," } { LABEL=($NF=="normal." ? 0 : 1); if($3 == "http") print$1, $5,$6, LABEL }' > $OUTFILE  So now our dataset with 64293 rows is in the http.csv file. ### Implementation We’ll put our code in a file called gauss.awk. Start by defining some variables: BEGIN { FS = ","; # < NF to ignore the last column (label) for (i = 1; i < NF; i++) { SUMS[i] = 0; SQUARES[i] = 0; } }  FS is a special variable used by awk, denoting the input field separator. Since we have a csv, we use commas. SUMS and SQUARES are two arrays, with the one holding the cumulative sum of the values of each feature, and the other holding the cumulative sum of the squares of these values. Next we move on to the main awk loop, and write the code which will apply to every row of the input file. Here we will first update the model, by updating the SUMS and SQUARES arrays. For each feature (or column, call it however you like), we update the respective value using the current row that awk is parsing. After that, we use the current row and the model (the arrays), to calculate an anomaly score, according to the formula we discussed earlier (see $$p(x)$$). To get an accurate value of $$\pi$$ for the calculations, I am using atan2(0, -1). { # Update the model for (i = 1; i < NF; i++) { SUMS[i] +=$i;
SQUARES[i] += $i ** 2; } # Calculate score using the model score = 1; for (i = 1; i < NF; i++) { mu = SUMS[i] / NR; sigma2 = (SQUARES[i] / NR) - mu ** 2; if (sigma2 == 0) score = 0; else { score *= exp(- (($i - mu) ** 2) / (2 * sigma2)) / sqrt(2 * atan2(0, -1) * sigma2);
}
}

# Output the score
print score, $NF }  Finally, we output the score, along with the actual label. Now let’s run this: $ awk -f gauss.awk http.csv
...
2.01308e-12 0
2.02266e-12 0
2.04232e-12 0
2.04342e-12 0
2.0474e-12 0
2.02895e-12 0
2.0324e-12 0
2.03025e-12 0


Looks good. Some warnings on stderr about the exp() function are normal, it is because some arguments passed in exp are < -745 and therefore return incredibly small values but awk handles them as 0 so we’re good.

Now I want to calculate the ROC AUC score, and for that, I will be using a simple python script with the numpy module, and the roc_auc_score function from the scikit-learn module:

import numpy as np
from sklearn.metrics import roc_auc_score
from sys import stdin

preds = []
actuals = []

for line in stdin:
s = line.split(' ')
preds.append(-float(s[0]))
actuals.append(int(s[1]))

preds_arr = np.array(preds)
actuals_arr = np.array(actuals)

print(roc_auc_score(actuals_arr, preds_arr))


Now I can pipe the output of awk into the script, which I’ll call auroc.py, like this:

$awk -f gauss.awk http.csv | python3 auroc.py  , and this prints the score, which is 0.9762313543238387, a pretty good value. Last but not least, we can even choose a threshold and start flagging the values. This is what an anomaly detector actually does. What’s the intuition behind this? Since the scores are nothing more than the probability of the observation in the Gaussian distribution (the model), we will be flagging the value as anomaly, if this score is lower than a particular threshold, meaning that it is having a low probability of occurring. A good threshold here could be 9.83056225277626e-15 (found with some interpolation), so we can just change the last lines of the script to: if (score < 9.83056225277626e-15) print 1,$NF
else
print 0, $NF  , so that it prints 1 if the score is lower than the threshold, or 0 if is not, along with the actual label. You can now calculate the F1 score if you want with some greps of the output. ### Putting it all together Now let’s glue everything together in one line: $ curl -s $URL | zcat | awk 'BEGIN{ FS=","; OFS="," } { LABEL=($NF=="normal." ? 0 : 1); if($3 == "http") print$1, $5,$6, LABEL }' | awk -f gauss.awk 2> /dev/null | python3 auroc.py
0.9762313543238387


I got rid of the http.csv file and piped the output of the first awk directly into the second one, and I also redirected the stderr of the second awk to /dev/null to silence the warnings, so that we can get a clean output in the terminal.

So there you go, one pass on the data with a few lines of awk are enough to handle many cases of spotting outliers in datasets.

### Sidenote: Big Data Frameworks

This algorithm can be implemented in any Big Data framework (Apache Spark, Hadoop, Flink, whatever…) in a very straightforward way, with the functional operators map and reduce. Assume that our training data are $$n$$ d-dimensional vectors $$x^{(j)} = [x_1^{(j)}, x_2^{(j)}, \ldots, x_d^{(j)}]^T, j \in [1, \ldots, n]$$. Then the values of vectors $$\mu$$ and $$\sigma^2$$ can be computed by:

// Scala
dataSet
.map { x => (x, pow(x, 2), 1) }
.reduce { (x1, x2) => (x1._1 + x2._1, x1._2 + x2._2, x1._3 + x2._3) }
.collect()