Dataset component
This library is heavily inspired by the Java Stream API for working with collections in a functional(ish), declarative way. In some aspects, it is also inspired by ReactiveX, but with a much simpler approach and far fewer features.
Introduction
If your problem can be described as:
I have a data stream from some source (file, database query result, etc.), and I want to iterate over its records and process them using a small memory footprint.
then this library can help you achieve that goal using a declarative approach.
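To make "a data stream from some source" concrete, here is a plain-PHP sketch that does not use this library. It reads a CSV source through a Generator, so each record is read only when the loop asks for it. The helper `readCsvRecords()` is hypothetical, and an in-memory stream stands in for a real file.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper (not part of this library): lazily yields one CSV
 * record at a time from an open stream, so only the current record is
 * held in memory.
 *
 * @param resource $handle
 * @return \Generator<int, array<int, string|null>>
 */
function readCsvRecords($handle): \Generator
{
    // fgetcsv() returns false at end of file, which stops the loop.
    while (false !== ($record = \fgetcsv($handle, null, ',', '"', '\\'))) {
        yield $record;
    }
}

// An in-memory stream stands in for a real file or query result.
$handle = \fopen('php://memory', 'r+');
\fwrite($handle, "id,amount\n1,100\n2,250\n");
\rewind($handle);

$count = 0;
$lastRecord = null;
foreach (readCsvRecords($handle) as $record) {
    $count++;              // one record per iteration
    $lastRecord = $record; // only the latest record is kept
}
\fclose($handle);
```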
There are several PHP implementations of this idea; however, this implementation
focuses on PHP iterable values, assuming that the underlying implementation
is most likely an instance of Generator. Of course, it also works with the
array data type or anything that implements \Traversable. The real
strength of this library lies in its focus on simple, declarative data stream
processing with minimal memory usage.
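The point about iterable values can be illustrated in plain PHP, independently of this library. A function type-hinted with `iterable` accepts an array, a Generator, or any other `\Traversable` in the same way. The function names `sumIterable()` and `lazyRange()` below are hypothetical.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical consumer: sums integers from any PHP iterable. It only ever
 * needs the current element, whatever the underlying type is.
 *
 * @param iterable<int> $values
 */
function sumIterable(iterable $values): int
{
    $total = 0;
    foreach ($values as $value) {
        $total += $value;
    }
    return $total;
}

/** Hypothetical source: produces values on demand instead of building an array. */
function lazyRange(int $from, int $to): \Generator
{
    for ($i = $from; $i <= $to; $i++) {
        yield $i;
    }
}

$fromArray     = sumIterable([1, 2, 3]);
$fromGenerator = sumIterable(lazyRange(1, 3));
$fromIterator  = sumIterable(new \ArrayIterator([1, 2, 3]));
```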
If you need full-fledged ReactiveX in PHP, please take a look at the official implementation of the specification: RxPHP.
Features
Declarative approach to process data streams.
Designed to work with any iterable while using as little memory as possible.
Provides a set of operators, reducers, and collectors that are easy to extend, and new ones can be added as needed.
Focused on small memory consumption during processing.
Introduces the concept of aggregators, allowing you to process a stream and reduce (aggregate) values simultaneously without interrupting the data stream.
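The aggregator idea (reducing values while the stream keeps flowing) can be sketched with a plain Generator. This is not this library's implementation or API. `RunningSum` and `aggregateSum()` are hypothetical names. The point is that each element is passed through unchanged while a running total is updated as a side effect.

```php
<?php

declare(strict_types=1);

/** Hypothetical holder for a value reduced while the stream is consumed. */
final class RunningSum
{
    public int $value = 0;
}

/**
 * Hypothetical pass-through aggregator: yields every element unchanged and
 * adds the extracted number to $sum as a side effect.
 *
 * @param iterable<mixed> $items
 * @param callable(mixed): int $extract
 */
function aggregateSum(iterable $items, RunningSum $sum, callable $extract): \Generator
{
    foreach ($items as $key => $item) {
        $sum->value += $extract($item);
        yield $key => $item;
    }
}

$rows = [['amount' => 10], ['amount' => 20], ['amount' => 5]];
$total = new RunningSum();

$seen = 0;
foreach (aggregateSum($rows, $total, static fn(array $row): int => $row['amount']) as $row) {
    $seen++; // rows keep flowing to the consumer...
}
// ...and the aggregated value is available once iteration has finished.
```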
Quick example
A simple example of using this library to list online transactions is shown below. Assume that we want to display a list of online transactions executed within a certain time period, and we also want to calculate the total amount for each currency, as well as the overall total and the average transaction amount converted to EUR. This is how it can be done using this library:
1<?php
2
3namespace App\Reporting\Finance;
4
5use RunOpenCode\Component\Dataset\Stream;
6use RunOpenCode\Component\Dataset\Reducer\Sum;
7use RunOpenCode\Component\Dataset\Reducer\Average;
8
9final readonly class OnlinePurchaseReport
10{
11 // "Database" is a made-up service used only to demonstrate this library.
12 public function __construct(private Database $database)
13 {
14 /* noop */
15 }
16
17 /**
18 * @return iterable<array{
19 * client_id: int,
20 * transaction_id: string,
21 * total_amount: int,
22 * currency: 'EUR'|'USD',
23 * converted: int,
24 * }>
25 */
26 public function getReportData(\DateTimeInterface $from, \DateTimeInterface $to, int $conversionRate): iterable
27 {
28 /**
29 * @var iterable<array{
30 * client_id: int,
31 * transaction_id: string,
32 * total_amount: int,
33 * currency: 'EUR'|'USD'
34 * }>
35 */
36 $dataset = $this->database->execute('SELECT * FROM online_transactions WHERE created_at BETWEEN :from AND :to;', [
37 'from' => $from,
38 'to' => $to,
39 ]);
40
41 return Stream::create($dataset)
42 ->aggregate('total_eur', Sum::class, static fn(array $row): int => 'EUR' === $row['currency'] ? $row['total_amount'] : 0)
43 ->aggregate('total_usd', Sum::class, static fn(array $row): int => 'USD' === $row['currency'] ? $row['total_amount'] : 0)
44 ->map(function(array $row) use ($conversionRate): array {
45 $row['converted'] = 'USD' === $row['currency'] ? $row['total_amount'] * $conversionRate : $row['total_amount'];
46 return $row;
47 })
48 ->aggregate('total_converted', Sum::class, static fn(int $reduced, array $row): int => $row['converted'])
49 ->aggregate('average_converted', Average::class, static fn(int $reduced, array $row): int => $row['converted']);
50 }
51}
Explanation of the code: on line 36 we fetch data from the database. The query returns an iterable positioned at the first row of the result set, which means that no rows are loaded into the PHP virtual machine's memory upfront.
Line 41 wraps that iterable into an instance of
RunOpenCode\Component\Dataset\Stream. We then apply the operations that
should be executed while iterating over the stream.
Line 42 applies an aggregator that sums all transaction amounts in EUR. Line
43 does the same for USD.
Line 44 adds a new column, converted, to each row. It holds the transaction
amount in EUR: USD amounts are multiplied by the provided conversion rate,
while EUR amounts are copied unchanged.
Finally, lines 48 and 49 apply aggregators that calculate the total sum of all
transactions in EUR as well as the average transaction amount in EUR.
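An average can be aggregated without keeping any rows, because a running sum and a count are enough. The plain-PHP sketch below is not this library's `Average` reducer. `RunningAverage` and `mapRows()` are hypothetical, and the conversion rate and rows are made up. It applies the same conversion rule as the `map()` step above and then averages the `converted` column in constant memory.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical running average: keeps only a sum and a count, never the
 * rows themselves, so its memory use does not grow with the stream.
 */
final class RunningAverage
{
    private int|float $sum = 0;
    private int $count = 0;

    public function add(int|float $value): void
    {
        $this->sum += $value;
        $this->count++;
    }

    public function value(): ?float
    {
        return 0 === $this->count ? null : (float) ($this->sum / $this->count);
    }
}

/** Hypothetical lazy map: applies $fn to each row as it is pulled. */
function mapRows(iterable $rows, callable $fn): \Generator
{
    foreach ($rows as $key => $row) {
        yield $key => $fn($row);
    }
}

$conversionRate = 2; // made-up rate for illustration
$rows = [
    ['total_amount' => 100, 'currency' => 'EUR'],
    ['total_amount' => 30, 'currency' => 'USD'],
    ['total_amount' => 20, 'currency' => 'EUR'],
];

$withConverted = mapRows($rows, static function (array $row) use ($conversionRate): array {
    // Same conversion rule as the map() step in the example: only USD is multiplied.
    $row['converted'] = 'USD' === $row['currency']
        ? $row['total_amount'] * $conversionRate
        : $row['total_amount'];
    return $row;
});

$average = new RunningAverage();
foreach ($withConverted as $row) {
    $average->add($row['converted']);
}
```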
None of the processing is executed until the stream is iterated. Creating the stream only wraps the iterable with processing logic; to execute it, you must iterate over it. In practice, this will often be done in a templating engine.
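This deferred execution follows from how PHP Generators behave, and a plain-PHP sketch can make it visible. `loggedMap()` is a hypothetical helper, not this library's `map()`. It records each call so that you can check that nothing has run before the loop starts.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical lazy map (not this library's API): records every call in
 * $log so we can observe when the mapping actually runs.
 */
function loggedMap(iterable $items, callable $fn, \ArrayObject $log): \Generator
{
    foreach ($items as $key => $item) {
        $log->append("map {$key}");
        yield $key => $fn($item);
    }
}

$log = new \ArrayObject();
$pipeline = loggedMap([1, 2, 3], static fn(int $x): int => $x * 10, $log);

// Calling a generator function does not run its body yet.
$callsBeforeIteration = \count($log);

$results = [];
foreach ($pipeline as $value) {
    $results[] = $value; // each step runs only when the loop asks for it
}

$callsAfterIteration = \count($log);
```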
The example below uses echo to demonstrate the concept:
<?php

// $stream is the value returned by OnlinePurchaseReport::getReportData().
foreach ($stream as $row) {
    echo \sprintf(
        'Client: %d, Transaction: %s, Amount: %d, Currency: %s, EUR: %d',
        $row['client_id'],
        $row['transaction_id'],
        $row['total_amount'],
        $row['currency'],
        $row['converted'],
    );
    echo "\n";
}

// Since we iterated, our aggregated values are available too.
echo \sprintf('Total in EUR: %d', $stream->aggregated['total_converted']);
echo "\n";
echo \sprintf('Average in EUR: %.2f', $stream->aggregated['average_converted']);
During this process, the memory footprint stays close to the amount of memory required to store a single row, plus the few values held by the aggregators.
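You can check the memory claim roughly in plain PHP by streaming many generated rows and tracking how far `memory_get_usage()` rises above a baseline. In the sketch below, `generateRows()` is hypothetical, and the row count and the 1 MB bound used in the check are arbitrary choices. The growth stays small because only the current row exists at any time. Collecting the same rows into an array first would grow with the row count.

```php
<?php

declare(strict_types=1);

/** Hypothetical row source: yields $n rows without ever holding them all. */
function generateRows(int $n): \Generator
{
    for ($i = 1; $i <= $n; $i++) {
        yield ['id' => $i, 'amount' => $i % 100, 'currency' => 0 === $i % 2 ? 'EUR' : 'USD'];
    }
}

$baseline = \memory_get_usage();
$maxGrowth = 0;
$total = 0;

foreach (generateRows(100_000) as $row) {
    $total += $row['amount'];
    // Track how far memory rises above the baseline while streaming.
    $maxGrowth = \max($maxGrowth, \memory_get_usage() - $baseline);
}
```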