preserve time order
	 * and minimise the possibility of collisions between multiple processes working
	 * at the same time.
	 *
	 * @return string
	 */
	protected function generate_option_name_timestamp() {
		// Fixed six-decimal formatting of the current microtime.
		return sprintf( '%.6f', microtime( true ) );
	}

	/**
	 * Gets the checkout ID.
	 *
	 * Reads the lock option, whose value is written by set_checkout_id() as
	 * "<checkout id>:<expiry unix timestamp>", and returns the ID only while
	 * the expiry timestamp is still in the future.
	 *
	 * @return bool|string The checkout ID, or false when no unexpired checkout is stored.
	 */
	private function get_checkout_id() {
		global $wpdb;

		$checkout_value = $wpdb->get_var(
			$wpdb->prepare(
				"SELECT option_value FROM $wpdb->options WHERE option_name = %s",
				$this->get_lock_option_name()
			)
		);

		if ( $checkout_value ) {
			// Pad to two elements so a malformed value without a ':' separator is
			// treated as expired (timestamp 0) instead of raising an undefined offset error.
			list( $checkout_id, $timestamp ) = array_pad( explode( ':', $checkout_value ), 2, 0 );
			if ( (int) $timestamp > time() ) {
				return $checkout_id;
			}
		}

		return false;
	}

	/**
	 * Sets the checkout id.
	 *
	 * Stores "<checkout id>:<expiry>" in the lock option, where the expiry is the
	 * current time plus Defaults::$default_sync_queue_lock_timeout. An UPDATE is
	 * attempted first; when it reports nothing updated, the row is INSERTed with
	 * autoload set to 'no'.
	 *
	 * @param string $checkout_id The ID of the checkout.
	 *
	 * @return bool|int The result of the last `$wpdb->query()` call that was run.
	 */
	private function set_checkout_id( $checkout_id ) {
		global $wpdb;

		$expires     = time() + Defaults::$default_sync_queue_lock_timeout;
		$updated_num = $wpdb->query(
			$wpdb->prepare(
				"UPDATE $wpdb->options SET option_value = %s WHERE option_name = %s",
				"$checkout_id:$expires",
				$this->get_lock_option_name()
			)
		);

		// Nothing was updated, so fall back to creating the lock row.
		if ( ! $updated_num ) {
			$updated_num = $wpdb->query(
				$wpdb->prepare(
					"INSERT INTO $wpdb->options ( option_name, option_value, autoload ) VALUES ( %s, %s, 'no' )",
					$this->get_lock_option_name(),
					"$checkout_id:$expires"
				)
			);
		}

		return $updated_num;
	}

	/**
	 * Deletes the checkout ID.
	 *
	 * The lock row is not removed; its value is overwritten with '0:0', which
	 * get_checkout_id() reads as "no checkout" because the timestamp part is never
	 * in the future.
	 *
	 * @return bool|int The result of the `$wpdb->query()` call.
	 */
	private function delete_checkout_id() {
		global $wpdb;
		// Rather than delete, which causes fragmentation, we update in place.
		return $wpdb->query(
			$wpdb->prepare(
				"UPDATE $wpdb->options SET option_value = %s WHERE option_name = %s",
				'0:0',
				$this->get_lock_option_name()
			)
		);
	}

	/**
	 * Return the lock option name.
	 *
	 * @return string The option name, in the form "jpsq_<queue id>_checkout".
	 */
	private function get_lock_option_name() {
		return "jpsq_{$this->id}_checkout";
	}

	/**
	 * Return the next data row option name.
*
	 * The name is built as "jpsq_<queue id>-<timestamp>-<random int>-<row iterator>".
	 *
	 * @return string
	 */
	private function get_next_data_row_option_name() {
		$timestamp = $this->generate_option_name_timestamp();

		// Row iterator is used to avoid collisions where we're writing data waaay fast in a single process.
		if ( PHP_INT_MAX === $this->row_iterator ) {
			// Wrap around instead of incrementing past the largest integer.
			$this->row_iterator = 0;
		} else {
			$this->row_iterator += 1;
		}

		return 'jpsq_' . $this->id . '-' . $timestamp . '-' . $this->random_int . '-' . $this->row_iterator;
	}

	/**
	 * Return the items in the queue.
	 *
	 * Selects every option whose name starts with "jpsq_<queue id>-", ordered by
	 * option name ascending, and unserializes each value.
	 *
	 * @param null|int $limit Limit to the number of items we fetch at once. A falsy value fetches everything.
	 *
	 * @return array|object|null
	 */
	private function fetch_items( $limit = null ) {
		global $wpdb;

		// Escape LIKE wildcards in the literal prefix: without this the `_` in `jpsq_`
		// (and any `_` or `%` inside the queue id) would match arbitrary characters.
		// The trailing `%` is the only intended wildcard.
		$name_pattern = $wpdb->esc_like( "jpsq_{$this->id}-" ) . '%';

		if ( $limit ) {
			$items = $wpdb->get_results(
				$wpdb->prepare(
					"SELECT option_name AS id, option_value AS value FROM $wpdb->options WHERE option_name LIKE %s ORDER BY option_name ASC LIMIT %d",
					$name_pattern,
					$limit
				),
				OBJECT
			);
		} else {
			$items = $wpdb->get_results(
				$wpdb->prepare(
					"SELECT option_name AS id, option_value AS value FROM $wpdb->options WHERE option_name LIKE %s ORDER BY option_name ASC",
					$name_pattern
				),
				OBJECT
			);
		}

		return $this->unserialize_values( $items );
	}

	/**
	 * Return items with specific ids.
	 *
	 * @param array $items_ids Array of event ids (option names).
	 *
	 * @return array|object|null Null when `$items_ids` is empty or not an array.
	 */
	private function fetch_items_by_id( $items_ids ) {
		global $wpdb;

		// return early if $items_ids is empty or not an array.
		if ( empty( $items_ids ) || ! is_array( $items_ids ) ) {
			return null;
		}

		// One `%s` placeholder per id, so every id is passed through prepare().
		$ids_placeholders        = implode( ', ', array_fill( 0, count( $items_ids ), '%s' ) );
		$query_with_placeholders = "SELECT option_name AS id, option_value AS value
				FROM $wpdb->options
				WHERE option_name IN ( $ids_placeholders )";
		$items                   = $wpdb->get_results(
			$wpdb->prepare(
				$query_with_placeholders, // phpcs:ignore WordPress.DB.PreparedSQL.NotPrepared
				$items_ids
			),
			OBJECT
		);

		return $this->unserialize_values( $items );
	}

	/**
	 * Unserialize item values.
	 *
	 * @param array $items Events from the Queue to be unserialized.
*
	 * @return mixed The given items with each `value` passed through maybe_unserialize();
	 *               input that is neither an array nor an object is returned untouched.
	 */
	private function unserialize_values( $items ) {
		// The fetch methods document a possible null result; array_walk() cannot take
		// that, so hand such values straight back instead of erroring.
		if ( ! is_array( $items ) && ! is_object( $items ) ) {
			return $items;
		}

		array_walk(
			$items,
			function ( $item ) {
				// Rows are objects, so this assignment updates the row held in $items.
				$item->value = maybe_unserialize( $item->value );
			}
		);

		return $items;
	}

	/**
	 * Return true if the buffer is still valid or an Error otherwise.
	 *
	 * The buffer is valid when it is a Queue_Buffer, an unexpired checkout exists,
	 * and the stored checkout ID matches the buffer's ID.
	 *
	 * @param Automattic\Jetpack\Sync\Queue_Buffer $buffer The Queue_Buffer.
	 *
	 * @return bool|\WP_Error True when valid; a WP_Error with code 'not_a_buffer',
	 *                        'buffer_not_checked_out' or 'buffer_mismatch' otherwise.
	 */
	private function validate_checkout( $buffer ) {
		if ( ! $buffer instanceof Queue_Buffer ) {
			return new \WP_Error( 'not_a_buffer', 'You must checkin an instance of Automattic\\Jetpack\\Sync\\Queue_Buffer' );
		}

		$checkout_id = $this->get_checkout_id();

		if ( ! $checkout_id ) {
			return new \WP_Error( 'buffer_not_checked_out', 'There are no checked out buffers' );
		}

		// The stored ID was written via string interpolation in set_checkout_id(), so compare
		// both sides as strings with a strict operator. A loose comparison would treat
		// numeric-looking IDs as numbers (e.g. "1e3" == "1000") and could accept the wrong buffer.
		if ( (string) $checkout_id !== (string) $buffer->id ) {
			return new \WP_Error( 'buffer_mismatch', 'The buffer you checked in was not checked out' );
		}

		return true;
	}
}