|
2 years ago | |
---|---|---|
images | 2 years ago | |
services | 2 years ago | |
volume | 2 years ago | |
.gitignore | 2 years ago | |
Makefile | 2 years ago | |
README.md | 2 years ago | |
bigdata-platform.yml | 2 years ago | |
docker-compose-build-all.yml | 2 years ago | |
docker-compose-build.yml | 2 years ago | |
docker-compose.yml | 2 years ago | |
docker-compose.yml.copy | 2 years ago | |
hadoop.yml | 2 years ago | |
hbase.yml | 2 years ago | |
hdfs.md | 2 years ago | |
kafka.yml | 2 years ago | |
logistic-regression.md | 2 years ago | |
pom.xml | 2 years ago | |
proxyer.yml | 2 years ago | |
readme-docker.md | 2 years ago | |
readme-es.md | 2 years ago | |
readme-hadoop.md | 2 years ago | |
readme-hbase.md | 2 years ago | |
readme-kafka.md | 2 years ago | |
readme-spark.md | 2 years ago | |
spark.yml | 2 years ago | |
zookeeper.yml | 2 years ago |
本环境参考了 HDP 的各个组件版本进行配置,因为这些组件间的兼容性会更好点。HDP组件的基本版本如下图: 。
容器版的具体版本如下:
稳定版本 | 最新版本 | 介绍 | |
---|---|---|---|
Java环境 | openjdk:8-jre-alpine | 环境基础 | |
Zookeeper | ZooKeeper 是一个开源的分布式协调服务,由雅虎创建,是 Google Chubby 的开源实现。 分布式应用程序可以基于 ZooKeeper 实现诸如数据发布/订阅、负载均衡、命名服务、分布式协 调/通知、集群管理、Master 选举、配置维护,名字服务、分布式同步、分布式锁和分布式队列 等功能。 | ||
Kafka | Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。 | ||
Hadoop | 2.7.7 | Hadoop是一个由Apache基金会所开发的分布式系统基础架构。 | |
Spark | 2.4.5 | Apache Spark 是专为大规模数据处理而设计的快速通用的计算引擎。Spark是UC Berkeley AMP lab (加州大学伯克利分校的AMP实验室)所开源的类Hadoop MapReduce的通用并行框架,Spark,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是——Job中间输出结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的MapReduce的算法。 | |
Hive | 2.3.7 | hive是基于Hadoop的一个数据仓库工具,可以将结构化的数据文件映射为一张数据库表,并提供简单的sql查询功能,可以将sql语句转换为MapReduce任务进行运行。 其优点是学习成本低,可以通过类SQL语句快速实现简单的MapReduce统计,不必开发专门的MapReduce应用,十分适合数据仓库的统计分析。 | |
Hbase | 2.1.10 | HBase是一个分布式的、面向列的开源数据库,HBase不同于一般的关系数据库,它是一个适合于非结构化数据存储的数据库。另一个不同的是HBase基于列的而不是基于行的模式。 | |
Flink |
整个大数据使用使用 docker stack
管理镜像和容器,并进行集群的编排。
上图中,灰色的镜像(centos:6)为docker hub官方基础镜像。其它镜像(twinsen/hadoop:2.7.2等)都是在下层镜像的基础上实现的。这一镜像之间的依赖关系,决定了镜像的编译顺序.
具体安装方法请自行百度,安装完成后,在命令行下输入 docker info
进行测试,输出结果如下图所示,说明安装成功
镜像的构建可以通过 services 目录下的 Dockfile 使用 docker build .
直接生成,也可以使用 docker-compose
生成,推荐后者。
构建基本操作系统和 OpenJDK 环境(TODO)
# 通过yum安装,镜像有点大
docker-compose -f docker-compose-build.yml build os-jvm
# 手动创建 bigdata 的网络, 其他服务可以依次往网络中添加
docker network create --driver overlay --subnet 13.14.15.0/24 --ip-range 13.14.15.0/24 --gateway 13.14.15.1 bigdata
docker pull mysql:5.7
docker pull centos:6
docker pull twinsen/os-jvm:centos6-openjdk8
docker pull twinsen/hadoop:2.7.2
docker pull twinsen/hive:2.1.1
docker pull twinsen/spark:2.1.0
完成上一步的镜像编译工作后,在系统命令行中,可以使用docker images命令查看目前docker环境下的镜像,如下图所示:
为了方便使用,在工程根目录下放置了一个docker-compose.yml文件,这一文件中已经预先配置好了由3个slave节点和1个master节点组成的Spark集群。
在使用集群之前,需要先完成初始化
#[创建容器]
docker-compose up -d
#[格式化HDFS。第一次启动集群前,需要先格式化HDFS;以后每次启动集群时,都不需要再次格式化HDFS]
docker-compose exec spark-master hdfs namenode -format
#[初始化Hive数据库。仅在第一次启动集群前执行一次]
docker-compose exec spark-master schematool -dbType mysql -initSchema
#[将Spark相关的jar文件打包,存储在/code目录下,命名为spark-libs.jar]
docker-compose exec spark-master jar cv0f /code/spark-libs.jar -C /root/spark/jars/ .
#[启动HDFS]
docker-compose exec spark-master start-dfs.sh
#[在HDFS中创建/user/spark/share/lib/目录]
docker-compose exec spark-master hadoop fs -mkdir -p /user/spark/share/lib/
#[将/code/spark-libs.jar文件上传至HDFS下的/user/spark/share/lib/目录下]
docker-compose exec spark-master hadoop fs -put /code/spark-libs.jar /user/spark/share/lib/
#[关闭HDFS]
docker-compose exec spark-master stop-dfs.sh
下面简要介绍启动和关闭Spark集群的步骤(以下步骤均在命令行环境下完成,在工程根目录下执行)
#[启动HDFS]
docker-compose exec spark-master start-dfs.sh
#[启动YARN]
docker-compose exec spark-master start-yarn.sh
#[启动Spark]
docker-compose exec spark-master start-all.sh
#[停止Spark]
docker-compose exec spark-master stop-all.sh
#[停止YARN]
docker-compose exec spark-master stop-yarn.sh
#[停止HDFS]
docker-compose exec spark-master stop-dfs.sh
#[停止容器]
docker-compose down
目前集群中采用的是1个master节点和3个slave节点的分配方案,可以通过调整docker-compose配置文件以及相应软件的配置文件来实现集群扩容,暂时无法做到自动化扩容。
编写程序可以使用任意的IDE和操作系统,程序编写完成后,打包为jar文件,然后放在工程根目录下的./volume/code/目录下。任何一个集群环境下,都会在集群启动时将code目录挂载在master节点的/code路径下。
如果要执行wordcount程序(在volume/code/tests/mapreduce-test目录下已经包含了)。在启动集群并启动各服务进程后。执行下列语句,可以进入master节点的命令行环境:
docker-compose exec spark-master /bin/bash
docker ps -a
(查看所有的容器,包括已经停止的)docker-compose exec spark-master /bin/bash
进入shell.docker inspect spark-master | grep volume
hdfs dfs -put /code.yun.csv /user
hdfs dfs -get /user/yun.csv /code
dfs dfs -ls /user
求最大值:
package yun.mao
/**
* @Classname MaxPrice
* @Description TODO
* @Date 19-3-18 下午2:34
* @Created by mao<tianmao818@qq.com>
*/
import org.apache.spark.SparkContext._
import org.apache.spark.{SparkConf,SparkContext}
object MaxPrice {
def main(args: Array[String]){
val conf = new SparkConf().setAppName("Max Price")
val sc = new SparkContext(conf)
sc.textFile(args(0))
.map(_.split(","))
.map(rec => ((rec(0).split("-"))(0).toInt, rec(1).toFloat))
.reduceByKey((a,b) => Math.max(a,b))
.saveAsTextFile(args(1))
}
}
spark-submit --class yun.mao.MaxPrice --master yarn --deploy-mode cluster yunmao.jar hdfs://hadoop-master:54310/user/yun.csv hdfs://hadoop-master:54310/user/mao.txt
val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)
进入mysql容器:docker exec -it mysql容器ID /bin/bash
# 在配置文件中查看mysql配置信息
mysql:
image: mysql:5.7
volumes:
- "./volume/mysql:/var/lib/mysql"
container_name: mysql
hostname: mysql
networks:
- spark
environment:
- MYSQL_ROOT_PASSWORD=hadoop
tty: true
CREATE TABLE `people` (
`name` varchar(150) NOT NULL,
`user_id` int(11) NOT NULL,
PRIMARY KEY (`age`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
INSERT INTO `people` VALUES("mao",24);
INSERT INTO `people` VALUES("yun",24);
spark连接数据库
val sqlContext=new org.apache.spark.sql.SQLContext(sc)
// Creates a DataFrame based on a table named "people"
// stored in a MySQL database.
//首先使用docker inspect查看容器的ip
// 使用useSSL=false,其他参数分割使用&符号
val url ="jdbc:mysql://172.20.0.2:3306/test?useSSL=false&user=root&password=hadoop"
val df = sqlContext.read.format("jdbc").option("url", url).option("dbtable", "people").load()
// Looks the schema of this DataFrame.
df.printSchema()
// Counts people by age
val countsByAge = df.groupBy("age").count()
countsByAge.show()
// Saves countsByAge to S3 in the JSON format.
countsByAge.write.format("json").save("s3a://...")
val sqlContext=new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
val df = Seq(
(1,5),
(1,6),
(1,7),
(1,8),
(0,1),
(0,2),
(0,3),
(0,4)
).toDF("label", "features")
import org.apache.spark.ml.classification.LogisticRegression
val model = lr.fit(df)
val weights = model.weights
model.transform(df).show()
flume的监控方式
# 监控一个文件,导向kafka中
agent.sources=s1
agent.sinks=k1
agent.channels=c1
agent.sources.s1.type=exec
agent.sources.s1.command = tail -f /home/logdfs/log
agent.sources.s1.channels=c1
agent.sources.s1.shell = /bin/sh -c
agent.channels.c1.type=memory
agent.channels.c1.capacity=10000
agent.channels.c1.transactionCapacity=100
agent.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink
agent.sinks.k1.brokerList=kafka2:9093
agent.sinks.k1.topic=test0
agent.sinks.k1.serializer.class=kafka.serializer.StringEncoder
agent.sinks.k1.channel=c1
# 其他方式
agent.sources.s1.type = netcat
agent.sources.s1.bind = localhost
agent.sources.s1.port = 5678
agent.sources.s1.channels = c1
agent.sinks.sk1.type = logger
agent.sinks.sk1.channel = c1
agent.channels.c1.type = memory
agent.channels.c1.capacity = 1000
agent.channels.c1.transactionCapacity = 100
agent.sources.s1.type = spooldir
agent.sources.s1.spoolDir =/var/log
agent.sources.s1.fileHeader = true
agent.sources.s1.channels = c1
agent.sinks.sk1.type = logger
agent.sinks.sk1.channel = c1
agent.channels.c1.type = memory
agent.channels.c1.capacity = 10004
agent.channels.c1.transactionCapacity = 100
zookeeper相关的配置
clientPort=2181
dataDir=/data
dataLogDir=/datalog
tickTime=2000
initLimit=5
syncLimit=2
autopurge.snapRetainCount=3
autopurge.purgeInterval=0
maxClientCnxns=60
server.1=zoo1:2888:3888
server.2=zoo2:2888:3888
server.3=zoo3:2888:3888
测试kafka(kafka依赖与zookeeper,在docker-compose文件中已经进行了配置,必须在zookeeper服务启动起来的时候kafka才能够被启动起来)
# 创建topic
kafka-topics.sh --create --zookeeper zoo1:2181, zoo2:2181, zoo3:2181 --replication-factor 3 --partitions 3 --topic test
# 查看topic
kafka-topics.sh --describe --zookeeper zoo1:2181, zoo2:2181, zoo3:2181 --topic test
# 创建生产者
kafka-console-producer.sh --broker-list kafka1:9092 -topic test
# 创建消费者
kafka-console-consumer.sh --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --topic test --from-beginning
导入相关的jars:spark-shell --jars
# 注意jar之间需要使用逗号进行分割
spark-shell --jars spark-streaming-kafka-0-10_2.11-2.3.0.jar,kafka_2.12-2.1.1.jar,kafka-clients-2.1.1.jar
spark-shell脚本
# 导入包
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.mllib.classification.SVMModel
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.sql.SparkSession
# 配置
val kafkaParams = Map[String, Object]("bootstrap.servers" -> "kafka1:9092","key.deserializer" -> classOf[StringDeserializer],"value.deserializer" -> classOf[StringDeserializer],"auto.offset.reset" -> "latest","group.id" -> "group_1","enable.auto.commit" -> (false: java.lang.Boolean))
val streamingContext = new StreamingContext(sc, Seconds(1))
val topics = Array("test0")
# 接受消息
val kafkaStream = KafkaUtils.createDirectStream[String, String](streamingContext,PreferConsistent,Subscribe[String, String](topics, kafkaParams))
# 分词
val words = kafkaStream.transform { rdd =>rdd.flatMap(record => (record.value().toString.split(" ")))}
# 打印
words.print()
# 启动
streamingContext.start()
# 关闭
streamingContext.awaitTermination()
执行结果(flume监控log文件,log文件增加一行触发,flume将增加的行输入到kafka,spark从kafka订阅了topic接收消息,spark streaming自动对接收到的行进行分词并打印)