====== Apache Kafka ======
[[https://kafka.apache.org/intro|Apache Kafka]] is a fast, scalable, durable, and fault-tolerant [[https://hortonworks.com/apache/kafka/#section_2|publish-subscribe messaging system]].
Kafka runs as a cluster that stores streams of **records** in categories called topics. A **topic** is a category or feed name to which records are published.
Kafka has four core APIs:
* **Producers** publish a stream of records to one or more Kafka topics.
* **Consumers** subscribe to one or more topics and process the stream of records produced to them.
* **Stream processors** allow an application to consume input streams and produce output streams.
* **Connectors** are reusable producers or consumers that connect Kafka topics to existing applications or data systems.
More information: Kafka's [[https://kafka.apache.org/documentation/#design|design philosophy]] and [[https://kafka.apache.org/uses|use cases]].
Implementation examples for [[https://dzone.com/articles/kafka-producer-and-consumer-example|Producers and Consumers]], [[https://dzone.com/articles/live-dashboard-using-apache-kafka-and-spring-webso|Kafka with Spring WebSockets]].
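For a first orientation, a minimal Java producer sketch; the broker address, topic name and record contents below are placeholders, not taken from the linked examples:
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class MinimalProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // assumed local broker; point this at your own cluster
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            // publish one record to the (hypothetical) topic "ship-positions"
            producer.send(new ProducerRecord<>("ship-positions", "219014435", "{\"id\":\"219014435\"}"));
        }
    }
}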
====== Apache Storm ======
[[https://storm.apache.org/|Storm]]
====== Apache Flume ======
[[https://github.com/apache/flume/tree/flume-1.5|Version 1.5 repository]] for looking into implementation details.
===== Agents =====
[[https://flume.apache.org/releases/content/1.5.0/FlumeUserGuide.html#a-simple-example|Simple example]]
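The linked example boils down to a single agent **a1** with a netcat source, a memory channel and a logger sink; the snippet below follows that guide:
# example.conf: a single-node Flume configuration
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# source: listens on a netcat-style socket
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# sink: logs events at INFO level
a1.sinks.k1.type = logger

# channel: buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# wire source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
The agent is then started with:
bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console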
===== Custom source =====
Example for a [[https://flume.apache.org/releases//content/1.5.0/FlumeDeveloperGuide.html#source|PollableSource]].
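A minimal sketch of this custom-source flavour, for contrast with the event-driven variant below (class name, the endpoint property and the fetchFrom() helper are made up; Flume calls process() in a loop and honours the returned status):
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;

public class MyPollableSource extends AbstractSource implements Configurable, PollableSource {

    private String endpoint;

    @Override
    public void configure(Context ctx) {
        // read settings from the agent configuration
        endpoint = ctx.getString("endpoint");
    }

    @Override
    public Status process() throws EventDeliveryException {
        // fetch data, wrap it in an event and hand it to the channel processor
        String data = fetchFrom(endpoint); // placeholder for the actual client call
        if (data == null) {
            return Status.BACKOFF; // nothing available, tell Flume to back off
        }
        Event event = EventBuilder.withBody(data.getBytes());
        getChannelProcessor().processEvent(event);
        return Status.READY;
    }

    private String fetchFrom(String endpoint) {
        // hypothetical helper; replace with a real client
        return null;
    }
}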
==== EventDrivenSource ====
A tiny example for an EventDrivenSource that
* implements Configurable: configure(Context ctx)
* implements EventDrivenSource
* overrides start() and stop()
* is based on WebsocketClient (and its MessageHandler callback), an [[https://docs.oracle.com/javaee/7/tutorial/websocket004.htm|Endpoint]] implementation.
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Map;
import java.util.TreeMap;

import javax.websocket.DeploymentException;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.FlumeException;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;

// JSON parsing via minimal-json is an assumption; use the JSON library of your choice
import com.eclipsesource.json.Json;
import com.eclipsesource.json.JsonValue;

public class CustomSource extends AbstractSource implements Configurable, EventDrivenSource {

    private WebsocketClient clientEndPoint;

    @Override
    public void configure(Context ctx) {
        // all values come from the agent configuration, e.g. ais.sources.ws1.endpoint = ...
        String endpoint = ctx.getString("endpoint");
        String user = ctx.getString("user");
        String pw = ctx.getString("password");
        String subscribe = ctx.getString("subscriptionString");
        try {
            clientEndPoint = new WebsocketClient(new URI(endpoint), user, pw, subscribe);
        } catch (DeploymentException | IOException | URISyntaxException e) {
            // fail fast instead of continuing with a null client
            throw new FlumeException("Could not connect to " + endpoint, e);
        }
        // every incoming websocket message becomes one Flume event
        clientEndPoint.addMessageHandler(new MessageHandler() {
            public void handleMessage(String message) {
                ChannelProcessor processor = getChannelProcessor();
                Event event = new SimpleEvent();
                JsonValue value = Json.parse(message);
                event.setHeaders(createEventHeaders(value));
                event.setBody(message.getBytes());
                processor.processEvent(event);
            }

            private Map<String, String> createEventHeaders(JsonValue value) {
                // expose the object id as a header so sinks can reference it via %{oid}
                Map<String, String> h = new TreeMap<>();
                String oid = value.asObject().get("payload").asObject().get("id").asString();
                h.put("oid", oid);
                return h;
            }
        });
    }

    @Override
    public void start() {
        super.start();
    }

    @Override
    public void stop() {
        super.stop();
    }
}
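To wire the source into an agent, the fully qualified class name goes into the **type** property, and the keys read in configure() become source parameters (agent, source and channel names and all values below are only placeholders):
ais.sources = ws1
ais.sources.ws1.type = org.example.flume.CustomSource
ais.sources.ws1.channels = c1
ais.sources.ws1.endpoint = wss://example.org/stream
ais.sources.ws1.user = flume
ais.sources.ws1.password = secret
ais.sources.ws1.subscriptionString = {"subscribe":"positions"}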
===== HDFS sinks =====
[[https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.6.5/ds_flume/FlumeUserGuide.html#hdfs-sink|HDFS sinks]] can produce different file formats: SequenceFile, DataStream or CompressedStream.
I used human-readable SequenceFiles:
ais.sinks.hdfs1.hdfs.fileType = SequenceFile
ais.sinks.hdfs1.hdfs.writeFormat = Text
which results in:
1552319652676 {"id":"219014435","position":[11.858878135681152,57.645938873291016]}
1552319652684 {"id":"205484390","position":[3.687695026397705,51.09416961669922]}
The key (first part of each line) is, by default, the event header timestamp (see e.g. [[https://stackoverflow.com/questions/32440926/flume-how-to-create-a-custom-key-for-a-hdfs-sequencefile|here for custom keys]]).
If a **key is not required**, use a DataStream
ais.sinks.hdfs1.hdfs.fileType = DataStream
ais.sinks.hdfs1.hdfs.writeFormat = Text
instead.
Using a CompressedStream saves storage space:
ais.sinks.hdfs1.hdfs.fileType = CompressedStream
ais.sinks.hdfs1.hdfs.codeC = gzip
ais.sinks.hdfs1.hdfs.writeFormat = Text
Creating a more customised file name is possible via event header items. As [[bigdata#eventdrivensource|implemented above]], the header contains an object id called "oid" that can be used in the agent configuration:
ais.sinks.hdfs1.hdfs.filePrefix = %{oid}
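Putting the pieces together, a complete sink definition could look like this (channel name and hdfs.path are placeholders; the time escapes in hdfs.path need a timestamp header or hdfs.useLocalTimeStamp = true):
ais.sinks = hdfs1
ais.sinks.hdfs1.type = hdfs
ais.sinks.hdfs1.channel = c1
ais.sinks.hdfs1.hdfs.path = /ship/raw/%Y-%m-%d
ais.sinks.hdfs1.hdfs.filePrefix = %{oid}
ais.sinks.hdfs1.hdfs.fileType = DataStream
ais.sinks.hdfs1.hdfs.writeFormat = Text
ais.sinks.hdfs1.hdfs.useLocalTimeStamp = true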
===== Troubleshooting =====
==== ChannelFullException: Space for commit to queue couldn't be acquired. ====
I tried to write to an [[https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.6.5/ds_flume/FlumeUserGuide.html#hdfs-sink|HDFS sink]] and got this exception in the logs:
org.apache.flume.ChannelException: Unable to put event on required channel: org.apache.flume.channel.MemoryChannel{name: c1}
at org.apache.flume.channel.ChannelProcessor.processEvent(ChannelProcessor.java:275)
...
Caused by: org.apache.flume.ChannelFullException: Space for commit to queue couldn't be acquired. Sinks are likely not keeping up with sources, or the buffer size is too tight
at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doCommit(MemoryChannel.java:132)
at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
at org.apache.flume.channel.ChannelProcessor.processEvent(ChannelProcessor.java:267)
... 33 more
After researching, increasing [[https://bigdatadiscussionspr.blogspot.com/|JVM_OPTS]], and unsuccessfully tuning the channel parameters, the problem turned out to be that the flume user was **not allowed to write** to the hdfs.path.
===== Installing plugins in Ambari =====
[[https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.6.5/ds_flume/FlumeUserGuide.html#the-plugins-d-directory|Flume plugins]] have their own dedicated directory. In Ambari it's:
mkdir /usr/hdp/current/flume-server/plugins.d/
Dependencies go into a subdirectory:
plugins.d/
plugins.d/MySource/
plugins.d/MySource/lib/my-source.jar
plugins.d/MySource/libext/a-dependency-6.6.6.jar
or provide a fat jar instead.
===== Managing agents in Ambari =====
[[https://developer.ibm.com/hadoop/2017/01/06/start-flume-agents-using-ambari-web-interface/|Flume in Ambari]]
====== GeoMesa ======
===== Simple Feature Types =====
[[https://www.geomesa.org/documentation/user/cli/sfts.html?|SFTs]]
===== Ingesting into table =====
General advice for developing converters:
* use the **convert** command instead of ingest (faster, doesn't modify tables); see the sketch after this list
* use a small subset of your data
* use --run-mode local
* enable debugging in conf/log4j.properties: org.locationtech.geomesa.convert=debug
* set the error mode to 'raise-errors' in the converter options: https://www.geomesa.org/documentation/user/convert/parsing_and_validation.html#error-mode
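A local convert run, using the shipdata SFT and converter from the JSON example below, could look roughly like this (the file name is made up; check convert --help for the exact flags of your GeoMesa tools version):
convert -s shipdata -C shipdata sample-shipdata.json
The converted simple features are printed instead of being written to a table, which makes it safe to iterate on the converter definition.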
==== CSV ====
The [[https://www.geomesa.org/documentation/user/convert/delimited_text.html|CSV converter docs]] contain a lot of useful information.
==== JSON ====
The [[https://www.geomesa.org/documentation/user/convert/json.html|JSON converter docs]] already explain a lot.
To ingest a ship data file that contains **multiple** JSON documents:
{"payload":{"mmsi":"11111","position":[5.061738442342835,52.4892342343],"SOG":0.0,"COG":0.0,"navStatus":"engaged in fishing","recordedAt":"2019-01-14T00:00:00.072Z"}}
{"payload":{"mmsi":"66666","position":[10.869939804077148,59.077205657958984],"SOG":0.0,"COG":290.3,"navStatus":"engaged in fishing","recordedAt":"2019-03-14T00:00:02.084Z"}}
{"payload":{"mmsi":"88888","position":[30.880006790161133,70.88723754882812],"SOG":4.9,"COG":232.6,"navStatus":"engaged in fishing","recordedAt":"2019-03-14T00:00:02.103Z"}}
you can use this converter definition:
cat conf/sfts/shipdata/reference.conf
geomesa {
sfts {
shipdata = {
fields = [
{ name = "mmsi", type = "Integer", index = "full" }
{ name = "geom", type = "Point", srid = 4326, index = true }
{ name = "recordedAt", type = "Date", index = "full" }
{ name = "navStatus", type = "String" }
{ name = "sog", type = "Double" }
{ name = "cog", type = "Double" }
]
user-data = {
geomesa.table.sharing = "false"
}
}
}
converters {
shipdata = {
type = "json"
id-field = "concatenate($mmsi,$recordedAt)"
options {
error-mode = "raise-errors"
}
fields = [
{ name = "mmsi", json-type = "string", path = "$.payload.mmsi", transform = "$0::int" }
{ name = "lon", json-type = "double", path = "$.payload.position[0]" }
{ name = "lat", json-type = "double", path = "$.payload.position[1]" }
{ name = "geom", transform = "point($lon, $lat)" }
{ name = "recordedAt", json-type = "string", path = "$.payload.recordedAt", transform = "dateTime($0)" }
{ name = "navStatus", json-type = "string", path = "$.payload.navStatus" }
{ name = "sog", json-type = "double", path = "$.payload.SOG" }
{ name = "cog", json-type = "double", path = "$.payload.COG" }
]
}
}
}
Note
* the **feature-path** parameter is optional and not needed for these simple documents
* to create points from the JSON array, intermediate fields **lon** and **lat** are extracted and combined via the point() transform
The JSON files can be gzipped and can reside in HDFS:
ingest -c geomesa.shipdata -C shipdata -s shipdata --user xx --password xx --zookeepers a,b,c --input-format json --instance myinstance --run-mode local hdfs://server:port/ship/archive/2019-01.gz
==== Troubleshooting ====
The [[https://gitter.im/locationtech/geomesa|chat]] is a helpful place to ask questions.
====== Hortonworks ======
[[https://community.hortonworks.com/questions/80005/confusion-with-hdf-and-hdp-and-their-capabilities.html|HDP vs HDF]]
====== Hortonworks Data Platform (HDP) ======
===== Alerts =====
https://community.hortonworks.com/questions/140101/what-does-hdfs-storage-capacity-usage-weekly-alert.html
===== Logging =====
[[https://www.ibm.com/support/knowledgecenter/en/SSC2HF_3.0.1/insurance/t_hdfs_audit_logging.html|hdfs audit log]]
===== Time series =====
==== Libraries ====
[[https://github.com/twosigma/flint|twosigma flint]] (Python only)
[[https://github.com/sryza/spark-timeseries|spark-timeseries]] is no longer active; the developer [[https://groups.google.com/d/msg/spark-ts/iIhJBgQuhr8/bpzgj1kHBAAJ|recommends flint]].
====== Data protection ======
===== HDFS =====
hdfs-audit.log files
====== Clouds ======
https://aws.amazon.com/pricing/
https://www.cloudera.com/products/pricing.html
https://azure.microsoft.com/en-us/pricing/
====== Trainings and Certifications ======
From [[https://www.simplilearn.com/top-big-data-certifications-article|here]] and [[https://www.cio.com/article/3222879/15-data-science-certifications-that-will-pay-off.html|here]]
===== Cloudera (has swallowed Hortonworks) =====
[[https://www.computerworld.com/article/3428025/everything-you-need-to-know-about-the-new-cloudera-data-platform--vision--migration-and-roadmap.html|roadmap for merging HDP into Cloudera]]
==== HDP Spark Developer DEV-343 ====
[[https://www.cloudera.com/about/training/courses/hdp-spark-developer.html#?classType=virtual|HDP Spark Developer DEV-343]]
DataFrames/DataSets/RDD, shuffling, transformations and performance tuning, streaming
* dataOps, analysts
* 4 days
* 3200 $ virtual classroom or
* 3500 € Paris
==== HDP Data Science SCI-241 ====
[[https://www.cloudera.com/about/training/courses/applying-data-science-using-apache-hadoop.html|HDP Data Science SCI-241]]
key concepts like neural networks, classification, regression and clustering; tools and frameworks like Spark MLlib and TensorFlow
* analysts, scientists
* 4 days
* on request
==== Cloudera Certified Associate (CCA) ====
[[https://www.cloudera.com/about/training/certification.html|Cloudera Certified Associate (CCA)]] consists of 3 parts:
* [[https://university.cloudera.com/content/cca175|CCA Spark and Hadoop Developer exam]]
* [[https://www.cloudera.com/about/training/certification/cca-data-analyst.html|CCA Data Analyst exam]]
* [[https://www.cloudera.com/about/training/certification/cca-admin.html|CCA Administrator exam]]
Each exam costs $295.
[[https://www.cloudera.com/about/training/course-listing.html#?course=all|course locations]]
===== Coursera =====
https://www.coursera.org/courses?query=apache%20spark
===== DataBricks =====
https://academy.databricks.com/catalog
[[https://academy.databricks.com/instructor-led-training/DB301|DB 301 - Apache Spark™ for Machine Learning and Data Science]]
* 3 days
* virtual classroom
* $2500
short, self-paced courses based on AWS or Azure: $75
[[https://academy.databricks.com/category/certifications|certifications]]
===== Dell =====
[[https://education.dellemc.com/index_downtime.htm|site down]]
example: [[https://education.dellemc.com/content/dam/dell-emc/documents/en-us/E20_065_Advanced_Analytics_Specialist_Exam.pdf|Specialist -Data Scientist, Advanced Analytics Version 1.0]]
===== Google =====
===== Heinlein =====
[[https://www.heinlein-support.de/schulung/big-data-mit-hadoop|Big Data mit Hadoop]]
introduction, overview, first steps
* analysts, dataOps
* 3 days
* 2000 €
* Berlin
===== IBM =====
[[https://www.coursera.org/professional-certificates/ibm-data-science|Data Science Professional Certificate]]
[[https://www.coursera.org/specializations/advanced-data-science-ibm|Advanced Data Science with IBM Specialization]]
===== Hewlett Packard (has swallowed MapR) =====
([[https://www.hpe.com/us/en/newsroom/press-release/2019/08/hpe-advances-its-intelligent-data-platform-with-acquisition-of-mapr-business-assets.html|merger announcement]])
[[https://mapr.com/training/certification/|The MapR Academy Certification Program is closed to new registration as we work to update the exams.]]
===== Microsoft Azure =====
===== SimpliLearn =====
[[https://www.simplilearn.com/big-data-and-analytics/big-data-and-hadoop-training|Big Data Hadoop Certification Training Course]]
Aligned with the Cloudera [[https://www.cloudera.com/about/training/certification/cca-spark.html|CCA175]] certification exam
====== Meetups ======
https://www.meetup.com/Wien-Cloud-Computing-Meetup/
https://www.meetup.com/Big-Data-Vienna/
https://www.meetup.com/Vienna-Data-Science-Group-Meetup/
https://www.ocg.at/cloud-computing-big-data