WebSocket.php 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. <?php
  2. namespace app\process;
  3. use Workerman\Connection\TcpConnection;
  4. use Workerman\Lib\Timer;
  5. use Workerman\Worker;
  6. class WebSocket
  7. {
  8. protected $worker;
  9. protected $connections = [];
  10. protected $members = [];
  11. protected $groups = [];
  12. protected $handle;
  13. public function __construct(Worker $worker)
  14. {
  15. $this->worker = $worker;
  16. $this->handle = new WebSocketHandle($this);
  17. }
  18. public function setMember($member_id,$group,TcpConnection $connection)
  19. {
  20. $this->members[$group][$member_id] = $connection;
  21. }
  22. public function setGroup($group,TcpConnection $connection)
  23. {
  24. $this->groups[$group][$connection->id] = $connection;
  25. }
  26. public function onConnect(TcpConnection $connection)
  27. {
  28. $this->connections[$connection->id] = $connection;
  29. $connection->lastMessageTime = time();
  30. }
  31. public function onMessage(TcpConnection $connection, $res)
  32. {
  33. $connection->lastMessageTime = time();
  34. $res = json_decode($res, true);
  35. if (!method_exists($this->handle, $res['type'])) {
  36. return $connection->send('pong');
  37. }
  38. $this->handle->{$res['type']}($connection, $res['data']);
  39. }
  40. public function onWorkerStart(Worker $worker)
  41. {
  42. var_dump('onWorkerStart');
  43. // 启动时顺便启动内部通信,并连接监听内部通讯
  44. $inner_worker = new \app\worker\Worker('Text://127.0.0.1:5678');
  45. $inner_worker->onMessage = function (TcpConnection $tcpConnection, $data) use ($worker) {
  46. $data = json_decode($data, true);
  47. $group = $this->groups[$data['type']];
  48. if (!empty($data['ids'])) {
  49. foreach ($data['ids'] as $id) {
  50. $connection = $this->members[$data['type']][$id];
  51. isset($connection) && $connection->send(json_encode($data['content']));
  52. }
  53. } else {
  54. foreach ($group as $connection) {
  55. isset($connection) && $connection->send(json_encode($data['content']));
  56. }
  57. }
  58. if ($data['type'] == 'is_online') {
  59. $online = [];
  60. foreach ($this->groups['online_service'] as $connection) {
  61. $online[] = $connection->member_id;
  62. }
  63. return $tcpConnection->send(json_encode($online));
  64. }
  65. $tcpConnection->send('success');
  66. };
  67. $inner_worker->listen();
  68. //心跳监测
  69. Timer::add(10, function () use (&$worker) {
  70. $time_now = time();
  71. foreach ($worker->connections as $connection) {
  72. if ($time_now - $connection->lastMessageTime > 30) {
  73. $connection->close('心跳超时断开');
  74. }
  75. }
  76. });
  77. }
  78. public function onClose(TcpConnection $connection)
  79. {
  80. unset($this->connections[$connection->id]);
  81. if (isset($connection->member_id) && isset($connection->group)) {
  82. unset($this->members[$connection->group][$connection->member_id]);
  83. unset($this->groups[$connection->group][$connection->id]);
  84. }
  85. }
  86. }