Questions tagged [spark-dataframe]

1 vote · 1 answer · 728 views

How do I select an ambiguous column reference? [duplicate]

This question already has an answer here: Enable case sensitivity for spark.sql globally 1 answer Here's some sample code illustrating what I'm trying to do. There is a dataframe with columns companyid and companyId. I want to select companyId, but the reference is ambiguous. How do I unambiguously...
chris.mclennon
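
One way to resolve this, sketched below under the assumption that the DataFrame is built in spark-shell with a SparkSession named spark in scope: either turn on spark.sql.caseSensitive so the two names are treated as distinct, or rename the columns positionally with toDF before selecting.

    // Assumes a SparkSession named `spark` is in scope (e.g. spark-shell).
    import spark.implicits._

    // Option 1: treat companyid and companyId as distinct names.
    spark.conf.set("spark.sql.caseSensitive", "true")
    val df = Seq((1, 100), (2, 200)).toDF("companyid", "companyId")
    df.select("companyId").show()

    // Option 2: rename positionally so no ambiguity remains.
    val renamed = df.toDF("company_id_lower", "company_id_camel")
    renamed.select("company_id_camel").show()
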
1 vote · 0 answers · 41 views

How to make each Spark Worker read data from its local MongoDB with the mongodb-spark-connector?

I have two 'mongodb' instances on two computers, and there is also a 'Spark Worker' on each computer. But when I run 'spark', it doesn't read data from its local 'mongodb'; instead, it reads from only one of them, so I only get partial data. There is a page: https://docs.mongodb.com/spark-connector/maste...
BobXWu
1 vote · 0 answers · 41 views

Is there intermediate computation optimization when using functions.window [Spark]

I am using functions.window to create a sliding-window computation with Spark and Java. Example code: Column slidingWindow = functions.window(singleIPPerRow.col('timestamp'), '3 hours', '1 seconds'); Dataset aggregatedResultsForWindow = singleIPPerRow.groupBy(slidingWindow, singleIPPerRow.col('area')...
Anton.P
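
The Java snippet in the excerpt maps to the Scala sketch below (the DataFrame and column names singleIPPerRow, timestamp and area are taken from the question; the data is made up). The main cost to watch is that a 3-hour window sliding every 1 second places each row into 10,800 window instances before any aggregation happens.

    import java.sql.Timestamp
    import org.apache.spark.sql.functions._
    import spark.implicits._

    // Made-up rows shaped like the question's singleIPPerRow DataFrame.
    val singleIPPerRow = Seq(
      ("10.0.0.1", Timestamp.valueOf("2018-01-01 10:00:00"), "EU"),
      ("10.0.0.2", Timestamp.valueOf("2018-01-01 10:00:30"), "US")
    ).toDF("ip", "timestamp", "area")

    // 3-hour window sliding every second: each row is expanded into 10,800 windows.
    val slidingWindow = window(col("timestamp"), "3 hours", "1 second")

    val aggregatedResultsForWindow =
      singleIPPerRow.groupBy(slidingWindow, col("area")).count()

    aggregatedResultsForWindow.explain()   // inspect what Catalyst actually optimises
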
1 vote · 0 answers · 80 views

Spark Sql z-score calculation with given peer group in Java-8

How do I calculate z-scores in Spark SQL with Java 8? I tried to understand other posts that use Window functions, but in my case the peer group will be 100.
Suresh Polisetti
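
A sketch of one way to do this with Spark SQL window functions, written in Scala rather than Java 8; the group_id and value column names are made up, and each partition plays the role of the peer group.

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions._
    import spark.implicits._

    // Made-up data: one value per row, grouped into peer groups.
    val df = Seq(("g1", 10.0), ("g1", 12.0), ("g1", 23.0), ("g2", 5.0), ("g2", 7.0))
      .toDF("group_id", "value")

    // The whole partition acts as the peer group for the aggregates.
    val peerGroup = Window.partitionBy("group_id")

    val withZScore = df
      .withColumn("mean", avg($"value").over(peerGroup))
      .withColumn("stddev", stddev($"value").over(peerGroup))
      .withColumn("z_score", ($"value" - $"mean") / $"stddev")

    withZScore.show()
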
1 vote · 0 answers · 250 views

How to convert DataFrame to JavaRDD with distributed copy?

I'm new to Spark optimization. I'm trying to read Hive data into a DataFrame. Then I'm converting the DataFrame to a JavaRDD and running a map function on top of it. The problem I'm facing is that the transformation running on top of the JavaRDD runs with a single task. Also, the transformations running on...
Makubex
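
A single task at this stage usually means the upstream read produced a single partition; below is a hedged Scala sketch (the Hive table name is hypothetical) that checks the partition count and repartitions before mapping.

    // Assumes a Hive-enabled SparkSession named `spark`; the table name is hypothetical.
    val df = spark.table("my_db.my_table")
    println(s"partitions before: ${df.rdd.getNumPartitions}")

    // Spread the rows over more partitions so the map runs as parallel tasks.
    val mapped = df.repartition(48).rdd.map(row => row.mkString("|"))
    println(s"partitions after: ${mapped.getNumPartitions}")
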
1 vote · 1 answer · 431 views

Get date months with iteration over Spark dataframe

I need to iterate over the last 36 months based on an input date. Currently, using Scala, I am getting the max value of a timestamp field through a DataFrame. For example: val vGetDate = hc.read.format('filodb.spark').option('database','YYYYY').option('dataset','XXX').load().agg(max('inv_da...
Rajdip
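
One way to produce the 36 month boundaries without iterating over the DataFrame row by row, sketched with a hypothetical maxDate standing in for the max(inv_date) value from the excerpt.

    import java.time.LocalDate

    // Hypothetical stand-in for the max(inv_date) value obtained in the excerpt.
    val maxDate = LocalDate.parse("2018-03-15")

    // First day of each of the last 36 months, newest first.
    val last36MonthStarts: Seq[String] =
      (0 until 36).map(i => maxDate.withDayOfMonth(1).minusMonths(i.toLong).toString)

    // These strings can then drive a DataFrame filter, e.g.
    //   df.filter(trunc(col("inv_date"), "MM").isin(last36MonthStarts: _*))
    last36MonthStarts.foreach(println)
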
1 vote · 1 answer · 317 views

Spark | For a synchronous request/response use case

Spark newbie alert. I've been exploring ideas to design a requirement which involves the following: building a base predictive model for linear regression (a one-off activity); passing the data points to get the value for the response variable; doing something with the result; at regular intervals update...
user1189332
1 vote · 3 answers · 221 views

Python Pandas: How to remove all columns from a dataframe except those in a list?

include_cols_path = sys.argv[5] with open(include_cols_path) as f: include_cols = f.read().splitlines() include_cols is a list of strings df1 = sqlContext.read.csv(input_path + '/' + lot_number +'.csv', header=True).toPandas() df1 is a dataframe of a large file. I would like to only retain the colum...
Cody
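
Since the file is only converted to Pandas at the end, the column pruning can happen on the Spark side first. A Scala sketch of that idea (paths are hypothetical; the question passes them via sys.argv).

    import scala.io.Source
    import org.apache.spark.sql.functions.col

    // Hypothetical paths; the question reads them from command-line arguments.
    val includeColsPath = "/path/to/include_cols.txt"
    val inputPath = "/path/to/input.csv"

    // One column name per line, as in the excerpt's splitlines().
    val includeCols = Source.fromFile(includeColsPath).getLines().toSeq

    val df = spark.read.option("header", "true").csv(inputPath)

    // Keep only the listed columns; the intersection guards against missing names.
    val kept = df.columns.filter(includeCols.contains)
    df.select(kept.map(col): _*).show(5)
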
1 vote · 1 answer · 25 views

Not able to store result in HDFS when code runs for a second iteration

I am new to Spark and Scala and have been trying to implement data cleaning in Spark. The code below checks for the missing value for one column, stores it in outputrdd, and runs loops for calculating the missing value. The code works well when there is only one missing value in the file. Since HDFS does...
Yogesh Patel
1 vote · 1 answer · 675 views

PySpark - Calling a sub-setting function within a UDF

I have to find neighbors of a specific data point in a pyspark dataframe. a= spark.createDataFrame([('A', [0,1]), ('B', [5,9]), ('D', [13,5])],['Letter', 'distances']) I have created this function that will take in the dataframe (DB) and then check the closest data points to a fixed point (Q) using...
Bryce Ramgovind
1 vote · 0 answers · 45 views

How to calculate connections of the node in Spark 2

I have the following DataFrame df: val df = Seq( (1, 0, 1, 0, 0), (1, 4, 1, 0, 4), (2, 2, 1, 2, 2), (4, 3, 1, 4, 4), (4, 5, 1, 4, 4) ).toDF('from', 'to', 'attr', 'type_from', 'type_to') +-----+-----+----+---------------+---------------+ |from |to |attr|type_from |type_to | +-----+-----...
Markus
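
The excerpt cuts off before the expected output, so as a hedged sketch: if "connections" means the number of edges touching each node, stacking the two endpoint columns and counting gives the degree.

    import spark.implicits._

    val df = Seq(
      (1, 0, 1, 0, 0),
      (1, 4, 1, 0, 4),
      (2, 2, 1, 2, 2),
      (4, 3, 1, 4, 4),
      (4, 5, 1, 4, 4)
    ).toDF("from", "to", "attr", "type_from", "type_to")

    // Degree per node: every edge contributes one occurrence of each endpoint
    // (a self-loop such as (2,2) is therefore counted twice).
    val degrees = df.select($"from".as("node"))
      .union(df.select($"to".as("node")))
      .groupBy("node")
      .count()

    degrees.orderBy("node").show()
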
1 vote · 1 answer · 1.3k views

How to relationalize a JSON to flat structure in AWS Glue

I am trying to flatten input JSON data that has two map/dictionary fields (custom_event1 and custom_event2), which may contain any key-value pair data. In order to create an output table from the data frame, I will have to avoid flattening the custom_events and store them as a JSON string in the column. Follo...
Sumit Saurabh
1 vote · 1 answer · 910 views

Save spark dataframe schema to hdfs

For a given data frame (df) we get the schema by df.schema, which is a StructType array. Can I save just this schema onto hdfs, while running from spark-shell? Also, what would be the best format in which the schema should be saved?
Ashwin
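
One hedged way to do this from spark-shell: a StructType round-trips through JSON, so the schema can be written as a small text file on HDFS with the Hadoop FileSystem API (the path below is hypothetical) and rebuilt later with DataType.fromJson.

    import org.apache.hadoop.fs.{FileSystem, Path}
    import org.apache.spark.sql.types.{DataType, StructType}

    val df = spark.range(3).toDF("id")        // stand-in for the real DataFrame
    val schemaJson: String = df.schema.json   // JSON is a convenient, readable format

    // Write the JSON string to HDFS (hypothetical path).
    val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
    val out = fs.create(new Path("/user/hive/schemas/df_schema.json"))
    out.write(schemaJson.getBytes("UTF-8"))
    out.close()

    // Later: rebuild the schema from the saved JSON.
    val restored = DataType.fromJson(schemaJson).asInstanceOf[StructType]
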
1 vote · 0 answers · 184 views

Spark: Why do some executors have 0 active tasks and some 13 tasks?

I am trying to read from S3 and do a count on the data frame. I have a cluster of 76 r3.4xlarge nodes (1 master and 75 slaves). I set: spark.dynamicAllocation.enabled 'true' maximizeResourceAllocation 'true' When I check the Spark UI, I see just 25 executors, and of those only 7 have active ta...
user3407267
1 vote · 0 answers · 560 views

Spark job dataframe write to Oracle using jdbc failing

When writing a Spark dataframe to an Oracle database (Oracle Database 11g Enterprise Edition Release 11.2.0.4.0 - 64bit), the Spark job fails with the exception java.sql.SQLRecoverableException: IO Error: The Network Adapter could not establish the connection. The Scala code is dataFrame.write.mode(...
Abhishek Joshi
1 vote · 1 answer · 350 views

spark-excel datatype issues

I am using the spark-excel package to process MS Excel files with Spark 2.2. Some of the files fail to load as a Spark dataframe with the exception below. If someone has faced this issue, can you please help to fix such data type issues? After analyzing, I found that if the column name is n...
nilesh1212
1 vote · 0 answers · 189 views

Data manipulation in PySpark [duplicate]

This question already has an answer here: How to melt Spark DataFrame? 3 answers Unpivot in spark-sql/pyspark 1 answer I have a dataframe A, whose content is as below: site id date1 date2 A 4/14/2001 1/1/1997 B 3/04/2000 4/8/1999 I want to pivot down data and store i...
Varun Chadha
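
As a hedged sketch of the melt/unpivot the linked duplicates describe, using SQL's stack generator (the first column is named site_id here since the excerpt's exact header is unclear).

    import org.apache.spark.sql.functions.expr
    import spark.implicits._

    // Data shaped like the excerpt's table.
    val a = Seq(
      ("A", "4/14/2001", "1/1/1997"),
      ("B", "3/04/2000", "4/8/1999")
    ).toDF("site_id", "date1", "date2")

    // stack(n, label1, col1, label2, col2, ...) unpivots the two date columns
    // into (date_type, date) rows.
    val melted = a.select(
      $"site_id",
      expr("stack(2, 'date1', date1, 'date2', date2) as (date_type, date)")
    )

    melted.show()
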
1 vote · 1 answer · 957 views

How to read nested JSON object fields into Scala case classes with Spark

I have a tweets JSON file whose structure is as shown below (this is a sample of just one tweet from my tweets file). I need to read it in as JSON with Spark and transform it into a case class with the Scala code below. I need to read specific fields of the JSON file, which are nested. I particular...
Gakuo
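
A hedged sketch of the usual approach: read the file with spark.read.json, select the nested fields by path, and map to a case class. The field names (user.name, user.followers_count) and the path are assumptions based on typical tweet JSON, not the asker's actual file. This works as-is in spark-shell; in a compiled job the case class should live at the top level.

    import spark.implicits._

    // Hypothetical subset of tweet fields.
    case class Tweet(id: Long, text: String, userName: String, followers: Long)

    // Hypothetical path; one JSON object per line.
    val raw = spark.read.json("/data/tweets.json")

    // Select nested fields by path, rename to match the case class, then type it.
    val tweets = raw.select(
        $"id",
        $"text",
        $"user.name".as("userName"),
        $"user.followers_count".as("followers")
      ).as[Tweet]

    tweets.show(5, false)
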
1 vote · 1 answer · 2.1k views

Adding an ArrayList value as a new column in a Spark Dataframe using PySpark [duplicate]

This question already has an answer here: How to add a constant column in a Spark DataFrame? 3 answers I want to add a new column to my existing dataframe. Below is my dataframe - +---+---+-----+ | x1| x2| x3| +---+---+-----+ | 1| a| 23.0| | 3| B|-23.0| +---+---+-----+ I am able to add df = df....
user7348570
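
The linked duplicate covers this; as a Scala sketch (the question itself is PySpark), a constant array column can be built from lit values.

    import org.apache.spark.sql.functions._
    import spark.implicits._

    val df = Seq((1, "a", 23.0), (3, "B", -23.0)).toDF("x1", "x2", "x3")

    // Constant array-valued column: wrap each element in lit() and combine with array().
    // (Spark 2.2+ can also use typedLit(Seq(1, 2, 3)).)
    val withList = df.withColumn("x4", array(lit(1), lit(2), lit(3)))

    withList.show()
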
1 vote · 0 answers · 86 views

How does Spark work in terms of persisting, spilling to disk, and partitioning? Is Spark not always a better option compared to Hive?

Using Spark 2.1.0. What is the difference between Spark spilling to disk and persist/caching? I mean, how do they work? Spilling to disk means that when a task is working on some partition that is too big to fit into memory, it spills to disk. Is this correct? Persist/caching is when we have done som...
AJm
1 vote · 1 answer · 488 views

Spark Dataset appending unique ID

I'm looking for an 'already implemented' alternative for appending a unique ID to a Spark dataset. My scenario: I have an incremental job that runs each day, processing a batch of information. In this job, I create a dimension table of something and assign unique IDs to each row using monotoni...
Henrique Goulart
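
One hedged alternative when monotonically_increasing_id's sparse IDs are a problem: zipWithIndex gives dense 0..n-1 indices, so adding the previous run's maximum keeps IDs unique across daily batches. Sketch with made-up data and a made-up previousMaxId.

    import org.apache.spark.sql.Row
    import spark.implicits._

    // Today's batch (made-up data) and the highest ID already in the dimension table.
    val batch = Seq("row-a", "row-b", "row-c").toDF("payload")
    val previousMaxId = 1000L

    // zipWithIndex is dense, unlike monotonically_increasing_id.
    val schema = batch.schema.add("id", "long")
    val withIds = spark.createDataFrame(
      batch.rdd.zipWithIndex.map { case (row, idx) =>
        Row.fromSeq(row.toSeq :+ (previousMaxId + 1 + idx))
      },
      schema
    )

    withIds.show()
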
1 vote · 0 answers · 48 views

Best performance when using multiple DataFrames

I am working with a master list DataFrame and then merging and removing data based on a change list MasterFrame. I am very new to Scala, so I have no doubt that I have not done things in the most efficient way! Basically I am just trying to establish how I can speed up the performance of the final DF as I h...
Phil Baines
1 vote · 0 answers · 264 views

Spark Streaming groupBy on RDD vs Structured Streaming groupBy on DF (Scala/Spark)

We have a high-volume streaming job (Spark/Kafka), and the data (Avro) needs to be grouped by a timestamp field inside the payload. We are doing groupBy on the RDD to achieve this: RDD[Timestamp, Iterator[Records]]. This works well for a decent volume of records. But for loads like 150k every 10 seconds, the...
Shawn
1 vote · 0 answers · 63 views

Data Workflow - Trouble aggregating 50 million rows in R?

The data set begins as 12 monthly CSV files. Each file is loaded into R to work with. One step is to find all combinations of a certain factor group, which results in 4 million rows. I have 16 GB of RAM on my computer, which, with other tasks, is not enough to store all 12 x 4 million rows. The end goal is t...
ben hulet
1 vote · 1 answer · 329 views

Does it help to filter down a dataframe before a left outer join?

I've only seen sources say that this helps for RDDs, so I was wondering if it helped for DataFrames since the Spark core and spark SQL engines optimize differently. Let's say table 1 has 6mil records and we're joining to table 2 which has 600mil records. We are joining these two tables on table 2'...
Christopher Nguyen
1 vote · 0 answers · 163 views

Performance issue with spark Dataframe, each iteration takes longer

I'm using Spark 2.2.1 with Scala 2.11.12 as the language to implement a recursive algorithm. First, I tried an implementation using RDDs, but it took too long when I used a lot of data. I have made a new version using DataFrames, but it takes too long even with very little data; taking less data...
José David Martín
1 vote · 1 answer · 289 views

Spark version 2.0 Streaming : how to dynamically infer the schema of a JSON String rdd and convert it to a DF

With versions prior to 2.0, I could use an SQLContext to do the same like this: val sqlContext = new SQLContext(sc) val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder]( ssc, kafkaParams, Set('myTopicName')) stream.foreachRDD( rdd => { val dataFrame = sqlContext....
void
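
For Spark 2.x, one hedged sketch of the per-batch schema inference: read.json still accepts an RDD[String] in 2.0/2.1 (deprecated from 2.2, where a Dataset[String] is preferred). The JSON payloads below stand in for one micro-batch of Kafka values.

    import org.apache.spark.rdd.RDD
    import spark.implicits._

    // Stand-in for one micro-batch of JSON strings pulled out of the Kafka records.
    val jsonRdd: RDD[String] = spark.sparkContext.parallelize(Seq(
      """{"user": "a", "count": 1}""",
      """{"user": "b", "count": 2, "extra": "field"}"""
    ))

    // Spark 2.0/2.1: the schema is inferred from the batch itself.
    val df = spark.read.json(jsonRdd)
    // Spark 2.2+: val df = spark.read.json(spark.createDataset(jsonRdd))

    df.printSchema()
    df.createOrReplaceTempView("myTopicName")
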
1 vote · 1 answer · 178 views

Schema for type TypeTag[java.sql.Timestamp] is not supported when creating Spark Dataset

Hi, I have the following code, which tries to create a DataFrame from Seq[T]: case class CaseConvert[T: TypeTag](a: T) def createDf[T: TypeTag](data: Seq[T]): DataFrame = { spark.createDataFrame(data.map(CaseConvert[T]) } When the above createDf method is executed by passing a type, say Seq[java.sql.Timestamp], it...
u449355
1 vote · 1 answer · 800 views

Spark Driver does not release memory when using spark.sql.autoBroadcastJoinThreshold

I have come across abnormal behaviour. I have a query (inside a loop) in which I have inner joins over 5 tables, one around 200 MB and all others under 10 MB (all persisted at the start of the loop, and unpersisted at the end of the loop). Whenever I use spark.sql.autoBroadcastJoinThreshold (tried defau...
user9361799
1 vote · 0 answers · 47 views

How to orderBy desc on a UTC timestamp column in Spark Scala

I need to order a specific column in my Spark data frame in descending order. For example, here is the date-time format that I want to reorder: '2017-11-21T12:09:23+00:00' '2017-11-21T12:18:55+00:00' '2017-11-21T11:41:14+00:00' This is what I am doing to reorder: val windowSpec = Window.partitionBy('Source_or...
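
A hedged sketch of sorting that column directly: parse the ISO-8601 string into a timestamp, then order by it descending (the same desc expression can be passed to Window.orderBy if the windowed variant is needed). Column names are made up.

    import org.apache.spark.sql.functions._
    import spark.implicits._

    val df = Seq(
      "2017-11-21T12:09:23+00:00",
      "2017-11-21T12:18:55+00:00",
      "2017-11-21T11:41:14+00:00"
    ).toDF("event_time_str")

    // XXX in the pattern handles the "+00:00" zone suffix.
    val parsed = df.withColumn(
      "event_time",
      unix_timestamp($"event_time_str", "yyyy-MM-dd'T'HH:mm:ssXXX").cast("timestamp")
    )

    parsed.orderBy($"event_time".desc).show(false)
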
1 vote · 0 answers · 251 views

Spark pivot-ing for large number of distinct values

I have a 5 GB CSV file with the following columns: ColA ColB Date Value There are approx. 11k unique ColA values, and about 400 unique ColB values. I am trying to pivot the table, using Spark, so I get to the following structure: ColA Date ColB_1 ColB_2 ColB_3 .. ColB_n Value_1 Value_2 Value_3 .....
cristi.calugaru
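
One hedged sketch relevant at this scale: passing the distinct ColB values to pivot explicitly avoids the extra job Spark otherwise runs to discover them, and keeps the column count bounded at roughly 400. The data below is a tiny stand-in for the 5 GB file.

    import org.apache.spark.sql.functions._
    import spark.implicits._

    // Tiny stand-in for the real ColA / ColB / Date / Value file.
    val df = Seq(
      ("a1", "b1", "2018-01-01", 1.0),
      ("a1", "b2", "2018-01-01", 2.0),
      ("a2", "b1", "2018-01-02", 3.0)
    ).toDF("ColA", "ColB", "Date", "Value")

    // Collect the ~400 distinct ColB values once and hand them to pivot.
    val colBValues = df.select("ColB").distinct().as[String].collect().toSeq

    val pivoted = df
      .groupBy("ColA", "Date")
      .pivot("ColB", colBValues)
      .agg(first("Value"))

    pivoted.show()
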
1 vote · 2 answers · 451 views

Create a new spark dataframe that contains pairwise combinations of another dataframe?

Consider the following code question = spark.createDataFrame([{'A':1,'B':5},{'A':2,'B':5}, {'A':3,'B':5},{'A':3,'B':6}]) #+---+---+ #| A| B| #+---+---+ #| 1| 5| #| 2| 5| #| 3| 5| #| 3| 6| #+---+---+ How can I create a spark dataframe that looks as follows : solution = spark.createDataFram...
Evan
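
The target layout is cut off in the excerpt, so as a hedged sketch of the usual pairwise-combination approach: cross join the frame with a renamed copy of itself and filter out self-pairs and duplicate orderings (crossJoin needs Spark 2.1+).

    import spark.implicits._

    val question = Seq((1, 5), (2, 5), (3, 5), (3, 6)).toDF("A", "B")

    // Rename the copies so the joined columns stay distinguishable.
    val left = question.toDF("A_1", "B_1")
    val right = question.toDF("A_2", "B_2")

    // Keep each unordered pair of distinct rows exactly once.
    val pairs = left.crossJoin(right)
      .filter($"A_1" < $"A_2" || ($"A_1" === $"A_2" && $"B_1" < $"B_2"))

    pairs.show()
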
1 vote · 1 answer · 584 views

Spark and Scala, add new column with value from another dataframe by mapping common key [duplicate]

This question already has an answer here: How to join two DataFrames and change column for missing values? 3 answers How to do left outer join in spark sql? 3 answers I have 2 dataframes. df1 = dep-code rank abc 1 bcd 2 df2 = some cols... dep-code abc bcd abc I want to add n...
Amey Totawar
1 vote · 1 answer · 296 views

Spark Stateful Streaming with DataFrame

Is it possible to use a DataFrame as a State/StateSpec for Spark Streaming? The current StateSpec implementation seems to allow only a key-value pair data structure (mapWithState etc.). My objective is to keep a fixed-size FIFO buffer as a StateSpec that gets updated every time new data streams in. I...
user9395367
1 vote · 1 answer · 95 views

PySpark dataframe: Counting unique values in a column, independently co-occurring with values in other columns

I have a spark data frame comprising billions of predictions of interactions between two types of molecules, Regulators and Targets (there is no overlap between these), as obtained from various Sources. I need to add a column containing the number of sources that predict at least one interaction for the g...
Thomas Turner
1 vote · 1 answer · 175 views

Spark write CSV fails through Zeppelin

Spark 2.1.1.2.6.2.0-205 Zeppelin 0.7.3 I'm running a piece of code on a certain machine, say server-A. The code finishes just fine. When I run the same code through Zeppelin (connected to the same machine), when it gets to this line (it's python): df.coalesce(1).write.csv(remote_path, header=True) I...
Tiberiu
1 vote · 1 answer · 540 views

Write dataframe to multiple tabs in a excel sheet using Spark

I have been using spark-excel (https://github.com/crealytics/spark-excel) to write the output to a single sheet of an Excel workbook. However, I am unable to write the output to different sheets (tabs). Can anyone suggest an alternative? Thanks, Sai
Bharath
1 vote · 1 answer · 49 views

Parallelism in Spark SQL when a list of tables needs to be processed

Using Spark 1.6.0, CDH 5.7.0. I have a CSV file which has a list of tables to be processed, and I want to achieve parallelism in processing. As of now I am using collect to process each one; I tried using the Future option in Scala and even tried this: https://blog.knoldus.com/2015/10/21/demystifying-asynchron...
j pavan kumar
1 vote · 0 answers · 213 views

Inconsistent results in pyspark

Running the following code repeatedly generates inconsistent results. So far, I have only seen two outputs. The results get repeated any random number of times before switching to the other results, which then also repeat any random number of times before switching back again. Why is this happenin...
Clay
1 vote · 0 answers · 231 views

Processing half a billion rows with PySpark creates shuffle read problems

I am apparently facing a shuffle read problem. My PySpark script is running on a Hadoop cluster with 1 EdgeNode and 12 DataNodes, using YARN as the resource manager and Spark 1.6.2. ###[ini_file containing conf spark] spark.app.name = MY_PYSPARK_APP spark.master = yarn-client spark.yarn.queue = agr_queue s...
Indy McCarthy
