How to migrate audit logs from Postgres RDS to S3 using AWS Glue

Thu May 21 13:46:54 2026
If you've been storing audit logs in a relational database, you've probably noticed the table growing without bound — slowing down your application and increasing database costs over time. Moving that historical data to S3 gives you cheap, durable storage and lets you query it on-demand with Athena. This article walks through a complete setup using AWS Glue to incrementally extract audit records from Postgres RDS, convert them to Parquet, and land them in S3.
Why not use Glue job bookmarks?
AWS Glue's built-in job bookmarks track progress using auto-incrementing primary keys. If your tables use UUIDs as primary keys — which is common — bookmarks will fail to correctly detect new rows. This setup uses a custom approach instead: after each run, the job saves the highest created_at timestamp it processed to a small JSON file in S3. The next run reads that watermark and only fetches rows created after it.
Architecture
The pipeline has three stages that run in sequence via a Glue Workflow: 1. Glue Crawler scans your Postgres database through a JDBC connection and updates the Glue Data Catalog with the current schema. 2. Glue Job reads the delta since the last run, drops null fields, adds year/month partition columns, and writes Parquet files to S3. 3. Athena can then query those Parquet files directly, and S3 lifecycle rules can move older partitions to Glacier to cut costs further.
Terraform module
All resources live in a reusable Terraform module. The examples below use Terragrunt to call it, but plain Terraform works just as well.
variables.tf

variable databases {
  type = map(object({}))
}

variable connections {
  type = map(object({
    connection_properties = map(string)
    physical_connection_requirements = object({
      availability_zone = string
      security_group_ids = list(string)
      subnet_id = string
    })
  }))
}

variable crawlers {
  type = map(object({
    database_name    = string
    path             = string
    role             = string
    schedule         = string
    jdbc_targets     = map(object({
      path = string
    }))
    job_triggers     = map(object({}))
  }))
}

variable jobs {
  type = map(object({
    enable                 = bool
    role                   = string
    worker_type            = string
    glue_version           = string
    number_of_workers      = number
    maximum_concurrent_runs = number
    connections            = list(string)
    args = map(string)
  }))
}
main.tf
The module creates the Glue catalog database, JDBC connections, crawlers, the S3 bucket for scripts, the Glue job itself, and a workflow with triggers that chain them together. It also provisions an Athena workgroup with a dedicated results bucket.

resource "random_string" "bucket_postfix" {
  length  = 8
  special = false
  upper   = false
}

resource aws_glue_catalog_database db {
  for_each = var.databases
  name = each.key
}

resource aws_glue_connection connection {
  for_each = var.connections

  name = each.key
  connection_properties = each.value.connection_properties
  physical_connection_requirements {
    availability_zone      = each.value.physical_connection_requirements.availability_zone
    security_group_id_list = each.value.physical_connection_requirements.security_group_ids
    subnet_id              = each.value.physical_connection_requirements.subnet_id
  }
}

resource aws_glue_crawler crawler {
  for_each     = var.crawlers
  name         = each.key

  database_name = each.value.database_name
  role          = each.value.role

  schema_change_policy {
    delete_behavior = "DELETE_FROM_DATABASE"
  }

  dynamic jdbc_target {
    for_each = each.value.jdbc_targets

    content {
      connection_name = jdbc_target.key
      path            = jdbc_target.value.path
    }
  }
}

resource aws_s3_bucket notebooks {
  bucket = "glue-notebooks-${random_string.bucket_postfix.result}"
}

resource aws_s3_bucket_policy notebooks_policy {
  bucket = aws_s3_bucket.notebooks.id
  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Principal = {
          Service = "glue.amazonaws.com"
        }
        Action = [
          "s3:GetObject*",
          "s3:PutObject"
        ]
        Resource = [
          "${aws_s3_bucket.notebooks.arn}/*"
        ]
      },
      {
        Effect = "Allow"
        Principal = {
          Service = "glue.amazonaws.com"
        }
        Action = [
          "s3:ListBucket"
        ]
        Resource = [
          aws_s3_bucket.notebooks.arn
        ]
      }
    ]
  })
}

resource aws_s3_object cold_storage_migration {
  bucket = aws_s3_bucket.notebooks.id
  key    = "scripts/cold-storage-migration.py"
  source = "${path.module}/scripts/cold-storage-migration.py"
  etag   = filemd5("${path.module}/scripts/cold-storage-migration.py")
}

resource aws_cloudwatch_log_group cold_storage_migration {
  name              = "cold-storage-migration"
  retention_in_days = 14
}

resource aws_glue_job job {
  for_each     = var.jobs
  name         = each.key

  connections = each.value.connections

  role_arn          = each.value.role
  glue_version      = each.value.glue_version
  worker_type       = each.value.worker_type
  number_of_workers = each.value.number_of_workers

  command {
    python_version  = "3"
    script_location = "s3://${aws_s3_bucket.notebooks.bucket}/scripts/cold-storage-migration.py"
  }


  default_arguments = merge({
    "--continuous-log-logGroup"          = aws_cloudwatch_log_group.cold_storage_migration.name
    "--job-language"                     = "python"
    "--TempDir"                          = "s3://${aws_s3_bucket.notebooks.bucket}/tmp"
    "--enable-continuous-cloudwatch-log" = "true"
    "--enable-continuous-log-filter"     = "true"
    "--enable-metrics"                   = "true"
    "--enable-job-insights"              = "true"
    "--enable-observability-metrics"     = "true"
    "--enable-glue-datacatalog"          = "true"
    "--enable-auto-scaling"              = "true"
  }, var.jobs.cold_storage_migration.args)

  execution_property {
    max_concurrent_runs = var.jobs.cold_storage_migration.maximum_concurrent_runs
  }
}

# Triggers
resource aws_glue_workflow main {
  name = "cold-storage-migration"
}

resource aws_glue_trigger start_workflow {
  name          = "start cold storage migration"
  workflow_name = aws_glue_workflow.main.name

  type          = "SCHEDULED"
  schedule      = "cron(0 4 ? * 7 *)"

  actions {
    crawler_name = aws_glue_crawler.crawler["example_sync"].name
  }
}


resource aws_glue_trigger job_trigger {
  for_each = var.crawlers

  workflow_name = aws_glue_workflow.main.name
  name     = "${each.key}-trigger"
  type     = "CONDITIONAL"
  start_on_creation = true
  description = "Trigger for ${each.key} crawler"

  predicate {
    conditions {
      crawler_name = aws_glue_crawler.crawler[each.key].name
      crawl_state  = "SUCCEEDED"
    }
  }

  dynamic actions {
    for_each = each.value.job_triggers

    content {
      job_name = aws_glue_job.job[actions.key].name
    }
  }
}

# Athena analytics
resource aws_s3_bucket athena_results {
  bucket = "athena-results-${random_string.bucket_postfix.result}"
}

resource aws_s3_bucket_policy athena_results_policy {
  bucket = aws_s3_bucket.athena_results.id
  policy = jsonencode({
    Version = "2012-10-17",
    Statement = [
      {
        Effect = "Allow",
        Principal = {
          Service = "athena.amazonaws.com"
        },
        Action = "s3:PutObject",
        Resource = "${aws_s3_bucket.athena_results.arn}/*"
      }
    ]
  })
}

resource aws_athena_workgroup default {
  name = "default"

  configuration {
    enforce_workgroup_configuration    = true
    publish_cloudwatch_metrics_enabled = true

    result_configuration {
      output_location = "s3://${aws_s3_bucket.athena_results.bucket}/output/"
    }
  }
}
The Glue job
The Python script iterates over the tables you care about, fetches the rows created since the last run, converts them to Parquet partitioned by year and month, and writes them to S3. The suffixes_to_include list controls which tables are processed. Update it to match your own audit table names — the script filters the Glue catalog to only tables whose S3 location ends with one of these suffixes.

from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
import sys
import boto3
import json
from pyspark.sql.functions import col, year, month
import pyspark.sql.functions as F
from awsglue.transforms import DropNullFields
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import col, max as spark_max
from datetime import datetime, timedelta

# Initialize Glue job
args = getResolvedOptions(sys.argv, ["JOB_NAME", "database_name", "s3_bucket"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

# Define Glue database name and S3 bucket
database_name = args["database_name"]
s3_bucket = "s3://" + args["s3_bucket"]
s3_client = boto3.client("s3")
print(f"Using database: {database_name}")
print(f"Storing data in: {s3_bucket}")

# Update this list to match your audit table name suffixes
suffixes_to_include = [
    "audit_automation_records",
    "audit_data_records",
    "audit_security_records",
    "audit_webhook_records",
    "phone_tree_audit_record",
    "notification_records"
]

# Function to get Glue tables
def get_glue_tables(database=None):
    client = boto3.client('glue')
    next_token = ''
    tables = []

    while True:
        response = client.get_tables(DatabaseName=database, NextToken=next_token)
        for table in response.get('TableList'):
            tables.append({
                'name': table['Name'],
                'location': table['StorageDescriptor']['Location'],
                'connection': table['Parameters']['connectionName'],
                'classification': table['Parameters']['classification'],
            })
        next_token = response.get('NextToken')
        if next_token is None:
            return tables

def load_last_processed_timestamp(bucket, key):
    """Load the last processed timestamp from S3."""
    try:
        obj = s3_client.get_object(Bucket=bucket, Key=key)
        data = json.loads(obj["Body"].read().decode("utf-8"))
        return data.get("last_processed", "1970-01-01 00:00:00")
    except s3_client.exceptions.NoSuchKey:
        return "1970-01-01 00:00:00"  # Default for first run

def save_last_processed_timestamp(bucket, key, timestamp):
    """Save the last processed timestamp to S3."""
    data = {"last_processed": timestamp}
    s3_client.put_object(
        Bucket=bucket,
        Key=key,
        Body=json.dumps(data)
    )

# Retrieve all table names from Glue catalog
tables = get_glue_tables(database_name)
print(f"Tables in database {database_name}: {tables}")

# Include only tables that end with the specified suffixes
filtered_tables = [
    table for table in tables
    if any(table['location'].endswith(suffix) for suffix in suffixes_to_include) and
       table.get('classification') == 'postgresql'
]

until_timestamp = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d %H:%M:%S')
print(f"🔹 Until processed timestamp: {until_timestamp}")
print(f"Filtered tables: {filtered_tables}")

# Read each table into a DynamicFrame and write to S3 in Parquet format
for table in filtered_tables:
    table_name = table['name']
    table_s3_key = f"state/{table_name}.json"
    last_timestamp = load_last_processed_timestamp(args["s3_bucket"], table_s3_key)
    print(f"🔹 Last processed timestamp for {table}: {last_timestamp}")
    try:
        print(f"📥 Reading table: {table_name}")

        # Remove database prefix (if necessary)
        dbtable_value = table['location'].replace("example.", "", 1)
        db, sch, tbl = table['location'].split(".")


        # Read data from PostgreSQL with job bookmarks enabled
        dynamic_frame = glueContext.create_dynamic_frame.from_options(
            connection_type="postgresql",
            connection_options={
                    "useConnectionProperties": "true",
                    "connectionName": table['connection'],
                    "dbtable": dbtable_value,
                    "sampleQuery": f'SELECT * FROM "{sch}"."{tbl}" WHERE created_at > \'{last_timestamp}\'::timestamp AND created_at <= \'{until_timestamp}\'::timestamp',
            }
        )

        # Apply DropNullFields transformation to remove null fields
        dynamic_frame = DropNullFields.apply(dynamic_frame)

        # Convert to Spark DataFrame
        df = dynamic_frame.toDF()

        # Check if DataFrame is empty before proceeding
        if df.isEmpty():
            print(f"❌ Table {table_name} has no data, skipping write operation.")
            continue  # Skip this table if it's empty

        # Add partition columns (year, month) to the DataFrame
        df = df.withColumn("year", F.year(F.col("created_at"))).withColumn("month", F.month(F.col("created_at")))

        # Get new max timestamp
        new_max_timestamp = df.agg(spark_max("created_at")).collect()[0][0]
        print("🔹 New max timestamp:", new_max_timestamp)

        if new_max_timestamp:
            new_max_timestamp = new_max_timestamp.strftime("%Y-%m-%d %H:%M:%S")
            save_last_processed_timestamp(args["s3_bucket"], table_s3_key, new_max_timestamp)
        else:
            print(f"⚠️ No new timestamp to save for {table}.")

        # Convert back to DynamicFrame using correct method
        dynamic_frame_with_partitions = DynamicFrame.fromDF(df, glueContext)

        # Define S3 path
        output_path = f"{s3_bucket}/{db}/{sch}/{tbl}/"
        print(f"📤 Writing table {table_name} to {output_path} with partitioning...")

        # Write the DynamicFrame to S3 with partitioning
        glueContext.write_dynamic_frame.from_options(
            frame=dynamic_frame_with_partitions,
            connection_type="s3",
            connection_options={
                "path": output_path,
                "partitionKeys": ["year", "month"]
            },
            format="parquet",
        )

    except Exception as e:
        print(f"❌ Error processing table {table_name}: {str(e)}")
        raise
Calling the module with Terragrunt
The inputs below show a setup with two RDS instances (example and example-2) — for example, a primary and a read replica, or two separate services sharing the same Glue workflow. Adjust the connections, crawlers, and jobs blocks to match your own setup.

include "root" {
  path = find_in_parent_folders()
}

include "aws" {
  path = find_in_parent_folders("aws.hcl")
}

dependency "vpc" {
  config_path = "${find_in_parent_folders("vpc")}/vpc"
}

dependency "sg" {
  config_path = "${get_original_terragrunt_dir()}/../sg"
}

dependency "iam" {
  config_path = "${find_in_parent_folders("iam")}/roles/glue/role"
}

dependency "s3_data_lake" {
  config_path = "${find_in_parent_folders("s3")}/buckets/data-lake"
}

dependency "rds" {
  config_path = "${find_in_parent_folders("rds")}/example/master-instance"
}

dependency "rds_example_2" {
  config_path = "${find_in_parent_folders("rds")}/example/example-2/master-instance"
}

terraform {
  source = "${get_path_to_repo_root()}//modules/aws/glue"
}

locals {
  secrets = yamldecode(sops_decrypt_file(find_in_parent_folders("secrets.yaml")))
}

inputs = {
  databases = {
    example = {}
  }

  connections = {
    "example" = {
      connection_properties = {
        JDBC_CONNECTION_URL = "jdbc:postgresql://${dependency.rds.outputs.db_instance_endpoint}/example"
        USERNAME            = local.secrets.example.db.users.glue.username
        PASSWORD            = local.secrets.example.db.users.glue.password
      }
      physical_connection_requirements = {
        availability_zone  = dependency.vpc.outputs.azs[0]
        security_group_ids = [dependency.sg.outputs.security_group_id]
        subnet_id          = dependency.vpc.outputs.private_subnets[0]
      }
    }
    "example-2" = {
      connection_properties = {
        JDBC_CONNECTION_URL = "jdbc:postgresql://${dependency.rds_example_2.outputs.db_instance_endpoint}/example"
        USERNAME            = local.secrets.example.db.users.glue.username
        PASSWORD            = local.secrets.example.db.users.glue.password
      }
      physical_connection_requirements = {
        availability_zone  = dependency.vpc.outputs.azs[0]
        security_group_ids = [dependency.sg.outputs.security_group_id]
        subnet_id          = dependency.vpc.outputs.private_subnets[0]
      }
    }
  }

  crawlers = {
    example_sync = {
      database_name    = "example"
      connection_names = ["example", "example-2"]
      path             = "example/%"
      role             = dependency.iam.outputs.iam_role_arn
      schedule         = "cron(0 4 ? * 7 *)"
      jdbc_targets = {
        "example" = {
          path = "example/%"
        }
        "example-2" = {
          path = "example/%"
        }
      }
      job_triggers = {
        cold_storage_migration = {}
      }
    }
  }

  jobs = {
    cold_storage_migration = {
      enable                  = true
      role                    = dependency.iam.outputs.iam_role_arn
      glue_version            = "5.0"
      worker_type             = "G.1X"
      maximum_concurrent_runs = 5
      number_of_workers       = 5
      connections             = ["example", "example-2"]
      args = {
        "--s3_bucket"     = dependency.s3_data_lake.outputs.s3_bucket_id
        "--database_name" = "example"
      }
    }
  }
}
Cleaning up old data from Postgres
Migrating data to S3 only solves half the problem — the rows still exist in Postgres and will keep consuming storage and slowing down queries unless you delete them. Here's the simplest way to do that automatically.
Why not use Postgres table partitioning?
Postgres native partitioning is often suggested as the "right" solution for this kind of problem: you partition the table by month, and once a partition has been migrated you simply DROP TABLE it — which is instant and leaves no bloat behind. In practice though, partitioning is only easy if you design for it upfront. Retrofitting an existing large audit table into a partitioned table is a significant migration: you need to rename the original table, create a new partitioned parent, re-insert all existing rows (or use pg_repack), recreate indexes and foreign keys, and update any sequences. For most teams this risk and operational complexity outweighs the benefit, especially when a simpler alternative exists.
Deleting rows with pg_cron
Without partitioning, the deletion is just a DELETE WHERE created_at < cutoff. It's slower than dropping a partition and it creates dead tuples that autovacuum needs to clean up — but for audit tables that are shrinking over time, autovacuum handles this well and the simplicity is worth it. pg_cron is a Postgres extension supported on AWS RDS that lets you schedule SQL statements as cron jobs directly inside the database, with no Lambda or external scheduler required. Enable it once per database:

CREATE EXTENSION IF NOT EXISTS pg_cron;
Then schedule a weekly cleanup job for each audit table. The example below deletes rows older than 90 days, running every Sunday at 5am UTC — one hour after the Glue workflow finishes:

SELECT cron.schedule(
  'delete-old-audit-records',
  '0 5 * * 0',
  $$
    DELETE FROM your_schema.audit_automation_records
    WHERE created_at < NOW() - INTERVAL '90 days';

    DELETE FROM your_schema.audit_data_records
    WHERE created_at < NOW() - INTERVAL '90 days';

    DELETE FROM your_schema.audit_security_records
    WHERE created_at < NOW() - INTERVAL '90 days';
  $$
);
You can inspect scheduled jobs and their last run status at any time:

-- List all scheduled jobs
SELECT * FROM cron.job;

-- Check execution history
SELECT * FROM cron.job_run_details ORDER BY start_time DESC LIMIT 20;
A few important notes
Keep a safety buffer. Don't delete data that your Glue job may not have processed yet. If your Glue job runs weekly and you keep 90 days of data in Postgres, you have roughly 12 weekly runs worth of safety margin before any data is at risk of being deleted before it's migrated. Verify before deleting. For extra confidence, you can query the S3 watermark files your Glue job writes before running deletions. If the last_processed timestamp for a table is older than expected, something has gone wrong with the migration and you should not delete. Vacuum after large deletes. The first few cleanup runs may delete a large number of rows at once. Run VACUUM ANALYZE on the affected tables shortly after to reclaim storage and keep query plans accurate:

VACUUM ANALYZE your_schema.audit_automation_records;
After the initial cleanup, autovacuum will keep up with the regular weekly deletes on its own.
Wrapping up
Once deployed, the workflow runs on a weekly schedule (every Saturday at 4am UTC in the example above — adjust the cron expression to suit your needs). Each run fetches only the rows created since the previous run, converts them to Parquet partitioned by year and month, and appends them to your data lake in S3. From there you can query the data with Athena straight away, and adopt S3 lifecycle rules to transition older partitions to Glacier or Glacier Deep Archive to reduce long-term storage costs. Meanwhile, pg_cron quietly keeps the Postgres tables lean — no manual maintenance required.

Comments
To leave a comment please login via github

Powered by Golang net/http package