CockroachDB

GOncurrency – Nebenläufigkeit in Go

Einleitung

Bereits vor einigen Wochen habe ich einen Beitrag über den Einstieg in die Programmiersprache Go verfasst. In diesem Beitrag möchte ich daran anknüpfen und die Nebenläufigkeit in Go etwas genauer unter die Lupe nehmen. Außerdem möchte ich ein paar Beispiele zeigen wie die Konzepte genutzt werden können.

Eines der Ziele von Go ist, Nebenläufigkeit in die Syntax der Sprache zu integrieren. Das Starten und Synchronisieren von nebenläufigen Prozessen soll möglichst einfach sein. Gleichzeit sollen diese Prozesse problemlos miteinander kommunizieren können.

In Go wird Nebenläufigkeit durch “Goroutinen” umgesetzt. Diese sind leichtgewichtige Threads. Das Synchronisieren erfolgt über “WaitGroups” oder Channels, wobei letztere auch zur Kommunikation zwischen den Goroutinen genutzt werden.

Hinweis

In diesem Beitrag werden keine Variablen mit ihren explizit Typen deklariert. Stattdessen werden short variable declarations verwendet um den Code übersichtlich zu halten.

Grundlagen der Nebenläufigkeit

Nebenläufigkeit lässt sich in Go sehr einfach abbilden. Wenn eine Funktion mit dem Keyword go aufgerufen wird, wird sie nebenläufig ausgeführt. Ein sehr einfaches Beispiel könnte so aussehen:

func main() {
     // Das Keyword "go" zeigt an, dass die Funktion als Goroutine gestartet wird.
     go greeterFunction("world")

// Goroutines können auch von anonymen Funktionen aus gestartet werden.
go func() {
    time.Sleep(time.Millisecond*500)
    fmt.Print("foo bar\n")
} ()

// Ruhezustand bis die Goroutines fertig sind.
time.Sleep(time.Millisecond*2000) 
}

func greeterFunction(name string) {
     time.Sleep(time.Millisecond*1000)
     fmt.Printf("hello %s\n", name)
 }

Bei diesem Beispiel gibt es noch Verbesserungspotenzial. Das Programm würde sich schließen sobald der Haupt-Thread (die “main”-Funktion) durchgelaufen ist. Es würde keine Rücksicht darauf genommen werden, dass noch Goroutinen gestartet sind. Ein Ruhezustand (“warten”) ist in der Realität keine geeignete Lösung, da die Laufzeit der nebenläufigen Methode eigentlich nie bekannt ist. Es muss also die Möglichkeit geben, Goroutinen zu synchronisieren.

Synchronisieren von Goroutinen

Synchronisieren mithilfe von “WaitGroups”

Eine Möglichkeit sind sog. “WaitGroups”. Diese sind einfache Zähler, die die Anzahl ausgeführter Goroutinen zählen. “WaitGroups” eignen sich besonders dann, wenn die nebenläufige Operation kein Ergebnis zurückgeben soll. Sie erlauben das Zusammenfassen von Goroutinen in Gruppen. Anschließend kann darauf gewartet werden, dass alle Goroutinen einer Gruppe ihre Arbeit beendet haben und der Zähler somit auf 0 fällt.

// Erstellen einer neuen WaitGroup
var waitGroup sync.WaitGroup

func main() {
    // Erstellen einer WaitGroup, erhöhe Anzahl aktiver Worker um 1
    waitGroup.Add(1)

    // Starten der Goroutine
    go greeterWaitgroupFunction("world")

    // Warten bis der Zähler wieder auf 0 zurückgesetzt wird
    waitGroup.Wait()
}

func greeterWaitgroupFunction(name string) {
    time.Sleep(time.Millisecond*1000)
    fmt.Printf("hello %s\n", name)

    // Zähler in der WaitGroup um 1 reduzieren
    waitGroup.Done()
}

Statt einem stupiden “Warte x Sekunden” wird die main-Funktion durch das Aufrufen der Wait-Methode solange geblockt, bis der Zähler in der WaitGroup 0 erreicht.

Synchronisieren mithilfe von Channels

Neben den einfachen “WaitGroups” kann es in der Praxis auch den Fall geben, dass das Ergebnis einer Goroutine wichtig für die Arbeit ist. Zum Beispiel könnte am Anfang einer Funktion ein SQL-Query in einer separaten Goroutine gestartet werden, um an einem späteren Zeitpunkt auf das Ergebnis zuzugreifen. Go bietet dafür sog. Channels. Diese sind reservierte Speicherbereiche die zum schnellen Austausch von Daten zwischen Goroutinen verwendet werden können.

func main() {
    // Erstellen eines neuen Channels für den Datentyp "string"
    nameChannel := make(chan string)

    // Starten der Goroutine, Weitergabe unseres Channels zum Erhalten der Antwort
    go greeterChannelFunction("world", nameChannel)

    // Blockt solange, bis ein String aus dem Channel empfangen wird.
    // Schreibt den empfangenen String in die Variable "name"
    name := <- nameChannel

    // Schließen des Channels, ausgeben des Namens
    close(nameChannel)
    fmt.Printf("hello %s\n", name)
}

func greeterChannelFunction(name string, nameChannel chan string) {
    time.Sleep(time.Millisecond*1000)

    // Senden des Namens aus den Funktionsparametern über den mitgegebenen Channel
    nameChannel <- name
}

In dem Beispiel wurde ein sog. “unbuffered”-Channel (dt. “ungepufferter Kanal”) benutzt um zwei Goroutinen zu synchronisieren. Bei einem unbuffered-Channel wird die sendende Goroutine geblockt bis die Nachricht aus dem Channel konsumiert wurde. Bei einem sog. “buffered”-Channel (dt. “gepufferter Kanal”) kann eine gewisse Anzahl an Elementen in den Channel gesendet werden (buffersize), bis die sendende Goroutine geblockt werden würde. In der offiziellen Dokumentation sind weitere Informationen zu finden.

Lauschen auf mehreren Channels

Manchmal ist es notwendig auf mehreren Channels zu lauschen und beim Empfangen von Daten auf den jeweiligen Channels gewisse Aktionen auszuführen. Go bietet dafür sog. select-Statements. Diese blocken die Goroutine so lange, bis einer der angebenen Channels eine Nachricht sendet.

func main() {
    // Erstellen der Channels  für die jeweiligen Namen
    aliceChannel := make(chan string)
    bobChannel := make(chan string)
    eveChannel := make(chan string)

    // Starten von 3 Goroutinen die eine zufällige Zeit warten, bevor sie ihren Namen in einen Channel schreiben
    go greeterSelectFunction("alice", aliceChannel)
    go greeterSelectFunction("bob", bobChannel)
    go greeterSelectFunction("eve", eveChannel)

    // Warten auf die erste Antwort aus einem der Channels
    fmt.Println("Waiting for first returning hello ...")
    select {
        case alice := <-aliceChannel:
            fmt.Printf("hello %s\n", alice)
        case bob := <-bobChannel:
            fmt.Printf("hello %s\n", bob)
        case eve := <-eveChannel:
            fmt.Printf("hello %s\n", eve)
    }

    // Schließen der Channels
    close(aliceChannel)
    close(bobChannel)
    close(eveChannel)
}

func greeterSelectFunction(name string, nameChannel chan string) {
    // Eine zufällige Zeit warten
    time.Sleep(time.Second*time.Duration(rand.Intn(5)))

    // Senden des Namens aus den Funktionsparametern über den mitgegebenen Channel
    nameChannel <- name
}

Arbeit verteilen mit dem Worker-Pattern

Das Worker-Pattern ist weit verbreitet. Durch die vorgestellten Features der Sprache ist die Umsetzung in Go sehr einfach. In diesem Beispiel sollen Goroutinen für das nebenläufige Verarbeiten von Aufträgen gestartet werden. Um das Pattern zu verstehen, muss man wissen, dass die Go-Runtime die Goroutinen gleichmäßig auf die vorhandenen CPU-Threads verteilt. Dies ist eine ideale Vorraussetzung für das Worker-Pattern.
In dem Beispiel werden Channels als Input bzw. Queue für die anstehenden Aufgaben (“Jobs”) verwendet. Außerdem werden “WaitGroups” benutzt, damit sich die main-Funktion nicht schließt solange mindestens einer der Worker-Threads noch arbeitet.

var workerWG sync.WaitGroup

func main() {
    // Erstellen eines Channels der die gesamte "Arbeit" beinhaltet
    jobChannel := make(chan int)

    // Starte doppelt soviele Go-Routinen wie Threads vorhanden sind
    // Für jeden Thread wird der Zähler der Worker-Waitgroup hochgezählt
    for i := 0; i < runtime.NumCPU()*2; i++ {
        workerWG.Add(1)
        go workerRoutine(i, jobChannel)
    }

    // Senden der ersten 100 Zahlen an die Worker
    for i := 0; i < 100; i++ {
        jobChannel <- i
    }

    // Channel schließen und warten bis alle Routinen ihre Arbeit verrichtet haben
    close(jobChannel)
    workerWG.Wait()
}

func workerRoutine(index int, channel chan int) {
    // So lange der Channel geöffnet ist wird auf Arbeitsaufträge gelauscht
    for number := range channel {
        // Eine zufällige Zeit warten, um etwas Workload zu simulieren
        time.Sleep(time.Millisecond*time.Duration(rand.Intn(250)))

        // Die empfangene Zahl ausgeben
        fmt.Printf("echoing %d from worker #%d\n", number, index)
    }

    // Channel wurde geschlossen und der Worker hat die Arbeit beendet
    workerWG.Done()
}

Schlusswort

Ich hoffe mit diesem Artikel einen kleinen Einblick in die Konzepte für Nebenläufigkeit in Go gegeben haben zu können. Der gesamte hier gezeigte Code ist auch auf Github verfügbar. Weitere Beispiele finden sich noch in der Tour of Go. Ich freue mich auf Kommentare und Rückfragen.

Ähnliche Beiträge

Schreibe einen Kommentar

Deine E-Mail-Adresse wird nicht veröffentlicht. Erforderliche Felder sind mit * markiert.

Bitte füllen Sie dieses Feld aus
Bitte füllen Sie dieses Feld aus
Bitte gib eine gültige E-Mail-Adresse ein.

Menü