Fold
Problem statement:
You have a database with 100k+ records that you need to process, and you can't load them all into memory at once. You have the database locally, so you don't have to worry about locking tables etc. How do you pull out these records in batches and process each one in a memory-safe way?
Enter clojure.core.reducers/fold.
The docstring can explain fold better than I can:
clojure.core.reducers/fold
([reducef coll] [combinef reducef coll] [n combinef reducef coll])
  Reduces a collection using a (potentially parallel) reduce-combine
  strategy. The collection is partitioned into groups of approximately
  n (default 512), each of which is reduced with reducef (with a seed
  value obtained by calling (combinef) with no arguments). The results
  of these reductions are then reduced with combinef (default
  reducef). combinef must be associative, and, when called with no
  arguments, (combinef) must produce its identity element. These
  operations may be performed in parallel, but the results will
  preserve order.
The first param, n, controls the approximate size of the groups that the coll you pass is partitioned into.
Next, if you are calling the 3- or 4-arity version, combinef takes the results that reducef produced for each partition and combines them into the fold result.
If you pass your own combinef function, it must be a multi-arity function. Called with no arguments, it returns an identity value. Called with 2 args, it takes the accumulated value and the result of the next partition and combines them.
An example of this is the + function. When called with no args it produces its identity:
(+) ;; => 0
and when you pass 2 args:
(+ 1 1) ;; => 2
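When no built-in function fits, a combinef can be assembled by hand. The following is a minimal sketch, assuming a made-up word-counting task: r/monoid (part of clojure.core.reducers) builds the two-arity function from an operator and an identity constructor, while merge-counts, count-item and the sample words are illustrative names only.

```clojure
(require '[clojure.core.reducers :as r])

;; combinef: (merge-counts) returns {} (the identity);
;; (merge-counts a b) merges two partial count maps by adding their values.
(def merge-counts
  (r/monoid (partial merge-with +) hash-map))

;; reducef: adds one item to the running counts of a single partition.
(defn count-item [counts item]
  (update counts item (fnil inc 0)))

;; Illustrative data; a vector, so fold can actually partition it.
(def words ["a" "b" "a" "c" "b" "a"])

(r/fold 2 merge-counts count-item words)
;; => {"a" 3, "b" 2, "c" 1}
```

Merging count maps with + is associative, which is the property fold needs from combinef.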
reducef is a regular ol' reduce function that takes the accumulator and the current value and returns the result.
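Lastly, you have coll, which is any reducible collection. One caveat: only foldable collections, such as vectors and hash maps, are actually partitioned and reduced in parallel. For anything else, including a LazySeq, fold falls back to a plain sequential reduce, which realizes elements as they are consumed.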
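A quick way to see how fold treats different collections is to compare results. This is a small sketch based on my reading of the reducers source: a vector is partitioned and may be reduced in parallel, while a range has no fold implementation of its own and is reduced sequentially. The names as-vector and as-range are just for illustration.

```clojure
(require '[clojure.core.reducers :as r])

(def as-vector (vec (range 100000))) ; foldable: partitioned, may run in parallel
(def as-range (range 100000))        ; not foldable: fold reduces it sequentially

;; All three strategies agree on the answer.
(= (r/fold + as-vector)
   (r/fold + as-range)
   (reduce + as-range))
;; => true
```

Either way the result is the same; the collection type only affects how the work is split up.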
To put it together, we can take a range of xs from 0 up to (but not including) 100,000 and add them up:
(require '[clojure.core.reducers :as r])

(def xs (range 100000)) ;; 0 through 99999

(r/fold 100 ;; groups of approximately 100 elements
        (fn
          ([] 0) ;; identity value
          ([accumulator current] (+ accumulator current))) ;; partition result combiner function
        (fn [accumulator current]
          (+ accumulator current)) ;; partition reducer
        xs)
;; => 4999950000
Since + can produce an identity when passed no arguments, or a sum when passed any number of arguments, this call to fold can be simplified:
(require '[clojure.core.reducers :as r])

(def xs (range 100000)) ;; 0 through 99999

(r/fold 100 + + xs)
;; => 4999950000
If you are happy with the default batch size of 512, and since you are using the same function for combinef and reducef, you can simplify even further:
(require '[clojure.core.reducers :as r])

(def xs (range 100000)) ;; 0 through 99999

(r/fold + xs)
;; => 4999950000
Now let's add a database.
You can use next.jdbc to query a table where the result is foldable, just like the range in the example above.
Let's say you have a database with a records table, and this records table has a column called annual_income. The problem is that the annual_income column is a string in the format of $10,000.00 for 10000 dollars or $100.00 for 100 dollars. You have to parse the string before you can add it together.
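One way to do that parsing, as a minimal sketch: strip the dollar sign and the thousands separators, then read what is left as a decimal. The name dollar->int and the choice to drop the cents are assumptions made for illustration, not something the table format dictates.

```clojure
(require '[clojure.string :as str])

(defn dollar->int
  "Hypothetical helper: parses a string like \"$10,000.00\" into a whole
  number of dollars. Any cents are truncated."
  [s]
  (-> s
      (str/replace #"[$,]" "") ; "$10,000.00" -> "10000.00"
      (BigDecimal.)            ; exact decimal, no floating-point rounding
      (.longValue)))           ; drop the fractional part

(dollar->int "$10,000.00") ;; => 10000
(dollar->int "$100.00")    ;; => 100
```

If cents matter for your totals, summing the BigDecimal values directly instead of calling .longValue would be the safer choice.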
First, how do you get a lazy sequence from the whole table?
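You can use next.jdbc/plan to accomplish this. All you have to do is pass it a connectable (such as a datasource or connection) and a vector holding the SQL string and its params, and it will return a foldable.
(require '[next.jdbc :as jdbc])

;; datasource is a connection-pool helper that is not shown here.
;; db is dereferenced with @ below, so the helper is assumed to return
;; something derefable, such as a delay.
(def db
  (datasource {:database-name "records"
               :username "user"
               :pool-name "records-pool"
               :server-name "localhost"}))

(def foldable (jdbc/plan @db ["select annual_income from records"]))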
These are implementation details, but foldable will be of type IReduceInit and can be folded over.
Putting the fold together you will get:
(require '[clojure.core.reducers :as r])
(require '[next.jdbc :as jdbc])
(require '[utils :as u]) ; some namespace to parse the money string

;; datasource is a connection-pool helper that is not shown here.
(def db
  (datasource {:database-name "records"
               :username "user"
               :pool-name "records-pool"
               :server-name "localhost"}))

(r/fold 100 ; partition size
        +   ; combinef function
        ;; jdbc default result set will namespace keys with the table name
        (fn [total-income {:records/keys [annual_income]}]
          (+ total-income (u/dollar->int annual_income)))
        (jdbc/plan @db ["select annual_income from records"]))
The fold function call should look familiar. All that changed was where you get the data, which now comes from jdbc/plan, and how you add each partition up, which now involves parsing the annual income.
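To try the shape of this fold without a database, the rows can be faked with a vector of maps. This is only a sketch: the rows are invented, dollar->int is a hypothetical stand-in for the parsing helper in the utils namespace, and add-income is just a named version of the reducing function above.

```clojure
(require '[clojure.core.reducers :as r]
         '[clojure.string :as str])

;; Hypothetical stand-in for u/dollar->int: "$10,000.00" -> 10000 (cents dropped).
(defn dollar->int [s]
  (.longValue (BigDecimal. (str/replace s #"[$,]" ""))))

;; Made-up rows, keyed the way the reducing function destructures them.
(def rows
  [{:records/annual_income "$10,000.00"}
   {:records/annual_income "$100.00"}
   {:records/annual_income "$52,500.00"}])

;; reducef: parse one row's income and add it to the partition total.
(defn add-income [total-income {:records/keys [annual_income]}]
  (+ total-income (dollar->int annual_income)))

(r/fold 2 + add-income rows)
;; => 62600
```

Swapping rows for the result of jdbc/plan leaves the rest of the call unchanged, which is the point of keeping reducef and combinef separate from the data source.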