@Override publicvoidrun(SourceContext<T> ctx)throws Exception { if (ownedTopicStarts.isEmpty()) { ctx.markAsTemporarilyIdle(); }
log.info("Source {} creating fetcher with offsets {}", taskIndex, StringUtils.join(ownedTopicStarts.entrySet()));
// from this point forward: // - 'snapshotState' will draw offsets from the fetcher, // instead of being built from `subscribedPartitionsToStartOffsets` // - 'notifyCheckpointComplete' will start to do work (i.e. commit offsets to // Pulsar through the fetcher, if configured to do so)
/** * This method is called as a notification once a distributed checkpoint has been completed. * * Note that any exception during this method will not cause the checkpoint to * fail any more. * * @param checkpointId The ID of the checkpoint that has been completed. * @throws Exception */ voidnotifyCheckpointComplete(long checkpointId)throws Exception;