ivanfuhr/ingestor
Ingestor
Ingestor is a PHP library for safe, auditable data imports with isolated staging, atomic release, and an extensible pipeline.
Data enters through a source driver, is transformed into mutations by a definition, and is applied to an isolated stage by a persistence driver. Only then is it promoted to production, safely and atomically.
Requires PHP 8.2+ and the PDO extension.
Installation
⚡️ Get started by requiring the package using Composer:
```shell
composer require ivanfuhr/ingestor
```
Quick Start
```php
use Ivanfuhr\Ingestor\Ingestor;
use Ivanfuhr\Ingestor\Driver\Persistence\PostgresDriver;
use Ivanfuhr\Ingestor\Driver\Source\CsvDriver;

// $pdo is an existing PDO connection to your PostgreSQL database
$ingestor = Ingestor::make(
    persistence: new PostgresDriver($pdo),
    source: new CsvDriver(),
);

$import = $ingestor
    ->for(CustomerImport::class)
    ->from('/path/to/customers.csv')
    ->import();

if ($import->hasFailures()) {
    foreach ($import->failures() as $failure) {
        // inspect validation or persistence failures
    }

    $import->rollback();

    return;
}

$import->release();
```
Table of Contents
- Architecture
- Definitions & Schema
- Context
- Validation
- Persistence Failures
- Hooks
- Metrics
- Testing Utilities
- PostgreSQL Driver
- CSV Driver
- Development
- Community
- License
🏗️ Architecture
Ingestor separates four responsibilities:
```text
Source
  ↓
Source Driver
  ↓
Iterable<RowContext>
  ↓
Definition (prepare → validate → map)
  ↓
Dataset (mutations)
  ↓
Persistence Driver
  ↓
Stage (isolated)
  ↓
Release (atomic promotion)
```
| Driver | Responsibility | Implementations |
|---|---|---|
| Source | Turns a source into input rows | CsvDriver |
| Persistence | Creates staging, persists mutations, and releases | PostgresDriver |
Drivers are injected at construction time. The import pipeline never needs to know how data is read or written.
```php
$ingestor = Ingestor::make(
    persistence: new PostgresDriver($pdo),
    source: new CsvDriver(),
);
```
Why: Keeps reading, transformation, and persistence independent — each piece can be swapped or tested in isolation.
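The same separation can be illustrated without the library. The sketch below is an assumption-laden illustration, not Ingestor's code: RowSource, MutationSink, ArraySource, MemorySink and runPipeline() are hypothetical names. The pipeline function only depends on the two interfaces and a mapping callable, so either side can be replaced by an in-memory fake.

```php
<?php

declare(strict_types=1);

// Hypothetical contracts: they illustrate the separation only and are
// NOT Ingestor's real interfaces.
interface RowSource
{
    /** @return iterable<int, array<string, string>> line number => row */
    public function rows(): iterable;
}

interface MutationSink
{
    /** @param array<string, mixed> $values */
    public function persist(string $dataset, array $values): void;
}

// In-memory source: stands in for a CSV reader.
final class ArraySource implements RowSource
{
    /** @param array<int, array<string, string>> $rows */
    public function __construct(private array $rows) {}

    public function rows(): iterable
    {
        yield from $this->rows;
    }
}

// In-memory sink: stands in for a database driver.
final class MemorySink implements MutationSink
{
    /** @var array<string, list<array<string, mixed>>> */
    public array $written = [];

    public function persist(string $dataset, array $values): void
    {
        $this->written[$dataset][] = $values;
    }
}

// The pipeline sees only the contracts plus a mapping callable that returns
// a list of [dataset, values] pairs. Returns the number of mutations written.
function runPipeline(RowSource $source, callable $map, MutationSink $sink): int
{
    $mutations = 0;
    foreach ($source->rows() as $row) {
        foreach ($map($row) as [$dataset, $values]) {
            $sink->persist($dataset, $values);
            $mutations++;
        }
    }

    return $mutations;
}
```

Swapping ArraySource for a file reader, or MemorySink for a database writer, would not require touching runPipeline() or the mapping callable.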
📋 Definitions & Schema
A Definition describes an import. It declares structure via Schema and transforms each row into write intentions via Dataset.
```php
use Ivanfuhr\Ingestor\Contract\Definition;
use Ivanfuhr\Ingestor\Contract\Context;
use Ivanfuhr\Ingestor\Dataset\Dataset;
use Ivanfuhr\Ingestor\Schema\Schema;
use Ivanfuhr\Ingestor\Stage\EmptyStage;
use Ivanfuhr\Ingestor\Stage\PrefilledStage;
use Ivanfuhr\Ingestor\Conflict\UpdateOnConflict;

final class CustomerImport implements Definition
{
    public function schema(): Schema
    {
        return Schema::make()
            ->dataset('customers')
                ->using(PrefilledStage::class)
                ->onConflict(UpdateOnConflict::by('document'))
            ->dataset('addresses')
                ->using(EmptyStage::class);
    }

    public function map(array $row, Context $context): Dataset
    {
        return Dataset::make()
            ->insert('customers', [
                'document' => $row['cpf'],
                'name' => $row['name'],
            ])
            ->insert('addresses', [
                'document' => $row['cpf'],
                'city' => $row['city'],
            ]);
    }
}
```
Stage Strategies
| Strategy | Behavior |
|---|---|
| EmptyStage | Dataset starts empty |
| PrefilledStage | Dataset starts with a copy of existing data (ideal for incremental updates) |
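To make the difference concrete, here is a minimal in-memory sketch of what each strategy means for the stage's contents before any row is applied. It assumes a dataset is an array of rows keyed by its unique column; openStage() and upsert() are hypothetical helpers, not library API, and the real PostgresDriver works with staging tables instead.

```php
<?php

declare(strict_types=1);

/**
 * Initial stage contents for a strategy (hypothetical helper).
 *
 * @param array<string, array<string, mixed>> $production rows keyed by document
 * @return array<string, array<string, mixed>>
 */
function openStage(string $strategy, array $production): array
{
    return match ($strategy) {
        'empty' => [],              // like EmptyStage: start from nothing
        'prefilled' => $production, // like PrefilledStage: PHP copies arrays by value
        default => throw new InvalidArgumentException("Unknown stage strategy: {$strategy}"),
    };
}

/**
 * Apply one upsert to the stage; the production array is never modified.
 *
 * @param array<string, array<string, mixed>> $stage
 * @param array<string, mixed> $row
 * @return array<string, array<string, mixed>>
 */
function upsert(array $stage, string $key, array $row): array
{
    $stage[$key] = $row;

    return $stage;
}
```

With a prefilled stage, updating one customer leaves every other existing row in place; with an empty stage, only the imported rows are present.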
Conflict Strategies
Declared in the Schema and translated by the persistence driver:
```php
UpdateOnConflict::by('document');
IgnoreOnConflict::by('document');
ReplaceOnConflict::by('document');
FailOnConflict::by('document');
```
A Stage is an isolated ingestion environment. Nothing touches production until release() is called.
```text
Import
└── Stage
    ├── customers (staging table)
    └── addresses (staging table)
```
Why: One row can produce zero, one, or many mutations across multiple datasets — without coupling business logic to SQL.
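The "zero, one, or many mutations" idea can be sketched with plain PHP. MutationSet below is a hypothetical stand-in for Dataset (it is not Ingestor's class), and mapCustomer() assumes the cpf/name/city columns from the example above: a row without a cpf yields nothing, a row without a city yields only a customer, and a full row yields a customer and an address.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-in for a Dataset: an ordered list of insert intentions.
final class MutationSet
{
    /** @var list<array{dataset: string, values: array<string, mixed>}> */
    private array $mutations = [];

    public static function make(): self
    {
        return new self();
    }

    /** @param array<string, mixed> $values */
    public function insert(string $dataset, array $values): self
    {
        $this->mutations[] = ['dataset' => $dataset, 'values' => $values];

        return $this;
    }

    /** @return list<array{dataset: string, values: array<string, mixed>}> */
    public function all(): array
    {
        return $this->mutations;
    }
}

// Pure mapping: no SQL, just write intentions for zero, one or two datasets.
function mapCustomer(array $row): MutationSet
{
    $set = MutationSet::make();

    if (($row['cpf'] ?? '') === '') {
        return $set; // zero mutations
    }

    $set->insert('customers', ['document' => $row['cpf'], 'name' => $row['name']]);

    if (($row['city'] ?? '') !== '') {
        $set->insert('addresses', ['document' => $row['cpf'], 'city' => $row['city']]);
    }

    return $set;
}
```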
🗂️ Context
Shared storage available throughout an import. Use it to preload ID maps, caches, and reference data so map() stays pure and fast.
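```php
use Ivanfuhr\Ingestor\Contract\Context;
use Ivanfuhr\Ingestor\Contract\Definition;
use Ivanfuhr\Ingestor\Contract\Preparable;
use Ivanfuhr\Ingestor\Dataset\Dataset;

final class OrderImport implements Definition, Preparable
{
    // schema() omitted for brevity

    // Called before rows are mapped: do the I/O here.
    public function prepare(Context $context): void
    {
        // document => id map; Customer is an Eloquent-style model in this example
        $context->put('customers', Customer::pluck('id', 'document')->all());
    }

    public function map(array $row, Context $context): Dataset
    {
        $customers = $context->get('customers');

        return Dataset::make()->insert('orders', [
            'customer_id' => $customers[$row['document']] ?? null,
            'total' => $row['total'],
        ]);
    }
}
```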
Why: Avoids N+1 queries during import. I/O belongs in prepare(); map() should be a pure Row + Context → Dataset transformation.
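A small sketch of why preloading matters, counting "queries" against a hypothetical CustomerLookup class (not part of Ingestor). Looking up each row's customer costs one query per row; loading the map once in a prepare-like step costs one query in total, and the per-row work becomes a pure array lookup with the same result.

```php
<?php

declare(strict_types=1);

// Hypothetical repository that counts how many "queries" it serves.
final class CustomerLookup
{
    public int $queries = 0;

    /** @param array<string, int> $table document => id */
    public function __construct(private array $table) {}

    public function findId(string $document): ?int
    {
        $this->queries++; // one query per call
        return $this->table[$document] ?? null;
    }

    /** @return array<string, int> */
    public function all(): array
    {
        $this->queries++; // one query for the whole map
        return $this->table;
    }
}

// N+1 style: I/O inside the per-row loop.
function mapPerRow(array $rows, CustomerLookup $lookup): array
{
    $out = [];
    foreach ($rows as $row) {
        $out[] = ['customer_id' => $lookup->findId($row['document']), 'total' => $row['total']];
    }

    return $out;
}

// Preload style: I/O once ("prepare"), then pure lookups ("map").
function mapWithPreload(array $rows, CustomerLookup $lookup): array
{
    $customers = $lookup->all();
    $out = [];
    foreach ($rows as $row) {
        $out[] = ['customer_id' => $customers[$row['document']] ?? null, 'total' => $row['total']];
    }

    return $out;
}
```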
✅ Validation
Row validation is optional and runs before mapping. Implement ValidatesRows on your definition:
```php
use Ivanfuhr\Ingestor\Contract\Context;
use Ivanfuhr\Ingestor\Contract\Definition;
use Ivanfuhr\Ingestor\Contract\ValidatesRows;
use Ivanfuhr\Ingestor\Validation\Failure;

final class CustomerImport implements Definition, ValidatesRows
{
    // schema() and map() as in the Definitions example

    public function validate(array $row, Context $context): iterable
    {
        if (empty($row['document'])) {
            yield Failure::error('document')
                ->message('Document is required.');
        }

        if (empty($row['phone'])) {
            yield Failure::warning('phone')
                ->message('Phone number is empty.');
        }
    }
}
```
| Severity | Behavior |
|---|---|
| ERROR | Row is skipped: it is not mapped or persisted |
| WARNING | Recorded, but the row continues through the pipeline |
Failures are available after import:
```php
$import->failures();
$import->hasFailures();
```
Why: Invalid rows are caught early, before any database writes, with full reporting for audits and reprocessing.
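The severity rule in the table can be sketched without the library. Severity, RowFailure, validateRow() and filterValid() are hypothetical names (Ingestor's own Failure class has its own API): every failure is recorded, but only a row with at least one ERROR is kept out of the mapping step, keyed by its source line number.

```php
<?php

declare(strict_types=1);

enum Severity: string
{
    case Error = 'error';
    case Warning = 'warning';
}

// Hypothetical failure record carrying its source line.
final class RowFailure
{
    public function __construct(
        public readonly Severity $severity,
        public readonly string $field,
        public readonly string $message,
        public readonly int $line,
    ) {}
}

/** @return iterable<RowFailure> */
function validateRow(array $row, int $line): iterable
{
    if (empty($row['document'])) {
        yield new RowFailure(Severity::Error, 'document', 'Document is required.', $line);
    }
    if (empty($row['phone'])) {
        yield new RowFailure(Severity::Warning, 'phone', 'Phone number is empty.', $line);
    }
}

/**
 * Keeps rows with no ERROR-level failure and records every failure.
 *
 * @param array<int, array<string, mixed>> $rows line number => row
 * @return array{0: array<int, array<string, mixed>>, 1: list<RowFailure>}
 */
function filterValid(array $rows): array
{
    $accepted = [];
    $failures = [];

    foreach ($rows as $line => $row) {
        $blocked = false;
        foreach (validateRow($row, $line) as $failure) {
            $failures[] = $failure;
            $blocked = $blocked || $failure->severity === Severity::Error;
        }
        if (!$blocked) {
            $accepted[$line] = $row; // warnings do not stop the row
        }
    }

    return [$accepted, $failures];
}
```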
🚨 Persistence Failures
Database errors (NOT NULL, FOREIGN KEY, UNIQUE, etc.) are exposed through the same Failure mechanism, with additional context:
- line(): original source line number
- dataset(): affected dataset
- data(): row data
- cause(): underlying exception
Failures do not trigger an automatic rollback. You decide between release() and rollback().
```php
$import = $ingestor
    ->for(CustomerImport::class)
    ->from($file)
    ->import();

if ($import->hasFailures()) {
    foreach ($import->failures() as $failure) {
        // dump() comes from symfony/var-dumper; use var_dump() in plain PHP
        dump([
            'line' => $failure->line(),
            'dataset' => $failure->dataset(),
            'message' => $failure->message(),
            'data' => $failure->data(),
        ]);
    }

    $import->rollback();

    return;
}

$import->release();
```
SQL Failure Modes
PostgresDriver supports configurable failure diagnosis:
```php
use Ivanfuhr\Ingestor\Driver\Persistence\SqlFailureMode;

new PostgresDriver($pdo, chunkSize: 500, failureMode: SqlFailureMode::Diagnostic);
```
| Mode | Optimizes for | When a bulk INSERT fails |
|---|---|---|
| Fast | Throughput | Records the failure for the batch as a whole |
| Diagnostic | Traceability | Subdivides the batch to isolate the failing row |
Why: Every mutation inherits its source row context, so persistence errors remain traceable even at scale.
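The difference between the two modes can be sketched as a bisection, assuming a batch insert either succeeds or fails as a whole (as a single multi-row INSERT does). insertFast(), insertDiagnostic() and the $insert callable are hypothetical; in this sketch Fast reports every line of the failed batch, while Diagnostic splits the batch in half recursively until single failing rows remain.

```php
<?php

declare(strict_types=1);

/**
 * Fast: one attempt; on failure, every line in the batch is reported.
 *
 * @param array<int, array<string, mixed>> $batch line number => row
 * @param callable(array<int, array<string, mixed>>): void $insert throws on failure
 * @return list<int> line numbers reported as failed
 */
function insertFast(array $batch, callable $insert): array
{
    try {
        $insert($batch);
        return [];
    } catch (RuntimeException) {
        return array_keys($batch);
    }
}

/**
 * Diagnostic: on failure, bisect the batch to isolate the failing rows.
 *
 * @param array<int, array<string, mixed>> $batch line number => row
 * @param callable(array<int, array<string, mixed>>): void $insert throws on failure
 * @return list<int> line numbers of the rows that fail on their own
 */
function insertDiagnostic(array $batch, callable $insert): array
{
    try {
        $insert($batch);
        return [];
    } catch (RuntimeException) {
        if (count($batch) === 1) {
            return array_keys($batch); // isolated the failing row
        }

        // Split in half, preserving line-number keys, and retry each half.
        $half = intdiv(count($batch), 2);

        return array_merge(
            insertDiagnostic(array_slice($batch, 0, $half, true), $insert),
            insertDiagnostic(array_slice($batch, $half, null, true), $insert),
        );
    }
}
```

Inside a real PostgreSQL transaction a failed statement aborts the transaction, so each retry would have to run under its own SAVEPOINT; the extra round trips are the price Diagnostic pays for traceability.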
🔗 Hooks
High-level lifecycle hooks for auditing, metrics, notifications, and external integrations. They run a fixed number of times regardless of row volume.
```text
beforeImport()
   ↓
prepare()
   ↓
validate() → map() → persist()
   ↓
afterImport()
   ↓
release()
   ↓
beforeRelease() → promote stage → afterRelease()
```
| Interface | When | Typical use |
|---|---|---|
| BeforeImport | Before import starts | Timers, logging, audit trail |
| AfterImport | After all rows are processed, before release | Metrics, reports, notifications |
| BeforeRelease | Immediately before promotion | Final checks, manual approval |
| AfterRelease | After promotion | Cache invalidation, external sync |
BeforeRelease can block publication:
```php
use Ivanfuhr\Ingestor\Exception\CannotRelease;

// Method of a class implementing the BeforeRelease interface
public function beforeRelease(ImportedImport $import): void
{
    if ($import->hasFailures()) {
        throw CannotRelease::because('Import contains unresolved failures.');
    }
}
```
Why: Integrate with the outside world without per-row callbacks that would destroy throughput.
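A hypothetical lifecycle runner makes the ordering and the veto visible. runLifecycle() and ReleaseBlocked are illustrative names, not Ingestor API, and the sketch folds the separate release() call into one function for brevity. Hooks fire a fixed number of times however many rows there are, and an exception from beforeRelease stops promotion.

```php
<?php

declare(strict_types=1);

// Hypothetical exception used by a beforeRelease hook to veto promotion.
final class ReleaseBlocked extends RuntimeException {}

/**
 * @param array<string, callable(): void> $hooks keyed by hook name
 * @param list<array<string, mixed>> $rows
 * @return list<string> event log
 */
function runLifecycle(array $hooks, array $rows): array
{
    $log = [];
    $fire = function (string $name) use ($hooks, &$log): void {
        $log[] = $name; // logged before running, so a vetoing hook still appears
        if (isset($hooks[$name])) {
            $hooks[$name]();
        }
    };

    $fire('beforeImport');
    foreach ($rows as $row) {
        $log[] = 'row'; // per-row work (validate, map, persist): no hooks here
    }
    $fire('afterImport');

    try {
        $fire('beforeRelease');
    } catch (ReleaseBlocked) {
        $log[] = 'blocked'; // stage is not promoted
        return $log;
    }

    $log[] = 'promote';
    $fire('afterRelease');

    return $log;
}
```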
📊 Metrics
Read-only metrics collected during import. They are available whether you release or roll back.
```php
$metrics = $import->metrics();

$metrics->startedAt();
$metrics->finishedAt();
$metrics->duration();
$metrics->rows();         // rows processed
$metrics->importedRows(); // rows imported successfully
$metrics->failedRows();   // rows with failures
$metrics->mutations();    // mutations produced

foreach ($metrics->datasets() as $dataset) {
    $dataset->name();
    $dataset->mutations();
    $dataset->persisted();
    $dataset->failures();
}
```
Failures answer what and why. Metrics answer how much and how long.
Why: Every import becomes observable — performance, throughput, and per-dataset breakdowns without affecting the pipeline.
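A hypothetical accumulator shows how such counters can be collected alongside the pipeline without changing it. ImportMetrics is not the library's class, and the rule "imported = processed minus failed" is an assumption of this sketch. Timing uses hrtime(), PHP's monotonic clock, so duration is unaffected by wall-clock adjustments.

```php
<?php

declare(strict_types=1);

// Hypothetical metrics accumulator (not Ingestor's metrics object).
final class ImportMetrics
{
    private ?float $start = null;
    private ?float $end = null;
    public int $rows = 0;
    public int $failedRows = 0;
    public int $mutations = 0;

    public function start(): void
    {
        $this->start = hrtime(true) / 1e9; // nanoseconds → seconds
    }

    public function finish(): void
    {
        $this->end = hrtime(true) / 1e9;
    }

    // Called once per processed row with the number of mutations it produced.
    public function recordRow(int $mutations, bool $failed): void
    {
        $this->rows++;
        $this->mutations += $mutations;
        if ($failed) {
            $this->failedRows++;
        }
    }

    // Assumption of this sketch: every row that did not fail was imported.
    public function importedRows(): int
    {
        return $this->rows - $this->failedRows;
    }

    public function duration(): float
    {
        if ($this->start === null || $this->end === null) {
            throw new LogicException('Import has not finished.');
        }

        return $this->end - $this->start;
    }
}
```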
🧪 Testing Utilities
Test definitions in isolation — no database, no CSV files, no external infrastructure.
Asserting the Schema
```php
use Ivanfuhr\Ingestor\Ingestor;
use Ivanfuhr\Ingestor\Stage\PrefilledStage;

Ingestor::test(CustomerImport::class)
    ->assertDataset('customers')
    ->assertStage(PrefilledStage::class)
    ->assertUpdateOnConflict('document');
```
Asserting map()
```php
Ingestor::test(CustomerImport::class)
    ->withContext(['customers' => ['12345678901' => 1]])
    ->map(['cpf' => '12345678901', 'name' => 'Ada', 'city' => 'SP'])
    ->assertInserted('customers', [
        'document' => '12345678901',
        'name' => 'Ada',
    ])
    ->assertDatasetCount('addresses', 1);
```
Asserting Validation
```php
Ingestor::test(CustomerImport::class)
    ->map(['document' => null])
    ->assertFailure(field: 'document', message: 'Document is required.')
    ->assertFailureCount(1);
```
Asserting the Full Pipeline
```php
Ingestor::test(CustomerImport::class)
    ->fromRows([
        ['cpf' => '1', 'name' => 'Ada', 'city' => 'SP'],
        ['cpf' => '2', 'name' => 'Bob', 'city' => 'RJ'],
    ])
    ->import()
    ->assertRows(2)
    ->assertImportedRows(2)
    ->assertFailedRows(0)
    ->assertMutations(4);
```
Why: Definitions should be fully testable with fast, deterministic tests — safe to refactor without spinning up infrastructure.
🐘 PostgreSQL Driver
PostgresDriver creates staging tables, inserts data in configurable batches, and atomically promotes staging to production.
```php
use Ivanfuhr\Ingestor\Driver\Persistence\PostgresDriver;
use Ivanfuhr\Ingestor\Driver\Persistence\SqlFailureMode;

$driver = new PostgresDriver(
    pdo: $pdo,
    chunkSize: 500,
    failureMode: SqlFailureMode::Fast,
);
```
The driver introspects production tables to build matching staging tables and applies conflict strategies from the Schema via ON CONFLICT.
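As an illustration of chunked inserts with an ON CONFLICT clause, the sketch below builds one parameterized multi-row INSERT per chunk. buildInserts() and its strategy strings are hypothetical, and the exact SQL the real driver emits is not documented here; only the PostgreSQL syntax (ON CONFLICT ... DO UPDATE / DO NOTHING, EXCLUDED) is standard. Replace is omitted because its translation is not described above. Table and column names are assumed to be trusted identifiers.

```php
<?php

declare(strict_types=1);

/**
 * @param list<array<string, scalar|null>> $rows rows with the same columns in the same order
 * @return list<array{sql: string, params: list<scalar|null>}>
 */
function buildInserts(string $table, array $rows, int $chunkSize, string $strategy, string $key): array
{
    if ($rows === []) {
        return [];
    }

    $columns = array_keys($rows[0]);
    $conflict = match ($strategy) {
        'update' => sprintf(
            ' ON CONFLICT (%s) DO UPDATE SET %s',
            $key,
            implode(', ', array_map(
                fn (string $c): string => "{$c} = EXCLUDED.{$c}",
                array_diff($columns, [$key]),
            )),
        ),
        'ignore' => " ON CONFLICT ({$key}) DO NOTHING",
        'fail' => '', // no clause: a duplicate key raises a unique_violation error
        default => throw new InvalidArgumentException("Unsupported strategy: {$strategy}"),
    };

    $placeholders = '(' . implode(', ', array_fill(0, count($columns), '?')) . ')';
    $statements = [];

    foreach (array_chunk($rows, $chunkSize) as $chunk) {
        $statements[] = [
            'sql' => sprintf(
                'INSERT INTO %s (%s) VALUES %s%s',
                $table,
                implode(', ', $columns),
                implode(', ', array_fill(0, count($chunk), $placeholders)),
                $conflict,
            ),
            // Flatten the chunk's values in row order to match the placeholders.
            'params' => array_merge(...array_map(fn (array $r): array => array_values($r), $chunk)),
        ];
    }

    return $statements;
}
```

Each statement's sql and params could then be passed to PDO::prepare() and PDOStatement::execute().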
Why: Staging + atomic swap gives you a safe rollback window before data ever reaches production.
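One common way to get a staging table and an atomic swap in PostgreSQL is a rename inside a single transaction, which works because PostgreSQL DDL is transactional. The sketch below generates that SQL; StagePlan, applyAtomically() and the _stage/_old suffixes are hypothetical, and the real driver's promotion strategy may differ. It ignores dependent objects (foreign keys, views) that reference the old table, and CREATE TABLE ... LIKE never copies foreign keys.

```php
<?php

declare(strict_types=1);

// Hypothetical stage plan for one table (identifiers assumed trusted).
final class StagePlan
{
    public function __construct(private string $table) {}

    /** @return list<string> */
    public function open(bool $prefilled): array
    {
        $sql = ["CREATE TABLE {$this->table}_stage (LIKE {$this->table} INCLUDING ALL)"];
        if ($prefilled) {
            // PrefilledStage-like behavior: start from a copy of production
            $sql[] = "INSERT INTO {$this->table}_stage SELECT * FROM {$this->table}";
        }

        return $sql;
    }

    /** @return list<string> statements meant to run inside ONE transaction */
    public function release(): array
    {
        return [
            "ALTER TABLE {$this->table} RENAME TO {$this->table}_old",
            "ALTER TABLE {$this->table}_stage RENAME TO {$this->table}",
            "DROP TABLE {$this->table}_old",
        ];
    }

    /** @return list<string> production is never touched */
    public function rollback(): array
    {
        return ["DROP TABLE {$this->table}_stage"];
    }
}

// Runs statements so they commit or roll back as a unit.
function applyAtomically(PDO $pdo, array $statements): void
{
    $pdo->beginTransaction();
    try {
        foreach ($statements as $statement) {
            $pdo->exec($statement);
        }
        $pdo->commit();
    } catch (Throwable $e) {
        $pdo->rollBack();
        throw $e;
    }
}
```

Usage would be along the lines of `applyAtomically($pdo, (new StagePlan('customers'))->release());`. If any rename fails, production keeps its original table.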
📄 CSV Driver
CsvDriver reads CSV files with a header row and yields RowContext objects with line numbers and associative data.
```php
use Ivanfuhr\Ingestor\Driver\Source\CsvDriver;

$ingestor = Ingestor::make($persistence, new CsvDriver());
```
Why: Line numbers flow through the entire pipeline, enabling precise failure reporting back to the source file.
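A header-aware CSV reader that keeps line numbers can be sketched with a generator and fgetcsv(). readCsv() is a hypothetical helper, not CsvDriver's code. It assumes one record per physical line, so quoted fields containing newlines would shift the numbering. All fgetcsv() arguments are passed explicitly, with an empty escape character for RFC 4180-style parsing.

```php
<?php

declare(strict_types=1);

/**
 * @return Generator<int, array<string, string>> source line number => associative row
 */
function readCsv(string $path): Generator
{
    $handle = fopen($path, 'r');
    if ($handle === false) {
        throw new RuntimeException("Cannot open {$path}");
    }

    try {
        $line = 0;
        $header = null;

        while (($fields = fgetcsv($handle, null, ',', '"', '')) !== false) {
            $line++;

            if ($fields === [null]) {
                continue; // fgetcsv() returns [null] for a blank line
            }

            if ($header === null) {
                $header = $fields; // first non-blank line is the header
                continue;
            }

            // array_combine() throws ValueError if the column counts differ
            yield $line => array_combine($header, $fields);
        }
    } finally {
        fclose($handle);
    }
}
```

Because the key of each yielded row is its line in the file, a failure raised anywhere downstream can point back to that exact line.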
🛠️ Development
```shell
composer test       # PHPUnit
composer lint       # PHP-CS-Fixer (check)
composer lint:fix   # PHP-CS-Fixer (fix)
composer phpstan    # Static analysis
composer rector     # Automated refactoring
```
Community
License
Ingestor was created by Ivan Führ under the MIT license.
Other information
- License: MIT
- Last updated: 2026-06-15