How to define and use a User-Defined Aggregate Function in Spark SQL?

I know how to write a UDF in Spark SQL:



def belowThreshold(power: Int): Boolean = {
  power < -40
}

sqlContext.udf.register("belowThreshold", belowThreshold _)


Can I do something similar to define an aggregate function? How is this done?



For context, I want to run the following SQL query:



val aggDF = sqlContext.sql("""SELECT span, belowThreshold(opticalReceivePower), timestamp
                              FROM ifDF
                              WHERE opticalReceivePower IS NOT NULL
                              GROUP BY span, timestamp
                              ORDER BY span""")


It should return something like



Row(span1, false, T0)



I want the aggregate function to tell me whether there are any values for opticalReceivePower in the groups defined by span and timestamp that are below the threshold. Do I need to write my UDAF differently from the UDF I pasted above?
  • Related: stackoverflow.com/questions/33899977/…

    – Josiah Yoder
    Aug 12 '16 at 16:24

  • Perhaps use reduceByKey / foldByKey as recommended by zero323

    – Josiah Yoder
    Aug 12 '16 at 16:27


1 Answer

Supported methods



Spark >= 2.3



Vectorized udf (Python only):



from pyspark.sql.functions import pandas_udf
from pyspark.sql.functions import PandasUDFType

from pyspark.sql.types import *
import pandas as pd

df = sc.parallelize([
    ("a", 0), ("a", 1), ("b", 30), ("b", -50)
]).toDF(["group", "power"])

def below_threshold(threshold, group="group", power="power"):
    @pandas_udf("struct<group: string, below_threshold: boolean>", PandasUDFType.GROUPED_MAP)
    def below_threshold_(df):
        df = pd.DataFrame(
            df.groupby(group).apply(lambda x: (x[power] < threshold).any()))
        df.reset_index(inplace=True, drop=False)
        return df

    return below_threshold_


Example usage:



df.groupBy("group").apply(below_threshold(-40)).show()

## +-----+---------------+
## |group|below_threshold|
## +-----+---------------+
## | b| true|
## | a| false|
## +-----+---------------+


See also Applying UDFs on GroupedData in PySpark (with a working Python example).



Spark >= 2.0 (optionally 1.6, but with a slightly different API):



It is possible to use Aggregators on typed Datasets:



import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Encoder, Encoders}

class BelowThreshold[I](f: I => Boolean)
    extends Aggregator[I, Boolean, Boolean] with Serializable {
  def zero = false
  def reduce(acc: Boolean, x: I) = acc | f(x)
  def merge(acc1: Boolean, acc2: Boolean) = acc1 | acc2
  def finish(acc: Boolean) = acc

  def bufferEncoder: Encoder[Boolean] = Encoders.scalaBoolean
  def outputEncoder: Encoder[Boolean] = Encoders.scalaBoolean
}

val belowThreshold = new BelowThreshold[(String, Int)](_._2 < -40).toColumn
df.as[(String, Int)].groupByKey(_._1).agg(belowThreshold)
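
For example, with a small (group, power) Dataset like the one used in the Spark 1.4 section below, the typed aggregation can be applied per group. A minimal sketch, assuming a SparkSession named spark for the implicits:

import spark.implicits._

// Hypothetical usage sketch: apply the typed Aggregator per group.
val ds = Seq(("a", 0), ("a", 1), ("b", 30), ("b", -50)).toDS()

ds.groupByKey(_._1).agg(belowThreshold).show()
// Expected: group "a" -> false, group "b" -> true
// (the result column is named after the aggregator expression)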


Spark >= 1.5:



In Spark 1.5 you can create a UDAF like this, although it is most likely overkill:



import org.apache.spark.sql.expressions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

object belowThreshold extends UserDefinedAggregateFunction {
  // Schema you get as an input
  def inputSchema = new StructType().add("power", IntegerType)
  // Schema of the row which is used for aggregation
  def bufferSchema = new StructType().add("ind", BooleanType)
  // Returned type
  def dataType = BooleanType
  // Self-explaining
  def deterministic = true
  // zero value
  def initialize(buffer: MutableAggregationBuffer) = buffer.update(0, false)
  // Similar to seqOp in aggregate
  def update(buffer: MutableAggregationBuffer, input: Row) = {
    if (!input.isNullAt(0))
      buffer.update(0, buffer.getBoolean(0) | input.getInt(0) < -40)
  }
  // Similar to combOp in aggregate
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
    buffer1.update(0, buffer1.getBoolean(0) | buffer2.getBoolean(0))
  }
  // Called on exit to get return value
  def evaluate(buffer: Row) = buffer.getBoolean(0)
}


Example usage:



df
  .groupBy($"group")
  .agg(belowThreshold($"power").alias("belowThreshold"))
  .show

// +-----+--------------+
// |group|belowThreshold|
// +-----+--------------+
// |    a|         false|
// |    b|          true|
// +-----+--------------+
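
To call the aggregate from the SQL query in the question, the UDAF can also be registered by name. A minimal sketch, assuming Spark >= 1.5 (where udf.register accepts a UserDefinedAggregateFunction) and that ifDF has been registered as a temporary table:

// Register the UDAF under the name used in the original query.
sqlContext.udf.register("belowThreshold", belowThreshold)

val aggDF = sqlContext.sql("""SELECT span, belowThreshold(opticalReceivePower), timestamp
                              FROM ifDF
                              WHERE opticalReceivePower IS NOT NULL
                              GROUP BY span, timestamp
                              ORDER BY span""")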


Spark 1.4 workaround:



I am not sure if I understand your requirements correctly, but as far as I can tell, plain old aggregation should be enough here:



val df = sc.parallelize(Seq(
  ("a", 0), ("a", 1), ("b", 30), ("b", -50))).toDF("group", "power")

df
  .withColumn("belowThreshold", ($"power".lt(-40)).cast(IntegerType))
  .groupBy($"group")
  .agg(sum($"belowThreshold").notEqual(0).alias("belowThreshold"))
  .show

// +-----+--------------+
// |group|belowThreshold|
// +-----+--------------+
// |    a|         false|
// |    b|          true|
// +-----+--------------+


Spark <= 1.4:



As far as I know, at this moment (Spark 1.4.1) there is no support for UDAFs, other than the Hive ones. It should be possible with Spark 1.5 (see SPARK-3947).
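
As suggested in the comments on the question, an RDD-level reduceByKey can also serve as a fallback on those versions. A minimal sketch, assuming the same (group, power) pairs as above:

// Hypothetical RDD fallback, along the lines of the reduceByKey / foldByKey
// suggestion from the comments.
val rdd = sc.parallelize(Seq(("a", 0), ("a", 1), ("b", 30), ("b", -50)))

val belowThresholdByGroup = rdd
  .mapValues(_ < -40)   // flag each record individually
  .reduceByKey(_ || _)  // true if any record in the group is below the threshold

belowThresholdByGroup.collect()
// e.g. Array((a,false), (b,true)) -- ordering is not guaranteed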



Unsupported / internal methods



Internally Spark uses a number of classes including ImperativeAggregates and DeclarativeAggregates.



These are intended for internal usage and may change without further notice, so they are probably not something you want to use in your production code, but just for completeness, BelowThreshold with DeclarativeAggregate could be implemented like this (tested with Spark 2.2-SNAPSHOT):



import org.apache.spark.sql.catalyst.expressions.aggregate.DeclarativeAggregate
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types._

case class BelowThreshold(child: Expression, threshold: Expression)
    extends DeclarativeAggregate {
  override def children: Seq[Expression] = Seq(child, threshold)

  override def nullable: Boolean = false
  override def dataType: DataType = BooleanType

  private lazy val belowThreshold = AttributeReference(
    "belowThreshold", BooleanType, nullable = false
  )()

  // Used to derive schema
  override lazy val aggBufferAttributes = belowThreshold :: Nil

  override lazy val initialValues = Seq(
    Literal(false)
  )

  override lazy val updateExpressions = Seq(Or(
    belowThreshold,
    If(IsNull(child), Literal(false), LessThan(child, threshold))
  ))

  override lazy val mergeExpressions = Seq(
    Or(belowThreshold.left, belowThreshold.right)
  )

  override lazy val evaluateExpression = belowThreshold
  override def defaultResult: Option[Literal] = Option(Literal(false))
}


It should be further wrapped with an equivalent of withAggregateFunction.
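
A rough sketch of that wrapping, mirroring what withAggregateFunction does inside org.apache.spark.sql.functions (again, these are internal APIs and may differ between Spark versions):

// Hypothetical wrapper: turn the DeclarativeAggregate into an
// AggregateExpression and expose it as a regular Column.
import org.apache.spark.sql.Column

def belowThreshold(col: Column, threshold: Column): Column =
  new Column(BelowThreshold(col.expr, threshold.expr).toAggregateExpression())

// Possible usage: df.groupBy($"group").agg(belowThreshold($"power", lit(-40)))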






  • As of Spark 2.0.1, Aggregator works with both groupBy and groupByKey (see github.com/apache/spark/blob/master/sql/core/src/test/scala/org/…). Unfortunately, Aggregator doesn't work on windows, where you need to use UserDefinedAggregateFunction.

    – Martin Tapp
    Nov 15 '16 at 12:58













