Week 1
Data based on their format
1. Structured Data
-
Characteristics: Conforms to a data model or schema, typically relational, and can be managed using a Database Management System (DBMS).
符合数据模型或模式,通常是关系型数据,可以用数据库管理系统 (DBMS) 管理
-
Examples: Banking transactions, electronic health records. 银行交易记录、电子健康记录
2. Unstructured Data
-
Characteristics: Does not conform to any data model or schema, often textual or binary, and can be stored as Binary Large Objects (BLOBs) in a DBMS or NoSQL databases.
不符合任何数据模型或模式,通常是文本或二进制数据,可以作为二进制大对象 (BLOBs) 存储在 DBMS 或 NoSQL 数据库中
-
Examples: Tweets, video files.
推文、视频文件。
3. Semi-structured Data 半结构化数据
-
Characteristics: Non-relational data with a defined level of structure or consistency, often hierarchical or graph-based.
非关系型数据,但具有一定程度的结构或一致性,通常是层次结构或图结构。
-
Examples: Spreadsheets, XML data, sensor data, JSON data.
电子表格、XML 数据、传感器数据、JSON 数据。
4. Metadata
-
Characteristics: Provides information about a dataset’s characteristics and structure, including data provenance (where the data comes from).
提供数据集的特征和结构信息,包括数据来源(数据溯源)。
-
Examples: XML tags, attributes, file metadata.
XML 标签、属性、文件元数据等。
Big data characteristics
Definition
Attribute / 属性 | Description / 描述 |
---|---|
Volume / 数据量 | Data quantity is substantial and ever growing. Different storage and processing requirements. 数据量巨大且不断增长。存储和处理需求不同。 |
Velocity / 数据速度 | Data speed may be high. Elastic and available time for processing. 数据速度可能很高。处理时间具有弹性和可用性。 |
Variety / 数据多样性 | Multiple formats and types. Different integration, transformation, processing, and storage requirements. 多种格式和类型。集成、转换、处理和存储需求不同。 |
Veracity / 数据真实性 | Bias, noise, abnormalities. Different requirements for removing noise and resolving invalid data. 偏差、噪声、异常。去除噪声和解决无效数据的需求不同。 |
Value / 数据价值 | Data usefulness (utility). Depends on veracity, time of processing, storage & analysis decisions. 数据的有用性(效用)。取决于真实性、处理时间、存储和分析决策。 |
Example
- Volume (数据量): Transportation systems generate huge amounts of data, such as airline passenger records and GPS data from cars. These data require substantial storage and processing resources, typically provided by cloud servers and distributed computing frameworks.
- Velocity (数据速度): Data is generated and updated very quickly, for example the real-time changes in a car's position. The system must handle these high-speed data streams efficiently and adapt to changes in data velocity.
- Variety (数据多样性): Transportation data comes in many types, such as passenger demographics, images (e.g., scanned passports), and GPS coordinates. These data must be integrated and transformed for meaningful analysis, often using specialized software such as NoSQL databases.
- Veracity (数据真实性): Data may contain noise or errors, for example due to device limitations or spelling mistakes. Appropriate techniques and software are needed to clean or replace the inaccurate data.
- Value (数据价值): Transportation data is highly valuable, especially for applications such as counter-terrorism, fraud detection, pollution management, and traffic control. Extracting this value, however, requires effective data storage and processing to keep the data timely and accurate.
Big data analytics
Definition: Refers to the processes, technologies, and frameworks used to analyze data 用于分析数据的过程、技术和框架
Goal: To discover knowledge that makes systems smarter, faster, and more efficient 通过分析发现知识,使系统更智能、更快速、更高效
What Analytics Involve
-
Data Filtering 数据过滤:
-
Screening relevant and useful information from large datasets.
从大量数据中筛选出相关和有用的信息
-
Removing noise and unnecessary data.
去除噪声和不必要的数据
-
-
Data Processing 数据处理 :
-
Cleaning, transforming, and organizing data.
对数据进行清洗、转换和整理
-
Ensuring data quality and consistency.
确保数据的质量和一致性
-
-
Data Categorization 数据分类:
-
Classifying data based on specific criteria or features.
将数据按照特定的标准或特征进行分类
-
Facilitating subsequent analysis and understanding.
便于后续的分析和理解
-
-
Data Condensation 数据压缩:
-
Simplifying large datasets into more manageable forms.
将大量数据简化为更易管理的形式
-
Extracting key information and summaries.
提取关键信息和摘要
-
-
Data Contextualization 数据情境化:
-
Placing data within specific contexts or environments for analysis.
将数据置于特定的背景或环境中进行分析
-
Helping to understand the practical significance and application scenarios of data.
帮助理解数据的实际意义和应用场景
-
Example
- Example 1: a library's book list
Step 步骤 | Task 任务描述 | Purpose 目的 |
---|---|---|
Data Filtering 数据过滤 | Select the fiction books from the library's book list. | Remove non-fiction books and focus the analysis on fiction data. |
Data Processing 数据处理 | Update the book list by adding the page count for each book. | Ensure the data is complete for later analysis and use. |
Data Categorization 数据分类 | Further classify the fiction books into classic and modern fiction. | Refine the classification for more precise analysis and management. |
Data Condensation 数据压缩 | Create a short summary for each book containing only the title and author. | Simplify the data and extract the key information for quick browsing and reference. |
Data Contextualization 数据情境化 | Add a brief description to each book based on its genre. | Provide extra background that helps explain each book's content and characteristics. |
- Example 2: customer feedback (a code sketch of this pipeline follows below)
Step 步骤 | Task 任务描述 | Purpose 目的 |
---|---|---|
Data Filtering 数据过滤 | Remove irrelevant feedback such as spam or very short comments (e.g., "good" or "ok"). | Ensure the data being analyzed is relevant and meaningful. |
Data Processing 数据处理 | Preprocess the text by removing stop words (e.g., "the", "and") and converting it to lowercase. | Clean the data for subsequent text analysis and processing. |
Data Categorization 数据分类 | Use sentiment analysis to classify the feedback as "positive", "negative", or "neutral". | Refine the classification to understand customer sentiment and attitudes. |
Data Condensation 数据压缩 | Summarize the customer feedback into key themes. | Simplify the data and extract the core information for a quick grasp of the main points. |
Data Contextualization 数据情境化 | Add background information to the feedback to give stakeholders actionable insights. | Provide context so stakeholders understand the practical meaning of the feedback and can act on it. |
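Below is a minimal Python sketch of the customer-feedback pipeline in Example 2. The feedback strings, stop-word list, and keyword-based "sentiment" rule are illustrative assumptions, not the techniques prescribed in the course; the sketch only shows how filtering, processing, categorization, condensation, and contextualization fit together.
```python
# A toy walk-through of the five analytics operations on customer feedback.
# The feedback strings, stop-word list, and keyword-based "sentiment analysis"
# are illustrative stand-ins only.
from collections import Counter

feedback = [
    "The delivery was fast and the packaging was great",
    "good",                                    # too short -> filtered out
    "BUY CHEAP WATCHES http://spam.example",   # spam -> filtered out
    "The app keeps crashing and support was slow",
    "Great price and great quality",
]

STOP_WORDS = {"the", "and", "was", "a", "an", "is"}
POSITIVE, NEGATIVE = {"fast", "great", "good"}, {"crashing", "slow", "bad"}

# 1. Data filtering: drop spam and very short comments.
relevant = [f for f in feedback if len(f.split()) > 2 and "http" not in f]

# 2. Data processing: lowercase and remove stop words.
tokens = [[w for w in f.lower().split() if w not in STOP_WORDS] for f in relevant]

# 3. Data categorization: naive keyword-based sentiment label.
def sentiment(words):
    score = sum(w in POSITIVE for w in words) - sum(w in NEGATIVE for w in words)
    return "positive" if score > 0 else "negative" if score < 0 else "neutral"

labels = [sentiment(t) for t in tokens]

# 4. Data condensation: summarize into key themes (most common words).
themes = Counter(w for t in tokens for w in t).most_common(3)

# 5. Data contextualization: attach a short, actionable note for stakeholders.
report = {
    "sentiment_breakdown": Counter(labels),
    "top_themes": themes,
    "note": "Negative feedback mentions crashes; route to the engineering team.",
}
print(report)
```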
Based on Their Goals
Big data analytics can be categorized into four types based on their goals: Descriptive, Diagnostic, Predictive, and Prescriptive analytics. Here’s a detailed breakdown of each:
大数据分析根据其目标可以分为四类:描述性分析、诊断性分析、预测性分析和规范性分析。以下是每类的详细说明
-
Descriptive Analytics 描述性分析:
-
Goal: To answer "What has happened?"
目标:回答“发生了什么”。
-
Method: Summarizes past data to provide insights into historical events.
方法:基于过去的数据,以总结的形式呈现。
-
Application: Used to summarize historical data and provide an understanding of past events.
应用:用于总结历史数据,提供对过去事件的洞察。
-
-
Diagnostic Analytics 诊断性分析:
-
Goal: To answer "Why did it happen?"
目标:回答“为什么会发生”。
-
Method: Analyzes past data to identify the causes of events.
方法:基于过去的数据,分析事件的原因。
-
Application: Used to diagnose the root causes of issues and understand the reasons behind data patterns.
应用:用于识别问题的根本原因,帮助理解数据背后的原因。
-
-
Predictive Analytics 预测性分析:
-
Goal: To answer "What is likely to happen?"
目标:回答“可能会发生什么”。
-
Method: Uses existing data to forecast future trends and outcomes.
方法:基于现有数据,预测未来的趋势和结果。
-
Application: Used to predict future events and support proactive decision-making.
应用:用于预测未来事件,帮助做出前瞻性决策。
-
-
Prescriptive Analytics 规范性分析:
-
Goal: To answer "What can we do to make something happen?"
目标:回答“我们可以做什么来促使某事发生”。
-
Method: Provides actionable recommendations based on existing data.
方法:基于现有数据,提供行动建议。
-
Application: Used to optimize decisions and suggest specific actions to achieve desired outcomes.
应用:用于优化决策,提供具体的行动方案以实现特定目标。
-
Based on Their Methods
-
Basic Statistics 基本统计: Used for data summarization and interpretation.
-
Generalised N-body Problems 广义N体问题: Used for problems involving many interacting entities, such as simulating physical systems or interactions in complex networks.
- Examples: clustering (constructs groups of data), classification (to be discussed later)
- Challenge: high dimensionality
-
Linear Algebra 线性代数: Used for analysing relationships and connections.
- Example: Investigate why a failure happened by PCA followed by clustering (i.e., make a data summary, group the data in it and see if it explains the failure); a sketch of this follows after the list below.
- Challenge: large matrices with "difficult" properties
-
Graph-theoretic 图论: Used, for example, to identify influential users in social networks.
- Examples: search, centrality, shortest paths
- Challenge: high interconnectivity (complex relationships)
-
Optimisation Problems 优化问题: Used to find the best solution.
-
Integration Problems 积分计算: Used to combine different data sources.
-
Alignment Problems 对齐问题: Used to match and unify different datasets.
- Descriptive Analytics
- Basic Statistics
- Linear Algebra
- Diagnostic Analytics
- Generalised N-body Problems
- Linear Algebra
- Graph-theoretic
- Predictive Analytics
- Generalised N-body Problems
- Linear Algebra
- Graph-theoretic
- Integration Problems
- Alignment Problems
- Prescriptive Analytics
- Generalised N-body Problems
- Graph-theoretic
- Optimisation Problems
- Integration Problems
- Alignment Problems
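The sketch below illustrates the "PCA followed by clustering" idea from the Linear Algebra example: summarize high-dimensional measurements, then group them to see whether a cluster corresponds to the failing runs. It assumes NumPy and scikit-learn are available; the synthetic data and the choice of 2 components / 2 clusters are assumptions for illustration only.
```python
import numpy as np
from sklearn.decomposition import PCA
from sklearn.cluster import KMeans

rng = np.random.default_rng(0)
healthy = rng.normal(loc=0.0, scale=1.0, size=(100, 20))  # 100 normal runs, 20 sensors
failing = rng.normal(loc=3.0, scale=1.0, size=(20, 20))   # 20 runs around a fault mode
X = np.vstack([healthy, failing])

# Data summary: project the 20-dimensional data onto 2 principal components.
summary = PCA(n_components=2).fit_transform(X)

# Group the summarized data; inspect whether one cluster matches the failures.
clusters = KMeans(n_clusters=2, n_init=10, random_state=0).fit_predict(summary)
print("runs per cluster:", np.bincount(clusters))
```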
Flow
Data processing model
-
Default Mode 默认模式:
-
Description: The dataset is stored in memory (RAM).
描述:数据集存储在内存(RAM)中。
-
Characteristics: Fast data access, suitable for small to medium-sized datasets.
特点:数据访问速度快,适合处理中小规模数据集。
-
-
Streaming Mode 流式处理:
-
Description: Data arrives as a stream, and only a part (window) of the data is stored in memory.
描述:数据以流的形式到达,只有一部分(窗口)数据存储在内存中。
-
Characteristics: Suitable for real-time data streams, efficiently handles continuously arriving data (see the sketch after this list).
特点:适合处理实时数据流,能够高效处理连续到达的数据。
-
-
Distributed Mode 分布式处理:
-
Description: Data is distributed across multiple machines (stored in memory and/or disk).
描述:数据分布在多台机器上(存储在内存和/或磁盘中)。
-
Characteristics: Suitable for large-scale datasets, leverages the computational and storage resources of multiple machines.
特点:适合处理大规模数据集,能够利用多台机器的计算和存储资源。
-
-
Multi-threaded Mode 多线程处理:
-
Description: Data is stored on a single machine, and multiple processors share the machine's memory.
描述:数据存储在一台机器上,多个处理器共享该机器的内存。
-
Characteristics: Suitable for utilizing the parallel computing power of multi-core processors, improving data processing efficiency.
特点:适合利用多核处理器的并行计算能力,提高数据处理效率。
-
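As a toy illustration of the streaming mode described above, the sketch below keeps only a fixed-size window of recent values in memory and updates a moving average as events arrive. The "sensor readings" are made-up values used purely for illustration.
```python
from collections import deque

def moving_average(stream, window_size=3):
    window = deque(maxlen=window_size)   # only the window is held in RAM
    for value in stream:
        window.append(value)
        yield sum(window) / len(window)

readings = [10, 12, 11, 50, 13, 12]      # a burst (50) arrives mid-stream
for avg in moving_average(readings):
    print(round(avg, 2))
```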
Quiz
Quiz 1
Which of the following is/are correct about Big Data? Select all answers that apply.
- [ ] A. There is one, universal format for Big Data.
- [x] B. The speed of Big Data may be high.
- [x] C. The value of Big Data depends on their time of processing.
- [ ] D. Big Data are typically free from noise.
- [ ] E. Big Data quantity is not growing.
Quiz 2
Which of the following is true about Big Data formats? Select all answers that apply.
- [ ] A. Big Data is always structured.
- [x] B. Big Data can include structured, semi-structured, and unstructured data.
- [ ] C. Tweets and videos are examples of structured data.
- [x] D. JSON and XML data are examples of semi-structured data.
- [x] E. Structured data is stored in relational tables.
Quiz 3
Which of the following is a challenge of working with Big Data? Select all answers that apply.
- [x] A. Managing diverse data types and formats.
- [x] B. Handling rapid data generation (velocity).
- [x] C. Storing and processing massive volumes of data efficiently.
- [x] D. Ensuring data security and privacy during storage and processing.
- [x] E. Extracting meaningful insights from raw data.
Quiz 4
You want to know what is likely to happen to the stock price of Tesla based on current market data and/or other influencing factors (e.g., political risk). Which type of analytics goal would this correspond to? Justify your answer by examining the goal.
- [ ] A. Descriptive Analytics
- [ ] B. Diagnostic Analytics
- [x] C. Predictive Analytics
- [ ] D. Prescriptive Analytics
Solution
Quiz 1
-
Correct Answers:
-
B. The speed of Big Data may be high.
-
C. The value of Big Data depends on their time of processing.
-
-
Explanation:
-
A. Incorrect. Big Data comes in various formats (structured, unstructured, semi-structured), and there is no universal format.
-
B. Correct. Big Data is often characterized by high velocity, meaning it is generated and processed at a rapid pace.
-
C. Correct. The value of Big Data can diminish over time, so timely processing is crucial.
-
D. Incorrect. Big Data often contains noise, errors, or irrelevant information, which needs to be cleaned and processed.
-
E. Incorrect. The volume of Big Data is continuously growing due to the increasing amount of data generated from various sources.
-
-
Quiz 2
- Correct Answers:
- B. Big Data can include structured, semi-structured, and unstructured data.
- D. JSON and XML data are examples of semi-structured data.
- E. Structured data is stored in relational tables.
- Explanation:
- A. Incorrect. Big Data is not always structured; it can also be semi-structured or unstructured.
- B. Correct. Big Data includes structured (e.g., relational tables), semi-structured (e.g., JSON, XML), and unstructured data (e.g., videos, tweets).
- C. Incorrect. Tweets and videos are examples of unstructured data, not structured data.
- D. Correct. JSON and XML are examples of semi-structured data formats.
- E. Correct. Structured data is typically stored in relational tables with rows and columns.
Quiz 3
- Correct Answers:
- A. Managing diverse data types and formats.
- B. Handling rapid data generation (velocity).
- C. Storing and processing massive volumes of data efficiently.
- D. Ensuring data security and privacy during storage and processing.
- E. Extracting meaningful insights from raw data.
- Explanation:
- A. Correct. Big Data often includes structured, semi-structured, and unstructured data, making it challenging to manage diverse formats.
- B. Correct. The high velocity of data generation (e.g., real-time data streams) poses a significant challenge for processing and analysis.
- C. Correct. Storing and processing large volumes of data efficiently requires scalable infrastructure and advanced tools.
- D. Correct. Data security and privacy are critical concerns, especially when dealing with sensitive or personal information.
- E. Correct. Extracting meaningful insights from raw data requires advanced analytics, machine learning, and domain expertise.
Quiz 4
- Correct Answer: C. Predictive Analytics
- Explanation:
- The goal is to determine what is likely to happen to Tesla's stock price in the future based on current market data and influencing factors.
Week 2
Focus on Data Collection and Preparation
Data Collection
Big data can be collected from many different sources, including databases, such as MySQL, social networks and sensors.
大数据可从许多不同来源收集,包括 MySQL 等数据库、社交网络和传感器。
Due to the big data characteristics, we need special tools and frameworks for collecting big data. We refer to such tools as data access connectors.
由于大数据的特点,我们需要专门的工具和框架来收集大数据。我们将此类工具称为数据访问连接器。
Data Access Connectors
There are five common types of data access connectors.
-
Publish-Subscribe Messaging 发布-订阅消息传递:
- Description: Publishers send messages to a topic managed by a broker (intermediary). Subscribers subscribe to topics. Brokers route messages from publishers to subscribers. (A minimal sketch of this pattern follows after this list.)
描述:发布者将消息发送到由代理(中介)管理的主题。订阅者订阅主题。代理将消息从发布者路由到订阅者。
- Examples: Apache Kafka, Amazon Kinesis.
示例:Apache Kafka、Amazon Kinesis。
-
Source-Sink Connectors 源-汇连接器:
- Description: Source connectors import data from another system (e.g., relational database) into a centralized data store (e.g., distributed file system). Sink connectors export data to another system.
描述:源连接器将数据从另一个系统(例如关系数据库)导入到集中式数据存储(例如分布式文件系统)。汇连接器将数据导出到另一个系统。
- Examples: Apache Flume.
示例:Apache Flume。
-
Database Connectors 数据库连接器:
- Description: Import data from relational databases into big data stores. These are specialized connectors for use together with databases.
描述:将数据从关系数据库导入到大数据存储中。这些是专门用于与数据库一起使用的连接器。
- Examples: Apache Sqoop.
示例:Apache Sqoop。
-
Messaging Queues 消息队列:
- Description: Producers push data to the queues. Consumers pull data from the queues. Producers and consumers do not need to be aware of each other.
描述:生产者将数据推送到队列中。消费者从队列中拉取数据。生产者和消费者不需要彼此了解。
- Examples: RabbitMQ, ZeroMQ, Amazon SQS.
示例:RabbitMQ、ZeroMQ、Amazon SQS。
-
Custom Connectors 自定义连接器:
- Description: Built for collecting data of specific types, such as data from social networks (e.g., Twitter) and NoSQL databases (e.g., Internet of Things). Such connectors are built based on the data sources they are targeted at, and the specific collection requirements for data produced by these data sources.
描述:用于收集特定类型的数据,例如来自社交网络(如Twitter)和NoSQL数据库(如物联网)的数据。这些连接器是基于其目标数据源以及这些数据源生成的数据的特定收集需求而构建的。
- Examples: Connectors for Twitter, IoT devices.
示例:用于Twitter、物联网设备的连接器
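The sketch below is a minimal in-memory model of publish-subscribe messaging: publishers send messages to a topic on a broker, and the broker routes each message to every subscriber of that topic. It only illustrates the pattern; it is not the Kafka or Kinesis API, and the topic and handler names are made up.
```python
from collections import defaultdict

class Broker:
    def __init__(self):
        self.subscribers = defaultdict(list)   # topic -> list of subscriber callbacks

    def subscribe(self, topic, callback):
        self.subscribers[topic].append(callback)

    def publish(self, topic, message):
        # Route the message from the publisher to every subscriber of the topic.
        for callback in self.subscribers[topic]:
            callback(message)

broker = Broker()
broker.subscribe("tweets", lambda m: print("analytics got:", m))
broker.subscribe("tweets", lambda m: print("archiver got:", m))
broker.publish("tweets", {"user": "alice", "text": "big data is fun"})
```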
Apache Sqoop
Import Process
-
Step 1: Examine the Table 检查表:
-
Description: The table to be imported from the DBMS to the distributed file system (Hadoop) is examined by querying the metadata of the table. At this point, there is no data transfer.
描述:通过查询表的元数据,检查要从DBMS导入到分布式文件系统(Hadoop)的表。此时,没有数据传输。
-
Purpose: Understand the structure of the table (e.g., columns, data types) before initiating the transfer.
目的:在启动传输之前,了解表的结构(例如列、数据类型)。
-
-
Step 2: Generate JAVA Code 生成JAVA代码:
-
Description: Sqoop generates JAVA code that describes the data transfer. A JAVA class corresponds to a table, a method corresponds to an attribute, and there are also JAVA methods for interacting with JDBC.
描述:Sqoop生成描述数据传输的JAVA代码。一个JAVA类对应一个表,一个方法对应一个属性,还有一些JAVA方法用于与JDBC交互。
-
Purpose: Automate the data transfer process and ensure compatibility with Hadoop and the DBMS.
目的:自动化数据传输过程,并确保与Hadoop和DBMS的兼容性。
-
-
Step 3: Initiate Data Transfer 启动数据传输:
-
Description: Sqoop connects to the distributed file system (Hadoop) and submits a MapReduce job, as specified by the JAVA code. This job then transfers the data from the DBMS. The data transfer can be parallelized for efficiency.
描述:Sqoop连接到分布式文件系统(Hadoop)并提交一个MapReduce作业,该作业由JAVA代码指定。然后,该作业从DBMS传输数据。数据传输可以并行化以提高效率。
-
Purpose: Efficiently move large volumes of data from the DBMS to Hadoop.
目的:高效地将大量数据从DBMS移动到Hadoop。
-
Export Process
-
Step 1: Pick a Strategy for the Target Table 选择目标表的策略:
-
Description: Sqoop picks a strategy for the target table based on its metadata (e.g., column names, types, etc.).
描述:Sqoop根据目标表的元数据(例如列名、类型等)选择策略。
-
Purpose: Ensure the export process is optimized for the specific table structure.
目的:确保导出过程针对特定表结构进行优化。
-
-
Step 2: Generate JAVA Code 生成JAVA代码:
-
Description: Sqoop generates JAVA code to parse records from text files and generate INSERT statements.
描述:Sqoop生成JAVA代码,用于从文本文件中解析记录并生成INSERT语句。
-
Purpose: Automate the process of converting data from Hadoop format to a format suitable for the DBMS.
目的:自动化将数据从Hadoop格式转换为适合DBMS的格式的过程。
-
-
Step 3: Launch MapReduce Job 启动MapReduce作业:
-
Description: The JAVA code is used in the submitted MapReduce job that will export the data. The job reads data from Hadoop, processes it, and writes it to the target database table.
描述:JAVA代码用于提交的MapReduce作业中,该作业将导出数据。作业从Hadoop读取数据,处理数据,并将其写入目标数据库表。
-
Purpose: Efficiently transfer large volumes of data from Hadoop to the DBMS.
目的:高效地将大量数据从Hadoop传输到DBMS。
-
-
Step 4: Improve Insert Speed 提高插入速度 (optional 可选)
To improve efficiency, "m" mappers write the data in parallel, and a single INSERT command may transfer multiple rows.
为了提高效率,"m"个映射器并行写入数据,一个 INSERT 命令可能会传输多行。
Apache Flume
-
Apache Flume is a system for collecting, aggregating, and moving data into a centralized data store (e.g., distributed file system) from various sources.
Apache Flume 是一个系统,用于从各种来源收集、汇总数据并将其移入一个集中式数据存储区(如分布式文件系统)。
-
Flume is more reliable, scalable, and efficient compared to ad-hoc solutions.
与临时解决方案相比,Flume 更可靠、可扩展且高效。
-
Flume can be easily managed and customized by users. The cost of installation, operation, and maintenance is low.
Flume 可以轻松地由用户管理和定制。安装、操作和维护的成本较低。
Main components and their functions
-
Agent 代理:
- Description: A process that hosts components through which the events flow or are transported from one place to another.
描述:一个托管组件的进程,事件通过这些组件从一个地方流动或传输到另一个地方。
- Function: Acts as the runtime environment for Flume components (sources, channels, sinks) to facilitate data flow.
功能:作为Flume组件(源、通道、接收器)的运行时环境,促进数据流动。
-
Source 数据源:
- Description: Receives data from data generators and transfers it to one or more channels.
描述:从数据生成器接收数据并将其传输到一个或多个通道。
-
Requirements:
-
Requires at least one channel to function.
需要至少一个通道才能运行。
-
Different types of sources are available for integration with well-known systems (e.g., Kafka, HTTP, Avro).
提供不同类型的源,用于与知名系统(如Kafka、HTTP、Avro)集成。
-
Function: Acts as the entry point for data into the Flume system.
功能:作为数据进入Flume系统的入口点。
-
Interceptor 拦截器:
- Description: Applied to source to modify, filter, or drop events.
描述:应用于源,以修改、过滤或删除事件。
- Function: Allows for preprocessing of data before it enters the channel.
功能:允许数据进入通道之前进行预处理。
-
Channel 通道:
- Description: A transient store which buffers events until they are consumed by sinks.
描述:一个临时存储,用于缓冲事件,直到它们被接收器使用。
- Function: Ensures reliable data transfer between sources and sinks.
功能:确保源和接收器之间的可靠数据传输。
-
Types:
-
Memory Channel 内存通道:
- Description: Stores events in memory (queue). Provides high throughput.
描述:将事件存储在内存(队列)中。提供高吞吐量。
- Pros: Fast data access and processing.
优点:快速的数据访问和处理。
- Cons: Data is lost if the agent fails.
缺点:如果代理失败,数据会丢失。
-
File Channel 文件通道:
- Description: Stores events in local files on disk(s). Uses a checkpoint file to track the state of an in-memory queue.
描述:将事件存储在磁盘上的本地文件中。使用检查点文件跟踪内存队列的状态。
- Pros: Persistent storage, ensuring data is not lost even if the agent fails.
优点:持久化存储,确保即使代理失败,数据也不会丢失。
- Cons: Slower than memory channels due to disk I/O.
缺点:由于磁盘I/O,速度比内存通道慢。
-
JDBC Channel JDBC通道:
- Description: Stores events in a relational database using JDBC.
描述:使用JDBC将事件存储在关系数据库中。
- Pros: Reliable and scalable, suitable for environments requiring database integration.
优点:可靠且可扩展,适合需要数据库集成的环境。
- Cons: Performance depends on the database system.
缺点:性能取决于数据库系统。
-
Custom Channel 自定义通道:
- Description: Allows users to implement their own channel types based on specific requirements.
描述:允许用户根据特定需求实现自己的通道类型。
- Pros: Highly flexible and customizable.
优点:高度灵活和可定制。
- Cons: Requires development effort and maintenance.
缺点:需要开发工作和维护。
-
-
Channel Selector 通道选择器:
- Description: When there are multiple channels, it defines the policy for distributing events to the channels. (If there are interceptors, the channel selector is applied after them.)
描述:当有多个通道时,它定义将事件分发到各个通道的策略(如果有拦截器,则通道选择器在拦截器之后应用)。
- Function: Manages the routing of events to appropriate channels.
功能:管理事件到适当通道的路由。
-
Sink 接收器:
- Description: Removes events from a channel and transmits them to their next hop destination.
描述:从通道中删除事件并将其传输到下一跳目的地。
- Function: Acts as the exit point for data from the Flume system.
功能:作为数据从Flume系统退出的出口点。
-
Types:
-
HDFS Sink HDFS接收器:
- Description: Data is written as a file (e.g., SequenceFile, DataStream) in Hadoop Distributed File System (HDFS). The file creation stops based on time, size, or number of events.
描述:数据以文件形式(例如SequenceFile、DataStream)写入Hadoop分布式文件系统(HDFS)。文件创建基于时间、大小或事件数量停止
- Use Case: Suitable for storing large volumes of data in HDFS for batch processing.
用例:适合将大量数据存储在HDFS中以进行批处理。
-
HBase Sink HBase接收器:
- Description: Data is written as an HBase (column-oriented database) table.
描述:数据以HBase(列式数据库)表的形式写入。
- Use Case: Suitable for real-time data storage and retrieval in a NoSQL database.
用例:适合在NoSQL数据库中进行实时数据存储和检索。
-
File Roll Sink 文件滚动接收器:
- Description: Data is written as a local file. A new file is created based on specific conditions (e.g., file size, time).
描述:数据以本地文件形式写入。根据特定条件(例如文件大小、时间)创建新文件。
- Use Case: Suitable for local data storage and archiving.
用例:适合本地数据存储和归档。
-
Avro Sink Avro接收器:
- Description: Data is serialized using Avro and sent to another Flume agent or system.
描述:使用Avro序列化数据并发送到另一个Flume代理或系统。
- Use Case: Suitable for inter-agent communication and data serialization.
用例:适合代理间通信和数据序列化。
-
Thrift Sink Thrift接收器:
- Description: Data is serialized using Thrift and sent to another Flume agent or system.
描述:使用Thrift序列化数据并发送到另一个Flume代理或系统。
- Use Case: Suitable for inter-agent communication and data serialization.
用例:适合代理间通信和数据序列化。
-
Custom Sink 自定义接收器:
- Description: Allows users to implement their own sink types based on specific requirements.
描述:允许用户根据特定需求实现自己的接收器类型。
- Use Case: Suitable for highly specialized data storage or transmission needs.
用例:适合高度专业化的数据存储或传输需求。
-
Sink Processor 接收器处理器:
- Description: Invokes one sink from a specified group of sinks.
描述:从指定的接收器组中调用一个接收器。
- Function: Manages the selection and operation of sinks within a group.
功能:管理组内接收器的选择和操作
-
Types:
-
Load Balancing Sink Processor 负载均衡接收器处理器
- Description: Provides load balancing capabilities over all sinks inside the group.
描述:为组内的所有接收器提供负载均衡功能。
- Load Distribution:
  - Load is distributed to sinks selected randomly or in a round-robin fashion.
  负载以随机或轮询方式分配给接收器。
  - If a sink fails, the next available sink is selected.
  如果接收器失败,则选择下一个可用的接收器。
- Limitation:
  - The next agents may not all receive the same amount of data, which is undesirable.
  下游代理可能不会接收到相同数量的数据,这是不利的。
  - This is addressed by the failover sink processor (discussed next).
  这一问题通过故障转移接收器处理器(见下文)解决。
- Failure Handling:
  - Failed sinks can be blacklisted for a given timeout.
  失败的接收器可以在给定的超时时间内被列入黑名单。
  - The timeout increases exponentially if the sink still fails after the timeout.
  如果接收器在超时后仍然失败,超时时间会呈指数增长。
-
-
Failover Sink Processor 故障转移接收器处理器
- Description: Guarantees that events will be processed, as long as there are available sinks.
描述:只要存在可用的接收器,就保证事件会被处理。
- Priority Assignment:
  - Unique priorities are assigned to sinks.
  为接收器分配唯一的优先级。
  - The sink with the highest priority writes data until it fails.
  优先级最高的接收器会写入数据,直到它失败。
- Failure Handling:
  - If a sink fails while sending an event, it is moved to a pool to "cool down" for a maximum penalty time period.
  如果接收器在发送事件时失败,它会被移到一个池中"冷却"一段时间(最大惩罚时间)。
  - The next sink with the highest priority is tried.
  尝试下一个优先级最高的接收器。
Data preparation
-
Data Cleaning 数据清洗:
- Description: Corrects errors such as typos, misspelled values, or inconsistent formatting.
描述:纠正错误,例如拼写错误、拼写错误的值或不一致的格式。
- Example: Fixing "New Yrok" to "New York" in a dataset.
示例:将数据集中的“New Yrok”更正为“New York”。
-
Data Wrangling 数据整理:
- Description: Converts data from a raw format to a more usable format.
描述:将数据从原始格式转换为更可用的格式。
- Example: Converting JSON data into a structured table (e.g., CSV or Excel).
示例:将JSON数据转换为结构化表格(例如CSV或Excel)。
-
De-duplication 去重:
- Description: Eliminates duplicate copies of data to ensure uniqueness.
描述:消除数据的重复副本以确保唯一性。
- Example: Removing repeated rows in a dataset where all columns are identical.
示例:删除数据集中所有列都相同的重复行。
-
Normalization 归一化:
- Description: Converts values to the same scale to make comparisons meaningful.
描述:将值转换为相同的尺度以使比较有意义。
- Example: Scaling all values in a column to a range of 0 to 1.
示例:将列中的所有值缩放到0到1的范围。
-
Sampling 采样:
- Description: Creates a smaller, representative subset of the data for analysis.
描述:创建一个较小的、具有代表性的数据子集进行分析。
- Example: Randomly selecting 10% of rows from a large dataset.
示例:从大数据集中随机选择10%的行。
-
Filtering 过滤:
- Description: Removes outliers or incorrect out-of-range values.
描述:删除异常值或不正确的超出范围的值。
- Example: Excluding rows where age is negative or greater than 120.
示例:排除年龄为负数或大于120的行。
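The sketch below runs the preparation steps above on a made-up dataset using pandas; the column names and values are assumptions for illustration only.
```python
import pandas as pd

df = pd.DataFrame({
    "city": ["New Yrok", "London", "London", "Paris", "Paris"],
    "age":  [34, 28, 28, -5, 41],
    "income": [50_000, 42_000, 42_000, 39_000, 150_000],
})

# Data cleaning: fix a typo.
df["city"] = df["city"].replace({"New Yrok": "New York"})

# De-duplication: drop rows where all columns are identical.
df = df.drop_duplicates()

# Filtering: remove out-of-range values (negative or implausible ages).
df = df[(df["age"] >= 0) & (df["age"] <= 120)]

# Normalization: scale income to the range 0-1.
df["income_norm"] = (df["income"] - df["income"].min()) / (df["income"].max() - df["income"].min())

# Sampling: keep a random 50% subset for quick analysis.
sample = df.sample(frac=0.5, random_state=42)
print(sample)
```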
Data Analysis Modes
Mode 模式 | Description 描述 | Technology 技术 |
---|---|---|
Batch Mode 批处理模式 | Results updated 'infrequently' (after days or months – for example, daily sales of a company). 结果更新不频繁(例如,几天或几个月后更新,如公司的每日销售数据)。 | • Hadoop/MapReduce: framework for distributed data processing 分布式数据处理的框架 • Spark: cluster computing framework with various data analytics components • Pig: high-level language for writing MapReduce programs |
Real-time Mode 实时模式 | Results updated 'frequently' (after a few seconds). 结果频繁更新(通常在几秒钟内)。 | • Spark streaming component: for stream processing 用于流处理 • Storm: for stream processing |
Interactive Mode 交互模式 | Results updated 'on demand' as answers to queries. 结果根据查询需求更新。 | • Hive: data warehousing framework built on HDFS, uses an SQL-like language 基于HDFS的数据仓库框架,使用类SQL语言 • Spark SQL component: SQL-like queries within Spark programs 在Spark程序中支持类SQL查询 |
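Below is a small sketch of the interactive mode using Spark SQL: results are computed on demand in answer to a query. It assumes PySpark is installed; the sales data and column names are made up for illustration.
```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("interactive-demo").getOrCreate()

sales = spark.createDataFrame(
    [("2024-01-01", "London", 120.0), ("2024-01-01", "Paris", 90.0),
     ("2024-01-02", "London", 75.0)],
    ["day", "city", "amount"],
)
sales.createOrReplaceTempView("sales")

# An 'on demand' query, expressed in SQL-like syntax inside the Spark program.
spark.sql("SELECT city, SUM(amount) AS total FROM sales GROUP BY city").show()
spark.stop()
```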
Data Visualizations
-
Static 静态分析:
- Description: The analysis results are stored in a database (e.g., MySQL, DynamoDB, MongoDB) and displayed.
描述:分析结果存储在数据库(例如MySQL、DynamoDB、MongoDB)中并显示。
- Use Case: Suitable for pre-computed reports or dashboards that do not require real-time updates.
用例:适用于不需要实时更新的预计算报告或仪表板。
-
Dynamic 动态分析:
- Description: The analysis results are updated regularly and displayed using live widgets, plots, or gauges.
描述:分析结果定期更新,并使用实时小部件、图表或仪表显示。
- Use Case: Suitable for real-time monitoring systems, such as stock market dashboards or live performance metrics.
用例:适用于实时监控系统,例如股票市场仪表板或实时性能指标。
-
Interactive 交互式分析:
- Description: The analysis results are displayed on demand, based on user input.
描述:分析结果根据用户输入按需显示。
- Use Case: Suitable for exploratory data analysis tools or business intelligence platforms where users need to query data interactively.
用例:适用于探索性数据分析工具或商业智能平台,用户需要交互式查询数据。
Difference Between a Data Access Connector and a Data Processing Engine
1. Data Access Connector
- Definition:
A data access connector is a tool or framework that connects different data sources and target systems so that data can be transferred between them.
- Main functions:
  - Read data from data sources (e.g., relational databases, file systems, message queues).
  - Transfer data to target systems (e.g., data warehouses, data lakes, analytics tools).
  - Support multiple data formats and protocols (e.g., JDBC, ODBC, Kafka, HDFS).
- Characteristics:
  - Focuses on extracting and loading data (the Extract and Load parts of ETL).
  - Usually does not perform complex processing or transformation of the data.
  - A key component of data pipelines, ensuring data flows seamlessly.
- Example tools:
  - Apache Sqoop: transfers data between relational databases and Hadoop.
  - Apache Flume: collects and transfers log data.
  - Kafka Connect: connects data from Kafka to external systems.
2. Data Processing Engine
- Definition:
A data processing engine is a computing framework or tool used to perform complex processing, transformation, and analysis of data.
- Main functions:
  - Perform data cleaning, transformation, aggregation, and computation.
  - Support batch processing and stream processing.
  - Provide distributed computing capabilities for large-scale data.
- Characteristics:
  - Focuses on processing and analysing data (the Transform part of ETL).
  - Usually works together with data storage systems (e.g., HDFS, S3) and data access connectors.
  - Supports advanced features such as machine learning, graph computation, and real-time analytics.
- Example tools:
  - Apache Spark: supports batch processing, stream processing, machine learning, and graph computation.
  - Apache Flink: focuses on stream processing and real-time analytics.
  - Hadoop MapReduce: used for large-scale batch processing.
Comparison Summary
Aspect | Data Access Connector | Data Processing Engine |
---|---|---|
Main function | Data extraction and loading | Data processing, transformation, and analysis |
Focus | Data movement and connectivity | Data computation and complex analysis |
Typical tools | Apache Sqoop, Kafka Connect, Apache Flume | Apache Spark, Apache Flink, Hadoop MapReduce |
Processing capability | Limited; usually no complex computation | Powerful; supports complex computation and real-time analytics |
Relationship to data storage | Connects data sources and target systems | Reads data from storage and processes it |
How They Work Together in Practice
In a real big data ecosystem, data access connectors and data processing engines usually work together:
- The data access connector extracts data from sources and loads it into data storage (e.g., HDFS, a data lake).
- The data processing engine reads data from storage and performs complex processing and analysis.
- The processed results can then be delivered through a data access connector to target systems (e.g., visualization tools, reporting systems).
Quiz
Quiz 1
Which of the following is/are correct about data analytics? Select one or more.
- [x] A. Computing basic statistics is an example of a descriptive analytics task.
- [x] B. One goal of analytics is to answer what has happened based on past data.
- [x] C. Diagnostic analytics are based on past data.
- [ ] D. Linear regression is not an example of a descriptive analytics task.
- [ ] E. Contextualisation is an operation that is not involved in analytics.
Quiz 2
Which of the following is/are correct? More than one correct answer is possible.
- [x] A. Default and streaming are two of the settings in which analytics are applied.
- [ ] B. In none of the data analytics settings data are stored in RAM.
- [x] C. In interactive analysis mode the results are updated “on demand”.
- [ ] D. In interactive analysis mode the results are updated after days or months.
- [ ] E. In distributed setting data are in one machine and multiple processors share the RAM of that machine.
Quiz 3
Which of the followings is/are incorrect about data access connectors? Select all answers that apply.
- [ ] A. Data access connectors are frameworks for collecting big data.
- [x] B. Spark is a data access connector.
- [x] C. Hive is a data access connector.
- [ ] D. Apache Flume is a data access connector.
- [ ] E. ZeroMQ is a data access connector.
Quiz 4
Which of the followings is/are incorrect about data access connectors? Select all answers that apply.
- [ ] A. A broker is a component in a publish-subscribe messaging data access connector.
- [ ] B. A source connector exports data from a relational database.
- [x] C. Apache Sqoop has only sink connectors.
- [ ] D. In a messaging queue, producers and consumers do not need to be aware of each other.
- [x] E. In a messaging queue, publishers send messages to subscribers.
Quiz 5
Which of the followings is/are incorrect about Apache Flume? Select all answers that apply.
- [ ] A. Interceptor, Channel selector, and Sink processor are also components in Apache Flume.
- [x] B. Source can only transfer data to one channel.
- [x] C. Sink is applied before Channel.
- [ ] D. If there are interceptors, the channel selector will be applied after them.
- [x] E. The sink processor can only accept multiple sinks.
Solution
-
Quiz 1
-
Correct Answers:
A, B, and C
-
Explanation:
- A. Correct.
Computing basic statistics (e.g., mean, median, mode) is a key task in descriptive analytics, which focuses on summarizing and interpreting historical data to understand what has happened.
- B. Correct.
One of the primary goals of analytics is to analyze past data to answer questions about what has occurred. This is the foundation of descriptive analytics.
- C. Correct.
Diagnostic analytics involves analyzing past data to identify the reasons behind specific outcomes or events. It relies heavily on historical data.
- D. Incorrect.
Linear regression is an example of descriptive analytics.
- E. Incorrect.
Contextualisation is an important operation in analytics. It involves understanding data within its specific context to derive meaningful insights.
-
-
Quiz 2
-
Correct Answers:
- A. Default and streaming are two of the settings in which analytics are applied.
- C. In interactive analysis mode, the results are updated “on demand.”
-
Explanation:
-
A. Correct. Analytics can be applied in various settings, including default (batch processing) and streaming (real-time processing).
-
B. Incorrect. In some analytics settings, data is stored in RAM for faster processing (e.g., in-memory computing).
-
C. Correct. Interactive analysis mode allows users to request updates to results in real-time or “on demand.”
-
D. Incorrect. Interactive analysis mode does not involve updates after days or months; it focuses on real-time or near-real-time updates.
-
E. Incorrect. In a distributed setting, data is typically spread across multiple machines, not confined to one machine.
-
-
-
Quiz 3
-
Incorrect Answers:
- b. Spark is a data access connector.
- c. Hive is a data access connector.
-
Explanation:
- a. Correct. Data access connectors are frameworks or tools used to collect and integrate data from various sources.
- b. Incorrect. Spark is a data processing engine, not a data access connector.
- c. Incorrect. Hive is a data warehousing tool, not a data access connector.
- d. Correct. Apache Flume is a data access connector used for collecting and moving large amounts of log data.
- e. Correct. ZeroMQ is a messaging library, not a data access connector.
-
-
Quiz 4
-
Incorrect Answers:
- c. Apache Sqoop has only sink connectors.
- e. In a messaging queue, publishers send messages to subscribers.
-
Explanation:
- a. Correct. A broker is a key component in a publish-subscribe messaging system, managing communication between publishers and subscribers.
- b. Correct. A source connector is used to export data from a source (e.g., a relational database) to a target system.
- c. Incorrect. Apache Sqoop has both source and sink connectors, enabling bidirectional data transfer between relational databases and Hadoop.
- d. Correct. In a messaging queue, producers and consumers operate independently and do not need to be aware of each other.
- e. Incorrect. In a messaging queue, publishers send messages to a queue or topic, and subscribers retrieve messages from the queue or topic. Publishers do not send messages directly to subscribers.
-
-
Quiz 5
-
Incorrect Answers:
- b. Source can only transfer data to one channel.
- c. Sink is applied before Channel.
- e. The sink processor can only accept multiple sinks.
-
Explanation:
- a. Correct. Interceptors, channel selectors, and sink processors are all components of Apache Flume.
- b. Incorrect. A source in Apache Flume can transfer data to multiple channels.
- c. Incorrect. In Apache Flume, data flows from the source to the channel and then to the sink. The sink is applied after the channel.
- d. Correct. If interceptors are used, the channel selector is applied after the interceptors process the data.
- e. Incorrect. A sink processor can handle multiple sinks, but it is not limited to only accepting multiple sinks.
-
Week 3
Distributed File System
Concept
-
File Chunks 文件分块:
- Description: Files are divided into smaller chunks that are stored across multiple chunk servers and replicated on different machines or racks to ensure fault tolerance.
描述:文件被分成较小的块,存储在多台块服务器上,并在不同的机器或机架上复制,以确保容错性。
- Use Case: Suitable for distributed file systems where high availability and recovery from hardware failures are critical.
用例:适用于分布式文件系统,其中高可用性和硬件故障恢复至关重要。
-
Master Node 主节点:
- Description: The master node stores metadata about the file chunks and their locations. It is also replicated to prevent a single point of failure.
描述:主节点存储有关文件块及其位置的元数据。它也被复制,以防止单点故障。
- Use Case: Essential for locating file chunks in a distributed file system and ensuring system reliability.
用例:在分布式文件系统中定位文件块并确保系统可靠性时至关重要。
-
Directory Replication 目录复制:
- Description: The file system directory is replicated and knows how to locate its copies, ensuring data availability even in case of failures.
描述:文件系统目录被复制,并知道如何定位其副本,确保即使在故障情况下数据仍然可用。
- Use Case: Critical for maintaining access to files and directories in the event of hardware or software failures.
用例:在硬件或软件故障时,保持对文件和目录的访问至关重要。
-
Fault Tolerance 容错性:
- Description: Replication of file chunks, master nodes, and directories ensures that the system can recover from failures, such as hard disk crashes.
描述:文件块、主节点和目录的复制确保系统可以从故障(如硬盘崩溃)中恢复。
- Use Case: Suitable for systems requiring high reliability and continuous availability, such as cloud storage or large-scale distributed systems.
用例:适用于需要高可靠性和持续可用性的系统,例如云存储或大规模分布式系统。
HDFS
HDFS read path
-
Step 1: The client requests block locations from the Namenode.
步骤 1: 客户端向 Namenode 请求数据块位置。
-
Step 2: Namenode checks if the file exists and verifies read permissions.
步骤 2: Namenode 检查文件是否存在并验证读取权限。
-
Step 3: Namenode returns the block locations.
步骤 3: Namenode 返回数据块的位置。
-
Step 4: The client sorts block locations based on proximity.
步骤 4: 客户端按距离对数据块位置进行排序。
-
Step 5: Datanodes stream data to the client; if a server fails, another replica is used.
步骤 5: Datanodes 向客户端传输数据;如果某个服务器发生故障,客户端可以从另一个副本读取数据。
HDFS Write Path
-
Step 1: The client creates a file request to the Namenode.
步骤 1: 客户端向 Namenode 发送创建文件的请求。
-
Step 2: Namenode checks if the file already exists and verifies write permissions.
步骤 2: Namenode 检查文件是否已存在,并验证写入权限。
-
Step 3: Namenode allocates new blocks.
步骤 3: Namenode 分配新的数据块。
-
Step 4: An output stream is created for data transmission.
步骤 4: 创建用于数据传输的输出流对象。
-
Step 5: The data is split into packets and added to the queue.
步骤 5: 数据被拆分成数据包并添加到队列中。
-
Step 6: The client establishes a connection with Datanodes.
步骤 6: 客户端与 Datanodes 建立连接。
-
Step 7: The first Datanode consumes data from the queue and writes to the next Datanode for replication.
步骤 7: 第一个 Datanode 从队列中读取数据,并写入下一个 Datanode 进行复制。
-
Step 8: Each Datanode sends an acknowledgment to the previous one after storing data.
步骤 8: 每个 Datanode 在存储数据后向前一个 Datanode 发送确认信息。
-
Step 9: Once the client receives an acknowledgment, it closes the file.
步骤 9: 客户端收到确认后,关闭文件写入。
Map Reduce
What is MapReduce?
Definition: MapReduce is a programming model and processing technique developed by Google for large-scale data processing in a distributed computing environment. It allows processing massive datasets in parallel across multiple nodes in a cluster.
定义: MapReduce 是 Google 开发的一种编程模型和处理技术,适用于分布式计算环境中的大规模数据处理。它允许在集群中的多个节点上并行处理海量数据。
MapReduce Architecture
- Map Phase (映射阶段)
- Shuffle & Sort Phase (洗牌和排序阶段)
- Reduce Phase (归约阶段)
- Output Phase (输出阶段)
How Does MapReduce Work?
-
Map Phase 映射阶段
-
Function
- The Map function takes input data and transforms it into intermediate key-value pairs
(k, v)
.
Map 函数 接收输入数据,并将其转换为中间的键值对
(k, v)
。- Each mapper runs independently and in parallel on different parts of the input dataset.
每个 Map 任务 独立且并行 运行,分别处理输入数据的不同部分。
- The goal is to extract meaningful information from raw data.
目标是从原始数据中提取 有意义的信息。
-
Process
- The input data is split into chunks (e.g., lines of text, database records, log files).
输入数据被拆分成 小块(例如文本行、数据库记录、日志文件)。
- Each chunk is processed by a separate Map task.
每个小块由 一个单独的 Map 任务 处理。
- The Map function outputs
(key, value)
pairs.
Map 函数 生成
(键, 值)
对作为输出。 -
Example
Input
hello tom hello jerry hello kitty hello world hello tom
Map Output
(hello, 1) (tom, 1) (hello, 1) (jerry, 1) (hello, 1) (kitty, 1) (hello, 1) (world, 1) (hello, 1) (tom, 1)
📌 Each word is mapped to the number
1
.
-
-
Shuffle & Sort Phase 洗牌和排序阶段
-
Function
- This phase is automatically performed by the system.
该阶段由系统 自动执行。
- It groups all values by key and sorts them before sending them to the Reduce function.
它将所有值按键进行 分组和排序,然后再将数据传递给 Reduce 函数。
- The process ensures that all identical keys (e.g., occurrences of "hello") are grouped together.
该过程确保所有相同的键(例如
"hello"
的出现次数)被聚合在一起。 -
Process
- The system collects and groups all
(key, value)
pairs by key.
系统收集并按照 键(key) 对
(key, value)
进行分组。- The keys are sorted before being sent to reducers.
这些键会在传输给 Reducer 之前 进行排序。
- The grouped and sorted data is then distributed among different reducers.
经过分组和排序后的数据被 分配给不同的 Reducer 进行处理。
-
Example
Input to Reduce Phase
(hello, [1,1,1,1,1]) (tom, [1,1]) (jerry, [1]) (kitty, [1]) (world, [1])
📌 All occurrences of "hello" are now grouped together.
-
-
Reduce Phase 归约阶段
-
Function:
- The Reduce function processes each unique key and aggregates, filters, or transforms the values.
Reduce 函数 处理每个唯一的键,并对值进行 聚合、过滤或转换。
- The final computation combines all mapped values for each key.
最终计算 合并所有映射的值 以得到最终结果。
-
Process
- The Reducer takes the grouped values from the Shuffle & Sort phase.
Reducer 接收 Shuffle & Sort 阶段分组后的值。
- It applies an aggregation function (e.g., sum, average, count).
它应用 聚合函数(例如求和、求均值、计数)。
- The final reduced output is stored and written to HDFS.
计算后的最终输出 被存储并写入 HDFS。
-
Example
Input
(hello, [1,1,1,1,1]) (tom, [1,1]) (jerry, [1]) (kitty, [1]) (world, [1])
Reduce Output
(hello, 5) (tom, 2) (jerry, 1) (kitty, 1) (world, 1)
📌 The word count for each word is now calculated!
-
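The sketch below models the word-count flow above in plain Python: a map step emits (word, 1) pairs, a shuffle/sort step groups them by key, and a reduce step sums each group. It mimics the data flow locally; it is not the Hadoop MapReduce API, and the input lines are the ones from the example.
```python
from collections import defaultdict

def map_phase(line):
    # Mapper: emit an intermediate (word, 1) pair for every word.
    return [(word, 1) for word in line.split()]

def shuffle_sort(pairs):
    # Group all values by key and sort the keys before reducing.
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return dict(sorted(grouped.items()))     # e.g. {"hello": [1, 1, 1, 1, 1], ...}

def reduce_phase(grouped):
    # Reducer: aggregate the grouped values for each key.
    return {key: sum(values) for key, values in grouped.items()}

lines = ["hello tom hello jerry", "hello kitty hello world hello tom"]
intermediate = [pair for line in lines for pair in map_phase(line)]
print(reduce_phase(shuffle_sort(intermediate)))   # {'hello': 5, 'jerry': 1, ...}
```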
MapReduce workflow
-
Splitting & Task Assignment (数据分片与任务分配)
-
The input data is split into chunks, and one instance is elected as the Master, which assigns Map and Reduce tasks to worker nodes.
输入数据被分成多个数据块,并选出一个实例作为主节点(Master),负责将 Map 和 Reduce 任务分配给工作节点(Worker)。
-
-
Map Phase (映射阶段)
-
Each worker processes its assigned chunk and converts the raw data into key-value pairs (k, v).
每个工作节点处理其分配的数据块,并将原始数据转换为键值对 (k, v)。
-
The key-value pairs are buffered in memory and then written to local disk.
键值对被缓存在内存中,随后写入本地磁盘。
-
-
Shuffle & Sort Phase (洗牌和排序阶段)
-
The system groups and sorts key-value pairs by key and distributes them to Reducers.
系统按照键(key)对键值对进行分组和排序,并将其分配给 Reduce 任务。
-
-
Reduce Phase (归约阶段)
-
The Reducer processes grouped key-value pairs, aggregates, filters, or transforms them into final results.
Reducer 处理分组后的键值对,并对数据进行聚合、过滤或转换,生成最终结果。
-
-
Output Phase (输出阶段)
-
The final results are written back to HDFS or another storage system for further use.
最终结果被写回 HDFS 或其他存储系统,以供进一步使用。
-
Refinement: Combiner
-
A Map task often produces multiple key-value pairs for the same key, e.g., counting popular words in a word count problem.
一个 Map 任务通常会为相同的键生成多个键值对,例如在词频统计问题中。
-
Using a Combiner helps in pre-aggregating values in the Mapper, reducing network overhead.
使用 Combiner 可以在 Mapper 阶段进行预聚合,从而减少网络开销。
-
A Combiner is a mini-reducer that combines intermediate values before shuffling the data.
Combiner 是一个小型 Reducer,在数据传输前对中间值进行合并。
-
Combiner is usually the same as the Reduce function but is applied locally on a single mapper’s output.
Combiner 通常与 Reduce 函数相同,但只作用于单个 Mapper 的输出。
-
Can only be applied to functions that are commutative and associative:
仅适用于满足交换律和结合律的函数:
- Commutative: f(x, y) = f(y, x)
交换律:f(x, y) = f(y, x)
- Associative: f(f(x, y), z) = f(x, f(y, z))
结合律:f(f(x, y), z) = f(x, f(y, z))
-
The input and output of the combiner must be of the same type as that of the mapper.
Combiner 的输入和输出数据类型必须与 Mapper 相同。
Word Count Example with Combiner
-
Each Mapper generates key-value pairs like (word, 1).
每个 Mapper 生成 (word, 1) 形式的键值对。
-
The Combiner locally sums up word counts before shuffling.
Combiner 在 Shuffle 之前本地计算单词频次总和。
-
Less data is transferred between Mapper and Reducer, reducing network congestion.
减少 Mapper 与 Reducer 之间的数据传输,降低网络拥塞。
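The sketch below extends the plain-Python word-count model with a combiner: each mapper's output is pre-aggregated locally (sum per word) before the shuffle, so fewer pairs are transferred. It is illustrative only, not the Hadoop Combiner API.
```python
from collections import Counter

def map_with_combiner(line):
    # Mapper emits (word, 1) pairs; the combiner sums them locally per mapper.
    return Counter(line.split())             # e.g. {"hello": 2, "tom": 1}

mapper_outputs = [map_with_combiner(l) for l in
                  ["hello tom hello jerry", "hello kitty hello world hello tom"]]

# Shuffle + reduce: merge the already-combined partial counts. Summation is
# commutative and associative, which is why a combiner is allowed here.
total = Counter()
for partial in mapper_outputs:
    total.update(partial)
print(dict(total))                           # {'hello': 5, 'tom': 2, ...}
```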
Failures in MapReduce
1. Map Worker Failure (Map 工作节点失败)
-
Completed Map tasks are lost.
已完成的 Map 任务丢失。
-
In-progress Map tasks fail.
正在进行的 Map 任务失败。
-
Actions (处理措施):
-
Map tasks are reset to idle and reassigned to another node.
Map 任务被重置为空闲状态,并重新分配到其他节点执行。
-
Reducers are notified when tasks are rescheduled.
Reducer 任务在 Map 任务被重新调度时收到通知。
-
2. Reduce Worker Failure (Reduce 工作节点失败)
-
Completed Reduce tasks remain unaffected.
已完成的 Reduce 任务不会受到影响。
-
In-progress Reduce tasks fail.
正在进行的 Reduce 任务失败。
-
Actions (处理措施):
-
In-progress Reduce tasks are reset to idle.
失败的 Reduce 任务被重置为空闲状态。
-
Reduce tasks are rescheduled to start later.
Reduce 任务将在稍后重新调度执行。
-
3. Master Failure (主节点失败)
-
Map and Reduce tasks are aborted.
所有 Map 和 Reduce 任务被中止。
-
Actions (处理措施):
-
The entire MapReduce job needs to be restarted.
整个 MapReduce 作业需要重新启动。
-
Quiz
Quiz 1
Which of the following is/are correct about a distributed file system? More than one correct answer is possible, so multiple selections may be marked.
- [ ] A. It should not be used when we need to perform data append operations
- [x] B. One of its components is a metadata file.
- [x] C. It provides replication.
- [x] D. It is appropriate to use for storing large datasets.
- [ ] E. It is appropriate to use for storing datasets that change frequently.
Quiz 2
Which of the following is/are incorrect about MapReduce? More than one correct answer is possible, so multiple selections may be marked.
- [x] A. A Reduce worker is not notified when a map task is rescheduled in case of a map worker failure.
- [x] B. There are two types of failures in MapReduce.
- [ ] C. A MapReduce map task that has been completed is reset to idle in case of a map worker failure.
- [x] D. A MapReduce task needs to be restarted in case of a map worker failure.
- [ ] E. A MapReduce task needs to be restarted in case of a master failure.
Solution
-
Quiz 1
- Correct Answers:
B, C, and D
-
Explanation:
-
A. It should not be used when we need to perform data append operations.
Incorrect. Distributed file systems (e.g., HDFS) are designed to handle data append operations efficiently. -
B. One of its components is a metadata file.
Correct. Distributed file systems maintain metadata to manage file locations, permissions, and other attributes. -
C. It provides replication.
Correct. Distributed file systems typically replicate data across multiple nodes to ensure fault tolerance and high availability. -
D. It is appropriate to use for storing large datasets.
Correct. Distributed file systems are designed to handle large-scale data storage and processing. -
E. It is appropriate to use for storing datasets that change frequently.
Incorrect. Distributed file systems are optimized for large-scale, batch-oriented workloads rather than frequent, small updates. Systems like HDFS are better suited for write-once, read-many scenarios.
-
- Correct Answers:
-
Quiz 2
- Correct Answers:
A, B, and D
Week 5
Characteristics of NoSQL (Not Only SQL) Databases NoSQL 数据库的特点
NoSQL vs. Relational Databases NoSQL 与关系型数据库的区别
🔹 NoSQL databases are non-relational, meaning they do not use structured tables like traditional SQL databases.
NoSQL 数据库是非关系型的,不使用传统 SQL 数据库的表结构。
🔹 Designed for handling large-scale, distributed, and semi-structured or unstructured data.
适用于大规模、分布式、半结构化或非结构化数据处理。
🔹 No fixed schema, unlike relational databases that require predefined tables and relationships.
没有固定的模式 (Schema),相比于关系型数据库,无需预定义表结构。
🔹 Optimized for high read/write performance and scalability.
优化以提高读写性能和扩展能力。
Key Characteristics of NoSQL Databases NoSQL 数据库的核心特点
🔹 1. Non-Relational Data Model
-
NoSQL databases do not use tables, rows, and columns.
NoSQL 数据库不使用传统表、行、列的结构。 -
Data is often stored as:
数据存储方式包括:- Key-Value pairs (e.g., Redis)
键值对存储(如 Redis) - Documents (e.g., MongoDB)
文档存储(如 MongoDB) - Wide-column stores (e.g., Cassandra)
宽列存储(如 Cassandra) - Graph databases (e.g., Neo4j)
图数据库(如 Neo4j)
- Key-Value pairs (e.g., Redis)
🔹 2. Highly Scalable & Fault-Tolerant
-
Horizontal scalability: Can easily add more servers/nodes.
水平扩展:可轻松添加更多服务器或节点。 -
Distributed architecture: Ensures availability and resilience.
分布式架构:保证高可用性和容错性。 -
Replication & Sharding: Data is replicated across multiple nodes for fault tolerance.
复制与分片:数据可在多个节点之间复制,提高可靠性。
🔹 3. Schema-Free or Flexible Schema
-
NoSQL databases do not require predefined schemas.
无需预定义模式 (Schema)。 -
Supports dynamic and flexible data structures, making it ideal for unstructured data.
支持动态和灵活的数据结构,适用于非结构化数据。 -
New fields can be added without altering the existing schema.
可在不修改已有模式的情况下添加新字段。
🔹 4. Asynchronous Writes & Eventual Consistency
-
Eventual consistency instead of strict ACID transactions (as in SQL).
采用最终一致性,而非严格的 ACID 事务。 -
Optimized for high-speed reads and writes with minimal locking.
优化高性能读写,减少锁机制。 -
Best suited for big data analytics, caching, and high-speed data retrieval.
适用于大数据分析、缓存和高速数据读取。
🔹 5. API-Based Access & Custom Query Languages
-
NoSQL databases use APIs and custom query languages instead of SQL.
使用 API 和特定查询语言,而非 SQL 语句。 -
Examples:
示例:- MongoDB → Query in JSON-like BSON format.
MongoDB 使用 BSON(类 JSON)格式查询。 - Cassandra → CQL (Cassandra Query Language).
Cassandra 使用 CQL(Cassandra 查询语言)。 - Neo4j → Cypher Query Language for graph queries.
Neo4j 使用 Cypher 查询语言进行图查询。
- MongoDB → Query in JSON-like BSON format.
Types of NoSQL Databases & Use Cases NoSQL 数据库类型与应用场景
NoSQL Type | Description | Use Cases |
---|---|---|
Key-Value Stores | Simple key-value pairs for fast lookups. | Caching, session management (e.g., Redis, DynamoDB). |
Document Stores | JSON or BSON documents for flexible schema. | Content management, real-time analytics (e.g., MongoDB, CouchDB). |
Wide-Column Stores | Tables with dynamic column families. | Big data storage, IoT (e.g., Cassandra, HBase). |
Graph Databases | Nodes & edges for complex relationships. | Social networks, recommendation engines (e.g., Neo4j, ArangoDB). |
Advantages & Disadvantages of NoSQL Databases
NoSQL 数据库的优缺点
🔹 Advantages
✅ Highly Scalable: Can handle massive amounts of data across distributed systems.
✅ Flexible Schema: Ideal for applications with evolving data models.
✅ Fast Reads & Writes: Optimized for high-performance operations.
✅ Fault-Tolerant: Replication & sharding improve resilience.
✅ Big Data Ready: Well-suited for real-time analytics & AI/ML applications.
🔹 Disadvantages
❌ Lack of ACID Compliance: Not suitable for transactional consistency.
❌ Complex Querying: Requires learning specific APIs & query languages.
❌ Limited Joins & Relationships: Unlike SQL, joins are inefficient in most NoSQL databases.
❌ Data Duplication: Often needed to improve read performance, increasing storage costs.
What Led to the Rise of NoSQL Databases? NoSQL 数据库兴起的原因
The Shift from SQL to NoSQL 从 SQL 到 NoSQL 的转变
🔹 Traditional SQL databases (RDBMS) struggled with new-age demands:
传统 SQL 数据库 (RDBMS) 难以满足现代应用需求:
- Massive, unstructured data became common.
海量的非结构化数据逐渐增多。 - High-traffic applications required global accessibility.
高并发应用需要全球 24/7 可用性。 - Agile development needed rapid scalability.
敏捷开发需要快速扩展能力。 - Cloud computing and SaaS models gained popularity.
云计算和 SaaS(软件即服务)模式流行。
Key Drivers Behind NoSQL Adoption NoSQL 发展的主要驱动因素
🔹 1. Data Became Bigger & More Unstructured
-
Modern applications generate massive amounts of data (social media, IoT, real-time analytics).
现代应用(社交媒体、物联网、实时分析)产生海量数据。 -
Traditional RDBMS struggled to scale for unstructured or semi-structured data (JSON, XML, multimedia).
传统 SQL 数据库难以扩展以存储非结构化或半结构化数据(如 JSON、XML、图片、视频)。 -
NoSQL databases support schema-less or flexible schemas, making them ideal for big data.
NoSQL 数据库支持无模式 (Schema-less) 或灵活模式 (Flexible Schema),适用于大数据存储。
🔹 2. Demand for Global, Always-Available Services
-
Traditional databases were not designed for 24/7 uptime across multiple locations.
传统数据库难以实现全球 24/7 高可用性。 -
Global-scale applications (e.g., Facebook, Google, Amazon) needed distributed systems.
全球级应用(如 Facebook、Google、Amazon)需要分布式数据库。 -
NoSQL databases support horizontal scaling (adding more machines instead of upgrading one server).
NoSQL 采用水平扩展(增加服务器节点,而不是升级单个服务器)。 -
Features like data replication & sharding ensure fault tolerance.
通过数据复制 (Replication) 和分片 (Sharding) 提高容错能力。
🔹 3. Rapid Application Development & Agile Needs
-
Companies wanted fast development cycles without strict schemas.
公司希望快速开发,不受严格数据库模式 (Schema) 限制。 -
NoSQL databases allow developers to quickly modify and add new fields.
NoSQL 允许开发人员快速修改数据结构或添加新字段。 -
Schema evolution without downtime helps with continuous development.
模式变更无需停机,支持持续开发和迭代。
🔹 4. Rise of SaaS & Cloud-Based Solutions
-
Businesses shifted to Software-as-a-Service (SaaS) and cloud applications.
企业转向 SaaS(软件即服务)和云端应用。 -
Traditional SQL databases struggled with multi-tenant cloud environments.
传统 SQL 数据库难以满足多租户 (Multi-Tenant) 云环境需求。 -
NoSQL databases are natively built for distributed cloud computing.
NoSQL 数据库天生适用于分布式云计算架构。 -
Cloud providers (AWS, Google Cloud, Azure) offer fully managed NoSQL services (e.g., DynamoDB, CosmosDB).
云服务商(AWS、Google Cloud、Azure)提供托管 NoSQL 服务(如 DynamoDB、CosmosDB)。
Key Differences: SQL vs. NoSQL SQL vs. NoSQL 的主要区别
Factor | SQL Databases | NoSQL Databases |
---|---|---|
Scalability | Vertical (scale-up) | Horizontal (scale-out) |
Schema | Fixed schema | Schema-less (flexible) |
Transactions | Strong ACID compliance | Eventual consistency (BASE model) |
Query Language | SQL | API-based (JSON, key-value, CQL, Cypher) |
Use Cases | Traditional business applications (ERP, CRM, banking) | Big data, real-time analytics, distributed apps |
When to Use NoSQL vs. SQL ? 何时使用 NoSQL 而非 SQL?
🔹 Use NoSQL When:
✅ Data is semi-structured or unstructured (e.g., JSON, logs, multimedia).
✅ Scalability is critical (e.g., high-traffic apps, global users).
✅ Agility is required (e.g., frequent schema changes).
✅ Cloud-native applications need flexible and fault-tolerant storage.
🔹 Use SQL When:
✅ Strong ACID transactions are required (e.g., banking, financial apps).
✅ Data integrity is a priority (e.g., inventory management).
✅ Complex joins and structured queries are needed.
NoSQL Databases & ACID Relaxation NoSQL 数据库对 ACID 特性的放松
Why NoSQL Relaxes ACID Properties? 为什么 NoSQL 允许放松 ACID 规则?
🔹 NoSQL databases prioritize scalability and availability over strict consistency.
NoSQL 数据库优先考虑可扩展性和可用性,而非严格的一致性。
🔹 Based on CAP Theorem:
基于 CAP 定理:
- Consistency (一致性): All clients see the latest update after a transaction.
所有客户都能在交易后看到最新更新。 - Availability (可用性): Clients can read/write even if some nodes fail.
即使某些节点出现故障,客户端也能进行读/写操作。 - Partition Tolerance (分区容忍性): System remains operational despite network failures.
尽管网络出现故障,系统仍可正常运行。
🔹 Distributed NoSQL databases must choose between consistency and availability when network partitions occur.
在分布式 NoSQL 数据库中,发生网络分区时需在一致性与可用性之间做出权衡。
CAP Trade-offs in NoSQL NoSQL 数据库的 CAP 取舍
Type | Trade-off | Example Databases |
---|---|---|
CP (Consistency + Partition Tolerance) | Sacrifices Availability | HBase, MongoDB (strong consistency mode) |
AP (Availability + Partition Tolerance) | Sacrifices Consistency | DynamoDB, Cassandra, Riak |
CA (Consistency + Availability) | Cannot tolerate partitions | Traditional SQL databases (e.g., MySQL, PostgreSQL) |
Summary 总结
🔹 NoSQL databases relax ACID properties to achieve high availability and scalability.
NoSQL 放松 ACID 规则,以提高可用性和可扩展性。
🔹 They follow CAP theorem principles and optimize for specific needs (CP, AP, or CA).
遵循 CAP 定理,并根据需求优化 CP、AP 或 CA 组合。
🔹 Useful for distributed systems, big data, and real-time applications.
适用于分布式系统、大数据和实时应用。
NoSQL Databases: Characteristics & Advantages NoSQL 数据库:特点与优势
Core NoSQL Features NoSQL 的核心特性
🔹 Volume (数据量)
- NoSQL databases scale out by adding more nodes instead of upgrading a single server.
NoSQL 通过增加节点进行横向扩展,而不是升级单一服务器。
🔹 Velocity (数据处理速度)
- Fast writes with schema-on-read (schema applied when data is retrieved).
采用 Schema-on-Read,即数据存储时无需模式,读取时动态解析。 - Low write latency (adding nodes reduces latency).
写入延迟低,添加节点可进一步减少延迟。
🔹 Variety (数据类型多样性)
- Supports semi-structured & unstructured data, making schema flexible or optional.
支持半结构化和非结构化数据,模式 (Schema) 灵活或可选。
NoSQL vs. RDBMS Scaling & Management NoSQL 与 RDBMS 的扩展性与管理性对比
Factor | RDBMS (SQL Databases) | NoSQL Databases |
---|---|---|
Scaling Model | Vertical (scale-up) | Horizontal (scale-out) |
Handling Big Data | Limited by hardware constraints | Designed for massive data workloads |
DBA Management | Requires specialized database administrators | Simpler management, automated scaling & repairs |
Schema Flexibility | Requires strict schema changes | No rigid schema; supports dynamic data structures |
Cost Efficiency | Expensive proprietary servers | Uses commodity hardware (lower cost per GB or transaction) |
Expertise Availability | More SQL experts available | Fewer NoSQL experts in the market |
Analytics & BI | Optimized for structured queries | More suited for Web 2.0, real-time applications |
NoSQL Data Models (NoSQL 数据存储模型)
Overview of NoSQL Databases (NoSQL 数据库概述)
🔹 NoSQL databases are designed for scalability, flexibility, and high-performance data storage.
NoSQL 数据库提供横向扩展、高灵活性和高性能的数据存储方案。
🔹 Unlike traditional relational databases (RDBMS), NoSQL databases:
不同于传统关系型数据库 (RDBMS),NoSQL 具有以下特点:
- Schema flexibility: No strict table structures.
模式 (Schema) 灵活,无需固定表结构。 - Horizontal scaling: Easily distributed across multiple servers.
支持横向扩展,可分布式存储。 - Optimized for specific use cases: High-performance querying and storage.
针对特定场景优化,提供高效查询与存储。
Types of NoSQL Databases (NoSQL 数据库类型)
Database Type | Description | Best For | Examples |
---|---|---|---|
Key-Value Stores | Simple key-value pairs | Caching, session management | Redis, DynamoDB |
Document Stores | Stores JSON-like documents | Content management, logs | MongoDB, CouchDB |
Wide-Column Stores | Stores data in flexible columns | Big data, IoT, real-time analytics | Cassandra, HBase |
Graph Databases | Stores nodes & relationships | Social networks, fraud detection | Neo4j, ArangoDB |
NoSQL Database Models (NoSQL 数据存储模型)
Key-Value Stores (键值存储)
✔ Best for: High-speed lookups where each key uniquely identifies a value.
✔ Examples: Redis, DynamoDB
Characteristics
- Key-value pairs where each key is unique.
每个键 (Key) 唯一,映射到一个值 (Value)。
- Opaque values: The database does not interpret stored values.
数据库不会解析值,仅存储和检索。
- Uses hashing for data partitioning.
采用哈希算法进行数据分片。
Pros & Cons
✅ Very fast reads/writes.
✅ Simple and scalable.
❌ Not suitable for complex queries or relationships.
Example
Key | Value |
---|---|
111 | "John Smith" |
324 | (Binary image data) |
567 | (XML document) |
Document Databases (文档数据库)
✔ Best for: Storing semi-structured data with a flexible schema.
✔ Examples: MongoDB, CouchDB
Characteristics
- Stores JSON/XML documents with a unique key.
以 JSON 或 XML 格式存储半结构化数据。
- Supports indexing, querying specific fields, and partial updates.
支持索引、字段查询和部分更新。
- Schema flexibility: Documents can have different structures.
模式灵活,每个文档结构可以不同。
Example Document (JSON Format)
{
"name": "Joe Smith",
"title": "Mr",
"address": {
"address1": "Dept. of Informatics",
"address2": "Strand",
"postcode": "WC2 1ER"
},
"expertise": ["MongoDB", "Python", "Javascript"],
"employee_number": 320,
"location": [53.34, -6.26]
}
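To make field-level access concrete, here is a minimal, illustrative sketch (not from the lecture) that queries and partially updates documents like the one above using pymongo; the connection URI and the `demo`/`people` database and collection names are assumptions.

```python
from pymongo import MongoClient

people = MongoClient("mongodb://localhost:27017")["demo"]["people"]

# Query a nested field
print(people.find_one({"address.postcode": "WC2 1ER"}))

# Query an array field (matches documents whose expertise list contains "MongoDB")
print(people.find_one({"expertise": "MongoDB"}))

# Partial update of a single field
people.update_one({"employee_number": 320}, {"$set": {"title": "Dr"}})
```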
Pros & Cons
✅ Schema flexibility, supports dynamic updates.
✅ Easy field-based queries.
❌ Not suitable for binary data storage.
❌ Complex joins are inefficient.
Comparison: Document Databases vs. Key-Value Stores
Feature | Document Databases | Key-Value Stores |
---|---|---|
Data Structure | Nested JSON/XML | Simple key-value pairs |
Query Support | Field-based queries | Lookup by key only |
Schema Flexibility | Dynamic schema | Opaque value storage |
Indexing | Supports indexing for fast retrieval | No native indexing |
Wide-Column Stores (列族数据库)
✔ Best for: Big data applications and real-time analytics.
✔ Examples: Apache Cassandra, HBase
Characteristics
- Stores data in columns instead of rows.
数据按列存储,而非按行存储。
- Groups related columns into column families for efficient retrieval.
相关列被组织在一起,提高查询性能。
- Schema flexibility: Different rows can have different sets of columns.
模式灵活,不同行可以有不同列。
Pros & Cons
✅ Optimized for large-scale data retrieval.
✅ Handles massive datasets efficiently.
❌ Not suitable for SQL-like complex joins.
❌ Not ACID-compliant for strict transactions.
Comparison: RDBMS vs. Column-Family Databases
Feature | RDBMS (SQL 关系型数据库) | Column-Family Databases (列族数据库) |
---|---|---|
Storage Model | Rows & Tables (按行存储) | Columns grouped into families (按列存储) |
Schema | Fixed schema | Flexible schema |
Performance | Optimized for row retrieval | Optimized for column-based queries |
Use Case | Transactional systems | Big data & analytics |
Graph Databases (图数据库)
✔ Best for: Highly connected data such as social networks.
✔ Examples: Neo4j, ArangoDB
Characteristics
- Graph-based storage with nodes (entities) and edges (relationships).
基于图结构存储数据,包括节点(实体)和边(关系)。
- Optimized for complex relationship queries.
优化关系查询性能。
- Nodes and edges can store attributes.
节点和边可以存储属性。
Example Use Case: Social Network
(Alice) --[FRIENDS_WITH]--> (Bob)
(Bob) --[WORKS_WITH]--> (Charlie)
Pros & Cons
✅ Efficient for relationship-heavy queries.
✅ Best for real-world networks and recommendations.
❌ Not suitable for ACID transactions.
❌ Complex setup compared to other NoSQL models.
Comparison: Graph Databases vs. Other NoSQL Models
Feature | Graph Databases (图数据库) | Document/Column Stores (文档/列族数据库) |
---|---|---|
Data Structure | Nodes & Edges | JSON, Wide Columns |
Best For | Relationship-based queries | Key-based lookups |
Performance | Optimized for network traversal | Optimized for read/write speed |
Summary & Key Takeaways (总结与关键点)
- Key-Value Stores: Best for fast lookups and caching.
适用于快速查找和缓存。
- Document Databases: Best for semi-structured data and flexible queries.
适用于半结构化数据,支持字段级查询。
- Wide-Column Stores: Best for big data and analytics.
适用于大数据和实时分析。
- Graph Databases: Best for highly interconnected data.
适用于复杂关系数据,如社交网络和推荐系统。
📌 NoSQL databases scale horizontally, provide schema flexibility, and are ideal for big data and distributed applications.
NoSQL 数据库支持横向扩展,模式灵活,适用于大数据和分布式应用。
MongoDB
Comparison Between RDBMS and MongoDB关系型数据库 (RDBMS) 与 MongoDB 的对比
RDBMS | MongoDB |
---|---|
Database | Database |
Table, View | Collection |
Row | Document (BSON) |
Column | Field |
Index | Index |
Join | Embedded Document |
Foreign Key | Reference |
Partition | Shard |
MongoDB 使用 文档模型 (Document Model) 代替传统的表结构,并采用 BSON (Binary JSON) 格式存储数据,使其更具灵活性。
MongoDB replaces traditional table structures with a document model and stores data in BSON (Binary JSON) format, making it more flexible.
MongoDB 基本查询 MongoDB Basic Queries
示例数据 (Example Data: `products` Collection)
{ "_id": "apples", "qty": 5 }
{ "_id": "bananas", "qty": 7 }
{ "_id": "oranges", "qty": { "in stock": 8, "ordered": 12 } }
{ "_id": "avocados", "qty": "fourteen" }
查询 `qty` 大于 4 的文档 (Find documents where `qty` is greater than 4)
db.products.find({ qty: { $gt: 4 } })
返回结果 (Returned Documents):
{ "_id": "apples", "qty": 5 }
{ "_id": "bananas", "qty": 7 }
MongoDB 支持 JSON 格式的文档存储,查询时可以使用类似 SQL 的操作符(`$gt`、`$lt`、`$or` 等)。
MongoDB supports JSON-like document storage, and queries use SQL-like operators (`$gt`, `$lt`, `$or`, etc.).
MongoDB CRUD 操作 (MongoDB CRUD Operations)
插入数据 (Insert Data)
db.products.insert({ _id: 10, item: "box", qty: 20 })
如果 `_id` 未指定,则 MongoDB 自动生成唯一 ID。
If `_id` is not specified, MongoDB automatically generates a unique ID.
批量插入 (Insert Many Documents)
db.inventory.insertMany([
{ item: "journal", qty: 25, tags: ["blank", "red"], size: { h: 14, w: 21, uom: "cm" } },
{ item: "mat", qty: 85, tags: ["gray"], size: { h: 27.9, w: 35.5, uom: "cm" } },
{ item: "mousepad", qty: 25, tags: ["gel", "blue"], size: { h: 19, w: 22.85, uom: "cm" } }
])
更新数据 (Update Data)
db.books.update(
{ _id: 1 },
{
$inc: { stock: 5 },
$set: {
item: "ABC123",
"info.publisher": "2222",
tags: ["software"],
"ratings.1": { by: "xyz", rating: 3 }
}
}
)
- `$inc`: increments the `stock` field by 5. 递增 `stock` 字段值 (+5)。
- `$set`: sets the values of several fields at once. 更新多个字段值。
删除数据 (Remove Data)
db.products.remove({ item: "box" }) // 删除所有 item="box" 的文档
db.products.remove({ qty: { $gt: 20 } }) // 删除所有 qty>20 的文档
db.products.remove({}) // 删除集合中的所有文档
MapReduce 功能
MapReduce 用于对大规模数据集执行聚合操作。
MapReduce is used for performing aggregation operations on large datasets.
示例:计算每个 `cust_id` 的订单总金额 (Example: compute the total order amount for each `cust_id`)
db.orders.mapReduce(
function() { emit(this.cust_id, this.amount); }, // Map function
function(key, values) { return Array.sum(values); }, // Reduce function
{
query: { status: "A" }, // 查询条件: 只处理 status="A" 的订单
out: "order_totals" // 结果输出到集合 "order_totals"
}
)
执行 MapReduce 后的结果 (Results After MapReduce Execution):
{ "_id": "A123", "value": 750 }
{ "_id": "B212", "value": 200 }
MongoDB 索引 (Indexing)
MongoDB supports B+ tree indices, geospatial indices, and text indices.
An index on `_id` is created automatically by the system.
创建索引 (Create Index)
db.products.createIndex({ "item": 1, "stock": 1 })
查看索引 (View Indexes)
db.collectionA.getIndexes()
删除索引 (Drop Index)
db.collectionA.dropIndex("catIdx")
地理位置索引 (GeoSpatial Index)
db.collectionA.createIndex({ loc: "2dsphere" })
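Once a `2dsphere` index exists, geospatial operators such as `$near` can use it. A minimal pymongo sketch (the collection name follows the example above; the coordinates and the 1 km radius are illustrative assumptions):

```python
from pymongo import MongoClient

coll = MongoClient("mongodb://localhost:27017")["test"]["collectionA"]
coll.create_index([("loc", "2dsphere")])  # same index as above, created via pymongo

# Find documents whose 'loc' lies within 1 km of a GeoJSON point
nearby = coll.find({
    "loc": {
        "$near": {
            "$geometry": {"type": "Point", "coordinates": [-6.26, 53.34]},
            "$maxDistance": 1000  # metres
        }
    }
})
print(list(nearby))
```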
MongoDB 复制集 (Replication)
复制集(Replica Set)是一组 `mongod` 实例,提供高可用性 (High Availability) 和故障恢复 (Fault Tolerance)。
A Replica Set is a group of `mongod` instances that provides high availability and fault tolerance.
复制集角色 (Replica Set Roles):
-
Primary: Handles all write operations. 处理所有写入操作。
-
Secondary: Replicates data from the Primary; if the Primary fails, a new Primary is chosen by election. 复制 Primary 数据,并在 Primary 失效时通过选举产生新的 Primary。
-
Arbiter: Votes in elections only and stores no data. 仅用于投票,不存储数据。
Automatic failover: if the Primary cannot communicate with the other members for more than 10 seconds, one of the Secondaries becomes the new Primary after an election. 自动故障切换:当 Primary 与其他成员失联超过 10 秒时,经选举后由一个 Secondary 成为新的 Primary。
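A minimal sketch of how an application connects to such a replica set from Python with pymongo; the host names and the replica-set name `rs0` are assumptions.

```python
from pymongo import MongoClient, ReadPreference

# The driver discovers the current Primary from the seed list and follows failover automatically
client = MongoClient("mongodb://host1:27017,host2:27017,host3:27017/?replicaSet=rs0")

db = client["shop"]
db.orders.insert_one({"item": "box", "qty": 1})   # writes always go to the Primary

# Reads may optionally be served by Secondaries
ro_db = client.get_database("shop", read_preference=ReadPreference.SECONDARY_PREFERRED)
print(ro_db.orders.count_documents({}))
```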
MongoDB 分片 (Sharding)
1. What is Sharding? 什么是 Sharding(分片)?
- Sharding is MongoDB's horizontal scaling mechanism, distributing large datasets across multiple servers.
Sharding 是 MongoDB 的水平扩展机制,可将大规模数据集分布到多个服务器上。
- It improves query performance, increases storage capacity, and ensures high availability.
它可以提高查询性能,增加存储容量,并确保高可用性。
- Sharding divides data into partitions (shards) to reduce the load on a single server.
Sharding 通过将数据拆分为多个分区(Shards)来减少单个服务器的负载。
2. Why Do We Need Sharding? 为什么需要 Sharding?
- Single-server capacity is limited.
单台服务器的存储容量有限。
- High read and write loads can reduce performance.
高并发读写负载可能会降低性能。
- Sharding enables horizontal scaling (scale-out) instead of upgrading a single server (scale-up).
Sharding 允许通过添加更多服务器进行横向扩展(Scale-Out),而不是升级单台服务器(Scale-Up)。
- Even if a shard fails, data can still be accessed from other shards.
即使某个 Shard 失效,数据仍可从其他 Shard 访问,提高可用性。
3. MongoDB Sharding Architecture MongoDB Sharding 体系结构
Component | Function |
---|---|
Shard | Stores a subset of the dataset. |
Config Server | Maintains metadata and Shard distribution information. |
Mongos Router | Routes client queries to the appropriate Shard. |
组件 | 作用 |
---|---|
Shard(分片) | 存储数据,每个 Shard 只存储部分数据。 |
Config Server(配置服务器) | 维护 Shard 元数据(存储数据分布信息)。 |
Mongos Router(查询路由) | 充当应用程序和 Shard 之间的查询路由器。 |
4. Key Concepts in Sharding Sharding 关键概念
- Shard: A single database instance storing part of the dataset.
Shard:一个数据库实例,存储部分数据。
- Shard Key: A field that determines how data is partitioned.
Shard Key(分片键):决定数据如何在 Shards 之间分布的字段。
- Chunk: A data segment within a Shard (usually 64MB).
Chunk(数据块):Shard 内部的数据分块,通常大小为 64MB。
- Balancer: Automatically redistributes data between Shards.
Balancer(均衡器):自动在 Shards 之间重新分配数据,防止负载不均衡。
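A hedged sketch of turning on sharding from Python through MongoDB's admin commands, issued against a `mongos` router; the database, collection, and shard-key names are assumptions.

```python
from pymongo import MongoClient

mongos = MongoClient("mongodb://mongos-host:27017")   # connect to the query router, not a shard
admin = mongos.admin

admin.command("enableSharding", "shop")               # allow the 'shop' database to be sharded
admin.command("shardCollection", "shop.orders",       # shard the collection ...
              key={"cust_id": "hashed"})              # ... on a hashed shard key

print(admin.command("balancerStatus"))                # the balancer redistributes chunks automatically
```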
MongoDB 快速就地更新 (Fast In-Place Updates)
MongoDB 支持就地更新 (In-Place Updates),避免重新分配文档,提高性能。
MongoDB supports in-place updates, avoiding document reallocation and improving performance.
示例:就地更新 `stock` 字段 (Example: in-place update of the `stock` field)
db.books.update(
{ _id: 1 },
{
$inc: { stock: 5 },
$set: { item: "ABC123" }
}
)
此操作不会更改文档大小,因此速度更快。
This operation does not change document size, making it faster.
MapReduce in MongoDB
MapReduce Overview MapReduce 概述
- MapReduce is a data processing paradigm used to handle large-scale data sets.
MapReduce 是一种用于处理大规模数据集的数据处理范式。
- It consists of two primary operations: Map and Reduce.
包含两个主要操作:Map 和 Reduce。
- In MongoDB, MapReduce is used for aggregation, transformation, and computation over documents.
在 MongoDB 中,MapReduce 主要用于聚合、转换和计算文档数据。
Map Function
-
The Map function emits key-value pairs from each document.
Map 函数从每个文档中提取键值对(key-value pairs)。
-
Each document in a collection is processed by the function.
Map 函数会处理集合中的每个文档。
-
Example:
示例:
function() { emit(this.cust_id, this.amount); }
- Emits customer ID as the key and the transaction amount as the value. 将 `cust_id` 作为键,`amount` 作为值进行输出。
Reduce Function
-
The Reduce function aggregates values based on their keys.
Reduce 函数根据键聚合对应的值。
-
It is applied to all values associated with the same key.
该函数应用于所有相同键的值集合。
-
Example:
示例:
function(key, values) { return Array.sum(values); }
- This function sums up all the transaction amounts for a given customer. 该函数对相同 `cust_id` 的交易金额求和。
Executing MapReduce
-
The MapReduce operation is executed on a MongoDB collection. 在 MongoDB 集合上执行 MapReduce 操作。
-
The query selects input documents, then applies
map
andreduce
functions. 查询筛选输入文档,并应用map
和reduce
函数。 -
Example:
示例:
db.orders.mapReduce( function() { emit(this.cust_id, this.amount); }, function(key, values) { return Array.sum(values); }, { query: { status: "A" }, out: "order_totals" } )
- Selects orders with
status: "A"
, maps transaction amounts bycust_id
, and reduces them intoorder_totals
. 选择status: "A"
的订单,将cust_id
作为键,并对其交易金额求和,最终输出到order_totals
集合。
- Selects orders with
Key Components of MapReduce
- Map Phase ("maps" data into key-value pairs)
Map 阶段(将数据映射为键值对)
- Reduce Phase ("reduces" values associated with each key)
Reduce 阶段(对相同键的值进行聚合)
- Query (selects documents to process)
查询(筛选需要处理的文档)
- Output (defines where results are stored, either in a collection or inline)
输出(定义结果存储位置,可存入集合或直接返回)
Advantages of Using MapReduce in MongoDB
✅ Efficient for processing large data sets
✅ 适用于大规模数据处理
✅ Parallel processing capability
✅ 支持并行计算
✅ Supports custom logic for aggregation and transformation
✅ 支持自定义逻辑进行聚合和数据转换
Limitations of MapReduce in MongoDB
❌ Slower than aggregation framework
❌ 比 MongoDB 的聚合框架速度慢
❌ Consumes more memory and CPU
❌ 占用较多内存和 CPU 资源
❌ Not recommended for real-time analytics
❌ 不适用于实时分析
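Since the aggregation framework is the faster alternative noted above, the earlier mapReduce example can be rewritten as an aggregation pipeline. A short pymongo sketch (same `orders` collection and fields as before; the connection URI and database name are assumptions):

```python
from pymongo import MongoClient

orders = MongoClient("mongodb://localhost:27017")["shop"]["orders"]

pipeline = [
    {"$match": {"status": "A"}},                        # plays the role of the 'query' option
    {"$group": {"_id": "$cust_id",                      # like emit(cust_id, amount) ...
                "value": {"$sum": "$amount"}}},         # ... combined with Array.sum()
    {"$out": "order_totals"},                           # write results to a collection
]
orders.aggregate(pipeline)
```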
Use Cases of MapReduce in MongoDB
- Large-scale log processing 大规模日志处理
- Complex aggregations requiring custom logic 需要自定义逻辑的复杂数据聚合
- Summarization of transactional data 交易数据的汇总
- Extracting analytics from unstructured data 从非结构化数据中提取分析结果
Week 6
Motivation for Apache Spark
Speed and Performance
-
Big Data is about speed. Can we have something faster than MapReduce?
大数据的核心是速度。我们能比 MapReduce 更快吗? -
Spark runs workloads up to 100x faster than Hadoop MapReduce.
Spark 运行任务比 Hadoop MapReduce 快 100 倍。 -
Example: Logistic regression in Hadoop vs. Spark
示例:Hadoop 和 Spark 运行逻辑回归的时间对比
- Hadoop: 110 seconds
- Spark: 0.9 seconds
Extracting Value from Big Data
-
Big Data has value. The value comes from analyzing the data using Machine Learning, Stream Analytics, Graph Analytics, etc.
大数据的价值来自数据分析,例如机器学习、流式分析、图分析等。 -
Apache Spark provides multiple libraries for different analytical tasks:
Apache Spark 提供多个用于不同数据分析任务的库:
- Spark SQL → Structured Query Processing (结构化查询处理)
- Spark Streaming → Real-time Data Processing (实时数据处理)
- MLlib (Machine Learning Library) → Machine Learning (机器学习)
- GraphX → Graph Analytics (图分析)
Handling Large-Scale Data
-
Big Data is massive and comes from different cloud sources.
大数据规模庞大,并且来源于不同的云端数据源。 -
Apache Spark integrates with multiple big data technologies:
Apache Spark 可以与多个大数据技术集成:
- Hadoop
- Cassandra
- Apache HBase
- Apache Mesos
MapReduce's Inefficiencies
-
MapReduce is inefficient for applications that reuse intermediate results across multiple computations.
MapReduce 在需要重用中间结果的多次计算任务中效率低下。 -
Examples of iterative applications where MapReduce struggles:
MapReduce 难以处理的迭代计算任务示例:
- PageRank
- k-means clustering
- Logistic Regression
-
Why?
原因:
- MapReduce writes intermediate results to disk at each iteration.
MapReduce 在每次迭代后都会将中间结果写入磁盘。 - Spark, on the other hand, keeps data in-memory, significantly improving performance.
Spark 则将数据保存在内存中,大幅提升性能。
- MapReduce writes intermediate results to disk at each iteration.
Inefficiency in Interactive Data Mining
-
MapReduce is inefficient for interactive data mining tasks. MapReduce
在交互式数据挖掘任务中效率低下。 -
Example: Web log queries
示例:网页日志查询
- Find total views of:
- All pages
- Pages with titles exactly matching a given word
- Pages with titles partially matching a word
- Find total views of:
-
Why MapReduce is inefficient:
MapReduce 效率低下的原因:
- It writes to Hadoop Distributed File System (HDFS), which adds overhead.
它依赖 Hadoop 分布式文件系统(HDFS),带来额外开销。 - Overhead due to data replication, disk I/O, and serialization.
数据复制、磁盘 I/O 和序列化带来的开销较大。
- It writes to Hadoop Distributed File System (HDFS), which adds overhead.
Limitations of Hadoop and Potential Solutions
-
Hadoop is inefficient because it writes to HDFS, causing high disk I/O and serialization overhead.
Hadoop 因为依赖 HDFS,导致高磁盘 I/O 和序列化开销,效率低下。 -
Some frameworks attempt to optimize this inefficiency:
一些框架尝试优化这种低效问题:
- Pregel: Iterative graph computations (用于迭代图计算)
- HaLoop: Iterative MapReduce interface (用于迭代 MapReduce 计算)
-
However, these frameworks do not support generic data reuse.
但这些框架不支持通用数据重用。
- Example: A user cannot load all logs in memory for interactive analysis.
示例:用户无法将所有日志数据加载到内存中进行交互式分析。
- Example: A user cannot load all logs in memory for interactive analysis.
Apache Spark
What is Apache Spark?
什么是 Apache Spark?
- Unified engine for distributed data processing
用于分布式数据处理的统一引擎 - Extends MapReduce with efficient data reuse
扩展 MapReduce,提供高效的数据复用 - Supports various data processing libraries: Streaming, SQL, ML, Graph
支持多种数据处理库:流式计算(Streaming)、SQL、机器学习(ML)、图计算(Graph) - Compatible with multiple storage and cluster management systems
兼容多种存储和集群管理系统
Why Choose Apache Spark?
- In-memory computation → Faster performance
内存计算 → 更快的性能 - Optimized for iterative tasks
适用于迭代计算任务 - Supports SQL, streaming, machine learning, and graph analytics
支持 SQL 查询、流式计算、机器学习和图分析 - Can run on Hadoop, Mesos, Kubernetes, or standalone
可以运行在 Hadoop、Mesos、Kubernetes 或独立模式下
What is an RDD (Resilient Distributed Dataset)?
什么是 RDD(弹性分布式数据集)?
- Efficient data reuse and fault-tolerance
支持高效数据复用和容错性
- A read-only collection of objects, operated in parallel
只读对象集合,可并行处理
- Can be manipulated by the user program through deterministic transformations; supports partitioning and in-memory persistence
可由用户程序通过确定性的转换操作进行处理;支持数据分区和内存持久化
- Enables large-scale distributed computing
支持大规模分布式计算
Core Properties of RDDs
RDD 的核心特性
- Lineage: Tracks how it was derived from other datasets
血统(Lineage):跟踪 RDD 来源,可从存储中重建
- Only RDDs that can be reconstructed after failure can be referenced by a user’s program
只有故障后可以重建的 RDD 才能被用户程序引用
- Persistence: Can be cached in memory for faster access
持久化(Persistence):可缓存到内存,提高计算效率
- Partitioning: Enables data distribution across nodes
分区(Partitioning):允许数据跨节点分布,提高并行计算能力
Users can specify how the records of an RDD are partitioned across machines based on a key in each record.
用户可以指定如何根据每条记录中的键在机器之间对 RDD 的记录进行分区
RDD Transformations
RDD 转换操作
- map(f: T ⇒ U): Applies a function to each element (1-1 mapping)
对每个元素应用函数(1 对 1 映射)
- filter(f: T ⇒ Bool): Selects elements satisfying a condition
筛选符合条件的元素
- flatMap(f: T ⇒ Seq[U]): Maps each input item to multiple output items
类似 map,但可生成多个输出项(类似于 MapReduce 的 map)
- groupByKey(): Groups elements by key
按键分组
- reduceByKey(f: (V, V) ⇒ V): Aggregates values by key (optimized version of groupByKey)
按键聚合值(比 groupByKey 更高效)
- join(): Joins two RDDs on key values
基于键值关联两个 RDD
RDD Actions
RDD 行动操作
- count(): Returns the number of elements
计算 RDD 中元素的个数
- collect(): Returns all elements as a list
返回所有元素的列表
- reduce(f: (T, T) ⇒ T): Aggregates elements using a function
使用函数对元素进行聚合计算
- lookup(k: K): Retrieves values associated with a given key
按键查询值
- save(path: String): Saves the RDD to a storage system like HDFS
将 RDD 保存到存储系统(如 HDFS)
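A short PySpark sketch that chains several of the transformations and actions above into a word count (the input path is an assumption; `save(path)` on the slide corresponds to `saveAsTextFile` in PySpark):

```python
from pyspark import SparkContext

sc = SparkContext("local", "WordCount")

lines = sc.textFile("hdfs://logs/books.txt")               # assumed input path
counts = (lines.flatMap(lambda line: line.split())         # transformation: line -> words
               .map(lambda word: (word, 1))                # transformation: word -> (word, 1)
               .reduceByKey(lambda a, b: a + b))           # transformation: sum counts per key

print(counts.count())                                      # action: number of distinct words
print(counts.take(5))                                      # action: fetch a few results to the driver
counts.saveAsTextFile("hdfs://logs/word_counts")           # action: save the RDD to storage
```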
RDD Persistence
RDD 持久化
-
Stored in RAM by default, spilled to disk when memory is insufficient
默认存储在 RAM,内存不足时溢写到磁盘 -
Users can choose alternative storage strategies
用户可选择不同存储策略
- Disk storage only → 仅存储到磁盘
- Replication across machines → 跨机器复制
- Priority setting for spilling in-memory data to disk → 设置优先级,控制数据溢写
Why is Spark Faster than MapReduce?
为什么 Spark 比 MapReduce 更快?
- Avoids writing intermediate results to disk
避免将中间结果写入磁盘
- In-memory computation significantly reduces I/O overhead
内存计算显著降低 I/O 开销
- Optimized for iterative and interactive workloads
针对迭代计算和交互式任务进行了优化
Limitations of MapReduce
MapReduce 的局限性
-
Inefficient for iterative computations
不适用于迭代计算
- Example: PageRank, k-means clustering, logistic regression
示例:PageRank、k-means 聚类、逻辑回归
- Example: PageRank, k-means clustering, logistic regression
-
Slow for interactive data mining
交互式数据挖掘性能较低
- Example: Analyzing website logs with multiple ad-hoc queries
示例:对网站日志进行多次查询
- Example: Analyzing website logs with multiple ad-hoc queries
-
High overhead due to HDFS writes and data replication
因 HDFS 写入和数据复制导致高开销
Solutions to MapReduce Inefficiencies
如何优化 MapReduce 的低效问题?
- Pregel: Optimized for iterative graph computations
Pregel:优化的图计算框架
- HaLoop: Iterative MapReduce interface
HaLoop:迭代式 MapReduce 接口
- Apache Spark: Unified framework that enables in-memory computation
Apache Spark:统一的大数据计算框架,支持内存计算
Lazy Evaluation 惰性评估
Key Mechanism
- Delayed Execution: Spark triggers computations only upon encountering the first action (e.g., `count()`, `collect()`). Transformations (e.g., `map`, `filter`) are lazily evaluated, only recording metadata.
延迟执行:Spark 仅在遇到第一个动作(Action)(如 `count()`、`collect()`)时触发计算,对转换操作(Transformations)(如 `map`、`filter`)进行惰性评估,仅记录元数据。
- Global Optimization: By constructing a Directed Acyclic Graph (DAG) of operations, Spark optimizes execution plans (e.g., combining operations, minimizing data shuffling).
全局优化:通过构建操作的有向无环图(DAG),Spark优化执行计划(如合并操作、减少数据混洗)。
- Fault Tolerance: Recomputes lost partitions from lineage (dependency graph) instead of replicating data.
容错性:通过血缘关系(依赖图)重新计算丢失的分区,而非依赖数据副本。
RDD Representation
Core Components
- Partitions: Atomic chunks of data (e.g., HDFS blocks for file-based RDDs).
分块(Partitions):数据的原子块(如基于文件的RDD对应HDFS块)。
- Dependencies: Track parent RDDs (narrow or wide dependencies).
依赖关系(Dependencies):记录父RDD的依赖类型(窄依赖或宽依赖)。
- Compute Function: Logic to derive the RDD from parent RDDs.
计算函数(Compute Function):从父RDD生成当前RDD的逻辑。
- Metadata: Partitioning scheme (e.g., hash, range) and data placement (node locations).
元数据(Metadata):分区策略(如哈希、范围分区)和数据位置信息。
Representing Dependency Types
依赖类型
Narrow Dependencies
- Definition: Each parent partition is used by at most one child partition.
定义:父RDD的每个分区最多被子RDD的一个分区使用。 - Examples:
map
,filter
(1-to-1 transformations).
示例:map
、filter
(一对一转换操作)。 - Advantages:
- Enable pipelining (e.g.,
map → filter
executed in-memory).
支持流水线执行(如map → filter
在内存中连续执行)。 - Efficient fault recovery (only recompute lost parent partitions).
高效容错恢复(仅需重新计算丢失的父分区)。
- Enable pipelining (e.g.,
Wide Dependencies
- Definition: Child partitions depend on multiple parent partitions (e.g.,
groupByKey
,join
).
定义:子RDD的分区依赖多个父分区(如groupByKey
、join
)。 - Examples: Shuffle operations requiring data redistribution.
示例:需要数据重分布的操作(如Shuffle)。 - Challenges:
- Require data shuffling across nodes.
跨节点数据混洗。 - Fault recovery may need full recomputation.
容错恢复可能需要完全重新计算。
- Require data shuffling across nodes.
Example: Log Analysis Workflow
示例:日志分析流程
# 1. Load HDFS file into an RDD
lines = spark.textFile("hdfs://logs/error_logs")
# 2. Filter error messages (lazy transformation)
errors = lines.filter(lambda line: line.startswith("ERROR"))
# 3. Persist the filtered RDD in memory
errors.persist()
# 4. Trigger actions (compute and output results)
errors.count() # Action 1: Count total errors
errors.filter(lambda line: "MySQL" in line).count() # Action 2: Count MySQL-related errors
errors.filter(lambda line: "HDFS" in line).map(lambda line: line.split("\t")[3]).collect() # Action 3: Extract timestamps
-
Behavior:
执行流程- No computation occurs until
errors.count()
(the first action).
在第一个动作(
errors.count()
)前无实际计算。- After the first action,
errors
RDD is cached in memory for reuse.
首次动作后,
errors
RDD被缓存以复用。 - No computation occurs until
Why Lazy Evaluation?
为何需要惰性评估?
- Optimization Opportunities:
优化机会:- Spark analyzes the entire DAG to eliminate redundant steps (e.g., skip unused transformations).
Spark分析完整DAG以消除冗余步骤(如跳过未使用的转换操作)。
- Spark analyzes the entire DAG to eliminate redundant steps (e.g., skip unused transformations).
- Resource Efficiency:
资源高效利用:- Avoids intermediate data storage (e.g., no need to save results of
filter
until an action is called).
避免存储中间结果(如无需保存filter
结果直到触发动作)。
- Avoids intermediate data storage (e.g., no need to save results of
Advantages of RDDs
1. Coarse-Grained Writes vs. Fine-Grained Reads 粗粒度写入与细粒度读取
-
RDDs:
- Writes: Bulk transformations applied to the entire dataset (e.g.,
map
,filter
).
写入:对整个数据集进行批量转换(如map
、filter
)。 - Reads: Fine-grained access (e.g., querying specific partitions or elements).
读取:支持细粒度访问(如查询特定分区或元素)。 - Fault Tolerance: Lineage-based recovery avoids data replication.
容错性:基于血缘关系的恢复机制,无需数据副本。
- Writes: Bulk transformations applied to the entire dataset (e.g.,
-
Distributed Shared Memory (DSM):
-
Writes: Fine-grained (arbitrary memory location updates).
写入:细粒度(任意内存位置更新)。 -
Reads: Fine-grained.
读取:细粒度。 -
Challenge: Requires checkpoints and rollback for fault tolerance.
挑战:容错依赖检查点和程序回滚。
-
2. Immutability and Consistency 不可变性与一致性
- RDDs:
- Immutable: Read-only partitions ensure trivial consistency.
不可变性:只读分区确保简单的一致性。 - Parallel Recovery: Lost partitions recomputed in parallel via lineage.
并行恢复:通过血缘关系并行重新计算丢失的分区。
- Immutable: Read-only partitions ensure trivial consistency.
- DSM:
- Mutable: Requires runtime or application-level consistency management.
可变性:需依赖运行时或应用层的一致性管理。 - Complex Recovery: May need full checkpoints.
复杂恢复:可能需要完整检查点。
- Mutable: Requires runtime or application-level consistency management.
3. Straggler Mitigation 应对慢节点(Straggler)
- RDDs:
- Speculative Execution: Re-run slow tasks on backup nodes.
推测执行:在备份节点上重新执行慢任务。 - Data Locality: Automatically schedules tasks close to data partitions.
数据本地性:自动根据数据位置调度任务。
- Speculative Execution: Re-run slow tasks on backup nodes.
- DSM:
- Difficult: Lack of lineage makes task rescheduling inefficient.
困难:缺乏血缘关系导致任务重新调度低效。
- Difficult: Lack of lineage makes task rescheduling inefficient.
4. Memory Management 内存管理
- RDDs:
- Graceful Degradation: Spills partitions to disk if RAM is insufficient.
优雅降级:内存不足时,将分区溢出到磁盘。 - In-Memory Priority: Cached partitions stay in memory for fast reuse.
内存优先:缓存的分区保留在内存中以快速复用。
- Graceful Degradation: Spills partitions to disk if RAM is insufficient.
- DSM:
- Poor Performance: Swapping to disk leads to high latency.
性能低下:磁盘交换导致高延迟。
- Poor Performance: Swapping to disk leads to high latency.
Spark Programming Interface
Spark 编程接口
Components
- Driver Program:
- Defines RDDs and invokes actions (e.g.,
collect()
,count()
). - Tracks RDD lineage for fault tolerance.
驱动程序:定义RDD并触发动作(如collect()
、count()
),跟踪RDD血缘关系以实现容错。
- Defines RDDs and invokes actions (e.g.,
- Worker Nodes:
- Processes: Execute tasks (transformations/actions) assigned by the driver.
流程:执行驱动程序分配的任务(转换/操作)。 - Data Storage: Store RDD partitions in memory or disk.
数据存储:在内存或磁盘中存储 RDD 分区。 - Data Locality: Read input blocks from distributed storage (e.g., HDFS).
工作节点:执行驱动程序分配的任务(转换/动作),在内存或磁盘存储RDD分区,优先从分布式存储(如HDFS)读取数据块。
- Processes: Execute tasks (transformations/actions) assigned by the driver.
Workflow Example
执行流程示例
-
Driver Program:
# Define RDD from HDFS
lines = spark.textFile("hdfs://logs/data")
# Transformation
errors = lines.filter(lambda x: "ERROR" in x)
# Action
errors.persist().count()
-
Workers:
- Load HDFS blocks into memory.
将 HDFS 数据块加载到内存中。 - Apply
filter
transformation and cache results.
应用过滤器转换并缓存结果。 - Return count to the driver.
- Load HDFS blocks into memory.
Applications Suitable for RDDs
RDD 的适用场景
- Batch Processing:
- Example: Log analysis, ETL pipelines.
- Fit: Operations applied uniformly across the dataset (e.g.,
map
,reduce
).
批处理:如日志分析、ETL流程,适合对数据集进行统一操作(如map
、reduce
)。
- Iterative Algorithms:
- Example: Machine learning (e.g., gradient descent).
- Fit: Reuse intermediate results via caching (e.g.,
errors.persist()
).
迭代算法:如机器学习(梯度下降),通过缓存复用中间结果。
- Fault-Tolerant Queries:
- Example: Interactive queries on large datasets.
- Fit: Lineage enables fast recovery without data replication.
容错查询:如大数据集的交互式查询,血缘关系支持快速恢复。
Applications Unsuitable for RDDs
RDD 的不适用场景
- Asynchronous Fine-Grained Updates:
- Example: Real-time web app databases with frequent row-level updates.
- Limitation: RDDs are immutable; updates require creating new RDDs.
异步细粒度更新:如需要频繁行级更新的实时Web应用数据库,RDD不可变特性导致更新需创建新RDD。
- Low-Latency Random Access:
- Example: Key-value stores requiring microsecond reads.
- Limitation: RDDs optimized for sequential/parallel access, not random lookups.
低延迟随机访问:如需要微秒级读取的键值存储,RDD针对顺序/并行访问优化,而非随机查询。
How Spark Runs in Cluster Mode Spark 集群模式运行机制
Cluster Components 集群核心组件
-
Cluster Manager
- Role: Allocates resources (CPU, memory) across applications.
作用:为应用程序分配资源(CPU、内存)。 - Supported Types:
- Standalone (Spark’s built-in manager).
Standalone(Spark内置管理器)。 - Apache Mesos (general-purpose cluster manager).
Apache Mesos(通用集群管理器)。 - Hadoop YARN (resource manager in Hadoop ecosystem).
Hadoop YARN(Hadoop生态系统资源管理器)。
- Standalone (Spark’s built-in manager).
- Role: Allocates resources (CPU, memory) across applications.
-
Driver Program
- Role:
- Runs the
main()
function of the application.
运行应用程序的main()
函数。 - Maintains SparkContext (entry point to Spark functionality).
维护SparkContext(Spark功能的入口点)。 - Schedules tasks to executors and monitors their status.
调度任务至执行器并监控状态。
- Runs the
- Requirements:
- Must be network-addressable by worker nodes.
必须能被工作节点通过网络访问。 - Ideally deployed close to workers (e.g., same LAN) to minimize latency.
建议与工作节点部署在同一局域网(LAN)以降低延迟。
- Must be network-addressable by worker nodes.
- Role:
-
Executors
-
Role:
- Run tasks assigned by the driver.
执行驱动程序分配的任务。 - Store RDD partitions in memory/disk.
在内存/磁盘存储RDD分区。 - Report task status to the driver.
向驱动程序报告任务状态。
- Run tasks assigned by the driver.
-
Isolation: Each application has its own executors (no resource sharing).
隔离性:每个应用程序独享执行器(资源不共享)。
-
-
Tasks
- Definition: Smallest unit of computation (e.g., processing a partition).
定义:最小的计算单元(如处理一个分区)。 - Execution: Sent by the driver to executors via the cluster manager.
执行:由驱动程序通过集群管理器发送给执行器。
- Definition: Smallest unit of computation (e.g., processing a partition).
-
SparkContext
- Role:
- Connects to the cluster manager.
连接集群管理器。 - Coordinates resource allocation and task distribution.
协调资源分配与任务分发。 - Manages application lifecycle (e.g., job submission).
管理应用程序生命周期(如作业提交)。
- Connects to the cluster manager.
- Role:
Cluster Mode Workflow
集群模式执行流程
-
Application Submission:
- User submits a Spark application (e.g., Python script) to the cluster.
- Driver program starts and initializes
SparkContext
.
应用程序提交:用户向集群提交Spark应用(如Python脚本),驱动程序启动并初始化
SparkContext
。 -
Resource Allocation:
SparkContext
requests resources (executors) from the cluster manager.- Cluster manager launches executors on worker nodes.
资源分配:
SparkContext
向集群管理器申请资源(执行器),集群管理器在工作节点上启动执行器。 -
Task Execution:
- Driver splits the job into tasks (based on RDD partitions).
- Tasks are sent to executors via the cluster manager.
- Executors run tasks in parallel and return results to the driver.
任务执行:驱动程序将作业拆分为任务(基于RDD分区),任务通过集群管理器发送至执行器,执行器并行运行任务并返回结果。
-
Shutdown:
- After all tasks complete,
SparkContext
releases resources. - Cluster manager terminates executors.
关闭流程:任务完成后,
SparkContext
释放资源,集群管理器终止执行器。 - After all tasks complete,
PySpark Shell Operations
1. Parallelized Collections
并行化集合
-
sc.parallelize()
:-
Purpose: Converts a local collection (e.g., list) into a distributed RDD.
作用:将本地集合(如列表)转换为分布式RDD。 -
Syntax:
parallel_col = sc.parallelize(data, num_partitions) # num_partitions is optional
- Example:
d = [1, 2, 3, 4, 5]
parallel_col = sc.parallelize(d)     # Auto-detect partitions
parallel_col = sc.parallelize(d, 2)  # Manually set 2 partitions
-
-
count()
:-
Action: Returns the number of elements in the RDD.
行动操作:返回RDD中的元素总数。 -
Example:
parallel_col.count() # Output: 5
-
2. Transformations
转换操作
-
filter(func)
:-
Purpose: Returns a new RDD containing elements that satisfy
func
.
作用:返回满足条件func
的元素组成的新RDD。 -
Example:
tf = sc.textFile("file:///usr/share/dict/words")
lines_nonempty = tf.filter(lambda x: len(x) > 0)
lines_nonempty.count()  # Count non-empty lines
-
-
map(func)
:-
Purpose: Applies
func
to each element and returns a new RDD.
作用:对每个元素应用func
,返回新RDD。 -
Example:
nums = sc.parallelize([1, 2, 3, 4])
squared = nums.map(lambda x: x * x).collect()  # [1, 4, 9, 16]
-
-
flatMap(func)
:-
Purpose: Similar to
map
, but flattens the output (e.g., splits strings into lists).
作用:类似map
,但输出会被展平(如将字符串拆分为列表)。 -
Example:
x = sc.parallelize(["a b", "c d"])
y = x.flatMap(lambda x: x.split(' ')).collect()  # ['a', 'b', 'c', 'd']
-
3. Set Operations
集合操作
Operation | Description |
---|---|
rddA.union(rddB) |
Returns an RDD containing all elements from both RDDs. 返回两个RDD的并集。 |
rddA.intersection(rddB) |
Returns an RDD of common elements between two RDDs. 返回两个RDD的交集。 |
rddA.subtract(rddB) |
Returns elements in rddA but not in rddB .返回 rddA 中存在但rddB 中不存在的元素。 |
rddA.cartesian(rddB) |
Returns the Cartesian product of two RDDs. 返回两个RDD的笛卡尔积。 |
rddA.join(rddB) |
Joins RDDs of (K, V) pairs by key, returning (K, (V, W)) pairs. 对键值对RDD按Key连接,返回 (K, (V, W)) 对。 |
Example:
x = sc.parallelize([("a", 1), ("b", 2)])
y = sc.parallelize([("b", 3), ("d", 4)])
x.join(y).collect() # Output: [('b', (2, 3))]
4. Reduction & Aggregation
聚合操作
-
reduce(func)
:-
Purpose: Aggregates elements using a commutative and associative function.
作用:使用交换律和结合律函数聚合元素。 -
Example:
rdd = sc.parallelize([1, 2, 3, 4, 5])
total = rdd.reduce(lambda x, y: x + y)  # Output: 15
-
-
fold(zero_value, func)
:-
Purpose: Similar to
reduce
, but with an initial value (zero_value
).
作用:类似reduce
,但可指定初始值。 -
Example:
sum2 = rdd.fold(0.0, lambda x, y: x + y) # Output: 15.0
-
-
aggregate(zero_value, seq_op, comb_op)
:-
Purpose: Aggregates elements within partitions (
seq_op
) and merges results across partitions (comb_op
).
作用:分区内聚合(seq_op
)和跨分区合并结果(comb_op
)。 -
Example:
rdd = sc.parallelize([1, 2, 3, 4], 2)
# Define functions
seq_op = lambda local_res, elem: (local_res[0] + elem, local_res[1] + 1)
comb_op = lambda resA, resB: (resA[0] + resB[0], resA[1] + resB[1])
result = rdd.aggregate((0, 0), seq_op, comb_op)  # Output: (10, 4)
-
5. Accumulators
累加器
-
Purpose: Shared variables for aggregating values across tasks (e.g., counters).
作用:跨任务聚合值的共享变量(如计数器)。 -
Rules:
- Only the driver can read the value.
仅驱动程序可读取累加器的值。 - Tasks can only add values (using
add()
).
任务只能通过add()
累加值。
- Only the driver can read the value.
-
Example:
accum = sc.accumulator(0)
sc.parallelize([1, 2, 3, 4, 5]).foreach(lambda x: accum.add(x))
print(accum.value)  # Output: 15
6. Persistence
持久化
-
persist(storage_level)
:-
Purpose: Caches RDD in memory/disk for reuse.
作用:将RDD缓存到内存/磁盘以供复用。 -
Storage Levels:
存储级别:
Level | Use Disk | Use Memory | Serialized | Replication |
---|---|---|---|---|
`MEMORY_ONLY` | ❌ | ✔️ | ❌ | 1 |
`MEMORY_AND_DISK` | ✔️ | ✔️ | ❌ | 1 |
`MEMORY_ONLY_SER` | ❌ | ✔️ | ✔️ | 1 |
`DISK_ONLY` | ✔️ | ❌ | ❌ | 1 |
rdd.persist(StorageLevel.MEMORY_AND_DISK)
-
7. Example Application
示例应用
"""SimpleApp.py"""
from pyspark import SparkContext
# Initialize SparkContext
sc = SparkContext("local", "Simple App")
# Read data and cache
logFile = "YOUR_SPARK_HOME/README.md"
rdd = sc.textFile(logFile).cache()
# Count lines containing 'a'
numAs = rdd.filter(lambda s: 'a' in s).count()
print("Lines with 'a':", numAs)
# Stop SparkContext
sc.stop()
-
Execution:
spark-submit --master local[4] SimpleApp.py
Week 7
HIVE
Motivation for HIVE
- Limitations of Traditional Data Warehousing:
- Scalability Issues: Traditional systems were not scalable enough for web-scale data.
传统数据仓库的限制:扩展性不足,难以处理网络规模的数据。 - High Costs: Prohibitively expensive (up to $200K/TB), making it impractical for large-scale use.
高昂成本:成本极高(每TB高达20万美元),不适合大规模应用。 - Lack of Execution Control: Limited control over execution methods and query optimization.
执行控制不足:难以优化查询或控制执行方式。 - Parallel Environment Challenges: Sparse statistics and reliance on UDFs complicated operations.
并行环境挑战:缺乏统计信息,依赖大量UDF(用户自定义函数)。
- Scalability Issues: Traditional systems were not scalable enough for web-scale data.
- MapReduce Limitations:
- Rigid Data Flow: Fixed one-input, two-stage structure (map-shuffle-reduce), requiring complex workarounds.
数据流僵化:固定的单输入、两阶段流程(Map-Shuffle-Reduce),修改复杂。 - Custom Code Overhead: Common operations (e.g., joins, sorting) required custom code, reducing accessibility.
代码冗余:需为常见操作(如连接、排序)编写自定义代码,门槛高。 - Opaque Functions: Map/Reduce functions were hard to maintain, extend, or optimize.
函数不透明:Map/Reduce函数难以维护、扩展或优化。
- Rigid Data Flow: Fixed one-input, two-stage structure (map-shuffle-reduce), requiring complex workarounds.
- HIVE’s Solution:
- High-Level Abstraction: Introduced SQL-like declarative language (HiveQL) to compile queries into Hadoop jobs.
高层抽象:通过类SQL语言(HiveQL)将查询编译为Hadoop任务。
- High-Level Abstraction: Introduced SQL-like declarative language (HiveQL) to compile queries into Hadoop jobs.
What is HIVE?
- Definition: A Hadoop-based open-source data warehousing solution that enables SQL-like queries (HiveQL) on unstructured data.
定义:基于Hadoop的开源数据仓库,支持对非结构化数据进行类SQL查询(HiveQL)。 - Key Features:
- Table Abstraction: Treats unstructured data as structured tables.
表抽象:将非结构化数据视为结构化表。 - Execution Plan Generation: Automatically translates queries into optimized MapReduce/Tez/Spark jobs.
执行计划生成:自动将查询编译为优化的MapReduce/Tez/Spark任务。
- Table Abstraction: Treats unstructured data as structured tables.
Historical Context
- Origin: Developed at Facebook (2007–2009) to address:
起源:由Facebook开发,用于解决:- Data Growth: From 15TB (2007) to 700TB (2009).
数据增长:数据量从15TB(2007年)激增至700TB(2009年)。 - RDBMS Inefficiency: Days-long processing times for traditional databases.
传统数据库低效:处理耗时长达数天。 - Hadoop Complexity: Low productivity due to manual MapReduce coding.
Hadoop复杂性:手动编写MapReduce代码导致效率低下。
- Data Growth: From 15TB (2007) to 700TB (2009).
Applications of HIVE
- Log Processing:
- Example: Analyzing server logs for error patterns.
日志处理:分析服务器日志中的错误模式。
- Example: Analyzing server logs for error patterns.
- Text Mining:
- Example: Extracting insights from social media text.
文本挖掘:从社交媒体文本中提取信息。
- Example: Extracting insights from social media text.
- Document Indexing:
- Example: Building search indices for large document repositories.
文档索引:为大规模文档库构建搜索索引。
- Example: Building search indices for large document repositories.
- Business Intelligence:
- Example: Customer-facing analytics (e.g., Google Analytics).
商业智能:面向客户的业务分析(如Google Analytics)。
- Example: Customer-facing analytics (e.g., Google Analytics).
- Predictive Modeling:
- Example: Training machine learning models on historical data.
预测建模:基于历史数据训练机器学习模型。
- Example: Training machine learning models on historical data.
- Hypothesis Testing:
- Example: Validating A/B testing results at scale.
假设检验:大规模验证A/B测试结果。
- Example: Validating A/B testing results at scale.
Technical Advantages
- Cost-Effective Scaling: Leverages Hadoop’s distributed storage for low-cost scaling.
经济高效的扩展:利用Hadoop分布式存储降低成本。 - Familiar Interface: SQL-like syntax lowers the learning curve for analysts.
易用接口:类SQL语法降低分析师的学习门槛。 - Flexible Execution: Supports multiple execution engines (MapReduce, Tez, Spark).
灵活执行:支持多引擎(MapReduce、Tez、Spark)。
Components of HIVE
Client Components
- Command Line Interface (CLI):
- Enables direct interaction with Hive via command-line.
命令行接口:通过命令行直接与Hive交互。
- Enables direct interaction with Hive via command-line.
- Web UI:
- Provides a graphical interface for managing and querying data.
网页界面:图形化管理与查询数据。
- Provides a graphical interface for managing and querying data.
- JDBC/ODBC Driver:
- Allows integration with external applications using Java or ODBC protocols.
JDBC/ODBC驱动:支持通过Java或ODBC协议与外部应用集成。
- Allows integration with external applications using Java or ODBC protocols.
Core Components
- Driver:
- Role: Manages the lifecycle of HiveQL statements.
作用:管理HiveQL语句的生命周期。 - Key Functions:
- Maintains session handles and statistics.
- Coordinates query execution across components.
关键功能:维护会话句柄与统计信息,协调组件间的查询执行。
- Role: Manages the lifecycle of HiveQL statements.
- Compiler:
- Role: Translates HiveQL queries into MapReduce tasks.
作用:将HiveQL查询编译为MapReduce任务。 - Process:
- Parses the query.
- Generates an execution plan.
流程:解析查询 → 生成执行计划。
- Role: Translates HiveQL queries into MapReduce tasks.
- Optimizer:
- Role: Optimizes the execution plan (e.g., predicate pushdown, cost-based optimizations).
作用:优化执行计划(如谓词下推、基于成本的优化)。
- Role: Optimizes the execution plan (e.g., predicate pushdown, cost-based optimizations).
- Executor:
- Role: Executes tasks in dependency order and interacts with Hadoop.
作用:按依赖顺序执行任务,并与Hadoop交互。 - Key Functions:
- Manages task scheduling.
- Handles Hadoop job submissions.
关键功能:任务调度管理,提交Hadoop作业。
- Role: Executes tasks in dependency order and interacts with Hadoop.
- Metastore:
- Role: Acts as the system catalog, storing metadata (tables, partitions, HDFS paths).
作用:存储元数据(表、分区、HDFS路径等),作为系统目录。 - Storage: Runs on an RDBMS (e.g., MySQL) for low-latency access.
存储:基于关系型数据库(如MySQL),确保低延迟访问。
- Role: Acts as the system catalog, storing metadata (tables, partitions, HDFS paths).
- Thrift Server:
- Role: Provides a bridge between clients and the Metastore.
作用:客户端与元存储之间的桥梁。 - Function: Enables clients to query or modify Metastore metadata programmatically.
功能:支持通过编程方式查询或修改元数据。
- Role: Provides a bridge between clients and the Metastore.
HIVE Data Model
Data Units
- Table:
- Structure:
- Rows with predefined columns.
结构:由行和预定义列组成。
- Rows with predefined columns.
- Column Types:
- Basic:
int
,float
,boolean
. - Complex:
List<element-type>
Map<key-type, value-type>
Struct<field-name: field-type, ...>
列类型:基础类型(整型、浮点等)、复杂类型(列表、映射、结构体)。
- Basic:
- Custom Types:
- Created by composing basic and complex types (e.g.,
List<Map<String, Struct<p1:int, p2:int>>
).
自定义类型:通过组合基础与复杂类型构建。
- Created by composing basic and complex types (e.g.,
- Structure:
- Partition:
- Definition: Logical division of a table based on column values (e.g., date, region).
定义:基于列值(如日期、地区)对表的逻辑划分。 - Usage:
- Accelerates queries by scanning only relevant partitions (partition pruning).
- Example: Partitioning a log table by
date
.
用途:通过分区剪枝提升查询效率(如按日期分区日志表)。
- Definition: Logical division of a table based on column values (e.g., date, region).
- Bucket:
- Definition: Subdivision of a partition or table into files based on hash values.
定义:基于哈希值将分区或表细分为文件。 - Usage:
- Avoids excessive small partitions (e.g., partitioning by
user_id
may create too many partitions). - Enables efficient sampling (e.g.,
SELECT * FROM table TABLESAMPLE(BUCKET 1 OUT OF 96)
).
用途:避免过多小分区,支持高效采样(如按用户ID分桶)。
- Avoids excessive small partitions (e.g., partitioning by
- Definition: Subdivision of a partition or table into files based on hash values.
HDFS Mapping
- Table → HDFS Directory:
- Example: Table
test_part
maps to/user/hive/warehouse/test_part
.
表映射:表对应HDFS目录(如test_part
映射到/user/hive/warehouse/test_part
)。
- Example: Table
- Partition → Subdirectory:
- Example: Partition
ds='2009-02-02', hr=11
maps to/user/hive/warehouse/test_part/ds=2009-02-02/hr=11
.
分区映射:分区对应表的子目录(如按日期和小时分区)。
- Example: Partition
- Bucket → File:
- Example: Bucket files (
bucket1
,bucket2
) reside in partition or table directories.
桶映射:桶对应分区或表目录中的文件(如bucket1
文件)。
- Example: Bucket files (
Efficiency Mechanisms
- Partition Pruning:
- Scans only relevant HDFS subdirectories, reducing I/O.
分区剪枝:仅扫描相关子目录,减少I/O开销。
- Scans only relevant HDFS subdirectories, reducing I/O.
- Bucketing for Sampling:
- Enables fast sampling via bucket files (e.g., 1/96 sample by reading the first bucket).
分桶采样:通过桶文件快速采样(如读取第一个桶生成1/96样本)。
- Enables fast sampling via bucket files (e.g., 1/96 sample by reading the first bucket).
HIVE Query Language
Supported SQL Subset
- Core Operations:
FROM
,JOIN
(INNER, LEFT OUTER, RIGHT OUTER, FULL OUTER),GROUP BY
, aggregation functions (e.g.,SUM
,AVG
),UNION ALL
.
核心操作:支持FROM
、JOIN
(内连接、左外连接等)、GROUP BY
、聚合函数、UNION ALL
。
- Metadata Browsing:
SHOW TABLES
: Lists available tables.EXPLAIN PLAN
: Displays the execution plan of a query.
元数据浏览:SHOW TABLES
显示表列表,EXPLAIN PLAN
展示查询执行计划。
Limitations
- Insert Operations:
- Overwrite-Only: Inserts always overwrite existing data; no support for appending to tables/partitions.
仅覆盖写入:插入操作会覆盖已有数据,不支持追加。
- Overwrite-Only: Inserts always overwrite existing data; no support for appending to tables/partitions.
- Join Predicates:
- Equality-Only: Only equality-based conditions (e.g.,
ON a.id = b.id
) are allowed inJOIN
clauses.
仅支持等值连接:JOIN
条件必须为等值比较(如ON a.id = b.id
)。
- Equality-Only: Only equality-based conditions (e.g.,
Comparison with MySQL
Efficient Map-side JOIN in HIVE
Overview
- Purpose: Optimizes JOIN operations between a small table and a large table by avoiding the reduce phase.
目的:通过避免Reduce阶段,优化小表与大表之间的JOIN操作。
Mechanism
- In-Memory Processing:
- The smaller table is loaded entirely into memory.
- Chunks of the larger table are streamed and joined with the in-memory small table.
内存处理:小表完全加载到内存中,逐块读取大表并与内存中的小表进行连接。
- No Reduce Phase:
- Eliminates the need for shuffling and sorting, reducing I/O and latency.
无Reduce阶段:省去数据混洗和排序,降低I/O和延迟。
- Eliminates the need for shuffling and sorting, reducing I/O and latency.
示例查询:将 stocks 表(大表)与 dividends 表(小表)按日期和股票代码连接,筛选苹果公司(AAPL)数据。
Example query: join the large stocks table with the small dividends table on date and stock symbol, filtering for Apple (AAPL) records.
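The query itself is not reproduced in the notes; a hedged reconstruction is sketched below (column names such as `ymd`, `price_close`, and `dividend` are assumptions). With `hive.auto.convert.join` enabled, Hive converts such a query into a map-side join automatically; the `MAPJOIN` hint makes the intent explicit.

```python
# Hedged sketch: broadcast the small 'dividends' table and join it map-side with 'stocks'
query = """
SELECT /*+ MAPJOIN(d) */ s.ymd, s.symbol, s.price_close, d.dividend
FROM stocks s
JOIN dividends d
  ON s.ymd = d.ymd AND s.symbol = d.symbol
WHERE s.symbol = 'AAPL'
"""
# Run through the Hive CLI, or via Spark's HiveQL support:
# spark.sql(query).show()
```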
Advantages
- Performance Boost:
- Avoids costly shuffle and reduce steps.
性能提升:避免高开销的Shuffle和Reduce阶段。
- Avoids costly shuffle and reduce steps.
- Memory Efficiency:
- Minimizes disk I/O by leveraging in-memory processing.
内存高效:利用内存处理减少磁盘I/O。
- Minimizes disk I/O by leveraging in-memory processing.
Best Practices
- Small Table Size:
- Ensure the smaller table fits within the configured memory threshold.
小表大小:确保小表不超过内存阈值。
- Ensure the smaller table fits within the configured memory threshold.
- Partitioning:
- Partition large tables to further optimize data scanning.
分区优化:对大表分区以减少扫描范围。
- Partition large tables to further optimize data scanning.
- Monitor Memory Usage:
- Adjust
hive.mapjoin.smalltable.filesize
based on cluster resources.
内存监控:根据集群资源调整小表大小参数。
- Adjust
Serializer/Deserializer (SerDe)
Role of SerDe
- Data Transformation: Converts raw file data (e.g., CSV, JSON) into a tabular format for Hive.
数据转换:将原始文件(如CSV、JSON)转换为Hive表结构。 - Flexibility:
- Built-in SerDes for common formats (e.g.,
LazySimpleSerDe
). - Custom SerDes via Java for specialized formats (e.g., Avro, Parquet).
灵活性:支持内置及第三方SerDe(如Avro、Parquet)。
- Built-in SerDes for common formats (e.g.,
Lazy SerDe
- Optimization Strategy:
- Delays object materialization until specific fields are accessed.
- Reduces Java object creation overhead, improving performance.
优化策略:延迟对象实例化,减少Java对象创建开销,提升性能。
Pros and Cons of HIVE
Advantages
- Scalability:
- Processes petabytes of data using Hadoop’s distributed framework.
扩展性:基于Hadoop分布式框架处理PB级数据。
- Processes petabytes of data using Hadoop’s distributed framework.
- SQL-Like Interface:
- Lowers the barrier for analysts familiar with SQL.
类SQL接口:降低SQL用户的学习成本。
- Lowers the barrier for analysts familiar with SQL.
- Extensibility:
- Supports UDFs (User-Defined Functions), UDAFs (Aggregate Functions), and UDTFs (Table Functions).
可扩展性:支持自定义函数(UDF/UDAF/UDTF)。
- Supports UDFs (User-Defined Functions), UDAFs (Aggregate Functions), and UDTFs (Table Functions).
- Optimized Execution:
- Generates efficient MapReduce/Tez/Spark execution plans.
优化执行:生成高效的MapReduce/Tez/Spark执行计划。
- Generates efficient MapReduce/Tez/Spark execution plans.
- Interoperability:
- Integrates with traditional databases (via JDBC/ODBC) and Hadoop ecosystem tools (e.g., HBase).
互操作性:兼容传统数据库(JDBC/ODBC)及Hadoop生态工具(如HBase)。
- Integrates with traditional databases (via JDBC/ODBC) and Hadoop ecosystem tools (e.g., HBase).
Disadvantages
- Immutable Data:
- HDFS Limitation: Cannot append data directly due to HDFS file immutability.
数据不可变性:HDFS文件不可修改,无法直接追加数据。
- HDFS Limitation: Cannot append data directly due to HDFS file immutability.
- Latency:
- Not optimized for real-time queries; designed for batch processing.
高延迟:适用于批处理,不支持实时查询。
- Not optimized for real-time queries; designed for batch processing.
- Java Overhead:
- SerDe and UDFs rely on Java, which may introduce performance bottlenecks.
Java开销:SerDe和UDF依赖Java,可能影响性能。
- SerDe and UDFs rely on Java, which may introduce performance bottlenecks.
SparkSQL
- Relational & Functional Integration: A Spark module that combines relational processing with Spark’s functional programming API, enabling declarative queries and optimized storage.
关系与函数式整合:Spark模块,将关系型处理与函数式API结合,支持声明式查询和存储优化。 - Key Benefits:
- Allows Spark programmers to use relational optimizations.
优势1:开发者可利用关系型优化技术。 - Enables SQL users to access Spark’s advanced analytics (e.g., ML).
优势2:SQL用户可调用Spark复杂分析库(如机器学习)。
- Allows Spark programmers to use relational optimizations.
Motivation
- MapReduce Limitations:
- Low-level API requiring manual optimizations for performance.
MapReduce缺陷:低级API需手动优化性能。
- Low-level API requiring manual optimizations for performance.
- Need for Declarative & Advanced Processing:
- Support ETL across structured/semi-structured data (e.g., JSON, Hive).
需求1:支持多数据源ETL(如JSON、Hive)。 - Enable complex analytics (ML, graph processing) not easily expressible in SQL.
需求2:支持复杂分析(机器学习、图计算)。
- Support ETL across structured/semi-structured data (e.g., JSON, Hive).
Core Goals
Main goal: Extend relational processing to cover native RDDs in Spark and a much
wider range of data sources, by:
- Unified Relational API:
- Extend relational operations to native RDDs and external data sources (e.g., Hive, JSON).
统一关系型API:支持对RDD和外部数据源(如Hive、JSON)执行关系操作。
- Extend relational operations to native RDDs and external data sources (e.g., Hive, JSON).
- High Performance:
- Leverage DBMS techniques (e.g., query optimization, columnar storage).
高性能:采用数据库优化技术(如查询优化、列式存储)。
- Leverage DBMS techniques (e.g., query optimization, columnar storage).
- Extensibility:
- Support new data sources (semi-structured, federated databases).
可扩展性:兼容半结构化数据、联合查询数据库。 - Integrate advanced analytics (e.g., MLlib, GraphX).
集成分析库:无缝整合机器学习、图处理库。
- Support new data sources (semi-structured, federated databases).
SparkSQL bridges these two models (relational and procedural):
- The DataFrame API supports relational operations on external data sources and Spark's built-in RDDs.
SparkSQL 尝试在这两种模式之间架起一座桥梁:DataFrame API 可对外部数据源和 Spark 内置 RDD 执行关系操作。
- Catalyst is a highly extensible optimizer.
Catalyst 是一个高度可扩展的优化器。
SparkSQL
-
It runs as a library on top of Spark
它作为 Spark 的一个库运行 -
It exposes interfaces accessible through JDBC and command-line
它暴露了可通过 JDBC 和命令行访问的接口。 -
It exposes the DataFrame API which is accessible through different programming languages
它公开了可通过不同编程语言访问的 DataFrame API
DataFrame
-
Definition:
- A distributed collection of rows with a schema, similar to an RDBMS table.
定义:带模式的分布式数据集,类似关系型数据库表。 - Constructed from RDDs or external sources (Hive, JSON, etc.).
数据来源:可从RDD或外部数据源(Hive、JSON)构建。
- A distributed collection of rows with a schema, similar to an RDBMS table.
-
Lazy Evaluation:
- Logical plans are optimized and executed only when output actions (e.g.,
count()
) are called.
惰性求值:逻辑计划延迟执行,触发输出操作(如count()
)时优化执行。
- Logical plans are optimized and executed only when output actions (e.g.,
-
Data Model:
- Primitive Types:
int
,string
,date
,timestamp
, etc.
基础类型:整型、字符串、日期等。 - Complex Types: Structs, arrays, maps, unions.
复杂类型:结构体、数组、映射、联合类型。 - Native Interoperability: Directly access in-memory objects without data conversion.
原生互操作性:直接访问内存对象,避免格式转换开销。
- Primitive Types:
Operations on DataFrames
-
Domain-Specific Language (DSL):
-
Example:
young = users.where(users.age < 18).groupBy("dept").count()
-
Builds an Abstract Syntax Tree (AST) optimized by Catalyst.
领域特定语言:通过操作符(如where
,groupBy
)构建AST,由Catalyst优化。
-
-
SQL Queries:
-
Register DataFrames as temporary tables:
users.registerTempTable("users_table")
-
Execute SQL:
SELECT dept, COUNT(*) FROM users_table WHERE age < 18 GROUP BY dept
SQL支持:DataFrame注册为临时表后可直接执行SQL查询。
-
Catalyst Optimizer
- Rule-Based Optimization:
- Uses Scala’s pattern matching to transform ASTs.
基于规则的优化:通过Scala模式匹配优化AST。 - Example: Constant folding (
x + (1+2)
→x+3
).
优化示例:常量折叠(简化表达式)。
- Uses Scala’s pattern matching to transform ASTs.
- Extensibility:
- Add custom optimization rules (e.g., data source-specific predicate pushdown).
可扩展性:支持自定义规则(如数据源谓词下推)。
- Add custom optimization rules (e.g., data source-specific predicate pushdown).
Querying Native Datasets
SparkSQL infers the schema of the native objects of a programming language automatically
SparkSQL 自动推断编程语言本地对象的模式
- Schema Inference:
- Java: Uses reflection to infer schemas from objects.
Java模式推断:通过反射自动推断对象结构。 - Python: Samples data to infer types.
Python模式推断:通过采样数据推断类型。
- Java: Uses reflection to infer schemas from objects.
- Direct Data Access:
- Operate on native objects (e.g., Java/Python classes) without serialization.
直接数据访问:无需序列化即可操作原生对象。
- Operate on native objects (e.g., Java/Python classes) without serialization.
- Benefits of automatic schema inference
- Run relational operations on existing Spark programs.
在现有 Spark 程序上运行关系操作。 - Combine RDDs with external structured data (e.g., HIVE tables)
将 RDD 与外部结构化数据(如 HIVE 表)相结合
- Run relational operations on existing Spark programs.
Advantages over Traditional Systems
- Language Flexibility:
- Combine relational operations with procedural code (loops, conditionals).
语言灵活性:关系操作与编程语言控制结构(循环、条件)结合。
- Combine relational operations with procedural code (loops, conditionals).
- Cross-Language Optimization:
- Catalyst optimizes logical plans uniformly across Python, Scala, and Java.
跨语言优化:Catalyst统一优化多语言逻辑计划。
- Catalyst optimizes logical plans uniformly across Python, Scala, and Java.
- Early Error Detection:
- Validate logical plans during DataFrame construction (e.g., invalid column names).
错误提前检测:DataFrame构建时即验证逻辑计划(如无效列名)。
- Validate logical plans during DataFrame construction (e.g., invalid column names).
Use Cases
- ETL Pipelines:
- Transform data between Hive, JSON, and RDBMS using SQL-like syntax.
ETL流程:用类SQL语法跨数据源转换数据。
- Transform data between Hive, JSON, and RDBMS using SQL-like syntax.
- Advanced Analytics:
- Train ML models using DataFrames for feature processing.
高级分析:基于DataFrame进行特征工程与模型训练。
- Train ML models using DataFrames for feature processing.
- Hybrid Workloads:
- Combine batch processing (Spark) with interactive queries (via caching).
混合负载:批处理与交互式查询结合(通过缓存加速)。
- Combine batch processing (Spark) with interactive queries (via caching).
Week 8
Data Streams
Definition
- An ordered and potentially infinite sequence of elements:
<e1, e2, e3, …>
.
一个有序且可能无限的序列:<e1, e2, e3, …>
- Each element (data point) can represent:
每个元素(数据点)可以表示:- An event (e.g., "page A visited" in web click-stream data).
一个事件(例如,网页点击流数据中的“页面A被访问”)。 - A value (e.g., stock price "10.3" in stock quote data).
一个值(例如,股票报价数据中的股票价格“10.3”)。 - A record (e.g., customer record
[item, qty, price]
in marketing data).
一条记录(例如,营销数据中的客户记录[商品, 数量, 价格]
)。 - A graph (e.g., in social network data streams).
一个图(例如,社交网络数据流中的图结构)。
- An event (e.g., "page A visited" in web click-stream data).
Properties
- Potentially Infinite: The stream may never end.
可能无限:数据流可能永远不会结束。- Transient: Data might not be stored on disk; it is processed in real-time.
瞬时性:数据可能不会存储在磁盘上,而是实时处理。 - Single Pass: Data can only be processed once as it arrives.
单次处理:数据只能在到达时处理一次。 - Summaries Only: Only summaries or aggregates can be stored due to high velocity and volume.
仅存储摘要:由于数据的高速度和大量性,只能存储摘要或聚合结果。 - Real-Time Processing: Data is processed in main memory as it arrives.
实时处理:数据在到达时在内存中处理。
- Transient: Data might not be stored on disk; it is processed in real-time.
- Dynamic Nature:
- Incremental updates: New data continuously arrives.
增量更新:新数据不断到达。 - Concept drift: The underlying data distribution may change over time.
概念漂移:数据的底层分布可能随时间变化。 - Forgetting old data: Older data may be discarded to focus on recent trends.
遗忘旧数据:旧数据可能被丢弃以关注最新趋势。
- Incremental updates: New data continuously arrives.
- Temporal Order: The order of data points may be important (e.g., time-series data).
时间顺序:数据点的时间顺序可能很重要(例如,时间序列数据)。
Management
- Real-Time Processing: Use in-memory processing to handle high-velocity data.
实时处理:使用内存处理来应对高速数据。
- Windowing: Divide the stream into time-based windows (e.g., tumbling windows) for analysis.
窗口化:将数据流划分为基于时间的窗口(例如,滚动窗口)进行分析。
- Sampling: Select a subset of data points for analysis when the stream is too large.
采样:当数据流过大时,选择一部分数据点进行分析。
- Aggregation: Compute summaries (e.g., averages, counts) over windows or subsets of data.
聚合:在窗口或数据子集上计算摘要(例如,平均值、计数)。
- Querying: Use stream processing engines (e.g., Apache Kafka, Apache Flink) to run queries on the fly.
查询:使用流处理引擎(如Apache Kafka、Apache Flink)实时运行查询。
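The windowing and aggregation steps above can be illustrated without any stream-processing engine. A minimal pure-Python sketch of a 30-second tumbling-window average follows; the `(timestamp, sensor, temperature)` layout and the 100-degree threshold anticipate the sensor example in the next subsection and are assumptions here.

```python
from collections import defaultdict

WINDOW = 30  # tumbling window length in seconds

def tumbling_window_averages(stream):
    """Print (window_start, sensor, avg_temp) whenever a 30-second window closes."""
    current_window = None
    sums, counts = defaultdict(float), defaultdict(int)
    for ts, sensor, temp in stream:          # stream elements arrive in time order
        w = int(ts // WINDOW)                # index of the tumbling window for this element
        if current_window is None:
            current_window = w
        if w != current_window:              # previous window has closed: emit its averages
            for name in sums:
                avg = sums[name] / counts[name]
                if avg > 100:                # only report sensors above the threshold
                    print(current_window * WINDOW, name, avg)
            sums.clear()
            counts.clear()
            current_window = w
        sums[sensor] += temp
        counts[sensor] += 1
    # On an unbounded stream, each window is emitted when the next one starts.

# Tiny illustration with two sensors
events = [(1, "s1", 99.0), (10, "s1", 105.0), (20, "s2", 50.0),
          (35, "s1", 120.0), (50, "s2", 60.0)]
tumbling_window_averages(events)   # prints: 0 s1 102.0
```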
Common Queries (Problems) on Streams
- Sampling: Selecting a representative subset of data points for analysis.
采样:选择代表性的数据点子集进行分析。
- Window Queries: Analyzing data within specific time windows (e.g., average temperature over the last 30 seconds).
窗口查询:在特定时间窗口内分析数据(例如,过去30秒的平均温度)。
- Filtering: Extracting specific data points based on conditions (e.g., temperature > 100 degrees).
过滤:根据条件提取特定数据点(例如,温度 > 100度)。
- Aggregation: Computing summaries (e.g., average, sum) over a stream or window.
聚合:在数据流或窗口上计算摘要(例如,平均值、总和)。
- Concept Drift Detection: Identifying changes in the underlying data distribution.
概念漂移检测:识别数据底层分布的变化。
Example
-
Bluetooth Sensor Network:
蓝牙传感器网络:-
Each sensor outputs a stream of JSON data (e.g., temperature, humidity).
每个传感器输出JSON数据流(例如,温度、湿度)。 -
Query: Monitor average temperature every 30 seconds and display sensor name and average temperature if it exceeds 100 degrees.
查询:每30秒监控平均温度,如果平均温度超过100度,则显示传感器名称和平均温度。 -
SQL-like query:
SELECT System.Timestamp AS OutputTime, dspl AS SensorName, Avg(temp) AS AvgTemperature INTO output FROM InputStream TIMESTAMP BY time GROUP BY TumblingWindow(second, 30), dspl HAVING Avg(temp) > 100
-
Data Stream Management System (DSMS)
Common Applications of Stream Queries
- Mining Query Streams:
查询流挖掘:- Example: Google wants to identify queries that are more frequent today than yesterday.
示例:Google希望识别今天比昨天更频繁的查询。
- Example: Google wants to identify queries that are more frequent today than yesterday.
- Mining Click Streams:
点击流挖掘:- Example: Yahoo wants to detect pages receiving an unusual number of hits in the past hour.
示例:Yahoo希望检测过去一小时中点击量异常增加的页面。
- Example: Yahoo wants to detect pages receiving an unusual number of hits in the past hour.
- Mining Social Network News Feeds:
社交网络新闻流挖掘:- Example: Identifying trending topics on platforms like Twitter or Facebook.
示例:在Twitter、Facebook等平台上识别热门话题。
- Example: Identifying trending topics on platforms like Twitter or Facebook.
Sampling from Data Streams
Objective
- Construct a sample from a data stream such that queries on the sample yield statistically representative results for the entire stream.
从数据流中构建一个样本,使得对样本的查询能够代表整个流的统计结果。 - Example Application:
- Stream from a search query engine:
(u1, q1, t1), (u1, q1, t2), (u2, q2, t3), …
搜索引擎的查询流:(u1, q1, t1), (u1, q1, t2), (u2, q2, t3), …
- Query: What fraction of a typical user’s queries were repeated over the last month?
查询:典型用户在过去一个月中重复查询的比例是多少? - Constraint: Cannot store more than 1/10th of the stream.
约束:不能存储超过流数据的1/10。
Solution
- Store 1/10th of each user’s queries:
存储每个用户查询的1/10:- Generate a random integer in
[0..9]
for each query.
为每个查询生成一个[0..9]
的随机整数。 - Store the tuple
(u, q, t)
if the integer is0
; otherwise, discard it.
如果整数为0
,则存储元组(u, q, t)
;否则丢弃。
- Generate a random integer in
- Problem:
- For users with many queries, 1/10th of their queries will be in the sample.
对于查询量大的用户,其查询的1/10会被存储在样本中。 - Does this work for duplicate queries?
是否适用于重复查询? - Suppose each user issues
x
queries once andd
queries twice.
假设每个用户发出x
次单次查询和d
次重复查询。 - Correct answer for duplicate queries:
d / (x + d)
.
重复查询的正确比例:d / (x + d)
。 - Sample-based answer:
d / (10x + 19d)
.
基于样本的比例:d / (10x + 19d)
。 - Issue: The sample-based answer is significantly smaller than the correct answer (e.g., 1.9% vs. 23.1% for
x=1000, d=300
).
问题:基于样本的比例显著小于正确比例(例如,x=1000, d=300
时,1.9% vs. 23.1%)。
- For users with many queries, 1/10th of their queries will be in the sample.
Improved Solution
改进方案
- Main Idea: Sample users instead of queries.
核心思想:对用户进行采样,而不是对查询进行采样。 - Steps:
- Select 1/10th of users (assuming this is ≤10% of the stream).
选择1/10的用户(假设不超过流数据的10%)。 - For each selected user, store all their queries in the sample.
对每个选中的用户,存储其所有查询。 - Use a hash function
h: user → {1, 2, …, 10}
.
使用哈希函数h: user → {1, 2, …, 10}
。 - If
h(user) = 1
, accept their queries; otherwise, discard them.
如果h(user) = 1
,则接受其查询;否则丢弃。 - Count duplicates in the sample:
d / (x + d)
for each user.
计算样本中的重复查询比例:d / (x + d)
。
- Select 1/10th of users (assuming this is ≤10% of the stream).
General Problem
一般问题
-
Stream of tuples with keys (e.g.,
(user, search, time)
; key isuser
).
带有键的元组流(例如,(用户, 搜索, 时间)
;键为用户
)。 -
To get a sample of
a/b
fraction of the stream:
获取流的a/b
比例样本:- Hash each tuple’s key uniformly into
b
buckets.
将每个元组的键均匀哈希到b
个桶中。 - Pick the tuple if its hash value is in
{1, …, a}
.
如果哈希值在{1, …, a}
中,则选择该元组。
- Hash each tuple’s key uniformly into
-
Example: How to generate a 30% sample?
示例:如何生成30%的样本? -
Challenge: Handling new users while maintaining a storage budget.
挑战:在存储预算内处理新用户。
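A minimal Python sketch of this key-based sampling idea, i.e. hashing each tuple's key into b buckets and keeping the tuple only if the bucket index falls in the first a of them. The helper names and the use of MD5 are illustrative assumptions; for the 30% question above, a = 3 and b = 10 gives roughly a 30% sample of users while keeping every query of each selected user.

```python
import hashlib

def bucket_of(key: str, b: int) -> int:
    """Hash a key uniformly into one of b buckets (0-based)."""
    digest = hashlib.md5(key.encode("utf-8")).hexdigest()
    return int(digest, 16) % b

def keep(key: str, a: int, b: int) -> bool:
    """Keep a tuple iff its key lands in the first a of the b buckets (an a/b sample)."""
    return bucket_of(key, b) < a

# Roughly 30% of users are selected; all of their queries are kept, so
# per-user statistics such as the duplicate-query fraction stay unbiased.
stream = [("u1", "q1", "t1"), ("u1", "q1", "t2"), ("u2", "q2", "t3")]
sample = [(u, q, t) for (u, q, t) in stream if keep(u, a=3, b=10)]
print(sample)
```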
Reservoir Sampling
蓄水池采样(Reservoir Sampling)
- Objective: Uniformly sample
s
tuples from a stream(e1, e2, …)
containing>s
tuples.
目标:从包含>s
个元组的流(e1, e2, …)
中均匀采样s
个元组。 - Algorithm:
- Add the first
s
tuples to a reservoirR
.
将前s
个元组加入蓄水池R
。 - For
j > s
:
对于j > s
: - With probability
s/j
, replace a random entry inR
with thej-th
tuple.
以概率s/j
,用第j
个元组替换R
中的一个随机元组。 - At
j = n
, returnR
.
当j = n
时,返回R
。
- Add the first
- Proof by Induction (for
s = 1
):
归纳证明(s = 1
时):- Base case: For
j = 1
, the tuple is in the sample with probability1/1 = 1
.
基础情况:对于j = 1
,元组被选中的概率为1/1 = 1
。 - Inductive step:
归纳步骤: - Assume for
j = n
, each tuple is in the sample with probability1/n
.
假设对于j = n
,每个元组被选中的概率为1/n
。 - For
j = n + 1
:
对于j = n + 1
:- The new tuple is selected with probability
1/(n + 1)
.
新元组被选中的概率为1/(n + 1)
。 - An old tuple remains in
R
with probability1 - 1/(n + 1)
.
旧元组保留在R
中的概率为1 - 1/(n + 1)
。 - Thus, each tuple in
R
has probability1/(n + 1)
.
因此,每个元组在R
中的概率为1/(n + 1)
。
- The new tuple is selected with probability
- Base case: For
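A minimal Python sketch of the reservoir-sampling algorithm described above; the function name and the toy stream are illustrative.

```python
import random

def reservoir_sample(stream, s):
    """Return s tuples sampled uniformly at random from a stream of unknown length."""
    reservoir = []
    for j, element in enumerate(stream, start=1):
        if j <= s:
            reservoir.append(element)                 # keep the first s elements
        elif random.random() < s / j:                 # with probability s/j ...
            reservoir[random.randrange(s)] = element  # ... replace a random entry
    return reservoir

print(reservoir_sample(range(1_000_000), s=10))
```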
Queries Over a (Long) Sliding Window
Problem Definition
- Many queries focus on a window containing the N most recent elements.
许多查询关注包含最近N个元素的窗口。 - Challenge: N is so large that the data cannot be stored in memory or even on disk.
挑战:N非常大,数据无法存储在内存或磁盘中。 - Example:
- Amazon tracks whether a product X was sold in the n-th transaction (0/1 stream).
Amazon跟踪产品X在第n次交易中是否被售出(0/1流)。 - Query: How many times was X sold in the last k transactions?
查询:X在最近k次交易中被售出了多少次?
- Amazon tracks whether a product X was sold in the n-th transaction (0/1 stream).
Sliding Window Basics
- A sliding window represents the most recent N elements of a stream.
滑动窗口表示流中最近的N个元素。 - Example:
- Stream:
0 1 0 1 0 1 0 0 0 1 0 1 0 1 1 0 0 1 0 1 1 1
- Window size (N = 6): The last 6 bits are considered for queries.
窗口大小(N = 6):最近的6个比特用于查询。
- Stream:
Obvious Solution
- Store the most recent N bits.
存储最近的N个比特。 - When a new bit arrives, discard the (N+1)th bit.
当新比特到达时,丢弃第(N+1)个比特。 - Issue: Storing N bits is infeasible for large N (e.g., N = 1 billion).
问题:对于大N(例如N = 10亿),存储N个比特不可行。
Approximate Solutions
- Since storing N bits is impractical, we aim for approximate answers.
由于存储N个比特不现实,目标是获得近似答案。 - Naïve Method:
简单方法:- Maintain two counters:
维护两个计数器: - S: Total number of 1s in the stream.
S:流中1的总数。 - Z: Total number of 0s in the stream.
Z:流中0的总数。 - Estimate the number of 1s in the last N bits:
N * (S / (S + Z))
.
估计最近N个比特中1的数量:N * (S / (S + Z))
。 - Problem: Assumes uniformity, which may not hold (e.g.,
0 0 0 0 0 0 0 0 0 0 1 1
yields incorrect results).
问题:假设均匀分布,可能不成立(例如,0 0 0 0 0 0 0 0 0 0 1 1
会导致错误结果)。
- Maintain two counters:
DGIM Method
- Key Features:
关键特性:- Does not assume uniformity.
不假设均匀分布。 - Provides a bounded approximation error (within 1 + r of the true answer).
提供有界近似误差(在真实答案的1 + r范围内)。 - Stores only O(log²N) bits.
仅存储O(log²N)比特。
- Does not assume uniformity.
- Bucket Representation:
- Each bucket contains:
每个桶包含: - Timestamp of its most recent element (O(logN) bits).
最近元素的时间戳(O(logN)比特)。 - Size (number of 1s, a power of 2, O(log(logN)) bits).
大小(1的数量,2的幂,O(log(logN))比特)。 - Rules:
- The right end of a bucket is always a 1.
桶的右端始终为1。 - Every 1 is in a bucket.
每个1都在一个桶中。 - No position is in more than one bucket.
没有位置属于多个桶。 - There are one or two buckets of any given size (up to a maximum size).
每种大小的桶有一到两个(最大大小除外)。 - Bucket sizes increase from left to right.
桶的大小从左到右递增。
- The right end of a bucket is always a 1.
- Each bucket contains:
Updating Buckets
更新桶:
- When a new bit arrives:
当新比特到达时:- Drop the oldest bucket if its timestamp is outside the window.
如果最旧桶的时间戳超出窗口,则丢弃它。 - If the new bit is 1:
如果新比特为1:- Create a new bucket of size 1.
创建一个大小为1的新桶。 - If there are three buckets of the same size, merge the oldest two into a larger bucket.
如果有三个相同大小的桶,合并最旧的两个为一个更大的桶。
- Create a new bucket of size 1.
- Repeat the merging process for larger bucket sizes.
对更大的桶大小重复合并过程。
- Drop the oldest bucket if its timestamp is outside the window.
Querying
- To estimate the number of 1s in the last N bits:
估计最近N个比特中1的数量:- Sum the sizes of all buckets except the last (leftmost) bucket.
求和所有桶的大小(除了最后一个桶)。 - Add half the size of the last bucket.
加上最后一个桶大小的一半。
- Sum the sizes of all buckets except the last (leftmost) bucket.
- Error Bound:
误差界限:- The error is at most 50% due to the uncertainty of how many 1s in the last bucket are within the window.
由于不确定最后一个桶中有多少1在窗口内,误差最多为50%。
- The error is at most 50% due to the uncertainty of how many 1s in the last bucket are within the window.
Error Bound Proof
-
Objective:
- Prove that with 1 or 2 buckets of each size, the error in estimating the number of 1s in the sliding window is at most 50%.
证明当每种大小的桶有1或2个时,滑动窗口中1的数量的估计误差最多为50%。
-
Case 1: Estimate ≥ Correct Answer:
情况1:估计 ≥ 正确答案:-
Assumption: The last bucket has size
2^r
.
假设:最后一个桶的大小为2^r
。 -
Scenario:
- Only the rightmost bit of the last bucket is within the window.
只有最后一个桶的最右端比特在窗口内。 - For sizes smaller than
2^r
, there is only one bucket of each size.
对于小于2^r
的大小,每种大小只有一个桶。
- Only the rightmost bit of the last bucket is within the window.
-
True Sum:
真实总和:- The true number of 1s in the window is at least:
窗口中1的真实数量至少为:
1 + 2 + 4 + ... + 2^(r-1) = 2^r - 1
- This is because the buckets of sizes
1, 2, 4, ..., 2^(r-1)
contribute at least this many 1s.
这是因为大小为1, 2, 4, ..., 2^(r-1)
的桶至少贡献了这么多1。
- The true number of 1s in the window is at least:
-
Error:
误差:-
The estimate is at most
2^r
(half of the last bucket is added).
估计值最多为2^r
(加上最后一个桶的一半)。 -
Thus, the error is at most:
因此,误差最多为:
2^(r-1) − 1 (the estimate adds 2^(r-1) for the last bucket, while only one of its 1s is actually inside the window)
- Relative to the true sum, the error is at most 50%.
相对于真实总和,误差最多为50%。
-
-
-
Case 2: Estimate < Correct Answer:
情况2:估计 < 正确答案:- Scenario:
场景:- The estimate misses at most half of the last bucket (
2^(r-1)
1s).
估计值最多错过最后一个桶的一半(2^(r-1)
个1)。
- The estimate misses at most half of the last bucket (
- Error:
- The error is at most
2^(r-1)
.
误差最多为2^(r-1)
。 - Relative to the true sum, the error is at most 50%.
相对于真实总和,误差最多为50%。
- The error is at most
- Scenario:
-
Conclusion:
- With 1 or 2 buckets of each size, the error in the DGIM method is bounded by 50%.
当每种大小的桶有1或2个时,DGIM方法的误差界限为50%。
Further Reducing the Error
进一步减少误差
- Generalization:
泛化:- Instead of maintaining 1 or 2 buckets of each size, allow r-1 or r buckets (where
r > 2
).
不再限制每种大小的桶为1或2个,而是允许r-1或r个桶(其中r > 2
)。 - For the largest size buckets, allow any number between 1 and r.
对于最大大小的桶,允许1到r个。
- Instead of maintaining 1 or 2 buckets of each size, allow r-1 or r buckets (where
- Error Bound:
误差界限- The error reduces to O(1/r).
误差减少到O(1/r)。 - By choosing
r
appropriately, we can trade off between:
通过选择合适的r
,可以在以下两者之间进行权衡:- Storage: More buckets require more bits.
存储:更多的桶需要更多的比特。 - Accuracy: Larger
r
reduces the error.
准确性:更大的r
减少误差。
- Storage: More buckets require more bits.
- The error reduces to O(1/r).
- Tradeoff:
权衡- Smaller
r
: Less storage, higher error.
较小的r
:存储较少,误差较大。 - Larger
r
: More storage, lower error.
较大的r
:存储较多,误差较小。
- Smaller
Week 9
More algorithms for streams
Filtering a data stream: Bloom filters
Select elements with property x from stream
从数据流中选择具有 x 属性的元素
Definition:
- Problem: Given a data stream of tuples (e.g., (user1, item1)) and a list of keys S (e.g., [user100, user150]), determine which tuples in the stream are in S.
给定一个数据流(例如,(user1, item1))和一个键列表 S(例如,[user100, user150]),确定数据流中哪些元组在 S 中。 - Challenge: If the list S is too large to fit into memory (e.g., millions of tuples), traditional methods like hash tables become infeasible.
如果列表 S 太大,无法放入内存(例如,数百万个元组),传统的哈希表方法将不可行。
Applications:
- Mobile Push Notifications:
移动推送通知:- Tokens are used to route notifications to users.
使用令牌将通知路由给用户。 - Stream of tokens is consumed, and notifications are sent if the token matches a user's interest.
消费令牌流,如果令牌与用户的兴趣匹配,则发送通知。 - Example: If a user is interested in [app1, app2], and the stream contains [token1, token3, token5, ...], send a notification if token1 or token2 is in the stream.
示例:如果用户对 [app1, app2] 感兴趣,且流中包含 [token1, token3, token5, ...],则如果 token1 或 token2 在流中,发送通知。
- Tokens are used to route notifications to users.
- Email Spam Filtering:
电子邮件垃圾邮件过滤:- A white-list of 1 billion email addresses is used to filter spam.
使用包含 10 亿个电子邮件地址的白名单来过滤垃圾邮件。 - Stream of email addresses is consumed, and emails are forwarded if they are in the white-list.
消费电子邮件地址流,如果地址在白名单中,则转发邮件。 - Example: If the stream contains [cnn@gmail.com, xyz@gmail.com, abc@gmail.com, ...], forward abc@gmail.com if it is in the white-list.
示例:如果流中包含 [cnn@gmail.com, xyz@gmail.com, abc@gmail.com, ...],且 abc@gmail.com 在白名单中,则转发该邮件。
- A white-list of 1 billion email addresses is used to filter spam.
First Idea: Bit Array with Hash Function
初步想法:使用哈希函数的位数组
- Setup:
设置:- Create a bit array B of size n, initialized to 0.
创建一个大小为 n 的位数组 B,初始化为 0。 - Choose a hash function h with range [0, n).
选择一个范围为 [0, n) 的哈希函数 h。 - Hash each element of S to a bucket in B and set the corresponding bit to 1.
将 S 中的每个元素哈希到 B 中的一个桶,并将对应的位设置为 1。
- Create a bit array B of size n, initialized to 0.
- Filtering:
过滤:- For each element in the stream, hash it using h.
对于流中的每个元素,使用 h 进行哈希。 - If the corresponding bit in B is 1, output the element (i.e., it is in S).
如果 B 中对应的位为 1,则输出该元素(即在 S 中)。
- For each element in the stream, hash it using h.
- Properties:
特性:- No False Negatives: If an element is in S, it will always be output.
无假阴性:如果元素在 S 中,它总是会被输出。 - False Positives: If an element is not in S, it may still be output with some probability.
假阳性:如果元素不在 S 中,它仍可能以一定概率被输出。
- No False Negatives: If an element is in S, it will always be output.
- Probability of False Positives:
- Probability that a given element hashes to a given bucket: 1/n.
一个元素哈希到某个给定桶的概率:1/n。
- Probability that a given bucket is not hit by one element: 1 − 1/n.
一个给定的桶没有被某个元素哈希到的概率:1 − 1/n。
- Probability that a bucket is still 0 after hashing all |S| elements: (1 − 1/n)^|S|.
在哈希所有 |S| 个元素后,一个桶仍为 0 的概率:(1 − 1/n)^|S|。
- Probability that a bucket is hit by at least one element after hashing all |S| elements: 1 − (1 − 1/n)^|S|.
在哈希所有 |S| 个元素后,一个桶至少被一个元素哈希到的概率:1 − (1 − 1/n)^|S|。
- Using the approximation (1 − 1/n)^|S| ≈ e^(−|S|/n), the probability of a false positive is approximately 1 − e^(−|S|/n) (see the worked example below).
利用近似 (1 − 1/n)^|S| ≈ e^(−|S|/n),假阳性的概率约为 1 − e^(−|S|/n)(见下方示例)。
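As an illustrative back-of-the-envelope calculation (the concrete sizes are assumptions, not taken from the notes): suppose the white-list has |S| = 10^9 email addresses and the bit array has n = 8 × 10^9 bits (1 GB). Then the false-positive probability of the single-hash-function scheme is approximately

1 − (1 − 1/n)^|S| ≈ 1 − e^(−|S|/n) = 1 − e^(−1/8) ≈ 0.118,

so roughly 12% of addresses that are not in the white-list would still be passed through; the Bloom filter below reduces this by using k hash functions.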
Bloom filter
- Purpose: Probabilistic set membership testing.
目的: 用于概率集合成员测试。 - Construct a data structure to answer that is:
- Fast (Faster than searching through S).
- Small (Smaller than explicit representation).
- Properties:
- Zero False Negatives: If an element is in S, it will always be reported as present.
无假阴性: 如果元素在 S 中,它总是会被报告为存在。 - Nonzero False Positives: Elements not in S may be reported as present.
有假阳性: 不在 S 中的元素可能被报告为存在。
- Zero False Negatives: If an element is in S, it will always be reported as present.
Given a set of keys S that we want to filter
-
Create a bit array B of n bits, initially all 0
创建一个包含 n 个比特的比特数组 B,初始值均为 0 -
Choose k hash functions h1, …, hk with range [0, n)
选择 k 个哈希函数 h1, ..., hk,范围为 [0,n)
-
Hash each member s ∈ S with every hash function and set B[h1(s)] = … = B[hk(s)] = 1
对每个成员 s ∈ S 用所有 k 个哈希函数进行哈希,并将 B[h1(s)], …, B[hk(s)] 置为 1
-
When a stream element with key y arrives, declare y ∈ S if B[hi(y)] = 1 for all i = 1, …, k; otherwise, discard it
当键为 y 的流元素到达时,若对所有 i = 1, …, k 都有 B[hi(y)] = 1,则判定 y 在 S 中;否则丢弃该元素
Bloom filter example
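A minimal Python sketch of the construction just described. The class name, the array size, and the salted-SHA-256 trick used to simulate k independent hash functions are illustrative assumptions, not part of the notes.

```python
import hashlib

class BloomFilter:
    """Bloom filter sketch: k hash functions over an n-bit array."""

    def __init__(self, n: int, k: int):
        self.n, self.k = n, k
        self.bits = bytearray(n)          # one byte per bit, for simplicity

    def _hashes(self, key: str):
        # Simulate k hash functions by salting one cryptographic hash.
        for i in range(self.k):
            h = hashlib.sha256(f"{i}:{key}".encode("utf-8")).hexdigest()
            yield int(h, 16) % self.n

    def add(self, key: str):
        for pos in self._hashes(key):
            self.bits[pos] = 1

    def might_contain(self, key: str) -> bool:
        # No false negatives; false positives possible.
        return all(self.bits[pos] for pos in self._hashes(key))

whitelist = BloomFilter(n=10_000, k=4)
for addr in ["abc@gmail.com", "cnn@gmail.com"]:
    whitelist.add(addr)

print(whitelist.might_contain("abc@gmail.com"))     # True (always, if it was added)
print(whitelist.might_contain("spam@example.com"))  # False with high probability
```

Inserting the white-list into the filter and testing stream keys against it gives no false negatives, while the false-positive rate depends on |S|, n, and k as summarized under Key Parameters below.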
Key Parameters
- Size of the key set |S|: A larger |S| increases the false positive rate.
键集合大小 |S|: 较大的 |S| 会增加假阳性率。 - Size of Bloom Filter (n): Larger n reduces the false positive rate but requires more space.
布隆过滤器大小 (n): 较大的 n 会降低假阳性率,但需要更多空间。 - Number of Hash Functions (k): More hash functions reduce false positive rate (up to optimal k).
哈希函数数量 (k): 更多的哈希函数会降低假阳性率(直到达到最优 k)。
Other uses of Bloom filter
- Google BigTable: Google's column-oriented NoSQL Big Data database service. It's the same database that powers many core Google services, including Search, Analytics, Maps, and Gmail.
谷歌面向列的 NoSQL 大数据数据库服务。正是这个数据库为搜索、分析、地图和 Gmail 等谷歌核心服务提供了支持。 - Apache Hbase: open-source, distributed, versioned, non-relational database modeled after Google's Bigtable
开源、分布式、版本化、非关系型数据库,以 Google 的 Bigtable 为模型 - Apache Cassandra : Key-value NoSQL database
- Postgresal : Object-Relational DBMS
use Bloom filters to reduce the disk lookups for non-existent rows or columns
Counting distinct elements: Flajolet-Martin
Number of distinct elements in the last k elements of the stream
数据流最后 k 个元素中的不同元素数
- Problem: Given a data stream of elements from a set of size N, maintain a count of the number of distinct elements seen so far.
问题:给定大小为 N 的集合中元素的数据流,计算迄今为止看到的不同元素的数量。 - Challenge: Traditional methods like hash tables are infeasible due to space constraints.
挑战:由于空间限制,哈希表等传统方法不可行。
Applications:
- Big Data: Used in Spark's `approxCountDistinct` for approximate counting.
大数据:在 Spark 的 approxCountDistinct 中用于近似计数。 - Security Monitoring: Detecting more than X attempts.
安全监控:检测超过 X 次的尝试。 - Virus Propagation: Estimating the spread rate.
病毒传播:估算传播速度。 - Distributed Computing: Combining sketches to find total distinct elements and common elements (e.g., document overlap for plagiarism detection).
分布式计算:合并草图以找出不同元素总数和共同元素(例如,用于剽窃检测的文档重叠)。
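The notes do not spell out how Flajolet-Martin works; the standard idea is to hash every element and remember the largest number of trailing zero bits R seen, estimating the distinct count as roughly 2^R (real implementations average or take medians over many hash functions and apply a correction factor). A minimal Python sketch of that idea, with illustrative names:

```python
import hashlib

def trailing_zeros(x: int) -> int:
    """Number of trailing zero bits of x (0 is mapped to 0 for simplicity)."""
    if x == 0:
        return 0
    return (x & -x).bit_length() - 1

def fm_estimate(stream, num_hashes: int = 32) -> float:
    """Flajolet-Martin style estimate of the number of distinct elements."""
    max_r = [0] * num_hashes
    for item in stream:
        for i in range(num_hashes):
            h = int(hashlib.sha256(f"{i}:{item}".encode("utf-8")).hexdigest(), 16)
            max_r[i] = max(max_r[i], trailing_zeros(h))
    # Average the per-hash estimates 2^R; production sketches combine them more
    # carefully (e.g., medians of means) and apply a correction factor.
    return sum(2 ** r for r in max_r) / num_hashes

print(fm_estimate(["a", "b", "c", "a", "b", "d"]))  # in the vicinity of 4
```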
Estimating moments: AMS method
Estimate std. dev. of last k elements
What are frequency moments
- f0: the number of distinct elements. 不同元素的数量。
- f1: the total frequency (the length of the stream). 总频率(流的长度)。
Why they are important:
频率矩的重要性
- Data skew: indicates how skewed the data distribution is, which is useful in parallel database applications. 数据倾斜:表示数据分布的倾斜程度,在并行数据库应用中非常有用。
- Surprise number S: f2 measures how uneven the distribution is. 惊奇数 S:f2 衡量分布的不均匀程度。
AMS method
📘 Frequency Moments and the AMS Method (频率矩与 AMS 方法详细笔记)
1. What are frequency moments? (什么是频率矩)
📌 Definition:
For a data stream over N distinct elements a1, ..., aN, where element ai occurs mi times, the k-th frequency moment is
fk = m1^k + m2^k + ... + mN^k
📌 Special cases:
- f0: the number of distinct elements (each mi^0 = 1, so f0 = N).
- f1: the total stream length, f1 = m1 + m2 + ... + mN.
- f2: measures how uneven (skewed) the data distribution is.
📌 Example (computing f2):
Stream: a b c b d a c d a b d c a a b
Frequencies: a appears 5 times, b 4 times, c 3 times, d 3 times.
f2 = 5^2 + 4^2 + 3^2 + 3^2 = 59
2. Why frequency moments are important (为什么频率矩重要)
📌 Uses:
- Detecting data skew, which matters for task partitioning and load balancing in parallel databases.
- Choosing a suitable data-partitioning algorithm.
- f2 is known as the surprise number S: it measures how uneven the distribution is.
📌 Example:
Suppose the stream has length 100 and contains 11 distinct elements.
- Case 1: frequencies [10, 9, 9, ..., 9] (fairly even): f2 = 10^2 + 10 × 9^2 = 910
- Case 2: frequencies [90, 1, 1, ..., 1] (highly skewed): f2 = 90^2 + 10 × 1^2 = 8110
3. The AMS method (Alon-Matias-Szegedy)
AMS is a sampling-based algorithm for estimating frequency moments, designed for the streaming model.
✅ Properties:
- It can estimate frequency moments fk of any order k.
- It uses little space and does not need to store the whole stream.
- The focus here is on estimating f2.
4. Estimating f2 with AMS
🧩 Step 1: Variables
Pick several random variables Xi, each holding two fields:
- X.el: the element found at the chosen position.
- X.val: how many times that element occurs from the chosen position onwards.
🧩 Step 2: Pick a random position t in the stream
- Suppose the stream has length n.
- Pick a random position t < n.
- Record the element i at that position and set X.el = i.
- Keep scanning the rest of the stream, counting the occurrences c of element i from position t onwards, and set X.val = c.
🧩 Step 3: Estimate from a single variable
For each variable Xi, compute the estimate
Si = n × (2 × Xi.val − 1)
🧩 Step 4: Average over the variables
The final estimate is
f̂2 = (S1 + S2 + ... + Sk) / k = avg(S1, S2, ..., Sk)
5. Example
Suppose the stream length is n = 15.
- X1 = (c, 3) ⇒ S1 = 15 × (2 × 3 − 1) = 75
- X2 = (d, 2) ⇒ S2 = 15 × (2 × 2 − 1) = 45
- X3 = (a, 2) ⇒ S3 = 45
Then f̂2 = (75 + 45 + 45) / 3 = 55.
The true value is 59, so the estimate is reasonably close. A small code sketch of this estimator follows below.
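A small Python sketch of the estimator above, assuming the whole stream is available as a list and its length n is known; for an unknown n the sampled positions would instead be maintained with reservoir sampling, as described in section 8 below. Names are illustrative.

```python
import random

def ams_f2(stream, k: int = 10) -> float:
    """AMS estimate of the second frequency moment f2 = sum_i mi^2."""
    n = len(stream)
    positions = random.sample(range(n), k)       # k random positions in the stream
    estimates = []
    for t in positions:
        element = stream[t]
        # X.val = number of occurrences of the element from position t onwards
        c = sum(1 for x in stream[t:] if x == element)
        estimates.append(n * (2 * c - 1))        # S = n * (2c - 1)
    return sum(estimates) / len(estimates)

stream = list("abcbdacdabdcaab")                 # the 15-element example stream
print(ams_f2(stream, k=10), "(true f2 = 59)")
```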
6. Why AMS works
📘 Definitions:
- e(i): the element at position i of the stream.
- c(i): the number of times e(i) occurs from position i onwards.
📘 Expected value of the estimator:
E[S] = (1/n) × Σ_{i=1..n} n × (2 × c(i) − 1) = Σ_{i=1..N} mi^2 = f2
(For an element that occurs m times, its positions contribute 2m − 1, 2m − 3, ..., 3, 1, which sum to m^2.)
So AMS is an unbiased estimator of f2.
7. Extension: estimating fk of any order
The same scheme works with a different per-variable estimator:
- For k = 2: S = n × (2c − 1)
- For k ≥ 3: S = n × (c^k − (c − 1)^k)
8. Handling an unknown stream length n
❓ Problem:
- If the stream length is not known in advance, the random positions t cannot be picked up front.
- Picking them too early covers too little of the stream; picking them too late gives too many repeated elements.
✅ Solution: reservoir sampling
- Keep the first k positions as the variables.
- When the n-th element arrives (n > k), replace one of the existing variables with probability k/n.
- This keeps the chosen positions uniformly distributed over the stream.
✅ Summary 总结
Concept / 概念 | Explanation / 解释 |
---|---|
fk | The k-th frequency moment; characterizes the distribution of element frequencies. 第 k 阶频率矩,衡量元素频率的分布情况 |
f2 | The surprise number; measures how uneven the data distribution is. 惊奇数,衡量数据分布的不均匀性 |
AMS method | A space-efficient way to estimate frequency moments over streaming data. 一种空间高效、适合流数据的频率矩估计方法 |
Key formulas | S = n × (2c − 1), f̂2 = avg(S1, ..., Sk) |
Advantages | Unbiased estimator, low memory use, extends to general fk. 无偏估计,内存需求低,可扩展至 fk |
Week 10
Spark streaming
Definition
Definition: An extension of the core Spark API
定义: Spark核心API的扩展。
Purpose: Enables scalable, high-throughput, fault-tolerant stream processing of live data streams.
目的: 实现可扩展、高吞吐量、容错的实时数据流处理。
Key Features:
- Scales to hundreds of nodes.
可扩展至数百个节点。 - Achieves second-scale latencies.
实现秒级延迟。 - Efficiently recovers from failures.
高效地从故障中恢复。 - Integrates with batch and interactive processing.
与批处理和交互式处理集成。
Data Sources: Kafka, Flume, HDFS/S3, Kinesis, Twitter.
数据源: Kafka, Flume, HDFS/S3, Kinesis, Twitter。
Output Destinations: HDFS, Databases, Dashboards.
输出目的地: HDFS, 数据库, 仪表盘。
Advantages of a Unified Stack
-
Interactive Data Exploration:
交互式数据探索:-
Use the Spark shell to explore data interactively and identify issues.
使用Spark Shell交互式地探索数据并发现问题。 -
Example:
$ ./spark-shell
scala> val file = sc.hadoopFile("smallLogs")
scala> val filtered = file.filter(_.contains("ERROR"))
scala> val mapped = filtered.map(...)
-
-
Code Reusability:
代码复用性:-
The same code used in the shell can be applied to process large-scale data in Spark.
在Shell中使用的代码可以直接用于处理大规模数据。 -
Example:
object ProcessProductionData {
  def main(args: Array[String]) {
    val sc = new SparkContext(...)
    val file = sc.hadoopFile("productionLogs")
    val filtered = file.filter(_.contains("ERROR"))
    val mapped = filtered.map(...)
  }
}
-
-
Real-Time Processing with Spark Streaming:
实时流处理:-
Similar code can be used in Spark Streaming for real-time data processing.
类似的代码可以用于Spark Streaming进行实时数据处理。 -
Example:
object ProcessLiveStream {
  def main(args: Array[String]) {
    val sc = new StreamingContext(...)
    val stream = sc.kafkaStream(...)
    val filtered = stream.filter(_.contains("ERROR"))
    val mapped = filtered.map(...)
  }
}
-
Why We Need Spark Streaming
Use Cases:
应用场景
- Twitter Campaigns Twitter恶意活动检测:
- Detect malicious campaigns promoting computer viruses in real time.
实时检测推广计算机病毒的恶意活动。 - Example: Tweets like "Go to http:/... to see the replay during football match."
示例: 类似“访问http:/...观看足球比赛回放”的推文。
- Detect malicious campaigns promoting computer viruses in real time.
- Datacenter Log Monitoring:
数据中心日志监控:- Process streams of logs from clusters (e.g., Flume) to detect problems in real time.
处理来自集群(如Flume)的日志流,实时检测问题。
- Process streams of logs from clusters (e.g., Flume) to detect problems in real time.
- Facebook Advertisements:
Facebook广告:- Advertisers need real-time insights into who clicks on their ads using platforms like Facebook Ads.
广告商需要实时了解谁点击了他们的广告(通过Facebook Ads等平台)。
- Advertisers need real-time insights into who clicks on their ads using platforms like Facebook Ads.
Challenges in Handling These Applications 处理这些应用的挑战:
-
Data Size:
数据规模:- Requires a very large cluster environment.
需要大规模的集群环境。
- Requires a very large cluster environment.
-
Cluster Issues:
集群问题:- More nodes increase the likelihood of faults.
节点越多,故障发生的可能性越高。 - More nodes increase the likelihood of slow nodes.
节点越多,出现慢节点的可能性越高。
- More nodes increase the likelihood of faults.
-
Importance of Quick Recovery:
快速恢复的重要性:- Critical to avoid losing valuable data, such as:
避免丢失有价值的数据,例如: - Part of click logs for advertising.
广告点击日志。 - Twitter accounts affected by malicious campaigns.
受恶意活动影响的Twitter账户。
- Critical to avoid losing valuable data, such as:
-
Need for a New Streaming Processing Model:
需要新的流处理模型:- Traditional models are insufficient for handling large-scale, real-time data with fault tolerance and low latency.
传统模型无法满足大规模、实时数据的处理需求,尤其是在容错和低延迟方面。
- Traditional models are insufficient for handling large-scale, real-time data with fault tolerance and low latency.
Discretized Streams (D-Streams) 离散化流(D-Streams)
Concept 概念
- Run streaming computations as a series of very small, deterministic batch jobs.
将流式计算作为一系列非常小的、确定性的批处理作业运行。 - Divide the live data stream into small batches (e.g., X seconds).
将实时数据流切割成小批次(例如,X秒)。 - Treat each batch as an RDD and process it using RDD operations.
将每个批次视为RDD并使用RDD操作进行处理。 - Return processed results in batches.
返回批处理后的结果。
Benefits:
- Low Latency: Batch size can be as low as 0.5 seconds.
低延迟: 批次大小可低至0.5秒。 - Combination of Batch and Streaming Processing:
批处理和流处理的结合:- RDDs can store batch data, enabling integration of batch and streaming processing.
RDD可以存储批处理数据,支持批处理和流处理的集成。 - Example: Find spam URLs from a static list in a Twitter stream.
示例:从Twitter流中查找静态列表中的垃圾URL。
- RDDs can store batch data, enabling integration of batch and streaming processing.
Example - Get Hashtags from Twitter:
-
Step 1: Create a DStream from Twitter Streaming API.
步骤1: 从Twitter Streaming API创建DStream。val tweets = ssc.twitterStream()
-
Step 2: Extract hashtags using
flatMap
.
步骤2: 使用flatMap
提取话题标签。val hashTags = tweets.flatMap(status => getTags(status))
-
Step 3: Save results to HDFS.
步骤3: 将结果保存到HDFS。hashTags.saveAsHadoopFiles("hdfs://...")
-
Step 4: Use
foreach
to process each RDD (e.g., write to a database or update analytics).
步骤4: 使用foreach
处理每个RDD(例如,写入数据库或更新分析)。hashTags.foreach(hashTagRDD => { ... })
Python Examples:
-
Example 1: Word Count in HDFS Files:
示例1: 统计HDFS文件中单词数:-
Count words in new text files created in an HDFS directory.
统计HDFS目录中新创建的文本文件中的单词数。 -
Run:
spark-submit hdfs_wordcount.py
> stream_out.txt
- Add files to HDFS:
hadoop fs -mkdir test_stream
hadoop fs -put localfile.txt test_stream/
- View results:
nano stream_out.txt
-
-
Example 2: Stream from TCP Socket:
-
Use
nc -lk 9999
to send text to a Spark streaming script. -
Python script:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)
lines = ssc.socketTextStream("localhost", 9999)
words = lines.flatMap(lambda line: line.split(" "))
words.pprint()
ssc.start()
ssc.awaitTermination()
-
Operations in Spark Streaming
Transformations 转换操作:
- Modify data from one DStream to another.
将数据从一个DStream转换为另一个DStream。 - Standard RDD Operations:
标准RDD操作map
,flatMap
,countByValue
,count
,union
, etc.flatMap(func)
: Similar tomap
, but each input item can map to 0 or more output items.
类似于map
,但每个输入项可以映射到0个或多个输出项。countByValue()
: Returns a new DStream of(K, Long)
pairs, where the value is the frequency of each key in the source DStream.
返回一个新的DStream,包含(K, Long)
对,表示每个键的频率。
Stateful Operations:
有状态操作
window(windowLength, slideInterval)
: Returns a new DStream computed based on windowed batches of the source DStream.
返回基于窗口批次的DStream。countByWindow(windowLength, slideInterval)
: Returns a sliding window count of elements in the stream.
返回滑动窗口中的元素计数。reduceByWindow(func, windowLength, slideInterval)
: Aggregates elements in the stream over a sliding interval using a functionfunc
.
使用函数func
在滑动窗口内聚合元素。
Window-Based Queries:
-
Example: Count hashtags over the last 1 minute.
统计过去1分钟内的话题标签。val tweets = ssc.twitterStream() val hashTags = tweets.flatMap(status => getTags(status)) val tagCounts = hashTags.window(Minutes(1), Seconds(5)).countByValue()
- Returns a DStream of
(hashtag, count)
pairs, e.g.,[(#cat, 5), (#dog, 6), ...]
.
返回(话题标签, 计数)
对的DStream,例如[(#cat, 5), (#dog, 6), ...]
。
- Returns a DStream of
Output Operations:
-
Send data to external systems.
将数据发送到外部系统。 -
saveAsTextFiles(prefix, [suffix])
: Saves DStream contents as text files (not supported in Spark 1.6.0).
将DStream内容保存为文本文件(Spark 1.6.0不支持)。 -
foreachRDD(func)
: Applies a function to each RDD in the DStream to push data to external systems (e.g., save to files or write to a database).
对DStream中的每个RDD应用函数,将数据推送到外部系统(例如保存到文件或写入数据库)。def sendPartition(iter): connection = createNewConnection() for record in iter: connection.send(record) connection.close() dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))
updateStateByKey
-
Maintains a running count of each word in a text data stream.
维护文本数据流中每个单词的累计计数。 -
Function: Updates the state based on previous state and new values.
函数: 根据之前的状态和新值更新状态。def updateFunction(newValues, runningCount): if runningCount is None: runningCount = 0 return sum(newValues, runningCount)
-
Usage:
val runningCounts = pairs.updateStateByKey(updateFunction)
-
General Behavior:
- Applies the state update function for all existing keys in every batch.
在每个批次中对所有现有键应用状态更新函数。 - If the update function returns
None
, the key-value pair is eliminated.
如果更新函数返回None
,则删除该键值对。
- Applies the state update function for all existing keys in every batch.
Combine Batch and Stream Processing:
Filtering Spam Words from Streams
批处理和流处理的结合
-
Requires:
- An RDD containing spam words.
包含垃圾词汇的RDD - A stream with all words.
包含所有词汇的流。
- An RDD containing spam words.
-
Process:
- Join the data stream with spam information for data cleaning.
将数据流与垃圾信息进行连接以进行数据清理。
cleanedDStream = wordcounts.transform(lambda rdd: rdd.join(spamInfoRDD).filter(...))
- Join Operation: Combines two DStreams of
(K, V)
and(K, W)
pairs into a new DStream of(K, (V, W))
pairs.
连接操作: 将两个(K, V)
和(K, W)
对的DStream合并为一个新的(K, (V, W))
对的DStream。 - Example:
wordCounts:
(the, 1), (dog, 5), (buy, 1), (drug, 1)
spamInfoRDD:
(buy, 1), (drug, 1)
- After Join:
(buy, (1,1)), (drug, (1,1))
- Filter: Remove spam words using a filter function.
使用过滤函数移除垃圾词汇。
- Join the data stream with spam information for data cleaning.
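A slightly fuller sketch of this cleaning step, assuming the SparkContext `sc` and the `wordCounts` DStream from the surrounding example already exist; here a `leftOuterJoin` is used instead of the inner join above, so that non-spam words are kept and joined spam words are dropped.

```python
spamInfoRDD = sc.parallelize([("buy", 1), ("drug", 1)])   # static spam-word RDD

def clean(rdd):
    # After the join each record is (word, (count, spam_flag_or_None));
    # keep only the words that found no match in spamInfoRDD.
    return (rdd.leftOuterJoin(spamInfoRDD)
               .filter(lambda kv: kv[1][1] is None)
               .map(lambda kv: (kv[0], kv[1][0])))

cleanedDStream = wordCounts.transform(clean)
cleanedDStream.pprint()
```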
Use Cases for Combining Batch and Stream Processing:
- Online Machine Learning:
在线机器学习- Predict
{True, False}
based on stream elements.
基于流元素预测{True, False}
。 - Continuously learn and update data models using
updateStateByKey
andtransform
.
使用updateStateByKey
和transform
持续学习和更新数据模型。 - Combine live data streams with historical data.
将实时数据流与历史数据结合。
- Predict
- Complex Event Processing (CEP):
复杂事件处理(CEP)- Match incoming events against patterns.
将传入事件与模式匹配 - Example:
- Stream:
{room_visit, sanitize, room_visit, ...}
- Pattern:
{room_visit, room_visit}
- If the pattern appears, trigger an alert (e.g., doctor did not sanitize hands).
如果模式出现,触发警报(例如,医生未消毒双手)。 - Use window-based operations like
reduceByWindow
.
使用基于窗口的操作,如reduceByWindow
。
- Match incoming events against patterns.
Fault Tolerance in Spark Streaming:
Fault Tolerance Semantics of RDDs:
RDD的容错语义
- Each RDD remembers the lineage of operations used to create it from a fault-tolerant input dataset (e.g., HDFS).
每个RDD都记录了从故障容错输入数据集(如HDFS)创建它的操作谱系。 - If a partition is lost due to a worker node failure, it can be recomputed using the lineage.
如果由于工作节点故障导致分区丢失,可以使用谱系重新计算。 - The final transformed RDD data remains consistent regardless of cluster failures.
无论集群是否发生故障,最终转换后的RDD数据保持一致。
Challenges in Spark Streaming:
- Received data may come from the network, which is not fault-tolerant by default.
接收的数据可能来自网络,而网络默认不具备容错性。
Solution for Fault Tolerance:
- Data Replication:
数据复制- Buffer and replicate data among multiple Spark executors (default: 2 replicas).
在多个Spark执行器之间缓冲和复制数据(默认:2个副本)。 - If a worker fails, retrieve data from another node.
如果工作节点故障,从其他节点检索数据。
- Buffer and replicate data among multiple Spark executors (default: 2 replicas).
- Handling Data Loss:
处理数据丢失- If data is buffered but not replicated, attempt to retrieve it from the data source.
如果数据已缓冲但未复制,尝试从数据源检索。
- If data is buffered but not replicated, attempt to retrieve it from the data source.
Fault Tolerance Scenarios:
容错场景
- Worker Node Failure:
工作节点故障:- All in-memory data is lost.
所有内存中的数据丢失。 - If receivers were running on the failed node, buffered data is lost.
如果接收器在故障节点上运行,缓冲的数据丢失。
- All in-memory data is lost.
- Driver Node Failure:
驱动节点故障:- The SparkContext is lost.
SparkContext丢失。 - All executors and their in-memory data are lost.
所有执行器及其内存中的数据丢失。 - Solution: Use checkpointing to recover.
解决方案: 使用检查点恢复。
- The SparkContext is lost.
- Data Source Considerations:
数据源考虑:- For text file sources, the system must be fault-tolerant (e.g., HDFS).
对于文本文件源,系统必须具有容错性(如HDFS)。 - For receiver-based sources, the receiver must acknowledge data replication.
对于基于接收器的源,接收器必须确认数据复制。
- For text file sources, the system must be fault-tolerant (e.g., HDFS).
Discretized Streams (D-Streams) in More Detail:
DStream Overview:
DStream概述:
- A DStream is a sequence of RDDs representing a stream of data.
DStream是一系列表示数据流的RDD。 - It defines how to generate an RDD in each batch interval.
它定义了如何在每个批次间隔内生成RDD。 - Key Components:
- Dependencies: List of parent DStreams it depends on.
依赖关系: 依赖的父DStream列表。 - Slide Interval: The interval at which RDDs are computed.
滑动间隔: 计算RDD的时间间隔。 - Compute Function: Function to generate the RDD at a specific time
t
.
计算函数: 在特定时间t
生成RDD的函数。
- Dependencies: List of parent DStreams it depends on.
Example: Mapped DStream:
-
Dependencies: Single parent DStream.
依赖关系: 单个父DStream。 -
Slide Interval: Same as the parent DStream.
滑动间隔: 与父DStream相同。 -
Compute Function:
计算函数:- Creates a new RDD by applying a
map
function to the parent DStream's RDD at timet
.
通过对父DStream在时间t
的RDD应用map
函数来创建新的RDD。
override def compute(time: Time): Option[RDD[U]] = { parent.getOrCompute(time).map(_.map[U](mapFunc)) }
getOrCompute(time)
: Retrieves the RDD at timet
if already computed; otherwise, generates it.
getOrCompute(time)
: 如果RDD已经计算过,则检索它;否则生成它。mapFunc
: The function applied to generate the new RDD.
mapFunc
: 用于生成新RDD的函数。
- Creates a new RDD by applying a
From DStreams to Spark Jobs: Step 1 (Program → DStream Graph):
-
A Spark Streaming program defines a DStream graph.
Spark Streaming程序定义了一个DStream图。 -
Example Program:
val t = ssc.twitterStream("...").map(...)
t.foreach(...)
-
DStream Graph:
- Input DStream (T): Represents the source (e.g., Twitter stream).
输入DStream (T): 表示数据源(例如Twitter流)。 - Mapped DStream (M): Applies transformations (e.g.,
map
).
映射DStream (M): 应用转换操作(例如map
)。 - Foreach DStream (F): Represents output operations (e.g.,
foreach
).
Foreach DStream (F): 表示输出操作(例如foreach
)。
- Input DStream (T): Represents the source (e.g., Twitter stream).
-
Complex Example:
val t1 = ssc.twitterStream("...")
val t2 = ssc.twitterStream("...")
val t = t1.union(t2).map(...)
t.saveAsHadoopFiles(...)
t.map(...).foreach(...)
t.filter(...).foreach(...)
From DStreams to Spark Jobs: Step 2 (DStream Graph → RDD Graph → Spark Job):
- Process:
- Every interval, the RDD graph is computed from the DStream graph.
每个间隔内,从DStream图计算RDD图。 - For each output operation, a Spark action is created.
为每个输出操作创建一个Spark动作。 - For each action, a Spark job is created to compute it.
为每个动作创建一个Spark作业来计算它。
- Every interval, the RDD graph is computed from the DStream graph.
- Example:
- DStream Graph: Contains transformations and output operations.
DStream图: 包含转换操作和输出操作。 - RDD Graph: Contains Block RDDs with data received in the last batch interval.
RDD图: 包含在上一个批次间隔内接收到的数据的Block RDD。 - Spark Actions (A): Trigger the computation of the RDD graph.
Spark动作 (A): 触发RDD图的计算。
- DStream Graph: Contains transformations and output operations.
System Model:
Components:
- Spark Streaming Client:
Spark Streaming客户端- Contains the
SparkContext
andDStream Graph
.
包含SparkContext
和DStream Graph
。
- Contains the
- Network Input Tracker:
网络输入跟踪器- Tracks data received by network receivers and maps them to input DStreams.
跟踪网络接收器接收的数据,并将其映射到输入DStream。
- Tracks data received by network receivers and maps them to input DStreams.
- Job Scheduler:
作业调度器- Periodically queries the DStream graph to generate Spark jobs from received data.
定期查询DStream图,根据接收到的数据生成Spark作业。 - Hands jobs to the Job Manager for execution.
将作业交给作业管理器执行。
- Periodically queries the DStream graph to generate Spark jobs from received data.
- Job Manager:
作业管理器- Maintains a job queue and executes jobs in Spark.
维护作业队列并执行Spark作业。
- Maintains a job queue and executes jobs in Spark.
- Block Manager:
块管理器- Manages data blocks across worker nodes.
管理跨工作节点的数据块。
- Manages data blocks across worker nodes.
- Cluster Manager:
集群管理器- Manages resources and schedules tasks across the cluster.
管理资源并在集群中调度任务。
- Manages resources and schedules tasks across the cluster.
Execution Model - Receiving Data:
执行模型 - 接收数据:
- Process:
StreamingContext.start()
initializes the system.
StreamingContext.start()
初始化系统。- Network receivers are launched as tasks on Spark workers.
网络接收器作为任务在Spark工作节点上启动。 - Received data is divided into blocks and pushed to the Block Manager.
接收的数据被分成块并推送到块管理器。 - Blocks are replicated across nodes for fault tolerance.
数据块在节点之间复制以实现容错。 - The Network Input Tracker notifies the system of received block IDs.
网络输入跟踪器通知系统接收到的块ID。
- Key Points:
- Block Manager acts as a "write-once" key-value store.
块管理器充当“一次性写入”的键值存储。 - Data can be replicated across nodes or stored to disk.
数据可以在节点之间复制或存储到磁盘。
- Block Manager acts as a "write-once" key-value store.
Execution Model - Job Scheduling:
执行模型 - 作业调度
- Process:
- The Job Scheduler queries the DStream graph to generate Spark jobs.
作业调度器查询DStream图以生成Spark作业。 - Jobs are handed to the Job Manager for execution.
作业被交给作业管理器执行。 - Jobs are executed on worker nodes using Spark's schedulers.
作业使用Spark的调度器在工作节点上执行。
- The Job Scheduler queries the DStream graph to generate Spark jobs.
- Key Points:
- The Job Manager maintains a job queue and ensures jobs are executed in order.
作业管理器维护作业队列并确保作业按顺序执行。 - The Block Manager provides the location of block IDs for job execution.
块管理器提供块ID的位置以供作业执行。
- The Job Manager maintains a job queue and ensures jobs are executed in order.
DStream Persistence
Overview
- If a DStream is set to persist at a specific storage level, all RDDs generated by it will use the same storage level.
如果DStream设置为持久化到特定存储级别,则其生成的所有RDD都将使用相同的存储级别。 - When to Persist:
何时持久化:- When there are multiple transformations or actions on a DStream.
当DStream上有多个转换或操作时。 - When RDDs in a DStream will be used multiple times.
当DStream中的RDD将被多次使用时。
- When there are multiple transformations or actions on a DStream.
- Window-Based DStreams:
基于窗口的DStream:- Automatically persisted in memory.
自动持久化到内存中。
- Automatically persisted in memory.
Default Storage Levels
默认存储级别:
- DStreams:
StorageLevel.MEMORY_ONLY_SER
(in memory as serialized bytes).
DStream:StorageLevel.MEMORY_ONLY_SER
(以序列化字节形式存储在内存中)。 - Input DStreams:
StorageLevel.MEMORY_AND_DISK_SER_2
.
输入DStream:StorageLevel.MEMORY_AND_DISK_SER_2
。 - Note: This differs from RDD's default storage level (
MEMORY_ONLY
), which does not use serialization.
注意: 这与RDD的默认存储级别(MEMORY_ONLY
)不同,后者不使用序列化。 - Advantage of Serialization:
序列化的优势:- Reduces random pauses due to garbage collection (GC), providing more consistent job processing times.
减少由于垃圾回收(GC)导致的随机暂停,提供更一致的作业处理时间。
- Reduces random pauses due to garbage collection (GC), providing more consistent job processing times.
Checkpointing in Spark Streaming
Overview:
- Streaming applications must operate 24/7, even in case of system failures.
流式应用程序必须24/7运行,即使在系统故障的情况下。 - Purpose of Checkpointing:
检查点的目的:- Metadata Checkpointing: Saves configuration, DStream operations, and incomplete jobs to recover from driver failures.
元数据检查点: 保存配置、DStream操作和未完成的作业,以便从驱动程序故障中恢复。 - Data Checkpointing: Saves generated RDDs to reliable storage (e.g., HDFS) to prevent the RDD lineage from growing too large.
数据检查点: 将生成的RDD保存到可靠的存储(如HDFS),以防止RDD谱系变得过大。
- Metadata Checkpointing: Saves configuration, DStream operations, and incomplete jobs to recover from driver failures.
Data Checkpointing:
数据检查点
-
Process:
- RDDs are saved to HDFS transparently to the user program.
RDD透明地保存到HDFS,对用户程序无感知。 - Checkpointing is done lazily; the RDD is saved to HDFS the first time it is computed.
检查点是惰性的;RDD在第一次计算时保存到HDFS。
red_rdd.checkpoint()
- The contents of
red_rdd
are saved to an HDFS file, and this is transparent to all child RDDs.
red_rdd
的内容保存到HDFS文件,对所有子RDD透明。
- RDDs are saved to HDFS transparently to the user program.
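A minimal PySpark sketch of how checkpointing is typically switched on; the HDFS path, batch interval, and socket source are illustrative assumptions. `StreamingContext.getOrCreate` rebuilds the context from the checkpoint directory after a driver failure.

```python
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

checkpoint_dir = "hdfs:///tmp/streaming-checkpoints"   # illustrative path

def create_context():
    sc = SparkContext("local[2]", "CheckpointExample")
    ssc = StreamingContext(sc, 10)          # 10-second batches
    ssc.checkpoint(checkpoint_dir)          # metadata + data checkpointing
    lines = ssc.socketTextStream("localhost", 9999)
    lines.countByWindow(60, 10).pprint()    # window ops rely on checkpointed RDDs
    return ssc

# First run: create_context() is called; after a driver failure the context
# (DStream graph, incomplete batches) is recovered from the checkpoint instead.
ssc = StreamingContext.getOrCreate(checkpoint_dir, create_context)
ssc.start()
ssc.awaitTermination()
```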
Why is RDD Checkpointing Necessary?
为什么需要RDD检查点?
-
Stateful DStream Operators:
有状态的DStream操作:- Can have infinite lineages, leading to large RDD graphs.
可能具有无限的谱系,导致RDD图变得过大。
- Can have infinite lineages, leading to large RDD graphs.
-
Issues with Large Lineages:
大谱系的问题:- Large closure of the RDD object → large task sizes → high task launch times.
RDD对象的闭包过大 → 任务大小增加 → 任务启动时间变长。 - High recovery times under failure.
故障恢复时间增加。
- Large closure of the RDD object → large task sizes → high task launch times.
-
Solution:
解决方案:- Periodic RDD checkpointing to truncate the lineage and reduce recovery time.
定期RDD检查点,截断谱系并减少恢复时间.
- Periodic RDD checkpointing to truncate the lineage and reduce recovery time.
Tradeoff in Checkpointing Periodicity:
检查点频率的权衡:
- Checkpoint Too Frequent:
检查点过于频繁:- HDFS writing can slow down processing.
HDFS写入会减慢处理速度。
- HDFS writing can slow down processing.
- Checkpoint Too Infrequent:
检查点过于稀疏:- Task launch times may increase due to large RDD lineages.
由于RDD谱系过大,任务启动时间可能增加。
- Task launch times may increase due to large RDD lineages.
- Default Setting:
默认设置:- Checkpoints at most once every 10 seconds.
最多每10秒检查一次。 - Aim to checkpoint once every 10 batches.
目标是每10个批次检查一次。
- Checkpoints at most once every 10 seconds.
Performance of Spark Streaming:
-
Throughput and Latency:
吞吐量和延迟:- Can process 60 million records per second (6 GB/sec) on 100 nodes with sub-second latency.
在100个节点上每秒可处理6000万条记录(6 GB/秒),延迟为亚秒级。 - WordCount Example:
WordCount示例:- Performs a sliding window count every 30 seconds.
每30秒执行一次滑动窗口计数。 - Achieves 1-2 seconds latency with high throughput.
在1-2秒延迟内实现高吞吐量。
- Performs a sliding window count every 30 seconds.
- Grep Example:
Grep示例:- Network-bound and performs pattern matching on strings.
网络绑定,对字符串进行模式匹配。 - Achieves 1-2 seconds latency with high throughput.
在1-2秒延迟内实现高吞吐量。
- Network-bound and performs pattern matching on strings.
- Can process 60 million records per second (6 GB/sec) on 100 nodes with sub-second latency.
-
Comparison with Other Systems:
与其他系统的比较:-
Higher Throughput than Storm:
吞吐量高于Storm:- Spark Streaming: 670k records/sec/node.
Spark Streaming: 每秒67万条记录/节点。 - Storm: 115k records/sec/node.
Storm: 每秒11.5万条记录/节点。 - Commercial Systems: 100-500k records/sec/node.
商用系统: 每秒10-50万条记录/节点。
- Spark Streaming: 670k records/sec/node.
-
Throughput per Node:
每个节点的吞吐量:- WordCount:
- Spark outperforms Storm across all record sizes (100-1000 bytes).
Spark在所有记录大小(100-1000字节)上均优于Storm。 - Grep:
- Spark significantly outperforms Storm, especially for larger record sizes.
Spark显著优于Storm,尤其是在较大记录大小的情况下。
-
-
Apache Storm:
- Core Concepts:
- Tuple: A named list of values.
元组(Tuple): 命名的值列表。 - Stream: An unbounded sequence of tuples processed by the application.
流(Stream): 应用程序处理的无限元组序列。
- Tuple: A named list of values.
- Application Defined by Topology:
通过拓扑结构定义应用程序:- Spouts: Stream sources that read tuples from external sources (e.g., Twitter API) or disk.
Spouts: 流源,从外部源(如Twitter API)或磁盘读取元组。 - Bolts: Process input streams and produce output streams, encapsulating application logic.
Bolts: 处理输入流并生成输出流,封装应用逻辑。
- Spouts: Stream sources that read tuples from external sources (e.g., Twitter API) or disk.
- Core Concepts:
-
Fast Fault Recovery:
快速故障恢复:- Recovers from faults or stragglers within 1 second due to:
在1秒内从故障或滞后中恢复,原因是:- Checkpointing: Ensures data and metadata are saved for recovery.
检查点(Checkpointing): 确保数据和元数据保存以供恢复。 - Speculative Execution: Creates backup copies of tasks on slow nodes (≥1.4x slower than the median task).
推测执行(Speculative Execution): 为慢节点(比中位任务慢≥1.4倍)创建任务备份副本。
- Checkpointing: Ensures data and metadata are saved for recovery.
- Recovers from faults or stragglers within 1 second due to:
-
Mobile Millennium Project:
Mobile Millennium项目:- Traffic Transit Time Estimation:
交通通行时间估计:- Uses online machine learning on GPS observations.
使用在线机器学习处理GPS观测数据。 - CPU Intensive: Requires dozens of machines for useful computation.
CPU密集型: 需要数十台机器进行有效计算。 - Scalability: Scales linearly with cluster size.
可扩展性: 随集群规模线性扩展。
- Uses online machine learning on GPS observations.
- Traffic Transit Time Estimation: