
[Datastream #MQU-657040]: L2 products sent in NIMAGE



Hi,

re:
> Is there a chance we could get the code that strips the headers and
> reconstructs the tiles? STAR is not always the most reliable and its
> nice to have a backup.

OK.  There are two Python scripts that were originally created to do
the work:

goes-restitch.py
strip_header.py

Both of these can be found in the ldm-alchemy repository in the Unidata
organization on GitHub:

https://github.com/unidata/ldm-alchemy

I am attaching an LDM pattern-action file, pqact.satellite, that contains,
among other things, the action that runs the goes-restitch.py script from
GitHub and another action that runs strip_header.py to strip the headers
and footers off of the other NOAAPort-broadcast L2 products.
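
For reference, the restitch action has the same general shape as the example
embedded in the comments of goes-restitch.py (reproduced below); the pattern,
paths, and timeout are illustrative only, and the attached pqact.satellite is
the authoritative version:

  NOTHER  (^TI..).. KNES ([0-9][0-9][0-9][0-9][0-9][0-9]) ...
          PIPE    -metadata
          util/goes-restitch.py -v -d /data/ldm/pub/native/satellite/GOES -t 120 -l logs/goes-restitch/\1.log \1 \2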

NOTEs:

- the goes-restitch.py script that is available in the ldm-alchemy repository
  on GitHub does NOT create output file names that follow the GOES-R mission
  standard naming defined in the PUG

- the action that strips headers and footers from the non-ABI L2 products
  is NOT bulletproof enough to correctly process all of the L2 files

  Why, you ask?

  Because the LDM/IDD Product IDs, which are exactly the WMO IDs that the
  National Weather Service assigns to the L2 products, do NOT contain enough
  information to uniquely identify each L2 product: several instances of the
  same L2 product, with different coverages, can all arrive within the same
  minute.

- I modified the goes-restitch.py from GitHub to create GOES-R mission-standard
  names for the reassembled ABI L2 imagery (a short sketch of the resulting
  file names follows this list)

  I have attached this modified version of goes-restitch.py (renamed
  to goes-restitch_nimage.py) and the LDM pattern-action file that
  runs this script (named pqact.conf_npgoesr) to this email.

  This is the exact code and action that is currently used, and will continue
  to be used, to create the ABI L2 image content for the reconstituted NIMAGE
  datastream.

- because of the problems I give a high-level overview of in the second
  bulleted note above, I wrote a Bourne shell script and created an
  associated LDM pattern-action file that _does_ correctly process
  all of the non-ABI L2 products delivered in NOAAPort

  I have attached the Bourne shell script (L2File.sh) and the pattern-action
  file (pqact.conf_l2) to this reply.

- the LDM configuration file EXEC lines for the pattern-action files
  that we are using to create content for the reconstituted NIMAGE
  datastream are:

  exec    "pqact -f EXP|HDS|NOTHER -p ^IXT /opt/ldm/etc/pqact.conf_l2"
  exec    "pqact -f NOTHER -p ^TI /opt/ldm/etc/pqact.conf_npgoesr"

  (The HOME directory for the 'ldm' user on the machine where we are
  creating the products for the reconstituted NIMAGE feed is /opt/ldm)
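
To illustrate the mission-standard naming mentioned above, here is a minimal
Python sketch (not part of the attached scripts) that formats the tail of the
default '-f' file-name template from goes-restitch_nimage.py with example
attribute values:

  from datetime import datetime

  # Tail of the default '-f' template used by goes-restitch_nimage.py
  template = ('OR_ABI-L2-CMIP{covr}-M{mode}C{band:02d}_{sat}'
              '_s{dt:%Y%j%H%M%S}0_e{dt:%Y%j%H%M%S}0_c{dt:%Y%j%H%M%S}0.nc')

  # Example values only: CONUS coverage, mode 3, band 2, GOES-16
  print(template.format(covr='C', mode=3, band=2, sat='G16',
                        dt=datetime(2019, 3, 27, 16, 0, 38)))
  # -> OR_ABI-L2-CMIPC-M3C02_G16_s20190861600380_e20190861600380_c20190861600380.nc

Note that the start, end, and creation times in the name all come from the
tile start time, since that is the only time available in the AWIPS-format
tiles.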

I hope that the above was not overly confusing.  Please don't hesitate
to ask questions.

Cheers,

Tom
--
****************************************************************************
Unidata User Support                                    UCAR Unidata Program
(303) 497-8642                                                 P.O. Box 3000
address@hidden                                   Boulder, CO 80307
----------------------------------------------------------------------------
Unidata HomePage                       http://www.unidata.ucar.edu
****************************************************************************


Ticket Details
===================
Ticket ID: MQU-657040
Department: Support Datastream
Priority: Normal
Status: Closed
===================
NOTE: All email exchanges with Unidata User Support are recorded in the Unidata 
inquiry tracking system and then made publicly available through the web.  If 
you do not want to have your interactions made available in this way, you must 
let us know in each email you send to us.

Attachment: pqact.satellite
Description: Binary data

#!/usr/bin/env python
# Copyright (c) 2017 University Corporation for Atmospheric Research/Unidata.
# Distributed under the terms of the MIT License.
# SPDX-License-Identifier: MIT

import asyncio
from collections import defaultdict
from datetime import datetime
import faulthandler
import functools
import logging
import os
import os.path
import sys
import tempfile

from netCDF4 import Dataset, date2num
import numpy as np

from ldm import init_logger, read_stream, remove_footer, remove_header


def goes_time_to_dt(s):
    r"""Parse the GOES-16 string-formatted time."""
    return datetime.strptime(s, '%Y%j%H%M%S')


def copy_attrs(src, dest, skip):
    r"""Copy all netCDF attributes from one object to another."""
    for attr in src.ncattrs():
        if not skip(attr):
            setattr(dest, attr, getattr(src, attr))

            # Make the spheroid axis lengths CF-compliant
            if attr in {'semi_major', 'semi_minor'} and hasattr(src, 'grid_mapping_name'):
                setattr(dest, attr + '_axis', getattr(src, attr))


def dataset_name(dataset, template):
    r"""Create an appropriate file path from a GOES dataset."""
#// global attributes:
#               :title = "Sectorized Cloud and Moisture Imagery for the WFDM3 region." ;
#               :ICD_version = "GROUND SEGMENT (GS) TO ADVANCED WEATHER INTERACTIVE PROCESSING SYSTEM (AWIPS) INTERFACE CONTROL DOCUMENT (ICD) Revision B" ;
#               :Conventions = "CF-1.6" ;
#               :channel_id = 15 ;
#               :central_wavelength = 12.3f ;
#               :abi_mode = 3 ;
#               :source_scene = "FullDisk" ;
#               :periodicity = 15.f ;
#               :production_location = "WCDAS" ;
#               :product_name = "WFD-060-B12-M3C15" ;
#               :satellite_id = "GOES-17" ;
#               :product_center_latitude = 0. ;
#               :product_center_longitude = -137. ;
#               :projection = "Fixed Grid" ;
#               :bit_depth = 12 ;
#               :source_spatial_resolution = 2.f ;
#               :request_spatial_resolution = 6.f ;
#               :start_date_time = "2019086160038" ;
#               :number_product_tiles = 4 ;
#               :tile_center_latitude = 22.5093750854732 ;
#               :tile_center_longitude = -161.363571074665 ;
#               :product_tile_width = 1024 ;
#               :product_tile_height = 1024 ;
#               :product_rows = 1808 ;
#               :product_columns = 1808 ;
#               :pixel_x_size = 6. ;
#               :pixel_y_size = 6. ;
#               :tile_row_offset = 0 ;
#               :tile_column_offset = 0 ;
#               :satellite_latitude = 0. ;
#               :satellite_longitude = -137. ;
#               :satellite_altitude = 35786023. ;

    # Extract needed values from global attributes
    satellite  = dataset.satellite_id.replace('-', '')
    sat_id     = dataset.satellite_id.replace('OES-', '')
    channel_id = dataset.channel_id
    abi_mode   = dataset.abi_mode
    scene      = dataset.source_scene

    # Need special handling for full disk images to better name NWS regional images
    if scene == 'FullDisk':
        region = dataset.product_name.split('-')[0]

        # Handle some weird product names that start unexpectedly
        if region.startswith(('G16_', 'G17_')):
            region = region.split('_', maxsplit=1)[-1]

        # TCONUS is just CONUS from the mode 4 full disk
        if region.endswith('CONUS'):
            scene = 'CONUS'

        # Not CONUS or FullDisk
        elif not region.endswith('FD'):
            scene = region

    # Coverage and redefine some scene names
    if scene == "CONUS":
        covr = 'C'
    elif scene == "FullDisk":
        covr = 'F'
    elif scene == "Mesoscale-1":
        covr = 'M1'
    elif scene == "Mesoscale-2":
        covr = 'M2'
    elif scene == "PRREGI":
        covr = 'PR'
        scene = 'PuertoRico'
    elif scene == "AKREGI":
        covr = 'AK'
        scene = 'Alaska'
    elif scene == "HIREGI":
        covr = 'HI'
        scene = 'Hawaii'

    # Parse start time into something we can use
    dt = goes_time_to_dt(dataset.start_date_time)

    return template.format(band=channel_id, mode=abi_mode, scene=scene,
                           satellite=satellite, sat=sat_id, covr=covr, dt=dt)


def init_nc_file(source_nc, output_nc):
    r"""Initialize an output netCDF4 file from the input tile file."""
    # Copy global attributes, create dimensions, add our metadata
    copy_attrs(source_nc, output_nc, lambda a: a.startswith('tile'))
    output_nc.product_tiles_received = 0
    output_nc.created_by = 'ldm-alchemy'
    output_nc.createDimension('y', source_nc.product_rows)
    output_nc.createDimension('x', source_nc.product_columns)

    # Create a scalar time coordinate variable from the string attribute
    dt = goes_time_to_dt(source_nc.start_date_time)
    time_var = output_nc.createVariable('time', np.int32)
    time_var.units = 'seconds since 2017-01-01'
    time_var.standard_name = 'time'
    time_var.long_name = 'The start date / time that the satellite began capturing the scene'
    time_var.axis = 'T'
    time_var.calendar = 'standard'
    time_var[:] = date2num(dt, time_var.units)

    # Copy all the variables
    for var_name, old_var in source_nc.variables.items():
        extra_args = {}

        # Need special handling for fill value, since that needs to be on the variable
        # constructor
        if hasattr(old_var, '_FillValue'):
            extra_args['fill_value'] = old_var._FillValue

        # Enable compression for 2D variables only, not coordinates
        if len(old_var.dimensions) == 2:
            extra_args['zlib'] = True
            extra_args['complevel'] = 4
            extra_args['shuffle'] = True

            # Default chunk size chosen by library has 50% file size penalty!
            chunk_height = min(source_nc.product_tile_height, source_nc.product_rows)
            chunk_width = min(source_nc.product_tile_width, source_nc.product_columns)
            extra_args['chunksizes'] = (chunk_height, chunk_width)

        # Create the variable and copy its attributes
        var = output_nc.createVariable(var_name, old_var.datatype,
                                       old_var.dimensions, **extra_args)
        copy_attrs(old_var, var, lambda a: '_FillValue' in a)

        # Need to add time coordinate to any variable that cares
        if hasattr(var, 'grid_mapping'):
            var.coordinates = ' '.join(('time',) + var.dimensions)

    return output_nc


def copy_tile(input_ds, output_ds):
    r"""Copy tile data from input to output."""
    # There's no need to do any scaling--just copy the integer data
    input_ds.set_auto_scale(False)
    output_ds.set_auto_scale(False)

    # Set up slices based on the column and row offsets
    col_slice = slice(input_ds.tile_column_offset,
                      input_ds.tile_column_offset + input_ds.product_tile_width)
    row_slice = slice(input_ds.tile_row_offset,
                      input_ds.tile_row_offset + input_ds.product_tile_height)

    # Copy out data for x, y, and data variables using appropriate slices
    for var_name, src_var in input_ds.variables.items():
        dest_var = output_ds.variables[var_name]
        if var_name == 'x':
            dest_var[col_slice] = src_var[:]
        elif var_name == 'y':
            dest_var[row_slice] = src_var[:]
        elif src_var.ndim == 2:
            dest_var[row_slice, col_slice] = src_var[:]

    output_ds.product_tiles_received += 1
    output_ds.sync()


def find_files(source_dir):
    r"""Find all the netCDF4 files in a directory tree."""
    for root, dirs, files in os.walk(source_dir):
        for fname in sorted(files):
            if not fname.endswith('nc'):
                continue
            ds = Dataset(os.path.join(root, fname))
            yield ds


async def read_disk(source_dir, sinks):
    r"""Read files from disk and asynchronously put them in queue.

    Integrates find_files into our asynchronous framework.
    """
    for product in find_files(source_dir):
        for sink in sinks:
            logger.debug('Queued product: %s', product.filepath())
            await sink.put(product)
            # Without this, we just load files until we run out of file handles
            await asyncio.sleep(0.01)

    for sink in sinks:
        logger.debug('Flushing product sinks.')
        await sink.join()
    await asyncio.sleep(0.01)  # Just enough to let other things close out
    logger.debug('All done.')


#
# Caching and storage of tiles
#
class AssemblerManager(defaultdict):
    r"""Manages Assembler instances.

    Takes tiles as they arrive and dispatches them to the appropriate
    file assembler, creating assemblers as necessary.
    """
    def __init__(self, out_dir, timeout, filename_template, loop):
        super().__init__()
        self.out_dir = out_dir
        self.timeout = timeout
        self.loop = loop
        self.filename = filename_template

    def __missing__(self, key):
        new = self._create_store(key)
        self[key] = new
        return new

    def _create_store(self, key):
        # Pass in call-back to call when done. We don't use the standard future callback
        # because it will end up queued--we need to run immediately.
        store = Assembler(self.out_dir, self.timeout, self.filename)
        store.task = asyncio.ensure_future(
            store.process_items(functools.partial(self.store_done, key=key)))
        return store

    async def join(self):
        # Need to iterate over copy of keys because items could be removed during iteration
        for key in list(self.keys()):
            logger.debug('Flushing chunk store queue.')
            store = self[key]
            await store.finish()
            store.task.cancel()

    async def put(self, item):
        # Find the appropriate store (will be created if necessary)
        if not isinstance(item, Dataset):
            prod_id, data = item
            item = read_netcdf_from_memory(data)
            logger.debug('Dispatching item: %s', prod_id)
        else:
            logger.debug('Dispatching item: %s', item.filepath())
        await self[dataset_name(item, self.filename)].enqueue(item)

    def store_done(self, key):
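        # Remove the finished Assembler from the cache; a late tile for the same
        # product would create a new Assembler that re-opens the completed file.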
        logger.info('%s finished.', key)
        self.pop(key)


class Assembler:
    r"""Handles writing tiles to the final netCDF file."""
    def __init__(self, out_dir, timeout, filename_template):
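        # Bounded queue: once 15 tiles are pending, enqueue() awaits here, giving
        # back-pressure instead of letting tiles pile up in memory.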
        self._queue = asyncio.Queue(15)
        self.out_dir = out_dir
        self.timeout = timeout
        self.output = None
        self.output_name = ''
        self.template = filename_template

    async def enqueue(self, item):
        await self._queue.put(item)

    async def finish(self):
        await self._queue.join()
        self.finalize()

    def finalize(self):
        if self.output is not None:
            try:
                logger.debug('Closing file.')
                old_name = self.output.filepath()
                self.output.close()

                # Rename temporary file to final name if necessary
                if old_name != self.output_name:
                    logger.debug('Renaming output %s to %s', old_name, self.output_name)
                    os.rename(old_name, self.output_name)
            except Exception:
                logger.exception('Exception while finalizing file: ', exc_info=sys.exc_info())
            finally:
                self.output = None

            # Hand the completed file to CmiLinkAndPqinsert.sh, the local helper that
            # links it into place and inserts it into the LDM queue for the NIMAGE feed.
            cmd = ("/opt/ldm/util/CmiLinkAndPqinsert.sh %s NIMAGE 96 "
                   "/opt/ldm/logs/CmiLinkAndPqinsert.log" % self.output_name)
            f = os.popen(cmd)
            message = f.readlines()
            f.close()

    async def process_items(self, on_done):
        while True:
            try:
                product = await asyncio.wait_for(self._queue.get(), self.timeout)
                logger.debug('Processing product: %s', product.filepath())
                try:
                    if not self.output:
                        self.output_name = os.path.join(self.out_dir,
                                                        dataset_name(product, self.template))

                        # Open file if it exists, otherwise create it
                        if os.path.exists(self.output_name):
                            logger.info('Using existing file: %s', self.output_name)
                            self.output = Dataset(self.output_name, 'a')
                        else:
                            if os.path.sep in self.output_name:
                                os.makedirs(os.path.dirname(self.output_name), exist_ok=True)
                            temp_name = self.output_name + '.partial'
                            logger.debug('Creating temporary file: %s', temp_name)
                            self.output = Dataset(temp_name, 'w')

                            init_nc_file(product, self.output)

                    # Copy the tile
                    copy_tile(product, self.output)

                    # Break if done
                    self._queue.task_done()
                    if (self.output.product_tiles_received >= self.output.number_product_tiles
                            and self._queue.empty()):
                        logger.info('%s: All tiles received.',
                                    os.path.basename(self.output_name))
                        break
                    else:
                        logger.info('%s: %d of %d tiles received.',
                                    os.path.basename(self.output_name),
                                    self.output.product_tiles_received,
                                    self.output.number_product_tiles)

                except Exception:
                    logger.exception('Save data exception:', exc_info=sys.exc_info())

            # In the event of timeout, bail out.
            except asyncio.TimeoutError:
                logger.warning('Finishing due to timeout.')
                break

        self.finalize()
        on_done()


def read_netcdf_from_memory(mem):
    r"""Return a netCDF4.Dataset from data in memory.

    Uses a temp file until we have support in netCDF4-python.
    """
    try:
        with tempfile.NamedTemporaryFile(delete=False) as temp:
            temp.write(remove_footer(remove_header(mem)))
        return Dataset(temp.name, 'r')
    except Exception:
        logger.exception('Error opening file for netCDF input')
    finally:
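        # Removing the temp file after the Dataset is opened is fine on POSIX:
        # the open handle keeps the data readable until the Dataset is closed.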
        os.remove(temp.name)


#
# Argument parsing
#
def setup_arg_parser():
    import argparse

    # Example invocation:
    #
    # -------------------------------------
    # - NOAAPORT GOES-16 netCDF4 Data -
    # -------------------------------------
    #
    #NOTHER  (^TI..).. KNES ([0-9][0-9][0-9][0-9][0-9][0-9]) ...
    #        PIPE    -metadata
    #        util/goes-restitch.py -v -d /data/ldm/pub/native/satellite/GOES -t 120 -l logs/goes-restitch/\1.log \1 \2

    # Set up argument parsing
    parser = argparse.ArgumentParser(description='Assemble netCDF4 tiles of GOES data into '
                                                 'single files.')
    parser.add_argument('-d', '--data_dir', help='Base output directory', type=str,
                        default='/data/ldm/pub/native/satellite/GOES')
    parser.add_argument('-t', '--timeout', help='Timeout in seconds for waiting for data',
                        default=60, type=int)
    parser.add_argument('-s', '--source', help='Source directory for data tiles', type=str,
                        default='')
    parser.add_argument('-v', '--verbose', help='Make output more verbose. Can be used '
                                                'multiple times.', action='count', default=0)
    parser.add_argument('-q', '--quiet', help='Make output quieter. Can be used '
                                              'multiple times.', action='count', default=0)
    parser.add_argument('-f', '--filename',
                        help='Filename format string. Uses Python string format specification',
                        default=os.path.join('{satellite}', 'Products', 'CloudAndMoistureImagery',
                                             '{scene}', 'Channel{band:02d}', '{dt:%Y%m%d}',
                                             'OR_ABI-L2-CMIP{covr}-M{mode}C{band:02d}_{sat}'
                                             '_s{dt:%Y%j%H%M%S}0_e{dt:%Y%j%H%M%S}0'
                                             '_c{dt:%Y%j%H%M%S}0.nc'))
    parser.add_argument('-l', '--log', help='Filename to log information to. Uses standard'
                        ' out if not given.', type=str, default='')
    parser.add_argument('other', help='Other arguments for LDM identification', type=str,
                        nargs='*')
    return parser


if __name__ == '__main__':
    args = setup_arg_parser().parse_args()

    fmt = '[' + ' '.join(args.other) + '] %(asctime)s [%(funcName)s]: %(message)s'
    if os.path.sep in args.log:
        os.makedirs(os.path.dirname(args.log), exist_ok=True)

    logger = init_logger(logging.Formatter(fmt=fmt),
                         stream=open(args.log, 'at') if args.log else None)

    # Figure out how noisy we should be. Start by clipping between -2 and 2.
    total_level = min(2, max(-2, args.quiet - args.verbose))
    logger.setLevel(30 + total_level * 10)  # Maps 2 -> 50, 1->40, 0->30, -1->20, -2->10

    # Set up event loop
    loop = asyncio.get_event_loop()
    manager = AssemblerManager(args.data_dir, args.timeout, args.filename, loop)
    queues = [manager]

    # Try to catch core dumps
    faulthandler.enable(open('logs/goes-restitch-crash.log', 'w'))

    # Read from disk if we're pointed to a file, otherwise read from stdin pipe
    try:
        if args.source:
            loop.run_until_complete(read_disk(args.source, queues))
        else:
            # Read directly from standard in buffer
            read_in = sys.stdin.buffer
            loop.run_until_complete(read_stream(loop, read_in, queues, timeout=args.timeout))
    except Exception:
        logger.exception('Exception raised:', exc_info=sys.exc_info())
    finally:
        loop.close()

Attachment: pqact.conf_npgoesr
Description: Binary data

Attachment: L2File.sh
Description: application/shellscript

Attachment: pqact.conf_l2
Description: Binary data