Flatten JSON data with Apache Spark Java API

Akash Patel
3 min read · Oct 10, 2019


Hi Data Engineers,
A few weeks ago I was exploring Machine Learning concepts. There are plenty of algorithms ready to solve your business cases, but they require datasets they can parse into their parameters.
There are two common approaches: normalizing the data, and flattening it out.
This blog focuses on JSON datasets. What if your data is available in JSON format and you need to prepare it as a tabular dataset? (Semi-Structured Dataset → Structured Dataset)
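Before diving into Spark, here is a toy illustration of the idea in plain Java (no Spark; the `flatten` helper and class name are mine, not part of any library): nested keys are promoted to top-level columns by joining parent and child names with an underscore, which is exactly the naming scheme the Spark function below will use.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class FlattenIdea {
    // Hypothetical helper: turns {"name": "Akash", "watches": {"name": "Apple"}}
    // into the flat record {"name": "Akash", "watches_name": "Apple"}
    static Map<String, Object> flatten(String prefix, Map<String, Object> nested) {
        Map<String, Object> flat = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : nested.entrySet()) {
            // Child keys are prefixed with the parent key, joined by "_"
            String key = prefix.isEmpty() ? e.getKey() : prefix + "_" + e.getKey();
            if (e.getValue() instanceof Map) {
                @SuppressWarnings("unchecked")
                Map<String, Object> child = (Map<String, Object>) e.getValue();
                flat.putAll(flatten(key, child));
            } else {
                flat.put(key, e.getValue());
            }
        }
        return flat;
    }

    public static void main(String[] args) {
        Map<String, Object> watches = new LinkedHashMap<>();
        watches.put("name", "Apple");
        Map<String, Object> record = new LinkedHashMap<>();
        record.put("name", "Akash");
        record.put("watches", watches);
        System.out.println(flatten("", record));
        // prints {name=Akash, watches_name=Apple}
    }
}
```

Spark does the heavy lifting of this renaming for us at the DataFrame level, plus the extra work of exploding arrays into rows, which plain maps cannot express.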

Create the Spark application config >> create or connect a Spark Session

class JavaSparkSessionSingleton {
    private static transient SparkSession instance = null;

    // Lazily create a single SparkSession for the whole application
    public static SparkSession getInstance(SparkConf sparkConf) {
        if (instance == null) {
            instance = SparkSession
                    .builder()
                    .config(sparkConf)
                    .getOrCreate();
        }
        return instance;
    }
}
SparkConf sparkConf = new SparkConf().setAppName("SPARK-JSON-Transformation");

JavaSparkContext jsc = new JavaSparkContext(sparkConf);

// getInstance() already applies the config and calls getOrCreate()
SparkSession spark = JavaSparkSessionSingleton.getInstance(sparkConf);

Create Sample JSON Dataset

Schema of our sample data, as printed by printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)
 |-- phones: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- models: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |-- watches: struct (nullable = true)
 |    |-- models: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- name: string (nullable = true)

List<String> jsonData = Arrays.asList("{ \"name\": \"Akash\", \"age\": 26, \"watches\": { \"name\": \"Apple\", \"models\": [ \"Apple Watch Series 5\", \"Apple Watch Nike\" ] }, \"phones\": [ { \"name\": \"Apple\", \"models\": [ \"iphone X\", \"iphone XR\", \"iphone XS\", \"iphone 11\", \"iphone 11 Pro\" ] }, { \"name\": \"Samsung\", \"models\": [ \"Galaxy Note10\", \"Galaxy Note10+\", \"Galaxy S10e\", \"Galaxy S10\", \"Galaxy S10+\" ] }, { \"name\": \"Google\", \"models\": [ \"Pixel 3\", \"Pixel 3a\" ] } ] }");
Dataset<String> myTechDataset = spark.createDataset(jsonData, Encoders.STRING());
Dataset<Row> accessoriesDataset = spark.read().json(myTechDataset);
accessoriesDataset.show();

Magic function >> flattens JSON arrays and nested JSON objects

The flattenJSONdf function accepts a Dataset&lt;Row&gt; and returns a flattened, structured Dataset. It scans the schema for the first array or struct column, rewrites the Dataset to remove that level of nesting, and recurses until only primitive columns remain.

private static Dataset<Row> flattenJSONdf(Dataset<Row> ds) {

    StructField[] fields = ds.schema().fields();

    List<String> fieldNames = new ArrayList<>();
    for (StructField s : fields) {
        fieldNames.add(s.name());
    }

    for (int i = 0; i < fields.length; i++) {

        StructField field = fields[i];
        DataType fieldType = field.dataType();
        String fieldName = field.name();

        if (fieldType instanceof ArrayType) {
            // Keep every other column and explode the array column,
            // producing one row per array element
            List<String> fieldNamesExcludingArray = new ArrayList<>();
            for (String name : fieldNames) {
                if (!fieldName.equals(name))
                    fieldNamesExcludingArray.add(name);
            }

            List<String> fieldNamesAndExplode = new ArrayList<>(fieldNamesExcludingArray);
            // explode_outer keeps rows whose array is null or empty
            fieldNamesAndExplode.add(String.format("explode_outer(%s) as %s", fieldName, fieldName));

            Dataset<Row> explodedDs = ds.selectExpr(fieldNamesAndExplode.toArray(new String[0]));

            return flattenJSONdf(explodedDs);
        }
        else if (fieldType instanceof StructType) {
            // Promote each child of the struct to a top-level column,
            // renaming "parent.child" to "parent_child"
            String[] structChildNames = ((StructType) fieldType).fieldNames();

            List<String> childFieldNames = new ArrayList<>();
            for (String childName : structChildNames) {
                childFieldNames.add(fieldName + "." + childName);
            }

            List<String> newFieldNames = new ArrayList<>();
            for (String name : fieldNames) {
                if (!fieldName.equals(name))
                    newFieldNames.add(name);
            }
            newFieldNames.addAll(childFieldNames);

            List<Column> renamedStructCols = new ArrayList<>();
            for (String name : newFieldNames) {
                renamedStructCols.add(new Column(name).as(name.replace(".", "_")));
            }

            Seq<Column> renamedStructColsSeq =
                    JavaConverters.collectionAsScalaIterableConverter(renamedStructCols).asScala().toSeq();

            Dataset<Row> flattenedStructDs = ds.select(renamedStructColsSeq);

            return flattenJSONdf(flattenedStructDs);
        }
        // Primitive column: nothing to flatten, move on to the next field
    }
    return ds;
}
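The explode step above also has a simple plain-Java analogue (no Spark; the `explodeOuter` helper and class name are mine): for each row, emit one copy per element of the array column, and a single row carrying null when the array is empty or missing, mirroring Spark's explode_outer semantics.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ExplodeIdea {
    // One output row per element of the array column `col`;
    // a null/empty array yields a single row with null (like explode_outer)
    static List<Map<String, Object>> explodeOuter(List<Map<String, Object>> rows, String col) {
        List<Map<String, Object>> out = new ArrayList<>();
        for (Map<String, Object> row : rows) {
            Object value = row.get(col);
            List<?> elements = (value instanceof List) ? (List<?>) value : null;
            if (elements == null || elements.isEmpty()) {
                Map<String, Object> copy = new LinkedHashMap<>(row);
                copy.put(col, null);
                out.add(copy);
            } else {
                for (Object element : elements) {
                    Map<String, Object> copy = new LinkedHashMap<>(row);
                    copy.put(col, element);
                    out.add(copy);
                }
            }
        }
        return out;
    }
}
```

This is why flattening an array multiplies the row count: a record with three phone brands becomes three rows, each still carrying the non-array columns unchanged.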

Call this magic function with our dataset

Dataset<Row> flattenedDs = flattenJSONdf(accessoriesDataset);

flattenedDs.show();

spark.stop();

Final Result…

https://github.com/Akashpatel579/Flatten-JSON-Data.git
