StreamSets is a modern data streaming and integration platform built by StreamSets, Inc. It is used by many multinational companies such as Shell and Dell.
StreamSets Data Collector (SDC) is an open source data ingestion engine and part of the StreamSets DataOps Platform; you can download it here to try it out. Today we will use SDC to demonstrate real-time data transformation from Aurora Postgres to Kinesis Firehose.
1. Create a new pipeline by clicking the blue button at the top left of the StreamSets UI and type in a pipeline name; I'm typing in “Demo”. After that, click “Save”. Now we have an empty pipeline.
You will see a small error triangle at the top right of the user interface, reminding you to add a pipeline origin.
We will use the JDBC Query Consumer as the origin to ingest the source data from Aurora Postgres. Another popular ingestion option is Change Data Capture (CDC), which captures table changes from database logs. We will discuss it in another post.
2. Click JDBC Query Consumer in the right panel and a stage box with a database icon will appear on the left side automatically. On the lower half of the user interface, there are connection and configuration settings we need to fill in.
Fill in the connection string, username, password, and the SQL query against the table you would like to retrieve data from. Note that in “Incremental Mode”, data is retrieved only when there are new or updated rows. For the JDBC Query Consumer to recognise whether data is new or updated, set an offset column (usually a timestamp or datetime column). Any record with a timestamp/datetime later than the last retrieved offset will then be ingested.
The offset column also has to be filled in under tab “Change Data Capture”.
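To make the offset mechanism concrete, here is what the query might look like for a hypothetical `orders` table using `updated_at` as the offset column. SDC substitutes `${OFFSET}` with the last stored offset value on each run (the table and column names are assumptions for illustration):

```sql
-- ${OFFSET} is replaced by SDC with the last value read from the
-- offset column (updated_at here); ORDER BY keeps the offsets
-- monotonically increasing so no rows are skipped.
SELECT *
FROM orders
WHERE updated_at > '${OFFSET}'
ORDER BY updated_at
```

On the first run, the “Initial Offset” configured on the origin is used in place of `${OFFSET}`.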
3. Let’s add a processor to perform some simple data transformation.
I added the Record Deduplicator processor to check for and remove duplicate records ingested by the JDBC origin. We need to define which fields to compare when evaluating whether records are unique or duplicated. Notice the two small numbered circles on the processor: output 1 carries unique records and output 2 carries duplicates. We will add more processors later to see how to use them.
Now, let’s configure the processor. Under the “Deduplication” tab, fill in column names in “Fields to Compare” with “Specified Fields” selected under “Compare”, or simply select “All Fields” under “Compare” if every field should be considered when detecting duplicates.
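Conceptually, the routing the Record Deduplicator performs looks like this Python sketch (the record fields are hypothetical, and SDC actually compares hashes over a configurable window of records rather than keeping everything in memory):

```python
def split_unique_and_duplicates(records, fields_to_compare):
    """Route records the way Record Deduplicator does: output 1 gets
    the first occurrence of each key, output 2 gets repeats."""
    seen = set()
    unique, duplicates = [], []
    for record in records:
        # Build the comparison key from the selected fields only.
        key = tuple(record.get(f) for f in fields_to_compare)
        if key in seen:
            duplicates.append(record)
        else:
            seen.add(key)
            unique.append(record)
    return unique, duplicates

records = [
    {"id": 1, "name": "a"},
    {"id": 1, "name": "a"},  # duplicate of the first record
    {"id": 2, "name": "b"},
]
unique, dups = split_unique_and_duplicates(records, ["id", "name"])
```

Selecting “All Fields” corresponds to passing every key of the record as `fields_to_compare`.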
Trash is used to collect the discarded records output by the Record Deduplicator. No configuration is needed; it simply absorbs the data and throws it away. Isn’t it cool?
4. The Field Type Converter processor converts a field’s data type when the original type is not desirable.
There are two options to choose from.
First, set “Conversion Method” to “By Field Name”, which converts only the preselected columns. Fill in the columns under “Fields to Convert”, select the target type under “Convert to Type”, and set any other desired format options in the remaining tabs.
Second, set “Conversion Method” to “By Data Type”, which processes every field and converts those whose current type matches the preselected source type.
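The difference between the two methods can be sketched in Python (the field names and types here are hypothetical examples, not SDC internals):

```python
def convert_by_field_name(record, fields, convert):
    # "By Field Name": convert only the explicitly listed fields.
    return {k: (convert(v) if k in fields else v) for k, v in record.items()}

def convert_by_data_type(record, source_type, convert):
    # "By Data Type": convert every field whose current type matches.
    return {k: (convert(v) if isinstance(v, source_type) else v)
            for k, v in record.items()}

row = {"price": "19.9", "qty": "3"}
a = convert_by_field_name(row, {"price"}, float)  # only "price" becomes a float
b = convert_by_data_type(row, str, float)         # every string field becomes a float
```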
5. Let’s select the final destination, Kinesis Firehose. And now we have nearly a complete pipeline!
Log in to your AWS account to find your Kinesis Firehose access key and secret. If you have an existing Firehose delivery stream, simply copy the stream name and paste it here. The default data format for delivering data to Firehose is JSON; you can change it to Delimited under the “Data Format” tab to suit your requirements.
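To make the “Data Format” choice concrete, here is a sketch of how the same record would be serialized as JSON versus Delimited before delivery to Firehose (the record fields are hypothetical):

```python
import csv
import io
import json

record = {"id": 1, "name": "demo", "updated_at": "2020-01-01T00:00:00Z"}

# JSON (the default): one JSON object per record.
json_payload = json.dumps(record)

# Delimited: field values only, in order, separated by a delimiter.
buf = io.StringIO()
csv.writer(buf).writerow(record.values())
delimited_payload = buf.getvalue().strip()
```

With Delimited output, downstream consumers must know the column order, since the field names are not part of the payload.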
6. We have finished creating a simple pipeline! Click “Start” to run the pipeline and data should start flowing in!
SDC shows a beautiful summary report with real-time ingestion and transformation statistics. If any error occurs, it is easily spotted.
That’s it! Thanks for reading this far. There are many more things StreamSets can do, for example version control, seamless migration, and alerting. I hope this article inspires you to explore more. Feel free to comment if you have anything to share!