Fast Data – Blogserie

Dies ist der vierte und letzte Teil der Blogserie „Aufbau einer Fast Data Analytics Plattform“ zum Thema Realtime Prediction. Im ersten Teil „Bereitstellung einer Fast Data Analytics Plattform“ sind wir in die Rolle eines Taxiunternehmens geschlüpft, welches Taxi-Daten in Echtzeit sammeln und diese auf einem Realtime-Dashboard visualisieren möchte. Im zweiten Teil Realtime Datenverarbeitung mit Spark Structured Streaming wurde die Integration der Taxi-Daten in Echtzeit durch die Bereitstellung eines Streaming Hubs ermöglicht. Der dritte Teil Realtime Datenverarbeitung mit Spark Structured Streaming” zeigt die Datenverarbeitung und Bereitstellung der Taxi-Daten durch den Processing Hub bzw. Serving Hub.

Damit ist unsere Fast Data Analytics Plattform vollständig aufgebaut und bietet eine optimale Basis für Data Science und KI-Analysen. Doch bevor wir tiefer in mögliche Analysen einsteigen, betrachten wir zuvor noch, welche Ausgangsfrage mit Data Science beantwortet werden soll. Unsere Taxifachabteilung konnte feststellen, dass in einigen Stadtteilen zu gewissen Zeiten zu wenige Taxis zur Verfügung stehen. Um die Taxis zukünftig besser koordinieren zu können, müssen die anfallenden Fahrten in einem bestimmten Stadtteil bestimmt werden. Das Ziel ist es somit mit einem geeigneten Machine Learning (ML) Algorithmus, die Anzahl der Taxifahrten für jede Taxizone in New York pro Stunde vorherzusagen und in ein Dashboard zu übertragen.

Jedes Data Science-Projekt, so wie auch dieses Projekt, entsteht bei der MT AG nach dem CRISP-DM Vorgehensmodell. Bestehend aus den Phasen Business Understanding, Data Understanding, Data Preparation, Modeling, Evaluation und Deployment bilden diese einen Zyklus. Im Folgenden betrachten wir die essenziellen Schritte dieses Vorgehens. Zur Bearbeitung und Analyse der Daten im Big Data Umfeld wurden Zeppelin Notebooks mit der Programmiersprache Python und dem Framework PySpark ML eingesetzt.

Datenexploration

Noch bevor ein erstes Machine Learning-Modell entwickelt wird, gilt es die zur Verfügung stehenden Daten besser zu verstehen. Das kann mithilfe einer Datenexploration erreicht werden. Die daraus resultierenden Ergebnisse führen häufig zu neuen Erkenntnissen und auch weiterführende Schritte, wie z.B. abgeleitete Features, können in dieser Phase entstehen.

Ein exploratives Beispiel für die kumulierte Anzahl der Fahrten in ganz New York sieht man in Abbildung 1.

Abbildung 1: Fahrten nach Wochentag
Abbildung 1: Fahrten nach Wochentag

Auf der x-Achse sind die Wochentage und auf der y-Achse die kumulierte Anzahl der Fahrten aufgetragen. Es wird deutlich, dass am Donnerstag und Freitag mit den meisten Fahrten zu rechnen ist. Die wenigsten Fahrten werden hingegen am Sonntag getätigt. Aber wie die folgende Abbildung 2 zeigt, ist nicht nur der Wochentag, sondern auch die Uhrzeit entscheidend.

Abbildung 2: Fahrten nach Tageszeit pro Wochentag
Abbildung 2: Fahrten nach Tageszeit pro Wochentag

Wir sehen auf der x-Achse die Uhrzeiten und die Anzahl der Fahrten auf der y-Achse, wieder für ganz New York. Gruppiert ist die Grafik zusätzlich noch nach Wochentag (farblich markiert siehe Abbildung 2). Zu erkennen ist, dass zwischen 18 und 19 Uhr mit den meisten Taxifahrten zu rechnen ist. Das gilt sowohl für das Wochenende als auch für die Wochentage. Der typische Feierabendverkehr ist damit auch in New York zu beobachten. Anders verhält es sich nachts in der Zeit von 0 bis 6 Uhr. Unter der Woche verzeichnen wir nur wenige Fahrten. Am Wochenende finden dagegen vermehrt Fahrten in diesem Zeitraum statt. Vermutlich handelt es sich dabei um Personen, die nach ihren Abendveranstaltungen den Heimweg antreten.

Ein weiterer Faktor, der die Anzahl der Fahrten beeinflusst, ist sicherlich der jeweilige Stadtteil, in dem die Taxifahrt beginnt bzw. endet. Um dieses Verhalten näher zu untersuchen wurde eine Kartenvisualisierung von NYC erstellt (vgl. Abbildung 3).

Abbildung 3: Kartenvisualisierung von NYC - (links) Zonen mit den meisten Pick-Ups (rechts) Zonen mit den meisten Drop-Offs
Abbildung 3 – Kartenvisualisierung von NYC – (links) Zonen mit den meisten Pick-Ups (rechts) Zonen mit den meisten Drop-Offs

Die Anzahl der Taxifahrten ist mithilfe einer Heat Map in den einzelnen Gebieten farblich gekennzeichnet. Auf den ersten Blick erkennt man, dass die meisten Fahrten im Stadtteil Mannhatten stattfinden [siehe blaue Umrandung]. Aber wieso finden in den anderen Stadtgebieten, bis auf einige Ausnahmen, kaum Fahrten statt? Das liegt daran, dass wir für dieses Szenario nur Yellow Taxis betrachten. Diese haben das Privileg in Mannhatten Passagiere befördern zu dürfen. Dieses nutzen sie auch primär, wie man den Daten entnehmen kann. Kommen wir zu den Stadtgebieten die rot umrandet sind und nicht zu Manhattan gehören. Diese stellen die drei Flughäfen von NYC (Newark, La Guardia und JFK [von links nach rechts]) dar und befördern ebenfalls viele Fahrgäste.

Wie wir durch die Datenexploration herausfinden konnten, haben die Uhrzeit, der Wochentag und der jeweilige Stadtteil erheblichen Einfluss auf die aufkommenden Taxifahrten. Daher werden wir die genannten Features auch in unser ML Modell mit einfließen lassen.

Modellierung

Kommen wir nun zur Erstellung des Machine Learning-Modells. Dazu setzen wir ein sogenanntes ML Pipeline Modell ein, welches im Spark MLlib Framework zu finden ist. Dabei handelt es sich um eine Standardisierung der Merkmalsverarbeitung und Modellerstellung, um verschiedene Verarbeitungsschritte in einer Pipeline zusammenzufassen und abzuspeichern. Das Konzept ist an das scikit-learn Framework angelehnt, und könnte einigen Leser bereits bekannt sein.

Wie das Framework und das Pipeline Modell anzuwenden sind, verdeutlichen wir in den nachfolgenden Code-Snippets bzw. Listings. Zunächst definieren wir alle Transformationen, die auf unseren Grunddaten vorgenommen werden müssen. Denn in der Regel kann ein ML Algorithmus nicht direkt die Daten verwerten, sondern braucht geeignete Datentypen.

1.	from pyspark.ml.feature import StringIndexer, VectorIndexer, VectorAssembler  
2.	# Create list of columns that are indexed  
3.	indexers = [StringIndexer(inputCol=column, outputCol=column+"_index").fit(df_agg) for column in list(set(df_agg.columns)-set(['day','weekday','month','year','hour','quarter','count']))]  
4.	# Create a vector column   
5.	assembler = VectorAssembler(inputCols =['day','weekday','month','year','hour','PULocationID_index','borough_index','quarter'], outputCol = "features")  
6.	# This class identifies categorical features, and index them.  
7.	featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=266)   

Listing 1: Definition Transformation

In Zeile 3 definieren wir eine Liste von „StringIndexer“. Die Methode codiert Spalten mit dem Datentyp String zu numerischen Indizes. Dadurch wird der Umgang mit kategorialen Variablen ermöglicht. Man beachte, dass es sich bei dem Aufruf, um eine List Comprehension in Python handelt. In Zeile 5 werden alle numerischen Columns innerhalb eines Vektors vereint und in die neue Spalte „features“ gespeichert. Die Zeile 7 identifiziert alle kategorialen Variablen in einem Vektor und speichert den neuen Vektor in Spalte „indexedFeatures“ ab. An dieser Stelle könnten sogar noch weitere Transformationen definiert werden, wie ein StandardScaler oder Bucketizer.

Der zweite Teil beschäftigt sich zunächst mit der Aufteilung unserer Daten in Trainings und Testdaten.

1.	from pyspark.ml import Pipeline  
2.	from pyspark.ml.regression import DecisionTreeRegressor  
3.	# Create training and test dataset  
4.	(trainingData, testData) = df_agg.randomSplit([0.7, 0.3])  
5.	# Configure a decision tree model 
6.	dt = DecisionTreeRegressor(labelCol="count", featuresCol="indexedFeatures", impurity="variance", maxDepth=5, maxBins=266)  
7.	# Build a pipeline  
8.	pipeline = Pipeline(stages=indexers + [assembler, featureIndexer, dt])  
9.	# Train a decision tree model.  
10.	model = pipeline.fit(trainingData)  

Listing 2: Training Pipeline Modell

Zufällig werden die Daten auf 70 % Trainingsdaten und 30 % Testdaten verteilt (Zeile 4). Die Trainingsdaten werden zum Lernen des ML Modells benötigt. Mit den Testdaten wird im Abschnitt Evaluation die Modellgüte überprüft und verglichen.

Nun kommen wir zu unserem ML Algorithmus. Nachfolgend zeigen wir das Vorgehen beispielsweise anhand eines Entscheidungsbaumes bzw. eines Decision Tree Regressors. (Zeile 6). Das Attribut „labelCount“ legt dabei die Zielvariable fest. Mit dem Attribut „featuresCol“ geben wir den Vektor an, den wir zuletzt mit den Transformationsschritten erzeugt haben. Alle weiteren Angaben in dieser Zeile beziehen sich auf die Konfiguration des Entscheidungsbaums, wie z. B. der maximalen Tiefe (maxDepth).

Nachdem nun alle notwendigen Schritte definiert wurden, kann das Pipeline Modell erstellt werden (Zeile 8). Das Attribut „Stages“ definiert dabei die verschiedenen Verarbeitungsschritte, die in der angegebenen Reihenfolge ausgeführt werden. In Zeile 10 stößt die Methode „fit(…)“ den Lernprozess mithilfe der Trainingsdaten an und gibt am Ende ein fertiges ML Model zurück. Der große Vorteil von Pipelining ist, dass wir Zeile 6 durch einen beliebigen anderen Regressionsalgorithmus ersetzen können, ohne die vorherigen Schritte anpassen zu müssen.

Evaluation

Dieser Abschnitt beschäftigt sich mit der Modellgüte. Um diese zu bewerten, gehen wir zuerst darauf ein, wie eine Fehlermetrik berechnet werden kann.

1.	from pyspark.ml.evaluation import RegressionEvaluator    
2.	# Evaluation Metrics
3.	predictions = model.transform(testData)  
4.	evaluator = RegressionEvaluator(labelCol="count", predictionCol="prediction", metricName="mae")  
5.	mae = evaluator.evaluate(predictions)  
6.	print "Mean Absolute Error (MAE) on test data = %g" % mae  

Listing 3: Auswertung Fehlermetrik

Dazu werden Vorhersagen (Realtime Predictions) für den Testdatensatz mit dem zuvor erstellten Modell erzeugt. Anschließend vergleichen wir die bekannte Anzahl der Taxifahrten aus dem Testdatensatz mit den Ergebnissen aus der Vorhersage. Dadurch können wir feststellen, wie gut unsere Vorhersage funktioniert bzw. wie stark sie vom tatsächlichen Wert abweicht. Solche Vorhersagen lassen mit der „transform(…)“ Methode´, wie in Zeile 3 realisieren. Das Ergebnis („predictions“) ist ein DataFrame mit den originalen Daten aus dem Testdatensatz, plus der Spalte „prediction“, die unsere Vorhersage beinhaltet. Die Klasse „RegressionEvaluator“ kann im Anschluss die gewünschte Fehlermetrik definieren (Zeile 4), berechnen (Zeile 5) und auch ausgeben (Zeile 6). Bei der Fehlermetrik handelt es sich um den Mean Absolute Error (MAE). Auch weitere Fehlermetriken, wie z. B. der MSE oder RMSE können auf ähnliche Weise berechnet werden.

Kommen wir nun zu den Ergebnissen, die unsere ML Modelle erzielen können. Natürlich haben wir nicht nur den Entscheidungsbaum aus dem vorherigen Beispiel (Abschnitt Modellierung) bewertet, sondern auch noch weitere Regressionsmodelle eingesetzt. So kamen auch der Random Forest und Gradient Boosted Trees Algorithmus zum Einsatz. Die berechnete Modellgüte ist in Abbildung 4 dargestellt.

Abbildung 4: Vergleich der Fehlermetriken: Mean Absolute Error (MAE)
Abbildung 4: Vergleich der Fehlermetriken: Mean Absolute Error (MAE)

Umso kleiner der MAE Wert ist, umso besser ist der Algorithmus einzuschätzen. In unserem Fall schneidet der Gradient Boosted Trees am besten von allen ML Modellen ab. Der berechnete MAE von 12.454 kann dabei wie folgt interpretiert werden: „Das GBT Modell verschätzt sich durchschnittlich um  12 Fahrten“. Wenn man berücksichtigt, dass für einige Stadtteile über 600 Fahrten pro Stunde anfallen, ist dieser Fehler durchaus akzeptabel.

Deployment

Nach der Fertigstellung des ML Modells und der Bewertung, gilt es noch das Modell in eine produktive Umgebung zu bringen.

1.	# Save pipeline model 
2.	model.save("...")  
3.	# Load pipeline model 
4.	from pyspark.ml import PipelineModel  
5.	pip_model = PipelineModel.load("...")  

Listing 4: Pipeline Modell sichern und laden

Dazu speichern wir das Modell, wie in Zeile 2 gezeigt, im Storage HUB ab. In der neuen Entwicklungsumgebung, wie der Spark Structured Streaming Applikation im Proccessing Hub (siehe Blogartikel Teil 3), kann das Pipeline Modell problemlos reingeladen und direkt angewendet werden. Die Spark Applikation kann dabei sogar in einer anderen Programmiersprache, wie Java oder Skala geschrieben sein. Der Aufruf des Modells erfolgt dabei analog zu Zeile 5. Die Klasse „PipelineModel“ beinhaltet die bereits bekannte Methode „transform(…)“ , um neue Datensätze vorherzusagen.

Damit haben wir erfolgreich ein Prediction Modell zur Realtime Vorhersage eingebunden, welches bestimmt wie viele Taxifahrten in den jeweiligen Bezirken von New York anfallen werden. Diese neuen Werte können wir mittels Power BI zusätzlich zu den anderen Daten in einem Dashboard integrieren.

Fazit

Mit dem entwickelten ML Modell für Fast Data Analytics können wir zuverlässig vorhersagen, wie viele Taxis zu einem bestimmten Zeitpunkt gebraucht werden. Erreichbar ist unser Ergebnis durch den Einsatz eines Gradient Boosted Trees Algorithmus. Vor allem durch die Datenexploration konnten wichtige Eigenschaften identifiziert werden, die unser Modell erheblich verbessern. Der Einsatz eines Pipeline Modell erlaubt einen schnellen und einfachen Einsatz in der Produktion und ist ideal für eine Realtime Predictions geeignet.



Fast Data – Blogserie Teil 1:

Fast Data – Blogserie Teil 2:

Fast Data – Blogserie Teil 3:

Kostenlose Downloads rund um das Thema IT und Digitalisierung

Keine Kommentare vorhanden.

    Schreibe einen Kommentar

    Deine E-Mail-Adresse wird nicht veröffentlicht. Erforderliche Felder sind mit * markiert.