Post

Running Recommendation Engine Spark Job on EMR

Abstract

Amazon Elastic MapReduce (EMR) is a managed big data service that simplifies the process of processing and analyzing large datasets using popular frameworks like Apache Spark, Hadoop, and others. In this post, we’ll focus on using EMR with Apache Spark to read data from Amazon S3, a scalable object storage service offered by AWS.

Target Infrastructure

infra.png

EMR Setup

Frameworks and versions configuration

Setting up an EMR cluster from the AWS Console is a easy thanks to the intuitive wizard steps. The latest available EMR version, emr-6.13.0, ensures you have access to the most up-to-date features and improvements.

During the configuration process, you have the flexibility to choose from a variety of frameworks. These frameworks are automatically provisioned and ready to use on your EMR cluster. Should you require a specific version, you can easily adjust it to meet your needs.

One of the standout features of EMR is the provision of preconfigured installations. AWS offers a selection of installations with different engines and compatible versions that have undergone rigorous testing and alignment. This is a game-changer in the Big Data landscape, as it eliminates the challenges and BIG pain in Big Data World of assembling a distribution with all frameworks precisely aligned to dedicated versions and simplifies the upgrade process. With EMR, everything is prepped and ready for seamless use.

img.png

Primary node capacity

The primary node serves as the central management hub for the cluster. It oversees critical components of distributed applications, including the YARN ResourceManager service for resource management. Additionally, the primary node is responsible for running the HDFS NameNode service, keeping track of job statuses, and monitoring the overall health of the instance groups.

img_1.png

Core node capacity

Under the primary node’s management, core nodes play a pivotal role in data coordination within the Hadoop Distributed File System (HDFS). They execute the Data Node daemon, which handles data storage tasks. Core nodes are also responsible for running the Task Tracker daemon, performing parallel computations required by installed applications. For instance, a core node hosts YARN NodeManager daemons, executes Hadoop MapReduce tasks, and handles Spark executors.

Task node capacity

Task nodes are your go-to choice when you need additional processing power. They excel at executing tasks like Hadoop MapReduce tasks and Spark executors. Unlike core nodes, task nodes do not run the Data Node daemon, nor do they store data in HDFS. To incorporate task nodes into your cluster, you can either add Amazon EC2 instances to an existing uniform instance group or adjust target capacities for a task instance fleet. This flexibility allows you to fine-tune your cluster’s computational capabilities to meet specific workload requirements.

img_2.png

Cluster size

Based on workload time you can configure and tune cluster capacity with autoscaling options.

img_3.png

Acces to S3 with Data

img_5.png

Enable access to Nodes from your machine

By default, when EMR is created there is no access to nodes for end-user. We need to edit Security Group of the Primary Node and add appropriate rule to access EC2 instance.

After configuring SG we can ssh into Primary node of EMR:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
 ssh -i ~/ssh.pem hadoop@ec2-xx-xxx-xxx-xx.eu-west-1.compute.amazonaws.com
   ,     #_
   ~\_  ####_        Amazon Linux 2
  ~~  \_#####\
  ~~     \###|       AL2 End of Life is 2025-06-30.
  ~~       \#/ ___
   ~~       V~' '->
    ~~~         /    A newer version of Amazon Linux is available!
      ~~._.   _/
         _/ _/       Amazon Linux 2023, GA and supported until 2028-03-15.
       _/m/'           https://aws.amazon.com/linux/amazon-linux-2023/

5 package(s) needed for security, out of 11 available
Run "sudo yum update" to apply all updates.

EEEEEEEEEEEEEEEEEEEE MMMMMMMM           MMMMMMMM RRRRRRRRRRRRRRR
E::::::::::::::::::E M:::::::M         M:::::::M R::::::::::::::R
EE:::::EEEEEEEEE:::E M::::::::M       M::::::::M R:::::RRRRRR:::::R
  E::::E       EEEEE M:::::::::M     M:::::::::M RR::::R      R::::R
  E::::E             M::::::M:::M   M:::M::::::M   R:::R      R::::R
  E:::::EEEEEEEEEE   M:::::M M:::M M:::M M:::::M   R:::RRRRRR:::::R
  E::::::::::::::E   M:::::M  M:::M:::M  M:::::M   R:::::::::::RR
  E:::::EEEEEEEEEE   M:::::M   M:::::M   M:::::M   R:::RRRRRR::::R
  E::::E             M:::::M    M:::M    M:::::M   R:::R      R::::R
  E::::E       EEEEE M:::::M     MMM     M:::::M   R:::R      R::::R
EE:::::EEEEEEEE::::E M:::::M             M:::::M   R:::R      R::::R
E::::::::::::::::::E M:::::M             M:::::M RR::::R      R::::R
EEEEEEEEEEEEEEEEEEEE MMMMMMM             MMMMMMM RRRRRRR      RRRRRR

[hadoop@ip-xxx-xx-xx-xx ~]$

Access to YARN, Spark History:

You can use ssh tunnel to expose EMR ports with YARN, Spark to you local machine and access them. But in the latest EMR UI version AWS Console provides link to navigate directly to provisioned proxy instances.

1
ssh -i ~/key.cer -L 3000:ec2-xx-xxx-xxx-xx.eu-west-1.compute.amazonaws.com:8088 hadoop@ec2-xx-xxx-xxx-xx.eu-west-1.compute.amazonaws.com

img_6.png

1
2
3
4
ssh -i ~/key.cer  -L 4040:SPARK_UI_NODE_URL:4040 hadoop@ec2-xx-xx-xx-xx.eu-west-1.compute.amazonaws.com
ssh -i ~/key.cer -N -L 20888:ip-xx-xx-xx-xx.your-region.compute.internal:20888 hadoop@ec2-xxx.compute.amazonaws.com
ssh -i ~/key.cer -L 3000:ec2-xx-xx-xx-xx.eu-west-1.compute.amazonaws.com:8088 hadoop@ec2-xx-xx-xx-xx.eu-west-1.compute.amazonaws.com
ssh -i ~/key.cer -L 4040:ec2-xx-xx-xx-xx.eu-west-1.compute.amazonaws.com:4040 hadoop@ec2-xxx-xx-xx-xx.eu-west-1.compute.amazonaws.com

Running Sample Spark Job

To test EMR Spark functionality we will start with predefided Spark sample app, let’s dive into its details.

1
cp /usr/lib/spark/examples/src/main/python/ml/als_example.py ./

Spark Load Data from HDFS, splited based on :: delimiter to userID, movieID, rating, timestamp:

1
2
3
4
5
lines = spark.read.text("data/mllib/als/sample_movielens_ratings.txt").rdd
    parts = lines.map(lambda row: row.value.split("::"))
    ratingsRDD = parts.map(lambda p: Row(userId=int(p[0]), movieId=int(p[1]),
                                         rating=float(p[2]), timestamp=int(p[3])))
    ratings = spark.createDataFrame(ratingsRDD)

Spark Train Model for recommendations using ALS on training Data:

1
2
3
    als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
              coldStartStrategy="drop")
    model = als.fit(training)

Predictions, Model Evaluation:

  • generate top 10 movie recommendations for each user
  • generate top 10 user recommendations for each movie
1
2
3
4
5
6
7
8
    predictions = model.transform(test)
    evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                    predictionCol="prediction")
    rmse = evaluator.evaluate(predictions)
    print("Root-mean-square error = " + str(rmse))

    userRecs = model.recommendForAllUsers(10)
    movieRecs = model.recommendForAllItems(10)
  • generate top 10 movie recommendations for a specified set of users
  • generate top 10 user recommendations for a specified set of movies
1
2
3
4
5
6
    users = ratings.select(als.getUserCol()).distinct().limit(3)
    userSubsetRecs = model.recommendForUserSubset(users, 10)

    movies = ratings.select(als.getItemCol()).distinct().limit(3)
    movieSubSetRecs = model.recommendForItemSubset(movies, 10)

Load data to HDFS

DataSet is available at https://github.com/tsypuk/samples/tree/main/emr to copy it to HDFS we are using hadoop cli

1
2
hadoop fs -mkdir -p /user/hadoop/data/mllib/als
hadoop fs -copyFromLocal /usr/lib/spark/data/mllib/als/sample_movielens_ratings.txt /user/hadoop/data/mllib/als/sample_movielens_ratings.txt

Optional Stage

Modify spark Job to have less output in logs

1
spark.sparkContext.setLogLeve("ERROR")

Submitting Spark Job

1
2
3
4
5
6
7
[hadoop@ip-xxx-xx-xx-xx ~]$ spark-submit als_example.py
23/10/18 14:03:34 INFO SparkContext: Running Spark version 3.4.1-amzn-0
23/10/18 14:03:34 INFO ResourceUtils: ==============================================================
23/10/18 14:03:34 INFO ResourceUtils: No custom resources configured for spark.driver.
23/10/18 14:03:34 INFO ResourceUtils: ==============================================================
23/10/18 14:03:34 INFO SparkContext: Submitted application: ALSExample
23/10/18 14:03:34 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 4, script: , vendor: , memory -> name: memory, amount: 9486, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)

Results and Status are available on UI

img_7.png

Output of results in console:

Here are recommendations for each user for current DataSet:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|     4|[{41, 4.165862}, ...|
|    14|[{76, 4.606556}, ...|
|    24|[{69, 5.097545}, ...|
|     5|[{55, 4.787237}, ...|
|    15|[{46, 4.87731}, {...|
|    25|[{38, 4.6747584},...|
|     6|[{62, 4.7941403},...|
|    16|[{85, 5.0937176},...|
|    26|[{74, 5.545739}, ...|
|    27|[{18, 4.022548}, ...|
|    17|[{27, 5.4436226},...|
|     7|[{25, 4.7733192},...|
|    20|[{22, 4.6535172},...|
|     0|[{92, 3.967172}, ...|
|    10|[{2, 3.6862848}, ...|
|     1|[{68, 3.9590843},...|
|    11|[{27, 4.9718685},...|
|    21|[{53, 4.8786216},...|
|    12|[{46, 6.7249556},...|
|     2|[{83, 5.306094}, ...|
+------+--------------------+
only showing top 20 rows

Switching EMR Spark to query AWS S3

It was sampled Spark job that works with HDFS, but for EMR AWS S3 is available

DataSet located S3 bucket (with prefixed virtual path based on timeline) we will use from the previous post Connecting Kinesis Firehose DataStream with aws-kinesis-agent to ingest Log Data into AWS S3

To make EMR Spark work with S3 we need to modify the Job file:

1
2
3
4
5
6
7
8
lines = spark.read.text("s3://kl-bucket-2023/year=2023/month=10/day=18/hour=13/*").rdd
parts = lines.map(lambda row: row.value.split(','))
#Filter out postage, shipping, bank charges, discounts, commissions
productsOnly = parts.filter(lambda p: p[1][0:5].isdigit())
#Filter out empty customer ID's
cleanData = productsOnly.filter(lambda p: p[6].isdigit())
ratingsRDD = cleanData.map(lambda p: Row(customerId=int(p[6]), \
itemId=int(p[1][0:5]), rating=1.0))

Troubleshooting the issue

I case you receive you receiving following error when running Job:

1
Caused by: com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: Access Denied (Service: Amazon S3; Status Code: 403; Error Code: AccessDenied;

Try to check if EMR has access to S3 by running the following cli command:

1
2
hdfs dfs -ls s3://kl-bucket-2023/year=2023/month=10/day=18/hour=13/*
ls: com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: Access Denied

If error - check Service Role and EC2 instance profile - they should have access to your s3 bucket. Once permissions are setup, you should see the proper bucket listing:

1
2
3
4
[hadoop@ip-xx-xx-xx-xx ~]$ hdfs dfs -ls s3://kl-bucket-2023hdfs dfs -ls s3://kl-bucket-2023/year=2023/month=10/day=18/hour=13/*
-rw-rw-rw-   1 hadoop hadoop     423974 2023-10-18 13:02 s3://kl-bucket-2023/year=2023/month=10/day=18/hour=13/terraform-kinesis-firehose-logs-s3-stream-1-2023-10-18-13-01-08-e7f44bf7-b2e0-43b7-ad1c-11d74c7383fa
-rw-rw-rw-   1 hadoop hadoop     837497 2023-10-18 13:13 s3://kl-bucket-2023/year=2023/month=10/day=18/hour=13/terraform-kinesis-firehose-logs-s3-stream-1-2023-10-18-13-12-31-efad7e9e-89a1-4478-9a09-b954dd83c48d
-rw-rw-rw-   1 hadoop hadoop    1257779 2023-10-18 13:14 s3://kl-bucket-2023/year=2023/month=10/day=18/hour=13/terraform-kinesis-firehose-logs-s3-stream-1-2023-10-18-13-13-43-35e7b0b8-90a4-449a-b373-82f14e28032f

Running Spark Job on top of S3 Data

After submitting Spark Job, now it works and queries Data from partitions prefixes S3 Bucket:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
+----------+--------------------+
|customerId|     recommendations|
+----------+--------------------+
|     15992|[{22983, 1.283649...|
|     15237|[{21064, 1.094786...|
|     17668|[{85114, 1.253556...|
+----------+--------------------+

+------+--------------------+
|itemId|     recommendations|
+------+--------------------+
| 21209|[{17119, 1.570859...|
| 22129|[{16402, 0.956514...|
| 22429|[{16781, 1.282029...|
+------+--------------------+

Acknowledgements

EMR makes it easy to provision resources, scale instances submit and monitor jobs. Regarding storage besides standard HDFS it provides EMRFS - a robust implementation of Hadoop Distributed File System (HDFS) used by all Amazon EMR clusters. It empowers clusters to seamlessly read and write regular files from Amazon EMR directly to Amazon S3, offering a blend of convenience and powerful features.

This post is licensed under CC BY 4.0 by the author.