From 9a7c599d4666157238c852d6ccc2a0ff961d8010 Mon Sep 17 00:00:00 2001 From: Signalforger Date: Wed, 14 Jan 2026 13:57:47 +0100 Subject: [PATCH] Add Streamforge proxy integration for high-performance uploads When Streamforge FastCGI proxy is deployed between nginx and php-fpm, it handles file uploads by writing them to disk before PHP starts, freeing PHP workers from I/O wait during slow uploads. Integration features: - Automatic detection via HTTP_X_STREAMFORGE and HTTP_X_UPLOAD_FILE_COUNT - Transparent UploadedFile creation from streamforge metadata headers - Request::isStreamforgeEnabled() static method for userland detection - RSHUTDOWN cleanup of unmoved temp files to prevent disk leaks - Zero changes required to existing application code Protocol: Streamforge sends HTTP_X_UPLOAD_N_* headers with file metadata (NAME, FILENAME, PATH, SIZE, TYPE) instead of populating $_FILES. Adds comprehensive README section documenting the integration. --- README.md | 109 ++++++++++++++++++++++++++++++++++++++ php_signalforge_http.h | 15 ++++-- signalforge_http.c | 40 +++++++++++++- src/request.c | 105 +++++++++++++++++++++++++++++++++++++ src/uploadedfile.c | 116 +++++++++++++++++++++++++++++++++++++++++ src/uploadedfile.h | 17 ++++-- 6 files changed, 395 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index 49ab928..a0e4ecf 100644 --- a/README.md +++ b/README.md @@ -37,6 +37,115 @@ HTTP request/response handling is invoked on nearly every request, often hundred - **Immutable Objects**: All `with*()` methods return new instances - **Memory Efficient**: Proper reference counting and cleanup +## Streamforge Proxy Integration + +The extension integrates seamlessly with the [Streamforge](../streamforge) FastCGI proxy for high-performance file upload handling. When Streamforge is deployed between nginx and php-fpm, it offloads file upload I/O from PHP workers, dramatically improving throughput for upload-heavy applications. + +### How It Works + +``` +WITHOUT STREAMFORGE: +┌────────┐ ┌─────────┐ ┌───────────┐ +│ nginx │────▶│ php-fpm │────▶│ $_FILES │ +└────────┘ └─────────┘ └───────────┘ + │ + Worker blocked during + entire upload duration + +WITH STREAMFORGE: +┌────────┐ ┌─────────────┐ ┌─────────┐ ┌─────────────────────┐ +│ nginx │────▶│ streamforge │────▶│ php-fpm │────▶│ HTTP_X_UPLOAD_* hdrs│ +└────────┘ └─────────────┘ └─────────┘ └─────────────────────┘ + │ + Files written to disk + before PHP starts +``` + +### Benefits + +| Scenario | Standard PHP | With Streamforge | +|----------|--------------|------------------| +| 500MB upload over slow connection | Worker blocked ~30s | Worker engaged ~5ms | +| Memory per upload | Up to 500MB buffered | ~100KB proxy buffers | +| 20 concurrent uploads, 10 workers | Site unresponsive | No impact on other requests | + +### Transparent Integration + +The extension automatically detects Streamforge and reads uploads from the appropriate source. **Your application code remains unchanged**: + +```php +// Works identically with or without Streamforge +$request = Request::capture(); +$files = $request->getUploadedFiles(); + +foreach ($files as $name => $file) { + $file->getClientFilename(); // "document.pdf" + $file->getSize(); // 52428800 + $file->moveTo('/storage/docs/document.pdf'); +} +``` + +### Detection API + +Check if Streamforge is handling the current request: + +```php +use Signalforge\NativeHttp\Request; + +// Static method +if (Request::isStreamforgeEnabled()) { + // Streamforge is proxying this request +} + +// Or check $_SERVER directly +if (isset($_SERVER['HTTP_X_STREAMFORGE'])) { + // Streamforge marker present +} + +// Check for processed uploads +if (isset($_SERVER['HTTP_X_UPLOAD_FILE_COUNT'])) { + $count = (int) $_SERVER['HTTP_X_UPLOAD_FILE_COUNT']; + // Streamforge handled $count file uploads +} +``` + +### Protocol + +When Streamforge handles multipart uploads, it: + +1. Parses the multipart body and writes files to disk +2. Adds metadata headers to the FastCGI request: + - `HTTP_X_STREAMFORGE=1` - Proxy marker + - `HTTP_X_UPLOAD_FILE_COUNT=N` - Number of uploaded files + - `HTTP_X_UPLOAD_0_NAME` - Form field name + - `HTTP_X_UPLOAD_0_FILENAME` - Original client filename + - `HTTP_X_UPLOAD_0_PATH` - Temp file path on disk + - `HTTP_X_UPLOAD_0_SIZE` - File size in bytes + - `HTTP_X_UPLOAD_0_TYPE` - MIME type +3. Sends only form fields (not file content) to PHP-FPM + +The extension reads these headers and creates `UploadedFile` objects that work identically to standard PHP uploads. + +### Cleanup + +Temp files are automatically cleaned up: +- **On `moveTo()`**: File is moved, no cleanup needed +- **On request end**: Unmoved temp files are deleted by the extension's RSHUTDOWN handler + +This prevents disk space leaks even if application code doesn't handle all uploaded files. + +### Deployment + +See the [Streamforge documentation](../streamforge/README.md) for deployment instructions. Basic setup: + +```bash +# Start Streamforge between nginx and php-fpm +streamforge -l 0.0.0.0:9001 -u /var/run/php-fpm.sock -d /tmp/uploads + +# Configure nginx to send requests to Streamforge +# fastcgi_pass 127.0.0.1:9001; +``` + ## Requirements - PHP 8.3, 8.4, or 8.5 diff --git a/php_signalforge_http.h b/php_signalforge_http.h index ae469cc..928f427 100644 --- a/php_signalforge_http.h +++ b/php_signalforge_http.h @@ -52,12 +52,21 @@ extern zend_class_entry *signalforge_uri_ce; /* ============================================================================ * Module Globals * - * Per-request storage for interned strings. Automatically thread-safe - * in ZTS builds, regular globals in non-ZTS builds. + * Per-request storage for streamforge integration and cleanup tracking. + * Automatically thread-safe in ZTS builds, regular globals in non-ZTS builds. * ============================================================================ */ +/* Maximum streamforge uploads to track for cleanup */ +#define SIGNALFORGE_MAX_STREAMFORGE_UPLOADS 64 + ZEND_BEGIN_MODULE_GLOBALS(signalforge_http) - int dummy; /* Keep at least one global for TSRM */ + /* Streamforge integration */ + zend_bool streamforge_detected; /* True if HTTP_X_STREAMFORGE=1 present */ + int streamforge_upload_count; /* Number of uploads from streamforge */ + + /* Temp file paths that need cleanup on RSHUTDOWN if not moved */ + char *streamforge_temp_paths[SIGNALFORGE_MAX_STREAMFORGE_UPLOADS]; + zend_bool streamforge_temp_moved[SIGNALFORGE_MAX_STREAMFORGE_UPLOADS]; ZEND_END_MODULE_GLOBALS(signalforge_http) ZEND_EXTERN_MODULE_GLOBALS(signalforge_http) diff --git a/signalforge_http.c b/signalforge_http.c index 7a23cff..517c26c 100644 --- a/signalforge_http.c +++ b/signalforge_http.c @@ -16,6 +16,7 @@ #include "php_signalforge_http.h" #include "src/psr7_interfaces.h" +#include /* For unlink() */ #include "src/request.h" #include "src/response.h" #include "src/stream.h" @@ -46,6 +47,8 @@ PHP_MINFO_FUNCTION(signalforge_http) "disabled" #endif ); + php_info_print_table_row(2, "Streamforge Integration", "enabled"); + php_info_print_table_row(2, "Max Streamforge Uploads", ZEND_TOSTR(SIGNALFORGE_MAX_STREAMFORGE_UPLOADS)); php_info_print_table_end(); } @@ -82,6 +85,16 @@ PHP_RINIT_FUNCTION(signalforge_http) ZEND_TSRMLS_CACHE_UPDATE(); #endif + /* Initialize streamforge state for this request */ + SIGNALFORGE_HTTP_G(streamforge_detected) = 0; + SIGNALFORGE_HTTP_G(streamforge_upload_count) = 0; + + /* Clear temp path tracking arrays */ + for (int i = 0; i < SIGNALFORGE_MAX_STREAMFORGE_UPLOADS; i++) { + SIGNALFORGE_HTTP_G(streamforge_temp_paths)[i] = NULL; + SIGNALFORGE_HTTP_G(streamforge_temp_moved)[i] = 0; + } + return SUCCESS; } @@ -91,7 +104,32 @@ PHP_RSHUTDOWN_FUNCTION(signalforge_http) ZEND_TSRMLS_CACHE_UPDATE(); #endif - /* No per-request cleanup needed - all resources freed via free_obj handlers */ + /* + * Clean up streamforge temp files that weren't moved. + * + * When streamforge handles uploads, it writes files to temp paths. If the + * PHP application doesn't call moveTo() on an uploaded file, we need to + * delete the temp file to prevent disk space leaks. + * + * Files that were moved have streamforge_temp_moved[i] = 1. + */ + for (int i = 0; i < SIGNALFORGE_HTTP_G(streamforge_upload_count); i++) { + if (SIGNALFORGE_HTTP_G(streamforge_temp_paths)[i] != NULL) { + /* Delete if not moved */ + if (!SIGNALFORGE_HTTP_G(streamforge_temp_moved)[i]) { + unlink(SIGNALFORGE_HTTP_G(streamforge_temp_paths)[i]); + } + + /* Free the path string */ + efree(SIGNALFORGE_HTTP_G(streamforge_temp_paths)[i]); + SIGNALFORGE_HTTP_G(streamforge_temp_paths)[i] = NULL; + } + } + + /* Reset state */ + SIGNALFORGE_HTTP_G(streamforge_detected) = 0; + SIGNALFORGE_HTTP_G(streamforge_upload_count) = 0; + return SUCCESS; } diff --git a/src/request.c b/src/request.c index fac85b2..2bcabd4 100644 --- a/src/request.c +++ b/src/request.c @@ -778,6 +778,52 @@ PHP_METHOD(Signalforge_Http_Request, capture) } /* }}} */ +/* {{{ proto bool Request::isStreamforgeEnabled() + * Check if the current request is being proxied through streamforge. + * + * When streamforge is handling requests, it adds HTTP_X_STREAMFORGE=1 to + * the server params. This method provides a convenient way to check for + * streamforge presence without manually inspecting $_SERVER. + * + * Use cases: + * - Conditional logic based on proxy presence + * - Debugging/logging + * - Performance optimization paths + * + * @return bool True if streamforge proxy is detected + */ +PHP_METHOD(Signalforge_Http_Request, isStreamforgeEnabled) +{ + zval *server_zv; + + ZEND_PARSE_PARAMETERS_NONE(); + + /* Check global detection flag first (set during getUploadedFiles) */ + if (SIGNALFORGE_HTTP_G(streamforge_detected)) { + RETURN_TRUE; + } + + /* Check $_SERVER for HTTP_X_STREAMFORGE header */ + server_zv = signalforge_get_superglobal("_SERVER", sizeof("_SERVER") - 1); + if (server_zv && Z_TYPE_P(server_zv) == IS_ARRAY) { + zval *marker = zend_hash_str_find(Z_ARRVAL_P(server_zv), + "HTTP_X_STREAMFORGE", sizeof("HTTP_X_STREAMFORGE") - 1); + if (marker) { + RETURN_TRUE; + } + + /* Also check for upload file count (indicates streamforge handled uploads) */ + zval *file_count = zend_hash_str_find(Z_ARRVAL_P(server_zv), + "HTTP_X_UPLOAD_FILE_COUNT", sizeof("HTTP_X_UPLOAD_FILE_COUNT") - 1); + if (file_count) { + RETURN_TRUE; + } + } + + RETURN_FALSE; +} +/* }}} */ + /* {{{ proto Request Request::create(string $method, mixed $uri, array $serverParams = []) * Creates a new Request instance with the specified method and URI. * This is the PSR-17 factory method for creating Request instances programmatically. @@ -1853,6 +1899,61 @@ PHP_METHOD(Signalforge_Http_Request, getUploadedFiles) array_init(return_value); + /* + * Check for streamforge proxy uploads first. + * + * When streamforge handles multipart uploads, it writes files to disk + * and passes metadata via HTTP_X_UPLOAD_* headers. $_FILES will be empty + * because streamforge already consumed the multipart body. + */ + if (Z_TYPE(intern->zv_server) == IS_ARRAY) { + zval *file_count_zv = zend_hash_str_find(Z_ARRVAL(intern->zv_server), + "HTTP_X_UPLOAD_FILE_COUNT", sizeof("HTTP_X_UPLOAD_FILE_COUNT") - 1); + + if (file_count_zv) { + /* Streamforge mode - read uploads from HTTP_X_UPLOAD_* headers */ + int count = 0; + + if (Z_TYPE_P(file_count_zv) == IS_LONG) { + count = (int)Z_LVAL_P(file_count_zv); + } else if (Z_TYPE_P(file_count_zv) == IS_STRING) { + count = atoi(Z_STRVAL_P(file_count_zv)); + } + + /* Mark streamforge as detected in globals */ + SIGNALFORGE_HTTP_G(streamforge_detected) = 1; + + for (int i = 0; i < count && i < SIGNALFORGE_MAX_STREAMFORGE_UPLOADS; i++) { + char name_key[64]; + zval uploaded_file_zv; + zval *field_name_zv; + + /* Get the form field name for this upload */ + snprintf(name_key, sizeof(name_key), "HTTP_X_UPLOAD_%d_NAME", i); + field_name_zv = zend_hash_str_find(Z_ARRVAL(intern->zv_server), + name_key, strlen(name_key)); + + /* Create UploadedFile from streamforge metadata */ + signalforge_uploadedfile_from_streamforge( + Z_ARRVAL(intern->zv_server), i, &uploaded_file_zv); + + /* Add to result array using field name as key */ + if (field_name_zv && Z_TYPE_P(field_name_zv) == IS_STRING) { + zend_hash_str_update(Z_ARRVAL_P(return_value), + Z_STRVAL_P(field_name_zv), Z_STRLEN_P(field_name_zv), + &uploaded_file_zv); + } else { + /* Fallback: use numeric index if no field name */ + zend_hash_index_update(Z_ARRVAL_P(return_value), i, &uploaded_file_zv); + } + } + + /* Return early - streamforge uploads are already handled */ + return; + } + } + + /* Standard mode - read uploads from $_FILES */ if (Z_TYPE(intern->zv_files) == IS_ARRAY) { /* Convert $_FILES array entries to UploadedFile objects */ ZEND_HASH_FOREACH_STR_KEY_VAL(Z_ARRVAL(intern->zv_files), key, val) { @@ -2069,6 +2170,9 @@ ZEND_BEGIN_ARG_WITH_RETURN_OBJ_INFO_EX(arginfo_request_create, 0, 2, Signalforge ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, serverParams, IS_ARRAY, 1, "[]") ZEND_END_ARG_INFO() +ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_request_isStreamforgeEnabled, 0, 0, _IS_BOOL, 0) +ZEND_END_ARG_INFO() + ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_request_getProtocolVersion, 0, 0, IS_STRING, 0) ZEND_END_ARG_INFO() @@ -2189,6 +2293,7 @@ ZEND_END_ARG_INFO() static const zend_function_entry signalforge_request_methods[] = { PHP_ME(Signalforge_Http_Request, capture, arginfo_request_capture, ZEND_ACC_PUBLIC | ZEND_ACC_STATIC) PHP_ME(Signalforge_Http_Request, create, arginfo_request_create, ZEND_ACC_PUBLIC | ZEND_ACC_STATIC) + PHP_ME(Signalforge_Http_Request, isStreamforgeEnabled, arginfo_request_isStreamforgeEnabled, ZEND_ACC_PUBLIC | ZEND_ACC_STATIC) /* MessageInterface */ PHP_ME(Signalforge_Http_Request, getProtocolVersion, arginfo_request_getProtocolVersion, ZEND_ACC_PUBLIC) diff --git a/src/uploadedfile.c b/src/uploadedfile.c index a5b1e5e..9255914 100644 --- a/src/uploadedfile.c +++ b/src/uploadedfile.c @@ -54,6 +54,10 @@ static zend_object *signalforge_uploadedfile_create_object(zend_class_entry *ce) intern->error = 0; intern->stream_loaded = 0; + /* Initialize streamforge state */ + intern->from_streamforge = 0; + intern->streamforge_index = -1; + /* Set up Zend object infrastructure */ zend_object_std_init(&intern->std, ce); object_properties_init(&intern->std, ce); @@ -488,6 +492,9 @@ PHP_METHOD(Signalforge_Http_UploadedFile, moveTo) RETURN_THROWS(); } + /* Mark streamforge temp file as moved (prevents RSHUTDOWN cleanup) */ + signalforge_uploadedfile_mark_moved(intern); + /* Update tmp_name to new location */ if (intern->tmp_name) { zend_string_release(intern->tmp_name); @@ -637,3 +644,112 @@ void signalforge_uploadedfile_register_class(void) } +/* ============================================================================ + * STREAMFORGE INTEGRATION + * + * When the streamforge proxy handles multipart uploads, it writes files to disk + * and passes metadata via HTTP_X_UPLOAD_* headers instead of populating $_FILES. + * These functions create UploadedFile objects from that metadata. + * ============================================================================ */ + +/** + * Create UploadedFile from streamforge HTTP_X_UPLOAD_* headers + * + * Streamforge sends these headers for each uploaded file: + * HTTP_X_UPLOAD_N_NAME - Form field name + * HTTP_X_UPLOAD_N_FILENAME - Original client filename + * HTTP_X_UPLOAD_N_PATH - Temp file path on disk + * HTTP_X_UPLOAD_N_SIZE - File size in bytes + * HTTP_X_UPLOAD_N_TYPE - MIME type (optional) + * + * @param server_ht $_SERVER hashtable + * @param index Upload index (0, 1, 2, ...) + * @param return_value zval to initialize with UploadedFile object + * @return Internal object pointer, or NULL on error + */ +signalforge_uploadedfile_object *signalforge_uploadedfile_from_streamforge( + HashTable *server_ht, int index, zval *return_value) +{ + signalforge_uploadedfile_object *intern; + char key[64]; + zval *val; + + /* Create new instance */ + object_init_ex(return_value, signalforge_uploadedfile_ce); + intern = Z_SIGNALFORGE_UPLOADEDFILE_P(return_value); + + /* Mark as from streamforge */ + intern->from_streamforge = 1; + intern->error = 0; /* UPLOAD_ERR_OK - streamforge already validated */ + + /* Get temp file path (required) */ + snprintf(key, sizeof(key), "HTTP_X_UPLOAD_%d_PATH", index); + val = zend_hash_str_find(server_ht, key, strlen(key)); + if (val && Z_TYPE_P(val) == IS_STRING && Z_STRLEN_P(val) > 0) { + intern->tmp_name = zend_string_copy(Z_STR_P(val)); + + /* Register for cleanup tracking */ + if (SIGNALFORGE_HTTP_G(streamforge_upload_count) < SIGNALFORGE_MAX_STREAMFORGE_UPLOADS) { + int idx = SIGNALFORGE_HTTP_G(streamforge_upload_count)++; + SIGNALFORGE_HTTP_G(streamforge_temp_paths)[idx] = estrdup(Z_STRVAL_P(val)); + SIGNALFORGE_HTTP_G(streamforge_temp_moved)[idx] = 0; + intern->streamforge_index = idx; + } + } else { + /* No path - upload error */ + intern->tmp_name = NULL; + intern->error = 4; /* UPLOAD_ERR_NO_FILE */ + return intern; + } + + /* Get original filename */ + snprintf(key, sizeof(key), "HTTP_X_UPLOAD_%d_FILENAME", index); + val = zend_hash_str_find(server_ht, key, strlen(key)); + if (val && Z_TYPE_P(val) == IS_STRING) { + intern->client_filename = zend_string_copy(Z_STR_P(val)); + } else { + intern->client_filename = NULL; + } + + /* Get MIME type */ + snprintf(key, sizeof(key), "HTTP_X_UPLOAD_%d_TYPE", index); + val = zend_hash_str_find(server_ht, key, strlen(key)); + if (val && Z_TYPE_P(val) == IS_STRING) { + intern->client_media_type = zend_string_copy(Z_STR_P(val)); + } else { + intern->client_media_type = NULL; + } + + /* Get file size */ + snprintf(key, sizeof(key), "HTTP_X_UPLOAD_%d_SIZE", index); + val = zend_hash_str_find(server_ht, key, strlen(key)); + if (val) { + if (Z_TYPE_P(val) == IS_LONG) { + intern->size = Z_LVAL_P(val); + } else if (Z_TYPE_P(val) == IS_STRING) { + char *endptr = NULL; + zend_long parsed = ZEND_STRTOL(Z_STRVAL_P(val), &endptr, 10); + if (endptr && *endptr == '\0' && parsed >= 0) { + intern->size = parsed; + } + } + } + + return intern; +} + +/** + * Mark a streamforge upload as moved + * + * Called from moveTo() when file is successfully moved. This prevents + * RSHUTDOWN from trying to clean up a file that no longer exists at + * its original temp location. + */ +void signalforge_uploadedfile_mark_moved(signalforge_uploadedfile_object *intern) +{ + if (intern->from_streamforge && intern->streamforge_index >= 0 && + intern->streamforge_index < SIGNALFORGE_MAX_STREAMFORGE_UPLOADS) { + SIGNALFORGE_HTTP_G(streamforge_temp_moved)[intern->streamforge_index] = 1; + } +} + diff --git a/src/uploadedfile.h b/src/uploadedfile.h index 7ef9919..890a633 100644 --- a/src/uploadedfile.h +++ b/src/uploadedfile.h @@ -17,17 +17,21 @@ extern zend_class_entry *signalforge_uploadedfile_ce; * ============================================================================ */ typedef struct _signalforge_uploadedfile_object { - /* File information from $_FILES */ + /* File information from $_FILES or streamforge */ zend_string *tmp_name; // Temporary file path (OWNED) zend_string *client_filename; // Original filename (OWNED, may be NULL) zend_string *client_media_type; // MIME type (OWNED, may be NULL) zend_long size; // File size in bytes zend_long error; // UPLOAD_ERR_* constant - + /* Stream (lazy-loaded) */ zval zv_stream; // StreamInterface object (lazy-loaded) zend_bool stream_loaded; // Flag to track if stream was created - + + /* Streamforge integration */ + zend_bool from_streamforge; // True if file came from streamforge proxy + int streamforge_index; // Index in globals temp_paths array (-1 if N/A) + /* Standard zend_object MUST be last member */ zend_object std; } signalforge_uploadedfile_object; @@ -57,5 +61,12 @@ void signalforge_uploadedfile_register_class(void); signalforge_uploadedfile_object *signalforge_uploadedfile_from_files_array(zval *file_data, zval *return_value); +/* Create UploadedFile from streamforge HTTP_X_UPLOAD_* headers */ +signalforge_uploadedfile_object *signalforge_uploadedfile_from_streamforge( + HashTable *server_ht, int index, zval *return_value); + +/* Mark streamforge upload as moved (prevents RSHUTDOWN cleanup) */ +void signalforge_uploadedfile_mark_moved(signalforge_uploadedfile_object *intern); + #endif /* SIGNALFORGE_UPLOADEDFILE_H */