FastLED 3.9.15
Loading...
Searching...
No Matches
remote.cpp.hpp
Go to the documentation of this file.
1#include "fl/remote/remote.h"
2#include "fl/stl/int.h"
3#include "fl/stl/json.h"
4#include "fl/log/log.h"
5#include "fl/remote/rpc/rpc.h"
7#include "fl/remote/types.h"
9#include "fl/stl/chrono.h"
10#include "fl/stl/cstddef.h"
11#include "fl/stl/function.h"
12#include "fl/stl/move.h"
13#include "fl/stl/optional.h"
14#include "fl/stl/string.h"
15#include "fl/stl/strstream.h"
16#include "fl/stl/vector.h"
17
18namespace fl {
19
20bool Remote::unbind(const fl::string& name) {
21 bool removed = mRpc.unbind(name.c_str());
22 if (removed) {
23 FL_DBG("Unregistered RPC function: " << name);
24 }
25 return removed;
26}
27
28bool Remote::has(const fl::string& name) const {
29 return mRpc.has(name.c_str());
30}
31
32// Async Response Support
33
34void Remote::sendAsyncResponse(const char* method, const fl::json& result) {
35 fl::string methodName(method);
36 auto it = mAsyncRequests.find(methodName);
37 if (it == mAsyncRequests.end()) {
38 FL_WARN("No pending async request for method: " << method);
39 return;
40 }
41
42 int requestId = it->second.requestId;
43 mAsyncRequests.erase(it);
44
45 // Build JSON-RPC response
46 fl::json response = fl::json::object();
47 response.set("jsonrpc", "2.0");
48 response.set("id", requestId);
49 response.set("result", result);
50
51 // Send via response sink
52 if (mResponseSink) {
53 mResponseSink(response);
54 FL_DBG("Sent async response for " << method << " (id=" << requestId << ")");
55 }
56}
57
58void Remote::sendStreamUpdate(const char* method, const fl::json& update) {
59 fl::string methodName(method);
60 auto it = mAsyncRequests.find(methodName);
61 if (it == mAsyncRequests.end()) {
62 FL_WARN("No pending async request for method: " << method);
63 return;
64 }
65
66 int requestId = it->second.requestId;
67 // Don't erase - stream is still active
68
69 // Build JSON-RPC response with "update" marker
70 fl::json response = fl::json::object();
71 response.set("jsonrpc", "2.0");
72 response.set("id", requestId);
73
74 fl::json resultObj = fl::json::object();
75 resultObj.set("update", update);
76 response.set("result", resultObj);
77
78 // Send via response sink
79 if (mResponseSink) {
80 mResponseSink(response);
81 FL_DBG("Sent stream update for " << method << " (id=" << requestId << ")");
82 }
83}
84
85void Remote::sendStreamFinal(const char* method, const fl::json& result) {
86 fl::string methodName(method);
87 auto it = mAsyncRequests.find(methodName);
88 if (it == mAsyncRequests.end()) {
89 FL_WARN("No pending async request for method: " << method);
90 return;
91 }
92
93 int requestId = it->second.requestId;
94 mAsyncRequests.erase(it); // Stream complete - remove request
95
96 // Build JSON-RPC response with "stop" marker
97 fl::json response = fl::json::object();
98 response.set("jsonrpc", "2.0");
99 response.set("id", requestId);
100
101 fl::json resultObj = fl::json::object();
102 resultObj.set("value", result);
103 resultObj.set("stop", true);
104 response.set("result", resultObj);
105
106 // Send via response sink
107 if (mResponseSink) {
108 mResponseSink(response);
109 FL_DBG("Sent stream final for " << method << " (id=" << requestId << ")");
110 }
111}
112
113// Error Reporting
114
115void Remote::reportError(const fl::string& message) {
116 fl::json params = fl::json::object();
117 params.set("message", message);
118 reportError(params);
119}
120
121void Remote::reportError(const fl::json& data) {
122 if (!mResponseSink) {
123 return;
124 }
125 fl::json notification = fl::json::object();
126 notification.set("jsonrpc", "2.0");
127 notification.set("method", "__error");
128 notification.set("params", data);
129 mResponseSink(notification);
130}
131
132// RPC Processing
133
135 // Extract optional timestamp field (0 = immediate, >0 = scheduled)
136 u32 timestamp = 0;
137 if (request.contains("timestamp") && request["timestamp"].is_int()) {
138 timestamp = static_cast<u32>(request["timestamp"].as_int().value());
139 }
140
141 u32 receivedAt = fl::millis();
142
143 // Execute or schedule
144 if (timestamp == 0) {
145 // Store request ID BEFORE invoking function (needed for async functions)
146 // This allows sendAsyncResponse() to find the request ID while function is running
147 if (request.contains("id") && request.contains("method")) {
148 fl::string methodName = request["method"].as_string().value_or("");
149 int requestId = request["id"].as_int().value_or(0);
150 mAsyncRequests[methodName] = {requestId, receivedAt};
151 FL_DBG("Stored request ID for " << methodName.c_str() << " (id=" << requestId << ")");
152 }
153
154 // Immediate execution - pass directly to Rpc
155 fl::json response = mRpc.handle(request);
156
157 // For async functions, response already sent via sendAsyncResponse()
158 if (response.contains("__async") && response["__async"].as_bool().value_or(false)) {
159 // Don't return response (ACK already sent by Rpc)
160 // Return null to prevent Server from queuing it
161 fl::json nullResponse = fl::json::object();
162 nullResponse.set("__skip", true); // Marker to skip queueing
163 return nullResponse;
164 }
165
166 // For sync functions, remove request ID (not needed)
167 if (request.contains("method")) {
168 fl::string methodName = request["method"].as_string().value_or("");
169 mAsyncRequests.erase(methodName);
170 }
171
172 // Record result if successful
173 if (response.contains("result") && request.contains("method")) {
174 fl::string funcName = request["method"].as_string().value_or("");
175 recordResult(funcName, response["result"], 0, receivedAt, receivedAt, false);
176 }
177
178 return response;
179 } else {
180 // Scheduled execution - result will be pushed to ResponseSink after execution
181 scheduleFunction(timestamp, receivedAt, request);
182 FL_DBG("RPC: Scheduled function - result will be pushed after execution");
183
184 // Return acknowledgment with null result and "scheduled" marker
185 fl::json response = fl::json::object();
186 if (request.contains("id")) {
187 response.set("id", request["id"]);
188 }
189 response.set("result", fl::json(nullptr));
190 response.set("scheduled", true); // Marker to not queue this response
191 return response;
192 }
193}
194
195void Remote::scheduleFunction(u32 timestamp, u32 receivedAt, const fl::json& jsonRpcRequest) {
196 // Make explicit copy for capture (avoid reference issues)
197 fl::json requestCopy = jsonRpcRequest;
198 fl::string funcName = requestCopy["method"].as_string().value_or("unknown");
199
200 // Wrap RPC execution in lambda and schedule it
201 mScheduler.schedule(timestamp, [this, requestCopy, timestamp, receivedAt, funcName]() {
202 u32 executedAt = fl::millis();
203
204 // Execute JSON-RPC request
205 fl::json response = mRpc.handle(requestCopy);
206
207 // Record result with timing metadata
208 if (response.contains("result") && requestCopy.contains("method")) {
209 recordResult(funcName, response["result"], timestamp, receivedAt, executedAt, true);
210 }
211 });
212
213 FL_DBG("Scheduled RPC: " << funcName << " at " << timestamp);
214}
215
216void Remote::recordResult(const fl::string& funcName, const fl::json& result, u32 scheduledAt, u32 receivedAt, u32 executedAt, bool wasScheduled) {
217 mResults.push_back({funcName, result, scheduledAt, receivedAt, executedAt, wasScheduled});
218}
219
220// Update Loop
221
222size_t Remote::tick(u32 currentTimeMs) {
223 // Clear previous results
224 mResults.clear();
225
226 // Delegate to generic scheduler - tasks handle their own execution and result recording
227 return mScheduler.tick(currentTimeMs);
228}
229
230// Utility Methods
231
232size_t Remote::pendingCount() const {
233 return mScheduler.pendingCount();
234}
235
237 if ((flags & ClearFlags::Results) != ClearFlags::None) {
238 mResults.clear();
239 FL_DBG("Cleared RPC results");
240 }
241 if ((flags & ClearFlags::Scheduled) != ClearFlags::None) {
242 mScheduler.clear();
243 FL_DBG("Cleared scheduled RPC calls");
244 }
245 if ((flags & ClearFlags::Functions) != ClearFlags::None) {
246 mRpc.clear();
247 FL_DBG("Cleared registered RPC functions");
248 }
249}
250
251// Constructor
252
254 : Server(fl::move(source), fl::move(sink))
255{
256 // Set request handler to processRpc
257 setRequestHandler([this](const fl::json& request) {
258 return processRpc(request);
259 });
260
261 // Set response sink on Rpc for async ACKs
262 mRpc.setResponseSink([this](const fl::json& response) {
263 // Send response directly via Server's response sink
264 if (mResponseSink) {
265 mResponseSink(response);
266 }
267 });
268}
269
270// Server Coordination
271
272size_t Remote::update(u32 currentTimeMs) {
273 size_t processed = Server::pull(); // Pull requests from Server
274 size_t executed = tick(currentTimeMs); // Process scheduled tasks
275
276 // Push scheduled results as JSON-RPC responses
277 for (const auto& r : mResults) {
278 fl::json response = fl::json::object();
279 response.set("result", r.result);
280 // Note: We don't have the original request ID for scheduled calls
281 // This could be improved by storing the ID with RpcResult
282 mOutgoingQueue.push_back(response);
283 }
284
285 size_t sent = Server::push(); // Push responses from Server
286 return processed + executed + sent;
287}
288
289// Schema Methods
290
293
294 // Get flat JSON schema from underlying RPC
295 // Format: [["methodName", "returnType", [["param1", "type1"], ...]], ...]
296 fl::json jsonMethods = mRpc.methods();
297
298 if (!jsonMethods.is_array()) {
299 return result;
300 }
301
302 // Convert each flat method array to MethodInfo struct
303 for (fl::size i = 0; i < jsonMethods.size(); i++) {
304 fl::json method = jsonMethods[i];
305
306 if (!method.is_array() || method.size() < 3) {
307 continue; // Invalid method format
308 }
309
310 MethodInfo info;
311
312 // method[0] = method name
313 info.name = method[0].as_string().value_or("");
314
315 // method[1] = return type
316 info.returnType = method[1].as_string().value_or("void");
317
318 // method[2] = params array: [["param1", "type1"], ["param2", "type2"], ...]
319 if (method[2].is_array()) {
320 fl::json params = method[2];
321 for (fl::size j = 0; j < params.size(); j++) {
322 fl::json param = params[j];
323 if (param.is_array() && param.size() >= 2) {
324 ParamInfo paramInfo;
325 paramInfo.name = param[0].as_string().value_or("");
326 paramInfo.type = param[1].as_string().value_or("unknown");
327 info.params.push_back(fl::move(paramInfo));
328 }
329 }
330 }
331
332 // Flat schema doesn't include description/tags
333 info.description = "";
334 info.tags.clear();
335
336 result.push_back(fl::move(info));
337 }
338
339 return result;
340}
341
342} // namespace fl
int requestId
FastLED chrono implementation - duration types for time measurements.
void sendStreamFinal(const char *method, const fl::json &result)
Send final stream response for a streaming async method (ASYNC_STREAM mode) The request ID is automat...
void recordResult(const fl::string &funcName, const fl::json &result, u32 scheduledAt, u32 receivedAt, u32 executedAt, bool wasScheduled)
size_t tick(u32 currentTimeMs)
Process scheduled calls (call regularly)
fl::ParamInfo ParamInfo
Definition remote.h:50
size_t update(u32 currentTimeMs)
Main update: pull + tick + push (overrides Server::update)
fl::net::RpcScheduler mScheduler
Definition remote.h:209
bool has(const fl::string &name) const
Check if method is registered.
void scheduleFunction(u32 timestamp, u32 receivedAt, const fl::json &jsonRpcRequest)
size_t pendingCount() const
Get number of pending scheduled calls.
fl::MethodInfo MethodInfo
Definition remote.h:49
fl::unordered_map< fl::string, AsyncRequest > mAsyncRequests
Definition remote.h:201
fl::RemoteClearFlags ClearFlags
Definition remote.h:48
fl::vector< MethodInfo > methods() const
Get method information for all registered methods.
void reportError(const fl::string &message)
Send an error notification to the remote peer.
void sendAsyncResponse(const char *method, const fl::json &result)
Send async response for a previously-called async method The request ID is automatically retrieved fr...
void sendStreamUpdate(const char *method, const fl::json &update)
Send stream update for a streaming async method (ASYNC_STREAM mode) The request ID is automatically r...
fl::vector< RpcResult > mResults
Definition remote.h:212
bool unbind(const fl::string &name)
Unregister method by name.
fl::Rpc mRpc
Definition remote.h:206
void clear(ClearFlags flags)
Clear state (bitwise OR of ClearFlags)
fl::json processRpc(const fl::json &request)
Process JSON-RPC request (with optional "timestamp" field for scheduling) Returns JSON-RPC response: ...
Remote(RequestSource source, ResponseSink sink)
Construct with I/O callbacks.
size_t push()
Push queued responses to sink.
size_t update()
Main update: pull + push.
fl::vector< fl::json > mOutgoingQueue
Definition server.h:91
size_t pull()
Pull requests from source, process, queue responses.
fl::function< fl::optional< fl::json >()> RequestSource
Definition server.h:29
void setRequestHandler(RequestHandler handler)
Set request handler.
Server() FL_NOEXCEPT
Default constructor.
fl::function< void(const fl::json &)> ResponseSink
Definition server.h:30
ResponseSink mResponseSink
Definition server.h:89
const char * c_str() const FL_NOEXCEPT
fl::optional< i64 > as_int() const FL_NOEXCEPT
Definition json.h:255
bool is_array() const FL_NOEXCEPT
Definition json.h:246
bool is_int() const FL_NOEXCEPT
Definition json.h:240
size_t size() const FL_NOEXCEPT
Definition json.h:633
bool contains(size_t idx) const FL_NOEXCEPT
Definition json.h:625
fl::optional< fl::string > as_string() const FL_NOEXCEPT
Definition json.h:282
void set(const fl::string &key, const json &value) FL_NOEXCEPT
Definition json.h:701
static json object() FL_NOEXCEPT
Definition json.h:692
void clear() FL_NOEXCEPT
Definition vector.h:634
FastLED's Elegant JSON Library: fl::json
#define FL_WARN(X)
Definition log.h:276
#define FL_DBG
Definition log.h:388
Centralized logging categories for FastLED hardware interfaces and subsystems.
constexpr remove_reference< T >::type && move(T &&t) FL_NOEXCEPT
Definition s16x16x4.h:28
constexpr remove_reference< T >::type && move(T &&t) FL_NOEXCEPT
Definition move.h:28
constexpr int type_rank< T >::value
fl::u32 millis()
Universal millisecond timer - returns milliseconds since system startup.
expected< T, E > result
Alias for expected (Rust-style naming)
Definition result.h:31
Base definition for an LED controller.
Definition crgb.hpp:179
fl::vector< ParamInfo > params
Definition rpc.h:122
fl::vector< fl::string > tags
Definition rpc.h:125
fl::string name
Definition rpc.h:121
fl::string type
Definition rpc.h:114
fl::string description
Definition rpc.h:124
fl::string name
Definition rpc.h:113
fl::string returnType
Definition rpc.h:123