Questions tagged [spark-dataframe]
2168 questions
1 vote · 1 answer · 728 views
How do I select an ambiguous column reference? [duplicate]
This question already has an answer here:
Enable case sensitivity for spark.sql globally
1 answer
Here's some sample code illustrating what I'm trying to do. There is a dataframe with columns companyid and companyId. I want to select companyId, but the reference is ambiguous. How do I unambiguously...
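The linked duplicate points at enabling case-sensitive analysis so the two spellings resolve to different columns. A minimal Scala sketch, assuming a hypothetical Parquet source that contains both companyid and companyId:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("ambiguous-columns").getOrCreate()

// With case sensitivity on, the analyzer treats companyid and companyId as distinct columns.
spark.conf.set("spark.sql.caseSensitive", "true")

val df = spark.read.parquet("/path/to/companies")  // hypothetical source
df.select("companyId").show()                      // resolves unambiguously now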
1 vote · 0 answers · 41 views
How to make Spark Worker read data from local mongodb with mongodb-spark-connector?
I have two MongoDB instances on two computers, and there is also a Spark worker on each computer. But when I run Spark, the workers do not read data from their local MongoDB; instead, both read from one of them, so I only get partial data.
There is a page: https://docs.mongodb.com/spark-connector/maste...
1 vote · 0 answers · 41 views
Is there intermediate computation optimization when using functions.window [Spark]
I am using functions.window to create sliding window computation using Spark and Java. Example code:
Column slidingWindow = functions.window(singleIPPerRow.col("timestamp"), "3 hours", "1 seconds");
Dataset aggregatedResultsForWindow = singleIPPerRow.groupBy(slidingWindow, singleIPPerRow.col("area")...
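A self-contained Scala sketch of the same sliding-window aggregation (the sample rows and the count() aggregate are stand-ins, not taken from the question); Spark expands each row into every window it falls into before aggregating:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, window}

val spark = SparkSession.builder().appName("sliding-window").getOrCreate()
import spark.implicits._

// Hypothetical stand-in for the question's singleIPPerRow dataset.
val singleIPPerRow = Seq(
  ("2017-11-21 12:00:00", "eu", "1.2.3.4"),
  ("2017-11-21 12:00:01", "eu", "1.2.3.5")
).toDF("timestamp", "area", "ip")
 .withColumn("timestamp", col("timestamp").cast("timestamp"))

// 3-hour window sliding every second, as in the question.
val slidingWindow = window(col("timestamp"), "3 hours", "1 seconds")
val aggregatedResultsForWindow = singleIPPerRow.groupBy(slidingWindow, col("area")).count()
aggregatedResultsForWindow.show(truncate = false)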
1 vote · 0 answers · 80 views
Spark Sql z-score calculation with given peer group in Java-8
How do I calculate z-scores in Spark SQL in Java 8?
I tried to understand other posts using Window functions, but in my case the peer group will be 100.
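A hedged sketch of the usual window-function formulation, written in Scala for brevity (the Java API is analogous); the table and column names (metrics, peer_group, value) are assumptions:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{avg, col, stddev}

val spark = SparkSession.builder().appName("zscore").getOrCreate()

val df = spark.table("metrics")                 // hypothetical input table
val peers = Window.partitionBy("peer_group")    // the peer group of ~100 rows

// z-score = (value - mean of peer group) / stddev of peer group
val withZ = df.withColumn(
  "zscore",
  (col("value") - avg(col("value")).over(peers)) / stddev(col("value")).over(peers)
)
withZ.show()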
1 vote · 0 answers · 250 views
How to convert a DataFrame to JavaRDD with distributed copy?
I'm new to Spark optimization.
I'm trying to read Hive data into a DataFrame. Then I'm converting the DataFrame to a JavaRDD and running a map function on top of it.
The problem I'm facing is that the transformation running on top of the JavaRDD runs as a single task. Also, the transformations running on...
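One common cause is that the Hive read yields a single partition, so the derived RDD map runs as one task. A hedged Scala sketch (the question uses Java, but the idea is the same); the table name and partition count are placeholders:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("df-to-rdd").enableHiveSupport().getOrCreate()

val df = spark.table("some_hive_table")   // hypothetical Hive table
val rdd = df.repartition(200).rdd         // 200 partitions -> up to 200 map tasks
val mapped = rdd.map(row => row.mkString("|"))
println(mapped.count())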
1 vote · 1 answer · 431 views
Get date months with iteration over Spark dataframe
I need to iterate over the last 36 months based on an input date. Currently, using Scala, I am getting the max value of a timestamp field from a DataFrame. For example:
val vGetDate = hc.read.format("filodb.spark").option("database","YYYYY").option("dataset","XXX").load().agg(max("inv_da...
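A minimal sketch of one way to enumerate the 36 months preceding the max date: pull the max back to the driver and generate the month starts locally (the sample data and column handling are assumptions):

import java.time.LocalDate
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.max

val spark = SparkSession.builder().appName("last-36-months").getOrCreate()
import spark.implicits._

// Hypothetical stand-in for the loaded dataset.
val df = Seq("2018-03-15", "2018-01-02").toDF("inv_date")
  .withColumn("inv_date", $"inv_date".cast("date"))

val maxDate = df.agg(max($"inv_date")).as[java.sql.Date].head().toLocalDate
val last36Months: Seq[LocalDate] =
  (0 until 36).map(i => maxDate.withDayOfMonth(1).minusMonths(i))

last36Months.foreach(println)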
1 vote · 1 answer · 317 views
Spark | For a synchronous request/response use case
Spark newbie alert. I've been exploring ideas to design a requirement which involves the following:
Building a base predictive model for linear regression (a one-off activity)
Pass the data points to get the value for the response variable.
Do something with the result.
At regular intervals update...
1 vote · 3 answers · 221 views
Python Pandas: How to remove all columns from a dataframe that contain the values in a list?
include_cols_path = sys.argv[5]
with open(include_cols_path) as f:
include_cols = f.read().splitlines()
include_cols is a list of strings
df1 = sqlContext.read.csv(input_path + '/' + lot_number +'.csv', header=True).toPandas()
df1 is a dataframe of a large file. I would like to only retain the colum...
1 vote · 1 answer · 25 views
Not able to store result in HDFS when code runs for the second iteration
Well, I am new to Spark and Scala and have been trying to implement data cleaning in Spark. The code below checks for missing values in one column, stores them in outputrdd, and runs loops to calculate the missing values. The code works well when there is only one missing value in the file. Since HDFS does...
1 vote · 1 answer · 675 views
PySpark - Calling a sub-setting function within a UDF
I have to find neighbors of a specific data point in a pyspark dataframe.
a= spark.createDataFrame([('A', [0,1]), ('B', [5,9]), ('D', [13,5])],['Letter', 'distances'])
I have created this function that will take in the dataframe (DB) and then check the closest data points to a fixed point (Q) using...
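A hedged Scala sketch of the same idea (the question is PySpark): compute each row's distance to a fixed query point with a UDF, then keep the closest rows; the query point and the number of neighbours are placeholders:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

val spark = SparkSession.builder().appName("neighbors").getOrCreate()
import spark.implicits._

val a = Seq(("A", Seq(0, 1)), ("B", Seq(5, 9)), ("D", Seq(13, 5))).toDF("Letter", "distances")

val q = Seq(3.0, 5.0)   // hypothetical fixed point Q
val distTo = udf { (v: Seq[Int]) =>
  math.sqrt(v.zip(q).map { case (x, y) => (x - y) * (x - y) }.sum)
}

a.withColumn("dist", distTo(col("distances")))
  .orderBy("dist")
  .limit(2)             // two nearest neighbours
  .show()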
1 vote · 0 answers · 45 views
How to calculate connections of the node in Spark 2
I have the following DataFrame df:
val df = Seq(
(1, 0, 1, 0, 0), (1, 4, 1, 0, 4), (2, 2, 1, 2, 2),
(4, 3, 1, 4, 4), (4, 5, 1, 4, 4)
).toDF("from", "to", "attr", "type_from", "type_to")
+-----+-----+----+---------------+---------------+
|from |to |attr|type_from |type_to |
+-----+-----...
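If "connections" means the degree of each node, a minimal sketch is to union the from and to columns and count per node (this interpretation is an assumption based on the excerpt):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().appName("node-degree").getOrCreate()
import spark.implicits._

val df = Seq(
  (1, 0, 1, 0, 0), (1, 4, 1, 0, 4), (2, 2, 1, 2, 2),
  (4, 3, 1, 4, 4), (4, 5, 1, 4, 4)
).toDF("from", "to", "attr", "type_from", "type_to")

val degrees = df.select(col("from").as("node"))
  .union(df.select(col("to").as("node")))
  .groupBy("node")
  .count()

degrees.orderBy("node").show()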
1 vote · 1 answer · 1.3k views
How to relationalize a JSON to flat structure in AWS Glue
I am trying to flatten input JSON data that has two map/dictionary fields (custom_event1 and custom_event2), which may contain any key-value pair data. In order to create an output table from the data frame, I will have to avoid flattening custom_events and store them as a JSON string in the column.
Follo...
1 vote · 1 answer · 910 views
Save spark dataframe schema to hdfs
For a given data frame (df) we get the schema by df.schema, which is a StructType array. Can I save just this schema onto hdfs, while running from spark-shell? Also, what would be the best format in which the schema should be saved?
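A minimal sketch of one option: a StructType serializes to JSON via schema.json and can be rebuilt with DataType.fromJson, so a small JSON file on HDFS is a convenient format (the path is a placeholder):

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{DataType, StructType}

val spark = SparkSession.builder().appName("save-schema").getOrCreate()
import spark.implicits._

val df = Seq((1, "a")).toDF("id", "name")

// Write the schema as JSON to HDFS.
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val out = fs.create(new Path("/tmp/df_schema.json"))
out.write(df.schema.json.getBytes("UTF-8"))
out.close()

// Read it back and rebuild the StructType.
val in = fs.open(new Path("/tmp/df_schema.json"))
val restored = DataType.fromJson(scala.io.Source.fromInputStream(in).mkString).asInstanceOf[StructType]
in.close()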
1 vote · 0 answers · 184 views
Spark: Why do some executors have 0 active tasks and some have 13 tasks?
I am trying to read from S3 and do a count on the data frame. I have a cluster of 76 r3.4xlarge (1 master and 75 slaves). I set:
spark.dynamicAllocation.enabled 'true'
maximizeResourceAllocation 'true'
When I checked the Spark UI, I see just 25 executors, and of those only 7 have active ta...
1 vote · 0 answers · 560 views
Spark job dataframe write to Oracle using jdbc failing
When writing a Spark dataframe to an Oracle database (Oracle Database 11g Enterprise Edition Release 11.2.0.4.0 - 64bit), the Spark job is failing with the exception java.sql.SQLRecoverableException: IO Error: The Network Adapter could not establish the connection. The Scala code is:
dataFrame.write.mode(...
1 vote · 1 answer · 350 views
spark-excel datatype issues
I am using the spark-excel package to process MS Excel files with Spark 2.2. Some of the files fail to load as a Spark dataframe with the exception below. If someone has faced this issue, can you please help fix such data type issues?
After analyzing, I found that if the column name is n...
1 vote · 0 answers · 189 views
Data manipulation in PySpark [duplicate]
This question already has an answer here:
How to melt Spark DataFrame?
3 answers
Unpivot in spark-sql/pyspark
1 answer
I have a dataframe A, whose content is as below:
site id date1 date2
A 4/14/2001 1/1/1997
B 3/04/2000 4/8/1999
I want to pivot down data and store i...
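A hedged Scala sketch of the unpivot the linked duplicates describe, using the SQL stack() expression (column names are guessed from the sample data):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("unpivot").getOrCreate()
import spark.implicits._

val a = Seq(("A", "4/14/2001", "1/1/1997"), ("B", "3/04/2000", "4/8/1999"))
  .toDF("site_id", "date1", "date2")

// stack(2, ...) emits one row per date column for every input row.
val melted = a.selectExpr(
  "site_id",
  "stack(2, 'date1', date1, 'date2', date2) as (date_type, date)"
)
melted.show()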
1 vote · 1 answer · 957 views
How to read nested json objects fields into Scala case classes with Spark
I have a tweets JSON file whose structure is as shown below (this is a sample of just one tweet from my tweets file). I need to read it in as JSON with Spark and transform it into a case class with the Scala code below. I need to read specific fields of the JSON files, which are nested. I particular...
1 vote · 1 answer · 2.1k views
Adding an ArrayList value to a new column in a Spark DataFrame using PySpark [duplicate]
This question already has an answer here:
How to add a constant column in a Spark DataFrame?
3 answers
I want to add a new column to my existing dataframe. Below is my dataframe:
+---+---+-----+
| x1| x2| x3|
+---+---+-----+
| 1| a| 23.0|
| 3| B|-23.0|
+---+---+-----+
I am able to add df = df....
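A minimal sketch of the approach from the linked duplicate, adapted to an array value: build the constant list from lit() values (or typedLit on Spark 2.2+) and attach it with withColumn; written in Scala, while the question uses PySpark:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{array, lit}

val spark = SparkSession.builder().appName("constant-array-column").getOrCreate()
import spark.implicits._

val df = Seq((1, "a", 23.0), (3, "B", -23.0)).toDF("x1", "x2", "x3")

val withList = df.withColumn("x4", array(lit(10), lit(20), lit(30)))
withList.show()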
1 vote · 0 answers · 86 views
How does Spark work in terms of persisting, spilling to disk, and partitioning? Is Spark not always a better option compared to Hive?
Using Spark 2.1.0.
What is the difference between Spark spilling to disk and persist/caching? I mean, how do they work?
Spilling to disk means that when a task is working on some partition that is too big to fit into memory, it spills to disk. Is this correct?
Persist/caching is when we have done som...
1 vote · 1 answer · 488 views
Spark Dataset appending unique ID
I'm looking at whether there is an "already implemented alternative" to append a unique ID to a Spark dataset.
My scenario:
I have an incremental job that runs each day processing a batch of information. In this job, I create a dimension table of something and assign unique IDs to each row using monotoni...
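A hedged sketch of one common workaround: read the current maximum ID from the existing dimension and add a row_number() to the new batch so IDs stay unique across daily runs (table and column names are assumptions, and a global orderBy window funnels the batch through a single partition):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{coalesce, lit, max, row_number}

val spark = SparkSession.builder().appName("incremental-ids").getOrCreate()
import spark.implicits._

val existingDim = spark.table("dim_something")   // hypothetical dimension table
val newRows = spark.table("staging_something")   // hypothetical daily batch

val currentMax: Long = existingDim.agg(coalesce(max($"id"), lit(0L))).as[Long].head()

val w = Window.orderBy($"natural_key")           // any stable ordering of the batch
val withIds = newRows.withColumn("id", row_number().over(w) + currentMax)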
1 vote · 0 answers · 48 views
Best performance when using multiple DataFrames
I am working with a master list DataFrame and then merging and removing data based on a change-list MasterFrame.
I am very new to Scala, so I have no doubt that I have not done things in the most efficient way!
Basically I am just trying to establish how I can speed up the performance of the final DF as I h...
1 vote · 0 answers · 264 views
Spark Streaming groupBy on RDD vs Structured Streaming groupBy on DF (Scala/Spark)
We have a high-volume streaming job (Spark/Kafka) and the data (Avro) needs to be grouped by a timestamp field inside the payload. We are doing groupBy on RDD to achieve this: RDD[Timestamp, Iterator[Records]]. This works well for a decent volume of records. But for loads like 150k every 10 seconds, the...
1 vote · 0 answers · 63 views
Data Workflow - Trouble aggregating 50 million rows in R?
The data set begins as 12 monthly CSV files. Each file is loaded into R to work with. One step is to find all combinations of a certain factor group, which results in 4 million rows. I have 16 GB of RAM on my computer, which, with other tasks, is not enough to store all 12 × 4 million rows. The end goal is t...
1 vote · 1 answer · 329 views
Does it help to filter down a dataframe before a left outer join?
I've only seen sources say that this helps for RDDs, so I was wondering if it helped for DataFrames since the Spark core and spark SQL engines optimize differently.
Let's say table 1 has 6mil records and we're joining to table 2 which has 600mil records. We are joining these two tables on table 2'...
1 vote · 0 answers · 163 views
Performance issue with spark Dataframe, each iteration takes longer
I'm using Spark 2.2.1 with Scala 2.11.12 to implement a recursive algorithm. First, I tried an implementation using RDDs, but when I used a lot of data the runtime was too long. I have made a new version using DataFrames, but even with very little data it takes too long, taking less data...
1 vote · 1 answer · 289 views
Spark version 2.0 Streaming : how to dynamically infer the schema of a JSON String rdd and convert it to a DF
With versions prior to 2.0, I could use an SQLContext to do the same like this:
val sqlContext = new SQLContext(sc)
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, Set("myTopicName"))
stream.foreachRDD(
rdd => {
val dataFrame = sqlContext....
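A hedged sketch of the Spark 2.x equivalent: obtain the SparkSession inside foreachRDD and let read.json infer the schema per micro-batch (the surrounding Kafka setup is omitted and the DStream is assumed to carry the JSON strings):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.DStream

def processStream(stream: DStream[String]): Unit = {
  stream.foreachRDD { (rdd: RDD[String]) =>
    if (!rdd.isEmpty()) {
      val spark = SparkSession.builder().getOrCreate()
      val dataFrame = spark.read.json(rdd)   // schema inferred from this batch's JSON
      dataFrame.printSchema()
    }
  }
}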
1 vote · 1 answer · 178 views
Schema for type TypeTag[java.sql.Timestamp] is not supported when creating Spark Dataset
Hi, I have the following code, which tries to create a DataFrame from a Seq[T]:
case class CaseConvert[T: TypeTag](a: T)
def createDf[T: TypeTag](data: Seq[T]): DataFrame = {
spark.createDataFrame(data.map(CaseConvert[T]))
}
When the above createDf method is executed by passing a type, say Seq[java.sql.Timestamp], it...
1 vote · 1 answer · 800 views
Spark Driver does not release memory when using spark.sql.autoBroadcastJoinThreshold
I have come across abnormal behaviour.
I have a query (inside a loop) in which I have inner joins over 5 tables, one around 200 MB and all others under 10 MB (all persisted at the start of the loop, and unpersisted at the end of the loop).
Whenever I use spark.sql.autoBroadcastJoinThreshold (tried defau...
1 vote · 0 answers · 47 views
How to orderBy desc on a UTC timestamp column in Spark Scala
I need to order a specific column in my Spark data frame in descending order.
For example, here is the date-time format that I want to reorder:
"2017-11-21T12:09:23+00:00"
"2017-11-21T12:18:55+00:00"
"2017-11-21T11:41:14+00:00"
This is what I am doing to reorder:
val windowSpec = Window.partitionBy("Source_or...
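A minimal sketch of the sorting part alone: cast the ISO-8601 strings to a timestamp and order by it descending (assumes Spark 2.2+ for to_timestamp; the window/partition details of the question are left out):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, to_timestamp}

val spark = SparkSession.builder().appName("order-desc").getOrCreate()
import spark.implicits._

val df = Seq(
  "2017-11-21T12:09:23+00:00",
  "2017-11-21T12:18:55+00:00",
  "2017-11-21T11:41:14+00:00"
).toDF("event_time")

df.withColumn("event_ts", to_timestamp(col("event_time"), "yyyy-MM-dd'T'HH:mm:ssXXX"))
  .orderBy(col("event_ts").desc)
  .show(truncate = false)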
1 vote · 0 answers · 251 views
Spark pivot-ing for large number of distinct values
I have a 5GB csv file, with the following columns:
ColA ColB Date Value
There are approx. 11k unique ColA values, and about 400 unique ColB values.
I am trying to pivot the table, using Spark, so I get to the following structure:
ColA Date ColB_1 ColB_2 ColB_3 .. ColB_n
Value_1 Value_2 Value_3 .....
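A hedged sketch: passing the ~400 distinct ColB values to pivot() explicitly spares Spark the extra pass it otherwise needs to discover them (spark.sql.pivotMaxValues caps how many are allowed); the paths and the first() aggregate are placeholders:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.first

val spark = SparkSession.builder().appName("wide-pivot").getOrCreate()

val df = spark.read.option("header", "true").csv("/path/to/input.csv")

// Collect the pivot values once, then pivot with the explicit list.
val colBValues = df.select("ColB").distinct().collect().map(_.getString(0)).toSeq

val pivoted = df
  .groupBy("ColA", "Date")
  .pivot("ColB", colBValues)
  .agg(first("Value"))

pivoted.write.mode("overwrite").parquet("/path/to/output")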
1 vote · 2 answers · 451 views
Create a new spark dataframe that contains pairwise combinations of another dataframe?
Consider the following code
question = spark.createDataFrame([{'A':1,'B':5},{'A':2,'B':5},
{'A':3,'B':5},{'A':3,'B':6}])
#+---+---+
#| A| B|
#+---+---+
#| 1| 5|
#| 2| 5|
#| 3| 5|
#| 3| 6|
#+---+---+
How can I create a spark dataframe that looks as follows:
solution = spark.createDataFram...
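Since the desired solution frame is cut off, here is only an illustrative Scala sketch of one reading of "pairwise combinations": a self-join on B that keeps each unordered pair of A values once:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().appName("pairwise").getOrCreate()
import spark.implicits._

val question = Seq((1, 5), (2, 5), (3, 5), (3, 6)).toDF("A", "B")

val pairs = question.as("l")
  .join(question.as("r"), col("l.B") === col("r.B") && col("l.A") < col("r.A"))
  .select(col("l.A").as("A1"), col("r.A").as("A2"), col("l.B").as("B"))

pairs.show()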
1 vote · 1 answer · 584 views
Spark and Scala, add new column with value from another dataframe by mapping common key [duplicate]
This question already has an answer here:
How to join two DataFrames and change column for missing values?
3 answers
How to do left outer join in spark sql?
3 answers
I have 2 dataframes.
df1 =
dep-code rank
abc 1
bcd 2
df2 =
some cols... dep-code
abc
bcd
abc
I want to add n...
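A minimal sketch of what the linked duplicates suggest: a left join on the shared key brings df1's rank onto every matching row of df2 (the extra df2 column is a placeholder):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("join-lookup").getOrCreate()
import spark.implicits._

val df1 = Seq(("abc", 1), ("bcd", 2)).toDF("dep-code", "rank")
val df2 = Seq(("r1", "abc"), ("r2", "bcd"), ("r3", "abc")).toDF("some_col", "dep-code")

val joined = df2.join(df1, Seq("dep-code"), "left_outer")
joined.show()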
1 vote · 1 answer · 296 views
Spark Stateful Streaming with DataFrame
Is it possible to use a DataFrame as a State / StateSpec for Spark Streaming? The current StateSpec implementation seems to allow only key-value pair data structures (mapWithState, etc.).
My objective is to keep a fixed size FIFO buffer as a StateSpec that gets updated every time new data streams in. I...
1 vote · 1 answer · 95 views
Pyspark dataframe: Counting of unique values in a column, independently co-ocurring with values in other columns
I have a Spark data frame comprising billions of predictions of interactions between two types of molecules, Regulators and Targets (there is no overlap between these), as obtained from various Sources. I need to add a column containing the number of resources that predict at least one interaction for the g...
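A hedged sketch of the kind of aggregation described: count the distinct sources per regulator and join the count back on (the column names regulator/target/source are assumptions based on the wording):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.countDistinct

val spark = SparkSession.builder().appName("distinct-sources").getOrCreate()
import spark.implicits._

val predictions = Seq(
  ("regA", "tgt1", "src1"), ("regA", "tgt2", "src2"), ("regB", "tgt1", "src1")
).toDF("regulator", "target", "source")

val perRegulator = predictions
  .groupBy("regulator")
  .agg(countDistinct("source").as("n_sources"))

val enriched = predictions.join(perRegulator, Seq("regulator"), "left_outer")
enriched.show()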
1 vote · 1 answer · 175 views
Spark write csv fail through zeppelin
Spark 2.1.1.2.6.2.0-205
Zeppelin 0.7.3
I'm running a piece of code on a certain machine, say server-A. The code finishes just fine.
When I run the same code through Zeppelin (connected to the same machine), when it gets to this line (it's python):
df.coalesce(1).write.csv(remote_path, header=True)
I...
1 vote · 1 answer · 540 views
Write dataframe to multiple tabs in a excel sheet using Spark
I have been using spark-excel (https://github.com/crealytics/spark-excel) to write the output to a single sheet of an Excel workbook. However, I am unable to write the output to different sheets (tabs).
Can anyone suggest an alternative?
Thanks,
Sai
1 vote · 1 answer · 49 views
parallelism in spark sql when list of tables are need to be processed
Using Spark 1.6.0, CDH 5.7.0.
I have a CSV file which has a list of tables to be processed, and I want to achieve parallelism in processing them.
As of now I am using collect to process each one; I tried the Future option in Scala and even tried this: https://blog.knoldus.com/2015/10/21/demystifying-asynchron...
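A hedged sketch of one common Spark 1.6 pattern: submit the per-table work concurrently from the driver with Futures so independent jobs can run in parallel, subject to the scheduler (the table list and the per-table logic are placeholders):

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import org.apache.spark.sql.hive.HiveContext

def processTables(hiveContext: HiveContext, tables: Seq[String]): Unit = {
  val jobs = tables.map { t =>
    Future {
      // Placeholder per-table work: count the rows and report.
      val n = hiveContext.table(t).count()
      println(s"$t -> $n rows")
    }
  }
  Await.result(Future.sequence(jobs), Duration.Inf)
}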
1 vote · 0 answers · 213 views
Inconsistent results in pyspark
Running the following code repeatedly generates inconsistent results. So far, I have only seen two outputs. The results get repeated any random number of times before switching to the other results, which then also repeat any random number of times before switching back again.
Why is this happenin...
1 vote · 0 answers · 231 views
Processing half a billion rows with PySpark creates shuffle read problems
I am apparently facing a shuffle read problem.
My PySpark script is running on a Hadoop cluster with 1 edge node and 12 data nodes, using YARN as the resource manager and Spark 1.6.2.
###[ini_file containing conf spark]
spark.app.name = MY_PYSPARK_APP
spark.master = yarn-client
spark.yarn.queue = agr_queue
s...