WORK IN PROGRESS
This is a simple Spark cheatsheet I put together while learning to use it for some Extract-Transform-Load (ETL) operations.
General
Import SQL functions
import pyspark.sql.functions as F
Object Transformations
Transform to Pandas dataframe
import pandas as pd
def _map_to_pandas(rdds):
    """ Needs to be here due to pickling issues """
    return [pd.DataFrame(list(rdds))]

def toPandas(df, n_partitions=None):
    """
    Returns the contents of `df` as a local `pandas.DataFrame` in a speedy fashion. The DataFrame is
    repartitioned if `n_partitions` is passed.
    :param df: pyspark.sql.DataFrame
    :param n_partitions: int or None
    :return: pandas.DataFrame
    """
    if n_partitions is not None:
        df = df.repartition(n_partitions)
    df_pand = df.rdd.mapPartitions(_map_to_pandas).collect()
    df_pand = pd.concat(df_pand)
    df_pand.columns = df.columns
    return df_pand
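It is called like the built-in df.toPandas(); a quick sketch, where the partition count is just an example value:
pandas_df = toPandas(df, n_partitions=8)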
Transform to list
# To extract the column 'column' from the pyspark dataframe df
mylist = [row.column for row in df.collect()]
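collect() pulls every row to the driver, so when only one column is needed it is cheaper to select it first; a small variation of the snippet above:
mylist = [row.column for row in df.select('column').collect()]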
Type transformations
Types:
- BinaryType – Binary data.
- BooleanType – Boolean values.
- ByteType – A byte value.
- DateType – A date value (year, month and day, with no time of day).
- DoubleType – A floating-point double value.
- IntegerType – An integer value.
- LongType – A long integer value.
- NullType – A null value.
- ShortType – A short integer value.
- StringType – A text string.
- TimestampType – A timestamp value (date and time; Spark stores it with microsecond precision).
- UnknownType – A value of unidentified type.
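As a sketch, these types can be combined into an explicit schema when building a dataframe; the column names and the `spark` session here are assumptions for the example:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([
    StructField('name', StringType(), True),
    StructField('age', IntegerType(), True),
])
df_people = spark.createDataFrame([('Alice', 34)], schema)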
How to import a type
# To import integer type
from pyspark.sql.types import IntegerType
How to cast:
# To cast to double (the type class has to be imported first)
from pyspark.sql.types import DoubleType
F.col('mycolumn').cast(DoubleType())
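The cast on its own is only a column expression; a typical use is inside withColumn (the column name 'mycolumn' is just an example):
df = df.withColumn('mycolumn', F.col('mycolumn').cast(DoubleType()))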
String type to date type
F.to_date(F.unix_timestamp('date', 'yyyy-MM-dd').cast("timestamp"))
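This is again just an expression; a sketch of applying it, assuming the string column is called 'date':
df = df.withColumn('date', F.to_date(F.unix_timestamp('date', 'yyyy-MM-dd').cast("timestamp")))
# On Spark 2.2+ the shorter F.to_date('date', 'yyyy-MM-dd') should give the same result.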
Join
Inner join
df_joined = df_A.join(df_B, df_A.KEY_A == df_B.KEY_B, how='inner')
where KEY_A is the join key in df_A and KEY_B is the join key in df_B.
Left/Right join
df_joined = df_A.join(df_B, df_A.KEY_A == df_B.KEY_B, how='right')
df_joined = df_A.join(df_B, df_A.KEY_A == df_B.KEY_B, how='left')
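When both dataframes name the key column the same way, the column name (or a list of names) can be passed directly so the result keeps a single key column; a sketch with a hypothetical shared column KEY:
df_joined = df_A.join(df_B, on='KEY', how='left')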
TO CLASSIFY
MAPPING DICT
To apply a mapping to a column, given a dataframe and the mapping defined as a dictionary:
from itertools import chain
mapping = {
'old_value1': 'new_value1',
'old_value2': 'new_value2'
}
mapping_expr = F.create_map([F.lit(x) for x in chain(*mapping.items())])
df.withColumn("mapped_valule", mapping_expr.getItem(F.col("value")))
Where function
is equal / not equal
# keep rows where column 'int_column' values are equal to integer 5.
df.where(F.col('int_column') == 5)
# keep rows where column 'date_column' values are before 2018-12-05 (Spark date literals use yyyy-MM-dd).
df.where(F.col('date_column') < '2018-12-05')
# keep rows where column 'bool_column' values are equal to boolean True.
df.where(F.col('bool_column') == True)
is in / not in list
my_list = [5, 23, 7]
# keep rows where column 'value' values are in my_list
df.where(F.col('value').isin(my_list))
# skip rows where column 'value' values are in my_list
df.where(~F.col('value').isin(my_list))
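Conditions can be combined with & (and), | (or) and ~ (not); each condition needs its own parentheses. A small sketch reusing the hypothetical columns above:
df.where((F.col('int_column') == 5) & (F.col('bool_column') == True))
df.where((F.col('value').isin(my_list)) | (F.col('int_column') > 10))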
Window
import sys
from pyspark.sql.window import Window

window = Window.partitionBy(df['category']).orderBy(df['revenue'].desc()).rangeBetween(-sys.maxsize, sys.maxsize)
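A sketch of using the window, assuming df has the 'category' and 'revenue' columns referenced above:
# attach the highest revenue of each category to every row
df = df.withColumn('max_revenue', F.max('revenue').over(window))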
Rename column
df.withColumnRenamed("colName", "newColName")
Add time to timestamp
df.withColumn('new_timestamp', F.col('timestamp') + F.expr('INTERVAL 2 HOURS'))
CREATE UDF FUNCTION
# udf is F.udf; `square` is assumed to be a plain Python function defined elsewhere
square_udf_int = F.udf(lambda z: square(z), IntegerType())
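A minimal self-contained sketch of defining and applying the UDF, assuming an integer column 'int_column':
def square(z):
    return z * z

square_udf_int = F.udf(square, IntegerType())
df = df.withColumn('int_column_squared', square_udf_int(F.col('int_column')))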