Fix Pipeline Failures with Deadpipe Monitoring
Introduction
Pipelines fail silently more often than they explode loudly. A job finishes "successfully" but writes zero rows. A backfill runs with an empty upstream partition. A streaming checkpoint advances, yet no events arrive for hours. These are the worst kinds of failures for data engineering teams: undetected gaps that sneak into dashboards, machine learning features, invoices, and ops decisions. This guide shows you how to fix pipeline failures with Deadpipe monitoring, so your data flows become predictable, observable, and resilient.
In this practical guide, you will learn how to instrument your pipelines for liveness and correctness, define clear service level objectives, and set up actionable alerts with Deadpipe. We will cover batch and streaming patterns, heartbeats, row-level checks, schema drift detection, and anomaly baselines. You will get copy-paste ready code examples using the Deadpipe SDK, plus a step-by-step setup process you can roll into Airflow, dbt, Spark, Kafka, and cloud warehouse jobs.
If you have ever pushed a fix at midnight because a downstream model quietly degraded, or spent a morning reconciling an empty table after a "green" DAG, this guide is for you. We will tie monitoring to your engineering workflows so you detect and remediate issues faster, with fewer false positives. Along the way, we will reference related resources on ETL observability, tooling, and cost-effective approaches, including: ETL Data Engineering and Pipeline Monitoring, Simply, Data Pipeline Monitoring Tools: Top 5 ETL, and Affordable Data Engineering & Pipeline Monitoring.
By the end, you will have a working setup that:
- Detects "deadpipe" conditions (no data when data is expected)
- Validates core quality checks before data is marked "ready"
- Alerts the right channel with the right context and runbook
- Tracks SLOs over time and reduces mean time to detect and recover
This is a hands-on engineering guide. Expect shell commands, Python snippets, SQL assertions, YAML monitors, and an explanation of common errors and how to fix them. Let’s turn your pipeline failures into known, monitored, and quickly resolved events with Deadpipe monitoring.
Background and Context
Why does this matter now? Pipelines have multiplied across teams and tools. Most data engineering stacks include a scheduler or orchestrator, a warehouse or lakehouse, ELT frameworks like dbt, streaming components like Kafka or Flink, and a handful of cloud services. Each part has logs and metrics, but the signal you care about is end-to-end: did the data arrive, is it complete and fresh, and can downstream consumers trust it?
The failure mode that stings the most is the quiet one. Engineers often build robust retry logic, idempotent writes, and id-aware processing. Jobs succeed; SLAs pass; dashboards stay green. But when source systems change, schemas drift, time windows shift, or upstream filters accidentally exclude all data, the pipeline is technically alive while the data path is functionally dead. That deadpipe condition is the gap between "system health" and "data health."
Traditional monitoring focuses on compute metrics: CPU, memory, container up/down, DAG task success, runtime. These are necessary but insufficient. If you observe only infrastructure health, you will miss data semantics like "we processed 0 of the expected 1.2 million events," or "the incremental watermark did not advance," or "two critical dimensions vanished from the schema." Real monitoring must attach to the data itself and the logical outcome of pipeline runs.
Teams typically attempt a patchwork of homegrown checks: ad hoc row counts, manual data spot checks, periodic cron queries. The problem is that these are noisy, brittle, and divorced from changes in load, seasonality, or backfills. Without baselines and explicit SLOs, you cannot reason about normal versus anomalous variance. Without liveness pings and end-to-end lineage awareness, you cannot correlate upstream disruptions to downstream emptiness.
Deadpipe monitoring addresses this gap with three pillars:
- Liveness: A heartbeat or check-in signal that asserts "this pipeline attempted to produce data within the expected window" with enough context to detect abnormal silence.
- Freshness and completeness: Freshness measures whether new data arrived when it should. Completeness checks assess whether the delivered volume falls within expected bounds. Together they surface partial loads, backfills, and schedule slips.
- Quality and semantics: Assertions about schema, uniqueness, nullability, referential integrity, and distributional anomalies. These checks ensure "good enough" data, not merely data present.
Deadpipe wraps these concepts with a developer-friendly SDK, a declarative monitor configuration, and AI-powered baselines that cut noise. It integrates with the tools teams already use, so you can monitor pipelines where they run: Airflow DAGs, dbt models, Spark jobs, Flink apps, and warehouse SQL. For an overview of how AI observability meets pipeline monitoring, see AI Observability in Data Pipelines: What Works and AI Observability in Data Pipelines with Deadpipe.
What about cost and complexity? Monitoring can sprawl, both in operational cost and developer time. Deadpipe emphasizes affordable, incremental adoption. Start with one or two critical pipelines, define concrete SLOs, and grow coverage. If budget is a concern, explore AI Observability: Cost-Effective Pipeline Monitoring and Affordable Data Engineering & Pipeline Monitoring for strategies to control spend without sacrificing reliability.
Finally, remember that monitoring is a means to an end. The end is trust. When data practitioners and stakeholders trust the pipeline, they ship models and dashboards faster, and the business acts on data with confidence. Deadpipe helps you earn that trust by catching failures early, contextualizing alerts, and tracking improvement over time.
Instrumenting Pipelines for Liveness, Freshness, and Completeness
The first step to fix pipeline failures is to instrument your code so Deadpipe can observe it. You do not need to refactor your entire stack. Start with lightweight signals that we call "liveness" and augment them with freshness and completeness calculations.
Liveness heartbeats for batch jobs
A batch pipeline should emit a heartbeat at the beginning of a run and record success or failure at the end. The heartbeat binds a run to a logical data interval (for example, daily partition 2025-01-15) and a unique run id. Deadpipe correlates these events to detect deadpipe conditions: no heartbeat within the schedule window, or a heartbeat without completion.
Copy-paste ready Python example for Airflow or any batch runner:
# requirements.txt
# deadpipe>=0.7.0
import os
import uuid
from datetime import datetime, timezone
from deadpipe import DeadpipeClient, Severity
DEADPIPE_API_KEY = os.environ["DEADPIPE_API_KEY"]
PIPELINE_ID = "orders_daily_ingest"
ENV = os.getenv("ENV", "prod")
client = DeadpipeClient(api_key=DEADPIPE_API_KEY, env=ENV)
def run_orders_daily(execution_date: datetime):
    run_id = f"{PIPELINE_ID}:{execution_date.strftime('%Y-%m-%d')}:{uuid.uuid4()}"
    partition = execution_date.strftime("%Y-%m-%d")

    # Start heartbeat: liveness ping
    client.heartbeat(
        pipeline_id=PIPELINE_ID,
        run_id=run_id,
        partition=partition,
        schedule="daily",
        message="Starting daily ingest"
    )

    try:
        # Perform work...
        rows_written = load_orders_for_partition(partition)
        watermark = partition  # e.g., last_processed_date

        # Mark completion with outcome and metrics
        client.complete(
            pipeline_id=PIPELINE_ID,
            run_id=run_id,
            partition=partition,
            status="success",
            metrics={
                "rows_written": rows_written,
                "last_watermark": watermark
            }
        )
        return rows_written
    except Exception as e:
        # Signal failure for visibility
        client.complete(
            pipeline_id=PIPELINE_ID,
            run_id=run_id,
            partition=partition,
            status="failure",
            severity=Severity.CRITICAL,
            error=str(e)
        )
        raise

def load_orders_for_partition(partition: str) -> int:
    # ... your ingestion logic ...
    # return the number of rows written for monitoring
    return 123456
In Airflow, you can wrap this inside a PythonOperator, passing ds as the partition. The important parts are:
- Emit heartbeat as early as possible in the run.
- Emit complete always, even on errors (inside a finally or except).
- Include metrics like row counts, processed bytes, and watermarks.
Tip: Use a stable pipeline_id. Include environment as a tag or let the client derive it from API key/workspace.
Heartbeats for streaming pipelines
Streaming jobs require a different liveness pattern: continuous check-ins based on event flow rather than scheduled runs. You can:
- Emit a heartbeat every N seconds/minutes when the job is alive and processing.
- Attach a moving window metric such as events processed in the last 5 minutes.
- Include consumer group lag or watermark to detect stuck reads.
Kafka consumer example with Deadpipe:
import os
from time import time

from deadpipe import DeadpipeClient
from kafka import KafkaConsumer

client = DeadpipeClient(api_key=os.environ["DEADPIPE_API_KEY"])
PIPELINE_ID = "payments_stream"
HEARTBEAT_INTERVAL_SEC = 60

last_hb = 0
events_in_window = 0

consumer = KafkaConsumer(
    "payments",
    bootstrap_servers=["kafka:9092"],
    group_id="payments-stream-consumer",
    enable_auto_commit=True,
)

for msg in consumer:
    process_message(msg)  # your processing logic
    events_in_window += 1

    now = time()
    if now - last_hb >= HEARTBEAT_INTERVAL_SEC:
        client.heartbeat(
            pipeline_id=PIPELINE_ID,
            run_id="stream",  # long-lived
            schedule="streaming",
            message="Streaming heartbeat",
            metrics={"events_last_minute": events_in_window, "lag": get_consumer_lag()}
        )
        events_in_window = 0
        last_hb = now
This light-touch approach detects deadpipe conditions like:
- Zero events_last_minute for X intervals (while job is up).
- Lag increasing beyond threshold, indicating stuck or slow downstream.
- Watermark not advancing (e.g., event-time stalls).
Freshness monitoring
Freshness asks: how stale is the data compared to now or compared to the source’s provided time? For batch jobs, compute a freshness lag at the end of the run. For streaming, use moving windows.
SQL example to compute last partition date and lag in Snowflake:
-- Compute freshness lag in minutes
WITH last AS (
SELECT MAX(partition_date) AS max_partition
FROM analytics.orders_staging
)
SELECT DATEDIFF('minute', max_partition, CONVERT_TIMEZONE('UTC', CURRENT_TIMESTAMP())) AS freshness_lag_minutes
FROM last;
Report this metric to Deadpipe:
lag = run_query_and_fetch_int("SELECT ... freshness query ...")
client.metric(
pipeline_id="orders_daily_ingest",
run_id=run_id,
name="freshness_lag_minutes",
value=lag,
tags={"table": "analytics.orders_staging"}
)
Practical thresholds:
- Daily ingestion: lag should be < 36 hours (accounts for retries).
- Hourly ingestion: lag < 3 hours.
- Streaming: event_time lag < 5 minutes for near-real-time.
Deadpipe can baseline lag per hour-of-day and day-of-week to reduce false alarms on known slow windows.
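If you want to prototype that kind of baselining yourself before relying on Deadpipe's built-in version, a minimal sketch (assuming you can fetch historical lag samples keyed by weekday from your own metrics store) looks like this:
import os
from statistics import median
from datetime import datetime, timezone

def weekday_threshold(lag_history_minutes: dict[int, list[int]], slack_factor: float = 1.5) -> int:
    """Return an alert threshold for today's weekday: median historical lag * slack."""
    today = datetime.now(timezone.utc).weekday()
    samples = lag_history_minutes.get(today, [])
    if len(samples) < 7:
        # Not enough history yet: fall back to a conservative static threshold (36h).
        return 36 * 60
    return int(median(samples) * slack_factor)

# Usage sketch: history maps weekday -> freshness lags (minutes) observed on past runs.
history = {0: [95, 110, 102, 98, 120, 105, 99, 101]}
lag = 140  # the freshness_lag_minutes you just measured
if lag > weekday_threshold(history):
    print("freshness breach; report or alert via the Deadpipe client")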
Completeness monitoring
Completeness measures whether the volume landed as expected. You can implement:
- Absolute minimums: e.g., at least 500k rows on weekdays.
- Relative baselines: within ±20% of the trailing 14-day median for same weekday.
- Ratio checks: fact rows vs dimension changes; new vs updated records.
Example: assert daily row count within range before marking a partition “ready.”
-- Warehouse-agnostic pseudo SQL
WITH todays AS (
SELECT COUNT(*) AS c FROM analytics.orders_staging WHERE partition_date = :partition
),
baseline AS (
SELECT PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY cnt) AS median_cnt
FROM (
SELECT COUNT(*) AS cnt
FROM analytics.orders_staging
WHERE partition_date >= DATEADD('day', -28, :partition)
AND partition_date < :partition
AND DAYOFWEEK(partition_date) = DAYOFWEEK(:partition)
GROUP BY partition_date
)
)
SELECT t.c AS todays_count, b.median_cnt
FROM todays t CROSS JOIN baseline b;
In Python:
todays_count, median_cnt = fetch_counts(partition)
lower = int(median_cnt * 0.8)
upper = int(median_cnt * 1.2)
client.metric(
pipeline_id=PIPELINE_ID,
run_id=run_id,
name="rows_written",
value=todays_count,
tags={"partition": partition}
)
if todays_count < lower or todays_count > upper:
    client.alert(
        pipeline_id=PIPELINE_ID,
        run_id=run_id,
        title="Completeness anomaly",
        severity="warning",
        message=f"Count {todays_count} outside [{lower}, {upper}] baseline",
        tags={"partition": partition}
    )
Guardrail pattern: fail-fast on zero rows unless explicitly allowed (e.g., holidays):
if todays_count == 0 and not is_zero_expected(partition):
    raise RuntimeError(f"Zero rows for {partition} is not allowed")
Watermark checks and idempotency
Incremental jobs often rely on watermarks (max timestamp, max id). Monitor:
- Watermark monotonicity: it should not regress.
- Advancement delta: it should advance by at least N units (e.g., minutes) per run.
- Rollover windows: allow catch-up after outages but alert on prolonged stagnation.
Example check:
prev = client.get_state(f"{PIPELINE_ID}:last_watermark")  # "2025-01-14"
current = watermark  # "2025-01-15"

if current <= prev:
    client.alert(
        pipeline_id=PIPELINE_ID,
        run_id=run_id,
        severity="critical",
        title="Watermark did not advance",
        message=f"prev={prev}, current={current}"
    )
else:
    client.set_state(f"{PIPELINE_ID}:last_watermark", current)
Idempotency hint: When reprocessing, allow same watermark with “replay” tag to avoid noise. Tag runs with context={"mode": "backfill"} so Deadpipe can apply different baselines.
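For example, a backfill run can reuse the batch client from the example above and simply tag its heartbeat; this is a sketch, and the context keys are conventions rather than a fixed schema:
# Sketch: tag a reprocessing run so monitors can treat it differently.
mode = "backfill"  # or "replay" when re-emitting the same watermark
client.heartbeat(
    pipeline_id=PIPELINE_ID,
    run_id=f"{PIPELINE_ID}:{partition}:backfill",
    partition=partition,
    schedule="daily",
    message="Backfill run",
    context={"mode": mode, "requested_by": "data-oncall"}  # requested_by is illustrative
)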
Emitting structured context
Every heartbeat and completion should carry context so alerts are actionable:
- Source identifiers (API name, table name, topic)
- Destination identifiers (schema.table, storage path)
- Orchestrator metadata (Airflow DAG/task, dbt model name/selector)
- Deployment version (git SHA, container image)
- Runbook URL
Example:
client.heartbeat(
pipeline_id=PIPELINE_ID,
run_id=run_id,
partition=partition,
schedule="daily",
message="Starting load",
context={
"source": "shopify_orders_api",
"destination": "analytics.orders_staging",
"git_sha": os.getenv("GIT_SHA"),
"runbook": "https://runbooks.company.com/orders_daily_ingest"
}
)
Defining SLOs and Alerting That Matters
Monitoring without goals generates noise. Service Level Objectives (SLOs) define acceptable performance for your pipelines. Deadpipe helps you track Service Level Indicators (SLIs) and compute error budgets.
Core SLIs for pipelines
- Freshness SLI: fraction of time freshness_lag_minutes < threshold.
- Completeness SLI: fraction of runs where row_count within baseline band.
- Liveness SLI: fraction of expected schedule slots with heartbeat+completion.
- Quality SLI: fraction of runs where critical assertions pass.
Define them per pipeline and aggregate for domains (e.g., finance, growth).
YAML example (deadpipe.monitors.yaml):
pipelines:
  - id: orders_daily_ingest
    description: Daily orders ingest from Shopify to analytics.orders_staging
    schedule: daily
    owners:
      - team: data-platform
        pagerduty_service: dp-orders
        slack: "#data-alerts"
    slos:
      freshness_minutes_p95: 1440
      completeness_band_pct: 0.2
      liveness_success_rate: 0.99
      quality_pass_rate: 0.995
    alerts:
      channels:
        - type: slack
          target: "#data-alerts"
        - type: pagerduty
          target: "dp-orders"
      rules:
        - name: deadpipe-miss
          condition: "no_heartbeat within schedule_window"
          severity: critical
          message: "No heartbeat for {pipeline_id} {partition}"
        - name: zero-rows
          condition: "rows_written == 0 and not allow_zero"
          severity: critical
          message: "Zero rows for {partition}"
        - name: freshness-breach
          condition: "freshness_lag_minutes > 1440"
          severity: warning
          message: "Freshness lag {freshness_lag_minutes}m exceeds 1440m"
Deadpipe’s UI lets you fine-tune: maintenance windows, holiday calendars, and alert suppression when upstream has a known outage.
Alert routing and deduplication
Actionable alerts:
- Deduplicate per partition/run: avoid flooding Slack with every check failure.
- Group related failures: liveness + zero rows + watermark stagnation belong in one incident.
- Provide runbook and recent change log: link to last commit, deployment timestamp.
- Include suggested remediation: retries, backfill command.
Example runbook snippet included in alert:
- Validate source availability: curl the API health.
- Check upstream partitions: SELECT COUNT(*) FROM upstream WHERE partition = :p.
- Manual backfill command:
  - Airflow: airflow dags backfill orders_daily -s 2025-01-15 -e 2025-01-15
  - dbt: dbt run --select orders_staging --vars 'partition: 2025-01-15'
Error budgets for data pipelines
Error budgets apply to data too. If your liveness SLO is 99%, you can miss ~7 hours/month for hourly jobs. Track budget burn:
- Rapid burn: multiple consecutive misses trigger higher severity.
- Slow burn: occasional anomalies can be tolerated; focus on impactful pipelines.
Deadpipe can surface budget burn-down charts by SLI over time, helping prioritize engineering work (e.g., improving retry logic vs optimizing completeness baselines).
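As a back-of-the-envelope illustration, the budget math for a liveness SLO can be computed directly; the slot counts and window length below are assumptions you would adapt to your own schedule:
# Sketch: error budget math for a liveness SLO over a 30-day window.
def error_budget(slo: float, expected_slots: int, missed_slots: int) -> dict:
    budget = (1.0 - slo) * expected_slots          # how many misses the SLO tolerates
    remaining = budget - missed_slots
    burn_rate = missed_slots / budget if budget else float("inf")
    return {"budget_slots": budget, "remaining": remaining, "burn_rate": burn_rate}

# Hourly pipeline, 99% liveness SLO, 30 days = 720 expected slots.
print(error_budget(slo=0.99, expected_slots=720, missed_slots=3))
# {'budget_slots': 7.2, 'remaining': 4.2, 'burn_rate': 0.416...}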
Quality and Semantics Checks
Completeness is not enough. Bad data can flow "successfully" and harm consumers. Add quality checks that gate downstream exposure.
Schema drift detection
Automatically flag added/removed/changed columns. Two approaches:
- Producer-side schema registry (Avro/Protobuf) with Deadpipe bridge.
- Consumer-side introspection: snapshot information_schema and diff.
SQL example (BigQuery) to snapshot schema:
CREATE OR REPLACE TABLE monitoring.schemas_orders AS
SELECT column_name, data_type, is_nullable
FROM `project.dataset`.INFORMATION_SCHEMA.COLUMNS
WHERE table_name = 'orders_staging';
Diff and alert:
WITH current_cols AS (
    SELECT column_name, data_type, is_nullable
    FROM `project.dataset`.INFORMATION_SCHEMA.COLUMNS
    WHERE table_name = 'orders_staging'
),
baseline AS (
    SELECT column_name, data_type, is_nullable
    FROM monitoring.schemas_orders
)
SELECT 'added' AS change_type, c.column_name
FROM current_cols c LEFT JOIN baseline b USING (column_name)
WHERE b.column_name IS NULL
UNION ALL
SELECT 'removed' AS change_type, b.column_name
FROM baseline b LEFT JOIN current_cols c USING (column_name)
WHERE c.column_name IS NULL
UNION ALL
SELECT 'type_changed', c.column_name
FROM current_cols c JOIN baseline b USING (column_name)
WHERE c.data_type != b.data_type OR c.is_nullable != b.is_nullable;
Wire this into Deadpipe by posting a "schema_change" event; optionally block deployment if prohibited changes are detected.
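A minimal sketch of that forwarding step follows; run_schema_diff_query() is a placeholder for executing the diff SQL above, and the alert fields mirror the client calls used earlier:
# Sketch: forward schema diff rows to Deadpipe as alerts.
import os
from datetime import datetime, timezone
from deadpipe import DeadpipeClient

client = DeadpipeClient(api_key=os.environ["DEADPIPE_API_KEY"])

diffs = run_schema_diff_query()  # e.g., [("removed", "promo_id"), ("added", "discount_code")]
for change_type, column in diffs:
    severity = "critical" if change_type in ("removed", "type_changed") else "warning"
    client.alert(
        pipeline_id="orders_daily_ingest",
        run_id=f"schema_check:{datetime.now(timezone.utc):%Y-%m-%d}",
        title=f"Schema change detected: {change_type} {column}",
        severity=severity,
        message=f"{change_type} column '{column}' on analytics.orders_staging",
        tags={"check": "schema_drift"}
    )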
Row-level assertions
- Primary key uniqueness:
  - Assert COUNT(*) == COUNT(DISTINCT id).
- Nullability:
  - Critical columns must be non-null for new partitions.
- Referential integrity:
  - Foreign keys must match dictionary/dimension tables.
- Distribution checks:
  - Value ranges (amount >= 0), categorical domain checks (status IN (...)).
dbt example test that also reports to Deadpipe:
-- tests/unique_orders_id.sql
SELECT id
FROM {{ ref('orders_staging') }}
GROUP BY id
HAVING COUNT(*) > 1
In dbt’s on-run-end hook:
# dbt_project.yml
on-run-end:
- "{{ deadpipe_report_results(results) }}"
Macro:
{% macro deadpipe_report_results(results) %}
{% for r in results %}
{% if r.status != 'success' %}
{{ log("Deadpipe alert for " ~ r.node.name, info=True) }}
{% do run_query(
"select deadpipe.alert(:pipeline_id, :title, :severity, :message)",
{
"pipeline_id": "orders_transform",
"title": "dbt test failed: " ~ r.node.name,
"severity": "critical",
"message": r.message
}
) %}
{% endif %}
{% endfor %}
{% endmacro %}
Alternatively, use the Deadpipe Python client in a post-run script to collect dbt test results JSON and push structured alerts.
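A sketch of that post-run script, assuming dbt's standard target/run_results.json artifact (field names follow the dbt artifact schema; the Deadpipe call mirrors the client used above):
# Sketch: push failed dbt test results from run_results.json to Deadpipe.
import json
import os
from deadpipe import DeadpipeClient

client = DeadpipeClient(api_key=os.environ["DEADPIPE_API_KEY"])

with open("target/run_results.json") as f:
    run_results = json.load(f)

for result in run_results["results"]:
    if result["status"] in ("error", "fail"):
        client.alert(
            pipeline_id="orders_transform",
            run_id=run_results["metadata"]["invocation_id"],
            title=f"dbt node failed: {result['unique_id']}",
            severity="critical",
            message=result.get("message") or "No message",
            tags={"tool": "dbt"}
        )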
Anomaly detection and baselines
Beyond static thresholds, use Deadpipe’s baselines to learn normal:
- Seasonality-aware: weekday/weekend patterns.
- Volume cliffs: e.g., month-end spikes.
- Source-driven variation: promotions causing demand surges.
Practical tips:
- Require a warm-up period (e.g., 14–21 days) before enabling high-sensitivity alerts.
- Combine baselines with minimum floors to catch cataclysmic zeros.
- Tag backfills to exclude from baseline training windows.
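If you want to prototype this logic before enabling Deadpipe's learned baselines, a minimal sketch that combines a same-weekday band with a hard floor (sample counts and warm-up length are illustrative) might look like:
# Sketch: seasonality-aware band with a minimum floor for daily row counts.
from statistics import median

def completeness_band(same_weekday_counts: list[int], band_pct: float = 0.2, floor: int = 1):
    """Return (lower, upper) bounds; require a warm-up of at least 3 samples."""
    if len(same_weekday_counts) < 3:
        return floor, None  # warm-up: only enforce the floor, no upper bound yet
    m = median(same_weekday_counts)
    lower = max(floor, int(m * (1 - band_pct)))
    upper = int(m * (1 + band_pct))
    return lower, upper

lower, upper = completeness_band([510_000, 495_000, 530_000, 508_000])
todays_count = 180_000
if todays_count < lower or (upper is not None and todays_count > upper):
    print(f"Completeness anomaly: {todays_count} outside [{lower}, {upper}]")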
Contract tests with producers
If you own both producer and consumer, implement data contracts:
- JSON schema for event payloads with semantic versioning.
- Contracted SLA for delivery times and completeness.
- Pre-production validation in CI: publish sample events to a staging topic/table and validate with Deadpipe checks.
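For the CI validation step, a sketch using the jsonschema package is shown below; the schema and fixture paths are assumptions for your repository layout:
# Sketch: validate sample events against a versioned JSON Schema in CI.
import json
from jsonschema import Draft7Validator

with open("contracts/payments_event.v2.schema.json") as f:    # path is illustrative
    schema = json.load(f)

with open("tests/fixtures/sample_payments_events.json") as f:  # path is illustrative
    events = json.load(f)

validator = Draft7Validator(schema)
failures = []
for i, event in enumerate(events):
    for error in validator.iter_errors(event):
        failures.append(f"event[{i}]: {error.message}")

if failures:
    raise SystemExit("Contract violations:\n" + "\n".join(failures))
print(f"{len(events)} sample events conform to the contract")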
Integrations and Implementation Patterns
Deadpipe meets you where you work. Here’s how to integrate with popular tools.
Airflow integration
A typical DAG:
import os
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from deadpipe import DeadpipeClient

client = DeadpipeClient(api_key=os.environ["DEADPIPE_API_KEY"])
default_args = {"retries": 1, "retry_delay": timedelta(minutes=15)}

with DAG(
    dag_id="orders_daily_ingest",
    schedule_interval="0 2 * * *",
    start_date=datetime(2024, 1, 1),
    default_args=default_args,
    catchup=True,
    max_active_runs=1,
) as dag:

    def start(**context):
        ds = context["ds"]
        run_id = f"orders_daily_ingest:{ds}"
        client.heartbeat(pipeline_id="orders_daily_ingest", run_id=run_id, partition=ds, schedule="daily")

    def load(**context):
        ds = context["ds"]
        rows = load_orders_for_partition(ds)
        context["ti"].xcom_push(key="rows", value=rows)

    def validate_and_complete(**context):
        ds = context["ds"]
        run_id = f"orders_daily_ingest:{ds}"
        rows = context["ti"].xcom_pull(key="rows")
        # Fetch baseline and compute anomalies...
        client.complete(
            pipeline_id="orders_daily_ingest",
            run_id=run_id,
            partition=ds,
            status="success",
            metrics={"rows_written": rows},
        )

    start_task = PythonOperator(task_id="start", python_callable=start)
    load_task = PythonOperator(task_id="load", python_callable=load)
    finish_task = PythonOperator(task_id="finish", python_callable=validate_and_complete)

    start_task >> load_task >> finish_task
Use an Airflow DeadpipeSensor to wait for upstream liveness or quality before proceeding, enabling cross-DAG dependencies on data health, not only task success.
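If your Deadpipe version does not ship a sensor, a minimal custom one is easy to sketch on top of Airflow's BaseSensorOperator; the get_readiness() call below is a hypothetical client method standing in for whatever readiness lookup your SDK or REST API exposes:
# Sketch of a custom sensor that waits on Deadpipe readiness (get_readiness is hypothetical).
import os
from airflow.sensors.base import BaseSensorOperator
from deadpipe import DeadpipeClient

class DeadpipeReadinessSensor(BaseSensorOperator):
    def __init__(self, pipeline_id: str, **kwargs):
        super().__init__(**kwargs)
        self.pipeline_id = pipeline_id

    def poke(self, context) -> bool:
        client = DeadpipeClient(api_key=os.environ["DEADPIPE_API_KEY"])
        status = client.get_readiness(pipeline_id=self.pipeline_id, partition=context["ds"])
        self.log.info("Readiness for %s %s: %s", self.pipeline_id, context["ds"], status)
        return status == "ready"

# Usage inside a DAG:
# wait_upstream = DeadpipeReadinessSensor(
#     task_id="wait_for_orders",
#     pipeline_id="orders_daily_ingest",
#     poke_interval=300,
#     timeout=6 * 3600,
# )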
dbt integration
Patterns:
- Inject freshness checks via dbt sources freshness + Deadpipe forwarding.
- Report dbt test failures with context and severity mapping.
- Tag critical models with SLOs; block exposures if quality gate fails.
dbt source freshness post-hook:
sources:
  - name: raw
    tables:
      - name: orders
        freshness:
          warn_after: {count: 36, period: hour}
          error_after: {count: 72, period: hour}
        loaded_at_field: _ingested_at
        post-hook:
          - "python scripts/deadpipe_freshness.py {{ this.name }} '{{ freshness_status }}' {{ freshness_lag }}"
deadpipe_freshness.py:
import sys, os
from deadpipe import DeadpipeClient
client = DeadpipeClient(api_key=os.environ["DEADPIPE_API_KEY"])
table, status, lag = sys.argv[1], sys.argv[2], int(sys.argv[3])
severity = "warning" if status == "warn" else "critical" if status == "error" else "info"
client.alert(
pipeline_id="dbt_sources",
run_id=os.getenv("DBT_INVOCATION_ID"),
title=f"Freshness {status} for {table}",
severity=severity,
message=f"Lag {lag} minutes",
tags={"table": table}
)
Spark and Databricks
Batch:
from deadpipe import DeadpipeClient
client = DeadpipeClient(api_key=dbutils.secrets.get("monitoring", "deadpipe_api_key"))
partition = dbutils.widgets.get("partition")
run_id = f"orders_spark_batch:{partition}"
client.heartbeat(pipeline_id="orders_spark_batch", run_id=run_id, partition=partition, schedule="daily")
df = spark.read.parquet(f"/mnt/raw/orders/date={partition}")
# Transform...
rows = df.count()
df.write.mode("overwrite").parquet(f"/mnt/stage/orders/date={partition}")
client.complete(
pipeline_id="orders_spark_batch",
run_id=run_id,
partition=partition,
status="success",
metrics={"rows_written": rows}
)
Structured Streaming:
import time

from deadpipe import DeadpipeClient

client = DeadpipeClient(api_key=dbutils.secrets.get("monitoring", "deadpipe_api_key"))
PIPELINE_ID = "orders_stream_ss"

def report_heartbeat(batch_df, batch_id):
    count = batch_df.count()
    client.heartbeat(
        pipeline_id=PIPELINE_ID,
        run_id="stream",
        schedule="streaming",
        message=f"Batch {batch_id}",
        metrics={"events_in_batch": count}
    )

def write_and_report(batch_df, batch_id):
    batch_df.write.mode("append").saveAsTable("analytics.orders_stream")
    report_heartbeat(batch_df, batch_id)

df = spark.readStream.format("kafka").option("subscribe", "orders").load()
parsed = parse_orders(df)

query = (
    parsed.writeStream
    .foreachBatch(write_and_report)
    .outputMode("append")
    .start()
)

while query.isActive:
    time.sleep(60)
Kafka/Flink
Flink can emit metrics to Prometheus; bridge them into Deadpipe via exporters, or emit direct heartbeats from operators. Key metrics: records-in/out, checkpoint time, backpressure, consumer lag. Set Deadpipe alert on “records_out_per_minute == 0 for 5 minutes.”
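One way to bridge, sketched below, is a small sidecar that polls the Prometheus HTTP API and forwards the throughput number as a Deadpipe heartbeat; the Prometheus URL and the Flink metric name are assumptions you would adjust to your reporter configuration:
# Sketch: poll Prometheus for Flink throughput and forward it as a Deadpipe heartbeat.
import os
import time
import requests
from deadpipe import DeadpipeClient

PROM_URL = "http://prometheus:9090/api/v1/query"  # assumption
QUERY = 'sum(rate(flink_taskmanager_job_task_numRecordsOut[1m])) * 60'  # records out per minute; metric name assumed

client = DeadpipeClient(api_key=os.environ["DEADPIPE_API_KEY"])

while True:
    resp = requests.get(PROM_URL, params={"query": QUERY}, timeout=10)
    resp.raise_for_status()
    results = resp.json()["data"]["result"]
    records_out_per_minute = float(results[0]["value"][1]) if results else 0.0
    client.heartbeat(
        pipeline_id="orders_flink_app",
        run_id="stream",
        schedule="streaming",
        metrics={"records_out_per_minute": records_out_per_minute},
    )
    time.sleep(60)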
Cloud warehouses
- BigQuery: Use INFORMATION_SCHEMA and SCHEDULED QUERIES to run health checks; call Deadpipe via UDF HTTP or Cloud Functions.
- Snowflake: Tasks and Streams; use TASK-driven stored procedures to compute metrics and call the Deadpipe REST endpoint (for example, via an external function or external network access).
- Redshift/Postgres: Cron jobs with SQL scripts; a small Python wrapper posts to Deadpipe.
REST example (generic):
curl -X POST https://api.deadpipe.io/v1/heartbeat \
-H "Authorization: Bearer $DEADPIPE_API_KEY" \
-H "Content-Type: application/json" \
-d '{
"pipeline_id": "orders_daily_ingest",
"run_id": "orders_daily_ingest:2025-01-15",
"partition": "2025-01-15",
"schedule": "daily",
"context": {"destination": "analytics.orders_staging"}
}'
Safe Gating and Promotion to “Ready”
Marking data “ready” should depend on checks passing. Implement a readiness flag:
- Write to staging table.
- Run checks (row count, schema, constraints).
- If pass: upsert to production table or flip a view alias.
- If fail: keep staging isolated; emit alert; do not update production.
Example with SQL view swap (Snowflake):
-- After loading analytics.orders_staging
-- Run checks and compute a boolean "is_ready"
-- If ready, atomically swap:
ALTER VIEW analytics.orders_current SET SECURE = TRUE; -- ensure secure
ALTER TABLE analytics.orders RENAME TO analytics.orders_prev;
ALTER TABLE analytics.orders_staging RENAME TO analytics.orders;
DROP TABLE analytics.orders_prev;
Or better, use a pointer view:
- The view orders_current selects from the latest partition with ready = true.
- The loader sets ready = true only when checks pass.
- Consumers always query orders_current.
Deadpipe can mark a partition’s readiness status and expose it via API/UI, so other jobs can wait_until_ready.
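A polling helper for that pattern might look like the following sketch; get_readiness() is a hypothetical client method (you could equally hit a REST endpoint):
# Sketch: block a downstream job until an upstream partition is marked ready.
import time

def wait_until_ready(client, pipeline_id: str, partition: str,
                     timeout_sec: int = 3600, poll_sec: int = 60) -> None:
    deadline = time.time() + timeout_sec
    while time.time() < deadline:
        # get_readiness() is hypothetical; substitute your SDK's readiness lookup.
        if client.get_readiness(pipeline_id=pipeline_id, partition=partition) == "ready":
            return
        time.sleep(poll_sec)
    raise TimeoutError(f"{pipeline_id} partition {partition} not ready after {timeout_sec}s")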
Step-by-Step Setup
1. Pick a critical pipeline.
   - Identify owners, consumers, and current pain points.
   - Document schedule, partitions, and expected volumes.
2. Enable liveness.
   - Add a heartbeat at run start.
   - Add completion with status and metrics.
   - Test locally with a dry run.
3. Add freshness and completeness.
   - Compute freshness lag and row counts.
   - Set initial static thresholds (e.g., lag < 36h, rows > 0).
   - Train baselines (14–21 days) and enable the ±20% band.
4. Wire alerts.
   - Route to a low-noise channel first (#data-alerts-dev).
   - Configure deduplication and escalation.
5. Add quality gates.
   - Unique keys, not-null assertions, domain checks.
   - Schema drift detection.
6. Introduce SLOs.
   - Set initial objectives and track the error budget.
   - Publish SLOs in documentation/readme.
7. Expand coverage.
   - Tackle the next highest-impact pipeline.
   - Standardize patterns via templates and libraries.
8. Build runbooks.
   - For each alert type, write a 5–10 step runbook with commands/queries.
   - Store runbooks in version control and link them in alerts.
9. Review and iterate.
   - Monthly postmortems: adjust baselines, thresholds, and processes.
   - Track MTTR/MTTD trends.
Practical Examples and Use Cases
Example 1: Marketing attribution backfill
Problem: Backfill for October ran “successfully” but wrote 0 rows due to an upstream filter on US-only traffic.
Deadpipe setup:
- Heartbeat for each partition during backfill with context mode=backfill.
- Completeness baseline disabled; static floor >0 enforced.
- Alert fired: zero rows detected for 2024-10-15.
- Runbook: verify source filters; rerun with corrected WHERE clause.
Outcome: Detected within minutes; single-day rerun, avoided missing MRR attribution.
Example 2: Streaming payments stall
Problem: Payments events stopped at 02:14 UTC due to expired API token. Stream job was “running,” checkpoint advanced, but no messages received.
Deadpipe:
- Heartbeat every minute with events_last_minute metric.
- Alert fired after 5 minutes of zero events; PagerDuty ping.
- Suggested remediation: rotate token via internal script.
Outcome: 12-minute outage, no downstream ledger imbalance.
Example 3: Schema drift on orders table
Problem: Upstream added column discount_code and removed promo_id without notice. Downstream transformation failed late.
Deadpipe:
- Nightly schema snapshot diff detected removal.
- Alert with change type: removed promo_id; severity=warning.
- Runbook: update dbt model; add null-handling; patch transformation.
Outcome: Prevented failed release; applied migration before promotion.
Advanced Patterns
Correlating upstream and downstream
- Emit lineage via tags: upstream_pipeline_ids.
- Deadpipe auto-suppresses downstream alerts if upstream is red within the same window.
- Dashboard shows dependency graph with status propagation.
Multi-tenant pipelines
If you process N tenants/customers:
- Include tenant_id as a tag in metrics and alerts.
- Configure per-tenant baselines; alert on outliers.
- Avoid fan-out alert storms by sampling or grouping by severity.
Handling late-arriving data
- Set allowed lateness per source (e.g., 48 hours).
- Split freshness SLO into two: nearline SLO for 95%, final SLO for 100% within lateness window.
- Use watermark + grace to mark partitions provisional; revalidate when late data lands.
Cost-aware monitoring
- Sample metrics: for very high-frequency streams, report every Nth batch.
- Use aggregated metrics (histograms) instead of per-record logs.
- Push checks down to the warehouse where compute is cheap/fixed.
- Consolidate alerts; avoid per-tenant PagerDuty pages unless critical.
Security and governance
- Store Deadpipe API keys in secret managers.
- Do not leak PII in metrics/alerts; use IDs and references.
- Set RBAC in Deadpipe to control who can mute alerts or change thresholds.
- Audit logs: monitor who acknowledged, muted, or changed monitors.
Troubleshooting and Common Pitfalls
- Missing heartbeats due to early failures:
  - Place the heartbeat in the earliest executable step.
  - Use try/finally to ensure the completion event fires.
- Duplicate run_ids:
  - Include a UUID or the orchestrator run_id; ensure idempotent completion updates.
- Timezone mismatches:
  - Standardize on UTC for partitions and liveness windows.
  - Watch out for DST; schedule by UTC and display local time in the UI only.
- Zero rows expected on holidays:
  - Maintain a calendar; set an allow_zero flag for known dates.
- Baseline hysteresis:
  - Avoid flapping by using percentile bands and a minimum alert duration.
- Schema drift false positives:
  - Ignore benign changes (column ordering); focus on type and nullability changes.
- Streaming noise:
  - Use a moving average of events per minute; require consecutive breaches before alerting (see the sketch after this list).
- Privilege errors calling Deadpipe from the warehouse:
  - Test REST connectivity; open egress to api.deadpipe.io.
- Alert fatigue:
  - Tag severity correctly; route warnings to Slack and critical alerts to PagerDuty.
- Backfill suppression:
  - Tag runs with mode=backfill; Deadpipe can mute certain checks or adjust baselines automatically.
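For the streaming-noise item above, the consecutive-breach guard is simple to implement; a minimal sketch with illustrative thresholds:
# Sketch: only alert after N consecutive breaches of a moving-average throughput threshold.
from collections import deque

class BreachGuard:
    def __init__(self, window: int = 5, min_events_per_minute: float = 10, required_breaches: int = 3):
        self.samples = deque(maxlen=window)
        self.min_rate = min_events_per_minute
        self.required = required_breaches
        self.consecutive = 0

    def observe(self, events_last_minute: int) -> bool:
        """Return True when an alert should fire."""
        self.samples.append(events_last_minute)
        moving_avg = sum(self.samples) / len(self.samples)
        self.consecutive = self.consecutive + 1 if moving_avg < self.min_rate else 0
        return self.consecutive >= self.required

guard = BreachGuard()
for rate in [50, 40, 3, 2, 0, 0, 0, 0]:
    if guard.observe(rate):
        print("fire alert: sustained low throughput")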
Code Templates You Can Reuse
Python utility wrapper
# monitor_utils.py
import os
from contextlib import contextmanager
from deadpipe import DeadpipeClient
client = DeadpipeClient(api_key=os.environ["DEADPIPE_API_KEY"], env=os.getenv("ENV", "prod"))
@contextmanager
def monitored_run(pipeline_id: str, partition: str, schedule: str = "daily", context: dict | None = None):
    run_id = f"{pipeline_id}:{partition}"
    client.heartbeat(pipeline_id=pipeline_id, run_id=run_id, partition=partition, schedule=schedule, context=context or {})
    try:
        yield run_id
        client.complete(pipeline_id=pipeline_id, run_id=run_id, partition=partition, status="success")
    except Exception as e:
        client.complete(pipeline_id=pipeline_id, run_id=run_id, partition=partition, status="failure", error=str(e))
        raise
Usage:
from monitor_utils import client, monitored_run

with monitored_run("orders_daily_ingest", partition="2025-01-15", context={"destination": "analytics.orders_staging"}) as run_id:
    rows = load_orders_for_partition("2025-01-15")
    client.metric(pipeline_id="orders_daily_ingest", run_id=run_id, name="rows_written", value=rows)
YAML monitor for streaming
pipelines:
  - id: payments_stream
    schedule: streaming
    slos:
      events_per_minute_min: 10
    alerts:
      rules:
        - name: zero-throughput
          condition: "events_last_minute == 0 for 5m"
          severity: critical
          message: "No events in 5 minutes"
        - name: rising-lag
          condition: "lag > 10000 for 10m"
          severity: warning
          message: "Consumer lag above 10k"
SQL quality suite (Snowflake)
-- 1. Uniqueness
SELECT 'dup_keys' AS check_name, COUNT(*) AS violations
FROM (
    SELECT id
    FROM analytics.orders_staging
    WHERE partition_date = :partition
    GROUP BY id
    HAVING COUNT(*) > 1
) dupes
HAVING COUNT(*) > 0;

-- 2. Not null
SELECT 'null_amount' AS check_name, COUNT(*) AS violations
FROM analytics.orders_staging
WHERE partition_date = :partition AND amount IS NULL
HAVING COUNT(*) > 0;

-- 3. Domain values
SELECT 'invalid_status' AS check_name, COUNT(*) AS violations
FROM analytics.orders_staging
WHERE partition_date = :partition AND status NOT IN ('paid', 'refunded', 'pending')
HAVING COUNT(*) > 0;
Post results to Deadpipe with a small Python script that parses violations and raises alerts as needed.
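A sketch of that glue script, assuming a run_check_query() helper that executes one of the SQL checks above against your warehouse and returns its rows:
# Sketch: run each SQL check and raise a Deadpipe alert when violations are found.
import os
from deadpipe import DeadpipeClient

client = DeadpipeClient(api_key=os.environ["DEADPIPE_API_KEY"])

CHECKS = ["dup_keys", "null_amount", "invalid_status"]

def report_quality(partition: str) -> int:
    total_violations = 0
    for check in CHECKS:
        # run_check_query() is a placeholder for your warehouse client call.
        rows = run_check_query(check, partition)  # [] when the check passes
        for check_name, violations in rows:
            total_violations += violations
            client.alert(
                pipeline_id="orders_daily_ingest",
                run_id=f"quality:{partition}",
                title=f"Quality check failed: {check_name}",
                severity="critical",
                message=f"{violations} violations for partition {partition}",
                tags={"partition": partition, "check": check_name},
            )
    return total_violations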
Measuring Impact and Continuous Improvement
What gets measured gets improved. Track:
- MTTD: time from failure occurrence to first alert. Goal: minutes, not hours.
- MTTR: time from alert to resolution. Goal: shrink via runbooks and automation.
- False positives rate: alerts that did not require action. Goal: <10%.
- Coverage: percentage of critical pipelines with Deadpipe instrumentation. Goal: 100%.
- SLO adherence: % of time within SLOs; compare month over month.
Run quarterly reviews:
- Identify top 5 recurring incident types; automate detection and remediation.
- Invest in pipeline hardening for the worst offenders.
- Update playbooks and training for on-call rotations.
Frequently Asked Questions
- How is Deadpipe different from observability tools that scrape logs and metrics?
  - Deadpipe attaches to data semantics (freshness, completeness, quality) and pipeline intent (partitions, watermarks). It complements infra monitoring by focusing on outcomes, not just runtime.
- We already have dbt tests; why do we need Deadpipe?
  - dbt tests validate models at run time; Deadpipe adds liveness, cross-tool correlation, baselining, alert routing, and SLO tracking across batch and streaming.
- Will Deadpipe slow down my pipelines?
  - Instrumentation is lightweight. Most checks reuse warehouse compute or streaming metrics. Network calls are minimal and can be batched.
- How do we handle secrets and compliance?
  - Use your cloud provider’s secret store; avoid sending PII in metrics. Deadpipe supports tags and masked fields; store only metadata and counts.
- Can Deadpipe trigger auto-remediation?
  - Yes. Hook alerts to orchestrator APIs to retry, pause, or run backfills; or trigger a script that rotates keys or restarts consumers. A sketch of this pattern follows below.
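As one example, a small webhook receiver can turn a critical alert into an Airflow re-run through the stable REST API; this is a sketch, and the DAG id, Airflow URL, and the shape of the alert payload are assumptions you would adapt:
# Sketch: trigger an Airflow re-run from a Deadpipe alert webhook (payload shape assumed).
import os
import requests
from flask import Flask, request

app = Flask(__name__)
AIRFLOW_URL = os.environ.get("AIRFLOW_URL", "http://airflow-webserver:8080")

@app.route("/deadpipe/webhook", methods=["POST"])
def handle_alert():
    alert = request.get_json()
    if alert.get("severity") == "critical" and alert.get("rule") == "zero-rows":
        resp = requests.post(
            f"{AIRFLOW_URL}/api/v1/dags/orders_daily_ingest/dagRuns",
            auth=(os.environ["AIRFLOW_USER"], os.environ["AIRFLOW_PASSWORD"]),
            json={"conf": {"partition": alert.get("partition")}},
            timeout=10,
        )
        resp.raise_for_status()
    return {"status": "ok"}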
Conclusion
Silent failures are the enemy of trust. Liveness without completeness is a false comfort; completeness without quality is risky. Deadpipe monitoring gives you a cohesive, developer-friendly way to instrument pipelines for liveness, freshness, completeness, and quality, with baselines and SLOs that reduce noise and accelerate response.
Start small: one pipeline, one SLO, one alert. Emit heartbeats, measure freshness, assert non-zero rows, and gate readiness. Then add schema drift detection, row-level checks, and lineage-aware suppression. Integrate with your orchestrators, ELT, streaming, and warehouses. Build runbooks, route alerts with context, and watch your MTTD/MTTR fall while stakeholder confidence rises.
When your pipelines are observed for what matters—the data itself—you convert unknown failures into known events, and known events into quick fixes. That is the path to predictable, observable, resilient data flows with Deadpipe monitoring.