Parsing data in Apache Spark Scala: org.apache.spark.SparkException: Task not serializable error when trying...
Input file:
___DATE___
2018-11-16T06:3937
Linux hortonworks 3.10.0-514.26.2.el7.x86_64 #1 SMP Fri Jun 30 05:26:04 UTC 2017 x86_64 x86_64 x86_64 GNU/Linux
06:39:37 up 100 days, 1:04, 2 users, load average: 9.01, 8.30, 8.48
06:30:01 AM all 6.08 0.00 2.83 0.04 0.00 91.06
___DATE___
2018-11-16T06:4037
Linux cloudera 3.10.0-514.26.2.el7.x86_64 #1 SMP Fri Jun 30 05:26:04 UTC 2017 x86_64 x86_64 x86_64 GNU/Linux
06:40:37 up 100 days, 1:05, 28 users, load average: 8.39, 8.26, 8.45
06:40:01 AM all 6.92 1.11 1.88 0.04 0.00 90.05
Required output:
2018-11-16T06:3937,hortonworks, 2 users
2018-11-16T06:4037,cloudera, 28 users
I'm trying to get hands-on with Spark and Scala.
I'm trying to parse this input file with Spark 2.3.1 and Scala 2.11.6. Here's my code:
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.LongWritable
import org.apache.spark.{SparkConf, SparkContext}

object parse_stats extends App {
  case class LoadSchema(date: String)

  val conf = new SparkConf().setAppName("ParseStats").setMaster("local[*]")
  val sc = new SparkContext(conf)

  val hadoopConf = new Configuration(sc.hadoopConfiguration)
  hadoopConf.set("textinputformat.record.delimiter", "___DATE___")

  val input = sc.newAPIHadoopFile("C:\\Users\\rohit\\Documents\\dataset\\sys_stats.log",
      classOf[TextInputFormat], classOf[LongWritable], classOf[Text], hadoopConf)
    .map(line => line._2.toString)

  lazy val date_pattern = "[0-9]+[-][0-9]+[-][0-9]+[T][0-9]+[:][0-9]+".r
  lazy val uname_pattern = "[Linux][0-9a-zA-z-#() . : _ /]+[GNU/Linux]".r
  lazy val cpu_regex = "[ 0-9]+[:][0-9]+[:][0-9]+[0-9a-zA-Z, : .]+[load average][:][0-9 . ,]+".r

  val transformRDD = input.map { eachline => (
      (if (date_pattern.pattern.matcher(eachline).matches()) eachline),                          //collects date
      (if (uname_pattern.pattern.matcher(eachline).matches()) eachline.split("\\s+")(1).trim()), //collects hostname
      (if (cpu_regex.pattern.matcher(eachline).matches()) eachline.split(",")(2).trim())         //collects cpu users
    )
  }

  transformRDD.collect().foreach(println)
}
If I run this code from IntelliJ, I get the output below:
((),(),())
((),(),())
((),(),())
If I run it from spark-shell, I get the error below:
scala> import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.conf.Configuration
scala> import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
scala> import org.apache.hadoop.io.Text
import org.apache.hadoop.io.Text
scala> import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.LongWritable
scala> import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.{SparkConf, SparkContext}
scala> val hadoopConf = new Configuration(sc.hadoopConfiguration)
hadoopConf: org.apache.hadoop.conf.Configuration = Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml, __spark_hadoop_conf__.xml
scala> hadoopConf.set("textinputformat.record.delimiter","___DATE___")
scala> val input = sc.newAPIHadoopFile("C:\Users\rnimmal1\Documents\dataset\sys_stats.log",classOf[TextInputFormat],classOf[LongWritable],classOf[Text],hadoopConf).map(line=>line._2.toString)
input: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[16] at map at <console>:37
scala>
scala> lazy val date_pattern="[0-9]+[-][0-9]+[-][0-9]+[T][0-9]+[:][0-9]+".r
date_pattern: scala.util.matching.Regex = <lazy>
scala> lazy val uname_pattern="[Linux][0-9a-zA-z-#() . : _ /]+[GNU/Linux]".r
uname_pattern: scala.util.matching.Regex = <lazy>
scala> lazy val cpu_regex="[ 0-9]+[:][0-9]+[:][0-9]+[0-9a-zA-Z, : .]+[load average][:][0-9 . ,]+".r
cpu_regex: scala.util.matching.Regex = <lazy>
scala>
scala> val transformRDD = input.map{eachline=>((if(date_pattern.pattern.matcher(eachline).matches()) eachline), //collects date
| (if(uname_pattern.pattern.matcher(eachline).matches()) eachline.split("\s+")(1).trim() ), //collects hostname
| (if (cpu_regex.pattern.matcher(eachline).matches()) eachline.split(",")(2).trim()) //collects cpu users
| )
| }
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:345)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:335)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:159)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2299)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:371)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:370)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.map(RDD.scala:370)
... 54 elided
Caused by: java.io.NotSerializableException: org.apache.hadoop.conf.Configuration
Serialization stack:
- object not serializable (class: org.apache.hadoop.conf.Configuration, value: Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml, __spark_hadoop_conf__.xml)
- field (class: $iw, name: hadoopConf, type: class org.apache.hadoop.conf.Configuration)
- object (class $iw, $iw@63fa0b9)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@3f4b52fa)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@338f9bb5)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@3d63becf)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@3aca7082)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@4ccfd904)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@6e4e7a62)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@5aaab2b0)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@5c51a7eb)
- field (class: $line36.$read, name: $iw, type: class $iw)
- object (class $line36.$read, $line36.$read@2ba3b4a6)
- field (class: $iw, name: $line36$read, type: class $line36.$read)
- object (class $iw, $iw@6559f04e)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@8f7cbcc)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@465b16bb)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@373efaa2)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@5f2896fa)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@f777d41)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@43ec41d7)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@61c0a61)
- field (class: $line38.$read, name: $iw, type: class $iw)
- object (class $line38.$read, $line38.$read@10d1f6da)
- field (class: $iw, name: $line38$read, type: class $line38.$read)
- object (class $iw, $iw@2095e085)
- field (class: $iw, name: $outer, type: class $iw)
- object (class $iw, $iw@380cb7e3)
- field (class: $anonfun$1, name: $outer, type: class $iw)
- object (class $anonfun$1, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:342)
... 63 more
What am I missing?
scala apache-spark rdd hadoop2 spark-shell
textinputformat.record.delimiter should be a single character, not a word.
– stack0114106
Nov 20 '18 at 12:36
As far as I know, "textinputformat.record.delimiter" is not confined to a single character. I used the same delimiter with Apache Pig and it worked there, and it should work in Spark as well, since this property is specific to Hadoop.
– Rohit Nimmala
Nov 20 '18 at 12:44
It can be multibyte if the encoding is UTF-8 or UTF-16, but not a word. I just ran it and it ignores ___DATE___ and takes the default newline; if you change ___DATE___ to "|" the split happens.
– stack0114106
Nov 20 '18 at 13:18
Thanks, I replaced the ___DATE___ string with "|" in the data set and in the property as well, and I see the code behaving the same way. :(
– Rohit Nimmala
Nov 20 '18 at 14:08
Yes, that can also be done; let me update the answer.
– stack0114106
Nov 20 '18 at 16:48
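For what it's worth, one way to check whether a multi-character textinputformat.record.delimiter is being honoured is to run it against a tiny throwaway file and look at the records that come back. A minimal sketch, assuming a local[*] master and a temporary file created only for this check (the file contents and object name are hypothetical, not the question's dataset):

import java.nio.charset.StandardCharsets
import java.nio.file.Files

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.{SparkConf, SparkContext}

object DelimiterCheck extends App {
  // Throwaway input: two chunks separated by the custom marker.
  val tmp = Files.createTempFile("delim-check", ".txt")
  Files.write(tmp, "___DATE___\nrecord one line 1\nrecord one line 2\n___DATE___\nrecord two\n"
    .getBytes(StandardCharsets.UTF_8))

  val sc = new SparkContext(new SparkConf().setAppName("DelimiterCheck").setMaster("local[*]"))

  val conf = new Configuration(sc.hadoopConfiguration)
  conf.set("textinputformat.record.delimiter", "___DATE___")

  // If the multi-character delimiter is honoured, each chunk between ___DATE___
  // markers comes back as one (possibly empty) multi-line record; if it is
  // ignored, each newline-terminated line comes back as its own record.
  sc.newAPIHadoopFile(tmp.toString, classOf[TextInputFormat],
      classOf[LongWritable], classOf[Text], conf)
    .map(_._2.toString)
    .collect()
    .foreach(rec => println(s">>>${rec}<<<"))

  sc.stop()
}

If the chunked behaviour shows up here but not on the real file, the issue is more likely in how the records are parsed than in the delimiter itself.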
2 Answers
After changing the ___DATE___ delimiter to a pipe "|", the code snippet below produces the required output. Note that I'm on a Windows platform, so I'm stripping the "\r"; please check it out.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("Spark_test").master("local[*]").getOrCreate()
import spark.implicits._

// Set the custom record delimiter before creating the RDD so that the Hadoop
// configuration snapshot taken by textFile picks it up.
spark.sparkContext.hadoopConfiguration.set("textinputformat.record.delimiter", "|")
val file1 = spark.sparkContext.textFile("./in/machine_logs.txt")

val file2 = file1.filter { line => val x = line.split("\n"); x.length > 5 }
  .map { line =>
    val x = line.split("\n")
    val p = x(2).replaceAll("\r", "")   // not needed on a Unix platform
    val q = x(3).split(" ")(1)
    val r = x(4).split(",")(2)
    (p + "," + q + "," + r)
  }
file2.collect.foreach(println)
//file2.saveAsTextFile("./in/machine_logs.out") --> comment the above line and uncomment this one to save to a file
Output:
2018-11-16T06:3937,hortonworks, 2 users
2018-11-16T06:4037,cloudera, 28 users
Update1:
With regex matching:
val date_pattern = "[0-9]+[-][0-9]+[-][0-9]+[T][0-9]+[:][0-9]+".r
val uname_pattern = "(Linux) (.*?) [0-9a-zA-z-#() . : _ /]+(GNU/Linux)".r
val cpu_regex = """(.+),(.*?),\s+(load average)[:][0-9 . ,]+""".r

val file2 = file1.filter { line => val x = line.split("\n"); x.length > 5 }
  .map { line =>
    var q = ""; var r = ""
    val p = date_pattern.findFirstIn(line).mkString
    uname_pattern.findAllIn(line).matchData.foreach(m => { q = m.group(2).mkString })
    cpu_regex.findAllIn(line).matchData.foreach(m => { r = m.group(2).mkString })
    (p + "," + q + "," + r)
  }
file2.collect.foreach(println)
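If it helps, the tweaked patterns can also be sanity-checked against the two sample lines from the question before wiring them into the RDD. A minimal, hypothetical check (the sample strings below are copied from the input shown in the question; the object name is made up):

object RegexCheck extends App {
  val uname_pattern = "(Linux) (.*?) [0-9a-zA-z-#() . : _ /]+(GNU/Linux)".r
  val cpu_regex = """(.+),(.*?),\s+(load average)[:][0-9 . ,]+""".r

  val unameLine = "Linux hortonworks 3.10.0-514.26.2.el7.x86_64 #1 SMP Fri Jun 30 05:26:04 UTC 2017 x86_64 x86_64 x86_64 GNU/Linux"
  val cpuLine   = "06:39:37 up 100 days, 1:04, 2 users, load average: 9.01, 8.30, 8.48"

  // group(2) of uname_pattern is the hostname; group(2) of cpu_regex is the "N users" field.
  uname_pattern.findAllIn(unameLine).matchData.foreach(m => println(m.group(2)))   // hortonworks
  cpu_regex.findAllIn(cpuLine).matchData.foreach(m => println(m.group(2).trim))    // 2 users
}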
Thanks for the answer, let me try.
– Rohit Nimmala
Nov 20 '18 at 15:09
This worked! But I don't want to hardcode the line numbers (e.g. x(2), x(3)); I want to get it working with pattern matching, i.e. extract the value only if the string matches the pattern.
– Rohit Nimmala
Nov 20 '18 at 16:13
Updated the answer. Note that I tweaked the regex that you used, as there were some mistakes.
– stack0114106
Nov 20 '18 at 16:51
Thanks mate! I figured it out just a while ago. Anyway, thanks for your patience and help :)
– Rohit Nimmala
Nov 20 '18 at 17:00
Glad that you figured it out.
– stack0114106
Nov 20 '18 at 17:06
I believe the problem is that you are defining those filter objects (date_pattern, etc.) outside of the RDD operation, so Spark has to send the entire parse_stats object to all of the executors, which it cannot do because it cannot serialize that entire object. This doesn't happen when you run it in local mode because it doesn't need to send any objects to other executors.
Check out the great answer here: Task not serializable: java.io.NotSerializableException when calling function outside closure only on classes not objects
This gist has some quick and simple ways to avoid serialization: https://gist.github.com/kmader/1d64e64621e63d566f67
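As an illustration of that idea, below is a minimal sketch (not the asker's exact job) of one common way to avoid the problem: keep the non-serializable Configuration strictly on the driver, and put whatever the closures need into a small serializable holder so that only the holder is referenced inside the map. The object and file names are hypothetical:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.{SparkConf, SparkContext}

// Small serializable holder: the closures below reference only this object,
// so Spark never tries to serialize the enclosing scope that owns hadoopConf.
object Patterns extends Serializable {
  val date = "[0-9]+[-][0-9]+[-][0-9]+[T][0-9]+[:][0-9]+".r
}

object NoCaptureExample extends App {
  val sc = new SparkContext(new SparkConf().setAppName("NoCapture").setMaster("local[*]"))

  // The Configuration is consumed by newAPIHadoopFile on the driver and is never
  // referenced inside any RDD closure.
  val hadoopConf = new Configuration(sc.hadoopConfiguration)
  hadoopConf.set("textinputformat.record.delimiter", "___DATE___")

  val records = sc.newAPIHadoopFile("sys_stats.log",   // hypothetical path
      classOf[TextInputFormat], classOf[LongWritable], classOf[Text], hadoopConf)
    .map(_._2.toString)

  // Only Patterns (serializable) is referenced here, not hadoopConf.
  records.flatMap(rec => Patterns.date.findFirstIn(rec)).collect().foreach(println)

  sc.stop()
}

In spark-shell the same idea applies: define the patterns inside the closure itself, or in a separately compiled object on the classpath, so the REPL wrapper that holds hadoopConf is never dragged into the serialized closure.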