## How to find median and quantiles using Spark?

## Method 1

Here is the method I used, based on window functions (with PySpark 2.2.0).

```python
from pyspark.sql import DataFrame


class median():
    """ Create median class with over method to pass partition """
    def __init__(self, df, col, name):
        assert col
        self.column = col
        self.df = df
        self.name = name

    def over(self, window):
        from pyspark.sql.functions import percent_rank, pow, first

        # first, order by the column we want to compute the median for
        first_window = window.orderBy(self.column)
        # add a percent_rank column; percent_rank = 0.5 corresponds to the median
        df = self.df.withColumn("percent_rank", percent_rank().over(first_window))
        # order by (percent_rank - 0.5)^2 ascending
        second_window = window.orderBy(pow(df.percent_rank - 0.5, 2))
        # the first row of this window is the one closest to the median; with an even
        # row count two rows tie, so one of the two middle values is returned, not their average
        return df.withColumn(self.name, first(self.column).over(second_window))


def addMedian(self, col, median_name):
    """ Method to be added to spark native DataFrame class """
    return median(self, col, median_name)


# Add method to DataFrame class
DataFrame.addMedian = addMedian
```
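To see what the two windows compute, here is a plain-Python sketch of the same selection rule for a single partition, with no Spark involved. The helper names `spark_style_percent_rank` and `closest_to_half` are hypothetical; the sketch assumes the standard SQL definition percent_rank = (rank - 1) / (n - 1), where tied values share the lowest rank, and treats a one-row partition as 0.0.

```python
from bisect import bisect_left


def spark_style_percent_rank(sorted_values):
    """(rank - 1) / (n - 1) for each value; tied values share the lowest rank."""
    n = len(sorted_values)
    if n == 1:
        return [0.0]  # avoid dividing by zero for a one-row partition
    # bisect_left counts the values strictly smaller than v, i.e. rank - 1
    return [bisect_left(sorted_values, v) / (n - 1) for v in sorted_values]


def closest_to_half(values):
    """Value whose percent_rank is nearest 0.5, mimicking the two-window trick."""
    ordered = sorted(values)  # first window: order by the column
    ranks = spark_style_percent_rank(ordered)
    # second window: order by (percent_rank - 0.5) ** 2 and take the first row
    best = min(range(len(ordered)), key=lambda i: (ranks[i] - 0.5) ** 2)
    return ordered[best]


print(closest_to_half([5, 1, 4, 2, 3]))  # odd count: the true median, 3
print(closest_to_half([1, 2, 3, 4]))     # even count: one of the two middle values, not 2.5
```

Because the method returns an actual row value rather than interpolating, it behaves like a "nearest to the middle" median: exact for odd-sized partitions, and one of the two middle values for even-sized ones.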

Then call the `addMedian` method to calculate the median of `col2` within each `col1` partition:

```python
from pyspark.sql import Window

median_window = Window.partitionBy("col1")
df = df.addMedian("col2", "median").over(median_window)
```

Finally, group by `col1` and `median` if you need one row per group:

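```python
# groupby returns GroupedData; add an aggregation (here count) to get a DataFrame back
df.groupby("col1", "median").count()
```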
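The window step does not reduce the number of rows: every row keeps its values and gains its partition's median, which is why a group by is needed to get one row per group. The plain-Python sketch below shows the row shapes before and after grouping on made-up sample data. It uses `statistics.median_high` only as a stand-in for the window median; for even-sized groups the window method may return either middle value instead.

```python
from collections import defaultdict
from statistics import median_high

# made-up (col1, col2) sample rows
rows = [("a", 1.0), ("a", 3.0), ("a", 2.0), ("b", 10.0), ("b", 20.0)]

# Step 1 (window over partitionBy("col1")): every row gains its group's median
by_group = defaultdict(list)
for col1, col2 in rows:
    by_group[col1].append(col2)
with_median = [(col1, col2, median_high(by_group[col1])) for col1, col2 in rows]

# Step 2 (groupby("col1", "median").count()): one row per group
counts = defaultdict(int)
for col1, _, med in with_median:
    counts[(col1, med)] += 1
summary = sorted(counts.items())

print(len(with_median))  # 5: same row count as the input
print(summary)           # [(('a', 2.0), 3), (('b', 20.0), 2)]
```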

## Method 2

Here is a solution if you want an RDD-only method and don't want to convert to a DataFrame. This snippet computes a percentile for an `RDD[Double]`.

Passing a percentile of 50 gives the median. Let me know if there are any corner cases not accounted for.

```scala
import org.apache.spark.rdd.RDD

/**
 * Gets the nth percentile entry for an RDD of doubles
 *
 * @param inputScore : Input scores consisting of an RDD of doubles
 * @param percentile : The percentile cutoff required (between 0 and 100), e.g. the 90th percentile
 *                     of [1,4,5,9,19,23,44] is 44.
 *                     It prefers the higher value when the desired quantile lies between two data points
 * @return : The number best representing the percentile in the RDD of doubles
 */
def getRddPercentile(inputScore: RDD[Double], percentile: Double): Double = {
  val numEntries = inputScore.count()
  require(numEntries > 0, "inputScore must not be empty")
  // Clamp to the last valid index so that percentile = 100 does not run past the end
  val retrievedEntry = (percentile * numEntries / 100.0).toLong.min(numEntries - 1).max(0L)
  inputScore
    .sortBy(score => score)
    .zipWithIndex()
    .filter { case (_, index) => index == retrievedEntry }
    .map { case (score, _) => score }
    .collect()(0)
}
```
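The index rule is easy to check locally. Below is a plain-Python sketch of the same selection on a list instead of an RDD; `rdd_percentile_local` is a hypothetical helper name, and this sketch clamps the index to the range [0, n - 1] so that a percentile of 100 returns the maximum instead of failing.

```python
def rdd_percentile_local(values, percentile):
    """Local mirror of the getRddPercentile index rule (hypothetical helper)."""
    n = len(values)
    if n == 0:
        raise ValueError("values must not be empty")
    # truncate percentile * n / 100 to an integer index, clamped to [0, n - 1]
    index = min(max(int(percentile * n / 100.0), 0), n - 1)
    return sorted(values)[index]


data = [1, 4, 5, 9, 19, 23, 44]
print(rdd_percentile_local(data, 50))  # 9: index int(3.5) = 3
print(rdd_percentile_local(data, 90))  # 44: index int(6.3) = 6
```

For an even number of entries, a percentile of 50 lands exactly on index n / 2, so the higher of the two middle values is returned, e.g. 3 for [1, 2, 3, 4].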

