Friday, July 10, 2009

A Camel based XML Payload HTTP polling provider

Wow, what a mouthful of a title that is.

The EAI Polling Consumer pattern is well documented. Polling consumers are particularly useful for HTTP clients such as AJAX applications. Their presence provides a means of implementing publish and subscribe.

What I needed was some code to service my AJAX consumer; a Polling Provider so to speak.

Given my immersion in Apache Camel I came up with a Polling Provider for XML objects provided by XMLBeans. The provider assumes that an HTTP endpoint is to be interacted with and thus sets the HTTP time headers appropriately.

The Polling Provider assumes that the XML object is updated by one thread and consumed by another. It also retains the time at which the XML object was last updated. This allows HTTP consumers to send the HTTP If-Modified-Since header and have the request block if the condition is not met.

To update the provider (fidsDocumentPollingProvider is an instance of it and newFidsDocument is an XML document object):


// Hand the provider the new document together with its modification time;
// update() stores both and signals a request blocked in process().
fidsDocumentPollingProvider.update(
newFidsDocument, new Date());


To retrieve the resource in a RESTful manner:


// Provide a RESTful service to retrieve FIDS data: listen for HTTP requests
// on port 9000 via the Jetty component and pass each one on to the
// "direct:getFIDS" endpoint as an in-out (request/reply) exchange.
from("jetty:http://0.0.0.0:9000/FIDSService/FIDS")
.inOut("direct:getFIDS");

// Provide a component to process FIDS requests: the polling provider is the
// Processor that fills in the reply for each request.
from("direct:getFIDS")
.process(fidsDocumentPollingProvider);


Here is the code I came up with for the provider. I would be very interested to hear of better ways to do this given Apache Camel.


package com.classactionpl.camel.xmlbeans;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import java.util.TimeZone;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.component.http.HttpProducer;
import org.apache.xmlbeans.XmlObject;

/**
 * A Camel {@link Processor} that serves the most recently published XML
 * document to HTTP polling consumers.
 *
 * <p>
 * One thread publishes documents through {@link #update(XmlObject, Date)};
 * any number of request threads retrieve them through
 * {@link #process(Exchange)}. A request carrying an
 * <code>If-Modified-Since</code> header that is not older than the current
 * document blocks (for at most {@value #MAX_WAIT_SECONDS} seconds) until a
 * newer document is published. A request without that header, or with an
 * unparseable one, is answered with the current document straight away.
 *
 * <p>
 * All access to the document and its timestamp is guarded by {@link #lock}.
 */
public class XMLHttpPayloadPollingProvider implements Processor {
    /** Pattern used to parse the incoming If-Modified-Since header. */
    private static final String IF_MODIFIED_SINCE_PATTERN = "EEE', 'dd' 'MMM' 'yyyy' 'HH:mm:ss' 'Z";

    /** Pattern used to render the outgoing Last-Modified header (in GMT). */
    private static final String LAST_MODIFIED_PATTERN = "EEE', 'dd' 'MMM' 'yyyy' 'HH:mm:ss' GMT'";

    /** Longest time, in seconds, a request is held waiting for an update. */
    private static final long MAX_WAIT_SECONDS = 90;

    /** The current document, or null if nothing has been published yet. */
    XmlObject xmlObject;

    /** Modification time of the current document, truncated to seconds. */
    Date lastModified;

    /** Guards xmlObject and lastModified. */
    Lock lock = new ReentrantLock();

    /** Signalled every time a new document is published. */
    Condition update = lock.newCondition();

    /**
     * Publishes a new document and wakes every request waiting for one.
     *
     * @param xmlObject
     *            the document to serve from now on
     * @param lastModified
     *            the time the document was modified; must not be null
     * @throws NullPointerException
     *             if lastModified is null (the previous state is kept)
     */
    public void update(XmlObject xmlObject, Date lastModified) {
        // We don't need the milliseconds and they can upset our comparisons
        // given that they are not passed from the outside world. Computed
        // before touching any field so that a null argument cannot leave the
        // provider holding a document without a timestamp.
        Date truncated = new Date((lastModified.getTime() / 1000) * 1000);

        lock.lock();
        try {
            this.xmlObject = xmlObject;
            this.lastModified = truncated;
            // Several consumers may be polling at once; all of them need to
            // see the new document, not just one.
            update.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Answers an HTTP request for the document.
     *
     * <ul>
     * <li>200 with the document as body, <code>Content-Type: text/xml</code>
     * and a <code>Last-Modified</code> header when the request is
     * unconditional or the document is newer than
     * <code>If-Modified-Since</code>;</li>
     * <li>304 with a <code>Last-Modified</code> header when no newer document
     * appeared within the wait period;</li>
     * <li>404 when no document is available.</li>
     * </ul>
     *
     * @param exchange
     *            the exchange whose in message carries the request headers
     *            and whose out message receives the response
     * @throws Exception
     *             if the thread is interrupted while waiting or the document
     *             cannot be streamed
     */
    public void process(Exchange exchange) throws Exception {
        // Parsing does not touch shared state, so do it before locking.
        Date ifModifiedSince = parseIfModifiedSince(exchange.getIn()
                .getHeader("If-Modified-Since", String.class));

        lock.lock();
        try {
            // Loop on the condition: await may return spuriously, or for an
            // update that carries the same (second-granularity) timestamp
            // the client already has.
            long remainingNanos = TimeUnit.SECONDS.toNanos(MAX_WAIT_SECONDS);
            while (mustWaitForUpdate(ifModifiedSince) && remainingNanos > 0) {
                remainingNanos = update.awaitNanos(remainingNanos);
            }

            if (xmlObject == null) {
                exchange.getOut().setHeader(HttpProducer.HTTP_RESPONSE_CODE,
                        404);
                return;
            }

            boolean modified = ifModifiedSince == null
                    || lastModified == null
                    || lastModified.compareTo(ifModifiedSince) > 0;
            if (modified) {
                exchange.getOut().setBody(xmlObject.newInputStream());
                exchange.getOut().setHeader("Content-Type", "text/xml");
            } else {
                exchange.getOut().setHeader(HttpProducer.HTTP_RESPONSE_CODE,
                        304);
            }

            if (lastModified != null) {
                // SimpleDateFormat is not thread-safe, hence one per call.
                SimpleDateFormat rfc822DateFormat = new SimpleDateFormat(
                        LAST_MODIFIED_PATTERN, Locale.US);
                rfc822DateFormat.setTimeZone(TimeZone.getTimeZone("GMT"));
                exchange.getOut().setHeader("Last-Modified",
                        rfc822DateFormat.format(lastModified));
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Tells whether the caller has to keep waiting for a publication. Must
     * be called with the lock held.
     */
    private boolean mustWaitForUpdate(Date ifModifiedSince) {
        if (lastModified == null) {
            // Nothing has been published yet.
            return true;
        }
        // An unconditional request is satisfied by whatever we hold now.
        return ifModifiedSince != null
                && lastModified.compareTo(ifModifiedSince) <= 0;
    }

    /**
     * Parses an If-Modified-Since header value, returning null when the
     * header is absent or is not a valid date.
     */
    private static Date parseIfModifiedSince(String headerValue) {
        if (headerValue == null) {
            return null;
        }
        try {
            return new SimpleDateFormat(IF_MODIFIED_SINCE_PATTERN, Locale.US)
                    .parse(headerValue);
        } catch (ParseException ignored) {
            // HTTP requires an invalid If-Modified-Since value to be
            // ignored, i.e. the request is handled as unconditional.
            return null;
        }
    }
}

No comments: