QueueKeeper.php 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216
  1. <?php
  2. namespace app\process;
  3. use app\common\exceptions\ShopException;
  4. use app\common\facades\SiteSetting;
  5. use app\common\modules\shop\ShopConfig;
  6. use app\common\services\SystemMsgService;
  7. use app\framework\Log\SimpleLog;
  8. use app\host\HostManager;
  9. use Illuminate\Queue\Events\JobExceptionOccurred;
  10. use Illuminate\Queue\Events\JobProcessed;
  11. use Illuminate\Queue\Events\JobProcessing;
  12. use Illuminate\Queue\Events\WorkerStopping;
  13. use Illuminate\Queue\QueueManager;
  14. use Illuminate\Support\Facades\Redis;
  15. class QueueKeeper
  16. {
  17. use Fork;
  18. private $config;
  19. private $pid;
  20. /**
  21. * @var SimpleLog
  22. */
  23. private $log;
  24. private function config()
  25. {
  26. $result = ShopConfig::current()->getItem('queue');
  27. $queueSetting = SiteSetting::get('queue');
  28. $host_count = count((new HostManager())->hosts()) ?: 1;
  29. foreach ($result as &$item) {
  30. if (isset($queueSetting[$item['key']]) && $queueSetting[$item['key']]) {
  31. $item['total'] = $queueSetting[$item['key']];
  32. } else {
  33. $item['total'] = $host_count * $item['total'];
  34. }
  35. }
  36. if ($this->config != $result) {
  37. $this->log->add('config changed', [$this->config, $result, $host_count]);
  38. $this->config = $result;
  39. }
  40. $this->config = $result;
  41. return $result;
  42. }
  43. public function main()
  44. {
  45. $this->pid = getmypid();
  46. $this->log = new SimpleLog('queueKeeper');
  47. $this->stopQueues();
  48. $this->log->add("{$this->pidKey}:stopping", ['初始化']);
  49. /**
  50. * @var QueueManager $queue
  51. */
  52. $queue = app('queue');
  53. $queue->looping(function () {
  54. // 执行任务之前,检查自己是否存在于redis的pid列表中,检查keeper是否存活
  55. if ((new HostManager())->localhost()->pid($this->pidKey) != getmypid() || !posix_kill($this->pid, 0)) {
  56. die;
  57. }
  58. });
  59. $queue->before(function (JobProcessing $event) {
  60. Redis::hset('RunningQueueJobs', $this->pidKey, $event->job->getRawBody());
  61. $dataStr = $event->job->getRawBody();
  62. $data = json_decode($dataStr);
  63. $this->log->add("{$this->pidKey}:{$data->id}:{$data->data->commandName}:begin", [$event->job->getRawBody()]);
  64. });
  65. $queue->after(function (JobProcessed $event) {
  66. Redis::hdel('RunningQueueJobs', $this->pidKey);
  67. $dataStr = $event->job->getRawBody();
  68. $data = json_decode($dataStr);
  69. $this->log->add("{$this->pidKey}:{$data->id}:{$data->data->commandName}:end");
  70. });
  71. $queue->exceptionOccurred(function (JobExceptionOccurred $event) {
  72. Redis::hdel('RunningQueueJobs', $this->pidKey);
  73. $dataStr = $event->job->getRawBody();
  74. $data = json_decode($dataStr);
  75. if (!($event->exception instanceof ShopException)) {
  76. SystemMsgService::addWorkMessage(['title' => '队列执行错误', 'content' => "{$data->data->commandName}:failed"], unserialize($data->data->command)->uniacid);
  77. }
  78. $this->log->add("{$this->pidKey}:{$data->id}:{$data->data->commandName}:failed", [$event->job->getRawBody(), $event->exception]);
  79. \Log::error("队列任务[{$this->pidKey}:{$data->id}:{$data->data->commandName}]运行错误({$this->pidKey})", [$event->job->getRawBody(), $event->exception]);
  80. });
  81. $queue->stopping(function (WorkerStopping $event) {
  82. // 队列关闭时清除redis 进程记录
  83. (new HostManager())->localhost()->clearPid($this->pidKey);
  84. $this->log->add("{$this->pidKey}:stopping", [$event->status,"内存占用过高:" . (memory_get_usage() / 1024 / 1024)]);
  85. });
  86. //$this->startQueues();
  87. }
  88. public function aliveQueueTotal()
  89. {
  90. $aliveTotal = 0;
  91. $hostManager = new HostManager();
  92. foreach ($hostManager->pidkeys() as $pidkey) {
  93. if (strpos($pidkey, 'queues:') !== false) {
  94. $aliveTotal += 1;
  95. }
  96. }
  97. return $aliveTotal;
  98. }
  99. public function aliveQueueLocalTotal()
  100. {
  101. $aliveTotal = 0;
  102. $hostManager = new HostManager();
  103. foreach ($hostManager->pidkey() as $pidkey) {
  104. if (strpos($pidkey, 'queues:') !== false) {
  105. $aliveTotal += 1;
  106. }
  107. }
  108. return $aliveTotal;
  109. }
  110. public function keepAlive()
  111. {
  112. Redis::setex('queueKeeperAlive', 600, 1);
  113. $queueTotal = collect($this->config())->sum('total');
  114. $hostManager = new HostManager();
  115. // 每个服务器平均队列数
  116. $avgTotal = $queueTotal / (count((new HostManager())->hosts()) ?: 1) ?: 1;
  117. if ($this->aliveQueueTotal() < $queueTotal && $hostManager->localhost()->numberOfPids() < $avgTotal + 1) {
  118. //平均数 - 当前服务器存活数
  119. $i = $avgTotal - $this->aliveQueueLocalTotal();
  120. //所有服务器存活数小于总数
  121. while ($this->aliveQueueTotal() < $queueTotal && $i > 0) {
  122. $i--;
  123. $this->startQueues();
  124. }
  125. } elseif ($this->aliveQueueTotal() > $queueTotal) {
  126. $this->log->add("{$this->pidKey}:stopping", ['已生成队列进程数量大于设置队列总数', $this->aliveQueueTotal(), $queueTotal]);
  127. // 大于设置数量重新生成
  128. $hostManager->refresh();
  129. } else {
  130. if ($avgTotal < 3) {
  131. // 太少不需要处理
  132. } elseif ($hostManager->localhost()->numberOfPids() / $avgTotal > 1.5) {
  133. $this->log->add("{$this->pidKey}:stopping", ['已生成队列进程数大于单机最大数量', $hostManager->localhost()->numberOfPids(), $avgTotal]);
  134. // 小于平均值1.5倍数,刷新重新分配
  135. $pidKeys = array_slice($hostManager->localhost()->pidKeys(), $hostManager->localhost()->numberOfPids());
  136. foreach ($pidKeys as $key => $pid) {
  137. $hostManager->localhost()->killProcess($key);
  138. }
  139. }
  140. }
  141. }
  142. public function startQueues()
  143. {
  144. foreach ($this->config() as $conf) {
  145. $result = $this->queue($conf['key'], $conf['option'], $conf['total'], $conf['is_serial']);
  146. if ($result) {
  147. return true;
  148. }
  149. }
  150. }
  151. static public function isAlive()
  152. {
  153. return Redis::get('queueKeeperAlive');
  154. }
  155. static public function stopQueues()
  156. {
  157. \Artisan::call("queue:restart");
  158. Redis::del('RunningQueueJobs');
  159. }
  160. public function stop()
  161. {
  162. $this->log->add("{$this->pidKey}:stopping", debug_backtrace(2));
  163. self::stopQueues();
  164. }
  165. private function queue($key, $option, $total, $isSerial = false)
  166. {
  167. $hostManager = new HostManager();
  168. $total = SiteSetting::get('queue.' . $key) ?: $total;
  169. $option = array_merge(["--sleep" => 5, "--tries" => 1], $option);
  170. for ($i = 0; $i < $total; $i++) {
  171. $queueProcess = new QueueProcess($key, $total, $i, $option, $isSerial);
  172. $queueKey = 'queues:' . $queueProcess->queueName . $queueProcess->index;
  173. // 检查集群所有服务器范围内是否重复
  174. foreach ($hostManager->pidkeys() as $pidkey) {
  175. if (strpos($pidkey, $queueKey) !== false) {
  176. continue 2;
  177. }
  178. }
  179. $this->log->add($queueKey, $queueProcess->getOption());
  180. $this->cProcess(function () use ($queueProcess, $hostManager, $queueKey) {
  181. try {
  182. \Artisan::call("queue:work", $queueProcess->getOption());
  183. } catch (\Exception $exception) {
  184. $hostManager->localhost()->clearPid($this->pidKey);
  185. throw $exception;
  186. }
  187. }, $queueKey);
  188. return true;
  189. }
  190. return false;
  191. }
  192. }