CEP-45: Lost witness marker race #4892
Don't truncate journal segments until the witnessed offsets they contain are flushed. Also moves MutationTrackingService startup to after the commit log is replayed.
frankgh left a comment:
I've added a couple of comments, but the patch looks good in general.
| ); | ||
| pendingClearReplaySize = Metrics.register( | ||
| factory.createMetricName("PendingClearReplaySize"), | ||
| () -> MutationJournal.instance().pendingClearReplaySize() |
Do we want to worry about the case where MT is disabled and maybe handle the IllegalStateException thrown when the instance is null?
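One way to address this, as a minimal sketch rather than the patch's actual code: wrap the gauge's supplier so that an `IllegalStateException` from an uninitialized singleton yields a fallback value instead of propagating to the metrics reporter. The class `GuardedGaugeSketch` and the helper `guarded` are hypothetical names used only for illustration; the real gauge would delegate to `MutationJournal.instance().pendingClearReplaySize()`.

```java
import java.util.function.LongSupplier;

// Hypothetical helper, not part of the patch: shields a gauge from a
// singleton accessor that throws when the service was never started.
final class GuardedGaugeSketch
{
    /** Returns a supplier that reports {@code fallback} when the delegate throws IllegalStateException. */
    static LongSupplier guarded(LongSupplier delegate, long fallback)
    {
        return () -> {
            try
            {
                return delegate.getAsLong();
            }
            catch (IllegalStateException e)
            {
                // Service disabled or not yet initialized: report the fallback.
                return fallback;
            }
        };
    }

    public static void main(String[] args)
    {
        // Stand-ins for a disabled and an enabled service (assumptions for the demo).
        LongSupplier disabled = () -> { throw new IllegalStateException("not initialized"); };
        LongSupplier enabled = () -> 42L;

        System.out.println(guarded(disabled, 0L).getAsLong());
        System.out.println(guarded(enabled, 0L).getAsLong());
    }
}
```

Whether a fallback of zero is the right reading for a disabled service, or whether the gauge should simply not be registered in that case, is a design choice this sketch does not settle.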
| // opaque / immutable list of segments that we should clear the needs-replay flag on | ||
| public static class PendingClearReplay | ||
| { | ||
| private ImmutableSet<Long> segments; |
NIT: can we make this final?
| private ImmutableSet<Long> segments; | |
| private final ImmutableSet<Long> segments; |
| private void shutdownBlocking() throws InterruptedException | ||
| { | ||
| ClusterMetadataService.instance().log().removeListener(tcmListener); |
should we conditionally remove the listener here, only if the service was started?
| boolean wasStarted; | |
| synchronized (this) | |
| { | |
| wasStarted = started; | |
| if (wasStarted) | |
| ClusterMetadataService.instance().log().removeListener(tcmListener); | |
| } |
additionally, we need to synchronize for access to the started volatile variable.
no, it's just a noop if the tcmListener isn't registered.
| executor.awaitTermination(1, TimeUnit.MINUTES); | ||
| // attempt to persist offsets and mark segments as | ||
| // not needing replay one last time before shutdown | ||
| if (started) |
| if (started) | |
| if (wasStarted) |
| ClusterMetadataService.instance().log().removeListener(tcmListener); | ||
| activeReconciler.shutdownBlocking(); | ||
| executor.shutdown(); | ||
| executor.awaitTermination(1, TimeUnit.MINUTES); |
Should we log if we fail to shut down here?
| executor.awaitTermination(1, TimeUnit.MINUTES); | |
| if (!executor.awaitTermination(1, TimeUnit.MINUTES)) | |
| { | |
| logger.warn("Mutation tracking executor did not terminate within 1 minute; forcing shutdown"); | |
| } |
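The suggested warning says "forcing shutdown", but the suggestion itself only logs. A minimal, self-contained sketch of what a forced shutdown could look like follows, using only standard `java.util.concurrent` calls. The class `ExecutorShutdownSketch` and the method `shutdownAndAwait` are hypothetical names, and `System.err` stands in for the project's logger.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of the patch.
final class ExecutorShutdownSketch
{
    /**
     * Requests an orderly shutdown and waits up to the timeout.
     * Returns true on clean termination; otherwise warns, calls shutdownNow() and returns false.
     */
    static boolean shutdownAndAwait(ExecutorService executor, long timeout, TimeUnit unit)
    {
        executor.shutdown(); // stop accepting new tasks, let queued ones run
        try
        {
            if (executor.awaitTermination(timeout, unit))
                return true;
            System.err.println("Executor did not terminate within " + timeout + ' ' + unit + "; forcing shutdown");
        }
        catch (InterruptedException e)
        {
            // Preserve the interrupt for callers, then fall through to the forced path.
            Thread.currentThread().interrupt();
        }
        executor.shutdownNow(); // interrupt running tasks, drop queued ones
        return false;
    }

    public static void main(String[] args)
    {
        ExecutorService idle = Executors.newSingleThreadExecutor();
        System.out.println(shutdownAndAwait(idle, 1, TimeUnit.SECONDS));
    }
}
```

If interrupting in-flight work is not acceptable during shutdown, the alternative is to keep the log-only behavior and reword the message so it does not claim a forced shutdown.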
| * To improve startup, we periodically save our view of mutation ids that we've witnessed to disk as part of this | ||
| * class. Any ids witnessed since the last time this class was run are reconstructed by replaying the journal. | ||
| * | ||
| * However, if an sstable is flushed is after the most recent LogStatePersister run, AND it marks a segment as no |
NIT:
| * However, if an sstable is flushed is after the most recent LogStatePersister run, AND it marks a segment as no | |
| * However, if an sstable is flushed after the most recent LogStatePersister run, AND it marks a segment as no |
| TableMetadata table = Schema.instance.getTableMetadata(keyspaceName, tableName); | ||
| DecoratedKey dk = Murmur3Partitioner.instance.decorateKey(ByteBufferUtil.bytes(key)); | ||
| MutationSummary summary = MutationTrackingService.instance().createSummaryForKey(dk, table.id, false); | ||
| if (summary.size() == 0) |
NIT:
| if (summary.size() == 0) | |
| if (summary.isEmpty()) |