Every AI model — from ChatGPT to the recommendation engine on Netflix — runs on data. Lots of it. But raw data is messy. It needs to be collected, cleaned, transformed, and stored before any AI can learn from it.
That's what a data pipeline does.
What is a data pipeline?
A data pipeline is a series of automated steps that move data from one place to another, transforming it along the way.
Think of it like a factory assembly line, but for information: Ingest → Clean → Transform → Store → Use.
Each stage has a specific job:
| Stage | What happens | Example |
|---|---|---|
| Ingest | Collect raw data from sources | Pull data from an API every hour |
| Clean | Remove duplicates, fix errors | Drop rows where email is empty |
| Transform | Reshape into the right format | Convert timestamps to UTC |
| Store | Save processed data | Write to a data warehouse |
| Use | Power AI, dashboards, apps | Train a recommendation model |
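The stages in the table can be sketched as plain Python functions chained together. This is a minimal illustration rather than a production design: the sample records, the field names (`email`, `signup`), and the in-memory list standing in for a warehouse are all assumptions made for the example.

```python
from datetime import datetime, timezone

def ingest():
    """Ingest: return raw records (hard-coded here; normally an API or log)."""
    return [
        {"email": "ana@example.com", "signup": "2024-03-01T09:30:00+02:00"},
        {"email": "", "signup": "2024-03-01T10:00:00+00:00"},
        {"email": "ana@example.com", "signup": "2024-03-01T09:30:00+02:00"},
        {"email": "bo@example.com", "signup": "2024-03-02T23:15:00-05:00"},
    ]

def clean(records):
    """Clean: drop rows with an empty email and exact duplicates."""
    seen, result = set(), []
    for row in records:
        key = (row["email"], row["signup"])
        if row["email"] and key not in seen:
            seen.add(key)
            result.append(row)
    return result

def transform(records):
    """Transform: convert every timestamp to UTC."""
    out = []
    for row in records:
        ts = datetime.fromisoformat(row["signup"]).astimezone(timezone.utc)
        out.append({"email": row["email"], "signup_utc": ts.isoformat()})
    return out

def store(records, warehouse):
    """Store: append to a list that stands in for a warehouse table."""
    warehouse.extend(records)
    return len(records)

def run_pipeline(warehouse):
    """Chain the stages: each one consumes the previous stage's output."""
    return store(transform(clean(ingest())), warehouse)

warehouse = []
stored = run_pipeline(warehouse)
print(f"{stored} records stored")
```

Keeping each stage as its own function means any one of them can be tested or swapped out without touching the others.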
A real example: How Netflix gets recommendations right
Here's roughly how Netflix's data pipeline works:
- Source: Every time you watch, pause, rewind, or skip, Netflix logs it
- Ingest: Billions of events per day flow into their data systems
- Clean: Filter out accidental plays, duplicate events, test accounts
- Transform: Convert raw events into structured data: "User watched 80% of Stranger Things S3E2"
- Store: Load into a data warehouse or data lake (the role that systems such as Snowflake or BigQuery play at other companies)
- Train: Feed into a recommendation model that learns "if users watch X, they often enjoy Y"
- Serve: When you open Netflix, the model instantly predicts what you'll want to watch
The recommendation you see in about 200 ms rests on hours or days of data processing behind the scenes.
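To make the Clean and Transform steps concrete, here is a small sketch that turns raw playback events into a "watched N% of a title" record. The event fields (`event_id`, `user`, `title`, `seconds_watched`, `is_test`), the 30-second accidental-play threshold, and the runtimes are invented for illustration; they are not Netflix's actual schema or rules.

```python
from collections import defaultdict

# Hypothetical raw playback events; every field name here is an assumption.
raw_events = [
    {"event_id": 1, "user": "u1", "title": "Stranger Things S3E2",
     "seconds_watched": 2400, "is_test": False},
    {"event_id": 1, "user": "u1", "title": "Stranger Things S3E2",
     "seconds_watched": 2400, "is_test": False},   # duplicate event
    {"event_id": 2, "user": "u1", "title": "Some Other Show S1E1",
     "seconds_watched": 12, "is_test": False},     # accidental play
    {"event_id": 3, "user": "qa", "title": "Stranger Things S3E2",
     "seconds_watched": 3000, "is_test": True},    # test account
    {"event_id": 4, "user": "u2", "title": "Stranger Things S3E2",
     "seconds_watched": 1500, "is_test": False},
]
runtime_seconds = {"Stranger Things S3E2": 3000, "Some Other Show S1E1": 2700}
MIN_PLAY_SECONDS = 30  # assumed cutoff for an "accidental" play

def clean(events):
    """Drop test accounts, very short plays, and repeated event IDs."""
    seen_ids = set()
    for e in events:
        if e["is_test"] or e["seconds_watched"] < MIN_PLAY_SECONDS:
            continue
        if e["event_id"] in seen_ids:
            continue
        seen_ids.add(e["event_id"])
        yield e

def transform(events):
    """Aggregate watch time per (user, title) and express it as a percentage."""
    watched = defaultdict(int)
    for e in events:
        watched[(e["user"], e["title"])] += e["seconds_watched"]
    return [
        {"user": user, "title": title,
         "percent_watched": min(100, round(100 * secs / runtime_seconds[title]))}
        for (user, title), secs in watched.items()
    ]

records = transform(clean(raw_events))
for r in records:
    print(f"User {r['user']} watched {r['percent_watched']}% of {r['title']}")
```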
The three types of data pipelines
Batch processing
Data is collected and processed in large chunks at scheduled intervals.
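```python
# Example: process all new orders since the start of yesterday
from datetime import datetime, timedelta
from pathlib import Path

import pandas as pd

yesterday = datetime.now() - timedelta(days=1)
cutoff = pd.Timestamp(yesterday.date())  # midnight at the start of yesterday

# Load orders, parsing the date column so it can be compared with a timestamp
orders = pd.read_csv('orders.csv', parse_dates=['date'])
new_orders = orders[orders['date'] >= cutoff]

# Clean: remove cancelled orders (copy so we don't write into a slice of `orders`)
valid_orders = new_orders[new_orders['status'] != 'cancelled'].copy()

# Transform: calculate revenue for each order row
valid_orders['revenue'] = valid_orders['quantity'] * valid_orders['price']

# Store: make sure the output directory exists, then write Parquet
Path('processed').mkdir(exist_ok=True)
valid_orders.to_parquet('processed/orders_daily.parquet')
```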
Best for: Reports, daily model retraining, analytics dashboards
Stream processing
Data is processed the moment it arrives, event by event.
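```python
# Conceptual example: process events as they happen
import json
from kafka import KafkaConsumer  # kafka-python package; assumes a reachable broker

def update_user_preferences(user_id, product_id):
    """Hypothetical placeholder: a real system would update a model or feature store."""

def process_event(event: dict) -> None:
    if event['type'] == 'purchase':
        user_id = event['user_id']
        product = event['product_id']
        # Update recommendation model in near real-time
        update_user_preferences(user_id, product)

# Topic name and broker address are illustrative assumptions
kafka_consumer = KafkaConsumer('events', bootstrap_servers='localhost:9092')

# This loop runs continuously, handling each event as it arrives;
# high-volume deployments spread the work across many consumers
for event in kafka_consumer:
    process_event(json.loads(event.value))
```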
Best for: Fraud detection, live recommendations, chat applications
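Stream processors often keep a little state between events. As a sketch of the fraud-detection use case, the class below flags a card that makes more than a set number of purchases inside a sliding time window. The class name `VelocityCheck`, the limit of 3 purchases, and the 60-second window are illustrative assumptions, and a plain list stands in for the event stream.

```python
from collections import defaultdict, deque

class VelocityCheck:
    """Flag cards with too many purchases inside a sliding time window."""

    def __init__(self, max_events=3, window_seconds=60):
        self.max_events = max_events
        self.window_seconds = window_seconds
        self.recent = defaultdict(deque)  # card_id -> timestamps in the window

    def observe(self, card_id, timestamp):
        """Record one purchase; return True if the card exceeds the limit."""
        times = self.recent[card_id]
        times.append(timestamp)
        # Evict timestamps that have fallen out of the window
        while times and times[0] <= timestamp - self.window_seconds:
            times.popleft()
        return len(times) > self.max_events

# (card_id, seconds since start) pairs standing in for a live stream
events = [
    ("card_a", 0), ("card_a", 10), ("card_b", 15),
    ("card_a", 20), ("card_a", 25), ("card_a", 100),
]

check = VelocityCheck()
flagged = [(card, ts) for card, ts in events if check.observe(card, ts)]
print(flagged)
```

The decision is made per event, using only the small amount of state kept for each card, which is what lets this style of processing respond as soon as an event arrives.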
Micro-batch
A middle ground: process data in very small batches (every few seconds).
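One way to picture micro-batching is a loop that buffers incoming events and flushes them as a group once the buffer is full or a short interval has passed. The sketch below is an illustration under stated assumptions: `micro_batches` is a made-up helper, and the events carry their own timestamps so the example is deterministic. Stream-processing engines that work this way handle the buffering for you.

```python
def micro_batches(events, max_size=3, max_seconds=2.0):
    """Group (timestamp, payload) events into small batches.

    A batch is flushed when it reaches max_size events, or when the next
    event arrives max_seconds or more after the batch's first event.
    """
    batch, batch_start = [], None
    for timestamp, payload in events:
        # Time-based flush: the current batch has been open long enough
        if batch and timestamp - batch_start >= max_seconds:
            yield batch
            batch, batch_start = [], None
        if batch_start is None:
            batch_start = timestamp
        batch.append(payload)
        # Size-based flush: the batch is full
        if len(batch) >= max_size:
            yield batch
            batch, batch_start = [], None
    if batch:
        yield batch  # flush whatever is left when the stream ends

events = [(0.0, "a"), (0.5, "b"), (1.0, "c"), (1.2, "d"), (4.0, "e"), (4.1, "f")]
batches = list(micro_batches(events))
print(batches)
```

Spark Structured Streaming, for example, processes streams as micro-batches by default.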
💡 Tip
When to use what?
- Need results hours later, on a schedule? → Batch
- Need a response in milliseconds? → Stream
- Need a response in seconds at lower cost? → Micro-batch
Why data quality matters so much for AI
You've probably heard "garbage in, garbage out." Nowhere is this more true than in AI.
A model trained on biased or incorrect data will make biased and incorrect predictions — confidently.
Common data problems to watch for:
- Missing values: sensor failures, optional form fields
- Duplicates: same record inserted twice
- Outliers: a $0 purchase or a 200-year-old user
- Schema drift: an upstream API suddenly changes its data format
- Bias: training data that over-represents certain groups
Good data pipelines catch these issues before data reaches the model.
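As a rough sketch of what catching these issues can look like, the function below screens rows for four of the problems listed above and counts what it rejects. The expected field names, the age range of 0 to 120, and the rule that a purchase amount must be positive are assumptions chosen for the example. Bias is not covered here, because it shows up in the distribution of the whole dataset rather than in any single row.

```python
EXPECTED_FIELDS = {"user_id", "age", "amount"}  # assumed schema for this example

def quality_report(rows):
    """Return (clean_rows, issues), where issues counts each rejected problem."""
    issues = {"schema_drift": 0, "missing": 0, "duplicate": 0, "outlier": 0}
    seen = set()
    clean_rows = []
    for row in rows:
        # Schema drift: the fields differ from what we expect
        if set(row) != EXPECTED_FIELDS:
            issues["schema_drift"] += 1
            continue
        # Missing values
        if any(value is None for value in row.values()):
            issues["missing"] += 1
            continue
        # Duplicates: the exact same record seen before
        key = tuple(sorted(row.items()))
        if key in seen:
            issues["duplicate"] += 1
            continue
        seen.add(key)
        # Outliers: values outside a plausible range
        if row["amount"] <= 0 or not 0 < row["age"] < 120:
            issues["outlier"] += 1
            continue
        clean_rows.append(row)
    return clean_rows, issues

rows = [
    {"user_id": 1, "age": 34, "amount": 19.99},
    {"user_id": 1, "age": 34, "amount": 19.99},   # duplicate
    {"user_id": 2, "age": None, "amount": 5.00},  # missing value
    {"user_id": 3, "age": 200, "amount": 12.50},  # 200-year-old user
    {"user_id": 4, "age": 28, "amount": 0.0},     # $0 purchase
    {"user_id": 5, "age": 41, "total": 8.75},     # renamed field: schema drift
    {"user_id": 6, "age": 52, "amount": 42.00},
]

clean_rows, issues = quality_report(rows)
print(f"{len(clean_rows)} clean rows, issues: {issues}")
```

In practice you would log or alert on the counts instead of silently dropping rows, since a sudden jump in any of them usually points to a problem upstream.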
Building your first pipeline (in Python)
Here's a minimal working data pipeline you can run locally:
```python
import pandas as pd
import requests

# 1. INGEST: fetch data from a public API
url = "https://jsonplaceholder.typicode.com/posts"
response = requests.get(url, timeout=10)
response.raise_for_status()  # stop early on an HTTP error
raw_data = response.json()

# 2. CLEAN: convert to DataFrame, drop nulls
df = pd.DataFrame(raw_data)
df = df.dropna()
df = df[df['body'].str.len() > 50].copy()  # keep only substantive posts

# 3. TRANSFORM: add a word count column
df['word_count'] = df['body'].str.split().str.len()

# 4. STORE: save as Parquet (columnar, compressed; needs pyarrow or fastparquet)
df.to_parquet('posts_processed.parquet', index=False)
print(f"Pipeline complete: {len(df)} records processed")
```
Run this (with pandas, requests, and a Parquet engine such as pyarrow installed) and you have a working data pipeline in under 20 lines of code.
Next steps
Data pipelines are the foundation of most AI systems. With this understanding, you're ready to go deeper:
- What is AI? — understand what AI models do with this data
- Data Engineering Path — the full structured course on pipelines, warehouses, and cloud data
- Prompt Engineering 101 — now that you understand how AI is built, learn to use it better