GuzzleHttp\Pool

Sends an iterator of requests concurrently using a capped pool size.

Defined (1)

The class is defined in the following location(s).

/lib/Azure/GuzzleHttp/Pool.php  
  /**
   * Sends an iterator of requests concurrently using a capped pool size.
   *
   * The constructor wraps the supplied requests in a generator that yields one
   * promise per request and hands that generator to an EachPromise together
   * with the remaining configuration.
   */
  class Pool implements PromisorInterface
  {
      /** @var EachPromise Aggregate created by the constructor from the request generator. */
      private $each;

      /**
       * @param ClientInterface $client   Client used to send the requests.
       * @param array|\Iterator $requests Requests or functions that return
       *                                  requests to send concurrently.
       * @param array           $config   Associative array of options
       *     - concurrency: (int) Maximum number of requests to send concurrently.
       *       Defaults to 25 when neither "concurrency" nor "pool_size" is set.
       *     - pool_size: (int) Legacy alias of "concurrency"; when present it
       *       overrides "concurrency".
       *     - options: Array of request options to apply to each request.
       *     - fulfilled: (callable) Function to invoke when a request completes.
       *     - rejected: (callable) Function to invoke when a request is rejected.
       */
      public function __construct(
          ClientInterface $client,
          $requests,
          array $config = []
      ) {
          // Backwards compatibility: "pool_size" is the old name of "concurrency".
          if (isset($config['pool_size'])) {
              $config['concurrency'] = $config['pool_size'];
          } elseif (!isset($config['concurrency'])) {
              $config['concurrency'] = 25;
          }

          // Request options are consumed here and removed from $config, so they
          // are not forwarded to EachPromise.
          if (isset($config['options'])) {
              $opts = $config['options'];
              unset($config['options']);
          } else {
              $opts = [];
          }

          $iterable = \GuzzleHttp\Promise\iter_for($requests);
          // Generator: nothing is sent until EachPromise pulls the next entry.
          $requests = function () use ($iterable, $client, $opts) {
              foreach ($iterable as $key => $rfn) {
                  if ($rfn instanceof RequestInterface) {
                      yield $key => $client->sendAsync($rfn, $opts);
                  } elseif (is_callable($rfn)) {
                      // Callables receive the shared request options and are
                      // expected to return a promise themselves.
                      yield $key => $rfn($opts);
                  } else {
                      throw new \InvalidArgumentException('Each value yielded by '
                          . 'the iterator must be a Psr\Http\Message\RequestInterface '
                          . 'or a callable that returns a promise that fulfills '
                          . 'with a Psr\Http\Message\ResponseInterface object.');
                  }
              }
          };

          $this->each = new EachPromise($requests(), $config);
      }

      /**
       * Returns the promise of the underlying EachPromise.
       */
      public function promise()
      {
          return $this->each->promise();
      }

      /**
       * Sends multiple requests concurrently and returns an array of responses
       * and exceptions that uses the same ordering as the provided requests.
       *
       * IMPORTANT: This method keeps every request and response in memory, and
       * as such, is NOT recommended when sending a large number or an
       * indeterminate number of requests concurrently.
       *
       * Any "fulfilled" / "rejected" callbacks supplied in $options are still
       * invoked; they are wrapped so that each value is also recorded in the
       * returned array under the key of its request.
       *
       * @param ClientInterface $client   Client used to send the requests
       * @param array|\Iterator $requests Requests to send concurrently.
       * @param array           $options  Passes through the options available in
       *                                  {@see GuzzleHttp\Pool::__construct}
       *
       * @return array Returns an array containing the response or an exception
       *               in the same order that the requests were sent.
       * @throws \InvalidArgumentException thrown by the request generator when a
       *                                   value is neither a request nor a callable
       *                                   (NOTE(review): confirm how EachPromise
       *                                   surfaces it to the caller).
       */
      public static function batch(
          ClientInterface $client,
          $requests,
          array $options = []
      ) {
          $res = [];
          self::cmpCallback($options, 'fulfilled', $res);
          self::cmpCallback($options, 'rejected', $res);
          $pool = new static($client, $requests, $options);
          $pool->promise()->wait();
          // Callbacks fire in completion order; sort by key to restore the
          // ordering of the provided requests.
          ksort($res);

          return $res;
      }

      /**
       * Installs a callback under $options[$name] that stores every value it
       * receives in $results, keyed by the second callback argument.
       *
       * If a callback is already registered under $name it is kept: the new
       * callback calls it first and then records the value.
       *
       * @param array  $options Pool options, modified in place.
       * @param string $name    Option key to install the callback under.
       * @param array  $results Collector the callback writes into (by reference).
       */
      private static function cmpCallback(array &$options, $name, array &$results)
      {
          if (!isset($options[$name])) {
              $options[$name] = function ($v, $k) use (&$results) {
                  $results[$k] = $v;
              };
          } else {
              $currentFn = $options[$name];
              $options[$name] = function ($v, $k) use (&$results, $currentFn) {
                  $currentFn($v, $k);
                  $results[$k] = $v;
              };
          }
      }
  }