ML Pipeline Automation

Automate ML workflows with Airflow, Kubeflow, and MLflow. Use for reproducible pipelines, retraining schedules, and MLOps, or when encountering task failures, dependency errors, or experiment tracking issues.

automation, ai
By secondsky
17928 · Updated 1 day ago · TypeScript · MIT

Skill Content

# ML Pipeline Automation

Orchestrate end-to-end machine learning workflows from data ingestion to production deployment with production-tested Airflow, Kubeflow, and MLflow patterns.

## When to Use This Skill

Load this skill when:
- **Building ML Pipelines**: Orchestrating data → train → deploy workflows
- **Scheduling Retraining**: Setting up automated model retraining schedules
- **Experiment Tracking**: Tracking experiments, parameters, metrics across runs
- **MLOps Implementation**: Building reproducible, monitored ML infrastructure
- **Workflow Orchestration**: Managing complex multi-step ML workflows
- **Model Registry**: Managing model versions and deployment lifecycle

## Quick Start: ML Pipeline in 5 Steps

```bash
# 1. Install Airflow and MLflow (check for latest versions at time of use)
pip install apache-airflow==3.1.5 mlflow==3.7.0

# Note: These versions are current as of December 2025
# Check PyPI for latest stable releases: https://pypi.org/project/apache-airflow/

# 2. Initialize the Airflow metadata database ("airflow db init" was removed in Airflow 3)
airflow db migrate

# 3. Create DAG file: dags/ml_training_pipeline.py
cat > dags/ml_training_pipeline.py << 'EOF'
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'ml-team',
    'retries': 2,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'ml_training_pipeline',
    default_args=default_args,
    schedule='@daily',  # 'schedule_interval' was removed in Airflow 3
    start_date=datetime(2025, 1, 1)
)

def train_model(**context):
    import mlflow
    import mlflow.sklearn
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.datasets import load_iris
    from sklearn.model_selection import train_test_split

    X, y = load_iris(return_X_y=True)
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

    mlflow.set_tracking_uri('http://localhost:5000')
    mlflow.set_experiment('iris-training')

    with mlflow.start_run():
        model = RandomForestClassifier(n_estimators=100)
        model.fit(X_train, y_train)

        accuracy = model.score(X_test, y_test)
        mlflow.log_metric('accuracy', accuracy)
        mlflow.sklearn.log_model(model, 'model')

train = PythonOperator(
    task_id='train_model',
    python_callable=train_model,
    dag=dag
)
EOF

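# 4. Start the scheduler, DAG processor, and API server
#    (Airflow 3 replaced "airflow webserver" with "airflow api-server")
airflow scheduler &
airflow dag-processor &
airflow api-server --port 8080 &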

# 5. Trigger pipeline
airflow dags trigger ml_training_pipeline

# Access UI: http://localhost:8080
```

**Result**: Working ML pipeline with experiment tracking in under 5 minutes.

## Core Concepts

### Pipeline Stages

1. **Data Collection** → Fetch raw data from sources
2. **Data Validation** → Check schema, quality, distributions
3. **Feature Engineering** → Transform raw data to features
4. **Model Training** → Train with hyperparameter tuning
5. **Model Evaluation** → Validate performance on test set
6. **Model Deployment** → Push to production if metrics pass
7. **Monitoring** → Track drift, performance in production
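
The stage order above can be sketched in plain Python before committing to an orchestrator. This is only an illustration under stated assumptions: the stage functions, the toy majority-label "model", the sample rows, and the 0.9 deployment threshold are all hypothetical, and the monitoring stage is left out because it runs after deployment.

```python
from typing import Callable, Optional

# Hypothetical stages. Each takes and returns a plain dict so the hand-off
# between stages is explicit (in Airflow this would travel via XCom or storage).
def collect(state: dict) -> dict:
    # Toy data: (feature, label) pairs standing in for a real source
    return {**state, "rows": [(1.0, 1), (2.0, 1), (3.0, 1), (4.0, 0)]}

def validate(state: dict) -> dict:
    if not state["rows"]:
        raise ValueError("No rows collected")
    return state

def engineer(state: dict) -> dict:
    # Scale the single feature into [0, 1]
    top = max(x for x, _ in state["rows"])
    return {**state, "features": [(x / top, y) for x, y in state["rows"]]}

def train(state: dict) -> dict:
    # Toy "model": always predict the most common label
    labels = [y for _, y in state["features"]]
    return {**state, "model": max(set(labels), key=labels.count)}

def evaluate(state: dict) -> dict:
    labels = [y for _, y in state["features"]]
    hits = sum(1 for y in labels if y == state["model"])
    return {**state, "accuracy": hits / len(labels)}

def deploy(state: dict, threshold: float = 0.9) -> dict:
    # Deployment gate: only mark as deployed if the metric passes
    return {**state, "deployed": state["accuracy"] >= threshold}

STAGES: list[Callable[[dict], dict]] = [collect, validate, engineer, train, evaluate, deploy]

def run_pipeline(stages: list[Callable[[dict], dict]], state: Optional[dict] = None) -> dict:
    state = dict(state or {})
    for stage in stages:
        state = stage(state)  # any exception stops the run, like a failed task
    return state

result = run_pipeline(STAGES)
```

Keeping each stage a small function of its inputs makes it straightforward to wrap the same functions in orchestrator tasks later.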

### Orchestration Tools Comparison

| Tool | Best For | Strengths |
|------|----------|-----------|
| **Airflow** | General ML workflows | Mature, flexible, Python-native |
| **Kubeflow** | Kubernetes-native ML | Container-based, scalable |
| **MLflow** | Experiment tracking | Model registry, versioning |
| **Prefect** | Modern Python workflows | Dynamic DAGs, native caching |
| **Dagster** | Asset-oriented pipelines | Data-aware, testable |

## Basic Airflow DAG

```python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import logging

logger = logging.getLogger(__name__)

default_args = {
    'owner': 'ml-team',
    'depends_on_past': False,
    'email': ['alerts@example.com'],
    'email_on_failure': True,
    'retries': 2,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'ml_training_pipeline',
    default_args=default_args,
    description='End-to-end ML training pipeline',
    schedule='@daily',  # 'schedule_interval' was removed in Airflow 3
    start_date=datetime(2025, 1, 1),
    catchup=False
)

def validate_data(**context):
    """Validate input data quality."""
    import pandas as pd

    data_path = "/data/raw/latest.csv"
    df = pd.read_csv(data_path)

    # Validation checks
    assert len(df) > 1000, f"Insufficient data: {len(df)} rows"
    assert df.isnull().sum().sum() < len(df) * 0.1, "Too many nulls"

    context['ti'].xcom_push(key='data_path', value=data_path)
    logger.info(f"Data validation passed: {len(df)} rows")

def train_model(**context):
    """Train ML model with MLflow tracking."""
    import mlflow
    import mlflow.sklearn
    import pandas as pd
    from sklearn.ensemble import RandomForestClassifier

    data_path = context['ti'].xcom_pull(key='data_path', task_ids='validate_data')

    mlflow.set_tracking_uri('http://mlflow:5000')
    mlflow.set_experiment('production-training')

    with mlflow.start_run():
        # Assumption: the CSV has a 'target' label column and numeric features
        df = pd.read_csv(data_path)
        X, y = df.drop(columns=['target']), df['target']

        model = RandomForestClassifier(n_estimators=100)
        model.fit(X, y)

        mlflow.log_param('n_estimators', 100)
        mlflow.sklearn.log_model(model, 'model')

validate = PythonOperator(
    task_id='validate_data',
    python_callable=validate_data,
    dag=dag
)

train = PythonOperator(
    task_id='train_model',
    python_callable=train_model,
    dag=dag
)

validate >> train
```

## Known Issues Prevention

### 1. Task Failures Without Alerts
**Problem**: Pipeline fails silently, no one notices until users complain.

**Solution**: Configure email/Slack alerts on failure:
```python
default_args = {
    'email': ['ml-team@example.com'],
    'email_on_failure': True,
    'email_on_retry': False
}

def on_failure_callback(context):
    """Send Slack alert on failure."""
    from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator

    slack_msg = f"""
    :red_circle: Task Failed: {context['task_instance'].task_id}
    DAG: {context['task_instance'].dag_id}
    Execution Date: {context['ds']}
    Error: {context.get('exception')}
    """

    SlackWebhookOperator(
        task_id='slack_alert',
        slack_webhook_conn_id='slack_webhook',
        message=slack_msg
    ).execute(context)

task = PythonOperator(
    task_id='critical_task',
    python_callable=my_function,
    on_failure_callback=on_failure_callback,
    dag=dag
)
```

### 2. Missing XCom Data Between Tasks
**Problem**: Task expects XCom value from previous task, gets None, crashes.

**Solution**: Always validate XCom pulls:
```python
def process_data(**context):
    data_path = context['ti'].xcom_pull(
        key='data_path',
        task_ids='upstream_task'
    )

    if data_path is None:
        raise ValueError("No data_path from upstream_task - check XCom push")

    # Process data...
```
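
The same check can be factored into a small helper so every task validates its pulls the same way. This is a sketch, not part of Airflow: `require_xcom` and `FakeTaskInstance` are hypothetical names, and the helper only assumes the task instance exposes `xcom_pull(key=..., task_ids=...)` as used above.

```python
from typing import Any

def require_xcom(ti: Any, key: str, task_ids: str) -> Any:
    """Pull an XCom value and fail with a descriptive error if it is missing."""
    value = ti.xcom_pull(key=key, task_ids=task_ids)
    if value is None:
        raise ValueError(
            f"No XCom '{key}' from task '{task_ids}' - check the upstream xcom_push"
        )
    return value

class FakeTaskInstance:
    """Stand-in for a task instance, for local tests only (hypothetical)."""

    def __init__(self, store: dict):
        self._store = store

    def xcom_pull(self, key: str, task_ids: str) -> Any:
        # Returns None when nothing was pushed, like a missing XCom
        return self._store.get((task_ids, key))
```

Inside a task, `data_path = require_xcom(context['ti'], 'data_path', 'upstream_task')` would replace the pull-then-check pair.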

### 3. DAG Not Appearing in UI
**Problem**: DAG file exists in `dags/` but doesn't show in Airflow UI.

**Solution**: Check DAG parsing errors:
```bash
# Check for syntax errors
python dags/my_dag.py

# List DAG import errors from the CLI
airflow dags list-import-errors

# The UI also surfaces import errors (the exact location varies by Airflow version)

# Common fixes:
# 1. Ensure DAG object is defined in file
# 2. Check for circular imports
# 3. Verify all dependencies installed
# 4. Fix syntax errors
```
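
To run the syntax check across a whole DAG folder instead of one file at a time, a standard-library sketch like the following may help. The function name `find_syntax_errors` is hypothetical, and it only catches syntax errors: missing dependencies and circular imports surface only when the file is actually imported.

```python
import ast
from pathlib import Path

def find_syntax_errors(dags_dir: str) -> dict[str, str]:
    """Parse every .py file under dags_dir and report files that fail to parse."""
    errors: dict[str, str] = {}
    for path in sorted(Path(dags_dir).rglob("*.py")):
        try:
            # ast.parse checks syntax without executing the file
            ast.parse(path.read_text(encoding="utf-8"), filename=str(path))
        except SyntaxError as exc:
            errors[str(path)] = f"line {exc.lineno}: {exc.msg}"
    return errors

if __name__ == "__main__":
    for file, problem in find_syntax_errors("dags").items():
        print(f"{file}: {problem}")
```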

### 4. Hardcoded Paths Break in Production
**Problem**: Paths like `/Users/myname/data/` work locally, fail in production.

**Solution**: Use Airflow Variables or environment variables:
```python
from airflow.models import Variable

def load_data(**context):
    # ❌ Bad: Hardcoded path
    # data_path = "/Users/myname/data/train.csv"

    # ✅ Good: Use Airflow Variable
    data_dir = Variable.get("data_directory", "/data")
    data_path = f"{data_dir}/train.csv"

    # Or use environment variable
    import os
    data_path = os.getenv("DATA_PATH", "/data/train.csv")
```

### 5. Stuck Tasks Consume Resources
**Problem**: Task hangs indefinitely, blocks worker slot, wastes resources.

**Solution**: Set execution_timeout on tasks:
```python
from datetime import timedelta

task = PythonOperator(
    task_id='long_running_task',
    python_callable=my_function,
    execution_timeout=timedelta(hours=2),  # Kill after 2 hours
    dag=dag
)
```

### 6. No Data Validation = Bad Model Training
**Problem**: Train on corrupted/incomplete data, model performs poorly in production.

**Solution**: Add data quality validation tasks:
```python
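import logging
import os

logger = logging.getLogger(__name__)

def validate_data_quality(**context):
    """Comprehensive data validation."""
    import pandas as pd

    # Path comes from the environment (see issue 4), with a default fallback
    data_path = os.getenv("DATA_PATH", "/data/train.csv")
    df = pd.read_csv(data_path)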

    # Schema validation
    required_cols = ['user_id', 'timestamp', 'feature_a', 'target']
    missing_cols = set(required_cols) - set(df.columns)
    if missing_cols:
        raise ValueError(f"Missing columns: {missing_cols}")

    # Statistical validation
    if df['target'].isnull().sum() > 0:
        raise ValueError("Target column contains nulls")

    if len(df) < 1000:
        raise ValueError(f"Insufficient data: {len(df)} rows")

    logger.info("✅ Data quality validation passed")
```
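
A variation worth considering is to collect every failed check in one pass rather than stopping at the first one, so a single failed run reports all problems. The sketch below uses only the standard library so it can be tested without pandas; `collect_validation_errors` and its parameters are hypothetical names, and the checks mirror the ones above.

```python
import csv
import io

def collect_validation_errors(
    csv_text: str,
    required_cols: list[str],
    min_rows: int = 1000,
    target_col: str = "target",
) -> list[str]:
    """Return a list of human-readable problems; an empty list means valid."""
    reader = csv.DictReader(io.StringIO(csv_text))
    rows = list(reader)
    columns = set(reader.fieldnames or [])
    errors: list[str] = []

    # Schema validation
    missing = sorted(set(required_cols) - columns)
    if missing:
        errors.append(f"Missing columns: {missing}")

    # Target must be present on every row (empty string or short row counts as null)
    if target_col in columns and any(not (r[target_col] or "").strip() for r in rows):
        errors.append("Target column contains empty values")

    # Volume check
    if len(rows) < min_rows:
        errors.append(f"Insufficient data: {len(rows)} rows")

    return errors
```

A task could then raise a single `ValueError` that joins the returned messages when the list is non-empty.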

### 7. Untracked Experiments = Lost Knowledge
**Problem**: Can't reproduce results, don't know which hyperparameters worked.

**Solution**: Use MLflow for all experiments:
```python
import mlflow

mlflow.set_tracking_uri('http://mlflow:5000')
mlflow.set_experiment('model-experiments')

with mlflow.start_run(run_name='rf_v1'):
    # Log ALL hyperparameters
    mlflow.log_params({
        'model_type': 'random_forest',
        'n_estimators': 100,
        'max_depth': 10,
        'random_state': 42
    })

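    # Log ALL metrics (placeholder values; log the ones you actually compute)
    mlflow.log_metrics({
        'train_accuracy': 0.95,
        'test_accuracy': 0.87,
        'f1_score': 0.89
    })

    # Log model (assumes `model` is an already-fitted scikit-learn estimator)
    mlflow.sklearn.log_model(model, 'model')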
```

## When to Load References

Load reference files for detailed production implementations:

- **Airflow DAG Patterns**: Load `references/airflow-patterns.md` when building complex DAGs with error handling, dynamic generation, sensors, task groups, or retry logic. Contains complete production DAG examples.

- **Kubeflow & MLflow Integration**: Load `references/kubeflow-mlflow.md` when using Kubeflow Pipelines for container-native orchestration, integrating MLflow tracking, building KFP components, or managing model registry.

- **Pipeline Monitoring**: Load `references/pipeline-monitoring.md` when implementing data quality checks, drift detection, alert configuration, or pipeline health monitoring with Prometheus.

## Best Practices

1. **Idempotent Tasks**: Tasks should produce same result when re-run
2. **Atomic Operations**: Each task does one thing well
3. **Version Everything**: Data, code, models, dependencies
4. **Comprehensive Logging**: Log all important events with context
5. **Error Handling**: Fail fast with clear error messages
6. **Monitoring**: Track pipeline health, data quality, model drift
7. **Testing**: Test tasks independently before integrating
8. **Documentation**: Document DAG purpose, task dependencies
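
As an illustration of the first practice, an idempotent task can key its output on the run's logical date and replace the file atomically, so a retry or backfill yields the same result as the first run. This is a sketch under assumptions: `write_daily_features`, the JSON format, and the file naming are hypothetical choices, not something the skill prescribes.

```python
import json
import os
import tempfile
from pathlib import Path

def write_daily_features(output_dir: str, ds: str, rows: list[dict]) -> Path:
    """Write features for one logical date; re-running overwrites the same file."""
    target = Path(output_dir) / f"features_{ds}.json"
    target.parent.mkdir(parents=True, exist_ok=True)

    # Write to a temp file in the same directory, then rename over the target,
    # so a crashed run never leaves a half-written output behind.
    fd, tmp_name = tempfile.mkstemp(dir=target.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as fh:
            json.dump(rows, fh, sort_keys=True)
        os.replace(tmp_name, target)
    except BaseException:
        if os.path.exists(tmp_name):
            os.unlink(tmp_name)
        raise
    return target
```

Because the output path depends only on `ds`, running the task twice for the same date leaves exactly one file with identical content.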

## Common Patterns

### Conditional Execution
```python
from airflow.operators.python import BranchPythonOperator

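def choose_branch(**context):
    accuracy = context['ti'].xcom_pull(key='accuracy', task_ids='evaluate')

    # Guard against a missing XCom: comparing None to a float raises TypeError
    if accuracy is None:
        raise ValueError("No accuracy from evaluate task - check XCom push")

    # Return values must match the task_id of a downstream task
    if accuracy > 0.9:
        return 'deploy_to_production'
    else:
        return 'retrain_with_more_data'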

branch = BranchPythonOperator(
    task_id='check_accuracy',
    python_callable=choose_branch,
    dag=dag
)

train >> evaluate >> branch >> [deploy, retrain]
```

### Parallel Training
```python
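from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup

# train_rf_fn, train_lr_fn, and train_xgb_fn are hypothetical training callables
with TaskGroup('train_models', dag=dag) as train_group:
    train_rf = PythonOperator(task_id='train_rf', python_callable=train_rf_fn, dag=dag)
    train_lr = PythonOperator(task_id='train_lr', python_callable=train_lr_fn, dag=dag)
    train_xgb = PythonOperator(task_id='train_xgb', python_callable=train_xgb_fn, dag=dag)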

# All models train in parallel
preprocess >> train_group >> select_best
```

### Waiting for Data
```python
from airflow.sensors.filesystem import FileSensor

wait_for_data = FileSensor(
    task_id='wait_for_data',
    filepath='/data/input/{{ ds }}.csv',
    poke_interval=60,  # Check every 60 seconds
    timeout=3600,  # Timeout after 1 hour
    mode='reschedule',  # Don't block worker
    dag=dag
)

wait_for_data >> process_data
```
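
To make the roles of `poke_interval` and `timeout` concrete, the polling loop can be sketched in plain Python. This is a simplification for illustration, not how Airflow implements sensors: `wait_for_file` is a hypothetical function, it returns `False` on timeout where a real sensor fails the task, and it blocks the caller the whole time, which is what `mode='reschedule'` avoids.

```python
import time
from pathlib import Path
from typing import Callable

def wait_for_file(
    path: str,
    poke_interval: float = 60.0,
    timeout: float = 3600.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> bool:
    """Check for the file every poke_interval seconds until timeout elapses."""
    deadline = clock() + timeout
    while True:
        if Path(path).exists():
            return True          # condition met, downstream work can start
        if clock() >= deadline:
            return False         # gave up after roughly `timeout` seconds
        sleep(poke_interval)     # wait before the next check ("poke")
```

The `sleep` and `clock` parameters exist only so the loop can be tested without real waiting.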

How to use

  1. Copy the skill content above
  2. Create a .claude/skills directory in your project
  3. Save as .claude/skills/claude-skills-ml-pipeline-automation.md
  4. Use /claude-skills-ml-pipeline-automation in Claude Code to invoke this skill

Claude Code Skills Collection

170 production-ready skills for Claude Code CLI

Version 3.3.1 | Last Updated: 2026-05-14

<div align="center">

🔌 Platform Support

This repository uses Claude Plugin Patterns, natively supported by:

| Platform | Status | Notes |
|----------|--------|-------|
| Claude Code | Native | Full marketplace support |
| Factory Droid | Native | Full marketplace support |

</div>

**For all other platforms, such as opencode and codex, you can use https://github.com/enulus/OpenPackage**

A curated collection of battle-tested skills for building modern web applications with Cloudflare, AI integrations, React, Tailwind, and more.

PS: If skills.sh warns about any skill, note that its scan process relies on an outdated LLM that flags the newest version pins (as in Zod) as nonexistent and therefore potentially malicious.


Quick Start

Marketplace Installation (Recommended)

# Add the marketplace
/plugin marketplace add https://github.com/secondsky/claude-skills

# Install individual skills as needed
/plugin install cloudflare-d1@claude-skills
/plugin install tailwind-v4-shadcn@claude-skills
/plugin install ai-sdk-core@claude-skills

See MARKETPLACE.md for complete catalog of all 170 skills.

Bulk Installation (Contributors)

# Clone the repository
git clone https://github.com/secondsky/claude-skills.git
cd claude-skills

# Install all 170 skills at once
./scripts/install-all.sh

# Or install individual skills
./scripts/install-skill.sh cloudflare-d1

Repository Structure

This repository contains 170 production-tested skills for Claude Code, each focused on a specific technology or capability.

Individual Skills: Each skill is a standalone unit with:

  • SKILL.md - Core knowledge and guidance
  • Templates - Working code examples
  • References - Extended documentation
  • Scripts - Helper utilities

Installation Options:

  1. Individual - Install only the skills you need via marketplace
  2. Bulk - Install all 170 skills using ./scripts/install-all.sh

Available Skills (170 Individual Skills)

Each skill is individually installable. Install only the skills you need.

Full Catalog: See MARKETPLACE.md for detailed listings.

Categories

| Category | Skills | Examples |
|----------|--------|----------|
| tooling | 29 | turborepo, plan-interview, code-review |
| frontend | 26 | nuxt-v4, nuxt-v5, tailwind-v4-shadcn, tanstack-query, nuxt-studio, maz-ui, threejs |
| cloudflare | 21 | cloudflare-d1, cloudflare-workers-ai, cloudflare-agents |
| ai | 20 | openai-agents, claude-api, ai-sdk-core |
| api | 16 | api-design-principles, graphql-implementation |
| web | 10 | hono-routing, firecrawl-scraper, web-performance |
| mobile | 7 | swift-best-practices, react-native-app, react-native-skills |
| database | 6 | drizzle-orm-d1, neon-vercel-postgres, supabase-postgres-best-practices |
| security | 6 | csrf-protection, access-control-rbac |
| auth | 4 | better-auth |
| testing | 4 | vitest-testing, playwright-testing |
| design | 4 | design-review, design-system-creation |
| woocommerce | 4 | woocommerce-backend-dev |
| cms | 4 | hugo, sveltia-cms, wordpress-plugin-core |
| architecture | 3 | microservices-patterns, architecture-patterns |
| data | 3 | sql-query-optimization, recommendation-engine |
| seo | 2 | seo-optimizer, seo-keyword-cluster-builder |
| documentation | 1 | technical-specification |

How It Works

Auto-Discovery

Claude Code automatically checks ~/.claude/skills/ for relevant skills before planning tasks:

User: "Set up a Cloudflare Worker with D1 database"
           ↓
Claude: [Checks skills automatically]
           ↓
Claude: "Found cloudflare-d1 skills.
         These prevent 12 documented errors. Use them?"
           ↓
User: "Yes"
           ↓
Result: Production-ready setup, zero errors, ~65% token savings

Note: Due to token limits, not all skills may be visible at once. See ⚠️ Important: Token Limits below.

Skill Structure

Each skill includes:

skills/[skill-name]/
├── SKILL.md              # Complete documentation
├── .claude-plugin/
│   └── plugin.json       # Plugin metadata
├── templates/            # Ready-to-copy templates
├── scripts/              # Automation scripts
└── references/           # Extended documentation

Recent Additions

May 2026

Supply Chain Security (cross-cutting):

  • dependency-upgrade expanded with Socket CLI integration — proactive malicious package detection, typosquatting alerts, and CI/CD security gates. New 418-line reference guide, 2 GitHub Actions templates, and expanded supply chain security comparison (3 tools)
  • 31 skills now include "Secure Installation" guidance — contextually-tailored security sections across all high-risk skill categories (scaffolding, MCP/agent SDKs, multi-provider installs, Docker, CI/CD). Covers 8 Bun skills, 5 Nuxt skills, 6 Cloudflare skills, 4 AI/agent skills, and 8 frontend/tooling skills
  • Supply chain security is now a first-class cross-cutting concern woven into the skill collection — not a standalone topic

February - April 2026

Full-Stack Frameworks:

  • nuxt-v5 (v1.0.0) - Full Nuxt 5 support with 4 skills (core, data, server, production), 3 diagnostic agents, and interactive setup wizard
  • supabase-postgres-best-practices - 30 Postgres optimization rules from Supabase across 8 categories
  • threejs (v1.0.0) - 3D web graphics: scenes, geometries, shaders, animations, post-processing

Infrastructure:

  • JSON schema validation - Automated plugin.json validation with CI support
  • GitHub issue templates - Skill-specific issue templates for bug reports, feature requests, and submissions

Plugin Enhancements:

  • mutation-testing - Added Bun native runner support
  • dependency-upgrade - Added supply chain security content

December 2025 - January 2026

Frontend Expansion:

  • nuxt-studio (v1.0.0) - Visual CMS for Nuxt Content with live preview, OAuth auth, and R2 storage integration
  • maz-ui (v1.0.0) - 50+ Vue/Nuxt components with theming, i18n, form generation, and 14 composables

Developer Workflow:

  • plan-interview (v2.0.0) - Adaptive interview-driven spec generation with autonomous quality review
  • turborepo (v2.8.0) - Updated to official Vercel skill with enhanced monorepo build optimization

Mobile Development:

  • react-native-skills (v1.0.0) - React Native & Expo best practices with performance optimization patterns

Enhanced Authentication:

  • better-auth (v2.2.0) - Expanded to 18 framework integrations with 30+ authentication plugins

⚠️ Important: Token Limits

Skill Visibility Constraint

Claude Code has a 15,000 character limit for the total size of skill descriptions in the system prompt. This limit also applies to commands and agents.

What this means:

  • Not all 170 skills may be visible in Claude's context at once
  • Skills are loaded based on relevance and available token budget
  • You can verify how many skills Claude currently sees by asking: "How many skills do you see in your system prompt?"

Checking Visible Skills

To verify which skills are currently loaded:

# Ask Claude Code directly
"Check what skills/plugins you see in your system prompt"

Claude will report something like: "85 of 170 skills visible due to token limits"

Workaround: Increase Token Budget

You can double the headroom for skill descriptions by setting an environment variable:

# Increase limit to 30,000 characters
export SLASH_COMMAND_TOOL_CHAR_BUDGET=30000

# Then launch Claude Code
claude

This gives you approximately 2x more skill visibility in the system prompt.

Note: This is a temporary workaround. The Claude Code team is working on better solutions for skill discovery and loading.


Token Efficiency

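| Metric | Manual Setup | With Skills | Savings |
|--------|--------------|-------------|---------|
| Average Tokens | 12,000-15,000 | 4,000-5,000 | ~65% |
| Typical Errors | 2-4 per service | 0 (prevented) | 100% |
| Setup Time | 2-4 hours | 15-45 minutes | ~80% |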

Across all 170 skills: 400+ documented errors prevented.


Contributing

Prerequisites for Contributors

Install the official plugin development toolkit:

/plugin install plugin-dev@claude-code-marketplace

This provides:

  • /plugin-dev:create-plugin command (8-phase guided workflow)
  • 7 comprehensive skills (hooks, MCP, structure, agents, commands, skills)
  • 2 specialized agents (agent-creator, plugin-validator)

Quick Steps

  1. Create skill directory in plugins/
  2. Add SKILL.md with YAML frontmatter
  3. Run ./scripts/sync-plugins.sh
  4. Submit pull request

See CONTRIBUTING.md and PLUGIN_DEV_BEST_PRACTICES.md for detailed guidelines.


Documentation

| Document | Purpose |
|----------|---------|
| START_HERE.md | Start here! Quick navigation guide |
| PLUGIN_DEV_BEST_PRACTICES.md | Repository-specific best practices (marketplace, budget, quality) |
| MARKETPLACE.md | Full skill catalog and installation guide |
| MARKETPLACE_MANAGEMENT.md | Technical infrastructure (plugin.json, scripts, validation) |
| CLAUDE.md | Project context and development standards |
| CONTRIBUTING.md | Contribution guidelines |


Built with ❤️ by Claude Skills Maintainers