Recently, while working on a project at MILL5, we needed to retrieve data from an external API and write it to our Azure SQL instance. The API was secured with an API key and was throttled, so we had to retrieve the external records one page at a time. Each response included a “nextPage” UUID to be passed on the subsequent API call to retrieve the next page.
Our existing data ingestion pipelines were built in Azure Data Factory (ADF) and leveraged Azure Functions for serverless compute. It was not clear to us how to implement this data ingestion solution with these technologies, and there wasn’t much relevant information on the internet at the time. Below is some detail on how we did it.
Problem Statement
We were looking for a data ingestion pipeline that did the following:
- Read a data set from a SQL table that contains a column of values to enumerate and pass to the remote API
- Invoke the remote API with the value from the data set and retrieve the corresponding page of data from the remote API
- Insert the returned data into SQL Server
Azure Functions
With an eye on cost, we wrote our import implementation as an Azure Function. The function is very simple: it takes “remoteParameter” and “nextPage” query parameters in the HTTP request. The code for the function is below:
using System.Net;
using System.Web;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Azure.Functions.Worker.Http;

[Function("ApiImportUsingHttp")]
public async Task<HttpResponseData> RunUsingHttpAsync(
    [HttpTrigger(AuthorizationLevel.Function, "get", Route = null)] HttpRequestData req,
    FunctionContext context)
{
    // Parse the query string once and read both parameters from it
    var query = HttpUtility.ParseQueryString(req.Url.Query);

    var apiParam = query.Get("remoteParameter");
    CheckIsNotNull(nameof(apiParam), apiParam);

    // Get the passed-in nextPage - will be null on the first invocation
    var nextPage = query.Get("nextPage");

    // Invoke the remote API - the response is JSON and contains "nextPage" and "Data"
    var apiResponse = await _remoteServiceAPI.InvokeRemoteService(apiParam, nextPage).ConfigureAwait(false);

    // Code not shown: using an EF Core context, write apiResponse.Data to SQL

    // Build the response that is returned from this function;
    // nextPage is URL-encoded before it is returned
    var response = req.CreateResponse(HttpStatusCode.OK);
    await response.WriteAsJsonAsync(new { nextPage = HttpUtility.UrlEncode(apiResponse.nextPage) });
    return response;
}
The _remoteServiceAPI is just a wrapper around the external API that applies the API key and the parameters the function passes in. After building the function, it can be deployed to the Azure subscription and made available to ADF.
Azure Data Factory
Azure Data Factory is a great tool for automating data management and ingestion. When using an instance of Azure SQL, it is trivial to enable ADF to read records from your SQL instance.
Inside ADF, create a new Dataset that links to your SQL instance. For this example I created a SQL DB with a table called “TestData”.
In order to page the API data, we are going to create two pipelines. The outer pipeline enumerates the “remoteParameter” values. The inner pipeline actually pages the API data, passing each item of the enumeration to the Azure Function as the “remoteParameter” query parameter.
The first (outer) pipeline consists of two activities: Lookup and ForEach.
Make sure that you uncheck “First row only” on the Lookup activity, as it is checked by default. Also note that the Lookup activity returns at most 5000 rows.
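For readers who prefer the JSON view of a pipeline, the Lookup settings above correspond roughly to the fragment below. This is a sketch rather than our exact definition: the dataset name TestDataDataset and the query are assumptions made for illustration, while TestDataLookup, TestData and remoteParameter are the names used in this example.

```json
{
  "name": "TestDataLookup",
  "description": "Sketch only: the dataset name and the query are illustrative assumptions",
  "type": "Lookup",
  "typeProperties": {
    "source": {
      "type": "AzureSqlSource",
      "sqlReaderQuery": "SELECT remoteParameter FROM TestData"
    },
    "dataset": {
      "referenceName": "TestDataDataset",
      "type": "DatasetReference"
    },
    "firstRowOnly": false
  }
}
```

Setting firstRowOnly to false is the JSON equivalent of unchecking “First row only”, and it makes the activity return its rows under output.value.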
Next add a ForEach activity that will enumerate the results of the Lookup. This can be done by specifying the “Items” of the ForEach to be an array of the result set from the lookup:
If you click in the Items box, you will see that the dynamic content looks like this: @array(activity('TestDataLookup').output.value)
This puts the rows returned by the Lookup activity into an array to be enumerated. ForEach has a nice feature that allows the enumeration to run sequentially or in parallel.
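In JSON form, the ForEach configuration described above might look like the sketch below. The activity name ForEachRemoteParameter is an assumption for illustration, and the inner activities are left out here, so the empty array is only a placeholder.

```json
{
  "name": "ForEachRemoteParameter",
  "description": "Sketch only: the activity name is an illustrative assumption",
  "type": "ForEach",
  "dependsOn": [
    { "activity": "TestDataLookup", "dependencyConditions": ["Succeeded"] }
  ],
  "typeProperties": {
    "items": {
      "value": "@array(activity('TestDataLookup').output.value)",
      "type": "Expression"
    },
    "isSequential": true,
    "activities": []
  }
}
```

Setting isSequential to false runs the iterations in parallel, and the optional batchCount property caps how many run at once. For a throttled API, sequential execution is the more conservative choice.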
For each item in the array, ForEach invokes its inner activity, which in our case is another pipeline. One of the nice things about ADF pipelines is that they can take parameters when they execute, and we will use this feature to pass in the value that our function should send to the remote API. The column that we care about in this example is called “remoteParameter”. The ForEach contains only one inner activity, an Execute Pipeline activity, which is how one pipeline invokes another.
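As a rough sketch, the Execute Pipeline activity inside the ForEach could be defined as follows. The activity name RunPagingPipeline and the inner pipeline name PageRemoteApiData are assumptions for illustration. The parameter name remoteParam is the one the inner pipeline reads, and @item() refers to the current row of the enumeration.

```json
{
  "name": "RunPagingPipeline",
  "description": "Sketch only: activity and pipeline names are illustrative assumptions",
  "type": "ExecutePipeline",
  "typeProperties": {
    "pipeline": {
      "referenceName": "PageRemoteApiData",
      "type": "PipelineReference"
    },
    "waitOnCompletion": true,
    "parameters": {
      "remoteParam": {
        "value": "@item().remoteParameter",
        "type": "Expression"
      }
    }
  }
}
```

With waitOnCompletion set to true, each iteration waits for the inner pipeline to finish paging before the ForEach moves on.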
The pipeline can take a dynamic pipeline parameter, which in our case is the “remoteParameter” property of each row returned by the Lookup. Now let’s dive into the inner pipeline that actually does the paging.
The inner pipeline is designed to execute our Azure Function. What is interesting is that when executing an Azure Function in ADF, you can use string functions to dynamically build the request that is sent to the HTTP function.
The trick here is that the “Function Name” is used as the full HTTP call to the function, so by declaring the function name like this:
@concat('ApiImportUsingHttp?remoteParameter=', pipeline().parameters.remoteParam)
we can dynamically build the request from the parameter that was passed into our pipeline, which is then passed as a query parameter to our function. If your API doesn’t use query parameters, you can use the same technique to build route parameters instead: @concat('ApiImportUsingHttp/items/', pipeline().parameters.remoteParam)
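Put together, the Azure Function activity could look something like the sketch below. The linked service name ApiImportFunctionApp is an assumption for illustration; ProcessTestData is the activity name used in this example.

```json
{
  "name": "ProcessTestData",
  "description": "Sketch only: the linked service name is an illustrative assumption",
  "type": "AzureFunctionActivity",
  "linkedServiceName": {
    "referenceName": "ApiImportFunctionApp",
    "type": "LinkedServiceReference"
  },
  "typeProperties": {
    "functionName": {
      "value": "@concat('ApiImportUsingHttp?remoteParameter=', pipeline().parameters.remoteParam)",
      "type": "Expression"
    },
    "method": "GET"
  }
}
```

The linked service holds the function app URL and the function key, so neither has to appear in the expression itself.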
Remember from the Azure Function code above that we set the “nextPage” property of the function’s response. We now store that value in a pipeline variable, using the response from the function to set it like this:
@activity('ProcessTestData').output.nextPage
where nextPage is the page id returned from the API call.
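A Set Variable activity along these lines would capture the value. This is a sketch: the activity name SetNextPage is an assumption for illustration, and it presumes a String variable called nextPage has been declared on the pipeline.

```json
{
  "name": "SetNextPage",
  "description": "Sketch only: the activity name is an illustrative assumption",
  "type": "SetVariable",
  "dependsOn": [
    { "activity": "ProcessTestData", "dependencyConditions": ["Succeeded"] }
  ],
  "typeProperties": {
    "variableName": "nextPage",
    "value": {
      "value": "@activity('ProcessTestData').output.nextPage",
      "type": "Expression"
    }
  }
}
```

If the function can return a JSON null for nextPage on the last page, one option is to wrap the expression as @coalesce(activity('ProcessTestData').output.nextPage, '') so that the variable always holds a string.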
Now we want to keep paging until the nextPage variable returned from our function is empty, signifying that we have retrieved all the pages. In the Until activity we can specify this as an expression, where the dynamic content is:
@equals(variables('nextPage'), '')
The final pieces are the activities inside our Until loop, which simply repeat this workflow by calling our function and setting the variable as before, with the addition of passing in “nextPage” dynamically in the Function Name:
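As a sketch of how the loop fits together, the Until activity and its two inner activities might be defined as below. The names UntilNoMorePages, ProcessNextPage, SetNextPageInLoop and ApiImportFunctionApp are assumptions for illustration; the two expressions follow the ones described in this post, with the nextPage variable appended to the query string.

```json
{
  "name": "UntilNoMorePages",
  "description": "Sketch only: activity and linked service names are illustrative assumptions",
  "type": "Until",
  "typeProperties": {
    "expression": {
      "value": "@equals(variables('nextPage'), '')",
      "type": "Expression"
    },
    "activities": [
      {
        "name": "ProcessNextPage",
        "type": "AzureFunctionActivity",
        "linkedServiceName": {
          "referenceName": "ApiImportFunctionApp",
          "type": "LinkedServiceReference"
        },
        "typeProperties": {
          "functionName": {
            "value": "@concat('ApiImportUsingHttp?remoteParameter=', pipeline().parameters.remoteParam, '&nextPage=', variables('nextPage'))",
            "type": "Expression"
          },
          "method": "GET"
        }
      },
      {
        "name": "SetNextPageInLoop",
        "type": "SetVariable",
        "dependsOn": [
          { "activity": "ProcessNextPage", "dependencyConditions": ["Succeeded"] }
        ],
        "typeProperties": {
          "variableName": "nextPage",
          "value": {
            "value": "@activity('ProcessNextPage').output.nextPage",
            "type": "Expression"
          }
        }
      }
    ]
  }
}
```

Because the function URL-encodes nextPage before returning it, the variable can be appended to the query string as is.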
This is a nice example of how ADF allows for building blocks to drive dynamic content and integrates nicely with Azure Functions to allow for some complex workflows that are low compute and cost effective.
Author(s):
Steve Tarmey, Principal Architect (https://doanotherbuild.com)
Nate Franz, Software Engineer