Questions tagged [pyspark]

1 vote · 1 answer · 1.4k views

Convert a pandas dataframe to a PySpark dataframe [duplicate]

This question already has an answer here: Convert between spark.SQL DataFrame and pandas DataFrame [duplicate] 1 answer I have a script with the below setup. I am using: 1) Spark dataframes to pull data in 2) Converting to pandas dataframes after initial aggregation 3) Want to convert back to Spar...
kikee1222
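A minimal sketch of the usual round trip for the question above, assuming an active SparkSession named spark; the column names and values are made up for illustration:

    import pandas as pd
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName('pandas-roundtrip').getOrCreate()

    pdf = pd.DataFrame({'id': [1, 2, 3], 'val': [0.1, 0.2, 0.3]})  # hypothetical pandas data
    sdf = spark.createDataFrame(pdf)   # pandas -> Spark
    pdf_again = sdf.toPandas()         # Spark -> pandas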
1 vote · 1 answer · 85 views

PySpark Write Parquet Binary Column with Stats (signed-min-max.enabled)

I found this apache-parquet ticket https://issues.apache.org/jira/browse/PARQUET-686 which is marked as resolved for parquet-mr 1.8.2. The feature I want is the calculated min/max in the parquet metadata for a (string or BINARY) column. And referencing this is an email https://lists.apache.org/threa...
Nevermore
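If the goal is just to experiment with the property named in the title, one sketch is to set it on the Hadoop configuration used for the write. This is only an experiment: the property name is taken from the question title and the linked ticket, and whether parquet-mr honors it depends on the version Spark bundles.

    # Property name taken from the question title / PARQUET-686; not verified against a specific parquet-mr release.
    spark.sparkContext._jsc.hadoopConfiguration().set(
        'parquet.strings.signed-min-max.enabled', 'true')
    df.write.mode('overwrite').parquet('/tmp/with_binary_stats')   # hypothetical output path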
1 vote · 2 answers · 832 views

How can I sum multiple columns in a spark dataframe in pyspark?

I've got a list of column names I want to sum: columns = ['col1','col2','col3'] How can I add the three and put it in a new column? (in an automatic way, so that I can change the column list and have new results) Dataframe with the result I want: col1 col2 col3 result 1 2 3 6 Than...
Manrique
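One common way to do the sum above is to fold the column list with +; a minimal sketch that reproduces the expected 1 + 2 + 3 = 6 row:

    from functools import reduce
    from operator import add
    from pyspark.sql import functions as F

    columns = ['col1', 'col2', 'col3']
    df = spark.createDataFrame([(1, 2, 3)], columns)               # tiny example frame
    df = df.withColumn('result', reduce(add, [F.col(c) for c in columns]))
    df.show()   # col1=1, col2=2, col3=3, result=6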
1 vote · 2 answers · 65 views

How to create a PySpark Schema for a list of tuples?

What should be the correct PySpark Schema for the following list of tuples? I want to apply the schema on the following data: [('a', 0.0), ('b', 6), ('c', 44), ('d', 107), ('e', 0), ('f', 3), ('g', 4), ('h', 0.025599999353289604), ('i', 0.03239999711513519), ('j', -0.03205680847167969), ('k', 0.1042...
Ankan Dutta
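A sketch of one schema that fits the mixed int/float values shown above: declare the numeric field as DoubleType and cast the Python ints to float when building the rows (the field names are made up):

    from pyspark.sql.types import StructType, StructField, StringType, DoubleType

    data = [('a', 0.0), ('b', 6), ('c', 44), ('h', 0.025599999353289604)]
    schema = StructType([
        StructField('key', StringType(), nullable=False),
        StructField('value', DoubleType(), nullable=True),
    ])
    # DoubleType will reject plain Python ints at runtime, so cast them first
    df = spark.createDataFrame([(k, float(v)) for k, v in data], schema)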
1 vote · 1 answer · 51 views

Optimizing pivoting and filling

They gave me a table storing sensor readings with a schema [TimeStamp, SensorKey, SensorValue]. TimeStamp Id Value 2019-01-01 00:00:47 1 66.6 2019-01-01 00:00:47 2 0.66 2019-01-01 00:00:57 1 66.7 2019-01-01 00:00:57 2 0.68 201...
dmcontador
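The straightforward (if not yet optimized) version of that pivot is a groupBy on the timestamp with a pivot on the sensor key; a sketch using the column names from the stated schema:

    from pyspark.sql import functions as F

    wide = (df.groupBy('TimeStamp')
              .pivot('SensorKey')
              .agg(F.first('SensorValue')))
    # forward-filling the gaps afterwards would typically use a window ordered by TimeStamp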
1 vote · 2 answers · 43 views

Pyspark - Split a column and take n elements

I want to take a column and split a string using a character. As usual, I understood that the method split would return a list, but when coding I found that the returned object had only the methods getItem or getField, with the following descriptions from the API: @since(1.3) def getItem(self...
Alejandro A
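split returns an array column, and getItem(i) pulls the i-th element out of it; a short sketch with a hypothetical column named raw and '-' as the separator:

    from pyspark.sql import functions as F

    df = spark.createDataFrame([('a-b-c',)], ['raw'])               # hypothetical input
    df = df.withColumn('parts', F.split(F.col('raw'), '-'))
    df = df.withColumn('first_part', F.col('parts').getItem(0))     # 'a'
    df = df.withColumn('second_part', F.col('parts').getItem(1))    # 'b'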
1 vote · 0 answers · 18 views

H2O Target Mean Encoder “frames are being sent in the same order” ERROR

I am following the H2O example to run target mean encoding in Sparkling Water (Sparkling Water 2.4.2 and H2O 3.22.04). It all runs well in the following paragraph: from h2o.targetencoder import TargetEncoder # change label to factor input_df_h2o['label'] = input_df_h2o['label'].asfactor() # add fold co...
Gavin
3 votes · 0 answers · 8 views

How to save Spark streaming output to the local PC and HDFS?

I tried this: data is getting streamed, but I couldn't save that data in the form of tuples to local disk or HDFS. from pyspark import SparkConf, SparkContext from operator import add import sys from pyspark.streaming import StreamingContext from pyspark.streaming.kafka import KafkaUtils ## Constants APP_NAME...
hukkemaaru
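For a DStream like the one built with KafkaUtils in the snippet above, the usual way to persist each batch is saveAsTextFiles with an HDFS or file:// prefix; a sketch only, with the DStream name, parsing, and paths made up:

    # `lines` stands for the (key, value) DStream created with KafkaUtils in the question's script
    parsed = lines.map(lambda msg: tuple(msg[1].split(',')))          # hypothetical parsing into tuples
    parsed.saveAsTextFiles('hdfs:///tmp/streaming/records')           # one output directory per batch
    # parsed.saveAsTextFiles('file:///home/user/streaming/records')   # or the local filesystem
    ssc.start()
    ssc.awaitTermination()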
0 votes · 0 answers · 5 views

Shap rendering in Databricks

I am trying to visualise the Shap indices to explain an xgboost machine learning model. I can achieve this in Google Colab but am having difficulty achieving the same in Databricks. import shap import pandas as pd import numpy as np import matplotlib import xgboost as xgb df = pd.read_csv('/dbfs/data...
xxyy
0 votes · 0 answers · 4 views

Pyspark apply parallel summary function to all columns

I would like to apply summary and customized stats functions to all columns, independently and in parallel. from pyspark.sql.functions import rand, randn df = sqlContext.range(0, 10).withColumn('uniform', rand(seed=10)).withColumn('normal', randn(seed=27)) I have tried to look for answers such as h...
Kenny
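For the built-in part of that, describe() and summary() already compute per-column statistics over all columns in one distributed pass; a sketch against the df defined above:

    df.describe().show()                                          # count, mean, stddev, min, max
    df.summary('count', 'mean', 'stddev', '25%', '75%').show()    # Spark 2.3+, adds percentiles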
1 vote · 1 answer · 1.6k views

Export models as PMML using PySpark

Is it possible to export models as PMMLs using PySpark? I know this is possible using Spark. But I did not find any reference in PySpark docs. So does this mean that if I want to do this, I need to write custom code using some third party python PMML library?
SameeraR
1 vote · 0 answers · 15 views

Question about joining dataframes in Spark

Suppose I have two partitioned dataframes: df1 = spark.createDataFrame( [(x,x,x) for x in range(5)], ['key1', 'key2', 'time'] ).repartition(3, 'key1', 'key2') df2 = spark.createDataFrame( [(x,x,x) for x in range(7)], ['key1', 'key2', 'time'] ).repartition(3, 'key1', 'key2') (scenario 1) If I join th...
Artem Bergkamp
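One way to check what actually happens in each scenario is to run the join and inspect the physical plan for shuffle nodes; a sketch using the df1 and df2 from the question:

    joined = df1.join(df2, on=['key1', 'key2'])
    joined.explain()   # look for Exchange hashpartitioning steps; co-partitioned inputs may avoid extra ones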
1 vote · 1 answer · 335 views

How to bucketize a group of columns in pyspark?

I am trying to bucketize columns that contain the word 'road' in a 5k dataset, and create a new dataframe. I am not sure how to do that; here is what I have tried so far: from pyspark.ml.feature import Bucketizer spike_cols = [col for col in df.columns if 'road' in col] for x in spike_cols : bucketi...
Matthew
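A sketch of one way to finish that loop: give each Bucketizer its own output column and re-assign df on every iteration (the split boundaries here are made up):

    from pyspark.ml.feature import Bucketizer

    splits = [-float('inf'), 0.0, 10.0, 100.0, float('inf')]   # hypothetical bucket edges
    spike_cols = [c for c in df.columns if 'road' in c]
    for c in spike_cols:
        bucketizer = Bucketizer(splits=splits, inputCol=c, outputCol=c + '_bucket')
        df = bucketizer.transform(df)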
1 vote · 2 answers · 516 views

Create a dataframe from column of dictionaries in pyspark

I want to create a new dataframe from an existing dataframe in pyspark. The dataframe 'df' contains a column named 'data' which has rows of dictionaries and whose schema is string. The keys of each dictionary are not fixed. For example, name and address are the keys for the first row dictionary but...
amol desai
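When the keys are not fixed, one common trick is to let spark.read.json infer a schema from the string column itself; a sketch that assumes the 'data' column holds valid JSON text:

    json_rdd = df.select('data').rdd.map(lambda row: row[0])
    parsed = spark.read.json(json_rdd)   # infers a schema that is the union of all keys seen
    parsed.printSchema()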
1 vote · 1 answer · 47 views

Creating a DataFrame from RDD while specifying DateType() in schema

I am creating a DataFrame from RDD and one of the value is a date. I don't know how to specify DateType() in schema. Let me illustrate the problem at hand - One way we can load the date into the DataFrame is by first specifying it as string and converting it to proper date using to_date() function....
cph_sto
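DateType works directly if the RDD holds datetime.date objects rather than strings; a minimal sketch with made-up field names:

    import datetime
    from pyspark.sql.types import StructType, StructField, StringType, DateType

    rdd = spark.sparkContext.parallelize([('a', datetime.date(2019, 1, 1)),
                                          ('b', datetime.date(2019, 2, 1))])
    schema = StructType([StructField('name', StringType(), True),
                         StructField('dt', DateType(), True)])
    df = spark.createDataFrame(rdd, schema)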
0 votes · 0 answers · 5 views

PySpark ClassNotFoundException: org.apache.spark.sql.DataFrame

I'm following examples on this page to test graphlab-create with PySpark Spark Integration. I have tried the following code from the link: from pyspark import SparkContext from pyspark.sql import SQLContext # Launch spark by creating a spark context sc = SparkContext() # Create a SparkSQL context to manage d...
ikel
0 votes · 0 answers · 4 views

PySpark: groupBy two columns with variables categorical and sort in ascending order

I am quite new to Spark and I have a problem with a dataframe. I need to group the unique categorical variables from two columns (estado, producto) and then count and sort (asc) the unique values of the second column (producto). I can do this in Pandas but I can't reproduce it in Spark. My original...
Rodrigo López
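A sketch of the Spark equivalent, assuming the two column names from the question:

    result = (df.groupBy('estado', 'producto')
                .count()
                .orderBy('producto', ascending=True))
    result.show()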
0 votes · 0 answers · 5 views

Read CSV file from AWS S3

I have an EC2 instance running pyspark and I'm able to connect to it (ssh) and run interactive code within a Jupyter Notebook. I have an S3 bucket with a csv file that I want to read; when I attempt to read it with: spark = SparkSession.builder.appName('Basics').getOrCreate() df = spark.read.csv('http...
EGM8686
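Spark reads S3 through the s3a:// filesystem rather than plain HTTPS URLs, which needs the hadoop-aws package and credentials available on the instance; a sketch with hypothetical bucket and key names:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName('Basics').getOrCreate()
    hconf = spark.sparkContext._jsc.hadoopConfiguration()
    hconf.set('fs.s3a.access.key', 'XXX')      # or rely on an instance profile / environment variables
    hconf.set('fs.s3a.secret.key', 'XXX')
    df = spark.read.csv('s3a://my-bucket/path/file.csv', header=True, inferSchema=True)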
1 vote · 1 answer · 1.4k views

Most efficient way to load many files in spark in parallel?

[Disclaimer: While this question is somewhat specific, I think it circles a very generic issue with Hadoop/Spark.] I need to process a large dataset (~14TB) in Spark. Not doing aggregations, mostly filtering. Given ~30k files (250 part files, per month for 10 years, each part being ~ 200MB), I woul...
joshua.ewer
0 votes · 0 answers · 4 views

Running pyspark script locally before sending to production (AWS EMR cluster)

I'm evaluating the possibility of using Spark (pyspark) for performing ETL: getting the data from Postgres using JDBC, processing it using Spark on AWS EMR, and storing the results in another database. I don't want to have the EMR cluster up and running all the time - that would be too expensive. It should...
vvzadvor
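For local testing before EMR, the same job can usually run against a local master with the Postgres JDBC driver pulled onto the classpath; a sketch with hypothetical connection details and driver version:

    from pyspark.sql import SparkSession

    spark = (SparkSession.builder
             .master('local[*]')
             .appName('etl-local-test')
             .config('spark.jars.packages', 'org.postgresql:postgresql:42.2.9')  # hypothetical version
             .getOrCreate())

    df = (spark.read.format('jdbc')
          .option('url', 'jdbc:postgresql://localhost:5432/mydb')
          .option('dbtable', 'public.my_table')
          .option('user', 'me')
          .option('password', 'secret')
          .load())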
0 votes · 0 answers · 4 views

Convert XML data to pandas dataframe via pyspark.sql.dataframe

My background: long-time SAS and R user, trying to figure out how to do some elementary things in Azure Databricks using Python and Spark. Sorry for the lack of a reproducible example below; I'm not sure how to create one like this. I'm trying to read data from a complicated XML file. I've reached t...
JMH
-2 votes · 0 answers · 27 views

How to calculate the distance between 2 lat/long pairs

We are trying to find the distance between 2 locations, or between a customer location and the centroid location. If the data looks like below: [sample data shown as an image] If we want to get the haversine distance like below: from numpy import cos, sin, arcsin, sqrt from math import radians def getHaversineDis...
Ashwin Padhy
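A sketch of wrapping the haversine formula in a UDF so it can run over the lat/long columns (the column names here are made up):

    from math import radians, sin, cos, asin, sqrt
    from pyspark.sql import functions as F
    from pyspark.sql.types import DoubleType

    def haversine_km(lat1, lon1, lat2, lon2):
        lat1, lon1, lat2, lon2 = map(radians, (lat1, lon1, lat2, lon2))
        a = sin((lat2 - lat1) / 2) ** 2 + cos(lat1) * cos(lat2) * sin((lon2 - lon1) / 2) ** 2
        return 2 * 6371.0 * asin(sqrt(a))   # Earth radius in km

    haversine_udf = F.udf(haversine_km, DoubleType())
    df = df.withColumn('dist_km',
                       haversine_udf('cust_lat', 'cust_lon', 'centroid_lat', 'centroid_lon'))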
1 vote · 0 answers · 14 views

Pyspark causing java.lang.OutOfMemoryError: unable to create new native thread when writing to S3

PySpark, which copies data to S3, is throwing the exception below with this piece of code: self.dataframe.coalesce(1).write.format(format).mode('overwrite').save(location) Exception: java.lang.OutOfMemoryError: unable to create new native thread at java.lang.Thread.start(Thread.java:717) at java.util.concurrent.ThreadPoolEx...
Manoj4068
0 votes · 0 answers · 3 views

PySpark - CSV to Parquet Column Header (Special Character “/”) error

I am having a problem running this - it runs fine for Track Number, but Transaction/Time has a special character and it is failing - how do I handle this? What I am looking for is to either replace the row and define my own header, or remove special characters from the header row. Thanks!! from pyspark.sql...
Abi.sonic
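One simple way to handle the '/' in the header is to rename the offending columns after the CSV read and before the parquet write; a sketch with hypothetical paths:

    df = spark.read.csv('input.csv', header=True)
    for c in df.columns:
        df = df.withColumnRenamed(c, c.replace('/', '_'))   # e.g. Transaction/Time -> Transaction_Time
    df.write.mode('overwrite').parquet('output.parquet')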
1 vote · 3 answers · 597 views

No FileSystem for scheme: cos

I'm trying to connect to IBM Cloud Object Storage from IBM Data Science Experience: access_key = 'XXX' secret_key = 'XXX' bucket = 'mybucket' host = 'lon.ibmselect.objstor.com' service = 'mycos' sqlCxt = SQLContext(sc) hconf = sc._jsc.hadoopConfiguration() hconf.set('fs.cos.myCos.access.key', acces...
Chris Snow
1 vote · 1 answer · 4k views

pySpark: java.lang.UnsupportedOperationException: Unimplemented type: StringType

While reading a group of parquet files written with inconsistent schemas, we have an issue with schema merging. On switching to manually specifying the schema, I get the following error. Any pointer will be helpful. java.lang.UnsupportedOperationException: Unimplemented type: StringType at org.apache.spark.sql.execution....
Srikant
1 vote · 1 answer · 24 views

Pyspark: drop duplicates if reverse is present between two columns

I have a dataframe (around 20,000,000 rows) and I'd like to drop duplicates from it for two columns if those columns have the same values, or even if those values are in the reverse order. For example the original dataframe: +----+----+----+ |col1|col2|col3| +----+----+----+ | 1| 1|...
Ahmad Suliman
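A common approach for the question above: build an order-independent key from the two columns with least/greatest, drop duplicates on that key, then drop the helper columns; a sketch:

    from pyspark.sql import functions as F

    df = (df.withColumn('_lo', F.least('col1', 'col2'))
            .withColumn('_hi', F.greatest('col1', 'col2'))
            .dropDuplicates(['_lo', '_hi'])
            .drop('_lo', '_hi'))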
1 vote · 1 answer · 7.4k views

Remove elements from Spark RDD

I am building an RDD from a text file. Some of the lines do not conform to the format I am expecting, in which case I use the marker -1. def myParser(line): try: # do something except: return (-1, -1), -1 lines = sc.textFile('path_to_file') pairs = lines.map(myParser) Is it possible to remove the li...
Bob
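Given the ((-1, -1), -1) marker the parser returns, a plain filter after the map removes those records; a sketch reusing sc and myParser from the question:

    lines = sc.textFile('path_to_file')
    pairs = lines.map(myParser)
    valid = pairs.filter(lambda kv: kv[1] != -1)   # drop the records the parser marked as bad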
1 vote · 1 answer · 1.6k views

PySpark throws java.io.EOFException when reading big files with boto3

I'm using boto3 to read files from S3; this has shown to be much faster than sc.textFile(...). These files are between 300MB and 1GB approx. The process goes like: data = sc.parallelize(list_of_files, numSlices=n_partitions) \ .flatMap(read_from_s3_and_split_lines) events = data.aggregateByKey(...)...
hmourit
1 vote · 1 answer · 163 views

how to join features columns of two LabeledPoints in PySpark

I have two LabeledPoints - label1 and label2: label1 = (label,[feature1,feature2,feature3]) label2 = (label,[feature4,feature5]) The label column in both LabeledPoints is the same and I want to form a new LabeledPoint which has the feature columns from the two LabeledPoints joined together: label_new = (lab...
Jason Donnald
1 vote · 1 answer · 1.5k views

Python Spark How to Map Fields of one rdd to another rdd

I am very new to Python Spark. As per the above subject, I want to map the fields of one RDD to the fields of another RDD. Here is the example: rdd1: c_id name 121210 abc 121211 pqr rdd2: c_id cn_id cn_value 121211 0 0 121210 0 1 So the matched c_id will be replaced by name with cnid and the ag...
Deno George
1 vote · 1 answer · 1.6k views

Does caching a new table with an existing table name remove old contents from memory?

Using, Spark 1.5.2: dfOld.registerTempTable('oldTableName') hiveContext.cacheTable('oldTableName') // .... // do something // .... dfNew.registerTempTable('oldTableName') hiveContext.cacheTable('oldTableName') Now when I use the 'oldTableName' table I do get the latest contents from dfNew but do t...
Sahil Sareen
1 vote · 1 answer · 475 views

Spark 1.5.2 + Hadoop 2.6.2 spark-submit and pyspark not utilizing all nodes in standalone

I'm running into an issue when running spark-submit or pyspark in standalone mode, something like this: spark/bin/pyspark --master spark://: which typically creates a running Spark application in the UI using all nodes (at least, in previous versions). For some reason, doing this only runs it on the...
Jack
1 vote · 1 answer · 1k views

Spark map is only one task while it should be parallel (PySpark)

I have an RDD with around 7M entries with 10 normalized coordinates in each. I also have a number of centers and I'm trying to map every entry to the closest (Euclidean distance) center. The problem is that this only generates one task, which means it is not parallelizing. This is the form: def doSome...
Jan van der Vegt
1 vote · 1 answer · 548 views

Importing bitarray library into SparkContext

I am trying to import the bitarray library into a SparkContext. https://pypi.python.org/pypi/bitarray/0.8.1. To do this I have zipped up the contents of the bitarray folder and then tried to add it to my Python files. However, even after I push the library to the nodes my RDD cannot find the library...
RDizzl3
1 vote · 1 answer · 2.3k views

Strip or Regex function in Spark 1.3 Dataframe

I have some code from PySpark 1.5 that I unfortunately have to port backwards to Spark 1.3. I have a column with elements that are alpha-numeric but I only want the digits. An example of the elements in 'old_col' of 'df' are: '125 Bytes' In Spark 1.5 I was able to use df.withColumn('new_col',F.rege...
PR102012
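Since regexp_replace as a DataFrame function is not available that far back, one fallback is a plain Python UDF with the re module; a sketch, assuming the 'old_col' column from the question:

    import re
    from pyspark.sql.functions import udf
    from pyspark.sql.types import StringType

    keep_digits = udf(lambda s: re.sub(r'[^0-9]', '', s) if s is not None else None, StringType())
    df = df.withColumn('new_col', keep_digits(df['old_col']))   # '125 Bytes' -> '125'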
1 vote · 1 answer · 969 views

Spark Streaming - HBase Bulk Load

I'm currently using Python to bulk load CSV data into an HBase table, and I'm having trouble writing the appropriate HFiles using saveAsNewAPIHadoopFile. My code currently looks as follows: def csv_to_key_value(row): cols = row.split(',') result = ((cols[0], [cols[0], 'f1', 'c1', cols[...
swinefish
0 votes · 0 answers · 2 views

PySpark re-samples my data every time I run something related to the sample

I am running a stratified sample on a dataset, and I keep the sample in a dataframe called df. When running a count on df, every time I run the count (without re-running the stratified sampling) it gives me a different count, as if every time I do an operation on df my data gets re-sampled. I hav...
Marcela Bejarano
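Two things usually stop the re-sampling effect described above: pass a fixed seed to the sampler and persist the result before counting. A sketch using sampleBy with made-up strata and fractions:

    fractions = {'A': 0.1, 'B': 0.1}                          # hypothetical per-stratum fractions
    sampled = df.sampleBy('strata_col', fractions=fractions, seed=42).cache()
    sampled.count()   # materializes the cached sample; later actions now see the same rows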
2 votes · 0 answers · 9 views

Does spark optimize identical but independent DAGs in pyspark?

Consider the following pyspark code: def transformed_data(spark): df = spark.read.json('data.json') df = expensive_transformation(df) # (A) return df df1 = transformed_data(spark) df = transformed_data(spark) df1 = foo_transform(df1) df = bar_transform(df) return df.join(df1) My question is: are...
Jorge Leitão
1 vote · 2 answers · 592 views

Unable to load bigquery data in local spark (on my mac) using pyspark

I am getting the below error after executing the below code. Am I missing something in the installation? I am using Spark installed on my local Mac, so I am checking to see if I need to install additional libraries for the below code to work and load data from BigQuery. Py4JJavaError...
VP10

View additional questions