Shop.php 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185
  1. <?php
  2. namespace app\console\Commands;
  3. use app\common\facades\SiteSetting;
  4. use app\host\HostManager;
  5. use app\process\CronKeeper;
  6. use app\process\QueueKeeper;
  7. use app\process\WebSocket;
  8. use app\worker\Worker;
  9. use Illuminate\Console\Command;
  10. use Illuminate\Support\Facades\Redis;
  11. use Workerman\Timer;
  12. class Shop extends Command
  13. {
  14. protected $signature = 'shop {action} {--d}';
  15. /**
  16. * The console command description.
  17. *
  18. * @var string
  19. */
  20. protected $description = '商城守护进程';
  21. public function handle()
  22. {
  23. global $argv;
  24. $action = $this->argument('action');
  25. $argv[0] = 'wk';
  26. $argv[1] = $action;
  27. $argv[2] = $this->option('d') ? '-d' : '';
  28. if ($action == 'start') {
  29. $this->start();
  30. }
  31. if ($action == 'stop') {
  32. }
  33. // 运行worker
  34. Worker::runAll();
  35. }
  36. private $startTime;
  37. /**
  38. * 队列进程管理
  39. */
  40. private function queueKeeper()
  41. {
  42. $worker = new Worker();
  43. $worker->count = 1;
  44. $worker->name = 'yun_shop_queue';
  45. app()->forgetInstance('redis');
  46. app()->forgetInstance('db');
  47. // 生成队列进程
  48. $worker->onWorkerStart = function ($worker) {
  49. // 避免多个服务器的work同一时间生成,产生不有必要的并发问题
  50. sleep(rand(0, 100) / 100);
  51. error_reporting(0);
  52. ini_set('display_errors', 1);
  53. app('redis')->refresh();
  54. app('db')->refresh();
  55. (new HostManager())->localhost()->clearPids();
  56. $queueKeeper = new QueueKeeper();
  57. $queueKeeper->main();
  58. $queueTimer = function () use ($queueKeeper) {
  59. $hostManager = new HostManager();
  60. $hostManager->localhost()->clearPids();
  61. $hostManager->localhost()->register();
  62. $queueKeeper->keepAlive();
  63. };
  64. call_user_func($queueTimer);
  65. Timer::add(30, $queueTimer);
  66. $worker->queueKeeper = $queueKeeper;
  67. };
  68. // 关闭队列进程
  69. $worker->onWorkerStop = function ($worker) {
  70. error_reporting(0);
  71. ini_set('display_errors', 1);
  72. Timer::delAll();
  73. $worker->queueKeeper->stop();
  74. $hostManager = new HostManager();
  75. $hostManager->localhost()->clearPids();
  76. $hostManager->localhost()->killAll();
  77. $hostManager->logout(gethostname());
  78. };
  79. }
  80. private function daemonKeeper()
  81. {
  82. $worker = new Worker();
  83. $worker->count = 1;
  84. $worker->name = 'yun_shop_daemon';
  85. app()->forgetInstance('redis');
  86. app()->forgetInstance('db');
  87. $worker->onWorkerStart = function ($worker) {
  88. sleep(rand(0, 100) / 100);
  89. error_reporting(0);
  90. ini_set('display_errors', 1);
  91. app('redis')->refresh();
  92. app('db')->refresh();
  93. Timer::add(1, function () {
  94. $hostManager = new HostManager();
  95. // 升级后延时重启队列(防止更新后无重启队列)
  96. //判断重启时间,如果60秒内重启过就不重启
  97. // if (($hostManager->restartTime() > $this->startTime) && ($this->startTime + 60 < $hostManager->restartTime())) {
  98. if (($hostManager->restartTime() > $this->startTime)) {
  99. app('supervisor')->restart();
  100. }
  101. });
  102. };
  103. // 关闭队列进程
  104. $worker->onWorkerStop = function ($worker) {
  105. error_reporting(0);
  106. ini_set('display_errors', 1);
  107. Timer::delAll();
  108. };
  109. }
  110. private function cronKeeper()
  111. {
  112. $worker = new Worker();
  113. $worker->count = 1;
  114. $worker->name = 'yun_shop_cron';
  115. // 生成队列进程
  116. $worker->onWorkerStart = function ($worker) {
  117. // 避免多个服务器的work同一时间生成,产生不有必要的并发问题
  118. sleep(rand(0, 100) / 100);
  119. error_reporting(0);
  120. ini_set('display_errors', 1);
  121. app('redis')->refresh();
  122. app('db')->refresh();
  123. $cronKeeper = new CronKeeper();
  124. $cronKeeper->main();
  125. Timer::add(60, function () use ($cronKeeper) {
  126. $cronKeeper->run();
  127. });
  128. };
  129. // 关闭队列进程
  130. $worker->onWorkerStop = function ($worker) {
  131. error_reporting(0);
  132. ini_set('display_errors', 1);
  133. Timer::delAll();
  134. Redis::del('RunningCronJobs');
  135. };
  136. }
  137. private function webSocketKeeper()
  138. {
  139. if (SiteSetting::get('websocket')['is_open'] != 1) {
  140. return;
  141. }
  142. $worker = new Worker('websocket://0.0.0.0:8181');
  143. $worker->count = 1;
  144. $webSocket = new WebSocket($worker);
  145. // 连接时回调
  146. $worker->onConnect = [$webSocket, 'onConnect'];
  147. // 收到客户端信息时回调
  148. $worker->onMessage = [$webSocket, 'onMessage'];
  149. // 进程启动后的回调
  150. $worker->onWorkerStart = [$webSocket, 'onWorkerStart'];
  151. // 断开时触发的回调
  152. $worker->onClose = [$webSocket, 'onClose'];
  153. }
  154. private function start()
  155. {
  156. // 记录自身host
  157. (new HostManager())->localhost()->register();
  158. $this->startTime = time();
  159. $this->daemonKeeper();
  160. $this->queueKeeper();
  161. $this->cronKeeper();
  162. $this->webSocketKeeper();
  163. }
  164. }