Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/Storages/IStorageCluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,8 @@ void IStorageCluster::read(
{
auto remote_initiator_cluster_name = settings[Setting::object_storage_remote_initiator_cluster].value;
if (remote_initiator_cluster_name.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Setting 'object_storage_remote_initiator' can be used only with 'object_storage_remote_initiator_cluster' or 'object_storage_cluster'");
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Setting 'object_storage_remote_initiator' can be used only with 'object_storage_remote_initiator_cluster', 'object_storage_cluster', or cluster name in arguments");

/// rewrite query to execute `remote('remote_host', s3(...))`
/// remote_host can execute query itself or make on-cluster query depends on own `object_storage_cluster` setting
Expand Down
15 changes: 14 additions & 1 deletion src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ namespace Setting
extern const SettingsObjectStorageGranularityLevel cluster_table_function_split_granularity;
extern const SettingsBool parallel_replicas_for_cluster_engines;
extern const SettingsString object_storage_cluster;
extern const SettingsBool object_storage_remote_initiator;
extern const SettingsString object_storage_remote_initiator_cluster;
extern const SettingsInt64 delta_lake_snapshot_start_version;
extern const SettingsInt64 delta_lake_snapshot_end_version;
extern const SettingsUInt64 lock_object_storage_task_distribution_ms;
Expand All @@ -46,6 +48,7 @@ namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int INVALID_SETTING_VALUE;
extern const int BAD_ARGUMENT;
}

String StorageObjectStorageCluster::getPathSample(ContextPtr context)
Expand Down Expand Up @@ -719,9 +722,19 @@ String StorageObjectStorageCluster::getClusterName(ContextPtr context) const
QueryProcessingStage::Enum StorageObjectStorageCluster::getQueryProcessingStage(
ContextPtr context, QueryProcessingStage::Enum to_stage, const StorageSnapshotPtr & storage_snapshot, SelectQueryInfo & query_info) const
{
if (!isClusterSupported())
return QueryProcessingStage::Enum::FetchColumns;

/// Full query if fall back to pure storage.
if (getClusterName(context).empty())
if (getClusterName(context).empty() // Not cluster request
&& context->getSettingsRef()[Setting::object_storage_remote_initiator_cluster].value.empty()) // Not request with remote initiator
{
if (context->getSettingsRef()[Setting::object_storage_remote_initiator])
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Setting 'object_storage_remote_initiator' can be used only with 'object_storage_remote_initiator_cluster', 'object_storage_cluster', or cluster name in arguments");

return QueryProcessingStage::Enum::FetchColumns;
}

/// Distributed storage.
return IStorageCluster::getQueryProcessingStage(context, to_stage, storage_snapshot, query_info);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,10 @@ void TableFunctionObjectStorageClusterFallback<Definition, Base>::parseArguments
const auto & settings = context->getSettingsRef();

is_cluster_function = !settings[Setting::object_storage_cluster].value.empty() && typename Base::Configuration().isClusterSupported();
is_remote = settings[Setting::object_storage_remote_initiator];
// Remote initiator requires 'object_storage_cluster' or 'object_storage_remote_initiator_cluster'
is_remote = settings[Setting::object_storage_remote_initiator]
&& (!settings[Setting::object_storage_cluster].value.empty()
|| !settings[Setting::object_storage_remote_initiator_cluster].value.empty());
Comment on lines +121 to +123
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Preserve validation for missing remote initiator cluster

When object_storage_remote_initiator=1 is set on the fallback s3/object-storage table functions but both object_storage_cluster and object_storage_remote_initiator_cluster are empty, this now makes is_remote false and routes the query through BaseSimple. That bypasses the existing IStorageCluster::read validation that raises BAD_ARGUMENTS for this invalid configuration, so a misconfigured query silently runs locally instead of failing or using a remote initiator.

Useful? React with 👍 / 👎.

Comment on lines +121 to +123
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Preserve validation for bare remote initiator requests

When object_storage_remote_initiator=1 is set on the fallback s3/object-storage table function without object_storage_cluster or object_storage_remote_initiator_cluster, this new guard leaves is_remote false, so executeImpl builds the simple storage and bypasses the validation added in StorageObjectStorageCluster/IStorageCluster. That makes an invalid remote-initiator request run locally instead of raising the documented exception, which is easy to hit when the companion cluster setting is omitted or misspelled.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

object_storage_remote_initiator_cluster without cluster name or object_storage_remote_initiator_cluster call an exception before.


if (is_cluster_function)
{
Expand Down
75 changes: 75 additions & 0 deletions tests/integration/test_s3_cluster/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1509,3 +1509,78 @@ def test_object_storage_remote_initiator_without_cluster_function(started_cluste
assert users[1:] == ["s0_0_0\tdefault",
"s0_0_1\tfoo",
"s0_1_0\tfoo"]


def test_object_storage_remote_initiator_aggregation(started_cluster):
node = started_cluster.instances["s0_0_0"]

# Remove initiator without cluster request
# Check that aggregation works on nodes
query_id = uuid.uuid4().hex

result = node.query(
f"""
SELECT sum(value) from s3(
'http://minio1:9001/root/data/{{clickhouse,database}}/*', 'minio', '{minio_secret_key}', 'CSV',
'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')
SETTINGS
object_storage_remote_initiator=1,
object_storage_remote_initiator_cluster='cluster_with_dots_and_user'
""",
query_id = query_id,
)

assert result == "67802152770\n"

node.query("SYSTEM FLUSH LOGS ON CLUSTER 'cluster_all'")
result_rows = node.query(
f"""
SELECT sum(result_rows)
FROM clusterAllReplicas('cluster_all', system.query_log)
WHERE type='QueryFinish' AND initial_query_id='{query_id}'
AND is_initial_query = 0
ORDER BY ALL
FORMAT TSV
"""
).splitlines()

# Data processed on cluster 'hidden_cluster_with_username_and_password'.
# Cluster contains two nodes, each returns one row.
assert result_rows == ["2"]

# Remove initiator without cluster request
# Check that aggregation works on nodes
query_id = uuid.uuid4().hex

result = node.query(
f"""
SELECT value % 2 as bit, sum(value) from s3(
'http://minio1:9001/root/data/{{clickhouse,database}}/*', 'minio', '{minio_secret_key}', 'CSV',
'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')
GROUP BY bit
ORDER BY bit
SETTINGS
object_storage_remote_initiator=1,
object_storage_remote_initiator_cluster='cluster_with_dots_and_user'
""",
query_id = query_id,
)

assert result == "0\t41117771522\n1\t26684381248\n"

node.query("SYSTEM FLUSH LOGS ON CLUSTER 'cluster_all'")
result_rows = node.query(
f"""
SELECT sum(result_rows)
FROM clusterAllReplicas('cluster_all', system.query_log)
WHERE type='QueryFinish' AND initial_query_id='{query_id}'
AND is_initial_query = 0
ORDER BY ALL
FORMAT TSV
"""
).splitlines()

# Data processed on cluster 'hidden_cluster_with_username_and_password'.
# Cluster contains two nodes, each returns up to two rows, at least two rows totaly.
result_rows = int(result_rows[0])
assert result_rows >= 2 and result_rows <= 4
Loading