1. For each table, the Glue job saves the latest created_at timestamp it processed to a small JSON file in S3. The next run reads that watermark and only fetches rows created after it (and at least one day old).
2. It reads those new rows through the Glue JDBC connection, adds year/month partition columns derived from created_at, and writes Parquet files to S3.
3. Athena can then query those Parquet files directly, and S3 lifecycle rules can move older partitions to Glacier to cut costs further.
# Glue Data Catalog databases to create; the map key becomes the database name.
variable "databases" {
  type = map(object({}))
}

# Glue JDBC connections keyed by connection name.
variable "connections" {
  type = map(object({
    # Passed verbatim to aws_glue_connection (e.g. JDBC_CONNECTION_URL, USERNAME, PASSWORD).
    connection_properties = map(string)
    physical_connection_requirements = object({
      availability_zone  = string
      security_group_ids = list(string)
      subnet_id          = string
    })
  }))
}

# Crawlers keyed by crawler name.
#   jdbc_targets: Glue connection name -> include path for that connection.
#   job_triggers: keys name the var.jobs entries to start after a successful crawl.
# NOTE(review): `path` and `schedule` are not read by the crawler resource in this
# module — confirm whether they are still needed.
variable "crawlers" {
  type = map(object({
    database_name = string
    path          = string
    role          = string
    schedule      = string
    jdbc_targets = map(object({
      path = string
    }))
    job_triggers = map(object({}))
  }))
}

# Glue jobs keyed by job name.
#   args: extra Glue job arguments (e.g. --s3_bucket, --database_name) merged into
#         the job's default_arguments.
# NOTE(review): `enable` is not read by the job resource — confirm.
variable "jobs" {
  type = map(object({
    enable                  = bool
    role                    = string
    worker_type             = string
    glue_version            = string
    number_of_workers       = number
    maximum_concurrent_runs = number
    connections             = list(string)
    args                    = map(string)
  }))
}
# Short lowercase alphanumeric suffix used to make the notebook and Athena bucket
# names unique.
resource "random_string" "bucket_postfix" {
  length  = 8
  upper   = false
  special = false
}
# One Glue Data Catalog database per entry in var.databases (the key is the name).
resource "aws_glue_catalog_database" "db" {
  for_each = var.databases

  name = each.key
}
# One Glue connection per entry in var.connections, attached to a single subnet,
# availability zone and set of security groups.
resource "aws_glue_connection" "connection" {
  for_each = var.connections

  name                  = each.key
  connection_properties = each.value.connection_properties

  physical_connection_requirements {
    subnet_id              = each.value.physical_connection_requirements.subnet_id
    availability_zone      = each.value.physical_connection_requirements.availability_zone
    security_group_id_list = each.value.physical_connection_requirements.security_group_ids
  }
}
# One crawler per entry in var.crawlers. Each jdbc_targets key is the Glue connection
# to crawl through and its value's path is the include pattern. Tables removed from
# the source are deleted from the catalog (DELETE_FROM_DATABASE). No crawler schedule
# is set here; runs are started from the Glue workflow triggers.
resource "aws_glue_crawler" "crawler" {
  for_each = var.crawlers

  name          = each.key
  role          = each.value.role
  database_name = each.value.database_name

  dynamic "jdbc_target" {
    for_each = each.value.jdbc_targets
    iterator = target
    content {
      connection_name = target.key
      path            = target.value.path
    }
  }

  schema_change_policy {
    delete_behavior = "DELETE_FROM_DATABASE"
  }
}
# Bucket holding the Glue job script (under scripts/) and the jobs' --TempDir (under tmp/).
resource "aws_s3_bucket" "notebooks" {
  bucket = "glue-notebooks-${random_string.bucket_postfix.result}"
}
# Lets the Glue service principal read/write objects in the notebooks bucket and list it.
# NOTE(review): Glue jobs normally reach S3 with the job's IAM role (role_arn on the
# job), so this service-principal grant may be unnecessary — confirm.
resource "aws_s3_bucket_policy" "notebooks_policy" {
  bucket = aws_s3_bucket.notebooks.id
  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      # Object-level access: every GetObject* action plus PutObject.
      {
        Effect    = "Allow"
        Principal = { Service = "glue.amazonaws.com" }
        Action    = ["s3:GetObject*", "s3:PutObject"]
        Resource  = ["${aws_s3_bucket.notebooks.arn}/*"]
      },
      # Bucket-level access: listing keys.
      {
        Effect    = "Allow"
        Principal = { Service = "glue.amazonaws.com" }
        Action    = ["s3:ListBucket"]
        Resource  = [aws_s3_bucket.notebooks.arn]
      }
    ]
  })
}
# Uploads the Glue job script. The etag is the local file's MD5, so editing the
# script produces a diff and a re-upload.
resource "aws_s3_object" "cold_storage_migration" {
  bucket = aws_s3_bucket.notebooks.id
  key    = "scripts/cold-storage-migration.py"

  source = "${path.module}/scripts/cold-storage-migration.py"
  etag   = filemd5("${path.module}/scripts/cold-storage-migration.py")
}
# Log group used as the jobs' --continuous-log-logGroup; events are kept 14 days.
resource "aws_cloudwatch_log_group" "cold_storage_migration" {
  retention_in_days = 14
  name              = "cold-storage-migration"
}
# One Glue ETL job per entry in var.jobs. Every job runs the uploaded
# cold-storage-migration script; all other settings come from the job's own entry.
resource "aws_glue_job" "job" {
  for_each = var.jobs

  name              = each.key
  connections       = each.value.connections
  role_arn          = each.value.role
  glue_version      = each.value.glue_version
  worker_type       = each.value.worker_type
  number_of_workers = each.value.number_of_workers

  command {
    python_version = "3"
    # Resolves to s3://<bucket>/scripts/cold-storage-migration.py. Referencing the
    # aws_s3_object's key gives Terraform an implicit dependency, so the script is
    # uploaded before the job that runs it is created.
    script_location = "s3://${aws_s3_bucket.notebooks.bucket}/${aws_s3_object.cold_storage_migration.key}"
  }

  # Module-wide defaults first; this job's own `args` are merged last and win on
  # key conflicts. Reading each.value (rather than one hard-coded job entry) keeps
  # jobs independent and lets the module work without a job named
  # cold_storage_migration.
  default_arguments = merge({
    "--continuous-log-logGroup"          = aws_cloudwatch_log_group.cold_storage_migration.name
    "--job-language"                     = "python"
    "--TempDir"                          = "s3://${aws_s3_bucket.notebooks.bucket}/tmp"
    "--enable-continuous-cloudwatch-log" = "true"
    "--enable-continuous-log-filter"     = "true"
    "--enable-metrics"                   = "true"
    "--enable-job-insights"              = "true"
    "--enable-observability-metrics"     = "true"
    "--enable-glue-datacatalog"          = "true"
    "--enable-auto-scaling"              = "true"
  }, each.value.args)

  execution_property {
    max_concurrent_runs = each.value.maximum_concurrent_runs
  }
}
# ---- Workflow & triggers -------------------------------------------------------
# Glue workflow grouping the scheduled crawler start and the crawler -> job triggers.
resource "aws_glue_workflow" "main" {
  name = "cold-storage-migration"
}
# AWS cron expression for the trigger that starts the workflow. The default keeps the
# previous value: 04:00 on day-of-week 7, which is SATURDAY in AWS cron
# (1 = SUN ... 7 = SAT).
variable "workflow_schedule" {
  type    = string
  default = "cron(0 4 ? * 7 *)"
}

# Scheduled entry point of the workflow: starts every crawler in var.crawlers. Each
# crawler's SUCCEEDED state then fires its conditional job trigger.
resource "aws_glue_trigger" "start_workflow" {
  name          = "start cold storage migration"
  workflow_name = aws_glue_workflow.main.name
  type          = "SCHEDULED"
  schedule      = var.workflow_schedule

  # Iterate the configured crawlers instead of hard-coding the "example_sync" key,
  # so the module works with any crawler names.
  dynamic "actions" {
    for_each = var.crawlers
    content {
      crawler_name = aws_glue_crawler.crawler[actions.key].name
    }
  }
}
# One conditional trigger per crawler: when that crawler's run ends in SUCCEEDED,
# start every job named by the keys of its job_triggers map.
resource "aws_glue_trigger" "job_trigger" {
  for_each = var.crawlers

  name              = "${each.key}-trigger"
  description       = "Trigger for ${each.key} crawler"
  workflow_name     = aws_glue_workflow.main.name
  type              = "CONDITIONAL"
  start_on_creation = true

  predicate {
    conditions {
      crawler_name = aws_glue_crawler.crawler[each.key].name
      crawl_state  = "SUCCEEDED"
    }
  }

  dynamic "actions" {
    for_each = each.value.job_triggers
    iterator = triggered_job
    content {
      job_name = aws_glue_job.job[triggered_job.key].name
    }
  }
}
# ---- Athena analytics ----------------------------------------------------------
# Bucket receiving Athena query results (the workgroup writes under output/).
resource "aws_s3_bucket" "athena_results" {
  bucket = "athena-results-${random_string.bucket_postfix.result}"
}
# Grants the Athena service principal PutObject on every key of the results bucket.
# NOTE(review): Athena usually writes query results with the calling principal's IAM
# credentials rather than as athena.amazonaws.com, so this grant may have no
# effect — confirm.
resource "aws_s3_bucket_policy" "athena_results_policy" {
  bucket = aws_s3_bucket.athena_results.id
  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Effect    = "Allow"
      Principal = { Service = "athena.amazonaws.com" }
      Action    = "s3:PutObject"
      Resource  = "${aws_s3_bucket.athena_results.arn}/*"
    }]
  })
}
# Athena workgroup named "default". Its settings override client-side settings and
# send query results to output/ in the results bucket; CloudWatch metrics are on.
resource "aws_athena_workgroup" "default" {
  name = "default"

  configuration {
    enforce_workgroup_configuration    = true
    publish_cloudwatch_metrics_enabled = true

    result_configuration {
      output_location = "s3://${aws_s3_bucket.athena_results.bucket}/output/"
    }
  }
}
The suffixes_to_include list controls which tables are processed. Update it to match your own audit table names — the script keeps only Glue catalog tables classified as postgresql whose catalog location (a database.schema.table string) ends with one of these suffixes.
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
import sys
import boto3
import json
from pyspark.sql.functions import col, year, month
import pyspark.sql.functions as F
from awsglue.transforms import DropNullFields
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import col, max as spark_max
from datetime import datetime, timedelta
# ---- Glue job bootstrap -------------------------------------------------------
# Job parameters arrive as --JOB_NAME, --database_name and --s3_bucket.
args = getResolvedOptions(sys.argv, ["JOB_NAME", "database_name", "s3_bucket"])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

# Source Glue catalog database and destination bucket (as an "s3://<bucket>" prefix).
database_name = args["database_name"]
s3_bucket = f"s3://{args['s3_bucket']}"
s3_client = boto3.client("s3")

print(f"Using database: {database_name}")
print(f"Storing data in: {s3_bucket}")

# Only tables whose catalog location ends with one of these suffixes are migrated.
# Update this list to match your audit table name suffixes.
suffixes_to_include = [
    "audit_automation_records",
    "audit_data_records",
    "audit_security_records",
    "audit_webhook_records",
    "phone_tree_audit_record",
    "notification_records",
]
def get_glue_tables(database=None):
    """Return every table in a Glue Data Catalog database as a list of small dicts.

    Each dict has the keys ``name``, ``location`` (the table's
    ``StorageDescriptor.Location``), ``connection`` (``Parameters['connectionName']``)
    and ``classification`` (``Parameters['classification']``).

    Tables that lack any of those fields, for example tables not created by a JDBC
    crawler, no longer raise ``KeyError``. ``location`` falls back to ``''`` and the
    two parameters fall back to ``None``, so a suffix/classification filter simply
    skips them.

    :param database: name of the Glue database to list (required by the Glue API).
    :return: list of table descriptors, in the order Glue returns them.
    """
    client = boto3.client('glue')
    # The paginator follows NextToken itself and never sends an empty token.
    paginator = client.get_paginator('get_tables')
    tables = []
    for page in paginator.paginate(DatabaseName=database):
        for table in page.get('TableList', []):
            parameters = table.get('Parameters') or {}
            storage = table.get('StorageDescriptor') or {}
            tables.append({
                'name': table['Name'],
                'location': storage.get('Location') or '',
                'connection': parameters.get('connectionName'),
                'classification': parameters.get('classification'),
            })
    return tables
def load_last_processed_timestamp(bucket, key):
    """Return a table's stored watermark, or the epoch default.

    Reads the JSON object ``s3://{bucket}/{key}`` through the module-level
    ``s3_client`` and returns its ``last_processed`` value. A missing object
    (first run) or a document without that field yields ``"1970-01-01 00:00:00"``.
    """
    epoch = "1970-01-01 00:00:00"
    try:
        response = s3_client.get_object(Bucket=bucket, Key=key)
    except s3_client.exceptions.NoSuchKey:
        # No state file yet: nothing has been processed for this table.
        return epoch
    payload = response["Body"].read().decode("utf-8")
    return json.loads(payload).get("last_processed", epoch)
def save_last_processed_timestamp(bucket, key, timestamp):
    """Persist ``timestamp`` as a table's watermark.

    Writes ``{"last_processed": timestamp}`` as JSON to ``s3://{bucket}/{key}``
    through the module-level ``s3_client``.
    """
    body = json.dumps({"last_processed": timestamp})
    s3_client.put_object(Bucket=bucket, Key=key, Body=body)
# Retrieve all table descriptors from the Glue catalog
tables = get_glue_tables(database_name)
print(f"Tables in database {database_name}: {tables}")
# Keep only PostgreSQL-classified tables whose location ends with a configured suffix
filtered_tables = [
    table for table in tables
    if any(table['location'].endswith(suffix) for suffix in suffixes_to_include) and
    table.get('classification') == 'postgresql'
]
# Upper bound of this run's window: only rows at least one day old are exported
until_timestamp = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d %H:%M:%S')
print(f"🔹 Until processed timestamp: {until_timestamp}")
print(f"Filtered tables: {filtered_tables}")
# Watermarks keep microseconds. With second precision, the next run's strict
# `created_at > watermark` filter would re-read (and re-export) the rows from the
# last exported second. Older second-precision watermarks still parse as ::timestamp.
WATERMARK_FORMAT = "%Y-%m-%d %H:%M:%S.%f"
# Read each table's new rows and write them to S3 as partitioned Parquet
for table in filtered_tables:
    table_name = table['name']
    table_s3_key = f"state/{table_name}.json"
    last_timestamp = load_last_processed_timestamp(args["s3_bucket"], table_s3_key)
    print(f"🔹 Last processed timestamp for {table}: {last_timestamp}")
    try:
        print(f"📥 Reading table: {table_name}")
        # The catalog location is "<database>.<schema>.<table>"; the JDBC dbtable option
        # needs "<schema>.<table>" whatever the database is called.
        db, sch, tbl = table['location'].split(".")
        dbtable_value = f"{sch}.{tbl}"
        # Read only the window (last watermark, until_timestamp] from PostgreSQL.
        # Incremental state is the S3 watermark file; Glue job bookmarks are not used.
        dynamic_frame = glueContext.create_dynamic_frame.from_options(
            connection_type="postgresql",
            connection_options={
                "useConnectionProperties": "true",
                "connectionName": table['connection'],
                "dbtable": dbtable_value,
                "sampleQuery": f'SELECT * FROM "{sch}"."{tbl}" WHERE created_at > \'{last_timestamp}\'::timestamp AND created_at <= \'{until_timestamp}\'::timestamp',
            }
        )
        # Drop fields that are null in every record
        dynamic_frame = DropNullFields.apply(dynamic_frame)
        df = dynamic_frame.toDF()
        if df.isEmpty():
            print(f"❌ Table {table_name} has no data, skipping write operation.")
            continue  # Nothing new for this table; its watermark stays unchanged
        # Partition columns derived from created_at
        df = df.withColumn("year", F.year(F.col("created_at"))).withColumn("month", F.month(F.col("created_at")))
        # Highest created_at in this batch; it becomes the next watermark
        new_max_timestamp = df.agg(spark_max("created_at")).collect()[0][0]
        print("🔹 New max timestamp:", new_max_timestamp)
        dynamic_frame_with_partitions = DynamicFrame.fromDF(df, glueContext)
        output_path = f"{s3_bucket}/{db}/{sch}/{tbl}/"
        print(f"📤 Writing table {table_name} to {output_path} with partitioning...")
        glueContext.write_dynamic_frame.from_options(
            frame=dynamic_frame_with_partitions,
            connection_type="s3",
            connection_options={
                "path": output_path,
                "partitionKeys": ["year", "month"]
            },
            format="parquet",
        )
        # Advance the watermark only after the Parquet write succeeded. Saving it
        # earlier meant a failed write skipped these rows on every later run, and
        # then they were purged from Postgres without ever reaching S3.
        if new_max_timestamp:
            save_last_processed_timestamp(
                args["s3_bucket"], table_s3_key, new_max_timestamp.strftime(WATERMARK_FORMAT)
            )
        else:
            print(f"⚠️ No new timestamp to save for {table}.")
    except Exception as e:
        print(f"❌ Error processing table {table_name}: {str(e)}")
        raise
The Terragrunt configuration below defines two JDBC connections (example and example-2) — for example, a primary and a read replica, or two separate services sharing the same Glue workflow. Adjust the connections, crawlers, and jobs blocks to match your own setup.
# Terragrunt unit for the Glue module: wires the VPC, security group, Glue IAM role,
# data-lake bucket and the two RDS instances into the module inputs.

include "root" {
  path = find_in_parent_folders()
}

include "aws" {
  path = find_in_parent_folders("aws.hcl")
}

terraform {
  source = "${get_path_to_repo_root()}//modules/aws/glue"
}

# Database credentials come from a SOPS-encrypted secrets file.
locals {
  secrets = yamldecode(sops_decrypt_file(find_in_parent_folders("secrets.yaml")))
}

# ---- Upstream units whose outputs feed the inputs below ----------------------

dependency "vpc" {
  config_path = "${find_in_parent_folders("vpc")}/vpc"
}

dependency "sg" {
  config_path = "${get_original_terragrunt_dir()}/../sg"
}

dependency "iam" {
  config_path = "${find_in_parent_folders("iam")}/roles/glue/role"
}

dependency "s3_data_lake" {
  config_path = "${find_in_parent_folders("s3")}/buckets/data-lake"
}

dependency "rds" {
  config_path = "${find_in_parent_folders("rds")}/example/master-instance"
}

dependency "rds_example_2" {
  config_path = "${find_in_parent_folders("rds")}/example/example-2/master-instance"
}

inputs = {
  databases = {
    example = {}
  }

  # Both connections use the same Glue DB user, subnet, AZ and security group;
  # only the RDS endpoint differs.
  connections = {
    "example" = {
      connection_properties = {
        JDBC_CONNECTION_URL = "jdbc:postgresql://${dependency.rds.outputs.db_instance_endpoint}/example"
        USERNAME            = local.secrets.example.db.users.glue.username
        PASSWORD            = local.secrets.example.db.users.glue.password
      }
      physical_connection_requirements = {
        availability_zone  = dependency.vpc.outputs.azs[0]
        security_group_ids = [dependency.sg.outputs.security_group_id]
        subnet_id          = dependency.vpc.outputs.private_subnets[0]
      }
    }
    "example-2" = {
      connection_properties = {
        JDBC_CONNECTION_URL = "jdbc:postgresql://${dependency.rds_example_2.outputs.db_instance_endpoint}/example"
        USERNAME            = local.secrets.example.db.users.glue.username
        PASSWORD            = local.secrets.example.db.users.glue.password
      }
      physical_connection_requirements = {
        availability_zone  = dependency.vpc.outputs.azs[0]
        security_group_ids = [dependency.sg.outputs.security_group_id]
        subnet_id          = dependency.vpc.outputs.private_subnets[0]
      }
    }
  }

  # One crawler over both connections; job_triggers names the job to start after
  # a successful crawl.
  crawlers = {
    example_sync = {
      database_name    = "example"
      connection_names = ["example", "example-2"]
      path             = "example/%"
      role             = dependency.iam.outputs.iam_role_arn
      schedule         = "cron(0 4 ? * 7 *)"
      jdbc_targets = {
        "example"   = { path = "example/%" }
        "example-2" = { path = "example/%" }
      }
      job_triggers = {
        cold_storage_migration = {}
      }
    }
  }

  jobs = {
    cold_storage_migration = {
      enable                  = true
      role                    = dependency.iam.outputs.iam_role_arn
      glue_version            = "5.0"
      worker_type             = "G.1X"
      number_of_workers       = 5
      maximum_concurrent_runs = 5
      connections             = ["example", "example-2"]
      # Read by the job script via getResolvedOptions as s3_bucket / database_name.
      args = {
        "--s3_bucket"     = dependency.s3_data_lake.outputs.s3_bucket_id
        "--database_name" = "example"
      }
    }
  }
}
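With native Postgres table partitioning (for example, one partition per month on created_at), expiring old data means detaching the oldest partition and running DROP TABLE on it — which is instant and leaves no bloat behind.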
In practice though, partitioning is only easy if you design for it upfront. Retrofitting an existing large audit table into a partitioned table is a significant migration: you need to rename the original table, create a new partitioned parent, re-insert all existing rows (or use pg_repack), recreate indexes and foreign keys, and update any sequences. For most teams this risk and operational complexity outweighs the benefit, especially when a simpler alternative exists.
The simpler alternative is a plain DELETE ... WHERE created_at < cutoff. It's slower than dropping a partition and it creates dead tuples that autovacuum needs to clean up — but for audit tables trimmed on a regular schedule, autovacuum handles this well and the simplicity is worth it.
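pg_cron is a Postgres extension supported on Amazon RDS for PostgreSQL that lets you schedule SQL statements as cron jobs directly inside the database, with no Lambda or external scheduler required. Enable it once per instance:
- Add pg_cron to shared_preload_libraries in the DB parameter group.
- Reboot the instance.
- Create the extension in the database pg_cron runs from (the cron.database_name parameter, postgres by default).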
-- Install pg_cron; IF NOT EXISTS makes re-running this a no-op.
create extension if not exists pg_cron;
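Then schedule a weekly cleanup job for each audit table. The example below deletes rows older than 90 days, running every Sunday at 05:00 UTC. The Glue workflow is scheduled with cron(0 4 ? * 7 *), and in AWS cron day-of-week 7 is Saturday. The migration therefore starts 25 hours before the cleanup and has a full day to finish before any rows are deleted: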
-- Weekly purge (Sundays 05:00) of audit rows older than 90 days.
-- pg_cron runs plain cron.schedule() jobs in the database it is installed in
-- (cron.database_name, 'postgres' by default). That database usually does not
-- contain the audit tables, so target the application database explicitly with
-- cron.schedule_in_database (pg_cron 1.4+). Replace 'your_database' and
-- your_schema with your own names.
SELECT cron.schedule_in_database(
  'delete-old-audit-records',
  '0 5 * * 0',
  $$
  DELETE FROM your_schema.audit_automation_records
  WHERE created_at < NOW() - INTERVAL '90 days';
  DELETE FROM your_schema.audit_data_records
  WHERE created_at < NOW() - INTERVAL '90 days';
  DELETE FROM your_schema.audit_security_records
  WHERE created_at < NOW() - INTERVAL '90 days';
  $$,
  'your_database'
);
You can inspect scheduled jobs and their last run status at any time:
-- Every job registered with pg_cron
select * from cron.job;

-- The 20 most recent job runs, newest first
select * from cron.job_run_details order by start_time desc limit 20;
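Check the watermark before trusting the cleanup. Each table's state/<table_name>.json file in S3 records its last_processed timestamp. If the last_processed timestamp for a table is older than expected, something has gone wrong with the migration and you should not delete. The scheduled job above does not perform this check itself, so unschedule it (SELECT cron.unschedule('delete-old-audit-records');) until the migration has caught up.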
Vacuum after large deletes. The first few cleanup runs may delete a large number of rows at once. Run VACUUM ANALYZE on the affected tables shortly after to reclaim storage and keep query plans accurate:
-- Run after the first large purges, on every table the cleanup job deletes from:
-- makes dead-tuple space reusable and refreshes planner statistics.
VACUUM ANALYZE your_schema.audit_automation_records;
VACUUM ANALYZE your_schema.audit_data_records;
VACUUM ANALYZE your_schema.audit_security_records;
After the initial cleanup, autovacuum will keep up with the regular weekly deletes on its own.
From then on, pg_cron quietly keeps the Postgres tables lean — beyond that initial VACUUM, no manual maintenance is required.
Powered by Golang net/http package