How to split dataset to two datasets with unique and duplicate rows each?












3















I want to take duplicate records in a Spark scala Dataframe. for example, I want to take duplicate values based on 3 columns like "id", "name", "age".condition part contains any no of columns(Dynamic Input). based on the column value I want to take the duplicate records.



the below code I have tried. only one attribute I tried. I don't know how to do if more than one column.



My Code:



 var s= "age|id|name " // Note- This is dynamic input. so it will increase or decrease
var columnNames= s.replace('|', ',')


val findDuplicateRecordsDF= spark.sql("SELECT * FROM " + dbname + "." + tablename)
findDuplicateRecordsDF.show()
findDuplicateRecordsDF.withColumn("count", count("*")
.over(Window.partitionBy($"id"))) // here how to add more than one column?(Dynamic input)
.where($"count">1)
.show()


Input Dataframe: (findDuplicateRecordsDF.show())



       --------------------------------------------------------
| id | name | age | phone | email_id |
|-------------------------------------------------------|
| 3 | sam | 23 | 9876543210 | sam@yahoo.com |
| 7 | ram | 27 | 8765432190 | ram@gmail.com |
| 3 | sam | 28 | 9876543210 | sam@yahoo.com |
| 6 | haris | 30 | 6543210777 | haris@gmail.com |
| 9 | ram | 27 | 8765432130 | ram94@gmail.com |
| 6 | haris | 24 | 6543210777 | haris@gmail.com |
| 4 | karthi| 26 | 4321066666 | karthi@gmail.com |
--------------------------------------------------------


here I am going to take duplicate records based on 4 columns(id, name, phone, email). the above one is example data frame. original data frame contains any no of columns.



The Output Dataframe should be





  1. Duplicate Records Output



           --------------------------------------------------------
    | id | name | age | phone | email_id |
    |-------------------------------------------------------|
    | 3 | sam | 23 | 9876543210 | sam@yahoo.com |
    | 3 | sam | 28 | 9876543210 | sam@yahoo.com |
    | 6 | haris | 30 | 6543210777 | haris@gmail.com |
    | 6 | haris | 24 | 6543210777 | haris@gmail.com |
    --------------------------------------------------------



  2. Unique Records Dataframe Output:



          --------------------------------------------------------
    | id | name | age | phone | email_id |
    |-------------------------------------------------------|
    | 7 | ram | 27 | 8765432190 | ram@gmail.com |
    | 9 | ram | 27 | 8765432130 | ram94@gmail.com |
    | 4 | karthi| 26 | 4321066666 | karthi@gmail.com |
    --------------------------------------------------------



Thanks in advance.










share|improve this question

























  • You can specify comma separated list of columns in partitionBy().

    – vindev
    Nov 22 '18 at 5:40
















3















I want to take duplicate records in a Spark scala Dataframe. for example, I want to take duplicate values based on 3 columns like "id", "name", "age".condition part contains any no of columns(Dynamic Input). based on the column value I want to take the duplicate records.



the below code I have tried. only one attribute I tried. I don't know how to do if more than one column.



My Code:



 var s= "age|id|name " // Note- This is dynamic input. so it will increase or decrease
var columnNames= s.replace('|', ',')


val findDuplicateRecordsDF= spark.sql("SELECT * FROM " + dbname + "." + tablename)
findDuplicateRecordsDF.show()
findDuplicateRecordsDF.withColumn("count", count("*")
.over(Window.partitionBy($"id"))) // here how to add more than one column?(Dynamic input)
.where($"count">1)
.show()


Input Dataframe: (findDuplicateRecordsDF.show())



       --------------------------------------------------------
| id | name | age | phone | email_id |
|-------------------------------------------------------|
| 3 | sam | 23 | 9876543210 | sam@yahoo.com |
| 7 | ram | 27 | 8765432190 | ram@gmail.com |
| 3 | sam | 28 | 9876543210 | sam@yahoo.com |
| 6 | haris | 30 | 6543210777 | haris@gmail.com |
| 9 | ram | 27 | 8765432130 | ram94@gmail.com |
| 6 | haris | 24 | 6543210777 | haris@gmail.com |
| 4 | karthi| 26 | 4321066666 | karthi@gmail.com |
--------------------------------------------------------


here I am going to take duplicate records based on 4 columns(id, name, phone, email). the above one is example data frame. original data frame contains any no of columns.



The Output Dataframe should be





  1. Duplicate Records Output



           --------------------------------------------------------
    | id | name | age | phone | email_id |
    |-------------------------------------------------------|
    | 3 | sam | 23 | 9876543210 | sam@yahoo.com |
    | 3 | sam | 28 | 9876543210 | sam@yahoo.com |
    | 6 | haris | 30 | 6543210777 | haris@gmail.com |
    | 6 | haris | 24 | 6543210777 | haris@gmail.com |
    --------------------------------------------------------



  2. Unique Records Dataframe Output:



          --------------------------------------------------------
    | id | name | age | phone | email_id |
    |-------------------------------------------------------|
    | 7 | ram | 27 | 8765432190 | ram@gmail.com |
    | 9 | ram | 27 | 8765432130 | ram94@gmail.com |
    | 4 | karthi| 26 | 4321066666 | karthi@gmail.com |
    --------------------------------------------------------



Thanks in advance.










share|improve this question

























  • You can specify comma separated list of columns in partitionBy().

    – vindev
    Nov 22 '18 at 5:40














3












3








3








I want to take duplicate records in a Spark scala Dataframe. for example, I want to take duplicate values based on 3 columns like "id", "name", "age".condition part contains any no of columns(Dynamic Input). based on the column value I want to take the duplicate records.



the below code I have tried. only one attribute I tried. I don't know how to do if more than one column.



My Code:



 var s= "age|id|name " // Note- This is dynamic input. so it will increase or decrease
var columnNames= s.replace('|', ',')


val findDuplicateRecordsDF= spark.sql("SELECT * FROM " + dbname + "." + tablename)
findDuplicateRecordsDF.show()
findDuplicateRecordsDF.withColumn("count", count("*")
.over(Window.partitionBy($"id"))) // here how to add more than one column?(Dynamic input)
.where($"count">1)
.show()


Input Dataframe: (findDuplicateRecordsDF.show())



       --------------------------------------------------------
| id | name | age | phone | email_id |
|-------------------------------------------------------|
| 3 | sam | 23 | 9876543210 | sam@yahoo.com |
| 7 | ram | 27 | 8765432190 | ram@gmail.com |
| 3 | sam | 28 | 9876543210 | sam@yahoo.com |
| 6 | haris | 30 | 6543210777 | haris@gmail.com |
| 9 | ram | 27 | 8765432130 | ram94@gmail.com |
| 6 | haris | 24 | 6543210777 | haris@gmail.com |
| 4 | karthi| 26 | 4321066666 | karthi@gmail.com |
--------------------------------------------------------


here I am going to take duplicate records based on 4 columns(id, name, phone, email). the above one is example data frame. original data frame contains any no of columns.



The Output Dataframe should be





  1. Duplicate Records Output



           --------------------------------------------------------
    | id | name | age | phone | email_id |
    |-------------------------------------------------------|
    | 3 | sam | 23 | 9876543210 | sam@yahoo.com |
    | 3 | sam | 28 | 9876543210 | sam@yahoo.com |
    | 6 | haris | 30 | 6543210777 | haris@gmail.com |
    | 6 | haris | 24 | 6543210777 | haris@gmail.com |
    --------------------------------------------------------



  2. Unique Records Dataframe Output:



          --------------------------------------------------------
    | id | name | age | phone | email_id |
    |-------------------------------------------------------|
    | 7 | ram | 27 | 8765432190 | ram@gmail.com |
    | 9 | ram | 27 | 8765432130 | ram94@gmail.com |
    | 4 | karthi| 26 | 4321066666 | karthi@gmail.com |
    --------------------------------------------------------



Thanks in advance.










share|improve this question
















I want to take duplicate records in a Spark scala Dataframe. for example, I want to take duplicate values based on 3 columns like "id", "name", "age".condition part contains any no of columns(Dynamic Input). based on the column value I want to take the duplicate records.



the below code I have tried. only one attribute I tried. I don't know how to do if more than one column.



My Code:



 var s= "age|id|name " // Note- This is dynamic input. so it will increase or decrease
var columnNames= s.replace('|', ',')


val findDuplicateRecordsDF= spark.sql("SELECT * FROM " + dbname + "." + tablename)
findDuplicateRecordsDF.show()
findDuplicateRecordsDF.withColumn("count", count("*")
.over(Window.partitionBy($"id"))) // here how to add more than one column?(Dynamic input)
.where($"count">1)
.show()


Input Dataframe: (findDuplicateRecordsDF.show())



       --------------------------------------------------------
| id | name | age | phone | email_id |
|-------------------------------------------------------|
| 3 | sam | 23 | 9876543210 | sam@yahoo.com |
| 7 | ram | 27 | 8765432190 | ram@gmail.com |
| 3 | sam | 28 | 9876543210 | sam@yahoo.com |
| 6 | haris | 30 | 6543210777 | haris@gmail.com |
| 9 | ram | 27 | 8765432130 | ram94@gmail.com |
| 6 | haris | 24 | 6543210777 | haris@gmail.com |
| 4 | karthi| 26 | 4321066666 | karthi@gmail.com |
--------------------------------------------------------


here I am going to take duplicate records based on 4 columns(id, name, phone, email). the above one is example data frame. original data frame contains any no of columns.



The Output Dataframe should be





  1. Duplicate Records Output



           --------------------------------------------------------
    | id | name | age | phone | email_id |
    |-------------------------------------------------------|
    | 3 | sam | 23 | 9876543210 | sam@yahoo.com |
    | 3 | sam | 28 | 9876543210 | sam@yahoo.com |
    | 6 | haris | 30 | 6543210777 | haris@gmail.com |
    | 6 | haris | 24 | 6543210777 | haris@gmail.com |
    --------------------------------------------------------



  2. Unique Records Dataframe Output:



          --------------------------------------------------------
    | id | name | age | phone | email_id |
    |-------------------------------------------------------|
    | 7 | ram | 27 | 8765432190 | ram@gmail.com |
    | 9 | ram | 27 | 8765432130 | ram94@gmail.com |
    | 4 | karthi| 26 | 4321066666 | karthi@gmail.com |
    --------------------------------------------------------



Thanks in advance.







scala apache-spark apache-spark-sql






share|improve this question















share|improve this question













share|improve this question




share|improve this question








edited Nov 25 '18 at 19:42









Jacek Laskowski

46.2k18138278




46.2k18138278










asked Nov 22 '18 at 4:42









stack0114104stack0114104

326




326













  • You can specify comma separated list of columns in partitionBy().

    – vindev
    Nov 22 '18 at 5:40



















  • You can specify comma separated list of columns in partitionBy().

    – vindev
    Nov 22 '18 at 5:40

















You can specify comma separated list of columns in partitionBy().

– vindev
Nov 22 '18 at 5:40





You can specify comma separated list of columns in partitionBy().

– vindev
Nov 22 '18 at 5:40












2 Answers
2






active

oldest

votes


















1














You can use window functions. Check this out



scala> val df = Seq((3,"sam",23,"9876543210","sam@yahoo.com"),(7,"ram",27,"8765432190","ram@gmail.com"),(3,"sam",28,"9876543210","sam@yahoo.com"),(6,"haris",30,"6543210777","haris@gmail.com"),(9,"ram",27,"8765432130","ram94@gmail.com"),(6,"haris",24,"6543210777","haris@gmail.com"),(4,"karthi",26,"4321066666","karthi@gmail.com")).toDF("id","name","age","phone","email_id")
df: org.apache.spark.sql.DataFrame = [id: int, name: string ... 3 more fields]

scala> val dup_cols = List("id","name","phone","email_id");
dup_cols: List[String] = List(id, name, phone, email_id)

scala> df.createOrReplaceTempView("contact")

scala> val dup_cols_qry = dup_cols.mkString(" count(*) over(partition by ", "," , " ) as cnt ")
dup_cols_qry: String = " count(*) over(partition by id,name,phone,email_id ) as cnt "

scala> val df2 = spark.sql("select *,"+ dup_cols_qry + " from contact ")
df2: org.apache.spark.sql.DataFrame = [id: int, name: string ... 4 more fields]

scala> df2.show(false)
+---+------+---+----------+----------------+---+
|id |name |age|phone |email_id |cnt|
+---+------+---+----------+----------------+---+
|4 |karthi|26 |4321066666|karthi@gmail.com|1 |
|7 |ram |27 |8765432190|ram@gmail.com |1 |
|9 |ram |27 |8765432130|ram94@gmail.com |1 |
|3 |sam |23 |9876543210|sam@yahoo.com |2 |
|3 |sam |28 |9876543210|sam@yahoo.com |2 |
|6 |haris |30 |6543210777|haris@gmail.com |2 |
|6 |haris |24 |6543210777|haris@gmail.com |2 |
+---+------+---+----------+----------------+---+


scala> df2.createOrReplaceTempView("contact2")


//Duplicates



scala>  spark.sql("select " + dup_cols.mkString(",") + " from contact2 where cnt = 2").show
+---+-----+----------+---------------+
| id| name| phone| email_id|
+---+-----+----------+---------------+
| 3| sam|9876543210| sam@yahoo.com|
| 3| sam|9876543210| sam@yahoo.com|
| 6|haris|6543210777|haris@gmail.com|
| 6|haris|6543210777|haris@gmail.com|
+---+-----+----------+---------------+


// Unique



scala>  spark.sql("select " + dup_cols.mkString(",") + " from contact2 where cnt = 1").show
+---+------+----------+----------------+
| id| name| phone| email_id|
+---+------+----------+----------------+
| 4|karthi|4321066666|karthi@gmail.com|
| 7| ram|8765432190| ram@gmail.com|
| 9| ram|8765432130| ram94@gmail.com|
+---+------+----------+----------------+


EDIT2:



val df = Seq(
(4,"karthi",26,"4321066666","karthi@gmail.com"),
(6,"haris",24,"6543210777","haris@gmail.com"),
(7,"ram",27,"8765432190","ram@gmail.com"),
(9,"ram",27,"8765432190","ram@gmail.com"),
(6,"haris",24,"6543210777","haris@gmail.com"),
(3,"sam",23,"9876543210","sam@yahoo.com"),
(3,"sam",23,"9876543210","sam@yahoo.com"),
(3,"sam",28,"9876543210","sam@yahoo.com"),
(6,"haris",30,"6543210777","haris@gmail.com")
).toDF("id","name","age","phone","email_id")

val dup_cols = List("name","phone","email_id")
val dup_cols_str = dup_cols.mkString(",")
df.createOrReplaceTempView("contact")
val dup_cols_count_qry = " count(*) over(partition by " + dup_cols_str + " ) as cnt "
val dup_cols_row_num_qry = " row_number() over(partition by " + dup_cols_str + " order by " + dup_cols_str + " ) as rwn "
val df2 = spark.sql("select *,"+ dup_cols_count_qry + "," + dup_cols_row_num_qry + " from contact ")
df2.show(false)
df2.createOrReplaceTempView("contact2")
spark.sql("select id, " + dup_cols_str + " from contact2 where cnt > 1 and rwn > 1").show


Results:



+---+-----+----------+---------------+
| id| name| phone| email_id|
+---+-----+----------+---------------+
| 6|haris|6543210777|haris@gmail.com|
| 6|haris|6543210777|haris@gmail.com|
| 3| sam|9876543210| sam@yahoo.com|
| 3| sam|9876543210| sam@yahoo.com|
| 9| ram|8765432190| ram@gmail.com|
+---+-----+----------+---------------+


EDIT3: - Null condition check



val df = Seq(
(4,"karthi",26,"4321066666","karthi@gmail.com"),
(6,"haris",30,"6543210777","haris@gmail.com"),
(6,"haris",30,null,"haris@gmail.com"),
(7,"ram",27,"8765432190","ram@gmail.com"),
(9,"ram",27,"8765432190","ram@gmail.com"),
(6,"haris",24,"6543210777","haris@gmail.com"),
(6,null,24,"6543210777",null),
(3,"sam",23,"9876543210","sam@yahoo.com"),
(3,"sam",23,"9876543210","sam@yahoo.com"),
(3,"sam",28,"9876543210","sam@yahoo.com"),
(6,"haris",24,"6543210777","haris@gmail.com")
).toDF("id","name","age","phone","email_id")

val all_cols = df.columns
val dup_cols = List("name","phone","email_id")
val rem_cols = all_cols.diff(dup_cols)
val dup_cols_str = dup_cols.mkString(",")
val rem_cols_str = rem_cols.mkString(",")
val dup_cols_length = dup_cols.length
val df_null_col = dup_cols.map( x => when(col(x).isNull,0).otherwise(1)).reduce( _ + _ )
val df_null = df.withColumn("null_count", df_null_col)
df_null.createOrReplaceTempView("contact")
df_null.show(false)

val dup_cols_count_qry = " count(*) over(partition by " + dup_cols_str + " ) as cnt "
val dup_cols_row_num_qry = " row_number() over(partition by " + dup_cols_str + " order by " + dup_cols_str + " ) as rwn "
val df2 = spark.sql("select *,"+ dup_cols_count_qry + "," + dup_cols_row_num_qry + " from contact " + " where null_count = " + dup_cols_length )
df2.show(false)
df2.createOrReplaceTempView("contact2")
val df3 = spark.sql("select " + dup_cols_str + ", " + rem_cols_str + " from contact2 where cnt > 1 and rwn > 1")
df3.show(false)


Results:



+---+------+---+----------+----------------+----------+
|id |name |age|phone |email_id |null_count|
+---+------+---+----------+----------------+----------+
|4 |karthi|26 |4321066666|karthi@gmail.com|3 |
|6 |haris |30 |6543210777|haris@gmail.com |3 |
|6 |haris |30 |null |haris@gmail.com |2 |
|7 |ram |27 |8765432190|ram@gmail.com |3 |
|9 |ram |27 |8765432190|ram@gmail.com |3 |
|6 |haris |24 |6543210777|haris@gmail.com |3 |
|6 |null |24 |6543210777|null |1 |
|3 |sam |23 |9876543210|sam@yahoo.com |3 |
|3 |sam |23 |9876543210|sam@yahoo.com |3 |
|3 |sam |28 |9876543210|sam@yahoo.com |3 |
|6 |haris |24 |6543210777|haris@gmail.com |3 |
+---+------+---+----------+----------------+----------+


|id |name |age|phone |email_id |null_count|cnt|rwn|
+---+------+---+----------+----------------+----------+---+---+
|6 |haris |30 |6543210777|haris@gmail.com |3 |3 |1 |
|6 |haris |24 |6543210777|haris@gmail.com |3 |3 |2 |
|6 |haris |24 |6543210777|haris@gmail.com |3 |3 |3 |
|3 |sam |23 |9876543210|sam@yahoo.com |3 |3 |1 |
|3 |sam |23 |9876543210|sam@yahoo.com |3 |3 |2 |
|3 |sam |28 |9876543210|sam@yahoo.com |3 |3 |3 |
|7 |ram |27 |8765432190|ram@gmail.com |3 |2 |1 |
|9 |ram |27 |8765432190|ram@gmail.com |3 |2 |2 |
|4 |karthi|26 |4321066666|karthi@gmail.com|3 |1 |1 |
+---+------+---+----------+----------------+----------+---+---+

+-----+----------+---------------+---+---+
|name |phone |email_id |id |age|
+-----+----------+---------------+---+---+
|haris|6543210777|haris@gmail.com|6 |24 |
|haris|6543210777|haris@gmail.com|6 |24 |
|sam |9876543210|sam@yahoo.com |3 |23 |
|sam |9876543210|sam@yahoo.com |3 |28 |
|ram |8765432190|ram@gmail.com |9 |27 |
+-----+----------+---------------+---+---+


blank check



val df_null_col = dup_cols.map( x => when(col(x).isNull or regexp_replace(col(x), """^s*$""","")=== lit(""),0).otherwise(1)).reduce( _ + _ )


Filter only when all 3 columns are either blank or null



val df = Seq(
(4,"karthi",26,"4321066666","karthi@gmail.com"),
(6,"haris",30,"6543210777","haris@gmail.com"),
(6,null,30,null,null),
(7,"ram",27,"8765432190","ram@gmail.com"),
(9,"",27,"",""),
(7,"ram",27,"8765432190","ram@gmail.com"),
(6,"haris",24,"6543210777","haris@gmail.com"),
(6,null,24,"6543210777",null),
(3,"sam",23,"9876543210","sam@yahoo.com"),
(3,null,23,"9876543210","sam@yahoo.com"),
(3,null,28,"9876543213",null),
(6,"haris",24,null,"haris@gmail.com")
).toDF("id","name","age","phone","email_id")

val all_cols = df.columns
val dup_cols = List("name","phone","email_id")
val rem_cols = all_cols.diff(dup_cols)
val dup_cols_str = dup_cols.mkString(",")
val rem_cols_str = rem_cols.mkString(",")
val dup_cols_length = dup_cols.length
//val df_null_col = dup_cols.map( x => when(col(x).isNull,0).otherwise(1)).reduce( _ + _ )
val df_null_col = dup_cols.map( x => when(col(x).isNull or regexp_replace(col(x),lit("""^s*$"""),lit("")) === lit(""),0).otherwise(1)).reduce( _ + _ )
val df_null = df.withColumn("null_count", df_null_col)
df_null.createOrReplaceTempView("contact")
df_null.show(false)

val dup_cols_count_qry = " count(*) over(partition by " + dup_cols_str + " ) as cnt "
val dup_cols_row_num_qry = " row_number() over(partition by " + dup_cols_str + " order by " + dup_cols_str + " ) as rwn "
//val df2 = spark.sql("select *,"+ dup_cols_count_qry + "," + dup_cols_row_num_qry + " from contact " + " where null_count = " + dup_cols_length )
val df2 = spark.sql("select *,"+ dup_cols_count_qry + "," + dup_cols_row_num_qry + " from contact " + " where null_count != 0 ")
df2.show(false)
df2.createOrReplaceTempView("contact2")
val df3 = spark.sql("select " + dup_cols_str + ", " + rem_cols_str + " from contact2 where cnt > 1 and rwn > 1")
df3.show(false)





share|improve this answer


























  • I am getting below exception at the time of spark-submit == SQL == select , count() over(partition by [condition: string] ) as cnt from contact -------------------------------------^^^ at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:217) at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:114)

    – stack0114104
    Nov 27 '18 at 7:41











  • seems dup_cols_qry is empty string.. check again

    – stack0114106
    Nov 27 '18 at 8:40











  • looks like you posted one more question and marked it as duplicate..

    – stack0114106
    Nov 27 '18 at 8:52











  • yes, now it's working... Thank you stack0114106... No that is not a duplicate question...another question explanation is "I am taking each column or required column to count number of unique and duplicate records.for more information read that question"

    – stack0114104
    Nov 27 '18 at 9:29













  • good..glad that it worked

    – stack0114106
    Nov 27 '18 at 9:32



















0














You need to give comma separated col names.



col1 ..col2 should be of string type.
val window= Window.partitionBy(col1,col2,..)


findDuplicateRecordsDF.withColumn("count", count("*")
.over(window)
.where($"count">1)
.show()





share|improve this answer


























  • the input contain N no of columns .. Its dynamic value

    – stack0114104
    Nov 22 '18 at 5:56












Your Answer






StackExchange.ifUsing("editor", function () {
StackExchange.using("externalEditor", function () {
StackExchange.using("snippets", function () {
StackExchange.snippets.init();
});
});
}, "code-snippets");

StackExchange.ready(function() {
var channelOptions = {
tags: "".split(" "),
id: "1"
};
initTagRenderer("".split(" "), "".split(" "), channelOptions);

StackExchange.using("externalEditor", function() {
// Have to fire editor after snippets, if snippets enabled
if (StackExchange.settings.snippets.snippetsEnabled) {
StackExchange.using("snippets", function() {
createEditor();
});
}
else {
createEditor();
}
});

function createEditor() {
StackExchange.prepareEditor({
heartbeatType: 'answer',
autoActivateHeartbeat: false,
convertImagesToLinks: true,
noModals: true,
showLowRepImageUploadWarning: true,
reputationToPostImages: 10,
bindNavPrevention: true,
postfix: "",
imageUploader: {
brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
allowUrls: true
},
onDemand: true,
discardSelector: ".discard-answer"
,immediatelyShowMarkdownHelp:true
});


}
});














draft saved

draft discarded


















StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53424011%2fhow-to-split-dataset-to-two-datasets-with-unique-and-duplicate-rows-each%23new-answer', 'question_page');
}
);

Post as a guest















Required, but never shown

























2 Answers
2






active

oldest

votes








2 Answers
2






active

oldest

votes









active

oldest

votes






active

oldest

votes









1














You can use window functions. Check this out



scala> val df = Seq((3,"sam",23,"9876543210","sam@yahoo.com"),(7,"ram",27,"8765432190","ram@gmail.com"),(3,"sam",28,"9876543210","sam@yahoo.com"),(6,"haris",30,"6543210777","haris@gmail.com"),(9,"ram",27,"8765432130","ram94@gmail.com"),(6,"haris",24,"6543210777","haris@gmail.com"),(4,"karthi",26,"4321066666","karthi@gmail.com")).toDF("id","name","age","phone","email_id")
df: org.apache.spark.sql.DataFrame = [id: int, name: string ... 3 more fields]

scala> val dup_cols = List("id","name","phone","email_id");
dup_cols: List[String] = List(id, name, phone, email_id)

scala> df.createOrReplaceTempView("contact")

scala> val dup_cols_qry = dup_cols.mkString(" count(*) over(partition by ", "," , " ) as cnt ")
dup_cols_qry: String = " count(*) over(partition by id,name,phone,email_id ) as cnt "

scala> val df2 = spark.sql("select *,"+ dup_cols_qry + " from contact ")
df2: org.apache.spark.sql.DataFrame = [id: int, name: string ... 4 more fields]

scala> df2.show(false)
+---+------+---+----------+----------------+---+
|id |name |age|phone |email_id |cnt|
+---+------+---+----------+----------------+---+
|4 |karthi|26 |4321066666|karthi@gmail.com|1 |
|7 |ram |27 |8765432190|ram@gmail.com |1 |
|9 |ram |27 |8765432130|ram94@gmail.com |1 |
|3 |sam |23 |9876543210|sam@yahoo.com |2 |
|3 |sam |28 |9876543210|sam@yahoo.com |2 |
|6 |haris |30 |6543210777|haris@gmail.com |2 |
|6 |haris |24 |6543210777|haris@gmail.com |2 |
+---+------+---+----------+----------------+---+


scala> df2.createOrReplaceTempView("contact2")


//Duplicates



scala>  spark.sql("select " + dup_cols.mkString(",") + " from contact2 where cnt = 2").show
+---+-----+----------+---------------+
| id| name| phone| email_id|
+---+-----+----------+---------------+
| 3| sam|9876543210| sam@yahoo.com|
| 3| sam|9876543210| sam@yahoo.com|
| 6|haris|6543210777|haris@gmail.com|
| 6|haris|6543210777|haris@gmail.com|
+---+-----+----------+---------------+


// Unique



scala>  spark.sql("select " + dup_cols.mkString(",") + " from contact2 where cnt = 1").show
+---+------+----------+----------------+
| id| name| phone| email_id|
+---+------+----------+----------------+
| 4|karthi|4321066666|karthi@gmail.com|
| 7| ram|8765432190| ram@gmail.com|
| 9| ram|8765432130| ram94@gmail.com|
+---+------+----------+----------------+


EDIT2:



val df = Seq(
(4,"karthi",26,"4321066666","karthi@gmail.com"),
(6,"haris",24,"6543210777","haris@gmail.com"),
(7,"ram",27,"8765432190","ram@gmail.com"),
(9,"ram",27,"8765432190","ram@gmail.com"),
(6,"haris",24,"6543210777","haris@gmail.com"),
(3,"sam",23,"9876543210","sam@yahoo.com"),
(3,"sam",23,"9876543210","sam@yahoo.com"),
(3,"sam",28,"9876543210","sam@yahoo.com"),
(6,"haris",30,"6543210777","haris@gmail.com")
).toDF("id","name","age","phone","email_id")

val dup_cols = List("name","phone","email_id")
val dup_cols_str = dup_cols.mkString(",")
df.createOrReplaceTempView("contact")
val dup_cols_count_qry = " count(*) over(partition by " + dup_cols_str + " ) as cnt "
val dup_cols_row_num_qry = " row_number() over(partition by " + dup_cols_str + " order by " + dup_cols_str + " ) as rwn "
val df2 = spark.sql("select *,"+ dup_cols_count_qry + "," + dup_cols_row_num_qry + " from contact ")
df2.show(false)
df2.createOrReplaceTempView("contact2")
spark.sql("select id, " + dup_cols_str + " from contact2 where cnt > 1 and rwn > 1").show


Results:



+---+-----+----------+---------------+
| id| name| phone| email_id|
+---+-----+----------+---------------+
| 6|haris|6543210777|haris@gmail.com|
| 6|haris|6543210777|haris@gmail.com|
| 3| sam|9876543210| sam@yahoo.com|
| 3| sam|9876543210| sam@yahoo.com|
| 9| ram|8765432190| ram@gmail.com|
+---+-----+----------+---------------+


EDIT3: - Null condition check



val df = Seq(
(4,"karthi",26,"4321066666","karthi@gmail.com"),
(6,"haris",30,"6543210777","haris@gmail.com"),
(6,"haris",30,null,"haris@gmail.com"),
(7,"ram",27,"8765432190","ram@gmail.com"),
(9,"ram",27,"8765432190","ram@gmail.com"),
(6,"haris",24,"6543210777","haris@gmail.com"),
(6,null,24,"6543210777",null),
(3,"sam",23,"9876543210","sam@yahoo.com"),
(3,"sam",23,"9876543210","sam@yahoo.com"),
(3,"sam",28,"9876543210","sam@yahoo.com"),
(6,"haris",24,"6543210777","haris@gmail.com")
).toDF("id","name","age","phone","email_id")

val all_cols = df.columns
val dup_cols = List("name","phone","email_id")
val rem_cols = all_cols.diff(dup_cols)
val dup_cols_str = dup_cols.mkString(",")
val rem_cols_str = rem_cols.mkString(",")
val dup_cols_length = dup_cols.length
val df_null_col = dup_cols.map( x => when(col(x).isNull,0).otherwise(1)).reduce( _ + _ )
val df_null = df.withColumn("null_count", df_null_col)
df_null.createOrReplaceTempView("contact")
df_null.show(false)

val dup_cols_count_qry = " count(*) over(partition by " + dup_cols_str + " ) as cnt "
val dup_cols_row_num_qry = " row_number() over(partition by " + dup_cols_str + " order by " + dup_cols_str + " ) as rwn "
val df2 = spark.sql("select *,"+ dup_cols_count_qry + "," + dup_cols_row_num_qry + " from contact " + " where null_count = " + dup_cols_length )
df2.show(false)
df2.createOrReplaceTempView("contact2")
val df3 = spark.sql("select " + dup_cols_str + ", " + rem_cols_str + " from contact2 where cnt > 1 and rwn > 1")
df3.show(false)


Results:



+---+------+---+----------+----------------+----------+
|id |name |age|phone |email_id |null_count|
+---+------+---+----------+----------------+----------+
|4 |karthi|26 |4321066666|karthi@gmail.com|3 |
|6 |haris |30 |6543210777|haris@gmail.com |3 |
|6 |haris |30 |null |haris@gmail.com |2 |
|7 |ram |27 |8765432190|ram@gmail.com |3 |
|9 |ram |27 |8765432190|ram@gmail.com |3 |
|6 |haris |24 |6543210777|haris@gmail.com |3 |
|6 |null |24 |6543210777|null |1 |
|3 |sam |23 |9876543210|sam@yahoo.com |3 |
|3 |sam |23 |9876543210|sam@yahoo.com |3 |
|3 |sam |28 |9876543210|sam@yahoo.com |3 |
|6 |haris |24 |6543210777|haris@gmail.com |3 |
+---+------+---+----------+----------------+----------+


|id |name |age|phone |email_id |null_count|cnt|rwn|
+---+------+---+----------+----------------+----------+---+---+
|6 |haris |30 |6543210777|haris@gmail.com |3 |3 |1 |
|6 |haris |24 |6543210777|haris@gmail.com |3 |3 |2 |
|6 |haris |24 |6543210777|haris@gmail.com |3 |3 |3 |
|3 |sam |23 |9876543210|sam@yahoo.com |3 |3 |1 |
|3 |sam |23 |9876543210|sam@yahoo.com |3 |3 |2 |
|3 |sam |28 |9876543210|sam@yahoo.com |3 |3 |3 |
|7 |ram |27 |8765432190|ram@gmail.com |3 |2 |1 |
|9 |ram |27 |8765432190|ram@gmail.com |3 |2 |2 |
|4 |karthi|26 |4321066666|karthi@gmail.com|3 |1 |1 |
+---+------+---+----------+----------------+----------+---+---+

+-----+----------+---------------+---+---+
|name |phone |email_id |id |age|
+-----+----------+---------------+---+---+
|haris|6543210777|haris@gmail.com|6 |24 |
|haris|6543210777|haris@gmail.com|6 |24 |
|sam |9876543210|sam@yahoo.com |3 |23 |
|sam |9876543210|sam@yahoo.com |3 |28 |
|ram |8765432190|ram@gmail.com |9 |27 |
+-----+----------+---------------+---+---+


blank check



val df_null_col = dup_cols.map( x => when(col(x).isNull or regexp_replace(col(x), """^s*$""","")=== lit(""),0).otherwise(1)).reduce( _ + _ )


Filter only when all 3 columns are either blank or null



val df = Seq(
(4,"karthi",26,"4321066666","karthi@gmail.com"),
(6,"haris",30,"6543210777","haris@gmail.com"),
(6,null,30,null,null),
(7,"ram",27,"8765432190","ram@gmail.com"),
(9,"",27,"",""),
(7,"ram",27,"8765432190","ram@gmail.com"),
(6,"haris",24,"6543210777","haris@gmail.com"),
(6,null,24,"6543210777",null),
(3,"sam",23,"9876543210","sam@yahoo.com"),
(3,null,23,"9876543210","sam@yahoo.com"),
(3,null,28,"9876543213",null),
(6,"haris",24,null,"haris@gmail.com")
).toDF("id","name","age","phone","email_id")

val all_cols = df.columns
val dup_cols = List("name","phone","email_id")
val rem_cols = all_cols.diff(dup_cols)
val dup_cols_str = dup_cols.mkString(",")
val rem_cols_str = rem_cols.mkString(",")
val dup_cols_length = dup_cols.length
//val df_null_col = dup_cols.map( x => when(col(x).isNull,0).otherwise(1)).reduce( _ + _ )
val df_null_col = dup_cols.map( x => when(col(x).isNull or regexp_replace(col(x),lit("""^s*$"""),lit("")) === lit(""),0).otherwise(1)).reduce( _ + _ )
val df_null = df.withColumn("null_count", df_null_col)
df_null.createOrReplaceTempView("contact")
df_null.show(false)

val dup_cols_count_qry = " count(*) over(partition by " + dup_cols_str + " ) as cnt "
val dup_cols_row_num_qry = " row_number() over(partition by " + dup_cols_str + " order by " + dup_cols_str + " ) as rwn "
//val df2 = spark.sql("select *,"+ dup_cols_count_qry + "," + dup_cols_row_num_qry + " from contact " + " where null_count = " + dup_cols_length )
val df2 = spark.sql("select *,"+ dup_cols_count_qry + "," + dup_cols_row_num_qry + " from contact " + " where null_count != 0 ")
df2.show(false)
df2.createOrReplaceTempView("contact2")
val df3 = spark.sql("select " + dup_cols_str + ", " + rem_cols_str + " from contact2 where cnt > 1 and rwn > 1")
df3.show(false)





share|improve this answer


























  • I am getting below exception at the time of spark-submit == SQL == select , count() over(partition by [condition: string] ) as cnt from contact -------------------------------------^^^ at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:217) at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:114)

    – stack0114104
    Nov 27 '18 at 7:41











  • seems dup_cols_qry is empty string.. check again

    – stack0114106
    Nov 27 '18 at 8:40











  • looks like you posted one more question and marked it as duplicate..

    – stack0114106
    Nov 27 '18 at 8:52











  • yes, now it's working... Thank you stack0114106... No that is not a duplicate question...another question explanation is "I am taking each column or required column to count number of unique and duplicate records.for more information read that question"

    – stack0114104
    Nov 27 '18 at 9:29













  • good..glad that it worked

    – stack0114106
    Nov 27 '18 at 9:32
















1














You can use window functions. Check this out



scala> val df = Seq((3,"sam",23,"9876543210","sam@yahoo.com"),(7,"ram",27,"8765432190","ram@gmail.com"),(3,"sam",28,"9876543210","sam@yahoo.com"),(6,"haris",30,"6543210777","haris@gmail.com"),(9,"ram",27,"8765432130","ram94@gmail.com"),(6,"haris",24,"6543210777","haris@gmail.com"),(4,"karthi",26,"4321066666","karthi@gmail.com")).toDF("id","name","age","phone","email_id")
df: org.apache.spark.sql.DataFrame = [id: int, name: string ... 3 more fields]

scala> val dup_cols = List("id","name","phone","email_id");
dup_cols: List[String] = List(id, name, phone, email_id)

scala> df.createOrReplaceTempView("contact")

scala> val dup_cols_qry = dup_cols.mkString(" count(*) over(partition by ", "," , " ) as cnt ")
dup_cols_qry: String = " count(*) over(partition by id,name,phone,email_id ) as cnt "

scala> val df2 = spark.sql("select *,"+ dup_cols_qry + " from contact ")
df2: org.apache.spark.sql.DataFrame = [id: int, name: string ... 4 more fields]

scala> df2.show(false)
+---+------+---+----------+----------------+---+
|id |name |age|phone |email_id |cnt|
+---+------+---+----------+----------------+---+
|4 |karthi|26 |4321066666|karthi@gmail.com|1 |
|7 |ram |27 |8765432190|ram@gmail.com |1 |
|9 |ram |27 |8765432130|ram94@gmail.com |1 |
|3 |sam |23 |9876543210|sam@yahoo.com |2 |
|3 |sam |28 |9876543210|sam@yahoo.com |2 |
|6 |haris |30 |6543210777|haris@gmail.com |2 |
|6 |haris |24 |6543210777|haris@gmail.com |2 |
+---+------+---+----------+----------------+---+


scala> df2.createOrReplaceTempView("contact2")


//Duplicates



scala>  spark.sql("select " + dup_cols.mkString(",") + " from contact2 where cnt = 2").show
+---+-----+----------+---------------+
| id| name| phone| email_id|
+---+-----+----------+---------------+
| 3| sam|9876543210| sam@yahoo.com|
| 3| sam|9876543210| sam@yahoo.com|
| 6|haris|6543210777|haris@gmail.com|
| 6|haris|6543210777|haris@gmail.com|
+---+-----+----------+---------------+


// Unique



scala>  spark.sql("select " + dup_cols.mkString(",") + " from contact2 where cnt = 1").show
+---+------+----------+----------------+
| id| name| phone| email_id|
+---+------+----------+----------------+
| 4|karthi|4321066666|karthi@gmail.com|
| 7| ram|8765432190| ram@gmail.com|
| 9| ram|8765432130| ram94@gmail.com|
+---+------+----------+----------------+


EDIT2:



val df = Seq(
(4,"karthi",26,"4321066666","karthi@gmail.com"),
(6,"haris",24,"6543210777","haris@gmail.com"),
(7,"ram",27,"8765432190","ram@gmail.com"),
(9,"ram",27,"8765432190","ram@gmail.com"),
(6,"haris",24,"6543210777","haris@gmail.com"),
(3,"sam",23,"9876543210","sam@yahoo.com"),
(3,"sam",23,"9876543210","sam@yahoo.com"),
(3,"sam",28,"9876543210","sam@yahoo.com"),
(6,"haris",30,"6543210777","haris@gmail.com")
).toDF("id","name","age","phone","email_id")

val dup_cols = List("name","phone","email_id")
val dup_cols_str = dup_cols.mkString(",")
df.createOrReplaceTempView("contact")
val dup_cols_count_qry = " count(*) over(partition by " + dup_cols_str + " ) as cnt "
val dup_cols_row_num_qry = " row_number() over(partition by " + dup_cols_str + " order by " + dup_cols_str + " ) as rwn "
val df2 = spark.sql("select *,"+ dup_cols_count_qry + "," + dup_cols_row_num_qry + " from contact ")
df2.show(false)
df2.createOrReplaceTempView("contact2")
spark.sql("select id, " + dup_cols_str + " from contact2 where cnt > 1 and rwn > 1").show


Results:



+---+-----+----------+---------------+
| id| name| phone| email_id|
+---+-----+----------+---------------+
| 6|haris|6543210777|haris@gmail.com|
| 6|haris|6543210777|haris@gmail.com|
| 3| sam|9876543210| sam@yahoo.com|
| 3| sam|9876543210| sam@yahoo.com|
| 9| ram|8765432190| ram@gmail.com|
+---+-----+----------+---------------+


EDIT3: - Null condition check



val df = Seq(
(4,"karthi",26,"4321066666","karthi@gmail.com"),
(6,"haris",30,"6543210777","haris@gmail.com"),
(6,"haris",30,null,"haris@gmail.com"),
(7,"ram",27,"8765432190","ram@gmail.com"),
(9,"ram",27,"8765432190","ram@gmail.com"),
(6,"haris",24,"6543210777","haris@gmail.com"),
(6,null,24,"6543210777",null),
(3,"sam",23,"9876543210","sam@yahoo.com"),
(3,"sam",23,"9876543210","sam@yahoo.com"),
(3,"sam",28,"9876543210","sam@yahoo.com"),
(6,"haris",24,"6543210777","haris@gmail.com")
).toDF("id","name","age","phone","email_id")

val all_cols = df.columns
val dup_cols = List("name","phone","email_id")
val rem_cols = all_cols.diff(dup_cols)
val dup_cols_str = dup_cols.mkString(",")
val rem_cols_str = rem_cols.mkString(",")
val dup_cols_length = dup_cols.length
val df_null_col = dup_cols.map( x => when(col(x).isNull,0).otherwise(1)).reduce( _ + _ )
val df_null = df.withColumn("null_count", df_null_col)
df_null.createOrReplaceTempView("contact")
df_null.show(false)

val dup_cols_count_qry = " count(*) over(partition by " + dup_cols_str + " ) as cnt "
val dup_cols_row_num_qry = " row_number() over(partition by " + dup_cols_str + " order by " + dup_cols_str + " ) as rwn "
val df2 = spark.sql("select *,"+ dup_cols_count_qry + "," + dup_cols_row_num_qry + " from contact " + " where null_count = " + dup_cols_length )
df2.show(false)
df2.createOrReplaceTempView("contact2")
val df3 = spark.sql("select " + dup_cols_str + ", " + rem_cols_str + " from contact2 where cnt > 1 and rwn > 1")
df3.show(false)


Results:



+---+------+---+----------+----------------+----------+
|id |name |age|phone |email_id |null_count|
+---+------+---+----------+----------------+----------+
|4 |karthi|26 |4321066666|karthi@gmail.com|3 |
|6 |haris |30 |6543210777|haris@gmail.com |3 |
|6 |haris |30 |null |haris@gmail.com |2 |
|7 |ram |27 |8765432190|ram@gmail.com |3 |
|9 |ram |27 |8765432190|ram@gmail.com |3 |
|6 |haris |24 |6543210777|haris@gmail.com |3 |
|6 |null |24 |6543210777|null |1 |
|3 |sam |23 |9876543210|sam@yahoo.com |3 |
|3 |sam |23 |9876543210|sam@yahoo.com |3 |
|3 |sam |28 |9876543210|sam@yahoo.com |3 |
|6 |haris |24 |6543210777|haris@gmail.com |3 |
+---+------+---+----------+----------------+----------+


|id |name |age|phone |email_id |null_count|cnt|rwn|
+---+------+---+----------+----------------+----------+---+---+
|6 |haris |30 |6543210777|haris@gmail.com |3 |3 |1 |
|6 |haris |24 |6543210777|haris@gmail.com |3 |3 |2 |
|6 |haris |24 |6543210777|haris@gmail.com |3 |3 |3 |
|3 |sam |23 |9876543210|sam@yahoo.com |3 |3 |1 |
|3 |sam |23 |9876543210|sam@yahoo.com |3 |3 |2 |
|3 |sam |28 |9876543210|sam@yahoo.com |3 |3 |3 |
|7 |ram |27 |8765432190|ram@gmail.com |3 |2 |1 |
|9 |ram |27 |8765432190|ram@gmail.com |3 |2 |2 |
|4 |karthi|26 |4321066666|karthi@gmail.com|3 |1 |1 |
+---+------+---+----------+----------------+----------+---+---+

+-----+----------+---------------+---+---+
|name |phone |email_id |id |age|
+-----+----------+---------------+---+---+
|haris|6543210777|haris@gmail.com|6 |24 |
|haris|6543210777|haris@gmail.com|6 |24 |
|sam |9876543210|sam@yahoo.com |3 |23 |
|sam |9876543210|sam@yahoo.com |3 |28 |
|ram |8765432190|ram@gmail.com |9 |27 |
+-----+----------+---------------+---+---+


blank check



val df_null_col = dup_cols.map( x => when(col(x).isNull or regexp_replace(col(x), """^s*$""","")=== lit(""),0).otherwise(1)).reduce( _ + _ )


Filter only when all 3 columns are either blank or null



val df = Seq(
(4,"karthi",26,"4321066666","karthi@gmail.com"),
(6,"haris",30,"6543210777","haris@gmail.com"),
(6,null,30,null,null),
(7,"ram",27,"8765432190","ram@gmail.com"),
(9,"",27,"",""),
(7,"ram",27,"8765432190","ram@gmail.com"),
(6,"haris",24,"6543210777","haris@gmail.com"),
(6,null,24,"6543210777",null),
(3,"sam",23,"9876543210","sam@yahoo.com"),
(3,null,23,"9876543210","sam@yahoo.com"),
(3,null,28,"9876543213",null),
(6,"haris",24,null,"haris@gmail.com")
).toDF("id","name","age","phone","email_id")

val all_cols = df.columns
val dup_cols = List("name","phone","email_id")
val rem_cols = all_cols.diff(dup_cols)
val dup_cols_str = dup_cols.mkString(",")
val rem_cols_str = rem_cols.mkString(",")
val dup_cols_length = dup_cols.length
//val df_null_col = dup_cols.map( x => when(col(x).isNull,0).otherwise(1)).reduce( _ + _ )
val df_null_col = dup_cols.map( x => when(col(x).isNull or regexp_replace(col(x),lit("""^s*$"""),lit("")) === lit(""),0).otherwise(1)).reduce( _ + _ )
val df_null = df.withColumn("null_count", df_null_col)
df_null.createOrReplaceTempView("contact")
df_null.show(false)

val dup_cols_count_qry = " count(*) over(partition by " + dup_cols_str + " ) as cnt "
val dup_cols_row_num_qry = " row_number() over(partition by " + dup_cols_str + " order by " + dup_cols_str + " ) as rwn "
//val df2 = spark.sql("select *,"+ dup_cols_count_qry + "," + dup_cols_row_num_qry + " from contact " + " where null_count = " + dup_cols_length )
val df2 = spark.sql("select *,"+ dup_cols_count_qry + "," + dup_cols_row_num_qry + " from contact " + " where null_count != 0 ")
df2.show(false)
df2.createOrReplaceTempView("contact2")
val df3 = spark.sql("select " + dup_cols_str + ", " + rem_cols_str + " from contact2 where cnt > 1 and rwn > 1")
df3.show(false)





share|improve this answer


























  • I am getting below exception at the time of spark-submit == SQL == select , count() over(partition by [condition: string] ) as cnt from contact -------------------------------------^^^ at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:217) at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:114)

    – stack0114104
    Nov 27 '18 at 7:41











  • seems dup_cols_qry is empty string.. check again

    – stack0114106
    Nov 27 '18 at 8:40











  • looks like you posted one more question and marked it as duplicate..

    – stack0114106
    Nov 27 '18 at 8:52











  • yes, now it's working... Thank you stack0114106... No that is not a duplicate question...another question explanation is "I am taking each column or required column to count number of unique and duplicate records.for more information read that question"

    – stack0114104
    Nov 27 '18 at 9:29













  • good..glad that it worked

    – stack0114106
    Nov 27 '18 at 9:32














1












1








1







You can use window functions. Check this out



scala> val df = Seq((3,"sam",23,"9876543210","sam@yahoo.com"),(7,"ram",27,"8765432190","ram@gmail.com"),(3,"sam",28,"9876543210","sam@yahoo.com"),(6,"haris",30,"6543210777","haris@gmail.com"),(9,"ram",27,"8765432130","ram94@gmail.com"),(6,"haris",24,"6543210777","haris@gmail.com"),(4,"karthi",26,"4321066666","karthi@gmail.com")).toDF("id","name","age","phone","email_id")
df: org.apache.spark.sql.DataFrame = [id: int, name: string ... 3 more fields]

scala> val dup_cols = List("id","name","phone","email_id");
dup_cols: List[String] = List(id, name, phone, email_id)

scala> df.createOrReplaceTempView("contact")

scala> val dup_cols_qry = dup_cols.mkString(" count(*) over(partition by ", "," , " ) as cnt ")
dup_cols_qry: String = " count(*) over(partition by id,name,phone,email_id ) as cnt "

scala> val df2 = spark.sql("select *,"+ dup_cols_qry + " from contact ")
df2: org.apache.spark.sql.DataFrame = [id: int, name: string ... 4 more fields]

scala> df2.show(false)
+---+------+---+----------+----------------+---+
|id |name |age|phone |email_id |cnt|
+---+------+---+----------+----------------+---+
|4 |karthi|26 |4321066666|karthi@gmail.com|1 |
|7 |ram |27 |8765432190|ram@gmail.com |1 |
|9 |ram |27 |8765432130|ram94@gmail.com |1 |
|3 |sam |23 |9876543210|sam@yahoo.com |2 |
|3 |sam |28 |9876543210|sam@yahoo.com |2 |
|6 |haris |30 |6543210777|haris@gmail.com |2 |
|6 |haris |24 |6543210777|haris@gmail.com |2 |
+---+------+---+----------+----------------+---+


scala> df2.createOrReplaceTempView("contact2")


//Duplicates



scala>  spark.sql("select " + dup_cols.mkString(",") + " from contact2 where cnt = 2").show
+---+-----+----------+---------------+
| id| name| phone| email_id|
+---+-----+----------+---------------+
| 3| sam|9876543210| sam@yahoo.com|
| 3| sam|9876543210| sam@yahoo.com|
| 6|haris|6543210777|haris@gmail.com|
| 6|haris|6543210777|haris@gmail.com|
+---+-----+----------+---------------+


// Unique



scala>  spark.sql("select " + dup_cols.mkString(",") + " from contact2 where cnt = 1").show
+---+------+----------+----------------+
| id| name| phone| email_id|
+---+------+----------+----------------+
| 4|karthi|4321066666|karthi@gmail.com|
| 7| ram|8765432190| ram@gmail.com|
| 9| ram|8765432130| ram94@gmail.com|
+---+------+----------+----------------+


EDIT2:



val df = Seq(
(4,"karthi",26,"4321066666","karthi@gmail.com"),
(6,"haris",24,"6543210777","haris@gmail.com"),
(7,"ram",27,"8765432190","ram@gmail.com"),
(9,"ram",27,"8765432190","ram@gmail.com"),
(6,"haris",24,"6543210777","haris@gmail.com"),
(3,"sam",23,"9876543210","sam@yahoo.com"),
(3,"sam",23,"9876543210","sam@yahoo.com"),
(3,"sam",28,"9876543210","sam@yahoo.com"),
(6,"haris",30,"6543210777","haris@gmail.com")
).toDF("id","name","age","phone","email_id")

val dup_cols = List("name","phone","email_id")
val dup_cols_str = dup_cols.mkString(",")
df.createOrReplaceTempView("contact")
val dup_cols_count_qry = " count(*) over(partition by " + dup_cols_str + " ) as cnt "
val dup_cols_row_num_qry = " row_number() over(partition by " + dup_cols_str + " order by " + dup_cols_str + " ) as rwn "
val df2 = spark.sql("select *,"+ dup_cols_count_qry + "," + dup_cols_row_num_qry + " from contact ")
df2.show(false)
df2.createOrReplaceTempView("contact2")
spark.sql("select id, " + dup_cols_str + " from contact2 where cnt > 1 and rwn > 1").show


Results:



+---+-----+----------+---------------+
| id| name| phone| email_id|
+---+-----+----------+---------------+
| 6|haris|6543210777|haris@gmail.com|
| 6|haris|6543210777|haris@gmail.com|
| 3| sam|9876543210| sam@yahoo.com|
| 3| sam|9876543210| sam@yahoo.com|
| 9| ram|8765432190| ram@gmail.com|
+---+-----+----------+---------------+


EDIT3: - Null condition check



val df = Seq(
(4,"karthi",26,"4321066666","karthi@gmail.com"),
(6,"haris",30,"6543210777","haris@gmail.com"),
(6,"haris",30,null,"haris@gmail.com"),
(7,"ram",27,"8765432190","ram@gmail.com"),
(9,"ram",27,"8765432190","ram@gmail.com"),
(6,"haris",24,"6543210777","haris@gmail.com"),
(6,null,24,"6543210777",null),
(3,"sam",23,"9876543210","sam@yahoo.com"),
(3,"sam",23,"9876543210","sam@yahoo.com"),
(3,"sam",28,"9876543210","sam@yahoo.com"),
(6,"haris",24,"6543210777","haris@gmail.com")
).toDF("id","name","age","phone","email_id")

val all_cols = df.columns
val dup_cols = List("name","phone","email_id")
val rem_cols = all_cols.diff(dup_cols)
val dup_cols_str = dup_cols.mkString(",")
val rem_cols_str = rem_cols.mkString(",")
val dup_cols_length = dup_cols.length
val df_null_col = dup_cols.map( x => when(col(x).isNull,0).otherwise(1)).reduce( _ + _ )
val df_null = df.withColumn("null_count", df_null_col)
df_null.createOrReplaceTempView("contact")
df_null.show(false)

val dup_cols_count_qry = " count(*) over(partition by " + dup_cols_str + " ) as cnt "
val dup_cols_row_num_qry = " row_number() over(partition by " + dup_cols_str + " order by " + dup_cols_str + " ) as rwn "
val df2 = spark.sql("select *,"+ dup_cols_count_qry + "," + dup_cols_row_num_qry + " from contact " + " where null_count = " + dup_cols_length )
df2.show(false)
df2.createOrReplaceTempView("contact2")
val df3 = spark.sql("select " + dup_cols_str + ", " + rem_cols_str + " from contact2 where cnt > 1 and rwn > 1")
df3.show(false)


Results:



+---+------+---+----------+----------------+----------+
|id |name |age|phone |email_id |null_count|
+---+------+---+----------+----------------+----------+
|4 |karthi|26 |4321066666|karthi@gmail.com|3 |
|6 |haris |30 |6543210777|haris@gmail.com |3 |
|6 |haris |30 |null |haris@gmail.com |2 |
|7 |ram |27 |8765432190|ram@gmail.com |3 |
|9 |ram |27 |8765432190|ram@gmail.com |3 |
|6 |haris |24 |6543210777|haris@gmail.com |3 |
|6 |null |24 |6543210777|null |1 |
|3 |sam |23 |9876543210|sam@yahoo.com |3 |
|3 |sam |23 |9876543210|sam@yahoo.com |3 |
|3 |sam |28 |9876543210|sam@yahoo.com |3 |
|6 |haris |24 |6543210777|haris@gmail.com |3 |
+---+------+---+----------+----------------+----------+


|id |name |age|phone |email_id |null_count|cnt|rwn|
+---+------+---+----------+----------------+----------+---+---+
|6 |haris |30 |6543210777|haris@gmail.com |3 |3 |1 |
|6 |haris |24 |6543210777|haris@gmail.com |3 |3 |2 |
|6 |haris |24 |6543210777|haris@gmail.com |3 |3 |3 |
|3 |sam |23 |9876543210|sam@yahoo.com |3 |3 |1 |
|3 |sam |23 |9876543210|sam@yahoo.com |3 |3 |2 |
|3 |sam |28 |9876543210|sam@yahoo.com |3 |3 |3 |
|7 |ram |27 |8765432190|ram@gmail.com |3 |2 |1 |
|9 |ram |27 |8765432190|ram@gmail.com |3 |2 |2 |
|4 |karthi|26 |4321066666|karthi@gmail.com|3 |1 |1 |
+---+------+---+----------+----------------+----------+---+---+

+-----+----------+---------------+---+---+
|name |phone |email_id |id |age|
+-----+----------+---------------+---+---+
|haris|6543210777|haris@gmail.com|6 |24 |
|haris|6543210777|haris@gmail.com|6 |24 |
|sam |9876543210|sam@yahoo.com |3 |23 |
|sam |9876543210|sam@yahoo.com |3 |28 |
|ram |8765432190|ram@gmail.com |9 |27 |
+-----+----------+---------------+---+---+


blank check



val df_null_col = dup_cols.map( x => when(col(x).isNull or regexp_replace(col(x), """^s*$""","")=== lit(""),0).otherwise(1)).reduce( _ + _ )


Filter only when all 3 columns are either blank or null



val df = Seq(
(4,"karthi",26,"4321066666","karthi@gmail.com"),
(6,"haris",30,"6543210777","haris@gmail.com"),
(6,null,30,null,null),
(7,"ram",27,"8765432190","ram@gmail.com"),
(9,"",27,"",""),
(7,"ram",27,"8765432190","ram@gmail.com"),
(6,"haris",24,"6543210777","haris@gmail.com"),
(6,null,24,"6543210777",null),
(3,"sam",23,"9876543210","sam@yahoo.com"),
(3,null,23,"9876543210","sam@yahoo.com"),
(3,null,28,"9876543213",null),
(6,"haris",24,null,"haris@gmail.com")
).toDF("id","name","age","phone","email_id")

val all_cols = df.columns
val dup_cols = List("name","phone","email_id")
val rem_cols = all_cols.diff(dup_cols)
val dup_cols_str = dup_cols.mkString(",")
val rem_cols_str = rem_cols.mkString(",")
val dup_cols_length = dup_cols.length
//val df_null_col = dup_cols.map( x => when(col(x).isNull,0).otherwise(1)).reduce( _ + _ )
val df_null_col = dup_cols.map( x => when(col(x).isNull or regexp_replace(col(x),lit("""^s*$"""),lit("")) === lit(""),0).otherwise(1)).reduce( _ + _ )
val df_null = df.withColumn("null_count", df_null_col)
df_null.createOrReplaceTempView("contact")
df_null.show(false)

val dup_cols_count_qry = " count(*) over(partition by " + dup_cols_str + " ) as cnt "
val dup_cols_row_num_qry = " row_number() over(partition by " + dup_cols_str + " order by " + dup_cols_str + " ) as rwn "
//val df2 = spark.sql("select *,"+ dup_cols_count_qry + "," + dup_cols_row_num_qry + " from contact " + " where null_count = " + dup_cols_length )
val df2 = spark.sql("select *,"+ dup_cols_count_qry + "," + dup_cols_row_num_qry + " from contact " + " where null_count != 0 ")
df2.show(false)
df2.createOrReplaceTempView("contact2")
val df3 = spark.sql("select " + dup_cols_str + ", " + rem_cols_str + " from contact2 where cnt > 1 and rwn > 1")
df3.show(false)





share|improve this answer















You can use window functions. Check this out



scala> val df = Seq((3,"sam",23,"9876543210","sam@yahoo.com"),(7,"ram",27,"8765432190","ram@gmail.com"),(3,"sam",28,"9876543210","sam@yahoo.com"),(6,"haris",30,"6543210777","haris@gmail.com"),(9,"ram",27,"8765432130","ram94@gmail.com"),(6,"haris",24,"6543210777","haris@gmail.com"),(4,"karthi",26,"4321066666","karthi@gmail.com")).toDF("id","name","age","phone","email_id")
df: org.apache.spark.sql.DataFrame = [id: int, name: string ... 3 more fields]

scala> val dup_cols = List("id","name","phone","email_id");
dup_cols: List[String] = List(id, name, phone, email_id)

scala> df.createOrReplaceTempView("contact")

scala> val dup_cols_qry = dup_cols.mkString(" count(*) over(partition by ", "," , " ) as cnt ")
dup_cols_qry: String = " count(*) over(partition by id,name,phone,email_id ) as cnt "

scala> val df2 = spark.sql("select *,"+ dup_cols_qry + " from contact ")
df2: org.apache.spark.sql.DataFrame = [id: int, name: string ... 4 more fields]

scala> df2.show(false)
+---+------+---+----------+----------------+---+
|id |name |age|phone |email_id |cnt|
+---+------+---+----------+----------------+---+
|4 |karthi|26 |4321066666|karthi@gmail.com|1 |
|7 |ram |27 |8765432190|ram@gmail.com |1 |
|9 |ram |27 |8765432130|ram94@gmail.com |1 |
|3 |sam |23 |9876543210|sam@yahoo.com |2 |
|3 |sam |28 |9876543210|sam@yahoo.com |2 |
|6 |haris |30 |6543210777|haris@gmail.com |2 |
|6 |haris |24 |6543210777|haris@gmail.com |2 |
+---+------+---+----------+----------------+---+


scala> df2.createOrReplaceTempView("contact2")


//Duplicates



scala>  spark.sql("select " + dup_cols.mkString(",") + " from contact2 where cnt = 2").show
+---+-----+----------+---------------+
| id| name| phone| email_id|
+---+-----+----------+---------------+
| 3| sam|9876543210| sam@yahoo.com|
| 3| sam|9876543210| sam@yahoo.com|
| 6|haris|6543210777|haris@gmail.com|
| 6|haris|6543210777|haris@gmail.com|
+---+-----+----------+---------------+


// Unique



scala>  spark.sql("select " + dup_cols.mkString(",") + " from contact2 where cnt = 1").show
+---+------+----------+----------------+
| id| name| phone| email_id|
+---+------+----------+----------------+
| 4|karthi|4321066666|karthi@gmail.com|
| 7| ram|8765432190| ram@gmail.com|
| 9| ram|8765432130| ram94@gmail.com|
+---+------+----------+----------------+


EDIT2:



val df = Seq(
(4,"karthi",26,"4321066666","karthi@gmail.com"),
(6,"haris",24,"6543210777","haris@gmail.com"),
(7,"ram",27,"8765432190","ram@gmail.com"),
(9,"ram",27,"8765432190","ram@gmail.com"),
(6,"haris",24,"6543210777","haris@gmail.com"),
(3,"sam",23,"9876543210","sam@yahoo.com"),
(3,"sam",23,"9876543210","sam@yahoo.com"),
(3,"sam",28,"9876543210","sam@yahoo.com"),
(6,"haris",30,"6543210777","haris@gmail.com")
).toDF("id","name","age","phone","email_id")

val dup_cols = List("name","phone","email_id")
val dup_cols_str = dup_cols.mkString(",")
df.createOrReplaceTempView("contact")
val dup_cols_count_qry = " count(*) over(partition by " + dup_cols_str + " ) as cnt "
val dup_cols_row_num_qry = " row_number() over(partition by " + dup_cols_str + " order by " + dup_cols_str + " ) as rwn "
val df2 = spark.sql("select *,"+ dup_cols_count_qry + "," + dup_cols_row_num_qry + " from contact ")
df2.show(false)
df2.createOrReplaceTempView("contact2")
spark.sql("select id, " + dup_cols_str + " from contact2 where cnt > 1 and rwn > 1").show


Results:



+---+-----+----------+---------------+
| id| name| phone| email_id|
+---+-----+----------+---------------+
| 6|haris|6543210777|haris@gmail.com|
| 6|haris|6543210777|haris@gmail.com|
| 3| sam|9876543210| sam@yahoo.com|
| 3| sam|9876543210| sam@yahoo.com|
| 9| ram|8765432190| ram@gmail.com|
+---+-----+----------+---------------+


EDIT3: - Null condition check



val df = Seq(
(4,"karthi",26,"4321066666","karthi@gmail.com"),
(6,"haris",30,"6543210777","haris@gmail.com"),
(6,"haris",30,null,"haris@gmail.com"),
(7,"ram",27,"8765432190","ram@gmail.com"),
(9,"ram",27,"8765432190","ram@gmail.com"),
(6,"haris",24,"6543210777","haris@gmail.com"),
(6,null,24,"6543210777",null),
(3,"sam",23,"9876543210","sam@yahoo.com"),
(3,"sam",23,"9876543210","sam@yahoo.com"),
(3,"sam",28,"9876543210","sam@yahoo.com"),
(6,"haris",24,"6543210777","haris@gmail.com")
).toDF("id","name","age","phone","email_id")

val all_cols = df.columns
val dup_cols = List("name","phone","email_id")
val rem_cols = all_cols.diff(dup_cols)
val dup_cols_str = dup_cols.mkString(",")
val rem_cols_str = rem_cols.mkString(",")
val dup_cols_length = dup_cols.length
val df_null_col = dup_cols.map( x => when(col(x).isNull,0).otherwise(1)).reduce( _ + _ )
val df_null = df.withColumn("null_count", df_null_col)
df_null.createOrReplaceTempView("contact")
df_null.show(false)

val dup_cols_count_qry = " count(*) over(partition by " + dup_cols_str + " ) as cnt "
val dup_cols_row_num_qry = " row_number() over(partition by " + dup_cols_str + " order by " + dup_cols_str + " ) as rwn "
val df2 = spark.sql("select *,"+ dup_cols_count_qry + "," + dup_cols_row_num_qry + " from contact " + " where null_count = " + dup_cols_length )
df2.show(false)
df2.createOrReplaceTempView("contact2")
val df3 = spark.sql("select " + dup_cols_str + ", " + rem_cols_str + " from contact2 where cnt > 1 and rwn > 1")
df3.show(false)


Results:



+---+------+---+----------+----------------+----------+
|id |name |age|phone |email_id |null_count|
+---+------+---+----------+----------------+----------+
|4 |karthi|26 |4321066666|karthi@gmail.com|3 |
|6 |haris |30 |6543210777|haris@gmail.com |3 |
|6 |haris |30 |null |haris@gmail.com |2 |
|7 |ram |27 |8765432190|ram@gmail.com |3 |
|9 |ram |27 |8765432190|ram@gmail.com |3 |
|6 |haris |24 |6543210777|haris@gmail.com |3 |
|6 |null |24 |6543210777|null |1 |
|3 |sam |23 |9876543210|sam@yahoo.com |3 |
|3 |sam |23 |9876543210|sam@yahoo.com |3 |
|3 |sam |28 |9876543210|sam@yahoo.com |3 |
|6 |haris |24 |6543210777|haris@gmail.com |3 |
+---+------+---+----------+----------------+----------+


|id |name |age|phone |email_id |null_count|cnt|rwn|
+---+------+---+----------+----------------+----------+---+---+
|6 |haris |30 |6543210777|haris@gmail.com |3 |3 |1 |
|6 |haris |24 |6543210777|haris@gmail.com |3 |3 |2 |
|6 |haris |24 |6543210777|haris@gmail.com |3 |3 |3 |
|3 |sam |23 |9876543210|sam@yahoo.com |3 |3 |1 |
|3 |sam |23 |9876543210|sam@yahoo.com |3 |3 |2 |
|3 |sam |28 |9876543210|sam@yahoo.com |3 |3 |3 |
|7 |ram |27 |8765432190|ram@gmail.com |3 |2 |1 |
|9 |ram |27 |8765432190|ram@gmail.com |3 |2 |2 |
|4 |karthi|26 |4321066666|karthi@gmail.com|3 |1 |1 |
+---+------+---+----------+----------------+----------+---+---+

+-----+----------+---------------+---+---+
|name |phone |email_id |id |age|
+-----+----------+---------------+---+---+
|haris|6543210777|haris@gmail.com|6 |24 |
|haris|6543210777|haris@gmail.com|6 |24 |
|sam |9876543210|sam@yahoo.com |3 |23 |
|sam |9876543210|sam@yahoo.com |3 |28 |
|ram |8765432190|ram@gmail.com |9 |27 |
+-----+----------+---------------+---+---+


blank check



val df_null_col = dup_cols.map( x => when(col(x).isNull or regexp_replace(col(x), """^s*$""","")=== lit(""),0).otherwise(1)).reduce( _ + _ )


Filter only when all 3 columns are either blank or null



val df = Seq(
(4,"karthi",26,"4321066666","karthi@gmail.com"),
(6,"haris",30,"6543210777","haris@gmail.com"),
(6,null,30,null,null),
(7,"ram",27,"8765432190","ram@gmail.com"),
(9,"",27,"",""),
(7,"ram",27,"8765432190","ram@gmail.com"),
(6,"haris",24,"6543210777","haris@gmail.com"),
(6,null,24,"6543210777",null),
(3,"sam",23,"9876543210","sam@yahoo.com"),
(3,null,23,"9876543210","sam@yahoo.com"),
(3,null,28,"9876543213",null),
(6,"haris",24,null,"haris@gmail.com")
).toDF("id","name","age","phone","email_id")

val all_cols = df.columns
val dup_cols = List("name","phone","email_id")
val rem_cols = all_cols.diff(dup_cols)
val dup_cols_str = dup_cols.mkString(",")
val rem_cols_str = rem_cols.mkString(",")
val dup_cols_length = dup_cols.length
//val df_null_col = dup_cols.map( x => when(col(x).isNull,0).otherwise(1)).reduce( _ + _ )
val df_null_col = dup_cols.map( x => when(col(x).isNull or regexp_replace(col(x),lit("""^s*$"""),lit("")) === lit(""),0).otherwise(1)).reduce( _ + _ )
val df_null = df.withColumn("null_count", df_null_col)
df_null.createOrReplaceTempView("contact")
df_null.show(false)

val dup_cols_count_qry = " count(*) over(partition by " + dup_cols_str + " ) as cnt "
val dup_cols_row_num_qry = " row_number() over(partition by " + dup_cols_str + " order by " + dup_cols_str + " ) as rwn "
//val df2 = spark.sql("select *,"+ dup_cols_count_qry + "," + dup_cols_row_num_qry + " from contact " + " where null_count = " + dup_cols_length )
val df2 = spark.sql("select *,"+ dup_cols_count_qry + "," + dup_cols_row_num_qry + " from contact " + " where null_count != 0 ")
df2.show(false)
df2.createOrReplaceTempView("contact2")
val df3 = spark.sql("select " + dup_cols_str + ", " + rem_cols_str + " from contact2 where cnt > 1 and rwn > 1")
df3.show(false)






share|improve this answer














share|improve this answer



share|improve this answer








edited Dec 14 '18 at 9:30

























answered Nov 22 '18 at 6:53









stack0114106stack0114106

4,9632423




4,9632423













  • I am getting below exception at the time of spark-submit == SQL == select , count() over(partition by [condition: string] ) as cnt from contact -------------------------------------^^^ at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:217) at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:114)

    – stack0114104
    Nov 27 '18 at 7:41











  • seems dup_cols_qry is empty string.. check again

    – stack0114106
    Nov 27 '18 at 8:40











  • looks like you posted one more question and marked it as duplicate..

    – stack0114106
    Nov 27 '18 at 8:52











  • yes, now it's working... Thank you stack0114106... No that is not a duplicate question...another question explanation is "I am taking each column or required column to count number of unique and duplicate records.for more information read that question"

    – stack0114104
    Nov 27 '18 at 9:29













  • good..glad that it worked

    – stack0114106
    Nov 27 '18 at 9:32



















  • I am getting below exception at the time of spark-submit == SQL == select , count() over(partition by [condition: string] ) as cnt from contact -------------------------------------^^^ at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:217) at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:114)

    – stack0114104
    Nov 27 '18 at 7:41











  • seems dup_cols_qry is empty string.. check again

    – stack0114106
    Nov 27 '18 at 8:40











  • looks like you posted one more question and marked it as duplicate..

    – stack0114106
    Nov 27 '18 at 8:52











  • yes, now it's working... Thank you stack0114106... No that is not a duplicate question...another question explanation is "I am taking each column or required column to count number of unique and duplicate records.for more information read that question"

    – stack0114104
    Nov 27 '18 at 9:29













  • good..glad that it worked

    – stack0114106
    Nov 27 '18 at 9:32

















I am getting below exception at the time of spark-submit == SQL == select , count() over(partition by [condition: string] ) as cnt from contact -------------------------------------^^^ at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:217) at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:114)

– stack0114104
Nov 27 '18 at 7:41





I am getting below exception at the time of spark-submit == SQL == select , count() over(partition by [condition: string] ) as cnt from contact -------------------------------------^^^ at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:217) at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:114)

– stack0114104
Nov 27 '18 at 7:41













seems dup_cols_qry is empty string.. check again

– stack0114106
Nov 27 '18 at 8:40





seems dup_cols_qry is empty string.. check again

– stack0114106
Nov 27 '18 at 8:40













looks like you posted one more question and marked it as duplicate..

– stack0114106
Nov 27 '18 at 8:52





looks like you posted one more question and marked it as duplicate..

– stack0114106
Nov 27 '18 at 8:52













yes, now it's working... Thank you stack0114106... No that is not a duplicate question...another question explanation is "I am taking each column or required column to count number of unique and duplicate records.for more information read that question"

– stack0114104
Nov 27 '18 at 9:29







yes, now it's working... Thank you stack0114106... No that is not a duplicate question...another question explanation is "I am taking each column or required column to count number of unique and duplicate records.for more information read that question"

– stack0114104
Nov 27 '18 at 9:29















good..glad that it worked

– stack0114106
Nov 27 '18 at 9:32





good..glad that it worked

– stack0114106
Nov 27 '18 at 9:32













0














You need to give comma separated col names.



col1 ..col2 should be of string type.
val window= Window.partitionBy(col1,col2,..)


findDuplicateRecordsDF.withColumn("count", count("*")
.over(window)
.where($"count">1)
.show()





share|improve this answer


























  • the input contain N no of columns .. Its dynamic value

    – stack0114104
    Nov 22 '18 at 5:56
















0














You need to give comma separated col names.



col1 ..col2 should be of string type.
val window= Window.partitionBy(col1,col2,..)


findDuplicateRecordsDF.withColumn("count", count("*")
.over(window)
.where($"count">1)
.show()





share|improve this answer


























  • the input contain N no of columns .. Its dynamic value

    – stack0114104
    Nov 22 '18 at 5:56














0












0








0







You need to give comma separated col names.



col1 ..col2 should be of string type.
val window= Window.partitionBy(col1,col2,..)


findDuplicateRecordsDF.withColumn("count", count("*")
.over(window)
.where($"count">1)
.show()





share|improve this answer















You need to give comma separated col names.



col1 ..col2 should be of string type.
val window= Window.partitionBy(col1,col2,..)


findDuplicateRecordsDF.withColumn("count", count("*")
.over(window)
.where($"count">1)
.show()






share|improve this answer














share|improve this answer



share|improve this answer








edited Nov 22 '18 at 6:33

























answered Nov 22 '18 at 5:39









Taha NaqviTaha Naqvi

1,134720




1,134720













  • the input contain N no of columns .. Its dynamic value

    – stack0114104
    Nov 22 '18 at 5:56



















  • the input contain N no of columns .. Its dynamic value

    – stack0114104
    Nov 22 '18 at 5:56

















the input contain N no of columns .. Its dynamic value

– stack0114104
Nov 22 '18 at 5:56





the input contain N no of columns .. Its dynamic value

– stack0114104
Nov 22 '18 at 5:56


















draft saved

draft discarded




















































Thanks for contributing an answer to Stack Overflow!


  • Please be sure to answer the question. Provide details and share your research!

But avoid



  • Asking for help, clarification, or responding to other answers.

  • Making statements based on opinion; back them up with references or personal experience.


To learn more, see our tips on writing great answers.




draft saved


draft discarded














StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53424011%2fhow-to-split-dataset-to-two-datasets-with-unique-and-duplicate-rows-each%23new-answer', 'question_page');
}
);

Post as a guest















Required, but never shown





















































Required, but never shown














Required, but never shown












Required, but never shown







Required, but never shown

































Required, but never shown














Required, but never shown












Required, but never shown







Required, but never shown







Popular posts from this blog

How to change which sound is reproduced for terminal bell?

Title Spacing in Bjornstrup Chapter, Removing Chapter Number From Contents

Can I use Tabulator js library in my java Spring + Thymeleaf project?