Questions tagged [pyspark-sql]

1

votes
1

answer
45

Views

Forward Fill New Row to Account for Missing Dates

I currently have a dataset grouped into hourly increments by a variable 'aggregator'. There are gaps in this hourly data, and what I would ideally like to do is forward-fill the rows with the prior row that maps to the variable in column x. I've seen some solutions to similar problems using pandas b...
ImNewToThis
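
A minimal PySpark sketch of the forward-fill idea described above, assuming Spark 2.4+ (for sequence()) and hypothetical column names (aggregator, hour, x) — not the asker's exact data:

    from pyspark.sql import SparkSession, Window, functions as F

    spark = SparkSession.builder.getOrCreate()

    # Hypothetical input: one row per existing hour, with gaps
    df = spark.createDataFrame(
        [("A", "2021-01-01 00:00:00", 1.0), ("A", "2021-01-01 03:00:00", 4.0)],
        ["aggregator", "hour", "x"]).withColumn("hour", F.to_timestamp("hour"))

    # Build the full hourly grid per aggregator (sequence() needs Spark 2.4+)
    grid = (df.groupBy("aggregator")
              .agg(F.min("hour").alias("lo"), F.max("hour").alias("hi"))
              .select("aggregator",
                      F.explode(F.expr("sequence(lo, hi, interval 1 hour)")).alias("hour")))

    # Left-join the grid back and forward-fill x with the last non-null value
    w = (Window.partitionBy("aggregator").orderBy("hour")
               .rowsBetween(Window.unboundedPreceding, Window.currentRow))
    filled = (grid.join(df, ["aggregator", "hour"], "left")
                  .withColumn("x", F.last("x", ignorenulls=True).over(w)))
    filled.show()
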
1

votes
2

answer
85

Views

How to identify repeated occurrences of a string column in Hive?

I have a view like this in Hive: id sequencenumber appname 242539622 1 A 242539622 2 A 242539622 3 A 242539622 4 B 242539622 5 B 242539622 6 C 242539622...
Isaac
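
One way to number consecutive runs of the same appname: flag rows where the value changes with LAG, then take a running sum of the flags. A sketch run through spark.sql against a hypothetical view my_view; the same SQL works directly in Hive:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.enableHiveSupport().getOrCreate()

    # change_flag = 1 where appname differs from the previous row; the running
    # sum of the flags then numbers each consecutive run per id
    runs = spark.sql("""
        SELECT id, sequencenumber, appname,
               SUM(change_flag) OVER (PARTITION BY id ORDER BY sequencenumber) AS run_id
        FROM (
            SELECT id, sequencenumber, appname,
                   CASE WHEN appname = LAG(appname)
                                       OVER (PARTITION BY id ORDER BY sequencenumber)
                        THEN 0 ELSE 1 END AS change_flag
            FROM my_view
        ) t
    """)
    runs.show()
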
2

votes
0

answer
11

Views

How to modify a column based on the values in another column of a PySpark dataframe? F.when edge case

I'd like to go through each row in a pyspark dataframe, and change the value of a column based on the content of another column. The value I am changing it to is also based on the current value of the column to be changed. Specifically, I have a column which contains DenseVectors, and another column...
bmarks2010
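
For the DenseVector case, F.when cannot easily build a new vector out of an existing one, so a UDF that sees both columns is the usual workaround. A sketch with made-up column names (features, flag) and a made-up rule:

    from pyspark.sql import SparkSession, functions as F
    from pyspark.ml.linalg import DenseVector, VectorUDT

    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame(
        [(DenseVector([1.0, 2.0]), "double"), (DenseVector([3.0, 4.0]), "keep")],
        ["features", "flag"])

    # The UDF receives the current vector and the other column's value, and
    # returns either a modified vector or the original one
    @F.udf(returnType=VectorUDT())
    def adjust(vec, flag):
        return DenseVector(vec.toArray() * 2.0) if flag == "double" else vec

    df = df.withColumn("features", adjust("features", "flag"))
    df.show(truncate=False)
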
1

votes
2

answer
294

Views

Generate all possible combinations between two columns and an indicator to show if that combination exists in the source table

I am completely lost at a particular stage of a transformation. I am planning to achieve it using either SQL or pyspark. My input format is: id name 1 A 1 C 1 E 2 A 2 B 2 C 2 E 2 F 3 A 3 E 3 D Could you please help me get this output format: id name rating 1 A...
ankush reddy
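
A sketch of one way to do this in PySpark: cross-join the distinct ids with the distinct names to get every possible pair, then left-join the source rows back as a presence flag (the rating column name is taken from the question's expected output):

    from pyspark.sql import SparkSession, functions as F

    spark = SparkSession.builder.getOrCreate()
    src = spark.createDataFrame(
        [(1, "A"), (1, "C"), (1, "E"), (2, "A"), (2, "B"), (3, "A"), (3, "E"), (3, "D")],
        ["id", "name"])

    # All possible (id, name) pairs, flagged 1 if the pair exists in the source
    all_pairs = src.select("id").distinct().crossJoin(src.select("name").distinct())
    flags = src.distinct().withColumn("rating", F.lit(1))
    result = (all_pairs.join(flags, ["id", "name"], "left")
                       .fillna(0, subset=["rating"])
                       .orderBy("id", "name"))
    result.show()
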
1

votes
2

answer
268

Views

Slow filtering of pyspark dataframes

I have a question regarding the time difference while filtering pandas and pyspark dataframes: import time import numpy as np import pandas as pd from random import shuffle from pyspark.sql import SparkSession spark = SparkSession.builder.getOrCreate() df = pd.DataFrame(np.random.randint(1000000, si...
Konstantin
1

votes
1

answer
22

Views

Merging records by 1+ common elements

I have a hive table with the following schema: int key1 # this is unique array key2_list Now I want to merge records here if their key2_lists have any common element(s). For example, if record A has (10, [1,3,5]) and B has (12, [1,2,4]), I want to merge it as ([10,12], [1,2,3,4,5]) or ([1,2,3,4,5])....
kee
1

votes
0

answer
142

Views

Use threading to spawn multiple processes to fetch the data in python 3

I am new to threads in Python and am trying to implement them for the first time. I want to use threading to spawn multiple processes. I have written the code below, where I am trying to fetch data from a Spark SQL query. When I try to pull the data without threading, the query takes a lot of time as it has to s...
user15051990
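
Spark job submission is thread-safe, so a thread pool can run several Spark SQL queries concurrently from one SparkSession. A minimal sketch with a stand-in table and hypothetical queries:

    from concurrent.futures import ThreadPoolExecutor
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    spark.range(0, 1000).createOrReplaceTempView("t")   # stand-in table

    # Each thread submits its own query; Spark schedules the jobs concurrently
    queries = ["SELECT COUNT(*) AS n FROM t WHERE id % 4 = {}".format(p) for p in range(4)]

    def run(q):
        return spark.sql(q).collect()

    with ThreadPoolExecutor(max_workers=4) as pool:
        results = list(pool.map(run, queries))
    print(results)
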
1

votes
1

answer
174

Views

Issues running spark-submit opening a SparkContext

First, let me describe my scenario: Ubuntu 14.04, Spark 1.6.3, Python 3.5. I'm trying to execute my Python scripts through spark-submit. I need to create a context and then apply a SQLContext as well. First I tested a very simple case in my pyspark console. Then I created my Python script. from pyspa...
Andres Urrego Angel
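
A Spark 1.6-style skeleton for a script run with spark-submit, assuming nothing about the asker's actual logic: build the SparkContext explicitly, then wrap it in a SQLContext (in Spark 2.x+ a SparkSession replaces both):

    from pyspark import SparkConf, SparkContext
    from pyspark.sql import SQLContext

    conf = SparkConf().setAppName("my-script")
    sc = SparkContext(conf=conf)
    sqlContext = SQLContext(sc)

    df = sqlContext.createDataFrame([(1, "a"), (2, "b")], ["id", "val"])
    df.show()
    sc.stop()
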
1

votes
1

answer
2.1k

Views

Adding an ArrayList value to a new column in a Spark DataFrame using Pyspark [duplicate]

This question already has an answer here: How to add a constant column in a Spark DataFrame? 3 answers I want to add a new column to my existing dataframe. Below is my dataframe: +---+---+-----+ | x1| x2| x3| +---+---+-----+ | 1| a| 23.0| | 3| B|-23.0| +---+---+-----+ I am able to add df = df....
user7348570
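
A short sketch of the usual approach: lit() only accepts scalars, so wrap each list element in lit() and combine them with F.array (the values and the new column name x4 are made up):

    from pyspark.sql import SparkSession, functions as F

    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame([(1, "a", 23.0), (3, "B", -23.0)], ["x1", "x2", "x3"])

    # Constant array column: one lit() per element, combined with F.array
    values = [1, 2, 3]
    df = df.withColumn("x4", F.array(*[F.lit(v) for v in values]))
    df.show()
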
1

votes
1

answer
614

Views

How to read hive data from HDFS

I have a Hive warehouse in HDFS at hdfs://localhost:8020/user/hive/warehouse, and a database mydb inside HDFS at hdfs://localhost:8020/user/hive/warehouse/mydb.db. How can I create a table and insert data into it using PySpark? Please suggest.
Praveen Mandadi
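
A sketch assuming Hive support is enabled so Spark uses the warehouse under /user/hive/warehouse; the table name mydb.people is hypothetical:

    from pyspark.sql import SparkSession

    spark = (SparkSession.builder
             .appName("hive-example")
             .enableHiveSupport()
             .getOrCreate())

    # Create and populate a table with SQL ...
    spark.sql("CREATE TABLE IF NOT EXISTS mydb.people (name STRING, age INT)")
    spark.sql("INSERT INTO mydb.people VALUES ('alice', 30)")

    # ... or write a DataFrame straight into the warehouse as a managed table
    df = spark.createDataFrame([("bob", 25)], ["name", "age"])
    df.write.mode("append").saveAsTable("mydb.people")
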
1

votes
1

answer
1k

Views

pyspark withColumnRenamed, drop functions, u'Reference ambiguous error

I have a function which changes the column headers of a DF with a new set of headers in a list. def updateHeaders(dataFrame, newHeader): oldColumns = dataFrame.schema.names dfNewCol = reduce(lambda dataFrame, idx: dataFrame.withColumnRenamed(oldColumns[idx], newHeader[idx]), xrange(len(oldColumns))...
yalcinm1
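
One common way around the ambiguous-reference errors from chained withColumnRenamed calls is toDF, which replaces every column name in a single call. A sketch reusing the question's function name:

    # newHeader must have one entry per existing column, in order
    def updateHeaders(dataFrame, newHeader):
        assert len(newHeader) == len(dataFrame.columns)
        return dataFrame.toDF(*newHeader)

    # usage (hypothetical frame and names):
    # renamed = updateHeaders(df, ["id", "name", "score"])
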
1

votes
0

answer
531

Views

PySpark streaming: window and transform

I'm trying to read in data from a Spark streaming data source, window it by event time, and then run a custom Python function over the windowed data (it uses non-standard Python libraries). My data frame looks something like this: | Time | Value | | 2018-01-01 12:23:50.200 | 1234...
m01
1

votes
1

answer
280

Views

Inserting arrays into parquet using spark sql query

I need to add complex data types to a parquet file using the SQL query option. I've had partial success using the following code: self._operationHandleRdd = spark_context_.sql(u'INSERT OVERWRITE TABLE _df_Dns VALUES array(struct(struct(35,'ww'),5,struct(47,'BGN')), struct(struct(70,'w'),1,struct(8...
Yonatan Edrich
1

votes
1

answer
725

Views

Pyspark replace strings in Spark dataframe column by using values in another column

I'd like to replace a value present in a column by creating the search string from another column. Before: id address st 1 2.PA1234.la 1234 2 10.PA125.la 125 3 2.PA156.ln 156 After: id address st 1 2.PA9999.la 1234 2 10.PA9999.la 125 3 2.PA9999.ln 156 I tried df.withColumn('address',...
prudhvi Indana
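
A sketch of one workaround: the Python regexp_replace helper only takes a literal pattern (before Spark 3.0), but going through expr() lets the st column be used as the search string:

    from pyspark.sql import SparkSession, functions as F

    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame(
        [(1, "2.PA1234.la", "1234"), (2, "10.PA125.la", "125"), (3, "2.PA156.ln", "156")],
        ["id", "address", "st"])

    # Replace the per-row value of st inside address with the literal '9999'
    df = df.withColumn("address", F.expr("regexp_replace(address, st, '9999')"))
    df.show()
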
1

votes
2

answer
989

Views

Can't connect to MySQL database from pyspark, getting JDBC error

I am learning pyspark and trying to connect to a MySQL database, but I am getting a java.lang.ClassNotFoundException: com.mysql.jdbc.Driver exception while running the code. I have spent a whole day trying to fix it; any help would be appreciated :) I am using PyCharm Community Edition with Anacond...
rakesht
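
The ClassNotFoundException usually means the MySQL connector jar is not on Spark's classpath. A sketch where the jar path, URL, table and credentials are all placeholders:

    from pyspark.sql import SparkSession

    # Point Spark at the connector jar before reading over JDBC
    spark = (SparkSession.builder
             .config("spark.jars", "/path/to/mysql-connector-java-5.1.47.jar")
             .getOrCreate())

    df = (spark.read.format("jdbc")
          .option("url", "jdbc:mysql://localhost:3306/mydb")
          .option("dbtable", "mytable")
          .option("user", "root")
          .option("password", "secret")
          .option("driver", "com.mysql.jdbc.Driver")
          .load())
    df.show()
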
1

votes
0

answer
74

Views

How to speed up cassandra “in” queries or 1,000s of single queries using pyspark?

I have thousands of queries that I need to issue. Each query is along the lines of select sample_time, value from table where company=123 and location=some-place and id= the only difference in the queries is the id. I had been using cassandra driver in python and running concurrent queries where...
pmdaly
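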
1

votes
0

answer
213

Views

Inconsistent results in pyspark

Running the following code repeatedly generates inconsistent results. So far, I have only seen two outputs. The results get repeated any random number of times before switching to the other results, which then also repeat any random number of times before switching back again. Why is this happenin...
Clay
1

votes
0

answer
102

Views

Issue in saving the content of a dataframe to table

I have a data source (Hive external tables) which refreshes its data in an ad hoc manner. To avoid any discrepancies during execution, I'm trying to save the data as a table in my own location. Initially, I loaded the data from the data source into a dataframe: source = hqlContext.table('datasourcedb.table1') //...
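
A minimal sketch of snapshotting the ad hoc source into a managed table so downstream steps read a stable copy; the source table name follows the question and the target name is made up:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.enableHiveSupport().getOrCreate()

    # Persist a point-in-time copy of the refreshing source
    source = spark.table("datasourcedb.table1")
    source.write.mode("overwrite").saveAsTable("mydb.table1_snapshot")
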
1

votes
1

answer
224

Views

How can we create many new columns in a dataframe in pyspark using withcolumn

I have to create around 800 dummy columns in a dataframe, all of which hold null values. I don't want to use df.withColumn('x', lit(None)) for individual columns as there are many columns. I tried map(lambda x: df.withColumn(x, lit(None)), column_list) but it's not working. I am writing the snippet below...
chetan
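
A sketch of the single-select approach, with hypothetical column names: one select is far cheaper than 800 chained withColumn calls, and casting the null literal gives every dummy column an explicit type:

    from pyspark.sql import SparkSession, functions as F
    from pyspark.sql.types import StringType

    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame([(1,), (2,)], ["id"])

    # Add all dummy columns in one projection
    column_list = ["dummy_{}".format(i) for i in range(800)]
    df = df.select("*", *[F.lit(None).cast(StringType()).alias(c) for c in column_list])
    print(len(df.columns))   # 801
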
1

votes
1

answer
600

Views

Percentiles in spark - most efficient method (RDD vs SqlContext) [duplicate]

This question already has an answer here: How to find median and quantiles using Spark 4 answers I have a large grouped dataset in spark that I need to return the percentiles from 0.01 to 0.99 for. I have been using online resources to determine different methods of doing this, from operations on R...
AMcNall
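
A sketch of the DataFrame-side options, on made-up data: approxQuantile for the ungrouped case and the percentile_approx SQL aggregate for per-group percentiles:

    from pyspark.sql import SparkSession, functions as F

    spark = SparkSession.builder.getOrCreate()
    df = (spark.range(0, 100000).withColumnRenamed("id", "value")
               .withColumn("grp", F.col("value") % 3))

    # Ungrouped: one pass; the last argument is the relative error (0.0 = exact, slower)
    probs = [i / 100.0 for i in range(1, 100)]
    overall = df.approxQuantile("value", probs, 0.01)

    # Grouped: percentile_approx as an aggregate
    df.createOrReplaceTempView("t")
    by_group = spark.sql(
        "SELECT grp, percentile_approx(value, array(0.01, 0.5, 0.99)) AS pct "
        "FROM t GROUP BY grp")
    by_group.show()
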
1

votes
1

answer
100

Views

hive scan and select in one query

I have a hive table, say emp_details(name, dept). In this table, I need to check whether any records exist with dept = 'a' and, if so, select those records. Only if no such record is found will I choose records with dept = 'b'. The source data has either 'a' or 'b' as the dept value, and my result set...
Sandeep
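
One single-scan sketch: rank dept 'a' ahead of 'b' and keep only the best rank that actually exists, so 'b' rows survive only when there are no 'a' rows. Shown via spark.sql against the table name from the question; the same SQL works in Hive:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.enableHiveSupport().getOrCreate()

    result = spark.sql("""
        SELECT name, dept FROM (
            SELECT name, dept,
                   DENSE_RANK() OVER (ORDER BY CASE WHEN dept = 'a' THEN 0 ELSE 1 END) AS rnk
            FROM emp_details
            WHERE dept IN ('a', 'b')
        ) t
        WHERE rnk = 1
    """)
    result.show()
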
1

votes
2

answer
1.7k

Views

How to overwrite the rdd saveAsPickleFile(path) if the file already exists in pyspark?

How do I overwrite any existing path with the RDD output objects at save time? test1: 975078|56691|2.000|20171001_926_570_1322 975078|42993|1.690|20171001_926_570_1322 975078|46462|2.000|20171001_926_570_1322 975078|87815|1.000|20171001_926_570_1322 rdd=sc.textFile('/home/administrator/work/test1'...
Sai
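
The RDD API has no overwrite mode, so a common workaround is to delete the target path first through the JVM Hadoop FileSystem handle. A sketch that relies on internal attributes (sc._jvm, sc._jsc) and a made-up output path:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    sc = spark.sparkContext
    path = "/home/administrator/work/output"

    # Remove the existing output directory, if any, before saving
    hadoop = sc._jvm.org.apache.hadoop
    fs = hadoop.fs.FileSystem.get(sc._jsc.hadoopConfiguration())
    target = hadoop.fs.Path(path)
    if fs.exists(target):
        fs.delete(target, True)          # True = recursive

    rdd = sc.parallelize(["975078|56691|2.000|20171001_926_570_1322"])
    rdd.saveAsPickleFile(path)
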
1

votes
1

answer
394

Views

sort pyspark dataframe within groups

I would like to sort column 'time' within each 'id' group. The data looks like: id time name 132 12 Lucy 132 10 John 132 15 Sam 78 11 Kate 78 7 Julia 78 2 Vivien 245 22 Tom I would like to get this: id time name 132 10 John 132 12 Lucy 132 15 Sam 78 2 Vivi...
MLam
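
A minimal sketch using the question's columns: a two-key orderBy sorts by id and, within each id, by time (note it also sorts the ids themselves; preserving the original id order would need an extra ordering key, and sortWithinPartitions is enough when only per-partition order matters):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame(
        [(132, 12, "Lucy"), (132, 10, "John"), (132, 15, "Sam"),
         (78, 11, "Kate"), (78, 7, "Julia"), (78, 2, "Vivien"), (245, 22, "Tom")],
        ["id", "time", "name"])

    df.orderBy("id", "time").show()
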
1

votes
0

answer
54

Views

How to get Dataframe partitions to include literal colons instead of encoded (%3A) colons in S3 key names

In pyspark, I have a dataframe that contains a date column with the date truncated to the day # +-------------------+ # |d | # +-------------------+ # |2018-04-07 00:00:00| # +-------------------+ When I do this: df.writ...
Jared
1

votes
1

answer
306

Views

Not able to select more than 255 columns from Pyspark DataFrame

I am trying to select 500 columns from a PySpark DataFrame and get the error 'SyntaxError: more than 255 arguments'. Df2 = Df\ .select('col1','col2','col3',...............,'col500') I also tried the approach below, but it did not work. cols = ['col1','col2','col3',...............,'col500'] Df2 = Df\ .select(co...
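
A sketch on a synthetic 500-column frame: the 255-argument limit only applies to arguments written out literally in the call, so passing one list (unpacked or not) avoids it:

    from pyspark.sql import SparkSession, functions as F

    spark = SparkSession.builder.getOrCreate()
    ncols = 500
    df = spark.range(3).select(
        *[F.lit(i).alias("col{}".format(i + 1)) for i in range(ncols)])

    # select(*cols) (or select(cols)) passes a single list object
    cols = ["col{}".format(i + 1) for i in range(ncols)]
    df2 = df.select(*cols)
    print(len(df2.columns))          # 500
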
1

votes
1

answer
34

Views

Unexpected Shuffle while Calculating Mean by Partition from Hive Data

My Question: Why does Spark calculate sum and count from each partition, do an unnecessary (IMHO) shuffle (Exchange hashpartitioning), and then calculate the mean in HashAggregate? What could've been done: Calculate the mean for each partition and then combine (union) the results. Details: I am read...
pbahr
1

votes
0

answer
33

Views

PySpark - Remove Illegal Hive Character from schema

I have a really large pyspark dataframe which gets data from json files. This is an example of the schema |-- Col1: array (nullable = true) | |-- element: double (containsNull = true) |-- Col2: struct (nullable = true) | |-- Col2-Col1: string (nullable = true) | |-- Col2-Col2: string (null...
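
A sketch for cleaning top-level column names: replace any character Hive rejects (here, anything that is not a letter, digit or underscore) with '_' and rename everything at once with toDF. Nested struct fields would need the schema rebuilt and are not covered by this sketch:

    import re
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame([(1.0, "x")], ["Col2-Col1", "Col 3"])

    clean = [re.sub(r"[^0-9a-zA-Z_]", "_", c) for c in df.columns]
    df = df.toDF(*clean)
    print(df.columns)                # ['Col2_Col1', 'Col_3']
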
1

votes
2

answer
964

Views

How to plot using matplotlib and pandas in pyspark environment?

I have a very large pyspark dataframe; I took a sample and converted it into a pandas dataframe: sample = heavy_pivot.sample(False, fraction = 0.2, seed = None) sample_pd = sample.toPandas() The dataframe looks like this: sample_pd[['client_id', 'beer_freq']].head(10) client_id beer_freq 0 1000839...
Elsa Li
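
A sketch of the usual pattern, with made-up data standing in for heavy_pivot: sample, convert to pandas on the driver, plot with a headless matplotlib backend, and save the figure to a file instead of relying on plt.show():

    import matplotlib
    matplotlib.use("Agg")            # headless backend: the driver usually has no display
    import matplotlib.pyplot as plt
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    heavy_pivot = spark.createDataFrame(
        [(1000839, 0.2), (1000840, 0.8), (1000841, 0.5)], ["client_id", "beer_freq"])

    sample_pd = heavy_pivot.sample(False, 1.0).toPandas()
    sample_pd["beer_freq"].hist(bins=20)
    plt.savefig("beer_freq_hist.png")
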
1

votes
0

answer
41

Views

How to avoid random number while writing Dataframe for a given path in pyspark?

I have a PySpark DataFrame. By executing the code below I am saving the DataFrame to the given path: df.write.format('csv').options(header='false', inferschema='true',sep='|').option('codec', 'org.apache.hadoop.io.compress.GzipCodec').save('path') After saving the files in the path folder, when I list the files by e...
Sai
1

votes
0

answer
89

Views

Extract only points inside polygon

I have two CSV files: one contains the points of a polygon, around 2000 points (lat, long); the other file has more than 1 billion rows (id, lat, long). How do I extract only the points that intersect (fall inside) the polygon using pyspark?
Sidhom
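
One common approach, sketched with the third-party shapely library and made-up file paths and column names: the polygon (~2000 vertices) is tiny, so collect it, broadcast it, and test every point with a UDF:

    from shapely.geometry import Point, Polygon
    from pyspark.sql import SparkSession, functions as F
    from pyspark.sql.types import BooleanType

    spark = SparkSession.builder.getOrCreate()

    # Build the polygon on the driver and broadcast it to the executors
    poly_rows = spark.read.csv("polygon.csv", header=True, inferSchema=True).collect()
    polygon = Polygon([(r["long"], r["lat"]) for r in poly_rows])
    b_poly = spark.sparkContext.broadcast(polygon)

    @F.udf(returnType=BooleanType())
    def inside(lat, long):
        return b_poly.value.contains(Point(long, lat))

    points = spark.read.csv("points.csv", header=True, inferSchema=True)
    inside_points = points.filter(inside("lat", "long"))
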
1

votes
0

answer
49

Views

Weird issue facing with spark-sql

I have the 3 SQL queries below. select count(distinct visitor_id) from df_and_lkp_join_cache --178996 select count(distinct trim(visitor_id)) from df_and_lkp_join_cache --178996 select count(distinct visitor_id) from (select a.visitor_id, a.ip, b.visitor_id as visitor_id_b from df_and_lkp_join_cache a inner j...
ashwani gupta
1

votes
0

answer
249

Views

How to efficiently calculate WoE in PySpark

I want to calculate Weight of Evidence (WoE) on a feature column depending on the binary target column; is there a way to do that efficiently in Spark? As of now, Spark still doesn't have any built-in API for calculating WoE. I've built it using a few Spark SQL queries, which are as follows (here item is one co...
Aakash Basu
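
A DataFrame-only sketch on made-up data (item as the category, target as the 0/1 label), assuming WoE = ln((events/total_events) / (non_events/total_non_events)) and that every category has at least one event and one non-event:

    from pyspark.sql import SparkSession, functions as F

    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame(
        [("x", 1), ("x", 0), ("y", 1), ("y", 1), ("y", 0)], ["item", "target"])

    # One grouped pass for per-category counts plus one global pass for the totals
    per_item = df.groupBy("item").agg(
        F.sum("target").alias("events"),
        F.sum(1 - F.col("target")).alias("non_events"))
    totals = df.agg(F.sum("target").alias("te"),
                    F.sum(1 - F.col("target")).alias("tne")).collect()[0]
    woe = per_item.withColumn(
        "woe",
        F.log((F.col("events") / totals["te"]) / (F.col("non_events") / totals["tne"])))
    woe.show()
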
1

votes
2

answer
792

Views

Remove duplicate rows, regardless of new information -PySpark

Say I have a dataframe like so: ID Media 1 imgix.com/20830dk 2 imgix.com/202398pwe 3 imgix.com/lvw0923dk 4 imgix.com/082kldcm 4 imgix.com/lks032m 4 imgix.com/903248 I'd like to end up with: ID Media 1 imgix.com/20830dk 2...
Ashley O
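
A short sketch of the two usual options: dropDuplicates with a subset keeps one arbitrary row per ID, while aggregating (e.g. with F.min) makes the surviving Media value deterministic:

    from pyspark.sql import SparkSession, functions as F

    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame(
        [(1, "imgix.com/20830dk"), (4, "imgix.com/082kldcm"), (4, "imgix.com/lks032m")],
        ["ID", "Media"])

    dedup = df.dropDuplicates(["ID"])                              # arbitrary survivor
    deterministic = df.groupBy("ID").agg(F.min("Media").alias("Media"))
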
1

votes
0

answer
360

Views

how to get spark executor process PID in pyspark

Suppose a Spark job running in cluster mode launches 3 executors; how can I fetch the process ID (PID) of each of the executor processes in the Spark cluster? Is there any API for this in pyspark? EDIT: The question is about the executor JVM process ID (PID), not the executor ID....
Manoranjan
1

votes
1

answer
481

Views

NameError: name 'dbutils' is not defined in pyspark

I am running a pyspark job in Databricks cloud. I need to write some of the CSV files to the Databricks filesystem (dbfs) as part of this job, and I also need to use some of the dbutils native commands, like: #mount azure blob to dbfs location dbutils.fs.mount (source='...',mount_point='/mnt/...',extra_co...
Krishna Reddy
1

votes
0

answer
127

Views

Pyspark: last non null record for each id in dataframe (Spark Streaming)

I have a dataframe with schema - |-- record_id: integer (nullable = true) |-- Data1: string (nullable = true) |-- Data2: string (nullable = true) |-- Data3: string (nullable = true) |-- Time: timestamp (nullable = true) I wanted to retrieve the last non null value for each column in the data, groupi...
Mark B.
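
For a static DataFrame this is typically done with last(..., ignorenulls=True) over a Time-ordered window (streaming DataFrames do not support window functions). A sketch on made-up rows matching the question's schema:

    from pyspark.sql import SparkSession, Window, functions as F

    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame(
        [(1, "a", None, "2020-01-01 10:00:00"),
         (1, None, "x", "2020-01-01 11:00:00")],
        ["record_id", "Data1", "Data2", "Time"]).withColumn("Time", F.to_timestamp("Time"))

    # Latest non-null value per column within each record_id, ordered by Time
    w = (Window.partitionBy("record_id").orderBy("Time")
               .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))
    latest = (df.select("record_id",
                        *[F.last(c, ignorenulls=True).over(w).alias(c)
                          for c in ["Data1", "Data2"]])
                .dropDuplicates(["record_id"]))
    latest.show()
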
1

votes
0

answer
68

Views

How to return data in function passed to flatmap

I have a file of data in the following schema: +--------------------+-------------------+ | full_text| id| +--------------------+-------------------+ I want to change each record into pairs of (token, id). For example, for the record (Hi, how are you, 1010), I want it to transform to...
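
A minimal sketch of the flatMap pattern on a made-up row: the function passed to flatMap must return an iterable, and returning a list of (token, id) tuples emits one output record per token:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame([("Hi, how are you", 1010)], ["full_text", "id"])

    pairs = df.rdd.flatMap(
        lambda row: [(tok, row["id"]) for tok in row["full_text"].split()])
    print(pairs.collect())
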
1

votes
0

answer
32

Views

hive or pyspark order issue

Hi, my requirement is to find the maximum of MONTHS_BALANCE based on the individual SK_ID_CURR, SK_ID_PREV. To achieve that, I need to order by SK_ID_CURR, SK_ID_PREV and then MONTHS_BALANCE or CNT_INSTALMENT_FUTURE. I wrote the query using group by and order by, but I was able to order only the first two...
sarath chandra
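
A sketch in the DataFrame API on made-up rows with the question's columns: group on both keys and take the max, then sort the result (ORDER BY only sorts the final output, it does not influence which value max() picks):

    from pyspark.sql import SparkSession, functions as F

    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame(
        [(1, 10, 3, 12), (1, 10, 5, 12), (1, 11, 2, 6)],
        ["SK_ID_CURR", "SK_ID_PREV", "MONTHS_BALANCE", "CNT_INSTALMENT_FUTURE"])

    result = (df.groupBy("SK_ID_CURR", "SK_ID_PREV")
                .agg(F.max("MONTHS_BALANCE").alias("max_months_balance"))
                .orderBy("SK_ID_CURR", "SK_ID_PREV"))
    result.show()
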
1

votes
3

answer
387

Views

difference between select distinct id and select distinct * in sql

I tried select distinct ID from DB.TABLE; it returned the unique ids across all records. Does select distinct * from DB.TABLE return unique records by comparing all columns of all records, or what is the functionality of 'distinct *'? I am confused about the functionality of distinct *.
Ahito
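
A tiny illustration on made-up rows: DISTINCT ID de-duplicates on the single column, while DISTINCT * compares whole rows:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    spark.createDataFrame([(1, "a"), (1, "a"), (1, "b")], ["ID", "val"]) \
         .createOrReplaceTempView("t")

    spark.sql("SELECT DISTINCT ID FROM t").show()   # 1 row:  (1)
    spark.sql("SELECT DISTINCT * FROM t").show()    # 2 rows: (1, a) and (1, b)
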
1

votes
1

answer
44

Views

Pyspark DF with a list of dates (with PANDAS) from today BACK 1 year

Morning all, I am trying to create a PySpark DF with a list of dates from today back 1 year. So far I can get the list going forward from today with this: import pandas as pd dates = pd.date_range(pd.datetime.today(), periods=365).tolist() dates = list(map(pd.Timestamp.to_pydatetime, dates)) dates_df = s...
Sashweed
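
A sketch of one way to flip the range backwards: date_range with end= counts back from today, giving the 365 days up to and including today (the column name date is made up):

    import pandas as pd
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    dates = pd.date_range(end=pd.Timestamp.today().normalize(), periods=365)
    dates = [d.to_pydatetime() for d in dates]
    dates_df = spark.createDataFrame([(d,) for d in dates], ["date"])
    dates_df.show(5)
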
