Apache Kafka is a fast, scalable, durable, and fault-tolerant publish-subscribe messaging system.
Kafka is run as a cluster that stores streams of records in categories called topics. A topic is a category or feed name to which records are published.
Kafka has four core APIs: the Producer API, the Consumer API, the Streams API, and the Connector API.
More information about Kafka's design philosophy and use cases is available in the official documentation.
There are implementation examples for Producers and Consumers, and for using Kafka with Spring WebSockets.
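To make the Producer side concrete, here is a minimal sketch of my own (not from the linked examples): it assumes the kafka-clients library on the classpath, a broker at localhost:9092, and a hypothetical "shipdata" topic.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class MinimalProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // broker address is a placeholder; point it at your cluster
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        // the producer is Closeable, so try-with-resources cleans up the connection
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // publish one record to the hypothetical "shipdata" topic, keyed by ship id
            producer.send(new ProducerRecord<>("shipdata", "219014435",
                    "{\"id\":\"219014435\",\"position\":[11.86,57.65]}"));
            producer.flush();
        }
    }
}

A consumer works analogously via KafkaConsumer with a group.id, subscribe() and poll().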
Version 1.5. The source repository is useful for looking into implementation details.
Example for a PollableSource.
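Since that example is not reproduced here, the following is a minimal sketch of my own for a PollableSource; it assumes Flume 1.6+ (where the backoff methods are part of the interface), and the "message" config key is made up for illustration.

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;

public class PollingSource extends AbstractSource implements Configurable, PollableSource {

    private String message;

    @Override
    public void configure(Context ctx) {
        // "message" is a made-up config key for this sketch
        message = ctx.getString("message", "hello");
    }

    @Override
    public Status process() throws EventDeliveryException {
        try {
            // Flume calls process() in a loop; emit one event per call
            Event event = EventBuilder.withBody(message.getBytes());
            getChannelProcessor().processEvent(event);
            return Status.READY;
        } catch (Throwable t) {
            // BACKOFF tells the framework to wait before polling again
            return Status.BACKOFF;
        }
    }

    // required by the PollableSource interface since Flume 1.6
    @Override
    public long getBackOffSleepIncrement() {
        return 1000L;
    }

    @Override
    public long getMaxBackOffSleepInterval() {
        return 5000L;
    }
}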
A tiny example for an EventDrivenSource:
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Map;
import java.util.TreeMap;

import javax.websocket.DeploymentException;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;

// WebsocketClient and MessageHandler are project-specific helper classes;
// JsonValue/Json are assumed to come from a small JSON library such as minimal-json.
public class CustomSource extends AbstractSource implements Configurable, EventDrivenSource {

    private WebsocketClient clientEndPoint;

    @Override
    public void configure(Context ctx) {
        try {
            String endpoint = ctx.getString("endpoint");
            String user = ctx.getString("user");
            String pw = ctx.getString("password");
            String subscribe = ctx.getString("subscriptionString");
            clientEndPoint = new WebsocketClient(new URI(endpoint), user, pw, subscribe);
        } catch (DeploymentException | IOException | URISyntaxException e) {
            e.printStackTrace();
        }

        // forward every incoming websocket message to the channel as a Flume event
        clientEndPoint.addMessageHandler(new MessageHandler() {
            public void handleMessage(String message) {
                ChannelProcessor processor = getChannelProcessor();
                Event event = new SimpleEvent();
                JsonValue value = Json.parse(message);
                event.setHeaders(createEventHeaders(value));
                event.setBody(message.getBytes());
                processor.processEvent(event);
            }

            // put the object id into the event header so sinks can use it, e.g. as %{oid}
            private Map<String, String> createEventHeaders(JsonValue value) {
                Map<String, String> h = new TreeMap<String, String>();
                String oid = value.asObject().get("payload").asObject().get("id").asString();
                h.put("oid", oid);
                return h;
            }
        });
    }

    @Override
    public void start() {
        super.start();
    }

    @Override
    public void stop() {
        super.stop();
    }
}
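For completeness, a hedged sketch of how such a custom source could be wired into an agent configuration; the agent, source and channel names, the package and the endpoint are placeholders. Custom sources are referenced by their fully qualified class name:

# hypothetical wiring into an agent called "ais"
ais.sources = ws1
ais.channels = c1
ais.sources.ws1.type = org.example.flume.CustomSource
ais.sources.ws1.endpoint = wss://example.org/stream
ais.sources.ws1.user = xx
ais.sources.ws1.password = xx
ais.sources.ws1.subscriptionString = ...
ais.sources.ws1.channels = c1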
HDFS sinks can produce different file formats: SequenceFile, DataStream or CompressedStream.
I used human-readable SequenceFiles:
ais.sinks.hdfs1.hdfs.fileType = SequenceFile
ais.sinks.hdfs1.hdfs.writeFormat = Text
which results in:
1552319652676 {"id":"219014435","position":[11.858878135681152,57.645938873291016]}
1552319652684 {"id":"205484390","position":[3.687695026397705,51.09416961669922]}
The key (the first part of each line) is, by default, the event header timestamp (see e.g. here for how to use custom keys).
If a key is not required, use a DataStream
ais.sinks.hdfs1.hdfs.fileType = DataStream
ais.sinks.hdfs1.hdfs.writeFormat = Text
instead.
Using a CompressedStream saves storage space:
ais.sinks.hdfs1.hdfs.fileType = CompressedStream
ais.sinks.hdfs1.hdfs.codeC = gzip
ais.sinks.hdfs1.hdfs.writeFormat = Text
Creating a more customised file name is possible via event header items. As implemented above, the header contains an object id called “oid” that can be used in the agent configuration:
ais.sinks.hdfs1.hdfs.filePrefix = %{oid}
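For orientation, a complete sink definition that pulls these properties together might look like this; the path and channel name are placeholders, not taken from the original setup:

ais.sinks = hdfs1
ais.sinks.hdfs1.type = hdfs
ais.sinks.hdfs1.channel = c1
# placeholder path; the flume user needs write permission here (see below)
ais.sinks.hdfs1.hdfs.path = /ship/archive
ais.sinks.hdfs1.hdfs.filePrefix = %{oid}
ais.sinks.hdfs1.hdfs.fileType = DataStream
ais.sinks.hdfs1.hdfs.writeFormat = Text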
I tried to write to an HDFS sink and got this exception in the logs:
org.apache.flume.ChannelException: Unable to put event on required channel: org.apache.flume.channel.MemoryChannel{name: c1}
        at org.apache.flume.channel.ChannelProcessor.processEvent(ChannelProcessor.java:275)
        ...
Caused by: org.apache.flume.ChannelFullException: Space for commit to queue couldn't be acquired. Sinks are likely not keeping up with sources, or the buffer size is too tight
        at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doCommit(MemoryChannel.java:132)
        at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
        at org.apache.flume.channel.ChannelProcessor.processEvent(ChannelProcessor.java:267)
        ... 33 more
After some research, increasing JVM_OPTS, and unsuccessfully tuning the channel parameters, the problem turned out to be that the flume user was not allowed to write to the configured hdfs.path; a possible fix is sketched below.
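A hedged sketch of the fix, assuming the sink writes to /ship/archive (a placeholder) and the agent runs as the flume user; the owning group depends on your installation:

# create the target directory and hand it over to the flume user (group is a placeholder)
sudo -u hdfs hdfs dfs -mkdir -p /ship/archive
sudo -u hdfs hdfs dfs -chown -R flume:hadoop /ship/archive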
Flume plugins have their own dedicated directory. In Ambari it's:
mkdir /usr/hdp/current/flume-server/plugins.d/
Dependencies go into a subdirectory:
plugins.d/
plugins.d/MySource/
plugins.d/MySource/lib/my-source.jar
plugins.d/MySource/libext/a-dependency-6.6.6.jar
or provide a fat jar instead.
General advice for developing converters
The CSV converter docs contain a lot of useful information.
The JSON converter docs already explain a lot. To ingest a ship data file that contains multiple JSON documents (one per line):
{"payload":{"mmsi":"11111","position":[5.061738442342835,52.4892342343],"SOG":0.0,"COG":0.0,"navStatus":"engaged in fishing","recordedAt":"2019-01-14T00:00:00.072Z"}} {"payload":{"mmsi":"66666","position":[10.869939804077148,59.077205657958984],"SOG":0.0,"COG":290.3,"navStatus":"engaged in fishing","recordedAt":"2019-03-14T00:00:02.084Z"}} {"payload":{"mmsi":"88888","position":[30.880006790161133,70.88723754882812],"SOG":4.9,"COG":232.6,"navStatus":"engaged in fishing","recordedAt":"2019-03-14T00:00:02.103Z"}}
you can use this converter definition:
cat conf/sfts/shipdata/reference.conf
geomesa {
  sfts {
    shipdata = {
      fields = [
        { name = "mmsi",       type = "Integer", index = "full" }
        { name = "geom",       type = "Point",   srid = 4326, index = true }
        { name = "recordedAt", type = "Date",    index = "full" }
        { name = "navStatus",  type = "String" }
        { name = "sog",        type = "Double" }
        { name = "cog",        type = "Double" }
      ]
      user-data = {
        geomesa.table.sharing = "false"
      }
    }
  }
  converters {
    shipdata = {
      type = "json"
      id-field = "concatenate($mmsi,$recordedAt)"
      options {
        error-mode = "raise-errors"
      }
      fields = [
        { name = "mmsi",       json-type = "string", path = "$.payload.mmsi",       transform = "$0::int" }
        { name = "lon",        json-type = "double", path = "$.payload.position[0]" }
        { name = "lat",        json-type = "double", path = "$.payload.position[1]" }
        { name = "geom",       transform = "point($lon, $lat)" }
        { name = "recordedAt", json-type = "string", path = "$.payload.recordedAt", transform = "dateTime($0)" }
        { name = "navStatus",  json-type = "string", path = "$.payload.navStatus" }
        { name = "sog",        json-type = "double", path = "$.payload.SOG" }
        { name = "cog",        json-type = "double", path = "$.payload.COG" }
      ]
    }
  }
}
Note
The JSON files can be gzipped, and can reside in HDFS:
ingest -c geomesa.shipdata -C shipdata -s shipdata --user xx --password xx --zookeepers a,b,c --input-format json --instance myinstance --run-mode local hdfs://server:port/ship/archive/2019-01.gz
The chat is a helpful place to ask questions.
twosigma flint: Python only.
spark-timeseries: no longer active; the developer recommends flint.
hdfs-audit.log files
DataFrames/Datasets/RDDs, shuffling, transformations and performance tuning, streaming
Key concepts like NN, classification, regression, and clustering; tools and frameworks like Spark MLlib and TensorFlow.
https://academy.databricks.com/catalog
DB 301 - Apache Spark™ for Machine Learning and Data Science
short, self-paced courses based on AWS or Azure: $75
The MapR Academy Certification Program is closed to new registration as we work to update the exams.
Big Data Hadoop Certification Training Course
Aligned to Cloudera CCA175 certification exam