Anomaly detection is an essential task. At Imperva we use it for threat hunting, risk analysis, risk mitigation, trend detection and more.
In a previous post we showed a simple way to detect anomalies with SQL. This time we wanted to use Prophet, Facebook's open-source algorithm for forecasting time series data.
Prophet has advantages over simpler methods; for example, it is configurable and supports seasonality detection. However, it requires Python and consumes far more resources.
Anomalies are detected by comparing the predicted values to the actual past values, and by looking at the predicted future values.
To predict, Prophet needs a time series, which might be complex to calculate. The calculation may require grouping, ordering, aggregate functions and more, a job often best suited for SQL. Let the database do what it does best.
This time, however, the database alone cannot complete the job, so we will integrate it with Python code that runs Prophet.
The query engine calculates many time series, which are then sent to Prophet to detect anomalies. The detection is done series by series, has a low memory footprint, and is mostly CPU bound, which makes it possible to run on multiple CPUs in parallel and speed up the process.
Read on to learn how to benefit from both worlds: the database for time series calculation and a forecasting algorithm for prediction, giving you efficient anomaly detection at scale.
Time series calculation
The time series calculation has two steps. First, the values over time are calculated and saved as records in memory by the DB. Then the data is aggregated so that each record returned by the DB represents a single time series. Each record holds two arrays, one for the intervals and one for the values.
In the following example, the first step in the WITH clause calculates the values over time. Then, using the ARRAY_AGG aggregate function, the values are collected into one record per scope. We also require at least 10 values and a minimum average of 100.
WITH aggregated_data AS (
-- step 1: calculate the value per scope per day, for the last 30 days
SELECT day, my_field AS scope, SUM(my_value) AS value
FROM my_table
WHERE day >= DATE_ADD('day', -30, CURRENT_DATE)
GROUP BY day, scope)
-- step 2: collect each scope's values into two aligned, ordered arrays
SELECT scope, ARRAY_AGG(day ORDER BY day) AS days, ARRAY_AGG(value ORDER BY day) AS vals
FROM aggregated_data
GROUP BY scope
HAVING COUNT(*) >= 10
AND AVG(value) >= 100
Note: While many databases support the ARRAY_AGG function, it is not part of the SQL standard. You will have to check whether your database supports such functionality.
If you want to work with a weekly interval, you can use the following expressions:
-- expression to get the first day of week
DATE_ADD('day', -DAY_OF_WEEK(DATE(day)) + 1, day)
-- filter to get 25 full weeks back: from the start of the week
-- 25 weeks ago, until the end of the previous week
day BETWEEN DATE_ADD('WEEK', -25, DATE_ADD('DAY', -DAY_OF_WEEK(CURRENT_DATE) + 1, CURRENT_DATE))
AND DATE_ADD('DAY', -DAY_OF_WEEK(CURRENT_DATE), CURRENT_DATE)
The DB returns a scope, which can be, for example, a customer ID. It also returns arrays for the intervals (e.g., days, weeks, etc.) and for the values. The data is ready to be sent to the prediction process.
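To make the record structure concrete, here is a minimal sketch of what a single row of the query result might look like when loaded with pandas. The column names match the SQL aliases above; the values are made up for illustration:
import pandas as pd

# a hypothetical record: a scope plus two aligned arrays
# of intervals (days) and values (vals), as produced by the query above
record = pd.Series({
    "scope": "customer-1234",
    "days": ["2024-01-01", "2024-01-02", "2024-01-03"],
    "vals": [120.0, 135.0, 410.0],
})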
Forecasting
Given a record with a time series, we first convert it to the format Prophet expects and then run the prediction. Here are the methods used for the conversion and prediction:
import pandas as pd
from prophet import Prophet
def record_to_prophet_input(record):
    # Prophet expects a data frame with two columns: ds (the dates) and y (the values)
    return pd.DataFrame({"ds": record["days"], "y": record["vals"]})
def predict(data):
    m = Prophet()
    m.fit(data)
    # extend the frame with 5 future intervals to forecast
    future = m.make_future_dataframe(periods=5)
    return m.predict(future)
The prediction result contains a set of predicted values called yhat. It includes predictions both for the time frame covered by the actual values and for future values.
Anomaly Detection
Prophet returns a data frame with several columns; here are the ones we will refer to:
- yhat – predicted values
- yhat_upper – upper bound for a predicted value
- yhat_lower – lower bound for a predicted value
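Since the prediction frame covers both the historical window and the requested future periods, it can be convenient to split it before inspecting these columns. Here is a minimal sketch, assuming data is the frame returned by record_to_prophet_input and predictions is the result of predict:
def split_predictions(predictions, data):
    # the first len(data) rows line up with the actual values;
    # the remaining rows are the future periods requested in predict()
    historical = predictions.iloc[:len(data)]
    future = predictions.iloc[len(data):]
    return historical, future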
Here is a simple example of deciding whether a set of values has an anomaly. We go over the predicted upper bounds (called yhat_upper) and check whether the actual value is 2 times or more the predicted upper bound:
def has_anomaly(yhat_upper_series, y_series) -> bool:
    # zip stops at the shorter series, so future predictions
    # that have no matching actual value are skipped
    for yhat_upper, y in zip(yhat_upper_series, y_series):
        if y >= 2 * yhat_upper:
            return True
    return False
It is also possible to go over the entire series and find all the anomalies, or to score them. With a score we can detect anomalies above a threshold and compare anomalies to each other. Here is a scoring function example:
def score_anomaly(yhat_upper, y_value) -> float:
    # how far the actual value exceeds the predicted upper bound,
    # relative to the actual value
    return (y_value - yhat_upper) / y_value
Both functions are only examples, and the anomaly detection logic changes from one use case to another.
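The two ideas can also be combined. Here is a minimal sketch that scores every point and returns the indexes crossing a threshold; the 0.5 default is an arbitrary assumption for illustration:
def find_anomalies(yhat_upper_series, y_series, threshold=0.5):
    # collect (index, score) pairs for every point whose score
    # crosses the threshold; zip skips the future-only predictions
    anomalies = []
    for idx, (yhat_upper, y) in enumerate(zip(yhat_upper_series, y_series)):
        score = score_anomaly(yhat_upper, y)
        if score >= threshold:
            anomalies.append((idx, score))
    return anomalies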
This is what our main loop will look like. The SQL query runs and returns records:
import pandas as pd
for _, record in pd.read_sql(sql, connection).iterrows():
    data = record_to_prophet_input(record)
    predictions = predict(data)
    if has_anomaly(predictions["yhat_upper"], data["y"]):
        print(f"Detected an anomaly: {record}")
A typical example shows seasonality being detected: there are three detected peaks, in which the actual values (y) are much bigger than the predicted values (yhat and yhat_upper).
We now have a complete anomaly detection process. The time series calculation is scalable and runs inside the DB. The prediction process is CPU bound and takes a lot of time; it can take hours if you have thousands of predictions to make. In the next section, we will run this process on multiple CPUs in parallel and improve performance dramatically.
Running at scale
The calculation we showed above is easily scalable. Each call to predict and has_anomaly handles a single time series and does not depend in any way on the other calls.
In Python, the multiprocessing module is designed to run CPU-bound tasks in parallel. We will use it to change our main loop from the previous section to support parallel processing.
Here are the steps we used with multiprocessing:
- Go over the records and add tasks to a pool
- Save a mapping from the future object the pool returns to the DB record
- Wait for each of the tasks to finish, and print the DB records with the anomalies
We prefer sending small objects to the processes in the pool, since the data has to be copied between processes; that's why we keep the mapping inside the main process.
from multiprocessing.pool import Pool
from multiprocessing import cpu_count
import pandas as pd
def detect(data):
    predictions = predict(data)
    if has_anomaly(predictions["yhat_upper"], data["y"]):
        return predictions
    return None
# the __main__ guard is required by multiprocessing on platforms
# that spawn new interpreter processes (e.g., Windows and macOS)
if __name__ == "__main__":
    # Pool size will be the number of CPUs the machine has
    with Pool(cpu_count()) as pool:
        # Go over the db records in chunks to avoid a large data frame in memory
        for df in pd.read_sql(sql, connection, chunksize=500):
            future_to_record = {}
            for _, record in df.iterrows():
                data = record_to_prophet_input(record)
                # add the task to the pool and map the future object to the db record
                future = pool.apply_async(detect, [data])
                future_to_record[future] = record
            for future, record in future_to_record.items():
                # get() blocks until the task finishes and returns
                # a predictions frame on anomaly, or None
                if future.get() is not None:
                    print(f"Anomaly Detected: {record}")
It is also possible to return the prediction data to the main process and keep it as part of the anomaly data.
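For example, here is a hypothetical variation of the result loop above that collects the predictions alongside the records instead of only printing them:
# a sketch: keep (record, predictions) pairs so the prediction
# data can be stored or reported together with the anomaly
anomalies = []
for future, record in future_to_record.items():
    predictions = future.get()
    if predictions is not None:
        anomalies.append((record, predictions))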
For a large number of records, running predictions in parallel is a must; without it, the processing time may become unreasonable. Since data tends to grow, we strongly suggest implementing a scalable process from the beginning.
Takeaway
In this post we integrated two different methods to solve a problem efficiently. The DB has advantages over Python code in calculating the time series, but it cannot perform predictions. Python can perform predictions and supports multi-processing at scale. Using both the DB and Python multi-processing allows performing anomaly detection at scale.
On your next big data processing problem, consider whether you can use the DB to improve performance. Even if it doesn't solve the entire problem, you can integrate it with your code and run tasks in parallel to complete the job.