Wednesday, August 29, 2012

Storing Documents in MarkLogic via XCC

Integrating Java applications with MarkLogic involves using MarkLogic XCC (XML Contentbase Connector). XCC is a set of client APIs with libraries for Java and .NET.  XCC connects to an XDBC server configured on the MarkLogic server, and that XDBC server gives access to the XML databases on the MarkLogic server.

After the XDBC server is created in MarkLogic, you can test connectivity via a simple Hello World call, seen below.  Note that I use the admin/admin credentials that I set up when I installed MarkLogic.  In practice, MarkLogic has a very granular security scheme that can be used to control access and privileges, and SSL should also be used outside of a local test setup.  The Hello World example is from the XCC Developer's Guide.
package com.icfi.marklogic;

import java.net.URI;
import java.net.URISyntaxException;

import com.marklogic.xcc.ContentSource;
import com.marklogic.xcc.ContentSourceFactory;
import com.marklogic.xcc.Request;
import com.marklogic.xcc.ResultSequence;
import com.marklogic.xcc.Session;
import com.marklogic.xcc.exceptions.RequestException;
import com.marklogic.xcc.exceptions.XccConfigException;

public class HelloWorld {
 public static void main(String[] args) throws URISyntaxException,
   XccConfigException, RequestException {

  URI uri = new URI("xcc://admin:admin@localhost:8050/Documents");
  ContentSource contentSource = ContentSourceFactory.newContentSource(uri);

  Session session = contentSource.newSession();
  Request request = session.newAdhocQuery("\"Hello World\"");
  ResultSequence rs = session.submitRequest(request);
  System.out.println(rs.asString());
  session.close();
 }
}
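
As a small aside on the hard-coded connection string above, the sketch below shows one way to assemble the xcc:// URI from separate settings using only java.net.URI. The class name XccUriSketch and the environment variable names XCC_USER and XCC_PASSWORD are my own assumptions, not part of XCC. I have not verified how XCC treats percent-encoded characters in the user info, so treat this as an illustration rather than a recipe.

```java
import java.net.URI;
import java.net.URISyntaxException;

/** Builds an xcc:// connection URI from separately supplied settings. */
final class XccUriSketch {

    /**
     * Assembles xcc://user:password@host:port/database.
     * The multi-argument URI constructor percent-encodes characters that
     * are not legal in a component, such as a space in the password.
     */
    static URI buildUri(String user, String password, String host,
            int port, String database) {
        try {
            return new URI("xcc", user + ":" + password, host, port,
                    "/" + database, null, null);
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Bad connection settings", e);
        }
    }

    /** Reads an environment variable, falling back to a default value. */
    private static String envOrDefault(String name, String fallback) {
        String value = System.getenv(name);
        return (value == null || value.isEmpty()) ? fallback : value;
    }

    public static void main(String[] args) {
        // XCC_USER and XCC_PASSWORD are hypothetical variable names.
        String user = envOrDefault("XCC_USER", "admin");
        String password = envOrDefault("XCC_PASSWORD", "admin");
        URI uri = buildUri(user, password, "localhost", 8050, "Documents");
        // Print only the non-secret parts of the URI.
        System.out.println(uri.getHost() + ":" + uri.getPort() + uri.getPath());
    }
}
```

The resulting URI could then be handed to ContentSourceFactory.newContentSource(uri) exactly as in the Hello World example.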

Once connectivity has been tested, you are ready to start storing documents.  Another example from the XCC Guide (customized for my use) can be seen below.  In this example, I again connect to the XDBC server that I created and get a session object.  It is important to state here that connection pooling is done automatically for you by XCC.  The API also supports JTA.


package com.icfi.marklogic;

import java.net.URI;
import java.net.URISyntaxException;

import com.icfi.marklogic.content.XmlContent;
import com.marklogic.xcc.ContentSource;
import com.marklogic.xcc.ContentSourceFactory;
import com.marklogic.xcc.Session;
import com.marklogic.xcc.exceptions.RequestException;
import com.marklogic.xcc.exceptions.RetryableXQueryException;
import com.marklogic.xcc.exceptions.XccConfigException;

public class ContentAdder {
 public static final int MAX_RETRY_ATTEMPTS = 5;
 public static final int RETRY_WAIT_TIME = 1000;

 public static void main(String[] args) throws URISyntaxException,
   XccConfigException, RequestException {
  URI uri = new URI("xcc://admin:admin@localhost:8050/Documents");
  ContentSource contentSource = ContentSourceFactory
    .newContentSource(uri);
  Session session = contentSource.newSession();
  session.setTransactionMode(Session.TransactionMode.UPDATE);

  // Re-try logic for a multi-statement transaction
  for (int i = 0; i < MAX_RETRY_ATTEMPTS; i++) {
   try {
    session.submitRequest(session
      .newAdhocQuery("xdmp:document-insert('/docs/catalog.xml', "
        + XmlContent.CATALOG + ")"));
    session.submitRequest(session
      .newAdhocQuery("xdmp:document-insert('/docs/bookstore.xml', "
        + XmlContent.BOOKSTORE + ")"));
    session.commit();
    break;
   } catch (RetryableXQueryException e) {
    try {
     Thread.sleep(RETRY_WAIT_TIME);
    } catch (InterruptedException ie) {
     // Restore the interrupt flag instead of swallowing it
     Thread.currentThread().interrupt();
    }
   }
  }
  session.close();
 }
}

In this example I also use the recommended approach to retrying operations against the XDBC server.  The code that does the heavy lifting to store the documents is repeated below.  Here the XCC API uses the session object to send the XDBC server a new ad hoc query that calls xdmp:document-insert, a built-in MarkLogic XQuery function.  In its simplest form, xdmp:document-insert takes a unique document URI and the XML document content.  In this example the XML content is provided by Groovy strings (GStrings) defined in the XmlContent.groovy class.  I use Groovy string constants because they spare me from writing all that nasty java.lang.String concatenation.
session.submitRequest(session
      .newAdhocQuery("xdmp:document-insert('/docs/catalog.xml', "
        + XmlContent.CATALOG + ")"));
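
Because the query text above is built by string concatenation, a URI containing an apostrophe or XML content containing curly braces would change the meaning of the XQuery. The following is a minimal sketch of a helper that escapes both; the class and method names are hypothetical, and it assumes the braces appear in ordinary element or attribute content (not inside CDATA sections or comments, where doubling would be wrong). XCC's Session.insertContent avoids the issue by sending the document as content rather than as query text, which may be the better choice once you move past a prototype.

```java
/** Builds the XQuery text for a simple xdmp:document-insert call. */
final class InsertQuerySketch {

    /** Quotes a value as an XQuery string literal delimited by apostrophes. */
    static String quote(String value) {
        String escaped = value
                .replace("&", "&amp;")  // a bare & would start an entity reference
                .replace("'", "''");    // a doubled apostrophe is a literal apostrophe
        return "'" + escaped + "'";
    }

    /** Doubles curly braces so XQuery does not read them as enclosed expressions. */
    static String escapeBraces(String xml) {
        return xml.replace("{", "{{").replace("}", "}}");
    }

    /** Combines the quoted URI and the escaped XML into one query string. */
    static String documentInsert(String uri, String xml) {
        return "xdmp:document-insert(" + quote(uri) + ", "
                + escapeBraces(xml) + ")";
    }

    public static void main(String[] args) {
        System.out.println(documentInsert("/docs/o'reilly.xml",
                "<note>Use {braces} with care</note>"));
    }
}
```

The string returned by documentInsert could be passed to session.newAdhocQuery in place of the hand-concatenated text.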

To verify that the docs were stored, I go to the MarkLogic Query Console (http://localhost:8000/qconsole/).  In the console, I can run XQuery queries to verify that the documents are in the database.  Note:  When MarkLogic installs, it creates several databases, and when you create an XDBC server, you must choose a database to connect to.  I chose the "Documents" database, but I could have created a new one for this purpose.  Below is a screenshot of the Query Console.  I clicked on the "Explore" button to view a list of all the documents in this database.

If I wanted to view the contents of a document, I could run an XQuery as seen below, or I could simply click on the document in the list.

The value in the XQuery doc function is the unique URI for the document in the MarkLogic database.

This was a simple example of storing documents in MarkLogic.  In reality, considerable thought should go into creating the proper structures (directories, collections, etc.) used to house and organize documents.  Organizing documents into directories and collections makes them easier to handle en masse if that requirement exists.

Another important point is that these docs were already XML.  Going forward, I will be serializing Java objects into XML via XStream.  Before I can store Java objects as serialized XML, I need to map important attributes of my model objects to the MarkLogic container model.  To do this I wrote a custom Java annotation, seen below.
package com.icfi.marklogic;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface Document {
 String documentUriPrefix() default "";
 String documentUriSuffix() default "";
 String collections() default "";
 String directory() default "";
 String properties() default "";
}
The Employee.java model class seen below uses the Document annotation to define the MarkLogic container-specific semantics that are applied when the document representing the Java object is stored in MarkLogic.
package com.icfi.model;

import java.io.Serializable;
import java.text.SimpleDateFormat;
import java.util.Date;

import com.icfi.framework.Strings;
import com.icfi.marklogic.Document;

/**
 * Employee model object.
 * 
 * @author jimmyray
 * @version 1.0
 */
@Document(documentUriSuffix = "/employee.xml", 
  collections = "http://employees.none.com", directory = "/Employees/", properties="NEW")
public class Employee extends Person implements Serializable {
 private static final long serialVersionUID = 2523764855390968707L;
 private String id;
 private Address address;
 private String employeeId;
 private Date hireDate;
 private Department department;
 private String title;
 private int salary;

 public String getId() {
  return id;
 }

 public void setId(String id) {
  this.id = id;
 }
...

Seen below, the EmployeeServiceImpl processes the annotations on the Employee class to get at the metadata needed to process the documents in MarkLogic.
...
 private void processAnnotations(Employee employee)
   throws ClassNotFoundException {

  this.documentMap = new HashMap<String, String>();

  // Read the Document annotation straight from the Employee class
  Document document = employee.getClass().getAnnotation(Document.class);

  String uri = document.directory() + employee.getId()
    + document.documentUriSuffix();

  String collections = document.collections();

  String properties = document.properties();

  this.documentMap.put(URI_KEY, uri);

  if (null != collections && !collections.equals("")) {
   this.documentMap.put(COLLECTIONS_KEY, collections);
  }

  if (null != properties && !properties.equals("")) {
   this.documentMap.put(PROPERTIES_KEY, properties);
  }

 }...
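
Since the service code above is only an excerpt, here is a self-contained sketch of the same idea that runs without MarkLogic: a runtime annotation is read reflectively and turned into a map of document metadata. DocumentMeta and SampleEmployee are trimmed, hypothetical stand-ins for my Document annotation and Employee class, and the key names are placeholders.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.HashMap;
import java.util.Map;

/** Turns annotation metadata on a model class into a simple map. */
final class AnnotationSketch {
    static final String URI_KEY = "uri";
    static final String COLLECTIONS_KEY = "collections";

    static Map<String, String> describe(SampleEmployee employee) {
        DocumentMeta meta = employee.getClass().getAnnotation(DocumentMeta.class);
        if (meta == null) {
            throw new IllegalArgumentException("Class is not annotated");
        }
        Map<String, String> map = new HashMap<>();
        // The document URI is directory + id + suffix, as in the service above.
        map.put(URI_KEY, meta.directory() + employee.id + meta.documentUriSuffix());
        // Annotation values are never null, so an empty check is enough.
        if (!meta.collections().isEmpty()) {
            map.put(COLLECTIONS_KEY, meta.collections());
        }
        return map;
    }

    public static void main(String[] args) {
        System.out.println(describe(new SampleEmployee("42")).get(URI_KEY));
    }
}

/** Hypothetical, trimmed stand-in for the Document annotation. */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface DocumentMeta {
    String documentUriSuffix() default "";
    String collections() default "";
    String directory() default "";
}

/** Hypothetical model class carrying the metadata. */
@DocumentMeta(documentUriSuffix = "/employee.xml",
        collections = "http://employees.none.com", directory = "/Employees/")
class SampleEmployee {
    final String id;

    SampleEmployee(String id) {
        this.id = id;
    }
}
```

With the id "42", the URI built here is /Employees/42/employee.xml, which mirrors how the directory, id, and suffix are combined in processAnnotations.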



Below is a JUnit test that exercises the EmployeeServiceImpl class.  This test loads employees and stores them in the MarkLogic database using the employee service.
package com.icfi.marklogic;

import java.util.List;

import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.GenericXmlApplicationContext;

import com.icfi.model.Employee;
import com.icfi.services.EmployeeService;

public class EmployeeServiceTest {
 private static Logger log = LoggerFactory
   .getLogger(EmployeeServiceTest.class);

 @Test
 public void testEmployeeService() throws Exception {
  ApplicationContext ctx = new GenericXmlApplicationContext(
    "context/main.xml");

  EmployeeService employeeService = (EmployeeService) ctx
    .getBean("employeeService");

  List<Employee> employees = employeeService.buildEmployees();

  for (Employee employee : employees) {
   System.out.println(employee);
  }

  employeeService.persistEmployees(employees);
  
  //employeeService.removeEmployees(employees);
 }
}
The persistEmployee() method (seen below) of the EmployeeServiceImpl class persists employee documents into MarkLogic via the MarkLogicDao class.
public void persistEmployee(Employee employee) {
  try {
   this.processAnnotations(employee);
   dao.storeDocument(this.serialize(employee), this.documentMap);
  } catch (XccConfigException xce) {
   log.error(Strings.getStackTraceAsString(xce));
  } catch (RequestException re) {
   log.error(Strings.getStackTraceAsString(re));
  } catch (URISyntaxException use) {
   log.error(Strings.getStackTraceAsString(use));
  } catch (ClassNotFoundException cnfe) {
   log.error(Strings.getStackTraceAsString(cnfe));
  } catch (TransformerConfigurationException tce) {
   log.error(Strings.getStackTraceAsString(tce));
  } catch (TransformerFactoryConfigurationError tfce) {
   log.error(Strings.getStackTraceAsString(tfce));
  }
 }
This method performs an inline serialization using XStream.
private String serialize(Employee employee)
   throws TransformerConfigurationException,
   TransformerFactoryConfigurationError {
  
  XStream xstream = new XStream(new DomDriver());
  String xml = xstream.toXML(employee);
 
  return xml;
 }
The MarkLogicDao class does the heavy lifting and interfaces with the MarkLogic database.  Its methods use the documentMap to get access to the metadata attached to the Employee objects via the Document annotation.
package com.icfi.marklogic;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.icfi.services.EmployeeService;
import com.marklogic.xcc.ContentSource;
import com.marklogic.xcc.ContentSourceFactory;
import com.marklogic.xcc.Session;
import com.marklogic.xcc.exceptions.RequestException;
import com.marklogic.xcc.exceptions.RetryableXQueryException;
import com.marklogic.xcc.exceptions.XccConfigException;

/**
 * DAO class to abstract the database layer from the Java application.
 * 
 * @author jimmyray
 * @version 1.0
 */
public class MarkLogicDao {
 private static Logger log = LoggerFactory.getLogger(MarkLogicDao.class);

 public static final int MAX_RETRY_ATTEMPTS = 5;
 public static final int RETRY_WAIT_TIME = 1;

 private Session session;

 /**
  * Store multiple XML documents
  * 
  * @param data
  * @param map
  * @throws XccConfigException
  * @throws RequestException
  * @throws URISyntaxException
  */
 public void storeDocuments(String[] data, Map<String, String> map)
   throws XccConfigException, RequestException, URISyntaxException {
  for (String doc : data) {
   this.storeDocument(doc, map);
  }
 }

 /**
  * Store a single XML document.
  * 
  * @param data
  * @param map
  * @throws URISyntaxException
  * @throws XccConfigException
  * @throws RequestException
  */
 public void storeDocument(String data, Map<String, String> map)
   throws URISyntaxException, XccConfigException, RequestException {

  log.debug("Storing " + map.get(EmployeeService.URI_KEY));

  this.buildSession();

  session.setTransactionMode(Session.TransactionMode.AUTO);

  // Retry logic; in AUTO mode each request commits as its own transaction
  for (int i = 0; i < MAX_RETRY_ATTEMPTS; i++) {
   try {
    log.debug("request 1");
    session.submitRequest(session
      .newAdhocQuery("xdmp:document-insert('"
        + map.get(EmployeeService.URI_KEY) + "', "
        + data + ")"));

    if (map.containsKey(EmployeeService.COLLECTIONS_KEY)) {
     log.debug("request 2");
     session.submitRequest(session
       .newAdhocQuery("xdmp:document-set-collections('"
         + map.get(EmployeeService.URI_KEY) + "', '"
         + map.get(EmployeeService.COLLECTIONS_KEY)
         + "')"));
    }

    if (map.containsKey(EmployeeService.PROPERTIES_KEY)) {
     log.debug("request 3");
     session.submitRequest(session
       .newAdhocQuery("xdmp:document-set-properties('"
         + map.get(EmployeeService.URI_KEY) + "', "
         + map.get(EmployeeService.PROPERTIES_KEY)
         + ")"));
    }

    //session.commit();
    break;
   } catch (RetryableXQueryException e) {
    try {
     Thread.sleep(RETRY_WAIT_TIME);
    } catch (InterruptedException ie) {
     Thread.currentThread().interrupt();
    }
   }
  }
  session.close();
 }

 /**
  * Delete an XML document
  * 
  * @param docUri
  * @throws URISyntaxException
  * @throws XccConfigException
  * @throws RequestException
  */
 public void deleteDocument(String docUri) throws URISyntaxException,
   XccConfigException, RequestException {
  this.buildSession();

  // session.setTransactionMode(Session.TransactionMode.UPDATE);

  // Retry logic for the single delete request
  for (int i = 0; i < MAX_RETRY_ATTEMPTS; i++) {
   try {
    session.submitRequest(session
      .newAdhocQuery("xdmp:document-delete('" + docUri + "')"));
    // session.commit();
    break;
   } catch (RetryableXQueryException e) {
    try {
     Thread.sleep(RETRY_WAIT_TIME);
    } catch (InterruptedException ie) {
     Thread.currentThread().interrupt();
    }
   }
  }
  session.close();
 }

 /**
  * Build the MarkLogic session needed for other operations.
  * 
  * @throws URISyntaxException
  * @throws XccConfigException
  */
 private void buildSession() throws URISyntaxException, XccConfigException {
  if (null != session && !session.isClosed()) {
   return;
  }

  URI uri = new URI("xcc://admin:admin@localhost:8050/Documents");
  ContentSource contentSource = ContentSourceFactory
    .newContentSource(uri);
  this.session = contentSource.newSession();
 }
}
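
The retry loop is repeated in storeDocument and deleteDocument, and both loops fall through silently once the attempts are exhausted. The sketch below shows how that loop might be factored into one helper that reports failure instead. It runs without XCC: RetryableException is a hypothetical stand-in for RetryableXQueryException, and the names RetrySketch and withRetry are my own.

```java
import java.util.function.Supplier;

/** Runs a unit of work with a bounded number of retries. */
final class RetrySketch {

    /** Hypothetical stand-in for XCC's RetryableXQueryException. */
    static class RetryableException extends RuntimeException {
        RetryableException(String message) {
            super(message);
        }
    }

    static <T> T withRetry(int maxAttempts, long waitMillis, Supplier<T> work) {
        RetryableException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return work.get();
            } catch (RetryableException e) {
                last = e;
                if (attempt < maxAttempts) {
                    pause(waitMillis);
                }
            }
        }
        // Unlike the loops above, surface the failure to the caller.
        throw new IllegalStateException(
                "Gave up after " + maxAttempts + " attempts", last);
    }

    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException ie) {
            // Restore the flag and stop retrying.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", ie);
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        String result = withRetry(5, 1L, () -> {
            calls[0]++;
            if (calls[0] < 3) {
                throw new RetryableException("simulated conflict");
            }
            return "stored";
        });
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

In the DAO, the body of each try block would become the supplier, and the catch clause would name the real XCC exception type.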
The purpose of the MarkLogicDao is to abstract the access layer and prototype the xdmp calls made through the MarkLogic XCC API.  The screenshot below shows the documents loaded by their unique URIs and their collections.
By clicking on the (properties) link, you can access the document properties metadata.  This metadata is helpful when you want to process documents and keep track of which ones have been processed, or of other in-process statuses.  Below are the properties for one of the employee docs.


  NEW
  2012-09-05T15:24:28-04:00

Going forward I will discuss the power behind XPath and XQuery embedded in MarkLogic.

3 comments:

  1. Hi Jimmy, I wonder if you've had a chance to look at the new Java API in MarkLogic 6? I wrote a tutorial on it here: http://developer.marklogic.com/learn/java

    It also integrates JAXB. I'd love to hear your thoughts on how using the new Java API would compare with the approach you took using XCC (before the new API was available). I look forward to reading more of your thoughts and insights about MarkLogic. I'm subscribed. :-)

  2. Actually no, but I am because I can no longer get the XCC maven setup to work.
