audio_client: Use namespace PID as stream ID. [1/1]

PD#SWPL-99067

Problem:
In container mode the getpid() returns PID in
local namespace so it's not global unique.

Solution:
Use pid from root namespace to construct audio
stream ID.

Verify:
Verify from audio server side log in logcat
and see the audio stream name is with root
pid of client.

Change-Id: I0068a1fd629c237ef634ca913f7b311b30b99593
Signed-off-by: Tim Yao <tim.yao@amlogic.com>
diff --git a/src/audio_client.cpp b/src/audio_client.cpp
index aac5e45..bbc906d 100644
--- a/src/audio_client.cpp
+++ b/src/audio_client.cpp
@@ -328,7 +328,7 @@
 {
   TRACE_ENTRY();
   ClientContext context;
-  StatusReturn r;
+  DeviceOpenStreamReturn r;
   Status status = stub_->Device_open_output_stream(&context,
       MakeOpenOutputStream(new_stream_name(stream_out->name, sizeof(stream_out->name)),
                       kSharedBufferSize,
@@ -337,6 +337,7 @@
                       flags,
                       config,
                       address), &r);
+  update_stream_name(stream_out->name, sizeof(stream_out->name), r.client_id());
   return r.ret();
 }
 
@@ -362,7 +363,7 @@
 {
   TRACE_ENTRY();
   ClientContext context;
-  StatusReturn r;
+  DeviceOpenStreamReturn r;
   Status status = stub_->Device_open_input_stream(&context,
       MakeOpenInputStream(new_stream_name(stream_in->name, sizeof(stream_in->name)),
                       kSharedBufferSize,
@@ -372,6 +373,7 @@
                       flags,
                       address,
                       source), &r);
+  update_stream_name(stream_in->name, sizeof(stream_in->name), r.client_id());
   return r.ret();
 }
 
diff --git a/src/audio_client.h b/src/audio_client.h
index e9f5d53..ccaae59 100644
--- a/src/audio_client.h
+++ b/src/audio_client.h
@@ -5,6 +5,7 @@
 #include <cstdlib>
 #include <iomanip>
 #include <unistd.h>
+#include <fstream>
 
 #include <hardware/hardware.h>
 #include <hardware/audio.h>
@@ -162,11 +163,20 @@
     std::string new_stream_name(char *name, size_t size) {
       int pid = ::getpid();
       int seq = (stream_seq_++);
-      printf("pid=%d seq=%d\n", pid, seq);
       snprintf(name, size, "%d-%d", pid, seq);
-      printf("name = %s\n", name);
+      printf("pid=%d seq=%d name=%s\n", pid, seq, name);
       return std::string(name);
     }
+    void update_stream_name(char *name, size_t size, int id) {
+      int pid = ::getpid();
+      if ((id != pid) && (id > 0)) {
+        int seq;
+        if (sscanf(name, "%d-%d", &pid, &seq) == 2) {
+          printf("vpid %d -> pid %d\n", pid, id);
+          snprintf(name, size, "%d-%d", id, seq);
+        }
+      }
+    }
 
     std::unique_ptr<AudioService::Stub> stub_;
     static std::atomic_int stream_seq_;
diff --git a/src/audio_server.cpp b/src/audio_server.cpp
index eb3864e..d85d4da 100644
--- a/src/audio_server.cpp
+++ b/src/audio_server.cpp
@@ -4,6 +4,10 @@
 #include <unistd.h>
 #include <sys/stat.h>
 #include <fcntl.h>
+#include <grp.h>
+#include <sys/un.h>
+#include <sys/socket.h>
+#include <sys/stat.h>
 
 #include <boost/interprocess/managed_shared_memory.hpp>
 #include <cstdlib>
@@ -21,6 +25,7 @@
 #include <grpcpp/server.h>
 #include <grpcpp/server_builder.h>
 #include <grpcpp/server_context.h>
+#include <grpcpp/server_posix.h>
 #include <grpcpp/security/server_credentials.h>
 #include <IpcBuffer/audio_server_shmem.h>
 #include <IpcBuffer/IpcBuffer.h>
@@ -220,10 +225,24 @@
       return Status::OK;
     }
 
-    Status Device_open_output_stream(ServerContext* context, const OpenOutputStream* request, StatusReturn* response) {
+    Status Device_open_output_stream(ServerContext* context, const OpenOutputStream* request, DeviceOpenStreamReturn* response) {
       TRACE_ENTRY();
       if (!dev_) return Status::CANCELLED;
 
+      int vpid, seq;
+      if (sscanf(request->name().c_str(), "%d-%d", &vpid, &seq) != 2) {
+        response->set_client_id(-1);
+        return Status::CANCELLED;
+      }
+      int client_id = get_client_pid_(context);
+      if (client_id < 0) {
+        response->set_client_id(-1);
+        return Status::CANCELLED;
+      }
+      char buf[32];
+      snprintf(buf, sizeof(buf), "%d-%d", client_id, seq);
+      std::string stream_id(buf);
+
       struct audio_stream_out *stream = nullptr;
       struct audio_config config;
       config.sample_rate = request->config().sample_rate();
@@ -240,18 +259,19 @@
                           &config,
                           &stream,
                           request->address().c_str()));
+      response->set_client_id(client_id);
 
       if (stream) {
-        IpcBuffer * cb = shm_->find<IpcBuffer>(request->name().c_str()).first;
+        IpcBuffer * cb = shm_->find<IpcBuffer>(stream_id.c_str()).first;
         if (cb == nullptr) {
-          cb = shm_->construct<IpcBuffer>(request->name().c_str())(request->name().c_str(), request->size());
-          std::cout << "[AudioServer] open stream pid-seq " << request->name().c_str() << std::endl;
-          ALOGI("%s pid-seq %s\n", __func__, request->name().c_str());
+          cb = shm_->construct<IpcBuffer>(stream_id.c_str())(stream_id.c_str(), request->size());
+          std::cout << "[AudioServer] open stream pid-seq " << stream_id.c_str() << std::endl;
+          ALOGI("%s pid-seq %s\n", __func__, stream_id.c_str());
         }
 
         std::lock_guard<std::mutex> lock(map_out_mutex_);
         streamout_map_.insert(
-          std::pair<const std::string, streamout_map_t>(request->name(), streamout_map_t(cb, stream)));
+          std::pair<const std::string, streamout_map_t>(stream_id, streamout_map_t(cb, stream)));
       }
 
       return Status::OK;
@@ -280,10 +300,22 @@
       return Status::OK;
     }
 
-    Status Device_open_input_stream(ServerContext* context, const OpenInputStream* request, StatusReturn* response) {
+    Status Device_open_input_stream(ServerContext* context, const OpenInputStream* request, DeviceOpenStreamReturn* response) {
       TRACE_ENTRY();
       if (!dev_) return Status::CANCELLED;
 
+      int vpid, seq;
+      if (sscanf(request->name().c_str(), "%d-%d", &vpid, &seq) != 2) {
+        return Status::CANCELLED;
+      }
+      int client_id = get_client_pid_(context);
+      if (client_id < 0) {
+        return Status::CANCELLED;
+      }
+      char buf[32];
+      snprintf(buf, sizeof(buf), "%d-%d", client_id, seq);
+      std::string stream_id(buf);
+
       struct audio_stream_in *stream = nullptr;
       struct audio_config config;
       config.sample_rate = request->config().sample_rate();
@@ -301,15 +333,16 @@
                           (audio_input_flags_t)(request->flags()),
                           request->address().c_str(),
                           (audio_source_t)(request->source())));
+      response->set_client_id(client_id);
 
       if (stream) {
-        IpcBuffer * cb = shm_->find<IpcBuffer>(request->name().c_str()).first;
+        IpcBuffer * cb = shm_->find<IpcBuffer>(stream_id.c_str()).first;
         if (cb == nullptr)
-          cb = shm_->construct<IpcBuffer>(request->name().c_str())(request->name().c_str(), request->size());
+          cb = shm_->construct<IpcBuffer>(stream_id.c_str())(request->name().c_str(), request->size());
 
         std::lock_guard<std::mutex> lock(map_in_mutex_);
         streamin_map_.insert(
-          std::pair<const std::string, streamin_map_t>(request->name(), streamin_map_t(cb, stream)));
+          std::pair<const std::string, streamin_map_t>(stream_id, streamin_map_t(cb, stream)));
       }
 
       return Status::OK;
@@ -767,6 +800,39 @@
     }
 
   private:
+    static int get_client_pid_(grpc::ServerContext* context)
+    {
+       ALOGI("peer %s", context->peer().c_str());
+
+       // get the client socket fd
+       int socketFd = -1;
+       if ((sscanf(context->peer().c_str(), "fd:%d", &socketFd) != 1) || (socketFd < 0)) {
+           ALOGE("Error: failed to get fd of the client");
+           return -1;
+       }
+
+       // sanity check it's an actual fd by dup'ing it
+       int duppedFd = fcntl(socketFd, F_DUPFD_CLOEXEC, 3);
+       if (duppedFd < 0) {
+         ALOGE("Error: failed to dup client socket fd (%d - %s)", errno, strerror(errno));
+         return -1;
+       }
+
+       struct ucred cred;
+       socklen_t credLen = sizeof(cred);
+       if (getsockopt(duppedFd, SOL_SOCKET, SO_PEERCRED, &cred, &credLen) < 0) {
+         ALOGE("Error: failed to get creds from client socket (%d - %s)", errno, strerror(errno));
+         cred.pid = -1;
+       }
+
+       if (close(duppedFd) != 0) {
+         ALOGE("Error: failed to close dup'd socket (%d - %s)", errno, strerror(errno));
+       }
+
+       return cred.pid;
+    }
+
+
     void streamout_gc_()
     {
       std::lock_guard<std::mutex> lock_out(map_out_mutex_);
@@ -890,6 +956,69 @@
   chmod(file_path, (buf.st_mode & 0711) | 0066);
 }
 
+static bool runListenerLoop(const std::unique_ptr<grpc::Server> &server, const std::string &address)
+{
+  if (address.find("unix://") != 0) {
+    ALOGE("Error: must be an 'unix://' server address");
+    return false;
+  }
+
+  std::string socketPath = address.substr(strlen("unix://"));
+
+  struct stat st;
+  if ((stat(socketPath.c_str(), &st) == 0) && ((st.st_mode & S_IFMT) == S_IFSOCK)) {
+    unlink(socketPath.c_str());
+  }
+
+  int serverSock = socket(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC, 0);
+  if (serverSock < 0) {
+    ALOGE("Error: failed to create unix socket (%d - %s)", errno, strerror(errno));
+    return false;
+  }
+
+  struct sockaddr_un serverAddr;
+  memset(&serverAddr, 0, sizeof(serverAddr));
+  serverAddr.sun_family = AF_UNIX;
+  strncpy(serverAddr.sun_path, socketPath.c_str(), sizeof(serverAddr.sun_path) - 1);
+
+  if (bind(serverSock, (struct sockaddr*)&serverAddr, sizeof(serverAddr)) != 0) {
+    ALOGE("Error: failed to bind to unix socket @ '%s' (%d - %s)",
+          socketPath.c_str(), errno, strerror(errno));
+    return false;
+  }
+
+  SetAudioPermissions(socketPath.c_str());
+
+  if (listen(serverSock, 5) == -1) {
+    ALOGE("Error: failed to set socket to listening mode (%d - %s)",
+          errno, strerror(errno));
+    return false;
+  }
+
+  for (;;) {
+    struct sockaddr_un clientAddr;
+    socklen_t clientAddrLen = sizeof(clientAddr);
+    int clientSock = TEMP_FAILURE_RETRY(accept4(serverSock,(struct sockaddr *) &clientAddr, &clientAddrLen, SOCK_NONBLOCK | SOCK_CLOEXEC));
+    if (clientSock < 0) {
+      ALOGE("Error: accepting new connection (%d - %s)", errno, strerror(errno));
+      continue;
+    }
+
+    struct ucred cred;
+    socklen_t credLen = sizeof(cred);
+    if (getsockopt(clientSock, SOL_SOCKET, SO_PEERCRED, &cred, &credLen) < 0) {
+      ALOGE("Error: failed to get creds from client socket (%d - %s)",
+            errno, strerror(errno));
+      close(clientSock);
+      continue;
+    }
+
+    grpc::AddInsecureChannelFromFd(server.get(), clientSock);
+  }
+
+  return true;
+}
+
 void RunServer()
 {
   const char *url = std::getenv("AUDIO_SERVER_SOCKET");
@@ -900,12 +1029,13 @@
       server_address = url;
   }
   ServerBuilder builder;
-  builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
+  //builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
   builder.RegisterService(&service);
   std::unique_ptr<Server> server(builder.BuildAndStart());
   SetAudioPermissions(socket_location.c_str());
   SetAudioPermissions("/dev/shm/AudioServiceShmem");
   std::cout << "[AudioServer] listening on " << server_address << std::endl;
+  runListenerLoop(server, server_address);
   server->Wait();
 }
 
diff --git a/src/audio_service.proto b/src/audio_service.proto
index 06d7fb0..3b5bf47 100644
--- a/src/audio_service.proto
+++ b/src/audio_service.proto
@@ -69,11 +69,11 @@
   // - Bluetooth devices use the MAC address of the device in the form "00:11:22:AA:BB:CC"
   // - USB devices use the ALSA card and device numbers in the form  "card=X;device=Y"
   // - Other devices may use a number or any other string.
-  rpc Device_open_output_stream(OpenOutputStream) returns (StatusReturn) {}
+  rpc Device_open_output_stream(OpenOutputStream) returns (DeviceOpenStreamReturn) {}
   rpc Device_close_output_stream(Stream) returns (StatusReturn) {}
 
   // This method creates and opens the audio hardware input stream
-  rpc Device_open_input_stream(OpenInputStream) returns (StatusReturn) {}
+  rpc Device_open_input_stream(OpenInputStream) returns (DeviceOpenStreamReturn) {}
   rpc Device_close_input_stream(Stream) returns (StatusReturn) {}
 
   rpc Device_dump(google.protobuf.Empty) returns (StatusReturn) {}
@@ -530,6 +530,11 @@
   uint32 size = 2;             // size
 }
 
+message DeviceOpenStreamReturn {
+  int32 ret = 1;
+  int32 client_id = 2;
+}
+
 message GetFrameTimestampReturn {
   int32 ret = 1;
   uint64 frames = 2;
@@ -553,4 +558,4 @@
   bytes cmd_data = 3;
   uint32 reply_size = 4;
   bytes reply_data = 5;
-}
\ No newline at end of file
+}