Jetpack_Sync_Sender

This class grabs pending actions from the queue and sends them.

Defined (1)

The class is defined in the following location(s).

/sync/class.jetpack-sync-sender.php  
  /**
   * Grabs pending actions from the sync queues and sends them.
   *
   * The sender owns two queues (created in set_defaults() with the ids 'sync'
   * and 'full_sync'). For each queue it keeps a "next sync time" option that
   * throttles how soon the queue may be processed again.
   */
  class Jetpack_Sync_Sender {

      /**
       * Prefix of the option holding the earliest time a queue may sync again.
       * The queue name is appended after an underscore.
       */
      const NEXT_SYNC_TIME_OPTION_NAME = 'jetpack_next_sync_time';

      /** Seconds to wait after a sync attempt failed with any error other than 'unclosed_buffer'. */
      const WPCOM_ERROR_SYNC_DELAY = 60;

      /** Seconds to wait after a sync attempt failed with the 'unclosed_buffer' error. */
      const QUEUE_LOCKED_SYNC_DELAY = 10;

      /** Suffix of the throttle option used by continue_full_sync_enqueue(). */
      const FULL_SYNC_ENQUEUE_NAME = 'full-sync-enqueue';

      // First argument handed to the queue's checkout_with_memory_limit().
      private $dequeue_max_bytes;
      // Upper bound for the accumulated encoded size of one batch.
      private $upload_max_bytes;
      // Second argument handed to the queue's checkout_with_memory_limit().
      private $upload_max_rows;
      // Seconds get_items_to_send() may spend preparing items before it stops.
      private $max_dequeue_time;
      // Seconds to pause a queue after a sync that exceeded $sync_wait_threshold.
      private $sync_wait_time;
      // Duration in seconds above which a sync counts as slow.
      private $sync_wait_threshold;
      // Seconds between two full sync enqueue passes.
      private $enqueue_wait_time;
      // Queue created with the id 'sync'.
      private $sync_queue;
      // Queue created with the id 'full_sync'.
      private $full_sync_queue;
      // Codec used to encode items before they are sent.
      private $codec;

      // singleton functions
      private static $instance;

      /**
       * Returns the shared sender, creating it on first use.
       *
       * @return Jetpack_Sync_Sender
       */
      public static function get_instance() {
          if ( null === self::$instance ) {
              self::$instance = new self();
          }

          return self::$instance;
      }

      // this is necessary because you can't use "new" when you declare instance properties >:(
      protected function __construct() {
          $this->set_defaults();
          $this->init();
      }

      /**
       * Gives every registered sync module the chance to set itself up for sending.
       */
      private function init() {
          foreach ( Jetpack_Sync_Modules::get_modules() as $module ) {
              $module->init_before_send();
          }
      }

      /**
       * Reads the earliest time at which the named queue may sync again.
       *
       * @param string $queue_name Suffix identifying the queue.
       * @return double Stored timestamp, or 0 when the option is not set.
       */
      public function get_next_sync_time( $queue_name ) {
          return (double) get_option( self::NEXT_SYNC_TIME_OPTION_NAME . '_' . $queue_name, 0 );
      }

      /**
       * Stores the earliest time at which the named queue may sync again.
       *
       * @param int|double $time       Timestamp to store.
       * @param string     $queue_name Suffix identifying the queue.
       * @return mixed Whatever update_option() returns.
       */
      public function set_next_sync_time( $time, $queue_name ) {
          return update_option( self::NEXT_SYNC_TIME_OPTION_NAME . '_' . $queue_name, $time, true );
      }

      /**
       * Continues enqueuing full sync items, then sends from the full sync queue.
       *
       * @return mixed Result of do_sync_and_set_delays() for the full sync queue.
       */
      public function do_full_sync() {
          $this->continue_full_sync_enqueue();

          return $this->do_sync_and_set_delays( $this->full_sync_queue );
      }

      /**
       * Asks the 'full-sync' module to continue enqueuing, unless an import is
       * running or the enqueue throttle has not expired yet.
       *
       * @return false|null False when skipped; no value after an enqueue pass.
       */
      private function continue_full_sync_enqueue() {
          if ( defined( 'WP_IMPORTING' ) && WP_IMPORTING ) {
              return false;
          }

          if ( $this->get_next_sync_time( self::FULL_SYNC_ENQUEUE_NAME ) > microtime( true ) ) {
              return false;
          }

          Jetpack_Sync_Modules::get_module( 'full-sync' )->continue_enqueuing();

          $this->set_next_sync_time( time() + $this->get_enqueue_wait_time(), self::FULL_SYNC_ENQUEUE_NAME );
      }

      /**
       * Sends pending items from the regular sync queue.
       *
       * @return mixed Result of do_sync_and_set_delays() for the sync queue.
       */
      public function do_sync() {
          return $this->do_sync_and_set_delays( $this->sync_queue );
      }

      /**
       * Syncs one queue and records how long the queue has to wait afterwards.
       *
       * @param object $queue Queue to process; its `id` names the throttle option.
       * @return bool False when nothing was synced or the attempt failed,
       *              otherwise the result of do_sync_for_queue().
       */
      public function do_sync_and_set_delays( $queue ) {
          // don't sync if importing
          if ( defined( 'WP_IMPORTING' ) && WP_IMPORTING ) {
              return false;
          }

          // don't sync if we are throttled
          if ( $this->get_next_sync_time( $queue->id ) > microtime( true ) ) {
              return false;
          }

          $start_time = microtime( true );

          Jetpack_Sync_Settings::set_is_syncing( true );
          $sync_result = $this->do_sync_for_queue( $queue );
          Jetpack_Sync_Settings::set_is_syncing( false );

          $elapsed                      = microtime( true ) - $start_time;
          $exceeded_sync_wait_threshold = $elapsed > (double) $this->get_sync_wait_threshold();

          if ( is_wp_error( $sync_result ) ) {
              // a locked queue gets the short delay, any other failure the long one
              $delay = ( 'unclosed_buffer' === $sync_result->get_error_code() )
                  ? self::QUEUE_LOCKED_SYNC_DELAY
                  : self::WPCOM_ERROR_SYNC_DELAY;

              $this->set_next_sync_time( time() + $delay, $queue->id );

              // callers only ever see a boolean, never the WP_Error itself
              $sync_result = false;
          } elseif ( $exceeded_sync_wait_threshold ) {
              // if we actually sent data and it took a while, wait before sending again
              $this->set_next_sync_time( time() + $this->get_sync_wait_time(), $queue->id );
          }

          return $sync_result;
      }

      /**
       * Prepares the items of a checked-out buffer for upload.
       *
       * Each item is passed through its 'jetpack_sync_before_send_<action>' filter.
       * Items whose filtered arguments are exactly false are skipped. Preparation
       * stops early once the byte or time budget is used up.
       *
       * @param object $buffer Buffer whose get_items() yields the queued items.
       * @param bool   $encode Whether the returned items are codec-encoded.
       * @return array Four elements: items to send keyed by item id, ids of the
       *               skipped items, the untouched items from the buffer, and the
       *               seconds spent preparing.
       */
      public function get_items_to_send( $buffer, $encode = true ) {
          // track how long we've been processing so we can avoid request timeouts
          $start_time    = microtime( true );
          $upload_size   = 0;
          $items_to_send = array();
          $items         = $buffer->get_items();

          // set up current screen to avoid errors rendering content
          require_once( ABSPATH . 'wp-admin/includes/class-wp-screen.php' );
          require_once( ABSPATH . 'wp-admin/includes/screen.php' );
          set_current_screen( 'sync' );

          $skipped_items_ids = array();

          // we estimate the total encoded size as we go by encoding each item individually
          // this is expensive, but the only way to really know :/
          foreach ( $items as $key => $item ) {
              // Suspending cache addition help prevent overloading in memory cache of large sites.
              wp_suspend_cache_addition( true );
              /**
               * Modify the data within an action before it is serialized and sent to the server
               * For example, during full sync this expands Post ID's into full Post objects,
               * so that we don't have to serialize the whole object into the queue.
               *
               * @since 4.2.0
               *
               * @param array The action parameters
               * @param int The ID of the user who triggered the action
               */
              $item[1] = apply_filters( 'jetpack_sync_before_send_' . $item[0], $item[1], $item[2] );
              wp_suspend_cache_addition( false );

              if ( false === $item[1] ) {
                  $skipped_items_ids[] = $key;
                  continue;
              }

              // Always measure the encoded form: with $encode disabled the item is
              // an array, and strlen() must never be called on an array.
              $encoded_item = $this->codec->encode( $item );
              $upload_size += strlen( $encoded_item );

              // stop at the size limit, but always let at least one item through
              if ( $upload_size > $this->upload_max_bytes && count( $items_to_send ) > 0 ) {
                  break;
              }

              $items_to_send[ $key ] = $encode ? $encoded_item : $item;

              if ( microtime( true ) - $start_time > $this->max_dequeue_time ) {
                  break;
              }
          }

          return array( $items_to_send, $skipped_items_ids, $items, microtime( true ) - $start_time );
      }

      /**
       * Checks out a buffer from the queue, sends it and closes or returns it.
       *
       * @param object $queue Queue to send from.
       * @return bool|WP_Error False when there was nothing to send, true when the
       *                       buffer was sent and closed, a WP_Error when the
       *                       checkout or the send failed.
       */
      public function do_sync_for_queue( $queue ) {

          do_action( 'jetpack_sync_before_send_queue_' . $queue->id );
          if ( $queue->size() === 0 ) {
              return false;
          }

          // now that we're sure we are about to sync, try to
          // ignore user abort so we can avoid getting into a
          // bad state
          if ( function_exists( 'ignore_user_abort' ) ) {
              ignore_user_abort( true );
          }

          $checkout_start_time = microtime( true );

          $buffer = $queue->checkout_with_memory_limit( $this->dequeue_max_bytes, $this->upload_max_rows );

          if ( ! $buffer ) {
              // buffer has no items
              return false;
          }

          if ( is_wp_error( $buffer ) ) {
              return $buffer;
          }

          $checkout_duration = microtime( true ) - $checkout_start_time;

          list( $items_to_send, $skipped_items_ids, $items, $preprocess_duration ) = $this->get_items_to_send( $buffer, true );

          Jetpack_Sync_Settings::set_is_sending( true );
          /**
           * Fires when data is ready to send to the server.
           * Return false or WP_Error to abort the sync (e.g. if there's an error)
           * The items will be automatically re-sent later
           *
           * @since 4.2.0
           *
           * @param array  $data                The action buffer
           * @param string $codec               The codec name used to encode the data
           * @param double $time                The current time
           * @param string $queue               The queue used to send ('sync' or 'full_sync')
           * @param double $checkout_duration   Seconds spent checking out the buffer
           * @param double $preprocess_duration Seconds spent preparing the items
           */
          $processed_item_ids = apply_filters( 'jetpack_sync_send_data', $items_to_send, $this->codec->name(), microtime( true ), $queue->id, $checkout_duration, $preprocess_duration );
          Jetpack_Sync_Settings::set_is_sending( false );

          if ( ! $processed_item_ids || is_wp_error( $processed_item_ids ) ) {
              // nothing was accepted: hand the buffer back so the items are retried
              $checked_in_item_ids = $queue->checkin( $buffer );

              if ( is_wp_error( $checked_in_item_ids ) ) {
                  error_log( 'Error checking in buffer: ' . $checked_in_item_ids->get_error_message() );
                  $queue->force_checkin();
              }

              if ( is_wp_error( $processed_item_ids ) ) {
                  return $processed_item_ids;
              }

              // returning a WP_Error is a sign to the caller that we should wait a while
              // before syncing again
              return new WP_Error( 'server_error' );
          }

          // detect if the last item ID was an error
          $wp_error     = null;
          $had_wp_error = is_wp_error( end( $processed_item_ids ) );
          if ( $had_wp_error ) {
              $wp_error = array_pop( $processed_item_ids );
          }

          // also checkin any items that were skipped
          if ( count( $skipped_items_ids ) > 0 ) {
              $processed_item_ids = array_merge( $processed_item_ids, $skipped_items_ids );
          }

          $processed_items = array_intersect_key( $items, array_flip( $processed_item_ids ) );

          /**
           * Allows us to keep track of all the actions that have been sent.
           * Allows us to calculate the progress of specific actions.
           *
           * @since 4.2.0
           *
           * @param array $processed_actions The actions that we send successfully.
           */
          do_action( 'jetpack_sync_processed_actions', $processed_items );

          $queue->close( $buffer, $processed_item_ids );

          // returning a WP_Error is a sign to the caller that we should wait a while
          // before syncing again
          if ( $had_wp_error ) {
              return $wp_error;
          }

          return true;
      }

      /** @return object The queue created with the id 'sync'. */
      public function get_sync_queue() {
          return $this->sync_queue;
      }

      /** @return object The queue created with the id 'full_sync'. */
      public function get_full_sync_queue() {
          return $this->full_sync_queue;
      }

      /** @return object The codec used to encode outgoing items. */
      public function get_codec() {
          return $this->codec;
      }

      /**
       * Fires 'jetpack_sync_checksum' with the replicastore's checksum_all() result.
       */
      public function send_checksum() {
          require_once 'class.jetpack-sync-wp-replicastore.php';
          $store = new Jetpack_Sync_WP_Replicastore();
          do_action( 'jetpack_sync_checksum', $store->checksum_all() );
      }

      /** Resets the regular sync queue. */
      public function reset_sync_queue() {
          $this->sync_queue->reset();
      }

      /** Resets the full sync queue. */
      public function reset_full_sync_queue() {
          $this->full_sync_queue->reset();
      }

      /** @param int $size Limit passed to the queue when checking out a buffer. */
      public function set_dequeue_max_bytes( $size ) {
          $this->dequeue_max_bytes = $size;
      }

      // in bytes
      public function set_upload_max_bytes( $max_bytes ) {
          $this->upload_max_bytes = $max_bytes;
      }

      // in rows
      public function set_upload_max_rows( $max_rows ) {
          $this->upload_max_rows = $max_rows;
      }

      // in seconds
      public function set_sync_wait_time( $seconds ) {
          $this->sync_wait_time = $seconds;
      }

      /** @return mixed Seconds a queue pauses after a slow sync. */
      public function get_sync_wait_time() {
          return $this->sync_wait_time;
      }

      // in seconds
      public function set_enqueue_wait_time( $seconds ) {
          $this->enqueue_wait_time = $seconds;
      }

      /** @return mixed Seconds between two full sync enqueue passes. */
      public function get_enqueue_wait_time() {
          return $this->enqueue_wait_time;
      }

      // in seconds
      public function set_sync_wait_threshold( $seconds ) {
          $this->sync_wait_threshold = $seconds;
      }

      /** @return mixed Duration in seconds above which a sync counts as slow. */
      public function get_sync_wait_threshold() {
          return $this->sync_wait_threshold;
      }

      // in seconds
      public function set_max_dequeue_time( $seconds ) {
          $this->max_dequeue_time = $seconds;
      }

      /**
       * Creates the queues and the codec and loads the limits from the saved
       * sync settings.
       */
      public function set_defaults() {
          $this->sync_queue      = new Jetpack_Sync_Queue( 'sync' );
          $this->full_sync_queue = new Jetpack_Sync_Queue( 'full_sync' );
          $this->codec           = new Jetpack_Sync_JSON_Deflate_Array_Codec();

          // saved settings
          // NOTE(review): presumably clears a cached "importing" state - confirm in Jetpack_Sync_Settings
          Jetpack_Sync_Settings::set_importing( null );
          $settings = Jetpack_Sync_Settings::get_settings();
          $this->set_dequeue_max_bytes( $settings['dequeue_max_bytes'] );
          $this->set_upload_max_bytes( $settings['upload_max_bytes'] );
          $this->set_upload_max_rows( $settings['upload_max_rows'] );
          $this->set_sync_wait_time( $settings['sync_wait_time'] );
          $this->set_enqueue_wait_time( $settings['enqueue_wait_time'] );
          $this->set_sync_wait_threshold( $settings['sync_wait_threshold'] );
          $this->set_max_dequeue_time( Jetpack_Sync_Defaults::get_max_sync_execution_time() );
      }

      /**
       * Resets both queues, every sync module, all throttle options and the
       * sync settings.
       */
      public function reset_data() {
          $this->reset_sync_queue();
          $this->reset_full_sync_queue();

          foreach ( Jetpack_Sync_Modules::get_modules() as $module ) {
              $module->reset_data();
          }

          // the enqueue throttle written by continue_full_sync_enqueue() has to go as well
          $throttle_names = array( 'sync', 'full_sync', self::FULL_SYNC_ENQUEUE_NAME );
          foreach ( $throttle_names as $queue_name ) {
              delete_option( self::NEXT_SYNC_TIME_OPTION_NAME . '_' . $queue_name );
          }

          Jetpack_Sync_Settings::reset_data();
      }

      /**
       * Removes everything the sender stored and unschedules the sync cron hooks.
       */
      public function uninstall() {
          // Lets delete all the other fun stuff like transient and option and the sync queue
          $this->reset_data();

          // delete the full sync status
          delete_option( 'jetpack_full_sync_status' );

          // clear the sync cron.
          wp_clear_scheduled_hook( 'jetpack_sync_cron' );
          wp_clear_scheduled_hook( 'jetpack_sync_full_cron' );
      }
  }