Overview
This module provides HTTP/1.1 streaming transport for FastLED's JSON-RPC system, enabling bidirectional communication over HTTP using chunked transfer encoding. The transport supports three RPC modes: SYNC (immediate response), ASYNC (ACK + later result), and ASYNC_STREAM (ACK + multiple updates + final result).
Architecture Components
Component Hierarchy
Remote (application layer)
↓
HttpStreamTransport (base transport interface)
↓
HttpStreamClient / HttpStreamServer (platform-specific implementations)
↓
HttpConnection (connection lifecycle)
↓
ChunkedReader / ChunkedWriter (HTTP/1.1 encoding)
↓
TCP Socket (platform-specific: POSIX, ESP-IDF, etc.)
Core Components
1. ChunkedReader / ChunkedWriter
Purpose: Parse and generate HTTP/1.1 chunked transfer encoding
Files:
ChunkedReader API:
class ChunkedReader {
public:
ChunkedReader();
void feed(const uint8_t* data, size_t len);
bool hasChunk() const;
bool isFinal() const;
void reset();
private:
enum State {
READ_SIZE,
READ_DATA,
READ_TRAILER,
FINAL
};
State mState;
size_t mChunkSize;
size_t mBytesRead;
};
ChunkedWriter API:
class ChunkedWriter {
public:
ChunkedWriter();
private:
};
Chunked Encoding Format:
<chunk-size-hex>\r\n
<chunk-data>
\r\n
<chunk-size-hex>\r\n
<chunk-data>
\r\n
...
0\r\n
\r\n
2. HttpParser
Purpose: Parse HTTP/1.1 request and response messages
Files:
HttpRequestParser API:
struct HttpRequest {
};
class HttpRequestParser {
public:
HttpRequestParser();
void feed(const uint8_t* data, size_t len);
bool isComplete() const;
void reset();
private:
enum State {
READ_REQUEST_LINE,
READ_HEADERS,
READ_BODY,
COMPLETE
};
State mState;
HttpRequest mRequest;
ChunkedReader mChunkedReader;
size_t mContentLength;
bool mIsChunked;
};
MapRedBlackTree< Key, T, Compare, fl::allocator_slab< char > > map
HttpResponseParser API:
struct HttpResponse {
int statusCode;
};
class HttpResponseParser {
public:
HttpResponseParser();
void feed(const uint8_t* data, size_t len);
bool isComplete() const;
void reset();
private:
enum State {
READ_STATUS_LINE,
READ_HEADERS,
READ_BODY,
COMPLETE
};
State mState;
HttpResponse mResponse;
ChunkedReader mChunkedReader;
size_t mContentLength;
bool mIsChunked;
};
3. HttpConnection
Purpose: Manage HTTP connection lifecycle with reconnection and heartbeat
Files:
HttpConnection API:
class HttpConnection {
public:
enum State {
DISCONNECTED,
CONNECTING,
CONNECTED,
RECONNECTING,
CLOSED
};
HttpConnection(const char* host, uint16_t port);
bool connect();
void disconnect();
void close();
bool isConnected() const;
State getState() const;
void setAutoReconnect(bool enable);
void setReconnectInterval(uint32_t minMs, uint32_t maxMs);
void setHeartbeatInterval(uint32_t intervalMs);
void setTimeout(uint32_t timeoutMs);
void update(uint32_t currentTimeMs);
virtual int send(const uint8_t* data, size_t len) = 0;
virtual int recv(uint8_t* buffer, size_t maxLen) = 0;
virtual bool isSocketConnected() const = 0;
protected:
virtual bool platformConnect() = 0;
virtual void platformDisconnect() = 0;
private:
State mState;
uint16_t mPort;
bool mAutoReconnect;
uint32_t mReconnectIntervalMin;
uint32_t mReconnectIntervalMax;
uint32_t mReconnectIntervalCurrent;
uint32_t mLastReconnectAttempt;
int mReconnectAttempts;
uint32_t mHeartbeatInterval;
uint32_t mLastHeartbeat;
uint32_t mTimeout;
uint32_t mLastActivity;
};
Reconnection Logic:
- Exponential backoff: 1s, 2s, 4s, 8s, ..., max 30s
- Reset backoff on successful connection
- Disabled by default (user must enable)
Heartbeat Logic:
- Send ping every
mHeartbeatInterval ms (default 30s)
- Expect pong response within
mTimeout ms (default 60s)
- Disconnect on timeout, trigger reconnect if enabled
4. NativeHttpClient / NativeHttpServer
Purpose: Platform-specific HTTP client/server implementations using POSIX sockets
Files:
NativeHttpClient API:
class NativeHttpClient : public HttpConnection {
public:
NativeHttpClient(const char* host, uint16_t port);
~NativeHttpClient();
int send(const uint8_t* data, size_t len) override;
int recv(uint8_t* buffer, size_t maxLen) override;
bool isSocketConnected() const override;
protected:
bool platformConnect() override;
void platformDisconnect() override;
private:
int mSocket;
};
NativeHttpServer API:
struct HttpClientConnection {
int socket;
uint16_t remotePort;
HttpRequestParser parser;
ChunkedWriter writer;
};
class NativeHttpServer {
public:
NativeHttpServer(uint16_t port);
~NativeHttpServer();
bool start();
void stop();
bool isListening() const;
void update();
void writeResponse(int clientId, const HttpResponse& response);
void writeChunk(int clientId, const uint8_t* data, size_t len);
void writeChunkFinal(int clientId);
void closeClient(int clientId);
private:
uint16_t mPort;
int mListenSocket;
void acceptClient();
};
5. HttpStreamTransport
Purpose: Base class for HTTP streaming transport, implements RequestSource/ResponseSink for Remote
Files:
HttpStreamTransport API:
class HttpStreamTransport {
public:
HttpStreamTransport(const char* host, uint16_t port);
virtual ~HttpStreamTransport() = default;
virtual bool connect() = 0;
virtual void disconnect() = 0;
virtual bool isConnected() const = 0;
void writeResponse(
const fl::json& response);
virtual void update(uint32_t currentTimeMs) = 0;
void setAutoReconnect(bool enable);
void setHeartbeatInterval(uint32_t intervalMs);
void setTimeout(uint32_t timeoutMs);
protected:
virtual int send(const uint8_t* data, size_t len) = 0;
virtual int recv(uint8_t* buffer, size_t maxLen) = 0;
HttpConnection* mConnection;
ChunkedReader mReader;
ChunkedWriter mWriter;
};
A first-in, first-out (FIFO) queue container adapter.
6. HttpStreamClient
Purpose: Client-side HTTP streaming for RPC
Files:
HttpStreamClient API:
class HttpStreamClient : public HttpStreamTransport {
public:
HttpStreamClient(const char* host, uint16_t port);
~HttpStreamClient();
bool connect() override;
void disconnect() override;
bool isConnected() const override;
void update(uint32_t currentTimeMs) override;
void sendRequest(
const fl::json& request);
protected:
int send(const uint8_t* data, size_t len) override;
int recv(uint8_t* buffer, size_t maxLen) override;
private:
NativeHttpClient mClient;
HttpResponseParser mResponseParser;
bool mRequestPending;
};
Usage Example (Client):
client->setAutoReconnect(true);
client->setHeartbeatInterval(30000);
[&client]() { return client->readRequest(); },
[&client](
const fl::json& r) { client->writeResponse(r); }
);
while (true) {
client->update(millis());
delay(10);
}
fl::unique_ptr< fl::Remote > remote
JSON-RPC server with scheduling support.
shared_ptr< T > make_shared(Args &&... args) FL_NOEXCEPT
7. HttpStreamServer
Purpose: Server-side HTTP streaming for RPC
Files:
HttpStreamServer API:
class HttpStreamServer : public HttpStreamTransport {
public:
HttpStreamServer(uint16_t port);
~HttpStreamServer();
bool start();
void stop();
bool isListening() const;
bool connect() override;
void disconnect() override;
bool isConnected() const override;
void update(uint32_t currentTimeMs) override;
protected:
int send(const uint8_t* data, size_t len) override;
int recv(uint8_t* buffer, size_t maxLen) override;
private:
NativeHttpServer mServer;
int mCurrentClient;
};
Usage Example (Server):
);
remote.bind(
"add", [](
int a,
int b) {
return a + b; });
remote.bindAsync(
"longTask", [](ResponseSend& send,
const Json& params) {
send.send(Json::object().set("ack", true));
send.send(Json::object().set("value", 42));
});
while (true) {
delay(10);
}
HTTP Streaming RPC Protocol
Request Format (Client → Server)
POST /rpc HTTP/1.1
Host: localhost:8080
Content-Type: application/json
Transfer-Encoding: chunked
Connection: keep-alive
<chunk-size>\r\n
{"jsonrpc":"2.0","method":"add","params":[1,2],"id":1}\r\n
0\r\n
\r\n
Response Formats (Server → Client)
SYNC Mode (Immediate Response)
HTTP/1.1 200 OK
Content-Type: application/json
Transfer-Encoding: chunked
<chunk-size>\r\n
{"jsonrpc":"2.0","result":3,"id":1}\r\n
0\r\n
\r\n
ASYNC Mode (ACK + Later Result)
HTTP/1.1 200 OK
Content-Type: application/json
Transfer-Encoding: chunked
<chunk-size>\r\n
{"jsonrpc":"2.0","result":{"ack":true},"id":1}\r\n
<chunk-size>\r\n
{"jsonrpc":"2.0","result":{"value":42},"id":1}\r\n
0\r\n
\r\n
ASYNC_STREAM Mode (ACK + Multiple Updates + Final)
HTTP/1.1 200 OK
Content-Type: application/json
Transfer-Encoding: chunked
<chunk-size>\r\n
{"jsonrpc":"2.0","result":{"ack":true},"id":1}\r\n
<chunk-size>\r\n
{"jsonrpc":"2.0","result":{"update":10},"id":1}\r\n
<chunk-size>\r\n
{"jsonrpc":"2.0","result":{"update":20},"id":1}\r\n
<chunk-size>\r\n
{"jsonrpc":"2.0","result":{"value":100,"stop":true},"id":1}\r\n
0\r\n
\r\n
Heartbeat (Bidirectional)
Send periodically to keep connection alive:
<chunk-size>\r\n
{"jsonrpc":"2.0","method":"rpc.ping","id":null}\r\n
Expected pong response:
<chunk-size>\r\n
{"jsonrpc":"2.0","result":"pong","id":null}\r\n
Connection Lifecycle
[DISCONNECTED] --connect()--> [CONNECTING] --success--> [CONNECTED]
^ | |
| fail error
| | |
+--<autoReconnect=false>---+ |
| |
+--<autoReconnect=true>---> [RECONNECTING] <---------+
|
exponential backoff
(1s, 2s, 4s, ..., 30s)
|
retry connect()
States:
DISCONNECTED: Initial state, no connection
CONNECTING: Connection attempt in progress
CONNECTED: Connection established, ready for I/O
RECONNECTING: Auto-reconnect in progress (exponential backoff)
CLOSED: Connection permanently closed (no reconnect)
Timeouts:
mTimeout (default 60s): Max idle time before disconnect
mHeartbeatInterval (default 30s): Ping interval
mReconnectInterval (dynamic): Backoff delay between reconnect attempts
Data Flow
Client Request Flow
User Code
|
v
Remote::sendRequest(method, params)
|
v
HttpStreamClient::writeResponse(jsonrpc)
|
v
ChunkedWriter::writeChunk(json)
|
v
NativeHttpClient::send(chunk)
|
v
TCP Socket → Server
Server Response Flow
TCP Socket ← Client
|
v
NativeHttpServer::recv(data)
|
v
HttpRequestParser::feed(data)
|
v
ChunkedReader::readChunk()
|
v
HttpStreamServer::readRequest()
|
v
Remote::update() → Rpc::handle(request)
|
v
User RPC Function (sync/async/stream)
|
v
ResponseSink::writeResponse(result)
|
v
ChunkedWriter::writeChunk(result)
|
v
NativeHttpServer::send(chunk)
|
v
TCP Socket → Client
Error Handling
Connection Errors
- Connection refused: Immediate reconnect if auto-reconnect enabled
- Connection timeout: Exponential backoff before retry
- Connection lost: Detect via socket error or heartbeat timeout, trigger reconnect
Protocol Errors
- Invalid HTTP: Close connection, log error
- Invalid JSON-RPC: Send error response per JSON-RPC 2.0 spec
- Malformed chunk: Close connection, log error
Timeout Errors
- Heartbeat timeout: No pong received within timeout → disconnect → reconnect
- Request timeout: No response within timeout → cancel request, return error
Platform Support
Native Platform (POSIX Sockets)
Supported:
- Linux (POSIX)
- macOS (POSIX)
- Windows (Winsock2, POSIX-like API)
Implementation:
NativeHttpClient uses POSIX socket(), connect(), send(), recv()
NativeHttpServer uses POSIX bind(), listen(), accept()
- Non-blocking sockets with
select() or poll() for multi-client
ESP32 Platform (Future)
Planned:
- ESP-IDF HTTP client (esp_http_client)
- ESP-IDF HTTP server (esp_http_server)
- WiFi connection management
- mDNS service discovery
Not Implemented Yet: ESP32 support will be added in future iterations
Testing Strategy
Unit Tests
- Chunked Encoding: Test ChunkedReader/ChunkedWriter with known inputs
- HTTP Parser: Test HttpRequestParser/HttpResponseParser with sample HTTP messages
- Connection State Machine: Test reconnection, heartbeat, timeout logic
- Mock Transport: Test HttpStreamTransport with mock socket layer
Integration Tests
- Loopback RPC: Client + Server on native platform, all RPC modes
- Stress Test: High-frequency requests, measure latency/throughput
- Reconnection Test: Simulate connection loss, verify auto-reconnect
- Heartbeat Test: Long-idle connection, verify ping/pong
Examples
- RpcServer: Native server with SYNC/ASYNC/ASYNC_STREAM methods (
examples/Asio/RpcServer/)
- RpcClient: Native client calling server methods (
examples/Asio/RpcClient/)
- RpcBidirectional: Client + Server in same process, loopback (
examples/Asio/RpcBidirectional/)
Performance Considerations
Latency
- Chunked encoding overhead: Minimal (~10 bytes per chunk)
- HTTP overhead: ~100 bytes per request/response (headers)
- JSON parsing: Optimized with FastLED's native JSON parser
Throughput
- Single connection: ~1000 req/s on localhost (native platform)
- Multiple connections: Server scales with thread pool (future)
- Streaming: Multiple responses per request, minimal latency
Memory
- Per-connection overhead: ~1KB (buffers, state)
- Per-request overhead: ~256 bytes (request/response objects)
- Chunked buffers: Dynamic allocation, freed after chunk processed
Security Considerations
NOT Implemented
This is a basic transport layer for development/testing. The following security features are NOT implemented:
- ❌ Authentication: No user authentication, all requests accepted
- ❌ Authorization: No access control, all methods callable
- ❌ Encryption: Plain HTTP (not HTTPS), no TLS/SSL
- ❌ Input validation: Minimal validation, assumes trusted clients
- ❌ Rate limiting: No request rate limiting
- ❌ DoS protection: No protection against denial-of-service
For Production Use: Add HTTPS (TLS), authentication (tokens, OAuth), and rate limiting
Next Steps (Implementation)
Phase 2: HTTP Streaming Transport Layer
- ✅ Task 2.1: Design HTTP Transport Architecture (THIS DOCUMENT)
- Task 2.2: Implement HTTP Chunked Encoding Parser
- Task 2.3: Implement HTTP Request/Response Parser
- Task 2.4: Implement Connection State Machine
- Task 2.5: Implement Native HTTP Client
- Task 2.6: Implement Native HTTP Server
Phase 3: HTTP Streaming RPC Integration
- Task 3.1: Design HTTP Streaming RPC Protocol (see PROTOCOL.md)
- Task 3.2: Implement HttpStreamTransport Base
- Task 3.3: Implement HttpStreamClient
- Task 3.4: Implement HttpStreamServer
- Task 3.5: Integrate HttpStreamTransport with Remote
Status: ✅ Architecture design complete Next: Implement chunked encoding parser (Task 2.2)