Spark + RStudio

SparkR and RStudio Integration

Akash Patel
10 min read · Dec 3, 2022

In this article, we cover how to process a dataset using RStudio and the SparkR API on a Spark master-worker cluster.

First of all, let’s lay the groundwork: why do we need SparkR and RStudio integration?
When we work on small to medium-size datasets, a single machine running R can handle the workload. But if we work at gigabyte or petabyte scale, we need a solution like SparkR that can handle the workload in a distributed manner.

In order to set up the environment, we need to install the following software and bring up a Spark cluster (one master, one worker):

  • Install Java
  • Install Python
  • Install R
  • Install SparkR
  • Install RStudio

First, let’s update the package index on the Ubuntu machine with the following Bash command.

sudo apt-get update

Install Java & validation

sudo apt install default-jdk
sudo update-alternatives --config java
java --version

Install Python & validation

sudo apt install python3.8
python3 --version

Install R & validation

sudo apt install r-base r-base-dev
R --version

Install RStudio

wget https://download1.rstudio.org/desktop/bionic/amd64/rstudio-2022.02.1-461-amd64.deb
sudo apt install -f ./rstudio-2022.02.1-461-amd64.deb

Install Spark

Download the prebuilt Spark 3.3.1 distribution directly (the mirror-picker page at apache.org/dyn/closer.lua returns an HTML page, not the tarball), then extract it:

wget https://archive.apache.org/dist/spark/spark-3.3.1/spark-3.3.1-bin-hadoop3.tgz
tar -xzf spark-3.3.1-bin-hadoop3.tgz

Configure environment variables in ~/.bashrc

vi ~/.bashrc

Insert the following lines to configure SPARK_HOME and a Python alias:

export SPARK_HOME=/home/bigdatacomp01/Downloads/software/spark-3.3.1-bin-hadoop3
export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin
alias python='python3'

Then reload the file:

source ~/.bashrc

Validate the Spark installation, then start the cluster. Replace ubuntu with your machine’s hostname; note that start-slave.sh is the deprecated name for start-worker.sh in Spark 3.x.

spark-submit --version
start-master.sh
start-worker.sh spark://ubuntu:7077

Let’s open the web UIs of the master node and the running application:
http://127.0.0.1:8080/ (master UI, which also lists the worker)
http://127.0.0.1:4040/executors/ (application UI, Executors tab)

Spark UI for Master and Worker nodes

Install basic R packages from the R console in RStudio

install.packages(c("rmarkdown", "ggplot2", "magrittr", "whisker", "data.table"))

SPARK_HOME & SparkR configuration

spark_path <- '/home/bigdatacomp01/Downloads/software/spark-3.3.1-bin-hadoop3'
if (nchar(Sys.getenv("SPARK_HOME")) < 1) {
  Sys.setenv(SPARK_HOME = spark_path)
}

library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))

Start a SparkSession and connect through RStudio

# Connect to the Master-slave Cluster (Single Master and One Worker)
sparkR.session(master = "spark://ubuntu:7077", sparkConfig = list(spark.driver.memory = "2g"))
The Spark session has started, and RStudio is now connected to the Spark master node.
# Read Existing Dataset
df <- as.DataFrame(faithful)

# Displays the first part of the SparkDataFrame
head(df)
## eruptions waiting
##1 3.600 79
##2 1.800 54
##3 3.333 74
The first Spark task (showDF) executes successfully on the worker node; the Spark UI shows details such as memory usage and logs.
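Beyond head(), the same session can run distributed aggregations. A minimal sketch, assuming the sparkR.session() and df from above are still active (eruption_bucket is a made-up alias for this illustration):

```r
# Group eruptions by their rounded length and average the waiting time
grp <- alias(round(df$eruptions), "eruption_bucket")
agg_df <- summarize(
  groupBy(df, grp),
  avg_waiting = avg(df$waiting))
head(agg_df)
```

The groupBy and summarize run on the worker; only the small aggregated result is pulled back to the driver by head().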

Next, let’s explore the Apartment Building Evaluation dataset from the Toronto Open Data portal. We will perform read and write file operations and basic data-cleaning steps using RStudio and Spark, which illustrates the tools and their typical use cases.

# Input Dataset
filepath = "/home/bigdatacomp01/Downloads/dataset/Apartment Building Evaluation.csv"

# Read the CSV file with a header row and schema inference
appartment_df <- read.df(filepath, "csv", inferSchema = "true", header = "true")
# Print Schema of the dataframe
printSchema(appartment_df)

Output:

root
|-- _id: string (nullable = true)
|-- RSN: integer (nullable = true)
|-- YEAR_REGISTERED: double (nullable = true)
|-- YEAR_EVALUATED: double (nullable = true)
|-- YEAR_BUILT: double (nullable = true)
|-- PROPERTY_TYPE: string (nullable = true)
|-- WARD: integer (nullable = true)
|-- WARDNAME: string (nullable = true)
|-- SITE_ADDRESS: string (nullable = true)
|-- CONFIRMED_STOREYS: integer (nullable = true)
|-- CONFIRMED_UNITS: integer (nullable = true)
|-- EVALUATION_COMPLETED_ON: timestamp (nullable = true)
|-- SCORE: integer (nullable = true)
|-- RESULTS_OF_SCORE: string (nullable = true)
|-- NO_OF_AREAS_EVALUATED: integer (nullable = true)
|-- ENTRANCE_LOBBY: double (nullable = true)
|-- ENTRANCE_DOORS_WINDOWS: double (nullable = true)
|-- SECURITY: double (nullable = true)
|-- STAIRWELLS: double (nullable = true)
|-- LAUNDRY_ROOMS: double (nullable = true)
|-- INTERNAL_GUARDS_HANDRAILS: double (nullable = true)
|-- GARBAGE_CHUTE_ROOMS: double (nullable = true)
|-- GARBAGE_BIN_STORAGE_AREA: double (nullable = true)
|-- ELEVATORS: double (nullable = true)
|-- STORAGE_AREAS_LOCKERS: double (nullable = true)
|-- INTERIOR_WALL_CEILING_FLOOR: double (nullable = true)
|-- INTERIOR_LIGHTING_LEVELS: double (nullable = true)
|-- GRAFFITI: double (nullable = true)
|-- EXTERIOR_CLADDING: double (nullable = true)
|-- EXTERIOR_GROUNDS: double (nullable = true)
|-- EXTERIOR_WALKWAYS: double (nullable = true)
|-- BALCONY_GUARDS: double (nullable = true)
|-- WATER_PEN_EXT_BLDG_ELEMENTS: double (nullable = true)
|-- PARKING_AREA: double (nullable = true)
|-- OTHER_FACILITIES: double (nullable = true)
|-- GRID: string (nullable = true)
|-- LATITUDE: double (nullable = true)
|-- LONGITUDE: double (nullable = true)
|-- X: double (nullable = true)
|-- Y: double (nullable = true)
# Write the data into Parquet files with overwrite mode.
write.df(appartment_df, path = "/home/bigdatacomp01/Downloads/dataset/parquet/", source = "parquet", mode = "overwrite")
After converting the dataset from CSV to Parquet, the output directory contains the Parquet part files.
# Read the Parquet files/dataset 
appartment_parquet_df <- read.df(path = "/home/bigdatacomp01/Downloads/dataset/parquet/", source = "parquet")
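A quick sanity check on the round trip is to compare row counts between the CSV and Parquet copies; a small sketch assuming both data frames from above exist:

```r
# The CSV and Parquet row counts should match after the conversion
n_csv <- count(appartment_df)
n_parquet <- count(appartment_parquet_df)
stopifnot(n_csv == n_parquet)
```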

# Print Schema of the dataframe
printSchema(appartment_parquet_df)

Output: the same schema as the one printed after reading the CSV above.
# Print the Dataset to review it
showDF(appartment_df)
The contents of the appartment_df dataframe are printed to the console.
# High-level summary for the selected columns
summary_df <- select(
  summary(appartment_df, "min", "25%", "75%", "max"),
  "YEAR_REGISTERED", "YEAR_EVALUATED", "YEAR_BUILT")

showDF(summary_df)

Output:
+---------------+--------------+----------+
|YEAR_REGISTERED|YEAR_EVALUATED|YEAR_BUILT|
+---------------+--------------+----------+
| 2017.0| 2017.0| 1805.0|
| 2017.0| 2017.0| 1955.0|
| 2017.0| 2020.0| 1970.0|
| 2022.0| 2021.0| 2021.0|
+---------------+--------------+----------+
# As shown above, the listed columns have the wrong data type:
# they should be integers instead of doubles.
appartment_processed_df <- withColumn(appartment_df, "YEAR_REGISTERED_processed", cast(appartment_df$YEAR_REGISTERED, "int"))
appartment_processed_df <- withColumn(appartment_processed_df, "YEAR_EVALUATED_processed", cast(appartment_df$YEAR_EVALUATED, "int"))
appartment_processed_df <- withColumn(appartment_processed_df, "YEAR_BUILT_processed", cast(appartment_df$YEAR_BUILT, "int"))

# After correcting the schema let's print the schema details.
printSchema(appartment_processed_df)

Output :
root
|-- _id: string (nullable = true)
... (remaining columns unchanged from the schema above) ...
|-- YEAR_REGISTERED_processed: integer (nullable = true)
|-- YEAR_EVALUATED_processed: integer (nullable = true)
|-- YEAR_BUILT_processed: integer (nullable = true)
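The three withColumn/cast calls above can also be collapsed into a loop that casts the year columns in place (a sketch, assuming appartment_df has been read as shown); casting in place would make the later replace-and-drop steps unnecessary:

```r
year_cols <- c("YEAR_REGISTERED", "YEAR_EVALUATED", "YEAR_BUILT")
tmp_df <- appartment_df
for (col in year_cols) {
  # withColumn with an existing column name replaces that column in place
  tmp_df <- withColumn(tmp_df, col, cast(tmp_df[[col]], "int"))
}
printSchema(select(tmp_df, year_cols))
```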
# Let's select the processed columns - YEAR_REGISTERED, YEAR_EVALUATED, YEAR_BUILT
processed_cols_df <- select(appartment_processed_df, appartment_processed_df$YEAR_REGISTERED_processed, appartment_processed_df$YEAR_EVALUATED_processed, appartment_processed_df$YEAR_BUILT_processed)

# Let's check the distinct values of the YEAR_EVALUATED_processed column
YEAR_EVALUATED_col_df <- select(processed_cols_df, processed_cols_df$YEAR_EVALUATED_processed)
showDF(distinct(YEAR_EVALUATED_col_df))

Output :
+------------------------+
|YEAR_EVALUATED_processed|
+------------------------+
| 2018|
| null|
| 2019|
| 2020|
| 2017|
| 2021|
+------------------------+
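Since the distinct values include null, it is worth checking how many rows are affected before deciding how to handle them; a small sketch assuming the data frames from above:

```r
# Count rows whose evaluation year is missing
null_rows <- count(filter(appartment_processed_df,
                          isNull(appartment_processed_df$YEAR_EVALUATED_processed)))
total_rows <- count(appartment_processed_df)
cat(null_rows, "of", total_rows, "rows have no evaluation year\n")
```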
# Replace existing columns with the processed columns 
appartment_processed_df <- withColumn(appartment_processed_df, "YEAR_REGISTERED", appartment_processed_df$YEAR_REGISTERED_processed)
appartment_processed_df <- withColumn(appartment_processed_df, "YEAR_EVALUATED", appartment_processed_df$YEAR_EVALUATED_processed)
appartment_processed_df <- withColumn(appartment_processed_df, "YEAR_BUILT", appartment_processed_df$YEAR_BUILT_processed)
# Drop Columns 
appartment_processed_df <- drop(appartment_processed_df, c("YEAR_REGISTERED_processed", "YEAR_EVALUATED_processed", "YEAR_BUILT_processed"))
# After correcting the schema let's print the schema details.
printSchema(appartment_processed_df)

Output:

root
|-- _id: string (nullable = true)
|-- RSN: integer (nullable = true)
|-- YEAR_REGISTERED: integer (nullable = true)
|-- YEAR_EVALUATED: integer (nullable = true)
|-- YEAR_BUILT: integer (nullable = true)
... (remaining columns unchanged from the schema above) ...
# Let's print the processed columns - YEAR_REGISTERED, YEAR_EVALUATED, YEAR_BUILT
temp_result_df <- select(appartment_processed_df, appartment_processed_df$YEAR_REGISTERED, appartment_processed_df$YEAR_EVALUATED, appartment_processed_df$YEAR_BUILT)
showDF(temp_result_df)

Output:

+---------------+--------------+----------+
|YEAR_REGISTERED|YEAR_EVALUATED|YEAR_BUILT|
+---------------+--------------+----------+
| 2017| null| 1960|
| 2017| null| 1929|
| 2017| null| 1957|
| 2017| null| 1950|
| 2017| null| 1955|
| 2017| null| 1956|
| 2017| null| 1951|
| 2017| null| 1953|
| 2017| null| 1958|
| 2017| null| 1990|
| 2017| null| 1954|
| null| null| 1954|
| 2017| null| 1957|
| 2017| null| 1960|
| 2017| null| 1951|
| 2017| null| 1954|
| 2017| null| 1960|
| 2017| null| 1968|
| 2017| null| 1960|
| 2017| null| 1960|
+---------------+--------------+----------+
only showing top 20 rows
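Depending on the use case, the rows with missing years could be dropped or the nulls filled before writing the final result. A hedged sketch of both options (the sentinel value 0 is purely illustrative):

```r
# Option 1: keep only rows with a known YEAR_BUILT
clean_df <- filter(appartment_processed_df,
                   isNotNull(appartment_processed_df$YEAR_BUILT))

# Option 2: fill missing evaluation years with a sentinel value
filled_df <- fillna(appartment_processed_df, list(YEAR_EVALUATED = 0))
```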
# Prepare the final result dataframe
result_df <- appartment_processed_df

# Write the final data into Parquet files with overwrite mode.
write.df(result_df, path = "/home/bigdatacomp01/Downloads/output/appartment/parquet/", source = "parquet", mode = "overwrite")
The final results have been stored as Parquet files.
# Read the Parquet Result files/dataset 
appartment_parquet_df <- read.df(path = "/home/bigdatacomp01/Downloads/output/appartment/parquet/", source = "parquet")

# Print Schema of the dataframe
printSchema(appartment_parquet_df)

# Print the records
showDF(appartment_parquet_df)

Output: identical to the final schema shown above.
We can see that the processed columns are read back with the proper data types and formats.
These are the operations a Spark application performs on a master-worker Spark cluster, driven from RStudio.

Congratulations! You have successfully configured RStudio with SparkR. It’s ready to use!
