diff --git a/broker/appengine/.classpath b/broker/appengine/.classpath
index cdc386254..5d8e8539b 100644
--- a/broker/appengine/.classpath
+++ b/broker/appengine/.classpath
@@ -3,12 +3,13 @@
-
+
+
diff --git a/broker/appengine/.settings/com.google.appengine.eclipse.core.prefs b/broker/appengine/.settings/com.google.appengine.eclipse.core.prefs
index b727dd4d9..88d528198 100644
--- a/broker/appengine/.settings/com.google.appengine.eclipse.core.prefs
+++ b/broker/appengine/.settings/com.google.appengine.eclipse.core.prefs
@@ -1,3 +1,3 @@
-#Wed May 26 15:13:15 PDT 2010
+#Tue Aug 03 15:54:32 PDT 2010
eclipse.preferences.version=1
-validationExclusions=src/com/metaweb/gridworks/appengine/*ClientConnection*.java
+validationExclusions=src/com/google/gridworks/appengine/*ClientConnection*.java
diff --git a/broker/appengine/module/MOD-INF/module.properties b/broker/appengine/module/MOD-INF/module.properties
index 84644d4ae..cf4498b76 100644
--- a/broker/appengine/module/MOD-INF/module.properties
+++ b/broker/appengine/module/MOD-INF/module.properties
@@ -1,4 +1,4 @@
name = broker
description = Google App Engine implementation of Gridworks Broker
-module-impl = com.metaweb.gridworks.broker.AppEngineGridworksBrokerImpl
+module-impl = com.google.gridworks.broker.AppEngineGridworksBrokerImpl
templating = false
diff --git a/broker/appengine/src/com/google/gridworks/appengine/AppEngineClientConnection.java b/broker/appengine/src/com/google/gridworks/appengine/AppEngineClientConnection.java
new file mode 100644
index 000000000..70012f94e
--- /dev/null
+++ b/broker/appengine/src/com/google/gridworks/appengine/AppEngineClientConnection.java
@@ -0,0 +1,243 @@
+package com.google.gridworks.appengine;
+
+import static com.google.appengine.api.urlfetch.FetchOptions.Builder.allowTruncate;
+
+import java.io.ByteArrayOutputStream;
+import java.net.InetAddress;
+import java.net.URL;
+import java.util.concurrent.TimeUnit;
+
+import javax.net.ssl.SSLSession;
+
+import org.apache.http.Header;
+import org.apache.http.HttpConnectionMetrics;
+import org.apache.http.HttpHost;
+import org.apache.http.HttpResponse;
+import org.apache.http.ProtocolVersion;
+import org.apache.http.conn.ManagedClientConnection;
+import org.apache.http.conn.routing.HttpRoute;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.message.BasicHttpResponse;
+import org.apache.http.params.HttpParams;
+import org.apache.http.protocol.HttpContext;
+
+import com.google.appengine.api.urlfetch.HTTPHeader;
+import com.google.appengine.api.urlfetch.HTTPMethod;
+import com.google.appengine.api.urlfetch.HTTPRequest;
+import com.google.appengine.api.urlfetch.HTTPResponse;
+import com.google.appengine.api.urlfetch.URLFetchService;
+import com.google.appengine.api.urlfetch.URLFetchServiceFactory;
+
+class AppEngineClientConnection implements ManagedClientConnection {
+ // Managed is the composition of ConnectionReleaseTrigger,
+ // HttpClientConnection, HttpConnection, HttpInetConnection
+
+ private HttpRoute _route;
+ private Object _state;
+ private boolean _reuseable;
+
+ public AppEngineClientConnection(HttpRoute route, Object state) {
+ _route = route;
+ _state = state;
+ }
+
+ // ManagedClientConnection methods
+
+ public HttpRoute getRoute() {
+ return _route;
+ }
+
+ public Object getState() {
+ return _state;
+ }
+
+ public SSLSession getSSLSession() {
+ return null;
+ }
+
+ public boolean isSecure() {
+ // XXX maybe parse the url to see if it's https?
+ return false;
+ }
+
+ public boolean isMarkedReusable() {
+ return _reuseable;
+ }
+
+ public void markReusable() {
+ _reuseable = true;
+ }
+
+ public void layerProtocol(HttpContext context, HttpParams params) {
+ return;
+ }
+
+ public void open(HttpRoute route, HttpContext context, HttpParams params) {
+ return;
+ }
+
+ public void setIdleDuration(long duration, TimeUnit unit) {
+ return;
+ }
+
+ public void setState(Object state) {
+ _state = state;
+ }
+
+ public void tunnelProxy(HttpHost next, boolean secure, HttpParams params) {
+ return;
+ }
+
+ public void tunnelTarget(boolean secure, HttpParams params) {
+ return;
+ }
+
+ public void unmarkReusable() {
+ _reuseable = false;
+ }
+
+
+ // ConnectionReleaseTrigger methods
+
+ public void releaseConnection() {
+ return;
+ }
+
+ public void abortConnection() {
+ return;
+ }
+
+ // HttpClientConnection methods
+
+ private HTTPRequest _appengine_hrequest;
+ private HTTPResponse _appengine_hresponse;
+
+ public void flush() {
+ return;
+ }
+
+ public boolean isResponseAvailable(int timeout) {
+ // XXX possibly use Async fetcher
+ return true;
+ }
+
+ public void receiveResponseEntity(org.apache.http.HttpResponse apache_response) {
+ byte[] data = _appengine_hresponse.getContent();
+
+ if (data != null) {
+ apache_response.setEntity(new ByteArrayEntity(data));
+ }
+ }
+
+ public HttpResponse receiveResponseHeader() {
+ URLFetchService ufs = URLFetchServiceFactory.getURLFetchService();
+ try {
+ _appengine_hresponse = ufs.fetch(_appengine_hrequest);
+ } catch (java.io.IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ org.apache.http.HttpResponse apache_response =
+ new BasicHttpResponse(new ProtocolVersion("HTTP", 1, 0),
+ _appengine_hresponse.getResponseCode(),
+ null);
+
+ for (HTTPHeader h : _appengine_hresponse.getHeaders()) {
+ apache_response.addHeader(h.getName(), h.getValue());
+ }
+
+ return apache_response;
+ }
+
+ public void sendRequestEntity(org.apache.http.HttpEntityEnclosingRequest request) {
+ ByteArrayOutputStream os = new ByteArrayOutputStream();
+
+ org.apache.http.HttpEntity ent = request.getEntity();
+ if (ent != null) {
+ try {
+ ent.writeTo(os);
+ } catch (java.io.IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ _appengine_hrequest.setPayload(os.toByteArray());
+ }
+
+ public void sendRequestHeader(org.apache.http.HttpRequest apache_request) {
+ URL request_url;
+
+ HttpHost host = _route.getTargetHost();
+
+ String protocol = host.getSchemeName();
+ String addr = host.getHostName();
+ int port = host.getPort();
+
+ String path = apache_request.getRequestLine().getUri();
+
+ try {
+ request_url = new URL(protocol, addr, port, path);
+ } catch (java.net.MalformedURLException e) {
+ throw new RuntimeException(e);
+ }
+
+ HTTPMethod method = HTTPMethod.valueOf(apache_request.getRequestLine().getMethod());
+ _appengine_hrequest = new HTTPRequest(request_url, method, allowTruncate()
+ .doNotFollowRedirects());
+
+ Header[] apache_headers = apache_request.getAllHeaders();
+ for (int i = 0; i < apache_headers.length; i++) {
+ Header h = apache_headers[i];
+ _appengine_hrequest
+ .setHeader(new HTTPHeader(h.getName(), h.getValue()));
+ }
+ }
+
+ // HttpConnection methods
+
+ public void close() {
+ return;
+ }
+
+ public HttpConnectionMetrics getMetrics() {
+ return null;
+ }
+
+ public int getSocketTimeout() {
+ return -1;
+ }
+
+ public boolean isOpen() {
+ return true;
+ }
+
+ public boolean isStale() {
+ return false;
+ }
+
+ public void setSocketTimeout(int timeout) {
+ return;
+ }
+
+ public void shutdown() {
+ return;
+ }
+
+ // HttpInetConnection methods
+
+ public InetAddress getLocalAddress() {
+ return null;
+ }
+
+ public int getLocalPort() {
+ return -1;
+ }
+
+ public InetAddress getRemoteAddress() {
+ return null;
+ }
+
+ public int getRemotePort() {
+ return -1;
+ }
+}
\ No newline at end of file
diff --git a/broker/appengine/src/com/google/gridworks/appengine/AppEngineClientConnectionManager.java b/broker/appengine/src/com/google/gridworks/appengine/AppEngineClientConnectionManager.java
new file mode 100644
index 000000000..50da0fd8c
--- /dev/null
+++ b/broker/appengine/src/com/google/gridworks/appengine/AppEngineClientConnectionManager.java
@@ -0,0 +1,76 @@
+package com.google.gridworks.appengine;
+
+import java.net.InetAddress;
+import java.net.Socket;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.http.conn.ClientConnectionManager;
+import org.apache.http.conn.ClientConnectionRequest;
+import org.apache.http.conn.ManagedClientConnection;
+import org.apache.http.conn.routing.HttpRoute;
+import org.apache.http.conn.scheme.Scheme;
+import org.apache.http.conn.scheme.SchemeRegistry;
+import org.apache.http.conn.scheme.SocketFactory;
+import org.apache.http.params.HttpParams;
+
+public class AppEngineClientConnectionManager implements ClientConnectionManager {
+
+ private SchemeRegistry schemes;
+
+ class NoopSocketFactory implements SocketFactory {
+ public Socket connectSocket(Socket sock, String host, int port, InetAddress addr, int lport, HttpParams params) {
+ return null;
+ }
+
+ public Socket createSocket() {
+ return null;
+ }
+
+ public boolean isSecure(Socket sock) {
+ return false;
+ }
+ }
+
+ public AppEngineClientConnectionManager() {
+ SocketFactory noop_sf = new NoopSocketFactory();
+ schemes = new SchemeRegistry();
+ schemes.register(new Scheme("http", noop_sf, 80));
+ schemes.register(new Scheme("https", noop_sf, 443));
+ }
+
+ public void closeExpiredConnections() {
+ return;
+ }
+
+ public void closeIdleConnections(long idletime, TimeUnit tunit) {
+ return;
+ }
+
+ public ManagedClientConnection getConnection(HttpRoute route, Object state) {
+ return new AppEngineClientConnection(route, state);
+ }
+
+ public SchemeRegistry getSchemeRegistry() {
+ return schemes;
+ }
+
+ public void releaseConnection(ManagedClientConnection conn, long valid, TimeUnit tuint) {
+ return;
+ }
+
+ public ClientConnectionRequest requestConnection(final HttpRoute route, final Object state) {
+ return new ClientConnectionRequest() {
+ public void abortRequest() {
+ return;
+ }
+
+ public ManagedClientConnection getConnection(long idletime, TimeUnit tunit) {
+ return AppEngineClientConnectionManager.this.getConnection(route, state);
+ }
+ };
+ }
+
+ public void shutdown() {
+ return;
+ }
+}
diff --git a/broker/appengine/src/com/google/gridworks/broker/AppEngineGridworksBrokerImpl.java b/broker/appengine/src/com/google/gridworks/broker/AppEngineGridworksBrokerImpl.java
new file mode 100644
index 000000000..205e7a49a
--- /dev/null
+++ b/broker/appengine/src/com/google/gridworks/broker/AppEngineGridworksBrokerImpl.java
@@ -0,0 +1,365 @@
+package com.google.gridworks.broker;
+
+import java.io.Writer;
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.jdo.Extent;
+import javax.jdo.JDOHelper;
+import javax.jdo.PersistenceManager;
+import javax.jdo.PersistenceManagerFactory;
+import javax.jdo.Transaction;
+import javax.jdo.annotations.IdGeneratorStrategy;
+import javax.jdo.annotations.PersistenceCapable;
+import javax.jdo.annotations.Persistent;
+import javax.jdo.annotations.PrimaryKey;
+import javax.servlet.ServletConfig;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.http.client.HttpClient;
+import org.apache.http.conn.ClientConnectionManager;
+import org.apache.http.impl.client.DefaultHttpClient;
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.json.JSONWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.appengine.api.datastore.Text;
+import com.google.gridworks.appengine.AppEngineClientConnectionManager;
+import com.google.gridworks.broker.GridworksBroker;
+
+public class AppEngineGridworksBrokerImpl extends GridworksBroker {
+
+ protected static final Logger logger = LoggerFactory.getLogger("gridworks.broker.appengine");
+
+ PersistenceManagerFactory pmfInstance;
+
+ @Override
+ public void init(ServletConfig config) throws Exception {
+ super.init(config);
+
+ pmfInstance = JDOHelper.getPersistenceManagerFactory("transactional");
+ }
+
+ @Override
+ public void destroy() throws Exception {
+ }
+
+ // ---------------------------------------------------------------------------------
+
+ protected HttpClient getHttpClient() {
+ ClientConnectionManager cm = new AppEngineClientConnectionManager();
+ return new DefaultHttpClient(cm, null);
+ }
+
+ // ---------------------------------------------------------------------------------
+
+ protected void expireLocks(HttpServletResponse response) throws Exception {
+
+ PersistenceManager pm = pmfInstance.getPersistenceManager();
+
+ try {
+ Extent extent = pm.getExtent(Lock.class, false);
+
+ try {
+ for (Lock lock : extent) {
+ if (lock.timestamp + LOCK_DURATION < System.currentTimeMillis()) {
+ Transaction tx = pm.currentTransaction();
+ try {
+ tx.begin();
+ pm.deletePersistent(lock);
+ tx.commit();
+ } finally {
+ if (tx.isActive()) {
+ tx.rollback();
+ }
+ }
+ }
+ }
+ } finally {
+ extent.closeAll();
+ }
+
+ respond(response, OK);
+
+ } finally {
+ pm.close();
+ }
+ }
+
+ protected void getState(HttpServletResponse response, String pid, String rev) throws Exception {
+ PersistenceManager pm = pmfInstance.getPersistenceManager();
+
+ try {
+ // FIXME
+ respond(response, lockToJSON(getLock(pm,pid)));
+ } finally {
+ pm.close();
+ }
+ }
+
+ protected void obtainLock(HttpServletResponse response, String pid, String uid, String type) throws Exception {
+ PersistenceManager pm = pmfInstance.getPersistenceManager();
+
+ // FIXME (use type)
+
+ try {
+ Lock lock = getLock(pm, pid);
+ if (lock == null) {
+ Transaction tx = pm.currentTransaction();
+
+ try {
+ tx.begin();
+ lock = new Lock(Long.toHexString(tx.hashCode()), pid, uid);
+ pm.makePersistent(lock);
+ tx.commit();
+ } finally {
+ if (tx.isActive()) {
+ tx.rollback();
+ }
+ }
+ }
+
+ respond(response, lockToJSON(lock));
+
+ } finally {
+ pm.close();
+ }
+ }
+
+ protected void releaseLock(HttpServletResponse response, String pid, String uid, String lid) throws Exception {
+
+ PersistenceManager pm = pmfInstance.getPersistenceManager();
+
+ try {
+ Lock lock = getLock(pm, pid);
+ if (lock != null) {
+ if (!lock.id.equals(lid)) {
+ throw new RuntimeException("Lock id doesn't match, can't release the lock");
+ }
+ if (!lock.uid.equals(uid)) {
+ throw new RuntimeException("User id doesn't match the lock owner, can't release the lock");
+ }
+
+ Transaction tx = pm.currentTransaction();
+
+ try {
+ tx.begin();
+ pm.deletePersistent(lock);
+ tx.commit();
+ } finally {
+ if (tx.isActive()) {
+ tx.rollback();
+ }
+ }
+ }
+
+ respond(response, OK);
+
+ } finally {
+ pm.close();
+ }
+ }
+
+ // ----------------------------------------------------------------------------------------------------
+
+ protected void startProject(HttpServletResponse response, String pid, String uid, String lid, String data) throws Exception {
+ PersistenceManager pm = pmfInstance.getPersistenceManager();
+
+ try {
+ checkLock(pm, pid, uid, lid);
+
+ Project project = getProject(pm, pid);
+
+ if (project != null) {
+ throw new RuntimeException("Project '" + pid + "' already exists");
+ }
+
+ Transaction tx = pm.currentTransaction();
+
+ try {
+ tx.begin();
+ project = new Project(pid, data);
+ pm.makePersistent(project);
+ tx.commit();
+ } finally {
+ if (tx.isActive()) {
+ tx.rollback();
+ }
+ }
+
+ respond(response, OK);
+ } finally {
+ pm.close();
+ }
+ }
+
+ protected void addTransformations(HttpServletResponse response, String pid, String uid, String lid, List transformations) throws Exception {
+ PersistenceManager pm = pmfInstance.getPersistenceManager();
+
+ try {
+ checkLock(pm, pid, uid, lid);
+
+ Project project = getProject(pm, pid);
+
+ if (project == null) {
+ throw new RuntimeException("Project '" + pid + "' not found");
+ }
+
+ Transaction tx = pm.currentTransaction();
+
+ try {
+ for (String s : transformations) {
+ project.transformations.add(new Text(s));
+ }
+ tx.commit();
+ } finally {
+ if (tx.isActive()) {
+ tx.rollback();
+ }
+ }
+
+ respond(response, OK);
+ } finally {
+ pm.close();
+ }
+ }
+
+ // ---------------------------------------------------------------------------------
+
+ protected void openProject(HttpServletResponse response, String pid) throws Exception {
+ PersistenceManager pm = pmfInstance.getPersistenceManager();
+
+ try {
+ Project project = getProject(pm, pid);
+
+ Writer w = response.getWriter();
+ JSONWriter writer = new JSONWriter(w);
+ writer.object();
+ writer.key("data"); writer.value(project.data.toString());
+ writer.key("transformations");
+ writer.array();
+ for (Text s : project.transformations) {
+ writer.value(s.toString());
+ }
+ writer.endArray();
+ writer.endObject();
+ w.flush();
+ w.close();
+ } finally {
+ pm.close();
+ }
+ }
+
+ protected void getHistory(HttpServletResponse response, String pid, int tindex) throws Exception {
+ PersistenceManager pm = pmfInstance.getPersistenceManager();
+
+ try {
+ Project project = getProject(pm, pid);
+
+ Writer w = response.getWriter();
+ JSONWriter writer = new JSONWriter(w);
+ writer.object();
+ writer.key("transformations");
+ writer.array();
+ int size = project.transformations.size();
+ for (int i = tindex; i < size; i++) {
+ writer.value(project.transformations.get(i).toString());
+ }
+ writer.endArray();
+ writer.endObject();
+ w.flush();
+ w.close();
+ } finally {
+ pm.close();
+ }
+ }
+
+ // ---------------------------------------------------------------------------------
+
+ Project getProject(PersistenceManager pm, String pid) {
+ Project project = pm.getObjectById(Project.class, pid);
+ if (project == null) {
+ throw new RuntimeException("Project '" + pid + "' is not managed by this broker");
+ }
+ return project;
+ }
+
+ @PersistenceCapable
+ static class Project {
+
+ @PrimaryKey
+ @Persistent(valueStrategy = IdGeneratorStrategy.IDENTITY)
+ String pid;
+
+ @Persistent
+ List transformations = new ArrayList();
+
+ @Persistent
+ Text data;
+
+ Project(String pid, String data) {
+ this.pid = pid;
+ this.data = new Text(data);
+ }
+ }
+
+ // ---------------------------------------------------------------------------------
+
+ Lock getLock(PersistenceManager pm, String pid) {
+ return pm.getObjectById(Lock.class, pid);
+ }
+
+ void checkLock(PersistenceManager pm, String pid, String uid, String lid) {
+ Lock lock = getLock(pm, pid);
+
+ if (lock == null) {
+ throw new RuntimeException("No lock was found with the given Lock id '" + lid + "', you have to have a valid lock on a project in order to start it");
+ }
+
+ if (!lock.pid.equals(pid)) {
+ throw new RuntimeException("Lock '" + lid + "' is for another project: " + pid);
+ }
+
+ if (!lock.uid.equals(uid)) {
+ throw new RuntimeException("Lock '" + lid + "' is owned by another user: " + uid);
+ }
+ }
+
+ JSONObject lockToJSON(Lock lock) throws JSONException {
+ JSONObject o = new JSONObject();
+ if (lock != null) {
+ o.put("lock_id", lock.id);
+ o.put("project_id", lock.pid);
+ o.put("user_id", lock.uid);
+ o.put("timestamp", lock.timestamp);
+ }
+ return o;
+ }
+
+ @PersistenceCapable
+ static class Lock {
+
+ @Persistent
+ String id;
+
+ @PrimaryKey
+ @Persistent(valueStrategy = IdGeneratorStrategy.IDENTITY)
+ String pid;
+
+ @Persistent
+ String uid;
+
+ @Persistent
+ long timestamp;
+
+ Lock(String id, String pid, String uid) {
+ this.id = id;
+ this.pid = pid;
+ this.uid = uid;
+ this.timestamp = System.currentTimeMillis();
+ }
+ }
+
+}
diff --git a/broker/appengine/src/com/metaweb/gridworks/appengine/AppEngineClientConnection.java b/broker/appengine/src/com/metaweb/gridworks/appengine/AppEngineClientConnection.java
deleted file mode 100644
index 9f0effa08..000000000
--- a/broker/appengine/src/com/metaweb/gridworks/appengine/AppEngineClientConnection.java
+++ /dev/null
@@ -1,243 +0,0 @@
-package com.metaweb.gridworks.appengine;
-
-import static com.google.appengine.api.urlfetch.FetchOptions.Builder.allowTruncate;
-
-import java.io.ByteArrayOutputStream;
-import java.net.InetAddress;
-import java.net.URL;
-import java.util.concurrent.TimeUnit;
-
-import javax.net.ssl.SSLSession;
-
-import org.apache.http.Header;
-import org.apache.http.HttpConnectionMetrics;
-import org.apache.http.HttpHost;
-import org.apache.http.HttpResponse;
-import org.apache.http.ProtocolVersion;
-import org.apache.http.conn.ManagedClientConnection;
-import org.apache.http.conn.routing.HttpRoute;
-import org.apache.http.entity.ByteArrayEntity;
-import org.apache.http.message.BasicHttpResponse;
-import org.apache.http.params.HttpParams;
-import org.apache.http.protocol.HttpContext;
-
-import com.google.appengine.api.urlfetch.HTTPHeader;
-import com.google.appengine.api.urlfetch.HTTPMethod;
-import com.google.appengine.api.urlfetch.HTTPRequest;
-import com.google.appengine.api.urlfetch.HTTPResponse;
-import com.google.appengine.api.urlfetch.URLFetchService;
-import com.google.appengine.api.urlfetch.URLFetchServiceFactory;
-
-class AppEngineClientConnection implements ManagedClientConnection {
- // Managed is the composition of ConnectionReleaseTrigger,
- // HttpClientConnection, HttpConnection, HttpInetConnection
-
- private HttpRoute _route;
- private Object _state;
- private boolean _reuseable;
-
- public AppEngineClientConnection(HttpRoute route, Object state) {
- _route = route;
- _state = state;
- }
-
- // ManagedClientConnection methods
-
- public HttpRoute getRoute() {
- return _route;
- }
-
- public Object getState() {
- return _state;
- }
-
- public SSLSession getSSLSession() {
- return null;
- }
-
- public boolean isSecure() {
- // XXX maybe parse the url to see if it's https?
- return false;
- }
-
- public boolean isMarkedReusable() {
- return _reuseable;
- }
-
- public void markReusable() {
- _reuseable = true;
- }
-
- public void layerProtocol(HttpContext context, HttpParams params) {
- return;
- }
-
- public void open(HttpRoute route, HttpContext context, HttpParams params) {
- return;
- }
-
- public void setIdleDuration(long duration, TimeUnit unit) {
- return;
- }
-
- public void setState(Object state) {
- _state = state;
- }
-
- public void tunnelProxy(HttpHost next, boolean secure, HttpParams params) {
- return;
- }
-
- public void tunnelTarget(boolean secure, HttpParams params) {
- return;
- }
-
- public void unmarkReusable() {
- _reuseable = false;
- }
-
-
- // ConnectionReleaseTrigger methods
-
- public void releaseConnection() {
- return;
- }
-
- public void abortConnection() {
- return;
- }
-
- // HttpClientConnection methods
-
- private HTTPRequest _appengine_hrequest;
- private HTTPResponse _appengine_hresponse;
-
- public void flush() {
- return;
- }
-
- public boolean isResponseAvailable(int timeout) {
- // XXX possibly use Async fetcher
- return true;
- }
-
- public void receiveResponseEntity(org.apache.http.HttpResponse apache_response) {
- byte[] data = _appengine_hresponse.getContent();
-
- if (data != null) {
- apache_response.setEntity(new ByteArrayEntity(data));
- }
- }
-
- public HttpResponse receiveResponseHeader() {
- URLFetchService ufs = URLFetchServiceFactory.getURLFetchService();
- try {
- _appengine_hresponse = ufs.fetch(_appengine_hrequest);
- } catch (java.io.IOException e) {
- throw new RuntimeException(e);
- }
-
- org.apache.http.HttpResponse apache_response =
- new BasicHttpResponse(new ProtocolVersion("HTTP", 1, 0),
- _appengine_hresponse.getResponseCode(),
- null);
-
- for (HTTPHeader h : _appengine_hresponse.getHeaders()) {
- apache_response.addHeader(h.getName(), h.getValue());
- }
-
- return apache_response;
- }
-
- public void sendRequestEntity(org.apache.http.HttpEntityEnclosingRequest request) {
- ByteArrayOutputStream os = new ByteArrayOutputStream();
-
- org.apache.http.HttpEntity ent = request.getEntity();
- if (ent != null) {
- try {
- ent.writeTo(os);
- } catch (java.io.IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- _appengine_hrequest.setPayload(os.toByteArray());
- }
-
- public void sendRequestHeader(org.apache.http.HttpRequest apache_request) {
- URL request_url;
-
- HttpHost host = _route.getTargetHost();
-
- String protocol = host.getSchemeName();
- String addr = host.getHostName();
- int port = host.getPort();
-
- String path = apache_request.getRequestLine().getUri();
-
- try {
- request_url = new URL(protocol, addr, port, path);
- } catch (java.net.MalformedURLException e) {
- throw new RuntimeException(e);
- }
-
- HTTPMethod method = HTTPMethod.valueOf(apache_request.getRequestLine().getMethod());
- _appengine_hrequest = new HTTPRequest(request_url, method, allowTruncate()
- .doNotFollowRedirects());
-
- Header[] apache_headers = apache_request.getAllHeaders();
- for (int i = 0; i < apache_headers.length; i++) {
- Header h = apache_headers[i];
- _appengine_hrequest
- .setHeader(new HTTPHeader(h.getName(), h.getValue()));
- }
- }
-
- // HttpConnection methods
-
- public void close() {
- return;
- }
-
- public HttpConnectionMetrics getMetrics() {
- return null;
- }
-
- public int getSocketTimeout() {
- return -1;
- }
-
- public boolean isOpen() {
- return true;
- }
-
- public boolean isStale() {
- return false;
- }
-
- public void setSocketTimeout(int timeout) {
- return;
- }
-
- public void shutdown() {
- return;
- }
-
- // HttpInetConnection methods
-
- public InetAddress getLocalAddress() {
- return null;
- }
-
- public int getLocalPort() {
- return -1;
- }
-
- public InetAddress getRemoteAddress() {
- return null;
- }
-
- public int getRemotePort() {
- return -1;
- }
-}
\ No newline at end of file
diff --git a/broker/appengine/src/com/metaweb/gridworks/appengine/AppEngineClientConnectionManager.java b/broker/appengine/src/com/metaweb/gridworks/appengine/AppEngineClientConnectionManager.java
deleted file mode 100644
index c0a9f7225..000000000
--- a/broker/appengine/src/com/metaweb/gridworks/appengine/AppEngineClientConnectionManager.java
+++ /dev/null
@@ -1,76 +0,0 @@
-package com.metaweb.gridworks.appengine;
-
-import java.net.InetAddress;
-import java.net.Socket;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.http.conn.ClientConnectionManager;
-import org.apache.http.conn.ClientConnectionRequest;
-import org.apache.http.conn.ManagedClientConnection;
-import org.apache.http.conn.routing.HttpRoute;
-import org.apache.http.conn.scheme.Scheme;
-import org.apache.http.conn.scheme.SchemeRegistry;
-import org.apache.http.conn.scheme.SocketFactory;
-import org.apache.http.params.HttpParams;
-
-public class AppEngineClientConnectionManager implements ClientConnectionManager {
-
- private SchemeRegistry schemes;
-
- class NoopSocketFactory implements SocketFactory {
- public Socket connectSocket(Socket sock, String host, int port, InetAddress addr, int lport, HttpParams params) {
- return null;
- }
-
- public Socket createSocket() {
- return null;
- }
-
- public boolean isSecure(Socket sock) {
- return false;
- }
- }
-
- public AppEngineClientConnectionManager() {
- SocketFactory noop_sf = new NoopSocketFactory();
- schemes = new SchemeRegistry();
- schemes.register(new Scheme("http", noop_sf, 80));
- schemes.register(new Scheme("https", noop_sf, 443));
- }
-
- public void closeExpiredConnections() {
- return;
- }
-
- public void closeIdleConnections(long idletime, TimeUnit tunit) {
- return;
- }
-
- public ManagedClientConnection getConnection(HttpRoute route, Object state) {
- return new AppEngineClientConnection(route, state);
- }
-
- public SchemeRegistry getSchemeRegistry() {
- return schemes;
- }
-
- public void releaseConnection(ManagedClientConnection conn, long valid, TimeUnit tuint) {
- return;
- }
-
- public ClientConnectionRequest requestConnection(final HttpRoute route, final Object state) {
- return new ClientConnectionRequest() {
- public void abortRequest() {
- return;
- }
-
- public ManagedClientConnection getConnection(long idletime, TimeUnit tunit) {
- return AppEngineClientConnectionManager.this.getConnection(route, state);
- }
- };
- }
-
- public void shutdown() {
- return;
- }
-}
diff --git a/broker/appengine/src/com/metaweb/gridworks/broker/AppEngineGridworksBrokerImpl.java b/broker/appengine/src/com/metaweb/gridworks/broker/AppEngineGridworksBrokerImpl.java
deleted file mode 100644
index 6998776e3..000000000
--- a/broker/appengine/src/com/metaweb/gridworks/broker/AppEngineGridworksBrokerImpl.java
+++ /dev/null
@@ -1,364 +0,0 @@
-package com.metaweb.gridworks.broker;
-
-import java.io.Writer;
-import java.util.ArrayList;
-import java.util.List;
-
-import javax.jdo.Extent;
-import javax.jdo.JDOHelper;
-import javax.jdo.PersistenceManager;
-import javax.jdo.PersistenceManagerFactory;
-import javax.jdo.Transaction;
-import javax.jdo.annotations.IdGeneratorStrategy;
-import javax.jdo.annotations.PersistenceCapable;
-import javax.jdo.annotations.Persistent;
-import javax.jdo.annotations.PrimaryKey;
-import javax.servlet.ServletConfig;
-import javax.servlet.http.HttpServletResponse;
-
-import org.apache.http.client.HttpClient;
-import org.apache.http.conn.ClientConnectionManager;
-import org.apache.http.impl.client.DefaultHttpClient;
-import org.json.JSONException;
-import org.json.JSONObject;
-import org.json.JSONWriter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.appengine.api.datastore.Text;
-import com.metaweb.gridworks.appengine.AppEngineClientConnectionManager;
-
-public class AppEngineGridworksBrokerImpl extends GridworksBroker {
-
- protected static final Logger logger = LoggerFactory.getLogger("gridworks.broker.appengine");
-
- PersistenceManagerFactory pmfInstance;
-
- @Override
- public void init(ServletConfig config) throws Exception {
- super.init(config);
-
- pmfInstance = JDOHelper.getPersistenceManagerFactory("transactional");
- }
-
- @Override
- public void destroy() throws Exception {
- }
-
- // ---------------------------------------------------------------------------------
-
- protected HttpClient getHttpClient() {
- ClientConnectionManager cm = new AppEngineClientConnectionManager();
- return new DefaultHttpClient(cm, null);
- }
-
- // ---------------------------------------------------------------------------------
-
- protected void expireLocks(HttpServletResponse response) throws Exception {
-
- PersistenceManager pm = pmfInstance.getPersistenceManager();
-
- try {
- Extent extent = pm.getExtent(Lock.class, false);
-
- try {
- for (Lock lock : extent) {
- if (lock.timestamp + LOCK_DURATION < System.currentTimeMillis()) {
- Transaction tx = pm.currentTransaction();
- try {
- tx.begin();
- pm.deletePersistent(lock);
- tx.commit();
- } finally {
- if (tx.isActive()) {
- tx.rollback();
- }
- }
- }
- }
- } finally {
- extent.closeAll();
- }
-
- respond(response, OK);
-
- } finally {
- pm.close();
- }
- }
-
- protected void getState(HttpServletResponse response, String pid, String rev) throws Exception {
- PersistenceManager pm = pmfInstance.getPersistenceManager();
-
- try {
- // FIXME
- respond(response, lockToJSON(getLock(pm,pid)));
- } finally {
- pm.close();
- }
- }
-
- protected void obtainLock(HttpServletResponse response, String pid, String uid, String type) throws Exception {
- PersistenceManager pm = pmfInstance.getPersistenceManager();
-
- // FIXME (use type)
-
- try {
- Lock lock = getLock(pm, pid);
- if (lock == null) {
- Transaction tx = pm.currentTransaction();
-
- try {
- tx.begin();
- lock = new Lock(Long.toHexString(tx.hashCode()), pid, uid);
- pm.makePersistent(lock);
- tx.commit();
- } finally {
- if (tx.isActive()) {
- tx.rollback();
- }
- }
- }
-
- respond(response, lockToJSON(lock));
-
- } finally {
- pm.close();
- }
- }
-
- protected void releaseLock(HttpServletResponse response, String pid, String uid, String lid) throws Exception {
-
- PersistenceManager pm = pmfInstance.getPersistenceManager();
-
- try {
- Lock lock = getLock(pm, pid);
- if (lock != null) {
- if (!lock.id.equals(lid)) {
- throw new RuntimeException("Lock id doesn't match, can't release the lock");
- }
- if (!lock.uid.equals(uid)) {
- throw new RuntimeException("User id doesn't match the lock owner, can't release the lock");
- }
-
- Transaction tx = pm.currentTransaction();
-
- try {
- tx.begin();
- pm.deletePersistent(lock);
- tx.commit();
- } finally {
- if (tx.isActive()) {
- tx.rollback();
- }
- }
- }
-
- respond(response, OK);
-
- } finally {
- pm.close();
- }
- }
-
- // ----------------------------------------------------------------------------------------------------
-
- protected void startProject(HttpServletResponse response, String pid, String uid, String lid, String data) throws Exception {
- PersistenceManager pm = pmfInstance.getPersistenceManager();
-
- try {
- checkLock(pm, pid, uid, lid);
-
- Project project = getProject(pm, pid);
-
- if (project != null) {
- throw new RuntimeException("Project '" + pid + "' already exists");
- }
-
- Transaction tx = pm.currentTransaction();
-
- try {
- tx.begin();
- project = new Project(pid, data);
- pm.makePersistent(project);
- tx.commit();
- } finally {
- if (tx.isActive()) {
- tx.rollback();
- }
- }
-
- respond(response, OK);
- } finally {
- pm.close();
- }
- }
-
- protected void addTransformations(HttpServletResponse response, String pid, String uid, String lid, List transformations) throws Exception {
- PersistenceManager pm = pmfInstance.getPersistenceManager();
-
- try {
- checkLock(pm, pid, uid, lid);
-
- Project project = getProject(pm, pid);
-
- if (project == null) {
- throw new RuntimeException("Project '" + pid + "' not found");
- }
-
- Transaction tx = pm.currentTransaction();
-
- try {
- for (String s : transformations) {
- project.transformations.add(new Text(s));
- }
- tx.commit();
- } finally {
- if (tx.isActive()) {
- tx.rollback();
- }
- }
-
- respond(response, OK);
- } finally {
- pm.close();
- }
- }
-
- // ---------------------------------------------------------------------------------
-
- protected void openProject(HttpServletResponse response, String pid) throws Exception {
- PersistenceManager pm = pmfInstance.getPersistenceManager();
-
- try {
- Project project = getProject(pm, pid);
-
- Writer w = response.getWriter();
- JSONWriter writer = new JSONWriter(w);
- writer.object();
- writer.key("data"); writer.value(project.data.toString());
- writer.key("transformations");
- writer.array();
- for (Text s : project.transformations) {
- writer.value(s.toString());
- }
- writer.endArray();
- writer.endObject();
- w.flush();
- w.close();
- } finally {
- pm.close();
- }
- }
-
- protected void getHistory(HttpServletResponse response, String pid, int tindex) throws Exception {
- PersistenceManager pm = pmfInstance.getPersistenceManager();
-
- try {
- Project project = getProject(pm, pid);
-
- Writer w = response.getWriter();
- JSONWriter writer = new JSONWriter(w);
- writer.object();
- writer.key("transformations");
- writer.array();
- int size = project.transformations.size();
- for (int i = tindex; i < size; i++) {
- writer.value(project.transformations.get(i).toString());
- }
- writer.endArray();
- writer.endObject();
- w.flush();
- w.close();
- } finally {
- pm.close();
- }
- }
-
- // ---------------------------------------------------------------------------------
-
- Project getProject(PersistenceManager pm, String pid) {
- Project project = pm.getObjectById(Project.class, pid);
- if (project == null) {
- throw new RuntimeException("Project '" + pid + "' is not managed by this broker");
- }
- return project;
- }
-
- @PersistenceCapable
- static class Project {
-
- @PrimaryKey
- @Persistent(valueStrategy = IdGeneratorStrategy.IDENTITY)
- String pid;
-
- @Persistent
- List transformations = new ArrayList();
-
- @Persistent
- Text data;
-
- Project(String pid, String data) {
- this.pid = pid;
- this.data = new Text(data);
- }
- }
-
- // ---------------------------------------------------------------------------------
-
- Lock getLock(PersistenceManager pm, String pid) {
- return pm.getObjectById(Lock.class, pid);
- }
-
- void checkLock(PersistenceManager pm, String pid, String uid, String lid) {
- Lock lock = getLock(pm, pid);
-
- if (lock == null) {
- throw new RuntimeException("No lock was found with the given Lock id '" + lid + "', you have to have a valid lock on a project in order to start it");
- }
-
- if (!lock.pid.equals(pid)) {
- throw new RuntimeException("Lock '" + lid + "' is for another project: " + pid);
- }
-
- if (!lock.uid.equals(uid)) {
- throw new RuntimeException("Lock '" + lid + "' is owned by another user: " + uid);
- }
- }
-
- JSONObject lockToJSON(Lock lock) throws JSONException {
- JSONObject o = new JSONObject();
- if (lock != null) {
- o.put("lock_id", lock.id);
- o.put("project_id", lock.pid);
- o.put("user_id", lock.uid);
- o.put("timestamp", lock.timestamp);
- }
- return o;
- }
-
- @PersistenceCapable
- static class Lock {
-
- @Persistent
- String id;
-
- @PrimaryKey
- @Persistent(valueStrategy = IdGeneratorStrategy.IDENTITY)
- String pid;
-
- @Persistent
- String uid;
-
- @Persistent
- long timestamp;
-
- Lock(String id, String pid, String uid) {
- this.id = id;
- this.pid = pid;
- this.uid = uid;
- this.timestamp = System.currentTimeMillis();
- }
- }
-
-}
diff --git a/broker/core/.classpath b/broker/core/.classpath
index d24e060ab..a1243f2cc 100644
--- a/broker/core/.classpath
+++ b/broker/core/.classpath
@@ -14,10 +14,10 @@
-
+
diff --git a/broker/core/IDEs/eclipse/GridworksBrokerTests.launch b/broker/core/IDEs/eclipse/GridworksBrokerTests.launch
index 7a221be40..077db83a3 100644
--- a/broker/core/IDEs/eclipse/GridworksBrokerTests.launch
+++ b/broker/core/IDEs/eclipse/GridworksBrokerTests.launch
@@ -7,10 +7,10 @@
-
+
-
+
diff --git a/broker/core/module/MOD-INF/module.properties b/broker/core/module/MOD-INF/module.properties
index 0ae311031..a4778f16d 100644
--- a/broker/core/module/MOD-INF/module.properties
+++ b/broker/core/module/MOD-INF/module.properties
@@ -1,4 +1,4 @@
name = broker
description = Local Gridworks Broker
-module-impl = com.metaweb.gridworks.broker.GridworksBrokerImpl
+module-impl = com.google.gridworks.broker.GridworksBrokerImpl
templating = false
diff --git a/broker/core/src/com/google/gridworks/broker/GridworksBroker.java b/broker/core/src/com/google/gridworks/broker/GridworksBroker.java
new file mode 100644
index 000000000..f70f3a2f2
--- /dev/null
+++ b/broker/core/src/com/google/gridworks/broker/GridworksBroker.java
@@ -0,0 +1,311 @@
+
+package com.google.gridworks.broker;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.io.Writer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import javax.servlet.ServletConfig;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.http.NameValuePair;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.ResponseHandler;
+import org.apache.http.client.entity.UrlEncodedFormEntity;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.impl.client.BasicResponseHandler;
+import org.apache.http.message.BasicNameValuePair;
+import org.apache.http.params.CoreProtocolPNames;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import edu.mit.simile.butterfly.ButterflyModuleImpl;
+
+/**
+ * This class contains all the code shared by various implementations of a Gridworks Broker.
+ *
+ * A broker is a server used by multiple Gridworks installations to enable collaborative
+ * development over the same project.
+ *
+ * Broker implementations differ in how they store their state but all of them are required
+ * to extend this abstract class and implement the services that are called via HTTP.
+ *
+ */
+public abstract class GridworksBroker extends ButterflyModuleImpl {
+
+ static final public String GET_STATE = "get_state";
+ static final public String EXPIRE = "expire";
+ static final public String OBTAIN_LOCK = "obtain_lock";
+ static final public String RELEASE_LOCK = "release_lock";
+ static final public String TRANSFORM = "transform";
+ static final public String START = "start";
+ static final public String OPEN = "open";
+
+ static final public int ALL = 0;
+ static final public int COL = 1;
+ static final public int CELL = 2;
+
+ static final protected Logger logger = LoggerFactory.getLogger("gridworks.broker");
+
+ static final protected String USER_INFO_URL = "http://www.freebase.com/api/service/user_info";
+ static final protected String DELEGATED_OAUTH_HEADER = "X-Freebase-Credentials";
+ static final protected String OAUTH_HEADER = "Authorization";
+
+ static protected String OK;
+
+ static {
+ try {
+ JSONObject o = new JSONObject();
+ o.put("status","ok");
+ OK = o.toString();
+ } catch (JSONException e) {
+ // not going to happen;
+ }
+ }
+
+ static public final long LOCK_DURATION = 60 * 1000; // 1 minute
+ static public final long USER_DURATION = 5 * 60 * 1000; // 1 minute
+ static public final long LOCK_EXPIRATION_CHECK_DELAY = 5 * 1000; // 5 seconds
+
+ protected HttpClient httpclient;
+
+ protected boolean developmentMode;
+
+ @Override
+ public void init(ServletConfig config) throws Exception {
+ super.init(config);
+ httpclient = getHttpClient();
+ developmentMode = Boolean.parseBoolean(config.getInitParameter("gridworks.development"));
+ if (developmentMode) logger.warn("Running in development mode");
+ }
+
+ @Override
+ public void destroy() throws Exception {
+ httpclient.getConnectionManager().shutdown();
+ }
+
+ @Override
+ public boolean process(String path, HttpServletRequest request, HttpServletResponse response) throws Exception {
+ if (logger.isDebugEnabled()) {
+ logger.debug("> process '{}'", path);
+ } else {
+ logger.info("process '{}'", path);
+ }
+
+ try {
+
+ if (GET_STATE.equals(path)) {
+ response.setCharacterEncoding("UTF-8");
+ response.setHeader("Content-Type", "application/json");
+ getState(response, getParameter(request, "pid"), getUserId(request), getInteger(request, "rev"));
+ } else if (EXPIRE.equals(path)) {
+ response.setCharacterEncoding("UTF-8");
+ response.setHeader("Content-Type", "application/json");
+ expire(response);
+ } else if (OBTAIN_LOCK.equals(path)) {
+ response.setCharacterEncoding("UTF-8");
+ response.setHeader("Content-Type", "application/json");
+ obtainLock(response, getParameter(request, "pid"), getUserId(request), getInteger(request, "locktype"), getParameter(request, "lockvalue"));
+ } else if (RELEASE_LOCK.equals(path)) {
+ response.setCharacterEncoding("UTF-8");
+ response.setHeader("Content-Type", "application/json");
+ releaseLock(response, getParameter(request, "pid"), getUserId(request), getParameter(request, "lock"));
+ } else if (TRANSFORM.equals(path)) {
+ response.setCharacterEncoding("UTF-8");
+ response.setHeader("Content-Type", "application/json");
+ addTransformations(response, getParameter(request, "pid"), getUserId(request), getParameter(request, "lock"), getList(request, "transformations"));
+ } else if (START.equals(path)) {
+ response.setCharacterEncoding("UTF-8");
+ response.setHeader("Content-Type", "application/json");
+ startProject(response, getParameter(request, "pid"), getUserId(request), getParameter(request, "lock"), getData(request), getParameter(request, "metadata"), getList(request, "transformations"));
+ } else if (OPEN.equals(path)) {
+ response.setCharacterEncoding("UTF-8");
+ response.setHeader("Content-Type", "application/json");
+ openProject(response, getParameter(request, "pid"));
+ } else {
+ boolean value = super.process(path, request, response);
+ if (logger.isDebugEnabled()) logger.debug("< process '{}'", path);
+ return value;
+ }
+
+ } catch (RuntimeException e) {
+ logger.error("runtime error", e.getMessage());
+ respondError(response, e.getMessage());
+ } catch (Exception e) {
+ logger.error("internal error", e);
+ respondException(response, e);
+ }
+
+ if (logger.isDebugEnabled()) logger.debug("< process '{}'", path);
+
+ return true;
+ }
+
+ // ----------------------------------------------------------------------------------------
+
+ protected abstract HttpClient getHttpClient();
+
+ protected abstract void expire(HttpServletResponse response) throws Exception;
+
+ protected abstract void getState(HttpServletResponse response, String pid, String uid, int rev) throws Exception;
+
+ protected abstract void obtainLock(HttpServletResponse response, String pid, String uid, int locktype, String lockvalue) throws Exception;
+
+ protected abstract void releaseLock(HttpServletResponse response, String pid, String uid, String lock) throws Exception;
+
+ protected abstract void startProject(HttpServletResponse response, String pid, String uid, String lock, byte[] data, String metadata, List transformations) throws Exception;
+
+ protected abstract void addTransformations(HttpServletResponse response, String pid, String uid, String lock, List transformations) throws Exception;
+
+ protected abstract void openProject(HttpServletResponse response, String pid) throws Exception;
+
+ // ----------------------------------------------------------------------------------------
+
+ @SuppressWarnings("unchecked")
+ protected String getUserId(HttpServletRequest request) throws Exception {
+
+ // This is useful for testing
+ if (developmentMode) {
+ return getParameter(request, "uid");
+ }
+
+ String oauth = request.getHeader(DELEGATED_OAUTH_HEADER);
+ if (oauth == null) {
+ throw new RuntimeException("The request needs to contain the '" + DELEGATED_OAUTH_HEADER + "' header set to obtain user identity via Freebase.");
+ }
+
+ List formparams = new ArrayList();
+ Map params = (Map) request.getParameterMap();
+ for (Entry e : params.entrySet()) {
+ formparams.add(new BasicNameValuePair((String) e.getKey(), (String) e.getValue()));
+ }
+ UrlEncodedFormEntity entity = new UrlEncodedFormEntity(formparams, "UTF-8");
+
+ HttpPost httpRequest = new HttpPost(USER_INFO_URL);
+ httpRequest.setHeader(OAUTH_HEADER, oauth);
+ httpRequest.getParams().setParameter(CoreProtocolPNames.USER_AGENT, "Gridworks Broker");
+ httpRequest.setEntity(entity);
+
+ ResponseHandler responseHandler = new BasicResponseHandler();
+ String responseBody = httpclient.execute(httpRequest, responseHandler);
+ JSONObject o = new JSONObject(responseBody);
+
+ return o.getString("username");
+ }
+
+ // ----------------------------------------------------------------------------------------
+
+ static protected String getParameter(HttpServletRequest request, String name) throws ServletException {
+ String param = request.getParameter(name);
+ if (param == null) {
+ throw new RuntimeException("request must come with a '" + name + "' parameter");
+ }
+ return param;
+ }
+
+ static protected List getList(HttpServletRequest request, String name) throws ServletException, JSONException {
+ String param = getParameter(request, name);
+ JSONArray a = new JSONArray(param);
+ List result = new ArrayList(a.length());
+ for (int i = 0; i < a.length(); i++) {
+ result.add(a.getString(i));
+ }
+ return result;
+ }
+
+ static protected int getInteger(HttpServletRequest request, String name) throws ServletException, JSONException {
+ return Integer.parseInt(getParameter(request, name));
+ }
+
+ static protected byte[] getData(HttpServletRequest request) throws ServletException, IOException {
+ ByteArrayOutputStream output = new ByteArrayOutputStream();
+ InputStream input = request.getInputStream();
+ byte[] buffer = new byte[4096];
+ int count = 0;
+ int n = 0;
+ while (-1 != (n = input.read(buffer))) {
+ output.write(buffer, 0, n);
+ count += n;
+ }
+ return output.toByteArray();
+
+ }
+
+ static protected void respondError(HttpServletResponse response, String error) throws IOException, ServletException {
+
+ if (response == null) {
+ throw new ServletException("Response object can't be null");
+ }
+
+ try {
+ JSONObject o = new JSONObject();
+ o.put("status", "error");
+ o.put("message", error);
+ response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
+ respond(response, o.toString());
+ } catch (JSONException e) {
+ e.printStackTrace(response.getWriter());
+ }
+ }
+
+ static protected void respondException(HttpServletResponse response, Exception e) throws IOException, ServletException {
+
+ if (response == null) {
+ throw new ServletException("Response object can't be null");
+ }
+
+ try {
+ JSONObject o = new JSONObject();
+ o.put("status", "error");
+ o.put("message", e.getMessage());
+
+ StringWriter sw = new StringWriter();
+ PrintWriter pw = new PrintWriter(sw);
+ e.printStackTrace(pw);
+ pw.flush();
+ sw.flush();
+
+ o.put("stack", sw.toString());
+
+ response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
+ respond(response, o.toString());
+ } catch (JSONException e1) {
+ e.printStackTrace(response.getWriter());
+ }
+ }
+
+ static protected void respond(HttpServletResponse response, JSONObject content) throws IOException, ServletException {
+ if (content == null) {
+ throw new ServletException("Content object can't be null");
+ }
+
+ respond(response, content.toString());
+ }
+
+ static protected void respond(HttpServletResponse response, String content) throws IOException, ServletException {
+ if (response == null) {
+ throw new ServletException("Response object can't be null");
+ }
+
+ Writer w = response.getWriter();
+ if (w != null) {
+ w.write(content);
+ w.flush();
+ w.close();
+ } else {
+ throw new ServletException("response returned a null writer");
+ }
+ }
+}
diff --git a/broker/core/src/com/google/gridworks/broker/GridworksBrokerImpl.java b/broker/core/src/com/google/gridworks/broker/GridworksBrokerImpl.java
new file mode 100644
index 000000000..a952ccb4e
--- /dev/null
+++ b/broker/core/src/com/google/gridworks/broker/GridworksBrokerImpl.java
@@ -0,0 +1,592 @@
+package com.google.gridworks.broker;
+
+import static com.sleepycat.persist.model.Relationship.MANY_TO_ONE;
+
+import java.io.File;
+import java.io.Writer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import javax.servlet.ServletConfig;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.http.client.HttpClient;
+import org.apache.http.impl.client.DefaultHttpClient;
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.json.JSONWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.sleepycat.je.Environment;
+import com.sleepycat.je.EnvironmentConfig;
+import com.sleepycat.je.Transaction;
+import com.sleepycat.persist.EntityCursor;
+import com.sleepycat.persist.EntityStore;
+import com.sleepycat.persist.PrimaryIndex;
+import com.sleepycat.persist.SecondaryIndex;
+import com.sleepycat.persist.StoreConfig;
+import com.sleepycat.persist.model.Entity;
+import com.sleepycat.persist.model.PrimaryKey;
+import com.sleepycat.persist.model.SecondaryKey;
+
+public class GridworksBrokerImpl extends GridworksBroker {
+
+ protected static final Logger logger = LoggerFactory.getLogger("gridworks.broker.local");
+
+ Environment env;
+
+ EntityStore projectStore;
+ EntityStore lockStore;
+ EntityStore userStore;
+
+ PrimaryIndex projectById;
+ PrimaryIndex lockById;
+
+ SecondaryIndex locksByProject;
+
+ Timer timer;
+ Expirer expirer;
+
+ @Override
+ public void init(ServletConfig config) throws Exception {
+ logger.trace("> init");
+ super.init(config);
+
+ timer = new Timer();
+ expirer = new Expirer();
+ timer.schedule(expirer, 0, LOCK_EXPIRATION_CHECK_DELAY);
+
+ String dataDir = config.getInitParameter("gridworks.data");
+ if (dataDir == null) dataDir = "data";
+ File dataPath = new File(dataDir);
+ if (!dataPath.exists()) dataPath.mkdirs();
+
+ EnvironmentConfig envConfig = new EnvironmentConfig();
+ envConfig.setAllowCreate(true);
+ envConfig.setTransactional(true);
+ env = new Environment(dataPath, envConfig);
+
+ StoreConfig storeConfig = new StoreConfig();
+ storeConfig.setAllowCreate(true);
+ storeConfig.setTransactional(true);
+ projectStore = new EntityStore(env, "ProjectsStore", storeConfig);
+ lockStore = new EntityStore(env, "LockStore", storeConfig);
+
+ projectById = projectStore.getPrimaryIndex(String.class, Project.class);
+ lockById = lockStore.getPrimaryIndex(String.class, Lock.class);
+
+ locksByProject = lockStore.getSecondaryIndex(lockById, String.class, "pid");
+ logger.trace("< init");
+ }
+
+ @Override
+ public void destroy() throws Exception {
+ logger.trace("> destroy");
+ super.destroy();
+
+ if (projectStore != null) {
+ projectStore.close();
+ projectById = null;
+ }
+
+ if (lockStore != null) {
+ lockStore.close();
+ lockById = null;
+ }
+
+ if (timer != null) {
+ timer.cancel();
+ timer.purge();
+ timer = null;
+ }
+
+ if (env != null) {
+ env.close();
+ env = null;
+ }
+ logger.trace("< destroy");
+ }
+
+ class Expirer extends TimerTask {
+ public void run() {
+ if (lockById != null) {
+ logger.trace("> expire");
+ Transaction txn = env.beginTransaction(null, null);
+ try {
+ EntityCursor cursor = lockById.entities();
+ try {
+ for (Lock lock : cursor) {
+ if (lock.timestamp + LOCK_DURATION < System.currentTimeMillis()) {
+ logger.trace("Found expired lock {}", lock.id);
+ try {
+ releaseLock(null, lock.pid, lock.uid, lock.id);
+ } catch (Exception e) {
+ logger.error("Exception while expiring lock for project '" + lock.pid + "'", e);
+ }
+ }
+ }
+ } finally {
+ cursor.close();
+ }
+ } finally {
+ if (txn != null) {
+ txn.abort();
+ txn = null;
+ }
+ }
+ logger.trace("< expire");
+ }
+ }
+ }
+
+ // ---------------------------------------------------------------------------------
+
+ @Override
+ protected HttpClient getHttpClient() {
+ return new DefaultHttpClient();
+ }
+
+ // ---------------------------------------------------------------------------------
+
+ @Override
+ protected void expire(HttpServletResponse response) throws Exception {
+ expirer.run();
+ respond(response, OK);
+ }
+
+ @Override
+ protected void obtainLock(HttpServletResponse response, String pid, String uid, int locktype, String lockvalue) throws Exception {
+ logger.trace("> obtain lock");
+ Lock lock = null;
+ Lock blocker = null;
+
+ Transaction txn = env.beginTransaction(null, null);
+
+ try {
+
+ EntityCursor cursor = locksByProject.subIndex(pid).entities();
+
+ /*
+ * ALL
+ * blocked -> somebody else's lock
+ * reuse -> you already have an ALL lock
+ * new -> else
+ *
+ * COL
+ * blocked -> somebody else's all lock || a lock on the same col
+ * reuse -> you have an ALL lock || a lock on the same col
+ * new -> else
+ *
+ * CELL
+ * blocked -> somebody else's all lock || a lock on the same col || a lock on the same cell
+ * reuse -> you have a lock on the same cell
+ * yes -> (you have a lock on the same cell) && (nobody else has a lock on the same cell || the same col || all)
+ * new -> else
+ *
+ */
+
+ try {
+ if (locktype == ALL) {
+ if (lockvalue.length() > 0) {
+ throw new RuntimeException("Hmm, seems like you're calling an ALL with a specific value, are you sure you didn't want another type of lock?");
+ }
+
+ for (Lock l : cursor) {
+ if (!l.uid.equals(uid)) {
+ blocker = l;
+ break;
+ } else {
+ if (l.type == ALL) {
+ lock = l;
+ break;
+ }
+ }
+ }
+ } else if (locktype == COL) {
+ if (lockvalue.indexOf(',') > -1) {
+ throw new RuntimeException("Hmm, seems like you're calling a COL lock with a CELL value");
+ }
+
+ for (Lock l : cursor) {
+ if (!l.uid.equals(uid)) {
+ if (l.type == ALL ||
+ (l.type == COL && l.value.equals(lockvalue)) ||
+ (l.type == CELL && l.value.split(",")[0].equals(lockvalue))) {
+ blocker = l;
+ break;
+ }
+ } else {
+ if (l.type == ALL ||
+ (l.type == COL && l.value.equals(lockvalue))) {
+ lock = l;
+ break;
+ }
+ }
+ }
+ } else if (locktype == CELL) {
+ if (lockvalue.indexOf(',') == -1) {
+ throw new RuntimeException("Hmm, seems like you're calling a CELL lock without specifying row and column: format must be 'row,column'");
+ }
+
+ for (Lock l : cursor) {
+ if (!l.uid.equals(uid)) {
+ if (l.type == ALL ||
+ (l.type == COL && l.value.equals(lockvalue.split(",")[0])) ||
+ (l.type == CELL && l.value.equals(lockvalue))) {
+ blocker = l;
+ break;
+ }
+ } else {
+ if (l.type == ALL ||
+ (l.type == COL && l.value.equals(lockvalue.split(",")[0])) ||
+ (l.type == CELL && l.value.equals(lockvalue))) {
+ lock = l;
+ break;
+ }
+ }
+ }
+ }
+ } finally {
+ cursor.close();
+ }
+
+ if (blocker != null) {
+ logger.info("found a blocking lock {}", lockToString(blocker));
+ throw new RuntimeException("Can't obtain lock, it is blocked by a type '" + blocker.type + "' lock owned by '" + blocker.uid + "'");
+ }
+
+ if (lock == null) {
+ logger.info("no comparable lock already exists, creating a new one");
+ lock = new Lock(Long.toHexString(txn.getId()), pid, uid, locktype, lockvalue);
+ lockById.put(txn, lock);
+ txn.commit();
+ }
+
+ } finally {
+ if (txn != null) {
+ txn.abort();
+ txn = null;
+ }
+ }
+
+ JSONObject o = lockToJSON(lock, uid);
+ o.put("status", "ok");
+ respond(response, o);
+
+ logger.trace("< obtain lock");
+ }
+
+ @Override
+ protected void releaseLock(HttpServletResponse response, String pid, String uid, String lid) throws Exception {
+
+ Transaction txn = env.beginTransaction(null, null);
+
+ try {
+ Lock lock = getLock(lid, pid, uid);
+ if (lock != null) {
+ if (!lock.uid.equals(uid)) {
+ throw new RuntimeException("User id doesn't match the lock owner, can't release the lock");
+ }
+ lockById.delete(lid);
+ txn.commit();
+ }
+ } finally {
+ if (txn != null) {
+ txn.abort();
+ txn = null;
+ }
+ }
+
+ if (response != null) { // this because the expiration thread can call this method without a real response
+ respond(response, OK);
+ }
+ }
+
+ // ----------------------------------------------------------------------------------------------------
+
+ @Override
+ protected void startProject(HttpServletResponse response, String pid, String uid, String lid, byte[] data, String metadata, List transformations) throws Exception {
+
+ Transaction txn = env.beginTransaction(null, null);
+
+ try {
+ if (projectById.contains(pid)) {
+ throw new RuntimeException("Project '" + pid + "' already exists");
+ }
+
+ Lock lock = getLock(lid, pid, uid);
+
+ if (lock.type != ALL) {
+ throw new RuntimeException("The lock you have is not enough to start a project");
+ }
+
+ projectById.put(txn, new Project(pid, data, metadata, transformations));
+ txn.commit();
+ } finally {
+ if (txn != null) {
+ txn.abort();
+ txn = null;
+ }
+ }
+
+ respond(response, OK);
+ }
+
+ @Override
+ protected void addTransformations(HttpServletResponse response, String pid, String uid, String lid, List transformations) throws Exception {
+
+ Transaction txn = env.beginTransaction(null, null);
+
+ try {
+ Project project = getProject(pid);
+
+ if (project == null) {
+ throw new RuntimeException("Project '" + pid + "' not found");
+ }
+
+ Lock lock = getLock(lid, pid, uid);
+
+ logger.info("obtained lock: {}", lockToString(lock));
+
+ if (lock.type == ALL) {
+ project.transformations.addAll(transformations);
+ } else {
+ for (String s : transformations) {
+ JSONObject o = new JSONObject(s);
+
+ int type = o.getInt("op_type");
+ String value = o.getString("op_value");
+ if (lock.type == COL) {
+ if (type == COL) {
+ if (value != null && value.equals(lock.value)) {
+ project.transformations.add(s);
+ } else {
+ throw new RuntimeException("Can't apply '" + s + "': you have a lock for column '" + lock.value + "' and you're attempting to modify column '" + value + "'.");
+ }
+ } else if (type == CELL) {
+ String column = value.split(",")[0];
+ if (column != null && column.equals(lock.value)) {
+ project.transformations.add(s);
+ } else {
+ throw new RuntimeException("Can't apply '" + s + "': you have a lock for column '" + lock.value + "' and you're attempting to modify cell '" + value + "' in another column.");
+ }
+ }
+ } else if (lock.type == CELL) {
+ if (type == COL) {
+ throw new RuntimeException("Can't apply '" + s + "': you offered a lock for a single cell and you're attempting an operation for the entire column.");
+ } else if (type == CELL) {
+ if (value != null && value.equals(lock.value)) {
+ project.transformations.add(s);
+ } else {
+ throw new RuntimeException("Can't apply '" + s + "': you have a lock for cell '" + lock.value + "' and you're attempting to modify cell '" + value + "'.");
+ }
+ }
+ }
+ }
+ }
+
+ projectById.put(txn, project);
+
+ txn.commit();
+ } finally {
+ if (txn != null) {
+ txn.abort();
+ txn = null;
+ }
+ }
+
+ respond(response, OK);
+ }
+
+ // ---------------------------------------------------------------------------------
+
+ @Override
+ protected void openProject(HttpServletResponse response, String pid) throws Exception {
+ Project project = getProject(pid);
+
+ Writer w = response.getWriter();
+ JSONWriter writer = new JSONWriter(w);
+ writer.object();
+ writer.key("status"); writer.value("ok");
+ writer.key("data"); writer.value(project.data);
+ writer.key("metadata"); writer.value(new JSONObject(project.metadata));
+ writer.key("transformations");
+ writer.array();
+ for (String s : project.transformations) {
+ writer.value(new JSONObject(s));
+ }
+ writer.endArray();
+ writer.endObject();
+ w.flush();
+ w.close();
+ }
+
+ // ---------------------------------------------------------------------------------
+
+ @Override
+ protected void getState(HttpServletResponse response, String pid, String uid, int rev) throws Exception {
+
+ Project project = getProject(pid);
+
+ Writer w = response.getWriter();
+ JSONWriter writer = new JSONWriter(w);
+
+ writer.object();
+ writer.key("status"); writer.value("ok");
+ writer.key("transformations");
+ writer.array();
+ int size = project.transformations.size();
+ for (int i = rev; i < size; i++) {
+ writer.value(new JSONObject(project.transformations.get(i)));
+ }
+ writer.endArray();
+
+ EntityCursor cursor = locksByProject.subIndex(pid).entities();
+
+ try {
+ writer.key("locks");
+ writer.array();
+ for (Lock lock : cursor) {
+ writer.value(lockToJSON(lock, uid));
+ }
+ writer.endArray();
+ writer.endObject();
+
+ w.flush();
+ w.close();
+ } finally {
+ cursor.close();
+ }
+ }
+
+ // ---------------------------------------------------------------------------------
+
+ Project getProject(String pid) {
+ Project project = projectById.get(pid);
+ if (project == null) {
+ throw new RuntimeException("Project '" + pid + "' could not be found: are you sure is not managed by another broker?");
+ }
+ return project;
+ }
+
+ @Entity
+ static class Project {
+
+ @PrimaryKey
+ String pid;
+
+ List transformations;
+
+ byte[] data;
+
+ String metadata;
+
+ int rev;
+
+ Project(String pid, byte[] data, String metadata, List transformations) {
+ this.pid = pid;
+ this.data = data;
+ this.metadata = metadata;
+ this.transformations = (transformations != null) ? transformations : new ArrayList();
+ this.rev = this.transformations.size();
+ }
+
+ @SuppressWarnings("unused")
+ private Project() {}
+ }
+
+ // ---------------------------------------------------------------------------------
+
+ Lock getLock(String lid, String pid, String uid) {
+ Lock lock = lockById.get(lid);
+ checkLock(lock, lid, pid, uid);
+ return lock;
+ }
+
+ void checkLock(Lock lock, String lid, String pid, String uid) {
+ if (lock == null) {
+ throw new RuntimeException("No lock was found with the given Lock id '" + lid + "', you have to have a valid lock on a project in order to start it");
+ }
+
+ if (!lock.pid.equals(pid)) {
+ throw new RuntimeException("Lock '" + lock.id + "' is for another project: " + lock.pid);
+ }
+
+ if (!lock.uid.equals(uid)) {
+ throw new RuntimeException("Lock '" + lock.id + "' is owned by another user: " + lock.uid);
+ }
+ }
+
+ Lock getLock(String pid, String uid, int locktype) {
+ Lock lock = null;
+ EntityCursor cursor = locksByProject.subIndex(pid).entities();
+
+ try {
+ for (Lock l : cursor) {
+ if (uid.equals(l.uid) && (locktype == l.type)) {
+ lock = l;
+ break;
+ }
+ }
+ } finally {
+ cursor.close();
+ }
+
+ return lock;
+ }
+
+ JSONObject lockToJSON(Lock lock, String uid) throws JSONException {
+ JSONObject o = new JSONObject();
+ if (lock != null) {
+ // NOTE: only the owner of the lock should get the ID,
+ // otherwise others can just fake ownership of other people's locks
+ if (lock.uid.equals(uid)) {
+ o.put("lock", lock.id);
+ }
+
+ o.put("pid", lock.pid);
+ o.put("uid", lock.uid);
+ o.put("type", lock.type);
+ o.put("value", lock.value);
+ o.put("timestamp", lock.timestamp);
+ }
+ return o;
+ }
+
+ String lockToString(Lock lock) {
+ return lock.id + "," + lock.pid + "," + lock.uid + "," + lock.type + "," + lock.value;
+ }
+
+ @Entity
+ static class Lock {
+
+ @PrimaryKey
+ String id;
+
+ @SecondaryKey(relate=MANY_TO_ONE)
+ String pid;
+
+ String uid;
+
+ int type;
+
+ String value;
+
+ long timestamp;
+
+ Lock(String id, String pid, String uid, int type, String value) {
+ this.id = id;
+ this.pid = pid;
+ this.uid = uid;
+ this.type = type;
+ this.value = value;
+ this.timestamp = System.currentTimeMillis();
+ }
+
+ @SuppressWarnings("unused")
+ private Lock() {}
+ }
+}
diff --git a/broker/core/src/com/metaweb/gridworks/broker/GridworksBroker.java b/broker/core/src/com/metaweb/gridworks/broker/GridworksBroker.java
deleted file mode 100644
index f1415c7c3..000000000
--- a/broker/core/src/com/metaweb/gridworks/broker/GridworksBroker.java
+++ /dev/null
@@ -1,311 +0,0 @@
-
-package com.metaweb.gridworks.broker;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.io.Writer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import javax.servlet.ServletConfig;
-import javax.servlet.ServletException;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-
-import org.apache.http.NameValuePair;
-import org.apache.http.client.HttpClient;
-import org.apache.http.client.ResponseHandler;
-import org.apache.http.client.entity.UrlEncodedFormEntity;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.impl.client.BasicResponseHandler;
-import org.apache.http.message.BasicNameValuePair;
-import org.apache.http.params.CoreProtocolPNames;
-import org.json.JSONArray;
-import org.json.JSONException;
-import org.json.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import edu.mit.simile.butterfly.ButterflyModuleImpl;
-
-/**
- * This class contains all the code shared by various implementations of a Gridworks Broker.
- *
- * A broker is a server used by multiple Gridworks installations to enable collaborative
- * development over the same project.
- *
- * Broker implementations differ in how they store their state but all of them are required
- * to extend this abstract class and implement the services that are called via HTTP.
- *
- */
-public abstract class GridworksBroker extends ButterflyModuleImpl {
-
- static final public String GET_STATE = "get_state";
- static final public String EXPIRE = "expire";
- static final public String OBTAIN_LOCK = "obtain_lock";
- static final public String RELEASE_LOCK = "release_lock";
- static final public String TRANSFORM = "transform";
- static final public String START = "start";
- static final public String OPEN = "open";
-
- static final public int ALL = 0;
- static final public int COL = 1;
- static final public int CELL = 2;
-
- static final protected Logger logger = LoggerFactory.getLogger("gridworks.broker");
-
- static final protected String USER_INFO_URL = "http://www.freebase.com/api/service/user_info";
- static final protected String DELEGATED_OAUTH_HEADER = "X-Freebase-Credentials";
- static final protected String OAUTH_HEADER = "Authorization";
-
- static protected String OK;
-
- static {
- try {
- JSONObject o = new JSONObject();
- o.put("status","ok");
- OK = o.toString();
- } catch (JSONException e) {
- // not going to happen;
- }
- }
-
- static public final long LOCK_DURATION = 60 * 1000; // 1 minute
- static public final long USER_DURATION = 5 * 60 * 1000; // 1 minute
- static public final long LOCK_EXPIRATION_CHECK_DELAY = 5 * 1000; // 5 seconds
-
- protected HttpClient httpclient;
-
- protected boolean developmentMode;
-
- @Override
- public void init(ServletConfig config) throws Exception {
- super.init(config);
- httpclient = getHttpClient();
- developmentMode = Boolean.parseBoolean(config.getInitParameter("gridworks.development"));
- if (developmentMode) logger.warn("Running in development mode");
- }
-
- @Override
- public void destroy() throws Exception {
- httpclient.getConnectionManager().shutdown();
- }
-
- @Override
- public boolean process(String path, HttpServletRequest request, HttpServletResponse response) throws Exception {
- if (logger.isDebugEnabled()) {
- logger.debug("> process '{}'", path);
- } else {
- logger.info("process '{}'", path);
- }
-
- try {
-
- if (GET_STATE.equals(path)) {
- response.setCharacterEncoding("UTF-8");
- response.setHeader("Content-Type", "application/json");
- getState(response, getParameter(request, "pid"), getUserId(request), getInteger(request, "rev"));
- } else if (EXPIRE.equals(path)) {
- response.setCharacterEncoding("UTF-8");
- response.setHeader("Content-Type", "application/json");
- expire(response);
- } else if (OBTAIN_LOCK.equals(path)) {
- response.setCharacterEncoding("UTF-8");
- response.setHeader("Content-Type", "application/json");
- obtainLock(response, getParameter(request, "pid"), getUserId(request), getInteger(request, "locktype"), getParameter(request, "lockvalue"));
- } else if (RELEASE_LOCK.equals(path)) {
- response.setCharacterEncoding("UTF-8");
- response.setHeader("Content-Type", "application/json");
- releaseLock(response, getParameter(request, "pid"), getUserId(request), getParameter(request, "lock"));
- } else if (TRANSFORM.equals(path)) {
- response.setCharacterEncoding("UTF-8");
- response.setHeader("Content-Type", "application/json");
- addTransformations(response, getParameter(request, "pid"), getUserId(request), getParameter(request, "lock"), getList(request, "transformations"));
- } else if (START.equals(path)) {
- response.setCharacterEncoding("UTF-8");
- response.setHeader("Content-Type", "application/json");
- startProject(response, getParameter(request, "pid"), getUserId(request), getParameter(request, "lock"), getData(request), getParameter(request, "metadata"), getList(request, "transformations"));
- } else if (OPEN.equals(path)) {
- response.setCharacterEncoding("UTF-8");
- response.setHeader("Content-Type", "application/json");
- openProject(response, getParameter(request, "pid"));
- } else {
- boolean value = super.process(path, request, response);
- if (logger.isDebugEnabled()) logger.debug("< process '{}'", path);
- return value;
- }
-
- } catch (RuntimeException e) {
- logger.error("runtime error", e.getMessage());
- respondError(response, e.getMessage());
- } catch (Exception e) {
- logger.error("internal error", e);
- respondException(response, e);
- }
-
- if (logger.isDebugEnabled()) logger.debug("< process '{}'", path);
-
- return true;
- }
-
- // ----------------------------------------------------------------------------------------
-
- protected abstract HttpClient getHttpClient();
-
- protected abstract void expire(HttpServletResponse response) throws Exception;
-
- protected abstract void getState(HttpServletResponse response, String pid, String uid, int rev) throws Exception;
-
- protected abstract void obtainLock(HttpServletResponse response, String pid, String uid, int locktype, String lockvalue) throws Exception;
-
- protected abstract void releaseLock(HttpServletResponse response, String pid, String uid, String lock) throws Exception;
-
- protected abstract void startProject(HttpServletResponse response, String pid, String uid, String lock, byte[] data, String metadata, List transformations) throws Exception;
-
- protected abstract void addTransformations(HttpServletResponse response, String pid, String uid, String lock, List transformations) throws Exception;
-
- protected abstract void openProject(HttpServletResponse response, String pid) throws Exception;
-
- // ----------------------------------------------------------------------------------------
-
- @SuppressWarnings("unchecked")
- protected String getUserId(HttpServletRequest request) throws Exception {
-
- // This is useful for testing
- if (developmentMode) {
- return getParameter(request, "uid");
- }
-
- String oauth = request.getHeader(DELEGATED_OAUTH_HEADER);
- if (oauth == null) {
- throw new RuntimeException("The request needs to contain the '" + DELEGATED_OAUTH_HEADER + "' header set to obtain user identity via Freebase.");
- }
-
- List formparams = new ArrayList();
- Map params = (Map) request.getParameterMap();
- for (Entry e : params.entrySet()) {
- formparams.add(new BasicNameValuePair((String) e.getKey(), (String) e.getValue()));
- }
- UrlEncodedFormEntity entity = new UrlEncodedFormEntity(formparams, "UTF-8");
-
- HttpPost httpRequest = new HttpPost(USER_INFO_URL);
- httpRequest.setHeader(OAUTH_HEADER, oauth);
- httpRequest.getParams().setParameter(CoreProtocolPNames.USER_AGENT, "Gridworks Broker");
- httpRequest.setEntity(entity);
-
- ResponseHandler responseHandler = new BasicResponseHandler();
- String responseBody = httpclient.execute(httpRequest, responseHandler);
- JSONObject o = new JSONObject(responseBody);
-
- return o.getString("username");
- }
-
- // ----------------------------------------------------------------------------------------
-
- static protected String getParameter(HttpServletRequest request, String name) throws ServletException {
- String param = request.getParameter(name);
- if (param == null) {
- throw new RuntimeException("request must come with a '" + name + "' parameter");
- }
- return param;
- }
-
- static protected List getList(HttpServletRequest request, String name) throws ServletException, JSONException {
- String param = getParameter(request, name);
- JSONArray a = new JSONArray(param);
- List result = new ArrayList(a.length());
- for (int i = 0; i < a.length(); i++) {
- result.add(a.getString(i));
- }
- return result;
- }
-
- static protected int getInteger(HttpServletRequest request, String name) throws ServletException, JSONException {
- return Integer.parseInt(getParameter(request, name));
- }
-
- static protected byte[] getData(HttpServletRequest request) throws ServletException, IOException {
- ByteArrayOutputStream output = new ByteArrayOutputStream();
- InputStream input = request.getInputStream();
- byte[] buffer = new byte[4096];
- int count = 0;
- int n = 0;
- while (-1 != (n = input.read(buffer))) {
- output.write(buffer, 0, n);
- count += n;
- }
- return output.toByteArray();
-
- }
-
- static protected void respondError(HttpServletResponse response, String error) throws IOException, ServletException {
-
- if (response == null) {
- throw new ServletException("Response object can't be null");
- }
-
- try {
- JSONObject o = new JSONObject();
- o.put("status", "error");
- o.put("message", error);
- response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
- respond(response, o.toString());
- } catch (JSONException e) {
- e.printStackTrace(response.getWriter());
- }
- }
-
- static protected void respondException(HttpServletResponse response, Exception e) throws IOException, ServletException {
-
- if (response == null) {
- throw new ServletException("Response object can't be null");
- }
-
- try {
- JSONObject o = new JSONObject();
- o.put("status", "error");
- o.put("message", e.getMessage());
-
- StringWriter sw = new StringWriter();
- PrintWriter pw = new PrintWriter(sw);
- e.printStackTrace(pw);
- pw.flush();
- sw.flush();
-
- o.put("stack", sw.toString());
-
- response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
- respond(response, o.toString());
- } catch (JSONException e1) {
- e.printStackTrace(response.getWriter());
- }
- }
-
- static protected void respond(HttpServletResponse response, JSONObject content) throws IOException, ServletException {
- if (content == null) {
- throw new ServletException("Content object can't be null");
- }
-
- respond(response, content.toString());
- }
-
- static protected void respond(HttpServletResponse response, String content) throws IOException, ServletException {
- if (response == null) {
- throw new ServletException("Response object can't be null");
- }
-
- Writer w = response.getWriter();
- if (w != null) {
- w.write(content);
- w.flush();
- w.close();
- } else {
- throw new ServletException("response returned a null writer");
- }
- }
-}
diff --git a/broker/core/src/com/metaweb/gridworks/broker/GridworksBrokerImpl.java b/broker/core/src/com/metaweb/gridworks/broker/GridworksBrokerImpl.java
deleted file mode 100644
index bde761ba4..000000000
--- a/broker/core/src/com/metaweb/gridworks/broker/GridworksBrokerImpl.java
+++ /dev/null
@@ -1,592 +0,0 @@
-package com.metaweb.gridworks.broker;
-
-import static com.sleepycat.persist.model.Relationship.MANY_TO_ONE;
-
-import java.io.File;
-import java.io.Writer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Timer;
-import java.util.TimerTask;
-
-import javax.servlet.ServletConfig;
-import javax.servlet.http.HttpServletResponse;
-
-import org.apache.http.client.HttpClient;
-import org.apache.http.impl.client.DefaultHttpClient;
-import org.json.JSONException;
-import org.json.JSONObject;
-import org.json.JSONWriter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.sleepycat.je.Environment;
-import com.sleepycat.je.EnvironmentConfig;
-import com.sleepycat.je.Transaction;
-import com.sleepycat.persist.EntityCursor;
-import com.sleepycat.persist.EntityStore;
-import com.sleepycat.persist.PrimaryIndex;
-import com.sleepycat.persist.SecondaryIndex;
-import com.sleepycat.persist.StoreConfig;
-import com.sleepycat.persist.model.Entity;
-import com.sleepycat.persist.model.PrimaryKey;
-import com.sleepycat.persist.model.SecondaryKey;
-
-public class GridworksBrokerImpl extends GridworksBroker {
-
- protected static final Logger logger = LoggerFactory.getLogger("gridworks.broker.local");
-
- Environment env;
-
- EntityStore projectStore;
- EntityStore lockStore;
- EntityStore userStore;
-
- PrimaryIndex projectById;
- PrimaryIndex lockById;
-
- SecondaryIndex locksByProject;
-
- Timer timer;
- Expirer expirer;
-
- @Override
- public void init(ServletConfig config) throws Exception {
- logger.trace("> init");
- super.init(config);
-
- timer = new Timer();
- expirer = new Expirer();
- timer.schedule(expirer, 0, LOCK_EXPIRATION_CHECK_DELAY);
-
- String dataDir = config.getInitParameter("gridworks.data");
- if (dataDir == null) dataDir = "data";
- File dataPath = new File(dataDir);
- if (!dataPath.exists()) dataPath.mkdirs();
-
- EnvironmentConfig envConfig = new EnvironmentConfig();
- envConfig.setAllowCreate(true);
- envConfig.setTransactional(true);
- env = new Environment(dataPath, envConfig);
-
- StoreConfig storeConfig = new StoreConfig();
- storeConfig.setAllowCreate(true);
- storeConfig.setTransactional(true);
- projectStore = new EntityStore(env, "ProjectsStore", storeConfig);
- lockStore = new EntityStore(env, "LockStore", storeConfig);
-
- projectById = projectStore.getPrimaryIndex(String.class, Project.class);
- lockById = lockStore.getPrimaryIndex(String.class, Lock.class);
-
- locksByProject = lockStore.getSecondaryIndex(lockById, String.class, "pid");
- logger.trace("< init");
- }
-
- @Override
- public void destroy() throws Exception {
- logger.trace("> destroy");
- super.destroy();
-
- if (projectStore != null) {
- projectStore.close();
- projectById = null;
- }
-
- if (lockStore != null) {
- lockStore.close();
- lockById = null;
- }
-
- if (timer != null) {
- timer.cancel();
- timer.purge();
- timer = null;
- }
-
- if (env != null) {
- env.close();
- env = null;
- }
- logger.trace("< destroy");
- }
-
- class Expirer extends TimerTask {
- public void run() {
- if (lockById != null) {
- logger.trace("> expire");
- Transaction txn = env.beginTransaction(null, null);
- try {
- EntityCursor cursor = lockById.entities();
- try {
- for (Lock lock : cursor) {
- if (lock.timestamp + LOCK_DURATION < System.currentTimeMillis()) {
- logger.trace("Found expired lock {}", lock.id);
- try {
- releaseLock(null, lock.pid, lock.uid, lock.id);
- } catch (Exception e) {
- logger.error("Exception while expiring lock for project '" + lock.pid + "'", e);
- }
- }
- }
- } finally {
- cursor.close();
- }
- } finally {
- if (txn != null) {
- txn.abort();
- txn = null;
- }
- }
- logger.trace("< expire");
- }
- }
- }
-
- // ---------------------------------------------------------------------------------
-
- @Override
- protected HttpClient getHttpClient() {
- return new DefaultHttpClient();
- }
-
- // ---------------------------------------------------------------------------------
-
- @Override
- protected void expire(HttpServletResponse response) throws Exception {
- expirer.run();
- respond(response, OK);
- }
-
- @Override
- protected void obtainLock(HttpServletResponse response, String pid, String uid, int locktype, String lockvalue) throws Exception {
- logger.trace("> obtain lock");
- Lock lock = null;
- Lock blocker = null;
-
- Transaction txn = env.beginTransaction(null, null);
-
- try {
-
- EntityCursor cursor = locksByProject.subIndex(pid).entities();
-
- /*
- * ALL
- * blocked -> somebody else's lock
- * reuse -> you already have an ALL lock
- * new -> else
- *
- * COL
- * blocked -> somebody else's all lock || a lock on the same col
- * reuse -> you have an ALL lock || a lock on the same col
- * new -> else
- *
- * CELL
- * blocked -> somebody else's all lock || a lock on the same col || a lock on the same cell
- * reuse -> you have a lock on the same cell
- * yes -> (you have a lock on the same cell) && (nobody else has a lock on the same cell || the same col || all)
- * new -> else
- *
- */
-
- try {
- if (locktype == ALL) {
- if (lockvalue.length() > 0) {
- throw new RuntimeException("Hmm, seems like you're calling an ALL with a specific value, are you sure you didn't want another type of lock?");
- }
-
- for (Lock l : cursor) {
- if (!l.uid.equals(uid)) {
- blocker = l;
- break;
- } else {
- if (l.type == ALL) {
- lock = l;
- break;
- }
- }
- }
- } else if (locktype == COL) {
- if (lockvalue.indexOf(',') > -1) {
- throw new RuntimeException("Hmm, seems like you're calling a COL lock with a CELL value");
- }
-
- for (Lock l : cursor) {
- if (!l.uid.equals(uid)) {
- if (l.type == ALL ||
- (l.type == COL && l.value.equals(lockvalue)) ||
- (l.type == CELL && l.value.split(",")[0].equals(lockvalue))) {
- blocker = l;
- break;
- }
- } else {
- if (l.type == ALL ||
- (l.type == COL && l.value.equals(lockvalue))) {
- lock = l;
- break;
- }
- }
- }
- } else if (locktype == CELL) {
- if (lockvalue.indexOf(',') == -1) {
- throw new RuntimeException("Hmm, seems like you're calling a CELL lock without specifying row and column: format must be 'row,column'");
- }
-
- for (Lock l : cursor) {
- if (!l.uid.equals(uid)) {
- if (l.type == ALL ||
- (l.type == COL && l.value.equals(lockvalue.split(",")[0])) ||
- (l.type == CELL && l.value.equals(lockvalue))) {
- blocker = l;
- break;
- }
- } else {
- if (l.type == ALL ||
- (l.type == COL && l.value.equals(lockvalue.split(",")[0])) ||
- (l.type == CELL && l.value.equals(lockvalue))) {
- lock = l;
- break;
- }
- }
- }
- }
- } finally {
- cursor.close();
- }
-
- if (blocker != null) {
- logger.info("found a blocking lock {}", lockToString(blocker));
- throw new RuntimeException("Can't obtain lock, it is blocked by a type '" + blocker.type + "' lock owned by '" + blocker.uid + "'");
- }
-
- if (lock == null) {
- logger.info("no comparable lock already exists, creating a new one");
- lock = new Lock(Long.toHexString(txn.getId()), pid, uid, locktype, lockvalue);
- lockById.put(txn, lock);
- txn.commit();
- }
-
- } finally {
- if (txn != null) {
- txn.abort();
- txn = null;
- }
- }
-
- JSONObject o = lockToJSON(lock, uid);
- o.put("status", "ok");
- respond(response, o);
-
- logger.trace("< obtain lock");
- }
-
- @Override
- protected void releaseLock(HttpServletResponse response, String pid, String uid, String lid) throws Exception {
-
- Transaction txn = env.beginTransaction(null, null);
-
- try {
- Lock lock = getLock(lid, pid, uid);
- if (lock != null) {
- if (!lock.uid.equals(uid)) {
- throw new RuntimeException("User id doesn't match the lock owner, can't release the lock");
- }
- lockById.delete(lid);
- txn.commit();
- }
- } finally {
- if (txn != null) {
- txn.abort();
- txn = null;
- }
- }
-
- if (response != null) { // this because the expiration thread can call this method without a real response
- respond(response, OK);
- }
- }
-
- // ----------------------------------------------------------------------------------------------------
-
- @Override
- protected void startProject(HttpServletResponse response, String pid, String uid, String lid, byte[] data, String metadata, List transformations) throws Exception {
-
- Transaction txn = env.beginTransaction(null, null);
-
- try {
- if (projectById.contains(pid)) {
- throw new RuntimeException("Project '" + pid + "' already exists");
- }
-
- Lock lock = getLock(lid, pid, uid);
-
- if (lock.type != ALL) {
- throw new RuntimeException("The lock you have is not enough to start a project");
- }
-
- projectById.put(txn, new Project(pid, data, metadata, transformations));
- txn.commit();
- } finally {
- if (txn != null) {
- txn.abort();
- txn = null;
- }
- }
-
- respond(response, OK);
- }
-
- @Override
- protected void addTransformations(HttpServletResponse response, String pid, String uid, String lid, List transformations) throws Exception {
-
- Transaction txn = env.beginTransaction(null, null);
-
- try {
- Project project = getProject(pid);
-
- if (project == null) {
- throw new RuntimeException("Project '" + pid + "' not found");
- }
-
- Lock lock = getLock(lid, pid, uid);
-
- logger.info("obtained lock: {}", lockToString(lock));
-
- if (lock.type == ALL) {
- project.transformations.addAll(transformations);
- } else {
- for (String s : transformations) {
- JSONObject o = new JSONObject(s);
-
- int type = o.getInt("op_type");
- String value = o.getString("op_value");
- if (lock.type == COL) {
- if (type == COL) {
- if (value != null && value.equals(lock.value)) {
- project.transformations.add(s);
- } else {
- throw new RuntimeException("Can't apply '" + s + "': you have a lock for column '" + lock.value + "' and you're attempting to modify column '" + value + "'.");
- }
- } else if (type == CELL) {
- String column = value.split(",")[0];
- if (column != null && column.equals(lock.value)) {
- project.transformations.add(s);
- } else {
- throw new RuntimeException("Can't apply '" + s + "': you have a lock for column '" + lock.value + "' and you're attempting to modify cell '" + value + "' in another column.");
- }
- }
- } else if (lock.type == CELL) {
- if (type == COL) {
- throw new RuntimeException("Can't apply '" + s + "': you offered a lock for a single cell and you're attempting an operation for the entire column.");
- } else if (type == CELL) {
- if (value != null && value.equals(lock.value)) {
- project.transformations.add(s);
- } else {
- throw new RuntimeException("Can't apply '" + s + "': you have a lock for cell '" + lock.value + "' and you're attempting to modify cell '" + value + "'.");
- }
- }
- }
- }
- }
-
- projectById.put(txn, project);
-
- txn.commit();
- } finally {
- if (txn != null) {
- txn.abort();
- txn = null;
- }
- }
-
- respond(response, OK);
- }
-
- // ---------------------------------------------------------------------------------
-
- @Override
- protected void openProject(HttpServletResponse response, String pid) throws Exception {
- Project project = getProject(pid);
-
- Writer w = response.getWriter();
- JSONWriter writer = new JSONWriter(w);
- writer.object();
- writer.key("status"); writer.value("ok");
- writer.key("data"); writer.value(project.data);
- writer.key("metadata"); writer.value(new JSONObject(project.metadata));
- writer.key("transformations");
- writer.array();
- for (String s : project.transformations) {
- writer.value(new JSONObject(s));
- }
- writer.endArray();
- writer.endObject();
- w.flush();
- w.close();
- }
-
- // ---------------------------------------------------------------------------------
-
- @Override
- protected void getState(HttpServletResponse response, String pid, String uid, int rev) throws Exception {
-
- Project project = getProject(pid);
-
- Writer w = response.getWriter();
- JSONWriter writer = new JSONWriter(w);
-
- writer.object();
- writer.key("status"); writer.value("ok");
- writer.key("transformations");
- writer.array();
- int size = project.transformations.size();
- for (int i = rev; i < size; i++) {
- writer.value(new JSONObject(project.transformations.get(i)));
- }
- writer.endArray();
-
- EntityCursor cursor = locksByProject.subIndex(pid).entities();
-
- try {
- writer.key("locks");
- writer.array();
- for (Lock lock : cursor) {
- writer.value(lockToJSON(lock, uid));
- }
- writer.endArray();
- writer.endObject();
-
- w.flush();
- w.close();
- } finally {
- cursor.close();
- }
- }
-
- // ---------------------------------------------------------------------------------
-
- Project getProject(String pid) {
- Project project = projectById.get(pid);
- if (project == null) {
- throw new RuntimeException("Project '" + pid + "' could not be found: are you sure is not managed by another broker?");
- }
- return project;
- }
-
- @Entity
- static class Project {
-
- @PrimaryKey
- String pid;
-
- List transformations;
-
- byte[] data;
-
- String metadata;
-
- int rev;
-
- Project(String pid, byte[] data, String metadata, List transformations) {
- this.pid = pid;
- this.data = data;
- this.metadata = metadata;
- this.transformations = (transformations != null) ? transformations : new ArrayList();
- this.rev = this.transformations.size();
- }
-
- @SuppressWarnings("unused")
- private Project() {}
- }
-
- // ---------------------------------------------------------------------------------
-
- Lock getLock(String lid, String pid, String uid) {
- Lock lock = lockById.get(lid);
- checkLock(lock, lid, pid, uid);
- return lock;
- }
-
- void checkLock(Lock lock, String lid, String pid, String uid) {
- if (lock == null) {
- throw new RuntimeException("No lock was found with the given Lock id '" + lid + "', you have to have a valid lock on a project in order to start it");
- }
-
- if (!lock.pid.equals(pid)) {
- throw new RuntimeException("Lock '" + lock.id + "' is for another project: " + lock.pid);
- }
-
- if (!lock.uid.equals(uid)) {
- throw new RuntimeException("Lock '" + lock.id + "' is owned by another user: " + lock.uid);
- }
- }
-
- Lock getLock(String pid, String uid, int locktype) {
- Lock lock = null;
- EntityCursor cursor = locksByProject.subIndex(pid).entities();
-
- try {
- for (Lock l : cursor) {
- if (uid.equals(l.uid) && (locktype == l.type)) {
- lock = l;
- break;
- }
- }
- } finally {
- cursor.close();
- }
-
- return lock;
- }
-
- JSONObject lockToJSON(Lock lock, String uid) throws JSONException {
- JSONObject o = new JSONObject();
- if (lock != null) {
- // NOTE: only the owner of the lock should get the ID,
- // otherwise others can just fake ownership of other people's locks
- if (lock.uid.equals(uid)) {
- o.put("lock", lock.id);
- }
-
- o.put("pid", lock.pid);
- o.put("uid", lock.uid);
- o.put("type", lock.type);
- o.put("value", lock.value);
- o.put("timestamp", lock.timestamp);
- }
- return o;
- }
-
- String lockToString(Lock lock) {
- return lock.id + "," + lock.pid + "," + lock.uid + "," + lock.type + "," + lock.value;
- }
-
- @Entity
- static class Lock {
-
- @PrimaryKey
- String id;
-
- @SecondaryKey(relate=MANY_TO_ONE)
- String pid;
-
- String uid;
-
- int type;
-
- String value;
-
- long timestamp;
-
- Lock(String id, String pid, String uid, int type, String value) {
- this.id = id;
- this.pid = pid;
- this.uid = uid;
- this.type = type;
- this.value = value;
- this.timestamp = System.currentTimeMillis();
- }
-
- @SuppressWarnings("unused")
- private Lock() {}
- }
-}
diff --git a/broker/core/tests/conf/tests.xml b/broker/core/tests/conf/tests.xml
index afdb1333c..2876b8cbe 100644
--- a/broker/core/tests/conf/tests.xml
+++ b/broker/core/tests/conf/tests.xml
@@ -3,7 +3,7 @@
-
+
diff --git a/broker/core/tests/src/com/google/gridworks/broker/tests/GridworksBrokerTests.java b/broker/core/tests/src/com/google/gridworks/broker/tests/GridworksBrokerTests.java
new file mode 100644
index 000000000..f1a8a6e61
--- /dev/null
+++ b/broker/core/tests/src/com/google/gridworks/broker/tests/GridworksBrokerTests.java
@@ -0,0 +1,452 @@
+package com.google.gridworks.broker.tests;
+
+import static com.google.gridworks.broker.GridworksBroker.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+
+import javax.servlet.ServletConfig;
+import javax.servlet.ServletInputStream;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.AfterSuite;
+import org.testng.annotations.AfterTest;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.BeforeSuite;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.Test;
+
+import com.google.gridworks.broker.GridworksBroker;
+import com.google.gridworks.broker.GridworksBrokerImpl;
+
+public class GridworksBrokerTests {
+
+ Logger logger;
+ File data;
+
+ @BeforeSuite public void suite_init() {
+ System.setProperty("log4j.configuration", "tests.log4j.properties");
+ data = new File("data");
+ if (!data.exists()) data.mkdirs();
+ }
+
+ @AfterSuite public void suite_destroy() {
+ for (File f : data.listFiles()) {
+ f.delete();
+ }
+ data.delete();
+ }
+
+ // ------------------------------------------------------------------------------------
+
+ ServletConfig config = null;
+ GridworksBroker broker = null;
+
+ @BeforeTest public void test_init() throws Exception {
+ logger = LoggerFactory.getLogger(this.getClass());
+ config = mock(ServletConfig.class);
+ when(config.getInitParameter("gridworks.data")).thenReturn(data.getAbsolutePath());
+ when(config.getInitParameter("gridworks.development")).thenReturn("true");
+
+ broker = new GridworksBrokerImpl();
+ broker.init(config);
+ }
+
+ @AfterTest public void test_destroy() throws Exception {
+ broker.destroy();
+ broker = null;
+ config = null;
+ }
+
+ // ------------------------------------------------------------------------------------
+
+ HttpServletRequest request = null;
+ HttpServletResponse response = null;
+ StringWriter writer = null;
+
+ @BeforeMethod public void setup() throws Exception {
+ request = mock(HttpServletRequest.class);
+ response = mock(HttpServletResponse.class);
+ }
+
+ @AfterMethod public void teardown() throws Exception {
+ response = null;
+ request = null;
+ }
+
+ // ------------------------------------------------------------------------------------
+
+ @Test public void testLifeCycle() {
+ Assert.assertTrue(true);
+ }
+
+ @Test public void testService() {
+ try {
+ success(broker, request, response, EXPIRE);
+ } catch (Exception e) {
+ Assert.fail();
+ }
+ }
+
+ @Test public void testObtainLockFailure() {
+ try {
+ failure(broker, request, response, OBTAIN_LOCK);
+ } catch (Exception e) {
+ Assert.fail();
+ }
+ }
+
+ @Test public void testReleaseLockFailure() {
+ try {
+ failure(broker, request, response, RELEASE_LOCK);
+ } catch (Exception e) {
+ Assert.fail();
+ }
+ }
+
+ @Test public void testGetStateFailure() {
+ try {
+ failure(broker, request, response, GET_STATE, "pid", "project1934983948", "uid", "testuser", "rev", "0");
+ } catch (Exception e) {
+ Assert.fail();
+ }
+ }
+
+ @Test public void testBrokenAllLockFailure() {
+ try {
+ failure(broker, request, response, OBTAIN_LOCK, "pid", "project", "uid", "testuser", "locktype", Integer.toString(ALL), "lockvalue", "1");
+ } catch (Exception e) {
+ Assert.fail();
+ }
+ }
+
+ @Test public void testBrokenColLockFailure() {
+ try {
+ failure(broker, request, response, OBTAIN_LOCK, "pid", "project", "uid", "testuser", "locktype", Integer.toString(COL), "lockvalue", "1,1");
+ } catch (Exception e) {
+ Assert.fail();
+ }
+ }
+
+ @Test public void testBrokenCellLockFailure() {
+ try {
+ failure(broker, request, response, OBTAIN_LOCK, "pid", "project", "uid", "testuser", "locktype", Integer.toString(CELL), "lockvalue", "1");
+ } catch (Exception e) {
+ Assert.fail();
+ }
+ }
+
+ @Test public void testLockSimple() {
+ String project = "proj0";
+ String user = "testuser";
+
+ try {
+ logger.info("--- obtain ALL lock on project ---");
+ JSONObject result = success(broker, request, response, OBTAIN_LOCK, "pid", project, "uid", user, "locktype", Integer.toString(ALL), "lockvalue", "");
+ assertJSON(result, "uid", "testuser");
+ String lock = result.getString("lock");
+
+ logger.info("--- obtain ALL lock on project ---");
+ success(broker, request, response, RELEASE_LOCK, "pid", project, "uid", user, "lock", lock);
+
+ logger.info("--- obtain COL lock on project ---");
+ result = success(broker, request, response, OBTAIN_LOCK, "pid", project, "uid", user, "locktype", Integer.toString(COL), "lockvalue", "1");
+ assertJSON(result, "uid", "testuser");
+ lock = result.getString("lock");
+
+ logger.info("--- release COL lock on project ---");
+ success(broker, request, response, RELEASE_LOCK, "pid", project, "uid", user, "lock", lock);
+
+ logger.info("--- obtain CELL lock on project ---");
+ result = success(broker, request, response, OBTAIN_LOCK, "pid", project, "uid", user, "locktype", Integer.toString(CELL), "lockvalue", "1,1");
+ assertJSON(result, "uid", "testuser");
+ lock = result.getString("lock");
+
+ logger.info("--- release CELL lock on project ---");
+ success(broker, request, response, RELEASE_LOCK, "pid", project, "uid", user, "lock", lock);
+ } catch (Exception e) {
+ Assert.fail();
+ }
+ }
+
+ @Test public void testLocksAllBlocks() {
+ String project = "proj1";
+ String user = "testuser";
+ String user2 = "testuser2";
+
+ try {
+ logger.info("--- obtain ALL lock on project ---");
+ JSONObject result = success(broker, request, response, OBTAIN_LOCK, "pid", project, "uid", user, "locktype", Integer.toString(ALL), "lockvalue", "");
+ assertJSON(result, "uid", user);
+ String lock = result.getString("lock");
+
+ logger.info("--- another using asking for any lock will fail ---");
+ failure(broker, request, response, OBTAIN_LOCK, "pid", project, "uid", user2, "locktype", Integer.toString(ALL), "lockvalue", "");
+ failure(broker, request, response, OBTAIN_LOCK, "pid", project, "uid", user2, "locktype", Integer.toString(COL), "lockvalue", "1");
+ failure(broker, request, response, OBTAIN_LOCK, "pid", project, "uid", user2, "locktype", Integer.toString(CELL), "lockvalue", "1,1");
+
+ logger.info("--- same user asking for lower capable locks will return the ALL one ---");
+ result = success(broker, request, response, OBTAIN_LOCK, "pid", project, "uid", user, "locktype", Integer.toString(COL), "lockvalue", "1");
+ String lock2 = result.getString("lock");
+ Assert.assertEquals(lock, lock2);
+
+ result = success(broker, request, response, OBTAIN_LOCK, "pid", project, "uid", user, "locktype", Integer.toString(CELL), "lockvalue", "1,1");
+ lock2 = result.getString("lock");
+ Assert.assertEquals(lock, lock2);
+
+ logger.info("--- release the ALL lock ---");
+ success(broker, request, response, RELEASE_LOCK, "pid", project, "uid", user, "lock", lock);
+ } catch (Exception e) {
+ Assert.fail();
+ }
+ }
+
+ @Test public void testLocksColBlocks() {
+ String project = "proj2";
+ String user = "testuser";
+ String user2 = "testuser2";
+
+ try {
+ logger.info("--- obtain COL lock on project ---");
+ JSONObject result = success(broker, request, response, OBTAIN_LOCK, "pid", project, "uid", user, "locktype", Integer.toString(COL), "lockvalue", "1");
+ String lock = result.getString("lock");
+
+ logger.info("--- other user must fail to obtain lock on the same COL or ALL ---");
+ failure(broker, request, response, OBTAIN_LOCK, "pid", project, "uid", user2, "locktype", Integer.toString(ALL), "lockvalue", "");
+ failure(broker, request, response, OBTAIN_LOCK, "pid", project, "uid", user2, "locktype", Integer.toString(COL), "lockvalue", "1");
+ failure(broker, request, response, OBTAIN_LOCK, "pid", project, "uid", user2, "locktype", Integer.toString(CELL), "lockvalue", "1,1");
+
+ logger.info("--- but succeed in getting a COL lock on another column or cell ---");
+ result = success(broker, request, response, OBTAIN_LOCK, "pid", project, "uid", user2, "locktype", Integer.toString(COL), "lockvalue", "2");
+ String lock2 = result.getString("lock");
+
+ logger.info("--- now it's our first user's turn to fail to get lock ---");
+ failure(broker, request, response, OBTAIN_LOCK, "pid", project, "uid", user, "locktype", Integer.toString(ALL), "lockvalue", "");
+ failure(broker, request, response, OBTAIN_LOCK, "pid", project, "uid", user, "locktype", Integer.toString(COL), "lockvalue", "2");
+ failure(broker, request, response, OBTAIN_LOCK, "pid", project, "uid", user, "locktype", Integer.toString(CELL), "lockvalue", "2,1");
+
+ logger.info("--- release the locks ---");
+ success(broker, request, response, RELEASE_LOCK, "pid", project, "uid", user, "lock", lock);
+ success(broker, request, response, RELEASE_LOCK, "pid", project, "uid", user2, "lock", lock2);
+ } catch (Exception e) {
+ Assert.fail();
+ }
+ }
+
+ @Test public void testLocksCellBlocks() {
+ String project = "proj3";
+ String user = "testuser";
+ String user2 = "testuser2";
+
+ try {
+ logger.info("--- obtain CELL lock on project ---");
+ JSONObject result = success(broker, request, response, OBTAIN_LOCK, "pid", project, "uid", user, "locktype", Integer.toString(CELL), "lockvalue", "1,1");
+ String lock = result.getString("lock");
+
+ logger.info("--- other user must fail to obtain lock on the same CELL, COL or ALL ---");
+ failure(broker, request, response, OBTAIN_LOCK, "pid", project, "uid", user2, "locktype", Integer.toString(ALL), "lockvalue", "");
+ failure(broker, request, response, OBTAIN_LOCK, "pid", project, "uid", user2, "locktype", Integer.toString(COL), "lockvalue", "1");
+ failure(broker, request, response, OBTAIN_LOCK, "pid", project, "uid", user2, "locktype", Integer.toString(CELL), "lockvalue", "1,1");
+
+ logger.info("--- but succeed in getting a CELL lock on a cell in another column ---");
+ result = success(broker, request, response, OBTAIN_LOCK, "pid", project, "uid", user2, "locktype", Integer.toString(CELL), "lockvalue", "2,1");
+ String lock2 = result.getString("lock");
+
+ logger.info("--- now it's our first user's turn to fail to get lock ---");
+ failure(broker, request, response, OBTAIN_LOCK, "pid", project, "uid", user, "locktype", Integer.toString(ALL), "lockvalue", "");
+ failure(broker, request, response, OBTAIN_LOCK, "pid", project, "uid", user, "locktype", Integer.toString(COL), "lockvalue", "2");
+ failure(broker, request, response, OBTAIN_LOCK, "pid", project, "uid", user, "locktype", Integer.toString(CELL), "lockvalue", "2,1");
+
+ logger.info("--- release the locks ---");
+ success(broker, request, response, RELEASE_LOCK, "pid", project, "uid", user, "lock", lock);
+ success(broker, request, response, RELEASE_LOCK, "pid", project, "uid", user2, "lock", lock2);
+ } catch (Exception e) {
+ Assert.fail();
+ }
+ }
+
+ @Test public void testCompleteProjectLifeCycle() {
+ try {
+ String project = "proj4";
+ String user = "testuser";
+ String user2 = "testuser2";
+ String data = "blah";
+ String metadata = "{}";
+ String transformations = "[]";
+ String rev = "0";
+
+ logger.info("--- obtain ALL lock on project ---");
+ JSONObject result = success(broker, request, response, OBTAIN_LOCK, "pid", project, "uid", user, "locktype", Integer.toString(ALL), "lockvalue", "");
+ assertJSON(result, "uid", user);
+ String lock = result.getString("lock");
+
+ logger.info("--- start project ---");
+ success(broker, request, response, START, "pid", project, "uid", user, "lock", lock, "data", data, "metadata", metadata, "transformations", transformations);
+
+ logger.info("--- verify project state contains lock ---");
+ result = success(broker, request, response, GET_STATE, "pid", project, "uid", user, "rev", rev);
+ JSONArray locks = result.getJSONArray("locks");
+ Assert.assertEquals(locks.length(), 1);
+ JSONObject l = locks.getJSONObject(0);
+ assertJSON(l, "uid", "testuser");
+ Assert.assertEquals(l.getInt("type"), ALL);
+
+ logger.info("--- release ALL lock on project ---");
+ success(broker, request, response, RELEASE_LOCK, "pid", project, "uid", user, "lock", lock);
+
+ logger.info("--- verify no locks are present ---");
+ result = success(broker, request, response, GET_STATE, "pid", project, "uid", user, "rev", rev);
+ locks = result.getJSONArray("locks");
+ Assert.assertEquals(locks.length(), 0);
+
+ logger.info("--- open project and verify data was loaded correctly ---");
+ result = success(broker, request, response, OPEN, "pid", project, "uid", user, "rev", rev);
+ JSONArray result_data = result.getJSONArray("data");
+ Assert.assertEquals(result_data.length(),data.getBytes("UTF-8").length);
+
+ JSONArray tt;
+ JSONObject t;
+
+ logger.info("--- obtain column lock ---");
+ String column = "1";
+ result = success(broker, request, response, OBTAIN_LOCK, "pid", project, "uid", user, "locktype", Integer.toString(COL), "lockvalue", column);
+ String col_lock = result.getString("lock");
+
+ logger.info("--- perform column transformation ---");
+ t = new JSONObject();
+ t.put("op_type", COL);
+ t.put("op_value", column); // operate on col 1
+ t.put("value", new JSONObject());
+ tt = new JSONArray();
+ tt.put(t);
+ result = success(broker, request, response, TRANSFORM, "pid", project, "uid", user, "lock", col_lock, "transformations", tt.toString());
+
+ logger.info("--- make sure transformation was recorded properly ---");
+ result = success(broker, request, response, GET_STATE, "pid", project, "uid", user, "rev", "0");
+ tt = result.getJSONArray("transformations");
+ Assert.assertEquals(tt.length(), 1);
+ t = tt.getJSONObject(0);
+ assertJSON(t, "op_value", column);
+
+ logger.info("--- make sure revision numbers in state management work as expected ---");
+ result = success(broker, request, response, GET_STATE, "pid", project, "uid", user, "rev", "1");
+ tt = result.getJSONArray("transformations");
+ Assert.assertEquals(tt.length(), 0);
+
+ logger.info("--- perform cell transformation ---");
+ String cell = "1";
+ t = new JSONObject();
+ t.put("op_type", CELL);
+ t.put("op_value", column + "," + cell); // operate on cell at row 1 column 1
+ t.put("value", new JSONObject());
+ tt = new JSONArray();
+ tt.put(t);
+ result = success(broker, request, response, TRANSFORM, "pid", project, "uid", user, "lock", col_lock, "transformations", tt.toString());
+
+ logger.info("--- make sure transformation was recorded properly ---");
+ result = success(broker, request, response, GET_STATE, "pid", project, "uid", user, "rev", "0");
+ tt = result.getJSONArray("transformations");
+ Assert.assertEquals(tt.length(), 2);
+
+ result = success(broker, request, response, GET_STATE, "pid", project, "uid", user, "rev", "1");
+ tt = result.getJSONArray("transformations");
+ Assert.assertEquals(tt.length(), 1);
+ t = tt.getJSONObject(0);
+ assertJSON(t, "op_value", column + "," + cell);
+
+ logger.info("--- make sure another user fails to acquire ALL lock ---");
+ failure(broker, request, response, OBTAIN_LOCK, "pid", project, "uid", user2, "locktype", Integer.toString(ALL), "lockvalue", "");
+
+ logger.info("--- make sure another user fails to acquire COL lock on the same column ---");
+ failure(broker, request, response, OBTAIN_LOCK, "pid", project, "uid", user2, "locktype", Integer.toString(COL), "lockvalue", column);
+
+ logger.info("--- make sure another user manages to acquire COL lock on another column ---");
+ String column2 = "2";
+ result = success(broker, request, response, OBTAIN_LOCK, "pid", project, "uid", user2, "locktype", Integer.toString(COL), "lockvalue", column2);
+ String col_lock2 = result.getString("lock");
+
+ logger.info("--- make sure that both locks are present ---");
+ result = success(broker, request, response, GET_STATE, "pid", project, "uid", user, "rev", "2");
+ locks = result.getJSONArray("locks");
+ Assert.assertEquals(locks.length(), 2);
+
+ logger.info("--- make sure we can't escalate our current COL lock to an ALL lock ---");
+ failure(broker, request, response, OBTAIN_LOCK, "pid", project, "uid", user, "locktype", Integer.toString(ALL), "lockvalue", "");
+
+ logger.info("--- release column locks ---");
+ success(broker, request, response, RELEASE_LOCK, "pid", project, "uid", user, "lock", col_lock);
+ success(broker, request, response, RELEASE_LOCK, "pid", project, "uid", user2, "lock", col_lock2);
+
+ logger.info("--- make sure the project has no locks ---");
+ result = success(broker, request, response, GET_STATE, "pid", project, "uid", user, "rev", "2");
+ locks = result.getJSONArray("locks");
+ Assert.assertEquals(locks.length(), 0);
+
+ } catch (Exception e) {
+ Assert.fail();
+ }
+ }
+
+ // ------------------------------------------------------------------------------------
+
+ private void assertJSON(JSONObject o, String name, String value) throws JSONException {
+ Assert.assertEquals(o.get(name), value);
+ }
+
+ private JSONObject success(GridworksBroker broker, HttpServletRequest request, HttpServletResponse response, String service, String... params) throws Exception {
+ return call(true, broker, request, response, service, params);
+ }
+
+ private JSONObject failure(GridworksBroker broker, HttpServletRequest request, HttpServletResponse response, String service, String... params) throws Exception {
+ return call(false, broker, request, response, service, params);
+ }
+
+ private JSONObject call(boolean successful, GridworksBroker broker, HttpServletRequest request, HttpServletResponse response, String service, String... params) throws Exception {
+ if (params != null) {
+ for (int i = 0; i < params.length; ) {
+ String name = params[i++];
+ String value = params[i++];
+ if ("data".equals(name)) {
+ final ByteArrayInputStream inputStream = new ByteArrayInputStream(value.getBytes("UTF-8"));
+ when(request.getInputStream()).thenReturn(new ServletInputStream() {
+ public int read() throws IOException {
+ return inputStream.read();
+ }
+ });
+ } else {
+ when(request.getParameter(name)).thenReturn(value);
+ }
+ }
+ }
+
+ StringWriter writer = new StringWriter();
+ when(response.getWriter()).thenReturn(new PrintWriter(writer));
+
+ broker.process(service, request, response);
+
+ JSONObject result = new JSONObject(writer.toString());
+
+ if (successful) {
+ assertJSON(result, "status", "ok");
+ } else {
+ assertJSON(result, "status", "error");
+ }
+
+ logger.info(result.toString());
+
+ return result;
+ }
+}
diff --git a/broker/core/tests/src/com/metaweb/gridworks/broker/tests/GridworksBrokerTests.java b/broker/core/tests/src/com/metaweb/gridworks/broker/tests/GridworksBrokerTests.java
deleted file mode 100644
index ea76a6251..000000000
--- a/broker/core/tests/src/com/metaweb/gridworks/broker/tests/GridworksBrokerTests.java
+++ /dev/null
@@ -1,458 +0,0 @@
-package com.metaweb.gridworks.broker.tests;
-
-import static com.metaweb.gridworks.broker.GridworksBroker.*;
-import static com.metaweb.gridworks.broker.GridworksBroker.EXPIRE;
-import static com.metaweb.gridworks.broker.GridworksBroker.GET_STATE;
-import static com.metaweb.gridworks.broker.GridworksBroker.OBTAIN_LOCK;
-import static com.metaweb.gridworks.broker.GridworksBroker.RELEASE_LOCK;
-import static com.metaweb.gridworks.broker.GridworksBroker.START;
-import static com.metaweb.gridworks.broker.GridworksBroker.OPEN;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.io.ByteArrayInputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.io.StringWriter;
-
-import javax.servlet.ServletConfig;
-import javax.servlet.ServletInputStream;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-
-import org.json.JSONArray;
-import org.json.JSONException;
-import org.json.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.AfterSuite;
-import org.testng.annotations.AfterTest;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.BeforeSuite;
-import org.testng.annotations.BeforeTest;
-import org.testng.annotations.Test;
-
-import com.metaweb.gridworks.broker.GridworksBroker;
-import com.metaweb.gridworks.broker.GridworksBrokerImpl;
-
-public class GridworksBrokerTests {
-
- Logger logger;
- File data;
-
- @BeforeSuite public void suite_init() {
- System.setProperty("log4j.configuration", "tests.log4j.properties");
- data = new File("data");
- if (!data.exists()) data.mkdirs();
- }
-
- @AfterSuite public void suite_destroy() {
- for (File f : data.listFiles()) {
- f.delete();
- }
- data.delete();
- }
-
- // ------------------------------------------------------------------------------------
-
- ServletConfig config = null;
- GridworksBroker broker = null;
-
- @BeforeTest public void test_init() throws Exception {
- logger = LoggerFactory.getLogger(this.getClass());
- config = mock(ServletConfig.class);
- when(config.getInitParameter("gridworks.data")).thenReturn(data.getAbsolutePath());
- when(config.getInitParameter("gridworks.development")).thenReturn("true");
-
- broker = new GridworksBrokerImpl();
- broker.init(config);
- }
-
- @AfterTest public void test_destroy() throws Exception {
- broker.destroy();
- broker = null;
- config = null;
- }
-
- // ------------------------------------------------------------------------------------
-
- HttpServletRequest request = null;
- HttpServletResponse response = null;
- StringWriter writer = null;
-
- @BeforeMethod public void setup() throws Exception {
- request = mock(HttpServletRequest.class);
- response = mock(HttpServletResponse.class);
- }
-
- @AfterMethod public void teardown() throws Exception {
- response = null;
- request = null;
- }
-
- // ------------------------------------------------------------------------------------
-
- @Test public void testLifeCycle() {
- Assert.assertTrue(true);
- }
-
- @Test public void testService() {
- try {
- success(broker, request, response, EXPIRE);
- } catch (Exception e) {
- Assert.fail();
- }
- }
-
- @Test public void testObtainLockFailure() {
- try {
- failure(broker, request, response, OBTAIN_LOCK);
- } catch (Exception e) {
- Assert.fail();
- }
- }
-
- @Test public void testReleaseLockFailure() {
- try {
- failure(broker, request, response, RELEASE_LOCK);
- } catch (Exception e) {
- Assert.fail();
- }
- }
-
- @Test public void testGetStateFailure() {
- try {
- failure(broker, request, response, GET_STATE, "pid", "project1934983948", "uid", "testuser", "rev", "0");
- } catch (Exception e) {
- Assert.fail();
- }
- }
-
- @Test public void testBrokenAllLockFailure() {
- try {
- failure(broker, request, response, OBTAIN_LOCK, "pid", "project", "uid", "testuser", "locktype", Integer.toString(ALL), "lockvalue", "1");
- } catch (Exception e) {
- Assert.fail();
- }
- }
-
- @Test public void testBrokenColLockFailure() {
- try {
- failure(broker, request, response, OBTAIN_LOCK, "pid", "project", "uid", "testuser", "locktype", Integer.toString(COL), "lockvalue", "1,1");
- } catch (Exception e) {
- Assert.fail();
- }
- }
-
- @Test public void testBrokenCellLockFailure() {
- try {
- failure(broker, request, response, OBTAIN_LOCK, "pid", "project", "uid", "testuser", "locktype", Integer.toString(CELL), "lockvalue", "1");
- } catch (Exception e) {
- Assert.fail();
- }
- }
-
- @Test public void testLockSimple() {
- String project = "proj0";
- String user = "testuser";
-
- try {
- logger.info("--- obtain ALL lock on project ---");
- JSONObject result = success(broker, request, response, OBTAIN_LOCK, "pid", project, "uid", user, "locktype", Integer.toString(ALL), "lockvalue", "");
- assertJSON(result, "uid", "testuser");
- String lock = result.getString("lock");
-
- logger.info("--- obtain ALL lock on project ---");
- success(broker, request, response, RELEASE_LOCK, "pid", project, "uid", user, "lock", lock);
-
- logger.info("--- obtain COL lock on project ---");
- result = success(broker, request, response, OBTAIN_LOCK, "pid", project, "uid", user, "locktype", Integer.toString(COL), "lockvalue", "1");
- assertJSON(result, "uid", "testuser");
- lock = result.getString("lock");
-
- logger.info("--- release COL lock on project ---");
- success(broker, request, response, RELEASE_LOCK, "pid", project, "uid", user, "lock", lock);
-
- logger.info("--- obtain CELL lock on project ---");
- result = success(broker, request, response, OBTAIN_LOCK, "pid", project, "uid", user, "locktype", Integer.toString(CELL), "lockvalue", "1,1");
- assertJSON(result, "uid", "testuser");
- lock = result.getString("lock");
-
- logger.info("--- release CELL lock on project ---");
- success(broker, request, response, RELEASE_LOCK, "pid", project, "uid", user, "lock", lock);
- } catch (Exception e) {
- Assert.fail();
- }
- }
-
- @Test public void testLocksAllBlocks() {
- String project = "proj1";
- String user = "testuser";
- String user2 = "testuser2";
-
- try {
- logger.info("--- obtain ALL lock on project ---");
- JSONObject result = success(broker, request, response, OBTAIN_LOCK, "pid", project, "uid", user, "locktype", Integer.toString(ALL), "lockvalue", "");
- assertJSON(result, "uid", user);
- String lock = result.getString("lock");
-
- logger.info("--- another using asking for any lock will fail ---");
- failure(broker, request, response, OBTAIN_LOCK, "pid", project, "uid", user2, "locktype", Integer.toString(ALL), "lockvalue", "");
- failure(broker, request, response, OBTAIN_LOCK, "pid", project, "uid", user2, "locktype", Integer.toString(COL), "lockvalue", "1");
- failure(broker, request, response, OBTAIN_LOCK, "pid", project, "uid", user2, "locktype", Integer.toString(CELL), "lockvalue", "1,1");
-
- logger.info("--- same user asking for lower capable locks will return the ALL one ---");
- result = success(broker, request, response, OBTAIN_LOCK, "pid", project, "uid", user, "locktype", Integer.toString(COL), "lockvalue", "1");
- String lock2 = result.getString("lock");
- Assert.assertEquals(lock, lock2);
-
- result = success(broker, request, response, OBTAIN_LOCK, "pid", project, "uid", user, "locktype", Integer.toString(CELL), "lockvalue", "1,1");
- lock2 = result.getString("lock");
- Assert.assertEquals(lock, lock2);
-
- logger.info("--- release the ALL lock ---");
- success(broker, request, response, RELEASE_LOCK, "pid", project, "uid", user, "lock", lock);
- } catch (Exception e) {
- Assert.fail();
- }
- }
-
- @Test public void testLocksColBlocks() {
- String project = "proj2";
- String user = "testuser";
- String user2 = "testuser2";
-
- try {
- logger.info("--- obtain COL lock on project ---");
- JSONObject result = success(broker, request, response, OBTAIN_LOCK, "pid", project, "uid", user, "locktype", Integer.toString(COL), "lockvalue", "1");
- String lock = result.getString("lock");
-
- logger.info("--- other user must fail to obtain lock on the same COL or ALL ---");
- failure(broker, request, response, OBTAIN_LOCK, "pid", project, "uid", user2, "locktype", Integer.toString(ALL), "lockvalue", "");
- failure(broker, request, response, OBTAIN_LOCK, "pid", project, "uid", user2, "locktype", Integer.toString(COL), "lockvalue", "1");
- failure(broker, request, response, OBTAIN_LOCK, "pid", project, "uid", user2, "locktype", Integer.toString(CELL), "lockvalue", "1,1");
-
- logger.info("--- but succeed in getting a COL lock on another column or cell ---");
- result = success(broker, request, response, OBTAIN_LOCK, "pid", project, "uid", user2, "locktype", Integer.toString(COL), "lockvalue", "2");
- String lock2 = result.getString("lock");
-
- logger.info("--- now it's our first user's turn to fail to get lock ---");
- failure(broker, request, response, OBTAIN_LOCK, "pid", project, "uid", user, "locktype", Integer.toString(ALL), "lockvalue", "");
- failure(broker, request, response, OBTAIN_LOCK, "pid", project, "uid", user, "locktype", Integer.toString(COL), "lockvalue", "2");
- failure(broker, request, response, OBTAIN_LOCK, "pid", project, "uid", user, "locktype", Integer.toString(CELL), "lockvalue", "2,1");
-
- logger.info("--- release the locks ---");
- success(broker, request, response, RELEASE_LOCK, "pid", project, "uid", user, "lock", lock);
- success(broker, request, response, RELEASE_LOCK, "pid", project, "uid", user2, "lock", lock2);
- } catch (Exception e) {
- Assert.fail();
- }
- }
-
- @Test public void testLocksCellBlocks() {
- String project = "proj3";
- String user = "testuser";
- String user2 = "testuser2";
-
- try {
- logger.info("--- obtain CELL lock on project ---");
- JSONObject result = success(broker, request, response, OBTAIN_LOCK, "pid", project, "uid", user, "locktype", Integer.toString(CELL), "lockvalue", "1,1");
- String lock = result.getString("lock");
-
- logger.info("--- other user must fail to obtain lock on the same CELL, COL or ALL ---");
- failure(broker, request, response, OBTAIN_LOCK, "pid", project, "uid", user2, "locktype", Integer.toString(ALL), "lockvalue", "");
- failure(broker, request, response, OBTAIN_LOCK, "pid", project, "uid", user2, "locktype", Integer.toString(COL), "lockvalue", "1");
- failure(broker, request, response, OBTAIN_LOCK, "pid", project, "uid", user2, "locktype", Integer.toString(CELL), "lockvalue", "1,1");
-
- logger.info("--- but succeed in getting a CELL lock on a cell in another column ---");
- result = success(broker, request, response, OBTAIN_LOCK, "pid", project, "uid", user2, "locktype", Integer.toString(CELL), "lockvalue", "2,1");
- String lock2 = result.getString("lock");
-
- logger.info("--- now it's our first user's turn to fail to get lock ---");
- failure(broker, request, response, OBTAIN_LOCK, "pid", project, "uid", user, "locktype", Integer.toString(ALL), "lockvalue", "");
- failure(broker, request, response, OBTAIN_LOCK, "pid", project, "uid", user, "locktype", Integer.toString(COL), "lockvalue", "2");
- failure(broker, request, response, OBTAIN_LOCK, "pid", project, "uid", user, "locktype", Integer.toString(CELL), "lockvalue", "2,1");
-
- logger.info("--- release the locks ---");
- success(broker, request, response, RELEASE_LOCK, "pid", project, "uid", user, "lock", lock);
- success(broker, request, response, RELEASE_LOCK, "pid", project, "uid", user2, "lock", lock2);
- } catch (Exception e) {
- Assert.fail();
- }
- }
-
- @Test public void testCompleteProjectLifeCycle() {
- try {
- String project = "proj4";
- String user = "testuser";
- String user2 = "testuser2";
- String data = "blah";
- String metadata = "{}";
- String transformations = "[]";
- String rev = "0";
-
- logger.info("--- obtain ALL lock on project ---");
- JSONObject result = success(broker, request, response, OBTAIN_LOCK, "pid", project, "uid", user, "locktype", Integer.toString(ALL), "lockvalue", "");
- assertJSON(result, "uid", user);
- String lock = result.getString("lock");
-
- logger.info("--- start project ---");
- success(broker, request, response, START, "pid", project, "uid", user, "lock", lock, "data", data, "metadata", metadata, "transformations", transformations);
-
- logger.info("--- verify project state contains lock ---");
- result = success(broker, request, response, GET_STATE, "pid", project, "uid", user, "rev", rev);
- JSONArray locks = result.getJSONArray("locks");
- Assert.assertEquals(locks.length(), 1);
- JSONObject l = locks.getJSONObject(0);
- assertJSON(l, "uid", "testuser");
- Assert.assertEquals(l.getInt("type"), ALL);
-
- logger.info("--- release ALL lock on project ---");
- success(broker, request, response, RELEASE_LOCK, "pid", project, "uid", user, "lock", lock);
-
- logger.info("--- verify no locks are present ---");
- result = success(broker, request, response, GET_STATE, "pid", project, "uid", user, "rev", rev);
- locks = result.getJSONArray("locks");
- Assert.assertEquals(locks.length(), 0);
-
- logger.info("--- open project and verify data was loaded correctly ---");
- result = success(broker, request, response, OPEN, "pid", project, "uid", user, "rev", rev);
- JSONArray result_data = result.getJSONArray("data");
- Assert.assertEquals(result_data.length(),data.getBytes("UTF-8").length);
-
- JSONArray tt;
- JSONObject t;
-
- logger.info("--- obtain column lock ---");
- String column = "1";
- result = success(broker, request, response, OBTAIN_LOCK, "pid", project, "uid", user, "locktype", Integer.toString(COL), "lockvalue", column);
- String col_lock = result.getString("lock");
-
- logger.info("--- perform column transformation ---");
- t = new JSONObject();
- t.put("op_type", COL);
- t.put("op_value", column); // operate on col 1
- t.put("value", new JSONObject());
- tt = new JSONArray();
- tt.put(t);
- result = success(broker, request, response, TRANSFORM, "pid", project, "uid", user, "lock", col_lock, "transformations", tt.toString());
-
- logger.info("--- make sure transformation was recorded properly ---");
- result = success(broker, request, response, GET_STATE, "pid", project, "uid", user, "rev", "0");
- tt = result.getJSONArray("transformations");
- Assert.assertEquals(tt.length(), 1);
- t = tt.getJSONObject(0);
- assertJSON(t, "op_value", column);
-
- logger.info("--- make sure revision numbers in state management work as expected ---");
- result = success(broker, request, response, GET_STATE, "pid", project, "uid", user, "rev", "1");
- tt = result.getJSONArray("transformations");
- Assert.assertEquals(tt.length(), 0);
-
- logger.info("--- perform cell transformation ---");
- String cell = "1";
- t = new JSONObject();
- t.put("op_type", CELL);
- t.put("op_value", column + "," + cell); // operate on cell at row 1 column 1
- t.put("value", new JSONObject());
- tt = new JSONArray();
- tt.put(t);
- result = success(broker, request, response, TRANSFORM, "pid", project, "uid", user, "lock", col_lock, "transformations", tt.toString());
-
- logger.info("--- make sure transformation was recorded properly ---");
- result = success(broker, request, response, GET_STATE, "pid", project, "uid", user, "rev", "0");
- tt = result.getJSONArray("transformations");
- Assert.assertEquals(tt.length(), 2);
-
- result = success(broker, request, response, GET_STATE, "pid", project, "uid", user, "rev", "1");
- tt = result.getJSONArray("transformations");
- Assert.assertEquals(tt.length(), 1);
- t = tt.getJSONObject(0);
- assertJSON(t, "op_value", column + "," + cell);
-
- logger.info("--- make sure another user fails to acquire ALL lock ---");
- failure(broker, request, response, OBTAIN_LOCK, "pid", project, "uid", user2, "locktype", Integer.toString(ALL), "lockvalue", "");
-
- logger.info("--- make sure another user fails to acquire COL lock on the same column ---");
- failure(broker, request, response, OBTAIN_LOCK, "pid", project, "uid", user2, "locktype", Integer.toString(COL), "lockvalue", column);
-
- logger.info("--- make sure another user manages to acquire COL lock on another column ---");
- String column2 = "2";
- result = success(broker, request, response, OBTAIN_LOCK, "pid", project, "uid", user2, "locktype", Integer.toString(COL), "lockvalue", column2);
- String col_lock2 = result.getString("lock");
-
- logger.info("--- make sure that both locks are present ---");
- result = success(broker, request, response, GET_STATE, "pid", project, "uid", user, "rev", "2");
- locks = result.getJSONArray("locks");
- Assert.assertEquals(locks.length(), 2);
-
- logger.info("--- make sure we can't escalate our current COL lock to an ALL lock ---");
- failure(broker, request, response, OBTAIN_LOCK, "pid", project, "uid", user, "locktype", Integer.toString(ALL), "lockvalue", "");
-
- logger.info("--- release column locks ---");
- success(broker, request, response, RELEASE_LOCK, "pid", project, "uid", user, "lock", col_lock);
- success(broker, request, response, RELEASE_LOCK, "pid", project, "uid", user2, "lock", col_lock2);
-
- logger.info("--- make sure the project has no locks ---");
- result = success(broker, request, response, GET_STATE, "pid", project, "uid", user, "rev", "2");
- locks = result.getJSONArray("locks");
- Assert.assertEquals(locks.length(), 0);
-
- } catch (Exception e) {
- Assert.fail();
- }
- }
-
- // ------------------------------------------------------------------------------------
-
- private void assertJSON(JSONObject o, String name, String value) throws JSONException {
- Assert.assertEquals(o.get(name), value);
- }
-
- private JSONObject success(GridworksBroker broker, HttpServletRequest request, HttpServletResponse response, String service, String... params) throws Exception {
- return call(true, broker, request, response, service, params);
- }
-
- private JSONObject failure(GridworksBroker broker, HttpServletRequest request, HttpServletResponse response, String service, String... params) throws Exception {
- return call(false, broker, request, response, service, params);
- }
-
- private JSONObject call(boolean successful, GridworksBroker broker, HttpServletRequest request, HttpServletResponse response, String service, String... params) throws Exception {
- if (params != null) {
- for (int i = 0; i < params.length; ) {
- String name = params[i++];
- String value = params[i++];
- if ("data".equals(name)) {
- final ByteArrayInputStream inputStream = new ByteArrayInputStream(value.getBytes("UTF-8"));
- when(request.getInputStream()).thenReturn(new ServletInputStream() {
- public int read() throws IOException {
- return inputStream.read();
- }
- });
- } else {
- when(request.getParameter(name)).thenReturn(value);
- }
- }
- }
-
- StringWriter writer = new StringWriter();
- when(response.getWriter()).thenReturn(new PrintWriter(writer));
-
- broker.process(service, request, response);
-
- JSONObject result = new JSONObject(writer.toString());
-
- if (successful) {
- assertJSON(result, "status", "ok");
- } else {
- assertJSON(result, "status", "error");
- }
-
- logger.info(result.toString());
-
- return result;
- }
-}
diff --git a/extensions/jython/module/MOD-INF/controller.js b/extensions/jython/module/MOD-INF/controller.js
index 6c4c6de08..5932fdaf3 100644
--- a/extensions/jython/module/MOD-INF/controller.js
+++ b/extensions/jython/module/MOD-INF/controller.js
@@ -1,10 +1,10 @@
function init() {
// Packages.java.lang.System.err.println("Initializing jython extension");
- Packages.com.metaweb.gridworks.expr.MetaParser.registerLanguageParser(
+ Packages.com.google.gridworks.expr.MetaParser.registerLanguageParser(
"jython",
"Jython",
- Packages.com.metaweb.gridworks.jython.JythonEvaluable.createParser(),
+ Packages.com.google.gridworks.jython.JythonEvaluable.createParser(),
"return value"
);
}
diff --git a/extensions/jython/src/com/google/gridworks/jython/JythonEvaluable.java b/extensions/jython/src/com/google/gridworks/jython/JythonEvaluable.java
new file mode 100644
index 000000000..5ea3aeb72
--- /dev/null
+++ b/extensions/jython/src/com/google/gridworks/jython/JythonEvaluable.java
@@ -0,0 +1,131 @@
+package com.google.gridworks.jython;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+
+import org.python.core.Py;
+import org.python.core.PyException;
+import org.python.core.PyFunction;
+import org.python.core.PyNone;
+import org.python.core.PyObject;
+import org.python.core.PyString;
+import org.python.util.PythonInterpreter;
+
+import com.google.gridworks.expr.EvalError;
+import com.google.gridworks.expr.Evaluable;
+import com.google.gridworks.expr.HasFields;
+import com.google.gridworks.expr.LanguageSpecificParser;
+import com.google.gridworks.expr.ParsingException;
+
+public class JythonEvaluable implements Evaluable {
+
+ static public LanguageSpecificParser createParser() {
+ return new LanguageSpecificParser() {
+
+ public Evaluable parse(String s) throws ParsingException {
+ return new JythonEvaluable(s);
+ }
+ };
+ }
+
+ private static final String s_functionName = "___temp___";
+
+ private static PythonInterpreter _engine;
+
+ // FIXME(SM): this initialization logic depends on the fact that the JVM's
+ // current working directory is the root of the Gridworks distributions
+ // or the development checkouts. While this works in practice, it would
+ // be preferable to have a more reliable address space, but since we
+ // don't have access to the servlet context from this class this is
+ // the best we can do for now.
+ static {
+ File libPath = new File("webapp/WEB-INF/lib/jython");
+ if (!libPath.exists() && !libPath.canRead()) {
+ libPath = new File("main/webapp/WEB-INF/lib/jython");
+ if (!libPath.exists() && !libPath.canRead()) {
+ libPath = null;
+ }
+ }
+
+ if (libPath != null) {
+ Properties props = new Properties();
+ props.setProperty("python.path", libPath.getAbsolutePath());
+ PythonInterpreter.initialize(System.getProperties(), props, new String[] { "" });
+ }
+
+ _engine = new PythonInterpreter();
+ }
+
+ public JythonEvaluable(String s) {
+ // indent and create a function out of the code
+ String[] lines = s.split("\r\n|\r|\n");
+
+ StringBuffer sb = new StringBuffer(1024);
+ sb.append("def ");
+ sb.append(s_functionName);
+ sb.append("(value, cell, cells, row, rowIndex):");
+ for (int i = 0; i < lines.length; i++) {
+ sb.append("\n ");
+ sb.append(lines[i]);
+ }
+
+ _engine.exec(sb.toString());
+ }
+
+ public Object evaluate(Properties bindings) {
+ try {
+ // call the temporary PyFunction directly
+ Object result = ((PyFunction)_engine.get(s_functionName)).__call__(
+ new PyObject[] {
+ Py.java2py( bindings.get("value") ),
+ new JythonHasFieldsWrapper((HasFields) bindings.get("cell"), bindings),
+ new JythonHasFieldsWrapper((HasFields) bindings.get("cells"), bindings),
+ new JythonHasFieldsWrapper((HasFields) bindings.get("row"), bindings),
+ Py.java2py( bindings.get("rowIndex") )
+ }
+ );
+
+ return unwrap(result);
+ } catch (PyException e) {
+ return new EvalError(e.getMessage());
+ }
+ }
+
+ protected Object unwrap(Object result) {
+ if (result != null) {
+ if (result instanceof JythonObjectWrapper) {
+ return ((JythonObjectWrapper) result)._obj;
+ } else if (result instanceof JythonHasFieldsWrapper) {
+ return ((JythonHasFieldsWrapper) result)._obj;
+ } else if (result instanceof PyString) {
+ return ((PyString) result).asString();
+ } else if (result instanceof PyObject) {
+ return unwrap((PyObject) result);
+ }
+ }
+
+ return result;
+ }
+
+ protected Object unwrap(PyObject po) {
+ if (po instanceof PyNone) {
+ return null;
+ } else if (po.isNumberType()) {
+ return po.asDouble();
+ } else if (po.isSequenceType()) {
+ Iterator i = po.asIterable().iterator();
+
+ List