Hello,

Rob (CCed) emailed in February (subject: "EOFException: Reading file.grb2.ncx at 0 file length = 0") about a file descriptor leak observed in the netCDF Java library when reading invalid GRIB index files. I was able to reliably replicate the leak locally by manually truncating the index file to zero bytes before opening a GRIB file. I observed the leak by monitoring the set of open file descriptors, using both the Java UnixOperatingSystemMXBean class and the lsof command.
The attached source files are a patch for this issue against the edu.ucar:grib:4.3.21 source code. The patch is quite simple; I describe the individual changes below. Would it be possible to apply these changes and make a new release? If so, when would the release be made? We are currently maintaining our own branch of the GRIB library because we need this fix in operations, which is undesirable for a number of reasons. I can provide a main() method that demonstrates this problem if it would be of interest.
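For illustration only, here is a minimal sketch of what such a demonstration program might look like. This is not the authors' actual main(): the file paths are hypothetical, and it assumes a HotSpot JVM on a Unix-like system so the OperatingSystemMXBean can be cast to com.sun.management.UnixOperatingSystemMXBean.

import java.io.RandomAccessFile;
import java.lang.management.ManagementFactory;
import com.sun.management.UnixOperatingSystemMXBean;
import ucar.nc2.NetcdfFile;

public class GribFdLeakDemo {

  // Hypothetical paths: a GRIB2 file and its .ncx collection index.
  private static final String GRIB_FILE = "/tmp/sample.grb2";
  private static final String INDEX_FILE = GRIB_FILE + ".ncx";

  public static void main(String[] args) throws Exception {
    // Simulate the corrupt index described above: truncate it to 0 bytes.
    RandomAccessFile idx = new RandomAccessFile(INDEX_FILE, "rw");
    idx.setLength(0);
    idx.close();

    UnixOperatingSystemMXBean os =
        (UnixOperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();

    for (int i = 0; i < 100; i++) {
      NetcdfFile ncf = null;
      try {
        // Reading the invalid index is what leaked the descriptor.
        ncf = NetcdfFile.open(GRIB_FILE);
      } catch (Exception e) {
        // Opening may fail outright with a zero-length index; the descriptor can still be leaked.
      } finally {
        if (ncf != null) ncf.close();
      }
      // With the leak present, this count is expected to grow on every iteration.
      System.out.println("open file descriptors = " + os.getOpenFileDescriptorCount());
    }
  }
}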
Thanks!

Aaron and Rob

===============================

The source of the problem was in Grib1CollectionBuilder and Grib2CollectionBuilder (the code changes to each are identical):
In /readIndex/(String filename), a new RandomAccessFile was created but never closed. This fix definitively resolved our file leak, and logically the RandomAccessFile created here must be closed in this method, since it is used only here.
< return readIndex( new RandomAccessFile(filename, "r") );
---
> RandomAccessFile raf = new RandomAccessFile( filename, "r" );
> boolean success = readIndex( raf );
> raf.close();
> return success;

In /readOrCreateIndex/() there is more code that opens a RandomAccessFile but may not close it. This change did not resolve our issue, but it is similar to the fix above and may resolve a file leak in other situations. In this case I note that gc.setIndexRaf(indexRaf) is called just before the changed code segment, and therefore the RAF may be closed properly without this change; I don't know the code and flow of operations well enough to be sure.
< readIndex(indexRaf);
---
> boolean success = readIndex(indexRaf);
> if( !success ){
>   indexRaf.close();
> }
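As a side note, and purely as an editorial sketch (not part of the attached patch): the new close in readIndex(String) could equivalently be written with try/finally, so the descriptor opened in this method is released even if readIndex(RandomAccessFile) were ever changed to propagate an exception rather than catching it and returning false:

public boolean readIndex(String filename) throws IOException {
  RandomAccessFile raf = new RandomAccessFile(filename, "r");
  try {
    // readIndex(RandomAccessFile) currently catches Throwable and returns false on error
    return readIndex(raf);
  } finally {
    // always release the file opened in this method
    raf.close();
  }
}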
/* * Copyright (c) 1998 - 2011. University Corporation for Atmospheric Research/Unidata * Portions of this software were developed by the Unidata Program at the * University Corporation for Atmospheric Research. * * Access and use of this software shall impose the following obligations * and understandings on the user. The user is granted the right, without * any fee or cost, to use, copy, modify, alter, enhance and distribute * this software, and any derivative works thereof, and its supporting * documentation for any purpose whatsoever, provided that this entire * notice appears in all copies of the software, derivative works and * supporting documentation. Further, UCAR requests that the user credit * UCAR/Unidata in any publications that result from the use of this * software or in any product that includes this software. The names UCAR * and/or Unidata, however, may not be used in any advertising or publicity * to endorse or promote any products or commercial entity unless specific * written permission is obtained from UCAR/Unidata. The user also * understands that UCAR/Unidata is not obligated to provide the user with * any support, consulting, training or assistance of any kind with regard * to the use, operation and performance of this software nor to provide * the user with any updates, revisions, new versions or "bug fixes." * * THIS SOFTWARE IS PROVIDED BY UCAR/UNIDATA "AS IS" AND ANY EXPRESS OR * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE * DISCLAIMED. IN NO EVENT SHALL UCAR/UNIDATA BE LIABLE FOR ANY SPECIAL, * INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING * FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, * NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION * WITH THE ACCESS, USE OR PERFORMANCE OF THIS SOFTWARE. */ package ucar.nc2.grib.grib1; import com.google.protobuf.ByteString; import thredds.featurecollection.FeatureCollectionConfig; import thredds.inventory.CollectionManager; import thredds.inventory.CollectionManagerSingleFile; import thredds.inventory.MFile; import ucar.nc2.grib.*; import ucar.nc2.grib.grib1.tables.Grib1Customizer; import ucar.nc2.stream.NcStream; import ucar.unidata.io.RandomAccessFile; import ucar.unidata.util.Parameter; import java.io.*; import java.util.*; /** * Build a GribCollection object for Grib-1 files. Manage grib collection index (ncx). * Covers GribCollectionProto, which serializes and deserializes. * Rectilyse means to turn the collection into a multidimensional variable. 
* * @author caron * @since 4/6/11 */ public class Grib1CollectionBuilder extends GribCollectionBuilder { protected static final int minVersionSingle = 9; // if single file, this version and above is ok protected static final int version = 10; public static final String MAGIC_START = "Grib1CollectionIndex"; // from a single file, read in the index, create if it doesnt exist or is out of date static public GribCollection readOrCreateIndexFromSingleFile(MFile file, CollectionManager.Force force, FeatureCollectionConfig.GribConfig config, org.slf4j.Logger logger) throws IOException { Grib1CollectionBuilder builder = new Grib1CollectionBuilder(file, config, logger); builder.readOrCreateIndex(force); return builder.gc; } // called by tdm static public boolean update(CollectionManager dcm, org.slf4j.Logger logger) throws IOException { Grib1CollectionBuilder builder = new Grib1CollectionBuilder(dcm, logger); if (!builder.needsUpdate()) return false; builder.readOrCreateIndex(CollectionManager.Force.always); builder.gc.close(); return true; } // from a collection, read in the index, create if it doesnt exist or is out of date // assume that the CollectionManager is up to date, eg doesnt need to be scanned static public GribCollection factory(CollectionManager dcm, CollectionManager.Force force, org.slf4j.Logger logger) throws IOException { Grib1CollectionBuilder builder = new Grib1CollectionBuilder(dcm, logger); builder.readOrCreateIndex(force); return builder.gc; } // read in the index, index raf already open static public GribCollection createFromIndex(String name, File directory, RandomAccessFile indexRaf, FeatureCollectionConfig.GribConfig config, org.slf4j.Logger logger) throws IOException { Grib1CollectionBuilder builder = new Grib1CollectionBuilder(name, directory, config, logger); if (builder.readIndex(indexRaf)) { return builder.gc; } throw new IOException("Reading index failed"); } // this writes the index always static public boolean writeIndexFile(File indexFile, CollectionManager dcm, org.slf4j.Logger logger) throws IOException { Grib1CollectionBuilder builder = new Grib1CollectionBuilder(dcm, logger); return builder.createIndex(indexFile); } //////////////////////////////////////////////////////////////// //protected final List<CollectionManager> collections = new ArrayList<CollectionManager>(); protected GribCollection gc; protected Grib1Customizer cust; // single file private Grib1CollectionBuilder(MFile file, FeatureCollectionConfig.GribConfig config, org.slf4j.Logger logger) throws IOException { super(new CollectionManagerSingleFile(file, logger), true, logger); try { if (config != null) dcm.putAuxInfo(FeatureCollectionConfig.AUX_GRIB_CONFIG, config); this.gc = new Grib1Collection(file.getName(), new File(dcm.getRoot()), config); } catch (Exception e) { ByteArrayOutputStream bos = new ByteArrayOutputStream(10000); e.printStackTrace(new PrintStream(bos)); logger.error("Failed to create index for single file", e); throw new IOException(e); } } private Grib1CollectionBuilder(CollectionManager dcm, org.slf4j.Logger logger) { super(dcm, false, logger); FeatureCollectionConfig.GribConfig config = (FeatureCollectionConfig.GribConfig) dcm.getAuxInfo(FeatureCollectionConfig.AUX_GRIB_CONFIG); this.gc = new Grib1Collection(dcm.getCollectionName(), new File(dcm.getRoot()), config); } private Grib1CollectionBuilder(String name, File directory, FeatureCollectionConfig.GribConfig config, org.slf4j.Logger logger) { super(null, false, logger); this.gc = new Grib1Collection(name, directory, 
config); } protected Grib1CollectionBuilder(CollectionManager dcm, boolean isSingleFile, org.slf4j.Logger logger) { super(dcm, isSingleFile, logger); } // read or create index private void readOrCreateIndex(CollectionManager.Force ff) throws IOException { // force new index or test for new index needed boolean force = ((ff == CollectionManager.Force.always) || (ff == CollectionManager.Force.test && needsUpdate())); // otherwise, we're good as long as the index file exists File idx = gc.getIndexFile(); // LOOK problem - index exists but its out of date - trigger rewrite, but not writeable. if (force || !idx.exists() || !readIndex(idx.getPath()) ) { // write out index idx = gc.makeNewIndexFile(logger); // make sure we have a writeable index logger.info("{}: createIndex {}", gc.getName(), idx.getPath()); createIndex(idx); // read back in index RandomAccessFile indexRaf = new RandomAccessFile(idx.getPath(), "r"); gc.setIndexRaf(indexRaf); boolean success = readIndex(indexRaf); if( !success ){ indexRaf.close(); } } } public boolean needsUpdate() { if (dcm == null) return false; File idx = gc.getIndexFile(); return !idx.exists() || needsUpdate(idx.lastModified()); } private boolean needsUpdate(long idxLastModified) { CollectionManager.ChangeChecker cc = GribIndex.getChangeChecker(); for (MFile mfile : dcm.getFiles()) { if (cc.hasChangedSince(mfile, idxLastModified)) return true; } return false; } //////////////////////////////////////////////////////////////////////////////////////////////////// // reading public boolean readIndex(String filename) throws IOException { RandomAccessFile raf = new RandomAccessFile( filename, "r" ); boolean success = readIndex( raf ); raf.close(); return success; } /** * Read the index file * @param raf the index file * @return true on success */ protected boolean readIndex(RandomAccessFile raf) { gc.setIndexRaf(raf); // LOOK leaving the raf open in the GribCollection try { raf.order(RandomAccessFile.BIG_ENDIAN); raf.seek(0); //// header message if (!NcStream.readAndTest(raf, getMagicStart().getBytes())) { logger.error("GribCollection {}: invalid index", gc.getName()); return false; } gc.version = raf.readInt(); boolean versionOk = isSingleFile ? 
gc.version >= minVersionSingle : gc.version >= version; if (!versionOk) { logger.warn("Grib1Collection {}: index found version={}, want version= {} on file {}", gc.getName(), gc.version, version, raf.getLocation()); return false; } long skip = raf.readLong(); raf.skipBytes(skip); int size = NcStream.readVInt(raf); if ((size < 0) || (size > 100 * 1000 * 1000)) { logger.warn("Grib1Collection {}: invalid or empty index ", gc.getName()); return false; } byte[] m = new byte[size]; raf.readFully(m); GribCollectionProto.GribCollectionIndex proto = GribCollectionProto.GribCollectionIndex.parseFrom(m); gc.center = proto.getCenter(); gc.subcenter = proto.getSubcenter(); gc.master = proto.getMaster(); gc.local = proto.getLocal(); gc.genProcessType = proto.getGenProcessType(); gc.genProcessId = proto.getGenProcessId(); gc.backProcessId = proto.getBackProcessId(); gc.local = proto.getLocal(); if (cust == null) { cust = Grib1Customizer.factory(gc.center, gc.subcenter, gc.local, null); // we need this in readVertCoord() } File dir = gc.getDirectory(); String dirname = proto.getDirName(); if (dir != null && !dir.getPath().equals(dirname)) { logger.debug("Grib1Collection {}: has different directory= {} than index= {} ", gc.getName(), dir.getPath(), dirname); //return false; } // switch from files to mfiles in version 10 if (!(this instanceof Grib1TimePartitionBuilder)) { if (gc.version < 10) { int n = proto.getFilesCount(); if (n == 0) { logger.warn("Grib1Collection {}: has no files, force recreate ", gc.getName()); return false; } else { List<MFile> files = new ArrayList<MFile>(proto.getFilesCount()); for (int i = 0; i < n; i++) files.add(new GribCollectionBuilder.GcMFile(dir, proto.getFiles(i), -1)); gc.setFiles(files); if (dcm != null) dcm.setFiles(files); } } else { int n = proto.getMfilesCount(); if (n == 0) { logger.warn("Grib1Collection {}: has no files, force recreate ", gc.getName()); return false; } else { List<MFile> files = new ArrayList<MFile>(n); for (int i = 0; i < n; i++) files.add(new GribCollectionBuilder.GcMFile(dir, proto.getMfiles(i))); gc.setFiles(files); if (dcm != null) dcm.setFiles(files); } } } gc.groups = new ArrayList<GribCollection.GroupHcs>(proto.getGroupsCount()); for (int i = 0; i < proto.getGroupsCount(); i++) gc.groups.add(readGroup(proto.getGroups(i), gc.makeGroup(), gc.center)); gc.groups = Collections.unmodifiableList(gc.groups); gc.params = new ArrayList<Parameter>(proto.getParamsCount()); for (int i = 0; i < proto.getParamsCount(); i++) gc.params.add(readParam(proto.getParams(i))); if (!readPartitions(proto, dirname)) { logger.warn("Time1Partition {}: has no partitions, force recreate ", gc.getName()); return false; } return true; } catch (Throwable t) { logger.error("Error reading index " + raf.getLocation(), t); return false; } } protected boolean readPartitions(GribCollectionProto.GribCollectionIndex proto, String directory) { return true; } protected void readTimePartitions(GribCollection.GroupHcs group, GribCollectionProto.Group proto) { // NOOP } GribCollection.GroupHcs readGroup(GribCollectionProto.Group p, GribCollection.GroupHcs group, int center) throws IOException { byte[] rawGds = null; Grib1Gds gds; if (p.hasPredefinedGds()) { gds = ucar.nc2.grib.grib1.Grib1GdsPredefined.factory(center, p.getPredefinedGds()); } else { rawGds = p.getGds().toByteArray(); Grib1SectionGridDefinition gdss = new Grib1SectionGridDefinition(rawGds); gds = gdss.getGDS(); } int gdsHash = (p.getGdsHash() != 0) ? 
p.getGdsHash() : gds.hashCode(); group.setHorizCoordSystem(gds.makeHorizCoordSys(), rawGds, gdsHash); group.varIndex = new ArrayList<GribCollection.VariableIndex>(); for (int i = 0; i < p.getVariablesCount(); i++) group.varIndex.add(readVariable(p.getVariables(i), group)); Collections.sort(group.varIndex); group.timeCoords = new ArrayList<TimeCoord>(p.getTimeCoordsCount()); for (int i = 0; i < p.getTimeCoordsCount(); i++) group.timeCoords.add(readTimeCoord(p.getTimeCoords(i))); group.vertCoords = new ArrayList<VertCoord>(p.getVertCoordsCount()); for (int i = 0; i < p.getVertCoordsCount(); i++) group.vertCoords.add(readVertCoord(p.getVertCoords(i))); group.ensCoords = new ArrayList<EnsCoord>(p.getEnsCoordsCount()); for (int i = 0; i < p.getEnsCoordsCount(); i++) group.ensCoords.add(readEnsCoord(p.getEnsCoords(i))); group.filenose = new int[p.getFilenoCount()]; for (int i = 0; i < p.getFilenoCount(); i++) group.filenose[i] = p.getFileno(i); readTimePartitions(group, p); // finish for (GribCollection.VariableIndex vi : group.varIndex) { TimeCoord tc = group.timeCoords.get(vi.timeIdx); vi.ntimes = tc.getSize(); VertCoord vc = (vi.vertIdx < 0) ? null : group.vertCoords.get(vi.vertIdx); vi.nverts = (vc == null) ? 0 : vc.getSize(); EnsCoord ec = (vi.ensIdx < 0) ? null : group.ensCoords.get(vi.ensIdx); vi.nens = (ec == null) ? 0 : ec.getSize(); } // group.assignVertNames(); return group; } private Parameter readParam(GribCollectionProto.Parameter pp) throws IOException { if (pp.hasSdata()) return new Parameter(pp.getName(), pp.getSdata()); int count = 0; double[] vals = new double[pp.getDataCount()]; for (double val : pp.getDataList()) vals[count++] = val; return new Parameter(pp.getName(), vals); } private TimeCoord readTimeCoord(GribCollectionProto.Coord pc) throws IOException { if (pc.getBoundCount() > 0) { // its an interval List<TimeCoord.Tinv> coords = new ArrayList<TimeCoord.Tinv>(pc.getValuesCount()); for (int i = 0; i < pc.getValuesCount(); i++) coords.add(new TimeCoord.Tinv((int) pc.getValues(i), (int) pc.getBound(i))); TimeCoord tc = new TimeCoord(pc.getCode(), pc.getUnit(), coords); return tc.setIndex( pc.getIndex()); } else { List<Integer> coords = new ArrayList<Integer>(pc.getValuesCount()); for (float value : pc.getValuesList()) coords.add((int) value); TimeCoord tc = new TimeCoord(pc.getCode(), pc.getUnit(), coords); return tc.setIndex( pc.getIndex()); } } private VertCoord readVertCoord(GribCollectionProto.Coord pc) throws IOException { boolean isLayer = (pc.getBoundCount() > 0); List<VertCoord.Level> coords = new ArrayList<VertCoord.Level>(pc.getValuesCount()); for (int i = 0; i < pc.getValuesCount(); i++) coords.add(new VertCoord.Level(pc.getValues(i), isLayer ? 
pc.getBound(i) : 0)); return new VertCoord(coords, cust.getVertUnit(pc.getCode()), isLayer); } private EnsCoord readEnsCoord(GribCollectionProto.Coord pc) throws IOException { List<EnsCoord.Coord> coords = new ArrayList<EnsCoord.Coord>(pc.getValuesCount()); for (int i = 0; i < pc.getValuesCount(); i += 2) coords.add(new EnsCoord.Coord((int) pc.getValues(i), (int) pc.getValues(i + 1))); return new EnsCoord(coords); } protected GribCollection.VariableIndex readVariable(GribCollectionProto.Variable pv, GribCollection.GroupHcs group) { int discipline = pv.getDiscipline(); int category = pv.getCategory(); int param = pv.getParameter(); int tableVersion = pv.getTableVersion(); int levelType = pv.getLevelType(); int intvType = pv.getIntervalType(); String intvName = pv.getIntvName(); boolean isLayer = pv.getIsLayer(); int ensDerivedType = pv.getEnsDerivedType(); int probType = pv.getProbabilityType(); String probabilityName = pv.getProbabilityName(); int cdmHash = pv.getCdmHash(); long recordsPos = pv.getRecordsPos(); int recordsLen = pv.getRecordsLen(); int timeIdx = pv.getTimeIdx(); int vertIdx = pv.getVertIdx(); int ensIdx = pv.getEnsIdx(); return gc.makeVariableIndex(group, tableVersion, discipline, category, param, levelType, isLayer, intvType, intvName, ensDerivedType, probType, probabilityName, -1, cdmHash, timeIdx, vertIdx, ensIdx, recordsPos, recordsLen); } /////////////////////////////////////////////////////////////////////////////////// // writing private class Group { public Grib1SectionGridDefinition gdss; public int gdsHash; // may have been modified public Grib1Rectilyser rect; public List<Grib1Record> records = new ArrayList<Grib1Record>(); public String nameOverride; public Set<Integer> fileSet; // this is so we can show just the component files that are in this group private Group(Grib1SectionGridDefinition gdss, int gdsHash) { this.gdss = gdss; this.gdsHash = gdsHash; } } /////////////////////////////////////////////////// // create the index private boolean createIndex(File indexFile) throws IOException { if (dcm == null) { logger.error("Grib1CollectionBuilder "+gc.getName()+" : cannot create new index "); throw new IllegalStateException(); } long start = System.currentTimeMillis(); ArrayList<MFile> files = new ArrayList<MFile>(); List<Group> groups = makeAggregatedGroups(files); createIndex(indexFile, groups, files); long took = System.currentTimeMillis() - start; if (logger.isDebugEnabled()) logger.debug("That took {} msecs", took); return true; } // read all records in all files, // divide into groups based on GDS hash // each group has an arraylist of all records that belong to it. // for each group, run rectlizer to derive the coordinates and variables public List<Group> makeAggregatedGroups(ArrayList<MFile> files) throws IOException { Map<Integer, Group> gdsMap = new HashMap<Integer, Group>(); Map<Integer, Integer> gdsConvert = null; Map<String, Boolean> pdsConvert = null; Grib1Rectilyser.Counter stats = new Grib1Rectilyser.Counter(); //boolean intvMerge = intvMergeDefault; logger.debug("GribCollection {}: makeAggregatedGroups%n", gc.getName()); int fileno = 0; logger.debug(" dcm= {}%n", dcm); FeatureCollectionConfig.GribConfig config = (FeatureCollectionConfig.GribConfig) dcm.getAuxInfo(FeatureCollectionConfig.AUX_GRIB_CONFIG); if (config != null) gdsConvert = config.gdsHash; if (config != null) pdsConvert = config.pdsHash; FeatureCollectionConfig.GribIntvFilter intvMap = (config != null) ? 
config.intvFilter : null; // intvMerge = (config == null) || (config.intvMerge == null) ? intvMergeDefault : config.intvMerge; for (MFile mfile : dcm.getFiles()) { // f.format("%3d: %s%n", fileno, mfile.getPath()); Grib1Index index; try { index = (Grib1Index) GribIndex.readOrCreateIndexFromSingleFile(true, !isSingleFile, mfile, config, CollectionManager.Force.test, logger); files.add(mfile); // only add on success } catch (IOException ioe) { logger.error("Grib1CollectionBuilder "+gc.getName()+" : reading/Creating gbx9 index for file "+ mfile.getPath()+" failed", ioe); continue; } for (Grib1Record gr : index.getRecords()) { gr.setFile(fileno); // each record tracks which file it belongs to int gdsHash = gr.getGDSsection().getGDS().hashCode(); // use GDS hash code to group records if (gdsConvert != null && gdsConvert.get(gdsHash) != null) // allow external config to muck with gdsHash. Why? because of error in encoding gdsHash = gdsConvert.get(gdsHash); // and we need exact hash matching if (cust == null) cust = Grib1Customizer.factory(gr, null); if (config != null) cust.setTimeUnitConverter(config.getTimeUnitConverter()); if (intvMap != null && filterOut(gr, intvMap)) { stats.filter++; continue; // skip } Group g = gdsMap.get(gdsHash); if (g == null) { g = new Group(gr.getGDSsection(), gdsHash); gdsMap.put(gdsHash, g); //g.nameOverride = setGroupNameOverride(gdsHash, gdsNamer, groupNamer, mfile); } g.records.add(gr); } fileno++; stats.recordsTotal += index.getRecords().size(); } List<Group> result = new ArrayList<Group>(gdsMap.values()); for (Group g : result) { g.rect = new Grib1Rectilyser(cust, g.records, g.gdsHash, pdsConvert); g.rect.make(stats); } if (logger.isDebugEnabled()) logger.debug(stats.show()); return result; } // true means remove private boolean filterOut(Grib1Record gr, FeatureCollectionConfig.GribIntvFilter intvFilter) { Grib1SectionProductDefinition pdss = gr.getPDSsection(); Grib1ParamTime ptime = pdss.getParamTime(cust); if (!ptime.isInterval()) return false; int[] intv = ptime.getInterval(); if (intv == null) return false; int haveLength = intv[1] - intv[0]; // HACK if (haveLength == 0 && intvFilter.isZeroExcluded()) { // discard 0,0 if ((intv[0] == 0) && (intv[1] == 0)) { //f.format(" FILTER INTV [0, 0] %s%n", gr); return true; } return false; } else if (intvFilter.hasFilter()) { int center = pdss.getCenter(); int subcenter = pdss.getSubCenter(); int version = pdss.getTableVersion(); int param = pdss.getParameterNumber(); int id = (center << 8) + (subcenter << 16) + (version << 24) + param; return intvFilter.filterOut(id, haveLength, Integer.MIN_VALUE); } return false; } /////////////////////////////////////////////////////////////////////////////////// public String getMagicStart() { return MAGIC_START; } /* MAGIC_START version sizeRecords VariableRecords (sizeRecords bytes) sizeIndex GribCollectionIndex (sizeIndex bytes) */ private void createIndex(File indexFile, List<Group> groups, ArrayList<MFile> files) throws IOException { Grib1Record first = null; // take global metadata from here boolean deleteOnClose = false; if (indexFile.exists()) { if (!indexFile.delete()) logger.warn(" gc1 cant delete index file {}", indexFile.getPath()); } logger.debug(" createIndex for {}", indexFile.getPath()); RandomAccessFile raf = new RandomAccessFile(indexFile.getPath(), "rw"); raf.order(RandomAccessFile.BIG_ENDIAN); try { //// header message raf.write(getMagicStart().getBytes("UTF-8")); raf.writeInt(version); long lenPos = raf.getFilePointer(); raf.writeLong(0); // save space to 
write the length of the record section long countBytes = 0; int countRecords = 0; for (Group g : groups) { g.fileSet = new HashSet<Integer>(); for (Grib1Rectilyser.VariableBag vb : g.rect.getGribvars()) { if (first == null) first = vb.first; GribCollectionProto.VariableRecords vr = writeRecordsProto(vb, g.fileSet); byte[] b = vr.toByteArray(); vb.pos = raf.getFilePointer(); vb.length = b.length; raf.write(b); countBytes += b.length; countRecords += vb.recordMap.length; } } long bytesPerRecord = countBytes / ((countRecords == 0) ? 1 : countRecords); if (logger.isDebugEnabled()) logger.debug(" write RecordMaps: bytes = {} records = {} bytesPerRecord={}", countBytes, countRecords, bytesPerRecord); if (first == null) { deleteOnClose = true; logger.error("GribCollection {}: has no files", gc.getName()); throw new IOException("GribCollection " + gc.getName() + " has no files"); } long pos = raf.getFilePointer(); raf.seek(lenPos); raf.writeLong(countBytes); raf.seek(pos); // back to the output. GribCollectionProto.GribCollectionIndex.Builder indexBuilder = GribCollectionProto.GribCollectionIndex.newBuilder(); indexBuilder.setName(gc.getName()); // directory and mfile list indexBuilder.setDirName(gc.getDirectory().getPath()); List<GribCollectionBuilder.GcMFile> gcmfiles = GribCollectionBuilder.makeFiles(gc.getDirectory(), files); for (GribCollectionBuilder.GcMFile gcmfile : gcmfiles) { indexBuilder.addMfiles(gcmfile.makeProto()); } for (Group g : groups) indexBuilder.addGroups(writeGroupProto(g)); /* int count = 0; for (DatasetCollectionManager dcm : collections) { indexBuilder.addParams(makeParamProto(new Parameter("spec" + count, dcm.()))); count++; } */ // what about just storing first ?? Grib1SectionProductDefinition pds = first.getPDSsection(); indexBuilder.setCenter(pds.getCenter()); indexBuilder.setSubcenter(pds.getSubCenter()); indexBuilder.setLocal(pds.getTableVersion()); indexBuilder.setMaster(0); indexBuilder.setGenProcessId(pds.getGenProcess()); GribCollectionProto.GribCollectionIndex index = indexBuilder.build(); byte[] b = index.toByteArray(); NcStream.writeVInt(raf, b.length); // message size raf.write(b); // message - all in one gulp logger.debug(" write GribCollectionIndex= {} bytes", b.length); } finally { logger.debug(" file size = %d bytes", raf.length()); raf.close(); // remove it on failure if (deleteOnClose && !indexFile.delete()) logger.error(" gc1 cant deleteOnClose index file {}", indexFile.getPath()); } } private GribCollectionProto.VariableRecords writeRecordsProto(Grib1Rectilyser.VariableBag vb, Set<Integer> fileSet) throws IOException { GribCollectionProto.VariableRecords.Builder b = GribCollectionProto.VariableRecords.newBuilder(); b.setCdmHash(vb.cdmHash); for (Grib1Rectilyser.Record ar : vb.recordMap) { GribCollectionProto.Record.Builder br = GribCollectionProto.Record.newBuilder(); if (ar == null || ar.gr == null) { br.setFileno(0); br.setPos(0); br.setMissing(true); // missing : cant use 0 since that may be a valid value } else { br.setFileno(ar.gr.getFile()); fileSet.add(ar.gr.getFile()); Grib1SectionIndicator is = ar.gr.getIs(); br.setPos(is.getStartPos()); // start of entire message } b.addRecords(br); } return b.build(); } private GribCollectionProto.Group writeGroupProto(Group g) throws IOException { GribCollectionProto.Group.Builder b = GribCollectionProto.Group.newBuilder(); if (g.gdss.getPredefinedGridDefinition() >= 0) b.setPredefinedGds(g.gdss.getPredefinedGridDefinition()); else { b.setGds(ByteString.copyFrom(g.gdss.getRawBytes())); 
b.setGdsHash(g.gdsHash); } for (Grib1Rectilyser.VariableBag vb : g.rect.getGribvars()) b.addVariables(writeVariableProto(g.rect, vb)); List<TimeCoord> timeCoords = g.rect.getTimeCoords(); for (int i = 0; i < timeCoords.size(); i++) b.addTimeCoords(writeCoordProto(timeCoords.get(i), i)); List<VertCoord> vertCoords = g.rect.getVertCoords(); for (int i = 0; i < vertCoords.size(); i++) b.addVertCoords(writeCoordProto(vertCoords.get(i), i)); List<EnsCoord> ensCoords = g.rect.getEnsCoords(); for (int i = 0; i < ensCoords.size(); i++) b.addEnsCoords(writeCoordProto(ensCoords.get(i), i)); for (Integer aFileSet : g.fileSet) b.addFileno(aFileSet); if (g.nameOverride != null) b.setName(g.nameOverride); return b.build(); } private GribCollectionProto.Variable writeVariableProto(Grib1Rectilyser rect, Grib1Rectilyser.VariableBag vb) throws IOException { GribCollectionProto.Variable.Builder b = GribCollectionProto.Variable.newBuilder(); Grib1SectionProductDefinition pds = vb.first.getPDSsection(); b.setDiscipline(0); b.setCategory(0); b.setParameter(pds.getParameterNumber()); b.setTableVersion(pds.getTableVersion()); // can differ for variables in the same file b.setLevelType(pds.getLevelType()); b.setIsLayer(cust.isLayer(pds.getLevelType())); // LOOK alternatively could store an entire PDS (one for each variable) b.setCdmHash(vb.cdmHash); b.setRecordsPos(vb.pos); b.setRecordsLen(vb.length); b.setTimeIdx(vb.timeCoordIndex); if (vb.vertCoordIndex >= 0) b.setVertIdx(vb.vertCoordIndex); if (vb.ensCoordIndex >= 0) b.setEnsIdx(vb.ensCoordIndex); Grib1ParamTime ptime = pds.getParamTime(cust); // LOOK could use cust.getParamTime(pds) to not retain object if (ptime.isInterval()) { b.setIntervalType(pds.getTimeRangeIndicator()); b.setIntvName(rect.getTimeIntervalName(vb.timeCoordIndex)); } /* if (pds.isEnsembleDerived()) { Grib1Pds.PdsEnsembleDerived pdsDerived = (Grib1Pds.PdsEnsembleDerived) pds; b.setEnsDerivedType(pdsDerived.getDerivedForecastType()); // derived type (table 4.7) } if (pds.isProbability()) { Grib1Pds.PdsProbability pdsProb = (Grib1Pds.PdsProbability) pds; b.setProbabilityName(pdsProb.getProbabilityName()); b.setProbabilityType(pdsProb.getProbabilityType()); } */ return b.build(); } protected GribCollectionProto.Parameter writeParamProto(Parameter param) throws IOException { GribCollectionProto.Parameter.Builder b = GribCollectionProto.Parameter.newBuilder(); b.setName(param.getName()); if (param.isString()) b.setSdata(param.getStringValue()); else { for (int i = 0; i < param.getLength(); i++) b.addData(param.getNumericValue(i)); } return b.build(); } protected GribCollectionProto.Coord writeCoordProto(TimeCoord tc, int index) throws IOException { GribCollectionProto.Coord.Builder b = GribCollectionProto.Coord.newBuilder(); b.setIndex(index); b.setCode(tc.getCode()); b.setUnit(tc.getUnits()); float scale = (float) tc.getTimeUnitScale(); // deal with, eg, "6 hours" by multiplying values by 6 if (tc.isInterval()) { for (TimeCoord.Tinv tinv : tc.getIntervals()) { b.addValues(tinv.getBounds1() * scale); b.addBound(tinv.getBounds2() * scale); } } else { for (int value : tc.getCoords()) b.addValues(value * scale); } return b.build(); } protected GribCollectionProto.Coord writeCoordProto(VertCoord vc, int index) throws IOException { GribCollectionProto.Coord.Builder b = GribCollectionProto.Coord.newBuilder(); b.setIndex(index); b.setCode(vc.getCode()); String units = (vc.getUnits() != null) ? 
vc.getUnits() : ""; b.setUnit(units); for (VertCoord.Level coord : vc.getCoords()) { if (vc.isLayer()) { b.addValues((float) coord.getValue1()); b.addBound((float) coord.getValue2()); } else { b.addValues((float) coord.getValue1()); } } return b.build(); } protected GribCollectionProto.Coord writeCoordProto(EnsCoord ec, int index) throws IOException { GribCollectionProto.Coord.Builder b = GribCollectionProto.Coord.newBuilder(); b.setIndex(index); b.setCode(0); b.setUnit(""); for (EnsCoord.Coord coord : ec.getCoords()) { b.addValues((float) coord.getCode()); b.addValues((float) coord.getEnsMember()); } return b.build(); } }
/* * Copyright (c) 1998 - 2011. University Corporation for Atmospheric Research/Unidata * Portions of this software were developed by the Unidata Program at the * University Corporation for Atmospheric Research. * * Access and use of this software shall impose the following obligations * and understandings on the user. The user is granted the right, without * any fee or cost, to use, copy, modify, alter, enhance and distribute * this software, and any derivative works thereof, and its supporting * documentation for any purpose whatsoever, provided that this entire * notice appears in all copies of the software, derivative works and * supporting documentation. Further, UCAR requests that the user credit * UCAR/Unidata in any publications that result from the use of this * software or in any product that includes this software. The names UCAR * and/or Unidata, however, may not be used in any advertising or publicity * to endorse or promote any products or commercial entity unless specific * written permission is obtained from UCAR/Unidata. The user also * understands that UCAR/Unidata is not obligated to provide the user with * any support, consulting, training or assistance of any kind with regard * to the use, operation and performance of this software nor to provide * the user with any updates, revisions, new versions or "bug fixes." * * THIS SOFTWARE IS PROVIDED BY UCAR/UNIDATA "AS IS" AND ANY EXPRESS OR * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE * DISCLAIMED. IN NO EVENT SHALL UCAR/UNIDATA BE LIABLE FOR ANY SPECIAL, * INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING * FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, * NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION * WITH THE ACCESS, USE OR PERFORMANCE OF THIS SOFTWARE. */ package ucar.nc2.grib.grib2; import com.google.protobuf.ByteString; import thredds.featurecollection.FeatureCollectionConfig; import thredds.inventory.CollectionManager; import thredds.inventory.CollectionManagerSingleFile; import thredds.inventory.MFile; import ucar.nc2.grib.EnsCoord; import ucar.nc2.grib.GribCollection; import ucar.nc2.grib.GribCollectionBuilder; import ucar.nc2.grib.GribCollectionProto; import ucar.nc2.grib.GribIndex; import ucar.nc2.grib.GribNumbers; import ucar.nc2.grib.TimeCoord; import ucar.nc2.grib.VertCoord; import ucar.nc2.grib.grib2.table.Grib2Customizer; import ucar.nc2.stream.NcStream; import ucar.unidata.io.RandomAccessFile; import ucar.unidata.util.Parameter; import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; /** * Build a GribCollection object for Grib-2 files. Manage grib collection index. * Covers GribCollectionProto, which serializes and deserializes. * Rectilyse means to turn the collection into a multidimensional variable. 
* * @author caron * @since 4/6/11 */ public class Grib2CollectionBuilder extends GribCollectionBuilder { public static final String MAGIC_START = "Grib2CollectionIndex"; protected static final int minVersionSingle = 11; protected static final int version = 12; private static final boolean showFiles = false; // called by tdm static public boolean update(CollectionManager dcm, org.slf4j.Logger logger) throws IOException { Grib2CollectionBuilder builder = new Grib2CollectionBuilder(dcm, logger); if (!builder.needsUpdate()) return false; builder.readOrCreateIndex(CollectionManager.Force.always); builder.gc.close(); return true; } // from a single file, read in the index, create if it doesnt exist static public GribCollection readOrCreateIndexFromSingleFile(MFile file, CollectionManager.Force force, FeatureCollectionConfig.GribConfig config, org.slf4j.Logger logger) throws IOException { Grib2CollectionBuilder builder = new Grib2CollectionBuilder(file, config, logger); builder.readOrCreateIndex(force); return builder.gc; } // from a collection, read in the index, create if it doesnt exist or is out of date // assume that the CollectionManager is up to date, eg doesnt need to be scanned static public GribCollection factory(CollectionManager dcm, CollectionManager.Force force, org.slf4j.Logger logger) throws IOException { Grib2CollectionBuilder builder = new Grib2CollectionBuilder(dcm, logger); builder.readOrCreateIndex(force); return builder.gc; } // read in the index, index raf already open static public GribCollection createFromIndex(String name, File directory, RandomAccessFile raf, FeatureCollectionConfig.GribConfig config, org.slf4j.Logger logger) throws IOException { Grib2CollectionBuilder builder = new Grib2CollectionBuilder(name, directory, config, logger); if (builder.readIndex(raf)) return builder.gc; throw new IOException("Reading index failed"); } // this writes the index always static public boolean writeIndexFile(File indexFile, CollectionManager dcm, org.slf4j.Logger logger) throws IOException { Grib2CollectionBuilder builder = new Grib2CollectionBuilder(dcm, logger); return builder.createIndex(indexFile); } //////////////////////////////////////////////////////////////// protected GribCollection gc; protected Grib2Customizer tables; // only gets created in makeAggGroups // single file private Grib2CollectionBuilder(MFile file, FeatureCollectionConfig.GribConfig config, org.slf4j.Logger logger) throws IOException { super(new CollectionManagerSingleFile(file, logger), true, logger); try { if (config != null) dcm.putAuxInfo(FeatureCollectionConfig.AUX_GRIB_CONFIG, config); this.gc = new Grib2Collection(file.getName(), new File(dcm.getRoot()), config); } catch (Exception e) { logger.error("Failed to index single file", e); throw new IOException(e); } } private Grib2CollectionBuilder(CollectionManager dcm, org.slf4j.Logger logger) { super(dcm, false, logger); FeatureCollectionConfig.GribConfig config = (FeatureCollectionConfig.GribConfig) dcm.getAuxInfo(FeatureCollectionConfig.AUX_GRIB_CONFIG); this.gc = new Grib2Collection(dcm.getCollectionName(), new File(dcm.getRoot()), config); } private Grib2CollectionBuilder(String name, File directory, FeatureCollectionConfig.GribConfig config, org.slf4j.Logger logger) { super(null, false, logger); this.gc = new Grib2Collection(name, directory, config); } protected Grib2CollectionBuilder(CollectionManager dcm, boolean isSingleFile, org.slf4j.Logger logger) { super(dcm, isSingleFile, logger); } // read or create index private void 
readOrCreateIndex(CollectionManager.Force ff) throws IOException { // force new index or test for new index needed boolean force = ((ff == CollectionManager.Force.always) || (ff == CollectionManager.Force.test && needsUpdate())); // otherwise, we're good as long as the index file exists File idx = gc.getIndexFile(); if (force || !idx.exists() || !readIndex(idx.getPath()) ) { // write out index idx = gc.makeNewIndexFile(logger); // make sure we have a writeable index logger.info("{}: createIndex {}", gc.getName(), idx.getPath()); createIndex(idx); // read back in index RandomAccessFile indexRaf = new RandomAccessFile(idx.getPath(), "r"); gc.setIndexRaf(indexRaf); boolean success = readIndex(indexRaf); if( !success ){ indexRaf.close(); } } } public boolean needsUpdate() { if (dcm == null) return false; File idx = gc.getIndexFile(); return !idx.exists() || needsUpdate(idx.lastModified()); } private boolean needsUpdate(long idxLastModified) { CollectionManager.ChangeChecker cc = GribIndex.getChangeChecker(); for (MFile mfile : dcm.getFiles()) { if (cc.hasChangedSince(mfile, idxLastModified)) return true; } return false; } //////////////////////////////////////////////////////////////////////////////////////////////////// // reading public String getMagicStart() { return MAGIC_START; } public boolean readIndex(String filename) throws IOException { RandomAccessFile raf = new RandomAccessFile( filename, "r" ); boolean success = readIndex( raf ); raf.close(); return success; } protected boolean readIndex(RandomAccessFile raf) { gc.setIndexRaf(raf); // LOOK leaving the raf open in the GribCollection try { raf.order(RandomAccessFile.BIG_ENDIAN); raf.seek(0); //// header message if (!NcStream.readAndTest(raf, getMagicStart().getBytes())) { logger.error("Grib2Collection {}: invalid index", gc.getName()); return false; } gc.version = raf.readInt(); boolean versionOk = isSingleFile ? 
gc.version >= minVersionSingle : gc.version == version; if (!versionOk) { logger.warn("Grib2Collection {}: index found version={}, want version= {} on file {}", gc.getName(), gc.version, version, raf.getLocation()); return false; } long skip = raf.readLong(); raf.skipBytes(skip); int size = NcStream.readVInt(raf); if ((size < 0) || (size > 100 * 1000 * 1000)) { logger.warn("Grib2Collection {}: invalid index ", gc.getName()); return false; } byte[] m = new byte[size]; raf.readFully(m); GribCollectionProto.GribCollectionIndex proto = GribCollectionProto.GribCollectionIndex.parseFrom(m); gc.center = proto.getCenter(); gc.subcenter = proto.getSubcenter(); gc.master = proto.getMaster(); gc.local = proto.getLocal(); gc.genProcessType = proto.getGenProcessType(); gc.genProcessId = proto.getGenProcessId(); gc.backProcessId = proto.getBackProcessId(); gc.local = proto.getLocal(); // gc.tables = Grib2Tables.factory(gc.center, gc.subcenter, gc.master, gc.local); File dir = gc.getDirectory(); String dirname = proto.getDirName(); if (dir != null && !dir.getPath().equals(dirname)) { logger.debug("Grib2Collection {}: has different directory= {} than index= {} ", gc.getName(), dir.getPath(), dirname); //return false; } // switch from files to mfiles in version 12 if (!(this instanceof Grib2TimePartitionBuilder)) { if (gc.version < 12) { int n = proto.getFilesCount(); if (n == 0) { logger.warn("Grib2Collection {}: has no files, force recreate ", gc.getName()); return false; } else { List<MFile> files = new ArrayList<MFile>(proto.getFilesCount()); for (int i = 0; i < n; i++) files.add(new GribCollectionBuilder.GcMFile(dir, proto.getFiles(i), -1)); gc.setFiles(files); if (dcm != null) dcm.setFiles(files); } } else { int n = proto.getMfilesCount(); if (n == 0) { logger.warn("Grib2Collection {}: has no files, force recreate ", gc.getName()); return false; } else { List<MFile> files = new ArrayList<MFile>(n); for (int i = 0; i < n; i++) files.add(new GribCollectionBuilder.GcMFile(dir, proto.getMfiles(i))); gc.setFiles(files); if (dcm != null) dcm.setFiles(files); } } } gc.groups = new ArrayList<GribCollection.GroupHcs>(proto.getGroupsCount()); for (int i = 0; i < proto.getGroupsCount(); i++) gc.groups.add(readGroup(proto.getGroups(i), gc.makeGroup())); gc.groups = Collections.unmodifiableList(gc.groups); gc.params = new ArrayList<Parameter>(proto.getParamsCount()); for (int i = 0; i < proto.getParamsCount(); i++) gc.params.add(readParam(proto.getParams(i))); if (!readPartitions(proto, dirname)) { logger.warn("Time2Partition {}: has no partitions, force recreate ", gc.getName()); return false; } return true; } catch (Throwable t) { logger.error("Error reading index " + raf.getLocation(), t); return false; } } protected boolean readPartitions(GribCollectionProto.GribCollectionIndex proto, String dirname) { return true; } protected void readTimePartitions(GribCollection.GroupHcs group, GribCollectionProto.Group proto) { // NOOP } GribCollection.GroupHcs readGroup(GribCollectionProto.Group p, GribCollection.GroupHcs group) throws IOException { byte[] rawGds = p.getGds().toByteArray(); Grib2SectionGridDefinition gdss = new Grib2SectionGridDefinition(rawGds); Grib2Gds gds = gdss.getGDS(); int gdsHash = (p.getGdsHash() != 0) ? 
p.getGdsHash() : gds.hashCode(); group.setHorizCoordSystem(gds.makeHorizCoordSys(), rawGds, gdsHash); group.varIndex = new ArrayList<GribCollection.VariableIndex>(); for (int i = 0; i < p.getVariablesCount(); i++) group.varIndex.add(readVariable(p.getVariables(i), group)); Collections.sort(group.varIndex); group.timeCoords = new ArrayList<TimeCoord>(p.getTimeCoordsCount()); for (int i = 0; i < p.getTimeCoordsCount(); i++) group.timeCoords.add(readTimeCoord(p.getTimeCoords(i))); group.vertCoords = new ArrayList<VertCoord>(p.getVertCoordsCount()); for (int i = 0; i < p.getVertCoordsCount(); i++) group.vertCoords.add(readVertCoord(p.getVertCoords(i))); group.ensCoords = new ArrayList<EnsCoord>(p.getEnsCoordsCount()); for (int i = 0; i < p.getEnsCoordsCount(); i++) group.ensCoords.add(readEnsCoord(p.getEnsCoords(i))); group.filenose = new int[p.getFilenoCount()]; for (int i = 0; i < p.getFilenoCount(); i++) group.filenose[i] = p.getFileno(i); readTimePartitions(group, p); // finish for (GribCollection.VariableIndex vi : group.varIndex) { TimeCoord tc = group.timeCoords.get(vi.timeIdx); vi.ntimes = tc.getSize(); VertCoord vc = (vi.vertIdx < 0) ? null : group.vertCoords.get(vi.vertIdx); vi.nverts = (vc == null) ? 0 : vc.getSize(); EnsCoord ec = (vi.ensIdx < 0) ? null : group.ensCoords.get(vi.ensIdx); vi.nens = (ec == null) ? 0 : ec.getSize(); } // group.assignVertNames(); return group; } private Parameter readParam(GribCollectionProto.Parameter pp) throws IOException { if (pp.hasSdata()) return new Parameter(pp.getName(), pp.getSdata()); int count = 0; double[] vals = new double[pp.getDataCount()]; for (double val : pp.getDataList()) vals[count++] = val; return new Parameter(pp.getName(), vals); } private TimeCoord readTimeCoord(GribCollectionProto.Coord pc) throws IOException { if (pc.getBoundCount() > 0) { // its an interval List<TimeCoord.Tinv> coords = new ArrayList<TimeCoord.Tinv>(pc.getValuesCount()); for (int i = 0; i < pc.getValuesCount(); i++) coords.add(new TimeCoord.Tinv((int) pc.getValues(i), (int) pc.getBound(i))); TimeCoord tc = new TimeCoord(pc.getCode(), pc.getUnit(), coords); return tc.setIndex( pc.getIndex()); } else { List<Integer> coords = new ArrayList<Integer>(pc.getValuesCount()); for (float value : pc.getValuesList()) coords.add((int) value); TimeCoord tc = new TimeCoord(pc.getCode(), pc.getUnit(), coords); return tc.setIndex( pc.getIndex()); } } private VertCoord readVertCoord(GribCollectionProto.Coord pc) throws IOException { boolean isLayer = (pc.getBoundCount() > 0); List<VertCoord.Level> coords = new ArrayList<VertCoord.Level>(pc.getValuesCount()); for (int i = 0; i < pc.getValuesCount(); i++) coords.add(new VertCoord.Level(pc.getValues(i), isLayer ? 
pc.getBound(i) : 0)); return new VertCoord(pc.getCode(), coords, isLayer); } private EnsCoord readEnsCoord(GribCollectionProto.Coord pc) throws IOException { List<EnsCoord.Coord> coords = new ArrayList<EnsCoord.Coord>(pc.getValuesCount()); for (int i = 0; i < pc.getValuesCount(); i += 2) coords.add(new EnsCoord.Coord((int) pc.getValues(i), (int) pc.getValues(i + 1))); return new EnsCoord(coords); } protected GribCollection.VariableIndex readVariable(GribCollectionProto.Variable pv, GribCollection.GroupHcs group) { int discipline = pv.getDiscipline(); int category = pv.getCategory(); int param = pv.getParameter(); int levelType = pv.getLevelType(); int intvType = pv.getIntervalType(); String intvName = pv.getIntvName(); boolean isLayer = pv.getIsLayer(); int ensDerivedType = pv.getEnsDerivedType(); int probType = pv.getProbabilityType(); String probabilityName = pv.getProbabilityName(); int cdmHash = pv.getCdmHash(); long recordsPos = pv.getRecordsPos(); int recordsLen = pv.getRecordsLen(); int timeIdx = pv.getTimeIdx(); int vertIdx = pv.getVertIdx(); int ensIdx = pv.getEnsIdx(); int tableVersion = pv.getTableVersion(); int genProcessType = pv.getGenProcessType(); return gc.makeVariableIndex(group, tableVersion, discipline, category, param, levelType, isLayer, intvType, intvName, ensDerivedType, probType, probabilityName, genProcessType, cdmHash, timeIdx, vertIdx, ensIdx, recordsPos, recordsLen); } /////////////////////////////////////////////////////////////////////////////////// // writing private class Group { public Grib2SectionGridDefinition gdss; public int gdsHash; // may have been modified public Grib2Rectilyser rect; public List<Grib2Record> records = new ArrayList<Grib2Record>(); public String nameOverride; public Set<Integer> fileSet; // this is so we can show just the component files that are in this group private Group(Grib2SectionGridDefinition gdss, int gdsHash) { this.gdss = gdss; this.gdsHash = gdsHash; } } /////////////////////////////////////////////////// // create the index private boolean createIndex(File indexFile) throws IOException { if (dcm == null) { logger.error("Grib2CollectionBuilder "+gc.getName()+" : cannot create new index "); throw new IllegalStateException(); } long start = System.currentTimeMillis(); ArrayList<MFile> files = new ArrayList<MFile>(); List<Group> groups = makeAggregatedGroups(files); createIndex(indexFile, groups, files); long took = System.currentTimeMillis() - start; logger.debug("That took {} msecs", took); return true; } // read all records in all files, // divide into groups based on GDS hash // each group has an arraylist of all records that belong to it. // for each group, run rectlizer to derive the coordinates and variables public List<Group> makeAggregatedGroups(List<MFile> files) throws IOException { Map<Integer, Group> gdsMap = new HashMap<Integer, Group>(); Map<String, Boolean> pdsConvert = null; //boolean intvMerge = intvMergeDefault; //boolean useGenType = false; logger.debug("GribCollection {}: makeAggregatedGroups", gc.getName()); int fileno = 0; Grib2Rectilyser.Counter stats = new Grib2Rectilyser.Counter(); // debugging logger.debug(" dcm={}", dcm); FeatureCollectionConfig.GribConfig config = (FeatureCollectionConfig.GribConfig) dcm.getAuxInfo(FeatureCollectionConfig.AUX_GRIB_CONFIG); Map<Integer, Integer> gdsConvert = (config != null) ? config.gdsHash : null; FeatureCollectionConfig.GribIntvFilter intvMap = (config != null) ? 
config.intvFilter : null; if (config != null) pdsConvert = config.pdsHash; //intvMerge = (config == null) || (config.intvMerge == null) ? intvMergeDefault : config.intvMerge; //useGenType = (config == null) || (config.useGenType == null) ? false : config.useGenType; for (MFile mfile : dcm.getFiles()) { if (showFiles) logger.debug("{}: {}", fileno, mfile.getPath()); Grib2Index index = null; try { index = (Grib2Index) GribIndex.readOrCreateIndexFromSingleFile(false, !isSingleFile, mfile, config, CollectionManager.Force.test, logger); files.add(mfile); // add on success } catch (IOException ioe) { logger.error("Grib2CollectionBuilder "+gc.getName()+" : reading/Creating gbx9 index for file "+ mfile.getPath()+" failed", ioe); continue; } for (Grib2Record gr : index.getRecords()) { if (this.tables == null) { Grib2SectionIdentification ids = gr.getId(); // so all records must use the same table (!) this.tables = Grib2Customizer.factory(ids.getCenter_id(), ids.getSubcenter_id(), ids.getMaster_table_version(), ids.getLocal_table_version()); if (config != null) tables.setTimeUnitConverter(config.getTimeUnitConverter()); } if (intvMap != null && filterOut(gr, intvMap)) { stats.filter++; continue; // skip } gr.setFile(fileno); // each record tracks which file it belongs to int gdsHash = gr.getGDSsection().getGDS().hashCode(); // use GDS hash code to group records if (gdsConvert != null && gdsConvert.get(gdsHash) != null) // allow external config to muck with gdsHash. Why? because of error in encoding gdsHash = gdsConvert.get(gdsHash); // and we need exact hash matching Group g = gdsMap.get(gdsHash); if (g == null) { g = new Group(gr.getGDSsection(), gdsHash); gdsMap.put(gdsHash, g); } g.records.add(gr); } fileno++; stats.recordsTotal += index.getRecords().size(); } List<Group> result = new ArrayList<Group>(gdsMap.values()); for (Group g : result) { g.rect = new Grib2Rectilyser(tables, g.records, g.gdsHash, pdsConvert); g.rect.make(stats, files); } // debugging and validation if (logger.isDebugEnabled()) logger.debug(stats.show()); return result; } // true means remove private boolean filterOut(Grib2Record gr, FeatureCollectionConfig.GribIntvFilter intvFilter) { int[] intv = tables.getForecastTimeIntervalOffset(gr); if (intv == null) return false; int haveLength = intv[1] - intv[0]; // HACK if (haveLength == 0 && intvFilter.isZeroExcluded()) { // discard 0,0 if ((intv[0] == 0) && (intv[1] == 0)) { //f.format(" FILTER INTV [0, 0] %s%n", gr); return true; } return false; } else if (intvFilter.hasFilter()) { int discipline = gr.getIs().getDiscipline(); Grib2Pds pds = gr.getPDS(); int category = pds.getParameterCategory(); int number = pds.getParameterNumber(); int id = (discipline << 16) + (category << 8) + number; int prob = Integer.MIN_VALUE; if (pds.isProbability()) { prob = (int) (1000 * pds.getProbabilityUpperLimit()); } return intvFilter.filterOut(id, haveLength, prob); } return false; } /* MAGIC_START version sizeRecords VariableRecords (sizeRecords bytes) sizeIndex GribCollectionIndex (sizeIndex bytes) */ private void createIndex(File indexFile, List<Group> groups, List<MFile> files) throws IOException { Grib2Record first = null; // take global metadata from here boolean deleteOnClose = false; if (indexFile.exists()) { if (!indexFile.delete()) { logger.error("gc2 cant delete index file {}", indexFile.getPath()); } } logger.debug(" createIndex for {}", indexFile.getPath()); RandomAccessFile raf = new RandomAccessFile(indexFile.getPath(), "rw"); raf.order(RandomAccessFile.BIG_ENDIAN); try { //// 
header message raf.write(MAGIC_START.getBytes("UTF-8")); raf.writeInt(version); long lenPos = raf.getFilePointer(); raf.writeLong(0); // save space to write the length of the record section long countBytes = 0; int countRecords = 0; for (Group g : groups) { g.fileSet = new HashSet<Integer>(); for (Grib2Rectilyser.VariableBag vb : g.rect.getGribvars()) { if (first == null) first = vb.first; GribCollectionProto.VariableRecords vr = writeRecordsProto(vb, g.fileSet); byte[] b = vr.toByteArray(); vb.pos = raf.getFilePointer(); vb.length = b.length; raf.write(b); countBytes += b.length; countRecords += vb.recordMap.length; } } long bytesPerRecord = countBytes / ((countRecords == 0) ? 1 : countRecords); if (logger.isDebugEnabled()) logger.debug(" write RecordMaps: bytes = {} record = {} bytesPerRecord={}", new Object[] {countBytes, countRecords, bytesPerRecord}); if (first == null) { deleteOnClose = true; logger.error("GribCollection {}: has no files", gc.getName()); throw new IOException("GribCollection " + gc.getName() + " has no files"); } long pos = raf.getFilePointer(); raf.seek(lenPos); raf.writeLong(countBytes); raf.seek(pos); // back to the output. GribCollectionProto.GribCollectionIndex.Builder indexBuilder = GribCollectionProto.GribCollectionIndex.newBuilder(); indexBuilder.setName(gc.getName()); indexBuilder.setDirName(gc.getDirectory().getPath()); // directory and mfile list indexBuilder.setDirName(gc.getDirectory().getPath()); List<GribCollectionBuilder.GcMFile> gcmfiles = GribCollectionBuilder.makeFiles(gc.getDirectory(), files); for (GribCollectionBuilder.GcMFile gcmfile : gcmfiles) { indexBuilder.addMfiles(gcmfile.makeProto()); } for (Group g : groups) indexBuilder.addGroups(writeGroupProto(g)); /* int count = 0; for (DatasetCollectionManager dcm : collections) { indexBuilder.addParams(makeParamProto(new Parameter("spec" + count, dcm.()))); count++; } */ // what about just storing first ?? 
Grib2SectionIdentification ids = first.getId(); indexBuilder.setCenter(ids.getCenter_id()); indexBuilder.setSubcenter(ids.getSubcenter_id()); indexBuilder.setMaster(ids.getMaster_table_version()); indexBuilder.setLocal(ids.getLocal_table_version()); Grib2Pds pds = first.getPDS(); indexBuilder.setGenProcessType(pds.getGenProcessType()); indexBuilder.setGenProcessId(pds.getGenProcessId()); indexBuilder.setBackProcessId(pds.getBackProcessId()); GribCollectionProto.GribCollectionIndex index = indexBuilder.build(); byte[] b = index.toByteArray(); NcStream.writeVInt(raf, b.length); // message size raf.write(b); // message - all in one gulp logger.debug(" write GribCollectionIndex= {} bytes", b.length); } finally { logger.debug(" file size = {} bytes", raf.length()); if (raf != null) raf.close(); // remove it on failure if (deleteOnClose && !indexFile.delete()) logger.error(" gc2 cant deleteOnClose index file {}", indexFile.getPath()); } } /* private void createIndexForGroup(Group group, ArrayList<String> filenames) throws IOException { Grib2Record first = null; // take global metadata from here File file = new File(gc.getDirectory(), group.name + GribCollection.IDX_EXT); if (file.exists()) file.delete(); // replace it RandomAccessFile raf = new RandomAccessFile(file.getPath(), "rw"); raf.order(RandomAccessFile.BIG_ENDIAN); try { //// header message String magic = gc.getMagicBytes(); raf.write(magic.getBytes("UTF-8")); raf.writeInt(version); long lenPos = raf.getFilePointer(); raf.writeLong(0); // save space to write the length of the record section long countBytes = 0; int countRecords = 0; group.fileSet = new HashSet<Integer>(); for (Rectilyser.VariableBag vb : group.rect.getGribvars()) { if (first == null) first = vb.first; GribCollectionProto.VariableRecords vr = makeRecordsProto(vb, group.fileSet); byte[] b = vr.toByteArray(); vb.pos = raf.getFilePointer(); vb.length = b.length; raf.write(b); countBytes += b.length; } countRecords += group.records.size(); if (countRecords == 0) countRecords = 1; long bytesPerRecord = countBytes / countRecords; logger.debug("VariableRecords: bytes = {} record = {} bytesPerRecord={}", new Object[] {countBytes, countRecords, bytesPerRecord}); long pos = raf.getFilePointer(); raf.seek(lenPos); raf.writeLong(countBytes); raf.seek(pos); // back to the output. 
GribCollectionProto.GribCollectionIndex.Builder indexBuilder = GribCollectionProto.GribCollectionIndex.newBuilder(); indexBuilder.setName(group.name); for (String fn : filenames) indexBuilder.addFiles(fn); indexBuilder.addGroups(makeGroupProto(group)); int count = 0; for (CollectionManager dcm : collections) { indexBuilder.addParams(makeParamProto(new Parameter("spec" + count, dcm.toString()))); count++; } Grib2SectionIdentification ids = first.getId(); indexBuilder.setCenter(ids.getCenter_id()); indexBuilder.setSubcenter(ids.getSubcenter_id()); indexBuilder.setMaster(ids.getMaster_table_version()); indexBuilder.setLocal(ids.getLocal_table_version()); GribCollectionProto.GribCollectionIndex index = indexBuilder.build(); byte[] b = index.toByteArray(); NcStream.writeVInt(raf, b.length); // message size raf.write(b); // message - all in one gulp logger.debug("GribCollectionIndex= {} bytes%n", b.length); } finally { logger.debug("file size = {} bytes%n", raf.length()); raf.close(); if (raf != null) raf.close(); } } */ private GribCollectionProto.VariableRecords writeRecordsProto(Grib2Rectilyser.VariableBag vb, Set<Integer> fileSet) throws IOException { GribCollectionProto.VariableRecords.Builder b = GribCollectionProto.VariableRecords.newBuilder(); b.setCdmHash(vb.cdmHash); for (Grib2Rectilyser.Record ar : vb.recordMap) { GribCollectionProto.Record.Builder br = GribCollectionProto.Record.newBuilder(); if (ar == null || ar.gr == null) { br.setFileno(0); br.setPos(0); // missing : ok to use 0 since drsPos > 0 } else { br.setFileno(ar.gr.getFile()); fileSet.add(ar.gr.getFile()); Grib2SectionDataRepresentation drs = ar.gr.getDataRepresentationSection(); br.setPos(drs.getStartingPosition()); if (ar.gr.isBmsReplaced()) { Grib2SectionBitMap bms = ar.gr.getBitmapSection(); br.setBmsPos(bms.getStartingPosition()); } } b.addRecords(br); } return b.build(); } private GribCollectionProto.Group writeGroupProto(Group g) throws IOException { GribCollectionProto.Group.Builder b = GribCollectionProto.Group.newBuilder(); b.setGds(ByteString.copyFrom(g.gdss.getRawBytes())); b.setGdsHash(g.gdsHash); for (Grib2Rectilyser.VariableBag vb : g.rect.getGribvars()) b.addVariables(writeVariableProto(g.rect, vb)); List<TimeCoord> timeCoords = g.rect.getTimeCoords(); for (int i = 0; i < timeCoords.size(); i++) b.addTimeCoords(writeCoordProto(timeCoords.get(i), i)); List<VertCoord> vertCoords = g.rect.getVertCoords(); for (int i = 0; i < vertCoords.size(); i++) b.addVertCoords(writeCoordProto(vertCoords.get(i), i)); List<EnsCoord> ensCoords = g.rect.getEnsCoords(); for (int i = 0; i < ensCoords.size(); i++) b.addEnsCoords(writeCoordProto(ensCoords.get(i), i)); for (Integer aFileSet : g.fileSet) b.addFileno(aFileSet); if (g.nameOverride != null) b.setName(g.nameOverride); return b.build(); } private GribCollectionProto.Variable writeVariableProto(Grib2Rectilyser rect, Grib2Rectilyser.VariableBag vb) throws IOException { GribCollectionProto.Variable.Builder b = GribCollectionProto.Variable.newBuilder(); b.setDiscipline(vb.first.getDiscipline()); Grib2Pds pds = vb.first.getPDS(); b.setCategory(pds.getParameterCategory()); b.setParameter(pds.getParameterNumber()); b.setLevelType(pds.getLevelType1()); b.setIsLayer(Grib2Utils.isLayer(vb.first)); b.setIntervalType(pds.getStatisticalProcessType()); b.setCdmHash(vb.cdmHash); b.setRecordsPos(vb.pos); b.setRecordsLen(vb.length); b.setTimeIdx(vb.timeCoordIndex); if (vb.vertCoordIndex >= 0) b.setVertIdx(vb.vertCoordIndex); if (vb.ensCoordIndex >= 0) b.setEnsIdx(vb.ensCoordIndex); if 
(pds.isEnsembleDerived()) { Grib2Pds.PdsEnsembleDerived pdsDerived = (Grib2Pds.PdsEnsembleDerived) pds; b.setEnsDerivedType(pdsDerived.getDerivedForecastType()); // derived type (table 4.7) } if (pds.isProbability()) { Grib2Pds.PdsProbability pdsProb = (Grib2Pds.PdsProbability) pds; b.setProbabilityName(pdsProb.getProbabilityName()); b.setProbabilityType(pdsProb.getProbabilityType()); } if (pds.isTimeInterval()) b.setIntvName(rect.getTimeIntervalName(vb.timeCoordIndex)); int genType = pds.getGenProcessType(); if (genType != GribNumbers.UNDEFINED) b.setGenProcessType(pds.getGenProcessType()); return b.build(); } protected GribCollectionProto.Parameter writeParamProto(Parameter param) throws IOException { GribCollectionProto.Parameter.Builder b = GribCollectionProto.Parameter.newBuilder(); b.setName(param.getName()); if (param.isString()) b.setSdata(param.getStringValue()); else { for (int i = 0; i < param.getLength(); i++) b.addData(param.getNumericValue(i)); } return b.build(); } protected GribCollectionProto.Coord writeCoordProto(TimeCoord tc, int index) throws IOException { GribCollectionProto.Coord.Builder b = GribCollectionProto.Coord.newBuilder(); b.setIndex(index); b.setCode(tc.getCode()); b.setUnit(tc.getUnits()); float scale = (float) tc.getTimeUnitScale(); // deal with, eg, "6 hours" by multiplying values by 6 if (tc.isInterval()) { for (TimeCoord.Tinv tinv : tc.getIntervals()) { b.addValues(tinv.getBounds1() * scale); b.addBound(tinv.getBounds2() * scale); } } else { for (int value : tc.getCoords()) b.addValues(value * scale); } return b.build(); } protected GribCollectionProto.Coord writeCoordProto(VertCoord vc, int index) throws IOException { GribCollectionProto.Coord.Builder b = GribCollectionProto.Coord.newBuilder(); b.setIndex(index); b.setCode(vc.getCode()); String units = vc.getUnits(); if (units == null) units = ""; b.setUnit(units); for (VertCoord.Level coord : vc.getCoords()) { if (vc.isLayer()) { b.addValues((float) coord.getValue1()); b.addBound((float) coord.getValue2()); } else { b.addValues((float) coord.getValue1()); } } return b.build(); } protected GribCollectionProto.Coord writeCoordProto(EnsCoord ec, int index) throws IOException { GribCollectionProto.Coord.Builder b = GribCollectionProto.Coord.newBuilder(); b.setIndex(index); b.setCode(0); b.setUnit(""); for (EnsCoord.Coord coord : ec.getCoords()) { b.addValues((float) coord.getCode()); b.addValues((float) coord.getEnsMember()); } return b.build(); } }