Neue Multithreading Extension Parallel

Die Parallel Extension

Was ist Parallel?

Parallel ist eine neue Multithreading Extension für PHP die an die Coroutinen der Programmiersprache GO erinnert.

Mit Parallel hat man die Möglichkeit Threads zu erstellen, die aber KEINEN shared Memory mit dem Hauptprozess haben. Dies ist wichtig zu verstehen, da man eben dadurch auch keinen Zugriff auf die bereits definierten Klassen und Funktionen hat (dazu gleich aber mehr).

In diesen Threads laufen dann die entsprechend vorgesehenen Algorithmen ab und geben dann ggf. einen entsprechenden Rückgabewert zurück oder teilen über so genannte Channels die Informationen über Threads hinweg.

Parallel wurde von Joe Watkins und Remi Collet entwickelt und steht als PECL Package zur Installation bereit.

Nutzen von Parallel

Um Parallel nutzen zu können benötigt man eine PHP Installation welche Thread Safe ist (Stichwort ZTS).
Sobald man dann auf solch einer Instanz das Parallel Package installiert hat kann man nun anfangen die entsprechenden Methoden zu nutzen.

Erstellen einer Runtime (ein Thread)

Das erstellen eines Threads bzw. einer so genannten Runtime ist relativ einfach und ist durch ein einfaches:

$runtime = new \parallel\Runtime();
// oder
$runtime = new \parallel\Runtime('./vendor/autoload.php');

schnell gemacht.

Wie ihr sehen könnt können wir für das erstellen der Runtime auch eine "bootstrap" Datei angeben, die Parallel uns beim erstellen der Runtime laden soll.
Diese Bootstrap datei sollte die für die Runtime notwendigen Klassen und Funktions Deklarationen beinhalten, damit man diese auch in der entsprechenden Runtime nutzen kann, da wir wie bereits erwähnt keinen shared Memory haben.
Das geladene Bootstrap bleibt in der Runtime, bis diese Runtime geschlossen / gekillt wird.

Diese Runtime steht uns nun so lange zur Verfügung bis wie diesen per $runtime->close() oder $runtime->kill() schließen.

Ausführen von Algorithmen in einer Runtime

Zum Ausführen von Algorithmen in einer Runtime kommen bei Parallel "Closures" zum Einsatz, die im Parallel-Kontext auch "Tasks" genannt werden.
Jede Runtime kann mehrere Closures ausführen und ist somit nicht begrenzt auf eine einzelne Closure. Die Closures werden nach dem FIFO-Prinzip dann in der entsprechenden Runtime ausgeführt.

$runtime = new \parallel\Runtime();

$runtime->run(Closure $task): ?Future
// oder
$runtime->run(Closure $task, array $argv): ?Future

Diese unterliegen jedoch einigen Einschränkungen die es zu beachten gilt.

Closures die an die Runtime übergeben werden dürfen:

  • keine Referenzen zu Objekten entgegen nehmen oder zurück geben
  • keine "Internal Objects" entgegen nehmen oder zurück geben
  • keine Klassen deklarieren
  • keine Funktionen mit Namen deklarieren (Anonyme-Funktionen sind erlaubt)
  • keine yield-Funktionalität nutzen
  • keine "use by-reference" beinhalten

Des Weiteren gelten auch für die Parameter die man an die Closure übergibt beschränkungen.
Dabei dürfen:

  • keine Referenzen übergeben werden
  • keine Resourcen übergeben werden
  • keine "Internal Objects" übergeben werden

Hier mal ein Beispiel für eine solche Closure die ausgeführt werden kann:

$runtime = new \parallel\Runtime();
$runtime->run(
    function (int $count, string $required) {
        for ($i = 0; $i < $count; $i++) {
            printf('%d. Ich möchte %s', $i, $required);
        }
    },
    [100, 'Kekse']
);

Dies sollte 100 mal ausgeben "[Zahl]. Ich möchte Kekse".

Futures - was gibt mir die Runtime da?

Der Returntype der \parallel\Runtime::run(): ?Future Funktion ist "Future" und diese können je nachdem was man macht sehr wichtig sein.
Sie geben nämlich das zurück was in der Closure als Returntype definiert ist. Dies kann ein Wert aber auch eine Exception sein, die man innerhalb der Closure wirft.

Durch die Future können wir auch ermitteln ob unser Closure "Fertig" ($future->done()) ist oder "Abgebrochen" ($future->cancelled()) wurde.

Auf die Rückgabe des ausgeführten Closure kann man dann per $future->value() zugreifen.
Dabei gilt es zu beachten, dass wenn die entsprechende Closure noch nicht fertig ist und man den Wert versucht abzurufen, dieser in einen Blocking Zustand geht wo an jener Stelle wo $future->value() aufgerufen wurde gewartet wird bis die Closure fertig ist.
Dies ist dadurch auch eine einfache Art und Weise den Hauptprozess mit dem Thread zu synchronisieren.

Hierfür habe ich 2 Beispiele vorbereitet:

Beispiel 1 mit Rückgabewert:

$runtime = new \parallel\Runtime();
$future = $runtime->run(
    function (int $count) {
        $calc = 0;
        for ($i = 0; $i < $count; $i++) {
            $calc += $i;
        }

        return $calc;
    },
    [100]
);
echo 'Closure gestartet, warte auf Ergebnis!' . PHP_EOL;
printf('Ergebnis: %d' . PHP_EOL, $future->value());
echo 'Fertig!';

Sollte uns folgendes ausgeben:

Closure gestartet, warte auf Ergebnis!
Ergebnis: 4950
Fertig!

Beispiel 2 mit Exception:

$runtime = new \parallel\Runtime();
$future = $runtime->run(
    function () {
        throw new \RuntimeException('Nichts zu tun!', 1565938195826);
    }
);
echo 'Closure gestartet, warte auf Ergebnis!' . PHP_EOL;
try {
    printf('Ergebnis: %d' . PHP_EOL, $future->value());
} catch (\RuntimeException $runtimeException) {
    printf(
        'Etwas ist schief gelaufen! Message: "%s" | Code: "%s"',
        $runtimeException->getMessage(),
        $runtimeException->getCode()
    );
}
echo 'Fertig!';

Sollte uns folgendes ausgeben:

Closure gestartet, warte auf Ergebnis!
Etwas ist schief gelaufen! Message: "Nichts zu tun!" | Code: "1565938195826"Fertig!

Channels - Austausch von Daten leicht gemacht

Channels sind wichtig um Daten zwischen den Runtimes und dem Hauptprozess auszutauschen und auch zur Synchronisation zwischen Hauptprozess und Runtimes.
Ein Channel kann an einer Closure übergeben werden oder einfach darin abgerufen werden und kann auf 2 Arten benutzt werden, entweder mit einer angegebenen Kapazität (Stichwort: buffered Channel) oder auch ohne (Stichwort: unbuffered Channel).
Der Unterschied dabei liegt darin ob beim senden der Daten über den Channel die public send(mixed $value): void den weiterverlauf Blockiert oder nicht. Das heißt man kann einen Channel bspw. mit einer Kapazität von 10 erstellen und kann 10 mal Daten über den Channel senden bis beim 11 Aufruf die send Methode blockiert, weil die Daten noch nicht abgerufen wurden durch die Methode public recv(): mixed.

Bei einem Channel ohne definierte Kapazität blockieren die send und recv sofort wenn ein entsprechendes Gegenstück nicht vorhanden ist und aufgerufen wurde.

Hier dann eben auch 2 Beispiele zu:

Beispiel 1 Buffered Channel:

$channel = new \parallel\Channel(3);
$runtime = new \parallel\Runtime();
$future = $runtime->run(
    function (\parallel\Channel $channel, int $count) {
        for ($i = 0; $i < $count; $i++) {
            printf('Sending:  %s' . PHP_EOL, $i);
            $channel->send($i);
        }
    },
    [$channel, 10]
);
echo 'Closure gestartet!' . PHP_EOL;
sleep(1);
echo 'Warte auf Ergebnis!' . PHP_EOL;
for ($i = 0; $i < 10; $i++) {
    printf('Received: %s' . PHP_EOL, $channel->recv());
}
echo 'Fertig!';

Sollte etwas ausgeben wie:

Closure gestartet!
Sending:  0
Sending:  1
Sending:  2
Sending:  3
Warte auf Ergebnis!
Received: 0
Received: 1
Received: 2
Sending:  4
Sending:  5
Sending:  6
Received: 3
Received: 4
Received: 5
Sending:  7
Sending:  8
Sending:  9
Received: 6
Received: 7
Received: 8
Received: 9
Fertig!

Beispiel 1 Unbuffered Channel:

$channel = new \parallel\Channel();
$runtime = new \parallel\Runtime();
$future = $runtime->run(
    function (\parallel\Channel $channel, int $count) {
        for ($i = 0; $i < $count; $i++) {
            printf('Sending:  %s' . PHP_EOL, $i);
            $channel->send($i);
        }
    },
    [$channel, 10]
);
echo 'Closure gestartet!' . PHP_EOL;
sleep(1);
echo 'Warte auf Ergebnis!' . PHP_EOL;
for ($i = 0; $i < 10; $i++) {
    printf('Received: %s' . PHP_EOL, $channel->recv());
}
echo 'Fertig!';

Hier sollte die Ausgabe exakt so aussehen:

Closure gestartet!
Sending:  0
Warte auf Ergebnis!
Received: 0
Sending:  1
Received: 1
Sending:  2
Received: 2
Sending:  3
Received: 3
Sending:  4
Received: 4
Sending:  5
Received: 5
Sending:  6
Received: 6
Sending:  7
Received: 7
Sending:  8
Received: 8
Sending:  9
Received: 9
Fertig!

Events - hier kommt einiges zusammen

Events sind dafür da, dass man seine Daten aus den Channels und/oder auch die Futures verwaltet abrufen kann.

Man fügt einem Events-Objekt Futures (public addFuture(string $name ,parallel\Future $future): void) oder Channels (public addChannel(parallel\Channel $channel): void) hinzu und kann diese dann per public poll(): ?Event abrufen.
Wenn ein \parallel\Events\Event zurück gegeben wird wird das Objekt, welches das Event getriggert hat aus dem Events-Objekt entfernt und muss ggf. neu hinzugefügt werden (bspw. im Falle von Channels).

Auch Events haben 2 Funktionsweisen. In diesem Fall einmal "blocking" und einmal "nonBlocking" dies kann per public setBlocking(bool $blocking): void auf dem Events-Objekt gesetzt werden.
Blocking true steht hierbei für die Variante, dass beim Aufruf von $events->poll() der Prozess geblockt wird bis ein Event verfügbar ist.
Die Zeit für das Blocking kann aber per public setTimeout(int $timeout): void in Microsekunden konfiguriert werden, heißt wir haben dadurch Kontrolle darüber dass die poll Methode nicht für immer blockt.

Wenn Blocking aber auf false gesetzt ist prüft die poll Methode ob ein Event vorhanden ist und gibt entweder das Event-Objekt oder null zurück.

Fazit

Je nach Anwendungsfall kann einem Multithreading mit Parallel einen großen Vorteil in der Verarbeitung bringen.
Bspw. beim Importieren von größeren Datenmassen über externe APIs. Man könnte jeden Request den man an die API macht Parallel abarbeiten.

Beim Abarbeiten von Requests vom Client benötigt man Multithreading in PHP eher weniger, da jeder Request von einem Client ein eigener PHP Prozess ist.
In manchen Fällen aber kann es auch von Vorteil sein, wenn man anhand des Requests eines Clients externe Daten anfordern muss und Parallel abarbeiten kann / muss.

Auf Github habe ich ein kleines Repository erstellt wo ich 3 einfache Beispiele für die Nutzung der Parallel Extension erstellt habe.

Vorteile von Multithreading

  • bei größeren Datenverarbeitungen oder bspw. bei Algorithmen die länger dauern und nach einander laufen aber eigentlich auch parallel laufen könnten bietet sich Multithreading gut an
  • man kann dadurch die volle Kraft der CPU entfalten
  • Prozesse laufen deutlich schneller

Nachteile von Multithreading

  • macht eine Anwendung komplexer bspw. durch Synchronisierung
  • Debugging könnte erschwert werden
Ismail Özgün Turan am

Kommentare 0