Stream Data to Google BigQuery with Apache Beam
Author :: Kevin Vecmanis
In this post I walk through the process of handling unbounded streaming data using Apache Beam, and pushing it to Google BigQuery as a data warehouse.
In this article you will learn:
- How to stream multi-attribute data to Google Pubsub
- How to create an Apache Beam pipeline to process streaming multi-attribute data
- The structure around Apache Beam pipeline syntax in Python
- How to output the data from Apache Beam to Google BigQuery
Table of Contents
- Introduction
- Creating the mock stock price stream
- Building the Apache Beam data pipeline
- Ingesting Data From Different Sources
- Diving Deeper into Apache Beam
- Summary
Introduction
Data engineers deal with two main types of data processing when they’re designing ETL pipelines: batch and stream. The key distinction is that batch processing is done on data sources that are fixed and finite in size (importing data from a .csv file, for example), whereas streams are unbounded data. This could be something like temperature data from an IoT sensor that sends an updated value every 5 seconds and never ends.
In this post I’m going to stream mock second-by-second stock data using Apache Beam, with Google Cloud Dataflow as the runner. Apache Beam is a nice SDK, but the methodology and syntax take some getting used to. I’m going to do the best I can to explain it if you’re unfamiliar.
The output of our data pipeline is going to dump into Google BigQuery - a powerful data warehouse that facilitates all kinds of analysis. I hope you enjoy this!
Creating the Mock Stock Price Stream
To create our stock market price stream, I’ve created a simple class that produces a random-walk stock price each time its __next__ method is called.
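The class itself isn’t reproduced here, so here is a minimal sketch of what it could look like. The name StockGenerator comes from later in this post, but the parameters and implementation below are assumptions:
import random


class StockGenerator:
    # A hedged sketch: generates a random-walk price one step at a time.
    def __init__(self, start_price=100.0, mu=0.0, sigma=0.1):
        self.price = start_price
        self.mu = mu          # drift of each step
        self.sigma = sigma    # volatility of each step

    def __next__(self):
        # Nudge the prior price by a normally distributed increment and return it.
        self.price += random.gauss(self.mu, self.sigma)
        return self.price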
Calling the __next__ method will produce a new value of the stock based on the prior value, varying according to a distribution defined by the mu and sigma that you specify. The output looks like this:
New stock value: 100.0605626178124
New stock value: 100.16591843889884
New stock value: 100.28548431538503
New stock value: 100.24684703140774
New stock value: 100.47129110727234
New stock value: 100.71474859108991
New stock value: 100.84707200326982
New stock value: 101.03516354077385
New stock value: 101.23282725242157
New stock value: 101.33349858811373
New stock value: 101.4227323808079
.
.
.
For this tutorial, what we want to do is run this stock price generator indefinitely in a loop and have it publish the stream to Google Pub/Sub. I’ve created the following module to do this (a sketch of such a module follows the list below), which can be run locally if you have the gcloud SDK installed, or from the Cloud Shell on Google Cloud Console.
This module does the following:
- Creates an instance of StockGenerator as the variable stock_price.
- In an indefinite loop:
  - Pauses for 1 second to mimic second-by-second stock data.
  - Gets a randomly generated stock value.
  - Creates a timestamp.
  - Packages these two entries in a dictionary.
  - Dumps the dictionary to a JSON object.
  - Encodes the JSON object.
  - Publishes the encoded message to our Pub/Sub topic stock-stream.
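The module itself isn’t shown above, so here is a minimal sketch of what it could look like, assuming the google-cloud-pubsub client library and a hypothetical stock_generator module holding the class sketched earlier. The project and topic names come from the run.sh script introduced later in this post; everything else is an assumption.
#publish_stock_stream.py (hypothetical file name)
#A hedged sketch of the publisher module described in the list above.
import json
import time
from datetime import datetime

from google.cloud import pubsub_v1

from stock_generator import StockGenerator  # hypothetical module holding the class sketched above

PROJECT_ID = 'vanaurum'        # matches the project used in run.sh below
TOPIC_NAME = 'stock-stream'

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC_NAME)

stock_price = StockGenerator()

while True:
    time.sleep(1)                                    # mimic second-by-second stock data
    message = {
        'timestamp': datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S'),
        'value': next(stock_price),                  # randomly generated stock value
    }
    data = json.dumps(message).encode('utf-8')       # dump to JSON and encode
    publisher.publish(topic_path, data)              # publish to the stock-stream topic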
This will run until we shut it off - which means it’s “unbounded”. The next thing we’re going to do is build out our Apache Beam data pipeline for processing this stream.
Building the Apache Beam Data Pipeline
I want to cover some introductory ground regarding the Python syntax for Apache Beam - the code that follows will make more sense if I do.
The Apache Beam website says this about the Apache Beam SDK:
Apache Beam is an open source, unified model for defining both batch and streaming data-parallel processing pipelines. Using one of the open source Beam SDKs, you build a program that defines the pipeline. The pipeline is then executed by one of Beam’s supported distributed processing back-ends, which include Apache Apex, Apache Flink, Apache Spark, and Google Cloud Dataflow.
Beam is particularly useful for Embarrassingly Parallel data processing tasks, in which the problem can be decomposed into many smaller bundles of data that can be processed independently and in parallel. You can also use Beam for Extract, Transform, and Load (ETL) tasks and pure data integration. These tasks are useful for moving data between different storage media and data sources, transforming data into a more desirable format, or loading data onto a new system.
In a nutshell, Apache Beam is a framework that allows you to:
- Get data from somewhere
- Do some things to it
- Dump it somewhere
In a fast, parallel way. Google Cloud Dataflow is a supported runner for Apache Beam jobs, and that’s how we’re going to run the job described in this tutorial. That being said, one of the nice things about Apache Beam is that it’s indifferent to the runner being used - you can swap out the runner for something else entirely just by changing a flag value. You’ll see this when we create our bash script to run this pipeline job.
A common design pattern you’ll see with Apache Beam pipeline scripts is for the pipeline parameters (and other parameters) to be passed as command-line arguments. The argparse library is leveraged here for parsing the command-line arguments passed via a shell script. Before we proceed any further, let’s look at that shell script - the pipeline code will make more sense with this in mind.
#run.sh
#executes our pipeline code with all the required arguments
python -m pipeline.pipeline \
--project vanaurum \
--runner DataflowRunner \
--staging_location gs://vanaurum-stock-stream/staging \
--temp_location gs://vanaurum-stock-stream/temp \
--experiments=allow_non_updatable_job \
--input_mode stream \
--input_topic projects/vanaurum/topics/stock-stream \
--output_table vanaurum:vanaurum.stock_stream
About this script:
- This script will be run either locally, or from Google Cloud Shell.
- It will execute your data pipeline using Dataflow as the runner, and provision everything needed behind the scenes.
- project: The project you want the resources provisioned under.
- runner: The runner to use for executing the pipeline. Here we’re using Google Dataflow.
- staging_location: Required with Google Dataflow - specifies the Google Cloud Storage location where temporary files required for running the pipeline can be stored. Needs to be in the form gs://<your-bucket>/staging.
- temp_location: Similar to staging_location. Required by Dataflow as a folder for storing temporary files.
- experiments: Prevents a known error from arising when Google Dataflow attempts to overwrite existing jobs.
- input_mode: Set to stream if your data source is unbounded (which it is, for our stock example).
- input_topic: The Pub/Sub topic that input data will be pulled from.
- output_table: The table in BigQuery where we will output our streaming stock data.
Now that we’ve introduced this file, let’s look at the main pipeline script that it executes:
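The full listing isn’t reproduced here, so what follows is a minimal sketch assembled from the description in the next section. The transform labels, BigQuery schema, and option handling are assumptions rather than the exact original code.
#pipeline/pipeline.py
#A hedged sketch of the pipeline module - details are assumptions.
import argparse
import json
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions


def decode_message(message):
    # Pub/Sub delivers the payload as bytes; decode it back into a JSON string.
    return message.decode('utf-8')


def parse_json(message):
    # Convert the JSON string into a dictionary with 'timestamp' and 'value' keys.
    return json.loads(message)


def run(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument('--input_mode', default='stream')
    parser.add_argument('--input_topic', required=True)
    parser.add_argument('--output_table', required=True)
    known_args, pipeline_args = parser.parse_known_args(argv)

    options = PipelineOptions(pipeline_args)
    options.view_as(StandardOptions).streaming = True

    with beam.Pipeline(options=options) as p:
        (p
         | 'ReadFromPubSub' >> beam.io.ReadFromPubSub(topic=known_args.input_topic)
         | 'Decode' >> beam.Map(decode_message)
         | 'Parse' >> beam.Map(parse_json)
         | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
               known_args.output_table,
               schema='timestamp:TIMESTAMP, value:FLOAT',  # assumed schema
               write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))


if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    run()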
Breaking This Script Down
Starting with the run() method:
- This is the main entry point for running our pipeline.
- It accepts argv as an argument, which holds the system arguments passed in our run.sh bash script that I showed earlier.
- parser.add_argument populates a known namespace with the options that you pass to it. When you do this, you can use the parser.parse_known_args method to return a two-item tuple of arguments.
- Our pipeline_args get passed as PipelineOptions for configuring our Pipeline, and known_args are going to be used as settings for the rest of our script.
- For more information on argparse, visit the documentation site here.
- The last half of this method is our pipeline transformations. Apache Beam has a pattern that works like this:
[Output PCollection] = [Input PCollection] | [Transform]
Or, when applying multiple transformations to the same input:
[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
| [Second Transform]
| [Third Transform])
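As a small illustration (not from the original post), here is the same chaining pattern with trivial transforms; the step names and values are arbitrary:
import apache_beam as beam

# A toy example of the multi-transform chaining pattern shown above.
with beam.Pipeline() as p:
    squares = (p
               | 'Create' >> beam.Create([1, 2, 3])
               | 'Square' >> beam.Map(lambda x: x * x)
               | 'Print' >> beam.Map(print))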
In our script, we’re applying each step to p:
- Read an input.
- Decode it using decode_message().
- Parse it using parse_json().
- Write it to BigQuery using the provided specifications.
It’s actually a really simple pipeline.
Ingesting Data From Different Sources
Note that our data ingestion line here is the Pub/Sub read step (shown below as it appears in the sketch above):
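#From the hedged pipeline sketch above - the step label is an assumption
| 'ReadFromPubSub' >> beam.io.ReadFromPubSub(topic=known_args.input_topic)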
This is a standard I/O pattern for reading from Google Pub/Sub - but there are other standard I/O patterns provided with Apache Beam:
Read from .txt file on Google Cloud Storage
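A hedged example (the bucket and file names below are placeholders):
import apache_beam as beam

# Read each line of a text file stored in a Google Cloud Storage bucket.
with beam.Pipeline() as p:
    lines = p | 'ReadFromGCS' >> beam.io.ReadFromText('gs://your-bucket/your_file.txt')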
Read from in-memory data
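For example, using beam.Create with arbitrary placeholder values:
import apache_beam as beam

# Turn an in-memory Python list into a PCollection.
with beam.Pipeline() as p:
    words = p | 'CreateInMemory' >> beam.Create(['to', 'be', 'or', 'not', 'to', 'be'])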
Read from a .csv file
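A sketch assuming a comma-delimited file with a header row (the path is a placeholder):
import apache_beam as beam

# Read a .csv file line by line, skip the header, and split each row into fields.
with beam.Pipeline() as p:
    rows = (p
            | 'ReadCSV' >> beam.io.ReadFromText('gs://your-bucket/data.csv', skip_header_lines=1)
            | 'SplitFields' >> beam.Map(lambda line: line.split(',')))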
Between these I/O types, you should have plenty of options for ingesting data from your source.
Running the Bash Script to Execute the Pipeline
The bash script can be run locally, or from Cloud Shell in your Google Cloud Console. I prefer to clone my files to the home directory in my cloud shell and run things from there.
Either way, navigate to your project root and run:
./run.sh
This will execute the pipeline. If you navigate to Dataflow in your Google Cloud Console, you’ll see a diagram of all the transformation steps you specified in your pipeline code. Mine looks like this:
Because the output of our Pipeline is going to Google BigQuery, let’s check there to see if data is appearing in our table:
Which it is - great!
Diving Deeper into Apache Beam
The code above is all we really need to execute a simple Apache Beam pipeline that loads streaming data into BigQuery. Apache Beam, however, is much more powerful than that. There are some foundational concepts that we glossed over in the code explanation above - I want to discuss those here.
Structural Components of Apache Beam
An Apache Beam pipeline has three main objects:
- Pipeline:
  - A Pipeline object encapsulates your entire data processing task.
  - This includes reading input data, transforming that data, and writing the output data.
  - All Apache Beam driver programs (including Google Dataflow) must create a Pipeline.
  - When creating the Pipeline, you also specify the execution options that tell the runner where and how to run the Pipeline.
- PCollection:
  - A PCollection is a foundational data structure in a Pipeline.
  - It represents a (potentially) distributed dataset that your Beam pipeline operates on.
  - This dataset can be unbounded (streaming data), or bounded (a fixed source like a text file).
  - Your Pipeline will typically create an initial PCollection from the input source. From there, PCollections will be the input and output to each step of your Pipeline.
- PTransform:
  - A PTransform is a single data processing operation in a step of your Pipeline.
  - Every PTransform takes one or more PCollection objects as input, processes the data, and produces zero or more PCollection objects as output.
Characteristics of PCollections
- Element Type: The elements within a PCollection can be any type, but they must all be the same type. To support parallel processing, all the elements must be serializable as a byte string.
- Immutability: Each PCollection is immutable. Once created, elements cannot be added, removed, or changed. When processing is done on a PCollection, it generates new pipeline data but does not consume or modify the original collection.
- Element Timestamps: It’s worth noting that each element in a PCollection has an intrinsic timestamp that is assigned by the source that creates the PCollection. It’s important to keep in mind that, depending on your application, this timestamp may not be the one you want to use for processing. In our example with the stock price stream, we assigned a timestamp when we published it to Pub/Sub. This is the timestamp we want to use in any windowing or processing steps for the stock data. Also keep in mind that the intrinsic timestamp assigned by the source creating the PCollection might be significantly different than any timestamp embedded in the source data itself.
Characteristics of PTransforms
- Transforms are the operations that are applied in your pipeline.
- Typically, the user provides processing logic in the form of a function object - examples of these above are parse_json and decode_message.
- To harness the true power of Apache Beam, processing functions should leverage Beam’s parallel processing capabilities by subclassing its built-in classes, like DoFn (applied with ParDo) - see the sketch after this list.
- Apache Beam specifies requirements for writing these functions here.
- The key requirement is that the functions need to be serializable so that they can be sent over the wire to other computing nodes for execution. This has some implications on its own:
  - The function should be stateless.
  - The function should be as simple as possible.
  - The function should avoid complex nested logic and recursive calls if possible (to avoid complications with serialization).
  - The function should avoid the use of global variables.
- What is serialization? It’s just the conversion of a function into a byte string so that it can be transmitted over the wire.
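For example, here is a minimal sketch of what parse_json could look like rewritten as a DoFn applied with ParDo. This is an illustrative rewrite, not the code used earlier in the post:
import json
import apache_beam as beam


class ParseJsonFn(beam.DoFn):
    # Hypothetical DoFn version of parse_json, shown only to illustrate ParDo.
    def process(self, element):
        # A DoFn yields zero or more outputs; here, one parsed dictionary per message.
        yield json.loads(element)


# Applied in a pipeline as:
#   parsed = decoded | 'Parse' >> beam.ParDo(ParseJsonFn())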
Serializing / Deserializing a function
>>> import pickle
>>> def add_one(n):
...     return n + 1
...
>>> ser = pickle.dumps(add_one)
>>> ser
b'\x80\x03c__main__\nadd_one\nq\x00.'
>>> add_one = pickle.loads(ser)
>>> add_one(5)
6
This is what they mean when they say the function needs to be serializable.
Summary
- In this post I walked through how to send streaming data to a table in Google BigQuery.
- We went over some of the structural foundations of Apache Beam and how all the pieces fit together.
- I showed several ways of reading data into Apache Beam Pipelines.
- We finished by doing a deeper dive into Pipelines, PCollections, and PTransforms.
I hope you enjoyed this post!