Testing jobs¶
mrjob can run jobs without the help of Hadoop. This isn’t very efficient, but it’s a great way to test a job before submitting it to a cluster.
Inline runner¶
The inline runner (InlineMRJobRunner) is the default runner for mrjob (it’s what’s used when you run python mr_your_job.py <input> without any -r option). It runs your job in a single process, so you get faster feedback and simpler tracebacks.
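For example, these two invocations are equivalent (mr_your_job.py standing in for your own job file):
$ python mr_your_job.py input.txt
$ python mr_your_job.py -r inline input.txt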
Multiple splits¶
The inline runner doesn’t run mappers or reducers concurrently, but it does run at least two mappers and two reducers for each step. This can help catch bad assumptions about the MapReduce programming model.
For example, say we wanted to write a simple script that counted the number of lines of input:
from mrjob.job import MRJob


class MRCountLinesWrong(MRJob):

    def mapper_init(self):
        self.num_lines = 0

    def mapper(self, _, line):
        self.num_lines += 1

    def mapper_final(self):
        yield None, self.num_lines


if __name__ == '__main__':
    MRCountLinesWrong.run()
Looks good, but if we run it, we get more than one line count:
$ python -m mrjob.examples.mr_count_lines_wrong README.rst 2> /dev/null
null 77
null 60
Aha! Because there can be more than one mapper! It’s fine to use mapper_final() like this, but we need to reduce on a single key:
from mrjob.job import MRJob


class MRCountLinesRight(MRJob):

    def mapper_init(self):
        self.num_lines = 0

    def mapper(self, _, line):
        self.num_lines += 1

    def mapper_final(self):
        yield None, self.num_lines

    def reducer(self, key, values):
        yield key, sum(values)


if __name__ == '__main__':
    MRCountLinesRight.run()
$ python -m mrjob.examples.mr_count_lines_right README.rst 2> /dev/null
null 137
Thanks, inline runner!
Isolated working directories¶
Just like Hadoop, the inline runner runs each mapper and reducer in its own (temporary) working directory. It does add the original working directory to $PYTHONPATH so your job can still access your local source tree.
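If you want to see this for yourself, here’s a minimal sketch (not one of mrjob’s bundled examples) whose mappers report the directory they run in; with the inline runner, each mapper should report a different temporary directory:
import os

from mrjob.job import MRJob


class MRShowWorkingDir(MRJob):

    def mapper(self, _, line):
        # each task sees its own temporary working directory
        yield os.getcwd(), 1

    def reducer(self, cwd, ones):
        yield cwd, sum(ones)


if __name__ == '__main__':
    MRShowWorkingDir.run()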
Simulating jobconf¶
The inline runner simulates jobconf variables/properties set by Hadoop (and their Hadoop 1 equivalents):
mapreduce.job.cache.archives (mapred.cache.archives)
mapreduce.job.cache.files (mapred.cache.files)
mapreduce.job.cache.local.archives (mapred.cache.localArchives)
mapreduce.job.cache.local.files (mapred.cache.localFiles)
mapreduce.job.id (mapred.job.id)
mapreduce.job.local.dir (job.local.dir)
mapreduce.map.input.file (map.input.file)
mapreduce.map.input.length (map.input.length)
mapreduce.map.input.start (map.input.start)
mapreduce.task.attempt.id (mapred.task.id)
mapreduce.task.id (mapred.tip.id)
mapreduce.task.ismap (mapred.task.is.map)
mapreduce.task.output.dir (mapred.work.output.dir)
mapreduce.task.partition (mapred.task.partition)
You can use jobconf_from_env() to read these from your job’s environment. For example:
from mrjob.compat import jobconf_from_env
from mrjob.job import MRJob


class MRCountLinesByFile(MRJob):

    def mapper(self, _, line):
        yield jobconf_from_env('mapreduce.map.input.file'), 1

    def reducer(self, path, ones):
        yield path, sum(ones)


if __name__ == '__main__':
    MRCountLinesByFile.run()
$ python -m mrjob.examples.mr_count_lines_by_file README.rst CHANGES.txt 2> /dev/null
"CHANGES.txt" 564
"README.rst" 137
If you only want to simulate jobconf variables from a single version of Hadoop (for more stringent testing), you can set hadoop_version.
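For example, in mrjob.conf (a sketch; the version string here is just an illustration):
runners:
  inline:
    hadoop_version: '1.0.3'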
Setting number of mappers and reducers¶
Want more or fewer splits? You can tell the inline runner the same way you’d tell Hadoop, with the mapreduce.job.maps and mapreduce.job.reduces jobconf options:
$ python -m mrjob.examples.mr_count_lines_wrong --jobconf mapreduce.job.maps=5 README.rst 2> /dev/null
null 24
null 33
null 38
null 30
null 12
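If a job should always run with a particular number of splits, you can also set these options in the job itself with the JOBCONF class attribute. A sketch, reusing the line-counting job from above (the class name is just for illustration):
from mrjob.job import MRJob


class MRCountLinesFiveMaps(MRJob):

    # ask the runner to simulate five map tasks per step
    JOBCONF = {'mapreduce.job.maps': 5}

    def mapper_init(self):
        self.num_lines = 0

    def mapper(self, _, line):
        self.num_lines += 1

    def mapper_final(self):
        yield None, self.num_lines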
Local runner¶
The local runner (LocalMRJobRunner; run using -r local) supports the above features but, unlike the inline runner, uses subprocesses. This means it can be used to test options that don’t make sense in a single-process context, such as python_bin and setup.
The local runner does run multiple subprocesses concurrently, but it’s not really meant as a replacement for Hadoop; it’s just for testing!
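For example, to check that your job behaves under a specific Python binary (the path is just an illustration):
$ python mr_your_job.py -r local --python-bin /usr/bin/python3 input.txt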
Anatomy of a test case¶
So, you’ve gotten a job working. Great! Here’s how you write a regression test so that future developers won’t break it.
For this example, we’ll use a test of the *_init() methods from the mrjob test cases:
from mrjob.job import MRJob


class MRInitJob(MRJob):

    def __init__(self, *args, **kwargs):
        super(MRInitJob, self).__init__(*args, **kwargs)
        self.sum_amount = 0
        self.multiplier = 0
        self.combiner_multiplier = 1

    def mapper_init(self):
        self.sum_amount += 10

    def mapper(self, key, value):
        yield None, self.sum_amount

    def reducer_init(self):
        self.multiplier += 10

    def reducer(self, key, values):
        yield None, sum(values) * self.multiplier

    def combiner_init(self):
        self.combiner_multiplier = 2

    def combiner(self, key, values):
        yield None, sum(values) * self.combiner_multiplier
Without using any mrjob features, we can write a simple test case to make sure our methods are behaving as expected:
from unittest import TestCase


class MRInitTestCase(TestCase):

    def test_mapper(self):
        j = MRInitJob([])
        j.mapper_init()
        self.assertEqual(next(j.mapper(None, None)), (None, j.sum_amount))
To test the full job, you need to set up input, run the job, and check the collected output. The most straightforward way to provide input is to use the sandbox() method. Create a BytesIO object, populate it with data, initialize your job to read from stdin, and enable the sandbox with your BytesIO as stdin.
You’ll probably also want to specify --no-conf so options from your local mrjob.conf don’t pollute your testing environment.
This example reads from stdin (hence the - parameter):
from io import BytesIO

def test_init_funcs(self):
    num_inputs = 2
    stdin = BytesIO(b'x\n' * num_inputs)

    mr_job = MRInitJob(['--no-conf', '-'])
    mr_job.sandbox(stdin=stdin)
To run the job without leaving temp files on your system, use the make_runner() context manager. make_runner() creates the runner specified in the command-line arguments and ensures that job cleanup is performed regardless of the success or failure of the job.
Run the job with run(). The job’s output is available as a generator through cat_output() and can be parsed with the job’s output protocol using parse_output():
results = []

with mr_job.make_runner() as runner:
    runner.run()
    for key, value in mr_job.parse_output(runner.cat_output()):
        results.append(value)

# these numbers should match if mapper_init, reducer_init, and
# combiner_init were called as expected
self.assertEqual(sorted(results)[0], num_inputs * 10 * 10 * 2)
Warning
Do not let your tests depend on the input lines being processed in a certain order. Both mrjob and Hadoop divide input non-deterministically.
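A safe pattern is to compare order-insensitively, as the example above does with sorted(). A sketch (expected_values here is hypothetical):
# fragile: depends on the order results happen to arrive in
self.assertEqual(results, expected_values)

# robust: compare without regard to order
self.assertEqual(sorted(results), sorted(expected_values))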