leftJoin()

This operator mimics the SQL LEFT JOIN found in relational databases.

It iterates over the source stream and joins each item with values from the joining stream based on strict key equality.

The operator yields the key from the source stream and a tuple containing the source value as the first element and an iterable of joined values from the joining stream as the second element.

Warning

This operator is not memory-efficient. Memory consumption depends on the size of the joining stream.

class RunOpenCode\Component\Dataset\Operator\LeftJoin
__construct(iterable<TKey, TValue> $source, iterable<TKey, TJoinValue> $join)
Parameters:
  • $sourceiterable<TKey, TValue> Stream source to iterate over on the left side of the left join operation.

  • $joiniterable<TKey, TJoinValue> Stream source to iterate over on the right side of the left join operation.

getIterator()
Returns:

\Traversable<TKey, TValue> Stream source joined with values from joining stream source.

Use cases

  • Join operation is required which can not be executed on database level.

Example

Migrate users and their addresses from legacy system in batches.

 1<?php
 2
 3use App\Model\User;
 4use App\Model\Address;
 5use RunOpenCode\Component\Dataset\Stream;
 6use RunOpenCode\Component\Dataset\Model\Buffer;
 7
 8$users = $database->executeQuery('SELECT * FROM users...');
 9
10new Stream($users)
11    ->bufferCount(100)
12    ->map(function(Buffer $buffer) use ($database): array {
13        $users = $buffer
14            ->stream()
15            ->map(keyTransform: static fn(array $row): int => $row['id'])
16            ->collect(ArrayCollector::class);
17
18        $addresses = new Stream($database->executeQuery('SELECT * FROM addresses ... WHERE user_id IN (:ids)', [
19            'ids' => \array_keys($users),
20        ]))->map(keyTransform: static fn(array $row): int => $row['user_id']);
21
22        return [ $users, $addresses ];
23    })
24    ->map(function(array $batch): Stream {
25        [$users, $addresses] = $batch;
26
27        return new Stream($users)
28            ->leftJoin($addresses);
29    })
30    ->flatten()
31    ->map(valueTransform: function(array $joined): User {
32        [$user, $addresses] = $joined;
33
34        return User::fromArray($user)
35                   ->setAddresses(\array_map(static fn(array $address): Address => Address::fromArray($address), $addresses));
36    })
37    ->bufferCount(100)
38    ->tap(function(Buffer $buffer) use ($orm) {
39        $buffer
40            ->stream()
41            ->tap(function(User $user) use ($orm) {
42                $orm->persist($user);
43            })
44            ->flush();
45
46        $orm->flush();
47        $orm->clear();
48    })
49    ->flush();