观察事物,需要提纲挈领!软件的启动是观察软件最好的入口!

Prometheus server的入口文件是cmd/prometheus/main.go。

Prometheus的配置由两个部分构成:

  1. 配置文件:prometheus.yml
  2. 启动flag:启动时指定

入口函数如下:

func main() {
    ...
}

初始化一个flagConfig,这个配置贯穿了整个程序

// cfg is the single flagConfig value used throughout main. The notifier and web
// options are wired to the default Prometheus registerer/gatherer up front;
// the logging config starts out as its zero value.
cfg := flagConfig{
  notifier: notifier.Options{
    Registerer: prometheus.DefaultRegisterer,
  },
  web: web.Options{
    Registerer: prometheus.DefaultRegisterer,
    Gatherer:   prometheus.DefaultGatherer,
  },
  promlogConfig: promlog.Config{},
}

使用kingpin注册flag

// Create the kingpin application, named after the running binary, with usage
// text written to stdout.
a := kingpin.New(filepath.Base(os.Args[0]), "The Prometheus monitoring server").UsageWriter(os.Stdout)

a.Version(version.Print("prometheus"))

// Accept -h as a short form of the help flag.
a.HelpFlag.Short('h')

// Every flag is bound directly to a field of cfg (StringVar), so parsing the
// command line fills cfg in place. Defaults are declared alongside the flag.
a.Flag("config.file", "Prometheus configuration file path.").
Default("prometheus.yml").StringVar(&cfg.configFile)

a.Flag("web.listen-address", "Address to listen on for UI, API, and telemetry.").
Default("0.0.0.0:9090").StringVar(&cfg.web.ListenAddress)

...

// Parse everything after the program name. On failure, print the wrapped error
// to stderr, show the usage text, and exit with status 2.
_, err := a.Parse(os.Args[1:])
if err != nil {
  fmt.Fprintln(os.Stderr, errors.Wrapf(err, "Error parsing commandline arguments"))
  a.Usage(os.Args[1:])
  os.Exit(2)
}

尝试加载配置文件,这里只用于检查配置的合法性:加载结果会被丢弃,真正应用配置发生在后面的reloadConfig中

// Validation only: the loaded config value is discarded. A load error is logged
// together with the offending --config.file path and the process exits with
// status 2.
if _, err := config.LoadFile(cfg.configFile); err != nil {
  level.Error(logger).Log("msg", fmt.Sprintf("Error loading config (--config.file=%s)", cfg.configFile), "err", err)
  os.Exit(2)
}

初始化各种组件

// Storage layer.
// localStorage and scraper are created empty here and populated later through
// their Set methods (scraper right after the managers are built, localStorage
// by the TSDB actor once the database is open). This lets the other components
// hold references to them before the real implementations exist.
var (
  localStorage  = &readyStorage{}
  scraper       = &readyScrapeManager{}
  // remoteStorage takes localStorage.StartTime as a callback and the scraper
  // placeholder as its last argument.
  remoteStorage = remote.NewStorage(log.With(logger, "component", "remote"), prometheus.DefaultRegisterer, localStorage.StartTime, cfg.localStoragePath, time.Duration(cfg.RemoteFlushDeadline), scraper)
  // fanoutStorage is built from the local and the remote storage.
  fanoutStorage = storage.NewFanout(logger, localStorage, remoteStorage)
)

// Construct the long-lived components. Nothing is started here; the run.Group
// actors further down start and stop them.
var (
  // ctxWeb is handed to the web handler; cancelWeb is called from the web
  // actor's interrupt function. ctxRule is a plain background context with no
  // cancel function.
  ctxWeb, cancelWeb = context.WithCancel(context.Background())
  ctxRule           = context.Background()

  notifierManager = notifier.NewManager(&cfg.notifier, log.With(logger, "component", "notifier"))

  // Two independent discovery managers, each with its own cancellable context:
  // one for scrape targets and one for notification (Alertmanager) targets.
  ctxScrape, cancelScrape = context.WithCancel(context.Background())
  discoveryManagerScrape  = discovery.NewManager(ctxScrape, log.With(logger, "component", "discovery manager scrape"), discovery.Name("scrape"))

  ctxNotify, cancelNotify = context.WithCancel(context.Background())
  discoveryManagerNotify  = discovery.NewManager(ctxNotify, log.With(logger, "component", "discovery manager notify"), discovery.Name("notify"))

  // The scrape manager is given fanoutStorage as its storage.
  scrapeManager = scrape.NewManager(log.With(logger, "component", "scrape manager"), fanoutStorage)

  // Query engine options, taken from the command-line flags stored in cfg.
  opts = promql.EngineOpts{
    Logger:                   log.With(logger, "component", "query engine"),
    Reg:                      prometheus.DefaultRegisterer,
    MaxSamples:               cfg.queryMaxSamples,
    Timeout:                  time.Duration(cfg.queryTimeout),
    ActiveQueryTracker:       promql.NewActiveQueryTracker(cfg.localStoragePath, cfg.queryConcurrency, log.With(logger, "component", "activeQueryTracker")),
    LookbackDelta:            time.Duration(cfg.lookbackDelta),
    NoStepSubqueryIntervalFn: noStepSubqueryInterval.Get,
    EnableAtModifier:         cfg.enablePromQLAtModifier,
    EnableNegativeOffset:     cfg.enablePromQLNegativeOffset,
  }

  queryEngine = promql.NewEngine(opts)

  // The rule manager appends to and queries through fanoutStorage (via the
  // query engine), uses localStorage as its Queryable, and sends alerts
  // through the notifier manager.
  ruleManager = rules.NewManager(&rules.ManagerOptions{
    Appendable:      fanoutStorage,
    Queryable:       localStorage,
    QueryFunc:       rules.EngineQueryFunc(queryEngine, fanoutStorage),
    NotifyFunc:      sendAlerts(notifierManager, cfg.web.ExternalURL.String()),
    Context:         ctxRule,
    ExternalURL:     cfg.web.ExternalURL,
    Registerer:      prometheus.DefaultRegisterer,
    Logger:          log.With(logger, "component", "rule manager"),
    OutageTolerance: time.Duration(cfg.outageTolerance),
    ForGracePeriod:  time.Duration(cfg.forGracePeriod),
    ResendDelay:     time.Duration(cfg.resendDelay),
  })
)

// Fill the scraper placeholder (already passed to remote.NewStorage) with the
// real scrape manager.
scraper.Set(scrapeManager)

将配置更新到flagConfig

// Hand the components and settings built above to the web handler options:
// its context, TSDB settings, storages, engine and managers.
cfg.web.Context = ctxWeb
cfg.web.TSDBRetentionDuration = cfg.tsdb.RetentionDuration
cfg.web.TSDBMaxBytes = cfg.tsdb.MaxBytes
cfg.web.TSDBDir = cfg.localStoragePath
cfg.web.LocalStorage = localStorage
cfg.web.Storage = fanoutStorage
cfg.web.QueryEngine = queryEngine
cfg.web.ScrapeManager = scrapeManager
cfg.web.RuleManager = ruleManager
cfg.web.Notifier = notifierManager
cfg.web.LookbackDelta = time.Duration(cfg.lookbackDelta)

// Build information copied from the version package.
cfg.web.Version = &web.PrometheusVersion{
  Version:   version.Version,
  Revision:  version.Revision,
  Branch:    version.Branch,
  BuildUser: version.BuildUser,
  BuildDate: version.BuildDate,
  GoVersion: version.GoVersion,
}

注册各种组件的reload函数。这是main函数实现的精彩之处之一:结合后面各个组件的启动流程,非常巧妙地实现了配置的加载与重新加载。

// reloaders lists, per component, the function that applies a loaded
// *config.Config to it. The whole slice is passed to reloadConfig for the
// initial load as well as for every SIGHUP- or web-triggered reload.
// NOTE(review): entries are presumably applied in slice order (see the ordering
// remark on the "scrape" entry) — confirm against reloadConfig.
reloaders := []reloader{
  {
    name:     "remote_storage",
    reloader: remoteStorage.ApplyConfig,
  }, {
    name:     "web_handler",
    reloader: webHandler.ApplyConfig,
  }, {
    name: "query_engine",
    reloader: func(cfg *config.Config) error {
      // An empty QueryLogFile clears the query logger.
      if cfg.GlobalConfig.QueryLogFile == "" {
        queryEngine.SetQueryLogger(nil)
        return nil
      }

      // Otherwise install a JSON file logger for the configured path.
      l, err := logging.NewJSONFileLogger(cfg.GlobalConfig.QueryLogFile)
      if err != nil {
        return err
      }
      queryEngine.SetQueryLogger(l)
      return nil
    },
  }, {
    // The Scrape and notifier managers need to reload before the Discovery manager as
    // they need to read the most updated config when receiving the new targets list.
    name:     "scrape",
    reloader: scrapeManager.ApplyConfig,
  }, {
    name: "scrape_sd",
    reloader: func(cfg *config.Config) error {
      // Map each scrape job name to its service discovery configs and pass
      // the map to the scrape discovery manager.
      c := make(map[string]discovery.Configs)
      for _, v := range cfg.ScrapeConfigs {
        c[v.JobName] = v.ServiceDiscoveryConfigs
      }
      return discoveryManagerScrape.ApplyConfig(c)
    },
  }, {
    name:     "notify",
    reloader: notifierManager.ApplyConfig,
  }, {
    name: "notify_sd",
    reloader: func(cfg *config.Config) error {
      // Same shape as scrape_sd, but keyed by the entries of the
      // Alertmanager config map and applied to the notify discovery manager.
      c := make(map[string]discovery.Configs)
      for k, v := range cfg.AlertingConfig.AlertmanagerConfigs.ToMap() {
        c[k] = v.ServiceDiscoveryConfigs
      }
      return discoveryManagerNotify.ApplyConfig(c)
    },
  }, {
    name: "rules",
    reloader: func(cfg *config.Config) error {
      // Get all rule files matching the configuration paths.
      var files []string
      for _, pat := range cfg.RuleFiles {
        fs, err := filepath.Glob(pat)
        if err != nil {
          // The only error can be a bad pattern.
          return errors.Wrapf(err, "error retrieving rule files for %s", pat)
        }
        files = append(files, fs...)
      }
      // Update the rule manager with the evaluation interval, the expanded
      // file list and the external labels from the global config.
      return ruleManager.Update(
        time.Duration(cfg.GlobalConfig.EvaluationInterval),
        files,
        cfg.GlobalConfig.ExternalLabels,
      )
    },
  },
}

这里声明TSDB、配置加载等的通知channel,用于控制组件启动的顺序

// Start all components while we wait for TSDB to open but only load
// initial config and mark ourselves as ready after it completed.
// dbOpen is closed by the TSDB actor once the database has been opened and
// handed to localStorage; the initial config loading actor waits on it.
dbOpen := make(chan struct{})

// sync.Once is used to make sure we can close the channel at different execution stages(SIGTERM or when the config is loaded).
type closeOnce struct {
  C     chan struct{}
  once  sync.Once
  Close func()
}
// Wait until the server is ready to handle reloading.
// Actors that must not start before the first config load block on reloadReady.C.
reloadReady := &closeOnce{
  C: make(chan struct{}),
}
// Close may be called from several places; once.Do guarantees the channel is
// closed exactly one time, so repeated calls do not panic.
reloadReady.Close = func() {
  reloadReady.once.Do(func() {
    close(reloadReady.C)
  })
}

使用jaeger,进行链路追踪

// Set up tracing; a failure is fatal (exit status 2). On success the returned
// closer is closed when the surrounding function returns.
// NOTE(review): os.Exit does not run deferred functions, so the os.Exit calls
// that follow this point bypass closer.Close().
closer, err := initTracing(logger)
if err != nil {
  level.Error(logger).Log("msg", "Unable to init tracing", "err", err)
  os.Exit(2)
}
defer closer.Close()

设置web listener以及验证web的配置

// Obtain the web listener before any actor is started, so a listener failure
// is reported and the process exits (status 1) right away.
listener, err := webHandler.Listener()
if err != nil {
  level.Error(logger).Log("msg", "Unable to start web listener", "err", err)
  os.Exit(1)
}

// Validate the web configuration file referenced by webConfig; an invalid file
// is fatal as well (status 1).
err = toolkit_web.Validate(*webConfig)
if err != nil {
  level.Error(logger).Log("msg", "Unable to validate web configuration file", "err", err)
  os.Exit(1)
}

然后,来到main函数的高潮:设置各个组件。底层使用了oklog/run,便于统一控制一组goroutine的启动与退出。

// g runs every component below as an actor: each g.Add call registers an
// execute function together with the interrupt function used to stop it.
var g run.Group
{
  // Termination handler.
  // term receives os.Interrupt and SIGTERM. It is buffered because
  // signal.Notify does not block when delivering to the channel.
  term := make(chan os.Signal, 1)
  signal.Notify(term, os.Interrupt, syscall.SIGTERM)
  cancel := make(chan struct{})
  g.Add(
    func() error {
      // Don't forget to release the reloadReady channel so that waiting blocks can exit normally.
      // The actor returns nil on a signal, on a quit request from the web
      // handler, or when its own interrupt function closes cancel.
      select {
        case <-term:
        level.Warn(logger).Log("msg", "Received SIGTERM, exiting gracefully...")
        reloadReady.Close()
        case <-webHandler.Quit():
        level.Warn(logger).Log("msg", "Received termination request via web service, exiting gracefully...")
        case <-cancel:
        reloadReady.Close()
      }
      return nil
    },
    func(err error) {
      close(cancel)
    },
  )
}
{
  // Scrape discovery manager.
  // Starts immediately (no wait on reloadReady) and is stopped by cancelling
  // ctxScrape, the context it was created with.
  g.Add(
    func() error {
      err := discoveryManagerScrape.Run()
      level.Info(logger).Log("msg", "Scrape discovery manager stopped")
      return err
    },
    func(err error) {
      level.Info(logger).Log("msg", "Stopping scrape discovery manager...")
      cancelScrape()
    },
  )
}
{
  // Notify discovery manager.
  // Mirrors the scrape discovery actor: runs immediately and is stopped by
  // cancelling ctxNotify.
  g.Add(
    func() error {
      err := discoveryManagerNotify.Run()
      level.Info(logger).Log("msg", "Notify discovery manager stopped")
      return err
    },
    func(err error) {
      level.Info(logger).Log("msg", "Stopping notify discovery manager...")
      cancelNotify()
    },
  )
}
{
  // Scrape manager.
  g.Add(
    func() error {
      // When the scrape manager receives a new targets list
      // it needs to read a valid config for each job.
      // It depends on the config being in sync with the discovery manager so
      // we wait until the config is fully loaded.
      <-reloadReady.C

      // Consume target updates from the scrape discovery manager's sync channel.
      err := scrapeManager.Run(discoveryManagerScrape.SyncCh())
      level.Info(logger).Log("msg", "Scrape manager stopped")
      return err
    },
    func(err error) {
      // Scrape manager needs to be stopped before closing the local TSDB
      // so that it doesn't try to write samples to a closed storage.
      level.Info(logger).Log("msg", "Stopping scrape manager...")
      scrapeManager.Stop()
    },
  )
}
{
  // Reload handler.

  // Make sure that sighup handler is registered with a redirect to the channel before the potentially
  // long and synchronous tsdb init.
  hup := make(chan os.Signal, 1)
  signal.Notify(hup, syscall.SIGHUP)
  // cancel is unbuffered and is sent to (not closed) by the interrupt function.
  cancel := make(chan struct{})
  g.Add(
    func() error {
      // No reload is served before the initial configuration has been loaded.
      <-reloadReady.C

      // Serve reload requests until cancelled. A SIGHUP-triggered failure is
      // only logged; a web-triggered reload also reports its result (error or
      // nil) back on the channel received from webHandler.Reload().
      for {
        select {
          case <-hup:
          if err := reloadConfig(cfg.configFile, logger, noStepSubqueryInterval, reloaders...); err != nil {
            level.Error(logger).Log("msg", "Error reloading config", "err", err)
          }
          case rc := <-webHandler.Reload():
          if err := reloadConfig(cfg.configFile, logger, noStepSubqueryInterval, reloaders...); err != nil {
            level.Error(logger).Log("msg", "Error reloading config", "err", err)
            rc <- err
          } else {
            rc <- nil
          }
          case <-cancel:
          return nil
        }
      }

    },
    func(err error) {
      // Wait for any in-progress reloads to complete to avoid
      // reloading things after they have been shutdown.
      // The send blocks until the loop above is back in its select and
      // receives from cancel.
      cancel <- struct{}{}
    },
  )
}
{
  // Initial configuration loading.
  cancel := make(chan struct{})
  g.Add(
    func() error {
      // Wait for the TSDB to be open. If shutdown wins the race, release
      // everything blocked on reloadReady and return without loading.
      select {
        case <-dbOpen:
        // In case a shutdown is initiated before the dbOpen is released
        case <-cancel:
        reloadReady.Close()
        return nil
      }

      // First real application of the config file to all reloaders. A failure
      // is returned as this actor's error.
      if err := reloadConfig(cfg.configFile, logger, noStepSubqueryInterval, reloaders...); err != nil {
        return errors.Wrapf(err, "error loading config from %q", cfg.configFile)
      }

      // Unblock the actors waiting on reloadReady.C, then mark the web handler
      // as ready.
      reloadReady.Close()

      webHandler.Ready()
      level.Info(logger).Log("msg", "Server is ready to receive web requests.")
      // Stay alive until interrupted.
      <-cancel
      return nil
    },
    func(err error) {
      close(cancel)
    },
  )
}
{
  // Rule manager.
  // Runs only after the initial configuration has been loaded (reloadReady).
  g.Add(
    func() error {
      <-reloadReady.C
      ruleManager.Run()
      return nil
    },
    func(err error) {
      ruleManager.Stop()
    },
  )
}
{
  // TSDB.
  opts := cfg.tsdb.ToTSDBOptions()
  cancel := make(chan struct{})
  g.Add(
    func() error {
      level.Info(logger).Log("msg", "Starting TSDB ...")
      // A WAL segment size of 0 skips the range check; any other value must
      // lie within [10MB, 256MB].
      if cfg.tsdb.WALSegmentSize != 0 {
        if cfg.tsdb.WALSegmentSize < 10*1024*1024 || cfg.tsdb.WALSegmentSize > 256*1024*1024 {
          return errors.New("flag 'storage.tsdb.wal-segment-size' must be set between 10MB and 256MB")
        }
      }
      db, err := openDBWithMetrics(
        cfg.localStoragePath,
        logger,
        prometheus.DefaultRegisterer,
        &opts,
      )
      if err != nil {
        return errors.Wrapf(err, "opening storage failed")
      }

      // Log the filesystem type of the storage path; NFS gets a warning.
      switch fsType := prom_runtime.Statfs(cfg.localStoragePath); fsType {
        case "NFS_SUPER_MAGIC":
        level.Warn(logger).Log("fs_type", fsType, "msg", "This filesystem is not supported and may lead to data corruption and data loss. Please carefully read https://prometheus.io/docs/prometheus/latest/storage/ to learn more about supported filesystems.")
        default:
        level.Info(logger).Log("fs_type", fsType)
      }

      level.Info(logger).Log("msg", "TSDB started")
      level.Debug(logger).Log("msg", "TSDB options",
                              "MinBlockDuration", cfg.tsdb.MinBlockDuration,
                              "MaxBlockDuration", cfg.tsdb.MaxBlockDuration,
                              "MaxBytes", cfg.tsdb.MaxBytes,
                              "NoLockfile", cfg.tsdb.NoLockfile,
                              "RetentionDuration", cfg.tsdb.RetentionDuration,
                              "WALSegmentSize", cfg.tsdb.WALSegmentSize,
                              "AllowOverlappingBlocks", cfg.tsdb.AllowOverlappingBlocks,
                              "WALCompression", cfg.tsdb.WALCompression,
                             )

      // startTimeMargin is twice the minimum block duration, in milliseconds.
      startTimeMargin := int64(2 * time.Duration(cfg.tsdb.MinBlockDuration).Seconds() * 1000)
      // Fill the localStorage placeholder with the opened DB, then signal the
      // initial config loading actor that the TSDB is ready.
      localStorage.Set(db, startTimeMargin)
      close(dbOpen)
      // Stay alive until interrupted.
      <-cancel
      return nil
    },
    func(err error) {
      // Close the fanout storage first, then release the execute function.
      if err := fanoutStorage.Close(); err != nil {
        level.Error(logger).Log("msg", "Error stopping storage", "err", err)
      }
      close(cancel)
    },
  )
}
{
  // Web handler.
  // Serves on the listener obtained earlier; stopped by cancelling ctxWeb.
  g.Add(
    func() error {
      if err := webHandler.Run(ctxWeb, listener, *webConfig); err != nil {
        return errors.Wrapf(err, "error starting web server")
      }
      return nil
    },
    func(err error) {
      cancelWeb()
    },
  )
}
{
  // Notifier.

  // Calling notifier.Stop() before ruleManager.Stop() will cause a panic if the ruleManager isn't running,
  // so keep this interrupt after the ruleManager.Stop().
  g.Add(
    func() error {
      // When the notifier manager receives a new targets list
      // it needs to read a valid config for each job.
      // It depends on the config being in sync with the discovery manager
      // so we wait until the config is fully loaded.
      <-reloadReady.C

      // Consume target updates from the notify discovery manager's sync channel.
      notifierManager.Run(discoveryManagerNotify.SyncCh())
      level.Info(logger).Log("msg", "Notifier manager stopped")
      return nil
    },
    func(err error) {
      notifierManager.Stop()
    },
  )
}

最后启动g,整个程序就运行起来了。g.Run在正常运行期间不会返回;一旦上面注册的任意一个actor返回(无论是出错还是正常结束),其余actor都会被中断,程序整体退出。

// Run all registered actors and block until the group stops. A non-nil error
// is logged and turned into exit status 1; otherwise a farewell message is
// logged.
if err := g.Run(); err != nil {
  level.Error(logger).Log("err", err)
  os.Exit(1)
}
level.Info(logger).Log("msg", "See you next time!")

下一篇我们开始研究Prometheus server的Initial configuration loading代码。