Different behaviour of cache method for PySpark dataframes in Spark 2.3












After upgrading Spark from 2.1 to 2.3 I have issues with cached PySpark DataFrames. In Spark 2.1 the cache() method worked for me as a deep copy, even though, based on the documentation, it shouldn't work like that.



Example:



from pyspark.sql import SparkSession
from pyspark.sql import functions as spark_func
from pyspark.sql import Window

sparkSession = (SparkSession
                .builder
                .appName('process_name')
                .enableHiveSupport()
                .getOrCreate())

# join_field and dt_today are defined earlier in the real job
src_tbl = sparkSession.sql("SELECT * FROM src_tbl")
dst_tbl = sparkSession.sql("SELECT * FROM snpsht_tbl")
delta = src_tbl.subtract(dst_tbl)  # find the difference

# find new records based on delta
new_records = delta.join(dst_tbl, how='left_anti', on=join_field).cache()
# create snapshot df
snpsht_tbl = dst_tbl.union(new_records)
# create incremental df: keep only the latest version of each key
snpsht_tbl_tmp = snpsht_tbl.withColumn(
    "row_nbr",
    spark_func.row_number().over(
        Window.partitionBy(join_field)
              .orderBy(spark_func.desc("last_modified_date"))))
inc_tbl = snpsht_tbl_tmp.filter("row_nbr = 1").drop("row_nbr")

inc_tbl.filter(spark_func.col("last_modified_date").isin(dt_today)).count()  # 100 records

# save final tables to DB
snpsht_table_name_tmp = 'snpsht_tmp'  # staging table name (illustrative)
snpsht_tbl.write.mode("overwrite").saveAsTable(snpsht_table_name_tmp)
sparkSession.sql("INSERT OVERWRITE TABLE snpsht_tbl SELECT * FROM {}".format(snpsht_table_name_tmp))

inc_tbl.filter(spark_func.col("last_modified_date").isin(dt_today)).count()  # 0 records

inc_table_name_tmp = 'inc_tmp'  # staging table name (illustrative)
inc_tbl.write.mode("overwrite").saveAsTable(inc_table_name_tmp)
sparkSession.sql("INSERT OVERWRITE TABLE inc_tbl SELECT * FROM {}".format(inc_table_name_tmp))


This is a minimal example that reproduces my problem.



In Spark 2.1, inc_tbl was saved to the inc table with all of the new records (from the current day), i.e. with the data as it was at the moment cache() was used, and this is what I want to have. In Spark 2.3, something recomputes all the transformations from the beginning, sees that the snpsht_tbl table already contains records from the current date, and therefore inserts only the records that were there before the processing started.
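A stripped-down sketch of the effect (toy table names, not the production job): if the cache is never materialized by an action before the source table is overwritten, a later action may recompute the plan against the new contents of the table.

# Toy sketch (hypothetical table names): a cached-but-unmaterialized DataFrame
# may be recomputed from the underlying table after that table is overwritten.
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName('cache_repro')
         .enableHiveSupport()
         .getOrCreate())

spark.sql("DROP TABLE IF EXISTS demo_tbl")
spark.range(5).write.saveAsTable("demo_tbl")       # demo_tbl now holds 0..4

df = spark.table("demo_tbl").cache()               # lazy: nothing is cached yet

# Overwrite the source table before any action has populated the cache
spark.range(100, 105).createOrReplaceTempView("new_data")
spark.sql("INSERT OVERWRITE TABLE demo_tbl SELECT * FROM new_data")

# Depending on the version and on whether the cache was ever populated,
# this may show the overwritten data instead of the original 0..4.
print(df.collect())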










apache-spark dataframe pyspark apache-spark-2.3






edited Nov 22 '18 at 10:12
asked Nov 21 '18 at 20:52 by max04








  • Technically speaking, cache doesn't work as a copy at all (deep or shallow) and doesn't provide any strict guarantees. Additionally, cache is lazy, so in this snippet the data could be cached only when you call saveAsTable, so there is not enough information here to diagnose the problem. Could you please edit the question and include a Minimal, Complete, and Verifiable example?

    – user10465355, Nov 21 '18 at 21:24
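For reference, since cache() is lazy, one way to make sure the cached data is populated right after the call, rather than only when saveAsTable later triggers a job, is to run an action immediately. A minimal sketch, reusing the names from the question's snippet:

# Force materialization of the cached DataFrame with an action.
# delta, dst_tbl and join_field come from the question's code.
new_records = delta.join(dst_tbl, how='left_anti', on=join_field).cache()
new_records.count()           # action: runs the job and fills the cache
print(new_records.is_cached)  # True: the plan is marked for caching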






  • Thanks! I will edit my question tomorrow and add some details. For now, based on your comment, I do not understand why it worked as a deep copy in Spark 2.1. Tomorrow I will give you a detailed example of it.

    – max04, Nov 21 '18 at 21:45











  • In the meantime, @user10465355, if cache() doesn't work as a copy at all, do you know which method does?

    – max04, Nov 21 '18 at 22:03













  • These are pretty good explanations. In general it is an optimization tool, which reduces the probability of repeated evaluation of a certain lineage. However, caching might be partial, might not happen at all, or might be evicted. As far as I know the caching mechanism hasn't changed between 2.1 and 2.3, which means you are probably seeing one of these.

    – user10465355, Nov 21 '18 at 23:22
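If a stronger guarantee than cache() is needed, one option (a sketch, not something suggested in this thread) is checkpointing, which for PySpark DataFrames in Spark 2.3 writes the data out and truncates the lineage so later actions do not go back to the source tables:

# Sketch: checkpoint() materializes the data to the checkpoint directory and
# cuts the logical plan, unlike cache(), which is only an optimization hint.
sparkSession.sparkContext.setCheckpointDir("/tmp/spark-checkpoints")  # example path

new_records = delta.join(dst_tbl, how='left_anti', on=join_field)
new_records = new_records.checkpoint(eager=True)  # computed and saved immediately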






  • What I noticed after a deep investigation is that there was probably a bug in version 2.1 in how the cache method behaved, which has since been fixed. I have changed my code to make it work.

    – max04, Nov 22 '18 at 21:08
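The changed code isn't shown; one hypothetical restructuring that removes the dependency on cache() altogether is to save the increment to a staging table before the snapshot table is overwritten, so the later INSERT OVERWRITE cannot affect it. A sketch using the question's names plus made-up staging table names:

# Hypothetical ordering fix (staging table names are illustrative only):
# materialize both results before snpsht_tbl is overwritten.
inc_tbl.write.mode("overwrite").saveAsTable("inc_tbl_stage")
snpsht_tbl.write.mode("overwrite").saveAsTable("snpsht_tbl_stage")

# Only now overwrite the destination tables from the staged copies.
sparkSession.sql("INSERT OVERWRITE TABLE snpsht_tbl SELECT * FROM snpsht_tbl_stage")
sparkSession.sql("INSERT OVERWRITE TABLE inc_tbl SELECT * FROM inc_tbl_stage")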













