FastLED 3.9.15
Loading...
Searching...
No Matches
stream_transport.cpp.hpp
Go to the documentation of this file.
1#pragma once
2
4#include "fl/stl/json.h"
5#include "fl/stl/string.h"
6#include "fl/stl/cstring.h"
7#include "fl/stl/stdio.h"
8#include "fl/stl/noexcept.h"
9
10namespace fl {
11namespace net {
12namespace http {
13
14// --- StreamHandle implementation ---
15
21
23 if (mUpdateCallback) {
25 }
26 return *this;
27}
28
30 mPromise.then(fl::move(cb));
31 return *this;
32}
33
35 mPromise.catch_(fl::move(cb));
36 return *this;
37}
38
42
// Reports whether this handle can be used: the answer is whatever the wrapped
// promise (mPromise) reports from its own valid().
43bool StreamHandle::valid() const {
44 return mPromise.valid();
45}
46
47// --- HttpStreamTransport implementation ---
48
// Constructs the transport with a default-constructed ConnectionConfig, the
// caller-supplied heartbeat interval, a 60000 ms timeout, and the
// "previously connected" flag cleared.
// NOTE(review): `host` and `port` are not referenced by any initializer visible
// in this listing (embedded lines 51-52 are absent from the extract) — confirm
// against the real source where they are stored.
49HttpStreamTransport::HttpStreamTransport(const fl::string& host, u16 port, u32 heartbeatIntervalMs)
50 : mConnection(ConnectionConfig{}) // Use default config
53 , mHeartbeatInterval(heartbeatIntervalMs)
54 , mTimeoutMs(60000) // Default 60s timeout
55 , mWasConnected(false) {
56}
57
59 // Note: Cannot call pure virtual disconnect() here
60 // Subclasses must clean up in their own destructors
61}
62
// Converts a JSON id into the string key used to look up pending calls and
// pending streams. Integer ids are formatted as decimal text, string ids are
// returned as-is, and any other JSON type falls back to its to_string() form.
64 if (id.is_int()) {
 // 32 bytes of scratch space for the formatted integer.
 // NOTE(review): "%d" is paired with whatever as_int().value() yields; confirm
 // fl::snprintf handles that integer width correctly.
65 char buf[32];
66 fl::snprintf(buf, sizeof(buf), "%d", id.as_int().value());
67 return fl::string(buf);
68 }
69 if (id.is_string()) {
70 return id.as_string().value();
71 }
72 // Fallback for other types - use string representation
73 return id.to_string();
74}
75
// Drains chunks from mReader until it reports no further chunk. Each chunk is
// copied into a string; null documents and "rpc.ping" heartbeats are dropped;
// messages carrying a non-null "id" are offered first to pending calls and
// then to pending streams; anything not consumed is appended to mIncomingQueue.
77 for (;;) {
78 size_t chunkSize = mReader.nextChunkSize();
 // A reported size of 0 ends the drain.
79 if (chunkSize == 0) {
80 break;
81 }
82
 // Chunks of up to 512 bytes are read into the stack buffer; larger chunks
 // get a heap vector resized to exactly the chunk size.
83 u8 stackBuf[512];
84 fl::vector<u8> heapBuf;
85 fl::span<u8> outSpan;
86 if (chunkSize <= sizeof(stackBuf)) {
87 outSpan = fl::span<u8>(stackBuf, sizeof(stackBuf));
88 } else {
89 heapBuf.resize(chunkSize);
90 outSpan = heapBuf;
91 }
92
93 ChunkedReadResult result = mReader.readChunk(outSpan);
 // Stop draining as soon as a read yields no data.
94 if (!result.hasData()) {
95 break;
96 }
97
98 fl::string jsonStr(reinterpret_cast<const char*>(result.mData.data()), result.mData.size()); // ok reinterpret cast
 // NOTE(review): embedded listing line 99 is absent from this extract, so the
 // declaration of `json` is not visible; presumably it is parsed from jsonStr
 // — confirm against the real source.
 // A null document is skipped without ending the loop.
100 if (json.is_null()) {
101 continue;
102 }
103
104 // Filter heartbeats
 // This check runs before the id dispatch below, so a ping is never handed
 // to resolveRpc()/resolveRpcStream() or queued for readRequest().
105 if (json["method"].as_string() == "rpc.ping") {
107 continue;
108 }
109
111
112 // Try to dispatch to pending calls/streams by id
113 if (json.contains("id") && !json["id"].is_null()) {
114 fl::string idKey = idToString(json["id"]);
115
 // Pending calls are tried before pending streams for the same key.
116 if (resolveRpc(json, idKey)) {
117 continue; // Consumed by pending call
118 }
119 if (resolveRpcStream(json, idKey)) {
120 continue; // Consumed by pending stream
121 }
122 }
123
124 // Not matched - add to incoming queue for readRequest()
125 mIncomingQueue.push_back(fl::move(json));
126 }
127}
128
// Tries to match `msg` against the pending call stored under `idKey`.
// Returns false when no pending call uses that key, and true whenever the
// message was consumed (error, acknowledgement, or final result).
130 auto it = mPendingCalls.find(idKey);
131 if (it == mPendingCalls.end()) {
132 return false;
133 }
134 PendingCall* pending = &it->second;
135
136 // Check for error
 // An "error" member rejects the promise and removes the pending entry.
 // NOTE(review): .value() is called on msg["error"]["message"].as_string()
 // without a presence check; confirm what happens when the error object has no
 // string "message".
137 if (msg.contains("error")) {
138 fl::task::Error err(msg["error"]["message"].as_string().value());
139 pending->promise.complete_with_error(err);
140 mPendingCalls.erase(idKey);
141 return true;
142 }
143
144 // Check for ACK (ASYNC mode sends acknowledged first)
 // Only acknowledged == true keeps the call pending; any other value of
 // "acknowledged" falls through and is treated as the final result below.
145 if (msg.contains("result") && msg["result"].contains("acknowledged")) {
146 if (msg["result"]["acknowledged"].as_bool() == true) {
147 pending->ackReceived = true;
148 return true; // Stay pending, wait for final result
149 }
150 }
151
152 // Final result
 // The promise is resolved with the entire message, not just its "result".
153 pending->promise.complete_with_value(msg);
154 mPendingCalls.erase(idKey);
155 return true;
156}
157
// Tries to match `msg` against the pending stream stored under `idKey`.
// Returns false when no pending stream uses that key; otherwise the message is
// always consumed and true is returned. Handled shapes, in order: error,
// acknowledgement, intermediate update, stop marker, and anything else
// (treated as the final value).
159 auto it = mPendingStreams.find(idKey);
160 if (it == mPendingStreams.end()) {
161 return false;
162 }
163 PendingStream* pending = &it->second;
164
165 // Check for error
 // NOTE(review): .value() is called on msg["error"]["message"].as_string()
 // without a presence check; confirm what happens when the error object has no
 // string "message".
166 if (msg.contains("error")) {
167 fl::task::Error err(msg["error"]["message"].as_string().value());
168 pending->promise.complete_with_error(err);
169 mPendingStreams.erase(idKey);
170 return true;
171 }
172
173 // Check for ACK
 // Only acknowledged == true is consumed here; other values fall through to
 // the checks below.
174 if (msg.contains("result") && msg["result"].contains("acknowledged")) {
175 if (msg["result"]["acknowledged"].as_bool() == true) {
176 pending->ackReceived = true;
177 return true;
178 }
179 }
180
181 // Check for stream update
 // The update is consumed (true is returned, entry stays pending) even when
 // no callback object exists or the stored function is empty.
182 if (msg.contains("result") && msg["result"].contains("update")) {
183 if (pending->updateCallback && *pending->updateCallback) {
184 (*pending->updateCallback)(msg["result"]["update"]);
185 }
186 return true;
187 }
188
189 // Check for stream final (stop marker)
 // A "stop" member that is not true falls through to the catch-all below.
190 if (msg.contains("result") && msg["result"].contains("stop")) {
191 if (msg["result"]["stop"].as_bool() == true) {
192 // Resolve with the value field if present, otherwise the full result
193 if (msg["result"].contains("value")) {
194 pending->promise.complete_with_value(msg["result"]["value"]);
195 } else {
196 pending->promise.complete_with_value(msg["result"]);
197 }
198 mPendingStreams.erase(idKey);
199 return true;
200 }
201 }
202
203 // Unrecognized message shape - treat as final
 // Here the promise receives the entire message rather than msg["result"].
204 pending->promise.complete_with_value(msg);
205 mPendingStreams.erase(idKey);
206 return true;
207}
208
// Convenience form: wraps `method` and `params` in a JSON-RPC 2.0 request
// object and forwards it to the rpc() overload that takes a full request.
// The id is taken from mNextCallId, which is post-incremented on every call.
210 int id = mNextCallId++;
211 fl::json request = fl::json::object();
212 request.set("jsonrpc", "2.0");
213 request.set("method", method);
214 request.set("params", params);
215 request.set("id", id);
216 return rpc(request);
217}
218
221
// Sends an already-built request and returns the promise `p` tied to it.
// NOTE(review): the declaration of `p` sits on an embedded listing line that is
// absent from this extract; presumably a freshly created pending promise —
// confirm against the real source.
// When the transport is not connected, the promise is rejected immediately
// with "Not connected" and nothing is registered or sent.
222 if (!isConnected()) {
223 p.complete_with_error(fl::task::Error("Not connected"));
224 return p;
225 }
226
 // Register the pending call under the stringified request id before the
 // request is written.
227 fl::string idKey = idToString(fullRequest["id"]);
228 PendingCall pc;
229 pc.promise = p;
230 mPendingCalls.insert(idKey, fl::move(pc));
231
 // Outgoing requests are sent through writeResponse().
232 writeResponse(fullRequest);
233 return p;
234}
235
// Convenience form: wraps `method` and `params` in a JSON-RPC 2.0 request
// object and forwards it to the rpcStream() overload that takes a full request.
// The id comes from mNextCallId, the same counter the rpc() convenience
// overload increments.
237 int id = mNextCallId++;
238 fl::json request = fl::json::object();
239 request.set("jsonrpc", "2.0");
240 request.set("method", method);
241 request.set("params", params);
242 request.set("id", id);
243 return rpcStream(request);
244}
245
// Sends an already-built streaming request and returns a StreamHandle built
// from the promise and a shared update-callback slot.
// The slot starts as an empty fl::function held by shared_ptr; the same
// shared_ptr is stored in the pending entry and passed to the returned handle.
247 auto updateCb = fl::make_shared<fl::function<void(const fl::json&)>>();
249
 // NOTE(review): the declarations of `p` (and of `ps` further down) sit on
 // embedded listing lines that are absent from this extract — confirm against
 // the real source.
 // When not connected, the promise is rejected with "Not connected" and no
 // pending stream is registered.
250 if (!isConnected()) {
251 p.complete_with_error(fl::task::Error("Not connected"));
252 return StreamHandle(fl::move(p), fl::move(updateCb));
253 }
254
 // Register the pending stream under the stringified request id before the
 // request is written.
255 fl::string idKey = idToString(fullRequest["id"]);
257 ps.promise = p;
258 ps.updateCallback = updateCb;
259 mPendingStreams.insert(idKey, fl::move(ps));
260
261 writeResponse(fullRequest);
262 return StreamHandle(p, updateCb);
263}
264
// Returns the oldest queued message that was not consumed by a pending call or
// stream, or fl::nullopt when disconnected or when the queue is empty.
266 if (!isConnected()) {
267 return fl::nullopt;
268 }
269
270 // Process incoming data and drain into queue
 // NOTE(review): the statements that follow the comment above (embedded
 // listing lines 271-272) are absent from this extract — confirm against the
 // real source.
273
274 // Pop from incoming queue
275 if (mIncomingQueue.empty()) {
276 return fl::nullopt;
277 }
278
 // The front element is moved out, then every remaining element is moved into
 // a new vector that replaces the queue, so each pop touches all queued items.
279 fl::json front = fl::move(mIncomingQueue[0]);
280 // Shift remaining elements
281 fl::vector<fl::json> remaining;
282 remaining.reserve(mIncomingQueue.size() - 1);
283 for (size_t i = 1; i < mIncomingQueue.size(); i++) {
284 remaining.push_back(fl::move(mIncomingQueue[i]));
285 }
286 mIncomingQueue = fl::move(remaining);
287
288 return front;
289}
290
// Serializes `response` to text, wraps it with mWriter.writeChunk(), and hands
// the encoded bytes to sendData(). Does nothing when not connected; the return
// value of sendData() is not inspected.
292 if (!isConnected()) {
293 return;
294 }
295
296 // Serialize JSON to string
297 fl::string jsonStr = response.to_string();
298
299 // Write chunked data into stack buffer (or heap for large payloads)
 // NOTE(review): `needed` comes from chunkOverhead(jsonStr.size()); whether
 // that is the total encoded size or only the framing overhead cannot be
 // determined from this listing — confirm the output buffer is always large
 // enough for payload plus framing.
300 size_t needed = ChunkedWriter::chunkOverhead(jsonStr.size());
301 u8 stackBuf[512];
302 fl::vector<u8> heapBuf;
303 fl::span<u8> outSpan;
304 if (needed <= sizeof(stackBuf)) {
305 outSpan = fl::span<u8>(stackBuf, sizeof(stackBuf));
306 } else {
307 heapBuf.resize(needed);
308 outSpan = heapBuf;
309 }
310 size_t written = mWriter.writeChunk(
311 fl::span<const u8>(reinterpret_cast<const u8*>(jsonStr.c_str()), jsonStr.size()), // ok reinterpret cast
312 outSpan
313 );
314
315 // Send chunked data
 // A result of 0 from writeChunk() means nothing is sent.
316 if (written > 0) {
317 sendData(fl::span<const u8>(outSpan.data(), written));
318 }
319
320 // Update last sent time
 // NOTE(review): the statement for the comment above (embedded listing line
 // 321) is absent from this extract — confirm against the real source.
322}
323
// Periodic tick driven by the caller-supplied timestamp `currentTimeMs`.
// Order of work: advance mConnection, report connect/disconnect transitions,
// return early while disconnected, then run the heartbeat-interval check and
// the heartbeat-timeout check.
// NOTE(review): several statements (embedded listing lines 338, 346, 353-354)
// are absent from this extract, which is why some branches below look empty —
// confirm against the real source.
324void HttpStreamTransport::update(u32 currentTimeMs) {
325 // Update connection state
326 mConnection.update(currentTimeMs);
327 bool nowConnected = isConnected();
328
329 // Handle state changes
 // The handler runs only when the connected state differs from the state
 // remembered at the previous tick.
330 if (mWasConnected != nowConnected) {
331 handleConnectionStateChange(currentTimeMs);
332 mWasConnected = nowConnected;
333 }
334
335 if (!isConnected()) {
336 // Attempt reconnection if needed
337 if (mConnection.shouldReconnect()) {
339 }
340 return;
341 }
342
343 // Send heartbeat if needed
 // Elapsed time is computed in u32 from the caller's timestamp.
344 u32 timeSinceLastSent = currentTimeMs - mLastHeartbeatSent;
345 if (timeSinceLastSent >= mHeartbeatInterval) {
347 }
348
349 // Check for heartbeat timeout
350 checkHeartbeatTimeout(currentTimeMs);
351
352 // Process incoming data and drain messages (resolves promises)
355}
356
358 mOnConnect = callback;
359}
360
364
366 mHeartbeatInterval = intervalMs;
367}
368
372
374 mTimeoutMs = timeoutMs;
375}
376
378 return mTimeoutMs;
379}
380
382 // Default implementation uses system time
383 // Subclasses can override for testing
384 return fl::millis();
385}
386
388 // Default implementation does nothing
389 // Subclasses should override to implement reconnection logic
390}
391
// Builds and sends one heartbeat message; does nothing when not connected.
// The message is a JSON-RPC 2.0 object with method "rpc.ping" and an "id" set
// to a default-constructed (null) fl::json.
393 if (!isConnected()) {
394 return;
395 }
396
397 // Create heartbeat request (rpc.ping notification)
398 fl::json heartbeat = fl::json::object();
399 heartbeat.set("jsonrpc", "2.0");
400 heartbeat.set("method", "rpc.ping");
401 heartbeat.set("id", fl::json()); // null value
402
403 // Write as response (uses same chunked encoding)
404 writeResponse(heartbeat);
405}
406
// Declares the connection dead when the time since mLastHeartbeatReceived,
// measured against `currentTimeMs`, reaches or exceeds getTimeout().
// NOTE(review): the difference is stored in a u32; presumably both operands
// are u32 so a timer rollover wraps harmlessly — confirm the member's type.
408 u32 timeSinceLastReceived = currentTimeMs - mLastHeartbeatReceived;
409 if (timeSinceLastReceived >= getTimeout()) {
410 // Heartbeat timeout - connection is dead
 // The connection object is notified first, then disconnect() is called.
411 mConnection.onDisconnected();
412 disconnect();
413 }
414}
415
// Performs one recvData() call into a 1024-byte stack buffer and feeds any
// received bytes to mReader. Returns true only when bytes were fed; returns
// false when disconnected, when the read fails, or when nothing was read.
417 if (!isConnected()) {
418 return false;
419 }
420
421 // Read incoming data from socket
422 u8 buffer[1024];
423 int bytesRead = recvData(buffer);
424
 // A negative result is treated as a failed read: the connection object is
 // notified and disconnect() is called.
425 if (bytesRead < 0) {
426 // Error reading from socket
427 mConnection.onDisconnected();
428 disconnect();
429 return false;
430 }
431
432 if (bytesRead == 0) {
433 // No data available (non-blocking socket)
434 return false;
435 }
436
437 // Feed data to chunked reader
 // Only the first bytesRead bytes of the buffer are passed on.
438 mReader.feed(fl::span<const u8>(buffer, static_cast<size_t>(bytesRead)));
439
440 return true;
441}
442
// Reacts to a connect/disconnect transition; update() calls this only when the
// connected state changed since the previous tick. On connect, both heartbeat
// timestamps are reset to `currentTimeMs` and mOnConnect is invoked if set.
444 if (isConnected()) {
445 // Just connected — use the caller-provided timestamp to avoid
446 // unsigned-underflow races between getCurrentTimeMs() and the
447 // currentTimeMs snapshot used by checkHeartbeatTimeout().
448 mLastHeartbeatSent = currentTimeMs;
449 mLastHeartbeatReceived = currentTimeMs;
450 if (mOnConnect) {
451 mOnConnect();
452 }
453 } else {
454 // Just disconnected
 // NOTE(review): the body of the check below (embedded listing line 456) is
 // absent from this extract; presumably it invokes mOnDisconnect — confirm
 // against the real source.
455 if (mOnDisconnect) {
457 }
458 }
459}
460
461} // namespace http
462} // namespace net
463} // namespace fl
const char * c_str() const FL_NOEXCEPT
fl::size size() const FL_NOEXCEPT
bool is_null() const FL_NOEXCEPT
Definition json.h:238
bool contains(size_t idx) const FL_NOEXCEPT
Definition json.h:625
void set(const fl::string &key, const json &value) FL_NOEXCEPT
Definition json.h:701
static json parse(const fl::string &txt) FL_NOEXCEPT
Definition json.h:677
static json object() FL_NOEXCEPT
Definition json.h:692
static size_t chunkOverhead(size_t dataLen)
void writeResponse(const fl::json &response)
Write JSON-RPC response to stream.
bool resolveRpc(const fl::json &msg, const fl::string &idKey)
StreamHandle rpcStream(const fl::string &method, const fl::json &params)
Send a streaming JSON-RPC request, returns StreamHandle for intermediate data.
fl::task::Promise< fl::json > rpc(const fl::string &method, const fl::json &params)
Send a JSON-RPC request, returns promise that resolves with the final response.
void setOnDisconnect(StateCallback callback)
Set callback for connection lost.
virtual void triggerReconnect()
Trigger reconnection (for subclasses to override)
fl::flat_map< fl::string, PendingCall, fl::StringFastLess > mPendingCalls
virtual int recvData(fl::span< u8 > buffer)=0
Receive raw data from connection.
void handleConnectionStateChange(u32 currentTimeMs)
void update(u32 currentTimeMs)
Update connection state (handles reconnection, heartbeat). Call this in main loop.
virtual u32 getCurrentTimeMs() const
Get current time in milliseconds.
fl::vector< fl::json > mIncomingQueue
void setHeartbeatInterval(u32 intervalMs)
Set heartbeat interval.
fl::function< void()> StateCallback
Connection state callback.
virtual bool isConnected() const =0
Check if connected.
void setOnConnect(StateCallback callback)
Set callback for connection established.
fl::flat_map< fl::string, PendingStream, fl::StringFastLess > mPendingStreams
virtual ~HttpStreamTransport() FL_NOEXCEPT
Virtual destructor.
void setTimeout(u32 timeoutMs)
Set connection timeout.
fl::optional< fl::json > readRequest()
Read next JSON-RPC request from stream. Non-blocking; returns nullopt if no complete request available...
HttpStreamTransport(const fl::string &host, u16 port, u32 heartbeatIntervalMs=30000)
Constructor.
virtual void disconnect()=0
Disconnect from server.
static fl::string idToString(const fl::json &id)
u32 getTimeout() const
Get connection timeout.
u32 getHeartbeatInterval() const
Get heartbeat interval.
virtual int sendData(fl::span< const u8 > data)=0
Send raw data over connection.
bool resolveRpcStream(const fl::json &msg, const fl::string &idKey)
fl::shared_ptr< fl::function< void(const fl::json &)> > updateCallback
fl::task::Promise< fl::json > & promise()
Access the underlying promise.
StreamHandle() FL_NOEXCEPT=default
StreamHandle & onData(fl::function< void(const fl::json &)> cb)
Register callback for intermediate stream data.
StreamHandle & catch_(fl::function< void(const fl::task::Error &)> cb)
Register callback for errors.
fl::task::Promise< fl::json > mPromise
fl::shared_ptr< fl::function< void(const fl::json &)> > mUpdateCallback
StreamHandle & then(fl::function< void(const fl::json &)> cb)
Register callback for final result.
bool valid() const
Check if handle is valid.
Handle for ASYNC_STREAM calls Provides onData() for intermediate updates, plus then()/catch_() for fi...
const T * data() const FL_NOEXCEPT
Definition span.h:461
bool complete_with_value(const T &value) FL_NOEXCEPT
Complete the Promise with a result (used by networking library)
Definition promise.h:183
static Promise< T > create() FL_NOEXCEPT
Create a pending Promise.
Definition promise.h:61
bool complete_with_error(const Error &error) FL_NOEXCEPT
Complete the Promise with an error (used by networking library)
Definition promise.h:194
Promise class that provides fluent .then() and .catch_() semantics This is a lightweight wrapper arou...
Definition promise.h:58
void reserve(fl::size n) FL_NOEXCEPT
Definition vector.h:591
void push_back(const T &value) FL_NOEXCEPT
Definition vector.h:624
void resize(fl::size n) FL_NOEXCEPT
Definition vector.h:593
P ps[MAXP]
Definition Luminova.h:57
FastLED's Elegant JSON Library: fl::json
constexpr remove_reference< T >::type && move(T &&t) FL_NOEXCEPT
Definition s16x16x4.h:28
constexpr remove_reference< T >::type && move(T &&t) FL_NOEXCEPT
Definition move.h:28
unsigned char u8
Definition stdint.h:131
constexpr int type_rank< T >::value
fl::u32 millis()
Universal millisecond timer - returns milliseconds since system startup.
Optional< T > optional
Definition optional.h:16
int snprintf(char *buffer, fl::size size, const char *format, const Args &... args) FL_NOEXCEPT
Snprintf-like formatting function that writes to a buffer.
Definition stdio.h:666
shared_ptr< T > make_shared(Args &&... args) FL_NOEXCEPT
Definition shared_ptr.h:414
expected< T, E > result
Alias for expected (Rust-style naming)
Definition result.h:31
constexpr nullopt_t nullopt
Definition optional.h:13
Base definition for an LED controller.
Definition crgb.hpp:179
#define FL_NOEXCEPT
Error type for promises.
Definition promise.h:39