Simple Data Processing Pipelines in Python for Everyday Work
I wrote a script to process a 2GB log file. Ran it. My laptop froze. Task manager showed Python eating 8GB of RAM before the OOM killer stepped in.
The problem? I loaded the entire file into a list, then created another list for filtered results, then another for transformed results. Three copies of 2GB data in memory.
The fix took ten minutes: swap readlines() for a generator that yields lines one at a time. Suddenly the script used ~50MB regardless of file size. Generators are magic.
The Memory Trap
This is the naïve approach everyone writes first:
# ❌ Loads everything into memory
def process_log(filename):
    with open(filename) as f:
        lines = f.readlines()  # 2GB in RAM
    filtered = []
    for line in lines:
        if "ERROR" in line:
            filtered.append(line)  # More RAM
    return filtered
For a 100KB file, this is fine. For a 10GB file, your script crashes.
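To make the gap concrete, here's a quick sketch comparing the container overhead of a list against an equivalent generator. Note that sys.getsizeof reports only the object's own footprint, not the elements it points to, so the real memory cost of the list is even higher:

```python
import sys

# A list materializes every element up front
numbers_list = [n * 2 for n in range(1_000_000)]

# A generator holds only its paused execution state
numbers_gen = (n * 2 for n in range(1_000_000))

print(sys.getsizeof(numbers_list))  # several MB, just for the pointer array
print(sys.getsizeof(numbers_gen))   # a few hundred bytes at most
```

The generator's size stays the same whether it will produce ten items or ten billion.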
Generators to the Rescue
A generator function uses yield instead of return. It produces values one at a time, pausing between each. Memory stays constant.
# ✅ Constant memory usage
def read_lines(filename):
    with open(filename) as f:
        for line in f:
            yield line.strip()

def filter_errors(lines):
    for line in lines:
        if "ERROR" in line:
            yield line

# Usage
lines = read_lines("huge.log")
errors = filter_errors(lines)
for error in errors:
    print(error)
Nothing happens until you iterate. When you ask for the first error, filter_errors asks read_lines for lines one by one until it finds an error. One line in memory at a time.
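You can watch this pull-based behavior without a real file. In this self-contained sketch, read_lines_demo is a stand-in for read_lines that yields canned log lines:

```python
def read_lines_demo():
    # Stand-in for read_lines: yields canned log lines instead of reading a file
    for line in ["INFO ok", "ERROR disk full", "INFO ok", "ERROR timeout"]:
        yield line

def filter_errors(lines):
    for line in lines:
        if "ERROR" in line:
            yield line

errors = filter_errors(read_lines_demo())  # nothing has run yet
print(next(errors))  # pulls lines until the first match: ERROR disk full
print(next(errors))  # resumes where it left off: ERROR timeout
```

Each next() call wakes the pipeline just long enough to produce one result, then everything pauses again.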
Building a Pipeline
The pattern is: chain generators together. Each step takes an iterable and yields transformed items.
import csv
import json

def read_users(filepath):
    """Extract: read CSV rows one at a time."""
    with open(filepath, encoding='utf-8') as f:
        reader = csv.DictReader(f)
        for row in reader:
            yield row

def filter_active(users):
    """Transform: keep only active users."""
    for user in users:
        if user['status'] == 'active':
            yield user

def normalize_phones(users):
    """Transform: clean up phone numbers."""
    for user in users:
        user = user.copy()
        user['phone'] = user['phone'].replace('-', '').replace(' ', '')
        yield user

def save_json(users, output_path):
    """Load: write results to a JSON array, one item at a time."""
    with open(output_path, 'w', encoding='utf-8') as f:
        f.write('[\n')
        first = True
        for user in users:
            if not first:
                f.write(',\n')
            json.dump(user, f)
            first = False
        f.write('\n]')

# Compose the pipeline
def main():
    users = read_users('users.csv')
    users = filter_active(users)
    users = normalize_phones(users)
    save_json(users, 'active_users.json')
    print("Done!")

main()
This processes millions of rows while holding roughly one row in memory at a time. Each function is single-purpose and testable.
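Testability falls out for free: every stage accepts any iterable, so you can feed a stage a plain list and collect its output with list(). A minimal sketch using the filter_active stage from the pipeline above (the sample rows are made up for illustration):

```python
def filter_active(users):
    """Transform: keep only active users."""
    for user in users:
        if user['status'] == 'active':
            yield user

# Feed the stage a hand-written list instead of a CSV reader
sample = [
    {'name': 'Ada', 'status': 'active'},
    {'name': 'Bob', 'status': 'inactive'},
]
result = list(filter_active(sample))
print(result)  # only Ada survives the filter
```

No files, no mocks: the generator doesn't know or care where its input comes from.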
Error Handling
What if row 50,000 has bad data? You don't want to lose 49,999 good rows.
from datetime import datetime

def safe_transform(items, transform_fn):
    """Skip bad items instead of crashing."""
    for item in items:
        try:
            yield transform_fn(item)
        except Exception as e:
            print(f"Skipping bad item: {e}")

# Usage
def parse_date(user):
    user = user.copy()
    user['signup'] = datetime.strptime(user['signup'], '%Y-%m-%d')
    return user

users = read_users('users.csv')
users = safe_transform(users, parse_date)
# Bad dates are logged and skipped
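A self-contained run shows the behavior (the rows here are made up for illustration): the malformed date is reported and dropped, while the good rows flow through untouched.

```python
from datetime import datetime

def safe_transform(items, transform_fn):
    """Skip bad items instead of crashing."""
    for item in items:
        try:
            yield transform_fn(item)
        except Exception as e:
            print(f"Skipping bad item: {e}")

def parse_date(user):
    user = user.copy()
    user['signup'] = datetime.strptime(user['signup'], '%Y-%m-%d')
    return user

rows = [
    {'name': 'Ada', 'signup': '2023-01-15'},
    {'name': 'Bob', 'signup': 'not-a-date'},  # malformed row
    {'name': 'Eve', 'signup': '2023-06-30'},
]
good = list(safe_transform(rows, parse_date))
print(len(good))  # 2 — the bad row was skipped, the rest survived
```

Because the skip happens inside the generator, downstream stages never see the bad row at all.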
When to Use This
This pattern shines for:
- Log file processing
- CSV/JSON transformations
- Data cleaning pipelines
- ETL scripts
- Anything where input is larger than available RAM
For complex analytics, use pandas. For streaming data, use this. For really big data, use Spark. But for most everyday data munging, generators are perfect.
Quick Mental Model
Think of it like a factory assembly line. Each worker (function) does one thing and passes the item to the next worker. No worker needs to see all items at once—they just handle whatever comes down the line.
Lists are warehouses. Generators are conveyor belts. When dealing with big data, you want conveyor belts.