Orchestrating Data Flows in a Hybrid Cloud

Team Kranio

March 2, 2021

This article is based on the webinar "Automate Data Flows between Legacy and Cloud". The article is more detailed and goes step by step with code. If you prefer to watch the 1-hour webinar, click here and choose 'watch replay' at the end.

Why orchestrate processes?

Orchestrating and automating processes is one of the goals of companies undergoing digital transformation. Many long-established companies have legacy systems that have fulfilled essential roles for decades. Therefore, when these companies seek to modernize their processes, the right approach is to do it incrementally, with decoupled services deployed in a hybrid cloud: cloud and on-premise components working together.

One of the Amazon Web Services offerings that we like the most at Kranio, and in which we are experts, is Step Functions. It consists of a state machine, very similar to a flowchart, with sequential steps where the output of each step becomes the input of the next.


Each step is a Lambda function, i.e. serverless code that only runs when needed. AWS provides the runtime and we don't have to manage any kind of server.
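To make the idea concrete, here is a minimal local sketch in plain Python of a sequential flow in which each step receives the previous step's output. The step functions and the `run_sequence` helper are illustrative assumptions, not part of Step Functions itself.

```python
from typing import Any, Callable, Dict, List

Step = Callable[[Dict[str, Any]], Dict[str, Any]]


def normalize(data: Dict[str, Any]) -> Dict[str, Any]:
    # Hypothetical first step: tidy the incoming name.
    return {**data, "name": data["name"].strip().title()}


def mark_registered(data: Dict[str, Any]) -> Dict[str, Any]:
    # Hypothetical second step: flag the record as processed.
    return {**data, "registered": True}


def run_sequence(steps: List[Step], initial: Dict[str, Any]) -> Dict[str, Any]:
    # Feed each step's output into the next step, like states in a sequence.
    current = initial
    for step in steps:
        current = step(current)
    return current


result = run_sequence([normalize, mark_registered], {"name": "  camila "})
```

In the real service, each of these functions would be a separate Lambda and the chaining would be done by the state machine definition.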

Use Case

A case that helps us understand how to apply Step Functions is creating sequential records in multiple tables of an on-premise DB from a cloud application, through a REST API with an event-driven architecture.

This case can be summarized in a diagram like this:

© Kranio 2021

Here we can see:

  1. A data source, such as a web form.
  2. Data payload: the data we need to register in the DB.
  3. CloudWatch Events: also known as EventBridge, these are events that allow you to "trigger" AWS services, in this case the State Machine.
  4. Api Gateway: AWS service that allows you to create, publish, maintain and monitor REST, HTTP or WebSocket APIs.
  5. A relational database.

Advantages

The advantages of orchestrating on premise from the cloud are:

  1. Reuse of existing components without leaving them behind
  2. The solution is decoupled, so each action has its own development, which facilitates maintenance, error identification, etc.
  3. If business requirements change, we know what happened and what needs to be changed, or between which steps a new state needs to be added.
  4. If changes are required, the impact on the on-premise side is mitigated because the orchestration is in the cloud.
  5. With serverless alternatives, there is no need to manage servers and their operating systems.
  6. They are low cost solutions. If you want to know more check the prices of using Lambda, Api Gateway, SNS and CloudWatch Events. 

And now what?

You already know the theory of orchestrating a data flow. Now we'll show you the considerations and steps you need to take into account to put it into practice.

Development

The resources to be used are:

  1. An Amazon Web Services account with the AWS CLI configured (see this link)
  2. Python 3.7 or later
  3. The Serverless framework (learn how to set it up here)
  4. The Boto3 Python library

How to get started

Since this is an orchestration, we need to identify the sequential steps we want to orchestrate. And since the point of the orchestration is to automate, the flow should also start automatically.

For this we will build on the use case presented above, and we will assume that the DB we write to belongs to one of the components of a company's CRM, that is, one of the technologies used to manage the customer base.

We will create an event-driven solution, starting the flow with the reception of a message from some source (such as a web form).

After the event is received, its content (payload) must be sent via POST to an endpoint so it can be written to the database. This DB can be in the cloud or on premise, and the endpoint must have a backend that can perform limited operations on the DB.

To facilitate deployment, we use the Serverless framework, which allows us to both develop and deploy.

 The project will be divided into 3 parts:

Name            Description
Api Gateway     An API in Api Gateway that will be in charge of creating the records in the DB.
Infrastructure  Here we will simulate an on-premise DB and create an EventBridge event bus to start the flow.
Orchestration   The Step Functions code.


Then these projects are deployed in the order infrastructure >> step-functions >> api-gateway.

All three can live in the same directory, with one folder for each. The structure would be as follows:

├── api-gateway
│   ├── scripts-database
│   │   ├── db.py
│   │   └── script.py
│   ├── libs
│   │   └── api_responses.py
│   ├── serverless.yml
│   └── service
│       ├── create_benefit_back.py
│       ├── create_client_back.py
│       └── create_partner_back.py
├── infrastructure
│   └── serverless.yml
└── step-functions
    ├── functions
    │   ├── catch_errors.py
    │   ├── create_benefit.py
    │   ├── create_client.py
    │   ├── create_partner.py
    │   └── receive_lead.py
    ├── serverless.yml
    └── services
        └── crm_service.py
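As a rough sketch of the deployment order mentioned above, the following Python snippet builds one deploy command per folder. It assumes the `sls deploy -s {stage} -r {region}` command shown later in this article; the helper name and the example stage and region are assumptions made for illustration.

```python
from typing import List, Tuple

# Folder names follow the project layout described in this article.
DEPLOY_ORDER = ["infrastructure", "step-functions", "api-gateway"]


def build_deploy_commands(stage: str, region: str) -> List[Tuple[str, List[str]]]:
    # Return (working directory, command) pairs in deployment order.
    command = ["sls", "deploy", "-s", stage, "-r", region]
    return [(folder, list(command)) for folder in DEPLOY_ORDER]


commands = build_deploy_commands("dev", "us-east-1")
```

Each pair could then be executed with something like `subprocess.run(command, cwd=folder, check=True)`, so that a failed deploy stops the sequence.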

       

Talk is cheap. Show me the code.

And with this famous quote from Linus Torvalds, we will see the essential code of the project we are creating. You can see the detail here.

Backend

The API endpoints are useless if they don't have a backend. To connect each endpoint to a backend, we must create Lambda functions that write the parameters received by the endpoint to the database. Once the Lambda functions are created, we enter their ARN in the "uri" parameter inside "x-amazon-apigateway-integration".

A key thing about Lambda functions is that they have a main method called handler that receives 2 parameters: message and context. Message is the input payload, and context contains data about the function invocation and about the execution itself. All Lambda functions must receive an input and generate an output. You can learn more here.
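A minimal sketch of such a handler, invoked locally, could look like the following. The greeting logic is purely illustrative; only the `handler(message, context)` shape comes from the description above.

```python
import json


def handler(message, context):
    # `message` is the input payload; `context` carries invocation metadata.
    name = message.get("name", "unknown")
    # Every handler returns an output, here a JSON-serializable dict.
    return {"greeting": f"Hello, {name}"}


# Local invocation: outside AWS we can pass None as the context.
output = handler({"name": "Camila"}, None)
serialized = json.dumps(output)
```

Calling the handler directly like this is a convenient way to test the logic before deploying it.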

The functions of each endpoint are very similar and only vary in the data that the function needs to be able to write to the corresponding table.

Function: createClient

Role: creates record in the CLIENTS table of our DB

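-- CODE language-python --
import json

# return_tuple, connect, create_record, INSERT_CLIENT and response_success
# are helpers defined elsewhere in the project (see the repository).


def handler(message, context):
    try:
        # The request body arrives as a JSON string.
        msg = json.loads(message["body"])
        data = return_tuple(
            msg["name"], msg["lastname"], msg["rut"], msg["mail"], msg["phone"]
        )
        conn = connect()
        res = create_record(conn, INSERT_CLIENT, data)
        return response_success(res["success"], res["message"])
    except Exception as e:
        print("Exception: ", e)
        # Return an error response instead of None so the caller gets a valid reply.
        return {
            "statusCode": 500,
            "body": json.dumps({"success": False, "message": str(e)}),
        }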
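Because the endpoints use the `aws_proxy` integration type, the Lambda's return value has to be a dictionary with a status code and a string body. The following is a hypothetical sketch of a response helper in that spirit; `build_proxy_response` is a name invented here, and the project's own `response_success` may be implemented differently.

```python
import json


def build_proxy_response(success: bool, message: str, status_code: int = 200) -> dict:
    # Lambda proxy integrations expect a status code and a string body.
    return {
        "statusCode": status_code,
        "headers": {"Content-Type": "application/json"},
        "body": json.dumps({"success": success, "message": message}),
    }


ok = build_proxy_response(True, "client created")
failed = build_proxy_response(False, "could not create client", status_code=500)
```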


Function: createPartner

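Role: creates record in the PARTNERS table of our DB

-- CODE language-python --
def handler(message, context):
    try:
        crm_service = CRMService(message)
        crm_service.create_partner()
        return message
    except Exception as e:
        print(e)
        # Re-raise instead of returning the exception: an Exception object
        # cannot be serialized as the function's JSON output.
        raise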


Function: createBenefit

Role: creates record in the BENEFIT table of our DB

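-- CODE language-python --
def handler(message, context):
    try:
        crm_service = CRMService(message)
        r = crm_service.create_benefit()
        return r
    except Exception as e:
        print(e)
        # Re-raise instead of returning the exception: an Exception object
        # cannot be serialized as the function's JSON output.
        raise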


IaaC - Infrastructure as Code

In the serverless.yml code we declare all the resources we are defining. To deploy them, you must have AWS CLI properly configured and then run the command 

-- CODE language-shell -- $ sls deploy -s {stage} -r {my AWS region} 


This generates a CloudFormation stack that groups all the resources you declared. Learn more here.

In the serverless.yml files you will see values like this:

-- CODE language-yaml -- ${file(${self:provider.stage}.yml):db_pass}


These are references to strings in other yml documents within the same path, pointing to a particular value. You can learn more about this way of working here.
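To illustrate how such a reference resolves, here is a small Python sketch that mimics the lookup with an in-memory dictionary instead of real files. It is only a simplified model of what the Serverless framework does; the `resolve` function, the `STAGE_FILES` dictionary and the sample values are assumptions made for this example.

```python
import re
from typing import Dict

# In-memory stand-in for stage files such as dev.yml (values are made up).
STAGE_FILES: Dict[str, Dict[str, str]] = {
    "dev.yml": {"db_user": "dev_user", "db_pass": "dev_secret"},
}

FILE_REF = re.compile(r"^\$\{file\((?P<path>[^)]+)\):(?P<key>\w+)\}$")


def resolve(reference: str, stage: str) -> str:
    # First substitute the stage, then look up the key in the referenced file.
    expanded = reference.replace("${self:provider.stage}", stage)
    match = FILE_REF.match(expanded)
    if match is None:
        raise ValueError(f"Unsupported reference: {reference}")
    return STAGE_FILES[match.group("path")][match.group("key")]


db_pass = resolve("${file(${self:provider.stage}.yml):db_pass}", "dev")
```

With stage `dev`, the reference first becomes `${file(dev.yml):db_pass}` and then yields the `db_pass` value stored in that file.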

Api Gateway

For the REST API we will build an Api Gateway with a Serverless project.

The purpose of the API is to receive requests from the Step Functions and register the data in the database.

The Api Gateway will allow us to expose endpoints that accept HTTP methods. In this project we will only create POST methods.

We'll show you the basics of the project and you can see the details here.


OpenAPI Specification

An alternative way to declare the API, its resources and methods is to use OpenAPI. To learn more about OpenAPI, read this article we wrote about it.

This file is read by the Api Gateway service and generates the API.


Important: to create an Api Gateway from this file, you need to add to the OpenAPI document an extension with information that only AWS can interpret. For example, the create_client endpoint that we call via POST receives a request body that a specific backend must process. That backend is a Lambda. The relationship between the endpoint and the Lambda function is declared in this extension. You can learn more about it here.
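The `uri` value used in this extension follows a fixed template that wraps the Lambda function's ARN. As a hedged illustration, the Python sketch below assembles it from its parts; the helper names, region, account ID and function name are placeholders invented for the example.

```python
def lambda_arn(region: str, account_id: str, function_name: str) -> str:
    # ARN of the backend Lambda function.
    return f"arn:aws:lambda:{region}:{account_id}:function:{function_name}"


def integration_uri(region: str, account_id: str, function_name: str) -> str:
    # API Gateway invokes the Lambda through this path template.
    arn = lambda_arn(region, account_id, function_name)
    return (
        f"arn:aws:apigateway:{region}:lambda:path/2015-03-31/functions/"
        f"{arn}/invocations"
    )


uri = integration_uri("us-east-1", "123456789012", "create-client-back")
```

In the OpenAPI file the function name comes from a stage variable, so the same document can point at different Lambdas per stage.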

-- CODE language-yaml --
openapi: "3.0.1"
info:
  title: "testapi"
  version: "2021-01-21T15:44:04Z"
servers:
  - url: "https://{api-id}.execute-api.{YOUR-REGION-HERE}.amazonaws.com/{basePath}"
    variables:
      basePath:
        default: "/dev"
paths:
  /create_client:
    post:
      responses:
        200:
          description: "200 response"
          content:
            application/json:
              schema:
                $ref: "#/components/schemas/ApiResponseBody"
      requestBody:
        description: request body to create a client
        content:
          application/json:
            schema:
              $ref: "#/components/schemas/CreateClientRequestBody"
            example:
              name: alice
              lastname: cooper
              rut: 11111111-1
              phone: 54545454
              mail: acooper@alice.com
      security:
        - api_key: []
      x-amazon-apigateway-integration:
        uri: arn:aws:apigateway:{YOUR-REGION-HERE}:lambda:path/2015-03-31/functions/arn:aws:lambda:{YOUR-REGION-HERE}:{YOUR-ACCOUNT-ID-HERE}:function:${stageVariables.CreateClientBackLambdaFunction}/invocations
        responses:
          default:
            statusCode: 200
        credentials: arn:aws:iam::{YOUR-ACCOUNT-ID-HERE}:role/${stageVariables.ApiGatewayStepFunctionLambdaRole}
        httpMethod: POST
        passthroughBehavior: "when_no_match"
        type: aws_proxy
  /create_partner:
    post:
      responses:
        200:
          description: "200 response"
          content:
            application/json:
              schema:
                $ref: "#/components/schemas/ApiResponseBody"
      requestBody:
        description: request body to create a partner
        content:
          application/json:
            schema:
              $ref: "#/components/schemas/CreatePartnerRequestBody"
            example:
              rut: 11111111-1
              store: ESTACION_CENTRAL
      security:
        - api_key: []
      x-amazon-apigateway-integration:
        uri: arn:aws:apigateway:{YOUR-REGION-HERE}:lambda:path/2015-03-31/functions/arn:aws:lambda:{YOUR-REGION-HERE}:{YOUR-ACCOUNT-ID-HERE}:function:${stageVariables.CreatePartnerBackLambdaFunction}/invocations
        responses:
          default:
            statusCode: 200
        credentials: arn:aws:iam::{YOUR-ACCOUNT-ID-HERE}:role/${stageVariables.ApiGatewayStepFunctionLambdaRole}
        httpMethod: POST
        passthroughBehavior: "when_no_match"
        type: aws_proxy
  /create_benefit:
    post:
      responses:
        200:
          description: "200 response"
          content:
            application/json:
              schema:
                $ref: "#/components/schemas/ApiResponseBody"
      requestBody:
        description: request body to create a benefit
        content:
          application/json:
            schema:
              $ref: "#/components/schemas/CreateBenefitRequestBody"
            example:
              rut: 11111111-1
              wantsBenefit: true
      security:
        - api_key: []
      x-amazon-apigateway-integration:
        uri: arn:aws:apigateway:{YOUR-REGION-HERE}:lambda:path/2015-03-31/functions/arn:aws:lambda:{YOUR-REGION-HERE}:{YOUR-ACCOUNT-ID-HERE}:function:${stageVariables.CreateBenefitBackLambdaFunction}/invocations
        responses:
          default:
            statusCode: 200
        credentials: arn:aws:iam::{YOUR-ACCOUNT-ID-HERE}:role/${stageVariables.ApiGatewayStepFunctionLambdaRole}
        httpMethod: POST
        passthroughBehavior: "when_no_match"
        type: aws_proxy
components:
  schemas:
    CreateClientRequestBody:
      title: Create Client Req Body
      type: object
      properties:
        name:
          type: string
        lastname:
          type: string
        rut:
          type: string
        phone:
          type: string
        mail:
          type: string
    CreatePartnerRequestBody:
      title: Create Partner Req Body
      type: object
      properties:
        rut:
          type: string
        store:
          type: string
    CreateBenefitRequestBody:
      title: Create Benefit Req Body
      type: object
      properties:
        rut:
          type: string
        wantsBenefit:
          # OpenAPI's name for this type is "boolean", not "bool".
          type: boolean
    ApiResponseBody:
      title: Api Res Body
      type: object
      properties:
        statusCode:
          type: integer
        message:
          type: string
        description:
          type: string


When you deploy the project, Api Gateway will interpret this file and create this in your AWS console:


To know the URL of the deployed API, you must go to the Stages menu. The stage is a state of your api at a given time (note: you can have as many stages as you want with different versions of your API). Here you can indicate an abbreviation for the environment you are working in (dev, qa, prd), you can indicate the version of the api you are doing (v1, v2) or indicate that it corresponds to a test version (test).
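The URL of each endpoint combines the API identifier, the region, the stage name and the resource path, following the pattern of the `servers` entry in the OpenAPI file. A minimal sketch, with a made-up API ID and region:

```python
def invoke_url(api_id: str, region: str, stage: str, resource: str) -> str:
    # Stage and resource path are appended to the API's execute-api host.
    path = resource.lstrip("/")
    return f"https://{api_id}.execute-api.{region}.amazonaws.com/{stage}/{path}"


url = invoke_url("abc123", "us-east-1", "dev", "/create_client")
```

Changing only the `stage` argument (for example to `qa` or `v2`) gives the URL of the same endpoint in another stage.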

In the Api Gateway console, we indicated that we would make a deploy with stage name "dev", so when you go to Stage you will see something like this:



The URLs of each endpoint can be found by clicking on the names listed. This is what the create_client endpoint looks like:


Infrastructure

Here we will create the relational database and the EventBridge event bus.

For now the DB will be in the AWS cloud, but it could be a database in your own data center or in another cloud.

The EventBridge event bus lets two isolated components communicate, even if they run on different architectures. Learn more about this service here.

This repository is smaller than the previous one, as it only declares 2 resources.


serverless.yml

-- CODE language-yaml --
# serverless.yml
service: webinar-iass

provider:
  name: aws
  runtime: nodejs12.x
  stage: ${opt:stage, 'dev'}
  region: {YOUR-REGION-HERE}
  versionFunctions: false
  deploymentBucket:
    # Name of an S3 bucket you choose for deployments. Example: mi_proyecto_serverless_s3
    # If you use the same bucket in every serverless file, the deployments of
    # all projects stay organized inside the same bucket.
    name: kranio-webinar

resources:
  - Resources:
      WebinarMariaDB:
        Type: AWS::RDS::DBInstance
        Properties:
          DBName: WebinarMariaDB
          AllocatedStorage: '5'
          Engine: MariaDB
          DBInstanceClass: db.t2.micro
          # Database user and password. self:provider.stage takes the stage passed
          # at deploy time and opens the file with that name (e.g. dev.yml),
          # so you can parameterize your values.
          MasterUsername: ${file(${self:provider.stage}.yml):db_user}
          MasterUserPassword: ${file(${self:provider.stage}.yml):db_pass}
        DeletionPolicy: Snapshot
      # EventBridge event bus. This is where events will be written.
      WebinarEventBus:
        Type: AWS::Events::EventBus
        Properties:
          Name: WebinarEventBus


You need to create the following tables in your DB. You can use the database scripts here as a guide.

CLIENTS
name VARCHAR(25)
lastname VARCHAR(25)
rut VARCHAR(25)
mail VARCHAR(25)
phone VARCHAR(25)


PARTNERS
rut VARCHAR(25)
store VARCHAR(25)


BENEFIT
rut VARCHAR(25)
wantsBenefit BOOL
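To try the table layout locally before touching a real database, the sketch below creates the three tables and inserts one record in each. It uses Python's built-in `sqlite3` module purely as a stand-in; the article's database is MariaDB, and the project's own scripts may differ.

```python
import sqlite3

# sqlite3 is a local stand-in here; the article's database is MariaDB.
conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE CLIENTS (
        name VARCHAR(25), lastname VARCHAR(25), rut VARCHAR(25),
        mail VARCHAR(25), phone VARCHAR(25)
    );
    CREATE TABLE PARTNERS (rut VARCHAR(25), store VARCHAR(25));
    CREATE TABLE BENEFIT (rut VARCHAR(25), wantsBenefit BOOL);
    """
)

# Parameterized inserts avoid building SQL by string concatenation.
conn.execute(
    "INSERT INTO CLIENTS VALUES (?, ?, ?, ?, ?)",
    ("Camila", "Saavedra", "9891283-0", "csaavedra@micorreo.com", "56983747263"),
)
conn.execute("INSERT INTO PARTNERS VALUES (?, ?)", ("9891283-0", "est_central"))
conn.execute("INSERT INTO BENEFIT VALUES (?, ?)", ("9891283-0", False))

client_count = conn.execute("SELECT COUNT(*) FROM CLIENTS").fetchone()[0]
stores = [row[0] for row in conn.execute("SELECT store FROM PARTNERS")]
```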


With these steps we finish the creation of the infrastructure.

Step Functions

Each "step" of the State Machine that we will create is a Lambda function. Unlike the Lambdas described in the Api Gateway part, whose role is to write to the DB, these make requests to the Api Gateway endpoints.

According to the above architecture based on a sequential flow, the State Machine should have these steps:

  1. Receive data from a source (e.g. a web form) via an EventBridge event.
  2. Take the event data, build a payload with the name, last name, rut, mail and phone, and send it to the create_client endpoint for the backend to write it to the CLIENTS table.
  3. Take the event data, build a payload with the rut and store (branch), and send it to the create_partner endpoint for the backend to write it to the PARTNERS table.
  4. Take the event data, build a payload with the rut and wantsBenefit, and send it to the create_benefit endpoint for the backend to write it to the BENEFIT table.
  5. Optionally, an additional Lambda that the flow reaches if there is an error in the execution (for example, the endpoint is down). In this project it is called catch_errors.

Therefore, there is one Lambda per step.
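The fields each endpoint needs can be sketched as three small payload builders. This is only an illustration of the field selection listed above; the helper names are invented, and the project's Lambdas may simply forward the whole message to each endpoint.

```python
from typing import Any, Dict, Tuple


def pick(lead: Dict[str, Any], keys: Tuple[str, ...]) -> Dict[str, Any]:
    # Keep only the fields a given endpoint needs.
    return {key: lead[key] for key in keys}


def client_payload(lead: Dict[str, Any]) -> Dict[str, Any]:
    return pick(lead, ("name", "lastname", "rut", "mail", "phone"))


def partner_payload(lead: Dict[str, Any]) -> Dict[str, Any]:
    return pick(lead, ("rut", "store"))


def benefit_payload(lead: Dict[str, Any]) -> Dict[str, Any]:
    return pick(lead, ("rut", "wantsBenefit"))


lead = {
    "name": "Camila",
    "lastname": "Saavedra",
    "rut": "9891283-0",
    "phone": 56983747263,
    "mail": "csaavedra@micorreo.com",
    "store": "est_central",
    "wantsBenefit": False,
    "origin": "app",
}
partner = partner_payload(lead)
benefit = benefit_payload(lead)
```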

Function: receive_lead

Role: receives the EventBridge event, cleans it up and passes it to the next step. This step is important because an event arrives as a JSON document with attributes defined by Amazon, and the content of the event (the JSON of the form) is nested inside an attribute called "detail".

When a source sends you an event through EventBridge, the payload looks like this:

-- CODE language-python --
{
    "version": "0",
    "id": "c66caab7-10f8-d6e9-fc4e-2b92021ce7ed",
    "detail-type": "string",
    "source": "kranio.event.crm",
    "account": "{your account number}",
    "time": "2021-01-26T21:52:58Z",
    "region": "{your region}",
    "resources": [],
    "detail": {
        "name": "Camila",
        "lastname": "Saavedra",
        "rut": "9891283-0",
        "phone": 56983747263,
        "mail": "csaavedra@micorreo.com",
        "store": "est_central",
        "wantsBenefit": false,
        "origin": "app"
    }
}


We can define a Lambda that returns the contents of "detail" to the next function, as in the following example:   

-- CODE language-python --
def handler(message, context):
    print('Receiving event...')
    # Pass only the form data on to the next step.
    return message["detail"]


Function: create_client

Role: Receives in message the content of the Lambda from the previous step. It takes the content and passes it as an argument to the instance of the CRMService class.  

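-- CODE language-python --
def handler(message, context):
    try:
        crm_service = CRMService(message)
        result = crm_service.create_client()
        # If the service hands back an Exception, surface it so the
        # state machine's Catch is triggered.
        if isinstance(result, Exception):
            raise result
        return message
    except Exception as e:
        print("Exception: ", e)
        # Re-raise: an Exception object cannot be serialized as JSON output.
        raise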

 

In the CRMService class we declare the methods that will perform the request according to the endpoint. In this example, the request is to the create_client endpoint. For the API calls we used the Python Requests library:

-- CODE language-python --
import json

import requests

# URL (the API's base URL) and CREATE_CLIENT (the endpoint path) are
# configuration constants defined elsewhere in the project.


class CRMService:
    def __init__(self, payload):
        self.payload = payload

    def create_client(self):
        r = requests.post(url=URL + CREATE_CLIENT, data=json.dumps(self.payload))
        # A response code other than 200 is treated as a failure.
        if r.status_code != 200:
            raise Exception(r.text)
        return json.loads(r.text)
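To see what the POST request carries without calling a real endpoint, the sketch below builds the equivalent request with Python's standard `urllib` instead of Requests. The base URL is a made-up placeholder and `build_post` is a hypothetical helper used only for this illustration.

```python
import json
import urllib.request

# Hypothetical values; use your deployed stage URL instead.
URL = "https://abc123.execute-api.us-east-1.amazonaws.com/dev"
CREATE_CLIENT = "/create_client"


def build_post(url: str, payload: dict) -> urllib.request.Request:
    # Encode the payload as JSON and mark the request as a POST.
    body = json.dumps(payload).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_post(URL + CREATE_CLIENT, {"rut": "9891283-0", "name": "Camila"})
# Sending it would be: urllib.request.urlopen(request, timeout=10)
```

Building the request separately from sending it makes the payload easy to inspect in a unit test.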


The Lambda functions for create_partner and create_benefit are similar to create_client, except that they call the corresponding endpoints. You can check case by case in this part of the repository.


Function: catch_errors

Role: it receives the errors that occur and can return them so you can diagnose what happened. It is a Lambda function like any other, so it also has a handler that receives the message and context, and it returns JSON.

-- CODE language-python --
def handler(message, context):
    exception_message = {
        "success": False,
        "description": "an exception occurred in the process. check the log",
        "message": message,
    }
    return exception_message
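When a Catch clause fires, the error handler typically receives an input with "Error" and "Cause" fields, and for Lambda failures the cause is usually a JSON string. As a hedged variant of the handler above, the sketch below unpacks that information; the exact input shape is an assumption you should confirm against your own executions.

```python
import json


def handler(message, context):
    # Assumed input shape: {"Error": "...", "Cause": "..."} from a Catch clause.
    cause = message.get("Cause", "")
    try:
        # For Lambda failures the cause is often a JSON document.
        details = json.loads(cause)
    except (TypeError, ValueError):
        details = {"errorMessage": cause}
    return {
        "success": False,
        "error": message.get("Error", "Unknown"),
        "details": details,
    }


sample = {
    "Error": "Exception",
    "Cause": json.dumps({"errorMessage": "endpoint is down", "errorType": "Exception"}),
}
report = handler(sample, None)
```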


Then we declare the serverless.yml of this project

-- CODE language-yaml --
service: webinar-step-functions
# frameworkVersion: '2.3.0'

provider:
  name: aws
  runtime: python3.7
  stage: ${opt:stage, 'dev'}
  region: {YOUR-REGION-HERE}
  prefix: ${self:service}-${self:provider.stage}
  versionFunctions: false
  deploymentBucket:
    # Name of an S3 bucket you choose for deployments. Example: mi_proyecto_serverless_s3
    # If you use the same bucket in every serverless file, the deployments of
    # all projects stay organized inside the same bucket.
    name: kranio-webinar

package:
  excludeDevDependencies: false

custom:
  prefix: '${self:service}-${self:provider.stage}'
  # arn_prefix is a string that would be repeated many times if it were not
  # parameterized. You can see what it contains in dev.yml.
  # Create one such file per environment if needed.
  arn_prefix: '${file(${self:provider.stage}.yml):arn_prefix}'
  defaultErrorHandler:
    ErrorEquals: ["States.ALL"]
    Next: CatchError

# declaration of all the Lambda functions
functions:
  receiveLead:
    handler: functions/receive_lead.handler
  createClient:
    handler: functions/create_client.handler
  createPartner:
    handler: functions/create_partner.handler
  createBenefit:
    handler: functions/create_benefit.handler
  catchErrors:
    handler: functions/catch_errors.handler

# the Step Functions
# (note: the stepFunctions block is provided by the serverless-step-functions plugin)
stepFunctions:
  stateMachines:
    CreateClientCRM:
      # the state machine starts when an EventBridge event arrives
      # with this specific eventBusName and source
      name: CreateClientCRM
      events:
        - eventBridge:
            eventBusName: WebinarEventBus
            event:
              source:
                - "kranio.event.crm"
      definition:
        Comment: starts the client registration process
        # StartAt indicates the step the state machine starts with
        StartAt: ReceiveLead
        States:
          # Now we declare the states.
          # Type indicates that these steps are tasks.
          # Resource indicates the ARN of the Lambda function
          # that must be executed in this step.
          # Next indicates the step that follows.
          # Catch indicates which state to go to if an error occurs, in this case CatchError.
          ReceiveLead:
            Type: Task
            Resource: '${self:custom.arn_prefix}-receiveLead'
            Next: CreateClient
            Catch:
              - ${self:custom.defaultErrorHandler}
          CreateClient:
            Type: Task
            Resource: '${self:custom.arn_prefix}-createClient'
            Next: CreatePartner
            Catch:
              - ${self:custom.defaultErrorHandler}
          CreatePartner:
            Type: Task
            Resource: '${self:custom.arn_prefix}-createPartner'
            Next: CreateBenefit
            Catch:
              - ${self:custom.defaultErrorHandler}
          CreateBenefit:
            Type: Task
            Resource: '${self:custom.arn_prefix}-createBenefit'
            Catch:
              - ${self:custom.defaultErrorHandler}
            End: true
          CatchError:
            Type: Task
            Resource: '${self:custom.arn_prefix}-catchErrors'
            End: true
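To reason about how this definition behaves, including the jump to CatchError, it can help to simulate it locally. The following Python sketch is a toy interpreter written for this article's flow only; it is not the Step Functions engine, and the `run` function and task stubs are assumptions made for illustration.

```python
from typing import Any, Callable, Dict

# Minimal stand-in for the definition: each state says what follows it.
DEFINITION = {
    "StartAt": "ReceiveLead",
    "States": {
        "ReceiveLead": {"Next": "CreateClient"},
        "CreateClient": {"Next": "CreatePartner"},
        "CreatePartner": {"Next": "CreateBenefit"},
        "CreateBenefit": {"End": True},
        "CatchError": {"End": True},
    },
}


def run(definition: Dict[str, Any], tasks: Dict[str, Callable], event: Any) -> Dict[str, Any]:
    visited = []
    name, data = definition["StartAt"], event
    while True:
        visited.append(name)
        state = definition["States"][name]
        try:
            data = tasks[name](data)
        except Exception as exc:
            if name == "CatchError":
                raise
            # Mimic the Catch clause: jump to the error handler state.
            name, data = "CatchError", {"Error": type(exc).__name__, "Cause": str(exc)}
            continue
        if state.get("End"):
            return {"visited": visited, "output": data}
        name = state["Next"]


def passthrough(data):
    return data


def failing(data):
    raise RuntimeError("endpoint is down")


tasks = {name: passthrough for name in DEFINITION["States"]}
tasks["ReceiveLead"] = lambda event: event["detail"]

event = {"detail": {"rut": "9891283-0"}}
ok_run = run(DEFINITION, tasks, event)
failed_run = run(DEFINITION, {**tasks, "CreatePartner": failing}, event)
```

In the failing run the flow stops at CreatePartner and goes straight to CatchError, so CreateBenefit is never executed.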


Now we have the Lambda functions for each step of the State Machine, we have the API that writes to the DB and we have the endpoint exposed to make requests.

Sending a message to the EventBridge event bus

For all this to start interacting, it is necessary to send the event that starts the flow.

Assuming we're working on your company's CRM and you're getting the initial data from a web form, the way to write to the event bus that starts the flow is through the AWS SDK. Learn about the languages for which it is available here.

If you are working with Python, the way to submit the form data would be this:

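-- CODE language-python --
import json
from datetime import datetime

import boto3

client = boto3.client('events')

# payload holds the form data; these example values match the event shown earlier.
payload = {
    "name": "Camila",
    "lastname": "Saavedra",
    "rut": "9891283-0",
    "phone": 56983747263,
    "mail": "csaavedra@micorreo.com",
    "store": "est_central",
    "wantsBenefit": False,
    "origin": "app",
}

# Running this script sends an EventBridge event.
# Source and EventBusName must match what you declare in serverless.yml.
response = client.put_events(
    Entries=[
        {
            'Time': datetime.now(),
            # the source of the event
            'Source': 'kranio.event.crm',
            'Resources': [
                'string',
            ],
            'DetailType': 'Inscripcion para CRM ',
            # Detail carries the form data as a JSON string.
            'Detail': json.dumps(payload),
            'EventBusName': 'WebinarEventBus'
        },
    ]
)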
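`put_events` reports per-entry failures in its response rather than raising for them, so it is worth checking `FailedEntryCount` after sending. The sketch below separates building the entry from checking the response so both can be tested without AWS access; the helper names and the sample response are hand-written assumptions, not real output.

```python
import json
from datetime import datetime, timezone


def build_entry(payload: dict) -> dict:
    # Source and EventBusName must match the values declared in serverless.yml.
    return {
        "Time": datetime.now(timezone.utc),
        "Source": "kranio.event.crm",
        "DetailType": "Inscripcion para CRM",
        "Detail": json.dumps(payload),
        "EventBusName": "WebinarEventBus",
    }


def failed_entries(response: dict) -> list:
    # Collect the entries that carry an error code, if any failed.
    if response.get("FailedEntryCount", 0) == 0:
        return []
    return [entry for entry in response.get("Entries", []) if "ErrorCode" in entry]


entry = build_entry({"rut": "9891283-0", "origin": "app"})
# With boto3: response = boto3.client("events").put_events(Entries=[entry])

# Hand-written sample response, only to exercise the check.
sample_response = {
    "FailedEntryCount": 1,
    "Entries": [{"ErrorCode": "InternalFailure", "ErrorMessage": "example"}],
}
problems = failed_entries(sample_response)
```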


Once you have configured everything correctly, go to the Step Functions service in your AWS console and you will see the list of executions started by the events you sent:


If you choose the latest execution, you will see the sequence of steps executed and the details of their inputs and outputs:


When you choose the ReceiveLead step, the input corresponds to the payload sent as an event via EventBridge.

The test of truth

If you enter your database (either with the terminal client or with an intuitive visual client) you will see that each piece of data is in each of the corresponding tables.


Conclusions

Step Functions is a very powerful service if you need to automate a sequential flow of actions. We built a simple example here, but it is highly scalable. In addition, working with Step Functions is an invitation to decouple the requirements of the solution you need to implement, which makes it easier to identify failure points.

This type of orchestration is completely serverless, so it is much cheaper than developing an application that runs on a server just to fulfill this role.

It's a great way to experiment with a hybrid cloud, reusing and integrating applications from your data center, and interacting with cloud services.

Do you want to accelerate, simplify and reduce the costs of your data flows with serverless and step functions? Need help getting started? Let's talk!
