SIMR

Spark In Map Reduce




Ahir Reddy
Databricks

What is SIMR?

SIMR is a Hadoop Map Reduce job

It runs arbitrary jars

It wraps normal Spark jobs to deploy them on clusters where users are prohibited from deploying Spark natively

Hadoop Map Reduce acts as a resource manager that allocates mapper slots to SIMR 

There is little change required for existing jobs

Using SIMR

Running a job is as simple as:
 ./simr my_job.jar my_class %spark_url% parameters

The STDOUT and STDERR of your job will be relayed to your console

They are also written to log files on HDFS, along with the logs of all the worker nodes.

You can specify this directory with --outdir; otherwise it is generated by default from the current timestamp.

Spark Shell

SIMR is also packaged with the Spark shell
 ./simr --shell

The shell is simply a job jar like any other

Restructuring a Job

It only takes a few changes to run a Spark job on SIMR

These changes reflect necessary setup and tear down

Spark Driver URL

A Spark job must take the driver URL as a command line argument

This URL must be passed to the SparkContext constructor
 val sc = new SparkContext(master_url, "Job Name")

When running a job under SIMR, simply pass %spark_url% as the master URL argument
 ./simr example_job.jar example_class %spark_url%

SIMR will replace %spark_url% in any parameter list with the real driver URL before passing it to the job jar

Cleaning Up the Spark Context

To ensure that your Spark jobs terminate without errors, you must end your Spark programs by calling stop() on the SparkContext

In the case of the Spark examples, this usually means adding spark.stop() at the end of main()
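The two changes above (taking the master URL as an argument and stopping the context) can be sketched together as follows. This is a minimal illustration, not code shipped with SIMR: the object name ExampleJob, the app name, and the toy computation are assumptions, and the code needs Spark on the classpath.

```scala
import org.apache.spark.SparkContext

// Hypothetical job used only for illustration.
object ExampleJob {
  // Runs a tiny job against the given master URL and always stops the context.
  def run(masterUrl: String): Int = {
    val sc = new SparkContext(masterUrl, "Example Job")
    try {
      // Toy computation: sum of the integers 1 to 100.
      sc.parallelize(1 to 100).reduce(_ + _)
    } finally {
      // Stop the context even if the job throws, so the job can terminate cleanly.
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = {
    if (args.isEmpty) {
      System.err.println("Usage: ExampleJob <master_url>")
      sys.exit(1)
    }
    // Under SIMR, args(0) is whatever SIMR substituted for %spark_url%.
    println("Sum: " + run(args(0)))
  }
}
```

Following the command form shown earlier, such a job would be launched as ./simr example_job.jar ExampleJob %spark_url%. Wrapping the work in try/finally is one way to make sure stop() runs on every exit path.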




We've talked about using SIMR,

but how does it work?

SIMR Overview


Deploy Map Reduce Job

Leader Election

Relay Server

Spark Driver

Workers

Relay Client

Job Execution and Cleanup

Deploying SIMR


SIMR launches a Map Reduce job that only contains mappers

It ships the user specified job jar to each of the mappers

It also ensures that jumbo jars containing Scala and Spark are shipped to each of the mappers.

The classpath on mappers is structured such that the shipped jars appear first.

All logs and temporary files (leader election, driver URL, relay URL) are written to HDFS. If the directory is not specified via the command line, it will be generated based on the current time.

Leader Election

Once the mappers are all running with the right dependencies in place, they engage in a leader election

Election files are housed in a directory on HDFS

One mapper is elected as the Relay Server and the rest are workers
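One plausible way to run a file-based election like this is sketched below. It is an assumption for illustration, not necessarily SIMR's actual scheme: each mapper announces itself by creating a file named after its ID, and once all expected mappers have announced, the lexicographically smallest ID wins. The local filesystem (java.nio) stands in for HDFS, and the names ElectionSketch, announce, and leader are hypothetical.

```scala
import java.nio.file.{Files, Path}

// Hypothetical sketch: local files stand in for election files on HDFS.
object ElectionSketch {
  // Announce this mapper by creating an empty file named after its ID.
  def announce(electionDir: Path, mapperId: String): Unit = {
    Files.createDirectories(electionDir)
    Files.createFile(electionDir.resolve(mapperId))
  }

  // Returns the winner once all expected mappers have announced themselves,
  // or None while some are still missing. The smallest ID is the leader.
  def leader(electionDir: Path, expected: Int): Option[String] = {
    val ids = Option(electionDir.toFile.list()).getOrElse(Array.empty[String])
    if (ids.nonEmpty && ids.length >= expected) Some(ids.min) else None
  }
}
```

Every mapper evaluates the same rule over the same directory listing, so all of them agree on the winner without talking to each other directly.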

Relay Server

Responsible for executing the job jar

It sends its own address to the Relay Client via HDFS, and subsequently communicates with the client via RPC

It rewrites the parameter list, replacing %spark_url% with a simr://path url

Directs STDIN from the Relay Client to the main class

Redirects STDOUT/STDERR from the main class to HDFS and the Relay Client

Executes the specified main class with the modified parameter list
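The parameter rewriting step can be pictured with a short sketch. This is an illustration under stated assumptions, not SIMR's source: the object and method names are hypothetical, the example simr:// path is made up, and the choice to replace the placeholder anywhere inside an argument (rather than only when it is the whole argument) is a guess.

```scala
// Hypothetical sketch of the %spark_url% substitution done before the main class runs.
object ParamRewrite {
  val Placeholder = "%spark_url%"

  // Replace every occurrence of the placeholder in every parameter.
  def rewrite(params: Seq[String], masterUrl: String): Seq[String] =
    params.map(_.replace(Placeholder, masterUrl))
}
```

For example, rewriting Seq("%spark_url%", "input.txt") with the made-up URL "simr://tmp/run1" yields Seq("simr://tmp/run1", "input.txt"); parameters without the placeholder pass through unchanged.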

Spark Driver


Upon execution of the main class, the simr://path master URL is passed to the job's SparkContext constructor


This will launch the new SimrSchedulerBackend, which will write its own address to a DRIVERURL file on HDFS


The backend then waits for executors to connect before the job can proceed.

Workers

Unelected mappers act as worker nodes

Worker nodes poll HDFS for the DRIVERURL written by the SimrSchedulerBackend

These workers then invoke Spark Executors which connect to the Spark Driver
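The polling step can be sketched as a bounded retry loop. This is a minimal illustration, not SIMR's code: the local filesystem stands in for HDFS, and the names UrlPolling and pollForUrl, the attempt limit, and the sleep interval are all assumptions.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}
import scala.annotation.tailrec

// Hypothetical sketch: poll for a URL file, with a local path standing in for HDFS.
object UrlPolling {
  // Checks for the file up to `attemptsLeft` times, sleeping between checks.
  // Returns the trimmed file contents, or None if the file never appears.
  @tailrec
  def pollForUrl(file: Path, attemptsLeft: Int, intervalMs: Long): Option[String] = {
    if (Files.exists(file)) {
      Some(new String(Files.readAllBytes(file), StandardCharsets.UTF_8).trim)
    } else if (attemptsLeft <= 1) {
      None
    } else {
      Thread.sleep(intervalMs)
      pollForUrl(file, attemptsLeft - 1, intervalMs)
    }
  }
}
```

A real implementation would also have to guard against reading a file that is only partly written, for example by having the writer create it under a temporary name and rename it when complete.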

Relay Client


The Relay Client is invoked in the user's console

Similar to the worker map tasks, it polls HDFS for the RELAYURL and connects to the Relay Server upon finding it.

It allows the user to send STDIN to jobs and to receive their STDOUT and STDERR.

Job Execution and Cleanup

Once the Spark Executors have connected, the main class invoked by the Relay Server proceeds to execute as a normal Spark job

The STDOUT/STDERR of the driver and executors are written to HDFS

At the end of the job, calling stop() on the SparkContext will send a termination message to each Executor and shut down the driver map task

Upon receiving a termination message, each executor will end its own map task

Once all map tasks have ended, the Map Reduce job will complete and temporary files on HDFS will be removed




SIMR


