X-Git-Url: https://git.netwichtig.de/gitweb/?a=blobdiff_plain;f=src%2Fsrc%2Fdeliver.c;h=3dfa84261e1f468f9b4515a609a8ad66f0f0d3ec;hb=ec0eb1a387b88e6ddd3f1f9ba6ffad6422b7298a;hp=6a3df89bb2170abbfac927b4e8b22c0ee0993a9d;hpb=fa41615da7020d4d951ed3a0b98464bed66ff58b;p=user%2Fhenk%2Fcode%2Fexim.git diff --git a/src/src/deliver.c b/src/src/deliver.c index 6a3df89bb..3dfa84261 100644 --- a/src/src/deliver.c +++ b/src/src/deliver.c @@ -1929,7 +1929,7 @@ address. This feature is not available for shadow transports. */ if ( !shadowing && ( tp->return_output || tp->return_fail_output - || tp->log_output || tp->log_fail_output + || tp->log_output || tp->log_fail_output || tp->log_defer_output ) ) { uschar *error; @@ -2315,17 +2315,47 @@ if (addr->special_action == SPECIAL_WARN && addr->transport->warn_message) -/* Put the chain of addrs on the defer list. Retry will happen -on the next queue run, earlier if triggered by a new message. -Loop for the next set of addresses. */ +/* Check transport for the given concurrency limit. Return TRUE if over +the limit (or an expansion failure), else FALSE and if there was a limit, +the key for the hints database used for the concurrency count. */ -static void -deferlist_chain(address_item * addr) +static BOOL +tpt_parallel_check(transport_instance * tp, address_item * addr, uschar ** key) { -address_item * next; -for (next = addr; next->next; next = next->next) ; -next->next = addr_defer; -addr_defer = addr; +unsigned max_parallel; + +if (!tp->max_parallel) return FALSE; + +max_parallel = (unsigned) expand_string_integer(tp->max_parallel, TRUE); +if (expand_string_message) + { + log_write(0, LOG_MAIN|LOG_PANIC, "Failed to expand max_parallel option " + "in %s transport (%s): %s", tp->name, addr->address, + expand_string_message); + return TRUE; + } + +if (max_parallel > 0) + { + uschar * serialize_key = string_sprintf("tpt-serialize-%s", tp->name); + if (!enq_start(serialize_key, max_parallel)) + { + address_item * next; + DEBUG(D_transport) + debug_printf("skipping tpt %s because concurrency limit %u reached\n", + tp->name, max_parallel); + do + { + next = addr->next; + addr->message = US"concurrency limit reached for transport"; + addr->basic_errno = ERRNO_TRETRY; + post_process_one(addr, DEFER, LOG_MAIN, DTYPE_TRANSPORT, 0); + } while ((addr = next)); + return TRUE; + } + *key = serialize_key; + } +return FALSE; } @@ -2632,33 +2662,18 @@ while (addr_local) We use a hints DB entry, incremented here and decremented after the transport (and any shadow transport) completes. */ - if (tp->max_parallel) + if (tpt_parallel_check(tp, addr, &serialize_key)) { - int_eximarith_t max_parallel = - expand_string_integer(tp->max_parallel, TRUE); if (expand_string_message) { logflags |= LOG_PANIC; - log_write(0, LOG_MAIN|LOG_PANIC, "Failed to expand max_parallel option " - "in %s transport (%s): %s", tp->name, addr->address, - expand_string_message); - for (addr2 = addr->next; addr; addr = addr2, addr2 = addr2->next) + do + { + addr = addr->next; post_process_one(addr, DEFER, logflags, DTYPE_TRANSPORT, 0); - continue; - } - if ( max_parallel > 0 - && !enq_start( - serialize_key = string_sprintf("tpt-serialize-%s", tp->name), - (unsigned) max_parallel) - ) - { - DEBUG(D_transport) - debug_printf("skipping tpt %s because parallelism limit %u reached\n", - tp->name, (unsigned) max_parallel); - - deferlist_chain(addr); - continue; + } while ((addr = addr2)); } + continue; /* Loop for the next set of addresses. */ } @@ -4091,29 +4106,11 @@ for (delivery_count = 0; addr_remote; delivery_count++) The hints DB entry is decremented in par_reduce(), when we reap the transport process. */ - if (tp->max_parallel) - { - int_eximarith_t max_parallel = - expand_string_integer(tp->max_parallel, TRUE); - if (expand_string_message) - { - panicmsg = expand_string_message; + if (tpt_parallel_check(tp, addr, &serialize_key)) + if ((panicmsg = expand_string_message)) goto panic_continue; - } - if ( max_parallel > 0 - && !enq_start( - serialize_key = string_sprintf("tpt-serialize-%s", tp->name), - (unsigned) max_parallel) - ) - { - DEBUG(D_transport) - debug_printf("skipping tpt %s because parallelism limit %u reached\n", - tp->name, (unsigned) max_parallel); - - deferlist_chain(addr); - continue; - } - } + else + continue; /* Loop for the next set of addresses. */ /* Set up the expansion variables for this set of addresses */ @@ -4207,7 +4204,11 @@ for (delivery_count = 0; addr_remote; delivery_count++) } else - deferlist_chain(addr); + { + while (next->next) next = next->next; + next->next = addr_defer; + addr_defer = addr; + } continue; } @@ -6674,7 +6675,6 @@ if (addr_local) so just queue them all. */ if (queue_run_local) - { while (addr_remote) { address_item *addr = addr_remote; @@ -6684,7 +6684,6 @@ if (queue_run_local) addr->message = US"remote deliveries suppressed"; (void)post_process_one(addr, DEFER, LOG_MAIN, DTYPE_TRANSPORT, 0); } - } /* Handle remote deliveries */