Working with data files from S3 in your local pySpark environment

Reading time: 11 minutes

Even though AWS provides more and more resources and possibilities to work with your data in the Cloud, some people still feel the need to work with their data in their local pySpark environment. The idea of this article / tutorial is to show how to do that and to help you understand what happens under the hood.

This article was written in collaboration with Fabien Lallemand.

Accessing a CSV file locally

With pySpark you can easily and natively load a local CSV file (or Parquet file structure) with a single command. Something like:

file_to_read="./bank.csv" 
spark.read.csv(file_to_read)

Below is a real example (on an AWS EC2 instance) of the previous command:

(venv) [ec2-user@ip-172-31-37-236 ~]$ pyspark
....
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.3.1
      /_/

Spark context Web UI available at http://ip-172-31-37-236.eu-west-1.compute.internal:4040
Spark context available as 'sc' (master = local[*], app id = local-1675094700680).
SparkSession available as 'spark'.

>>> spark.read.csv("./bank.csv")
DataFrame[_c0: string, _c1: string, _c2: string, _c3: string, _c4: string, _c5: string, _c6: string, _c7: string, _c8: string, _c9: string, _c10: string, _c11: string, _c12: string, _c13: string, _c14: string, _c15: string, _c16: string, _c17: string, _c18: string, _c19: string, _c20: string]
>>> exit()

Here, the file is correctly loaded as a Spark DataFrame.

However, this function does not work if you try to access the same file located in an S3 bucket.

In the following example, we copy our local file to an AWS S3 bucket and try to access it directly by changing the file path. We only configured our CLI with an aws configure command, nothing else.

In your bash environment, copy your file (bank.csv) to your personal bucket (here, the AWS S3 Bucket is named my-bucket-789):

    (venv) [ec2-user@ip-172-31-37-236 ~]$ aws s3 cp ./bank.csv s3://my-bucket-789/bank.csv
    upload: ./bank.csv to s3://my-bucket-789/bank.csv

(As you can see, the AWS credentials are correctly set up, otherwise the aws s3 cp command would fail.)

Now, if you try to directly access your S3 file from your local computer, you will get the following error:

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.3.1
      /_/

Using Python version 3.7.16 (default, Dec 15 2022 23:24:54)
Spark context Web UI available at http://ip-172-31-37-236.eu-west-1.compute.internal:4040
Spark context available as 'sc' (master = local[*], app id = local-1675100682884).
SparkSession available as 'spark'.

>>> file_to_read="s3a://my-bucket-789/bank.csv"
>>> spark.read.csv(file_to_read)

23/01/30 18:32:22 WARN FileStreamSink: Assume no metadata directory. Error while looking for metadata directory in the path: s3a://my-bucket-789/bank.csv.
java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
	at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2688)
    ....

NOTICE: As you can see, we identify the file with an "s3a" prefix. Using an "s3" prefix (as we usually do with the CLI) would generate a different error:

'No FileSystem for scheme "s3"'

Installation of the Hadoop-AWS module

When pySpark is installed in your environment, only the minimum necessary packages are installed by default.

That is, in order to access an AWS S3 bucket from your pySpark environment, you will need to install the additional Hadoop module for AWS.

Dependency installation: The easiest and fastest way

The simplest way to solve this error is to add an argument to your pyspark command.

The option --packages org.apache.hadoop:hadoop-aws:3.3.2 will automatically download and install the hadoop-aws dependency (or simply load it into the context if it is already installed).

By default, you will find the Hadoop jar files in your local Spark installation (or in your Python lib folder if you installed Spark with a pip install pyspark command).

These are the Hadoop jar files:

[ec2-user@ip-172-31-37-236 venv]$ find . -name "hadoop*"
./lib/python3.7/site-packages/pyspark/jars/hadoop-client-api-3.3.2.jar
./lib/python3.7/site-packages/pyspark/jars/hadoop-client-runtime-3.3.2.jar
./lib/python3.7/site-packages/pyspark/jars/hadoop-shaded-guava-1.1.1.jar
./lib/python3.7/site-packages/pyspark/jars/hadoop-yarn-server-web-proxy-3.3.2.jar

These modules are released in sync with the hadoop-aws jar file. Consequently, it is a good idea to keep the versions synchronised as well (here, hadoop-aws 3.3.2 to match hadoop-client-api-3.3.2.jar).

However, if you want to get the latest version of this module, you can refer to the Maven repository.

Notice: you will have to use this argument every time you launch pyspark if you want to manipulate S3 files. Something like:

pyspark --packages org.apache.hadoop:hadoop-aws:3.3.2

Dependency installation: THE HARD WAY

If you want to simply launch pySpark with a bare pyspark command, you will have to manually download and deploy the jar file yourself.

To do this you need to determine the version of the module you want to download.

This can be accomplished by looking for the hadoop-client-api-X.Y.Z.jar library, as shown in the previous chapter.

1. Get the version to install

The following example shows how to achieve this when pySpark is installed in your Python environment (with pip install pyspark, for example).

Inside your Python environment, go to the site-packages folder and look inside pyspark/jars for the Hadoop client version:

cd venv/lib/python<YOUR_PYTHON_VERSION>/site-packages/pyspark/jars
ls hadoop-client-*

Output:

hadoop-client-api-X.X.X.jar  hadoop-client-runtime-X.X.X.jar

(where X.X.X is the version that is installed).

2. Download the "hadoop-aws" package from the Maven repository

Go to https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-aws and select the correct version (based on what you found previously).

Then, download the .jar file (click on the jar link in the Files row).

3. Move the .jar file to the pySpark environment

Now, copy the .jar file from wherever it was saved to the pySpark jars folder, i.e.,

venv/lib/python<YOUR_PYTHON_VERSION>/site-packages/pyspark/jars

Once this is done, you can open pySpark and the package will be available in your environment.
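As a quick sanity check (a minimal sketch, assuming the same venv layout as above and that the file name reflects the version you downloaded), you can verify that the jar is now in place before launching pySpark:

cd venv/lib/python<YOUR_PYTHON_VERSION>/site-packages/pyspark/jars
ls hadoop-aws-*

Output:

hadoop-aws-X.X.X.jar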

Managing AWS credentials with the hadoop-aws module

You now have all the libraries required to access your S3 bucket. However, even if your machine is correctly configured with aws configure, pySpark will not necessarily be able to access the AWS S3 bucket, so some additional configuration may be needed before you have a functional environment.

Here is the error you may get if you try to directly access your data on S3 without any additional configuration:

[ec2-user@ip-172-31-37-236 ~]$ ./spark-3.3.1-bin-hadoop3/bin/pyspark --packages org.apache.hadoop:hadoop-aws:3.3.4
Python 3.7.16 (default, Dec 15 2022, 23:24:54) 
[GCC 7.3.1 20180712 (Red Hat 7.3.1-15)] on linux
......
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.3.1
      /_/

Using Python version 3.7.16 (default, Dec 15 2022 23:24:54)
Spark context Web UI available at http://ip-172-31-37-236.eu-west-1.compute.internal:4040
Spark context available as 'sc' (master = local[*], app id = local-1675154949309).
SparkSession available as 'spark'.
>>> file_to_read="s3a://my-bucket-789/bank.csv"
>>> spark.read.csv(file_to_read)
23/01/31 08:49:46 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
23/01/31 08:49:48 WARN FileStreamSink: Assume no metadata directory. Error while looking for metadata directory in the path: s3a://my-bucket-789/bank.csv.
java.nio.file.AccessDeniedException: s3a://my-bucket-789/bank.csv: org.apache.hadoop.fs.s3a.auth.NoAuthWithAWSException: No AWS Credentials provided by TemporaryAWSCredentialsProvider SimpleAWSCredentialsProvider EnvironmentVariableCredentialsProvider IAMInstanceCredentialsProvider : com.amazonaws.SdkClientException: Unable to load AWS credentials from environment variables (AWS_ACCESS_KEY_ID (or AWS_ACCESS_KEY) and AWS_SECRET_KEY (or AWS_SECRET_ACCESS_KEY))
	at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:212)
......

Use AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY

By default, pySpark will try to get the credentials from the OS environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY.

Consequently, if you set these values, pySpark will be able to access your AWS S3 bucket data without any other configuration. This is how you set the variables:

export AWS_ACCESS_KEY_ID=<YOUR_ACCESS_KEY_ID>
export AWS_SECRET_ACCESS_KEY=<YOUR_SECRET_KEY>

And that's it. Now you can try it in pySpark (see the example below):

[ec2-user@ip-172-31-37-236 test2]$ ./spark-3.3.1-bin-hadoop3/bin/pyspark --packages org.apache.hadoop:hadoop-aws:3.3.2
Python 3.7.16 (default, Dec 15 2022, 23:24:54) 
[GCC 7.3.1 20180712 (Red Hat 7.3.1-15)] on linux
Type "help", "copyright", "credits" or "license"for more information.
.....
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.3.1
      /_/

Using Python version 3.7.16 (default, Dec 15 2022 23:24:54)
Spark context Web UI available at http://ip-172-31-37-236.eu-west-1.compute.internal:4040
Spark context available as 'sc' (master = local[*], app id = local-1675161465632).
SparkSession available as 'spark'.
>>> file_to_read="s3a://my-bucket-789/bank.csv"
>>> spark.read.csv(file_to_read)
23/01/31 10:38:09 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
DataFrame[_c0: string, _c1: string, _c2: string, _c3: string, _c4: string, _c5: string, _c6: string, _c7: string, _c8: string, _c9: string, _c10: string, _c11: string, _c12: string, _c13: string, _c14: string, _c15: string, _c16: string, _c17: string, _c18: string, _c19: string, _c20: string]
>>> exit()

Using an instance role attached to an EC2 instance

If you set up your environment on an EC2 instance with a role configured to access the S3 bucket, you have nothing to do: the role will be automatically used by pySpark.
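For example, a minimal sketch (assuming the EC2 instance role grants read access to the bucket used earlier), with no environment variables or SparkConf credentials set:

pyspark --packages org.apache.hadoop:hadoop-aws:3.3.2
>>> spark.read.csv("s3a://my-bucket-789/bank.csv")

Here the hadoop-aws module falls back to the IAMInstanceCredentialsProvider listed in the earlier error message, which fetches temporary credentials from the instance metadata.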

Having more granular control with SparkConf

There are additional use cases where you will need to explicitly tell Spark how to get the credentials.

This is the case, for example, if you work with multiple profiles in your AWS config file or if you want to work with AWS Cloud9.

For example, if you only use aws configure, the OS environment variables are not set.

The environment variable technique also only works with an access key. What if you want to specify other credentials, or if you need to use a role?

In this case, the hadoop-aws module can be configured with specific properties defined in a SparkConf object that you then attach to your SparkContext.

The following properties control the way pySpark connects to AWS. For more details on these properties, please refer to the reference here.

>>> from pyspark import SparkConf
>>> conf = SparkConf()
>>> conf.set('spark.hadoop.fs.s3a.access.key','1234567890123456')
>>> conf.set('spark.hadoop.fs.s3a.secret.key', 'AZERTYUIOPQSDFGHJKLMWXCVBN')
>>> conf.set('spark.hadoop.fs.s3a.aws.credentials.provider', 'org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider')
>>> conf.set('spark.jars.packages', 'org.apache.hadoop:hadoop-aws:3.2.1')

where 1234567890123456 is my ACCESS_KEY_ID and AZERTYUIOPQSDFGHJKLMWXCVBN my SECRET_ACCESS_KEY (fake values, of course).

Note that conf.set('spark.jars.packages', 'org.apache.hadoop:hadoop-aws:3.2.1') is the line that installs hadoop-aws into your context if it is not already there.

This next step creates the Spark session and installs any necessary dependencies:

spark = SparkSession.builder.config(conf=conf).getOrCreate()

Notice that you can access these properties later on through the Spark session:

>>> spark.conf.get('spark.hadoop.fs.s3a.secret.key')
'AZERTYUIOPQSDFGHJKLMWXCVBN'

Error summary

Throughout this article we encountered several errors. Here is a summary:

Java error: No FileSystem for scheme "s3"
Proposed solution: replace s3 with s3a to identify your file (i.e. s3a://my-bucket/my-file)

Java error: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
Proposed solution: install the hadoop-aws lib as described in this article

Java error: No AWS Credentials provided by TemporaryAWSCredentialsProvider SimpleAWSCredentialsProvider EnvironmentVariableCredentialsProvider
Proposed solution: set up SparkConf (or the AWS environment variables) with your AWS credentials

Doing it with a Real Example

Prerequisites

  • An AWS account with an access key configured. More details about how to do that can be found here.
  • The AWS CLI configured on your (local) machine. More details on how to do that can be found here.
  • A Python environment configured with pySpark properly installed. More details on how to create your Python environment here and how to install pySpark here.
  • Java installed. More details on how to install it can be found here.

The data

The data that we would like to retrieve from S3 looks like this (example2.json):

[{
    "RecordNumber": 2,
    "Zipcode": 704,
    "ZipCodeType": "STANDARD",
    "City": "PASEO COSTA DEL SUR",
    "State": "PR"
  },
  {
    "RecordNumber": 10,
    "Zipcode": 709,
    "ZipCodeType": "STANDARD",
    "City": "BDA SAN LUIS",
    "State": "PR"
  }]

This file has already been uploaded to our AWS account. If we go to the AWS console, we can see the data in S3:

So, the S3 bucket name is hand-on-from-s3-to-local-pyspark and the file name is example2.json.

The local environment (PySpark)

The code

Now, it’s time to code!

1. Set the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables

Set these environment variables to your AWS access key ID and secret access key, respectively, in the shell from which you will launch pySpark.
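For example (same placeholders as in the earlier section):

export AWS_ACCESS_KEY_ID=<YOUR_ACCESS_KEY_ID>
export AWS_SECRET_ACCESS_KEY=<YOUR_SECRET_KEY>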

2. Launch pySpark

Launch your pySpark:

pyspark --packages org.apache.hadoop:hadoop-aws:3.3.2

This is what will appear on your screen:

:: loading settings :: url = jar:file:/XXX/XXX/XXX/XXX/XXX/venv/lib/python3.9/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /XXX/XXX/.ivy2/cache
The jars for the packages stored in: /XXX/XXX/.ivy2/jars
org.apache.hadoop#hadoop-aws added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-0c4387cc-52a9-454e-a719-c1ac031269cf;1.0
        confs: [default]
        found org.apache.hadoop#hadoop-aws;3.3.2 in central
        found com.amazonaws#aws-java-sdk-bundle;1.11.901 in central
        found org.wildfly.openssl#wildfly-openssl;1.0.7.Final in central
:: resolution report :: resolve 113ms :: artifacts dl 3ms
        :: modules in use:
        com.amazonaws#aws-java-sdk-bundle;1.11.901 from central in [default]
        org.apache.hadoop#hadoop-aws;3.3.2 from central in [default]
        org.wildfly.openssl#wildfly-openssl;1.0.7.Final from central in [default]
        ---------------------------------------------------------------------
        |                  |            modules            ||   artifacts   |
        |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
        ---------------------------------------------------------------------
        |      default     |   3   |   0   |   0   |   0   ||   3   |   0   |
        ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-0c4387cc-52a9-454e-a719-c1ac031269cf
        confs: [default]
        0 artifacts copied, 3 already retrieved (0kB/5ms)
22/10/27 14:21:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

3. Read the data from S3 into a local pySpark DataFrame

It's time to get our .json data! Note that our .json file is multiline, so the read method below is adapted to handle that properly. More info about the different spark.read options for the .json format can be found here.

multiline_df = spark.read.option("multiline","true").json("s3a://hand-on-from-s3-to-local-pyspark/example2.json")
multiline_df.show() 

Output:

+-------------------+------------+-----+-----------+-------+
|               City|RecordNumber|State|ZipCodeType|Zipcode|
+-------------------+------------+-----+-----------+-------+
|PASEO COSTA DEL SUR|           2|   PR|   STANDARD|    704|
|       BDA SAN LUIS|          10|   PR|   STANDARD|    709|
+-------------------+------------+-----+-----------+-------+

In this way, we have accessed our S3 data from our local pySpark environment!

The local environment (Python)

For this example, a Python 3.9 local environment called venv was created, and pySpark (pip install pyspark) and JupyterLab (pip install jupyterlab) were installed in it. The idea is to use Jupyter notebooks to run our pySpark code.
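A minimal sketch of that setup (assuming python3.9 is available on your machine and the environment is created in the current folder):

python3.9 -m venv venv
source venv/bin/activate
pip install pyspark
pip install jupyterlab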

To handle the AWS access key credentials, a library called python-dotenv was installed (python -m pip install python-dotenv) and a .env file was created in our local context to store the AWS credentials. This allows us to access this information from the .env file inside our Jupyter notebook without hard-coding it in our code (don't forget to double-check that your .gitignore file excludes .env from your commits). For more details on how to use it, please check here.
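As an illustration, the .env file only needs to contain the two keys that the code below reads through os.environ (the key names are the ones used in this example; replace the placeholder values with your own credentials):

aws_access_key_id=<YOUR_ACCESS_KEY_ID>
aws_secret_access_key=<YOUR_SECRET_KEY>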

The code

Now, it’s time to code!

In your Python environment (or .py file):

1. Import the necessary libraries

from dotenv import load_dotenv

import os

from pyspark import SparkConf
from pyspark.sql import SparkSession

2. Load the .env file into the Jupyter notebook

load_dotenv()

3. Create the custom Spark configuration

This configuration will allow Spark to access the AWS S3 service.

(Note that os.environ['aws_access_key_id'] and os.environ['aws_secret_access_key'] read the AWS access key ID and secret access key, respectively, that were loaded from the local .env file.)

conf = SparkConf()
conf.set('spark.hadoop.fs.s3a.access.key', os.environ['aws_access_key_id'])
conf.set('spark.hadoop.fs.s3a.secret.key', os.environ['aws_secret_access_key'])
conf.set('spark.hadoop.fs.s3a.aws.credentials.provider', 'org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider')
# Pulls hadoop-aws into the context (as in the previous section); the version matches the resolution log below
conf.set('spark.jars.packages', 'org.apache.hadoop:hadoop-aws:3.2.0')

4. Create the Spark session with our custom configuration

spark = SparkSession.builder.config(conf=conf).getOrCreate()

Note: as you create this pySpark session, the necessary packages will be downloaded and stored in a local Ivy cache folder:

:: loading settings :: url = jar:file:/XXX/XXX/XXX/XXX/XXX/XXX/venv/lib/python3.9/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /XXX/XXX/.ivy2/cache
The jars for the packages stored in: /XXX/XXX/.ivy2/jars
org.apache.hadoop#hadoop-aws added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-fbd04e0b-1d2c-4591-a09f-a68708c38466;1.0
        confs: [default]
        found org.apache.hadoop#hadoop-aws;3.2.0 in central
        found com.amazonaws#aws-java-sdk-bundle;1.11.375 in central
:: resolution report :: resolve 324ms :: artifacts dl 4ms
        :: modules in use:
        com.amazonaws#aws-java-sdk-bundle;1.11.375 from central in [default]
        org.apache.hadoop#hadoop-aws;3.2.0 from central in [default]
        ---------------------------------------------------------------------
        |                  |            modules            ||   artifacts   |
        |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
        ---------------------------------------------------------------------
        |      default     |   2   |   0   |   0   |   0   ||   2   |   0   |
        ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-fbd04e0b-1d2c-4591-a09f-a68708c38466
        confs: [default]
        0 artifacts copied, 2 already retrieved (0kB/6ms)

5. Read the data from S3 into a local pySpark DataFrame

It's time to get our .json data! Note that our .json file is multiline, so the read method below is adapted to handle that properly. More info about the different spark.read options for the .json format can be found here.

multiline_df = spark.read.option("multiline","true").json("s3a://hand-on-from-s3-to-local-pyspark/example2.json")
multiline_df.show() 

Output:

+-------------------+------------+-----+-----------+-------+
|               City|RecordNumber|State|ZipCodeType|Zipcode|
+-------------------+------------+-----+-----------+-------+
|PASEO COSTA DEL SUR|           2|   PR|   STANDARD|    704|
|       BDA SAN LUIS|          10|   PR|   STANDARD|    709|
+-------------------+------------+-----+-----------+-------+

In this way, we have accessed our S3 data from our local pySpark environment!
