Runners
While the MRJob class is the part of the framework that handles the execution of your code in a MapReduce context, the runner is the part that packages and submits your job to be run, and reports the results back to you.
In most cases, you will interact with runners via the command line and configuration files. When you invoke mrjob via the command line, it reads your command line options (the --runner parameter) to determine which type of runner to create. Then it creates the runner, which reads your configuration files and command line args and starts your job running in whatever context you chose.
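For example, the same script can be pointed at different runners just by changing that option (input.txt is a placeholder):

python your_mr_job_sub_class.py --runner=inline input.txt
python your_mr_job_sub_class.py -r hadoop input.txt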
Most of the time, you won't have any reason to construct a runner directly. Instead you'll invoke your Python script on the command line and it will make a runner automatically, or you'll write some sort of wrapper that calls my_job.make_runner().
Internally, the general order of operations is:

- Get a runner by calling make_runner() on your job
- Call run() on your runner. This will:
  - Copy your job and supporting files to Hadoop
  - Instruct Hadoop to run your job with the appropriate --mapper, --combiner, --reducer, and --step-num arguments
Each runner runs a single job once; if you want to run a job multiple times, make multiple runners.
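To make that concrete, here is a minimal sketch of running the same job over several inputs with one runner per run. MRWordCounter stands in for your own job class (as in the programmatic example later on this page), and the module it's imported from is hypothetical:

from somewhere import MRWordCounter  # hypothetical module containing your job class

for path in ['log_01', 'log_02']:
    # each runner runs the job exactly once, so build a fresh job and runner per run
    mr_job = MRWordCounter(args=['-r', 'local', path])
    with mr_job.make_runner() as runner:
        runner.run()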
Subclasses: DataprocJobRunner, EMRJobRunner, HadoopJobRunner, InlineMRJobRunner, LocalMRJobRunner
Testing locally
To test the job locally, just run:
python your_mr_job_sub_class.py < log_file_or_whatever > output
The script will automatically invoke itself to run the various steps, using InlineMRJobRunner (--runner=inline). If you want to simulate Hadoop more closely, you can use --runner=local, which doesn't add your working directory to the PYTHONPATH, sets a few Hadoop environment variables, and uses multiple subprocesses for tasks.
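For example, the local-runner equivalent of the invocation above would be:

python your_mr_job_sub_class.py --runner=local < log_file_or_whatever > output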
You can also run individual steps:
# test 1st step mapper:
python your_mr_job_sub_class.py --mapper
# test 2nd step reducer (step numbers are 0-indexed):
python your_mr_job_sub_class.py --reducer --step-num=1
By default, input is read from stdin, but you can also specify one or more input files; .gz and .bz2 files are decompressed automatically:
python your_mr_job_sub_class.py log_01.gz log_02.bz2 log_03
See mrjob.examples for more examples.
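For instance, a word-frequency count in the style of those examples might look like this (a sketch, not the exact contents of the examples module):

from mrjob.job import MRJob

class MRWordFreqCount(MRJob):

    def mapper(self, _, line):
        # emit (word, 1) for every word in the line
        for word in line.split():
            yield word.lower(), 1

    def reducer(self, word, counts):
        # sum the 1s emitted for each word
        yield word, sum(counts)

if __name__ == '__main__':
    MRWordFreqCount.run()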
Running on your own Hadoop cluster

- Set up a Hadoop cluster (see http://hadoop.apache.org/docs/current/)
- Run your job with -r hadoop:

python your_mr_job_sub_class.py -r hadoop < input > output
Note

You don't need to install mrjob or any other libraries on the nodes of your Hadoop cluster, but they do at least need a version of Python that's compatible with your job.
Running on EMR

- Set up your Amazon account and credentials (see Configuring AWS credentials)
- Run your job with -r emr:

python your_mr_job_sub_class.py -r emr < input > output
Running on Dataproc

- Set up your Google account and credentials (see Getting started with Google Cloud)
- Run your job with -r dataproc:

python your_mr_job_sub_class.py -r dataproc < input > output
Configuration

Runners are configured by several methods:

- from mrjob.conf (see Config file format and location)
- from the command line
- by re-defining job_runner_kwargs() etc. in your MRJob (see Job runner configuration)
- by instantiating the runner directly
In most cases, you should put all configuration in mrjob.conf and use the command line args or class variables to customize how individual jobs are run.
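For example, a minimal mrjob.conf might set an environment variable for every Hadoop and EMR run (mrjob.conf is typically YAML; the available options depend on the runner and the mrjob version, so treat this as a sketch):

runners:
  hadoop:
    cmdenv:
      TZ: America/Los_Angeles
  emr:
    cmdenv:
      TZ: America/Los_Angeles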
Running your job programmatically

It is fairly common to write an organization-specific wrapper around mrjob. Use make_runner() to run an MRJob from another Python script. The context manager guarantees that all temporary files are cleaned up regardless of the success or failure of your job. This pattern can also be used to write integration tests (see Testing jobs).
mr_job = MRWordCounter(args=['-r', 'emr'])
with mr_job.make_runner() as runner:
    runner.run()
    for key, value in mr_job.parse_output(runner.cat_output()):
        ...  # do something with the parsed output
You instantiate the MRJob, use a context manager to create the runner, run the job, and cat its output, parsing that output with the job's output protocol.
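Input files can be passed in args just as they would be on the command line; for example (the file names are placeholders):

mr_job = MRWordCounter(args=['-r', 'emr', 'log_01.gz', 'log_02.bz2'])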
Limitations
Note
You should pay attention to the next sentence.
You cannot use the programmatic runner functionality in the same file as your job class. As an example of what not to do, here is some code that does not work.
Warning
The code below shows you what not to do.
from mrjob.job import MRJob

class MyJob(MRJob):
    # (your job)

# no, stop, what are you doing?!?!
mr_job = MyJob(args=[args])
with mr_job.make_runner() as runner:
    runner.run()
    # ... etc
What you need to do instead is put your job in one file, and your run code in another. Here are two files that would correctly handle the above case.
# job.py
from mrjob.job import MRJob

class MyJob(MRJob):
    # (your job)

if __name__ == '__main__':
    MyJob.run()

# run.py
from job import MyJob

mr_job = MyJob(args=[args])
with mr_job.make_runner() as runner:
    runner.run()
    # ... etc
Why can't I put the job class and run code in the same file?
The file with the job class is sent to Hadoop to be run. Therefore, the job file cannot attempt to start the Hadoop job, or you would be recursively creating Hadoop jobs!
The code that runs the job should only run outside of the Hadoop context.
The if __name__ == '__main__' block is only run if you invoke the job file as a script. It is not run when imported. That's why you can import the job class to be run, but it can still be invoked as an executable.
Counters

Counters may be read through the counters() method on the runner. The example below demonstrates the use of counters in a test case.
mr_counting_job.py

from mrjob.job import MRJob
from mrjob.step import MRStep

class MRCountingJob(MRJob):

    def steps(self):
        # 3 steps so we can check behavior of counters for multiple steps
        return [MRStep(self.mapper),
                MRStep(self.mapper),
                MRStep(self.mapper)]

    def mapper(self, _, value):
        self.increment_counter('group', 'counter_name', 1)
        yield _, value

if __name__ == '__main__':
    MRCountingJob.run()
test_counters.py

from io import BytesIO
from unittest import TestCase

from tests.mr_counting_job import MRCountingJob

class CounterTestCase(TestCase):

    def test_counters(self):
        stdin = BytesIO(b'foo\nbar\n')

        mr_job = MRCountingJob(['--no-conf', '-'])
        mr_job.sandbox(stdin=stdin)

        with mr_job.make_runner() as runner:
            runner.run()

            self.assertEqual(runner.counters(),
                             [{'group': {'counter_name': 2}},
                              {'group': {'counter_name': 2}},
                              {'group': {'counter_name': 2}}])