close

How to find median and quantiles using Spark

Hello Guys, How are you all? Hope You all Are Fine. Today We Are Going To learn about How to find median and quantiles using Spark in Python. So Here I am Explain to you all the possible Methods here.

Without wasting your time, Let’s start This Article.

Table of Contents

How to find median and quantiles using Spark?

  1. How to find median and quantiles using Spark?

    Adding a solution if you want an RDD method only and dont want to move to DF. This snippet can get you a percentile for an RDD of double.

  2. find median and quantiles using Spark

    Adding a solution if you want an RDD method only and dont want to move to DF. This snippet can get you a percentile for an RDD of double.

Method 1

Here is the method I used using window functions (with pyspark 2.2.0).

from pyspark.sql import DataFrame

class median():
    """ Create median class with over method to pass partition """
    def __init__(self, df, col, name):
        assert col
        self.column=col
        self.df = df
        self.name = name

    def over(self, window):
        from pyspark.sql.functions import percent_rank, pow, first

        first_window = window.orderBy(self.column)                                  # first, order by column we want to compute the median for
        df = self.df.withColumn("percent_rank", percent_rank().over(first_window))  # add percent_rank column, percent_rank = 0.5 coressponds to median
        second_window = window.orderBy(pow(df.percent_rank-0.5, 2))                 # order by (percent_rank - 0.5)^2 ascending
        return df.withColumn(self.name, first(self.column).over(second_window))     # the first row of the window corresponds to median

def addMedian(self, col, median_name):
    """ Method to be added to spark native DataFrame class """
    return median(self, col, median_name)

# Add method to DataFrame class
DataFrame.addMedian = addMedian

Then call the addMedian method to calculate the median of col2:

from pyspark.sql import Window

median_window = Window.partitionBy("col1")
df = df.addMedian("col2", "median").over(median_window)

Finally you can group by if needed.

df.groupby("col1", "median")

Method 2

Adding a solution if you want an RDD method only and dont want to move to DF. This snippet can get you a percentile for an RDD of double.

If you input percentile as 50, you should obtain your required median. Let me know if there are any corner cases not accounted for.

/**
  * Gets the nth percentile entry for an RDD of doubles
  *
  * @param inputScore : Input scores consisting of a RDD of doubles
  * @param percentile : The percentile cutoff required (between 0 to 100), e.g 90%ile of [1,4,5,9,19,23,44] = ~23.
  *                     It prefers the higher value when the desired quantile lies between two data points
  * @return : The number best representing the percentile in the Rdd of double
  */    
  def getRddPercentile(inputScore: RDD[Double], percentile: Double): Double = {
    val numEntries = inputScore.count().toDouble
    val retrievedEntry = (percentile * numEntries / 100.0 ).min(numEntries).max(0).toInt


    inputScore
      .sortBy { case (score) => score }
      .zipWithIndex()
      .filter { case (score, index) => index == retrievedEntry }
      .map { case (score, index) => score }
      .collect()(0)
  }

Conclusion

It’s all About this issue. Hope all Methods helped you a lot. Comment below Your thoughts and your queries. Also, Comment below which Method worked for you? Thank You.

Also, Read