diff --git a/CHANGELOG.md b/CHANGELOG.md index c7c538c..de93189 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,16 @@ Please mark backwards incompatible changes with an exclamation mark at the start ## [Unreleased] +### Deprecated +- The `#task_by_id` method of the `Elasticsearch::Client` class. + +### Added +- The `#all` method to `JayAPI::Elasticsearch::Tasks`. The method returns the + status of all running tasks on the Elasticsearch cluster. +- The `#tasks` method to `JayAPI::Elasticsearch::Client`. The method returns an + instance of `JayAPI::Elasticsearch::Tasks`, which gives the user access to the + status of the tasks running on the Elasticsearch cluster. + ## [29.5.0] - 2026-02-23 ### Fixed diff --git a/lib/jay_api/elasticsearch.rb b/lib/jay_api/elasticsearch.rb index 22588f3..ed65435 100644 --- a/lib/jay_api/elasticsearch.rb +++ b/lib/jay_api/elasticsearch.rb @@ -8,6 +8,7 @@ require_relative 'elasticsearch/index' require_relative 'elasticsearch/indexes' require_relative 'elasticsearch/indices' +require_relative 'elasticsearch/mixins' require_relative 'elasticsearch/query_builder' require_relative 'elasticsearch/query_results' require_relative 'elasticsearch/response' diff --git a/lib/jay_api/elasticsearch/client.rb b/lib/jay_api/elasticsearch/client.rb index 0a1cc18..799427e 100644 --- a/lib/jay_api/elasticsearch/client.rb +++ b/lib/jay_api/elasticsearch/client.rb @@ -1,12 +1,12 @@ # frozen_string_literal: true require 'timeout' -require 'elasticsearch/transport/transport/errors' require 'faraday/error' require 'forwardable' -require_relative '../abstract/connection' +require_relative 'mixins/retriable_requests' require_relative 'stats' +require_relative 'tasks' module JayAPI module Elasticsearch @@ -17,23 +17,7 @@ module Elasticsearch class Client extend Forwardable - # The errors that, if raised, must cause a retry of the connection. - ERRORS = [ - ::Elasticsearch::Transport::Transport::ServerError, - Faraday::TimeoutError - ].freeze - - # Subclasses of the +Elasticsearch::Transport::Transport::ServerError+ - # for which a retry doesn't make sense. - NON_RETRIABLE_ERRORS = [ - ::Elasticsearch::Transport::Transport::Errors::BadRequest, - ::Elasticsearch::Transport::Transport::Errors::Unauthorized, - ::Elasticsearch::Transport::Transport::Errors::Forbidden, - ::Elasticsearch::Transport::Transport::Errors::NotFound, - ::Elasticsearch::Transport::Transport::Errors::MethodNotAllowed, - ::Elasticsearch::Transport::Transport::Errors::RequestEntityTooLarge, - ::Elasticsearch::Transport::Transport::Errors::NotImplemented - ].freeze + include JayAPI::Elasticsearch::Mixins::RetriableRequests attr_reader :transport_client, :logger, :max_attempts, :wait_strategy @@ -88,6 +72,7 @@ def delete_by_query(**args) # parameters. If the request fails, additional retries will be performed. # @see Elasticsearch::Client#tasks for more info about the arguments and # the return value. + # @deprecated Use Tasks#by_id instead. def task_by_id(**args) retry_request { transport_client.tasks.get(**args) } end @@ -98,13 +83,11 @@ def stats @stats ||= ::JayAPI::Elasticsearch::Stats.new(transport_client) end - private - - # @param [Proc] block The block to execute. - # @yieldreturn [Object] Whatever the block returns - def retry_request(&block) - Abstract::Connection.new(max_attempts: max_attempts, wait_strategy: wait_strategy.dup, logger: logger) - .retry(errors: ERRORS, except: NON_RETRIABLE_ERRORS, &block) + # @return [JayAPI::Elasticsearch::Tasks] An instance of the +Tasks+ class, + # which can be used to retrieve the status of the tasks running on the + # Elasticsearch cluster. + def tasks + @tasks ||= ::JayAPI::Elasticsearch::Tasks.new(client: self) end end end diff --git a/lib/jay_api/elasticsearch/mixins.rb b/lib/jay_api/elasticsearch/mixins.rb new file mode 100644 index 0000000..5169bf8 --- /dev/null +++ b/lib/jay_api/elasticsearch/mixins.rb @@ -0,0 +1,10 @@ +# frozen_string_literal: true + +require_relative 'mixins/retriable_requests' + +module JayAPI + module Elasticsearch + # A namespace for Elasticsearch related mixins. + module Mixins; end + end +end diff --git a/lib/jay_api/elasticsearch/mixins/retriable_requests.rb b/lib/jay_api/elasticsearch/mixins/retriable_requests.rb new file mode 100644 index 0000000..3a2b99f --- /dev/null +++ b/lib/jay_api/elasticsearch/mixins/retriable_requests.rb @@ -0,0 +1,79 @@ +# frozen_string_literal: true + +require 'elasticsearch/transport/transport/errors' + +require_relative '../../abstract/connection' + +module JayAPI + module Elasticsearch + module Mixins + # A mixin that allows the including class to retry requests to + # Elasticsearch by leveraging the +Abstract::Connection+ class' + # capabilities. + module RetriableRequests + # The errors that, if raised, must cause a retry of the connection. + RETRIABLE_ERRORS = [ + ::Elasticsearch::Transport::Transport::ServerError, + Faraday::TimeoutError + ].freeze + + # Subclasses of the +Elasticsearch::Transport::Transport::ServerError+ + # for which a retry doesn't make sense. + NON_RETRIABLE_ERRORS = [ + ::Elasticsearch::Transport::Transport::Errors::BadRequest, + ::Elasticsearch::Transport::Transport::Errors::Unauthorized, + ::Elasticsearch::Transport::Transport::Errors::Forbidden, + ::Elasticsearch::Transport::Transport::Errors::NotFound, + ::Elasticsearch::Transport::Transport::Errors::MethodNotAllowed, + ::Elasticsearch::Transport::Transport::Errors::RequestEntityTooLarge, + ::Elasticsearch::Transport::Transport::Errors::NotImplemented + ].freeze + + # @return [Integer] The maximum number of times a request should be + # retried before giving up. + def max_attempts + raise_not_implemented(__method__) + end + + # @return [JayAPI::Elasticsearch::WaitStrategy] The waiting strategy for + # retries. + def wait_strategy + raise_not_implemented(__method__) + end + + # @return [Logging::Logger] A logger to log messages. + def logger + raise_not_implemented(__method__) + end + + # @return [Array] The array of errors that, if raised, must cause + # a retry of the request. + def retriable_errors + RETRIABLE_ERRORS + end + + # @return [Array] An array of subclasses of the + # +Elasticsearch::Transport::Transport::ServerError+ for which a retry + # doesn't make sense. + def non_retriable_errors + NON_RETRIABLE_ERRORS + end + + private + + # Uses the +Abstract::Connection+ class to retry the request enclosed in + # the given block. + def retry_request(&) + Abstract::Connection.new(max_attempts:, wait_strategy: wait_strategy.dup, logger:) + .retry(errors: retriable_errors, except: non_retriable_errors, &) + end + + # @raise [NotImplementedError] Is always raised with the appropriate + # error message. + def raise_not_implemented(method) + raise NotImplementedError, "Please implement the method ##{method} in #{self.class}" + end + end + end + end +end diff --git a/lib/jay_api/elasticsearch/tasks.rb b/lib/jay_api/elasticsearch/tasks.rb index 8f1d84c..4032c3f 100644 --- a/lib/jay_api/elasticsearch/tasks.rb +++ b/lib/jay_api/elasticsearch/tasks.rb @@ -1,22 +1,52 @@ # frozen_string_literal: true require 'active_support' +require 'active_support/core_ext/enumerable' require 'active_support/core_ext/hash/indifferent_access' +require 'forwardable' + +require_relative 'mixins/retriable_requests' module JayAPI module Elasticsearch # Represents Elasticsearch tasks. Returns information about the tasks # currently executing in the cluster. - # TODO: Add #all [JAY-593] class Tasks + extend Forwardable + include ::JayAPI::Elasticsearch::Mixins::RetriableRequests + attr_reader :client + def_delegators :client, :transport_client, :max_attempts, :wait_strategy, :logger + # @param [JayAPI::Elasticsearch::Client] client The Elasticsearch Client # object def initialize(client:) @client = client end + # Gets the list of tasks running on the Elasticsearch cluster. + # For more information about this endpoint and the parameters please see: + # https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-tasks-list + # @param [Array] actions A list of actions. Only tasks matching + # these actions will be returned, if no task matches the result will be + # empty. + # @param [Boolean] detailed Whether or not the result should include task + # details or not. + # @return [Hash] A hash with the list of tasks running on the + # Elasticsearch cluster. + def all(actions: nil, detailed: false) + # Needed because unlike many Elasticsearch methods Tasks#list doesn't + # call #listify over +actions+. + actions = actions&.then do |value| + value.is_a?(Array) ? value.join(',') : value + end + + retry_request do + tasks_client.list({ actions:, detailed: }.compact_blank) + end + end + # Retrieves info about the task with the passed +task_id+ # For more information on how to build the query please refer to the # Elasticsearch DSL documentation: @@ -29,7 +59,17 @@ def initialize(client:) # @raise [Elasticsearch::Transport::Transport::ServerError] If the # query fails. def by_id(task_id) - client.task_by_id(task_id: task_id, wait_for_completion: true).deep_symbolize_keys + retry_request do + tasks_client.get(task_id:, wait_for_completion: true).deep_symbolize_keys + end + end + + private + + # @return [Elasticsearch::API::Tasks::TasksClient] The client used to + # access tasks-related information. + def tasks_client + @tasks_client ||= transport_client.tasks end end end diff --git a/spec/integration/jay_api/elasticsearch/client_spec.rb b/spec/integration/jay_api/elasticsearch/client_spec.rb index 201ed1a..0158ac1 100644 --- a/spec/integration/jay_api/elasticsearch/client_spec.rb +++ b/spec/integration/jay_api/elasticsearch/client_spec.rb @@ -318,4 +318,27 @@ expect(method_call).to be(stats) end end + + describe '#tasks' do + subject(:method_call) { client.tasks } + + let(:tasks) do + instance_double( + JayAPI::Elasticsearch::Tasks + ) + end + + before do + allow(JayAPI::Elasticsearch::Tasks).to receive(:new).and_return(tasks) + end + + it "initializes an instance of JayAPI::Elasticsearch::Tasks and passes a reference to 'self'" do + expect(JayAPI::Elasticsearch::Tasks).to receive(:new).with(client:) + method_call + end + + it 'returns the JayAPI::Elasticsearch::Tasks instance' do + expect(method_call).to be(tasks) + end + end end diff --git a/spec/integration/jay_api/elasticsearch/tasks_spec.rb b/spec/integration/jay_api/elasticsearch/tasks_spec.rb new file mode 100644 index 0000000..a2b4abd --- /dev/null +++ b/spec/integration/jay_api/elasticsearch/tasks_spec.rb @@ -0,0 +1,116 @@ +# frozen_string_literal: true + +require 'jay_api/elasticsearch/tasks' + +RSpec.describe JayAPI::Elasticsearch::Tasks do + subject(:tasks) { described_class.new(client:) } + + let(:transport_client) do + instance_double( + Elasticsearch::Transport::Client + ) + end + + let(:wait_strategy) do + JayAPI::Abstract::ConstantWait.new(wait_interval: 2) + end + + let(:client) do + JayAPI::Elasticsearch::Client.new(transport_client, max_attempts: 10, wait_strategy:) + end + + describe '#by_id' do + subject(:method_call) { tasks.by_id('S13zyUneSa2Brl5XRNoD7Q:170244912') } + + let(:response) do + <<~JSON + { + "completed" : false, + "task" : { + "node" : "S13zyUneSa2Brl5XRNoD7Q", + "id" : 170244912, + "type" : "transport", + "action" : "indices:data/write/delete/byquery", + "status" : { + "total" : 183950, + "updated" : 0, + "created" : 0, + "deleted" : 42000, + "batches" : 43, + "version_conflicts" : 0, + "noops" : 0, + "retries" : { + "bulk" : 0, + "search" : 0 + }, + "throttled_millis" : 0, + "requests_per_second" : -1.0, + "throttled_until_millis" : 0 + }, + "description" : "delete-by-query [xyz01_integration_tests]", + "start_time_in_millis" : 1773061157278, + "running_time_in_nanos" : 8385740150, + "cancellable" : true, + "cancelled" : false, + "headers" : { }, + "resource_stats" : { + "total" : { + "cpu_time_in_nanos" : 0, + "memory_in_bytes" : 0 + } + } + } + } + JSON + end + + let(:expected_hash) do + { + completed: false, + task: { + node: 'S13zyUneSa2Brl5XRNoD7Q', + id: 170_244_912, + type: 'transport', + action: 'indices:data/write/delete/byquery', + status: { + total: 183_950, + updated: 0, + created: 0, + deleted: 42_000, + batches: 43, + version_conflicts: 0, + noops: 0, + retries: { bulk: 0, search: 0 }, + throttled_millis: 0, + requests_per_second: -1.0, + throttled_until_millis: 0 + }, + description: 'delete-by-query [xyz01_integration_tests]', + start_time_in_millis: 1_773_061_157_278, + running_time_in_nanos: 8_385_740_150, + cancellable: true, + cancelled: false, + headers: {}, + resource_stats: { + total: { cpu_time_in_nanos: 0, memory_in_bytes: 0 } + } + } + } + end + + let(:tasks_client) do + instance_double( + Elasticsearch::API::Tasks::TasksClient, + get: JSON.parse(response) + ) + end + + before do + allow(transport_client).to receive(:tasks).and_return(tasks_client) + end + + it 'returns the expected hash' do + expect(method_call).to eq(expected_hash) + end + end +end diff --git a/spec/jay_api/elasticsearch/mixins/retriable_requests_spec.rb b/spec/jay_api/elasticsearch/mixins/retriable_requests_spec.rb new file mode 100644 index 0000000..bacc274 --- /dev/null +++ b/spec/jay_api/elasticsearch/mixins/retriable_requests_spec.rb @@ -0,0 +1,83 @@ +# frozen_string_literal: true + +require 'jay_api/elasticsearch/mixins/retriable_requests' + +RSpec.describe JayAPI::Elasticsearch::Mixins::RetriableRequests do + subject(:test_instance) do + test_class.new + end + + let(:test_class) do + Class.new do + include JayAPI::Elasticsearch::Mixins::RetriableRequests + end + end + + describe '#max_attempts' do + subject(:method_call) { test_instance.max_attempts } + + it 'raises a NotImplementedError' do + expect { method_call }.to raise_error( + NotImplementedError, + include('Please implement the method #max_attempts in # + { 'S13zyUneSa2Brl5XRNoD7Q' => + { 'name' => 'c8f5b1ae733a17bf05b35c66032e72e7', + 'roles' => %w[data ingest master remote_cluster_client], + 'tasks' => + { 'S13zyUneSa2Brl5XRNoD7Q:170466185' => + { 'node' => 'S13zyUneSa2Brl5XRNoD7Q', + 'id' => 170_466_185, + 'type' => 'direct', + 'action' => 'cluster:monitor/tasks/lists[n]', + 'start_time_in_millis' => 1_773_072_735_568, + 'running_time_in_nanos' => 13_000_011, + 'cancellable' => false, + 'cancelled' => false, + 'parent_task_id' => 'S13zyUneSa2Brl5XRNoD7Q:170466184', + 'headers' => {} }, + 'S13zyUneSa2Brl5XRNoD7Q:170466184' => + { 'node' => 'S13zyUneSa2Brl5XRNoD7Q', + 'id' => 170_466_184, + 'type' => 'transport', + 'action' => 'cluster:monitor/tasks/lists', + 'start_time_in_millis' => 1_773_072_735_557, + 'running_time_in_nanos' => 29_102_229, + 'cancellable' => false, + 'cancelled' => false, + 'headers' => {} } } }, + '2MqUhOT_Sdi6ZJ8P04aUtg' => + { 'name' => '224b01c103d31d5f520636719d930944', + 'roles' => %w[data ingest master remote_cluster_client], + 'tasks' => + { '2MqUhOT_Sdi6ZJ8P04aUtg:80324869' => + { 'node' => '2MqUhOT_Sdi6ZJ8P04aUtg', + 'id' => 80_324_869, + 'type' => 'transport', + 'action' => 'cluster:monitor/tasks/lists[n]', + 'start_time_in_millis' => 1_773_072_735_582, + 'running_time_in_nanos' => 23_474_179, + 'cancellable' => false, + 'cancelled' => false, + 'parent_task_id' => 'S13zyUneSa2Brl5XRNoD7Q:170466184', + 'headers' => {} } } } } } + end + + shared_examples_for '#all' do + it 'directly returns the response' do + expect(method_call).to be(transport_response) + end + end + + shared_examples_for '#all when no parameters are given' do + it_behaves_like '#tasks_client' + + it 'forwards the call to the Elasticsearch client, with the expected parameter' do + expect(tasks_client).to receive(:list).with({}) + method_call + end + + it_behaves_like '#all' + end + + context 'when no parameters are given' do + let(:method_params) { {} } + + it_behaves_like '#all when no parameters are given' + end + + context 'when actions are provided as a single string' do + let(:method_params) { { actions: '*forcemerge' } } + + it_behaves_like '#tasks_client' + + it 'forwards the call to the Elasticsearch client, with the expected parameters' do + expect(tasks_client).to receive(:list).with({ actions: '*forcemerge' }) + method_call + end + + it_behaves_like '#all' + end + + context 'when actions are provided as an array of strings' do + let(:method_params) { { actions: %w[*forcemerge *byquery] } } + + it_behaves_like '#tasks_client' + + it 'forwards the call to the Elasticsearch client, with the expected parameters' do + expect(tasks_client).to receive(:list).with({ actions: '*forcemerge,*byquery' }) + method_call + end + + it_behaves_like '#all' + end + + context 'when no tasks match the given actions' do + let(:method_params) { { actions: %w[*ingest] } } + + let(:transport_response) do + { 'nodes' => {} } + end + + it_behaves_like '#tasks_client' + + it 'forwards the call to the Elasticsearch client, with the expected parameters' do + expect(tasks_client).to receive(:list).with({ actions: '*ingest' }) + method_call + end + + it_behaves_like '#all' + end + + context "when 'detailed' is given as false" do + let(:method_params) { { detailed: false } } + + it_behaves_like '#all when no parameters are given' + end + + context "when 'detailed' is given as true" do + let(:method_params) { { detailed: true } } + + it_behaves_like '#tasks_client' + + it 'forwards the call to the Elasticsearch client, with the expected parameters' do + expect(tasks_client).to receive(:list).with({ detailed: true }) + method_call + end + + it_behaves_like '#all' + end + + context "when both 'actions' and 'detailed' are given" do + let(:method_params) { { actions: '*forcemerge', detailed: true } } + + it_behaves_like '#tasks_client' + + it 'forwards the call to the Elasticsearch client, with the expected parameters' do + expect(tasks_client).to receive(:list).with({ actions: '*forcemerge', detailed: true }) + method_call + end + + it_behaves_like '#all' + end + end + describe '#by_id' do subject(:method_call) { tasks.by_id(task_id) } let(:task_id) { 'B5oDyEsHQu2Q-wpbaMSMTg:577388264' } - let(:successful_response) do + let(:transport_response) do { 'completed' => true, 'task' => { @@ -76,12 +251,10 @@ } end - before do - allow(client).to receive(:task_by_id).and_return(successful_response) - end + it_behaves_like '#tasks_client' - it 'relays the command to the Elasticsearch client' do - expect(client).to receive(:task_by_id).with(task_id: task_id, wait_for_completion: true) + it 'uses the TasksClient to fetch the task with the given ID' do + expect(tasks_client).to receive(:get).with(task_id:, wait_for_completion: true).ordered method_call end @@ -160,7 +333,7 @@ end before do - allow(client).to receive(:task_by_id).and_raise(*error) + allow(tasks_client).to receive(:get).and_raise(*error) end it 're-raises the error' do