Interact with the Microsoft Graph API using Synapse Analytics – Part II

Introduction

In the previous article, I described how to connect to the Microsoft Graph API using Synapse Analytics, including linked service configuration and a sample pipeline. This pipeline consumes a generic dataset which uses the “Relative URL” dataset property together with pipeline and dataset parameters. Finally, a set of AAD groups and their attributes were retrieved and stored in a parquet dataset in ADLS. 

In this post I will enhance the previous pipeline with additional features: filtering out empty AAD groups, retrieving and storing group members, and creating batches to minimize the number of API calls.

Business Scenario

Let’s imagine we have the following requirements from business:

  • AAD groups matching a name pattern must be imported into a database table
  • The members of those groups must be imported into another database table
  • The number of group members must be stored in the group table
  • The number of calls to the Graph API must be minimized

Target data model

The following diagram represents our target data model:

  • graph_api.AAD_Group: table to store the AAD groups
  • graph_api.AAD_Group_Member: table to store the members of the AAD groups existing in the AAD_Group table.
  • graph_api.AAD_Group_Member_Raw: a staging table that stores the intermediate results of the API calls.

Remark: I have not optimized the data types in the model.

The Synapse Pipeline

This is an example pipeline to meet our requirements. Please be aware that I am not including logging or error handling in the design.

  • Copy AAD Groups: this activity imports the AAD groups as explained in the previous article. The only difference is that the target is a table in a dedicated SQL pool instead of a parquet dataset.
  • Create requests AAD Members: creates the request body for each batch.
  • SP Copy members: a stored procedure that moves the members from the AAD_Group_Member_Raw table to the AAD_Group_Member table.

Grouping group IDs in batches

Some entities of the Graph API accept batch requests with a maximum of 20 requests per batch. In this case, batches of 20 group IDs are built using SQL. Since all IDs in the graph_api.AAD_Group table must be processed, the total number of records is divided by 20 (integer division) and 1 is added to obtain the number of batches. The SQL function NTILE then distributes the group IDs across that many buckets of at most 20 rows and assigns each row its corresponding "batch" (bucket) number. For example, 45 group IDs give 45 / 20 + 1 = 3 batches of 15 IDs each.

declare @number_of_groups int; 

set @number_of_groups = (select count(*) from graph_api.[AAD_Group]); 

select
  concat('[',
    STRING_AGG(
      CONCAT('{"id":"', Qry.Group_Id, '","method":"GET","url":"/groups/', Qry.Group_Id,
             '/members?$count=true&$select=id,displayName,mail,userPrincipalName"',
             ',"headers":{"ConsistencyLevel":"eventual"}}'), ',')
  , ']') as json_output
from (
    select Group_Id,
           row_number() over (order by load_ts desc) as id,
           ntile((@number_of_groups / 20) + 1) over (order by load_ts desc) as tile
    from [graph_api].[AAD_Group]
) as Qry
group by tile;

This is an example output containing only two groups for readability; with enough rows, sets of up to 20 groups are created:
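To illustrate the shape of the generated body, here is a small Python sketch that builds the same structure for two made-up group IDs. Note that the Graph $batch endpoint expects the array under a top-level "requests" property, so the json_output value has to be embedded accordingly when the request body is assembled (the IDs below are placeholders):

import json

# Two hypothetical group IDs; the SQL above produces one such entry per group,
# up to 20 per batch.
group_ids = ["11111111-aaaa-4444-bbbb-222222222222",
             "33333333-cccc-4444-dddd-444444444444"]

requests_array = [
    {
        "id": gid,  # the request id is the group id, so responses can be traced back
        "method": "GET",
        "url": f"/groups/{gid}/members?$count=true"
               "&$select=id,displayName,mail,userPrincipalName",
        "headers": {"ConsistencyLevel": "eventual"},
    }
    for gid in group_ids
]

# The $batch endpoint expects the array wrapped in a "requests" property.
print(json.dumps({"requests": requests_array}, indent=2))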

For additional information visit this article: Combine multiple requests in one HTTP call using JSON batching. 

For each batch, import the group members

Now we have the body for each batch request, reducing the number of API calls by a factor of up to 20. Let's configure the Copy activity within the ForEach loop:

  • The source dataset is the same one used to get the groups, but this time with a different setting:  Relative URL: https://graph.microsoft.com/v1.0/$batch
  • Request Method = POST
  • Request Body: the json_output of the current item in the ForEach collection
  • Header: Content-Type = application/json

This is the Sink configuration:

The mappings are configured to iterate through the "responses" collection and extract the following attributes from the body (a small sketch of the response shape follows the list):

  • value: array containing the members
  • @odata.count: total count of group members
  • id: the id of the request, which is the group_id (we set it that way to keep track of each group)
  • status: status of the API call; only 200 responses are considered valid
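To make the mapping easier to picture, the following Python sketch flattens a hypothetical (abbreviated) $batch response into the columns of the raw table. The real pipeline does this declaratively through the Copy activity mapping; the sample data below is made up:

import json

batch_response = {
    "responses": [
        {
            "id": "11111111-aaaa-4444-bbbb-222222222222",  # the group id we used as request id
            "status": 200,
            "body": {"@odata.count": 1,
                     "value": [{"id": "u-1", "displayName": "Jane Doe",
                                "mail": "jane@contoso.com",
                                "userPrincipalName": "jane@contoso.com"}]},
        },
        {
            "id": "33333333-cccc-4444-dddd-444444444444",
            "status": 200,
            "body": {"@odata.count": 0, "value": []},  # an empty group
        },
    ]
}

# Flatten into the columns stored in graph_api.AAD_Group_Member_Raw.
rows = [
    {
        "group_id": r["id"],
        "status": r["status"],
        "member_count": r["body"].get("@odata.count", 0),
        "value": json.dumps(r["body"].get("value", [])),  # the members, kept as a JSON array string
    }
    for r in batch_response["responses"]
]

for row in rows:
    print(row)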

This is the output of the graph_api.AAD_Group_Member_Raw Table:

Now it’s quite simple to detect groups without members for further processing. The members for every group are stored in an array in the column “value”. This array is processed in the next activity to copy the members to its target table.

Loading the AAD members table

Finally, we move the members to the target table using a stored procedure that unpacks the JSON arrays of members and filters out empty groups and responses with a status other than 200.

CREATE PROC [graph_api].[Move_AAD_Members_Stg_DWH] AS
BEGIN
 -- SET NOCOUNT ON added to prevent extra result sets from
 -- interfering with SELECT statements.
 SET NOCOUNT ON
-- Truncate target table
TRUNCATE TABLE [graph_api].[AAD_Group_Member]
 -- Unpack json values
INSERT INTO [graph_api].[AAD_Group_Member]
select Member_Id, id as Group_Id, Member_Name, User_Principal_Name, Mail, GETDATE() as DSTS
from [graph_api].[AAD_Group_Member_Raw]
cross apply openjson(value) WITH (
Member_Id VARCHAR(32) '$.id',
Member_Name NVARCHAR(255) '$.displayName',
User_Principal_Name NVARCHAR(255) '$.userPrincipalName',
Mail NVARCHAR(255) '$.mail'
)
where member_count > 0 and status = '200';
END

Summary

In this article an approach to optimizing calls to the Microsoft Graph API was described. To showcase the features, database tables, Synapse pipelines and some SQL code were used to query AAD groups and their members. As in previous posts, SQL code was used to interact with the JSON data.


Interact with the Microsoft Graph API using Synapse Analytics – Part I

Introduction

In this and the next post I want to show you how to connect to the Microsoft Graph API, request some data, process it and store it in a database using Synapse Analytics. 

This first post presents a sample use case, briefly introduces the Graph API, and shows how to create a linked service to it and start querying data. In the next post a sample Synapse pipeline will be described. The pipeline grabs some data and copies it into some target tables. Finally, I will create a sample query to showcase the newly imported data.

Use case 

Working with a customer in a large organization, I was dealing with a traditional security setup where access control for database objects in a dedicated SQL pool was implemented using custom database roles, database users from an external provider (AAD) and AAD groups. If a user requests access to a particular database view or schema, the user is added to the corresponding AAD group, which is created as a user in the target database. That database user is a member of one or more roles, which grant privileges on the corresponding database objects (in this case, SELECT permissions on views).

After some time, it was required to prepare a list of AAD groups and their members in a queryable format in order to create reports and implement additional access control mechanisms. 

Access Control Setup

Graph API intro

“Microsoft Graph is a RESTful web API that enables you to access Microsoft Cloud service resources” https://learn.microsoft.com/en-us/graph/use-the-api

This API is a gateway to access large amounts of data in Microsoft 365, Windows, and Enterprise Mobility + Security. You can access your Azure Active Directory resources to enable different use cases. 

To access this API from the Synapse workspace, the managed identity of the workspace must have the User.Read.All permission. You can configure it in the Azure portal under Azure Active Directory → Enterprise applications, searching for the managed identity of the Synapse workspace.

For additional information please visit:

In order to learn and try out different requests you can use the wonderful Graph Explorer: https://developer.microsoft.com/en-us/graph/graph-explorer

There is a selector for the request method; next to it you can select the API version and enter the request URL. At the top right there is a button with a user icon that you use to sign in. On the left there is a menu with sample queries broken down by category.

Sample Query

Let’s say we want to query:

  • AAD Groups
  • Only groups starting with A_DB_
  • Only select the attributes: id, displayName and description

The query looks like this:

https://graph.microsoft.com/v1.0/groups?$filter=startswith(displayName,'A_DB_')&$select=id,displayName,description

Please refer to the following pages to understand the query above:

After pasting the query in the graph explorer I got this response:

API Response
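Outside Graph Explorer, the same query can be issued with any HTTP client. Here is a minimal Python sketch using the requests library, assuming you already have a valid bearer token (in Synapse the linked service will take care of authentication, as shown below):

import requests

token = "<bearer-token>"  # assumed to be acquired beforehand, e.g. via MSAL or the Azure CLI

url = ("https://graph.microsoft.com/v1.0/groups"
       "?$filter=startswith(displayName,'A_DB_')"
       "&$select=id,displayName,description")

resp = requests.get(url, headers={"Authorization": f"Bearer {token}"})
resp.raise_for_status()

for group in resp.json()["value"]:
    print(group["id"], group["displayName"], group.get("description"))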

Linked Service Configuration

  • Navigate to Manage → Linked services and click on "+ New"

Enter the required values as follows:

  • After that, test your connection and click on “Create”.

Generic Dataset

In order to reuse the REST linked service, we create a generic dataset with a parameter that points to different resources of the Graph API.

  • Go to the Data tab → Linked → Integration datasets → New integration dataset

  • Select REST as the dataset type and click on “Continue”:

  • Provide a name and select the linked service we created previously and press ‘Ok’:

  • In the newly created dataset select the “Parameters” tab and create the following entry:

  • Go back to the Connection Tab of the dataset, select the “Relative URL” field and click on “Add dynamic content”

  • In the pipeline expression builder select the parameter we created previously:

  • Now the dataset should look like this:

  • Click on “Test connection”. It should succeed.

Data Pipeline

  • Create a new pipeline. I named it PIP_MS_Graph.
  • Add a Copy data activity and give it a name

  • In the “Source” tab let’s configure our query using the dataset parameter we created in a previous step:

  • Query: In the query I’m using the sample query presented before in the article. 
  • The Request method is GET
  • For Pagination rules, if the response contains more than 25 records the API paginates them and in every response the link to the next chunk is provided (default value, can be changed).
  • To showcase additional functionality, let’s add two additional columns, Load_TS to timestamp the incoming data and Has_Members, to initialize a column we are going to populate in a later step.
  • If you press on “Preview data” you should see something similar to this:
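For reference, this is what the pagination rule handles for us: every page of results includes an @odata.nextLink property pointing to the next chunk until the last page is reached. A rough Python sketch of the same loop outside Synapse (token acquisition omitted):

import requests

token = "<bearer-token>"  # assumed to be acquired beforehand
url = ("https://graph.microsoft.com/v1.0/groups"
       "?$filter=startswith(displayName,'A_DB_')"
       "&$select=id,displayName,description")

groups = []
while url:
    page = requests.get(url, headers={"Authorization": f"Bearer {token}"}).json()
    groups.extend(page.get("value", []))
    url = page.get("@odata.nextLink")  # absent on the last page, which ends the loop

print(f"{len(groups)} groups retrieved")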

As target, let’s use for the part I just a parquet dataset in a storage account:

And now in the Copy activity Sink:

The mappings can be tricky and should be handled with care:

  • The collection reference must be set to the [value] array in the response.
  • Then the attributes inside this array are configured as child objects using just their name.
  • Finally, the additional columns are added at the same level of the values.
  • The target columns are configured.

Note: Use the advanced editor if you want to see the target attribute names in the drop-downs for the mappings; this could be a current bug. You can also see the expression used to select the incoming fields. Now you can run the pipeline and check your results.

Summary

In this post I introduced the Microsoft Graph API and showed how you can use it to access AAD data, build a sample query, connect a Synapse workspace to the Graph API and retrieve a list of groups to populate a parquet dataset. In the next post I will explain a couple of additional details to use the API effectively and play a bit more with AAD users, groups and memberships. Hope you find it useful.


Alternative pipeline parametrization for Azure Synapse Analytics

  1. Introduction
  2. Use Case – Logical environment in data lake
  3. Use Case – Switching source folder in a dataset
  4. The Solution
  5. Example
    1. Overview
    2. Database objects
    3. Input data
    4. Read parameters from the database table
    5. Target Table
  6. Conclusion

Introduction

Parametrization has always been a key aspect of ETL development: you need to be able to move scripts, packages, jobs or whatever artifact you use into another environment. With modern data processing tools this has not changed.

One of the first things I missed when I changed from tools like SSIS or Talend to Azure Synapse or Data Factory was a configuration database, file or similar mechanism (I may be getting old).

Synapse in particular does not even have global parameters like Azure Data Factory does.

When you want to move your development to another environment, typically CI/CD pipelines are used. These pipelines consume an ARM template together with its parameter file to create a workspace in a target environment. The parameters can be overridden in the CD pipeline as explained here: https://techcommunity.microsoft.com/t5/data-architecture-blog/ci-cd-in-azure-synapse-analytics-part-4-the-release-pipeline/ba-p/2034434

Even so, I have not found a proper way to change the values of a pipeline parameter (the same applies to data flow and dataset parameters). I have seen custom parameter manipulation to set the default value of a parameter and then deploy it without any value, and even JSON manipulation with PowerShell (the dark side for me).

Use Case – Logical environment in data lake

Suppose you have separate environments, with, let's say, a different subscription or just a different resource group per environment; you have already set the template parameters and the linked services are properly configured. Now suppose, and I have experienced this case in real projects, you have a common data lake for all environments with a logical separation using a folder layout:

If you have a dataset pointing to any folder of the storage account above, you need to parameterize it so that it can be transported to another environment. Let's figure out how to implement this feature and deploy the workspace to quality or production.

Use Case – Switching source folder in a dataset

Imagine you have different source directories in the same data lake, even if you have one different data lake per environment. You sample some data, maybe with a notebook, to be able to develop with a small representative dataset. You would like to execute the pipeline and use a parameter to switch between different source directories.

Remark: whether this layout makes sense or not is another discussion.

The Solution

One way to overcome this issue is to create a parameters table in a database. The pipelines read this configuration at runtime and provide the configured parameters where they are needed:

The pipeline may have a custom logging step or even a pre-processing pipeline, then the parameters are read from the parameters table in a configuration database, the core of the pipeline consumes the retrieved parameters and finally you may want to have some general post-processing logic at the end.

Example

If you want to reproduce this example you will need at least:

  • A Synapse workspace
  • A dedicated SQL pool (or another database)
  • An Azure Data Lake Storage Gen2 account
  • A sample csv file

Overview

Let’s do a quick example and load a csv file located in a data lake to a table in a SQL dedicated pool.

The pipeline is simplified to highlight only how to use the alternative parametrization approach. 

Database objects

This is the DDL to recreate the database objects. It is adapted to run in a dedicated SQL pool, but you may want to hold this configuration in another database:

/* ---------------------------------------------------------
  ---- CREATE schema config
  ---------------------------------------------------------- */
IF(NOT EXISTS (select * from INFORMATION_SCHEMA.SCHEMATA where SCHEMA_NAME = 'config'))
BEGIN
EXEC('CREATE SCHEMA config AUTHORIZATION [dbo]')
END;

/* ---------------------------------------------------------
  ---- CREATE TABLE config.Pipeline_Parameters  ------------
  ---------------------------------------------------------- */
-- Drop the table if it exists
IF(EXISTS (select * from INFORMATION_SCHEMA.TABLES where TABLE_SCHEMA = 'config' and TABLE_NAME = 'Pipeline_Parameters'))
BEGIN
DROP TABLE config.Pipeline_Parameters;
END;

-- Create the parameters table (replicated, with a clustered index)
CREATE TABLE [config].[Pipeline_Parameters](
  [Synapse_Object_Name] [nvarchar](120) NOT NULL,
  [Synapse_Object_Type] [nvarchar](50) NOT NULL,
  [Parameter_Name] [nvarchar](50) NOT NULL,
  [Parameter_Value] [nvarchar](50) NULL
)
WITH
  (
    DISTRIBUTION = REPLICATE,
    CLUSTERED INDEX ([Synapse_Object_Name])
  );

INSERT INTO [config].[Pipeline_Parameters]
VALUES('Pipeline_Demo', 'Pipeline', 'source_folder', 'sample');
GO

INSERT INTO [config].[Pipeline_Parameters]
VALUES('Pipeline_Demo', 'Pipeline', 'env', 'development');
GO

The result is a table like this:

Remark: the "Synapse_Object_Name" value must match the pipeline name. I created the column "Synapse_Object_Type" to generalize the design, in case you want to parameterize other object types.

Now that we prepared the parameters let’s turn back to the pipeline.

Input data

The sample file is stored in a data lake (ADLSv2) with this layout (this is just a sample layout design, not a recommendation):

Top level to logically split environments:

A level to split sample and source system data:

A specific source system folder:

Finally, the csv file:

To bring this data into Synapse let's create a new integration dataset. This dataset will be able to switch between different folders:

  • Create a new integration dataset from ADLSv2 of type DelimitedText and go to the parameters tab
  • Create a parameter called source_folder
  • Add the default value: development/sample/erp

  • Return to the “Connection” tab and add the following configuration:

Note how the dataset parameter is consumed in the “File path” value.

If everything is set properly you should be able to preview the data. In my case:

Read parameters from the database table

Let’s continue with another integration dataset, this time of type “Azure Synapse Analytics” (you will have to adjust it to the database you are using):

I don’t select any table since we are going to use this dataset to send all queries we want to execute against the config database.

This is the query to read the parameters:

select N'{' + STRING_AGG(params, ',') + '}' as params
  from
  (
      select '"' + Parameter_Name + '":"' + Parameter_Value + '"' as params
      from [config].[Pipeline_Parameters]
      where Synapse_Object_Name = '@{pipeline().Pipeline}'
      and Synapse_Object_Type = 'Pipeline'
  ) as Qry

Please note the pipeline name is used to filter the relevant parameters. These are transformed into a JSON string to ease their consumption by the pipeline:

{"env":"development","source_folder":"sample"}

I used two "Set Variable" activities to extract the parameters into individual variables:

Pipeline variables

env parameter:

Value:

@{json(activity('Get_Parameters').output.firstRow.params).env}

source_folder parameter:

Value:

@{json(activity('Get_Parameters').output.firstRow.params).source_folder}

Target Table

To copy the data into a target table add a “Copy” activity to the canvas and configure it as follows:

  • Select the dataset we have created for the csv file as “Source”
  • Since this dataset has a parameter, a value can be provided. Here is where we pass our parameters from the database: 
@concat(variables('env'), '/', variables('source_folder'), '/erp')

If everything went well the rows were copied to the target table:

Conclusion

In this article an alternative parametrization method for Synapse pipelines was presented together with an example. This method is also applicable to Azure Data Factory. It is not a standard approach, so you should consider whether it fits your technical requirements or not.

I also implemented a more complex parameters table design, in which you create another level I called parameter group. In this way you can retrieve different sets of parameters depending on the pipeline logic, for example, if you have multiple source or target datasets and your pipeline iterates over them.


Running mlflow server using Docker, Azure Blob Service and Azure SQL Database

Introduction

It is indisputably true that mlflow makes life a lot easier, not only for data scientists but also for data engineers, architects and others. There is a very helpful list of tutorials and examples in the official mlflow docs. You can just download it, open a console and start using it locally on your computer. This is the fastest way to get started. However, as soon as you progress and introduce mlflow in your team, or you want to use it extensively yourself, some components should be deployed outside your laptop.

To exercise a deployment setup, and since I have Azure experience, I decided to provision a couple of resources in the cloud to deploy the model registry and store the data produced by the tracking server.

The code used during the article is available on github:

General overview

When I finished the diagram below, I noticed the code is located in the middle of everything. However, the code is usually developed locally. Data science teams must go beyond notebooks and operationalize their code. This enables integration with applications to deliver value to end users and machines.

Example architecture overview

Tracking server

The tracking server is basically an API and a UI. With the API you can log parameters, code versions, metrics and artifacts. Then you can use the UI to query and visualize the experiment results. Experiments are a set of runs, and a run is the execution of a piece of code. The values from the experiments are recorded by default locally in a folder named mlruns in the directory where you call your code, as can be seen in the following figure:

mlflow records experiment results locally by default
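As a quick reminder of what the tracking API records, here is a minimal sketch; the parameter and metric names are made up:

import mlflow

# Without further configuration, runs are recorded in ./mlruns next to the script.
with mlflow.start_run():
    mlflow.log_param("alpha", 0.5)   # hypothetical hyperparameter
    mlflow.log_metric("rmse", 0.79)  # hypothetical evaluation metric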

The results above can also be stored in a SQLAlchemy-compatible database. The place where you store this data is called the backend store. In this example I used an Azure SQL Database. The details are described in the next sections.

The clients running experiments store their artifact output, i.e. models, in a location called the artifact store. If nothing is configured, mlflow uses the mlruns directory by default, as shown in the next figure:

Artifact store location

This location should be able to handle large amounts of data. Storage services from several popular cloud providers are supported. In this example Azure Blob Storage is used.

MLflow Projects

A project is just a directory, in this example a git repository, where a descriptor file is placed to specify the dependencies and how the code is executed.

MLflow Models

This module offers a way to unify the deployment of machine learning models. It defines a convention in order to package and share your code.

MLflow Registry

This is one of my favorite modules: a centralized model repository with a UI and a set of APIs for model lifecycle management. If you run your own MLflow server, a database backend must be configured; in this example, an Azure SQL Database.

Preparing a docker image for the tracking server

One important thing is to make your work shareable and reusable. I really like docker containers because they help me to achieve that.  You can run them locally and also easily deploy them in different ways on different cloud providers.

For that I first tried to directly use the image provided by Yves Callaert. You can find the image in this git repo.

This docker image is created from a Python base image. The rest is quite simple: a couple of environment variables, installing the required Python packages and defining an entry point. Unfortunately, as usual, when you start getting away from the default configurations, things get complicated.

This docker image now must be able to connect to an Azure SQL Database using Python. There are at least two major packages to achieve that. One is pymssql, which seems to be the older way and has some limitations when working with Azure. The other is pyodbc.

The next step is to add pyodbc to the requirements.txt file. But that was not all: in order to work, pyodbc needs the ODBC driver installed on the image, so the new image adds the SQL Server ODBC Driver 17 for Ubuntu 18.04.

The last thing was to update the requirements file as follows:

python requirements docker image

The entry point is the script startup.sh, which I modified as follows:

mlflow server --backend-store-uri "$BACKEND_URI" --default-artifact-root "$MLFLOW_SERVER_DEFAULT_ARTIFACT_ROOT" --host 0.0.0.0

You can find the upgraded code in my github repo.

Once you have downloaded the code just build the image. For instance, using your console, change the directory to the one with the DockerFile and issue:

docker build -t mlflowserver -f Dockerfile . --no-cache

Using blob storage for the tracking server artifact store

As explained in the architecture overview, an Azure Blob Storage account was created for the artifact store. To configure it, you point the default artifact root to the container using a URI of the following form (access is granted via the storage access key or connection string):

wasbs://<container>@<storage-account>.blob.core.windows.net/<path>

Of course, first create an Azure storage account and a container. I created a container named mlflow as shown in the following figure:

Artifact Store in Azure Blob Storage

And then my environment variable became:

MLFLOW_SERVER_DEFAULT_ARTIFACT_ROOT=wasbs://mlflow@mlautomationph271220.blob.core.windows.net

And to access the container from outside just set the storage account connection string environment variable:

AZURE_STORAGE_CONNECTION_STRING = <your azure storage connection string>

Using SQL server for the backend store

I created a serverless Azure SQL Database. A nice thing for testing and prototyping. If you want to change to another pricing model just configure another pricing tier.

From the SQL Server instance I need a user that can create and delete objects. I have not found in the documentation exactly which permissions this user needs, but at least it should be able to create and drop tables, foreign keys and constraints. To be honest, I just used the admin user; I need to investigate this a bit deeper. Once you have your instance, user and password, you can build your connection string and assign it to an environment variable as follows:

BACKEND_URI="mssql+pyodbc://<sqlserver user>:<password>@<your server>.database.windows.net:1433/<database name>?driver=ODBC+Driver+17+for+SQL+Server"
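Before starting the container, the connection string can be sanity-checked from any machine with pyodbc and SQLAlchemy installed. A small sketch, assuming the BACKEND_URI environment variable is already set:

import os
from sqlalchemy import create_engine, text

# Reuse the same URI the mlflow container will receive.
backend_uri = os.environ["BACKEND_URI"]

engine = create_engine(backend_uri)
with engine.connect() as conn:
    print(conn.execute(text("SELECT @@VERSION")).scalar())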

Test

In order to test it I used the sklearn_elasticnet_wine example from the mlflow tutorial: Train, serve, and score a linear regression model

It is enough to change a couple of lines in the code to use the tracking server we created (a sketch of the changes follows the list below):

Add tracking server to the train code in python
  1. Set the tracking server URL, in my case I ran the docker container locally
  2. Set the experiment passing its name as argument. If the experiment doesn’t exist it gets created
  3. Get the experiment Id
  4. Assign the experiment Id to the run
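A sketch of those four changes; the tracking URI and experiment name are examples and need to be adjusted to your setup:

import mlflow

# 1. Point the client to the tracking server (here the container runs locally).
mlflow.set_tracking_uri("http://localhost:5000")

# 2. Set the experiment by name; it is created if it does not exist yet.
experiment_name = "sklearn_elasticnet_wine"
mlflow.set_experiment(experiment_name)

# 3. Get the experiment id.
experiment_id = mlflow.get_experiment_by_name(experiment_name).experiment_id

# 4. Assign the experiment id to the run.
with mlflow.start_run(experiment_id=experiment_id):
    pass  # the training and logging code from the tutorial goes here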

I left everything else as it was.

Now it is time to open the console and run our experiment.

Hint: remember to set the environment variable AZURE_STORAGE_CONNECTION_STRING where you execute the code.

The examples have several Python requirements files you need to install depending on the tutorial you want to run. To simplify this, I just exported my conda environment to a file in the folder "mlflow\examples\sklearn_elasticnet_wine".

You can easily create a new conda environment using this file issuing:

conda create --name <env-name> --file mlflow\examples\sklearn_elasticnet_wine\requirements.txt

Time to execute the train.py script, from the root directory. I used different input values for the parameters alpha and l1_ratio, starting with 1 and 1:

Running the code

Parameters:

Alpha | L1_ratio
------|---------
1     | 1
1     | 0.5
0.5   | 1
0.25  | 0.65

Training parameters

Visualize experiment results using the tracking server UI

If you open the UI of the tracking server using your favorite browser you can visualize the experiment results:

Experiment Results MLflow Tracking Server

If you click on the start time you can open a single run and track code, versions, parameters, metrics and artifacts:

Single Run Results MLflow Tracking Server

If you scroll down to the bottom you can inspect the artifacts:

Experiment Artifacts

We can also verify that the backend store tables were created in the Azure SQL Database instance:

For a complete description please refer to the official documentation using the link provided at the beginning of the post.

Deploy the model

If you are still not excited, now comes a very interesting part. Models cannot just stay on your laptop, you need to serve them somehow to applications and integrate them with other software pieces. Deploying the models to a web server as REST APIs is an excellent option to expose them as services.

To demonstrate mlflow deployment capabilities let’s deploy a REST server locally using:

mlflow models serve -m wasbs://mlflow@mlautomationph271220.blob.core.windows.net/0/866a64d8b7de488e83b985bd89d84afe/artifacts/model -p 1234

You need to replace the model location with the actual one. I found it in my previous screenshot:

Model location

Here we go:

Starting REST Server

The server is now running. Since I really like Postman, let's just test the service with it. I will use the same input data as in the tutorial, which is a JSON-serialized pandas DataFrame:

Test REST Server using Postman

Voila, that's it. Now we can score incoming data with a REST call!
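If you prefer the console over Postman, the same call can be sketched with the requests library. The column names are assumed to match the wine-quality features from the tutorial and the values are made up; depending on your mlflow version the exact payload format may differ:

import requests

payload = {
    "columns": [
        "fixed acidity", "volatile acidity", "citric acid", "residual sugar",
        "chlorides", "free sulfur dioxide", "total sulfur dioxide",
        "density", "pH", "sulphates", "alcohol",
    ],
    "data": [[6.2, 0.66, 0.48, 1.2, 0.029, 29, 75, 0.98, 3.33, 0.39, 12.8]],
}

resp = requests.post("http://127.0.0.1:1234/invocations", json=payload)
print(resp.json())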

Further steps

To get completely away from local development, a VM, a container instance, or another service should be provisioned to run the mlflow docker container.

Also, the REST server we created at the end should be deployed outside a laptop.

Once all the infrastructure is provisioned in the cloud, it would be very helpful to have an ARM template to easily replicate and version the complete environment.

References


Azure Digital Twins Management with Python

Introduction

As mentioned in my previous post, the Azure Digital Twins (ADT) service is a new way to build next generation IoT solutions. In the first post I showed you in a video how to manage ADT instances with the ADT Explorer. In the second post I showed how to do mostly the same using Postman and the ADT REST API.

ADT has control plane APIs and data plane APIs. The latter is used to manage elements in the ADT instance. In order to use these APIs, Microsoft published a .NET (C#) SDK. An SDK is a convenient way to manage instances, since you can easily create applications for your digital twins. If for any reason you prefer another language like Java, JavaScript or Python, you need to generate your own SDK.

In this post I describe how to autogenerate a Python SDK using the tool Autorest and a swagger file.

An example of a generated SDK can be found in this repo: https://github.com/pauldj54/ADTDataPlane

Autorest

Autorest is a tool to generate client libraries for accessing RESTful web services. It is available on GitHub: https://github.com/Azure/autorest

Note: Autorest requires Node.js, and version 10.x is recommended. If you need multiple versions of Node.js, I recommend the tool nvm-windows, which can be downloaded from this link: https://github.com/coreybutler/nvm-windows/releases

I will use PowerShell with admin rights for the next steps.

Now let’s select the desired Node.js version:

Node.js setup using nvm

The steps shown in the figure above are the following:

  • Print the current versions
  • Since only v12.x was available, install the 10.x version
  • List the available versions to confirm
  • Switch the active version to 10.x
  • Confirm the changes

Note the prefix “*” marking the selected Node.js version.

To install autorest I followed the steps from the official Azure documentation https://docs.microsoft.com/en-us/azure/digital-twins/how-to-create-custom-sdks:

npm install -g autorest
# run using command 'autorest'
autorest

Generate our own SDK

  1. In order to generate our own SDK, the Swagger file with the ADT data plane API definition is needed; it can be downloaded from here. Please be aware that the "examples" folder is also required; otherwise Autorest throws an error.
  2. Place the downloads in a directory on your computer. I created a folder under my git directory with the name "adt_dataplane_api".
  3. Open a console and navigate to the directory created in the previous step. Issue the following command:
autorest --input-file=digitaltwins.json --python --output-folder=ADTApi --add-credentials --azure-arm --package-name=ADTApi --package-version=1.0

Basically you point to the swagger file (digitaltwins.json), select Python as the output language, and enter an output folder, package name and other details.

Python SDK generation using Autorest

4. If everything ran successfully you should see the following output:

Resulting folder layout

Converting our own SDK to a python package

It is very convenient to turn the generated SDK into a Python package and include it in environments as needed. In order to do so, I followed these steps:

  1. Create a setup.py file in the “ADTDataPlane” directory:
from setuptools import setup, find_packages

setup(
      name='adtdataplane',
      version='0.1',
      description='ADT data plane python SDK',
      author='Azure Digital Twin autogenerated using autorest by Paul Hernandez',
      url='https://github.com/pauldj54/ADTDataPlane',
      packages=find_packages(exclude=['tests*'])
      )


2. Add the auto-generated code to git. I did it in my github using these directions

3. Now we are ready to install our newly generated package 😊

Installing the generated SDK using pip

4. Verify the installation:

Python packages available in this environment

Manage an ADT Instance

Once we have our SDK Python package available, it is time to test it. For this post I registered an AAD application (app registration) and I am using the same ADT instance as in the previous post.

  1. Find your Application (client) ID and Directory (tenant) ID:
client and tenant ID in the App registrations (Azure Portal)

2. Create a client secret and write it down:

Create a client secret to access your application

3. Grant the role “Azure Digital Twins Owner (Preview)” to the registered app:

Grant the correspondent role

4. Create a config file in the root directory (or another directory) within your Python project and name it, for instance, settings.json. Hint: secrets and other sensitive information will be stored in this file, so make sure you don't push it to git or your source control.

The file should look like this:

{
    "client_id" : "<your-client-id>",
    "tenant_id" : "<your-tenant-id>",
    "adt_instance_url" : "https://management.azure.com",
    "secret" : "<your-secret>",
    "endpoint" : "https://<your-adt-instance>.api.neu.digitaltwins.azure.net",
    "scope" : ["https://digitaltwins.azure.net/.default"],
    "authority" : "https://login.microsoftonline.com/<your-tenant-id>"
}

5. Create a sample DTDL file to test the code. I created the file "SampleModel.json", quite similar to the one in the official documentation:

{
  "@id": "dtmi:com:contoso:SampleModelPy;1",
  "@type": "Interface",
  "displayName": "SampleModelPy",
  "contents": [
    {
      "@type": "Relationship",
      "name": "contains"
    },
    {
      "@type": "Property",
      "name": "data",
      "schema": "string"
    }
  ],
  "@context": "dtmi:dtdl:context;2"
}

6. Import the following modules and install them if required:

import msal 
from msrestazure.azure_active_directory import AADTokenCredentials
import adtdataplane 
import logging
from azure.mgmt.consumption.models.error_response import ErrorResponseException
import json

msal is the Microsoft Authentication Library and is the preferred library according to the documentation. AADTokenCredentials is the class used to build the credentials, and adtdataplane is our generated SDK. Some other packages are required by the code.

7. Load the config file and create a confidential client application as follows:

# Load Config file
with open(r"settings.json") as f:
  config = json.load(f)

# Create a preferably long-lived app instance that maintains a token cache.
app = msal.ConfidentialClientApplication(
    config["client_id"], authority=config["authority"],
    client_credential=config["secret"],
    # token_cache=...  # Default cache is in memory only.
                       # You can learn how to use SerializableTokenCache from
                       # https://msal-python.rtfd.io/en/latest/#msal.SerializableTokenCache
    )


8. I used this code snippet from the azure python sdk examples to obtain a token:

# The pattern to acquire a token looks like this.
result = None

# First, the code looks up a token from the cache.
# Because we're looking for a token for the current app, not for a user,
# use None for the account parameter.
result = app.acquire_token_silent(config["scope"], account=None)

if not result:
    logging.info("No suitable token exists in cache. Let's get a new one from AAD.")
    result = app.acquire_token_for_client(scopes=config["scope"])

if "access_token" in result:
    # Call a protected API with the access token.
    print(result["token_type"], result["access_token"])
else:
    print(result.get("error"))
    print(result.get("error_description"))
    print(result.get("correlation_id"))  # You might need this when reporting a bug. 

9. I transform the acquired token into AAD token credentials and create an SDK client:

credentials = AADTokenCredentials(result)

try:
    client = adtdataplane.AzureDigitalTwinsAPI(credentials = credentials, base_url = config['endpoint'])
    logging.info("Service client created – ready to go")
except ValueError as err:
    print('Client creation failed with error: {0}'.format(err))

10. Now we can load a DTDL model:

# load models
with open(r"models\SampleModel.json") as f:
  dtdl = json.load(f)
dtdl_list = []
dtdl_list.append(dtdl)
try:
  response = client.digital_twin_models.add(model = dtdl_list, raw=True)
  print(response)
except adtdataplane.models.ErrorResponseException as e:
  print(e)

Please note the model location and modify it accordingly.

11. Verify if the model was created

# Verify the model was created
response = client.digital_twin_models.get_by_id('dtmi:com:contoso:SampleModelPy;1')
print(response)

You should see something like this:

DTDL model retrieved

12. We can also verify that the model was correctly uploaded using the ADT Explorer:

ADT Explorer available DTDL models
ADT Explorer sample model definition

The entire python code:

import msal 
from msrestazure.azure_active_directory import AADTokenCredentials
import adtdataplane 
import logging
from azure.mgmt.consumption.models.error_response import ErrorResponseException
import json

# Load Config file
with open(r"settings.json") as f:
  config = json.load(f)

# Create a preferably long-lived app instance that maintains a token cache.
app = msal.ConfidentialClientApplication(
    config["client_id"], authority=config["authority"],
    client_credential=config["secret"],
    # token_cache=...  # Default cache is in memory only.
                       # You can learn how to use SerializableTokenCache from
                       # https://msal-python.rtfd.io/en/latest/#msal.SerializableTokenCache
    )

# The pattern to acquire a token looks like this.
result = None

# First, the code looks up a token from the cache.
# Because we're looking for a token for the current app, not for a user,
# use None for the account parameter.
result = app.acquire_token_silent(config["scope"], account=None)

if not result:
    logging.info("No suitable token exists in cache. Let's get a new one from AAD.")
    result = app.acquire_token_for_client(scopes=config["scope"])

if "access_token" in result:
    # Call a protected API with the access token.
    print(result["token_type"], result["access_token"])
else:
    print(result.get("error"))
    print(result.get("error_description"))
    print(result.get("correlation_id"))  # You might need this when reporting a bug. 

credentials = AADTokenCredentials(result)

try:
    client = adtdataplane.AzureDigitalTwinsAPI(credentials = credentials, base_url = config['endpoint'])
    logging.info("Service client created – ready to go")
except ValueError as err:
    print('Client creation failed with error: {0}'.format(err))

# load models
with open(r"models\SampleModel.json") as f:
  dtdl = json.load(f)
dtdl_list = []
dtdl_list.append(dtdl)
try:
  response = client.digital_twin_models.add(model = dtdl_list, raw=True)
  print(response)
except adtdataplane.models.ErrorResponseException as e:
  print(e)

# Verify the model was created
response = client.digital_twin_models.get_by_id('dtmi:com:contoso:SampleModelPy;1')
print(response)

Next steps

Even though the scenario presented in this post is extremely basic, you now have a Python SDK to manage ADT instances. The benefit becomes more obvious when you have some data sets and want to populate your twins. In a next post I would like to show you how to write an ingestion program using the Python SDK and a data source, most probably a CSV or JSON file; let's see what the open data world offers us.

References


Azure Digital Twins Management with Postman

In this video I will show how to manage Azure Digital Twins models and instances using Postman. How to use the ADT explorer is explained in my previous post: https://hernandezpaul.wordpress.com/2020/07/24/azure-digital-twins-and-adt-explorer-say-hello

ADT Management using Postman

In order to make the postman collection work you need to configure an environment as follows:

Postman Environment required

tenantId = your tenant Id, it could be found in the registered app

accessToken = will be populated within a script

adtInstanceURL = the hostname of your ADT instance

clientId = as in your registered app

clientSecret = the one you generated in the registered app (see video)

scope = https://digitaltwins.azure.net/.default
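The script presumably performs a client-credentials token request with these values. If you prefer to fetch the token yourself, for instance to paste it into the accessToken variable manually, here is a small Python sketch using msal with the same values (placeholders to be replaced):

import msal

tenant_id = "<your-tenant-id>"
client_id = "<your-client-id>"
client_secret = "<your-client-secret>"
scope = ["https://digitaltwins.azure.net/.default"]

app = msal.ConfidentialClientApplication(
    client_id,
    authority=f"https://login.microsoftonline.com/{tenant_id}",
    client_credential=client_secret,
)

result = app.acquire_token_for_client(scopes=scope)
print(result["access_token"])  # value for the accessToken environment variable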

You can find all the DTDL models and the postman collection in this repository:

https://github.com/pauldj54/adt-agrifood

The Swagger file of the ADT Management API:

https://github.com/Azure/azure-rest-api-specs/tree/master/specification/digitaltwins/resource-manager/Microsoft.DigitalTwins/preview/2020-03-01-preview


Azure Digital Twins and ADT Explorer – say hello!

ADT Explorer evaluation

Introduction

Azure Digital Twins Service offers a way to build next generation IoT solutions. There are other approaches on the market to describe IoT devices and build digital twins. Without making a formal comparison, I can say that with Azure Digital Twins it is possible to build a powerful semantic layer on top of your connected devices using domain-specific models.

To show you how this works, let's create a kind of "hello world" example. An end-to-end solution is out of scope for this post. Instead I will create a hands-on tutorial to demonstrate some of the functionality.

Scenario description

Let’s consider the following simplified use case.

I like farming, even though I am really a rookie on this topic. Let's suppose we have a parcel on a farm. The parcel has a soil, and there are different product types in every soil.

Soil quality is an extensive topic and it can be measured using a set of physical, chemical and biological properties. One of them is the soil pH. Suppose we have one or more devices able to measure the soil pH and send the measured values to a local gateway, which transmits them to our digital twin instance in Azure. For more information about soil quality please visit this document:

https://ag.tennessee.edu/biodegradablemulch/Documents/What_is_Soil_Quality_Aug5_2015.pdf

Use Case Diagram

In the first video I only show you how to use the Azure Digital Twins Explorer. The use case is just a reference and I hope it makes a little bit of sense.

Prerequisites

Create a digital twin instance

https://docs.microsoft.com/en-us/azure/digital-twins/how-to-set-up-instance

Create an App registration in Azure Active Directory

https://docs.microsoft.com/en-us/azure/digital-twins/how-to-authenticate-client

From the registered app we need the Application (client) ID and Directory (tenant) ID, since we are going to use an OAuth 2.0 authorization code flow. To learn more about authentication flows please visit this article: https://docs.microsoft.com/en-us/azure/active-directory/develop/authentication-flows-app-scenarios

Azure Registered App in Azure Active Directory
Registered App

The next step is to grant the registered app the permissions to interact with the digital twins service instance. There are two roles for that in the current preview version, "Azure Digital Twins Owner" and "Azure Digital Twins Reader". We will use the owner role in this example.

Add Role assignment for the registered app in the ADT service instance
Add Role assignment for the registered app in the ADT service instance

DTDL models

In order to model the data I used the FIWARE Agrifood smart data models as a starting point: https://github.com/smart-data-models/dataModel.Agrifood

I also created a super class called “Thing” in order to demonstrate inheritance in DTDL.

The created models are available in my github:

https://github.com/pauldj54/adt-agrifood

Model diagram:

DTDL Class Diagramm
DTDL Class Diagramm

ADT Explorer

The Azure Digital Twins (ADT) Explorer is an open source tool that allows model management, instance creation, relationship creation, graph visualization and running queries against our ADT instance. It can be downloaded here: https://github.com/Azure-Samples/digital-twins-explorer/tree/master/

In the video I will show how to:

  • Upload models
  • Create Instances
  • Create Relationships
  • Execute some queries

Streaming Technologies Comparison

After some time, I decided to share my notes comparing different open source streaming technologies on LinkedIn: Streaming Technologies Comparison
https://www.linkedin.com/pulse/streaming-technologies-comparison-paul-hernandez


Installing Apache Zeppelin 0.7.3 in HDP 2.5.3 with Spark and Spark2 Interpreters

Background

For a recent client requirement I needed to propose a solution to add spark2 as an interpreter to Zeppelin in HDP (Hortonworks Data Platform) 2.5.3.
The first hurdle: HDP 2.5.3 comes with Zeppelin 0.6.0, which does not support spark2 (spark2 itself was included only as a technical preview). Upgrading the HDP version was not an option due to the effort and platform availability. In the end I found a solution in the HCC (Hortonworks Community Connection) which involves installing a standalone Zeppelin that does not affect the Ambari-managed Zeppelin delivered with HDP 2.5.3.
I want to share with you how I did it.

Preliminary steps

Stop the current Zeppelin (version 0.6.0, which comes with HDP 2.5.3):

su zeppelin
 /usr/hdp/current/zeppelin-server/bin/zeppelin-daemon.sh stop

Deactivate the script that starts this version on a system reboot.
Zeppelin is started as an Ambari dependency in the script

 /usr/lib/hue/tools/start_deps.mf

To avoid modifying this file, a custom init script could be created to stop the default HDP Zeppelin and start the newer version.

Apache Zeppelin Installation

Download Zeppelin: https://zeppelin.apache.org/download.html
Copy the .tar file to the /tmp directory using WinSCP
Extract the .tar file into the target directory, e.g. /opt

tar -xvf zeppelin-0.7.3-bin-all.tar -C /opt

Create a symlink to the last version (optional)

sudo ln -s zeppelin-0.7.3-bin-all/ zeppelin

Change the ownership of the folder

chown -R zeppelin:zeppelin /opt/zeppelin

Zeppelin Configuration

First copy the "conf" directory from the existing Zeppelin installation to the new version:

sudo yes | cp -rf /usr/hdp/current/zeppelin-server/conf/ /opt/zeppelin

In order to configure Zeppelin to work with both the spark and spark2 clients, SPARK_HOME is set per interpreter (see below) and commented out in the zeppelin-env.sh configuration file:
/opt/zeppelin/conf/zeppelin-env.sh

edit zeppelin-env

zeppelin-env.sh

According to the documentation, the variable ZEPPELIN_JAVA_OPTS changed in spark2 to ZEPPELIN_INTP_JAVA_OPTS. Since both versions are active these two variables are defined:

export ZEPPELIN_JAVA_OPTS="-Dhdp.version=None -Dspark.executor.memory=512m -Dspark.executor.instances=2 -Dspark.yarn.queue=default"

export ZEPPELIN_INTP_JAVA_OPTS="-Dhdp.version=None -Dspark.executor.memory=512m -Dspark.executor.instances=2 -Dspark.yarn.queue=default"

Start zeppelin 0.7.3

su zeppelin
/opt/zeppelin/bin/zeppelin-daemon.sh start

A pending issue here is to modify the startup scripts in order to persist the changes across a system reboot.

Configuring the spark interpreters

Navigate to the interpreter settings page:

interpreter menu

Open Interpreter Menu

Scroll-down to the spark interpreter and add the property:

SPARK_HOME = /usr/hdp/current/spark-client

add property spark interpreter

Add SPARK_HOME property to the spark interpreter

Create a new interpreter with interpreter group spark and name it spark2

Add new interpreter

create new interpreter

Create a new interpreter

Interpreter name and group (leave all other values as default)

create spark2 interpreter

Set interpreter name and group

Add the property:

SPARK_HOME = /usr/hdp/current/spark2-client

add property spark2 interpreter

Add SPARK_HOME property to the spark2 interpreter

Installation test

In order to test the installation create a new notebook and verify the binding of the interpreters

interpreter binding

Interpreter binding for the test notebook

Execute the following code in two different paragraphs:

%spark
sc.version

%spark2
sc.version

spark2 test

Test notebook

References


Talend job to lookup geographic coordinates into a shape file

Introduction

Recently, for an open data integration project, I had to select some tools in order to be able to process geospatial data. I had a couple of choices: I could use R and try to work out a solution with the packages available on the server, or use Talend. One of the biggest restrictions was that the development environment had no internet connection due to security policies, and I wanted to try some options interactively. I decided to give Talend a try and asked the system admins to install the spatial plugin. I had only tried Talend before to accomplish some exercises from the book Talend for Big Data and never used it for a "real-world" project, which was challenging but also made me feel motivated.

Software requirements

Talend open studio for big data

https://www.talend.com/download/

Spatial extension for Talend

https://talend-spatial.github.io/

The experiment

Input data

Customer coordinates: a flat file containing x,y coordinates for every customer.

Municipalities in Austria: a shape file with multi-polygons defining the municipality areas in Austria: source

Goal

Use the x,y coordinates of the customers to "look up" the municipality code GKZ in the shape file, which in German stands for "Gemeindekennzahl". The idea is to determine in which municipality every point (customer location) lies.

This is an overview of the overall Talend job:

jobOverview

Figure 1. Talend Job Overview

Create a generic schema

crateSchema.jpg

Figure 2. Create a generic schema

Use a sShapeFileInput component

Shapefile Input.JPG

Figure 3. Shape file input

The shapefile contains multipolygons and I want to have polygons. My solution was to use an sSimplify component. I used the default settings. You may need to analyze or find in the source metadata what kind of data is available within the shape file.

The projection of the shapefile was “MGI / Austria Lambert” which corresponds to EPSG 31287. I want to re-project it as EPSG 4326 (GCS_WGS_1984) which is the one used by my input coordinates.

sProj

Figure 4. Re-project the polygons

I read the x, y coordinates from a csv file.

With an s2DPointReplacer I converted the x,y coordinates into Point(x,y) (WKT: well-known text).

PointReplacer

Figure 5. Point replacer

Finally I created an expression in a tMap just to get the polygon and point intersection. The “contains” function would also work:

tmap

Figure 6. Calculate the intersection between the input geometries
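For comparison only, and not part of the Talend job: the same point-in-polygon lookup could be sketched in Python with geopandas. The file names, column names and the GKZ attribute below are hypothetical, and a recent geopandas version (0.10+) is assumed for the predicate argument:

import pandas as pd
import geopandas as gpd

# Hypothetical inputs: municipality polygons (MGI / Austria Lambert, EPSG:31287)
# and a csv with customer x,y coordinates in WGS84 (EPSG:4326).
municipalities = gpd.read_file("municipalities_austria.shp")
customers = pd.read_csv("customers.csv")  # columns: customer_id, x, y

points = gpd.GeoDataFrame(
    customers,
    geometry=gpd.points_from_xy(customers["x"], customers["y"]),
    crs="EPSG:4326",
)

# Re-project the polygons to EPSG:4326 and join: each point gets the GKZ of the
# municipality polygon it falls within.
municipalities = municipalities.to_crs("EPSG:4326")
result = gpd.sjoin(points, municipalities[["GKZ", "geometry"]], predicate="within")
print(result[["customer_id", "GKZ"]].head())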

Conclusion

Talend did the job and I recommend it as an alternative not only for classical ETL projects but also for creating analytical data sets to be consumed by data scientists. Sometimes data cleansing (or data munging/wrangling, or whatever you want to call it) can be cumbersome with scripting languages. With Talend the jobs are easy to understand and can be fully parameterized and reused.

References
