]> git.netwichtig.de Git - user/henk/code/exim.git/blobdiff - src/src/deliver.c
Pipe transport: fix log_defer_output option. Bug 840
[user/henk/code/exim.git] / src / src / deliver.c
index a1d16ecedb815d4b186c09f7e9c9d37aa3db22c6..3dfa84261e1f468f9b4515a609a8ad66f0f0d3ec 100644 (file)
@@ -1929,7 +1929,7 @@ address. This feature is not available for shadow transports. */
 
 if (  !shadowing
    && (  tp->return_output || tp->return_fail_output
-      || tp->log_output || tp->log_fail_output
+      || tp->log_output || tp->log_fail_output || tp->log_defer_output
    )  )
   {
   uschar *error;
@@ -1945,9 +1945,6 @@ if (  !shadowing
     }
   }
 
-/*XXX prefer to do max_parallel check before we fork. Are we allowed to defer
-this late (we could be a shadow tpt)? */
-
 /* Create the pipe for inter-process communication. */
 
 if (pipe(pfd) != 0)
@@ -2317,6 +2314,52 @@ if (addr->special_action == SPECIAL_WARN && addr->transport->warn_message)
 
 
 
+
+/* Check transport for the given concurrency limit.  Return TRUE if over
+the limit (or an expansion failure), else FALSE and if there was a limit,
+the key for the hints database used for the concurrency count. */
+
+static BOOL
+tpt_parallel_check(transport_instance * tp, address_item * addr, uschar ** key)
+{
+unsigned max_parallel;
+
+if (!tp->max_parallel) return FALSE;
+
+max_parallel = (unsigned) expand_string_integer(tp->max_parallel, TRUE);
+if (expand_string_message)
+  {
+  log_write(0, LOG_MAIN|LOG_PANIC, "Failed to expand max_parallel option "
+       "in %s transport (%s): %s", tp->name, addr->address,
+       expand_string_message);
+  return TRUE;
+  }
+
+if (max_parallel > 0)
+  {
+  uschar * serialize_key = string_sprintf("tpt-serialize-%s", tp->name);
+  if (!enq_start(serialize_key, max_parallel))
+    {
+    address_item * next;
+    DEBUG(D_transport)
+      debug_printf("skipping tpt %s because concurrency limit %u reached\n",
+                 tp->name, max_parallel);
+    do
+      {
+      next = addr->next;
+      addr->message = US"concurrency limit reached for transport";
+      addr->basic_errno = ERRNO_TRETRY;
+      post_process_one(addr, DEFER, LOG_MAIN, DTYPE_TRANSPORT, 0);
+      } while ((addr = next));
+    return TRUE;
+    }
+  *key = serialize_key;
+  }
+return FALSE;
+}
+
+
+
 /*************************************************
 *              Do local deliveries               *
 *************************************************/
@@ -2348,6 +2391,7 @@ while (addr_local)
   int logflags = LOG_MAIN;
   int logchar = dont_deliver? '*' : '=';
   transport_instance *tp;
+  uschar * serialize_key = NULL;
 
   /* Pick the first undelivered address off the chain */
 
@@ -2483,7 +2527,7 @@ while (addr_local)
         last = next;
         batch_count++;
         }
-      else anchor = &(next->next);      /* Skip the address */
+      else anchor = &next->next;        /* Skip the address */
       }
     }
 
@@ -2614,6 +2658,25 @@ while (addr_local)
 
   if (!addr) continue;
 
+  /* If the transport is limited for parallellism, enforce that here.
+  We use a hints DB entry, incremented here and decremented after
+  the transport (and any shadow transport) completes. */
+
+  if (tpt_parallel_check(tp, addr, &serialize_key))
+    {
+    if (expand_string_message)
+      {
+      logflags |= LOG_PANIC;
+      do
+       {
+       addr = addr->next;
+       post_process_one(addr, DEFER, logflags, DTYPE_TRANSPORT, 0);
+       } while ((addr = addr2));
+      }
+    continue;                  /* Loop for the next set of addresses. */
+    }
+
+
   /* So, finally, we do have some addresses that can be passed to the
   transport. Before doing so, set up variables that are relevant to a
   single delivery. */
@@ -2719,6 +2782,10 @@ while (addr_local)
 
   deliver_set_expansions(NULL);
 
+  /* If the transport was parallelism-limited, decrement the hints DB record. */
+
+  if (serialize_key) enq_end(serialize_key);
+
   /* Now we can process the results of the real transport. We must take each
   address off the chain first, because post_process_one() puts it on another
   chain. */
@@ -3730,7 +3797,14 @@ while (parcount > max)
       "remote delivery process count got out of step");
     parcount = 0;
     }
-  else remote_post_process(doneaddr, LOG_MAIN, NULL, fallback);
+  else
+    {
+    transport_instance * tp = doneaddr->transport;
+    if (tp->max_parallel)
+      enq_end(string_sprintf("tpt-serialize-%s", tp->name));
+
+    remote_post_process(doneaddr, LOG_MAIN, NULL, fallback);
+    }
   }
 }
 
@@ -3853,6 +3927,7 @@ for (delivery_count = 0; addr_remote; delivery_count++)
   address_item *last = addr;
   address_item *next;
   uschar * panicmsg;
+  uschar * serialize_key = NULL;
 
   /* Pull the first address right off the list. */
 
@@ -4027,6 +4102,16 @@ for (delivery_count = 0; addr_remote; delivery_count++)
     return FALSE;
     }
 
+  /* If the transport is limited for parallellism, enforce that here.
+  The hints DB entry is decremented in par_reduce(), when we reap the
+  transport process. */
+
+  if (tpt_parallel_check(tp, addr, &serialize_key))
+    if ((panicmsg = expand_string_message))
+      goto panic_continue;
+    else
+      continue;                        /* Loop for the next set of addresses. */
+
   /* Set up the expansion variables for this set of addresses */
 
   deliver_set_expansions(addr);
@@ -4055,7 +4140,7 @@ for (delivery_count = 0; addr_remote; delivery_count++)
       {
       panicmsg = string_sprintf("Failed to expand return path \"%s\": %s",
        tp->return_path, expand_string_message);
-      goto panic_continue;
+      goto enq_continue;
       }
     }
 
@@ -4066,7 +4151,7 @@ for (delivery_count = 0; addr_remote; delivery_count++)
   if (!findugid(addr, tp, &uid, &gid, &use_initgroups))
     {
     panicmsg = NULL;
-    goto panic_continue;
+    goto enq_continue;
     }
 
   /* If this transport has a setup function, call it now so that it gets
@@ -4104,11 +4189,11 @@ for (delivery_count = 0; addr_remote; delivery_count++)
     if (!ok)
       {
       DEBUG(D_deliver) debug_printf("not suitable for continue_transport\n");
-      next = addr;
+      if (serialize_key) enq_end(serialize_key);
 
       if (addr->fallback_hosts && !fallback)
         {
-        for (;; next = next->next)
+       for (next = addr; ; next = next->next)
           {
           next->host_list = next->fallback_hosts;
           DEBUG(D_deliver) debug_printf("%s queued for fallback host(s)\n", next->address);
@@ -4119,11 +4204,11 @@ for (delivery_count = 0; addr_remote; delivery_count++)
         }
 
       else
-        {
-        while (next->next) next = next->next;
-        next->next = addr_defer;
-        addr_defer = addr;
-        }
+       {
+       while (next->next) next = next->next;
+       next->next = addr_defer;
+       addr_defer = addr;
+       }
 
       continue;
       }
@@ -4185,7 +4270,7 @@ for (delivery_count = 0; addr_remote; delivery_count++)
   if (!pipe_done)
     {
     panicmsg = string_sprintf("unable to create pipe: %s", strerror(errno));
-    goto panic_continue;
+    goto enq_continue;
     }
 
   /* Find a free slot in the pardata list. Must do this after the possible
@@ -4203,7 +4288,7 @@ for (delivery_count = 0; addr_remote; delivery_count++)
     (void)close(pfd[pipe_write]);
     (void)close(pfd[pipe_read]);
     panicmsg = US"Unexpectedly no free subprocess slot";
-    goto panic_continue;
+    goto enq_continue;
     }
 
   /* Now fork a subprocess to do the remote delivery, but before doing so,
@@ -4532,7 +4617,7 @@ for (delivery_count = 0; addr_remote; delivery_count++)
     (void)close(pfd[pipe_read]);
     panicmsg = string_sprintf("fork failed for remote delivery to %s: %s",
         addr->domain, strerror(errno));
-    goto panic_continue;
+    goto enq_continue;
     }
 
   /* Fork succeeded; increment the count, and remember relevant data for
@@ -4567,6 +4652,8 @@ for (delivery_count = 0; addr_remote; delivery_count++)
 
   continue;
 
+enq_continue:
+  if (serialize_key) enq_end(serialize_key);
 panic_continue:
   remote_post_process(addr, LOG_MAIN|LOG_PANIC, panicmsg, fallback);
   continue;
@@ -6588,7 +6675,6 @@ if (addr_local)
 so just queue them all. */
 
 if (queue_run_local)
-  {
   while (addr_remote)
     {
     address_item *addr = addr_remote;
@@ -6598,7 +6684,6 @@ if (queue_run_local)
     addr->message = US"remote deliveries suppressed";
     (void)post_process_one(addr, DEFER, LOG_MAIN, DTYPE_TRANSPORT, 0);
     }
-  }
 
 /* Handle remote deliveries */
 
@@ -7448,7 +7533,7 @@ if (!addr_defer)
 #ifdef EXPERIMENTAL_EVENT
   (void) event_raise(event_action, US"msg:complete", NULL);
 #endif
-}
+  }
 
 /* If there are deferred addresses, we are keeping this message because it is
 not yet completed. Lose any temporary files that were catching output from