A disengagement occurs when an autonomous vehicle hands control back to a human driver because of an unexpected situation or a system error. Causes range from bad weather to malfunctioning sensors. Because the same vehicle can disengage many times, these events form recurrent event data. The goal of fitting a model to that data is to identify patterns and trends that explain why disengagements happen, so we can improve the performance of our autonomous vehicles.
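To make the idea of recurrent events concrete, here is a minimal sketch that computes the gaps between successive disengagements of a single vehicle. The timestamps are hypothetical values made up for illustration and do not come from any dataset.

```python
from datetime import datetime

# Hypothetical disengagement times for one vehicle (illustrative values only)
events = [
    datetime(2023, 1, 1, 9, 0, 0),
    datetime(2023, 1, 1, 9, 12, 30),
    datetime(2023, 1, 1, 10, 2, 30),
]

# Gap in seconds between each event and the one before it
gaps = [(later - earlier).total_seconds() for earlier, later in zip(events, events[1:])]

# Average time between disengagements
mean_gap = sum(gaps) / len(gaps)
print(gaps, mean_gap)
```

Gap times like these are one common way to summarize recurrent events before fitting any model.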
Now let’s get into the details. First, you need some data. You can either collect your own or use a publicly available dataset such as the Udacity Self-Driving Car Dataset. This tutorial uses the latter for simplicity. The code below assumes the data has been exported to a CSV file with timestamp, sensor_fusion, and disengagement columns.
Once you have your data, load it and clean it up a bit:
# Import necessary libraries
import pandas as pd  # data manipulation and analysis
# Load the data
df = pd.read_csv('disengagement_events.csv')  # expects 'timestamp', 'sensor_fusion' and 'disengagement' columns
# Clean up the data (convert to datetime, remove missing values)
df['timestamp'] = pd.to_datetime(df['timestamp'])  # parse the 'timestamp' column as datetimes
df = df.dropna(subset=['sensor_fusion'])  # drop rows with no 'sensor_fusion' reading
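If you do not have a file at hand yet, the cleaning steps can be tried on a few in-memory rows. This is only a sketch: the CSV contents below are hypothetical, and the column names simply mirror the ones used in this tutorial.

```python
import io
import pandas as pd

# Hypothetical CSV contents; column names mirror the ones used in this tutorial
csv_text = """timestamp,sensor_fusion,disengagement
2023-01-01 09:00:00,0.91,0
2023-01-01 09:00:05,,0
2023-01-01 09:00:10,0.42,1
2023-01-01 09:00:15,0.88,0
"""

sample = pd.read_csv(io.StringIO(csv_text))
sample['timestamp'] = pd.to_datetime(sample['timestamp'])  # strings -> datetimes
sample = sample.dropna(subset=['sensor_fusion'])           # drop the row with no reading

print(sample.dtypes)
print(len(sample))
```

The second row is dropped because its sensor_fusion value is missing, leaving three usable rows.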
Next, we’ll create a function that will split our data into training and testing sets:
# Function to split data into training and testing sets
def train_test_split(data):
    # Randomly sample 80% of the rows for training (fixed seed for reproducibility)
    train = data.sample(frac=0.8, random_state=42)
    # The remaining 20% of the rows form the test set
    test = data[~data.index.isin(train.index)]
    return train, test
# Example usage
train, test = train_test_split(df)  # 'df' is the cleaned dataframe loaded above
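A random split is simple, but with timestamped events it mixes past and future rows. One alternative you might consider is a chronological split, sketched below. The helper name chronological_split and the demo data are hypothetical and are not part of the tutorial's pipeline.

```python
import pandas as pd

def chronological_split(data, train_frac=0.8, time_col='timestamp'):
    """Hypothetical helper: earliest train_frac of rows for training, the rest for testing."""
    ordered = data.sort_values(time_col)
    cutoff = int(len(ordered) * train_frac)
    return ordered.iloc[:cutoff], ordered.iloc[cutoff:]

# Made-up demo data: ten rows, one per minute
demo = pd.DataFrame({
    'timestamp': pd.date_range('2023-01-01', periods=10, freq='min'),
    'disengagement': [0, 0, 1, 0, 0, 0, 1, 0, 0, 1],
})
train_demo, test_demo = chronological_split(demo)
print(len(train_demo), len(test_demo))
```

With this split every test row is later than every training row, which is closer to how the model would be used on new drives.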
Now we’re ready to fit our model! We’ll be using a recurrent neural network (RNN), specifically the Long Short-Term Memory (LSTM) variant. Here’s how you can do that:
# Import necessary libraries
from sklearn.metrics import roc_auc_score, confusion_matrix  # evaluation metrics
from keras.models import Sequential  # sequential (layer-by-layer) model
from keras.layers import LSTM, Dense  # LSTM and fully connected layers
import numpy as np  # array handling
# Split the data into training and testing sets with the train_test_split function defined above
# (importing sklearn's train_test_split here would shadow it and change the split to 75/25)
train, test = train_test_split(df)
# Preprocess the data (convert to arrays of features and labels)
X = []  # features
y = []  # labels
for _, row in train.iterrows():  # loop over each row in the training data
    # Select the sensor fusion readings from the 10 seconds up to and including this row
    window = df[(df['timestamp'] > row['timestamp'] - pd.Timedelta(seconds=10)) & (df['timestamp'] <= row['timestamp'])]
    X.append([window['sensor_fusion'].mean()])  # one feature: mean reading over the window
    y.append(int(row['disengagement']))  # label: 1 if this row is a disengagement, else 0
X = np.array(X)  # shape (n_samples, 1)
y = np.array(y)  # shape (n_samples,)
# Define the model architecture (one LSTM layer followed by an output layer)
X_seq = X.reshape((X.shape[0], X.shape[1], 1))  # LSTM layers expect 3D input: (samples, timesteps, features per step)
model = Sequential()
model.add(LSTM(10, input_shape=(X.shape[1], 1)))  # 10 units; returns only the last output, so there is one prediction per sample
model.add(Dense(1, activation='sigmoid'))  # single sigmoid unit: predicted probability of disengagement
model.compile(optimizer='adam', loss='binary_crossentropy')
# Train the model on the training data
history = model.fit(X_seq, y, epochs=50)  # 50 passes over the training set
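Building the window row by row is easy to follow but slow on large files. As a sketch of an alternative, pandas can compute a trailing 10-second mean with a time-based rolling window. The readings below are hypothetical, and this is one possible way to build the feature rather than the tutorial's own method.

```python
import pandas as pd

# Hypothetical readings, one row every 5 seconds
demo = pd.DataFrame({
    'timestamp': pd.to_datetime('2023-01-01 09:00:00')
                 + pd.to_timedelta([0, 5, 10, 15, 20], unit='s'),
    'sensor_fusion': [1.0, 2.0, 3.0, 4.0, 5.0],
})

# Time-based rolling mean over the trailing 10 seconds.
# By default the window is (t - 10s, t], so the current row is included.
rolled = (
    demo.sort_values('timestamp')
        .set_index('timestamp')['sensor_fusion']
        .rolling('10s')
        .mean()
)

# One feature per row, shaped (n_samples, 1)
features = rolled.to_numpy().reshape(-1, 1)
print(features.ravel())
```

Each value is the mean of the readings in the 10 seconds ending at that row, so the first row averages only itself.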
Now that our model is trained, we can use it to predict disengagements in new data:
# Load test data and preprocess it the same way as the training data
test_data = pd.read_csv('new_disengagement_events.csv')
test_data['timestamp'] = pd.to_datetime(test_data['timestamp'])
test_data = test_data.dropna(subset=['sensor_fusion'])
X_test = []  # test features
y_test = []  # test labels
for _, row in test_data.iterrows():
    # Use the same 10-second trailing window as in training, taken from the new file's own readings
    window = test_data[(test_data['timestamp'] > row['timestamp'] - pd.Timedelta(seconds=10)) & (test_data['timestamp'] <= row['timestamp'])]
    X_test.append([window['sensor_fusion'].mean()])
    y_test.append(int(row['disengagement']))  # assumes the new file also has a 'disengagement' column
# Convert to arrays; the LSTM expects input shaped (samples, timesteps, features per step)
X_test = np.array(X_test).reshape((-1, 1, 1))
y_test = np.array(y_test)
# Use the trained model to predict a disengagement probability for every test row
y_pred = model.predict(X_test, verbose=0).ravel()  # indexing with [0] would keep only the first prediction
y_pred_label = (y_pred > 0.5).astype(int)  # threshold the probabilities at 0.5 to get 0/1 labels
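A sigmoid output layer produces probabilities, and comparing them to a threshold turns them into class labels. The sketch below shows that step on its own, using hypothetical probabilities rather than real model output.

```python
import numpy as np

# Hypothetical predicted probabilities, shaped (n_samples, 1) like a sigmoid output layer
probs = np.array([[0.10], [0.72], [0.50], [0.93]])

flat = probs.ravel()                # (4, 1) -> (4,)
labels = (flat > 0.5).astype(int)   # strictly greater than the threshold counts as class 1
print(labels)
```

Note that a value exactly equal to the threshold is assigned to class 0 here. Whether 0.5 is the right cut-off depends on how costly missed disengagements are compared to false alarms.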
Finally, we can evaluate our model’s performance using the ROC AUC score:
# Calculate the ROC AUC score on the test data
import numpy as np
from sklearn.metrics import roc_auc_score, roc_curve
# True labels and predicted probabilities for the test data
y_true = test_data['disengagement'].astype(int).to_numpy()
predicted_values = np.ravel(y_pred)  # y_pred holds the model's predicted probabilities
# roc_auc_score returns a single number: the area under the ROC curve
roc_auc = roc_auc_score(y_true, predicted_values)
# roc_curve returns the three arrays behind that curve: false positive rates, true positive rates and thresholds
fpr, tpr, thresholds = roc_curve(y_true, predicted_values)
# Mean true positive rate over the operating points whose false positive rate is below 0.1
mean_tpr_low_fpr = np.mean(tpr[fpr < 0.1])
print('ROC AUC score:', round(roc_auc, 4))
print('Mean TPR at FPR < 0.1:', round(mean_tpr_low_fpr, 4))
# Note: roc_auc_score needs both classes to be present in the true labels.
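To see what the ROC AUC number means, it helps to compute it by hand on a tiny example. The sketch below uses the pairwise definition: the fraction of (positive, negative) pairs in which the positive example receives the higher score. The function name pairwise_auc and the four data points are illustrative assumptions, not part of scikit-learn.

```python
def pairwise_auc(y_true, scores):
    """Fraction of (positive, negative) pairs ranked correctly; ties count as half."""
    pos = [s for s, t in zip(scores, y_true) if t == 1]
    neg = [s for s, t in zip(scores, y_true) if t == 0]
    if not pos or not neg:
        raise ValueError("need at least one positive and one negative label")
    wins = sum(1.0 if p > n else 0.5 if p == n else 0.0 for p in pos for n in neg)
    return wins / (len(pos) * len(neg))

# Made-up labels and scores for illustration
y_true = [0, 0, 1, 1]
scores = [0.1, 0.4, 0.35, 0.8]
auc = pairwise_auc(y_true, scores)
print(auc)
```

Three of the four positive/negative pairs are ordered correctly here, so the result is 0.75. A score of 0.5 corresponds to random ranking and 1.0 to a perfect one.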
And that’s it! You now have a model that predicts disengagements from sensor fusion data collected by autonomous vehicles. Of course, this is just one example of how you could use Keras, with scikit-learn for evaluation, to fit models for recurrent event data in the context of autonomous vehicle disengagement events. Other techniques and libraries may be more appropriate depending on your specific needs. But hopefully, this tutorial has given you a good starting point!