diff --git a/README.md b/README.md index e4fa1fa..f86abf4 100644 --- a/README.md +++ b/README.md @@ -1,37 +1,35 @@ -# Learning - -This project is a companion repository to the [Apache Kafka Connect course on Udemy](https://links.datacumulus.com/kafka-connect-coupon). - -https://links.datacumulus.com/kafka-connect-coupon - # Kafka Connect Source GitHub -This connector allows you to get a stream of issues and pull requests from your GitHub repository, using the GitHub Api: https://developer.github.com/v3/issues/#list-issues-for-a-repository +This connector allows you to get a stream of issues and pull requests from GitHub repositories, using the GitHub Api: https://developer.github.com/v3/issues/#list-issues-for-a-repository Issues are pulled based on `updated_at` field, meaning any update to an issue or pull request will appear in the stream. The connector writes to topic that is great candidate to demonstrate *log compaction*. It's also a fun way to automate your GitHub workflow. -It's finally aimed to be an educative example to demonstrate how to write a Source Connector a little less trivial than the `FileStreamSourceConnector` provided in Kafka. - -# Contributing - -This connector is not perfect and can be improved, please feel free to submit any PR you deem useful. 
- # Configuration ``` name=GitHubSourceConnectorDemo -tasks.max=1 +tasks.max=2 connector.class=com.simplesteph.kafka.GitHubSourceConnector -topic=github-issues -github.owner=kubernetes -github.repo=kubernetes since.timestamp=2017-01-01T00:00:00Z -# I heavily recommend you set those two fields: -auth.username=your_username -auth.password=your_password +#Pattern to be followed for mentioning repositories owner/repo:topic +github.repos=kubernetes/kubernetes:github-issues-kubernetes,apache/kafka:github-issues-kafka +# I heavily recommend you set auth.accesstoken field: +#auth.username= +#auth.password= +auth.accesstoken=your_accesstoken ``` +Note: **github.repos** should be set and must follow the pattern `owner1/repo1:topic1,owner2/repo2:topic2,...`. + +You can control the number of tasks to run by using **tasks.max**. This allows the work to be divided among tasks, i.e., each task is assigned a few repositories and +fetches the issues for those repositories. + +Set **since.timestamp** to fetch only the issues that have been updated at or after the given timestamp. + +Use either **auth.username** and **auth.password** or only **auth.accesstoken**. Using **auth.accesstoken** is preferable +because authentication with *username* and *password* has been deprecated and will soon no longer be supported by the GitHub API. +To generate a *personal access token*, follow the steps in [https://help.github.com/en/github/authenticating-to-github/creating-a-personal-access-token-for-the-command-line](https://help.github.com/en/github/authenticating-to-github/creating-a-personal-access-token-for-the-command-line) # Running in development @@ -49,4 +47,14 @@ The simplest way to run `run.sh` is to have docker installed. It will pull a Doc Note: Java 8 is required for this connector. -TODO +#### Distributed Mode + +Build the project using `./build.sh`. 
+ +Copy the folder `target/kafka-connect-github-source-1.1-package/share/java/kafka-connect-github-source` (this folder contains all the jars of the project) +into the connect worker's `plugin.path` (set in the connect worker's properties) +directory. The connect worker should then be able to detect `GitHubSourceConnector`. + +# Contributing + +This connector can still be improved a lot; please feel free to submit any PR you deem useful. \ No newline at end of file diff --git a/config/GitHubSourceConnectorExample.properties b/config/GitHubSourceConnectorExample.properties index 6dec2c3..faf9831 100644 --- a/config/GitHubSourceConnectorExample.properties +++ b/config/GitHubSourceConnectorExample.properties @@ -1,10 +1,10 @@ name=GitHubSourceConnectorDemo -tasks.max=1 +tasks.max=2 connector.class=com.simplesteph.kafka.GitHubSourceConnector -topic=github-issues -github.owner=kubernetes -github.repo=kubernetes since.timestamp=2017-01-01T00:00:00Z -# I heavily recommend you set those two fields: -# auth.username=your_username -# auth.password=your_password \ No newline at end of file +#Pattern to be followed for mentioning repositories owner/repo:topic +github.repos=kubernetes/kubernetes:github-issues-kubernetes,apache/kafka:github-issues-kafka +# I heavily recommend you set auth.accesstoken field: +#auth.username= +#auth.password= +auth.accesstoken=your_accesstoken \ No newline at end of file diff --git a/pom.xml b/pom.xml index 1cff8da..999b8db 100644 --- a/pom.xml +++ b/pom.xml @@ -33,11 +33,16 @@ slf4j-log4j12 1.7.25 - - com.mashape.unirest - unirest-java - 1.4.9 + org.json + json + 20190722 + + + + org.apache.httpcomponents + httpclient + 4.5.10 diff --git a/src/main/java/com/simplesteph/kafka/GitHubAPIHttpClient.java b/src/main/java/com/simplesteph/kafka/GitHubAPIHttpClient.java index 3e17a09..cd4113a 100644 --- a/src/main/java/com/simplesteph/kafka/GitHubAPIHttpClient.java +++ b/src/main/java/com/simplesteph/kafka/GitHubAPIHttpClient.java @@ -1,25 +1,33 @@ package 
com.simplesteph.kafka; -import com.mashape.unirest.http.Headers; -import com.mashape.unirest.http.HttpResponse; -import com.mashape.unirest.http.JsonNode; -import com.mashape.unirest.http.Unirest; -import com.mashape.unirest.http.exceptions.UnirestException; -import com.mashape.unirest.request.GetRequest; +import com.simplesteph.kafka.utils.SetBasicAuthUtil; +import com.simplesteph.kafka.utils.SetBearerAuthUtil; +import org.apache.http.Header; +import org.apache.http.HttpResponse; +import org.apache.http.client.HttpClient; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.util.EntityUtils; import org.apache.kafka.connect.errors.ConnectException; import org.json.JSONArray; +import org.json.JSONException; +import org.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.Closeable; +import java.io.IOException; import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneOffset; +import java.util.HashMap; // GitHubHttpAPIClient used to launch HTTP Get requests -public class GitHubAPIHttpClient { +public class GitHubAPIHttpClient implements Closeable { private static final Logger log = LoggerFactory.getLogger(GitHubAPIHttpClient.class); + private HttpClientProvider httpClientProvider; + private HttpClient httpClient; // for efficient http requests private Integer XRateLimit = 9999; private Integer XRateRemaining = 9999; @@ -29,27 +37,44 @@ public class GitHubAPIHttpClient { public GitHubAPIHttpClient(GitHubSourceConnectorConfig config){ this.config = config; + this.httpClientProvider=new HttpClientProvider(); + this.httpClient=this.httpClientProvider.getHttpClient(); } - protected JSONArray getNextIssues(Integer page, Instant since) throws InterruptedException { + protected JSONArray getNextIssues(RepositoryVariables repoVar) throws InterruptedException { - HttpResponse jsonResponse; + HttpResponse httpResponse; try { - jsonResponse = getNextIssuesAPI(page, since); - - // deal with headers in 
any case - Headers headers = jsonResponse.getHeaders(); - XRateLimit = Integer.valueOf(headers.getFirst("X-RateLimit-Limit")); - XRateRemaining = Integer.valueOf(headers.getFirst("X-RateLimit-Remaining")); - XRateReset = Integer.valueOf(headers.getFirst("X-RateLimit-Reset")); - switch (jsonResponse.getStatus()){ + httpResponse = getNextIssuesAPI(repoVar); + //reading the headers of the response + HashMap headers=new HashMap(); + for(Header h:httpResponse.getAllHeaders()) + headers.put(h.getName(),h.getValue()); + + XRateLimit = Integer.valueOf(httpResponse.getFirstHeader("X-RateLimit-Limit").getValue()); + XRateRemaining = Integer.valueOf(httpResponse.getFirstHeader("X-RateLimit-Remaining").getValue()); + XRateReset = Integer.valueOf(httpResponse.getFirstHeader("X-RateLimit-Reset").getValue()); + //reading the httpResponse content(body) + String jsonResponse= EntityUtils.toString(httpResponse.getEntity()); + JSONArray jsonArray=new JSONArray(); + JSONObject jsonBody=new JSONObject(); + try{ + //try to read httpResponse as JSONArray if possible + jsonArray=new JSONArray(jsonResponse); + } + catch (JSONException ex){ + //read as JSONObject + jsonBody=new JSONObject(jsonResponse); + } + + switch (httpResponse.getStatusLine().getStatusCode()){ case 200: - return jsonResponse.getBody().getArray(); + return jsonArray; case 401: throw new ConnectException("Bad GitHub credentials provided, please edit your config"); case 403: // we have issues too many requests. 
- log.info(jsonResponse.getBody().getObject().getString("message")); + log.info(jsonBody.getString("message")); log.info(String.format("Your rate limit is %s", XRateLimit)); log.info(String.format("Your remaining calls is %s", XRateRemaining)); log.info(String.format("The limit will reset at %s", @@ -57,41 +82,45 @@ protected JSONArray getNextIssues(Integer page, Instant since) throws Interrupte long sleepTime = XRateReset - Instant.now().getEpochSecond(); log.info(String.format("Sleeping for %s seconds", sleepTime )); Thread.sleep(1000 * sleepTime); - return getNextIssues(page, since); + return getNextIssues(repoVar); default: - log.error(constructUrl(page, since)); - log.error(String.valueOf(jsonResponse.getStatus())); - log.error(jsonResponse.getBody().toString()); - log.error(jsonResponse.getHeaders().toString()); + log.error(constructUrl(repoVar)); + log.error(String.valueOf(httpResponse.getStatusLine().getStatusCode())); + log.error(jsonResponse); + log.error(headers.toString()); log.error("Unknown error: Sleeping 5 seconds " + "before re-trying"); Thread.sleep(5000L); - return getNextIssues(page, since); + return getNextIssues(repoVar); } - } catch (UnirestException e) { + } catch (IOException e) { e.printStackTrace(); Thread.sleep(5000L); return new JSONArray(); } } - protected HttpResponse getNextIssuesAPI(Integer page, Instant since) throws UnirestException { - GetRequest unirest = Unirest.get(constructUrl(page, since)); - if (!config.getAuthUsername().isEmpty() && !config.getAuthPassword().isEmpty() ){ - unirest = unirest.basicAuth(config.getAuthUsername(), config.getAuthPassword()); + protected HttpResponse getNextIssuesAPI(RepositoryVariables repoVar) throws IOException { + HttpGet httpGet=new HttpGet(constructUrl(repoVar)); + if(!config.getAuthAccesstoken().isEmpty()){ + SetBearerAuthUtil.SetBearerAuthUtil(httpGet,config.getAuthAccesstoken()); } - log.debug(String.format("GET %s", unirest.getUrl())); - return unirest.asJson(); + + else if 
(!config.getAuthUsername().isEmpty() && !config.getAuthPassword().isEmpty() ){ + SetBasicAuthUtil.SetBasicAuth(httpGet,config.getAuthUsername(), config.getAuthPassword()); + } + HttpResponse response= httpClient.execute(httpGet); + return response; } - protected String constructUrl(Integer page, Instant since){ + protected String constructUrl(RepositoryVariables repoVar){ return String.format( "https://api.github.com/repos/%s/%s/issues?page=%s&per_page=%s&since=%s&state=all&direction=asc&sort=updated", - config.getOwnerConfig(), - config.getRepoConfig(), - page, + repoVar.getOwner(), + repoVar.getRepoName(), + repoVar.nextPageToVisit, config.getBatchSize(), - since.toString()); + repoVar.nextQuerySince.toString()); } public void sleep() throws InterruptedException { @@ -108,4 +137,10 @@ public void sleepIfNeed() throws InterruptedException { sleep(); } } + + @Override + public void close() throws IOException { + if(httpClientProvider!=null) + httpClientProvider.close(); + } } \ No newline at end of file diff --git a/src/main/java/com/simplesteph/kafka/GitHubSourceConnector.java b/src/main/java/com/simplesteph/kafka/GitHubSourceConnector.java index 768bdde..d453092 100644 --- a/src/main/java/com/simplesteph/kafka/GitHubSourceConnector.java +++ b/src/main/java/com/simplesteph/kafka/GitHubSourceConnector.java @@ -1,9 +1,11 @@ package com.simplesteph.kafka; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; +import com.simplesteph.kafka.utils.RepoJoinUtil; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.source.SourceConnector; @@ -13,6 +15,7 @@ public class GitHubSourceConnector extends SourceConnector { private static Logger log = LoggerFactory.getLogger(GitHubSourceConnector.class); private GitHubSourceConnectorConfig config; + private Map settings; @Override public String version() { @@ -22,6 +25,7 @@ public String version() { @Override 
public void start(Map map) { config = new GitHubSourceConnectorConfig(map); + settings=map; } @Override @@ -32,8 +36,35 @@ public Class taskClass() { @Override public List> taskConfigs(int i) { // Define the individual task configurations that will be executed. - ArrayList> configs = new ArrayList<>(1); - configs.add(config.originalsStrings()); + String repos[]=config.getReposConfig(); + if(repos.length> configs = new ArrayList<>(i); + + //Distributing the repositories among the taskConfigs efficiently so that the possible max of repos per task is min + int remainingRepos=repos.length; + int currentRepo=0; + while(remainingRepos>0){ + int NumOfReposForTask=(remainingRepos/i) +((remainingRepos%i==0)?0:1); + String ReposForTask=""; + for(int j=0;j task_settings=new HashMap<>(settings); + //updating repos for the task + task_settings.put(GitHubSourceConnectorConfig.REPOS_CONFIG,ReposForTask); + configs.add(task_settings); + + currentRepo+=NumOfReposForTask; + remainingRepos-=NumOfReposForTask; + i--; + } + return configs; } diff --git a/src/main/java/com/simplesteph/kafka/GitHubSourceConnectorConfig.java b/src/main/java/com/simplesteph/kafka/GitHubSourceConnectorConfig.java index 9056ef9..eb2c7a0 100644 --- a/src/main/java/com/simplesteph/kafka/GitHubSourceConnectorConfig.java +++ b/src/main/java/com/simplesteph/kafka/GitHubSourceConnectorConfig.java @@ -1,6 +1,7 @@ package com.simplesteph.kafka; import com.simplesteph.kafka.Validators.BatchSizeValidator; +import com.simplesteph.kafka.Validators.ReposPatternValidator; import com.simplesteph.kafka.Validators.TimestampValidator; import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; @@ -14,21 +15,15 @@ public class GitHubSourceConnectorConfig extends AbstractConfig { - public static final String TOPIC_CONFIG = "topic"; - private static final String TOPIC_DOC = "Topic to write to"; - - public static final String OWNER_CONFIG = "github.owner"; - private static final String 
OWNER_DOC = "Owner of the repository you'd like to follow"; - - public static final String REPO_CONFIG = "github.repo"; - private static final String REPO_DOC = "Repository you'd like to follow"; - public static final String SINCE_CONFIG = "since.timestamp"; private static final String SINCE_DOC = "Only issues updated at or after this time are returned.\n" + "This is a timestamp in ISO 8601 format: YYYY-MM-DDTHH:MM:SSZ.\n" + "Defaults to a year from first launch."; + public static final String REPOS_CONFIG="github.repos"; + public static final String REPOS_DOC="Repositories you'd like to follow . owner/repo:topic pattern to be followed for mentioning repositories."; + public static final String BATCH_SIZE_CONFIG = "batch.size"; private static final String BATCH_SIZE_DOC = "Number of data points to retrieve at a time. Defaults to 100 (max value)"; @@ -39,6 +34,9 @@ public class GitHubSourceConnectorConfig extends AbstractConfig { private static final String AUTH_PASSWORD_DOC = "Optional Password to authenticate calls"; + public static final String AUTH_ACCESSTOKEN_CONFIG = "auth.accesstoken"; + private static final String AUTH_ACCESSTOKEN_DOC = "Optional accesstoken to authenticate calls"; + public GitHubSourceConnectorConfig(ConfigDef config, Map parsedConfig) { super(config, parsedConfig); } @@ -49,23 +47,16 @@ public GitHubSourceConnectorConfig(Map parsedConfig) { public static ConfigDef conf() { return new ConfigDef() - .define(TOPIC_CONFIG, Type.STRING, Importance.HIGH, TOPIC_DOC) - .define(OWNER_CONFIG, Type.STRING, Importance.HIGH, OWNER_DOC) - .define(REPO_CONFIG, Type.STRING, Importance.HIGH, REPO_DOC) + .define(REPOS_CONFIG,Type.STRING,"apache/kafka:github-issues",new ReposPatternValidator(),Importance.HIGH,REPOS_DOC) .define(BATCH_SIZE_CONFIG, Type.INT, 100, new BatchSizeValidator(), Importance.LOW, BATCH_SIZE_DOC) .define(SINCE_CONFIG, Type.STRING, ZonedDateTime.now().minusYears(1).toInstant().toString(), new TimestampValidator(), Importance.HIGH, 
SINCE_DOC) .define(AUTH_USERNAME_CONFIG, Type.STRING, "", Importance.HIGH, AUTH_USERNAME_DOC) - .define(AUTH_PASSWORD_CONFIG, Type.PASSWORD, "", Importance.HIGH, AUTH_PASSWORD_DOC); + .define(AUTH_PASSWORD_CONFIG, Type.PASSWORD, "", Importance.HIGH, AUTH_PASSWORD_DOC) + .define(AUTH_ACCESSTOKEN_CONFIG, Type.PASSWORD, "", Importance.HIGH, AUTH_ACCESSTOKEN_DOC); } - public String getOwnerConfig() { - return this.getString(OWNER_CONFIG); - } - - public String getRepoConfig() { - return this.getString(REPO_CONFIG); - } + public String[] getReposConfig(){return this.getString(REPOS_CONFIG).split(",");} public Integer getBatchSize() { return this.getInt(BATCH_SIZE_CONFIG); @@ -75,10 +66,6 @@ public Instant getSince() { return Instant.parse(this.getString(SINCE_CONFIG)); } - public String getTopic() { - return this.getString(TOPIC_CONFIG); - } - public String getAuthUsername() { return this.getString(AUTH_USERNAME_CONFIG); } @@ -86,4 +73,6 @@ public String getAuthUsername() { public String getAuthPassword(){ return this.getPassword(AUTH_PASSWORD_CONFIG).value(); } + + public String getAuthAccesstoken(){return this.getPassword(AUTH_ACCESSTOKEN_CONFIG).value();} } diff --git a/src/main/java/com/simplesteph/kafka/GitHubSourceTask.java b/src/main/java/com/simplesteph/kafka/GitHubSourceTask.java index c5f90ac..c46c44e 100644 --- a/src/main/java/com/simplesteph/kafka/GitHubSourceTask.java +++ b/src/main/java/com/simplesteph/kafka/GitHubSourceTask.java @@ -12,6 +12,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.time.Instant; import java.util.*; import static com.simplesteph.kafka.GitHubSchemas.*; @@ -21,13 +22,12 @@ public class GitHubSourceTask extends SourceTask { private static final Logger log = LoggerFactory.getLogger(GitHubSourceTask.class); public GitHubSourceConnectorConfig config; - protected Instant nextQuerySince; - protected Integer lastIssueNumber; - protected Integer nextPageToVisit = 1; - protected Instant 
lastUpdatedAt; - + ArrayList repositoryList; GitHubAPIHttpClient gitHubHttpAPIClient; + int RoundRobinNumber; + int NumOfReposToFollow; + @Override public String version() { return VersionUtil.getVersion(); @@ -42,25 +42,35 @@ public void start(Map map) { } private void initializeLastVariables(){ - Map lastSourceOffset = null; - lastSourceOffset = context.offsetStorageReader().offset(sourcePartition()); - if( lastSourceOffset == null){ - // we haven't fetched anything yet, so we initialize to 7 days ago - nextQuerySince = config.getSince(); - lastIssueNumber = -1; - } else { - Object updatedAt = lastSourceOffset.get(UPDATED_AT_FIELD); - Object issueNumber = lastSourceOffset.get(NUMBER_FIELD); - Object nextPage = lastSourceOffset.get(NEXT_PAGE_FIELD); - if(updatedAt != null && (updatedAt instanceof String)){ - nextQuerySince = Instant.parse((String) updatedAt); - } - if(issueNumber != null && (issueNumber instanceof String)){ - lastIssueNumber = Integer.valueOf((String) issueNumber); - } - if (nextPage != null && (nextPage instanceof String)){ - nextPageToVisit = Integer.valueOf((String) nextPage); + //Initializing last variables of all the Repositories given to the task + String repos[]=config.getReposConfig(); + repositoryList=new ArrayList(repos.length); + RoundRobinNumber=0; + NumOfReposToFollow=repos.length; + for(String repo:repos) { + RepositoryVariables RepoVar=new RepositoryVariables(repo); + Map lastSourceOffset = null; + lastSourceOffset = context.offsetStorageReader().offset(sourcePartition(RepoVar)); + if (lastSourceOffset == null) { + // we haven't fetched anything yet, so we initialize to given configuration timestamp + RepoVar.nextQuerySince = config.getSince(); + RepoVar.nextPageToVisit=1; + RepoVar.lastIssueNumber = -1; + } else { + Object updatedAt = lastSourceOffset.get(UPDATED_AT_FIELD); + Object issueNumber = lastSourceOffset.get(NUMBER_FIELD); + Object nextPage = lastSourceOffset.get(NEXT_PAGE_FIELD); + if (updatedAt != null && (updatedAt 
instanceof String)) { + RepoVar.nextQuerySince = Instant.parse((String) updatedAt); + } + if (issueNumber != null && (issueNumber instanceof String)) { + RepoVar.lastIssueNumber = Integer.valueOf((String) issueNumber); + } + if (nextPage != null && (nextPage instanceof String)) { + RepoVar.nextPageToVisit = Integer.valueOf((String) nextPage); + } } + repositoryList.add(RepoVar); } } @@ -70,39 +80,42 @@ private void initializeLastVariables(){ public List poll() throws InterruptedException { gitHubHttpAPIClient.sleepIfNeed(); + RepositoryVariables repoVar=repositoryList.get(RoundRobinNumber); + // fetch data final ArrayList records = new ArrayList<>(); - JSONArray issues = gitHubHttpAPIClient.getNextIssues(nextPageToVisit, nextQuerySince); + JSONArray issues = gitHubHttpAPIClient.getNextIssues(repoVar); // we'll count how many results we get with i int i = 0; for (Object obj : issues) { Issue issue = Issue.fromJson((JSONObject) obj); - SourceRecord sourceRecord = generateSourceRecord(issue); + SourceRecord sourceRecord = generateSourceRecord(issue,repoVar); records.add(sourceRecord); i += 1; - lastUpdatedAt = issue.getUpdatedAt(); + repoVar.lastUpdatedAt = issue.getUpdatedAt(); } if (i > 0) log.info(String.format("Fetched %s record(s)", i)); - if (i == 100){ + if (i == config.getBatchSize()){ // we have reached a full batch, we need to get the next one - nextPageToVisit += 1; + repoVar.nextPageToVisit += 1; } else { - nextQuerySince = lastUpdatedAt.plusSeconds(1); - nextPageToVisit = 1; + repoVar.nextQuerySince = repoVar.lastUpdatedAt.plusSeconds(1); + repoVar.nextPageToVisit = 1; gitHubHttpAPIClient.sleep(); } + RoundRobinNumber=(RoundRobinNumber+1)%NumOfReposToFollow; return records; } - private SourceRecord generateSourceRecord(Issue issue) { + private SourceRecord generateSourceRecord(Issue issue,RepositoryVariables repoVar) { return new SourceRecord( - sourcePartition(), - sourceOffset(issue.getUpdatedAt()), - config.getTopic(), + sourcePartition(repoVar), + 
sourceOffset(repoVar), + repoVar.getTopic(), null, // partition will be inferred by the framework KEY_SCHEMA, - buildRecordKey(issue), + buildRecordKey(issue,repoVar), VALUE_SCHEMA, buildRecordValue(issue), issue.getUpdatedAt().toEpochMilli()); @@ -111,27 +124,34 @@ private SourceRecord generateSourceRecord(Issue issue) { @Override public void stop() { // Do whatever is required to stop your task. + try { + log.info("Closing gitHubHttpAPIClient"); + gitHubHttpAPIClient.close(); + } catch (IOException e) { + log.error("Exception in closing gitHubHttpAPIClient"); + e.printStackTrace(); + } } - private Map sourcePartition() { + private Map sourcePartition(RepositoryVariables RepoVar) { Map map = new HashMap<>(); - map.put(OWNER_FIELD, config.getOwnerConfig()); - map.put(REPOSITORY_FIELD, config.getRepoConfig()); + map.put(OWNER_FIELD, RepoVar.getOwner()); + map.put(REPOSITORY_FIELD, RepoVar.getRepoName()); return map; } - private Map sourceOffset(Instant updatedAt) { + private Map sourceOffset(RepositoryVariables repoVar) { Map map = new HashMap<>(); - map.put(UPDATED_AT_FIELD, DateUtils.MaxInstant(updatedAt, nextQuerySince).toString()); - map.put(NEXT_PAGE_FIELD, nextPageToVisit.toString()); + map.put(UPDATED_AT_FIELD,repoVar.nextQuerySince.toString()); + map.put(NEXT_PAGE_FIELD, repoVar.nextPageToVisit.toString()); return map; } - private Struct buildRecordKey(Issue issue){ + private Struct buildRecordKey(Issue issue,RepositoryVariables repoVar){ // Key Schema Struct key = new Struct(KEY_SCHEMA) - .put(OWNER_FIELD, config.getOwnerConfig()) - .put(REPOSITORY_FIELD, config.getRepoConfig()) + .put(OWNER_FIELD, repoVar.getOwner()) + .put(REPOSITORY_FIELD, repoVar.getRepoName()) .put(NUMBER_FIELD, issue.getNumber()); return key; diff --git a/src/main/java/com/simplesteph/kafka/HttpClientProvider.java b/src/main/java/com/simplesteph/kafka/HttpClientProvider.java new file mode 100644 index 0000000..d18b3a1 --- /dev/null +++ 
b/src/main/java/com/simplesteph/kafka/HttpClientProvider.java @@ -0,0 +1,66 @@ +package com.simplesteph.kafka; + +import org.apache.http.config.Registry; +import org.apache.http.config.RegistryBuilder; +import org.apache.http.conn.socket.ConnectionSocketFactory; +import org.apache.http.conn.socket.PlainConnectionSocketFactory; +import org.apache.http.conn.ssl.NoopHostnameVerifier; +import org.apache.http.conn.ssl.SSLConnectionSocketFactory; +import org.apache.http.conn.ssl.TrustStrategy; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.impl.conn.BasicHttpClientConnectionManager; +import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; +import org.apache.http.ssl.SSLContexts; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.net.ssl.SSLContext; +import java.io.Closeable; +import java.io.IOException; +import java.security.GeneralSecurityException; + +public class HttpClientProvider implements Closeable { + private static final Logger log = LoggerFactory.getLogger(HttpClientProvider.class); + private PoolingHttpClientConnectionManager poolingConnectionManager; + private CloseableHttpClient httpClient; + public HttpClientProvider(){ + CreateAllSSLHttpClient(); + } + public void CreateAllSSLHttpClient() { + try { + TrustStrategy acceptingTrustStrategy = (cert, authType) -> true; + SSLContext sslContext = SSLContexts.custom().loadTrustMaterial(null, acceptingTrustStrategy).build(); + SSLConnectionSocketFactory sslsf = new SSLConnectionSocketFactory(sslContext, + NoopHostnameVerifier.INSTANCE); + + Registry socketFactoryRegistry = + RegistryBuilder.create() + .register("https", sslsf) + .register("http", new PlainConnectionSocketFactory()) + .build(); + + this.poolingConnectionManager = + new PoolingHttpClientConnectionManager(socketFactoryRegistry); + this.httpClient = HttpClients.custom().setSSLSocketFactory(sslsf) + 
.setConnectionManager(poolingConnectionManager).build(); + + log.info("Successfully created AcceptAllSSLHttpClient"); + } + catch (GeneralSecurityException ex){ + log.error("Unable to create AcceptAllSSLHttpClient"); + } + } + public CloseableHttpClient getHttpClient(){ + return httpClient; + } + @Override + public void close() throws IOException { + if(httpClient!=null) + httpClient.close(); + if(poolingConnectionManager!=null) { + poolingConnectionManager.close(); + poolingConnectionManager.shutdown(); + } + } +} diff --git a/src/main/java/com/simplesteph/kafka/RepositoryVariables.java b/src/main/java/com/simplesteph/kafka/RepositoryVariables.java new file mode 100644 index 0000000..5582a5e --- /dev/null +++ b/src/main/java/com/simplesteph/kafka/RepositoryVariables.java @@ -0,0 +1,32 @@ +package com.simplesteph.kafka; +import java.time.Instant; + +//This is a class which has all necessary details about a repository +public class RepositoryVariables { + private String repository; + private String owner; + private String repo_name; + private String topic; + protected Instant nextQuerySince; + protected Integer lastIssueNumber; + protected Integer nextPageToVisit = 1; + protected Instant lastUpdatedAt; + public RepositoryVariables(String repository){ + this.repository=repository; + String components[]=repository.split("[/:]"); //splitting repository string which is of pattern "owner/repo:topic" + this.owner=components[0]; + this.repo_name=components[1]; + this.topic=components[2]; + } + + public String getOwner(){ + return owner; + } + public String getRepoName(){ + return repo_name; + } + + public String getTopic(){ + return topic; + } +} diff --git a/src/main/java/com/simplesteph/kafka/Validators/ReposPatternValidator.java b/src/main/java/com/simplesteph/kafka/Validators/ReposPatternValidator.java new file mode 100644 index 0000000..c9af53d --- /dev/null +++ b/src/main/java/com/simplesteph/kafka/Validators/ReposPatternValidator.java @@ -0,0 +1,18 @@ +package 
com.simplesteph.kafka.Validators; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; + +public class ReposPatternValidator implements ConfigDef.Validator { + @Override + public void ensureValid(String name, Object value) { + + String []repos=((String) value).split(","); + + for(String repo:repos){ + //[A-Za-z0-9_.-] are the characters that are allowed for naming in github and [a-zA-Z0-9\._-] in kafka topics + if(!repo.matches("[A-Za-z0-9_.-]{1,}/[A-Za-z0-9_.-]{1,}:[a-zA-Z0-9\\._-]{1,}")) + throw new ConfigException(name,value,"'owner1/repo1:topic1,owner2/repo2:topic2' pattern to be followed for mentioning repositories"); + } + } +} \ No newline at end of file diff --git a/src/main/java/com/simplesteph/kafka/utils/RepoJoinUtil.java b/src/main/java/com/simplesteph/kafka/utils/RepoJoinUtil.java new file mode 100644 index 0000000..b067b51 --- /dev/null +++ b/src/main/java/com/simplesteph/kafka/utils/RepoJoinUtil.java @@ -0,0 +1,15 @@ +package com.simplesteph.kafka.utils; + +public class RepoJoinUtil { + public static String Join(String s1,String s2){ + if(s1==null&&s2==null) + return ""; + if(s1==null||s1.trim().isEmpty()) + return s2.trim(); + else if(s2==null||s2.trim().isEmpty()) + return s1.trim(); + else + return s1.trim()+","+s2.trim(); + } + +} diff --git a/src/main/java/com/simplesteph/kafka/utils/SetBasicAuthUtil.java b/src/main/java/com/simplesteph/kafka/utils/SetBasicAuthUtil.java new file mode 100644 index 0000000..30d9660 --- /dev/null +++ b/src/main/java/com/simplesteph/kafka/utils/SetBasicAuthUtil.java @@ -0,0 +1,19 @@ +package com.simplesteph.kafka.utils; + +import org.apache.commons.codec.binary.Base64; +import org.apache.http.HttpHeaders; +import org.apache.http.HttpRequest; + +import java.nio.charset.StandardCharsets; + +public class SetBasicAuthUtil { + public static HttpRequest SetBasicAuth(HttpRequest request,String username,String password ){ + String auth = username + ":" + password; + 
byte[] encodedAuth = Base64.encodeBase64( + auth.getBytes(StandardCharsets.ISO_8859_1)); + String authHeader = "Basic " + new String(encodedAuth); + request.setHeader(HttpHeaders.AUTHORIZATION, authHeader); + return request; + } + +} diff --git a/src/main/java/com/simplesteph/kafka/utils/SetBearerAuthUtil.java b/src/main/java/com/simplesteph/kafka/utils/SetBearerAuthUtil.java new file mode 100644 index 0000000..5154a3a --- /dev/null +++ b/src/main/java/com/simplesteph/kafka/utils/SetBearerAuthUtil.java @@ -0,0 +1,12 @@ +package com.simplesteph.kafka.utils; + +import org.apache.http.HttpHeaders; +import org.apache.http.HttpRequest; + +public class SetBearerAuthUtil { + public static HttpRequest SetBearerAuthUtil(HttpRequest request, String bearToken){ + String authHeader = "Bearer" + " " + bearToken; + request.setHeader(HttpHeaders.AUTHORIZATION, authHeader); + return request; + } +} diff --git a/src/test/java/com/simplesteph/kafka/GitHubSourceConnectorConfigTest.java b/src/test/java/com/simplesteph/kafka/GitHubSourceConnectorConfigTest.java index b635b13..1ec76d5 100644 --- a/src/test/java/com/simplesteph/kafka/GitHubSourceConnectorConfigTest.java +++ b/src/test/java/com/simplesteph/kafka/GitHubSourceConnectorConfigTest.java @@ -18,11 +18,9 @@ public class GitHubSourceConnectorConfigTest { @Before public void setUpInitialConfig() { config = new HashMap<>(); - config.put(OWNER_CONFIG, "foo"); - config.put(REPO_CONFIG, "bar"); + config.put(REPOS_CONFIG,"kubernetes/kubernetes:github-issues.31,foo-foo/bar_bar:github.issues"); config.put(SINCE_CONFIG, "2017-04-26T01:23:45Z"); config.put(BATCH_SIZE_CONFIG, "100"); - config.put(TOPIC_CONFIG, "github-issues"); } @Test @@ -44,6 +42,17 @@ public void canReadConfigCorrectly() { } + @Test + public void validateRepos() { + config.put(REPOS_CONFIG, "not-a-valid-pattern"); + ConfigValue configValue = configDef.validateAll(config).get(REPOS_CONFIG); + assertTrue(configValue.errorMessages().size() > 0); + + config.put(REPOS_CONFIG, 
"valid/pattern:followed"); + configValue = configDef.validateAll(config).get(REPOS_CONFIG); + assertEquals(configValue.errorMessages().size() , 0); + } + @Test public void validateSince() { config.put(SINCE_CONFIG, "not-a-date"); diff --git a/src/test/java/com/simplesteph/kafka/GitHubSourceConnectorTest.java b/src/test/java/com/simplesteph/kafka/GitHubSourceConnectorTest.java index 077af8e..8f00e0f 100644 --- a/src/test/java/com/simplesteph/kafka/GitHubSourceConnectorTest.java +++ b/src/test/java/com/simplesteph/kafka/GitHubSourceConnectorTest.java @@ -12,11 +12,9 @@ public class GitHubSourceConnectorTest { private Map initialConfig() { Map baseProps = new HashMap<>(); - baseProps.put(OWNER_CONFIG, "foo"); - baseProps.put(REPO_CONFIG, "bar"); + baseProps.put(REPOS_CONFIG,"kubernetes/kubernetes:github-issues.31,foo-foo/bar_bar:github.issues,foo-bar/bar_foo:github.issues"); baseProps.put(SINCE_CONFIG, "2017-04-26T01:23:45Z"); baseProps.put(BATCH_SIZE_CONFIG, "100"); - baseProps.put(TOPIC_CONFIG, "github-issues"); return (baseProps); } @@ -25,6 +23,9 @@ public void taskConfigsShouldReturnOneTaskConfig() { GitHubSourceConnector gitHubSourceConnector = new GitHubSourceConnector(); gitHubSourceConnector.start(initialConfig()); assertEquals(gitHubSourceConnector.taskConfigs(1).size(),1); - assertEquals(gitHubSourceConnector.taskConfigs(10).size(),1); + assertEquals(gitHubSourceConnector.taskConfigs(2).size(),2); + assertEquals(gitHubSourceConnector.taskConfigs(3).size(),3); + assertEquals(gitHubSourceConnector.taskConfigs(4).size(),3); + assertEquals(gitHubSourceConnector.taskConfigs(10).size(),3); } } diff --git a/src/test/java/com/simplesteph/kafka/GitHubSourceTaskTest.java b/src/test/java/com/simplesteph/kafka/GitHubSourceTaskTest.java index fca0cb0..1db65ac 100644 --- a/src/test/java/com/simplesteph/kafka/GitHubSourceTaskTest.java +++ b/src/test/java/com/simplesteph/kafka/GitHubSourceTaskTest.java @@ -1,12 +1,13 @@ package com.simplesteph.kafka; -import 
com.mashape.unirest.http.HttpResponse; -import com.mashape.unirest.http.JsonNode; -import com.mashape.unirest.http.exceptions.UnirestException; import com.simplesteph.kafka.model.Issue; +import org.apache.http.HttpResponse; +import org.apache.http.util.EntityUtils; +import org.json.JSONArray; import org.json.JSONObject; import org.junit.Test; +import java.io.IOException; import java.time.Instant; import java.util.HashMap; import java.util.Map; @@ -22,33 +23,31 @@ public class GitHubSourceTaskTest { private Map initialConfig() { Map baseProps = new HashMap<>(); - baseProps.put(OWNER_CONFIG, "apache"); - baseProps.put(REPO_CONFIG, "kafka"); - baseProps.put(SINCE_CONFIG, "2017-04-26T01:23:44Z"); + baseProps.put(REPOS_CONFIG,"apache/kafka:github_issues,kubernetes/kubernetes:github-issues.31,foo-foo/bar_bar:github.issues"); + baseProps.put(SINCE_CONFIG, "2017-01-01T00:00:00Z"); baseProps.put(BATCH_SIZE_CONFIG, batchSize.toString()); - baseProps.put(TOPIC_CONFIG, "github-issues"); return baseProps; } @Test - public void test() throws UnirestException { + public void test() throws IOException { gitHubSourceTask.config = new GitHubSourceConnectorConfig(initialConfig()); - gitHubSourceTask.nextPageToVisit = 1; - gitHubSourceTask.nextQuerySince = Instant.parse("2017-01-01T00:00:00Z"); + RepositoryVariables repoVar=new RepositoryVariables(gitHubSourceTask.config.getReposConfig()[0]); + repoVar.nextQuerySince=Instant.parse("2017-01-01T00:00:00Z"); gitHubSourceTask.gitHubHttpAPIClient = new GitHubAPIHttpClient(gitHubSourceTask.config); - String url = gitHubSourceTask.gitHubHttpAPIClient.constructUrl(gitHubSourceTask.nextPageToVisit, gitHubSourceTask.nextQuerySince); + String url = gitHubSourceTask.gitHubHttpAPIClient.constructUrl(repoVar); System.out.println(url); - HttpResponse httpResponse = gitHubSourceTask.gitHubHttpAPIClient.getNextIssuesAPI(gitHubSourceTask.nextPageToVisit, gitHubSourceTask.nextQuerySince); - if (httpResponse.getStatus() != 403) { - assertEquals(200, 
httpResponse.getStatus()); - Set headers = httpResponse.getHeaders().keySet(); - assertTrue(headers.contains("ETag")); - assertTrue(headers.contains("X-RateLimit-Limit")); - assertTrue(headers.contains("X-RateLimit-Remaining")); - assertTrue(headers.contains("X-RateLimit-Reset")); - assertEquals(batchSize.intValue(), httpResponse.getBody().getArray().length()); - JSONObject jsonObject = (JSONObject) httpResponse.getBody().getArray().get(0); + HttpResponse httpResponse = gitHubSourceTask.gitHubHttpAPIClient.getNextIssuesAPI(repoVar); + if (httpResponse.getStatusLine().getStatusCode() != 403) { + assertEquals(200, httpResponse.getStatusLine().getStatusCode()); + assertTrue(httpResponse.getFirstHeader("ETag")!=null); + assertTrue(httpResponse.getFirstHeader("X-RateLimit-Limit")!=null); + assertTrue(httpResponse.getFirstHeader("X-RateLimit-Remaining")!=null); + assertTrue(httpResponse.getFirstHeader("X-RateLimit-Reset")!=null); + JSONArray jsonArray=new JSONArray(EntityUtils.toString(httpResponse.getEntity())); + assertTrue(batchSize.intValue() >= jsonArray.length()); + JSONObject jsonObject = (JSONObject) jsonArray.get(0); Issue issue = Issue.fromJson(jsonObject); assertNotNull(issue); assertNotNull(issue.getNumber());