Dispatcher.php 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143
  1. <?php
  2. namespace app\framework\Bus;
  3. use app\process\QueueKeeper;
  4. use Illuminate\Contracts\Queue\Queue;
  5. use Illuminate\Database\Events\TransactionBeginning;
  6. use Illuminate\Database\Events\TransactionCommitted;
  7. use Illuminate\Database\Events\TransactionRolledBack;
  8. use Illuminate\Queue\RedisQueue;
  9. use RuntimeException;
  10. class Dispatcher extends \Illuminate\Bus\Dispatcher
  11. {
  12. private $redisQueues = [];
  13. private $index;
  14. private $p = [];
  15. /**
  16. * Dispatch a command to its appropriate handler.
  17. *
  18. * @param mixed $command
  19. * @return mixed
  20. */
  21. public function dispatch($command)
  22. {
  23. if ($this->queueResolver && $this->commandShouldBeQueued($command)) {
  24. // 如果没有开启商城守护进程,则不为队列分组
  25. if (!QueueKeeper::isAlive()) {
  26. $command->queue = null;
  27. }
  28. return $this->dispatchToQueue($command);
  29. } else {
  30. return $this->dispatchNow($command);
  31. }
  32. }
  33. /**
  34. * redis队列等待数据提交后
  35. * @param mixed $command
  36. * @return mixed
  37. */
  38. public function dispatchToQueue($command)
  39. {
  40. $connection = isset($command->connection) ? $command->connection : null;
  41. $queue = call_user_func($this->queueResolver, $connection);
  42. if (!$queue instanceof Queue) {
  43. throw new RuntimeException('Queue resolver did not return a Queue implementation.');
  44. }
  45. if (method_exists($command, 'queue')) {
  46. return $command->queue($queue, $command);
  47. } else {
  48. // 当队列任务驱动为redis,并且包含在数据库事务中时,保存redis队列任务,等待事务先提交
  49. if ($queue instanceof RedisQueue && app('db.connection')->transactionLevel() > 0) {
  50. $this->addRedisQueue($queue, $command, app('db.connection')->transactionLevel());
  51. } else {
  52. return $this->pushCommandToQueue($queue, $command);
  53. }
  54. }
  55. }
  56. public function getRedis()
  57. {
  58. return $this->redisQueues;
  59. }
  60. private function addRedisQueue($queue, $command, $level)
  61. {
  62. //存入当前指针
  63. $this->redisQueues[end($this->p)][] = [$queue, $command];
  64. }
  65. public function dbTransactionBeginning(TransactionBeginning $event)
  66. {
  67. //指针
  68. $level = $event->connection->transactionLevel();
  69. if (!empty($this->p)) {
  70. $now = end($this->p) . '-' . $level;
  71. $count = array_count_values($this->index);
  72. $next_level = $count[$level] + 1;
  73. $now .= '('.$next_level.')';
  74. } else {
  75. $now = (string)$level;
  76. }
  77. $this->index[] = $event->connection->transactionLevel();
  78. $this->p[] = $now;
  79. end($this->p);
  80. }
  81. public function dbTransactionCommitted(TransactionCommitted $event)
  82. {
  83. //指针前移
  84. array_pop($this->p);
  85. // mysql事务提交后,推送redis队列任务,判断是否level是否为0
  86. if ($event->connection->transactionLevel() == 0) {
  87. $this->pushRedisQueues();
  88. }
  89. }
  90. public function dbTransactionRollBack(TransactionRolledBack $event)
  91. {
  92. //指针前移
  93. $p = array_pop($this->p);
  94. if (!isset($this->redisQueues)) {
  95. return;
  96. }
  97. $p = addcslashes($p,"-()");
  98. foreach ($this->redisQueues as $key=>$value) {
  99. if (preg_match("/$p(.*)/",$key,$match)) {
  100. unset($this->redisQueues[$key]);
  101. };
  102. }
  103. }
  104. public function pushRedisQueues()
  105. {
  106. if (empty($this->redisQueues)) {
  107. return;
  108. }
  109. foreach ($this->redisQueues as $redisQueueLevel) {
  110. foreach ($redisQueueLevel as $redisQueue) {
  111. list($queue, $command) = $redisQueue;
  112. $this->pushCommandToQueue($queue, $command);
  113. }
  114. }
  115. unset($this->redisQueues);
  116. }
  117. }