First pass implementation of new Fusion Tables API - fixes #539

This commit is contained in:
Tom Morris 2013-02-11 16:33:51 -05:00
parent 2464421014
commit 7fb95ebbad
5 changed files with 213 additions and 227 deletions

View File

@ -29,20 +29,18 @@
package com.google.refine.extension.gdata;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.net.URL;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.List;
import java.util.Scanner;
import java.util.regex.MatchResult;
import java.util.regex.Pattern;
import com.google.gdata.client.GoogleService;
import com.google.gdata.client.Service.GDataRequest;
import com.google.gdata.client.Service.GDataRequest.RequestType;
import com.google.gdata.util.ContentType;
import com.google.gdata.util.ServiceException;
import com.google.api.client.auth.oauth2.Credential;
import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
import com.google.api.client.http.AbstractInputStreamContent;
import com.google.api.services.fusiontables.Fusiontables;
import com.google.api.services.fusiontables.Fusiontables.Query.Sql;
import com.google.api.services.fusiontables.Fusiontables.Query.SqlGet;
import com.google.api.services.fusiontables.Fusiontables.Table.ImportRows;
import com.google.api.services.fusiontables.model.FusiontablesImport;
import com.google.api.services.fusiontables.model.Sqlresponse;
/**
* @author Tom Morris <tfmorris@gmail.com>
@ -51,54 +49,83 @@ import com.google.gdata.util.ServiceException;
public class FusionTableHandler {
final static private String FUSION_TABLES_SERVICE_URL =
"https://www.google.com/fusiontables/api/query";
// "https://www.googleapis.com/fusiontables/v1/query";
final static private Pattern CSV_VALUE_PATTERN =
Pattern.compile("([^,\\r\\n\"]*|\"(([^\"]*\"\")*[^\"]*)\")(,|\\r?\\n)");
/**
 * Executes a SQL statement through the Fusion Tables query endpoint.
 *
 * @param service Fusiontables client used to build the request
 * @param query SQL statement text handed to {@code query().sql(...)}
 * @return the response produced by executing the request
 * @throws IOException if building or executing the request fails
 */
static private Sqlresponse executeQuery (Fusiontables service, String query)
        throws IOException {
    return service.query().sql(query).execute();
}
static public GDataRequest createFusionTablesPostRequest(
GoogleService service, RequestType requestType, String query)
throws IOException, ServiceException {
URL url = new URL(FUSION_TABLES_SERVICE_URL);
GDataRequest request = service.getRequestFactory().getRequest(
requestType, url, new ContentType("application/x-www-form-urlencoded"));
OutputStreamWriter writer =
new OutputStreamWriter(request.getRequestStream());
writer.append("sql=" + URLEncoder.encode(query, "UTF-8") + "&alt=csv");
writer.flush();
writer.close();
return request;
/**
 * Creates a table whose columns are all declared as STRING.
 *
 * Builds a statement of the form
 * {@code CREATE TABLE 'name' ('col1': STRING,'col2': STRING)}, executes it and
 * returns whatever {@code getTableId} extracts from the response.
 *
 * @param service Fusiontables client used to run the statement
 * @param name table name, placed between single quotes as-is
 * @param columnNames column names, each placed between single quotes as-is
 * @return the table ID taken from the response, or null if none is found
 * @throws IOException if executing the statement fails
 */
static String createTable(Fusiontables service, String name, List<String> columnNames) throws IOException {
    StringBuilder statement = new StringBuilder();
    statement.append("CREATE TABLE '").append(name).append("' (");
    // NOTE(review): name and column names are not escaped; a single quote in
    // either would break the statement - confirm whether callers sanitize them.
    String separator = "";
    for (String columnName : columnNames) {
        statement.append(separator).append("'").append(columnName).append("': STRING");
        separator = ",";
    }
    statement.append(")");

    // FIXME: Sqlresponse offers no direct accessor for the new table's ID,
    // so it has to be dug out of the returned row.
    return getTableId(executeQuery(service, statement.toString()));
}
static public GDataRequest createFusionTablesRequest(
GoogleService service, RequestType requestType, String query)
throws IOException, ServiceException {
URL url = new URL(FUSION_TABLES_SERVICE_URL + "?sql=" +
URLEncoder.encode(query, "UTF-8")+"&alt=csv");
return service.getRequestFactory().getRequest(
requestType, url, ContentType.TEXT_PLAIN);
/**
 * Extracts the value of the "tableid" column from the first row of a response.
 *
 * The column is located by name so the lookup keeps working if the response
 * carries additional columns or lists them in a different order.
 *
 * @param response response to inspect
 * @return the table ID as a string, or null when the response has no rows,
 *         no "tableid" column, or a null value in that cell
 */
private static String getTableId(Sqlresponse response) {
    List<List<Object>> rows = response.getRows();
    List<String> columns = response.getColumns();
    if (rows == null || rows.isEmpty() || columns == null) {
        return null;
    }
    List<Object> row = rows.get(0);
    // Use the column's own position; the previous loop never advanced its
    // index and therefore always read the first cell of the row.
    int index = columns.indexOf("tableid");
    if (row == null || index < 0 || index >= row.size()) {
        return null;
    }
    Object value = row.get(index);
    // toString() instead of a cast so a non-String cell cannot throw.
    return value == null ? null : value.toString();
}
/**
 * Imports rows into an existing table and reports how many rows were received.
 *
 * The import request is issued with strict mode switched off
 * ({@code setIsStrict(false)}).
 *
 * @param service a Fusiontables object
 * @param tableId ID of the table the rows are imported into
 * @param mediaContent content holding the rows to import
 * @return the number of rows received, as reported by the import response
 * @throws IOException if the import request fails
 */
static Long insertRows(Fusiontables service, String tableId, AbstractInputStreamContent mediaContent) throws IOException {
    ImportRows request = service.table().importRows(tableId, mediaContent);
    request.setIsStrict(false);
    FusiontablesImport result = request.execute();
    return result.getNumRowsReceived();
}
static String getFusionTableKey(URL url) {
String tableId = getParamValue(url,"dsrcid");
// TODO: Any special id format considerations to worry about?
// if (tableId.startsWith("p") || !tableId.contains(".")) {
// return tableId;
// }
String tableId = getParamValue(url,"dsrcid"); // old style phased out
if (tableId == null || tableId.isEmpty()) {
tableId = getParamValue(url,"docid");
}
return tableId;
}
static public GoogleService getFusionTablesGoogleService(String token) {
GoogleService service = new GoogleService("fusiontables", GDataExtension.SERVICE_APP_NAME);
if (token != null) {
service.setAuthSubToken(token);
}
return service;
/**
 * Builds a Fusiontables client that authenticates with the given access token.
 *
 * @param token access token set on the credential backing the client
 * @return a client built with the extension's HTTP transport, JSON factory
 *         and application name
 */
static public Fusiontables getFusionTablesService(String token) {
    Credential accessCredential = new GoogleCredential().setAccessToken(token);
    Fusiontables.Builder builder = new Fusiontables.Builder(
            GDataExtension.HTTP_TRANSPORT, GDataExtension.JSON_FACTORY, accessCredential);
    builder.setApplicationName(GDataExtension.SERVICE_APP_NAME);
    return builder.build();
}
static boolean isFusionTableURL(URL url) {
@ -109,60 +136,17 @@ public class FusionTableHandler {
}
return url.getHost().endsWith(".google.com")
&& url.getPath().startsWith("/fusiontables/DataSource")
&& query.contains("dsrcid=");
&& (query.contains("dsrcid=")||query.contains("docid="));
}
static public List<List<String>> parseFusionTablesResults(GDataRequest request) throws IOException {
List<List<String>> rows = new ArrayList<List<String>>();
List<String> row = null;
static Sqlresponse runFusionTablesSelect(Fusiontables service, String selectQuery)
throws IOException {
Scanner scanner = new Scanner(request.getResponseStream(), "UTF-8");
while (scanner.hasNextLine()) {
scanner.findWithinHorizon(CSV_VALUE_PATTERN, 0);
MatchResult match = scanner.match();
String quotedString = match.group(2);
String decoded = quotedString == null ? match.group(1) : quotedString.replaceAll("\"\"", "\"");
if (row == null) {
row = new ArrayList<String>();
}
row.add(decoded);
if (!match.group(4).equals(",")) {
if (row != null) {
rows.add(row);
row = null;
}
}
}
scanner.close();
if (row != null) {
rows.add(row);
}
return rows;
}
static public List<List<String>> listTables(GoogleService service) throws IOException, ServiceException {
List<List<String>> rows = runFusionTablesSelect(service, "SHOW TABLES");
// Format is id, name to which we append a link URL based on ID
if (rows.size() > 1) { // excluding headers
for (int i = 1; i < rows.size(); i++) {
List<String> row = rows.get(i);
if (row.size() >= 2) {
String id = row.get(0);
row.add("https://www.google.com/fusiontables/DataSource?docid=" + id);
}
}
}
return rows;
}
static public List<List<String>> runFusionTablesSelect(GoogleService service, String selectQuery)
throws IOException, ServiceException {
GDataRequest request = createFusionTablesRequest(service, RequestType.QUERY, selectQuery);
request.execute();
return parseFusionTablesResults(request);
// FIXME: alt=csv doesn't actually work! It will attempt to parse response as JSON and die
// perhaps use .executeUnparsed() would work?
SqlGet query = service.query().sqlGet(selectQuery);//.setAlt("csv");
Sqlresponse response = query.execute();
return response;
}
static private String getParamValue(URL url, String key) {

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2010, Thomas F. Morris
* Copyright (c) 2010,2013 Thomas F. Morris and other contributors
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@ -34,8 +34,10 @@ import java.util.List;
import org.json.JSONObject;
import com.google.gdata.client.GoogleService;
import com.google.gdata.util.ServiceException;
import com.google.api.services.fusiontables.Fusiontables;
import com.google.api.services.fusiontables.model.Column;
import com.google.api.services.fusiontables.model.Sqlresponse;
import com.google.api.services.fusiontables.model.Table;
import com.google.refine.ProjectMetadata;
import com.google.refine.importers.TabularImportingParserBase;
@ -61,7 +63,7 @@ public class FusionTableImporter {
JSONObject options,
List<Exception> exceptions) {
GoogleService service = FusionTableHandler.getFusionTablesGoogleService(token);
Fusiontables service = FusionTableHandler.getFusionTablesService(token);
parse(
service,
project,
@ -89,7 +91,7 @@ public class FusionTableImporter {
final ImportingJob job;
final String fileSource;
final GoogleService service;
final Fusiontables service;
final List<FTColumnData> columns;
final int batchSize;
@ -102,7 +104,7 @@ public class FusionTableImporter {
boolean usedHeaders = false;
public FusionTableBatchRowReader(ImportingJob job, String fileSource,
GoogleService service, String tableId, List<FTColumnData> columns,
Fusiontables service, String tableId, List<FTColumnData> columns,
int batchSize) {
this.job = job;
this.fileSource = fileSource;
@ -143,14 +145,10 @@ public class FusionTableImporter {
if (rowsOfCells == null || (nextRow >= batchRowStart + rowsOfCells.size() && !end)) {
int newBatchRowStart = batchRowStart + (rowsOfCells == null ? 0 : rowsOfCells.size());
try {
rowsOfCells = getRowsOfCells(newBatchRowStart);
batchRowStart = newBatchRowStart;
GDataImporter.setProgress(job, fileSource, -1 /* batchRowStart * 100 / totalRows */);
} catch (ServiceException e) {
throw new IOException(e);
}
rowsOfCells = getRowsOfCells(newBatchRowStart);
batchRowStart = newBatchRowStart;
GDataImporter.setProgress(job, fileSource, -1 /* batchRowStart * 100 / totalRows */);
}
if (rowsOfCells != null && nextRow - batchRowStart < rowsOfCells.size()) {
@ -161,18 +159,19 @@ public class FusionTableImporter {
}
private List<List<Object>> getRowsOfCells(int startRow) throws IOException, ServiceException {
private List<List<Object>> getRowsOfCells(int startRow) throws IOException {
List<List<Object>> rowsOfCells = new ArrayList<List<Object>>(batchSize);
String query = baseQuery + " OFFSET " + startRow + " LIMIT " + batchSize;
List<List<String>> rows = FusionTableHandler.runFusionTablesSelect(service, query);
Sqlresponse sqlresponse = FusionTableHandler.runFusionTablesSelect(service, query);
List<List<Object>> rows = sqlresponse.getRows();
if (rows.size() > 1) {
for (int i = 1; i < rows.size(); i++) {
List<String> row = rows.get(i);
List<Object> row = rows.get(i);
List<Object> rowOfCells = new ArrayList<Object>(row.size());
for (int j = 0; j < row.size() && j < columns.size(); j++) {
String text = row.get(j);
String text = (String)row.get(j);
if (text.isEmpty()) {
rowOfCells.add(null);
} else {
@ -208,7 +207,7 @@ public class FusionTableImporter {
}
static public void parse(
GoogleService service,
Fusiontables service,
Project project,
ProjectMetadata metadata,
final ImportingJob job,
@ -222,37 +221,34 @@ public class FusionTableImporter {
try {
List<FTColumnData> columns = new ArrayList<FusionTableImporter.FTColumnData>();
List<List<String>> rows = FusionTableHandler.runFusionTablesSelect(service, "DESCRIBE " + id);
if (rows.size() > 1) {
for (int i = 1; i < rows.size(); i++) {
List<String> row = rows.get(i);
if (row.size() >= 2) {
FTColumnData cd = new FTColumnData();
cd.name = row.get(1);
cd.type = FTColumnType.STRING;
if (row.size() > 2) {
String type = row.get(2).toLowerCase();
if (type.equals("number")) {
cd.type = FTColumnType.NUMBER;
} else if (type.equals("datetime")) {
cd.type = FTColumnType.DATETIME;
} else if (type.equals("location")) {
cd.type = FTColumnType.LOCATION;
}
}
columns.add(cd);
}
Table table = service.table().get(id).execute();
for (Column col : table.getColumns()) {
FTColumnData cd = new FTColumnData();
cd.name = col.getName();
String type = col.getType();
if (type.equals("STRING")) {
cd.type = FTColumnType.STRING;
} else if (type.equals("NUMBER")) {
cd.type = FTColumnType.NUMBER;
} else if (type.equals("DATETIME")) {
cd.type = FTColumnType.DATETIME;
} else if (type.equals("LOCATION")) {
cd.type = FTColumnType.LOCATION;
} else {
// TODO: unknown type
cd.type = FTColumnType.STRING;
}
setProgress(job, docUrlString, -1);
// Force these options for the next call because each fusion table
// is strictly structured with a single line of headers.
JSONUtilities.safePut(options, "ignoreLines", 0); // number of blank lines at the beginning to ignore
JSONUtilities.safePut(options, "headerLines", 1); // number of header lines
TabularImportingParserBase.readTable(
columns.add(cd);
}
setProgress(job, docUrlString, -1);
// Force these options for the next call because each fusion table
// is strictly structured with a single line of headers.
JSONUtilities.safePut(options, "ignoreLines", 0); // number of blank lines at the beginning to ignore
JSONUtilities.safePut(options, "headerLines", 1); // number of header lines
TabularImportingParserBase.readTable(
project,
metadata,
job,
@ -262,14 +258,10 @@ public class FusionTableImporter {
options,
exceptions
);
setProgress(job, docUrlString, 100);
}
setProgress(job, docUrlString, 100);
} catch (IOException e) {
e.printStackTrace();
exceptions.add(e);
} catch (ServiceException e) {
e.printStackTrace();
exceptions.add(e);
}
}

View File

@ -6,15 +6,16 @@ import java.util.List;
import org.json.JSONObject;
import com.google.gdata.client.GoogleService;
import com.google.gdata.client.Service.GDataRequest;
import com.google.gdata.client.Service.GDataRequest.RequestType;
import com.google.gdata.util.ServiceException;
import com.google.api.client.http.AbstractInputStreamContent;
import com.google.api.client.http.ByteArrayContent;
import com.google.api.client.http.HttpResponseException;
import com.google.api.services.fusiontables.Fusiontables;
import com.google.refine.exporters.TabularSerializer;
final class FusionTableSerializer implements TabularSerializer {
GoogleService service;
private static final int BATCH_SIZE = 20;
Fusiontables service;
String tableName;
List<Exception> exceptions;
@ -23,7 +24,7 @@ final class FusionTableSerializer implements TabularSerializer {
StringBuffer sbBatch;
int rows;
FusionTableSerializer(GoogleService service, String tableName, List<Exception> exceptions) {
FusionTableSerializer(Fusiontables service, String tableName, List<Exception> exceptions) {
this.service = service;
this.tableName = tableName;
this.exceptions = exceptions;
@ -44,68 +45,75 @@ final class FusionTableSerializer implements TabularSerializer {
public void addRow(List<CellData> cells, boolean isHeader) {
if (isHeader) {
columnNames = new ArrayList<String>(cells.size());
StringBuffer sb = new StringBuffer();
sb.append("CREATE TABLE '");
sb.append(tableName);
sb.append("' (");
boolean first = true;
for (CellData cellData : cells) {
columnNames.add(cellData.text);
if (first) {
first = false;
} else {
sb.append(',');
}
sb.append("'");
sb.append(cellData.text);
sb.append("': STRING");
}
sb.append(")");
}
try {
String createQuery = sb.toString();
GDataRequest createTableRequest = FusionTableHandler.createFusionTablesPostRequest(
service, RequestType.INSERT, createQuery);
createTableRequest.execute();
List<List<String>> createTableResults =
FusionTableHandler.parseFusionTablesResults(createTableRequest);
if (createTableResults != null && createTableResults.size() == 2) {
tableId = createTableResults.get(1).get(0);
}
tableId = FusionTableHandler.createTable(service, tableName, columnNames);
} catch (Exception e) {
tableId = null;
exceptions.add(e);
}
} else if (tableId != null) {
if (sbBatch == null) {
sbBatch = new StringBuffer();
}
formulateInsert(cells, sbBatch);
formatCsv(cells, sbBatch);
rows++;
if (rows % 20 == 0) {
sendBatch();
if (rows % BATCH_SIZE == 0) {
if (!sendBatch()) {
return;
}
}
}
}
private void sendBatch() {
private boolean sendBatch() {
try {
GDataRequest createTableRequest = FusionTableHandler.createFusionTablesPostRequest(
service, RequestType.INSERT, sbBatch.toString());
createTableRequest.execute();
// FIXME: text/csv doesn't work even though that's what the content is
AbstractInputStreamContent content = ByteArrayContent.fromString("application/octet-stream", sbBatch.toString());
// AbstractInputStreamContent content = new InputStreamContent("application/octet-stream",
// // TODO: we really want to do GZIP compression here
// new ByteArrayInputStream(sbBatch.toString().getBytes("UTF-8")));
Long count = FusionTableHandler.insertRows(service, tableId, content);
if (count != BATCH_SIZE) {
exceptions.add(new IOException("only imported %d of %d rows"));
}
} catch (IOException e) {
exceptions.add(e);
} catch (ServiceException e) {
exceptions.add(e);
if (e instanceof HttpResponseException) {
int code = ((HttpResponseException)e).getStatusCode();
if (code >= 400 && code < 500) {
return false;
}
// 500s appear to be retried automatically by the client library
}
} finally {
sbBatch = null;
}
return true;
}
/**
 * Appends one CSV record for the given cells to the batch buffer.
 *
 * Every field is wrapped in double quotes and the record ends with a newline.
 * Cells beyond the number of known column names are ignored; a null cell or
 * null text is written as an empty quoted field.
 *
 * @param cells cells of the row being exported
 * @param sb buffer the record is appended to
 */
private void formatCsv(List<CellData> cells, StringBuffer sb) {
    int fieldCount = Math.min(cells.size(), columnNames.size());
    for (int i = 0; i < fieldCount; i++) {
        if (i > 0) {
            sb.append(',');
        }
        CellData cellData = cells.get(i);
        sb.append('"');
        if (cellData != null && cellData.text != null) {
            // CSV escapes an embedded double quote by doubling it; a
            // backslash is not an escape character inside quoted fields.
            sb.append(cellData.text.replace("\"", "\"\""));
        }
        sb.append('"');
    }
    sb.append("\n");
}
// Old-style SQL INSERT can be removed once we're sure importRows will work
private void formulateInsert(List<CellData> cells, StringBuffer sb) {
StringBuffer sbColumnNames = new StringBuffer();
StringBuffer sbValues = new StringBuffer();

View File

@ -49,7 +49,9 @@ import org.json.JSONException;
import org.json.JSONObject;
import org.json.JSONWriter;
import com.google.gdata.client.GoogleService;
import com.google.api.services.fusiontables.Fusiontables;
import com.google.api.services.fusiontables.model.Table;
import com.google.api.services.fusiontables.model.TableList;
import com.google.gdata.client.docs.DocsService;
import com.google.gdata.client.spreadsheet.FeedURLFactory;
import com.google.gdata.client.spreadsheet.SpreadsheetService;
@ -59,6 +61,7 @@ import com.google.gdata.data.spreadsheet.SpreadsheetEntry;
import com.google.gdata.data.spreadsheet.SpreadsheetFeed;
import com.google.gdata.data.spreadsheet.WorksheetEntry;
import com.google.gdata.data.spreadsheet.WorksheetFeed;
import com.google.gdata.util.AuthenticationException;
import com.google.gdata.util.ServiceException;
import com.google.refine.ProjectManager;
@ -125,13 +128,15 @@ public class GDataImportingController implements ImportingController {
try {
listSpreadsheets(GDataExtension.getDocsService(token), writer);
listFusionTables(FusionTableHandler.getFusionTablesGoogleService(token), writer);
listFusionTables(FusionTableHandler.getFusionTablesService(token), writer);
} catch (AuthenticationException e) {
TokenCookie.deleteToken(request, response);
} catch (ServiceException e) {
e.printStackTrace();
} finally {
writer.endArray();
writer.endObject();
}
writer.endArray();
writer.endObject();
} catch (JSONException e) {
throw new ServletException(e);
} finally {
@ -167,27 +172,24 @@ public class GDataImportingController implements ImportingController {
}
}
private void listFusionTables(GoogleService service, JSONWriter writer)
private void listFusionTables(Fusiontables service, JSONWriter writer)
throws IOException, ServiceException, JSONException {
List<List<String>> rows = FusionTableHandler.listTables(service);
if (rows.size() > 1) { // excluding headers
for (int i = 1; i < rows.size(); i++) {
List<String> row = rows.get(i);
if (row.size() >= 2) {
String id = row.get(0);
String name = row.get(1);
String link = row.get(2);
writer.object();
writer.key("docId"); writer.value(id);
writer.key("docLink"); writer.value(link);
writer.key("docSelfLink"); writer.value(link);
writer.key("title"); writer.value(name);
writer.key("type"); writer.value("table");
writer.endObject();
}
}
Fusiontables.Table.List listTables = service.table().list();
TableList tablelist = listTables.execute();
for (Table table : tablelist.getItems()) {
String id = table.getTableId();
String name = table.getName();
String link = "https://www.google.com/fusiontables/DataSource?docid=" + id;
// Add JSON object to our stream
writer.object();
writer.key("docId"); writer.value(id);
writer.key("docLink"); writer.value(link);
writer.key("docSelfLink"); writer.value(link);
writer.key("title"); writer.value(name);
writer.key("type"); writer.value("table");
writer.endObject();
}
}

View File

@ -46,7 +46,6 @@ import org.json.JSONWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.gdata.client.GoogleService;
import com.google.gdata.client.docs.DocsService;
import com.google.gdata.client.spreadsheet.CellQuery;
import com.google.gdata.client.spreadsheet.SpreadsheetService;
@ -292,8 +291,9 @@ public class UploadCommand extends Command {
static private String uploadFusionTable(
Project project, final Engine engine, final Properties params,
String token, String name, List<Exception> exceptions) {
GoogleService service = FusionTableHandler.getFusionTablesGoogleService(token);
FusionTableSerializer serializer = new FusionTableSerializer(service, name, exceptions);
FusionTableSerializer serializer = new FusionTableSerializer(
FusionTableHandler.getFusionTablesService(token), name, exceptions);
CustomizableTabularExporterUtilities.exportRows(
project, engine, params, serializer);