Wednesday, June 22, 2016

How to modify numpy arrays in a Spark dataframe?


Problem:
No assignments are allowed inside lambdas or dataframe transforms, which means we usually have to create a new structure for every data manipulation done on DataFrames with Spark.
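
To be concrete about the constraint (plain Python, nothing Spark-specific): an assignment statement is not valid syntax inside a lambda body, so a transform has to return a new value rather than mutate its input.

# Not valid Python; an assignment statement cannot appear in a lambda body:
#   f = lambda row: row[1] = 7    # SyntaxError
# A transform has to build and return a new object instead:
f = lambda row: row + 7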

Example (Python):
I have previously gotten around this issue by building the modified data without assignments, using lists and dictionaries, but the numpy arithmetic is proving to be quite troublesome. I have also run some simulations on putting all of this data into plain lists, and it slows things down significantly because the arrays are fairly large (each array is roughly 3K elements long, each DB row holds a list of 30 such arrays, and there are several million rows).
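
Roughly, that earlier list-based workaround built each modified row with comprehensions instead of mutation, along these lines (the row layout here is a toy stand-in, not my real schema):

# Toy stand-in for one row; the real data is ~30 arrays of ~3K floats per row.
row = [[0.0] * 5 for _ in range(3)]

# "Add 7 to positions 1 and 2" expressed without any assignment,
# by building new lists with comprehensions.
new_row = [[x + 7 if 1 <= i < 3 else x for i, x in enumerate(arr)]
           for arr in row]

With numpy arrays, the same manipulation looks like this: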

a = np.zeros(5)

# Actual operation
a[1:3] += 7
print "{}".format(a)
>> [ 0.  7.  7.  0.  0.]

# Spark compatibility - Create modified array in memory to avoid assignment
# Not sure if this is best "solution" performance-wise
c = np.concatenate([a[:1], a[1:3] + 7, a[3:]])
print "{}n".format(c)
>> [ 0.  7.  7.  0.  0.]
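
For comparison, another way to build the same result without any in-place assignment is a mask expression (just a sketch; I have not benchmarked it against the concatenate version):

# Same output as the concatenate approach, using a boolean mask
# instead of stitching slices back together.
idx = np.arange(a.size)
c2 = np.where((idx >= 1) & (idx < 3), a + 7, a)
print "{}".format(c2)
>> [ 0.  7.  7.  0.  0.]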

Example (pyspark):
Now that you can see the output I'm expecting, here is the Spark version.

t = sc.parallelize(a)
t2 = t.map(lambda ar: np.concatenate([ar[:1], ar[1:3] + 7, ar[3:]]))
t2.take(1)

Error:
I thought this would work, but I get the error below. I suspected the issue was the "ar[1:3] + 7" piece, but after running it without that, it still gave the same error. Maybe there's something I'm missing.

Maybe np.concatenate() does some sort of assignment that causes this. If that is the case, what would be a way around it?
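
For reference, the traceback below ends in "IndexError: invalid index to scalar variable", which is the error numpy raises when you index a scalar rather than an array. Here is a minimal local check, with no Spark involved, assuming sc.parallelize(a) hands the lambda individual elements of a (numpy scalars) rather than whole arrays; that is my reading of the traceback, not something I have confirmed in the Spark internals:

import numpy as np

a = np.zeros(5)
elem = list(a)[0]   # iterating a 1-D array yields numpy scalars (np.float64)
elem[:1]            # IndexError: invalid index to scalar variable
                    # (the same error the lambda hits on the workers)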

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-46-4a4c467a0b3d> in <module>()
     12 t = sc.parallelize(a)
     13 t2 = t.map(lambda ar: np.concatenate([ar[:1], ar[1:3] + 7, ar[3:]]))
---> 14 t2.take(1)

/databricks/spark/python/pyspark/rdd.py in take(self, num)
   1297 
   1298             p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
-> 1299             res = self.context.runJob(self, takeUpToNumLeft, p)
   1300 
   1301             items += res

/databricks/spark/python/pyspark/context.py in runJob(self, rdd, partitionFunc, partitions, allowLocal)
    914         # SparkContext#runJob.
    915         mappedRDD = rdd.mapPartitions(partitionFunc)
--> 916         port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
    917         return list(_load_from_socket(port, mappedRDD._jrdd_deserializer))
    918 

/databricks/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
    536         answer = self.gateway_client.send_command(command)
    537         return_value = get_return_value(answer, self.gateway_client,
--> 538                 self.target_id, self.name)
    539 
    540         for temp_arg in temp_args:

/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     34     def deco(*a, **kw):
     35         try:
---> 36             return f(*a, **kw)
     37         except py4j.protocol.Py4JJavaError as e:
     38             s = e.java_exception.toString()

/databricks/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    298                 raise Py4JJavaError(
    299                     'An error occurred while calling {0}{1}{2}.\n'.
--> 300                     format(target_id, '.', name), value)
    301             else:
    302                 raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 25.0 failed 1 times, most recent failure: Lost task 0.0 in stage 25.0 (TID 30, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/worker.py", line 111, in main
    process()
  File "/databricks/spark/python/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/databricks/spark/python/pyspark/serializers.py", line 263, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/databricks/spark/python/pyspark/rdd.py", line 1295, in takeUpToNumLeft
    yield next(iterator)
  File "<ipython-input-46-4a4c467a0b3d>", line 13, in <lambda>
IndexError: invalid index to scalar variable.

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1827)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1840)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1853)
    at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:393)
    at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
    at py4j.Gateway.invoke(Gateway.java:259)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:207)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/worker.py", line 111, in main
    process()
  File "/databricks/spark/python/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/databricks/spark/python/pyspark/serializers.py", line 263, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/databricks/spark/python/pyspark/rdd.py", line 1295, in takeUpToNumLeft
    yield next(iterator)
  File "<ipython-input-46-4a4c467a0b3d>", line 13, in <lambda>
IndexError: invalid index to scalar variable.

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    ... 1 more
