Enhancing Data Security with Spark: A Guide to Column-Level Encryption - Part 2
This post describes how you can build an AWS Glue ingestion job that uses the PySpark aes_encrypt() function to encrypt sensitive columns. It is part of a series that shows how column-level encryption can be deployed at scale using AWS Glue, AWS KMS and Amazon Athena or Amazon Redshift.
Introduction
In this post, I demonstrate setting up an AWS Glue ingestion job to encrypt sensitive columns using AWS KMS. We will also explore key management approaches and their impact on organizational security practices.
In the previous post, I introduced column-level encryption using a Jupyter Notebook and a static AWS KMS-generated data key. While useful for learning, hardcoding encryption keys is insecure and impractical for production. Instead, encryption keys should be dynamically managed and accessed securely at runtime.
AWS Glue is a "scalable, serverless data integration service that simplifies data discovery, preparation, and combination for analytics, machine learning, and application development." Glue's ability to attach IAM roles to jobs allows seamless interaction with other AWS services like S3 and KMS, enabling tasks such as data ingestion, manipulation (e.g., encryption/decryption), and storage.
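For the Glue job's IAM role to interact with KMS, it needs permission for the KMS operations the job calls. The following is a minimal sketch of such a policy, built as a Python dictionary. It assumes the job only needs kms:GenerateDataKey and kms:Decrypt (the two operations used later in this post); the key ARN is a placeholder and build_kms_policy is a hypothetical helper name.

```python
import json

# Placeholder key ARN (assumption): replace with the ARN of your own KMS key.
KMS_KEY_ARN = "arn:aws:kms:us-east-2:123456789012:key/EXAMPLE-KEY-ID"


def build_kms_policy(key_arn: str) -> dict:
    """Return an IAM policy document limited to the KMS calls needed for DEKs."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "AllowDataKeyOperations",
                "Effect": "Allow",
                # Only the two operations the ingestion job relies on.
                "Action": ["kms:GenerateDataKey", "kms:Decrypt"],
                # Scope the permission to a single key rather than "*".
                "Resource": key_arn,
            }
        ],
    }


policy = build_kms_policy(KMS_KEY_ARN)
print(json.dumps(policy, indent=2))
```

Scoping the statement to one key keeps the role from using any other key in the account, which is one way to answer the "who should have access" question for the job itself.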
We will also address key management questions: How should your Glue job encrypt sensitive columns? Where should encryption material be stored? Who should have access to it?
Let’s dive in!
Getting Started
Prepare Your Environment
To set up AWS Glue for the first time from the AWS Management Console:
- Open the AWS Glue console and select Prepare your account for AWS Glue.
- You can ignore Choose IAM users and roles for AWS Glue and choose Next if you'll be performing the next steps using your current role. Otherwise, select the IAM roles or users (I don't recommend using IAM users) that need access.
- Under Grant Amazon S3 access, choose Next unless you want to edit the options selected by default. For the sake of this article, I granted access to all my S3 buckets. Do not do the same in a production setting.
- Under Choose a default service role, keep the default settings and choose Next, unless you have an existing IAM role for Glue.
- Review and confirm your changes by choosing Apply changes.
To start building your ingestion job, create a Glue notebook by following the steps below.
- From the AWS Glue console, on the left pane, under ETL jobs, choose Notebooks.
- On the page Create job, select Notebook under Author using an interactive code notebook.
- On the Notebook pop-up, keep the default settings and choose Create notebook.
AWS Glue will spin up a Glue Studio notebook for you. Start by running the cell that initializes the Glue job, then wait while the notebook shows Waiting for session <GUID> to get into ready status... Once the session is ready, so is your notebook!
Encrypting Sensitive Columns with aes_encrypt()
Now that your Glue job is ready, let’s encrypt specific columns in your dataset using the PySpark function aes_encrypt(). We’ll build a reusable Python module, ColEncrypt, to handle both encryption and decryption, simplifying column-level encryption management in your Glue jobs.
Key Components
- KeyManager: Manages the creation and decryption of Data Encryption Keys (DEKs) using AWS KMS.
- ColEncrypt: Handles column encryption and decryption, leveraging PySpark's built-in AES functions.
These two components work together to provide a flexible, almost production-ready solution for column-level encryption with better error handling and monitoring.
The source code is here.
Setting Up Key Management
Hardcoding encryption keys is a bad practice. Instead, use AWS KMS to generate and decrypt DEKs securely at runtime. Here’s how the KeyManager class handles this:
import logging

from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class KeyManager:
    def __init__(self, kms_client):
        self.kms_client = kms_client

    def generate_data_key(self, key_id: str) -> bytes:
        # Ask KMS for a new 256-bit DEK and keep only its encrypted form.
        try:
            response = self.kms_client.generate_data_key(KeyId=key_id, KeySpec="AES_256")
            return response["CiphertextBlob"]
        except ClientError as err:
            logger.error(f"Failed to generate data key: {err}")
            raise

    def decrypt_data_key(self, encrypted_key: bytes) -> bytes:
        # Ask KMS to turn an encrypted DEK back into the plaintext key.
        try:
            response = self.kms_client.decrypt(CiphertextBlob=encrypted_key)
            return response["Plaintext"]
        except ClientError as err:
            logger.error(f"Failed to decrypt data key: {err}")
            raise
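The generate_data_key() method fetches an encrypted DEK from KMS, while decrypt_data_key() decrypts it for use in encryption tasks. Because every key request goes through KMS, each one is subject to IAM permissions and can be logged by AWS CloudTrail, which keeps key management secure, scalable, and auditable.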
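The KMS GenerateDataKey response contains both the plaintext key and its encrypted copy, so a variation on the class above could return both from a single call instead of calling decrypt right after generating. The sketch below is a hypothetical variation, not the author's implementation: StubKmsClient and generate_data_key_pair are names invented for illustration, and the stub's byte flip only mimics the response shape so the example runs without AWS credentials. It is not real encryption.

```python
import os
from typing import Tuple


class StubKmsClient:
    """Offline stand-in for a boto3 KMS client (hypothetical, local testing only)."""

    def generate_data_key(self, KeyId: str, KeySpec: str) -> dict:
        plaintext = os.urandom(32)  # AES_256 means a 32-byte key
        return {
            "KeyId": KeyId,
            "Plaintext": plaintext,
            "CiphertextBlob": self._flip(plaintext),
        }

    def decrypt(self, CiphertextBlob: bytes) -> dict:
        return {"Plaintext": self._flip(CiphertextBlob)}

    @staticmethod
    def _flip(data: bytes) -> bytes:
        # Reversible placeholder transform; NOT cryptography.
        return bytes(b ^ 0xFF for b in data)


def generate_data_key_pair(kms_client, key_id: str) -> Tuple[bytes, bytes]:
    """Return (plaintext_dek, encrypted_dek) from one GenerateDataKey call."""
    response = kms_client.generate_data_key(KeyId=key_id, KeySpec="AES_256")
    return response["Plaintext"], response["CiphertextBlob"]


client = StubKmsClient()
plaintext_dek, encrypted_dek = generate_data_key_pair(client, "alias/your-kms-key")

# Decrypting the stored blob later must give back the same plaintext key.
recovered = client.decrypt(CiphertextBlob=encrypted_dek)["Plaintext"]
print(len(plaintext_dek), recovered == plaintext_dek)
```

Passing the same stub to KeyManager is also a simple way to unit-test code that depends on it, since the stub accepts the same keyword arguments and returns the same response keys.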
Encrypting Columns with ColEncrypt
The ColEncrypt class applies AES encryption to specified columns, using DEKs managed by KeyManager.
encrypt() Method
This method handles the encryption process:
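import base64 as b64  # aliased so it does not clash with PySpark's base64()

from pyspark.sql.functions import base64, concat, expr, lit


class ColEncrypt:
    # __init__ (not shown) stores df, columns, key_id and kms_client.
    def encrypt(self):
        key_manager = KeyManager(self.kms_client)
        for column in self.columns:
            # One DEK per column: the encrypted form is stored with the data,
            # the decrypted form is used for the AES call.
            dek = key_manager.generate_data_key(self.key_id)
            decrypted_dek = key_manager.decrypt_data_key(dek)
            dek_b64 = b64.b64encode(dek).decode("utf-8")
            self.df = self.df.withColumn(
                "key", lit(decrypted_dek)
            ).withColumn(
                column,
                concat(lit(dek_b64 + "::"), base64(expr(f"aes_encrypt({column}, key)")))
            ).drop("key")
        return self.df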
Key Points
- Base64 Encode Encrypted DEK:
  - The encrypted DEK is encoded using Base64 for storage as a string.
  - The resulting format is encrypted_dek_b64::encrypted_column_value.
- Concatenation with Separator:
  - The concat function adds the encrypted_dek_b64 followed by a separator (::) to the encrypted column value.
- Temporary Key Column:
  - A temporary key column is used to store the decrypted DEK during encryption, and it's dropped afterward.
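The stored format can be illustrated outside Spark with a few lines of plain Python. This is a minimal sketch under stated assumptions: pack and unpack are hypothetical helper names, and the byte strings are placeholders standing in for a real KMS blob and a real AES ciphertext. It shows why splitting on the first "::" is unambiguous: the standard Base64 alphabet contains no colon, so the separator can never appear inside either half.

```python
import base64

SEPARATOR = "::"  # same separator the encrypt() method uses


def pack(encrypted_dek: bytes, ciphertext: bytes) -> str:
    """Build the stored string: base64(encrypted DEK) + '::' + base64(ciphertext)."""
    dek_b64 = base64.b64encode(encrypted_dek).decode("utf-8")
    value_b64 = base64.b64encode(ciphertext).decode("utf-8")
    return dek_b64 + SEPARATOR + value_b64


def unpack(stored: str) -> tuple:
    """Split the stored string back into (encrypted DEK, ciphertext) bytes."""
    dek_b64, value_b64 = stored.split(SEPARATOR, 1)
    return base64.b64decode(dek_b64), base64.b64decode(value_b64)


# Placeholder bytes (assumption): not real key material or ciphertext.
stored = pack(b"fake-encrypted-dek", b"fake-ciphertext")
print(stored)

dek, value = unpack(stored)
print(dek, value)
```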
Example Output
For a column named sensitive_column:
- Original Value: 12345
- Encrypted Column: ENCRYPTED_DEK_BASE64::ENCRYPTED_VALUE
Usage Example
Here’s how you can use ColEncrypt in your Glue job:
import boto3

from ColEncrypt import ColEncrypt

# Load the data frame
df = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={"paths": ["s3://your-bucket/your-data.csv"]},
    format="csv",
    format_options={"withHeader": True}
).toDF()

# Create the KMS client and specify the columns to encrypt
kms_client = boto3.client("kms", region_name="us-east-2")
columns_to_encrypt = ["column_1",..., "column_n"]

# Initialize and encrypt
encryptor = ColEncrypt(df, columns_to_encrypt, "alias/your-kms-key", kms_client, "arn:aws:iam::123456789012:role/YourRole")
encrypted_df = encryptor.encrypt()
- Initialize the ColEncrypt class with your DataFrame, sensitive columns, KMS Key ID, and a KMS client, or an IAM role if you want Glue to use different credentials to interact with the KMS key.
- Call the encrypt() method with or without specifying additional columns.
This approach ensures the encrypted DEK is stored alongside the encrypted value, enabling efficient decryption during data processing. You can generate and/or fetch the DEKs from a centralized Key Store as well, which will not be covered in this post.
Decrypting Columns with ColEncrypt
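To decrypt, extract the encrypted DEK from the first row of the encrypted column, decrypt it using KMS, and apply aes_decrypt(). Reading only the first row works because encrypt() uses a single DEK for the whole column. A dataset assembled from several encryption runs would carry several DEKs, and each one would need to be handled separately.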
decrypt() Method
class ColEncrypt:
    def decrypt(self):
        key_manager = KeyManager(self.kms_client)
        for column in self.columns:
            # encrypt() writes the same encrypted DEK prefix on every value
            # of a column, so the first row is enough to recover it.
            first_row = self.df.select(column).first()
            dek_b64, _ = first_row[column].split("::", 1)
            encrypted_dek = b64.b64decode(dek_b64)
            decrypted_dek = key_manager.decrypt_data_key(encrypted_dek)
            decrypted_dek_b64 = b64.b64encode(decrypted_dek).decode("utf-8")
            # Strip the prefix, decode the ciphertext, and decrypt it with the DEK.
            self.df = self.df.withColumn(
                column,
                expr(f"aes_decrypt(unbase64(split({column}, '::')[1]), unbase64('{decrypted_dek_b64}'))").cast("string")
            )
        return self.df
Usage Example
Here is an example of how you can decrypt a previously encrypted data frame using the same KMS key.
decryptor = ColEncrypt(encrypted_df, columns_to_encrypt, "alias/your-kms-key", kms_client)
decrypted_df = decryptor.decrypt()
With these methods, you can easily decrypt sensitive data columns for downstream processing or analysis.
Validating Data Integrity
After decryption, validate that the original and decrypted data match:
if original_df.subtract(decrypted_df).isEmpty() and decrypted_df.subtract(original_df).isEmpty():
    print("Original and decrypted files are identical!")
else:
    print("Original and decrypted files differ!")
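This step checks that your encryption and decryption workflows are functioning correctly and that every distinct row survives the round trip unchanged. Note that subtract() compares distinct rows, so it does not detect a change in the number of duplicate rows.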
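The difference between comparing distinct rows and comparing rows with their counts can be shown with a plain-Python analogy that needs no Spark session. This is only an illustration with made-up rows and hypothetical function names: the set comparison mirrors the two-way subtract() check, and the Counter comparison mirrors what a duplicate-preserving check such as DataFrame.exceptAll() would catch in a Glue job.

```python
from collections import Counter


def same_rows_ignoring_duplicates(a, b) -> bool:
    """Set comparison: equivalent to both subtract() results being empty."""
    return set(a) == set(b)


def same_rows_counting_duplicates(a, b) -> bool:
    """Multiset comparison: each row must also appear the same number of times."""
    return Counter(a) == Counter(b)


# Made-up rows for illustration; one duplicate row is missing after the round trip.
original = [("alice", "12345"), ("alice", "12345"), ("bob", "67890")]
decrypted = [("alice", "12345"), ("bob", "67890")]

print(same_rows_ignoring_duplicates(original, decrypted))  # True
print(same_rows_counting_duplicates(original, decrypted))  # False
```

If your source data can contain legitimate duplicate rows, comparing row counts as well, or using a duplicate-preserving comparison, gives a stricter check.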
Conclusion
Column-level encryption with AWS Glue, PySpark, and AWS KMS ensures secure data handling. By implementing ColEncrypt, you can streamline encryption and decryption while adhering to best practices for key management. This approach not only secures sensitive data but also provides a scalable, auditable framework for enterprise-grade security.
Stay tuned for the last part of this series, where I'll explore secure analytics on encrypted data with Amazon Redshift.