GuzzleHttp\Promise\EachPromise

Represents a promise that iterates over many promises and invokes side-effect functions in the process.

Defined (1)

The class is defined in the following location(s).

/lib/Azure/GuzzleHttp/Promise/EachPromise.php  
  /**
   * Iterates over a collection of promises (or plain values), invoking the
   * optional `fulfilled` / `rejected` side-effect callbacks as each one
   * settles, and exposes a single aggregate promise covering the whole run.
   *
   * The aggregate promise is created lazily by promise() and is resolved with
   * null once nothing is pending and the iterator is exhausted.
   */
  class EachPromise implements PromisorInterface
  {
      /**
       * Promises that have been started but have not yet settled, indexed by
       * an internal counter (NOT by the iterator key).
       *
       * @var array|null Set to null once the aggregate promise settles.
       */
      private $pending = [];

      /**
       * Next index to use in $pending. Iterator keys may repeat or may not be
       * valid array offsets, so they cannot be used to index $pending.
       *
       * @var int
       */
      private $nextPendingIndex = 0;

      /** @var \Iterator */
      private $iterable;

      /** @var callable|int */
      private $concurrency;

      /** @var callable */
      private $onFulfilled;

      /** @var callable */
      private $onRejected;

      /** @var Promise */
      private $aggregate;

      /** @var bool */
      private $mutex;

      /**
       * Configuration hash can include the following key value pairs:
       *
       * - fulfilled: (callable) Invoked when a promise fulfills. The function
       *   is invoked with three arguments: the fulfillment value, the index
       *   position from the iterable list of the promise, and the aggregate
       *   promise that manages all of the promises. The aggregate promise may
       *   be resolved from within the callback to short-circuit the promise.
       * - rejected: (callable) Invoked when a promise is rejected. The
       *   function is invoked with three arguments: the rejection reason, the
       *   index position from the iterable list of the promise, and the
       *   aggregate promise that manages all of the promises. The aggregate
       *   promise may be resolved from within the callback to short-circuit
       *   the promise.
       * - concurrency: (integer|callable) Pass this configuration option to
       *   limit the allowed number of outstanding concurrently executing
       *   promises, creating a capped pool of promises. A callable receives
       *   the current number of pending promises and returns the limit.
       *   There is no limit by default.
       *
       * @param mixed $iterable Promises or values to iterate.
       * @param array $config   Configuration options
       */
      public function __construct($iterable, array $config = [])
      {
          // iter_for() is defined elsewhere in the package; it is relied on
          // here to hand back an object usable as an \Iterator.
          $this->iterable = iter_for($iterable);

          if (isset($config['concurrency'])) {
              $this->concurrency = $config['concurrency'];
          }

          if (isset($config['fulfilled'])) {
              $this->onFulfilled = $config['fulfilled'];
          }

          if (isset($config['rejected'])) {
              $this->onRejected = $config['rejected'];
          }
      }

      /**
       * Returns the aggregate promise, creating it and starting iteration on
       * the first call. Later calls return the same instance.
       *
       * Anything thrown while rewinding the iterator or queueing the first
       * promises is turned into a rejection of the aggregate promise.
       *
       * @return Promise
       */
      public function promise()
      {
          if ($this->aggregate) {
              return $this->aggregate;
          }

          try {
              $this->createPromise();
              $this->iterable->rewind();
              $this->refillPending();
          } catch (\Throwable $e) {
              $this->aggregate->reject($e);
          } catch (\Exception $e) {
              // Only reachable on PHP 5, where \Throwable does not exist.
              $this->aggregate->reject($e);
          }

          return $this->aggregate;
      }

      /**
       * Builds the aggregate promise and registers the cleanup handler that
       * drops all references once it settles.
       */
      private function createPromise()
      {
          $this->mutex = false;
          $this->aggregate = new Promise(function () {
              reset($this->pending);
              if (empty($this->pending) && !$this->iterable->valid()) {
                  $this->aggregate->resolve(null);
                  return;
              }

              // Consume a potentially fluctuating list of promises while
              // ensuring that indexes are maintained (precluding array_shift).
              while ($promise = current($this->pending)) {
                  next($this->pending);
                  $promise->wait();
                  if ($this->aggregate->getState() !== PromiseInterface::PENDING) {
                      return;
                  }
              }
          });

          // Clear the references when the promise is resolved.
          $clearFn = function () {
              $this->iterable = $this->concurrency = $this->pending = null;
              $this->onFulfilled = $this->onRejected = null;
          };

          $this->aggregate->then($clearFn, $clearFn);
      }

      /**
       * Queues promises from the iterator: all of them when no concurrency
       * limit is configured, otherwise only enough to reach the limit.
       */
      private function refillPending()
      {
          if (!$this->concurrency) {
              // Add all pending promises.
              while ($this->addPending() && $this->advanceIterator());
              return;
          }

          // Add only up to N pending promises.
          $concurrency = is_callable($this->concurrency)
              ? call_user_func($this->concurrency, count($this->pending))
              : $this->concurrency;
          $concurrency = max($concurrency - count($this->pending), 0);
          // Concurrency may be set to 0 to disallow new promises.
          if (!$concurrency) {
              return;
          }
          // Add the first pending promise.
          $this->addPending();
          // Note this is special handling for concurrency=1 so that we do
          // not advance the iterator after adding the first promise. This
          // helps work around issues with generators that might not have the
          // next value to yield until promise callbacks are called.
          while (--$concurrency
              && $this->advanceIterator()
              && $this->addPending());
      }

      /**
       * Wraps the iterator's current value in a promise and tracks it as
       * pending.
       *
       * @return bool False when there is no iterator or it is exhausted,
       *              true when a promise was queued.
       */
      private function addPending()
      {
          if (!$this->iterable || !$this->iterable->valid()) {
              return false;
          }

          $promise = promise_for($this->iterable->current());
          $key = $this->iterable->key();

          // Iterator keys are not guaranteed to be unique (or even usable as
          // array offsets), so $pending is indexed by an internal counter.
          // Using the key directly would let a repeated key overwrite a
          // promise that is still pending. Callbacks still receive the key.
          $idx = $this->nextPendingIndex++;

          $this->pending[$idx] = $promise->then(
              function ($value) use ($idx, $key) {
                  if ($this->onFulfilled) {
                      call_user_func(
                          $this->onFulfilled,
                          $value,
                          $key,
                          $this->aggregate
                      );
                  }
                  $this->step($idx);
              },
              function ($reason) use ($idx, $key) {
                  if ($this->onRejected) {
                      call_user_func(
                          $this->onRejected,
                          $reason,
                          $key,
                          $this->aggregate
                      );
                  }
                  $this->step($idx);
              }
          );

          return true;
      }

      /**
       * Moves the iterator forward by one position unless an advance is
       * already in progress.
       *
       * @return bool True when the iterator was advanced; false when the lock
       *              was held or next() threw (the aggregate promise is
       *              rejected with the thrown error in that case).
       */
      private function advanceIterator()
      {
          // Place a lock on the iterator so that we ensure to not recurse,
          // preventing fatal generator errors.
          if ($this->mutex) {
              return false;
          }

          $this->mutex = true;

          try {
              $this->iterable->next();
              $this->mutex = false;
              return true;
          } catch (\Throwable $e) {
              $this->aggregate->reject($e);
              $this->mutex = false;
              return false;
          } catch (\Exception $e) {
              // Only reachable on PHP 5, where \Throwable does not exist.
              $this->aggregate->reject($e);
              $this->mutex = false;
              return false;
          }
      }

      /**
       * Runs after one tracked promise settles: forgets it, then advances
       * the iterator and tops the pending pool back up.
       *
       * @param int $idx Internal index of the settled promise in $pending.
       */
      private function step($idx)
      {
          // If the promise was already resolved, then ignore this step.
          if ($this->aggregate->getState() !== PromiseInterface::PENDING) {
              return;
          }

          unset($this->pending[$idx]);

          // Only refill pending promises if we are not locked, preventing the
          // EachPromise to recursively invoke the provided iterator, which
          // cause a fatal error: "Cannot resume an already running generator"
          if ($this->advanceIterator() && !$this->checkIfFinished()) {
              // Add more pending promises if possible.
              $this->refillPending();
          }
      }

      /**
       * Resolves the aggregate promise with null when nothing is pending and
       * the iterator has no further items.
       *
       * @return bool True when the aggregate promise was resolved here.
       */
      private function checkIfFinished()
      {
          if (!$this->pending && !$this->iterable->valid()) {
              // Resolve the promise if there's nothing left to do.
              $this->aggregate->resolve(null);
              return true;
          }

          return false;
      }
  }