In an earlier article, I described how to autogenerate SSIS file import packages using Biml. It is also possible to read data from more advanced data sources by generating a Script component. While you may not have much use for autogenerating SSIS-packages that read from Twitter, it serves as a good example, and being able to autogenerate any SSIS-package that contains a Script component is very useful.
This article requires that you use either BIDS Helper (a free add-on for Visual Studio) or MIST (a commercial product from Varigence, the company behind Biml).
The goal of this article
Our goal is to generate an SSIS-package that will:
- Truncate a database table
- Read from Twitter
- Write to the database table
Again, the main purpose of this article is to show how to create a Script component with Biml. Twitter is just used as an example.
Twitter API
Twitter has several APIs, of which one of the most useful is the Search API. This API is open for anyone to use (but requires an account registration). It's also relatively simple.
Authentication
Twitter uses OAuth for authentication. There are some different options. I've chosen to use Application-Only authentication, but you could easily authenticate as a user instead with small changes to my example.
The authentication requires that you have a registered application. Registering an application with Twitter is very simple and free. By registering you will get a Consumer Key and a Consumer Secret.
To authenticate you send your Consumer Key and Consumer Secret using the OAuth protocol and you will then receive a Bearer Token. This Bearer Token must then be included in all subsequent calls to the Search API.
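The two steps above can be sketched offline, without calling Twitter. This is a minimal sketch that mirrors what the getAccessToken method in the full source code does: it builds the Base64 credentials for the token request and extracts the Bearer Token from a response body. The class and method names (TwitterAuthSketch, BuildBasicCredentials, ExtractBearerToken) are made up for illustration and are not part of any Twitter or SSIS API.

```csharp
using System;
using System.Text;
using System.Text.RegularExpressions;

// Hypothetical helper class; the names are illustrative only.
public static class TwitterAuthSketch
{
    // Builds the value that follows "Basic " in the Authorization header
    // of the token request: Base64 of "<consumer key>:<consumer secret>".
    public static string BuildBasicCredentials(string consumerKey, string consumerSecret)
    {
        var raw = consumerKey + ":" + consumerSecret;
        return Convert.ToBase64String(Encoding.ASCII.GetBytes(raw));
    }

    // Pulls the Bearer Token out of the JSON body returned by the token request.
    // Returns null when the body contains no access_token field.
    public static string ExtractBearerToken(string responseBody)
    {
        var match = Regex.Match(responseBody, "\"access_token\":\"(.+?)\"");
        return match.Success ? match.Groups[1].Value : null;
    }
}
```

For example, TwitterAuthSketch.BuildBasicCredentials("key", "secret") returns "a2V5OnNlY3JldA==". Returning null for a missing token is a design choice of this sketch; it lets the caller fail early with a clear message instead of sending an empty token to the Search API.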
Twitter Search API
It's very simple to use the Search API. Just send your search parameters as a URL to the Twitter server and you will get back the search results in JSON format. For example:
https://api.twitter.com/1.1/search/tweets.json?q=%23mssql
will search for tweets tagged with #mssql (note that %23 means #).
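If you want to build the query from a plain search text instead of writing %23 by hand, the encoding can be left to the .NET Framework. This is a minimal sketch; the class and method names (TwitterQuerySketch, BuildSearchPath) are hypothetical, and the returned path has the same form as the value of the TwitterQuery variable used later in this article.

```csharp
using System;

// Hypothetical helper class; the names are illustrative only.
public static class TwitterQuerySketch
{
    // Returns the path and query string for a search,
    // with the search text percent-encoded (# becomes %23, space becomes %20).
    public static string BuildSearchPath(string searchText)
    {
        return "/1.1/search/tweets.json?q=" + Uri.EscapeDataString(searchText);
    }
}
```

Calling TwitterQuerySketch.BuildSearchPath("#mssql") gives /1.1/search/tweets.json?q=%23mssql, which is the path from the example URL above.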
When calling the Search API, don't forget to add the authentication header. This can easily be done with one line of code:
request.Headers.Add("Authorization", "Bearer " + token);
The search results would look something like this:
"{\"statuses\":[{\"created_at\":\"Tue Apr 19 05:06:27 +0000 2016\",\"id\":722290241235730432,\"id_str\":\"722290241235730432\",\"text\":\"Build ISA QA Trade 2015.01_20160419.1 succeeded for New Tech in 186 minutes #TFS #ContinuosIntegration #MSSQL #SSIS\",...
JSON parser
I wrote my own simple JSON parser. It's not intended for production usage, so if you are going to run this in a production environment you should replace it with a more complete JSON library.
Generating the SSIS-package
The rest of this walkthrough consists of these steps:
- Create the database table
- Create the SSIS project (in BIDS Helper/Visual Studio) or a project in MIST
- Add the Biml to your project
- Generate the SSIS-package
Create the database table
I have named my database JohanBimlDemo. You can of course use another name, just don't forget to change it everywhere it occurs in the Biml.
Here's the SQL code to generate the table:
USE JohanBimlDemo
GO

CREATE TABLE [dbo].[Twitter] (
    [IdStr] [nvarchar](50) NOT NULL PRIMARY KEY CLUSTERED,
    [Text] [nvarchar](250) NOT NULL,
    [Username] [nvarchar](50) NOT NULL,
    [CreatedAt] [nvarchar](50) NOT NULL
);
Create a project
If you are using BIDS Helper, just create an SSIS-project as usual in Visual Studio: New Project... -> Business Intelligence -> Integration Services Project.
If you instead are using MIST, just create a project there.
Biml
The complete Biml source code is pretty long, so I've added it to the end of this article (and also attached it as a file).
To add it in BIDS Helper / Visual Studio, create a new Biml file and paste the Biml source code into it.
Generate the SSIS-package
Before you can read from the Twitter Search API, you will have to enter your authentication information. Find these rows in the Biml source code and enter your own data:
<Variable Name="ConsumerKey" DataType="String">Enter your Consumer Key here</Variable>
<Variable Name="ConsumerSecret" DataType="String">Enter your Consumer Secret here</Variable>
<Variable Name="TwitterQuery" DataType="String">/1.1/search/tweets.json?q=%23mssql</Variable>
If you use BIDS Helper, you generate the SSIS-package by right-clicking on your Biml-file and selecting "Generate SSIS Packages".
In MIST you just select Build.
Then your SSIS-package is ready to test!
How it works
Now, this is the most important part: how the ScriptComponent works.
The source code starts with a Connection and some Variables. Then comes a Tasks element, which starts with an ExecuteSQL task that truncates the table. After that the most important part begins: the Dataflow.
ScriptComponentSource
A ScriptComponentSource uses a ScriptComponentProject to define the actual script.
The ScriptComponentProject can be defined either "globally" (shared between ScriptComponentSources) or inline directly under a ScriptComponentSource. I have chosen the "inline" option.
The ScriptComponentProject is probably best explained by looking at my example.
- Note that I have added a ScriptComponentProject under another ScriptComponentProject. This is required in Biml and I have no good explanation why.
- It's important that you add the right Assembly References. My example shows the minimum set you should include.
- You have to add at least one OutputBuffer, where the ScriptComponentSource delivers its output. I have included four fields from Twitter, but you can of course easily add more.
- You need a ReadOnlyVariables (or ReadWriteVariables) element to make your package variables available inside your script.
- Then come my source files. Normally you create two source files: AssemblyInfo.cs (containing some metadata) and ScriptMain.cs (containing your actual source code).
The most important function in ScriptMain is CreateNewOutputRows. Here is how it authenticates and fetches the search results:
var accessToken = getAccessToken(consumerKey, consumerSecret);
var jsontext = callSearchApi(queryString, accessToken);
var jObj = new JObject(jsontext);
The search results are written to the OutputBuffer this way:
foreach (JObject status in (JArray)jObj["statuses"])
{
    // Reading a few fields. More Twitter fields can of course be read the same way.
    var id_str = status["id_str"].ToString();
    var text = status["text"].ToString();
    var user = (JObject)status["user"];
    var username = user["name"].ToString();
    var created_at = status["created_at"].ToString();

    // Send to output buffer
    Output0Buffer.AddRow();
    Output0Buffer.IdStr = id_str;
    Output0Buffer.Text = text;
    Output0Buffer.Username = username;
    Output0Buffer.CreatedAt = created_at;
}
Output0Buffer.SetEndOfRowset();
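The output columns are declared with fixed lengths (50 or 250 characters), and a value that is longer than its column may make the data flow fail at run time. As a precaution you could shorten each value before assigning it to the buffer. This is a minimal sketch and not part of the original script; the class and method names (BufferTextSketch, FitToColumn) are hypothetical.

```csharp
using System;

// Hypothetical helper class; the names are illustrative only.
public static class BufferTextSketch
{
    // Returns the value cut down to at most maxLength characters.
    // A null value becomes an empty string, since the table columns are NOT NULL.
    public static string FitToColumn(string value, int maxLength)
    {
        if (value == null) return "";
        return value.Length <= maxLength ? value : value.Substring(0, maxLength);
    }
}
```

Inside the loop it could be used as Output0Buffer.Text = BufferTextSketch.FitToColumn(text, 250);. Silently cutting the text is a trade-off: the package keeps running, but you lose the end of unusually long values.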
OleDbDestination
Nothing special about this dataflow component. We just specify the ConnectionName, JohanBimlDemoConnection, and the destination table, dbo.Twitter, as ExternalTableOutput.
Biml source code
<Biml xmlns="http://schemas.varigence.com/biml.xsd">
  <Annotations>
    <Annotation>
      TwitterScriptTaskDemo.biml
      Biml Demo using script task for reading from Twitter
      Written by Johan Åhlén 2016
    </Annotation>
    <Annotation>
      Table script:
      CREATE TABLE [dbo].[Twitter] (
          [IdStr] [nvarchar](50) NOT NULL PRIMARY KEY CLUSTERED,
          [Text] [nvarchar](250) NOT NULL,
          [Username] [nvarchar](50) NOT NULL,
          [CreatedAt] [nvarchar](50) NOT NULL
      );
    </Annotation>
  </Annotations>
  <Connections>
    <Connection Name="JohanBimlDemoConnection" ConnectionString="Provider=SQLNCLI11;Data Source=localhost;Integrated Security=SSPI;Initial Catalog=JohanBimlDemo" />
  </Connections>
  <Packages>
    <Package Name="TwitterBimlDemo" ConstraintMode="Linear" ProtectionLevel="EncryptSensitiveWithUserKey">
      <Variables>
        <!-- Enter your Twitter API keys here... -->
        <Variable Name="ConsumerKey" DataType="String">Enter your Consumer Key</Variable>
        <Variable Name="ConsumerSecret" DataType="String">Enter your Consumer Secret</Variable>
        <Variable Name="TwitterQuery" DataType="String">/1.1/search/tweets.json?q=%23mssql</Variable>
      </Variables>
      <Tasks>
        <ExecuteSQL Name="Truncate" ConnectionName="JohanBimlDemoConnection" ResultSet="None">
          <DirectInput>TRUNCATE TABLE [dbo].[Twitter]</DirectInput>
        </ExecuteSQL>
        <Dataflow Name="Read from Twitter">
          <Transformations>
            <ScriptComponentSource Name="Twitter">
              <ScriptComponentProject>
                <ScriptComponentProject ProjectCoreName="TwitterScript" Name="TwitterScript">
                  <AssemblyReferences>
                    <AssemblyReference AssemblyPath="System" />
                    <AssemblyReference AssemblyPath="System.Data" />
                    <AssemblyReference AssemblyPath="System.Windows.Forms" />
                    <AssemblyReference AssemblyPath="System.Xml" />
                    <AssemblyReference AssemblyPath="Microsoft.SqlServer.DTSPipelineWrap" />
                    <AssemblyReference AssemblyPath="Microsoft.SqlServer.DTSRuntimeWrap" />
                    <AssemblyReference AssemblyPath="Microsoft.SqlServer.PipelineHost" />
                    <AssemblyReference AssemblyPath="Microsoft.SqlServer.TxScript" />
                  </AssemblyReferences>
                  <OutputBuffers>
                    <OutputBuffer Name="Output0" IsSynchronous="false">
                      <Columns>
                        <Column Name="IdStr" DataType="String" Length="50"></Column>
                        <Column Name="Text" DataType="String" Length="250"></Column>
                        <Column Name="Username" DataType="String" Length="50"></Column>
                        <Column Name="CreatedAt" DataType="String" Length="50"></Column>
                      </Columns>
                    </OutputBuffer>
                  </OutputBuffers>
                  <ReadOnlyVariables>
                    <Variable VariableName="ConsumerKey" DataType="String" Namespace="User"/>
                    <Variable VariableName="ConsumerSecret" DataType="String" Namespace="User"/>
                    <Variable VariableName="TwitterQuery" DataType="String" Namespace="User"/>
                  </ReadOnlyVariables>
                  <Files>
                    <File Path="AssemblyInfo.cs">
using System.Reflection;
using System.Runtime.CompilerServices;

//
// General Information about an assembly is controlled through the following
// set of attributes. Change these attribute values to modify the information
// associated with an assembly.
//
[assembly: AssemblyTitle("TwitterScript.csproj")]
[assembly: AssemblyDescription("")]
[assembly: AssemblyConfiguration("")]
[assembly: AssemblyCompany("")]
[assembly: AssemblyProduct("TwitterScript.csproj")]
[assembly: AssemblyCopyright("Copyright @ Johan Åhlén 2016")]
[assembly: AssemblyTrademark("")]
[assembly: AssemblyCulture("")]

//
// Version information for an assembly consists of the following four values:
//
//      Major Version
//      Minor Version
//      Build Number
//      Revision
//
// You can specify all the values or you can default the Revision and Build Numbers
// by using the '*' as shown below:
[assembly: AssemblyVersion("1.0.*")]
                    </File>
                    <File Path="ScriptMain.cs"><![CDATA[
using System;
using System.Data;
using System.Text;
using System.Text.RegularExpressions;
using System.Net;
using System.Collections.Generic;
using System.IO;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
using Microsoft.SqlServer.Dts.Runtime.Wrapper;

[Microsoft.SqlServer.Dts.Pipeline.SSISScriptComponentEntryPointAttribute]
public class ScriptMain : UserComponent
{
    public override void CreateNewOutputRows()
    {
        var consumerKey = Variables.ConsumerKey;
        var consumerSecret = Variables.ConsumerSecret;
        var queryString = Variables.TwitterQuery;

        var accessToken = getAccessToken(consumerKey, consumerSecret);
        var jsontext = callSearchApi(queryString, accessToken);
        var jObj = new JObject(jsontext);

        foreach (JObject status in (JArray)jObj["statuses"])
        {
            // Reading a few fields. More Twitter fields can of course be read the same way.
            var id_str = status["id_str"].ToString();
            var text = status["text"].ToString();
            var user = (JObject)status["user"];
            var username = user["name"].ToString();
            var created_at = status["created_at"].ToString();

            // Send to output buffer
            Output0Buffer.AddRow();
            Output0Buffer.IdStr = id_str;
            Output0Buffer.Text = text;
            Output0Buffer.Username = username;
            Output0Buffer.CreatedAt = created_at;
        }
        Output0Buffer.SetEndOfRowset();
    }

    private string callSearchApi(string queryString, string accessToken)
    {
        // Prepare request
        var searchRequest = WebRequest.Create("https://api.twitter.com" + queryString);
        searchRequest.Method = "GET";
        searchRequest.UseDefaultCredentials = false;
        searchRequest.Headers.Add("Authorization", "Bearer " + accessToken);

        // Get the response
        var searchResponse = searchRequest.GetResponse();

        // Read the response body
        using (var responseStream = searchResponse.GetResponseStream())
        {
            var reader = new StreamReader(responseStream);
            var responseBody = reader.ReadToEnd();
            responseStream.Close();
            return responseBody;
        }
    }

    private string getAccessToken(string consumerKey, string consumerSecret)
    {
        // Prepare request
        var oauthString = Convert.ToBase64String(Encoding.ASCII.GetBytes(consumerKey + ":" + consumerSecret));
        var oauthRequest = WebRequest.Create("https://api.twitter.com/oauth2/token");
        oauthRequest.Method = "POST";
        oauthRequest.UseDefaultCredentials = false;
        oauthRequest.Headers.Add("Authorization", "Basic " + oauthString);
        oauthRequest.ContentType = "application/x-www-form-urlencoded;charset=UTF-8";
        var postContent = Encoding.ASCII.GetBytes("grant_type=client_credentials");
        oauthRequest.ContentLength = postContent.Length;
        using (var requestStream = oauthRequest.GetRequestStream())
        {
            requestStream.Write(postContent, 0, postContent.Length);
            requestStream.Close();
        }

        // Get the response
        var oauthResponse = oauthRequest.GetResponse();

        // Parse the response
        using (var responseStream = oauthResponse.GetResponseStream())
        {
            var reader = new StreamReader(responseStream);
            var responseBody = reader.ReadToEnd();
            responseStream.Close();
            var regex = new Regex("\"access_token\":\"(.+?)\"");
            var match = regex.Match(responseBody);
            var access_token = match.Groups[1].Value;
            return access_token;
        }
    }
}

// Using a very simple json implementation. If used for production, it should be replaced with a better json library.
public class JArray : List<object>
{
    internal void Parse(string jsontext, ref int i)
    {
        i = JToken.LookFor(jsontext, i, '[', true);
        i++;

        // Loop through members
        do
        {
            // Check for end of array
            if (jsontext[i] == ']')
            {
                i++;
                return;
            }

            // Read object
            var obj = JToken.Parse(jsontext, ref i);
            this.Add(obj);

            // Skip whitespace
            i = JToken.SkipWhitespace(jsontext, i);

            // Check if end of array or another member (and move past that character)
            var ch = jsontext[i++];
            if (ch == ']') break;
            else if (ch == ',') continue;
            else throw new Exception("JArray.Parse: expected ] or ,");
        } while (true);
    }

    public JArray(string srctext) : base()
    {
        var i = 0;
        Parse(srctext, ref i);
    }

    internal JArray(string jsontext, ref int i) : base()
    {
        Parse(jsontext, ref i);
    }
}

public static class JToken
{
    internal static int LookFor(string jsontext, int startpos, char searchCh, bool skipWhitespace = false)
    {
        var i = jsontext.IndexOf(searchCh, skipWhitespace ? SkipWhitespace(jsontext, startpos) : startpos);
        if (i == -1) throw new Exception("JToken.LookFor: can't find character " + searchCh);
        return i;
    }

    internal static int LookForUnescaped(string jsontext, int startpos, char searchCh)
    {
        var i = startpos;
        while (i < jsontext.Length)
        {
            if (jsontext[i] == searchCh)
            {
                // Only accept the character if it is not preceded by a backslash
                if (i == 0 || jsontext[i - 1] != '\\') return i;
            }
            i++;
        }
        throw new Exception("JToken.LookForUnescaped: can't find character " + searchCh);
    }

    internal static int SkipWhitespace(string jsontext, int startpos)
    {
        for (var i = startpos; i < jsontext.Length; i++)
            if (!char.IsWhiteSpace(jsontext[i])) return i;
        throw new Exception("JToken.SkipWhitespace: end of string reached");
    }

    public static object Parse(string jsontext, ref int i)
    {
        i = SkipWhitespace(jsontext, i);
        if (jsontext[i] == '"') return ReadStr(jsontext, ref i);
        else if (jsontext[i] == '{') return ReadJObject(jsontext, ref i);
        else if (jsontext[i] == '[') return ReadArray(jsontext, ref i);
        else if (jsontext[i] == 'n') return ReadNull(jsontext, ref i);
        else if (char.IsLetter(jsontext[i])) return ReadBool(jsontext, ref i);
        else if (char.IsDigit(jsontext[i]) || jsontext[i] == '-') return ReadNumber(jsontext, ref i);
        else throw new Exception("JToken.Parse: unknown object type");
    }

    internal static object ReadNull(string jsontext, ref int i)
    {
        if (jsontext.Length >= i + 4)
        {
            if (jsontext.Substring(i, 4) == "null")
            {
                i += 4;
                return null;
            }
        }
        throw new Exception("JToken.ReadNull: unknown object type");
    }

    internal static string ReadStr(string jsontext, ref int i)
    {
        var j = LookForUnescaped(jsontext, i + 1, '"');
        var str = jsontext.Substring(i + 1, j - i - 1);
        i = j + 1;
        return str;
    }

    internal static JObject ReadJObject(string jsontext, ref int i)
    {
        var jobj = new JObject(jsontext, ref i);
        return jobj;
    }

    internal static JArray ReadArray(string jsontext, ref int i)
    {
        var jarr = new JArray(jsontext, ref i);
        return jarr;
    }

    internal static bool ReadBool(string jsontext, ref int i)
    {
        if (jsontext.Substring(i, 4) == "true")
        {
            i += 4;
            return true;
        }
        else if (jsontext.Substring(i, 5) == "false")
        {
            i += 5;
            return false;
        }
        else throw new Exception("JToken.ReadBool: Unknown value");
    }

    internal static double ReadNumber(string jsontext, ref int i)
    {
        var digitsstr = "";
        if (jsontext[i] == '-')
        {
            // Keep the sign and move on to the digits
            digitsstr += jsontext[i++];
        }
        while (i < jsontext.Length && (char.IsNumber(jsontext[i]) || jsontext[i] == '.'))
        {
            digitsstr += jsontext[i];
            i++;
        }
        return Convert.ToDouble(digitsstr, System.Globalization.NumberFormatInfo.InvariantInfo);
    }
}

public class JObject
{
    private SortedDictionary<string, object> contents = new SortedDictionary<string, object>();

    private JObject() { }

    internal void Parse(string jsontext, ref int i)
    {
        i = JToken.LookFor(jsontext, i, '{', true);
        i++;

        // Check for empty object
        i = JToken.SkipWhitespace(jsontext, i);
        if (jsontext[i] == '}')
        {
            i++;
            return;
        }

        // Loop through members
        do
        {
            // Look for label start (")
            i = JToken.LookFor(jsontext, i, '"', true);

            // Look for label end (")
            var j = JToken.LookFor(jsontext, i + 1, '"');

            // Assign label
            var label = jsontext.Substring(i + 1, j - i - 1);

            // Look for colon
            i = JToken.LookFor(jsontext, j + 1, ':', true);

            // Move to next character and skip whitespace
            i = JToken.SkipWhitespace(jsontext, i + 1);

            // Read token
            var token = JToken.Parse(jsontext, ref i);
            contents.Add(label, token);

            // Skip whitespace
            i = JToken.SkipWhitespace(jsontext, i);

            // Check if end of object or another member (and move past that character)
            var ch = jsontext[i++];
            if (ch == '}') break;
            else if (ch == ',') continue;
            else throw new Exception("JObject.Parse: expected } or ,");
        } while (true);
    }

    public JObject(string jsontext)
    {
        var i = 0;
        Parse(jsontext, ref i);
    }

    internal JObject(string jsontext, ref int i)
    {
        Parse(jsontext, ref i);
    }

    public object this[string name]
    {
        get { return contents[name]; }
    }
}
]]></File>
                  </Files>
                </ScriptComponentProject>
              </ScriptComponentProject>
            </ScriptComponentSource>
            <OleDbDestination Name="Database" ConnectionName="JohanBimlDemoConnection">
              <ExternalTableOutput Table="[dbo].[Twitter]"></ExternalTableOutput>
            </OleDbDestination>
          </Transformations>
        </Dataflow>
      </Tasks>
    </Package>
  </Packages>
</Biml>