Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 92 additions & 0 deletions src/sources/gcp_cloud_storage/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,19 @@ pub struct PubsubConfig {
#[configurable(metadata(docs::type_unit = "tasks"))]
#[configurable(metadata(docs::examples = 5))]
pub client_concurrency: Option<NonZeroUsize>,

/// The acknowledgement deadline, in seconds, to set for pulled messages.
///
/// After pulling a batch, Vector extends the deadline (via `modifyAckDeadline`) to this
/// value so that slow processing — including downstream sink batch latency — does not let
/// the lease expire and trigger redelivery before the message is acknowledged. This
/// decouples processing time from the subscription's static `ackDeadlineSeconds`. Clamped
/// to the Pub/Sub-allowed range of 10 to 600 seconds; total processing of a pulled batch
/// must complete within this deadline.
#[serde(default = "default_ack_deadline_secs")]
#[derivative(Default(value = "default_ack_deadline_secs()"))]
#[configurable(metadata(docs::type_unit = "seconds"))]
pub ack_deadline_secs: u32,
}

fn default_pubsub_endpoint() -> String {
Expand All @@ -114,6 +127,16 @@ const fn default_max_messages() -> u32 {
10
}

const fn default_ack_deadline_secs() -> u32 {
// Mirror the aws_s3 source's effective visibility timeout (its floor is 600s and the
// Pub/Sub maximum is also 600s), so a pulled batch has the same lease behavior.
600
}

/// Bounds for the maintained ack deadline, per the Pub/Sub API.
const MIN_ACK_DEADLINE_SECS: u32 = 10;
const MAX_ACK_DEADLINE_SECS: u32 = 600;

const fn default_true() -> bool {
true
}
Expand Down Expand Up @@ -169,6 +192,12 @@ pub enum ProcessingError {

#[snafu(display("Pub/Sub acknowledge returned HTTP {}: {}", status, body))]
AcknowledgeMessagesHttp { status: u16, body: String },

#[snafu(display("Failed to modify Pub/Sub ack deadline: {}", source))]
ModifyAckDeadline { source: crate::http::HttpError },

#[snafu(display("Pub/Sub modifyAckDeadline returned HTTP {}: {}", status, body))]
ModifyAckDeadlineHttp { status: u16, body: String },
}

pub struct State {
Expand All @@ -188,6 +217,7 @@ pub struct State {
client_concurrency: usize,
delete_message: bool,
delete_failed_message: bool,
ack_deadline_secs: u32,
decoder: Decoder,
}

Expand Down Expand Up @@ -232,6 +262,9 @@ impl Ingestor {
.unwrap_or_else(crate::num_threads),
delete_message: config.delete_message,
delete_failed_message: config.delete_failed_message,
ack_deadline_secs: config
.ack_deadline_secs
.clamp(MIN_ACK_DEADLINE_SECS, MAX_ACK_DEADLINE_SECS),
decoder,
});

Expand Down Expand Up @@ -353,6 +386,25 @@ impl IngestorProcess {
return Ok(false);
}

// Extend the ack deadline for the whole pulled batch up front (like the aws_s3
// source sets its SQS visibility timeout at receive time) so that slow processing —
// including downstream sink batch latency — does not let the lease expire and trigger
// redelivery before we acknowledge. Best-effort: a failure here is non-fatal; the
// message simply retains the subscription's static deadline.
if self.state.delete_message {
let all_ack_ids: Vec<String> = messages.iter().map(|m| m.ack_id.clone()).collect();
if let Err(err) = self
.modify_ack_deadline(&all_ack_ids, self.state.ack_deadline_secs)
.await
{
warn!(
message = "Failed to extend Pub/Sub ack deadline; messages may be redelivered.",
%err,
count = all_ack_ids.len(),
);
}
}

let mut ack_ids = Vec::new();
for message in &messages {
let message_id = message
Expand Down Expand Up @@ -694,6 +746,46 @@ impl IngestorProcess {
Ok(pull_response.received_messages.unwrap_or_default())
}

async fn modify_ack_deadline(
&self,
ack_ids: &[String],
deadline_secs: u32,
) -> Result<(), ProcessingError> {
let url = format!(
"{}/v1/projects/{}/subscriptions/{}:modifyAckDeadline",
self.state.pubsub_endpoint, self.state.project, self.state.subscription
);

let body = serde_json::json!({
"ackIds": ack_ids,
"ackDeadlineSeconds": deadline_secs,
});

let mut request = http::Request::post(&url)
.header("content-type", "application/json")
.body(hyper::Body::from(serde_json::to_vec(&body).unwrap()))
.expect("building Pub/Sub modifyAckDeadline request should not fail");
self.state.auth.apply(&mut request);

let response = self
.state
.http_client
.send(request)
.await
.map_err(|source| ProcessingError::ModifyAckDeadline { source })?;

if !response.status().is_success() {
let status = response.status().as_u16();
let body_bytes = body_to_bytes(response.into_body())
.await
.unwrap_or_default();
let body = error_body_string(&body_bytes);
return Err(ProcessingError::ModifyAckDeadlineHttp { status, body });
}

Ok(())
}

async fn acknowledge_messages(&self, ack_ids: &[String]) -> Result<(), ProcessingError> {
let url = format!(
"{}/v1/projects/{}/subscriptions/{}:acknowledge",
Expand Down
Loading