Data Pipeline Architecture
```
┌───────────────────────────────────────────────────────────────┐
│                         Data Sources                          │
│        (APIs, Databases, IoT, Events, Files, Webhooks)        │
└───────────────────────────────┬───────────────────────────────┘
                                │
                            Ingestion
                                │
┌───────────────────────────────┴───────────────────────────────┐
│                        Message Broker                         │
│                   (Kafka, RabbitMQ, Redis)                    │
└───────────────────────────────┬───────────────────────────────┘
                                │
          ┌─────────────────────┼─────────────────────┐
          │                     │                     │
 ┌────────┴───────┐  ┌──────────┴─────────┐  ┌────────┴───────┐
 │   Streaming    │  │       Batch        │  │   Real-time    │
 │   Processors   │  │     Processors     │  │     Alerts     │
 └────────┬───────┘  └──────────┬─────────┘  └────────┬───────┘
          │                     │                     │
          └─────────────────────┼─────────────────────┘
                                │
          ┌─────────────────────┼─────────────────────┐
          │                     │                     │
 ┌────────┴───────┐  ┌──────────┴─────────┐  ┌────────┴───────┐
 │  Time-Series   │  │   Data Warehouse   │  │   Analytics    │
 │ (TimescaleDB)  │  │    (PostgreSQL)    │  │  (ClickHouse)  │
 └────────────────┘  └────────────────────┘  └────────────────┘
```
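The broker's fan-out (one ingested event reaching the streaming, batch, and alerting paths) can be sketched in-process with a toy pub/sub class. This is illustrative only; Kafka or RabbitMQ plays this role in production, and all names here are invented:

```python
# In-process sketch of the broker fan-out above: every subscriber to a
# topic receives each published message (illustrative, stdlib only).
from collections import defaultdict

class MiniBroker:
    """Toy pub/sub broker standing in for Kafka/RabbitMQ."""
    def __init__(self):
        self.subscribers = defaultdict(list)

    def subscribe(self, topic, handler):
        self.subscribers[topic].append(handler)

    def publish(self, topic, message):
        for handler in self.subscribers[topic]:
            handler(message)

broker = MiniBroker()
streamed, batched, alerts = [], [], []
broker.subscribe("sensors", streamed.append)   # streaming path
broker.subscribe("sensors", batched.append)    # batch path
broker.subscribe("sensors", lambda m: alerts.append(m) if m["value"] > 30 else None)

broker.publish("sensors", {"device_id": "d1", "value": 35.0})
```

Real brokers add what this toy omits: persistence, partitioning, and independent consumer offsets, which is why each downstream path can process at its own pace.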
Stream Processing with Kafka
```python
# Real-time event processing with Faust (helpers like is_valid, get_zone,
# get_plant_type, is_anomaly, and the Alert record are defined elsewhere)
from datetime import datetime

from faust import App, Record

app = App('data-pipeline', broker='kafka://localhost:9092')

class SensorReading(Record):
    device_id: str
    value: float
    timestamp: datetime
    zone: str = None        # filled in during enrichment
    plant_type: str = None  # filled in during enrichment

sensor_topic = app.topic('sensors', value_type=SensorReading)
validated_topic = app.topic('sensors-validated', value_type=SensorReading)
dead_letter = app.topic('sensors-dead-letter', value_type=SensorReading)
alerts = app.topic('sensor-alerts')

@app.agent(sensor_topic)
async def process_readings(readings):
    async for reading in readings:
        # Validate
        if not is_valid(reading):
            await dead_letter.send(value=reading)
            continue
        # Enrich with metadata
        reading.zone = await get_zone(reading.device_id)
        reading.plant_type = await get_plant_type(reading.zone)
        # Check for anomalies
        if await is_anomaly(reading):
            await alerts.send(value=Alert(reading))
        # Forward to validated topic
        await validated_topic.send(value=reading)

# Windowed aggregations
@app.agent(validated_topic)
async def aggregate_readings(readings):
    async for reading in readings.group_by(SensorReading.zone):
        await update_hourly_aggregate(reading)
```
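The `update_hourly_aggregate` helper is not shown above; the kind of per-zone, per-hour rollup it might maintain can be sketched without Faust, using only the standard library (the class and bucket layout here are illustrative, not the production implementation):

```python
# Stdlib sketch of an hourly per-zone rollup, the sort of state that
# update_hourly_aggregate could maintain (hypothetical, not the real helper).
from collections import defaultdict
from datetime import datetime

class HourlyAggregator:
    """Accumulates count/sum/min/max per (zone, hour) bucket."""
    def __init__(self):
        self.buckets = defaultdict(lambda: {"count": 0, "sum": 0.0,
                                            "min": float("inf"),
                                            "max": float("-inf")})

    def add(self, zone: str, value: float, ts: datetime):
        # Truncate the timestamp to the hour to pick the window bucket
        key = (zone, ts.replace(minute=0, second=0, microsecond=0))
        b = self.buckets[key]
        b["count"] += 1
        b["sum"] += value
        b["min"] = min(b["min"], value)
        b["max"] = max(b["max"], value)

    def mean(self, zone: str, hour: datetime) -> float:
        b = self.buckets[(zone, hour)]
        return b["sum"] / b["count"]

agg = HourlyAggregator()
agg.add("zone-a", 20.0, datetime(2024, 1, 1, 9, 15))
agg.add("zone-a", 22.0, datetime(2024, 1, 1, 9, 45))
```

In Faust itself this state would typically live in a windowed `Table` so it survives restarts and rebalances; the bucketing logic is the same.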
ETL Patterns I Implement
| Pattern | Use Case | Technology |
|---------|----------|------------|
| Stream-first | Real-time requirements | Kafka, Faust, Spark Streaming |
| Batch ETL | Historical processing | Airflow, Celery, dbt |
| CDC | Database replication | Debezium, Kafka Connect |
| Lambda | Mixed latency needs | Streaming + batch |
| ELT | Transform in warehouse | dbt, SQL transforms |
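For the CDC row, a Debezium connector is registered with Kafka Connect as a JSON config. This sketch shows the general shape for PostgreSQL; the hostnames, credentials, and table list are invented placeholders:

```json
{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "db",
    "database.port": "5432",
    "database.user": "replicator",
    "database.password": "***",
    "database.dbname": "inventory",
    "topic.prefix": "cdc",
    "table.include.list": "public.orders"
  }
}
```

Once registered, Debezium streams row-level inserts, updates, and deletes from the database's write-ahead log into Kafka topics, which downstream consumers replicate or transform.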
Data Quality Framework
```python
class DataQualityPipeline:
    def validate(self, record: dict) -> ValidationResult:
        checks = [
            # Schema validation
            self.check_schema(record),
            # Required fields
            self.check_required_fields(record),
            # Range validation
            self.check_value_ranges(record),
            # Referential integrity
            self.check_references(record),
            # Business rules
            self.check_business_rules(record),
        ]
        failures = [c for c in checks if not c.passed]
        if failures:
            return ValidationResult(
                valid=False,
                errors=failures,
                action='dead_letter' if critical(failures) else 'flag',
            )
        return ValidationResult(valid=True)
```
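The `check_*` helpers and `ValidationResult` above are defined elsewhere in the pipeline. A minimal self-contained sketch of one such check, the range validation, might look like this (the result class and the field limits are illustrative, not the production definitions):

```python
# Self-contained sketch of a range check (CheckResult and the RANGES
# table are illustrative stand-ins for the pipeline's real definitions).
from dataclasses import dataclass

@dataclass
class CheckResult:
    name: str
    passed: bool
    message: str = ""

# Acceptable value ranges per field (example limits for sensor data)
RANGES = {"temperature_c": (-40.0, 85.0), "humidity_pct": (0.0, 100.0)}

def check_value_ranges(record: dict) -> CheckResult:
    for field_name, (lo, hi) in RANGES.items():
        if field_name in record and not (lo <= record[field_name] <= hi):
            return CheckResult(
                "range", False,
                f"{field_name}={record[field_name]} outside [{lo}, {hi}]")
    return CheckResult("range", True)

bad = check_value_ranges({"temperature_c": 120.0})   # fails: above 85.0
ok = check_value_ranges({"humidity_pct": 55.0})      # passes
```

Keeping each check small and returning a structured result rather than raising makes it easy to run all checks, collect every failure, and decide between dead-lettering and flagging afterwards.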
Technologies for Data Pipelines
- Streaming: Kafka, RabbitMQ, Redis Streams
- Processing: Faust, Spark, Celery
- Orchestration: Airflow, Dagster, Prefect
- Storage: TimescaleDB, ClickHouse, PostgreSQL
- Quality: Great Expectations, custom validators
- Monitoring: Prometheus, Grafana, custom dashboards
Frequently Asked Questions
What is data pipeline development?
Data pipeline development involves building systems that extract data from various sources, transform it for analysis, and load it into data warehouses or lakes. This includes batch ETL, real-time streaming, data quality checks, and orchestration.
How much does data pipeline development cost?
Data pipeline development typically costs $110-160 per hour. A basic ETL pipeline starts around $15,000-30,000, while enterprise data platforms with multiple sources, real-time processing, and data quality governance range from $75,000-250,000+.
ETL vs ELT: which approach should I use?
Use ETL (transform before loading) when: you need to clean data before storage, have limited warehouse compute, or need to redact sensitive data. Use ELT (load then transform) when: you have powerful warehouses (Snowflake, BigQuery) and want flexibility. ELT is now more common.
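In the ELT style, a transform is just a SQL model the warehouse executes; in dbt it might look like this daily rollup (the table and column names are invented for illustration):

```sql
-- models/daily_events.sql (illustrative dbt model; names are invented)
select
    date_trunc('day', event_time) as event_date,
    event_type,
    count(*) as event_count
from {{ ref('raw_events') }}
group by 1, 2
```

Because the raw data is already loaded, the model can be rewritten and re-run at any time without touching the ingestion path.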
Which tools and frameworks do you use for data pipelines?
I work with: Airflow (orchestration), dbt (transformation), Airbyte/Fivetran (ingestion), Spark (large-scale processing), and cloud-native tools (AWS Glue, GCP Dataflow). The choice depends on data volume, team skills, and existing infrastructure.
How do you ensure data pipeline reliability?
I implement: idempotent operations, data quality checks, schema validation, alerting on failures, backfill capabilities, lineage tracking, and proper retry logic. Production pipelines need observability and the ability to recover from failures.
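Two of those reliability properties, idempotency and retry logic, combine naturally in a single loader wrapper. This is a minimal sketch under stated assumptions: the store is an in-memory dict, the failure is simulated, and all names are invented for illustration; production code would key off a durable store:

```python
# Sketch of idempotent loading with retry/backoff (in-memory store and
# simulated failure are illustrative; a real pipeline would use a database).
import time

def process_with_retry(record_id, payload, store, load,
                       retries=3, backoff_s=0.01):
    """Skip already-loaded records (idempotency) and retry transient failures."""
    if record_id in store:              # idempotency check: safe to re-run
        return store[record_id]
    last_err = None
    for attempt in range(retries):
        try:
            store[record_id] = load(payload)
            return store[record_id]
        except ConnectionError as err:  # transient failure: back off, retry
            last_err = err
            time.sleep(backoff_s * 2 ** attempt)
    raise last_err

# Simulated loader that fails once, then succeeds
attempts = {"n": 0}
def flaky_load(payload):
    attempts["n"] += 1
    if attempts["n"] == 1:
        raise ConnectionError("broker unavailable")
    return {"loaded": payload["value"]}

store = {}
result = process_with_retry("r1", {"value": 7}, store, flaky_load)
process_with_retry("r1", {"value": 7}, store, flaky_load)  # no-op: already loaded
```

The second call returns the stored result without invoking the loader again, which is exactly what makes backfills and replays safe.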
Related Technologies: Kafka, Celery, PostgreSQL, Python, Redis