import java.util.function.Consumer; import com.azure.core.amqp.AmqpTransportType; import com.azure.messaging.eventhubs.EventData; import com.azure.messaging.eventhubs.EventHubClientBuilder; import com.azure.messaging.eventhubs.EventProcessorClient; import com.azure.messaging.eventhubs.EventProcessorClientBuilder; import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore; import com.azure.messaging.eventhubs.models.ErrorContext; import com.azure.messaging.eventhubs.models.EventContext; import com.azure.messaging.eventhubs.models.PartitionContext; import com.azure.storage.blob.BlobContainerAsyncClient; import com.azure.storage.blob.BlobContainerClientBuilder; import com.azure.storage.blob.BlobServiceClient; import com.azure.storage.blob.BlobServiceClientBuilder; import com.azure.storage.blob.BlobContainerClient; import com.azure.storage.common.*; //import com.squareup.okhttp; //import com.azure.storage.blob.*; public class EventHubValidation{ public static void main(String[] args) throws Exception{ def connectionString = "**********"; def eventHubName = "***********"; def storageConnectionString = "***********"; def storageContainerName = *********** String accountName = System.getenv("***********"); String accountKey = System.getenv("***********"); String endpoint = "https://" + accountName + ".blob.core.windows.net"; String containerName = "***********"; //String blobName = "iris.csv"; // Create a SharedKeyCredential //StorageSharedKeyCredential credential = new StorageSharedKeyCredential(accountName, accountKey); // Create a blobServiceClient BlobServiceClient blobServiceClient = new BlobServiceClientBuilder() .endpoint(endpoint) .buildClient(); // Create a containerClient BlobContainerClient blobContainerClient = blobServiceClient.getBlobContainerClient(containerName); BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder() .connectionString(storageConnectionString) .containerName(storageContainerName) .buildAsyncClient(); // Create a builder object that you will use later to build an event processor client to receive and process events and errors. EventProcessorClientBuilder eventProcessorClientBuilder = new EventProcessorClientBuilder() .connectionString(connectionString, eventHubName) .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME) .processEvent(PARTITION_PROCESSOR) .processError(ERROR_HANDLER) .transportType(AmqpTransportType.AMQP_WEB_SOCKETS) .checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient)); // Use the builder object to create an event processor client EventProcessorClient eventProcessorClient = eventProcessorClientBuilder.buildEventProcessorClient(); println("Starting event processor"); eventProcessorClient.start(); println("Press enter to stop."); System.in.read(); println("Stopping event processor"); eventProcessorClient.stop(); println("Event processor stopped."); println("Exiting process"); }; public static final Consumer PARTITION_PROCESSOR = eventContext -> { PartitionContext partitionContext = eventContext.getPartitionContext(); EventData eventData = eventContext.getEventData(); printf("Processing event from partition %s with sequence number %d with body: %s%n", partitionContext.getPartitionId(), eventData.getSequenceNumber(), eventData.getBodyAsString()); // Every 10 events received, it will update the checkpoint stored in Azure Blob Storage. if (eventData.getSequenceNumber() % 10 == 0) { eventContext.updateCheckpoint(); } }; public static final Consumer ERROR_HANDLER = errorContext -> { printf("Error occurred in partition processor for partition %s, %s.%n", errorContext.getPartitionContext().getPartitionId(), errorContext.getThrowable()); }; }