Balancing Spark



  • What is Spark?
  • Frequency Analysis
  • Data Skew
  • Project Gutenberg
  • Diagnosis
  • Partitioning
  • Packing Problems
  • Bin Packing
  • Clojure Sparkling Implementation
  • Results
  • Conclusion

What is Spark?

Large-scale, general-purpose, distributed computation across clusters of computers.

Resilient Distributed Datasets (RDD)

Immutable, partitioned and distributed.

  • Transformations
    • map(func)
    • flatMap(func)
    • filter(func)
    • groupByKey()
    • mapValues(func)
    • union(other)
    • ...
  • Actions
    • reduce(func)
    • collect()
    • count()
    • take(n)
    • saveAsTextFile(path)
    • ...
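
The split between transformations and actions is what makes RDDs lazy: a transformation only records a step, and nothing runs until an action asks for a result. A minimal pure-Python sketch of that idea follows. The `ToyRDD` class is hypothetical and runs locally; it is not Spark's API and ignores partitioning and distribution entirely.

```python
import functools

class ToyRDD:
    """Hypothetical local stand-in for an RDD: transformations are recorded
    lazily and only evaluated when an action is called."""

    def __init__(self, data, steps=()):
        self._data = list(data)
        self._steps = tuple(steps)  # recorded transformations, not yet run

    # Transformations: return a new ToyRDD, nothing is computed yet
    def map(self, func):
        return ToyRDD(self._data, self._steps + (lambda xs: (func(x) for x in xs),))

    def flat_map(self, func):
        return ToyRDD(self._data, self._steps + (lambda xs: (y for x in xs for y in func(x)),))

    def filter(self, func):
        return ToyRDD(self._data, self._steps + (lambda xs: (x for x in xs if func(x)),))

    # Actions: run every recorded step, then produce a value
    def collect(self):
        xs = iter(self._data)
        for step in self._steps:
            xs = step(xs)
        return list(xs)

    def count(self):
        return len(self.collect())

    def reduce(self, func):
        return functools.reduce(func, self.collect())

    def take(self, n):
        return self.collect()[:n]

# Building the pipeline does no work; collect() triggers it
words = ToyRDD(["to be or", "not to be"]).flat_map(str.split).filter(lambda w: w != "or")
print(words.collect())  # ['to', 'be', 'not', 'to', 'be']
print(words.count())    # 5
```

Because each transformation returns a new object, the original data is never mutated, loosely mirroring the immutability of RDDs.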

Frequency Analysis

Term Frequency-Inverse Document Frequency

$${\color{crimson}W}_{{\color{mediumseagreen}term}, {\color{cornflowerblue}doc}}={\color{darkorange}tf}_{{\color{mediumseagreen}term}, {\color{cornflowerblue}doc}}\times log(\frac{\color{deeppink}N}{{\color{darkmagenta}df}_{{\color{mediumseagreen}term}}})$$

${\color{crimson}W}_{{\color{mediumseagreen}term}, {\color{cornflowerblue}doc}}$ = weight of ${\color{mediumseagreen}term}$ in ${\color{cornflowerblue}doc}$
${\color{darkorange}tf}_{{\color{mediumseagreen}term}, {\color{cornflowerblue}doc}}$ = frequency of ${\color{mediumseagreen}term}$ in ${\color{cornflowerblue}doc}$
${\color{darkmagenta}df}_{{\color{mediumseagreen}term}}$ = number of documents containing ${\color{mediumseagreen}term}$
${\color{deeppink}N}$ = total number of documents
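
As a quick worked example with hypothetical numbers, taking $log$ as the natural logarithm: a term that occurs 3 times in a book and appears in 10 of 100 books gets

$$W_{term, doc} = 3 \times \ln\left(\frac{100}{10}\right) = 3\ln 10 \approx 6.91$$

whereas a term that appears in every book gets $\ln(N/N) = 0$, so it carries no weight however often it occurs.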

Data Skew

What is data skew?

Causes of skew


Project Gutenberg

Searching Gutenberg

Given some search terms:

  1. Find the book in each Gutenberg bookshelf that those terms are most important to.
  2. Find the Gutenberg bookshelf that those terms are most important to.
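
One way to answer both questions is to score each document (a book, or a whole bookshelf treated as one document) by summing the tf-idf weights of the search terms it contains, then take the highest-scoring one. The sketch below assumes that summing rule; `score`, `best_match` and the weights are hypothetical, and this is not necessarily how the talk ranked its results.

```python
def score(weights, terms):
    """Sum the tf-idf weights of the search terms present in one document.
    `weights` maps term -> W(term, doc); absent terms contribute 0."""
    return sum(weights.get(t, 0.0) for t in terms)

def best_match(doc_weights, query):
    """Return the id of the document whose weights score highest for `query`.
    `doc_weights` maps doc id -> {term: weight}."""
    terms = query.lower().split()
    return max(doc_weights, key=lambda doc: score(doc_weights[doc], terms))

# Hypothetical weights for two documents
docs = {
    "cookbook": {"cook": 2.1, "dinner": 1.4},
    "atlas": {"map": 1.8, "river": 0.9},
}
print(best_match(docs, "How to cook dinner"))  # cookbook
```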

Project Gutenberg Bookshelves

Inherently skewed data set: 228 bookshelves holding between 1 and 1,341 books each, with a standard deviation of ~121 books.
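
Statistics like these are a quick first check for skew. A small sketch that computes them for any list of per-key sizes (for example, books per bookshelf) follows. The max/mean ratio is an extra heuristic added here, not from the talk, and whether the talk's ~121 figure is a population or sample standard deviation is not stated.

```python
import statistics

def skew_summary(sizes):
    """Summarise how unevenly work is spread across keys or partitions.
    `sizes` holds one count per key, e.g. books per bookshelf."""
    mean = statistics.mean(sizes)
    return {
        "count": len(sizes),
        "min": min(sizes),
        "max": max(sizes),
        "stdev": statistics.pstdev(sizes),  # population standard deviation
        # A max/mean ratio far above 1 means one key dominates the work
        "max_over_mean": max(sizes) / mean,
    }

print(skew_summary([1, 2, 3, 10]))
```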

Diagnosis

How do you know if the computation is suffering from skewed data?


Diagnosis via Ganglia


Diagnosis via Spark UI

Partitioning

Can we partition our way out of this?


Partitions and Parallelism


Ideal Partitioning

More partitions = greater potential parallelism


Can we simply partition to the number of items we have?


Partitioning the Gutenberg data into the exact number of bookshelves


Problems with simple partitioning?

  • Overhead from small tasks
  • Poor scheduling
  • Not enough hardware
  • Unused capacity
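
A rough way to see these effects is a toy model of the scheduler: each partition, in order, goes to whichever of a fixed number of task slots frees up first, and every task pays a fixed overhead. This is a simplified sketch (greedy list scheduling with made-up costs), not how Spark's scheduler actually works; `makespan` is a hypothetical name.

```python
import heapq

def makespan(partition_costs, slots, overhead=0.0):
    """Estimate the wall-clock time to run all partitions on `slots` workers.
    Each partition starts on the earliest free slot and takes its cost
    plus a fixed per-task `overhead` (both in arbitrary time units)."""
    free_at = [0.0] * slots  # time at which each slot becomes free
    heapq.heapify(free_at)
    for cost in partition_costs:
        start = heapq.heappop(free_at)
        heapq.heappush(free_at, start + cost + overhead)
    return max(free_at)

# One dominant partition: extra slots cannot help past its cost
print(makespan([10, 1, 1, 1], slots=4))              # 10.0
# Many tiny partitions: per-task overhead adds up
print(makespan([1] * 16, slots=4, overhead=0.5))     # 6.0
# Same total work in 4 even partitions: less overhead
print(makespan([4] * 4, slots=4, overhead=0.5))      # 4.5
```

The three cases mirror the trade-off above: one skewed partition caps the speed-up, many tiny partitions pay overhead, and a few evenly weighted partitions do best.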

Packing Problems

To balance computation across the workers, can we instead partition the data into more evenly weighted partitions?

The problem roughly falls into a class of optimization problems known as packing problems.
  • Set packing
  • Bin packing problem
  • Slothouber-Graatsma puzzle
  • Conway puzzle
  • Tetris
  • Covering problem
  • Knapsack problem
  • Tetrahedron packing
  • Cutting stock problem
  • Kissing number problem
  • Close-packing of equal spheres
  • Random close pack

Bin packing and most of its relatives here are NP-hard, so we will need heuristic methods or possibly a constraint solver.

Bin Packing

Our problem seems most closely related to the bin packing problem (pack into the minimum number of bins of a fixed capacity) or perhaps the multiprocessor scheduling problem (pack into a fixed number of bins, keeping the fullest bin as small as possible).
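
Both framings have a simple lower bound that is handy for judging a heuristic: bin packing needs at least ceil(total size / capacity) bins, and packing into n bins cannot make the fullest bin smaller than max(largest item, total size / n). A sketch of both standard bounds (not from the talk; the function names are hypothetical):

```python
import math

def min_bins_lower_bound(sizes, capacity):
    """Bin packing: no packing can use fewer than ceil(total / capacity) bins."""
    return math.ceil(sum(sizes) / capacity)

def max_bin_lower_bound(sizes, n_bins):
    """Multiprocessor scheduling: with n bins, the fullest bin holds at least
    the average load, and at least the single largest item."""
    return max(max(sizes), sum(sizes) / n_bins)

sizes = [9, 7, 6, 5, 5, 4, 3, 2, 1]    # the example items from the Bin Packing slides
print(min_bins_lower_bound(sizes, 9))  # 5
print(max_bin_lower_bound(sizes, 3))   # 14.0
```

For those example items, the First Fit result (5 bins of capacity 9) and the First Fit + Smallest Bin result (3 bins of size 14) meet these bounds, so both packings are optimal there.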

Constraint Solver Solution

We tried a constraint solver (in Python), but it was FAR too slow!

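from functools import reduce
import operator
import base64

from pyschedule import Scenario, solvers

def bin_packing_solver(items, get_size=lambda x: x[1], get_label=lambda x: x[0],
                       partitions=10, horizon=None):
    # Each partition is a resource and each item a task whose length is its size;
    # the horizon acts as the bin capacity. Items are assumed sorted largest first.
    if not horizon:
        horizon = get_size(items[0]) + 1

    S = Scenario('binpacking', horizon=horizon)
    resources = [S.Resource(str(p)) for p in range(partitions)]
    # base64-encode the labels for use as task names
    tasks = [S.Task(base64.b64encode(str(get_label(x)).encode()).decode(), get_size(x))
             for x in items]

    # Each task may be placed on any one of the partitions
    for t in tasks:
        t += reduce(operator.or_, resources)

    solvers.mip.solve(S)
    # Map each decoded label to the resource (partition) it was assigned to
    return {base64.b64decode(str(s[0])).decode(): s[1] for s in S.solution()[:-1]}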

Bin Packing Methods

  1. First Fit (smallest number of fixed sized bins)
  2. First Fit + Smallest Bin (fixed number of bins)

Note: these methods were chosen for their ease of implementation; the choice of packing method is largely orthogonal to solving the partitioning problem in Spark.

First Fit packing of a sorted list of items

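;; Minimal versions of the helpers the packing functions rely on
(defn add-to-bin [bin [_ size :as item]]
  (-> bin
      (update :size + size)
      (update :items conj item)))

(defn select-bin
  "Index of the first bin with room for item, or nil."
  [bins [_ size] max-size]
  (first (keep-indexed (fn [i bin]
                         (when (<= (+ (:size bin) size) max-size) i))
                       bins)))

(defn grow-bins [bins item]
  (conj bins (add-to-bin {:size 0 :items []} item)))

(defn first-fit-pack
  "Simple first fit algorithm that grows bins once reaching max-size.
   Usually close to the minimum number of bins, though not guaranteed."
  [items max-size]
  (let [init-bins [{:size 0 :items []}]]
    (reduce (fn [bins item]
              (if-let [fit (select-bin bins item max-size)]
                (update-in bins [fit] #(add-to-bin % item))
                (grow-bins bins item)))
            init-bins items)))

(let [items [[:a 9] [:b 7] [:c 6] [:d 5] [:e 5] [:f 4] [:g 3] [:h 2] [:i 1]]]
  (= (first-fit-pack items 9)
     [{:size 9 :items [[:a 9]]}
      {:size 9 :items [[:b 7] [:h 2]]}
      {:size 9 :items [[:c 6] [:g 3]]}
      {:size 9 :items [[:d 5] [:f 4]]}
      {:size 6 :items [[:e 5] [:i 1]]}]))
;; => true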

  1. start with a single empty bin
    • its capacity (max-size) might be the size of the largest item
  2. for each item, find the first bin it will fit into and place it there
  3. if no bin can fit the item, create a new bin for it
  4. continue until all items are exhausted
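
For readers less familiar with Clojure, here is a Python sketch of the same First Fit steps. Bins are dicts mirroring the Clojure maps, items are (label, size) tuples, and the function name simply echoes the Clojure one. It reproduces the expected result of the Clojure example.

```python
def first_fit_pack(items, max_size):
    """First Fit: put each (label, size) item into the first bin with room,
    opening a new bin when none fits. Items should be sorted largest first."""
    bins = [{"size": 0, "items": []}]
    for label, size in items:
        for b in bins:
            if b["size"] + size <= max_size:
                break
        else:  # no bin had room: open a new one
            b = {"size": 0, "items": []}
            bins.append(b)
        b["size"] += size
        b["items"].append((label, size))
    return bins

items = [("a", 9), ("b", 7), ("c", 6), ("d", 5), ("e", 5),
         ("f", 4), ("g", 3), ("h", 2), ("i", 1)]
print([b["size"] for b in first_fit_pack(items, 9)])  # [9, 9, 9, 9, 6]
```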

First Fit + Smallest Bin packing of a sorted list of items

(defn select-smallest-bin
  "Index of the bin with the smallest total size."
  [bins]
  (apply min-key #(:size (bins %)) (range (count bins))))

(defn add-to-smallest-bin [n bins item]
  (if (< (count bins) n)
    (grow-bins bins item)
    (update-in bins [(select-smallest-bin bins)]
               #(add-to-bin % item))))

(defn first-fit-smallest-bin-pack
  "First fit algorithm that opens new bins until there are n of them;
   after that, items that fit in no bin go into the smallest bin."
  [items n max-size]
  (let [init-bins [{:size 0 :items []}]]
    (reduce (fn [bins item]
              (if-let [fit (select-bin bins item max-size)]
                (update-in bins [fit] #(add-to-bin % item))
                (add-to-smallest-bin n bins item)))
            init-bins items)))

(let [items [[:a 9] [:b 7] [:c 6] [:d 5] [:e 5] [:f 4] [:g 3] [:h 2] [:i 1]]]
  (= (first-fit-smallest-bin-pack items 3 9)
     [{:size 14 :items [[:a 9] [:f 4] [:i 1]]}
      {:size 14 :items [[:b 7] [:e 5] [:h 2]]}
      {:size 14 :items [[:c 6] [:d 5] [:g 3]]}]))
;; => true

  1. start with a single empty bin
    • its capacity (max-size) might be the size of the largest item
  2. for each item, find the first bin it will fit into and place it there
  3. if no bin can fit the item, open a new bin while there are fewer than n; otherwise add it to the bin that has the least in it
  4. continue until all items are exhausted
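
The same First Fit + Smallest Bin steps as a Python sketch, again with dict bins and (label, size) items. One detail may differ from the Clojure version: here ties between equally small bins go to the first such bin. No ties occur in the example, which this reproduces.

```python
def first_fit_smallest_bin_pack(items, n, max_size):
    """First Fit until n bins exist; after that, an item that fits nowhere
    goes into the bin with the smallest total size."""
    bins = [{"size": 0, "items": []}]
    for label, size in items:
        target = next((b for b in bins if b["size"] + size <= max_size), None)
        if target is None:
            if len(bins) < n:  # still allowed to open a new bin
                target = {"size": 0, "items": []}
                bins.append(target)
            else:              # otherwise use the emptiest bin
                target = min(bins, key=lambda b: b["size"])
        target["size"] += size
        target["items"].append((label, size))
    return bins

items = [("a", 9), ("b", 7), ("c", 6), ("d", 5), ("e", 5),
         ("f", 4), ("g", 3), ("h", 2), ("i", 1)]
print([b["size"] for b in first_fit_smallest_bin_pack(items, 3, 9)])  # [14, 14, 14]
```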

Bin Packing the Gutenberg data

Clojure Sparkling Implementation

Bin Partitioning in Spark

(let [items [[:a 9] [:b 7] [:c 6] [:d 5] [:e 5] [:f 4] [:g 3] [:h 2] [:i 1]]]
  ;; the packing result as a bin count plus a key -> bin index map
  (= (bin-packing/first-fit-smallest-bin-pack items 3 9)
     {:bin-count 3
      :item-indices {:e 1 :g 2 :c 2 :h 1 :b 1 :d 2 :f 0 :i 0 :a 0}}))

(defn partitioner-fn [n-partitions partition-fn]
  (su/log "Partitioning into" n-partitions "partitions.")
  (proxy [Partitioner] []
    ;; get partition index
    (getPartition [key] (partition-fn key))
    (numPartitions [] n-partitions)))

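(defn partition-into-bins [ebook-urls weight-fn]
  (let [ebook-urls    (spark/cache ebook-urls)
        ;; weigh each key's value and bring the [key weight] pairs to the driver
        packing-items (spark/collect
                       (su/map (fn [[k v]] [k (weight-fn v)]) ebook-urls))
        sorted-items  (sort-by second > packing-items)
        ;; pack into 16 bins (one per worker); max-size = largest weight is an assumption
        indices       (bin-packing/first-fit-smallest-bin-pack
                       sorted-items 16 (second (first sorted-items)))]
    (spark/partition-by
     (partitioner-fn (:bin-count indices) (:item-indices indices))
     ebook-urls)))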

  1. Using a weighting function generate a size for each key in the target RDD
  2. Apply bin packing using the key size pairs
  3. Generate function that given a key returns the bin (partition) index
  4. Create a Partitioner instance using the generated partitioning function
  5. Use the Partitioner instance to run partition-by on the RDD
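
Steps 2 to 4 can be sketched in Python, independently of Spark: turn the packed bins into a bin count plus a key to bin-index lookup, then wrap the lookup in a function that plays the role of `getPartition`. `bins_to_indices` and `make_partition_fn` are hypothetical names, and the bins are assumed to be shaped like the Clojure maps, `{"size": ..., "items": [(key, size), ...]}`.

```python
def bins_to_indices(bins):
    """Map each packed key to the index of the bin that holds it."""
    return {
        "bin_count": len(bins),
        "item_indices": {key: i for i, b in enumerate(bins) for key, _ in b["items"]},
    }

def make_partition_fn(indices):
    """Return a getPartition-style function: key -> partition index.
    Unknown keys raise KeyError rather than silently landing somewhere."""
    lookup = indices["item_indices"]
    def get_partition(key):
        return lookup[key]
    return get_partition

bins = [
    {"size": 14, "items": [("a", 9), ("f", 4), ("i", 1)]},
    {"size": 14, "items": [("b", 7), ("e", 5), ("h", 2)]},
    {"size": 14, "items": [("c", 6), ("d", 5), ("g", 3)]},
]
indices = bins_to_indices(bins)
get_partition = make_partition_fn(indices)
print(indices["bin_count"], get_partition("e"))  # 3 1
```

In PySpark, a function like `get_partition` could be passed as the `partitionFunc` argument of `RDD.partitionBy(numPartitions, partitionFunc)`, with `bin_count` as the partition count.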

Spark in Clojure

(ns bin-packing.spark-utils
  (:require [sparkling.core :as spark] [sparkling.destructuring :as de])
  (:import [org.apache.spark.api.java JavaRDD JavaPairRDD])
  (:refer-clojure :exclude [map count group-by mapcat reduce filter]))

(defprotocol Transformers
  (-map      [coll f])
  (-map-vals [coll f])
  (-group-by [coll f])
  (-mapcat   [coll f])
  (-reduce   [coll f])
  (-filter   [coll f])
  (-to-map   [coll])
  (-cache    [coll]) ...)

(extend-protocol Transformers
  (-map [coll f] (spark/map f coll)) ...
  (-map [coll f] (spark/map (de/key-value-fn (fn [k v] (f [k v]))) coll)) ...
  (-map [coll f] (clojure.core/map f coll)) ...
  (-map [coll f] (clojure.core/map f coll)) ...)

(defn map [f coll] (-map coll f))

  • Uses the Clojure Sparkling library (a wrapper over Spark's Java API).
  • Defines a protocol to unify the Spark and Clojure APIs.
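
The protocol trick, a single `map` that dispatches on the collection type, has a rough Python analogue in `functools.singledispatch`. In this sketch, plain lists and dicts stand in for the two cases (in the talk, Clojure collections versus RDDs and pair RDDs); `map_any` is a hypothetical name.

```python
from functools import singledispatch

@singledispatch
def map_any(coll, f):
    """Fallback: treat any other iterable as a sequence of elements."""
    return [f(x) for x in coll]

@map_any.register
def _(coll: dict, f):
    # Key-value collections: f receives a (key, value) pair, much like the
    # pair-RDD case that wraps f with de/key-value-fn
    return [f((k, v)) for k, v in coll.items()]

print(map_any([1, 2, 3], lambda x: x * 10))                 # [10, 20, 30]
print(map_any({"a": 1, "b": 2}, lambda kv: kv[0] * kv[1]))  # ['a', 'bb']
```

Callers write `map_any(coll, f)` once, and the collection's type picks the implementation. This is the same separation the protocol gives the tf-idf code.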

tf-idf in Spark

  (:require [bin-packing.spark-utils :as su]))

(defn tf [terms]
  (let [n-terms (count terms)]
    (su/map-vals (partial calc-tf n-terms) (frequencies terms))))

(defn term-doc-counts [id-and-terms]
  (->> (su/mapcat (fn [[k v]] (distinct v)) id-and-terms)
       (su/group-by identity)
       (su/map-vals count)))

(defn idf [n-docs term-doc-counts]
  (su/map-vals (partial calc-idf n-docs) term-doc-counts))

(defn calc-tf-and-idf [id-doc-pairs]
  (let [id-and-terms (su/cache (su/map-vals get-terms id-doc-pairs))]
    {:tfs (su/map-vals tf id-and-terms)
     :idf (su/to-map (idf (su/count id-doc-pairs)
                          (term-doc-counts id-and-terms)))}))

(defn calc-tf-idf [{:keys [tfs idf]} & {:keys [sc]}]
  (let [idf-b (su/broadcast idf :sc sc)]
    (su/map-vals #(merge-with * % (select-keys @idf-b (keys %))) tfs)))

(defn tf-idf [id-doc-pairs]
  (-> id-doc-pairs calc-tf-and-idf calc-tf-idf))

Uses the spark-utils functions (su/...) so the same tf-idf algorithm can run on Clojure data structures (books in a bookshelf) or on an RDD (whole bookshelves).
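
To make the data flow concrete, here is the same pipeline over plain Python dicts, mirroring the "Clojure data structures" case. The helpers `get-terms`, `calc-tf` and `calc-idf` are not shown on the slide, so this sketch guesses at them: whitespace tokenising with lower-casing, term count divided by document length, and `log(N / df)` with the natural log.

```python
import math
from collections import Counter

def get_terms(text):
    # Assumed tokeniser: lower-case and split on whitespace
    return text.lower().split()

def tf(terms):
    # Assumed calc-tf: term count divided by the number of terms in the document
    n_terms = len(terms)
    return {t: c / n_terms for t, c in Counter(terms).items()}

def term_doc_counts(id_and_terms):
    # df: in how many documents each term appears
    return Counter(t for terms in id_and_terms.values() for t in set(terms))

def idf(n_docs, doc_counts):
    # Assumed calc-idf: log(N / df)
    return {t: math.log(n_docs / df) for t, df in doc_counts.items()}

def tf_idf(id_doc_pairs):
    id_and_terms = {k: get_terms(v) for k, v in id_doc_pairs.items()}
    tfs = {k: tf(terms) for k, terms in id_and_terms.items()}
    idfs = idf(len(id_doc_pairs), term_doc_counts(id_and_terms))
    # Like merge-with *: multiply each term's tf by its idf
    return {k: {t: w * idfs[t] for t, w in doc.items()} for k, doc in tfs.items()}

weights = tf_idf({"d1": "spark spark skew", "d2": "spark partition"})
print(weights["d1"]["spark"])  # 0.0 (appears in every document)
```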


Results

Running tf-idf on Gutenberg bookshelves:

EMR Cluster:

  • Master:
    • m3.xlarge
  • Core:
    • 4 x m3.xlarge
  • CPUs:
    • 20 total
  • Executors:
    • 4 (one per core node)
  • Workers:
    • 16 (4 per executor)


  • Skewed S3 data, without re-partitioning
    • 38 partitions
  • Re-partitioned data, maximally partitioned by number of bookshelves
    • 228 partitions
  • Bin packed data, re-partitioned using bin packing method
    • 16 partitions

Running times:

  • Skewed
    • ~55 mins
  • Re-partitioned
    • ~25 mins
  • Bin packed
    • 15 mins



Conclusion

  • Data skew has the potential to cause massive inefficiencies
  • Re-partitioning can help overcome data skew issues
  • In limited resource environments bin packing can beat out simple partitioning
  • Bin packing can help save on the amount of resources you need


Search for 'how to cook dinner for forty large human people' in the Cookery Bookshelf?

Top results:

  • "No Animal Food"
  • "The Art of Living in Australia;"
  • "Vegetable diet: as sanctioned by medical men, 2d ed."
  • "A Plain Cookery Book for the Working Classes"
  • "Food in war time"

Top bookshelves for "God begat Jesus trinity holy ark Moses":

  • Children's Picture Books
  • Science Fiction
  • Christianity
  • World War II