Введение в DataPipe stable

DataPipe — это фреймворк для построения надёжных ETL-пайплайнов на Python. Он предоставляет декларативный API для описания потоков данных, встроенную обработку ошибок и масштабирование от одного процесса до распределённого кластера.

Примечание: Если вы обновляетесь с версии 1.x, ознакомьтесь с руководством по миграции. API пайплайнов был значительно переработан в v2.

Возможности

Установка

DataPipe требует Python 3.10 или выше. Рекомендуем установку через pip:

# Базовая установка
pip install datapipe-etl

# С поддержкой PostgreSQL и Kafka
pip install datapipe-etl[postgres,kafka]

# Все коннекторы
pip install datapipe-etl[all]

Для проверки установки выполните:

datapipe --version
# datapipe 2.4.1 (python 3.12.2, arrow 15.0.0)

Быстрый старт

Минимальный пайплайн, который читает данные из CSV-файла, фильтрует строки и записывает результат в PostgreSQL:

from datapipe import Pipeline, Source, Sink
from datapipe.transforms import Filter, Rename

# Определяем пайплайн
pipeline = Pipeline("import_orders")

pipeline.source(
    Source.csv("/data/orders.csv", encoding="utf-8")
)

pipeline.transform(
    Filter(lambda row: row["status"] == "completed"),
    Rename({"order_id": "id", "created_at": "date"})
)

pipeline.sink(
    Sink.postgres(
        dsn="postgresql://user:pass@localhost/warehouse",
        table="orders",
        on_conflict="upsert"
    )
)

# Запуск
result = pipeline.run()
print(f"Обработано: {result.processed} строк за {result.elapsed}")

Конфигурация

DataPipe можно настроить через YAML-файл datapipe.yml в корне проекта:

# datapipe.yml
engine:
  workers: 4
  batch_size: 10000
  retry_policy:
    max_attempts: 3
    backoff: exponential

monitoring:
  enabled: true
  prometheus_port: 9090
  export_interval: 15

logging:
  level: info
  format: json

Параметры конфигурации

ПараметрТипПо умолчаниюОписание
engine.workersint2Число воркеров для параллельной обработки
engine.batch_sizeint5000Размер батча при потоковой обработке
engine.retry_policyobjectПолитика повторных попыток при ошибках
monitoring.enabledboolfalseВключить экспорт метрик
monitoring.prometheus_portint9090Порт для Prometheus endpoint
logging.levelstringinfoУровень логирования: debug, info, warning, error
Внимание: При использовании более 8 воркеров убедитесь, что batch_size уменьшен пропорционально, чтобы избежать чрезмерного потребления памяти.

Следующие шаги