mrjob.runner - base class for all runners

class mrjob.runner.MRJobRunner(mr_job_script=None, conf_paths=None, extra_args=None, hadoop_input_format=None, hadoop_output_format=None, input_paths=None, output_dir=None, partitioner=None, sort_values=None, stdin=None, steps=None, step_output_dir=None, **opts)

Abstract base class for all runners

MRJobRunner.__init__(mr_job_script=None, conf_paths=None, extra_args=None, hadoop_input_format=None, hadoop_output_format=None, input_paths=None, output_dir=None, partitioner=None, sort_values=None, stdin=None, steps=None, step_output_dir=None, **opts)

All runners take the following keyword arguments:

Parameters:
  • mr_job_script (str) – the path of the .py file containing the MRJob. If this is None, you won’t actually be able to run() the job, but other utilities (e.g. ls()) will work.
  • conf_paths (None or list) – List of config files to combine and use, or None to search for mrjob.conf in the default locations.
  • extra_args (list of str) – a list of extra cmd-line arguments to pass to the mr_job script. This is a hook to allow jobs to take additional arguments.
  • hadoop_input_format (str) – name of an optional Hadoop InputFormat class. Passed to Hadoop along with your first step with the -inputformat option. Note that if you write your own class, you’ll need to include it in your own custom streaming jar (see hadoop_streaming_jar).
  • hadoop_output_format (str) – name of an optional Hadoop OutputFormat class. Passed to Hadoop along with your first step with the -outputformat option. Note that if you write your own class, you’ll need to include it in your own custom streaming jar (see hadoop_streaming_jar).
  • input_paths (list of str) – Input files for your job. Supports globs and recursively walks directories (e.g. ['data/common/', 'data/training/*.gz']). If this is left blank, we’ll read from stdin.
  • output_dir (str) – An empty/non-existent directory where Hadoop should put the final output from the job. If you don’t specify an output directory, we’ll output into a subdirectory of this job’s temporary directory. You can control this from the command line with --output-dir. This option cannot be set from configuration files. If used with the hadoop runner, this path does not need to be fully qualified with hdfs:// URIs because it’s understood that it has to be on HDFS.
  • partitioner (str) – Optional name of a Hadoop partitioner class, e.g. 'org.apache.hadoop.mapred.lib.HashPartitioner'. Hadoop streaming will use this to determine how mapper output should be sorted and distributed to reducers.
  • sort_values (bool) – if true, set partitioners and jobconf variables so that reducers receive the values associated with any key in sorted order (sorted by their encoded value). Also known as secondary sort.
  • stdin – an iterable (can be a BytesIO or even a list) to use as stdin. This is a hook for testing; if you set stdin via sandbox(), it’ll get passed through to the runner. If for some reason your lines are missing newlines, we’ll add them; this makes it easier to write automated tests.
  • steps – a list of descriptions of steps to run (see mrjob.step - represent Job Steps for description formats)
  • step_output_dir (str) – An empty/non-existent directory where Hadoop should put output from all steps other than the last one (this only matters for multi-step jobs). Currently ignored by local runners.
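
For example, here is a minimal sketch of driving a job from Python with a sandboxed stdin (the mr_word_count module and its MRWordCount class are placeholders for your own job):

from io import BytesIO

from mr_word_count import MRWordCount  # hypothetical job module

mr_job = MRWordCount(['-r', 'inline'])
# stdin set via sandbox() is passed through to the runner, as described above
mr_job.sandbox(stdin=BytesIO(b'one line\nanother line\n'))

with mr_job.make_runner() as runner:
    runner.run()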

Running your job

MRJobRunner.run()

Run the job, and block until it finishes.

Raise StepFailedException if there are any problems (except on InlineMRJobRunner, where we raise the actual exception that caused the step to fail).

MRJobRunner.cat_output()

Stream the job’s output, as a stream of bytes. If there are multiple output files, there will be an empty bytestring (b'') between them.

Like Hadoop input formats, we ignore files and subdirectories whose names start with "_" or "." (e.g. _SUCCESS, _logs/, .part-00000.crc).

Changed in version 0.6.8: Ignore file/dirnames starting with "." as well as "_".
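
A typical pattern (sketch only) is to run the job and then decode the byte stream back into key/value pairs with the job's parse_output() method:

with mr_job.make_runner() as runner:
    runner.run()

    # cat_output() yields chunks of bytes; parse_output() decodes them
    # using the job's output protocol
    for key, value in mr_job.parse_output(runner.cat_output()):
        print(key, value)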

MRJobRunner.cleanup(mode=None)

Clean up running jobs, temp files, and logs, subject to the cleanup option passed to the constructor.

If you create your runner in a with block, cleanup() will be called automatically:

with mr_job.make_runner() as runner:
    ...

# cleanup() called automatically here

Parameters: mode – override the cleanup mode passed into the constructor. Should be a list of strings from CLEANUP_CHOICES (see the sketch after the list of choices below).

mrjob.options.CLEANUP_CHOICES = ['ALL', 'CLOUD_TMP', 'CLUSTER', 'HADOOP_TMP', 'JOB', 'LOCAL_TMP', 'LOGS', 'NONE', 'TMP']

cleanup options:

  • 'ALL': delete logs and local and remote temp files; stop cluster if on EMR and the job is not done when cleanup is run.

  • 'CLOUD_TMP': delete temp files on cloud storage (e.g. S3) only

  • 'CLUSTER': terminate the cluster if on EMR and the job is not done on cleanup

  • 'HADOOP_TMP': delete temp files on HDFS only

  • 'JOB': stop job if on EMR and the job is not done when cleanup runs

  • 'LOCAL_TMP': delete local temp files only

  • 'LOGS': delete logs only

  • 'NONE': delete nothing

  • 'TMP': delete local, HDFS, and cloud storage temp files, but not logs
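
As a sketch, to remove temp files early while keeping logs around for debugging, you might call cleanup() explicitly with a mode list:

with mr_job.make_runner() as runner:
    runner.run()
    ...
    # override whatever cleanup mode the runner was constructed with;
    # 'TMP' removes local, HDFS, and cloud temp files but keeps logs
    runner.cleanup(mode=['TMP'])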

Run Information

MRJobRunner.counters()

Get counters associated with this run in this form:

[{'group name': {'counter1': 1, 'counter2': 2}},
 {'group name': ...}]

The list contains an entry for every step of the current job.
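
For instance, a sketch that prints every counter after a run:

with mr_job.make_runner() as runner:
    runner.run()

    # one dict per step; each maps group names to {counter: amount}
    for step_num, groups in enumerate(runner.counters()):
        for group, counters in groups.items():
            for counter, amount in counters.items():
                print('step %d: %s: %s = %d' % (step_num, group, counter, amount))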

MRJobRunner.get_hadoop_version()

Return the version number of the Hadoop environment as a string if Hadoop is being used or simulated. Return None if not applicable.

EMRJobRunner infers this from the cluster. HadoopJobRunner gets this from hadoop version. LocalMRJobRunner has an additional hadoop_version option to specify which version it simulates. InlineMRJobRunner does not simulate Hadoop at all.

MRJobRunner.get_job_key()

Get the unique key for the job run by this runner. This has the format label.owner.date.time.microseconds
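
For example (the values shown are purely illustrative):

runner.get_hadoop_version()  # e.g. '2.7.3' with the hadoop runner; None on inline
runner.get_job_key()         # e.g. 'mr_word_count.alice.20240101.021011.012345'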

Configuration

MRJobRunner.get_opts()

Get options set for this runner, as a dict.
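
A quick sketch of inspecting a resolved option:

opts = runner.get_opts()
# opts is a plain dict of the runner's resolved option values
print(opts.get('cleanup'))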

File management

MRJobRunner.fs

Filesystem object for the local filesystem.

class mrjob.fs.base.Filesystem

Some simple filesystem operations that are common across the local filesystem, S3, GCS, HDFS, and remote machines via SSH.

Different runners provide functionality for different filesystems via their fs attribute. Generally a runner will wrap one or more filesystems with mrjob.fs.composite.CompositeFilesystem.

Schemes supported:

  • mrjob.fs.gcs.GCSFilesystem: gs://
  • mrjob.fs.hadoop.HadoopFilesystem: hdfs:// and other URIs
  • mrjob.fs.local.LocalFilesystem: paths and file:// URIs
  • mrjob.fs.s3.S3Filesystem: s3://, s3a://, s3n://
  • mrjob.fs.ssh.SSHFilesystem: ssh://

Changed in version 0.6.12: LocalFilesystem added support for file:// URIs
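
A sketch of using a runner's fs directly (the paths are placeholders; any URI with a scheme the runner's filesystem supports will work):

fs = runner.fs

if fs.exists('output/part-00000'):
    print(fs.du('output/*'))              # total size of matching files
    for chunk in fs.cat('output/part-00000'):
        ...                               # chunks of bytes, not lines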

can_handle_path(path)

Can we handle this path at all?

cat(path_glob)

cat all files matching path_glob, decompressing if necessary

This yields bytes, which don’t necessarily correspond to lines (see #1544). If multiple files are catted, yields b'' between each file.

du(path_glob)

Get the total size of files matching path_glob

Corresponds roughly to: hadoop fs -du path_glob

exists(path_glob)

Does the given path/URI exist?

Corresponds roughly to: hadoop fs -test -e path_glob

join(path, *paths)

Join paths onto path (which may be a URI)

ls(path_glob)

Recursively list all files in the given path.

We don’t return directories for compatibility with S3 (which has no concept of them)

Corresponds roughly to: hadoop fs -ls -R path_glob

md5sum(path)

Generate the md5 sum of the file at path

mkdir(path)

Create the given dir and its subdirs (if they don’t already exist). On cloud filesystems (e.g. S3), also create the corresponding bucket as needed

Corresponds roughly to: hadoop fs -mkdir -p path

New in version 0.6.8: creates buckets on cloud filesystems

put(src, path)

Upload a file on the local filesystem (src) to path. Like with shutil.copyfile(), path should be the full path of the new file, not a directory which should contain it.

Corresponds roughly to hadoop fs -put src path.

New in version 0.6.8.

rm(path_glob)

Recursively delete the given file/directory, if it exists

Corresponds roughly to: hadoop fs -rm -R path_glob

touchz(path)

Make an empty file in the given location. Raises an error if a non-zero length file already exists in that location.

Corresponds to: hadoop fs -touchz path
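
A sketch combining a few of the methods above (all paths are placeholders; on S3 they would be URIs like 's3://bucket/key'):

fs = runner.fs

fs.mkdir('tmp/demo')                            # create dir (and bucket, on cloud FS)
fs.put('local_file.txt', 'tmp/demo/file.txt')   # upload a single local file
fs.touchz('tmp/demo/_SUCCESS')                  # create an empty marker file
print(fs.md5sum('tmp/demo/file.txt'))           # md5 of the uploaded file
fs.rm('tmp/demo')                               # recursively delete when done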