Spark Chained Transformations

Chanukya Pekala
3 min read · Jan 8, 2024

In typical data engineering tasks, we follow a procedural style of code. If we take a sample dataset, for example, we:

  1. Read and parse the file
  2. Perform some aggregations on top of the file
  3. Persist the aggregation to a table or print the results to the console

In Spark, we can do this in multiple ways, but let's look at a couple of organized approaches.

Approach #1: Chained Transformations

a simple aggregation and filter transformation

In the class above, we can see two simple functions, each handling a particular task: an aggregation and a filter.
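The original functions are shown as a screenshot, but a minimal sketch of what they might look like is below. The column names popularity and count are assumptions about the Spotify schema, not taken from the original code.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Aggregation: count the number of albums per popularity score
// ("popularity" is an assumed column in the Spotify dataset)
def numAlbumPopularity(df: DataFrame): DataFrame =
  df.groupBy("popularity").count()

// Filter: keep only the popularity groups with more than 6,000 albums
def greaterThan6kAlbums(df: DataFrame): DataFrame =
  df.filter(col("count") > 6000)

Both functions take a DataFrame and return a DataFrame, which is exactly the shape transform expects.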

entry point class with chained transformations example

If you notice the main class, ChainedTransformationMain, we read an open-source Spotify dataset to load the data into a DataFrame, and on top of it we apply transformations such as the aggregation and the filter. It is much easier now to chain the set of transformations together, and the code becomes easier to understand and maintain.
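The loading step is also shown only as a screenshot in the original post; a minimal sketch, assuming a CSV file at a hypothetical path and the ambient spark session (as provided in a Databricks notebook), could look like this before the chained calls below:

// Load the open-source Spotify albums dataset (path and options are assumptions)
val loadSpotifyAlbum: DataFrame = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("/data/spotify_albums.csv")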

val result = loadSpotifyAlbum
.transform(numAlbumPopularity)
.transform(greaterThan6kAlbums)

Here we have used the transform function. It is a method available on Spark DataFrames (Datasets) that applies a user-defined function to the whole DataFrame and returns its result, which makes it straightforward to chain custom transformations.

def transform[U](t: Dataset[T] => Dataset[U]): Dataset[U]

The argument is a function that accepts the DataFrame as its input, performs some transformations on it, and returns another Dataset.

When transform is called, it applies that function to the whole DataFrame and returns the resulting DataFrame, so further calls can be chained one after another.
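In other words, df.transform(f) is simply f(df); the benefit of chaining is that the pipeline reads left to right instead of inside out. A small illustration, using the sketched functions from above:

// Equivalent results: nested function calls vs. chained transform calls
val nested = greaterThan6kAlbums(numAlbumPopularity(loadSpotifyAlbum))
val chained = loadSpotifyAlbum
  .transform(numAlbumPopularity)
  .transform(greaterThan6kAlbums)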

I would like to highlight that chained transformations are a good practice for Spark development. They help with:

  1. Readability: breaking the processing into smaller transformations that are chained together makes the code easier to follow.
  2. Performance: this allows operations to be optimized and executed in a single pass, which can reduce the time and resources needed to execute the processing steps.
  3. Maintainability: if each transformation is a separate method, it can be reused in other parts of the code, which makes the code easier to maintain and can result in fewer bugs.

Let's see another approach that takes this even further!

Approach #2: Implicit Classes

same transformations — aggregate and filter, but inside implicit class

An implicit class is syntactic sugar in Scala that allows implicit conversions from the original class to a new, enriched class. In this case, ClassDataFrame enriches instances of the DataFrame class with the two additional methods, numAlbumPopularity and greaterThan6kAlbums.
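The implicit class itself is shown as an image in the original post; a minimal sketch, reusing the assumed column names from the earlier sketch, might look like this:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

object DataFrameExtensions {

  // Enrich DataFrame so the two transformations read like built-in methods
  implicit class ClassDataFrame(df: DataFrame) {

    def numAlbumPopularity: DataFrame =
      df.groupBy("popularity").count()

    def greaterThan6kAlbums: DataFrame =
      df.filter(col("count") > 6000)
  }
}

With this in scope (for example via import DataFrameExtensions._), any DataFrame can call the two methods directly.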

chained transformations, but with a much cleaner approach

If you look at the readability of the result variable, it clearly reads as: load the Spotify albums, count the albums per popularity, and find the groups greater than 6k. It is more like reading a simple English sentence, provided the variables are appropriately named.

val result = loadSpotifyAlbum.numAlbumPopularity.greaterThan6kAlbums 

Execution: I ran this on a Databricks notebook, so there was no need for an explicit Spark session as a separate variable. The code was executed something like this:

ImplicitClassMain.main(Array("Hey Implicit Class!"))
ChainedTransformationMain.main(Array("Hey Chained Transform!"))

Conclusion

I would like to highlight that we have multiple approaches for handling Spark transformations: transform and implicit classes. I have been using transform for a while, but the other approach is something I found extremely interesting and useful because of its readability.

Code

It is maintained in our datatribe community GitHub account. The link is here.

References

transform, implicit class
