Flatten JSON data with Apache Spark Java API
Hi Data Engineers,
A few weeks ago I was exploring machine learning concepts. There are plenty of algorithms ready to solve your business cases, but they require datasets they can parse into their parameters.
There are two common ways to prepare such data: normalizing it or flattening it out.
This blog focuses on JSON datasets. What if your data arrives in JSON format and we need to prepare it as a tabular dataset? (Semi-structured dataset → structured dataset)
Create Spark application config >> create or connect to a Spark Session
class JavaSparkSessionSingleton {
    private static transient SparkSession instance = null;

    public static SparkSession getInstance(SparkConf sparkConf) {
        if (instance == null) {
            instance = SparkSession
                    .builder()
                    .config(sparkConf)
                    .getOrCreate();
        }
        return instance;
    }
}

SparkConf sparkConf = new SparkConf().setAppName("SPARK-JSON-Transformation");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
SparkSession spark = JavaSparkSessionSingleton.getInstance(sparkConf);
Create Sample JSON Dataset
Schema of our sample data
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)
 |-- phones: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- models: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |-- watches: struct (nullable = true)
 |    |-- models: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- name: string (nullable = true)
List<String> jsonData = Arrays.asList("{ \"name\": \"Akash\", \"age\": 26, \"watches\": { \"name\": \"Apple\", \"models\": [ \"Apple Watch Series 5\", \"Apple Watch Nike\" ] }, \"phones\": [ { \"name\": \"Apple\", \"models\": [ \"iphone X\", \"iphone XR\", \"iphone XS\", \"iphone 11\", \"iphone 11 Pro\" ] }, { \"name\": \"Samsung\", \"models\": [ \"Galaxy Note10\", \"Galaxy Note10+\", \"Galaxy S10e\", \"Galaxy S10\", \"Galaxy S10+\" ] }, { \"name\": \"Google\", \"models\": [ \"Pixel 3\", \"Pixel 3a\" ] } ] }");
Dataset<String> myTechDataset = spark.createDataset(jsonData, Encoders.STRING());
Dataset<Row> accessoriesDataset = spark.read().json(myTechDataset);
accessoriesDataset.show();
Magic function >> Flattens JSON arrays and nested JSON objects
This flattenJSONdf function accepts a Dataset&lt;Row&gt; and returns a flattened, structured Dataset&lt;Row&gt;. It recursively explodes array columns and promotes struct fields to top-level columns until no nested column remains.
private static Dataset<Row> flattenJSONdf(Dataset<Row> ds) {
    StructField[] fields = ds.schema().fields();
    List<String> fieldsNames = new ArrayList<>();
    for (StructField s : fields) {
        fieldsNames.add(s.name());
    }
    for (int i = 0; i < fields.length; i++) {
        StructField field = fields[i];
        DataType fieldType = field.dataType();
        String fieldName = field.name();
        if (fieldType instanceof ArrayType) {
            // Keep every other column as-is and explode the array column,
            // producing one row per array element.
            List<String> fieldNamesExcludingArray = new ArrayList<>();
            for (String fieldNameIndex : fieldsNames) {
                if (!fieldName.equals(fieldNameIndex))
                    fieldNamesExcludingArray.add(fieldNameIndex);
            }
            List<String> fieldNamesAndExplode = new ArrayList<>(fieldNamesExcludingArray);
            // explode_outer keeps rows whose array is null or empty.
            String s = String.format("explode_outer(%s) as %s", fieldName, fieldName);
            fieldNamesAndExplode.add(s);
            Dataset<Row> explodedDs = ds.selectExpr(fieldNamesAndExplode.toArray(new String[0]));
            return flattenJSONdf(explodedDs);
        } else if (fieldType instanceof StructType) {
            // Promote each child of the struct to a top-level column,
            // renaming "parent.child" to "parent_child".
            String[] childFieldNamesStruct = ((StructType) fieldType).fieldNames();
            List<String> childFieldNames = new ArrayList<>();
            for (String childName : childFieldNamesStruct) {
                childFieldNames.add(fieldName + "." + childName);
            }
            List<String> newFieldNames = new ArrayList<>();
            for (String fieldNameIndex : fieldsNames) {
                if (!fieldName.equals(fieldNameIndex))
                    newFieldNames.add(fieldNameIndex);
            }
            newFieldNames.addAll(childFieldNames);
            List<Column> renamedStructCols = new ArrayList<>();
            for (String newFieldName : newFieldNames) {
                renamedStructCols.add(new Column(newFieldName).as(newFieldName.replace(".", "_")));
            }
            Seq<Column> renamedStructColsSeq = JavaConverters
                    .collectionAsScalaIterableConverter(renamedStructCols).asScala().toSeq();
            Dataset<Row> dsStruct = ds.select(renamedStructColsSeq);
            return flattenJSONdf(dsStruct);
        }
    }
    // No array or struct columns left -- the Dataset is fully flattened.
    return ds;
}
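The struct branch above re-selects each struct child under an underscore-separated alias. As a standalone illustration of just that renaming rule (plain Java, no Spark required; `FlattenNames` and `flattenName` are hypothetical names for this sketch):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class FlattenNames {
    // Mirrors the renaming in flattenJSONdf's struct branch:
    // a nested path like "watches.name" becomes the flat column "watches_name".
    static String flattenName(String dottedPath) {
        return dottedPath.replace(".", "_");
    }

    public static void main(String[] args) {
        List<String> nested = Arrays.asList("watches.name", "watches.models", "phones.name");
        List<String> flat = nested.stream()
                .map(FlattenNames::flattenName)
                .collect(Collectors.toList());
        System.out.println(flat); // [watches_name, watches_models, phones_name]
    }
}
```

Underscores are a safe choice here because dots in column names would otherwise be interpreted by Spark as struct-field accessors.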
Call this magic function with our dataset
Dataset<Row> flattened_ds = flattenJSONdf(accessoriesDataset);
flattened_ds.show();
spark.stop();
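For the sample record above, every array gets exploded and every struct gets promoted, so the flattened schema should contain only scalar columns, roughly like this (column order may differ depending on the order in which the recursion processes the fields):

```
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)
 |-- phones_models: string (nullable = true)
 |-- phones_name: string (nullable = true)
 |-- watches_models: string (nullable = true)
 |-- watches_name: string (nullable = true)
```

Each combination of phone model and watch model becomes its own row, since exploding two independent arrays produces their cross product.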
Final Result…