- Home
- Course Detail
regularpython@gmail.com
You are now watching:
Python Basics in English / of Class Realtime Example
CSV ETL with S3 using Python Class
Read CSV from S3 → Transform → Upload back to S3 (Class-based design)
High-level Flow
Read CSV file from source S3 bucket.
Transform each row (add
Updated_Date).Write transformed data to another CSV and upload to target S3 bucket.
Code – CsvFileService Class
import boto3
import csv
import io
from datetime import datetime
class CsvFileService:
def __init__(self, source_bucket, source_key, target_bucket, target_key):
# Bucket and object names
self.source_bucket = source_bucket
self.source_key = source_key
self.target_bucket = target_bucket
self.target_key = target_key
self.s3 = boto3.client("s3")
self.data = None
self.transformed_rows = []
def read_the_data_from_s3(self):
# Step 1: Read CSV from S3
response = self.s3.get_object(Bucket=self.source_bucket, Key=self.source_key)
csv_content = response["Body"].read().decode("utf-8")
# Convert CSV string into rows
reader = csv.DictReader(io.StringIO(csv_content))
self.data = list(reader)
def transform_the_data(self):
# Step 2: Transform Data
for row in self.data:
a = row.copy()
# Example transformation: add Updated_Date
a["Updated_Date"] = datetime.today()
self.transformed_rows.append(a)
def upload_to_s3_bucket(self):
# Step 3: Convert transformed data back to CSV string
output_stream = io.StringIO()
writer = csv.DictWriter(output_stream, fieldnames=self.transformed_rows[0].keys())
writer.writeheader()
writer.writerows(self.transformed_rows)
# Final CSV content
final_csv_data = output_stream.getvalue()
# Step 4: Upload back to S3
self.s3.put_object(
Bucket=self.target_bucket,
Key=self.target_key,
Body=final_csv_data.encode("utf-8"),
ContentType="text/csv"
)
print("CSV transformed and uploaded successfully!")
source_bucket = "batch89-etl"
source_key = "customers.csv"
target_bucket = "batch89-etl"
target_key = "output-data.csv"
obj = CsvFileService(source_bucket, source_key, target_bucket, target_key)
obj.read_the_data_from_s3()
obj.transform_the_data()
obj.upload_to_s3_bucket()