Problem:
No assignments are allowed inside lambda or dataframe transforms, this means we usually have to create a new structure for every data manipulation done in Dataframes with Spark.
Example (Python):
I have previously gotten around this issue by simply creating the modified data in-place without assignments in lists and dictionaries, however the numpy arithmetic is proving to be quite troublesome. And I have ran some simulations on putting all this data into lists, and it would be slowed down quite significantly since the arrays are pretty large. (Ex. these arrays are about 3K elements long each, contained in lists of 30 arrays per db row, for several million rows)
a = np.zeros(5)
# Actual operation
a[1:3] += 7
print "{}".format(a)
>> [ 0. 7. 7. 0. 0.]
# Spark compatability - Create modified array in memory to avoid assignment
# Not sure if this is best "solution" performance-wise
c = np.concatenate([a[:1], a[1:3] + 7, a[3:]])
print "{}n".format(c)
>> [ 0. 7. 7. 0. 0.]
Example (pyspark):
So now you can see the output I'm expecting, here is a Spark version.
t = sc.parallelize(a)
t2 = t.map(lambda ar: np.concatenate([ar[:1], ar[1:3] + 7, ar[3:]]))
t2.take(1)
Error:
I thought this would work, however I get this. I thought the issue was this "ar[1:3] + 7" but after running it without that, it still gave same error. Maybe there's something I'm missing.
Maybe the np.concatenate() does some sort of assignment that causes this. If that is the case what would be a way around it?
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-46-4a4c467a0b3d> in <module>()
12 t = sc.parallelize(a)
13 t2 = t.map(lambda ar: np.concatenate([ar[:1], ar[1:3] + 7, ar[3:]]))
---> 14 t2.take(1)
/databricks/spark/python/pyspark/rdd.py in take(self, num)
1297
1298 p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
-> 1299 res = self.context.runJob(self, takeUpToNumLeft, p)
1300
1301 items += res
/databricks/spark/python/pyspark/context.py in runJob(self, rdd, partitionFunc, partitions, allowLocal)
914 # SparkContext#runJob.
915 mappedRDD = rdd.mapPartitions(partitionFunc)
--> 916 port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
917 return list(_load_from_socket(port, mappedRDD._jrdd_deserializer))
918
/databricks/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
536 answer = self.gateway_client.send_command(command)
537 return_value = get_return_value(answer, self.gateway_client,
--> 538 self.target_id, self.name)
539
540 for temp_arg in temp_args:
/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
34 def deco(*a, **kw):
35 try:
---> 36 return f(*a, **kw)
37 except py4j.protocol.Py4JJavaError as e:
38 s = e.java_exception.toString()
/databricks/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
298 raise Py4JJavaError(
299 'An error occurred while calling {0}{1}{2}.n'.
--> 300 format(target_id, '.', name), value)
301 else:
302 raise Py4JError(
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 25.0 failed 1 times, most recent failure: Lost task 0.0 in stage 25.0 (TID 30, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/databricks/spark/python/pyspark/worker.py", line 111, in main
process()
File "/databricks/spark/python/pyspark/worker.py", line 106, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/databricks/spark/python/pyspark/serializers.py", line 263, in dump_stream
vs = list(itertools.islice(iterator, batch))
File "/databricks/spark/python/pyspark/rdd.py", line 1295, in takeUpToNumLeft
yield next(iterator)
File "<ipython-input-46-4a4c467a0b3d>", line 13, in <lambda>
IndexError: invalid index to scalar variable.
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1827)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1840)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1853)
at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:393)
at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/databricks/spark/python/pyspark/worker.py", line 111, in main
process()
File "/databricks/spark/python/pyspark/worker.py", line 106, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/databricks/spark/python/pyspark/serializers.py", line 263, in dump_stream
vs = list(itertools.islice(iterator, batch))
File "/databricks/spark/python/pyspark/rdd.py", line 1295, in takeUpToNumLeft
yield next(iterator)
File "<ipython-input-46-4a4c467a0b3d>", line 13, in <lambda>
IndexError: invalid index to scalar variable.
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
... 1 more
Aucun commentaire:
Enregistrer un commentaire