How to split dataset to two datasets with unique and duplicate rows each?
I want to extract duplicate records from a Spark Scala DataFrame. For example, I want to find duplicates based on 3 columns such as "id", "name", and "age". The set of columns in the condition is dynamic input, so it can contain any number of columns. Based on the values in those columns, I want to extract the duplicate records.
Below is the code I have tried. It handles only one column; I don't know how to do it for more than one column.
My code:
var s= "age|id|name " // Note- This is dynamic input. so it will increase or decrease
var columnNames= s.replace('|', ',')
val findDuplicateRecordsDF= spark.sql("SELECT * FROM " + dbname + "." + tablename)
findDuplicateRecordsDF.show()
findDuplicateRecordsDF.withColumn("count", count("*")
.over(Window.partitionBy($"id"))) // here how to add more than one column?(Dynamic input)
.where($"count">1)
.show()
Input Dataframe: (findDuplicateRecordsDF.show())
--------------------------------------------------------
| id | name | age | phone | email_id |
|-------------------------------------------------------|
| 3 | sam | 23 | 9876543210 | sam@yahoo.com |
| 7 | ram | 27 | 8765432190 | ram@gmail.com |
| 3 | sam | 28 | 9876543210 | sam@yahoo.com |
| 6 | haris | 30 | 6543210777 | haris@gmail.com |
| 9 | ram | 27 | 8765432130 | ram94@gmail.com |
| 6 | haris | 24 | 6543210777 | haris@gmail.com |
| 4 | karthi| 26 | 4321066666 | karthi@gmail.com |
--------------------------------------------------------
Here I want to extract duplicate records based on 4 columns (id, name, phone, email_id). The above is an example DataFrame; the original DataFrame can contain any number of columns.
The output DataFrames should be:
Duplicate records output:
--------------------------------------------------------
| id | name | age | phone | email_id |
|-------------------------------------------------------|
| 3 | sam | 23 | 9876543210 | sam@yahoo.com |
| 3 | sam | 28 | 9876543210 | sam@yahoo.com |
| 6 | haris | 30 | 6543210777 | haris@gmail.com |
| 6 | haris | 24 | 6543210777 | haris@gmail.com |
--------------------------------------------------------
Unique Records Dataframe Output:
--------------------------------------------------------
| id | name | age | phone | email_id |
|-------------------------------------------------------|
| 7 | ram | 27 | 8765432190 | ram@gmail.com |
| 9 | ram | 27 | 8765432130 | ram94@gmail.com |
| 4 | karthi| 26 | 4321066666 | karthi@gmail.com |
--------------------------------------------------------
Thanks in advance.
scala apache-spark apache-spark-sql
You can specify a comma-separated list of columns in partitionBy().
– vindev
Nov 22 '18 at 5:40
edited Nov 25 '18 at 19:42 by Jacek Laskowski
asked Nov 22 '18 at 4:42 by stack0114104
2 Answers
You can use window functions. Check this out:
scala> val df = Seq((3,"sam",23,"9876543210","sam@yahoo.com"),(7,"ram",27,"8765432190","ram@gmail.com"),(3,"sam",28,"9876543210","sam@yahoo.com"),(6,"haris",30,"6543210777","haris@gmail.com"),(9,"ram",27,"8765432130","ram94@gmail.com"),(6,"haris",24,"6543210777","haris@gmail.com"),(4,"karthi",26,"4321066666","karthi@gmail.com")).toDF("id","name","age","phone","email_id")
df: org.apache.spark.sql.DataFrame = [id: int, name: string ... 3 more fields]
scala> val dup_cols = List("id","name","phone","email_id");
dup_cols: List[String] = List(id, name, phone, email_id)
scala> df.createOrReplaceTempView("contact")
scala> val dup_cols_qry = dup_cols.mkString(" count(*) over(partition by ", "," , " ) as cnt ")
dup_cols_qry: String = " count(*) over(partition by id,name,phone,email_id ) as cnt "
scala> val df2 = spark.sql("select *,"+ dup_cols_qry + " from contact ")
df2: org.apache.spark.sql.DataFrame = [id: int, name: string ... 4 more fields]
scala> df2.show(false)
+---+------+---+----------+----------------+---+
|id |name |age|phone |email_id |cnt|
+---+------+---+----------+----------------+---+
|4 |karthi|26 |4321066666|karthi@gmail.com|1 |
|7 |ram |27 |8765432190|ram@gmail.com |1 |
|9 |ram |27 |8765432130|ram94@gmail.com |1 |
|3 |sam |23 |9876543210|sam@yahoo.com |2 |
|3 |sam |28 |9876543210|sam@yahoo.com |2 |
|6 |haris |30 |6543210777|haris@gmail.com |2 |
|6 |haris |24 |6543210777|haris@gmail.com |2 |
+---+------+---+----------+----------------+---+
scala> df2.createOrReplaceTempView("contact2")
//Duplicates
scala> spark.sql("select " + dup_cols.mkString(",") + " from contact2 where cnt > 1").show
+---+-----+----------+---------------+
| id| name| phone| email_id|
+---+-----+----------+---------------+
| 3| sam|9876543210| sam@yahoo.com|
| 3| sam|9876543210| sam@yahoo.com|
| 6|haris|6543210777|haris@gmail.com|
| 6|haris|6543210777|haris@gmail.com|
+---+-----+----------+---------------+
// Unique
scala> spark.sql("select " + dup_cols.mkString(",") + " from contact2 where cnt = 1").show
+---+------+----------+----------------+
| id| name| phone| email_id|
+---+------+----------+----------------+
| 4|karthi|4321066666|karthi@gmail.com|
| 7| ram|8765432190| ram@gmail.com|
| 9| ram|8765432130| ram94@gmail.com|
+---+------+----------+----------------+
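The same split can also be sketched with the DataFrame API instead of concatenated SQL strings. This is a minimal sketch and not part of the original answer: it assumes a local SparkSession, and the names dupCols, byKey, counted, duplicates, and uniques are illustrative. Unlike the queries above, it keeps every original column (including age) in both outputs, which is what the question asks for.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, count, lit}

// Assumption: a local session; in spark-shell getOrCreate() returns the existing one.
val spark = SparkSession.builder().master("local[*]").appName("split-duplicates").getOrCreate()
import spark.implicits._

val df = Seq(
  (3, "sam", 23, "9876543210", "sam@yahoo.com"),
  (7, "ram", 27, "8765432190", "ram@gmail.com"),
  (3, "sam", 28, "9876543210", "sam@yahoo.com"),
  (6, "haris", 30, "6543210777", "haris@gmail.com"),
  (9, "ram", 27, "8765432130", "ram94@gmail.com"),
  (6, "haris", 24, "6543210777", "haris@gmail.com"),
  (4, "karthi", 26, "4321066666", "karthi@gmail.com")
).toDF("id", "name", "age", "phone", "email_id")

// Dynamic input: any number of key columns.
val dupCols = Seq("id", "name", "phone", "email_id")

// partitionBy accepts a varargs list of Columns, so the list can be expanded with ": _*".
val byKey = Window.partitionBy(dupCols.map(col): _*)

// Attach the size of each key group to every row.
val counted = df.withColumn("cnt", count(lit(1)).over(byKey))

// Rows whose key occurs more than once, and rows whose key occurs exactly once.
val duplicates = counted.where(col("cnt") > 1).drop("cnt")
val uniques = counted.where(col("cnt") === 1).drop("cnt")

duplicates.show(false)
uniques.show(false)
```

With the sample data, duplicates should contain the four sam and haris rows and uniques the three remaining rows.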
EDIT2:
val df = Seq(
(4,"karthi",26,"4321066666","karthi@gmail.com"),
(6,"haris",24,"6543210777","haris@gmail.com"),
(7,"ram",27,"8765432190","ram@gmail.com"),
(9,"ram",27,"8765432190","ram@gmail.com"),
(6,"haris",24,"6543210777","haris@gmail.com"),
(3,"sam",23,"9876543210","sam@yahoo.com"),
(3,"sam",23,"9876543210","sam@yahoo.com"),
(3,"sam",28,"9876543210","sam@yahoo.com"),
(6,"haris",30,"6543210777","haris@gmail.com")
).toDF("id","name","age","phone","email_id")
val dup_cols = List("name","phone","email_id")
val dup_cols_str = dup_cols.mkString(",")
df.createOrReplaceTempView("contact")
val dup_cols_count_qry = " count(*) over(partition by " + dup_cols_str + " ) as cnt "
val dup_cols_row_num_qry = " row_number() over(partition by " + dup_cols_str + " order by " + dup_cols_str + " ) as rwn "
val df2 = spark.sql("select *,"+ dup_cols_count_qry + "," + dup_cols_row_num_qry + " from contact ")
df2.show(false)
df2.createOrReplaceTempView("contact2")
spark.sql("select id, " + dup_cols_str + " from contact2 where cnt > 1 and rwn > 1").show
Results:
+---+-----+----------+---------------+
| id| name| phone| email_id|
+---+-----+----------+---------------+
| 6|haris|6543210777|haris@gmail.com|
| 6|haris|6543210777|haris@gmail.com|
| 3| sam|9876543210| sam@yahoo.com|
| 3| sam|9876543210| sam@yahoo.com|
| 9| ram|8765432190| ram@gmail.com|
+---+-----+----------+---------------+
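The row_number idea in EDIT2 can be sketched with the DataFrame API as well. This is a hedged sketch under a few assumptions: a local SparkSession, illustrative names (numbered, firstOfEachKey, extraCopies), and an ordering by id and age inside each group, which is my own choice to make the numbering repeatable (ordering by the partition columns themselves leaves the order within a group arbitrary). Any row with rwn > 1 necessarily belongs to a group with more than one row, so a separate cnt > 1 filter is not needed here.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

// Assumption: a local session; in spark-shell getOrCreate() returns the existing one.
val spark = SparkSession.builder().master("local[*]").appName("extra-copies").getOrCreate()
import spark.implicits._

val df = Seq(
  (4, "karthi", 26, "4321066666", "karthi@gmail.com"),
  (6, "haris", 24, "6543210777", "haris@gmail.com"),
  (7, "ram", 27, "8765432190", "ram@gmail.com"),
  (9, "ram", 27, "8765432190", "ram@gmail.com"),
  (6, "haris", 24, "6543210777", "haris@gmail.com"),
  (3, "sam", 23, "9876543210", "sam@yahoo.com"),
  (3, "sam", 23, "9876543210", "sam@yahoo.com"),
  (3, "sam", 28, "9876543210", "sam@yahoo.com"),
  (6, "haris", 30, "6543210777", "haris@gmail.com")
).toDF("id", "name", "age", "phone", "email_id")

val dupCols = Seq("name", "phone", "email_id")

// Number the rows inside each key group; id and age act as a tiebreaker (an assumption).
val w = Window.partitionBy(dupCols.map(col): _*).orderBy(col("id"), col("age"))
val numbered = df.withColumn("rwn", row_number().over(w))

// The first row of every key, and every additional copy beyond the first.
val firstOfEachKey = numbered.where(col("rwn") === 1).drop("rwn")
val extraCopies = numbered.where(col("rwn") > 1).drop("rwn")

firstOfEachKey.show(false)
extraCopies.show(false)
```

Here firstOfEachKey behaves like a de-duplicated view of the data, while extraCopies corresponds to the rows the EDIT2 query selects.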
EDIT3: - Null condition check
val df = Seq(
(4,"karthi",26,"4321066666","karthi@gmail.com"),
(6,"haris",30,"6543210777","haris@gmail.com"),
(6,"haris",30,null,"haris@gmail.com"),
(7,"ram",27,"8765432190","ram@gmail.com"),
(9,"ram",27,"8765432190","ram@gmail.com"),
(6,"haris",24,"6543210777","haris@gmail.com"),
(6,null,24,"6543210777",null),
(3,"sam",23,"9876543210","sam@yahoo.com"),
(3,"sam",23,"9876543210","sam@yahoo.com"),
(3,"sam",28,"9876543210","sam@yahoo.com"),
(6,"haris",24,"6543210777","haris@gmail.com")
).toDF("id","name","age","phone","email_id")
val all_cols = df.columns
val dup_cols = List("name","phone","email_id")
val rem_cols = all_cols.diff(dup_cols)
val dup_cols_str = dup_cols.mkString(",")
val rem_cols_str = rem_cols.mkString(",")
val dup_cols_length = dup_cols.length
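// Note: despite its name, null_count holds the number of NON-null duplicate-key columns in each row.
val df_null_col = dup_cols.map( x => when(col(x).isNull,0).otherwise(1)).reduce( _ + _ )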
val df_null = df.withColumn("null_count", df_null_col)
df_null.createOrReplaceTempView("contact")
df_null.show(false)
val dup_cols_count_qry = " count(*) over(partition by " + dup_cols_str + " ) as cnt "
val dup_cols_row_num_qry = " row_number() over(partition by " + dup_cols_str + " order by " + dup_cols_str + " ) as rwn "
val df2 = spark.sql("select *,"+ dup_cols_count_qry + "," + dup_cols_row_num_qry + " from contact " + " where null_count = " + dup_cols_length )
df2.show(false)
df2.createOrReplaceTempView("contact2")
val df3 = spark.sql("select " + dup_cols_str + ", " + rem_cols_str + " from contact2 where cnt > 1 and rwn > 1")
df3.show(false)
Results:
+---+------+---+----------+----------------+----------+
|id |name |age|phone |email_id |null_count|
+---+------+---+----------+----------------+----------+
|4 |karthi|26 |4321066666|karthi@gmail.com|3 |
|6 |haris |30 |6543210777|haris@gmail.com |3 |
|6 |haris |30 |null |haris@gmail.com |2 |
|7 |ram |27 |8765432190|ram@gmail.com |3 |
|9 |ram |27 |8765432190|ram@gmail.com |3 |
|6 |haris |24 |6543210777|haris@gmail.com |3 |
|6 |null |24 |6543210777|null |1 |
|3 |sam |23 |9876543210|sam@yahoo.com |3 |
|3 |sam |23 |9876543210|sam@yahoo.com |3 |
|3 |sam |28 |9876543210|sam@yahoo.com |3 |
|6 |haris |24 |6543210777|haris@gmail.com |3 |
+---+------+---+----------+----------------+----------+
+---+------+---+----------+----------------+----------+---+---+
|id |name |age|phone |email_id |null_count|cnt|rwn|
+---+------+---+----------+----------------+----------+---+---+
|6 |haris |30 |6543210777|haris@gmail.com |3 |3 |1 |
|6 |haris |24 |6543210777|haris@gmail.com |3 |3 |2 |
|6 |haris |24 |6543210777|haris@gmail.com |3 |3 |3 |
|3 |sam |23 |9876543210|sam@yahoo.com |3 |3 |1 |
|3 |sam |23 |9876543210|sam@yahoo.com |3 |3 |2 |
|3 |sam |28 |9876543210|sam@yahoo.com |3 |3 |3 |
|7 |ram |27 |8765432190|ram@gmail.com |3 |2 |1 |
|9 |ram |27 |8765432190|ram@gmail.com |3 |2 |2 |
|4 |karthi|26 |4321066666|karthi@gmail.com|3 |1 |1 |
+---+------+---+----------+----------------+----------+---+---+
+-----+----------+---------------+---+---+
|name |phone |email_id |id |age|
+-----+----------+---------------+---+---+
|haris|6543210777|haris@gmail.com|6 |24 |
|haris|6543210777|haris@gmail.com|6 |24 |
|sam |9876543210|sam@yahoo.com |3 |23 |
|sam |9876543210|sam@yahoo.com |3 |28 |
|ram |8765432190|ram@gmail.com |9 |27 |
+-----+----------+---------------+---+---+
Blank check:
val df_null_col = dup_cols.map( x => when(col(x).isNull or regexp_replace(col(x), """^\s*$""","")=== lit(""),0).otherwise(1)).reduce( _ + _ )
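As an illustration of the blank check, the sketch below counts, per row, how many key columns are neither null nor blank. It is not the answer's code: it swaps the regular expression for trim (which removes leading and trailing spaces), assumes a local SparkSession, and uses a small made-up data set and the illustrative column name non_blank_keys.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, trim, when}

// Assumption: a local session; in spark-shell getOrCreate() returns the existing one.
val spark = SparkSession.builder().master("local[*]").appName("blank-check").getOrCreate()
import spark.implicits._

// Hypothetical sample rows; the explicit type lets null stand in for missing strings.
val rows: Seq[(Int, String, String, String)] = Seq(
  (1, "sam", "9876543210", "sam@yahoo.com"), // all three keys present
  (2, null, "6543210777", null),             // one key present
  (3, "  ", "", ""),                         // only blanks
  (4, null, null, null)                      // only nulls
)
val df = rows.toDF("id", "name", "phone", "email_id")

val dupCols = Seq("name", "phone", "email_id")

// 0 for a null or blank value, 1 otherwise, summed over the key columns.
val nonBlankKeys = dupCols
  .map(c => when(col(c).isNull || trim(col(c)) === "", 0).otherwise(1))
  .reduce(_ + _)

val withCount = df.withColumn("non_blank_keys", nonBlankKeys)

// Keep rows with at least one usable key, or only rows where every key is usable.
val anyKeyPresent = withCount.where(col("non_blank_keys") > 0)
val allKeysPresent = withCount.where(col("non_blank_keys") === dupCols.length)

withCount.show(false)
```

Filtering on non_blank_keys > 0 mirrors the "null_count != 0" condition, and comparing with dupCols.length mirrors the stricter "null_count = dup_cols_length" condition.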
Filter out a row only when all 3 columns are either blank or null:
val df = Seq(
(4,"karthi",26,"4321066666","karthi@gmail.com"),
(6,"haris",30,"6543210777","haris@gmail.com"),
(6,null,30,null,null),
(7,"ram",27,"8765432190","ram@gmail.com"),
(9,"",27,"",""),
(7,"ram",27,"8765432190","ram@gmail.com"),
(6,"haris",24,"6543210777","haris@gmail.com"),
(6,null,24,"6543210777",null),
(3,"sam",23,"9876543210","sam@yahoo.com"),
(3,null,23,"9876543210","sam@yahoo.com"),
(3,null,28,"9876543213",null),
(6,"haris",24,null,"haris@gmail.com")
).toDF("id","name","age","phone","email_id")
val all_cols = df.columns
val dup_cols = List("name","phone","email_id")
val rem_cols = all_cols.diff(dup_cols)
val dup_cols_str = dup_cols.mkString(",")
val rem_cols_str = rem_cols.mkString(",")
val dup_cols_length = dup_cols.length
//val df_null_col = dup_cols.map( x => when(col(x).isNull,0).otherwise(1)).reduce( _ + _ )
val df_null_col = dup_cols.map( x => when(col(x).isNull or regexp_replace(col(x),lit("""^\s*$"""),lit("")) === lit(""),0).otherwise(1)).reduce( _ + _ )
val df_null = df.withColumn("null_count", df_null_col)
df_null.createOrReplaceTempView("contact")
df_null.show(false)
val dup_cols_count_qry = " count(*) over(partition by " + dup_cols_str + " ) as cnt "
val dup_cols_row_num_qry = " row_number() over(partition by " + dup_cols_str + " order by " + dup_cols_str + " ) as rwn "
//val df2 = spark.sql("select *,"+ dup_cols_count_qry + "," + dup_cols_row_num_qry + " from contact " + " where null_count = " + dup_cols_length )
val df2 = spark.sql("select *,"+ dup_cols_count_qry + "," + dup_cols_row_num_qry + " from contact " + " where null_count != 0 ")
df2.show(false)
df2.createOrReplaceTempView("contact2")
val df3 = spark.sql("select " + dup_cols_str + ", " + rem_cols_str + " from contact2 where cnt > 1 and rwn > 1")
df3.show(false)
I am getting the exception below at the time of spark-submit:
== SQL ==
select , count() over(partition by [condition: string] ) as cnt from contact
-------------------------------------^^^
at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:217)
at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:114)
– stack0114104
Nov 27 '18 at 7:41
It seems dup_cols_qry is an empty string. Check again.
– stack0114106
Nov 27 '18 at 8:40
looks like you posted one more question and marked it as duplicate..
– stack0114106
Nov 27 '18 at 8:52
Yes, now it's working. Thank you, stack0114106. No, that is not a duplicate question. The other question is about this: "I am taking each column or required column to count the number of unique and duplicate records." For more information, read that question.
– stack0114104
Nov 27 '18 at 9:29
good..glad that it worked
– stack0114106
Nov 27 '18 at 9:32
You need to pass the column names separated by commas.
col1, col2, and so on should be of String type.
val window = Window.partitionBy(col1, col2 /* , ... */)
findDuplicateRecordsDF.withColumn("count", count("*").over(window))
.where($"count" > 1)
.show()
The input contains N columns; the number is dynamic.
– stack0114104
Nov 22 '18 at 5:56
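For a dynamic number of columns, as raised in the comment above, one option is to parse the pipe-delimited input from the question into a list and expand it into partitionBy. The following is a minimal sketch under stated assumptions: parseKeys and splitByKey are hypothetical helper names, the input string "id|name " is made up in the same format as the question's, the session is local, and the DataFrame is assumed not to already have a column named cnt.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, count, lit}

// Assumption: a local session; in spark-shell getOrCreate() returns the existing one.
val spark = SparkSession.builder().master("local[*]").appName("dynamic-keys").getOrCreate()
import spark.implicits._

// Turn "age|id|name " into Seq("age", "id", "name"), dropping stray whitespace.
def parseKeys(s: String): Seq[String] =
  s.split('|').map(_.trim).filter(_.nonEmpty).toSeq

// Returns (rows whose key occurs more than once, rows whose key occurs exactly once).
def splitByKey(df: DataFrame, keys: Seq[String]): (DataFrame, DataFrame) = {
  require(keys.nonEmpty, "at least one key column is required")
  val missing = keys.filterNot(k => df.columns.contains(k))
  require(missing.isEmpty, s"unknown columns: ${missing.mkString(", ")}")
  val byKey = Window.partitionBy(keys.map(col): _*)
  val counted = df.withColumn("cnt", count(lit(1)).over(byKey))
  (counted.where(col("cnt") > 1).drop("cnt"), counted.where(col("cnt") === 1).drop("cnt"))
}

val df = Seq(
  (3, "sam", 23, "9876543210", "sam@yahoo.com"),
  (7, "ram", 27, "8765432190", "ram@gmail.com"),
  (3, "sam", 28, "9876543210", "sam@yahoo.com"),
  (6, "haris", 30, "6543210777", "haris@gmail.com"),
  (9, "ram", 27, "8765432130", "ram94@gmail.com"),
  (6, "haris", 24, "6543210777", "haris@gmail.com"),
  (4, "karthi", 26, "4321066666", "karthi@gmail.com")
).toDF("id", "name", "age", "phone", "email_id")

val (dups, uniq) = splitByKey(df, parseKeys("id|name "))
dups.show(false)
uniq.show(false)
```

The require checks fail early on an empty or misspelled column list, which is easier to diagnose than a parse error from a generated query.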
You can use window functions. Check this out
scala> val df = Seq((3,"sam",23,"9876543210","sam@yahoo.com"),(7,"ram",27,"8765432190","ram@gmail.com"),(3,"sam",28,"9876543210","sam@yahoo.com"),(6,"haris",30,"6543210777","haris@gmail.com"),(9,"ram",27,"8765432130","ram94@gmail.com"),(6,"haris",24,"6543210777","haris@gmail.com"),(4,"karthi",26,"4321066666","karthi@gmail.com")).toDF("id","name","age","phone","email_id")
df: org.apache.spark.sql.DataFrame = [id: int, name: string ... 3 more fields]
scala> val dup_cols = List("id","name","phone","email_id");
dup_cols: List[String] = List(id, name, phone, email_id)
scala> df.createOrReplaceTempView("contact")
scala> val dup_cols_qry = dup_cols.mkString(" count(*) over(partition by ", "," , " ) as cnt ")
dup_cols_qry: String = " count(*) over(partition by id,name,phone,email_id ) as cnt "
scala> val df2 = spark.sql("select *,"+ dup_cols_qry + " from contact ")
df2: org.apache.spark.sql.DataFrame = [id: int, name: string ... 4 more fields]
scala> df2.show(false)
+---+------+---+----------+----------------+---+
|id |name |age|phone |email_id |cnt|
+---+------+---+----------+----------------+---+
|4 |karthi|26 |4321066666|karthi@gmail.com|1 |
|7 |ram |27 |8765432190|ram@gmail.com |1 |
|9 |ram |27 |8765432130|ram94@gmail.com |1 |
|3 |sam |23 |9876543210|sam@yahoo.com |2 |
|3 |sam |28 |9876543210|sam@yahoo.com |2 |
|6 |haris |30 |6543210777|haris@gmail.com |2 |
|6 |haris |24 |6543210777|haris@gmail.com |2 |
+---+------+---+----------+----------------+---+
scala> df2.createOrReplaceTempView("contact2")
//Duplicates
scala> spark.sql("select " + dup_cols.mkString(",") + " from contact2 where cnt = 2").show
+---+-----+----------+---------------+
| id| name| phone| email_id|
+---+-----+----------+---------------+
| 3| sam|9876543210| sam@yahoo.com|
| 3| sam|9876543210| sam@yahoo.com|
| 6|haris|6543210777|haris@gmail.com|
| 6|haris|6543210777|haris@gmail.com|
+---+-----+----------+---------------+
// Unique
scala> spark.sql("select " + dup_cols.mkString(",") + " from contact2 where cnt = 1").show
+---+------+----------+----------------+
| id| name| phone| email_id|
+---+------+----------+----------------+
| 4|karthi|4321066666|karthi@gmail.com|
| 7| ram|8765432190| ram@gmail.com|
| 9| ram|8765432130| ram94@gmail.com|
+---+------+----------+----------------+
EDIT2:
val df = Seq(
(4,"karthi",26,"4321066666","karthi@gmail.com"),
(6,"haris",24,"6543210777","haris@gmail.com"),
(7,"ram",27,"8765432190","ram@gmail.com"),
(9,"ram",27,"8765432190","ram@gmail.com"),
(6,"haris",24,"6543210777","haris@gmail.com"),
(3,"sam",23,"9876543210","sam@yahoo.com"),
(3,"sam",23,"9876543210","sam@yahoo.com"),
(3,"sam",28,"9876543210","sam@yahoo.com"),
(6,"haris",30,"6543210777","haris@gmail.com")
).toDF("id","name","age","phone","email_id")
val dup_cols = List("name","phone","email_id")
val dup_cols_str = dup_cols.mkString(",")
df.createOrReplaceTempView("contact")
val dup_cols_count_qry = " count(*) over(partition by " + dup_cols_str + " ) as cnt "
val dup_cols_row_num_qry = " row_number() over(partition by " + dup_cols_str + " order by " + dup_cols_str + " ) as rwn "
val df2 = spark.sql("select *,"+ dup_cols_count_qry + "," + dup_cols_row_num_qry + " from contact ")
df2.show(false)
df2.createOrReplaceTempView("contact2")
spark.sql("select id, " + dup_cols_str + " from contact2 where cnt > 1 and rwn > 1").show
Results:
+---+-----+----------+---------------+
| id| name| phone| email_id|
+---+-----+----------+---------------+
| 6|haris|6543210777|haris@gmail.com|
| 6|haris|6543210777|haris@gmail.com|
| 3| sam|9876543210| sam@yahoo.com|
| 3| sam|9876543210| sam@yahoo.com|
| 9| ram|8765432190| ram@gmail.com|
+---+-----+----------+---------------+
EDIT3: - Null condition check
val df = Seq(
(4,"karthi",26,"4321066666","karthi@gmail.com"),
(6,"haris",30,"6543210777","haris@gmail.com"),
(6,"haris",30,null,"haris@gmail.com"),
(7,"ram",27,"8765432190","ram@gmail.com"),
(9,"ram",27,"8765432190","ram@gmail.com"),
(6,"haris",24,"6543210777","haris@gmail.com"),
(6,null,24,"6543210777",null),
(3,"sam",23,"9876543210","sam@yahoo.com"),
(3,"sam",23,"9876543210","sam@yahoo.com"),
(3,"sam",28,"9876543210","sam@yahoo.com"),
(6,"haris",24,"6543210777","haris@gmail.com")
).toDF("id","name","age","phone","email_id")
val all_cols = df.columns
val dup_cols = List("name","phone","email_id")
val rem_cols = all_cols.diff(dup_cols)
val dup_cols_str = dup_cols.mkString(",")
val rem_cols_str = rem_cols.mkString(",")
val dup_cols_length = dup_cols.length
val df_null_col = dup_cols.map( x => when(col(x).isNull,0).otherwise(1)).reduce( _ + _ )
val df_null = df.withColumn("null_count", df_null_col)
df_null.createOrReplaceTempView("contact")
df_null.show(false)
val dup_cols_count_qry = " count(*) over(partition by " + dup_cols_str + " ) as cnt "
val dup_cols_row_num_qry = " row_number() over(partition by " + dup_cols_str + " order by " + dup_cols_str + " ) as rwn "
val df2 = spark.sql("select *,"+ dup_cols_count_qry + "," + dup_cols_row_num_qry + " from contact " + " where null_count = " + dup_cols_length )
df2.show(false)
df2.createOrReplaceTempView("contact2")
val df3 = spark.sql("select " + dup_cols_str + ", " + rem_cols_str + " from contact2 where cnt > 1 and rwn > 1")
df3.show(false)
Results:
+---+------+---+----------+----------------+----------+
|id |name |age|phone |email_id |null_count|
+---+------+---+----------+----------------+----------+
|4 |karthi|26 |4321066666|karthi@gmail.com|3 |
|6 |haris |30 |6543210777|haris@gmail.com |3 |
|6 |haris |30 |null |haris@gmail.com |2 |
|7 |ram |27 |8765432190|ram@gmail.com |3 |
|9 |ram |27 |8765432190|ram@gmail.com |3 |
|6 |haris |24 |6543210777|haris@gmail.com |3 |
|6 |null |24 |6543210777|null |1 |
|3 |sam |23 |9876543210|sam@yahoo.com |3 |
|3 |sam |23 |9876543210|sam@yahoo.com |3 |
|3 |sam |28 |9876543210|sam@yahoo.com |3 |
|6 |haris |24 |6543210777|haris@gmail.com |3 |
+---+------+---+----------+----------------+----------+
|id |name |age|phone |email_id |null_count|cnt|rwn|
+---+------+---+----------+----------------+----------+---+---+
|6 |haris |30 |6543210777|haris@gmail.com |3 |3 |1 |
|6 |haris |24 |6543210777|haris@gmail.com |3 |3 |2 |
|6 |haris |24 |6543210777|haris@gmail.com |3 |3 |3 |
|3 |sam |23 |9876543210|sam@yahoo.com |3 |3 |1 |
|3 |sam |23 |9876543210|sam@yahoo.com |3 |3 |2 |
|3 |sam |28 |9876543210|sam@yahoo.com |3 |3 |3 |
|7 |ram |27 |8765432190|ram@gmail.com |3 |2 |1 |
|9 |ram |27 |8765432190|ram@gmail.com |3 |2 |2 |
|4 |karthi|26 |4321066666|karthi@gmail.com|3 |1 |1 |
+---+------+---+----------+----------------+----------+---+---+
+-----+----------+---------------+---+---+
|name |phone |email_id |id |age|
+-----+----------+---------------+---+---+
|haris|6543210777|haris@gmail.com|6 |24 |
|haris|6543210777|haris@gmail.com|6 |24 |
|sam |9876543210|sam@yahoo.com |3 |23 |
|sam |9876543210|sam@yahoo.com |3 |28 |
|ram |8765432190|ram@gmail.com |9 |27 |
+-----+----------+---------------+---+---+
blank check
val df_null_col = dup_cols.map( x => when(col(x).isNull or regexp_replace(col(x), """^s*$""","")=== lit(""),0).otherwise(1)).reduce( _ + _ )
Filter only when all 3 columns are either blank or null
val df = Seq(
(4,"karthi",26,"4321066666","karthi@gmail.com"),
(6,"haris",30,"6543210777","haris@gmail.com"),
(6,null,30,null,null),
(7,"ram",27,"8765432190","ram@gmail.com"),
(9,"",27,"",""),
(7,"ram",27,"8765432190","ram@gmail.com"),
(6,"haris",24,"6543210777","haris@gmail.com"),
(6,null,24,"6543210777",null),
(3,"sam",23,"9876543210","sam@yahoo.com"),
(3,null,23,"9876543210","sam@yahoo.com"),
(3,null,28,"9876543213",null),
(6,"haris",24,null,"haris@gmail.com")
).toDF("id","name","age","phone","email_id")
val all_cols = df.columns
val dup_cols = List("name","phone","email_id")
val rem_cols = all_cols.diff(dup_cols)
val dup_cols_str = dup_cols.mkString(",")
val rem_cols_str = rem_cols.mkString(",")
val dup_cols_length = dup_cols.length
//val df_null_col = dup_cols.map( x => when(col(x).isNull,0).otherwise(1)).reduce( _ + _ )
val df_null_col = dup_cols.map( x => when(col(x).isNull or regexp_replace(col(x),lit("""^s*$"""),lit("")) === lit(""),0).otherwise(1)).reduce( _ + _ )
val df_null = df.withColumn("null_count", df_null_col)
df_null.createOrReplaceTempView("contact")
df_null.show(false)
val dup_cols_count_qry = " count(*) over(partition by " + dup_cols_str + " ) as cnt "
val dup_cols_row_num_qry = " row_number() over(partition by " + dup_cols_str + " order by " + dup_cols_str + " ) as rwn "
//val df2 = spark.sql("select *,"+ dup_cols_count_qry + "," + dup_cols_row_num_qry + " from contact " + " where null_count = " + dup_cols_length )
val df2 = spark.sql("select *,"+ dup_cols_count_qry + "," + dup_cols_row_num_qry + " from contact " + " where null_count != 0 ")
df2.show(false)
df2.createOrReplaceTempView("contact2")
val df3 = spark.sql("select " + dup_cols_str + ", " + rem_cols_str + " from contact2 where cnt > 1 and rwn > 1")
df3.show(false)
I am getting below exception at the time of spark-submit == SQL == select , count() over(partition by [condition: string] ) as cnt from contact -------------------------------------^^^ at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:217) at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:114)
– stack0114104
Nov 27 '18 at 7:41
seems dup_cols_qry is empty string.. check again
– stack0114106
Nov 27 '18 at 8:40
looks like you posted one more question and marked it as duplicate..
– stack0114106
Nov 27 '18 at 8:52
yes, now it's working... Thank you stack0114106... No that is not a duplicate question...another question explanation is "I am taking each column or required column to count number of unique and duplicate records.for more information read that question"
– stack0114104
Nov 27 '18 at 9:29
good..glad that it worked
– stack0114106
Nov 27 '18 at 9:32
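The exception in the comments above suggests that the column list was not a plain comma-separated string by the time the query was assembled. A minimal sketch, in plain Scala, of turning the pipe-delimited input from the question into the window clause and failing early when no column names are present. The helper names `parseColumns` and `countOverClause` are hypothetical and not part of the answer:

```scala
// Hypothetical helpers; the names are illustrative, not from the answer.
object DupQuery {
  // Split dynamic input such as "age|id|name " into clean column names.
  def parseColumns(raw: String): List[String] =
    raw.split('|').map(_.trim).filter(_.nonEmpty).toList

  // Build the count-over-partition fragment; refuse an empty column list.
  def countOverClause(cols: List[String]): String = {
    require(cols.nonEmpty, "at least one column is needed for partition by")
    cols.mkString("count(*) over(partition by ", ",", ") as cnt")
  }
}

val cols = DupQuery.parseColumns("age|id|name ")
val query = "select *, " + DupQuery.countOverClause(cols) + " from contact"
// prints: select *, count(*) over(partition by age,id,name) as cnt from contact
println(query)
```

Printing the assembled query before passing it to `spark.sql` makes an empty or malformed fragment visible immediately.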
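You can use window functions: count the rows in each partition defined by the duplicate-check columns, then filter on that count. Check this out: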
scala> val df = Seq((3,"sam",23,"9876543210","sam@yahoo.com"),(7,"ram",27,"8765432190","ram@gmail.com"),(3,"sam",28,"9876543210","sam@yahoo.com"),(6,"haris",30,"6543210777","haris@gmail.com"),(9,"ram",27,"8765432130","ram94@gmail.com"),(6,"haris",24,"6543210777","haris@gmail.com"),(4,"karthi",26,"4321066666","karthi@gmail.com")).toDF("id","name","age","phone","email_id")
df: org.apache.spark.sql.DataFrame = [id: int, name: string ... 3 more fields]
scala> val dup_cols = List("id","name","phone","email_id");
dup_cols: List[String] = List(id, name, phone, email_id)
scala> df.createOrReplaceTempView("contact")
scala> val dup_cols_qry = dup_cols.mkString(" count(*) over(partition by ", "," , " ) as cnt ")
dup_cols_qry: String = " count(*) over(partition by id,name,phone,email_id ) as cnt "
scala> val df2 = spark.sql("select *,"+ dup_cols_qry + " from contact ")
df2: org.apache.spark.sql.DataFrame = [id: int, name: string ... 4 more fields]
scala> df2.show(false)
+---+------+---+----------+----------------+---+
|id |name |age|phone |email_id |cnt|
+---+------+---+----------+----------------+---+
|4 |karthi|26 |4321066666|karthi@gmail.com|1 |
|7 |ram |27 |8765432190|ram@gmail.com |1 |
|9 |ram |27 |8765432130|ram94@gmail.com |1 |
|3 |sam |23 |9876543210|sam@yahoo.com |2 |
|3 |sam |28 |9876543210|sam@yahoo.com |2 |
|6 |haris |30 |6543210777|haris@gmail.com |2 |
|6 |haris |24 |6543210777|haris@gmail.com |2 |
+---+------+---+----------+----------------+---+
scala> df2.createOrReplaceTempView("contact2")
//Duplicates
scala> spark.sql("select " + dup_cols.mkString(",") + " from contact2 where cnt > 1").show
+---+-----+----------+---------------+
| id| name| phone| email_id|
+---+-----+----------+---------------+
| 3| sam|9876543210| sam@yahoo.com|
| 3| sam|9876543210| sam@yahoo.com|
| 6|haris|6543210777|haris@gmail.com|
| 6|haris|6543210777|haris@gmail.com|
+---+-----+----------+---------------+
// Unique
scala> spark.sql("select " + dup_cols.mkString(",") + " from contact2 where cnt = 1").show
+---+------+----------+----------------+
| id| name| phone| email_id|
+---+------+----------+----------------+
| 4|karthi|4321066666|karthi@gmail.com|
| 7| ram|8765432190| ram@gmail.com|
| 9| ram|8765432130| ram94@gmail.com|
+---+------+----------+----------------+
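The same split can also be sketched with the DataFrame API instead of SQL strings. This is a minimal sketch, assuming a local SparkSession (in spark-shell `spark` already exists) and the sample data from the session above. It filters duplicates with `cnt > 1` so that groups of three or more matching rows are included, and it keeps all original columns:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, count, lit}

// Assumption: a local session for experimentation only.
val spark = SparkSession.builder().master("local[*]").appName("dup-split").getOrCreate()
import spark.implicits._

val df = Seq(
  (3, "sam", 23, "9876543210", "sam@yahoo.com"),
  (7, "ram", 27, "8765432190", "ram@gmail.com"),
  (3, "sam", 28, "9876543210", "sam@yahoo.com"),
  (6, "haris", 30, "6543210777", "haris@gmail.com"),
  (9, "ram", 27, "8765432130", "ram94@gmail.com"),
  (6, "haris", 24, "6543210777", "haris@gmail.com"),
  (4, "karthi", 26, "4321066666", "karthi@gmail.com")
).toDF("id", "name", "age", "phone", "email_id")

// Columns that decide whether two rows count as duplicates.
val dupCols = List("id", "name", "phone", "email_id")

// One window partition per distinct combination of the duplicate-check columns.
val w = Window.partitionBy(dupCols.map(col): _*)
val counted = df.withColumn("cnt", count(lit(1)).over(w))

// Rows whose key occurs more than once, and rows whose key occurs exactly once.
val duplicates = counted.where(col("cnt") > 1).drop("cnt")
val uniques = counted.where(col("cnt") === 1).drop("cnt")

duplicates.show(false)
uniques.show(false)
```

Because `dupCols` is an ordinary list, it can be filled from any dynamic input before the window is built.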
EDIT2:
val df = Seq(
(4,"karthi",26,"4321066666","karthi@gmail.com"),
(6,"haris",24,"6543210777","haris@gmail.com"),
(7,"ram",27,"8765432190","ram@gmail.com"),
(9,"ram",27,"8765432190","ram@gmail.com"),
(6,"haris",24,"6543210777","haris@gmail.com"),
(3,"sam",23,"9876543210","sam@yahoo.com"),
(3,"sam",23,"9876543210","sam@yahoo.com"),
(3,"sam",28,"9876543210","sam@yahoo.com"),
(6,"haris",30,"6543210777","haris@gmail.com")
).toDF("id","name","age","phone","email_id")
val dup_cols = List("name","phone","email_id")
val dup_cols_str = dup_cols.mkString(",")
df.createOrReplaceTempView("contact")
val dup_cols_count_qry = " count(*) over(partition by " + dup_cols_str + " ) as cnt "
val dup_cols_row_num_qry = " row_number() over(partition by " + dup_cols_str + " order by " + dup_cols_str + " ) as rwn "
val df2 = spark.sql("select *,"+ dup_cols_count_qry + "," + dup_cols_row_num_qry + " from contact ")
df2.show(false)
df2.createOrReplaceTempView("contact2")
spark.sql("select id, " + dup_cols_str + " from contact2 where cnt > 1 and rwn > 1").show
Results:
+---+-----+----------+---------------+
| id| name| phone| email_id|
+---+-----+----------+---------------+
| 6|haris|6543210777|haris@gmail.com|
| 6|haris|6543210777|haris@gmail.com|
| 3| sam|9876543210| sam@yahoo.com|
| 3| sam|9876543210| sam@yahoo.com|
| 9| ram|8765432190| ram@gmail.com|
+---+-----+----------+---------------+
EDIT3: - Null condition check
val df = Seq(
(4,"karthi",26,"4321066666","karthi@gmail.com"),
(6,"haris",30,"6543210777","haris@gmail.com"),
(6,"haris",30,null,"haris@gmail.com"),
(7,"ram",27,"8765432190","ram@gmail.com"),
(9,"ram",27,"8765432190","ram@gmail.com"),
(6,"haris",24,"6543210777","haris@gmail.com"),
(6,null,24,"6543210777",null),
(3,"sam",23,"9876543210","sam@yahoo.com"),
(3,"sam",23,"9876543210","sam@yahoo.com"),
(3,"sam",28,"9876543210","sam@yahoo.com"),
(6,"haris",24,"6543210777","haris@gmail.com")
).toDF("id","name","age","phone","email_id")
val all_cols = df.columns
val dup_cols = List("name","phone","email_id")
val rem_cols = all_cols.diff(dup_cols)
val dup_cols_str = dup_cols.mkString(",")
val rem_cols_str = rem_cols.mkString(",")
val dup_cols_length = dup_cols.length
val df_null_col = dup_cols.map( x => when(col(x).isNull,0).otherwise(1)).reduce( _ + _ )
val df_null = df.withColumn("null_count", df_null_col)
df_null.createOrReplaceTempView("contact")
df_null.show(false)
val dup_cols_count_qry = " count(*) over(partition by " + dup_cols_str + " ) as cnt "
val dup_cols_row_num_qry = " row_number() over(partition by " + dup_cols_str + " order by " + dup_cols_str + " ) as rwn "
val df2 = spark.sql("select *,"+ dup_cols_count_qry + "," + dup_cols_row_num_qry + " from contact " + " where null_count = " + dup_cols_length )
df2.show(false)
df2.createOrReplaceTempView("contact2")
val df3 = spark.sql("select " + dup_cols_str + ", " + rem_cols_str + " from contact2 where cnt > 1 and rwn > 1")
df3.show(false)
Results:
+---+------+---+----------+----------------+----------+
|id |name |age|phone |email_id |null_count|
+---+------+---+----------+----------------+----------+
|4 |karthi|26 |4321066666|karthi@gmail.com|3 |
|6 |haris |30 |6543210777|haris@gmail.com |3 |
|6 |haris |30 |null |haris@gmail.com |2 |
|7 |ram |27 |8765432190|ram@gmail.com |3 |
|9 |ram |27 |8765432190|ram@gmail.com |3 |
|6 |haris |24 |6543210777|haris@gmail.com |3 |
|6 |null |24 |6543210777|null |1 |
|3 |sam |23 |9876543210|sam@yahoo.com |3 |
|3 |sam |23 |9876543210|sam@yahoo.com |3 |
|3 |sam |28 |9876543210|sam@yahoo.com |3 |
|6 |haris |24 |6543210777|haris@gmail.com |3 |
+---+------+---+----------+----------------+----------+
+---+------+---+----------+----------------+----------+---+---+
|id |name |age|phone |email_id |null_count|cnt|rwn|
+---+------+---+----------+----------------+----------+---+---+
|6 |haris |30 |6543210777|haris@gmail.com |3 |3 |1 |
|6 |haris |24 |6543210777|haris@gmail.com |3 |3 |2 |
|6 |haris |24 |6543210777|haris@gmail.com |3 |3 |3 |
|3 |sam |23 |9876543210|sam@yahoo.com |3 |3 |1 |
|3 |sam |23 |9876543210|sam@yahoo.com |3 |3 |2 |
|3 |sam |28 |9876543210|sam@yahoo.com |3 |3 |3 |
|7 |ram |27 |8765432190|ram@gmail.com |3 |2 |1 |
|9 |ram |27 |8765432190|ram@gmail.com |3 |2 |2 |
|4 |karthi|26 |4321066666|karthi@gmail.com|3 |1 |1 |
+---+------+---+----------+----------------+----------+---+---+
+-----+----------+---------------+---+---+
|name |phone |email_id |id |age|
+-----+----------+---------------+---+---+
|haris|6543210777|haris@gmail.com|6 |24 |
|haris|6543210777|haris@gmail.com|6 |24 |
|sam |9876543210|sam@yahoo.com |3 |23 |
|sam |9876543210|sam@yahoo.com |3 |28 |
|ram |8765432190|ram@gmail.com |9 |27 |
+-----+----------+---------------+---+---+
Blank check (treat empty or whitespace-only strings like nulls):
val df_null_col = dup_cols.map( x => when(col(x).isNull or regexp_replace(col(x), """^\s*$""","")=== lit(""),0).otherwise(1)).reduce( _ + _ )
To filter out a row only when all 3 columns are either blank or null:
val df = Seq(
(4,"karthi",26,"4321066666","karthi@gmail.com"),
(6,"haris",30,"6543210777","haris@gmail.com"),
(6,null,30,null,null),
(7,"ram",27,"8765432190","ram@gmail.com"),
(9,"",27,"",""),
(7,"ram",27,"8765432190","ram@gmail.com"),
(6,"haris",24,"6543210777","haris@gmail.com"),
(6,null,24,"6543210777",null),
(3,"sam",23,"9876543210","sam@yahoo.com"),
(3,null,23,"9876543210","sam@yahoo.com"),
(3,null,28,"9876543213",null),
(6,"haris",24,null,"haris@gmail.com")
).toDF("id","name","age","phone","email_id")
val all_cols = df.columns
val dup_cols = List("name","phone","email_id")
val rem_cols = all_cols.diff(dup_cols)
val dup_cols_str = dup_cols.mkString(",")
val rem_cols_str = rem_cols.mkString(",")
val dup_cols_length = dup_cols.length
//val df_null_col = dup_cols.map( x => when(col(x).isNull,0).otherwise(1)).reduce( _ + _ )
val df_null_col = dup_cols.map( x => when(col(x).isNull or regexp_replace(col(x),lit("""^\s*$"""),lit("")) === lit(""),0).otherwise(1)).reduce( _ + _ )
val df_null = df.withColumn("null_count", df_null_col)
df_null.createOrReplaceTempView("contact")
df_null.show(false)
val dup_cols_count_qry = " count(*) over(partition by " + dup_cols_str + " ) as cnt "
val dup_cols_row_num_qry = " row_number() over(partition by " + dup_cols_str + " order by " + dup_cols_str + " ) as rwn "
//val df2 = spark.sql("select *,"+ dup_cols_count_qry + "," + dup_cols_row_num_qry + " from contact " + " where null_count = " + dup_cols_length )
val df2 = spark.sql("select *,"+ dup_cols_count_qry + "," + dup_cols_row_num_qry + " from contact " + " where null_count != 0 ")
df2.show(false)
df2.createOrReplaceTempView("contact2")
val df3 = spark.sql("select " + dup_cols_str + ", " + rem_cols_str + " from contact2 where cnt > 1 and rwn > 1")
df3.show(false)
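Despite its name, the `null_count` column above holds the number of duplicate-check columns that are neither null nor blank, so `null_count != 0` keeps every row that has at least one usable value. A plain-Scala sketch of that per-row logic, assuming the pattern `^\s*$` is meant to match empty and whitespace-only strings; `isBlank` and `nonBlankCount` are hypothetical helper names:

```scala
// Pattern that matches an empty string or a string made only of whitespace.
val blankPattern = """^\s*$""".r

// Hypothetical helper: true for null, "" and whitespace-only values.
def isBlank(value: String): Boolean =
  value == null || blankPattern.pattern.matcher(value).matches()

// Number of usable (non-null, non-blank) values among one row's
// duplicate-check columns, mirroring what null_count holds.
def nonBlankCount(values: Seq[String]): Int = values.count(v => !isBlank(v))

println(nonBlankCount(Seq("haris", null, "haris@gmail.com"))) // 2
println(nonBlankCount(Seq("", "   ", null)))                  // 0
```

A row like the second one is the only kind that `null_count != 0` filters out.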
edited Dec 14 '18 at 9:30
answered Nov 22 '18 at 6:53
stack0114106
I am getting below exception at the time of spark-submit == SQL == select , count() over(partition by [condition: string] ) as cnt from contact -------------------------------------^^^ at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:217) at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:114)
– stack0114104
Nov 27 '18 at 7:41
seems dup_cols_qry is empty string.. check again
– stack0114106
Nov 27 '18 at 8:40
looks like you posted one more question and marked it as duplicate..
– stack0114106
Nov 27 '18 at 8:52
yes, now it's working... Thank you stack0114106... No that is not a duplicate question...another question explanation is "I am taking each column or required column to count number of unique and duplicate records.for more information read that question"
– stack0114104
Nov 27 '18 at 9:29
good..glad that it worked
– stack0114106
Nov 27 '18 at 9:32
|
show 6 more comments
I am getting below exception at the time of spark-submit == SQL == select , count() over(partition by [condition: string] ) as cnt from contact -------------------------------------^^^ at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:217) at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:114)
– stack0114104
Nov 27 '18 at 7:41
seems dup_cols_qry is empty string.. check again
– stack0114106
Nov 27 '18 at 8:40
looks like you posted one more question and marked it as duplicate..
– stack0114106
Nov 27 '18 at 8:52
yes, now it's working... Thank you stack0114106... No that is not a duplicate question...another question explanation is "I am taking each column or required column to count number of unique and duplicate records.for more information read that question"
– stack0114104
Nov 27 '18 at 9:29
good..glad that it worked
– stack0114106
Nov 27 '18 at 9:32
I am getting below exception at the time of spark-submit == SQL == select , count() over(partition by [condition: string] ) as cnt from contact -------------------------------------^^^ at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:217) at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:114)
– stack0114104
Nov 27 '18 at 7:41
I am getting below exception at the time of spark-submit == SQL == select , count() over(partition by [condition: string] ) as cnt from contact -------------------------------------^^^ at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:217) at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:114)
– stack0114104
Nov 27 '18 at 7:41
seems dup_cols_qry is empty string.. check again
– stack0114106
Nov 27 '18 at 8:40
seems dup_cols_qry is empty string.. check again
– stack0114106
Nov 27 '18 at 8:40
looks like you posted one more question and marked it as duplicate..
– stack0114106
Nov 27 '18 at 8:52
looks like you posted one more question and marked it as duplicate..
– stack0114106
Nov 27 '18 at 8:52
yes, now it's working... Thank you stack0114106... No that is not a duplicate question...another question explanation is "I am taking each column or required column to count number of unique and duplicate records.for more information read that question"
– stack0114104
Nov 27 '18 at 9:29
Good, glad that it worked.
– stack0114106
Nov 27 '18 at 9:32
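The failing query in the comments above shows what goes wrong when the partition-column string is not what was intended. One way to guard against that is to build the column list from the dynamic input and fail fast when nothing usable was parsed. This is only a sketch in plain Scala: `DupQuery` and `build` are hypothetical names, the query shape is copied from the exception message, and the table name `contact` is taken from it as well.

```scala
object DupQuery {
  // Builds the window-count query from a pipe-delimited column list such as "age|id|name ".
  def build(table: String, cols: String): String = {
    // Split on '|', trim stray spaces and drop empty entries.
    val dupCols = cols.split('|').map(_.trim).filter(_.nonEmpty)
    // Fail early instead of sending "partition by <nothing>" to the SQL parser.
    require(dupCols.nonEmpty, s"no partition columns parsed from '$cols'")
    val dupColsQry = dupCols.mkString(", ")
    s"select *, count(*) over (partition by $dupColsQry) as cnt from $table"
  }

  def main(args: Array[String]): Unit =
    // prints: select *, count(*) over (partition by age, id, name) as cnt from contact
    println(build("contact", "age|id|name "))
}
```

The resulting string can be passed to spark.sql(...), and rows with cnt > 1 are the duplicates. Printing the query before running it makes an empty or unexpected column list easy to spot.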
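You need to pass the column names to partitionBy as a comma-separated list.
col1, col2, .. should be of String type.
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.count
// Partition by every column that defines a duplicate.
val window = Window.partitionBy(col1, col2, ..)
// Rows whose partition holds more than one row are duplicates ($ needs spark.implicits._).
findDuplicateRecordsDF.withColumn("count", count("*").over(window))
.where($"count" > 1)
.show()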
The input contains N columns; the number is dynamic.
– stack0114104
Nov 22 '18 at 5:56
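For the dynamic case raised in the comment above, one possible approach is to split the pipe-delimited input into column names and expand them into partitionBy with `: _*`. The same window count then also yields the unique rows asked for in the title. This is a minimal sketch, not the answerer's code: `SplitDuplicates`, `splitByKeys` and the helper column `cnt` are names made up for illustration, a local SparkSession is assumed, and the input DataFrame is assumed not to already contain a column called `cnt`.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, count, lit}

object SplitDuplicates {
  // Returns (duplicates, uniques) for a pipe-delimited list of key columns.
  def splitByKeys(df: DataFrame, keys: String): (DataFrame, DataFrame) = {
    // "id|name|phone " -> Seq(col("id"), col("name"), col("phone"))
    val keyCols = keys.split('|').map(_.trim).filter(_.nonEmpty).map(col).toSeq
    require(keyCols.nonEmpty, "at least one key column is required")

    // Count the rows sharing each key combination; `: _*` expands the Seq into varargs.
    val counted = df.withColumn("cnt", count(lit(1)).over(Window.partitionBy(keyCols: _*)))

    val duplicates = counted.where(col("cnt") > 1).drop("cnt")
    val uniques = counted.where(col("cnt") === 1).drop("cnt")
    (duplicates, uniques)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("split-duplicates").getOrCreate()
    import spark.implicits._

    // Sample rows from the question.
    val df = Seq(
      (3, "sam", 23, "9876543210", "sam@yahoo.com"),
      (7, "ram", 27, "8765432190", "ram@gmail.com"),
      (3, "sam", 28, "9876543210", "sam@yahoo.com"),
      (6, "haris", 30, "6543210777", "haris@gmail.com"),
      (9, "ram", 27, "8765432130", "ram94@gmail.com"),
      (6, "haris", 24, "6543210777", "haris@gmail.com"),
      (4, "karthi", 26, "4321066666", "karthi@gmail.com")
    ).toDF("id", "name", "age", "phone", "email_id")

    val (dups, uniq) = splitByKeys(df, "id|name|phone|email_id")
    dups.show() // the two sam rows and the two haris rows
    uniq.show() // both ram rows and the karthi row
    spark.stop()
  }
}
```

Filtering the same counted DataFrame twice keeps the two outputs consistent with each other; if the input is expensive to compute, caching `counted` before the two filters may be worth considering.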
edited Nov 22 '18 at 6:33
answered Nov 22 '18 at 5:39
Taha Naqvi
You can specify a comma-separated list of columns in partitionBy().
– vindev
Nov 22 '18 at 5:40