Fix Pipeline Failures with Deadpipe Monitoring

January 5, 2026 · 25 min read

Introduction

Pipelines fail silently more often than they explode loudly. A job finishes "successfully" but writes zero rows. A backfill runs with an empty upstream partition. A streaming checkpoint advances, yet no events arrive for hours. These are the worst kinds of failures for data engineering teams: undetected gaps that sneak into dashboards, machine learning features, invoices, and ops decisions. This guide shows you how to fix pipeline failures with Deadpipe monitoring, so your data flows become predictable, observable, and resilient.

In this practical guide, you will learn how to instrument your pipelines for liveness and correctness, define clear service level objectives, and set up actionable alerts with Deadpipe. We will cover batch and streaming patterns, heartbeats, row-level checks, schema drift detection, and anomaly baselines. You will get copy-paste ready code examples using the Deadpipe SDK, plus a step-by-step setup process you can roll into Airflow, dbt, Spark, Kafka, and cloud warehouse jobs.

If you have ever pushed a fix at midnight because a downstream model quietly degraded, or spent a morning reconciling an empty table after a "green" DAG, this guide is for you. We will tie monitoring to your engineering workflows so you detect and remediate issues faster, with fewer false positives. Along the way, we will reference related resources on ETL observability, tooling, and cost-effective approaches, including "ETL Data Engineering and Pipeline Monitoring, Simply", "Data Pipeline Monitoring Tools: Top 5 ETL", and "Affordable Data Engineering & Pipeline Monitoring".

By the end, you will have a working setup that:

  • Detects "deadpipe" conditions (no data when data is expected)
  • Validates core quality checks before data is marked "ready"
  • Alerts the right channel with the right context and runbook
  • Tracks SLOs over time and reduces mean time to detect and recover

This is a hands-on engineering guide. Expect shell commands, Python snippets, SQL assertions, YAML monitors, and an explanation of common errors and how to fix them. Let’s turn your pipeline failures into known, monitored, and quickly resolved events with Deadpipe monitoring.

Background and Context

Why does this matter now? Pipelines have multiplied across teams and tools. Most data engineering stacks include a scheduler or orchestrator, a warehouse or lakehouse, ELT frameworks like dbt, streaming components like Kafka or Flink, and a handful of cloud services. Each part has logs and metrics, but the signal you care about is end-to-end: did the data arrive, is it complete and fresh, and can downstream consumers trust it?

The failure mode that stings the most is the quiet one. Engineers often build robust retry logic, idempotent writes, and id-aware processing. Jobs succeed; SLAs pass; dashboards stay green. But when source systems change, schemas drift, time windows shift, or upstream filters accidentally exclude all data, the pipeline is technically alive while the data path is functionally dead. That deadpipe condition is the gap between "system health" and "data health."

Traditional monitoring focuses on compute metrics: CPU, memory, container up/down, DAG task success, runtime. These are necessary but insufficient. If you observe only infrastructure health, you will miss data semantics like "we processed 0 of the expected 1.2 million events," or "the incremental watermark did not advance," or "two critical dimensions vanished from the schema." Real monitoring must attach to the data itself and the logical outcome of pipeline runs.

Teams typically attempt a patchwork of homegrown checks: ad hoc row counts, manual data spot checks, periodic cron queries. The problem is that these are noisy, brittle, and divorced from changes in load, seasonality, or backfills. Without baselines and explicit SLOs, you cannot reason about normal versus anomalous variance. Without liveness pings and end-to-end lineage awareness, you cannot correlate upstream disruptions to downstream emptiness.

Deadpipe monitoring addresses this gap with three pillars:

  1. Liveness: A heartbeat or check-in signal that asserts "this pipeline attempted to produce data within the expected window" with enough context to detect abnormal silence.

  2. Freshness and completeness: Freshness measures whether new data arrived when it should. Completeness checks assess whether the delivered volume falls within expected bounds. Together they surface partial loads, backfills, and schedule slips.

  3. Quality and semantics: Assertions about schema, uniqueness, nullability, referential integrity, and distributional anomalies. These checks ensure "good enough" data, not merely data present.

Deadpipe wraps these concepts with a developer-friendly SDK, a declarative monitor configuration, and AI-powered baselines that cut noise. It integrates with the tools teams already use, so you can monitor pipelines where they run: Airflow DAGs, dbt models, Spark jobs, Flink apps, and warehouse SQL. For an overview of how AI observability meets pipeline monitoring, see "AI Observability in Data Pipelines: What Works" and "AI Observability in Data Pipelines with Deadpipe".

What about cost and complexity? Monitoring can sprawl, both in operational cost and developer time. Deadpipe emphasizes affordable, incremental adoption. Start with one or two critical pipelines, define concrete SLOs, and grow coverage. If budget is a concern, explore "AI Observability: Cost-Effective Pipeline Monitoring" and "Affordable Data Engineering & Pipeline Monitoring" for strategies to control spend without sacrificing reliability.

Finally, remember that monitoring is a means to an end. The end is trust. When data practitioners and stakeholders trust the pipeline, they ship models and dashboards faster, and the business acts on data with confidence. Deadpipe helps you earn that trust by catching failures early, contextualizing alerts, and tracking improvement over time.

Main Content Section 1: Instrumenting Pipelines for Liveness, Freshness, and Completeness

The first step to fix pipeline failures is to instrument your code so Deadpipe can observe it. You do not need to refactor your entire stack. Start with lightweight signals that we call "liveness" and augment them with freshness and completeness calculations.

Liveness heartbeats for batch jobs

A batch pipeline should emit a heartbeat at the beginning of a run and record success or failure at the end. The heartbeat binds a run to a logical data interval (for example, daily partition 2025-01-15) and a unique run id. Deadpipe correlates these events to detect deadpipe conditions: no heartbeat within the schedule window, or a heartbeat without completion.

Copy-paste ready Python example for Airflow or any batch runner:

# requirements.txt
# deadpipe>=0.7.0

import os
import uuid
from datetime import datetime, timezone
from deadpipe import DeadpipeClient, Severity

DEADPIPE_API_KEY = os.environ["DEADPIPE_API_KEY"]
PIPELINE_ID = "orders_daily_ingest"
ENV = os.getenv("ENV", "prod")

client = DeadpipeClient(api_key=DEADPIPE_API_KEY, env=ENV)

def run_orders_daily(execution_date: datetime):
    run_id = f"{PIPELINE_ID}:{execution_date.strftime('%Y-%m-%d')}:{uuid.uuid4()}"
    partition = execution_date.strftime("%Y-%m-%d")

    # Start heartbeat: liveness ping
    client.heartbeat(
        pipeline_id=PIPELINE_ID,
        run_id=run_id,
        partition=partition,
        schedule="daily",
        message="Starting daily ingest"
    )

    try:
        # Perform work...
        rows_written = load_orders_for_partition(partition)
        watermark = partition  # e.g., last_processed_date
        # Mark completion with outcome and metrics
        client.complete(
            pipeline_id=PIPELINE_ID,
            run_id=run_id,
            partition=partition,
            status="success",
            metrics={
                "rows_written": rows_written,
                "last_watermark": watermark
            }
        )
        return rows_written
    except Exception as e:
        # Signal failure for visibility
        client.complete(
            pipeline_id=PIPELINE_ID,
            run_id=run_id,
            partition=partition,
            status="failure",
            severity=Severity.CRITICAL,
            error=str(e)
        )
        raise

def load_orders_for_partition(partition: str) -> int:
    # ... your ingestion logic ...
    # return the number of rows written for monitoring
    return 123456

In Airflow, you can wrap this inside a PythonOperator, passing ds as the partition. The important parts are:

  • Emit heartbeat as early as possible in the run.
  • Emit complete always, even on errors (inside a finally or except).
  • Include metrics like row counts, processed bytes, and watermarks.

Tip: Use a stable pipeline_id. Include environment as a tag or let the client derive it from API key/workspace.
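One way to guarantee that a completion event always follows a heartbeat is to wrap the run in a context manager. The sketch below is only an illustration of that pattern: `RecordingClient` is a hypothetical stand-in that records calls instead of contacting Deadpipe, and `monitored_run` is a helper name invented here, not part of any SDK.

```python
from contextlib import contextmanager


class RecordingClient:
    """Hypothetical stand-in for a monitoring client; it only records calls."""

    def __init__(self):
        self.events = []

    def heartbeat(self, **fields):
        self.events.append(("heartbeat", fields))

    def complete(self, **fields):
        self.events.append(("complete", fields))


@contextmanager
def monitored_run(client, pipeline_id, run_id, partition):
    """Emit a heartbeat on entry and exactly one completion on exit."""
    metrics = {}
    client.heartbeat(pipeline_id=pipeline_id, run_id=run_id, partition=partition)
    try:
        # The job fills this dict with row counts, watermarks, and so on.
        yield metrics
    except Exception as exc:
        client.complete(pipeline_id=pipeline_id, run_id=run_id, partition=partition,
                        status="failure", error=str(exc), metrics=metrics)
        raise  # never swallow the original error
    else:
        client.complete(pipeline_id=pipeline_id, run_id=run_id, partition=partition,
                        status="success", metrics=metrics)


recorder = RecordingClient()
with monitored_run(recorder, "orders_daily_ingest", "run-1", "2025-01-15") as metrics:
    metrics["rows_written"] = 123456
```

Because the completion call lives in the wrapper, individual jobs cannot forget it, and metrics gathered before a failure are still reported.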

Heartbeats for streaming pipelines

Streaming jobs require a different liveness pattern: continuous check-ins based on event flow rather than scheduled runs. You can:

  • Emit a heartbeat every N seconds/minutes when the job is alive and processing.
  • Attach a moving window metric such as events processed in the last 5 minutes.
  • Include consumer group lag or watermark to detect stuck reads.

Kafka consumer example with Deadpipe:

import os
from time import time

from deadpipe import DeadpipeClient
from kafka import KafkaConsumer

client = DeadpipeClient(api_key=os.environ["DEADPIPE_API_KEY"])

PIPELINE_ID = "payments_stream"
HEARTBEAT_INTERVAL_SEC = 60
last_hb = 0
events_in_window = 0

consumer = KafkaConsumer(
    "payments",
    bootstrap_servers=["kafka:9092"],
    group_id="payments-stream-consumer",
    enable_auto_commit=True,
)

# process_message() and get_consumer_lag() are your own helpers.
while True:
    # poll() returns after at most one second even when no messages arrive,
    # so the heartbeat below still fires (with a zero count) during silence.
    # Iterating the consumer directly would block and hide exactly that case.
    batches = consumer.poll(timeout_ms=1000)
    for records in batches.values():
        for msg in records:
            process_message(msg)
            events_in_window += 1

    now = time()
    if now - last_hb >= HEARTBEAT_INTERVAL_SEC:
        client.heartbeat(
            pipeline_id=PIPELINE_ID,
            run_id="stream",  # long-lived
            schedule="streaming",
            message="Streaming heartbeat",
            metrics={"events_last_minute": events_in_window, "lag": get_consumer_lag()}
        )
        events_in_window = 0
        last_hb = now

This light-touch approach detects deadpipe conditions like:

  • Zero events_last_minute for X intervals (while job is up).
  • Lag increasing beyond threshold, indicating stuck or slow downstream.
  • Watermark not advancing (e.g., event-time stalls).

Freshness monitoring

Freshness asks: how stale is the data compared to now or compared to the source’s provided time? For batch jobs, compute a freshness lag at the end of the run. For streaming, use moving windows.

SQL example to compute last partition date and lag in Snowflake:

-- Compute freshness lag in minutes
WITH last AS (
  SELECT MAX(partition_date) AS max_partition
  FROM analytics.orders_staging
)
SELECT DATEDIFF('minute', max_partition, CONVERT_TIMEZONE('UTC', CURRENT_TIMESTAMP())) AS freshness_lag_minutes
FROM last;

Report this metric to Deadpipe:

lag = run_query_and_fetch_int("SELECT ... freshness query ...")
client.metric(
    pipeline_id="orders_daily_ingest",
    run_id=run_id,
    name="freshness_lag_minutes",
    value=lag,
    tags={"table": "analytics.orders_staging"}
)

Practical thresholds:

  • Daily ingestion: lag should be < 36 hours (accounts for retries).
  • Hourly ingestion: lag < 3 hours.
  • Streaming: event_time lag < 5 minutes for near-real-time.

Deadpipe can baseline lag per hour-of-day and day-of-week to reduce false alarms on known slow windows.
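If you prefer to compute lag in application code rather than SQL, the same check is a few lines of Python. The sketch below hard-codes the static thresholds listed above; the function names and the threshold table are assumptions for illustration.

```python
from datetime import datetime, timezone

# Static thresholds in minutes: daily < 36 h, hourly < 3 h, streaming < 5 min.
FRESHNESS_THRESHOLD_MINUTES = {"daily": 36 * 60, "hourly": 3 * 60, "streaming": 5}


def freshness_lag_minutes(last_loaded_at: datetime, now: datetime) -> float:
    """Minutes elapsed between the newest loaded record and `now`."""
    if last_loaded_at.tzinfo is None or now.tzinfo is None:
        raise ValueError("use timezone-aware datetimes to avoid local-time bugs")
    return (now - last_loaded_at).total_seconds() / 60


def is_stale(schedule: str, last_loaded_at: datetime, now: datetime) -> bool:
    """True when the lag has reached or passed the threshold for this schedule."""
    return freshness_lag_minutes(last_loaded_at, now) >= FRESHNESS_THRESHOLD_MINUTES[schedule]


last_loaded = datetime(2025, 1, 15, 6, 0, tzinfo=timezone.utc)
checked_at = datetime(2025, 1, 16, 12, 0, tzinfo=timezone.utc)
lag = freshness_lag_minutes(last_loaded, checked_at)  # 30 hours, in minutes
```

Rejecting naive datetimes up front is deliberate: a lag computed from mixed local and UTC timestamps is a common source of false freshness alerts.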

Completeness monitoring

Completeness measures whether the volume landed as expected. You can implement:

  • Absolute minimums: e.g., at least 500k rows on weekdays.
  • Relative baselines: within ±20% of the trailing 14-day median for same weekday.
  • Ratio checks: fact rows vs dimension changes; new vs updated records.

Example: assert daily row count within range before marking a partition “ready.”

-- Warehouse-agnostic pseudo SQL
WITH todays AS (
  SELECT COUNT(*) AS c FROM analytics.orders_staging WHERE partition_date = :partition
),
baseline AS (
  SELECT PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY cnt) AS median_cnt
  FROM (
    SELECT COUNT(*) AS cnt
    FROM analytics.orders_staging
    WHERE partition_date >= DATEADD('day', -28, :partition)
      AND partition_date < :partition
      AND DAYOFWEEK(partition_date) = DAYOFWEEK(:partition)
    GROUP BY partition_date
  )
)
SELECT t.c AS todays_count, b.median_cnt
FROM todays t CROSS JOIN baseline b;

In Python:

todays_count, median_cnt = fetch_counts(partition)
lower = int(median_cnt * 0.8)
upper = int(median_cnt * 1.2)

client.metric(
    pipeline_id=PIPELINE_ID,
    run_id=run_id,
    name="rows_written",
    value=todays_count,
    tags={"partition": partition}
)

if todays_count < lower or todays_count > upper:
    client.alert(
        pipeline_id=PIPELINE_ID,
        run_id=run_id,
        title="Completeness anomaly",
        severity="warning",
        message=f"Count {todays_count} outside [{lower}, {upper}] baseline",
        tags={"partition": partition}
    )

Guardrail pattern: fail-fast on zero rows unless explicitly allowed (e.g., holidays):

if todays_count == 0 and not is_zero_expected(partition):
    raise RuntimeError(f"Zero rows for {partition} is not allowed")
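The guardrail relies on an `is_zero_expected` helper that the snippet does not define. A minimal sketch, assuming a hand-maintained set of dates on which an empty partition is legitimate (the dates below are placeholders, not a real calendar):

```python
from datetime import date

# Hypothetical calendar of dates where zero rows is acceptable.
ZERO_ALLOWED_DATES = {date(2025, 1, 1), date(2025, 12, 25)}


def is_zero_expected(partition: str, allowed=ZERO_ALLOWED_DATES) -> bool:
    """Partition is an ISO date string such as '2025-01-15'."""
    return date.fromisoformat(partition) in allowed


def check_row_count(partition: str, row_count: int) -> int:
    """Fail fast on an unexpected empty partition; otherwise pass the count through."""
    if row_count == 0 and not is_zero_expected(partition):
        raise RuntimeError(f"Zero rows for {partition} is not allowed")
    return row_count
```

Keeping the calendar in version control next to the pipeline code makes every exception reviewable.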

Watermark checks and idempotency

Incremental jobs often rely on watermarks (max timestamp, max id). Monitor:

  • Watermark monotonicity: it should not regress.
  • Advancement delta: it should advance by at least N units (e.g., minutes) per run.
  • Rollover windows: allow catch-up after outages but alert on prolonged stagnation.

Example check:

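# Assumption: get_state returns None when no watermark has been stored yet.
prev = client.get_state(f"{PIPELINE_ID}:last_watermark")  # "2025-01-14"
current = watermark  # "2025-01-15"

# ISO-8601 date strings compare correctly as plain strings.
# Skip the comparison on the first run, when there is no previous value.
if prev is not None and current <= prev:
    client.alert(
        pipeline_id=PIPELINE_ID,
        run_id=run_id,
        severity="critical",
        title="Watermark did not advance",
        message=f"prev={prev}, current={current}"
    )
else:
    client.set_state(f"{PIPELINE_ID}:last_watermark", current)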

Idempotency hint: When reprocessing, allow same watermark with “replay” tag to avoid noise. Tag runs with context={"mode": "backfill"} so Deadpipe can apply different baselines.
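The monotonicity, stagnation, and replay rules can be folded into one pure function that is easy to unit test. This is a sketch under the assumption that watermarks are ISO-8601 strings; the state names it returns are invented for illustration.

```python
from typing import Optional


def classify_watermark(prev: Optional[str], current: str, mode: str = "scheduled") -> str:
    """Classify a run's watermark relative to the last stored one.

    ISO-8601 strings sort chronologically, so plain string comparison works.
    """
    if prev is None:
        return "first_run"
    if current > prev:
        return "advanced"
    if mode == "backfill":
        # Reprocessing an old or identical interval is expected during backfills.
        return "replay"
    return "regressed" if current < prev else "stalled"
```

A caller would typically alert on "regressed" and "stalled", store the new value on "first_run" and "advanced", and do nothing on "replay".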

Emitting structured context

Every heartbeat and completion should carry context so alerts are actionable:

  • Source identifiers (API name, table name, topic)
  • Destination identifiers (schema.table, storage path)
  • Orchestrator metadata (Airflow DAG/task, dbt model name/selector)
  • Deployment version (git SHA, container image)
  • Runbook URL

Example:

client.heartbeat(
    pipeline_id=PIPELINE_ID,
    run_id=run_id,
    partition=partition,
    schedule="daily",
    message="Starting load",
    context={
        "source": "shopify_orders_api",
        "destination": "analytics.orders_staging",
        "git_sha": os.getenv("GIT_SHA"),
        "runbook": "https://runbooks.company.com/orders_daily_ingest"
    }
)

Main Content Section 2: Defining SLOs and Alerting That Matters

Monitoring without goals generates noise. Service Level Objectives (SLOs) define acceptable performance for your pipelines. Deadpipe helps you track Service Level Indicators (SLIs) and compute error budgets.

Core SLIs for pipelines

  • Freshness SLI: fraction of time freshness_lag_minutes < threshold.
  • Completeness SLI: fraction of runs where row_count within baseline band.
  • Liveness SLI: fraction of expected schedule slots with heartbeat+completion.
  • Quality SLI: fraction of runs where critical assertions pass.

Define them per pipeline and aggregate for domains (e.g., finance, growth).
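To make the definitions concrete, here is a rough sketch that computes the run-based SLIs from a list of run records. The `RunRecord` shape is an assumption made for this example; the time-based freshness SLI is left out because it needs a lag time series rather than per-run flags.

```python
from dataclasses import dataclass


@dataclass
class RunRecord:
    heartbeat: bool       # a start heartbeat was received
    completed: bool       # a completion event was received
    rows_in_band: bool    # row count fell inside the baseline band
    checks_passed: bool   # all critical assertions passed


def _fraction(numerator, denominator):
    # None signals "no data yet" instead of dividing by zero.
    return numerator / denominator if denominator else None


def compute_slis(expected_slots: int, runs: list) -> dict:
    completed = [r for r in runs if r.heartbeat and r.completed]
    return {
        "liveness": _fraction(len(completed), expected_slots),
        "completeness": _fraction(sum(r.rows_in_band for r in completed), len(completed)),
        "quality": _fraction(sum(r.checks_passed for r in completed), len(completed)),
    }


runs = [
    RunRecord(True, True, True, True),
    RunRecord(True, True, False, True),
    RunRecord(True, False, False, False),  # started but never completed
]
slis = compute_slis(expected_slots=4, runs=runs)
```

Note that liveness is measured against expected schedule slots, not against observed runs, so a slot with no heartbeat at all still counts as a miss.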

YAML example (deadpipe.monitors.yaml):

pipelines:
  - id: orders_daily_ingest
    description: Daily orders ingest from Shopify to analytics.orders_staging
    schedule: daily
    owners:
      - team: data-platform
        pagerduty_service: dp-orders
        slack: "#data-alerts"
    slos:
      freshness_minutes_p95: 1440
      completeness_band_pct: 0.2
      liveness_success_rate: 0.99
      quality_pass_rate: 0.995
    alerts:
      channels:
        - type: slack
          target: "#data-alerts"
        - type: pagerduty
          target: "dp-orders"
      rules:
        - name: deadpipe-miss
          condition: "no_heartbeat within schedule_window"
          severity: critical
          message: "No heartbeat for {pipeline_id} {partition}"
        - name: zero-rows
          condition: "rows_written == 0 and not allow_zero"
          severity: critical
          message: "Zero rows for {partition}"
        - name: freshness-breach
          condition: "freshness_lag_minutes > 1440"
          severity: warning
          message: "Freshness lag {freshness_lag_minutes}m exceeds 1440m"

Deadpipe’s UI lets you fine-tune: maintenance windows, holiday calendars, and alert suppression when upstream has a known outage.

Alert routing and deduplication

Actionable alerts:

  • Deduplicate per partition/run: avoid flooding Slack with every check failure.
  • Group related failures: liveness + zero rows + watermark stagnation belong in one incident.
  • Provide runbook and recent change log: link to last commit, deployment timestamp.
  • Include suggested remediation: retries, backfill command.
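Deduplication and grouping come down to choosing a key. The sketch below groups raw alerts by pipeline and partition and keeps the worst severity; the alert dictionaries and field names are assumptions for illustration, not a Deadpipe payload format.

```python
from collections import defaultdict

SEVERITY_RANK = {"info": 0, "warning": 1, "critical": 2}


def group_alerts(alerts: list) -> list:
    """Collapse alerts that share (pipeline_id, partition) into one incident."""
    grouped = defaultdict(list)
    for alert in alerts:
        grouped[(alert["pipeline_id"], alert["partition"])].append(alert)

    incidents = []
    for (pipeline_id, partition), items in grouped.items():
        worst = max(items, key=lambda a: SEVERITY_RANK[a["severity"]])
        incidents.append({
            "pipeline_id": pipeline_id,
            "partition": partition,
            "severity": worst["severity"],
            # A set removes repeated titles; sorting keeps output stable.
            "titles": sorted({a["title"] for a in items}),
        })
    return incidents


alerts = [
    {"pipeline_id": "orders_daily_ingest", "partition": "2025-01-15",
     "severity": "warning", "title": "Completeness anomaly"},
    {"pipeline_id": "orders_daily_ingest", "partition": "2025-01-15",
     "severity": "critical", "title": "Zero rows"},
    {"pipeline_id": "orders_daily_ingest", "partition": "2025-01-15",
     "severity": "critical", "title": "Watermark did not advance"},
    {"pipeline_id": "payments_stream", "partition": "stream",
     "severity": "warning", "title": "Lag rising"},
]
incidents = group_alerts(alerts)
```

Four raw alerts become two incidents here, and the orders incident is escalated to the highest severity among its members.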

Example runbook snippet included in alert:

  • Validate source availability: curl the API health.
  • Check upstream partitions: SELECT COUNT(*) FROM upstream WHERE partition = :p.
  • Manual backfill command:
    • Airflow: airflow dags backfill orders_daily -s 2025-01-15 -e 2025-01-15
    • dbt: dbt run --select orders_staging --vars 'partition: 2025-01-15'

Error budgets for data pipelines

Error budgets apply to data too. If your liveness SLO is 99%, an hourly job (about 720 scheduled runs in a 30-day month) can miss roughly 7 runs per month, or about 7 hours of coverage, before the budget is spent. Track budget burn:

  • Rapid burn: multiple consecutive misses trigger a higher severity.
  • Slow burn: occasional anomalies can be tolerated; focus on impactful pipelines.

Deadpipe can surface budget burn-down charts by SLI over time, helping prioritize engineering work (e.g., improving retry logic vs optimizing completeness baselines).
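The budget arithmetic is simple enough to check by hand. A small sketch, with function names chosen for this example:

```python
def allowed_misses(slo: float, expected_runs: int) -> float:
    """Number of runs that may fail before the SLO is violated."""
    return (1 - slo) * expected_runs


def budget_remaining(slo: float, expected_runs: int, misses: int) -> float:
    """Fraction of the error budget still unspent; negative means overspent."""
    budget = allowed_misses(slo, expected_runs)
    if budget == 0:
        raise ValueError("a 100% SLO leaves no error budget to track")
    return (budget - misses) / budget


# Hourly job over a 30-day month: 24 * 30 = 720 expected runs.
monthly_budget = allowed_misses(0.99, 720)        # about 7.2 runs
after_three = budget_remaining(0.99, 720, 3)      # about 58% of the budget left
```

Tracking the remaining fraction rather than the raw miss count makes pipelines with different schedules comparable on one chart.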

Main Content Section 3: Quality and Semantics Checks

Completeness is not enough. Bad data can flow "successfully" and harm consumers. Add quality checks that gate downstream exposure.

Schema drift detection

Automatically flag added/removed/changed columns. Two approaches:

  • Producer-side schema registry (Avro/Protobuf) with Deadpipe bridge.
  • Consumer-side introspection: snapshot information_schema and diff.

SQL example (BigQuery) to snapshot schema:

CREATE OR REPLACE TABLE monitoring.schemas_orders AS
SELECT column_name, data_type, is_nullable
FROM `project.dataset`.INFORMATION_SCHEMA.COLUMNS
WHERE table_name = 'orders_staging';

Diff and alert:

WITH current AS (
  SELECT column_name, data_type, is_nullable
  FROM `project.dataset`.INFORMATION_SCHEMA.COLUMNS
  WHERE table_name = 'orders_staging'
),
baseline AS (
  SELECT column_name, data_type, is_nullable
  FROM monitoring.schemas_orders
)
SELECT
  'added' AS change_type, c.column_name
FROM current c LEFT JOIN baseline b USING (column_name)
WHERE b.column_name IS NULL
UNION ALL
SELECT
  'removed' AS change_type, b.column_name
FROM baseline b LEFT JOIN current c USING (column_name)
WHERE c.column_name IS NULL
UNION ALL
SELECT
  'type_changed', c.column_name
FROM current c JOIN baseline b USING (column_name)
WHERE c.data_type != b.data_type OR c.is_nullable != b.is_nullable;

Wire this into Deadpipe by posting a "schema_change" event; optionally block deployment if prohibited changes are detected.
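The same diff can be done in application code when the snapshots are already loaded into memory. A minimal sketch, assuming each schema is a mapping from column name to a (data_type, is_nullable) tuple; the sample columns are illustrative.

```python
def diff_schema(baseline: dict, current: dict) -> list:
    """Return (change_type, column_name) pairs, mirroring the SQL diff."""
    changes = []
    for col in sorted(current.keys() - baseline.keys()):
        changes.append(("added", col))
    for col in sorted(baseline.keys() - current.keys()):
        changes.append(("removed", col))
    for col in sorted(baseline.keys() & current.keys()):
        # Covers both data type and nullability changes.
        if baseline[col] != current[col]:
            changes.append(("type_changed", col))
    return changes


baseline = {
    "id": ("INT64", "NO"),
    "amount": ("NUMERIC", "NO"),
    "promo_id": ("STRING", "YES"),
}
current = {
    "id": ("INT64", "NO"),
    "amount": ("FLOAT64", "NO"),
    "discount_code": ("STRING", "YES"),
}
changes = diff_schema(baseline, current)
```

Column order is ignored by construction, since the comparison works on dictionary keys rather than positions.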

Row-level assertions

  • Primary key uniqueness:
    • Assert COUNT(*) == COUNT(DISTINCT id).
  • Nullability:
    • Critical columns must be non-null for new partitions.
  • Referential integrity:
    • Foreign keys must match dictionary/dimension tables.
  • Distribution checks:
    • Value ranges (amount >= 0), categorical domain checks (status IN (...)).
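Each assertion above can be phrased as a query that counts violating rows, where zero means the check passed. The sketch below runs that pattern against an in-memory SQLite database so it is self-contained; the table layout, column names, and status values are assumptions for illustration, and the SQL would need adapting to your warehouse dialect.

```python
import sqlite3

# Each query returns the number of violating rows; 0 means the check passed.
ASSERTIONS = {
    "unique_id": "SELECT COUNT(*) - COUNT(DISTINCT id) FROM orders_staging",
    "customer_id_not_null":
        "SELECT COUNT(*) FROM orders_staging WHERE customer_id IS NULL",
    "customer_exists":
        "SELECT COUNT(*) FROM orders_staging o "
        "LEFT JOIN customers c ON o.customer_id = c.id "
        "WHERE o.customer_id IS NOT NULL AND c.id IS NULL",
    "amount_non_negative": "SELECT COUNT(*) FROM orders_staging WHERE amount < 0",
    "status_in_domain":
        "SELECT COUNT(*) FROM orders_staging "
        "WHERE status NOT IN ('paid', 'refunded', 'pending')",
}


def run_assertions(conn) -> dict:
    return {name: conn.execute(sql).fetchone()[0] for name, sql in ASSERTIONS.items()}


conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE customers (id INTEGER);
CREATE TABLE orders_staging (id INTEGER, customer_id INTEGER, amount REAL, status TEXT);
INSERT INTO customers VALUES (1), (2);
INSERT INTO orders_staging VALUES
  (10, 1, 25.0, 'paid'),
  (10, 2, 12.5, 'paid'),
  (11, NULL, 5.0, 'pending'),
  (12, 3, -1.0, 'unknown');
""")
violations = run_assertions(conn)
failed = sorted(name for name, count in violations.items() if count > 0)
```

The sample data is deliberately dirty: one duplicate id, one null customer, one orphaned foreign key, one negative amount, and one out-of-domain status, so every check reports a violation.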

dbt example test that also reports to Deadpipe:

-- tests/unique_orders_id.sql
SELECT id
FROM {{ ref('orders_staging') }}
GROUP BY id
HAVING COUNT(*) > 1

In dbt’s on-run-end hook:

# dbt_project.yml
on-run-end:
  - "{{ deadpipe_report_results(results) }}"

Macro:

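{% macro deadpipe_report_results(results) %}
{# Assumes a warehouse function deadpipe.alert(pipeline_id, title, severity, message) exists. #}
{% if execute %}
{% for r in results %}
  {# Tests report 'pass' and models report 'success'; alert only on real failures. #}
  {% if r.status in ('fail', 'error') %}
    {{ log("Deadpipe alert for " ~ r.node.name, info=True) }}
    {# run_query takes a single SQL string, so values are inlined with quotes escaped. #}
    {% set title = ("dbt test failed: " ~ r.node.name) | replace("'", "''") %}
    {% set message = (r.message or "") | replace("'", "''") %}
    {% do run_query(
      "select deadpipe.alert('orders_transform', '" ~ title ~ "', 'critical', '" ~ message ~ "')"
    ) %}
  {% endif %}
{% endfor %}
{% endif %}
{% endmacro %}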

Alternatively, use the Deadpipe Python client in a post-run script to collect dbt test results JSON and push structured alerts.

Anomaly detection and baselines

Beyond static thresholds, use Deadpipe’s baselines to learn normal:

  • Seasonality-aware: weekday/weekend patterns.
  • Volume cliffs: e.g., month-end spikes.
  • Source-driven variation: promotions causing demand surges.

Practical tips:

  • Require a warm-up period (e.g., 14–21 days) before enabling high-sensitivity alerts.
  • Combine baselines with minimum floors to catch cataclysmic zeros.
  • Tag backfills to exclude from baseline training windows.
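These tips combine naturally into one evaluation function: check the floor first, skip tagged backfills when building the baseline, and stay quiet until enough history exists. This is a simplified sketch using a median band, not Deadpipe's baseline model; the names and defaults are assumptions.

```python
from statistics import median


def evaluate_volume(history, todays_count, band_pct=0.2, floor=1, min_history=14):
    """history is a list of (row_count, mode) tuples, oldest first."""
    # The floor applies even during warm-up, so a total outage is never missed.
    if todays_count < floor:
        return "below_floor"
    # Backfills are excluded so they do not distort the learned baseline.
    training = [count for count, mode in history if mode != "backfill"]
    if len(training) < min_history:
        return "warming_up"
    mid = median(training)
    if abs(todays_count - mid) > band_pct * mid:
        return "anomaly"
    return "ok"


history = [(1000, "scheduled")] * 14 + [(50000, "backfill")] * 2
```

With this history the baseline median is 1000, so counts from 800 to 1200 pass and anything outside that band is flagged.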

Contract tests with producers

If you own both producer and consumer, implement data contracts:

  • JSON schema for event payloads with semantic versioning.
  • Contracted SLA for delivery times and completeness.
  • Pre-production validation in CI: publish sample events to a staging topic/table and validate with Deadpipe checks.

Main Content Section 4: Integrations and Implementation Patterns

Deadpipe meets you where you work. Here’s how to integrate with popular tools.

Airflow integration

A typical DAG:

import os

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
from deadpipe import DeadpipeClient

client = DeadpipeClient(api_key=os.environ["DEADPIPE_API_KEY"])

default_args = {"retries": 1, "retry_delay": timedelta(minutes=15)}
with DAG(
    dag_id="orders_daily_ingest",
    schedule_interval="0 2 * * *",
    start_date=datetime(2024, 1, 1),
    default_args=default_args,
    catchup=True,
    max_active_runs=1,
) as dag:

    def start(**context):
        ds = context["ds"]
        run_id = f"orders_daily_ingest:{ds}"
        client.heartbeat(pipeline_id="orders_daily_ingest", run_id=run_id, partition=ds, schedule="daily")

    def load(**context):
        ds = context["ds"]
        rows = load_orders_for_partition(ds)
        context["ti"].xcom_push(key="rows", value=rows)

    def validate_and_complete(**context):
        ds = context["ds"]
        run_id = f"orders_daily_ingest:{ds}"
        rows = context["ti"].xcom_pull(task_ids="load", key="rows")
        # Fetch baseline and compute anomalies...
        client.complete(
            pipeline_id="orders_daily_ingest",
            run_id=run_id,
            partition=ds,
            status="success",
            metrics={"rows_written": rows}
        )

    # Airflow 2 passes the context automatically; provide_context is no longer needed.
    start_task = PythonOperator(task_id="start", python_callable=start)
    load_task = PythonOperator(task_id="load", python_callable=load)
    finish_task = PythonOperator(task_id="finish", python_callable=validate_and_complete)

    start_task >> load_task >> finish_task

Use an Airflow DeadpipeSensor to wait for upstream liveness or quality before proceeding, enabling cross-DAG dependencies on data health, not only task success.

dbt integration

Patterns:

  • Inject freshness checks via dbt sources freshness + Deadpipe forwarding.
  • Report dbt test failures with context and severity mapping.
  • Tag critical models with SLOs; block exposures if quality gate fails.

dbt source freshness configuration (dbt sources do not support hooks, so run dbt source freshness as its own step and forward each result to the script below from a wrapper):

sources:
  - name: raw
    tables:
      - name: orders
        loaded_at_field: _ingested_at
        freshness:
          warn_after: {count: 36, period: hour}
          error_after: {count: 72, period: hour}

deadpipe_freshness.py:

import sys, os
from deadpipe import DeadpipeClient
client = DeadpipeClient(api_key=os.environ["DEADPIPE_API_KEY"])

table, status, lag = sys.argv[1], sys.argv[2], int(sys.argv[3])
severity = "warning" if status == "warn" else "critical" if status == "error" else "info"

client.alert(
    pipeline_id="dbt_sources",
    run_id=os.getenv("DBT_INVOCATION_ID"),
    title=f"Freshness {status} for {table}",
    severity=severity,
    message=f"Lag {lag} minutes",
    tags={"table": table}
)
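The `dbt source freshness` command writes its results to `target/sources.json`, which a wrapper can parse to decide what to forward. The sketch below assumes each entry in `results` carries `unique_id`, `status`, and `max_loaded_at_time_ago_in_s`; verify those field names against the artifact your dbt version produces. The helper names are invented for this example.

```python
import json
from pathlib import Path

# Only warn and error results become alerts; passing sources are skipped.
SEVERITY_BY_STATUS = {"warn": "warning", "error": "critical"}


def load_sources(path="target/sources.json") -> dict:
    """Read the freshness artifact written by `dbt source freshness`."""
    return json.loads(Path(path).read_text())


def freshness_alerts(sources: dict) -> list:
    """Turn freshness results into alert-shaped dictionaries."""
    alerts = []
    for result in sources.get("results", []):
        status = result.get("status")
        if status not in SEVERITY_BY_STATUS:
            continue
        lag_seconds = result.get("max_loaded_at_time_ago_in_s")
        alerts.append({
            "table": result.get("unique_id"),
            "status": status,
            "severity": SEVERITY_BY_STATUS[status],
            "lag_minutes": None if lag_seconds is None else int(lag_seconds // 60),
        })
    return alerts


sample = {
    "results": [
        {"unique_id": "source.shop.raw.orders", "status": "warn",
         "max_loaded_at_time_ago_in_s": 140000.0},
        {"unique_id": "source.shop.raw.customers", "status": "pass",
         "max_loaded_at_time_ago_in_s": 600.0},
    ]
}
pending = freshness_alerts(sample)
```

Each returned dictionary maps directly onto the table, status, and lag arguments a forwarding script would need.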

Spark and Databricks

Batch:

from deadpipe import DeadpipeClient
client = DeadpipeClient(api_key=dbutils.secrets.get("monitoring", "deadpipe_api_key"))

partition = dbutils.widgets.get("partition")
run_id = f"orders_spark_batch:{partition}"
client.heartbeat(pipeline_id="orders_spark_batch", run_id=run_id, partition=partition, schedule="daily")

df = spark.read.parquet(f"/mnt/raw/orders/date={partition}")
# Transform...
rows = df.count()
df.write.mode("overwrite").parquet(f"/mnt/stage/orders/date={partition}")

client.complete(
    pipeline_id="orders_spark_batch",
    run_id=run_id,
    partition=partition,
    status="success",
    metrics={"rows_written": rows}
)

Structured Streaming:

from deadpipe import DeadpipeClient

client = DeadpipeClient(api_key=dbutils.secrets.get("monitoring", "deadpipe_api_key"))
PIPELINE_ID = "orders_stream_ss"

def write_and_report(batch_df, batch_id):
    # Cache the micro-batch so the write and the count do not compute it twice.
    batch_df.persist()
    try:
        batch_df.write.mode("append").saveAsTable("analytics.orders_stream")
        count = batch_df.count()
    finally:
        batch_df.unpersist()
    client.heartbeat(
        pipeline_id=PIPELINE_ID,
        run_id="stream",
        schedule="streaming",
        message=f"Batch {batch_id}",
        metrics={"events_in_batch": count}
    )

df = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")  # required; adjust to your brokers
    .option("subscribe", "orders")
    .load()
)
parsed = parse_orders(df)  # your parsing logic

query = (
  parsed.writeStream
    .foreachBatch(write_and_report)
    .outputMode("append")
    .start()
)

# Block the driver until the stream stops or fails.
query.awaitTermination()

Flink can emit metrics to Prometheus; bridge them into Deadpipe via exporters, or emit direct heartbeats from operators. Key metrics: records-in/out, checkpoint time, backpressure, consumer lag. Set Deadpipe alert on “records_out_per_minute == 0 for 5 minutes.”

Cloud warehouses

  • BigQuery: Use INFORMATION_SCHEMA and scheduled queries to run health checks; call Deadpipe through a remote function or a Cloud Function.
  • Snowflake: Tasks and Streams; have a task call a stored procedure that computes metrics and posts them to a Deadpipe REST endpoint through an external function or external network access.
  • Redshift/Postgres: Cron jobs with SQL scripts; a small Python wrapper posts to Deadpipe.

REST example (generic):

curl -X POST https://api.deadpipe.io/v1/heartbeat \
  -H "Authorization: Bearer $DEADPIPE_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "pipeline_id": "orders_daily_ingest",
    "run_id": "orders_daily_ingest:2025-01-15",
    "partition": "2025-01-15",
    "schedule": "daily",
    "context": {"destination": "analytics.orders_staging"}
  }'

Main Content Section 5: Safe Gating and Promotion to “Ready”

Marking data “ready” should depend on checks passing. Implement a readiness flag:

  • Write to staging table.
  • Run checks (row count, schema, constraints).
  • If pass: upsert to production table or flip a view alias.
  • If fail: keep staging isolated; emit alert; do not update production.

Example with an atomic table swap (Snowflake):

-- After loading analytics.orders_staging
-- Run checks and compute a boolean "is_ready"
-- If ready, exchange the two tables in a single atomic operation:
ALTER TABLE analytics.orders SWAP WITH analytics.orders_staging;
-- analytics.orders_staging now holds the previous production data.
-- Keep it for rollback, or truncate it before the next load.

Or better, use a pointer view:

  • View orders_current selects from latest partition with ready = true.
  • Loader sets ready = true only when checks pass.
  • Consumers always query orders_current.

Deadpipe can mark a partition’s readiness status and expose it via API/UI, so other jobs can wait_until_ready.
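The pointer-view pattern can be tried end to end with SQLite. In this sketch the readiness flag lives in a small `partition_status` table and the checks are reduced to "non-empty and no negative amounts"; the table names, columns, and checks are all assumptions for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE orders (partition_date TEXT, id INTEGER, amount REAL);
CREATE TABLE partition_status (partition_date TEXT PRIMARY KEY, ready INTEGER NOT NULL);
-- Consumers read only the newest partition that passed its checks.
CREATE VIEW orders_current AS
  SELECT o.partition_date, o.id, o.amount
  FROM orders o
  WHERE o.partition_date = (
    SELECT MAX(partition_date) FROM partition_status WHERE ready = 1
  );
""")


def load_partition(conn, partition, rows):
    """Load rows, run checks, and set the ready flag only if the checks pass."""
    conn.executemany(
        "INSERT INTO orders VALUES (?, ?, ?)",
        [(partition, order_id, amount) for order_id, amount in rows],
    )
    ready = len(rows) > 0 and all(amount >= 0 for _, amount in rows)
    conn.execute(
        "INSERT OR REPLACE INTO partition_status VALUES (?, ?)",
        (partition, int(ready)),
    )
    return ready


first_ok = load_partition(conn, "2025-01-14", [(1, 10.0), (2, 5.0)])
second_ok = load_partition(conn, "2025-01-15", [(3, -4.0)])  # fails the amount check
visible = conn.execute(
    "SELECT partition_date, id FROM orders_current ORDER BY id"
).fetchall()
```

The failed partition is physically present but invisible to consumers, who keep seeing the last good partition until a corrected load flips the flag.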

Main Content Section 6: Step-by-Step Setup

  1. Pick a critical pipeline.
    • Identify owners, consumers, and current pain points.
    • Document schedule, partitions, and expected volumes.
  2. Enable liveness.
    • Add heartbeat at run start.
    • Add completion with status and metrics.
    • Test locally with a dry run.
  3. Add freshness and completeness.
    • Compute freshness lag and row counts.
    • Set initial static thresholds (e.g., lag < 36h, rows > 0).
    • Train baselines (14–21 days) and enable ±20% band.
  4. Wire alerts.
    • Route to a low-noise channel first (#data-alerts-dev).
    • Configure deduplication and escalation.
  5. Add quality gates.
    • Unique keys, not-null assertions, domain checks.
    • Schema drift detection.
  6. Introduce SLOs.
    • Set initial objectives and track budget.
    • Publish SLOs in documentation/readme.
  7. Expand coverage.
    • Tackle the next highest-impact pipeline.
    • Standardize patterns via templates and libraries.
  8. Build runbooks.
    • For each alert type, write a 5–10 step runbook with commands/queries.
    • Store in version control and link in alerts.
  9. Review and iterate.
    • Monthly postmortems: adjust baselines, thresholds, and processes.
    • Track MTTR/MTTD trends.

Main Content Section 7: Practical Examples and Use Cases

Example 1: Marketing attribution backfill

Problem: Backfill for October ran “successfully” but wrote 0 rows due to an upstream filter on US-only traffic.

Deadpipe setup:

  • Heartbeat for each partition during backfill with context mode=backfill.
  • Completeness baseline disabled; static floor >0 enforced.
  • Alert fired: zero rows detected for 2024-10-15.
  • Runbook: verify source filters; rerun with corrected WHERE clause.

Outcome: Detected within minutes; single-day rerun, avoided missing MRR attribution.

Example 2: Streaming payments stall

Problem: Payments events stopped at 02:14 UTC due to expired API token. Stream job was “running,” checkpoint advanced, but no messages received.

Deadpipe:

  • Heartbeat every minute with events_last_minute metric.
  • Alert fired after 5 minutes of zero events; PagerDuty ping.
  • Suggested remediation: rotate token via internal script.

Outcome: 12-minute outage, no downstream ledger imbalance.

Example 3: Schema drift on orders table

Problem: Upstream added column discount_code and removed promo_id without notice. Downstream transformation failed late.

Deadpipe:

  • Nightly schema snapshot diff detected removal.
  • Alert with change type: removed promo_id; severity=warning.
  • Runbook: update dbt model; add null-handling; patch transformation.

Outcome: Prevented failed release; applied migration before promotion.

Main Content Section 8: Advanced Patterns

Correlating upstream and downstream

  • Emit lineage via tags: upstream_pipeline_ids.
  • Deadpipe auto-suppresses downstream alerts if upstream is red within the same window.
  • Dashboard shows dependency graph with status propagation.
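The suppression rule amounts to walking the lineage graph upward and checking whether any ancestor is already red. A rough sketch of that logic, with the graph and status shapes assumed for illustration (this is not a description of Deadpipe's internals):

```python
def should_suppress(pipeline_id: str, upstreams: dict, status: dict) -> bool:
    """True if any direct or transitive upstream of pipeline_id is red.

    upstreams maps a pipeline to the list of pipelines it reads from;
    status maps a pipeline to "green" or "red".
    """
    seen = set()
    stack = list(upstreams.get(pipeline_id, []))
    while stack:
        upstream = stack.pop()
        if upstream in seen:
            continue  # guards against cycles and repeated paths
        seen.add(upstream)
        if status.get(upstream) == "red":
            return True
        stack.extend(upstreams.get(upstream, []))
    return False


upstreams = {
    "orders_report": ["orders_transform"],
    "orders_transform": ["orders_daily_ingest"],
}
status = {"orders_daily_ingest": "red", "orders_transform": "green"}
```

Here an alert on `orders_report` would be suppressed because its root cause sits two hops upstream, while the alert on `orders_daily_ingest` itself still fires.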

Multi-tenant pipelines

If you process N tenants/customers:

  • Include tenant_id as a tag in metrics and alerts.
  • Configure per-tenant baselines; alert on outliers.
  • Avoid fan-out alert storms by sampling or grouping by severity.

Handling late-arriving data

  • Set allowed lateness per source (e.g., 48 hours).
  • Split freshness SLO into two: nearline SLO for 95%, final SLO for 100% within lateness window.
  • Use watermark + grace to mark partitions provisional; revalidate when late data lands.
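
The watermark-plus-grace rule can be expressed as a small state function. This is a sketch under assumed semantics: the state names "open", "provisional", and "final", and the function `partition_state`, are hypothetical and chosen for this illustration.

```python
from datetime import datetime, timedelta, timezone


def partition_state(partition_end: datetime, watermark: datetime, allowed_lateness: timedelta) -> str:
    """Classify a partition by comparing the source watermark to its end time.

    open:        the watermark has not reached the end of the partition yet
    provisional: the partition is complete but late data may still arrive
    final:       the allowed-lateness window has passed; revalidate once and lock
    """
    if watermark < partition_end:
        return "open"
    if watermark < partition_end + allowed_lateness:
        return "provisional"
    return "final"


# Partition for 2025-01-15 ends at midnight UTC on the 16th; lateness is 48 hours.
end = datetime(2025, 1, 16, tzinfo=timezone.utc)
lateness = timedelta(hours=48)

states = [
    partition_state(end, datetime(2025, 1, 15, 12, tzinfo=timezone.utc), lateness),
    partition_state(end, datetime(2025, 1, 16, 6, tzinfo=timezone.utc), lateness),
    partition_state(end, datetime(2025, 1, 18, tzinfo=timezone.utc), lateness),
]
```

Under this scheme the nearline SLO would be evaluated when a partition turns provisional and the final SLO when it turns final.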

Cost-aware monitoring

  • Sample metrics: for very high-frequency streams, report every Nth batch.
  • Use aggregated metrics (histograms) instead of per-record logs.
  • Push checks down to the warehouse where compute is cheap/fixed.
  • Consolidate alerts; avoid per-tenant PagerDuty pages unless critical.
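
Sampling and aggregation can be combined so that reporting every Nth batch does not lose counts. The class below is a minimal sketch: `SampledReporter` and its `send` callback are hypothetical, and `send` stands in for whatever call actually ships the metric.

```python
from typing import Callable


class SampledReporter:
    """Accumulate per-batch counts and report one aggregate every N batches."""

    def __init__(self, every_n: int, send: Callable[[dict], None]):
        if every_n < 1:
            raise ValueError("every_n must be at least 1")
        self.every_n = every_n
        self.send = send
        self._batches = 0
        self._events = 0

    def record_batch(self, event_count: int) -> None:
        self._batches += 1
        self._events += event_count
        if self._batches == self.every_n:
            # One network call covers N batches.
            self.send({"batches": self._batches, "events": self._events})
            self._batches = 0
            self._events = 0


sent: list[dict] = []
reporter = SampledReporter(every_n=3, send=sent.append)
for count in [5, 0, 7, 2]:
    reporter.record_batch(count)
```

After the loop, one aggregate covering the first three batches has been sent and the fourth batch is still buffered. A real implementation would also flush on shutdown so the tail is not dropped.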

Security and governance

  • Store Deadpipe API keys in secret managers.
  • Do not leak PII in metrics/alerts; use IDs and references.
  • Set RBAC in Deadpipe to control who can mute alerts or change thresholds.
  • Audit logs: monitor who acknowledged, muted, or changed monitors.

Main Content Section 9: Troubleshooting and Common Pitfalls

  • Missing heartbeats due to early failures:
    • Place heartbeat in the earliest executable step.
    • Use try/finally to ensure completion event fires.
  • Duplicate run_ids:
    • Include UUID or orchestrator run_id; ensure idempotent completion updates.
  • Timezone mismatches:
    • Standardize on UTC for partitions and liveness windows.
    • Watch out for DST; schedule by UTC and display local time in UI only.
  • Zero rows expected on holidays:
    • Maintain a calendar; set allow_zero flag for known dates.
  • Baseline hysteresis:
    • Avoid flapping by using percentile bands and minimum alert duration.
  • Schema drift false positives:
    • Ignore benign changes (column ordering); focus on type and nullability changes.
  • Streaming noise:
    • Use moving average of events per minute; require consecutive breaches before alerting.
  • Privilege errors calling Deadpipe from warehouse:
    • Test REST connectivity; open egress to api.deadpipe.io.
  • Alert fatigue:
    • Tag severity correctly; route warnings to Slack, critical to PagerDuty.
  • Backfill suppression:
    • Tag runs with mode=backfill; Deadpipe can mute certain checks or adjust baselines automatically.
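
The streaming-noise advice above (a moving average plus consecutive breaches) can be sketched as follows. This is an illustration with assumed parameters, not Deadpipe's evaluation logic. `ThroughputMonitor` is a hypothetical class.

```python
from collections import deque


class ThroughputMonitor:
    """Alert only when the moving average stays below a minimum for several checks in a row."""

    def __init__(self, window: int, min_avg: float, breaches_required: int):
        self.samples: deque[int] = deque(maxlen=window)
        self.min_avg = min_avg
        self.breaches_required = breaches_required
        self.consecutive = 0

    def observe(self, events_per_minute: int) -> bool:
        """Record one sample and return True if an alert should fire."""
        self.samples.append(events_per_minute)
        average = sum(self.samples) / len(self.samples)
        if average < self.min_avg:
            self.consecutive += 1
        else:
            self.consecutive = 0  # any healthy reading resets the streak
        return self.consecutive >= self.breaches_required


monitor = ThroughputMonitor(window=3, min_avg=10, breaches_required=2)
results = [monitor.observe(n) for n in [30, 30, 0, 0, 0, 0, 30]]
```

With these settings a single quiet minute does not alert: the average has to stay below the minimum for two consecutive observations, and one healthy reading clears the streak.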

Main Content Section 10: Code Templates You Can Reuse

Python utility wrapper

# monitor_utils.py
import os
from contextlib import contextmanager
from deadpipe import DeadpipeClient

client = DeadpipeClient(api_key=os.environ["DEADPIPE_API_KEY"], env=os.getenv("ENV", "prod"))

@contextmanager
def monitored_run(pipeline_id: str, partition: str, schedule: str = "daily", context: dict | None = None):
    # Deterministic run_id: a rerun of the same partition updates the same run.
    run_id = f"{pipeline_id}:{partition}"
    client.heartbeat(pipeline_id=pipeline_id, run_id=run_id, partition=partition, schedule=schedule, context=context or {})
    try:
        yield run_id
    except Exception as e:
        # Report the failure, then re-raise so the orchestrator still sees it.
        client.complete(pipeline_id=pipeline_id, run_id=run_id, partition=partition, status="failure", error=str(e))
        raise
    else:
        # Kept outside the try body so an error while reporting success
        # is not re-reported as a pipeline failure.
        client.complete(pipeline_id=pipeline_id, run_id=run_id, partition=partition, status="success")

Usage:

from monitor_utils import client, monitored_run

with monitored_run("orders_daily_ingest", partition="2025-01-15", context={"destination": "analytics.orders_staging"}) as run_id:
    rows = load_orders_for_partition("2025-01-15")
    client.metric(pipeline_id="orders_daily_ingest", run_id=run_id, name="rows_written", value=rows)

YAML monitor for streaming

pipelines:
  - id: payments_stream
    schedule: streaming
    slos:
      events_per_minute_min: 10
    alerts:
      rules:
        - name: zero-throughput
          condition: "events_last_minute == 0 for 5m"
          severity: critical
          message: "No events in 5 minutes"
        - name: rising-lag
          condition: "lag > 10000 for 10m"
          severity: warning
          message: "Consumer lag above 10k"

SQL quality suite (Snowflake)

-- 1. Uniqueness
SELECT 'dup_keys' AS check_name, COUNT(*) AS violations
FROM (
  SELECT id FROM analytics.orders_staging WHERE partition_date = :partition GROUP BY id HAVING COUNT(*) > 1
)
HAVING violations > 0;

-- 2. Not null
SELECT 'null_amount' AS check_name, COUNT(*) AS violations
FROM analytics.orders_staging
WHERE partition_date = :partition AND amount IS NULL
HAVING violations > 0;

-- 3. Domain values
SELECT 'invalid_status' AS check_name, COUNT(*) AS violations
FROM analytics.orders_staging
WHERE partition_date = :partition AND status NOT IN ('paid', 'refunded', 'pending')
HAVING violations > 0;

Post results to Deadpipe with a small Python script that parses violations and raises alerts as needed.
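
A minimal sketch of such a script is shown below. It assumes each query returns `(check_name, violations)` rows, and it takes the metric-sending function as a parameter so it can be tested without a network call. In practice that parameter could be the `client.metric` call used in the wrapper above. The `violations.<check_name>` metric naming, `report_checks`, and `DataQualityError` are hypothetical.

```python
from typing import Callable, Iterable


class DataQualityError(Exception):
    """Raised when one or more quality checks report violations."""


def report_checks(
    rows: Iterable[tuple[str, int]],
    send_metric: Callable[..., None],
    pipeline_id: str,
    run_id: str,
) -> None:
    """Send one metric per failed check, then fail the run if anything failed.

    rows: (check_name, violations) tuples collected from the SQL suite.
    """
    failed = {name: int(count) for name, count in rows if int(count) > 0}
    for name, count in failed.items():
        send_metric(
            pipeline_id=pipeline_id,
            run_id=run_id,
            name=f"violations.{name}",  # assumed naming scheme
            value=count,
        )
    if failed:
        summary = ", ".join(f"{name}={count}" for name, count in failed.items())
        raise DataQualityError(f"Quality checks failed: {summary}")
```

Raising after the metrics are sent means that, inside a `monitored_run` block, the run is marked as a failure while the violation counts are still recorded.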

Main Content Section 11: Measuring Impact and Continuous Improvement

What gets measured gets improved. Track:

  • MTTD: time from failure occurrence to first alert. Goal: minutes, not hours.
  • MTTR: time from alert to resolution. Goal: shrink via runbooks and automation.
  • False-positive rate: share of alerts that did not require action. Goal: <10%.
  • Coverage: percentage of critical pipelines with Deadpipe instrumentation. Goal: 100%.
  • SLO adherence: % of time within SLOs; compare month over month.
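
The first three metrics can be computed from a simple incident log. The sketch below assumes each incident record carries `failed_at`, `alerted_at`, `resolved_at`, and an `action_required` flag. Those field names and `incident_metrics` are hypothetical, and the second incident is made-up sample data.

```python
from datetime import datetime
from statistics import mean


def incident_metrics(incidents: list[dict]) -> dict:
    """Compute MTTD and MTTR in minutes, plus the false-positive rate."""
    detect = [(i["alerted_at"] - i["failed_at"]).total_seconds() for i in incidents]
    resolve = [(i["resolved_at"] - i["alerted_at"]).total_seconds() for i in incidents]
    false_positives = sum(1 for i in incidents if not i["action_required"])
    return {
        "mttd_minutes": mean(detect) / 60,
        "mttr_minutes": mean(resolve) / 60,
        "false_positive_rate": false_positives / len(incidents),
    }


incidents = [
    {   # Timings modeled on the payments stall in Example 2.
        "failed_at": datetime(2025, 1, 15, 2, 14),
        "alerted_at": datetime(2025, 1, 15, 2, 19),
        "resolved_at": datetime(2025, 1, 15, 2, 26),
        "action_required": True,
    },
    {   # Illustrative alert that needed no action.
        "failed_at": datetime(2025, 1, 15, 9, 0),
        "alerted_at": datetime(2025, 1, 15, 9, 15),
        "resolved_at": datetime(2025, 1, 15, 9, 45),
        "action_required": False,
    },
]
metrics = incident_metrics(incidents)
```

Tracking these per pipeline, not only in aggregate, makes it easier to spot the worst offenders in the quarterly review.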

Run quarterly reviews:

  • Identify top 5 recurring incident types; automate detection and remediation.
  • Invest in pipeline hardening for the worst offenders.
  • Update playbooks and training for on-call rotations.

Main Content Section 12: Frequently Asked Questions

  • How is Deadpipe different from observability tools that scrape logs and metrics?

    • Deadpipe attaches to data semantics (freshness, completeness, quality) and pipeline intent (partitions, watermarks). It complements infra monitoring by focusing on outcomes, not just runtime.
  • We already have dbt tests; why do we need Deadpipe?

    • dbt tests validate models at run time; Deadpipe adds liveness, cross-tool correlation, baselining, alert routing, and SLO tracking across batch and streaming.
  • Will Deadpipe slow down my pipelines?

    • Instrumentation is lightweight. Most checks reuse warehouse compute or streaming metrics. Network calls are minimal and can be batched.
  • How do we handle secrets and compliance?

    • Use your cloud provider’s secret store; avoid sending PII in metrics. Deadpipe supports tags and masked fields; store only metadata and counts.
  • Can Deadpipe trigger auto-remediation?

    • Yes. Hook alerts to orchestrator APIs to retry, pause, or run backfills; or trigger a script that rotates keys or restarts consumers.

Conclusion

Silent failures are the enemy of trust. Liveness without completeness is a false comfort; completeness without quality is risky. Deadpipe monitoring gives you a cohesive, developer-friendly way to instrument pipelines for liveness, freshness, completeness, and quality, with baselines and SLOs that reduce noise and accelerate response.

Start small: one pipeline, one SLO, one alert. Emit heartbeats, measure freshness, assert non-zero rows, and gate readiness. Then add schema drift detection, row-level checks, and lineage-aware suppression. Integrate with your orchestrators, ELT, streaming, and warehouses. Build runbooks, route alerts with context, and watch your MTTD/MTTR fall while stakeholder confidence rises.

When your pipelines are observed for what matters—the data itself—you convert unknown failures into known events, and known events into quick fixes. That is the path to predictable, observable, resilient data flows with Deadpipe monitoring.
