bufferWhile()
Buffers stream items while the predicate returns true. When the predicate
evaluates to false (or the stream ends), a
RunOpenCode\Component\Dataset\Model\Buffer instance is emitted and buffering
restarts.
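The buffering rule described above can be sketched in plain PHP. This is not the library's implementation: `buffer_while_sketch()` is a hypothetical stand-in, the buffer is a plain array rather than a Buffer instance, and the predicate here receives only the buffer and the incoming value. The sketch also assumes the predicate is consulted only when the buffer already holds at least one item.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical stand-in for the operator, written without the library.
 *
 * @param iterable<mixed>              $source
 * @param callable(array, mixed): bool $predicate Receives the current buffer and the incoming value.
 *
 * @return \Generator<int, array>
 */
function buffer_while_sketch(iterable $source, callable $predicate): \Generator
{
    $buffer = [];

    foreach ($source as $value) {
        // A non-empty buffer is emitted as soon as the predicate rejects the incoming item.
        if ([] !== $buffer && !$predicate($buffer, $value)) {
            yield $buffer;
            $buffer = [];
        }

        $buffer[] = $value;
    }

    // The end of the stream emits whatever is still buffered.
    if ([] !== $buffer) {
        yield $buffer;
    }
}

$rows = [
    ['person_id' => 1, 'city' => 'Berlin'],
    ['person_id' => 1, 'city' => 'Paris'],
    ['person_id' => 2, 'city' => 'Rome'],
];

// Keep buffering while the incoming row has the same person_id as the first buffered row.
$samePerson = static fn(array $buffer, array $row): bool => $buffer[0]['person_id'] === $row['person_id'];

$groups = iterator_to_array(buffer_while_sketch($rows, $samePerson), false);
// $groups holds two buffers: both rows for person 1, then the single row for person 2.
```

Because a buffer is closed only when the predicate returns false, the input has to be ordered so that related items are adjacent; otherwise the same group is emitted in several pieces.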
Warning
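Memory usage depends on the predicate and on how the data is distributed: a buffer keeps growing for as long as the predicate returns true, so whether this operator is memory-safe depends on the specific use case.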
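One way to bound buffer growth is to make the predicate itself give up after a fixed number of items. The following is a minimal sketch in plain PHP, not library code: `capped()` and `buffer_while_capped_sketch()` are hypothetical helpers, and buffers are plain arrays. With the real operator the predicate receives a Buffer instance, so the size check would have to use whatever that class exposes.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper: wraps a grouping predicate so that a buffer is also
 * closed once it holds $limit items.
 */
function capped(callable $predicate, int $limit): \Closure
{
    return static fn(array $buffer, mixed $value): bool =>
        count($buffer) < $limit && $predicate($buffer, $value);
}

/** Plain-PHP stand-in for the operator, so the sketch runs without the library. */
function buffer_while_capped_sketch(iterable $source, callable $predicate): \Generator
{
    $buffer = [];

    foreach ($source as $value) {
        // Emit the current buffer when the predicate rejects the incoming item.
        if ([] !== $buffer && !$predicate($buffer, $value)) {
            yield $buffer;
            $buffer = [];
        }

        $buffer[] = $value;
    }

    // Emit the remainder when the source is exhausted.
    if ([] !== $buffer) {
        yield $buffer;
    }
}

// Skewed data: every row shares the same person_id, so an uncapped predicate
// would keep all rows in a single buffer.
$rows = array_fill(0, 5, ['person_id' => 7]);

$samePerson = static fn(array $buffer, array $row): bool => $buffer[0]['person_id'] === $row['person_id'];

$sizes = [];
foreach (buffer_while_capped_sketch($rows, capped($samePerson, 2)) as $chunk) {
    $sizes[] = count($chunk);
}
// $sizes is [2, 2, 1]: no buffer ever holds more than two rows.
```

The trade-off is that one logical group may now arrive as several consecutive buffers, so downstream steps must tolerate partial groups.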
- class RunOpenCode\Component\Dataset\Operator\BufferWhile
- __construct(iterable<TKey, TValue> $source, callable(Buffer<TKey, TValue>, TKey=, TValue=): bool $predicate)
- Parameters:
$source – iterable<TKey, TValue> Stream source to iterate over.
$predicate – callable(Buffer<TKey, TValue>, TKey=, TValue=): bool Predicate that decides whether the current item is added to the existing buffer (true), or the existing buffer is yielded and a new one is started with the current item (false).
- getIterator()
- Returns:
\Traversable<int, Buffer<TKey, TValue>> Stream of buffers.
Use cases
You want to load a large dataset (for example, from a file or a database) and process it in batches (batch import), but the batches should be formed by clustering the stream on the keys/values it yields.
Example
The model defines a Person entity and an Address entity, both of which need to be
migrated from a legacy system. In the first iteration, all legacy Person data
is migrated into the new system.
In the next iteration, Address data is loaded, ordered by the person_id
column, and the addresses are stored for each related Person.
After each Person and its addresses are processed, the system is notified that
the addresses of that Person were migrated successfully.
<?php

use App\Model\Person;
use App\Model\Address;
use App\Events\PersonAddressesMigrated;
use RunOpenCode\Component\Dataset\Stream;
use RunOpenCode\Component\Dataset\Model\Buffer;
// Assumed namespace: the original example uses ArrayCollector without importing it.
use RunOpenCode\Component\Dataset\Collector\ArrayCollector;

// $orm, $dispatcher and $database are assumed to be provided by the application.
// Rows must be ordered by person_id so that each person's addresses are adjacent.
$dataset = $database->executeQuery('...');

(new Stream($dataset))
    // Keep buffering while the incoming row belongs to the same person as the first buffered row.
    ->bufferWhile(function(Buffer $buffer, array $values): bool {
        return $buffer->first()->value()['person_id'] === $values['person_id'];
    })
    // Each buffer now holds the address rows of a single person.
    ->map(function(Buffer $buffer) use ($orm): Person {
        $person    = $orm->fetchOne(Person::class, $buffer->first()->value()['person_id']);
        $addresses = (new Stream($buffer))
            ->map(function(array $row): Address {
                return Address::fromLegacy($row);
            })
            ->tap(function(Address $entity) use ($orm): void {
                $orm->persist($entity);
            })
            ->collect(ArrayCollector::class);

        $person->setAddresses($addresses);

        // Write this person's addresses and release the managed entities before the next buffer.
        $orm->flush();
        $orm->clear();

        return $person;
    })
    // Notify the system once a person's addresses have been migrated.
    ->tap(function(Person $person) use ($dispatcher): void {
        $dispatcher->notify(new PersonAddressesMigrated($person::class, $person->id));
    })
    ->flush();