diff --git a/src/sources/gcp_cloud_storage/pubsub.rs b/src/sources/gcp_cloud_storage/pubsub.rs
index 83776ce3426cd..6e801d1b86186 100644
--- a/src/sources/gcp_cloud_storage/pubsub.rs
+++ b/src/sources/gcp_cloud_storage/pubsub.rs
@@ -100,6 +100,19 @@ pub struct PubsubConfig {
     #[configurable(metadata(docs::type_unit = "tasks"))]
     #[configurable(metadata(docs::examples = 5))]
     pub client_concurrency: Option,
+
+    /// The acknowledgement deadline, in seconds, to set for pulled messages.
+    ///
+    /// After pulling a batch, Vector extends the deadline (via `modifyAckDeadline`) to this
+    /// value so that slow processing — including downstream sink batch latency — does not let
+    /// the lease expire and trigger redelivery before the message is acknowledged. This
+    /// decouples processing time from the subscription's static `ackDeadlineSeconds`. Clamped
+    /// to the Pub/Sub-allowed range of 10 to 600 seconds; total processing of a pulled batch
+    /// must complete within this deadline.
+    #[serde(default = "default_ack_deadline_secs")]
+    #[derivative(Default(value = "default_ack_deadline_secs()"))]
+    #[configurable(metadata(docs::type_unit = "seconds"))]
+    pub ack_deadline_secs: u32,
 }
 
 fn default_pubsub_endpoint() -> String {
@@ -114,6 +127,16 @@ const fn default_max_messages() -> u32 {
     10
 }
 
+const fn default_ack_deadline_secs() -> u32 {
+    // Mirror the aws_s3 source's effective visibility timeout (its floor is 600s and the
+    // Pub/Sub maximum is also 600s), so a pulled batch has the same lease behavior.
+    600
+}
+
+/// Bounds for the maintained ack deadline, per the Pub/Sub API.
+const MIN_ACK_DEADLINE_SECS: u32 = 10;
+const MAX_ACK_DEADLINE_SECS: u32 = 600;
+
 const fn default_true() -> bool {
     true
 }
@@ -169,6 +192,12 @@ pub enum ProcessingError {
 
     #[snafu(display("Pub/Sub acknowledge returned HTTP {}: {}", status, body))]
     AcknowledgeMessagesHttp { status: u16, body: String },
+
+    #[snafu(display("Failed to modify Pub/Sub ack deadline: {}", source))]
+    ModifyAckDeadline { source: crate::http::HttpError },
+
+    #[snafu(display("Pub/Sub modifyAckDeadline returned HTTP {}: {}", status, body))]
+    ModifyAckDeadlineHttp { status: u16, body: String },
 }
 
 pub struct State {
@@ -188,6 +217,7 @@ pub struct State {
     client_concurrency: usize,
     delete_message: bool,
     delete_failed_message: bool,
+    ack_deadline_secs: u32,
     decoder: Decoder,
 }
 
@@ -232,6 +262,9 @@ impl Ingestor {
                 .unwrap_or_else(crate::num_threads),
             delete_message: config.delete_message,
             delete_failed_message: config.delete_failed_message,
+            ack_deadline_secs: config
+                .ack_deadline_secs
+                .clamp(MIN_ACK_DEADLINE_SECS, MAX_ACK_DEADLINE_SECS),
             decoder,
         });
 
@@ -353,6 +386,25 @@ impl IngestorProcess {
             return Ok(false);
         }
 
+        // Extend the ack deadline for the whole pulled batch up front (like the aws_s3
+        // source sets its SQS visibility timeout at receive time) so that slow processing —
+        // including downstream sink batch latency — does not let the lease expire and trigger
+        // redelivery before we acknowledge. Best-effort: a failure here is non-fatal; the
+        // message simply retains the subscription's static deadline.
+        if self.state.delete_message {
+            let all_ack_ids: Vec<String> = messages.iter().map(|m| m.ack_id.clone()).collect();
+            if let Err(err) = self
+                .modify_ack_deadline(&all_ack_ids, self.state.ack_deadline_secs)
+                .await
+            {
+                warn!(
+                    message = "Failed to extend Pub/Sub ack deadline; messages may be redelivered.",
+                    %err,
+                    count = all_ack_ids.len(),
+                );
+            }
+        }
+
         let mut ack_ids = Vec::new();
         for message in &messages {
             let message_id = message
@@ -694,6 +746,59 @@ impl IngestorProcess {
         Ok(pull_response.received_messages.unwrap_or_default())
     }
 
+    /// Sets the ack deadline of the given messages through the Pub/Sub `modifyAckDeadline`
+    /// REST method of the configured subscription.
+    ///
+    /// `ack_ids` is sent as `ackIds` and `deadline_secs` as `ackDeadlineSeconds`.
+    ///
+    /// # Errors
+    ///
+    /// Returns `ProcessingError::ModifyAckDeadline` when the request cannot be sent and
+    /// `ProcessingError::ModifyAckDeadlineHttp` (status code and response body) when the
+    /// response status is not a success.
+    async fn modify_ack_deadline(
+        &self,
+        ack_ids: &[String],
+        deadline_secs: u32,
+    ) -> Result<(), ProcessingError> {
+        let url = format!(
+            "{}/v1/projects/{}/subscriptions/{}:modifyAckDeadline",
+            self.state.pubsub_endpoint, self.state.project, self.state.subscription
+        );
+
+        let body = serde_json::json!({
+            "ackIds": ack_ids,
+            "ackDeadlineSeconds": deadline_secs,
+        });
+        // A `serde_json::Value` has only string keys, so this serialization should not fail.
+        let payload = serde_json::to_vec(&body)
+            .expect("serializing Pub/Sub modifyAckDeadline body should not fail");
+
+        let mut request = http::Request::post(&url)
+            .header("content-type", "application/json")
+            .body(hyper::Body::from(payload))
+            .expect("building Pub/Sub modifyAckDeadline request should not fail");
+        self.state.auth.apply(&mut request);
+
+        let response = self
+            .state
+            .http_client
+            .send(request)
+            .await
+            .map_err(|source| ProcessingError::ModifyAckDeadline { source })?;
+
+        if !response.status().is_success() {
+            let status = response.status().as_u16();
+            let body_bytes = body_to_bytes(response.into_body())
+                .await
+                .unwrap_or_default();
+            let body = error_body_string(&body_bytes);
+            return Err(ProcessingError::ModifyAckDeadlineHttp { status, body });
+        }
+
+        Ok(())
+    }
+
     async fn acknowledge_messages(&self, ack_ids: &[String]) -> Result<(), ProcessingError> {
         let url = format!(
             "{}/v1/projects/{}/subscriptions/{}:acknowledge",