Bag of words with pySpark reduceByKey
up vote
2
down vote
favorite
I am trying to do some text mining tasks with pySpark. I am new to Spark and I've been following this example http://mccarroll.net/blog/pyspark2/index.html to build the bag of words for my data.
Originally my data looked something like this
df.show(5)
+------------+---------+----------------+--------------------+
|Title |Month | Author | Document|
+------------+---------+----------------+--------------------+
| a | Jan| John |This is a document |
| b | Feb| Mary |A book by Mary |
| c | Mar| Luke |Newspaper article |
+------------+---------+----------------+--------------------+
So far I have extracted the terms of each document with
bow0 = df.rdd
.map( lambda x: x.Document.replace(',',' ').replace('.',' ').replace('-',' ').lower())
.flatMap(lambda x: x.split())
.map(lambda x: (x, 1))
Which gives me
[('This', 1),
('is', 1),
('a', 1),
('document', 1)]
But when I try to compute the frequency with reduceByKey and try to see the result
bow0.reduceByKey(lambda x,y:x+y).take(50)
I get this error:
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-53-966f90775397> in <module>()
----> 1 bow0.reduceByKey(lambda x,y:x+y).take(50)
/usr/local/spark/python/pyspark/rdd.py in take(self, num)
1341
1342 p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
-> 1343 res = self.context.runJob(self, takeUpToNumLeft, p)
1344
1345 items += res
/usr/local/spark/python/pyspark/context.py in runJob(self, rdd, partitionFunc, partitions, allowLocal)
990 # SparkContext#runJob.
991 mappedRDD = rdd.mapPartitions(partitionFunc)
--> 992 port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
993 return list(_load_from_socket(port, mappedRDD._jrdd_deserializer))
994
/usr/local/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args)
1131 answer = self.gateway_client.send_command(command)
1132 return_value = get_return_value(
-> 1133 answer, self.gateway_client, self.target_id, self.name)
1134
1135 for temp_arg in temp_args:
/usr/local/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
/usr/local/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
317 raise Py4JJavaError(
318 "An error occurred while calling {0}{1}{2}.n".
--> 319 format(target_id, ".", name), value)
320 else:
321 raise Py4JError(
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 31.0 failed 4 times, most recent failure: Lost task 1.3 in stage 31.0 (TID 84, 9.242.64.15, executor 7): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 177, in main
process()
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 172, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/usr/local/spark/python/pyspark/rdd.py", line 2423, in pipeline_func
return func(split, prev_func(split, iterator))
File "/usr/local/spark/python/pyspark/rdd.py", line 2423, in pipeline_func
return func(split, prev_func(split, iterator))
File "/usr/local/spark/python/pyspark/rdd.py", line 346, in func
return f(iterator)
File "/usr/local/spark/python/pyspark/rdd.py", line 1842, in combineLocally
merger.mergeValues(iterator)
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/shuffle.py", line 236, in mergeValues
for k, v in iterator:
File "<ipython-input-48-5c0753c6b152>", line 1, in <lambda>
AttributeError: 'NoneType' object has no attribute 'replace'
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:404)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1517)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1505)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1504)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1504)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1732)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1687)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1676)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2029)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2050)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2069)
at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:455)
at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 177, in main
process()
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 172, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/usr/local/spark/python/pyspark/rdd.py", line 2423, in pipeline_func
return func(split, prev_func(split, iterator))
File "/usr/local/spark/python/pyspark/rdd.py", line 2423, in pipeline_func
return func(split, prev_func(split, iterator))
File "/usr/local/spark/python/pyspark/rdd.py", line 346, in func
return f(iterator)
File "/usr/local/spark/python/pyspark/rdd.py", line 1842, in combineLocally
merger.mergeValues(iterator)
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/shuffle.py", line 236, in mergeValues
for k, v in iterator:
File "<ipython-input-48-5c0753c6b152>", line 1, in <lambda>
AttributeError: 'NoneType' object has no attribute 'replace'
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:404)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
pyspark rdd reduce
add a comment |
up vote
2
down vote
favorite
I am trying to do some text mining tasks with pySpark. I am new to Spark and I've been following this example http://mccarroll.net/blog/pyspark2/index.html to build the bag of words for my data.
Originally my data looked something like this
df.show(5)
+------------+---------+----------------+--------------------+
|Title |Month | Author | Document|
+------------+---------+----------------+--------------------+
| a | Jan| John |This is a document |
| b | Feb| Mary |A book by Mary |
| c | Mar| Luke |Newspaper article |
+------------+---------+----------------+--------------------+
So far I have extracted the terms of each document with
bow0 = df.rdd
.map( lambda x: x.Document.replace(',',' ').replace('.',' ').replace('-',' ').lower())
.flatMap(lambda x: x.split())
.map(lambda x: (x, 1))
Which gives me
[('This', 1),
('is', 1),
('a', 1),
('document', 1)]
But when I try to compute the frequency with reduceByKey and try to see the result
bow0.reduceByKey(lambda x,y:x+y).take(50)
I get this error:
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-53-966f90775397> in <module>()
----> 1 bow0.reduceByKey(lambda x,y:x+y).take(50)
/usr/local/spark/python/pyspark/rdd.py in take(self, num)
1341
1342 p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
-> 1343 res = self.context.runJob(self, takeUpToNumLeft, p)
1344
1345 items += res
/usr/local/spark/python/pyspark/context.py in runJob(self, rdd, partitionFunc, partitions, allowLocal)
990 # SparkContext#runJob.
991 mappedRDD = rdd.mapPartitions(partitionFunc)
--> 992 port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
993 return list(_load_from_socket(port, mappedRDD._jrdd_deserializer))
994
/usr/local/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args)
1131 answer = self.gateway_client.send_command(command)
1132 return_value = get_return_value(
-> 1133 answer, self.gateway_client, self.target_id, self.name)
1134
1135 for temp_arg in temp_args:
/usr/local/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
/usr/local/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
317 raise Py4JJavaError(
318 "An error occurred while calling {0}{1}{2}.n".
--> 319 format(target_id, ".", name), value)
320 else:
321 raise Py4JError(
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 31.0 failed 4 times, most recent failure: Lost task 1.3 in stage 31.0 (TID 84, 9.242.64.15, executor 7): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 177, in main
process()
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 172, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/usr/local/spark/python/pyspark/rdd.py", line 2423, in pipeline_func
return func(split, prev_func(split, iterator))
File "/usr/local/spark/python/pyspark/rdd.py", line 2423, in pipeline_func
return func(split, prev_func(split, iterator))
File "/usr/local/spark/python/pyspark/rdd.py", line 346, in func
return f(iterator)
File "/usr/local/spark/python/pyspark/rdd.py", line 1842, in combineLocally
merger.mergeValues(iterator)
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/shuffle.py", line 236, in mergeValues
for k, v in iterator:
File "<ipython-input-48-5c0753c6b152>", line 1, in <lambda>
AttributeError: 'NoneType' object has no attribute 'replace'
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:404)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1517)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1505)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1504)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1504)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1732)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1687)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1676)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2029)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2050)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2069)
at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:455)
at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 177, in main
process()
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 172, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/usr/local/spark/python/pyspark/rdd.py", line 2423, in pipeline_func
return func(split, prev_func(split, iterator))
File "/usr/local/spark/python/pyspark/rdd.py", line 2423, in pipeline_func
return func(split, prev_func(split, iterator))
File "/usr/local/spark/python/pyspark/rdd.py", line 346, in func
return f(iterator)
File "/usr/local/spark/python/pyspark/rdd.py", line 1842, in combineLocally
merger.mergeValues(iterator)
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/shuffle.py", line 236, in mergeValues
for k, v in iterator:
File "<ipython-input-48-5c0753c6b152>", line 1, in <lambda>
AttributeError: 'NoneType' object has no attribute 'replace'
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:404)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
pyspark rdd reduce
The error you have, `AttributeError: 'NoneType' object has no attribute 'replace'`, means that somewhere `.replace` is being called on `None`.
There is only one place that this could happen, so this means you have some `null` values in your `Document` column.
The quickest modification to your code would be to change your `map` function to the following:
`.map( lambda x: x.Document.replace(',',' ').replace('.',' ').replace('-',' ').lower() if x.Document else '')`
or add a `.filter(lambda x: x.Document is not None)` before calling `map`.
– pault
Nov 15 at 14:38
add a comment |
up vote
2
down vote
favorite
up vote
2
down vote
favorite
I am trying to do some text mining tasks with pySpark. I am new to Spark and I've been following this example http://mccarroll.net/blog/pyspark2/index.html to build the bag of words for my data.
Originally my data looked something like this
df.show(5)
+------------+---------+----------------+--------------------+
|Title |Month | Author | Document|
+------------+---------+----------------+--------------------+
| a | Jan| John |This is a document |
| b | Feb| Mary |A book by Mary |
| c | Mar| Luke |Newspaper article |
+------------+---------+----------------+--------------------+
So far I have extracted the terms of each document with
bow0 = df.rdd
.map( lambda x: x.Document.replace(',',' ').replace('.',' ').replace('-',' ').lower())
.flatMap(lambda x: x.split())
.map(lambda x: (x, 1))
Which gives me
[('This', 1),
('is', 1),
('a', 1),
('document', 1)]
But when I try to compute the frequency with reduceByKey and try to see the result
bow0.reduceByKey(lambda x,y:x+y).take(50)
I get this error:
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-53-966f90775397> in <module>()
----> 1 bow0.reduceByKey(lambda x,y:x+y).take(50)
/usr/local/spark/python/pyspark/rdd.py in take(self, num)
1341
1342 p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
-> 1343 res = self.context.runJob(self, takeUpToNumLeft, p)
1344
1345 items += res
/usr/local/spark/python/pyspark/context.py in runJob(self, rdd, partitionFunc, partitions, allowLocal)
990 # SparkContext#runJob.
991 mappedRDD = rdd.mapPartitions(partitionFunc)
--> 992 port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
993 return list(_load_from_socket(port, mappedRDD._jrdd_deserializer))
994
/usr/local/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args)
1131 answer = self.gateway_client.send_command(command)
1132 return_value = get_return_value(
-> 1133 answer, self.gateway_client, self.target_id, self.name)
1134
1135 for temp_arg in temp_args:
/usr/local/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
/usr/local/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
317 raise Py4JJavaError(
318 "An error occurred while calling {0}{1}{2}.n".
--> 319 format(target_id, ".", name), value)
320 else:
321 raise Py4JError(
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 31.0 failed 4 times, most recent failure: Lost task 1.3 in stage 31.0 (TID 84, 9.242.64.15, executor 7): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 177, in main
process()
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 172, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/usr/local/spark/python/pyspark/rdd.py", line 2423, in pipeline_func
return func(split, prev_func(split, iterator))
File "/usr/local/spark/python/pyspark/rdd.py", line 2423, in pipeline_func
return func(split, prev_func(split, iterator))
File "/usr/local/spark/python/pyspark/rdd.py", line 346, in func
return f(iterator)
File "/usr/local/spark/python/pyspark/rdd.py", line 1842, in combineLocally
merger.mergeValues(iterator)
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/shuffle.py", line 236, in mergeValues
for k, v in iterator:
File "<ipython-input-48-5c0753c6b152>", line 1, in <lambda>
AttributeError: 'NoneType' object has no attribute 'replace'
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:404)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1517)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1505)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1504)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1504)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1732)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1687)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1676)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2029)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2050)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2069)
at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:455)
at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 177, in main
process()
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 172, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/usr/local/spark/python/pyspark/rdd.py", line 2423, in pipeline_func
return func(split, prev_func(split, iterator))
File "/usr/local/spark/python/pyspark/rdd.py", line 2423, in pipeline_func
return func(split, prev_func(split, iterator))
File "/usr/local/spark/python/pyspark/rdd.py", line 346, in func
return f(iterator)
File "/usr/local/spark/python/pyspark/rdd.py", line 1842, in combineLocally
merger.mergeValues(iterator)
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/shuffle.py", line 236, in mergeValues
for k, v in iterator:
File "<ipython-input-48-5c0753c6b152>", line 1, in <lambda>
AttributeError: 'NoneType' object has no attribute 'replace'
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:404)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
pyspark rdd reduce
I am trying to do some text mining tasks with pySpark. I am new to Spark and I've been following this example http://mccarroll.net/blog/pyspark2/index.html to build the bag of words for my data.
Originally my data looked something like this
df.show(5)
+------------+---------+----------------+--------------------+
|Title |Month | Author | Document|
+------------+---------+----------------+--------------------+
| a | Jan| John |This is a document |
| b | Feb| Mary |A book by Mary |
| c | Mar| Luke |Newspaper article |
+------------+---------+----------------+--------------------+
So far I have extracted the terms of each document with
bow0 = df.rdd
.map( lambda x: x.Document.replace(',',' ').replace('.',' ').replace('-',' ').lower())
.flatMap(lambda x: x.split())
.map(lambda x: (x, 1))
Which gives me
[('This', 1),
('is', 1),
('a', 1),
('document', 1)]
But when I try to compute the frequency with reduceByKey and try to see the result
bow0.reduceByKey(lambda x,y:x+y).take(50)
I get this error:
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-53-966f90775397> in <module>()
----> 1 bow0.reduceByKey(lambda x,y:x+y).take(50)
/usr/local/spark/python/pyspark/rdd.py in take(self, num)
1341
1342 p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
-> 1343 res = self.context.runJob(self, takeUpToNumLeft, p)
1344
1345 items += res
/usr/local/spark/python/pyspark/context.py in runJob(self, rdd, partitionFunc, partitions, allowLocal)
990 # SparkContext#runJob.
991 mappedRDD = rdd.mapPartitions(partitionFunc)
--> 992 port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
993 return list(_load_from_socket(port, mappedRDD._jrdd_deserializer))
994
/usr/local/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args)
1131 answer = self.gateway_client.send_command(command)
1132 return_value = get_return_value(
-> 1133 answer, self.gateway_client, self.target_id, self.name)
1134
1135 for temp_arg in temp_args:
/usr/local/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
/usr/local/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
317 raise Py4JJavaError(
318 "An error occurred while calling {0}{1}{2}.n".
--> 319 format(target_id, ".", name), value)
320 else:
321 raise Py4JError(
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 31.0 failed 4 times, most recent failure: Lost task 1.3 in stage 31.0 (TID 84, 9.242.64.15, executor 7): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 177, in main
process()
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 172, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/usr/local/spark/python/pyspark/rdd.py", line 2423, in pipeline_func
return func(split, prev_func(split, iterator))
File "/usr/local/spark/python/pyspark/rdd.py", line 2423, in pipeline_func
return func(split, prev_func(split, iterator))
File "/usr/local/spark/python/pyspark/rdd.py", line 346, in func
return f(iterator)
File "/usr/local/spark/python/pyspark/rdd.py", line 1842, in combineLocally
merger.mergeValues(iterator)
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/shuffle.py", line 236, in mergeValues
for k, v in iterator:
File "<ipython-input-48-5c0753c6b152>", line 1, in <lambda>
AttributeError: 'NoneType' object has no attribute 'replace'
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:404)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1517)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1505)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1504)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1504)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1732)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1687)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1676)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2029)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2050)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2069)
at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:455)
at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 177, in main
process()
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 172, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/usr/local/spark/python/pyspark/rdd.py", line 2423, in pipeline_func
return func(split, prev_func(split, iterator))
File "/usr/local/spark/python/pyspark/rdd.py", line 2423, in pipeline_func
return func(split, prev_func(split, iterator))
File "/usr/local/spark/python/pyspark/rdd.py", line 346, in func
return f(iterator)
File "/usr/local/spark/python/pyspark/rdd.py", line 1842, in combineLocally
merger.mergeValues(iterator)
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/shuffle.py", line 236, in mergeValues
for k, v in iterator:
File "<ipython-input-48-5c0753c6b152>", line 1, in <lambda>
AttributeError: 'NoneType' object has no attribute 'replace'
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:404)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
pyspark rdd reduce
pyspark rdd reduce
edited Nov 15 at 15:14
pault
13.7k31744
13.7k31744
asked Nov 15 at 13:42
Catalina Herrera
132
132
The error you have AttributeError: 'NoneType' object has no attribute 'replace'
means that somewhere .replace
is being called on None
. There is only one place that this could happen, so this means you have some null
values in your Document
column. The quickest modification to your code would be to change your map
function to the following: .map( lambda x: x.Document.replace(',',' ').replace('.',' ').replace('-',' ').lower() if x.Document else '')
or add a .filter(lambda x: x.Document is not None)
before calling map
– pault
Nov 15 at 14:38
add a comment |
The error you have AttributeError: 'NoneType' object has no attribute 'replace'
means that somewhere .replace
is being called on None
. There is only one place that this could happen, so this means you have some null
values in your Document
column. The quickest modification to your code would be to change your map
function to the following: .map( lambda x: x.Document.replace(',',' ').replace('.',' ').replace('-',' ').lower() if x.Document else '')
or add a .filter(lambda x: x.Document is not None)
before calling map
– pault
Nov 15 at 14:38
The error you have
AttributeError: 'NoneType' object has no attribute 'replace'
means that somewhere .replace
is being called on None
. There is only one place that this could happen, so this means you have some null
values in your Document
column. The quickest modification to your code would be to change your map
function to the following: .map( lambda x: x.Document.replace(',',' ').replace('.',' ').replace('-',' ').lower() if x.Document else '')
or add a .filter(lambda x: x.Document is not None)
before calling map
– pault
Nov 15 at 14:38
The error you have
AttributeError: 'NoneType' object has no attribute 'replace'
means that somewhere .replace
is being called on None
. There is only one place that this could happen, so this means you have some null
values in your Document
column. The quickest modification to your code would be to change your map
function to the following: .map( lambda x: x.Document.replace(',',' ').replace('.',' ').replace('-',' ').lower() if x.Document else '')
or add a .filter(lambda x: x.Document is not None)
before calling map
– pault
Nov 15 at 14:38
add a comment |
1 Answer
1
active
oldest
votes
up vote
1
down vote
accepted
To expand on my comment, the error you are receiving is due to the presence of a null
value in your Document column. Here's a small example to demonstrate:
data = [
['a', 'Jan', 'John', 'This is a document'],
['b', 'Feb', 'Mary', 'A book by Mary'],
['c', 'Mar', 'Luke', 'Newspaper article'],
['d', 'Apr', 'Mark', None]
]
columns = ['Title', 'Month', 'Author', 'Document']
df = spark.createDataFrame(data, columns)
df.show()
#+-----+-----+------+------------------+
#|Title|Month|Author| Document|
#+-----+-----+------+------------------+
#| a| Jan| John|This is a document|
#| b| Feb| Mary| A book by Mary|
#| c| Mar| Luke| Newspaper article|
#| d| Apr| Mark| null|
#+-----+-----+------+------------------+
For the last row, the value in the Document
column is null
. When you compute bow0
as in your question, when the map
function operates on that row it tries to call x.Document.replace
where x.Document
is None
. This results in AttributeError: 'NoneType' object has no attribute 'replace'
.
One way to overcome this is to filter out the bad values before calling map
:
bow0 = df.rdd\
    .filter(lambda x: x.Document)\
    .map( lambda x: x.Document.replace(',',' ').replace('.',' ').replace('-',' ').lower())\
    .flatMap(lambda x: x.split())\
    .map(lambda x: (x, 1))
bow0.reduceByKey(lambda x,y:x+y).take(50)
#[(u'a', 2),
# (u'this', 1),
# (u'is', 1),
# (u'newspaper', 1),
# (u'article', 1),
# (u'by', 1),
# (u'book', 1),
# (u'mary', 1),
# (u'document', 1)]
Or you can build in the check for None
condition inside of your map
function. In general, it is good practice to make your map
function robust to bad inputs.
As an aside, you can do the same thing using the DataFrame API functions. In this case:
from pyspark.sql.functions import explode, split, regexp_replace, col, lower
df.select(explode(split(regexp_replace("Document", "[,.-]", " "), "\\s+")).alias("word"))\
    .groupby(lower(col("word")).alias("lower"))\
    .count()\
    .show()
#+---------+-----+
#| lower|count|
#+---------+-----+
#| document| 1|
#| by| 1|
#|newspaper| 1|
#| article| 1|
#| mary| 1|
#| is| 1|
#| a| 2|
#| this| 1|
#| book| 1|
#+---------+-----+
Indeed that was the problem with my data... I've cleaned it up and now it's working perfectly. Thanks for your help
– Catalina Herrera
Nov 15 at 15:36
add a comment |
Your Answer
StackExchange.ifUsing("editor", function () {
StackExchange.using("externalEditor", function () {
StackExchange.using("snippets", function () {
StackExchange.snippets.init();
});
});
}, "code-snippets");
StackExchange.ready(function() {
var channelOptions = {
tags: "".split(" "),
id: "1"
};
initTagRenderer("".split(" "), "".split(" "), channelOptions);
StackExchange.using("externalEditor", function() {
// Have to fire editor after snippets, if snippets enabled
if (StackExchange.settings.snippets.snippetsEnabled) {
StackExchange.using("snippets", function() {
createEditor();
});
}
else {
createEditor();
}
});
function createEditor() {
StackExchange.prepareEditor({
heartbeatType: 'answer',
convertImagesToLinks: true,
noModals: true,
showLowRepImageUploadWarning: true,
reputationToPostImages: 10,
bindNavPrevention: true,
postfix: "",
imageUploader: {
brandingHtml: "Powered by \u003ca class=\"icon-imgur-white\" href=\"https://imgur.com/\"\u003e\u003c/a\u003e",
contentPolicyHtml: "User contributions licensed under \u003ca href=\"https://creativecommons.org/licenses/by-sa/3.0/\"\u003ecc by-sa 3.0 with attribution required\u003c/a\u003e \u003ca href=\"https://stackoverflow.com/legal/content-policy\"\u003e(content policy)\u003c/a\u003e",
allowUrls: true
},
onDemand: true,
discardSelector: ".discard-answer"
,immediatelyShowMarkdownHelp:true
});
}
});
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53320803%2fbag-of-words-with-pyspark-reducebykey%23new-answer', 'question_page');
}
);
Post as a guest
Required, but never shown
1 Answer
1
active
oldest
votes
1 Answer
1
active
oldest
votes
active
oldest
votes
active
oldest
votes
up vote
1
down vote
accepted
To expand on my comment, the error you are receiving is due to the presence of a null
value in your Document column. Here's a small example to demonstrate:
data = [
['a', 'Jan', 'John', 'This is a document'],
['b', 'Feb', 'Mary', 'A book by Mary'],
['c', 'Mar', 'Luke', 'Newspaper article'],
['d', 'Apr', 'Mark', None]
]
columns = ['Title', 'Month', 'Author', 'Document']
df = spark.createDataFrame(data, columns)
df.show()
#+-----+-----+------+------------------+
#|Title|Month|Author| Document|
#+-----+-----+------+------------------+
#| a| Jan| John|This is a document|
#| b| Feb| Mary| A book by Mary|
#| c| Mar| Luke| Newspaper article|
#| d| Apr| Mark| null|
#+-----+-----+------+------------------+
For the last row, the value in the Document
column is null
. When you compute bow0
as in your question, when the map
function operates on that row it tries to call x.Document.replace
where x.Document
is None
. This results in AttributeError: 'NoneType' object has no attribute 'replace'
.
One way to overcome this is to filter out the bad values before calling map
:
bow0 = df.rdd\
    .filter(lambda x: x.Document)\
    .map( lambda x: x.Document.replace(',',' ').replace('.',' ').replace('-',' ').lower())\
    .flatMap(lambda x: x.split())\
    .map(lambda x: (x, 1))
bow0.reduceByKey(lambda x,y:x+y).take(50)
#[(u'a', 2),
# (u'this', 1),
# (u'is', 1),
# (u'newspaper', 1),
# (u'article', 1),
# (u'by', 1),
# (u'book', 1),
# (u'mary', 1),
# (u'document', 1)]
Or you can build in the check for None
condition inside of your map
function. In general, it is good practice to make your map
function robust to bad inputs.
As an aside, you can do the same thing using the DataFrame API functions. In this case:
from pyspark.sql.functions import explode, split, regexp_replace, col, lower
df.select(explode(split(regexp_replace("Document", "[,.-]", " "), "\\s+")).alias("word"))\
    .groupby(lower(col("word")).alias("lower"))\
    .count()\
    .show()
#+---------+-----+
#| lower|count|
#+---------+-----+
#| document| 1|
#| by| 1|
#|newspaper| 1|
#| article| 1|
#| mary| 1|
#| is| 1|
#| a| 2|
#| this| 1|
#| book| 1|
#+---------+-----+
Indeed that was the problem with my data... I've cleaned it up and now it's working perfectly. Thanks for your help
– Catalina Herrera
Nov 15 at 15:36
add a comment |
up vote
1
down vote
accepted
To expand on my comment, the error you are receiving is due to the presence of a null
value in your Document column. Here's a small example to demonstrate:
data = [
['a', 'Jan', 'John', 'This is a document'],
['b', 'Feb', 'Mary', 'A book by Mary'],
['c', 'Mar', 'Luke', 'Newspaper article'],
['d', 'Apr', 'Mark', None]
]
columns = ['Title', 'Month', 'Author', 'Document']
df = spark.createDataFrame(data, columns)
df.show()
#+-----+-----+------+------------------+
#|Title|Month|Author| Document|
#+-----+-----+------+------------------+
#| a| Jan| John|This is a document|
#| b| Feb| Mary| A book by Mary|
#| c| Mar| Luke| Newspaper article|
#| d| Apr| Mark| null|
#+-----+-----+------+------------------+
For the last row, the value in the Document
column is null
. When you compute bow0
as in your question, when the map
function operates on that row it tries to call x.Document.replace
where x.Document
is None
. This results in AttributeError: 'NoneType' object has no attribute 'replace'
.
One way to overcome this is to filter out the bad values before calling map
:
bow0 = df.rdd\
    .filter(lambda x: x.Document)\
    .map( lambda x: x.Document.replace(',',' ').replace('.',' ').replace('-',' ').lower())\
    .flatMap(lambda x: x.split())\
    .map(lambda x: (x, 1))
bow0.reduceByKey(lambda x,y:x+y).take(50)
#[(u'a', 2),
# (u'this', 1),
# (u'is', 1),
# (u'newspaper', 1),
# (u'article', 1),
# (u'by', 1),
# (u'book', 1),
# (u'mary', 1),
# (u'document', 1)]
Or you can build in the check for None
condition inside of your map
function. In general, it is good practice to make your map
function robust to bad inputs.
As an aside, you can do the same thing using the DataFrame API functions. In this case:
from pyspark.sql.functions import explode, split, regexp_replace, col, lower
df.select(explode(split(regexp_replace("Document", "[,.-]", " "), "\\s+")).alias("word"))\
    .groupby(lower(col("word")).alias("lower"))\
    .count()\
    .show()
#+---------+-----+
#| lower|count|
#+---------+-----+
#| document| 1|
#| by| 1|
#|newspaper| 1|
#| article| 1|
#| mary| 1|
#| is| 1|
#| a| 2|
#| this| 1|
#| book| 1|
#+---------+-----+
Indeed that was the problem with my data... I've cleaned it up and now it's working perfectly. Thanks for your help
– Catalina Herrera
Nov 15 at 15:36
add a comment |
up vote
1
down vote
accepted
up vote
1
down vote
accepted
To expand on my comment, the error you are receiving is due to the presence of a null
value in your Document column. Here's a small example to demonstrate:
data = [
['a', 'Jan', 'John', 'This is a document'],
['b', 'Feb', 'Mary', 'A book by Mary'],
['c', 'Mar', 'Luke', 'Newspaper article'],
['d', 'Apr', 'Mark', None]
]
columns = ['Title', 'Month', 'Author', 'Document']
df = spark.createDataFrame(data, columns)
df.show()
#+-----+-----+------+------------------+
#|Title|Month|Author| Document|
#+-----+-----+------+------------------+
#| a| Jan| John|This is a document|
#| b| Feb| Mary| A book by Mary|
#| c| Mar| Luke| Newspaper article|
#| d| Apr| Mark| null|
#+-----+-----+------+------------------+
For the last row, the value in the Document
column is null
. When you compute bow0
as in your question, when the map
function operates on that row it tries to call x.Document.replace
where x.Document
is None
. This results in AttributeError: 'NoneType' object has no attribute 'replace'
.
One way to overcome this is to filter out the bad values before calling map
:
bow0 = df.rdd\
    .filter(lambda x: x.Document)\
    .map( lambda x: x.Document.replace(',',' ').replace('.',' ').replace('-',' ').lower())\
    .flatMap(lambda x: x.split())\
    .map(lambda x: (x, 1))
bow0.reduceByKey(lambda x,y:x+y).take(50)
#[(u'a', 2),
# (u'this', 1),
# (u'is', 1),
# (u'newspaper', 1),
# (u'article', 1),
# (u'by', 1),
# (u'book', 1),
# (u'mary', 1),
# (u'document', 1)]
Or you can build in the check for None
condition inside of your map
function. In general, it is good practice to make your map
function robust to bad inputs.
As an aside, you can do the same thing using the DataFrame API functions. In this case:
from pyspark.sql.functions import explode, split, regexp_replace, col, lower
df.select(explode(split(regexp_replace("Document", "[,.-]", " "), "\\s+")).alias("word"))\
    .groupby(lower(col("word")).alias("lower"))\
    .count()\
    .show()
#+---------+-----+
#| lower|count|
#+---------+-----+
#| document| 1|
#| by| 1|
#|newspaper| 1|
#| article| 1|
#| mary| 1|
#| is| 1|
#| a| 2|
#| this| 1|
#| book| 1|
#+---------+-----+
To expand on my comment, the error you are receiving is due to the presence of a null
value in your Document column. Here's a small example to demonstrate:
data = [
['a', 'Jan', 'John', 'This is a document'],
['b', 'Feb', 'Mary', 'A book by Mary'],
['c', 'Mar', 'Luke', 'Newspaper article'],
['d', 'Apr', 'Mark', None]
]
columns = ['Title', 'Month', 'Author', 'Document']
df = spark.createDataFrame(data, columns)
df.show()
#+-----+-----+------+------------------+
#|Title|Month|Author| Document|
#+-----+-----+------+------------------+
#| a| Jan| John|This is a document|
#| b| Feb| Mary| A book by Mary|
#| c| Mar| Luke| Newspaper article|
#| d| Apr| Mark| null|
#+-----+-----+------+------------------+
For the last row, the value in the Document
column is null
. When you compute bow0
as in your question, when the map
function operates on that row it tries to call x.Document.replace
where x.Document
is None
. This results in AttributeError: 'NoneType' object has no attribute 'replace'
.
One way to overcome this is to filter out the bad values before calling map
:
bow0 = df.rdd\
    .filter(lambda x: x.Document)\
    .map( lambda x: x.Document.replace(',',' ').replace('.',' ').replace('-',' ').lower())\
    .flatMap(lambda x: x.split())\
    .map(lambda x: (x, 1))
bow0.reduceByKey(lambda x,y:x+y).take(50)
#[(u'a', 2),
# (u'this', 1),
# (u'is', 1),
# (u'newspaper', 1),
# (u'article', 1),
# (u'by', 1),
# (u'book', 1),
# (u'mary', 1),
# (u'document', 1)]
Or you can build in the check for None
condition inside of your map
function. In general, it is good practice to make your map
function robust to bad inputs.
As an aside, you can do the same thing using the DataFrame API functions. In this case:
from pyspark.sql.functions import explode, split, regexp_replace, col, lower
df.select(explode(split(regexp_replace("Document", "[,.-]", " "), "\\s+")).alias("word"))\
    .groupby(lower(col("word")).alias("lower"))\
    .count()\
    .show()
#+---------+-----+
#| lower|count|
#+---------+-----+
#| document| 1|
#| by| 1|
#|newspaper| 1|
#| article| 1|
#| mary| 1|
#| is| 1|
#| a| 2|
#| this| 1|
#| book| 1|
#+---------+-----+
edited Nov 15 at 15:31
answered Nov 15 at 15:11
pault
13.7k31744
13.7k31744
Indeed that was the problem with my data... I've cleaned it up and now it's working perfectly. Thanks for your help
– Catalina Herrera
Nov 15 at 15:36
add a comment |
Indeed that was the problem with my data... I've cleaned it up and now it's working perfectly. Thanks for your help
– Catalina Herrera
Nov 15 at 15:36
Indeed that was the problem with my data... I've cleaned it up and now it's working perfectly. Thanks for your help
– Catalina Herrera
Nov 15 at 15:36
Indeed that was the problem with my data... I've cleaned it up and now it's working perfectly. Thanks for your help
– Catalina Herrera
Nov 15 at 15:36
add a comment |
Thanks for contributing an answer to Stack Overflow!
- Please be sure to answer the question. Provide details and share your research!
But avoid …
- Asking for help, clarification, or responding to other answers.
- Making statements based on opinion; back them up with references or personal experience.
To learn more, see our tips on writing great answers.
Some of your past answers have not been well-received, and you're in danger of being blocked from answering.
Please pay close attention to the following guidance:
- Please be sure to answer the question. Provide details and share your research!
But avoid …
- Asking for help, clarification, or responding to other answers.
- Making statements based on opinion; back them up with references or personal experience.
To learn more, see our tips on writing great answers.
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53320803%2fbag-of-words-with-pyspark-reducebykey%23new-answer', 'question_page');
}
);
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
The error you have
AttributeError: 'NoneType' object has no attribute 'replace'
means that somewhere .replace
is being called on None
. There is only one place that this could happen, so this means you have some null
values in your Document
column. The quickest modification to your code would be to change your map
function to the following: .map( lambda x: x.Document.replace(',',' ').replace('.',' ').replace('-',' ').lower() if x.Document else '')
or add a .filter(lambda x: x.Document is not None)
before calling map
– pault
Nov 15 at 14:38