[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

20030731: Questions/issues with pqact



Steven,

>Date: Thu, 31 Jul 2003 11:23:40 -0500
>From: Steven Danz <address@hidden>
>Organization: Aviation Weather Center
>To: address@hidden
>Subject: Questions/issues with pqact

The above message contained the following:

> I've run across an interesting issue with pqact, and while I don't
> have a solution, I have a minor (maybe) change for pqact and some
> questions (of course).
>
> What I'm seeing.  If I watch the logs for the gempak decoders I'm
> running I'm seeing the metar, taf, etc (actually all of them) decoders
> start/stop/start every minute or so.  While thrashing (constantly
> starting/stopping) isn't great, it shouldn't break anything.  The
> problem that is occurring is that every once in a while a new decoder
> starts up before the old decoder stops which corrupts the output file.
>
> First, why the thrashing? In its current form, filel.c only allows
> for 32 open files or pipes. When there are a large number of files
> and pipes in use (which there is in my case, I have tons of files
> being written), there is the potential for thrashing the pipes as
> the decoders are shutdown and restarted. Why not use code similar to
> action.c to determine the number of possible open files by asking the
> OS, and then dropping off some for std[in/out/err], shared memory,
> rpc.  Dividing by 2 seems a bit pessimistic, but an option. I've
> attached a filel.c (from 5.2.2, 6.1.14 has the same issue) where I'm
> trying this and it seems to work.

Good points.  Unfortunately, I haven't worked with pqact(1) yet, so
I don't yet know what to say.  I'll put this on my list of things to
investigate.

One thing springs to mind: if I put the LDM package on SourceForge,
would you be willing to be a developer?

> Second, why the startup of a decoder before the old one shuts down? I
> haven't had a chance to work my way through the code yet, but I'm
> wondering if the mechanism used to close the pipe waits until the
> application at the other end is completely closed, or does it just
> close the pipe at the pqact end and move on, not waiting for the
> process to terminate?

I'm afraid I don't yet know the answer.

> As a work around I'm thinking of running two pqacts, one with a
> configuration with just pipes and execs, the other with all the files.
> Is this supported?

Yes.  In fact, that's typically the way the GEMPAK and McIDAS decoders 
work: they have their own pqact(1) configuration-files.

> I wasn't sure if running multiple pqacts on the
> same queue would work or not.

Shouldn't be a problem.

> Thanks for your time
> 
> Steven
> 
> -- 
> Steven Danz
> Senior Software Development Engineer
> Aviation Weather Center (NOAA/NWS/NCEP)
> 7220 NW 101st Terrace, Room 101
> Kansas City, MO 64153-2371
> 
> Email: address@hidden
> Phone: 816.584.7251
> Fax:   816.880.0650
> URL:   http://aviationweather.gov/
> 
> The opinions expressed in this message do not necessarily reflect those 
> of the National Weather Service, or the Aviation Weather Center.

Regards,
Steve Emmerson

> --------------070609090501010602000202
> Content-Type: text/plain;
>  name="filel.c"
> Content-Transfer-Encoding: 7bit
> Content-Disposition: inline;
>  filename="filel.c"
> 
> /*
>  *   Copyright 1993, University Corporation for Atmospheric Research
>  *   See ../COPYRIGHT file for copying and redistribution conditions.
>  */
> /* $Id: filel.c,v 1.2 2003/07/31 05:07:55 sdanz Exp $ */
> 
> /* #define _POSIX_SOURCE */
> 
> #include <features.h>
> #include <ldmconfig.h>
> #include <stdio.h>
> #include <assert.h>
> #include <string.h>
> #include <ctype.h>
> #include <limits.h> /* PATH_MAX */
> #ifndef PATH_MAX
> #define PATH_MAX 255
> #endif /* !PATH_MAX */
> #include <sys/types.h>
> #include <sys/stat.h>
> #include <fcntl.h> /* O_RDONLY et al */
> #include <unistd.h> /* access, lseek */
> #include <signal.h>
> #include <errno.h>
> 
> #if defined(_AIX) && !defined(NO_WAITPID)
> /*
>  * Use POSIX wait macros, not _BSD
>  */
> #define _H_M_WAIT
> #endif
> #include <sys/wait.h>
> 
> 
> #include "filel.h"
> #include "action.h"
> #include "ldm.h"
> #include "ldmalloc.h"
> #include "mkdirs_open.h"
> #include "ulog.h"
> #include "pbuf.h"
> 
> /*
>  * Defined in pqcat.c
>  */
> extern int pipe_timeo;
> 
> #ifndef NO_DB
> 
> /*
>  * Define DB_XPROD non zero if you want the whole "product" data
>  * structure put into the database, otherwise just the data goes in.
>  * Be sure this is consistent with ../dbcat and whatever else used
>  * to read the files.
>  */
> # ifndef DB_XPROD
> # define DB_XPROD 1
> # endif
> 
> /*
>  * Backward compatibility.
>  * If you want to use gdbm interfaces, define USE_GDBM
>  */
> # ifdef USE_GDBM
> # include "gdbm.h"
> # else
> # include <ndbm.h>
> # endif
> 
> #endif /* !NO_DB */
> 
> /*
>  * Tuning parameter: MAXENTRIES is the number of descriptors
>  * you wish to allocate to this list. You need to leave enough around
>  * for the error output and for rpc.
>  */
> /* #define MAXENTRIES OPEN_MAX/2 */
> #define MAXENTRIES 32
> 
> 
> /*
>  * Kind of destination an fl_entry refers to.  The kind selects which
>  * member of the f_handle union is meaningful for the entry.
>  */
> typedef enum {
>       FT_NONE = 0,
>       UNIXIO,         /* file descriptor written with write(2) */
>       STDIO,          /* stdio stream written with fwrite(3) */
>       PIPE,           /* pipe to a child process, buffered by a pbuf */
>       FT_DB           /* dbm/gdbm database handle (unless NO_DB) */
> } ft_t ;
> 
> 
> /*
>  * The open handle of an entry.  Exactly one member is in use, chosen by
>  * the entry's ft_t type.
>  */
> union f_handle {
>       int fd;                 /* UNIXIO: descriptor, -1 when closed */
>       FILE *stream;           /* STDIO: stream, NULL when closed */
>       pbuf *pbuf;             /* PIPE: pipe buffer, NULL when closed */
> #ifndef NO_DB
> # ifdef USE_GDBM
>       GDBM_FILE db;           /* FT_DB: gdbm handle */
> # else
>       DBM *db;                /* FT_DB: ndbm handle */
> # endif
> #endif /*!NO_DB*/
> };
> typedef union f_handle f_handle;
> 
> 
> /*
>  * One element of the doubly linked list of open files, pipes and
>  * databases.
>  */
> struct fl_entry {
>       int flags;              /* FL_* bits, e.g. FL_NEEDS_SYNC, FL_STRIP */
>       ft_t type;              /* which kind of handle this entry holds */
>       struct fl_entry *next;  /* toward the tail of the list */
>       struct fl_entry *prev;  /* toward the head of the list */
>       struct fl_ops *ops;     /* per-type operations; may be NULL */
>       f_handle handle;
>       long serial;    /* a serial number for this connection */
>       unsigned long private;          /* pid, hstat *, read/write flg */
>       char path[PATH_MAX+1];  /* lookup key: file path or joined command line */
> };
> typedef struct fl_entry fl_entry;
> 
> /*
>  * Per-type operation table.  Instances in this file are initialized
>  * positionally, so member order matters.
>  *   cmp   - returns 0 when the entry matches the given arguments
>  *   open  - opens the destination described by the arguments; the
>  *           implementations in this file return -1 on failure
>  *   close - releases the entry's handle
>  *   sync  - flushes pending output; second argument is the "block" flag
>  *   put   - writes a buffer of the given size to the entry
>  *
>  * NOTE(review): the old-style branch names the last member "dbufput"
>  * while the prototyped branch names it "put" - confirm which name the
>  * rest of the file expects before relying on the old-style branch.
>  */
> #if defined(__cplusplus) || defined(__STDC__)
> struct fl_ops {
>       int (*cmp)(fl_entry *, int, char**);
>       int (*open)(fl_entry *, int, char**);
>       void (*close)(fl_entry *);
>       int (*sync)(fl_entry *, int);
>       int (*put)(fl_entry *, const char *,
>               const void *, size_t );
> };
> #else /* Old Style C */
> struct fl_ops {
>       int (*cmp)();
>       int (*open)();
>       void (*close)();
>       int (*sync)();
>       int (*dbufput)();
> };
> #endif
> 
> 
> /*
>  * The one global list of open files.
>  * Declared as a one-element array so that members can be reached with
>  * "thefl->member".  Entries are moved to the head when used and evicted
>  * starting from the tail.
>  */
> static struct fl {
>       int size;               /* number of entries currently on the list */
>       fl_entry *head;         /* most recently used entry, or NULL */
>       fl_entry *tail;         /* least recently used entry, or NULL */
> } thefl[] = {
>       0 ,
>       NULL ,
>       NULL
> };
> 
> 
> #define TO_HEAD(entry) \
>       if(thefl->head != entry) to_head(entry)
> 
> /*
>  * Unlink "entry" from wherever it currently sits and relink it as the
>  * head of the global list.  A no-op when it is already the head.
>  */
> static void
> to_head(fl_entry *entry)
> {
>       fl_entry *const before = entry->prev;
>       fl_entry *const after = entry->next;
> 
>       if(entry == thefl->head)
>               return;
> 
>       /* detach from the neighbours */
>       if(before != NULL)
>               before->next = after;
>       if(after != NULL)
>               after->prev = before;
> 
>       /* the old head (if any) will follow the entry */
>       if(thefl->head != NULL)
>               thefl->head->prev = entry;
>       /* if the entry was last, its predecessor becomes the tail */
>       if(thefl->tail == entry)
>               thefl->tail = before;
> 
>       /* attach at the front */
>       entry->prev = NULL;
>       entry->next = thefl->head;
>       thefl->head = entry;
>       if(thefl->tail == NULL)
>               thefl->tail = entry;
> }
> 
> 
> /*
>  * Close the entry's handle through its ops table (when it has one) and
>  * release the entry's memory.  A NULL entry is ignored.
>  */
> static void
> free_fl_entry(fl_entry *entry)
> {
>       if(entry != NULL)
>       {
>               if(entry->ops != NULL)
>                       (*entry->ops->close)(entry);
>               free(entry);
>       }
> }
> 
> 
> /*
>  * Produce a serial number used to validate Action cache entries.
>  * The value is derived from the current time of day at millisecond
>  * resolution; if the clock cannot be read, the failure is logged and
>  * rand() is used instead.
>  */
> static long
> newSerial(void)
> {
>       struct timeval now;
> 
>       if(gettimeofday(&now, NULL) >= 0)
>               return (now.tv_sec % 1000000) * 1000 + now.tv_usec / 1000;
> 
>       serror("newSerial: gettimeofday");
>       return (long) rand();
> }
> 
> /* forward reference */
> static fl_entry * new_fl_entry(ft_t type, int argc, char **argv);
> 
> #ifdef FL_DEBUG
> /*
>  * Debugging aid: log the list size, then one line per entry (head to
>  * tail) with the entry's descriptor number and path.
>  * Descriptor values logged: the real descriptor where one is known,
>  * -1 for a NULL handle, -2 where no descriptor number applies.
>  */
> static void
> dump_fl(void)
> {
>       fl_entry *entry;
>       int fd;
> 
>       udebug("      thefl->size %d", thefl->size);
>       for(entry = thefl->head; entry != NULL; 
>                       entry = entry->next )
>       {
>               switch (entry->type) {
>               case UNIXIO :
>                       fd = entry->handle.fd;
>                       break;
>               case STDIO :
>                       fd = entry->handle.stream == NULL
>                               ? -1 : fileno(entry->handle.stream);    
>                       break;
>               case PIPE :
>                       fd = entry->handle.pbuf == NULL
>                               ? -1 : entry->handle.pbuf->pfd;
>                       break;
>               case FT_DB :
>                       /* with NO_DB defined this case falls into default */
> #ifndef NO_DB
>                       fd = entry->handle.db == NULL
>                               ? -1 : -2;
>                       break;
> #endif /* !NO_DB */
>               default :
>                       fd = -2;
>               }
>               udebug("       %d %s", fd, entry->path);
>       }
> }
> #endif
> 
> /*
>  * Search the list from head to tail for an entry of the given type
>  * whose cmp operation reports a match (returns 0) for the arguments.
>  * Returns the entry, or NULL when none matches.
>  */
> static fl_entry *
> lookup_fl_entry(ft_t type, int argc, char **argv)
> {
>       fl_entry *cur = thefl->head;
> 
>       while(cur != NULL)
>       {
>               if(cur->type == type
>                               && (*cur->ops->cmp)(cur, argc, argv) == 0)
>                       return cur;
>               cur = cur->next;
>       }
>       return NULL;
> }
> 
> 
> /*
>  * Remove "entry" from the global list, decrement the list size, then
>  * close and free the entry.  A NULL entry is ignored.
>  */
> static void
> delete_entry(fl_entry *entry)
> {
>       fl_entry *before;
>       fl_entry *after;
> 
>       /* assert(thefl->size >= 1); */
>       if(entry == NULL)
>               return;
> 
>       before = entry->prev;
>       after = entry->next;
> 
>       /* splice the neighbours together */
>       if(before != NULL)
>               before->next = after;
>       if(after != NULL)
>               after->prev = before;
> 
>       /* fix up the list ends if the entry was one of them */
>       if(thefl->head == entry)
>               thefl->head = after;
>       if(thefl->tail == entry)
>               thefl->tail = before;
>       thefl->size--;
> 
>       free_fl_entry(entry);
> }
> 
> 
> /*
>  * Sync up to nentries entries, tail to head.
>  *
>  * nentries: maximum number of list entries to visit; -1 means every
>  *           entry on the list.
>  * block:    passed through to each entry's sync operation
>  *           (FALSE => nonblocking).
>  *
>  * Only entries flagged FL_NEEDS_SYNC are synced.  An entry whose sync
>  * operation returns -1 is removed from the list and closed.
>  */
> void
> fl_sync(int nentries,
>        int block) /* bool_t, FALSE => nonblocking */
> {
>       fl_entry *entry, *prev;
> 
> /*    udebug("  fl_sync"); */
> 
>       if(thefl->size <= 0)
>               return;
>       if(nentries == -1)      /* sync everyone */
>               nentries = thefl->size;
> 
>       /*
>        * Stop once nentries entries have been visited.  (Testing
>        * "nentries >= 0" here would visit one entry too many.)
>        */
>       for(entry = thefl->tail;
>               entry != NULL && nentries > 0; entry = prev, nentries--)
>       {
>               /* remember the predecessor: entry may be freed below */
>               prev = entry->prev;
>               if(entry->flags & FL_NEEDS_SYNC)
>               {
>                       if(entry->ops->sync(entry, block) == -1)
>                               delete_entry(entry);
>               }
>       }
> }
> 
> 
> /*
>  * Close the "least recently used" entry: walking from the tail toward
>  * the head, delete the first entry that has none of the bits in
>  * skipflags set.  Does nothing when the list is empty or every entry
>  * is skipped.
>  */
> void
> close_lru(int skipflags)
> {
>       fl_entry *victim;
> 
>       if(thefl->size <= 0)
>               return;
> 
>       victim = thefl->tail;
>       while(victim != NULL && (victim->flags & skipflags) != 0)
>               victim = victim->prev;
> 
>       if(victim != NULL)
>       {
>       /*      udebug("   close_lru: %s", victim->path); */
>               delete_entry(victim);
>       }
> }
> 
> 
> /*
>  * Close every entry on the list, least recently used first.
>  */
> void
> fl_close_all(void)
> {
>       for(;;)
>       {
>               if(thefl->size <= 0)
>                       break;
>               close_lru(0);
>       }
> }
> 
> 
> /*
>  * Look for an fl_entry in the list.
>  * If there isn't one there that matches what you need, make a new one.
>  *
>  * A matching entry is moved to the head of the list and returned.
>  * Otherwise, if the list already holds the maximum number of entries,
>  * the least recently used one is closed first; the new entry is then
>  * created and inserted at the head.
>  *
>  * Returns the entry, or NULL if the new entry could not be created.
>  */
> static fl_entry *
> get_fl_entry(ft_t type, int argc, char **argv)
> {
>       /* cap on the list size; 0 => not yet computed */
>       static int open_max = 0;
>       fl_entry *entry;
> 
>       if(open_max <= 0)
>       {
>               long limit = -1; /* -1 => unknown */
> #ifdef _SC_OPEN_MAX
>               limit = sysconf(_SC_OPEN_MAX);
>               /* Hold some aside for std[in/out/err], shm handles and libraries */
>               if(limit > 32)
>                       limit -= 16;
>               else if(limit > 1)
>                       limit /= 2;
> #endif
>               /*
>                * sysconf() returns -1 on error or when the limit is
>                * indeterminate.  Never leave the cap at zero or below:
>                * that would recompute it on every call and evict an
>                * entry on every miss.
>                */
>               if(limit < 1)
>                       limit = 32; /* punt */
>               if(limit > INT_MAX)
>                       limit = INT_MAX;
>               open_max = (int) limit;
>       }
> 
>       entry = lookup_fl_entry(type, argc, argv);
>       if( entry != NULL )
>       {
>               TO_HEAD(entry);
> #ifdef FL_DEBUG
>               dump_fl();
> #endif
>               return entry;
>       }
>       /* else */
> 
>       /* make room before opening another descriptor */
>       if(thefl->size >= open_max)
>               close_lru(0);
> 
>       entry = new_fl_entry(type, argc, argv);
>       if( entry == NULL )
>       {
>               return NULL; /* malloc or open failed */
>       }
> 
>       /* to front */
>       if(thefl->head != NULL)
>               thefl->head->prev = entry;
>       entry->next = thefl->head;
>       entry->prev = NULL;
>       thefl->head = entry;
>       if(thefl->tail == NULL)
>               thefl->tail = entry;
>       thefl->size++;
> 
> #ifdef FL_DEBUG
>       dump_fl();
> #endif
>       return entry;
> }
> 
> 
> /*
>  * Apply the "after the write" options found among the leading
>  * '-' arguments: "-flush" syncs the entry, "-close" removes it from the
>  * list and closes it.  Only the first three characters of an option are
>  * compared.  The last argument is never treated as an option.
>  *
>  * Returns the status of the sync operation, or 0 if no sync was asked
>  * for.
>  */
> static int
> atFinishedArgs(int ac,
>       char *av[],
>       fl_entry *entry)
> {
>       int wantSync = 0;
>       int wantClose = 0;
>       int status = 0;
> 
>       while(ac > 1 && av[0][0] == '-')
>       {
>               if(strncmp(av[0], "-close", 3) == 0)
>                       wantClose = 1;
>               else if(strncmp(av[0], "-flush", 3) == 0)
>                       wantSync = 1;
>               ac--;
>               av++;
>       }
> 
>       /* sync first: closing frees the entry */
>       if(wantSync)
>               status = (*entry->ops->sync)(entry, wantSync);
>       if(wantClose)
>               delete_entry(entry);
>       return status;
> }
> 
> 
> /*
>  * Return a newly allocated copy of the "len" bytes at "in" with every
>  * byte above 127 and every control character other than '\n' removed.
>  * The number of bytes kept is stored in *outlenp.
>  *
>  * Returns NULL (leaving *outlenp untouched) when "in" is NULL, "len" is
>  * zero, or the allocation fails.
>  * Remember to free the result.
>  */
> static void *
> dupstrip(const void *in, size_t len, size_t *outlenp)
> {
>       const unsigned char *src = in;
>       const unsigned char *stop;
>       char *copy;
>       char *dst;
> 
>       if(in == NULL || len == 0)
>               return NULL;
> 
>       copy = malloc(len);
>       if(copy == NULL)
>       {
>               serror("dupstrip: malloc %ld failed", (long) len);
>               return NULL;
>       }
> 
>       *outlenp = 0;
>       dst = copy;
>       for(stop = src + len; src != stop; src++)
>       {
>               /* keep 7-bit bytes that are printable or a newline */
>               if(((int)*src) <= 127
>                               && (!iscntrl(*src) || *src == '\n'))
>               {
>                       *dst++ = *src;
>                       (*outlenp)++;
>               }
>       }
> 
>       return copy;
> }
> 
> 
> /* Begin UNIXIO */
> /*
>  * cmp operation for file entries: compare the last argument (the path)
>  * with the entry's stored path.  Returns the strcmp() result, so 0
>  * means the entry matches.
>  */
> static int
> str_cmp( fl_entry *entry, int argc, char **argv)
> {
>       const char *wanted;
> 
>       assert(argc > 0);
>       assert(argv[argc - 1] != NULL);
>       assert(*argv[argc - 1] != 0);
> 
>       wanted = argv[argc - 1];
>       return strcmp(wanted, entry->path);
> }
> 
> 
> /*
>  * open operation for UNIXIO entries.
>  *
>  * Leading '-' arguments are options ("-overwrite" truncates the file,
>  * "-strip" sets FL_STRIP on the entry); the last argument is the path.
>  * The file is opened for writing, created if needed, and positioned at
>  * its end unless it was truncated.
>  *
>  * Returns the descriptor, or -1 on failure.  When the failure is
>  * EMFILE, two least-recently-used entries are closed so that a later
>  * attempt can succeed.
>  */
> static int
> unio_open(fl_entry *entry, int ac, char **av)
> {
>       char *path;
>       int flags = (O_WRONLY|O_CREAT);
> 
>       assert(ac > 0);
>       assert(av[ac -1] != NULL);
>       assert(*av[ac -1] != 0);
> 
>       entry->handle.fd = -1;
> 
>       for(; ac > 1 && *av[0] == '-'; ac-- , av++)
>       {
>               if( strncmp(*av,"-overwrite",3) == 0)
>               {
>                       flags |= O_TRUNC;
>               }
>               else if( strncmp(*av,"-strip",3) == 0)
>               {
>                       entry->flags |= FL_STRIP;
>               }
>       }
> 
>       path = av[ac-1];
> 
>       entry->handle.fd = mkdirs_open(path, flags, 0666);
>       if(entry->handle.fd == -1)
>       {
>               /*
>                * Closing other entries may change errno; keep the
>                * value from the failed open so the log message
>                * reports the real cause.
>                */
>               const int err = errno;
> 
>               if(err == EMFILE)
>               {
>                       /* Too many open files */
>                       close_lru(0);
>                       close_lru(0);
>               }
>               errno = err;
>               serror("unio_open: %s", path);
>               return -1;
>       }
>       if(!(flags & O_TRUNC))
>       {
>               if(lseek(entry->handle.fd, 0, SEEK_END) < 0)
>                       serror("unio_open:lseek: %s", path);
>                               /* fatal ? what about devices? */
>       }
> 
>       strncpy(entry->path, path, PATH_MAX);
>       entry->path[PATH_MAX] = 0; /* just in case */
>       udebug("    unio_open: %d", entry->handle.fd);
>       return entry->handle.fd;
> }
> 
> 
> /*
>  * close operation for UNIXIO entries: close the descriptor if one is
>  * open (logging a failure), then reset the entry's path and handle.
>  */
> static void
> unio_close(fl_entry *entry)
> {
>       const int fd = entry->handle.fd;
> 
>       udebug("    unio_close: %d", fd);
>       if(fd != -1 && close(fd) == -1)
>               serror("close: %s", entry->path);
> 
>       entry->path[0] = 0;
>       entry->handle.fd = -1;
> }
> 
> 
> /*
>  * sync operation for UNIXIO entries.
>  *
>  * A non-blocking request does nothing and returns 0.  A blocking
>  * request calls fsync(2) on the descriptor (unless NO_FSYNC is defined)
>  * and clears FL_NEEDS_SYNC.  Returns the fsync() status: 0, or -1 after
>  * logging the failure.
>  *
>  * Some systems may not have an fsync(2) call.  The best you can do
>  * then is to define NO_FSYNC, which makes this routine a noop that
>  * returns 0.
>  */
> static int
> unio_sync(fl_entry *entry, int block)
> {
>       int status = 0;
> 
>       udebug("    unio_sync: %d %s",
>               entry->handle.fd, block ? "" : "non-block");
> 
>       if(!block)
>               return status;
> 
> #ifndef NO_FSYNC
>       if(entry->handle.fd != -1)
>               status = fsync(entry->handle.fd);
>       if(status == -1)
>               serror("fsync: %s", entry->path);
> #endif
>       entry->flags &= ~FL_NEEDS_SYNC;
>       return status;
> }
> 
> 
> /*
>  * put operation for UNIXIO entries: move the entry to the head of the
>  * list and write "sz" bytes of "data" with a single write(2).
>  *
>  * Returns 0 and flags the entry FL_NEEDS_SYNC when the whole buffer was
>  * written.  Otherwise the failure is logged, the entry is removed from
>  * the list and closed, and -1 is returned.
>  */
> /*ARGSUSED*/
> static int
> unio_put(fl_entry *entry, const char *ignored, 
>               const void *data, size_t sz)
> {
>       int nwrote;
> 
>       TO_HEAD(entry);
>       udebug("    unio_dbufput: %d", entry->handle.fd);
> 
>       nwrote = (int) write(entry->handle.fd, data, sz);
>       if((size_t)nwrote == sz)
>       {
>               entry->flags |= FL_NEEDS_SYNC;
>               return 0;
>       }
> 
>       serror("unio_put: %s write error",
>               entry->path);
>       /* don't waste time syncing an errored entry */
>       entry->flags &= ~FL_NEEDS_SYNC;
>       delete_entry(entry);
>       return -1;
> }
> 
> 
> /*
>  * Operation table for UNIXIO entries.  Positional initializer: the
>  * order is cmp, open, close, sync, put.
>  */
> static struct fl_ops unio_ops = {
>       str_cmp,
>       unio_open,
>       unio_close,
>       unio_sync,
>       unio_put,
> };
> 
> 
> /*
>  * Write a product's data to the file named by the last argument, using
>  * a UNIXIO entry from the list (opened on demand).
>  *
>  * When the entry has FL_STRIP set, a stripped copy of the data is
>  * written instead of the original.  After a successful write the
>  * "-flush"/"-close" options are applied.
>  *
>  * Returns -1 if no entry could be obtained, the stripped copy could not
>  * be made, or the write failed; otherwise the status of the option
>  * handling.
>  */
> /*ARGSUSED*/
> int
> unio_prodput(const product *prodp, int argc, char **argv,
>               const void *ignored, size_t also_ignored)
> {
>       void *data = prodp->data;
>       size_t sz = prodp->info.sz;
>       fl_entry *entry = get_fl_entry(UNIXIO, argc, argv);
>       int status;
> 
>       udebug("    unio_prodput: %d %s",
>               entry == NULL ? -1 : entry->handle.fd , prodp->info.ident);
>       if(entry == NULL)
>               return -1;
> 
>       if(entry->flags & FL_STRIP)
>       {
>               data = dupstrip(prodp->data, prodp->info.sz, &sz);
>               if(data == NULL)
>                       return -1;
>       }
> 
>       status = unio_put(entry, prodp->info.ident, data, sz);
> 
>       /* release the stripped copy, if one was made */
>       if(data != prodp->data)
>               free(data);
> 
>       /* on failure the entry is already gone: skip the options */
>       if(status == -1)
>               return status;
>       return atFinishedArgs(argc, argv, entry);
> }
> 
> 
> /* End UNIXIO */
> 
> /* Begin STDIO */
> /*
>  * open operation for STDIO entries.
>  *
>  * Leading '-' arguments are options ("-overwrite" truncates the file
>  * and selects mode "w" instead of "a", "-strip" sets FL_STRIP on the
>  * entry); the last argument is the path.  The file is opened with
>  * mkdirs_open() and then wrapped in a stdio stream.
>  *
>  * Returns the stream's descriptor, or -1 on failure.  When the open
>  * fails with EMFILE, two least-recently-used entries are closed so
>  * that a later attempt can succeed.
>  */
> static int
> stdio_open(fl_entry *entry, int ac, char **av)
> {
>       char *path;
>       int flags = (O_WRONLY|O_CREAT);
>       int fd;
>       char *mode = "a";
>       /* extern FILE *fdopen(int, const char *); */
> 
>       assert(ac > 0);
>       assert(av[ac -1] != NULL);
>       assert(*av[ac -1] != 0);
> 
>       entry->handle.stream = NULL;
> 
>       for(; ac > 1 && *av[0] == '-'; ac-- , av++)
>       {
>               if( strncmp(*av,"-overwrite",3) == 0)
>               {
>                       flags |= O_TRUNC;
>                       mode = "w";
>               }
>               else if( strncmp(*av,"-strip",3) == 0)
>               {
>                       entry->flags |= FL_STRIP;
>               }
>       }
> 
>       path = av[ac-1];
> 
>       fd = mkdirs_open(path, flags, 0666);
>       if(fd == -1)
>       {
>               /*
>                * Closing other entries may change errno; keep the
>                * value from the failed open so the log message
>                * reports the real cause.
>                */
>               const int err = errno;
> 
>               if(err == EMFILE)
>               {
>                       /* Too many open files */
>                       close_lru(0);
>                       close_lru(0);
>               }
>               errno = err;
>               serror("mkdirs_open: %s", path);
>               return -1;
>       }
>       entry->handle.stream = fdopen(fd, mode);
>       if(entry->handle.stream == NULL)
>       {
>               /*
>                * No stream took ownership of fd, so it must be
>                * closed here or the descriptor leaks.
>                */
>               const int err = errno;
> 
>               (void) close(fd);
>               errno = err;
>               serror("fdopen: %s", path);
>               return -1;
>       }
>       strncpy(entry->path, path, PATH_MAX);
>       entry->path[PATH_MAX] = 0; /* just in case */
>       udebug("    stdio_open: %d", fileno(entry->handle.stream));
>       return fileno(entry->handle.stream);
> }
> 
> 
> /*
>  * close operation for STDIO entries: fclose() the stream if one is open
>  * (logging a failure), then reset the entry's path and handle.
>  */
> static void
> stdio_close(fl_entry *entry)
> {
>       FILE *const stream = entry->handle.stream;
> 
>       udebug("    stdio_close: %d",
>               stream ? fileno(stream) : -1);
>       if(stream != NULL && fclose(stream) == EOF)
>               serror("fclose: %s", entry->path);
> 
>       entry->path[0] = 0;
>       entry->handle.stream = NULL;
> }
> 
> 
> /*
>  * sync operation for STDIO entries: fflush() the stream and clear
>  * FL_NEEDS_SYNC.  The "block" argument is not used.
>  * Returns 0, or -1 after logging an fflush() failure.
>  */
> /*ARGSUSED*/
> static int
> stdio_sync(fl_entry *entry, int block)
> {
>       int status;
> 
>       udebug("    stdio_sync: %d",
>               entry->handle.stream ? fileno(entry->handle.stream) : -1);
> 
>       if(fflush(entry->handle.stream) != EOF)
>       {
>               status = 0;
>       }
>       else
>       {
>               serror("fflush: %s", entry->path);
>               status = -1;
>       }
>       entry->flags &= ~FL_NEEDS_SYNC;
>       return status;
> }
> 
> 
> /*
>  * put operation for STDIO entries: move the entry to the head of the
>  * list and write "sz" bytes of "data" to the stream as one item.
>  *
>  * Returns 0 and flags the entry FL_NEEDS_SYNC on success.  On a write
>  * failure the error is logged, the entry is removed from the list and
>  * closed, and -1 is returned.
>  */
> /*ARGSUSED*/
> static int
> stdio_put(fl_entry *entry, const char *ignored,
>               const void *data, size_t sz)
> {
>       TO_HEAD(entry);
>       udebug("    stdio_dbufput: %d", fileno(entry->handle.stream));
> 
>       /*
>        * fwrite() returns 0 when the item size is 0, which is
>        * indistinguishable from a failure, so an empty buffer is not
>        * handed to fwrite() at all: there is nothing to write.
>        */
>       if(sz != 0
>               && fwrite(data, sz, 1, entry->handle.stream) != 1)
>       {
>               serror("stdio_put: %s fwrite error",
>                       entry->path);
>               /* don't waste time syncing an errored entry */
>               entry->flags &= ~FL_NEEDS_SYNC;
>               delete_entry(entry);
>               return -1;
>       }
>       /* else */
>       entry->flags |= FL_NEEDS_SYNC;
>       return 0;
> }
> 
> 
> /*
>  * Operation table for STDIO entries.  Positional initializer: the
>  * order is cmp, open, close, sync, put.
>  */
> static struct fl_ops stdio_ops = {
>       str_cmp,
>       stdio_open,
>       stdio_close,
>       stdio_sync,
>       stdio_put,
> };
> 
> 
> /*
>  * Write a product's data to the file named by the last argument, using
>  * a STDIO entry from the list (opened on demand).
>  *
>  * When the entry has FL_STRIP set, a stripped copy of the data is
>  * written instead of the original.  After a successful write the
>  * "-flush"/"-close" options are applied.
>  *
>  * Returns -1 if no entry could be obtained, the stripped copy could not
>  * be made, or the write failed; otherwise the status of the option
>  * handling.
>  */
> /*ARGSUSED*/
> int
> stdio_prodput(const product *prodp, int argc, char **argv,
>               const void *ignored, size_t also_ignored)
> {
>       void *data = prodp->data;
>       size_t sz = prodp->info.sz;
>       fl_entry *entry = get_fl_entry(STDIO, argc, argv);
>       int status;
> 
>       udebug("    stdio_prodput: %d %s",
>               entry == NULL ? -1 :
>                        fileno(entry->handle.stream) , prodp->info.ident);
>       if(entry == NULL)
>               return -1;
> 
>       if(entry->flags & FL_STRIP)
>       {
>               data = dupstrip(prodp->data, prodp->info.sz, &sz);
>               if(data == NULL)
>                       return -1;
>       }
> 
>       status = stdio_put(entry, prodp->info.ident, data, sz);
> 
>       /* release the stripped copy, if one was made */
>       if(data != prodp->data)
>               free(data);
> 
>       /* on failure the entry is already gone: skip the options */
>       if(status == -1)
>               return status;
>       return atFinishedArgs(argc, argv, entry);
> }
> 
> /* End STDIO */
> 
> 
> /* Begin PIPE */
> /*
>  * Concatenate the argument strings, without separators, into "buf".
>  *
>  * buf:  destination; must have room for len + 1 bytes (at least 1).
>  * len:  maximum number of characters to copy, not counting the
>  *       terminating NUL.
>  * argc: maximum number of arguments to use.
>  * argv: the arguments; a NULL element ends the list early.
>  *
>  * Copying stops for good once "len" characters have been stored; the
>  * result is always NUL-terminated.  Returns the number of characters
>  * stored, which is never more than "len".
>  */
> static int
> argcat(char *buf, int len, int argc, char **argv)
> {
>       int cnt = 0;
>       const char *cp;
> 
>       /*
>        * The limit is tested in both loops: leaving only the inner
>        * loop when the buffer is full would let the next argument
>        * write past the end of the buffer.
>        */
>       while(cnt < len && argc-- > 0 && (cp = *argv++) != NULL)
>       {
>               while(*cp != 0 && cnt < len)
>                       buf[cnt++] = *cp++;
>       }
>       buf[cnt] = 0;
>       return cnt;
> }
> 
> 
> /*
>  * cmp operation for PIPE entries: join all the arguments into one
>  * string and compare it with the entry's stored path.  Returns the
>  * strcmp() result, so 0 means the entry matches.
>  */
> static int
> argcat_cmp(fl_entry *entry, int argc, char **argv)
> {
>       char buf[PATH_MAX+1];
> 
>       assert(argc > 0);
>       assert(argv[0] != NULL);
>       assert(*argv[0] != 0);
> 
>       /*
>        * argcat() stores a terminating NUL after the characters it
>        * copies, so the character limit is one less than the buffer
>        * size.  This is also the limit pipe_open() uses when it fills
>        * entry->path, which keeps the two strings comparable.
>        */
>       argcat(buf, (int)sizeof(buf) - 1, argc, argv);
>       return(strcmp(buf, entry->path));
> }
> 
> 
> /*
>  * Set to non-root privilege if possible.
>  * Do it in such a way that it is safe to fork.
>  * TODO: this is duplicated from ../server/priv.c
>  *
>  * If either the effective or the real user id is unprivileged
>  * (non-zero), setuid() is called with it, the effective id taking
>  * precedence.  A failing setuid() is logged: silently ignoring it
>  * would leave the process running with privileges it was meant to
>  * give up.
>  */
> void
> endpriv(void)
> {
>       const uid_t euid = geteuid();
>       const uid_t uid = getuid();
> 
>       /* if either euid or uid is unprivileged, use it */
>       if(euid > 0)
>       {
>               if(setuid(euid) == -1)
>                       serror("endpriv: setuid(%ld)", (long) euid);
>       }
>       else if(uid > 0)
>       {
>               if(setuid(uid) == -1)
>                       serror("endpriv: setuid(%ld)", (long) uid);
>       }
> 
>       /* else warn??? or set to nobody??? */
> }
> 
> 
> /*
>  * open operation for PIPE entries: start the command given by the
>  * arguments as a child process whose standard input is the read end of
>  * a new pipe, and attach a pbuf to the write end.
>  *
>  * Leading '-' arguments are options: "-transient" clears
>  * FL_NOTRANSIENT (which is otherwise set), "-strip" sets FL_STRIP.  The
>  * remaining arguments are the command and its arguments.
>  *
>  * In the child, SIGCHLD and SIGTERM are reset to their defaults, the
>  * pipe becomes stdin, other descriptors are closed by close_all(),
>  * stdout is pointed at /dev/null, stderr is left alone, privileges are
>  * dropped and the command is exec'ed.
>  *
>  * In the parent, the child's pid is stored in entry->private and the
>  * joined argument string in entry->path.
>  *
>  * Returns the write descriptor of the pipe, or -1 on failure.  When
>  * pipe() fails with EMFILE, two least-recently-used entries are closed
>  * so that a later attempt can succeed.
>  */
> static int
> pipe_open(fl_entry *entry, int argc, char **argv)
> {
>       int ac = argc;
>       char **av = argv;
>       int pfd[2];
>       pid_t pid;
> 
>       assert(argc >= 1);
>       assert(argv[0] != NULL && *argv[0] != 0);
> 
>       entry->handle.pbuf = NULL;
> 
>       entry->flags |= FL_NOTRANSIENT;
>       /* handle any options */
>       for(; ac > 1 && *av[0] == '-'; ac-- , av++)
>       {
>               if( strncmp(*av,"-transient",3) == 0)
>               {
>                       entry->flags &= ~FL_NOTRANSIENT;
>               }
>               else if( strncmp(*av,"-strip",3) == 0)
>               {
>                       entry->flags |= FL_STRIP;
>               }
>       }
> 
>       if( pipe(pfd) == -1 )
>       {
>               /*
>                * Closing other entries may change errno; keep the
>                * value from the failed pipe() so the log message
>                * reports the real cause.
>                */
>               const int err = errno;
> 
>               if(err == EMFILE)
>               {
>                       /* Too many open files */
>                       close_lru(0);
>                       close_lru(0);
>               }
>               errno = err;
>               serror("pipe");
>               return -1;
>       }
> 
>       pid = fork();
>       if(pid == -1)
>       {       /* failure */
>               serror("pipe_open: fork");
>               goto error_out;
>       }
>       /* else */
> 
>       if(pid == 0)
>       {       /* child */
> 
>               (void)signal(SIGCHLD, SIG_DFL);
>               (void)signal(SIGTERM, SIG_DFL);
> 
>               if( dup2(pfd[0] , 0) == -1 ) 
>               {
>                       serror("pipe: child dup2");
>                       _exit(-1);
>               }
>               close_all();
>               /* Set up fd 1, stdout */
>               (void) close(1);
>               {
>                       int fd = open("/dev/null", O_WRONLY);
>                       if(fd >= 0 && fd != 1)
>                       {
>                               (void) dup2(fd, 1);
>                               (void) close(fd);
>                       }
>               }
>               /* we leave stderr alone */
> 
>               endpriv();
> 
>               assert(av[ac] == NULL);
>               (void) execvp(av[0], &av[0]);
>               serror("pipe: execvp: %s", av[0]);
>               _exit(127);
>       }
>       /* else, parent */
> 
>       if(close(pfd[0]) == -1)
>       {
>               /* How can this ever happen? */
>               serror("pipe: parent close");
>               /* not fatal ? */
>       }
>       /* the read end is gone: error_out must not close it again */
>       pfd[0] = -1;
> 
>       /* set up pfd[1] as output descriptor */
>       entry->handle.pbuf = new_pbuf(pfd[1], 512); /* _POSIX_PIPE_BUF */
>       if(entry->handle.pbuf == NULL) 
>               goto error_out;
> 
>       entry->private = pid;
>       argcat(entry->path, PATH_MAX, argc, argv);
> 
>       udebug("    pipe_open: %d %d", pfd[1], pid);
> 
>       return pfd[1];
> 
> error_out:
>       (void)close(pfd[1]);
>       if(pfd[0] != -1)
>               (void)close(pfd[0]);
>       return -1;
> }
> 
> 
> /*
>  * Used by reap() to delete a PIPE entry.
>  *
>  * Searches the list from tail to head for the PIPE entry whose stored
>  * pid equals "pid".  If found, the entry is marked dead (its stored
>  * pid is set to -1) and deleted; otherwise a debug message is logged.
>  * A pid of -1 is ignored.
>  */
> static void
> tag_pid_entry(pid_t pid)
> {
>       fl_entry *cur;
> 
>       if(pid == -1)
>               return;
> 
>       cur = thefl->tail;
>       while(cur != NULL
>                       && !(cur->type == PIPE && pid == cur->private))
>               cur = cur->prev;
> 
>       if(cur == NULL)
>       {
>               udebug("       reap(%d): proc not on filel", pid);
>               return;
>       }
> 
>       udebug("       reap(%d): %s", pid, cur->path);
>       /* mark the entry as dead */
>       cur->private = -1;
>       /* and delete it. Unsafe in interrupt context */
>       delete_entry(cur);
> }
> 
> 
> /*
>  * Collect the status of a child process and clean up after it.
>  *
>  * pid, options: passed to waitpid().  Where NO_WAITPID is defined,
>  *               wait() is used instead, and only when options is 0.
>  *
>  * A child that was terminated by a signal or that exited has its PIPE
>  * entry (if any) removed from the list; stops, signal terminations and
>  * non-zero exit statuses are logged.
>  *
>  * Returns the value from the wait call: the child's pid, or 0 when the
>  * call reported no child.  Returns -1 when the wait call fails; the
>  * failure is logged unless it is ECHILD for a wait on any child
>  * (pid == -1).
>  */
> int
> reap(pid_t pid, int options)
> {
>       pid_t wpid = 0;
>       int status = 0;
> 
> #ifndef NO_WAITPID
>       wpid = waitpid(pid, &status, options);
> #else
>       if(options == 0)
>               wpid = wait(&status);
>       /* customize here for older systems, use wait3 or whatever */
> #endif
>       if(wpid == -1)
>       {
>               if(!(errno == ECHILD && pid == -1)) /* Only complain when relevant */
>                       serror("waitpid");
>               return -1;
>       }
>       /* else */
> 
>       if(wpid != 0) 
>       {
> 
> #if !defined(WIFSIGNALED) && !defined(WIFEXITED)
> #error "Can't decode wait status"
> #endif
> 
>               /*
>                * The tests below form one if/else-if chain whose links
>                * are compiled in only where the macro exists.
>                */
> #if defined(WIFSTOPPED)
>               if(WIFSTOPPED(status))
>               {
>                       unotice("child %d stopped by signal %d",
>                               wpid, WSTOPSIG(status));
>               }
>               else
> #endif
> #if defined(WIFSIGNALED)
>               if(WIFSIGNALED(status))
>               {
>                       tag_pid_entry(wpid);
>                       unotice("child %d terminated by signal %d",
>                               wpid, WTERMSIG(status));
>               }
>               else
> #endif
> #if defined(WIFEXITED)
>               if(WIFEXITED(status))
>               {
>                       tag_pid_entry(wpid);
>                       if(WEXITSTATUS(status) != 0)
>                               unotice("child %d exited with status %d",
>                                       wpid, WEXITSTATUS(status));
>               }
> #endif
>       }
> 
>       return wpid;
> }
> 
> 
> /*
>  * sync operation for PIPE entries: flush the entry's pbuf, passing on
>  * the "block" flag and the global pipe timeout.
>  *
>  * Returns the pbuf_flush() status.  When that status is not ENOERR,
>  * FL_NEEDS_SYNC is cleared so the failed entry is not synced again.
>  *
>  * NOTE(review): the flag is left set after a successful flush -
>  * confirm that this is intended.
>  */
> static int
> pipe_sync(fl_entry *entry, int block)
> {
>       int status;
> 
>       udebug("    pipe_sync: %d %s",
>               entry->handle.pbuf ? entry->handle.pbuf->pfd : -1,
>               block ? "" : "non-block");
> 
>       status = pbuf_flush(entry->handle.pbuf, block, pipe_timeo);
>       if(status == ENOERR)
>               return status;
> 
>       entry->flags &= ~FL_NEEDS_SYNC;
>       return status;
> }
> 
> 
> /*
>  * close operation for PIPE entries.
>  *
>  * If the entry has a pbuf: pending output is flushed first (only when
>  * the stored pid is not negative and FL_NEEDS_SYNC is set), the pbuf is
>  * freed and its descriptor is closed.  The entry's path, handle and
>  * stored pid are then reset.
>  */
> static void
> pipe_close(fl_entry *entry)
> {
>       const pid_t pid = (pid_t)entry->private;
>       pbuf *const pb = entry->handle.pbuf;
> 
>       udebug("    pipe_close: %d, %d",
>               pb ? pb->pfd : -1, pid);
> 
>       if(pb != NULL)
>       {
>               int pfd;
> 
>               if(pid >= 0 && (entry->flags & FL_NEEDS_SYNC))
>                       (void) pipe_sync(entry, TRUE);
> 
>               /* take the descriptor out before the pbuf is freed */
>               pfd = pb->pfd;
>               free_pbuf(pb);
> 
>               /*
>                * The close should cause termination of the child
>                * as the child reads EOF. The child is wait()'ed
>                * upon asynchronously in a SIGCHLD handler.
>                */
>               if(pfd != -1 && close(pfd) == -1)
>                       serror("pipe close: %s", entry->path);
>       }
> 
>       entry->path[0] = 0;
>       entry->handle.pbuf = NULL;
>       entry->private = 0;
> }
> 
> 
> /*
>  * put operation for PIPE entries: move the entry to the head of the
>  * list and write "sz" bytes of "data" through the entry's pbuf, using
>  * the global pipe timeout.
>  *
>  * N.B. New return convention:
>  * returns ENOERR (0) or, on failure, the errno.
>  *
>  * EINVAL is returned when the entry has no pbuf.  On a write failure
>  * the error is logged and the entry is removed from the list and
>  * closed.  On success the entry is flagged FL_NEEDS_SYNC.
>  */
> /*ARGSUSED*/
> static int
> pipe_put(fl_entry *entry, const char *ignored,
>               const void *data, size_t sz)
> {
>       int status;
> 
>       udebug("    pipe_put: %d",
>               entry->handle.pbuf ? entry->handle.pbuf->pfd : -1);
>       TO_HEAD(entry);
>       if(entry->handle.pbuf == NULL)
>               return EINVAL;
> 
>       status = pbuf_write(entry->handle.pbuf,
>               data, sz, pipe_timeo);
>       if(status == ENOERR)
>       {
>               entry->flags |= FL_NEEDS_SYNC;
>               return ENOERR;
>       }
> 
>       uerror("pipe_dbufput: %s write error",
>                       entry->path);
>       /* don't waste time syncing an errored entry */
>       entry->flags &= ~FL_NEEDS_SYNC;
>       delete_entry(entry);
>       return status;
> }
> 
> 
> /* Operations table installed on entries of type PIPE. */
> static struct fl_ops pipe_ops = {
>       argcat_cmp,     /* compare an entry against an argument list */
>       pipe_open,      /* open */
>       pipe_close,     /* close */
>       pipe_sync,      /* sync (flush) */
>       pipe_put,       /* put (write) */
> };
> 
> 
> /*
>  * Write the data of product "prodp" to the PIPE entry selected by
>  * argc/argv.  If the entry carries FL_STRIP, a dupstrip() copy of the
>  * data is written instead and freed before returning.
>  * The last two arguments are unused.
>  *
>  * Returns -1 on any failure, otherwise the result of atFinishedArgs().
>  */
> /*ARGSUSED*/
> int
> pipe_prodput(const product *prodp, int argc, char **argv,
>               const void *ignored, size_t also_ignored)
> {
>       int status = ENOERR;
>       void *data = prodp->data;
>       size_t sz = prodp->info.sz;
>       fl_entry *entry = get_fl_entry(PIPE, argc, argv);
> 
>       udebug("    pipe_prodput: %d %s",
>               (entry != NULL && entry->handle.pbuf)
>               ?  entry->handle.pbuf->pfd : -1,
>               prodp->info.ident);
> 
>       if(entry == NULL)
>               return -1;
> 
>       if(entry->flags & FL_STRIP)
>       {
>               data = dupstrip(prodp->data, prodp->info.sz, &sz);
>               if(data == NULL)
>                       return -1;
>       }
> 
>       status = pipe_put(entry, prodp->info.ident, data, sz);
>       if(status == EPIPE)
>       {
>               /*
>                * In case the decoder exited and we haven't yet reaped,
>                * try again once.
>                */
>               uerror("pipe_prodput: trying again");
>               entry = get_fl_entry(PIPE, argc, argv);
>               /*
>                * If no entry could be obtained, status stays EPIPE so
>                * that we fall through to the common cleanup below
>                * instead of returning with the stripped copy leaked.
>                */
>               if(entry != NULL)
>                       status = pipe_put(entry, prodp->info.ident,
>                               data, sz);
>       }
> 
>       /* release the stripped copy, if one was made */
>       if(data != prodp->data)
>               free(data);
> 
>       if(status != ENOERR)
>               return -1;
> 
>       return atFinishedArgs(argc, argv, entry);
> }
> 
> 
> /*
>  * Write product "prod" to the PIPE entry selected by argc/argv, framed
>  * with a sync word, the key and data lengths, and ETX/RS trailer bytes
>  * (layout described below).  The last two arguments are unused.
>  *
>  * Returns -1 on any failure (no entry, out of memory, write error),
>  * otherwise the result of atFinishedArgs().
>  */
> /*ARGSUSED*/
> int
> spipe_prodput(const product *prod, int argc, char **argv,
>               const void *ignored, size_t also_ignored)
> {
>       fl_entry *entry;
>       char *buffer;
>       size_t len;
>       unsigned long offset;
>       int status = ENOERR;
> 
>       /* lets an unsigned long be copied out byte by byte */
>       typedef union {
>               unsigned long u_long;
>               char cu_long[sizeof(unsigned long)];
>       } conv;
>       conv key_len;
>       conv data_len;
>       conv sync;
> 
> 
>       entry = get_fl_entry(PIPE, argc, argv);
>       udebug("    spipe_prodput: %d %s",
>               (entry != NULL && entry->handle.pbuf)
>                       ?  entry->handle.pbuf->pfd : -1,
>                       prod->info.ident);
>       if(entry == NULL)
>               return -1;
> 
>       /*
>       **---------------------------------------------------------
>       ** Place the following information into a temporary buffer
>       ** for writing to the pipe:
>       **
>       ** unsigned long SPIPE_SYNC
>       ** unsigned long key_len
>       ** char *key
>       ** unsigned long data_len  (this includes ETX/RS markers)
>       ** char *data
>       ** char SPIPE_ETX
>       ** char SPIPE_RS
>       **
>       ** First, get lengths of key and data to allocate space
>       ** in the temporary buffer.
>       **
>       **---------------------------------------------------------
>       */
> #ifndef SPIPE_SYNC
> #define SPIPE_SYNC 0x1DFCCF1A
> #endif /* !SPIPE_SYNC */
> 
> #ifndef SPIPE_ETX
> #define SPIPE_ETX '\003'
> #endif /* !SPIPE_ETX */
> 
> #ifndef SPIPE_RS
> #define SPIPE_RS '\036'
> #endif /* !SPIPE_RS */
> 
>       key_len.u_long = strlen(prod->info.ident);
>       data_len.u_long = prod->info.sz + 2;
>       sync.u_long = SPIPE_SYNC;
> 
>       /* computed in size_t so the total is never truncated */
>       len = sizeof(sync.cu_long) +
>               sizeof(key_len.cu_long) + key_len.u_long +
>               sizeof(data_len.cu_long) + prod->info.sz + 2;
> 
>       buffer = calloc(1, len);
>       if(buffer == NULL)
>       {
>               serror("spipe_prodput: calloc of %lu bytes failed",
>                       (unsigned long)len);
>               return -1;
>       }
> 
>       /*---------------------------------------------------------
>       ** Now place the individual items into the buffer
>       **-------------------------------------------------------*/
> 
>       offset = 0;
> 
>       memcpy(buffer+offset, sync.cu_long, sizeof(sync.cu_long));
>       offset = offset + sizeof(unsigned long);
> 
>       memcpy(buffer+offset, key_len.cu_long, sizeof(key_len.cu_long));
>       offset = offset + sizeof(key_len);
> 
>       memcpy(buffer+offset, prod->info.ident, key_len.u_long);
>       offset = offset + key_len.u_long;
> 
>       memcpy(buffer+offset, data_len.cu_long, sizeof(data_len.cu_long));
>       offset = offset + sizeof(data_len);
> 
>       memcpy(buffer+offset, prod->data, prod->info.sz);
> 
>       /*---------------------------------------------------------
>       ** Terminate the message with ETX & RS
>       **-------------------------------------------------------*/
>       buffer[len - 2] = SPIPE_ETX;
>       buffer[len - 1] = SPIPE_RS;
> 
>       uerror("spipe_prodput: size = %d\t%d %d %d", prod->info.sz,
>               buffer[len - 3], buffer[len - 2], buffer[len - 1]);
> 
>       /*---------------------------------------------------------
>       ** Send this stuff and tidy up
>       **-------------------------------------------------------*/
>       status = pipe_put(entry, prod->info.ident, buffer, len);
>       if(status == EPIPE)
>       {
>               /*
>                * In case the decoder exited and we haven't yet reaped,
>                * try again once.
>                */
>               uerror("spipe_prodput: trying again");
>               entry = get_fl_entry(PIPE, argc, argv);
>               /*
>                * With no entry, status stays EPIPE and we fall through
>                * so the buffer is still freed.
>                */
>               if(entry != NULL)
>                       status = pipe_put(entry, prod->info.ident,
>                               buffer, len);
>       }
>       free(buffer);
>       if(status != ENOERR)
>               return -1;
> 
>       return atFinishedArgs(argc, argv, entry);
> 
> }
> 
> 
> /*
>  * Write the "xlen" bytes at "xprod" to the PIPE entry selected by
>  * argc/argv; "prod" supplies the identifier used for logging and as
>  * the second argument of pipe_put().
>  * Returns -1 on failure, otherwise the result of atFinishedArgs().
>  */
> int
> xpipe_prodput(const product *prod, int argc, char **argv,
>               const void *xprod, size_t xlen)
> {
>       int rc;
>       fl_entry *entry = get_fl_entry(PIPE, argc, argv);
> 
>       udebug("    xpipe_prodput: %d %s",
>               (entry != NULL && entry->handle.pbuf)
>                       ?  entry->handle.pbuf->pfd : -1, prod->info.ident);
>       if(entry == NULL)
>               return -1;
> 
>       rc = pipe_put(entry, prod->info.ident, xprod, xlen);
>       if(rc == EPIPE)
>       {
>               /*
>                * The decoder may have exited without having been
>                * reaped yet: look the entry up again and retry once.
>                */
>               uerror("xpipe_prodput: trying again");
>               entry = get_fl_entry(PIPE, argc, argv);
>               if(entry == NULL)
>                       return -1;
>               rc = pipe_put(entry, prod->info.ident, xprod, xlen);
>       }
> 
>       return (rc == ENOERR) ? atFinishedArgs(argc, argv, entry) : -1;
> }
> /* End PIPE */
> 
> 
> #ifndef NO_DB
> # ifdef USE_GDBM
> /* namespace conflict with gdbm_open, etc, so using prefix ldmdb_ */
> 
> 
> /*
>  * Fatal-error callback passed to gdbm_open().
>  * gdbm invokes it when it tries to punt; without a callback of our
>  * own, gdbm would print the message and call exit(-1).  Here the
>  * message is only logged through serror().
>  */
> static void
> ldmdb_fatal(char *str)
> {
>       serror("%s", str);
> }
> 
> 
> /*
>  * Open a GDBM database and attach it to "entry".
>  *
>  * two or 3 args:
>  *    pathname flag [dblocksize]
>  *    "flag" is converted with atoi() and handed to gdbm_open() as its
>  *    open mode.  Any value other than GDBM_READER is treated as a
>  *    writable open, in which case missing directories leading to
>  *    pathname are created first.
>  *    "dblocksize", when present, must be a positive integer; otherwise
>  *    a message is logged and DEFAULT_DBLOCKSIZE is used.
>  *
>  * Returns 0 on success, -1 on failure.
>  */
> static int
> ldmdb_open(fl_entry *entry, int argc, char **argv)
> {
>       char *path;
>       GDBM_FILE db;
>       int read_write = GDBM_WRCREAT;
>       /* default: choose to optimize for space over time */
> #define DEFAULT_DBLOCKSIZE 512
>       int dblocksize = DEFAULT_DBLOCKSIZE;
> 
>       entry->handle.db = NULL;
>       path = argv[0];
>       read_write = atoi(argv[1]);
> 
>       if(argc > 2)
>       {
>               const int requested = atoi(argv[2]);
> 
>               if(requested > 0)
>               {
>                       dblocksize = requested;
>               }
>               else 
>               {
>                       /* report the offending argument, argv[2] */
>                       uerror("%s: ldmdb_open: -dblocksize %s invalid",
>                               path, argv[2]);
>               }
>       }
> 
>       if(read_write != GDBM_READER) /* not read only */
>       {
>               /* create directories if needed */
>               if(diraccess(path, (R_OK | W_OK), !0) == -1)
>               {
>                       serror("Couldn't access directories leading to %s",
>                               path);
>                       return -1;
>               }
>       }
> 
>       db = gdbm_open(path, dblocksize, read_write, 0664, ldmdb_fatal);
>       if(db == NULL)
>       {
>               /*
>                * Keep the errno of the failed open: close_lru() may
>                * change it before the failure is logged.
>                */
>               const int open_errno = errno;
> 
>               if(open_errno == EMFILE)
>               {
>                       /* Too many open files */
>                       close_lru(0);
>                       close_lru(0);
>               }
>               errno = open_errno;
>               serror("gdbm_open: %s", path);
>               return -1;
>       }
>       entry->handle.db = db;
>       entry->private = read_write;
>       strncpy(entry->path, path, PATH_MAX);
>       entry->path[PATH_MAX] = 0; /* just in case */
>       udebug("    ldmdb_open: %s", entry->path);
>       return 0;
> }
> 
> 
> /*
>  * Close the GDBM database held by "entry", if any, and reset the
>  * entry's handle, path and private fields.
>  */
> static void
> ldmdb_close(fl_entry *entry)
> {
>       udebug("    ldmdb_close: %s", entry->path);
> 
>       if(entry->handle.db != NULL)
>       {
>               gdbm_close(entry->handle.db);
>               entry->handle.db = NULL;
>       }
>       entry->path[0] = 0;
>       entry->private = 0;
> }
> 
> 
> /*
>  * Compare "entry" against the argument list (pathname flag ...).
>  * Returns the strcmp() of argv[0] and the entry's path.
>  *
>  * Side effect: when the paths match but the requested flag is a
>  * non-GDBM_READER value different from the one the entry was opened
>  * with, the database is closed and reopened with the new arguments;
>  * -1 is returned if that reopen fails.
>  */
> static int
> ldmdb_cmp(fl_entry *entry, int argc, char **argv)
> {
>       int wanted;
> 
>       assert(argc > 1);
>       assert(argv[0] != NULL);
>       assert(*argv[0] != 0);
> 
>       wanted = atoi(argv[1]);
> 
>       {
>               const int diff = strcmp(argv[0], entry->path);
>               if(diff != 0)
>                       return diff;
>       }
> 
>       /* same path: nothing more to do if the open mode is compatible */
>       if(wanted == GDBM_READER || wanted == entry->private)
>               return 0;
> 
>       /*
>        * the flags don't match, so close and reopen
>        */
>       ldmdb_close(entry);
>       return (ldmdb_open(entry, argc, argv) < 0) ? -1 : 0;
> }
> 
> 
> /*
>  * "Sync" a GDBM entry.  No database call is made (there is no
>  * gdbm_sync); the function only logs and clears FL_NEEDS_SYNC.
>  * "block" is unused.  Always returns 0.
>  */
> /*ARGSUSED*/
> static int
> ldmdb_sync(fl_entry *entry, int block)
> {
>       const char *label = entry->handle.db ? entry->path : "";
> 
>       udebug("    ldmdb_sync: %s", label);
>       entry->flags &= ~FL_NEEDS_SYNC;
>       return 0;
> }
> 
> 
> /*
>  * Store "sz" bytes of "data" in the GDBM database of "entry" under the
>  * key "keystr" (the key includes the terminating \0).
>  * Returns the gdbm_store() status, or -1 if the concatenation path
>  * fails.
>  */
> /*ARGSUSED*/
> static int
> ldmdb_put(fl_entry *entry, const char *keystr,
>               const void *data, size_t sz)
> {
>       datum key, content;
>       int status;
> 
>       key.dptr = (char *) keystr; /* N.B. cast away const */
>       key.dsize = (int) strlen(key.dptr) + 1; /* include the \0 */
> 
>       content.dptr = (char *) data; /* N.B. cast away const */
>       content.dsize = (int) sz;
> 
> #if defined(DB_CONCAT) && !DB_XPROD
>       /*
>        * Concatenate data when the key is a duplicate.
>        * Contributed 9/17/91 JCaron/PNeilley/LCarson
>        * Wrecks idea of "product" when applied at this layer, so
>        * only define DB_CONCAT when DB_XPROD is not defined.
>        */
>       status = gdbm_store(entry->handle.db, key, content, GDBM_INSERT);
>       if(status == 1)
>       {
>               /* key already present: append to what is stored */
>               datum previous, merged;
>               int total;
> 
>               previous = gdbm_fetch(entry->handle.db, key);
>               udebug("\tConcatenating data under key %s", key.dptr);
>               if(previous.dptr == NULL)
>               {
>                       serror("ldmdb_prodput: Inconsistent Duplicate Key storage");
>                       return -1;
>               }
> 
>               total = content.dsize + previous.dsize;
>               merged.dptr = malloc(total);
>               if(merged.dptr == NULL)
>               {
>                       serror("ldmdb_prodput: malloc failed");
>                       free(previous.dptr);
>                       return -1;
>               }
> 
>               memcpy(merged.dptr, previous.dptr, previous.dsize);
>               memcpy(&merged.dptr[previous.dsize], content.dptr,
>                       content.dsize);
>               merged.dsize = total;
>               status = gdbm_store(entry->handle.db, key, merged,
>                       GDBM_REPLACE);
>               free(merged.dptr);
>               free(previous.dptr);
>       }
> 
> #else
>       /* TODO: replace flag */
>       status = gdbm_store(entry->handle.db, key, content, GDBM_REPLACE);
> #endif
>       return status;
> }
> 
> # else /*USE_GDBM*/
> 
> /*
>  * Open an ndbm database and attach it to "entry".
>  *
>  * args:
>  *    [-overwrite] [-strip] ... pathname
>  *    Leading arguments starting with '-' are scanned as options
>  *    (matched on their first three characters): -overwrite adds
>  *    O_TRUNC to the open flags, -strip sets FL_STRIP on the entry.
>  *    The pathname is the LAST argument.
>  *
>  * NOTE(review): ldmdb_prodput() builds its argument list as
>  * "pathname flag [dblocksize]", i.e. with the pathname first, which
>  * does not match the last-argument convention used here -- confirm
>  * which layout this non-GDBM build is meant to receive.
>  *
>  * Returns 0 on success, -1 on failure.
>  */
> static int
> ldmdb_open(fl_entry *entry, int ac, char **av)
> {
>       const char *path;
>       int flags = (O_WRONLY|O_CREAT);
> 
>       assert(ac > 0);
>       assert(av[ac -1] != NULL);
>       assert(*av[ac -1] != 0);
> 
>       entry->handle.db = NULL;
> 
>       for(; ac > 1 && *av[0] == '-'; ac-- , av++)
>       {
>               if( strncmp(*av,"-overwrite",3) == 0)
>               {
>                       flags |= O_TRUNC;
>               }
>               else if( strncmp(*av,"-strip",3) == 0)
>               {
>                       entry->flags |= FL_STRIP;
>               }
>       }
> 
>       path = av[ac-1];
> 
>       /* create directories if needed */
>       if(diraccess(path, (R_OK | W_OK), !0) == -1)
>       {
>               serror("Couldn't access directories leading to %s", path);
>               return -1;
>       }
> 
>       entry->handle.db = dbm_open(path, flags, 0666);
>       if(entry->handle.db == NULL)
>       {
>               /*
>                * Keep the errno of the failed open: close_lru() may
>                * change it before the failure is logged.
>                */
>               const int open_errno = errno;
> 
>               if(open_errno == EMFILE)
>               {
>                       /* Too many open files */
>                       close_lru(0);
>                       close_lru(0);
>                       close_lru(0);
>                       close_lru(0);
>               }
>               errno = open_errno;
>               serror("ldmdb_open: %s", path);
>               return -1;
>       }
>       strncpy(entry->path, path, PATH_MAX);
>       entry->path[PATH_MAX] = 0; /* just in case */
>       udebug("    ldmdb_open: %s", entry->path);
>       return 0;
> }
> 
> 
> /*
>  * Close the ndbm database held by "entry", if any, and reset the
>  * entry's handle, path and private fields.
>  */
> static void
> ldmdb_close(fl_entry *entry)
> {
>       udebug("    ldmdb_close: %s", entry->path);
> 
>       if(entry->handle.db != NULL)
>       {
>               dbm_close(entry->handle.db);
>               entry->handle.db = NULL;
>       }
>       entry->path[0] = 0;
>       entry->private = 0;
> }
> 
> 
> /*
>  * Compare "entry" against an argument list.
>  * Delegates entirely to str_cmp() and returns its result.
>  */
> static int
> ldmdb_cmp(fl_entry *entry, int argc, char **argv)
> {
>       const int cmp = str_cmp(entry, argc, argv);
> 
>       return cmp;
> }
> 
> 
> /*
>  * "Sync" an ndbm entry.  No database call is made (there is no
>  * dbm_sync); the function only logs and clears FL_NEEDS_SYNC.
>  * "block" is unused.  Always returns 0.
>  */
> /*ARGSUSED*/
> static int
> ldmdb_sync(fl_entry *entry, int block)
> {
>       const char *label = entry->handle.db ? entry->path : "";
> 
>       udebug("    ldmdb_sync: %s", label);
>       entry->flags &= ~FL_NEEDS_SYNC;
>       return 0;
> }
> 
> 
> /*
>  * Store "sz" bytes of "data" in the ndbm database of "entry" under the
>  * key "keystr" (the key includes the terminating \0).
>  * Returns the dbm_store() status, or -1 if the concatenation path
>  * fails.
>  */
> /*ARGSUSED*/
> static int
> ldmdb_put(fl_entry *entry, const char *keystr,
>               const void *data, size_t sz)
> {
>       datum key, content;
>       int status;
> 
>       key.dptr = (char *) keystr; /* N.B. cast away const */
>       key.dsize = (int) strlen(key.dptr) + 1; /* include the \0 */
> 
>       content.dptr = (char *) data; /* N.B. cast away const */
>       content.dsize = (int) sz;
> 
> #if defined(DB_CONCAT) && !DB_XPROD
>       /*
>        * Concatenate data when the key is a duplicate.
>        * Contributed 9/17/91 JCaron/PNeilley/LCarson
>        * Wrecks idea of "product" when applied at this layer, so
>        * only define DB_CONCAT when DB_XPROD is not defined.
>        */
>       status = dbm_store(entry->handle.db, key, content, DBM_INSERT);
>       if(status == 1)
>       {
>               /* key already present: append to what is stored */
>               datum previous, merged;
>               int total;
> 
>               previous = dbm_fetch(entry->handle.db, key);
>               udebug("\tConcatenating data under key %s", key.dptr);
>               if(previous.dptr == NULL)
>               {
>                       serror("ldmdb_prodput: Inconsistent Duplicate Key storage");
>                       return -1;
>               }
> 
>               total = content.dsize + previous.dsize;
>               merged.dptr = malloc(total);
>               if(merged.dptr == NULL)
>               {
>                       serror("ldmdb_prodput: malloc failed");
>                       free(previous.dptr);
>                       return -1;
>               }
> 
>               memcpy(merged.dptr, previous.dptr, previous.dsize);
>               memcpy(&((char *)merged.dptr)[previous.dsize],
>                       content.dptr, content.dsize);
>               merged.dsize = total;
>               status = dbm_store(entry->handle.db, key, merged,
>                       DBM_REPLACE);
>               free(merged.dptr);
>               free(previous.dptr);
>       }
> 
> #else
>       /* TODO: replace flag */
>       status = dbm_store(entry->handle.db, key, content, DBM_REPLACE);
> #endif
>       return status;
> }
> # endif /*USE_GDBM*/
> 
> 
> /* Operations table installed on entries of type FT_DB. */
> static struct fl_ops ldmdb_ops = {
>       ldmdb_cmp,      /* compare an entry against an argument list */
>       ldmdb_open,     /* open */
>       ldmdb_close,    /* close */
>       ldmdb_sync,     /* sync */
>       ldmdb_put,      /* put (store) */
> };
> 
> 
> /*
>  * Store product "prod" in a database.
>  *
>  * args:
>  *    [-close] [-dblocksize size] pathname [key]
>  *    Options are matched on their first three characters.
>  *    -close       delete the entry after the store
>  *    -dblocksize  block size forwarded to ldmdb_open()
>  *    key          database key; when absent or empty, the product
>  *                 identifier is used
>  *
>  * With DB_XPROD the "xlen" bytes at "xp" are stored, otherwise the
>  * product data.
>  *
>  * Returns -1 if no pathname was given or no entry could be obtained,
>  * otherwise the status of ldmdb_put().  The entry is deleted when
>  * -close was given or the put returned -1.
>  */
> /*ARGSUSED*/
> int
> ldmdb_prodput(const product *prod, int ac, char **av,
>               const void *xp, size_t xlen)
> {
>       fl_entry *entry;
>       int status;
>       int closeflag = 0;
> 
>       const char *keystr;
>       char *dblocksizep = NULL;
>       char *gdbm_wrcreat = "2";
> 
>       for(; ac > 1 && *av[0] == '-'; ac-- , av++)
>       {
>               if( strncmp(*av,"-close",3) == 0)
>                       closeflag = !0;
>               else if( strncmp(*av,"-dblocksize",3) == 0)
>               {
>                       /* the option's value is the next argument */
>                       ac--; av++;
>                       dblocksizep = *av;
>               } else
>                       uerror("dbfile: Invalid argument %s", *av);
> 
>       }
> 
>       /*
>        * Option parsing can consume every argument (e.g. -dblocksize
>        * with its value missing); never hand a NULL pathname on.
>        */
>       if(ac < 1 || av[0] == NULL)
>       {
>               uerror("dbfile: missing database pathname");
>               return -1;
>       }
> 
>       {
>               /* set up simple argc, argv for ldmdb_open */
>               int argc = 0;
>               char *argv[4];
>               argv[argc++] = av[0];
>               argv[argc++] = gdbm_wrcreat;
>               if(dblocksizep != NULL)
>                       argv[argc++] = dblocksizep;
>               argv[argc] = NULL;
>               entry = get_fl_entry(FT_DB, argc, argv);
>               udebug("    ldmdb_prodput: %s %s",
>                       entry == NULL ? "" : entry->path, prod->info.ident);
>               if(entry == NULL) return -1;
>       }
> 
>       /* step past the pathname */
>       ac--; av++;
> 
>       /* only look at av[0] while an argument actually remains */
>       if(ac > 0 && av[0] != NULL && *av[0] != 0) 
>       {
>               /* use command line arg as key */
>               keystr = av[0];
>       }
>       else
>       {
>               /* use product->ident */
>               keystr = prod->info.ident;
>       }
> 
> #if DB_XPROD
>       status = ldmdb_put(entry, keystr, xp, xlen);
> #else
>       status = ldmdb_put(entry, keystr, prod->data, prod->info.sz);
> #endif
> 
>       if(status == -1)
>       {
>               uerror("db_put: %s error for %s, dbkey %s",
>                       entry->path, prod->info.ident, keystr);
>       }
>       if(closeflag || status == -1)
>       {
>               delete_entry(entry);
>       }
> 
>       return status;
> }
> 
> #endif /* !NO_DB */
> 
> 
> /*
>  * Allocate a file-list entry of the given type, install the matching
>  * operations table and open the underlying object with argc/argv.
>  *
>  * Returns the new entry, or NULL if allocation fails, the type is
>  * unknown (or FT_DB when built with NO_DB), or the open fails.  On
>  * those error paths the entry is released with free_fl_entry().
>  */
> static fl_entry *
> new_fl_entry(ft_t type, int argc, char **argv)
> {
>       fl_entry *entry = NULL;
> 
>       entry = Alloc(1, fl_entry);
>       if(entry == NULL)
>       {
>               serror("new_fl_entry: malloc");
>               return NULL;
>       }
> 
>       /*
>        * Put every field in a defined state before anything can jump
>        * to "err": free_fl_entry() is handed the entry there and must
>        * not see indeterminate values (in particular "ops", which is
>        * left NULL when the type is rejected below).
>        */
>       entry->ops = NULL;
>       entry->flags = 0;
>       entry->type = type;
>       entry->next = NULL;
>       entry->prev = NULL;
>       entry->path[0] = 0;
>       entry->private = 0;
> 
>       switch (type) {
>       case UNIXIO :
>               entry->ops = &unio_ops;
>               break;
>       case STDIO :
>               entry->ops = &stdio_ops;
>               break;
>       case PIPE :
>               entry->ops = &pipe_ops;
>               break;
>       case FT_DB :
> #ifndef NO_DB
>               entry->ops = &ldmdb_ops;
> #else
>               uerror("new_fl_entry: DB type not enabled");
>               goto err;
>               /*NOTREACHED*/
> #endif /* !NO_DB */
>               break;
>       default :
>               uerror("new_fl_entry: unknown type %d", type);
>               goto err;
>       }
> 
>       if( entry->ops->open(entry, argc, argv) == -1 )
>               goto err;
>       entry->serial = newSerial();
> 
>       return entry;
> err :
>       free_fl_entry(entry);
>       return NULL;
> }
> 
> --------------070609090501010602000202--