summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJeremy Harris <jgh146exb@wizmail.org>2023-02-13 11:34:38 +0000
committerJeremy Harris <jgh146exb@wizmail.org>2023-02-13 11:34:38 +0000
commit1e835086d1592bdfbcd8577133965b78470840ac (patch)
treef59b796ea82b41bfa78a3ee620e877af9fd18b0c
parent63deec8a3ba77fcabf405d9c30fdd65a8b909526 (diff)
Named queues: support multiple queue-runners from single daemon
-rw-r--r--doc/doc-docbook/spec.xfpt9
-rw-r--r--doc/doc-txt/NewStuff4
-rw-r--r--src/src/daemon.c431
-rw-r--r--src/src/exim.c276
-rw-r--r--src/src/functions.h19
-rw-r--r--src/src/globals.c4
-rw-r--r--src/src/globals.h4
-rw-r--r--src/src/macros.h6
-rw-r--r--src/src/queue.c92
-rw-r--r--src/src/structs.h18
-rw-r--r--test/scripts/0000-Basic/05762
-rw-r--r--test/stdout/05764
12 files changed, 553 insertions, 316 deletions
diff --git a/doc/doc-docbook/spec.xfpt b/doc/doc-docbook/spec.xfpt
index fd2b47f22..6199b5d89 100644
--- a/doc/doc-docbook/spec.xfpt
+++ b/doc/doc-docbook/spec.xfpt
@@ -4615,6 +4615,15 @@ combined daemon at system boot time is to use a command such as
Such a daemon listens for incoming SMTP calls, and also starts a queue runner
process every 30 minutes.
+.new
+.cindex "named queues" "queue runners"
+It is possible to set up runners for multiple named queues within one daemon,
+For example:
+.code
+exim -qGhipri/2m -q10m -qqGmailinglist/1h
+.endd
+.wen
+
When a daemon is started by &%-q%& with a time value, but without &%-bd%&, no
pid file is written unless one is explicitly requested by the &%-oP%& option.
diff --git a/doc/doc-txt/NewStuff b/doc/doc-txt/NewStuff
index c1e139e35..adad7ec57 100644
--- a/doc/doc-txt/NewStuff
+++ b/doc/doc-txt/NewStuff
@@ -19,7 +19,9 @@ Version 4.97
5. The smtp transport option "max_rcpt" is now expanded before use.
- 6. The tls_eccurve option for OpenSSL now takes a list of group names
+ 6. The tls_eccurve option for OpenSSL now takes a list of group names.
+
+ 7. Queue runners for several queues can now be started from one daemon.
Version 4.96
------------
diff --git a/src/src/daemon.c b/src/src/daemon.c
index 7666626ed..b0533c28f 100644
--- a/src/src/daemon.c
+++ b/src/src/daemon.c
@@ -3,7 +3,7 @@
*************************************************/
/* Copyright (c) The Exim Maintainers 2020 - 2022 */
-/* Copyright (c) University of Cambridge 1995 - 2018 */
+/* Copyright (c) University of Cambridge 1995 - 2023 */
/* See the file NOTICE for conditions of use and distribution. */
/* SPDX-License-Identifier: GPL-2.0-or-later */
@@ -16,17 +16,20 @@
/* Structure for holding data for each SMTP connection */
typedef struct smtp_slot {
- pid_t pid; /* pid of the spawned reception process */
- uschar *host_address; /* address of the client host */
+ pid_t pid; /* pid of the spawned reception process */
+ uschar * host_address; /* address of the client host */
} smtp_slot;
+typedef struct runner_slot {
+ pid_t pid; /* pid of spawned queue-runner process */
+ const uschar *queue_name; /* pointer to the name in the qrunner struct */
+} runner_slot;
+
/* An empty slot for initializing (Standard C does not allow constructor
expressions in assignments except as initializers in declarations). */
static smtp_slot empty_smtp_slot = { .pid = 0, .host_address = NULL };
-
-
/*************************************************
* Local static variables *
*************************************************/
@@ -39,9 +42,11 @@ static int accept_retry_count = 0;
static int accept_retry_errno;
static BOOL accept_retry_select_failed;
-static int queue_run_count = 0;
-static pid_t *queue_pid_slots = NULL;
-static smtp_slot *smtp_slots = NULL;
+static int queue_run_count = 0; /* current runners */
+
+static unsigned queue_runner_slot_count = 0;
+static runner_slot * queue_runner_slots = NULL;
+static smtp_slot * smtp_slots = NULL;
static BOOL write_pid = TRUE;
@@ -920,19 +925,30 @@ while ((pid = waitpid(-1, &status, WNOHANG)) > 0)
/* If it wasn't an accepting process, see if it was a queue-runner
process that we are tracking. */
- if (queue_pid_slots)
- {
- int max = atoi(CS expand_string(queue_run_max));
- for (int i = 0; i < max; i++)
- if (queue_pid_slots[i] == pid)
+ if (queue_runner_slots)
+ for (unsigned i = 0; i < queue_runner_slot_count; i++)
+ {
+ runner_slot * r = queue_runner_slots + i;
+ if (r->pid == pid)
{
- queue_pid_slots[i] = 0;
+ r->pid = 0; /* free up the slot */
+
if (--queue_run_count < 0) queue_run_count = 0;
DEBUG(D_any) debug_printf("%d queue-runner process%s now running\n",
- queue_run_count, (queue_run_count == 1)? "" : "es");
+ queue_run_count, queue_run_count == 1 ? "" : "es");
+
+ for (qrunner ** p = &qrunners, * q = qrunners; q; p = &q->next, q = *p)
+ if (q->name == r->queue_name)
+ {
+ if (q->interval) /* a periodic queue run */
+ q->run_count--;
+ else /* a one-time run */
+ *p = q->next; /* drop this qrunner */
+ break;
+ }
break;
}
- }
+ }
}
}
@@ -1232,7 +1248,11 @@ bad:
}
+/* Data for notifier-triggered queue runs */
+
static uschar queuerun_msgid[MESSAGE_ID_LENGTH+1];
+static const uschar * queuerun_msg_qname;
+
/* The notifier socket has something to read. Pull the message from it, decode
and do the action.
@@ -1320,7 +1340,12 @@ switch (buf[0])
/* this should be a message_id */
DEBUG(D_queue_run)
debug_printf("%s: qrunner trigger: %s\n", __FUNCTION__, buf+1);
+
memcpy(queuerun_msgid, buf+1, MESSAGE_ID_LENGTH+1);
+
+ for (qrunner * q = qrunners; q; q = q->next)
+ if (Ustrcmp(q->name, buf+1+MESSAGE_ID_LENGTH+1) == 0)
+ { queuerun_msg_qname = q->name; break; }
return TRUE;
#endif
@@ -1383,7 +1408,45 @@ ALARM(resignal_interval);
}
-static void
+/* Re-sort the qrunners list, and return the shortest interval.
+That could be negatime.
+The next-tick times should have been updated by any runs initiated,
+though will not be when the global limit on runners was reached.
+
+Unlikely to have many queues, so insertion-sort.
+*/
+
+static int
+next_qrunner_interval(void)
+{
+qrunner * sorted = NULL;
+for (qrunner * q = qrunners, * next; q; q = next)
+ {
+ next = q->next;
+ q->next = NULL;
+ if (sorted)
+ {
+ qrunner ** p = &sorted;
+ for (qrunner * qq; qq = *p; p = &(qq->next))
+ if ( q->next_tick < qq->next_tick
+ || q->next_tick == qq->next_tick && q->interval < qq->interval
+ )
+ {
+ *p = q;
+ q->next = qq;
+ goto INSERTED;
+ }
+ *p = q;
+ INSERTED: ;
+ }
+ else
+ sorted = q;
+ }
+qrunners = sorted;
+return qrunners ? qrunners->next_tick - time(NULL) : 0;
+}
+
+static int
daemon_qrun(int local_queue_run_max, struct pollfd * fd_polls, int listen_socket_count)
{
DEBUG(D_any) debug_printf("%s received\n",
@@ -1392,140 +1455,207 @@ DEBUG(D_any) debug_printf("%s received\n",
#endif
"SIGALRM");
-/* Do a full queue run in a child process, if required, unless we already
-have enough queue runners on the go. If we are not running as root, a
-re-exec is required. */
-
-if ( queue_interval > 0
- && (local_queue_run_max <= 0 || queue_run_count < local_queue_run_max))
- {
-pid_t pid;
+/* Do a full queue run in a child process, if required, unless we already have
+enough queue runners on the go. If we are not running as root, a re-exec is
+required. In the calling process, restart the alamr timer for the next run. */
- if ((pid = exim_fork(US"queue-runner")) == 0)
+if (is_multiple_qrun())
+ if (local_queue_run_max <= 0 || queue_run_count < local_queue_run_max)
{
- /* Disable debugging if it's required only for the daemon process. We
- leave the above message, because it ties up with the "child ended"
- debugging messages. */
+ qrunner * q = NULL;
+
+#ifndef DISABLE_QUEUE_RAMP
+ if (*queuerun_msgid) /* See if we can start another runner for this queue */
+ {
+ for (qrunner * qq = qrunners; qq; qq = qq->next)
+ if (qq->name == queuerun_msg_qname)
+ {
+ q = qq->run_count < qq->run_max ? qq : NULL;
+ break;
+ }
+ }
+ else
+#endif
+ /* In order of run priority, find the first queue for which we can start
+ a runner */
- if (f.debug_daemon) debug_selector = 0;
+ for (q = qrunners; q; q = q->next)
+ if (q->run_count < q->run_max) break;
+
+ if (q)
+ {
+ pid_t pid;
- /* Close any open listening sockets in the child */
+ /* Bump this queue's next-tick by it's interval */
- close_daemon_sockets(daemon_notifier_fd,
- fd_polls, listen_socket_count);
+ if (q->interval)
+ {
+ time_t now = time(NULL);
+ do ; while ((q->next_tick += q->interval) <= now);
+ }
- /* Reset SIGHUP and SIGCHLD in the child in both cases. */
+ if ((pid = exim_fork(US"queue-runner")) == 0)
+ {
+ /* Disable debugging if it's required only for the daemon process. We
+ leave the above message, because it ties up with the "child ended"
+ debugging messages. */
- signal(SIGHUP, SIG_DFL);
- signal(SIGCHLD, SIG_DFL);
- signal(SIGTERM, SIG_DFL);
- signal(SIGINT, SIG_DFL);
+ if (f.debug_daemon) debug_selector = 0;
- /* Re-exec if privilege has been given up, unless deliver_drop_
- privilege is set. Reset SIGALRM before exec(). */
+ /* Close any open listening sockets in the child */
- if (geteuid() != root_uid && !deliver_drop_privilege)
- {
- uschar opt[8];
- uschar *p = opt;
- uschar *extra[7];
- int extracount = 1;
-
- signal(SIGALRM, SIG_DFL);
- *p++ = '-';
- *p++ = 'q';
- if ( f.queue_2stage
+ close_daemon_sockets(daemon_notifier_fd,
+ fd_polls, listen_socket_count);
+
+ /* Reset SIGHUP and SIGCHLD in the child in both cases. */
+
+ signal(SIGHUP, SIG_DFL);
+ signal(SIGCHLD, SIG_DFL);
+ signal(SIGTERM, SIG_DFL);
+ signal(SIGINT, SIG_DFL);
+
+ /* Re-exec if privilege has been given up, unless deliver_drop_
+ privilege is set. Reset SIGALRM before exec(). */
+
+ if (geteuid() != root_uid && !deliver_drop_privilege)
+ {
+ uschar opt[8];
+ uschar *p = opt;
+ uschar *extra[7];
+ int extracount = 1;
+
+ signal(SIGALRM, SIG_DFL);
+ queue_name = US"";
+
+ *p++ = '-';
+ *p++ = 'q';
+ if ( q->queue_2stage
#ifndef DISABLE_QUEUE_RAMP
- && !*queuerun_msgid
+ && !*queuerun_msgid
#endif
- ) *p++ = 'q';
- if (f.queue_run_first_delivery) *p++ = 'i';
- if (f.queue_run_force) *p++ = 'f';
- if (f.deliver_force_thaw) *p++ = 'f';
- if (f.queue_run_local) *p++ = 'l';
- *p = 0;
- extra[0] = *queue_name
- ? string_sprintf("%sG%s", opt, queue_name) : opt;
+ ) *p++ = 'q';
+ if (q->queue_run_first_delivery) *p++ = 'i';
+ if (q->queue_run_force) *p++ = 'f';
+ if (q->deliver_force_thaw) *p++ = 'f';
+ if (q->queue_run_local) *p++ = 'l';
+ *p = 0;
+
+ extra[0] = q->name
+ ? string_sprintf("%sG%s", opt, q->name) : opt;
#ifndef DISABLE_QUEUE_RAMP
- if (*queuerun_msgid)
- {
- log_write(0, LOG_MAIN, "notify triggered queue run");
- extra[extracount++] = queuerun_msgid; /* Trigger only the */
- extra[extracount++] = queuerun_msgid; /* one message */
- }
+ if (*queuerun_msgid)
+ {
+ log_write(0, LOG_MAIN, "notify triggered queue run");
+ extra[extracount++] = queuerun_msgid; /* Trigger only the */
+ extra[extracount++] = queuerun_msgid; /* one message */
+ }
#endif
- /* If -R or -S were on the original command line, ensure they get
- passed on. */
+ /* If -R or -S were on the original command line, ensure they get
+ passed on. */
- if (deliver_selectstring)
- {
- extra[extracount++] = f.deliver_selectstring_regex ? US"-Rr" : US"-R";
- extra[extracount++] = deliver_selectstring;
- }
+ if (deliver_selectstring)
+ {
+ extra[extracount++] = f.deliver_selectstring_regex ? US"-Rr" : US"-R";
+ extra[extracount++] = deliver_selectstring;
+ }
- if (deliver_selectstring_sender)
- {
- extra[extracount++] = f.deliver_selectstring_sender_regex
- ? US"-Sr" : US"-S";
- extra[extracount++] = deliver_selectstring_sender;
- }
+ if (deliver_selectstring_sender)
+ {
+ extra[extracount++] = f.deliver_selectstring_sender_regex
+ ? US"-Sr" : US"-S";
+ extra[extracount++] = deliver_selectstring_sender;
+ }
- /* Overlay this process with a new execution. */
+ /* Overlay this process with a new execution. */
- (void)child_exec_exim(CEE_EXEC_PANIC, FALSE, NULL, FALSE, extracount,
- extra[0], extra[1], extra[2], extra[3], extra[4], extra[5], extra[6]);
+ (void)child_exec_exim(CEE_EXEC_PANIC, FALSE, NULL, FALSE, extracount,
+ extra[0], extra[1], extra[2], extra[3], extra[4], extra[5], extra[6]);
- /* Control never returns here. */
- }
+ /* Control never returns here. */
+ }
- /* No need to re-exec; SIGALRM remains set to the default handler */
+ /* No need to re-exec; SIGALRM remains set to the default handler */
#ifndef DISABLE_QUEUE_RAMP
- if (*queuerun_msgid)
- {
- log_write(0, LOG_MAIN, "notify triggered queue run");
- f.queue_2stage = FALSE;
- queue_run(queuerun_msgid, queuerun_msgid, FALSE);
- }
- else
+ if (*queuerun_msgid)
+ {
+ log_write(0, LOG_MAIN, "notify triggered queue run");
+ f.queue_2stage = FALSE;
+ queue_run(q, queuerun_msgid, queuerun_msgid, FALSE);
+ }
+ else
#endif
- queue_run(NULL, NULL, FALSE);
- exim_underbar_exit(EXIT_SUCCESS);
- }
+ queue_run(q, NULL, NULL, FALSE);
+ exim_underbar_exit(EXIT_SUCCESS);
+ }
- if (pid < 0)
- {
- log_write(0, LOG_MAIN|LOG_PANIC, "daemon: fork of queue-runner "
- "process failed: %s", strerror(errno));
- log_close_all();
- }
- else
- {
- for (int i = 0; i < local_queue_run_max; ++i)
- if (queue_pid_slots[i] <= 0)
+ if (pid < 0)
{
- queue_pid_slots[i] = pid;
- queue_run_count++;
- break;
+ log_write(0, LOG_MAIN|LOG_PANIC, "daemon: fork of queue-runner "
+ "process failed: %s", strerror(errno));
+ log_close_all();
}
- DEBUG(D_any) debug_printf("%d queue-runner process%s running\n",
- queue_run_count, queue_run_count == 1 ? "" : "es");
+ else
+ {
+ for (int i = 0; i < local_queue_run_max; ++i)
+ if (queue_runner_slots[i].pid <= 0)
+ {
+ queue_runner_slots[i].pid = pid;
+ queue_runner_slots[i].queue_name = q->name;
+ q->run_count++;
+ queue_run_count++;
+ break;
+ }
+ DEBUG(D_any) debug_printf("%d queue-runner process%s running\n",
+ queue_run_count, queue_run_count == 1 ? "" : "es");
+ }
+ }
}
- }
-
-/* Reset the alarm clock */
sigalrm_seen = FALSE;
#ifndef DISABLE_QUEUE_RAMP
-if (*queuerun_msgid)
+if (*queuerun_msgid) /* it was a fast-ramp kick */
*queuerun_msgid = 0;
-else
+else /* periodic or one-time queue run */
#endif
- ALARM(queue_interval);
+ { /* Impose a minimum 1s tick, even when a run was outstanding */
+ int interval = next_qrunner_interval();
+ if (interval <= 0) interval = 1;
+
+ if (qrunners) /* there are still periodic qrunners */
+ {
+ ALARM(interval);
+ return interval;
+ }
+ }
+return 0;
}
+
+
+
+const uschar *
+describe_queue_runners(void)
+{
+gstring * g = NULL;
+
+if (!is_multiple_qrun()) return US"no queue runs";
+
+for (qrunner * q = qrunners; q; q = q->next)
+ {
+ g = string_catn(g, US"-q", 2);
+ if (q->name) g = string_append(g, 3, US"G", q->name, US"/");
+ g = string_cat(g, readconf_printtime(q->interval));
+ g = string_catn(g, US" ", 1);
+ }
+gstring_trim(g, 1);
+gstring_release_unused(g);
+return string_from_gstring(g);
+}
+
+
/*************************************************
* Exim Daemon Mainline *
*************************************************/
@@ -1557,7 +1687,32 @@ struct pollfd * fd_polls, * tls_watch_poll = NULL, * dnotify_poll = NULL;
int listen_socket_count = 0, poll_fd_count;
ip_address_item * addresses = NULL;
time_t last_connection_time = (time_t)0;
-int local_queue_run_max = atoi(CS expand_string(queue_run_max));
+int local_queue_run_max = 0;
+BOOL queue_run_max_has_dollar;
+
+if (is_multiple_qrun())
+
+ /* Nuber of runner-tracking structs needed: If the option queue_run_max has
+ no expandable elements then it is the overall maximum; else we assume it
+ depends on the queue name, and add them up to get the maximum.
+ Evaluate both that and the individual limits. */
+
+ if (Ustrchr(queue_run_max, '$') != NULL)
+ {
+ for (qrunner * q = qrunners; q; q = q->next)
+ {
+ queue_name = q->name;
+ local_queue_run_max +=
+ (q->run_max = atoi(CS expand_string(queue_run_max)));
+ }
+ queue_name = US"";
+ }
+ else
+ {
+ local_queue_run_max = atoi(CS expand_string(queue_run_max));
+ for (qrunner * q = qrunners; q; q = q->next)
+ q->run_max = local_queue_run_max;
+ }
process_purpose = US"daemon";
@@ -2216,10 +2371,11 @@ originator_login = (pw = getpwuid(exim_uid))
/* Get somewhere to keep the list of queue-runner pids if we are keeping track
of them (and also if we are doing queue runs). */
-if (queue_interval > 0 && local_queue_run_max > 0)
+if (is_multiple_qrun() && local_queue_run_max > 0)
{
- queue_pid_slots = store_get(local_queue_run_max * sizeof(pid_t), GET_UNTAINTED);
- for (int i = 0; i < local_queue_run_max; i++) queue_pid_slots[i] = 0;
+ queue_runner_slot_count = local_queue_run_max;
+ queue_runner_slots = store_get(local_queue_run_max * sizeof(runner_slot), GET_UNTAINTED);
+ memset(queue_runner_slots, 0, local_queue_run_max * sizeof(runner_slot));
}
/* Set up the handler for termination of child processes, and the one
@@ -2233,9 +2389,12 @@ os_non_restarting_signal(SIGTERM, main_sigterm_handler);
os_non_restarting_signal(SIGINT, main_sigterm_handler);
/* If we are to run the queue periodically, pretend the alarm has just gone
-off. This will cause the first queue-runner to get kicked off straight away. */
+off. This will cause the first queue-runner to get kicked off straight away.
+Get an initial sort of the list of queues, to prioritize the initial q-runs */
-sigalrm_seen = (queue_interval > 0);
+
+if ((sigalrm_seen = is_multiple_qrun()))
+ (void) next_qrunner_interval();
/* Log the start up of a daemon - at least one of listening or queue running
must be set up. */
@@ -2264,20 +2423,16 @@ else if (f.daemon_listen)
int smtps_ports = 0;
ip_address_item * ipa;
uschar * p;
- uschar * qinfo = queue_interval > 0
- ? string_sprintf("-q%s%s",
- f.queue_2stage ? "q" : "", readconf_printtime(queue_interval))
- : US"no queue runs";
+ const uschar * qinfo = describe_queue_runners();
/* Build a list of listening addresses in big_buffer, but limit it to 10
items. The style is for backwards compatibility.
- It is now possible to have some ports listening for SMTPS (the old,
- deprecated protocol that starts TLS without using STARTTLS), and others
- listening for standard SMTP. Keep their listings separate. */
+ It is possible to have some ports listening for SMTPS (as opposed to TLS
+ startted by STARTTLS), and others listening for standard SMTP. Keep their
+ listings separate. */
for (int j = 0, i; j < 2; j++)
- {
for (i = 0, ipa = addresses; i < 10 && ipa; i++, ipa = ipa->next)
{
/* First time round, look for SMTP ports; second time round, look for
@@ -2315,7 +2470,7 @@ else if (f.daemon_listen)
&& Ustrcmp(ipa->address, i2->address) == 0
)
{ /* found; append port to list */
- for (p = i2->log; *p; ) p++; /* end of existing string */
+ for (p = i2->log; *p; ) p++; /* end of existing string { */
if (*--p == '}') *p = '\0'; /* drop EOL */
while (isdigit(*--p)) ; /* char before port */
@@ -2331,7 +2486,6 @@ else if (f.daemon_listen)
}
}
}
- }
p = big_buffer;
for (int j = 0, i; j < 2; j++)
@@ -2367,11 +2521,9 @@ else if (f.daemon_listen)
version_string, qinfo, big_buffer);
}
-else
+else /* no listening sockets, only queue-runs */
{
- uschar * s = *queue_name
- ? string_sprintf("-qG%s/%s", queue_name, readconf_printtime(queue_interval))
- : string_sprintf("-q%s", readconf_printtime(queue_interval));
+ const uschar * s = describe_queue_runners();
log_write(0, LOG_MAIN,
"exim %s daemon started: pid=%d, %s, not listening for SMTP",
version_string, getpid(), s);
@@ -2445,6 +2597,8 @@ report_time_since(&timestamp_startup, US"daemon loop start"); /* testcase 0022 *
for (;;)
{
+ int nolisten_sleep = 60;
+
if (sigterm_seen)
daemon_die(); /* Does not return */
@@ -2458,7 +2612,8 @@ for (;;)
if (inetd_wait_timeout > 0)
daemon_inetd_wtimeout(last_connection_time); /* Might not return */
else
- daemon_qrun(local_queue_run_max, fd_polls, listen_socket_count);
+ nolisten_sleep =
+ daemon_qrun(local_queue_run_max, fd_polls, listen_socket_count);
/* Sleep till a connection happens if listening, and handle the connection if
@@ -2663,7 +2818,7 @@ for (;;)
else
{
struct pollfd p;
- poll(&p, 0, queue_interval * 1000);
+ poll(&p, 0, nolisten_sleep * 1000);
handle_ending_processes();
}
diff --git a/src/src/exim.c b/src/src/exim.c
index dcc71ea45..9cba8d51e 100644
--- a/src/src/exim.c
+++ b/src/src/exim.c
@@ -1688,6 +1688,33 @@ else
/*************************************************
+* Queue-runner operations *
+*************************************************/
+
+/* Prefix a new qrunner descriptor to the qrunners list */
+
+static qrunner *
+alloc_qrunner(void)
+{
+qrunner * q = qrunners;
+qrunners = store_get(sizeof(qrunner), GET_UNTAINTED);
+memset(qrunners, 0, sizeof(qrunner)); /* default queue, zero interval */
+qrunners->next = q;
+qrunners->next_tick = time(NULL); /* run right away */
+return qrunners;
+}
+
+static qrunner *
+alloc_onetime_qrunner(void)
+{
+qrunners = store_get_perm(sizeof(qrunner), GET_UNTAINTED);
+memset(qrunners, 0, sizeof(qrunner)); /* default queue, zero interval */
+qrunners->next_tick = time(NULL); /* run right away */
+qrunners->run_max = 1;
+}
+
+
+/*************************************************
* Entry point and high-level code *
*************************************************/
@@ -2051,7 +2078,7 @@ this is a smail convention. */
if ((namelen == 4 && Ustrcmp(argv[0], "runq") == 0) ||
(namelen > 4 && Ustrncmp(argv[0] + namelen - 5, "/runq", 5) == 0))
{
- queue_interval = 0;
+ alloc_onetime_qrunner();
receiving_message = FALSE;
called_as = US"-runq";
}
@@ -2110,7 +2137,7 @@ on the second character (the one after '-'), to save some effort. */
BOOL badarg = FALSE;
uschar * arg = argv[i];
uschar * argrest;
- int switchchar;
+ uschar switchchar;
/* An argument not starting with '-' is the start of a recipients list;
break out of the options-scanning loop. */
@@ -3504,87 +3531,100 @@ on the second character (the one after '-'), to save some effort. */
}
break;
+ /* -q: set up queue runs */
case 'q':
- receiving_message = FALSE;
- if (queue_interval >= 0)
- exim_fail("exim: -q specified more than once\n");
-
- /* -qq...: Do queue runs in a 2-stage manner */
-
- if (*argrest == 'q')
{
- f.queue_2stage = TRUE;
- argrest++;
- }
+ BOOL two_stage, first_del, force, thaw = FALSE, local;
- /* -qi...: Do only first (initial) deliveries */
+ receiving_message = FALSE;
- if (*argrest == 'i')
- {
- f.queue_run_first_delivery = TRUE;
- argrest++;
- }
+ /* -qq...: Do queue runs in a 2-stage manner */
- /* -qf...: Run the queue, forcing deliveries
- -qff..: Ditto, forcing thawing as well */
+ if ((two_stage = *argrest == 'q'))
+ argrest++;
- if (*argrest == 'f')
- {
- f.queue_run_force = TRUE;
- if (*++argrest == 'f')
- {
- f.deliver_force_thaw = TRUE;
- argrest++;
- }
- }
+ /* -qi...: Do only first (initial) deliveries */
- /* -q[f][f]l...: Run the queue only on local deliveries */
+ if ((first_del = *argrest == 'i'))
+ argrest++;
- if (*argrest == 'l')
- {
- f.queue_run_local = TRUE;
- argrest++;
- }
+ /* -qf...: Run the queue, forcing deliveries
+ -qff..: Ditto, forcing thawing as well */
- /* -q[f][f][l][G<name>]... Work on the named queue */
+ if ((force = *argrest == 'f'))
+ if ((thaw = *++argrest == 'f'))
+ argrest++;
- if (*argrest == 'G')
- {
- int i;
- for (argrest++, i = 0; argrest[i] && argrest[i] != '/'; ) i++;
- exim_len_fail_toolong(i, EXIM_DRIVERNAME_MAX, "-q*G<name>");
- queue_name = string_copyn(argrest, i);
- argrest += i;
- if (*argrest == '/') argrest++;
- }
+ /* -q[f][f]l...: Run the queue only on local deliveries */
+
+ if ((local = *argrest == 'l'))
+ argrest++;
- /* -q[f][f][l][G<name>]: Run the queue, optionally forced, optionally local
- only, optionally named, optionally starting from a given message id. */
+ /* -q[f][f][l][G<name>]... Work on the named queue */
- if (!(list_queue || count_queue))
- if ( !*argrest
- && (i + 1 >= argc || argv[i+1][0] == '-' || mac_ismsgid(argv[i+1])))
+ if (*argrest == 'G')
{
- queue_interval = 0;
- if (i+1 < argc && mac_ismsgid(argv[i+1]))
- start_queue_run_id = string_copy_taint(argv[++i], GET_TAINTED);
- if (i+1 < argc && mac_ismsgid(argv[i+1]))
- stop_queue_run_id = string_copy_taint(argv[++i], GET_TAINTED);
+ int i;
+ for (argrest++, i = 0; argrest[i] && argrest[i] != '/'; ) i++;
+ exim_len_fail_toolong(i, EXIM_DRIVERNAME_MAX, "-q*G<name>");
+ queue_name = string_copyn(argrest, i);
+ argrest += i;
+ if (*argrest == '/') argrest++;
}
- /* -q[f][f][l][G<name>/]<n>: Run the queue at regular intervals, optionally
- forced, optionally local only, optionally named. */
+ /* -q[f][f][l][G<name>]: Run the queue, optionally forced, optionally local
+ only, optionally named, optionally starting from a given message id. */
- else if ((queue_interval = readconf_readtime(*argrest ? argrest : argv[++i],
- 0, FALSE)) <= 0)
- exim_fail("exim: bad time value %s: abandoned\n", argv[i]);
- break;
+ if (!(list_queue || count_queue))
+ {
+ qrunner * q;
+
+ if ( !*argrest
+ && (i + 1 >= argc || argv[i+1][0] == '-' || mac_ismsgid(argv[i+1])))
+ {
+ q = alloc_onetime_qrunner();
+ if (i+1 < argc && mac_ismsgid(argv[i+1]))
+ start_queue_run_id = string_copy_taint(argv[++i], GET_TAINTED);
+ if (i+1 < argc && mac_ismsgid(argv[i+1]))
+ stop_queue_run_id = string_copy_taint(argv[++i], GET_TAINTED);
+ }
+
+ /* -q[f][f][l][G<name>/]<n>: Run the queue at regular intervals, optionally
+ forced, optionally local only, optionally named. */
+
+ else
+ {
+ int intvl = readconf_readtime(*argrest ? argrest : argv[++i], 0, FALSE);
+ if (intvl <= 0)
+ exim_fail("exim: bad time value %s: abandoned\n", argv[i]);
+
+ for (qrunner * qq = qrunners; qq; qq = qq->next)
+ if ( queue_name && qq->name && Ustrcmp(queue_name, qq->name) == 0
+ || !queue_name && !qq->name)
+ exim_fail("exim: queue-runner specified more than once\n");
+
+ q = alloc_qrunner();
+ q->interval = intvl;
+ }
+
+ q->name = *queue_name ? queue_name : NULL; /* will be NULL for the default queue */
+ q->queue_run_force = force;
+ q->deliver_force_thaw = thaw;
+ q->queue_run_first_delivery = first_del;
+ q->queue_run_local = local;
+ q->queue_2stage = two_stage;
+ }
+
+ break;
+ }
case 'R': /* Synonymous with -qR... */
+ case 'S': /* Synonymous with -qS... */
{
- const uschar *tainted_selectstr;
+ const uschar * tainted_selectstr;
+ uschar * s;
receiving_message = FALSE;
@@ -3594,20 +3634,28 @@ on the second character (the one after '-'), to save some effort. */
-Rrf: Regex and force
-Rrff: Regex and force and thaw
+ -S...: Like -R but works on sender.
+
in all cases provided there are no further characters in this
argument. */
+ alloc_onetime_qrunner();
+ qrunners->queue_2stage = f.queue_2stage;
if (*argrest)
for (int i = 0; i < nelem(rsopts); i++)
if (Ustrcmp(argrest, rsopts[i]) == 0)
{
- if (i != 2) f.queue_run_force = TRUE;
- if (i >= 2) f.deliver_selectstring_regex = TRUE;
- if (i == 1 || i == 4) f.deliver_force_thaw = TRUE;
+ if (i != 2) qrunners->queue_run_force = TRUE;
+ if (i >= 2)
+ if (switchchar == 'R')
+ f.deliver_selectstring_regex = TRUE;
+ else
+ f.deliver_selectstring_sender_regex = TRUE;
+ if (i == 1 || i == 4) qrunners->deliver_force_thaw = TRUE;
argrest += Ustrlen(rsopts[i]);
}
- /* -R: Set string to match in addresses for forced queue run to
+ /* -R or -S: Set string to match in addresses for forced queue run to
pick out particular messages. */
/* Avoid attacks from people providing very long strings, and do so before
@@ -3617,58 +3665,22 @@ on the second character (the one after '-'), to save some effort. */
else if (i+1 < argc)
tainted_selectstr = argv[++i];
else
- exim_fail("exim: string expected after -R\n");
- deliver_selectstring = string_copy_taint(
+ exim_fail("exim: string expected after %s\n", switchchar == 'R' ? "-R" : "-S");
+
+ s = string_copy_taint(
exim_str_fail_toolong(tainted_selectstr, EXIM_EMAILADDR_MAX, "-R"),
GET_TAINTED);
- }
- break;
-
- /* -r: an obsolete synonym for -f (see above) */
-
-
- /* -S: Like -R but works on sender. */
-
- case 'S': /* Synonymous with -qS... */
- {
- const uschar *tainted_selectstr;
-
- receiving_message = FALSE;
-
- /* -Sf: As -S (below) but force all deliveries,
- -Sff: Ditto, but also thaw all frozen messages,
- -Sr: String is regex
- -Srf: Regex and force
- -Srff: Regex and force and thaw
-
- in all cases provided there are no further characters in this
- argument. */
-
- if (*argrest)
- for (int i = 0; i < nelem(rsopts); i++)
- if (Ustrcmp(argrest, rsopts[i]) == 0)
- {
- if (i != 2) f.queue_run_force = TRUE;
- if (i >= 2) f.deliver_selectstring_sender_regex = TRUE;
- if (i == 1 || i == 4) f.deliver_force_thaw = TRUE;
- argrest += Ustrlen(rsopts[i]);
- }
-
- /* -S: Set string to match in addresses for forced queue run to
- pick out particular messages. */
- if (*argrest)
- tainted_selectstr = argrest;
- else if (i+1 < argc)
- tainted_selectstr = argv[++i];
+ if (switchchar == 'R')
+ deliver_selectstring = s;
else
- exim_fail("exim: string expected after -S\n");
- deliver_selectstring_sender = string_copy_taint(
- exim_str_fail_toolong(tainted_selectstr, EXIM_EMAILADDR_MAX, "-S"),
- GET_TAINTED);
+ deliver_selectstring_sender = s;
}
break;
+
+ /* -r: an obsolete synonym for -f (see above) */
+
/* -Tqt is an option that is exclusively for use by the testing suite.
It is not recognized in other circumstances. It allows for the setting up
of explicit "queue times" so that various warning/retry things can be
@@ -3777,9 +3789,8 @@ on the second character (the one after '-'), to save some effort. */
/* If -R or -S have been specified without -q, assume a single queue run. */
- if ( (deliver_selectstring || deliver_selectstring_sender)
- && queue_interval < 0)
- queue_interval = 0;
+ if ((deliver_selectstring || deliver_selectstring_sender) && !qrunners)
+ alloc_onetime_qrunner();
END_ARG:
@@ -3791,22 +3802,22 @@ if (usage_wanted) exim_usage(called_as);
/* Arguments have been processed. Check for incompatibilities. */
if ( ( (smtp_input || extract_recipients || recipients_arg < argc)
- && ( f.daemon_listen || queue_interval >= 0 || bi_option
+ && ( f.daemon_listen || qrunners || bi_option
|| test_retry_arg >= 0 || test_rewrite_arg >= 0
|| filter_test != FTEST_NONE
|| msg_action_arg > 0 && !one_msg_action
) )
|| ( msg_action_arg > 0
- && ( f.daemon_listen || queue_interval > 0 || list_options
+ && ( f.daemon_listen || is_multiple_qrun() || list_options
|| checking && msg_action != MSG_LOAD
|| bi_option || test_retry_arg >= 0 || test_rewrite_arg >= 0
) )
- || ( (f.daemon_listen || queue_interval > 0)
+ || ( (f.daemon_listen || is_multiple_qrun())
&& ( sender_address || list_options || list_queue || checking
|| bi_option
) )
- || f.daemon_listen && queue_interval == 0
- || f.inetd_wait_mode && queue_interval >= 0
+ || f.daemon_listen && is_onetime_qrun()
+ || f.inetd_wait_mode && qrunners
|| ( list_options
&& ( checking || smtp_input || extract_recipients
|| filter_test != FTEST_NONE || bi_option
@@ -3822,7 +3833,7 @@ if ( ( (smtp_input || extract_recipients || recipients_arg < argc)
|| ( smtp_input
&& (sender_address || filter_test != FTEST_NONE || extract_recipients)
)
- || deliver_selectstring && queue_interval < 0
+ || deliver_selectstring && !qrunners
|| msg_action == MSG_LOAD && (!expansion_test || expansion_test_message)
)
exim_fail("exim: incompatible command-line options or arguments\n");
@@ -4444,7 +4455,7 @@ if (!f.admin_user)
if ( deliver_give_up || f.daemon_listen || malware_test_file
|| count_queue && queue_list_requires_admin
|| list_queue && queue_list_requires_admin
- || queue_interval >= 0 && prod_requires_admin
+ || qrunners && prod_requires_admin
|| queue_name_dest && prod_requires_admin
|| debugset && !f.running_in_test_harness
)
@@ -4460,7 +4471,7 @@ regression testing. */
if ( real_uid != root_uid && real_uid != exim_uid
&& ( continue_hostname
|| ( f.dont_deliver
- && (queue_interval >= 0 || f.daemon_listen || msg_action_arg > 0)
+ && (qrunners || f.daemon_listen || msg_action_arg > 0)
) )
&& !f.running_in_test_harness
)
@@ -4577,11 +4588,11 @@ to the state Exim usually runs in. */
if ( !unprivileged /* originally had root AND */
&& !removed_privilege /* still got root AND */
&& !f.daemon_listen /* not starting the daemon */
- && queue_interval <= 0 /* (either kind of daemon) */
+ && (!qrunners || is_onetime_qrun()) /* (either kind of daemon) */
&& ( /* AND EITHER */
deliver_drop_privilege /* requested unprivileged */
|| ( /* OR */
- queue_interval < 0 /* not running the queue */
+ !qrunners /* not running the queue */
&& ( msg_action_arg < 0 /* and */
|| msg_action != MSG_DELIVER /* not delivering */
) /* and */
@@ -4696,7 +4707,7 @@ if (msg_action_arg > 0 && msg_action != MSG_DELIVER && msg_action != MSG_LOAD)
}
/* We used to set up here to skip reading the ACL section, on
- (msg_action_arg > 0 || (queue_interval == 0 && !f.daemon_listen)
+ (msg_action_arg > 0 || (is_onetime_qrun() && !f.daemon_listen)
Now, since the intro of the ${acl } expansion, ACL definitions may be
needed in transports so we lost the optimisation. */
@@ -4947,18 +4958,9 @@ if (msg_action_arg > 0 && msg_action != MSG_LOAD)
/* If only a single queue run is requested, without SMTP listening, we can just
turn into a queue runner, with an optional starting message id. */
-if (queue_interval == 0 && !f.daemon_listen)
+if (is_onetime_qrun() && !f.daemon_listen)
{
- DEBUG(D_queue_run) debug_printf("Single queue run%s%s%s%s\n",
- start_queue_run_id ? US" starting at " : US"",
- start_queue_run_id ? start_queue_run_id: US"",
- stop_queue_run_id ? US" stopping at " : US"",
- stop_queue_run_id ? stop_queue_run_id : US"");
- if (*queue_name)
- set_process_info("running the '%s' queue (single queue run)", queue_name);
- else
- set_process_info("running the queue (single queue run)");
- queue_run(start_queue_run_id, stop_queue_run_id, FALSE);
+ single_queue_run(qrunners, start_queue_run_id, stop_queue_run_id);
exim_exit(EXIT_SUCCESS);
}
@@ -5083,7 +5085,7 @@ returns. We leave this till here so that the originator_ fields are available
for incoming messages via the daemon. The daemon cannot be run in mua_wrapper
mode. */
-if (f.daemon_listen || f.inetd_wait_mode || queue_interval > 0)
+if (f.daemon_listen || f.inetd_wait_mode || is_multiple_qrun())
{
if (mua_wrapper)
{
diff --git a/src/src/functions.h b/src/src/functions.h
index 961db2dc0..37f0a57bc 100644
--- a/src/src/functions.h
+++ b/src/src/functions.h
@@ -411,7 +411,7 @@ extern void queue_list(int, uschar **, int);
#ifndef DISABLE_QUEUE_RAMP
extern void queue_notify_daemon(const uschar * hostname);
#endif
-extern void queue_run(uschar *, uschar *, BOOL);
+extern void queue_run(qrunner *, uschar *, uschar *, BOOL);
extern int random_number(int);
extern const uschar *rc_to_string(int);
@@ -498,6 +498,7 @@ extern int sieve_interpret(const uschar *, int, const uschar *,
const uschar *, const uschar *, const uschar *,
address_item **, uschar **);
extern void sigalrm_handler(int);
+extern void single_queue_run(qrunner *, uschar *, uschar *);
extern int smtp_boundsock(smtp_connect_args *);
extern void smtp_closedown(uschar *);
extern void smtp_command_timeout_exit(void) NORETURN;
@@ -1368,6 +1369,22 @@ int res;
return !s || !*s || (res = Uatoi(s)) == 0 ? UNLIMITED_ADDRS : res;
}
+/******************************************************************************/
+/* Queue-runner operations */
+
+static inline BOOL
+is_onetime_qrun(void)
+{
+return qrunners && !qrunners->next && qrunners->interval == 0;
+}
+
+static inline BOOL
+is_multiple_qrun(void)
+{
+return qrunners && (qrunners->interval > 0 || qrunners->next);
+}
+
+
# endif /* !COMPILE_UTILITY */
/******************************************************************************/
diff --git a/src/src/globals.c b/src/src/globals.c
index 7af345465..a4b2c6a9c 100644
--- a/src/src/globals.c
+++ b/src/src/globals.c
@@ -291,8 +291,6 @@ struct global_flags f =
.queue_2stage = FALSE,
.queue_only_policy = FALSE,
- .queue_run_first_delivery = FALSE,
- .queue_run_force = FALSE,
.queue_run_local = FALSE,
.queue_running = FALSE,
.queue_smtp = FALSE,
@@ -1248,6 +1246,8 @@ uschar *prvscheck_keynum = NULL;
uschar *prvscheck_result = NULL;
+qrunner *qrunners = NULL;
+
const uschar *qualify_domain_recipient = NULL;
uschar *qualify_domain_sender = NULL;
uschar *queue_domains = NULL;
diff --git a/src/src/globals.h b/src/src/globals.h
index f2e147670..914e2d0f9 100644
--- a/src/src/globals.h
+++ b/src/src/globals.h
@@ -257,8 +257,6 @@ extern struct global_flags {
BOOL queue_2stage :1; /* Run queue in 2-stage manner */
BOOL queue_only_policy :1; /* ACL or local_scan wants queue_only */
- BOOL queue_run_first_delivery :1; /* If TRUE, first deliveries only */
- BOOL queue_run_force :1; /* TRUE to force during queue run */
BOOL queue_run_local :1; /* Local deliveries only in queue run */
BOOL queue_running :1; /* TRUE for queue running process and */
BOOL queue_smtp :1; /* Disable all immediate SMTP (-odqs)*/
@@ -837,6 +835,8 @@ extern uschar *prvscheck_address; /* Set during prvscheck expansion item */
extern uschar *prvscheck_keynum; /* Set during prvscheck expansion item */
extern uschar *prvscheck_result; /* Set during prvscheck expansion item */
+extern qrunner *qrunners; /* tracking data for queues */
+
extern const uschar *qualify_domain_recipient; /* Domain to qualify recipients with */
extern uschar *qualify_domain_sender; /* Domain to qualify senders with */
extern uschar *queue_domains; /* Queue these domains */
diff --git a/src/src/macros.h b/src/src/macros.h
index 585067fc9..3b0293b97 100644
--- a/src/src/macros.h
+++ b/src/src/macros.h
@@ -1113,9 +1113,9 @@ should not be one active. */
#define NOTIFIER_SOCKET_NAME "exim_daemon_notify"
/* Notify message types */
-#define NOTIFY_MSG_QRUN 1
-#define NOTIFY_QUEUE_SIZE_REQ 2
-#define NOTIFY_REGEX 3
+#define NOTIFY_MSG_QRUN 1 /* 2stage qrun fast-ramp trigger */
+#define NOTIFY_QUEUE_SIZE_REQ 2 /* obtain current queue count */
+#define NOTIFY_REGEX 3 /* an RE for caching */
/* Flags for match_check_string() */
typedef unsigned mcs_flags;
diff --git a/src/src/queue.c b/src/src/queue.c
index f86e24b42..d01cde655 100644
--- a/src/src/queue.c
+++ b/src/src/queue.c
@@ -325,8 +325,8 @@ previous lexically lesser one if the given stop message doesn't exist. Because
a queue run can take some time, stat each file before forking, in case it has
been delivered in the meantime by some other means.
-The global variables queue_run_force and queue_run_local may be set to cause
-forced deliveries or local-only deliveries, respectively.
+The qrun descriptor variables queue_run_force and queue_run_local may be set to
+cause forced deliveries or local-only deliveries, respectively.
If deliver_selectstring[_sender] is not NULL, skip messages whose recipients do
not contain the string. As this option is typically used when a machine comes
@@ -339,6 +339,7 @@ is set so that routing is done for all messages. Thus in the second run those
that are routed to the same host should go down the same SMTP connection.
Arguments:
+ q queue-runner descriptor
start_id message id to start at, or NULL for all
stop_id message id to end at, or NULL for all
recurse TRUE if recursing for 2-stage run
@@ -347,10 +348,10 @@ Returns: nothing
*/
void
-queue_run(uschar *start_id, uschar *stop_id, BOOL recurse)
+queue_run(qrunner * q, uschar * start_id, uschar * stop_id, BOOL recurse)
{
-BOOL force_delivery = f.queue_run_force || deliver_selectstring != NULL ||
- deliver_selectstring_sender != NULL;
+BOOL force_delivery = q->queue_run_force
+ || deliver_selectstring || deliver_selectstring_sender;
const pcre2_code *selectstring_regex = NULL;
const pcre2_code *selectstring_regex_sender = NULL;
uschar *log_detail = NULL;
@@ -363,6 +364,13 @@ BOOL single_id = FALSE;
report_time_since(&timestamp_startup, US"queue_run start");
#endif
+/* Copy the legacy globals from the newer per-qrunner-desc */
+
+queue_name = q->name ? q->name : US"";
+f.queue_2stage = q->queue_2stage;
+f.deliver_force_thaw = q->deliver_force_thaw;
+f.queue_run_local = q->queue_run_local;
+
/* Cancel any specific queue domains. Turn off the flag that causes SMTP
deliveries not to happen, unless doing a 2-stage queue run, when the SMTP flag
gets set. Save the queue_runner's pid and the flag that indicates any
@@ -371,7 +379,7 @@ on TCP/IP channels have queue_run_pid set, but not queue_running. */
queue_domains = NULL;
queue_smtp_domains = NULL;
-f.queue_smtp = f.queue_2stage;
+f.queue_smtp = q->queue_2stage;
queue_run_pid = getpid();
f.queue_running = TRUE;
@@ -383,11 +391,11 @@ if (!recurse)
uschar extras[8];
uschar *p = extras;
- if (f.queue_2stage) *p++ = 'q';
- if (f.queue_run_first_delivery) *p++ = 'i';
- if (f.queue_run_force) *p++ = 'f';
- if (f.deliver_force_thaw) *p++ = 'f';
- if (f.queue_run_local) *p++ = 'l';
+ if (q->queue_2stage) *p++ = 'q';
+ if (q->queue_run_first_delivery) *p++ = 'i';
+ if (q->queue_run_force) *p++ = 'f';
+ if (q->deliver_force_thaw) *p++ = 'f';
+ if (q->queue_run_local) *p++ = 'l';
*p = 0;
p = big_buffer;
@@ -399,25 +407,25 @@ if (!recurse)
if (deliver_selectstring)
{
snprintf(CS p, big_buffer_size - (p - big_buffer), " -R%s %s",
- f.deliver_selectstring_regex? "r" : "", deliver_selectstring);
+ f.deliver_selectstring_regex ? "r" : "", deliver_selectstring);
p += Ustrlen(CCS p);
}
if (deliver_selectstring_sender)
{
snprintf(CS p, big_buffer_size - (p - big_buffer), " -S%s %s",
- f.deliver_selectstring_sender_regex? "r" : "", deliver_selectstring_sender);
+ f.deliver_selectstring_sender_regex ? "r" : "", deliver_selectstring_sender);
p += Ustrlen(CCS p);
}
log_detail = string_copy(big_buffer);
- if (*queue_name)
+ if (q->name)
log_write(L_queue_run, LOG_MAIN, "Start '%s' queue run: %s",
- queue_name, log_detail);
+ q->name, log_detail);
else
log_write(L_queue_run, LOG_MAIN, "Start queue run: %s", log_detail);
- single_id = start_id && stop_id && !f.queue_2stage
+ single_id = start_id && stop_id && !q->queue_2stage
&& Ustrcmp(start_id, stop_id) == 0;
}
@@ -474,7 +482,7 @@ for (int i = queue_run_in_order ? -1 : 0;
/* Unless deliveries are forced, if deliver_queue_load_max is non-negative,
check that the load average is low enough to permit deliveries. */
- if (!f.queue_run_force && deliver_queue_load_max >= 0)
+ if (!q->queue_run_force && deliver_queue_load_max >= 0)
if ((load_average = os_getloadavg()) > deliver_queue_load_max)
{
log_write(L_queue_run, LOG_MAIN, "Abandon queue run: %s (load %.2f, max %.2f)",
@@ -492,7 +500,7 @@ for (int i = queue_run_in_order ? -1 : 0;
/* If initial of a 2-phase run, maintain a set of child procs
to get disk parallelism */
- if (f.queue_2stage && !queue_run_in_order)
+ if (q->queue_2stage && !queue_run_in_order)
{
int i;
if (qpid[f.running_in_test_harness ? 0 : nelem(qpid) - 1])
@@ -530,7 +538,7 @@ for (int i = queue_run_in_order ? -1 : 0;
message when many are not going to be delivered. */
if (deliver_selectstring || deliver_selectstring_sender ||
- f.queue_run_first_delivery)
+ q->queue_run_first_delivery)
{
BOOL wanted = TRUE;
BOOL orig_dont_deliver = f.dont_deliver;
@@ -548,7 +556,7 @@ for (int i = queue_run_in_order ? -1 : 0;
header file, we might as well do the freeze test now, and save forking
another process. */
- if (f.deliver_freeze && !f.deliver_force_thaw)
+ if (f.deliver_freeze && !q->deliver_force_thaw)
{
log_write(L_skip_delivery, LOG_MAIN, "Message is frozen");
wanted = FALSE;
@@ -556,7 +564,7 @@ for (int i = queue_run_in_order ? -1 : 0;
/* Check first_delivery in the case when there are no message logs. */
- else if (f.queue_run_first_delivery && !f.deliver_firsttime)
+ else if (q->queue_run_first_delivery && !f.deliver_firsttime)
{
DEBUG(D_queue_run) debug_printf("%s: not first delivery\n", fq->text);
wanted = FALSE;
@@ -688,7 +696,7 @@ single_item_retry:
/* A zero return means a delivery was attempted; turn off the force flag
for any subsequent calls unless queue_force is set. */
- if (!(status & 0xffff)) force_delivery = f.queue_run_force;
+ if (!(status & 0xffff)) force_delivery = q->queue_run_force;
/* If the process crashed, tell somebody */
@@ -723,13 +731,13 @@ single_item_retry:
set_process_info("running queue");
/* If initial of a 2-phase run, we are a child - so just exit */
- if (f.queue_2stage && !queue_run_in_order)
+ if (q->queue_2stage && !queue_run_in_order)
exim_exit(EXIT_SUCCESS);
/* If we are in the test harness, and this is not the first of a 2-stage
queue run, update fudged queue times. */
- if (f.running_in_test_harness && !f.queue_2stage)
+ if (f.running_in_test_harness && !q->queue_2stage)
{
uschar * fqtnext = Ustrchr(fudged_queue_times, '/');
if (fqtnext) fudged_queue_times = fqtnext + 1;
@@ -740,7 +748,7 @@ single_item_retry:
go_around:
/* If initial of a 2-phase run, we are a child - so just exit */
- if (f.queue_2stage && !queue_run_in_order)
+ if (q->queue_2stage && !queue_run_in_order)
exim_exit(EXIT_SUCCESS);
} /* End loop for list of messages */
@@ -767,7 +775,7 @@ single_item_retry:
/* If queue_2stage is true, we do it all again, with the 2stage flag
turned off. */
-if (f.queue_2stage)
+if (q->queue_2stage)
{
/* wait for last children */
@@ -782,22 +790,40 @@ if (f.queue_2stage)
#ifdef MEASURE_TIMING
report_time_since(&timestamp_startup, US"queue_run 1st phase done");
#endif
- f.queue_2stage = FALSE;
- queue_run(start_id, stop_id, TRUE);
+ q->queue_2stage = f.queue_2stage = FALSE;
+ queue_run(q, start_id, stop_id, TRUE);
}
/* At top level, log the end of the run. */
if (!recurse)
- if (*queue_name)
+ if (q->name)
log_write(L_queue_run, LOG_MAIN, "End '%s' queue run: %s",
- queue_name, log_detail);
+ q->name, log_detail);
else
log_write(L_queue_run, LOG_MAIN, "End queue run: %s", log_detail);
}
+void
+single_queue_run(qrunner * q, uschar * start_id, uschar * stop_id)
+{
+DEBUG(D_queue_run) debug_printf("Single queue run%s%s%s%s\n",
+ start_id ? US" starting at " : US"",
+ start_id ? start_id: US"",
+ stop_id ? US" stopping at " : US"",
+ stop_id ? stop_id : US"");
+
+if (*queue_name)
+ set_process_info("running the '%s' queue (single queue run)", queue_name);
+else
+ set_process_info("running the queue (single queue run)");
+queue_run(q, start_id, stop_id, FALSE);
+}
+
+
+
/************************************************
* Count messages on the queue *
@@ -1552,20 +1578,22 @@ if (s)
void
queue_notify_daemon(const uschar * msgid)
{
-uschar buf[MESSAGE_ID_LENGTH + 2];
+int bsize = 1 + MESSAGE_ID_LENGTH + 1 + Ustrlen(queue_name) + 1;
+uschar * buf = store_get(bsize, GET_UNTAINTED);
int fd;
DEBUG(D_queue_run) debug_printf("%s: %s\n", __FUNCTION__, msgid);
buf[0] = NOTIFY_MSG_QRUN;
memcpy(buf+1, msgid, MESSAGE_ID_LENGTH+1);
+Ustrcpy(buf+1+MESSAGE_ID_LENGTH+1, queue_name);
if ((fd = socket(AF_UNIX, SOCK_DGRAM, 0)) >= 0)
{
struct sockaddr_un sa_un = {.sun_family = AF_UNIX};
ssize_t len = daemon_notifier_sockname(&sa_un);
- if (sendto(fd, buf, sizeof(buf), 0, (struct sockaddr *)&sa_un, (socklen_t)len) < 0)
+ if (sendto(fd, buf, bsize, 0, (struct sockaddr *)&sa_un, (socklen_t)len) < 0)
DEBUG(D_queue_run)
debug_printf("%s: sendto %s\n", __FUNCTION__, strerror(errno));
close(fd);
diff --git a/src/src/structs.h b/src/src/structs.h
index eae66e88d..3f237fce5 100644
--- a/src/src/structs.h
+++ b/src/src/structs.h
@@ -958,4 +958,22 @@ struct ob_dkim {
#endif
};
+
+/* per-queue-runner info */
+typedef struct qrunner {
+ struct qrunner * next; /* list sorted by next tick */
+
+ uschar * name; /* NULL for the default queue */
+ unsigned interval; /* tick rate, seconds */
+ time_t next_tick; /* next run should, or should have, start(ed) */
+ unsigned run_max; /* concurrent queue runner limit */
+ unsigned run_count; /* current runners */
+
+ BOOL queue_run_force :1;
+ BOOL deliver_force_thaw :1;
+ BOOL queue_run_first_delivery :1;
+ BOOL queue_run_local :1;
+ BOOL queue_2stage :1;
+} qrunner;
+
/* End of structs.h */
diff --git a/test/scripts/0000-Basic/0576 b/test/scripts/0000-Basic/0576
index 5d6e8fc21..90d87c927 100644
--- a/test/scripts/0000-Basic/0576
+++ b/test/scripts/0000-Basic/0576
@@ -104,8 +104,10 @@ exim -bp
exim -bp -qGalternate
****
#
+### move msg from default to third q
exim -MG third $msg1
****
+### move msg from alternate q to third q
exim -qGalternate -MG third $msg1
****
### third q
diff --git a/test/stdout/0576 b/test/stdout/0576
index 48e29b761..48cc6fd71 100644
--- a/test/stdout/0576
+++ b/test/stdout/0576
@@ -53,7 +53,9 @@
0m sss 10HmbC-0005vi-00 <CALLER@the.local.host.name>
alternate@test.ex
+### move msg from default to third q
Message 10HmbB-0005vi-00
+### move msg from alternate q to third q
Message 10HmbC-0005vi-00
### third q
0m sss 10HmbB-0005vi-00 <CALLER@the.local.host.name>
@@ -79,6 +81,8 @@ Message 10HmbB-0005vi-00 Message 10HmbC-0005vi-00
### load messages
### default q
### alternate q
+### move msg from default to third q
+### move msg from alternate q to third q
### third q
### default q
### alternate q