From 9f759719e7362663a3732f19fe5bcff5599d0bc6 Mon Sep 17 00:00:00 2001 From: Stefano Mazzocchi Date: Fri, 2 Jul 2010 09:30:24 +0000 Subject: [PATCH] more work on the broker git-svn-id: http://google-refine.googlecode.com/svn/trunk@1065 7d457c2a-affb-35e4-300a-418c747d4874 --- .../gridworks/broker/GridworksBroker.java | 58 ++-- .../gridworks/broker/GridworksBrokerImpl.java | 327 +++++++++++++----- 2 files changed, 274 insertions(+), 111 deletions(-) diff --git a/broker/core/src/com/metaweb/gridworks/broker/GridworksBroker.java b/broker/core/src/com/metaweb/gridworks/broker/GridworksBroker.java index de358cc4b..452135f00 100644 --- a/broker/core/src/com/metaweb/gridworks/broker/GridworksBroker.java +++ b/broker/core/src/com/metaweb/gridworks/broker/GridworksBroker.java @@ -1,7 +1,9 @@ package com.metaweb.gridworks.broker; +import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.InputStream; import java.io.PrintWriter; import java.io.StringWriter; import java.io.Writer; @@ -26,7 +28,6 @@ import org.apache.http.params.CoreProtocolPNames; import org.json.JSONArray; import org.json.JSONException; import org.json.JSONObject; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,6 +51,10 @@ public abstract class GridworksBroker extends ButterflyModuleImpl { static final protected String DELEGATED_OAUTH_HEADER = "X-Freebase-Credentials"; static final protected String OAUTH_HEADER = "Authorization"; + static final protected int ALL = 0; + static final protected int COLUMN = 1; + static final protected int CELL = 2; + static protected String OK; static { @@ -63,6 +68,7 @@ public abstract class GridworksBroker extends ButterflyModuleImpl { } static public final long LOCK_DURATION = 60 * 1000; // 1 minute + static public final long USER_DURATION = 5 * 60 * 1000; // 1 minute static public final long LOCK_EXPIRATION_CHECK_DELAY = 5 * 1000; // 5 seconds protected HttpClient httpclient; @@ -103,22 +109,20 @@ public abstract class GridworksBroker extends ButterflyModuleImpl { // we could be using a hashtable and some classes that implement the commands, but the complexity overhead // doesn't seem to justify the marginal benefit. - if ("get_lock".equals(path)) { - getLock(response, pid); - } else if ("expire_locks".equals(path)) { - expireLocks(response); + if ("get_state".equals(path)) { + getState(response, pid, uid, getInteger(request, "rev")); + } else if ("expire".equals(path)) { + expire(response); } else if ("obtain_lock".equals(path)) { - obtainLock(response, pid, uid); + obtainLock(response, pid, uid, getInteger(request, "locktype"), getParameter(request, "lockvalue")); } else if ("release_lock".equals(path)) { releaseLock(response, pid, uid, getParameter(request, "lock")); - } else if ("history".equals(path)) { - getHistory(response, pid, getInteger(request, "tindex")); } else if ("transform".equals(path)) { addTransformations(response, pid, uid, getParameter(request, "lock"), getList(request, "transformations")); } else if ("start".equals(path)) { - startProject(response, pid, uid, getParameter(request, "lock"), getParameter(request, "data")); - } else if ("get".equals(path)) { - getProject(response, pid); + startProject(response, pid, uid, getParameter(request, "lock"), getData(request), getParameter(request, "metadata"), getInteger(request, "rev")); + } else if ("open".equals(path)) { + openProject(response, pid); } else { boolean value = super.process(path, request, response); if (logger.isDebugEnabled()) logger.debug("< process '{}'", path); @@ -142,21 +146,19 @@ public abstract class GridworksBroker extends ButterflyModuleImpl { protected abstract HttpClient getHttpClient(); - protected abstract void expireLocks(HttpServletResponse response) throws Exception; - - protected abstract void getLock(HttpServletResponse response, String pid) throws Exception; + protected abstract void expire(HttpServletResponse response) throws Exception; - protected abstract void obtainLock(HttpServletResponse response, String pid, String uid) throws Exception; + protected abstract void getState(HttpServletResponse response, String pid, String uid, int rev) throws Exception; + + protected abstract void obtainLock(HttpServletResponse response, String pid, String uid, int locktype, String lockvalue) throws Exception; protected abstract void releaseLock(HttpServletResponse response, String pid, String uid, String lock) throws Exception; - - protected abstract void startProject(HttpServletResponse response, String pid, String uid, String lock, String data) throws Exception; + + protected abstract void startProject(HttpServletResponse response, String pid, String uid, String lock, byte[] data, String metadata, int rev) throws Exception; protected abstract void addTransformations(HttpServletResponse response, String pid, String uid, String lock, List transformations) throws Exception; - protected abstract void getProject(HttpServletResponse response, String pid) throws Exception; - - protected abstract void getHistory(HttpServletResponse response, String pid, int tindex) throws Exception; + protected abstract void openProject(HttpServletResponse response, String pid) throws Exception; // ---------------------------------------------------------------------------------------- @@ -211,11 +213,25 @@ public abstract class GridworksBroker extends ButterflyModuleImpl { } return result; } - + static protected int getInteger(HttpServletRequest request, String name) throws ServletException, JSONException { return Integer.parseInt(getParameter(request, name)); } + static protected byte[] getData(HttpServletRequest request) throws ServletException, IOException { + ByteArrayOutputStream output = new ByteArrayOutputStream(); + InputStream input = request.getInputStream(); + byte[] buffer = new byte[4096]; + int count = 0; + int n = 0; + while (-1 != (n = input.read(buffer))) { + output.write(buffer, 0, n); + count += n; + } + return buffer; + + } + static protected void respondError(HttpServletResponse response, String error) throws IOException, ServletException { if (response == null) { diff --git a/broker/core/src/com/metaweb/gridworks/broker/GridworksBrokerImpl.java b/broker/core/src/com/metaweb/gridworks/broker/GridworksBrokerImpl.java index 11b6fd827..311802429 100644 --- a/broker/core/src/com/metaweb/gridworks/broker/GridworksBrokerImpl.java +++ b/broker/core/src/com/metaweb/gridworks/broker/GridworksBrokerImpl.java @@ -1,5 +1,7 @@ package com.metaweb.gridworks.broker; +import static com.sleepycat.persist.model.Relationship.MANY_TO_ONE; + import java.io.File; import java.io.Writer; import java.util.ArrayList; @@ -24,30 +26,37 @@ import com.sleepycat.je.Transaction; import com.sleepycat.persist.EntityCursor; import com.sleepycat.persist.EntityStore; import com.sleepycat.persist.PrimaryIndex; +import com.sleepycat.persist.SecondaryIndex; import com.sleepycat.persist.StoreConfig; import com.sleepycat.persist.model.Entity; import com.sleepycat.persist.model.PrimaryKey; +import com.sleepycat.persist.model.SecondaryKey; public class GridworksBrokerImpl extends GridworksBroker { protected static final Logger logger = LoggerFactory.getLogger("gridworks.broker.local"); Environment env; + EntityStore projectStore; EntityStore lockStore; + EntityStore userStore; PrimaryIndex projectById; - PrimaryIndex lockByProject; + PrimaryIndex lockById; + SecondaryIndex locksByProject; + Timer timer; - LockExpirer expirer; + Expirer expirer; @Override public void init(ServletConfig config) throws Exception { + logger.trace("> init"); super.init(config); timer = new Timer(); - expirer = new LockExpirer(); + expirer = new Expirer(); timer.schedule(expirer, LOCK_EXPIRATION_CHECK_DELAY, LOCK_EXPIRATION_CHECK_DELAY); String dataDir = config.getInitParameter("gridworks.data"); @@ -65,13 +74,17 @@ public class GridworksBrokerImpl extends GridworksBroker { storeConfig.setTransactional(true); projectStore = new EntityStore(env, "ProjectsStore", storeConfig); lockStore = new EntityStore(env, "LockStore", storeConfig); - + projectById = projectStore.getPrimaryIndex(String.class, Project.class); - lockByProject = lockStore.getPrimaryIndex(String.class, Lock.class); + lockById = lockStore.getPrimaryIndex(String.class, Lock.class); + + locksByProject = lockStore.getSecondaryIndex(lockById, String.class, "pid"); + logger.trace("< init"); } @Override public void destroy() throws Exception { + logger.trace("> destroy"); super.destroy(); if (projectStore != null) { @@ -81,7 +94,7 @@ public class GridworksBrokerImpl extends GridworksBroker { if (lockStore != null) { lockStore.close(); - lockByProject = null; + lockById = null; } if (timer != null) { @@ -94,89 +107,127 @@ public class GridworksBrokerImpl extends GridworksBroker { env.close(); env = null; } + logger.trace("< destroy"); } - class LockExpirer extends TimerTask { + class Expirer extends TimerTask { public void run() { - if (lockByProject != null) { - EntityCursor cursor = lockByProject.entities(); + if (lockById != null) { + logger.trace("> expire"); + Transaction txn = env.beginTransaction(null, null); try { - for (Lock lock = cursor.first(); lock != null; lock = cursor.next()) { - if (lock.timestamp + LOCK_DURATION < System.currentTimeMillis()) { - try { - releaseLock(null, lock.pid, lock.uid, lock.id); - } catch (Exception e) { - logger.error("Exception while expiring lock for project '" + lock.pid + "'", e); + EntityCursor cursor = lockById.entities(); + try { + for (Lock lock : cursor) { + if (lock.timestamp + LOCK_DURATION < System.currentTimeMillis()) { + logger.trace("Found expired lock {}", lock.id); + try { + releaseLock(null, lock.pid, lock.uid, lock.id); + } catch (Exception e) { + logger.error("Exception while expiring lock for project '" + lock.pid + "'", e); + } } } + } finally { + cursor.close(); } } finally { - cursor.close(); - } + if (txn != null) { + txn.abort(); + txn = null; + } + } + logger.trace("< expire"); } } } // --------------------------------------------------------------------------------- + @Override protected HttpClient getHttpClient() { return new DefaultHttpClient(); } // --------------------------------------------------------------------------------- - protected void expireLocks(HttpServletResponse response) throws Exception { + @Override + protected void expire(HttpServletResponse response) throws Exception { expirer.run(); respond(response, OK); } - - protected void getLock(HttpServletResponse response, String pid) throws Exception { - respond(response, lockToJSON(getLock(pid))); - } + + @Override + protected void obtainLock(HttpServletResponse response, String pid, String uid, int locktype, String lockvalue) throws Exception { - protected void obtainLock(HttpServletResponse response, String pid, String uid) throws Exception { + Lock lock = null; Transaction txn = env.beginTransaction(null, null); - - Lock lock = getLock(pid); - if (lock == null) { + + try { + + EntityCursor cursor = locksByProject.subIndex(pid).entities(); + try { - lock = new Lock(Long.toHexString(txn.getId()), pid, uid); - lockByProject.put(txn, lock); - txn.commit(); - } finally { - if (txn != null) { - txn.abort(); - txn = null; + for (Lock l : cursor) { + if (locktype == ALL) { + if (l.type == ALL) { + lock = l; + break; + } + } else if (locktype == COLUMN) { + if (l.type == ALL || + (l.type == COLUMN && l.value.equals(lockvalue))) { + lock = l; + break; + } + } else if (locktype == CELL) { + if (l.type == ALL || + (l.type == COLUMN && l.value.equals(lockvalue.split(",")[0])) || + (l.type == CELL && l.value.equals(lockvalue))) { + lock = l; + break; + } + } } + } finally { + cursor.close(); + } + + if (lock == null) { + lock = new Lock(Long.toHexString(txn.getId()), pid, uid, locktype, lockvalue); + lockById.put(txn, lock); + txn.commit(); + } + + } finally { + if (txn != null) { + txn.abort(); + txn = null; } } - + respond(response, lockToJSON(lock)); } + @Override protected void releaseLock(HttpServletResponse response, String pid, String uid, String lid) throws Exception { Transaction txn = env.beginTransaction(null, null); - Lock lock = getLock(pid); - if (lock != null) { - try { - if (!lock.id.equals(lid)) { - throw new RuntimeException("Lock id doesn't match, can't release the lock"); - } + try { + Lock lock = getLock(lid, pid, uid); + if (lock != null) { if (!lock.uid.equals(uid)) { throw new RuntimeException("User id doesn't match the lock owner, can't release the lock"); } - - lockByProject.delete(pid); - + lockById.delete(pid); txn.commit(); - } finally { - if (txn != null) { - txn.abort(); - txn = null; - } + } + } finally { + if (txn != null) { + txn.abort(); + txn = null; } } @@ -186,19 +237,24 @@ public class GridworksBrokerImpl extends GridworksBroker { } // ---------------------------------------------------------------------------------------------------- - - protected void startProject(HttpServletResponse response, String pid, String uid, String lid, String data) throws Exception { + + @Override + protected void startProject(HttpServletResponse response, String pid, String uid, String lid, byte[] data, String metadata, int rev) throws Exception { Transaction txn = env.beginTransaction(null, null); - checkLock(pid, uid, lid); - - if (projectById.contains(pid)) { - throw new RuntimeException("Project '" + pid + "' already exists"); - } - try { - projectById.put(txn, new Project(pid, data)); + if (projectById.contains(pid)) { + throw new RuntimeException("Project '" + pid + "' already exists"); + } + + Lock lock = getLock(lid, pid, uid); + + if (lock.type != ALL) { + throw new RuntimeException("The lock you have is not enough to start a project"); + } + + projectById.put(txn, new Project(pid, data, metadata, rev)); txn.commit(); } finally { if (txn != null) { @@ -209,21 +265,58 @@ public class GridworksBrokerImpl extends GridworksBroker { respond(response, OK); } - + + @Override protected void addTransformations(HttpServletResponse response, String pid, String uid, String lid, List transformations) throws Exception { Transaction txn = env.beginTransaction(null, null); - checkLock(pid, uid, lid); - - Project project = getProject(pid); - - if (project == null) { - throw new RuntimeException("Project '" + pid + "' not found"); - } - try { - project.transformations.addAll(transformations); + Project project = getProject(pid); + + if (project == null) { + throw new RuntimeException("Project '" + pid + "' not found"); + } + + Lock lock = getLock(lid, pid, uid); + + if (lock.type == ALL) { + project.transformations.addAll(transformations); + } else { + for (String s : transformations) { + JSONObject o = new JSONObject(s); + + int type = o.getInt("op_type"); + String value = o.getString("op_value"); + if (lock.type == COLUMN) { + if (type == COLUMN) { + if (value != null && value.equals(lock.value)) { + project.transformations.add(s); + } else { + throw new RuntimeException("Can't apply '" + s + "': you have a lock for column '" + lock.value + "' and you're attempting to modify column '" + value + "'."); + } + } else if (type == CELL) { + String column = value.split(",")[0]; + if (column != null && column.equals(lock.value)) { + project.transformations.add(s); + } else { + throw new RuntimeException("Can't apply '" + s + "': you have a lock for column '" + lock.value + "' and you're attempting to modify cell '" + value + "' in another column."); + } + } + } else if (lock.type == CELL) { + if (type == COLUMN) { + throw new RuntimeException("Can't apply '" + s + "': you offered a lock for a single cell and you're attempting an operation for the entire column."); + } else if (type == CELL) { + if (value != null && value.equals(lock.value)) { + project.transformations.add(s); + } else { + throw new RuntimeException("Can't apply '" + s + "': you have a lock for cell '" + lock.value + "' and you're attempting to modify cell '" + value + "'."); + } + } + } + } + } + txn.commit(); } finally { if (txn != null) { @@ -237,48 +330,71 @@ public class GridworksBrokerImpl extends GridworksBroker { // --------------------------------------------------------------------------------- - protected void getProject(HttpServletResponse response, String pid) throws Exception { + @Override + protected void openProject(HttpServletResponse response, String pid) throws Exception { Project project = getProject(pid); Writer w = response.getWriter(); JSONWriter writer = new JSONWriter(w); writer.object(); writer.key("data"); writer.value(project.data); + writer.key("metadata"); writer.value(new JSONObject(project.metadata)); writer.key("transformations"); writer.array(); for (String s : project.transformations) { - writer.value(s); + writer.value(new JSONObject(s)); } writer.endArray(); writer.endObject(); w.flush(); w.close(); } - - protected void getHistory(HttpServletResponse response, String pid, int tindex) throws Exception { - Project project = getProject(pid); + + // --------------------------------------------------------------------------------- + @Override + protected void getState(HttpServletResponse response, String pid, String uid, int rev) throws Exception { + + Project project = getProject(pid); + Writer w = response.getWriter(); JSONWriter writer = new JSONWriter(w); + writer.object(); writer.key("transformations"); writer.array(); int size = project.transformations.size(); - for (int i = tindex; i < size; i++) { + for (int i = rev; i < size; i++) { writer.value(project.transformations.get(i)); } writer.endArray(); writer.endObject(); - w.flush(); - w.close(); - } + EntityCursor cursor = locksByProject.subIndex(pid).entities(); + + try { + writer.object(); + writer.key("locks"); + writer.array(); + for (Lock lock : cursor) { + writer.value(lockToJSON(lock)); + } + writer.endArray(); + writer.endObject(); + + w.flush(); + w.close(); + } finally { + cursor.close(); + } + } + // --------------------------------------------------------------------------------- Project getProject(String pid) { Project project = projectById.get(pid); if (project == null) { - throw new RuntimeException("Project '" + pid + "' is not managed by this broker"); + throw new RuntimeException("Project '" + pid + "' could not be found: are you sure is not managed by another broker?"); } return project; } @@ -290,39 +406,63 @@ public class GridworksBrokerImpl extends GridworksBroker { String pid; List transformations = new ArrayList(); + + byte[] data; - String data; + String metadata; + + int rev; - Project(String pid, String data) { + Project(String pid, byte[] data, String metadata, int rev) { this.pid = pid; this.data = data; + this.metadata = metadata; + this.rev = rev; } @SuppressWarnings("unused") private Project() {} } - + // --------------------------------------------------------------------------------- - Lock getLock(String pid) { - return lockByProject.get(pid); + Lock getLock(String lid, String pid, String uid) { + Lock lock = lockById.get(lid); + checkLock(lock, lid, pid, uid); + return lock; } - void checkLock(String pid, String uid, String lid) { - Lock lock = getLock(pid); - + void checkLock(Lock lock, String lid, String pid, String uid) { if (lock == null) { throw new RuntimeException("No lock was found with the given Lock id '" + lid + "', you have to have a valid lock on a project in order to start it"); } if (!lock.pid.equals(pid)) { - throw new RuntimeException("Lock '" + lid + "' is for another project: " + pid); + throw new RuntimeException("Lock '" + lock.id + "' is for another project: " + lock.pid); } if (!lock.uid.equals(uid)) { - throw new RuntimeException("Lock '" + lid + "' is owned by another user: " + uid); + throw new RuntimeException("Lock '" + lock.id + "' is owned by another user: " + lock.uid); } } + + Lock getLock(String pid, String uid, int locktype) { + Lock lock = null; + EntityCursor cursor = locksByProject.subIndex(pid).entities(); + + try { + for (Lock l : cursor) { + if (uid.equals(l.uid) && (locktype == l.type)) { + lock = l; + break; + } + } + } finally { + cursor.close(); + } + + return lock; + } JSONObject lockToJSON(Lock lock) throws JSONException { JSONObject o = new JSONObject(); @@ -339,18 +479,25 @@ public class GridworksBrokerImpl extends GridworksBroker { static class Lock { @PrimaryKey - String pid; - String id; + @SecondaryKey(relate=MANY_TO_ONE) + String pid; + String uid; + int type; + + String value; + long timestamp; - Lock(String id, String pid, String uid) { + Lock(String id, String pid, String uid, int type, String value) { this.id = id; this.pid = pid; this.uid = uid; + this.type = type; + this.value = value; this.timestamp = System.currentTimeMillis(); }