{"cells":[{"metadata":{},"cell_type":"markdown","source":["# Predicting Loan Risk using SparkML on IBM Cloud Pak for Data (ICP4D)"]},{"metadata":{},"cell_type":"markdown","source":["We'll use this notebook to create a machine learning model to predict credit risk. In this notebook we will build the prediction model using the SparkML library.\n","\n","This notebook walks you through these steps:\n","\n","- Load and visualize the data set\n","- Build a predictive model with the SparkML API\n","- Save the model in the ML repository"]},{"metadata":{},"cell_type":"markdown","source":["## 1.0 Install required packages\n","\n","There are a couple of Python packages we will use in this notebook. First we make sure the Watson Machine Learning client v3 is removed (it's not installed by default), and then we install/upgrade the v4 version of the client (which is installed by default on CP4D).\n","\n","WML Client: https://wml-api-pyclient-dev-v4.mybluemix.net/#repository"]},{"metadata":{"collapsed":true},"cell_type":"markdown","source":["### 1.1 Package Installation"]},{"metadata":{},"cell_type":"code","source":["import warnings\n","warnings.filterwarnings('ignore')"],"execution_count":null,"outputs":[]},{"metadata":{"scrolled":false},"cell_type":"code","source":["!pip uninstall watson-machine-learning-client -y | tail -n 1\n","!pip install --user watson-machine-learning-client-v4==1.0.95 --upgrade | tail -n 1\n","!pip install --user pyspark==2.3.3 --upgrade | tail -n 1"],"execution_count":null,"outputs":[]},{"metadata":{},"cell_type":"markdown","source":["#### Action: restart the kernel!"]},{"metadata":{},"cell_type":"markdown","source":["### 1.2 Package Imports"]},{"metadata":{},"cell_type":"code","source":["import pandas as pd\n","import numpy as np\n","import json\n","import os\n","\n","# Import the Project Library to read/write project assets\n","from project_lib import Project\n","project = 
Project.access()"],"execution_count":null,"outputs":[]},{"metadata":{},"cell_type":"markdown","source":["## 2.0 Load and Clean data\n","\n","We'll load our data as a pandas DataFrame.\n","\n","**<< FOLLOW THE INSTRUCTIONS BELOW TO LOAD THE DATASET >>**\n","\n","* Highlight the cell below by clicking it.\n","* Click the `10/01` \"Find data\" icon in the upper right of the notebook.\n","* Add the locally uploaded file `german_credit_data.csv` by choosing the `Files` tab and then selecting `german_credit_data.csv`. Click `Insert to code` and choose `Insert Pandas DataFrame`.\n","* The code to bring the data into the notebook environment and create a Pandas DataFrame will be added to the cell below.\n","* Run the cell.\n"]},{"metadata":{},"cell_type":"code","source":["# Place cursor below and insert the Pandas DataFrame for the Credit Risk data\n","\n"],"execution_count":null,"outputs":[]},{"metadata":{},"cell_type":"markdown","source":["We'll use the pandas naming convention `df` for our DataFrame. Make sure the cell below assigns the DataFrame variable created above. For a locally uploaded file it will be named something like `df_data_1`, `df_data_2`, or `df_data_x`.\n","\n","**<< UPDATE THE VARIABLE ASSIGNMENT TO THE VARIABLE GENERATED ABOVE. >>**"]},{"metadata":{},"cell_type":"code","source":["# Replace df_data_1 with the variable name generated above.\n","df = df_data_1"],"execution_count":null,"outputs":[]},{"metadata":{},"cell_type":"markdown","source":["### 2.1 Drop Some Features\n","Some columns are data attributes that we do not want to use in the machine learning model. 
We can drop those columns / features:\n","\n","- CustomerID feature (column)\n","- Personal attributes: FIRSTNAME, LASTNAME, EMAIL, STREETADDRESS, CITY, STATE, POSTALCODE"]},{"metadata":{},"cell_type":"code","source":["# Drop some columns, ignoring errors for missing keys in case we use different data sets.\n","df = df.drop(columns=['CUSTOMERID', 'FIRSTNAME', 'LASTNAME', 'EMAIL', 'STREETADDRESS', 'CITY', 'STATE', 'POSTALCODE'],\n","             errors='ignore')\n","df.head(5)"],"execution_count":null,"outputs":[]},{"metadata":{},"cell_type":"markdown","source":["### 2.2 Examine the data types of the features"]},{"metadata":{"scrolled":true},"cell_type":"code","source":["df.info()"],"execution_count":null,"outputs":[]},{"metadata":{},"cell_type":"code","source":["# Statistics for the columns (features). Use include='all', since by default only the numeric features are described.\n","df.describe(include = 'all')"],"execution_count":null,"outputs":[]},{"metadata":{},"cell_type":"markdown","source":["We can see, for example, that loan amounts range from 250 to ~11,600 and that applicant ages range from 19 to 74."]},{"metadata":{},"cell_type":"markdown","source":["### 2.3 Check for missing data\n","\n","We should check whether there are missing values in our dataset. There are various ways to address missing values:\n","\n","- Drop records with missing values\n","- Fill in missing values using a strategy such as zero, the column mean, or a random value"]},{"metadata":{},"cell_type":"code","source":["# Check if we have any NaN values and see which features have missing values that should be addressed\n","print(df.isnull().values.any())\n","df.isnull().sum()"],"execution_count":null,"outputs":[]},{"metadata":{},"cell_type":"markdown","source":["This dataset has no missing values. 
If the output above had reported any, the sample below shows one way to handle them: impute the column mean for the affected column. `COLUMN_WITH_MISSING_DATA` is a hypothetical placeholder; replace it with the column that reported missing data.\n","\n","    import numpy as np\n","    from sklearn.impute import SimpleImputer\n","\n","    target_col = \"COLUMN_WITH_MISSING_DATA\"  # hypothetical placeholder\n","    imputer = SimpleImputer(missing_values=np.nan, strategy=\"mean\")\n","    df[[target_col]] = imputer.fit_transform(df[[target_col]])"]},{"metadata":{},"cell_type":"markdown","source":["### 2.4 Categorize Features\n","\n","We will categorize the columns / features based on whether they hold categorical or continuous (i.e., numerical) values. We will use these lists in later sections to build visualizations."]},{"metadata":{},"cell_type":"code","source":["TARGET_LABEL_COLUMN_NAME = 'RISK'\n","columns_idx = np.s_[0:] # Slice selecting all columns\n","first_record_idx = np.s_[0] # Index of the first record (row)\n","\n","string_fields = [type(fld) is str for fld in df.iloc[first_record_idx, columns_idx]] # True for columns whose first value is a string\n","all_features = [x for x in df.columns if x != TARGET_LABEL_COLUMN_NAME]\n","categorical_columns = list(np.array(df.columns)[columns_idx][string_fields])\n","categorical_features = [x for x in categorical_columns if x != TARGET_LABEL_COLUMN_NAME]\n","continuous_features = [x for x in all_features if x not in categorical_features]\n","\n","print('All Features: ', all_features)\n","print('\\nCategorical Features: ', categorical_features)\n","print('\\nContinuous Features: ', continuous_features)\n","print('\\nAll Categorical Columns: ', categorical_columns)"]},{"metadata":{},"cell_type":"markdown","source":["### 2.5 Visualize data\n","\n","Data visualization can be used to find patterns, detect outliers, understand distributions, and more. 
We can use graphs such as:\n","\n","- Histograms, boxplots, etc.: to find the distribution / spread of our continuous variables.\n","- Bar charts: to show the frequency of categorical values.\n"]},{"metadata":{},"cell_type":"code","source":["import seaborn as sns\n","import matplotlib.pyplot as plt\n","\n","%matplotlib inline\n","sns.set(style=\"darkgrid\")\n","sns.set_palette(\"hls\", 3)"],"execution_count":null,"outputs":[]},{"metadata":{},"cell_type":"markdown","source":["First, we get a high-level view of the distribution of Risk: what percentage of applicants in our dataset are labeled Risk vs. No Risk?"]},{"metadata":{},"cell_type":"code","source":["print(df.groupby([TARGET_LABEL_COLUMN_NAME]).size())\n","risk_plot = sns.countplot(data=df, x=TARGET_LABEL_COLUMN_NAME, order=df[TARGET_LABEL_COLUMN_NAME].value_counts().index)\n","plt.ylabel('Count')\n","for p in risk_plot.patches:\n"," height = p.get_height()\n"," risk_plot.text(p.get_x()+p.get_width()/2., height + 1,'{0:.0%}'.format(height/float(len(df))),ha=\"center\") \n","plt.show()"],"execution_count":null,"outputs":[]},{"metadata":{},"cell_type":"markdown","source":["We can use frequency count charts to understand the categorical features relative to Risk.\n","\n","- For `CheckingStatus`, the records with 'no_checking' have more occurrences of Risk than the other `CheckingStatus` values.\n","- For `CreditHistory`, the loans that have no credits (i.e., all credit has been paid back) have no occurrences of Risk (at least in this dataset). There is a small count of Risk for those applicants that have paid back all credit to date. 
There is a higher frequency (ratio) of Risk for applicants that have existing, i.e., outstanding, credit."]},{"metadata":{},"cell_type":"code","source":["# Categorical feature count plots\n","f, ((ax1, ax2), (ax3, ax4), (ax5, ax6), (ax7, ax8), (ax9, ax10), (ax11, ax12), (ax13, ax14)) = plt.subplots(7, 2, figsize=(25, 25))\n","ax = [ax1, ax2, ax3, ax4, ax5, ax6, ax7, ax8, ax9, ax10, ax11, ax12, ax13, ax14 ]\n","\n","for i in range(len(categorical_features)):\n"," sns.countplot(x = categorical_features[i], hue=TARGET_LABEL_COLUMN_NAME, data=df, ax=ax[i])\n"," "],"execution_count":null,"outputs":[]},{"metadata":{},"cell_type":"markdown","source":["We can use histograms and boxplots to understand the distribution of our continuous / numerical features relative to Risk.\n","\n","- For loans that have Risk, `InstallmentPercent` tends to be higher (i.e., the installments on loans with Risk tend to be a higher percentage of the applicant's disposable income).\n","- Applicants with 'No Risk' seem to have fewer existing credit loans at the bank (`ExistingCreditCount`).\n"]},{"metadata":{},"cell_type":"code","source":["# Continuous feature histograms: Risk in red, No Risk in blue.\n","f, ((ax1, ax2),(ax3, ax4), (ax5, ax6), (ax7, ax8)) = plt.subplots(4, 2, figsize=(25, 25))\n","ax = [ax1, ax2, ax3, ax4, ax5, ax6, ax7, ax8]\n","for i in range(len(continuous_features)):\n"," #sns.distplot(df[continuous_features[i]], bins=20, color=\"blue\", hist=True, ax=ax[i])\n"," sns.distplot(df[df[TARGET_LABEL_COLUMN_NAME] == 'Risk'][continuous_features[i]], bins=20, color=\"Red\", hist=True, ax=ax[i])\n"," sns.distplot(df[df[TARGET_LABEL_COLUMN_NAME] == 'No Risk'][continuous_features[i]], bins=20, color=\"blue\", hist=True, ax=ax[i])\n"],"execution_count":null,"outputs":[]},{"metadata":{},"cell_type":"code","source":["# Plot boxplots of numerical columns. A larger difference between the Risk and No Risk boxes suggests the feature may be more useful for prediction. 
\n","f, ((ax1, ax2),(ax3, ax4), (ax5, ax6), (ax7, ax8)) = plt.subplots(4, 2, figsize=(25, 25))\n","ax = [ax1, ax2, ax3, ax4, ax5, ax6, ax7, ax8]\n","for i in range(len(continuous_features)):\n"," sns.boxplot(x = TARGET_LABEL_COLUMN_NAME, y = continuous_features[i], data=df, ax=ax[i])\n"],"execution_count":null,"outputs":[]},{"metadata":{},"cell_type":"markdown","source":["## 3.0 Create a model\n","\n","Now we can create our machine learning model. You could use the insights / intuition gained from the data visualization steps above to decide what kind of model to create or which features to use. We will create a simple classification model."]},{"metadata":{},"cell_type":"code","source":["from pyspark.sql import SparkSession\n","import pandas as pd\n","import json\n","\n","spark = SparkSession.builder.getOrCreate()\n","# Convert the pandas DataFrame into a Spark DataFrame\n","df_data = spark.createDataFrame(df)\n","df_data.head()"],"execution_count":null,"outputs":[]},{"metadata":{},"cell_type":"markdown","source":["### 3.1 Split the data into training and test sets"]},{"metadata":{"scrolled":true},"cell_type":"code","source":["spark_df = df_data\n","# 80/20 split into training and test data; 24 is the random seed, for reproducibility\n","(train_data, test_data) = spark_df.randomSplit([0.8, 0.2], 24)\n","\n","print(\"Number of records for training: \" + str(train_data.count()))\n","print(\"Number of records for evaluation: \" + str(test_data.count()))"],"execution_count":null,"outputs":[]},{"metadata":{},"cell_type":"markdown","source":["### 3.2 Examine the Spark DataFrame Schema\n","Look at the data types to determine the requirements for feature engineering."]},{"metadata":{},"cell_type":"code","source":["spark_df.printSchema()"],"execution_count":null,"outputs":[]},{"metadata":{},"cell_type":"markdown","source":["### 3.3 Use StringIndexer to encode a string column of labels to a column of label indices\n","\n","We use the Pipeline API to chain the development steps into a single pipeline. \n","We use StringIndexer to handle the categorical / string features in the dataset. 
StringIndexer encodes a string column of labels to a column of label indices.\n","\n","We then use VectorAssembler to assemble these features into a single vector column, because Spark ML estimators such as RandomForestClassifier expect their input features in one vector column."]},{"metadata":{},"cell_type":"code","source":["from pyspark.ml.classification import RandomForestClassifier\n","from pyspark.ml.feature import StringIndexer, IndexToString, VectorAssembler, SQLTransformer\n","from pyspark.ml.evaluation import BinaryClassificationEvaluator\n","from pyspark.ml import Pipeline\n","\n","# Create a StringIndexer for each categorical column; each output column has the same name with _IX appended.\n","categorical_num_features = [x + '_IX' for x in categorical_features]\n","si_list = [StringIndexer(inputCol=nm_in, outputCol=nm_out) for nm_in, nm_out in zip(categorical_features, categorical_num_features)]"],"execution_count":null,"outputs":[]},{"metadata":{},"cell_type":"code","source":["# Encode our target label column (i.e., Risk or No Risk).\n","# Also create a label converter that performs the inverse mapping to get back a 'Risk' or 'No Risk' label from the encoded prediction.\n","si_label = StringIndexer(inputCol=TARGET_LABEL_COLUMN_NAME, outputCol=\"label\").fit(spark_df)\n","label_converter = IndexToString(inputCol=\"prediction\", outputCol=\"predictedLabel\", labels=si_label.labels)"],"execution_count":null,"outputs":[]},{"metadata":{},"cell_type":"code","source":["# Assemble all encoded categorical features plus the continuous features into a single vector\n","va_features = VectorAssembler(inputCols=categorical_num_features + continuous_features, outputCol=\"features\")"],"execution_count":null,"outputs":[]},{"metadata":{},"cell_type":"markdown","source":["### 3.4 Create a pipeline and fit a model using RandomForestClassifier\n","Assemble all the stages into a pipeline. 
We don't expect the classes to be cleanly separable by a linear model, so we'll use RandomForestClassifier, which trains an ensemble of decision trees and combines their predictions.\n","\n","The pipeline consists of: the feature string indexing steps, the label string indexing step, the vector assembly step for all features, the random forest classifier, the label converter step, and finally a feature filter step.\n","\n","**Note: If you want to filter features from the model output, replace `*` in the SQLTransformer statement with the names of the features to retain.**"]},{"metadata":{"scrolled":true},"cell_type":"code","source":["classifier = RandomForestClassifier(featuresCol=\"features\")\n","feature_filter = SQLTransformer(statement=\"SELECT * FROM __THIS__\")\n","pipeline = Pipeline(stages= si_list + [si_label, va_features, classifier, label_converter, feature_filter])\n","\n","model = pipeline.fit(train_data)"],"execution_count":null,"outputs":[]},{"metadata":{"scrolled":true},"cell_type":"code","source":["predictions = model.transform(test_data)\n","\n","# Use the classifier's rawPrediction scores (the evaluator's default column) rather than the hard 0/1\n","# 'prediction' column, so the ROC and PR curves are computed over a range of thresholds.\n","evaluatorDT = BinaryClassificationEvaluator(rawPredictionCol=\"rawPrediction\", metricName='areaUnderROC')\n","area_under_curve = evaluatorDT.evaluate(predictions)\n","\n","evaluatorDT = BinaryClassificationEvaluator(rawPredictionCol=\"rawPrediction\", metricName='areaUnderPR')\n","area_under_PR = evaluatorDT.evaluate(predictions)\n","\n","print(\"areaUnderROC = %g\" % area_under_curve, \"areaUnderPR = %g\" % area_under_PR)"],"execution_count":null,"outputs":[]},{"metadata":{},"cell_type":"markdown","source":["### 3.5 Evaluate more metrics by exporting the predictions to pandas"]},{"metadata":{},"cell_type":"code","source":["from sklearn.metrics import classification_report\n","\n","# Take the true and predicted labels from the same DataFrame so the rows stay aligned.\n","# 'predictedLabel' is the IndexToString output, so no manual index-to-label mapping is needed.\n","pdf_predictions = predictions.select(TARGET_LABEL_COLUMN_NAME, 'predictedLabel').toPandas()\n","y_test = pdf_predictions[TARGET_LABEL_COLUMN_NAME]\n","y_pred = pdf_predictions['predictedLabel']\n","# Pass labels explicitly so target_names lines up with them (otherwise labels are sorted alphabetically).\n","print(classification_report(y_test, y_pred, labels=['Risk', 'No Risk'], target_names=['Risk', 'No 
Risk']))"],"execution_count":null,"outputs":[]},{"metadata":{},"cell_type":"markdown","source":["## 4.0 Save the model and test data\n","\n","Now the model can be saved for future deployment. The model will be saved to a deployment space using the Watson Machine Learning client."]},{"metadata":{},"cell_type":"markdown","source":["### 4.1 Save the model to ICP4D local Watson Machine Learning\n","\n","Replace the `username` and `password` values of `************` with your Cloud Pak for Data `username` and `password`. The value for `url` should match the `url` for your Cloud Pak for Data cluster."]},{"metadata":{},"cell_type":"code","source":["from watson_machine_learning_client import WatsonMachineLearningAPIClient\n","\n","wml_credentials = {\n"," \"url\": \"************\",\n"," \"username\": \"************\",\n"," \"password\" : \"************\",\n"," \"instance_id\": \"wml_local\",\n"," \"version\" : \"2.5.0\"\n"," }\n","\n","wml_client = WatsonMachineLearningAPIClient(wml_credentials)\n"],"execution_count":null,"outputs":[]},{"metadata":{},"cell_type":"code","source":["wml_client.spaces.list()"],"execution_count":null,"outputs":[]},{"metadata":{},"cell_type":"markdown","source":["### Use the desired space as the `default_space`\n","\n","The deployment space ID will be looked up based on the name you specify in the cell below. 
If the next cell does not print a space GUID, do not proceed until you have created a deployment space.\n","\n","**<< UPDATE THE VARIABLE 'MODEL_NAME' TO A UNIQUE NAME>>**\n","\n","**<< UPDATE THE VARIABLE 'DEPLOYMENT_SPACE_NAME' TO THE NAME OF THE DEPLOYMENT SPACE CREATED PREVIOUSLY>>**"]},{"metadata":{},"cell_type":"code","source":["MODEL_NAME = \"MY_NAME RISK MODEL\"\n","DEPLOYMENT_SPACE_NAME = \"MY_NAME RISK MODEL DEPLOYMENT\""],"execution_count":null,"outputs":[]},{"metadata":{},"cell_type":"code","source":["wml_client.spaces.list()\n","all_spaces = wml_client.spaces.get_details()['resources']\n","space_id = None\n","for space in all_spaces:\n","    if space['entity']['name'] == DEPLOYMENT_SPACE_NAME:\n","        space_id = space[\"metadata\"][\"guid\"]\n","        print(\"\\nDeployment Space GUID: \", space_id)\n","\n","if space_id is None:\n","    print(\"WARNING: Your space does not exist. Create a deployment space before proceeding to the next cell.\")\n","    # space_id = wml_client.spaces.store(meta_props={wml_client.spaces.ConfigurationMetaNames.NAME: DEPLOYMENT_SPACE_NAME})[\"metadata\"][\"guid\"]"],"execution_count":null,"outputs":[]},{"metadata":{},"cell_type":"code","source":["# Now set the default space to the GUID for your deployment space. 
If this is successful, you will see a 'SUCCESS' message.\n","wml_client.set.default_space(space_id)  # space_id was looked up in the previous cell\n"],"execution_count":null,"outputs":[]},{"metadata":{},"cell_type":"markdown","source":["#### (Optional) Remove Existing Model and Deployment"]},{"metadata":{"scrolled":true},"cell_type":"code","source":["#wml_models = wml_client.repository.get_model_details()\n","#model_uid = None\n","#for model_in in wml_models['resources']:\n","#    if MODEL_NAME == model_in['entity']['name']:\n","#        model_uid = model_in['metadata']['guid']\n","#        print('Deleting model id', model_uid)\n","#        wml_client.repository.delete(model_uid)\n","#        break\n","#\n","#wml_client.repository.list_models()"],"execution_count":null,"outputs":[]},{"metadata":{},"cell_type":"markdown","source":["#### Save the Model"]},{"metadata":{"scrolled":true},"cell_type":"code","source":["metadata = {\n"," wml_client.repository.ModelMetaNames.NAME: MODEL_NAME,\n"," wml_client.repository.ModelMetaNames.TYPE: 'mllib_2.3',\n"," wml_client.repository.ModelMetaNames.RUNTIME_UID: 'spark-mllib_2.3',\n","}\n","\n","published_model_details = wml_client.repository.store_model(model, metadata, training_data=df_data, pipeline=pipeline)\n","model_uid = wml_client.repository.get_model_uid(published_model_details)\n","\n","print(json.dumps(published_model_details, indent=3))\n"],"execution_count":null,"outputs":[]},{"metadata":{},"cell_type":"code","source":["# Use this cell to do any cleanup of previously created models and deployments\n","wml_client.repository.list_models()\n","wml_client.deployments.list()\n","\n","# wml_client.repository.delete('GUID of stored model')\n","# wml_client.deployments.delete('GUID of deployed model')"],"execution_count":null,"outputs":[]},{"metadata":{},"cell_type":"markdown","source":["## 5.0 Save Test Data\n","\n","We will save the test data used to evaluate the model to our project. 
Although not required, this will make it easier to run batch tests later on."]},{"metadata":{},"cell_type":"code","source":["write_score_CSV=test_data.toPandas().drop([TARGET_LABEL_COLUMN_NAME], axis=1)\n","write_score_CSV.to_csv('/project_data/data_asset/GermanCreditRiskSparkMLBatchScore.csv', sep=',', index=False)\n","#project.save_data('GermanCreditRiskSparkMLBatchScore.csv', write_score_CSV.to_csv())\n","\n","write_eval_CSV=test_data.toPandas()\n","write_eval_CSV.to_csv('/project_data/data_asset/GermanCreditRiskSparkMLEval.csv', sep=',', index=False)\n","#project.save_data('GermanCreditRiskSparkMLEval.csv', write_eval_CSV.to_csv())"],"execution_count":null,"outputs":[]},{"metadata":{},"cell_type":"code","source":[],"execution_count":null,"outputs":[]}],"metadata":{"kernelspec":{"name":"python3","display_name":"Python 3.6","language":"python"},"language_info":{"name":"python","version":"3.6.10","mimetype":"text/x-python","codemirror_mode":{"name":"ipython","version":3},"pygments_lexer":"ipython3","nbconvert_exporter":"python","file_extension":".py"}},"nbformat":4,"nbformat_minor":1}