Questions tagged [pyspark]

1 vote | 2 answers | 484 views

PySpark Structured Streaming: Pass output of Query to API endpoint

I have the following dataframe in Structured Streaming: TimeStamp|Room|Temperature| 00:01:29 | 1 | 55 | 00:01:34 | 2 | 51 | 00:01:36 | 1 | 56 | 00:02:03 | 2 | 49 | I am trying to detect when temperatures fall below a certain temperature (50 in this case). I have t...
DataGeek
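
A minimal sketch of one way to approach this, assuming Spark 2.4+ (for foreachBatch); `streaming_df` stands for the streaming dataframe from the question, and the endpoint URL is a placeholder:

```python
import requests
from pyspark.sql import functions as F

# Keep only the rows that fall below the threshold from the question.
alerts = streaming_df.filter(F.col("Temperature") < 50)

def post_batch(batch_df, batch_id):
    # Runs on the driver for each micro-batch; collect() is fine for small alert sets.
    for row in batch_df.collect():
        requests.post("https://example.com/alerts", json=row.asDict())  # hypothetical endpoint

query = alerts.writeStream.foreachBatch(post_batch).start()
```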

1 vote | 1 answer | 728 views

How do I select an ambiguous column reference? [duplicate]

This question already has an answer here: Enable case sensitivity for spark.sql globally 1 answer Here's some sample code illustrating what I'm trying to do. There is a dataframe with columns companyid and companyId. I want to select companyId, but the reference is ambiguous. How do I unambiguously...
chris.mclennon
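
A minimal sketch of the usual fix: enabling case-sensitive column resolution makes companyid and companyId distinct names, so the reference is no longer ambiguous (`df` stands for the dataframe from the question):

```python
# Enable case-sensitive resolution for the current session.
spark.conf.set("spark.sql.caseSensitive", "true")

df.select("companyId").show()
```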

1 vote | 0 answers | 23 views

How to detect duplicates in large json file using PySpark HashPartitioner

I have a large json file with over 20GB of json-structured metadata. It contains simple user metadata across some application, and I would like to sift through it to detect duplicates. Here is an example of what the data looks like: {'created': '2015-08-04', 'created_at': '2010-03-15', 'username': 'k...
John Lexus
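
A sketch of a simpler route than a hand-rolled HashPartitioner, assuming the file is line-delimited JSON and that `username` is the field to deduplicate on (both assumptions); groupBy already hash-partitions by the grouping key:

```python
df = spark.read.json("users.json")              # placeholder path

duplicates = (df.groupBy("username")            # Spark shuffles (hash-partitions) on this key
                .count()
                .filter("count > 1"))
duplicates.show()
```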

1 vote | 1 answer | 45 views

Forward Fill New Row to Account for Missing Dates

I currently have a dataset grouped into hourly increments by a variable 'aggregator'. There are gaps in this hourly data and what I would ideally like to do is forward fill the rows with the prior row which maps to the variable in column x. I've seen some solutions to similar problems using pandas b...
ImNewToThis
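
A sketch of the common PySpark forward-fill pattern, assuming the missing hours already exist as rows with nulls (for example after joining against an hourly date spine); `hour` and `x` are placeholder column names:

```python
from pyspark.sql import functions as F, Window

w = (Window.partitionBy("aggregator")
           .orderBy("hour")
           .rowsBetween(Window.unboundedPreceding, Window.currentRow))

# Last non-null value seen so far within each aggregator, ordered by time.
filled = df.withColumn("x_filled", F.last("x", ignorenulls=True).over(w))
```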

1 vote | 2 answers | 85 views

How to identify repeated occurrences of a string column in Hive?

I have a view like this in Hive: id sequencenumber appname 242539622 1 A 242539622 2 A 242539622 3 A 242539622 4 B 242539622 5 B 242539622 6 C 242539622...
Isaac

0 votes | 0 answers | 3 views

Not able to create local directory in EMR as user 'livy'

I am submitting a pyspark job to an EMR cluster from an AWS Step Function through Apache Livy. The pyspark job fires Unix shell commands. Within test.py, subprocess.call(' echo $USER', shell=True, stdout=None, stderr=None) subprocess.call(' mkdir /mnt/data', shell=True, stdout=None, stderr=...
Parijat Bose

0 votes | 0 answers | 6 views

How to handle exceptions in pyspark when data is in improper order?

I am creating a small RDD from some unordered data that doesn't have the same number of columns in each row, so I am taking it as a tuple type with a maximum line index. The problem I am getting is that when I access tuple[4], tuple[9] and so on, some rows don't have index 9, so in...
jodu
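
A minimal sketch of one way to avoid the index error: guard the access instead of assuming every row has all the fields; the file path and delimiter are invented:

```python
def safe_get(fields, i, default=None):
    # Return fields[i] when the row is long enough, otherwise a default.
    return fields[i] if i < len(fields) else default

rows = sc.textFile("data.txt").map(lambda line: line.split(","))
pairs = rows.map(lambda t: (safe_get(t, 4), safe_get(t, 9)))
```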

2 votes | 0 answers | 11 views

How to modify a column based on the values in another column of a PySpark dataframe? F.when edge case

I'd like to go through each row in a pyspark dataframe, and change the value of a column based on the content of another column. The value I am changing it to is also based on the current value of the column to be changed. Specifically, I have a column which contains DenseVectors, and another column...
bmarks2010
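
A hedged sketch of one common route: since there is no built-in column function that rebuilds a DenseVector from another column, a UDF can do it; the flag column and the doubling transformation are invented for illustration:

```python
from pyspark.sql import functions as F
from pyspark.ml.linalg import DenseVector, VectorUDT

@F.udf(returnType=VectorUDT())
def scale_if_flagged(vec, flag):
    # Rebuild the vector only when the other column says so.
    return DenseVector(vec.toArray() * 2.0) if flag == 1 else vec

df = df.withColumn("features", scale_if_flagged("features", "flag"))
```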

1 vote | 0 answers | 8 views

How to Add Additional Remote Repository to Zeppelin?

Using the following code in a Zeppelin note, I was able to add a repository and dependency. How would I accomplish the same using zeppelin.dep.additionalRemoteRepository? %dep z.addRepo('hortonworks').url('http://repo.hortonworks.com/content/repositories/releases') z.load('com.hortonworks:shc-core:1....
Ari

1 vote | 1 answer | 1.8k views

How to get best params after tuning by pyspark.ml.tuning.TrainValidationSplit?

I'm trying to tune the hyper-parameters of a Spark (PySpark) ALS model with TrainValidationSplit. It works well, but I want to know which combination of hyper-parameters is the best. How do I get the best params after evaluation? from pyspark.ml.recommendation import ALS from pyspark.ml.tuning import Train...
takaomag
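
A sketch of one way to see which parameter combination won, assuming the evaluator's metric is larger-is-better; `tvs` and `training` stand for the TrainValidationSplit and training data from the question:

```python
model = tvs.fit(training)

# validationMetrics lines up with the estimator param maps: one metric per combination.
best_metric, best_params = max(
    zip(model.validationMetrics, model.getEstimatorParamMaps()),
    key=lambda pair: pair[0])
print(best_metric, best_params)

# model.bestModel is the refitted ALS model itself, e.g. model.bestModel.rank
```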

1 vote | 2 answers | 920 views

pyspark: getting the best model's parameters after a grid search returns a blank {}

Could someone help me extract the best performing model's parameters from my grid search? It's a blank dictionary for some reason. from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit, CrossValidator from pyspark.ml.evaluation import BinaryClassificationEvaluator train, test = df.ra...
user798719

1 vote | 2 answers | 294 views

Generate all possible combinations between two columns and an indicator to show if that combination exists in the source table

I am completely lost at a particular stage of a transformation. I am planning to achieve it using either SQL or pyspark. My input format is: id name 1 A 1 C 1 E 2 A 2 B 2 C 2 E 2 F 3 A 3 E 3 D Could you please help me get this output format: id name rating 1 A...
ankush reddy
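
A minimal PySpark sketch of the grid-plus-indicator idea, assuming Spark 2.1+ (for crossJoin): build the full id x name grid from the distinct values, then left-join back to the source and flag the hits (`rating` names the indicator, following the expected output):

```python
from pyspark.sql import functions as F

ids = df.select("id").distinct()
names = df.select("name").distinct()

result = (ids.crossJoin(names)
             .join(df.withColumn("rating", F.lit(1)).distinct(), ["id", "name"], "left")
             .fillna(0, subset=["rating"]))
```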

1 vote | 2 answers | 450 views

Get index of item in array that is a column in a Spark dataframe

I am able to filter a Spark dataframe (in PySpark) based on if a particular value exists within an array field by doing the following: from pyspark.sql.functions import array_contains spark_df.filter(array_contains(spark_df.array_column_name, 'value that I want')).show() Is there a way to get the i...
user1624577
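
A sketch of two options, depending on Spark version; the column and value names follow the excerpt:

```python
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType

# Spark 2.4+: built-in, returns the 1-based position (0 means not found).
spark_df.select(F.array_position("array_column_name", "value that I want"))

# Older versions: a small UDF returning the 0-based index, or -1 when absent.
@F.udf(returnType=IntegerType())
def index_of_wanted(arr):
    return arr.index("value that I want") if "value that I want" in arr else -1

spark_df.select(index_of_wanted("array_column_name"))
```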

1 vote | 2 answers | 268 views

Slow filtering of pyspark dataframes

I have a question regarding the time difference while filtering pandas and pyspark dataframes: import time import numpy as np import pandas as pd from random import shuffle from pyspark.sql import SparkSession spark = SparkSession.builder.getOrCreate() df = pd.DataFrame(np.random.randint(1000000, si...
Konstantin

1 vote | 1 answer | 43 views

Pyspark - GroupBy and Count combined with a WHERE

Say I have a list of magazine subscriptions, like so: subscription_id user_id created_at 12384 1 2018-08-10 83294 1 2018-06-03 98234 1 2018-04-08 24903 2 2018-05-08 32843 2...
DataScienceAmateur
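
A minimal sketch: the WHERE is just a filter applied before the groupBy/count; `subs` stands for the subscriptions dataframe from the question, and the cutoff date is invented:

```python
from pyspark.sql import functions as F

(subs.filter(F.col("created_at") >= "2018-05-01")   # the WHERE clause
     .groupBy("user_id")
     .count()
     .show())
```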

1 vote | 2 answers | 211 views

How to interpolate a column within a grouped object in PySpark?

How do you interpolate a PySpark dataframe within grouped data? For example: I have a PySpark dataframe with the following columns: +--------+-------------------+--------+ |webID |timestamp |counts | +--------+-------------------+--------+ |John |2018-02-01 03:00:00|60 | |John...
penguin
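
A hedged sketch using a grouped-map pandas UDF (Spark 2.3+, requires pandas and pyarrow): each webID group is resampled to an hourly grid and linearly interpolated; the resample frequency is an assumption:

```python
from pyspark.sql.functions import pandas_udf, PandasUDFType

schema = "webID string, timestamp timestamp, counts double"

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def interpolate_group(pdf):
    web_id = pdf["webID"].iloc[0]
    out = (pdf.set_index("timestamp").sort_index()["counts"]
              .resample("1H").mean()     # hourly grid; gaps become NaN
              .interpolate()             # linear interpolation over the gaps
              .reset_index())
    out["webID"] = web_id
    return out[["webID", "timestamp", "counts"]]

result = df.groupBy("webID").apply(interpolate_group)
```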

0 votes | 0 answers | 11 views

how to pass a SparkContext from pyspark file to a scala UDF?

I have a pyspark file with my main code written in Python, and also a Scala file that contains some functions written in Scala, which I use as UDFs in the pyspark code. Now I need to read a csv file as a Spark DataFrame inside the Scala functions. To do that I need to create a SparkSession o...
Ali AzG

1 vote | 0 answers | 484 views

Request address for sparkDriver failing

While running pyspark in my terminal, a new issue appeared during pyspark initialization: 17/12/28 10:31:59 ERROR SparkContext: Error initializing SparkContext. java.net.BindException: Can't assign requested address: Service 'sparkDriver' failed after 16 retries (starting from 0)! Consider explicit...
mermi

1 vote | 1 answer | 22 views

Merging records by 1+ common elements

I have a hive table with the following schema: int key1 # this is unique array key2_list Now I want to merge records here if their key2_lists have any common element(s). For example, if record A has (10, [1,3,5]) and B has (12, [1,2,4]), I want to merge it as ([10,12], [1,2,3,4,5]) or ([1,2,3,4,5])....
kee

1 vote | 1 answer | 823 views

Pull data from RDS MySQL db using pyspark

I am using pyspark for the first time. I am trying to pull data from an RDS MySQL database using the code below. I have referred to the following links: pyspark mysql jdbc load An error occurred while calling o23.load No suitable driver, https://www.supergloo.com/fieldnotes/spark-sql-mysql-python-example-jdbc/ and...
user15051990
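
A sketch of the usual JDBC read, assuming the MySQL Connector/J jar has been passed to Spark (e.g. spark-submit --jars mysql-connector-java-x.y.z.jar); host, database, table and credentials are placeholders:

```python
df = (spark.read.format("jdbc")
      .option("url", "jdbc:mysql://my-rds-host:3306/mydb")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("dbtable", "mytable")
      .option("user", "username")
      .option("password", "password")
      .load())
```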

1 vote | 0 answers | 204 views

Using elephas on jupyter test notebooks

I don't have a spark cluster at hand, but I want to fiddle around with elephas, so I'm using temporary instances at try.jupyter.org. You'll find at the bottom of this question all of my code (for reproducibility), and the full error log, but here is a short description of my problem : When I run my...
François M.

1 vote | 1 answer | 781 views

Spark Will Not Load Large MySql Table: Java Communications link failure - Timing Out

I'm trying to get a pretty large table from mysql so I can manipulate it using spark/databricks. I can't get it to load into spark - I have tried taking smaller subsets, but even at the smallest reasonable unit, it still fails to load. I have tried playing with the wait_timeout and interactive_timeout...

1 vote | 3 answers | 221 views

Python Pandas: How to remove all columns from dataframe that contains the values in a list?

include_cols_path = sys.argv[5] with open(include_cols_path) as f: include_cols = f.read().splitlines() include_cols is a list of strings df1 = sqlContext.read.csv(input_path + '/' + lot_number +'.csv', header=True).toPandas() df1 is a dataframe of a large file. I would like to only retain the colum...
Cody
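
Reading the excerpt as "keep only the columns whose names appear in include_cols", a minimal sketch (the intersection preserves df1's column order):

```python
keep = [c for c in df1.columns if c in include_cols]
df1 = df1[keep]

# Or stay in Spark and avoid toPandas() on a large file:
# df1 = sqlContext.read.csv(input_path + '/' + lot_number + '.csv', header=True).select(*keep)
```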

1 vote | 1 answer | 675 views

PySpark - Calling a sub-setting function within a UDF

I have to find neighbors of a specific data point in a pyspark dataframe. a= spark.createDataFrame([('A', [0,1]), ('B', [5,9]), ('D', [13,5])],['Letter', 'distances']) I have created this function that will take in the dataframe (DB) and then check the closest data points to a fixed point (Q) using...
Bryce Ramgovind
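
A hedged sketch of the usual workaround: a DataFrame cannot be queried from inside a UDF, so compute the distance to the fixed point Q with column expressions instead; Q = [3, 4] and the neighbour count are invented:

```python
from pyspark.sql import functions as F

Q = [3, 4]
dist = F.sqrt((F.col("distances")[0] - Q[0]) ** 2 + (F.col("distances")[1] - Q[1]) ** 2)

neighbours = a.withColumn("dist_to_Q", dist).orderBy("dist_to_Q").limit(3)
```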

1 vote | 2 answers | 370 views

My spark code is not using all the executors available in Yarn aws EMR

I have written Spark code which runs locally. I have created a user defined function which needs to be applied on a dataframe created by cross-joining two tables read from local files. Somehow the user defined function I'm applying is not getting distributed. I have installed the requir...
Amjath Khan

1 vote | 0 answers | 213 views

Using python3 locally while running pyspark3 jupyter kernel

Does anyone know how to change the specific instance of python invoked in the %%local spark magic when running a pyspark3 kernel in jupyter? When running the PySpark3 kernel that ships with sparkmagic, I often want to pull data down to my local machine and do some calculations there. However, the...
John

1 vote | 0 answers | 142 views

Use threading to spawn multiple processes to fetch the data in python 3

I am new to threads in Python and am trying to implement them for the first time. I want to use threading to spawn multiple processes. I have written code below where I am trying to fetch data from a Spark SQL query. When I try to pull data without threading, the query takes a lot of time as it has to s...
user15051990

1 vote | 1 answer | 174 views

Issues running spark-submit opening a SparkContext

First, let me describe my scenario: Ubuntu 14.04, Spark 1.6.3, Python 3.5. I'm trying to execute my python scripts through spark-submit. I need to create a context and then apply SQLContext as well. First I tested a very easy case in my pyspark console; then I created my python script. from pyspa...
Andres Urrego Angel

1 vote | 0 answers | 512 views

Read first line of huge Json file with Spark using Pyspark

I'm pretty new to Spark and to teach myself I have been using small json files, which work perfectly. I'm using Pyspark with Spark 2.2.1. However, I don't get how to read in a single data line instead of the entire json file. I have been looking for documentation on this but it seems pretty scarce. I...
Sleenee
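
A sketch of one way to do this, assuming the file is line-delimited JSON: grab a single line as text first, then let Spark parse just that line; the path is a placeholder:

```python
first_line = spark.sparkContext.textFile("huge.json").first()
df = spark.read.json(spark.sparkContext.parallelize([first_line]))
df.show()
```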

1 vote | 1 answer | 979 views

Apply window function in Spark with non constant frame size

My Problem I am currently facing difficulties with Spark window functions. I am using Spark (through pyspark) version 1.6.3 (associated Python version 2.6.6). I run a pyspark shell instance that automatically initializes HiveContext as my sqlContext. I want to do a rolling sum with window function....
linog

1 vote | 1 answer | 357 views

Could not read data from kafka using pyspark

I have streaming data in my kafka topic. I need to read this data from the topic using pyspark in the form of a pyspark dataframe. But I continuously receive an error when I call the readStream function. The error is 'py4j.protocol.Py4JJavaError: An error occurred while calling o35.load'. My code is as f...
Nayana Madhu
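
A common cause of this kind of o35.load error is a missing Kafka connector package; a hedged sketch (the package version must match your Spark and Scala build, and broker/topic names are placeholders):

```python
# Launch with, for example:
#   pyspark --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0
df = (spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:9092")
      .option("subscribe", "my_topic")
      .load())

values = df.selectExpr("CAST(value AS STRING)")
```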

1 vote | 1 answer | 1.3k views

How to relationalize a JSON to flat structure in AWS Glue

Trying to flatten input JSON data having two map/dictionary fields (custom_event1 and custom_event2), which may contain any key-value pair data. In order to create an output table from the data frame, I will have to avoid flattening custom_events and store them as a JSON string in the column. Follo...
Sumit Saurabh

1 vote | 1 answer | 826 views

java.net.UnknownHostException when starting Spark shell [duplicate]

This question already has an answer here: PySpark install error 1 answer After installing spark following this: https://www.davidadrian.cc/posts/2017/08/how-to-spark-cluster/ I got this message: [email protected]:~$ spark-shell Using Spark's default log4j profile: org/apache/spark/log4j-defaults.propertie...
Tensor

1 vote | 1 answer | 107 views

pyspark in databricks - replace delimiters and string for every element in RDD at one shot

I feel stupid trying to do something as simple as the following: I'm working with the dataset /databricks-datasets/cs110x/ml-1m/data-001, which includes 3 dat files with info about users, movies and their ratings. The RDD looks like this sample of 10 entries: [u'1::F::1::10::48067', u'2::M::56::16::70072',...
zontxo
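
A minimal sketch: a single map over the RDD rewrites the '::' delimiter (or splits straight into fields) for every element in one pass; `raw_rdd` stands for the ratings RDD from the question:

```python
replaced = raw_rdd.map(lambda line: line.replace("::", ","))

# or split directly into fields:
fields = raw_rdd.map(lambda line: line.split("::"))
```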

1 vote | 0 answers | 199 views

writeStream on an aggregated, windowed, watermarked dataframe doesn't work

I am working with a CSV dataset as input, read by readStream as below: inDF = spark \ .readStream \ .option('sep', ',') \ .option('maxFilesPerTrigger', 1) \ .schema(rawStreamSchema) \ .csv(rawEventsDir) Below is the schema: inDF schema = root |-- timeStamp: timestamp (nullable = true) |-- machine: str...
Roberto Patrizi
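
A hedged sketch of a watermarked, windowed aggregation that can be written with append mode; the window/watermark sizes, the aggregate and the output paths are invented (with append, a window is only emitted once the watermark passes its end):

```python
from pyspark.sql import functions as F

agg = (inDF
       .withWatermark("timeStamp", "10 minutes")
       .groupBy(F.window("timeStamp", "5 minutes"), "machine")
       .agg(F.count("*").alias("events")))

query = (agg.writeStream
            .outputMode("append")
            .format("parquet")
            .option("path", "out/")                  # placeholder paths
            .option("checkpointLocation", "chk/")
            .start())
```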

1 vote | 0 answers | 51 views

How to show pyspark docstrings in pycharm

Environment: PyCharm 2016.3.2, Python 2.7, pip install pyspark in a venv. Some of the docstrings show up fine, but others do not. It is very weird; how do I fix this?
okwap

1 vote | 1 answer | 216 views

Pyspark installation with python 2.6

I have a production cluster with no internet connection and I would like to run spark scripts. Also, I only have python 2.6 installed and I cannot install 2.7. Considering these limitations, where can I find the python package for pyspark 1.6.0? Best regards, João
João

1 vote | 1 answer | 884 views

Concat multiple columns with loop Pyspark

I have n array-of-string columns. I would like to concatenate these n columns into one, using a loop. I have this function to concat columns: def concat(type): def concat_(*args): return list(chain(*args)) return udf(concat_, ArrayType(type)) concat_string_arrays = concat(StringType()) And in the followi...
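
A sketch of the loop without a UDF: on Spark 2.4+, F.concat also accepts array columns and chains them, which is what the chain(*args) UDF above does (the column names are invented):

```python
from pyspark.sql import functions as F

cols = ["col1", "col2", "col3"]          # the n column names
df = df.withColumn("merged", F.concat(*[F.col(c) for c in cols]))
```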

1 vote | 0 answers | 189 views

Data manipulation in PySpark [duplicate]

This question already has an answer here: How to melt Spark DataFrame? 3 answers Unpivot in spark-sql/pyspark 1 answer I have a dataframe A, whose content is as below: site id date1 date2 A 4/14/2001 1/1/1997 B 3/04/2000 4/8/1999 I want to pivot down data and store i...
Varun Chadha

1 vote | 2 answers | 642 views

Calling pyspark function asynchronously with concurrent.futures

I am trying to call python functions which use pyspark RDD object methods and are time-consuming, which blocks my application. I need to write them in an async fashion so that my app doesn't get blocked. Here is a miniature version of the actual thing I want to do. from concurrent.futures import Futur...
DevanshBheda
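
A minimal sketch of the async pattern: run the blocking Spark action in a thread pool so the caller thread keeps going; the job itself is a stand-in for the time-consuming RDD work:

```python
from concurrent.futures import ThreadPoolExecutor

def slow_job(rdd):
    # Stand-in for the time-consuming RDD work from the question.
    return rdd.map(lambda x: x * x).sum()

executor = ThreadPoolExecutor(max_workers=4)
future = executor.submit(slow_job, sc.parallelize(range(1000)))
# ... other work continues here ...
print(future.result())
```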
