Write your own operator
To create your own operator, you need to define a class that implements
RunOpenCode\Component\Dataset\Contract\OperatorInterface. The library also
provides a base class, RunOpenCode\Component\Dataset\AbstractStream,
which serves as a prototype for custom operators and significantly simplifies
their implementation.
Note
This tutorial assumes that you are familiar with PHPStan and generics.
For the purpose of this tutorial, we will implement a log() operator. The goal of this operator is to monitor a stream and write each item (key and value) to a file.
Note
Security, usability, bugs, and edge cases are intentionally ignored in this example. The goal of the tutorial is to demonstrate the process of writing and using a custom operator.
Creating the operator class
First, we need to create our operator class and define its signature.
```php
<?php

declare(strict_types=1);

namespace App\Operator;

use RunOpenCode\Component\Dataset\AbstractStream;
use RunOpenCode\Component\Dataset\Contract\OperatorInterface;

/**
 * @template TKey
 * @template TValue
 *
 * @extends AbstractStream<TKey, TValue>
 * @implements OperatorInterface<TKey, TValue>
 */
final class Log extends AbstractStream implements OperatorInterface
{
    /**
     * @param iterable<TKey, TValue> $source Stream source to iterate over.
     * @param non-empty-string       $file   File where items should be stored.
     */
    public function __construct(
        iterable $source,
        private readonly string $file,
    ) {
        parent::__construct($source);
    }
}
```
The operator class extends RunOpenCode\Component\Dataset\AbstractStream and
implements RunOpenCode\Component\Dataset\Contract\OperatorInterface.
The class defines two generic template parameters, TKey and TValue,
which describe the key and value types yielded by the operator. This information
is required by PHPStan for correct type inference.
The constructor accepts two arguments:
The source stream to iterate over.
A non-empty string representing the path to the file where stream items will be written.
From the declared generic types and the constructor signature, it is clear that this operator does not modify the original stream: it accepts an iterable<TKey, TValue> and yields items of the same TKey and TValue types.
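For readers less familiar with PHPStan generics, the same contract can be illustrated outside the library. The following is a minimal sketch, not part of RunOpenCode\Component\Dataset: a plain generator function, hypothetically named passThrough(), whose @template annotations tell PHPStan that it yields exactly the key and value types it receives.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical pass-through generator: yields every key/value pair unchanged.
 *
 * @template TKey
 * @template TValue
 *
 * @param iterable<TKey, TValue> $source
 *
 * @return \Generator<TKey, TValue, mixed, void>
 */
function passThrough(iterable $source): \Generator
{
    foreach ($source as $key => $value) {
        // An operator like Log would observe the item here before passing it on.
        yield $key => $value;
    }
}

$pairs = [];

foreach (passThrough(['a' => 1, 'b' => 2]) as $key => $value) {
    $pairs[] = [$key, $value];
}

// $pairs is [['a', 1], ['b', 2]]: same keys, same values, same order.
```

Because nothing is transformed, PHPStan can infer that the output types of passThrough() are identical to the input types, which is exactly what the Log operator's annotations express.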
Note
If you need to implement an operator that modifies the stream, you can
refer to existing implementations such as
RunOpenCode\Component\Dataset\Operator\Map or
RunOpenCode\Component\Dataset\Operator\Merge.
The constructor must pass the source stream to the parent AbstractStream
implementation. This is required so that the library can correctly track
upstreams and provide proper support for aggregators.
Implementing the operator logic
Next, implement the iterate(): \Traversable<TKey, TValue> method.
This method is executed when the stream is iterated and contains the actual
operator logic.
```php
<?php

declare(strict_types=1);

namespace App\Operator;

use RunOpenCode\Component\Dataset\AbstractStream;
use RunOpenCode\Component\Dataset\Contract\OperatorInterface;

/**
 * @template TKey
 * @template TValue
 *
 * @extends AbstractStream<TKey, TValue>
 * @implements OperatorInterface<TKey, TValue>
 */
final class Log extends AbstractStream implements OperatorInterface
{
    // Code omitted for the sake of readability.

    /**
     * {@inheritdoc}
     */
    public function iterate(): \Traversable
    {
        // Safe functions throw an exception instead of returning false.
        // @see https://github.com/thecodingmachine/safe
        $handle = \Safe\fopen($this->file, 'wb');

        try {
            foreach ($this->source as $key => $value) {
                // Write one line per stream item.
                \Safe\fwrite($handle, \sprintf(
                    'Key: "%s", Value: "%s"' . \PHP_EOL,
                    self::stringify($key),
                    self::stringify($value),
                ));

                // Pass the original item downstream unchanged.
                yield $key => $value;
            }
        } finally {
            // Release the file handle even if iteration fails.
            \Safe\fclose($handle);
        }
    }

    /**
     * Cast anything to its string representation.
     */
    private static function stringify(mixed $value): string
    {
        // Implementation omitted.
    }
}
```
Inside this method, a file handle is opened using the provided file path. The source stream is then iterated item by item.
For each key-value pair:
The key and value are converted to their string representations (the implementation of Log::stringify() is omitted for the sake of readability).
The formatted output is written to the file.
The original key and value are yielded back to the stream.
Yielding the original key and value is the most important step, as it allows the stream to continue flowing to downstream operators or consumers.
The file handle is closed in a finally block to ensure that it is released even if an error occurs during iteration.
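Log::stringify() is left unimplemented in the example. One possible implementation is sketched below as a standalone function so that it can run on its own; the function name, the spellings 'null', 'true' and 'false', and the fallback to the class or type name are assumptions made for illustration, not the library's behavior.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical stand-in for the omitted Log::stringify(): casts any value to a string.
 */
function stringify(mixed $value): string
{
    return match (true) {
        null === $value => 'null',
        \is_bool($value) => $value ? 'true' : 'false',
        \is_scalar($value) => (string) $value,            // int, float, string
        $value instanceof \Stringable => (string) $value, // objects with __toString()
        \is_object($value) => $value::class,              // other objects: class name
        default => \get_debug_type($value),               // arrays, resources
    };
}

// Formatting one item the same way the Log operator does.
$line = \sprintf('Key: "%s", Value: "%s"' . \PHP_EOL, stringify(0), stringify(true));
```

Keys are passed through the same function, which matters because stream keys are not limited to integers and strings.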
Warning
This implementation assumes that the stream will be fully iterated,
which is not always the case. A consumer of the stream can stop iterating
early, either with break in a loop or with an operator that limits the
number of iterations, such as take(). When writing your own operators,
always account for early termination and make sure that resources are
handled correctly.
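The effect of early termination can be observed with plain PHP generators. In the sketch below (no library code is involved; the function numbersWithCleanup() is hypothetical), the consumer breaks after two items. The finally block does not run at the break itself: it runs only once the suspended generator object is destroyed, so anything the generator opened stays open until then.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical generator that "opens" a resource and records when it is released.
 *
 * @return \Generator<int, int, mixed, void>
 */
function numbersWithCleanup(\ArrayObject $events): \Generator
{
    $events[] = 'opened';

    try {
        foreach ([1, 2, 3] as $number) {
            yield $number;
        }
    } finally {
        // Runs when the loop finishes, when it throws, or when a suspended generator is destroyed.
        $events[] = 'released';
    }
}

$events = new \ArrayObject();
$stream = numbersWithCleanup($events);

foreach ($stream as $number) {
    if (2 === $number) {
        break; // Early termination, as with break or take().
    }
}

// Generator is only suspended: the resource is still held.
$afterBreak = $events->getArrayCopy();   // ['opened']

// Last reference is gone: the generator is destroyed and its finally block runs.
unset($stream);

$afterUnset = $events->getArrayCopy();   // ['opened', 'released']
```

If something else keeps a reference to the partially consumed stream, the release is postponed accordingly, which is why cleanup tied to garbage collection should be treated as a safety net rather than a guarantee of timely release.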
Testing the operator
Once the operator is complete, it can be unit tested without requiring any external dependencies.
```php
<?php

declare(strict_types=1);

namespace App\Tests\Operator\Log;

use App\Operator\Log;
use PHPUnit\Framework\Attributes\Test;
use PHPUnit\Framework\TestCase;

final class LogTest extends TestCase
{
    #[Test]
    public function logs(): void
    {
        $this->assertFileDoesNotExist('/tmp/test_logs.log');

        $operator = new Log([1, 2, 3], '/tmp/test_logs.log');

        // Consume the whole stream so that every item is written.
        \iterator_to_array($operator);

        // Read and remove the log before asserting, so that a failed
        // assertion does not leave the file behind for the next run.
        $actual = \Safe\file_get_contents('/tmp/test_logs.log');
        \Safe\unlink('/tmp/test_logs.log');

        $this->assertSame(
            \Safe\file_get_contents('/path/to/expected/output/file.log'),
            $actual,
        );
    }
}
```
Using the operator
When using streams in an object-oriented style, you can call the operator()
method on an instance of RunOpenCode\Component\Dataset\Stream to apply your
operator.
```php
<?php

declare(strict_types=1);

use App\Operator\Log;
use RunOpenCode\Component\Dataset\Stream;

Stream(...)
    ->operator(Log::class, '/path/to/file.log');
```
When using the functional style in PHP 8.5 or later, the operator() function
is also available.
```php
<?php

declare(strict_types=1);

use App\Operator\Log;
use RunOpenCode\Component\Dataset\Stream;

use function RunOpenCode\Component\Dataset\operator;
use function RunOpenCode\Component\Dataset\stream;

$source = [...];

// Arrow functions on the right-hand side of |> must be wrapped in parentheses.
$processed = $source
    |> stream(...)
    |> (static fn(iterable $stream): Stream => operator($stream, Log::class, '/path/to/file.log'));
```
General advice for implementing operators
Keep your operators simple. The general idea of each operator is to perform one simple, fundamental operation. Any data-processing complexity should be achieved by composing multiple simple operators, not by implementing a single complex operator that performs multiple tasks simultaneously.
Reuse existing operators whenever possible. If your operator can be expressed as a composition of existing operators, with only a small amount of custom stream-specific logic, do not reimplement existing functionality. Existing operators are already unit-tested, which makes it easier to reason about their composition than to write everything from scratch.
Keys can be anything: do not break this assumption. A generator may emit values with any type of key.
Keys are not unique: do not break this assumption. A generator may emit multiple items with the same key.
Streams are not rewindable: do not break this assumption. An array is iterable, but so is \Generator, and generators cannot be rewound.
Streams may not be fully iterated: do not assume that users of your operator will consume the entire stream. They may use break or take() in their code.
Do not lock or hold resources. If you do, release them when the operator finishes streaming. Since there is no reliable way to detect whether streaming was interrupted (for example, via break or another operator), ensure that your operator implements the __destruct() method to properly release any resources on garbage collection.
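Two of these assumptions are easy to break by accident. The sketch below uses only plain PHP (the generator repeatedKeys() is hypothetical) to show that collecting a stream by its keys silently drops items when keys repeat, and that a generator cannot be traversed a second time.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical source emitting a repeated key, as any generator may.
 *
 * @return \Generator<string, int, mixed, void>
 */
function repeatedKeys(): \Generator
{
    yield 'a' => 1;
    yield 'a' => 2;
}

// Keys are not unique: collecting by key keeps only the last item per key.
$byKey = \iterator_to_array(repeatedKeys());          // ['a' => 2]

// Ignoring keys keeps every value.
$values = \iterator_to_array(repeatedKeys(), false);  // [1, 2]

// Streams are not rewindable: a generator can be traversed only once.
$generator = repeatedKeys();
$firstPass = \iterator_to_array($generator, false);   // [1, 2]

try {
    // A second traversal of a finished generator throws an \Exception.
    \iterator_to_array($generator, false);
    $rewound = true;
} catch (\Exception) {
    $rewound = false;
}
```

An operator that needs to look at the items more than once therefore has to buffer them itself instead of iterating its source again.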
If you believe your operator would be valuable to the wider community, feel free to submit a pull request!