
Stream tweets with Apache Kafka Connect

Abstract

Apache Kafka provides several ways to connect its topics to external systems, which can act as sources or sinks. One of them is the Connect API. This post shows how to set up and configure the com.eneco.trading.kafka.connect.twitter.TwitterSourceConnector connector. Once it is configured, the connector uses the Twitter API to receive recent tweets that contain the keyword java, and it sends them to a Kafka topic. Before you start, register an application at the Twitter developer portal (https://developer.twitter.com/en/apps/create). Give it read-only permissions, then generate the access token, access token secret, API key and API secret key that the connector configuration needs.

The following component diagram shows the setup:

[Diagram: high-level architecture of Kafka Connect]

Apply Twitter Kafka Connector

Create the connector with a call to the Kafka Connect REST API. The request must include all of the configuration the connector needs. Some platforms provide a UI and preinstalled connectors, while others require you to add the connector manually, but all of them accept REST API requests.

$ curl -X POST \
  http://localhost:8083/connectors \
  -H 'Content-Type: application/json' \
  -H 'Accept: application/json' \
  -d '{
  "name": "source-twitter-distributed",
  "config": {
    "connector.class": "com.eneco.trading.kafka.connect.twitter.TwitterSourceConnector",
    "tasks.max": "1",
    "topics": "topic-twitter",
    "topic": "topic-twitter",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "twitter.consumerkey": "xxxxxxxxxxxxx",
    "twitter.consumersecret": "xxxxxxxxxxxxx",
    "twitter.token": "xxxxxxxxxxxxx",
    "twitter.secret": "xxxxxxxxxxxxx",
    "track.terms": "java",
    "language": "en"
  }
}'
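To avoid pasting secrets into the command line, the same request body can be generated from environment variables. Below is a minimal sketch that assumes jq is installed (jq is also used later in this post). It also assumes the credentials are exported as TWITTER_API_KEY, TWITTER_API_SECRET, TWITTER_ACCESS_TOKEN and TWITTER_ACCESS_TOKEN_SECRET, which are hypothetical variable names. The function name build_twitter_connector_config is hypothetical as well. The sketch maps the API key/secret to twitter.consumerkey/twitter.consumersecret and the access token/secret to twitter.token/twitter.secret, following the usual OAuth naming; that mapping is an assumption.

```shell
#!/usr/bin/env bash
# Sketch: build the connector request body from environment variables.
# TWITTER_* and TRACK_TERMS are hypothetical variable names (assumption).
build_twitter_connector_config() {
  # jq -n builds JSON from scratch; each --arg value is escaped safely,
  # and ${VAR:?} aborts with a message if a credential is missing.
  jq -n \
    --arg key    "${TWITTER_API_KEY:?set TWITTER_API_KEY}" \
    --arg secret "${TWITTER_API_SECRET:?set TWITTER_API_SECRET}" \
    --arg token  "${TWITTER_ACCESS_TOKEN:?set TWITTER_ACCESS_TOKEN}" \
    --arg tsec   "${TWITTER_ACCESS_TOKEN_SECRET:?set TWITTER_ACCESS_TOKEN_SECRET}" \
    --arg terms  "${TRACK_TERMS:-java}" \
    '{
      name: "source-twitter-distributed",
      config: {
        "connector.class": "com.eneco.trading.kafka.connect.twitter.TwitterSourceConnector",
        "tasks.max": "1",
        "topics": "topic-twitter",
        "topic": "topic-twitter",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "false",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false",
        "twitter.consumerkey": $key,
        "twitter.consumersecret": $secret,
        "twitter.token": $token,
        "twitter.secret": $tsec,
        "track.terms": $terms,
        "language": "en"
      }
    }'
}
```

To send it to the same endpoint, pipe the body into curl, which reads it from stdin with @-: build_twitter_connector_config | curl -s -X POST http://localhost:8083/connectors -H 'Content-Type: application/json' --data @-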

The response contains the connector information. Setting "track.terms": "java" makes the connector consume tweets that mention java.

{
  "name": "source-twitter-distributed",
  "config": {
    "connector.class": "com.eneco.trading.kafka.connect.twitter.TwitterSourceConnector",
    "tasks.max": "1",
    "topics": "topic-twitter",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "twitter.consumerkey": "xxxxxxxxxxxxx",
    "twitter.consumersecret": "xxxxxxxxxxxxx",
    "twitter.token": "xxxxxxxxxxxxx",
    "twitter.secret": "xxxxxxxxxxxxx",
    "track.terms": "java",
    "language": "en",
    "name": "source-twitter-distributed"
  },
  "tasks": [],
  "type": "unknown"
}
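The creation response does not tell you whether the connector is actually running; its "tasks" list is still empty here. Kafka Connect also exposes GET /connectors/<name>/status, which reports the state of the connector and of each task. The minimal sketch below summarizes that response with jq. The function name summarize_connector_status is hypothetical, and it reads only the connector.state and tasks[].state fields.

```shell
#!/usr/bin/env bash
# Sketch: print the connector state and one line per task.
# Expects the JSON from GET /connectors/<name>/status on stdin.
summarize_connector_status() {
  # The comma emits the connector line first, then one line per task.
  jq -r '
    "connector \(.name): \(.connector.state)",
    (.tasks[] | "task \(.id): \(.state)")
  '
}
```

Usage: curl -s localhost:8083/connectors/source-twitter-distributed/status | summarize_connector_status. States such as RUNNING, PAUSED, FAILED or UNASSIGNED show up here. For a FAILED task, the raw status response usually also includes a stack trace.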

Run a console consumer on the topic to start receiving messages:

$ kafka-console-consumer.sh --topic topic-twitter --bootstrap-server 127.0.0.1:9092

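Once the consumer is running, messages appear in the console output. They are in JSON format because the connector is configured with "value.converter": "org.apache.kafka.connect.json.JsonConverter". Because "value.converter.schemas.enable" is "false", each message contains only the tweet data, without a schema envelope.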

{
	"id": 1246814159699709953,
	"created_at": "2020-04-05T14:57:15.000+0000",
	"user": {
		"id": 125107613,
		"name": "sunnyesquire",
		"screen_name": "sunny_esquire",
		"location": null,
		"verified": false,
		"friends_count": 316,
		"followers_count": 180,
		"statuses_count": 9079
	},
	"text": "RT @javarevisited: Follow these people to keep learning @java on lockdown\n@TheDonRaab  \n@jeanneboyarsky\n@javarevisited \n@trisha_gee\n@yawkat…",
	"lang": "en",
	"is_retweet": true,
	"entities": {
		"hashtags": [],
		"media": [],
		"urls": [],
		"user_mentions": [{
			"id": 1005111468595920897,
			"name": "Javarevisited",
			"screen_name": "javarevisited"
		}, {
			"id": 125485258,
			"name": "Java",
			"screen_name": "java"
		}, {
			"id": 2790286156,
			"name": "Donald Raab",
			"screen_name": "TheDonRaab"
		}, {
			"id": 55440347,
			"name": "Jeanne Boyarsky",
			"screen_name": "jeanneboyarsky"
		}, {
			"id": 1005111468595920897,
			"name": "Javarevisited",
			"screen_name": "javarevisited"
		}, {
			"id": 14162967,
			"name": "Trisha Gee",
			"screen_name": "trisha_gee"
		}, {
			"id": 463764901,
			"name": "yawkat",
			"screen_name": "yawkat"
		}]
	}
}
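Because each message is plain JSON, the consumer output can be piped through jq to keep only the fields of interest. Below is a minimal sketch that assumes jq is installed. The function name format_tweets is hypothetical, and the field names (user.screen_name, text) are taken from the sample message above.

```shell
#!/usr/bin/env bash
# Sketch: turn tweet JSON messages (as printed by kafka-console-consumer.sh)
# into one "screen_name<TAB>text" line per tweet.
format_tweets() {
  # "// empty" skips messages without a text field instead of printing null;
  # gsub replaces embedded newlines so each tweet stays on a single line.
  jq -r '(.text // empty) as $text
         | "\(.user.screen_name)\t\($text | gsub("\n"; " "))"'
}
```

Usage: kafka-console-consumer.sh --topic topic-twitter --bootstrap-server 127.0.0.1:9092 | format_tweets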

List the active connectors:

$ curl -s localhost:8083/connectors | jq .
[
  "source-twitter-distributed",
  "logs-broker",
  "file-stream-demo-distributed"
]
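The same list can be used in scripts, for example to create the connector only when it is not registered yet. In the minimal sketch below, has_connector is a hypothetical helper. It reads the JSON array returned by GET /connectors on stdin and exits with status 0 when the given name is present.

```shell
#!/usr/bin/env bash
# Sketch: exit 0 if the connector name is in the JSON array from
# GET /connectors, exit 1 otherwise.
has_connector() {
  # jq -e sets its exit status from the last output: true -> 0, false -> 1.
  jq -e --arg name "$1" 'any(.[]; . == $name)' > /dev/null
}
```

Usage: curl -s localhost:8083/connectors | has_connector source-twitter-distributed || echo "not registered". When you no longer need the connector, you can remove it with curl -X DELETE localhost:8083/connectors/source-twitter-distributed.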
This post is licensed under CC BY 4.0 by the author.