Introduction
Welcome to my first “Big Data”(ish) project on GCP Dataflow.
This was a Deep Learning project in which I trained a DNN (a Convolutional Neural Network followed by a Recurrent Neural Network, specifically an LSTM).
The datasets were extracted from more than 2600 videos produced by the research conducted by Boston and Rutgers Universities, jointly under the National Center for Sign Language and Gesture Resources project.
Each video has up to three synchronized camera perspectives, recorded by three independent video cameras.
The researchers produced corpus documents in the form of XML and linked metadata therein to corresponding videos. Videos were processed with the OpenCV Python library.
After all data was extracted, this resulted in 3,314 distinct "tokens". One distinct ASL sign = one linguistic token or "word", which is the rough equivalent of an English word plus other ASL-specific linguistic aspects. There may not always be a direct translation, since an ASL utterance comprises much more than a simple English word. I am not a linguist, so I will just stop right there.
But the FIRST part of my project - captured by the source code in this repo and in this summary - focuses on the "Big Data" aspect. I will start with how I ended.
When I began this project, I had no idea what I was in for. In the end, there are more than 2600 videos to process, amounting to more than 561,000 frames, assuming a frame rate of 30 FPS, which is only half the rate at which the videos were originally recorded.
After frame extraction, data needed to be related to each and every one of those 561,000+ frames, which was NOT done by the researchers. This, I had to do myself.
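For a rough sense of scale, the figures above imply each video averages only about seven seconds of footage at the 30 FPS extraction rate. This is a back-of-the-envelope check, not a measured statistic:

```python
# Back-of-the-envelope check of the scale figures quoted above.
n_videos = 2600     # "more than 2600 videos"
n_frames = 561_000  # "more than 561,000 frames"
fps = 30            # extraction frame rate (half the original recording rate)

avg_frames_per_video = n_frames / n_videos           # ~215.8 frames per video
avg_seconds_per_video = avg_frames_per_video / fps   # ~7.2 seconds per video
total_hours = n_frames / fps / 3600                  # ~5.2 hours of footage overall

print(round(avg_seconds_per_video, 1), round(total_hours, 1))  # → 7.2 5.2
```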
So this part of my project focuses on making that pipeline feasible. It had to be done using cloud storage and a parallel data-processing framework. In the end I chose Apache Beam, with GCP Dataflow as the runner - Apache Beam because its "raison d'être" is portability across runners, including local and interactive ones (Jupyter Notebooks).
I will leave the report of my initial failures for the end.
This write-up is more concerned with my eventual success using Apache Beam on GCP Dataflow.
So, without further ado, let me dive right in!
Preliminaries
This notebook assumes that all setup steps from GCP-readme.md have been followed.
This notebook does not demonstrate the full ETL pipeline used in production (executed on GCP Dataflow).
We only investigate the first couple of major steps of the production pipeline here, in order to convey the general idea of the process. Note that the production pipeline is really a sequence of (sub) pipelines, daisy-chained together in a particular order, since later pipelines depend on the outputs of earlier ones.
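The daisy-chaining can be pictured as ordinary function composition: each sub-pipeline consumes what its predecessor produced. A trivial, illustrative sketch (the names here are made up, not from the repo):

```python
def run_chained(sub_pipelines, initial_input=None):
    # run each sub-pipeline in order; each one consumes the
    # output (e.g. files written) of the one before it
    result = initial_input
    for sub_pipeline in sub_pipelines:
        result = sub_pipeline(result)
    return result

# toy stand-ins for the real sub-pipelines
bootstrap_vid_index = lambda _: "video-index.csv"
download_videos = lambda index_csv: [f"{index_csv}:video_{i}" for i in range(3)]

print(run_chained([bootstrap_vid_index, download_videos]))
```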
In this notebook, we demonstrate the first two pipelines, which accomplish the following:
### 1. Bootstrap the video index
Substeps are:
1. Download the video index (archive).
2. Extract it.
3. Write it to the destination directory as a CSV.
The video index drives the entire full-blown production pipeline.
It tells us the filenames of the target videos.
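Stripped of Beam, the three substeps amount to: download an archive, extract the selected index file, and rewrite it as a CSV in the destination directory. A stdlib-only sketch; the function and file names are illustrative, not the repo's actual helpers:

```python
import csv
import io
import urllib.request
import zipfile

def bootstrap_video_index(archive_url, member_name, dest_csv_path):
    # 1. download the video-index archive
    with urllib.request.urlopen(archive_url) as resp:
        archive_bytes = resp.read()
    # 2. extract the selected index file from the archive
    with zipfile.ZipFile(io.BytesIO(archive_bytes)) as zf:
        rows = list(csv.reader(io.StringIO(zf.read(member_name).decode("utf-8"))))
    # 3. write it to the destination directory as a CSV
    with open(dest_csv_path, "w", newline="") as f:
        csv.writer(f).writerows(rows)
    return rows
```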
### 2. Download each video segment comprising the final target videos. (The video index contains the URLs.)
Target videos are split into segments because some of the final target videos are rather large.
Altogether, **the *production* pipeline (executed on GCP Dataflow) retrieves more than 2600 videos** produced by the research conducted by Boston and Rutgers Universities, jointly under the [National Center for Sign Language and Gesture Resources project](http://www.bu.edu/asllrp/ncslgr.html).
This notebook will demonstrate retrieving 50 of those.
The download implementation leverages Apache Beam's parallelism to avoid the time it would take to download everything sequentially. Note that when executed locally, the Apache Beam SDK uses Docker containers for worker-node clusters. The cluster in this case consists of 8 worker nodes, since my local machine has 8 cores.
In *production*, on GCP Dataflow, worker scaling can be scheduled to your heart's content (though doing so, of course, costs more money).
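The idea behind the parallel download can be sketched without Beam: randomly assign each video-index row to one of `n_partitions` buckets, which the runner then processes concurrently. This mirrors the random partition function used with `beam.Partition` in the source shown later, but is an illustrative, stdlib-only sketch:

```python
import random

def assign_download_partitions(vid_index_rows, n_partitions=8, seed=None):
    # mimic beam.Partition with a random partitioning function:
    # each index row lands in exactly one of n_partitions buckets,
    # which can then be downloaded concurrently
    rng = random.Random(seed)
    buckets = [[] for _ in range(n_partitions)]
    for row in vid_index_rows:
        buckets[rng.randint(0, n_partitions - 1)].append(row)
    return buckets
```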
### 3. Use the `OpenCV` library to extract all frames (from each segment) for each target video.
This step leverages Apache Beam's parallelism as well.
But we MUST take care to ensure that a single worker extracts the frames of every segment associated with a given target video.
This is because frames are ordered/sequenced.
Allowing two different workers to extract frames from different segments of the same final target video would likely result in frames being extracted out of order (due to parallelism).
Therefore, we partition the extraction task by final target video, ensuring that a single worker handles all segments associated with a single target video.
But we do want parallelism to occur at the target-video level.
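That grouping can be pictured without any Beam machinery: key each segment record by its target video's filename, then keep each group's segments in order. The field names mirror the ones used later in the extraction code, but this is an illustrative sketch:

```python
from collections import defaultdict

def group_segments_by_target_video(segment_records):
    # key: target video filename; value: all of its segment records,
    # kept in segment order so frames can be extracted sequentially
    grouped = defaultdict(list)
    for rec in segment_records:
        grouped[rec["target_video_fname"]].append(rec)
    return {
        k: sorted(v, key=lambda r: r["segment_fname"])
        for k, v in grouped.items()
    }
```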
In the end, **in production, the pipeline extracts more than 561,000 frames (images) from the source target videos**!
Of course, in this demonstration we will extract far fewer: only 50 of the more than 2600 available videos will be downloaded and processed (frames extracted). Still, extracting from 50 videos amounts to thousands of frames.
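One consequence of the ordering requirement is that frame filenames can simply be a single running numeric index across a target video's ordered segments. An illustrative sketch of that naming scheme:

```python
def stitched_frame_filenames(segment_frame_counts):
    # one global, zero-based counter across the ordered segments of a
    # target video yields correctly ordered frame filenames
    names = []
    counter = 0
    for n_frames in segment_frame_counts:
        for _ in range(n_frames):
            names.append(f"{counter}.jpg")
            counter += 1
    return names
```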
```python
%load_ext autoreload
%autoreload 2
from __future__ import absolute_import
import apache_beam as beam
import apache_beam.runners.interactive.interactive_beam as ib
from api import beam__common, fileio, fidscs_globals
from api.fidscs_globals import disp_source
from api import data_extractor__beam
from apache_beam.options.pipeline_options import PipelineOptions
```
## Constants to be used in this notebook
```python
WORK_DIR = '/tmp'
MAX_TARGET_VIDEOS = 50 # set to -1 for all in production but not in this interactive notebook! That will result in extracting more than 561,000 images from more than 2600 videos! (onto your local machine)
PIPELINE_BASE_JOB_NAME = 'sc-fids-capstone-etl-demo'
```
## Use Apache Beam PipelineOptions for any global settings
We MUST do this since the whole point of Apache Beam is parallel processing.
How that is accomplished is beyond the scope of this notebook.
But suffice it to say that a global variable cannot be implemented in the usual manner - e.g. as a Python global variable.
However, a PipelineOptions object IS passed to each and every worker node by Apache Beam.
Therefore, we achieve global settings shared by all workers - e.g. the working directory and the final destination file paths output by the pipeline - by passing the would-be global settings to PipelineOptions, which Apache Beam uses to bootstrap each worker node.
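Underneath, custom pipeline options are registered with a standard `argparse` parser. A toy, stdlib-only illustration of why unregistered flags fail and registered ones work; the `--work-dir` flag here is just an example, not necessarily the repo's exact spelling:

```python
import argparse

parser = argparse.ArgumentParser()
# registering a custom option, analogous to what a custom
# PipelineOptions subclass does in _add_argparse_args
parser.add_argument("--work-dir", default=None)

known, unknown = parser.parse_known_args(
    ["--work-dir", "/tmp", "--not-registered", "x"]
)
print(known.work_dir)  # the registered option is parsed
print(unknown)         # unregistered options are left over / rejected
```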
### Custom Apache Beam Pipeline options
The `beam__common.FIDSCapstonePipelineOptions` class was written to do just that: it lets us create and use our own custom options in Apache Beam pipelines.
Without it, attempting to set custom options on the Pipeline will fail, since Apache Beam's PipelineOptions class rejects any options it doesn't recognize.
```python
disp_source(beam__common.FIDSCapstonePipelineOptions)
```
```python
class FIDSCapstonePipelineOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_argument(
            f'--{opt_name_to_command_line_opt(fidscs_globals.OPT_NAME_MAX_TARGET_VIDEOS)}',
            default=None
        )
        parser.add_argument(
            f'--{opt_name_to_command_line_opt(fidscs_globals.OPT_NAME_WORK_DIR)}',
            default=None,
        )
        parser.add_argument(
            f'--{opt_name_to_command_line_opt(fidscs_globals.OPT_NAME_DATA_DIR)}',
            default=None,
        )
        parser.add_argument(
            f'--{opt_name_to_command_line_opt(fidscs_globals.OPT_NAME_TMP_DIR)}',
            default=None,
        )
        parser.add_argument(
            f'--{opt_name_to_command_line_opt(fidscs_globals.OPT_NAME_VIDEO_DIR)}',
            default=None,
        )
        parser.add_argument(
            f'--{opt_name_to_command_line_opt(fidscs_globals.OPT_NAME_STITCHED_VIDEO_FRAMES_DIR)}',
            default=None,
        )
        parser.add_argument(
            f'--{opt_name_to_command_line_opt(fidscs_globals.OPT_NAME_CORPUS_DIR)}',
            default=None,
        )
        parser.add_argument(
            f'--{opt_name_to_command_line_opt(fidscs_globals.OPT_NAME_CORPUS_DS_PATH)}',
            default=None,
        )
        parser.add_argument(
            f'--{opt_name_to_command_line_opt(fidscs_globals.OPT_NAME_DOCUMENT_ASL_CONSULTANT_DS_PATH)}',
            default=None,
        )
        parser.add_argument(
            f'--{opt_name_to_command_line_opt(fidscs_globals.OPT_NAME_ASL_CONSULTANT_DS_PATH)}',
            default=None,
        )
        parser.add_argument(
            f'--{opt_name_to_command_line_opt(fidscs_globals.OPT_NAME_VIDEO_INDEXES_DIR)}',
            default=None,
        )
        parser.add_argument(
            f'--{opt_name_to_command_line_opt(fidscs_globals.OPT_NAME_SELECTED_VIDEO_INDEX_PATH)}',
            default=None,
        )
        parser.add_argument(
            f'--{opt_name_to_command_line_opt(fidscs_globals.OPT_NAME_VIDEO_DS_PATH)}',
            default=None,
        )
        parser.add_argument(
            f'--{opt_name_to_command_line_opt(fidscs_globals.OPT_NAME_VIDEO_SEGMENT_DS_PATH)}',
            default=None,
        )
        parser.add_argument(
            f'--{opt_name_to_command_line_opt(fidscs_globals.OPT_NAME_VIDEO_FRAME_DS_PATH)}',
            default=None,
        )
        parser.add_argument(
            f'--{opt_name_to_command_line_opt(fidscs_globals.OPT_NAME_UTTERANCE_DS_PATH)}',
            default=None,
        )
        parser.add_argument(
            f'--{opt_name_to_command_line_opt(fidscs_globals.OPT_NAME_UTTERANCE_VIDEO_DS_PATH)}',
            default=None,
        )
        parser.add_argument(
            f'--{opt_name_to_command_line_opt(fidscs_globals.OPT_NAME_UTTERANCE_TOKEN_DS_PATH)}',
            default=None,
        )
        parser.add_argument(
            f'--{opt_name_to_command_line_opt(fidscs_globals.OPT_NAME_UTTERANCE_TOKEN_FRAME_DS_PATH)}',
            default=None,
        )
        parser.add_argument(
            f'--{opt_name_to_command_line_opt(fidscs_globals.OPT_NAME_VOCABULARY_DS_PATH)}',
            default=None,
        )
```

The companion function `make_fids_options_dict` builds the dict of custom option values that gets passed into `PipelineOptions`:

```python
def make_fids_options_dict(work_dir, max_target_videos=-1, beam_gcp_project=fidscs_globals.GCP_PROJECT):
    data_dir = fileio.path_join(work_dir, fidscs_globals.DATA_DIR_NAME)
    tmp_dir = fileio.path_join(data_dir, fidscs_globals.TMP_DIR_NAME)
    videos_dir = fileio.path_join(data_dir, fidscs_globals.VIDEO_DIR_NAME)
    stitched_video_frames_dir = fileio.path_join(data_dir, fidscs_globals.STICHED_VIDEO_FRAMES_DIR_NAME)
    corpus_dir = fileio.path_join(tmp_dir, fidscs_globals.CORPUS_BASE)
    corpus_ds_path = fileio.path_join(data_dir, fidscs_globals.CORPUS_DS_FNAME)
    document_asl_cconsultant_ds_path = fileio.path_join(data_dir, fidscs_globals.DOCUMENT_ASL_CONSULTANT_DS_FNAME)
    asl_consultant_ds_path = fileio.path_join(data_dir, fidscs_globals.ASL_CONSULTANT_DS_FNAME)
    video_indexes_dir = fileio.path_join(tmp_dir, fidscs_globals.VIDEO_INDEX_BASE)
    selected_video_index_path = fileio.path_join(video_indexes_dir, fidscs_globals.SELECTED_VIDEO_INDEX)
    video_ds_path = fileio.path_join(data_dir, fidscs_globals.VIDEO_DS_FNAME)
    video_segment_ds_path = fileio.path_join(data_dir, fidscs_globals.VIDEO_SEGMENT_DS_FNAME)
    video_frame_ds_path = fileio.path_join(data_dir, fidscs_globals.VIDEO_FRAME_DS_FNAME)
    utterance_ds_path = fileio.path_join(data_dir, fidscs_globals.UTTERANCE_DS_FNAME)
    utterance_video_ds_path = fileio.path_join(data_dir, fidscs_globals.UTTERANCE_VIDEO_DS_FNAME)
    utterance_token_ds_path = fileio.path_join(data_dir, fidscs_globals.UTTERANCE_TOKEN_DS_FNAME)
    utterance_token_frame_ds_path = fileio.path_join(data_dir, fidscs_globals.UTTERANCE_TOKEN_FRAME_DS_FNAME)
    vocabulary_ds_path = fileio.path_join(data_dir, fidscs_globals.VOCABULARY_DS_FNAME)
    return {
        fidscs_globals.OPT_NAME_PROJECT: beam_gcp_project,
        fidscs_globals.OPT_NAME_MAX_TARGET_VIDEOS: max_target_videos,
        fidscs_globals.OPT_NAME_WORK_DIR: work_dir,
        fidscs_globals.OPT_NAME_DATA_DIR: data_dir,
        fidscs_globals.OPT_NAME_TMP_DIR: tmp_dir,
        fidscs_globals.OPT_NAME_VIDEO_DIR: videos_dir,
        fidscs_globals.OPT_NAME_STITCHED_VIDEO_FRAMES_DIR: stitched_video_frames_dir,
        fidscs_globals.OPT_NAME_CORPUS_DIR: corpus_dir,
        fidscs_globals.OPT_NAME_CORPUS_DS_PATH: corpus_ds_path,
        fidscs_globals.OPT_NAME_DOCUMENT_ASL_CONSULTANT_DS_PATH: document_asl_cconsultant_ds_path,
        fidscs_globals.OPT_NAME_ASL_CONSULTANT_DS_PATH: asl_consultant_ds_path,
        fidscs_globals.OPT_NAME_VIDEO_INDEXES_DIR: video_indexes_dir,
        fidscs_globals.OPT_NAME_SELECTED_VIDEO_INDEX_PATH: selected_video_index_path,
        fidscs_globals.OPT_NAME_VIDEO_DS_PATH: video_ds_path,
        fidscs_globals.OPT_NAME_VIDEO_SEGMENT_DS_PATH: video_segment_ds_path,
        fidscs_globals.OPT_NAME_VIDEO_FRAME_DS_PATH: video_frame_ds_path,
        fidscs_globals.OPT_NAME_UTTERANCE_DS_PATH: utterance_ds_path,
        fidscs_globals.OPT_NAME_UTTERANCE_VIDEO_DS_PATH: utterance_video_ds_path,
        fidscs_globals.OPT_NAME_UTTERANCE_TOKEN_DS_PATH: utterance_token_ds_path,
        fidscs_globals.OPT_NAME_UTTERANCE_TOKEN_FRAME_DS_PATH: utterance_token_frame_ds_path,
        fidscs_globals.OPT_NAME_VOCABULARY_DS_PATH: vocabulary_ds_path
    }
```
There are two top-level functions used by the "bootstrap-vid-index" pipeline, in this order:
1. `data_extractor__beam.pl__1__bootstrap_target_video_index`
2. `data_extractor__beam.pl__2__write_target_vid_index_csv`
Let's examine the source code for `data_extractor__beam.pl__1__bootstrap_target_video_index`...
The following Python source code illustrates the programming paradigm used in all Apache Beam (the name stands for **B**atch and Str**eam** processing) pipelines.
```python
disp_source(data_extractor__beam.pl__1__bootstrap_target_video_index)
```
```python
def pl__1__bootstrap_target_video_index(pl):
    if not fileio.file_path_exists(pl._options._all_options[fidscs_globals.OPT_NAME_SELECTED_VIDEO_INDEX_PATH], pl._options._all_options)[0]:
        sel_vid_index_path = (
            pl
            | "Beam PL: create initial pcoll containing information for boostrap_target_video_index" >> beam.Create(
                [ # one row containing dict of:
                    # 1. url of video indexes archive
                    # 2. local destination (path) for the downloaded archive
                    # 3. local destination (path) which will receive the extracted archive csv files (there are more than one)
                    # 4. final path to the selected videx index csv
                    # (note that the dict is not laid out in the above order)
                    {
                        'vid_indexes_dir': pl._options._all_options[fidscs_globals.OPT_NAME_VIDEO_INDEXES_DIR],
                        'sel_vid_index_path': pl._options._all_options[fidscs_globals.OPT_NAME_SELECTED_VIDEO_INDEX_PATH],
                        'video_indexes_archive': fidscs_globals.VIDEO_INDEXES_ARCHIVE,
                        'tmp_dir': pl._options._all_options[fidscs_globals.OPT_NAME_TMP_DIR],
                        'video_ds_path': pl._options._all_options[fidscs_globals.OPT_NAME_VIDEO_DS_PATH]
                    }
                ]
            )
            # | "Beam PL: bootstrap target video index" >> beam.Map(boostrap_target_video_index) # boostrap_target_video_index outputs SELECTED_VIDEO_INDEX_PATH but beam.Map() wraps this in a pcoll and is fed to...
            | "Beam PL: bootstrap target video index" >> beam.ParDo(TargetVideoIndexBootstrapper(pl._options._all_options))
        )
    else:
        sel_vid_index_path = (
            pl
            | "Beam PL: create initial pcoll containing path to existing sel_vid_index" >> beam.Create([pl._options._all_options[fidscs_globals.OPT_NAME_SELECTED_VIDEO_INDEX_PATH]])
            | "Beam PL: print path to existing sel_vid_index" >> beam.ParDo(beam__common.PipelinePcollPrinter(msg="FOUND EXISTING SEL VID INDEX"))
        )
    full_target_vid_index_schemad_pcoll = (
        sel_vid_index_path
        | "Beam PL: read video index into pcoll" >> beam.FlatMap(beam__common.load_vid_index_csv)
        | "Beam PL: apply schema to video index pcoll" >> beam.Map(
            lambda x: beam.Row(
                target_video_filename=str(urllib.parse.quote(x[fidscs_globals.SCHEMA_COL_NAMES__VIDEO_INDEX[0]])),
                video_seq_id=int(x[fidscs_globals.SCHEMA_COL_NAMES__VIDEO_INDEX[1]]),
                perspective_cam_id=int(x[fidscs_globals.SCHEMA_COL_NAMES__VIDEO_INDEX[2]]),
                compressed_mov_url=str(x[fidscs_globals.SCHEMA_COL_NAMES__VIDEO_INDEX[3]]),
                uncompressed_avi_url=str(x[fidscs_globals.SCHEMA_COL_NAMES__VIDEO_INDEX[4]]),
                uncompressed_avi_mirror_1_url=str(x[fidscs_globals.SCHEMA_COL_NAMES__VIDEO_INDEX[5]]),
                uncompressed_avi_mirror_2_url=str(x[fidscs_globals.SCHEMA_COL_NAMES__VIDEO_INDEX[6]])
            )
        )
    )
    return full_target_vid_index_schemad_pcoll
```

And the source for `data_extractor__beam.pl__2__write_target_vid_index_csv`:

```python
def pl__2__write_target_vid_index_csv(full_target_vid_index_schemad_pcoll, d_pl_options):
    vid_index_path = fileio.path_join(d_pl_options[fidscs_globals.OPT_NAME_DATA_DIR], fidscs_globals.VIDEO_INDEX_BASE+'.csv')
    if not fileio.file_path_exists(vid_index_path, d_pl_options)[0]:
        sorted_full_target_vid_index_schemad_pcoll = beam__common.pl__X__sort_pcoll(full_target_vid_index_schemad_pcoll, pcoll_label="full_target_vid_index")
        sorted_corpus_index_csv_rows_pcoll = (
            sorted_full_target_vid_index_schemad_pcoll
            | "Beam PL: re-apply schema to sorted_full_target_vid_index" >> beam.Map(lambda sorted_full_target_vid_index_schemad_pcoll_row: beam.Row(
                    target_video_filename=sorted_full_target_vid_index_schemad_pcoll_row.target_video_filename,
                    video_seq_id=sorted_full_target_vid_index_schemad_pcoll_row.video_seq_id,
                    perspective_cam_id=sorted_full_target_vid_index_schemad_pcoll_row.perspective_cam_id,
                    compressed_mov_url=sorted_full_target_vid_index_schemad_pcoll_row.compressed_mov_url,
                    uncompressed_avi_url=sorted_full_target_vid_index_schemad_pcoll_row.uncompressed_avi_url,
                    uncompressed_avi_mirror_1_url=sorted_full_target_vid_index_schemad_pcoll_row.uncompressed_avi_mirror_1_url,
                    uncompressed_avi_mirror_2_url=sorted_full_target_vid_index_schemad_pcoll_row.uncompressed_avi_mirror_2_url
                )
            )
            | beam.Map(lambda sorted_full_target_vid_index_schemad_pcoll_row: beam__common.beam_row_to_csv_string(sorted_full_target_vid_index_schemad_pcoll_row))
        )
        return beam__common.pl__X__write_pcoll_to_csv(
            sorted_corpus_index_csv_rows_pcoll,
            "TARGET-VIDEO-INDEX",
            fidscs_globals.VIDEO_INDEXES_ARCHIVE,
            fidscs_globals.SCHEMA_COL_NAMES__VIDEO_INDEX,
            d_pl_options
        )
    else:
        print(f"FOUND EXISTING VID INDEX: {vid_index_path}")
        return [vid_index_path]
```
 | 0 | 1 | 2 | 3 | 4 | 5 | 6
---|---|---|---|---|---|---|---
0 | http://csr.bu.edu/asl/sequences/compressed/mas... | 0 | 539_219_small_0.mov | 219 | |||
1 | http://csr.bu.edu/asl/sequences/compressed/sla... | 1 | 539_219_small_1.mov | http://csr.bu.edu/asl0/uploading/2000_02_29/se... | 219 | ||
2 | http://csr.bu.edu/asl/sequences/compressed/sla... | 2 | 539_219_small_2.mov | http://csr.bu.edu/asl0/uploading/2000_02_29/se... | http://csr.bu.edu/asl0/working/tapes1/2000_01_... | http://csr.bu.edu/asl0/uploading/2000_02_25/sl... | 219 |
3 | http://csr.bu.edu/asl/private/compressed/maste... | 0 | 548_master_small.mov | http://csr.bu.edu/asl0/working/tapes2/2000_06_... | http://csr.bu.edu/asl0/uploading/2000_06_07/ma... | 548 | |
4 | http://csr.bu.edu/asl/private/compressed/slave... | 1 | 548_slave1_small.mov | http://csr.bu.edu/asl0/working/tapes2/2000_06_... | 548 | ||
... | ... | ... | ... | ... | ... | ... | ... |
2607 | http://csr.bu.edu/asl/private/downloads/2001_8... | 0 | siblings_1066_small_0.mov | http://csr.bu.edu/asl0/working/tapes2/2001_07_... | http://csr.bu.edu/asl0/working/tapes2/2001_07_... | 1066 | |
2608 | http://csr.bu.edu/asl/private/downloads/2001_8... | 2 | siblings_1066_small_2.mov | http://csr.bu.edu/asl0/uploading/2001_07_24/sl... | 1066 | ||
2609 | http://csr.bu.edu/asl/private/downloads/2003_0... | 0 | whitewater_1049_small_0.mov | http://csr.bu.edu/asl0/working/tapes2/2001_07_... | http://csr.bu.edu/asl0/working/tapes2/2001_07_... | 1049 | |
2610 | http://csr.bu.edu/asl/private/downloads/2003_0... | 2 | whitewater_1049_small_2.mov | http://csr.bu.edu/asl0/working/tapes2/2001_07_... | http://csr.bu.edu/asl0/working/tapes2/2001_07_... | 1049 | |
2611 | http://csr.bu.edu/asl/private/downloads/2003_0... | 3 | whitewater_1049_small_3.mov | 1049 |
2612 rows × 7 columns
#### The full "bootstrap-vid-index" pipeline
```python
# create a new instance of the pipeline
pl = beam.Pipeline(options=pipeline_options)
full_target_vid_index_schemad_pcoll = data_extractor__beam.pl__1__bootstrap_target_video_index(pl)
_ = data_extractor__beam.pl__2__write_target_vid_index_csv(full_target_vid_index_schemad_pcoll, pl._options._all_options)
```
We know that observing the `full_target_vid_index_schemad_pcoll` `PCollection` won't be particularly useful, and the `PCollection` that `data_extractor__beam.pl__2__write_target_vid_index_csv` outputs simply holds the destination path after it successfully writes `full_target_vid_index_schemad_pcoll` to CSV.
#### The "download-videos-extract-frames" pipeline
The "download-videos-extract-frames" pipeline consists of four steps:
1. `beam__common.pl__1__read_target_vid_index_csv`
2. `data_extractor__beam.pl__2__filter_target_vid_index`
3. `data_extractor__beam.pl__3__parallel_download_videos`
4. `data_extractor__beam.pl__4__parallel_extract_target_video_frames`
The function names used for each step suggest what they do. So I will only show source code for `data_extractor__beam.pl__3__parallel_download_videos` and `data_extractor__beam.pl__4__parallel_extract_target_video_frames`, and provide short explanations for steps 1 and 2.
Step 1 obviously reads `def pl__3__parallel_download_videos(vid_index_schemad_pcoll, d_pl_options, n_partitions=8):
vid_index_schemad_pcoll_download_partitions = (
vid_index_schemad_pcoll
| "Beam PL: partition schemad video index for download parallelization" >> beam.Partition(
lambda vid_index_row, num_partitions: random.randint(0,num_partitions-1),
n_partitions
)
)
partition_download_results = [None for i in range(n_partitions)]
for i, vid_index_schemad_pcoll_partition in enumerate(vid_index_schemad_pcoll_download_partitions):
p_label = f"p{i+1}"
p_label_indented = f"\t{p_label}"
p_dl_results = (
vid_index_schemad_pcoll_partition
| f"Beam PL: {p_label} gather download info for video segments" >> beam.ParDo(VideoSegmentInfoGatherer(d_pl_options))
| f"Beam PL: {p_label} download video segments" >> beam.ParDo(VideoSegmentDownloader(d_pl_options, f"{p_label_indented}"))
)
partition_download_results[i] = p_dl_results
merged_download_results = (
(p_dl_r for p_dl_r in partition_download_results)
| f"Beam PL: merge download results" >> beam.Flatten()
)
return merged_download_resultsdef pl__4__parallel_extract_target_video_frames(merged_download_results, d_pl_options, n_partitions=8):
"""
# ******************** EXTRACT SEGMENT-FRAMES IN PARALLEL: BEGIN ********************
# NOTE! THIS IS A CRUCIAL PIECE SO PAY ATTENTION TO THE FOLLOWING!!
# ********** --> IMPORTANT VIDEO-FRAME EXTRACTION PROCESSING INFORMATION<-- (BEGIN) **********
# We partitioned vid_index_schemad_pcoll so that video-SEGMENT downloads can occur independently.
# Downloading segments can occur independently since there is no correlation between each segment
# AS FAR AS DOWNLOADING IS CONCERNED.
#
# However, AS FAR AS EXTRACTION IS CONCERNED, each segment is related by the target video composed
# of each segment. The segment-videos themselves are ordered as they compose the final target
# video corresponding of ordered segment videos. For example, if a target video is composed of
# three segment videos, those segments occur in a certain order, as specified by the video index.
# Expanding upon this example, suppose target video "some_story_given_by_john_doe_0.mov", was recorded
# and saved in three corresponding video segments (to save space, I guess?)
# "some_story_given_by_john_doe_0_1.mov", "some_story_given_by_john_doe_0_2.mov", and
# "some_story_given_by_john_doe_0_3.mov". Note that the trailing "0" in the TARGET VIDEO filename
# indicates the camera perspective... all stories are potentially filmed from multiple synchronized
# camera perspectives/angles - there were obvioiusly multiple synchronized video recorders used in
# in that case. However, for this example, we are focusing on the target video for camera perspective 0.
# Anyway, as said, there are three segments which compose the target video. THESE SEGMENT VIDEOS
# ARE ORDERED (in time). THEREFORE, THE FRAMES COMPOSING EACH SEGMENT VIDEO ARE CONSEQUENTLY ORDERED
# (in time). THE BOTTOM LINE IS THAT WE NOW NEED TO GROUP SEGMENT VIDEOS, KEYED BY CORRESPONDING
# TARGET VIDEO. FURTHERMORE, THE COLLECTION OF SEGMENT VIDEOS FOR EACH TARGET VIDEO MUST BE ORDERED.
# THAT IS, WE MUST EXTRACT SEGMENT FRAMES AND SAVE THEM TO THE FILE SYSTEM WITH A FILE NAMING SCHEME
# THAT REFLECTS FRAME ORDER OF THE UNION OF ALL SEGMENT FRAMES. IF WE EXTRACT THE FRAMES OF EACH
# ORDERED SEGMENT, THEN A SIMPLE NUMERIC INDEX AS SEGMENT-FRAME FILENAME WILL DO THE TRICK.
# ********** --> IMPORTANT VIDEO-FRAME EXTRACTION PROCESSING INFORMATION<-- (END) **********
"""
# GROUP segment videos by target video
# note that this depends on the DAG - i.e. will not occur until partition_download_results are ready which, of course, does not occur until all videos have been downloaded
target_vid_seg_frame_extraction_partitions = (
merged_download_results
| f"Beam PL: group extraction info for video segments by target video" >> beam.GroupBy(lambda d: d['target_video_fname'])
| f"Beam PL: partition target video segment info for extraction parallelization" >> beam.Partition(
lambda vid_index_row, num_partitions: random.randint(0,num_partitions-1),
n_partitions
)
)
partition_extraction_results = [None for i in range(n_partitions)]
for i, p in enumerate(target_vid_seg_frame_extraction_partitions):
p_label = f"p{i+1}"
p_label_indented = f"\t{p_label}"
p_extraction_results = (
p
| f"Beam PL: {p_label} extract frames of each segment per target video" >> beam.ParDo(SegmentFrameExtractor(d_pl_options, f"{p_label_indented}", debug=False))
)
partition_extraction_results[i] = p_extraction_results
(
p_extraction_results
| f"Beam PL: {p_label} count target videos processed" >> beam.combiners.Count.Globally()
| f"Beam PL: {p_label} print target videos processed count" >> beam.ParDo(beam__common.PipelinePcollPrinter(label=p_label_indented, msg="target videos processed"))
)
merged_extraction_results = (
(p_extraction_results for p_extraction_results in partition_extraction_results)
| f"Beam PL: merge extraction results" >> beam.Flatten()
)
_ = (
merged_extraction_results
| "Beam PL: apply schema to merged extraction results pcoll" >> beam.Map(lambda x: beam.Row(
video_fname=str(x[0]),
n_stitched_frames=int(x[1])
))
# | "Beam PL: count total frames extracted" >> beam.transforms.sql.SqlTransform(f"SELECT SUM(n_stitched_frames) AS total_frames_extracted FROM PCOLLECTION") # this is VERY, VERY SLOW
| "Beam PL: select n_stitched_frames" >> beam.Map(lambda extraction_results_row: extraction_results_row.n_stitched_frames)
| "Beam PL: count total frames extracted" >> beam.CombineGlobally(sum)
| f"Beam PL: print total frames extracted" >> beam.ParDo(beam__common.PipelinePcollPrinter(msg="TOTAL FRAMES EXTRACTED"))
)
return merged_extraction_resultsdef beam_extract_frames(tpl_target_video_extraction_info, d_pl_options, label="", debug=False):
"""
expects tpl_target_video_extraction_info: (video_fname, list({'target_video_fname': target_video_fname, 'target_video_frames_dir': target_video_frames_dir, 'segment_url': str(url), 'segment_fname': str(url).split('/')[-1]}))
"""
# # log_results = []
target_video_fname = tpl_target_video_extraction_info[0]
segment_dicts = sorted(tpl_target_video_extraction_info[1], key=lambda segment_dict: segment_dict['segment_fname'])
target_video_frames_dir = segment_dicts[0]['target_video_frames_dir']
target_stitched_vid_name = target_video_frames_dir.split(os.path.sep)[-1]
if not fileio.dir_path_exists(target_video_frames_dir, d_pl_options)[0]:
fileio.make_dirs(target_video_frames_dir, d_pl_options)
video_dir = d_pl_options[fidscs_globals.OPT_NAME_VIDEO_DIR]
local_vid_segment_paths = [fileio.path_join(video_dir, segment_dict['segment_fname']) for segment_dict in segment_dicts]
for segment_dict in segment_dicts:
segment_dict['n_frames_extracted'] = 0
# create local dir for extraction (since OpenCV works only with local file system currently) if we have GCS filesystem
truly_local_vid_dir = None
truly_local_vid_dir_suffix = None
fs = FileSystems.get_filesystem(video_dir)
if type(fs) == GCSFileSystem:
truly_local_vid_dir_suffix = '/'.join(video_dir.split('/')[1:])
truly_local_vid_dir = '/tmp'+truly_local_vid_dir_suffix
# print(f"\t\tGCS storage detected! Extracting frames to truly_local_vid_dir {truly_local_vid_dir} (and will then upload to GCS after that)...")
if debug: print(f"\t\t{truly_local_vid_dir} exists: {fileio.dir_path_exists(truly_local_vid_dir, d_pl_options)}")
if not fileio.dir_path_exists(truly_local_vid_dir, d_pl_options)[0]:
if debug: print(f"\tcreating {truly_local_vid_dir}...")
truly_local_vid_dir_path_segs = truly_local_vid_dir.split('/')
if debug: print(f"\t\ttruly_local_vid_dir_path_segs: {truly_local_vid_dir_path_segs}")
s_cum_path = ''
for i, truly_local_vid_dir_path_seg in enumerate(truly_local_vid_dir_path_segs[1:]):
s_cum_path += '/'+truly_local_vid_dir_path_seg
fileio.make_dirs(s_cum_path, d_pl_options)
if debug: print(f"\t\t{s_cum_path} exists: {fileio.dir_path_exists(s_cum_path, d_pl_options)}")
vc_results = [capture_segment_video(local_vid_segment_path, truly_local_vid_dir, d_pl_options, debug=debug) for local_vid_segment_path in local_vid_segment_paths]
vid_caps = [vc_result[0] for vc_result in vc_results]
truly_local_target_video_frames_dirs = [vc_result[1] for vc_result in vc_results]
for seg_vid_cap in vid_caps:
seg_vid_cap.set(cv2.CAP_PROP_FPS, fidscs_globals.FPS)
frame_counts = list(map(lambda vc: int(vc.get(cv2.CAP_PROP_FRAME_COUNT)), vid_caps))
n_frames_expected = sum(frame_counts)
failed_target_videos = []
n_stitched_frames = 0
if n_frames_expected > 0:
# get count of existing stitched frames in target_stitched_vid_frames_dir
n_stitched_frames = len(fileio.list_dir(target_video_frames_dir, d_pl_options))
b_restitch = n_stitched_frames < n_frames_expected
n_stitched_frames = 0 if b_restitch else n_stitched_frames
for i, seg_vid_cap in enumerate(vid_caps):
segment_dict = segment_dicts[i]
_n_frames_expected = frame_counts[i]
if b_restitch:
success, frame = seg_vid_cap.read()
n_frames = 0
while success:
write_frame_to_file(
frame,
n_stitched_frames,
target_video_frames_dir,
truly_local_target_video_frames_dir=truly_local_target_video_frames_dirs[i],
debug=debug
)
n_frames += 1
n_stitched_frames += 1
success, frame = seg_vid_cap.read()
seg_path = local_vid_segment_paths[i]
seg_fname = seg_path.split(os.path.sep)[-1]
if n_frames != _n_frames_expected:
print(f"{label+': ' if len(label)>0 else ''}{fidscs_globals.VALIDATION_FATAL_ERROR_TEXT} Cannot stitch together target video {target_video_fname} since {_n_frames_expected} frames were expected from segment {seg_fname} ({seg_path}) but only {n_frames} were successfully extracted")
failed_target_videos.append(target_video_fname)
fail = True
break
else:
print(f"{label+': ' if len(label)>0 else ''}Added {n_stitched_frames} frames from segment {seg_fname} for target video {target_video_fname} (stitched-frames dir {target_video_frames_dir})")
else:
n_frames = _n_frames_expected
print(f"{label+': ' if len(label)>0 else ''}Found existing stiched-frames for {target_stitched_vid_name} ({n_stitched_frames} frames in {target_video_frames_dir})")
segment_dict['n_frames_extracted'] = n_frames
else:
if fidscs_globals.OUTPUT_INFO_LEVEL <= fidscs_globals.OUTPUT_INFO_LEVEL__WARNING:
print(f"\t{fidscs_globals.VALIDATION_WARNING_TEXT} Cannot stitch together target video {target_video_fname} since cv2.CAP_PROP_FRAME_COUNT reports segments have zero frames")
failed_target_videos.append(target_video_fname)
fail = True
if truly_local_vid_dir is not None:
for truly_local_target_video_frames_dir in truly_local_target_video_frames_dirs:
fileio.delete_file(truly_local_target_video_frames_dir, d_pl_options, recursive=True, debug=True)
return [(tpl_target_video_extraction_info[0], n_stitched_frames, segment_dicts)]def capture_segment_video(vid_segment_path, truly_local_vid_dir, d_pl_options, debug=False):
  video_fname = vid_segment_path.split('/')[-1]
  truly_local_target_video_frames_dir = None
  fs = FileSystems.get_filesystem(vid_segment_path)
  if type(fs) == GCSFileSystem:
    if debug: print(f"\n\n\tattempting to open video {vid_segment_path} for reading...")
    with fileio.open_file_read(vid_segment_path) as f:
      if debug: print(f"\t\tSUCCESS")
      # read the segment's bytes from GCS and write them to a truly local file
      buffer = f.read()
      truly_local_vid_segment_path = truly_local_vid_dir+'/'+video_fname
      if debug: print(f"\t\tattempting to write {truly_local_vid_segment_path} (truly) locally...")
      with fileio.open_file_write(truly_local_vid_segment_path) as f_local:
        f_local.write(buffer)
        f_local.close()
      if debug: print(f"\t\t\tSUCCESS")
      f.close()
    vid_segment_path = truly_local_vid_segment_path
    # (truly local) dir for saving frames
    truly_local_target_video_frames_dir = truly_local_vid_dir+'/'+fidscs_globals.STICHED_VIDEO_FRAMES_DIR_NAME+'/'+video_fname.split('.')[0]
    if debug: print(f"\t\t\tattempting to create directory {truly_local_target_video_frames_dir} (truly_local_target_video_frames_dir) for frames extracted from (truly local) video {truly_local_vid_segment_path}...")
    if not fileio.dir_path_exists(truly_local_target_video_frames_dir, d_pl_options)[0]:
      if debug: print(f"\t\t\t\tcreating {truly_local_target_video_frames_dir}...")
      fileio.make_dirs(truly_local_target_video_frames_dir, d_pl_options)
    truly_local_target_video_frames_dir_exists = fileio.dir_path_exists(truly_local_target_video_frames_dir, d_pl_options)[0]
    if debug: print(f"\t\t\t\t\t{truly_local_target_video_frames_dir} exists: {truly_local_target_video_frames_dir_exists}")
    if not truly_local_target_video_frames_dir_exists:
      raise Exception(f"required directory truly_local_target_video_frames_dir {truly_local_target_video_frames_dir} does not exist")
  if debug: print(f"\t\t\tattempting to capture (cv2.VideoCapture) video {vid_segment_path}...")
  # finally, capture the video bytes
  return cv2.VideoCapture(vid_segment_path), truly_local_target_video_frames_dir
```

```python
def write_frame_to_file(frame, index, target_video_frames_dir, truly_local_target_video_frames_dir=None, debug=False):
  local_frame_path = fileio.path_join(target_video_frames_dir, f"{index}.jpg") # this is the final frame path
  if truly_local_target_video_frames_dir is not None:
    # write truly local frame file first...
    truly_local_frame_path = truly_local_target_video_frames_dir+'/'+f"{index}.jpg"
    if debug: print(f"\t\t\t\t\t\tattempting to write {truly_local_frame_path} frame...")
    cv2.imwrite(truly_local_frame_path, frame)
    if debug: print(f"\t\t\t\t\t\t\tSUCCESS")
    if debug: print(f"\t\t\t\t\t\t\tattempting to open {truly_local_frame_path} for read...")
    # ...then copy its bytes to the final destination
    with fileio.open_file_read(truly_local_frame_path) as f_truly_local_frame:
      buffer = f_truly_local_frame.read()
      if debug: print(f"\t\t\t\t\t\t\t\tSUCCESS")
      if debug: print(f"\t\t\t\t\t\t\t\t\tattempting to open {local_frame_path} for final write...")
      with fileio.open_file_write(local_frame_path) as f_frame_final:
        f_frame_final.write(buffer)
        f_frame_final.close()
        if debug: print(f"\t\t\t\t\t\t\t\t\t\tSUCCESS")
      buffer = None
      f_truly_local_frame.close()
  else:
    if debug: print(f"\t\t\t\t\t\t\t\t\tattempting to write {local_frame_path} (final)...")
    cv2.imwrite(local_frame_path, frame)
    if debug: print(f"\t\t\t\t\t\t\t\t\t\tSUCCESS")
```
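Both helpers above work around the same constraint: OpenCV reads and writes only real local filesystem paths, so when the source or destination is GCS, the bytes get staged through a worker-local file. Here is a minimal, self-contained sketch of that "two-hop" pattern, using `tempfile` and plain `open()` in place of my `fileio` wrappers; `two_hop_write` is an illustrative name, not a function from the repo:

```python
import os, tempfile

def two_hop_write(data: bytes, final_path: str, staging_dir: str) -> None:
    """Stage bytes in a worker-local file, then copy them to the final destination."""
    staged = os.path.join(staging_dir, os.path.basename(final_path))
    with open(staged, 'wb') as f_local:      # hop 1: truly local write
        f_local.write(data)
    with open(staged, 'rb') as f_in, open(final_path, 'wb') as f_out:
        f_out.write(f_in.read())             # hop 2: copy to the final destination

# simulate a frame write with two temp dirs standing in for worker-local and GCS storage
with tempfile.TemporaryDirectory() as stage, tempfile.TemporaryDirectory() as dest:
    final = os.path.join(dest, '0.jpg')
    two_hop_write(b'\xff\xd8fakejpeg', final, stage)
    round_tripped = open(final, 'rb').read()
```

In the real pipeline, hop 2 goes through Beam's `FileSystems`-backed writer, which is what makes the destination transparently local or GCS.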
We are now ready to execute the "download-videos-extract-frames" pipeline. But first we must...
#### Create the "download-videos-extract-frames" pipeline execution graph
```python
job_suffix = 'download-videos-extract-frames'
job_name = f"{PIPELINE_BASE_JOB_NAME}--{job_suffix}"
options.update({
'job_name': job_name
})
pipeline_options = PipelineOptions(flags=[], **options) # easier to pass in options from command-line this way
print(f"PipelineOptions:\n{pipeline_options.get_all_options()}\n")
pl = beam.Pipeline(options=pipeline_options)
full_target_vid_index_schemad_pcoll = beam__common.pl__1__read_target_vid_index_csv(pl)
filtered_target_vid_index_schemad_pcoll = data_extractor__beam.pl__2__filter_target_vid_index(full_target_vid_index_schemad_pcoll, pl._options._all_options)
merged_download_results = data_extractor__beam.pl__3__parallel_download_videos(filtered_target_vid_index_schemad_pcoll, pl._options._all_options, n_partitions)
merged_extraction_results = data_extractor__beam.pl__4__parallel_extract_target_video_frames(merged_download_results, pl._options._all_options, n_partitions)
```
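Note that nothing has actually executed yet; the calls above only build the execution graph. For intuition about what `pl__3__parallel_download_videos` does with `n_partitions`, here is a pure-Python sketch (no Beam dependency) of the round-robin partition-then-flatten idea that `beam.Partition` and `beam.Flatten` implement; `partition_round_robin` and the file names are illustrative, not from the repo:

```python
def partition_round_robin(rows, n_partitions):
    """Mimics beam.Partition with an index-based partition function."""
    partitions = [[] for _ in range(n_partitions)]
    for i, row in enumerate(rows):
        partitions[i % n_partitions].append(row)
    return partitions

videos = [f"_15{i:02d}_small_0.mov" for i in range(8)]  # hypothetical names
parts = partition_round_robin(videos, 3)
# in Dataflow, each partition's downloads proceed on its own worker(s);
# beam.Flatten() then merges the per-partition results back into one pcoll
merged = [v for p in parts for v in p]
```

The point of partitioning here is simply to give the runner independent branches it can schedule in parallel.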
PipelineOptions:
{'runner': 'InteractiveRunner', 'streaming': False, 'beam_services': {}, 'type_check_strictness': 'DEFAULT_TO_ANY', 'type_check_additional': '', 'pipeline_type_check': True, 'runtime_type_check': False, 'performance_runtime_type_check': False, 'direct_runner_use_stacked_bundle': True, 'direct_runner_bundle_repeat': 0, 'direct_num_workers': 0, 'direct_running_mode': 'multi_threading', 'dataflow_endpoint': 'https://dataflow.googleapis.com', 'project': 'sc-fids-capstone', 'job_name': 'sc-fids-capstone-etl-demo--download-videos-extract-frames', 'staging_location': None, 'temp_location': None, 'region': None, 'service_account_email': None, 'no_auth': False, 'template_location': None, 'labels': None, 'update': False, 'transform_name_mapping': None, 'enable_streaming_engine': False, 'dataflow_kms_key': None, 'flexrs_goal': None, 'hdfs_host': None, 'hdfs_port': None, 'hdfs_user': None, 'hdfs_full_urls': False, 'num_workers': None, 'max_num_workers': None, 'autoscaling_algorithm': None, 'machine_type': None, 'disk_size_gb': None, 'disk_type': None, 'worker_region': None, 'worker_zone': None, 'zone': None, 'network': None, 'subnetwork': None, 'worker_harness_container_image': None, 'sdk_harness_container_image_overrides': None, 'use_public_ips': None, 'min_cpu_platform': None, 'dataflow_worker_jar': None, 'dataflow_job_file': None, 'experiments': None, 'number_of_worker_harness_threads': None, 'profile_cpu': False, 'profile_memory': False, 'profile_location': None, 'profile_sample_rate': 1.0, 'requirements_file': None, 'requirements_cache': None, 'setup_file': None, 'beam_plugins': None, 'save_main_session': False, 'sdk_location': 'default', 'extra_packages': None, 'prebuild_sdk_container_engine': None, 'prebuild_sdk_container_base_image': None, 'docker_registry_push_url': None, 'job_endpoint': None, 'artifact_endpoint': None, 'job_server_timeout': 60, 'environment_type': 'DOCKER', 'environment_config': None, 'environment_options': None, 'sdk_worker_parallelism': 1, 
'environment_cache_millis': 0, 'output_executable_path': None, 'artifacts_dir': None, 'job_port': 0, 'artifact_port': 0, 'expansion_port': 0, 'flink_master': '[auto]', 'flink_version': '1.10', 'flink_job_server_jar': None, 'flink_submit_uber_jar': False, 'spark_master_url': 'local[4]', 'spark_job_server_jar': None, 'spark_submit_uber_jar': False, 'spark_rest_url': None, 'on_success_matcher': None, 'dry_run': False, 'wait_until_finish_duration': None, 'pubsubRootUrl': None, 's3_access_key_id': None, 's3_secret_access_key': None, 's3_session_token': None, 's3_endpoint_url': None, 's3_region_name': None, 's3_api_version': None, 's3_verify': None, 's3_disable_ssl': False, 'fidscs_capstone_max_target_videos': 50, 'fidscs_capstone_work_dir': '/tmp', 'fidscs_capstone_data_dir': '/tmp/data', 'fidscs_capstone_tmp_dir': '/tmp/data/tmp', 'fidscs_capstone_videos_dir': '/tmp/data/videos', 'fidscs_capstone_stitched_video_frames_dir': '/tmp/data/stitched_video_frames', 'fidscs_capstone_corpus_dir': '/tmp/data/tmp/ncslgr-xml', 'fidscs_capstone_corpus_ds_path': '/tmp/data/ncslgr-corpus-index.csv', 'fidscs_capstone_document_asl_cconsultant_ds_path': '/tmp/data/document-consultant-index.csv', 'fidscs_capstone_asl_consultant_ds_path': '/tmp/data/consultant-index.csv', 'fidscs_capstone_video_indexes_dir': '/tmp/data/tmp/video_index-20120129', 'fidscs_capstone_selected_video_index_path': '/tmp/data/tmp/video_index-20120129/files_by_video_name.csv', 'fidscs_capstone_video_ds_path': '/tmp/data/document-consultant-targetvideo-index.csv', 'fidscs_capstone_video_segment_ds_path': '/tmp/data/document-consultant-targetvideo-segment-index.csv', 'fidscs_capstone_video_frame_ds_path': '/tmp/data/document-consultant-targetvideo-frame-index.csv', 'fidscs_capstone_utterance_ds_path': '/tmp/data/document-consultant-utterance-index.csv', 'fidscs_capstone_utterance_video_ds_path': '/tmp/data/document-consultant-utterance-targetvideo-index.csv', 'fidscs_capstone_utterance_token_ds_path': 
'/tmp/data/document-consultant-utterance-token-index.csv', 'fidscs_capstone_utterance_token_frame_ds_path': '/tmp/data/document-consultant-targetvideo-utterance-token-frame-index.csv', 'fidscs_capstone_vocabulary_ds_path': '/tmp/data/vocabulary-index.csv'}
This time we would like to observe the results (collected into Pandas `DataFrame`s)...
```python
# we require this in order to make use of ib.show() (which provides visualization of the pcolls specified) or ib.collect() (which creates a pandas dataframe from a pcoll)
# but all pcolls we wish to visualize must be created prior to executing the following line
ib.watch(locals())
```
And calling `ib.collect` forces the pipeline to actually run...
#### Run the full "download-videos-extract-frames" pipeline
We do this by collecting `PCollection`s into Pandas `DataFrame`s for viewing with *Interactive Beam*.
```python
print(f"\n\n****************************** Starting pipeline job: {job_name} ******************************")
df_full_target_vid_index_schemad_pcoll = ib.collect(full_target_vid_index_schemad_pcoll)
df_filtered_target_vid_index_schemad_pcoll = ib.collect(filtered_target_vid_index_schemad_pcoll)
df_merged_download_results = ib.collect(merged_download_results)
df_merged_extraction_results = ib.collect(merged_extraction_results)
print(f"****************************** Finished pipeline job: {job_name} ******************************")
```
****************************** Starting pipeline job: sc-fids-capstone-etl-demo--download-videos-extract-frames ******************************
| | target_video_filename | video_seq_id | perspective_cam_id | compressed_mov_url | uncompressed_avi_url | uncompressed_avi_mirror_1_url | uncompressed_avi_mirror_2_url |
|---|---|---|---|---|---|---|---|
| 0 | _1508_small_3.mov | 1508 | 3 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 1 | _1510_small_0.mov | 1510 | 0 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 2 | _1510_small_1.mov | 1510 | 1 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 3 | _1510_small_3.mov | 1510 | 3 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 4 | _1512_small_0.mov | 1512 | 0 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 5 | _1512_small_1.mov | 1512 | 1 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 6 | _1512_small_3.mov | 1512 | 3 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 7 | _1513_small_0.mov | 1513 | 0 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 8 | _1513_small_1.mov | 1513 | 1 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 9 | _1513_small_3.mov | 1513 | 3 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 10 | _1516_small_0.mov | 1516 | 0 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 11 | _1516_small_1.mov | 1516 | 1 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 12 | _1516_small_3.mov | 1516 | 3 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 13 | _1522_small_0.mov | 1522 | 0 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 14 | _1522_small_1.mov | 1522 | 1 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 15 | _1522_small_3.mov | 1522 | 3 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 16 | _1523_small_0.mov | 1523 | 0 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 17 | _1523_small_1.mov | 1523 | 1 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 18 | _1523_small_3.mov | 1523 | 3 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 19 | _1524_small_0.mov | 1524 | 0 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 20 | _1524_small_1.mov | 1524 | 1 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 21 | _1524_small_3.mov | 1524 | 3 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 22 | _1525_small_0.mov | 1525 | 0 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 23 | _1525_small_1.mov | 1525 | 1 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 24 | _1525_small_3.mov | 1525 | 3 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 25 | _1526_small_0.mov | 1526 | 0 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 26 | _1526_small_1.mov | 1526 | 1 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 27 | _1526_small_3.mov | 1526 | 3 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 28 | _1531_small_0.mov | 1531 | 0 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 29 | _1531_small_1.mov | 1531 | 1 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 30 | _1531_small_3.mov | 1531 | 3 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 31 | _1532_small_0.mov | 1532 | 0 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 32 | _1532_small_1.mov | 1532 | 1 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 33 | _1532_small_3.mov | 1532 | 3 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 34 | _1533_small_0.mov | 1533 | 0 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 35 | _1533_small_1.mov | 1533 | 1 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 36 | _1533_small_3.mov | 1533 | 3 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 37 | _1535_small_0.mov | 1535 | 0 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 38 | _1535_small_1.mov | 1535 | 1 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 39 | _1535_small_3.mov | 1535 | 3 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 40 | _1536_small_0.mov | 1536 | 0 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 41 | _1536_small_1.mov | 1536 | 1 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 42 | _1536_small_3.mov | 1536 | 3 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 43 | _1537_small_0.mov | 1537 | 0 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 44 | _1537_small_1.mov | 1537 | 1 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 45 | _1537_small_3.mov | 1537 | 3 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 46 | _1538_small_0.mov | 1538 | 0 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 47 | _1538_small_1.mov | 1538 | 1 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 48 | _1538_small_3.mov | 1538 | 3 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 49 | _1539_small_0.mov | 1539 | 0 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| | target_video_fname | target_video_frames_dir | segment_url | segment_fname |
|---|---|---|---|---|
| 0 | _1508_small_3.mov | /tmp/data/stitched_video_frames/_1508_small_3 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1508_small_3.mov |
| 1 | _1513_small_1.mov | /tmp/data/stitched_video_frames/_1513_small_1 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1513_small_1.mov |
| 2 | _1523_small_0.mov | /tmp/data/stitched_video_frames/_1523_small_0 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1523_small_0.mov |
| 3 | _1525_small_3.mov | /tmp/data/stitched_video_frames/_1525_small_3 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1525_small_3.mov |
| 4 | _1532_small_1.mov | /tmp/data/stitched_video_frames/_1532_small_1 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1532_small_1.mov |
| 5 | _1536_small_0.mov | /tmp/data/stitched_video_frames/_1536_small_0 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1536_small_0.mov |
| 6 | _1538_small_3.mov | /tmp/data/stitched_video_frames/_1538_small_3 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1538_small_3.mov |
| 7 | _1512_small_0.mov | /tmp/data/stitched_video_frames/_1512_small_0 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1512_small_0.mov |
| 8 | _1516_small_3.mov | /tmp/data/stitched_video_frames/_1516_small_3 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1516_small_3.mov |
| 9 | _1524_small_1.mov | /tmp/data/stitched_video_frames/_1524_small_1 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1524_small_1.mov |
| 10 | _1531_small_0.mov | /tmp/data/stitched_video_frames/_1531_small_0 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1531_small_0.mov |
| 11 | _1533_small_3.mov | /tmp/data/stitched_video_frames/_1533_small_3 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1533_small_3.mov |
| 12 | _1537_small_1.mov | /tmp/data/stitched_video_frames/_1537_small_1 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1537_small_1.mov |
| 13 | _1510_small_1.mov | /tmp/data/stitched_video_frames/_1510_small_1 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1510_small_1.mov |
| 14 | _1516_small_0.mov | /tmp/data/stitched_video_frames/_1516_small_0 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1516_small_0.mov |
| 15 | _1523_small_3.mov | /tmp/data/stitched_video_frames/_1523_small_3 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1523_small_3.mov |
| 16 | _1526_small_1.mov | /tmp/data/stitched_video_frames/_1526_small_1 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1526_small_1.mov |
| 17 | _1533_small_0.mov | /tmp/data/stitched_video_frames/_1533_small_0 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1533_small_0.mov |
| 18 | _1536_small_3.mov | /tmp/data/stitched_video_frames/_1536_small_3 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1536_small_3.mov |
| 19 | _1510_small_0.mov | /tmp/data/stitched_video_frames/_1510_small_0 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1510_small_0.mov |
| 20 | _1513_small_3.mov | /tmp/data/stitched_video_frames/_1513_small_3 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1513_small_3.mov |
| 21 | _1523_small_1.mov | /tmp/data/stitched_video_frames/_1523_small_1 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1523_small_1.mov |
| 22 | _1526_small_0.mov | /tmp/data/stitched_video_frames/_1526_small_0 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1526_small_0.mov |
| 23 | _1532_small_3.mov | /tmp/data/stitched_video_frames/_1532_small_3 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1532_small_3.mov |
| 24 | _1536_small_1.mov | /tmp/data/stitched_video_frames/_1536_small_1 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1536_small_1.mov |
| 25 | _1539_small_0.mov | /tmp/data/stitched_video_frames/_1539_small_0 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1539_small_0.mov |
| 26 | _1512_small_3.mov | /tmp/data/stitched_video_frames/_1512_small_3 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1512_small_3.mov |
| 27 | _1522_small_1.mov | /tmp/data/stitched_video_frames/_1522_small_1 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1522_small_1.mov |
| 28 | _1525_small_0.mov | /tmp/data/stitched_video_frames/_1525_small_0 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1525_small_0.mov |
| 29 | _1531_small_3.mov | /tmp/data/stitched_video_frames/_1531_small_3 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1531_small_3.mov |
| 30 | _1535_small_1.mov | /tmp/data/stitched_video_frames/_1535_small_1 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1535_small_1.mov |
| 31 | _1538_small_0.mov | /tmp/data/stitched_video_frames/_1538_small_0 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1538_small_0.mov |
| 32 | _1510_small_3.mov | /tmp/data/stitched_video_frames/_1510_small_3 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1510_small_3.mov |
| 33 | _1516_small_1.mov | /tmp/data/stitched_video_frames/_1516_small_1 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1516_small_1.mov |
| 34 | _1524_small_0.mov | /tmp/data/stitched_video_frames/_1524_small_0 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1524_small_0.mov |
| 35 | _1526_small_3.mov | /tmp/data/stitched_video_frames/_1526_small_3 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1526_small_3.mov |
| 36 | _1533_small_1.mov | /tmp/data/stitched_video_frames/_1533_small_1 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1533_small_1.mov |
| 37 | _1537_small_0.mov | /tmp/data/stitched_video_frames/_1537_small_0 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1537_small_0.mov |
| 38 | _1512_small_1.mov | /tmp/data/stitched_video_frames/_1512_small_1 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1512_small_1.mov |
| 39 | _1522_small_0.mov | /tmp/data/stitched_video_frames/_1522_small_0 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1522_small_0.mov |
| 40 | _1524_small_3.mov | /tmp/data/stitched_video_frames/_1524_small_3 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1524_small_3.mov |
| 41 | _1531_small_1.mov | /tmp/data/stitched_video_frames/_1531_small_1 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1531_small_1.mov |
| 42 | _1535_small_0.mov | /tmp/data/stitched_video_frames/_1535_small_0 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1535_small_0.mov |
| 43 | _1537_small_3.mov | /tmp/data/stitched_video_frames/_1537_small_3 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1537_small_3.mov |
| 44 | _1513_small_0.mov | /tmp/data/stitched_video_frames/_1513_small_0 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1513_small_0.mov |
| 45 | _1522_small_3.mov | /tmp/data/stitched_video_frames/_1522_small_3 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1522_small_3.mov |
| 46 | _1525_small_1.mov | /tmp/data/stitched_video_frames/_1525_small_1 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1525_small_1.mov |
| 47 | _1532_small_0.mov | /tmp/data/stitched_video_frames/_1532_small_0 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1532_small_0.mov |
| 48 | _1535_small_3.mov | /tmp/data/stitched_video_frames/_1535_small_3 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1535_small_3.mov |
| 49 | _1538_small_1.mov | /tmp/data/stitched_video_frames/_1538_small_1 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1538_small_1.mov |
| | segment_fname | frames | segment_dicts |
|---|---|---|---|
| 0 | _1523_small_3.mov | 129 | [{'target_video_fname': '_1523_small_3.mov', '... |
| 1 | _1531_small_0.mov | 121 | [{'target_video_fname': '_1531_small_0.mov', '... |
| 2 | _1537_small_0.mov | 71 | [{'target_video_fname': '_1537_small_0.mov', '... |
| 3 | _1523_small_1.mov | 129 | [{'target_video_fname': '_1523_small_1.mov', '... |
| 4 | _1525_small_1.mov | 71 | [{'target_video_fname': '_1525_small_1.mov', '... |
| 5 | _1525_small_0.mov | 71 | [{'target_video_fname': '_1525_small_0.mov', '... |
| 6 | _1538_small_1.mov | 89 | [{'target_video_fname': '_1538_small_1.mov', '... |
| 7 | _1536_small_3.mov | 81 | [{'target_video_fname': '_1536_small_3.mov', '... |
| 8 | _1537_small_1.mov | 71 | [{'target_video_fname': '_1537_small_1.mov', '... |
| 9 | _1513_small_3.mov | 83 | [{'target_video_fname': '_1513_small_3.mov', '... |
| 10 | _1524_small_3.mov | 65 | [{'target_video_fname': '_1524_small_3.mov', '... |
| 11 | _1513_small_1.mov | 83 | [{'target_video_fname': '_1513_small_1.mov', '... |
| 12 | _1536_small_0.mov | 81 | [{'target_video_fname': '_1536_small_0.mov', '... |
| 13 | _1516_small_3.mov | 81 | [{'target_video_fname': '_1516_small_3.mov', '... |
| 14 | _1533_small_3.mov | 93 | [{'target_video_fname': '_1533_small_3.mov', '... |
| 15 | _1510_small_0.mov | 89 | [{'target_video_fname': '_1510_small_0.mov', '... |
| 16 | _1512_small_3.mov | 85 | [{'target_video_fname': '_1512_small_3.mov', '... |
| 17 | _1523_small_0.mov | 129 | [{'target_video_fname': '_1523_small_0.mov', '... |
| 18 | _1531_small_3.mov | 121 | [{'target_video_fname': '_1531_small_3.mov', '... |
| 19 | _1526_small_1.mov | 81 | [{'target_video_fname': '_1526_small_1.mov', '... |
| 20 | _1524_small_0.mov | 65 | [{'target_video_fname': '_1524_small_0.mov', '... |
| 21 | _1532_small_3.mov | 63 | [{'target_video_fname': '_1532_small_3.mov', '... |
| 22 | _1508_small_3.mov | 89 | [{'target_video_fname': '_1508_small_3.mov', '... |
| 23 | _1522_small_1.mov | 81 | [{'target_video_fname': '_1522_small_1.mov', '... |
| 24 | _1531_small_1.mov | 121 | [{'target_video_fname': '_1531_small_1.mov', '... |
| 25 | _1535_small_3.mov | 111 | [{'target_video_fname': '_1535_small_3.mov', '... |
| 26 | _1510_small_1.mov | 89 | [{'target_video_fname': '_1510_small_1.mov', '... |
| 27 | _1524_small_1.mov | 65 | [{'target_video_fname': '_1524_small_1.mov', '... |
| 28 | _1533_small_1.mov | 93 | [{'target_video_fname': '_1533_small_1.mov', '... |
| 29 | _1512_small_1.mov | 85 | [{'target_video_fname': '_1512_small_1.mov', '... |
| 30 | _1538_small_0.mov | 89 | [{'target_video_fname': '_1538_small_0.mov', '... |
| 31 | _1538_small_3.mov | 89 | [{'target_video_fname': '_1538_small_3.mov', '... |
| 32 | _1516_small_0.mov | 81 | [{'target_video_fname': '_1516_small_0.mov', '... |
| 33 | _1516_small_1.mov | 81 | [{'target_video_fname': '_1516_small_1.mov', '... |
| 34 | _1536_small_1.mov | 81 | [{'target_video_fname': '_1536_small_1.mov', '... |
| 35 | _1535_small_0.mov | 111 | [{'target_video_fname': '_1535_small_0.mov', '... |
| 36 | _1525_small_3.mov | 71 | [{'target_video_fname': '_1525_small_3.mov', '... |
| 37 | _1535_small_1.mov | 111 | [{'target_video_fname': '_1535_small_1.mov', '... |
| 38 | _1512_small_0.mov | 85 | [{'target_video_fname': '_1512_small_0.mov', '... |
| 39 | _1526_small_3.mov | 81 | [{'target_video_fname': '_1526_small_3.mov', '... |
| 40 | _1537_small_3.mov | 71 | [{'target_video_fname': '_1537_small_3.mov', '... |
| 41 | _1532_small_1.mov | 63 | [{'target_video_fname': '_1532_small_1.mov', '... |
| 42 | _1522_small_0.mov | 81 | [{'target_video_fname': '_1522_small_0.mov', '... |
| 43 | _1513_small_0.mov | 83 | [{'target_video_fname': '_1513_small_0.mov', '... |
| 44 | _1533_small_0.mov | 93 | [{'target_video_fname': '_1533_small_0.mov', '... |
| 45 | _1510_small_3.mov | 89 | [{'target_video_fname': '_1510_small_3.mov', '... |
| 46 | _1526_small_0.mov | 81 | [{'target_video_fname': '_1526_small_0.mov', '... |
| 47 | _1539_small_0.mov | 111 | [{'target_video_fname': '_1539_small_0.mov', '... |
| 48 | _1532_small_0.mov | 63 | [{'target_video_fname': '_1532_small_0.mov', '... |
| 49 | _1522_small_3.mov | 81 | [{'target_video_fname': '_1522_small_3.mov', '... |
## Conclusion
This notebook, I hope, adequately demonstrates the data-processing parallelism that Apache Beam makes possible. Remember, the full pipeline executes in the GCP Dataflow environment; this project was really only feasible there.
The following statistics demonstrate this.
### Trying to do it locally, without Apache Beam
As I said at the start of this summary, when I first began work on this project I did some initial testing using the Pandas `DataFrame` approach.
First, I ran out of disk space, and my MacBook Pro was very upset with me.
Second, the sequence never completed.
I let the process run for more than 18 hours, and when I finally rebooted, it had progressed only about 20% of the way through the full list of videos.
Of course, since I required a way to relate other information (the ASL consultant, camera perspective, etc.) to the videos and frames, I had the "brilliant" idea of stuffing each and every image's bytes into a CSV table. No wonder I ran out of space.
I estimated that, had I let it finish (assuming I even had enough local disk space), it would have taken close to a week.
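A quick back-of-the-envelope calculation shows why that approach was doomed. The frame dimensions below are assumptions for illustration, not measured values:

```python
# estimate the size of storing raw frame bytes in a CSV (assumed numbers)
n_frames = 561_000
h, w, channels = 240, 360, 3            # roughly "half resolution" frames
raw_bytes_per_frame = h * w * channels  # uncompressed BGR pixels
raw_total_gb = n_frames * raw_bytes_per_frame / 1024**3
print(f"{raw_total_gb:.0f} GB uncompressed")  # → 135 GB
# and CSV-escaping binary data (e.g. base64) inflates that by another ~33%
```

Compare that with the 17 GB+ of JPEG-compressed frames reported below, and it is obvious why raw bytes in a table could never fit on a laptop.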
### And with Apache Beam using GCP Dataflow as the runner, coupled with GCS for storage
By contrast, **executed in GCP Dataflow, the entire pipeline took a little over 4 hours to complete, including the validation-train split** (which will be discussed in another blog post). See the snapshot below.
![Dataflow-Jobs-All-ETL](./Dataflow-Jobs-All-ETL.png)
#### Other Statistics
##### Total Frame Count: 561,000+
##### Total Disk Space: 17 GB+
##### 19 Datasets
But only 2 are used to train the DNN.
Please keep in mind that **I did this basically at half resolution, extracting at 30 FPS when the videos were originally recorded at 60 FPS**.
**I had to compromise *somewhere***.
### Final Report
The total calendar time from start to finish was a little over four months.
That seems like a long time. But this was a MASSIVE project, and by the time I realized I had bitten off more than I could chew, I was already committed.
So I persevered, did A LOT of research, and learned how to do Big Data.
During that time frame I worked on this nearly every day, taking time off only to sleep or when I needed a sanity check.
Most days I put in at least 10 hours.
One thing I will say is that Apache Beam is not for the faint of heart.
It is fairly low-level. This is not the Pandas framework, folks.
One needs to build nearly everything oneself.
BUT... the catch is that one can do Big Data. One can do data processing the likes of which would not otherwise be possible on a single, local machine.
That's BIG! It's not called "Big Data" for nothing.
In spite of all the pain, I give Apache Beam 5 out of 5 stars.
It was well worth the blood, sweat, and tears.
So... thank you and goodnight!