more work on the broker

git-svn-id: http://google-refine.googlecode.com/svn/trunk@1065 7d457c2a-affb-35e4-300a-418c747d4874
This commit is contained in:
Stefano Mazzocchi 2010-07-02 09:30:24 +00:00
parent 217fb7b25c
commit 9f759719e7
2 changed files with 274 additions and 111 deletions

View File

@ -1,7 +1,9 @@
package com.metaweb.gridworks.broker; package com.metaweb.gridworks.broker;
import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.io.PrintWriter; import java.io.PrintWriter;
import java.io.StringWriter; import java.io.StringWriter;
import java.io.Writer; import java.io.Writer;
@ -26,7 +28,6 @@ import org.apache.http.params.CoreProtocolPNames;
import org.json.JSONArray; import org.json.JSONArray;
import org.json.JSONException; import org.json.JSONException;
import org.json.JSONObject; import org.json.JSONObject;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -50,6 +51,10 @@ public abstract class GridworksBroker extends ButterflyModuleImpl {
static final protected String DELEGATED_OAUTH_HEADER = "X-Freebase-Credentials"; static final protected String DELEGATED_OAUTH_HEADER = "X-Freebase-Credentials";
static final protected String OAUTH_HEADER = "Authorization"; static final protected String OAUTH_HEADER = "Authorization";
static final protected int ALL = 0;
static final protected int COLUMN = 1;
static final protected int CELL = 2;
static protected String OK; static protected String OK;
static { static {
@ -63,6 +68,7 @@ public abstract class GridworksBroker extends ButterflyModuleImpl {
} }
static public final long LOCK_DURATION = 60 * 1000; // 1 minute static public final long LOCK_DURATION = 60 * 1000; // 1 minute
static public final long USER_DURATION = 5 * 60 * 1000; // 1 minute
static public final long LOCK_EXPIRATION_CHECK_DELAY = 5 * 1000; // 5 seconds static public final long LOCK_EXPIRATION_CHECK_DELAY = 5 * 1000; // 5 seconds
protected HttpClient httpclient; protected HttpClient httpclient;
@ -103,22 +109,20 @@ public abstract class GridworksBroker extends ButterflyModuleImpl {
// we could be using a hashtable and some classes that implement the commands, but the complexity overhead // we could be using a hashtable and some classes that implement the commands, but the complexity overhead
// doesn't seem to justify the marginal benefit. // doesn't seem to justify the marginal benefit.
if ("get_lock".equals(path)) { if ("get_state".equals(path)) {
getLock(response, pid); getState(response, pid, uid, getInteger(request, "rev"));
} else if ("expire_locks".equals(path)) { } else if ("expire".equals(path)) {
expireLocks(response); expire(response);
} else if ("obtain_lock".equals(path)) { } else if ("obtain_lock".equals(path)) {
obtainLock(response, pid, uid); obtainLock(response, pid, uid, getInteger(request, "locktype"), getParameter(request, "lockvalue"));
} else if ("release_lock".equals(path)) { } else if ("release_lock".equals(path)) {
releaseLock(response, pid, uid, getParameter(request, "lock")); releaseLock(response, pid, uid, getParameter(request, "lock"));
} else if ("history".equals(path)) {
getHistory(response, pid, getInteger(request, "tindex"));
} else if ("transform".equals(path)) { } else if ("transform".equals(path)) {
addTransformations(response, pid, uid, getParameter(request, "lock"), getList(request, "transformations")); addTransformations(response, pid, uid, getParameter(request, "lock"), getList(request, "transformations"));
} else if ("start".equals(path)) { } else if ("start".equals(path)) {
startProject(response, pid, uid, getParameter(request, "lock"), getParameter(request, "data")); startProject(response, pid, uid, getParameter(request, "lock"), getData(request), getParameter(request, "metadata"), getInteger(request, "rev"));
} else if ("get".equals(path)) { } else if ("open".equals(path)) {
getProject(response, pid); openProject(response, pid);
} else { } else {
boolean value = super.process(path, request, response); boolean value = super.process(path, request, response);
if (logger.isDebugEnabled()) logger.debug("< process '{}'", path); if (logger.isDebugEnabled()) logger.debug("< process '{}'", path);
@ -142,21 +146,19 @@ public abstract class GridworksBroker extends ButterflyModuleImpl {
protected abstract HttpClient getHttpClient(); protected abstract HttpClient getHttpClient();
protected abstract void expireLocks(HttpServletResponse response) throws Exception; protected abstract void expire(HttpServletResponse response) throws Exception;
protected abstract void getLock(HttpServletResponse response, String pid) throws Exception;
protected abstract void obtainLock(HttpServletResponse response, String pid, String uid) throws Exception; protected abstract void getState(HttpServletResponse response, String pid, String uid, int rev) throws Exception;
protected abstract void obtainLock(HttpServletResponse response, String pid, String uid, int locktype, String lockvalue) throws Exception;
protected abstract void releaseLock(HttpServletResponse response, String pid, String uid, String lock) throws Exception; protected abstract void releaseLock(HttpServletResponse response, String pid, String uid, String lock) throws Exception;
protected abstract void startProject(HttpServletResponse response, String pid, String uid, String lock, String data) throws Exception; protected abstract void startProject(HttpServletResponse response, String pid, String uid, String lock, byte[] data, String metadata, int rev) throws Exception;
protected abstract void addTransformations(HttpServletResponse response, String pid, String uid, String lock, List<String> transformations) throws Exception; protected abstract void addTransformations(HttpServletResponse response, String pid, String uid, String lock, List<String> transformations) throws Exception;
protected abstract void getProject(HttpServletResponse response, String pid) throws Exception; protected abstract void openProject(HttpServletResponse response, String pid) throws Exception;
protected abstract void getHistory(HttpServletResponse response, String pid, int tindex) throws Exception;
// ---------------------------------------------------------------------------------------- // ----------------------------------------------------------------------------------------
@ -211,11 +213,25 @@ public abstract class GridworksBroker extends ButterflyModuleImpl {
} }
return result; return result;
} }
static protected int getInteger(HttpServletRequest request, String name) throws ServletException, JSONException { static protected int getInteger(HttpServletRequest request, String name) throws ServletException, JSONException {
return Integer.parseInt(getParameter(request, name)); return Integer.parseInt(getParameter(request, name));
} }
static protected byte[] getData(HttpServletRequest request) throws ServletException, IOException {
ByteArrayOutputStream output = new ByteArrayOutputStream();
InputStream input = request.getInputStream();
byte[] buffer = new byte[4096];
int count = 0;
int n = 0;
while (-1 != (n = input.read(buffer))) {
output.write(buffer, 0, n);
count += n;
}
return buffer;
}
static protected void respondError(HttpServletResponse response, String error) throws IOException, ServletException { static protected void respondError(HttpServletResponse response, String error) throws IOException, ServletException {
if (response == null) { if (response == null) {

View File

@ -1,5 +1,7 @@
package com.metaweb.gridworks.broker; package com.metaweb.gridworks.broker;
import static com.sleepycat.persist.model.Relationship.MANY_TO_ONE;
import java.io.File; import java.io.File;
import java.io.Writer; import java.io.Writer;
import java.util.ArrayList; import java.util.ArrayList;
@ -24,30 +26,37 @@ import com.sleepycat.je.Transaction;
import com.sleepycat.persist.EntityCursor; import com.sleepycat.persist.EntityCursor;
import com.sleepycat.persist.EntityStore; import com.sleepycat.persist.EntityStore;
import com.sleepycat.persist.PrimaryIndex; import com.sleepycat.persist.PrimaryIndex;
import com.sleepycat.persist.SecondaryIndex;
import com.sleepycat.persist.StoreConfig; import com.sleepycat.persist.StoreConfig;
import com.sleepycat.persist.model.Entity; import com.sleepycat.persist.model.Entity;
import com.sleepycat.persist.model.PrimaryKey; import com.sleepycat.persist.model.PrimaryKey;
import com.sleepycat.persist.model.SecondaryKey;
public class GridworksBrokerImpl extends GridworksBroker { public class GridworksBrokerImpl extends GridworksBroker {
protected static final Logger logger = LoggerFactory.getLogger("gridworks.broker.local"); protected static final Logger logger = LoggerFactory.getLogger("gridworks.broker.local");
Environment env; Environment env;
EntityStore projectStore; EntityStore projectStore;
EntityStore lockStore; EntityStore lockStore;
EntityStore userStore;
PrimaryIndex<String,Project> projectById; PrimaryIndex<String,Project> projectById;
PrimaryIndex<String,Lock> lockByProject; PrimaryIndex<String,Lock> lockById;
SecondaryIndex<String,String,Lock> locksByProject;
Timer timer; Timer timer;
LockExpirer expirer; Expirer expirer;
@Override @Override
public void init(ServletConfig config) throws Exception { public void init(ServletConfig config) throws Exception {
logger.trace("> init");
super.init(config); super.init(config);
timer = new Timer(); timer = new Timer();
expirer = new LockExpirer(); expirer = new Expirer();
timer.schedule(expirer, LOCK_EXPIRATION_CHECK_DELAY, LOCK_EXPIRATION_CHECK_DELAY); timer.schedule(expirer, LOCK_EXPIRATION_CHECK_DELAY, LOCK_EXPIRATION_CHECK_DELAY);
String dataDir = config.getInitParameter("gridworks.data"); String dataDir = config.getInitParameter("gridworks.data");
@ -65,13 +74,17 @@ public class GridworksBrokerImpl extends GridworksBroker {
storeConfig.setTransactional(true); storeConfig.setTransactional(true);
projectStore = new EntityStore(env, "ProjectsStore", storeConfig); projectStore = new EntityStore(env, "ProjectsStore", storeConfig);
lockStore = new EntityStore(env, "LockStore", storeConfig); lockStore = new EntityStore(env, "LockStore", storeConfig);
projectById = projectStore.getPrimaryIndex(String.class, Project.class); projectById = projectStore.getPrimaryIndex(String.class, Project.class);
lockByProject = lockStore.getPrimaryIndex(String.class, Lock.class); lockById = lockStore.getPrimaryIndex(String.class, Lock.class);
locksByProject = lockStore.getSecondaryIndex(lockById, String.class, "pid");
logger.trace("< init");
} }
@Override @Override
public void destroy() throws Exception { public void destroy() throws Exception {
logger.trace("> destroy");
super.destroy(); super.destroy();
if (projectStore != null) { if (projectStore != null) {
@ -81,7 +94,7 @@ public class GridworksBrokerImpl extends GridworksBroker {
if (lockStore != null) { if (lockStore != null) {
lockStore.close(); lockStore.close();
lockByProject = null; lockById = null;
} }
if (timer != null) { if (timer != null) {
@ -94,89 +107,127 @@ public class GridworksBrokerImpl extends GridworksBroker {
env.close(); env.close();
env = null; env = null;
} }
logger.trace("< destroy");
} }
class LockExpirer extends TimerTask { class Expirer extends TimerTask {
public void run() { public void run() {
if (lockByProject != null) { if (lockById != null) {
EntityCursor<Lock> cursor = lockByProject.entities(); logger.trace("> expire");
Transaction txn = env.beginTransaction(null, null);
try { try {
for (Lock lock = cursor.first(); lock != null; lock = cursor.next()) { EntityCursor<Lock> cursor = lockById.entities();
if (lock.timestamp + LOCK_DURATION < System.currentTimeMillis()) { try {
try { for (Lock lock : cursor) {
releaseLock(null, lock.pid, lock.uid, lock.id); if (lock.timestamp + LOCK_DURATION < System.currentTimeMillis()) {
} catch (Exception e) { logger.trace("Found expired lock {}", lock.id);
logger.error("Exception while expiring lock for project '" + lock.pid + "'", e); try {
releaseLock(null, lock.pid, lock.uid, lock.id);
} catch (Exception e) {
logger.error("Exception while expiring lock for project '" + lock.pid + "'", e);
}
} }
} }
} finally {
cursor.close();
} }
} finally { } finally {
cursor.close(); if (txn != null) {
} txn.abort();
txn = null;
}
}
logger.trace("< expire");
} }
} }
} }
// --------------------------------------------------------------------------------- // ---------------------------------------------------------------------------------
@Override
protected HttpClient getHttpClient() { protected HttpClient getHttpClient() {
return new DefaultHttpClient(); return new DefaultHttpClient();
} }
// --------------------------------------------------------------------------------- // ---------------------------------------------------------------------------------
protected void expireLocks(HttpServletResponse response) throws Exception { @Override
protected void expire(HttpServletResponse response) throws Exception {
expirer.run(); expirer.run();
respond(response, OK); respond(response, OK);
} }
protected void getLock(HttpServletResponse response, String pid) throws Exception { @Override
respond(response, lockToJSON(getLock(pid))); protected void obtainLock(HttpServletResponse response, String pid, String uid, int locktype, String lockvalue) throws Exception {
}
protected void obtainLock(HttpServletResponse response, String pid, String uid) throws Exception { Lock lock = null;
Transaction txn = env.beginTransaction(null, null); Transaction txn = env.beginTransaction(null, null);
Lock lock = getLock(pid); try {
if (lock == null) {
EntityCursor<Lock> cursor = locksByProject.subIndex(pid).entities();
try { try {
lock = new Lock(Long.toHexString(txn.getId()), pid, uid); for (Lock l : cursor) {
lockByProject.put(txn, lock); if (locktype == ALL) {
txn.commit(); if (l.type == ALL) {
} finally { lock = l;
if (txn != null) { break;
txn.abort(); }
txn = null; } else if (locktype == COLUMN) {
if (l.type == ALL ||
(l.type == COLUMN && l.value.equals(lockvalue))) {
lock = l;
break;
}
} else if (locktype == CELL) {
if (l.type == ALL ||
(l.type == COLUMN && l.value.equals(lockvalue.split(",")[0])) ||
(l.type == CELL && l.value.equals(lockvalue))) {
lock = l;
break;
}
}
} }
} finally {
cursor.close();
}
if (lock == null) {
lock = new Lock(Long.toHexString(txn.getId()), pid, uid, locktype, lockvalue);
lockById.put(txn, lock);
txn.commit();
}
} finally {
if (txn != null) {
txn.abort();
txn = null;
} }
} }
respond(response, lockToJSON(lock)); respond(response, lockToJSON(lock));
} }
@Override
protected void releaseLock(HttpServletResponse response, String pid, String uid, String lid) throws Exception { protected void releaseLock(HttpServletResponse response, String pid, String uid, String lid) throws Exception {
Transaction txn = env.beginTransaction(null, null); Transaction txn = env.beginTransaction(null, null);
Lock lock = getLock(pid); try {
if (lock != null) { Lock lock = getLock(lid, pid, uid);
try { if (lock != null) {
if (!lock.id.equals(lid)) {
throw new RuntimeException("Lock id doesn't match, can't release the lock");
}
if (!lock.uid.equals(uid)) { if (!lock.uid.equals(uid)) {
throw new RuntimeException("User id doesn't match the lock owner, can't release the lock"); throw new RuntimeException("User id doesn't match the lock owner, can't release the lock");
} }
lockById.delete(pid);
lockByProject.delete(pid);
txn.commit(); txn.commit();
} finally { }
if (txn != null) { } finally {
txn.abort(); if (txn != null) {
txn = null; txn.abort();
} txn = null;
} }
} }
@ -186,19 +237,24 @@ public class GridworksBrokerImpl extends GridworksBroker {
} }
// ---------------------------------------------------------------------------------------------------- // ----------------------------------------------------------------------------------------------------
protected void startProject(HttpServletResponse response, String pid, String uid, String lid, String data) throws Exception { @Override
protected void startProject(HttpServletResponse response, String pid, String uid, String lid, byte[] data, String metadata, int rev) throws Exception {
Transaction txn = env.beginTransaction(null, null); Transaction txn = env.beginTransaction(null, null);
checkLock(pid, uid, lid);
if (projectById.contains(pid)) {
throw new RuntimeException("Project '" + pid + "' already exists");
}
try { try {
projectById.put(txn, new Project(pid, data)); if (projectById.contains(pid)) {
throw new RuntimeException("Project '" + pid + "' already exists");
}
Lock lock = getLock(lid, pid, uid);
if (lock.type != ALL) {
throw new RuntimeException("The lock you have is not enough to start a project");
}
projectById.put(txn, new Project(pid, data, metadata, rev));
txn.commit(); txn.commit();
} finally { } finally {
if (txn != null) { if (txn != null) {
@ -209,21 +265,58 @@ public class GridworksBrokerImpl extends GridworksBroker {
respond(response, OK); respond(response, OK);
} }
@Override
protected void addTransformations(HttpServletResponse response, String pid, String uid, String lid, List<String> transformations) throws Exception { protected void addTransformations(HttpServletResponse response, String pid, String uid, String lid, List<String> transformations) throws Exception {
Transaction txn = env.beginTransaction(null, null); Transaction txn = env.beginTransaction(null, null);
checkLock(pid, uid, lid);
Project project = getProject(pid);
if (project == null) {
throw new RuntimeException("Project '" + pid + "' not found");
}
try { try {
project.transformations.addAll(transformations); Project project = getProject(pid);
if (project == null) {
throw new RuntimeException("Project '" + pid + "' not found");
}
Lock lock = getLock(lid, pid, uid);
if (lock.type == ALL) {
project.transformations.addAll(transformations);
} else {
for (String s : transformations) {
JSONObject o = new JSONObject(s);
int type = o.getInt("op_type");
String value = o.getString("op_value");
if (lock.type == COLUMN) {
if (type == COLUMN) {
if (value != null && value.equals(lock.value)) {
project.transformations.add(s);
} else {
throw new RuntimeException("Can't apply '" + s + "': you have a lock for column '" + lock.value + "' and you're attempting to modify column '" + value + "'.");
}
} else if (type == CELL) {
String column = value.split(",")[0];
if (column != null && column.equals(lock.value)) {
project.transformations.add(s);
} else {
throw new RuntimeException("Can't apply '" + s + "': you have a lock for column '" + lock.value + "' and you're attempting to modify cell '" + value + "' in another column.");
}
}
} else if (lock.type == CELL) {
if (type == COLUMN) {
throw new RuntimeException("Can't apply '" + s + "': you offered a lock for a single cell and you're attempting an operation for the entire column.");
} else if (type == CELL) {
if (value != null && value.equals(lock.value)) {
project.transformations.add(s);
} else {
throw new RuntimeException("Can't apply '" + s + "': you have a lock for cell '" + lock.value + "' and you're attempting to modify cell '" + value + "'.");
}
}
}
}
}
txn.commit(); txn.commit();
} finally { } finally {
if (txn != null) { if (txn != null) {
@ -237,48 +330,71 @@ public class GridworksBrokerImpl extends GridworksBroker {
// --------------------------------------------------------------------------------- // ---------------------------------------------------------------------------------
protected void getProject(HttpServletResponse response, String pid) throws Exception { @Override
protected void openProject(HttpServletResponse response, String pid) throws Exception {
Project project = getProject(pid); Project project = getProject(pid);
Writer w = response.getWriter(); Writer w = response.getWriter();
JSONWriter writer = new JSONWriter(w); JSONWriter writer = new JSONWriter(w);
writer.object(); writer.object();
writer.key("data"); writer.value(project.data); writer.key("data"); writer.value(project.data);
writer.key("metadata"); writer.value(new JSONObject(project.metadata));
writer.key("transformations"); writer.key("transformations");
writer.array(); writer.array();
for (String s : project.transformations) { for (String s : project.transformations) {
writer.value(s); writer.value(new JSONObject(s));
} }
writer.endArray(); writer.endArray();
writer.endObject(); writer.endObject();
w.flush(); w.flush();
w.close(); w.close();
} }
protected void getHistory(HttpServletResponse response, String pid, int tindex) throws Exception { // ---------------------------------------------------------------------------------
Project project = getProject(pid);
@Override
protected void getState(HttpServletResponse response, String pid, String uid, int rev) throws Exception {
Project project = getProject(pid);
Writer w = response.getWriter(); Writer w = response.getWriter();
JSONWriter writer = new JSONWriter(w); JSONWriter writer = new JSONWriter(w);
writer.object(); writer.object();
writer.key("transformations"); writer.key("transformations");
writer.array(); writer.array();
int size = project.transformations.size(); int size = project.transformations.size();
for (int i = tindex; i < size; i++) { for (int i = rev; i < size; i++) {
writer.value(project.transformations.get(i)); writer.value(project.transformations.get(i));
} }
writer.endArray(); writer.endArray();
writer.endObject(); writer.endObject();
w.flush();
w.close();
}
EntityCursor<Lock> cursor = locksByProject.subIndex(pid).entities();
try {
writer.object();
writer.key("locks");
writer.array();
for (Lock lock : cursor) {
writer.value(lockToJSON(lock));
}
writer.endArray();
writer.endObject();
w.flush();
w.close();
} finally {
cursor.close();
}
}
// --------------------------------------------------------------------------------- // ---------------------------------------------------------------------------------
Project getProject(String pid) { Project getProject(String pid) {
Project project = projectById.get(pid); Project project = projectById.get(pid);
if (project == null) { if (project == null) {
throw new RuntimeException("Project '" + pid + "' is not managed by this broker"); throw new RuntimeException("Project '" + pid + "' could not be found: are you sure is not managed by another broker?");
} }
return project; return project;
} }
@ -290,39 +406,63 @@ public class GridworksBrokerImpl extends GridworksBroker {
String pid; String pid;
List<String> transformations = new ArrayList<String>(); List<String> transformations = new ArrayList<String>();
byte[] data;
String data; String metadata;
int rev;
Project(String pid, String data) { Project(String pid, byte[] data, String metadata, int rev) {
this.pid = pid; this.pid = pid;
this.data = data; this.data = data;
this.metadata = metadata;
this.rev = rev;
} }
@SuppressWarnings("unused") @SuppressWarnings("unused")
private Project() {} private Project() {}
} }
// --------------------------------------------------------------------------------- // ---------------------------------------------------------------------------------
Lock getLock(String pid) { Lock getLock(String lid, String pid, String uid) {
return lockByProject.get(pid); Lock lock = lockById.get(lid);
checkLock(lock, lid, pid, uid);
return lock;
} }
void checkLock(String pid, String uid, String lid) { void checkLock(Lock lock, String lid, String pid, String uid) {
Lock lock = getLock(pid);
if (lock == null) { if (lock == null) {
throw new RuntimeException("No lock was found with the given Lock id '" + lid + "', you have to have a valid lock on a project in order to start it"); throw new RuntimeException("No lock was found with the given Lock id '" + lid + "', you have to have a valid lock on a project in order to start it");
} }
if (!lock.pid.equals(pid)) { if (!lock.pid.equals(pid)) {
throw new RuntimeException("Lock '" + lid + "' is for another project: " + pid); throw new RuntimeException("Lock '" + lock.id + "' is for another project: " + lock.pid);
} }
if (!lock.uid.equals(uid)) { if (!lock.uid.equals(uid)) {
throw new RuntimeException("Lock '" + lid + "' is owned by another user: " + uid); throw new RuntimeException("Lock '" + lock.id + "' is owned by another user: " + lock.uid);
} }
} }
Lock getLock(String pid, String uid, int locktype) {
Lock lock = null;
EntityCursor<Lock> cursor = locksByProject.subIndex(pid).entities();
try {
for (Lock l : cursor) {
if (uid.equals(l.uid) && (locktype == l.type)) {
lock = l;
break;
}
}
} finally {
cursor.close();
}
return lock;
}
JSONObject lockToJSON(Lock lock) throws JSONException { JSONObject lockToJSON(Lock lock) throws JSONException {
JSONObject o = new JSONObject(); JSONObject o = new JSONObject();
@ -339,18 +479,25 @@ public class GridworksBrokerImpl extends GridworksBroker {
static class Lock { static class Lock {
@PrimaryKey @PrimaryKey
String pid;
String id; String id;
@SecondaryKey(relate=MANY_TO_ONE)
String pid;
String uid; String uid;
int type;
String value;
long timestamp; long timestamp;
Lock(String id, String pid, String uid) { Lock(String id, String pid, String uid, int type, String value) {
this.id = id; this.id = id;
this.pid = pid; this.pid = pid;
this.uid = uid; this.uid = uid;
this.type = type;
this.value = value;
this.timestamp = System.currentTimeMillis(); this.timestamp = System.currentTimeMillis();
} }