Large-scale, general, distributed computation across a cluster of computers.
Immutable, partitioned and distributed.
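For example, an RDD's partition count can be chosen when it is created. A minimal local sketch using the Sparkling library (the context configuration here is illustrative):
(require '[sparkling.conf :as conf]
         '[sparkling.core :as spark])

;; Illustrative local Spark context; on a cluster the master would differ.
(def sc (spark/spark-context (-> (conf/spark-conf)
                                 (conf/master "local[*]")
                                 (conf/app-name "rdd-example"))))

;; Parallelize a local collection into 4 immutable partitions,
;; each of which can be processed by a different worker.
(def rdd (spark/parallelize sc (vec (range 100)) 4))
(count (.partitions rdd)) ;; => 4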
Given some search terms:
Inherently skewed data set: 228 bookshelves, minimum 1 book, maximum 1341 books, standard deviation ≈ 121.
https://www.gutenberg.org/w/index.php?title=Category:Bookshelf
More partitions = greater potential parallelism
To create more balanced computation across the workers, perhaps we can partition the data into more evenly weighted partitions?
The problem roughly falls into a class of optimization problems known as packing problems.
https://en.wikipedia.org/wiki/Packing_problems
These are NP-hard problems, so we will need to make use of heuristic methods or possibly a constraint solver.
Our problem seems most closely related to the bin packing problem (minimise the number of bins of a fixed capacity) or perhaps the multiprocessor scheduling problem (pack into a fixed number of bins).
We attempted using a constraint solver (in Python), but it was FAR too slow!
from functools import reduce  # needed on Python 3
import base64

from pyschedule import Scenario, solvers


def bin_packing_solver(items, get_size=lambda x: x[1], get_label=lambda x: x[0],
                       partitions=10, horizon=None):
    if not horizon:
        # Default horizon: the size of the first item plus one.
        horizon = get_size(items[0]) + 1
    S = Scenario('binpacking', horizon=horizon)
    # One resource per partition (bin) and one task per item;
    # labels are base64-encoded to keep task names alphanumeric.
    resources = [S.Resource(str(x)) for x in range(partitions)]
    tasks = [S.Task(base64.b64encode(get_label(x).encode()).decode(), get_size(x))
             for x in items]
    for t in tasks:
        # Each task may be assigned to any one of the resources.
        t += reduce(lambda x, y: x | y, resources)
    # Minimise the makespan, i.e. the size of the largest bin.
    S.use_makespan_objective()
    solvers.mip.solve(S, msg=1)
    # The last solution entry is the makespan task itself, so drop it.
    return {base64.b64decode(str(s[0])).decode(): s[1] for s in S.solution()[:-1]}
Note that these methods were chosen for their ease of implementation; the actual packing methods used are somewhat orthogonal to solving this partitioning problem in Spark.
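The bin helpers add-to-bin, select-bin and select-smallest-bin are not shown in these excerpts; a minimal sketch consistent with how they are used (and with the assertions below) might be:
;; Sketch only: plausible definitions for the helpers used below.
(defn add-to-bin
  "Adds an item (a [label size] pair) to a bin, updating its total size."
  [bin [_ size :as item]]
  (-> bin
      (update-in [:size] + size)
      (update-in [:items] conj item)))

(defn select-bin
  "Returns the index of the first bin the item fits into without
  exceeding max-size, or nil if no bin fits."
  [bins [_ size] max-size]
  (first (keep-indexed (fn [i bin]
                         (when (<= (+ (:size bin) size) max-size) i))
                       bins)))

(defn select-smallest-bin
  "Returns the index of the bin with the smallest total size."
  [bins]
  (first (apply min-key (comp :size second) (map-indexed vector bins))))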
(defn grow-bins [bins item]
  (conj bins (add-to-bin {:size 0 :items []} item)))

(defn first-fit-pack
  "Simple first fit algorithm that grows bins once reaching max-size.
  Should give min required bins."
  [items max-size]
  (let [init-bins [{:size 0 :items []}]]
    (reduce (fn [bins item]
              (if-let [fit (select-bin bins item max-size)]
                (update-in bins [fit] #(add-to-bin % item))
                (grow-bins bins item)))
            init-bins items)))
(let [items [[:a 9] [:b 7] [:c 6] [:d 5] [:e 5] [:f 4] [:g 3] [:h 2] [:i 1]]]
  (assert
   (= (first-fit-pack items 9)
      [{:size 9 :items [[:a 9]]}
       {:size 9 :items [[:b 7] [:h 2]]}
       {:size 9 :items [[:c 6] [:g 3]]}
       {:size 9 :items [[:d 5] [:f 4]]}
       {:size 6 :items [[:e 5] [:i 1]]}])))
(defn add-to-smallest-bin [n bins item]
  (if (< (count bins) n)
    (grow-bins bins item)
    (update-in bins [(select-smallest-bin bins)]
               #(add-to-bin % item))))

(defn first-fit-smallest-bin-pack
  "Simple first fit algorithm that continues to add to the smallest bin
  once n bins have been filled to max-size."
  [items n max-size]
  (let [init-bins [{:size 0 :items []}]]
    (reduce (fn [bins item]
              (if-let [fit (select-bin bins item max-size)]
                (update-in bins [fit] #(add-to-bin % item))
                (add-to-smallest-bin n bins item)))
            init-bins items)))
(let [items [[:a 9] [:b 7] [:c 6] [:d 5] [:e 5] [:f 4] [:g 3] [:h 2] [:i 1]]]
  (assert
   (= (first-fit-smallest-bin-pack items 3 9)
      [{:size 14 :items [[:a 9] [:f 4] [:i 1]]}
       {:size 14 :items [[:b 7] [:e 5] [:h 2]]}
       {:size 14 :items [[:c 6] [:d 5] [:g 3]]}])))
(let [items [[:a 9] [:b 7] [:c 6] [:d 5] [:e 5] [:f 4] [:g 3] [:h 2] [:i 1]]]
  (assert
   (= (-> items
          (bin-packing/first-fit-smallest-bin-pack 3 9)
          bin-packing/item-indices)
      {:bin-count 3
       :item-indices {:e 1 :g 2 :c 2 :h 1 :b 1 :d 2 :f 0 :i 0 :a 0}})))
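bin-packing/item-indices is not shown above; one possible implementation consistent with this assertion simply maps each item key to the index of the bin that holds it:
;; Sketch only: a plausible item-indices, matching the assertion above.
(defn item-indices
  "Maps each packed item's key to the index of its bin."
  [bins]
  {:bin-count (count bins)
   :item-indices (into {}
                       (for [[i bin] (map-indexed vector bins)
                             [k _] (:items bin)]
                         [k i]))})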
(defn partitioner-fn [n-partitions partition-fn]
  (su/log "Partitioning into" n-partitions "partitions.")
  (proxy [Partitioner] []
    ;; get partition index
    (getPartition [key] (partition-fn key))
    (numPartitions [] n-partitions)))
(defn partition-into-bins [ebook-urls weight-fn]
  (let [ebook-urls (spark/cache ebook-urls)
        packing-items (spark/collect
                       (su/map (fn [[k v]] [k (weight-fn v)])
                               ebook-urls))
        indices (-> (sort-by second > packing-items)
                    (bin-packing/first-fit-smallest-bin-pack 16)
                    bin-packing/item-indices)]
    (spark/partition-by
     (partitioner-fn (:bin-count indices) (:item-indices indices))
     ebook-urls)))
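As a hypothetical usage (assuming ebook-urls is a pair RDD of bookshelf URL to a collection of ebook URLs), the bookshelves could be re-partitioned by weighting each bookshelf by its number of ebooks:
;; Hypothetical: weight each bookshelf by how many ebooks it contains.
(def balanced-ebook-urls
  (partition-into-bins ebook-urls count))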
(ns bin-packing.spark-utils
  (:require [sparkling.core :as spark]
            [sparkling.destructuring :as de])
  (:import [org.apache.spark.api.java JavaRDD JavaPairRDD])
  (:refer-clojure :exclude [map count group-by mapcat reduce filter]))
(defprotocol Transformers
  (-map [coll f])
  (-map-vals [coll f])
  (-group-by [coll f])
  (-mapcat [coll f])
  (-reduce [coll f])
  (-filter [coll f])
  (-to-map [coll])
  (-cache [coll])
  ...)
(extend-protocol Transformers
  org.apache.spark.api.java.JavaRDD
  (-map [coll f] (spark/map f coll))
  ...

  org.apache.spark.api.java.JavaPairRDD
  (-map [coll f] (spark/map (de/key-value-fn (fn [k v] (f [k v]))) coll))
  ...

  clojure.lang.IPersistentMap
  (-map [coll f] (clojure.core/map f coll))
  ...

  clojure.lang.IPersistentCollection
  (-map [coll f] (clojure.core/map f coll))
  ...)
(defn map [f coll] (-map coll f))
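Because the protocol dispatches on the collection type, the same wrapper works on plain Clojure collections and on RDDs. An illustrative call on local data (the RDD case returns a transformed RDD instead of a lazy seq):
;; On a Clojure map the pair function receives [k v] entries:
(su/map (fn [[k v]] [k (inc v)]) {:a 1 :b 2})
;; => ([:a 2] [:b 3])
;; The same expression on a JavaPairRDD returns a new JavaPairRDD.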
(ns bin-packing.tf-idf
  (:require [bin-packing.spark-utils :as su]))
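The helpers calc-tf, calc-idf and get-terms are not shown in the excerpt; a minimal sketch, assuming the classic term-frequency and inverse-document-frequency weightings and naive alphabetic tokenisation, might be:
;; Sketch only: these helpers are not part of the original excerpt.
(defn calc-tf [n-terms term-count]
  ;; term frequency: occurrences of the term / total terms in the document
  (/ term-count (double n-terms)))

(defn calc-idf [n-docs doc-count]
  ;; inverse document frequency: log(total docs / docs containing the term)
  (Math/log (/ n-docs (double doc-count))))

(defn get-terms [doc]
  ;; naive tokenisation: lower-case alphabetic runs
  (re-seq #"[a-z]+" (.toLowerCase ^String doc)))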
(defn tf [terms]
  (let [n-terms (count terms)]
    (su/map-vals (partial calc-tf n-terms) (frequencies terms))))

(defn term-doc-counts [id-and-terms]
  (->> (su/mapcat (fn [[k v]] (distinct v)) id-and-terms)
       (su/group-by identity)
       (su/map-vals count)))

(defn idf [n-docs term-doc-counts]
  (su/map-vals (partial calc-idf n-docs) term-doc-counts))

(defn calc-tf-and-idf [id-doc-pairs]
  (let [id-and-terms (su/cache (su/map-vals get-terms id-doc-pairs))]
    {:tfs (su/map-vals tf id-and-terms)
     :idf (su/to-map (idf (su/count id-doc-pairs)
                          (term-doc-counts id-and-terms)))}))

(defn calc-tf-idf [{:keys [tfs idf]} & {:keys [sc]}]
  (let [idf-b (su/broadcast idf :sc sc)]
    (su/map-vals #(merge-with * % (select-keys @idf-b (keys %))) tfs)))

(defn tf-idf [id-doc-pairs]
  (-> id-doc-pairs calc-tf-and-idf calc-tf-idf))
This uses the spark-utils protocol so the same TF-IDF algorithm can be run on Clojure data structures (books in a bookshelf) or on an RDD (whole bookshelves).
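Illustratively, the same entry point could be called on plain Clojure data or on a pair RDD (the document names and inputs below are illustrative):
;; On plain Clojure data (books within one bookshelf):
(tf-idf {"book-1" "the cat sat on the mat"
         "book-2" "the dog chased the cat"})
;; On a JavaPairRDD of [bookshelf-url bookshelf-text] the same call
;; returns RDD-backed results instead.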
EMR Cluster:
Methods:
Search for 'how to cook dinner for forty large human people' in the Cookery Bookshelf?
Top Results:
Top Bookshelves for "God begat Jesus trinity holy ark Moses":