Balancing Spark

Pixabay

Outline

  • What is Spark?
  • Frequency Analysis
  • Data Skew
  • Project Gutenberg
  • Diagnosis
  • Partitioning
  • Packing Problems
  • Bin Packing
  • Clojure Sparkling Implementation
  • Results
  • Conclusion

What is Spark?

© 2012 Yu-Chan Chen, Flickr | PD-MK | via Wylio
What is Spark?

Large-scale, general-purpose, distributed computation across a cluster of computers.

What is Spark?

Resilient Distributed Datasets (RDD)

Immutable, partitioned and distributed.



  • Transformations
    • map(func)
    • flatMap(func)
    • filter(func)
    • groupByKey()
    • mapValues(func)
    • union(other)
    • ...
  • Actions
    • reduce(func)
    • collect()
    • count()
    • take(n)
    • saveAsTextFile(path)
    • ...

Frequency Analysis

© 2007 Francesco Paroni Sterbini, Flickr | CC-BY-ND | via Wylio
Frequency Analysis

Term Frequency-Inverse Document Frequency


$${\color{crimson}W}_{{\color{mediumseagreen}term},\,{\color{cornflowerblue}doc}}={\color{darkorange}tf}_{{\color{mediumseagreen}term},\,{\color{cornflowerblue}doc}}\times \log\left(\frac{\color{deeppink}N}{{\color{darkmagenta}df}_{{\color{mediumseagreen}term}}}\right)$$

${\color{crimson}W}_{{\color{mediumseagreen}term},\,{\color{cornflowerblue}doc}}$ = weight of ${\color{mediumseagreen}term}$ in ${\color{cornflowerblue}doc}$
${\color{darkorange}tf}_{{\color{mediumseagreen}term},\,{\color{cornflowerblue}doc}}$ = frequency of ${\color{mediumseagreen}term}$ in ${\color{cornflowerblue}doc}$
${\color{darkmagenta}df}_{{\color{mediumseagreen}term}}$ = number of documents containing ${\color{mediumseagreen}term}$
${\color{deeppink}N}$ = total number of documents
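
As a quick worked example (illustrative numbers, natural log assumed): a term appearing 3 times in a document, in a corpus of 228 documents of which 12 contain the term, gets

$$W = 3 \times \log\!\left(\frac{228}{12}\right) \approx 3 \times 2.94 \approx 8.8$$

A term appearing in every document gets $\log(N/N) = 0$, so ubiquitous terms are weighted down.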

Data Skew

Pixabay
Data Skew

What is data skew?

Data Skew

Causes of skew



from backtobazics.com

Project Gutenberg

© 2012 denisbin, Flickr | CC-BY-ND | via Wylio
Project Gutenberg

Project Gutenberg

https://www.gutenberg.org/
Project Gutenberg

Searching Gutenberg

Given some search terms:

  1. Find the book in each Gutenberg bookshelf that those terms are most important to.
  2. Find the Gutenberg bookshelf that those terms are most important to.
Project Gutenberg

Project Gutenberg Bookshelves

An inherently skewed data set: 228 bookshelves, ranging from 1 book to 1341 books per shelf, with a standard deviation of ~121.

https://www.gutenberg.org/w/index.php?title=Category:Bookshelf

Diagnosis

© 2008 Adrian Clark, Flickr | CC-BY-ND | via Wylio
Diagnosis

How do you know if the computation is suffering from skewed data?

Diagnosis

Diagnosis via Ganglia

Diagnosis

Diagnosis via Spark UI

Partitioning

© 2012 timlewisnm, Flickr | CC-BY-SA | via Wylio
Partitioning

Can we partition our way out of this?

Partitioning

Partitions and Parallelism

Partitioning

Ideal Partitioning




More partitions = greater potential parallelism

Partitioning

Can we simply partition into the number of items we have?

Partitioning

Partitioning the Gutenberg data into the exact number of bookshelves
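
A minimal sketch of this naive approach (illustrative, not the talk's code; ebook-urls is assumed to be a pair RDD keyed by bookshelf), using the underlying Java API:

(import '[org.apache.spark.api.java JavaPairRDD])

;; Sketch (assumption): simply raise the partition count to the number of
;; bookshelves. Spark shuffles records across the 228 partitions but gives
;; no guarantee of exactly one bookshelf per partition.
(defn repartition-per-bookshelf [^JavaPairRDD bookshelf-rdd n-bookshelves]
  (.repartition bookshelf-rdd n-bookshelves))

;; e.g. (repartition-per-bookshelf ebook-urls 228)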

Partitioning

Problems with simple partitioning?

  • Overhead from small tasks
  • Poor scheduling
  • Not enough hardware
  • Unused capacity

Packing Problems

Packing Problems

To create a more balanced computation across the workers, perhaps we can partition the data into more evenly weighted partitions?


Packing Problems

The problem roughly falls into a class of optimization problems known as packing problems.

https://en.wikipedia.org/wiki/Packing_problems
  • Set packing
  • Bin packing problem
  • Slothouber-Graatsma puzzle
  • Conway puzzle
  • Tetris
  • Covering problem
  • Knapsack problem
  • Tetrahedron packing
  • Cutting stock problem
  • Kissing number problem
  • Close-packing of equal spheres
  • Random close pack
Packing Problems

These are NP-hard problems, so we will need to make use of heuristic methods or possibly a constraint solver.

Bin Packing

© 2014 Ozzy Delaney, Flickr | CC-BY | via Wylio
Bin Packing

Our problem seems most closely related to the Bin packing problem (minimize the number of bins of a given capacity), or perhaps the Multiprocessor scheduling problem (pack into a fixed number of bins).

Bin Packing

Constraint Solver Solution

We attempted this using a constraint solver (in Python), but it was FAR too slow!


from functools import reduce  # needed on Python 3
import base64

from pyschedule import Scenario, solvers

def bin_packing_solver(items, get_size=lambda x: x[1], get_label=lambda x: x[0],
                       partitions=10, horizon=None):
    if not horizon:
        horizon = get_size(items[0]) + 1

    S = Scenario('binpacking', horizon=horizon)
    resources = [S.Resource(str(x)) for x in range(partitions)]
    # base64-encode labels so arbitrary strings make usable task names
    tasks = [S.Task(base64.b64encode(str(get_label(x)).encode()).decode(), get_size(x))
             for x in items]

    for t in tasks:
        # each task may be scheduled on any one of the resources (bins)
        t += reduce(lambda x, y: x | y, resources)

    S.use_makespan_objective()
    solvers.mip.solve(S, msg=1)
    # drop the makespan pseudo-task and map labels back to their bin
    return {base64.b64decode(str(s[0])).decode(): s[1] for s in S.solution()[:-1]}
                
Bin Packing

Bin Packing Methods

  1. First Fit (smallest number of fixed sized bins)
  2. First Fit + Smallest Bin (fixed number of bins)

Note: these methods were chosen for their ease of implementation; the particular packing method is somewhat orthogonal to solving this partitioning problem in Spark.


Bin Packing

First Fit packing of a sorted list of items


(defn grow-bins [bins item]
  (conj bins (add-to-bin {:size 0 :items []} item)))

(defn first-fit-pack
  "Simple first fit algorithm that grows bins once reaching max-size.
   Should give min required bins."
  [items max-size]
  (let [init-bins [{:size 0 :items []}]]
    (reduce (fn [bins item]
              (if-let [fit (select-bin bins item max-size)]
                (update-in bins [fit] #(add-to-bin % item))
                (grow-bins bins item)))
            init-bins items)))

(let [items [[:a 9] [:b 7] [:c 6] [:d 5] [:e 5] [:f 4] [:g 3] [:h 2] [:i 1]]]
  (assert
   (= (first-fit-pack items 9)
      [{:size 9 :items [[:a 9]]}
       {:size 9 :items [[:b 7] [:h 2]]}
       {:size 9 :items [[:c 6] [:g 3]]}
       {:size 9 :items [[:d 5] [:f 4]]}
       {:size 6 :items [[:e 5] [:i 1]]}])))
                

  1. start with a single empty bin
    • max-size might be set to the size of the largest item
  2. for each item, find the first bin it will fit into and place it in that bin
  3. if no bin will fit the item, create a new bin to put it into
  4. continue until all items are exhausted
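
The snippets on these slides rely on a few helpers that were elided; a minimal sketch of add-to-bin and select-bin (assumed implementations, matching the bin shape {:size n :items [...]} used in the asserts):

(defn add-to-bin
  "Add an item to a bin, accumulating the bin's total size.
   (Assumed implementation; elided from the slides.)"
  [bin [_ size :as item]]
  (-> bin
      (update :size + size)
      (update :items conj item)))

(defn select-bin
  "Return the index of the first bin the item fits into without
   exceeding max-size, or nil if none does."
  [bins [_ size] max-size]
  (first (keep-indexed (fn [i bin]
                         (when (<= (+ (:size bin) size) max-size) i))
                       bins)))
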
Bin Packing

First Fit + Smallest Bin packing of a sorted list of items


(defn add-to-smallest-bin [n bins item]
  (if (< (count bins) n)
    (grow-bins bins item)
    (update-in bins [(select-smallest-bin bins)]
               #(add-to-bin % item))))

(defn first-fit-smallest-bin-pack
  "Simple first fit algorithm that continues to add to the smallest bin
   once n bins have been filled to max-size."
  [items n max-size]
  (let [init-bins [{:size 0 :items []}]]
    (reduce (fn [bins item]
              (if-let [fit (select-bin bins item max-size)]
                (update-in bins [fit] #(add-to-bin % item))
                (add-to-smallest-bin n bins item)))
            init-bins items)))

(let [items [[:a 9] [:b 7] [:c 6] [:d 5] [:e 5] [:f 4] [:g 3] [:h 2] [:i 1]]]
  (assert
   (= (first-fit-smallest-bin-pack items 3 9)
      [{:size 14 :items [[:a 9] [:f 4] [:i 1]]}
       {:size 14 :items [[:b 7] [:e 5] [:h 2]]}
       {:size 14 :items [[:c 6] [:d 5] [:g 3]]}])))
         

  1. start with a single empty bin
    • max-size might be set to the size of the largest item
  2. for each item, find the first bin it will fit into and place it in that bin
  3. if no bin will fit the item, add it to the bin that has the least in it
  4. continue until all items are exhausted
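
The second method also needs select-smallest-bin, again elided from the slide; a minimal assumed sketch:

(defn select-smallest-bin
  "Return the index of the bin with the smallest total size.
   (Assumed implementation; elided from the slides.)"
  [bins]
  (->> (map-indexed vector bins)
       (apply min-key (comp :size second))
       first))
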
Bin Packing

Bin Packing the Gutenberg data

Clojure Sparkling Implementation

© 2011 ~Pawsitive~Candie_N, Flickr | CC-BY | via Wylio
Clojure Sparkling Implementation

Bin Partitioning in Spark


(let [items [[:a 9] [:b 7] [:c 6] [:d 5] [:e 5] [:f 4] [:g 3] [:h 2] [:i 1]]]
  (assert
   (= (-> items
          (bin-packing/first-fit-smallest-bin-pack 3 9)
          bin-packing/item-indices)
      {:bin-count 3
       :item-indices {:e 1 :g 2 :c 2 :h 1 :b 1 :d 2 :f 0 :i 0 :a 0}})))

(defn partitioner-fn [n-partitions partition-fn]
  (su/log "Partitioning into" n-partitions "partitions.")
  (proxy [Partitioner] []
    ;; get partition index
    (getPartition [key] (partition-fn key))
    (numPartitions [] n-partitions)))

(defn partition-into-bins [ebook-urls weight-fn]
  (let [ebook-urls    (spark/cache ebook-urls)
        packing-items (spark/collect
                       (su/map (fn [[k v]] [k (weight-fn v)])
                               ebook-urls))
        indices       (-> (sort-by second > packing-items)
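                          ;; NOTE: 2-arity call as shown on the slide; assumed
                          ;; to default max-size (e.g. to the largest item's
                          ;; size). 16 = one bin per worker on the cluster.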
                          (bin-packing/first-fit-smallest-bin-pack 16)
                          bin-packing/item-indices)]
    (spark/partition-by
     (partitioner-fn (:bin-count indices) (:item-indices indices))
     ebook-urls)))
         

  1. Using a weighting function, generate a size for each key in the target RDD
  2. Apply bin packing to the key/size pairs
  3. Generate a function that, given a key, returns its bin (partition) index
  4. Create a Partitioner instance using the generated partitioning function
  5. Use the Partitioner instance to run partition-by on the RDD
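
A hypothetical usage sketch (the weight function and value shape are illustrative, not from the talk): weight each bookshelf by the total length of its ebook texts before partitioning.

;; Sketch: ebook-urls is a pair RDD of [bookshelf-url ebooks]; weight each
;; entry by total text length (the {:text ...} value shape is an assumption).
(def balanced-ebook-urls
  (partition-into-bins ebook-urls
                       (fn [ebooks]
                         (reduce + (map (comp count :text) ebooks)))))
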
Clojure Sparkling Implementation

Spark in Clojure


(ns bin-packing.spark-utils
  (:require [sparkling.core :as spark] [sparkling.destructuring :as de])
  (:import [org.apache.spark.api.java JavaRDD JavaPairRDD])
  (:refer-clojure :exclude [map count group-by mapcat reduce filter]))

(defprotocol Transformers
  (-map      [coll f])
  (-map-vals [coll f])
  (-group-by [coll f])
  (-mapcat   [coll f])
  (-reduce   [coll f])
  (-filter   [coll f])
  (-to-map   [coll])
  (-cache    [coll]) ...)

(extend-protocol Transformers
  org.apache.spark.api.java.JavaRDD
  (-map [coll f] (spark/map f coll)) ...
  org.apache.spark.api.java.JavaPairRDD
  (-map [coll f] (spark/map (de/key-value-fn (fn [k v] (f [k v]))) coll)) ...
  clojure.lang.IPersistentMap
  (-map [coll f] (clojure.core/map f coll)) ...
  clojure.lang.IPersistentCollection
  (-map [coll f] (clojure.core/map f coll)) ...)

(defn map [f coll] (-map coll f))
         



  • Uses the Clojure Sparkling library (a wrapper over the Java API).
  • Defines a protocol to unify the Spark and Clojure APIs.
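
For example (a sketch, not from the slides), the same call shape works on local data and on an RDD:

;; su/map dispatches on the collection type via the protocol:
(su/map (fn [[k v]] [k (inc v)]) {:a 1 :b 2})            ;; plain Clojure map
;; (su/map (fn [[k v]] [k (inc v)]) some-java-pair-rdd)  ;; JavaPairRDD
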
Clojure Sparkling Implementation

tf-idf in Spark


(ns bin-packing.tf-idf
  (:require [bin-packing.spark-utils :as su]))

(defn tf [terms]
  (let [n-terms (count terms)]
    (su/map-vals (partial calc-tf n-terms) (frequencies terms))))

(defn term-doc-counts [id-and-terms]
  (->> (su/mapcat (fn [[k v]] (distinct v)) id-and-terms)
       (su/group-by identity)
       (su/map-vals count)))

(defn idf [n-docs term-doc-counts]
  (su/map-vals (partial calc-idf n-docs) term-doc-counts))

(defn calc-tf-and-idf [id-doc-pairs]
  (let [id-and-terms (su/cache (su/map-vals get-terms id-doc-pairs))]
    {:tfs (su/map-vals tf id-and-terms)
     :idf (su/to-map (idf (su/count id-doc-pairs)
                          (term-doc-counts id-and-terms)))}))

(defn calc-tf-idf [{:keys [tfs idf]} & {:keys [sc]}]
  (let [idf-b (su/broadcast idf :sc sc)]
    (su/map-vals #(merge-with * % (select-keys @idf-b (keys %))) tfs)))

(defn tf-idf [id-doc-pairs]
  (-> id-doc-pairs calc-tf-and-idf calc-tf-idf))
         



Uses the spark-utils protocol so that the same tf-idf algorithm can run on plain Clojure data structures (the books in a bookshelf) or on an RDD (whole bookshelves).
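
The helpers calc-tf, calc-idf and get-terms were elided from the slide; a minimal sketch consistent with the weighting formula above (the tokenizer and tf normalization are assumptions):

(require '[clojure.string :as str])

(defn get-terms
  "Naive tokenizer: lower-case and split on non-word characters. (Assumed.)"
  [text]
  (remove empty? (str/split (str/lower-case text) #"\W+")))

(defn calc-tf
  "Term frequency, here normalized by the document's term count. (Assumed.)"
  [n-terms term-count]
  (/ term-count (double n-terms)))

(defn calc-idf
  "Inverse document frequency: log(N / df)."
  [n-docs doc-count]
  (Math/log (/ n-docs (double doc-count))))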

Results

Results

Running tf-idf on Gutenberg bookshelves:

EMR Cluster:

  • Master:
    • m3.xlarge
  • Core:
    • 4 x m3.xlarge
  • CPUs:
    • 20 total
  • Executors:
    • 4 (one per core)
  • Workers:
    • 16 (4 per executor)

Methods:

  • Skewed S3 data, without re-partitioning
    • 38 partitions
  • Re-partitioned data, maximally partitioned by number of bookshelves
    • 228 partitions
  • Bin packed data, re-partitioned using bin packing method
    • 16 partitions
Results

Running times:

  • Skewed
    • ~55 mins
  • Re-partitioned
    • ~25 mins
  • Bin packed
    • 15 mins

Conclusions

Conclusions

Conclusions

  • Data skew has the potential to cause massive inefficiencies
  • Re-partitioning can help overcome data skew issues
  • In limited-resource environments, bin packing can beat simple partitioning
  • Bin packing can reduce the amount of resources you need
Conclusions

Search

Search for 'how to cook dinner for forty large human people' in the Cookery Bookshelf?


Top Results:

  • "No Animal Food"
  • "The Art of Living in Australia;"
  • "Vegetable diet: as sanctioned by medical men, 2d ed."
  • "A Plain Cookery Book for the Working Classes"
  • "Food in war time"

Top Bookshelves for "God begat Jesus trinity holy ark Moses":

  • Children's Picture Books
  • Science Fiction
  • Christianity
  • World War II