crawl function

Future<CrawlResult> crawl(
  1. List<Uri> seeds,
  2. Set<String> hostGlobs,
  3. bool shouldCheckExternal,
  4. UrlSkipper skipper,
  5. bool verbose,
  6. bool ansiTerm,
  7. Stream stopSignal,
  8. Stdout stdout,
)

Implementation

/// Crawls pages starting at [seeds] and returns all links found together
/// with every destination that was closed during the crawl.
///
/// * [seeds] are the starting URLs; each one is queued as a non-external
///   source destination and its authority is checked as a server first.
/// * [hostGlobs] are turned into [UriGlob]s; a discovered destination that
///   matches none of them is marked external.
/// * [shouldCheckExternal] decides whether external destinations are queued
///   for checking or closed right away.
/// * [skipper] marks links as skipped; skipped links are never queued.
/// * [verbose] enables detailed progress output.
/// * [ansiTerm] enables in-place counter updates and colored output.
/// * Any event on [stopSignal] (e.g. Ctrl-C) closes the worker pool and
///   ends the crawl early.
/// * [stdout] receives everything this function prints.
///
/// The returned future completes once no work is left and the pool is idle,
/// or once a stop was requested.
Future<CrawlResult> crawl(
    List<Uri> seeds,
    Set<String> hostGlobs,
    bool shouldCheckExternal,
    UrlSkipper skipper,
    bool verbose,
    bool ansiTerm,
    Stream<dynamic> stopSignal,
    Stdout stdout) async {
  // Redirect output to injected [stdout] for better testing.
  void print(Object message) => stdout.writeln(message);

  Cursor? cursor;
  TextPen? pen;
  if (ansiTerm) {
    Console.init();
    cursor = Cursor();
    pen = TextPen();
  }

  if (verbose) {
    print('Crawl will start on the following URLs: $seeds');
    print('Crawl will check pages only on URLs satisfying: $hostGlobs');
    print('Crawl will skip links that match patterns: $skipper');
  }

  final uriGlobs = hostGlobs.map((glob) => UriGlob(glob)).toList();

  // Maps from URLs (without fragment) to where their corresponding destination
  // lives.
  final bin = <String, Bin>{};

  // The queue of destinations that haven't been tried yet. Destinations in
  // the front of the queue take precedence.
  final open = Queue<Destination>.from(seeds
      .map((uri) => Destination(uri)
        ..isSeed = true
        ..isSource = true
        ..isExternal = false)
      .toSet());
  for (final destination in open) {
    bin[destination.url] = Bin.open;
  }

  // Queue for the external destinations.
  final openExternal = Queue<Destination>();

  // Destinations that have been handed to a worker and await a result.
  final inProgress = <Destination>{};

  // The set of destinations that have been tried.
  final closed = <Destination>{};

  // Servers we are connecting to.
  final servers = <String, ServerInfo>{};
  final unknownServers = Queue<String>();
  final serversInProgress = <String>{};
  for (final host in seeds.map((uri) => uri.authority).toSet()) {
    servers[host] = ServerInfo(host);
    unknownServers.add(host);
  }

  if (verbose) {
    print('Crawl will check the following servers (and their robots.txt) '
        'first: $unknownServers');
  }

  // Create the links Set.
  final links = <Link>{};

  // Use the default thread count as soon as anything non-local may be hit;
  // a crawl that only touches localhost gets its own setting.
  final touchesRemoteHosts = shouldCheckExternal ||
      seeds.any((seed) => seed.host != 'localhost' && seed.host != '127.0.0.1');
  final threads = touchesRemoteHosts ? defaultThreads : localhostOnlyThreads;
  if (verbose) print('Using $threads threads.');

  final pool = Pool(threads, hostGlobs);
  await pool.spawn();

  var count = 0;
  if (!verbose) {
    if (cursor != null) {
      cursor.write('Crawling: $count');
    } else {
      print('Crawling...');
    }
  }

  /// Bumps the progress counter and, when not verbose on an ANSI terminal,
  /// rewrites the number in place.
  void advanceCount() {
    if (!verbose && cursor != null) {
      cursor.moveLeft(count.toString().length);
      count += 1;
      cursor.write(count.toString());
    } else {
      count += 1;
    }
  }

  // TODO:
  // - --cache for creating a .linkcheck.cache file

  final allDone = Completer<void>();

  // Set once a stop signal has been received. From then on no new jobs are
  // dispatched and leftover queue contents are expected.
  var stopRequested = false;

  /// Completes [allDone] at most once. Both the stop handler and
  /// [sendNewJobs] can reach this point; completing a [Completer] twice
  /// throws a [StateError].
  void finish() {
    if (!allDone.isCompleted) allDone.complete();
  }

  // Respond to Ctrl-C
  late final StreamSubscription<void> stopSignalSubscription;
  stopSignalSubscription = stopSignal.listen((dynamic _) async {
    // Ignore repeated signals and signals that arrive after a normal finish.
    if (stopRequested || allDone.isCompleted) return;
    stopRequested = true;

    if (pen != null) {
      pen
          .text('\n')
          .red()
          .text('Ctrl-C')
          .normal()
          .text(' Terminating crawl.')
          .print();
    } else {
      print('\nSIGINT: Terminating crawl');
    }
    try {
      await pool.close();
    } finally {
      // Unblock the awaiting crawl even if closing the pool failed.
      finish();
      await stopSignalSubscription.cancel();
    }
  });

  /// Creates new jobs and sends them to the Pool of Workers, if able.
  void sendNewJobs() {
    // The pool is being (or has been) closed; do not hand it more work.
    if (stopRequested) return;

    while (unknownServers.isNotEmpty && pool.anyIdle) {
      final host = unknownServers.removeFirst();
      pool.checkServer(host);
      serversInProgress.add(host);
      if (verbose) {
        print('Checking robots.txt and availability of server: $host');
      }
    }

    bool serverIsKnown(Destination destination) =>
        servers.containsKey(destination.uri.authority);

    final availableDestinations =
        _zip(open.where(serverIsKnown), openExternal.where(serverIsKnown));

    // In order not to touch the underlying iterables, we keep track
    // of the destinations we want to remove.
    final destinationsToRemove = <Destination>[];

    for (final destination in availableDestinations) {
      if (pool.allBusy) break;

      destinationsToRemove.add(destination);

      final host = destination.uri.authority;
      final server = servers[host];
      if (server == null || server.hasNotConnected) {
        destination.didNotConnect = true;
        closed.add(destination);
        bin[destination.url] = Bin.closed;
        if (verbose) {
          print('Automatically failing $destination because server $host has '
              'failed before.');
        }
        continue;
      }

      final serverBouncer = server.bouncer;
      if (serverBouncer != null &&
          !serverBouncer.allows(destination.uri.path)) {
        destination.wasDeniedByRobotsTxt = true;
        closed.add(destination);
        bin[destination.url] = Bin.closed;
        if (verbose) {
          print('Skipping $destination because of robots.txt at $host.');
        }
        continue;
      }

      final delay = server.getThrottlingDuration();
      if (delay > ServerInfo.minimumDelay) {
        // Some other worker is already waiting with a checkPage request.
        // Let's try and see if we have more interesting options down the
        // iterable. Do not remove it.
        destinationsToRemove.remove(destination);
        continue;
      }

      final worker = pool.checkPage(destination, delay);
      server.markRequestStart(delay);
      if (verbose) {
        print('Added: $destination to $worker with '
            '${delay.inMilliseconds}ms delay');
      }
      inProgress.add(destination);
      bin[destination.url] = Bin.inProgress;
    }

    for (final destination in destinationsToRemove) {
      open.remove(destination);
      openExternal.remove(destination);
    }

    // Nothing queued and nothing running: the crawl is finished.
    if (unknownServers.isEmpty &&
        open.isEmpty &&
        openExternal.isEmpty &&
        pool.allIdle) {
      finish();
    }
  }

  // Respond to new server info from Worker
  pool.serverCheckResults.listen((ServerInfoUpdate result) {
    serversInProgress.remove(result.host);
    servers
        .putIfAbsent(result.host, () => ServerInfo(result.host))
        .updateFromServerCheck(result);

    advanceCount();
    if (verbose) {
      print('Server check of ${result.host} complete.');
      print('Server check for ${result.host} complete: '
          "${result.didNotConnect ? 'didn\'t connect' : 'connected'}, "
          "${result.robotsTxtContents.isEmpty ? 'no robots.txt' : 'robots.txt found'}.");
    }

    sendNewJobs();
  });

  // Respond to fetch results from a Worker
  pool.fetchResults.listen((FetchResults result) {
    assert(bin[result.checked.url] == Bin.inProgress);

    // Find the destination this result is referring to.
    final destinations = inProgress
        .where((dest) => dest.url == result.checked.url)
        .toList(growable: false);
    if (destinations.isEmpty) {
      if (verbose) {
        print("WARNING: Received result for a destination that isn't in "
            'the inProgress set: $result');
        final isInOpen =
            open.where((dest) => dest.url == result.checked.url).isNotEmpty;
        final isInOpenExternal = openExternal
            .where((dest) => dest.url == result.checked.url)
            .isNotEmpty;
        final isInClosed =
            closed.where((dest) => dest.url == result.checked.url).isNotEmpty;
        print('- the url is in open: $isInOpen; '
            'in open external: $isInOpenExternal, in closed: $isInClosed');
      }
      return;
    } else if (destinations.length > 1) {
      if (verbose) {
        print('WARNING: Received result for a url (${result.checked.url} '
            'that matches several objects in the inProgress set: '
            '$destinations');
      }
      return;
    }
    final checked = destinations.single;

    inProgress.remove(checked);
    checked.updateFromResult(result.checked);

    advanceCount();
    if (verbose) {
      print('Done checking: $checked (${checked.statusDescription}) '
          '=> ${result.links.length} links');
      if (checked.isBroken) {
        print('- BROKEN');
      }
    }

    closed.add(checked);
    bin[checked.url] = Bin.closed;

    final newDestinations = <Destination>{};

    // Add links' destinations to [newDestinations] if they haven't been
    // seen before.
    for (final link in result.links) {
      // Mark links as skipped first.
      if (skipper.skips(link.destinationUrlWithFragment)) {
        link.wasSkipped = true;
        if (verbose) {
          print('- will not be checking: ${link.destination} - '
              '${skipper.explain(link.destinationUrlWithFragment)}');
        }
        continue;
      }

      if (bin[link.destination.url] == null) {
        // Completely new destination.
        assert(open.where((d) => d.url == link.destination.url).isEmpty);
        assert(
            openExternal.where((d) => d.url == link.destination.url).isEmpty);
        assert(inProgress.where((d) => d.url == link.destination.url).isEmpty);
        assert(closed.where((d) => d.url == link.destination.url).isEmpty);

        final alreadyOnCurrent = newDestinations.lookup(link.destination);
        if (alreadyOnCurrent != null) {
          if (verbose) {
            print('- destination: ${link.destination} already '
                'seen on this page');
          }
          continue;
        }

        if (verbose) {
          print('- completely new destination: ${link.destination}');
        }

        newDestinations.add(link.destination);
      }
    }

    links.addAll(result.links);

    for (final destination in newDestinations) {
      if (destination.isInvalid) {
        if (verbose) {
          print('Will not be checking: $destination - invalid url');
        }
        continue;
      }

      // Making sure this is set. The next (wasSkipped) section could
      // short-circuit this loop so we have to assign to isExternal here
      // while we have the chance.
      destination.isExternal =
          !uriGlobs.any((glob) => glob.matches(destination.uri));

      if (destination.isUnsupportedScheme) {
        // Don't check unsupported schemes (like mailto:).
        closed.add(destination);
        bin[destination.url] = Bin.closed;
        if (verbose) {
          print('Will not be checking: $destination - unsupported scheme');
        }
        continue;
      }

      // The URL is external and wasn't skipped. We'll find out whether to
      // check it according to the [shouldCheckExternal] option.
      if (destination.isExternal) {
        if (shouldCheckExternal) {
          openExternal.add(destination);
          bin[destination.url] = Bin.openExternal;
          continue;
        } else {
          // Don't check external destinations.
          closed.add(destination);
          bin[destination.url] = Bin.closed;
          if (verbose) {
            print('Will not be checking: $destination - external');
          }
          continue;
        }
      }

      // Source destinations jump the queue; everything else waits its turn.
      if (destination.isSource) {
        open.addFirst(destination);
      } else {
        open.addLast(destination);
      }
      bin[destination.url] = Bin.open;
    }

    // Do any destinations have different hosts? Add them to unknownServers.
    //
    // NOTE: [newHosts] is intentionally left lazy. The
    // `unknownServers.contains` test runs as each host is about to be added,
    // so a host shared by several new destinations is queued only once.
    final newHosts = newDestinations
        .where((destination) => !destination.isInvalid)
        .where((destination) => !destination.isUnsupportedScheme)
        .where((destination) => shouldCheckExternal || !destination.isExternal)
        .map((destination) => destination.uri.authority)
        .where((String host) =>
            !unknownServers.contains(host) &&
            !serversInProgress.contains(host) &&
            !servers.containsKey(host));
    unknownServers.addAll(newHosts);

    // Continue sending new jobs.
    sendNewJobs();
  });

  if (verbose) {
    pool.messages.listen((message) {
      print(message);
    });
  }

  // Start the crawl. First, check servers for robots.txt etc.
  sendNewJobs();

  // This will suspend until after everything is done (or user presses Ctrl-C).
  await allDone.future;

  if (verbose) {
    print('All jobs are done or user pressed Ctrl-C');
  }

  await stopSignalSubscription.cancel();

  if (verbose) {
    print('Deduping destinations');
  }

  // Fix links (dedupe destinations).
  final urlMap = {
    for (final destination in closed) destination.url: destination
  };
  for (final link in links) {
    final canonical = urlMap[link.destination.url];
    // Note: If it wasn't for the possibility to SIGINT the process, we could
    // assert there is exactly one Destination per URL. There might not be,
    // though.
    if (canonical != null) {
      link.destination = canonical;
    }
  }

  if (verbose) {
    print('Closing the isolate pool');
  }

  if (!pool.isShuttingDown) {
    await pool.close();
  }

  // An aborted crawl legitimately leaves untried destinations in the queue.
  assert(stopRequested || open.isEmpty);
  assert(closed.every((destination) =>
      destination.wasTried ||
      destination.isUnsupportedScheme ||
      (destination.isExternal && !shouldCheckExternal) ||
      destination.wasDeniedByRobotsTxt));

  if (verbose) {
    print('Broken links');
    links.where((link) => link.destination.isBroken).forEach(print);
  }

  return CrawlResult(links, closed);
}