Ian Jones Logo


Problem statement:

You have a database with 100+k records that you need to process and you can't load them all into memory all at once. You have the database locally, so you dont have to worry about locking tables etc. How do you pull out these records in batches and process each one in a memory safe way?

Enter clojure.core.reducers/fold.

The docstring can explain fold better than I can:

= clojure.core.reducers/fold
[reducef coll]
[combinef reducef coll]
[n combinef reducef coll]
Reduces a collection using a (potentially parallel) reduce-combine
strategy. The collection is partitioned into groups of approximately
n (default 512), each of which is reduced with reducef (with a seed
value obtained by calling (combinef) with no arguments). The results
of these reductions are then reduced with combinef (default
reducef). combinef must be associative, and, when called with no
arguments, (combinef) must produce its identity element. These
operations may be performed in parallel, but the results will
preserve order.

The first param controls how large the coll you pass it is partitioned into.

Next, if you are calling the 4 arity function, the combinef takes the result of the reducef function and combines the partitions result into the fold result.

If you pass your own combinef function, it must be an multi-arity function that produces an identity when no arguments are passed, and a combining function when 2 args are passed, will take the accumulated value and the result of the last partition and combine them together.

An example of this is the + function. When called with no args it produces its identity:

;; => 0

and when you pass 2 args:

(+ 1 1)
;; => 2

reducef is a regular ol reduce function, that takes the accumulator and the current value and returns the result.

Lastly, you have coll, which is any seq. It is especially useful to pass a LazySeq so that only the partitions being worked on are realized in memory.

To put it together, we can take a range of xs from 0 to 100,000 and add them up:

(require '[clojure.core.reducers])
(def xs (range 100000)) ;; LazySeq
(r/fold 100 ;; 100 groups
(fn ([] 0) ;; Identity value
([accumulator current] (+ accumulator current))) ;; Partition result combiner function
(fn [accumulator current]
(+ accumulator current)) ;; Partition reducer
;; => 499500

Since + can product an identity when passed no arguments or produce a sum when passed & n arguments, this call to fold can be simplified:

(require '[clojure.core.reducers])
(def xs (range 100000)) ;; LazySeq
(r/fold 100 + + xs)
;; => 499500

If you dont care about the default batch size being 512, and since you are using the same function for combinef and reducef then you can simplify even further:

(require '[clojure.core.reducers])
(def xs (range 100000)) ;; LazySeq
(r/fold + xs)
;; => 499500

Now lets add a database.

You can use next.jdbc to query a table where the result is foldable just like our range is from the above example.

Lets say you have a database with a records table, and this records table has a column called annual_income. The problem is that the annual_income column is a string in the format of $10,000.00 for 10000 dollars or $100.00 for 100 dollars. You have to parse the string before you can add it together.

First, how do you get a lazy sequence from the whole table?

You can use next.jdbc/plan to accomplish this. All you have to do is pass it a database connection and sql params and it will return a foldable.

(require '[next.jdbc :as jdbc])
(def db
(datasource {:database-name "records"
:username "user"
:pool-name "records-pool"
:server-name "localhost"}))
(def foldable (jdbc/plan @db ["select annual_income from records"]))

These are implementation details but foldable will be of type IReduceInit that can be folded on.

Putting the fold together you will get:

(require '[next.jdbc :as jdbc])
(require '[utils :as u]) ; some namespace to parse the money string
(def db
(datasource {:database-name "records"
:username "user"
:pool-name "records-pool"
:server-name "localhost"}))
(r/fold 100 ; partition size
+ ; combinef function
(fn [total-income {:records/keys [annual_income] :as x}] ; jdbc default result set will namespace keys with the table name
(+ total-income (u/dollar->int annual_income)))
(jdbc/plan @db ["select annual_income from records"]))

The fold function call should look familiar. All that changed was where you got the data, now form jdbc/plan and how you add each partition up, having to parse the annual income.