Jetpack_Sync_Queue

A persistent queue that can be flushed in increments of N items, and which blocks reads until checked-out buffers are checked in or closed.

Defined (1)

The class is defined in the following location(s).

/sync/class.jetpack-sync-queue.php  
  1. class Jetpack_Sync_Queue { 
  2. public $id; 
  3. private $row_iterator; 
  4.  
  5. function __construct( $id ) { 
  6. $this->id = str_replace( '-', '_', $id ); // necessary to ensure we don't have ID collisions in the SQL 
  7. $this->row_iterator = 0; 
  8. $this->random_int = mt_rand( 1, 1000000 ); 
  9.  
  10. function add( $item ) { 
  11. global $wpdb; 
  12. $added = false; 
  13. // this basically tries to add the option until enough time has elapsed that 
  14. // it has a unique (microtime-based) option key 
  15. while ( ! $added ) { 
  16. $rows_added = $wpdb->query( $wpdb->prepare( 
  17. "INSERT INTO $wpdb->options (option_name, option_value, autoload) VALUES (%s, %s, %s)",  
  18. $this->get_next_data_row_option_name(),  
  19. serialize( $item ),  
  20. 'no' 
  21. ) ); 
  22. $added = ( 0 !== $rows_added ); 
  23.  
  24. // Attempts to insert all the items in a single SQL query. May be subject to query size limits! 
  25. function add_all( $items ) { 
  26. global $wpdb; 
  27. $base_option_name = $this->get_next_data_row_option_name(); 
  28.  
  29. $query = "INSERT INTO $wpdb->options (option_name, option_value, autoload) VALUES "; 
  30.  
  31. $rows = array(); 
  32.  
  33. for ( $i = 0; $i < count( $items ); $i += 1 ) { 
  34. $option_name = esc_sql( $base_option_name . '-' . $i ); 
  35. $option_value = esc_sql( serialize( $items[ $i ] ) ); 
  36. $rows[] = "('$option_name', '$option_value', 'no')"; 
  37.  
  38. $rows_added = $wpdb->query( $query . join( ', ', $rows ) ); 
  39.  
  40. if ( count( $items ) === $rows_added ) { 
  41. return new WP_Error( 'row_count_mismatch', "The number of rows inserted didn't match the size of the input array" ); 
  42.  
  43. // Peek at the front-most item on the queue without checking it out 
  44. function peek( $count = 1 ) { 
  45. $items = $this->fetch_items( $count ); 
  46. if ( $items ) { 
  47. return Jetpack_Sync_Utils::get_item_values( $items ); 
  48.  
  49. return array(); 
  50.  
  51. // lag is the difference in time between the age of the oldest item 
  52. // (aka first or frontmost item) and the current time 
  53. function lag() { 
  54. global $wpdb; 
  55.  
  56. $first_item_name = $wpdb->get_var( $wpdb->prepare( 
  57. "SELECT option_name FROM $wpdb->options WHERE option_name LIKE %s ORDER BY option_name ASC LIMIT 1",  
  58. "jpsq_{$this->id}-%" 
  59. ) ); 
  60.  
  61. if ( ! $first_item_name ) { 
  62. return 0; 
  63.  
  64. // break apart the item name to get the timestamp 
  65. $matches = null; 
  66. if ( preg_match( '/^jpsq_' . $this->id . '-(\d+\.\d+)-/', $first_item_name, $matches ) ) { 
  67. return microtime( true ) - floatval( $matches[1] ); 
  68. } else { 
  69. return 0; 
  70.  
  71. function reset() { 
  72. global $wpdb; 
  73. $this->delete_checkout_id(); 
  74. $wpdb->query( $wpdb->prepare( 
  75. "DELETE FROM $wpdb->options WHERE option_name LIKE %s", "jpsq_{$this->id}-%" 
  76. ) ); 
  77.  
  78. function size() { 
  79. global $wpdb; 
  80.  
  81. return (int) $wpdb->get_var( $wpdb->prepare( 
  82. "SELECT count(*) FROM $wpdb->options WHERE option_name LIKE %s", "jpsq_{$this->id}-%" 
  83. ) ); 
  84.  
  85. // we use this peculiar implementation because it's much faster than count(*) 
  86. function has_any_items() { 
  87. global $wpdb; 
  88. $value = $wpdb->get_var( $wpdb->prepare( 
  89. "SELECT exists( SELECT option_name FROM $wpdb->options WHERE option_name LIKE %s )", "jpsq_{$this->id}-%" 
  90. ) ); 
  91.  
  92. return ( $value === '1' ); 
  93.  
  94. function checkout( $buffer_size ) { 
  95. if ( $this->get_checkout_id() ) { 
  96. return new WP_Error( 'unclosed_buffer', 'There is an unclosed buffer' ); 
  97.  
  98. $buffer_id = uniqid(); 
  99.  
  100. $result = $this->set_checkout_id( $buffer_id ); 
  101.  
  102. if ( ! $result || is_wp_error( $result ) ) { 
  103. return $result; 
  104.  
  105. $items = $this->fetch_items( $buffer_size ); 
  106.  
  107. if ( count( $items ) === 0 ) { 
  108. return false; 
  109.  
  110. $buffer = new Jetpack_Sync_Queue_Buffer( $buffer_id, array_slice( $items, 0, $buffer_size ) ); 
  111.  
  112. return $buffer; 
  113.  
  114. // this checks out rows until it either empties the queue or hits a certain memory limit 
  115. // it loads the sizes from the DB first so that it doesn't accidentally 
  116. // load more data into memory than it needs to. 
  117. // The only way it will load more items than $max_size is if a single queue item 
  118. // exceeds the memory limit, but in that case it will send that item by itself. 
  119. function checkout_with_memory_limit( $max_memory, $max_buffer_size = 500 ) { 
  120. if ( $this->get_checkout_id() ) { 
  121. return new WP_Error( 'unclosed_buffer', 'There is an unclosed buffer' ); 
  122.  
  123. $buffer_id = uniqid(); 
  124.  
  125. $result = $this->set_checkout_id( $buffer_id ); 
  126.  
  127. if ( ! $result || is_wp_error( $result ) ) { 
  128. return $result; 
  129.  
  130. // get the map of buffer_id -> memory_size 
  131. global $wpdb; 
  132.  
  133. $items_with_size = $wpdb->get_results( 
  134. $wpdb->prepare( 
  135. "SELECT option_name AS id, LENGTH(option_value) AS value_size FROM $wpdb->options WHERE option_name LIKE %s ORDER BY option_name ASC LIMIT %d",  
  136. "jpsq_{$this->id}-%",  
  137. $max_buffer_size 
  138. ),  
  139. OBJECT 
  140. ); 
  141.  
  142. if ( count( $items_with_size ) === 0 ) { 
  143. return false; 
  144.  
  145. $total_memory = 0; 
  146.  
  147. $min_item_id = $max_item_id = $items_with_size[0]->id; 
  148.  
  149. foreach ( $items_with_size as $id => $item_with_size ) { 
  150. $total_memory += $item_with_size->value_size; 
  151.  
  152. // if this is the first item and it exceeds memory, allow loop to continue 
  153. // we will exit on the next iteration instead 
  154. if ( $total_memory > $max_memory && $id > 0 ) { 
  155. break; 
  156.  
  157. $max_item_id = $item_with_size->id; 
  158.  
  159. $query = $wpdb->prepare(  
  160. "SELECT option_name AS id, option_value AS value FROM $wpdb->options WHERE option_name >= %s and option_name <= %s ORDER BY option_name ASC",  
  161. $min_item_id,  
  162. $max_item_id 
  163. ); 
  164.  
  165. $items = $wpdb->get_results( $query, OBJECT ); 
  166. foreach ( $items as $item ) { 
  167. $item->value = maybe_unserialize( $item->value ); 
  168.  
  169. if ( count( $items ) === 0 ) { 
  170. $this->delete_checkout_id(); 
  171.  
  172. return false; 
  173.  
  174. $buffer = new Jetpack_Sync_Queue_Buffer( $buffer_id, $items ); 
  175.  
  176. return $buffer; 
  177.  
  178. function checkin( $buffer ) { 
  179. $is_valid = $this->validate_checkout( $buffer ); 
  180.  
  181. if ( is_wp_error( $is_valid ) ) { 
  182. return $is_valid; 
  183.  
  184. $this->delete_checkout_id(); 
  185.  
  186. return true; 
  187.  
  188. function close( $buffer, $ids_to_remove = null ) { 
  189. $is_valid = $this->validate_checkout( $buffer ); 
  190.  
  191. if ( is_wp_error( $is_valid ) ) { 
  192. return $is_valid; 
  193.  
  194. $this->delete_checkout_id(); 
  195.  
  196. // by default clear all items in the buffer 
  197. if ( is_null( $ids_to_remove ) ) { 
  198. $ids_to_remove = $buffer->get_item_ids(); 
  199.  
  200. global $wpdb; 
  201.  
  202. if ( count( $ids_to_remove ) > 0 ) { 
  203. $sql = "DELETE FROM $wpdb->options WHERE option_name IN (" . implode( ', ', array_fill( 0, count( $ids_to_remove ), '%s' ) ) . ')'; 
  204. $query = call_user_func_array( array( $wpdb, 'prepare' ), array_merge( array( $sql ), $ids_to_remove ) ); 
  205. $wpdb->query( $query ); 
  206.  
  207. return true; 
  208.  
  209. function flush_all() { 
  210. $items = Jetpack_Sync_Utils::get_item_values( $this->fetch_items() ); 
  211. $this->reset(); 
  212.  
  213. return $items; 
  214.  
  215. function get_all() { 
  216. return $this->fetch_items(); 
  217.  
  218. // use with caution, this could allow multiple processes to delete 
  219. // and send from the queue at the same time 
  220. function force_checkin() { 
  221. $this->delete_checkout_id(); 
  222.  
  223. // used to lock checkouts from the queue. 
  224. // tries to wait up to $timeout seconds for the queue to be empty 
  225. function lock( $timeout = 30 ) { 
  226. $tries = 0; 
  227.  
  228. while ( $this->has_any_items() && $tries < $timeout ) { 
  229. sleep( 1 ); 
  230. $tries += 1; 
  231.  
  232. if ( $tries === 30 ) { 
  233. return new WP_Error( 'lock_timeout', 'Timeout waiting for sync queue to empty' ); 
  234.  
  235. if ( $this->get_checkout_id() ) { 
  236. return new WP_Error( 'unclosed_buffer', 'There is an unclosed buffer' ); 
  237.  
  238. // hopefully this means we can acquire a checkout? 
  239. $result = $this->set_checkout_id( 'lock' ); 
  240.  
  241. if ( ! $result || is_wp_error( $result ) ) { 
  242. return $result; 
  243.  
  244. return true; 
  245.  
  246. function unlock() { 
  247. return $this->delete_checkout_id(); 
  248.  
  249. private function get_checkout_id() { 
  250. global $wpdb; 
  251. $checkout_value = $wpdb->get_var(  
  252. $wpdb->prepare( 
  253. "SELECT option_value FROM $wpdb->options WHERE option_name = %s",  
  254. $this->get_lock_option_name() 
  255. ); 
  256.  
  257. if ( $checkout_value ) { 
  258. list( $checkout_id, $timestamp ) = explode( ':', $checkout_value ); 
  259. if ( intval( $timestamp ) > time() ) { 
  260. return $checkout_id; 
  261.  
  262. return false; 
  263.  
  264. private function set_checkout_id( $checkout_id ) { 
  265. global $wpdb; 
  266.  
  267. $expires = time() + Jetpack_Sync_Defaults::$default_sync_queue_lock_timeout; 
  268. $updated_num = $wpdb->query( 
  269. $wpdb->prepare( 
  270. "UPDATE $wpdb->options SET option_value = %s WHERE option_name = %s",  
  271. "$checkout_id:$expires",  
  272. $this->get_lock_option_name() 
  273. ); 
  274.  
  275. if ( ! $updated_num ) { 
  276. $updated_num = $wpdb->query( 
  277. $wpdb->prepare( 
  278. "INSERT INTO $wpdb->options ( option_name, option_value, autoload ) VALUES ( %s, %s, 'no' )",  
  279. $this->get_lock_option_name(),  
  280. "$checkout_id:$expires" 
  281. ); 
  282.  
  283. return $updated_num; 
  284.  
  285. private function delete_checkout_id() { 
  286. global $wpdb; 
  287. // rather than delete, which causes fragmentation, we update in place 
  288. return $wpdb->query( 
  289. $wpdb->prepare(  
  290. "UPDATE $wpdb->options SET option_value = %s WHERE option_name = %s",  
  291. "0:0",  
  292. $this->get_lock_option_name()  
  293. )  
  294. ); 
  295.  
  296.  
  297. private function get_lock_option_name() { 
  298. return "jpsq_{$this->id}_checkout"; 
  299.  
  300. private function get_next_data_row_option_name() { 
  301. // this option is specifically chosen to, as much as possible, preserve time order 
  302. // and minimise the possibility of collisions between multiple processes working 
  303. // at the same time 
  304. // TODO: confirm we only need to support PHP 5.05+ (otherwise we'll need to emulate microtime as float, and avoid PHP_INT_MAX) 
  305. // @see: http://php.net/manual/en/function.microtime.php 
  306. $timestamp = sprintf( '%.6f', microtime( true ) ); 
  307.  
  308. // row iterator is used to avoid collisions where we're writing data waaay fast in a single process 
  309. if ( $this->row_iterator === PHP_INT_MAX ) { 
  310. $this->row_iterator = 0; 
  311. } else { 
  312. $this->row_iterator += 1; 
  313.  
  314. return 'jpsq_' . $this->id . '-' . $timestamp . '-' . $this->random_int . '-' . $this->row_iterator; 
  315.  
  316. private function fetch_items( $limit = null ) { 
  317. global $wpdb; 
  318.  
  319. if ( $limit ) { 
  320. $query_sql = $wpdb->prepare( "SELECT option_name AS id, option_value AS value FROM $wpdb->options WHERE option_name LIKE %s ORDER BY option_name ASC LIMIT %d", "jpsq_{$this->id}-%", $limit ); 
  321. } else { 
  322. $query_sql = $wpdb->prepare( "SELECT option_name AS id, option_value AS value FROM $wpdb->options WHERE option_name LIKE %s ORDER BY option_name ASC", "jpsq_{$this->id}-%" ); 
  323.  
  324. $items = $wpdb->get_results( $query_sql, OBJECT ); 
  325. foreach ( $items as $item ) { 
  326. $item->value = maybe_unserialize( $item->value ); 
  327.  
  328. return $items; 
  329.  
  330. private function validate_checkout( $buffer ) { 
  331. if ( ! $buffer instanceof Jetpack_Sync_Queue_Buffer ) { 
  332. return new WP_Error( 'not_a_buffer', 'You must checkin an instance of Jetpack_Sync_Queue_Buffer' ); 
  333.  
  334. $checkout_id = $this->get_checkout_id(); 
  335.  
  336. if ( ! $checkout_id ) { 
  337. return new WP_Error( 'buffer_not_checked_out', 'There are no checked out buffers' ); 
  338.  
  339. if ( $checkout_id != $buffer->id ) { 
  340. return new WP_Error( 'buffer_mismatch', 'The buffer you checked in was not checked out' ); 
  341.  
  342. return true;