- Introduction to Hadoop Streaming and Elastic MapReduce
- Simple Elastic MapReduce demo
- More complex case and preprocessing of data
- Programming Elastic MapReduce
A utility for running MapReduce jobs with any executable or script as the mapper and/or the reducer
cat input_data.txt | mapper.py | sort | reducer.py > output_data.txt
hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
-input myInputDataDirectories \
-output myOutputDataDirectory \
-mapper myMapperProgram.py \
-reducer myReducerProgram.py
--
- MapReduce cluster as a service
- Managed via a web interface or API
- Can run either Amazon-optimized Hadoop or MapR
- Pre-installed Hive, Pig, Spark, Hue, Mahout, Presto...
--
--
- Hadoop version and pre-installed applications
- Computing capacity
- List of work steps
A step is a unit of work submitted to the cluster. It may contain one or more Hadoop jobs, or instructions to install an application.
- Mapper and reducer program locations
- Data input and output locations
--
--
The endlessly fascinating word-count example from the Hadoop Getting Started tutorial
--
WordSplitter.py (mapper)
Mapper
Input
#!/usr/bin/python
import sys
import re
pattern = re.compile("[a-zA-Z][a-zA-Z0-9]*")
for line in sys.stdin:
    for word in pattern.findall(line):
        print "LongValueSum:" + word.lower() + "\t" + "1"
LongValueSum:i 1
LongValueSum:count 1
LongValueSum:words 1
LongValueSum:with 1
LongValueSum:hadoop 1
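The deck relies on Hadoop Streaming's built-in "aggregate" reducer to sum the LongValueSum pairs. As a sketch of what that reducer does: streaming delivers mapper output sorted by key, so a running per-key sum is enough. (This is an illustrative equivalent, not the actual aggregate implementation.)

```python
#!/usr/bin/env python
# Hand-rolled equivalent of Hadoop Streaming's built-in "aggregate"
# reducer for LongValueSum keys. Input arrives sorted by key, so a
# running per-key sum is sufficient.
import sys


def sum_counts(lines):
    """Yield (word, total) pairs from sorted 'LongValueSum:word\t1' lines."""
    current_key, current_sum = None, 0
    for line in lines:
        key, _, value = line.rstrip("\n").rpartition("\t")
        if key.startswith("LongValueSum:"):
            key = key[len("LongValueSum:"):]
        if key != current_key:
            if current_key is not None:
                yield current_key, current_sum
            current_key, current_sum = key, 0
        current_sum += int(value)
    if current_key is not None:
        yield current_key, current_sum


if __name__ == "__main__":
    for word, total in sum_counts(sys.stdin):
        print("%s\t%d" % (word, total))
```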
--
- Master: Node that manages the cluster (e.g. runs YARN ResourceManager and HDFS NameNode services)
- Core: Nodes that run tasks and store HDFS data. Cannot be removed from the cluster.
- Task: Optional nodes that run tasks, but do not store HDFS data. Can be flexibly added and removed.
Task nodes are ideal for using EC2 Spot pricing.
--
- Logging, debugging
- Tags: keywords for the cluster resources
- Additional applications and configuration
- Filesystem: encryption, consistency
- Number and type of nodes, spot price
- Security and Access: SSH keys, node access roles
- Bootstrap actions: scripts to customize the cluster
--
- Amazon offers file storage service called S3.
- S3 is not a file system; it is a REST-style object store.
- S3 has eventual consistency: files written to S3 might not be immediately available for reading.
http://wiki.apache.org/hadoop/AmazonS3
--
- EMR offers two file systems: HDFS and EMRFS.
- HDFS should be used to cache results of intermediate steps.
- HDFS is always used in master and core nodes.
- HDFS storage is lost when the cluster terminates.
--
- EMRFS is an implementation of the Hadoop file system that reads and writes files directly to S3.
- EMRFS can be configured to encrypt files in S3 and to monitor the consistency of files, which makes it possible to detect operations that try to use inconsistent files.
--
- Digitraffic is a service offering real-time information about traffic, weather and road conditions on the Finnish main roads.
- The service is provided by the Finnish Transport Agency (Liikennevirasto), and produced by Gofore and Infotripla.
--
- Our data consists of traffic fluency information, i.e. how quickly vehicles have been identified to pass through a road segment (a link).
- Data is gathered with camera-based Automatic License Plate Recognition (ALPR), and more recently with mobile-device-based Floating Car Data (FCD).
--
--
<link>
<linkno>310102</linkno>
<startsite>1108</startsite>
<endsite>1107</endsite>
<name language="en">Hallila -> Kaukajärvi</name>
<name language="fi">Hallila -> Kaukajärvi</name>
<name language="sv">Hallila -> Kaukajärvi</name>
<distance>
<value>3875.000</value>
<unit>m</unit>
</distance>
</link>
Static link information (271 kB XML)
642 one-way links, 243 sites
--
<ivjtdata duration="60" periodstart="2014-06-24T02:55:00Z">
<vehicle-recognitions>
<link id="110302" data-source="1">
<recognition offset="8" travel-time="152"></recognition>
<recognition offset="36" travel-time="155"></recognition>
</link>
<link id="410102" data-source="1">
<recognition offset="6" travel-time="126"></recognition>
<recognition offset="45" travel-time="152"></recognition>
</link>
<link id="810502" data-source="1">
<recognition offset="25" travel-time="66"></recognition>
<recognition offset="34" travel-time="79"></recognition>
<recognition offset="35" travel-time="67"></recognition>
<recognition offset="53" travel-time="58"></recognition>
</link>
</vehicle-recognitions>
</ivjtdata>
Each file contains finished passthroughs for each road segment during one minute.
--
- 6.5 years worth of data from January 2008 to June 2014
- 3.9 million XML files (525600 files per year)
- 6.3 GB of compressed archives (with 7.5GB of additional median data as CSV)
- 42 GB of data as XML (and 13 GB as CSV)
--
- Unpacked the tar.gz archives and uploaded the XML files as such to S3 (using AWS CLI tools).
- Turns out that 4 million ~11 kB files and Hadoop are not a fun combination: Hadoop handles files significantly smaller than the HDFS block size (default 64 MB) poorly [1] [2] [3]
- And well, XML is not fun either, so...
--
- Wrote scripts to parse XML, munge and upload JSON data
- Concatenated data into one-month files (180-540MB each)
- In the concatenated files, each line is a valid JSON object. The mapper processes one line, i.e. one object, at a time.
- Munging 6.5 years' worth of XML takes 8.5 hours on a single t2.medium instance. The result is 60% of the original XML's size.
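The munging scripts themselves are not shown in the deck, but the core transformation can be sketched: convert one static `<link>` element (as in the earlier XML sample) into the compact JSON link record of the munged format. This is a hypothetical reconstruction, not the actual script.

```python
#!/usr/bin/env python
# Hypothetical sketch of the XML-to-JSON munging step: turn one static
# <link> element into the compact JSON "links" record used later.
import xml.etree.ElementTree as ET


def munge_link(xml_text):
    """Return a dict matching the munged 'links' entry for one <link>."""
    link = ET.fromstring(xml_text)
    return {
        "id": link.findtext("linkno"),
        "name": link.find("name[@language='en']").text,
        "dist": float(link.findtext("distance/value")),
        "site_start": link.findtext("startsite"),
        "site_end": link.findtext("endsite"),
    }
```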
--
{
"sites": [
{
"id": "1205",
"name": "Viinikka",
"lat": 61.488282,
"lon": 23.779057,
"rno": "3495",
"tro": "3495/1-2930"
}
],
"links": [
{
"id": "99001041",
"name": "Hallila -> Viinikka",
"dist": 5003.0,
"site_start": "1108",
"site_end": "1205"
}]
}
Static link information (120 kB JSON)
--
{
"date": "2014-06-01T02:52:00Z",
"recognitions": [
{
"id": "4510201",
"tt": 117,
"cars": 4,
"itts": [100, 139, 121, 110, ...]
},
{ ... }, { ... }, ...
]
}
This object is actually a single line in the munged data.
Formatted for readability.
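Because each line is a self-contained JSON object, a streaming mapper can process lines independently. A hypothetical mapper (not the deck's `carspeeds.py`) that counts recognized cars per link could look like this, emitting LongValueSum pairs for the built-in aggregate reducer:

```python
#!/usr/bin/env python
# Hypothetical streaming mapper for the munged one-JSON-object-per-line
# format: emits a LongValueSum count of recognized cars per link id.
import json
import sys


def map_line(line):
    """Yield 'LongValueSum:<link id>\t<cars>' pairs for one input line."""
    observation = json.loads(line)
    for rec in observation.get("recognitions", []):
        yield "LongValueSum:%s\t%d" % (rec["id"], rec["cars"])


if __name__ == "__main__":
    for line in sys.stdin:
        for pair in map_line(line):
            print(pair)
```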
--
- AWS Command line tools
- SDKs like boto for Python
--
#!/usr/bin/env python
import boto.emr
from boto.emr.instance_group import InstanceGroup
# Requires AWS API credentials exported as env variables
connection = boto.emr.connect_to_region('eu-west-1')
1 of 4
--
instance_groups = []
instance_groups.append(InstanceGroup(
role="MASTER", name="Main node",
type="m1.medium", num_instances=1,
market="ON_DEMAND"))
instance_groups.append(InstanceGroup(
role="CORE", name="Worker nodes",
type="m1.medium", num_instances=3,
market="ON_DEMAND"))
instance_groups.append(InstanceGroup(
role="TASK", name="Optional spot-price nodes",
type="m1.medium", num_instances=20,
market="SPOT", bidprice=0.012))
2 of 4
--
cluster_id = connection.run_jobflow(
"Our awesome cluster",
instance_groups=instance_groups,
action_on_failure='CANCEL_AND_WAIT',
keep_alive=True,
enable_debugging=True,
log_uri="s3://bucket/logs/",
ami_version="3.3.1",
bootstrap_actions=[],
ec2_keyname="name-of-our-ssh-key",
visible_to_all_users=True,
job_flow_role="EMR_EC2_DefaultRole",
service_role="EMR_DefaultRole")
3 of 4
--
steps = []
steps.append(boto.emr.step.StreamingStep(
"Our awesome streaming app",
input="s3://bucket/our-input-data",
output="s3://bucket/our-output-path/",
mapper="our-mapper.py",
reducer="aggregate",
cache_files=[
"s3://bucket/programs/our-mapper.py#our-mapper.py",
"s3://bucket/data/our-dictionary.json#our-dictionary.json"
],
action_on_failure='CANCEL_AND_WAIT',
jar='/home/hadoop/contrib/streaming/hadoop-streaming.jar'))
connection.add_jobflow_steps(cluster_id, steps)
4 of 4
--
import boto.emr
from boto.emr.instance_group import InstanceGroup
# Create a connection to AWS
connection = boto.emr.connect_to_region('eu-west-1')
# Create a new EMR cluster
cluster_id = connection.run_jobflow(**cluster_parameters)
# Add steps to the cluster
connection.add_jobflow_steps(cluster_id, **steps_parameters)
--
# Create new cluster
aws-tools/run-jobs.py
create-cluster
"Car speed counting cluster"
Starting cluster
j-F0K0A4Q9F5O0 Car speed counting cluster
# Add job step to the cluster
aws-tools/run-jobs.py
run-step
j-F0K0A4Q9F5O0
carspeeds.py
digitraffic/munged/links-by-month/2014
Step will output data to
s3://hadoop-seminar-emr/digitraffic/outputs/carspeeds.py/
--
# Download and concatenate output
aws s3 cp
s3://hadoop-seminar-emr/digitraffic/outputs/carspeeds.py/
/tmp/emr
--recursive
--profile hadoop-seminar-emr
cat /tmp/emr/part-* > /tmp/emr/output
# Analyze results
05-car-speed-for-time-of-day_output.py
/tmp/emr/output
example-data/locationdata.json
--
--
--
- We only used a subset of the whole dataset, since our interest was in EMR itself rather than in analyzing the data
- EMR/Hadoop creates overhead that is substantial with small files
- Even after munging files together, our subset took considerably longer to process in EMR than with local parsing
- "Too small problem for EMR/Hadoop"
--
- Make sure your problem is big enough for Hadoop
- Munging wisely makes streaming programs easier and faster
- Always use Spot instances with EMR
--
Presentation and source code are available at
https://github.com/gofore/aws-emr





