Big Data: Parallel Processing with Apache Beam and GCP Dataflow

Posted by Steven Contreras on February 23, 2021

Introduction

Welcome to my first “Big Data”(ish) project on GCP Dataflow.

This was a Deep Learning project wherein I tackled training a DNN (a Convolutional Neural Network followed by a Recurrent Neural Network - specifically, an LSTM).

The datasets were extracted from more than 2600 videos produced by the research conducted by Boston and Rutgers Universities, jointly under the National Center for Sign Language and Gesture Resources project.

Each video has up to three synchronized camera perspectives, captured by three independent video cameras.

The researchers produced corpus documents in the form of XML, with metadata therein linked to the corresponding videos. The videos were processed with the OpenCV Python library.

After all data was extracted, this resulted in 3,314 distinct “tokens”. One distinct ASL sign = one linguistic token or “word”, roughly the equivalent of an English word plus other ASL-specific linguistic aspects. There may not always be a direct translation, since an ASL utterance comprises much more than a simple English word. I am not a linguist, so I will just stop right there.

But the FIRST part of my project - captured by the source code in this repo and this summary - focuses on the “Big Data” aspect. I will start with how I end.

When I began this summary, I had no idea what I was in for. In the end, there are more than 2600 videos to process, which amount to more than 561,000 frames at a frame rate of 30 FPS - and that is only half the frame rate at which the videos were originally recorded.

After frame extraction, data needed to be related to each and every one of those 561,000+ frames - something the researchers had NOT done. This, I had to do myself.

So this part of my project focuses on making that pipeline doable. It had to be done using cloud storage and a parallel data-processing framework. In the end I chose Apache Beam with GCP Dataflow as the runner - Apache Beam because its “raison d’être” is portability across runners, including local and interactive ones (Jupyter Notebooks).
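To make that portability concrete, here is a minimal, self-contained sketch (not this project's actual pipeline): the same transforms run unchanged whether the runner is the local `DirectRunner` or GCP's `DataflowRunner` - only the `PipelineOptions` change.

```python
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# swap 'DirectRunner' for 'DataflowRunner' (plus project/region/temp_location
# options) and the very same pipeline code runs on GCP Dataflow instead
options = PipelineOptions(flags=[], runner='DirectRunner')

with beam.Pipeline(options=options) as pl:
    _ = (
        pl
        | 'Create' >> beam.Create(['hello', 'big', 'data'])
        | 'Upper' >> beam.Map(str.upper)
        | 'Print' >> beam.Map(print)
    )
```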

I will leave the report of my initial failures for the end.

This write-up is more concerned with my eventual success using Apache Beam on GCP Dataflow.

So, without further ado, let me dive right in!

Preliminaries

This notebook assumes that all setup steps from GCP-readme.md have been followed.

This notebook does not demonstrate the full ETL pipeline used in production (executed on GCP Dataflow).

We only investigate the first couple of major steps of the production pipeline in order to demonstrate the general idea of the process. Note that the production pipeline is really a sequence of (sub) pipelines that are daisy-chained together in a particular order, since later pipelines depend on the outputs of earlier ones.
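A rough sketch of what that daisy-chaining looks like at the driver level - the stage-builder functions here are hypothetical stand-ins for this project's `pl__*` functions:

```python
import apache_beam as beam

def run_stage(build_stage, pipeline_options):
    """Build one sub-pipeline and block until it finishes."""
    pl = beam.Pipeline(options=pipeline_options)
    build_stage(pl)                # attach this stage's transforms to the pipeline
    pl.run().wait_until_finish()   # later stages read the files this one wrote

# e.g. (hypothetical stand-ins):
#   run_stage(build_bootstrap_vid_index_stage, pipeline_options)
#   run_stage(build_download_videos_extract_frames_stage, pipeline_options)
```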

In this notebook, we demonstrate the first two pipelines, which accomplish the following:

### 1. Bootstrap the video index

Substeps are:

1. Download the video index (archive).
2. Extract it.
3. Write it to the destination directory as a CSV.

The video index drives the entire full-blown production pipeline.

It tells us the filenames of the target videos.
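Stripped of the Beam plumbing, those three substeps boil down to something like this standard-library sketch (the paths and filenames mirror the ones this notebook uses; the real pipeline does this inside a Beam transform, in-memory, and applies a schema before writing the final CSV):

```python
import io
import shutil
import urllib.request
import zipfile

ARCHIVE_URL = 'http://www.bu.edu/asllrp/ncslgr-for-download/video_index-20120129.zip'
VID_INDEXES_DIR = '/tmp/data/tmp/video_index-20120129'

# 1. download the video-index archive (in-memory)
with urllib.request.urlopen(ARCHIVE_URL) as resp:
    archive_bytes = io.BytesIO(resp.read())

# 2. extract it (the archive contains more than one csv)
with zipfile.ZipFile(archive_bytes) as zf:
    zf.extractall(VID_INDEXES_DIR)

# 3. write the selected index to the destination data directory as a csv
shutil.copy(f'{VID_INDEXES_DIR}/files_by_video_name.csv',
            '/tmp/data/video_index-20120129.csv')
```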


### 2. Download each video segment comprising the final target videos

The video index contains the URLs. Target videos are comprised of video segments since some of the final target videos can be rather large. Altogether, **the *production* pipeline (executed on GCP Dataflow) retrieves more than 2600 videos** produced by the research conducted by Boston and Rutgers Universities, jointly under the [National Center for Sign Language and Gesture Resources project](http://www.bu.edu/asllrp/ncslgr.html). This notebook will demonstrate retrieving 50 of those.

The implementation of the download leverages Apache Beam's parallelism in order to avoid the time it would take to accomplish this sequentially. Note that when executed locally, the Apache Beam SDK uses Docker containers for worker-node clusters. A cluster in this case consists of 8 worker nodes, since my local machine has 8 cores. In *production*, on GCP Dataflow, this can be scaled to your heart's content (though doing so, of course, costs more money).
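For intuition, a segment-downloading `DoFn` might look something like the following minimal sketch (an illustration only - not the repo's actual `VideoSegmentDownloader`; the `segment_url` key is assumed):

```python
import os
import urllib.request

import apache_beam as beam

class SegmentDownloader(beam.DoFn):  # hypothetical stand-in
    def __init__(self, dest_dir):
        self.dest_dir = dest_dir

    def process(self, segment_info):
        url = segment_info['segment_url']
        local_path = os.path.join(self.dest_dir, url.split('/')[-1])
        if not os.path.exists(local_path):  # skip segments already downloaded
            urllib.request.urlretrieve(url, local_path)
        yield {**segment_info, 'local_path': local_path}

# usage: segment_infos_pcoll | beam.ParDo(SegmentDownloader('/tmp/data/videos'))
```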


### 3. Use the `OpenCV` library to extract all frames (from each segment) for each target video

This step leverages Apache Beam's parallelism as well. But we MUST take care to ensure that a single worker extracts the frames of all segments associated with a given target video. This is because frames are ordered/sequenced. Allowing two different workers to extract frames of different segments associated with the same final target video would likely result in frames being extracted out of order (due to parallelism). Therefore, we partition the extraction task by final target video in order to ensure that a single worker handles all segments associated with a single target video. But we do want parallelism to occur at the final-target-video level.

In the end, **in production, the pipeline extracts more than 561,000 frames (images) from the source target videos**! Of course, in this demonstration we will be extracting much less - only 50 of the more than 2600 available videos will be downloaded and processed (frames extracted). Still, extracting from 50 videos will amount to thousands of frames.

```python
%load_ext autoreload
%autoreload 2
from __future__ import absolute_import
import apache_beam as beam
import apache_beam.runners.interactive.interactive_beam as ib
from api import beam__common, fileio, fidscs_globals
from api.fidscs_globals import disp_source
from api import data_extractor__beam
from apache_beam.options.pipeline_options import PipelineOptions
```

## Constants to be used in this notebook

```python
WORK_DIR = '/tmp'
MAX_TARGET_VIDEOS = 50  # set to -1 for all in production but not in this interactive notebook! That will result in extracting more than 561,000 images from more than 2600 videos! (onto your local machine)
PIPELINE_BASE_JOB_NAME = 'sc-fids-capstone-etl-demo'
```

## Use Apache Beam PipelineOptions for any global settings

We MUST do this since the point of Apache Beam is to enable parallelism (in processing). How this is accomplished is beyond the scope of this notebook. But suffice it to say that any notion of a global variable cannot be implemented in the manner one normally is - e.g. with Python global variables. However, a `PipelineOptions` object IS passed to each and every worker node by Apache Beam. Therefore, we accomplish global settings shared by all workers - e.g. the working directory and the final destination filepaths to be output by the pipeline - by passing the would-be global settings to `PipelineOptions`, which Apache Beam requires in order to bootstrap each worker node.

### Custom Apache Beam Pipeline options

The `beam__common.FIDSCapstonePipelineOptions` class was written to do just that, allowing us to create and use our own custom options in Apache Beam pipelines. Without it, attempting to set custom options on the Pipeline would fail, since Apache Beam's `PipelineOptions` class rejects any options it doesn't recognize.

```python
disp_source(beam__common.FIDSCapstonePipelineOptions)
```

```python
class FIDSCapstonePipelineOptions(PipelineOptions):
  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_argument(
      f'--{opt_name_to_command_line_opt(fidscs_globals.OPT_NAME_MAX_TARGET_VIDEOS)}',
      default=None
    )
    parser.add_argument(
      f'--{opt_name_to_command_line_opt(fidscs_globals.OPT_NAME_WORK_DIR)}',
      default=None,
    )
    parser.add_argument(
      f'--{opt_name_to_command_line_opt(fidscs_globals.OPT_NAME_DATA_DIR)}',
      default=None,
    )
    parser.add_argument(
      f'--{opt_name_to_command_line_opt(fidscs_globals.OPT_NAME_TMP_DIR)}',
      default=None,
    )
    parser.add_argument(
      f'--{opt_name_to_command_line_opt(fidscs_globals.OPT_NAME_VIDEO_DIR)}',
      default=None,
    )
    parser.add_argument(
      f'--{opt_name_to_command_line_opt(fidscs_globals.OPT_NAME_STITCHED_VIDEO_FRAMES_DIR)}',
      default=None,
    )
    parser.add_argument(
      f'--{opt_name_to_command_line_opt(fidscs_globals.OPT_NAME_CORPUS_DIR)}',
      default=None,
    )
    parser.add_argument(
      f'--{opt_name_to_command_line_opt(fidscs_globals.OPT_NAME_CORPUS_DS_PATH)}',
      default=None,
    )
    parser.add_argument(
      f'--{opt_name_to_command_line_opt(fidscs_globals.OPT_NAME_DOCUMENT_ASL_CONSULTANT_DS_PATH)}',
      default=None,
    )
    parser.add_argument(
      f'--{opt_name_to_command_line_opt(fidscs_globals.OPT_NAME_ASL_CONSULTANT_DS_PATH)}',
      default=None,
    )
    parser.add_argument(
      f'--{opt_name_to_command_line_opt(fidscs_globals.OPT_NAME_VIDEO_INDEXES_DIR)}',
      default=None,
    )
    parser.add_argument(
      f'--{opt_name_to_command_line_opt(fidscs_globals.OPT_NAME_SELECTED_VIDEO_INDEX_PATH)}',
      default=None,
    )
    parser.add_argument(
      f'--{opt_name_to_command_line_opt(fidscs_globals.OPT_NAME_VIDEO_DS_PATH)}',
      default=None,
    )
    parser.add_argument(
      f'--{opt_name_to_command_line_opt(fidscs_globals.OPT_NAME_VIDEO_SEGMENT_DS_PATH)}',
      default=None,
    )
    parser.add_argument(
      f'--{opt_name_to_command_line_opt(fidscs_globals.OPT_NAME_VIDEO_FRAME_DS_PATH)}',
      default=None,
    )
    parser.add_argument(
      f'--{opt_name_to_command_line_opt(fidscs_globals.OPT_NAME_UTTERANCE_DS_PATH)}',
      default=None,
    )
    parser.add_argument(
      f'--{opt_name_to_command_line_opt(fidscs_globals.OPT_NAME_UTTERANCE_VIDEO_DS_PATH)}',
      default=None,
    )
    parser.add_argument(
      f'--{opt_name_to_command_line_opt(fidscs_globals.OPT_NAME_UTTERANCE_TOKEN_DS_PATH)}',
      default=None,
    )
    parser.add_argument(
      f'--{opt_name_to_command_line_opt(fidscs_globals.OPT_NAME_UTTERANCE_TOKEN_FRAME_DS_PATH)}',
      default=None,
    )
    parser.add_argument(
      f'--{opt_name_to_command_line_opt(fidscs_globals.OPT_NAME_VOCABULARY_DS_PATH)}',
      default=None,
    )
```
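Note that the `opt_name_to_command_line_opt` helper is not shown in this notebook. Judging from how it is called (the `--` prefix is added at the call site) and from the underscore-style option names visible in the `PipelineOptions` dump further below, a minimal stand-in could be as simple as the following sketch (an assumption on my part, not the repo's actual helper):

```python
def opt_name_to_command_line_opt(opt_name):
    # hypothetical stand-in: 'fidscs_capstone_work_dir' is returned as-is and
    # becomes '--fidscs_capstone_work_dir' once the caller prepends '--'
    return opt_name
```

Registered this way, the custom options travel inside the `PipelineOptions` object that Apache Beam hands to every worker, and they can be read back via `pipeline_options.get_all_options()` or `pipeline_options.view_as(FIDSCapstonePipelineOptions)`.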
### PipelineOptions Initialization

For this notebook, we execute locally (vs. on GCP Dataflow) - that is, we use Apache Beam's `DirectRunner`. Actually, we use a variant of it - the `InteractiveRunner` - geared specifically for running in notebooks. But it is still run locally. Some `PipelineOptions` options differ (or are not needed), relative to the `DataflowRunner`. To see the full implementation of how this differs from using the `Dataflow` runner, start by inspecting [run_cloud__etl.py](./run_cloud__etl.py) and follow the code.

Initializing the `dict` upon which `PipelineOptions` are based has been wrapped up within the `beam__common.make_fids_options_dict` function.

```python
disp_source(beam__common.make_fids_options_dict)
```

```python
def make_fids_options_dict(work_dir, max_target_videos=-1, beam_gcp_project=fidscs_globals.GCP_PROJECT):
  data_dir = fileio.path_join(work_dir, fidscs_globals.DATA_DIR_NAME)
  tmp_dir = fileio.path_join(data_dir, fidscs_globals.TMP_DIR_NAME)
  videos_dir = fileio.path_join(data_dir, fidscs_globals.VIDEO_DIR_NAME)
  stitched_video_frames_dir = fileio.path_join(data_dir, fidscs_globals.STICHED_VIDEO_FRAMES_DIR_NAME)
  corpus_dir = fileio.path_join(tmp_dir, fidscs_globals.CORPUS_BASE)
  corpus_ds_path = fileio.path_join(data_dir, fidscs_globals.CORPUS_DS_FNAME)
  document_asl_cconsultant_ds_path = fileio.path_join(data_dir, fidscs_globals.DOCUMENT_ASL_CONSULTANT_DS_FNAME)
  asl_consultant_ds_path = fileio.path_join(data_dir, fidscs_globals.ASL_CONSULTANT_DS_FNAME)
  video_indexes_dir = fileio.path_join(tmp_dir, fidscs_globals.VIDEO_INDEX_BASE)
  selected_video_index_path = fileio.path_join(video_indexes_dir, fidscs_globals.SELECTED_VIDEO_INDEX)
  video_ds_path = fileio.path_join(data_dir, fidscs_globals.VIDEO_DS_FNAME)
  video_segment_ds_path = fileio.path_join(data_dir, fidscs_globals.VIDEO_SEGMENT_DS_FNAME)
  video_frame_ds_path = fileio.path_join(data_dir, fidscs_globals.VIDEO_FRAME_DS_FNAME)
  utterance_ds_path = fileio.path_join(data_dir, fidscs_globals.UTTERANCE_DS_FNAME)
  utterance_video_ds_path = fileio.path_join(data_dir, fidscs_globals.UTTERANCE_VIDEO_DS_FNAME)
  utterance_token_ds_path = fileio.path_join(data_dir, fidscs_globals.UTTERANCE_TOKEN_DS_FNAME)
  utterance_token_frame_ds_path = fileio.path_join(data_dir, fidscs_globals.UTTERANCE_TOKEN_FRAME_DS_FNAME)
  vocabulary_ds_path = fileio.path_join(data_dir, fidscs_globals.VOCABULARY_DS_FNAME)
  return {
    fidscs_globals.OPT_NAME_PROJECT: beam_gcp_project,
    fidscs_globals.OPT_NAME_MAX_TARGET_VIDEOS: max_target_videos,
    fidscs_globals.OPT_NAME_WORK_DIR: work_dir,
    fidscs_globals.OPT_NAME_DATA_DIR: data_dir,
    fidscs_globals.OPT_NAME_TMP_DIR: tmp_dir,
    fidscs_globals.OPT_NAME_VIDEO_DIR: videos_dir,
    fidscs_globals.OPT_NAME_STITCHED_VIDEO_FRAMES_DIR: stitched_video_frames_dir,
    fidscs_globals.OPT_NAME_CORPUS_DIR: corpus_dir,
    fidscs_globals.OPT_NAME_CORPUS_DS_PATH: corpus_ds_path,
    fidscs_globals.OPT_NAME_DOCUMENT_ASL_CONSULTANT_DS_PATH: document_asl_cconsultant_ds_path,
    fidscs_globals.OPT_NAME_ASL_CONSULTANT_DS_PATH: asl_consultant_ds_path,
    fidscs_globals.OPT_NAME_VIDEO_INDEXES_DIR: video_indexes_dir,
    fidscs_globals.OPT_NAME_SELECTED_VIDEO_INDEX_PATH: selected_video_index_path,
    fidscs_globals.OPT_NAME_VIDEO_DS_PATH: video_ds_path,
    fidscs_globals.OPT_NAME_VIDEO_SEGMENT_DS_PATH: video_segment_ds_path,
    fidscs_globals.OPT_NAME_VIDEO_FRAME_DS_PATH: video_frame_ds_path,
    fidscs_globals.OPT_NAME_UTTERANCE_DS_PATH: utterance_ds_path,
    fidscs_globals.OPT_NAME_UTTERANCE_VIDEO_DS_PATH: utterance_video_ds_path,
    fidscs_globals.OPT_NAME_UTTERANCE_TOKEN_DS_PATH: utterance_token_ds_path,
    fidscs_globals.OPT_NAME_UTTERANCE_TOKEN_FRAME_DS_PATH: utterance_token_frame_ds_path,
    fidscs_globals.OPT_NAME_VOCABULARY_DS_PATH: vocabulary_ds_path
  }
```
#### PipelineOptions geared for the `InteractiveRunner`

Note that `InteractiveRunner` is a variant of the `DirectRunner` that allows us to run an Apache Beam pipeline within a Jupyter Notebook. Documentation for the `InteractiveRunner` can be found [here](https://cloud.google.com/dataflow/docs/guides/interactive-pipeline-development).

First, it must be reiterated that we must use this runner in order to collect (reduce) data kept in Apache Beam `Pcollection`s for conversion to Pandas `DataFrame`s for display within this notebook. Apache Beam `Pcollection`s can generally and *roughly* be thought of as Resilient Distributed Datasets. The documentation for Apache Beam `Pcollection`s can be found in the **Apache Beam Programming Guide** located [here](https://beam.apache.org/documentation/programming-guide/). But **`Pcollection`s are the basis for all processing within an Apache Beam pipeline**.

Also note that the `InteractiveRunner` is not really meant to be used for enterprise (read: "Big Data") pipelines. The runner used for production in this project is the `Dataflow` Google Cloud Platform runner. The reader is reminded that the point of this notebook, however, is to present a demonstration of only a subset of the full Apache Beam pipeline (used in this project).

```python
options = {
    # 'runner': 'DirectRunner',
    'runner': 'InteractiveRunner',
    'environment_type': 'DOCKER',
    'direct_num_workers': 0,  # 0 means use all available cores
    'direct_running_mode': 'multi_threading',  # ['in_memory', 'multi_threading', 'multi_processing']
    'streaming': False,  # set to True if data source is unbounded (e.g. GCP PubSub)
}
options.update(beam__common.make_fids_options_dict(WORK_DIR, max_target_videos=MAX_TARGET_VIDEOS))
```

#### Finally, instantiate the `PipelineOptions` (using the above `options` `dict`)

```python
job_suffix = 'boostrap-vid-index'
job_name = f"{PIPELINE_BASE_JOB_NAME}--{job_suffix}"
options.update({
    'job_name': job_name
})
pipeline_options = PipelineOptions(flags=[], **options)  # easier to pass in options from command-line this way
print(f"PipelineOptions:\n{pipeline_options.get_all_options()}\n")
```

```
PipelineOptions:
{'runner': 'InteractiveRunner', 'streaming': False, 'beam_services': {}, 'type_check_strictness': 'DEFAULT_TO_ANY', 'type_check_additional': '', 'pipeline_type_check': True, 'runtime_type_check': False, 'performance_runtime_type_check': False, 'direct_runner_use_stacked_bundle': True, 'direct_runner_bundle_repeat': 0, 'direct_num_workers': 0, 'direct_running_mode': 'multi_threading', 'dataflow_endpoint': 'https://dataflow.googleapis.com', 'project': 'sc-fids-capstone', 'job_name': 'sc-fids-capstone-etl-demo--boostrap-vid-index', 'staging_location': None, 'temp_location': None, 'region': None, 'service_account_email': None, 'no_auth': False, 'template_location': None, 'labels': None, 'update': False, 'transform_name_mapping': None, 'enable_streaming_engine': False, 'dataflow_kms_key': None, 'flexrs_goal': None, 'hdfs_host': None, 'hdfs_port': None, 'hdfs_user': None, 'hdfs_full_urls': False, 'num_workers': None, 'max_num_workers': None, 'autoscaling_algorithm': None, 'machine_type': None, 'disk_size_gb': None, 'disk_type': None, 'worker_region': None, 'worker_zone': None, 'zone': None, 'network': None, 'subnetwork': None, 'worker_harness_container_image': None, 'sdk_harness_container_image_overrides': None, 'use_public_ips': None, 'min_cpu_platform': None, 'dataflow_worker_jar': None, 'dataflow_job_file': None, 'experiments': None, 'number_of_worker_harness_threads': None, 'profile_cpu': False, 'profile_memory': False, 'profile_location': None, 'profile_sample_rate': 1.0, 'requirements_file': None, 'requirements_cache': None, 'setup_file': None, 'beam_plugins': None, 'save_main_session': False, 'sdk_location': 'default', 'extra_packages': None, 'prebuild_sdk_container_engine': None, 'prebuild_sdk_container_base_image': None, 'docker_registry_push_url': None, 'job_endpoint': None, 'artifact_endpoint': None, 'job_server_timeout': 60, 'environment_type': 'DOCKER', 'environment_config': None, 'environment_options': None, 'sdk_worker_parallelism': 1, 'environment_cache_millis': 0, 'output_executable_path': None, 'artifacts_dir': None, 'job_port': 0, 'artifact_port': 0, 'expansion_port': 0, 'flink_master': '[auto]', 'flink_version': '1.10', 'flink_job_server_jar': None, 'flink_submit_uber_jar': False, 'spark_master_url': 'local[4]', 'spark_job_server_jar': None, 'spark_submit_uber_jar': False, 'spark_rest_url': None, 'on_success_matcher': None, 'dry_run': False, 'wait_until_finish_duration': None, 'pubsubRootUrl': None, 's3_access_key_id': None, 's3_secret_access_key': None, 's3_session_token': None, 's3_endpoint_url': None, 's3_region_name': None, 's3_api_version': None, 's3_verify': None, 's3_disable_ssl': False, 'fidscs_capstone_max_target_videos': 50, 'fidscs_capstone_work_dir': '/tmp', 'fidscs_capstone_data_dir': '/tmp/data', 'fidscs_capstone_tmp_dir': '/tmp/data/tmp', 'fidscs_capstone_videos_dir': '/tmp/data/videos', 'fidscs_capstone_stitched_video_frames_dir': '/tmp/data/stitched_video_frames', 'fidscs_capstone_corpus_dir': '/tmp/data/tmp/ncslgr-xml', 'fidscs_capstone_corpus_ds_path': '/tmp/data/ncslgr-corpus-index.csv', 'fidscs_capstone_document_asl_cconsultant_ds_path': '/tmp/data/document-consultant-index.csv', 'fidscs_capstone_asl_consultant_ds_path': '/tmp/data/consultant-index.csv', 'fidscs_capstone_video_indexes_dir': '/tmp/data/tmp/video_index-20120129', 'fidscs_capstone_selected_video_index_path': '/tmp/data/tmp/video_index-20120129/files_by_video_name.csv', 'fidscs_capstone_video_ds_path': '/tmp/data/document-consultant-targetvideo-index.csv', 'fidscs_capstone_video_segment_ds_path': '/tmp/data/document-consultant-targetvideo-segment-index.csv', 'fidscs_capstone_video_frame_ds_path': '/tmp/data/document-consultant-targetvideo-frame-index.csv', 'fidscs_capstone_utterance_ds_path': '/tmp/data/document-consultant-utterance-index.csv', 'fidscs_capstone_utterance_video_ds_path': '/tmp/data/document-consultant-utterance-targetvideo-index.csv', 'fidscs_capstone_utterance_token_ds_path': '/tmp/data/document-consultant-utterance-token-index.csv', 'fidscs_capstone_utterance_token_frame_ds_path': '/tmp/data/document-consultant-targetvideo-utterance-token-frame-index.csv', 'fidscs_capstone_vocabulary_ds_path': '/tmp/data/vocabulary-index.csv'}
```

#### But before running the pipeline, create the necessary file structure within `WORK_DIR`

```python
if not fileio.dir_path_exists(options[fidscs_globals.OPT_NAME_DATA_DIR], options)[0]:
  fileio.make_dirs(options[fidscs_globals.OPT_NAME_DATA_DIR], options)
if not fileio.dir_path_exists(options[fidscs_globals.OPT_NAME_TMP_DIR], options)[0]:
  fileio.make_dirs(options[fidscs_globals.OPT_NAME_TMP_DIR], options)
if not beam__common.dataset_csv_files_exist(options):
  if not fileio.dir_path_exists(options[fidscs_globals.OPT_NAME_VIDEO_DIR], options)[0]:
    fileio.make_dirs(options[fidscs_globals.OPT_NAME_VIDEO_DIR], options)
  if not fileio.dir_path_exists(options[fidscs_globals.OPT_NAME_STITCHED_VIDEO_FRAMES_DIR], options)[0]:
    fileio.make_dirs(options[fidscs_globals.OPT_NAME_STITCHED_VIDEO_FRAMES_DIR], options)
```

We are now ready to execute the pipeline. But before doing so, let's discuss how it works.


There are two top-level functions used by the "boostrap-vid-index" pipeline, in this order:

1. `data_extractor__beam.pl__1__bootstrap_target_video_index`
2. `data_extractor__beam.pl__2__write_target_vid_index_csv`


Let's examine the source code for `data_extractor__beam.pl__1__bootstrap_target_video_index`. The following Python source code illustrates the programming paradigm used in all Apache Beam (the name stands for **B**atch and str**eam** processing) pipelines.

```python
disp_source(data_extractor__beam.pl__1__bootstrap_target_video_index)
```

```python
def pl__1__bootstrap_target_video_index(pl):
  if not fileio.file_path_exists(pl._options._all_options[fidscs_globals.OPT_NAME_SELECTED_VIDEO_INDEX_PATH], pl._options._all_options)[0]:
    sel_vid_index_path = (
      pl
      | "Beam PL: create initial pcoll containing information for boostrap_target_video_index" >> beam.Create(
          [ # one row containing dict of:
            #   1. url of video indexes archive
            #   2. local destination (path) for the downloaded archive
            #   3. local destination (path) which will receive the extracted archive csv files (there are more than one)
            #   4. final path to the selected videx index csv
            #   (note that the dict is not laid out in the above order)
            {
              'vid_indexes_dir': pl._options._all_options[fidscs_globals.OPT_NAME_VIDEO_INDEXES_DIR],
              'sel_vid_index_path': pl._options._all_options[fidscs_globals.OPT_NAME_SELECTED_VIDEO_INDEX_PATH],
              'video_indexes_archive': fidscs_globals.VIDEO_INDEXES_ARCHIVE,
              'tmp_dir': pl._options._all_options[fidscs_globals.OPT_NAME_TMP_DIR],
              'video_ds_path': pl._options._all_options[fidscs_globals.OPT_NAME_VIDEO_DS_PATH]
            }
          ]
        )
      # | "Beam PL: bootstrap target video index" >> beam.Map(boostrap_target_video_index) # boostrap_target_video_index outputs SELECTED_VIDEO_INDEX_PATH but beam.Map() wraps this in a pcoll and is fed to...
      | "Beam PL: bootstrap target video index" >> beam.ParDo(TargetVideoIndexBootstrapper(pl._options._all_options))
    )
  else:
    sel_vid_index_path = (
      pl
      | "Beam PL: create initial pcoll containing path to existing sel_vid_index" >> beam.Create([pl._options._all_options[fidscs_globals.OPT_NAME_SELECTED_VIDEO_INDEX_PATH]])
      | "Beam PL: print path to existing sel_vid_index" >> beam.ParDo(beam__common.PipelinePcollPrinter(msg="FOUND EXISTING SEL VID INDEX"))
    )
  full_target_vid_index_schemad_pcoll = (
    sel_vid_index_path
    | "Beam PL: read video index into pcoll" >> beam.FlatMap(beam__common.load_vid_index_csv)
    | "Beam PL: apply schema to video index pcoll" >> beam.Map(
        lambda x: beam.Row(
          target_video_filename=str(urllib.parse.quote(x[fidscs_globals.SCHEMA_COL_NAMES__VIDEO_INDEX[0]])),
          video_seq_id=int(x[fidscs_globals.SCHEMA_COL_NAMES__VIDEO_INDEX[1]]),
          perspective_cam_id=int(x[fidscs_globals.SCHEMA_COL_NAMES__VIDEO_INDEX[2]]),
          compressed_mov_url=str(x[fidscs_globals.SCHEMA_COL_NAMES__VIDEO_INDEX[3]]),
          uncompressed_avi_url=str(x[fidscs_globals.SCHEMA_COL_NAMES__VIDEO_INDEX[4]]),
          uncompressed_avi_mirror_1_url=str(x[fidscs_globals.SCHEMA_COL_NAMES__VIDEO_INDEX[5]]),
          uncompressed_avi_mirror_2_url=str(x[fidscs_globals.SCHEMA_COL_NAMES__VIDEO_INDEX[6]])
        )
      )
  )
  return full_target_vid_index_schemad_pcoll
```
The gist of `data_extractor__beam.pl__1__bootstrap_target_video_index` is that it ensures the video index exists locally before any other dependent pipeline can execute. If it doesn't, it downloads and extracts the contents of the video index archive from [http://www.bu.edu/asllrp/ncslgr-for-download/video_index-20120129.zip](http://www.bu.edu/asllrp/ncslgr-for-download/video_index-20120129.zip).

The first notable point is that it uses the custom class `data_extractor__beam.TargetVideoIndexBootstrapper`, which inherits from `beam__common.PipelinePcollElementProcessor`, which in turn inherits from Apache Beam's `DoFn` class. Inheriting from Apache Beam's `DoFn` allows the inheriting class to be used in Apache Beam pipelines via `beam.ParDo` (which stands for "**Par**allel **Do**"). Full documentation can be found [here](https://beam.apache.org/documentation/transforms/python/elementwise/pardo/).

There is nothing particularly noteworthy about the internal implementation of `data_extractor__beam.TargetVideoIndexBootstrapper`. It simply downloads the video index archive (to a memfile) and extracts its contents (in-memory). Please see its implementation for details if you are interested.

Source code for `data_extractor__beam.pl__2__write_target_vid_index_csv` is listed below. It simply writes the bytes extracted from the archive to the destination path `/data/video_index-20120129.csv` (using an Apache Beam `schema` so that column names can easily be referenced/manipulated later).

```python
disp_source(data_extractor__beam.pl__2__write_target_vid_index_csv)
```

```python
def pl__2__write_target_vid_index_csv(full_target_vid_index_schemad_pcoll, d_pl_options):
  vid_index_path = fileio.path_join(d_pl_options[fidscs_globals.OPT_NAME_DATA_DIR], fidscs_globals.VIDEO_INDEX_BASE+'.csv')
  if not fileio.file_path_exists(vid_index_path, d_pl_options)[0]:
    sorted_full_target_vid_index_schemad_pcoll = beam__common.pl__X__sort_pcoll(full_target_vid_index_schemad_pcoll, pcoll_label="full_target_vid_index")
    sorted_corpus_index_csv_rows_pcoll = (
      sorted_full_target_vid_index_schemad_pcoll
      | "Beam PL: re-apply schema to sorted_full_target_vid_index" >> beam.Map(lambda sorted_full_target_vid_index_schemad_pcoll_row: beam.Row(
          target_video_filename=sorted_full_target_vid_index_schemad_pcoll_row.target_video_filename,
          video_seq_id=sorted_full_target_vid_index_schemad_pcoll_row.video_seq_id,
          perspective_cam_id=sorted_full_target_vid_index_schemad_pcoll_row.perspective_cam_id,
          compressed_mov_url=sorted_full_target_vid_index_schemad_pcoll_row.compressed_mov_url,
          uncompressed_avi_url=sorted_full_target_vid_index_schemad_pcoll_row.uncompressed_avi_url,
          uncompressed_avi_mirror_1_url=sorted_full_target_vid_index_schemad_pcoll_row.uncompressed_avi_mirror_1_url,
          uncompressed_avi_mirror_2_url=sorted_full_target_vid_index_schemad_pcoll_row.uncompressed_avi_mirror_2_url
        ))
      | beam.Map(lambda sorted_full_target_vid_index_schemad_pcoll_row: beam__common.beam_row_to_csv_string(sorted_full_target_vid_index_schemad_pcoll_row))
    )
    return beam__common.pl__X__write_pcoll_to_csv(
      sorted_corpus_index_csv_rows_pcoll,
      "TARGET-VIDEO-INDEX",
      fidscs_globals.VIDEO_INDEXES_ARCHIVE,
      fidscs_globals.SCHEMA_COL_NAMES__VIDEO_INDEX,
      d_pl_options
    )
  else:
    print(f"FOUND EXISTING VID INDEX: {vid_index_path}")
    return [vid_index_path]
```
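As background for the `DoFn`/`ParDo` machinery discussed above, here is the pattern in its smallest runnable form - a toy sketch, unrelated to the project's actual classes:

```python
import apache_beam as beam

class Exclaim(beam.DoFn):
    """beam.ParDo calls process() once per element, potentially on many workers."""
    def process(self, element):
        yield f"{element}!"

with beam.Pipeline() as pl:  # DirectRunner by default
    _ = (
        pl
        | beam.Create(['hello', 'world'])
        | beam.ParDo(Exclaim())
        | beam.Map(print)
    )
```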
#### We are now ready to execute the first step of the "boostrap-vid-index" pipeline!

```python
n_partitions = 8  # hardcoded for now but we need to retrieve this from beam to be the number of workers

pl = beam.Pipeline(options=pipeline_options)
full_target_vid_index_schemad_pcoll = data_extractor__beam.pl__1__bootstrap_target_video_index(pl)
```

That seems fast! That's because the pipeline wasn't actually executed yet. What Apache Beam did in this case was create the corresponding pipeline *execution graph* (which is actually a *Directed Acyclic Graph*). With the `InteractiveRunner`, the pipeline only gets executed when it is required.

This happens in notebooks by calling `ib.collect` or `ib.show`, which essentially executes the pipeline and then reduces the distributed `Pcollection`. In this case, we use `ib.collect`, which also stuffs the results into a Pandas `DataFrame` for viewing purposes within notebooks. Note that this is NOT done in production (in the cloud, on GCP Dataflow), since Pandas `DataFrame`s aren't needed there and are simply impractical for "Big Data" solutions. Pandas `DataFrame`s really don't serve this purpose. Can you imagine attempting to hold all the corresponding tensor bytes for 561,000+ images in memory??

Anyway, moving on... before calling `ib.collect`, we must first tell Apache Beam to "record" all `Pcollection`s (up to a certain point) by calling `ib.watch(locals())`. Note that only `Pcollection`s created prior to calling `ib.watch(locals())` are eligible for "collection" (conversion to Pandas `DataFrame`s).

```python
# we require this in order to make use of ib.show() (which provides visualization of the pcolls specified) or ib.collect() (which creates a pandas dataframe from a pcoll)
# but all pcolls we wish to visualize must be created prior to executing the following line
ib.watch(locals())
```

We can now collect the `full_target_vid_index_schemad_pcoll` `Pcollection` into a Pandas `DataFrame` for display in this notebook.

```python
df_full_target_vid_index = ib.collect(full_target_vid_index_schemad_pcoll)
```
```
VIDEO-INDEX BOOTSTRAP INFO: {'vid_indexes_dir': '/tmp/data/tmp/video_index-20120129', 'sel_vid_index_path': '/tmp/data/tmp/video_index-20120129/files_by_video_name.csv', 'video_indexes_archive': 'video_index-20120129.zip', 'tmp_dir': '/tmp/data/tmp', 'video_ds_path': '/tmp/data/document-consultant-targetvideo-index.csv'}
unzipping http://www.bu.edu/asllrp/ncslgr-for-download/video_index-20120129.zip in-memory... DONE
```

```python
df_full_target_vid_index
```
```
      0                                                   1  2                            3                                                   4                                                   5  6
0     http://csr.bu.edu/asl/sequences/compressed/mas...  0  539_219_small_0.mov          219
1     http://csr.bu.edu/asl/sequences/compressed/sla...  1  539_219_small_1.mov          http://csr.bu.edu/asl0/uploading/2000_02_29/se...  219
2     http://csr.bu.edu/asl/sequences/compressed/sla...  2  539_219_small_2.mov          http://csr.bu.edu/asl0/uploading/2000_02_29/se...  http://csr.bu.edu/asl0/working/tapes1/2000_01_...  http://csr.bu.edu/asl0/uploading/2000_02_25/sl...  219
3     http://csr.bu.edu/asl/private/compressed/maste...  0  548_master_small.mov         http://csr.bu.edu/asl0/working/tapes2/2000_06_...  http://csr.bu.edu/asl0/uploading/2000_06_07/ma...  548
4     http://csr.bu.edu/asl/private/compressed/slave...  1  548_slave1_small.mov         http://csr.bu.edu/asl0/working/tapes2/2000_06_...  548
...   ...                                                ...  ...                       ...                                                 ...                                                 ...  ...
2607  http://csr.bu.edu/asl/private/downloads/2001_8...  0  siblings_1066_small_0.mov    http://csr.bu.edu/asl0/working/tapes2/2001_07_...  http://csr.bu.edu/asl0/working/tapes2/2001_07_...  1066
2608  http://csr.bu.edu/asl/private/downloads/2001_8...  2  siblings_1066_small_2.mov    http://csr.bu.edu/asl0/uploading/2001_07_24/sl...  1066
2609  http://csr.bu.edu/asl/private/downloads/2003_0...  0  whitewater_1049_small_0.mov  http://csr.bu.edu/asl0/working/tapes2/2001_07_...  http://csr.bu.edu/asl0/working/tapes2/2001_07_...  1049
2610  http://csr.bu.edu/asl/private/downloads/2003_0...  2  whitewater_1049_small_2.mov  http://csr.bu.edu/asl0/working/tapes2/2001_07_...  http://csr.bu.edu/asl0/working/tapes2/2001_07_...  1049
2611  http://csr.bu.edu/asl/private/downloads/2003_0...  3  whitewater_1049_small_3.mov  1049

2612 rows × 7 columns
```

Note that this isn't entirely useful yet, since we don't have any corresponding column names (in the above Pandas `DataFrame`). We have applied a `schema` to the `Pcollection`, but that doesn't get applied to the Pandas `DataFrame`: applying a `schema` to a `Pcollection` is carried out by mapping each row to a literal Apache Beam `Row` object, thereby effectively converting each element to an *unhashed* `dict`. Thus, we cannot guarantee the ordering of the columns will be fixed. We must therefore use the `schema` to refer to columns by name.

But we do see that there are 2,612 corresponding target videos to download. Note that since target videos are actually comprised of segments, we may actually download more videos than that in the end (if we were to download them all... which is exactly what is done in production, on GCP Dataflow).

This is done inline while writing the `Pcollection` (collected into the above Pandas `DataFrame` just for viewing) to the destination `/data/video_index-20120129.csv` file path (by `data_extractor__beam.pl__2__write_target_vid_index_csv`). But, as a nuance of collecting a `Pcollection` into a `DataFrame`, we can't simply call `data_extractor__beam.pl__2__write_target_vid_index_csv` now if we want to view the resulting `Pcollection` as a `DataFrame`. Recall that only `Pcollection`s created prior to calling `ib.watch(locals())` are eligible for "collection" (conversion to Pandas `DataFrame`s), and we have already called it. This means we must re-execute the first step (`data_extractor__beam.pl__1__bootstrap_target_video_index`), followed by `data_extractor__beam.pl__2__write_target_vid_index_csv`, call `ib.watch(locals())` again, and then finally call `ib.collect` on each of the corresponding `Pcollection`s in order to view them.

But won't that mean that `data_extractor__beam.pl__1__bootstrap_target_video_index` will re-download the video index? ANSWER: no, because it was written specifically to guard against that case. Take a look at its source and you'll see. If the video index exists locally, it simply loads it from the "tmp" directory, and the resulting `Pcollection` is used as input for `data_extractor__beam.pl__2__write_target_vid_index_csv` (which applies a `schema` and then writes it to the final destination path `/data/video_index-20120129.csv`). Let's do that now...
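A quick aside before we do: "referring to columns by name" simply means accessing `beam.Row` attributes in downstream transforms. For example (a throwaway sketch, not a transform from the actual pipeline):

```python
compressed_mov_urls = (
    full_target_vid_index_schemad_pcoll
    | "Beam PL: select compressed_mov_url" >> beam.Map(lambda row: row.compressed_mov_url)
)
```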


#### The full "boostrap-vid-index" pipeline ```python # create a new instance of the pipeline pl = beam.Pipeline(options=pipeline_options) full_target_vid_index_schemad_pcoll = data_extractor__beam.pl__1__bootstrap_target_video_index(pl) _ = data_extractor__beam.pl__2__write_target_vid_index_csv(full_target_vid_index_schemad_pcoll, pl._options._all_options) ``` We know that observing the `full_target_vid_index_schemad_pcoll` `Pcollection` won't be particularly useful and the `Pcollection` that `data_extractor__beam.pl__2__write_target_vid_index_csv` outputs simply has the destination path after it successfully writes `full_target_vid_index_schemad_pcoll` to `/data/video_index-20120129.csv`, which isn't particularly interesting. But we need to be sure this pipeline completes before executing the more interesting "download-videos-extract-frames" pipeline. So instead of calling `ib.collect` to force the above pipeline to run, we'll simply call `pl.run` instead (since we are not particularly interested in viewing any `Pcollection`-to-`DataFrame` conversions from it). ```python print(f"\n\n****************************** Starting pipeline job: {job_name} ******************************") pl.run(); print(f"****************************** Finished pipeline job: {job_name} ******************************") ``` ****************************** Starting pipeline job: sc-fids-capstone-etl-demo--boostrap-vid-index ****************************** FOUND EXISTING SEL VID INDEX: /tmp/data/tmp/video_index-20120129/files_by_video_name.csv TARGET-VIDEO-INDEX CSV WRITTEN TO STORAGE: /tmp/data/video_index-20120129.csv ****************************** Finished pipeline job: sc-fids-capstone-etl-demo--boostrap-vid-index ******************************


#### The "download-videos-extract-frames" pipeline


The "download-videos-extract-frames" pipeline is comprised of four steps: 1. `beam__common.pl__1__read_target_vid_index_csv` 2. `data_extractor__beam.pl__2__filter_target_vid_index` 3. `data_extractor__beam.pl__3__parallel_download_videos` 4. `data_extractor__beam.pl__4__parallel_extract_target_video_frames` The function names used for each step suggest what they do. So I will only show source code for `data_extractor__beam.pl__3__parallel_download_videos` and `data_extractor__beam.pl__4__parallel_extract_target_video_frames`, and provide short explanations for steps 1 and 2. Step 1 obviously reads `/data/video_index-20120129.csv` from storage into a `Pcollection` to be used as input for `data_extractor__beam.pl__2__filter_target_vid_index`, which simply selects the first `MAX_TARGET_VIDEOS` from the full list of records from the `full_target_vid_index_schemad_pcoll` `Pcollection` that `beam__common.pl__1__read_target_vid_index_csv` returns. Note that `data_extractor__beam.pl__2__write_target_vid_index_csv`, in addition to applying a `schema`, also applies a row *id* and writes to `/data/video_index-20120129.csv` in the order of that index. `beam__common.pl__1__read_target_vid_index_csv` returns the corresponding `Pcollection` ordered by this index. Let's now inspect source code for steps 3 and 4... ```python disp_source(data_extractor__beam.pl__3__parallel_download_videos) ``` <!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01//EN" "http://www.w3.org/TR/html4/strict.dtd">

```python
def pl__3__parallel_download_videos(vid_index_schemad_pcoll, d_pl_options, n_partitions=8):
  vid_index_schemad_pcoll_download_partitions = (
    vid_index_schemad_pcoll
    | "Beam PL: partition schemad video index for download parallelization" >> beam.Partition(
        lambda vid_index_row, num_partitions: random.randint(0, num_partitions-1),
        n_partitions
      )
  )
  partition_download_results = [None for i in range(n_partitions)]
  for i, vid_index_schemad_pcoll_partition in enumerate(vid_index_schemad_pcoll_download_partitions):
    p_label = f"p{i+1}"
    p_label_indented = f"\t{p_label}"
    p_dl_results = (
      vid_index_schemad_pcoll_partition
      | f"Beam PL: {p_label} gather download info for video segments" >> beam.ParDo(VideoSegmentInfoGatherer(d_pl_options))
      | f"Beam PL: {p_label} download video segments" >> beam.ParDo(VideoSegmentDownloader(d_pl_options, f"{p_label_indented}"))
    )
    partition_download_results[i] = p_dl_results
  merged_download_results = (
    (p_dl_r for p_dl_r in partition_download_results)
    | f"Beam PL: merge download results" >> beam.Flatten()
  )
  return merged_download_results
```
Now things get really interesting with `data_extractor__beam.pl__3__parallel_download_videos`...

What we do here is explicitly tell Apache Beam to create 8 independent *partitions*, each of which downloads videos independently of the others. These may correspond to either threads or worker nodes; how that plays out is beyond the scope of this notebook. Suffice it to say that this results in much faster processing than simply executing sequentially. When they are all done, the results are merged (via `beam.Flatten`) into a single `Pcollection` to be supplied as input to `data_extractor__beam.pl__4__parallel_extract_target_video_frames`.

```python
disp_source(data_extractor__beam.pl__4__parallel_extract_target_video_frames)
```

```python
def pl__4__parallel_extract_target_video_frames(merged_download_results, d_pl_options, n_partitions=8):
  """
  # ******************** EXTRACT SEGMENT-FRAMES IN PARALLEL: BEGIN ********************
  # NOTE! THIS IS A CRUCIAL PIECE SO PAY ATTENTION TO THE FOLLOWING!!
  # ********** --> IMPORTANT VIDEO-FRAME EXTRACTION PROCESSING INFORMATION <-- (BEGIN) **********
  # We partitioned vid_index_schemad_pcoll so that video-SEGMENT downloads can occur independently.
  # Downloading segments can occur independently since there is no correlation between each segment
  # AS FAR AS DOWNLOADING IS CONCERNED.
  #
  # However, AS FAR AS EXTRACTION IS CONCERNED, each segment is related by the target video composed
  # of each segment. The segment-videos themselves are ordered as they compose the final target
  # video corresponding of ordered segment videos. For example, if a target video is composed of
  # three segment videos, those segments occur in a certain order, as specified by the video index.
  # Expanding upon this example, suppose target video "some_story_given_by_john_doe_0.mov", was recorded
  # and saved in three corresponding video segments (to save space, I guess?)
  # "some_story_given_by_john_doe_0_1.mov", "some_story_given_by_john_doe_0_2.mov", and
  # "some_story_given_by_john_doe_0_3.mov". Note that the trailing "0" in the TARGET VIDEO filename
  # indicates the camera perspective... all stories are potentially filmed from multiple synchronized
  # camera perspectives/angles - there were obviously multiple synchronized video recorders used
  # in that case. However, for this example, we are focusing on the target video for camera perspective 0.
  # Anyway, as said, there are three segments which compose the target video. THESE SEGMENT VIDEOS
  # ARE ORDERED (in time). THEREFORE, THE FRAMES COMPOSING EACH SEGMENT VIDEO ARE CONSEQUENTLY ORDERED
  # (in time). THE BOTTOM LINE IS THAT WE NOW NEED TO GROUP SEGMENT VIDEOS, KEYED BY CORRESPONDING
  # TARGET VIDEO. FURTHERMORE, THE COLLECTION OF SEGMENT VIDEOS FOR EACH TARGET VIDEO MUST BE ORDERED.
  # THAT IS, WE MUST EXTRACT SEGMENT FRAMES AND SAVE THEM TO THE FILE SYSTEM WITH A FILE NAMING SCHEME
  # THAT REFLECTS FRAME ORDER OF THE UNION OF ALL SEGMENT FRAMES. IF WE EXTRACT THE FRAMES OF EACH
  # ORDERED SEGMENT, THEN A SIMPLE NUMERIC INDEX AS SEGMENT-FRAME FILENAME WILL DO THE TRICK.
  # ********** --> IMPORTANT VIDEO-FRAME EXTRACTION PROCESSING INFORMATION <-- (END) **********
  """
  # GROUP segment videos by target video
  # note that this depends on the DAG - i.e. will not occur until partition_download_results are ready which, of course, does not occur until all videos have been downloaded
  target_vid_seg_frame_extraction_partitions = (
    merged_download_results
    | f"Beam PL: group extraction info for video segments by target video" >> beam.GroupBy(lambda d: d['target_video_fname'])
    | f"Beam PL: partition target video segment info for extraction parallelization" >> beam.Partition(
        lambda vid_index_row, num_partitions: random.randint(0, num_partitions-1),
        n_partitions
      )
  )
  partition_extraction_results = [None for i in range(n_partitions)]
  for i, p in enumerate(target_vid_seg_frame_extraction_partitions):
    p_label = f"p{i+1}"
    p_label_indented = f"\t{p_label}"
    p_extraction_results = (
      p
      | f"Beam PL: {p_label} extract frames of each segment per target video" >> beam.ParDo(SegmentFrameExtractor(d_pl_options, f"{p_label_indented}", debug=False))
    )
    partition_extraction_results[i] = p_extraction_results
    (
      p_extraction_results
      | f"Beam PL: {p_label} count target videos processed" >> beam.combiners.Count.Globally()
      | f"Beam PL: {p_label} print target videos processed count" >> beam.ParDo(beam__common.PipelinePcollPrinter(label=p_label_indented, msg="target videos processed"))
    )
  merged_extraction_results = (
    (p_extraction_results for p_extraction_results in partition_extraction_results)
    | f"Beam PL: merge extraction results" >> beam.Flatten()
  )
  _ = (
    merged_extraction_results
    | "Beam PL: apply schema to merged extraction results pcoll" >> beam.Map(lambda x: beam.Row(
        video_fname=str(x[0]),
        n_stitched_frames=int(x[1])
      ))
    # | "Beam PL: count total frames extracted" >> beam.transforms.sql.SqlTransform(f"SELECT SUM(n_stitched_frames) AS total_frames_extracted FROM PCOLLECTION") # this is VERY, VERY SLOW
    | "Beam PL: select n_stitched_frames" >> beam.Map(lambda extraction_results_row: extraction_results_row.n_stitched_frames)
    | "Beam PL: count total frames extracted" >> beam.CombineGlobally(sum)
    | f"Beam PL: print total frames extracted" >> beam.ParDo(beam__common.PipelinePcollPrinter(msg="TOTAL FRAMES EXTRACTED"))
  )
  return merged_extraction_results
```
From the first part of this notebook...

> This step leverages Apache Beam's parallelism as well. But we MUST take care to ensure that a single worker extracts the frames of all segments associated with a given target video. This is because frames are ordered/sequenced. Allowing two different workers to extract frames of different segments associated with the same final target video would likely result in frames being extracted out of order (due to parallelism). Therefore, we partition the extraction task by final target video in order to ensure a single worker handles all segments associated with a single target video. But we do want parallelism to occur at the final-target-video level.

Before creating the pipeline execution graph, it is worth taking a deeper look into the internals of how we use the `OpenCV` library to process the videos (extract frames). The `data_extractor__beam.SegmentFrameExtractor` wraps `data_extractor__beam.beam_extract_frames`, which houses the logic for this processing. There are also a couple of helper functions that `beam_extract_frames` uses: `data_extractor__beam.capture_segment_video` and `data_extractor__beam.write_frame_to_file`. These are listed after `data_extractor__beam.beam_extract_frames`.

```python
disp_source(data_extractor__beam.beam_extract_frames)
```

```python
def beam_extract_frames(tpl_target_video_extraction_info, d_pl_options, label="", debug=False):
  """
  expects tpl_target_video_extraction_info: (video_fname, list({'target_video_fname': target_video_fname, 'target_video_frames_dir': target_video_frames_dir, 'segment_url': str(url), 'segment_fname': str(url).split('/')[-1]}))
  """

  # # log_results = []
  target_video_fname = tpl_target_video_extraction_info[0]
  segment_dicts = sorted(tpl_target_video_extraction_info[1], key=lambda segment_dict: segment_dict['segment_fname'])
  target_video_frames_dir = segment_dicts[0]['target_video_frames_dir']

  target_stitched_vid_name = target_video_frames_dir.split(os.path.sep)[-1]
  if not fileio.dir_path_exists(target_video_frames_dir, d_pl_options)[0]:
    fileio.make_dirs(target_video_frames_dir, d_pl_options)

  video_dir = d_pl_options[fidscs_globals.OPT_NAME_VIDEO_DIR]
  local_vid_segment_paths = [fileio.path_join(video_dir, segment_dict['segment_fname']) for segment_dict in segment_dicts]
  for segment_dict in segment_dicts:
    segment_dict['n_frames_extracted'] = 0

  # create local dir for extraction (since OpenCV works only with local file system currently) if we have GCS filesystem
  truly_local_vid_dir = None
  truly_local_vid_dir_suffix = None
  fs = FileSystems.get_filesystem(video_dir)
  if type(fs) == GCSFileSystem:
    truly_local_vid_dir_suffix = '/'.join(video_dir.split('/')[1:])
    truly_local_vid_dir = '/tmp'+truly_local_vid_dir_suffix
    # print(f"\t\tGCS storage detected! Extracting frames to truly_local_vid_dir {truly_local_vid_dir} (and will then upload to GCS after that)...")
    if debug: print(f"\t\t{truly_local_vid_dir} exists: {fileio.dir_path_exists(truly_local_vid_dir, d_pl_options)}")
    if not fileio.dir_path_exists(truly_local_vid_dir, d_pl_options)[0]:
      if debug: print(f"\tcreating {truly_local_vid_dir}...")
      truly_local_vid_dir_path_segs = truly_local_vid_dir.split('/')
      if debug: print(f"\t\ttruly_local_vid_dir_path_segs: {truly_local_vid_dir_path_segs}")
      s_cum_path = ''
      for i, truly_local_vid_dir_path_seg in enumerate(truly_local_vid_dir_path_segs[1:]):
        s_cum_path += '/'+truly_local_vid_dir_path_seg
        fileio.make_dirs(s_cum_path, d_pl_options)
        if debug: print(f"\t\t{s_cum_path} exists: {fileio.dir_path_exists(s_cum_path, d_pl_options)}")

  vc_results = [capture_segment_video(local_vid_segment_path, truly_local_vid_dir, d_pl_options, debug=debug) for local_vid_segment_path in local_vid_segment_paths]
  vid_caps = [vc_result[0] for vc_result in vc_results]
  truly_local_target_video_frames_dirs = [vc_result[1] for vc_result in vc_results]

  for seg_vid_cap in vid_caps:
    seg_vid_cap.set(cv2.CAP_PROP_FPS, fidscs_globals.FPS)
  frame_counts = list(map(lambda vc: int(vc.get(cv2.CAP_PROP_FRAME_COUNT)), vid_caps))
  n_frames_expected = sum(frame_counts)

  failed_target_videos = []

  n_stitched_frames = 0
  if n_frames_expected > 0:
    # get count of existing stitched frames in target_stitched_vid_frames_dir
    n_stitched_frames = len(fileio.list_dir(target_video_frames_dir, d_pl_options))

    b_restitch = n_stitched_frames < n_frames_expected
    n_stitched_frames = 0 if b_restitch else n_stitched_frames

    for i, seg_vid_cap in enumerate(vid_caps):
      segment_dict = segment_dicts[i]
      _n_frames_expected = frame_counts[i]

      if b_restitch:
        success, frame = seg_vid_cap.read()
        n_frames = 0
        while success:
          write_frame_to_file(
            frame,
            n_stitched_frames,
            target_video_frames_dir,
            truly_local_target_video_frames_dir=truly_local_target_video_frames_dirs[i],
            debug=debug
          )
          n_frames += 1
          n_stitched_frames += 1
          success, frame = seg_vid_cap.read()

        seg_path = local_vid_segment_paths[i]
        seg_fname = seg_path.split(os.path.sep)[-1]
        if n_frames != _n_frames_expected:
          print(f"{label+': ' if len(label)>0 else ''}{fidscs_globals.VALIDATION_FATAL_ERROR_TEXT} Cannot stitch together target video {target_video_fname} since {_n_frames_expected} frames were expected from segment {seg_fname} ({seg_path}) but only {n_frames} were successfully extracted")
          failed_target_videos.append(target_video_fname)
          fail = True
          break
        else:
          print(f"{label+': ' if len(label)>0 else ''}Added {n_stitched_frames} frames from segment {seg_fname} for target video {target_video_fname} (stitched-frames dir {target_video_frames_dir})")

      else:
        n_frames = _n_frames_expected
        print(f"{label+': ' if len(label)>0 else ''}Found existing stiched-frames for {target_stitched_vid_name} ({n_stitched_frames} frames in {target_video_frames_dir})")

      segment_dict['n_frames_extracted'] = n_frames

  else:
    if fidscs_globals.OUTPUT_INFO_LEVEL <= fidscs_globals.OUTPUT_INFO_LEVEL__WARNING:
      print(f"\t{fidscs_globals.VALIDATION_WARNING_TEXT} Cannot stitch together target video {target_video_fname} since cv2.CAP_PROP_FRAME_COUNT reports segments have zero frames")
    failed_target_videos.append(target_video_fname)
    fail = True

  if truly_local_vid_dir is not None:
    for truly_local_target_video_frames_dir in truly_local_target_video_frames_dirs:
      fileio.delete_file(truly_local_target_video_frames_dir, d_pl_options, recursive=True, debug=True)

  return [(tpl_target_video_extraction_info[0], n_stitched_frames, segment_dicts)]
```
```python
disp_source(data_extractor__beam.capture_segment_video)
```

```python
def capture_segment_video(vid_segment_path, truly_local_vid_dir, d_pl_options, debug=False):
  video_fname = vid_segment_path.split('/')[-1]

  truly_local_target_video_frames_dir = None

  fs = FileSystems.get_filesystem(vid_segment_path)
  if type(fs) == GCSFileSystem:
    if debug: print(f"\n\n\tattempting to open video {vid_segment_path} for reading...")
    with fileio.open_file_read(vid_segment_path) as f:
      if debug: print(f"\t\tSUCCESS")
      # now read from local bytes and write to GCS
      buffer = f.read()
      truly_local_vid_segment_path = truly_local_vid_dir+'/'+video_fname
      if debug: print(f"\t\tattempting to write {truly_local_vid_segment_path} (truly) locally...")
      with fileio.open_file_write(truly_local_vid_segment_path) as f_local:
        f_local.write(buffer)
        f_local.close()
      if debug: print(f"\t\t\tSUCCESS")
      f.close()

    vid_segment_path = truly_local_vid_segment_path

    # (truly local) dir for saving frames
    truly_local_target_video_frames_dir = truly_local_vid_dir+'/'+fidscs_globals.STICHED_VIDEO_FRAMES_DIR_NAME+'/'+video_fname.split('.')[0]
    if debug: print(f"\t\t\tattempting to create directory {truly_local_target_video_frames_dir} (truly_local_target_video_frames_dir) for frames extracted from (truly local) video {truly_local_vid_segment_path}...")
    if not fileio.dir_path_exists(truly_local_target_video_frames_dir, d_pl_options)[0]:
      if debug: print(f"\t\t\t\tcreating {truly_local_target_video_frames_dir}...")
      fileio.make_dirs(truly_local_target_video_frames_dir, d_pl_options)
      truly_local_target_video_frames_dir_exists = fileio.dir_path_exists(truly_local_target_video_frames_dir, d_pl_options)[0]
      if debug: print(f"\t\t\t\t\t{truly_local_target_video_frames_dir} exists: {truly_local_target_video_frames_dir_exists}")
      if not truly_local_target_video_frames_dir_exists:
        raise Exception(f"required directory truly_local_target_video_frames_dir {truly_local_target_video_frames_dir_exists} does not exist")

  if debug: print(f"\t\t\tattempting to capture (cv2.VideoCapture) video {vid_segment_path})...")

  # finally, capture the video bytes
  return cv2.VideoCapture(vid_segment_path), truly_local_target_video_frames_dir
```
```python
disp_source(data_extractor__beam.write_frame_to_file)
```

```python
def write_frame_to_file(frame, index, target_video_frames_dir, truly_local_target_video_frames_dir=None, debug=False):
  local_frame_path = fileio.path_join(target_video_frames_dir, f"{index}.jpg")  # this is the final frame path

  if truly_local_target_video_frames_dir is not None:
    # write truly local frame file
    truly_local_frame_path = truly_local_target_video_frames_dir+'/'+f"{index}.jpg"
    if debug: print(f"\t\t\t\t\t\tattempting to write {truly_local_frame_path} frame...")
    cv2.imwrite(truly_local_frame_path, frame)
    if debug: print(f"\t\t\t\t\t\t\tSUCCESS")
    if debug: print(f"\t\t\t\t\t\t\tattempting to open {truly_local_frame_path} for read...")
    with fileio.open_file_read(truly_local_frame_path) as f_truly_local_frame:
      buffer = f_truly_local_frame.read()
      if debug: print(f"\t\t\t\t\t\t\t\tSUCCESS")
      if debug: print(f"\t\t\t\t\t\t\t\t\tattempting to open {local_frame_path} for final write...")
      with fileio.open_file_write(local_frame_path) as f_frame_final:
        f_frame_final.write(buffer)
        f_frame_final.close()
      if debug: print(f"\t\t\t\t\t\t\t\t\t\tSUCCESS")
      buffer = None
      f_truly_local_frame.close()

  else:
    if debug: print(f"\t\t\t\t\t\t\t\t\tattempting to open {local_frame_path} for final write...")
    cv2.imwrite(local_frame_path, frame)
    if debug: print(f"\t\t\t\t\t\t\t\t\t\tSUCCESS")
```
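Stripped of the GCS plumbing, bookkeeping, and debug logging, the core `OpenCV` loop that the three functions above implement reduces to something like this minimal standalone sketch:

```python
import cv2

def extract_frames(video_path, frames_dir):
    """Write every frame of video_path to frames_dir as <frame-index>.jpg."""
    cap = cv2.VideoCapture(video_path)
    n = 0
    success, frame = cap.read()
    while success:
        cv2.imwrite(f"{frames_dir}/{n}.jpg", frame)  # the numeric index preserves frame order
        n += 1
        success, frame = cap.read()
    cap.release()
    return n  # number of frames extracted
```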


We are now ready to execute the "download-videos-extract-frames" pipeline. But first we must...


#### Create the "download-videos-extract-frames" pipeline execution graph ```python job_suffix = 'download-videos-extract-frames' job_name = f"{PIPELINE_BASE_JOB_NAME}--{job_suffix}" options.update({ 'job_name': job_name }) pipeline_options = PipelineOptions(flags=[], **options) # easier to pass in options from command-line this way print(f"PipelineOptions:\n{pipeline_options.get_all_options()}\n") pl = beam.Pipeline(options=pipeline_options) full_target_vid_index_schemad_pcoll = beam__common.pl__1__read_target_vid_index_csv(pl) filtered_target_vid_index_schemad_pcoll = data_extractor__beam.pl__2__filter_target_vid_index(full_target_vid_index_schemad_pcoll, pl._options._all_options) merged_download_results = data_extractor__beam.pl__3__parallel_download_videos(filtered_target_vid_index_schemad_pcoll, pl._options._all_options, n_partitions) merged_extraction_results = data_extractor__beam.pl__4__parallel_extract_target_video_frames(merged_download_results, pl._options._all_options, n_partitions) ``` PipelineOptions: {'runner': 'InteractiveRunner', 'streaming': False, 'beam_services': {}, 'type_check_strictness': 'DEFAULT_TO_ANY', 'type_check_additional': '', 'pipeline_type_check': True, 'runtime_type_check': False, 'performance_runtime_type_check': False, 'direct_runner_use_stacked_bundle': True, 'direct_runner_bundle_repeat': 0, 'direct_num_workers': 0, 'direct_running_mode': 'multi_threading', 'dataflow_endpoint': 'https://dataflow.googleapis.com', 'project': 'sc-fids-capstone', 'job_name': 'sc-fids-capstone-etl-demo--download-videos-extract-frames', 'staging_location': None, 'temp_location': None, 'region': None, 'service_account_email': None, 'no_auth': False, 'template_location': None, 'labels': None, 'update': False, 'transform_name_mapping': None, 'enable_streaming_engine': False, 'dataflow_kms_key': None, 'flexrs_goal': None, 'hdfs_host': None, 'hdfs_port': None, 'hdfs_user': None, 'hdfs_full_urls': False, 'num_workers': None, 'max_num_workers': None, 'autoscaling_algorithm': None, 'machine_type': None, 'disk_size_gb': None, 'disk_type': None, 'worker_region': None, 'worker_zone': None, 'zone': None, 'network': None, 'subnetwork': None, 'worker_harness_container_image': None, 'sdk_harness_container_image_overrides': None, 'use_public_ips': None, 'min_cpu_platform': None, 'dataflow_worker_jar': None, 'dataflow_job_file': None, 'experiments': None, 'number_of_worker_harness_threads': None, 'profile_cpu': False, 'profile_memory': False, 'profile_location': None, 'profile_sample_rate': 1.0, 'requirements_file': None, 'requirements_cache': None, 'setup_file': None, 'beam_plugins': None, 'save_main_session': False, 'sdk_location': 'default', 'extra_packages': None, 'prebuild_sdk_container_engine': None, 'prebuild_sdk_container_base_image': None, 'docker_registry_push_url': None, 'job_endpoint': None, 'artifact_endpoint': None, 'job_server_timeout': 60, 'environment_type': 'DOCKER', 'environment_config': None, 'environment_options': None, 'sdk_worker_parallelism': 1, 'environment_cache_millis': 0, 'output_executable_path': None, 'artifacts_dir': None, 'job_port': 0, 'artifact_port': 0, 'expansion_port': 0, 'flink_master': '[auto]', 'flink_version': '1.10', 'flink_job_server_jar': None, 'flink_submit_uber_jar': False, 'spark_master_url': 'local[4]', 'spark_job_server_jar': None, 'spark_submit_uber_jar': False, 'spark_rest_url': None, 'on_success_matcher': None, 'dry_run': False, 'wait_until_finish_duration': None, 'pubsubRootUrl': None, 's3_access_key_id': None, 's3_secret_access_key': 
None, 's3_session_token': None, 's3_endpoint_url': None, 's3_region_name': None, 's3_api_version': None, 's3_verify': None, 's3_disable_ssl': False, 'fidscs_capstone_max_target_videos': 50, 'fidscs_capstone_work_dir': '/tmp', 'fidscs_capstone_data_dir': '/tmp/data', 'fidscs_capstone_tmp_dir': '/tmp/data/tmp', 'fidscs_capstone_videos_dir': '/tmp/data/videos', 'fidscs_capstone_stitched_video_frames_dir': '/tmp/data/stitched_video_frames', 'fidscs_capstone_corpus_dir': '/tmp/data/tmp/ncslgr-xml', 'fidscs_capstone_corpus_ds_path': '/tmp/data/ncslgr-corpus-index.csv', 'fidscs_capstone_document_asl_cconsultant_ds_path': '/tmp/data/document-consultant-index.csv', 'fidscs_capstone_asl_consultant_ds_path': '/tmp/data/consultant-index.csv', 'fidscs_capstone_video_indexes_dir': '/tmp/data/tmp/video_index-20120129', 'fidscs_capstone_selected_video_index_path': '/tmp/data/tmp/video_index-20120129/files_by_video_name.csv', 'fidscs_capstone_video_ds_path': '/tmp/data/document-consultant-targetvideo-index.csv', 'fidscs_capstone_video_segment_ds_path': '/tmp/data/document-consultant-targetvideo-segment-index.csv', 'fidscs_capstone_video_frame_ds_path': '/tmp/data/document-consultant-targetvideo-frame-index.csv', 'fidscs_capstone_utterance_ds_path': '/tmp/data/document-consultant-utterance-index.csv', 'fidscs_capstone_utterance_video_ds_path': '/tmp/data/document-consultant-utterance-targetvideo-index.csv', 'fidscs_capstone_utterance_token_ds_path': '/tmp/data/document-consultant-utterance-token-index.csv', 'fidscs_capstone_utterance_token_frame_ds_path': '/tmp/data/document-consultant-targetvideo-utterance-token-frame-index.csv', 'fidscs_capstone_vocabulary_ds_path': '/tmp/data/vocabulary-index.csv'} This time we would like to observe the results (collected into Pandas `DataFrame`s)... ```python # we require this in order to make use of ib.show() (which provides visualization of the pcolls specified) or ib.collect() (which creates a pandas dataframe from a pcoll) # but all pcolls we wish to visualize must be created prior to executing the following line ib.watch(locals()) ``` And calling `ib.collect` forces the pipeline to actually run...


#### Run the full "download-videos-extract-frames" pipeline

We do this by collecting `PCollection`s into Pandas `DataFrame`s for viewing with *Interactive Beam*.

```python
print(f"\n\n****************************** Starting pipeline job: {job_name} ******************************")
df_full_target_vid_index_schemad_pcoll = ib.collect(full_target_vid_index_schemad_pcoll)
df_filtered_target_vid_index_schemad_pcoll = ib.collect(filtered_target_vid_index_schemad_pcoll)
df_merged_download_results = ib.collect(merged_download_results)
df_merged_extraction_results = ib.collect(merged_extraction_results)
print(f"****************************** Finished pipeline job: {job_name} ******************************")
```

```
****************************** Starting pipeline job: sc-fids-capstone-etl-demo--download-videos-extract-frames ******************************
p3: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1508_small_3.mov to /tmp/data/videos/_1508_small_3.mov
p8: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1510_small_0.mov to /tmp/data/videos/_1510_small_0.mov
p3: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1510_small_1.mov to /tmp/data/videos/_1510_small_1.mov
p7: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1510_small_3.mov to /tmp/data/videos/_1510_small_3.mov
p7: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1512_small_0.mov to /tmp/data/videos/_1512_small_0.mov
p6: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1512_small_1.mov to /tmp/data/videos/_1512_small_1.mov
p1: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1512_small_3.mov to /tmp/data/videos/_1512_small_3.mov
p8: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1513_small_0.mov to /tmp/data/videos/_1513_small_0.mov
p7: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1513_small_1.mov to /tmp/data/videos/_1513_small_1.mov
p1: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1513_small_3.mov to /tmp/data/videos/_1513_small_3.mov
p1: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1516_small_0.mov to /tmp/data/videos/_1516_small_0.mov
p5: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1516_small_1.mov to /tmp/data/videos/_1516_small_1.mov
p5: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1516_small_3.mov to /tmp/data/videos/_1516_small_3.mov
p6: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1522_small_0.mov to /tmp/data/videos/_1522_small_0.mov
p2: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1522_small_1.mov to /tmp/data/videos/_1522_small_1.mov
p8: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1522_small_3.mov to /tmp/data/videos/_1522_small_3.mov
p7: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1523_small_0.mov to /tmp/data/videos/_1523_small_0.mov
p6: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1523_small_1.mov to /tmp/data/videos/_1523_small_1.mov
p2: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1523_small_3.mov to /tmp/data/videos/_1523_small_3.mov
p1: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1524_small_0.mov to /tmp/data/videos/_1524_small_0.mov
p5: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1524_small_1.mov to /tmp/data/videos/_1524_small_1.mov
p5: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1524_small_3.mov to /tmp/data/videos/_1524_small_3.mov
p8: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1525_small_0.mov to /tmp/data/videos/_1525_small_0.mov
p6: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1525_small_1.mov to /tmp/data/videos/_1525_small_1.mov
p8: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1525_small_3.mov to /tmp/data/videos/_1525_small_3.mov
p7: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1526_small_0.mov to /tmp/data/videos/_1526_small_0.mov
p3: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1526_small_1.mov to /tmp/data/videos/_1526_small_1.mov
p4: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1526_small_3.mov to /tmp/data/videos/_1526_small_3.mov
p7: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1531_small_0.mov to /tmp/data/videos/_1531_small_0.mov
p2: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1531_small_1.mov to /tmp/data/videos/_1531_small_1.mov
p7: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1531_small_3.mov to /tmp/data/videos/_1531_small_3.mov
p8: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1532_small_0.mov to /tmp/data/videos/_1532_small_0.mov
p1: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1532_small_1.mov to /tmp/data/videos/_1532_small_1.mov
p7: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1532_small_3.mov to /tmp/data/videos/_1532_small_3.mov
p5: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1533_small_0.mov to /tmp/data/videos/_1533_small_0.mov
p6: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1533_small_1.mov to /tmp/data/videos/_1533_small_1.mov
p4: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1533_small_3.mov to /tmp/data/videos/_1533_small_3.mov
p4: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1535_small_0.mov to /tmp/data/videos/_1535_small_0.mov
p8: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1535_small_1.mov to /tmp/data/videos/_1535_small_1.mov
p2: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1535_small_3.mov to /tmp/data/videos/_1535_small_3.mov
p2: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1536_small_0.mov to /tmp/data/videos/_1536_small_0.mov
p5: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1536_small_1.mov to /tmp/data/videos/_1536_small_1.mov
p3: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1536_small_3.mov to /tmp/data/videos/_1536_small_3.mov
p6: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1537_small_0.mov to /tmp/data/videos/_1537_small_0.mov
p2: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1537_small_1.mov to /tmp/data/videos/_1537_small_1.mov
p7: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1537_small_3.mov to /tmp/data/videos/_1537_small_3.mov
p3: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1538_small_0.mov to /tmp/data/videos/_1538_small_0.mov
p3: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1538_small_1.mov to /tmp/data/videos/_1538_small_1.mov
p3: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1538_small_3.mov to /tmp/data/videos/_1538_small_3.mov
p5: Downloaded http://csr.bu.edu/asl/private/downloads/2002_8_15/_1539_small_0.mov to /tmp/data/videos/_1539_small_0.mov
p3: Added 81 frames from segment _1516_small_0.mov for target video _1516_small_0.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1516_small_0)
p3: Added 81 frames from segment _1526_small_1.mov for target video _1526_small_1.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1526_small_1)
p4: Added 81 frames from segment _1516_small_3.mov for target video _1516_small_3.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1516_small_3)
p6: Added 81 frames from segment _1536_small_3.mov for target video _1536_small_3.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1536_small_3)
p8: Added 85 frames from segment _1512_small_0.mov for target video _1512_small_0.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1512_small_0)
p4: Added 89 frames from segment _1510_small_1.mov for target video _1510_small_1.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1510_small_1)
p2: Added 93 frames from segment _1533_small_0.mov for target video _1533_small_0.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1533_small_0)
p6: Added 129 frames from segment _1523_small_3.mov for target video _1523_small_3.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1523_small_3)
p8: Added 71 frames from segment _1537_small_1.mov for target video _1537_small_1.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1537_small_1)
p6: Added 65 frames from segment _1524_small_1.mov for target video _1524_small_1.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1524_small_1)
p4: Added 81 frames from segment _1526_small_3.mov for target video _1526_small_3.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1526_small_3)
p7: Added 65 frames from segment _1524_small_0.mov for target video _1524_small_0.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1524_small_0)
p4: Added 81 frames from segment _1516_small_1.mov for target video _1516_small_1.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1516_small_1)
p6: Added 89 frames from segment _1510_small_3.mov for target video _1510_small_3.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1510_small_3)
p3: Added 121 frames from segment _1531_small_0.mov for target video _1531_small_0.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1531_small_0)
p5: Added 93 frames from segment _1533_small_3.mov for target video _1533_small_3.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1533_small_3)
p4: Added 63 frames from segment _1532_small_3.mov for target video _1532_small_3.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1532_small_3)
p5: Added 83 frames from segment _1513_small_3.mov for target video _1513_small_3.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1513_small_3)
p7: Added 81 frames from segment _1536_small_1.mov for target video _1536_small_1.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1536_small_1)
p8: Added 93 frames from segment _1533_small_1.mov for target video _1533_small_1.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1533_small_1)
p3: Added 81 frames from segment _1526_small_0.mov for target video _1526_small_0.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1526_small_0)
p3: Added 71 frames from segment _1537_small_0.mov for target video _1537_small_0.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1537_small_0)
p5: Added 71 frames from segment _1537_small_3.mov for target video _1537_small_3.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1537_small_3)
p5: Added 65 frames from segment _1524_small_3.mov for target video _1524_small_3.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1524_small_3)
p7: Added 89 frames from segment _1510_small_0.mov for target video _1510_small_0.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1510_small_0)
p5: Added 129 frames from segment _1523_small_1.mov for target video _1523_small_1.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1523_small_1)
p4: Added 89 frames from segment _1508_small_3.mov for target video _1508_small_3.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1508_small_3)
p5: Added 111 frames from segment _1539_small_0.mov for target video _1539_small_0.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1539_small_0)
p6: Added 63 frames from segment _1532_small_1.mov for target video _1532_small_1.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1532_small_1)
p5: Added 85 frames from segment _1512_small_1.mov for target video _1512_small_1.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1512_small_1)
p6: Added 111 frames from segment _1535_small_0.mov for target video _1535_small_0.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1535_small_0)
p5: Added 85 frames from segment _1512_small_3.mov for target video _1512_small_3.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1512_small_3)
p4: Added 81 frames from segment _1522_small_1.mov for target video _1522_small_1.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1522_small_1)
p8: Added 81 frames from segment _1522_small_0.mov for target video _1522_small_0.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1522_small_0)
p1: Added 121 frames from segment _1531_small_1.mov for target video _1531_small_1.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1531_small_1)
p4: Added 83 frames from segment _1513_small_1.mov for target video _1513_small_1.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1513_small_1)
p2: Added 71 frames from segment _1525_small_1.mov for target video _1525_small_1.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1525_small_1)
p3: Added 89 frames from segment _1538_small_0.mov for target video _1538_small_0.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1538_small_0)
p7: Added 89 frames from segment _1538_small_3.mov for target video _1538_small_3.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1538_small_3)
p7: Added 63 frames from segment _1532_small_0.mov for target video _1532_small_0.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1532_small_0)
p3: Added 71 frames from segment _1525_small_3.mov for target video _1525_small_3.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1525_small_3)
p6: Added 129 frames from segment _1523_small_0.mov for target video _1523_small_0.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1523_small_0)
p6: Added 71 frames from segment _1525_small_0.mov for target video _1525_small_0.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1525_small_0)
p8: Added 81 frames from segment _1536_small_0.mov for target video _1536_small_0.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1536_small_0)
p6: Added 81 frames from segment _1522_small_3.mov for target video _1522_small_3.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1522_small_3)
p2: Added 111 frames from segment _1535_small_1.mov for target video _1535_small_1.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1535_small_1)
p2: Added 83 frames from segment _1513_small_0.mov for target video _1513_small_0.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1513_small_0)
p4: Added 121 frames from segment _1531_small_3.mov for target video _1531_small_3.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1531_small_3)
p3: Added 111 frames from segment _1535_small_3.mov for target video _1535_small_3.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1535_small_3)
p2: Added 89 frames from segment _1538_small_1.mov for target video _1538_small_1.mov (stitched-frames dir /tmp/data/stitched_video_frames/_1538_small_1)
****************************** Finished pipeline job: sc-fids-capstone-etl-demo--download-videos-extract-frames ******************************
```

```python
df_filtered_target_vid_index_schemad_pcoll
```
| | target_video_filename | video_seq_id | perspective_cam_id | compressed_mov_url | uncompressed_avi_url | uncompressed_avi_mirror_1_url | uncompressed_avi_mirror_2_url |
|---|---|---|---|---|---|---|---|
| 0 | _1508_small_3.mov | 1508 | 3 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 1 | _1510_small_0.mov | 1510 | 0 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 2 | _1510_small_1.mov | 1510 | 1 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 3 | _1510_small_3.mov | 1510 | 3 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 4 | _1512_small_0.mov | 1512 | 0 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 5 | _1512_small_1.mov | 1512 | 1 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 6 | _1512_small_3.mov | 1512 | 3 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 7 | _1513_small_0.mov | 1513 | 0 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 8 | _1513_small_1.mov | 1513 | 1 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 9 | _1513_small_3.mov | 1513 | 3 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 10 | _1516_small_0.mov | 1516 | 0 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 11 | _1516_small_1.mov | 1516 | 1 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 12 | _1516_small_3.mov | 1516 | 3 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 13 | _1522_small_0.mov | 1522 | 0 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 14 | _1522_small_1.mov | 1522 | 1 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 15 | _1522_small_3.mov | 1522 | 3 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 16 | _1523_small_0.mov | 1523 | 0 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 17 | _1523_small_1.mov | 1523 | 1 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 18 | _1523_small_3.mov | 1523 | 3 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 19 | _1524_small_0.mov | 1524 | 0 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 20 | _1524_small_1.mov | 1524 | 1 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 21 | _1524_small_3.mov | 1524 | 3 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 22 | _1525_small_0.mov | 1525 | 0 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 23 | _1525_small_1.mov | 1525 | 1 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 24 | _1525_small_3.mov | 1525 | 3 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 25 | _1526_small_0.mov | 1526 | 0 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 26 | _1526_small_1.mov | 1526 | 1 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 27 | _1526_small_3.mov | 1526 | 3 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 28 | _1531_small_0.mov | 1531 | 0 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 29 | _1531_small_1.mov | 1531 | 1 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 30 | _1531_small_3.mov | 1531 | 3 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 31 | _1532_small_0.mov | 1532 | 0 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 32 | _1532_small_1.mov | 1532 | 1 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 33 | _1532_small_3.mov | 1532 | 3 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 34 | _1533_small_0.mov | 1533 | 0 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 35 | _1533_small_1.mov | 1533 | 1 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 36 | _1533_small_3.mov | 1533 | 3 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 37 | _1535_small_0.mov | 1535 | 0 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 38 | _1535_small_1.mov | 1535 | 1 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 39 | _1535_small_3.mov | 1535 | 3 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 40 | _1536_small_0.mov | 1536 | 0 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 41 | _1536_small_1.mov | 1536 | 1 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 42 | _1536_small_3.mov | 1536 | 3 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 43 | _1537_small_0.mov | 1537 | 0 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 44 | _1537_small_1.mov | 1537 | 1 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 45 | _1537_small_3.mov | 1537 | 3 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 46 | _1538_small_0.mov | 1538 | 0 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 47 | _1538_small_1.mov | 1538 | 1 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 48 | _1538_small_3.mov | 1538 | 3 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
| 49 | _1539_small_0.mov | 1539 | 0 | http://csr.bu.edu/asl/private/downloads/2002_8... | http://csr.bu.edu/asl0/working/tapes3/2002_08_... | | |
```python
df_merged_download_results
```
| | target_video_fname | target_video_frames_dir | segment_url | segment_fname |
|---|---|---|---|---|
| 0 | _1508_small_3.mov | /tmp/data/stitched_video_frames/_1508_small_3 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1508_small_3.mov |
| 1 | _1513_small_1.mov | /tmp/data/stitched_video_frames/_1513_small_1 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1513_small_1.mov |
| 2 | _1523_small_0.mov | /tmp/data/stitched_video_frames/_1523_small_0 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1523_small_0.mov |
| 3 | _1525_small_3.mov | /tmp/data/stitched_video_frames/_1525_small_3 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1525_small_3.mov |
| 4 | _1532_small_1.mov | /tmp/data/stitched_video_frames/_1532_small_1 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1532_small_1.mov |
| 5 | _1536_small_0.mov | /tmp/data/stitched_video_frames/_1536_small_0 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1536_small_0.mov |
| 6 | _1538_small_3.mov | /tmp/data/stitched_video_frames/_1538_small_3 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1538_small_3.mov |
| 7 | _1512_small_0.mov | /tmp/data/stitched_video_frames/_1512_small_0 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1512_small_0.mov |
| 8 | _1516_small_3.mov | /tmp/data/stitched_video_frames/_1516_small_3 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1516_small_3.mov |
| 9 | _1524_small_1.mov | /tmp/data/stitched_video_frames/_1524_small_1 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1524_small_1.mov |
| 10 | _1531_small_0.mov | /tmp/data/stitched_video_frames/_1531_small_0 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1531_small_0.mov |
| 11 | _1533_small_3.mov | /tmp/data/stitched_video_frames/_1533_small_3 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1533_small_3.mov |
| 12 | _1537_small_1.mov | /tmp/data/stitched_video_frames/_1537_small_1 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1537_small_1.mov |
| 13 | _1510_small_1.mov | /tmp/data/stitched_video_frames/_1510_small_1 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1510_small_1.mov |
| 14 | _1516_small_0.mov | /tmp/data/stitched_video_frames/_1516_small_0 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1516_small_0.mov |
| 15 | _1523_small_3.mov | /tmp/data/stitched_video_frames/_1523_small_3 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1523_small_3.mov |
| 16 | _1526_small_1.mov | /tmp/data/stitched_video_frames/_1526_small_1 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1526_small_1.mov |
| 17 | _1533_small_0.mov | /tmp/data/stitched_video_frames/_1533_small_0 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1533_small_0.mov |
| 18 | _1536_small_3.mov | /tmp/data/stitched_video_frames/_1536_small_3 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1536_small_3.mov |
| 19 | _1510_small_0.mov | /tmp/data/stitched_video_frames/_1510_small_0 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1510_small_0.mov |
| 20 | _1513_small_3.mov | /tmp/data/stitched_video_frames/_1513_small_3 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1513_small_3.mov |
| 21 | _1523_small_1.mov | /tmp/data/stitched_video_frames/_1523_small_1 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1523_small_1.mov |
| 22 | _1526_small_0.mov | /tmp/data/stitched_video_frames/_1526_small_0 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1526_small_0.mov |
| 23 | _1532_small_3.mov | /tmp/data/stitched_video_frames/_1532_small_3 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1532_small_3.mov |
| 24 | _1536_small_1.mov | /tmp/data/stitched_video_frames/_1536_small_1 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1536_small_1.mov |
| 25 | _1539_small_0.mov | /tmp/data/stitched_video_frames/_1539_small_0 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1539_small_0.mov |
| 26 | _1512_small_3.mov | /tmp/data/stitched_video_frames/_1512_small_3 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1512_small_3.mov |
| 27 | _1522_small_1.mov | /tmp/data/stitched_video_frames/_1522_small_1 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1522_small_1.mov |
| 28 | _1525_small_0.mov | /tmp/data/stitched_video_frames/_1525_small_0 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1525_small_0.mov |
| 29 | _1531_small_3.mov | /tmp/data/stitched_video_frames/_1531_small_3 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1531_small_3.mov |
| 30 | _1535_small_1.mov | /tmp/data/stitched_video_frames/_1535_small_1 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1535_small_1.mov |
| 31 | _1538_small_0.mov | /tmp/data/stitched_video_frames/_1538_small_0 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1538_small_0.mov |
| 32 | _1510_small_3.mov | /tmp/data/stitched_video_frames/_1510_small_3 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1510_small_3.mov |
| 33 | _1516_small_1.mov | /tmp/data/stitched_video_frames/_1516_small_1 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1516_small_1.mov |
| 34 | _1524_small_0.mov | /tmp/data/stitched_video_frames/_1524_small_0 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1524_small_0.mov |
| 35 | _1526_small_3.mov | /tmp/data/stitched_video_frames/_1526_small_3 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1526_small_3.mov |
| 36 | _1533_small_1.mov | /tmp/data/stitched_video_frames/_1533_small_1 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1533_small_1.mov |
| 37 | _1537_small_0.mov | /tmp/data/stitched_video_frames/_1537_small_0 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1537_small_0.mov |
| 38 | _1512_small_1.mov | /tmp/data/stitched_video_frames/_1512_small_1 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1512_small_1.mov |
| 39 | _1522_small_0.mov | /tmp/data/stitched_video_frames/_1522_small_0 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1522_small_0.mov |
| 40 | _1524_small_3.mov | /tmp/data/stitched_video_frames/_1524_small_3 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1524_small_3.mov |
| 41 | _1531_small_1.mov | /tmp/data/stitched_video_frames/_1531_small_1 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1531_small_1.mov |
| 42 | _1535_small_0.mov | /tmp/data/stitched_video_frames/_1535_small_0 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1535_small_0.mov |
| 43 | _1537_small_3.mov | /tmp/data/stitched_video_frames/_1537_small_3 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1537_small_3.mov |
| 44 | _1513_small_0.mov | /tmp/data/stitched_video_frames/_1513_small_0 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1513_small_0.mov |
| 45 | _1522_small_3.mov | /tmp/data/stitched_video_frames/_1522_small_3 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1522_small_3.mov |
| 46 | _1525_small_1.mov | /tmp/data/stitched_video_frames/_1525_small_1 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1525_small_1.mov |
| 47 | _1532_small_0.mov | /tmp/data/stitched_video_frames/_1532_small_0 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1532_small_0.mov |
| 48 | _1535_small_3.mov | /tmp/data/stitched_video_frames/_1535_small_3 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1535_small_3.mov |
| 49 | _1538_small_1.mov | /tmp/data/stitched_video_frames/_1538_small_1 | http://csr.bu.edu/asl/private/downloads/2002_8... | _1538_small_1.mov |
```python
df_merged_extraction_results.columns = ['segment_fname', 'frames', 'segment_dicts']
df_merged_extraction_results
```
| | segment_fname | frames | segment_dicts |
|---|---|---|---|
| 0 | _1523_small_3.mov | 129 | [{'target_video_fname': '_1523_small_3.mov', '... |
| 1 | _1531_small_0.mov | 121 | [{'target_video_fname': '_1531_small_0.mov', '... |
| 2 | _1537_small_0.mov | 71 | [{'target_video_fname': '_1537_small_0.mov', '... |
| 3 | _1523_small_1.mov | 129 | [{'target_video_fname': '_1523_small_1.mov', '... |
| 4 | _1525_small_1.mov | 71 | [{'target_video_fname': '_1525_small_1.mov', '... |
| 5 | _1525_small_0.mov | 71 | [{'target_video_fname': '_1525_small_0.mov', '... |
| 6 | _1538_small_1.mov | 89 | [{'target_video_fname': '_1538_small_1.mov', '... |
| 7 | _1536_small_3.mov | 81 | [{'target_video_fname': '_1536_small_3.mov', '... |
| 8 | _1537_small_1.mov | 71 | [{'target_video_fname': '_1537_small_1.mov', '... |
| 9 | _1513_small_3.mov | 83 | [{'target_video_fname': '_1513_small_3.mov', '... |
| 10 | _1524_small_3.mov | 65 | [{'target_video_fname': '_1524_small_3.mov', '... |
| 11 | _1513_small_1.mov | 83 | [{'target_video_fname': '_1513_small_1.mov', '... |
| 12 | _1536_small_0.mov | 81 | [{'target_video_fname': '_1536_small_0.mov', '... |
| 13 | _1516_small_3.mov | 81 | [{'target_video_fname': '_1516_small_3.mov', '... |
| 14 | _1533_small_3.mov | 93 | [{'target_video_fname': '_1533_small_3.mov', '... |
| 15 | _1510_small_0.mov | 89 | [{'target_video_fname': '_1510_small_0.mov', '... |
| 16 | _1512_small_3.mov | 85 | [{'target_video_fname': '_1512_small_3.mov', '... |
| 17 | _1523_small_0.mov | 129 | [{'target_video_fname': '_1523_small_0.mov', '... |
| 18 | _1531_small_3.mov | 121 | [{'target_video_fname': '_1531_small_3.mov', '... |
| 19 | _1526_small_1.mov | 81 | [{'target_video_fname': '_1526_small_1.mov', '... |
| 20 | _1524_small_0.mov | 65 | [{'target_video_fname': '_1524_small_0.mov', '... |
| 21 | _1532_small_3.mov | 63 | [{'target_video_fname': '_1532_small_3.mov', '... |
| 22 | _1508_small_3.mov | 89 | [{'target_video_fname': '_1508_small_3.mov', '... |
| 23 | _1522_small_1.mov | 81 | [{'target_video_fname': '_1522_small_1.mov', '... |
| 24 | _1531_small_1.mov | 121 | [{'target_video_fname': '_1531_small_1.mov', '... |
| 25 | _1535_small_3.mov | 111 | [{'target_video_fname': '_1535_small_3.mov', '... |
| 26 | _1510_small_1.mov | 89 | [{'target_video_fname': '_1510_small_1.mov', '... |
| 27 | _1524_small_1.mov | 65 | [{'target_video_fname': '_1524_small_1.mov', '... |
| 28 | _1533_small_1.mov | 93 | [{'target_video_fname': '_1533_small_1.mov', '... |
| 29 | _1512_small_1.mov | 85 | [{'target_video_fname': '_1512_small_1.mov', '... |
| 30 | _1538_small_0.mov | 89 | [{'target_video_fname': '_1538_small_0.mov', '... |
| 31 | _1538_small_3.mov | 89 | [{'target_video_fname': '_1538_small_3.mov', '... |
| 32 | _1516_small_0.mov | 81 | [{'target_video_fname': '_1516_small_0.mov', '... |
| 33 | _1516_small_1.mov | 81 | [{'target_video_fname': '_1516_small_1.mov', '... |
| 34 | _1536_small_1.mov | 81 | [{'target_video_fname': '_1536_small_1.mov', '... |
| 35 | _1535_small_0.mov | 111 | [{'target_video_fname': '_1535_small_0.mov', '... |
| 36 | _1525_small_3.mov | 71 | [{'target_video_fname': '_1525_small_3.mov', '... |
| 37 | _1535_small_1.mov | 111 | [{'target_video_fname': '_1535_small_1.mov', '... |
| 38 | _1512_small_0.mov | 85 | [{'target_video_fname': '_1512_small_0.mov', '... |
| 39 | _1526_small_3.mov | 81 | [{'target_video_fname': '_1526_small_3.mov', '... |
| 40 | _1537_small_3.mov | 71 | [{'target_video_fname': '_1537_small_3.mov', '... |
| 41 | _1532_small_1.mov | 63 | [{'target_video_fname': '_1532_small_1.mov', '... |
| 42 | _1522_small_0.mov | 81 | [{'target_video_fname': '_1522_small_0.mov', '... |
| 43 | _1513_small_0.mov | 83 | [{'target_video_fname': '_1513_small_0.mov', '... |
| 44 | _1533_small_0.mov | 93 | [{'target_video_fname': '_1533_small_0.mov', '... |
| 45 | _1510_small_3.mov | 89 | [{'target_video_fname': '_1510_small_3.mov', '... |
| 46 | _1526_small_0.mov | 81 | [{'target_video_fname': '_1526_small_0.mov', '... |
| 47 | _1539_small_0.mov | 111 | [{'target_video_fname': '_1539_small_0.mov', '... |
| 48 | _1532_small_0.mov | 63 | [{'target_video_fname': '_1532_small_0.mov', '... |
| 49 | _1522_small_3.mov | 81 | [{'target_video_fname': '_1522_small_3.mov', '... |
```python
print(f"We extracted {df_merged_extraction_results.frames.sum()} frames from {df_merged_extraction_results.segment_fname.count()} downloaded segments.")
```

```
We extracted 4382 frames from 50 downloaded segments.
```
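For readers curious how the download fan-out works under the hood: the repo's actual `pl__3__parallel_download_videos` takes the full options dict and does more bookkeeping than shown here, but the general Beam pattern is to split a `PCollection` with `beam.Partition`, run each partition through its own `ParDo`, and merge the results with `beam.Flatten`. The following is only a minimal sketch of that pattern; the `download_segment` helper and its column names are illustrative stand-ins, not the repo's actual implementation:

```python
import os
import urllib.request
import zlib

import apache_beam as beam

def download_segment(entry, dest_dir='/tmp/data/videos'):
    # Hypothetical stand-in for the real download logic.
    os.makedirs(dest_dir, exist_ok=True)
    fname = entry['target_video_filename']
    local_path = os.path.join(dest_dir, fname)
    urllib.request.urlretrieve(entry['compressed_mov_url'], local_path)
    return {'segment_fname': fname, 'local_path': local_path}

class SegmentDownloader(beam.DoFn):
    """Downloads one video segment per element processed."""
    def process(self, vid_index_entry):
        yield download_segment(vid_index_entry)

def parallel_download_videos(vid_index_pcoll, n_partitions=8):
    # Split the index into n_partitions sub-PCollections (deterministic hash of the filename)...
    partitions = vid_index_pcoll | "Partition downloads" >> beam.Partition(
        lambda entry, n: zlib.crc32(entry['target_video_filename'].encode()) % n,
        n_partitions,
    )
    # ...run each partition through its own ParDo so downloads proceed concurrently...
    per_partition_results = [
        p | f"Download partition {i}" >> beam.ParDo(SegmentDownloader())
        for i, p in enumerate(partitions)
    ]
    # ...then merge the per-partition results back into a single PCollection.
    return per_partition_results | "Merge download results" >> beam.Flatten()
```

The same partition/ParDo/Flatten shape underlies the frame-extraction step as well, which is why both steps were able to keep all eight local worker cores busy.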


## Conclusion

This notebook has hopefully demonstrated the data-processing parallelism that Apache Beam makes possible. Remember, the full pipeline executes in the GCP Dataflow environment; this project was really only feasible there. The statistics below demonstrate this.


### Trying to do it locally, without Apache Beam

As I said when I began this summary, when I first started my work on this project, I did some initial testing using the Pandas `DataFrame` approach. First, I ran out of disk space, and my MacBook Pro was very upset with me. Second, the sequence never completed: I let the process run for more than 18 hours, and when I rebooted, I found it had only progressed about 20% of the way through the entire list of videos. And since I required a way to relate other information to the videos and frames (data about the ASL consultant, camera perspective, etc.), I had the "brilliant" idea to stuff each and every image byte into a table (CSV). No wonder I ran out of space. I estimated that had I let it finish (assuming I even had enough local disk space), it would have taken close to a week.
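A back-of-the-envelope estimate shows why the bytes-in-a-CSV approach was doomed. The frame dimensions and encoding overhead below are illustrative assumptions, not measurements from the project:

```python
# Illustrative estimate only: assumed frame dimensions, not the project's actual values.
n_frames = 561_000                        # total frames across all videos (from this write-up)
width, height, channels = 320, 240, 3     # assumed roughly half-resolution RGB frames
raw_bytes_per_frame = width * height * channels   # 230,400 bytes uncompressed

raw_total_gb = n_frames * raw_bytes_per_frame / 1024**3
print(f"Raw pixel bytes alone: ~{raw_total_gb:.0f} GB")   # ~120 GB

# Serializing those bytes as text in a CSV (base64, or worse, decimal strings)
# inflates the total further, by roughly 1.3x to 4x depending on the encoding.
print(f"As CSV text: very roughly {raw_total_gb * 1.33:.0f} to {raw_total_gb * 4:.0f} GB")
```

Whatever the exact numbers, uncompressed pixel data stored as text simply does not fit comfortably on a laptop.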


### And with Apache Beam using GCP Dataflow as the runner, coupled with GCS for storage

By contrast, **executing the pipeline in GCP Dataflow, the entire run took a little over 4 hours to complete, including the validation-train split** (which will be discussed in another blog post). See the snapshot below.


![Dataflow-Jobs-All-ETL](./Dataflow-Jobs-All-ETL.png)
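This is the payoff of Beam's portability: pointing the very same pipeline at Dataflow is mostly a matter of swapping `PipelineOptions` (all of the option names below appear in the options dump earlier in this notebook). The bucket paths, region, and worker count here are placeholders, not the project's actual values:

```python
from apache_beam.options.pipeline_options import PipelineOptions

# Placeholder bucket names, region, and worker count for illustration only.
options = PipelineOptions(
    flags=[],
    runner='DataflowRunner',    # instead of 'InteractiveRunner' or 'DirectRunner'
    project='sc-fids-capstone',
    job_name='sc-fids-capstone-etl--download-videos-extract-frames',
    region='us-central1',
    staging_location='gs://my-bucket/staging',
    temp_location='gs://my-bucket/tmp',
    max_num_workers=8,
    setup_file='./setup.py',    # ships custom modules (e.g. data_extractor__beam) to the workers
)
```

The pipeline-construction code itself does not change; only the runner and its storage locations do.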


#### Other Statistics

##### Total Frame Count: 561,000+

##### Total Disk Space: 17 GB+

##### 19 Datasets

But only 2 are used to train the DNN.

Please keep in mind that **I did this basically at half resolution, at 30 FPS, when the videos were produced at 60 FPS**. **I had to compromise *somewhere***.


### Final Report

The total calendar time from start to finish was a little over four months. That seems like a long time, but this was a MASSIVE project, and by the time I realized I had bitten off more than I could chew, I was already committed. So I persevered, did A LOT of research, and learned how to do Big Data. During that time frame I worked on this nearly every day, taking time off only to sleep and when I needed a sanity check. Most of the time I averaged at least 10 hours per day.

One thing I will say is that Apache Beam is not for the faint of heart. It is fairly low-level. This is not the Pandas framework, folks; one needs to build nearly everything oneself. BUT... the payoff is that one can do Big Data: data processing the likes of which would not otherwise be possible on a single, local machine. That's BIG! It's not called "Big Data" for nothing.

In spite of all the pain, I give Apache Beam 5 out of 5 stars. It was well worth the blood, sweat, and tears.

So... thank you and goodnight!