ISIT312程序辅导、辅导java,Python编程
- 首页 >> Python编程 ISIT312 Big Data Management
Spring 2021
Assignment 3
All files left on Moodle in a state "Draft(not submitted)" will not be evaluated. Please refer to
the submission dropbox on Moodle for the submission due date and time.
This assessment contributes to 20% of the total evaluation in the subject. The deliverable is
specified in the task(s).
It is a requirement that all Laboratory and Assignment tasks in this subject must be solved
individually without any cooperation with the other students. If you have any doubts, questions,
etc. please consult your lecturer or tutor during lab classes or office hours. Plagiarism will result
in a FAIL grade being recorded for that assessment task.
The environment of implementation is the Ubuntu virtual machine which is imported from
the BigDataVM-2021v2_2.ova file.
Q1. Apache HBase (3 marks)
Consider the following conceptual model:
Develop two alternative implementations of the above schema in HBase; in other words, two HBase
tables are to be created. Each HBase table must be loaded with at least two instructors and two subjects.
The names of column families and column qualifiers must be indicative. You can determine the row
keys and cell values.
After finishing loading the data , use a “scan” command to list all rows from each table.
You must also explain the key difference between your two implementations.
Deliverable. A file solution1.pdf which includes:
The Hadoop commands, HBase shell commands and execution output in Zeppelin or Terminal;
An explanation of the difference between the two implementations.
SUBJECT
code ID
tit le
INSTRUCTOR
staff-number ID
first-name
last-name
email
is-the-coordinator-of
teaches
Q2. Apache Pig (4 marks)
DATA SETS: apat63_99.txt and cite75_99.txt which are in the “datasets” folder on
Desktop of the VM. The source of the two data sets is http://www.nber.org/patents/
The first file apat63_99.txt contains about 3 million records for the U.S. patents. Please refer to
the Assignment 1 specification for a description of this data set. The second file cite75_99.txt
contains more than 16 million lines of citation records. The following content shows the first few lines:
"CITING","CITED"
3858241,956203
3858241,1324234
3858241,3398406
3858241,3557384
3858241,3634889
3858242,1515701
3858242,3319261
…
For example, the second line shows that patent 3858241 cites patent 956203. The file is sorted by the
citing patents. A citation count of a patent refers the number of times that it is cited by other patents.
For example, if a patent is cited by 100 patents in total, the citation count of this patient is 100.
Load the files into HDFS and use process the following operations in Apache Pig:
(1) Find the average number of claims grouped by patents where the grant year is ≥1975 and the country
is AU.
(2) Find the patent number, grant year and country for the most-cited patient(s), namely those patent(s)
with the largest citation count.
Deliverable. A file solution2.pdf which contains your implementation (in Pig Latin), commands and
results for the above two operations.
Q3. Apache Spark, Hive and HBase (8 marks)
Consider the above conceptual model of a data warehouse. The data of this model is stored in the files
customer.tbl, order_details.tbl, order.tbl, product.tbl and salesperson.tbl, all of which are available in
a “Resources” folder of Assignment 3 on Moodle. Note, that each file has a header with information
about the meanings of data in each column. A header is not a data component of each file. Remove the
headers before transferring the files into HDFS
(1) Load the data in the above five files into five external tables in Hive. (You can make reasonable
assumptions on the data types.)
(2) Load the five Hive tables into Spark dataframes and process the following operations in Spark:
a. Find the number of orders whose ship-city is London.
b. Find the number of products that were not ordered in 1996.
c. Find the order value (i.e., unit price multiplied by quantity of products per order) for order IDs
10270 to 10279.
d. Sort the salespersons by the total order value of orders they handled in a descending order, and
find the employee ID, fist name and last name of the top three salespersons.
(3) Convert the Spark dataframe for the salesperson data (which is from salesperson.tbl) into an HBase
table. Then, in the HBase shell, use the get command to retrieve the salesperson data with the employee
ID “1”. (You can make reasonable assumptions on the column families.)
Note. All the above steps need to be performed in the command line interfaces of the related software.
Do not use Zeppelin.
Deliverable. A file solution3.pdf which contains all your commands and code in the related command
line interfaces.
SALESPERSON
employee-id ID
last-nme
first-name
tit le
birth-date
hire-date
notes
CUSTOMER
customer-id ID
customer-code
company-name
contact-name
contact-tit le
city
region
postal-code
country
phone
fax
PRODUCT
product-id ID
product-name
unit-price
units- in-stock
units-on-order
discontinuted
ORDER
order-id ID
order-date
ship-via
ship-city
ship-region
ship-postal-code
ship-country
ORDER-DETAIL
unit-price
quantity
discount
Q4. Spark Streaming (5 marks)
DATASET: A file containing some transaction data in the resources folder for this assignment on
Moodle. The transaction data includes daily retail records of a retailer in one year.
Based on the following sample Scala code, implement a streaming query on the above dataset. The file
source of this query is HDFS. Thus, after you download (and unzip) the files, you need to upload the
files to HDFS.
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
// for an older version of Spark, also include:
// import org.apache.spark.sql.streaming.ProcessingTime
val retail_data = ...
val staticDataFrame = spark.read.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load(retail_data)
val staticSchema = staticDataFrame.schema
staticDataFrame.printSchema()
// root
// |-- InvoiceNo: string (nullable = true)
// |-- StockCode: string (nullable = true)
// |-- Description: string (nullable = true)
// |-- Quantity: integer (nullable = true)
// |-- InvoiceDate: timestamp (nullable = true)
// |-- UnitPrice: double (nullable = true)
// |-- CustomerID: double (nullable = true)
// |-- Country: string (nullable = true)
spark.conf.set("spark.sql.shuffle.partitions", 2)
val streamingDataFrame = spark.readStream
.schema(staticSchema)
.option("maxFilesPerTrigger", 10)
.format("csv")
.option("header", "true")
.load(retail_data)
streamingDataFrame.isStreaming //true if streaming
val purchaseQuery = streamingDataFrame
/*
*/
val query = purchaseQuery
.writeStream
.format("console")
.queryName("customer_purchases")
.outputMode("complete")
.trigger(Trigger.ProcessingTime("5 seconds"))
// for an older version of Spark, use:
// .trigger(ProcessingTime("5 seconds"))
.start()
The streaming query performs the following operations:
It filters out data with a missing value (if any) in the InvoiceNo, UnitPrice, Quantity and
CustomerID columns (i.e., rows with a null value in those columns are removed).
It returns the average purchasing value (= UnitPrice × Quantity / (InvoiceNo)) per
customer, which is sorted in a descending order of the average purchasing value. (Note: The
return will be updated in the streaming query.)
Based on the provided sample code, implement a Scala script to complete the above operations. Execute
the script in Spark shell by using :paste command. Report the first 20 rows of the first four return
batches.
Deliverable. A file solution4.pdf which includes your Scala code, commands and the output (namely
the return batches)
Spring 2021
Assignment 3
All files left on Moodle in a state "Draft(not submitted)" will not be evaluated. Please refer to
the submission dropbox on Moodle for the submission due date and time.
This assessment contributes to 20% of the total evaluation in the subject. The deliverable is
specified in the task(s).
It is a requirement that all Laboratory and Assignment tasks in this subject must be solved
individually without any cooperation with the other students. If you have any doubts, questions,
etc. please consult your lecturer or tutor during lab classes or office hours. Plagiarism will result
in a FAIL grade being recorded for that assessment task.
The environment of implementation is the Ubuntu virtual machine which is imported from
the BigDataVM-2021v2_2.ova file.
Q1. Apache HBase (3 marks)
Consider the following conceptual model:
Develop two alternative implementations of the above schema in HBase; in other words, two HBase
tables are to be created. Each HBase table must be loaded with at least two instructors and two subjects.
The names of column families and column qualifiers must be indicative. You can determine the row
keys and cell values.
After finishing loading the data , use a “scan” command to list all rows from each table.
You must also explain the key difference between your two implementations.
Deliverable. A file solution1.pdf which includes:
The Hadoop commands, HBase shell commands and execution output in Zeppelin or Terminal;
An explanation of the difference between the two implementations.
SUBJECT
code ID
tit le
INSTRUCTOR
staff-number ID
first-name
last-name
is-the-coordinator-of
teaches
Q2. Apache Pig (4 marks)
DATA SETS: apat63_99.txt and cite75_99.txt which are in the “datasets” folder on
Desktop of the VM. The source of the two data sets is http://www.nber.org/patents/
The first file apat63_99.txt contains about 3 million records for the U.S. patents. Please refer to
the Assignment 1 specification for a description of this data set. The second file cite75_99.txt
contains more than 16 million lines of citation records. The following content shows the first few lines:
"CITING","CITED"
3858241,956203
3858241,1324234
3858241,3398406
3858241,3557384
3858241,3634889
3858242,1515701
3858242,3319261
…
For example, the second line shows that patent 3858241 cites patent 956203. The file is sorted by the
citing patents. A citation count of a patent refers the number of times that it is cited by other patents.
For example, if a patent is cited by 100 patents in total, the citation count of this patient is 100.
Load the files into HDFS and use process the following operations in Apache Pig:
(1) Find the average number of claims grouped by patents where the grant year is ≥1975 and the country
is AU.
(2) Find the patent number, grant year and country for the most-cited patient(s), namely those patent(s)
with the largest citation count.
Deliverable. A file solution2.pdf which contains your implementation (in Pig Latin), commands and
results for the above two operations.
Q3. Apache Spark, Hive and HBase (8 marks)
Consider the above conceptual model of a data warehouse. The data of this model is stored in the files
customer.tbl, order_details.tbl, order.tbl, product.tbl and salesperson.tbl, all of which are available in
a “Resources” folder of Assignment 3 on Moodle. Note, that each file has a header with information
about the meanings of data in each column. A header is not a data component of each file. Remove the
headers before transferring the files into HDFS
(1) Load the data in the above five files into five external tables in Hive. (You can make reasonable
assumptions on the data types.)
(2) Load the five Hive tables into Spark dataframes and process the following operations in Spark:
a. Find the number of orders whose ship-city is London.
b. Find the number of products that were not ordered in 1996.
c. Find the order value (i.e., unit price multiplied by quantity of products per order) for order IDs
10270 to 10279.
d. Sort the salespersons by the total order value of orders they handled in a descending order, and
find the employee ID, fist name and last name of the top three salespersons.
(3) Convert the Spark dataframe for the salesperson data (which is from salesperson.tbl) into an HBase
table. Then, in the HBase shell, use the get command to retrieve the salesperson data with the employee
ID “1”. (You can make reasonable assumptions on the column families.)
Note. All the above steps need to be performed in the command line interfaces of the related software.
Do not use Zeppelin.
Deliverable. A file solution3.pdf which contains all your commands and code in the related command
line interfaces.
SALESPERSON
employee-id ID
last-nme
first-name
tit le
birth-date
hire-date
notes
CUSTOMER
customer-id ID
customer-code
company-name
contact-name
contact-tit le
city
region
postal-code
country
phone
fax
PRODUCT
product-id ID
product-name
unit-price
units- in-stock
units-on-order
discontinuted
ORDER
order-id ID
order-date
ship-via
ship-city
ship-region
ship-postal-code
ship-country
ORDER-DETAIL
unit-price
quantity
discount
Q4. Spark Streaming (5 marks)
DATASET: A file containing some transaction data in the resources folder for this assignment on
Moodle. The transaction data includes daily retail records of a retailer in one year.
Based on the following sample Scala code, implement a streaming query on the above dataset. The file
source of this query is HDFS. Thus, after you download (and unzip) the files, you need to upload the
files to HDFS.
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
// for an older version of Spark, also include:
// import org.apache.spark.sql.streaming.ProcessingTime
val retail_data = ...
val staticDataFrame = spark.read.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load(retail_data)
val staticSchema = staticDataFrame.schema
staticDataFrame.printSchema()
// root
// |-- InvoiceNo: string (nullable = true)
// |-- StockCode: string (nullable = true)
// |-- Description: string (nullable = true)
// |-- Quantity: integer (nullable = true)
// |-- InvoiceDate: timestamp (nullable = true)
// |-- UnitPrice: double (nullable = true)
// |-- CustomerID: double (nullable = true)
// |-- Country: string (nullable = true)
spark.conf.set("spark.sql.shuffle.partitions", 2)
val streamingDataFrame = spark.readStream
.schema(staticSchema)
.option("maxFilesPerTrigger", 10)
.format("csv")
.option("header", "true")
.load(retail_data)
streamingDataFrame.isStreaming //true if streaming
val purchaseQuery = streamingDataFrame
/*
*/
val query = purchaseQuery
.writeStream
.format("console")
.queryName("customer_purchases")
.outputMode("complete")
.trigger(Trigger.ProcessingTime("5 seconds"))
// for an older version of Spark, use:
// .trigger(ProcessingTime("5 seconds"))
.start()
The streaming query performs the following operations:
It filters out data with a missing value (if any) in the InvoiceNo, UnitPrice, Quantity and
CustomerID columns (i.e., rows with a null value in those columns are removed).
It returns the average purchasing value (= UnitPrice × Quantity / (InvoiceNo)) per
customer, which is sorted in a descending order of the average purchasing value. (Note: The
return will be updated in the streaming query.)
Based on the provided sample code, implement a Scala script to complete the above operations. Execute
the script in Spark shell by using :paste command. Report the first 20 rows of the first four return
batches.
Deliverable. A file solution4.pdf which includes your Scala code, commands and the output (namely
the return batches)