Designing Asynchronous Functions with Go

Who doesn’t love fully controllable asynchronous APIs? This post is about crafting asynchronous functions using Golang’s Context, Channels, and Goroutines.

Introduction

For the purpose of this post, let’s consider a user who is searching for flights on a flight search engine. The user’s query is complex, so the search engine must handle it at two levels:

1. The search engine must query third-party APIs to obtain broad flight listings.
2. The search engine then has to apply complex filters and stream flights to the user.

The Design

Let’s dive right into the asynchronous design. The following function returns immediately with two receive-only channels: one that streams Flight instances and one that carries an error, if any. The caller can cancel the Context to stop the asynchronous function explicitly.

func AsyncListFlights(ctx context.Context, q SimpleQuery) (<-chan Flight, <-chan error) {
    flightsChan := make(chan Flight, 10) // Channel buffers 10 Flight objects
    errorsChan := make(chan error, 1) // Channel buffers only a single error
    go asyncListFlights(ctx, q, flightsChan, errorsChan) // Create a goroutine here
    return flightsChan, errorsChan // Return immediately
}
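
From the caller's side, this two-channel contract could be consumed roughly as follows. This is only a sketch: fakeList and collect are hypothetical names, Flight is a stand-in type (the post never defines it), and fakeList merely imitates the shape of AsyncListFlights. The loop sets a channel to nil once it is closed, so buffered flights are still drained even if the errors channel closes first.

```go
package main

import (
	"errors"
	"fmt"
)

// Flight is a hypothetical stand-in for the post's Flight type.
type Flight struct{ ID int }

// fakeList imitates the shape of AsyncListFlights: it emits n flights and
// then, if failAtEnd is set, a single error before closing both channels.
func fakeList(n int, failAtEnd bool) (<-chan Flight, <-chan error) {
	flightsChan := make(chan Flight, 10)
	errorsChan := make(chan error, 1)
	go func() {
		defer close(flightsChan)
		defer close(errorsChan)
		for i := 0; i < n; i++ {
			flightsChan <- Flight{ID: i}
		}
		if failAtEnd {
			errorsChan <- errors.New("third-party API unavailable")
		}
	}()
	return flightsChan, errorsChan
}

// collect reads from both channels until each one has been closed.
func collect(flightsChan <-chan Flight, errorsChan <-chan error) ([]Flight, error) {
	var flights []Flight
	var firstErr error
	for flightsChan != nil || errorsChan != nil {
		select {
		case f, ok := <-flightsChan:
			if !ok {
				flightsChan = nil // A nil channel is never ready, so this case is skipped from now on
				continue
			}
			flights = append(flights, f)
		case err, ok := <-errorsChan:
			if !ok {
				errorsChan = nil
				continue
			}
			if firstErr == nil {
				firstErr = err
			}
		}
	}
	return flights, firstErr
}

func main() {
	flights, err := collect(fakeList(25, true))
	fmt.Println(len(flights), err) // prints "25 third-party API unavailable"
}
```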

Fetching flights from third-party APIs could be done as below.

func asyncListFlights(ctx context.Context, q SimpleQuery,
        flightsChan chan<- Flight, errorsChan chan<- error) {

    defer close(flightsChan) // Signal the end of stream
    defer close(errorsChan) // Signal the end of stream

    for { // Fetch third-party results as long as is necessary
        select {
        case <-ctx.Done(): // Context was done (timeout, cancel, etc.)
            errorsChan <- ctx.Err() // If canceled, ctx.Err() returns the context.Canceled error
            return
        default:
        }

        flights, err := fetchResultsFromThirdParty(q, 5) // Fetch 5 results from third-party
        if err != nil {
            errorsChan <- err // Publish any error
            return
        }
        if len(flights) == 0 { // Close channels and exit
            return
        }
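        for _, flight := range flights { // Publish Flight instances
            select {
            case flightsChan <- flight:
            case <-ctx.Done(): // Don't block on a full channel once the consumers have stopped
                errorsChan <- ctx.Err()
                return
            }
        }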
    }
}

Briefly, asyncListFlights fetches Flight instances from third-party APIs in small batches and publishes results and errors over the channels. Before each batch it also checks the ctx.Done() channel, and it stops fetching once the Context is canceled or times out.
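
To see the cancellation path end to end, here is a small self-contained sketch. Everything in it is an assumption made for illustration: Flight and SimpleQuery are stand-in types, fetchBatch is a stub that replaces fetchResultsFromThirdParty and never runs dry, and listFlights and takeFlights are hypothetical names. The producer checks the Context both before each batch and while sending, so it cannot stay blocked on a full channel after the caller has stopped reading.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Flight and SimpleQuery are hypothetical stand-ins; the post does not define them.
type Flight struct{ ID int }
type SimpleQuery struct{ Origin, Destination string }

// fetchBatch is a stub for fetchResultsFromThirdParty: it fabricates n
// sequential flights starting at ID start and never returns an empty batch.
func fetchBatch(q SimpleQuery, start, n int) ([]Flight, error) {
	batch := make([]Flight, n)
	for i := range batch {
		batch[i] = Flight{ID: start + i}
	}
	return batch, nil
}

// listFlights streams flights until the context is done.
func listFlights(ctx context.Context, q SimpleQuery) (<-chan Flight, <-chan error) {
	flightsChan := make(chan Flight, 10)
	errorsChan := make(chan error, 1)
	go func() {
		defer close(flightsChan)
		defer close(errorsChan)
		for next := 0; ; next += 5 {
			if err := ctx.Err(); err != nil { // Check before every batch
				errorsChan <- err
				return
			}
			flights, err := fetchBatch(q, next, 5)
			if err != nil {
				errorsChan <- err
				return
			}
			for _, f := range flights {
				select {
				case flightsChan <- f:
				case <-ctx.Done(): // Check while sending, too
					errorsChan <- ctx.Err()
					return
				}
			}
		}
	}()
	return flightsChan, errorsChan
}

// takeFlights reads want flights, cancels the context, and waits for the
// producer to close its channels. It returns the count and the final error.
func takeFlights(want int) (int, error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	flightsChan, errorsChan := listFlights(ctx, SimpleQuery{Origin: "SFO", Destination: "JFK"})
	got := 0
	for range flightsChan {
		got++
		if got == want {
			cancel()
			break
		}
	}
	for range flightsChan { // Drain leftovers until the producer closes the channel
	}
	return got, <-errorsChan
}

func main() {
	n, err := takeFlights(3)
	fmt.Println(n, errors.Is(err, context.Canceled)) // prints "3 true"
}
```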

That’s it! We’ve already addressed challenge 1. Let’s apply the complex filters to these Flight instances, and publish them to the user now.

func streamFlights(q ComplexQuery, numFlights int) {
    ctx, cancel := context.WithCancel(context.Background()) // Create a cancelable context
    defer cancel() // Prevent context from leaking

    flightsChan, errorsChan := AsyncListFlights(ctx, simplifyQuery(q)) // Start the async func

    var numPublished int32 // Counter to count published flights
    var wg sync.WaitGroup // Create a synchronizing WaitGroup

    for i := 0; i < numWorkers; i++ { // Spawn worker goroutines (numWorkers is assumed to be defined elsewhere)
        wg.Add(1) // Increment WaitGroup counter
        go func() {
            defer wg.Done() // Decrement WaitGroup counter

            for { // Until desired number of Flights are fetched, or the channels are closed
                select {
                case err, ok := <-errorsChan:
                    if !ok { // AsyncListFlights finished, so exit
                        return
                    }
                    if err == context.Canceled { // Another goroutine instance canceled, so exit
                        return
                    }
                    log.Println(err) // Log the error and continue (log.Fatal would exit the whole program)

                case flight, ok := <-flightsChan:
                    if !ok { // AsyncListFlights finished, so exit
                        return
                    }
                    if matchesComplexQuery(flight, q) { // If flight matches complex query
                        publishFlightToUser(flight) // Publish the flight to user
                        // Increment atomically and compare the returned value, so exactly one worker sees the limit
                        if atomic.AddInt32(&numPublished, 1) == int32(numFlights) { // Published enough flights
                            cancel() // Stop fetching flights
                            return
                        }
                    }
                }
            }
        }()
    }
    wg.Wait() // Wait for all worker goroutines to finish
}

The streamFlights function calls AsyncListFlights to start the third-party querying. It then spawns a few worker goroutines and uses a sync.WaitGroup to wait for all of them in wg.Wait(). Each worker listens to the flightsChan and errorsChan channels, and exits when these channels are closed. When a worker receives a Flight instance, it matches it against the ComplexQuery and, on a match, publishes it via publishFlightToUser. After each publish, the numPublished counter is incremented atomically, and once it equals the desired numFlights, the Context is canceled. Once the Context is canceled, asyncListFlights makes no more third-party API calls and exits after closing the channels, which in turn causes the workers to stop.
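
One thing to watch with several workers is that more than one of them can be mid-publish when the limit is reached. A possible variation, sketched below under stated assumptions, reserves a slot with the atomic counter before publishing, so the number of published flights never exceeds the limit. The names publishUpTo and Flight are hypothetical, the even-ID check stands in for matchesComplexQuery, and appending to a slice stands in for publishFlightToUser. In this sketch the Context only stops the workers; in the post's design the same Context would also be passed to AsyncListFlights.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

// Flight is a hypothetical stand-in for the post's Flight type.
type Flight struct{ ID int }

// publishUpTo fans flights out to numWorkers goroutines and publishes at most
// limit matching flights.
func publishUpTo(flightsChan <-chan Flight, numWorkers int, limit int32) []Flight {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	var (
		reserved  int32      // Number of publish slots handed out so far
		mu        sync.Mutex // Guards published
		published []Flight
		wg        sync.WaitGroup
	)
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				select {
				case <-ctx.Done(): // Another worker hit the limit
					return
				case f, ok := <-flightsChan:
					if !ok { // Stream finished
						return
					}
					if f.ID%2 != 0 { // Stand-in filter: keep even IDs only
						continue
					}
					slot := atomic.AddInt32(&reserved, 1) // Reserve before publishing
					if slot > limit {
						return // All slots are taken, so drop this flight
					}
					mu.Lock()
					published = append(published, f)
					mu.Unlock()
					if slot == limit { // Exactly one worker takes the last slot
						cancel()
						return
					}
				}
			}
		}()
	}
	wg.Wait()
	return published
}

func main() {
	flightsChan := make(chan Flight, 100)
	for i := 0; i < 100; i++ {
		flightsChan <- Flight{ID: i}
	}
	close(flightsChan)
	fmt.Println(len(publishUpTo(flightsChan, 4, 7))) // prints "7"
}
```

Which seven flights get published depends on goroutine scheduling; only the count is fixed.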

And there you have it: fully controllable asynchronous functions in Golang.

Conclusion

Context, Channels, and Goroutines are powerful concepts in Golang, and together they make it possible to build streaming functions that the caller can stop at any time.


Thoughts are appreciated. :)