// Copyright (c) 2004-2010 Sergey Lyubka // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in // all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. #define _GNU_SOURCE 1 #include #include #include #include #include #include #include #include #include #include #include #include #include #include #ifndef BUFSIZ #define BUFSIZ 4096 #endif #define MAX_REQUEST_SIZE 4096 #define NUM_THREADS 4 #include #include #include #include #include #include #include #include #include #include #include #define ERRNO errno #define INVALID_SOCKET (-1) typedef int SOCKET; #include "mongoose.h" #define MONGOOSE_VERSION "3.0" #define ARRAY_SIZE(array) (sizeof(array) / sizeof(array[0])) #if defined(DEBUG) #define DEBUG_TRACE(x) do { \ flockfile(stdout); \ printf("*** %lu.%p.%s.%d: ", \ (unsigned long) time(NULL), (void *) pthread_self(), \ __func__, __LINE__); \ printf x; \ putchar('\n'); \ fflush(stdout); \ funlockfile(stdout); \ } while (0) #else #define DEBUG_TRACE(x) #endif // DEBUG typedef void * (*mg_thread_func_t)(void *); // Describes a socket which was accept()-ed by the master thread and queued for // future handling by the worker thread. struct socket { SOCKET sock; // Listening socket struct sockaddr_in local_addr; // Local socket address struct sockaddr_in remote_addr; // Remote socket address }; struct mg_context { volatile int stop_flag; // Should we stop event loop mg_callback_t user_callback; // User-defined callback function void *user_data; // User-defined data SOCKET local_socket; struct sockaddr_in local_address; volatile int num_threads; // Number of threads pthread_mutex_t mutex; // Protects (max|num)_threads pthread_cond_t cond; // Condvar for tracking workers terminations struct socket queue[20]; // Accepted sockets volatile int sq_head; // Head of the socket queue volatile int sq_tail; // Tail of the socket queue pthread_cond_t sq_full; // Singaled when socket is produced pthread_cond_t sq_empty; // Signaled when socket is consumed }; struct mg_connection { struct mg_request_info request_info; struct mg_context *ctx; struct socket client; // Connected client time_t birth_time; // Time connection was accepted int64_t num_bytes_sent; // Total bytes sent to client int64_t content_len; // Content-Length header value int64_t consumed_content; // How many bytes of content is already read char *buf; // Buffer for received data int buf_size; // Buffer size int request_len; // Size of the request + headers in a buffer int data_len; // Total size of data in a buffer }; static void *call_user(struct mg_connection *conn, enum mg_event event) { conn->request_info.user_data = conn->ctx->user_data; return conn->ctx->user_callback == NULL ? NULL : conn->ctx->user_callback(event, conn, &conn->request_info); } // Print error message to the opened error log stream. static void cry(struct mg_connection *conn, const char *fmt, ...) { char buf[BUFSIZ]; va_list ap; va_start(ap, fmt); (void) vsnprintf(buf, sizeof(buf), fmt, ap); va_end(ap); // Do not lock when getting the callback value, here and below. // I suppose this is fine, since function cannot disappear in the // same way string option can. conn->request_info.log_message = buf; if (call_user(conn, MG_EVENT_LOG) == NULL) { DEBUG_TRACE(("[%s]", buf)); } conn->request_info.log_message = NULL; } // Return fake connection structure. Used for logging, if connection // is not applicable at the moment of logging. static struct mg_connection *fc(struct mg_context *ctx) { static struct mg_connection fake_connection; fake_connection.ctx = ctx; return &fake_connection; } const char *mg_version(void) { return MONGOOSE_VERSION; } static int lowercase(const char *s) { return tolower(* (const unsigned char *) s); } static int mg_strcasecmp(const char *s1, const char *s2) { int diff; do { diff = lowercase(s1++) - lowercase(s2++); } while (diff == 0 && s1[-1] != '\0'); return diff; } // Like snprintf(), but never returns negative value, or the value // that is larger than a supplied buffer. // Thanks to Adam Zeldis to pointing snprintf()-caused vulnerability // in his audit report. static int mg_vsnprintf(struct mg_connection *conn, char *buf, size_t buflen, const char *fmt, va_list ap) { int n; if (buflen == 0) return 0; n = vsnprintf(buf, buflen, fmt, ap); if (n < 0) { cry(conn, "vsnprintf error"); n = 0; } else if (n >= (int) buflen) { cry(conn, "truncating vsnprintf buffer: [%.*s]", n > 200 ? 200 : n, buf); n = (int) buflen - 1; } buf[n] = '\0'; return n; } static int mg_snprintf(struct mg_connection *conn, char *buf, size_t buflen, const char *fmt, ...) { va_list ap; int n; va_start(ap, fmt); n = mg_vsnprintf(conn, buf, buflen, fmt, ap); va_end(ap); return n; } // Skip the characters until one of the delimiters characters found. // 0-terminate resulting word. Skip the delimiter and following whitespaces if any. // Advance pointer to buffer to the next word. Return found 0-terminated word. // Delimiters can be quoted with quotechar. static char *skip_quoted(char **buf, const char *delimiters, const char *whitespace, char quotechar) { char *p, *begin_word, *end_word, *end_whitespace; begin_word = *buf; end_word = begin_word + strcspn(begin_word, delimiters); /* Check for quotechar */ if (end_word > begin_word) { p = end_word - 1; while (*p == quotechar) { /* If there is anything beyond end_word, copy it */ if (*end_word == '\0') { *p = '\0'; break; } else { size_t end_off = strcspn(end_word + 1, delimiters); memmove (p, end_word, end_off + 1); p += end_off; /* p must correspond to end_word - 1 */ end_word += end_off + 1; } } for (p++; p < end_word; p++) { *p = '\0'; } } if (*end_word == '\0') { *buf = end_word; } else { end_whitespace = end_word + 1 + strspn(end_word + 1, whitespace); for (p = end_word; p < end_whitespace; p++) { *p = '\0'; } *buf = end_whitespace; } return begin_word; } // Simplified version of skip_quoted without quote char // and whitespace == delimiters static char *skip(char **buf, const char *delimiters) { return skip_quoted(buf, delimiters, delimiters, 0); } // Return HTTP header value, or NULL if not found. static const char *get_header(const struct mg_request_info *ri, const char *name) { int i; for (i = 0; i < ri->num_headers; i++) if (!mg_strcasecmp(name, ri->http_headers[i].name)) return ri->http_headers[i].value; return NULL; } const char *mg_get_header(const struct mg_connection *conn, const char *name) { return get_header(&conn->request_info, name); } static const char *suggest_connection_header(const struct mg_connection *conn) { return "close"; } void mg_send_http_error(struct mg_connection *conn, int status, const char *reason, const char *fmt, ...) { char buf[BUFSIZ]; va_list ap; int len; conn->request_info.status_code = status; buf[0] = '\0'; len = 0; /* Errors 1xx, 204 and 304 MUST NOT send a body */ if (status > 199 && status != 204 && status != 304) { len = mg_snprintf(conn, buf, sizeof(buf), "Error %d: %s", status, reason); cry(conn, "%s", buf); buf[len++] = '\n'; va_start(ap, fmt); len += mg_vsnprintf(conn, buf + len, sizeof(buf) - len, fmt, ap); va_end(ap); } DEBUG_TRACE(("[%s]", buf)); mg_printf(conn, "HTTP/1.1 %d %s\r\n" "Content-Type: text/plain\r\n" "Content-Length: %d\r\n" "Connection: %s\r\n\r\n", status, reason, len, suggest_connection_header(conn)); conn->num_bytes_sent += mg_printf(conn, "%s", buf); } static int start_thread(struct mg_context *ctx, mg_thread_func_t func, void *param) { pthread_t thread_id; pthread_attr_t attr; int retval; (void) pthread_attr_init(&attr); (void) pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); // TODO(lsm): figure out why mongoose dies on Linux if next line is enabled // (void) pthread_attr_setstacksize(&attr, sizeof(struct mg_connection) * 5); if ((retval = pthread_create(&thread_id, &attr, func, param)) != 0) { cry(fc(ctx), "%s: %s", __func__, strerror(retval)); } return retval; } static int set_non_blocking_mode(SOCKET sock) { int flags; flags = fcntl(sock, F_GETFL, 0); (void) fcntl(sock, F_SETFL, flags | O_NONBLOCK); return 0; } // Write data to the IO channel - opened file descriptor, socket or SSL // descriptor. Return number of bytes written. static int64_t push(FILE *fp, SOCKET sock, const char *buf, int64_t len) { int64_t sent; int n, k; sent = 0; while (sent < len) { /* How many bytes we send in this iteration */ k = len - sent > INT_MAX ? INT_MAX : (int) (len - sent); if (fp != NULL) { n = fwrite(buf + sent, 1, (size_t)k, fp); if (ferror(fp)) n = -1; } else { n = send(sock, buf + sent, (size_t)k, 0); } if (n < 0) break; sent += n; } return sent; } // Read from IO channel - opened file descriptor, socket, or SSL descriptor. // Return number of bytes read. static int pull(SOCKET sock, char *buf, int len) { int nread; nread = recv(sock, buf, (size_t) len, 0); return nread; } int mg_read(struct mg_connection *conn, void *buf, size_t len) { int n, buffered_len, nread; const char *buffered; assert((conn->content_len == -1 && conn->consumed_content == 0) || conn->consumed_content <= conn->content_len); DEBUG_TRACE(("%p %zu %" PRId64 " %" PRId64, buf, len, conn->content_len, conn->consumed_content)); nread = 0; if (conn->consumed_content < conn->content_len) { // Adjust number of bytes to read. int64_t to_read = conn->content_len - conn->consumed_content; if (to_read < (int64_t) len) { len = (int) to_read; } // How many bytes of data we have buffered in the request buffer? buffered = conn->buf + conn->request_len + conn->consumed_content; buffered_len = conn->data_len - conn->request_len; assert(buffered_len >= 0); // Return buffered data back if we haven't done that yet. if (conn->consumed_content < (int64_t) buffered_len) { buffered_len -= (int) conn->consumed_content; if (len < (size_t) buffered_len) { buffered_len = len; } memcpy(buf, buffered, (size_t)buffered_len); len -= buffered_len; buf = (char *) buf + buffered_len; conn->consumed_content += buffered_len; nread = buffered_len; } // We have returned all buffered data. Read new data from the remote socket. while (len > 0) { n = pull(conn->client.sock, (char *) buf, (int) len); if (n <= 0) { break; } buf = (char *) buf + n; conn->consumed_content += n; nread += n; len -= n; } } return nread; } int mg_write(struct mg_connection *conn, const void *buf, size_t len) { return (int) push(NULL, conn->client.sock, (const char *) buf, (int64_t) len); } int mg_printf(struct mg_connection *conn, const char *fmt, ...) { char buf[BUFSIZ]; int len; va_list ap; va_start(ap, fmt); len = mg_vsnprintf(conn, buf, sizeof(buf), fmt, ap); va_end(ap); return mg_write(conn, buf, (size_t)len); } // URL-decode input buffer into destination buffer. // 0-terminate the destination buffer. Return the length of decoded data. // form-url-encoded data differs from URI encoding in a way that it // uses '+' as character for space, see RFC 1866 section 8.2.1 // http://ftp.ics.uci.edu/pub/ietf/html/rfc1866.txt static size_t url_decode(const char *src, size_t src_len, char *dst, size_t dst_len, int is_form_url_encoded) { size_t i, j; int a, b; #define HEXTOI(x) (isdigit(x) ? x - '0' : x - 'W') for (i = j = 0; i < src_len && j < dst_len - 1; i++, j++) { if (src[i] == '%' && isxdigit(* (const unsigned char *) (src + i + 1)) && isxdigit(* (const unsigned char *) (src + i + 2))) { a = tolower(* (const unsigned char *) (src + i + 1)); b = tolower(* (const unsigned char *) (src + i + 2)); dst[j] = (char) ((HEXTOI(a) << 4) | HEXTOI(b)); i += 2; } else if (is_form_url_encoded && src[i] == '+') { dst[j] = ' '; } else { dst[j] = src[i]; } } dst[j] = '\0'; /* Null-terminate the destination */ return j; } // Check whether full request is buffered. Return: // -1 if request is malformed // 0 if request is not yet fully buffered // >0 actual request length, including last \r\n\r\n static int get_request_len(const char *buf, int buflen) { const char *s, *e; int len = 0; DEBUG_TRACE(("buf: %p, len: %d", buf, buflen)); for (s = buf, e = s + buflen - 1; len <= 0 && s < e; s++) // Control characters are not allowed but >=128 is. if (!isprint(* (const unsigned char *) s) && *s != '\r' && *s != '\n' && * (const unsigned char *) s < 128) { len = -1; } else if (s[0] == '\n' && s[1] == '\n') { len = (int) (s - buf) + 2; } else if (s[0] == '\n' && &s[1] < e && s[1] == '\r' && s[2] == '\n') { len = (int) (s - buf) + 3; } return len; } // Protect against directory disclosure attack by removing '..', // excessive '/' and '\' characters static void remove_double_dots_and_double_slashes(char *s) { char *p = s; while (*s != '\0') { *p++ = *s++; if (s[-1] == '/' || s[-1] == '\\') { // Skip all following slashes and backslashes while (*s == '/' || *s == '\\') { s++; } // Skip all double-dots while (*s == '.' && s[1] == '.') { s += 2; } } } *p = '\0'; } // Parse HTTP headers from the given buffer, advance buffer to the point // where parsing stopped. static void parse_http_headers(char **buf, struct mg_request_info *ri) { int i; for (i = 0; i < (int) ARRAY_SIZE(ri->http_headers); i++) { ri->http_headers[i].name = skip_quoted(buf, ":", " ", 0); ri->http_headers[i].value = skip(buf, "\r\n"); if (ri->http_headers[i].name[0] == '\0') break; ri->num_headers = i + 1; } } static int is_valid_http_method(const char *method) { fprintf(stderr, "Received HTTP method %s\n", method); return !strcmp(method, "GET") || !strcmp(method, "POST") || !strcmp(method, "DELETE") || !strcmp(method, "OPTIONS"); } // Parse HTTP request, fill in mg_request_info structure. static int parse_http_request(char *buf, struct mg_request_info *ri) { int status = 0; // RFC says that all initial whitespaces should be ingored while (*buf != '\0' && isspace(* (unsigned char *) buf)) { buf++; } ri->request_method = skip(&buf, " "); ri->uri = skip(&buf, " "); ri->http_version = skip(&buf, "\r\n"); if (is_valid_http_method(ri->request_method) && strncmp(ri->http_version, "HTTP/", 5) == 0) { ri->http_version += 5; /* Skip "HTTP/" */ parse_http_headers(&buf, ri); status = 1; } return status; } // Keep reading the input from socket sock // into buffer buf, until \r\n\r\n appears in the buffer (which marks the end // of HTTP request). Buffer buf may already have some data. The length of the // data is stored in nread. Upon every read operation, increase nread by the // number of bytes read. static int read_request(SOCKET sock, char *buf, int bufsiz, int *nread) { int n, request_len; request_len = 0; while (*nread < bufsiz && request_len == 0) { n = pull(sock, buf + *nread, bufsiz - *nread); if (n <= 0) { break; } else { *nread += n; request_len = get_request_len(buf, *nread); } } return request_len; } // This is the heart of the Mongoose's logic. // This function is called when the request is read, parsed and validated, // and Mongoose must decide what action to take: serve a file, or // a directory, or call embedded function, etcetera. static void handle_request(struct mg_connection *conn) { struct mg_request_info *ri = &conn->request_info; int uri_len; if ((conn->request_info.query_string = strchr(ri->uri, '?')) != NULL) { * conn->request_info.query_string++ = '\0'; } uri_len = strlen(ri->uri); (void) url_decode(ri->uri, (size_t)uri_len, ri->uri, (size_t)(uri_len + 1), 0); remove_double_dots_and_double_slashes(ri->uri); DEBUG_TRACE(("%s", ri->uri)); if (call_user(conn, MG_NEW_REQUEST) == NULL) { mg_send_http_error(conn, 404, "Not Found", "%s", "File not found"); } } static void close_all_listening_sockets(struct mg_context *ctx) { (void) close(ctx->local_socket); } // only reports address of the first listening socket int mg_get_listen_addr(struct mg_context *ctx, struct sockaddr *addr, socklen_t *addrlen) { size_t len = sizeof(ctx->local_address); if (*addrlen < len) return 0; *addrlen = len; memcpy(addr, &ctx->local_address, len); return 1; } static int set_ports_option(struct mg_context *ctx, int port) { int reuseaddr = 1, success = 1; socklen_t sock_len = sizeof(ctx->local_address); // MacOS needs that. If we do not zero it, subsequent bind() will fail. memset(&ctx->local_address, 0, sock_len); ctx->local_address.sin_family = AF_INET; ctx->local_address.sin_port = htons((uint16_t) port); ctx->local_address.sin_addr.s_addr = htonl(INADDR_ANY); struct timeval tv; tv.tv_sec = 0; tv.tv_usec = 500 * 1000; if ((ctx->local_socket = socket(PF_INET, SOCK_STREAM, 6)) == INVALID_SOCKET || setsockopt(ctx->local_socket, SOL_SOCKET, SO_REUSEADDR, &reuseaddr, sizeof(reuseaddr)) != 0 || setsockopt(ctx->local_socket, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)) != 0 || bind(ctx->local_socket, (const struct sockaddr *) &ctx->local_address, sock_len) != 0 || // TODO(steineldar): Replace 20 (max socket backlog len in connections). listen(ctx->local_socket, 20) != 0) { close(ctx->local_socket); cry(fc(ctx), "%s: cannot bind to port %d: %s", __func__, port, strerror(ERRNO)); success = 0; } else if (getsockname(ctx->local_socket, (struct sockaddr *) &ctx->local_address, &sock_len)) { close(ctx->local_socket); cry(fc(ctx), "%s: %s", __func__, strerror(ERRNO)); success = 0; } if (!success) { ctx->local_socket = INVALID_SOCKET; close_all_listening_sockets(ctx); } return success; } static void reset_per_request_attributes(struct mg_connection *conn) { struct mg_request_info *ri = &conn->request_info; ri->request_method = ri->uri = ri->http_version = NULL; ri->num_headers = 0; ri->status_code = -1; conn->num_bytes_sent = conn->consumed_content = 0; conn->content_len = -1; conn->request_len = conn->data_len = 0; } static void close_socket_gracefully(SOCKET sock) { char buf[BUFSIZ]; int n; // Send FIN to the client (void) shutdown(sock, SHUT_WR); set_non_blocking_mode(sock); // Read and discard pending data. If we do not do that and close the // socket, the data in the send buffer may be discarded. This // behaviour is seen on Windows, when client keeps sending data // when server decide to close the connection; then when client // does recv() it gets no data back. do { n = pull(sock, buf, sizeof(buf)); } while (n > 0); // Now we know that our FIN is ACK-ed, safe to close (void) close(sock); } static void close_connection(struct mg_connection *conn) { if (conn->client.sock != INVALID_SOCKET) { close_socket_gracefully(conn->client.sock); } } static void discard_current_request_from_buffer(struct mg_connection *conn) { int buffered_len, body_len; buffered_len = conn->data_len - conn->request_len; assert(buffered_len >= 0); if (conn->content_len == -1) { body_len = 0; } else if (conn->content_len < (int64_t) buffered_len) { body_len = (int) conn->content_len; } else { body_len = buffered_len; } conn->data_len -= conn->request_len + body_len; memmove(conn->buf, conn->buf + conn->request_len + body_len, (size_t) conn->data_len); } static void process_new_connection(struct mg_connection *conn) { struct mg_request_info *ri = &conn->request_info; const char *cl; reset_per_request_attributes(conn); // If next request is not pipelined, read it in if ((conn->request_len = get_request_len(conn->buf, conn->data_len)) == 0) { conn->request_len = read_request(conn->client.sock, conn->buf, conn->buf_size, &conn->data_len); } assert(conn->data_len >= conn->request_len && conn->request_len > 1); if (conn->request_len == 0 && conn->data_len == conn->buf_size) { mg_send_http_error(conn, 413, "Request Too Large", ""); return; } if (conn->request_len <= 0) { return; // Remote end closed the connection } // Nul-terminate the request cause parse_http_request() uses sscanf conn->buf[conn->request_len - 1] = '\0'; if (!parse_http_request(conn->buf, ri)) { // Do not put garbage in the access log, just send it back to the client mg_send_http_error(conn, 400, "Bad Request", "Cannot parse HTTP request: [%.*s]", conn->data_len, conn->buf); } else if (strcmp(ri->http_version, "1.0") && strcmp(ri->http_version, "1.1")) { // Request seems valid, but HTTP version is strange mg_send_http_error(conn, 505, "HTTP version not supported", ""); } else { // Request is valid, handle it cl = get_header(ri, "Content-Length"); conn->content_len = cl == NULL ? -1 : strtoll(cl, NULL, 10); conn->birth_time = time(NULL); handle_request(conn); discard_current_request_from_buffer(conn); } } // Worker threads take accepted socket from the queue static int consume_socket(struct mg_context *ctx, struct socket *sp) { (void) pthread_mutex_lock(&ctx->mutex); DEBUG_TRACE(("going idle")); // If the queue is empty, wait. We're idle at this point. while (ctx->sq_head == ctx->sq_tail && ctx->stop_flag == 0) { pthread_cond_wait(&ctx->sq_full, &ctx->mutex); } // Master thread could wake us up without putting a socket. // If this happens, it is time to exit. if (ctx->stop_flag) { (void) pthread_mutex_unlock(&ctx->mutex); return 0; } assert(ctx->sq_head > ctx->sq_tail); // Copy socket from the queue and increment tail *sp = ctx->queue[ctx->sq_tail % ARRAY_SIZE(ctx->queue)]; ctx->sq_tail++; DEBUG_TRACE(("grabbed socket %d, going busy", sp->sock)); // Wrap pointers if needed while (ctx->sq_tail > (int) ARRAY_SIZE(ctx->queue)) { ctx->sq_tail -= ARRAY_SIZE(ctx->queue); ctx->sq_head -= ARRAY_SIZE(ctx->queue); } (void) pthread_cond_signal(&ctx->sq_empty); (void) pthread_mutex_unlock(&ctx->mutex); return 1; } static void worker_thread(struct mg_context *ctx) { struct mg_connection *conn; // This is the specified request size limit for DIAL requests. Note that // this will effectively make the request limit one byte *smaller* than the // required in the DIAL specification. int buf_size = MAX_REQUEST_SIZE; #ifndef __APPLE__ pthread_setname_np( pthread_self(), __func__); #endif conn = (struct mg_connection *) calloc(1, sizeof(*conn) + buf_size); conn->buf_size = buf_size; conn->buf = (char *) (conn + 1); assert(conn != NULL); while (ctx->stop_flag == 0 && consume_socket(ctx, &conn->client)) { conn->birth_time = time(NULL); conn->ctx = ctx; // Fill in IP, port info early so even if SSL setup below fails, // error handler would have the corresponding info. // Thanks to Johannes Winkelmann for the patch. memcpy(&conn->request_info.remote_addr, &conn->client.remote_addr, sizeof(conn->client.remote_addr)); // Fill in local IP info socklen_t addr_len = sizeof(conn->request_info.local_addr); getsockname(conn->client.sock, (struct sockaddr *) &conn->request_info.local_addr, &addr_len); process_new_connection(conn); close_connection(conn); } free(conn); // Signal master that we're done with connection and exiting (void) pthread_mutex_lock(&ctx->mutex); ctx->num_threads--; (void) pthread_cond_signal(&ctx->cond); assert(ctx->num_threads >= 0); (void) pthread_mutex_unlock(&ctx->mutex); DEBUG_TRACE(("exiting")); } // Master thread adds accepted socket to a queue static void produce_socket(struct mg_context *ctx, const struct socket *sp) { (void) pthread_mutex_lock(&ctx->mutex); // If the queue is full, wait while (ctx->sq_head - ctx->sq_tail >= (int) ARRAY_SIZE(ctx->queue)) { (void) pthread_cond_wait(&ctx->sq_empty, &ctx->mutex); } assert(ctx->sq_head - ctx->sq_tail < (int) ARRAY_SIZE(ctx->queue)); // Copy socket to the queue and increment head ctx->queue[ctx->sq_head % ARRAY_SIZE(ctx->queue)] = *sp; ctx->sq_head++; DEBUG_TRACE(("queued socket %d", sp->sock)); (void) pthread_cond_signal(&ctx->sq_full); (void) pthread_mutex_unlock(&ctx->mutex); } static void master_thread(struct mg_context *ctx) { struct socket accepted; #ifndef __APPLE__ pthread_setname_np( pthread_self(), __func__); #endif socklen_t sock_len = sizeof(accepted.local_addr); memcpy(&accepted.local_addr, &ctx->local_address, sock_len); while (ctx->stop_flag == 0) { memset(&accepted.remote_addr, 0, sock_len); accepted.sock = accept(ctx->local_socket, (struct sockaddr *) &accepted.remote_addr, &sock_len); if (accepted.sock != INVALID_SOCKET) { // Put accepted socket structure into the queue. DEBUG_TRACE(("accepted socket %d", accepted.sock)); produce_socket(ctx, &accepted); } } DEBUG_TRACE(("stopping workers")); // Stop signal received: somebody called mg_stop. Quit. close_all_listening_sockets(ctx); // Wakeup workers that are waiting for connections to handle. pthread_cond_broadcast(&ctx->sq_full); // Wait until all threads finish (void) pthread_mutex_lock(&ctx->mutex); while (ctx->num_threads > 0) { (void) pthread_cond_wait(&ctx->cond, &ctx->mutex); } (void) pthread_mutex_unlock(&ctx->mutex); // All threads exited, no sync is needed. Destroy mutex and condvars (void) pthread_mutex_destroy(&ctx->mutex); (void) pthread_cond_destroy(&ctx->cond); (void) pthread_cond_destroy(&ctx->sq_empty); (void) pthread_cond_destroy(&ctx->sq_full); // Signal mg_stop() that we're done ctx->stop_flag = 2; DEBUG_TRACE(("exiting")); } static void free_context(struct mg_context *ctx) { // Deallocate context itself free(ctx); } void mg_stop(struct mg_context *ctx) { ctx->stop_flag = 1; // Wait until mg_fini() stops while (ctx->stop_flag != 2) { // TODO(steineldar): Avoid busy waiting. (void) sleep(0); } free_context(ctx); } struct mg_context *mg_start(mg_callback_t user_callback, void *user_data, int port) { struct mg_context *ctx; // Allocate context and initialize reasonable general case defaults. // TODO(lsm): do proper error handling here. ctx = (struct mg_context *) calloc(1, sizeof(*ctx)); ctx->user_callback = user_callback; ctx->user_data = user_data; if (!set_ports_option(ctx, port)) { free_context(ctx); return NULL; } // Ignore SIGPIPE signal, so if browser cancels the request, it // won't kill the whole process. (void) signal(SIGPIPE, SIG_IGN); (void) pthread_mutex_init(&ctx->mutex, NULL); (void) pthread_cond_init(&ctx->cond, NULL); (void) pthread_cond_init(&ctx->sq_empty, NULL); (void) pthread_cond_init(&ctx->sq_full, NULL); // Start master (listening) thread start_thread(ctx, (mg_thread_func_t) master_thread, ctx); // Start worker threads for (int i = 0; i < NUM_THREADS; i++) { if (start_thread(ctx, (mg_thread_func_t) worker_thread, ctx) != 0) { cry(fc(ctx), "Cannot start worker thread: %d", ERRNO); } else { ctx->num_threads++; } } return ctx; }