Learn AI Series):Exercise 1: CTC decoder comparison tool.
import numpy as np
class CTCDecoder:
    """Compare greedy vs beam search CTC decoding.

    Class index 0 is the CTC blank. Label index ``i`` (1-based) maps to
    ``charset[i - 1]``; indices outside the charset are dropped when the
    label sequence is rendered as text.
    """

    def __init__(self, charset):
        self.charset = charset
        self.blank = 0

    def _labels_to_text(self, labels):
        """Render label indices as a string, skipping out-of-charset ones."""
        return "".join(
            self.charset[i - 1] for i in labels
            if 0 < i <= len(self.charset)
        )

    @staticmethod
    def _accumulate(table, prefix, slot, log_p):
        """Log-add ``log_p`` into ``table[prefix][slot]``.

        Slot 0 holds the mass of paths ending in blank, slot 1 the mass of
        paths ending in the prefix's last label.
        """
        entry = table.setdefault(prefix, [-np.inf, -np.inf])
        entry[slot] = np.logaddexp(entry[slot], log_p)

    def greedy_decode(self, logits):
        """Greedy: argmax per timestep, collapse repeats, remove blanks.

        Args:
            logits: array of shape (T, num_classes).

        Returns:
            The decoded string.
        """
        decoded = []
        prev = None
        for idx in logits.argmax(axis=1):
            if idx != self.blank and idx != prev:
                decoded.append(idx)
            prev = idx
        return self._labels_to_text(decoded)

    def beam_search_decode(self, logits, beam_width=5):
        """CTC prefix beam search over collapsed label sequences.

        Ranking raw per-frame paths would always reproduce the greedy
        result, because per-timestep scores are independent and the best
        raw path is the argmax path. Instead, each beam is a *collapsed*
        prefix, and the probabilities of all frame-level paths that
        collapse to the same prefix are summed.

        Args:
            logits: array of shape (T, num_classes); softmax is applied
                per timestep.
            beam_width: number of prefixes kept after every timestep.

        Returns:
            The decoded string of the most probable prefix.

        Raises:
            ValueError: if ``beam_width`` is smaller than 1.
        """
        if beam_width < 1:
            raise ValueError("beam_width must be at least 1")
        logits = np.asarray(logits, dtype=float)

        # prefix (tuple of labels) -> (log P ending in blank,
        #                              log P ending in last label)
        beams = {(): (0.0, -np.inf)}
        for t in range(logits.shape[0]):
            # Numerically stable log-softmax for this timestep.
            row = logits[t] - np.max(logits[t])
            log_probs = row - np.log(np.exp(row).sum())

            grown = {}
            for prefix, (lp_blank, lp_label) in beams.items():
                lp_total = np.logaddexp(lp_blank, lp_label)
                for c in range(logits.shape[1]):
                    lp = log_probs[c]
                    if c == self.blank:
                        # Blank keeps the prefix unchanged.
                        self._accumulate(grown, prefix, 0, lp_total + lp)
                    elif prefix and prefix[-1] == c:
                        # Repeated label without a blank in between
                        # collapses into the same prefix ...
                        self._accumulate(grown, prefix, 1, lp_label + lp)
                        # ... and only extends it after a blank.
                        self._accumulate(
                            grown, prefix + (c,), 1, lp_blank + lp
                        )
                    else:
                        self._accumulate(
                            grown, prefix + (c,), 1, lp_total + lp
                        )

            # Keep top beam_width prefixes by total probability.
            ranked = sorted(
                grown.items(),
                key=lambda item: np.logaddexp(item[1][0], item[1][1]),
                reverse=True,
            )
            beams = {
                prefix: (scores[0], scores[1])
                for prefix, scores in ranked[:beam_width]
            }

        # Dicts preserve insertion order, so the first key is the best beam.
        best_prefix = next(iter(beams))
        return self._labels_to_text(best_prefix)
def make_hello_logits(num_classes, T=20,
                      noise_sigma=0.0):
    """Build deterministic synthetic CTC logits spelling 'HELLO'.

    A fixed seed (42) makes every call reproducible. Five timesteps get a
    strong peak (5.0) on a character class, every other timestep gets a
    peak (3.0) on the blank class 0, and Gaussian noise scaled by
    ``noise_sigma`` is added on top.

    Returns:
        Array of shape (T, num_classes).
    """
    rng = np.random.RandomState(42)
    logits = 0.1 * rng.randn(T, num_classes)

    # Timesteps carrying a character, and the label placed at each one.
    char_steps = np.array([2, 5, 8, 12, 16])
    char_labels = np.array([8, 5, 12, 12, 15])  # H E L L O
    logits[char_steps, char_labels] = 5.0

    # Every remaining timestep favours the blank.
    is_blank_step = np.ones(T, dtype=bool)
    is_blank_step[char_steps] = False
    logits[is_blank_step, 0] = 3.0

    # Second draw from the same generator supplies the noise.
    logits += noise_sigma * rng.randn(T, num_classes)
    return logits
charset = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
decoder = CTCDecoder(charset)
num_classes = len(charset) + 1  # +1 for blank

# One table row per noise level, decoding the same logits both ways.
print(f"{'Sigma':<8} {'Greedy':<12} {'Beam':<12}")
print("-" * 32)
for sigma in (0.0, 0.1, 0.5, 1.0, 2.0):
    logits = make_hello_logits(num_classes, noise_sigma=sigma)
    greedy = decoder.greedy_decode(logits)
    beam = decoder.beam_search_decode(logits, beam_width=5)
    print(f"{sigma:<8} {greedy:<12} {beam:<12}")
At zero noise both decoders produce "HELLO" correctly. As noise increases, greedy decoding starts making errors first because it commits to the single best character at each position without considering how that choice affects the overall sequence. Beam search maintains multiple hypotheses, so it can recover from a noisy timestep if the surrounding timesteps provide enough signal. At very high noise (sigma=2.0), both decoders will struggle -- but beam search degrades more gracefully.
Exercise 2: Text detection post-processor.
import numpy as np
from collections import deque
class TextDetectionPostProcessor:
    """Post-process text detection probability maps."""

    # 4-connectivity: up, down, left, right.
    _NEIGHBOURS = ((-1, 0), (1, 0), (0, -1), (0, 1))

    def _flood_fill(self, binary, visited, start_r, start_c):
        """Collect the 4-connected component containing the start pixel.

        Marks every collected pixel in ``visited`` (modified in place) and
        returns the component as a list of (row, col) tuples.
        """
        h, w = binary.shape
        comp = []
        queue = deque([(start_r, start_c)])
        visited[start_r, start_c] = True
        while queue:
            cr, cc = queue.popleft()
            comp.append((cr, cc))
            for dr, dc in self._NEIGHBOURS:
                nr, nc = cr + dr, cc + dc
                if (0 <= nr < h and 0 <= nc < w
                        and binary[nr, nc]
                        and not visited[nr, nc]):
                    visited[nr, nc] = True
                    queue.append((nr, nc))
        return comp

    def threshold_and_contour(self, prob_map,
                              threshold=0.5):
        """Binarize and find connected components via BFS flood-fill.

        Args:
            prob_map: 2D array of per-pixel probabilities.
            threshold: pixels with probability >= threshold are foreground.

        Returns:
            A list with one dict per component, in row-major discovery
            order, with keys ``min_row``, ``max_row``, ``min_col``,
            ``max_col`` (inclusive pixel bounds), ``area`` (pixel count)
            and ``aspect`` (box width / box height).
        """
        h, w = prob_map.shape
        binary = prob_map >= threshold
        visited = np.zeros((h, w), dtype=bool)
        components = []
        for r in range(h):
            for c in range(w):
                if not binary[r, c] or visited[r, c]:
                    continue
                comp = self._flood_fill(binary, visited, r, c)
                rows = [p[0] for p in comp]
                cols = [p[1] for p in comp]
                box = {
                    "min_row": min(rows),
                    "max_row": max(rows),
                    "min_col": min(cols),
                    "max_col": max(cols),
                    "area": len(comp),
                }
                # Bounds are inclusive, so the extent is max - min + 1.
                # A single pixel is a 1x1 box with aspect 1.0.
                bh = box["max_row"] - box["min_row"] + 1
                bw = box["max_col"] - box["min_col"] + 1
                box["aspect"] = bw / bh
                components.append(box)
        return components

    def filter_boxes(self, boxes, min_area=100,
                     min_aspect=0.2,
                     max_aspect=15.0):
        """Split boxes into kept ones and rejected ones with a reason.

        Checks run in order: area below ``min_area`` ("too_small"), aspect
        below ``min_aspect`` ("too_tall"), aspect above ``max_aspect``
        ("too_wide").

        Returns:
            ``(kept, removed)`` where ``removed`` holds (reason, box) pairs.
        """
        kept, removed = [], []
        for b in boxes:
            if b["area"] < min_area:
                removed.append(("too_small", b))
            elif b["aspect"] < min_aspect:
                removed.append(("too_tall", b))
            elif b["aspect"] > max_aspect:
                removed.append(("too_wide", b))
            else:
                kept.append(b)
        return kept, removed
# Synthetic probability map: 200 rows x 300 columns
rng = np.random.RandomState(42)
prob = np.zeros((200, 300), dtype=np.float32)

# 5 horizontal text strips (8 px tall, 150 px wide), each shifted
# 10 px further right than the previous one
for i, row in enumerate((20, 60, 100, 140, 175)):
    w_start = 30 + 10 * i
    prob[row:row + 8, w_start:w_start + 150] = 0.9

# 3 square noise blobs centred at (cx, cy), clipped to the map bounds
for cx, cy in ((250, 30), (260, 120), (270, 180)):
    r1, r2 = max(cy - 8, 0), min(cy + 8, 200)
    c1, c2 = max(cx - 8, 0), min(cx + 8, 300)
    prob[r1:r2, c1:c2] = 0.85

pp = TextDetectionPostProcessor()
comps = pp.threshold_and_contour(prob)
kept, removed = pp.filter_boxes(comps)

print(f"Total components: {len(comps)}")
print(f"\nKept ({len(kept)}):")
for b in kept:
    print(f" area={b['area']:>5}, aspect={b['aspect']:.1f}")
print(f"\nRemoved ({len(removed)}):")
for reason, b in removed:
    print(f" {reason}: area={b['area']:>5}, aspect={b['aspect']:.1f}")
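The horizontal text strips have high aspect ratios (wide and short -- typical text geometry), while the square noise blobs have aspect ratios near 1.0, which doesn't match text geometry -- text is almost always significantly wider than it is tall (for horizontal scripts) or taller than wide (for vertical scripts like Chinese columns). Note that with the default thresholds used above (min_aspect=0.2, max_aspect=15.0) the result is actually the opposite of what we want: the 150x8 strips have an aspect ratio of around 20, so they are rejected as "too_wide", whereas the 16x16 blobs (aspect 1.0, area 256) fall inside the accepted range and are kept. To keep the strips and reject the blobs, call filter_boxes with thresholds that bracket the text geometry, e.g. min_aspect=2.0 and max_aspect=25.0. With thresholds tuned like that, the aspect ratio filter is a simple but effective heuristic for separating real text regions from noise.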
Exercise 3: OCR accuracy evaluation framework.
import numpy as np
class OCREvaluator:
    """Character-level and word-level OCR accuracy metrics."""

    def levenshtein_distance(self, s1, s2):
        """Edit distance (insert / delete / substitute, unit cost each).

        Uses the standard DP recurrence but keeps only the previous and
        current row of the table.
        """
        prev_row = list(range(len(s2) + 1))
        for i, a in enumerate(s1, start=1):
            cur_row = [i]
            for j, b in enumerate(s2, start=1):
                substitute = prev_row[j - 1] + (0 if a == b else 1)
                delete = prev_row[j] + 1
                insert = cur_row[j - 1] + 1
                cur_row.append(min(delete, insert, substitute))
            prev_row = cur_row
        return prev_row[-1]

    def character_accuracy(self, predicted,
                           ground_truth):
        """Return 1 - edit_distance / max(len(predicted), len(truth), 1)."""
        longest = max(len(predicted), len(ground_truth), 1)
        edits = self.levenshtein_distance(predicted, ground_truth)
        return 1.0 - edits / longest

    def word_accuracy(self, pred_words, gt_words):
        """Fraction of position-wise, case-insensitive word matches.

        The denominator is the longer of the two lists, so missing or
        extra words count against the score. With no ground truth the
        score is 1.0 only if nothing was predicted.
        """
        if not gt_words:
            return 0.0 if pred_words else 1.0
        hits = 0
        for guess, truth in zip(pred_words, gt_words):
            if guess.lower() == truth.lower():
                hits += 1
        return hits / max(len(pred_words), len(gt_words))
rng = np.random.RandomState(42)
charset = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnop"

# 20 random ground-truth strings; lengths are drawn from randint(5, 15),
# one character drawn per position.
ground_truths = []
for _ in range(20):
    length = rng.randint(5, 15)
    word = "".join(
        [rng.choice(list(charset)) for _ in range(length)]
    )
    ground_truths.append(word)
def make_perfect(gt_list):
    """Simulate a flawless engine: return a new list with the same items."""
    return [*gt_list]
def make_good(gt_list, rng):
    """Simulate a good engine: a few character substitutions per string.

    For each string, ``rng.randint(1, 3)`` decides how many positions are
    overwritten with a character drawn from the module-level ``charset``.
    A drawn character may equal the one it replaces.
    """
    def corrupt(text):
        symbols = list(text)
        for _ in range(rng.randint(1, 3)):
            # Draw the position first, then the replacement character.
            where = rng.randint(0, len(symbols))
            symbols[where] = rng.choice(list(charset))
        return "".join(symbols)

    return [corrupt(s) for s in gt_list]
def make_poor(gt_list, rng):
    """Simulate a poor engine: misses, heavy edits and false positives.

    Each ground-truth string is dropped when ``rng.random() < 0.3``.
    Surviving strings get ``max(1, int(len * 0.3))`` random edit attempts
    (substitute / insert / delete, characters drawn from the module-level
    ``charset``). Finally a few random strings are appended as false
    positives. The result is therefore not index-aligned with ``gt_list``.
    """
    alphabet = list(charset)
    outputs = []
    for truth in gt_list:
        if rng.random() < 0.3:
            continue  # 30% missed
        symbols = list(truth)
        # Edit budget is fixed from the original length.
        budget = max(1, int(len(symbols) * 0.3))
        for _ in range(budget):
            op = rng.choice(["sub", "ins", "del"])
            if op == "ins":
                at = rng.randint(0, len(symbols) + 1)
                symbols.insert(at, rng.choice(alphabet))
            elif op == "sub":
                if symbols:
                    at = rng.randint(0, len(symbols))
                    symbols[at] = rng.choice(alphabet)
            elif op == "del":
                # Never delete the last remaining character.
                if len(symbols) > 1:
                    at = rng.randint(0, len(symbols))
                    symbols.pop(at)
        outputs.append("".join(symbols))

    # Add random false positives
    extra = rng.randint(1, 4)
    for _ in range(extra):
        size = rng.randint(3, 8)
        outputs.append(
            "".join([rng.choice(alphabet) for _ in range(size)])
        )
    return outputs
ev = OCREvaluator()
perfect = make_perfect(ground_truths)
good = make_good(ground_truths, rng)
poor = make_poor(ground_truths, rng)

print(f"{'Engine':<10} {'CharAcc':>8} {'WordAcc':>8} {'AvgEdit':>8}")
print("-" * 38)
engines = {"perfect": perfect, "good": good, "poor": poor}
for name, preds in engines.items():
    # Compare position by position over the common prefix only.
    n = min(len(preds), len(ground_truths))
    char_scores = []
    edit_dists = []
    for pred, truth in zip(preds[:n], ground_truths[:n]):
        char_scores.append(ev.character_accuracy(pred, truth))
        edit_dists.append(ev.levenshtein_distance(pred, truth))
    ca = np.mean(char_scores)
    wa = ev.word_accuracy(preds[:n], ground_truths[:n])
    ed = np.mean(edit_dists)
    print(f"{name:<10} {ca:>8.3f} {wa:>8.3f} {ed:>8.1f}")
The perfect engine gets character accuracy 1.0 and word accuracy 1.0, as expected -- zero edit distance everywhere. The "good" engine has high character accuracy (most characters correct, just 1-2 substitutions per word) but significantly lower word accuracy (a single wrong character makes the entire word wrong). The "poor" engine shows low scores on both metrics. This gap between character and word accuracy is exactly why both metrics matter in practice: a system might be 95% accurate at the character level but only 70% accurate at the word level, and for many applications (postal codes, account numbers, drug names) it's the word-level accuracy that matters.
Here we go! We've now spent six episodes building a comprehensive visual understanding toolkit: image processing fundamentals (#77), object detection in two parts (#78-79), pixel-level segmentation (#80), pose estimation with tracking (#81), and OCR for text in images (#82). We can classify, detect, segment, track body structure, and read text. That's an extremely powerful set of capabilities for working with individual images.
But the visual world doesn't come in stills. It comes in motion. Video is how humans actually experience the world -- a continuous stream of frames where the changes between frames carry as much information as the frames themselves. A photo of a person with their arm extended could mean anything. A video of that same person shows they're throwing a ball, or waving goodbye, or reaching for a light switch. The temporal dimension is where actions live, and modeling time is what separates video understanding from image understanding ;-)
Can't you just run an image model on every frame? Technically yes, and sometimes that's all you need (as we did with object tracking in episode #81). But frame-by-frame processing misses the point. A YOLO detector running on every frame will give you bounding boxes everywhere, but it can NOT tell you whether the basketball player is dribbling or shooting, whether the car is accelerating or braking, whether the person stumbled and fell or sat down intentionally. Those distinctions exist ONLY in the temporal patterns between frames.
A video is a 4D tensor: (T, H, W, C) -- frames, height, width, channels. At 30fps and 1080p resolution, one second of video is 30 x 1080 x 1920 x 3 = roughly 186 million values. One minute is about 11 billion values. That's orders of magnitude more data than a single image, and it's the first practical problem you hit when building video models.
The solution is subsampling. You can't (and don't need to) process every frame at full resolution. Video models typically pick T frames from the clip -- usually 8, 16, or 32 -- evenly spaced across the clip, and resize each frame to a manageable resolution like 224x224 or 256x256:
import torch
import cv2
import numpy as np
def sample_frames(video_path, num_frames=16,
                  size=224):
    """Uniformly sample frames from a video and resize them for model input.

    Args:
        video_path: path handed to ``cv2.VideoCapture``.
        num_frames: number of evenly spaced frame indices to request.
        size: each frame is resized to ``size`` x ``size``.

    Returns:
        float32 tensor of shape (C, T, size, size) scaled to [0, 1]. T can
        be smaller than ``num_frames`` because frames that fail to decode
        are skipped.

    Raises:
        ValueError: if the video cannot be opened or no frame can be read.
    """
    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            raise ValueError(f"could not open video: {video_path!r}")
        total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        # Clamp so a missing/zero frame count still probes frame 0
        # instead of producing negative indices.
        last_index = max(total - 1, 0)
        indices = np.linspace(0, last_index, num_frames, dtype=int)
        frames = []
        for idx in indices:
            cap.set(cv2.CAP_PROP_POS_FRAMES, int(idx))
            ret, frame = cap.read()
            if not ret:
                continue
            frame = cv2.resize(frame, (size, size))
            # OpenCV decodes to BGR; convert to RGB.
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            frames.append(frame)
    finally:
        # Release the capture even if decoding raised.
        cap.release()

    if not frames:
        raise ValueError(
            f"no frames could be read from video: {video_path!r}"
        )

    # (T, H, W, C) -> (C, T, H, W) for PyTorch
    tensor = torch.tensor(
        np.stack(frames), dtype=torch.float32
    )
    tensor = tensor.permute(3, 0, 1, 2) / 255.0
    return tensor
# Sample 16 evenly spaced frames from the example video.
clip = sample_frames("basketball.mp4", num_frames=16)
print(f"Clip tensor: {clip.shape}")  # (3, 16, 224, 224)
Notice the axis reordering: PyTorch 3D convolutions expect (C, T, H, W) with channels first, which is the same convention as 2D convolutions (C, H, W) but with an extra temporal dimension inserted between channels and spatial dimensions. This is a pattern we've seen repeatedly -- as we discussed in episode #45 regarding CNN conventions (channels first for PyTorch, channels last for TensorFlow).
A 2D convolution (episode #45) slides a kernel across height and width. A 3D convolution slides a kernel across height, width, AND time. Where a 2D kernel might be 3x3 (spatial), a 3D kernel is 3x3x3 (temporal x spatial x spatial). It captures local patterns that span both space and time simultaneously -- a hand moving to the right over three consecutive frames, a ball rising through a sequence of positions, a mouth opening to speak.
import torch.nn as nn
class Simple3DCNN(nn.Module):
    """Minimal 3D CNN for action recognition.

    Three Conv3d/BatchNorm3d/ReLU stages followed by global average
    pooling and a linear classifier over 256 features.
    """

    def __init__(self, num_classes=400):
        super().__init__()
        # Stage 1 (example input: (batch, 3, 16, 224, 224)): wide spatial
        # kernel, spatial-only striding and pooling.
        stage1 = [
            nn.Conv3d(3, 64, kernel_size=(3, 7, 7),
                      stride=(1, 2, 2), padding=(1, 3, 3)),
            nn.BatchNorm3d(64),
            nn.ReLU(),
            nn.MaxPool3d(kernel_size=(1, 3, 3),
                         stride=(1, 2, 2), padding=(0, 1, 1)),
        ]
        # Stage 2: 3x3x3 conv, then halve time and space.
        stage2 = [
            nn.Conv3d(64, 128, kernel_size=(3, 3, 3),
                      padding=(1, 1, 1)),
            nn.BatchNorm3d(128),
            nn.ReLU(),
            nn.MaxPool3d(kernel_size=(2, 2, 2), stride=(2, 2, 2)),
        ]
        # Stage 3: 3x3x3 conv, then collapse to one vector per clip.
        stage3 = [
            nn.Conv3d(128, 256, kernel_size=(3, 3, 3),
                      padding=(1, 1, 1)),
            nn.BatchNorm3d(256),
            nn.ReLU(),
            nn.AdaptiveAvgPool3d((1, 1, 1)),
        ]
        self.features = nn.Sequential(*stage1, *stage2, *stage3)
        self.classifier = nn.Linear(256, num_classes)

    def forward(self, x):
        """Map a (batch, 3, T, H, W) clip to (batch, num_classes) scores."""
        pooled = self.features(x)
        flat = pooled.view(pooled.size(0), -1)
        return self.classifier(flat)
# Forward a random batch of two 16-frame clips through the network.
model = Simple3DCNN()
clip = torch.randn(2, 3, 16, 224, 224)
output = model(clip)
print(f"Prediction: {output.shape}")  # (2, 400)
The first Conv3d layer uses an asymmetric kernel -- (3, 7, 7) means 3 frames temporally but 7x7 spatially. This is deliberate: the initial layer needs a large spatial receptive field (to capture texture and edges, as we learned in episode #45) but only a small temporal window (adjacent frames are very similar, so a large temporal kernel at the first layer would be wasteful). The temporal stride is 1 (preserve all temporal resolution) while the spatial stride is 2 (halve spatial dimensions) because spatial redundancy is much higher than temporal redundancy in early layers.
3D convolutions are expensive though. A 3x3x3 kernel has 27 values per input/output channel pair, compared to 9 for a 2D 3x3 kernel. The seminal C3D paper (Tran et al., 2015) showed that 3D CNNs DO learn useful spatiotemporal features, but training was slow and the models were large.
The efficiency breakthrough came from factorized convolutions. Instead of a full 3D kernel, decompose it into a 2D spatial convolution followed by a 1D temporal convolution. This is the (2+1)D approach used in R(2+1)D (Tran et al., 2018):
class FactorizedConv3d(nn.Module):
    """(2+1)D factorized convolution: a spatial stage, then a temporal one.

    Args:
        in_ch: input channels.
        out_ch: output channels.
        mid_ch: channels between the two stages; falls back to ``out_ch``
            when left as ``None`` (or any falsy value).
    """

    def __init__(self, in_ch, out_ch, mid_ch=None):
        super().__init__()
        if not mid_ch:
            mid_ch = out_ch

        # Spatial: 1x3x3 kernel, so each frame is filtered independently.
        spatial_conv = nn.Conv3d(
            in_ch, mid_ch, (1, 3, 3), padding=(0, 1, 1)
        )
        self.spatial = nn.Sequential(
            spatial_conv, nn.BatchNorm3d(mid_ch), nn.ReLU()
        )

        # Temporal: 3x1x1 kernel, mixing frames at each spatial position.
        temporal_conv = nn.Conv3d(
            mid_ch, out_ch, (3, 1, 1), padding=(1, 0, 0)
        )
        self.temporal = nn.Sequential(
            temporal_conv, nn.BatchNorm3d(out_ch), nn.ReLU()
        )

    def forward(self, x):
        """Apply the spatial stage followed by the temporal stage."""
        spatial_out = self.spatial(x)
        return self.temporal(spatial_out)
# Compare parameter counts: one full 3x3x3 convolution against the
# factorized spatial+temporal pair (which also includes BatchNorm params).
full_3d = nn.Conv3d(64, 64, (3, 3, 3))
factored = FactorizedConv3d(64, 64)
p_full = sum(p.numel() for p in full_3d.parameters())
p_fact = sum(p.numel() for p in factored.parameters())
print(f"Full 3D params: {p_full:,}")
print(f"Factored params: {p_fact:,}")
print(f"Ratio: {p_fact / p_full:.2f}")
This factorization has two benefits: fewer parameters (2D + 1D < 3D in terms of total kernel values) AND an additional nonlinearity between the spatial and temporal stages. That extra ReLU between the two operations actually increases the model's representational power -- it can represent a larger space of functions than a single 3D kernel of the same total parameter count. Having said that, it is possible for very complex spatiotemporal patterns to be harder to learn with factored convolutions since they force a separation between space and time that may not always be natural.
ViT (episode #54) treats images as sequences of patches. The natural extension to video: treat clips as sequences of spacetime patches. The Video Vision Transformer (ViViT) takes a video of shape (T, H, W) and divides it into non-overlapping tubes of size (t, h, w), flattens each tube into a vector, and processes the entire set with a standard transformer encoder:
class VideoTransformerEncoder(nn.Module):
    """Simplified ViViT-style video encoder.

    A strided Conv3d turns non-overlapping spacetime tubes into tokens, a
    learnable [CLS] token is prepended, and a pre-norm transformer encoder
    processes the sequence. Note that no positional embedding is added to
    the tokens in this simplified version.
    """

    def __init__(self, patch_size=(2, 16, 16),
                 embed_dim=768, num_heads=12,
                 num_layers=12):
        super().__init__()
        # Unpacking requires patch_size to have exactly three entries
        # (time, height, width).
        _pt, _ph, _pw = patch_size
        # kernel == stride, so every tube is embedded exactly once.
        self.patch_embed = nn.Conv3d(
            3, embed_dim,
            kernel_size=patch_size, stride=patch_size,
        )
        self.cls_token = nn.Parameter(torch.randn(1, 1, embed_dim))
        layer = nn.TransformerEncoderLayer(
            d_model=embed_dim,
            nhead=num_heads,
            dim_feedforward=4 * embed_dim,
            batch_first=True,
            norm_first=True,
        )
        self.transformer = nn.TransformerEncoder(
            layer, num_layers=num_layers
        )
        self.norm = nn.LayerNorm(embed_dim)

    def forward(self, x):
        """Encode a (batch, 3, T, H, W) clip into (batch, embed_dim)."""
        embedded = self.patch_embed(x)
        # Unpacking five dims requires a batched input.
        batch, _dim, _t, _h, _w = embedded.shape
        # (batch, dim, t, h, w) -> (batch, num_patches, dim)
        sequence = embedded.flatten(2).transpose(1, 2)
        cls = self.cls_token.expand(batch, -1, -1)
        tokens = torch.cat([cls, sequence], dim=1)
        encoded = self.transformer(tokens)
        normed = self.norm(encoded)
        return normed[:, 0]  # [CLS] token
# Small configuration so the demo runs quickly.
encoder = VideoTransformerEncoder(
    num_layers=2, num_heads=4, embed_dim=256
)
clip = torch.randn(1, 3, 16, 224, 224)
feat = encoder(clip)
# (1, 256) -- single vector per clip
print(f"Video feature: {feat.shape}")
The key challenge with video transformers is the quadratic cost of self-attention (as we discussed in episode #52). A 16-frame, 224x224 video with patch size (2, 16, 16) produces 8 x 14 x 14 = 1,568 tokens. Self-attention over 1,568 tokens requires computing a 1,568 x 1,568 attention matrix at every layer. Double the frame count to 32 and you get 3,136 tokens -- that's a 3,136 x 3,136 matrix. The memory and compute costs grow quadratically, which is why most practical video transformers use factorized attention: first attend across space (within each frame), then attend across time (at each spatial position). This reduces the complexity from O(N^2) where N = T * H' * W' to O(T * (H'W')^2 + H'W' * T^2), which is dramatically cheaper for typical video dimensions.
All the approaches above learn to detect motion implicitly from raw pixel changes. Optical flow makes motion explicit: for each pixel in frame t, it computes a displacement vector (dx, dy) pointing to where that pixel "moved" in frame t+1. The result is a dense motion field across the entire image.
import cv2
import numpy as np
def compute_optical_flow(frame1, frame2):
    """Dense Farneback optical flow from ``frame1`` to ``frame2``.

    Both frames are converted from BGR to grayscale first.

    Returns:
        The flow field, (H, W, 2) -- dx, dy per pixel.
    """
    farneback_params = dict(
        pyr_scale=0.5, levels=3, winsize=15,
        iterations=3, poly_n=5, poly_sigma=1.2,
        flags=0,
    )
    prev_gray = cv2.cvtColor(frame1, cv2.COLOR_BGR2GRAY)
    next_gray = cv2.cvtColor(frame2, cv2.COLOR_BGR2GRAY)
    return cv2.calcOpticalFlowFarneback(
        prev_gray, next_gray, None, **farneback_params
    )
def flow_to_color(flow):
    """Render an optical flow field as a BGR color image.

    HSV encoding: hue = direction, saturation = full,
    value = magnitude (min-max normalized to 0..255).
    """
    dx = flow[:, :, 0]
    dy = flow[:, :, 1]
    magnitude, angle = cv2.cartToPolar(dx, dy)

    height, width = flow.shape[:2]
    hsv = np.zeros((height, width, 3), dtype=np.uint8)
    # Radians -> degrees, halved so the hue fits OpenCV's 0..180 range.
    hsv[:, :, 0] = angle * 180 / np.pi / 2
    hsv[:, :, 1] = 255
    hsv[:, :, 2] = cv2.normalize(
        magnitude, None, 0, 255, cv2.NORM_MINMAX
    )
    return cv2.cvtColor(hsv, cv2.COLOR_HSV2BGR)
def compute_flow_stats(flow):
    """Summarize an (H, W, 2) optical flow field.

    Returns:
        Dict of Python floats: mean, max and standard deviation of the
        per-pixel flow magnitude, plus the direction (in degrees) of the
        mean flow vector.
    """
    dx = flow[:, :, 0]
    dy = flow[:, :, 1]
    speed = np.sqrt(dx ** 2 + dy ** 2)
    # Direction of the average displacement, converted to degrees.
    heading_deg = np.arctan2(dy.mean(), dx.mean()) * 180 / np.pi

    stats = {}
    stats["mean_magnitude"] = float(speed.mean())
    stats["max_magnitude"] = float(speed.max())
    stats["std_magnitude"] = float(speed.std())
    stats["dominant_direction"] = float(heading_deg)
    return stats
# Synthetic example: two frames containing the same bright rectangle,
# moved 10 px to the right in the second frame (rightward motion).
frame1 = np.zeros((240, 320, 3), dtype=np.uint8)
frame2 = np.zeros_like(frame1)
frame1[80:160, 100:200] = 200  # rectangle
frame2[80:160, 110:210] = 200  # shifted 10px right

flow = compute_optical_flow(frame1, frame2)
stats = compute_flow_stats(flow)
print(f"Flow shape: {flow.shape}")
for k, v in stats.items():
    print(f" {k}: {v:.2f}")
Optical flow provides a powerful input signal for action recognition. Instead of feeding raw frames to a model, you can feed flow maps. A "running" action has strong horizontal flow in the lower body region. A "waving" action has oscillating flow in the hand region. A "falling" action has strong downward flow across the whole body ;-)
The seminal Two-Stream Networks (Simonyan and Zisserman, 2014) exploited this by running one CNN on RGB frames (the "spatial stream" -- captures appearance) and another CNN on stacked optical flow frames (the "temporal stream" -- captures motion), then combining their predictions. This was one of the first architectures that could reliably recognize actions in video, and the two-stream idea (separate processing of appearance and motion, then fusion) remains influential even though modern architectures handle both within a single model.
Modern deep learning methods like RAFT (Recurrent All-Pairs Field Transforms, Teed and Deng, 2020) compute optical flow with neural networks themselves, achieving much higher quality than classical methods like Farneback. RAFT iteratively refines flow estimates using a correlation volume that captures all pairwise similarities between pixels in the two frames. It's basically an attention mechanism (episode #51) applied to dense correspondence.
For practical action recognition, torchvision provides pretrained models that you can use right away:
import torch
from torchvision.models.video import (
r3d_18, R3D_18_Weights
)
# Load pretrained R3D (3D ResNet) for Kinetics-400
weights = R3D_18_Weights.DEFAULT
model = r3d_18(weights=weights)
model.eval()

# Preprocess video clip.
# The torchvision video preset takes frames-first input, (T, C, H, W),
# and itself permutes the result to (C, T, H, W). Passing a channels-first
# clip would make it treat the 16 frames as channels during normalization.
preprocess = weights.transforms()
# Dummy clip: T=16 RGB frames with values in [0, 1]
clip = torch.rand(16, 3, 112, 112)
clip = preprocess(clip).unsqueeze(0)  # (1, C, T, H, W)

with torch.no_grad():
    predictions = model(clip)

# Scores printed below are the raw model outputs (no softmax applied).
top5 = predictions.topk(5)
categories = weights.meta["categories"]
for score, idx in zip(top5.values[0],
                      top5.indices[0]):
    print(f"{categories[idx]}: {score:.2f}")
Kinetics-400 is the standard benchmark for video understanding: 400 action classes, 300K+ video clips sourced from YouTube. Classes range from "abseiling" and "air drumming" through "playing guitar", "riding a bike", and "washing dishes" to "yoga" and "zumba". Models trained on Kinetics transfer well to other video understanding tasks, similar to how ImageNet-pretrained CNNs (episode #46) transfer to other image tasks. There's also Kinetics-600 and Kinetics-700 with more classes, and domain-specific benchmarks like ActivityNet (untrimmed long videos) and AVA (spatio-temporal action detection -- labeling WHAT each person is doing WHERE in each frame).
The R3D_18 model above is a 3D ResNet-18 -- it takes the standard ResNet architecture (episode #46) and replaces all 2D convolutions with 3D convolutions, all 2D BatchNorm with 3D BatchNorm, and all 2D pooling with 3D pooling. The residual connections work identically. It's actually a beautiful example of how a proven 2D architecture can be systematically lifted into the temporal dimension.
Kinetics assumes trimmed clips -- each clip contains exactly one action, neatly cropped. Real-world video is untrimmed -- a 10-minute security camera feed contains 3 seconds of someone running, 20 seconds of walking, and 9 minutes of nothing happening. Temporal action detection finds when actions start and end within a long video:
import torch
import torch.nn as nn
class TemporalProposalNet(nn.Module):
    """Generate action proposals in untrimmed video.

    Simplified version of the BSN/BMN approach: a shared stack of 1D
    temporal convolutions feeds three per-timestep heads (start boundary,
    end boundary, action class). ``seq_len`` is accepted but not used by
    this implementation.
    """

    def __init__(self, feature_dim=2048,
                 num_classes=20, seq_len=100):
        super().__init__()
        # Shared trunk over pre-extracted features (one vector per
        # snippet); kernel 3 with padding 1 keeps the sequence length.
        trunk_layers = [
            nn.Conv1d(feature_dim, 512, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Conv1d(512, 256, kernel_size=3, padding=1),
            nn.ReLU(),
        ]
        self.temporal = nn.Sequential(*trunk_layers)
        # Boundary heads: one logit per timestep each.
        self.start_head = nn.Conv1d(256, 1, 1)
        self.end_head = nn.Conv1d(256, 1, 1)
        # Class head: num_classes scores per timestep.
        self.cls_head = nn.Conv1d(256, num_classes, 1)

    def forward(self, features):
        """Run the trunk and heads on (batch, feature_dim, T) features.

        Returns:
            ``(starts, ends, classes)``: sigmoid start and end
            probabilities of shape (batch, 1, T), and raw class scores of
            shape (batch, num_classes, T).
        """
        hidden = self.temporal(features)
        start_prob = self.start_head(hidden).sigmoid()
        end_prob = self.end_head(hidden).sigmoid()
        class_scores = self.cls_head(hidden)
        return start_prob, end_prob, class_scores
model = TemporalProposalNet()
# 100 time steps, each with a 2048-dim feature
features = torch.randn(1, 2048, 100)
starts, ends, classes = model(features)
# (1, 1, 100) -- per-timestep start probability
print(f"Start probabilities: {starts.shape}")
# (1, 1, 100) -- per-timestep end probability
print(f"End probabilities: {ends.shape}")
# (1, 20, 100) -- per-timestep class scores
print(f"Class scores: {classes.shape}")
The two-stage pattern here should feel familiar from object detection (episode #78): first generate proposals (temporal segments instead of spatial boxes), then classify each proposal. The start and end heads are analogous to the region proposal network in Faster R-CNN, and the classification head assigns an action label to each proposed segment. The temporal convolutions play the role that spatial convolutions play in image models -- capturing local patterns, but along the time axis instead of height/width.
Processing video at scale requires engineering beyond just the model architecture. Here are the practical strategies that make real-world video systems work:
Temporal stride: don't process every frame. For a 30fps video, sampling every 4th frame (effective 7.5fps) usually captures all meaningful motion. Most human actions unfold over hundreds of milliseconds to seconds, NOT individual 33ms frames. A person throwing a ball takes about 0.5 seconds -- that's 15 frames at 30fps, and you can capture the essential motion with just 4 of those frames.
Keyframe selection: instead of uniform sampling, detect scene changes or motion spikes and sample around them. A 10-minute security camera video might have 3 interesting moments -- uniform sampling wastes compute on the 9+ minutes of nothing.
Sliding window streaming: for real-time applications, you can't wait for the full clip. Process frames as they arrive, maintaining a sliding window buffer:
class StreamingClassifier:
    """Sliding window for streaming video classification.

    Keeps the most recent ``window_size`` frames and runs the model at
    most once every ``stride`` frames once the window is full.
    """

    def __init__(self, model, window_size=16,
                 stride=4):
        self.model = model
        self.window_size = window_size
        self.stride = stride
        self.buffer = []
        self.frames_since_pred = 0

    def process_frame(self, frame_tensor):
        """Add a frame and optionally return a prediction.

        frame_tensor: (C, H, W)
        Returns the model output for the current window, or None when
        the window is not full yet or fewer than ``stride`` frames have
        arrived since the last prediction.
        """
        self.buffer.append(frame_tensor)
        if len(self.buffer) > self.window_size:
            # Drop the oldest frame to keep the window bounded.
            del self.buffer[0]
        self.frames_since_pred += 1

        window_full = len(self.buffer) == self.window_size
        if not window_full or self.frames_since_pred < self.stride:
            return None

        # (T, C, H, W) -> (C, T, H, W) -> (1, C, T, H, W)
        stacked = torch.stack(self.buffer)
        clip = stacked.permute(1, 0, 2, 3).unsqueeze(0)
        self.frames_since_pred = 0
        with torch.no_grad():
            return self.model(clip)

    def reset(self):
        """Forget all buffered frames and restart the stride counter."""
        self.buffer = []
        self.frames_since_pred = 0
# Usage pattern (pseudo-code):
# classifier = StreamingClassifier(model)
# for frame in video_stream:
# pred = classifier.process_frame(frame)
# if pred is not None:
# action = categories[pred.argmax()]
# print(f"Detected: {action}")
The stride parameter controls the trade-off between latency and compute. With stride=1, you get a new prediction for every incoming frame (maximum responsiveness, maximum compute cost). With stride=4, you get a prediction every 4th frame (lower compute, but the system reacts to changes 4 frames later). For a 30fps video with stride=4, you're still getting predictions at 7.5Hz, which is plenty for most real-time applications.
Multi-scale temporal analysis: some actions are fast (a punch, a clap) while others are slow (getting up from a chair, parking a car). A single window size can't capture both well. Production systems often run multiple classifiers at different temporal scales -- a short window for fast actions and a long window for slow ones -- and combine their outputs.
Action recognition is the "ImageNet of video" -- the default benchmark task. But video understanding encompasses much more:
Video captioning: generate a natural language description of what happens in a video. This combines the video encoder (3D CNN or video transformer) with a language decoder (similar to the seq2seq models from episode #50 or the GPT-style decoder from episode #58). The visual features from the video model serve as the "encoder" representation, and the text decoder generates words conditioned on those features.
Video question answering (VideoQA): given a video and a natural language question ("What color is the car that turns left?"), produce an answer. This is the video equivalent of visual question answering and requires both spatial understanding (identifying objects) and temporal reasoning (understanding events and their sequence).
Moment retrieval: given a text query ("the person picks up the red cup"), find the exact temporal segment in the video where that action occurs. This is essentially temporal action detection but conditioned on free-form text instead of a fixed set of action classes.
All of these tasks share the same fundamental challenge: extracting meaningful temporal representations from video data. The architectures we covered today -- 3D CNNs, factorized convolutions, video transformers -- form the backbone of all these downstream applications.
We've now covered the full spectrum of visual understanding tasks: from static images (classification, detection, segmentation, pose, OCR) to dynamic video (action recognition, temporal detection, motion analysis). The vision section of this series is approaching its completion, and there are some fascinating topics left to explore in the generative direction -- how neural networks can create visual content, not just understand it. Image generation has exploded in recent years, and the underlying techniques connect deeply to everything we've built so far.
Exercise 1: Build a 3D convolution parameter analyzer. Create a class Conv3DAnalyzer that: (a) takes a list of 3D convolution specifications as tuples of (in_channels, out_channels, kernel_t, kernel_h, kernel_w) and computes the total parameter count for each layer (including bias), (b) implements factorized_equivalent(spec) that computes the parameter count if each 3D convolution were replaced by a (2+1)D factored pair (spatial 1xHxW followed by temporal Tx1x1), using the output channels as the intermediate channel count, (c) implements flops_estimate(spec, input_t, input_h, input_w) that estimates the number of multiply-accumulate operations for each layer assuming stride=1 and "same" padding (output spatial dimensions equal input dimensions), (d) test with the three layers from the Simple3DCNN model in this episode: (3, 64, 3, 7, 7), (64, 128, 3, 3, 3), (128, 256, 3, 3, 3). Print a comparison table showing full 3D params vs factored params, the savings ratio, and estimated FLOPs for each layer with input dimensions (16, 224, 224), (8, 56, 56), and (4, 28, 28) respectively.
Exercise 2: Implement a temporal action proposal generator. Create a class ActionProposalGenerator that: (a) takes a 1D signal of shape (T,) representing per-frame "actionness" scores (0-1, where high values indicate action is happening), (b) implements generate_proposals(scores, threshold=0.5, min_duration=3) that finds contiguous segments where scores exceed the threshold, merges segments separated by fewer than 2 frames, and filters out segments shorter than min_duration, (c) for each proposal, computes start frame, end frame, duration, mean score within the segment, and peak score, (d) implements nms_temporal(proposals, overlap_threshold=0.3) that performs temporal non-maximum suppression -- if two proposals overlap by more than overlap_threshold fraction of the shorter proposal's duration, keep only the one with the higher mean score, (e) generate a synthetic actionness signal of 200 frames containing 3 clear action peaks (frames 20-40, 80-110, 150-170) with Gaussian noise added, and 2 brief noise spikes (frames 55-57, 130-132). Run the full pipeline and verify that the 3 real actions are detected and the noise spikes are filtered by min_duration. Print the final proposals with their frame ranges and scores.
Exercise 3: Build a video clip feature comparison tool. Create a class VideoFeatureAnalyzer that: (a) simulates video clip features by generating synthetic feature tensors of shape (C, T, H, W) for "similar" clips (same action, small random perturbation) and "different" clips (different random features), (b) implements temporal_pooling(features, method) supporting three methods: "average" (mean across T), "max" (max across T), and "attention" (learned weighted sum -- generate random attention weights, softmax them, then compute weighted average across T), each producing a (C, H, W) tensor, (c) implements clip_similarity(feat1, feat2) that computes cosine similarity between two clip-level features (after flattening spatial dimensions), (d) generates 5 "anchor" clips, 5 "similar" clips (anchor + small noise), and 5 "different" clips (random). For each pooling method, compute similarity between all anchor-similar pairs and all anchor-different pairs, then print a table showing mean similar-pair similarity vs mean different-pair similarity for each method. Verify that all three methods produce higher similarity for similar pairs than different pairs, and discuss which method shows the largest gap.