Different behaviour of cache method for PySpark dataframes in Spark 2.3
After upgrading Spark from 2.1 to 2.3 I am having issues with cached PySpark DataFrames. In Spark 2.1 the cache() method worked for me like a deep copy, even though, based on the documentation, it shouldn't work like that.
Example:
from pyspark.sql import SparkSession
from pyspark.sql import functions as spark_func
from pyspark.sql import Window

sparkSession = (SparkSession
                .builder
                .appName('process_name')
                .enableHiveSupport()
                .getOrCreate())

# join_field, dt_today, snpsht_table_name_tmp and inc_table_name_tmp
# are defined earlier in the real job
src_tbl = sparkSession.sql("SELECT * FROM src_tbl")
dst_tbl = sparkSession.sql("SELECT * FROM snpsht_tbl")

delta = src_tbl.subtract(dst_tbl)  # find the difference

# find new records based on delta
new_records = delta.join(dst_tbl, how='left_anti', on=join_field).cache()

# create snapshot df
snpsht_tbl = dst_tbl.union(new_records)

# create incremental df: keep the newest row per key
snpsht_tbl_tmp = snpsht_tbl.withColumn(
    "row_nbr",
    spark_func.row_number().over(
        Window.partitionBy(join_field)
              .orderBy(spark_func.desc("last_modified_date"))))
inc_tbl = snpsht_tbl_tmp.filter("row_nbr = 1").drop("row_nbr")

inc_tbl.filter(spark_func.col("last_modified_date").isin(dt_today)).count()  # 100 records

# save final tables to DB
snpsht_tbl.write.mode("overwrite").saveAsTable(snpsht_table_name_tmp)
sparkSession.sql("INSERT OVERWRITE TABLE snpsht_tbl SELECT * FROM {}".format(snpsht_table_name_tmp))

inc_tbl.filter(spark_func.col("last_modified_date").isin(dt_today)).count()  # 0 records

inc_tbl.write.mode("overwrite").saveAsTable(inc_table_name_tmp)
sparkSession.sql("INSERT OVERWRITE TABLE inc_tbl SELECT * FROM {}".format(inc_table_name_tmp))
This is a minimal example that reproduces my problem.
In Spark 2.1, inc_tbl was saved to the inc_tbl table with all of the new records from the current day, i.e. with the data as it was at the moment cache() was called, and that is what I want. In Spark 2.3 something recomputes all of the transformations from the beginning again, sees that the snpsht_tbl table already contains the current day's records, and ends up inserting only the records that existed before the run.
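What I was effectively relying on is the data being fixed at the point where cache() is called. A minimal sketch of forcing that explicitly (count() is used only as a cheap action to trigger evaluation before any table is overwritten; the same names as above are assumed):
# Materialise the cache now, before any table is overwritten, instead of
# lazily at saveAsTable time; the same idea applies to inc_tbl itself.
new_records = delta.join(dst_tbl, how='left_anti', on=join_field).cache()
new_records.count()  # triggers evaluation so this cache is populated here
inc_tbl = snpsht_tbl_tmp.filter("row_nbr = 1").drop("row_nbr").cache()
inc_tbl.count()      # pin the incremental result before the overwrites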
apache-spark dataframe pyspark apache-spark-2.3
Technically speaking, cache doesn't work as a copy at all (deep or shallow) and doesn't provide any strict guarantees. Additionally, cache is lazy, so in this snippet the data could be cached only when you call saveAsTable, so there is not enough information here to diagnose the problem. Could you please edit the question and include a Minimal, Complete, and Verifiable example?
– user10465355
Nov 21 '18 at 21:24
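To make the laziness point concrete with the names from the question (a minimal sketch): cache() only marks the plan for caching; nothing is computed or stored until the first action on that DataFrame runs.
new_records = delta.join(dst_tbl, how='left_anti', on=join_field).cache()
print(new_records.is_cached)  # True, but this only means "marked for caching"
# No Spark job has run yet; the join is executed and the cache actually
# populated only when an action (count, saveAsTable, ...) touches new_records.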
Thanks! I will edit my question tomorrow and add some details. For now, based on your comment, I don't understand why it worked as a deep copy in Spark 2.1. Tomorrow I will give you a detailed example of it.
– max04
Nov 21 '18 at 21:45
In the meantime, @user10465355, if cache() doesn't work as a copy at all, do you know which method does?
– max04
Nov 21 '18 at 22:03
These are pretty good explanations. In general it is an optimization tool, which reduces the probability of repeated evaluation of a certain lineage. However, caching might be partial, might not happen at all, or might be evicted. As far as I know the caching mechanism hasn't changed between 2.1 and 2.3, which means you are probably seeing one of these.
– user10465355
Nov 21 '18 at 23:22
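To illustrate the "no strict guarantees" point: if the data really has to be pinned at a specific moment, a checkpoint (which writes to reliable storage and truncates the lineage) is a firmer option than cache. A minimal sketch, with the checkpoint directory being just an example path:
# checkpoint() materialises the DataFrame under the checkpoint directory and
# cuts its lineage, so later actions cannot re-read src_tbl / snpsht_tbl.
sparkSession.sparkContext.setCheckpointDir("/tmp/spark-checkpoints")  # example path
inc_tbl = inc_tbl.checkpoint()  # eager by default: runs a job immediately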
What I noticed after a deep investigation is that in version 2.1 there was probably some bug related to cache usage that has since been fixed. I have changed my code to make it work.
– max04
Nov 22 '18 at 21:08
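The exact change isn't shown in the thread; one plausible restructuring (a sketch, not necessarily what was applied) is to persist the incremental result before the snapshot table is overwritten, so that recomputation can no longer see the current day's rows in snpsht_tbl:
# Write the incremental table first, while snpsht_tbl still holds the previous
# snapshot; only then overwrite the snapshot table.
inc_tbl.write.mode("overwrite").saveAsTable(inc_table_name_tmp)
sparkSession.sql("INSERT OVERWRITE TABLE inc_tbl SELECT * FROM {}".format(inc_table_name_tmp))
snpsht_tbl.write.mode("overwrite").saveAsTable(snpsht_table_name_tmp)
sparkSession.sql("INSERT OVERWRITE TABLE snpsht_tbl SELECT * FROM {}".format(snpsht_table_name_tmp))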