# credit_card_detection.py
# %%
# Importing modules
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from matplotlib import gridspec
# %%
# read the dataset
dataset = pd.read_csv("creditcard.csv")
# preview the first 5 and the last 5 rows of the data
pd.concat([dataset.head(), dataset.tail()])
# %%
# check for relative proportion
print("Fraudulent Cases: " + str(len(dataset[dataset["Class"] == 1])))
print("Valid Transactions: " + str(len(dataset[dataset["Class"] == 0])))
print("Proportion of Fraudulent Cases: " + str(len(dataset[dataset["Class"] == 1])/ dataset.shape[0]))
# Visualize how small the share of fraudulent transactions is
data_p = dataset.copy()
data_p["Transaction Type"] = np.where(data_p["Class"] == 1, "Fraud", "Genuine")
# plot a pie chart
data_p["Transaction Type"].value_counts().plot(kind="pie")
# %%
# plot the named features
f, axes = plt.subplots(1, 2, figsize=(18,4), sharex = True)
amount_value = dataset['Amount'].values # values
time_value = dataset['Time'].values # values
sns.kdeplot(amount_value, color="m", fill=True, ax=axes[0]).set_title('Distribution of Amount')
sns.kdeplot(time_value, color="m", fill=True, ax=axes[1]).set_title('Distribution of Time')
plt.show()
# %%
print("Average Amount in a Fraudulent Transaction: " + str(dataset[dataset["Class"] == 1]["Amount"].mean()))
print("Average Amount in a Valid Transaction: " + str(dataset[dataset["Class"] == 0]["Amount"].mean()))
# %%
print("Summary of the feature - Amount" + "\n-------------------------------")
print(dataset["Amount"].describe())
# %%
# Reorder the columns Amount, Time then the rest
data_plot = dataset.copy()
amount = data_plot['Amount']
data_plot.drop(labels=['Amount'], axis=1, inplace = True)
data_plot.insert(0, 'Amount', amount)
# Plot the distributions of the features
columns = data_plot.iloc[:,0:30].columns
plt.figure(figsize=(12,30*4))
grids = gridspec.GridSpec(30, 1)
for grid, column in enumerate(data_plot[columns]):
    ax = plt.subplot(grids[grid])
    sns.kdeplot(data_plot[column][data_plot.Class == 1], fill=True, label="Fraud")
    sns.kdeplot(data_plot[column][data_plot.Class == 0], fill=True, label="Genuine")
    ax.set_xlabel("")
    ax.set_title("Distribution of Column: " + str(column))
    ax.legend()
plt.show()
# %%
# check for missing values
total_cells = dataset.size
missing_values = dataset.isnull().sum().sum()
print("Non-missing values: " + str(total_cells - missing_values))
print("Missing values: " + str(missing_values))
# %%
from sklearn.preprocessing import RobustScaler
scaler = RobustScaler().fit(dataset[["Time", "Amount"]])
dataset[["Time", "Amount"]] = scaler.transform(dataset[["Time", "Amount"]])
pd.concat([dataset.head(), dataset.tail()])
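# %%
# Sanity check (an addition, not in the original script): RobustScaler centers on
# the median and scales by the IQR, so recomputing that by hand on the raw column
# should reproduce the transformed values. Assumes creditcard.csv is still on disk.
raw_amount = pd.read_csv("creditcard.csv")["Amount"]
manual = (raw_amount - raw_amount.median()) / (raw_amount.quantile(0.75) - raw_amount.quantile(0.25))
print(np.allclose(manual.values, dataset["Amount"].values))  # expected: True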
# %%
# Separate the response and the features.
# Note: undersampling before cross-validation leaks information and leads to
# overfitting, so resampling is done inside the pipeline instead (see below).
y = dataset["Class"] # target
X = dataset.iloc[:,0:30]
# Use SKLEARN for the split
from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42)
X_train.shape, X_test.shape, y_train.shape, y_test.shape
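# %%
# Optional variant (my addition, not the original split): with fraud at roughly
# 0.17% of rows, a plain random split can leave the test set with a skewed fraud
# count; stratify=y keeps the class ratio identical across train and test.
X_tr_s, X_te_s, y_tr_s, y_te_s = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y)
print("Fraud proportion (train/test):", y_tr_s.mean(), y_te_s.mean())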
# %%
# Create the cross validation framework
from sklearn.model_selection import StratifiedKFold
from sklearn.model_selection import GridSearchCV, cross_val_score, RandomizedSearchCV
kf = StratifiedKFold(n_splits=5, random_state = None, shuffle = False)
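# %%
# Illustrative check (not in the original): each stratified fold should hold
# roughly the same fraud proportion as the full training set.
for fold, (_, val_idx) in enumerate(kf.split(X_train, y_train)):
    print(f"Fold {fold}: fraud proportion = {y_train.iloc[val_idx].mean():.5f}")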
# %%
# Import the imbalance Learn module
from imblearn.pipeline import make_pipeline  # build a pipeline from the provided estimators
from imblearn.under_sampling import NearMiss  # undersampling based on NearMiss methods
from imblearn.over_sampling import SMOTE  # oversampling with SMOTE
# import the metrics
from sklearn.metrics import roc_curve, roc_auc_score, accuracy_score, recall_score, precision_score, f1_score
# Import the classifiers
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.neighbors import KNeighborsClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
# %%
# Baseline: a random forest with default hyperparameters
# (RandomForestClassifier was already imported above)
# Fit and predict
rfc = RandomForestClassifier()
rfc.fit(X_train, y_train)
y_pred = rfc.predict(X_test)
# Evaluate performance with the metrics imported above
print("The accuracy is", accuracy_score(y_test, y_pred))
print("The precision is", precision_score(y_test, y_pred))
print("The recall is", recall_score(y_test, y_pred))
print("The F1 score is", f1_score(y_test, y_pred))
# %%
def get_model_best_estimator_and_metrics(estimator, params, kf=kf, X_train=X_train,
                                         y_train=y_train, X_test=X_test,
                                         y_test=y_test, is_grid_search=True,
                                         sampling=NearMiss(), scoring="f1",
                                         n_jobs=2):
    if sampling is None:
        # make the pipeline of only the estimator, just so the remaining code will work fine
        pipeline = make_pipeline(estimator)
    else:
        # make the pipeline of over-/undersampling and the estimator
        pipeline = make_pipeline(sampling, estimator)
    # get the estimator name
    estimator_name = estimator.__class__.__name__.lower()
    # construct the parameters for grid/random search cv
    new_params = {f'{estimator_name}__{key}': params[key] for key in params}
    if is_grid_search:
        # grid search instead of randomized search
        search = GridSearchCV(pipeline, param_grid=new_params, cv=kf,
                              scoring=scoring, return_train_score=True,
                              n_jobs=n_jobs, verbose=2)
    else:
        # randomized search
        search = RandomizedSearchCV(pipeline, param_distributions=new_params,
                                    cv=kf, scoring=scoring, return_train_score=True,
                                    n_jobs=n_jobs, verbose=1)
    # fit the model
    search.fit(X_train, y_train)
    # nested cross-validation: re-runs the search on each fold, which is slow but
    # gives a less biased estimate of the tuned model's score
    cv_score = cross_val_score(search, X_train, y_train, scoring=scoring, cv=kf)
    # make predictions on the test data
    y_pred = search.best_estimator_.named_steps[estimator_name].predict(X_test)
    # calculate the metrics: recall, accuracy, F1 score, etc.
    recall = recall_score(y_test, y_pred)
    accuracy = accuracy_score(y_test, y_pred)
    f1 = f1_score(y_test, y_pred)
    y_proba = search.best_estimator_.named_steps[estimator_name].predict_proba(X_test)[:, 1]
    fpr, tpr, _ = roc_curve(y_test, y_proba)
    auc = roc_auc_score(y_test, y_proba)
    # return the best estimator along with the metrics
    return {
        "best_estimator": search.best_estimator_,
        "estimator_name": estimator_name,
        "cv_score": cv_score,
        "recall": recall,
        "accuracy": accuracy,
        "f1_score": f1,
        "fpr": fpr,
        "tpr": tpr,
        "auc": auc,
    }
# %%
# Cumulatively create a table for the ROC curve
## Create the dataframe
res_table = pd.DataFrame(columns=['classifiers', 'fpr','tpr','auc'])
rfc_results = get_model_best_estimator_and_metrics(
    estimator=RandomForestClassifier(),
    params={
        'n_estimators': [50, 100, 200],
        'max_depth': [4, 6, 10, 12],
        'random_state': [13],
    },
    sampling=None,
    n_jobs=3,
)
res_table = pd.concat([res_table, pd.DataFrame([{
    'classifiers': rfc_results["estimator_name"],
    'fpr': rfc_results["fpr"],
    'tpr': rfc_results["tpr"],
    'auc': rfc_results["auc"],
}])], ignore_index=True)
# %%
print(f"==={rfc_results['estimator_name']}===")
print("Model:", rfc_results['best_estimator'])
print("Accuracy:", rfc_results['accuracy'])
print("Recall:", rfc_results['recall'])
print("F1 Score:", rfc_results['f1_score'])
# %%
logreg_us_results = get_model_best_estimator_and_metrics(
    estimator=LogisticRegression(),
    params={"penalty": ['l1', 'l2'],
            'C': [0.01, 0.1, 1, 100],
            'solver': ['liblinear']},
    sampling=NearMiss(),
    n_jobs=3,
)
print(f"==={logreg_us_results['estimator_name']}===")
print("Model:", logreg_us_results['best_estimator'])
print("Accuracy:", logreg_us_results['accuracy'])
print("Recall:", logreg_us_results['recall'])
print("F1 Score:", logreg_us_results['f1_score'])
res_table = pd.concat([res_table, pd.DataFrame([{
    'classifiers': logreg_us_results["estimator_name"],
    'fpr': logreg_us_results["fpr"],
    'tpr': logreg_us_results["tpr"],
    'auc': logreg_us_results["auc"],
}])], ignore_index=True)
res_table
# %%
# Plot the ROC curve for undersampling
res_table.set_index('classifiers', inplace=True)
fig = plt.figure(figsize=(17,7))
for j in res_table.index:
    plt.plot(res_table.loc[j]['fpr'],
             res_table.loc[j]['tpr'],
             label="{}, AUC={:.3f}".format(j, res_table.loc[j]['auc']))
plt.plot([0, 1], [0, 1], color='orange', linestyle='--')
plt.xticks(np.arange(0.0, 1.1, step=0.1))
plt.xlabel("False Positive Rate", fontsize=15)
plt.yticks(np.arange(0.0, 1.1, step=0.1))
plt.ylabel("True Positive Rate", fontsize=15)
plt.title('ROC Curve Analysis (Undersampling)', fontweight='bold', fontsize=15)
plt.legend(prop={'size': 13}, loc='lower right')
plt.show()
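# %%
# Supplementary plot (my addition, not part of the original analysis): on heavily
# imbalanced data the ROC curve can look optimistic, and a precision-recall curve
# is often more informative. Sketch for the undersampled logistic regression
# fitted above.
from sklearn.metrics import precision_recall_curve
proba = logreg_us_results["best_estimator"].predict_proba(X_test)[:, 1]
precision, recall, _ = precision_recall_curve(y_test, proba)
plt.figure(figsize=(8, 5))
plt.plot(recall, precision)
plt.xlabel("Recall")
plt.ylabel("Precision")
plt.title("Precision-Recall Curve (undersampled LogisticRegression)")
plt.show()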
# %%
# Cumulatively create a table for the ROC curve
res_table = pd.DataFrame(columns=['classifiers', 'fpr','tpr','auc'])
logreg_os_results = get_model_best_estimator_and_metrics(
    estimator=LogisticRegression(),
    params={"penalty": ['l1', 'l2'],
            'C': [0.01, 0.1, 1, 100],
            'solver': ['liblinear']},
    sampling=SMOTE(random_state=42),
    scoring="f1",
    is_grid_search=False,
    n_jobs=2,
)
print(f"==={logreg_os_results['estimator_name']}===")
print("Model:", logreg_os_results['best_estimator'])
print("Accuracy:", logreg_os_results['accuracy'])
print("Recall:", logreg_os_results['recall'])
print("F1 Score:", logreg_os_results['f1_score'])
res_table = pd.concat([res_table, pd.DataFrame([{
    'classifiers': logreg_os_results["estimator_name"],
    'fpr': logreg_os_results["fpr"],
    'tpr': logreg_os_results["tpr"],
    'auc': logreg_os_results["auc"],
}])], ignore_index=True)
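# %%
# Illustrative (not in the original): SMOTE synthesizes new minority-class samples,
# so resampling the training set on its own should yield a 50/50 class balance.
X_res, y_res = SMOTE(random_state=42).fit_resample(X_train, y_train)
print(y_res.value_counts())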
# %%
# boxplot for two example variables in the dataset
f, axes = plt.subplots(1, 2, figsize=(18,4), sharex = True)
variable1 = dataset["V1"]
variable2 = dataset["V2"]
sns.boxplot(x=variable1, color="m", ax=axes[0]).set_title('Boxplot for V1')
sns.boxplot(x=variable2, color="m", ax=axes[1]).set_title('Boxplot for V2')
plt.show()
# %%
# Find the IQR for all the feature variables.
# Note: the Class variable is excluded here. Its IQR is 0 (fraud is under 1% of
# the data, so both quartiles are 0), and including it would flag every
# fraudulent row as an outlier.
features = dataset.drop(columns=["Class"])
quartile1 = features.quantile(0.25)
quartile3 = features.quantile(0.75)
IQR = quartile3 - quartile1
print(IQR)
# %%
# Remove the outliers: drop any row with a feature value beyond 3 * IQR from the quartiles
constant = 3
outlier_mask = ((features < (quartile1 - constant * IQR)) |
                (features > (quartile3 + constant * IQR))).any(axis=1)
datavalid = dataset[~outlier_mask]
deletedrows = dataset.shape[0] - datavalid.shape[0]
print("We have removed " + str(deletedrows) + " rows from the data as outliers")