Data queried from Cassandra cannot be filtered on same column again (InvalidQueryException)

I am trying to query a large chunk of data from Cassandra by time, and then use Spark Datasets to carve out smaller chunks to process at a time. However, the application fails with an invalid query exception:



WARN  2018-11-22 13:16:54 org.apache.spark.scheduler.TaskSetManager: Lost task 0.0 in stage 2.0 (TID 5, 192.168.1.212, executor 0): java.io.IOException: Exception during preparation of SELECT "userid", "event_time", "value" FROM "user_1234"."data" WHERE token("userid") > ? AND token("userid") <= ? AND "event_time" >= ? AND "event_time" >= ? AND "event_time" <= ?   ALLOW FILTERING: More than one restriction was found for the start bound on event_time
at com.datastax.spark.connector.rdd.CassandraTableScanRDD.createStatement(CassandraTableScanRDD.scala:323)
at com.datastax.spark.connector.rdd.CassandraTableScanRDD.com$datastax$spark$connector$rdd$CassandraTableScanRDD$$fetchTokenRange(CassandraTableScanRDD.scala:339)
at com.datastax.spark.connector.rdd.CassandraTableScanRDD$$anonfun$17.apply(CassandraTableScanRDD.scala:366)
at com.datastax.spark.connector.rdd.CassandraTableScanRDD$$anonfun$17.apply(CassandraTableScanRDD.scala:366)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at com.datastax.spark.connector.util.CountingIterator.hasNext(CountingIterator.scala:12)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:234)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.datastax.driver.core.exceptions.InvalidQueryException: More than one restriction was found for the start bound on event_time
at com.datastax.driver.core.exceptions.InvalidQueryException.copy(InvalidQueryException.java:41)
at com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:28)
at com.datastax.driver.core.AbstractSession.prepare(AbstractSession.java:108)
at com.datastax.driver.dse.DefaultDseSession.prepare(DefaultDseSession.java:278)
at com.datastax.spark.connector.cql.PreparedStatementCache$.prepareStatement(PreparedStatementCache.scala:45)


This is the piece of code I am trying to execute:



case class RawDataModel(userid: String, event_time: Long, value: Double)

// First pass: pull the last hour of data from Cassandra
var dtRangeEnd = System.currentTimeMillis()
var dtRangeStart = (dtRangeEnd - (60 * 60 * 1000).toLong)

val queryTimeRange = "SELECT * FROM user1234.datafile WHERE event_time >= " + dtRangeStart

val dataFrame = sparkSession.sql(queryTimeRange)

import sparkSession.implicits._
val dataSet: Dataset[RawDataModel] = dataFrame.as[RawDataModel]

dataSet.show(1)

// Second pass: narrow the same Dataset down to the last 15 minutes
dtRangeEnd = System.currentTimeMillis()
dtRangeStart = (dtRangeEnd - (15 * 60 * 1000).toLong)

val dtRangeData = dataSet.filter(dataSet("event_time").between(dtRangeStart, dtRangeEnd))

// This second action triggers the InvalidQueryException shown above
dtRangeData.show(1)


Note: This is not a Dataset problem; I have tried swapping them for DataFrames with no difference. At first I thought this was a lazy-evaluation problem, with the two different bounds being applied lazily at the same time, but the dataSet.show(1) call should force an early evaluation and avoid the cascaded filtering.
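
One way to see that the second filter is merged into the Cassandra-side query, rather than applied to the rows already materialized by dataSet.show(1), is to print the query plan instead of running the failing action. A minimal sketch, assuming the dtRangeData defined in the snippet above (the exact plan output depends on the Spark and connector versions):

// Inspect the plan of the narrower filter instead of executing it. The
// data-source scan node should list the predicates Spark intends to push down
// to Cassandra, which is where the duplicate lower bound on event_time appears.
dtRangeData.explain(true)   // true = also print the parsed, analyzed and optimized plans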







scala apache-spark cassandra datastax






asked Nov 22 '18 at 13:25 by Yasir, edited Nov 22 '18 at 15:50








  • It seems like Spark merges sparkSession.sql(queryTimeRange) and dataSet.filter(dataSet("event_time").between(dtRangeStart, dtRangeEnd)) into one command. Could you persist the dataframe (dataFrame.persist) and force the evaluation (something like dataFrame.rdd.count) between these commands? – Aleksey Isachenkov, Nov 22 '18 at 13:51

  • @AlekseyIsachenkov your suggestion has certainly worked, so the hunch was right: it is a lazy evaluation problem, and forcing the evaluation fixes it. I am skeptical of the cost of doing this in a production-grade application; any suggestions? – Yasir, Nov 22 '18 at 15:08

  • Basically, it depends on your data size and your cluster capabilities. Persist with MEMORY_ONLY mode costs almost nothing except the memory used for caching, and even DISK_ONLY mode doesn't slow your code down too much. Just don't forget to execute dataFrame.unpersist when you are done using it. – Aleksey Isachenkov, Nov 22 '18 at 15:26

  • I'll write an answer then. – Aleksey Isachenkov, Nov 22 '18 at 15:27

  • Good point, I almost forgot to add the unpersist in my code :P Thanks for the tips; go ahead! – Yasir, Nov 22 '18 at 15:29

1 Answer

Spark merges sparkSession.sql(queryTimeRange) and dataSet.filter(dataSet("event_time").between(dtRangeStart, dtRangeEnd)) into one command, which in CQL looks like:




SELECT "sensorid", "event_time", "value" FROM "company_5a819ee2522e572c8a16a43a"."data" WHERE token("sensorid") > ? AND token("sensorid") <= ? AND "event_time" >= ? AND "event_time" >= ? AND "event_time" <= ?




There you end up with two restrictions of the same kind on the same column, "event_time" >= ?, which Cassandra rejects.



If you persist dataFrame before executing .filter, Spark will compute dataFrame separately from the .filter:



val dataFrame = sparkSession.sql(queryTimeRange)
dataFrame.persist
dataSet.filter(dataSet("event_time").between(dtRangeStart, dtRangeEnd))
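
For completeness, a sketch of the same workaround with the points from the comments folded in; the StorageLevel choice, the count() used to force materialization, and the unpersist() call are illustrative additions rather than part of the original answer. It assumes the RawDataModel, sparkSession, queryTimeRange and range variables defined in the question, plus import sparkSession.implicits._:

import org.apache.spark.storage.StorageLevel

val dataFrame = sparkSession.sql(queryTimeRange)

// Cache the hour-wide result so the narrower filter is applied to the cached
// rows instead of being pushed down to Cassandra a second time.
dataFrame.persist(StorageLevel.MEMORY_ONLY)
dataFrame.count()   // force materialization of the cache, as suggested in the comments

val dataSet = dataFrame.as[RawDataModel]
val dtRangeData = dataSet.filter(dataSet("event_time").between(dtRangeStart, dtRangeEnd))
dtRangeData.show(1)

// Release the cached data once it is no longer needed.
dataFrame.unpersist()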





answered Nov 22 '18 at 15:32, edited Nov 22 '18 at 18:36
– Aleksey Isachenkov