bufferCount()

Buffers the stream of data until the buffer reaches a predefined number of items (or the stream is exhausted), then yields an instance of RunOpenCode\Component\Dataset\Model\Buffer for batch processing.

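Memory consumption grows with the buffer size, because up to $count items are held in memory at once. The operator is nevertheless considered memory-safe: memory use is bounded by the buffer size, not by the size of the whole stream.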
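To make these semantics concrete, here is a minimal plain-PHP sketch of count-based buffering written as a generator. It is not the library's implementation: the function name `buffer_count()` is hypothetical, and each batch is a plain list of `[key, value]` pairs rather than a `Buffer` instance. Storing pairs (an assumption of this sketch) keeps duplicate or non-scalar source keys intact, which a PHP array keyed by `TKey` could not do.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical sketch of count-based buffering (not the library's code).
 * Each yielded batch is a list of [key, value] pairs so that duplicate or
 * non-scalar source keys survive.
 *
 * @template TKey
 * @template TValue
 * @param iterable<TKey, TValue> $source
 * @param positive-int $count
 * @return \Generator<int, list<array{0: TKey, 1: TValue}>>
 */
function buffer_count(iterable $source, int $count = 1000): \Generator
{
    if ($count < 1) {
        throw new \InvalidArgumentException('Buffer size must be a positive integer.');
    }

    $buffer = [];

    foreach ($source as $key => $value) {
        $buffer[] = [$key, $value];

        // Buffer is full: hand it to the consumer and start a new one,
        // so the buffer itself never holds more than $count items.
        if (\count($buffer) === $count) {
            yield $buffer;
            $buffer = [];
        }
    }

    // Source exhausted: emit the remaining, partially filled buffer (if any).
    if ($buffer !== []) {
        yield $buffer;
    }
}
```

With a source of five items and `$count = 2`, the generator yields three batches keyed `0`, `1` and `2`; the last one holds the single remaining item.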

class RunOpenCode\Component\Dataset\Operator\BufferCount
__construct(iterable<TKey, TValue> $source, int $count = 1000)
Parameters:
  • $source iterable<TKey, TValue> Stream source to iterate over.

  • $count positive-int Maximum number of items per buffer (default: 1000); the last buffer may hold fewer items if the stream is exhausted first.

getIterator()
Returns:

\Traversable<int, Buffer<TKey, TValue>> Stream of buffers.
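The return type describes two levels of keys: the outer stream is keyed by sequential integers, while each buffer is itself iterable as `TKey => TValue`. The class below is a hypothetical stand-in (`PairBuffer` is not part of the library, and the real `Buffer` API is not described here) showing one way a buffer can offer that: it implements `IteratorAggregate` and `Countable` over a list of pairs, so even duplicate keys survive iteration.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical stand-in for a buffer of TKey => TValue pairs.
 * NOT RunOpenCode\Component\Dataset\Model\Buffer; for illustration only.
 *
 * @template TKey
 * @template TValue
 * @implements \IteratorAggregate<TKey, TValue>
 */
final class PairBuffer implements \IteratorAggregate, \Countable
{
    /** @param list<array{0: TKey, 1: TValue}> $pairs */
    public function __construct(private readonly array $pairs)
    {
    }

    /** @return \Generator<TKey, TValue> */
    public function getIterator(): \Generator
    {
        // Re-emit every pair with its original key; unlike an array,
        // a generator may yield the same key more than once.
        foreach ($this->pairs as [$key, $value]) {
            yield $key => $value;
        }
    }

    public function count(): int
    {
        return \count($this->pairs);
    }
}
```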

Use cases

  • You want to load a large dataset (for example, from a file or a database) and process it in batches (batch import).

Example

Load data from a legacy database and batch-import it as entities into the new system, then notify the rest of the application that each new entity has been created.

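<?php

use App\Model\Entity;
use App\Events\EntityCreated;
use RunOpenCode\Component\Dataset\Stream;
use RunOpenCode\Component\Dataset\Model\Buffer;
use RunOpenCode\Component\Dataset\Collector\ArrayCollector; // Namespace assumed; adjust if it differs.

// $database, $orm and $dispatcher are application services (e.g. a database
// connection, an ORM entity manager and an event dispatcher) assumed to be
// available in this scope.
$dataset = $database->executeQuery('...');

new Stream($dataset)
    // Group rows into buffers of (at most) 100 items.
    ->bufferCount(100)
    // Import one buffer at a time.
    ->map(function (Buffer $buffer) use ($orm) {
        $entities = new Stream($buffer)
            ->map(function (array $row): Entity {
                return Entity::fromLegacy($row);
            })
            ->tap(function (Entity $entity) use ($orm): void {
                $orm->persist($entity);
            })
            ->collect(ArrayCollector::class);

        // One flush per batch, then detach entities to keep memory bounded.
        $orm->flush();
        $orm->clear();

        // The collected entities are unpacked into the outer stream by flatten().
        return $entities;
    })
    ->flatten()
    ->tap(function (Entity $entity) use ($dispatcher): void {
        $dispatcher->notify(new EntityCreated($entity::class, $entity->id));
    })
    // Consume the stream so that the lazy operators above actually run.
    ->flush();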
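The same control flow can be sketched without the library, which makes the per-batch ordering easy to check: rows are buffered, persisted, flushed once per batch, and only then announced. Everything here is hypothetical: `InMemoryStore` stands in for the ORM (`persist()`, `flush()`, `clear()`), `legacy_rows()` for the database cursor, and a plain callable for the event dispatcher.

```php
<?php

declare(strict_types=1);

/** Hypothetical stand-in for an ORM unit of work. */
final class InMemoryStore
{
    /** @var list<array<string, int>> Entities written by flush(). */
    public array $saved = [];
    public int $flushes = 0;

    /** @var list<array<string, int>> Entities persisted but not yet flushed. */
    private array $pending = [];

    public function persist(array $entity): void
    {
        $this->pending[] = $entity;
    }

    public function flush(): void
    {
        // Write all pending entities in one go (one write per batch).
        array_push($this->saved, ...$this->pending);
        $this->pending = [];
        $this->flushes++;
    }

    public function clear(): void
    {
        // A real ORM would detach managed entities here to release memory.
        $this->pending = [];
    }
}

/** Hypothetical legacy source: yields rows lazily, one at a time. */
function legacy_rows(int $total): \Generator
{
    for ($id = 1; $id <= $total; $id++) {
        yield ['legacy_id' => $id];
    }
}

/** Imports one batch: persist every entity, flush once, then notify. */
function import_batch(array $rows, InMemoryStore $store, callable $notify): void
{
    $entities = array_map(static fn (array $row): array => ['id' => $row['legacy_id']], $rows);

    foreach ($entities as $entity) {
        $store->persist($entity);
    }

    $store->flush();
    $store->clear();

    foreach ($entities as $entity) {
        $notify($entity);
    }
}

/** Buffers $rows into batches of $batchSize and imports each batch. */
function batch_import(iterable $rows, InMemoryStore $store, callable $notify, int $batchSize): void
{
    $buffer = [];

    foreach ($rows as $row) {
        $buffer[] = $row;

        if (\count($buffer) === $batchSize) {
            import_batch($buffer, $store, $notify);
            $buffer = [];
        }
    }

    // Import the last, partially filled batch.
    if ($buffer !== []) {
        import_batch($buffer, $store, $notify);
    }
}
```

With seven rows and a batch size of three, the store is flushed three times (3 + 3 + 1 rows), and notifications arrive in source order, each batch's only after its flush.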