Data Sync Using Watermarking and Batch
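The term watermark comes from the high-water mark that a flood leaves on a wall. This example uses a watermark to keep track of the last update it has seen, so that each run retrieves only the items that were updated since the previous one.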
Assumptions
This document describes the details of the example within the context of Anypoint™ Studio, Mule ESB’s graphical user interface (GUI). Where appropriate, the XML configuration is provided.
This document assumes that you are familiar with Mule, Anypoint Studio, Mule flows, and Mule Global Elements.
Example Use Case
In this example, a scheduler periodically sends pull requests to the data store to retrieve only the records that were modified since the last request. The application queries Salesforce CRM for contacts that were recently modified and prints them in the console.
Set Up and Run the Example
Create and run this example in your own instance of Anypoint Studio. Or, skip ahead to the next section if you prefer to simply examine this example.
- Open the Example project in Anypoint Studio from Anypoint Exchange. Do not run the application.
- Log in to your Salesforce account. From your account menu (your account is labeled with your name), select Setup.
- In the left navigation bar, under the Personal Setup heading, click to expand the My Personal Information folder.
- Click Reset My Security Token. Salesforce resets the token and emails you the new one.
- Access the email that Salesforce sent and copy the new token onto your local clipboard.
- In your application in Studio, click the Global Elements tab.
- Double-click the Salesforce global element to open its Global Element Properties panel. In the Security Token field, paste the new Salesforce token you copied from the email. Alternatively, configure the global element in the XML Editor.
- Change the contents of the Username and Password fields to your account-specific values, then click OK to save your changes.
- In the Package Explorer, right-click the connect-with-salesforce project name, then select Run As > Mule Application. Studio runs the application on the embedded server.
- In your browser, access your Salesforce account, then navigate to the Contacts tab.
- Use the drop-down menu to display All Contacts, then select one contact and edit it (for example, change the first name).
- Go back to Anypoint Studio, open the Console view, and look for a log entry similar to this one:
INFO 2018-07-04 08:17:01,502 [salesforce-sync-Batch_Job-work-manager.05] org.mule.api.processor.LoggerMessageProcessor: Contact recently modified: {Name=Avi123 Green, LastModifiedDate=2018-07-04T06:16:54.000Z, Id=0032000001DpkrEAAR, type=Contact}
- Stop the Mule application by clicking the square, red terminate button in the Console.
How it Works
Using a single flow, this application queries Salesforce CRM for recently modified contacts and prints them in the console view.
The Scheduler Component periodically invokes the Salesforce Connector to run a query in your Salesforce account. The query returns the Id, Name, and LastModifiedDate attributes of the Contact objects that were modified since the last poll. The timestamp of the last invocation is kept in an object store through the ObjectStore Connector and is updated after each Salesforce query. Polling runs every 10 seconds; you can change the interval in the Scheduler configuration. The polling takes place in the Input Phase of Batch Processing. To keep the example simple, the Process Records Phase only prints the retrieved records to the console.
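The polling cycle described above can also be read as plain code. The following Python is a minimal sketch of the idea, not the Mule implementation: a dictionary stands in for the object store, a list stands in for the Salesforce contacts, and the function name poll_once and the sample records are hypothetical.

```python
from datetime import datetime, timedelta, timezone

# Key under which the watermark is kept, as in the example application.
WATERMARK_KEY = "lastUpdateTimestampKey"


def poll_once(store, contacts, now):
    """Run one polling cycle and return the contacts modified since the last one.

    `store` is a dict standing in for the object store and `contacts` is a list
    standing in for the Salesforce data (both are assumptions of this sketch).
    """
    # Retrieve: read the stored watermark, defaulting to one day before `now`.
    watermark = store.get(WATERMARK_KEY, now - timedelta(days=1))
    # Query: keep only the records modified strictly after the watermark.
    modified = [c for c in contacts if c["LastModifiedDate"] > watermark]
    # Store: advance the watermark to the time of this poll.
    store[WATERMARK_KEY] = now
    return modified


# Hypothetical sample data: one stale contact and one edited a few seconds ago.
t0 = datetime(2018, 7, 4, 6, 17, 0, tzinfo=timezone.utc)
contacts = [
    {"Id": "A", "Name": "Old Contact", "LastModifiedDate": t0 - timedelta(days=3)},
    {"Id": "B", "Name": "Avi123 Green", "LastModifiedDate": t0 - timedelta(seconds=6)},
]

store = {}
first = poll_once(store, contacts, t0)                          # first run uses the default
second = poll_once(store, contacts, t0 + timedelta(seconds=10))  # next run, 10 seconds later

# Process Records: the example only prints what was retrieved.
for contact in first:
    print("Contact recently modified:", contact)
```

In this sketch the first cycle returns only the recently edited contact, and the second cycle returns nothing because no record changed after the stored watermark.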
The following steps outline the process to build this application.
- Drop a Batch Job in your application.
- Drop a Scheduler Component into the flow's Source field to enable the polling mechanism. Set Frequency and Time unit to 10 seconds, or to any interval you prefer.
- Drop the Retrieve component from the ObjectStore Connector after the Scheduler. Set the Key field to lastUpdateTimestampKey and the Default value to the same time yesterday:
#[now() - |P1D|]
- In the Advanced tab set Target Variable to lastUpdateTimestamp and Target Value to:
#[payload as String]
- Next, put a Salesforce Connector after Retrieve. At this point, you can configure the connector with your Salesforce account details and test the connection to Salesforce.
- When you click Test Connection, Mule tests the connection to Salesforce. With a valid username, password and security token, the connection test results in success and Mule saves your global element configurations. If any of the values are invalid, the connection test results in failure, and Mule does not save the global element, prompting you to correct the invalid configurations.
- Back in the Salesforce Connector properties editor, choose the Query operation from the drop-down menu and set Query Text to:
SELECT Id,Name,LastModifiedDate FROM Contact WHERE LastModifiedDate > :lastUpdateTimestamp
- Set the query parameter lastUpdateTimestamp to:
vars.lastUpdateTimestamp
- Put the Store component from the ObjectStore Connector after the Query operation. Set the Key field to lastUpdateTimestampKey and Value to:
#[now()]
- Add a Logger Component to the Process Records Phase of the batch and set Message to:
#['Contact recently modified: ' ++ write(payload, "application/json", {"indent": "false"})]
- The configuration is complete. Save and run the application. Modify an existing contact in your Salesforce account and watch it appear in the console.
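To make the expressions in the steps above easier to follow, here is a rough Python counterpart of three of them: the default watermark, the query text with its parameter, and the Logger message. It is only a sketch under stated assumptions. The helper names are hypothetical, the parameter is filled in by plain text substitution (this document does not show how the connector binds it), and the timestamp is written as an unquoted ISO 8601 UTC literal, the form SOQL uses for dateTime comparisons.

```python
import json
from datetime import datetime, timedelta, timezone

# Query text from the steps above, with the named parameter left in place.
QUERY_TEMPLATE = (
    "SELECT Id,Name,LastModifiedDate FROM Contact "
    "WHERE LastModifiedDate > :lastUpdateTimestamp"
)


def default_watermark(now):
    # Counterpart of #[now() - |P1D|]: one day before the current time.
    return now - timedelta(days=1)


def bind_timestamp(template, timestamp):
    # Assumption: substitute the parameter as an unquoted ISO 8601 UTC literal.
    literal = timestamp.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    return template.replace(":lastUpdateTimestamp", literal)


def log_message(record):
    # Counterpart of the Logger expression: a prefix plus the record as
    # single-line JSON (exact spacing may differ from the Mule output).
    return "Contact recently modified: " + json.dumps(record)


# Hypothetical values for illustration only.
now = datetime(2018, 7, 4, 6, 17, 0, tzinfo=timezone.utc)
query = bind_timestamp(QUERY_TEMPLATE, default_watermark(now))
message = log_message({"Id": "X", "Name": "Avi123 Green"})
print(query)
print(message)
```

On a first run, when nothing is stored under lastUpdateTimestampKey yet, the query built this way covers the previous 24 hours.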
Documentation
Studio includes a feature that enables you to easily export all the documentation you have recorded for your project. Whenever you want to share your project with others outside the Studio environment, you can export the project's documentation to print, email or share online. Studio's auto-generated documentation includes:
- A visual diagram of the flows in your application
- The XML configuration which corresponds to each flow in your application
- The text you entered in the Notes tab of any building block in your flow
Follow the procedure to export auto-generated Studio documentation.