AEM Elasticsearch Integration

This tutorial provides in-depth knowledge of AEM and Elasticsearch integration. It covers indexing data (adding or updating a document) when a page is published, and deleting that document when the page is unpublished.

Please visit this URL for more theoretical background on Elasticsearch.

Implementation

We will follow the step-by-step process below to add, update, and delete data in Elasticsearch from AEM.

  • An EventHandler is called as soon as a page is published or unpublished.
  • The EventHandler adds a job, and a JobConsumer processes it, which makes sure the task is completed (a failed job can be retried).
  • The job calls an OSGi service that contains the business logic and reads the OSGi configuration.
  • One more application-level common class supports the 3rd-party HTTP calls: it provides the service user's ResourceResolver, while the service itself makes the HTTP calls to Elasticsearch.
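The division of responsibilities in the list above can be modeled in plain Java to see why the handler only queues work. This is a minimal sketch under stated assumptions: `PublishFlowSketch` and `JobRequest` are hypothetical names, a `Queue` stands in for the Sling job queue, and a `Map` stands in for the Elasticsearch index. No AEM, Sling, or OSGi API is used.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Hypothetical model of the flow: event handler -> job queue -> job consumer -> service
final class PublishFlowSketch {

    // A queued unit of work, like a Sling job carrying "path" and "action" properties
    static final class JobRequest {
        final String path;
        final String action;

        JobRequest(String path, String action) {
            this.path = path;
            this.action = action;
        }
    }

    // Stand-in for the Elasticsearch index: page path -> JSON document
    private final Map<String, String> index = new HashMap<>();
    // Stand-in for the Sling job queue
    private final Queue<JobRequest> jobQueue = new ArrayDeque<>();

    // 1. Event handler: records the work and returns immediately
    void handleEvent(String action, String path) {
        jobQueue.add(new JobRequest(path, action));
    }

    // 2. Job consumer: drains the queue and delegates each job to the service
    void processJobs() {
        JobRequest job;
        while ((job = jobQueue.poll()) != null) {
            sendEvent(job);
        }
    }

    // 3. Service: deletes the document on unpublish, otherwise creates or replaces it
    private void sendEvent(JobRequest job) {
        if ("DEACTIVATE".equals(job.action)) {
            index.remove(job.path);
        } else {
            index.put(job.path, "{\"path\":\"" + job.path + "\"}");
        }
    }

    Map<String, String> index() {
        return index;
    }
}
```

Nothing reaches the index until the queue is processed, which mirrors how the real event handler returns quickly and leaves the HTTP work to the asynchronous job.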

As discussed, let's follow the steps below in detail to implement the Elasticsearch integration. Reading the comments in the code will help us understand it in detail.

  1. Follow this URL to set up a cloud instance.
  2. Follow this URL to create an index (the code below uses an index named practice-index-en).
  3. Once the instance is set up and the index is successfully created, let's move on to the code.
  4. As mentioned, let's create an event handler that is called when a page is published or unpublished.
package com.javadoubts.core.handler;

import com.day.cq.replication.ReplicationAction;
import org.apache.commons.lang3.StringUtils;
import org.apache.sling.event.jobs.JobManager;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventConstants;
import org.osgi.service.event.EventHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;

@Component(service = EventHandler.class, property = {
  EventConstants.EVENT_TOPIC + "=" + ReplicationAction.EVENT_TOPIC }, immediate = true)
public class ElastisearchEventHandler implements EventHandler {

 private static final Logger LOG = LoggerFactory.getLogger(ElastisearchEventHandler.class);

 // Only pages below this path are synced to Elasticsearch
 private static final String CONTENT_ROOT = "/content/practice";

 // Job topic consumed by PracticeJobHandler
 private static final String JOB_TOPIC = "com/practice/JobHandler";

 @Reference
 private JobManager jobManager;

 // Main logic to handle AEM replication events such as publish (activate) and unpublish (deactivate) of a page
 @Override
 public void handleEvent(Event event) {
  String topic = event.getTopic();
  LOG.debug("handleEvent : topic {}", topic);

  // Read the replication action (type and path); fromEvent returns null for non-replication events
  ReplicationAction replicationAction = ReplicationAction.fromEvent(event);
  if (replicationAction == null || replicationAction.getType() == null) {
   return;
  }
  String action = replicationAction.getType().getName().toUpperCase();
  String pagePath = replicationAction.getPath();
  LOG.debug("handleEvent : action {} path {}", action, pagePath);

  // Proceed only for pages under the content root; keep the handler lightweight
  // and leave the actual work to the asynchronous job
  if (StringUtils.isNotEmpty(action) && StringUtils.startsWith(pagePath, CONTENT_ROOT)) {
   addJob(action, topic, pagePath);
  }
 }

 private void addJob(String action, String topic, String path) {
  // create job properties
  Map<String, Object> jobProperties = new HashMap<>();
  jobProperties.put("path", path);
  jobProperties.put("topic", topic);
  jobProperties.put("action", action);
  // action type, e.g. ACTIVATE or DEACTIVATE, used by the job to choose PUT or DELETE
  jobProperties.put("actionType", action);

  // add the job to the queue
  jobManager.addJob(JOB_TOPIC, jobProperties);
 }
}
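The filtering done by the handler can be tested on its own once it is pulled out of the OSGi component. Below is a sketch of such a helper in plain Java. `ReplicationFilterSketch` and `jobActionFor` are hypothetical names. Two behaviours are assumptions that differ slightly from the handler above: the path must be the content root itself or lie below it, so `/content/practice-old` is not matched, and only Activate and Deactivate replication types are forwarded, so types such as Test are ignored.

```java
import java.util.Locale;
import java.util.Optional;

// Hypothetical, framework-free version of the event handler's checks
final class ReplicationFilterSketch {

    // Assumed content root, matching the tutorial's /content/practice
    static final String CONTENT_ROOT = "/content/practice";

    // Returns the job action ("ACTIVATE" or "DEACTIVATE"), or empty if the event should be ignored
    static Optional<String> jobActionFor(String replicationType, String path) {
        if (replicationType == null || replicationType.isEmpty() || path == null) {
            return Optional.empty();
        }
        // Match the root itself or a descendant, but not sibling paths sharing the prefix
        boolean inScope = path.equals(CONTENT_ROOT) || path.startsWith(CONTENT_ROOT + "/");
        if (!inScope) {
            return Optional.empty();
        }
        String action = replicationType.toUpperCase(Locale.ROOT);
        // Assumption: only publish and unpublish should touch the index
        if (!action.equals("ACTIVATE") && !action.equals("DEACTIVATE")) {
            return Optional.empty();
        }
        return Optional.of(action);
    }
}
```

Keeping this logic in a static helper lets the handler stay a thin adapter while the rules are covered by plain unit tests.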

5. Create the job handler below to process the request. The job builds the JSON that we index into Elasticsearch as a document. It then calls the ElasticsearchService to make a PUT (add/update) or DELETE call to the Elasticsearch instance.

package com.javadoubts.core.jobs;

import com.day.cq.replication.ReplicationActionType;
import com.javadoubts.core.services.ElasticsearchService;
import com.javadoubts.core.utils.CommonUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.sling.api.resource.Resource;
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.api.resource.ResourceUtil;
import org.apache.sling.api.resource.ValueMap;
import org.apache.sling.event.jobs.Job;
import org.apache.sling.event.jobs.consumer.JobConsumer;
import org.json.JSONException;
import org.json.JSONObject;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Reference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

@Component(service = JobConsumer.class, property = {
  JobConsumer.PROPERTY_TOPICS + "=com/practice/JobHandler" }, immediate = true)
public class PracticeJobHandler implements JobConsumer {

 private static final Logger LOG = LoggerFactory.getLogger(PracticeJobHandler.class);

 @Reference
 private ElasticsearchService elasticsearchService;

 @Override
 public JobResult process(Job job) {
  LOG.info("Starting Job for topic: {}", job.getTopic());

  // pull properties set by the event handler
  String path = job.getProperty("path", String.class);
  String topic = job.getProperty("topic", String.class);
  String action = job.getProperty("action", String.class);
  String actionType = StringUtils.defaultIfEmpty(job.getProperty("actionType", String.class), action);
  LOG.info("Path: {} Action: {} Topic: {}", path, action, topic);

  if (StringUtils.isBlank(path)) {
   LOG.error("Job {} has no path property", job.getId());
   return JobResult.CANCEL;
  }
  boolean deactivate = ReplicationActionType.DEACTIVATE.getName().equalsIgnoreCase(actionType);

  // The resolver is a local variable (jobs can run in parallel) and is closed by try-with-resources
  try (ResourceResolver resolver = CommonUtils.getServiceUser()) {
   if (resolver == null) {
    LOG.error("Could not obtain the service resource resolver");
    return JobResult.FAILED;
   }
   Resource resource = resolver.getResource(path);
   if (resource == null || ResourceUtil.isNonExistingResource(resource)) {
    LOG.warn("No resource at {}, nothing to sync", path);
    return JobResult.OK;
   }

   // process content pages: build the document on publish, send an empty body on unpublish
   String jsonBody = deactivate ? StringUtils.EMPTY : generatePage(resource);
   if (StringUtils.isNotBlank(jsonBody) || deactivate) {
    sendEvent(jsonBody, topic, action, resource, actionType);
   }
   LOG.info("job completed successfully");
   return JobResult.OK;
  } catch (IOException e) {
   // HTTP failure: FAILED lets Sling retry the job
   LOG.error("Error syncing {} with Elasticsearch", path, e);
   return JobResult.FAILED;
  } catch (JSONException e) {
   // a document that cannot be built will not succeed on retry
   LOG.error("Error building the JSON document for {}", path, e);
   return JobResult.CANCEL;
  }
 }

 private void sendEvent(String jsonBody, String topic, String action, Resource resource, String actionType) throws IOException {
  int httpStatus = elasticsearchService.sendEvent(jsonBody, topic, action, resource, actionType);
  LOG.debug("API response: {}", httpStatus);
  // Elasticsearch returns 201 for a new document and 200 for an update or delete
  boolean success = httpStatus >= 200 && httpStatus < 300;
  // a 404 on delete means the document is already gone
  boolean alreadyDeleted = httpStatus == 404
    && ReplicationActionType.DEACTIVATE.getName().equalsIgnoreCase(actionType);
  if (!success && !alreadyDeleted) {
   throw new IOException("Error sending page :: httpStatus :: " + httpStatus);
  }
 }

 // Create the JSON document to index; returns an empty string if the page has no jcr:content
 private String generatePage(Resource resource) throws JSONException {
  Resource contentResource = resource.getChild("jcr:content");
  if (contentResource == null) {
   return StringUtils.EMPTY;
  }
  ValueMap vm = contentResource.getValueMap();
  JSONObject document = new JSONObject();
  document.put("title", vm.get("jcr:title", String.class));
  document.put("description", vm.get("jcr:description", String.class));
  document.put("path", resource.getPath());
  document.put("name", resource.getName());
  return document.toString();
 }
}
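The job treats the HTTP status as the signal for success, retry, or giving up. The sketch below makes that decision explicit in plain Java. `SyncStatusSketch` and its `Outcome` enum are hypothetical. The retry rules for 0 (no response), 429, and 5xx are an assumed policy, not something the tutorial code defines. In the job, SUCCESS would map to `JobResult.OK`, RETRY to `JobResult.FAILED`, and GIVE_UP to `JobResult.CANCEL`.

```java
// Hypothetical helper: classify an Elasticsearch response for the sync job
final class SyncStatusSketch {

    enum Outcome { SUCCESS, RETRY, GIVE_UP }

    static Outcome classify(int httpStatus, boolean isDelete) {
        // 200 OK (updated or deleted) and 201 Created (new document) both mean success
        if (httpStatus >= 200 && httpStatus < 300) {
            return Outcome.SUCCESS;
        }
        // Deleting a document that is not indexed: the desired end state already holds
        if (isDelete && httpStatus == 404) {
            return Outcome.SUCCESS;
        }
        // Assumed policy: no response, throttling, or server errors are worth retrying
        if (httpStatus == 0 || httpStatus == 429 || httpStatus >= 500) {
            return Outcome.RETRY;
        }
        // Other 4xx responses (e.g. bad credentials, malformed JSON) will not fix themselves
        return Outcome.GIVE_UP;
    }
}
```

Separating this from the job keeps the retry policy in one place and avoids the common mistake of accepting only 200, which would fail every first-time index request that returns 201.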

6. Create the OSGi configuration below to hold the Elasticsearch URL, username, and password.

com.javadoubts.core.services.impl.ElasticsearchServiceImpl.cfg.json

{
 "serverApiUsername": "elastic",
 "serverApiPassword": "<elastic-user-password>",
 "elasticSearchEndpointUrl": "https://25c75063f6e34368afa17a2cb292138c.us-central1.gcp.cloud.es.io:443"
}

7. Below is the OCD (Object Class Definition) for the OSGi configuration above.

package com.javadoubts.core.services;

import org.osgi.service.metatype.annotations.AttributeDefinition;
import org.osgi.service.metatype.annotations.AttributeType;
import org.osgi.service.metatype.annotations.ObjectClassDefinition;

@ObjectClassDefinition(
    name = "ElasticSearch OCD Configuration",
    description = "ElasticSearch OCD Configuration description"
)
public @interface ElasticSearchConfiguration {

    @AttributeDefinition(
      name = "Elasticsearch API username",
      description = "Elasticsearch API username",
      type = AttributeType.STRING)
    String serverApiUsername();

    @AttributeDefinition(
      name = "Elasticsearch API password",
      description = "Elasticsearch API password",
      type = AttributeType.PASSWORD)
    String serverApiPassword();

    @AttributeDefinition(
      name = "Elasticsearch API Endpoint URL",
      description = "Elasticsearch API Endpoint URL",
      type = AttributeType.STRING)
    String elasticSearchEndpointUrl();
}
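The configured endpoint is concatenated with the index name and document id to form the request URL, so a trailing slash in `elasticSearchEndpointUrl` would produce a double slash. The sketch below shows one way to normalize it in plain Java. `DocumentUrlSketch` and `documentUri` are hypothetical names; the index name `practice-index-en` and the `/_doc/{id}` path come from the service implementation. It assumes the document id is already URL-safe (digits or hex).

```java
import java.net.URI;

// Hypothetical helper: build {endpoint}/{index}/_doc/{id} from the OSGi configuration values
final class DocumentUrlSketch {

    static URI documentUri(String endpointUrl, String indexName, String documentId) {
        if (endpointUrl == null || endpointUrl.trim().isEmpty()) {
            throw new IllegalArgumentException("elasticSearchEndpointUrl is not configured");
        }
        // Strip trailing slashes so "https://host:443/" and "https://host:443" give the same URL
        String base = endpointUrl.trim().replaceAll("/+$", "");
        return URI.create(base + "/" + indexName + "/_doc/" + documentId);
    }
}
```

Failing fast on an empty endpoint also surfaces a missing configuration at the first request rather than as an obscure connection error.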

8. Below is the ElasticsearchService interface, with a sendEvent() method that makes HTTP calls to the Elasticsearch environment.

package com.javadoubts.core.services;

import org.apache.sling.api.resource.Resource;

import java.io.IOException;

/**
 * ElasticsearchService - Define methods.
 */
public interface ElasticsearchService {

 // Sends the JSON document to Elasticsearch (PUT) or deletes it (DELETE) and returns the HTTP status code
 public int sendEvent(String jsonBody, String topic, String action, Resource resource, String actionType) throws IOException;
}

9. Below is the ElasticsearchServiceImpl class. It makes an HTTP PUT call to add or update the document, or a DELETE call to remove it from the Elasticsearch instance, depending on whether the page is activated or deactivated.

package com.javadoubts.core.services.impl;

import com.day.cq.replication.ReplicationActionType;
import com.javadoubts.core.services.ElasticSearchConfiguration;
import com.javadoubts.core.services.ElasticsearchService;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.util.EntityUtils;
import org.apache.sling.api.resource.Resource;
import org.osgi.framework.Constants;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.metatype.annotations.Designate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

/**
 * ElasticsearchServiceImpl
 */
@Component(service = ElasticsearchService.class, immediate = true, property = {
  Constants.SERVICE_DESCRIPTION + "=ElasticsearchServiceImpl Config Properties", Constants.SERVICE_VENDOR + "=Practice" })
@Designate(ocd = ElasticSearchConfiguration.class)
public class ElasticsearchServiceImpl implements ElasticsearchService {

 private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchServiceImpl.class);

 // Index created in step 2
 private static final String INDEX_NAME = "practice-index-en";

 private String serverApiUsername;
 private String serverApiPassword;
 private String elasticSearchEndpointUrl;
 private CloseableHttpClient httpClient;

 @Activate
 protected void activate(ElasticSearchConfiguration config) {
  // collect url, username and password from the OSGi ElasticSearchConfiguration
  this.serverApiUsername = config.serverApiUsername();
  this.serverApiPassword = config.serverApiPassword();
  this.elasticSearchEndpointUrl = config.elasticSearchEndpointUrl();
  initialize();
 }

 @Deactivate
 protected void deactivate() {
  // release pooled connections when the component is stopped or reconfigured
  if (httpClient != null) {
   try {
    httpClient.close();
   } catch (IOException e) {
    LOG.warn("Could not close the HTTP client", e);
   }
  }
 }

 // initializes a pooled HTTP client with 10-second timeouts
 private void initialize() {
  PoolingHttpClientConnectionManager connManager = new PoolingHttpClientConnectionManager();
  connManager.setMaxTotal(20);          // max connections overall
  connManager.setDefaultMaxPerRoute(5); // max connections per host

  RequestConfig config = RequestConfig.custom()
    .setConnectTimeout(10 * 1000)
    .setConnectionRequestTimeout(10 * 1000)
    .setSocketTimeout(10 * 1000).build();

  httpClient = HttpClients.custom()
    .setConnectionManager(connManager)
    .setDefaultRequestConfig(config).build();
 }

 @Override
 public int sendEvent(String jsonBody, String topic, String action, Resource resource, String actionType) throws IOException {
  // basic authentication with the configured Elasticsearch user
  CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
  credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(serverApiUsername, serverApiPassword));
  HttpClientContext localContext = HttpClientContext.create();
  localContext.setCredentialsProvider(credentialsProvider);

  // document id derived from the page path, so publish and unpublish address the same document
  String uniqueId = String.valueOf(Math.abs(resource.getPath().hashCode()));
  String apiUrl = elasticSearchEndpointUrl + "/" + INDEX_NAME + "/_doc/" + uniqueId;

  // unpublish -> DELETE the document; publish -> PUT (create or replace) the document
  if (ReplicationActionType.DEACTIVATE.getName().equalsIgnoreCase(actionType)) {
   return execute(new HttpDelete(apiUrl), localContext);
  }
  HttpPut putMethod = new HttpPut(apiUrl);
  putMethod.setEntity(new StringEntity(jsonBody, ContentType.APPLICATION_JSON));
  return execute(putMethod, localContext);
 }

 // Executes the request and returns the HTTP status code; I/O errors propagate so the job can retry
 private int execute(HttpUriRequest request, HttpClientContext localContext) throws IOException {
  LOG.info("Calling Elasticsearch: {} {}", request.getMethod(), request.getURI());
  try (CloseableHttpResponse response = httpClient.execute(request, localContext)) {
   int statusCode = response.getStatusLine().getStatusCode();
   // consume the body so the pooled connection can be reused
   EntityUtils.consume(response.getEntity());
   LOG.info("Elasticsearch returned status code {}", statusCode);
   return statusCode;
  }
 }
}
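The service uses `Math.abs(path.hashCode())` as the document id. That value is only 32 bits, so different paths can share an id and overwrite each other's documents; for example, `/content/practice/Aa` and `/content/practice/BB` have the same `hashCode()`. The sketch below contrasts it with a SHA-256 hex digest of the path. `DocumentIdSketch` is a hypothetical name, and switching ids is an alternative technique rather than the tutorial's approach; existing documents would need to be re-indexed after such a change.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical comparison of two ways to derive a document id from a page path
final class DocumentIdSketch {

    // Same scheme as the service above: a 32-bit hash, so collisions are possible
    static String hashCodeId(String path) {
        return String.valueOf(Math.abs(path.hashCode()));
    }

    // SHA-256 of the path, hex encoded: 64 URL-safe characters, stable across restarts
    static String sha256Id(String path) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            byte[] hash = digest.digest(path.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder(hash.length * 2);
            for (byte b : hash) {
                hex.append(String.format("%02x", b & 0xff));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // every Java platform is required to support SHA-256
            throw new IllegalStateException(e);
        }
    }
}
```

Either way, the id must depend only on the path, because the DELETE on unpublish has to hit the same document the PUT created.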

10. Use the class below to get a ResourceResolver for a service user.

Note: Create the practiceuser service user, grant it the required permissions, and map the practiceuser sub-service to it. This link provides complete details on creating a system user.

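package com.javadoubts.core.utils;

import org.apache.sling.api.resource.LoginException;
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.api.resource.ResourceResolverFactory;
import org.osgi.framework.BundleContext;
import org.osgi.framework.FrameworkUtil;
import org.osgi.framework.ServiceReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.Map;

public class CommonUtils {

    private static final Logger LOG = LoggerFactory.getLogger(CommonUtils.class);

    // Sub-service name mapped to the practiceuser system user
    private static final String SUBSERVICE_NAME = "practiceuser";

    private CommonUtils() {
        // utility class, no instances
    }

    // Returns a service ResourceResolver (the caller must close it), or null if login fails
    public static ResourceResolver getServiceUser() {
        BundleContext bundleContext = FrameworkUtil.getBundle(CommonUtils.class).getBundleContext();
        ServiceReference<ResourceResolverFactory> factoryRef =
                bundleContext.getServiceReference(ResourceResolverFactory.class);
        if (factoryRef == null) {
            LOG.error("ResourceResolverFactory service is not available");
            return null;
        }
        try {
            ResourceResolverFactory resFactory = bundleContext.getService(factoryRef);
            Map<String, Object> param = Collections.singletonMap(ResourceResolverFactory.SUBSERVICE, SUBSERVICE_NAME);
            return resFactory.getServiceResourceResolver(param);
        } catch (LoginException e) {
            LOG.error("Could not log in as service user {}", SUBSERVICE_NAME, e);
            return null;
        } finally {
            bundleContext.ungetService(factoryRef);
        }
    }
}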

11. Publish a page and check the Elasticsearch instance. We will see the page data created as a document in the index.

On unpublish, the corresponding document is deleted from the Elasticsearch index.
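To check the result from code instead of the Elastic Cloud console, a GET on the same document URL returns 200 with `"found": true` after publishing and 404 with `"found": false` after unpublishing. The sketch below only builds that request. It swaps in the JDK 11+ `java.net.http` client instead of the Apache HttpClient used above, and sends basic authentication preemptively in the Authorization header. `VerifyRequestSketch` is a hypothetical name, and the endpoint, credentials, and id in any call are placeholders.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper: build GET {endpoint}/{index}/_doc/{id} with basic authentication
final class VerifyRequestSketch {

    static HttpRequest getDocument(String endpointUrl, String indexName, String documentId,
                                   String username, String password) {
        // Preemptive basic auth: "Basic " + base64("username:password")
        String credentials = username + ":" + password;
        String basicAuth = "Basic " + Base64.getEncoder()
                .encodeToString(credentials.getBytes(StandardCharsets.UTF_8));
        return HttpRequest.newBuilder(URI.create(endpointUrl + "/" + indexName + "/_doc/" + documentId))
                .header("Authorization", basicAuth)
                .GET()
                .build();
    }
}
```

The request can be sent with `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())`, and the status code compared before and after unpublishing.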

Note: In this blog, I kept the code short to focus on the integration between AEM and Elasticsearch. Please follow best practices: keep string literals in a constants file, declare the list of properties to index in one generic list, catch specific exceptions, and define connection-related constants (timeouts, pool sizes) in the OSGi configuration.
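The "generic list of properties" suggested in the note could look like the sketch below: one list maps each page property to a field in the Elasticsearch document, and the document builder loops over it instead of hard-coding put() calls. `IndexedFieldsSketch` and `Field` are hypothetical names, and a plain `Map` stands in for the `jcr:content` ValueMap so the example runs without AEM.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: a single list declares which page properties are indexed
final class IndexedFieldsSketch {

    static final class Field {
        final String jcrProperty; // property on the page's jcr:content node
        final String jsonField;   // field name in the Elasticsearch document

        Field(String jcrProperty, String jsonField) {
            this.jcrProperty = jcrProperty;
            this.jsonField = jsonField;
        }
    }

    // Same properties as generatePage(); new ones are added here, not as new put() calls
    static final List<Field> FIELDS = Collections.unmodifiableList(Arrays.asList(
            new Field("jcr:title", "title"),
            new Field("jcr:description", "description")));

    // properties stands in for the jcr:content ValueMap; missing values are skipped
    static Map<String, Object> toDocument(Map<String, Object> properties, String path, String name) {
        Map<String, Object> document = new LinkedHashMap<>();
        for (Field field : FIELDS) {
            Object value = properties.get(field.jcrProperty);
            if (value != null) {
                document.put(field.jsonField, value);
            }
        }
        document.put("path", path);
        document.put("name", name);
        return document;
    }
}
```

In the job, the resulting map could be passed to `new JSONObject(map)`; the list itself could also move into the OSGi configuration if authors need to change indexed fields without a code release.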

Imran Khan

Specialist Master (Architect) with a passion for cutting-edge technologies like AEM (Adobe Experience Manager) and a proven track record of delivering high-quality software solutions.

  • Languages: Java, Python
  • Frameworks: J2EE, Spring, Struts 2.0, Hibernate
  • Web Technologies: React, HTML, CSS
  • Analytics: Adobe Analytics
  • Tools & Technologies: IntelliJ, JIRA
