dbus ws: support websocket dbus server [1/1]

PD#SWPL-187465

Problem:
support websocket dbus server

Solution:
support websocket dbus server

Verify:
local

Change-Id: I8046fa3eeb1cea5e5a09d09993f3195707db78b0
Signed-off-by: Daogao Xu <daogao.xu@amlogic.com>
diff --git a/aml_dbus_ws_br/js/dbusws.js b/aml_dbus_ws_br/js/dbusws.js
new file mode 100644
index 0000000..07ea20d
--- /dev/null
+++ b/aml_dbus_ws_br/js/dbusws.js
@@ -0,0 +1,120 @@
+"use strict";
+
+class DbusInterface {
+    constructor(serv, obj, intf) {
+        this.interface = intf;
+        this.so = serv + " " + obj + " ";
+        this.soi = this.so + intf + " ";
+    }
+    call(ws, mem, sig, args) {
+        if (sig === undefined)
+            return ws.callJsonRpc(this.soi + mem);
+        return ws.callJsonRpc(this.soi + mem + " " + sig, args);
+    }
+    getProperty(ws, prop) {
+        return ws.callJsonRpc(this.so + "org.freedesktop.DBus.Properties Get ss", [this.interface, prop]);
+    }
+    setProperty(ws, prop, sig, val) {
+        return ws.callJsonRpc(this.so + "org.freedesktop.DBus.Properties Set ssv", [this.interface, prop, [sig, val]]);
+    }
+}
+
+class WebsocketDbus {
+    static method_id = 1;
+    static match_id = 1;
+    constructor(url) {
+        this.ws = new WebSocket(url, "ambus-ws-br");
+        this.ws.addEventListener("open", ev => {
+            this.pending_call.forEach(e => {
+                this.logv(e);
+                this.ws.send(JSON.stringify(e));
+            });
+            this.pending_call = [];
+        });
+        this.ws.addEventListener("error", ev => {
+            console.log("WebSocket error", ev);
+            if (this.onError)
+                this.onError(ev);
+        });
+        this.ws.addEventListener("close", ev => {
+            console.log("WebSocket close", ev);
+            if (this.onClose)
+                this.onClose(ev);
+        });
+        this.ws.addEventListener("message", this.handleMsg.bind(this));
+        this.call_queue = {};
+        this.match_queue = {};
+        this.pending_call = [];
+    }
+    handleMsg(msg) {
+        this.logv(msg.data);
+        let obj = JSON.parse(msg.data);
+        if ("id" in obj && obj.id in this.call_queue) {
+            let p = this.call_queue[obj.id];
+            delete this.call_queue[obj.id];
+            if (obj.error)
+                p[1](obj);
+            else
+                p[0](obj);
+        } else if ("mid" in obj && obj.mid in this.match_queue) {
+            let m = this.match_queue[obj.mid];
+            m.cb(obj);
+        }
+    }
+    logv(str) {
+        //console.log(str);
+    }
+    callJsonRpc(method, args) {
+        return new Promise((resolve, reject) => {
+            let jrpc = { "jsonrpc": "2.0", "method": method, "id": WebsocketDbus.method_id++ };
+            if (args !== undefined)
+                jrpc.params = args;
+            if (this.ws.readyState == WebSocket.OPEN) {
+                this.call_queue[jrpc.id] = [resolve, reject];
+                this.logv(jrpc);
+                this.ws.send(JSON.stringify(jrpc));
+            } else if (this.ws.readyState == WebSocket.CONNECTING) {
+                this.call_queue[jrpc.id] = [resolve, reject];
+                this.pending_call.push(jrpc);
+            } else {
+                reject("websocket wrong state:" + this.ws.readyState);
+            }
+        });
+    }
+    callDbus(serv, obj, intf, member, sig, args) {
+        let method = serv + " " + obj + " " + intf + " " + member;
+        if (sig)
+            method += " " + sig;
+        return this.callJsonRpc(method, args);
+    }
+    getProperty(serv, obj, intf, prop) {
+        return this.callJsonRpc(serv + " " + obj + " org.freedesktop.DBus.Properties Get ss", [intf, prop]);
+    }
+    setProperty(serv, obj, intf, prop, sig, val) {
+        return this.callJsonRpc(serv + " " + obj + " org.freedesktop.DBus.Properties Set ssv", [intf, prop, [sig, val]]);
+    }
+    addMatch(match, cb) {
+        let obj = { "id": WebsocketDbus.match_id++, "cb": cb };
+        this.callJsonRpc("addMatch", match).then(r => {
+            obj.mid = r.result;
+            this.match_queue[obj.mid] = obj;
+        });
+        return obj.id;
+    }
+    addObject(path, cb) {
+        let obj = { "id": WebsocketDbus.match_id++, "cb": cb };
+        this.callJsonRpc("addObject", path).then(r => {
+            obj.mid = r.result;
+            this.match_queue[obj.mid] = obj;
+        });
+        return obj.id;
+    }
+    removeMatch(id) {
+        if (id in this.match_queue) {
+            let ar = this.match_queue[id];
+            delete this.match_queue[id];
+            this.callJsonRpc("removeMatch", ar.mid);
+        } else
+            this.logv("unable to find match id " + id);
+    }
+}
diff --git a/aml_dbus_ws_br/jsonrpc-dbus.c b/aml_dbus_ws_br/jsonrpc-dbus.c
index bd9c98e..f9cc25e 100644
--- a/aml_dbus_ws_br/jsonrpc-dbus.c
+++ b/aml_dbus_ws_br/jsonrpc-dbus.c
@@ -71,11 +71,19 @@
   return m;
 }
 
+struct dbus_object_method {
+  struct dbus_object_method *next;
+  sd_bus_message *reply;
+  struct dbus_maches *match;
+  uint64_t cookie;
+};
+
 struct json_rpc_handle {
   struct json_rpc_ws_session *pss;
   sd_bus *bus;
   struct dbus_method_call *pending_call;
   struct dbus_maches *matches;
+  struct dbus_object_method *pending_object_call;
   // struct json_rpc_custom_method *method_table;
 };
 
@@ -556,6 +564,7 @@
   len += jrpc_append_reply_string(jrpc, errmsg);
   len += jrpc_append_reply(jrpc, "}}");
   json_rpc_reply_buffer_done(jrpc->pss);
+  lwsl_warn("json_rpc_reply_error id:%" PRId64 " errcode:%d errmsg:%s\n", id, errcode, errmsg);
 }
 
 static int dbus_call_done(sd_bus_message *m, void *userdata, sd_bus_error *ret_error) {
@@ -629,24 +638,113 @@
   return r;
 }
 
+static int dbus_on_object_msg(sd_bus_message *m, void *userdata, sd_bus_error *ret_error) {
+  struct dbus_maches *c = (struct dbus_maches *)userdata;
+  struct json_rpc_handle *jrpc = c->jrpc;
+  uint64_t cookie = 0LL;
+  sd_bus_message *reply = NULL;
+  if (sd_bus_message_get_cookie(m, &cookie) < 0 || sd_bus_message_new_method_return(m, &reply) < 0) {
+    lwsl_info("object msg fail to get cookie or reply\n");
+    return 0;
+  }
+  int len = jrpc_append_reply(jrpc,
+                              "{\"jsonrpc\":\"2.0\",\"mid\":%" PRId64 ",\"cookie\":\"%" PRIu64
+                              "\",\"destination\":\"%s\",\"sender\":\"%s\",\"path\":\"%s\",\"interface\":"
+                              "\"%s\",\"member\":\"%s\",\"signature\":\"%s\",\"params\":",
+                              c->seq, cookie, sd_bus_message_get_destination(m), sd_bus_message_get_sender(m),
+                              sd_bus_message_get_path(m), sd_bus_message_get_interface(m), sd_bus_message_get_member(m),
+                              sd_bus_message_get_signature(m, 1));
+  len += msg_to_json_string(jrpc, m);
+  len += jrpc_append_reply(jrpc, "}");
+  json_rpc_reply_buffer_done(jrpc->pss);
+
+  struct dbus_object_method *om = malloc(sizeof(*om));
+  om->next = jrpc->pending_object_call;
+  om->cookie = cookie;
+  om->reply = reply;
+  om->match = c;
+  jrpc->pending_object_call = om;
+  return 1;
+}
+
+static int json_rpc_call_add_object(struct json_rpc_handle *jrpc, uint64_t id, const char *path) {
+  struct dbus_maches *c = calloc(1, sizeof(*c));
+  c->jrpc = jrpc;
+  c->id = INT64_MIN;
+  c->seq = call_id++;
+  int r = sd_bus_add_object(jrpc->bus, &c->slot, path, dbus_on_object_msg, c);
+  if (r < 0) {
+    lwsl_warn("json_rpc_call_add_object fail %d %s\n", r, strerror(-r));
+    json_rpc_reply_error(jrpc, id, -32603, "add_object fail");
+    free(c);
+  } else {
+    lwsl_info("add object %" PRId64 " %s\n", c->seq, path);
+    c->next = jrpc->matches;
+    jrpc->matches = c;
+    json_rpc_reply_simple(c->jrpc, id, "%" PRId64, c->seq);
+    return 0;
+  }
+  return r;
+}
+
+static int json_rpc_call_send_reply(struct json_rpc_handle *jrpc, uint64_t id, struct json_value_base *params) {
+  struct json_value_base *jv_cookie = lws_json_object_get(params, "cookie");
+  struct json_value_base *jv_sig = lws_json_object_get(params, "signature");
+  struct json_value_base *jv_data = lws_json_object_get(params, "data");
+  if (jv_cookie == NULL || jv_cookie->type != json_vt_string) {
+    goto not_found;
+  } else {
+    uint64_t cookie = strtoull(JV_CAST(string, jv_cookie)->val, NULL, 0);
+    struct dbus_object_method *om = jrpc->pending_object_call, *prev = NULL;
+    for (; om && om->cookie != cookie; prev = om, om = om->next)
+      ;
+    if (om == NULL)
+      goto not_found;
+    if (prev)
+      prev->next = om->next;
+    else
+      jrpc->pending_object_call = om->next;
+    int r;
+    if (jv_sig && jv_sig->type == json_vt_string && jv_data) {
+      const char *sig = JV_CAST(string, jv_sig)->val;
+      r = msg_from_json(om->reply, sig, jv_data);
+      if (r < 0) {
+        lwsl_warn("fail to create dbus message with sig %s from json (%d %s):\n", sig, r, strerror(-r));
+        lws_json_val_dump(stderr, jv_data, 0);
+        goto param_error;
+      }
+    }
+    r = sd_bus_send(jrpc->bus, om->reply, NULL);
+    if (r < 0)
+      json_rpc_reply_error(jrpc, id, -32603, strerror(-r));
+    else
+      json_rpc_reply_simple(jrpc, id, "%d", 0);
+    sd_bus_message_unref(om->reply);
+    free(om);
+  }
+  return 0;
+not_found:
+  json_rpc_reply_error(jrpc, id, -32603, "sendReply cookie not found");
+  return 0;
+param_error:
+  json_rpc_reply_error(jrpc, id, -32603, "sendReply signature and data format wrong");
+  return 0;
+}
+
 static int json_rpc_call_remove_match(struct json_rpc_handle *jrpc, uint64_t id, uint64_t seq) {
-  struct dbus_maches *c = jrpc->matches, *prev;
+  struct dbus_maches *c = jrpc->matches, *prev = NULL;
+  for (; c && c->seq != seq; prev = c, c = c->next)
+    ;
   if (c == NULL)
     goto match_not_install;
-  if (c->seq == seq)
+  if (prev == NULL)
     jrpc->matches = c->next;
-  else {
-    for (prev = c; ((c = prev->next) != NULL) && c->seq != seq; prev = prev->next)
-      ;
-    if (c)
-      prev->next = c->next;
-    else
-      goto match_not_install;
-  }
+  else
+    prev->next = c->next;
   sd_bus_slot_unref(c->slot);
   lwsl_info("remove match %" PRId64 "\n", seq);
+  json_rpc_reply_simple(c->jrpc, id, "%d", 0);
   free(c);
-  json_rpc_reply_simple(c->jrpc, c->id, "%d", 0);
   return 0;
 match_not_install:
   lwsl_warn("match %" PRId64 " does not install\n", seq);
@@ -683,6 +781,12 @@
     return json_rpc_call_add_match(jrpc, id_val, JV_CAST(string, params)->val);
   } else if (strcmp(method_str, "removeMatch") == 0 && params && params->type == json_vt_int) {
     return json_rpc_call_remove_match(jrpc, id_val, JV_CAST(int, params)->val);
+  } else if (strcmp(method_str, "addObject") == 0 && params && params->type == json_vt_string) {
+    return json_rpc_call_add_object(jrpc, id_val, JV_CAST(string, params)->val);
+  } else if (strcmp(method_str, "removeObject") == 0 && params && params->type == json_vt_int) {
+    return json_rpc_call_remove_match(jrpc, id_val, JV_CAST(int, params)->val);
+  } else if (strcmp(method_str, "sendReply") == 0 && params) {
+    return json_rpc_call_send_reply(jrpc, id_val, params);
   }
 
   char *dst = strtok_r(method_str, " ", &savedptr);
@@ -750,6 +854,12 @@
     sd_bus_slot_unref(c->slot);
     free(c);
   }
+  for (struct dbus_object_method *c = jrpc->pending_object_call, *n; c; c = n) {
+    n = c->next;
+    if (c->reply)
+      sd_bus_message_unref(c->reply);
+    free(c);
+  }
   free(jrpc);
 }