From 798b2a36ca788eae838fec6509aaae1171f3c99c Mon Sep 17 00:00:00 2001 From: Stefano Mazzocchi Date: Sun, 4 Apr 2010 07:48:47 +0000 Subject: [PATCH] - archive and compressed file importer (supports zip, tar, gz, bz2, tar.gz and tar.bz2) (works by loading the files that have the most common extensions in the archive) - changed default max heap to 3Gb git-svn-id: http://google-refine.googlecode.com/svn/trunk@381 7d457c2a-affb-35e4-300a-418c747d4874 --- gridworks | 2 +- gridworks.bat | 2 +- .../commands/edit/CreateProjectCommand.java | 217 ++++++++++++------ .../gridworks/importers/TsvCsvImporter.java | 98 ++++---- .../gridworks/importers/XmlImporter.java | 29 ++- 5 files changed, 207 insertions(+), 141 deletions(-) diff --git a/gridworks b/gridworks index 345768d58..1ad66c473 100755 --- a/gridworks +++ b/gridworks @@ -380,7 +380,7 @@ fi add_option "$JAVA_OPTIONS" if [ "$GRIDWORKS_MEMORY" == "" ] ; then - GRIDWORKS_MEMORY="1024M" + GRIDWORKS_MEMORY="3072M" fi add_option "-Xms256M -Xmx$GRIDWORKS_MEMORY" diff --git a/gridworks.bat b/gridworks.bat index e4b46762b..9b0a9f44e 100644 --- a/gridworks.bat +++ b/gridworks.bat @@ -125,7 +125,7 @@ set JAVA_OPTIONS= set OPTS=%OPTS% %JAVA_OPTIONS% if not "%GRIDWORKS_MEMORY%" == "" goto gotMemory -set GRIDWORKS_MEMORY=1024M +set GRIDWORKS_MEMORY=3072M :gotMemory set OPTS=%OPTS% -Xms256M -Xmx%GRIDWORKS_MEMORY% diff --git a/src/main/java/com/metaweb/gridworks/commands/edit/CreateProjectCommand.java b/src/main/java/com/metaweb/gridworks/commands/edit/CreateProjectCommand.java index be77288c5..4c0d8a9a4 100644 --- a/src/main/java/com/metaweb/gridworks/commands/edit/CreateProjectCommand.java +++ b/src/main/java/com/metaweb/gridworks/commands/edit/CreateProjectCommand.java @@ -4,6 +4,7 @@ import java.io.BufferedInputStream; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; +import java.io.FilterInputStream; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; @@ -18,9 +19,12 @@ import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Properties; import java.util.Map.Entry; import java.util.zip.GZIPInputStream; +import java.util.zip.ZipEntry; +import java.util.zip.ZipInputStream; import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; @@ -108,7 +112,13 @@ public class CreateProjectCommand extends Command { if (part.isFile()) { FilePart filePart = (FilePart) part; - internalImportFile(project, options, filePart.getFileName(), filePart.getInputStream()); + InputStream stream = filePart.getInputStream(); + String name = filePart.getFileName().toLowerCase(); + try { + internalImportFile(project, options, name, stream); + } finally { + stream.close(); + } } else if (part.isParam()) { ParamPart paramPart = (ParamPart) part; @@ -134,6 +144,25 @@ public class CreateProjectCommand extends Command { } } } + + class SafeInputStream extends FilterInputStream { + public SafeInputStream(InputStream stream) { + super(stream); + } + + @Override + public void close() { + // some libraries attempt to close the input stream while they can't + // read anymore from it... unfortunately this behavior prevents + // the zip input stream from functioning correctly so we just have + // to ignore those close() calls and just close it ourselves + // forcefully later + } + + public void reallyClose() throws IOException { + super.close(); + } + } protected void internalImportFile( Project project, @@ -141,8 +170,11 @@ public class CreateProjectCommand extends Command { String fileName, InputStream inputStream ) throws Exception { - - if (fileName.endsWith(".tar.gz") || fileName.endsWith(".tar.bz2")) { + + Gridworks.info("Importing " + fileName + ""); + + if (fileName.endsWith(".zip") || fileName.endsWith(".tar.gz") || fileName.endsWith(".tgz") || fileName.endsWith(".tar.bz2")) { + // first, save the file on disk, since we need two passes and we might // not have enough memory to keep it all in there File file = save(inputStream); @@ -150,24 +182,33 @@ public class CreateProjectCommand extends Command { // in the first pass, gather statistics about what files are in there // unfortunately, we have to rely on files extensions, which is horrible but // better than nothing - BufferedInputStream stream = new BufferedInputStream(new FileInputStream(file)); - InputStream is = (fileName.endsWith(".tar.gz")) ? new GZIPInputStream(stream): new CBZip2InputStream(stream); - TarInputStream tis = new TarInputStream(is); HashMap ext_map = new HashMap(); - while (true) { - TarEntry entry = tis.getNextEntry(); - if (entry == null) break; - if (!entry.isDirectory()) { - String name = entry.getName(); - String ext = getExtension(name)[1]; - if (ext_map.containsKey(ext)) { - ext_map.put(ext, ext_map.get(ext) + 1); - } else { - ext_map.put(ext, 1); + + InputStream is = getStream(fileName, new FileInputStream(file)); + + // NOTE(SM): unfortunately, java.io does not provide any generalized class for + // archive-like input streams so while both TarInputStream and ZipInputStream + // behave precisely the same, there is no polymorphic behavior so we have + // to treat each instance explicitly... one of those times you wish you had + // closures + if (is instanceof TarInputStream) { + TarInputStream tis = (TarInputStream) is; + TarEntry te; + while ((te = tis.getNextEntry()) != null) { + if (!te.isDirectory()) { + mapExtension(te.getName(),ext_map); + } + } + } else if (is instanceof ZipInputStream) { + ZipInputStream zis = (ZipInputStream) is; + ZipEntry ze; + while ((ze = zis.getNextEntry()) != null) { + if (!ze.isDirectory()) { + mapExtension(ze.getName(),ext_map); } } } - stream.close(); + is.close(); // sort extensions by how often they appear List> values = new ArrayList>(ext_map.entrySet()); @@ -193,17 +234,42 @@ public class CreateProjectCommand extends Command { } } } - Gridworks.log("**** Most frequent extensions: " + exts.toString()); - - - } else if (fileName.endsWith(".zip")) { - + Gridworks.log("Most frequent extensions: " + exts.toString()); + + // second pass, load the data for real + is = getStream(fileName, new FileInputStream(file)); + SafeInputStream sis = new SafeInputStream(is); + if (is instanceof TarInputStream) { + TarInputStream tis = (TarInputStream) is; + TarEntry te; + while ((te = tis.getNextEntry()) != null) { + if (!te.isDirectory()) { + String name = te.getName(); + String ext = getExtension(name)[1]; + if (exts.contains(ext)) { + internalImportFile(project, options, name, sis); + } + } + } + } else if (is instanceof ZipInputStream) { + ZipInputStream zis = (ZipInputStream) is; + ZipEntry ze; + while ((ze = zis.getNextEntry()) != null) { + if (!ze.isDirectory()) { + String name = ze.getName(); + String ext = getExtension(name)[1]; + if (exts.contains(ext)) { + internalImportFile(project, options, name, sis); + } + } + } + } + sis.reallyClose(); + } else if (fileName.endsWith(".gz")) { - String[] frags = getExtension(fileName); - internalImportFile(project, options, frags[0], new GZIPInputStream(inputStream)); + internalImportFile(project, options, getExtension(fileName)[0], new GZIPInputStream(inputStream)); } else if (fileName.endsWith(".bz2")) { - String[] frags = getExtension(fileName); - internalImportFile(project, options, frags[0], new CBZip2InputStream(inputStream)); + internalImportFile(project, options, getExtension(fileName)[0], new CBZip2InputStream(inputStream)); } else { load(project, options, fileName, inputStream); } @@ -218,7 +284,6 @@ public class CreateProjectCommand extends Command { private void load(Project project, Properties options, String fileName, InputStream inputStream) throws Exception { Importer importer = guessImporter(options, null, fileName); internalInvokeImporter(project, importer, options, inputStream, null); - inputStream.close(); } private File save(InputStream is) throws IOException { @@ -229,6 +294,27 @@ public class CreateProjectCommand extends Command { return temp; } + private void mapExtension(String name, Map ext_map) { + String ext = getExtension(name)[1]; + if (ext_map.containsKey(ext)) { + ext_map.put(ext, ext_map.get(ext) + 1); + } else { + ext_map.put(ext, 1); + } + } + + private InputStream getStream(String fileName, InputStream is) throws IOException { + if (fileName.endsWith(".tar.gz") || fileName.endsWith(".tgz")) { + return new TarInputStream(new GZIPInputStream(is)); + } else if (fileName.endsWith(".tar.bz2")) { + return new TarInputStream(new CBZip2InputStream(is)); + } else if (fileName.endsWith(".zip")) { + return new ZipInputStream(is); + } else { + return null; + } + } + private String[] getExtension(String filename) { String[] result = new String[2]; int ext_index = filename.lastIndexOf("."); @@ -296,39 +382,21 @@ public class CreateProjectCommand extends Command { String encoding ) throws Exception { - int limit = -1; - int skip = 0; - - if (options.containsKey("limit")) { - String s = options.getProperty("limit"); - try { - limit = Integer.parseInt(s); - } catch (Exception e) { - } - } - if (options.containsKey("skip")) { - String s = options.getProperty("skip"); - try { - skip = Integer.parseInt(s); - } catch (Exception e) { - } - } - - BufferedInputStream inputStream = new BufferedInputStream(rawInputStream); - + int limit = getIntegerOption("limit",options,-1); + int skip = getIntegerOption("skip",options,0); + if (importer.takesReader()) { - /* - * NOTE(SM): The ICU4J char detection code requires the input stream to support mark/reset. - * Unfortunately, not all ServletInputStream implementations are marking, so we need do - * this memory-expensive wrapping to make it work. It's far from ideal but I don't have - * a more efficient solution. - */ + + BufferedInputStream inputStream = new BufferedInputStream(rawInputStream); + + // NOTE(SM): The ICU4J char detection code requires the input stream to support mark/reset. + // Unfortunately, not all ServletInputStream implementations are marking, so we need do + // this memory-expensive wrapping to make it work. It's far from ideal but I don't have + // a more efficient solution. byte[] bytes = new byte[1024 * 4]; - { - inputStream.mark(bytes.length); - inputStream.read(bytes); - inputStream.reset(); - } + inputStream.mark(bytes.length); + inputStream.read(bytes); + inputStream.reset(); CharsetDetector detector = new CharsetDetector(); detector.setDeclaredEncoding("utf8"); // most of the content on the web is encoded in UTF-8 so start with that @@ -361,7 +429,7 @@ public class CreateProjectCommand extends Command { importer.read(reader, project, options, skip, limit); } else { - importer.read(inputStream, project, options, skip, limit); + importer.read(rawInputStream, project, options, skip, limit); } } @@ -372,23 +440,8 @@ public class CreateProjectCommand extends Command { Reader reader ) throws Exception { - int limit = -1; - int skip = 0; - - if (options.containsKey("limit")) { - String s = options.getProperty("limit"); - try { - limit = Integer.parseInt(s); - } catch (Exception e) { - } - } - if (options.containsKey("skip")) { - String s = options.getProperty("skip"); - try { - skip = Integer.parseInt(s); - } catch (Exception e) { - } - } + int limit = getIntegerOption("limit",options,-1); + int skip = getIntegerOption("skip",options,0); importer.read(reader, project, options, skip, limit); } @@ -434,4 +487,16 @@ public class CreateProjectCommand extends Command { return new TsvCsvImporter(); } + + private int getIntegerOption(String name, Properties options, int def) { + int value = def; + if (options.containsKey(name)) { + String s = options.getProperty(name); + try { + value = Integer.parseInt(s); + } catch (Exception e) { + } + } + return value; + } } diff --git a/src/main/java/com/metaweb/gridworks/importers/TsvCsvImporter.java b/src/main/java/com/metaweb/gridworks/importers/TsvCsvImporter.java index 64559733e..90a7e2b9b 100644 --- a/src/main/java/com/metaweb/gridworks/importers/TsvCsvImporter.java +++ b/src/main/java/com/metaweb/gridworks/importers/TsvCsvImporter.java @@ -21,65 +21,61 @@ public class TsvCsvImporter implements Importer { throws Exception { LineNumberReader lnReader = new LineNumberReader(reader); - try { - String sep = options.getProperty("separator"); // auto-detect if not present - String line = null; - boolean first = true; - int cellCount = 1; - RowParser parser = (sep == null || (sep.length() == 0)) ? null : new SeparatorRowParser(sep); + String sep = options.getProperty("separator"); // auto-detect if not present + String line = null; + boolean first = true; + int cellCount = 1; + RowParser parser = (sep == null || (sep.length() == 0)) ? null : new SeparatorRowParser(sep); + + int rowsWithData = 0; + while ((line = lnReader.readLine()) != null) { + if (line.trim().length() == 0) { + continue; + } - int rowsWithData = 0; - while ((line = lnReader.readLine()) != null) { - if (line.trim().length() == 0) { - continue; - } - - if (parser == null) { - int tab = line.indexOf('\t'); - if (tab >= 0) { - sep = "\t"; - parser = new SeparatorRowParser(sep); - } else { - sep = ","; - parser = new CSVRowParser(); - } - } - - if (first) { - String[] cells = StringUtils.splitPreserveAllTokens(line, sep); - - first = false; - for (int c = 0; c < cells.length; c++) { - String cell = cells[c]; - if (cell.startsWith("\"") && cell.endsWith("\"")) { - cell = cell.substring(1, cell.length() - 1); - } - - Column column = new Column(c, cell); - - project.columnModel.columns.add(column); - } - - cellCount = cells.length; + if (parser == null) { + int tab = line.indexOf('\t'); + if (tab >= 0) { + sep = "\t"; + parser = new SeparatorRowParser(sep); } else { - Row row = new Row(cellCount); + sep = ","; + parser = new CSVRowParser(); + } + } + + if (first) { + String[] cells = StringUtils.splitPreserveAllTokens(line, sep); + + first = false; + for (int c = 0; c < cells.length; c++) { + String cell = cells[c]; + if (cell.startsWith("\"") && cell.endsWith("\"")) { + cell = cell.substring(1, cell.length() - 1); + } - if (parser.parseRow(row, line)) { - rowsWithData++; + Column column = new Column(c, cell); + + project.columnModel.columns.add(column); + } + + cellCount = cells.length; + } else { + Row row = new Row(cellCount); + + if (parser.parseRow(row, line)) { + rowsWithData++; + + if (skip <= 0 || rowsWithData > skip) { + project.rows.add(row); + project.columnModel.setMaxCellIndex(row.cells.size()); - if (skip <= 0 || rowsWithData > skip) { - project.rows.add(row); - project.columnModel.setMaxCellIndex(row.cells.size()); - - if (limit > 0 && project.rows.size() >= limit) { - break; - } + if (limit > 0 && project.rows.size() >= limit) { + break; } } } } - } finally { - lnReader.close(); } } diff --git a/src/main/java/com/metaweb/gridworks/importers/XmlImporter.java b/src/main/java/com/metaweb/gridworks/importers/XmlImporter.java index 995d14f95..368582b38 100644 --- a/src/main/java/com/metaweb/gridworks/importers/XmlImporter.java +++ b/src/main/java/com/metaweb/gridworks/importers/XmlImporter.java @@ -1,8 +1,8 @@ package com.metaweb.gridworks.importers; -import java.io.BufferedInputStream; import java.io.ByteArrayInputStream; -import java.io.InputStream; +import java.io.InputStream; +import java.io.PushbackInputStream; import java.io.Reader; import java.util.Properties; @@ -13,6 +13,8 @@ import com.metaweb.gridworks.model.Project; public class XmlImporter implements Importer { + public static final int BUFFER_SIZE = 64 * 1024; + public boolean takesReader() { return false; } @@ -30,29 +32,32 @@ public class XmlImporter implements Importer { int skip, int limit ) throws Exception { - BufferedInputStream bis = new BufferedInputStream(inputStream); + PushbackInputStream pis = new PushbackInputStream(inputStream,BUFFER_SIZE); String[] recordPath = null; { - byte[] buffer = new byte[64 * 1024]; - - bis.mark(buffer.length); - int c = bis.read(buffer); - bis.reset(); + byte[] buffer = new byte[BUFFER_SIZE]; + int bytes_read = 0; + while (bytes_read < BUFFER_SIZE) { + int c = pis.read(buffer, bytes_read, BUFFER_SIZE - bytes_read); + if (c == -1) break; + bytes_read +=c ; + } + pis.unread(buffer, 0, bytes_read); if (options.containsKey("importer-record-tag")) { recordPath = XmlImportUtilities.detectPathFromTag( - new ByteArrayInputStream(buffer, 0, c), + new ByteArrayInputStream(buffer, 0, bytes_read), options.getProperty("importer-record-tag")); } else { recordPath = XmlImportUtilities.detectRecordElement( - new ByteArrayInputStream(buffer, 0, c)); + new ByteArrayInputStream(buffer, 0, bytes_read)); } } - + ImportColumnGroup rootColumnGroup = new ImportColumnGroup(); - XmlImportUtilities.importXml(bis, project, recordPath, rootColumnGroup); + XmlImportUtilities.importXml(pis, project, recordPath, rootColumnGroup); XmlImportUtilities.createColumnsFromImport(project, rootColumnGroup); project.columnModel.update();