Questions tagged [apache-spark]

22247 questions
0 votes · 0 answers · 4 views

Writing to DSE graph from EMR

We are trying to write to a DSE Graph (Cassandra) from EMR and keep getting these errors. My JAR is a shaded jar with the BYOS dependencies. Any help would be appreciated. java.lang.UnsatisfiedLinkError: org.apache.cassandra.utils.NativeLibraryLinux.getpid()J at org.apache.cassandra.utils.N...
mat77
1 vote · 2 answers · 484 views

PySpark Structured Streaming: Pass output of Query to API endpoint

I have the following dataframe in Structured Streaming: TimeStamp|Room|Temperature| 00:01:29 | 1 | 55 | 00:01:34 | 2 | 51 | 00:01:36 | 1 | 56 | 00:02:03 | 2 | 49 | I am trying to detect when temperatures fall below a certain temperature (50 in this case). I have t...
DataGeek
1 vote · 1 answer · 848 views

EXCEPT on Specific columns Spark 1.6

I'm trying to filter out rows from dfA using dfB. dfA: +----+---+----+------------+-----+ |year|cid|X| Y|Z| +----+---+----+------------+-----+ +----+---+----+------------+-----+. dfB: +----+---+ |year|cid| +----+---+ +----+---+ My goal is to filter out of dfA all (year, cid) pairs that appear in dfB. I see...
RefiPeretz
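A minimal sketch of one way to do this in Spark 1.6, where left-anti joins are not yet available: tag dfB, left-outer join on the shared keys, and keep only the dfA rows with no match. The marker column "_inB" is a hypothetical name.

import org.apache.spark.sql.functions.{col, lit}

// Keep only dfA rows whose (year, cid) pair does not appear in dfB.
val dfBTagged = dfB.withColumn("_inB", lit(1))
val filteredA = dfA
  .join(dfBTagged, Seq("year", "cid"), "left_outer")
  .filter(col("_inB").isNull)
  .drop("_inB")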
1 vote · 1 answer · 728 views

How do I select an ambiguous column reference? [duplicate]

This question already has an answer here: Enable case sensitivity for spark.sql globally 1 answer Here's some sample code illustrating what I'm trying to do. There is a dataframe with columns companyid and companyId. I want to select companyId, but the reference is ambiguous. How do I unambiguously...
chris.mclennon
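A possible workaround in line with the linked duplicate, assuming the active SparkSession is named spark: turning on case sensitivity makes companyid and companyId distinct columns, so the select is no longer ambiguous.

// With case sensitivity enabled, the two column names no longer collide.
spark.conf.set("spark.sql.caseSensitive", "true")
val companyIds = df.select("companyId")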
1 vote · 1 answer · 684 views

Consuming RESTful API and converting to Dataframe in Apache Spark

I am trying to convert the output of a URL from a RESTful API directly into a DataFrame in the following way: package trials import org.apache.spark.sql.SparkSession import org.json4s.jackson.JsonMethods.parse import scala.io.Source.fromURL object DEF { implicit val formats = org.json4s.DefaultFormats ca...
Utkarsh Saraf
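One hedged way to get from a REST response to a DataFrame, assuming Spark 2.2 or later: fetch the JSON body on the driver and hand it to spark.read.json as a single-element Dataset[String]. The URL here is a made-up placeholder.

import org.apache.spark.sql.SparkSession
import scala.io.Source

val spark = SparkSession.builder().appName("rest-to-df").getOrCreate()
import spark.implicits._

// Fetch the payload once on the driver, then let Spark infer the schema.
val body = Source.fromURL("https://example.com/api/items").mkString
val df = spark.read.json(Seq(body).toDS())
df.printSchema()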
1 vote · 1 answer · 1.7k views

Spark Dataframe to Kafka

I am trying to stream a Spark DataFrame to a Kafka consumer and am unable to do so; can you please advise me? I am able to pull the data from a Kafka producer into Spark and have performed some manipulation. After manipulating the data, I would like to stream it back to Kafka (the consumer).
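A sketch of writing a DataFrame back to a topic with the built-in Kafka sink, assuming Spark 2.2+ with the spark-sql-kafka-0-10 package on the classpath; the id column, broker address, and topic name are placeholders.

// Kafka expects string/binary "key" and "value" columns.
df.selectExpr("CAST(id AS STRING) AS key", "to_json(struct(*)) AS value")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")
  .option("topic", "output-topic")
  .save()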
1 vote · 3 answers · 4k views

How to create an empty dataFrame in Spark

I have a set of Avro-based Hive tables and I need to read data from them. As Spark-SQL uses Hive SerDes to read the data from HDFS, it is much slower than reading HDFS directly. So I have used the Databricks Spark-Avro jar to read the Avro files from the underlying HDFS dir. Everything works fine except w...
Vinay Kumar
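For the empty-DataFrame part, a minimal sketch: an empty RDD[Row] plus an explicit schema yields a DataFrame with the right columns even when the underlying directory has no files. The two fields here are placeholders.

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Empty DataFrame that still carries the expected schema.
val schema = StructType(Seq(
  StructField("id", StringType, nullable = true),
  StructField("name", StringType, nullable = true)
))
val emptyDF = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema)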
0 votes · 0 answers · 4 views

Spark-Scala Log Rotation Issue, Unable to create External Log

Spark-Scala log rotation issue, unable to create an external log: unable to set up log rotation using RollingFileAppender & ConsoleAppender. I received the WARN below during execution, and no log file is created in the given location; the default log file is still used. ...
Srini K
1 vote · 1 answer · 490 views

Spark dataframe calculate the row-wise minimum [duplicate]

This question already has an answer here: Comparing columns in Pyspark 4 answers Row aggregations in Scala 1 answer I'm trying to put the minimum value of a few columns into a separate column (creating the min column). The operation is pretty straightforward but I wasn't able to find the right f...
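A short sketch of the usual approach, with made-up column names: the least() function computes a row-wise minimum across the listed columns.

import org.apache.spark.sql.functions.{col, least}

// Row-wise minimum of three columns; nulls are skipped by least().
val withMin = df.withColumn("min", least(col("colA"), col("colB"), col("colC")))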
1 vote · 2 answers · 374 views

How to check if key exists in spark sql map type

So I have a table with one column of map type (the key and value are both strings). I'd like to write spark sql like this to check if given key exists in the map. select count(*) from my_table where map_contains_key(map_column, 'testKey') I couldn't find any existing spark sql functions that can do...
seiya
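A possible answer, assuming Spark 2.3 or later where map_keys is available: combine it with array_contains in plain SQL.

// Counts the rows whose map column contains the key 'testKey'.
val keyCount = spark.sql(
  "SELECT count(*) FROM my_table WHERE array_contains(map_keys(map_column), 'testKey')"
)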
1 vote · 2 answers · 698 views

Spark collect_list and limit resulting list

I have a dataframe of the following format: name merged key1 (internalKey1, value1) key1 (internalKey2, value2) ... key2 (internalKey3, value3) ... What I want to do is group the dataframe by the name, collect the list, and limit the size of the list. This is how I group by the name...
pirox22
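A sketch under the assumption of Spark 2.4+, where slice is built in: group, collect, then truncate each list to at most three elements; on older versions a small UDF would do the truncation.

import org.apache.spark.sql.functions.{col, collect_list, slice}

// Collect the merged values per name and keep only the first 3 of each list.
val limited = df
  .groupBy("name")
  .agg(collect_list(col("merged")).as("merged_list"))
  .withColumn("merged_list", slice(col("merged_list"), 1, 3))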
1 vote · 1 answer · 116 views

Connecting to sql database from Spark

I am trying to connect to a SQL database from spark and I have used the following commands: scala> import org.apache.spark.sql.SQLContext...
Mahadevan Swamy
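A hedged example of the DataFrame JDBC reader; the URL, table, and credentials are placeholders, and the matching JDBC driver jar must be on the classpath.

import java.util.Properties

val props = new Properties()
props.setProperty("user", "dbuser")
props.setProperty("password", "dbpass")

// Reads schema.my_table through JDBC into a DataFrame.
val jdbcDF = spark.read.jdbc("jdbc:postgresql://dbhost:5432/mydb", "schema.my_table", props)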
0 votes · 0 answers · 4 views

Is there update operator support on MongoDB connector for Spark?

Using: Spark / MongoDB connector for Spark / Scala. Hi, are update operators supported by the MongoDB connector for Spark? I'd like to use an update document query (updateOne) with the $inc, $currentDate, and $setOnInsert update operators and update options (like upsert = true). I already did this with mongo-scala...
gingermanjh
-1 votes · 3 answers · 23 views

Spark - Map flat dataframe to a configurable nested json schema

I have a flat dataframe with 5-6 columns. I want to nest them and convert it into a nested dataframe so that I can then write it to parquet format. However, I don't want to use case classes as I am trying to keep the code as configurable as possible. I'm stuck with this part and need some help. My...
devnong
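One configurable approach, sketched with invented column names and with flatDF standing in for the flat dataframe from the question: drive struct() from a plain Map so the nesting can come from configuration instead of case classes.

import org.apache.spark.sql.functions.{col, struct}

// parent struct name -> flat columns that should be nested under it
val grouping: Map[String, Seq[String]] = Map(
  "address" -> Seq("street", "city"),
  "metrics" -> Seq("score", "rank")
)
val nestedCols = grouping.map { case (parent, children) =>
  struct(children.map(col): _*).as(parent)
}.toSeq
val nestedDF = flatDF.select(nestedCols: _*)
nestedDF.write.parquet("/tmp/nested_output")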
1 vote · 1 answer · 27 views

How to properly apply HashPartitioner before a join in Spark?

To reduce shuffling during the joining of two RDDs, I decided to partition them using HashPartitioner first. Here is how I do it. Am I doing it correctly, or is there a better way to do this? val rddA = ... val rddB = ... val numOfPartitions = rddA.getNumPartitions val rddApartitioned = rddA.partiti...
MetallicPriest
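A sketch of the usual pattern, assuming rddA and rddB are key-value RDDs: share one HashPartitioner, persist the partitioned RDDs, then join so the co-partitioned layout is reused.

import org.apache.spark.HashPartitioner

val partitioner = new HashPartitioner(rddA.getNumPartitions)

// Both sides use the same partitioner, so join() avoids re-shuffling them.
val rddApartitioned = rddA.partitionBy(partitioner).persist()
val rddBpartitioned = rddB.partitionBy(partitioner).persist()
val joined = rddApartitioned.join(rddBpartitioned)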
1 vote · 2 answers · 40 views

Sql Between Clause with multiple columns

I am trying to get the dates between Jan 2016 and March 2018. I have used the query below, but it is not returning the proper results. The input columns will come at runtime; we cannot convert them into a date, as the input columns can contain a quarter. SELECT Year,Month,DayOfMonth FROM period WHERE (Y...
dassum
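One way to express the range, assuming Year and Month are integer columns of a registered period table: collapse them into a single yyyyMM value so one BETWEEN covers Jan 2016 through March 2018.

// Year * 100 + Month turns (2016, 1) into 201601, so the range is a single comparison.
val inRange = spark.sql("""
  SELECT Year, Month, DayOfMonth
  FROM period
  WHERE Year * 100 + Month BETWEEN 201601 AND 201803
""")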
1 vote · 2 answers · 79 views

How to expose a headless service for a StatefulSet cassandra cluster externally in Kubernetes

I'm setting up a multi-node Cassandra cluster in Kubernetes (Azure AKS). Since this is a headless service with StatefulSet pods, it has no external IP. How can I connect my Spark application to the Cassandra cluster running in Kubernetes? We have tried the cluster IP and the ingress IP as well, but only single...
suraj1287
1 vote · 2 answers · 33 views

How to create a unique autogenerated Id column in a Spark dataframe

I have a dataframe where I have to generate a unique ID in one of the columns. This ID has to be generated with an offset, because I need to persist this dataframe with the autogenerated ID, and if new data comes in, the autogenerated IDs should not collide with the existing ones. I checked the mon...
Ayan Biswas
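A minimal sketch of one approach; the ordering column and the offset value are assumptions. row_number() produces gap-free ids, and adding the highest id already persisted keeps new batches from colliding. Note that a window without partitioning pulls all rows into a single partition.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{lit, row_number}

val offset = 1000L                         // e.g. the max id already stored
val w = Window.orderBy("someColumn")       // hypothetical ordering column
val withId = df.withColumn("id", row_number().over(w).cast("long") + lit(offset))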
1 vote · 2 answers · 59 views

Sum columns of a Spark dataframe and create another dataframe

I have a dataframe like below. I am trying to create another dataframe from this which has 2 columns: the column name and the sum of values in each column, like this. So far, I've tried this (in Spark 2.2.0) but it throws a stack trace: val get_count: (String => Long) = (c: String) => { df.groupB...
van_d39
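A sketch that avoids one groupBy per column, assuming every column is integer-typed (so each sum comes back as a Long): compute all sums in a single agg, then reshape the one result row into (column_name, total) pairs.

import org.apache.spark.sql.functions.{col, sum}
import spark.implicits._

val sumCols = df.columns.map(c => sum(col(c)).as(c))
val row = df.agg(sumCols.head, sumCols.tail: _*).first()

// One output row per input column: (column name, column total).
val totals = df.columns.zipWithIndex
  .map { case (name, i) => (name, row.getLong(i)) }
  .toSeq.toDF("column_name", "total")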
1 vote · 2 answers · 43 views

case when otherwise dataframe spark

I wrote this: val result = df.withColumn("Ind", when($"color" === "Green", 1).otherwise(0)) And I want to extend the condition $"color" === "Green" to $"color" in ["GREEN", "RED", "YELLOW"]. Any idea how to do this, please?
scalacode
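A likely answer, assuming the implicits of the active SparkSession are imported: isin turns the single equality into a membership test.

import org.apache.spark.sql.functions.when
import spark.implicits._   // for the $"..." column syntax

// 1 when the color is any of the listed values, 0 otherwise.
val result = df.withColumn(
  "Ind",
  when($"color".isin("GREEN", "RED", "YELLOW"), 1).otherwise(0)
)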
1 vote · 1 answer · 58 views

How spark loads the data into memory

I am totally confused about the Spark execution process. I have referred to many articles and tutorials, but nobody discusses this in detail; I might be misunderstanding Spark, so please correct me. I have my file of 40GB distributed across 4 nodes (10GB each node) of the 10-node cluster. When I say spark....
Learner
1 vote · 1 answer · 49 views

spark scala create column from Dataframe with values dependent on date time range

I'm trying to create a new column from a dataframe that, let's say, looks like names|birthtime-datetime| joe|2017-03-29 2:23:38| mike|2017-03-29 3:53:38| mary|2017-03-29 11:63:38| ..... I want to add a column that gets an int based on whether the DateTime column is in a given range. Let's say in this case there ar...
Brian
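A hedged sketch with made-up cut-off times: chained when clauses map the timestamp ranges to ints, with 0 as the fallback.

import org.apache.spark.sql.functions.{col, lit, when}

// Bucket the timestamp: 1 for the first window, 2 for the second, 0 otherwise.
val bucketed = df.withColumn(
  "bucket",
  when(col("birthtime-datetime").between("2017-03-29 00:00:00", "2017-03-29 06:00:00"), lit(1))
    .when(col("birthtime-datetime").between("2017-03-29 06:00:00", "2017-03-29 12:00:00"), lit(2))
    .otherwise(lit(0))
)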
1 vote · 1 answer · 40 views

Spark SQL lazy count

I need to use a dataframe count as the divisor for calculating percentages. This is what I'm doing: scala> val df = Seq(1,1,1,2,2,3).toDF("value") scala> val overallCount = df.count scala> df.groupBy("value") .agg( count(lit(1)) / overallCount ) But I would like to avoid the action df.count as it will...
Pedro H
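One way to keep the count lazy, offered as a possibility rather than the canonical answer: compute the overall count with a window over the whole DataFrame, so it runs in the same job as the aggregation (Spark will warn that an un-partitioned window uses a single partition).

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, count, lit}

val w = Window.partitionBy()   // empty window = the whole DataFrame
val result = df
  .withColumn("overallCount", count(lit(1)).over(w))
  .groupBy("value", "overallCount")
  .agg((count(lit(1)) / col("overallCount")).as("fraction"))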
1 vote · 1 answer · 32 views

Scala Spark: How to pad a sublist inside a dataframe with extra values?

Say I have a dataframe, originalDF, which looks like this +--------+--------------+ |data_id |data_list | +--------+--------------+ | 3| [a, b, d] | | 2|[c, a, b, e] | | 1| [g] | +--------+--------------+ And I have another dataframe, extraInfoDF, which looks like...
JR3652
1 vote · 2 answers · 47 views

Spark Exception : Task failed while writing rows (Spark on Kuberenetes)

I have an Apache Spark 2.4.1 environment on Kubernetes (Azure Kubernetes Service). The Spark container image is made from the official binary (spark-2.4.1-bin-hadoop2.7.tgz). It works fine for example programs (e.g. Pi calculation), but when I run my Scala program, which uses MLlib and saves a Word2Vec model, Spark r...
Zollverein
1 vote · 2 answers · 31 views

filter or label rows based on a Scala array

Is there a way to filter or label rows based on a Scala array? Please keep in mind that in reality the number of rows is much larger. Sample data: val clients = List(List("1", "67"), List("2", "77"), List("3", "56"), List("4", "90")).map(x => (x(0), x(1))) val df = clients.toDF("soc", "ages") +---+----+...
Brian
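A minimal sketch against the sample data, with the array of ids to match being a made-up value: isin accepts the array splatted as varargs, which covers both filtering and labelling.

import org.apache.spark.sql.functions.{col, lit, when}

val keep = Array("1", "3")   // hypothetical ids to match
val filtered = df.filter(col("soc").isin(keep: _*))
val labelled = df.withColumn("flag", when(col("soc").isin(keep: _*), lit(1)).otherwise(lit(0)))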
0 votes · 0 answers · 3 views

Not able to create local directory in EMR by user 'livy'

I am submitting a pyspark job to an EMR cluster from an AWS Step Function through Apache Livy. The pyspark job has Unix shell commands being fired. Within test.py: subprocess.call(' echo $USER', shell=True, stdout=None, stderr=None) subprocess.call(' mkdir /mnt/data', shell=True, stdout=None, stderr=...
Parijat Bose
0 votes · 0 answers · 6 views

How to handle exceptions in PySpark when data is in improper order?

Actually I am creating a small RDD from some unordered data; it doesn't have the same number of columns in each row, so I am taking it as a tuple type sized to the maximum line index. The problem I am getting is that when I access tuple[4], tuple[9], and so on, some rows don't have index 9 at all, so in...
jodu
2 votes · 0 answers · 11 views

How to modify a column based on the values in another column of a PySpark dataframe? F.when edge case

I'd like to go through each row in a pyspark dataframe, and change the value of a column based on the content of another column. The value I am changing it to is also based on the current value of the column to be changed. Specifically, I have a column which contains DenseVectors, and another column...
bmarks2010
1 vote · 0 answers · 4 views

Testing a utility function by writing a unit test in apache spark scala

I have a utility function written in Scala to read parquet files from an S3 bucket. Could someone help me write unit test cases for this? Below is the function which needs to be tested. // function to get parquet dataset from s3 def readParquetFile(spark: SparkSession, locationPath: String): DataFr...
wandermonk
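A test sketch, assuming ScalaTest's FunSuite is available and that readParquetFile is in scope: write a small DataFrame to a temporary local directory and read it back through the function, so the test avoids touching S3.

import org.apache.spark.sql.SparkSession
import org.scalatest.FunSuite

class ReadParquetFileSpec extends FunSuite {

  test("readParquetFile returns the rows that were written") {
    val spark = SparkSession.builder().master("local[2]").appName("test").getOrCreate()
    import spark.implicits._

    // Write known data to a temp dir and read it back through the function under test.
    val dir = java.nio.file.Files.createTempDirectory("parquet-test").toString
    Seq((1, "a"), (2, "b")).toDF("id", "value").write.mode("overwrite").parquet(dir)

    val result = readParquetFile(spark, dir)
    assert(result.count() === 2)

    spark.stop()
  }
}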
0 votes · 2 answers · 25 views

Scala Spark RDD Aggregate behaves weirdly

I have 2 seqOp functions given to aggregate which I expect to return identical results. They don't. This version works: rdd.aggregate(0)((acc, article) => (acc + (if (article.mentionsLanguage(lang)) 1 else 0)), _ + _) This version doesn't work: def seqOp(acc: Int, article: WikipediaArticle): Int = {...
Atte Juvonen
1 vote · 1 answer · 13 views

How to convert a type Any List to a type Double (Scala)

I am new to Scala and I would like to understand some basic stuff. First of all, I need to calculate the average of a certain column of a DataFrame and use the result as a double type variable. After some Internet research I was able to calculate the average and at the same time pass it into a Lis...
Aris Kantas
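A short sketch with a made-up column name: agg returns a one-row DataFrame, and getDouble(0) on that row yields a plain Double.

import org.apache.spark.sql.functions.avg

// The average of a numeric column as a Scala Double.
val avgValue: Double = df.agg(avg("price")).first().getDouble(0)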
1 vote · 0 answers · 8 views

How to Add Additional Remote Repository to Zeppelin?

Using the following code in a Zeppelin note, I was able to add a repository and a dependency. How would I accomplish the same using zeppelin.dep.additionalRemoteRepository? %dep z.addRepo("hortonworks").url("http://repo.hortonworks.com/content/repositories/releases") z.load("com.hortonworks:shc-core:1....
Ari
1 vote · 2 answers · 1.4k views

Save object in cassandra using spark and java

I just started working with Spark and Cassandra in Java, and I am already stuck on saving data to my Cassandra database. Here is the Java bean class that I have: public class User implements Serializable { public User(){} public User(String username, String password){ this.username = username; setPass...
chris
1 vote · 2 answers · 35 views

Update a dataframe with nested fields - Spark

I have two dataframes like below Df1 +----------------------+---------+ |products |visitorId| +----------------------+---------+ |[[i1,0.68], [i2,0.42]]|v1 | |[[i1,0.78], [i3,0.11]]|v2 | +----------------------+---------+ Df2 +---+----------+ | id| name| +---+----------...
yAsH
1 vote · 2 answers · 4.1k views

Saving files in Spark

There are two operations to save an RDD: one is saveAsTextFile and the other is saveAsObjectFile. I understand saveAsTextFile, but not saveAsObjectFile. I am new to Spark and Scala, hence I am curious about saveAsObjectFile. 1) Is it a Hadoop sequence file or something different? 2) Can I read...
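A small illustration of the round trip (the path is a placeholder): saveAsObjectFile writes a Hadoop SequenceFile of Java-serialized objects, and sc.objectFile reads it back given the element type.

// Write an RDD as serialized objects and read it back with an explicit element type.
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2)))
pairs.saveAsObjectFile("/tmp/pairs_obj")
val restored = sc.objectFile[(String, Int)]("/tmp/pairs_obj")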
1 vote · 2 answers · 1.1k views

How to compare two StructTypes sharing the same contents?

It seems like StructType preserves order, so two StructTypes containing the same StructFields are not considered equivalent. For example: val st1 = StructType( StructField("ii",StringType,true) :: StructField("i",StringType,true) :: Nil) val st2 = StructType( StructField("i",StringType,true) :: Struc...
THIS USER NEEDS HELP
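One order-insensitive comparison, sketched as a possibility: StructField is a case class, so comparing the fields as sets checks name, type, and nullability while ignoring declaration order.

import org.apache.spark.sql.types.{StringType, StructField, StructType}

val st1 = StructType(StructField("ii", StringType, true) :: StructField("i", StringType, true) :: Nil)
val st2 = StructType(StructField("i", StringType, true) :: StructField("ii", StringType, true) :: Nil)

// Set equality ignores the declaration order of the fields.
val sameContents = st1.fields.toSet == st2.fields.toSet   // true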
1 vote · 1 answer · 1.8k views

How to get best params after tuning by pyspark.ml.tuning.TrainValidationSplit?

I'm trying to tune the hyper-parameters of a Spark (PySpark) ALS model with TrainValidationSplit. It works well, but I want to know which combination of hyper-parameters is the best. How do I get the best params after evaluation? from pyspark.ml.recommendation import ALS from pyspark.ml.tuning import Train...
takaomag
1 vote · 1 answer · 555 views

What is the difference between calling someRDD.collect.foreach(println) and someRDD.foreach(println)?

I have created an RDD from a CSV file. When I call rdd.collect.foreach(println) it returns the file as is, but rdd.foreach(println) returns merged (interleaved) output. There are two partitions in the RDD. val sc = new SparkContext("local[*]", "WordCount") val cities = sc.textFile("C:/Users/PSKUMARBEHL/Desktop/u...
SunTech
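A tiny illustration of the difference (the path is a placeholder): collect() first gathers every partition to the driver, so the driver prints the lines in partition order, while rdd.foreach(println) runs inside the tasks, so the output is interleaved and, on a real cluster, lands in executor stdout rather than on the driver console.

val cities = sc.textFile("/tmp/cities.txt", 2)   // 2 partitions
cities.collect.foreach(println)   // ordered, printed by the driver
cities.foreach(println)           // interleaved, printed by whichever task runs the element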
2 votes · 1 answer · 112 views

Spark AnalysisException when “flattening” DataFrame in Spark SQL

I'm using the approach given here to flatten a DataFrame in Spark SQL. Here is my code: package com.acme.etl.xml import org.apache.spark.sql.types._ import org.apache.spark.sql.{Column, SparkSession} object RuntimeError { def main(args: Array[String]): Unit = { val spark = SparkSession.builder(...
Paul Reiners
