if ( !shadowing
&& ( tp->return_output || tp->return_fail_output
- || tp->log_output || tp->log_fail_output
+ || tp->log_output || tp->log_fail_output || tp->log_defer_output
) )
{
uschar *error;
}
}
-/*XXX prefer to do max_parallel check before we fork. Are we allowed to defer
-this late (we could be a shadow tpt)? */
-
/* Create the pipe for inter-process communication. */
if (pipe(pfd) != 0)
+
+/* Check transport for the given concurrency limit. Return TRUE if over
+the limit (or an expansion failure), else FALSE and if there was a limit,
+the key for the hints database used for the concurrency count. */
+
+static BOOL
+tpt_parallel_check(transport_instance * tp, address_item * addr, uschar ** key)
+{
+unsigned max_parallel;
+
+if (!tp->max_parallel) return FALSE;
+
+max_parallel = (unsigned) expand_string_integer(tp->max_parallel, TRUE);
+if (expand_string_message)
+ {
+ log_write(0, LOG_MAIN|LOG_PANIC, "Failed to expand max_parallel option "
+ "in %s transport (%s): %s", tp->name, addr->address,
+ expand_string_message);
+ return TRUE;
+ }
+
+if (max_parallel > 0)
+ {
+ uschar * serialize_key = string_sprintf("tpt-serialize-%s", tp->name);
+ if (!enq_start(serialize_key, max_parallel))
+ {
+ address_item * next;
+ DEBUG(D_transport)
+ debug_printf("skipping tpt %s because concurrency limit %u reached\n",
+ tp->name, max_parallel);
+ do
+ {
+ next = addr->next;
+ addr->message = US"concurrency limit reached for transport";
+ addr->basic_errno = ERRNO_TRETRY;
+ post_process_one(addr, DEFER, LOG_MAIN, DTYPE_TRANSPORT, 0);
+ } while ((addr = next));
+ return TRUE;
+ }
+ *key = serialize_key;
+ }
+return FALSE;
+}
+
+
+
/*************************************************
* Do local deliveries *
*************************************************/
int logflags = LOG_MAIN;
int logchar = dont_deliver? '*' : '=';
transport_instance *tp;
+ uschar * serialize_key = NULL;
/* Pick the first undelivered address off the chain */
last = next;
batch_count++;
}
- else anchor = &(next->next); /* Skip the address */
+ else anchor = &next->next; /* Skip the address */
}
}
if (!addr) continue;
+ /* If the transport is limited for parallellism, enforce that here.
+ We use a hints DB entry, incremented here and decremented after
+ the transport (and any shadow transport) completes. */
+
+ if (tpt_parallel_check(tp, addr, &serialize_key))
+ {
+ if (expand_string_message)
+ {
+ logflags |= LOG_PANIC;
+ do
+ {
+ addr = addr->next;
+ post_process_one(addr, DEFER, logflags, DTYPE_TRANSPORT, 0);
+ } while ((addr = addr2));
+ }
+ continue; /* Loop for the next set of addresses. */
+ }
+
+
/* So, finally, we do have some addresses that can be passed to the
transport. Before doing so, set up variables that are relevant to a
single delivery. */
deliver_set_expansions(NULL);
+ /* If the transport was parallelism-limited, decrement the hints DB record. */
+
+ if (serialize_key) enq_end(serialize_key);
+
/* Now we can process the results of the real transport. We must take each
address off the chain first, because post_process_one() puts it on another
chain. */
"remote delivery process count got out of step");
parcount = 0;
}
- else remote_post_process(doneaddr, LOG_MAIN, NULL, fallback);
+ else
+ {
+ transport_instance * tp = doneaddr->transport;
+ if (tp->max_parallel)
+ enq_end(string_sprintf("tpt-serialize-%s", tp->name));
+
+ remote_post_process(doneaddr, LOG_MAIN, NULL, fallback);
+ }
}
}
address_item *last = addr;
address_item *next;
uschar * panicmsg;
+ uschar * serialize_key = NULL;
/* Pull the first address right off the list. */
return FALSE;
}
+ /* If the transport is limited for parallellism, enforce that here.
+ The hints DB entry is decremented in par_reduce(), when we reap the
+ transport process. */
+
+ if (tpt_parallel_check(tp, addr, &serialize_key))
+ if ((panicmsg = expand_string_message))
+ goto panic_continue;
+ else
+ continue; /* Loop for the next set of addresses. */
+
/* Set up the expansion variables for this set of addresses */
deliver_set_expansions(addr);
{
panicmsg = string_sprintf("Failed to expand return path \"%s\": %s",
tp->return_path, expand_string_message);
- goto panic_continue;
+ goto enq_continue;
}
}
if (!findugid(addr, tp, &uid, &gid, &use_initgroups))
{
panicmsg = NULL;
- goto panic_continue;
+ goto enq_continue;
}
/* If this transport has a setup function, call it now so that it gets
if (!ok)
{
DEBUG(D_deliver) debug_printf("not suitable for continue_transport\n");
- next = addr;
+ if (serialize_key) enq_end(serialize_key);
if (addr->fallback_hosts && !fallback)
{
- for (;; next = next->next)
+ for (next = addr; ; next = next->next)
{
next->host_list = next->fallback_hosts;
DEBUG(D_deliver) debug_printf("%s queued for fallback host(s)\n", next->address);
}
else
- {
- while (next->next) next = next->next;
- next->next = addr_defer;
- addr_defer = addr;
- }
+ {
+ while (next->next) next = next->next;
+ next->next = addr_defer;
+ addr_defer = addr;
+ }
continue;
}
if (!pipe_done)
{
panicmsg = string_sprintf("unable to create pipe: %s", strerror(errno));
- goto panic_continue;
+ goto enq_continue;
}
/* Find a free slot in the pardata list. Must do this after the possible
(void)close(pfd[pipe_write]);
(void)close(pfd[pipe_read]);
panicmsg = US"Unexpectedly no free subprocess slot";
- goto panic_continue;
+ goto enq_continue;
}
/* Now fork a subprocess to do the remote delivery, but before doing so,
(void)close(pfd[pipe_read]);
panicmsg = string_sprintf("fork failed for remote delivery to %s: %s",
addr->domain, strerror(errno));
- goto panic_continue;
+ goto enq_continue;
}
/* Fork succeeded; increment the count, and remember relevant data for
continue;
+enq_continue:
+ if (serialize_key) enq_end(serialize_key);
panic_continue:
remote_post_process(addr, LOG_MAIN|LOG_PANIC, panicmsg, fallback);
continue;
so just queue them all. */
if (queue_run_local)
- {
while (addr_remote)
{
address_item *addr = addr_remote;
addr->message = US"remote deliveries suppressed";
(void)post_process_one(addr, DEFER, LOG_MAIN, DTYPE_TRANSPORT, 0);
}
- }
/* Handle remote deliveries */
#ifdef EXPERIMENTAL_EVENT
(void) event_raise(event_action, US"msg:complete", NULL);
#endif
-}
+ }
/* If there are deferred addresses, we are keeping this message because it is
not yet completed. Lose any temporary files that were catching output from