Queue-System

Einleitung

Um unsere Webserver zu entlasten und die Dauer eines Requests für den Kunden so gering wie möglich zu halten, setzen wir bei myposter ein Queue-System ein. Dabei werden bestimmte rechenintensive Teilaufgaben, z.B. die Bildberechnungen, zur Abarbeitung an eigens dafür bereitgestellte Server delegiert und dort ausgeführt. Das Ergebnis der Berechnungen wird über einen Rückkanal bereitgestellt und steht dem aufrufenden Prozess dann wieder zur Verfügung.

Das Prinzip des Message-Queue-Systems ist ein etablierter Standard. Es gibt eine Reihe an Implementierungen dafür, auch das Protokoll, in welchem die Aufträge übertragen werden, ist dabei standardisiert. Wir haben uns für RabbitMQ als Queue-Server entschieden, dieser implementiert nach dem AMQP-Standard ein vollwertiges Queue-System.

 

Komponenten

Eines der Grundmerkmale eines Queue-Systems ist der Ansatz der verteilten Komponenten. Weder Applikation noch Queue-Server noch die für die Abarbeitung der Jobs zuständigen Worker müssen sich auf den selben Servern befinden.

Applikation

Grundlegend ist mit Applikation unsere myposter-Anwendung gemeint, also die Webanwendung, die z.B. den Shop, das CMS, die Administration und die Produktion umfasst. Diese liegen auf mehreren Webservern verteilt. Hier fallen die abzuarbeitenden Jobs an – und werden von der Applikation über die Service-Klassen an den Queue-Server geschickt. Die Jobs sind generell in Queues organisiert – auf jeder Queue registrieren sich die Publisher und Worker-seitig die Subscriber.

Service-Klassen

Die Service-Klassen sind der Teil der Applikation, die die Schnittstellen zum Queue-Server zur Verfügung stellen. Über diese Klassen werden Queue-Jobs abgesetzt – sie dienen also als Schnittstelle zwischen der Applikation und dem Queue-Server. Als Zwischenschicht dient hierbei der Publisher. In ihm ist eine Persistenzschicht als Fallback eingebunden, welche die Messages, sollten sie nicht an den Queue-Server übermittelt werden können, lokal in der Applikationsdatenbank ablegt werden. Über einen regelmäßig ausgeführten Hintergrund-Service werden diese Messages dann wiederum erneut an den Queue-Server geschickt. Eine weitere Absicherung stellt das folgende, von uns über das Strategy-Pattern implementierte, Verhalten dar: ist der Queue-Server nicht verfügbar (Server down, Timeouts usw.) so werden die Jobs sofort über eine Lokale Instanz der Worker-Klassen ausgeführt – hier ist dann allerdings keine parallele Ausführung im Hintergrund möglich.

Queue-Server

Wir haben mehrere eigenständige Server, auf denen der Queue-Server läuft. Dieser nimmt die Job-Aufträge entgegen, persistiert sie und stellt sie den wartenden Workern zur Verfügung. Mehrere Queue-Server laufen hier nach Master-Slave-Prinzip um eine maximale Ausfallsicherheit zu garantieren. Die Jobs werden in Queues organisiert – der Queue-Server übernimmt hierbei die Verteilung der Jobs in die Queues sowie die Priorisierung der Queues.

Worker

Worker sind ständig laufende Prozesse, die auf mehreren eigenständigen Rechnern laufen, anstehende Jobs entgegennehmen und zur Abarbeitung an einen Teil der Applikation weitergeben – oder direkt an Maschinen der Produktion delegieren, z.B. Verpackungsmaschinen. Hierbei laufen die Worker als langlaufende PHP-Prozesse und warten als Subscriber auf einer Socketverbindung auf einen neuen Auftrag durch den Queue-Server. Jobs werden in Queues organisiert, Worker registrieren sich über den Subscriber auf eine bestimmte Queue. Pro Queue kann dabei eine beliebige Anzahl an Workern bereitstehen. Bei Erreichen der Maximallaufzeit werden die Worker beendet und durch einen System-Service (SuperVisor) neu gestartet. Das Gleiche gilt bei Abbruch der Worker durch einen Fehler. Von einem Worker-Typen sind stets mehrere Instanzen gestartet, die alle gleichzeitig und parallel die anstehenden Jobs abarbeiten. Durch eine Änderung der Anzahl dieser Worker-Instanzen ist eine Skalierung je nach Auftragslage und Typ des Jobs möglich.

 

Verwendung bei uns

Nachfolgend sind einige Bespiele zur Verwendung des Queue-Systems bei uns:

  • Erzeugung von Vorschaubildern aus den vom Kunden hochgeladenen Bilder
  • Erzeugung von Bildern aus den vom Kunden erstellten Collagen
  • Generierung der Rechnungsnummern zur Sicherstellung, dass nur ein Prozess zu einem bestimmten Zeitpunkt läuft
  • Rendern aller Bilder für das CMS über verschiedene Renderer in verschiedenen Auflösungen, Größen und Formaten
  • Abschließen von Bestellungen und Erstellen von allen Daten für die Produktion
  • Senden von Aufträgen an die automatische Säge in der Schreinerei
  • Senden von Aufträgen an die Verpackungsmaschinen
  • Abarbeitung von langlaufenden Aufgaben im Hintergrund ohne zeitliche Beschränkung

 

Verwendete Software / Libraries

 

Vor- und Nachteile

Das Queue-System bietet viele Vorteile, bringt aber auch ein paar Nachteile mit sich.

Vorteile

  • Entlastung der Webserver, die die direkten Requests der Kunden bearbeiten
  • Asynchrones Erstellen von Jobs ist möglich (Fire & Forget), wenn die Teilaufgaben dies erlauben und die Applikation nicht in Echtzeit am Ergebnis interessiert ist
  • Parallelisierung von Teilaufgaben und dadurch insgesamt schnellere Abarbeitung
  • Priorisiertes Abarbeiten der Teilaufgaben, z.B. nach Typ der Aufgabe
  • Schnellere Abarbeitung der Requests durch Auslagerung rechenintensiver Teilaufgaben
  • Performantere Abarbeitung der ausgelagerten Teilaufgaben durch spezialisierte Server
  • Skalierung ist jederzeit möglich durch einfaches Starten weiterer Worker-Instanzen, wodurch sogar dynamisches Skalieren möglich ist
  • Lose Kopplung von Applikation und Workern – Worker haben eine klar definierte Abhängigkeit zur Applikation
  • Etablierte Standards sowohl für Datenaustauschformate (JSON), Nachrichtenformate (AMQP) als auch Design-Patterns (Publisher / Subscriber) werden verwendet

 

Nachteile

  • Erhöhter Implementierungsaufwand
  • Hinzukommen weiterer Abstraktionsschichten
  • Debuggen ist weitaus komplizierter
  • Unnötiger Overhead bei Jobs, die sich sehr schnell abarbeiten lassen
  • Mehr Aufwand zur Sicherstellung der Ausfallsicherheit des Systems

 

Architektur

Übersicht

Nachfolgend ist ein einfacher schematischer Aufbau ohne Fallback-Persistenz und -Strategie, ohne Retry-Logik sowie nur für synchrone Messages:

Applikation

Nachfolgend ist ein detaillierterer Aufbau der Applikation inklusive Fallback-Persistenz und -Strategie:

Queue-Server

Nachfolgend ist der relativ einfache Aufbau der Queue-Server:

Worker

Nachfolgend ist der detaillierte Aufbau der Worker, inklusive Retry-Logik:

Ausfallsicherheit, Replikation und Persistenz

Um die Abarbeitung der Aufträge zu garantieren, sind alle Server-Komponenten redundant ausgelegt. Weiterhin gibt es mehrere Mechanismen, die verhindern, dass Queue-Messages verlorengehen:

  • Bei nicht verfügbarem Queue-Server (Queue-Server offline, Netzwerkfehler usw.) greift die Lokale-Strategie, d.h. der Job wird an eine lokale Instanz der Worker delegiert, welche diesen dann im Kontext des aktuellen Requests abarbeiten (keine Parallelisierung, kein Abarbeiten im Hintergrund) – hier wird also der Job auf dem Webserver selbst abgearbeitet
  • Die Queue-Server sind redundant nach dem Master-Slave-Prinzip ausgelegt – einer der Server übernimmt den Job des Masters, bei einem Ausfall springt ein anderer Server ein. Eigentlich ein Master-Master-Prinzip ohne Daten-Replikation, dafür mit einer gemeinsamen Daten-Basis
  • Die Queue-Messages werden auf der Seite des Queue-Servers persistiert, dabei greifen alle Queue-Server auf die gleiche Datenbasis zu. Bei einem Restart, Abbruch oder Master-Switch eines Queue-Servers gehen so keine Messages verloren
  • Bei Verfügbarkeit des Queue-Servers, aber trotzdem auftretenden Fehlern beim Absetzen der Queue-Messages, z.B. bei einem kurzzeitigen Netzwerkausfall, werden die Messages auf dem Webserver in die Datenbank persistiert und über einen Cronjob verzögert noch einmal abgesetzt
  • Von allen Workern laufen mehrere Instanzen verteilt auf mehreren Servern, so wird erstens die Parallelisierung erreicht, zweitens die Skalierung und drittens die Ausfallsicherheit in Bezug auf die Worker-Server. Pro Queue laufen auf den Worker-Servern eine bestimmte Anzahl an Workern
  • Die Anzahl an pro Queue ständig laufender Worker wird auf einem Server durch den SuperVisor (System-Service) sichergestellt. Beendet sich ein Worker, z.B. durch einen Fehler, eine Exception oder durch Erreichen der maximalen Laufzeit, so wird sofort eine neue Instanz gestartet
  • Empfängt ein laufender Worker von außen ein Signal zur Beendigung (SIGTERM), so beendet er seinen aktuellen Job sauber und beendet sich erst dann – diese Signale werden unter anderem beim Deployment geschickt – um einen sauberen Neustart der Worker zu erreichen – ohne dass aktuell abgearbeitete Jobs abgebrochen werden.

 

Asynchrone, synchrone und exklusive Jobs

Je nach Anwendungsfall können Jobs durch die Applikation asynchron, synchron oder exklusiv erstellt werden.

Asynchrone Jobs

Das Ergebnis des abgearbeiteten Jobs spielt für den aktuellen Request keine Rolle, oder die Abarbeitungszeit des Jobs ist zu lang für den aktuellen Request, oder der Zeitpunkt der Ausführung ist unwichtig. Hier wird im Endeffekt nur der Auftrag abgesetzt ohne auf das Ergebnis zu warten (Fire & Forget). Der Queue-Server dient zum Auslagern des Jobs in den Hintergrund. Die Queue (Channel, auf welchem die Publisher Nachrichten schicken und die Subscriber Nachrichten abholen) ist hier nur in eine Richtung aktiv. Die Worker bzw. die Job-ausführenden Services sichern die Ergebnisse selbst, z.B. in die Datenbank, als E-Mail oder in das Filesystem.

Beispiele:

  • Langlaufende statistische Auswertungen
  • Berechnung der Bilder für das CMS. Hier werden extrem viele Jobs generiert, die Laufzeit kann dabei mehrere Stunden erreichen

 

Synchrone Jobs

Das Ergebnis des abgearbeiteten Jobs ist für den aktuellen Request relevant, hier dient der Queue-Server zur Auslagerung und Parallelisierung des Jobs. Die Queue (Channel, auf welchem die Publisher Nachrichten schicken und die Subscriber Nachrichten abholen) ist auch hier nur in eine Richtung aktiv – für den Rückkanal wird eine temporäre Queue erstellt, welche nur für diesen einen Job existiert. Die Worker bzw. die Job-ausführenden Services liefern das Ergebnis an den aufrufenden Service zurück. Das Ergebnis wird hierbei als JSON-serialisierte Payload der Queue-Message zurückgegeben.

Beispiele:

  • Kurz laufende Jobs, die für den Request relevant sind, z.B. die Erzeugung von Vorschaubildern aus den von Kunden hochgeladenen Bildern

 

Exklusive Jobs

Darf ein Job nicht parallel, sondern nur exakt einmal gleichzeitig ausgeführt werden, so kann der Worker bzw. die Queue als exklusiv konfiguriert werden. Das bewirkt, dass sich auf alle Worker auf allen Worker-Servern übergreifend bezogen nur exakt ein einziger Worker auf die Queue binden und damit anstehende Jobs ausführen kann. Das macht bei Zugriffen auf Ressourcen Sinn, auf die zu einer Zeit nur einmal zugegriffen werden kann bzw. darf, z.B. bei Locks, dem Filesystem und Datenbank-Locks. Die Queue (Channel) ist auch hier nur in eine Richtung aktiv, lässt aber nur exakt einen Subscriber zu. Bei synchronen Jobs wird für den Rückkanal eine zweite, temporäre Queue genutzt. Exklusive Jobs können sowohl synchron als auch asynchron umgesetzt werden.

Beispiele:

  • Generierung der Rechnungsnummern um Rechnungsnummern nicht doppelt zu vergeben

 

Persistenz und Fallbacks

Queue-Messages werden je nach auftretenden Fehlerfall an mehreren Stellen im System persistiert.

Persistenz in der Applikation

Der Fall tritt ein, wenn obwohl der Queue-Server erreichbar ist, die Queue-Message nicht übertragen werden konnte. Sie wird daraufhin in der Applikation in der lokalen Datenbank persistiert und durch einen Cronjob später erneut an den Queue-Server übertragen. Die Queue-Message wird hierbei serialisiert in der Datenbank abgelegt.

Persistenz in der Queue

Konnte die Queue-Message erfolgreich an den Queue-Server übermittelt werden, so wird sie dort vom Queue-Server in eine verteilte Datenbank persistiert. Damit stehen Messages, welche noch nicht abgearbeitet worden sind, auch nach einem Neustart, Absturz oder Wechsel des Masters zur Verfügung und gehen nicht verloren.

Fallbacks

Ist der Queue-Server offline oder der Queue-Server konnte nicht erreicht werden, so greift die Lokale-Strategie. Der Job wird von einer lokalen Instanz des Workers im aktuellen Kontext des Requests ausgeführt.

 

Exception-Handling und Retry-Logik

In jedem Worker ist ein eigener Exception-Handler implementiert, welcher bei auftretenden Exceptions, die nicht explizit abgefangen wurden, je nach Konfiguration den aktuellen Job zur erneuten Ausführung wieder in die Queue zurück schreibt. Damit kann der Job unter Umständen doch noch abgeschlossen werden.

Allerdings macht diese Retry-Lösung nur untern bestimmten Umständen Sinn:

  • Der Job wurde zur asynchronen Abarbeitung übergeben, das Ergebnis ist für den aktuellen Request also nicht wichtig
  • Die aufgetretene Exception tritt bei erneuter Ausführung des Jobs nicht noch mal auf, sonst könnte es zu einer Endlosschleife kommen
  • Ein erneutes Ausführen des Jobs macht vom Ergebnis her Sinn

 

Signal-Handling und Beendigung

Wird ein Worker von außen beendet, der Prozess gekillt oder das System heruntergefahren, also eine Beendigung vom System veranlasst ohne dass der Worker seine maximale Laufzeit erreicht hat, so würde normalerweise der momentan abgearbeitete Job unterbrochen werden. Dadurch können inkonsistente Daten entstehen, wenn diese z.B. zu einem gewissen Teil schon persistiert worden sind. Um dieses Problem zu verhindern, fangen die Worker solche Signale ab, und reagieren entsprechend darauf. Bei einem SIGTERM-Signal, welches zum Beispiel im Rahmen des Deployments das Beenden der Worker veranlasst, wird der aktuelle Job noch sauber abgearbeitet und erst dann beendet. Bei einem SIGKILL-Signal (sofortiger Kill des Prozesses) hingegen, beendet sich der Worker sofort.

 

Services

Services stellen in unserer Applikation die Schnittstelle zu applikationsfremden Komponenten her. In Bezug zum Queue-Server werden hier die Publisher instantiiert, über welche man die Jobs der entsprechenden Queue an den Queue-Server übermittelt. Gleichzeitig dienen die Services auf der Gegenseite, also den Workern, wieder als Einstiegspunkt vom Queue-Server zur lokalen Applikation. Eine Queue-Message durchläuft die Service-Klassen also zweimal. Abhängig von der Strategie wird dabei die Message entweder an den Queue-Server übermittelt (Remote-Strategie, innerhalb des Request-Kontextes) oder an den Service der lokalen Applikation zur Abarbeitung delegiert (Lokale-Strategie, CLI-Kontext).

 

Worker

Die Worker sind bei uns als Symfony-Console-Commands umgesetzt und werden über den System-Service SuperVisor gestartet. Dabei wird durch den Worker das Laufzeitverhalten und das Exception-Handling für die eigentliche abarbeitende Komponente gesteuert.

Konfigurierbares Laufzeitverhalten:

  • maxExecutionTime: maximale Laufzeit, ehe sich ein Worker beendet – unabhängig davon, ob er in dieser Zeit überhaupt einen Job zur Abarbeitung bekommen hat oder nicht
  • runOnce: maximale Anzahl an Jobs die ein Worker abarbeiten soll – ehe er sich beendet
  • Exklusive Ausführung: steuert, ob sich nur exakt ein einziger Worker als Subscriber auf die Queue hängen kann
  • republishMessageOnException: steuert, ob beim Auftreten einer nicht explizit abgefangenen Exception der aktuelle Job erneut in den Queue-Server eingespeist werden soll

 

Da die Worker bei uns als Symfony-Console-Command umgesetzt sind, durchlaufen sie das Standard-Bootstrapping der Symfony-Console-Commands. Im Endeffekt stehen damit alle Bootstrapping-Ressourcen außer den Sessions zur Verfügung.

 

Strategien

Wie im Bereich „Services“ schon beschrieben, wird eine Service-Klasse mehrfach durchlaufen und entscheidet durch das Ergebnis über das Ziel der Queue-Message.

Remote-Strategie

Wenn das Queue-System verfügbar ist, wird über einen Service eine Connection zum Queue-Server erstellt. Die Instanz des Services dient dann als Publisher und die durch die Service-Klasse konfigurierte Queue ist im Queue-Server verfügbar.

Lokale-Strategie

Die Lokale-Strategie trifft zu, wenn einer der im Bereich „Remote-Strategie“ aufgeführten Faktoren nicht zu trifft. Wenn also die aktuelle Strategie zuerst remote ist, aber der aktuelle Job nicht erfolgreich abgearbeitet werden konnte, z.B. das Exception-Handling und die Retry-Logik ist ausgeschaltet, oder die Exception wurde explizit gefangen, wird die Lokale-Strategie verwendet.

 

Mögliche Schwierigkeiten

Sessions

Normalerweise werden Jobs von der Applikation innerhalb eines Requests im Web-Kontext abgesetzt – damit hat dieser Request eine Session. Wird der Job aber durch den Queue-Server auf dem Worker-Server ausgeführt, fehlt dieser Session-Kontext, was in der Programmierung berücksichtigt werden muss.

Web-Kontext vs. CLI

Wie oben erwähnt, erfolgt das Absetzen des Jobs im Web-Kontext der Applikation – damit hat man Zugriff auf den aktuellen Request inklusive der damit verbundenen Informationen wie Session, Headers, Cookies usw. Zur Zeit der Ausführung des Jobs auf den Worker-Servern stehen diese Informationen nicht mehr zur Verfügung, da weder Request- noch Response-Objekt im CLI-Kontext existieren.

Caches

Da die Jobs auf einem anderen Worker-Server abgearbeitet werden, ist bei der Benutzung von Caches, z.B. Redis-Caches, APC-Caches, Filesystem-Caches auf einiges zu achten:

  • Der APC-Cache ist lokal und bezieht sich nur auf die aktuelle Server-Instanz, er unterscheidet sich also vom Cache des Job-aufrufenden Web-Servers
  • Durch den Wechsel des Server-Kontextes befindet man sich auch in einem anderen Filesystem – Zugriffe auf lokale Dateien und Temp-Verzeichnisse enden also in einem anderen Filesystem als beim aufrufenden Web-Server. Eine mögliche Lösung ist die Verwendung von Netzwerkdateisystemen

 

PHP-Prozess und der Debugger

Der Wechsel des Server-Kontextes bringt auch einen Wechsel des aktuellen PHP-Prozesses mit sich. Dadurch wird ein Debuggen (PHP-Storm, XDebug) des Workers extrem erschwert. Hier empfiehlt es sich, während des Implementierens auf die Lokale-Strategie zu wechseln und den Worker damit im aktuellen PHP-Prozess auszuführen.

Wechsel der Applikation

Da zwischen Job-Erzeugung in der Applikation und Job-Abarbeitung ein Wechsel der Server und damit ein Wechsel der Applikation und des PHP-Prozesses stattfinden kann, gibt es unter Umständen Unterschiede, die es zu beachten gilt:

  • Unterschiede in der Konfiguration der lokalen Applikation
  • Unterschiedliche Cache-Zustände, wenn es sich um lokale Caches handelt
  • Andere Zustände der UnitOfWork des Entity-Managers

 

Rückgabe von Objekten und Entities

Ebenso wie die Payload der Queue-Message vom Applikations-Server an den Worker-Server, so werden auch die Responses der Worker-Server zurück an die Applikation als serialisierte Daten übertragen. Damit ließen sich zwar rein theoretisch auch Objekte und Entities übermitteln, jedoch sind diese dem lokalen Entity-Manager unbekannt und müssten vor Verwendung per Refresh in die UnitOfWork des Entity-Managers gemerged werden. Besser und auch einfacher ist es, Entities nicht als Objekte sondern nur deren ID zurückzugeben und applikationsseitig das Entity wieder aus der Datenbank zu laden.