bufferWhile()

Buffers stream items for as long as the predicate returns true. When the predicate returns false, the current RunOpenCode\Component\Dataset\Model\Buffer instance is emitted and a new buffer is started with the current item. When the stream ends, the remaining buffer is emitted.

Warning

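Memory usage depends on the predicate and on the data distribution: a buffer keeps growing until the predicate returns false, so a predicate that rarely returns false can hold a large part of the stream in memory. Whether this operator is memory-safe therefore depends on the specific use case.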
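One way to keep memory bounded is to let the predicate also stop buffering once a size limit is reached. The sketch below shows only the predicate logic, on plain arrays. The factory makeBoundedPredicate(), the $maxSize parameter and the array-based buffer are assumptions made for illustration; they are not part of the library API, and the real predicate receives a Buffer instance instead.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical factory: same-group check combined with a hard size cap.
 *
 * @return callable(list<array{person_id: int}>, array{person_id: int}): bool
 */
function makeBoundedPredicate(int $maxSize): callable
{
    return static function (array $buffer, array $row) use ($maxSize): bool {
        // Stop buffering when the cap is hit, even if the group continues.
        if (count($buffer) >= $maxSize) {
            return false;
        }

        // Otherwise keep buffering only while the row belongs to the same person.
        return $buffer[0]['person_id'] === $row['person_id'];
    };
}

$predicate = makeBoundedPredicate(2);

// Same person, buffer below the cap: keep buffering.
$sameGroup = $predicate([['person_id' => 1]], ['person_id' => 1]);

// Same person, but the buffer already holds 2 rows: emit and restart.
$capReached = $predicate([['person_id' => 1], ['person_id' => 1]], ['person_id' => 1]);

// Different person: emit and restart.
$otherGroup = $predicate([['person_id' => 1]], ['person_id' => 2]);
```

With a cap like this, one logical group may be split across several buffers, so downstream code has to tolerate partial groups.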

class RunOpenCode\Component\Dataset\Operator\BufferWhile
__construct(iterable<TKey, TValue> $source, callable(Buffer<TKey, TValue>, TKey=, TValue=): bool $predicate)
Parameters:
  • $source (iterable<TKey, TValue>): Stream source to iterate over.

  • $predicate (callable(Buffer<TKey, TValue>, TKey=, TValue=): bool): Predicate that decides whether the current item is placed into the existing buffer (true), or the existing buffer is yielded and a new one is created with the current item (false).

getIterator()
Returns:

\Traversable<int, Buffer<TKey, TValue>> Stream of buffers.
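The buffering semantics described above can be illustrated with a minimal plain-PHP sketch. It does not use the library: bufferWhileSketch() is a hypothetical helper, plain arrays stand in for Buffer, and the predicate receives only the buffered values and the current value, which simplifies the documented signature. It also assumes the predicate is consulted only once the buffer holds at least one item.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical stand-in for the operator: groups items while the predicate holds.
 *
 * @param iterable<mixed>                    $source
 * @param callable(list<mixed>, mixed): bool $predicate
 *
 * @return \Generator<int, list<mixed>>
 */
function bufferWhileSketch(iterable $source, callable $predicate): \Generator
{
    $buffer = [];

    foreach ($source as $value) {
        // An empty buffer always accepts the item; otherwise ask the predicate.
        if ([] !== $buffer && !$predicate($buffer, $value)) {
            yield $buffer; // Predicate failed: emit the current buffer...
            $buffer = [];  // ...and restart buffering with the current item.
        }

        $buffer[] = $value;
    }

    // The stream ended: emit whatever is still buffered.
    if ([] !== $buffer) {
        yield $buffer;
    }
}

// Illustrative rows, already ordered by person_id.
$rows = [
    ['person_id' => 1, 'city' => 'Belgrade'],
    ['person_id' => 1, 'city' => 'Novi Sad'],
    ['person_id' => 2, 'city' => 'Nis'],
];

// Group consecutive rows that share the person_id of the first buffered row.
$groups = iterator_to_array(bufferWhileSketch(
    $rows,
    static fn(array $buffer, array $row): bool => $buffer[0]['person_id'] === $row['person_id'],
), false);
```

Here $groups holds two buffers: the two rows of person 1, followed by the single row of person 2. Only consecutive items are grouped, so unsorted input would produce several buffers for the same person.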

Use cases

  • You want to load a large dataset (for example, from a file or a database) and process it in batches (batch import), but the batches must cluster stream data based on the keys/values yielded by the data stream.

Example

The model defines a Person entity and an Address entity, both of which need to be migrated from a legacy system. In the first iteration, all legacy Person data is migrated into the new system.

In the next iteration, Address data is loaded, ordered by the person_id column so that rows belonging to the same person are adjacent in the stream, and the addresses are stored for each related Person.

After each Person and its addresses are processed, the system is notified that the Person's addresses were migrated successfully.

<?php

use App\Model\Person;
use App\Model\Address;
use App\Events\PersonAddressesMigrated;
use RunOpenCode\Component\Dataset\Stream;
use RunOpenCode\Component\Dataset\Model\Buffer;
// The namespace of ArrayCollector is assumed here; adjust it to your installed version.
use RunOpenCode\Component\Dataset\Collector\ArrayCollector;

// $orm, $dispatcher and $database are assumed to be provided by the application.
// Rows must be ordered by person_id so that each person's addresses are adjacent.
$dataset = $database->executeQuery('...');

new Stream($dataset)
    // Keep buffering while the current row belongs to the same person as the first buffered row.
    ->bufferWhile(function(Buffer $buffer, array $values): bool {
        return $buffer->first()->value()['person_id'] === $values['person_id'];
    })
    // Turn each buffer of legacy rows into a Person with its persisted addresses.
    ->map(function(Buffer $buffer) use ($orm): Person {
        $person = $orm->fetchOne(Person::class, $buffer->first()->value()['person_id']);
        $addresses = new Stream($buffer)
            ->map(function(array $row): Address {
                return Address::fromLegacy($row);
            })
            ->tap(function(Address $entity) use ($orm): void {
                $orm->persist($entity);
            })
            ->collect(ArrayCollector::class);

        $person->setAddresses($addresses);

        // Write this person's batch and release managed entities to keep memory flat.
        $orm->flush();
        $orm->clear();

        return $person;
    })
    // Notify the system once a person's addresses have been migrated.
    ->tap(function(Person $person) use ($dispatcher): void {
        $dispatcher->notify(new PersonAddressesMigrated($person::class, $person->id));
    })
    ->flush();