12.3. FP Pattern MapReduce
split-apply-combine strategy
The map-reduce algorithm is a programming model used for processing large amounts of data in a distributed and parallel manner. It consists of two phases: the map phase and the reduce phase.
In Python, the map-reduce algorithm can be implemented using the built-in
map()
and reduce()
functions.
Here's an example of using the map-reduce algorithm to calculate the sum of squares of a list of numbers:
>>> from functools import reduce
>>>
>>> numbers = [1, 2, 3, 4, 5]
>>>
>>> squared_numbers = list(map(lambda x: x ** 2, numbers))
>>> sum_of_squares = reduce(lambda x, y: x + y, squared_numbers)
>>>
>>> print(sum_of_squares)
55
In this example, the map()
function is used to square each number in the
numbers
list. The resulting list of squared numbers is stored in the
squared_numbers
variable.
The reduce()
function is then used to calculate the sum of the squared
numbers. The lambda
function passed to reduce()
takes two arguments
(x
and y
) and returns their sum. The reduce()
function applies
this function to the elements of the squared_numbers
list, starting with
the first two elements, and then applying the function to the result and the
next element, and so on, until all elements have been processed.
The final result (55
) is the sum of the squares of the original list of
numbers.
The map-reduce algorithm can be used for a wide range of data processing tasks, such as filtering, sorting, and aggregating data. By dividing the work into smaller tasks that can be performed in parallel, the algorithm can process large amounts of data efficiently.
Apply function of two arguments cumulatively to the items of iterable, from
left to right, so as to reduce the iterable to a single value. For example,
reduce(lambda x, y: x+y, [1, 2, 3, 4, 5])
calculates
((((1+2)+3)+4)+5)
. The left argument, x, is the accumulated value and
the right argument, y, is the update value from the iterable. If the
optional initializer is present, it is placed before the items of the
iterable in the calculation, and serves as a default when the iterable is
empty. If initializer is not given and iterable contains only one item, the
first item is returned.
12.3.1. About
Source: https://dask.org
Map-Reduce is a programming model for processing large datasets in parallel across a cluster of computers. The Map-Reduce algorithm consists of two main phases: the map phase and the reduce phase.
In the map phase, the input data is divided into smaller chunks and processed in parallel by multiple nodes in the cluster. Each node applies a mapping function to the input data and produces a set of key-value pairs as output.
In the reduce phase, the output of the map phase is collected and processed by a single node in the cluster. The reduce function aggregates the key-value pairs produced by the map function and produces a final output.
Here's an example of using the Map-Reduce algorithm in Python to count the number of occurrences of each word in a large text file:
>>>
... from multiprocessing import Pool
... import collections
...
... # Define the mapping function
... def map_function(line):
... words = line.strip().split()
... return [(word, 1) for word in words]
...
... # Define the reduce function
... def reduce_function(key_value_pairs):
... word_counts = collections.defaultdict(int)
... for key, value in key_value_pairs:
... word_counts[key] += value
... return word_counts.items()
...
... # Read the input file
... with open('/tmp/myfile.txt', mode='r') as input_file:
... lines = input_file.readlines()
...
... # Divide the input into chunks
... chunk_size = len(lines) // 4
... chunks = [lines[i:i+chunk_size] for i in range(0, len(lines), chunk_size)]
...
... # Process the chunks in parallel
... with Pool(processes=4) as pool:
... mapped_results = pool.map(map_function, chunks)
...
... # Collect the output of the map phase
... mapped_output = [item for sublist in mapped_results for item in sublist]
...
... # Group the mapped results by key and reduce the values
... grouped_output = collections.defaultdict(list)
... for key, value in mapped_output:
... grouped_output[key].append(value)
... reduced_output = reduce_function(grouped_output.items())
...
... # Print the final output
... for word, count in reduced_output:
... print(word, count)
In this example, the input text file is divided into four chunks, which are
processed in parallel by four processes using the Pool
class from the
multiprocessing
module. The map_function
applies a mapping function
to each line of the input file, producing a list of key-value pairs for each
line. The reduce_function
aggregates the key-value pairs produced by the
map function, producing a final output of word counts. The defaultdict
class from the collections
module is used to simplify the code for
grouping and reducing the key-value pairs.
12.3.2. SetUp
>>> from functools import reduce
12.3.3. Pattern
>>> class MapReduce:
... def __init__(self, values):
... self.values = values
...
... def filter(self, fn):
... self.values = filter(fn, self.values)
... return self
...
... def map(self, fn):
... self.values = map(fn, self.values)
... return self
...
... def reduce(self, fn):
... return reduce(fn, self.values)
12.3.4. Usage
>>> DATA = [1, 2, 3, 4, 5, 6, 7, 8, 9]
>>>
>>> result = (
... MapReduce(DATA)
... .map(lambda x: x ** 2)
... .map(lambda x: x / 2)
... .map(lambda x: x + 2)
... .map(lambda x: round(x, 2))
... .map(lambda x: int(x))
... .filter(lambda x: x > 10)
... .reduce(lambda x,y: x + y)
... )
>>>
>>> result
136
12.3.5. Use Case - 1
>>> def even(x):
... return x % 2 == 0
>>>
>>> def positive(x):
... return x > 0
>>>
>>> def non_negative(x):
... return x >= 0
>>>
>>> def square(x):
... return x ** 2
>>>
>>> def increment(x):
... return x + 1
>>>
>>> def decrement(x):
... return x - 1
>>> from functools import reduce
>>> from operator import add
>>>
>>> data = range(0, 1024)
>>> data = filter(even, data)
>>> data = filter(positive, data)
>>> data = filter(non_negative, data)
>>> data = map(square, data)
>>> data = map(increment, data)
>>> data = map(decrement, data)
>>> result = reduce(add, data)
>>>
>>> result
178433024
12.3.6. Use Case - 2
>>> def even(x):
... return x % 2 == 0
>>>
>>> def positive(x):
... return x > 0
>>>
>>> def non_negative(x):
... return x >= 0
>>>
>>> def square(x):
... return x ** 2
>>>
>>> def increment(x):
... return x + 1
>>>
>>> def decrement(x):
... return x - 1
>>> filters = [
... even,
... positive,
... non_negative,
... ]
>>>
>>> maps = [
... square,
... increment,
... decrement,
... ]
>>>
>>> def apply(data, fn):
... return map(fn, data)
>>> from functools import reduce
>>> from operator import add
>>>
>>> data = range(0, 1024)
>>> data = reduce(apply, filters, data)
>>> data = reduce(apply, maps, data)
>>> result = reduce(add, data)
>>>
>>> result
1024
12.3.7. Use Case - 3
This is our function library.
Transformation functions (non-reducing) - takes one argument and returns one value:
>>> def increment(x):
... return x + 1
>>>
>>> def decrement(x):
... return x - 1
>>>
>>> def square(x):
... return x ** 2
>>>
>>> def cube(x):
... return x ** 3
Reducing functions - takes two arguments returns one value:
>>> def add(x, y):
... return x + y
>>>
>>> def sub(x, y):
... return x - y
>>>
>>> def mul(x, y):
... return x * x
We have data to compute:
>>> data = [
... [1, 2, 3],
... [4, 5, 6],
... [7, 8, 9],
... ]
On this data, we want to apply the following transformations:
>>> transformations = [increment, square, decrement, cube]
We need to create apply function, which takes data and apply the transformation:
>>> def apply(data, fn):
... return map(fn, data)
Let's do it parallel. We will create three independent workers. Each worker will get part of the data (one-third) and will apply all the transformation (map) to their data subset.
>>> workerA = reduce(apply, transformations, data[0]) # [27, 512, 3375]
>>> workerB = reduce(apply, transformations, data[1]) # [13824, 42875, 110592]
>>> workerC = reduce(apply, transformations, data[2]) # [250047, 512000, 970299]
Note, that all workers will produce generators (maps).
We need to merge the results using reduce
function,
but before that we need to evaluate maps to lists.
>>> def merge(x, y):
... return list(x) + list(y)
>>> merged = reduce(merge, [workerA, workerB, workerC]) # [27, 512, 3375, 13824, 42875, 110592, 250047, 512000, 970299]
>>> result = reduce(add, merged)
>>>
>>> print(result)
1903551
12.3.8. Use Case - 4
>>> from itertools import chain
>>> from functools import reduce, partial
>>> from operator import add, sub, mul, pow
>>> from math import sqrt, cbrt
>>> transformations = [
... partial(add, 1),
... partial(sub, 1),
... partial(pow, 2),
... partial(pow, 3),
... ]
>>> data = [
... [1, 2, 3],
... [4, 5, 6],
... [7, 8, 9],
... ]
>>> def apply(data, fn):
... return map(fn, data)
>>> workerA = reduce(apply, transformations, data[0]) # [27, 512, 3375]
>>> workerB = reduce(apply, transformations, data[1]) # [13824, 42875, 110592]
>>> workerC = reduce(apply, transformations, data[2]) # [250047, 512000, 970299]
>>> reduce(add, chain(workerA, workerB, workerC))
10.333713311264441