dotfiles

My personal shell configs and stuff
git clone git://git.alex.balgavy.eu/dotfiles.git
Log | Files | Refs | Submodules | README | LICENSE

commit 8dff29304b7ee923d8b4c6c810d5812a7be027f9
parent fbc653c426bbcd9e7447023c417182b841470066
Author: Alex Balgavy <a.balgavy@gmail.com>
Date:   Sun, 29 Sep 2019 14:07:07 -0400

adb tools for syncing with phone


Former-commit-id: 8bbc0cb492dd2bd7b86a68d16c46fdcf0b1009cc
Diffstat:
Abin/adb-channel | 22++++++++++++++++++++++
Abin/adb-sync | 883+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
2 files changed, 905 insertions(+), 0 deletions(-)

diff --git a/bin/adb-channel b/bin/adb-channel @@ -0,0 +1,22 @@ +#!/bin/sh + +set -e + +t=`mktemp -d -t adb-channel.XXXXXX` + +remote=${1} +activity=${2} +delay=${3} + +atexit() { + [ -z "${activity}" ] || adb shell am force-stop ${activity%%/*} + adb forward --remove localfilesystem:"${t}/sock" + rm -rf "${t}" +} +trap atexit EXIT +trap 'exit 0' HUP INT ALRM TERM + +[ -z "${activity}" ] || adb shell -n am start -W ${activity} +[ -z "${delay}" ] || sleep "${delay}" +adb forward localfilesystem:"${t}/sock" "${remote}" +socat stdio unix:"${t}/sock" diff --git a/bin/adb-sync b/bin/adb-sync @@ -0,0 +1,883 @@ +#!/usr/bin/env python3 + +# Copyright 2014 Google Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Sync files from/to an Android device.""" + +from __future__ import unicode_literals +import argparse +import locale +import logging +import os +import re +import stat +import subprocess +import time +from types import TracebackType +from typing import Callable, cast, Dict, List, IO, Iterable, Optional, Tuple, Type + + +class OSLike(object): + + def listdir(self, path: bytes) -> Iterable[bytes]: # os's name, so pylint: disable=g-bad-name + raise NotImplementedError('Abstract') + + def lstat(self, path: bytes) -> os.stat_result: # os's name, so pylint: disable=g-bad-name + raise NotImplementedError('Abstract') + + def stat(self, path: bytes) -> os.stat_result: # os's name, so pylint: disable=g-bad-name + raise NotImplementedError('Abstract') + + def unlink(self, path: bytes) -> None: # os's name, so pylint: disable=g-bad-name + raise NotImplementedError('Abstract') + + def rmdir(self, path: bytes) -> None: # os's name, so pylint: disable=g-bad-name + raise NotImplementedError('Abstract') + + def makedirs(self, path: bytes) -> None: # os's name, so pylint: disable=g-bad-name + raise NotImplementedError('Abstract') + + def utime(self, path: bytes, times: Tuple[float, float]) -> None: # os's name, so pylint: disable=g-bad-name + raise NotImplementedError('Abstract') + + +class GlobLike(object): + + def glob(self, path: bytes) -> Iterable[bytes]: # glob's name, so pylint: disable=g-bad-name + raise NotImplementedError('Abstract') + + +class Stdout(object): + + def __init__(self, args: List[bytes]) -> None: + """Closes the process's stdout when done. + + Usage: + with Stdout(...) as stdout: + DoSomething(stdout) + + Args: + args: Which program to run. + + Returns: + An object for use by 'with'. + """ + self.popen = subprocess.Popen(args, stdout=subprocess.PIPE) + + def __enter__(self) -> IO: + return self.popen.stdout + + def __exit__(self, exc_type: Optional[Type[BaseException]], + exc_val: Optional[Exception], + exc_tb: Optional[TracebackType]) -> bool: + self.popen.stdout.close() + if self.popen.wait() != 0: + raise OSError('Subprocess exited with nonzero status.') + return False + + +class AdbFileSystem(GlobLike, OSLike): + """Mimics os's file interface but uses the adb utility.""" + + def __init__(self, adb: List[bytes]) -> None: + self.stat_cache = {} # type: Dict[bytes, os.stat_result] + self.adb = adb + + # Regarding parsing stat results, we only care for the following fields: + # - st_size + # - st_mtime + # - st_mode (but only about S_ISDIR and S_ISREG properties) + # Therefore, we only capture parts of 'ls -l' output that we actually use. + # The other fields will be filled with dummy values. + LS_TO_STAT_RE = re.compile( + br"""^ + (?: + (?P<S_IFREG> -) | + (?P<S_IFBLK> b) | + (?P<S_IFCHR> c) | + (?P<S_IFDIR> d) | + (?P<S_IFLNK> l) | + (?P<S_IFIFO> p) | + (?P<S_IFSOCK> s)) + [-r][-w][-xsS] + [-r][-w][-xsS] + [-r][-w][-xtT] # Mode string. + [ ]+ + (?: + [0-9]+ # number of hard links + [ ]+ + )? + [^ ]+ # User name/ID. + [ ]+ + [^ ]+ # Group name/ID. + [ ]+ + (?(S_IFBLK) [^ ]+[ ]+[^ ]+[ ]+) # Device numbers. + (?(S_IFCHR) [^ ]+[ ]+[^ ]+[ ]+) # Device numbers. + (?(S_IFDIR) [0-9]+ [ ]+)? # directory Size. + (?(S_IFREG) + (?P<st_size> [0-9]+) # Size. + [ ]+) + (?P<st_mtime> + [0-9]{4}-[0-9]{2}-[0-9]{2} # Date. + [ ] + [0-9]{2}:[0-9]{2}) # Time. + [ ] + # Don't capture filename for symlinks (ambiguous). + (?(S_IFLNK) .* | (?P<filename> .*)) + $""", re.DOTALL | re.VERBOSE) + + def LsToStat(self, line: bytes) -> Tuple[os.stat_result, bytes]: + """Convert a line from 'ls -l' output to a stat result. + + Args: + line: Output line of 'ls -l' on Android. + + Returns: + os.stat_result for the line. + + Raises: + OSError: if the given string is not a 'ls -l' output line (but maybe an + error message instead). + """ + + match = self.LS_TO_STAT_RE.match(line) + if match is None: + logging.error('Could not parse %r.', line) + raise OSError('Unparseable ls -al result.') + groups = match.groupdict() + + # Get the values we're interested in. + st_mode = ( # 0755 + stat.S_IRWXU | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH + | stat.S_IXOTH) + if groups['S_IFREG']: + st_mode |= stat.S_IFREG + if groups['S_IFBLK']: + st_mode |= stat.S_IFBLK + if groups['S_IFCHR']: + st_mode |= stat.S_IFCHR + if groups['S_IFDIR']: + st_mode |= stat.S_IFDIR + if groups['S_IFIFO']: + st_mode |= stat.S_IFIFO + if groups['S_IFLNK']: + st_mode |= stat.S_IFLNK + if groups['S_IFSOCK']: + st_mode |= stat.S_IFSOCK + st_size = None if groups['st_size'] is None else int(groups['st_size']) + st_mtime = int( + time.mktime( + time.strptime( + match.group('st_mtime').decode('ascii'), '%Y-%m-%d %H:%M'))) + + # Fill the rest with dummy values. + st_ino = 1 + st_rdev = 0 + st_nlink = 1 + st_uid = -2 # Nobody. + st_gid = -2 # Nobody. + st_atime = st_ctime = st_mtime + + stbuf = os.stat_result((st_mode, st_ino, st_rdev, st_nlink, st_uid, st_gid, + st_size, st_atime, st_mtime, st_ctime)) + filename = groups['filename'] + return stbuf, filename + + def QuoteArgument(self, arg: bytes) -> bytes: + # Quotes an argument for use by adb shell. + # Usually, arguments in 'adb shell' use are put in double quotes by adb, + # but not in any way escaped. + arg = arg.replace(b'\\', b'\\\\') + arg = arg.replace(b'"', b'\\"') + arg = arg.replace(b'$', b'\\$') + arg = arg.replace(b'`', b'\\`') + arg = b'"' + arg + b'"' + return arg + + def IsWorking(self) -> bool: + """Tests the adb connection.""" + # This string should contain all possible evil, but no percent signs. + # Note this code uses 'date' and not 'echo', as date just calls strftime + # while echo does its own backslash escape handling additionally to the + # shell's. Too bad printf "%s\n" is not available. + test_strings = [ + b'(', b'(; #`ls`$PATH\'"(\\\\\\\\){};!\xc0\xaf\xff\xc2\xbf' + ] + for test_string in test_strings: + good = False + with Stdout(self.adb + + [b'shell', + b'date +%s' % (self.QuoteArgument(test_string),)]) as stdout: + for line in stdout: + line = line.rstrip(b'\r\n') + if line == test_string: + good = True + if not good: + return False + return True + + def listdir(self, path: bytes) -> Iterable[bytes]: # os's name, so pylint: disable=g-bad-name + """List the contents of a directory, caching them for later lstat calls.""" + with Stdout(self.adb + + [b'shell', + b'ls -al %s' % (self.QuoteArgument(path + b'/'),)]) as stdout: + for line in stdout: + if line.startswith(b'total '): + continue + line = line.rstrip(b'\r\n') + try: + statdata, filename = self.LsToStat(line) + except OSError: + continue + if filename is None: + logging.error('Could not parse %r.', line) + else: + self.stat_cache[path + b'/' + filename] = statdata + yield filename + + def lstat(self, path: bytes) -> os.stat_result: # os's name, so pylint: disable=g-bad-name + """Stat a file.""" + if path in self.stat_cache: + return self.stat_cache[path] + with Stdout( + self.adb + + [b'shell', b'ls -ald %s' % (self.QuoteArgument(path),)]) as stdout: + for line in stdout: + if line.startswith(b'total '): + continue + line = line.rstrip(b'\r\n') + statdata, _ = self.LsToStat(line) + self.stat_cache[path] = statdata + return statdata + raise OSError('No such file or directory') + + def stat(self, path: bytes) -> os.stat_result: # os's name, so pylint: disable=g-bad-name + """Stat a file.""" + if path in self.stat_cache and not stat.S_ISLNK( + self.stat_cache[path].st_mode): + return self.stat_cache[path] + with Stdout( + self.adb + + [b'shell', b'ls -aldL %s' % (self.QuoteArgument(path),)]) as stdout: + for line in stdout: + if line.startswith(b'total '): + continue + line = line.rstrip(b'\r\n') + statdata, _ = self.LsToStat(line) + self.stat_cache[path] = statdata + return statdata + raise OSError('No such file or directory') + + def unlink(self, path: bytes) -> None: # os's name, so pylint: disable=g-bad-name + """Delete a file.""" + if subprocess.call( + self.adb + [b'shell', b'rm %s' % (self.QuoteArgument(path),)]) != 0: + raise OSError('unlink failed') + + def rmdir(self, path: bytes) -> None: # os's name, so pylint: disable=g-bad-name + """Delete a directory.""" + if subprocess.call( + self.adb + + [b'shell', b'rmdir %s' % (self.QuoteArgument(path),)]) != 0: + raise OSError('rmdir failed') + + def makedirs(self, path: bytes) -> None: # os's name, so pylint: disable=g-bad-name + """Create a directory.""" + if subprocess.call( + self.adb + + [b'shell', b'mkdir -p %s' % (self.QuoteArgument(path),)]) != 0: + raise OSError('mkdir failed') + + def utime(self, path: bytes, times: Tuple[float, float]) -> None: + # TODO(rpolzer): Find out why this does not work (returns status 255). + """Set the time of a file to a specified unix time.""" + atime, mtime = times + timestr = time.strftime('%Y%m%d.%H%M%S', + time.localtime(mtime)).encode('ascii') + if subprocess.call( + self.adb + + [b'shell', + b'touch -mt %s %s' % (timestr, self.QuoteArgument(path))]) != 0: + raise OSError('touch failed') + timestr = time.strftime('%Y%m%d.%H%M%S', + time.localtime(atime)).encode('ascii') + if subprocess.call( + self.adb + + [b'shell', + b'touch -at %s %s' % (timestr, self.QuoteArgument(path))]) != 0: + raise OSError('touch failed') + + def glob(self, path: bytes) -> Iterable[bytes]: # glob's name, so pylint: disable=g-bad-name + with Stdout( + self.adb + + [b'shell', b'for p in %s; do echo "$p"; done' % (path,)]) as stdout: + for line in stdout: + yield line.rstrip(b'\r\n') + + def Push(self, src: bytes, dst: bytes) -> None: + """Push a file from the local file system to the Android device.""" + if subprocess.call(self.adb + [b'push', src, dst]) != 0: + raise OSError('push failed') + + def Pull(self, src: bytes, dst: bytes) -> None: + """Pull a file from the Android device to the local file system.""" + if subprocess.call(self.adb + [b'pull', src, dst]) != 0: + raise OSError('pull failed') + + +def BuildFileList(fs: OSLike, path: bytes, follow_links: bool, + prefix: bytes) -> Iterable[Tuple[bytes, os.stat_result]]: + """Builds a file list. + + Args: + fs: File system provider (can be os or AdbFileSystem()). + path: Initial path. + follow_links: Whether to follow symlinks while iterating. May recurse + endlessly. + prefix: Path prefix for output file names. + + Yields: + File names from path (prefixed by prefix). + Directories are yielded before their contents. + """ + try: + if follow_links: + statresult = fs.stat(path) + else: + statresult = fs.lstat(path) + except OSError: + return + if stat.S_ISDIR(statresult.st_mode): + yield prefix, statresult + try: + files = fs.listdir(path) + except OSError: + return + for n in files: + if n == b'.' or n == b'..': + continue + for t in BuildFileList(fs, path + b'/' + n, follow_links, + prefix + b'/' + n): + yield t + elif stat.S_ISREG(statresult.st_mode): + yield prefix, statresult + elif stat.S_ISLNK(statresult.st_mode) and not follow_links: + yield prefix, statresult + else: + logging.info('Unsupported file: %r.', path) + + +def DiffLists(a: Iterable[Tuple[bytes, os.stat_result]], + b: Iterable[Tuple[bytes, os.stat_result]] + ) -> Tuple[List[Tuple[bytes, os.stat_result]], List[ + Tuple[bytes, os.stat_result, os + .stat_result]], List[Tuple[bytes, os.stat_result]]]: + """Compares two lists. + + Args: + a: the first list. + b: the second list. + + Returns: + a_only: the items from list a. + both: the items from both list, with the remaining tuple items combined. + b_only: the items from list b. + """ + a_only = [] # type: List[Tuple[bytes, os.stat_result]] + b_only = [] # type: List[Tuple[bytes, os.stat_result]] + both = [] # type: List[Tuple[bytes, os.stat_result, os.stat_result]] + + a_revlist = sorted(a) + a_revlist.reverse() + b_revlist = sorted(b) + b_revlist.reverse() + + while True: + if not a_revlist: + b_only.extend(reversed(b_revlist)) + break + if not b_revlist: + a_only.extend(reversed(a_revlist)) + break + a_item = a_revlist[len(a_revlist) - 1] + b_item = b_revlist[len(b_revlist) - 1] + if a_item[0] == b_item[0]: + both.append((a_item[0], a_item[1], b_item[1])) + a_revlist.pop() + b_revlist.pop() + elif a_item[0] < b_item[0]: + a_only.append(a_item) + a_revlist.pop() + elif a_item[0] > b_item[0]: + b_only.append(b_item) + b_revlist.pop() + else: + raise + + return a_only, both, b_only + + +class DeleteInterruptedFile(object): + + def __init__(self, dry_run: bool, fs: OSLike, name: bytes) -> None: + """Sets up interrupt protection. + + Usage: + with DeleteInterruptedFile(False, fs, name): + DoSomething() + + If DoSomething() should get interrupted, the file 'name' will be deleted. + The exception otherwise will be passed on. + + Args: + dry_run: If true, we don't actually delete. + fs: File system object. + name: File name to delete. + + Returns: + An object for use by 'with'. + """ + self.dry_run = dry_run + self.fs = fs + self.name = name + + def __enter__(self) -> None: + pass + + def __exit__(self, exc_type: Optional[Type[BaseException]], + exc_val: Optional[Exception], + exc_tb: Optional[TracebackType]) -> bool: + if exc_type is not None: + logging.info('Interrupted-%s-Delete: %r', + 'Pull' if self.fs == os else 'Push', self.name) + if not self.dry_run: + self.fs.unlink(self.name) + return False + + +class FileSyncer(object): + """File synchronizer.""" + + def __init__(self, adb: AdbFileSystem, local_path: bytes, remote_path: bytes, + local_to_remote: bool, remote_to_local: bool, + preserve_times: bool, delete_missing: bool, + allow_overwrite: bool, allow_replace: bool, copy_links: bool, + dry_run: bool) -> None: + self.local = local_path + self.remote = remote_path + self.adb = adb + self.local_to_remote = local_to_remote + self.remote_to_local = remote_to_local + self.preserve_times = preserve_times + self.delete_missing = delete_missing + self.allow_overwrite = allow_overwrite + self.allow_replace = allow_replace + self.copy_links = copy_links + self.dry_run = dry_run + self.num_bytes = 0 + self.start_time = time.time() + + # Attributes filled in later. + local_only = None # type: List[Tuple[bytes, os.stat_result]] + both = None # type: List[Tuple[bytes, os.stat_result, os.stat_result]] + remote_only = None # type: List[Tuple[bytes, os.stat_result]] + src_to_dst = None # type: Tuple[bool, bool] + dst_to_src = None # type: Tuple[bool, bool] + src_only = None # type: Tuple[List[Tuple[bytes, os.stat_result]], List[Tuple[bytes, os.stat_result]]] + dst_only = None # type: Tuple[List[Tuple[bytes, os.stat_result]], List[Tuple[bytes, os.stat_result]]] + src = None # type: Tuple[bytes, bytes] + dst = None # type: Tuple[bytes, bytes] + dst_fs = None # type: Tuple[OSLike, OSLike] + push = None # type: Tuple[str, str] + copy = None # type: Tuple[Callable[[bytes, bytes], None], Callable[[bytes, bytes], None]] + + def IsWorking(self) -> bool: + """Tests the adb connection.""" + return self.adb.IsWorking() + + def ScanAndDiff(self) -> None: + """Scans the local and remote locations and identifies differences.""" + logging.info('Scanning and diffing...') + locallist = BuildFileList( + cast(OSLike, os), self.local, self.copy_links, b'') + remotelist = BuildFileList(self.adb, self.remote, self.copy_links, b'') + self.local_only, self.both, self.remote_only = DiffLists( + locallist, remotelist) + if not self.local_only and not self.both and not self.remote_only: + logging.warning('No files seen. User error?') + self.src_to_dst = (self.local_to_remote, self.remote_to_local) + self.dst_to_src = (self.remote_to_local, self.local_to_remote) + self.src_only = (self.local_only, self.remote_only) + self.dst_only = (self.remote_only, self.local_only) + self.src = (self.local, self.remote) + self.dst = (self.remote, self.local) + self.dst_fs = (self.adb, cast(OSLike, os)) + self.push = ('Push', 'Pull') + self.copy = (self.adb.Push, self.adb.Pull) + + def PerformDeletions(self) -> None: + """Perform all deleting necessary for the file sync operation.""" + if not self.delete_missing: + return + for i in [0, 1]: + if self.src_to_dst[i] and not self.dst_to_src[i]: + if not self.src_only[i] and not self.both: + logging.error('Cowardly refusing to delete everything.') + else: + for name, s in reversed(self.dst_only[i]): + dst_name = self.dst[i] + name + logging.info('%s-Delete: %r', self.push[i], dst_name) + if stat.S_ISDIR(s.st_mode): + if not self.dry_run: + self.dst_fs[i].rmdir(dst_name) + else: + if not self.dry_run: + self.dst_fs[i].unlink(dst_name) + del self.dst_only[i][:] + + def PerformOverwrites(self) -> None: + """Delete files/directories that are in the way for overwriting.""" + src_only_prepend = ( + [], [] + ) # type: Tuple[List[Tuple[bytes, os.stat_result]], List[Tuple[bytes, os.stat_result]]] + for name, localstat, remotestat in self.both: + if stat.S_ISDIR(localstat.st_mode) and stat.S_ISDIR(remotestat.st_mode): + # A dir is a dir is a dir. + continue + elif stat.S_ISDIR(localstat.st_mode) or stat.S_ISDIR(remotestat.st_mode): + # Dir vs file? Nothing to do here yet. + pass + else: + # File vs file? Compare sizes. + if localstat.st_size == remotestat.st_size: + continue + l2r = self.local_to_remote + r2l = self.remote_to_local + if l2r and r2l: + # Truncate times to full minutes, as Android's "ls" only outputs minute + # accuracy. + localminute = int(localstat.st_mtime / 60) + remoteminute = int(remotestat.st_mtime / 60) + if localminute > remoteminute: + r2l = False + elif localminute < remoteminute: + l2r = False + if l2r and r2l: + logging.warning('Unresolvable: %r', name) + continue + if l2r: + i = 0 # Local to remote operation. + src_stat = localstat + dst_stat = remotestat + else: + i = 1 # Remote to local operation. + src_stat = remotestat + dst_stat = localstat + dst_name = self.dst[i] + name + logging.info('%s-Delete-Conflicting: %r', self.push[i], dst_name) + if stat.S_ISDIR(localstat.st_mode) or stat.S_ISDIR(remotestat.st_mode): + if not self.allow_replace: + logging.info('Would have to replace to do this. ' + 'Use --force to allow this.') + continue + if not self.allow_overwrite: + logging.info('Would have to overwrite to do this, ' + 'which --no-clobber forbids.') + continue + if stat.S_ISDIR(dst_stat.st_mode): + kill_files = [ + x for x in self.dst_only[i] if x[0][:len(name) + 1] == name + b'/' + ] + self.dst_only[i][:] = [ + x for x in self.dst_only[i] if x[0][:len(name) + 1] != name + b'/' + ] + for l, s in reversed(kill_files): + if stat.S_ISDIR(s.st_mode): + if not self.dry_run: + self.dst_fs[i].rmdir(self.dst[i] + l) + else: + if not self.dry_run: + self.dst_fs[i].unlink(self.dst[i] + l) + if not self.dry_run: + self.dst_fs[i].rmdir(dst_name) + elif stat.S_ISDIR(src_stat.st_mode): + if not self.dry_run: + self.dst_fs[i].unlink(dst_name) + else: + if not self.dry_run: + self.dst_fs[i].unlink(dst_name) + src_only_prepend[i].append((name, src_stat)) + for i in [0, 1]: + self.src_only[i][:0] = src_only_prepend[i] + + def PerformCopies(self) -> None: + """Perform all copying necessary for the file sync operation.""" + for i in [0, 1]: + if self.src_to_dst[i]: + for name, s in self.src_only[i]: + src_name = self.src[i] + name + dst_name = self.dst[i] + name + logging.info('%s: %r', self.push[i], dst_name) + if stat.S_ISDIR(s.st_mode): + if not self.dry_run: + self.dst_fs[i].makedirs(dst_name) + else: + with DeleteInterruptedFile(self.dry_run, self.dst_fs[i], dst_name): + if not self.dry_run: + self.copy[i](src_name, dst_name) + if stat.S_ISREG(s.st_mode): + self.num_bytes += s.st_size + if not self.dry_run: + if self.preserve_times: + logging.info('%s-Times: accessed %s, modified %s', self.push[i], + time.asctime(time.localtime(s.st_atime)), + time.asctime(time.localtime(s.st_mtime))) + self.dst_fs[i].utime(dst_name, (s.st_atime, s.st_mtime)) + + def TimeReport(self) -> None: + """Report time and amount of data transferred.""" + if self.dry_run: + logging.info('Total: %d bytes', self.num_bytes) + else: + end_time = time.time() + dt = end_time - self.start_time + rate = self.num_bytes / 1024.0 / dt + logging.info('Total: %d KB/s (%d bytes in %.3fs)', rate, self.num_bytes, + dt) + + +def ExpandWildcards(globber: GlobLike, path: bytes) -> Iterable[bytes]: + if path.find(b'?') == -1 and path.find(b'*') == -1 and path.find(b'[') == -1: + return [path] + return globber.glob(path) + + +def FixPath(src: bytes, dst: bytes) -> Tuple[bytes, bytes]: + # rsync-like path munging to make remote specifications shorter. + append = b'' + pos = src.rfind(b'/') + if pos >= 0: + if src.endswith(b'/'): + # Final slash: copy to the destination "as is". + pass + else: + # No final slash: destination name == source name. + append = src[pos:] + else: + # No slash at all - use same name at destination. + append = b'/' + src + # Append the destination file name if any. + # BUT: do not append "." or ".." components! + if append != b'/.' and append != b'/..': + dst += append + return (src, dst) + + +def main() -> None: + logging.basicConfig(level=logging.INFO) + + parser = argparse.ArgumentParser( + description='Synchronize a directory between an Android device and the ' + 'local file system') + parser.add_argument( + 'source', + metavar='SRC', + type=str, + nargs='+', + help='The directory to read files/directories from. ' + 'This must be a local path if -R is not specified, ' + 'and an Android path if -R is specified. If SRC does ' + 'not end with a final slash, its last path component ' + 'is appended to DST (like rsync does).') + parser.add_argument( + 'destination', + metavar='DST', + type=str, + help='The directory to write files/directories to. ' + 'This must be an Android path if -R is not specified, ' + 'and a local path if -R is specified.') + parser.add_argument( + '-e', + '--adb', + metavar='COMMAND', + default='adb', + type=str, + help='Use the given adb binary and arguments.') + parser.add_argument( + '--device', + action='store_true', + help='Directs command to the only connected USB device; ' + 'returns an error if more than one USB device is present. ' + 'Corresponds to the "-d" option of adb.') + parser.add_argument( + '--emulator', + action='store_true', + help='Directs command to the only running emulator; ' + 'returns an error if more than one emulator is running. ' + 'Corresponds to the "-e" option of adb.') + parser.add_argument( + '-s', + '--serial', + metavar='DEVICE', + type=str, + help='Directs command to the device or emulator with ' + 'the given serial number or qualifier. Overrides ' + 'ANDROID_SERIAL environment variable. Use "adb devices" ' + 'to list all connected devices with their respective serial number. ' + 'Corresponds to the "-s" option of adb.') + parser.add_argument( + '-H', + '--host', + metavar='HOST', + type=str, + help='Name of adb server host (default: localhost). ' + 'Corresponds to the "-H" option of adb.') + parser.add_argument( + '-P', + '--port', + metavar='PORT', + type=str, + help='Port of adb server (default: 5037). ' + 'Corresponds to the "-P" option of adb.') + parser.add_argument( + '-R', + '--reverse', + action='store_true', + help='Reverse sync (pull, not push).') + parser.add_argument( + '-2', + '--two-way', + action='store_true', + help='Two-way sync (compare modification time; after ' + 'the sync, both sides will have all files in the ' + 'respective newest version. This relies on the clocks ' + 'of your system and the device to match.') + parser.add_argument( + '-t', + '--times', + action='store_true', + help='Preserve modification times when copying.') + parser.add_argument( + '-d', + '--delete', + action='store_true', + help='Delete files from DST that are not present on ' + 'SRC. Mutually exclusive with -2.') + parser.add_argument( + '-f', + '--force', + action='store_true', + help='Allow deleting files/directories when having to ' + 'replace a file by a directory or vice versa. This is ' + 'disabled by default to prevent large scale accidents.') + parser.add_argument( + '-n', + '--no-clobber', + action='store_true', + help='Do not ever overwrite any ' + 'existing files. Mutually exclusive with -f.') + parser.add_argument( + '-L', + '--copy-links', + action='store_true', + help='transform symlink into referent file/dir') + parser.add_argument( + '--dry-run', + action='store_true', + help='Do not do anything - just show what would be done.') + args = parser.parse_args() + + localpatterns = [os.fsencode(x) for x in args.source] + remotepath = os.fsencode(args.destination) + adb_args = os.fsencode(args.adb).split(b' ') + if args.device: + adb_args += [b'-d'] + if args.emulator: + adb_args += [b'-e'] + if args.serial: + adb_args += [b'-s', os.fsencode(args.serial)] + if args.host: + adb_args += [b'-H', os.fsencode(args.host)] + if args.port: + adb_args += [b'-P', os.fsencode(args.port)] + adb = AdbFileSystem(adb_args) + + # Expand wildcards, but only on the remote side. + localpaths = [] + remotepaths = [] + if args.reverse: + for pattern in localpatterns: + for src in ExpandWildcards(adb, pattern): + src, dst = FixPath(src, remotepath) + localpaths.append(src) + remotepaths.append(dst) + else: + for src in localpatterns: + src, dst = FixPath(src, remotepath) + localpaths.append(src) + remotepaths.append(dst) + + preserve_times = args.times + delete_missing = args.delete + allow_replace = args.force + allow_overwrite = not args.no_clobber + copy_links = args.copy_links + dry_run = args.dry_run + local_to_remote = True + remote_to_local = False + if args.two_way: + local_to_remote = True + remote_to_local = True + if args.reverse: + local_to_remote, remote_to_local = remote_to_local, local_to_remote + localpaths, remotepaths = remotepaths, localpaths + if allow_replace and not allow_overwrite: + logging.error('--no-clobber and --force are mutually exclusive.') + parser.print_help() + return + if delete_missing and local_to_remote and remote_to_local: + logging.error('--delete and --two-way are mutually exclusive.') + parser.print_help() + return + + # Two-way sync is only allowed with disjoint remote and local path sets. + if (remote_to_local and local_to_remote) or delete_missing: + if ((remote_to_local and len(localpaths) != len(set(localpaths))) or + (local_to_remote and len(remotepaths) != len(set(remotepaths)))): + logging.error( + '--two-way and --delete are only supported for disjoint sets of ' + 'source and destination paths (in other words, all SRC must ' + 'differ in basename).') + parser.print_help() + return + + for i in range(len(localpaths)): + logging.info('Sync: local %r, remote %r', localpaths[i], remotepaths[i]) + syncer = FileSyncer(adb, localpaths[i], remotepaths[i], local_to_remote, + remote_to_local, preserve_times, delete_missing, + allow_overwrite, allow_replace, copy_links, dry_run) + if not syncer.IsWorking(): + logging.error('Device not connected or not working.') + return + try: + syncer.ScanAndDiff() + syncer.PerformDeletions() + syncer.PerformOverwrites() + syncer.PerformCopies() + finally: + syncer.TimeReport() + + +if __name__ == '__main__': + main()