Questions tagged [rdd]

1 vote · 1 answer · 27 views

How to properly apply HashPartitioner before a join in Spark?

To reduce shuffling during the joining of two RDDs, I decided to partition them using HashPartitioner first. Here is how I do it. Am I doing it correctly, or is there a better way to do this? val rddA = ... val rddB = ... val numOfPartitions = rddA.getNumPartitions val rddApartitioned = rddA.partiti...
MetallicPriest
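
For reference, a minimal sketch of the pattern this question describes, assuming both inputs are pair RDDs and sc is an existing SparkContext; the variable names mirror the excerpt, the data is made up:

    import org.apache.spark.HashPartitioner

    val rddA = sc.parallelize(Seq((1, "a"), (2, "b")))   // stand-ins for the question's RDDs
    val rddB = sc.parallelize(Seq((1, "x"), (2, "y")))

    val numOfPartitions = rddA.getNumPartitions
    val partitioner = new HashPartitioner(numOfPartitions)

    // Partition both sides with the same partitioner and cache them, so the
    // join can reuse the co-located partitions instead of reshuffling both inputs.
    val rddApartitioned = rddA.partitionBy(partitioner).persist()
    val rddBpartitioned = rddB.partitionBy(partitioner).persist()

    val joined = rddApartitioned.join(rddBpartitioned)
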
-1 votes · 0 answers · 17 views

Split ordered events into snapshots with Spark

I have a data structure called auction hits, and I want to move through each event in this RDD to produce snapshots of these auctions at each event, so that I am left with a collection of these auction hits where, if an auction event is of type NEW, it is added to the current collection fo...
eboni
1 vote · 1 answer · 3.7k views

Reading a text file in Spark

I am trying to read a simple text file into a spark RDD and I see that there are two ways of doing so as follows : from pyspark.sql import SparkSession spark = SparkSession.builder.master('local[*]').getOrCreate() sc = spark.sparkContext textRDD1 = sc.textFile('hobbit.txt') textRDD2 = spark.read.tex...
Calcutta
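
The same two entry points exist in the Scala API; a quick sketch (reusing the hobbit.txt name from the excerpt) that highlights the different return types:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    val sc = spark.sparkContext

    // RDD[String]: one element per line, no schema attached.
    val textRDD1 = sc.textFile("hobbit.txt")

    // DataFrame with a single string column named "value"; call .rdd to get
    // an RDD[Row] back if the RDD API is what you actually need.
    val textDF2 = spark.read.text("hobbit.txt")
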
1 vote · 0 answers · 224 views

Spark UI storage tab shows more RDDs

I am running a spark streaming application. The app uses MapWithStateRDD to manage state across batches. App and setup details: Nodes: 2 Memory per executor: 3 GB Num of partitions for MapWithStateRDD: 200 Standalone mode Batch size: 20 sec Timeout duration: 1 minute Checkpointing enabled Checkpoint...
scorpio
1 vote · 0 answers · 250 views

How to convert a DataFrame to a JavaRDD with a distributed copy?

I'm new to Spark optimization. I'm trying to read Hive data into a DataFrame. Then I'm converting the DataFrame to a JavaRDD and running a map function on top of it. The problem I'm facing is that the transformation running on top of the JavaRDD runs as a single task. Also the transformations running on...
Makubex
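
One common reason the downstream map runs as a single task is that the source DataFrame arrives with a single partition; a hedged Scala sketch of the usual fix (the table name is hypothetical, and the Java API has the analogous toJavaRDD()):

    val df = spark.sql("SELECT * FROM some_hive_table")   // hypothetical Hive table

    // Spread the rows across more partitions before dropping to the RDD API;
    // each partition then becomes one task for the subsequent map.
    val rdd = df.repartition(100).rdd                      // .toJavaRDD() from Java
    println(rdd.getNumPartitions)
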
1 vote · 1 answer · 25 views

Not able to store result in HDFS when code runs for a second iteration

Well, I am new to Spark and Scala and have been trying to implement data cleaning in Spark. The code below checks for the missing value in one column, stores it in outputrdd, and runs loops to calculate the missing value. The code works well when there is only one missing value in the file. Since HDFS does...
Yogesh Patel
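
If the second run fails with the usual FileAlreadyExistsException, it is because saveAsTextFile refuses to write into an existing directory; a sketch of one workaround, assuming sc is the active SparkContext and the output path is hypothetical:

    import org.apache.hadoop.fs.{FileSystem, Path}

    val outputPath = new Path("/user/me/cleaned")            // hypothetical HDFS path
    val fs = FileSystem.get(sc.hadoopConfiguration)
    if (fs.exists(outputPath)) fs.delete(outputPath, true)    // recursive delete
    outputrdd.saveAsTextFile(outputPath.toString)             // outputrdd from the question
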
1 vote · 0 answers · 184 views

Spark: why do some executors have 0 active tasks and others 13 tasks?

I am trying to read from S3 and do a count on the data frame. I have a cluster of 76 r3.4xlarge instances (1 master and 75 slaves). I set: spark.dynamicAllocation.enabled 'true' maximizeResourceAllocation 'true' When I checked the Spark UI, I saw just 25 executors, and of those only 7 have active ta...
user3407267
1 vote · 1 answer · 107 views

PySpark in Databricks: replace delimiters and strings for every element in an RDD in one shot

I feel stupid trying to do something as simple as the following: I'm working with the dataset /databricks-datasets/cs110x/ml-1m/data-001, which includes 3 dat files with info about users, movies and their ratings. The RDD looks like this sample of 10 entries: [u'1::F::1::10::48067', u'2::M::56::16::70072',...
zontxo
1 vote · 1 answer · 163 views

Spark cannot serialize the BufferedImage class

I have a not-serializable class exception in Spark 2.2.0. This is the procedure I am trying to follow in Scala: read a set of JPEG images from HDFS; build an array of java.awt.image.BufferedImages; extract the java.awt.image.BufferedImage buffer and store it in a 2D array for each ima...
Vitrion
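
java.awt.image.BufferedImage is indeed not Serializable, so one workaround is to keep it entirely inside the task and only let serializable pixel arrays leave the transformation; a sketch under that assumption, with a hypothetical HDFS path:

    import java.awt.image.BufferedImage
    import javax.imageio.ImageIO

    val images = sc.binaryFiles("hdfs:///images/*.jpg")     // hypothetical path
    val pixelArrays = images.map { case (path, stream) =>
      // Decode inside the task: the BufferedImage never crosses a stage boundary.
      val img: BufferedImage = ImageIO.read(stream.open())
      val buffer = Array.tabulate(img.getHeight, img.getWidth)((y, x) => img.getRGB(x, y))
      (path, buffer)                                        // Array[Array[Int]] is serializable
    }
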
1 vote · 0 answers · 159 views

How to perform Welch's t-test in Spark 2.0.0 using StreamingTest

I want to try Welch's t-test in Spark 2.0.0. As far as I know, I can use StreamingTest() from MLlib, documented here: [https://spark.apache.org/docs/2.0.0/api/scala/index.html#org.apache.spark.mllib.stat.test.StreamingTest] This is my code in spark-shell: import org.apache.spark.mllib.stat.test.{BinarySample,...
Jchanho
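
For reference, the MLlib streaming test is wired up roughly like this; a sketch where the socket source, window settings and batch interval are arbitrary assumptions:

    import org.apache.spark.mllib.stat.test.{BinarySample, StreamingTest}
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val ssc = new StreamingContext(sc, Seconds(5))
    val samples = ssc.socketTextStream("localhost", 9999).map { line =>
      val Array(group, value) = line.split(",")
      BinarySample(group.toBoolean, value.toDouble)   // true = treatment, false = control
    }

    val streamingTest = new StreamingTest()
      .setPeacePeriod(0)
      .setWindowSize(0)            // 0 = use all batches seen so far
      .setTestMethod("welch")      // Welch's two-sample t-test

    streamingTest.registerStream(samples).print()
    ssc.start()
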
1 vote · 2 answers · 23 views

Scala: how to get the common part of two RDDs?

There are two RDDs: val rdd1 = sc.parallelize(List(("aaa", 1), ("bbb", 4), ("ccc", 3))) val rdd2 = sc.parallelize(List(("aaa", 2), ("bbb", 5), ("ddd", 2))) If I want to join them by the first field and get a result like List(("aaa", 1, 2), ("bbb", 4, 5)), what should I write? Thanks!
Ryan Wang
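
A minimal sketch of the inner join being asked for, using the excerpt's data:

    val rdd1 = sc.parallelize(List(("aaa", 1), ("bbb", 4), ("ccc", 3)))
    val rdd2 = sc.parallelize(List(("aaa", 2), ("bbb", 5), ("ddd", 2)))

    // join keeps only keys present in both RDDs: RDD[(String, (Int, Int))].
    val joined = rdd1.join(rdd2).map { case (k, (v1, v2)) => (k, v1, v2) }
    joined.collect()   // Array((aaa,1,2), (bbb,4,5))
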
1 vote · 1 answer · 62 views

convert sets to matrix: how can I do this efficiently in Spark

I have a JavaPairRDD that contains the following pairs: (key0, (a,d)) (key1, (c)) (key2, (b,d,e)) (key3, (a,c,d)) Now, I would like to accomplish the following: combine all the values together (without worrying about the keys) to get the 'universal space': (a,b,c,d,e) convert each value into a v...
lee
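
The question is about the Java API, but the idea is language-agnostic: collect the distinct value universe once, broadcast it, and map every set to an indicator vector. A Scala sketch with the excerpt's toy data:

    val pairs = sc.parallelize(Seq(
      ("key0", Set("a", "d")),
      ("key1", Set("c")),
      ("key2", Set("b", "d", "e")),
      ("key3", Set("a", "c", "d"))
    ))

    // 1. Universal space: distinct union of all values, in a fixed order.
    val universe = pairs.flatMap(_._2).distinct().collect().sorted
    val universeB = sc.broadcast(universe)

    // 2. One 0/1 indicator vector per key, indexed by the universal space.
    val matrix = pairs.mapValues(items => universeB.value.map(v => if (items.contains(v)) 1 else 0))
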
1 vote · 3 answers · 71 views

Scala: string filter operation from a new learner

There are some strings like: "A,C,D" "A,C" "A,B" "B,C" "D,F" "G,D,H" I want to filter those strings by the keys A and C; that is, if a string contains A or C, I keep it. For example, with this rule I will get: "A,C,D" "A,C" "A,B" "B,C" How should I code this function?
Wangkkkkkk
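
A short sketch of one way to express that filter over an RDD of such strings (splitting on the comma so a key only matches whole tokens):

    val strings = sc.parallelize(Seq("A,C,D", "A,C", "A,B", "B,C", "D,F", "G,D,H"))
    val keys = Set("A", "C")

    // Keep a string if any of its comma-separated tokens is one of the keys.
    val filtered = strings.filter(_.split(",").exists(keys.contains))
    filtered.collect()   // Array(A,C,D, A,C, A,B, B,C)
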
1 vote · 1 answer · 289 views

Spark 2.0 Streaming: how to dynamically infer the schema of a JSON string RDD and convert it to a DataFrame

With versions prior to 2.0, I could use an SQLContext to do the same like this: val sqlContext = new SQLContext(sc) val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder]( ssc, kafkaParams, Set('myTopicName')) stream.foreachRDD( rdd => { val dataFrame = sqlContext....
void
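
In 2.0 the same pattern works with a SparkSession obtained inside foreachRDD; a sketch assuming the stream has already been mapped down to a DStream[String] of raw JSON:

    import org.apache.spark.sql.SparkSession

    stream.foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
        val dataFrame = spark.read.json(rdd)   // schema inferred per micro-batch
        dataFrame.printSchema()
      }
    }
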
1 vote · 1 answer · 44 views

Is it possible to run Spark jobs (like the WordCount sample) in the local mode on Cygwin?

I am executing simple Spark code to read a file from the local system, but I am getting an error. Below is the code on the Cygwin console: val orders = sc.textFile("C:///DataResearch/retail_db/orders") orders.first() After executing orders.first() I get the error below: java.lang.NullPointerException at ja...
Harshit Kakkar
1 vote · 1 answer · 252 views

PySpark DataFrame to array RDD for KMeans

I am trying to run the KMeans clustering algorithm in Spark 2.2 but am not able to find the correct input format. It gives a TypeError: Cannot convert type into Vector error. I checked further and my inputrdd is a Row RDD. Can we convert it to an array RDD? This MLlib doc shows that we can pass a parall...
COSTA
1 vote · 1 answer · 83 views

Differences: Object instantiation within mapPartitions vs outside

I'm a beginner to Apache Spark. Spark's RDD API offers transformation functions like map, mapPartitions. I can understand that map works with each element from the RDD but mapPartitions works with each partition and many people have mentioned mapPartitions are ideally used where we want to do object...
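
The usual illustration of the difference: with mapPartitions, one heavyweight object can be built per partition and reused for every element in it. A sketch where lines and SomeExpensiveParser are hypothetical:

    val parsed = lines.mapPartitions { iter =>
      // Constructed once per partition, instead of once per element as with map.
      val parser = new SomeExpensiveParser()
      iter.map(line => parser.parse(line))
    }
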
1 vote · 2 answers · 1.5k views

Subtract values of columns from two different data frames in PySpark to find RMSE

I am not able to figure it out. I am trying to calculate the RMSE between test and prediction data. test: col1 col2 / a 2 / b 3; prediction: col1 col2 / a 4 / b 5. I am trying to compute test(col2) - prediction(col2), that is 2-4 = -2 and 3-5 = -2. I tried test.select('col2').subtra...
Jerry George
1 vote · 0 answers · 51 views

How to handle and convert a null datetime field into a Unix timestamp in Scala

I have the code snippet below that is not accepted by Scala; it would be appreciated if someone could help fix it, thanks. train_no_header is an RDD generated from a CSV file; its first line is shown below: scala> train_no_header.first res4: String = 87540,12,1,13,497,2017-11-07 09:30:38,,0...
Choix
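
One hedged way to make the conversion tolerate the empty datetime field, assuming the column positions from the excerpt's first line (field 5 populated, field 6 empty):

    import java.text.SimpleDateFormat

    def toUnixTs(s: String): Option[Long] =
      if (s == null || s.trim.isEmpty) None   // null/empty field -> None instead of an exception
      else Some(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(s.trim).getTime / 1000)

    val rows = train_no_header.map(_.split(",", -1))   // -1 keeps trailing empty fields
    val withTs = rows.map(cols => (cols(0), toUnixTs(cols(5)), toUnixTs(cols(6))))
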
1 vote · 0 answers · 118 views

Get the fields and values from a Spark RDD

I have a spark RDD(org.apache.spark.rdd) made from a JSON string that looks like this : {'id':1 , 'text':'sample1'} I am using spray JSON in my application and I need to extract the keys into a JsArray (keys_jsArray) - (contains id, text). Also the values to be extracted into another JsArray. - (co...
Nagireddy Hanisha
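
A sketch of the spray-json side of this, assuming jsonRdd is the RDD[String] of JSON documents from the question:

    import spray.json._

    val parsed = jsonRdd.map { s =>
      val fields = s.parseJson.asJsObject.fields                    // Map[String, JsValue]
      val keys_jsArray   = JsArray(fields.keys.map(JsString(_)).toVector)
      val values_jsArray = JsArray(fields.values.toVector)
      (keys_jsArray, values_jsArray)
    }
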
1 vote · 2 answers · 378 views

How to sort an RDD with mixed ascending and descending order on multiple fields in Scala

So here is the data. Structure explained: CREATE TABLE products ( product_id int(11) NOT NULL AUTO_INCREMENT, product_category_id int(11) NOT NULL, product_name varchar(45) NOT NULL, product_description varchar(255) NOT NULL, product_price float NOT NULL, product_image varchar(255) NOT NULL,...
Choix
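
For reference, one way to mix directions on an RDD is to negate the numeric component of the sort key; a sketch over a hypothetical projection of the products table:

    case class Product(categoryId: Int, price: Float, name: String)
    val products = sc.parallelize(Seq(
      Product(2, 59.99f, "glove"), Product(2, 129.99f, "bat"), Product(3, 21.0f, "ball")))

    // Ascending by category, descending by price within the category.
    val sorted = products.sortBy(p => (p.categoryId, -p.price))
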
1 vote · 0 answers · 361 views

Spark set executor's class loader

I have some RDD[String] and I need to perform some filtering on this RDD. I also have public class MyCustomClassLoader extends ClassLoader { } So it looks something like the following: val rdd: RDD[String] = //... rdd.filter(str => { //Here I need to set Thread.currentThread().setContextClassLoade...
St.Antario
1 vote · 1 answer · 121 views

Tuple-size limit in an RDD; reading the RDD throws ArrayIndexOutOfBoundsException

I tried converting a DF to an RDD for a table containing 25 columns. I then learned that Scala (until 2.11.8) limits tuples to a maximum of 22 elements. val rdd = sc.textFile("/user/hive/warehouse/myDB.db/myTable/") rdd: org.apache.spark.rdd.RDD[String] = /user/hive/ware...
knowone
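
Scala tuples do stop at Tuple22, so for 25 columns the usual escape hatches are an Array (index access) or, on Scala 2.11+, a case class with more than 22 fields; a sketch, where the ^A delimiter is an assumption about how the Hive text table is stored:

    val rdd = sc.textFile("/user/hive/warehouse/myDB.db/myTable/")

    // Keep each row as an Array[String] instead of a 25-tuple.
    val rows = rdd.map(_.split("\u0001", -1))   // assumed Hive default field delimiter
    val col7 = rows.map(cols => cols(6))        // column access by index instead of ._7
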
1 vote · 2 answers · 1.7k views

How to overwrite the RDD saveAsPickleFile(path) output if the file already exists in PySpark?

How do I overwrite an RDD's output objects at any existing path when saving? test1: 975078|56691|2.000|20171001_926_570_1322 975078|42993|1.690|20171001_926_570_1322 975078|46462|2.000|20171001_926_570_1322 975078|87815|1.000|20171001_926_570_1322 rdd=sc.textFile('/home/administrator/work/test1'...
Sai
1 vote · 1 answer · 101 views

groupByKey of RDD not getting passed through

Have a query regarding groupByKey on my RDD. Below is the query I'm trying: rdd3.map{ case(HandleMaxTuple(col1, col2, col3, col4, col5, col6, col7, col8, col9, col10, col11, col12, col13, col14, col15, col16, col17, col18, col19, col20, col21, col22, col23, col24, col25)) => (HandleMaxTuple(col1,col...
knowone
1 vote · 1 answer · 192 views

Write a PySpark RDD or DataFrame to Neo4j

I'm experiencing an issue with py2neo and the Spark driver, since I could not insert a node inside a foreach or map loop, like in the code below for example. from py2neo import authenticate, Graph, cypher, Node from pyspark import broadcast infos=df.rdd authenticate('localhost:7474', 'neo4j', 'admin')...
1 vote · 1 answer · 221 views

Pyspark: Create a spark dataframe from RDD of lists where some elements of list are objects

I am trying to convert pandas.DataFrame code to an equivalent PySpark DataFrame. I have an RDD in the format below. myRdd = [[1, 'a', {'a':[1, 2]}], [2, 'b', {'c': 1, 'd':3}], [3, 'c', {}]] columnNames = ['sl', 'name', 'params'] The third element in the inner list does not have a specific structure....
hisham rahman
1 vote · 0 answers · 292 views

Getting java.io.NotSerializableException: org.apache.spark.SparkContext Exception

(This question has many duplicates; I tried all of them but they didn't work for me, which is why I am asking a new question.) I am trying to fetch records from a database and, based on that value, I call a URL using the curl command and save the output to the database again. When I tried the same th...
user6325753
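
The usual cause is referencing the SparkContext (or anything that holds it) inside a transformation closure; a hedged sketch of the shape that avoids it, with a hypothetical recordsRdd and endpoint:

    import scala.sys.process._

    // Only plain, serializable values enter the closure; the SparkContext stays on the driver.
    val results = recordsRdd.map { id =>
      val body = Seq("curl", "-s", s"https://example.com/api/$id").!!   // hypothetical endpoint
      (id, body)
    }
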
1 vote · 1 answer · 136 views

How to open a file from an RDD that holds the path to that file?

I am working with Sentinel image data on Apache Spark using Scala. At some step I filter metadata that contains a specific location, and for those data I want to open a new file located in a subfolder. The filtered RDD contains as key the path to the file with global metadata and as value the path to the file which I would l...
1 vote · 0 answers · 188 views

Not able to collect data from an RDD

This is my spark configuration for spark-context in Clojure. I am running Spark 2.2.1 and flambo 0.8.2. Flambo is a Clojure DSL for Apache Spark and flambo-0.8x is compatible with Spark-2x. (ns some-namespace (:require [flambo.api :as f] [flambo.conf :as conf])) ;; Spark-configurations (def c (-> (c...
Abhishek B Jangid
1 vote · 0 answers · 31 views

Optimize Kmeans Clustering Code in Pyspark RDD

This is a very lengthy problem. This is my code for k-means clustering. For 10 clusters it took me more than an hour to compute the results. Instead of using for and while loops as in my code, can I make the following code faster with RDD operations (map, flatMap, filter), etc.? If so, where should I make the chan...
Jerry George
1 vote · 0 answers · 926 views

xxx is not a valid external type for schema of string

I am trying to use the SparkSession createDataFrame function and I keep getting errors like: xxx is not a valid external type for schema of string, where xxx can be Arrays$ArrayList or Ljava.lang.String. This is the code that creates the row: Row row = RowFactory.create(new String[][] {s.split(",")}); I make the...
Mike Wang
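
A likely trigger is that the extra array level makes the whole String[] a single field, which then cannot match a StringType column; a Scala sketch of a row construction that lines up with the schema (the column names are made up):

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    val schema = StructType(Seq(StructField("a", StringType), StructField("b", StringType)))
    val rows = sc.parallelize(Seq("x,y", "u,v")).map { s =>
      val parts = s.split(",")
      Row(parts(0), parts(1))     // one String per StructField, not Row(parts)
    }
    val df = spark.createDataFrame(rows, schema)
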
1 vote · 0 answers · 48 views

Cassandra join only on secondary index

I'm currently using Spark and Cassandra to run some jobs. I would like to join an RDD with Cassandra using joinWithCassandraTable. In my case I need to do it only on a secondary index, but it looks like I have to add the partition key as well. Myrdd.repartitionByCassandraReplica(keyspace, table) .joinWithC...
Dimac
1 vote · 0 answers · 392 views

Pairwise similarity calculation in PySpark RDD takes forever

I have an RDD (called 'data') where each row is an id/vector pair, like so: [('1', array([ 0.16501912, -0.25183533, -0.07702908, 0.07335572, 0.15868553])), ('2', array([ 0.01280832, -0.27269777, 0.09148506, 0.03950897, 0.15832097])), I need to calculate pair-wise similarity for this RDD, compar...
user3490622
1 vote · 1 answer · 143 views

Spark RDD recursive operations on simple collection

I have user information in an RDD: (Id:10, Name:bla, Adress:50, ...) And I have another collection containing the successive identity changes we gathered for each user: (lastId, newId) (10, 43) (85, 90) (43, 50) I need to get the final identity for each user's id; in this example: getFinalIdent...
Jean Wisser
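
If the identity-change list is small enough to collect, one sketch is to resolve each chain on the driver and broadcast the result (assumes the chains have no cycles; changesRdd, usersRdd and its case class are hypothetical):

    val changes: Map[Int, Int] = changesRdd.collect().toMap   // (lastId, newId) pairs

    @annotation.tailrec
    def finalId(id: Int): Int = changes.get(id) match {
      case Some(next) => finalId(next)   // follow 10 -> 43 -> 50
      case None       => id
    }

    val resolved = sc.broadcast(changes.keys.map(k => k -> finalId(k)).toMap)
    val usersWithFinalId = usersRdd.map(u => u.copy(id = resolved.value.getOrElse(u.id, u.id)))
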
1 vote · 1 answer · 849 views

Spark: sort the list by string [duplicate]

This question already has an answer here: Spark get collection sorted by value (10 answers). In the code below I am able to order the list with sortByKey. However, the requirement is to sort by the string in alphabetical order; that is, the expected output should be: Array((3,book),(2,cat),(7,cup),(5,heater),(...
vikas
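
sortByKey orders by the numeric key; to get the alphabetical order in the expected output, sort by the second element instead. A minimal sketch:

    val pairs = sc.parallelize(Seq((3, "book"), (2, "cat"), (7, "cup"), (5, "heater")))

    val byWord = pairs.sortBy(_._2)   // alphabetical by the string, not the key
    byWord.collect()                  // Array((3,book), (2,cat), (7,cup), (5,heater))
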
1 vote · 0 answers · 119 views

How to Call AWS Lambda Service from Spark

I would like to take each row of my Spark dataframe and process the data through a microservice that returns a new value (list) that I can then add as a new column. Since the dataframe will be processed concurrently, my thought was to structure this as a UDF that then calls an Amazon Lambda microserv...
SriK
1 vote · 1 answer · 73 views

Analysis of Movie Ratings percentages across Occupation and Movie Genre

I just started learning Spark programming and Python programming. Can you please help me understand the mistake in my code? I am running the code in Jupyter Notebooks, interactive mode. The test code below works fine, where I tested the concept: rdd = sc.parallelize([('librarian', (1, [0, 0, 1, 0, 1...
Vinod
1 vote · 0 answers · 118 views

Delete HBase data from a Spark RDD: Task not serializable

I want to delete data from HBase with an RDD; here is my code: def delGraphIdVertexInfo(vertexInfoRDD: RDD[(BqsVertex, String)]): Unit = { vertexInfoRDD.foreachPartition(partition => { val hc = HBaseConfiguration.create() val cn = ConnectionFactory.createConnection(hc) val userTable = TableName.valueO...
user7687835
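
Task not serializable usually means something created outside foreachPartition leaked into the closure; a sketch where every HBase handle is built inside the partition (the table name is hypothetical, BqsVertex is the asker's type, and the String value is assumed to be the HBase row key):

    import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
    import org.apache.hadoop.hbase.client.{ConnectionFactory, Delete}
    import org.apache.spark.rdd.RDD

    def delGraphIdVertexInfo(vertexInfoRDD: RDD[(BqsVertex, String)]): Unit = {
      vertexInfoRDD.foreachPartition { partition =>
        val hc = HBaseConfiguration.create()            // built per partition, never serialized
        val cn = ConnectionFactory.createConnection(hc)
        val userTable = cn.getTable(TableName.valueOf("vertex_info"))   // hypothetical table
        partition.foreach { case (_, rowKey) =>
          userTable.delete(new Delete(rowKey.getBytes("UTF-8")))
        }
        userTable.close()
        cn.close()
      }
    }
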
1 vote · 0 answers · 20 views

RDD map + sortByKey: error in a Python list comprehension

Hey guys, I have been receiving an error when I do something like this: RDD = [(0, [1,2,3], 2),(1, [2,3,4], 4),(2, [4,5,6], 6),(3, [6,7,8], 8)] Dados = [4,5,6] TesteEuclediana = RDD.map(lambda x: (x[0], [math.sqrt((x[1][i][1]- Dados[i])^2) for i in range (0, int(len(Dados)))], x[2]) ).sortByKey() I c...
