I have a rather large collection of music that I've built up over the years. Properly organizing it and finding duplicates is time consuming. Over the weekend, I spent some time working on scripts to find duplicates. My first iteration is below. It's basically just an implementation of fdupes (apt-get install fdupes) that ignores the ID3 header in MP3 files. It uses Mutagen to handle the ID3 parsing.

import hashlib
import os
import sys
import mutagen
from mutagen.id3 import ID3
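# Hash only the audio data: seek past the ID3 tag at the start of the file
# and MD5 everything after it.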
def get_mp3_digest(path):
    id3_size = ID3(path).size
    fp = open(path, 'rb')  # Binary mode so we digest the raw bytes.
    fp.seek(id3_size)  # Ignore the ID3 header.
    digest = hashlib.md5(fp.read()).hexdigest()
    fp.close()
    return digest
mp3s = {}
top = sys.argv[1]
for root, dirs, files in os.walk(top):
for f in files:
if f.endswith('.mp3'):
path = os.path.join(root, f)
try:
digest = get_mp3_digest(path)
except mutagen.id3.error, e:
print >>sys.stderr, 'Error generating digest for %r.\n%r' % (path, e)
else:
mp3s.setdefault(digest, []).append(path)
print >>sys.stderr, root
for digest, paths in mp3s.items():
if len(paths) > 1:
for path in paths:
print path
After that, I came across locality sensitive hashing (LSH) and decided it would be a nice way to speed up duplicate detection for non-exact duplicates. Here's my explanation of LSH in a nutshell: pick some hash function that will produce collisions for similar values (e.g. by concatenating a sampling of data from the value). For example, if your value is a list of four digits, pick the first two and concatenate them. That hash would have lots of collisions for similar values (any that share the same first two digits). Now, extend that hash function to include some randomness in the output: instead of picking the first two digits in the list, pick a random two. If you hash the value many times, it creates a distribution of hash keys drawn from the entire value, so similar values will often collide with each other. Once you've hashed all your values in this way, you can simply count up the collisions to find duplicates.

My second script is below, but here's a basic explanation of how it works:
- Convert the path into all lower case, strip all non-alpha characters, and concatenate it into one long string.
- Pick a random sample of m letters from the resulting string and concatenate them into a hash key for the path.
- Repeat n times per path.
- For each path, and for each hash key generated for that path, count up the number of collisions with other paths in the hash table.
- For any paths which collide more than t times, record the path as a duplicate.
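To make the nutshell explanation above concrete, here's a tiny, standalone sketch of the same idea applied to lists of four digits rather than paths (the values, names, and parameters here are just for illustration, not part of the real script):

import random

def toy_key(value):
    # Pick a random two digits (keeping their order) and concatenate them.
    i, j = sorted(random.sample(range(len(value)), 2))
    return '%d%d' % (value[i], value[j])

values = [(1, 2, 3, 4), (1, 2, 3, 5), (9, 8, 7, 6)]
buckets = {}  # LSH key -> values that produced it.
keys = {}     # value -> the LSH keys generated for it.
for value in values:
    for _ in range(10):  # Hash each value several times.
        key = toy_key(value)
        buckets.setdefault(key, []).append(value)
        keys.setdefault(value, []).append(key)

# Count bucket collisions against the first value.
collisions = {}
for key in keys[values[0]]:
    for other in buckets[key]:
        if other != values[0]:
            collisions[other] = collisions.get(other, 0) + 1
print(collisions)

(1, 2, 3, 5) shares three of its six possible two-digit keys with (1, 2, 3, 4), so it collides with it far more often than (9, 8, 7, 6) does. The real script does the same thing, except the values are strings of letters pulled from the file paths. Here's the full script: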
import logging
import os
import random
import re
import sys
import mutagen
from mutagen.mp3 import MP3
NUM_CHARS_TO_PICK = 50 # Number of chars to pick from each path (sample size).
NUM_BUCKETS_PER_PATH = 50 # Number of samples to pick from each path.
MATCH_THRESHOLD = 5 # Number of collisions required to indicate a duplicate.
PERCENT_DIFFERENCE_THRESHOLD = 0.025 # Threshold for matching length.
NONALPHA_RE = re.compile(r'[^a-z]')
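# Two files count as the "same length" if their track lengths differ by less
# than PERCENT_DIFFERENCE_THRESHOLD, relative to the mean of the two lengths.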
def fuzzy_compare_mp3_length(path, other_path):
try:
audio = MP3(path)
other_audio = MP3(other_path)
except mutagen.mp3.error, e:
logging.error('Error comparing quality of %r and %r.\n%r' %
(path, other_path, e))
return False
pd = abs(audio.info.length - other_audio.info.length)
pd /= (audio.info.length + other_audio.info.length) / 2.0
return pd < PERCENT_DIFFERENCE_THRESHOLD
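# cmp-style comparator: positive when path has a higher bitrate than
# other_path, so sorting with reverse=True puts the highest-bitrate copy first.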
def compare_mp3_quality(path, other_path):
try:
audio = MP3(path)
other_audio = MP3(other_path)
except mutagen.mp3.error, e:
logging.error('Error comparing quality of %r and %r.\n%r' %
(path, other_path, e))
return 0
return audio.info.bitrate - other_audio.info.bitrate
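# The LSH itself: sample NUM_CHARS_TO_PICK random characters from s, keep them
# in their original order, and concatenate them into a bucket key.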
def hash(s):
indexes = random.sample(xrange(len(s)), NUM_CHARS_TO_PICK)
indexes.sort()
return ''.join(s[i] for i in indexes)
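# buckets maps each LSH key to the paths hashed to it; paths maps each path
# to the keys generated for it.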
buckets = {}
paths = {}
# First we fill buckets using a LSH.
for top in sys.argv[1:]:
for root, dirs, files in os.walk(top):
for f in files:
path = os.path.join(root, f)
name, ext = os.path.splitext(path[len(top):])
if ext != '.mp3':
continue
letters = NONALPHA_RE.sub('', name.lower())
# We need to pad any strings that are too short.
letters += 'x' * (NUM_CHARS_TO_PICK - len(letters))
for n in range(NUM_BUCKETS_PER_PATH):
bucket_key = hash(letters)
buckets.setdefault(bucket_key, []).append(path)
paths.setdefault(path, []).append(bucket_key)
print >>sys.stderr, root
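# Then, for each path, count how often it shares a bucket with other paths.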
checked = []
for path, path_bucket_keys in paths.iteritems():
  # Skip already checked paths. We assume that duplicate detection is
  # transitive: if dupe(A, B) and dupe(B, C), then dupe(A, C).
if path in checked:
continue
# Count up all the bucket collisions by path. This count includes our
# current path (it will exist once in all its own buckets)
collisions = {}
for bucket_key in path_bucket_keys:
for other_path in buckets[bucket_key]:
collisions[other_path] = collisions.get(other_path, 0) + 1
# All paths that collided more times than the threshold are duplicates.
dupes = set()
for other_path, count in collisions.iteritems():
if count > MATCH_THRESHOLD:
dupes.add(other_path)
# If we have dupes based on path similarity, sort them by quality and remove
# any dupes with significantly different lengths.
if len(dupes) > 1:
checked.extend(dupes)
dupes = sorted(dupes, cmp=compare_mp3_quality, reverse=True)
for dupe in dupes[1:]:
if not fuzzy_compare_mp3_length(dupes[0], dupe):
dupes.remove(dupe)
# If we still have dupes, output them to the rm script.
if len(dupes) > 1:
print '# rm %r' % dupes[0]
for dupe in dupes[1:]:
print 'rm %r' % dupe

It was very snappy and worked quite well for me. There are several natural extensions of this that include looking at the ID3 tags. It's possible I'll get around to that as well :)