SparkR and RStudio Integration
In this article, we cover how to process a dataset using RStudio and the SparkR APIs on a Spark Master-Slave cluster.
First of all, let’s lay the groundwork: why do we need SparkR and RStudio integration?
When we work on small to medium-sized datasets, R running on a single machine can handle the workload. When the data grows to gigabytes or petabytes, we need a solution like SparkR that can process the workload in a distributed manner.
To set up the environment, we need to install the following software and a Spark cluster (Master-Slave):
- Install Java
- Install Python
- Install R
- Install Spark (the SparkR package is bundled with the Spark distribution)
- Install RStudio
Let’s refresh the package index on the Ubuntu machine with the following Bash command.
sudo apt-get update
Install Java & Validation
sudo apt install openjdk-11-jdk
sudo update-alternatives --config java
java -version
Install Python & Validation
sudo apt install python3.8
python3 --version
Install R & Validation
sudo apt install r-base r-base-dev
R --version
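The individual version checks above can also be bundled into one pass. The following is a minimal sketch, not part of the original walkthrough: the helper name check_tool is hypothetical, and the tool list simply mirrors the commands installed so far.

```shell
#!/bin/sh
# Report whether a command is available on PATH.
# Returns 0 if found, 1 otherwise.
check_tool() {
    if command -v "$1" >/dev/null 2>&1; then
        echo "found:   $1 -> $(command -v "$1")"
        return 0
    fi
    echo "missing: $1"
    return 1
}

# Tools this walkthrough relies on; adjust the list to your setup.
for tool in java python3 R; do
    check_tool "$tool" || true
done
```

Any line reported as missing points at an install step that needs to be repeated before continuing.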
Install RStudio
wget https://download1.rstudio.org/desktop/bionic/amd64/rstudio-2022.02.1-461-amd64.deb
sudo apt install -f ./rstudio-2022.02.1-461-amd64.deb
Install Spark
wget https://archive.apache.org/dist/spark/spark-3.3.1/spark-3.3.1-bin-hadoop3.tgz
tar -xzf spark-3.3.1-bin-hadoop3.tgz
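Before relying on the downloaded archive, it can be worth checking its integrity. Apache projects publish a SHA-512 digest alongside each release file. The sketch below assumes the sha512sum utility is available (it ships with GNU coreutils on Ubuntu); the function name verify_sha512 is hypothetical, and the expected digest has to be copied by hand from the published .sha512 file.

```shell
#!/bin/sh
# Compare a file's SHA-512 digest with an expected hex string.
# Usage: verify_sha512 <file> <expected-hex-digest>
verify_sha512() {
    actual=$(sha512sum "$1" | awk '{print $1}')
    # Normalise the expected value: strip whitespace, lower-case the hex digits.
    expected=$(printf '%s' "$2" | tr -d ' \n\t' | tr 'A-F' 'a-f')
    if [ "$actual" = "$expected" ]; then
        echo "OK: $1"
        return 0
    fi
    echo "MISMATCH: $1" >&2
    return 1
}

# Example (commented out; the digest is a placeholder to be replaced with
# the value from the .sha512 file published next to the tarball):
# verify_sha512 spark-3.3.1-bin-hadoop3.tgz "<digest from the .sha512 file>"
```

A mismatch usually means the download was interrupted or an HTML page was saved instead of the archive.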
Configure environment variables in .bashrc
vi ~/.bashrc
Insert the following lines to configure SPARK_HOME and the Python alias.
export SPARK_HOME=/home/bigdatacomp01/Downloads/software/spark-3.3.1-bin-hadoop3
export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin
alias python='python3'
source ~/.bashrc
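Editing .bashrc by hand works, but re-running a setup script can easily add the same export twice. As a hedged sketch, the hypothetical helper append_once below adds a line only when it is not already present; the demonstration writes to a scratch file rather than the real ~/.bashrc.

```shell
#!/bin/sh
# Append a line to a file only if that exact line is not already present.
# Usage: append_once <line> <file>
append_once() {
    touch "$2"
    # -x: match the whole line, -F: fixed string, -q: quiet
    grep -qxF -- "$1" "$2" || printf '%s\n' "$1" >> "$2"
}

# Demonstration against a scratch file rather than the real ~/.bashrc.
rc_file=$(mktemp)
append_once 'export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin' "$rc_file"
append_once 'export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin' "$rc_file"
wc -l < "$rc_file"   # the line was added once, not twice
```

Pointing the second argument at ~/.bashrc would apply the same logic to the real file.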
Validate the Spark installation by starting the Master and a Worker
start-master.sh
start-worker.sh spark://ubuntu:7077
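The master URL spark://ubuntu:7077 depends on the machine's hostname, so it may differ on another host. One way to find the actual value is to read it from the master's log. The sketch below makes two assumptions: that the master writes a line containing its spark:// URL to a log file under $SPARK_HOME/logs, and that the file name contains "Master". The function name master_url_from_log is hypothetical.

```shell
#!/bin/sh
# Print the first spark://host:port URL found in a log file.
# Usage: master_url_from_log <logfile>
master_url_from_log() {
    grep -o 'spark://[A-Za-z0-9._-]*:[0-9][0-9]*' "$1" | head -n 1
}

# Example (commented out; the log file name pattern is an assumption):
# master_url_from_log "$SPARK_HOME"/logs/*Master*.out
```

The printed value is what the worker start command and the SparkR session need as the master address.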
Let’s open the Master web UI (port 8080) and, once a Spark application is running, the executors page of the application UI (port 4040):
http://127.0.0.1:8080/
http://127.0.0.1:4040/executors/
Install Basic R packages
install.packages(c("rmarkdown", "ggplot2", "magrittr", "whisker", "data.table"))
SPARK_HOME & SparkR configuration
spark_path <- '/home/bigdatacomp01/Downloads/software/spark-3.3.1-bin-hadoop3'
if (nchar(Sys.getenv("SPARK_HOME")) < 1) {
Sys.setenv(SPARK_HOME = spark_path)
}
library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))
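The library() call above loads SparkR from the R/lib folder inside the Spark directory, so it fails if SPARK_HOME points at the wrong place. A quick terminal check can rule that out. This is a minimal sketch with a hypothetical helper name, has_sparkr; it assumes the bundled package sits at R/lib/SparkR and, like any installed R package, contains a DESCRIPTION file.

```shell
#!/bin/sh
# Check that a Spark directory contains the bundled SparkR package.
# Usage: has_sparkr <spark-home>
has_sparkr() {
    [ -f "$1/R/lib/SparkR/DESCRIPTION" ]
}

if has_sparkr "${SPARK_HOME:-}"; then
    echo "SparkR found under $SPARK_HOME/R/lib"
else
    echo "SparkR not found; check that SPARK_HOME points at the extracted Spark directory"
fi
```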
Start SparkSession and Connect through RStudio
# Connect to the Master-slave Cluster (Single Master and One Worker)
sparkR.session(master = "spark://ubuntu:7077", sparkConfig = list(spark.driver.memory = "2g"))
# Read Existing Dataset
df <- as.DataFrame(faithful)
# Displays the first part of the SparkDataFrame
head(df)
##   eruptions waiting
## 1     3.600      79
## 2     1.800      54
## 3     3.333      74
Let’s explore the Apartment Building Evaluation dataset from the Toronto Open Data portal. We will perform file read and write operations and data cleaning steps using RStudio and Spark, which should help clarify the tools and their use cases.
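Before loading the file in SparkR, it can help to peek at the header row from a terminal and confirm that the column names look as expected. The sketch below uses a hypothetical helper, csv_header, and assumes that no header field contains an embedded comma.

```shell
#!/bin/sh
# Print the column names from the first line of a comma-separated file,
# one per line. Assumes no header field contains an embedded comma.
# Usage: csv_header <file>
csv_header() {
    head -n 1 "$1" | tr -d '\r' | tr ',' '\n'
}

# Example (commented out; quote the path because it contains spaces):
# csv_header "/home/bigdatacomp01/Downloads/dataset/Apartment Building Evaluation.csv"
```

Piping the result through wc -l gives the number of columns, which can be compared with the schema Spark infers.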
# Input dataset
filepath <- "/home/bigdatacomp01/Downloads/dataset/Apartment Building Evaluation.csv"
# Read the CSV file with a header row and let Spark infer the schema
appartment_df <- read.df(filepath, "csv", inferSchema = "true", header = "true")
# Print Schema of the dataframe
printSchema(appartment_df)
Output:
root
|-- _id: string (nullable = true)
|-- RSN: integer (nullable = true)
|-- YEAR_REGISTERED: double (nullable = true)
|-- YEAR_EVALUATED: double (nullable = true)
|-- YEAR_BUILT: double (nullable = true)
|-- PROPERTY_TYPE: string (nullable = true)
|-- WARD: integer (nullable = true)
|-- WARDNAME: string (nullable = true)
|-- SITE_ADDRESS: string (nullable = true)
|-- CONFIRMED_STOREYS: integer (nullable = true)
|-- CONFIRMED_UNITS: integer (nullable = true)
|-- EVALUATION_COMPLETED_ON: timestamp (nullable = true)
|-- SCORE: integer (nullable = true)
|-- RESULTS_OF_SCORE: string (nullable = true)
|-- NO_OF_AREAS_EVALUATED: integer (nullable = true)
|-- ENTRANCE_LOBBY: double (nullable = true)
|-- ENTRANCE_DOORS_WINDOWS: double (nullable = true)
|-- SECURITY: double (nullable = true)
|-- STAIRWELLS: double (nullable = true)
|-- LAUNDRY_ROOMS: double (nullable = true)
|-- INTERNAL_GUARDS_HANDRAILS: double (nullable = true)
|-- GARBAGE_CHUTE_ROOMS: double (nullable = true)
|-- GARBAGE_BIN_STORAGE_AREA: double (nullable = true)
|-- ELEVATORS: double (nullable = true)
|-- STORAGE_AREAS_LOCKERS: double (nullable = true)
|-- INTERIOR_WALL_CEILING_FLOOR: double (nullable = true)
|-- INTERIOR_LIGHTING_LEVELS: double (nullable = true)
|-- GRAFFITI: double (nullable = true)
|-- EXTERIOR_CLADDING: double (nullable = true)
|-- EXTERIOR_GROUNDS: double (nullable = true)
|-- EXTERIOR_WALKWAYS: double (nullable = true)
|-- BALCONY_GUARDS: double (nullable = true)
|-- WATER_PEN_EXT_BLDG_ELEMENTS: double (nullable = true)
|-- PARKING_AREA: double (nullable = true)
|-- OTHER_FACILITIES: double (nullable = true)
|-- GRID: string (nullable = true)
|-- LATITUDE: double (nullable = true)
|-- LONGITUDE: double (nullable = true)
|-- X: double (nullable = true)
|-- Y: double (nullable = true)
# Write the data to parquet files with the overwrite option.
write.df(appartment_df, path = "/home/bigdatacomp01/Downloads/dataset/parquet/", source = "parquet", mode = "overwrite")
# Read the Parquet files/dataset
appartment_parquet_df <- read.df(path = "/home/bigdatacomp01/Downloads/dataset/parquet/", source = "parquet")
# Print Schema of the dataframe
printSchema(appartment_parquet_df)
Output:
root
|-- _id: string (nullable = true)
|-- RSN: integer (nullable = true)
|-- YEAR_REGISTERED: double (nullable = true)
|-- YEAR_EVALUATED: double (nullable = true)
|-- YEAR_BUILT: double (nullable = true)
|-- PROPERTY_TYPE: string (nullable = true)
|-- WARD: integer (nullable = true)
|-- WARDNAME: string (nullable = true)
|-- SITE_ADDRESS: string (nullable = true)
|-- CONFIRMED_STOREYS: integer (nullable = true)
|-- CONFIRMED_UNITS: integer (nullable = true)
|-- EVALUATION_COMPLETED_ON: timestamp (nullable = true)
|-- SCORE: integer (nullable = true)
|-- RESULTS_OF_SCORE: string (nullable = true)
|-- NO_OF_AREAS_EVALUATED: integer (nullable = true)
|-- ENTRANCE_LOBBY: double (nullable = true)
|-- ENTRANCE_DOORS_WINDOWS: double (nullable = true)
|-- SECURITY: double (nullable = true)
|-- STAIRWELLS: double (nullable = true)
|-- LAUNDRY_ROOMS: double (nullable = true)
|-- INTERNAL_GUARDS_HANDRAILS: double (nullable = true)
|-- GARBAGE_CHUTE_ROOMS: double (nullable = true)
|-- GARBAGE_BIN_STORAGE_AREA: double (nullable = true)
|-- ELEVATORS: double (nullable = true)
|-- STORAGE_AREAS_LOCKERS: double (nullable = true)
|-- INTERIOR_WALL_CEILING_FLOOR: double (nullable = true)
|-- INTERIOR_LIGHTING_LEVELS: double (nullable = true)
|-- GRAFFITI: double (nullable = true)
|-- EXTERIOR_CLADDING: double (nullable = true)
|-- EXTERIOR_GROUNDS: double (nullable = true)
|-- EXTERIOR_WALKWAYS: double (nullable = true)
|-- BALCONY_GUARDS: double (nullable = true)
|-- WATER_PEN_EXT_BLDG_ELEMENTS: double (nullable = true)
|-- PARKING_AREA: double (nullable = true)
|-- OTHER_FACILITIES: double (nullable = true)
|-- GRID: string (nullable = true)
|-- LATITUDE: double (nullable = true)
|-- LONGITUDE: double (nullable = true)
|-- X: double (nullable = true)
|-- Y: double (nullable = true)
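The parquet write above produces a directory rather than a single file. As a hedged sketch, the hypothetical helper parquet_write_ok below checks from a terminal that the directory looks complete. It assumes the default behaviour in which Spark leaves a _SUCCESS marker and one or more part-*.parquet files; a cluster configured differently may not write the marker.

```shell
#!/bin/sh
# Succeed if a directory looks like a completed Spark parquet write:
# a _SUCCESS marker plus at least one part-*.parquet file.
# Usage: parquet_write_ok <dir>
parquet_write_ok() {
    [ -f "$1/_SUCCESS" ] || return 1
    for part in "$1"/part-*.parquet; do
        [ -f "$part" ] && return 0
    done
    return 1
}

# Check the output directory used in this walkthrough.
if parquet_write_ok "/home/bigdatacomp01/Downloads/dataset/parquet"; then
    echo "parquet output looks complete"
else
    echo "parquet output missing or incomplete"
fi
```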
# Print the Dataset to review it
showDF(appartment_df)
# High-level summary (min, 25%, 75%, max) for the selected columns
summary_df <- select(
  summary(appartment_df, "min", "25%", "75%", "max"),
  "YEAR_REGISTERED", "YEAR_EVALUATED", "YEAR_BUILT")
showDF(summary_df)
Output:
+---------------+--------------+----------+
|YEAR_REGISTERED|YEAR_EVALUATED|YEAR_BUILT|
+---------------+--------------+----------+
| 2017.0| 2017.0| 1805.0|
| 2017.0| 2017.0| 1955.0|
| 2017.0| 2020.0| 1970.0|
| 2022.0| 2021.0| 2021.0|
+---------------+--------------+----------+
# As the summary above shows, these columns have slightly wrong data types.
# Years should be stored as integers rather than doubles.
appartment_processed_df <- withColumn(appartment_df, "YEAR_REGISTERED_processed", cast(appartment_df$YEAR_REGISTERED, "int"))
appartment_processed_df <- withColumn(appartment_processed_df, "YEAR_EVALUATED_processed", cast(appartment_df$YEAR_EVALUATED, "int"))
appartment_processed_df <- withColumn(appartment_processed_df, "YEAR_BUILT_processed", cast(appartment_df$YEAR_BUILT, "int"))
# After correcting the schema let's print the schema details.
printSchema(appartment_processed_df)
Output:
root
|-- _id: string (nullable = true)
|-- RSN: integer (nullable = true)
|-- YEAR_REGISTERED: double (nullable = true)
|-- YEAR_EVALUATED: double (nullable = true)
|-- YEAR_BUILT: double (nullable = true)
|-- PROPERTY_TYPE: string (nullable = true)
|-- WARD: integer (nullable = true)
|-- WARDNAME: string (nullable = true)
|-- SITE_ADDRESS: string (nullable = true)
|-- CONFIRMED_STOREYS: integer (nullable = true)
|-- CONFIRMED_UNITS: integer (nullable = true)
|-- EVALUATION_COMPLETED_ON: timestamp (nullable = true)
|-- SCORE: integer (nullable = true)
|-- RESULTS_OF_SCORE: string (nullable = true)
|-- NO_OF_AREAS_EVALUATED: integer (nullable = true)
|-- ENTRANCE_LOBBY: double (nullable = true)
|-- ENTRANCE_DOORS_WINDOWS: double (nullable = true)
|-- SECURITY: double (nullable = true)
|-- STAIRWELLS: double (nullable = true)
|-- LAUNDRY_ROOMS: double (nullable = true)
|-- INTERNAL_GUARDS_HANDRAILS: double (nullable = true)
|-- GARBAGE_CHUTE_ROOMS: double (nullable = true)
|-- GARBAGE_BIN_STORAGE_AREA: double (nullable = true)
|-- ELEVATORS: double (nullable = true)
|-- STORAGE_AREAS_LOCKERS: double (nullable = true)
|-- INTERIOR_WALL_CEILING_FLOOR: double (nullable = true)
|-- INTERIOR_LIGHTING_LEVELS: double (nullable = true)
|-- GRAFFITI: double (nullable = true)
|-- EXTERIOR_CLADDING: double (nullable = true)
|-- EXTERIOR_GROUNDS: double (nullable = true)
|-- EXTERIOR_WALKWAYS: double (nullable = true)
|-- BALCONY_GUARDS: double (nullable = true)
|-- WATER_PEN_EXT_BLDG_ELEMENTS: double (nullable = true)
|-- PARKING_AREA: double (nullable = true)
|-- OTHER_FACILITIES: double (nullable = true)
|-- GRID: string (nullable = true)
|-- LATITUDE: double (nullable = true)
|-- LONGITUDE: double (nullable = true)
|-- X: double (nullable = true)
|-- Y: double (nullable = true)
|-- YEAR_REGISTERED_processed: integer (nullable = true)
|-- YEAR_EVALUATED_processed: integer (nullable = true)
|-- YEAR_BUILT_processed: integer (nullable = true)
# Let's select the processed columns - YEAR_REGISTERED, YEAR_EVALUATED, YEAR_BUILT
processed_cols_df <- select(appartment_processed_df, appartment_processed_df$YEAR_REGISTERED_processed, appartment_processed_df$YEAR_EVALUATED_processed, appartment_processed_df$YEAR_BUILT_processed)
# Let's check the distinct values of the YEAR_EVALUATED column
YEAR_EVALUATED_col_df <- select(processed_cols_df, processed_cols_df$YEAR_EVALUATED_processed)
showDF(distinct(YEAR_EVALUATED_col_df))
Output:
+------------------------+
|YEAR_EVALUATED_processed|
+------------------------+
| 2018|
| null|
| 2019|
| 2020|
| 2017|
| 2021|
+------------------------+
# Replace existing columns with the processed columns
appartment_processed_df <- withColumn(appartment_processed_df, "YEAR_REGISTERED", appartment_processed_df$YEAR_REGISTERED_processed)
appartment_processed_df <- withColumn(appartment_processed_df, "YEAR_EVALUATED", appartment_processed_df$YEAR_EVALUATED_processed)
appartment_processed_df <- withColumn(appartment_processed_df, "YEAR_BUILT", appartment_processed_df$YEAR_BUILT_processed)
# Drop Columns
appartment_processed_df <- drop(appartment_processed_df, c("YEAR_REGISTERED_processed", "YEAR_EVALUATED_processed", "YEAR_BUILT_processed"))
# After correcting the schema let's print the schema details.
printSchema(appartment_processed_df)
Output:
root
|-- _id: string (nullable = true)
|-- RSN: integer (nullable = true)
|-- YEAR_REGISTERED: integer (nullable = true)
|-- YEAR_EVALUATED: integer (nullable = true)
|-- YEAR_BUILT: integer (nullable = true)
|-- PROPERTY_TYPE: string (nullable = true)
|-- WARD: integer (nullable = true)
|-- WARDNAME: string (nullable = true)
|-- SITE_ADDRESS: string (nullable = true)
|-- CONFIRMED_STOREYS: integer (nullable = true)
|-- CONFIRMED_UNITS: integer (nullable = true)
|-- EVALUATION_COMPLETED_ON: timestamp (nullable = true)
|-- SCORE: integer (nullable = true)
|-- RESULTS_OF_SCORE: string (nullable = true)
|-- NO_OF_AREAS_EVALUATED: integer (nullable = true)
|-- ENTRANCE_LOBBY: double (nullable = true)
|-- ENTRANCE_DOORS_WINDOWS: double (nullable = true)
|-- SECURITY: double (nullable = true)
|-- STAIRWELLS: double (nullable = true)
|-- LAUNDRY_ROOMS: double (nullable = true)
|-- INTERNAL_GUARDS_HANDRAILS: double (nullable = true)
|-- GARBAGE_CHUTE_ROOMS: double (nullable = true)
|-- GARBAGE_BIN_STORAGE_AREA: double (nullable = true)
|-- ELEVATORS: double (nullable = true)
|-- STORAGE_AREAS_LOCKERS: double (nullable = true)
|-- INTERIOR_WALL_CEILING_FLOOR: double (nullable = true)
|-- INTERIOR_LIGHTING_LEVELS: double (nullable = true)
|-- GRAFFITI: double (nullable = true)
|-- EXTERIOR_CLADDING: double (nullable = true)
|-- EXTERIOR_GROUNDS: double (nullable = true)
|-- EXTERIOR_WALKWAYS: double (nullable = true)
|-- BALCONY_GUARDS: double (nullable = true)
|-- WATER_PEN_EXT_BLDG_ELEMENTS: double (nullable = true)
|-- PARKING_AREA: double (nullable = true)
|-- OTHER_FACILITIES: double (nullable = true)
|-- GRID: string (nullable = true)
|-- LATITUDE: double (nullable = true)
|-- LONGITUDE: double (nullable = true)
|-- X: double (nullable = true)
|-- Y: double (nullable = true)
# Let's print the processed columns - YEAR_REGISTERED, YEAR_EVALUATED, YEAR_BUILT
temp_result_df <- select(appartment_processed_df, appartment_processed_df$YEAR_REGISTERED, appartment_processed_df$YEAR_EVALUATED, appartment_processed_df$YEAR_BUILT)
showDF(temp_result_df)
Output:
+---------------+--------------+----------+
|YEAR_REGISTERED|YEAR_EVALUATED|YEAR_BUILT|
+---------------+--------------+----------+
| 2017| null| 1960|
| 2017| null| 1929|
| 2017| null| 1957|
| 2017| null| 1950|
| 2017| null| 1955|
| 2017| null| 1956|
| 2017| null| 1951|
| 2017| null| 1953|
| 2017| null| 1958|
| 2017| null| 1990|
| 2017| null| 1954|
| null| null| 1954|
| 2017| null| 1957|
| 2017| null| 1960|
| 2017| null| 1951|
| 2017| null| 1954|
| 2017| null| 1960|
| 2017| null| 1968|
| 2017| null| 1960|
| 2017| null| 1960|
+---------------+--------------+----------+
only showing top 20 rows
# Prepare the final result DataFrame
result_df <- appartment_processed_df
# Write the final data to parquet files with the overwrite option.
write.df(result_df, path = "/home/bigdatacomp01/Downloads/output/appartment/parquet/", source = "parquet", mode = "overwrite")
# Read the Parquet Result files/dataset
appartment_parquet_df <- read.df(path = "/home/bigdatacomp01/Downloads/output/appartment/parquet/", source = "parquet")
# Print Schema of the dataframe
printSchema(appartment_parquet_df)
# Print the records
showDF(appartment_parquet_df)
Output:
root
|-- _id: string (nullable = true)
|-- RSN: integer (nullable = true)
|-- YEAR_REGISTERED: integer (nullable = true)
|-- YEAR_EVALUATED: integer (nullable = true)
|-- YEAR_BUILT: integer (nullable = true)
|-- PROPERTY_TYPE: string (nullable = true)
|-- WARD: integer (nullable = true)
|-- WARDNAME: string (nullable = true)
|-- SITE_ADDRESS: string (nullable = true)
|-- CONFIRMED_STOREYS: integer (nullable = true)
|-- CONFIRMED_UNITS: integer (nullable = true)
|-- EVALUATION_COMPLETED_ON: timestamp (nullable = true)
|-- SCORE: integer (nullable = true)
|-- RESULTS_OF_SCORE: string (nullable = true)
|-- NO_OF_AREAS_EVALUATED: integer (nullable = true)
|-- ENTRANCE_LOBBY: double (nullable = true)
|-- ENTRANCE_DOORS_WINDOWS: double (nullable = true)
|-- SECURITY: double (nullable = true)
|-- STAIRWELLS: double (nullable = true)
|-- LAUNDRY_ROOMS: double (nullable = true)
|-- INTERNAL_GUARDS_HANDRAILS: double (nullable = true)
|-- GARBAGE_CHUTE_ROOMS: double (nullable = true)
|-- GARBAGE_BIN_STORAGE_AREA: double (nullable = true)
|-- ELEVATORS: double (nullable = true)
|-- STORAGE_AREAS_LOCKERS: double (nullable = true)
|-- INTERIOR_WALL_CEILING_FLOOR: double (nullable = true)
|-- INTERIOR_LIGHTING_LEVELS: double (nullable = true)
|-- GRAFFITI: double (nullable = true)
|-- EXTERIOR_CLADDING: double (nullable = true)
|-- EXTERIOR_GROUNDS: double (nullable = true)
|-- EXTERIOR_WALKWAYS: double (nullable = true)
|-- BALCONY_GUARDS: double (nullable = true)
|-- WATER_PEN_EXT_BLDG_ELEMENTS: double (nullable = true)
|-- PARKING_AREA: double (nullable = true)
|-- OTHER_FACILITIES: double (nullable = true)
|-- GRID: string (nullable = true)
|-- LATITUDE: double (nullable = true)
|-- LONGITUDE: double (nullable = true)
|-- X: double (nullable = true)
|-- Y: double (nullable = true)
Congratulations! You have successfully configured RStudio with SparkR, and it is now ready to use.