Monday, March 14, 2022

Handout Series - Big Data - Spark - Performance Tuning

Background:

In this handout, I try to cover some key aspects we need to keep in mind while tuning a Spark application. It also covers tools I frequently use to get more detailed information for better debugging.


Tools and techniques to debug performance issues:

Tools/Techniques

Things to focus on

Spark UI

  • Stage and task completion times

  • 25th, 50th, and 75th percentiles of task execution time

  • Memory usage by executor

  • Input/output data volume for tasks

  • DAG – helps identify job/stage optimization opportunities

  • Garbage collection time

Spark Log

  • Partition size – evenly distributed or not

YARN Resource Manager (if YARN is used as the resource manager) – UI or yarn CLI commands

  • Number of executors

  • Job/task execution time for each executor

  • Memory usage for each executor

Data storage (HDFS, object storage such as S3, etc.)

  • File sizes – help identify skew in the data

DataFrame API

  • Number of partitions

  • Number of records per partition

SparkLens

A tool that analyzes the application and suggests configuration parameter values for better performance – a good head start before further tuning


Primary areas in Spark to tune performance:

Areas

Note

Memory

  • Increasing the amount of memory per executor might increase garbage collection time

  • Spark parameters

    • spark.driver.memory

    • spark.executor.memory

Processing

  • Increasing cores gives more parallelism but might cause scheduling overhead and additional shuffling

  • Spark parameters

    • spark.driver.cores

    • spark.executor.cores

    • spark.cores.max

    • spark.executor.instances

Garbage Collection

  • Type of GC to use (ParallelGC, G1GC etc.)

  • Configure each type of GC to get better performance

  • Specify high level GC parameters like parallel threads, concurrent threads, heap occupancy percentage etc.
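A sketch of how the three areas above might be tuned together in a single spark-submit invocation. The values shown are illustrative assumptions, not recommendations, and my_app.jar is a hypothetical application:

```shell
spark-submit \
  --conf spark.driver.memory=4g \
  --conf spark.driver.cores=2 \
  --conf spark.executor.memory=8g \
  --conf spark.executor.cores=4 \
  --conf spark.executor.instances=10 \
  --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:ParallelGCThreads=4 -XX:ConcGCThreads=2 -XX:InitiatingHeapOccupancyPercent=35" \
  my_app.jar
```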



Note:

We need to be cognizant of all these areas when tuning performance, as a change in one area directly impacts the others.

It has to be a balancing act; we need to find the combination of parameters across all areas that best fits the problem at hand.


Performance Improvement technique – resolve skewness in data

What is skew


  • The most common problem; it mostly happens during the initial phase

  • Extreme imbalance of work in the cluster

  • Tasks within a stage take uneven amounts of time to finish

How to identify


  • Stage/task wait times and execution times – Spark UI

  • Memory usage – Spark UI, YARN

  • Partition sizes – Spark UI, YARN

  • Missing executor heartbeats – Spark log

  • Partition size and records processed per partition – DataFrame API

How it could happen


Skew is often introduced at data ingestion, when data is not partitioned properly. It then propagates to other parts of the application.

How to handle skew data


  • Use a partition column in the source data (if possible) and use it when reading – this gives the best performance since partitions don't have to be created at processing time. If a partition column cannot be added at the source, create one on the dataset immediately after reading

  • Make sure the partition column has even distribution of data

    • The partition column has to be numeric; it can be a composite/derived column

  • Spark parameters:

    • partitionColumn

    • lower/upperBound – used to determine stride

    • numPartitions – max # of partitions

  • Make sure strides are not skewed (especially for JDBC loads)

    • Use a hash + mod function to get evenly distributed strides if some strides have uneven data volume

  • Partition data at the DataFrame level – great for ‘narrow transformations’

    • To increase partitions, use repartition

    • To reduce partitions, use coalesce, which shuffles less data
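To make the stride and hash + mod ideas above concrete, here is a minimal plain-Scala sketch (no Spark needed). The bounds, key values, and function names are illustrative assumptions:

```scala
object StrideSketch {
  // JDBC-style stride: Spark splits [lowerBound, upperBound) into
  // numPartitions roughly equal ranges of the partition column
  def stride(lowerBound: Long, upperBound: Long, numPartitions: Int): Long =
    (upperBound - lowerBound) / numPartitions

  // hash + mod: map an arbitrary key to one of numPartitions evenly
  // sized buckets, useful when the natural column values are skewed
  def hashModPartition(key: String, numPartitions: Int): Int = {
    val h = key.hashCode % numPartitions
    if (h < 0) h + numPartitions else h // keep the bucket non-negative
  }

  def main(args: Array[String]): Unit = {
    println(stride(0L, 1000000L, 10))        // each partition covers 100000 ids
    println(hashModPartition("order-42", 8)) // a bucket in [0, 8)
  }
}
```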


Performance Improvement technique – use cache/persist

What is cache/persist


  • Persist – saves data to storage (memory, disk, or both, depending on the chosen storage level)

  • Cache – persist with the default (in-memory) storage level

  • Lets Spark reuse a DataFrame and its transformations rather than rebuilding it from scratch

  • Spark uses a columnar format (which allows scanning only the required columns) and compression to minimize memory usage and reduce pressure on GC

How to use cache/persist


  • Select the appropriate storage type, as each type has its own performance implications

  • Un-persist when done and don’t over persist

    • Has impact on memory performance and possible slowdown

    • Puts pressure on GC

  • Without caching, the same DataFrame will be rebuilt from scratch every time it is reused in processing; caching saves read time, serialization time, etc.

  • Spark properties:

    • spark.sql.inMemoryColumnarStorage.compressed 

    • spark.sql.inMemoryColumnarStorage.batchSize
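The two properties above might be set in spark-defaults.conf like this; the values shown are Spark's defaults, given only as a sketch:

```
spark.sql.inMemoryColumnarStorage.compressed  true
spark.sql.inMemoryColumnarStorage.batchSize   10000
```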



Performance Improvement technique – use seq.par.foreach

  • Allows items in a sequence (Seq in Scala) to be processed in parallel, improving processing time. We need to be careful that the logic is deterministic and avoids shared mutable state, or we will create race conditions.

  • Example: if we have a foreach loop that processes (reads and/or transforms) files listed in a Seq of file names, replacing it with seq.par.foreach allows the files to be processed in parallel, giving a performance benefit
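The file-processing example can be sketched in plain Scala as below. The process function is a hypothetical stand-in for real read/transform work; note that in Scala 2.13+ the .par call requires the separate scala-parallel-collections module:

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.JavaConverters._

object ParSketch {
  // Hypothetical per-file work; a real job would read and/or transform the file
  def process(fileName: String): String = s"processed:$fileName"

  // .par runs foreach across a thread pool. Because the order of side effects
  // is not guaranteed, results are accumulated into a thread-safe queue.
  def processAll(files: Seq[String]): Set[String] = {
    val results = new ConcurrentLinkedQueue[String]()
    files.par.foreach(f => results.add(process(f)))
    results.asScala.toSet
  }

  def main(args: Array[String]): Unit =
    println(processAll(Seq("a.csv", "b.csv", "c.csv")))
}
```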


Performance Improvement technique – use proper joining strategies

Use broadcast join


  • Broadcasting copies a DataFrame to every executor before joining. The idea is to broadcast the smaller dataset to all executors and join the local copy against the bigger dataset

  • Spark has an auto-broadcast join threshold (spark.sql.autoBroadcastJoinThreshold, 10 MB by default); datasets smaller than that are automatically broadcast if not done explicitly in the code

  • With broadcasting, Spark uses a broadcast hash join instead of a sort-merge join, which would trigger data shuffles and exchanges

  • Code snippet: largeDF.join(broadcast(smallDF), largeDF.col("col_name") === smallDF.col("col_name"))

  • If the DataFrame to broadcast is big: apply filters first to keep the dataset as small as possible

Dynamic partition pruning


  • Out-of-the-box functionality in Spark 3.x

  • Eliminates partitions at read time

Introduce salting to reduce skew
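The salting idea can be sketched without Spark: append a small salt to a hot key so its rows spread across several buckets, and replicate ("explode") keys on the smaller side so every salted key still finds a match. A minimal plain-Scala illustration; numSalts and the key names are illustrative assumptions:

```scala
import scala.util.Random

object SaltingSketch {
  val numSalts = 4

  // Skewed side: append a random salt in [0, numSalts) to each hot key,
  // so rows for one key spread across numSalts buckets
  def saltKey(key: String, rng: Random): String = s"$key#${rng.nextInt(numSalts)}"

  // Small side: replicate each key once per salt value so every
  // possible salted key on the big side still joins successfully
  def explodeKey(key: String): Seq[String] = (0 until numSalts).map(s => s"$key#$s")

  def main(args: Array[String]): Unit = {
    val rng = new Random(42)
    println(saltKey("hot_customer", rng)) // e.g. "hot_customer#2"
    println(explodeKey("hot_customer"))
  }
}
```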




Performance Improvement technique – use right serializer

Types of serializers



Java serializer

  • Default for most types

  • Can work on any class

  • More flexible

  • Gives slower performance

Kryo serializer

  • Default for shuffling RDDs of simple types (integers, strings)

  • Significantly faster (~4x) and more compact (~10x)


Serialization strategies


  • Use Kryo for a serialization performance boost

  • Register the classes you serialize with Kryo – otherwise Kryo stores the full class name with each serialized object, costing several bytes per character of the class name
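The two Kryo-related points above might look like this in spark-defaults.conf; com.example.MyRecord is a hypothetical application class:

```
spark.serializer                 org.apache.spark.serializer.KryoSerializer
spark.kryo.registrationRequired  true
spark.kryo.classesToRegister     com.example.MyRecord
```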


A few simple handy tricks

  • Keep the larger dataset on the left side of a join – Spark implicitly tries to shuffle the right dataset first

  • Apply appropriate partitioning –

    • too few partitions = less parallelism

    • too many partitions = scheduling issues, more data shuffling

  • Try not to use UDFs

    • UDF execution flow: deserialize every row to an object > apply the lambda function > re-serialize the row

    • The above flow puts pressure on garbage collection by generating a lot of garbage

    • Avoid UDFs especially when not using Scala, as everything needs to be translated into JVM code

  • Apply all filters to the dataset as early as possible in the processing


A few advanced configurations
  • JVM parameters for memory - 

  • Reduce long garbage collection times

    • Use ‘Spark UI’ to check time spent by task vs garbage collection

    • The first step of GC tuning is to enable GC logging

    • Analyze the log to look for:

      • How frequently GC occurs

      • How much memory is cleaned up

      • Algorithm-specific (G1GC, ParallelGC) stage information like minor GC, major GC, and full GC
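For the "enable GC logging" step, the executor JVM options might look like this; the exact flags depend on the JDK version (the -Xlog:gc* unified-logging form is JDK 9+, the -XX:+PrintGCDetails form is JDK 8):

```
# JDK 8
spark.executor.extraJavaOptions  -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps

# JDK 9+
spark.executor.extraJavaOptions  -Xlog:gc*
```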

ParallelGC (Spark's default)

  • Heap space is divided into young and old generations

  • If minor GC occurs frequently: increase Eden and Survivor space

  • If major GC occurs frequently: increase young and old space, decrease spark.memory.fraction

  • If full GC occurs before tasks finish: increase memory, try triggering GC to clean up the young space

G1GC

  • Breaks the heap into thousands of small regions

  • Recommended when heap sizes are large (>8 GB) and GC times are long

  • Lower latency, traded for higher CPU usage on GC bookkeeping


For large-scale applications, setting the parameters below helps a lot

  • -XX:ParallelGCThreads=n

  • -XX:ConcGCThreads=[n, 2n]

  • -XX:InitiatingHeapOccupancyPercent=35