bigdata

Apache Kafka

Apache Kafka is a fast, scalable, durable, and fault-tolerant publish-subscribe messaging system.

Kafka is run as a cluster that stores streams of records in categories called topics. A topic is a category or feed name to which records are published.

Kafka has four core APIs:

  • Producers publish a stream of records to one or more Kafka topics.
  • Consumers subscribe to one or more topics and process the stream of records produced to them.
  • Stream processors allow an application to consume input streams and produce output streams.
  • Connectors are producers or consumers that connect Kafka topics to existing applications or data systems.

More information about Kafka's design philosophy and use cases.

Implementation examples for Producers and Consumers, and for Kafka with Spring WebSockets.
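
As a minimal sketch of the Producer and Consumer APIs (Java client 2.x), the example below publishes one record and reads it back; the broker address localhost:9092, the topic name ship-positions and the payload are placeholders, not part of the setup described here:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaQuickstart {

    public static void main(String[] args) {
        String topic = "ship-positions";   // placeholder topic name

        // producer: publish one record to the topic
        Properties p = new Properties();
        p.put("bootstrap.servers", "localhost:9092");
        p.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        p.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(p)) {
            producer.send(new ProducerRecord<>(topic, "219014435", "{\"position\":[11.85,57.64]}"));
        }

        // consumer: subscribe to the topic and poll for records
        Properties c = new Properties();
        c.put("bootstrap.servers", "localhost:9092");
        c.put("group.id", "demo");
        c.put("auto.offset.reset", "earliest");   // read from the beginning of the topic
        c.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        c.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(c)) {
            consumer.subscribe(Collections.singletonList(topic));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records)
                System.out.println(record.key() + " -> " + record.value());
        }
    }
}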

Apache Storm

Apache Flume

The version 1.5 repository is useful for looking into implementation details.

Agents

Custom source

Example for a PollableSource.
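
Below is a minimal sketch of what such a source could look like, written against the Flume 1.5 interface referenced above (the class name and the message property are made up for illustration): the source runner calls process() in a loop, and the return value tells it whether to keep polling (READY) or to back off for a while (BACKOFF).

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;

public class PollingSource extends AbstractSource implements Configurable, PollableSource {

    private String message;

    @Override
    public void configure(Context ctx) {
        // read a property from the agent configuration (placeholder key)
        message = ctx.getString("message", "hello");
    }

    @Override
    public Status process() throws EventDeliveryException {
        // called repeatedly by the source runner; READY means an event was
        // produced, BACKOFF lets the runner sleep before the next call
        try {
            Event event = EventBuilder.withBody(message.getBytes());
            getChannelProcessor().processEvent(event);
            return Status.READY;
        } catch (Exception e) {
            return Status.BACKOFF;
        }
    }
}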

EventDrivenSource

A tiny example for an EventDrivenSource that:

  • implements Configurable: configure(Context ctx)
  • implements EventDrivenSource
  • overrides start() and stop()
  • is based on WebsocketClient, an Endpoint implementation.
// Flume 1.x imports
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Map;
import java.util.TreeMap;

import javax.websocket.DeploymentException;

// Json/JsonValue follow the minimal-json API (an assumption); WebsocketClient and
// MessageHandler are this project's own javax.websocket-based client classes.
import com.eclipsesource.json.Json;
import com.eclipsesource.json.JsonValue;

public class CustomSource extends AbstractSource implements Configurable, EventDrivenSource {

    private WebsocketClient clientEndPoint;

    @Override
    public void configure(Context ctx) {
        try {
            // connection settings come from the agent configuration
            String endpoint = ctx.getString("endpoint");
            String user = ctx.getString("user");
            String pw = ctx.getString("password");
            String subscribe = ctx.getString("subscriptionString");
            clientEndPoint = new WebsocketClient(new URI(endpoint), user, pw, subscribe);

            // register the handler inside the try block so it is only attached
            // when the client could actually be created (avoids an NPE here)
            clientEndPoint.addMessageHandler(new MessageHandler() {

                public void handleMessage(String message) {
                    // wrap each incoming websocket message in a Flume event
                    ChannelProcessor processor = getChannelProcessor();
                    Event event = new SimpleEvent();
                    JsonValue value = Json.parse(message);

                    event.setHeaders(createEventHeaders(value));
                    event.setBody(message.getBytes());
                    processor.processEvent(event);
                }

                private Map<String, String> createEventHeaders(JsonValue value) {
                    Map<String, String> h = new TreeMap<String, String>();
                    // the object id becomes available to sinks as %{oid}
                    String oid = value.asObject().get("payload").asObject().get("id").asString();
                    h.put("oid", oid);
                    return h;
                }
            });
        } catch (DeploymentException | IOException | URISyntaxException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void start() {
        super.start();
    }

    @Override
    public void stop() {
        super.stop();
    }
}

HDFS sinks

HDFS sinks can produce different file formats: SequenceFile, DataStream or CompressedStream.

I used human-readable SequenceFiles:

ais.sinks.hdfs1.hdfs.fileType = SequenceFile
ais.sinks.hdfs1.hdfs.writeFormat = Text

which results in:

1552319652676   {"id":"219014435","position":[11.858878135681152,57.645938873291016]}
1552319652684   {"id":"205484390","position":[3.687695026397705,51.09416961669922]}

The key (the first part of each line) is, by default, the event header timestamp (read e.g. here for custom keys).
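
Such a SequenceFile can be spot-checked on the command line; hdfs dfs -text prints the keys and values as text (the path below is only a placeholder):

hdfs dfs -text /path/to/sink/output/FlumeData.1552319652676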

If a key is not required, use a DataStream

ais.sinks.hdfs1.hdfs.fileType = DataStream 	
ais.sinks.hdfs1.hdfs.writeFormat = Text

instead.

Using a CompressedStream saves storage space:

ais.sinks.hdfs1.hdfs.fileType = CompressedStream
ais.sinks.hdfs1.hdfs.codeC = gzip
ais.sinks.hdfs1.hdfs.writeFormat = Text

Creating a more customised file name is possible via event header items. As implemented above, the header contains an object id called “oid” that can be used in the agent configuration:

ais.sinks.hdfs1.hdfs.filePrefix = %{oid}
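
For context, the pieces above combined into one sink definition might look like the sketch below; the agent name ais, the sink hdfs1 and the channel c1 match the snippets and logs on this page, while the hdfs.path value and the useLocalTimeStamp setting are only illustrative:

ais.sinks.hdfs1.type = hdfs
ais.sinks.hdfs1.channel = c1
# date escapes in the path need a timestamp header (or useLocalTimeStamp)
ais.sinks.hdfs1.hdfs.path = /ais/%Y-%m-%d
ais.sinks.hdfs1.hdfs.useLocalTimeStamp = true
ais.sinks.hdfs1.hdfs.filePrefix = %{oid}
ais.sinks.hdfs1.hdfs.fileType = CompressedStream
ais.sinks.hdfs1.hdfs.codeC = gzip
ais.sinks.hdfs1.hdfs.writeFormat = Text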

Troubleshooting

ChannelFullException: Space for commit to queue couldn't be acquired.

I tried to write to an HDFS sink and got this exception in the logs:

org.apache.flume.ChannelException: Unable to put event on required channel: org.apache.flume.channel.MemoryChannel{name: c1}
        at org.apache.flume.channel.ChannelProcessor.processEvent(ChannelProcessor.java:275)
...
 
Caused by: org.apache.flume.ChannelFullException: Space for commit to queue couldn't be acquired. Sinks are likely not keeping up with sources, or the buffer size is too tight
        at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doCommit(MemoryChannel.java:132)
        at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
        at org.apache.flume.channel.ChannelProcessor.processEvent(ChannelProcessor.java:267)
        ... 33 more

After some research, increasing JVM_OPTS and unsuccessful tuning of the channel parameters, the problem turned out to be that the flume user was not allowed to write to the configured hdfs.path.
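
A quick way to check that before tuning anything else; /ais stands in for the configured hdfs.path and the group name is just an example:

# inspect ownership and permissions of the target directory
hdfs dfs -ls /ais
# try a test write as the flume user
sudo -u flume hdfs dfs -touchz /ais/permission-test
# if that fails, grant ownership (or adjust permissions) as the hdfs superuser
sudo -u hdfs hdfs dfs -chown -R flume:hadoop /ais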

Installing plugins in Ambari

Flume plugins have their own dedicated directory. On an Ambari-managed installation it has to be created first:

mkdir /usr/hdp/current/flume-server/plugins.d/

Dependencies go into a subdirectory:

plugins.d/
plugins.d/MySource/
plugins.d/MySource/lib/my-source.jar
plugins.d/MySource/libext/a-dependency-6.6.6.jar

Alternatively, provide a fat jar.

Managing agents in Ambari

GeoMesa

Simple Feature Types

Ingesting into table

General advice for developing converters

CSV

The CSV converter docs contain a lot of useful information.

JSON

The JSON converter docs already explain a lot. To ingest a ship data file that contains multiple JSON documents (one per line):

{"payload":{"mmsi":"11111","position":[5.061738442342835,52.4892342343],"SOG":0.0,"COG":0.0,"navStatus":"engaged in fishing","recordedAt":"2019-01-14T00:00:00.072Z"}}
{"payload":{"mmsi":"66666","position":[10.869939804077148,59.077205657958984],"SOG":0.0,"COG":290.3,"navStatus":"engaged in fishing","recordedAt":"2019-03-14T00:00:02.084Z"}}
{"payload":{"mmsi":"88888","position":[30.880006790161133,70.88723754882812],"SOG":4.9,"COG":232.6,"navStatus":"engaged in fishing","recordedAt":"2019-03-14T00:00:02.103Z"}}

you can use this converter definition:

cat conf/sfts/shipdata/reference.conf
geomesa {
  sfts {
    shipdata = {
      fields = [
        { name = "mmsi",        type = "Integer",            index = "full" }
        { name = "geom",        type = "Point", srid = 4326, index = true   }
        { name = "recordedAt",  type = "Date",               index = "full" }
        { name = "navStatus",   type = "String"                             }
        { name = "sog",         type = "Double"                             }
        { name = "cog",         type = "Double"                             }
      ]
      user-data = {
        geomesa.table.sharing = "false"
      }
    }
  }


  converters {
    shipdata = {
      type = "json"
      id-field = "concatenate($mmsi,$recordedAt)"
      options {
           error-mode = "raise-errors"
       }
      fields = [
        { name = "mmsi",        json-type = "string",   path = "$.payload.mmsi",             transform = "$0::int"             }
        { name = "lon",         json-type = "double",   path = "$.payload.position[0]"                                         }
        { name = "lat",         json-type = "double",   path = "$.payload.position[1]"                                         }
        { name = "geom",                                                                     transform = "point($lon, $lat)"   }
        { name = "recordedAt",  json-type = "string",   path = "$.payload.recordedAt",       transform = "dateTime($0)"        }        
        { name = "navStatus",   json-type = "string",   path = "$.payload.navStatus"                                           }
        { name = "sog",         json-type = "double",   path = "$.payload.SOG"                                                 }
        { name = "cog",         json-type = "double",   path = "$.payload.COG"                                                 }
      ]
    }
  }
}

Note

  • the feature-path parameter is optional, and not needed for our simple documents
  • to create points from a JSON array, an intermediate transformation step extracts lon and lat

The JSON files can be gzipped and can reside in HDFS:

ingest -c geomesa.shipdata -C shipdata -s shipdata --user xx --password xx --zookeepers a,b,c  --input-format json  --instance myinstance --run-mode local   hdfs://server:port/ship/archive/2019-01.gz

Troubleshooting

The chat is a helpful place to ask questions.

Hortonworks

Hortonworks Data Platform (HDP)

Alerts

Logging

Time series

Libraries

twosigma flint: Python only

spark-timeseries: no longer active; the developer recommends flint

Data protection

HDFS

hdfs-audit.log files

Clouds

Trainings and Certifications

From here and here

Cloudera (has swallowed Hortonworks)

HDP Spark Developer DEV-343

DataFrames/DataSets/RDD, shuffling, transformations and performance tuning, streaming

  • dataOps, analysts
  • 4 days
  • 3200 $ (virtual classroom) or 3500 € (Paris)

HDP Data Science SCI-241

key concepts like neural networks, classification, regression and clustering; tools and frameworks like Spark MLlib and TensorFlow

  • analysts, scientists
  • 4 days
  • on request

Cloudera Certified Associate (CCA)

Coursera

DataBricks

https://academy.databricks.com/catalog

DB 301 - Apache Spark™ for Machine Learning and Data Science

  • 3 days
  • virtual classroom
  • $2500

short, self-paced courses based on AWS or Azure: $75

certifications

Dell

Google

Heinlein

Big Data mit Hadoop

introduction, overview, first steps

  • analysts, dataOps
  • 3 days
  • 2000 €
  • Berlin

IBM

Hewlett Packard (has swallowed MapR)

Microsoft Azure

SimpliLearn

Big Data Hadoop Certification Training Course

Aligned to Cloudera CCA175 certification exam

Meetups
