ETL Data Engineering & Pipeline Monitoring, Simply
Introduction
If you run ETL jobs, you’ve felt the pain: a nightly pipeline fails silently, a table arrives hours late, a schema change knocks out analytics, or worse, a downstream model ships wrong numbers to leadership. Monitoring is the safety net for every data engineering team, yet it’s often a patchwork of ad hoc logs, spreadsheets, and hasty dashboards. The result is toil, uncertainty, and stakeholder distrust.
This guide solves that problem, simply. We’ll show you how to design ETL data engineering workflows with monitoring built in from the start, not bolted on at the end. You’ll learn how to instrument a pipeline, set data and system SLAs, track freshness and quality, and configure actionable alerts. We’ll use a practical, copy-paste-ready approach and a product SDK so you can go from zero to monitored in an afternoon.
What you’ll gain in this guide:
- A clear mental model for pipeline monitoring, from batch ETL to streaming.
- A step-by-step implementation with a working example you can run locally.
- Code-first instrumentation patterns: runs, tasks, metrics, events, and data quality checks.
- Real monitors: freshness, failure rate, row-count anomalies, and data SLAs.
- Verification steps at each stage, so you know it works as you go.
- Common errors and how to fix them fast.
- Best practices for reliability, cost control, and alert hygiene.
We’ll use the Deadpipe SDK to demonstrate an opinionated but flexible approach to pipeline monitoring. The concepts apply broadly to any stack (Airflow, dbt, Spark, Kafka, Fivetran, custom Python), but the examples are designed for you to copy, paste, and run immediately. If you’ve been searching for a practical guide to ETL data engineering and pipeline monitoring, simply follow along.
Background and Context
Monitoring isn’t just about catching failures; it’s about making data reliable and trustworthy for the business. As data infrastructure has evolved from on-prem ETL to cloud-native ELT and streaming pipelines, the surface area of failures has exploded: API rate limits, cloud permissions, orchestration races, data quality regressions, cost blowouts, and schema drift from up- or downstream teams. This complexity means that basic "job succeeded" notifications aren’t sufficient.
Modern pipeline monitoring spans four dimensions:
- Reliability and correctness: Did the run complete successfully? Did any tasks fail? Were retries exhausted? Did results meet expectations?
- Timeliness and freshness: Are tables updated within their SLA windows? Are runs completing within expected durations? Are end consumers seeing fresh data?
- Data quality and contracts: Do counts, null rates, types, and distributions match expectations? Did upstream schema changes break us?
- Cost and efficiency: Are we spending wisely on compute, storage, and egress? Are we running redundant backfills or oversized clusters?
Historically, engineering teams stitched together monitoring from orchestrator logs (Airflow/Prefect), query logs (warehouse), separate data testing tools (Great Expectations, dbt tests), and generic observability (CloudWatch, Datadog). Each tool is valuable, but without a unifying layer, signals are fragmented. When a pipeline breaks at 2 a.m., the on-call engineer wastes time jumping between tabs to find root cause.
A product like Deadpipe aims to unify these signals: runs, tasks, metrics, events, quality checks, and alerts in one place, connected to your pipelines. Whether you’re running nightly batch ETL or low-latency streams, you want to:
- Instrument pipelines once, consistently.
- Auto-capture run context and lineage across tasks.
- Define SLAs and SLOs in code and config.
- Alert the right team with actionable details, not noise.
- Support backfills, retries, and idempotent replays without duplicating alerts.
- Keep costs predictable, even as monitoring scales.
Why does this matter now? Data is productized and operationalized across organizations. Stakeholders expect reliability akin to software uptime. The good news is you can get there with a clear model and practical tooling. In this guide we’ll show you how to implement that model, with examples for batch and streaming, and with the flexibility to plug into Airflow, dbt, and Spark.
For deeper dives on the monitoring landscape and tool choices, see "Data Pipeline Monitoring Tools: Top 5 ETL" and "Affordable Data Engineering & Pipeline Monitoring."
Design for Observability: An Instrumentation-First Pipeline
Before touching code, adopt an instrumentation-first design. Define what a "run" is, what tasks it contains, and what you’ll measure. This prevents the all-too-common situation where you ship a pipeline and scramble later to add logs and metrics.
Core concepts:
- Pipeline run: A uniquely identified execution of your ETL (e.g., orders_etl run_id=2026-01-05-01).
- Tasks: Stages within a run (extract, transform, load) with timing, success/failure status, and metadata.
- Metrics: Numbers that describe the run and tasks (rows extracted, null count, duration_ms, bytes processed).
- Events: Notable occurrences (schema change detected, retries started, backfill requested).
- Checks: Assertions about data or system properties (rowcount within bounds, freshness within SLA, schema contract held).
- Alerts: Notifications triggered by thresholds, breaches, or anomalies.
With Deadpipe, you initialize a client, wrap runs and tasks, and emit metrics, events, and checks. Here’s a minimal example you can copy and run locally. It simulates an ETL that loads a CSV into a SQLite database and adds basic monitoring.
Prerequisites:
- Python 3.9+
- pip installed
Install and set your API key:
pip install deadpipe-sdk pandas sqlalchemy sqlite-utils
export DEADPIPE_API_KEY="dp_live_xxx_your_api_key"
# optional for environment separation
export DEADPIPE_ENV="dev"
Create a minimal ETL with monitoring:
# etl_orders.py
import os
import time
import uuid
import pandas as pd
import sqlite3
from datetime import datetime, timezone
from pathlib import Path
from deadpipe import DeadpipeClient, CheckSeverity
DB_PATH = "orders.db"
CSV_PATH = "orders.csv"
PIPELINE_NAME = "orders_etl"
ENV = os.getenv("DEADPIPE_ENV", "dev")
def ensure_sample_csv(path: str):
if Path(path).exists():
return
df = pd.DataFrame([
{"order_id": 1, "customer_id": "C001", "amount": 23.50, "order_ts": "2026-01-05T07:15:00Z"},
{"order_id": 2, "customer_id": "C002", "amount": 120.00, "order_ts": "2026-01-05T08:45:10Z"},
{"order_id": 3, "customer_id": "C003", "amount": 0.00, "order_ts": "2026-01-05T09:30:00Z"}, # zero amount edge case
])
df.to_csv(path, index=False)
def connect_db(path: str):
return sqlite3.connect(path)
def run_etl():
client = DeadpipeClient(
api_key=os.environ["DEADPIPE_API_KEY"],
env=ENV,
pipeline=PIPELINE_NAME,
default_labels={"team": "data-eng", "owner": "orders", "env": ENV}
)
run_id = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ") + "-" + uuid.uuid4().hex[:6]
with client.run(run_id=run_id, description="Nightly orders ETL") as run:
run.emit_event("run_started", {"note": "ad-hoc local run"})
# Extract
with run.task("extract") as t:
ensure_sample_csv(CSV_PATH)
start = time.time()
df = pd.read_csv(CSV_PATH)
t.emit_metric("extract.rows_read", int(df.shape[0]))
t.emit_metric("extract.columns", int(df.shape[1]))
t.emit_event("extract.schema_detected", {"columns": df.columns.tolist()})
t.add_metadata({"source.path": CSV_PATH})
t.check(
name="non_empty_source",
passed=(len(df) > 0),
severity=CheckSeverity.HIGH,
message="Source CSV should not be empty"
)
t.emit_metric("duration_ms", int((time.time() - start) * 1000))
# Transform
with run.task("transform") as t:
start = time.time()
# Basic cleanup: ensure order_ts timezone and drop malformed rows
df["order_ts"] = pd.to_datetime(df["order_ts"], errors="coerce", utc=True)
malformed = df["order_ts"].isna().sum()
if malformed > 0:
t.emit_event("transform.malformed_timestamps", {"count": int(malformed)})
df = df.dropna(subset=["order_ts"])
# Business rule: ensure amounts are non-negative
negatives = (df["amount"] < 0).sum()
t.check("no_negative_amounts", negatives == 0, severity=CheckSeverity.HIGH)
# Derive partition date, round amounts
df["order_date"] = df["order_ts"].dt.date
df["amount"] = df["amount"].round(2)
t.emit_metric("transform.rows_out", int(df.shape[0]))
t.emit_metric("transform.malformed_ts", int(malformed))
t.emit_metric("transform.negative_amounts", int(negatives))
t.emit_metric("duration_ms", int((time.time() - start) * 1000))
# Load
with run.task("load") as t:
start = time.time()
conn = connect_db(DB_PATH)
cur = conn.cursor()
cur.execute("""
CREATE TABLE IF NOT EXISTS fact_orders (
order_id INTEGER PRIMARY KEY,
customer_id TEXT,
amount REAL,
order_ts TEXT,
order_date TEXT
)
""")
            # Full refresh: to_sql(if_exists="replace") drops and recreates the table; it is not a row-level upsert
rows_before = cur.execute("SELECT COUNT(*) FROM fact_orders").fetchone()[0]
df.to_sql("fact_orders", conn, if_exists="replace", index=False)
rows_after = cur.execute("SELECT COUNT(*) FROM fact_orders").fetchone()[0]
conn.commit()
conn.close()
t.emit_metric("load.rows_before", int(rows_before))
t.emit_metric("load.rows_after", int(rows_after))
t.check("rowcount_not_decreasing", rows_after >= rows_before, severity=CheckSeverity.MEDIUM)
t.emit_metric("duration_ms", int((time.time() - start) * 1000))
t.add_metadata({"target.table": "fact_orders", "target.db": DB_PATH})
# Top-level run-level metrics and freshness check
run.emit_metric("run.total_rows", int(df.shape[0]))
run.check_freshness(
name="orders_freshness",
table="fact_orders",
timestamp_column="order_ts",
max_lag="6h",
severity=CheckSeverity.HIGH,
connection=f"sqlite:///{DB_PATH}"
)
run.emit_event("run_completed", {"rows_loaded": int(df.shape[0])})
if __name__ == "__main__":
run_etl()
What this does:
- Wraps the ETL in a run context that auto-captures timings and status.
- Emits task-level metrics and checks, with severity levels that map to alerting.
- Performs a run-level freshness check directly against the target table.
- Annotates runs with metadata (labels, descriptions) for routing and grouping.
Verification steps:
- Run the script: python etl_orders.py
- Confirm it exits successfully. If you set Deadpipe API key and env correctly, the run, tasks, metrics, and checks appear in the Deadpipe UI within seconds.
- Induce a failure: change the freshness check's max_lag to "1s" and rerun; verify that a freshness check failure appears.
- Induce a data quality issue: set a negative amount in the CSV and verify that the "no_negative_amounts" check fails.
Tip: During development, set env="dev" and configure alerts to route only to you or a dev channel. When you promote to prod, change env and ownership labels to route alerts appropriately.
Instrumentation Patterns You Should Adopt From Day 1
- Always include a run_id that is stable and meaningful. Prefer a combination of the scheduled window and a unique suffix, e.g., 20260105T010000Z-<uuid6> (see the helper sketch after this list).
- Tag runs with labels that match ownership boundaries: team, service, domain, tier. These become the basis for alert routing and dashboards.
- Emit at least three metrics per task: duration_ms, rows_in/rows_out, and bytes_processed (if available).
- Use checks for business rules, not just system conditions. For example, “orders amount must be between 0 and 50,000.”
- Emit an event any time operator action is helpful to understand context: retries, backfills, schema changes, partial loads, or manual overrides.
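A minimal sketch of helpers that standardize these conventions (the module name and label keys are illustrative, not part of the SDK):
# instrumentation_conventions.py -- illustrative helpers for consistent run IDs and labels
import uuid
from datetime import datetime, timezone

# Ownership labels reused by every run; the keys here are examples, not required names
STANDARD_LABELS = {"team": "data-eng", "service": "orders", "domain": "sales", "tier": "critical"}

def make_run_id(window_start: datetime) -> str:
    """Stable, meaningful run_id: the scheduled window plus a short unique suffix."""
    return window_start.astimezone(timezone.utc).strftime("%Y%m%dT%H%M%SZ") + "-" + uuid.uuid4().hex[:6]

# Example: make_run_id(datetime(2026, 1, 5, 1, 0, tzinfo=timezone.utc)) -> "20260105T010000Z-3f9a1c"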
SLAs, SLOs, and Freshness You Can Enforce
Monitoring becomes actionable when tied to explicit expectations. Define SLAs (hard contractual expectations) and SLOs (internal targets) for both system and data properties.
Common SLA/SLO categories:
- Freshness SLA: “fact_orders updated by 08:30 UTC daily.”
- Duration SLO: “Nightly ETL completes within 20 minutes 95% of days.”
- Quality SLO: “Null rate for customer_id < 0.5%.”
- Availability SLO: “Successful pipeline completion 99% monthly.”
How to encode these in Deadpipe:
- Freshness monitors check lag between now and the latest timestamp in a table.
- Duration monitors watch run and task durations versus baselines.
- Quality monitors assert thresholds on metrics and SQL checks.
- Error rate monitors track consecutive failures or failure budgets.
Example monitors.yaml:
# monitors.yaml
monitors:
- name: orders_freshness_sla
type: freshness
target:
table: sqlite:///orders.db#fact_orders
timestamp_column: order_ts
threshold: 6h
severity: high
window: daily
alert_channels:
- slack:#data-alerts
- pagerduty:data-oncall
labels:
team: data-eng
owner: orders
env: prod
runbook: https://internal.wiki/runbooks/orders-freshness
- name: orders_duration_slo
type: duration
target:
pipeline: orders_etl
percentile: p95
    threshold: 20m
severity: medium
evaluation_period: 14d
alert_channels:
- slack:#data-observability
labels:
team: data-eng
owner: platform
- name: orders_rowcount_anomaly
type: anomaly
metric: run.total_rows
scope:
pipeline: orders_etl
method: seasonal
min_window: 30d
sensitivity: medium
severity: medium
alert_channels:
- slack:#data-alerts
- email:bi-leads@company.com
runbook: https://internal.wiki/runbooks/rowcount-anomaly
Apply monitors via CLI or API:
deadpipe monitors apply monitors.yaml --env prod
Verification:
- Trigger the ETL twice to establish a baseline.
- Temporarily drop a row in the CSV to simulate a count drop; confirm the anomaly monitor evaluates.
- Delay the ETL by setting a sleep in the script to push it past the duration threshold; verify a duration alert.
Best practices for SLAs/SLOs:
- Differentiate between hard SLAs (page on breach) and soft SLOs (notify, don’t page).
- Tie each monitor to a runbook link: a documented, step-by-step mitigation path.
- Use evaluation windows long enough to avoid flapping, but short enough to detect regressions.
- Consider separate thresholds for weekdays vs weekends if input volumes differ.
Real-World Example: Extending Checks and Metrics
Let’s expand the example with more data quality checks and cost/efficiency metrics. We’ll add:
- Null-rate checks for key columns.
- Distribution checks for amount (range and percentiles).
- Cost proxy metrics (bytes scanned, file size).
- Schema contract checks to enforce column names and types.
Add to transform task:
# Inside transform task in etl_orders.py
# Null-rate checks
for col in ["order_id", "customer_id", "amount", "order_ts"]:
null_rate = float(df[col].isna().mean())
t.emit_metric(f"null_rate.{col}", null_rate)
t.check(
name=f"null_rate_{col}_lt_1pct",
passed=(null_rate < 0.01),
severity=CheckSeverity.MEDIUM,
message=f"{col} null rate is {null_rate:.2%}"
)
# Range and distribution checks for amount
min_amount, max_amount = float(df["amount"].min()), float(df["amount"].max())
p95_amount = float(df["amount"].quantile(0.95))
t.emit_metric("amount.min", min_amount)
t.emit_metric("amount.max", max_amount)
t.emit_metric("amount.p95", p95_amount)
t.check("amount_min_ge_0", min_amount >= 0.0, severity=CheckSeverity.HIGH)
t.check("amount_max_le_50000", max_amount <= 50000, severity=CheckSeverity.MEDIUM)
Schema contract check:
expected_schema = {
"order_id": "int64",
"customer_id": "object",
"amount": "float64",
"order_ts": "datetime64[ns, UTC]",
"order_date": "object",
}
schema_mismatches = {}
for col, expected in expected_schema.items():
actual = str(df[col].dtype)
if actual != expected:
schema_mismatches[col] = {"expected": expected, "actual": actual}
t.check(
name="schema_contract_held",
passed=(len(schema_mismatches) == 0),
severity=CheckSeverity.HIGH,
message=f"Schema mismatches: {schema_mismatches}" if schema_mismatches else "OK"
)
Cost proxy metrics:
import os
file_size_bytes = os.path.getsize(CSV_PATH)
t.emit_metric("extract.source_bytes", int(file_size_bytes))
# If running on a warehouse, substitute with bytes_scanned from query stats
Why add these?
- Null-rate and distribution checks catch subtler regressions than flat rowcount comparisons.
- Schema checks guard against upstream changes breaking downstream assumptions.
- Cost proxies help track when inputs explode in size (e.g., API returning duplicates).
Troubleshooting tip: Schema dtype strings can vary across pandas versions; normalize the mapping or use pandas' type-checking helpers, as in the sketch below.
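A minimal sketch of that normalization using pandas' public type-checking helpers (the family names and the expected_family mapping are illustrative):
# Map concrete pandas dtypes to coarse "families" before comparing against a contract
import pandas.api.types as ptypes

def dtype_family(series) -> str:
    if ptypes.is_bool_dtype(series):
        return "boolean"
    if ptypes.is_integer_dtype(series):
        return "integer"
    if ptypes.is_float_dtype(series):
        return "number"
    if ptypes.is_datetime64_any_dtype(series):
        return "timestamp"
    return "string"

# Compare families instead of raw dtype strings, e.g.:
# mismatches = {c: dtype_family(df[c]) for c in df.columns if dtype_family(df[c]) != expected_family[c]}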
Integrating With Airflow and dbt
You don’t need to rewrite pipelines to add monitoring. Wrap what you have.
Airflow integration
Strategy:
- Create exactly one Deadpipe run per DAG run.
- Wrap each task body with a Deadpipe task.
- Propagate run_id via Airflow context or XCom.
Example:
# dags/orders_etl.py
import os
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from deadpipe import DeadpipeClient
default_args = {"owner": "data-eng", "retries": 1}
client = DeadpipeClient(
api_key=os.environ["DEADPIPE_API_KEY"],
env="prod",
pipeline="orders_etl",
default_labels={"team": "data-eng", "env": "prod"}
)
def start_run(**context):
run_id = context["dag_run"].run_id
run = client.run(run_id=run_id, description="Airflow DAG run")
# store run token/handle in XCom
context["ti"].xcom_push(key="deadpipe_run_token", value=run.start()) # explicit start if not using context manager
def end_run(**context):
token = context["ti"].xcom_pull(key="deadpipe_run_token")
client.finish_run(token, status="success")
def extract(**context):
token = context["ti"].xcom_pull(key="deadpipe_run_token")
with client.task(run_token=token, name="extract") as t:
# your extract logic...
t.emit_metric("rows_read", 1000)
def transform(**context):
token = context["ti"].xcom_pull(key="deadpipe_run_token")
with client.task(run_token=token, name="transform") as t:
# your transform logic...
t.emit_metric("rows_out", 980)
def load(**context):
token = context["ti"].xcom_pull(key="deadpipe_run_token")
with client.task(run_token=token, name="load") as t:
# your load logic...
t.check("loaded_positive_rows", True)
with DAG(
"orders_etl",
default_args=default_args,
start_date=days_ago(1),
schedule_interval="@daily",
catchup=False,
tags=["orders", "deadpipe"]
) as dag:
t0 = PythonOperator(task_id="start_run", python_callable=start_run, provide_context=True)
t1 = PythonOperator(task_id="extract", python_callable=extract, provide_context=True)
t2 = PythonOperator(task_id="transform", python_callable=transform, provide_context=True)
t3 = PythonOperator(task_id="load", python_callable=load, provide_context=True)
t4 = PythonOperator(task_id="end_run", python_callable=end_run, provide_context=True)
t0 >> t1 >> t2 >> t3 >> t4
Notes:
- Use Airflow’s run_id as Deadpipe’s run_id for coherent deduplication.
- If a task fails, call client.finish_run(token, status="failed") from an on_failure_callback to mark the overall run, as sketched below.
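A sketch of that failure hook, assuming the client and XCom key used above; wiring it through default_args applies it to every task in the DAG:
# Mark the Deadpipe run failed when any Airflow task fails (the callback receives that task's context)
def mark_run_failed(context):
    token = context["ti"].xcom_pull(key="deadpipe_run_token")
    if token:
        client.finish_run(token, status="failed")

default_args = {"owner": "data-eng", "retries": 1, "on_failure_callback": mark_run_failed}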
dbt integration
dbt already generates rich artifacts. Use them, don’t duplicate.
Patterns:
- Parse run_results.json to emit per-model metrics (status, execution time, rows affected).
- Emit checks for test results (failures, warnings).
- Track freshness via dbt source freshness output.
Example post-run script:
# dbt_deadpipe_report.py
import json
import os
from pathlib import Path
from deadpipe import DeadpipeClient, CheckSeverity
ARTIFACTS_DIR = Path("target")
ENV = os.getenv("DEADPIPE_ENV", "prod")
PIPELINE = "dbt_core_orders"
client = DeadpipeClient(api_key=os.environ["DEADPIPE_API_KEY"], env=ENV, pipeline=PIPELINE)
def main():
with client.run(run_id=os.getenv("DBT_RUN_ID", "manual-" + os.urandom(4).hex())) as run:
        # run_results.json holds the `dbt run` output; dbt overwrites it on every invocation,
        # so test_results.json is our own snapshot of the post-`dbt test` artifact (see the job script below)
        results = json.loads((ARTIFACTS_DIR / "run_results.json").read_text())
        tests = json.loads((ARTIFACTS_DIR / "test_results.json").read_text()) if (ARTIFACTS_DIR / "test_results.json").exists() else {"results": []}
# Emit per-model metrics
for r in results["results"]:
node = r["unique_id"]
status = r["status"]
time_ms = int(r.get("execution_time", 0) * 1000)
rows = r.get("adapter_response", {}).get("rows_affected", 0)
with run.task(node) as t:
t.emit_metric("execution_ms", time_ms)
t.emit_metric("rows_affected", int(rows))
t.check("status_success", status == "success", severity=CheckSeverity.HIGH, message=status)
# Emit test failures
failed_tests = [x for x in tests.get("results", []) if x.get("status") == "fail"]
run.emit_metric("dbt.tests_failed", len(failed_tests))
run.check(
"dbt_tests_all_passed",
passed=(len(failed_tests) == 0),
severity=CheckSeverity.HIGH,
message=f"{len(failed_tests)} tests failed" if failed_tests else "OK"
)
if __name__ == "__main__":
main()
Use in your dbt job:
dbt run --fail-fast
cp target/run_results.json /tmp/model_results.json   # preserve model results; dbt test overwrites run_results.json
dbt test || true
cp target/run_results.json target/test_results.json  # dbt has no native test_results.json; snapshot it ourselves
cp /tmp/model_results.json target/run_results.json   # restore model results for the report script
python dbt_deadpipe_report.py
Tip: For freshness, run dbt source freshness and emit checks from the sources.json artifact it writes to target/, as sketched below.
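A sketch of that pattern, assuming the client defined in dbt_deadpipe_report.py and the result fields dbt source freshness currently writes (field names can shift between dbt versions):
# Emit one check per source table from dbt's freshness artifact
import json
from pathlib import Path

def report_source_freshness(run, artifacts_dir=Path("target")):
    sources = json.loads((artifacts_dir / "sources.json").read_text())
    for r in sources.get("results", []):
        status = r.get("status")  # typically "pass", "warn", "error", or "runtime error"
        run.check(
            name=f"freshness_{r['unique_id']}",
            passed=(status == "pass"),
            message=f"dbt source freshness status: {status}",
        )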
Monitoring Streaming Pipelines Without the Noise
Streaming adds continuous timelines, consumer lag, and retries. You need to monitor:
- Consumer lag (messages or time behind).
- Throughput (messages/sec, bytes/sec).
- Error rate and dead-letter queues (DLQ).
- Checkpointing and commit success.
Example Kafka consumer with Deadpipe:
# stream_orders.py
import os
import time
from deadpipe import DeadpipeClient, CheckSeverity
from kafka import KafkaConsumer
client = DeadpipeClient(
api_key=os.environ["DEADPIPE_API_KEY"],
env="prod",
pipeline="orders_stream",
default_labels={"team": "streaming", "env": "prod"}
)
consumer = KafkaConsumer(
"orders",
bootstrap_servers=os.environ["KAFKA_BOOTSTRAP"],
group_id="orders-consumer",
enable_auto_commit=False,
auto_offset_reset="earliest"
)
BATCH_SIZE = 500
POLL_TIMEOUT = 1.0
def process_batch(batch):
# placeholder processing
time.sleep(0.05)
return {"processed": len(batch), "errors": 0, "bytes": sum(len(m.value) for m in batch)}
def main():
    # Reuse an hourly run_id so successive loop iterations aggregate into one logical run per hour
while True:
run_id = time.strftime("stream-%Y%m%d%H")
with client.run(run_id=run_id, description="hourly window") as run:
batch = []
start = time.time()
            # poll() returns {TopicPartition: [records]}; flatten the per-partition lists into one batch
            for partition_records in consumer.poll(timeout_ms=int(POLL_TIMEOUT * 1000)).values():
                batch.extend(partition_records)
if not batch:
run.emit_metric("throughput_msgs_per_sec", 0.0)
run.emit_event("heartbeat", {"note": "no messages polled"})
time.sleep(5)
continue
with run.task("consume") as t:
stats = process_batch(batch)
t.emit_metric("batch.size", stats["processed"])
t.emit_metric("batch.bytes", stats["bytes"])
t.emit_metric("duration_ms", int((time.time() - start) * 1000))
t.check("batch_errors_zero", stats["errors"] == 0, severity=CheckSeverity.HIGH)
# Commit offsets
try:
consumer.commit()
t.emit_event("commit_success", {"count": len(batch)})
except Exception as e:
t.emit_event("commit_failed", {"error": str(e)})
t.check("commit_successful", False, severity=CheckSeverity.HIGH)
# Lag metrics via external system (placeholder values)
run.emit_metric("consumer_lag_msgs", 0)
run.emit_metric("throughput_msgs_per_sec", len(batch) / max(time.time() - start, 1))
if __name__ == "__main__":
main()
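To replace the placeholder lag metric above, kafka-python can compute real lag from the consumer's current assignment. A sketch (it assumes the consumer has polled at least once so an assignment exists):
# Sum of (log-end offset - current position) across assigned partitions
def consumer_lag_msgs(consumer) -> int:
    assigned = consumer.assignment()
    if not assigned:
        return 0
    end_offsets = consumer.end_offsets(list(assigned))  # latest offset per partition
    return sum(max(end_offsets[tp] - consumer.position(tp), 0) for tp in assigned)

# e.g. run.emit_metric("consumer_lag_msgs", consumer_lag_msgs(consumer))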
Monitors for streaming:
monitors:
- name: orders_stream_lag
type: threshold
metric: consumer_lag_msgs
scope:
pipeline: orders_stream
comparator: ">"
threshold: 10000
duration: 5m # sustain lag above threshold for 5 minutes
severity: high
alert_channels: [pagerduty:data-oncall, slack:#streaming-alerts]
- name: orders_stream_throughput_dip
type: anomaly
metric: throughput_msgs_per_sec
scope:
pipeline: orders_stream
method: ewma
sensitivity: high
severity: medium
alert_channels: [slack:#streaming-alerts]
Pitfalls to avoid:
- Duplicated alerts during partition rebalances: use labels that include group_id and partition, and configure alert dedup keys (see the sketch after this list).
- Never tie run_id to a single message; aggregate by time window or offset range to avoid alert spam.
- Ensure clock synchronization on consumers; time-based freshness checks depend on consistent clocks.
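A sketch of window-scoped identifiers and dedup-friendly labels for the stream above (the label keys are illustrative; dedup behavior itself comes from your alert configuration):
import time

# One logical run per hour and per consumer group -- never one run (or alert) per message
window_id = time.strftime("%Y%m%d%H")
run_id = f"orders-stream-{window_id}"
labels = {
    "consumer_group": "orders-consumer",
    "window": window_id,
    # include partition only if you shard consumers per partition; otherwise it explodes label cardinality
}
# e.g. client.run(run_id=run_id, labels=labels)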
Backfills, Retries, and Idempotency Without Alert Spam
Backfills and retries are where monitoring gets noisy. The goals:
- Don’t alert on expected backfill delays.
- Deduplicate alerts across retries.
- Attribute metrics to the correct logical window (partition) of data.
Techniques:
- Partition labels: Add partition_date or window_start labels to runs and tasks.
- Idempotency keys: Use a composite key (pipeline, partition, attempt) so the platform can collapse duplicates.
- Backfill mode: Set a run label backfill=true so freshness monitors apply different thresholds or suppress paging.
Example:
with client.run(
run_id=f"orders-{partition_date}",
labels={"partition_date": str(partition_date), "backfill": "true"}
) as run:
with run.task("extract", idempotency_key=f"extract-{partition_date}-{attempt}") as t:
...
Monitor overrides:
monitors:
- name: orders_freshness_sla_backfill
type: freshness
target:
table: sqlite:///orders.db#fact_orders
timestamp_column: order_ts
threshold: 72h
when:
labels:
backfill: "true"
alert_channels: [slack:#data-alerts]
severity: low
Retries:
- Use task-level retries in your orchestrator, but emit a single final task status only after retries are exhausted (see the sketch after this list).
- If your SDK supports it, call task.retry() to increment counters without creating new entities.
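A sketch of the retry-inside-one-task pattern, reusing partition_date from the backfill example above; load_partition is a hypothetical load step, and the single-final-status behavior assumes the task context records its status once on exit:
import time

MAX_ATTEMPTS = 3

with run.task("load") as t:
    for attempt in range(1, MAX_ATTEMPTS + 1):
        try:
            load_partition(partition_date)            # hypothetical load step
            t.emit_metric("load.attempts", attempt)
            break
        except Exception as exc:
            t.emit_event("load.retry", {"attempt": attempt, "error": str(exc)})
            if attempt == MAX_ATTEMPTS:
                raise                                 # the task context records one final failed status
            time.sleep(2 ** attempt)                  # simple backoff between attempts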
Data Contracts and Schema Drift
Data contracts codify schema, types, and semantics. They prevent accidental drift from upstream systems.
Contract definition (YAML):
# contracts/orders_contract.yaml
name: orders_contract
tables:
- table: fact_orders
columns:
- name: order_id
type: integer
constraints:
- not_null
- unique
- name: customer_id
type: string
constraints:
- not_null
- name: amount
type: number
constraints:
- not_null
- min: 0
- max: 50000
- name: order_ts
type: timestamp
constraints:
- not_null
- name: order_date
type: date
checks:
  - name: fact_orders_not_empty
    sql: "SELECT CASE WHEN COUNT(*) > 0 THEN 1 ELSE 0 END AS ok FROM fact_orders"
    expect: 1
Apply and enforce:
deadpipe contracts apply contracts/orders_contract.yaml --env prod
The platform runs checks on schedule or post-load, alerts on violations, and provides drift diffs (added/removed columns, type changes).
Quick SQL checks for SQLite:
-- Count of nulls in key columns
SELECT
SUM(CASE WHEN order_id IS NULL THEN 1 ELSE 0 END) AS null_order_id,
SUM(CASE WHEN customer_id IS NULL THEN 1 ELSE 0 END) AS null_customer_id
FROM fact_orders;
Pitfalls:
- Type inference differs across tools; a BigQuery NUMERIC may map to string in dbt artifacts. Normalize types at the contract level.
- If upstream teams change semantics (e.g., order_ts timezone), schema may not change. Add semantic checks (e.g., max order_ts not in the future, timezone normalization), as in the sketch below.
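For example, a semantic guard placed inside the transform task of etl_orders.py (reusing the df, t, and CheckSeverity objects from that example):
import pandas as pd  # already imported at the top of etl_orders.py

# max order_ts must not be in the future (allow 5 minutes of clock slack)
max_ts = df["order_ts"].max()
t.check(
    name="order_ts_not_in_future",
    passed=(max_ts <= pd.Timestamp.now(tz="UTC") + pd.Timedelta(minutes=5)),
    severity=CheckSeverity.MEDIUM,
    message=f"max order_ts = {max_ts}",
)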
Alert Hygiene and Routing That Scales
Bad alerting is worse than no alerting. Keep alert hygiene by:
- Severity tiers: high (page), medium (notify), low (digest).
- Owner-based routing: map labels to teams and channels.
- Grouping and deduplication: one alert per incident, even if multiple tasks fail.
- Quiet hours and maintenance windows: suppress alerts during known changes.
Example alert routing rules:
routing:
- match:
labels:
team: data-eng
env: prod
channels:
page: pagerduty:data-oncall
notify:
- slack:#data-alerts
- email:data-team@company.com
- match:
labels:
team: analytics
channels:
notify:
- slack:#analytics-eng
Incident grouping:
- Group by pipeline + partition_date for batch.
- Group by pipeline + consumer_group for streams.
Runbook template (include with monitors):
- Summary: What failed, when, severity, owner.
- Impact: Which tables/models/SLAs.
- Quick checks: Upstream availability, permissions, last schema change.
- Mitigation: Retry steps, backfill procedure, escalate path.
- Rollback: How to revert if a partial load occurred.
Cost and Efficiency Monitoring
Data monitoring without cost awareness invites surprises. Track:
- Bytes scanned per query (Snowflake: BYTES_SCANNED; BigQuery: totalBytesProcessed).
- Rows processed and rows written.
- Warehouse credits or compute hours per run.
- Storage growth for hot tables and partitions.
How to collect:
- Emit metrics from your query layer: most warehouses expose this in job metadata.
- For file-based pipelines, emit file sizes and counts.
- For Spark, emit executor CPU time and shuffle read/write.
Example Snowflake Python extraction:
# after executing query via snowflake-connector-python
query_id = cursor.sfqid
cursor2 = cnx.cursor().execute(f"SELECT bytes_scanned FROM table(information_schema.query_history_by_session()) WHERE query_id='{query_id}'")
bytes_scanned = cursor2.fetchone()[0]
t.emit_metric("sf.bytes_scanned", int(bytes_scanned))
Cost monitors:
monitors:
- name: orders_cost_spike
type: anomaly
metric: sf.bytes_scanned
scope:
pipeline: orders_etl
task: extract
method: median_abs_dev
severity: medium
alert_channels: [slack:#data-cost]
Best practices:
- Prefer incremental models to limit scan size; monitor that bytes scanned trend down over time.
- Add “dry run” or EXPLAIN checks in CI to estimate query cost before merging (see the sketch after this list).
- Use partition pruning; monitor “pruning effectiveness” as percent of partitions scanned.
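A sketch of such a CI dry-run cost check for BigQuery, assuming the google-cloud-bigquery client library and configured credentials (Snowflake and other warehouses expose similar EXPLAIN-style estimates):
from google.cloud import bigquery

BYTES_BUDGET = 50 * 1024**3  # 50 GiB per query, an illustrative CI budget

def assert_query_within_budget(sql: str) -> int:
    client = bigquery.Client()
    # dry_run estimates bytes without executing or billing the query
    job = client.query(sql, job_config=bigquery.QueryJobConfig(dry_run=True, use_query_cache=False))
    estimated = job.total_bytes_processed
    assert estimated <= BYTES_BUDGET, f"query would scan {estimated} bytes (budget {BYTES_BUDGET})"
    return estimated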
Environments, Secrets, and CI/CD
Keep monitoring config versioned and environments isolated.
- Environments: dev, staging, prod. Use env labels for routing and dashboards.
- Secrets: Store DEADPIPE_API_KEY in your secret manager (Vault, AWS Secrets Manager), not in code.
- IaC: Commit monitors.yaml and contracts to your repo; apply via CI on merge.
- Promotion: Test monitors in staging with shadow alerts; promote to prod when stable.
Example GitHub Actions snippet:
name: Apply Deadpipe Monitors
on:
push:
branches: [ main ]
paths:
- monitors.yaml
- contracts/**
jobs:
apply:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- run: pip install deadpipe-sdk
- run: deadpipe monitors apply monitors.yaml --env prod
env:
DEADPIPE_API_KEY: ${{ secrets.DEADPIPE_API_KEY_PROD }}
CI data tests:
- Run a lightweight ETL on sample data in CI; validate checks and monitors evaluate without contacting prod systems.
- Use a special env “ci” to segregate runs, as in the smoke test sketch below.
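A minimal smoke test sketch, assuming pytest, a CI-scoped DEADPIPE_API_KEY in the environment, and the run_etl function from etl_orders.py:
# test_etl_smoke.py
def test_orders_etl_smoke(monkeypatch, tmp_path):
    monkeypatch.setenv("DEADPIPE_ENV", "ci")   # segregate CI runs from dev/prod
    monkeypatch.chdir(tmp_path)                # keep the sample CSV and SQLite file out of the repo
    from etl_orders import run_etl
    run_etl()                                  # should complete without raising; checks land in the "ci" env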
Troubleshooting and Common Pitfalls
Here are issues you’ll likely encounter and quick fixes.
- Missing API key or wrong env: You see local success but nothing in UI.
- Fix: Ensure DEADPIPE_API_KEY is set and correct; call client.health_check() if available.
- Clock skew distorts freshness checks (false alerts or missed breaches):
- Fix: Use server-side evaluation for freshness when possible; NTP-sync hosts.
- Duplicate runs due to retries:
- Fix: Use stable run_id tied to schedule window; include attempt_id in metadata, not in run_id; configure dedup keys.
- High alert noise on first week:
- Fix: Start with notify-only channels; tune thresholds, increase evaluation windows, and add runbooks before enabling paging.
- Large payloads slow instrumentation:
- Fix: Disable verbose payloads; limit metric cardinality; batch emissions (the SDK should batch).
- Pandas dtype mismatch in schema checks:
- Fix: Normalize types; cast columns before checking; pin pandas version for consistency.
- Airflow sensor tasks create long-running “tasks” that skew duration monitors:
- Fix: Tag sensors and exclude from duration SLOs; or split monitors per task group.
Debug patterns:
- Emit a “heartbeat” event at the start of each run to verify connectivity.
- Search UI for labels you expect (owner, team, env) to confirm metadata is propagating.
- Build a dashboard of “last successful run per pipeline” to spot gaps.
Extending to Other Stacks
- Spark: Wrap stages as tasks; emit metrics like inputRecords, outputRecords, executorCpuTime from SparkListener or Spark UI REST API.
- Fivetran/Stitch: Use their webhooks to ingest sync start/stop into Deadpipe as events; augment with warehouse checks for row counts and latency.
- Kafka Connect: Monitor connector task states; map to Deadpipe tasks and emit lag per topic/partition.
- Flink: Use metrics reporters to push operator-level metrics; aggregate per job in Deadpipe.
Example Spark instrumentation snippet:
import os
import time

from deadpipe import DeadpipeClient
from pyspark.sql import SparkSession
client = DeadpipeClient(api_key=os.environ["DEADPIPE_API_KEY"], env="prod", pipeline="spark_orders")
spark = SparkSession.builder.appName("spark_orders").getOrCreate()
with client.run(run_id=time.strftime("%Y%m%d")) as run:
with run.task("read_parquet") as t:
df = spark.read.parquet("s3://bucket/orders/2026-01-05/")
t.emit_metric("rows_read", df.count())
with run.task("aggregate") as t:
agg = df.groupBy("customer_id").sum("amount")
t.emit_metric("rows_out", agg.count())
with run.task("write") as t:
agg.write.mode("overwrite").parquet("s3://bucket/orders_agg/2026-01-05/")
t.check("write_success", True)
Observability Dashboards That Matter
Build dashboards that answer the on-call questions:
- Is data fresh? Show last successful run time and freshness lag per critical table.
- What failed recently? List incidents grouped by pipeline with severity and time-to-resolve.
- Are we meeting SLOs? Trend duration p95, failure ratio, and test pass rate.
- What’s the cost trend? Bytes scanned and compute hours per pipeline.
Key charts:
- Line: run.total_rows over two months with anomaly bands.
- Bar: task duration breakdown per run (extract vs transform vs load).
- Heatmap: freshness lag per hour of day (to identify schedule contention).
- Table: top 10 monitors by alert count (to prioritize tuning).
Playbook: From Zero to Monitored in One Afternoon
- Inventory your top 3 pipelines by business impact.
- For one pipeline, install Deadpipe SDK and wrap runs and tasks as shown.
- Emit minimum viable metrics and checks: duration, rows, null_rate on key columns, and one business rule.
- Add a freshness monitor and a duration SLO.
- Route alerts to a dev channel only; include a runbook link.
- Run the pipeline twice; verify metrics, checks, monitors evaluate.
- Tune thresholds; add one anomaly monitor for rowcount or cost.
- Roll out to the next two pipelines; templatize wrappers.
- Extend to dbt models with artifact parsing.
- Schedule a weekly 30-minute review: alert counts, false positives, and backfill noise; iterate.
Frequently Asked Questions
Do I need Deadpipe if I already have Airflow and dbt tests?
- Airflow tells you if tasks ran; dbt tests assert quality within dbt. A unifying layer correlates runs, freshness, quality, and cost across tools and surfaces actionable alerts with routing and deduplication.
How do I avoid vendor lock-in?
- Keep instrumentation thin and based on generic concepts (run, task, metric, check). Store config in YAML in your repo. If you switch, adapters can translate these concepts.
What about PII and sensitive data?
- Emit metrics and aggregates, not raw data. Redact values in events; use allowlists for metadata keys.
How do I handle partial loads?
- Emit an event “partial_load” and a check “completeness” tied to expected partition counts. Suppress freshness paging until completeness passes.
Conclusion
Monitoring done right is part design, part habit, and part tooling. If you define runs, tasks, metrics, events, and checks up front, wire SLAs/SLOs into code and config, and adopt alert hygiene, you’ll turn late-night firefights into predictable, fixable incidents—and earn trust with stakeholders. Start with a single pipeline, instrument minimally, verify often, and expand. Whether you run nightly batch ETL or real-time streams, the patterns in this guide and the Deadpipe SDK will help you get from zero to monitored, simply.
Appendix: Quick Reference
- Minimum viable metrics per task: duration_ms, rows_in/out, bytes_processed.
- Core monitors to start:
- Freshness SLA on top 5 consumer tables.
- Duration SLO p95 for critical ETLs.
- Rowcount anomaly per partition/day.
- dbt test summary check.
- Alert hygiene:
- Severity mapping and owner labels.
- One runbook per monitor.
- Dedup by pipeline + partition.
- Backfills:
- Label backfill=true, relax freshness thresholds.
- Use partition-aware run_ids and idempotency keys.
Enjoyed this article?
Share it with your team or try Deadpipe free.