dbus ws: support websocket dbus server [1/1]
PD#SWPL-187465
Problem:
support websocket dbus server
Solution:
support websocket dbus server
Verify:
local
Change-Id: I8046fa3eeb1cea5e5a09d09993f3195707db78b0
Signed-off-by: Daogao Xu <daogao.xu@amlogic.com>
diff --git a/aml_dbus_ws_br/js/dbusws.js b/aml_dbus_ws_br/js/dbusws.js
new file mode 100644
index 0000000..07ea20d
--- /dev/null
+++ b/aml_dbus_ws_br/js/dbusws.js
@@ -0,0 +1,120 @@
+"use strict";
+
+class DbusInterface {
+ constructor(serv, obj, intf) {
+ this.interface = intf;
+ this.so = serv + " " + obj + " ";
+ this.soi = this.so + intf + " ";
+ }
+ call(ws, mem, sig, args) {
+ if (sig === undefined)
+ return ws.callJsonRpc(this.soi + mem);
+ return ws.callJsonRpc(this.soi + mem + " " + sig, args);
+ }
+ getProperty(ws, prop) {
+ return ws.callJsonRpc(this.so + "org.freedesktop.DBus.Properties Get ss", [this.interface, prop]);
+ }
+ setProperty(ws, prop, sig, val) {
+ return ws.callJsonRpc(this.so + "org.freedesktop.DBus.Properties Set ssv", [this.interface, prop, [sig, val]]);
+ }
+}
+
+class WebsocketDbus {
+ static method_id = 1;
+ static match_id = 1;
+ constructor(url) {
+ this.ws = new WebSocket(url, "ambus-ws-br");
+ this.ws.addEventListener("open", ev => {
+ this.pending_call.forEach(e => {
+ this.logv(e);
+ this.ws.send(JSON.stringify(e));
+ });
+ this.pending_call = [];
+ });
+ this.ws.addEventListener("error", ev => {
+ console.log("WebSocket error", ev);
+ if (this.onError)
+ this.onError(ev);
+ });
+ this.ws.addEventListener("close", ev => {
+ console.log("WebSocket close", ev);
+ if (this.onClose)
+ this.onClose(ev);
+ });
+ this.ws.addEventListener("message", this.handleMsg.bind(this));
+ this.call_queue = {};
+ this.match_queue = {};
+ this.pending_call = [];
+ }
+ handleMsg(msg) {
+ this.logv(msg.data);
+ let obj = JSON.parse(msg.data);
+ if ("id" in obj && obj.id in this.call_queue) {
+ let p = this.call_queue[obj.id];
+ delete this.call_queue[obj.id];
+ if (obj.error)
+ p[1](obj);
+ else
+ p[0](obj);
+ } else if ("mid" in obj && obj.mid in this.match_queue) {
+ let m = this.match_queue[obj.mid];
+ m.cb(obj);
+ }
+ }
+ logv(str) {
+ //console.log(str);
+ }
+ callJsonRpc(method, args) {
+ return new Promise((resolve, reject) => {
+ let jrpc = { "jsonrpc": "2.0", "method": method, "id": WebsocketDbus.method_id++ };
+ if (args !== undefined)
+ jrpc.params = args;
+ if (this.ws.readyState == WebSocket.OPEN) {
+ this.call_queue[jrpc.id] = [resolve, reject];
+ this.logv(jrpc);
+ this.ws.send(JSON.stringify(jrpc));
+ } else if (this.ws.readyState == WebSocket.CONNECTING) {
+ this.call_queue[jrpc.id] = [resolve, reject];
+ this.pending_call.push(jrpc);
+ } else {
+ reject("websocket wrong state:" + this.ws.readyState);
+ }
+ });
+ }
+ callDbus(serv, obj, intf, member, sig, args) {
+ let method = serv + " " + obj + " " + intf + " " + member;
+ if (sig)
+ method += " " + sig;
+ return this.callJsonRpc(method, args);
+ }
+ getProperty(serv, obj, intf, prop) {
+ return this.callJsonRpc(serv + " " + obj + " org.freedesktop.DBus.Properties Get ss", [intf, prop]);
+ }
+ setProperty(serv, obj, intf, prop, sig, val) {
+ return this.callJsonRpc(serv + " " + obj + " org.freedesktop.DBus.Properties Set ssv", [intf, prop, [sig, val]]);
+ }
+ addMatch(match, cb) {
+ let obj = { "id": WebsocketDbus.match_id++, "cb": cb };
+ this.callJsonRpc("addMatch", match).then(r => {
+ obj.mid = r.result;
+ this.match_queue[obj.mid] = obj;
+ });
+ return obj.id;
+ }
+ addObject(path, cb) {
+ let obj = { "id": WebsocketDbus.match_id++, "cb": cb };
+ this.callJsonRpc("addObject", path).then(r => {
+ obj.mid = r.result;
+ this.match_queue[obj.mid] = obj;
+ });
+ return obj.id;
+ }
+ removeMatch(id) {
+ if (id in this.match_queue) {
+ let ar = this.match_queue[id];
+ delete this.match_queue[id];
+ this.callJsonRpc("removeMatch", ar.mid);
+ } else
+ this.logv("unable to find match id " + id);
+ }
+}
diff --git a/aml_dbus_ws_br/jsonrpc-dbus.c b/aml_dbus_ws_br/jsonrpc-dbus.c
index bd9c98e..f9cc25e 100644
--- a/aml_dbus_ws_br/jsonrpc-dbus.c
+++ b/aml_dbus_ws_br/jsonrpc-dbus.c
@@ -71,11 +71,19 @@
return m;
}
+struct dbus_object_method {
+ struct dbus_object_method *next;
+ sd_bus_message *reply;
+ struct dbus_maches *match;
+ uint64_t cookie;
+};
+
struct json_rpc_handle {
struct json_rpc_ws_session *pss;
sd_bus *bus;
struct dbus_method_call *pending_call;
struct dbus_maches *matches;
+ struct dbus_object_method *pending_object_call;
// struct json_rpc_custom_method *method_table;
};
@@ -556,6 +564,7 @@
len += jrpc_append_reply_string(jrpc, errmsg);
len += jrpc_append_reply(jrpc, "}}");
json_rpc_reply_buffer_done(jrpc->pss);
+ lwsl_warn("json_rpc_reply_error id:%" PRId64 " errcode:%d errmsg:%s\n", id, errcode, errmsg);
}
static int dbus_call_done(sd_bus_message *m, void *userdata, sd_bus_error *ret_error) {
@@ -629,24 +638,113 @@
return r;
}
+static int dbus_on_object_msg(sd_bus_message *m, void *userdata, sd_bus_error *ret_error) {
+ struct dbus_maches *c = (struct dbus_maches *)userdata;
+ struct json_rpc_handle *jrpc = c->jrpc;
+ uint64_t cookie = 0LL;
+ sd_bus_message *reply = NULL;
+ if (sd_bus_message_get_cookie(m, &cookie) < 0 || sd_bus_message_new_method_return(m, &reply) < 0) {
+ lwsl_info("object msg fail to get cookie or reply\n");
+ return 0;
+ }
+ int len = jrpc_append_reply(jrpc,
+ "{\"jsonrpc\":\"2.0\",\"mid\":%" PRId64 ",\"cookie\":\"%" PRIu64
+ "\",\"destination\":\"%s\",\"sender\":\"%s\",\"path\":\"%s\",\"interface\":"
+ "\"%s\",\"member\":\"%s\",\"signature\":\"%s\",\"params\":",
+ c->seq, cookie, sd_bus_message_get_destination(m), sd_bus_message_get_sender(m),
+ sd_bus_message_get_path(m), sd_bus_message_get_interface(m), sd_bus_message_get_member(m),
+ sd_bus_message_get_signature(m, 1));
+ len += msg_to_json_string(jrpc, m);
+ len += jrpc_append_reply(jrpc, "}");
+ json_rpc_reply_buffer_done(jrpc->pss);
+
+ struct dbus_object_method *om = malloc(sizeof(*om));
+ om->next = jrpc->pending_object_call;
+ om->cookie = cookie;
+ om->reply = reply;
+ om->match = c;
+ jrpc->pending_object_call = om;
+ return 1;
+}
+
+static int json_rpc_call_add_object(struct json_rpc_handle *jrpc, uint64_t id, const char *path) {
+ struct dbus_maches *c = calloc(1, sizeof(*c));
+ c->jrpc = jrpc;
+ c->id = INT64_MIN;
+ c->seq = call_id++;
+ int r = sd_bus_add_object(jrpc->bus, &c->slot, path, dbus_on_object_msg, c);
+ if (r < 0) {
+ lwsl_warn("json_rpc_call_add_object fail %d %s\n", r, strerror(-r));
+ json_rpc_reply_error(jrpc, id, -32603, "add_object fail");
+ free(c);
+ } else {
+ lwsl_info("add object %" PRId64 " %s\n", c->seq, path);
+ c->next = jrpc->matches;
+ jrpc->matches = c;
+ json_rpc_reply_simple(c->jrpc, id, "%" PRId64, c->seq);
+ return 0;
+ }
+ return r;
+}
+
+static int json_rpc_call_send_reply(struct json_rpc_handle *jrpc, uint64_t id, struct json_value_base *params) {
+ struct json_value_base *jv_cookie = lws_json_object_get(params, "cookie");
+ struct json_value_base *jv_sig = lws_json_object_get(params, "signature");
+ struct json_value_base *jv_data = lws_json_object_get(params, "data");
+ if (jv_cookie == NULL || jv_cookie->type != json_vt_string) {
+ goto not_found;
+ } else {
+ uint64_t cookie = strtoull(JV_CAST(string, jv_cookie)->val, NULL, 0);
+ struct dbus_object_method *om = jrpc->pending_object_call, *prev = NULL;
+ for (; om && om->cookie != cookie; prev = om, om = om->next)
+ ;
+ if (om == NULL)
+ goto not_found;
+ if (prev)
+ prev->next = om->next;
+ else
+ jrpc->pending_object_call = om->next;
+ int r;
+ if (jv_sig && jv_sig->type == json_vt_string && jv_data) {
+ const char *sig = JV_CAST(string, jv_sig)->val;
+ r = msg_from_json(om->reply, sig, jv_data);
+ if (r < 0) {
+ lwsl_warn("fail to create dbus message with sig %s from json (%d %s):\n", sig, r, strerror(-r));
+ lws_json_val_dump(stderr, jv_data, 0);
+ goto param_error;
+ }
+ }
+ r = sd_bus_send(jrpc->bus, om->reply, NULL);
+ if (r < 0)
+ json_rpc_reply_error(jrpc, id, -32603, strerror(-r));
+ else
+ json_rpc_reply_simple(jrpc, id, "%d", 0);
+ sd_bus_message_unref(om->reply);
+ free(om);
+ }
+ return 0;
+not_found:
+ json_rpc_reply_error(jrpc, id, -32603, "sendReply cookie not found");
+ return 0;
+param_error:
+ json_rpc_reply_error(jrpc, id, -32603, "sendReply signature and data format wrong");
+ return 0;
+}
+
static int json_rpc_call_remove_match(struct json_rpc_handle *jrpc, uint64_t id, uint64_t seq) {
- struct dbus_maches *c = jrpc->matches, *prev;
+ struct dbus_maches *c = jrpc->matches, *prev = NULL;
+ for (; c && c->seq != seq; prev = c, c = c->next)
+ ;
if (c == NULL)
goto match_not_install;
- if (c->seq == seq)
+ if (prev == NULL)
jrpc->matches = c->next;
- else {
- for (prev = c; ((c = prev->next) != NULL) && c->seq != seq; prev = prev->next)
- ;
- if (c)
- prev->next = c->next;
- else
- goto match_not_install;
- }
+ else
+ prev->next = c->next;
sd_bus_slot_unref(c->slot);
lwsl_info("remove match %" PRId64 "\n", seq);
+ json_rpc_reply_simple(c->jrpc, id, "%d", 0);
free(c);
- json_rpc_reply_simple(c->jrpc, c->id, "%d", 0);
return 0;
match_not_install:
lwsl_warn("match %" PRId64 " does not install\n", seq);
@@ -683,6 +781,12 @@
return json_rpc_call_add_match(jrpc, id_val, JV_CAST(string, params)->val);
} else if (strcmp(method_str, "removeMatch") == 0 && params && params->type == json_vt_int) {
return json_rpc_call_remove_match(jrpc, id_val, JV_CAST(int, params)->val);
+ } else if (strcmp(method_str, "addObject") == 0 && params && params->type == json_vt_string) {
+ return json_rpc_call_add_object(jrpc, id_val, JV_CAST(string, params)->val);
+ } else if (strcmp(method_str, "removeObject") == 0 && params && params->type == json_vt_int) {
+ return json_rpc_call_remove_match(jrpc, id_val, JV_CAST(int, params)->val);
+ } else if (strcmp(method_str, "sendReply") == 0 && params) {
+ return json_rpc_call_send_reply(jrpc, id_val, params);
}
char *dst = strtok_r(method_str, " ", &savedptr);
@@ -750,6 +854,12 @@
sd_bus_slot_unref(c->slot);
free(c);
}
+ for (struct dbus_object_method *c = jrpc->pending_object_call, *n; c; c = n) {
+ n = c->next;
+ if (c->reply)
+ sd_bus_message_unref(c->reply);
+ free(c);
+ }
free(jrpc);
}