
Overview

The Airflow integration enables OpenSRE to investigate DAG failures and extract execution context directly from an Apache Airflow instance. It supports:
  • DAG run inspection
  • Task instance retrieval
  • Failure detection
  • Evidence collection for RCA generation
This integration is designed for incident-driven workflows, where an alert referencing a DAG triggers an investigation.

Architecture

The Airflow integration participates in the investigation pipeline as follows:
  1. Alert ingestion
  2. Planner selects relevant tools
  3. Airflow API is queried
  4. Evidence is collected
  5. RCA is generated
Alert → Planner → Airflow tools → Evidence → RCA
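The five stages above can be sketched as a minimal pipeline. All names here (Alert, Investigation, plan_tools, run_pipeline) are illustrative, not the actual OpenSRE API; the sketch only shows how an alert flows from planning to RCA:

```python
from dataclasses import dataclass, field

@dataclass
class Alert:
    source: str
    message: str
    metadata: dict

@dataclass
class Investigation:
    alert: Alert
    tools: list = field(default_factory=list)
    evidence: list = field(default_factory=list)
    rca: str = ""

def plan_tools(alert: Alert) -> list:
    # 2. Planner seeds source-specific tools into the action space
    return ["list_dag_runs", "get_task_instances"] if alert.source == "airflow" else []

def run_pipeline(alert: Alert) -> Investigation:
    inv = Investigation(alert=alert)           # 1. alert ingestion
    inv.tools = plan_tools(alert)              # 2. planner selects tools
    for tool in inv.tools:                     # 3-4. query Airflow, collect evidence
        inv.evidence.append(f"{tool}: <result>")
    inv.rca = f"RCA for {alert.metadata.get('dag_id')}"  # 5. RCA is generated
    return inv
```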

Configuration

Required Environment Variables

AIRFLOW_BASE_URL=http://localhost:8080

# Authentication (choose one)

# Basic Auth
AIRFLOW_USERNAME=your_username
AIRFLOW_PASSWORD=your_password

# Token-based (if supported)
AIRFLOW_AUTH_TOKEN=your_token

# Optional
AIRFLOW_TIMEOUT_SECONDS=15
AIRFLOW_VERIFY_SSL=true
AIRFLOW_MAX_RESULTS=50
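A minimal sketch of reading this configuration from the environment. The helper name and the dict shape are assumptions, not the integration's actual loader; the defaults mirror the values shown above:

```python
import os

def load_airflow_config() -> dict:
    """Read Airflow settings from the environment, applying the documented defaults."""
    config = {
        "base_url": os.environ.get("AIRFLOW_BASE_URL", "http://localhost:8080"),
        "timeout_seconds": int(os.environ.get("AIRFLOW_TIMEOUT_SECONDS", "15")),
        "verify_ssl": os.environ.get("AIRFLOW_VERIFY_SSL", "true").lower() == "true",
        "max_results": int(os.environ.get("AIRFLOW_MAX_RESULTS", "50")),
    }
    token = os.environ.get("AIRFLOW_AUTH_TOKEN")
    if token:
        # Token-based auth takes precedence when both are set (an assumption)
        config["auth"] = ("token", token)
    else:
        config["auth"] = ("basic",
                          os.environ.get("AIRFLOW_USERNAME"),
                          os.environ.get("AIRFLOW_PASSWORD"))
    return config
```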

Setup Example

Start Airflow locally:
docker run -p 8080:8080 apache/airflow:2.8.1 standalone
Create a failing DAG:
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def fail_task():
    raise Exception("Intentional failure")

with DAG(
    dag_id="test_fail_dag",
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
) as dag:
    PythonOperator(
        task_id="fail_task",
        python_callable=fail_task,
    )
Trigger the DAG:
airflow dags trigger test_fail_dag

Investigation Flow

Run the investigation CLI:
python -m app.cli investigate
Provide the alert payload:
{
  "source": "airflow",
  "message": "Airflow DAG test_fail_dag failed",
  "metadata": {
    "dag_id": "test_fail_dag"
  }
}

Capabilities

Capability          | Description
--------------------|-----------------------------------------------------
List DAG runs       | Fetch execution history
Get task instances  | Inspect task-level failures
Detect failures     | Identify recent failing runs
RCA support         | Provide structured evidence for root cause analysis
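These capabilities map onto Airflow's stable REST API. A hedged sketch of the failure-detection call using only the standard library: the route and query parameters follow Airflow's documented /api/v1/dags/{dag_id}/dagRuns endpoint, but the function names and token-auth assumption are illustrative, not the integration's actual client:

```python
import json
import urllib.request

def dag_runs_url(base_url: str, dag_id: str, state: str = "failed", limit: int = 50) -> str:
    # Airflow stable REST API route for DAG runs, filtered to a given state
    return (f"{base_url}/api/v1/dags/{dag_id}/dagRuns"
            f"?state={state}&limit={limit}&order_by=-execution_date")

def list_failed_dag_runs(base_url: str, dag_id: str, token: str, timeout: int = 15) -> list:
    """Fetch recent failed runs; assumes token auth is enabled on the instance."""
    req = urllib.request.Request(
        dag_runs_url(base_url, dag_id),
        headers={"Authorization": f"Bearer {token}"},
    )
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return json.load(resp).get("dag_runs", [])
```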

Planner Behavior

When source = airflow, the planner:
  • Prioritizes Airflow-related actions
  • Seeds Airflow tools into the action space
However:
  • Tool selection is LLM-driven
  • Exact ordering may vary between runs
This design avoids hard-coded routing and keeps the system extensible.
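A toy sketch of how source-based seeding can coexist with LLM-driven selection. The tool names and the TOOLS_BY_SOURCE mapping are illustrative assumptions; only the principle (seed first, never force) reflects the behavior described above:

```python
TOOLS_BY_SOURCE = {
    "airflow": ["list_dag_runs", "get_task_instances", "detect_failures"],
    "datadog": ["query_metrics", "list_monitors"],
}

def seed_action_space(alert: dict, generic_tools: list) -> list:
    """Prioritize source-specific tools without hard-coding the final ordering."""
    seeded = TOOLS_BY_SOURCE.get(alert.get("source"), [])
    # Seeded tools come first in the action space, but the LLM planner
    # may still reorder or skip them, which is why ordering can vary.
    return seeded + [t for t in generic_tools if t not in seeded]
```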

Error Handling

  • Per-run failures are isolated — one failing request does not break the loop
  • Network/API errors are handled defensively
  • Partial evidence is preserved whenever possible
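The isolation behavior above can be sketched as a collection loop that catches per-request errors and keeps whatever evidence was already gathered. This is an illustrative sketch, not the actual implementation:

```python
def collect_evidence(run_ids, fetch):
    """Query each run independently; one failing request never aborts the loop."""
    evidence, errors = [], []
    for run_id in run_ids:
        try:
            evidence.append(fetch(run_id))
        except Exception as exc:   # network/API errors handled defensively
            errors.append((run_id, str(exc)))
    return evidence, errors        # partial evidence is preserved
```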

Testing

E2E Tests

python -m pytest tests/e2e/airflow/test_orchestrator.py -v
Expected output:
test_airflow_investigation_e2e PASSED

Routing Tests

python -m pytest tests/nodes/plan_actions/test_airflow_routing.py -v

Limitations

  • Planner routing is probabilistic (LLM-based)
  • Requires a reachable Airflow instance
  • No CI-backed Airflow instance by default (local validation required)

Design Notes

  • Integration follows the same contract as other sources (Datadog, Grafana, etc.)
  • Uses env-based configuration for simplicity
  • Avoids introducing hard overrides in planning logic
  • Focuses on evidence-driven investigation, not static rules

Future Work

  • Stronger tool routing guarantees
  • CI-backed disposable Airflow instance for e2e tests
  • Deeper DAG dependency analysis
  • Richer RCA explanations