Pandora.Apache.Avro.IDL.To.Apache.Parquet 0.11.32

dotnet add package Pandora.Apache.Avro.IDL.To.Apache.Parquet --version 0.11.32
NuGet\Install-Package Pandora.Apache.Avro.IDL.To.Apache.Parquet -Version 0.11.32
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="Pandora.Apache.Avro.IDL.To.Apache.Parquet" Version="0.11.32" />
For projects that support PackageReference, copy this XML node into the project file to reference the package.
paket add Pandora.Apache.Avro.IDL.To.Apache.Parquet --version 0.11.32
#r "nuget: Pandora.Apache.Avro.IDL.To.Apache.Parquet, 0.11.32"
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
// Install Pandora.Apache.Avro.IDL.To.Apache.Parquet as a Cake Addin
#addin nuget:?package=Pandora.Apache.Avro.IDL.To.Apache.Parquet&version=0.11.32

// Install Pandora.Apache.Avro.IDL.To.Apache.Parquet as a Cake Tool
#tool nuget:?package=Pandora.Apache.Avro.IDL.To.Apache.Parquet&version=0.11.32

Pandora.Apache.Avro.IDL.To.Apache.Parquet


Table of Contents

  1. Background

  2. How to use the library

    1. Package dependencies

    2. Package imports

    3. Generating random AVRO data

    4. Logger and DataLakeServiceClient

    5. Loop-logic

    6. Delta-control files (optional)

    7. Main method

  3. How to contribute

    1. Package dependencies

    2. Package imports

    3. Logger

    4. isNullable and fieldToType

    5. Iterating over local AVSC files

    6. Generate directed graphs

    7. Generate SVG and PNG files

  4. Project dependencies

    1. Library

    2. Samples

    3. Unit Tests

Background

Currently, when working with Apache Kafka® and Azure Databricks® (Apache Spark®), there is a built-in mechanism to transform Apache Avro® data into Apache Parquet® files. The issue with this approach, in a medallion lakehouse architecture, is that AVRO with nested data is persisted as a single PARQUET file in the bronze layer (the full, raw and unprocessed history of each dataset), relying on ArrayType, MapType and StructType to represent the nested data. This makes it more tedious to post-process the data in the subsequent layers: silver (validated and deduplicated data) and gold (data as knowledge).

Figure 1: Delta lake medallion architecture and data mesh

To avoid this issue, we present an open-source library that transforms AVRO with nested data into multiple PARQUET files, where each nested data element is represented as an extension table (a separate file). This makes it possible to merge the bronze and silver layers (the full, raw history of each dataset combined with a defined structure and enforced schemas, as well as validated and deduplicated data), making it easier for data engineers, data scientists and business analysts to combine data with already known logic (SQL joins) and tools.
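To make the idea of extension tables concrete, the following F# sketch splits a nested event into a parent table and a child table, and then joins them back with plain key equality, much like an SQL join. The types (Order, Item, OrderRow, ItemRow) and functions (flatten, join) are hypothetical and not part of the library. The ParentUid and FieldName columns only loosely play the role of the pj_pid and pj_fid columns that appear in the ER diagrams further down, and Guid is an assumption standing in for the library's byte-array identifiers.

```fsharp
open System

// Hypothetical nested event: an order with an array of line items.
type Item = { Sku: string; Qty: int }
type Order = { Key: string; Lines: Item list }

// Hypothetical flat tables: one row per order, one row per line item.
type OrderRow = { OrderUid: Guid; OrderKey: string }
type ItemRow =
  { RowUid: Guid       // unique row id (cf. pj_uid)
    ParentUid: Guid    // id of the owning order row (cf. pj_pid)
    FieldName: string  // name of the nested field (cf. pj_fid)
    ItemSku: string
    ItemQty: int }

/// Splits one nested order into a parent row and its child rows.
let flatten (order: Order) : OrderRow * ItemRow list =
  let parent = { OrderUid = Guid.NewGuid (); OrderKey = order.Key }
  let children =
    order.Lines
    |> List.map (fun item ->
      { RowUid = Guid.NewGuid ()
        ParentUid = parent.OrderUid
        FieldName = "Lines"
        ItemSku = item.Sku
        ItemQty = item.Qty })
  parent, children

/// Re-joins the two tables on ParentUid = OrderUid, like an SQL inner join.
let join (parents: OrderRow list) (children: ItemRow list) =
  [ for p in parents do
      for c in children do
        if c.ParentUid = p.OrderUid then
          yield p.OrderKey, c.ItemSku, c.ItemQty ]
```

Running flatten on an order with two lines yields one OrderRow and two ItemRow values whose ParentUid equals the order's OrderUid. Running join on them reconstructs the (key, sku, quantity) tuples.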

Figure 2: Azure Databricks python notebook and SQL cell

Since two of the three medallion layers are combined into one, this could save up to roughly a third of the disk usage, and thereby require fewer servers and less computing power. Furthermore, since we do not rely on a naive approach when flattening and storing data, the savings could be even greater, making for a more sustainable and environmentally friendly approach.
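The one-third figure can be reproduced with a back-of-the-envelope calculation. This calculation rests on our own assumptions, which have not been measured: bronze and silver each hold about the same amount of data, the merged layer holds about as much as one of them, and gold holds a separate amount. The function name storageSaving is hypothetical.

```fsharp
/// Fraction of storage saved by merging bronze and silver into one layer.
/// Assumption: bronze and silver each hold about `layer` units of data,
/// the merged layer also holds about `layer` units, and gold holds `gold` units.
let storageSaving (layer: float) (gold: float) =
  let before = layer + layer + gold // separate bronze, silver and gold layers
  let after  = layer + gold         // merged bronze/silver layer plus gold
  (before - after) / before
```

With three equally sized layers, storageSaving 100.0 100.0 gives 1/3. As the gold layer shrinks, the saving approaches 1/2 (storageSaving 100.0 0.0 is 0.5). The ⅓ figure should therefore be read as an estimate that depends on the relative layer sizes.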

Figure 3: Green Software Foundation with the Linux Foundation to put sustainability at the core of software engineering

Back to TOC

How to use the library

To show how to use the library to convert nested AVRO data to PARQUET files, we rely on a few succinct snippets from a demo script. The fully working script is available at: ./demo/avroidl2parquet.fsx.

Back to TOC

Package dependencies (A2P)

#r "nuget: Azure.Storage.Files.DataLake,              12.12.01"
#r "nuget: Microsoft.Extensions.Logging,               7.00.00"
#r "nuget: Newtonsoft.Json,                           13.00.02"
#r "nuget: Pandora.Apache.Avro.IDL.To.Apache.Parquet,  0.11.21"

// Specify the local Sample DLL file
#I @"../Pandora.Apache.Avro.IDL.To.Apache.Parquet.Samples/bin/Release/net6.0/"
#r @"Pandora.Apache.Avro.IDL.To.Apache.Parquet.Samples.dll"

For this demo script, besides our own package, we will need the following Microsoft packages:

  • Azure.Storage.Files.DataLake: To deliver the created PARQUET and CONTROL files to the delta-lake.
  • Microsoft.Extensions.Logging: Our library needs an instance of an ILogger.

Furthermore, we will also need:

  • Newtonsoft.Json: This package is needed to parse and pass the AVRO schema to transform it into a PARQUET schema.

And finally, we will be using a local dotnet project containing some of the AVRO IDL test samples taken from Apache AVRO on GitHub.

Back to TOC

Package imports (A2P)

Once we have added the packages to our script, we can then import the following namespaces:

open System
open System.IO
open System.Threading

open Microsoft.Extensions.Logging

open Azure.Storage.Files.DataLake
open Azure.Storage.Files.DataLake.Models

open Newtonsoft.Json
open Newtonsoft.Json.Linq

open Pandora.Apache
open Pandora.Databricks
open Pandora.Utils

open org.apache.avro
open org.apache.avro.test

Back to TOC

Generating random AVRO data

To generate random AVRO IDL data, we rely on the following module. It serializes instances of the specific (generated) types and deserializes them into generic records:

[<RequireQualifiedAccess>]
module Test =
  
  open System.Collections.Generic
  
  let private r = new Random()
  
  // local:
  // - org.apache.avro.Interop
  let private interop () =
    let m = new org.apache.avro.MD5 ()
    m.Value <- Array.init 16 (fun _ -> 0x30uy)
    
    let s = new Node ()
    s.label    <- String.Empty
    s.children <- [| |]
    
    let n = new Node ()
    n.label    <- String.Empty
    n.children <- [| s |]
    
    let f = new Foo ()
    f.label <- "label"
    let d = new Dictionary<string,Foo>()
    d.["foo"] <- f
    
    let i = new Interop ()
    
    i.stringField <- String.Empty
    i.nullField   <- null
    i.mapField    <- d
    i.unionField  <-
      [| 42.0                              :> obj
      ;  [| "bytes is a byte sequence"B |] :> obj
      ;  true                              :> obj
      |][r.Next(0,3)]
    i.enumField   <- Kind.A
    i.fixedField  <- m
    i.recordField <- n
    
    i
    |> Avro.Bytes.Specific.serialize
    |> Avro.Bytes.Generic.deserialize (i.Schema.ToString())
  
  // local:
  // - org.apache.avro.test.TestRecord
  let private testRecord () =
    let m = new org.apache.avro.test.MD5()
    m.Value <- Array.init 16 (fun _ -> 0x30uy)
    
    let t = new TestRecord ()
    
    t.name         <- "name"
    t.kind         <- Kind.BAZ
    t.status       <- Status.A
    t.hash         <- m
    t.nullableHash <-
      [| null
      ;  m
      |][r.Next(0,2)]
    t.value        <- 42.0
    t.average      <- 42.0f
    t.t            <-
      [| Unchecked.defaultof<TimeSpan>
      ;  TimeSpan.Zero
      |][r.Next(0,2)]
    t.l            <- 42L
    t.a            <- [| "string array" |]
    t.prop         <-
      [| null
      ;  "foobar"
      |][r.Next(0,2)]
    
    t
    |> Avro.Bytes.Specific.serialize
    |> Avro.Bytes.Generic.deserialize (t.Schema.ToString())
  
  
  let private cases =
    [| interop
    ;  testRecord
    |]
  
  let randomEvent () =
    let i =
      r.Next
        ( 0
        , Array.length cases
        )
    cases[i] ()

Back to TOC

Logger and DataLakeServiceClient

As our library requires an ILogger, we can easily create one as follows:

let logger () =
  let lf = new LoggerFactory ()
  lf.CreateLogger ()

For the DataLakeServiceClient, we can create the following value (dlsc). It can then be used in the rest of the script without having to pass it as a function parameter:

let dlsc =
  "AZURE_DATALAKE_ENV_CONN_STR"
  |> Environment.GetEnvironmentVariable
  |> fun connStr ->
    let opts = new DataLakeClientOptions()
    opts.Retry.NetworkTimeout <- TimeSpan.FromMinutes 15 (* In case network is lost *)
    DataLakeServiceClient
      ( connectionString = connStr
      , options          = opts
      )

Back to TOC

Loop-logic

The loop logic is recursive and asynchronous. It receives the created logger, a cancellation token and the number of AVRO IDL test events to generate.
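Only fragments of the loop appear in this section. The following sketch shows a hypothetical skeleton of such a recursive, asynchronous loop, and assumes nothing about the library. It processes a fixed number of events and stops early once its cancellation token is cancelled. The names loopSketch and step are placeholders; step stands in for generating and converting one event.

```fsharp
open System.Threading

/// Hypothetical skeleton of a recursive, asynchronous loop: runs `step` for
/// events i .. n-1 and stops early once `ct` is cancelled. Returns the index it
/// stopped at, which is the number of processed events when started at 0.
let rec loopSketch (ct: CancellationToken) (step: int -> unit) (i: int) (n: int) : Async<int> =
  async {
    if ct.IsCancellationRequested || i >= n then
      return i
    else
      step i                               // e.g. generate and convert one event
      return! loopSketch ct step (i + 1) n // return! in tail position does not grow the stack
  }
```

A call such as `loopSketch token step 0 amount |> Async.RunSynchronously` processes up to amount events. Because the token is checked before each step, a cancelled token stops the loop between events rather than in the middle of one.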

We then create a UTC date-timestamp as well as its representation as a date-time offset:

let dts = DateTime.UtcNow
let off = new DateTimeOffset(dts)

The next step is to define the initial (empty) values for the environment, the AST and the PARQUET tables:

let env = Parquet.Schema.Ast.Environment.empty ()
let ast = Parquet.Schema.Ast.empty ()
      
let tabs = Parquet.Tables.empty log ast

We create a sequence of AVRO IDL test events:

Seq.init n (
  fun _ ->
    Test.randomEvent ()
)

and we then transform them to PARQUET tables:

…
|> Seq.iteri (
  fun i gen ->
    let idx = i + 1
    let sha =
      gen
      |> Avro.Bytes.Generic.serialize
      |> Hash.SHA256.Bytes.toBytes
    
    let gn  = gen.Schema.Name
    let gns = gen.Schema.Namespace
    
    let fqdn =
      Parquet.Schema.Ast.Fqdn.FQDN
        (      gn
        , Some gns
        )
          
    let (env', ast', es) =
      if not (ast.ContainsKey fqdn) then
        gen.Schema.ToString()
        |> JToken.Parse
        |> Avro.Schema.toParquetSchema log None env ast
      else
        ( env
        , ast
        , Seq.empty
        )
          
    if Seq.isEmpty es then
      let tabs' = Parquet.Tables.update log (Some tabs) ast'
    
      Parquet.Tables.populate
        log
        off
        (Some sha) None None
        ast'
        gen
        gn (Some gns)
        tabs'
      if 0 = idx % m then
        ( Date.timestamp 0
        , sprintf "%032i" i
        )
        ||> sprintf "%s | net.pandora.avroidl2parquet | VERBOSE | DEMO | Generated data items: %s"
        |> Output.stdout
    else
      Date.timestamp 0
      |> printfn "%s | net.pandora.avroidl2parquet | FAILURE | DEMO | Errors:"
      es
      |> Seq.iter (
        fun e ->
          ( Date.timestamp 0
          , e
          )
          ||> sprintf "%s | net.pandora.avroidl2parquet | FAILURE | DEMO | - %s"
          |> Output.stdout
      )
)

NOTE: If a given schema is already in the AST, we skip it, as each AVRO IDL schema only needs to be parsed into a PARQUET schema once.
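This parse-once behaviour is a form of memoization keyed by the schema's fully qualified name. The sketch below shows the pattern with a plain Dictionary. The names convertOnce and convert are hypothetical stand-ins for the AST lookup (ast.ContainsKey) and Avro.Schema.toParquetSchema; they do not reproduce the library's actual implementation.

```fsharp
open System.Collections.Generic

/// Hypothetical memoization keyed by the schema's fully qualified name:
/// `convert` (a stand-in for the AVRO-to-PARQUET schema conversion) runs once per name.
let convertOnce (cache: Dictionary<string, 'T>) (convert: string -> 'T) (fqdn: string) : 'T =
  match cache.TryGetValue fqdn with
  | true, converted -> converted          // already converted: reuse it
  | _ ->
    let converted = convert fqdn          // first time: convert and remember
    cache.[fqdn] <- converted
    converted
```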

Once we have generated the PARQUET tables, we will transform them to bytes and then store them on the data lake. For this, we will need to define a file system client:

let fsc =
  "AZURE_DATALAKE_DELTA_BLOB"
  |> Environment.GetEnvironmentVariable
  |> dlsc.GetFileSystemClient

Afterwards, we iterate over the generated tables that aren't empty, convert each of them to bytes and then store them in Azure Data Lake Storage:

tabs
|> Seq.filter (
  fun table -> 0 < table.Value.Count
)
|> Seq.map (
  fun table ->
    ( table
    , table.Value
      |> Parquet.Tables.toBytes log dts 
    )
)
|> Seq.iter(
  fun (table, parquet) ->
    let thash =
      table.Value.Schema.ToString()
      |> Hash.SHA256.sum
    
    let ppath =
      Path.Combine
        ( "AZURE_DATALAKE_DELTA_PATH"
          |> Environment.GetEnvironmentVariable
        , table.Key.Replace(".", "/")
        , thash
        , dts.ToString("yyyy-MM-dd")
          |> sprintf "pj_pds=%s"
        )
          
    (* Submit PARQUET file to Azure Data Lake Storage with enabled Delta Lake *)
    let isppath =
      new DataLakePathClient
        ( fileSystemClient = fsc
        , path             = ppath
        )
      |> fun dlpc ->
        dlpc.ExistsAsync
          ( cancellationToken = ct
          )
        |> Async.AwaitTask
        |> Async.RunSynchronously
  
    if not isppath.Value then
      fsc.CreateDirectoryAsync
        ( path              = ppath
        , cancellationToken = ct
        )
      |> Async.AwaitTask
      |> Async.RunSynchronously
      |> ignore
          
    let pdc =
      ppath
      |> fsc.GetDirectoryClient
  
    let pfc =
      parquet.Key
      |> pdc.GetFileClient
          
    let _ =
            
      use ms = new MemoryStream(parquet.Value)
            
      pfc.UploadAsync
        ( content           = ms
        , overwrite         = false
        , cancellationToken = ct
        )
      |> Async.AwaitTask
      |> Async.RunSynchronously
      |> ignore
          
    ( Date.timestamp 0
    , parquet.Key
    )
    ||> sprintf "%s | net.pandora.avroidl2parquet | VERBOSE | DEMO | Uploaded to the Azure Data Lake: %s"
    |> Output.stdout
)
Figure 4: Parquet folder structure on Azure Data Lake Storage
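The folder layout in Figure 4 follows from the Path.Combine call above. Each path consists of the table key with dots turned into folders, the schema hash, and a pj_pds=yyyy-MM-dd date partition. The sketch below recomputes such a path without any Azure dependency, using the hypothetical helpers sha256Hex and parquetFolder. It assumes that the schema hash is the lowercase hexadecimal SHA-256 of the schema text, which may differ from the output format of the library's Hash.SHA256.sum. The sketch joins the segments with '/' explicitly, because data-lake paths use forward slashes while Path.Combine uses the operating system's separator, which is a backslash on Windows.

```fsharp
open System
open System.Globalization
open System.Security.Cryptography
open System.Text

/// Assumption: lowercase hexadecimal SHA-256 of the UTF-8 schema text
/// (the library's Hash.SHA256.sum may format its digest differently).
let sha256Hex (text: string) : string =
  let bytes = Encoding.UTF8.GetBytes(text)
  let digest = SHA256.HashData(bytes)
  Convert.ToHexString(digest).ToLowerInvariant()

/// <root>/<table key, '.' -> '/'>/<schema hash>/pj_pds=<yyyy-MM-dd>, always '/'-separated.
let parquetFolder (root: string) (tableKey: string) (schemaText: string) (dts: DateTime) : string =
  [ root.TrimEnd('/')
    tableKey.Replace(".", "/")
    sha256Hex schemaText
    "pj_pds=" + dts.ToString("yyyy-MM-dd", CultureInfo.InvariantCulture) ]
  |> String.concat "/"
```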

Back to TOC

Delta-control files (optional)

With the code above, we only add PARQUET files to Azure Data Lake Storage. To get the benefits of the delta lake, we also need to provide a JSONL control file in the table's _delta_log folder for each uploaded PARQUET file. This can be achieved by extending the code above as follows:

tabs
|> Seq.filter (
  fun table -> 0 < table.Value.Count
)
|> Seq.map (
  fun table ->
    ( table
    , table.Value
      |> Parquet.Tables.toBytes log dts 
    )
)
|> Seq.iter(
  fun (table, parquet) ->
    …
    (* Submit PARQUET file to Azure Data Lake Storage with enabled Delta Lake *)
    …
    ( Date.timestamp 0
    , parquet.Key
    )
    ||> sprintf "%s | net.pandora.avroidl2parquet | VERBOSE | DEMO | Uploaded to the Azure Data Lake: %s"
    |> Output.stdout

    (* Submit CONTROL file to the Azure Data Lake Storage `_delta_log` folder *)
    let cpath =
      Path.Combine
        ( "AZURE_DATALAKE_DELTA_PATH"
          |> Environment.GetEnvironmentVariable
        , table.Key.Replace(".", "/")
        , thash
        , "_delta_log"
        )
          
    let iscpath =
      new DataLakePathClient
        ( fileSystemClient = fsc
        , path             = cpath
        )
      |> fun dlpc ->
        dlpc.ExistsAsync
          ( cancellationToken = ct
          )
        |> Async.AwaitTask
        |> Async.RunSynchronously
  
    if not iscpath.Value then
      fsc.CreateDirectoryAsync
        ( path              = cpath
        , cancellationToken = ct
        )
      |> Async.AwaitTask
      |> Async.RunSynchronously
      |> ignore
          
    control log ct fsc dts parquet schema cpath
)

Here we ensure that a _delta_log folder exists and is populated by our control function. This function takes the logger, the cancellation token, the file system client, the date-timestamp, the PARQUET filename and bytes key-value pair, the table schema and the _delta_log folder path.

The first thing we need to do is to find the next index to be used in the delta lake, because the control files must be named as a uniform sequence with no gaps. Once we have found the next index in the sequence, we generate a JSONL control file and try to upload it. Concurrent writes to the delta lake are handled with optimistic concurrency, so another process might already have added the next control file in the sequence. Because we upload with overwrite = false, our upload then fails. Therefore, we catch the resulting error (Azure.RequestFailedException or System.AggregateException) and retry with the next index.

try
  let idx =
    fsc.GetPathsAsync
      ( path              = cpath
      , recursive         = false
      , cancellationToken = ct
      )
    |> fun ps ->
      ps.GetAsyncEnumerator()
      |> Seq.unfold(
        fun it ->
          let next =
            it.MoveNextAsync().AsTask()
            |> Async.AwaitTask
            |> Async.RunSynchronously
          if next then
            let name = it.Current.Name
            let json =
              name
              |> Path.GetExtension
              |> ((=) ".json")
            if not json then
              ( -1
              , it
              )
              |> Some
            else
              ( name
                |> Path.GetFileNameWithoutExtension
                |> int
              , it
              )
              |> Some
          else
            None
      )
      |> Seq.fold max (-1)
      |> ((+) 1)
    
  let jsonl =
    DeltaLake.JSONL.init
      ( log )
      ( dts )
      ( parquet.Value.LongLength )
      ( schema.GetDataFields()
        |> DeltaLake.JSONL.Schema.init log
      )
      ( parquet.Key )
      |> DeltaLake.toBytes log idx cpath
    
  let jdc =
    cpath
    |> fsc.GetDirectoryClient
    
  let jfc =
    jsonl.Key
    |> Path.GetFileName
    |> jdc.GetFileClient
    
  let _ =
            
    use ms = new MemoryStream(jsonl.Value)
            
    jfc.UploadAsync
      ( content           = ms
      , overwrite         = false
      , cancellationToken = ct
      )
    |> Async.AwaitTask
    |> Async.RunSynchronously
    |> ignore
    
  ( Date.timestamp 0
  , jsonl.Key
    |> Path.GetFileName
  )
  ||> sprintf "%s | net.pandora.avroidl2parquet | VERBOSE | DEMO | Uploaded to the Azure Data Lake: %s"
  |> Output.stdout
with
  | :? System.AggregateException
  | :? Azure.RequestFailedException ->
    ( Date.timestamp 0
    , parquet.Key
    )
    ||> sprintf "%s | net.pandora.avroidl2parquet | WARNING | DEMO | Upload retrying Azure Data Lake: %s"
    |> Output.stdout
    control log ct fsc dts parquet schema cpath
  | ex ->
    ( Date.timestamp 0
    , ex
    )
    ||> sprintf "%s | net.pandora.avroidl2parquet | FAILURE | DEMO | Unexpected error:\n%A"
    |> failwith
Figure 5: JSONL control files in the _delta_log folder on Azure Data Lake Storage
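The indexing and retry logic of the control function can be exercised without Azure by replacing the _delta_log folder with an in-memory set. In this hypothetical sketch, HashSet.Add returning false stands in for an upload with overwrite = false that fails because another writer created the file first. Commit file names follow the Delta Lake convention of a zero-padded, 20-digit version number, for example 00000000000000000000.json. Unlike the recursive retry above, the sketch gives up after maxAttempts, which avoids retrying forever when the error is not a conflict. The names nextVersion, commitFileName and commit are illustrative only.

```fsharp
open System
open System.Collections.Generic
open System.IO

/// Next commit version: highest numeric *.json name in _delta_log plus one (0 if none).
let nextVersion (names: seq<string>) : int64 =
  names
  |> Seq.filter (fun name -> Path.GetExtension(name) = ".json")
  |> Seq.choose (fun name ->
    let stem = Path.GetFileNameWithoutExtension(name)
    match Int64.TryParse(stem) with
    | true, version -> Some version
    | _ -> None)
  |> Seq.fold max (-1L)
  |> (+) 1L

/// Delta Lake commit file name: 20-digit, zero-padded version number.
let commitFileName (version: int64) = sprintf "%020d.json" version

/// Simulated optimistic commit. `store` stands in for the _delta_log folder and
/// HashSet.Add returning false stands in for a failed upload with overwrite = false.
/// `beforeUpload` is a test hook that can simulate a concurrent writer.
let commit (store: HashSet<string>) (beforeUpload: int -> unit) (maxAttempts: int) =
  let rec attempt n =
    if n > maxAttempts then
      None                                 // give up instead of retrying forever
    else
      let name = nextVersion store |> commitFileName
      beforeUpload n
      if store.Add name then Some name     // won the race for this version
      else attempt (n + 1)                 // someone else took it: recompute and retry
  attempt 1
```

For example, if a concurrent writer creates version 1 between our index lookup and our upload, the first attempt fails and the second attempt commits version 2.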

Back to TOC

Main method

We can now bind the loop to a logic function, which lets us shut down the script by pressing ENTER:

let logic log cts amount =
  [ Async.Control.exit cts
  ; loop log cts.Token amount
  ]
  |> Async.Choice

let _ =
  
  Date.timestamp 0
  |> sprintf "%s | net.pandora.avroidl2parquet | STARTED | DEMO"
  |> Output.stdout
  
  try
    
    let sample =
      fsi.CommandLineArgs
      |> Array.skip 1
      |> fun xs ->
        if 0 < Array.length xs then
          xs.[0]
          |> int
        else
          1
    
    let cts = new CancellationTokenSource()
    let log = logger ()
    
    (* Interrupt script by pressing ENTER *)
    Date.timestamp 0
    |> sprintf "%s | net.pandora.avroidl2parquet | VERBOSE | DEMO | Press ENTER to exit"
    |> Output.stdout
    
    logic log cts sample
    |> Async.RunSynchronously
    |> Option.defaultValue ()
    
    Date.timestamp 0
    |> sprintf "%s | net.pandora.avroidl2parquet | STOPPED | DEMO"
    |> Output.stdout
    
    0
  with ex ->
    ( Date.timestamp 0
    , ex
    )
    ||> sprintf "%s | net.pandora.avroidl2parquet | FAILURE | DEMO | Unexpected error:\n%A"
    |> Output.stdout
    -1
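Async.Control.exit is not part of FSharp.Core, and its implementation is not shown here. Assuming it waits for ENTER and then cancels the token source, the same race can be sketched with FSharp.Core's Async.Choice. Async.Choice returns the result of the first computation that yields Some and cancels the others. In the sketch below, logicSketch is a hypothetical name and waitForExit is a hypothetical hook: in a script, waitForExit could be `async { Console.ReadLine () |> ignore }`, while a test can pass a short Async.Sleep.

```fsharp
open System.Threading

/// Races a worker against an exit signal with Async.Choice: whichever returns Some
/// first wins, and Async.Choice cancels the other computation.
let logicSketch
    (cts: CancellationTokenSource)
    (waitForExit: Async<unit>)
    (work: CancellationToken -> Async<unit>) : Async<string option> =
  let exitOnSignal =
    async {
      do! waitForExit
      cts.Cancel ()                 // also signal anything else observing cts.Token
      return Some "exit"
    }
  let worker =
    async {
      do! work cts.Token
      return Some "done"
    }
  Async.Choice [ exitOnSignal; worker ]
```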

As mentioned above, the fully working script is available at: ./demo/avroidl2parquet.fsx.

Back to TOC

How to contribute

As this library is open-source, we welcome contributions that add functionality. To help, we provide a sample showing how access to the Abstract Syntax Tree (AST) makes it easy to create Entity Relationship (ER) diagrams in the Graphviz DOT language. These diagrams include the cardinality (numerical) relationship between the rows of one table and the rows of another. The fully working script is available at: ./demo/avroidl2dot.fsx.

Back to TOC

Package dependencies (A2D)

#r "nuget: Microsoft.Extensions.Logging,              7.00.00"
#r "nuget: Newtonsoft.Json,                          13.00.02"
#r "nuget: Pandora.Apache.Avro.IDL.To.Apache.Parquet, 0.11.26"

For this demo script, besides our own package, we will need the following Microsoft package:

  • Microsoft.Extensions.Logging: Our library needs an instance of an ILogger.

Furthermore, we will also need:

  • Newtonsoft.Json: This package is needed to parse and pass the AVRO schema to transform it into a PARQUET schema.

Back to TOC

Package imports (A2D)

Once we have added the packages to our script, we can then import the following namespaces:

open System
open System.IO
open System.Text

open Microsoft.Extensions.Logging

open Newtonsoft.Json.Linq

open Pandora.Apache
open Pandora.Utils

Back to TOC

Logger

As our library requires an ILogger, we can easily create one as follows:

let logger () =
  let lf = new LoggerFactory ()
  lf.CreateLogger ()

Back to TOC

isNullable and fieldToType

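To correctly represent the cardinality and ordinality in the ER diagram, we use the following two functions. isNullable returns true for NULL, or for a union of NULL with exactly one other type. fieldToType maps each AST type to the type name shown in the diagram: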

let isNullable = function
  | Parquet.Schema.Ast.Type.NULL      ->
    true
  | Parquet.Schema.Ast.Type.UNION fts ->
    ( fts
      |> Seq.exists (
        function
          | Parquet.Schema.Ast.Type.NULL -> true
          | ____________________________ -> false
      )
    ) &&
    ( fts
      |> Seq.filter (
        function
          | Parquet.Schema.Ast.Type.NULL -> false
          | ____________________________ -> true
      )
      |> Seq.length = 1
    )
  | _________________________________ ->
    false

let rec fieldToType = function
  (* # PRIMITIVE TYPES *)
  (* > NOTE: We can't specify `unspecified` as `null` so we rely on a nullable string *)
  | Parquet.Schema.Ast.Type.NULL           -> "string"
  | Parquet.Schema.Ast.Type.BOOLEAN        -> "boolean"
  | Parquet.Schema.Ast.Type.INT            -> "integer"
  | Parquet.Schema.Ast.Type.LONG           -> "long"
  | Parquet.Schema.Ast.Type.FLOAT          -> "float"
  | Parquet.Schema.Ast.Type.DOUBLE         -> "double"
  | Parquet.Schema.Ast.Type.BYTES          -> "binary"
  | Parquet.Schema.Ast.Type.STRING         -> "string"
  (* # LOGICAL TYPES *)
  | Parquet.Schema.Ast.Type.DATE           -> "date"
  | Parquet.Schema.Ast.Type.DECIMAL   _    -> "decimal"
  | Parquet.Schema.Ast.Type.TIMESTAMP_MS   -> "timestamp"
  | Parquet.Schema.Ast.Type.TIME_MS        -> "long"
  (* # COMPLEX TYPES *)
  (* > NOTE: Array and Maps are re-factored to a Record type *)
  | Parquet.Schema.Ast.Type.ARRAY     _
  | Parquet.Schema.Ast.Type.MAP       _    -> String.Empty
  | Parquet.Schema.Ast.Type.UNION     ts   ->
    ts
    |> Seq.filter (
      function
        | Parquet.Schema.Ast.Type.NULL     -> false
        | ____________________________     -> true
    )
    |> Seq.head
    |> fieldToType
  (* # NAMED SCHEMA TYPES *)
  | Parquet.Schema.Ast.Type.ENUM      fqdn ->
    fqdn
    |> Parquet.Schema.Ast.Fqdn.toString 
    |> sprintf "string (enum = %s)"
  (* > NOTE: Errors are Record types *)
  | Parquet.Schema.Ast.Type.ERROR     _    -> String.Empty
  | Parquet.Schema.Ast.Type.FIXED     _    -> "binary"
  (* > NOTE: Record types are filtered out of field types *)
  | Parquet.Schema.Ast.Type.RECORD    _    -> String.Empty
Figure 6: ER-diagram cardinality and ordinality

Back to TOC

Iterating over local AVSC files

For this demo, the sample is limited to a single AVRO IDL, which was converted to the AVSC file ../avro/avsc/Interop.avsc:

Directory.GetFiles
  ( Path.Combine
      ( __SOURCE_DIRECTORY__
      , @"../avro/avsc/"
      )
  // , "*.avsc" // NOTE: Retrieve all AVSC files from the folder
  , @"Interop.avsc"
  , SearchOption.TopDirectoryOnly
  )
|> …

NOTE: As the commented-out line above shows, iterating over all AVSC files in the folder only requires replacing the @"Interop.avsc" search pattern with "*.avsc".
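Extending the iteration to the whole folder only changes the search pattern. The sketch below lists every *.avsc file and reads its fully qualified schema name with System.Text.Json, which ships with .NET 6, so it runs without the library. listSchemas is a hypothetical helper. It assumes that each file holds a single named schema with a "name" property and an optional "namespace" property.

```fsharp
open System.IO
open System.Text.Json

/// Hypothetical helper: (file name, fully qualified schema name) for every *.avsc
/// file directly inside `folder`, sorted by path.
/// Assumption: each file holds one named schema with "name" and optional "namespace".
let listSchemas (folder: string) : (string * string)[] =
  Directory.GetFiles(folder, "*.avsc", SearchOption.TopDirectoryOnly)
  |> Array.sort
  |> Array.map (fun path ->
    let text = File.ReadAllText(path)
    use doc = JsonDocument.Parse(text)
    let root = doc.RootElement
    let name = root.GetProperty("name").GetString()
    let fqdn =
      match root.TryGetProperty("namespace") with
      | true, ns -> sprintf "%s.%s" (ns.GetString()) name
      | _ -> name
    Path.GetFileName(path), fqdn)
```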

Back to TOC

Generate Directed Graphs

First, we define the values for the environment and the AST:

let env = Parquet.Schema.Ast.Environment.empty ()
let ast = Parquet.Schema.Ast.empty ()

We will then read the AVRO schema from the file:

let schema = File.ReadAllText f

We then transform it into a PARQUET schema which, if successfully parsed, is used to generate a directed graph:

schema
|> JToken.Parse
|> Avro.Schema.toParquetSchema log None env ast
|> fun (env', ast', es) ->
  if Seq.isEmpty es then
    let tabs = Parquet.Tables.update log None ast'
    let _ =
      use fs =
        new FileStream
          ( path =
              Path.Combine
                ( __SOURCE_DIRECTORY__
                , @"dots"
                , Path.GetFileNameWithoutExtension f
                  |> sprintf "%s.dot" 
                )
          , mode = FileMode.Create (* Truncate an existing DOT file instead of overwriting it in place *)
          )
      use sw =
        new StreamWriter
          ( stream   = fs
          , encoding = UTF8Encoding false
          )
      sprintf "digraph er {"
      |> sw.WriteLine

      sprintf "  /* Graph */"
      |> sw.WriteLine
      sprintf "  graph[rankdir=RL, overlap=false, splines=polyline]"
      |> sw.WriteLine
      sprintf "  /* Vertices */"
      |> sw.WriteLine
      
      ast'
      |> Seq.sortBy (fun ts -> Parquet.Schema.Ast.Fqdn.``namespace`` ts.Key)
      |> Seq.iter (
        fun ts ->
          let fqdn = Parquet.Schema.Ast.Fqdn.toString ts.Key
          let name = fqdn.Replace('.','_')
          let hash =
            tabs.[fqdn].Schema.ToString()
            |> Hash.SHA256.sum
          seq {
            yield
              ( sprintf "%s [shape=record, label=\"%s (%s)|" name fqdn hash
              )
            yield
              ( seq {
                  yield "<pj_uid> pj_uid: bytearray (nullable = false)"
                  yield "<pj_pds> pj_pds: datetimeoffset (nullable = false)"
                  yield "<pj_sha> pj_sha: bytearray (nullable = true)"
                  yield "<pj_dts> pj_dts: datetimeoffset (nullable = false)"
                  yield "<pj_pid> pj_pid: bytearray (nullable = true)"
                  yield "<pj_fid> pj_fid: string (nullable = true)"
                  yield!
                    ( ts.Value
                      |> Seq.filter (
                        fun fs ->
                          match fs.Value with
                            | Parquet.Schema.Ast.Type.RECORD _ -> false
                            | ________________________________ -> true
                      )
                      |> Seq.sortBy (fun fs -> fs.Key)
                      |> Seq.map(
                        fun fs ->
                          let uid = fs.Key.Replace('.','_')
                          let typ = fieldToType fs.Value
                          let fid = 
                            fs.Key.Replace('.','/')
                            |> Path.GetFileName
                          let isn = isNullable fs.Value
                          sprintf "<%s> %s: %s (nullable = %b)" uid fid typ isn
                      )
                    )
                }
                |> Seq.reduce (sprintf "%s|%s")
              )
            yield
              ( sprintf "\"]"
              )
          }
          |> Seq.fold ((+)) String.Empty
          |> sprintf "  %s"
          |> sw.WriteLine
      )
        
      sprintf "  /* Edges */"
      |> sw.WriteLine
        
      ast'
      |> Seq.sortBy (fun ts -> Parquet.Schema.Ast.Fqdn.``namespace`` ts.Key)
      |> Seq.iter (
        fun ts ->
          let fqdn = Parquet.Schema.Ast.Fqdn.toString ts.Key
          let name = fqdn.Replace('.','_')
          seq {
            yield!
              ( ts.Value
                |> Seq.map (
                  fun fs ->
                    match fs.Value with
                      | Parquet.Schema.Ast.Type.RECORD (fqdn', otrans) ->
                        Some
                          ( fs.Key
                          , fqdn'
                          , otrans
                          , isNullable fs.Value
                          )
                      | ______________________________________________ ->
                        None
                )
                |> Seq.choose id
                |> Seq.sortBy (fun (key,_,_,_) -> key)
                |> Seq.map(
                  fun (key, fqdn', otrans, nullable) ->
                    let both = "dir=both"
                    let head = "arrowhead=none"
                    let fqdn'' = Parquet.Schema.Ast.Fqdn.toString fqdn'
                    let name'   = fqdn''.Replace('.','_')
                    let card =
                      match otrans with
                        | Some trans ->
                          match trans with
                            | Parquet.Schema.Ast.Type.Transformation.NULLABLE -> "noneteeodot"
                            | Parquet.Schema.Ast.Type.Transformation.ARRAY
                            | Parquet.Schema.Ast.Type.Transformation.MAP      -> "invodot"
                            | Parquet.Schema.Ast.Type.Transformation.UNION  _ ->
                              if nullable then
                                "noneteeodot"
                              else
                                "noneteetee"
                        | None       ->
                          if nullable then
                            "noneteeodot"
                          else
                            "noneteetee"
                    key.Replace('.','/')
                    |> Path.GetFileName
                    |> sprintf "%s:pj_pid -> %s:pj_uid [ %s, %s, arrowtail=%s, label=\"pj_fid = %s\" ];"
                        name' name
                        head both
                        card
                )
              )
          }
          |> Seq.fold (sprintf "%s\n  %s") String.Empty
          |> fun cs ->
            if String.IsNullOrEmpty cs then
              ()
            else
              cs
              |> sprintf "  %s"
              |> sw.WriteLine
      )
        
      sprintf "}"
      |> sw.WriteLine
              
      sw.Flush()
      fs.Flush()
      sw.Close()
      fs.Close()
              
    ( Date.timestamp                   0
    , Path.GetFileNameWithoutExtension f
    )
    ||> sprintf "%s | net.pandora.avroidl2dot | VERBOSE | DOTS | Created the ER-diagram for: %s"
    |> Output.stdout
  else
    es
    |> Seq.iter (
      fun e ->
        ( Date.timestamp 0
        , e
        )
        ||> sprintf "%s | net.pandora.avroidl2dot | FAILURE | DOTS | Unexpected error:\n- %s"
        |> Output.stdout
    )

NOTE: The generated output follows the syntax of the Graphviz DOT language.

As mentioned above, the fully working script is available at: ./demo/avroidl2dot.fsx.
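For contributors who want to see the shape of the output before reading the full script, here is a much smaller, hypothetical emitter built on the same conventions. It draws one record-shaped vertex per table with pj_uid and pj_pid ports, plus one edge from a child's pj_pid to its parent's pj_uid, labelled with pj_fid. TableSketch and toDot are illustrative names and not library APIs. Only the arrowtail=invodot shape that the script above uses for arrays and maps is shown.

```fsharp
open System.Text

/// Hypothetical, simplified table description: name plus (column, type, nullable) triples.
type TableSketch = { Name: string; Columns: (string * string * bool) list }

/// Renders tables as record-shaped vertices and (child, parent, field) links as edges
/// from the child's pj_pid port to the parent's pj_uid port, as in the script above.
let toDot (tables: TableSketch list) (links: (string * string * string) list) : string =
  let sb = StringBuilder()
  let line (text: string) = sb.AppendLine(text) |> ignore
  line "digraph er {"
  line "  graph[rankdir=RL, overlap=false, splines=polyline]"
  for t in tables do
    let fields =
      (("pj_uid", "bytearray", false) :: ("pj_pid", "bytearray", true) :: t.Columns)
      |> List.map (fun (col, typ, nullable) ->
        sprintf "<%s> %s: %s (nullable = %b)" col col typ nullable)
      |> String.concat "|"
    line (sprintf "  %s [shape=record, label=\"%s|%s\"]" t.Name t.Name fields)
  for (child, parent, field) in links do
    // Same arrow shape the script above uses for ARRAY and MAP transformations.
    line (sprintf "  %s:pj_pid -> %s:pj_uid [ arrowhead=none, dir=both, arrowtail=invodot, label=\"pj_fid = %s\" ];" child parent field)
  line "}"
  sb.ToString()
```

Writing the returned string to a .dot file and running `dot -T svg` on it renders the diagram.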

Back to TOC

Generate SVG and PNG files

Once the DOT files have been generated, we can use the Graphviz dot tool in the following bash script to render them as vector (SVG) and raster (PNG) image files:

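echo '# Generate SVG and PNG files from DOT files'
# NOTE: $dots must point to the folder containing the generated DOT files
find "$dots" -name "*.dot" -print0 |
while IFS= read -r -d '' f
do
    echo "Generating SVG from $f"
    dot -T svg "$f" > "$f.svg"
    echo "Generating PNG from $f"
    dot -T png "$f" > "$f.png"
done
echo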
Figure 7: Interop (AVRO IDL) ER-diagram with cardinality and ordinality

The fully working bash script is available at: ./demo/avroidl2dot.bash.

Back to TOC

Project dependencies

Back to TOC

Library

Dependency | Author | License
FSharp.Core | Microsoft | MIT License
Apache.Avro | The Apache Software Foundation | Apache License 2.0
Newtonsoft.Json | James Newton-King | MIT License
Parquet.Net | Ivan G | MIT License

Back to TOC

Samples

Dependency | Author | License
Apache.Avro | The Apache Software Foundation | Apache License 2.0

Back to TOC

Unit Tests

Dependency | Author | License
Microsoft.NET.Test.Sdk | Microsoft | MIT License
coverlet.collector | .NET foundation | MIT License
xunit | .NET foundation | Apache License 2.0
xunit.runner.visualstudio | .NET foundation | Apache License 2.0

Back to TOC

Product compatibility: .NET net6.0 is compatible.
Additional computed target framework versions: net6.0-android, net6.0-ios, net6.0-maccatalyst, net6.0-macos, net6.0-tvos, net6.0-windows, net7.0, net7.0-android, net7.0-ios, net7.0-maccatalyst, net7.0-macos, net7.0-tvos, net7.0-windows, net8.0, net8.0-android, net8.0-browser, net8.0-ios, net8.0-maccatalyst, net8.0-macos, net8.0-tvos, net8.0-windows.

NuGet packages

This package is not used by any NuGet packages.

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last updated
0.11.32 225 5/10/2023
0.11.31 199 4/17/2023
0.11.30 246 3/21/2023
0.11.29 244 3/14/2023
0.11.28 251 3/6/2023
0.11.27 250 3/6/2023
0.11.26 268 3/6/2023
0.11.25 250 3/4/2023
0.11.24 264 3/4/2023
0.11.23 242 3/4/2023
0.11.22 241 2/24/2023
0.11.21 267 2/16/2023
0.11.20 259 2/16/2023
0.11.19 258 2/15/2023
0.11.18 267 2/15/2023
0.11.17 258 2/15/2023
0.11.16 250 2/15/2023
0.11.15 266 2/14/2023
0.11.14 261 2/14/2023
0.11.13 269 2/14/2023
0.11.12 268 2/14/2023
0.11.11 257 2/14/2023
0.11.10 254 2/14/2023
0.11.9 251 2/14/2023
0.11.8 255 2/14/2023
0.11.7 272 2/14/2023
0.11.6 281 2/13/2023
0.11.5 282 2/13/2023
0.11.4 293 2/8/2023
0.11.3 282 2/8/2023
0.11.2 289 2/6/2023
0.11.1 287 2/6/2023
0.11.0 305 2/3/2023