
Apache Kafka

Apache Kafka is a fast, scalable, durable, and fault-tolerant publish-subscribe messaging system.

Kafka is run as a cluster that stores streams of records in categories called topics. A topic is a category or feed name to which records are published.

Kafka has four core APIs: the Producer API, the Consumer API, the Streams API, and the Connector API.

More information is available on Kafka's design philosophy and use cases.

Implementation examples for Producers and Consumers, and for Kafka with Spring WebSockets.
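
For a first orientation, a minimal sketch of a producer publishing one record, assuming the kafka-clients Java library, a broker at localhost:9092 and a made-up topic name shipdata:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ShipPositionProducer {
	public static void main(String[] args) {
		Properties props = new Properties();
		// broker address and topic name are assumptions for this sketch
		props.put("bootstrap.servers", "localhost:9092");
		props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

		try (Producer<String, String> producer = new KafkaProducer<>(props)) {
			// publish one record keyed by ship id; close() flushes pending sends
			producer.send(new ProducerRecord<>("shipdata", "219014435",
					"{\"id\":\"219014435\",\"position\":[11.858,57.645]}"));
		}
	}
}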

Apache Storm

Storm

Apache Flume

Version 1.5; the source repository is useful for looking into implementation details.

Agents

Simple example

Custom source

Example for a PollableSource; a minimal skeleton is sketched below.
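
As a rough orientation, a skeleton of such a source, assuming Flume 1.5 as above; fetchNextMessage() is a made-up placeholder for whatever the source actually polls, and newer Flume releases additionally require implementing two backoff getters of the PollableSource interface:

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;

public class CustomPollableSource extends AbstractSource implements Configurable, PollableSource {

	private String endpoint;

	@Override
	public void configure(Context ctx) {
		endpoint = ctx.getString("endpoint");
	}

	@Override
	public Status process() throws EventDeliveryException {
		try {
			// fetchNextMessage() stands in for the actual polling call of the source
			String message = fetchNextMessage(endpoint);
			if (message == null) {
				return Status.BACKOFF; // nothing new, Flume waits before polling again
			}
			Event event = new SimpleEvent();
			event.setBody(message.getBytes());
			getChannelProcessor().processEvent(event);
			return Status.READY;
		} catch (Exception e) {
			throw new EventDeliveryException("Polling " + endpoint + " failed", e);
		}
	}

	private String fetchNextMessage(String endpoint) {
		return null; // placeholder
	}
}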

EventDrivenSource

A tiny example for an EventDrivenSource:

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Map;
import java.util.TreeMap;

import javax.websocket.DeploymentException;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.FlumeException;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;

import com.eclipsesource.json.Json;
import com.eclipsesource.json.JsonValue;

// WebsocketClient and its MessageHandler callback are helper classes around javax.websocket
// that are not shown here; the JSON imports assume minimal-json (com.eclipsesource.json).
public class CustomSource extends AbstractSource implements Configurable, EventDrivenSource {

	private WebsocketClient clientEndPoint;

	@Override
	public void configure(Context ctx) {
		try {
			String endpoint = ctx.getString("endpoint");
			String user = ctx.getString("user");
			String pw = ctx.getString("password");
			String subscribe = ctx.getString("subscriptionString");
			clientEndPoint = new WebsocketClient(new URI(endpoint), user, pw, subscribe);

			// Turn every incoming websocket message into a Flume event. Registering the
			// handler inside the try block avoids a NullPointerException if the connection
			// could not be established.
			clientEndPoint.addMessageHandler(new MessageHandler() {
				public void handleMessage(String message) {
					ChannelProcessor processor = getChannelProcessor();
					Event event = new SimpleEvent();
					JsonValue value = Json.parse(message);

					event.setHeaders(createEventHeaders(value));
					event.setBody(message.getBytes());
					processor.processEvent(event);
				}

				// Expose the object id as an event header so sinks can reference it, e.g. as %{oid}.
				private Map<String, String> createEventHeaders(JsonValue value) {
					Map<String, String> h = new TreeMap<String, String>();
					String oid = value.asObject().get("payload").asObject().get("id").asString();
					h.put("oid", oid);
					return h;
				}
			});
		} catch (DeploymentException | IOException | URISyntaxException e) {
			// fail configuration instead of silently continuing with a null client
			throw new FlumeException("Could not connect to websocket endpoint", e);
		}
	}

	@Override
	public void start() {
		super.start();
	}

	@Override
	public void stop() {
		super.stop();
	}
}
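
To use the class, it has to be registered in the agent configuration. A hedged sketch for an agent called ais (matching the sink examples below); the package name, channel name and all values are placeholders:

# source, channel and sink names are arbitrary
ais.sources = ws1
ais.channels = c1
ais.sinks = hdfs1

# fully qualified class name of the custom source above (assumed package)
ais.sources.ws1.type = com.example.flume.CustomSource
# keys read in configure()
ais.sources.ws1.endpoint = wss://example.org/stream
ais.sources.ws1.user = xx
ais.sources.ws1.password = xx
ais.sources.ws1.subscriptionString = ...
ais.sources.ws1.channels = c1

ais.channels.c1.type = memory

ais.sinks.hdfs1.type = hdfs
ais.sinks.hdfs1.channel = c1
ais.sinks.hdfs1.hdfs.path = /path/on/hdfs

The compiled source class itself goes under plugins.d/, as described in the Ambari section below.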

HDFS sinks

HDFS sinks can produce different file formats: SequenceFile, DataStream or CompressedStream.

I used human-readable SequenceFiles

ais.sinks.hdfs1.hdfs.fileType = SequenceFile
ais.sinks.hdfs1.hdfs.writeFormat = Text

which results in:

1552319652676   {"id":"219014435","position":[11.858878135681152,57.645938873291016]}
1552319652684   {"id":"205484390","position":[3.687695026397705,51.09416961669922]}

The key (the first part of each line) is, by default, the event header timestamp (see e.g. here for how to use custom keys).

If a key is not required, use a DataStream

ais.sinks.hdfs1.hdfs.fileType = DataStream 	
ais.sinks.hdfs1.hdfs.writeFormat = Text

instead.

Using a CompressedStream saves storage space:

ais.sinks.hdfs1.hdfs.fileType = CompressedStream
ais.sinks.hdfs1.hdfs.codeC = gzip
ais.sinks.hdfs1.hdfs.writeFormat = Text

Creating a more customised file name is possible via event header items. As implemented above, the header contains an object id called “oid” that can be used in the agent configuration:

ais.sinks.hdfs1.hdfs.filePrefix = %{oid}

Troubleshooting

ChannelFullException: Space for commit to queue couldn't be acquired.

I tried to write to an HDFS sink and got this exception in the logs:

org.apache.flume.ChannelException: Unable to put event on required channel: org.apache.flume.channel.MemoryChannel{name: c1}
        at org.apache.flume.channel.ChannelProcessor.processEvent(ChannelProcessor.java:275)
...
 
Caused by: org.apache.flume.ChannelFullException: Space for commit to queue couldn't be acquired. Sinks are likely not keeping up with sources, or the buffer size is too tight
        at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doCommit(MemoryChannel.java:132)
        at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
        at org.apache.flume.channel.ChannelProcessor.processEvent(ChannelProcessor.java:267)
        ... 33 more

After some research, increasing JVM_OPTS, and unsuccessful tuning of the channel parameters, the actual problem turned out to be that the flume user was not allowed to write to the configured hdfs.path.
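
For reference, a sketch of the MemoryChannel parameters usually meant by "tuning the channel parameters" (agent ais and channel c1 assumed as above; the Flume defaults for both sizes are 100):

ais.channels.c1.type = memory
# maximum number of events held in the channel
ais.channels.c1.capacity = 10000
# maximum number of events per put/take transaction
ais.channels.c1.transactionCapacity = 1000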

Installing plugins in Ambari

Flume plugins have their own dedicated directory. In Ambari it's:

mkdir /usr/hdp/current/flume-server/plugins.d/

The plugin jar goes into a lib/ subdirectory, its dependencies into libext/:

plugins.d/
plugins.d/MySource/
plugins.d/MySource/lib/my-source.jar
plugins.d/MySource/libext/a-dependency-6.6.6.jar

or provide a fat jar instead.

Managing agents in Ambari

Flume in Ambari

GeoMesa

Simple Feature Types

SFTs

Ingesting into table

General advice for developing converters

CSV

The CSV converter docs contain a lot of useful information.
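
For comparison with the JSON converter below, a hedged sketch of a delimited-text converter for the same ship data in a hypothetical mmsi,lon,lat,recordedAt column layout; column order and transform details are assumptions, so check the docs before using it:

geomesa {
  converters {
    shipdata-csv = {
      type     = "delimited-text"
      format   = "CSV"
      id-field = "concatenate($mmsi,$recordedAt)"
      fields = [
        { name = "mmsi",       transform = "$1::int"           }
        { name = "lon",        transform = "$2::double"        }
        { name = "lat",        transform = "$3::double"        }
        { name = "geom",       transform = "point($lon, $lat)" }
        { name = "recordedAt", transform = "dateTime($4)"      }
      ]
    }
  }
}

At ingest time it would be referenced via -C shipdata-csv, together with the shipdata SFT defined below.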

JSON

The JSON converter docs already explain a lot. To ingest a ship data file that contains multiple JSON documents (one per line):

{"payload":{"mmsi":"11111","position":[5.061738442342835,52.4892342343],"SOG":0.0,"COG":0.0,"navStatus":"engaged in fishing","recordedAt":"2019-01-14T00:00:00.072Z"}}
{"payload":{"mmsi":"66666","position":[10.869939804077148,59.077205657958984],"SOG":0.0,"COG":290.3,"navStatus":"engaged in fishing","recordedAt":"2019-03-14T00:00:02.084Z"}}
{"payload":{"mmsi":"88888","position":[30.880006790161133,70.88723754882812],"SOG":4.9,"COG":232.6,"navStatus":"engaged in fishing","recordedAt":"2019-03-14T00:00:02.103Z"}}

you can use this converter definition:

cat conf/sfts/shipdata/reference.conf
geomesa {
  sfts {
    shipdata = {
      fields = [
        { name = "mmsi",        type = "Integer",            index = "full" }
        { name = "geom",        type = "Point", srid = 4326, index = true   }
        { name = "recordedAt",  type = "Date",               index = "full" }
        { name = "navStatus",   type = "String"                             }
        { name = "sog",         type = "Double"                             }
        { name = "cog",         type = "Double"                             }
      ]
      user-data = {
        geomesa.table.sharing = "false"
      }
    }
  }


  converters {
    shipdata = {
      type = "json"
      id-field = "concatenate($mmsi,$recordedAt)"
      options {
           error-mode = "raise-errors"
       }
      fields = [
        { name = "mmsi",        json-type = "string",   path = "$.payload.mmsi",             transform = "$0::int"             }
        { name = "lon",         json-type = "double",   path = "$.payload.position[0]"                                         }
        { name = "lat",         json-type = "double",   path = "$.payload.position[1]"                                         }
        { name = "geom",                                                                     transform = "point($lon, $lat)"   }
        { name = "recordedAt",  json-type = "string",   path = "$.payload.recordedAt",       transform = "dateTime($0)"        }        
        { name = "navStatus",   json-type = "string",   path = "$.payload.navStatus"                                           }
        { name = "sog",         json-type = "double",   path = "$.payload.SOG"                                                 }
        { name = "cog",         json-type = "double",   path = "$.payload.COG"                                                 }
      ]
    }
  }
}

Note

The json files can be gzipped, and can reside in HDFS:

ingest -c geomesa.shipdata -C shipdata -s shipdata --user xx --password xx --zookeepers a,b,c  --input-format json  --instance myinstance --run-mode local   hdfs://server:port/ship/archive/2019-01.gz

Troubleshooting

The chat is a helpful place to ask questions.

Hortonworks

HDP vs HDF

Hortonworks Data Platform (HDP)

Alerts

https://community.hortonworks.com/questions/140101/what-does-hdfs-storage-capacity-usage-weekly-alert.html

Logging

hdfs audit log

Time series

Libraries

twosigma flint (Python only)

spark-timeseries is no longer active; its developer recommends flint

Data protection

HDFS

hdfs-audit.log files

Clouds

https://aws.amazon.com/pricing/

https://www.cloudera.com/products/pricing.html

https://azure.microsoft.com/en-us/pricing/

Trainings and Certifications

From here and here

Cloudera (has swallowed Hortonworks)

roadmap for merging HDP into Cloudera

HDP Spark Developer DEV-343

DataFrames/DataSets/RDD, shuffling, transformations and performance tuning, streaming

HDP Data Science SCI-241

key concepts like NN, classification, regression, and clustering; tools and frameworks like Spark MLlib and TensorFlow

Cloudera Certified Associate (CCA)

Cloudera Certified Associate (CCA) consists of 3 parts:

each exam costs $295

course locations

Coursera

https://www.coursera.org/courses?query=apache%20spark

DataBricks

https://academy.databricks.com/catalog

DB 301 - Apache Spark™ for Machine Learning and Data Science

short, self-paced courses based on AWS or Azure: $75

certifications

Dell

site down

example: Specialist - Data Scientist, Advanced Analytics Version 1.0

Google

Heinlein

Big Data mit Hadoop

introduction, overview, first steps

IBM

Data Science Professional Certificate

Advanced Data Science with IBM Specialization

Hewlett Packard Enterprise (has swallowed MapR)

(merger announcement)

The MapR Academy Certification Program is closed to new registration as we work to update the exams.

Microsoft Azure

SimpliLearn

Big Data Hadoop Certification Training Course

Aligned to Cloudera CCA175 certification exam

Meetups

https://www.meetup.com/Wien-Cloud-Computing-Meetup/

https://www.meetup.com/Big-Data-Vienna/

https://www.meetup.com/Vienna-Data-Science-Group-Meetup/

https://www.ocg.at/cloud-computing-big-data