Mailing List Archive

POC: Allowing ap_process_connection() to return EAGAIN
Hi all,

I’ve put together a proof of concept as to how ap_process_connection() might be able to return EAGAIN (or AGAIN in this case).

The idea is that ap_process_connection() can return AGAIN at any time, and if so, we’ll jump ahead to where we left off and run the hook again. This way the MPMs aren’t obliged to swallow a whole request before returning.

This doesn’t yet work, it’s just to illustrate the idea.

Regards,
Graham
--

Index: include/ap_mpm.h
===================================================================
--- include/ap_mpm.h (revision 1874370)
+++ include/ap_mpm.h (working copy)
@@ -182,6 +182,8 @@
#define AP_MPMQ_CAN_SUSPEND 17
/** MPM supports additional pollfds */
#define AP_MPMQ_CAN_POLL 18
+/** MPM supports EAGAIN */
+#define AP_MPMQ_CAN_AGAIN 19
/** @} */

/**
Index: include/http_connection.h
===================================================================
--- include/http_connection.h (revision 1874370)
+++ include/http_connection.h (working copy)
@@ -40,8 +40,11 @@
* @param csd The mechanism on which this connection is to be read.
* Most times this will be a socket, but it is up to the module
* that accepts the request to determine the exact type.
+ * @return OK if processing is complete, SUSPENDED if processing
+ * should be suspended and retried at a later time, and
+ * AGAIN if processing should be retried when data is available.
*/
-AP_CORE_DECLARE(void) ap_process_connection(conn_rec *c, void *csd);
+AP_CORE_DECLARE(int) ap_process_connection(conn_rec *c, void *csd);

/**
* Shutdown the connection for writing.
@@ -109,7 +112,7 @@
* @param csd The mechanism on which this connection is to be read.
* Most times this will be a socket, but it is up to the module
* that accepts the request to determine the exact type.
- * @return OK or DECLINED
+ * @return OK, DECLINED, SUSPENDED or AGAIN.
*/
AP_DECLARE_HOOK(int,pre_connection,(conn_rec *c, void *csd))

@@ -119,7 +122,7 @@
* function does that for each protocol module. The first protocol module
* to handle the request is the last module run.
* @param c The connection on which the request has been received.
- * @return OK or DECLINED
+ * @return OK, DECLINED, SUSPENDED or AGAIN.
*/
AP_DECLARE_HOOK(int,process_connection,(conn_rec *c))

Index: include/httpd.h
===================================================================
--- include/httpd.h (revision 1874370)
+++ include/httpd.h (working copy)
@@ -464,6 +464,9 @@
*/
#define SUSPENDED -3 /**< Module will handle the remainder of the request.
* The core will never invoke the request again, */
+#define AGAIN -4 /**< Module has not finished handling the
+ * stage and wants to be called again
+ */

/** Returned by the bottom-most filter if no data was written.
* @see ap_pass_brigade(). */
Index: server/connection.c
===================================================================
--- server/connection.c (revision 1874370)
+++ server/connection.c (working copy)
@@ -30,6 +30,8 @@
#include "http_log.h"
#include "util_filter.h"

+#include "core.h"
+
APR_HOOK_STRUCT(
APR_HOOK_LINK(create_connection)
APR_HOOK_LINK(process_connection)
@@ -205,17 +207,38 @@
apr_socket_close(csd);
}

-AP_CORE_DECLARE(void) ap_process_connection(conn_rec *c, void *csd)
+AP_CORE_DECLARE(int) ap_process_connection(conn_rec *c, void *csd)
{
- int rc;
+ int rc = OK;
+ conn_config_t *conn_config = ap_get_core_module_config(c->conn_config);
+
+switch (conn_config->process_connection_st) {
+case STATE_UPDATE_VHOST_GIVEN_IP:
+
ap_update_vhost_given_ip(c);

+conn_config->process_connection_st = STATE_RUN_PRE_CONNECTION;
+case STATE_RUN_PRE_CONNECTION:
+
rc = ap_run_pre_connection(c, csd);
+ if (rc == SUSPENDED || rc == AGAIN) {
+ return rc;
+ }
if (rc != OK && rc != DONE) {
c->aborted = 1;
}

if (!c->aborted) {
- ap_run_process_connection(c);
+
+conn_config->process_connection_st = STATE_RUN_PROCESS_CONNECTION;
+case STATE_RUN_PROCESS_CONNECTION:
+
+ rc = ap_run_process_connection(c);
+
}
+
+}; /* end of switch */
+
+ return rc;
}
+
Index: server/core.h
===================================================================
--- server/core.h (revision 1874370)
+++ server/core.h (working copy)
@@ -26,11 +26,22 @@
#define CORE_H

/**
+ * @brief States for the ap_process_connection function
+ */
+typedef enum __attribute__((__packed__)) ap_process_connection_e {
+ STATE_UPDATE_VHOST_GIVEN_IP = 0,
+ STATE_RUN_PRE_CONNECTION,
+ STATE_RUN_PROCESS_CONNECTION
+} ap_process_connection_e;
+
+/**
* @brief A structure to contain connection state information
*/
typedef struct conn_config_t {
/** Socket belonging to the connection */
apr_socket_t *socket;
+ /** State of ap_process_connection() */
+ ap_process_connection_e process_connection_st;
} conn_config_t;

/**
Index: server/mpm/event/event.c
===================================================================
--- server/mpm/event/event.c (revision 1874370)
+++ server/mpm/event/event.c (working copy)
@@ -221,6 +221,16 @@
typedef struct event_conn_state_t event_conn_state_t;

/*
+ * States for the process_socket function
+ */
+typedef enum __attribute__((__packed__)) process_socket_e {
+ STATE_PROCESS_SOCKET = 0,
+ STATE_UPDATE_VHOST_GIVEN_IP,
+ STATE_RUN_PRE_CONNECTION,
+ STATE_RUN_PROCESS_CONNECTION
+} process_socket_e;
+
+/*
* The chain of connections to be shutdown by a worker thread (deferred),
* linked list updated atomically.
*/
@@ -254,6 +264,8 @@
conn_state_t pub;
/** chaining in defer_linger_chain */
struct event_conn_state_t *chain;
+ /** state of process_socket() */
+ process_socket_e process_socket_st;
};

APR_RING_HEAD(timeout_head_t, event_conn_state_t);
@@ -725,6 +737,9 @@
case AP_MPMQ_CAN_POLL:
*result = 1;
break;
+ case AP_MPMQ_CAN_AGAIN:
+ *result = 1;
+ break;
default:
*rv = APR_ENOTIMPL;
break;
@@ -997,6 +1012,9 @@
apr_status_t rv;
int rc = OK;

+switch (cs->process_socket_st) {
+case STATE_PROCESS_SOCKET:
+
if (cs == NULL) { /* This is a new connection */
listener_poll_type *pt = apr_pcalloc(p, sizeof(*pt));
cs = apr_pcalloc(p, sizeof(event_conn_state_t));
@@ -1027,9 +1045,18 @@
apr_pool_pre_cleanup_register(p, cs, ptrans_pre_cleanup);
TO_QUEUE_ELEM_INIT(cs);

+cs->process_socket_st = STATE_UPDATE_VHOST_GIVEN_IP;
+case STATE_UPDATE_VHOST_GIVEN_IP:
+
ap_update_vhost_given_ip(c);

+cs->process_socket_st = STATE_RUN_PRE_CONNECTION;
+case STATE_RUN_PRE_CONNECTION:
+
rc = ap_run_pre_connection(c, sock);
+ if (rc == AGAIN) {
+ return;
+ }
if (rc != OK && rc != DONE) {
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, c, APLOGNO(00469)
"process_socket: connection aborted");
@@ -1084,7 +1111,13 @@
if (clogging) {
apr_atomic_inc32(&clogged_count);
}
+
+cs->process_socket_st = STATE_RUN_PROCESS_CONNECTION;
+case STATE_RUN_PROCESS_CONNECTION:
rc = ap_run_process_connection(c);
+ if (rc == AGAIN) {
+ return;
+ }
if (clogging) {
apr_atomic_dec32(&clogged_count);
}
@@ -1239,6 +1272,7 @@
else {
apr_thread_mutex_unlock(timeout_mutex);
}
+cs->process_socket_st = STATE_PROCESS_SOCKET;
return;
}

@@ -1246,6 +1280,7 @@
cs->c->suspended_baton = cs;
apr_atomic_inc32(&suspended_count);
notify_suspend(cs);
+cs->process_socket_st = STATE_PROCESS_SOCKET;
return;
}

@@ -1256,6 +1291,10 @@
cs->pub.state == CONN_STATE_LINGER_SHORT)) {
process_lingering_close(cs);
}
+
+}; /* end of switch */
+
+cs->process_socket_st = STATE_PROCESS_SOCKET;
}

/* Put a SUSPENDED connection back into a queue. */