- archive and compressed file importer (supports zip, tar, gz, bz2, tar.gz and tar.bz2)

(works by loading the files that have the most common extensions in the archive)
- changed default max heap to 3Gb


git-svn-id: http://google-refine.googlecode.com/svn/trunk@381 7d457c2a-affb-35e4-300a-418c747d4874
This commit is contained in:
Stefano Mazzocchi 2010-04-04 07:48:47 +00:00
parent 65c5aea079
commit 798b2a36ca
5 changed files with 207 additions and 141 deletions

View File

@ -380,7 +380,7 @@ fi
add_option "$JAVA_OPTIONS" add_option "$JAVA_OPTIONS"
if [ "$GRIDWORKS_MEMORY" == "" ] ; then if [ "$GRIDWORKS_MEMORY" == "" ] ; then
GRIDWORKS_MEMORY="1024M" GRIDWORKS_MEMORY="3072M"
fi fi
add_option "-Xms256M -Xmx$GRIDWORKS_MEMORY" add_option "-Xms256M -Xmx$GRIDWORKS_MEMORY"

View File

@ -125,7 +125,7 @@ set JAVA_OPTIONS=
set OPTS=%OPTS% %JAVA_OPTIONS% set OPTS=%OPTS% %JAVA_OPTIONS%
if not "%GRIDWORKS_MEMORY%" == "" goto gotMemory if not "%GRIDWORKS_MEMORY%" == "" goto gotMemory
set GRIDWORKS_MEMORY=1024M set GRIDWORKS_MEMORY=3072M
:gotMemory :gotMemory
set OPTS=%OPTS% -Xms256M -Xmx%GRIDWORKS_MEMORY% set OPTS=%OPTS% -Xms256M -Xmx%GRIDWORKS_MEMORY%

View File

@ -4,6 +4,7 @@ import java.io.BufferedInputStream;
import java.io.File; import java.io.File;
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.FileOutputStream; import java.io.FileOutputStream;
import java.io.FilterInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.InputStreamReader; import java.io.InputStreamReader;
@ -18,9 +19,12 @@ import java.util.Comparator;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Properties; import java.util.Properties;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.zip.GZIPInputStream; import java.util.zip.GZIPInputStream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import javax.servlet.ServletException; import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
@ -108,7 +112,13 @@ public class CreateProjectCommand extends Command {
if (part.isFile()) { if (part.isFile()) {
FilePart filePart = (FilePart) part; FilePart filePart = (FilePart) part;
internalImportFile(project, options, filePart.getFileName(), filePart.getInputStream()); InputStream stream = filePart.getInputStream();
String name = filePart.getFileName().toLowerCase();
try {
internalImportFile(project, options, name, stream);
} finally {
stream.close();
}
} else if (part.isParam()) { } else if (part.isParam()) {
ParamPart paramPart = (ParamPart) part; ParamPart paramPart = (ParamPart) part;
@ -135,6 +145,25 @@ public class CreateProjectCommand extends Command {
} }
} }
class SafeInputStream extends FilterInputStream {
public SafeInputStream(InputStream stream) {
super(stream);
}
@Override
public void close() {
// some libraries attempt to close the input stream while they can't
// read anymore from it... unfortunately this behavior prevents
// the zip input stream from functioning correctly so we just have
// to ignore those close() calls and just close it ourselves
// forcefully later
}
public void reallyClose() throws IOException {
super.close();
}
}
protected void internalImportFile( protected void internalImportFile(
Project project, Project project,
Properties options, Properties options,
@ -142,7 +171,10 @@ public class CreateProjectCommand extends Command {
InputStream inputStream InputStream inputStream
) throws Exception { ) throws Exception {
if (fileName.endsWith(".tar.gz") || fileName.endsWith(".tar.bz2")) { Gridworks.info("Importing " + fileName + "");
if (fileName.endsWith(".zip") || fileName.endsWith(".tar.gz") || fileName.endsWith(".tgz") || fileName.endsWith(".tar.bz2")) {
// first, save the file on disk, since we need two passes and we might // first, save the file on disk, since we need two passes and we might
// not have enough memory to keep it all in there // not have enough memory to keep it all in there
File file = save(inputStream); File file = save(inputStream);
@ -150,24 +182,33 @@ public class CreateProjectCommand extends Command {
// in the first pass, gather statistics about what files are in there // in the first pass, gather statistics about what files are in there
// unfortunately, we have to rely on files extensions, which is horrible but // unfortunately, we have to rely on files extensions, which is horrible but
// better than nothing // better than nothing
BufferedInputStream stream = new BufferedInputStream(new FileInputStream(file));
InputStream is = (fileName.endsWith(".tar.gz")) ? new GZIPInputStream(stream): new CBZip2InputStream(stream);
TarInputStream tis = new TarInputStream(is);
HashMap<String,Integer> ext_map = new HashMap<String,Integer>(); HashMap<String,Integer> ext_map = new HashMap<String,Integer>();
while (true) {
TarEntry entry = tis.getNextEntry(); InputStream is = getStream(fileName, new FileInputStream(file));
if (entry == null) break;
if (!entry.isDirectory()) { // NOTE(SM): unfortunately, java.io does not provide any generalized class for
String name = entry.getName(); // archive-like input streams so while both TarInputStream and ZipInputStream
String ext = getExtension(name)[1]; // behave precisely the same, there is no polymorphic behavior so we have
if (ext_map.containsKey(ext)) { // to treat each instance explicitly... one of those times you wish you had
ext_map.put(ext, ext_map.get(ext) + 1); // closures
} else { if (is instanceof TarInputStream) {
ext_map.put(ext, 1); TarInputStream tis = (TarInputStream) is;
TarEntry te;
while ((te = tis.getNextEntry()) != null) {
if (!te.isDirectory()) {
mapExtension(te.getName(),ext_map);
}
}
} else if (is instanceof ZipInputStream) {
ZipInputStream zis = (ZipInputStream) is;
ZipEntry ze;
while ((ze = zis.getNextEntry()) != null) {
if (!ze.isDirectory()) {
mapExtension(ze.getName(),ext_map);
} }
} }
} }
stream.close(); is.close();
// sort extensions by how often they appear // sort extensions by how often they appear
List<Entry<String,Integer>> values = new ArrayList<Entry<String,Integer>>(ext_map.entrySet()); List<Entry<String,Integer>> values = new ArrayList<Entry<String,Integer>>(ext_map.entrySet());
@ -193,17 +234,42 @@ public class CreateProjectCommand extends Command {
} }
} }
} }
Gridworks.log("**** Most frequent extensions: " + exts.toString()); Gridworks.log("Most frequent extensions: " + exts.toString());
// second pass, load the data for real
} else if (fileName.endsWith(".zip")) { is = getStream(fileName, new FileInputStream(file));
SafeInputStream sis = new SafeInputStream(is);
if (is instanceof TarInputStream) {
TarInputStream tis = (TarInputStream) is;
TarEntry te;
while ((te = tis.getNextEntry()) != null) {
if (!te.isDirectory()) {
String name = te.getName();
String ext = getExtension(name)[1];
if (exts.contains(ext)) {
internalImportFile(project, options, name, sis);
}
}
}
} else if (is instanceof ZipInputStream) {
ZipInputStream zis = (ZipInputStream) is;
ZipEntry ze;
while ((ze = zis.getNextEntry()) != null) {
if (!ze.isDirectory()) {
String name = ze.getName();
String ext = getExtension(name)[1];
if (exts.contains(ext)) {
internalImportFile(project, options, name, sis);
}
}
}
}
sis.reallyClose();
} else if (fileName.endsWith(".gz")) { } else if (fileName.endsWith(".gz")) {
String[] frags = getExtension(fileName); internalImportFile(project, options, getExtension(fileName)[0], new GZIPInputStream(inputStream));
internalImportFile(project, options, frags[0], new GZIPInputStream(inputStream));
} else if (fileName.endsWith(".bz2")) { } else if (fileName.endsWith(".bz2")) {
String[] frags = getExtension(fileName); internalImportFile(project, options, getExtension(fileName)[0], new CBZip2InputStream(inputStream));
internalImportFile(project, options, frags[0], new CBZip2InputStream(inputStream));
} else { } else {
load(project, options, fileName, inputStream); load(project, options, fileName, inputStream);
} }
@ -218,7 +284,6 @@ public class CreateProjectCommand extends Command {
private void load(Project project, Properties options, String fileName, InputStream inputStream) throws Exception { private void load(Project project, Properties options, String fileName, InputStream inputStream) throws Exception {
Importer importer = guessImporter(options, null, fileName); Importer importer = guessImporter(options, null, fileName);
internalInvokeImporter(project, importer, options, inputStream, null); internalInvokeImporter(project, importer, options, inputStream, null);
inputStream.close();
} }
private File save(InputStream is) throws IOException { private File save(InputStream is) throws IOException {
@ -229,6 +294,27 @@ public class CreateProjectCommand extends Command {
return temp; return temp;
} }
private void mapExtension(String name, Map<String,Integer> ext_map) {
String ext = getExtension(name)[1];
if (ext_map.containsKey(ext)) {
ext_map.put(ext, ext_map.get(ext) + 1);
} else {
ext_map.put(ext, 1);
}
}
private InputStream getStream(String fileName, InputStream is) throws IOException {
if (fileName.endsWith(".tar.gz") || fileName.endsWith(".tgz")) {
return new TarInputStream(new GZIPInputStream(is));
} else if (fileName.endsWith(".tar.bz2")) {
return new TarInputStream(new CBZip2InputStream(is));
} else if (fileName.endsWith(".zip")) {
return new ZipInputStream(is);
} else {
return null;
}
}
private String[] getExtension(String filename) { private String[] getExtension(String filename) {
String[] result = new String[2]; String[] result = new String[2];
int ext_index = filename.lastIndexOf("."); int ext_index = filename.lastIndexOf(".");
@ -296,39 +382,21 @@ public class CreateProjectCommand extends Command {
String encoding String encoding
) throws Exception { ) throws Exception {
int limit = -1; int limit = getIntegerOption("limit",options,-1);
int skip = 0; int skip = getIntegerOption("skip",options,0);
if (options.containsKey("limit")) { if (importer.takesReader()) {
String s = options.getProperty("limit");
try {
limit = Integer.parseInt(s);
} catch (Exception e) {
}
}
if (options.containsKey("skip")) {
String s = options.getProperty("skip");
try {
skip = Integer.parseInt(s);
} catch (Exception e) {
}
}
BufferedInputStream inputStream = new BufferedInputStream(rawInputStream); BufferedInputStream inputStream = new BufferedInputStream(rawInputStream);
if (importer.takesReader()) { // NOTE(SM): The ICU4J char detection code requires the input stream to support mark/reset.
/* // Unfortunately, not all ServletInputStream implementations are marking, so we need do
* NOTE(SM): The ICU4J char detection code requires the input stream to support mark/reset. // this memory-expensive wrapping to make it work. It's far from ideal but I don't have
* Unfortunately, not all ServletInputStream implementations are marking, so we need do // a more efficient solution.
* this memory-expensive wrapping to make it work. It's far from ideal but I don't have
* a more efficient solution.
*/
byte[] bytes = new byte[1024 * 4]; byte[] bytes = new byte[1024 * 4];
{
inputStream.mark(bytes.length); inputStream.mark(bytes.length);
inputStream.read(bytes); inputStream.read(bytes);
inputStream.reset(); inputStream.reset();
}
CharsetDetector detector = new CharsetDetector(); CharsetDetector detector = new CharsetDetector();
detector.setDeclaredEncoding("utf8"); // most of the content on the web is encoded in UTF-8 so start with that detector.setDeclaredEncoding("utf8"); // most of the content on the web is encoded in UTF-8 so start with that
@ -361,7 +429,7 @@ public class CreateProjectCommand extends Command {
importer.read(reader, project, options, skip, limit); importer.read(reader, project, options, skip, limit);
} else { } else {
importer.read(inputStream, project, options, skip, limit); importer.read(rawInputStream, project, options, skip, limit);
} }
} }
@ -372,23 +440,8 @@ public class CreateProjectCommand extends Command {
Reader reader Reader reader
) throws Exception { ) throws Exception {
int limit = -1; int limit = getIntegerOption("limit",options,-1);
int skip = 0; int skip = getIntegerOption("skip",options,0);
if (options.containsKey("limit")) {
String s = options.getProperty("limit");
try {
limit = Integer.parseInt(s);
} catch (Exception e) {
}
}
if (options.containsKey("skip")) {
String s = options.getProperty("skip");
try {
skip = Integer.parseInt(s);
} catch (Exception e) {
}
}
importer.read(reader, project, options, skip, limit); importer.read(reader, project, options, skip, limit);
} }
@ -434,4 +487,16 @@ public class CreateProjectCommand extends Command {
return new TsvCsvImporter(); return new TsvCsvImporter();
} }
private int getIntegerOption(String name, Properties options, int def) {
int value = def;
if (options.containsKey(name)) {
String s = options.getProperty(name);
try {
value = Integer.parseInt(s);
} catch (Exception e) {
}
}
return value;
}
} }

View File

@ -21,7 +21,6 @@ public class TsvCsvImporter implements Importer {
throws Exception { throws Exception {
LineNumberReader lnReader = new LineNumberReader(reader); LineNumberReader lnReader = new LineNumberReader(reader);
try {
String sep = options.getProperty("separator"); // auto-detect if not present String sep = options.getProperty("separator"); // auto-detect if not present
String line = null; String line = null;
boolean first = true; boolean first = true;
@ -78,9 +77,6 @@ public class TsvCsvImporter implements Importer {
} }
} }
} }
} finally {
lnReader.close();
}
} }
public void read(InputStream inputStream, Project project, public void read(InputStream inputStream, Project project,

View File

@ -1,8 +1,8 @@
package com.metaweb.gridworks.importers; package com.metaweb.gridworks.importers;
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.InputStream; import java.io.InputStream;
import java.io.PushbackInputStream;
import java.io.Reader; import java.io.Reader;
import java.util.Properties; import java.util.Properties;
@ -13,6 +13,8 @@ import com.metaweb.gridworks.model.Project;
public class XmlImporter implements Importer { public class XmlImporter implements Importer {
public static final int BUFFER_SIZE = 64 * 1024;
public boolean takesReader() { public boolean takesReader() {
return false; return false;
} }
@ -30,29 +32,32 @@ public class XmlImporter implements Importer {
int skip, int skip,
int limit int limit
) throws Exception { ) throws Exception {
BufferedInputStream bis = new BufferedInputStream(inputStream); PushbackInputStream pis = new PushbackInputStream(inputStream,BUFFER_SIZE);
String[] recordPath = null; String[] recordPath = null;
{ {
byte[] buffer = new byte[64 * 1024]; byte[] buffer = new byte[BUFFER_SIZE];
int bytes_read = 0;
bis.mark(buffer.length); while (bytes_read < BUFFER_SIZE) {
int c = bis.read(buffer); int c = pis.read(buffer, bytes_read, BUFFER_SIZE - bytes_read);
bis.reset(); if (c == -1) break;
bytes_read +=c ;
}
pis.unread(buffer, 0, bytes_read);
if (options.containsKey("importer-record-tag")) { if (options.containsKey("importer-record-tag")) {
recordPath = XmlImportUtilities.detectPathFromTag( recordPath = XmlImportUtilities.detectPathFromTag(
new ByteArrayInputStream(buffer, 0, c), new ByteArrayInputStream(buffer, 0, bytes_read),
options.getProperty("importer-record-tag")); options.getProperty("importer-record-tag"));
} else { } else {
recordPath = XmlImportUtilities.detectRecordElement( recordPath = XmlImportUtilities.detectRecordElement(
new ByteArrayInputStream(buffer, 0, c)); new ByteArrayInputStream(buffer, 0, bytes_read));
} }
} }
ImportColumnGroup rootColumnGroup = new ImportColumnGroup(); ImportColumnGroup rootColumnGroup = new ImportColumnGroup();
XmlImportUtilities.importXml(bis, project, recordPath, rootColumnGroup); XmlImportUtilities.importXml(pis, project, recordPath, rootColumnGroup);
XmlImportUtilities.createColumnsFromImport(project, rootColumnGroup); XmlImportUtilities.createColumnsFromImport(project, rootColumnGroup);
project.columnModel.update(); project.columnModel.update();