Databricks Structured Streaming illegal state exception



























I'm trying out Structured Streaming in Azure Databricks, using Databricks storage (dbfs:/) for both the checkpoint and the output file locations. I'm streaming event data from an Azure Event Hub to Parquet files on DBFS.



When I cancel the query and restart it, I get the dreaded illegal state exception:




Caused by: java.lang.IllegalStateException: dbfs:/test02/_spark_metadata/2 doesn't exist when compacting batch 9 (compactInterval: 10)




I'm trying to figure out the relationship between the Azure Event Hubs conf startingPosition option, the checkpoint's offset and commit logs, and the _spark_metadata information in the storage location.



Why do I get an illegal state on restart and how are the different logs and metadata information used by Spark?



There are 10 offset log files, named 0 through 9, and 9 commit log files, named 0 through 8. Presumably there would be a tenth commit log file if the last batch had been processed completely.



The commit log files seem to contain no pertinent information, so I guess it is their mere existence (their name) that matters.



commit log file:
v1
{"nextBatchWatermarkMs":0}


Apart from some batch metadata, the offset logs only contain the offsets per partition:



{
  "batchWatermarkMs": 0,
  "batchTimestampMs": 1542634173589,
  "conf": {
    "spark.sql.shuffle.partitions": "200",
    "spark.sql.streaming.stateStore.providerClass": "org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider",
    "spark.sql.streaming.multipleWatermarkPolicy": "min",
    "spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion": "2"
  }
}
{
  "poc-test01": {
    "1": 3666,
    "0": 3664
  }
}


If I understood correctly, Structured Streaming runs a cycle in which it first writes a write-ahead log entry in the offset/ folder, then processes the batch data, and then writes a commit log entry in the commit/ folder. Each batch triggers a new cycle.
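
To check that cycle against what is actually on disk, a small sketch like the following compares the two log folders and reports batches that have an offset log but no commit log. It assumes the checkpoint is reachable as a local path (on Databricks, dbfs:/ locations are usually exposed under /dbfs/). The folder names "offsets" and "commits" and both function names are assumptions for illustration, which is why the folder names are parameters.

```python
from pathlib import Path
from typing import List, Set


def batch_ids(log_dir: Path) -> Set[int]:
    """Return the batch ids found in a log folder (files named 0, 1, 2, ...)."""
    if not log_dir.is_dir():
        return set()
    # Ignore anything that is not a plain numeric file name (temp files etc.).
    return {int(p.name) for p in log_dir.iterdir() if p.name.isdigit()}


def uncommitted_batches(
    checkpoint: Path,
    offsets_dir: str = "offsets",  # assumed folder name, adjust if different
    commits_dir: str = "commits",  # assumed folder name, adjust if different
) -> List[int]:
    """Batches that were planned (offset log written) but never committed."""
    planned = batch_ids(checkpoint / offsets_dir)
    committed = batch_ids(checkpoint / commits_dir)
    return sorted(planned - committed)
```

With the logs described above (offset logs 0 through 9, commit logs 0 through 8), uncommitted_batches returns [9]: batch 9 was planned but never committed.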



The _spark_metadata/ folder contains only two log files, 0 and 1. Each file contains information about two parquet files in the filepath/ folder.



{
  "path": "dbfs:/test02/part-00000-98c5def2-bdf3-4e56-af9f-ff0796344b01-c000.snappy.parquet",
  "size": 59180,
  "isDir": false,
  "modificationTime": 1542214122000,
  "blockReplication": 1,
  "blockSize": 536870912,
  "action": "add"
}
{
  "path": "dbfs:/test02/part-00001-a38dbf8b-240a-48d0-8438-f81f0506a79c-c000.snappy.parquet",
  "size": 59462,
  "isDir": false,
  "modificationTime": 1542214122000,
  "blockReplication": 1,
  "blockSize": 536870912,
  "action": "add"
}


There are 10 parquet files in the filepath/ folder, and the event hub has two partitions.
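
The mismatch between the 10 parquet files and the 4 files listed in the metadata logs can be made explicit with a sketch like this one. It assumes each metadata log file holds a version line such as "v1" followed by one JSON object per line (the entries above are shown pretty-printed). The function names are hypothetical, and only "add" entries are considered.

```python
import json
from typing import Iterable, List, Set


def tracked_paths(log_text: str) -> Set[str]:
    """Paths added by one metadata log file (assumed: one JSON object per line)."""
    paths = set()
    for line in log_text.splitlines():
        line = line.strip()
        if not line.startswith("{"):
            continue  # skip the version header and blank lines
        entry = json.loads(line)
        if entry.get("action") == "add":
            paths.add(entry["path"])
    return paths


def untracked_files(data_files: Iterable[str], log_texts: Iterable[str]) -> List[str]:
    """Data files present in the folder but not listed in any metadata log."""
    tracked: Set[str] = set()
    for text in log_texts:
        tracked |= tracked_paths(text)
    return sorted(f for f in data_files if f not in tracked)
```

Passing the paths of all parquet files in the folder together with the contents of log files 0 and 1 would list the files the metadata log does not know about.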



My best guess now is that the _spark_metadata/ folder is used for idempotence in the file sink. Spark ought to be able to track the processed batches from the source offset positions to the files created in the filepath/ folder, where the _spark_metadata/ folder is located. Somehow this broke.
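
The error message is consistent with the way the file sink's metadata log appears to be compacted: every compactInterval-th batch is written as a single compact file that folds in the preceding log files, so all of them must still exist at that point. The sketch below models that rule as I understand it from Spark's compactible file stream log. The function names are hypothetical, and the rule itself should be read as an assumption, not as Spark's actual code.

```python
from typing import Iterable, List, Optional


def is_compaction_batch(batch_id: int, compact_interval: int) -> bool:
    """Assumed rule: batches 9, 19, 29, ... compact when the interval is 10."""
    return (batch_id + 1) % compact_interval == 0


def batches_needed_for_compaction(batch_id: int, compact_interval: int) -> List[int]:
    """Log files that must exist before the given batch can be compacted."""
    if not is_compaction_batch(batch_id, compact_interval):
        raise ValueError(f"batch {batch_id} is not a compaction batch")
    # Everything since the previous compaction (or since batch 0).
    start = max(0, batch_id - compact_interval)
    return list(range(start, batch_id))


def first_missing_batch(
    batch_id: int, compact_interval: int, existing: Iterable[int]
) -> Optional[int]:
    """First required log file that is absent, or None if all are present."""
    present = set(existing)
    for needed in batches_needed_for_compaction(batch_id, compact_interval):
        if needed not in present:
            return needed
    return None
```

For compactInterval 10 and a folder holding only log files 0 and 1, first_missing_batch(9, 10, [0, 1]) returns 2, which matches the file named in the exception.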




  • Is the combination of checkpointing and file store metadata handling documented somewhere? It seems like a huge black box that only works so-so.

  • Is this a structured streaming bug or a file sink checkpoint bug?

  • How can I go back to the last working state?

































  • So I tried another query and shut it down gracefully. Now the query restarts as it should, but if I try to read the folder with the data I get an illegal state exception yet again; this time the first two _spark_metadata/ files, 0 and 1, are missing. I guess this means dbfs:/ on Azure is the culprit and isn't stable for production use.

    – Molotch
    Nov 21 '18 at 13:17











  • Switched over to Azure Data Lake Gen1 and everything seems to work so far. I'm leaning toward this being a problem with using Azure Storage.

    – Molotch
    Nov 21 '18 at 15:10























azure apache-spark spark-streaming databricks
















asked Nov 20 '18 at 16:28









Molotch
























