Data queried from Cassandra cannot be filtered on the same column again (InvalidQueryException)
I am trying to query a large chunk of data by time from Cassandra and then use Spark Datasets to carve out smaller chunks to process at a time; however, the application fails with an InvalidQueryException:
WARN 2018-11-22 13:16:54 org.apache.spark.scheduler.TaskSetManager: Lost task 0.0 in stage 2.0 (TID 5, 192.168.1.212, executor 0): java.io.IOException: Exception during preparation of SELECT "userid", "event_time", "value" FROM "user_1234"."data" WHERE token("userid") > ? AND token("userid") <= ? AND "event_time" >= ? AND "event_time" >= ? AND "event_time" <= ? ALLOW FILTERING: More than one restriction was found for the start bound on event_time
at com.datastax.spark.connector.rdd.CassandraTableScanRDD.createStatement(CassandraTableScanRDD.scala:323)
at com.datastax.spark.connector.rdd.CassandraTableScanRDD.com$datastax$spark$connector$rdd$CassandraTableScanRDD$$fetchTokenRange(CassandraTableScanRDD.scala:339)
at com.datastax.spark.connector.rdd.CassandraTableScanRDD$$anonfun$17.apply(CassandraTableScanRDD.scala:366)
at com.datastax.spark.connector.rdd.CassandraTableScanRDD$$anonfun$17.apply(CassandraTableScanRDD.scala:366)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at com.datastax.spark.connector.util.CountingIterator.hasNext(CountingIterator.scala:12)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:234)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.datastax.driver.core.exceptions.InvalidQueryException: More than one restriction was found for the start bound on event_time
at com.datastax.driver.core.exceptions.InvalidQueryException.copy(InvalidQueryException.java:41)
at com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:28)
at com.datastax.driver.core.AbstractSession.prepare(AbstractSession.java:108)
at com.datastax.driver.dse.DefaultDseSession.prepare(DefaultDseSession.java:278)
at com.datastax.spark.connector.cql.PreparedStatementCache$.prepareStatement(PreparedStatementCache.scala:45)
This is the piece of code I am trying to execute:
import org.apache.spark.sql.Dataset

case class RawDataModel(userid: String, event_time: Long, value: Double)

// first query: the last hour of data
var dtRangeEnd = System.currentTimeMillis()
var dtRangeStart = dtRangeEnd - (60 * 60 * 1000L)

val queryTimeRange = "SELECT * FROM user1234.datafile WHERE event_time >= " + dtRangeStart
val dataFrame = sparkSession.sql(queryTimeRange)

import sparkSession.implicits._
val dataSet: Dataset[RawDataModel] = dataFrame.as[RawDataModel]
dataSet.show(1)

// second step: narrow the same data down to the last 15 minutes
dtRangeEnd = System.currentTimeMillis()
dtRangeStart = dtRangeEnd - (15 * 60 * 1000L)

val dtRangeData = dataSet.filter(dataSet("event_time").between(dtRangeStart, dtRangeEnd))
dtRangeData.show(1)
Note: This is not a Datasets problem, since swapping them for DataFrames makes no difference. I first thought this was a lazy evaluation problem, with the two different bounds being lazily applied at the same time, but the dataSet.show(1) call should force an early evaluation and avoid the cascaded filtering.
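(One way to see why this still fails is to inspect the physical plan of the second, narrower query. The snippet below is only a debugging sketch reusing the names from the code above, not part of the original job.)

// print the logical and physical plans: the data source scan node should list the
// predicates Spark pushes down to Cassandra, and both event_time lower bounds
// (the one from queryTimeRange and the one from between()) end up on the same scan,
// which is exactly the duplicated start bound seen in the ALLOW FILTERING query
dtRangeData.explain(true)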
scala apache-spark cassandra datastax

asked Nov 22 '18 at 13:25 by Yasir, last edited Nov 22 '18 at 15:50
It seems like Spark merges sparkSession.sql(queryTimeRange) and dataSet.filter(dataSet("event_time").between(dtRangeStart, dtRangeEnd)) into one command. Could you persist the dataframe (dataFrame.persist) and force the evaluation (something like dataFrame.rdd.count) between these commands?
– Aleksey Isachenkov
Nov 22 '18 at 13:51

@AlekseyIsachenkov your suggestion has certainly worked, so the hunch was right: it is a lazy evaluation problem, and forcing the evaluation works. I am skeptical of the cost of doing this in a production-grade application; any suggestions?
– Yasir
Nov 22 '18 at 15:08

Basically, it depends on your data size and your cluster capabilities. Persist with MEMORY_ONLY mode costs almost nothing except the memory used for caching, and even DISK_ONLY mode doesn't slow your code down too much. Just don't forget to execute dataFrame.unpersist when you have finished using it.
– Aleksey Isachenkov
Nov 22 '18 at 15:26

I'll write an answer then.
– Aleksey Isachenkov
Nov 22 '18 at 15:27

Good point, I almost forgot to add the unpersist in my code. Thanks for the great tips; yes, go ahead!
– Yasir
Nov 22 '18 at 15:29
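For reference, a minimal sketch of the workaround discussed in the comments above; the explicit StorageLevel and the count() used to force evaluation are taken from the suggestions in this thread, not from the question itself:

import org.apache.spark.storage.StorageLevel

val dataFrame = sparkSession.sql(queryTimeRange)
// MEMORY_ONLY caches on-heap; DISK_ONLY trades speed for memory if the hour of data is large
dataFrame.persist(StorageLevel.MEMORY_ONLY)
// persist is lazy, so run an action to actually materialise the first query
dataFrame.count()

import sparkSession.implicits._
val dataSet = dataFrame.as[RawDataModel]
val dtRangeData = dataSet.filter(dataSet("event_time").between(dtRangeStart, dtRangeEnd))
dtRangeData.show(1)

// release the cache once the smaller chunks have been processed
dataFrame.unpersist()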
1 Answer
Spark merges sparkSession.sql(queryTimeRange) and dataSet.filter(dataSet("event_time").between(dtRangeStart, dtRangeEnd)) into one command, which in CQL looks like:

SELECT "sensorid", "event_time", "value" FROM "company_5a819ee2522e572c8a16a43a"."data" WHERE token("sensorid") > ? AND token("sensorid") <= ? AND "event_time" >= ? AND "event_time" >= ? AND "event_time" <= ?

so you end up with two restrictions on the same field, "event_time" >= ?.

If you persist dataFrame before executing .filter, Spark will compute dataFrame separately from .filter:

val dataFrame = sparkSession.sql(queryTimeRange)
// cache the result of the first query; the later filter is then applied on top of the
// cached plan instead of being merged into the same Cassandra scan
dataFrame.persist
val dtRangeData = dataSet.filter(dataSet("event_time").between(dtRangeStart, dtRangeEnd))
answered Nov 22 '18 at 15:32 by Aleksey Isachenkov, edited Nov 22 '18 at 18:36