whitedigital-eu/etl-bundle
Composer install command:
composer require whitedigital-eu/etl-bundle
Package description
Extract/Transform/Load processing bundle for Symfony 6+
README
Package for running ETL tasks.
- Define custom Extractors, Transformers and Loaders
- Create a pipeline to run a specific data import/export process.
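The extract, transform and load stages can be pictured with a minimal, dependency-free sketch. This is not the bundle's pipeline class: `SketchPipeline`, its constructor arguments and the sample row are hypothetical names chosen only to show how the three stages chain together.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical stand-in for an ETL pipeline; NOT the bundle's own pipeline.
 * Each stage is a plain callable so the sketch has no dependencies.
 */
final class SketchPipeline
{
    /** @var list<callable> Transformers applied to every row, in order. */
    private array $transformers = [];

    public function __construct(
        private readonly \Closure $extractor, // fn(): iterable of array rows
        private readonly \Closure $loader,    // fn(array $rows): int, number of rows loaded
    ) {
    }

    public function addTransformer(callable $transformer): self
    {
        $this->transformers[] = $transformer;

        return $this;
    }

    public function run(): int
    {
        $rows = [];
        foreach (($this->extractor)() as $row) {
            // Pass the row through every transformer before collecting it.
            foreach ($this->transformers as $transform) {
                $row = $transform($row);
            }
            $rows[] = $row;
        }

        return ($this->loader)($rows);
    }
}

// Usage: the "loader" just stores rows in a local variable.
$loaded = [];
$pipeline = new SketchPipeline(
    extractor: static fn (): iterable => [['KODS' => ' 001 ', 'NOSAUK' => 'Acme']],
    loader: static function (array $rows) use (&$loaded): int {
        $loaded = $rows;

        return count($rows);
    },
);

$count = $pipeline
    ->addTransformer(static fn (array $row): array => array_map('trim', $row))
    ->run();
```

In the bundle itself the stages are classes registered on the pipeline by class name, as the task example in this README shows; the callables here only keep the sketch self-contained.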
Tasks can be run:
- from CLI (bin/console etl:run <task_name>)
- from services or controllers
- from the frontend using the Server-Sent Events (SSE) API.
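For readers unfamiliar with SSE, the sketch below shows how a single progress message is framed on the wire according to the Server-Sent Events format. It does not show the bundle's SSE endpoint, which is not documented here; the function name `formatSseEvent`, the event name `progress` and the message text are hypothetical.

```php
<?php

declare(strict_types=1);

/**
 * Formats one Server-Sent Events frame.
 * Multi-line payloads need one "data:" field per line; a blank line ends the frame.
 */
function formatSseEvent(string $data, ?string $event = null): string
{
    $frame = '';
    if (null !== $event) {
        $frame .= 'event: ' . $event . "\n";
    }
    foreach (preg_split('/\r\n|\r|\n/', $data) as $line) {
        $frame .= 'data: ' . $line . "\n";
    }

    return $frame . "\n";
}

// Hypothetical progress message, as a task might report it to the browser.
echo formatSseEvent('Extraction started', 'progress');
```

A browser would receive such frames through `EventSource` over a response served with the `text/event-stream` content type.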
Requirements
- PHP 8.1+
- Symfony 6.2+
Install bundle
composer req "whitedigital-eu/etl-bundle"
Setup task
Example task (HorizonDataExtractor and HorizonCustomerTransformer must be created separately):
<?php

declare(strict_types=1);

namespace App\ETL\Task;

use App\ETL\Extractor\HorizonDataExtractor;
use App\ETL\Transformer\HorizonCustomerTransformer;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Mailer\Exception\TransportExceptionInterface;
use WhiteDigital\EtlBundle\Attribute\AsTask;
use WhiteDigital\EtlBundle\Exception\EtlException;
use WhiteDigital\EtlBundle\Loader\DoctrineDbalLoader;
use WhiteDigital\EtlBundle\Task\AbstractTask;

#[AsTask(name: 'horizon_customer_import')]
class HorizonCustomerImportTask extends AbstractTask
{
    /**
     * @throws EtlException
     * @throws TransportExceptionInterface
     */
    public function runTask(OutputInterface $output, ?array $extractorArgs = null): void
    {
        $this->etlPipeline
            ->setOutput($output)
            // Fall back to the default query path when no extractor arguments are passed.
            ->addExtractor(HorizonDataExtractor::class, $extractorArgs ?? ['path' => '/rest/TDdmNorSar/query?columns=K.KODS,K.NOSAUK&orderby=K.NOSAUK asc'])
            ->addTransformer(HorizonCustomerTransformer::class)
            ->addLoader(DoctrineDbalLoader::class)
            ->run();
    }
}
Example Extractor:
<?php

declare(strict_types=1);

namespace App\ETL\Extractor;

use App\Service\HorizonRestApiService;
use Psr\Cache\InvalidArgumentException;
use Symfony\Contracts\HttpClient\Exception\ClientExceptionInterface;
use Symfony\Contracts\HttpClient\Exception\RedirectionExceptionInterface;
use Symfony\Contracts\HttpClient\Exception\ServerExceptionInterface;
use Symfony\Contracts\HttpClient\Exception\TransportExceptionInterface;
use WhiteDigital\EtlBundle\Exception\ExtractorException;
use WhiteDigital\EtlBundle\Extractor\AbstractExtractor;
use WhiteDigital\EtlBundle\Helper\Queue;

final class HorizonDataExtractor extends AbstractExtractor
{
    public function __construct(
        private readonly HorizonRestApiService $horizonRestApiService,
    ) {
    }

    /**
     * @param \Closure|null $batchProcessor
     * @return Queue<\stdClass>
     * @throws ClientExceptionInterface
     * @throws ExtractorException
     * @throws InvalidArgumentException
     * @throws RedirectionExceptionInterface
     * @throws ServerExceptionInterface
     * @throws TransportExceptionInterface
     * @throws \JsonException
     */
    public function run(?\Closure $batchProcessor = null): Queue
    {
        if (null !== $batchProcessor) {
            throw new ExtractorException(sprintf('Batch mode not supported by %s', __CLASS__));
        }

        $data = new Queue();
        // Latvian: "Data extraction started from source: [%s]"
        $this->output->writeln(sprintf('Datu iegūšana uzsākta no avota: [%s]', $path = $this->getOption('path')));
        $rawJsonData = $this->horizonRestApiService->makeGetRequest($path);

        // An empty (null) response yields no rows instead of a foreach warning.
        foreach ($rawJsonData?->collection->row ?? [] as $row) {
            $data->push($row);
        }

        // Latvian: "Retrieved %s records."
        $this->output->writeln(sprintf('Iegūti %s ieraksti.', count($data)));

        return $data;
    }
}
Example Transformer:
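As a rough illustration of what a transformation step might do with the Horizon rows extracted above, the following standalone sketch maps one row to a flat array. It does not use the bundle's transformer base class, whose API is not shown in this README. The function name `transformCustomerRow`, the output keys `code` and `name`, and the assumption that each row exposes `KODS` and `NOSAUK` as plain properties are all hypothetical; only the two column names come from the example query.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical transformation step: maps one extracted row to a flat array.
 * Assumes the row exposes KODS and NOSAUK as plain scalar properties.
 */
function transformCustomerRow(\stdClass $row): array
{
    return [
        // Missing properties fall back to an empty string.
        'code' => trim((string) ($row->KODS ?? '')),
        'name' => trim((string) ($row->NOSAUK ?? '')),
    ];
}

// Made-up sample row for illustration only.
$row = new \stdClass();
$row->KODS = ' 001 ';
$row->NOSAUK = 'Example customer';

$customer = transformCustomerRow($row);
```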
Example Loader:
Console commands
- Run task by its name:
bin/console etl:run horizon_customer_import
or pass an extra custom argument:
bin/console etl:run horizon_customer_import random_path.txt
- List available tasks:
bin/console etl:list
Other information
- License: MIT
- Last updated: 2022-12-01